我有下面的代码,我想写进Cassandra使用Spark2.4结构化流foreachbatch
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "topic1")
.load();
Dataset<Row> values=df.selectExpr(
"split(value,',')[0] as field1",
"split(value,',')[1] as field2",
"split(value,',')[2] as field3",
"split(value,',')[3] as field4",
"split(value,',')[4] as field5");
//TODO write into cassandra
values.writeStream().foreachBatch(
new VoidFunction2<Dataset<String>, Long> {
public void call(Dataset<String> dataset, Long batchId) {
// Transform and write batchDF
}
).start();
2条答案
按热度按时间kuuvgm7e1#
尝试将其添加到pom.xml:
在那之后,Cassandra暗示:
您可以在df上使用cassandraformat方法:
rn0zuynd2#
当你使用
.forEachBatch
,您的代码与普通数据集一样工作。。。在java中,代码可以如下所示(完整的源代码如下所示):2020年9月更新:spark cassandra connector 2.5.0中增加了对spark结构化流媒体的支持