我想把推特上的数据写进Kafka。出于教育目的,我尝试使用结构化流媒体来实现这一点。我基于socket源代码创建了一个twitter源代码,效果很好。
我的来源如下:
val tweets = spark
.readStream
.format("twitter")
.option("query", terms)
.load()
.as[SparkTweet]
这给了我一个很好的分析查询数据集。太好了!
接下来,我想将每个tweet以稍微稀疏的模式保存到kafka中:
val kafkaOutStream = tweets
.toJSON.as("value")
.writeStream
.queryName("stream_to_kafka")
.outputMode(OutputMode.Append())
.trigger(Trigger.ProcessingTime("1 second"))
.format("kafka")
.option("kafka.bootstrap.servers","localhost:9092")
.option("topic","tweets")
.start
那很简单!只是,它不起作用。在 QueryExecution.scala
通话进入 assertSupported
最终被赶出去,因为
Exception in thread "main" org.apache.spark.sql.AnalysisException:
Queries with streaming sources must be executed with writeStream.start();;
我没想到 toJSON
是一个纯粹的批量操作,但没有它,并使用说 select($"text" as "value")
相反,代码将起作用。
现在,我有点目瞪口呆,希望有人能解释为什么tojson不应该与流媒体兼容(这是一个bug吗?一个缺少的功能?),并告诉是否有一个结构化的流式处理方式获得我的对象到Kafka序列化表示。
1条答案
按热度按时间hujrc8aj1#
有点冗长,但是
to_json
函数应该执行以下操作:问题在于
toJSON
似乎是这种对rdd的转换:而且(正如maasg在评论中指出的)似乎已经在开发版本中解决了。