问题
如何在处理kafka主题和使用spark结构化流的it s3备份之间实现幂等(相同的事件顺序)?
看到了吗 # QUESTION:
在下面的代码示例中添加注解。
用例
假设您拥有与uber流处理完全相同的用例,用于这两个方面:低延迟和历史分析。有关实际示例,请参阅设计生产就绪kappa体系结构以便及时处理数据流的“动机”部分:
您将数据存储在kafka中,并使用kafka connect aws s3 sink将它们归档到s3。流处理采用spark结构化流。您决定使用“组合方法”部分来解决这个问题:在s3上以后台回填模式运行代码(Kafka备份主题)。
代码示例
def some_processing(spark: SparkSession, stream: DataFrame, backfilling_mode: bool):
input_stream = read(spark, 'input_topic', backfilling_mode)
processed_stream = input_stream \
.withWatermark('event_time', '10 seconds') \
.groupBy(window('event_time', '10 seconds', '10 seconds'), 'user_id') \
.agg(sum(col('price'))) \
.join(...)
write(processed_stream, 'output_topic', backfilling_mode)
def read(spark: SparkSession, topic_name: str, backfilling_mode: bool) -> DataFrame:
"""Reads data from Kafka topic or its S3 backup"""
# QUESTION: how Spark achieve the same order on backfilling_mode=True and backfilling_mode=False
if backfilling_mode:
stream = spark \
.readStream \
.schema(input_schema) \
.format('com.databricks.spark.avro') \
.option('maxFilesPerTrigger', 20) \
.load('/s3_base_bath/{}'.format(topic_name))
else:
stream = spark \
.readStream \
.format('kafka') \
.option('kafka.bootstrap.servers', 'host1:port1,host2:port2') \
.option('subscribe', topic_name) \
.load()
def write(stream: DataFrame, topic_name: str, backfilling_mode: bool):
"""Writes data to Kafka topic or its S3 backup"""
if backfilling_mode:
stream \
.writeStream \
.trigger(processingTime='60 seconds') \
.format('com.databricks.spark.avro') \
.option('path', '/s3_base_bath/{}'.format(topic_name)) \
.start() \
.awaitTermination()
else:
stream \
.writeStream \
.trigger(processingTime='10 seconds') \
.option('kafka.bootstrap.servers', 'host1:port1,host2:port2') \
.option('topic', 'output_topic')
.format('kafka') \
.start() \
.awaitTermination()
暂无答案!
目前还没有任何答案,快来回答吧!