如何将数据集写入kafka主题?

ttvkxqim  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(377)

我使用的是spark 2.1.0和kafka 0.9.0。
我正试图把批量Spark作业的输出推给Kafka。作业应该每小时运行一次,但不能像流媒体那样运行。
在网上寻找答案的时候,我只能找到Kafka与spark streaming的集成,而没有找到与批处理作业的集成。
有人知道这样做是否可行吗?
谢谢
更新:
正如user8371915所提到的,我试图遵循将批处理查询的输出写入kafka时所做的操作。
我用了一个Spark壳:

spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0

下面是我尝试的简单代码:

val df = Seq(("Rey", "23"), ("John", "44")).toDF("key", "value")
val newdf = df.select(to_json(struct(df.columns.map(column):_*)).alias("value"))
newdf.write.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("topic", "alerts").save()

但我得到了一个错误:

java.lang.RuntimeException: org.apache.spark.sql.kafka010.KafkaSourceProvider does not allow create table as select.
at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:497)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215)
... 50 elided

你知道这和什么有关吗?
谢谢

mw3dktmi

mw3dktmi1#

对于此错误,java.lang.runtimeexception:org.apache.spark.sql.kafka010.kafkasourceprovider不允许将表创建为select。在scala.sys.package$.error(package。scala:27)
我认为您需要将消息解析为键值对。您的Dataframe应该有值列。
假设你有一个带有学生id的数据框,分数。

df.show()
>> student_id | scores
    1         |  99.00
    2         |  98.00

然后您应该将Dataframe修改为

value
{"student_id":1,"score":99.00}
{"student_id":2,"score":98.00}

要转换,可以使用类似的代码

df.select(to_json(struct($"student_id",$"score")).alias("value"))
68de4m5k

68de4m5k2#

热释光;你用的是过时的spark版本。写入在2.2及更高版本中启用。
开箱即用,您可以使用kafka sql连接器(与结构化流媒体相同)。包括 spark-sql-kafka 在你的依赖中。
将数据转换为 DataFrame 至少包含 value 类型的列 StringType 或者 BinaryType .
向Kafka写入数据:

df   
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", server)
  .save()

有关详细信息,请遵循结构化流式处理文档(从将批处理查询的输出写入kafka开始)。

相关问题