Spark Structured Streaming: emit results at the end of the tumbling window instead of every batch

Asked by wecizke3 on 2021-05-27, tagged Spark

I want the output of my Spark streaming query to be sent to the sink at the end of each tumbling window, not at every batch interval.
I read from one Kafka stream and write the output to another Kafka stream.
The query and the code that writes the output look like this:

// Requires: import static org.apache.spark.sql.functions.*;
Dataset<Row> sqlResult = session.sql("select window, user, sum(amount) as amount from users where type = 'A' group by window(timestamp, '1 minute', '1 minute'), user");
// Serialize each aggregated row to JSON for the Kafka sink's value column.
sqlResult = sqlResult.select(to_json(struct("window", "user", "amount")).as("value"));

StreamingQuery query = sqlResult.writeStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "aggregated-topic")
    .option("checkpointLocation", "c:/tmp")
    .outputMode(OutputMode.Update())
    .start();
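(For context, the SQL above assumes a streaming view named users with type, user, amount, and timestamp columns. A minimal sketch of how that view might be built from the input Kafka stream follows; the topic name, the JSON schema, and the choice of the Kafka record timestamp as the event time are illustrative assumptions, not details given in the question.)

import static org.apache.spark.sql.functions.*;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

SparkSession session = SparkSession.builder()
        .appName("user-amount-aggregation")   // hypothetical app name
        .master("local[*]")
        .getOrCreate();

// Shape of the JSON payload shown in the sample records below.
StructType schema = new StructType()
        .add("id", DataTypes.IntegerType)
        .add("type", DataTypes.StringType)
        .add("user", DataTypes.StringType)
        .add("amount", DataTypes.DoubleType);

Dataset<Row> users = session.readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "users-topic")   // hypothetical input topic name
        .load()
        .select(from_json(col("value").cast("string"), schema).as("data"),
                col("timestamp"))             // Kafka record timestamp used as the event time
        .selectExpr("data.*", "timestamp");

// Register the view that the SQL query selects from.
users.createOrReplaceTempView("users");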

When I send multiple records for a particular user within a 1-minute window, I expect a single total for those events at the end of that minute.
Instead, I get multiple outputs on the output Kafka stream, with intermediate aggregates written to it.
For example, I send the following 7 records within one minute, spaced some time apart:

>{ "id" : 123, "type": "A", "user": "tukaram", "amount": 10}
>{ "id" : 123, "type": "A", "user": "tukaram", "amount": 10}
>{ "id" : 123, "type": "A", "user": "tukaram", "amount": 10}
>{ "id" : 123, "type": "A", "user": "tukaram", "amount": 10}
>{ "id" : 123, "type": "A", "user": "tukaram", "amount": 10}
>{ "id" : 123, "type": "A", "user": "tukaram", "amount": 10}
>{ "id" : 123, "type": "A", "user": "tukaram", "amount": 10}

The output I get is:

{"window":{"start":"2020-09-18T14:35:00.000+05:30","end":"2020-09-18T14:36:00.000+05:30"},"user":"tukaram","amount":10.0}
{"window":{"start":"2020-09-18T14:35:00.000+05:30","end":"2020-09-18T14:36:00.000+05:30"},"user":"tukaram","amount":20.0}
{"window":{"start":"2020-09-18T14:35:00.000+05:30","end":"2020-09-18T14:36:00.000+05:30"},"user":"tukaram","amount":40.0}
{"window":{"start":"2020-09-18T14:35:00.000+05:30","end":"2020-09-18T14:36:00.000+05:30"},"user":"tukaram","amount":60.0}
{"window":{"start":"2020-09-18T14:35:00.000+05:30","end":"2020-09-18T14:36:00.000+05:30"},"user":"tukaram","amount":70.0}

As you can see, all the outputs belong to the same window, but there are several of them.
What I want is a single output at the end of the minute:

{"window":{"start":"2020-09-18T14:35:00.000+05:30","end":"2020-09-18T14:36:00.000+05:30"},"user":"tukaram","amount":70.0}

How can I achieve that?

Answer 1 (ddrv8njm)

You need to set a processing trigger when writing the stream to the sink.
Use .trigger(Trigger.ProcessingTime(...)) on the DataStreamWriter with an appropriate trigger value:

StreamingQuery query = sqlResult.writeStream()
        .trigger(Trigger.ProcessingTime("1 minute")) //this
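For completeness, here is the writer from the question with the trigger added; apart from the trigger line and the imports, the options are unchanged:

import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;

// Same writer as in the question, with a one-minute processing-time trigger so
// a micro-batch (and hence an output) is produced once per minute rather than
// for every small batch of incoming records.
StreamingQuery query = sqlResult.writeStream()
        .trigger(Trigger.ProcessingTime("1 minute"))
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("topic", "aggregated-topic")
        .option("checkpointLocation", "c:/tmp")
        .outputMode(OutputMode.Update())
        .start();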
