我有一个spark readStream函数,它可以连续地从Kafka读取数据。我对数据执行了一些操作,并希望使用Spark writeStream将其批量写入Cassandra DB。在写入Cassandra时,它可能会抛出任何类型的异常(ConnectionTimeOut等)。我可以做些什么来确保数据不会丢失,以及我可以做些什么来对特定的一批数据执行重试。
这是我的writeStream函数,它在内部调用保存方法,我们在该方法中执行对表的写入。
query = df.writeStream \
.outputMode("append") \
.option("checkpointLocation", "path") \
.option("failOnDataLoss", "false") \
.option("maxAttempts", "5") \
.option("retryOnDataLoss", "true") \
.option("failedWriteFile", "path") \
.foreachBatch(save) \
.start()
这就是保存方法。
`def save(df, batch_id):
try:
(df.write
.format("org.apache.spark.sql.cassandra")
.options(table=tableName, keyspace=keyspaceName)
.mode("append")
.save())
return None
except Exception as e:
raise e`
就我所知,当保存方法抛出异常时,spark函数会再次重试该批处理,直到重试次数耗尽。即使仍然失败,它也会写入指定的路径并继续下一批处理。
这些选项maxAttempts
,retryOnDataLoss
,failedWriteFile
仍然有效吗?我没有在官方sparkDocs或spark-cassandra-connector库中找到任何参考。或者有任何其他替代方案。
https://github.com/datastax/spark-cassandra-connector
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#foreachbatch
2条答案
按热度按时间8hhllhi21#
配置:
可以毫无问题地删除,当异常发生时,spark已经处理失败任务的重试。
y53ybaqx2#
默认情况下,Spark Cassandra连接器将重试
save()
60次:除非在应用程序的配置中重写该属性。
如果写 cassandra 失败后,60次尝试,我会建议数据丢失是去你的问题最小,因为你的 cassandra 是最有可能下来。干杯!