我正在尝试编写一个spark结构的流式作业,它读取多个kafka主题(可能是100个),并根据主题名称将结果写入s3上的不同位置。我已经开发了这个代码片段,当前可以读取多个主题,并将结果输出到控制台(基于循环),它可以按预期工作。但是,我想了解性能影响是什么。这是推荐的方法吗?不建议有多个readstream和writestream操作吗?如果是,建议采用什么方法?
my_topics = ["topic_1", "topic_2"]
for i in my_topics:
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", bootstrap_servers) \
.option("subscribePattern", i) \
.load() \
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
output_df = df \
.writeStream \
.format("console") \
.option("truncate", False) \
.outputMode("update") \
.option("checkpointLocation", "s3://<MY_BUCKET>/{}".format(i)) \
.start()
2条答案
按热度按时间bvjveswy1#
每个驱动程序节点运行多个并发流当然是合理的。
在spark中,每个.start()都会消耗一定数量的驱动程序资源。限制因素将是驱动程序节点上的负载及其可用资源。100个以高速率连续运行的主题需要分布在多个驱动程序节点上[在databricks中,每个集群有一个驱动程序]。spark的优点正如您所提到的,它有多个接收器,还有一个用于转换的统一批处理和流式api。
另一个问题是处理可能最终导致s3和文件一致性的小写操作。看看delta.io如何处理对s3的一致可靠的写入。
yqlxgs2m2#
以下方法的优点。
泛型
多线程,所有线程将单独工作。
易于维护的代码和任何问题的支持。
如果一个主题失败,对生产中的其他主题没有影响。你只需要专注于失败的一个。
如果您想获取特定主题的所有数据,只需停止该主题的作业,更新或更改配置并重新启动同一作业即可。
注意-下面的代码不是完全通用的,您可能需要更改或调整下面的代码。
以下方法的问题。
如果一个主题失败,它将终止整个程序。
有限的线程。
难以维护代码,调试和支持任何问题。
如果您想从kafka中提取特定主题的所有数据,这是不可能的,因为任何配置更改都将应用于所有主题,因此它的操作成本太高。