从spark streaming dataframe中删除(损坏的)不符合架构的行(来自kafka的传入json数据)

lc8prwob  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(368)

我有一个spark结构的蒸汽应用程序,我在读Kafka。这是我的代码的基本结构。
我创建了spark会话。

val spark = SparkSession
  .builder
  .appName("app_name")
  .getOrCreate()

然后我在小溪里看书

val data_stream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "server_list")
  .option("subscribe", "topic")
  .load()

在Kafka记录中,我将“值”转换为字符串。它从二进制转换为字符串。此时数据框中有1列

val df = data_stream
    .select($"value".cast("string") as "json")

基于预定义的模式,我尝试将json结构解析为列。但是,这里的问题是,如果数据是“坏的”,或者是不同的格式,那么它与定义的模式不匹配。因此,下一个Dataframe(df2)会将其空值放入列中。

val df2 = df.select(from_json($"json", schema) as "data")
  .select("data.*")

我希望能够从df2中过滤出在某一列(我在数据库中用作主键的列)中有“null”的行,即忽略与模式不匹配的坏数据?
编辑:我有点能做到这一点,但不是我想要的方式。在我的过程中,我使用一个查询 .foreach(writer) 过程。这会打开到数据库的连接,处理每一行,然后关闭连接。结构化流媒体的文档提到了此过程所需的必要性。在process方法中,我从每一行获取值,并检查主键是否为null,如果为null,则不将其插入数据库。

f45qwnt8

f45qwnt81#

只需过滤掉不需要的空值:

df2
  .filter(row => row("colName") != null)
wj8zmpe1

wj8zmpe12#

kafka将数据存储为原始字节数组格式。数据生产者和消费者需要就处理数据的结构达成一致。
如果生成的消息格式发生变化,消费者需要调整以读取相同的格式。当您的数据结构正在演变时,问题就来了,您可能需要在用户端具有兼容的数据结构。
protobuff定义消息格式解决了这个问题。

相关问题