然而,我要做的正是这个问题所涉及的内容;在我的例子中,我使用的是python/pyspark而不是scala。
我试图提取kafka connect消息中包含schema的“payload”部分。
示例消息:
{"schema":{"type":"struct","name":"emp_table","fields":[{"field":"emp_id","type":"string"},{"field":"emp_name","type":"String"},{"field":"city","type":"string"},{"field":"emp_sal","type":"string"},{"field":"manager_name","type":"string"}]},"payload":{"emp_id":"1","emp_name":"abc","city":"NYK","emp_sal":"100000","manager_name":"xyz"}}
步骤1-定义“有效负载”部分的模式:
payload_schema = StructType([
StructField("emp_id", StringType(), False),
StructField("emp_name", StringType(), True),
StructField("city", StringType(), True),
StructField("emp_sal", StringType(), True),
StructField("manager_name", StringType(), True)])
第二步-读Kafka的话:
df =spark.readStream.format("kafka")
步骤3-从kafka消息获取消息值:
kafka_df = df.selectExpr("CAST(value AS STRING)")
步骤4-仅提取“有效负载”(我被困在这里):
import pyspark.sql.functions as psf
emp_df = kafka_df\
.select(psf.from_json(psf.col('value'), payload_schema).alias("DF"))\
.select("DF.*")
我被困在这一部分中,因为在将有效负载传递给from\ json()函数之前,我不知道如何从json字符串中提取有效负载。
注意:我知道我需要为整个消息定义完整的模式,然后才能在from\u json()中使用它;我尝试只获取“payload”json字符串部分。
2条答案
按热度按时间4dc9hkyq1#
您可以使用sql函数
get_json_object
:或者,您需要为整个消息定义完整的模式,然后才能在中使用它
from_json
.这意味着您的模式应该如下所示:
请仔细检查这个模式定义,因为我不完全确定如何在python模式中定义数组,但我希望这个想法很清楚。
一旦完成,您就可以通过
o3imoua42#
出于某种原因,我错过了pyspark的get_json_object()函数。在迈克的评论之后,我又回到了文档中,找到了我要找的东西。
答案如下: