org.apache.kafka.clients.consumer.Consumer.partitionsFor()方法的使用及代码示例

x33g5p2x  于2022-01-18 转载在 其他  
字(10.4k)|赞(0)|评价(0)|浏览(379)

本文整理了Java中org.apache.kafka.clients.consumer.Consumer.partitionsFor()方法的一些代码示例,展示了Consumer.partitionsFor()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Consumer.partitionsFor()方法的具体详情如下:
包路径:org.apache.kafka.clients.consumer.Consumer
类名称:Consumer
方法名:partitionsFor

Consumer.partitionsFor介绍

暂无

代码示例

代码示例来源:origin: openzipkin/brave

/**
 * Looks up the partition metadata for {@code topic} by forwarding the call to the wrapped
 * {@code delegate}.
 *
 * @param topic name of the topic to look up
 * @return whatever the delegate returns for this topic
 */
@Override
public List<PartitionInfo> partitionsFor(String topic) {
 final List<PartitionInfo> partitions = delegate.partitionsFor(topic);
 return partitions;
}

代码示例来源:origin: openzipkin/brave

/**
 * Looks up the partition metadata for {@code topic}, passing {@code timeout} through to the
 * wrapped {@code delegate} unchanged.
 *
 * @param topic name of the topic to look up
 * @param timeout time limit handed to the delegate
 * @return whatever the delegate returns for this topic
 */
public List<PartitionInfo> partitionsFor(String topic, Duration timeout) {
 final List<PartitionInfo> partitions = delegate.partitionsFor(topic, timeout);
 return partitions;
}

代码示例来源:origin: apache/storm

/**
 * Collects a {@link TopicPartition} for every partition of every topic in {@code topics}.
 *
 * <p>Topics for which the consumer returns {@code null} are logged at WARN level and skipped.
 *
 * @param consumer consumer used to look up the partition metadata
 * @return the partitions found across all topics
 */
@Override
public Set<TopicPartition> getAllSubscribedPartitions(Consumer<?, ?> consumer) {
  final Set<TopicPartition> subscribed = new HashSet<>();
  for (String topic : topics) {
    final List<PartitionInfo> infos = consumer.partitionsFor(topic);
    if (infos == null) {
      LOG.warn("Topic {} not found, skipping addition of the topic", topic);
      continue;
    }
    for (PartitionInfo info : infos) {
      subscribed.add(new TopicPartition(info.topic(), info.partition()));
    }
  }
  return subscribed;
}

代码示例来源:origin: apache/incubator-gobblin

/**
 * Creates one {@link WorkUnit} per partition of the topic configured under
 * {@code TOPIC_WHITELIST}.
 *
 * @param state source state whose properties are converted into the config read here
 * @return one work unit for each partition reported by the consumer; an empty list when the
 *     consumer reports no partition metadata for the topic
 */
@Override
public List<WorkUnit> getWorkunits(SourceState state) {
 Config config = ConfigUtils.propertiesToConfig(state.getProperties());
 Consumer<String, byte[]> consumer = getKafkaConsumer(config);
 LOG.debug("Consumer is {}", consumer);
 String topic = ConfigUtils.getString(config, TOPIC_WHITELIST,
   StringUtils.EMPTY); // TODO: fix this to use the new API when KafkaWrapper is fixed
 List<WorkUnit> workUnits = new ArrayList<WorkUnit>();
 List<PartitionInfo> topicPartitions = consumer.partitionsFor(topic);
 if (topicPartitions == null) {
  // No metadata for the topic: return no work instead of failing with a NullPointerException.
  LOG.warn("No partition metadata found for topic {}", topic);
  return workUnits;
 }
 LOG.info("Partition count is {}", topicPartitions.size());
 for (PartitionInfo topicPartition : topicPartitions) {
  Extract extract = this.createExtract(DEFAULT_TABLE_TYPE, DEFAULT_NAMESPACE_NAME, topicPartition.topic());
  LOG.info("Partition info is {}", topicPartition);
  WorkUnit workUnit = WorkUnit.create(extract);
  // Each work unit carries the topic name, the extract table name and the partition id.
  setTopicNameInState(workUnit, topicPartition.topic());
  workUnit.setProp(ConfigurationKeys.EXTRACT_TABLE_NAME_KEY, topicPartition.topic());
  setPartitionId(workUnit, topicPartition.partition());
  workUnits.add(workUnit);
 }
 return workUnits;
}

代码示例来源:origin: apache/metron

/**
 * Reads a single record value from {@code topic} to serve as a sample message.
 *
 * @param topic name of the topic to sample
 * @return the value of the first record returned by the poll, or {@code null} when the topic
 *     is not in {@code listTopics()} or the poll returns no records
 */
@Override
public String getSampleMessage(final String topic) {
 String message = null;
 if (listTopics().contains(topic)) {
  try (Consumer<String, String> kafkaConsumer = kafkaConsumerFactory.createConsumer()) {
   // Manually assign every partition reported for the topic to this consumer.
   kafkaConsumer.assign(kafkaConsumer.partitionsFor(topic).stream()
    .map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition()))
    .collect(Collectors.toList()));
   // Step each assigned partition back by one offset, skipping partitions whose current
   // position is 0 (stepping back would give a negative offset).
   // NOTE(review): presumably this lets the poll below return the latest record - confirm.
   kafkaConsumer.assignment().stream()
    .filter(p -> (kafkaConsumer.position(p) - 1) >= 0)
    .forEach(p -> kafkaConsumer.seek(p, kafkaConsumer.position(p) - 1));
   final ConsumerRecords<String, String> records = kafkaConsumer.poll(KAFKA_CONSUMER_TIMEOUT);
   // Only the first record of the batch is used; the rest are ignored.
   message = records.isEmpty() ? null : records.iterator().next().value();
   kafkaConsumer.unsubscribe();
  }
 }
 return message;
}

代码示例来源:origin: apache/metron

/**
 * Builds a {@link KafkaTopic} description for the topic called {@code name}.
 *
 * <p>The partition count is the number of partitions reported by the consumer, and the
 * replication factor is the replica count of the first reported partition.
 *
 * @param name name of the topic to describe
 * @return the populated topic, or {@code null} when the name is not in {@code listTopics()}
 *     or the consumer reports no partitions for it
 */
@Override
public KafkaTopic getTopic(final String name) {
 if (!listTopics().contains(name)) {
  return null;
 }
 try (Consumer<String, String> consumer = kafkaConsumerFactory.createConsumer()) {
  final List<PartitionInfo> partitions = consumer.partitionsFor(name);
  if (partitions.size() <= 0) {
   return null;
  }
  final PartitionInfo first = partitions.get(0);
  final KafkaTopic described = new KafkaTopic();
  described.setName(name);
  described.setNumPartitions(partitions.size());
  described.setReplicationFactor(first.replicas().length);
  return described;
 }
}

代码示例来源:origin: spring-projects/spring-kafka

/**
 * Verifies that every configured topic exists, when missing topics are configured as fatal.
 *
 * <p>Does nothing when {@code missingTopicsFatal} is off, when a topic pattern is configured,
 * or when the consumer factory yields a {@code null} consumer. Topic names come from
 * {@code getTopics()}, falling back to the topics of {@code getTopicPartitions()}.
 *
 * @throws IllegalStateException if the consumer returns {@code null} partition metadata for
 *     at least one topic
 */
protected void checkTopics() {
  if (!this.containerProperties.isMissingTopicsFatal()
      || this.containerProperties.getTopicPattern() != null) {
    return;
  }
  try (Consumer<K, V> consumer =
      this.consumerFactory.createConsumer(this.containerProperties.getGroupId(),
          this.containerProperties.getClientId(), null)) {
    if (consumer == null) {
      return;
    }
    String[] topicNames = this.containerProperties.getTopics();
    if (topicNames == null) {
      topicNames = Arrays.stream(this.containerProperties.getTopicPartitions())
          .map(TopicPartitionInitialOffset::topic)
          .toArray(String[]::new);
    }
    final List<String> absent = new ArrayList<>();
    for (String topicName : topicNames) {
      if (consumer.partitionsFor(topicName) == null) {
        absent.add(topicName);
      }
    }
    if (!absent.isEmpty()) {
      throw new IllegalStateException(
          "Topic(s) " + absent.toString()
              + " is/are not present and missingTopicsFatal is true");
    }
  }
}

代码示例来源:origin: spring-projects/spring-kafka

/**
 * Checks that topics can be added to the embedded broker after start-up, that adding an
 * existing topic or one with replication factor 2 is rejected with an
 * {@link IllegalArgumentException}, and that a consumer sees all 10 partitions of the new
 * topic.
 */
@Test
public void testAddingTopics() {
  int count = embeddedKafka.getTopics().size();
  embeddedKafka.addTopics("testAddingTopics");
  assertThat(embeddedKafka.getTopics().size()).isEqualTo(count + 1);
  embeddedKafka.addTopics(new NewTopic("morePartitions", 10, (short) 1));
  assertThat(embeddedKafka.getTopics().size()).isEqualTo(count + 2);
  assertThatThrownBy(() -> embeddedKafka.addTopics(new NewTopic("morePartitions", 10, (short) 1)))
    .isInstanceOf(IllegalArgumentException.class).hasMessageContaining("exists");
  assertThatThrownBy(() -> embeddedKafka.addTopics(new NewTopic("morePartitions2", 10, (short) 2)))
    .isInstanceOf(IllegalArgumentException.class).hasMessageContaining("replication");
  Map<String, Object> consumerProps = new HashMap<>(this.consumerFactory.getConfigurationProperties());
  consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "testMultiReplying");
  ConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
  // try-with-resources closes the consumer even when the assertion below fails.
  try (Consumer<Integer, String> consumer = cf.createConsumer()) {
    assertThat(consumer.partitionsFor("morePartitions")).hasSize(10);
  }
}

代码示例来源:origin: io.opentracing.contrib/opentracing-kafka-client

/**
 * Returns the partition metadata for {@code topic} as reported by the wrapped
 * {@code consumer}.
 *
 * @param topic name of the topic to look up
 * @return the wrapped consumer's result, unmodified
 */
@Override
public List<PartitionInfo> partitionsFor(String topic) {
 final List<PartitionInfo> partitions = consumer.partitionsFor(topic);
 return partitions;
}

代码示例来源:origin: opentracing-contrib/java-kafka-client

/**
 * Forwards the partition lookup for {@code topic} to the wrapped {@code consumer}.
 *
 * @param topic name of the topic to look up
 * @return the list returned by the wrapped consumer, passed through as is
 */
@Override
public List<PartitionInfo> partitionsFor(String topic) {
 final List<PartitionInfo> fromDelegate = consumer.partitionsFor(topic);
 return fromDelegate;
}

代码示例来源:origin: apache/samza

/**
 * Looks up the partition metadata of each requested topic through {@code metadataConsumer}.
 *
 * @param topics names of the topics to look up
 * @return a map from topic name to the value {@code partitionsFor} returned for that topic;
 *     the value is stored as returned, without any null check
 */
Map<String, List<PartitionInfo>> getTopicMetadata(Set<String> topics) {
 // Diamond instead of the raw HashMap type, which produced an unchecked-conversion warning.
 Map<String, List<PartitionInfo>> streamToPartitionsInfo = new HashMap<>();
 for (String topic : topics) {
  streamToPartitionsInfo.put(topic, metadataConsumer.partitionsFor(topic));
 }
 return streamToPartitionsInfo;
}

代码示例来源:origin: org.apache.samza/samza-kafka

/**
 * Collects, for every topic in {@code topics}, the partition metadata reported by
 * {@code metadataConsumer}.
 *
 * @param topics names of the topics to query
 * @return a map keyed by topic name; each value is exactly what {@code partitionsFor}
 *     returned for that topic, with no null filtering applied
 */
Map<String, List<PartitionInfo>> getTopicMetadata(Set<String> topics) {
 // Parameterized with the diamond; the raw HashMap used before compiled with an unchecked warning.
 Map<String, List<PartitionInfo>> streamToPartitionsInfo = new HashMap<>();
 for (String topic : topics) {
  List<PartitionInfo> partitionInfoList = metadataConsumer.partitionsFor(topic);
  streamToPartitionsInfo.put(topic, partitionInfoList);
 }
 return streamToPartitionsInfo;
}

代码示例来源:origin: vert-x3/vertx-kafka-client

/**
 * Submits a task that looks up the partitions of {@code topic} and completes the task's
 * future with the result.
 *
 * @param topic name of the topic to look up
 * @param handler handler passed to {@code submitTask} together with the task
 * @return this stream, so calls can be chained
 */
@Override
public KafkaReadStreamImpl<K, V> partitionsFor(String topic, Handler<AsyncResult<List<PartitionInfo>>> handler) {
 this.submitTask((kafkaConsumer, resultFuture) -> {
  final List<PartitionInfo> found = kafkaConsumer.partitionsFor(topic);
  if (resultFuture == null) {
   // Nothing to complete; the lookup result is dropped.
   return;
  }
  resultFuture.complete(found);
 }, handler);
 return this;
}

代码示例来源:origin: Nepxion/Thunder

/**
 * Maps {@code key} to a partition index of {@code topic}: the key is serialized as a string,
 * hashed with murmur2, masked to a non-negative value and reduced modulo the partition count.
 *
 * @param consumer consumer used to read the topic's partition count
 * @param topic topic whose partitions are indexed
 * @param key key to map to a partition
 * @return an index in the range {@code [0, partition count)}
 */
@SuppressWarnings("resource")
  private int getPartitionIndex(Consumer<String, byte[]> consumer, String topic, String key) {
    final int partitionCount = consumer.partitionsFor(topic).size();
    final byte[] keyBytes = new StringSerializer().serialize(topic, key);

    // Clearing the sign bit keeps the hash non-negative before taking the remainder.
    final int nonNegativeHash = Utils.murmur2(keyBytes) & 0x7fffffff;
    return nonNegativeHash % partitionCount;
  }
}

代码示例来源:origin: org.springframework.cloud/spring-cloud-stream-binder-kafka11

/**
 * Creates a short-lived consumer and returns the partition metadata of the destination.
 *
 * @return the partitions reported for {@code destination.getName()}
 * @throws Exception if creating the consumer or the lookup fails
 */
@Override
public Collection<PartitionInfo> call() throws Exception {
  // try-with-resources closes the consumer even when partitionsFor throws.
  try (Consumer<?, ?> consumer = consumerFactory.createConsumer()) {
    return consumer.partitionsFor(destination.getName());
  }
}

代码示例来源:origin: apache/samza

/**
 * Stubs the partition lookup for {@code VALID_TOPIC} to throw on five consecutive calls and
 * expects {@code getSystemStreamMetadata} to fail with a {@link SamzaException}.
 */
@Test(expected = SamzaException.class)
public void testGetSystemStreamMetadataShouldTerminateAfterFiniteRetriesOnException() {
 when(mockKafkaConsumer.partitionsFor(VALID_TOPIC)).thenThrow(
   new RuntimeException(),
   new RuntimeException(),
   new RuntimeException(),
   new RuntimeException(),
   new RuntimeException());
 kafkaSystemAdmin.getSystemStreamMetadata(ImmutableSet.of(VALID_TOPIC));
}

代码示例来源:origin: apache/samza

/**
 * Stubs the partition lookup for {@code VALID_TOPIC} to throw on five consecutive calls and
 * expects {@code getSystemStreamPartitionCounts} to fail with a {@link SamzaException}.
 */
@Test(expected = SamzaException.class)
 public void testGetSystemStreamPartitionCountsShouldTerminateAfterFiniteRetriesOnException() throws Exception {
  when(mockKafkaConsumer.partitionsFor(VALID_TOPIC)).thenThrow(
    new RuntimeException(),
    new RuntimeException(),
    new RuntimeException(),
    new RuntimeException(),
    new RuntimeException());

  final long ttl = 100L;
  final Set<String> requestedStreams = ImmutableSet.of(VALID_TOPIC);
  kafkaSystemAdmin.getSystemStreamPartitionCounts(requestedStreams, ttl);
 }
}

代码示例来源:origin: spring-cloud/spring-cloud-stream-binder-kafka

/**
 * Asks the provisioning provider for the partitions of {@code topic}, supplying a callback
 * that reads them from a consumer that is closed once the lookup finishes.
 *
 * @param topic topic whose partitions are requested
 * @param extendedConsumerProperties source of the auto-rebalance flag
 * @param consumerFactory factory for the consumer used inside the callback
 * @param partitionCount partition count passed to the provider
 * @return the partitions returned by the provisioning provider
 */
private Collection<PartitionInfo> getPartitionInfo(String topic,
    final ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties,
    final ConsumerFactory<?, ?> consumerFactory, int partitionCount) {
  final boolean autoRebalance = extendedConsumerProperties.getExtension().isAutoRebalanceEnabled();
  return provisioningProvider.getPartitionsForTopic(partitionCount, autoRebalance,
      () -> {
        try (Consumer<?, ?> consumer = consumerFactory.createConsumer()) {
          return consumer.partitionsFor(topic);
        }
      }, topic);
}

代码示例来源:origin: seznam/euphoria

/**
 * Runs the real {@code KafkaSource.getPartitions()} on a mocked source whose consumer returns
 * {@code partitions} for any topic.
 *
 * @param partitions value the mocked consumer returns from {@code partitionsFor}
 */
public void tryGetPartitions(List<PartitionInfo> partitions) {
  // The stubbed consumer answers every topic lookup with the supplied list.
  Consumer<byte[], byte[]> stubConsumer = mock(Consumer.class);
  when(stubConsumer.partitionsFor(any(String.class))).thenReturn(partitions);

  // The mocked source hands out that consumer but runs its real getPartitions().
  KafkaSource mockedSource = mock(KafkaSource.class);
  when(mockedSource.newConsumer(any(), any(), any())).thenReturn(stubConsumer);
  when(mockedSource.getPartitions()).thenCallRealMethod();

  mockedSource.getPartitions();
}
}

代码示例来源:origin: org.springframework.cloud/spring-cloud-stream-binder-kafka

/**
 * Delegates to the provisioning provider to resolve the partitions of {@code topic}. The
 * provider receives a callback that opens a consumer, reads the topic's partitions and closes
 * the consumer again.
 *
 * @param topic topic whose partitions are requested
 * @param extendedConsumerProperties properties whose extension supplies the auto-rebalance flag
 * @param consumerFactory factory that creates the consumer used by the callback
 * @param partitionCount partition count forwarded to the provider
 * @return the partitions returned by the provisioning provider
 */
private Collection<PartitionInfo> getPartitionInfo(String topic,
    final ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties,
    final ConsumerFactory<?, ?> consumerFactory, int partitionCount) {
  final boolean rebalanceEnabled =
      extendedConsumerProperties.getExtension().isAutoRebalanceEnabled();
  return provisioningProvider.getPartitionsForTopic(
      partitionCount,
      rebalanceEnabled,
      () -> {
        try (Consumer<?, ?> shortLived = consumerFactory.createConsumer()) {
          return shortLived.partitionsFor(topic);
        }
      },
      topic);
}

相关文章