本文整理了Java中org.apache.kafka.clients.consumer.Consumer.commitAsync()
方法的一些代码示例,展示了Consumer.commitAsync()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Consumer.commitAsync()
方法的具体详情如下:
包路径:org.apache.kafka.clients.consumer.Consumer
类名称:Consumer
方法名:commitAsync
暂无
代码示例来源:origin: openzipkin/brave
@Override public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets,
OffsetCommitCallback callback) {
delegate.commitAsync(offsets, callback);
}
代码示例来源:origin: openzipkin/brave
@Override public void commitAsync() {
delegate.commitAsync();
}
代码示例来源:origin: openzipkin/brave
@Override public void commitAsync(OffsetCommitCallback callback) {
delegate.commitAsync(callback);
}
代码示例来源:origin: apache/storm
@Override
public void nextTuple() {
try {
if (refreshAssignmentTimer.isExpiredResetOnTrue()) {
refreshAssignment();
}
if (commitTimer != null && commitTimer.isExpiredResetOnTrue()) {
if (isAtLeastOnceProcessing()) {
commitOffsetsForAckedTuples();
} else if (kafkaSpoutConfig.getProcessingGuarantee() == ProcessingGuarantee.NO_GUARANTEE) {
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
createFetchedOffsetsMetadata(consumer.assignment());
consumer.commitAsync(offsetsToCommit, null);
LOG.debug("Committed offsets {} to Kafka", offsetsToCommit);
}
}
PollablePartitionsInfo pollablePartitionsInfo = getPollablePartitionsInfo();
if (pollablePartitionsInfo.shouldPoll()) {
try {
setWaitingToEmit(pollKafkaBroker(pollablePartitionsInfo));
} catch (RetriableException e) {
LOG.error("Failed to poll from kafka.", e);
}
}
emitIfWaitingNotEmitted();
} catch (InterruptException e) {
throwKafkaConsumerInterruptedException();
}
}
代码示例来源:origin: io.zipkin.brave/brave-instrumentation-kafka-clients
@Override public void commitAsync() {
delegate.commitAsync();
}
代码示例来源:origin: io.opentracing.contrib/opentracing-kafka-client
@Override
public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets,
OffsetCommitCallback callback) {
consumer.commitAsync(offsets, callback);
}
代码示例来源:origin: opentracing-contrib/java-kafka-client
@Override
public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets,
OffsetCommitCallback callback) {
consumer.commitAsync(offsets, callback);
}
代码示例来源:origin: io.opentracing.contrib/opentracing-kafka-client
@Override
public void commitAsync(OffsetCommitCallback callback) {
consumer.commitAsync(callback);
}
代码示例来源:origin: rayokota/kafka-graphs
@Override
public void commitAsync() {
kafkaConsumer.commitAsync();
}
代码示例来源:origin: rayokota/kafka-graphs
@Override
public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
kafkaConsumer.commitAsync(offsets, callback);
}
代码示例来源:origin: opentracing-contrib/java-kafka-client
@Override
public void commitAsync(OffsetCommitCallback callback) {
consumer.commitAsync(callback);
}
代码示例来源:origin: io.zipkin.brave/brave-instrumentation-kafka-clients
@Override public void commitAsync(OffsetCommitCallback callback) {
delegate.commitAsync(callback);
}
代码示例来源:origin: io.opentracing.contrib/opentracing-kafka-client
@Override
public void commitAsync() {
consumer.commitAsync();
}
代码示例来源:origin: io.zipkin.brave/brave-instrumentation-kafka-clients
@Override public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets,
OffsetCommitCallback callback) {
delegate.commitAsync(offsets, callback);
}
代码示例来源:origin: rayokota/kafka-graphs
@Override
public void commitAsync(OffsetCommitCallback callback) {
kafkaConsumer.commitAsync(callback);
}
代码示例来源:origin: opentracing-contrib/java-kafka-client
@Override
public void commitAsync() {
consumer.commitAsync();
}
代码示例来源:origin: vert-x3/vertx-kafka-client
@Override
public void commit(Map<TopicPartition, OffsetAndMetadata> offsets, Handler<AsyncResult<Map<TopicPartition, OffsetAndMetadata>>> completionHandler) {
this.submitTask((consumer, future) -> {
OffsetCommitCallback callback = (result, exception) -> {
if (future != null) {
if (exception != null) {
future.fail(exception);
} else {
future.complete(result);
}
}
};
if (offsets == null) {
consumer.commitAsync(callback);
} else {
consumer.commitAsync(offsets, callback);
}
}, completionHandler);
}
代码示例来源:origin: berndruecker/flowing-retail
public void run( ) {
final Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(topicName));
while (running) {
consumer.poll(pollingInterval);
consumer.commitAsync();
}
consumer.close();
}
};
代码示例来源:origin: linkedin/li-apache-kafka-clients
private void commitOffsets(Map<TopicPartition, OffsetAndMetadata> offsets,
boolean ignoreConsumerHighWatermark,
OffsetCommitCallback callback,
boolean sync,
Duration timeout) {
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
_consumerRecordsProcessor.safeOffsetsToCommit(offsets, ignoreConsumerHighWatermark);
if (sync) {
if (timeout == null) {
LOG.trace("Committing offsets synchronously: {}", offsetsToCommit);
_kafkaConsumer.commitSync(offsetsToCommit);
} else {
LOG.trace("Committing offsets synchronously with timeout {} ms: {}", timeout.toMillis(), offsetsToCommit);
_kafkaConsumer.commitSync(offsetsToCommit, timeout);
}
} else {
LOG.trace("Committing offsets asynchronously: {}", offsetsToCommit);
_offsetCommitCallback.setUserCallback(callback);
_kafkaConsumer.commitAsync(offsetsToCommit, _offsetCommitCallback);
}
}
代码示例来源:origin: authorjapps/zerocode
public static void handleCommitSyncAsync(Consumer<Long, String> consumer,
ConsumerCommonConfigs consumerCommonConfigs,
ConsumerLocalConfigs consumeLocalTestProps) {
if (consumeLocalTestProps == null) {
LOGGER.warn("[No local test configs]-Kafka client neither did `commitAsync()` nor `commitSync()`");
return;
}
Boolean effectiveCommitSync;
Boolean effectiveCommitAsync;
Boolean localCommitSync = consumeLocalTestProps.getCommitSync();
Boolean localCommitAsync = consumeLocalTestProps.getCommitAsync();
if (localCommitSync == null && localCommitAsync == null) {
effectiveCommitSync = consumerCommonConfigs.getCommitSync();
effectiveCommitAsync = consumerCommonConfigs.getCommitAsync();
} else {
effectiveCommitSync = localCommitSync;
effectiveCommitAsync = localCommitAsync;
}
if (effectiveCommitSync != null && effectiveCommitSync == true) {
consumer.commitSync();
} else if (effectiveCommitAsync != null && effectiveCommitAsync == true) {
consumer.commitAsync();
} else {
LOGGER.warn("Kafka client neither configured for `commitAsync()` nor `commitSync()`");
}
// --------------------------------------------------------
// Leave this to the user to "commit" the offset explicitly
// --------------------------------------------------------
}
内容来源于网络,如有侵权,请联系作者删除!