本文整理了Java中org.apache.kafka.clients.consumer.Consumer.resume()
方法的一些代码示例,展示了Consumer.resume()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Consumer.resume()
方法的具体详情如下:
包路径:org.apache.kafka.clients.consumer.Consumer
类名称:Consumer
方法名:resume
暂无
代码示例来源:origin: openzipkin/brave
@Override public void resume(Collection<TopicPartition> partitions) {
delegate.resume(partitions);
}
代码示例来源:origin: apache/storm
private ConsumerRecords<K, V> pollKafkaBroker(PollablePartitionsInfo pollablePartitionsInfo) {
doSeekRetriableTopicPartitions(pollablePartitionsInfo.pollableEarliestRetriableOffsets);
Set<TopicPartition> pausedPartitions = new HashSet<>(consumer.assignment());
pausedPartitions.removeIf(pollablePartitionsInfo.pollablePartitions::contains);
try {
consumer.pause(pausedPartitions);
final ConsumerRecords<K, V> consumerRecords = consumer.poll(kafkaSpoutConfig.getPollTimeoutMs());
ackRetriableOffsetsIfCompactedAway(pollablePartitionsInfo.pollableEarliestRetriableOffsets, consumerRecords);
final int numPolledRecords = consumerRecords.count();
LOG.debug("Polled [{}] records from Kafka",
numPolledRecords);
if (kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) {
//Commit polled records immediately to ensure delivery is at-most-once.
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
createFetchedOffsetsMetadata(consumer.assignment());
consumer.commitSync(offsetsToCommit);
LOG.debug("Committed offsets {} to Kafka", offsetsToCommit);
}
return consumerRecords;
} finally {
consumer.resume(pausedPartitions);
}
}
代码示例来源:origin: apache/nifi
/**
* Execute poll using pause API just for sending heartbeat, not polling messages.
*/
void retainConnection() {
pollingLock.lock();
TopicPartition[] assignments = null;
try {
final Set<TopicPartition> assignmentSet = kafkaConsumer.assignment();
if (assignmentSet.isEmpty()) {
return;
}
if (logger.isDebugEnabled()) {
logger.debug("Pausing " + assignmentSet);
}
assignments = assignmentSet.toArray(new TopicPartition[assignmentSet.size()]);
kafkaConsumer.pause(assignments);
kafkaConsumer.poll(0);
if (logger.isDebugEnabled()) {
logger.debug("Resuming " + assignments);
}
} finally {
try {
if (assignments != null) {
kafkaConsumer.resume(assignments);
}
} finally {
pollingLock.unlock();
}
}
}
代码示例来源:origin: apache/storm
consumer.resume(pausedTopicPartitions);
LOG.trace("Resumed topic-partitions {}", pausedTopicPartitions);
代码示例来源:origin: linkedin/cruise-control
LOG.debug("Starting consuming from metrics reporter topic partitions {}.", _metricConsumer.assignment());
_metricConsumer.resume(_metricConsumer.paused());
int totalMetricsAdded = 0;
long maxTimeStamp = -1L;
代码示例来源:origin: apache/storm
consumer.resume(pausedTopicPartitions);
LOG.trace("Resumed topic-partitions {}", pausedTopicPartitions);
代码示例来源:origin: linkedin/li-apache-kafka-clients
@Override
public void resume(Collection<TopicPartition> partitions) {
_kafkaConsumer.resume(partitions);
}
代码示例来源:origin: rayokota/kafka-graphs
@Override
public void resume(Collection<TopicPartition> partitions) {
kafkaConsumer.resume(partitions);
}
代码示例来源:origin: io.opentracing.contrib/opentracing-kafka-client
@Override
public void resume(Collection<TopicPartition> partitions) {
consumer.resume(partitions);
}
代码示例来源:origin: opentracing-contrib/java-kafka-client
@Override
public void resume(Collection<TopicPartition> partitions) {
consumer.resume(partitions);
}
代码示例来源:origin: io.zipkin.brave/brave-instrumentation-kafka-clients
@Override public void resume(Collection<TopicPartition> partitions) {
delegate.resume(partitions);
}
代码示例来源:origin: spring-projects/spring-kafka
resumeLatch.countDown();
return null;
}).given(consumer).resume(records.keySet());
TopicPartitionInitialOffset[] topicPartition = new TopicPartitionInitialOffset[] {
new TopicPartitionInitialOffset("foo", 0) };
代码示例来源:origin: vert-x3/vertx-kafka-client
@Override
public KafkaReadStream<K, V> resume(Set<TopicPartition> topicPartitions, Handler<AsyncResult<Void>> completionHandler) {
this.submitTask((consumer, future) -> {
consumer.resume(topicPartitions);
if (future != null) {
future.complete();
}
}, completionHandler);
return this;
}
代码示例来源:origin: com.cerner.common.kafka/common-kafka
/**
* Check if the partition should be un-paused
*
* @param currentTime
* the current time since epoch
*/
public void maybeUnpause(long currentTime) {
if (!paused) {
LOGGER.debug("Partition [{}] not paused. Nothing to do", topicPartition);
return;
}
if (currentTime >= pausedTillTime) {
if(LOGGER.isInfoEnabled()){
LOGGER.info("Unpausing partition [{}] as the current time [{}] is >= paused time [{}]",
new Object[] { topicPartition, new Date(currentTime), new Date(pausedTillTime) });
}
// This method does not throw a KafkaException
consumer.resume(Collections.singleton(topicPartition));
PAUSED_PARTITIONS.dec();
paused = false;
// Reset successful results to 100% successful
resetResults();
}
else{
if (LOGGER.isDebugEnabled())
LOGGER.debug("Not unpausing partition [{}] as the current time [{}] is < paused time [{}]",
topicPartition, currentTime, pausedTillTime);
}
}
代码示例来源:origin: org.apache.samza/samza-kafka
kafkaConsumer.resume(topicPartitionsToPoll);
records = kafkaConsumer.poll(timeoutMs);
代码示例来源:origin: apache/samza
kafkaConsumer.resume(topicPartitionsToPoll);
records = kafkaConsumer.poll(timeoutMs);
代码示例来源:origin: org.apache.samza/samza-kafka_2.11
kafkaConsumer.resume(topicPartitionsToPoll);
records = kafkaConsumer.poll(timeoutMs);
代码示例来源:origin: org.apache.nifi/nifi-kafka-0-9-processors
/**
* Execute poll using pause API just for sending heartbeat, not polling messages.
*/
void retainConnection() {
pollingLock.lock();
TopicPartition[] assignments = null;
try {
final Set<TopicPartition> assignmentSet = kafkaConsumer.assignment();
if (assignmentSet.isEmpty()) {
return;
}
if (logger.isDebugEnabled()) {
logger.debug("Pausing " + assignmentSet);
}
assignments = assignmentSet.toArray(new TopicPartition[assignmentSet.size()]);
kafkaConsumer.pause(assignments);
kafkaConsumer.poll(0);
if (logger.isDebugEnabled()) {
logger.debug("Resuming " + assignments);
}
} finally {
try {
if (assignments != null) {
kafkaConsumer.resume(assignments);
}
} finally {
pollingLock.unlock();
}
}
}
代码示例来源:origin: org.apache.kafka/kafka-streams
/**
* @throws IllegalStateException If store gets registered after initialized is already finished
* @throws StreamsException if the store's change log does not contain the partition
*/
boolean updateNewAndRestoringTasks() {
active.initializeNewTasks();
standby.initializeNewTasks();
final Collection<TopicPartition> restored = changelogReader.restore(active);
active.updateRestored(restored);
if (active.allTasksRunning()) {
final Set<TopicPartition> assignment = consumer.assignment();
log.trace("Resuming partitions {}", assignment);
consumer.resume(assignment);
assignStandbyPartitions();
return true;
}
return false;
}
代码示例来源:origin: reactor/reactor-kafka
@Test
public void consumerMethods() throws Exception {
testConsumerMethod(c -> assertEquals(this.assignedPartitions, c.assignment()));
testConsumerMethod(c -> assertEquals(Collections.singleton(topic), c.subscription()));
testConsumerMethod(c -> assertEquals(2, c.partitionsFor(topics.get(2)).size()));
testConsumerMethod(c -> assertEquals(topics.size(), c.listTopics().size()));
testConsumerMethod(c -> assertEquals(0, c.metrics().size()));
testConsumerMethod(c -> {
Collection<TopicPartition> partitions = Collections.singleton(new TopicPartition(topic, 1));
c.pause(partitions);
assertEquals(partitions, c.paused());
c.resume(partitions);
});
testConsumerMethod(c -> {
TopicPartition partition = new TopicPartition(topic, 1);
Collection<TopicPartition> partitions = Collections.singleton(partition);
long position = c.position(partition);
c.seekToBeginning(partitions);
assertEquals(0, c.position(partition));
c.seekToEnd(partitions);
assertTrue("Did not seek to end", c.position(partition) > 0);
c.seek(partition, position);
});
}
内容来源于网络,如有侵权,请联系作者删除!