我正在尝试运行多个spark结构化流作业(在emr上),这些作业从kafka主题读取数据,然后写入s3中的不同路径(每个路径在各自的作业中执行)。我已将群集配置为使用capacityscheduler。下面是我尝试运行的代码片段:
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", <BOOTSTRAP_SERVERS>) \
.option("subscribePattern", "<MY_TOPIC>") \
.load() \
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
output = df \
.writeStream \
.format("json") \
.outputMode("update") \
.option("checkpointLocation", "s3://<CHECKPOINT_LOCATION>") \
.option("path", "s3://<SINK>") \
.start() \
.awaitTermination()
我试着并行运行两个作业:
spark-submit --queue <QUEUE_1> --deploy-mode cluster --master yarn <STREAM_1_SCRIPT>.py
spark-submit --queue <QUEUE_2> --deploy-mode cluster --master yarn <STREAM_2_SCRIPT>.py
在执行过程中,我注意到第二个作业没有写入s3(即使第一个作业是)。我还注意到,在第二个作业中,通过spark ui的利用率出现了一个巨大的峰值。
在停止第一个作业后,s3中显示了第二个作业的数据。难道不可能并行运行两个单独的spark结构化流式作业来写入接收器(特别是在s3上)?写操作会导致某种阻塞吗?
1条答案
按热度按时间drnojrws1#
是的,你可以!也就是说,它不是一个有多个源文档的东西,但是,你唯一需要它在你的多个作业线程之间共享spark上下文的东西。在本文之后,我创建了一个多spark结构的流媒体管道https://cm.engineering/multiple-spark-streaming-jobs-in-a-single-emr-cluster-ca86c28d1411 任何问题,你可以给我发电子邮件或交谈收件箱给我。
谢谢您!