micronaut kafka with auto.commit.enable=false: how to manually commit offsets

2mbi3lxu · published 2021-06-04 in Kafka
Follow (0) | Answers (1) | Views (622)

I only want to commit messages that were successfully saved to the database.
I have disabled auto-commit for this application. My application.yml:

```yaml
micronaut:
  application:
    name: demoGrpcKafka
  executors:
    consumer:
      type: fixed
      nThreads: 1
# kafka.bootstrap.servers: localhost:9092
kafka:
  bootstrap:
    servers: localhost:9092
  consumers:
    default:
      auto:
        commit:
          enable: false
  producers:
    #default:
    demo-producer:
      retries: 2
```
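One detail worth checking (my observation, not something stated in the original thread): with the modern Kafka consumer the property name is `enable.auto.commit`; `auto.commit.enable` is the legacy pre-0.9 name and is ignored by current clients. The nested keys above flatten to `auto.commit.enable`, so disabling auto-commit explicitly would instead look like:

```yaml
kafka:
  consumers:
    default:
      enable:
        auto:
          commit: false
```

In practice micronaut-kafka manages this flag itself based on the listener's `OffsetStrategy`, so setting `offsetStrategy` on `@KafkaListener` (as the answer further down suggests) is usually the cleaner fix.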

The consumer:

```kotlin
package com.tolearn.consumer

import io.micronaut.configuration.kafka.annotation.KafkaKey
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.Topic

@KafkaListener(groupId = "myGroup")
class DemoConsumer {

    @Topic("testkey")
    fun receive(@KafkaKey key: String?,
                msg: String,
                offset: Long,
                partition: Int,
                topic: String,
                timestamp: Long
    ) {
        println("Key = $key " +
                "msg = $msg " +
                "offset = $offset " +
                "partition = $partition " +
                "topic = $topic " +
                "timestamp = $timestamp ")
        // saved to database
        // THE ISSUE IS HERE: how to commit like consumer.commitOffsets(true) ?????
    }
}
```

In other words, when using micronaut-kafka, how do I commit a message manually with commitOffsets, commitSync(), or any other method?

*** Second edit

I went back to my application.yaml:

```yaml
consumers:
  default:
    auto:
      commit:
        enable: false
```

*** Third edit

I tried adding io.micronaut.configuration.kafka.Acknowledgement (deprecated) or import io.micronaut.messaging.Acknowledgement; either one leads to:

```
com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'name': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
```

It seems I have to do something else to be able to inject such an object. What am I missing?

```kotlin
package com.tolearn.consumer

import io.micronaut.configuration.kafka.Acknowledgement
import io.micronaut.configuration.kafka.annotation.KafkaKey
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.OffsetStrategy
import io.micronaut.configuration.kafka.annotation.Topic
//import io.micronaut.messaging.Acknowledgement
//import io.micronaut.messaging.annotation.Header

@KafkaListener(
    groupId = "myGroup",
    offsetStrategy = OffsetStrategy.SYNC_PER_RECORD
)
class DemoConsumer {

    @Topic("demotopic")
    fun receive(@KafkaKey key: String?,
                acknowledgement: Acknowledgement,
                msg: String,
                offset: Long,
                partition: Int,
                topic: String,
                timestamp: Long
                //,header: Header
    ) {
        println("Key = $key " +
                "msg = $msg " +
                "offset = $offset " +
                "partition = $partition " +
                "topic = $topic " +
                "timestamp = $timestamp "
                // + "header = $header"
        )
        // saved to database
        // how to commit like consumer.commitOffsets(true)
        //Consumer.commitSync()
        acknowledgement.ack()
    }
}
```

The whole log is:

```
18:13:13.812 [consumer-executor-thread-1] ERROR i.m.c.k.e.KafkaListenerExceptionHandler - Kafka consumer [com.tolearn.consumer.DemoConsumer@17e970dd] failed to deserialize value: Error deserializing key/value for partition demotopic-0 at offset 4. If needed, please seek past the record to continue consumption.
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition demotopic-0 at offset 4. If needed, please seek past the record to continue consumption.
Caused by: io.micronaut.core.serialize.exceptions.SerializationException: Error deserializing object from JSON: Unrecognized token 'name': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (byte[])"name: "Hello"
"; line: 1, column: 6]
	at io.micronaut.jackson.serialize.JacksonObjectSerializer.deserialize(JacksonObjectSerializer.java:73)
	at io.micronaut.configuration.kafka.serde.JsonSerde.deserialize(JsonSerde.java:82)
	at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
	at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1365)
	at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:130)
	at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1596)
	at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1432)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:684)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:635)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1308)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
	at io.micronaut.configuration.kafka.processor.KafkaConsumerProcessor.lambda$process$8(KafkaConsumerProcessor.java:396)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'name': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (byte[])"name: "Hello"
"; line: 1, column: 6]
	at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1851)
	at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:717)
	at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3588)
	at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3564)
	at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._matchToken2(UTF8StreamJsonParser.java:2899)
	at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._matchNull(UTF8StreamJsonParser.java:2870)
	at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:844)
	at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:757)
	at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4664)
	at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4513)
	at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3529)
	at io.micronaut.jackson.serialize.JacksonObjectSerializer.deserialize(JacksonObjectSerializer.java:71)
	... 18 common frames omitted
18:13:13.812 [consumer-executor-thread-1] INFO o.a.k.clients.consumer.KafkaConsumer - [Consumer clientId=demo-grpc-kafka-demo-consumer, groupId=myGroup] Seeking to offset 5 for partition demotopic-0
```
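As a side note on the stack trace above (my reading, not from the original thread): the failure happens before the listener method is ever invoked. The consumer's `JsonSerde` is handed the raw bytes `name: "Hello"`, which is not valid JSON, so the acknowledgement change is likely unrelated; the record sitting on the topic was apparently produced as plain text while the consumer expects a JSON value. A tiny sketch of the difference, assuming Jackson is on the classpath:

```kotlin
import com.fasterxml.jackson.databind.ObjectMapper

fun main() {
    val mapper = ObjectMapper()

    // A well-formed JSON value that a JSON deserializer can parse:
    val valid = mapper.writeValueAsString(mapOf("name" to "Hello"))
    println(valid) // {"name":"Hello"}

    // The bytes seen in the log, 'name: "Hello"', are NOT valid JSON:
    // mapper.readTree("""name: "Hello"""") throws JsonParseException
    // ("Unrecognized token 'name'"), matching the exception above.
}
```

Either produce the record as valid JSON, or declare the listener parameter with a type whose deserializer matches the actual payload (e.g. a plain `String` with the String deserializer), and the seek-past-record errors should stop.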

6qfn3psc1#

According to the documentation, you can set offsetStrategy in your KafkaListener annotation:

```kotlin
@KafkaListener(groupId = "myGroup", offsetStrategy = OffsetStrategy.SYNC)
class DemoConsumer {

    @Topic("testkey")
    fun receive(@KafkaKey key: String?,
    [...]
```

choosing one of the following values:

- ASYNC: Asynchronously commit offsets using Consumer.commitAsync() after each batch of messages is processed.
- ASYNC_PER_RECORD: Asynchronously commit offsets using Consumer.commitAsync() after each ConsumerRecord is consumed.
- AUTO: Automatically commit offsets with the Consumer.poll(long) loop.
- DISABLED: Do not commit offsets.
- SYNC: Synchronously commit offsets using Consumer.commitSync() after each batch of messages is processed.
- SYNC_PER_RECORD: Synchronously commit offsets using Consumer.commitSync() after each ConsumerRecord is consumed.

If I understand your question correctly, you want to set it to SYNC.
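For the original goal of committing only after a successful database save, the micronaut-kafka documentation also describes combining `OffsetStrategy.DISABLED` with an injected `io.micronaut.messaging.Acknowledgement` parameter, so the offset is committed only when `ack()` is called. A minimal sketch along those lines (the `saveToDatabase` call is a hypothetical placeholder, not part of the original post):

```kotlin
package com.tolearn.consumer

import io.micronaut.configuration.kafka.annotation.KafkaKey
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.OffsetStrategy
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.messaging.Acknowledgement

@KafkaListener(groupId = "myGroup", offsetStrategy = OffsetStrategy.DISABLED)
class AckDemoConsumer {

    @Topic("testkey")
    fun receive(@KafkaKey key: String?,
                msg: String,
                acknowledgement: Acknowledgement) {
        // Hypothetical persistence call; commit the offset only on success.
        if (saveToDatabase(key, msg)) {
            acknowledgement.ack()
        }
        // If ack() is never called, the offset is not committed and the
        // record will be redelivered after a rebalance or restart.
    }

    private fun saveToDatabase(key: String?, msg: String): Boolean {
        return true // placeholder for real persistence logic
    }
}
```

Compared with SYNC_PER_RECORD (which commits after every record regardless of what the method body did, unless it throws), this puts the commit decision explicitly in your hands.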

