apachekafka系统错误处理

vlurs2pr  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(522)

我们正在尝试实现kafka作为我们的消息代理解决方案。我们正在ibmbluemix中部署spring-boot微服务,其内部消息代理实现是kafka版本0.10。由于我的经验更多地是在jms、activemq端,我想知道什么是处理java用户中的系统级错误的理想方法?
下面是我们目前是如何实现的
消费者财产

  1. enable.auto.commit=false
  2. auto.offset.reset=latest

我们正在使用的默认属性

  1. max.partition.fetch.bytes
  2. session.timeout.ms

Kafka消费者
我们正在为每个主题旋转3个线程,所有线程都具有相同的groupid,即每个线程一个kafkaconsumer示例。我们现在只有一个分区。在thread类的构造函数中,使用者代码如下所示

  1. kafkaConsumer = new KafkaConsumer<String, String>(properties);
  2. final List<String> topicList = new ArrayList<String>();
  3. topicList.add(properties.getTopic());
  4. kafkaConsumer.subscribe(topicList, new ConsumerRebalanceListener() {
  5. @Override
  6. public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
  7. }
  8. @Override
  9. public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
  10. try {
  11. logger.info("Partitions assigned, consumer seeking to end.");
  12. for (final TopicPartition partition : partitions) {
  13. final long position = kafkaConsumer.position(partition);
  14. logger.info("current Position: " + position);
  15. logger.info("Seeking to end...");
  16. kafkaConsumer.seekToEnd(Arrays.asList(partition));
  17. logger.info("Seek from the current position: " + kafkaConsumer.position(partition));
  18. kafkaConsumer.seek(partition, position);
  19. }
  20. logger.info("Consumer can now begin consuming messages.");
  21. } catch (final Exception e) {
  22. logger.error("Consumer can now begin consuming messages.");
  23. }
  24. }
  25. });

实际读取发生在线程的run方法中

  1. try {
  2. // Poll on the Kafka consumer every second.
  3. final ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
  4. // Iterate through all the messages received and print their
  5. // content.
  6. for (final TopicPartition partition : records.partitions()) {
  7. final List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
  8. logger.info("consumer is alive and is processing "+ partitionRecords.size() +" records");
  9. for (final ConsumerRecord<String, String> record : partitionRecords) {
  10. logger.info("processing topic "+ record.topic()+" for key "+record.key()+" on offset "+ record.offset());
  11. final Class<? extends Event> resourceClass = eventProcessors.getResourceClass();
  12. final Object obj = converter.convertToObject(record.value(), resourceClass);
  13. if (obj != null) {
  14. logger.info("Event: " + obj + " acquired by " + Thread.currentThread().getName());
  15. final CommsEvent event = resourceClass.cast(converter.convertToObject(record.value(), resourceClass));
  16. final MessageResults results = eventProcessors.processEvent(event
  17. );
  18. if ("Success".equals(results.getStatus())) {
  19. // commit the processed message which changes
  20. // the offset
  21. kafkaConsumer.commitSync();
  22. logger.info("Message processed sucessfully");
  23. } else {
  24. kafkaConsumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset());
  25. logger.error("Error processing message : {} with error : {},resetting offset to {} ", obj,results.getError().getMessage(),record.offset());
  26. break;
  27. }
  28. }
  29. }
  30. }
  31. // TODO add return
  32. } catch (final Exception e) {
  33. logger.error("Consumer has failed with exception: " + e, e);
  34. shutdown();
  35. }

您将注意到eventprocessor,它是一个处理每个记录的服务类,在大多数情况下提交数据库中的记录。如果处理器抛出错误(系统异常或validationexception),我们不会提交,而是通过编程将seek设置为该偏移量,以便后续轮询将从该偏移量返回该组id。
现在的疑问是,这是正确的方法吗?如果我们得到一个错误并设置了偏移量,那么在修复之前,不会处理其他消息。这可能适用于系统错误,如无法连接到数据库,但如果问题只是与该事件,而不是其他人处理这一个记录,我们将无法处理任何其他记录。我们想到了errortopic的概念,当我们得到一个错误时,消费者会将该事件发布到errortopic,同时它会继续处理其他后续事件。但是看起来我们正在尝试将jms的设计概念(由于我以前的经验)引入kafka,并且可能有更好的方法来解决kafka中的错误处理。另外,从错误主题重新处理它可能会改变某些场景中不需要的消息序列
请让我知道有人如何处理这个场景在他们的项目遵循Kafka标准。
-塔塔

iq0todco

iq0todco1#

如果问题只与该事件有关,而不是其他人处理这一条记录,我们将无法处理任何其他记录
这是正确的,你的建议使用一个错误的主题似乎是一个可能的。
我也注意到你对 onPartitionsAssigned 您基本上不使用consumercommitted补偿,因为您似乎一直在寻找到底。
如果要从上次成功提交的偏移量重新启动,则不应执行 seek 最后,我想指出的是,尽管看起来您知道,在同一个组中有3个使用者订阅了一个分区,这意味着3个分区中有2个空闲。
江户

相关问题