kafka消费者不使用spark消费重新处理的数据

eqqqjvef  于 2021-05-17  发布在  Spark
关注(0)|答案(0)|浏览(280)

我们使用pyspark应用程序来处理kafka源主题中的一些数据,并将处理后的数据写入单独的主题中。我们有另一个python应用程序,它使用kafkapython来使用处理过的主题。第一次运行时一切正常。
稍后,我们决定向处理过的主题添加另一列。源主题已经包含了这个信息,所以我们停止旧的kafka/spark流,开始一个新的,除了包含新的列之外,它做的完全相同。新的流开始于 startingOffsets 设置为 earliest 新列被打印到控制台,因此我假设该列现在包含在处理的主题中。
python应用程序与使用者一起停止旧的使用者,并启动一个新的使用者。这个是从 startingOffsets 设置为 latest . 问题是,消费者似乎并不使用新处理的数据。不知何故,重新处理的数据不会触发python应用程序的消费。我是不是漏了什么?
顺便说一句:使用时 startingOffsets 设置为 earliest 在使用consumer的python应用程序上,它开始使用所有旧数据,而不是使用新列使用新处理的数据。
例子:
第一次运行
源主题包含以下内容:

|column1|column2|column3|column4|column5|column6|

这由pyspark应用程序处理到已处理的主题:

|column1|column2|column4|column6|

python和消费者应用程序使用它。即使一小时后新的数据进来,它也会被处理。
更新运行后
源主题包含以下内容:

|column1|column2|column3|column4|column5|column6|

这由pyspark应用程序处理到已处理的主题:

|column1|column2|column4|column5|column6|

python应用程序不会使用它。

更新

pyspark应用程序代码:

self.data_frame = self.spark_session.readStream \
            .format("kafka") \
            .option("kafka.bootstrap.servers", KAFKA_HOST) \
            .option("subscribe", self.compute_config.topic_sources[0]) \
            .option("startingOffsets", "earliest" if reset_offset is True else "latest") \
            .option("failOnDataLoss", False) \
            .load()

...

self.ds = self.data_frame.select("key", from_json(col("value").cast("string"), self.schema).alias("value")) \
            .withColumn("value", col("value").cast(self.rename_schema)) \
            .withColumn("value", to_json("value")) \
            .writeStream \
            .format("kafka") \
            .option("kafka.bootstrap.servers", KAFKA_HOST) \
            .option("topic", self.compute_config.topic_target) \
            .option("checkpointLocation", f"/app/checkpoints/{self.compute_config.topic_target+str(self.compute_config.id)}") \
            .start()

来自python应用程序和使用者的代码:

self.consumer = KafkaConsumer(self.sink_config.topic_target,
                                      bootstrap_servers=[KAFKA_HOST],
                                      auto_offset_reset="latest",
                                      enable_auto_commit=False)

for message in self.consumer:
    ...

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题