spark数据倾斜解决方案七:使用随机前缀和扩容RDD进行join

sapv博客之家,spark数据倾斜解决方案七:使用随机前缀和扩容RDD进行join

     方案适用场景:如果在进行join操作时,RDD中有大量的key导致数据倾斜,那么进行分拆key也没什么意义,此时就只能使用最后一种方案来解决问题了。

方案实现思路:

  • 该方案的实现思路基本和“解决方案六”类似,首先查看RDD/Hive表中的数据分布情况,找到那个造成数据倾斜的RDD/Hive表,比如有多个key都对应了超过1万条数据。
  • 然后将该RDD的每条数据都打上一个n以内的随机前缀。
  • 同时对另外一个正常的RDD进行扩容,将每条数据都扩容成n条数据,扩容出来的每条数据都依次打上一个0~n的前缀。
  • 最后将两个处理后的RDD进行join即可。

      方案实现原理:将原先一样的key通过附加随机前缀变成不一样的key,然后就可以将这些处理后的“不同key”分散到多个task中去处理,而不是让一个task处理大量的相同key。该方案与“解决方案六”的不同之处就在于,上一种方案是尽量只对少数倾斜key对应的数据进行特殊处理,由于处理过程需要扩容RDD,因此上一种方案扩容RDD后对内存的占用并不大;而这一种方案是针对有大量倾斜key的情况,没法将部分key拆分出来进行单独处理,因此只能对整个RDD进行数据扩容,对内存资源要求很高。

      方案优点:对join类型的数据倾斜基本都可以处理,而且效果也相对比较显著,性能提升效果非常不错。

      方案缺点:该方案更多的是缓解数据倾斜,而不是彻底避免数据倾斜。而且需要对整个RDD进行扩容,对内存资源要求很高。

      方案实践经验:曾经开发一个数据需求的时候,发现一个join导致了数据倾斜。优化之前,作业的执行时间大约是60分钟左右;使用该方案优化之后,执行时间缩短到10分钟左右,性能提升了6倍。

 

[plain] view plain copy

  1. // 首先将其中一个key分布相对较为均匀的RDD膨胀100倍。  
  2. JavaPairRDD<String, Row> expandedRDD = rdd1.flatMapToPair(  
  3.         new PairFlatMapFunction<Tuple2<Long,Row>, String, Row>() {  
  4.             private static final long serialVersionUID = 1L;  
  5.             @Override  
  6.             public Iterable<Tuple2<String, Row>> call(Tuple2<Long, Row> tuple)  
  7.                     throws Exception {  
  8.                 List<Tuple2<String, Row>> list = new ArrayList<Tuple2<String, Row>>();  
  9.                 for(int i = 0; i < 100; i++) {  
  10.                     list.add(new Tuple2<String, Row>(0 + "_" + tuple._1, tuple._2));  
  11.                 }  
  12.                 return list;  
  13.             }  
  14.         });  
  15.   
  16. // 其次,将另一个有数据倾斜key的RDD,每条数据都打上100以内的随机前缀。  
  17. JavaPairRDD<String, String> mappedRDD = rdd2.mapToPair(  
  18.         new PairFunction<Tuple2<Long,String>, String, String>() {  
  19.             private static final long serialVersionUID = 1L;  
  20.             @Override  
  21.             public Tuple2<String, String> call(Tuple2<Long, String> tuple)  
  22.                     throws Exception {  
  23.                 Random random = new Random();  
  24.                 int prefix = random.nextInt(100);  
  25.                 return new Tuple2<String, String>(prefix + "_" + tuple._1, tuple._2);  
  26.             }  
  27.         });  
  28.   
  29. // 将两个处理后的RDD进行join即可。  
  30. JavaPairRDD<String, Tuple2<String, Row>> joinedRDD = mappedRDD.join(expandedRDD);  
  • 发表于 2018-05-23 22:50
  • 阅读 ( 392 )
  • 分类:大数据

0 条评论

请先 登录 后评论
不写代码的码农
张鹏

大数据工程师

95 篇文章

作家榜 »

  1. 张鹏 95 文章
  2. 0 文章
  3. 赵科 0 文章
  4. 王孖珺397954227 0 文章