spark数据倾斜解决方案四:两阶段聚合(局部聚合+全局聚合)

sapv博客之家,spark数据倾斜解决方案四:两阶段聚合(局部聚合+全局聚合)

方案适用场景:对RDD执行reduceByKey等聚合类shuffle算子或者在Spark SQL中使用group by语句进行分组聚合时,比较适用这种方案。

      方案实现思路:这个方案的核心实现思路就是进行两阶段聚合。第一次是局部聚合,先给每个key都打上一个随机数,比如10以内的随机数,此时原先一样的key就变成不一样的了,比如(hello, 1) (hello, 1) (hello, 1) (hello, 1),就会变成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接着对打上随机数后的数据,执行reduceByKey等聚合操作,进行局部聚合,那么局部聚合结果,就会变成了(1_hello, 2) (2_hello, 2)。然后将各个key的前缀给去掉,就会变成(hello,2)(hello,2),再次进行全局聚合操作,就可以得到最终结果了,比如(hello, 4)。

      方案实现原理:将原本相同的key通过附加随机前缀的方式,变成多个不同的key,就可以让原本被一个task处理的数据分散到多个task上去做局部聚合,进而解决单个task处理数据量过多的问题。接着去除掉随机前缀,再次进行全局聚合,就可以得到最终的结果。具体原理见下图。

      方案优点:对于聚合类的shuffle操作导致的数据倾斜,效果是非常不错的。通常都可以解决掉数据倾斜,或者至少是大幅度缓解数据倾斜,将Spark作业的性能提升数倍以上。

      方案缺点:仅仅适用于聚合类的shuffle操作,适用范围相对较窄。如果是join类的shuffle操作,还得用其他的解决方案。

[plain] view plain copy

 

  1. <span style="font-size:12px;">// 第一步,给RDD中的每个key都打上一个随机前缀。  
  2. JavaPairRDD<String, Long> randomPrefixRdd = rdd.mapToPair(  
  3.         new PairFunction<Tuple2<Long,Long>, String, Long>() {  
  4.             private static final long serialVersionUID = 1L;  
  5.             @Override  
  6.             public Tuple2<String, Long> call(Tuple2<Long, Long> tuple)  
  7.                     throws Exception {  
  8.                 Random random = new Random();  
  9.                 int prefix = random.nextInt(10);  
  10.                 return new Tuple2<String, Long>(prefix + "_" + tuple._1, tuple._2);  
  11.             }  
  12.         });  
  13.   
  14. // 第二步,对打上随机前缀的key进行局部聚合。  
  15. JavaPairRDD<String, Long> localAggrRdd = randomPrefixRdd.reduceByKey(  
  16.         new Function2<Long, Long, Long>() {  
  17.             private static final long serialVersionUID = 1L;  
  18.             @Override  
  19.             public Long call(Long v1, Long v2) throws Exception {  
  20.                 return v1 + v2;  
  21.             }  
  22.         });  
  23.   
  24. // 第三步,去除RDD中每个key的随机前缀。  
  25. JavaPairRDD<Long, Long> removedRandomPrefixRdd = localAggrRdd.mapToPair(  
  26.         new PairFunction<Tuple2<String,Long>, Long, Long>() {  
  27.             private static final long serialVersionUID = 1L;  
  28.             @Override  
  29.             public Tuple2<Long, Long> call(Tuple2<String, Long> tuple)  
  30.                     throws Exception {  
  31.                 long originalKey = Long.valueOf(tuple._1.split("_")[1]);  
  32.                 return new Tuple2<Long, Long>(originalKey, tuple._2);  
  33.             }  
  34.         });  
  35.   
  36. // 第四步,对去除了随机前缀的RDD进行全局聚合。  
  37. JavaPairRDD<Long, Long> globalAggrRdd = removedRandomPrefixRdd.reduceByKey(  
  38.         new Function2<Long, Long, Long>() {  
  39.             private static final long serialVersionUID = 1L;  
  40.             @Override  
  41.             public Long call(Long v1, Long v2) throws Exception {  
  42.                 return v1 + v2;  
  43.             }  
  44.         });</span>  
  • 发表于 2018-05-23 22:42
  • 阅读 ( 292 )
  • 分类:大数据

0 条评论

请先 登录 后评论
不写代码的码农
张鹏

大数据工程师

95 篇文章

作家榜 »

  1. 张鹏 95 文章
  2. 0 文章
  3. 赵科 0 文章
  4. 王孖珺397954227 0 文章