时间:2021-05-20
方式1:
1. 明确 Spark中Job 与 Streaming中 Job 的区别
1.1 Spark Core
一个 RDD DAG Graph 可以生成一个或多个 Job(Action操作)
一个Job可以认为就是会最终输出一个结果RDD的一条由RDD组织而成的计算
Job在spark里应用里是一个被调度的单位
1.2 Streaming
一个 batch 的数据对应一个 DStreamGraph
而一个 DStreamGraph 包含一或多个关于 DStream 的输出操作
每一个输出对应于一个Job,一个 DStreamGraph 对应一个JobSet,里面包含一个或多个Job
2. Streaming Job的并行度
Job的并行度由两个配置决定:
spark.scheduler.mode(FIFO/FAIR)
spark.streaming.concurrentJobs
一个 Batch 可能会有多个 Action 执行,比如注册了多个 Kafka 数据流,每个Action都会产生一个Job
所以一个 Batch 有可能是一批 Job,也就是 JobSet 的概念
这些 Job 由 jobExecutor 依次提交执行
而 JobExecutor 是一个默认池子大小为1的线程池,所以只能执行完一个Job再执行另外一个Job
这里说的池子,大小就是由spark.streaming.concurrentJobs 控制的
concurrentJobs 决定了向 Spark Core 提交Job的并行度
提交一个Job,必须等这个执行完了,才会提交第二个
假设我们把它设置为2,则会并发的把 Job 提交给 Spark Core
Spark 有自己的机制决定如何运行这两个Job,这个机制其实就是FIFO或者FAIR(决定了资源的分配规则)
默认是 FIFO,也就是先进先出,把 concurrentJobs 设置为2,但是如果底层是FIFO,那么会优先执行先提交的Job
虽然如此,如果资源够两个job运行,还是会并行运行两个Job
Spark Streaming 不同Batch任务可以并行计算么https://developer.aliyun.com/article/73004
conf.setMaster("local[4]")
conf.set("spark.streaming.concurrentJobs", "3") //job 并行对
conf.set("spark.scheduler.mode", "FIFO")
val sc = new StreamingContext(conf, Seconds(5))
你会发现,不同batch的job其实也可以并行运行的,这里需要有几个条件:
有延时发生了,batch无法在本batch完成
concurrentJobs > 1
如果scheduler mode 是FIFO则需要某个Job无法一直消耗掉所有资源
Mode是FAIR则尽力保证你的Job是并行运行的,毫无疑问是可以并行的。
方式2:
场景1:
程序每次处理的数据量是波动的,比如周末比工作日多很多,晚八点比凌晨四点多很多。
一个spark程序处理的时间在1-2小时波动是OK的。而spark streaming程序不可以,如果每次处理的时间是1-10分钟,就很蛋疼。
设置10分钟吧,实际上10分钟的也就那一段高峰时间,如果设置每次是1分钟,很多时候会出现程序处理不过来,排队过多的任务延迟更久,还可能出现程序崩溃的可能。
场景2:
spark虽然是串行执行job,但是是可以把job放到线程池里多线程执行的。如何在一个SparkContext中提交多个任务
DStream.foreachRDD{ rdd => //创建线程池 val executors=Executors.newFixedThreadPool(rules.length) //将规则放入线程池 for( ru <- rules){ val task= executors.submit(new Callable[String] { override def call(): String ={ //执行规则 runRule(ru,spark) } }) } //每次创建的线程池执行完所有规则后shutdown executors.shutdown() }注意点
1.最后需要executors.shutdown()。
2.可不可以将创建线程池放到foreachRDD外面?
不可以,这个关系到对于scala闭包到理解,经测试,第一次或者前几次batch是正常的,后面的batch无线程可用。
3.线程池executor崩溃了就会导致数据丢失
原则上是这样的,但是正常的代码一般不会发生executor崩溃。至少我在使用的时候没遇到过。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。
声明:本页内容来源网络,仅供用户参考;我单位不保证亦不表示资料全面及准确无误,也不保证亦不表示这些资料为最新信息,如因任何原因,本网内容或者用户因倚赖本网内容造成任何损失或损害,我单位将不会负任何法律责任。如涉及版权问题,请提交至online#300.cn邮箱联系删除。
1.多线程的作用简而言之,多线程是并行处理相互独立的子任务,从而大幅度提高整个任务的效率。2.Python中的多线程相关模块和方法Python中提供几个用于多线
多任务(并行和并发)在讲协程之前,先谈谈多进程、多线程、并行和并发。对于单核处理器,多进程实现多任务的原理是让操作系统给一个任务每次分配一定的CPU时间片,然后
在上一篇博客,我们学习了Parallel的用法。并行编程,本质上是多线程的编程,那么当多个线程同时处理一个任务的时候,必然会出现资源访问问题,及所谓的线程安
并发编程与多线程编程要了解并发编程,首先要懂得与并行这个概念进行区分。并行是指两个事件同时进行,并发是CPU切换速度快,看起来像是每个任务同时进行一样。多线程是
1、问题提出1)为何需要多线程?2)多线程如何实现?3)多线程机制的核心是啥?4)到底有多少种实现方式?2、问题分析1)究其为啥需要多线程的本质就是异步处理,直