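This article collects code examples for the Java method org.apache.kafka.clients.consumer.Consumer.poll() and shows how Consumer.poll() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected open-source projects, so they should serve as a useful reference. Details of the Consumer.poll() method are as follows: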
Package path: org.apache.kafka.clients.consumer.Consumer
Class name: Consumer
Method name: poll
Description: Fetches data for the subscribed list of topics and partitions.
Code example source: confluentinc/ksql
public Iterable<ConsumerRecord<CommandId, Command>> getNewCommands(final Duration timeout) {
  return commandConsumer.poll(timeout);
}
Code example source: confluentinc/ksql
consumer.poll(POLL_TIMEOUT).forEach(acquired::add);
consumer.poll(POLL_TIMEOUT).forEach(acquired::add);
Code example source: confluentinc/ksql
public List<QueuedCommand> getRestoreCommands(final Duration duration) {
  final List<QueuedCommand> restoreCommands = Lists.newArrayList();
  commandConsumer.seekToBeginning(
      Collections.singletonList(commandTopicPartition));
  log.debug("Reading prior command records");
  ConsumerRecords<CommandId, Command> records =
      commandConsumer.poll(duration);
  while (!records.isEmpty()) {
    log.debug("Received {} records from poll", records.count());
    for (final ConsumerRecord<CommandId, Command> record : records) {
      if (record.value() == null) {
        continue;
      }
      restoreCommands.add(
          new QueuedCommand(
              record.key(),
              record.value(),
              Optional.empty()));
    }
    records = commandConsumer.poll(duration);
  }
  return restoreCommands;
}
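The restore loop above keeps polling until a poll returns no records and skips records whose value is null. A minimal sketch of that pattern follows, using only the standard library. The names `DrainUntilEmpty`, `drain`, and `scripted` are hypothetical, and a plain `Supplier<List<T>>` stands in for the consumer, so this is not Kafka API code. Note that with a real consumer an empty poll may only mean the timeout elapsed before data arrived, so treating it as "end of topic" is an assumption the caller has to accept.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.function.Supplier;

class DrainUntilEmpty {

    // Calls poll repeatedly until it returns an empty batch,
    // collecting every non-null value along the way.
    static <T> List<T> drain(Supplier<List<T>> poll) {
        List<T> collected = new ArrayList<>();
        List<T> batch = poll.get();
        while (!batch.isEmpty()) {
            for (T value : batch) {
                if (value != null) {
                    collected.add(value);
                }
            }
            batch = poll.get();
        }
        return collected;
    }

    // Hypothetical stand-in for a consumer: hands out the given batches in order,
    // then returns empty lists forever.
    static <T> Supplier<List<T>> scripted(List<List<T>> batches) {
        Deque<List<T>> queue = new ArrayDeque<>(batches);
        return () -> queue.isEmpty() ? Collections.<T>emptyList() : queue.poll();
    }

    public static void main(String[] args) {
        List<List<String>> batches = Arrays.asList(
            Arrays.asList("create stream", null),
            Arrays.asList("drop stream"));
        // The null value in the first batch is skipped.
        System.out.println(drain(scripted(batches)));
    }
}
```

The loop polls once before the `while` and once at the end of each iteration, the same shape as `getRestoreCommands`, so the termination check always sees a fresh batch.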
Code example source: azkaban/azkaban
this.consumerSubscriptionRebalance();
final ConsumerRecords<String, String> records = this.consumer.poll(10000);
final Record recordToProcess = null;
for (final ConsumerRecord<String, String> record : records) {
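Some snippets in this collection pass a plain millisecond value to `poll`, while others pass a `java.time.Duration`. The `poll(long)` overload has been deprecated since Kafka 2.0 in favor of `poll(Duration)`. The sketch below shows one way to convert an existing millisecond timeout. The class `PollTimeouts` and the method `toPollTimeout` are hypothetical helpers, not part of the Kafka client, and rejecting negative values is a choice made for this example.

```java
import java.time.Duration;

class PollTimeouts {

    // Converts a millisecond timeout, as passed to the older poll(long) overload,
    // into the Duration expected by poll(Duration).
    static Duration toPollTimeout(long timeoutMs) {
        if (timeoutMs < 0) {
            throw new IllegalArgumentException("Timeout must not be negative: " + timeoutMs);
        }
        return Duration.ofMillis(timeoutMs);
    }

    public static void main(String[] args) {
        Duration timeout = toPollTimeout(10000);
        // With a real consumer the call would then read: consumer.poll(timeout);
        System.out.println(timeout.toMillis() + " ms");
    }
}
```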
Code example source: apache/incubator-gobblin
/**
 * Return the next record when available. Will never time out since this is a streaming source.
 */
@Override
public RecordEnvelope<D> readRecordEnvelopeImpl()
    throws DataRecordException, IOException {
  if (!_isStarted.get()) {
    throw new IOException("Streaming extractor has not been started.");
  }
  while ((_records == null) || (!_records.hasNext())) {
    synchronized (_consumer) {
      if (_close.get()) {
        throw new ClosedChannelException();
      }
      _records = _consumer.poll(this.fetchTimeOut).iterator();
    }
  }
  ConsumerRecord<S, D> record = _records.next();
  _rowCount.getAndIncrement();
  return new RecordEnvelope<D>(record.value(), new KafkaWatermark(_partition, new LongWatermark(record.offset())));
}
Code example source: apache/incubator-gobblin
@Override
public Iterator<KafkaConsumerRecord> consume(KafkaPartition partition, long nextOffset, long maxOffset) {
  if (nextOffset > maxOffset) {
    return null;
  }
  this.consumer.assign(Lists.newArrayList(new TopicPartition(partition.getTopicName(), partition.getId())));
  this.consumer.seek(new TopicPartition(partition.getTopicName(), partition.getId()), nextOffset);
  ConsumerRecords<K, V> consumerRecords = consumer.poll(super.fetchTimeoutMillis);
  return Iterators.transform(consumerRecords.iterator(), new Function<ConsumerRecord<K, V>, KafkaConsumerRecord>() {
    @Override
    public KafkaConsumerRecord apply(ConsumerRecord<K, V> input) {
      return new Kafka09ConsumerRecord<>(input);
    }
  });
}
Code example source: apache/nifi
/**
 * Executes a poll on the underlying Kafka Consumer and creates any new
 * flowfiles necessary or appends to existing ones if in demarcation mode.
 */
void poll() {
  /**
   * Implementation note: If we take too long (30 secs?) between kafka
   * poll calls and our own record processing to any subsequent poll calls
   * or the commit we can run into a situation where the commit will
   * succeed to the session but fail on committing offsets. This is
   * apparently different than the Kafka scenario of electing to rebalance
   * for other reasons but in this case is due a session timeout. It
   * appears Kafka KIP-62 aims to offer more control over the meaning of
   * various timeouts. If we do run into this case it could result in
   * duplicates.
   * This can be avoided by calling retainConnection() periodically.
   */
  pollingLock.lock();
  try {
    final ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(10);
    lastPollEmpty = records.count() == 0;
    processRecords(records);
  } catch (final Throwable t) {
    this.poison();
    throw t;
  } finally {
    pollingLock.unlock();
  }
}
Code example source: apache/hive
/**
 * Poll more records from the Kafka Broker.
 *
 * @throws PollTimeoutException if poll returns 0 record and consumer's position < requested endOffset.
 */
private void pollRecords() {
  if (LOG.isTraceEnabled()) {
    stopwatch.reset().start();
  }
  records = consumer.poll(pollTimeoutDurationMs);
  if (LOG.isTraceEnabled()) {
    stopwatch.stop();
    LOG.trace("Pulled [{}] records in [{}] ms", records.count(), stopwatch.elapsed(TimeUnit.MILLISECONDS));
  }
  // Fail if we can not poll within one lap of pollTimeoutMs.
  if (records.isEmpty() && consumer.position(topicPartition) < endOffset) {
    throw new PollTimeoutException(String.format(ERROR_POLL_TIMEOUT_FORMAT,
        pollTimeoutMs,
        topicPartition.toString(),
        startOffset,
        consumer.position(topicPartition),
        endOffset));
  }
  consumerRecordIterator = records.iterator();
  consumerPosition = consumer.position(topicPartition);
}
Code example source: apache/nifi
/**
 * Executes a poll on the underlying Kafka Consumer and creates any new
 * flowfiles necessary or appends to existing ones if in demarcation mode.
 */
void poll() {
  /**
   * Implementation note:
   * Even if ConsumeKafka is not scheduled to poll due to downstream connection back-pressure is engaged,
   * for longer than session.timeout.ms (defaults to 10 sec), Kafka consumer sends heartbeat from background thread.
   * If this situation lasts longer than max.poll.interval.ms (defaults to 5 min), Kafka consumer sends
   * Leave Group request to Group Coordinator. When ConsumeKafka processor is scheduled again, Kafka client checks
   * if this client instance is still a part of consumer group. If not, it rejoins before polling messages.
   * This behavior has been fixed via Kafka KIP-62 and available from Kafka client 0.10.1.0.
   */
  try {
    final ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(10);
    lastPollEmpty = records.count() == 0;
    processRecords(records);
  } catch (final ProcessException pe) {
    throw pe;
  } catch (final Throwable t) {
    this.poison();
    throw t;
  }
}
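The implementation note above refers to two consumer settings, `session.timeout.ms` and `max.poll.interval.ms`, that determine how long a consumer may go without heartbeating or polling before it leaves its group. As a rough sketch, they could be set explicitly in the `Properties` object handed to the consumer. The class `ConsumerTimeoutConfig` and the method `timeoutProperties` are hypothetical. The values are illustrative and mirror the figures quoted in the comment; they are not recommendations, and client defaults differ between Kafka versions.

```java
import java.util.Properties;

class ConsumerTimeoutConfig {

    // Builds only the timeout-related part of a consumer configuration.
    // A real consumer would also need bootstrap.servers, group.id and deserializers.
    static Properties timeoutProperties() {
        Properties props = new Properties();
        // Time without heartbeats after which the group coordinator considers the consumer dead.
        props.setProperty("session.timeout.ms", "10000");
        // How often the background thread sends heartbeats; kept well below the session timeout.
        props.setProperty("heartbeat.interval.ms", "3000");
        // Maximum allowed gap between two poll() calls before the consumer leaves the group.
        props.setProperty("max.poll.interval.ms", "300000");
        // Upper bound on the records returned per poll(), which limits per-batch processing time.
        props.setProperty("max.poll.records", "500");
        return props;
    }

    public static void main(String[] args) {
        timeoutProperties().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

If processing a batch can take longer than `max.poll.interval.ms`, lowering `max.poll.records` or raising the interval are the usual levers; which one fits depends on the workload.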
Code example source: apache/nifi
/**
 * Execute poll using pause API just for sending heartbeat, not polling messages.
 */
void retainConnection() {
  pollingLock.lock();
  TopicPartition[] assignments = null;
  try {
    final Set<TopicPartition> assignmentSet = kafkaConsumer.assignment();
    if (assignmentSet.isEmpty()) {
      return;
    }
    if (logger.isDebugEnabled()) {
      logger.debug("Pausing " + assignmentSet);
    }
    assignments = assignmentSet.toArray(new TopicPartition[assignmentSet.size()]);
    kafkaConsumer.pause(assignments);
    kafkaConsumer.poll(0);
    if (logger.isDebugEnabled()) {
      logger.debug("Resuming " + assignments);
    }
  } finally {
    try {
      if (assignments != null) {
        kafkaConsumer.resume(assignments);
      }
    } finally {
      pollingLock.unlock();
    }
  }
}
Code example source: confluentinc/ksql
@Test
public void shouldGetNewCommandsIteratorCorrectly() {
  // Given:
  when(commandConsumer.poll(any(Duration.class))).thenReturn(consumerRecords);
  // When:
  final Iterable<ConsumerRecord<CommandId, Command>> newCommands = commandTopic
      .getNewCommands(Duration.ofHours(1));
  // Then:
  assertThat(newCommands, sameInstance(consumerRecords));
}
Code example source: apache/storm
private ConsumerRecords<K, V> pollKafkaBroker(PollablePartitionsInfo pollablePartitionsInfo) {
  doSeekRetriableTopicPartitions(pollablePartitionsInfo.pollableEarliestRetriableOffsets);
  Set<TopicPartition> pausedPartitions = new HashSet<>(consumer.assignment());
  pausedPartitions.removeIf(pollablePartitionsInfo.pollablePartitions::contains);
  try {
    consumer.pause(pausedPartitions);
    final ConsumerRecords<K, V> consumerRecords = consumer.poll(kafkaSpoutConfig.getPollTimeoutMs());
    ackRetriableOffsetsIfCompactedAway(pollablePartitionsInfo.pollableEarliestRetriableOffsets, consumerRecords);
    final int numPolledRecords = consumerRecords.count();
    LOG.debug("Polled [{}] records from Kafka",
        numPolledRecords);
    if (kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) {
      // Commit polled records immediately to ensure delivery is at-most-once.
      Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
          createFetchedOffsetsMetadata(consumer.assignment());
      consumer.commitSync(offsetsToCommit);
      LOG.debug("Committed offsets {} to Kafka", offsetsToCommit);
    }
    return consumerRecords;
  } finally {
    consumer.resume(pausedPartitions);
  }
}
Code example source: apache/storm
final List<ConsumerRecord<K, V>> records = consumer.poll(pollTimeoutMs).records(currBatchTp);
LOG.debug("Polled [{}] records from Kafka.", records.size());
Code example source: apache/storm
consumer.seek(currBatchTp, seekOffset);
final ConsumerRecords<K, V> records = consumer.poll(pollTimeoutMs);
LOG.debug("Polled [{}] records from Kafka.", records.count());
Code example source: confluentinc/ksql
@Test
public void shouldFilterNullCommandsWhileRestoringCommands() {
  // Given:
  when(commandConsumer.poll(any(Duration.class)))
      .thenReturn(someConsumerRecords(
          new ConsumerRecord<>("topic", 0, 0, commandId1, command1),
          new ConsumerRecord<>("topic", 0, 0, commandId2, command2),
          new ConsumerRecord<>("topic", 0, 0, commandId2, null)
      ))
      .thenReturn(new ConsumerRecords<>(Collections.emptyMap()));
  // When:
  final List<QueuedCommand> queuedCommandList = commandTopic
      .getRestoreCommands(Duration.ofMillis(1));
  // Then:
  assertThat(queuedCommandList, equalTo(ImmutableList.of(
      new QueuedCommand(commandId1, command1, Optional.empty()),
      new QueuedCommand(commandId2, command2, Optional.empty()))));
}
Code example source: confluentinc/ksql
@Test
public void shouldGetRestoreCommandsCorrectly() {
  // Given:
  when(commandConsumer.poll(any(Duration.class)))
      .thenReturn(someConsumerRecords(
          new ConsumerRecord<>("topic", 0, 0, commandId1, command1),
          new ConsumerRecord<>("topic", 0, 0, commandId2, command2)))
      .thenReturn(someConsumerRecords(
          new ConsumerRecord<>("topic", 0, 0, commandId3, command3)))
      .thenReturn(new ConsumerRecords<>(Collections.emptyMap()));
  // When:
  final List<QueuedCommand> queuedCommandList = commandTopic
      .getRestoreCommands(Duration.ofMillis(1));
  // Then:
  verify(commandConsumer).seekToBeginning(topicPartitionsCaptor.capture());
  assertThat(topicPartitionsCaptor.getValue(),
      equalTo(Collections.singletonList(new TopicPartition(COMMAND_TOPIC_NAME, 0))));
  assertThat(queuedCommandList, equalTo(ImmutableList.of(
      new QueuedCommand(commandId1, command1, Optional.empty()),
      new QueuedCommand(commandId2, command2, Optional.empty()),
      new QueuedCommand(commandId3, command3, Optional.empty()))));
}
Code example source: confluentinc/ksql
@Test
public void shouldGetRestoreCommandsCorrectlyWithDuplicateKeys() {
  // Given:
  when(commandConsumer.poll(any(Duration.class)))
      .thenReturn(someConsumerRecords(
          new ConsumerRecord<>("topic", 0, 0, commandId1, command1),
          new ConsumerRecord<>("topic", 0, 0, commandId2, command2)))
      .thenReturn(someConsumerRecords(
          new ConsumerRecord<>("topic", 0, 0, commandId2, command3),
          new ConsumerRecord<>("topic", 0, 0, commandId3, command3)))
      .thenReturn(new ConsumerRecords<>(Collections.emptyMap()));
  // When:
  final List<QueuedCommand> queuedCommandList = commandTopic
      .getRestoreCommands(Duration.ofMillis(1));
  // Then:
  assertThat(queuedCommandList, equalTo(ImmutableList.of(
      new QueuedCommand(commandId1, command1, Optional.empty()),
      new QueuedCommand(commandId2, command2, Optional.empty()),
      new QueuedCommand(commandId2, command3, Optional.empty()),
      new QueuedCommand(commandId3, command3, Optional.empty()))));
}