我有一个spark结构的蒸汽应用程序,我在读Kafka。这是我的代码的基本结构。
我创建了spark会话。
val spark = SparkSession
.builder
.appName("app_name")
.getOrCreate()
然后我在小溪里看书
val data_stream = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "server_list")
.option("subscribe", "topic")
.load()
在Kafka记录中,我将“值”转换为字符串。它从二进制转换为字符串。此时数据框中有1列
val df = data_stream
.select($"value".cast("string") as "json")
基于预定义的模式,我尝试将json结构解析为列。但是,这里的问题是,如果数据是“坏的”,或者是不同的格式,那么它与定义的模式不匹配。因此,下一个Dataframe(df2)会将其空值放入列中。
val df2 = df.select(from_json($"json", schema) as "data")
.select("data.*")
我希望能够从df2中过滤出在某一列(我在数据库中用作主键的列)中有“null”的行,即忽略与模式不匹配的坏数据?
编辑:我有点能做到这一点,但不是我想要的方式。在我的过程中,我使用一个查询 .foreach(writer)
过程。这会打开到数据库的连接,处理每一行,然后关闭连接。结构化流媒体的文档提到了此过程所需的必要性。在process方法中,我从每一行获取值,并检查主键是否为null,如果为null,则不将其插入数据库。
2条答案
按热度按时间f45qwnt81#
只需过滤掉不需要的空值:
wj8zmpe12#
kafka将数据存储为原始字节数组格式。数据生产者和消费者需要就处理数据的结构达成一致。
如果生成的消息格式发生变化,消费者需要调整以读取相同的格式。当您的数据结构正在演变时,问题就来了,您可能需要在用户端具有兼容的数据结构。
protobuff定义消息格式解决了这个问题。