java—从kafka主题读取消息并将其转储到hdfs中

rta7y2nd  于 2021-06-05  发布在  Kafka
关注(0)|答案(2)|浏览(433)

我试图使用kafka主题中的数据,将其加载到数据集,然后在加载到hdfs之前执行filter。
我可以从Kafka主题消费,加载到数据集,并保存为Parquet文件在hdfs,但不能执行过滤条件。你能分享一下在保存到hdfs之前进行过滤的方法吗?我用java和spark来消费来自kafka的主题。我的部分代码如下:

DataframeDeserializer dataframe = new DataframeDeserializer(dataset);

 ds = dataframe.fromConfluentAvro("value", <your schema path>, <yourmap>, RETAIN_SELECTED_COLUMN_ONLY$.MODULE$);

StreamingQuery query = ds.coalesce(10)
                .writeStream()
                .format("parquet")
                .option("path", path.toString())
                .option("checkpointLocation", "<your path>")
                .trigger(Trigger.Once())
                .start();
tpxzln5u

tpxzln5u1#

在之前写入过滤器逻辑 coalesceds.filter().coalesce() ```
DataframeDeserializer dataframe = new DataframeDeserializer(dataset);

ds = dataframe.fromConfluentAvro("value", , , RETAIN_SELECTED_COLUMN_ONLY$.MODULE$);

StreamingQuery query =
ds
.filter(...) // Write your filter condition here
.coalesce(10)
.writeStream()
.format("parquet")
.option("path", path.toString())
.option("checkpointLocation", "")
.trigger(Trigger.Once())
.start();

l2osamch

l2osamch2#

与其重新发明轮子,我强烈建议Kafka连接。您只需要hdfs接收器连接器,它将kafka主题中的数据复制到hdfs。
对于hdfs 2.x文件,可以使用hdfs 2接收器连接器
对于hdfs 3.x文件,请使用hdfs 3接收器连接器

相关问题