在这里,您可以找到使用字符串键和json值发送数据的控制器。。
我试图理解为什么消费者有时把这个值当作字符串,有时是JSON或ByTeRy的我的控制器:
@RestController
公共类hellokafkacontroller{
private static final Logger logger =
LoggerFactory.getLogger(HelloKafkaController.class);
private final KafkaTemplate<String, Object> template;
private final String topicName;
private final int messagesPerRequest;
private CountDownLatch latch;
public HelloKafkaController(
final KafkaTemplate<String, Object> template,
@Value("${tpd.topic-name}") final String topicName,
@Value("${tpd.messages-per-request}") final int messagesPerRequest) {
this.template = template;
this.topicName = topicName;
this.messagesPerRequest = messagesPerRequest;
}
@GetMapping("/hello")
public String hello() throws Exception {
latch = new CountDownLatch(messagesPerRequest);
IntStream.range(0, messagesPerRequest)
.forEach(i -> this.template.send(topicName, String.valueOf(i),
new PracticalAdvice("A Practical Advice", i))
);
latch.await(60, TimeUnit.SECONDS);
logger.info("All messages received");
return "Hello Kafka!";
}
@KafkaListener(topics = "advice-topic", clientIdPrefix = "json",
containerFactory = "kafkaListenerContainerFactory")
public void listenAsObject(ConsumerRecord<String, PracticalAdvice> cr,
@Payload PracticalAdvice payload) {
logger.info("Logger 1 [JSON] received key {}: Type [{}] | Payload: {} | Record: {}", cr.key(),
typeIdHeader(cr.headers()), payload, cr.toString());
latch.countDown();
}
@KafkaListener(topics = "advice-topic", clientIdPrefix = "string",
containerFactory = "kafkaListenerStringContainerFactory")
public void listenasString(ConsumerRecord<String, String> cr,
@Payload String payload) {
logger.info("Logger 2 [String] received key {}: Type [{}] | Payload: {} | Record: {}", cr.key(),
typeIdHeader(cr.headers()), payload, cr.toString());
latch.countDown();
}
@KafkaListener(topics = "advice-topic", clientIdPrefix = "bytearray",
containerFactory = "kafkaListenerByteArrayContainerFactory")
public void listenAsByteArray(ConsumerRecord<String, byte[]> cr,
@Payload byte[] payload) {
logger.info("Logger 3 [ByteArray] received key {}: Type [{}] | Payload: {} | Record: {}", cr.key(),
typeIdHeader(cr.headers()), payload, cr.toString());
latch.countDown();
}
private static String typeIdHeader(Headers headers) {
return StreamSupport.stream(headers.spliterator(), false)
.filter(header -> header.key().equals("__TypeId__"))
.findFirst().map(header -> new String(header.value())).orElse("N/A");
}
}
日志:
INFO 15292 --- [ntainer#1-0-C-1] i.tpd.kafkaexample.HelloKafkaController : Logger 2 [String] received key 0: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: {"message":"A Practical Advice","identifier":0} | Record: ConsumerRecord(topic = advice-topic, partition = 2, offset = 44, CreateTime = 1542911788418, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 0, value = {"message":"A Practical Advice","identifier":0})
INFO 15292 --- [ntainer#2-0-C-1] i.tpd.kafkaexample.HelloKafkaController : Logger 3 [ByteArray] received key 1: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: [123, 34, 109, 101, 115, 115, 97, 103, 101, 34, 58, 34, 65, 32, 80, 114, 97, 99, 116, 105, 99, 97, 108, 32, 65, 100, 118, 105, 99, 101, 34, 44, 34, 105, 100, 101, 110, 116, 105, 102, 105, 101, 114, 34, 58, 49, 125] | Record: ConsumerRecord(topic = advice-topic, partition = 0, offset = 44, CreateTime = 1542911788422, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 1, value = [B@39113414)
INFO 15292 --- [ntainer#1-0-C-1] i.tpd.kafkaexample.HelloKafkaController : Logger 2 [String] received key 2: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: {"message":"A Practical Advice","identifier":2} | Record: ConsumerRecord(topic = advice-topic, partition = 2, offset = 45, CreateTime = 1542911788422, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 2, value = {"message":"A Practical Advice","identifier":2})
INFO 15292 --- [ntainer#2-0-C-1] i.tpd.kafkaexample.HelloKafkaController : Logger 3 [ByteArray] received key 5: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: [123, 34, 109, 101, 115, 115, 97, 103, 101, 34, 58, 34, 65, 32, 80, 114, 97, 99, 116, 105, 99, 97, 108, 32, 65, 100, 118, 105, 99, 101, 34, 44, 34, 105, 100, 101, 110, 116, 105, 102, 105, 101, 114, 34, 58, 53, 125] | Record: ConsumerRecord(topic = advice-topic, partition = 0, offset = 45, CreateTime = 1542911788422, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 5, value = [B@476e998b)
INFO 15292 --- [ntainer#1-0-C-1] i.tpd.kafkaexample.HelloKafkaController : Logger 2 [String] received key 3: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: {"message":"A Practical Advice","identifier":3} | Record: ConsumerRecord(topic = advice-topic, partition = 2, offset = 46, CreateTime = 1542911788422, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 3, value = {"message":"A Practical Advice","identifier":3})
INFO 15292 --- [ntainer#2-0-C-1] i.tpd.kafkaexample.HelloKafkaController : Logger 3 [ByteArray] received key 7: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: [123, 34, 109, 101, 115, 115, 97, 103, 101, 34, 58, 34, 65, 32, 80, 114, 97, 99, 116, 105, 99, 97, 108, 32, 65, 100, 118, 105, 99, 101, 34, 44, 34, 105, 100, 101, 110, 116, 105, 102, 105, 101, 114, 34, 58, 55, 125] | Record: ConsumerRecord(topic = advice-topic, partition = 0, offset = 46, CreateTime = 1542911788423, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 7, value = [B@7a229d60)
INFO 15292 --- [ntainer#1-0-C-1] i.tpd.kafkaexample.HelloKafkaController : Logger 2 [String] received key 9: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: {"message":"A Practical Advice","identifier":9} | Record: ConsumerRecord(topic = advice-topic, partition = 2, offset = 47, CreateTime = 1542911788423, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 9, value = {"message":"A Practical Advice","identifier":9})
INFO 15292 --- [ntainer#2-0-C-1] i.tpd.kafkaexample.HelloKafkaController : Logger 3 [ByteArray] received key 8: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: [123, 34, 109, 101, 115, 115, 97, 103, 101, 34, 58, 34, 65, 32, 80, 114, 97, 99, 116, 105, 99, 97, 108, 32, 65, 100, 118, 105, 99, 101, 34, 44, 34, 105, 100, 101, 110, 116, 105, 102, 105, 101, 114, 34, 58, 56, 125] | Record: ConsumerRecord(topic = advice-topic, partition = 0, offset = 47, CreateTime = 1542911788423, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 8, value = [B@536adff4)
INFO 15292 --- [ntainer#0-0-C-1] i.tpd.kafkaexample.HelloKafkaController : Logger 1 [JSON] received key 4: Type [N/A] | Payload: PracticalAdvice::toString() {message='A Practical Advice', identifier=4} | Record: ConsumerRecord(topic = advice-topic, partition = 1, offset = 22, CreateTime = 1542911788422, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [], isReadOnly = false), key = 4, value = PracticalAdvice::toString() {message='A Practical Advice', identifier=4})
INFO 15292 --- [ntainer#0-0-C-1] i.tpd.kafkaexample.HelloKafkaController : Logger 1 [JSON] received key 6: Type [N/A] | Payload: PracticalAdvice::toString() {message='A Practical Advice', identifier=6} | Record: ConsumerRecord(topic = advice-topic, partition = 1, offset = 23, CreateTime = 1542911788422, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [], isReadOnly = false), key = 6, value = PracticalAdvice::toString() {message='A Practical Advice', identifier=6})
INFO 15292 --- [nio-8080-exec-1] i.tpd.kafkaexample.HelloKafkaController : All messages received
1条答案
按热度按时间ohfgkhjo1#
您有3个消费者,@kafkalitseners,线程从主题“advice topic”读取,您要求将消息反序列化为json、string和bytearray
java.lang.String
clientIdPrefix
提供时,将覆盖消费者工厂配置中的客户端id属性你想达到什么目标?
kafka只知道字节数组,它是您定义的反序列化程序,告诉它使用哪个“眼镜”查看数据