使用spark结构化流媒体技术读取多个kafka主题并写入不同的接收器的最佳方式是什么?

7ivaypg9  于 2021-05-29  发布在  Spark
关注(0)|答案(2)|浏览(379)

我正在尝试编写一个spark结构的流式作业,它读取多个kafka主题(可能是100个),并根据主题名称将结果写入s3上的不同位置。我已经开发了这个代码片段,当前可以读取多个主题,并将结果输出到控制台(基于循环),它可以按预期工作。但是,我想了解性能影响是什么。这是推荐的方法吗?不建议有多个readstream和writestream操作吗?如果是,建议采用什么方法?

my_topics = ["topic_1", "topic_2"]

for i in my_topics:
    df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", bootstrap_servers) \
        .option("subscribePattern", i) \
        .load() \
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

    output_df = df \
        .writeStream \
        .format("console") \
        .option("truncate", False) \
        .outputMode("update") \
        .option("checkpointLocation", "s3://<MY_BUCKET>/{}".format(i)) \
        .start()
bvjveswy

bvjveswy1#

每个驱动程序节点运行多个并发流当然是合理的。
在spark中,每个.start()都会消耗一定数量的驱动程序资源。限制因素将是驱动程序节点上的负载及其可用资源。100个以高速率连续运行的主题需要分布在多个驱动程序节点上[在databricks中,每个集群有一个驱动程序]。spark的优点正如您所提到的,它有多个接收器,还有一个用于转换的统一批处理和流式api。
另一个问题是处理可能最终导致s3和文件一致性的小写操作。看看delta.io如何处理对s3的一致可靠的写入。

yqlxgs2m

yqlxgs2m2#

以下方法的优点。
泛型
多线程,所有线程将单独工作。
易于维护的代码和任何问题的支持。
如果一个主题失败,对生产中的其他主题没有影响。你只需要专注于失败的一个。
如果您想获取特定主题的所有数据,只需停止该主题的作业,更新或更改配置并重新启动同一作业即可。
注意-下面的代码不是完全通用的,您可能需要更改或调整下面的代码。

topic="" // Get value from input arguments
sink="" // Get value from input arguments

df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", bootstrap_servers) \
        .option("subscribePattern", topic) \
        .load() \
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

    output_df = df \
        .writeStream \
        .format("console") \
        .option("truncate", False) \
        .outputMode("update") \
        .option("checkpointLocation", sink) \
        .start()

以下方法的问题。
如果一个主题失败,它将终止整个程序。
有限的线程。
难以维护代码,调试和支持任何问题。
如果您想从kafka中提取特定主题的所有数据,这是不可能的,因为任何配置更改都将应用于所有主题,因此它的操作成本太高。

my_topics = ["topic_1", "topic_2"]

for i in my_topics:
    df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", bootstrap_servers) \
        .option("subscribePattern", i) \
        .load() \
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

    output_df = df \
        .writeStream \
        .format("console") \
        .option("truncate", False) \
        .outputMode("update") \
        .option("checkpointLocation", "s3://<MY_BUCKET>/{}".format(i)) \
        .start()

相关问题