本文整理了Java中org.apache.kafka.clients.consumer.Consumer.partitionsFor()
方法的一些代码示例,展示了Consumer.partitionsFor()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Consumer.partitionsFor()
方法的具体详情如下:
包路径:org.apache.kafka.clients.consumer.Consumer
类名称:Consumer
方法名:partitionsFor
暂无
代码示例来源:origin: openzipkin/brave
/** Looks up partition metadata for {@code topic} by delegating to the wrapped consumer. */
@Override public List<PartitionInfo> partitionsFor(String topic) {
  List<PartitionInfo> partitions = delegate.partitionsFor(topic);
  return partitions;
}
代码示例来源:origin: openzipkin/brave
/**
 * Looks up partition metadata for {@code topic}, bounded by {@code timeout},
 * by delegating to the wrapped consumer.
 */
public List<PartitionInfo> partitionsFor(String topic, Duration timeout) {
  List<PartitionInfo> partitions = delegate.partitionsFor(topic, timeout);
  return partitions;
}
代码示例来源:origin: apache/storm
/**
 * Collects every {@link TopicPartition} for the configured {@code topics} using the
 * given consumer's metadata. Topics with no metadata (partitionsFor returned null)
 * are logged and skipped.
 */
@Override
public Set<TopicPartition> getAllSubscribedPartitions(Consumer<?, ?> consumer) {
    Set<TopicPartition> result = new HashSet<>();
    for (String topic : topics) {
        List<PartitionInfo> infos = consumer.partitionsFor(topic);
        if (infos == null) {
            // partitionsFor returns null when the broker has no metadata for the topic
            LOG.warn("Topic {} not found, skipping addition of the topic", topic);
            continue;
        }
        infos.forEach(info -> result.add(new TopicPartition(info.topic(), info.partition())));
    }
    return result;
}
代码示例来源:origin: apache/incubator-gobblin
/**
 * Builds one {@link WorkUnit} per partition of the whitelisted topic.
 *
 * <p>Fix: {@code Consumer.partitionsFor()} returns {@code null} when the topic does
 * not exist (see the null checks other callers in this codebase perform); the original
 * dereferenced the result unconditionally and would NPE on a missing topic. We now
 * return an empty work-unit list in that case.
 */
@Override
public List<WorkUnit> getWorkunits(SourceState state) {
    Config config = ConfigUtils.propertiesToConfig(state.getProperties());
    Consumer<String, byte[]> consumer = getKafkaConsumer(config);
    LOG.debug("Consumer is {}", consumer);
    String topic = ConfigUtils.getString(config, TOPIC_WHITELIST,
        StringUtils.EMPTY); // TODO: fix this to use the new API when KafkaWrapper is fixed
    List<WorkUnit> workUnits = new ArrayList<>();
    List<PartitionInfo> topicPartitions = consumer.partitionsFor(topic);
    if (topicPartitions == null) {
        // Topic missing (or metadata unavailable): nothing to schedule.
        LOG.warn("No partition metadata found for topic {}; returning no work units", topic);
        return workUnits;
    }
    // NOTE(review): the consumer is never closed here in the original; presumably its
    // lifecycle is managed elsewhere — confirm before adding a close().
    LOG.info("Partition count is {}", topicPartitions.size());
    for (PartitionInfo topicPartition : topicPartitions) {
        Extract extract = this.createExtract(DEFAULT_TABLE_TYPE, DEFAULT_NAMESPACE_NAME, topicPartition.topic());
        LOG.info("Partition info is {}", topicPartition);
        WorkUnit workUnit = WorkUnit.create(extract);
        setTopicNameInState(workUnit, topicPartition.topic());
        workUnit.setProp(ConfigurationKeys.EXTRACT_TABLE_NAME_KEY, topicPartition.topic());
        setPartitionId(workUnit, topicPartition.partition());
        workUnits.add(workUnit);
    }
    return workUnits;
}
代码示例来源:origin: apache/metron
/**
 * Fetches the most recent message from {@code topic}, or {@code null} when the topic
 * does not exist or holds no records.
 *
 * <p>Strategy: assign all partitions of the topic, then for every partition whose
 * current position is at least 1, seek one record back and poll once; the first record
 * returned (from whichever partition responds) is used as the sample.
 */
@Override
public String getSampleMessage(final String topic) {
String message = null;
if (listTopics().contains(topic)) {
try (Consumer<String, String> kafkaConsumer = kafkaConsumerFactory.createConsumer()) {
// Manually assign every partition of the topic (no consumer-group subscription).
kafkaConsumer.assign(kafkaConsumer.partitionsFor(topic).stream()
.map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition()))
.collect(Collectors.toList()));
// Rewind each non-empty partition by one so the next poll returns its last record.
kafkaConsumer.assignment().stream()
.filter(p -> (kafkaConsumer.position(p) - 1) >= 0)
.forEach(p -> kafkaConsumer.seek(p, kafkaConsumer.position(p) - 1));
final ConsumerRecords<String, String> records = kafkaConsumer.poll(KAFKA_CONSUMER_TIMEOUT);
// Take the first record polled, if any; which partition wins is unspecified.
message = records.isEmpty() ? null : records.iterator().next().value();
kafkaConsumer.unsubscribe();
}
}
return message;
}
代码示例来源:origin: apache/metron
/**
 * Describes the named topic: partition count and replication factor (taken from the
 * first partition's replica list). Returns {@code null} when the topic does not exist.
 */
@Override
public KafkaTopic getTopic(final String name) {
    KafkaTopic result = null;
    if (listTopics().contains(name)) {
        try (Consumer<String, String> consumer = kafkaConsumerFactory.createConsumer()) {
            final List<PartitionInfo> infos = consumer.partitionsFor(name);
            if (!infos.isEmpty()) {
                result = new KafkaTopic();
                result.setName(name);
                result.setNumPartitions(infos.size());
                // Replication factor is uniform per topic, so partition 0 suffices.
                result.setReplicationFactor(infos.get(0).replicas().length);
            }
        }
    }
    return result;
}
代码示例来源:origin: spring-projects/spring-kafka
/**
 * Verifies that every configured topic exists on the broker; throws
 * {@link IllegalStateException} listing the missing topics when
 * {@code missingTopicsFatal} is set. No-op when a topic pattern is in use.
 */
protected void checkTopics() {
    if (!this.containerProperties.isMissingTopicsFatal()
            || this.containerProperties.getTopicPattern() != null) {
        return;
    }
    try (Consumer<K, V> consumer =
            this.consumerFactory.createConsumer(this.containerProperties.getGroupId(),
                    this.containerProperties.getClientId(), null)) {
        if (consumer == null) {
            return;
        }
        String[] topics = this.containerProperties.getTopics();
        if (topics == null) {
            // No explicit topic list: derive topic names from the configured partitions.
            topics = Arrays.stream(this.containerProperties.getTopicPartitions())
                    .map(TopicPartitionInitialOffset::topic)
                    .toArray(String[]::new);
        }
        List<String> missing = new ArrayList<>();
        for (String topic : topics) {
            // partitionsFor returns null when the broker has no metadata for the topic.
            if (consumer.partitionsFor(topic) == null) {
                missing.add(topic);
            }
        }
        if (!missing.isEmpty()) {
            throw new IllegalStateException(
                    "Topic(s) " + missing.toString()
                            + " is/are not present and missingTopicsFatal is true");
        }
    }
}
代码示例来源:origin: spring-projects/spring-kafka
/**
 * Verifies topic creation on the embedded broker: plain add, add with explicit
 * partition count, duplicate-name rejection, over-replication rejection, and that
 * a fresh consumer observes the requested partition count.
 *
 * <p>Fix: the original closed the consumer with a bare {@code close()} after the
 * final assertion, leaking it whenever that assertion failed; try-with-resources
 * closes it on every path.
 */
@Test
public void testAddingTopics() {
    int count = embeddedKafka.getTopics().size();
    embeddedKafka.addTopics("testAddingTopics");
    assertThat(embeddedKafka.getTopics().size()).isEqualTo(count + 1);
    embeddedKafka.addTopics(new NewTopic("morePartitions", 10, (short) 1));
    assertThat(embeddedKafka.getTopics().size()).isEqualTo(count + 2);
    // Re-adding an existing topic must be rejected.
    assertThatThrownBy(() -> embeddedKafka.addTopics(new NewTopic("morePartitions", 10, (short) 1)))
            .isInstanceOf(IllegalArgumentException.class).hasMessageContaining("exists");
    // Replication factor above broker count must be rejected.
    assertThatThrownBy(() -> embeddedKafka.addTopics(new NewTopic("morePartitions2", 10, (short) 2)))
            .isInstanceOf(IllegalArgumentException.class).hasMessageContaining("replication");
    Map<String, Object> consumerProps = new HashMap<>(this.consumerFactory.getConfigurationProperties());
    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "testMultiReplying");
    ConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
    try (Consumer<Integer, String> consumer = cf.createConsumer()) {
        assertThat(consumer.partitionsFor("morePartitions")).hasSize(10);
    }
}
代码示例来源:origin: io.opentracing.contrib/opentracing-kafka-client
/** Passes the partition-metadata lookup straight through to the underlying consumer. */
@Override
public List<PartitionInfo> partitionsFor(String topic) {
    List<PartitionInfo> partitions = consumer.partitionsFor(topic);
    return partitions;
}
代码示例来源:origin: opentracing-contrib/java-kafka-client
/** Passes the partition-metadata lookup straight through to the underlying consumer. */
@Override
public List<PartitionInfo> partitionsFor(String topic) {
    List<PartitionInfo> partitions = consumer.partitionsFor(topic);
    return partitions;
}
代码示例来源:origin: apache/samza
/**
 * Fetches partition metadata for each topic via the metadata consumer.
 *
 * <p>Fix: the original used the raw type {@code new HashMap()}, which defeats
 * generic type checking; replaced with the diamond form.
 *
 * @param topics topics to query
 * @return topic name mapped to its partition list (the value may be null if
 *         the broker has no metadata for a topic — callers should check)
 */
Map<String, List<PartitionInfo>> getTopicMetadata(Set<String> topics) {
    Map<String, List<PartitionInfo>> streamToPartitionsInfo = new HashMap<>();
    for (String topic : topics) {
        streamToPartitionsInfo.put(topic, metadataConsumer.partitionsFor(topic));
    }
    return streamToPartitionsInfo;
}
代码示例来源:origin: org.apache.samza/samza-kafka
/**
 * Fetches partition metadata for each topic via the metadata consumer.
 *
 * <p>Fix: the original used the raw type {@code new HashMap()}, which defeats
 * generic type checking; replaced with the diamond form.
 *
 * @param topics topics to query
 * @return topic name mapped to its partition list (the value may be null if
 *         the broker has no metadata for a topic — callers should check)
 */
Map<String, List<PartitionInfo>> getTopicMetadata(Set<String> topics) {
    Map<String, List<PartitionInfo>> streamToPartitionsInfo = new HashMap<>();
    for (String topic : topics) {
        streamToPartitionsInfo.put(topic, metadataConsumer.partitionsFor(topic));
    }
    return streamToPartitionsInfo;
}
代码示例来源:origin: vert-x3/vertx-kafka-client
/**
 * Asynchronously fetches partition metadata for {@code topic} on the consumer's
 * worker context and completes {@code handler} with the result. Returns this
 * stream for fluent chaining.
 */
@Override
public KafkaReadStreamImpl<K, V> partitionsFor(String topic, Handler<AsyncResult<List<PartitionInfo>>> handler) {
    this.submitTask((consumer, future) -> {
        List<PartitionInfo> result = consumer.partitionsFor(topic);
        // future may be null when no completion is expected for this task.
        if (future == null) {
            return;
        }
        future.complete(result);
    }, handler);
    return this;
}
代码示例来源:origin: Nepxion/Thunder
/**
 * Computes the partition a record with the given key would land on, mirroring the
 * Kafka default partitioner: murmur2 of the serialized key, sign-masked, modulo the
 * topic's partition count.
 *
 * <p>Fix: the original created a {@link StringSerializer} and never closed it (hence
 * the {@code @SuppressWarnings("resource")}); Kafka serializers are {@code Closeable},
 * so try-with-resources now releases it and the suppression is gone.
 */
private int getPartitionIndex(Consumer<String, byte[]> consumer, String topic, String key) {
    int partitionNumber = consumer.partitionsFor(topic).size();
    byte[] serializedKey;
    try (StringSerializer keySerializer = new StringSerializer()) {
        serializedKey = keySerializer.serialize(topic, key);
    }
    // Mask the sign bit so the modulo result is non-negative.
    int positive = Utils.murmur2(serializedKey) & 0x7fffffff;
    return positive % partitionNumber;
}
}
代码示例来源:origin: org.springframework.cloud/spring-cloud-stream-binder-kafka11
/**
 * Creates a consumer, fetches the partition list for the bound destination, and
 * returns it.
 *
 * <p>Fix: the original called {@code consumer.close()} only after a successful
 * {@code partitionsFor()}, leaking the consumer when the lookup threw.
 * Try-with-resources closes it on all paths (the pattern other call sites in this
 * codebase already use).
 */
@Override
public Collection<PartitionInfo> call() throws Exception {
    try (Consumer<?, ?> consumer = consumerFactory.createConsumer()) {
        return consumer.partitionsFor(destination.getName());
    }
}
代码示例来源:origin: apache/samza
/**
 * Verifies that getSystemStreamMetadata gives up after a bounded number of retries:
 * the mocked consumer's partitionsFor fails five consecutive times, and the admin is
 * expected to surface a SamzaException rather than retry forever.
 */
@Test(expected = SamzaException.class)
public void testGetSystemStreamMetadataShouldTerminateAfterFiniteRetriesOnException() {
// Stub five consecutive failures — one per expected retry attempt.
when(mockKafkaConsumer.partitionsFor(VALID_TOPIC)).thenThrow(new RuntimeException())
.thenThrow(new RuntimeException())
.thenThrow(new RuntimeException())
.thenThrow(new RuntimeException())
.thenThrow(new RuntimeException());
kafkaSystemAdmin.getSystemStreamMetadata(ImmutableSet.of(VALID_TOPIC));
}
代码示例来源:origin: apache/samza
/**
 * Verifies that getSystemStreamPartitionCounts gives up after a bounded number of
 * retries: the mocked consumer's partitionsFor fails five consecutive times, and the
 * admin is expected to surface a SamzaException rather than retry forever.
 */
@Test(expected = SamzaException.class)
public void testGetSystemStreamPartitionCountsShouldTerminateAfterFiniteRetriesOnException() throws Exception {
final Set<String> streamNames = ImmutableSet.of(VALID_TOPIC);
// Metadata-cache TTL passed through to the admin; value is arbitrary for this test.
final long cacheTTL = 100L;
// Stub five consecutive failures — one per expected retry attempt.
when(mockKafkaConsumer.partitionsFor(VALID_TOPIC)).thenThrow(new RuntimeException())
.thenThrow(new RuntimeException())
.thenThrow(new RuntimeException())
.thenThrow(new RuntimeException())
.thenThrow(new RuntimeException());
kafkaSystemAdmin.getSystemStreamPartitionCounts(streamNames, cacheTTL);
}
}
代码示例来源:origin: spring-cloud/spring-cloud-stream-binder-kafka
/**
 * Resolves the partitions of {@code topic} through the provisioning provider,
 * supplying a callback that opens a short-lived consumer to query the broker.
 */
private Collection<PartitionInfo> getPartitionInfo(String topic,
        final ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties,
        final ConsumerFactory<?, ?> consumerFactory, int partitionCount) {
    boolean autoRebalance = extendedConsumerProperties.getExtension().isAutoRebalanceEnabled();
    return provisioningProvider.getPartitionsForTopic(partitionCount, autoRebalance,
            () -> {
                // Consumer is created per call and closed immediately after the lookup.
                try (Consumer<?, ?> consumer = consumerFactory.createConsumer()) {
                    return consumer.partitionsFor(topic);
                }
            }, topic);
}
代码示例来源:origin: seznam/euphoria
/**
 * Exercises KafkaSource.getPartitions() against a mocked consumer that reports the
 * given partition list; getPartitions() itself runs via thenCallRealMethod().
 * No assertions here — presumably this helper only checks the call completes; the
 * caller supplies the partition fixtures. TODO(review): confirm callers assert on
 * the outcome.
 */
public void tryGetPartitions(List<PartitionInfo> partitions) {
KafkaSource source = mock(KafkaSource.class);
Consumer<byte[], byte[]> consumer = mock(Consumer.class);
// Any topic name yields the fixture partitions.
when(consumer.partitionsFor(any(String.class))).thenReturn(partitions);
when(source.newConsumer(any(), any(), any())).thenReturn(consumer);
when(source.getPartitions()).thenCallRealMethod();
source.getPartitions();
}
}
代码示例来源:origin: org.springframework.cloud/spring-cloud-stream-binder-kafka
/**
 * Resolves the partitions of {@code topic} through the provisioning provider,
 * supplying a callback that opens a short-lived consumer to query the broker.
 */
private Collection<PartitionInfo> getPartitionInfo(String topic,
        final ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties,
        final ConsumerFactory<?, ?> consumerFactory, int partitionCount) {
    boolean autoRebalance = extendedConsumerProperties.getExtension().isAutoRebalanceEnabled();
    return provisioningProvider.getPartitionsForTopic(partitionCount, autoRebalance,
            () -> {
                // Consumer is created per call and closed immediately after the lookup.
                try (Consumer<?, ?> consumer = consumerFactory.createConsumer()) {
                    return consumer.partitionsFor(topic);
                }
            }, topic);
}
内容来源于网络,如有侵权,请联系作者删除!