我正在Databricks集群上使用Spark Structured Streaming从Azure Event Hub提取数据,处理数据,并使用ForEachBatch将数据写入雪花,其中Epoch_Id/ Batch_Id传递给foreach批处理函数。
我的代码如下所示:
ehConf = {}
ehConf['eventhubs.connectionString'] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(EVENT_HUB_CONNECTION_STRING)
ehConf['eventhubs.consumerGroup'] = consumergroup
# Read stream data from event hub
spark_df = spark \
.readStream \
.format("eventhubs") \
.options(**ehConf) \
.load()
一些变形......
写入雪花
def foreach_batch_function(df, epoch_id):
df.write\
.format(SNOWFLAKE_SOURCE_NAME)\
.options(**sfOptions)\
.option("dbtable", snowflake_table)\
.mode('append')\
.save()
processed_df.writeStream.outputMode('append').\
trigger(processingTime='10 seconds').\
option("checkpointLocation",f"checkpoint/P1").\
foreachBatch(foreach_batch_function).start()
目前我面临两个问题:
1.当节点故障发生。虽然在Spark官方网站上,它提到,当一个使用ForeachBatch随着epoch_id/batch_id在恢复表单节点故障不应该有任何重复,但我发现重复得到填充在我的雪花表。链接供参考:[具有纪元Id的每个批次的Spark结构化流][1]。
1.我遇到错误a.)**传输客户端:无法将RPC 5782383376229127321发送到/30.62.166.7:**和B.)任务调度器实现:在www. example. com上丢失执行程序156030.62.166.7:工作程序已解除:Worker Decommissioned在我的数据块集群上非常频繁。无论我分配多少执行器或增加多少执行器内存,集群都会达到最大Worker限制,并且我收到两个错误中的一个,在恢复雪花表后,将重复项填充到雪花表中。
对上述任何一点的任何解决方案/建议都将是有帮助的。
先谢谢你。
1条答案
按热度按时间pftdvrlh1#
根据定义,
foreachBatch
不是幂等的,因为当当前执行的批处理失败时,它会重试,并且可能会观察到部分结果,这与您的观察结果相匹配。foreachBatch
中的幂等写入仅适用于Delta Lake表,而不是所有接收器类型。(在某些情况下,比如, cassandra ,它也可以工作)。我不是很熟悉雪花,但也许您可以实现类似于其他数据库功能--将数据写入临时表,(每个批处理将执行一次覆盖),然后从临时表合并到目标表中。关于第二个问题-看起来您正在使用自动缩放群集-在这种情况下,工作进程可能会被停用,因为群集管理器检测到群集未完全加载要避免这种情况,您可以禁用自动缩放,并使用固定大小群集