我有一个主题,其中所有日志都被推到集中的主题,但如果可能的话,我想过滤掉一些记录到一个单独的主题和集群中。谢谢
dy2hfwbg1#
kafka streams不允许使用来自不同kafka群集的源和输出主题创建流。所以下面的代码将不适用于您
streamsBuilder.stream(sourceTopicName).filter(..).to(outputTopicName)
在本例中,它期望outputtopicname与topic sourcetopicname来自同一集群。作为一种解决方法,为了将消息从另一个集群发送到输出主题,您可以使用另外创建的kafkaproducer with property bootstrap.servers 指向外部集群和 KStream.foreach() 方法。
bootstrap.servers
KStream.foreach()
streamsBuilder.stream(sourceTopicName) .filter((key, value) -> ..) .foreach((key, value) -> sendMessage(kafkaProducerFromAnotherCluster, destinationTopicName, key, value); public static void sendMessage(KafkaProducer<String, String> kafkaProducer, String destinationTopicName, String key, String value) { try { kafkaProducer.send(new ProducerRecord(destinationTopicName, key, value)); } catch (RuntimeException ex) { log.error(errorMessage, ex); } }
另一个选项是在kafka集群中创建输出主题,该主题将过滤消息,并在两个集群之间设置kafka镜像(因此消息将从一个主题复制到另一个集群的第二个主题)。
1条答案
按热度按时间dy2hfwbg1#
kafka streams不允许使用来自不同kafka群集的源和输出主题创建流。所以下面的代码将不适用于您
在本例中,它期望outputtopicname与topic sourcetopicname来自同一集群。
作为一种解决方法,为了将消息从另一个集群发送到输出主题,您可以使用另外创建的kafkaproducer with property
bootstrap.servers
指向外部集群和KStream.foreach()
方法。另一个选项是在kafka集群中创建输出主题,该主题将过滤消息,并在两个集群之间设置kafka镜像(因此消息将从一个主题复制到另一个集群的第二个主题)。