如何将两个不同的喷口的输出发送到同一个螺栓?

u5rb5r59  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(279)

我有两个Kafka喷口的价值观,我想发送到同一螺栓。
有可能吗?

eh57zj3b

eh57zj3b1#

是的,有可能:

TopologyBuilder b = new TopologyBuilder();
b.setSpout("topic_1", new KafkaSpout(...));
b.setSpout("topic_2", new KafkaSpout(...));
b.setBolt("bolt", new MyBolt(...)).shuffleGrouping("topic_1").shuffleGrouping("topic_2");

您也可以使用任何其他分组。
更新:
为了区分consumer bolt中的元组(即主题1或主题2),有两种可能性:
1) 您可以使用操作员ID(按照@user-4870385的建议):

if(input.getSourceComponent().equalsIgnoreCase("topic_1")) {
    //do something
} else {
    //do something
}

2) 您可以使用流名称(正如@zenbeni所建议的)。对于这种情况,两个喷口都需要声明已命名的流,并且螺栓需要通过流名称连接到喷口:

public class MyKafkaSpout extends KafkaSpout {
  final String streamName;

  public MyKafkaSpout(String stream) {
    this.streamName = stream;
  }

  // other stuff omitted

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    // compare KafkaSpout.declareOutputFields(...)
    declarer.declare(streamName, _spoutConfig.scheme.getOutputFields());
  }
}

构建拓扑,现在需要使用流名称:

TopologyBuilder b = new TopologyBuilder();
b.setSpout("topic_1", new MyKafkaSpout("stream_t1"));
b.setSpout("topic_2", new MyKafkaSpout("stream_t2"));
b.setBolt("bolt", new MyBolt(...)).shuffleGrouping("topic_1", "stream_t1").shuffleGrouping("topic_2", "stream_t2");

MyBolt 流名称现在可以用来区分输入元组:

// in my MyBolt.execute():
if(input.getSourceStreamId().equals("Topic1")) {
  // do something
} else {
  // do something
}

讨论:
虽然使用流名称的第二种方法更自然(根据@zenbeni),但第一种方法更灵活(ihmo)。流名称由spout/bolt直接声明(即,在编写spout/bolt代码时);相反,当拓扑组合在一起时(即,在使用喷口/螺栓时)分配操作员ID。
假设我们得到三个螺栓作为类文件(没有源代码)。前两个应该用作生产者,并且都用相同的名称声明输出流。如果第三个使用者通过流来区分输入元组,这将不起作用。即使两个给定的producer bolt声明不同的输出流名称,预期的输入流名称也可能在consumer bolt中硬编码,并且可能不匹配。因此,它也不起作用。但是,如果使用者bolt使用组件名称(即使它们是硬编码的)来区分传入的元组,则可以正确分配预期的组件id。
当然,可以从给定的类继承(如果没有声明的话) final 并覆盖 declareOutputFields(...) 以便分配自己的流名称。然而,这是更多的额外工作要做。

lg40wkob

lg40wkob2#

是的,这是可能的。你可以让任何一个喷口和同一个螺栓说话。参考https://storm.apache.org/documentation/tutorial.html “溪流”部分。

相关问题