当一个Kafka主题分区中没有数据时,结构化流作业失败

332nm8kg  于 2023-06-21  发布在  Apache
关注(0)|答案(1)|浏览(131)

当我尝试通过偏移量的时间戳从Kafka主题中检索数据时,作业失败并出现错误:
原因:java.lang.AssertionError:Assert失败:没有与topic-partition topicA-0和timestamp 1686877634000的请求匹配的偏移量。
问题是主题中实际上有数据,但仅在分区-1中,但作业仍然失败,因为分区-0中没有数据。
我使用的代码:

df_stream = (
            spark.read.format("kafka")
            .option("kafka.bootstrap.servers", kafka_servers)
            .option("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
            .option("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
            .option("subscribe", kafka_topic)
            .option("kafka.security.protocol", "SASL_PLAINTEXT")
            .option("kafka.group.id", kafka_group_id)
            .option("startingOffsetsByTimestamp", """{"topicA":{"0": 1686877634000, "1": 1686877634000}}""")
            .option("endingOffsetsByTimestamp", """{"topicA":{"0": 1686881234000, "1": 1686881234000}""")
            .option("failOnDataLoss", false) 
            .options(**options)
            .load()
        )

有没有办法避免这种错误?我需要检索特定时间戳的数据。我尝试读取主题中的所有数据并通过Kafka时间戳进行过滤,它可以工作,但我认为这不是一个好的解决方案。

vlju58qv

vlju58qv1#

您应该将分区0的起始偏移量设置为另一个值。如果你查看你的checkpointing目录(在sparksession创建过程中设置),并检查offsets目录,你应该能够设置你的分区的值来启动流。

相关问题