V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
getui
V2EX  ›  互联网

Spark Streaming 的优化之从 Receiver 模式到 Direct 模式

  •  
  •   getui · 2019-06-17 11:10:40 +08:00 · 2392 次点击
    这是一个创建于 2034 天前的主题,其中的信息可能已经有所发展或是发生改变。

    作者:个推数据研发工程师 学长

    1 业务背景

    随着大数据的快速发展,业务场景越来越复杂,离线式的批处理框架 MapReduce 已经不能满足业务,大量的场景需要实时的数据处理结果来进行分析、决策。Spark Streaming 是一种分布式的大数据实时计算框架,他提供了动态的,高吞吐量的,可容错的流式数据处理,不仅可以实现用户行为分析,还能在金融、舆情分析、网络监控等方面发挥作用。个推开发者服务——消息推送“应景推送”正是应用了 Spark Streaming 技术,基于大数据分析人群属性,同时利用 LBS 地理围栏技术,实时触发精准消息推送,实现用户的精细化运营。此外,个推在应用 Spark Streaming 做实时处理 kafka 数据时,采用 Direct 模式代替 Receiver 模式的手段,实现了资源优化和程序稳定性提升。

    本文将从 Spark Streaming 获取 kafka 数据的两种模式入手,结合个推实践,带你解读 Receiver 和 Direct 模式的原理和特点,以及从 Receiver 模式到 Direct 模式的优化对比。

    2 两种模式的原理和区别

    Receiver 模式

    1. Receiver 模式下的运行架构

    1)InputDStream: 从流数据源接收的输入数据。

    2)Receiver:负责接收数据流,并将数据写到本地。

    3)Streaming Context:代表 SparkStreaming,负责 Streaming 层面的任务调度,生成 jobs 发送到 Spark engine 处理。

    4)Spark Context: 代表 Spark Core,负责批处理层面的任务调度,真正执行 job 的 Spark engine。

    2. Receiver 从 kafka 拉取数据的过程

    该模式下:

    1)在 executor 上会有 receiver 从 kafka 接收数据并存储在 Spark executor 中,在到了 batch 时间后触发 job 去处理接收到的数据,1 个 receiver 占用 1 个 core ;

    2)为了不丢数据需要开启 WAL 机制,这会将 receiver 接收到的数据写一份备份到第三方系统上(如:HDFS );

    3)receiver 内部使用 kafka High Level API 去消费数据及自动更新 offset。

    Direct 模式

    1. Direct 模式下的运行架构

    与 receiver 模式类似,不同在于 executor 中没有 receiver 组件,从 kafka 拉去数据的方式不同。

    2. Direct 从 kafka 拉取数据的过程

    该模式下:

    1)没有 receiver,无需额外的 core 用于不停地接收数据,而是定期查询 kafka 中的每个 partition 的最新的 offset,每个批次拉取上次处理的 offset 和当前查询的 offset 的范围的数据进行处理;

    2)为了不丢数据,无需将数据备份落地,而只需要手动保存 offset 即可;

    3)内部使用 kafka simple Level API 去消费数据, 需要手动维护 offset,kafka zk 上不会自动更新 offset。

    Receiver 与 Direct 模式的区别

    1.前者在 executor 中有 Receiver 接受数据,并且 1 个 Receiver 占用一个 core ;而后者无 Receiver,所以不会暂用 core ;

    2.前者 InputDStream 的分区是 num_receiver *batchInterval/blockInteral,后者的分区数是 kafka topic partition 的数量。Receiver 模式下 num_receiver 的设置不合理会影响性能或造成资源浪费;如果设置太小,并行度不够,整个链路上接收数据将是瓶颈;如果设置太多,则会浪费资源;

    3.前者使用 zookeeper 来维护 consumer 的偏移量,而后者需要自己维护偏移量;

    4.为了保证不丢失数据,前者需要开启 WAL 机制,而后者不需要,只需要在程序中成功消费完数据后再更新偏移量即可。

    3 Receiver 改造成 Direct 模式

    个推使用 Spark Streaming 做实时处理 kafka 数据,先前使用的是 receiver 模式;

    receiver 有以下特点

    1.receiver 模式下,每个 receiver 需要单独占用一个 core ;

    2.为了保证不丢失数据,需要开启 WAL 机制,使用 checkpoint 保存状态;

    3.当 receiver 接受数据速率大于处理数据速率,导致数据积压,最终可能会导致程序挂掉。

    由于以上特点,receiver 模式下会造成一定的资源浪费;使用 checkpoint 保存状态, 如果需要升级程序,则会导致 checkpoint 无法使用;第 3 点 receiver 模式下会导致程序不太稳定;并且如果设置 receiver 数量不合理也会造成性能瓶颈在 receiver。为了优化资源和程序稳定性,应将 receiver 模式改造成 direct 模式。

    修改方式如下:

    1. 修改 InputDStream 的创建

    将 receiver 的:

    val kafkaStream = KafkaUtils.createStream(streamingContext,
         [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
    

    改成 direct 的:

    val directKafkaStream = KafkaUtils.createDirectStream[
         [key class], [value class], [key decoder class], [value decoder class] ](
         streamingContext, [map of Kafka parameters], [set of topics to consume])
    

    2. 手动维护 offset

    receiver 模式代码: ( receiver 模式不需要手动维护 offset,而是内部通过 kafka consumer high level API 提交到 kafka/zk 保存)

    kafkaStream.map {
               ...
     }.foreachRDD { rdd =>
        // 数据处理
        doCompute(rdd)
     }
    

    direct 模式代码:

    directKafkaStream.map {
               ...
     }.foreachRDD { rdd =>
        // 获取当前 rdd 数据对应的 offset
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        // 数据处理
        doCompute(rdd)
        // 自己实现保存 offset
        commitOffsets(offsetRanges)
     }
    

    4 其他优化点

    1. 在 receiver 模式下

    1)拆分 InputDStream,增加 Receiver,从而增加接收数据的并行度;

    2)调整 blockInterval,适当减小,增加 task 数量,从而增加并行度(在 core 的数量>task 数量的情况下);

    3)如果开启了 WAL 机制,数据的存储级别设置为 MOMERY_AND_DISK_SER。

    2.数据序列化使用 Kryoserializationl,相比 Java serializationl 更快,序列化后的数据更小;

    3.建议使用 CMS 垃圾回收器降低 GC 开销;

    4.选择高性能的算子(mapPartitions, foreachPartitions, aggregateByKey 等);

    5.**repartition 的使用:**在 streaming 程序中因为 batch 时间特别短,所以数据量一般较小,所以 repartition 的时间短,可以解决一些因为 topicpartition 中数据分配不均匀导致的数据倾斜问题;

    6.因为 SparkStreaming 生产的 job 最终都是在 sparkcore 上运行的,所以sparkCore 的优化也很重要;

    7.BackPressure 流控

    1)为什么引入 Backpressure ? 当 batch processing time>batchinterval 这种情况持续过长的时间,会造成数据在内存中堆积,导致 Receiver 所在 Executor 内存溢出等问题;

    2)Backpressure:根据 JobScheduler 反馈作业的执行信息来动态调整数据接收率;

    3)配置使用:

    spark.streaming.backpressure.enabled
    含义: 是否启用 SparkStreaming 内部的 backpressure 机制,
    默认值:false ,表示禁用
    
    spark.streaming.backpressure.initialRate
    含义:receiver 为第一个 batch 接收数据时的比率
    
    spark.streaming.receiver.maxRate
    含义:receiver 接收数据的最大比率,如果设置值<=0, 则 receiver 接收数据比率不受限制
    
    spark.streaming.kafka.maxRatePerPartition
    含义: 从每个 kafka partition 中读取数据的最大比率
    

    8.speculation 机制

    spark 内置 speculation 机制,推测 job 中的运行特别慢的 task,将这些 task kill,并重新调度这些 task 执行。 默认 speculation 机制是关闭的,通过以下配置参数开启:

    spark.speculation=true
    

    注意:在有些情况下,开启 speculation 反而效果不好,比如:streaming 程序消费多个 topic 时,从 kafka 读取数据直接处理,没有重新分区,这时如果多个 topic 的 partition 的数据量相差较大那么可能会导致正常执行更大数据量的 task 会被认为执行缓慢,而被中途 kill 掉,这种情况下可能导致 batch 的处理时间反而变长;可以通过 repartition 来解决这个问题,但是要衡量 repartition 的时间;而在 streaming 程序中因为 batch 时间特别短,所以数据量一般较小,所以 repartition 的时间短,不像 spark_batch 一次处理大量数据一旦 repartition 则会特别久,所以最终还是要根据具体情况测试来决定。

    5 总结

    将 Receiver 模式改成 Direct 模式,实现了资源优化,提升了程序的稳定性,缺点是需要自己管理 offset,操作相对复杂。未来,个推将不断探索和优化 Spark Streaming 技术,发挥其强大的数据处理能力,为建设实时数仓提供保障。

    目前尚无回复
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   954 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 27ms · UTC 21:28 · PVG 05:28 · LAX 13:28 · JFK 16:28
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.