我试图使用kafka主题中的数据,将其加载到数据集,然后在加载到hdfs之前执行filter。
我可以从Kafka主题消费,加载到数据集,并保存为Parquet文件在hdfs,但不能执行过滤条件。你能分享一下在保存到hdfs之前进行过滤的方法吗?我用java和spark来消费来自kafka的主题。我的部分代码如下:
DataframeDeserializer dataframe = new DataframeDeserializer(dataset);
ds = dataframe.fromConfluentAvro("value", <your schema path>, <yourmap>, RETAIN_SELECTED_COLUMN_ONLY$.MODULE$);
StreamingQuery query = ds.coalesce(10)
.writeStream()
.format("parquet")
.option("path", path.toString())
.option("checkpointLocation", "<your path>")
.trigger(Trigger.Once())
.start();
2条答案
按热度按时间tpxzln5u1#
在之前写入过滤器逻辑
coalesce
即ds.filter().coalesce()
```DataframeDeserializer dataframe = new DataframeDeserializer(dataset);
ds = dataframe.fromConfluentAvro("value", , , RETAIN_SELECTED_COLUMN_ONLY$.MODULE$);
StreamingQuery query =
ds
.filter(...) // Write your filter condition here
.coalesce(10)
.writeStream()
.format("parquet")
.option("path", path.toString())
.option("checkpointLocation", "")
.trigger(Trigger.Once())
.start();
l2osamch2#
与其重新发明轮子,我强烈建议Kafka连接。您只需要hdfs接收器连接器,它将kafka主题中的数据复制到hdfs。
对于hdfs 2.x文件,可以使用hdfs 2接收器连接器
对于hdfs 3.x文件,请使用hdfs 3接收器连接器