独立kafka spark sinks(多个生产商和经纪人)

2ic8powd  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(267)

所以我有一个问题,Kafka沉入Spark流,而发送JSON到多个主题和不可靠的Kafka经纪人。以下是部分代码:

val kS = KafkaUtils.createDirectStream[String, TMapRecord]
(ssc,
PreferConsistent,
Subscribe[String, TMapRecord](topicsSetT, kafkaParamsInT))

然后我迭代rdd

kSMapped.foreachRDD {
  rdd: RDD[TMsg] => {
    rdd.foreachPartition {
      part => {
        part.foreach { ...........

在我的内心深处

kafkaSink.value.send(kafkaTopic, strJSON)

kafkaSinkMirror.value.send(kafkaTopicMirrorBroker, strJSON)

当镜像代理关闭时,整个流应用程序都在等待它,我们不会向主代理发送任何内容。
你怎么处理?
对于您提出的最简单的解决方案,假设我跳过了要发送给宕机的代理的消息(例如,情况1)
对于第二种情况,我们会做一些缓冲。
p、 稍后我将使用Kafka镜像,但目前我没有这样的选择,所以我需要在我的代码中做出一些解决方案。

6mzjoqzu

6mzjoqzu1#

我发现了几个解决这个问题的方法:
您可以使用在worker和checkpoints上抛出任何超时异常。spark多次尝试重新启动错误任务,如中所述 spark.task.maxFailures 财产。可以增加重试次数。如果流作业在最大重试次数后失败,则只会在代理可用时从检查点重新启动作业。或者,当作业失败时,可以手动停止该作业。
你可以配置背压 spark.streaming.backpressure.enabled=true 它只允许以处理数据的速度接收数据。
您可以将两个结果发送回您的技术Kafka主题,稍后再使用另一个流式处理作业来处理它。
您可以为这种情况创建配置单元或hbase缓冲区,稍后以批处理模式发送未处理的数据。

相关问题