本文整理了Java中org.apache.kafka.clients.producer.Producer.partitionsFor()
方法的一些代码示例,展示了Producer.partitionsFor()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Producer.partitionsFor()
方法的具体详情如下:
包路径:org.apache.kafka.clients.producer.Producer
类名称:Producer
方法名:partitionsFor
[英]See KafkaProducer#partitionsFor(String)
[中]参见《卡夫卡制作人》#部分(字符串)
代码示例来源:origin: apache/kylin
public void tryFetchMetadataFor(String topic) {
producer.partitionsFor(topic);
}
代码示例来源:origin: openzipkin/brave
@Override public List<PartitionInfo> partitionsFor(String topic) {
return delegate.partitionsFor(topic);
}
代码示例来源:origin: apache/flink
private static int[] getPartitionsByTopic(String topic, Producer<byte[], byte[]> producer) {
// the fetched list is immutable, so we're creating a mutable copy in order to sort it
List<PartitionInfo> partitionsList = new ArrayList<>(producer.partitionsFor(topic));
// sort the partitions by partition id to make sure the fetched partition list is the same across subtasks
Collections.sort(partitionsList, new Comparator<PartitionInfo>() {
@Override
public int compare(PartitionInfo o1, PartitionInfo o2) {
return Integer.compare(o1.partition(), o2.partition());
}
});
int[] partitions = new int[partitionsList.size()];
for (int i = 0; i < partitions.length; i++) {
partitions[i] = partitionsList.get(i).partition();
}
return partitions;
}
代码示例来源:origin: apache/flink
private static int[] getPartitionsByTopic(String topic, Producer<byte[], byte[]> producer) {
// the fetched list is immutable, so we're creating a mutable copy in order to sort it
List<PartitionInfo> partitionsList = new ArrayList<>(producer.partitionsFor(topic));
// sort the partitions by partition id to make sure the fetched partition list is the same across subtasks
Collections.sort(partitionsList, new Comparator<PartitionInfo>() {
@Override
public int compare(PartitionInfo o1, PartitionInfo o2) {
return Integer.compare(o1.partition(), o2.partition());
}
});
int[] partitions = new int[partitionsList.size()];
for (int i = 0; i < partitions.length; i++) {
partitions[i] = partitionsList.get(i).partition();
}
return partitions;
}
代码示例来源:origin: QNJR-GROUP/EasyTransaction
public int calcMessagePartition(String kafkaTopic, TransactionId trxId) {
List<PartitionInfo> partitionMetaData = kafkaProducer.partitionsFor(kafkaTopic);
int partitionSize = partitionMetaData.size();
int partition = Math.abs(trxId.hashCode() % partitionSize);
return partition;
}
代码示例来源:origin: apache/nifi
/**
*
*/
private int getPartition(Object key, String topicName) {
if (this.partitioner != null) {
int partSize = this.kafkaProducer.partitionsFor(topicName).size();
return this.partitioner.partition(key, partSize);
}
return 0;
}
}
代码示例来源:origin: spring-projects/spring-kafka
@Override
public List<PartitionInfo> partitionsFor(String topic) {
return this.delegate.partitionsFor(topic);
}
代码示例来源:origin: spring-projects/spring-kafka
@Override
public List<PartitionInfo> partitionsFor(String topic) {
Producer<K, V> producer = getTheProducer();
try {
return producer.partitionsFor(topic);
}
finally {
closeProducer(producer, inTransaction());
}
}
代码示例来源:origin: com.github.combinedmq/combinedmq
@Override
public List<PartitionInfo> partitionsFor(String topic) {
return producer.partitionsFor(topic);
}
代码示例来源:origin: org.axonframework/axon-kafka
@Override
public List<PartitionInfo> partitionsFor(String topic) {
return this.delegate.partitionsFor(topic);
}
代码示例来源:origin: eventuate-local/eventuate-local
public List<PartitionInfo> partitionsFor(String topic) {
return producer.partitionsFor(topic);
}
代码示例来源:origin: org.axonframework.extensions.kafka/axon-kafka
@Override
public List<PartitionInfo> partitionsFor(String topic) {
return this.delegate.partitionsFor(topic);
}
代码示例来源:origin: org.apache.nifi/nifi-kafka-processors
/**
*
*/
private int getPartition(Object key, String topicName) {
if (this.partitioner != null) {
int partSize = this.kafkaProducer.partitionsFor(topicName).size();
return this.partitioner.partition(key, partSize);
}
return 0;
}
}
代码示例来源:origin: org.apache.nifi/nifi-kafka-0-8-processors
/**
*
*/
private int getPartition(Object key, String topicName) {
if (this.partitioner != null) {
int partSize = this.kafkaProducer.partitionsFor(topicName).size();
return this.partitioner.partition(key, partSize);
}
return 0;
}
}
代码示例来源:origin: mysql-time-machine/replicator
private int getTotalPartitions() {
try (Producer<byte[], byte[]> producer = this.getProducer()) {
return producer.partitionsFor(this.topic).stream().mapToInt(PartitionInfo::partition).max().orElseThrow(() -> new InvalidPartitionsException("partitions not found")) + 1;
}
}
代码示例来源:origin: allegro/hermes
@Override
public boolean isTopicAvailable(CachedTopic cachedTopic) {
String kafkaTopicName = cachedTopic.getKafkaTopics().getPrimary().name().asString();
try {
if (producers.get(cachedTopic.getTopic()).partitionsFor(kafkaTopicName).size() > 0) {
return true;
}
} catch (Exception e) {
logger.warn("Could not read information about partitions for topic {}. {}", kafkaTopicName, e.getMessage());
return false;
}
logger.warn("No information about partitions for topic {}", kafkaTopicName);
return false;
}
代码示例来源:origin: org.springframework.cloud/spring-cloud-stream-binder-kafka11
@Override
public Collection<PartitionInfo> call() throws Exception {
Producer<byte[], byte[]> producer = producerFB.createProducer();
List<PartitionInfo> partitionsFor = producer.partitionsFor(destination.getName());
producer.close();
((DisposableBean) producerFB).destroy();
return partitionsFor;
}
代码示例来源:origin: reactor/reactor-kafka
/**
* Tests invocation of methods on KafkaProducer using {@link KafkaSender#doOnProducer(java.util.function.Function)}
*/
@Test
public void producerMethods() {
testProducerMethod(p -> assertEquals(0, p.metrics().size()));
testProducerMethod(p -> assertEquals(2, p.partitionsFor(topic).size()));
testProducerMethod(p -> p.flush());
}
代码示例来源:origin: reactor/reactor-kafka
/**
* Tests {@link KafkaProducer#partitionsFor(String)} error path.
*/
@Test
public void partitionsForNonExistentTopic() {
sender = new DefaultKafkaSender<>(producerFactory, SenderOptions.create());
StepVerifier.create(sender.doOnProducer(producer -> producer.partitionsFor("nonexistent")))
.expectError(InvalidTopicException.class)
.verify(Duration.ofMillis(DEFAULT_TEST_TIMEOUT));
}
代码示例来源:origin: reactor/reactor-kafka
/**
* Tests {@link KafkaProducer#partitionsFor(String)} good path.
*/
@Test
public void partitionsFor() {
sender = new DefaultKafkaSender<>(producerFactory, SenderOptions.create());
StepVerifier.create(sender.doOnProducer(producer -> producer.partitionsFor(topic)))
.expectNext(cluster.cluster().partitionsForTopic(topic))
.expectComplete()
.verify(Duration.ofMillis(DEFAULT_TEST_TIMEOUT));
}
内容来源于网络,如有侵权,请联系作者删除!