从kafka消费者内部,如何获取消息头

syqv5f0l  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(928)

简单地说,我想让消费者监听消息头,但我得到的是“required argument[header]not specified”。
以下是我尝试失败的原因:

  1. package com.tolearn.consumer
  2. import io.micronaut.configuration.kafka.Acknowledgement
  3. import io.micronaut.configuration.kafka.annotation.KafkaKey
  4. import io.micronaut.configuration.kafka.annotation.KafkaListener
  5. import io.micronaut.configuration.kafka.annotation.OffsetStrategy
  6. import io.micronaut.configuration.kafka.annotation.Topic
  7. import io.micronaut.messaging.annotation.Header
  8. @KafkaListener(
  9. groupId="myGroup",
  10. offsetStrategy=OffsetStrategy.SYNC_PER_RECORD
  11. )
  12. class DemoConsumer {
  13. @Topic("demotopic")
  14. fun receive(@KafkaKey key: String?,
  15. msg: String,
  16. header: Header,
  17. acknowledgement: Acknowledgement
  18. ){
  19. println("Key = $key " +
  20. "msg = $msg " +
  21. "header = $header"
  22. )
  23. acknowledgement.ack();
  24. }
  25. }

它打印此错误:

  1. 00:02:05.890 [consumer-executor-thread-1] ERROR i.m.c.k.e.KafkaListenerExceptionHandler - Error processing record [Optional[ConsumerRecord(topic = demotopic, partition = 0, leaderEpoch = 0, offset = 6, CreateTime = 1607828525818, serialized key size = 36, serialized value size = 14, headers = RecordHeaders(headers = [RecordHeader(key = My-Header, value = [109, 121, 72, 101, 97, 100, 101, 114])], isReadOnly = false), key = 2afe3f0d-40c0-44f6-93a3-cce06678df80, value = name: "Hello"
  2. )]] for Kafka consumer [com.tolearn.consumer.DemoConsumer@4ca2237e] produced error: Required argument [Header header] not specified
  3. io.micronaut.core.bind.exceptions.UnsatisfiedArgumentException: Required argument [Header header] not specified
  4. at io.micronaut.core.bind.DefaultExecutableBinder.bind(DefaultExecutableBinder.java:88)
  5. at io.micronaut.configuration.kafka.processor.KafkaConsumerProcessor.lambda$process$8(KafkaConsumerProcessor.java:494)
  6. at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
  7. at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
  8. at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
  9. at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
  10. at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
  11. at java.base/java.lang.Thread.run(Thread.java:834)

如果它添加了一些内容,下面是制作人:

  1. package com.tolearn.producer
  2. import io.micronaut.configuration.kafka.annotation.KafkaClient
  3. import io.micronaut.configuration.kafka.annotation.KafkaKey
  4. import io.micronaut.configuration.kafka.annotation.Topic
  5. import io.micronaut.messaging.annotation.Header
  6. @KafkaClient(
  7. id = "demo-producer",
  8. acks = KafkaClient.Acknowledge.ALL)
  9. public interface DemoProducer {
  10. @Topic("demotopic")
  11. fun sendDemoMsg(
  12. @KafkaKey key: String?,
  13. @Header("My-Header") myHeader: String,
  14. msg: String?) {
  15. }
  16. }

消息头抛出生产者的服务发送消息

  1. package com.tolearn.service
  2. import com.tolearn.producer.DemoProducer
  3. import javax.inject.Inject
  4. import javax.inject.Named
  5. import javax.inject.Singleton
  6. @Singleton
  7. class DemoService {
  8. @Inject
  9. @Named("dp")
  10. lateinit var dp : DemoProducer
  11. fun postDemo(key: String, msg: String){
  12. //blocking
  13. dp.sendDemoMsg(key, "myHeader", msg)
  14. }
  15. }

我想打印。。。我的标题:我的标题

***编辑

对于未来的读者,我的解决方案是:
应用程序.yml

  1. micronaut:
  2. application:
  3. name: demoGrpcKafka
  4. executors:
  5. consumer:
  6. type: fixed
  7. nThreads: 1
  8. # kafka.bootstrap.servers: localhost:9092
  9. kafka:
  10. bootstrap:
  11. servers: localhost:9092
  12. consumers:
  13. default:
  14. auto:
  15. commit:
  16. enable: false
  17. producers:
  18. #default:
  19. demo-producer:
  20. retries: 2
  21. my:
  22. application:
  23. token: tokenFromYml

制作人

  1. package com.tolearn.producer
  2. import io.micronaut.configuration.kafka.annotation.KafkaClient
  3. import io.micronaut.configuration.kafka.annotation.KafkaKey
  4. import io.micronaut.configuration.kafka.annotation.Topic
  5. import io.micronaut.messaging.annotation.Header
  6. @KafkaClient(
  7. id = "demo-producer",
  8. acks = KafkaClient.Acknowledge.ALL)
  9. @Header(name = "X-Token", value = "\${my.application.token}")
  10. public interface DemoProducer {
  11. @Topic("demotopic")
  12. fun sendDemoMsg(
  13. @KafkaKey key: String?,
  14. msg: String?) {
  15. }
  16. }

消费者

  1. package com.tolearn.consumer
  2. import io.micronaut.configuration.kafka.Acknowledgement
  3. import io.micronaut.configuration.kafka.KafkaHeaders
  4. import io.micronaut.configuration.kafka.annotation.KafkaKey
  5. import io.micronaut.configuration.kafka.annotation.KafkaListener
  6. import io.micronaut.configuration.kafka.annotation.OffsetStrategy
  7. import io.micronaut.configuration.kafka.annotation.Topic
  8. import io.micronaut.messaging.MessageHeaders
  9. @KafkaListener(
  10. groupId="myGroup",
  11. offsetStrategy=OffsetStrategy.SYNC_PER_RECORD
  12. )
  13. class DemoConsumer {
  14. @Topic("demotopic")
  15. fun receive(@KafkaKey key: String?,
  16. msg: String,
  17. headers: MessageHeaders,
  18. acknowledgement: Acknowledgement,
  19. offset: Long,
  20. partition: Int,
  21. topic: String,
  22. timestamp: Long
  23. ){
  24. val h = (headers).get("X-Token")
  25. println("Key = $key " +
  26. "msg = $msg " +
  27. "offset = $offset " +
  28. "partition = $partition " +
  29. "topic = $topic " +
  30. "timestamp = $timestamp " +
  31. "header = $h"
  32. )
  33. acknowledgement.ack();
  34. }
  35. }
klh5stk1

klh5stk11#

看起来您应该尝试使用io.micronaut.messaging.messageheaders,而不仅仅是kafka头类。

  1. @Topic("demotopic")
  2. fun receive(@KafkaKey key: String?,
  3. msg: String,
  4. headers: MessageHeaders,
  5. acknowledgement: Acknowledgement)

另外,您也可以在producer端使用@io.micronaut.messaging.annotation.header(“我的头”)指定头的值。
如果header是可选的,或者仅仅是字符串,不要忘记添加@javax.annotation.nullable?如果是Kotlin。

  1. @Topic("demotopic")
  2. fun receive(@KafkaKey key: String?,
  3. msg: String,
  4. @Nullable @Header("My-Header") myHeader: String,
  5. acknowledgement: Acknowledgement)

相关问题