我是Apache Kafka新手,从Kafka主题中阅读了以下消息:
{
"ordertime": 1497014222380,
"orderid": 18,
"itemid": "Item_184"
}
字符串
我只需要更新ordertime
字段,但不知道如何将消息传递到这个Kafka主题。那么,我应该在Apache Camel中做这样的东西吗?:
Producer<String, String> producer = null;
Properties props = new Properties();
props.put("ordertime", "123245545454");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<>(props);
try {
producer = new KafkaProducer<String, String>(props);
} catch (Exception e) {
e.printStackTrace();
}
producer.close();
型
2条答案
按热度按时间wqlqzqxt1#
您通常会创建一个Map到JSON有效负载的Bean
字符串
然后,您将使用
Producer<String, Order>
(假设您的密钥类型为String)型
参见the kafka json serializer docs
jdgnovmf2#
如果您使用的是Camel,那么您可以使用Processor来拦截和转换数据
https://camel.apache.org/manual/processor.html
您的其他选项在https://camel.apache.org/components/3.21.x/eips/transform-eip.html中提到
否则,在Kafka中,您可以使用Kafka Streams做同样的事情,使用
map()
函数https://kafka.apache.org/35/documentation/streams/developer-guide/dsl-api.html的