spark结构化流媒体:如何在处理kafka主题及其在s3上的备份时达到相同的顺序

mgdq6dx1  于 2021-07-12  发布在  Spark
关注(0)|答案(0)|浏览(211)

问题

如何在处理kafka主题和使用spark结构化流的it s3备份之间实现幂等(相同的事件顺序)?
看到了吗 # QUESTION: 在下面的代码示例中添加注解。

用例

假设您拥有与uber流处理完全相同的用例,用于这两个方面:低延迟和历史分析。有关实际示例,请参阅设计生产就绪kappa体系结构以便及时处理数据流的“动机”部分:
您将数据存储在kafka中,并使用kafka connect aws s3 sink将它们归档到s3。流处理采用spark结构化流。您决定使用“组合方法”部分来解决这个问题:在s3上以后台回填模式运行代码(Kafka备份主题)。

代码示例

def some_processing(spark: SparkSession, stream: DataFrame, backfilling_mode: bool):
  input_stream = read(spark, 'input_topic', backfilling_mode)
  processed_stream = input_stream \
    .withWatermark('event_time', '10 seconds') \
    .groupBy(window('event_time', '10 seconds', '10 seconds'), 'user_id') \
    .agg(sum(col('price'))) \
    .join(...)
  write(processed_stream, 'output_topic', backfilling_mode)

def read(spark: SparkSession, topic_name: str, backfilling_mode: bool) -> DataFrame:
  """Reads data from Kafka topic or its S3 backup"""
  # QUESTION: how Spark achieve the same order on backfilling_mode=True and backfilling_mode=False
  if backfilling_mode:
    stream = spark \
      .readStream \
      .schema(input_schema) \ 
      .format('com.databricks.spark.avro') \
      .option('maxFilesPerTrigger', 20) \
      .load('/s3_base_bath/{}'.format(topic_name)) 
  else:
    stream = spark \
      .readStream \
      .format('kafka') \
      .option('kafka.bootstrap.servers', 'host1:port1,host2:port2') \
      .option('subscribe', topic_name) \
      .load() 

def write(stream: DataFrame, topic_name: str, backfilling_mode: bool):
  """Writes data to Kafka topic or its S3 backup"""
  if backfilling_mode:
    stream \
      .writeStream \
      .trigger(processingTime='60 seconds') \
      .format('com.databricks.spark.avro') \
      .option('path', '/s3_base_bath/{}'.format(topic_name)) \
      .start() \
      .awaitTermination()
  else:
    stream \
      .writeStream \
      .trigger(processingTime='10 seconds') \
      .option('kafka.bootstrap.servers', 'host1:port1,host2:port2') \
      .option('topic', 'output_topic')
      .format('kafka') \
      .start() \
      .awaitTermination()

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题