我通过使用kafka producer将文件转换为字节数组来发送消息。我还需要为消息添加一些头,例如文件名、时间戳等,以便在使用者端,我可以基于文件名和其他头来处理消息。我目前正在做的是创建一个对象,并将原始消息和头 Package 在其中,然后将对象作为消息以字节数组的形式发送。我想知道是否有一种方法,我可以添加自定义标题,同时发布的消息?
hpxqektj1#
kafka v0.11.0.0增加了对自定义头的支持。您可以在创建producerrecord时添加它们,如下所示:新的producerrecord(key,value,headers,…),其中headers的类型为iterable有关详细信息,请参阅:https://issues.apache.org/jira/browse/kafka-4208https://cwiki.apache.org/confluence/display/kafka/kip-82+-+add+record+headers
biswetbf2#
您可以创建自己的小型java应用程序,将带有标头的消息发送给kafka。用intellij或任何支持的语言编写以下代码ide:-
public static void main(String[] args) throws JsonProcessingException, InterruptedException { Properties props=new Properties(); props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092"); props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName()); props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,JsonSerializer.class.getName()); KafkaProducer<String, JsonNode> producer=new KafkaProducer<String, JsonNode>(props); String json = "{ \"f1\" : \"v1\" } "; ObjectMapper mapper = new ObjectMapper(); JsonNode jsonNode = mapper.readTree(json); ProducerRecord<String,JsonNode> record =new ProducerRecord<String, JsonNode>("test topic", jsonNode); record.headers().add(new RecordHeader("key","value1".getBytes())); producer.send(record); Thread.sleep(10000); }
string json=“{\”f1\“:\”v1\“}”-这是我们要使用objectmapper发送给kafka并将其转换为jsonnode形式的键和值。record.headers().add(new recordheader(“key”,“value1”.getbytes())——这是我们发送给kafka的headers数据的键和值。要验证您的数据,您可以在kafka控制中心中检查主题并验证发送的头。
6uxekuva3#
记录级头文件是从kafka0.11.0引入的。我们可以发送每个记录中的标题列表。
List<Header> headers = Arrays.asList(new RecordHeader("header_key", "header_value".getBytes())); ProducerRecord<String, String> record = new ProducerRecord<>("topic", null, "key", "value", headers);
dxxyhpgq4#
我在从事的项目中也遇到过类似的问题,所以我创建了这个简单的库来帮助解决这个问题:https://github.com/leandronunes85/messaging. 目前包含一个基于avro的实现,但是可以扩展到使用您选择的任何其他序列化框架。您只需为流中想要的对象创建(反)序列化程序(基于avro或不基于avro),并让avromessageserializer发挥其魔力。这仍然是一个非常年轻的图书馆,但我觉得它可以节省很多人很多时间!
z4bn682m5#
Kafka是不可知论的信息内容,并没有提供任何特殊的手段来丰富它,所以这是你需要自己做的事情。处理这些事情的一种常见方法是使用结构化格式,如json、avro或类似的格式,您可以自由地定义必要的字段,并且可以轻松地将元数据添加到消息中并将其发送给kafka代理。
5条答案
按热度按时间hpxqektj1#
kafka v0.11.0.0增加了对自定义头的支持。
您可以在创建producerrecord时添加它们,如下所示:
新的producerrecord(key,value,headers,…),其中headers的类型为iterable
有关详细信息,请参阅:
https://issues.apache.org/jira/browse/kafka-4208
https://cwiki.apache.org/confluence/display/kafka/kip-82+-+add+record+headers
biswetbf2#
您可以创建自己的小型java应用程序,将带有标头的消息发送给kafka。用intellij或任何支持的语言编写以下代码ide:-
string json=“{\”f1\“:\”v1\“}”-这是我们要使用objectmapper发送给kafka并将其转换为jsonnode形式的键和值。record.headers().add(new recordheader(“key”,“value1”.getbytes())——这是我们发送给kafka的headers数据的键和值。要验证您的数据,您可以在kafka控制中心中检查主题并验证发送的头。
6uxekuva3#
记录级头文件是从kafka0.11.0引入的。我们可以发送每个记录中的标题列表。
dxxyhpgq4#
我在从事的项目中也遇到过类似的问题,所以我创建了这个简单的库来帮助解决这个问题:https://github.com/leandronunes85/messaging. 目前包含一个基于avro的实现,但是可以扩展到使用您选择的任何其他序列化框架。
您只需为流中想要的对象创建(反)序列化程序(基于avro或不基于avro),并让avromessageserializer发挥其魔力。
这仍然是一个非常年轻的图书馆,但我觉得它可以节省很多人很多时间!
z4bn682m5#
Kafka是不可知论的信息内容,并没有提供任何特殊的手段来丰富它,所以这是你需要自己做的事情。处理这些事情的一种常见方法是使用结构化格式,如json、avro或类似的格式,您可以自由地定义必要的字段,并且可以轻松地将元数据添加到消息中并将其发送给kafka代理。