我有以下流Dataframe
+------------------------------------+
|______sentence______________________|
| Representative is a scientist |
| Norman did a good job in the exam |
| you want to go on shopping? |
--------------------------------------
我的清单如下
val myList
作为最终输出,我需要mylist在streamdataframe中包含以上三句话
输出
myList = [Representative is a scientist, Norman did a good job in the exam, you want to go on shopping? ]
我尝试了下面给出的流错误
val myList = sentenceDataframe.select("sentence").rdd.map(r => r(0)).collect.toList
使用上述方法引发错误
org.apache.spark.sql.analysisexception:具有流源的查询必须使用writestream.start()执行
请注意,上述方法适用于正常的Dataframe,但不适用于流Dataframe。
有没有一种方法可以迭代流Dataframe的每一行,并使用scala和spark将行值赋给公共列表?
1条答案
按热度按时间kgsdhlau1#
这听起来像是一个非常奇怪的用例,因为流理论上永远不会结束。你确定你不是在寻找普通的sparkDataframe吗?
如果不是这样,你可以做的是使用蓄能器和Spark流Flume。我使用了一个简单的套接字连接来演示这一点。你可以用nc-lp3030在ubuntu下启动一个简单的socket服务器,然后将消息传递到流中,结果Dataframe的模式是[value:string]
现在你可能有一个问题,为什么我们使用累加器,而不仅仅是数组缓冲区。arraybuffers可以在本地工作,但在集群上,foreachbatch中的代码可能在完全不同的节点上执行。这意味着它不会产生任何影响,这也是蓄能器首先存在的原因(参见https://spark.apache.org/docs/latest/rdd-programming-guide.html#accumulators)