spark结构化流在从kafka主题读取流时是否存在超时问题?

3qpi33ja  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(298)

我实现了一个spark任务,用foreachbatch在结构化流媒体中读取kafka主题中的流。

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "mykafka.broker.io:6667")
  .option("subscribe", "test-topic")
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.ssl.truststore.location", "/home/hadoop/cacerts")
  .option("kafka.ssl.truststore.password", tspass)
  .option("kafka.ssl.truststore.type", "JKS")
  .option("kafka.sasl.kerberos.service.name", "kafka")
  .option("kafka.sasl.mechanism", "GSSAPI")
  .option("groupIdPrefix","MY_GROUP_ID")
  .load()

val streamservice = df.selectExpr("CAST(value AS STRING)")
  .select(from_json(col("value"), schema).as("data"))
  .select("data.*")

var stream_df = streamservice
  .selectExpr("cast(id as string) id", "cast(x as int) x")

val monitoring_stream = stream_df.writeStream
  .trigger(Trigger.ProcessingTime("120 seconds"))
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    if(!batchDF.isEmpty) { }
  }
  .start()
  .awaitTermination()

我有以下问题。
如果Kafka主题长时间没有数据,stream_df.writestream会自动终止吗?有超时控制吗?
如果从kafka broker中删除kafka主题,stream_df.writestream是否会终止?
希望星火工作在上述两种情况下,不间断地对Kafka主题进行监控。我需要一些Kafka连接器和/或流的特殊设置吗?df.writerstream?

ac1kyiln

ac1kyiln1#

如果Kafka主题长时间没有数据,stream_df.writestream会自动终止吗?有超时控制吗?
查询的终止与正在处理的数据无关。即使没有新消息生成到您的kafka主题,查询也会继续运行,因为它是作为流运行的。
我猜这就是你在测试时已经发现的。我们使用结构化流式查询来处理来自kafka的数据,并且它们在较长时间内(例如在周末营业时间以外)没有空闲的问题。
如果从kafka broker中删除kafka主题,stream_df.writestream是否会终止?
默认情况下,如果在查询运行时删除kafka主题,将引发异常:

ERROR MicroBatchExecution: Query [id = b1f84242-d72b-4097-97c9-ee603badc484, runId = 752b0fe4-2762-4fff-8912-f4cffdbd7bdc] terminated with error
java.lang.IllegalStateException: Partition test-0's offset was changed from 1 to 0, some data may have been missed. 
Some data may have been lost because they are not available in Kafka any more; either the
 data was aged out by Kafka or the topic may have been deleted before all the data in the
 topic was processed. If you don't want your streaming query to fail on such cases, set the
 source option "failOnDataLoss" to "false".

我提到“默认”是因为查询选项 failOnDataLoss 默认为 true . 如异常消息中所述,您可以将其设置为false以运行流式查询。此选项在结构化流媒体+Kafka集成指南中描述为:
“当数据可能丢失(例如,主题被删除或偏移量超出范围)时,是否使查询失败。”。这可能是虚惊一场。当它不能按预期工作时,您可以禁用它。”

相关问题