使用spark structured streaming(pyspark)从kafka connect jsonconverter消息中提取“有效负载”(schema&payload)

col17t5w  于 2021-05-24  发布在  Spark
关注(0)|答案(2)|浏览(487)

然而,我要做的正是这个问题所涉及的内容;在我的例子中,我使用的是python/pyspark而不是scala。
我试图提取kafka connect消息中包含schema的“payload”部分。
示例消息:

{"schema":{"type":"struct","name":"emp_table","fields":[{"field":"emp_id","type":"string"},{"field":"emp_name","type":"String"},{"field":"city","type":"string"},{"field":"emp_sal","type":"string"},{"field":"manager_name","type":"string"}]},"payload":{"emp_id":"1","emp_name":"abc","city":"NYK","emp_sal":"100000","manager_name":"xyz"}}

步骤1-定义“有效负载”部分的模式:

payload_schema = StructType([
StructField("emp_id", StringType(), False),
StructField("emp_name", StringType(), True),
StructField("city", StringType(), True),
StructField("emp_sal", StringType(), True),
StructField("manager_name", StringType(), True)])

第二步-读Kafka的话:

df =spark.readStream.format("kafka")

步骤3-从kafka消息获取消息值:

kafka_df = df.selectExpr("CAST(value AS STRING)")

步骤4-仅提取“有效负载”(我被困在这里):

import pyspark.sql.functions as psf

    emp_df = kafka_df\
    .select(psf.from_json(psf.col('value'), payload_schema).alias("DF"))\
    .select("DF.*")

我被困在这一部分中,因为在将有效负载传递给from\ json()函数之前,我不知道如何从json字符串中提取有效负载。
注意:我知道我需要为整个消息定义完整的模式,然后才能在from\u json()中使用它;我尝试只获取“payload”json字符串部分。

4dc9hkyq

4dc9hkyq1#

您可以使用sql函数 get_json_object :

import pyspark.sql.functions as psf

kafka_df
  .select(psf.get_json_object(kafka_df['value'],"$.payload").alias('payload'))
  .select(psf.from_json(psf.col('payload'), payload_schema).alias("DF"))
  .select("DF.*")

或者,您需要为整个消息定义完整的模式,然后才能在中使用它 from_json .
这意味着您的模式应该如下所示:

full_schema = StructType([
  StructField("schema", StructType([
    StructField("type", StringType(), False),
    StructField("name", StringType(), False),
    StructField("fields", StructType([
      StructField("field", StringType(), False),
      StructField("type", StringType(), False)
    ]),
  StructField("payload", StructType([
    StructField("emp_id", StringType(), False),
    StructField("emp_name", StringType(), True),
    StructField("city", StringType(), True),
    StructField("emp_sal", StringType(), True),
    StructField("manager_name", StringType(), True)
  ])
])

请仔细检查这个模式定义,因为我不完全确定如何在python模式中定义数组,但我希望这个想法很清楚。
一旦完成,您就可以通过

import pyspark.sql.functions as psf

    emp_df = kafka_df\
    .select(psf.from_json(psf.col('value'), full_schema).alias("DF"))\
    .select("DF.payload.*")
o3imoua4

o3imoua42#

出于某种原因,我错过了pyspark的get_json_object()函数。在迈克的评论之后,我又回到了文档中,找到了我要找的东西。
答案如下:

kafka_df = df.selectExpr("CAST(value AS STRING)")
    payload_df = kafka_df.select(psf.get_json_object(kafka_df.value, "$.payload").alias("payload"))
    emp_df = payload_df.select(psf.from_json(psf.col('payload'), schema).alias("DF")).select("DF.*")

相关问题