所以我有一个问题,Kafka沉入Spark流,而发送JSON到多个主题和不可靠的Kafka经纪人。以下是部分代码:
val kS = KafkaUtils.createDirectStream[String, TMapRecord]
(ssc,
PreferConsistent,
Subscribe[String, TMapRecord](topicsSetT, kafkaParamsInT))
然后我迭代rdd
kSMapped.foreachRDD {
rdd: RDD[TMsg] => {
rdd.foreachPartition {
part => {
part.foreach { ...........
在我的内心深处
kafkaSink.value.send(kafkaTopic, strJSON)
kafkaSinkMirror.value.send(kafkaTopicMirrorBroker, strJSON)
当镜像代理关闭时,整个流应用程序都在等待它,我们不会向主代理发送任何内容。
你怎么处理?
对于您提出的最简单的解决方案,假设我跳过了要发送给宕机的代理的消息(例如,情况1)
对于第二种情况,我们会做一些缓冲。
p、 稍后我将使用Kafka镜像,但目前我没有这样的选择,所以我需要在我的代码中做出一些解决方案。
1条答案
按热度按时间6mzjoqzu1#
我发现了几个解决这个问题的方法:
您可以使用在worker和checkpoints上抛出任何超时异常。spark多次尝试重新启动错误任务,如中所述
spark.task.maxFailures
财产。可以增加重试次数。如果流作业在最大重试次数后失败,则只会在代理可用时从检查点重新启动作业。或者,当作业失败时,可以手动停止该作业。你可以配置背压
spark.streaming.backpressure.enabled=true
它只允许以处理数据的速度接收数据。您可以将两个结果发送回您的技术Kafka主题,稍后再使用另一个流式处理作业来处理它。
您可以为这种情况创建配置单元或hbase缓冲区,稍后以批处理模式发送未处理的数据。