三叉戟拓扑中的多个流

bxgwgixi  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(274)

我有多个不透明Kafka从不同的Kafka主题阅读。我希望所有这些流中的数据通过相同的函数集。实现这一目标的最佳方式是什么。我是否需要创建单独的流并再次将每个元组传递给同一组函数。像下面这样?

BrokerHosts zk = new ZkHosts(getZooKeeperHosts());
TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "Test");
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
TridentKafkaConfig spoutConf1 = new TridentKafkaConfig(zk, "Test1");
spoutConf1.scheme = new SchemeAsMultiScheme(new StringScheme());
OpaqueTridentKafkaSpout kafkaSpout1 = new OpaqueTridentKafkaSpout(spoutConf1);

topology.newStream("event", kafkaSpout).each(new Fields("document"), new ExtractDocumentInfo(), new Fields("id", "index", "type"));
topology.newStream("event1", kafkaSpout1).each(new Fields("document"), new ExtractDocumentInfo(), new Fields("id", "index", "type"));
ar5n3qh5

ar5n3qh51#

您可以将流合并在一起,但任何失败都将导致两个喷口重放批处理。

相关问题