Why does the Kafka consumer read the value as a String, byte array, or JSON, even though we send the data as an object?

qncylg1j  posted on 2021-08-25 in Java
Follow(0) | Answers(1) | Views(514)

Here you can find the controller that sends the data with a String key and a JSON value.
I am trying to understand why the consumer sometimes reads this value as a String, sometimes as JSON, and sometimes as a byte array. My controller:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Headers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class HelloKafkaController {

    private static final Logger logger =
            LoggerFactory.getLogger(HelloKafkaController.class);

    private final KafkaTemplate<String, Object> template;
    private final String topicName;
    private final int messagesPerRequest;
    private CountDownLatch latch;

    public HelloKafkaController(
            final KafkaTemplate<String, Object> template,
            @Value("${tpd.topic-name}") final String topicName,
            @Value("${tpd.messages-per-request}") final int messagesPerRequest) {
        this.template = template;
        this.topicName = topicName;
        this.messagesPerRequest = messagesPerRequest;
    }

    @GetMapping("/hello")
    public String hello() throws Exception {
        latch = new CountDownLatch(messagesPerRequest);
        // send messagesPerRequest records with a String key and a PracticalAdvice object as value
        IntStream.range(0, messagesPerRequest)
                .forEach(i -> this.template.send(topicName, String.valueOf(i),
                        new PracticalAdvice("A Practical Advice", i))
                );
        latch.await(60, TimeUnit.SECONDS);
        logger.info("All messages received");
        return "Hello Kafka!";
    }

    // value deserialized as a PracticalAdvice object (JSON)
    @KafkaListener(topics = "advice-topic", clientIdPrefix = "json",
            containerFactory = "kafkaListenerContainerFactory")
    public void listenAsObject(ConsumerRecord<String, PracticalAdvice> cr,
                               @Payload PracticalAdvice payload) {
        logger.info("Logger 1 [JSON] received key {}: Type [{}] | Payload: {} | Record: {}", cr.key(),
                typeIdHeader(cr.headers()), payload, cr.toString());
        latch.countDown();
    }

    // value deserialized as the raw JSON String
    @KafkaListener(topics = "advice-topic", clientIdPrefix = "string",
            containerFactory = "kafkaListenerStringContainerFactory")
    public void listenAsString(ConsumerRecord<String, String> cr,
                               @Payload String payload) {
        logger.info("Logger 2 [String] received key {}: Type [{}] | Payload: {} | Record: {}", cr.key(),
                typeIdHeader(cr.headers()), payload, cr.toString());
        latch.countDown();
    }

    // value left as the untouched byte[]
    @KafkaListener(topics = "advice-topic", clientIdPrefix = "bytearray",
            containerFactory = "kafkaListenerByteArrayContainerFactory")
    public void listenAsByteArray(ConsumerRecord<String, byte[]> cr,
                                  @Payload byte[] payload) {
        logger.info("Logger 3 [ByteArray] received key {}: Type [{}] | Payload: {} | Record: {}", cr.key(),
                typeIdHeader(cr.headers()), payload, cr.toString());
        latch.countDown();
    }

    // reads the __TypeId__ header that the producer-side JsonSerializer adds to each record
    private static String typeIdHeader(Headers headers) {
        return StreamSupport.stream(headers.spliterator(), false)
                .filter(header -> header.key().equals("__TypeId__"))
                .findFirst().map(header -> new String(header.value())).orElse("N/A");
    }
}
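The PracticalAdvice class is not included in the post; judging from the JSON value and the toString() output visible in the logs below, a plain DTO along these lines would fit (a hypothetical reconstruction, not the actual class from the project):

import com.fasterxml.jackson.annotation.JsonProperty;

public class PracticalAdvice {

    private String message;
    private int identifier;

    public PracticalAdvice() {
        // no-arg constructor so Jackson can instantiate it when deserializing JSON
    }

    public PracticalAdvice(@JsonProperty("message") String message,
                           @JsonProperty("identifier") int identifier) {
        this.message = message;
        this.identifier = identifier;
    }

    public String getMessage() { return message; }
    public int getIdentifier() { return identifier; }

    @Override
    public String toString() {
        return "PracticalAdvice::toString() {message='" + message + "', identifier=" + identifier + "}";
    }
}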
Logs:

INFO 15292 --- [ntainer#1-0-C-1] i.tpd.kafkaexample.HelloKafkaController : Logger 2 [String] received key 0: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: {"message":"A Practical Advice","identifier":0} | Record: ConsumerRecord(topic = advice-topic, partition = 2, offset = 44, CreateTime = 1542911788418, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 0, value = {"message":"A Practical Advice","identifier":0})
INFO 15292 --- [ntainer#2-0-C-1] i.tpd.kafkaexample.HelloKafkaController : Logger 3 [ByteArray] received key 1: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: [123, 34, 109, 101, 115, 115, 97, 103, 101, 34, 58, 34, 65, 32, 80, 114, 97, 99, 116, 105, 99, 97, 108, 32, 65, 100, 118, 105, 99, 101, 34, 44, 34, 105, 100, 101, 110, 116, 105, 102, 105, 101, 114, 34, 58, 49, 125] | Record: ConsumerRecord(topic = advice-topic, partition = 0, offset = 44, CreateTime = 1542911788422, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 1, value = [B@39113414)
INFO 15292 --- [ntainer#1-0-C-1] i.tpd.kafkaexample.HelloKafkaController : Logger 2 [String] received key 2: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: {"message":"A Practical Advice","identifier":2} | Record: ConsumerRecord(topic = advice-topic, partition = 2, offset = 45, CreateTime = 1542911788422, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 2, value = {"message":"A Practical Advice","identifier":2})
INFO 15292 --- [ntainer#2-0-C-1] i.tpd.kafkaexample.HelloKafkaController : Logger 3 [ByteArray] received key 5: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: [123, 34, 109, 101, 115, 115, 97, 103, 101, 34, 58, 34, 65, 32, 80, 114, 97, 99, 116, 105, 99, 97, 108, 32, 65, 100, 118, 105, 99, 101, 34, 44, 34, 105, 100, 101, 110, 116, 105, 102, 105, 101, 114, 34, 58, 53, 125] | Record: ConsumerRecord(topic = advice-topic, partition = 0, offset = 45, CreateTime = 1542911788422, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 5, value = [B@476e998b)
INFO 15292 --- [ntainer#1-0-C-1] i.tpd.kafkaexample.HelloKafkaController : Logger 2 [String] received key 3: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: {"message":"A Practical Advice","identifier":3} | Record: ConsumerRecord(topic = advice-topic, partition = 2, offset = 46, CreateTime = 1542911788422, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 3, value = {"message":"A Practical Advice","identifier":3})
INFO 15292 --- [ntainer#2-0-C-1] i.tpd.kafkaexample.HelloKafkaController : Logger 3 [ByteArray] received key 7: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: [123, 34, 109, 101, 115, 115, 97, 103, 101, 34, 58, 34, 65, 32, 80, 114, 97, 99, 116, 105, 99, 97, 108, 32, 65, 100, 118, 105, 99, 101, 34, 44, 34, 105, 100, 101, 110, 116, 105, 102, 105, 101, 114, 34, 58, 55, 125] | Record: ConsumerRecord(topic = advice-topic, partition = 0, offset = 46, CreateTime = 1542911788423, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 7, value = [B@7a229d60)
INFO 15292 --- [ntainer#1-0-C-1] i.tpd.kafkaexample.HelloKafkaController : Logger 2 [String] received key 9: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: {"message":"A Practical Advice","identifier":9} | Record: ConsumerRecord(topic = advice-topic, partition = 2, offset = 47, CreateTime = 1542911788423, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 9, value = {"message":"A Practical Advice","identifier":9})
INFO 15292 --- [ntainer#2-0-C-1] i.tpd.kafkaexample.HelloKafkaController : Logger 3 [ByteArray] received key 8: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: [123, 34, 109, 101, 115, 115, 97, 103, 101, 34, 58, 34, 65, 32, 80, 114, 97, 99, 116, 105, 99, 97, 108, 32, 65, 100, 118, 105, 99, 101, 34, 44, 34, 105, 100, 101, 110, 116, 105, 102, 105, 101, 114, 34, 58, 56, 125] | Record: ConsumerRecord(topic = advice-topic, partition = 0, offset = 47, CreateTime = 1542911788423, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 8, value = [B@536adff4)
INFO 15292 --- [ntainer#0-0-C-1] i.tpd.kafkaexample.HelloKafkaController : Logger 1 [JSON] received key 4: Type [N/A] | Payload: PracticalAdvice::toString() {message='A Practical Advice', identifier=4} | Record: ConsumerRecord(topic = advice-topic, partition = 1, offset = 22, CreateTime = 1542911788422, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [], isReadOnly = false), key = 4, value = PracticalAdvice::toString() {message='A Practical Advice', identifier=4})
INFO 15292 --- [ntainer#0-0-C-1] i.tpd.kafkaexample.HelloKafkaController : Logger 1 [JSON] received key 6: Type [N/A] | Payload: PracticalAdvice::toString() {message='A Practical Advice', identifier=6} | Record: ConsumerRecord(topic = advice-topic, partition = 1, offset = 23, CreateTime = 1542911788422, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [], isReadOnly = false), key = 6, value = PracticalAdvice::toString() {message='A Practical Advice', identifier=6})
INFO 15292 --- [nio-8080-exec-1] i.tpd.kafkaexample.HelloKafkaController : All messages received
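As a side note on these logs: the __TypeId__ header shown in each record is simply the producer-side class name encoded as bytes, which is exactly what the typeIdHeader() helper in the controller decodes. A quick standalone check (an illustrative snippet, with the byte values copied from the log output above):

import java.nio.charset.StandardCharsets;

public class TypeIdHeaderDecode {
    public static void main(String[] args) {
        // byte values copied from the __TypeId__ RecordHeader in the log lines above
        byte[] typeId = {105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97,
                109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118,
                105, 99, 101};
        // prints: io.tpd.kafkaexample.PracticalAdvice
        System.out.println(new String(typeId, StandardCharsets.UTF_8));
    }
}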

ohfgkhjo1#

You have three consumers (three @KafkaListener methods, each running on its own thread) reading from the topic "advice-topic", and you are asking for the message to be deserialized as JSON, as a String, and as a byte array.
From the @KafkaListener documentation: clientIdPrefix (java.lang.String), when provided, overrides the client id property in the consumer factory configuration.
What are you trying to achieve?
Kafka itself only knows byte arrays; it is the deserializer you define that tells it which "glasses" to look at the data through.
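Those deserializers are chosen in the three containerFactory beans referenced by the @KafkaListener annotations. Their configuration is not shown in the post, but a minimal sketch of how they might be wired could look like this (the bean names come from the question; the use of Spring Boot's auto-configured KafkaProperties and everything else here is an assumption for illustration):

import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

@Configuration
public class KafkaConsumerConfig {

    // Logger 1: bytes -> PracticalAdvice object via JsonDeserializer
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, PracticalAdvice> kafkaListenerContainerFactory(
            KafkaProperties kafkaProperties) {
        ConcurrentKafkaListenerContainerFactory<String, PracticalAdvice> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(
                kafkaProperties.buildConsumerProperties(),
                new StringDeserializer(),
                new JsonDeserializer<>(PracticalAdvice.class)));
        return factory;
    }

    // Logger 2: bytes -> raw JSON text via StringDeserializer
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerStringContainerFactory(
            KafkaProperties kafkaProperties) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(
                kafkaProperties.buildConsumerProperties(),
                new StringDeserializer(),
                new StringDeserializer()));
        return factory;
    }

    // Logger 3: bytes kept as-is via ByteArrayDeserializer
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, byte[]> kafkaListenerByteArrayContainerFactory(
            KafkaProperties kafkaProperties) {
        ConcurrentKafkaListenerContainerFactory<String, byte[]> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(
                kafkaProperties.buildConsumerProperties(),
                new StringDeserializer(),
                new ByteArrayDeserializer()));
        return factory;
    }
}

All three listeners pull the same raw bytes from advice-topic; only the value deserializer wired into each consumer factory decides whether a record comes out as a PracticalAdvice object, the raw JSON text, or an untouched byte[]. Note also that in your logs each record shows up in only one of the three listeners (Logger 1 always on partition 1, Logger 2 on partition 2, Logger 3 on partition 0), which suggests the three consumers share a group id and are splitting the topic's partitions between them; with distinct group ids each listener would receive every message, each in its own representation.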
