pyspark 如何在没有json Schema的情况下使用Spark Stream从Kafka获取Dataframe?

vkc1a9a2  于 2024-01-06  发布在  Spark
关注(0)|答案(1)|浏览(150)

我是个Spark新人
我正在尝试使用Spark Stream阅读Kafka主题。
从Kafka流出的数据的“value”字段是一个json字符串。我想将这个“value”字段转换为一个字符串,并将其更改为一个parquet文件。
我想从value字段中包含的字符串值中获取模式信息。原因是,JSON数据字段继续添加
比如像这样的Kafka数据。
| 关键|值|......这是什么?|
| --|--|--|
| 0 |“{a:1,B:2,c:3}.”|......这是什么?|
| 1 |“{a:1,B:2,c:3,d:4}..”|......这是什么?|
我正在尝试这个代码

  1. source_df = streaming_data.selectExpr("CAST(value AS STRING)").alias("value") \
  2. .select(from_json("value", schema_of_json(streaming_data.select('value').first().getString(0)))).alias("data") \
  3. .select("data.*")

字符串
我得到了错误pyspark.sql.utils.AnalysisException:与流源的连接必须用writeStream.start()执行;
请帮

oiopk7p5

oiopk7p51#

选项1:硬编码模式并在F.from_json()中使用它。

  1. my_schema = T.StructType([
  2. T.StructField('a', T.IntegerType()),
  3. T.StructField('b', T.IntegerType()),
  4. T.StructField('c', T.IntegerType()),
  5. T.StructField('d', T.IntegerType()),
  6. ])
  7. value = F.col('value').cast(T.StringType())
  8. data = F.from_json(value, my_schema).alias('data')
  9. source_df = streaming_data.select(data).select('data.*')

字符串
备选方案2:如果你想动态地推断模式,你可以使用foreachbatch。但是要注意,这是有风险的,破坏模式的改变会使流查询失败。而且不能保证模式会被正确地推断。

  1. def parse_and_process(df: DataFrame, epoch_id: int) -> None:
  2. # cache the current micro batch, it will be scanned more than once
  3. df.persist()
  4. # infer the schema of the current batch
  5. spark = SparkSession.getActiveSession()
  6. value = F.col('value').cast(T.StringType())
  7. inferred_df = spark.read.json(
  8. df.select(value).rdd.map(lambda x: x[0]),
  9. dropFieldIfAllNull=True
  10. )
  11. inferred_schema = inferred_df.schema
  12. # parse the json with the schema
  13. res_df = df.withColumn('data', F.from_json(value, inferred_schema))
  14. # process the DataFramee, it's not a streaming DataFrame anymore.
  15. res_df.write....
  16. df.unpersist()
  17. streaming_data.writeStream.foreachBatch(parse_and_process).start()

展开查看全部

相关问题