我正在尝试使用同步pullapi使用googlepubsub消息。这在apache beam google pubio连接器库中提供。我想用kafkaio把消耗的信息写给kafka。我想使用flinkrunner来执行作业,因为我们在gcp之外运行这个应用程序。
我面临的问题是,在gcp pubsub中消耗的消息没有得到确认。我已经确认本地kafka示例有从gcp pubsub消费的消息。gcpdataflow中的文档表明,当管道以数据接收器(在我的例子中是kafka)终止时,数据包就完成了。
但是,由于代码是在apache flink中运行的,而不是gcp数据流,因此我认为与确认提交的消息相关的某种回调不会被触发。
我做错什么了?
pipeline
.apply("Read GCP PubSub Messages", PubsubIO.readStrings()
.fromSubscription(subscription)
)
.apply(ParseJsons.of(User.class))
.setCoder(SerializableCoder.of(User.class))
.apply("Filter-1", ParDo.of(new FilterTextFn()))
.apply(AsJsons.of(User.class).withMapper(new ObjectMapper()))
.apply("Write to Local Kafka",
KafkaIO.<Void,String>write()
.withBootstrapServers("127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094")
.withTopic("test-topic")
.withValueSerializer((StringSerializer.class))
.values()
);
2条答案
按热度按时间jdg4fx2g1#
我解决这个问题的方法是使用纪尧姆·布拉奎尔的方法(https://stackoverflow.com/users/11372593/guillaume-blaquiere)建议查看检查点。即使在管道中添加了window.into()函数,源pubsub订阅终结点也没有收到ack。
问题出在flink服务器配置上,我没有提到检查点配置。如果没有这些参数,将禁用检查点。
这些配置应该放在flink\u home/conf/flink-conf.yaml中。在添加这些条目并重新启动flink之后。在gcp pubsub监控图表中,所有积压的(未确认的消息)都变为0。
bxfogqkk2#
在pubio子类的beam文档中提到:
检查点用于将接收到的消息确认回pubsub(以便它们可以在pubsub端失效),以及在需要恢复检查点时对已使用的消息进行nack(以便pubsub将立即重新发送这些消息)。
如果ack没有链接到数据流,则应该在数据流上具有相同的行为。ack通过检查点发送。通常检查点是在流上设置的窗口。
但是,你没有设置窗口!默认情况下,窗口是全局的,只有在结束时才关闭,如果你优雅地停止你的工作(甚至,我也不确定这一点)。无论如何,一个更好的解决方案是有固定的窗口(例如5分钟)来确认每个窗口上的消息。