org.apache.kafka.clients.consumer.Consumer.commitAsync()方法的使用及代码示例

x33g5p2x  于2022-01-18 转载在 其他  
字(6.7k)|赞(0)|评价(0)|浏览(283)

本文整理了Java中org.apache.kafka.clients.consumer.Consumer.commitAsync()方法的一些代码示例,展示了Consumer.commitAsync()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Consumer.commitAsync()方法的具体详情如下:
包路径:org.apache.kafka.clients.consumer.Consumer
类名称:Consumer
方法名:commitAsync

Consumer.commitAsync介绍

暂无

代码示例

代码示例来源:origin: openzipkin/brave

@Override public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets,
  OffsetCommitCallback callback) {
 delegate.commitAsync(offsets, callback);
}

代码示例来源:origin: openzipkin/brave

@Override public void commitAsync() {
 delegate.commitAsync();
}

代码示例来源:origin: openzipkin/brave

@Override public void commitAsync(OffsetCommitCallback callback) {
 delegate.commitAsync(callback);
}

代码示例来源:origin: apache/storm

@Override
public void nextTuple() {
  try {
    if (refreshAssignmentTimer.isExpiredResetOnTrue()) {
      refreshAssignment();
    }
    if (commitTimer != null && commitTimer.isExpiredResetOnTrue()) {
      if (isAtLeastOnceProcessing()) {
        commitOffsetsForAckedTuples();
      } else if (kafkaSpoutConfig.getProcessingGuarantee() == ProcessingGuarantee.NO_GUARANTEE) {
        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
          createFetchedOffsetsMetadata(consumer.assignment());
        consumer.commitAsync(offsetsToCommit, null);
        LOG.debug("Committed offsets {} to Kafka", offsetsToCommit);
      }
    }
    PollablePartitionsInfo pollablePartitionsInfo = getPollablePartitionsInfo();
    if (pollablePartitionsInfo.shouldPoll()) {
      try {
        setWaitingToEmit(pollKafkaBroker(pollablePartitionsInfo));
      } catch (RetriableException e) {
        LOG.error("Failed to poll from kafka.", e);
      }
    }
    emitIfWaitingNotEmitted();
  } catch (InterruptException e) {
    throwKafkaConsumerInterruptedException();
  }
}

代码示例来源:origin: io.zipkin.brave/brave-instrumentation-kafka-clients

@Override public void commitAsync() {
 delegate.commitAsync();
}

代码示例来源:origin: io.opentracing.contrib/opentracing-kafka-client

@Override
public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets,
  OffsetCommitCallback callback) {
 consumer.commitAsync(offsets, callback);
}

代码示例来源:origin: opentracing-contrib/java-kafka-client

@Override
public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets,
  OffsetCommitCallback callback) {
 consumer.commitAsync(offsets, callback);
}

代码示例来源:origin: io.opentracing.contrib/opentracing-kafka-client

@Override
public void commitAsync(OffsetCommitCallback callback) {
 consumer.commitAsync(callback);
}

代码示例来源:origin: rayokota/kafka-graphs

@Override
public void commitAsync() {
  kafkaConsumer.commitAsync();
}

代码示例来源:origin: rayokota/kafka-graphs

@Override
public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
  kafkaConsumer.commitAsync(offsets, callback);
}

代码示例来源:origin: opentracing-contrib/java-kafka-client

@Override
public void commitAsync(OffsetCommitCallback callback) {
 consumer.commitAsync(callback);
}

代码示例来源:origin: io.zipkin.brave/brave-instrumentation-kafka-clients

@Override public void commitAsync(OffsetCommitCallback callback) {
 delegate.commitAsync(callback);
}

代码示例来源:origin: io.opentracing.contrib/opentracing-kafka-client

@Override
public void commitAsync() {
 consumer.commitAsync();
}

代码示例来源:origin: io.zipkin.brave/brave-instrumentation-kafka-clients

@Override public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets,
  OffsetCommitCallback callback) {
 delegate.commitAsync(offsets, callback);
}

代码示例来源:origin: rayokota/kafka-graphs

@Override
public void commitAsync(OffsetCommitCallback callback) {
  kafkaConsumer.commitAsync(callback);
}

代码示例来源:origin: opentracing-contrib/java-kafka-client

@Override
public void commitAsync() {
 consumer.commitAsync();
}

代码示例来源:origin: vert-x3/vertx-kafka-client

@Override
public void commit(Map<TopicPartition, OffsetAndMetadata> offsets, Handler<AsyncResult<Map<TopicPartition, OffsetAndMetadata>>> completionHandler) {
 this.submitTask((consumer, future) -> {
  OffsetCommitCallback callback = (result, exception) -> {
   if (future != null) {
    if (exception != null) {
     future.fail(exception);
    } else {
     future.complete(result);
    }
   }
  };
  if (offsets == null) {
   consumer.commitAsync(callback);
  } else {
   consumer.commitAsync(offsets, callback);
  }
 }, completionHandler);
}

代码示例来源:origin: berndruecker/flowing-retail

public void run( ) {
  
  final Properties props = new Properties();
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  consumer = new KafkaConsumer<>(props);
  consumer.subscribe(Collections.singletonList(topicName));
  while (running) {
   consumer.poll(pollingInterval);
   consumer.commitAsync();
  }        
  consumer.close();
 }
};

代码示例来源:origin: linkedin/li-apache-kafka-clients

private void commitOffsets(Map<TopicPartition, OffsetAndMetadata> offsets,
              boolean ignoreConsumerHighWatermark,
              OffsetCommitCallback callback,
              boolean sync,
              Duration timeout) {
 Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
   _consumerRecordsProcessor.safeOffsetsToCommit(offsets, ignoreConsumerHighWatermark);
 if (sync) {
  if (timeout == null) {
   LOG.trace("Committing offsets synchronously: {}", offsetsToCommit);
   _kafkaConsumer.commitSync(offsetsToCommit);
  } else {
   LOG.trace("Committing offsets synchronously with timeout {} ms: {}", timeout.toMillis(), offsetsToCommit);
   _kafkaConsumer.commitSync(offsetsToCommit, timeout);
  }
 } else {
  LOG.trace("Committing offsets asynchronously: {}", offsetsToCommit);
  _offsetCommitCallback.setUserCallback(callback);
  _kafkaConsumer.commitAsync(offsetsToCommit, _offsetCommitCallback);
 }
}

代码示例来源:origin: authorjapps/zerocode

public static void handleCommitSyncAsync(Consumer<Long, String> consumer,
                     ConsumerCommonConfigs consumerCommonConfigs,
                     ConsumerLocalConfigs consumeLocalTestProps) {
  if (consumeLocalTestProps == null) {
    LOGGER.warn("[No local test configs]-Kafka client neither did `commitAsync()` nor `commitSync()`");
    return;
  }
  Boolean effectiveCommitSync;
  Boolean effectiveCommitAsync;
  Boolean localCommitSync = consumeLocalTestProps.getCommitSync();
  Boolean localCommitAsync = consumeLocalTestProps.getCommitAsync();
  if (localCommitSync == null && localCommitAsync == null) {
    effectiveCommitSync = consumerCommonConfigs.getCommitSync();
    effectiveCommitAsync = consumerCommonConfigs.getCommitAsync();
  } else {
    effectiveCommitSync = localCommitSync;
    effectiveCommitAsync = localCommitAsync;
  }
  if (effectiveCommitSync != null && effectiveCommitSync == true) {
    consumer.commitSync();
  } else if (effectiveCommitAsync != null && effectiveCommitAsync == true) {
    consumer.commitAsync();
  } else {
    LOGGER.warn("Kafka client neither configured for `commitAsync()` nor `commitSync()`");
  }
  // --------------------------------------------------------
  // Leave this to the user to "commit" the offset explicitly
  // --------------------------------------------------------
}

相关文章