如何解决dataset.tojson与结构化流不兼容的问题

6pp0gazn  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(311)

我想把推特上的数据写进Kafka。出于教育目的,我尝试使用结构化流媒体来实现这一点。我基于socket源代码创建了一个twitter源代码,效果很好。
我的来源如下:

val tweets = spark
  .readStream
  .format("twitter")
  .option("query", terms)
  .load()
  .as[SparkTweet]

这给了我一个很好的分析查询数据集。太好了!
接下来,我想将每个tweet以稍微稀疏的模式保存到kafka中:

val kafkaOutStream = tweets
  .toJSON.as("value")
  .writeStream
  .queryName("stream_to_kafka")
  .outputMode(OutputMode.Append())
  .trigger(Trigger.ProcessingTime("1 second"))
  .format("kafka")
  .option("kafka.bootstrap.servers","localhost:9092")
  .option("topic","tweets")
  .start

那很简单!只是,它不起作用。在 QueryExecution.scala 通话进入 assertSupported 最终被赶出去,因为

Exception in thread "main" org.apache.spark.sql.AnalysisException:
    Queries with streaming sources must be executed with writeStream.start();;

我没想到 toJSON 是一个纯粹的批量操作,但没有它,并使用说 select($"text" as "value") 相反,代码将起作用。
现在,我有点目瞪口呆,希望有人能解释为什么tojson不应该与流媒体兼容(这是一个bug吗?一个缺少的功能?),并告诉是否有一个结构化的流式处理方式获得我的对象到Kafka序列化表示。

hujrc8aj

hujrc8aj1#

有点冗长,但是 to_json 函数应该执行以下操作:

import org.apache.spark.sql.functions.{to_json, struct, col}

tweets.select(to_json(struct(df.columns map col: _*)).alias("value"))
  .writeStream
  ...

问题在于 toJSON 似乎是这种对rdd的转换:

val rdd: RDD[String] = queryExecution.toRdd.mapPartitions { iter =>
  ...

而且(正如maasg在评论中指出的)似乎已经在开发版本中解决了。

相关问题