我想用nifi中的consumekafka来消费一个主题。从中,我想取出一个键值对,并使用putkafka将其放入另一个kafka主题中。我如何做到这一点,并避免重复的基础上,一个特定的关键?
zhte4eai1#
如果来自Kafka流的消息将以标准格式接收,这将是一件容易的事情。如果接收的消息是json dormat格式的,下面是执行此操作的步骤。配置 ConsumeKafka 从一个主题接收使用 EvaluateJsonPath 解析和读取特定键的值,并将其分配给nifi flowfile属性使用 ReplaceText 处理器来形成您自己的消息(其中包含已解析的密钥),您希望将其发送到另一个Kafka主题将流程与 PutKafka 有关evaluatejsonpath的更多详细信息,请参见此处和此处
ConsumeKafka
EvaluateJsonPath
ReplaceText
PutKafka
1条答案
按热度按时间zhte4eai1#
如果来自Kafka流的消息将以标准格式接收,这将是一件容易的事情。如果接收的消息是json dormat格式的,下面是执行此操作的步骤。
配置
ConsumeKafka
从一个主题接收使用
EvaluateJsonPath
解析和读取特定键的值,并将其分配给nifi flowfile属性使用
ReplaceText
处理器来形成您自己的消息(其中包含已解析的密钥),您希望将其发送到另一个Kafka主题将流程与
PutKafka
有关evaluatejsonpath的更多详细信息,请参见此处和此处