迭代流Dataframe中的列值,并使用scala和spark将每个值分配给公共列表

x9ybnkn6  于 2021-07-06  发布在  Spark
关注(0)|答案(1)|浏览(402)

我有以下流Dataframe

  1. +------------------------------------+
  2. |______sentence______________________|
  3. | Representative is a scientist |
  4. | Norman did a good job in the exam |
  5. | you want to go on shopping? |
  6. --------------------------------------

我的清单如下

  1. val myList

作为最终输出,我需要mylist在streamdataframe中包含以上三句话
输出

  1. myList = [Representative is a scientist, Norman did a good job in the exam, you want to go on shopping? ]

我尝试了下面给出的流错误

  1. val myList = sentenceDataframe.select("sentence").rdd.map(r => r(0)).collect.toList

使用上述方法引发错误
org.apache.spark.sql.analysisexception:具有流源的查询必须使用writestream.start()执行
请注意,上述方法适用于正常的Dataframe,但不适用于流Dataframe。
有没有一种方法可以迭代流Dataframe的每一行,并使用scala和spark将行值赋给公共列表?

kgsdhlau

kgsdhlau1#

这听起来像是一个非常奇怪的用例,因为流理论上永远不会结束。你确定你不是在寻找普通的sparkDataframe吗?
如果不是这样,你可以做的是使用蓄能器和Spark流Flume。我使用了一个简单的套接字连接来演示这一点。你可以用nc-lp3030在ubuntu下启动一个简单的socket服务器,然后将消息传递到流中,结果Dataframe的模式是[value:string]

  1. val acc = spark.sparkContext.collectionAccumulator[String]
  2. val stream = spark.readStream.format("socket").option("host", "localhost").option("port", "3030").load()
  3. val query = stream.writeStream.foreachBatch((df: DataFrame, l: Long) => {
  4. df.collect.foreach(v => acc.add(v(0).asInstanceOf[String]))
  5. }).start()
  6. ...
  7. // For some reason you are stopping the stream here
  8. query.stop()
  9. val myList = acc.value

现在你可能有一个问题,为什么我们使用累加器,而不仅仅是数组缓冲区。arraybuffers可以在本地工作,但在集群上,foreachbatch中的代码可能在完全不同的节点上执行。这意味着它不会产生任何影响,这也是蓄能器首先存在的原因(参见https://spark.apache.org/docs/latest/rdd-programming-guide.html#accumulators)

展开查看全部

相关问题