如何将流式数据集写入Cassandra?

t5fffqht  于 2022-11-05  发布在  Cassandra
关注(0)|答案(2)|浏览(205)

我有一个Python Stream-sourced DataFrame df,它包含了所有我想用spark-cassandra-connector放入Cassandra表中的数据。

df.write \
    .format("org.apache.spark.sql.cassandra") \
    .mode('append') \
    .options(table="myTable",keyspace="myKeySpace") \
    .save() 

query = df.writeStream \
    .format("org.apache.spark.sql.cassandra") \
    .outputMode('append') \
    .options(table="myTable",keyspace="myKeySpace") \
    .start()

query.awaitTermination()

然而我不断地得到这样的错误,分别是:

pyspark.sql.utils.AnalysisException: "'write' can not be called on streaming Dataset/DataFrame;

java.lang.UnsupportedOperationException: Data source org.apache.spark.sql.cassandra does not support streamed writing.

是否有任何方法可以将流 Dataframe 发送到Cassandra表中?

jhiyze9q

jhiyze9q1#

Spark Cassandra连接器中目前没有Cassandra的Sink流。您需要实现自己的Sink或等待它可用。
如果您使用的是Scala或Java,则可以使用foreach运算符和ForeachWriter,如使用Foreach中所述。

nuypyhwy

nuypyhwy2#

我知道这是一个旧的职位,更新它供将来参考。
您可以将其作为流数据的批处理。如下所示

def writeToCassandra(writeDF, epochId):
 writeDF.write \
    .format("org.apache.spark.sql.cassandra") \
    .options(table="table_name", keyspace="keyspacename")\
    .mode("append") \
    .save()

query = sdf3.writeStream \
.trigger(processingTime="10 seconds") \
.outputMode("update") \
.foreachBatch(writeToCassandra) \
.start()

相关问题