Usage and code examples for the org.apache.kafka.clients.producer.Producer.partitionsFor() method

Reposted by x33g5p2x on 2022-01-26 in Other

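This article collects Java code examples for the org.apache.kafka.clients.producer.Producer.partitionsFor() method and shows how Producer.partitionsFor() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of the Producer.partitionsFor() method:
Package path: org.apache.kafka.clients.producer.Producer
Class name: Producer
Method name: partitionsFor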

Introduction to Producer.partitionsFor

See KafkaProducer#partitionsFor(String). The method returns the partition metadata for the given topic as a List<PartitionInfo>.

Code examples

Code example source: origin: apache/kylin

  public void tryFetchMetadataFor(String topic) {
      producer.partitionsFor(topic);
  }

Code example source: origin: openzipkin/brave

  @Override public List<PartitionInfo> partitionsFor(String topic) {
      return delegate.partitionsFor(topic);
  }

Code example source: origin: apache/flink

  private static int[] getPartitionsByTopic(String topic, Producer<byte[], byte[]> producer) {
      // the fetched list is immutable, so we're creating a mutable copy in order to sort it
      List<PartitionInfo> partitionsList = new ArrayList<>(producer.partitionsFor(topic));
      // sort the partitions by partition id to make sure the fetched partition list is the same across subtasks
      Collections.sort(partitionsList, new Comparator<PartitionInfo>() {
          @Override
          public int compare(PartitionInfo o1, PartitionInfo o2) {
              return Integer.compare(o1.partition(), o2.partition());
          }
      });
      int[] partitions = new int[partitionsList.size()];
      for (int i = 0; i < partitions.length; i++) {
          partitions[i] = partitionsList.get(i).partition();
      }
      return partitions;
  }
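
The Flink helper above copies the list, sorts it by partition id, and unpacks it into an int[]. The same result can be sketched more compactly with a stream pipeline. This is not Flink's code: it works on plain partition ids so that it runs without a Kafka dependency, and the names PartitionIds and sortedIds are made up for illustration. With a real producer, the ids would presumably come from mapping PartitionInfo::partition over producer.partitionsFor(topic).

```java
import java.util.Arrays;
import java.util.List;

final class PartitionIds {
    // Returns the given partition ids in ascending order as a primitive array.
    // The input list is only read, never modified, so an immutable list is fine.
    static int[] sortedIds(List<Integer> ids) {
        return ids.stream()
                .mapToInt(Integer::intValue)
                .sorted()
                .toArray();
    }

    public static void main(String[] args) {
        // Hypothetical ids in an arbitrary order.
        int[] sorted = sortedIds(Arrays.asList(2, 0, 1));
        System.out.println(Arrays.toString(sorted));
    }
}
```

Because the stream never mutates its source, the defensive ArrayList copy from the original is not needed in this variant.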

Code example source: origin: QNJR-GROUP/EasyTransaction

  public int calcMessagePartition(String kafkaTopic, TransactionId trxId) {
      List<PartitionInfo> partitionMetaData = kafkaProducer.partitionsFor(kafkaTopic);
      int partitionSize = partitionMetaData.size();
      int partition = Math.abs(trxId.hashCode() % partitionSize);
      return partition;
  }
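
The example above turns a hash code into a partition index by taking the remainder and then the absolute value. A possible alternative, sketched below under the assumption that the partition count has already been read from partitionsFor(topic).size(), uses Math.floorMod and rejects a non-positive count up front. The names HashPartitioning and partitionFor are hypothetical and belong to no library.

```java
final class HashPartitioning {
    // Maps an arbitrary hash code to a partition index in [0, partitionCount).
    static int partitionFor(int hash, int partitionCount) {
        if (partitionCount <= 0) {
            // An empty metadata list would otherwise cause a division by zero.
            throw new IllegalArgumentException("partitionCount must be positive");
        }
        // floorMod never returns a negative value for a positive divisor.
        return Math.floorMod(hash, partitionCount);
    }

    public static void main(String[] args) {
        // Hypothetical key and partition count.
        System.out.println(partitionFor("order-42".hashCode(), 4));
    }
}
```

For negative hash codes, floorMod picks a different index than Math.abs(hash % n) does, so this sketch is not a drop-in replacement where existing key-to-partition assignments must be preserved.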

Code example source: origin: apache/nifi

  /**
   * Returns the partition chosen by the configured partitioner for the given key,
   * or 0 if no partitioner is set.
   */
  private int getPartition(Object key, String topicName) {
      if (this.partitioner != null) {
          int partSize = this.kafkaProducer.partitionsFor(topicName).size();
          return this.partitioner.partition(key, partSize);
      }
      return 0;
  }

Code example source: origin: spring-projects/spring-kafka

  @Override
  public List<PartitionInfo> partitionsFor(String topic) {
      return this.delegate.partitionsFor(topic);
  }

Code example source: origin: spring-projects/spring-kafka

  @Override
  public List<PartitionInfo> partitionsFor(String topic) {
      Producer<K, V> producer = getTheProducer();
      try {
          return producer.partitionsFor(topic);
      }
      finally {
          closeProducer(producer, inTransaction());
      }
  }

Code example source: origin: com.github.combinedmq/combinedmq

  @Override
  public List<PartitionInfo> partitionsFor(String topic) {
      return producer.partitionsFor(topic);
  }

Code example source: origin: org.axonframework/axon-kafka

  @Override
  public List<PartitionInfo> partitionsFor(String topic) {
      return this.delegate.partitionsFor(topic);
  }

Code example source: origin: eventuate-local/eventuate-local

  public List<PartitionInfo> partitionsFor(String topic) {
      return producer.partitionsFor(topic);
  }

Code example source: origin: org.axonframework.extensions.kafka/axon-kafka

  @Override
  public List<PartitionInfo> partitionsFor(String topic) {
      return this.delegate.partitionsFor(topic);
  }

Code example source: origin: org.apache.nifi/nifi-kafka-processors

  /**
   * Returns the partition chosen by the configured partitioner for the given key,
   * or 0 if no partitioner is set.
   */
  private int getPartition(Object key, String topicName) {
      if (this.partitioner != null) {
          int partSize = this.kafkaProducer.partitionsFor(topicName).size();
          return this.partitioner.partition(key, partSize);
      }
      return 0;
  }

Code example source: origin: org.apache.nifi/nifi-kafka-0-8-processors

  /**
   * Returns the partition chosen by the configured partitioner for the given key,
   * or 0 if no partitioner is set.
   */
  private int getPartition(Object key, String topicName) {
      if (this.partitioner != null) {
          int partSize = this.kafkaProducer.partitionsFor(topicName).size();
          return this.partitioner.partition(key, partSize);
      }
      return 0;
  }

Code example source: origin: mysql-time-machine/replicator

  private int getTotalPartitions() {
      try (Producer<byte[], byte[]> producer = this.getProducer()) {
          return producer.partitionsFor(this.topic).stream()
                  .mapToInt(PartitionInfo::partition)
                  .max()
                  .orElseThrow(() -> new InvalidPartitionsException("partitions not found")) + 1;
      }
  }
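
The replicator example derives the partition count from the largest partition id plus one instead of from the size of the list. The two values agree whenever the metadata lists every partition of a zero-based id range without gaps. The arithmetic can be tried in isolation with the sketch below, which takes plain ids instead of PartitionInfo objects; PartitionCount and fromIds are hypothetical names, and IllegalStateException stands in for the project's own InvalidPartitionsException.

```java
import java.util.stream.IntStream;

final class PartitionCount {
    // Highest partition id plus one; throws if no ids are given at all.
    static int fromIds(int... ids) {
        return IntStream.of(ids)
                .max()
                .orElseThrow(() -> new IllegalStateException("partitions not found")) + 1;
    }

    public static void main(String[] args) {
        // Hypothetical ids for a three-partition topic.
        System.out.println(fromIds(0, 1, 2));
    }
}
```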

Code example source: origin: allegro/hermes

  @Override
  public boolean isTopicAvailable(CachedTopic cachedTopic) {
      String kafkaTopicName = cachedTopic.getKafkaTopics().getPrimary().name().asString();
      try {
          if (producers.get(cachedTopic.getTopic()).partitionsFor(kafkaTopicName).size() > 0) {
              return true;
          }
      } catch (Exception e) {
          logger.warn("Could not read information about partitions for topic {}. {}", kafkaTopicName, e.getMessage());
          return false;
      }
      logger.warn("No information about partitions for topic {}", kafkaTopicName);
      return false;
  }
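
The Hermes check treats a topic as available only when the metadata lookup succeeds and returns at least one partition. Any exception counts as "not available". That decision logic can be separated from Kafka and from logging, as in the rough sketch below. TopicProbe and hasPartitions are hypothetical names; with a real producer the lookup would presumably be a lambda such as () -> producer.partitionsFor(topic).

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;

final class TopicProbe {
    // True only if the lookup succeeds and reports at least one partition.
    static boolean hasPartitions(Callable<List<?>> lookup) {
        try {
            List<?> partitions = lookup.call();
            return partitions != null && !partitions.isEmpty();
        } catch (Exception e) {
            // Any failure (timeout, unknown topic, ...) is treated as "not available".
            return false;
        }
    }

    public static void main(String[] args) {
        // Hypothetical lookups standing in for producer.partitionsFor(topic).
        System.out.println(hasPartitions(() -> Arrays.asList("p0", "p1")));
        System.out.println(hasPartitions(() -> { throw new IllegalStateException("no metadata"); }));
    }
}
```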

Code example source: origin: org.springframework.cloud/spring-cloud-stream-binder-kafka11

  @Override
  public Collection<PartitionInfo> call() throws Exception {
      Producer<byte[], byte[]> producer = producerFB.createProducer();
      List<PartitionInfo> partitionsFor = producer.partitionsFor(destination.getName());
      producer.close();
      ((DisposableBean) producerFB).destroy();
      return partitionsFor;
  }
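
In the example above, producer.close() only runs if partitionsFor() returns normally. When a short-lived producer is created just for a metadata lookup, try-with-resources closes it even if the lookup throws. The sketch below shows the pattern with a hypothetical PartitionSource interface standing in for the small slice of Producer that is needed, so it runs without Kafka on the classpath. Kafka's Producer can be used in try-with-resources directly, as the mysql-time-machine/replicator example on this page does.

```java
import java.util.List;

final class ClosingLookup {
    // Hypothetical stand-in for the part of Producer used here.
    interface PartitionSource extends AutoCloseable {
        List<Integer> partitionsFor(String topic);

        @Override
        void close(); // narrowed so callers need not handle a checked exception
    }

    // Looks up partitions and closes the source even if the lookup throws.
    static List<Integer> lookupAndClose(PartitionSource source, String topic) {
        try (PartitionSource s = source) {
            return s.partitionsFor(topic);
        }
    }
}
```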

Code example source: origin: reactor/reactor-kafka

  /**
   * Tests invocation of methods on KafkaProducer using {@link KafkaSender#doOnProducer(java.util.function.Function)}
   */
  @Test
  public void producerMethods() {
      testProducerMethod(p -> assertEquals(0, p.metrics().size()));
      testProducerMethod(p -> assertEquals(2, p.partitionsFor(topic).size()));
      testProducerMethod(p -> p.flush());
  }

Code example source: origin: reactor/reactor-kafka

  /**
   * Tests {@link KafkaProducer#partitionsFor(String)} error path.
   */
  @Test
  public void partitionsForNonExistentTopic() {
      sender = new DefaultKafkaSender<>(producerFactory, SenderOptions.create());
      StepVerifier.create(sender.doOnProducer(producer -> producer.partitionsFor("nonexistent")))
          .expectError(InvalidTopicException.class)
          .verify(Duration.ofMillis(DEFAULT_TEST_TIMEOUT));
  }

Code example source: origin: reactor/reactor-kafka

  /**
   * Tests {@link KafkaProducer#partitionsFor(String)} good path.
   */
  @Test
  public void partitionsFor() {
      sender = new DefaultKafkaSender<>(producerFactory, SenderOptions.create());
      StepVerifier.create(sender.doOnProducer(producer -> producer.partitionsFor(topic)))
          .expectNext(cluster.cluster().partitionsForTopic(topic))
          .expectComplete()
          .verify(Duration.ofMillis(DEFAULT_TEST_TIMEOUT));
  }
