我有一个使用pubsubio读取消息的管道,这个管道发布了很多消息,但不幸的是我没有得到确认。这是读取消息的代码
public CampaignMetricStream() {
super(Options.class);
}
@Override
public PCollection<String> source(Pipeline pipeline, Options options) {
return pipeline.apply("pubsub-pull-events", PubsubIO
.readStrings()
.fromSubscription(options.getSubscription()));
}
我很确定这不是代码中的问题,而是配置中的问题。
flink配置在k8s集群上
taskmanager.numberOfTaskSlots: 64
taskmanager.memory.managed.size: 0
jobmanager.memory.process.size: 1g
taskmanager.memory.process.size: 50g
parallelism.default: 2
state.backend: filesystem
state.checkpoints.dir: file:///data/flink/checkpoints
taskmanager.memory.jvm-metaspace.size: 4g
管道的部署参数如下:
--runner=FlinkRunner
--flinkMaster=localhost:8081
--checkpointingInterval=30000
--parallelism=20
--numConcurrentCheckpoints=200
--autoBalanceWriteFilesShardingEnabled=true
我有很多未送达的信息,我找不到解决办法。
暂无答案!
目前还没有任何答案,快来回答吧!