使用scala和spark将流Dataframe转换为sparkDataframe

laximzn5  于 2021-07-14  发布在  Spark
关注(0)|答案(1)|浏览(508)

我有下面的流Dataframe。

+----------------------------------
|______value______________________| 
| I am going to school ?        |   
| why are you crying ? ?       | 
| You are not very good my friend |

我用下面的代码创建了上面的Dataframe

val readStream = existingSparkSession
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", hostAddress)
      .option("failOnDataLoss", false)
      .option("subscribe", "myTopic.raw")
      .load()

我想将相同的流Dataframe存储到sparkDataframe中。在scala和spark中有可能转换成这样吗?因为最后我想把sparkDataframe转换成一个句子列表。streamdataframe的问题是我无法将它直接转换成一个列表,我可以迭代并执行一些数据处理操作。

3zwjbxry

3zwjbxry1#

您应该能够对从kafka获得的流执行许多标准操作,但是您需要考虑批处理和流处理之间的语义差异—请参阅spark文档。
另外,当您从kafka获取数据时,列的集合是固定的,您将获得一个二进制有效负载,您需要将 value 列到字符串或类似的内容(请参见文档):

val df = readStream.select($"value".cast("string").alias("sentences"))

之后,您将获得一个具有实际有效负载的Dataframe,并开始处理。根据处理的复杂性,您可能需要恢复foreachbatch功能,但这可能不是必需的—您需要提供有关需要执行哪种处理的更多详细信息。

相关问题