如何在kafka流中使用特定于示例的主题?

b09cbbtk  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(325)

我有这样的拓扑结构:

Topology topology = new Topology();

//WS connection processor
topology.addSource(WS_CONNECTION_SOURCE, new StringDeserializer(), new WebSocketConnectionEventDeserializer(), KafkaTopics.WS_CONNECTION_EVENTS_TOPIC)
    .addProcessor(SESSION_PROCESSOR, WSUserSessionProcessor::new, WS_CONNECTION_SOURCE)
    .addStateStore(sessionStoreBuilder, SESSION_PROCESSOR)
    .addSink(WS_STATUS_SINK, KafkaTopics.WS_USER_ONLINE_STATUS_TOPIC, stringSerializer, stringSerializer, SESSION_PROCESSOR)

//WS session routing
    .addSource(WS_ROUTING_BY_SESSION_SOURCE, new StringDeserializer(), new StringDeserializer(),
                    KafkaTopics.WS_DELIVERY_TOPIC)
    .addProcessor(WS_ROUTING_BY_SESSION_PROCESSOR, WSSessionRoutingProcessor::new,
                    WS_ROUTING_BY_SESSION_SOURCE)
    .addStateStore(userConnectedNodesStoreBuilder, WS_ROUTING_BY_SESSION_PROCESSOR, SESSION_PROCESSOR)

//WS delivery
    .addSource(WS_DELIVERY_SOURCE, new StringDeserializer(), new StringDeserializer(),
                    INSTANCE_SPECIFIC_TOPIC)
    .addProcessor(WS_DELIVERY_PROCESSOR,  WSDeliveryProcessor::new, WS_DELIVERY_SOURCE);

拓扑中最后提到的源是特定于每个应用程序示例的主题。我希望该主题仅由该示例处理。此主题中的数据由前一个处理器根据处理该消息的示例推送。
但一旦流启动,它就会尝试将特定于示例的主题分区也分配给其他示例。我们能在Kafka流中达到这个要求吗?
我希望一个主题只能由特定示例处理。

nzrxty8p

nzrxty8p1#

你想要的是不可能的。对于kafka streams程序,同一应用程序的所有示例都必须完全相同,因此需要具有相同的输入主题。
您需要将应用程序拆分为4个应用程序:第一个应用程序执行程序的共享分区并写入3个不同的主题。此外,您还有3个应用程序(具有自己的 application.id s) 每个人从一本书中读到这些主题。
请注意,如果需要,可以运行多个 KafkaStreams 同一jvm中的客户端。

相关问题