Fetches data for the subscribed list of topics and partitions
// NOTE(review): the preceding line looks like Javadoc text whose comment markers were stripped.
// Performs a single poll of commandConsumer, waiting at most `timeout`, and returns the
// resulting records object directly (no copying or filtering).
public Iterable<ConsumerRecord<CommandId, Command>> getNewCommands(final Duration timeout) {
return commandConsumer.poll(timeout);
// Reads prior command records by polling commandConsumer (subsequent polls bounded by `duration`)
// until a poll returns no records, and returns the collected QueuedCommands.
// NOTE(review): an empty poll only means nothing arrived within `duration`. On its own it does not
// prove the command topic has been fully read, so confirm the timeout is generous enough.
// NOTE(review): this is a fragment. The initial poll expression, the body of the null-value branch
// and the QueuedCommand constructor arguments are not visible here.
public List<QueuedCommand> getRestoreCommands(final Duration duration) {
final List<QueuedCommand> restoreCommands = Lists.newArrayList();
log.debug("Reading prior command records");
ConsumerRecords<CommandId, Command> records =
while (!records.isEmpty()) {
log.debug("Received {} records from poll", records.count());
for (final ConsumerRecord<CommandId, Command> record : records) {
// Null-valued records are handled specially here. The restore tests in this file expect them
// to be left out of the returned list.
if (record.value() == null) {
new QueuedCommand(
records = commandConsumer.poll(duration);
return restoreCommands;
// Single poll with a hard-coded timeout of 10000.
// NOTE(review): assumes this.consumer is a KafkaConsumer. Its poll(long) overload takes
// milliseconds and is deprecated (since Kafka 2.0) in favour of poll(Duration).
final ConsumerRecords<String, String> records = this.consumer.poll(10000);
// NOTE(review): recordToProcess is final and initialised to null, so it can never be assigned
// inside the loop below. Confirm this is intended.
final Record recordToProcess = null;
for (final ConsumerRecord<String, String> record : records) {
* Return the next record when available. Will never time out since this is a streaming source.
public RecordEnvelope<D> readRecordEnvelopeImpl()
throws DataRecordException, IOException {
// Reading before the extractor has been started is rejected.
if (!_isStarted.get()) {
throw new IOException("Streaming extractor has not been started.");
// Poll repeatedly until a batch with at least one record is buffered. Empty polls just loop
// again, which is why this method never times out.
while ((_records == null) || (!_records.hasNext())) {
// The close flag is checked and the poll issued while holding _consumer's monitor.
// Presumably close() (not visible) synchronizes on the same object; confirm.
// A set close flag is reported as ClosedChannelException (an IOException subclass).
synchronized (_consumer) {
if (_close.get()) {
throw new ClosedChannelException();
_records = _consumer.poll(this.fetchTimeOut).iterator();
// Hand out one record per call. The rest of the batch stays buffered in _records for later calls.
ConsumerRecord<S, D> record = _records.next();
// Wrap the value together with a (partition, record offset) watermark.
return new RecordEnvelope<D>(record.value(), new KafkaWatermark(_partition, new LongWatermark(record.offset())));
// Fetches one poll's worth of records for `partition`, starting at `nextOffset`. Each Kafka
// ConsumerRecord is adapted to a KafkaConsumerRecord (Kafka09ConsumerRecord) lazily via
// Iterators.transform.
// Returns null (not an empty iterator) when nextOffset > maxOffset.
// NOTE(review): as visible, maxOffset only guards the call. Records past maxOffset returned by the
// poll are not filtered here. assign() also replaces any previous assignment of this consumer.
public Iterator<KafkaConsumerRecord> consume(KafkaPartition partition, long nextOffset, long maxOffset) {
if (nextOffset > maxOffset) {
return null;
this.consumer.assign(Lists.newArrayList(new TopicPartition(partition.getTopicName(), partition.getId())));
this.consumer.seek(new TopicPartition(partition.getTopicName(), partition.getId()), nextOffset);
ConsumerRecords<K, V> consumerRecords = consumer.poll(super.fetchTimeoutMillis);
return Iterators.transform(consumerRecords.iterator(), new Function<ConsumerRecord<K, V>, KafkaConsumerRecord>() {
public KafkaConsumerRecord apply(ConsumerRecord<K, V> input) {
return new Kafka09ConsumerRecord<>(input);
* Executes a poll on the underlying Kafka Consumer and creates any new
* flowfiles necessary or appends to existing ones if in demarcation mode.
void poll() {
* Implementation note: If we take too long (30 secs?) between kafka
* poll calls and our own record processing to any subsequent poll calls
* or the commit we can run into a situation where the commit will
* succeed to the session but fail on committing offsets. This is
* apparently different than the Kafka scenario of electing to rebalance
* for other reasons but in this case is due a session timeout. It
* appears Kafka KIP-62 aims to offer more control over the meaning of
* various timeouts. If we do run into this case it could result in
* duplicates.
* This can be avoided by calling retainConnection() periodically.
try {
// Short poll (10). With the long-based overload the unit is milliseconds; assumes kafkaConsumer
// is a KafkaConsumer.
final ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(10);
// Remember whether this poll returned nothing.
lastPollEmpty = records.count() == 0;
} catch (final Throwable t) {
// As visible, the failure is rethrown unchanged. Any other handling in this catch or in the
// finally below is not shown in this fragment.
throw t;
} finally {
* Poll more records from the Kafka Broker.
* @throws PollTimeoutException if poll returns 0 record and consumer's position < requested endOffset.
private void pollRecords() {
// NOTE(review): the trace-timing setup inside this `if` (the stopwatch start) is not visible here.
if (LOG.isTraceEnabled()) {
records = consumer.poll(pollTimeoutDurationMs);
if (LOG.isTraceEnabled()) {
LOG.trace("Pulled [{}] records in [{}] ms", records.count(), stopwatch.elapsed(TimeUnit.MILLISECONDS));
// Fail if we can not poll within one lap of pollTimeoutMs.
// An empty batch while the position is still below endOffset means records that should exist
// were not fetched in time. Fail rather than silently ending iteration early.
if (records.isEmpty() && consumer.position(topicPartition) < endOffset) {
throw new PollTimeoutException(String.format(ERROR_POLL_TIMEOUT_FORMAT,
// Expose the new batch and record where the consumer now stands.
consumerRecordIterator = records.iterator();
consumerPosition = consumer.position(topicPartition);
* Executes a poll on the underlying Kafka Consumer and creates any new
* flowfiles necessary or appends to existing ones if in demarcation mode.
void poll() {
* Implementation note:
* Even if ConsumeKafka is not scheduled to poll due to downstream connection back-pressure is engaged,
* for longer than session.timeout.ms (defaults to 10 sec), Kafka consumer sends heartbeat from background thread.
* If this situation lasts longer than max.poll.interval.ms (defaults to 5 min), Kafka consumer sends
* Leave Group request to Group Coordinator. When ConsumeKafka processor is scheduled again, Kafka client checks
* if this client instance is still a part of consumer group. If not, it rejoins before polling messages.
* This behavior has been fixed via Kafka KIP-62 and available from Kafka client
// (The sentence above is cut off. KIP-62, which added background heartbeats and
// max.poll.interval.ms, shipped in Kafka client 0.10.1.0.)
try {
// Short poll (10). With the long-based overload the unit is milliseconds; assumes kafkaConsumer
// is a KafkaConsumer.
final ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(10);
// Remember whether this poll returned nothing.
lastPollEmpty = records.count() == 0;
} catch (final ProcessException pe) {
// As visible, both handlers rethrow the exception unchanged. Any additional handling is not
// shown in this fragment.
throw pe;
} catch (final Throwable t) {
throw t;
* Executes a poll on the underlying Kafka Consumer and creates any new
* flowfiles necessary or appends to existing ones if in demarcation mode.
void poll() {
* Implementation note:
* Even if ConsumeKafka is not scheduled to poll due to downstream connection back-pressure is engaged,
* for longer than session.timeout.ms (defaults to 10 sec), Kafka consumer sends heartbeat from background thread.
* If this situation lasts longer than max.poll.interval.ms (defaults to 5 min), Kafka consumer sends
* Leave Group request to Group Coordinator. When ConsumeKafka processor is scheduled again, Kafka client checks
* if this client instance is still a part of consumer group. If not, it rejoins before polling messages.
* This behavior has been fixed via Kafka KIP-62 and available from Kafka client
// (The sentence above is cut off. KIP-62, which added background heartbeats and
// max.poll.interval.ms, shipped in Kafka client 0.10.1.0.)
try {
// Short poll (10). With the long-based overload the unit is milliseconds; assumes kafkaConsumer
// is a KafkaConsumer.
final ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(10);
// Remember whether this poll returned nothing.
lastPollEmpty = records.count() == 0;
} catch (final ProcessException pe) {
// As visible, both handlers rethrow the exception unchanged. Any additional handling is not
// shown in this fragment.
throw pe;
} catch (final Throwable t) {
throw t;
* Execute poll using pause API just for sending heartbeat, not polling messages.
void retainConnection() {
// NOTE(review): the pause()/poll()/resume() calls this method is named for are not visible in
// this fragment. Only the surrounding bookkeeping and logging are shown.
TopicPartition[] assignments = null;
try {
final Set<TopicPartition> assignmentSet = kafkaConsumer.assignment();
// With no assigned partitions there is nothing to pause. The branch body (presumably an early
// return) is not visible here.
if (assignmentSet.isEmpty()) {
if (logger.isDebugEnabled()) {
logger.debug("Pausing " + assignmentSet);
assignments = assignmentSet.toArray(new TopicPartition[assignmentSet.size()]);
if (logger.isDebugEnabled()) {
// NOTE(review): `assignments` is a TopicPartition[], so this concatenation logs the array's
// identity string, not its elements. Arrays.toString(assignments) would print the partitions.
logger.debug("Resuming " + assignments);
} finally {
// Cleanup runs only for partitions that were actually captured above (assignments != null).
try {
if (assignments != null) {
} finally {
* Executes a poll on the underlying Kafka Consumer and creates any new
* flowfiles necessary or appends to existing ones if in demarcation mode.
void poll() {
* Implementation note:
* Even if ConsumeKafka is not scheduled to poll due to downstream connection back-pressure is engaged,
* for longer than session.timeout.ms (defaults to 10 sec), Kafka consumer sends heartbeat from background thread.
* If this situation lasts longer than max.poll.interval.ms (defaults to 5 min), Kafka consumer sends
* Leave Group request to Group Coordinator. When ConsumeKafka processor is scheduled again, Kafka client checks
* if this client instance is still a part of consumer group. If not, it rejoins before polling messages.
* This behavior has been fixed via Kafka KIP-62 and available from Kafka client
// (The sentence above is cut off. KIP-62, which added background heartbeats and
// max.poll.interval.ms, shipped in Kafka client 0.10.1.0.)
try {
// Short poll (10). With the long-based overload the unit is milliseconds; assumes kafkaConsumer
// is a KafkaConsumer.
final ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(10);
// Remember whether this poll returned nothing.
lastPollEmpty = records.count() == 0;
} catch (final ProcessException pe) {
// As visible, both handlers rethrow the exception unchanged. Any additional handling is not
// shown in this fragment.
throw pe;
} catch (final Throwable t) {
throw t;
// Checks that getNewCommands hands back the very same object as consumerRecords (identity via
// sameInstance, not equality). consumerRecords is presumably stubbed as the consumer's poll result;
// the stubbing and the call arguments are not visible in this fragment.
public void shouldGetNewCommandsIteratorCorrectly() {
// Given:
// When:
final Iterable<ConsumerRecord<CommandId, Command>> newCommands = commandTopic
// Then:
assertThat(newCommands, sameInstance(consumerRecords));
* Executes a poll on the underlying Kafka Consumer and creates any new
* flowfiles necessary or appends to existing ones if in demarcation mode.
void poll() {
* Implementation note:
* Even if ConsumeKafka is not scheduled to poll due to downstream connection back-pressure is engaged,
* for longer than session.timeout.ms (defaults to 10 sec), Kafka consumer sends heartbeat from background thread.
* If this situation lasts longer than max.poll.interval.ms (defaults to 5 min), Kafka consumer sends
* Leave Group request to Group Coordinator. When ConsumeKafka processor is scheduled again, Kafka client checks
* if this client instance is still a part of consumer group. If not, it rejoins before polling messages.
* This behavior has been fixed via Kafka KIP-62 and available from Kafka client
// (The sentence above is cut off. KIP-62, which added background heartbeats and
// max.poll.interval.ms, shipped in Kafka client 0.10.1.0.)
try {
// Short poll (10). With the long-based overload the unit is milliseconds; assumes kafkaConsumer
// is a KafkaConsumer.
final ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(10);
// Remember whether this poll returned nothing.
lastPollEmpty = records.count() == 0;
} catch (final ProcessException pe) {
// As visible, both handlers rethrow the exception unchanged. Any additional handling is not
// shown in this fragment.
throw pe;
} catch (final Throwable t) {
throw t;
// Polls the broker once, bounded by kafkaSpoutConfig.getPollTimeoutMs(), and returns the records.
// pausedPartitions starts out as a copy of the whole current assignment. Presumably the pollable
// partitions are removed from it, the remainder paused before the poll and resumed in the finally.
// Those lines are not visible in this fragment; confirm.
private ConsumerRecords<K, V> pollKafkaBroker(PollablePartitionsInfo pollablePartitionsInfo) {
Set<TopicPartition> pausedPartitions = new HashSet<>(consumer.assignment());
try {
final ConsumerRecords<K, V> consumerRecords = consumer.poll(kafkaSpoutConfig.getPollTimeoutMs());
// Judging by its name, this acks retriable offsets that no longer appear in the log (e.g. removed
// by compaction) so they are not retried indefinitely. Confirm against the helper.
ackRetriableOffsetsIfCompactedAway(pollablePartitionsInfo.pollableEarliestRetriableOffsets, consumerRecords);
final int numPolledRecords = consumerRecords.count();
LOG.debug("Polled [{}] records from Kafka",
if (kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) {
//Commit polled records immediately to ensure delivery is at-most-once.
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
LOG.debug("Committed offsets {} to Kafka", offsetsToCommit);
return consumerRecords;
} finally {
Code example source (origin): apache/storm
// Polls once and keeps only the records for the current batch's partition (currBatchTp).
// Records for any other assigned partition returned by the same poll are dropped by this expression.
final List<ConsumerRecord<K, V>> records = consumer.poll(pollTimeoutMs).records(currBatchTp);
LOG.debug("Polled [{}] records from Kafka.", records.size());
Code example source (origin): apache/storm
// Reposition currBatchTp's fetch position to seekOffset, then poll from there.
consumer.seek(currBatchTp, seekOffset);
final ConsumerRecords<K, V> records = consumer.poll(pollTimeoutMs);
LOG.debug("Polled [{}] records from Kafka.", records.count());
Code example source (origin): confluentinc/ksql
// Verifies that restoring commands drops null-valued records. The fixture yields command1,
// command2 and a null-valued record for commandId2, followed by an empty poll that ends the restore
// loop. Only the two non-null commands are expected back, in order.
public void shouldFilterNullCommandsWhileRestoringCommands() {
// Given:
new ConsumerRecord<>("topic", 0, 0, commandId1, command1),
new ConsumerRecord<>("topic", 0, 0, commandId2, command2),
new ConsumerRecord<>("topic", 0, 0, commandId2, null)
.thenReturn(new ConsumerRecords<>(Collections.emptyMap()));
// When:
final List<QueuedCommand> queuedCommandList = commandTopic
// Then:
assertThat(queuedCommandList, equalTo(ImmutableList.of(
new QueuedCommand(commandId1, command1, Optional.empty()),
new QueuedCommand(commandId2, command2, Optional.empty()))));
Code example source (origin): confluentinc/ksql
// Verifies that restore commands are accumulated across several polls (two records, then one) in
// poll order, stopping at the first empty poll. It also asserts a single-partition list containing
// partition 0 of COMMAND_TOPIC_NAME; presumably this is the consumer's assignment, but the subject
// of that assertion is not visible.
public void shouldGetRestoreCommandsCorrectly() {
// Given:
new ConsumerRecord<>("topic", 0, 0, commandId1, command1),
new ConsumerRecord<>("topic", 0, 0, commandId2, command2)))
new ConsumerRecord<>("topic", 0, 0, commandId3, command3)))
.thenReturn(new ConsumerRecords<>(Collections.emptyMap()));
// When:
final List<QueuedCommand> queuedCommandList = commandTopic
// Then:
equalTo(Collections.singletonList(new TopicPartition(COMMAND_TOPIC_NAME, 0))));
assertThat(queuedCommandList, equalTo(ImmutableList.of(
new QueuedCommand(commandId1, command1, Optional.empty()),
new QueuedCommand(commandId2, command2, Optional.empty()),
new QueuedCommand(commandId3, command3, Optional.empty()))));
Code example source (origin): confluentinc/ksql
// Verifies that records sharing a key are all kept in poll order rather than de-duplicated by key.
// commandId2 appears twice (with command2 and command3) and both entries must be returned.
public void shouldGetRestoreCommandsCorrectlyWithDuplicateKeys() {
// Given:
new ConsumerRecord<>("topic", 0, 0, commandId1, command1),
new ConsumerRecord<>("topic", 0, 0, commandId2, command2)))
new ConsumerRecord<>("topic", 0, 0, commandId2, command3),
new ConsumerRecord<>("topic", 0, 0, commandId3, command3)))
.thenReturn(new ConsumerRecords<>(Collections.emptyMap()));
// When:
final List<QueuedCommand> queuedCommandList = commandTopic
// Then:
assertThat(queuedCommandList, equalTo(ImmutableList.of(
new QueuedCommand(commandId1, command1, Optional.empty()),
new QueuedCommand(commandId2, command2, Optional.empty()),
new QueuedCommand(commandId2, command3, Optional.empty()),
new QueuedCommand(commandId3, command3, Optional.empty()))));