如何使用JavaSpark中的foreachbatch()编写cassandra?

cwxwcias  于 2021-06-10  发布在  Cassandra
关注(0)|答案(2)|浏览(455)

我有下面的代码,我想写进Cassandra使用Spark2.4结构化流foreachbatch

  1. Dataset<Row> df = spark
  2. .readStream()
  3. .format("kafka")
  4. .option("kafka.bootstrap.servers", "localhost:9092")
  5. .option("subscribe", "topic1")
  6. .load();
  7. Dataset<Row> values=df.selectExpr(
  8. "split(value,',')[0] as field1",
  9. "split(value,',')[1] as field2",
  10. "split(value,',')[2] as field3",
  11. "split(value,',')[3] as field4",
  12. "split(value,',')[4] as field5");
  13. //TODO write into cassandra
  14. values.writeStream().foreachBatch(
  15. new VoidFunction2<Dataset<String>, Long> {
  16. public void call(Dataset<String> dataset, Long batchId) {
  17. // Transform and write batchDF
  18. }
  19. ).start();
kuuvgm7e

kuuvgm7e1#

尝试将其添加到pom.xml:

  1. <dependency>
  2. <groupId>com.datastax.spark</groupId>
  3. <artifactId>spark-cassandra-connector_2.11</artifactId>
  4. <version>2.4.2</version>
  5. </dependency>

在那之后,Cassandra暗示:

  1. import org.apache.spark.sql.cassandra._

您可以在df上使用cassandraformat方法:

  1. dataset
  2. .write
  3. .cassandraFormat("table","keyspace")
  4. .save()
展开查看全部
rn0zuynd

rn0zuynd2#

当你使用 .forEachBatch ,您的代码与普通数据集一样工作。。。在java中,代码可以如下所示(完整的源代码如下所示):

  1. .foreachBatch((VoidFunction2<Dataset<Row>, Long>) (df, batchId) ->
  2. df.write()
  3. .format("org.apache.spark.sql.cassandra")
  4. .options(ImmutableMap.of("table", "sttest", "keyspace", "test"))
  5. .mode(SaveMode.Append)
  6. .save()
  7. )

2020年9月更新:spark cassandra connector 2.5.0中增加了对spark结构化流媒体的支持

相关问题