我有下面的流Dataframe。
+----------------------------------
|______value______________________|
| I am going to school ? |
| why are you crying ? ? |
| You are not very good my friend |
我用下面的代码创建了上面的Dataframe
val readStream = existingSparkSession
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", hostAddress)
.option("failOnDataLoss", false)
.option("subscribe", "myTopic.raw")
.load()
我想将相同的流Dataframe存储到sparkDataframe中。在scala和spark中有可能转换成这样吗?因为最后我想把sparkDataframe转换成一个句子列表。streamdataframe的问题是我无法将它直接转换成一个列表,我可以迭代并执行一些数据处理操作。
1条答案
按热度按时间3zwjbxry1#
您应该能够对从kafka获得的流执行许多标准操作,但是您需要考虑批处理和流处理之间的语义差异—请参阅spark文档。
另外,当您从kafka获取数据时,列的集合是固定的,您将获得一个二进制有效负载,您需要将
value
列到字符串或类似的内容(请参见文档):之后,您将获得一个具有实际有效负载的Dataframe,并开始处理。根据处理的复杂性,您可能需要恢复foreachbatch功能,但这可能不是必需的—您需要提供有关需要执行哪种处理的更多详细信息。