如何确保在保存()失败时SparkStream应用中的数据不会丢失?

f4t66c6m  于 2023-03-30  发布在  Apache
关注(0)|答案(2)|浏览(131)

我有一个spark readStream函数,它可以连续地从Kafka读取数据。我对数据执行了一些操作,并希望使用Spark writeStream将其批量写入Cassandra DB。在写入Cassandra时,它可能会抛出任何类型的异常(ConnectionTimeOut等)。我可以做些什么来确保数据不会丢失,以及我可以做些什么来对特定的一批数据执行重试。
这是我的writeStream函数,它在内部调用保存方法,我们在该方法中执行对表的写入。

query = df.writeStream \
    .outputMode("append") \
    .option("checkpointLocation", "path") \
    .option("failOnDataLoss", "false") \  
    .option("maxAttempts", "5") \ 
    .option("retryOnDataLoss", "true") \ 
    .option("failedWriteFile", "path") \
    .foreachBatch(save) \
    .start()

这就是保存方法。

`def save(df, batch_id):
    try:
        (df.write
         .format("org.apache.spark.sql.cassandra")
         .options(table=tableName, keyspace=keyspaceName)
         .mode("append")
         .save())
        return None
    except Exception as e:
        raise e`

就我所知,当保存方法抛出异常时,spark函数会再次重试该批处理,直到重试次数耗尽。即使仍然失败,它也会写入指定的路径并继续下一批处理。
这些选项maxAttemptsretryOnDataLossfailedWriteFile仍然有效吗?我没有在官方sparkDocs或spark-cassandra-connector库中找到任何参考。或者有任何其他替代方案。
https://github.com/datastax/spark-cassandra-connector
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#foreachbatch

8hhllhi2

8hhllhi21#

配置:

.option("retryOnDataLoss", "true")
   .option("failedWriteFile", "path")

可以毫无问题地删除,当异常发生时,spark已经处理失败任务的重试。

y53ybaqx

y53ybaqx2#

默认情况下,Spark Cassandra连接器将重试save() 60次:

spark.cassandra.query.retry.count   60

除非在应用程序的配置中重写该属性。
如果写 cassandra 失败后,60次尝试,我会建议数据丢失是去你的问题最小,因为你的 cassandra 是最有可能下来。干杯!

相关问题