如何将Kafka的信息从一个主题传递到另一个主题?

zbdgwd5y  于 2021-06-05  发布在  Kafka
关注(0)|答案(3)|浏览(590)

假设我的制作人正在将消息写入主题a…一旦消息位于主题a中,我就要将相同的消息复制到主题b。这在Kafka有可能吗?

ezykj2lf

ezykj2lf1#

将一个主题的内容转发给另一个主题有两个即时选项:
利用kafka的流特性在两个主题之间建立转发链接。
通过创建一个消费者/生产者对,并使用它们来接收和转发消息
我有一段简短的代码(在scala中)显示了这两个方面:

def topologyPlan(): StreamsBuilder = {
    val builder = new StreamsBuilder
    val inputTopic: KStream[String, String] = builder.stream[String, String]("topic2")
    inputTopic.to("topic3")
    builder
  }

  def run() = {
    val kafkaStreams = createStreams(topologyPlan())
    kafkaStreams.start()

    val kafkaConsumer = createConsumer()
    val kafkaProducer = createProducer()
    kafkaConsumer.subscribe(List("topic1").asJava)
    while (true) {
      val record = kafkaConsumer.poll(Duration.ofSeconds(5)).asScala
      for (data <- record.iterator) {
        kafkaProducer.send(new ProducerRecord[String, String]("topic2", data.value()))
      }
    }
  }

在run方法中,前两行设置了一个streams对象,该对象使用topologyplan()侦听“topic2”中的消息,然后转发到“topic3”。
剩下的几行显示了消费者如何收听“topic1”并使用制作人将其发送到“topic2”。
这里示例的最后一点是,kafka非常灵活,允许您根据需要混合选项,因此上面的代码将在“topic1”中接收消息,并通过“topic2”将它们发送到“topic3”。
如果您想查看设置使用者、生产者和流的代码,请参阅此处的完整类。

nbysray5

nbysray52#

如果我理解正确,你只需要 stream.to("topic-b") 不过,如果不对数据做些什么,这似乎很奇怪。
注:
指定的主题应在使用前手动创建

svmlkihl

svmlkihl3#

我不清楚通过简单地将数据从一个主题复制到另一个主题,您到底想要实现什么用例。如果两个主题都在同一个kafka集群中,那么让两个主题具有相同的消息/内容永远不是一个好主意。
我相信这里的差距是,你可能不清楚Kafka的消费群体的概念。通过使用Kafka主题的信息,您可能有两个动作项要做。您认为,如果第一个应用程序使用来自kafka主题的消息,那么第二个应用程序是否可以使用相同的消息。kafka允许您在消费者群体的帮助下解决这种常见用例。
让我们尝试区分其他消息队列和kafka,您将了解不需要在两个主题之间复制相同的数据/消息。
在其他消息队列中,如sqs(简单队列服务),如果消息被使用者使用,则同一消息不可被其他使用者使用。消费者有责任在处理完信息后安全地删除信息。通过这样做,我们可以保证相同的消息不会被两个使用者处理而导致不一致。
但是,在Kafka,有多组消费者从同一主题消费是完全好的。这组消费者形成了一个群体,通常称为消费者群体。在这里,消费者组中的一个消费者可以根据消息所使用的kafka主题的分区来处理消息。
现在的关键是,我们可以有多个消费群体从同一个Kafka主题消费。每个消费群体都将以他们想要的方式处理消息。两个不同消费群体的消费者之间没有干扰。
为了实现您的用例,我相信您可能需要两个消费者组,他们可以简单地以他们想要的方式处理消息。基本上不必在两个主题之间复制数据。
希望这有帮助。

相关问题