org.apache.kafka.clients.consumer.Consumer.position()方法的使用及代码示例

x33g5p2x  于2022-01-18 转载在 其他  
字(9.3k)|赞(0)|评价(0)|浏览(368)

本文整理了Java中org.apache.kafka.clients.consumer.Consumer.position()方法的一些代码示例,展示了Consumer.position()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Consumer.position()方法的具体详情如下:
包路径:org.apache.kafka.clients.consumer.Consumer
类名称:Consumer
方法名:position

Consumer.position介绍

[英]Returns the fetch position of the next message for the specified topic partition, i.e. the offset that will be used on the next call to poll(long).
[中]返回指定主题分区下一条待获取消息的位置(偏移量),该位置将在下一次调用 poll(long) 时使用。

代码示例

代码示例来源:origin: confluentinc/ksql

/**
 * Reports the command consumer's current position on the command topic partition.
 *
 * @return the value returned by {@code commandConsumer.position(commandTopicPartition)}
 */
public long getCommandTopicConsumerPosition() {
 final long currentPosition = commandConsumer.position(commandTopicPartition);
 return currentPosition;
}

代码示例来源:origin: openzipkin/brave

/**
 * Forwards the position lookup for the given partition to the wrapped delegate.
 *
 * @param partition the partition whose position is requested
 * @return the position reported by the delegate
 */
@Override public long position(TopicPartition partition) {
 final long delegatePosition = delegate.position(partition);
 return delegatePosition;
}

代码示例来源:origin: openzipkin/brave

/**
 * Forwards the position lookup, including the caller's timeout, to the wrapped delegate.
 *
 * @param partition the partition whose position is requested
 * @param timeout the timeout handed through to the delegate unchanged
 * @return the position reported by the delegate
 */
public long position(TopicPartition partition, Duration timeout) {
 final long delegatePosition = delegate.position(partition, timeout);
 return delegatePosition;
}

代码示例来源:origin: apache/storm

/**
 * Builds a map from each assigned partition to an {@code OffsetAndMetadata} that pairs the
 * consumer's current position on that partition with the current commit metadata.
 *
 * @param assignedPartitions the partitions to look up positions for
 * @return a new mutable map with one entry per partition in {@code assignedPartitions}
 */
private Map<TopicPartition, OffsetAndMetadata> createFetchedOffsetsMetadata(Set<TopicPartition> assignedPartitions) {
  final Map<TopicPartition, OffsetAndMetadata> fetchedOffsets = new HashMap<>();
  assignedPartitions.forEach(partition ->
    fetchedOffsets.put(
      partition,
      new OffsetAndMetadata(consumer.position(partition), commitMetadataManager.getCommitMetadata())));
  return fetchedOffsets;
}

代码示例来源:origin: linkedin/cruise-control

/**
 * Decides whether consumption has finished. That is the case when no assigned, non-paused
 * partition has a consumer position below its log end offset; in particular it is the case
 * when every assigned partition is paused.
 *
 * @param endOffsets the log end offset for each partition.
 * @return true if the consumption is done, false otherwise.
 */
private boolean consumptionDone(Map<TopicPartition, Long> endOffsets) {
 Set<TopicPartition> activePartitions = new HashSet<>(_metricConsumer.assignment());
 activePartitions.removeAll(_metricConsumer.paused());
 // Stops at the first partition that is still behind its end offset.
 return activePartitions.stream()
   .noneMatch(tp -> _metricConsumer.position(tp) < endOffsets.get(tp));
}

代码示例来源:origin: apache/hive

/**
 * Polls the next batch of records from the Kafka broker and refreshes the reader state:
 * {@code records}, {@code consumerRecordIterator} and {@code consumerPosition}.
 *
 * <p>When trace logging is enabled, the poll is timed with {@code stopwatch} and the record
 * count and elapsed time are logged.
 *
 * @throws PollTimeoutException if the poll returns no records while the consumer's position is
 *     still below {@code endOffset}.
 */
private void pollRecords() {
 // Only pay for timing when the result will actually be logged.
 if (LOG.isTraceEnabled()) {
  stopwatch.reset().start();
 }
 records = consumer.poll(pollTimeoutDurationMs);
 if (LOG.isTraceEnabled()) {
  stopwatch.stop();
  LOG.trace("Pulled [{}] records in [{}] ms", records.count(), stopwatch.elapsed(TimeUnit.MILLISECONDS));
 }
 // Fail if we can not poll within one lap of pollTimeoutMs.
 // An empty batch is only an error while the position is still short of endOffset.
 if (records.isEmpty() && consumer.position(topicPartition) < endOffset) {
  throw new PollTimeoutException(String.format(ERROR_POLL_TIMEOUT_FORMAT,
    pollTimeoutMs,
    topicPartition.toString(),
    startOffset,
    consumer.position(topicPartition),
    endOffset));
 }
 // Expose the new batch through the iterator and remember the consumer position after this poll.
 consumerRecordIterator = records.iterator();
 consumerPosition = consumer.position(topicPartition);
}

代码示例来源:origin: apache/hive

this.endOffset = consumer.position(topicPartition);
 LOG.info("End Offset set to [{}]", this.endOffset);
} else {
 LOG.info("Seeking to offset [{}] of topic partition [{}]", requestedStartOffset, topicPartition);
 consumer.seek(topicPartition, requestedStartOffset);
 this.startOffset = consumer.position(topicPartition);
 if (this.startOffset != requestedStartOffset) {
  LOG.warn("Current Start Offset [{}] is different form the requested start position [{}]",
 this.startOffset = consumer.position(topicPartition);
 LOG.info("Consumer at beginning of topic partition [{}], current start offset [{}]",
   topicPartition,
consumerPosition = consumer.position(topicPartition);
Preconditions.checkState(this.endOffset >= consumerPosition,
  "End offset [%s] need to be greater or equal than start offset [%s]",

代码示例来源:origin: apache/incubator-gobblin

/**
 * Returns the consumer's position on the given partition after assigning that partition
 * and seeking to its end.
 *
 * @param partition the partition, identified by its topic name and id
 * @return the consumer position reported after the seek-to-end
 * @throws KafkaOffsetRetrievalFailureException declared by the signature; not thrown directly here
 */
@Override
public long getLatestOffset(KafkaPartition partition) throws KafkaOffsetRetrievalFailureException {
 final TopicPartition target = new TopicPartition(partition.getTopicName(), partition.getId());
 // Replace the consumer's assignment with just this partition before seeking.
 consumer.assign(Collections.singletonList(target));
 consumer.seekToEnd(target);
 final long latestOffset = consumer.position(target);
 return latestOffset;
}

代码示例来源:origin: apache/storm

long position = consumer.position(tp);
long committedOffset = tpOffset.getValue().offset();
if (position < committedOffset) {

代码示例来源:origin: apache/incubator-gobblin

/**
 * Returns the consumer's position on the given partition after assigning that partition
 * and seeking to its beginning.
 *
 * @param partition the partition, identified by its topic name and id
 * @return the consumer position reported after the seek-to-beginning
 * @throws KafkaOffsetRetrievalFailureException declared by the signature; not thrown directly here
 */
@Override
public long getEarliestOffset(KafkaPartition partition) throws KafkaOffsetRetrievalFailureException {
 final TopicPartition target = new TopicPartition(partition.getTopicName(), partition.getId());
 // Replace the consumer's assignment with just this partition before seeking.
 consumer.assign(Collections.singletonList(target));
 consumer.seekToBeginning(target);
 final long earliestOffset = consumer.position(target);
 return earliestOffset;
}

代码示例来源:origin: apache/storm

consumer.seekToEnd(Collections.singleton(tp));
  tpToFirstSeekOffset.put(tp, consumer.position(tp));
} else if (lastBatchMeta != null) {
  consumer.seek(tp, lastBatchMeta.getLastOffset() + 1);  // seek next offset after last offset from previous batch
final long fetchOffset = consumer.position(tp);
LOG.debug("Set [fetchOffset = {}] for partition [{}]", fetchOffset, tp);
return fetchOffset;

代码示例来源:origin: apache/storm

// position() is the offset of the next record to fetch, so the offset just before it is position - 1.
long lastEmittedOffset = consumer.position(currBatchTp) - 1;
// The same offset is passed for both offset arguments of the batch metadata.
currentBatch = new KafkaTridentSpoutBatchMetadata(lastEmittedOffset, lastEmittedOffset, topologyContext.getStormId());

代码示例来源:origin: org.apache.kafka/kafka_2.12

/**
 * Moves the client's position on every given partition by {@code shiftBy} relative to its
 * current position. The requested offsets are passed through {@code checkOffsetRange}
 * together with the partitions' beginning and end offsets before the client seeks.
 *
 * @param client the consumer whose positions are read and then changed
 * @param inputTopicPartitions the partitions to shift
 * @param shiftBy the amount added to each current position (may be negative)
 */
public void shiftOffsetsBy(Consumer<byte[], byte[]> client, Set<TopicPartition> inputTopicPartitions, long shiftBy) {
  final Map<TopicPartition, Long> logEndOffsets = client.endOffsets(inputTopicPartitions);
  final Map<TopicPartition, Long> logStartOffsets = client.beginningOffsets(inputTopicPartitions);

  // Requested target offset per partition: current position plus the shift.
  final Map<TopicPartition, Long> requestedOffsets = new HashMap<>(inputTopicPartitions.size());
  inputTopicPartitions.forEach(tp -> requestedOffsets.put(tp, client.position(tp) + shiftBy));

  final Map<TopicPartition, Long> validatedOffsets =
    checkOffsetRange(requestedOffsets, logStartOffsets, logEndOffsets);

  inputTopicPartitions.forEach(tp -> client.seek(tp, validatedOffsets.get(tp)));
}

代码示例来源:origin: org.apache.kafka/kafka_2.11

/**
 * Moves the client's position on every given partition by {@code shiftBy} relative to its
 * current position. The requested offsets are passed through {@code checkOffsetRange}
 * together with the partitions' beginning and end offsets before the client seeks.
 *
 * @param client the consumer whose positions are read and then changed
 * @param inputTopicPartitions the partitions to shift
 * @param shiftBy the amount added to each current position (may be negative)
 */
public void shiftOffsetsBy(Consumer<byte[], byte[]> client, Set<TopicPartition> inputTopicPartitions, long shiftBy) {
  final Map<TopicPartition, Long> logEndOffsets = client.endOffsets(inputTopicPartitions);
  final Map<TopicPartition, Long> logStartOffsets = client.beginningOffsets(inputTopicPartitions);

  // Requested target offset per partition: current position plus the shift.
  final Map<TopicPartition, Long> requestedOffsets = new HashMap<>(inputTopicPartitions.size());
  inputTopicPartitions.forEach(tp -> requestedOffsets.put(tp, client.position(tp) + shiftBy));

  final Map<TopicPartition, Long> validatedOffsets =
    checkOffsetRange(requestedOffsets, logStartOffsets, logEndOffsets);

  inputTopicPartitions.forEach(tp -> client.seek(tp, validatedOffsets.get(tp)));
}

代码示例来源:origin: org.apache.kafka/kafka

/**
 * Moves the client's position on every given partition by {@code shiftBy} relative to its
 * current position. The requested offsets are passed through {@code checkOffsetRange}
 * together with the partitions' beginning and end offsets before the client seeks.
 *
 * @param client the consumer whose positions are read and then changed
 * @param inputTopicPartitions the partitions to shift
 * @param shiftBy the amount added to each current position (may be negative)
 */
public void shiftOffsetsBy(Consumer<byte[], byte[]> client, Set<TopicPartition> inputTopicPartitions, long shiftBy) {
  final Map<TopicPartition, Long> logEndOffsets = client.endOffsets(inputTopicPartitions);
  final Map<TopicPartition, Long> logStartOffsets = client.beginningOffsets(inputTopicPartitions);

  // Requested target offset per partition: current position plus the shift.
  final Map<TopicPartition, Long> requestedOffsets = new HashMap<>(inputTopicPartitions.size());
  inputTopicPartitions.forEach(tp -> requestedOffsets.put(tp, client.position(tp) + shiftBy));

  final Map<TopicPartition, Long> validatedOffsets =
    checkOffsetRange(requestedOffsets, logStartOffsets, logEndOffsets);

  inputTopicPartitions.forEach(tp -> client.seek(tp, validatedOffsets.get(tp)));
}

代码示例来源:origin: apache/metron

/**
 * Fetches one sample message value from the given topic.
 *
 * <p>A short-lived consumer is assigned every partition of the topic. Each partition whose
 * current position is at least 1 is rewound by one offset, and a single poll is issued.
 *
 * @param topic the topic to sample from
 * @return the value of the first record returned by the poll, or {@code null} if the topic is
 *     not listed or the poll returns no records
 */
@Override
public String getSampleMessage(final String topic) {
 if (!listTopics().contains(topic)) {
  return null;
 }
 try (Consumer<String, String> kafkaConsumer = kafkaConsumerFactory.createConsumer()) {
  kafkaConsumer.assign(kafkaConsumer.partitionsFor(topic).stream()
   .map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition()))
   .collect(Collectors.toList()));
  // Step back one offset wherever there is a previous offset to step back to.
  for (TopicPartition p : kafkaConsumer.assignment()) {
   if ((kafkaConsumer.position(p) - 1) >= 0) {
    kafkaConsumer.seek(p, kafkaConsumer.position(p) - 1);
   }
  }
  final ConsumerRecords<String, String> polled = kafkaConsumer.poll(KAFKA_CONSUMER_TIMEOUT);
  String sample = null;
  if (!polled.isEmpty()) {
   sample = polled.iterator().next().value();
  }
  kafkaConsumer.unsubscribe();
  return sample;
 }
}

代码示例来源:origin: org.apache.kafka/kafka_2.11

// Print the topic, the partition number, and the client's current position for partition p.
System.out.println("Topic: " + p.topic() + " Partition: " + p.partition() + " Offset: " + client.position(p));

代码示例来源:origin: org.apache.kafka/kafka

// Print the topic, the partition number, and the client's current position for partition p.
System.out.println("Topic: " + p.topic() + " Partition: " + p.partition() + " Offset: " + client.position(p));

代码示例来源:origin: spring-projects/spring-kafka

// Create a consumer from the factory and manually assign it partitions 0 and 1 of topic9.
Consumer<Integer, String> consumer = cf.createConsumer();
consumer.assign(Arrays.asList(new TopicPartition(topic9, 0), new TopicPartition(topic9, 1)));
// The position on each of the two partitions is expected to be 2.
assertThat(consumer.position(new TopicPartition(topic9, 0))).isEqualTo(2);
assertThat(consumer.position(new TopicPartition(topic9, 1))).isEqualTo(2);
// Stop the container, then close the consumer created above.
container.stop();
consumer.close();

代码示例来源:origin: org.apache.kafka/kafka_2.12

// Print the topic, the partition number, and the client's current position for partition p.
System.out.println("Topic: " + p.topic() + " Partition: " + p.partition() + " Offset: " + client.position(p));

相关文章