我实现了一个spark任务,用foreachbatch在结构化流媒体中读取kafka主题中的流。
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "mykafka.broker.io:6667")
.option("subscribe", "test-topic")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.ssl.truststore.location", "/home/hadoop/cacerts")
.option("kafka.ssl.truststore.password", tspass)
.option("kafka.ssl.truststore.type", "JKS")
.option("kafka.sasl.kerberos.service.name", "kafka")
.option("kafka.sasl.mechanism", "GSSAPI")
.option("groupIdPrefix","MY_GROUP_ID")
.load()
val streamservice = df.selectExpr("CAST(value AS STRING)")
.select(from_json(col("value"), schema).as("data"))
.select("data.*")
var stream_df = streamservice
.selectExpr("cast(id as string) id", "cast(x as int) x")
val monitoring_stream = stream_df.writeStream
.trigger(Trigger.ProcessingTime("120 seconds"))
.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
if(!batchDF.isEmpty) { }
}
.start()
.awaitTermination()
我有以下问题。
如果Kafka主题长时间没有数据,stream_df.writestream会自动终止吗?有超时控制吗?
如果从kafka broker中删除kafka主题,stream_df.writestream是否会终止?
希望星火工作在上述两种情况下,不间断地对Kafka主题进行监控。我需要一些Kafka连接器和/或流的特殊设置吗?df.writerstream?
1条答案
按热度按时间ac1kyiln1#
如果Kafka主题长时间没有数据,stream_df.writestream会自动终止吗?有超时控制吗?
查询的终止与正在处理的数据无关。即使没有新消息生成到您的kafka主题,查询也会继续运行,因为它是作为流运行的。
我猜这就是你在测试时已经发现的。我们使用结构化流式查询来处理来自kafka的数据,并且它们在较长时间内(例如在周末营业时间以外)没有空闲的问题。
如果从kafka broker中删除kafka主题,stream_df.writestream是否会终止?
默认情况下,如果在查询运行时删除kafka主题,将引发异常:
我提到“默认”是因为查询选项
failOnDataLoss
默认为true
. 如异常消息中所述,您可以将其设置为false以运行流式查询。此选项在结构化流媒体+Kafka集成指南中描述为:“当数据可能丢失(例如,主题被删除或偏移量超出范围)时,是否使查询失败。”。这可能是虚惊一场。当它不能按预期工作时,您可以禁用它。”