如何在Apache Camel中更新Kafka主题JSON值?

yzxexxkh  于 2023-08-05  发布在  Apache
关注(0)|答案(2)|浏览(135)

我是Apache Kafka新手,从Kafka主题中阅读了以下消息:

{
    "ordertime": 1497014222380,
    "orderid": 18,
    "itemid": "Item_184"
}

字符串
我只需要更新ordertime字段,但不知道如何将消息传递到这个Kafka主题。那么,我应该在Apache Camel中做这样的东西吗?:

Producer<String, String> producer = null;

    Properties props = new Properties();
    props.put("ordertime", "123245545454");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    producer = new KafkaProducer<>(props);

    try {
        producer = new KafkaProducer<String, String>(props);
    } catch (Exception e) {
        e.printStackTrace();
    }
    
    producer.close();

wqlqzqxt

wqlqzqxt1#

您通常会创建一个Map到JSON有效负载的Bean

public class Order {
   private Long ordertime;
   private Long orderid;
   private Long itemid;

   public Order(Long ordertime, Long orderid, Long itemid) { ... }

   // getters
}

字符串
然后,您将使用Producer<String, Order>(假设您的密钥类型为String)

props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer");
props.put("schema.registry.url", "http://127.0.0.1:8081");
Producer<String, Order> producer = new KafkaProducer<>(props);
...
Order order = new Order(1, 2, 3);
String key = "someString";
producer.send(key, order);


参见the kafka json serializer docs

jdgnovmf

jdgnovmf2#

如果您使用的是Camel,那么您可以使用Processor来拦截和转换数据
https://camel.apache.org/manual/processor.html
您的其他选项在https://camel.apache.org/components/3.21.x/eips/transform-eip.html中提到
否则,在Kafka中,您可以使用Kafka Streams做同样的事情,使用map()函数
https://kafka.apache.org/35/documentation/streams/developer-guide/dsl-api.html

相关问题