如何将流数据集写入kafka?

4si2a6ki  于 2021-06-07  发布在  Kafka
关注(0)|答案(3)|浏览(415)

我想对主题数据做些充实。因此,使用spark结构化流媒体从Kafka接收器读回Kafka。

val ds = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("group.id", groupId)
      .option("subscribe", "topicname")
      .load()

val enriched = ds.select("key", "value", "topic").as[(String, String, String)].map(record => enrich(record._1,
      record._2, record._3)

val query = enriched.writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("group.id", groupId)
      .option("topic", "desttopic")
      .start()

但我有个例外:

Exception in thread "main" java.lang.UnsupportedOperationException: Data source kafka does not support streamed writing
    at org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:287)
    at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:266)
    at kafka_bridge.KafkaBridge$.main(KafkaBridge.scala:319)
    at kafka_bridge.KafkaBridge.main(KafkaBridge.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)

有解决办法吗?

pbpqsu0x

pbpqsu0x1#

试试这个

ds.map(_.toString.getBytes).toDF("value")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092"))
      .option("topic", topic)
      .start
      .awaitTermination()
evrscar2

evrscar22#

作为t。gaw公司ęda提到,没有kafka格式将流数据集写入kafka(即kafka接收器)。
spark 2.1中当前推荐的解决方案是使用foreach操作符。
foreach操作允许对输出数据计算任意操作。从spark 2.1开始,这只适用于scala和java。要使用它,您必须实现foreachwriter接口(scala/javadocs),该接口有一些方法,只要在触发器之后有一系列行作为输出生成,就会调用这些方法。注意以下要点。

sd2nnvve

sd2nnvve3#

spark 2.1(目前是spark的最新版本)没有。下一个版本-2.2-将有Kafka作家,看看这个提交。
Kafka和Kafka的作家是一样的。

相关问题