Usage of the org.apache.kafka.clients.consumer.Consumer.poll() method, with code examples


This article collects Java code examples for the org.apache.kafka.clients.consumer.Consumer.poll() method and shows how it is used in practice. The examples are drawn from selected open-source projects on platforms such as GitHub, Stack Overflow, and Maven, so they should be of real reference value. Details of Consumer.poll() are as follows:
Package: org.apache.kafka.clients.consumer
Class: Consumer
Method: poll

Consumer.poll overview

Fetches data for the subscribed list of topics and partitions.
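
Before the individual examples, here is a minimal sketch of the canonical subscribe-and-poll loop. The broker address, group id, and topic name are hypothetical placeholders, and the property values are illustrative only:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PollLoopSketch {
 public static void main(String[] args) {
  final Properties props = new Properties();
  props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker address
  props.put("group.id", "example-group");           // hypothetical consumer group
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
   consumer.subscribe(Collections.singletonList("example-topic")); // hypothetical topic
   while (true) {
    // poll() blocks for up to the given timeout and returns an empty batch if nothing arrived
    final ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
    for (final ConsumerRecord<String, String> record : records) {
     System.out.printf("offset=%d key=%s value=%s%n",
       record.offset(), record.key(), record.value());
    }
   }
  }
 }
}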

Code examples

Code example source: confluentinc/ksql

public Iterable<ConsumerRecord<CommandId, Command>> getNewCommands(final Duration timeout) {
 return commandConsumer.poll(timeout);
}

Code example source: confluentinc/ksql

consumer.poll(POLL_TIMEOUT).forEach(acquired::add);
consumer.poll(POLL_TIMEOUT).forEach(acquired::add);

Code example source: confluentinc/ksql

public List<QueuedCommand> getRestoreCommands(final Duration duration) {
 final List<QueuedCommand> restoreCommands = Lists.newArrayList();
 commandConsumer.seekToBeginning(
   Collections.singletonList(commandTopicPartition));
 log.debug("Reading prior command records");
 ConsumerRecords<CommandId, Command> records =
   commandConsumer.poll(duration);
 while (!records.isEmpty()) {
  log.debug("Received {} records from poll", records.count());
  for (final ConsumerRecord<CommandId, Command> record : records) {
   if (record.value() == null) {
    continue;
   }
   restoreCommands.add(
     new QueuedCommand(
       record.key(),
       record.value(),
       Optional.empty()));
  }
  records = commandConsumer.poll(duration);
 }
 return restoreCommands;
}
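
Polling in a loop until an empty batch comes back, as above, is a common idiom for draining a topic from the beginning. Note that an empty result only means no records arrived within the given timeout, so the duration must be long enough to distinguish a fully consumed topic from a slow broker.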

Code example source: azkaban/azkaban

this.consumerSubscriptionRebalance();
final ConsumerRecords<String, String> records = this.consumer.poll(10000);
final Record recordToProcess = null;
for (final ConsumerRecord<String, String> record : records) {
 // ... per-record processing elided in the original excerpt
}

Code example source: apache/incubator-gobblin

/**
 * Return the next record when available. Will never time out since this is a streaming source.
 */
@Override
public RecordEnvelope<D> readRecordEnvelopeImpl()
  throws DataRecordException, IOException {
 if (!_isStarted.get()) {
  throw new IOException("Streaming extractor has not been started.");
 }
 while ((_records == null) || (!_records.hasNext())) {
  synchronized (_consumer) {
   if (_close.get()) {
    throw new ClosedChannelException();
   }
   _records = _consumer.poll(this.fetchTimeOut).iterator();
  }
 }
 ConsumerRecord<S, D> record = _records.next();
 _rowCount.getAndIncrement();
 return new RecordEnvelope<D>(record.value(), new KafkaWatermark(_partition, new LongWatermark(record.offset())));
}
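
Because this extractor wraps a streaming source, the method deliberately never times out: each empty poll simply leads to another, and the close flag is rechecked inside the synchronized block so that a concurrent shutdown surfaces as a ClosedChannelException rather than an endless loop.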

Code example source: apache/incubator-gobblin

@Override
public Iterator<KafkaConsumerRecord> consume(KafkaPartition partition, long nextOffset, long maxOffset) {
 if (nextOffset > maxOffset) {
  return null;
 }
 this.consumer.assign(Lists.newArrayList(new TopicPartition(partition.getTopicName(), partition.getId())));
 this.consumer.seek(new TopicPartition(partition.getTopicName(), partition.getId()), nextOffset);
 ConsumerRecords<K, V> consumerRecords = consumer.poll(super.fetchTimeoutMillis);
 return Iterators.transform(consumerRecords.iterator(), new Function<ConsumerRecord<K, V>, KafkaConsumerRecord>() {
  @Override
  public KafkaConsumerRecord apply(ConsumerRecord<K, V> input) {
   return new Kafka09ConsumerRecord<>(input);
  }
 });
}
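
The assign-then-seek pattern above is the standard way to read an explicit offset range without joining a consumer group. A minimal standalone sketch of the same idea, with topic, partition, and offset bounds as hypothetical parameters:

import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

public class OffsetRangeReader {
 /** Reads records in [startOffset, endOffset) from a single partition. */
 public static void readRange(Consumer<String, String> consumer, String topic, int partition,
   long startOffset, long endOffset) {
  final TopicPartition tp = new TopicPartition(topic, partition);
  consumer.assign(Collections.singletonList(tp)); // manual assignment: no group coordination
  consumer.seek(tp, startOffset);
  // Loops until the position reaches endOffset; a production version would bound retries.
  while (consumer.position(tp) < endOffset) {
   final ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
   for (final ConsumerRecord<String, String> record : records) {
    if (record.offset() >= endOffset) {
     return; // past the requested range
    }
    // process(record) - application-specific handling elided
   }
  }
 }
}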

Code example source: apache/nifi

/**
 * Executes a poll on the underlying Kafka Consumer and creates any new
 * flowfiles necessary or appends to existing ones if in demarcation mode.
 */
void poll() {
  /**
   * Implementation note: If too much time (30 secs?) passes between Kafka
   * poll calls, including our own record processing, before any subsequent
   * poll call or the commit, we can run into a situation where the commit
   * succeeds against the session but fails on committing offsets. This is
   * apparently different from the Kafka scenario of electing to rebalance
   * for other reasons; in this case it is due to a session timeout. Kafka
   * KIP-62 aims to offer more control over the meaning of the various
   * timeouts. If we do run into this case it could result in duplicates.
   * This can be avoided by calling retainConnection() periodically.
   */
  pollingLock.lock();
  try {
    final ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(10);
    lastPollEmpty = records.count() == 0;
    processRecords(records);
  } catch (final Throwable t) {
    this.poison();
    throw t;
  } finally {
    pollingLock.unlock();
  }
}
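
The pollingLock here serializes poll() with retainConnection(), shown in a later example, which performs a heartbeat-only poll while all assigned partitions are paused.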

Code example source: apache/hive

/**
 * Poll more records from the Kafka Broker.
 *
 * @throws PollTimeoutException if poll returns 0 records and the consumer's position is less than the requested endOffset.
 */
private void pollRecords() {
 if (LOG.isTraceEnabled()) {
  stopwatch.reset().start();
 }
 records = consumer.poll(pollTimeoutDurationMs);
 if (LOG.isTraceEnabled()) {
  stopwatch.stop();
  LOG.trace("Pulled [{}] records in [{}] ms", records.count(), stopwatch.elapsed(TimeUnit.MILLISECONDS));
 }
 // Fail if we cannot poll any records within one lap of pollTimeoutMs.
 if (records.isEmpty() && consumer.position(topicPartition) < endOffset) {
  throw new PollTimeoutException(String.format(ERROR_POLL_TIMEOUT_FORMAT,
    pollTimeoutMs,
    topicPartition.toString(),
    startOffset,
    consumer.position(topicPartition),
    endOffset));
 }
 consumerRecordIterator = records.iterator();
 consumerPosition = consumer.position(topicPartition);
}
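
Throwing a PollTimeoutException when a poll returns nothing while the consumer's position is still short of endOffset turns a stalled fetch into an explicit error instead of a silent hang, which suits a bounded read that is expected to reach endOffset.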

Code example source: apache/nifi

/**
 * Executes a poll on the underlying Kafka Consumer and creates any new
 * flowfiles necessary or appends to existing ones if in demarcation mode.
 */
void poll() {
  /**
   * Implementation note:
   * Even if ConsumeKafka is not scheduled to poll, e.g. because downstream connection
   * back-pressure is engaged for longer than session.timeout.ms (defaults to 10 sec),
   * the Kafka consumer sends heartbeats from a background thread. If this situation
   * lasts longer than max.poll.interval.ms (defaults to 5 min), the Kafka consumer sends
   * a Leave Group request to the Group Coordinator. When the ConsumeKafka processor is
   * scheduled again, the Kafka client checks whether this client instance is still part
   * of the consumer group; if not, it rejoins before polling messages.
   * This behavior was introduced by Kafka KIP-62 and is available from Kafka client 0.10.1.0.
   */
  try {
    final ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(10);
    lastPollEmpty = records.count() == 0;
    processRecords(records);
  } catch (final ProcessException pe) {
    throw pe;
  } catch (final Throwable t) {
    this.poison();
    throw t;
  }
}
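
To make the timeouts in the note above concrete, the relevant settings can be tuned on the consumer. A hypothetical configuration fragment (using java.util.Properties); the values shown are the documented defaults mentioned in the note, not recommendations:

final Properties props = new Properties();
// Heartbeats are sent from a background thread since KIP-62 (Kafka client 0.10.1.0+);
// the group coordinator declares the consumer dead if no heartbeat arrives within this window.
props.put("session.timeout.ms", "10000");
// Maximum allowed gap between poll() calls before the consumer proactively leaves the group.
props.put("max.poll.interval.ms", "300000");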

Code example source: apache/nifi

/**
 * Executes a poll using the pause API just to send a heartbeat, without fetching messages.
 */
void retainConnection() {
  pollingLock.lock();
  TopicPartition[] assignments = null;
  try {
    final Set<TopicPartition> assignmentSet = kafkaConsumer.assignment();
    if (assignmentSet.isEmpty()) {
      return;
    }
    if (logger.isDebugEnabled()) {
      logger.debug("Pausing " + assignmentSet);
    }
    assignments = assignmentSet.toArray(new TopicPartition[assignmentSet.size()]);
    kafkaConsumer.pause(assignments);
    kafkaConsumer.poll(0);
    if (logger.isDebugEnabled()) {
      logger.debug("Resuming " + assignments);
    }
  } finally {
    try {
      if (assignments != null) {
        kafkaConsumer.resume(assignments);
      }
    } finally {
      pollingLock.unlock();
    }
  }
}
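
Pausing every assigned partition before calling poll(0) lets the consumer send heartbeats and take part in any pending rebalance without actually fetching records; the nested finally blocks make sure the partitions are resumed and the lock is released even if the poll fails.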

Code example source: confluentinc/ksql

@Test
public void shouldGetNewCommandsIteratorCorrectly() {
 // Given:
 when(commandConsumer.poll(any(Duration.class))).thenReturn(consumerRecords);
 // When:
 final Iterable<ConsumerRecord<CommandId, Command>> newCommands = commandTopic
   .getNewCommands(Duration.ofHours(1));
 // Then:
 assertThat(newCommands, sameInstance(consumerRecords));
}

Code example source: apache/storm

private ConsumerRecords<K, V> pollKafkaBroker(PollablePartitionsInfo pollablePartitionsInfo) {
  doSeekRetriableTopicPartitions(pollablePartitionsInfo.pollableEarliestRetriableOffsets);
  Set<TopicPartition> pausedPartitions = new HashSet<>(consumer.assignment());
  pausedPartitions.removeIf(pollablePartitionsInfo.pollablePartitions::contains);
  try {
    consumer.pause(pausedPartitions);
    final ConsumerRecords<K, V> consumerRecords = consumer.poll(kafkaSpoutConfig.getPollTimeoutMs());
    ackRetriableOffsetsIfCompactedAway(pollablePartitionsInfo.pollableEarliestRetriableOffsets, consumerRecords);
    final int numPolledRecords = consumerRecords.count();
    LOG.debug("Polled [{}] records from Kafka",
      numPolledRecords);
    if (kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) {
      //Commit polled records immediately to ensure delivery is at-most-once.
      Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
        createFetchedOffsetsMetadata(consumer.assignment());
      consumer.commitSync(offsetsToCommit);
      LOG.debug("Committed offsets {} to Kafka", offsetsToCommit);
    }
    return consumerRecords;
  } finally {
    consumer.resume(pausedPartitions);
  }
}
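
Committing the polled offsets immediately, before the records are emitted downstream, is what makes the guarantee at-most-once: a crash after the commit loses that batch instead of re-delivering it. Pausing the non-pollable partitions first confines the poll to partitions the spout is ready to process, and the finally block resumes them afterwards.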

Code example source: apache/storm

final List<ConsumerRecord<K, V>> records = consumer.poll(pollTimeoutMs).records(currBatchTp);
LOG.debug("Polled [{}] records from Kafka.", records.size());

Code example source: apache/storm

consumer.seek(currBatchTp, seekOffset);
final ConsumerRecords<K, V> records = consumer.poll(pollTimeoutMs);
LOG.debug("Polled [{}] records from Kafka.", records.count());

Code example source: confluentinc/ksql

@Test
public void shouldFilterNullCommandsWhileRestoringCommands() {
 // Given:
 when(commandConsumer.poll(any(Duration.class)))
   .thenReturn(someConsumerRecords(
     new ConsumerRecord<>("topic", 0, 0, commandId1, command1),
     new ConsumerRecord<>("topic", 0, 0, commandId2, command2),
     new ConsumerRecord<>("topic", 0, 0, commandId2, null)
   ))
   .thenReturn(new ConsumerRecords<>(Collections.emptyMap()));
 // When:
 final List<QueuedCommand> queuedCommandList = commandTopic
   .getRestoreCommands(Duration.ofMillis(1));
 // Then:
 assertThat(queuedCommandList, equalTo(ImmutableList.of(
   new QueuedCommand(commandId1, command1, Optional.empty()),
   new QueuedCommand(commandId2, command2, Optional.empty()))));
}
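
Note the final thenReturn of an empty ConsumerRecords: getRestoreCommands polls in a loop until an empty batch comes back, so the mocked consumer must eventually return one or the test would never terminate.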

Code example source: confluentinc/ksql

@Test
public void shouldGetRestoreCommandsCorrectly() {
 // Given:
 when(commandConsumer.poll(any(Duration.class)))
   .thenReturn(someConsumerRecords(
     new ConsumerRecord<>("topic", 0, 0, commandId1, command1),
     new ConsumerRecord<>("topic", 0, 0, commandId2, command2)))
   .thenReturn(someConsumerRecords(
     new ConsumerRecord<>("topic", 0, 0, commandId3, command3)))
   .thenReturn(new ConsumerRecords<>(Collections.emptyMap()));
 // When:
 final List<QueuedCommand> queuedCommandList = commandTopic
   .getRestoreCommands(Duration.ofMillis(1));
 // Then:
 verify(commandConsumer).seekToBeginning(topicPartitionsCaptor.capture());
 assertThat(topicPartitionsCaptor.getValue(),
   equalTo(Collections.singletonList(new TopicPartition(COMMAND_TOPIC_NAME, 0))));
 assertThat(queuedCommandList, equalTo(ImmutableList.of(
   new QueuedCommand(commandId1, command1, Optional.empty()),
   new QueuedCommand(commandId2, command2, Optional.empty()),
   new QueuedCommand(commandId3, command3, Optional.empty()))));
}

Code example source: confluentinc/ksql

@Test
public void shouldGetRestoreCommandsCorrectlyWithDuplicateKeys() {
 // Given:
 when(commandConsumer.poll(any(Duration.class)))
   .thenReturn(someConsumerRecords(
     new ConsumerRecord<>("topic", 0, 0, commandId1, command1),
     new ConsumerRecord<>("topic", 0, 0, commandId2, command2)))
   .thenReturn(someConsumerRecords(
     new ConsumerRecord<>("topic", 0, 0, commandId2, command3),
     new ConsumerRecord<>("topic", 0, 0, commandId3, command3)))
   .thenReturn(new ConsumerRecords<>(Collections.emptyMap()));
 // When:
 final List<QueuedCommand> queuedCommandList = commandTopic
   .getRestoreCommands(Duration.ofMillis(1));
 // Then:
 assertThat(queuedCommandList, equalTo(ImmutableList.of(
   new QueuedCommand(commandId1, command1, Optional.empty()),
   new QueuedCommand(commandId2, command2, Optional.empty()),
   new QueuedCommand(commandId2, command3, Optional.empty()),
   new QueuedCommand(commandId3, command3, Optional.empty()))));
}
