org.apache.kafka.clients.consumer.Consumer.assignment()方法的使用及代码示例

x33g5p2x  于2022-01-18 转载在 其他  
字(9.5k)|赞(0)|评价(0)|浏览(236)

本文整理了Java中org.apache.kafka.clients.consumer.Consumer.assignment()方法的一些代码示例,展示了Consumer.assignment()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Consumer.assignment()方法的具体详情如下:
包路径:org.apache.kafka.clients.consumer.Consumer
类名称:Consumer
方法名:assignment

Consumer.assignment介绍

暂无

代码示例

代码示例来源:origin: openzipkin/brave

/** Pure delegation: exposes the wrapped consumer's current partition assignment unchanged. */
@Override public Set<TopicPartition> assignment() {
    return delegate.assignment();
}

代码示例来源:origin: apache/storm

  1. private void throwIfEmittingForUnassignedPartition(TopicPartition currBatchTp) {
  2. final Set<TopicPartition> assignments = consumer.assignment();
  3. if (!assignments.contains(currBatchTp)) {
  4. throw new IllegalStateException("The spout is asked to emit tuples on a partition it is not assigned."
  5. + " This indicates a bug in the TopicFilter or ManualPartitioner implementations."
  6. + " The current partition is [" + currBatchTp + "], the assigned partitions are [" + assignments + "].");
  7. }
  8. }

代码示例来源:origin: apache/storm

  1. private Collection<TopicPartition> pauseTopicPartitions(TopicPartition excludedTp) {
  2. final Set<TopicPartition> pausedTopicPartitions = new HashSet<>(consumer.assignment());
  3. LOG.debug("Currently assigned topic-partitions {}", pausedTopicPartitions);
  4. pausedTopicPartitions.remove(excludedTp);
  5. consumer.pause(pausedTopicPartitions);
  6. LOG.debug("Paused topic-partitions {}", pausedTopicPartitions);
  7. return pausedTopicPartitions;
  8. }

代码示例来源:origin: linkedin/cruise-control

  1. /**
  2. * The check if the consumption is done or not. The consumption is done if the consumer has caught up with the
  3. * log end or all the partitions are paused.
  4. * @param endOffsets the log end for each partition.
  5. * @return true if the consumption is done, false otherwise.
  6. */
  7. private boolean consumptionDone(Map<TopicPartition, Long> endOffsets) {
  8. Set<TopicPartition> partitionsNotPaused = new HashSet<>(_metricConsumer.assignment());
  9. partitionsNotPaused.removeAll(_metricConsumer.paused());
  10. for (TopicPartition tp : partitionsNotPaused) {
  11. if (_metricConsumer.position(tp) < endOffsets.get(tp)) {
  12. return false;
  13. }
  14. }
  15. return true;
  16. }

代码示例来源:origin: apache/storm

  1. /**
  2. * Assign partitions to the KafkaConsumer.
  3. * @param <K> The consumer key type
  4. * @param <V> The consumer value type
  5. * @param consumer The Kafka consumer to assign partitions to
  6. * @param newAssignment The partitions to assign.
  7. * @param listener The rebalance listener to call back on when the assignment changes
  8. */
  9. public <K, V> void assignPartitions(Consumer<K, V> consumer, Set<TopicPartition> newAssignment,
  10. ConsumerRebalanceListener listener) {
  11. Set<TopicPartition> currentAssignment = consumer.assignment();
  12. if (!newAssignment.equals(currentAssignment)) {
  13. listener.onPartitionsRevoked(currentAssignment);
  14. consumer.assign(newAssignment);
  15. listener.onPartitionsAssigned(newAssignment);
  16. }
  17. }

代码示例来源:origin: apache/storm

  1. Set<TopicPartition> assignment = consumer.assignment();
  2. if (!isAtLeastOnceProcessing()) {
  3. return new PollablePartitionsInfo(assignment, Collections.emptyMap());

代码示例来源:origin: apache/storm

/**
 * Polls the broker for records, restricted to the currently pollable partitions.
 * Non-pollable partitions are paused for the duration of the poll and always
 * resumed afterwards (finally block), so the pause never leaks across cycles.
 * Under AT_MOST_ONCE, offsets are committed synchronously right after the poll,
 * before any tuple is emitted, which is what makes delivery at-most-once.
 *
 * @param pollablePartitionsInfo which partitions may be polled, plus the
 *                               earliest retriable offsets to seek back to
 * @return the records fetched by this poll
 */
private ConsumerRecords<K, V> pollKafkaBroker(PollablePartitionsInfo pollablePartitionsInfo) {
    // Rewind to the earliest retriable offset per partition before polling.
    doSeekRetriableTopicPartitions(pollablePartitionsInfo.pollableEarliestRetriableOffsets);
    // Pause everything assigned that is NOT pollable right now.
    Set<TopicPartition> pausedPartitions = new HashSet<>(consumer.assignment());
    pausedPartitions.removeIf(pollablePartitionsInfo.pollablePartitions::contains);
    try {
        consumer.pause(pausedPartitions);
        final ConsumerRecords<K, V> consumerRecords = consumer.poll(kafkaSpoutConfig.getPollTimeoutMs());
        ackRetriableOffsetsIfCompactedAway(pollablePartitionsInfo.pollableEarliestRetriableOffsets, consumerRecords);
        final int numPolledRecords = consumerRecords.count();
        LOG.debug("Polled [{}] records from Kafka",
            numPolledRecords);
        if (kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) {
            //Commit polled records immediately to ensure delivery is at-most-once.
            Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
                createFetchedOffsetsMetadata(consumer.assignment());
            consumer.commitSync(offsetsToCommit);
            LOG.debug("Committed offsets {} to Kafka", offsetsToCommit);
        }
        return consumerRecords;
    } finally {
        // Always resume, even on exception, so the next cycle sees the full assignment.
        consumer.resume(pausedPartitions);
    }
}

代码示例来源:origin: apache/nifi

/**
 * Execute poll using pause API just for sending heartbeat, not polling messages.
 *
 * All assigned partitions are paused, then poll(0) is issued purely so the
 * consumer sends a heartbeat/keeps its group membership alive without actually
 * fetching records. The paused partitions are resumed in a finally block, and
 * the polling lock is released in a nested finally so it is freed even if
 * resume() itself throws.
 */
void retainConnection() {
    pollingLock.lock();
    TopicPartition[] assignments = null;
    try {
        final Set<TopicPartition> assignmentSet = kafkaConsumer.assignment();
        // Nothing assigned yet — no connection to retain.
        if (assignmentSet.isEmpty()) {
            return;
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Pausing " + assignmentSet);
        }
        assignments = assignmentSet.toArray(new TopicPartition[assignmentSet.size()]);
        kafkaConsumer.pause(assignments);
        // Immediate-timeout poll: heartbeat only, no records expected while paused.
        kafkaConsumer.poll(0);
        if (logger.isDebugEnabled()) {
            logger.debug("Resuming " + assignments);
        }
    } finally {
        try {
            // assignments stays null when the set was empty (early return above).
            if (assignments != null) {
                kafkaConsumer.resume(assignments);
            }
        } finally {
            pollingLock.unlock();
        }
    }
}

代码示例来源:origin: linkedin/cruise-control

  1. while (_metricConsumer.assignment().isEmpty()) {
  2. pollerCount++;
  3. _metricConsumer.poll(10);
  4. for (TopicPartition tp : _metricConsumer.assignment()) {
  5. timestampToSeek.put(tp, startTimeMs);
  6. Set<TopicPartition> assignment = new HashSet<>(_metricConsumer.assignment());
  7. Map<TopicPartition, Long> endOffsets = _metricConsumer.endOffsets(assignment);
  8. Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = _metricConsumer.offsetsForTimes(timestampToSeek);
  9. LOG.debug("Starting consuming from metrics reporter topic partitions {}.", _metricConsumer.assignment());
  10. _metricConsumer.assignment(), startTimeMs, endTimeMs, totalMetricsAdded);

代码示例来源:origin: apache/storm

/**
 * One spout cycle: refresh the partition assignment if its timer expired,
 * commit offsets if the commit timer expired (mode-dependent), poll Kafka for
 * new records when any partition is pollable, then emit pending tuples.
 * An InterruptException from the Kafka client is translated into the spout's
 * own interruption signal rather than propagated raw.
 */
@Override
public void nextTuple() {
    try {
        if (refreshAssignmentTimer.isExpiredResetOnTrue()) {
            refreshAssignment();
        }
        if (commitTimer != null && commitTimer.isExpiredResetOnTrue()) {
            if (isAtLeastOnceProcessing()) {
                // At-least-once: only offsets of fully acked tuples are committed.
                commitOffsetsForAckedTuples();
            } else if (kafkaSpoutConfig.getProcessingGuarantee() == ProcessingGuarantee.NO_GUARANTEE) {
                // No-guarantee: commit whatever was fetched, asynchronously.
                Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
                    createFetchedOffsetsMetadata(consumer.assignment());
                consumer.commitAsync(offsetsToCommit, null);
                LOG.debug("Committed offsets {} to Kafka", offsetsToCommit);
            }
        }
        PollablePartitionsInfo pollablePartitionsInfo = getPollablePartitionsInfo();
        if (pollablePartitionsInfo.shouldPoll()) {
            try {
                setWaitingToEmit(pollKafkaBroker(pollablePartitionsInfo));
            } catch (RetriableException e) {
                // Transient Kafka error — log and retry on a later cycle.
                LOG.error("Failed to poll from kafka.", e);
            }
        }
        emitIfWaitingNotEmitted();
    } catch (InterruptException e) {
        throwKafkaConsumerInterruptedException();
    }
}

代码示例来源:origin: apache/metron

  1. @Override
  2. public String getSampleMessage(final String topic) {
  3. String message = null;
  4. if (listTopics().contains(topic)) {
  5. try (Consumer<String, String> kafkaConsumer = kafkaConsumerFactory.createConsumer()) {
  6. kafkaConsumer.assign(kafkaConsumer.partitionsFor(topic).stream()
  7. .map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition()))
  8. .collect(Collectors.toList()));
  9. kafkaConsumer.assignment().stream()
  10. .filter(p -> (kafkaConsumer.position(p) - 1) >= 0)
  11. .forEach(p -> kafkaConsumer.seek(p, kafkaConsumer.position(p) - 1));
  12. final ConsumerRecords<String, String> records = kafkaConsumer.poll(KAFKA_CONSUMER_TIMEOUT);
  13. message = records.isEmpty() ? null : records.iterator().next().value();
  14. kafkaConsumer.unsubscribe();
  15. }
  16. }
  17. return message;
  18. }

代码示例来源:origin: io.opentracing.contrib/opentracing-kafka-client

/** Pure delegation: returns the traced consumer's current partition assignment. */
@Override
public Set<TopicPartition> assignment() {
    return consumer.assignment();
}

代码示例来源:origin: io.zipkin.brave/brave-instrumentation-kafka-clients

/** Pure delegation: returns the wrapped consumer's current partition assignment. */
@Override public Set<TopicPartition> assignment() {
    return delegate.assignment();
}

代码示例来源:origin: rayokota/kafka-graphs

/** Pure delegation: returns the underlying KafkaConsumer's partition assignment. */
@Override
public Set<TopicPartition> assignment() {
    return kafkaConsumer.assignment();
}

代码示例来源:origin: spring-projects/spring-kafka

  1. return null;
  2. }).given(consumer).commitSync(any(Map.class));
  3. given(consumer.assignment()).willReturn(records.keySet());
  4. final CountDownLatch pauseLatch = new CountDownLatch(2);
  5. willAnswer(i -> {

代码示例来源:origin: spring-projects/spring-kafka

  1. return null;
  2. }).given(consumer).commitSync(any(Map.class));
  3. given(consumer.assignment()).willReturn(records1.keySet());
  4. TopicPartitionInitialOffset[] topicPartitionOffset = new TopicPartitionInitialOffset[] {
  5. new TopicPartitionInitialOffset("foo", 0) };

代码示例来源:origin: com.cerner.common.kafka/common-kafka

  1. private Set<TopicPartition> getAssignedPartitions() {
  2. Set<TopicPartition> assignedPartitions = consumer.assignment();
  3. if (assignedPartitions.isEmpty()) {
  4. // Polling with an immediate timeout will initialize the assignments for a fresh consumer.
  5. pollRecords(0L);
  6. assignedPartitions = consumer.assignment();
  7. }
  8. return assignedPartitions;
  9. }

代码示例来源:origin: rayokota/kafka-graphs

  1. private static Set<TopicPartition> localPartitions(Consumer<byte[], byte[]> consumer, String topic) {
  2. Set<TopicPartition> result = new HashSet<>();
  3. Set<TopicPartition> assignment = consumer.assignment();
  4. for (TopicPartition tp : assignment) {
  5. if (tp.topic().equals(topic)) {
  6. result.add(tp);
  7. }
  8. }
  9. return result;
  10. }

代码示例来源:origin: vert-x3/vertx-kafka-client

  1. @Override
  2. public KafkaReadStream<K, V> assignment(Handler<AsyncResult<Set<TopicPartition>>> handler) {
  3. this.submitTask((consumer, future) -> {
  4. Set<TopicPartition> partitions = consumer.assignment();
  5. if (future != null) {
  6. future.complete(partitions);
  7. }
  8. }, handler);
  9. return this;
  10. }

代码示例来源:origin: linkedin/li-apache-kafka-clients

/**
 * Subscribes to the given topics via the wrapped KafkaConsumer, first
 * committing current offsets and clearing processor state for partitions of
 * topics that drop out of the subscription. The ordering (commit, then clear,
 * then subscribe) is deliberate — see the KAFKA-3664 note below.
 *
 * @param topics   the new topic subscription
 * @param callback user rebalance listener, wrapped by the internal listener
 */
@Override
public void subscribe(Collection<String> topics, ConsumerRebalanceListener callback) {
    Set<String> newSubscription = new HashSet<>(topics);
    // TODO: This is a hot fix for KAFKA-3664 and should be removed after the issue is fixed.
    commitSync();
    // Drop processor state for partitions whose topic is no longer subscribed.
    for (TopicPartition tp : _kafkaConsumer.assignment()) {
        if (!newSubscription.contains(tp.topic())) {
            _consumerRecordsProcessor.clear(tp);
        }
    }
    _consumerRebalanceListener.setUserListener(callback);
    _kafkaConsumer.subscribe(new ArrayList<>(topics), _consumerRebalanceListener);
}

相关文章