我从一个kafka主题中读取数据,并以分区模式将其放入azureadls(类似hdfs)。
我的代码如下:
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option("subscribe", topic)
.option("failOnDataLoss", false)
.load()
.selectExpr(/*"CAST(key AS STRING)",*/ "CAST(value AS STRING)").as(Encoders.STRING)
df.writeStream
.partitionBy("year", "month", "day", "hour", "minute")
.format("parquet")
.option("path", outputDirectory)
.option("checkpointLocation", checkpointDirectory)
.outputMode("append")
.start()
.awaitTermination()
我有大约2000条记录/秒,我的问题是spark每45秒插入一次数据,我希望立即插入数据。
有人知道如何控制微批量的大小吗?
1条答案
按热度按时间pexxcrt21#
从spark 2.3版本开始,可以使用连续处理模式。在官方文件里。您可以看到,此模式仅支持三个接收器,并且只有kafka接收器可用于生产,并且“最好使用kafka作为源和接收器来观察端到端的低延迟处理”
所以,现在看来,你可以´t使用连续模式将hdfs用作接收器。在你的情况下,也许你可以测试阿克卡流和阿尔帕卡连接器