我使用的是spark 2.1.0和kafka 0.9.0。
我正试图把批量Spark作业的输出推给Kafka。作业应该每小时运行一次,但不能像流媒体那样运行。
在网上寻找答案的时候,我只能找到Kafka与spark streaming的集成,而没有找到与批处理作业的集成。
有人知道这样做是否可行吗?
谢谢
更新:
正如user8371915所提到的,我试图遵循将批处理查询的输出写入kafka时所做的操作。
我用了一个Spark壳:
spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0
下面是我尝试的简单代码:
val df = Seq(("Rey", "23"), ("John", "44")).toDF("key", "value")
val newdf = df.select(to_json(struct(df.columns.map(column):_*)).alias("value"))
newdf.write.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("topic", "alerts").save()
但我得到了一个错误:
java.lang.RuntimeException: org.apache.spark.sql.kafka010.KafkaSourceProvider does not allow create table as select.
at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:497)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215)
... 50 elided
你知道这和什么有关吗?
谢谢
2条答案
按热度按时间mw3dktmi1#
对于此错误,java.lang.runtimeexception:org.apache.spark.sql.kafka010.kafkasourceprovider不允许将表创建为select。在scala.sys.package$.error(package。scala:27)
我认为您需要将消息解析为键值对。您的Dataframe应该有值列。
假设你有一个带有学生id的数据框,分数。
然后您应该将Dataframe修改为
要转换,可以使用类似的代码
68de4m5k2#
热释光;你用的是过时的spark版本。写入在2.2及更高版本中启用。
开箱即用,您可以使用kafka sql连接器(与结构化流媒体相同)。包括
spark-sql-kafka
在你的依赖中。将数据转换为
DataFrame
至少包含value
类型的列StringType
或者BinaryType
.向Kafka写入数据:
有关详细信息,请遵循结构化流式处理文档(从将批处理查询的输出写入kafka开始)。