Apache·Flink和Apache脉冲星

to94eoyn  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(484)

我用flink读取apachepulsar的数据。我在pulsar有一个分区主题,有8个分区。我在这个主题中生成了1000条消息,分布在8个分区中。我的笔记本电脑有8个内核,所以我有8个子任务(默认情况下并行度=#个内核)。在执行eclipse的代码之后,我打开了flinkui,我发现一些子任务没有收到任何记录(空闲)。我期望所有8个子任务都能被利用(我期望每个子任务都Map到我的主题中的一个分区)。
重新启动作业后,我发现有时使用3个子任务,有时使用4个任务,而其余子任务保持空闲。
请您支持澄清这一情况。
另外,我怎么知道有一个洗牌子采取与否?
我的代码:

ConsumerConfigurationData<String> consumerConfigurationData = new ConsumerConfigurationData<>();

Set<String> topicsSet = new HashSet<>();
topicsSet.add("flink-08");

consumerConfigurationData.setTopicNames(topicsSet);
consumerConfigurationData.setSubscriptionName("my-sub0111");
consumerConfigurationData.setSubscriptionType(SubscriptionType.Key_Shared);
consumerConfigurationData.setConsumerName("consumer-01");
consumerConfigurationData.setSubscriptionInitialPosition(SubscriptionInitialPosition.Earliest);

PulsarSourceBuilder<String> builder = PulsarSourceBuilder.builder(new SimpleStringSchema()).pulsarAllConsumerConf(consumerConfigurationData).serviceUrl("pulsar://localhost:6650");

SourceFunction<String> src = builder.build();
DataStream<String> stream = env.addSource(src);

stream.print(" >>> ");
ruarlubt

ruarlubt1#

关于脉冲星的问题,我知道的还不够。我建议设置一个更大的测试,看看结果如何。通常,您的分区比插槽多,并且一些插槽以某种随机的方式使用多个分区。
另外,我怎么知道有一个洗牌子采取与否?
最简单的方法是查看flinkwebui的拓扑结构。在那里您应该可以看到任务的数量和通道类型。你可以发布一个截图,如果你想要更多的细节,但在这种情况下,没有什么会被洗牌,因为你只有一个源和一个接收器。

相关问题