当我尝试通过偏移量的时间戳从Kafka主题中检索数据时,作业失败并出现错误:
原因:java.lang.AssertionError:Assert失败:没有与topic-partition topicA-0和timestamp 1686877634000的请求匹配的偏移量。
问题是主题中实际上有数据,但仅在分区-1中,但作业仍然失败,因为分区-0中没有数据。
我使用的代码:
df_stream = (
spark.read.format("kafka")
.option("kafka.bootstrap.servers", kafka_servers)
.option("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
.option("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
.option("subscribe", kafka_topic)
.option("kafka.security.protocol", "SASL_PLAINTEXT")
.option("kafka.group.id", kafka_group_id)
.option("startingOffsetsByTimestamp", """{"topicA":{"0": 1686877634000, "1": 1686877634000}}""")
.option("endingOffsetsByTimestamp", """{"topicA":{"0": 1686881234000, "1": 1686881234000}""")
.option("failOnDataLoss", false)
.options(**options)
.load()
)
有没有办法避免这种错误?我需要检索特定时间戳的数据。我尝试读取主题中的所有数据并通过Kafka时间戳进行过滤,它可以工作,但我认为这不是一个好的解决方案。
1条答案
按热度按时间vlju58qv1#
您应该将分区0的起始偏移量设置为另一个值。如果你查看你的
checkpointing
目录(在sparksession创建过程中设置),并检查offsets
目录,你应该能够设置你的分区的值来启动流。