1.使用pipline的原因
Redis 使用的是客户端-服务器(CS)模型和请求/响应协议的 TCP 服务器。
这意味着通常情况下一个请求会遵循以下步骤:
- 客户端向服务端发送一个查询请求,并监听 Socket 返回,通常是以阻塞模式,等待服务端响应。
- 服务端处理命令,并将结果返回给客户端。
- 管道(pipeline)可以一次性发送多条命令并在执行完后一次性将结果返回,pipeline 通过减少客户端与 redis 的通信次数来实现降低往返延时时间,而且 Pipeline 实现的原理是队列,而队列的原理是时先进先出,这样就保证数据的顺序性。
通俗点:
- pipeline就是把一组命令进行打包,然后一次性通过网络发送到Redis。同时将执行的结果批量的返回回来
- pipelined.sync()表示我一次性的异步发送到redis,不关注执行结果。
- pipeline.syncAndReturnAll ();将返回执行过的命令结果返回到List列表中
2.方法
2.1写入redis的方法
2.1.1参数说明
sc:SparkContext Spark上下文
spark:SparkSession 使用Dataset和DataFrame API编程Spark的入口点
1 2 3 4 5 6 7 8 | def writeRedis(sc: SparkContext,spark: SparkSession): Unit = { / / spark读取数据集 val df: DataFrame = spark.read.parquet( "file:///F://delRedisData//1//delData.snappy.parquet" ) df.show( 1 ,false) val rdd: RDD[String] = df.rdd. map (x = >x.getAs[String]( "r" )) / / 这个集合写的是 2000 多万的数据 sc.toRedisSET(rdd, "test:task:deplicate" ) } |
2.2读取本地待删除数据的方法
2.2.1参数说明
sc:SparkContext Spark上下文
spark:SparkSession 使用Dataset和DataFrame API编程Spark的入口点
1 2 3 4 5 6 | def readParquet(spark: SparkSession,path:String): RDD[String] = { val df: DataFrame = spark.read.parquet(path) val strRDD: RDD[String] = df.rdd. map (_.getAs[String]( "r" )) / / 返回String类型的RDD strRDD } |
2.3调用pipline删除的方法
2.3.1参数说明
collectionName 其中redis set集合的名称
num是要删除的数据量是多少
arr是要删除的数据存放的是set集合的key
jedis是redis的客户端
1 2 3 4 5 6 7 8 | def delPipleine(collectionName:String,num: Int ,arr:Array[String],jedis:Jedis):Unit = { try { val pipeline: Pipeline = jedis.pipelined() / / 选择数据库 默认为 0 pipeline.select( 1 ) for (i e.printStackTrace() } finally if (jedis ! = null) jedis.close() } |
3.完整代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 | import com.redislabs.provider.redis._ import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.{SparkConf, SparkContext} import redis.clients.jedis.exceptions.JedisException import redis.clients.jedis.{Jedis, Pipeline} / * * * Date 2022 / 5 / 25 17 : 57 * / object DelRedis { def main(args: Array[String]): Unit = { val conf = new SparkConf() / / 驱动进程使用的内核数,仅在集群模式下使用。 . set ( "spark.driver.cores" , "5" ) / * * * 驱动进程使用的内存数量,也就是SparkContext初始化的地方, * 其格式与JVM内存字符串具有大小单位后缀(“k”,“m”,“g”或“t”)(例如 512m , 2g )相同。 * 注意:在客户端模式下,不能直接在应用程序中通过SparkConf设置此配置,因为此时驱动程 * 序JVM已经启动。相反,请通过——driver - memory命令行选项或在默认属性文件中设置。 * / . set ( "spark.driver.memory" , "5g" ) / * * * 限制每个Spark操作(例如collect)的所有分区的序列化结果的总大小(以字节为单位)。 * 应该至少是 1M ,或者 0 表示无限制。如果总大小超过此限制,则作业将被终止。 * 过高的限制可能会导致驱动程序内存不足错误(取决于spark.driver.memory和JVM中对象的内存开销)。 * 设置适当的限制可以防止驱动程序出现内存不足的错误。 * / . set ( "spark.driver.maxResultSize" , "10g" ) / * * * 每个执行程序进程使用的内存数量, * 格式与带有大小单位后缀(“k”,“m”,“g”或“t”)的JVM内存字符串相同(例如 512m , 2g )。 * * / . set ( "spark.executor.memory" , "5g" ) / * * * 默认 1 在YARN模式下,worker上所有可用的内核在standalone和Mesos粗粒度模式下。 * / . set ( "spark.executor.cores" , "5" ) val spark: SparkSession = SparkSession.builder().appName( "DelRedis" ).master( "local[*]" ) .config( "spark.redis.host" , "192.168.100.201" ) .config( "spark.redis.port" , "6379" ) .config( "spark.redis.db" , "1" ) / / 可选的数据库编号。避免使用它,尤其是在集群模式下,redisRedis默认支持 16 个数据库,默认是选择数据库 0 ,这里设置为 1 。 .config( "spark.redis.timeout" , "2000000" ) / / 连接超时,以毫秒为单位,默认为 2000 毫秒 .config(conf) .getOrCreate() val sc: SparkContext = spark.sparkContext / / 1. 写入数据集 writeRedis(sc,spark) / / 2. 读取待删除的数据key val path = "file:///F://delRedisData//test.parquet" val rdd: RDD[String] = readParquet(spark,path) / / 3. 使用redis 中的 pipeline 方法 进行删除操作 rdd.foreachPartition( iter = >{ / / 连接redis客户端 val jedis = new Jedis( "192.168.100.201" , 6379 ) val array: Array[String] = iter .toArray val length: Int = array.length val beginTime: Long = System.currentTimeMillis() delPipleine(collectionName,length,array,jedis) val endTime: Long = System.currentTimeMillis() println( "删除:" + length + "条数据,耗时:" + (endTime - beginTime) / 1000 + "秒" ) }) sc.stop() spark.stop() } def delPipleine(collectionName:String,num: Int ,arr:Array[String],jedis:Jedis):Unit = { try { val pipeline: Pipeline = jedis.pipelined() / / 选择数据库 默认为 0 pipeline.select( 1 ) for (i e.printStackTrace() } finally if (jedis ! = null) jedis.close() } def writeRedis(sc: SparkContext,spark: SparkSession): Unit = { / / spark读取数据集 val df: DataFrame = spark.read.parquet( "file:///F://delRedisData//1//delData.snappy.parquet" ) df.show( 1 ,false) val rdd: RDD[String] = df.rdd. map (x = >x.getAs[String]( "r" )) / / 这个集合写的是 2000 多万的数据 sc.toRedisSET(rdd, "test:task:deplicate" ) } def readParquet(spark: SparkSession,path:String): RDD[String] = { val df: DataFrame = spark.read.parquet(path) val strRDD: RDD[String] = df.rdd. map (_.getAs[String]( "r" )) / / 返回String类型的RDD strRDD } } |
4.总结
经检测:redis 的 pipeline(管道)方法 ,经单机版的redis测试 ,百万级别数据删除仅需要1分钟左右与硬件有关,还包括读取数据的时长等方面原因
以上就是Spark删除redis千万级别set集合数据实现分析的详细内容,更多关于Spark删除redis set集合的资料请关注IT俱乐部其它相关文章!