记spark过程中Scala多线程小问题

spark中scala使用多线程问题

这次更改ThriftServer源码,加了些业务,中间遇到这样一个问题,异步提交任务的时候想做成多线程,刚开始是使用的scala的Actor,传递了SQLContext和sql,发现每次sparkSessionId在一直变化,每次提交和触发Action之后产生的sessionId都不一致,这是怎么回事,后来才发现是多线程异步的问题,传递sqlContext在线程那边执行任务的时候会重新触发一个会话,那可怎么办呢,只能用以下方式实现了


java.util.concurrent.ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.submit(new Callable<Void>() {
@Override
public Void call(){
df.rdd().saveAsTextFile(rb.getString("hdfspath") + "/file3",com.hadoop.compression.lzo.LzopCodec.class);
return null;
}
});
executorService.submit(new Callable<Void>() {
@Override
public Void call(){
df.rdd().saveAsTextFile(rb.getString("hdfspath") + "/file4",com.hadoop.compression.lzo.LzopCodec.class);
return null;
}
});

executorService.shutdown();

在当前方法内部使用context变量就可以了


当然为了方便大家学习,另外常见的写法如下:

import java.util.concurrent.{Executors, ExecutorService}

object Test {
def main(args: Array[String]) {
//创建线程池
val threadPool:ExecutorService=Executors.newFixedThreadPool(5)
try {
//提交5个线程
for(i <- 1 to 5){
//threadPool.submit(new ThreadDemo("thread"+i))
threadPool.execute(new ThreadDemo("thread"+i))
}
}finally {
threadPool.shutdown()
}
}

//定义线程类,每打印一次睡眠100毫秒
class ThreadDemo(threadName:String) extends Runnable{
override def run(){
for(i <- 1 to 10){
println(threadName+"|"+i)
Thread.sleep(100)
}
}
}
}
  • 发表于 2018-11-03 09:53
  • 阅读 ( 114 )
  • 分类:大数据

你可能感兴趣的文章

相关问题

0 条评论

请先 登录 后评论
不写代码的码农
张鹏

大数据工程师

95 篇文章

作家榜 »

  1. 张鹏 95 文章
  2. 0 文章
  3. 赵科 0 文章
  4. 王孖珺397954227 0 文章