在kafka消息中添加自定义头

jhkqcmku  于 2021-06-08  发布在  Kafka
关注(0)|答案(5)|浏览(2051)

我通过使用kafka producer将文件转换为字节数组来发送消息。
我还需要为消息添加一些头,例如文件名、时间戳等,以便在使用者端,我可以基于文件名和其他头来处理消息。
我目前正在做的是创建一个对象,并将原始消息和头 Package 在其中,然后将对象作为消息以字节数组的形式发送。
我想知道是否有一种方法,我可以添加自定义标题,同时发布的消息?

hpxqektj

hpxqektj1#

kafka v0.11.0.0增加了对自定义头的支持。
您可以在创建producerrecord时添加它们,如下所示:
新的producerrecord(key,value,headers,…),其中headers的类型为iterable
有关详细信息,请参阅:
https://issues.apache.org/jira/browse/kafka-4208
https://cwiki.apache.org/confluence/display/kafka/kip-82+-+add+record+headers

biswetbf

biswetbf2#

您可以创建自己的小型java应用程序,将带有标头的消息发送给kafka。用intellij或任何支持的语言编写以下代码ide:-

public static void main(String[] args) throws JsonProcessingException, 
InterruptedException {
  Properties props=new Properties();       
  props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
  props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
  JsonSerializer.class.getName());
  props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,JsonSerializer.class.getName());

  KafkaProducer<String, JsonNode> producer=new KafkaProducer<String, JsonNode>(props);
  String json = "{ \"f1\" : \"v1\" } ";
  ObjectMapper mapper = new ObjectMapper();
  JsonNode jsonNode = mapper.readTree(json);
  ProducerRecord<String,JsonNode> record =new ProducerRecord<String, 
  JsonNode>("test topic", jsonNode);
  record.headers().add(new RecordHeader("key","value1".getBytes()));
  producer.send(record);
  Thread.sleep(10000);
}

string json=“{\”f1\“:\”v1\“}”-这是我们要使用objectmapper发送给kafka并将其转换为jsonnode形式的键和值。record.headers().add(new recordheader(“key”,“value1”.getbytes())——这是我们发送给kafka的headers数据的键和值。要验证您的数据,您可以在kafka控制中心中检查主题并验证发送的头。

6uxekuva

6uxekuva3#

记录级头文件是从kafka0.11.0引入的。我们可以发送每个记录中的标题列表。

List<Header> headers = Arrays.asList(new RecordHeader("header_key", "header_value".getBytes()));
ProducerRecord<String, String> record = new ProducerRecord<>("topic", null, "key", "value", headers);
dxxyhpgq

dxxyhpgq4#

我在从事的项目中也遇到过类似的问题,所以我创建了这个简单的库来帮助解决这个问题:https://github.com/leandronunes85/messaging. 目前包含一个基于avro的实现,但是可以扩展到使用您选择的任何其他序列化框架。
您只需为流中想要的对象创建(反)序列化程序(基于avro或不基于avro),并让avromessageserializer发挥其魔力。
这仍然是一个非常年轻的图书馆,但我觉得它可以节省很多人很多时间!

z4bn682m

z4bn682m5#

Kafka是不可知论的信息内容,并没有提供任何特殊的手段来丰富它,所以这是你需要自己做的事情。处理这些事情的一种常见方法是使用结构化格式,如json、avro或类似的格式,您可以自由地定义必要的字段,并且可以轻松地将元数据添加到消息中并将其发送给kafka代理。

相关问题