Why does the Kafka consumer read the value as a String, byte array, or JSON even though we send the data as an object?

Asked by qncylg1j on 2021-08-25 · Java · 1 answer · 444 views

Below is the controller that sends data with a String key and a JSON value.
I am trying to understand why the consumers sometimes read this value as a String, sometimes as JSON, and sometimes as a byte array. My controller:

@RestController
public class HelloKafkaController {

private static final Logger logger =
        LoggerFactory.getLogger(HelloKafkaController.class);

private final KafkaTemplate<String, Object> template;
private final String topicName;
private final int messagesPerRequest;
private CountDownLatch latch;

public HelloKafkaController(
        final KafkaTemplate<String, Object> template,
        @Value("${tpd.topic-name}") final String topicName,
        @Value("${tpd.messages-per-request}") final int messagesPerRequest) {
    this.template = template;
    this.topicName = topicName;
    this.messagesPerRequest = messagesPerRequest;
}

@GetMapping("/hello")
public String hello() throws Exception {
    latch = new CountDownLatch(messagesPerRequest);
    IntStream.range(0, messagesPerRequest)
            .forEach(i -> this.template.send(topicName, String.valueOf(i),
                    new PracticalAdvice("A Practical Advice", i))
            );
    latch.await(60, TimeUnit.SECONDS);
    logger.info("All messages received");
    return "Hello Kafka!";
}

@KafkaListener(topics = "advice-topic", clientIdPrefix = "json",
        containerFactory = "kafkaListenerContainerFactory")
public void listenAsObject(ConsumerRecord<String, PracticalAdvice> cr,
                           @Payload PracticalAdvice payload) {
    logger.info("Logger 1 [JSON] received key {}: Type [{}] | Payload: {} | Record: {}", cr.key(),
            typeIdHeader(cr.headers()), payload, cr.toString());
    latch.countDown();
}

@KafkaListener(topics = "advice-topic", clientIdPrefix = "string",
        containerFactory = "kafkaListenerStringContainerFactory")
public void listenAsString(ConsumerRecord<String, String> cr,
                           @Payload String payload) {
    logger.info("Logger 2 [String] received key {}: Type [{}] | Payload: {} | Record: {}", cr.key(),
            typeIdHeader(cr.headers()), payload, cr.toString());
    latch.countDown();
}

@KafkaListener(topics = "advice-topic", clientIdPrefix = "bytearray",
        containerFactory = "kafkaListenerByteArrayContainerFactory")
public void listenAsByteArray(ConsumerRecord<String, byte[]> cr,
                              @Payload byte[] payload) {
    logger.info("Logger 3 [ByteArray] received key {}: Type [{}] | Payload: {} | Record: {}", cr.key(),
            typeIdHeader(cr.headers()), payload, cr.toString());
    latch.countDown();
}

private static String typeIdHeader(Headers headers) {
    return StreamSupport.stream(headers.spliterator(), false)
            .filter(header -> header.key().equals("__TypeId__"))
            .findFirst().map(header -> new String(header.value())).orElse("N/A");
}

}
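The three `containerFactory` names referenced by the listeners above must resolve to container factory beans defined elsewhere in the application. A minimal sketch of what such a configuration might look like (only the bean names and the `PracticalAdvice` target type come from the listeners above; the `KafkaProperties`-based wiring is an assumption, not the asker's actual config):

```java
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

@Configuration
public class KafkaConsumerConfig {

    // JSON view: JsonDeserializer maps the stored bytes onto a PracticalAdvice.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(
            KafkaProperties props) {
        var factory = new ConcurrentKafkaListenerContainerFactory<String, Object>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(
                props.buildConsumerProperties(),
                new StringDeserializer(),
                new JsonDeserializer<>(PracticalAdvice.class)));
        return factory;
    }

    // String view: StringDeserializer decodes the same bytes as UTF-8 text.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerStringContainerFactory(
            KafkaProperties props) {
        var factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(
                props.buildConsumerProperties(),
                new StringDeserializer(),
                new StringDeserializer()));
        return factory;
    }

    // Byte-array view: ByteArrayDeserializer hands over the raw bytes untouched.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, byte[]> kafkaListenerByteArrayContainerFactory(
            KafkaProperties props) {
        var factory = new ConcurrentKafkaListenerContainerFactory<String, byte[]>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(
                props.buildConsumerProperties(),
                new StringDeserializer(),
                new ByteArrayDeserializer()));
        return factory;
    }
}
```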
Logs:

INFO 15292 --- [ntainer#1-0-C-1] i.tpd.kafkaexample.HelloKafkaController  : Logger 2 [String] received key 0: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: {"message":"A Practical Advice","identifier":0} | Record: ConsumerRecord(topic = advice-topic, partition = 2, offset = 44, CreateTime = 1542911788418, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 0, value = {"message":"A Practical Advice","identifier":0})
INFO 15292 --- [ntainer#2-0-C-1] i.tpd.kafkaexample.HelloKafkaController  : Logger 3 [ByteArray] received key 1: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: [123, 34, 109, 101, 115, 115, 97, 103, 101, 34, 58, 34, 65, 32, 80, 114, 97, 99, 116, 105, 99, 97, 108, 32, 65, 100, 118, 105, 99, 101, 34, 44, 34, 105, 100, 101, 110, 116, 105, 102, 105, 101, 114, 34, 58, 49, 125] | Record: ConsumerRecord(topic = advice-topic, partition = 0, offset = 44, CreateTime = 1542911788422, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 1, value = [B@39113414)
INFO 15292 --- [ntainer#1-0-C-1] i.tpd.kafkaexample.HelloKafkaController  : Logger 2 [String] received key 2: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: {"message":"A Practical Advice","identifier":2} | Record: ConsumerRecord(topic = advice-topic, partition = 2, offset = 45, CreateTime = 1542911788422, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 2, value = {"message":"A Practical Advice","identifier":2})
INFO 15292 --- [ntainer#2-0-C-1] i.tpd.kafkaexample.HelloKafkaController  : Logger 3 [ByteArray] received key 5: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: [123, 34, 109, 101, 115, 115, 97, 103, 101, 34, 58, 34, 65, 32, 80, 114, 97, 99, 116, 105, 99, 97, 108, 32, 65, 100, 118, 105, 99, 101, 34, 44, 34, 105, 100, 101, 110, 116, 105, 102, 105, 101, 114, 34, 58, 53, 125] | Record: ConsumerRecord(topic = advice-topic, partition = 0, offset = 45, CreateTime = 1542911788422, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 5, value = [B@476e998b)
INFO 15292 --- [ntainer#1-0-C-1] i.tpd.kafkaexample.HelloKafkaController  : Logger 2 [String] received key 3: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: {"message":"A Practical Advice","identifier":3} | Record: ConsumerRecord(topic = advice-topic, partition = 2, offset = 46, CreateTime = 1542911788422, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 3, value = {"message":"A Practical Advice","identifier":3})
INFO 15292 --- [ntainer#2-0-C-1] i.tpd.kafkaexample.HelloKafkaController  : Logger 3 [ByteArray] received key 7: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: [123, 34, 109, 101, 115, 115, 97, 103, 101, 34, 58, 34, 65, 32, 80, 114, 97, 99, 116, 105, 99, 97, 108, 32, 65, 100, 118, 105, 99, 101, 34, 44, 34, 105, 100, 101, 110, 116, 105, 102, 105, 101, 114, 34, 58, 55, 125] | Record: ConsumerRecord(topic = advice-topic, partition = 0, offset = 46, CreateTime = 1542911788423, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 7, value = [B@7a229d60)
INFO 15292 --- [ntainer#1-0-C-1] i.tpd.kafkaexample.HelloKafkaController  : Logger 2 [String] received key 9: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: {"message":"A Practical Advice","identifier":9} | Record: ConsumerRecord(topic = advice-topic, partition = 2, offset = 47, CreateTime = 1542911788423, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 9, value = {"message":"A Practical Advice","identifier":9})
INFO 15292 --- [ntainer#2-0-C-1] i.tpd.kafkaexample.HelloKafkaController  : Logger 3 [ByteArray] received key 8: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: [123, 34, 109, 101, 115, 115, 97, 103, 101, 34, 58, 34, 65, 32, 80, 114, 97, 99, 116, 105, 99, 97, 108, 32, 65, 100, 118, 105, 99, 101, 34, 44, 34, 105, 100, 101, 110, 116, 105, 102, 105, 101, 114, 34, 58, 56, 125] | Record: ConsumerRecord(topic = advice-topic, partition = 0, offset = 47, CreateTime = 1542911788423, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 8, value = [B@536adff4)
INFO 15292 --- [ntainer#0-0-C-1] i.tpd.kafkaexample.HelloKafkaController  : Logger 1 [JSON] received key 4: Type [N/A] | Payload: PracticalAdvice::toString() {message='A Practical Advice', identifier=4} | Record: ConsumerRecord(topic = advice-topic, partition = 1, offset = 22, CreateTime = 1542911788422, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [], isReadOnly = false), key = 4, value = PracticalAdvice::toString() {message='A Practical Advice', identifier=4})
INFO 15292 --- [ntainer#0-0-C-1] i.tpd.kafkaexample.HelloKafkaController  : Logger 1 [JSON] received key 6: Type [N/A] | Payload: PracticalAdvice::toString() {message='A Practical Advice', identifier=6} | Record: ConsumerRecord(topic = advice-topic, partition = 1, offset = 23, CreateTime = 1542911788422, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [], isReadOnly = false), key = 6, value = PracticalAdvice::toString() {message='A Practical Advice', identifier=6})
INFO 15292 --- [nio-8080-exec-1] i.tpd.kafkaexample.HelloKafkaController  : All messages received
Answer from ohfgkhjo:

You have three `@KafkaListener` consumers, each reading on its own thread from the topic "advice-topic", and you have asked for the messages to be deserialized as JSON, String, and byte array respectively.
The `clientIdPrefix` (a `java.lang.String`), when provided, overrides the client id property configured in the consumer factory.
What are you trying to achieve?
Kafka itself only knows byte arrays; it is the deserializer you define that tells it which "lens" to view the data through.
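You can see this without any broker at all: the same stored bytes yield exactly the payloads in the logs above, depending only on how you decode them. A plain-Java illustration (the JSON literal is what the producer's `JsonSerializer` writes for the first record; no Kafka involved):

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class DeserializerLens {
    public static void main(String[] args) {
        // What the broker actually stores for one record: raw bytes.
        byte[] stored = "{\"message\":\"A Practical Advice\",\"identifier\":0}"
                .getBytes(StandardCharsets.UTF_8);

        // "ByteArrayDeserializer" view: the bytes, untouched
        // (compare Logger 3's payload in the logs).
        System.out.println(Arrays.toString(stored));

        // "StringDeserializer" view: the same bytes decoded as UTF-8 text
        // (compare Logger 2's payload).
        System.out.println(new String(stored, StandardCharsets.UTF_8));

        // A JsonDeserializer would go one step further and map that text
        // onto a PracticalAdvice object (Logger 1's payload).
    }
}
```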
