Usage and code examples of the org.apache.kafka.clients.consumer.Consumer.committed() method

x33g5p2x, reposted on 2022-01-18 under Other

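This article collects code examples for the Java method org.apache.kafka.clients.consumer.Consumer.committed() and shows how Consumer.committed() is used in practice. The examples were extracted from selected projects on platforms such as GitHub, Stack Overflow, and Maven, and are intended as practical reference material. Details of the Consumer.committed() method are as follows:
Package path: org.apache.kafka.clients.consumer.Consumer
Class name: Consumer
Method name: committed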

Introduction to Consumer.committed

Fetches the last committed offsets for the input list of partitions.
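
Two properties of this lookup matter for the examples that follow: the call returns null when nothing has been committed for a partition, and a committed offset conventionally denotes the next record to read rather than the last one processed. The following is a minimal sketch of that contract only. `CommittedStore`, its method names, and the string partition keys are hypothetical stand-ins rather than Kafka types, chosen so the snippet runs without a broker or the kafka-clients library.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for the offset store; NOT a Kafka class.
class CommittedStore {
    private final Map<String, Long> offsets = new HashMap<>();

    // Record that everything up to and including lastProcessed was handled.
    // By convention the stored value is the NEXT offset to read.
    void commitAfterProcessing(String partition, long lastProcessed) {
        offsets.put(partition, lastProcessed + 1);
    }

    // Mirrors the null-on-no-commit behaviour of the real lookup.
    Long committed(String partition) {
        return offsets.get(partition);
    }

    // Null-safe resume position with a caller-chosen fallback.
    long resumePosition(String partition, long fallback) {
        Long committed = committed(partition);
        return committed == null ? fallback : committed;
    }

    public static void main(String[] args) {
        CommittedStore store = new CommittedStore();
        System.out.println(store.committed("orders-0"));          // nothing committed yet
        store.commitAfterProcessing("orders-0", 41L);
        System.out.println(store.resumePosition("orders-0", 0L)); // position after offset 41
    }
}
```

Callers of the real method need the same null check before calling `offset()` on the result.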

Code examples

Code example source: openzipkin/brave

  public OffsetAndMetadata committed(TopicPartition partition, Duration timeout) {
      // Delegate to the wrapped consumer, passing the timeout through.
      return delegate.committed(partition, timeout);
  }

Code example source: openzipkin/brave

  @Override public OffsetAndMetadata committed(TopicPartition partition) {
      // Delegate to the wrapped consumer.
      return delegate.committed(partition);
  }

Code example source: apache/nifi

  private void rollback(final TopicPartition topicPartition) {
      // Prefer the locally tracked uncommitted offset; otherwise ask the broker.
      OffsetAndMetadata offsetAndMetadata = uncommittedOffsetsMap.get(topicPartition);
      if (offsetAndMetadata == null) {
          offsetAndMetadata = kafkaConsumer.committed(topicPartition);
      }
      // Note: committed() can return null when nothing was committed; this variant does not guard against that.
      final long offset = offsetAndMetadata.offset();
      kafkaConsumer.seek(topicPartition, offset);
  }

Code example source: apache/nifi

  private void rollback(final TopicPartition topicPartition) {
      try {
          // Prefer the locally tracked uncommitted offset; otherwise ask the broker.
          OffsetAndMetadata offsetAndMetadata = uncommittedOffsetsMap.get(topicPartition);
          if (offsetAndMetadata == null) {
              offsetAndMetadata = kafkaConsumer.committed(topicPartition);
          }
          // Fall back to offset 0 when no commit exists for the partition.
          final long offset = offsetAndMetadata == null ? 0L : offsetAndMetadata.offset();
          kafkaConsumer.seek(topicPartition, offset);
      } catch (final Exception rollbackException) {
          logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException);
      }
  }

Code example source: spring-projects/spring-kafka

  @KafkaListener(id = "batchAckListener", topics = { "annotated26", "annotated27" },
          containerFactory = "batchFactory")
  public void batchAckListener(List<String> in,
          @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
          @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
          Consumer<?, ?> consumer) {
      for (int i = 0; i < topics.size(); i++) {
          this.latch17.countDown();
          String topic = topics.get(i);
          if ("annotated26".equals(topic) && consumer.committed(
                  new org.apache.kafka.common.TopicPartition(topic, partitions.get(i))).offset() == 1) {
              this.latch18.countDown();
          }
          else if ("annotated27".equals(topic) && consumer.committed(
                  new org.apache.kafka.common.TopicPartition(topic, partitions.get(i))).offset() == 3) {
              this.latch18.countDown();
          }
      }
  }

Code example source: opentracing-contrib/java-kafka-client

  @Override
  public OffsetAndMetadata committed(TopicPartition partition) {
      return consumer.committed(partition);
  }

Code example source: opentracing-contrib/java-kafka-client

  @Override
  public OffsetAndMetadata committed(TopicPartition topicPartition, Duration duration) {
      return consumer.committed(topicPartition, duration);
  }

Code example source: io.opentracing.contrib/opentracing-kafka-client

  @Override
  public OffsetAndMetadata committed(TopicPartition topicPartition, Duration duration) {
      return consumer.committed(topicPartition, duration);
  }

Code example source: io.opentracing.contrib/opentracing-kafka-client

  @Override
  public OffsetAndMetadata committed(TopicPartition partition) {
      return consumer.committed(partition);
  }

Code example source: rayokota/kafka-graphs

  @Override
  public OffsetAndMetadata committed(TopicPartition partition) {
      return kafkaConsumer.committed(partition);
  }

Code example source: rayokota/kafka-graphs

  @Override
  public OffsetAndMetadata committed(TopicPartition partition, Duration timeout) {
      return kafkaConsumer.committed(partition, timeout);
  }

Code example source: io.zipkin.brave/brave-instrumentation-kafka-clients

  public OffsetAndMetadata committed(TopicPartition partition, Duration timeout) {
      return delegate.committed(partition, timeout);
  }

Code example source: vert-x3/vertx-kafka-client

  @Override
  public void committed(TopicPartition topicPartition, Handler<AsyncResult<OffsetAndMetadata>> handler) {
      // Run the blocking lookup as a submitted task and report the result to the handler.
      this.submitTask((consumer, future) -> {
          OffsetAndMetadata result = consumer.committed(topicPartition);
          if (future != null) {
              future.complete(result);
          }
      }, handler);
  }
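
The pattern in the example above, handing a blocking committed() call to a task queue and delivering the result asynchronously, can be sketched with the standard library alone. This is an illustration under assumptions, not the vertx-kafka-client implementation: `AsyncCommittedLookup` is a hypothetical name, and the `Function<String, Long>` passed in stands in for the blocking consumer call. A single worker thread is used because KafkaConsumer is documented as not safe for multi-threaded access, which is presumably why wrappers of this kind route every call through one thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

// Hypothetical wrapper: runs a blocking offset lookup on one dedicated thread.
class AsyncCommittedLookup implements AutoCloseable {
    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    // Stand-in for the blocking consumer call; returns null when nothing was committed.
    private final Function<String, Long> blockingLookup;

    AsyncCommittedLookup(Function<String, Long> blockingLookup) {
        this.blockingLookup = blockingLookup;
    }

    // Schedule the lookup on the worker thread; the future completes with its
    // result, or exceptionally if the lookup throws.
    CompletableFuture<Long> committed(String partition) {
        return CompletableFuture.supplyAsync(() -> blockingLookup.apply(partition), worker);
    }

    @Override
    public void close() {
        worker.shutdown();
    }

    public static void main(String[] args) {
        try (AsyncCommittedLookup lookup =
                 new AsyncCommittedLookup(p -> "events-0".equals(p) ? 7L : null)) {
            lookup.committed("events-0")
                  .thenAccept(offset -> System.out.println("committed: " + offset))
                  .join();
        }
    }
}
```

Because all lookups share one thread, they execute in submission order and never run concurrently against the underlying client.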

Code example source: linkedin/li-apache-kafka-clients

  @Override
  public Long committedSafeOffset(TopicPartition tp) {
      OffsetAndMetadata rawOffsetAndMetadata = _kafkaConsumer.committed(tp);
      // No commit, or a commit without metadata, yields no safe offset.
      if (rawOffsetAndMetadata == null || rawOffsetAndMetadata.metadata().isEmpty()) {
          return null;
      }
      return rawOffsetAndMetadata.offset();
  }

Code example source: org.apache.nifi/nifi-kafka-0-10-processors

  private void rollback(final TopicPartition topicPartition) {
      OffsetAndMetadata offsetAndMetadata = uncommittedOffsetsMap.get(topicPartition);
      if (offsetAndMetadata == null) {
          offsetAndMetadata = kafkaConsumer.committed(topicPartition);
      }
      // Note: committed() can return null when nothing was committed; this variant does not guard against that.
      final long offset = offsetAndMetadata.offset();
      kafkaConsumer.seek(topicPartition, offset);
  }

Code example source: org.apache.nifi/nifi-kafka-2-0-processors

  private void rollback(final TopicPartition topicPartition) {
      try {
          OffsetAndMetadata offsetAndMetadata = uncommittedOffsetsMap.get(topicPartition);
          if (offsetAndMetadata == null) {
              offsetAndMetadata = kafkaConsumer.committed(topicPartition);
          }
          final long offset = offsetAndMetadata == null ? 0L : offsetAndMetadata.offset();
          kafkaConsumer.seek(topicPartition, offset);
      } catch (final Exception rollbackException) {
          logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException);
      }
  }

Code example source: org.apache.nifi/nifi-kafka-1-0-processors

  private void rollback(final TopicPartition topicPartition) {
      try {
          OffsetAndMetadata offsetAndMetadata = uncommittedOffsetsMap.get(topicPartition);
          if (offsetAndMetadata == null) {
              offsetAndMetadata = kafkaConsumer.committed(topicPartition);
          }
          final long offset = offsetAndMetadata == null ? 0L : offsetAndMetadata.offset();
          kafkaConsumer.seek(topicPartition, offset);
      } catch (final Exception rollbackException) {
          logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException);
      }
  }

Code example source: reactor/reactor-kafka

  private long committedCount(KafkaReceiver<Integer, String> receiver) {
      long committed = 0;
      for (int j = 0; j < partitions; j++) {
          TopicPartition p = new TopicPartition(topic, j);
          // Run the lookup against the underlying consumer and block for the result.
          OffsetAndMetadata offset = receiver.doOnConsumer(c -> c.committed(p)).block(Duration.ofSeconds(receiveTimeoutMillis));
          // Partitions without a commit (null) contribute nothing to the total.
          if (offset != null && offset.offset() > 0)
              committed += offset.offset();
      }
      return committed;
  }
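
The aggregation in the example above, summing committed offsets across partitions while skipping those with no commit, can be isolated as a small standard-library sketch. `CommittedTotals` and the `IntFunction<Long>` lookup are hypothetical stand-ins for the receiver and consumer calls, not part of reactor-kafka or Kafka.

```java
import java.util.function.IntFunction;

// Hypothetical helper; the lookup function stands in for a per-partition committed() call.
class CommittedTotals {
    // Sums committed offsets over partitions 0..partitionCount-1.
    // A null result (no commit for that partition) contributes nothing.
    static long total(int partitionCount, IntFunction<Long> committedByPartition) {
        long sum = 0;
        for (int p = 0; p < partitionCount; p++) {
            Long offset = committedByPartition.apply(p);
            if (offset != null && offset > 0) {
                sum += offset;
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        // Partition 1 has no committed offset.
        Long[] committed = {3L, null, 7L};
        System.out.println(total(committed.length, i -> committed[i]));
    }
}
```

Treating null as zero is only suitable for a progress count like this one; it hides the difference between "nothing committed" and "committed at offset 0".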
