我们使用pyspark应用程序来处理kafka源主题中的一些数据,并将处理后的数据写入单独的主题中。我们有另一个python应用程序,它使用kafkapython来使用处理过的主题。第一次运行时一切正常。
稍后,我们决定向处理过的主题添加另一列。源主题已经包含了这个信息,所以我们停止旧的kafka/spark流,开始一个新的,除了包含新的列之外,它做的完全相同。新的流开始于 startingOffsets
设置为 earliest
新列被打印到控制台,因此我假设该列现在包含在处理的主题中。
python应用程序与使用者一起停止旧的使用者,并启动一个新的使用者。这个是从 startingOffsets
设置为 latest
. 问题是,消费者似乎并不使用新处理的数据。不知何故,重新处理的数据不会触发python应用程序的消费。我是不是漏了什么?
顺便说一句:使用时 startingOffsets
设置为 earliest
在使用consumer的python应用程序上,它开始使用所有旧数据,而不是使用新列使用新处理的数据。
例子:
第一次运行
源主题包含以下内容:
|column1|column2|column3|column4|column5|column6|
这由pyspark应用程序处理到已处理的主题:
|column1|column2|column4|column6|
python和消费者应用程序使用它。即使一小时后新的数据进来,它也会被处理。
更新运行后
源主题包含以下内容:
|column1|column2|column3|column4|column5|column6|
这由pyspark应用程序处理到已处理的主题:
|column1|column2|column4|column5|column6|
python应用程序不会使用它。
更新
pyspark应用程序代码:
self.data_frame = self.spark_session.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", KAFKA_HOST) \
.option("subscribe", self.compute_config.topic_sources[0]) \
.option("startingOffsets", "earliest" if reset_offset is True else "latest") \
.option("failOnDataLoss", False) \
.load()
...
self.ds = self.data_frame.select("key", from_json(col("value").cast("string"), self.schema).alias("value")) \
.withColumn("value", col("value").cast(self.rename_schema)) \
.withColumn("value", to_json("value")) \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", KAFKA_HOST) \
.option("topic", self.compute_config.topic_target) \
.option("checkpointLocation", f"/app/checkpoints/{self.compute_config.topic_target+str(self.compute_config.id)}") \
.start()
来自python应用程序和使用者的代码:
self.consumer = KafkaConsumer(self.sink_config.topic_target,
bootstrap_servers=[KAFKA_HOST],
auto_offset_reset="latest",
enable_auto_commit=False)
for message in self.consumer:
...
暂无答案!
目前还没有任何答案,快来回答吧!