Using the org.apache.kafka.clients.producer.Producer Class: Code Examples


This article collects Java code examples for the org.apache.kafka.clients.producer.Producer class, showing how Producer is used in practice. The examples are drawn from selected projects on GitHub, Stack Overflow, and Maven, and are intended as practical references. Details of the Producer class:
Package path: org.apache.kafka.clients.producer
Class name: Producer

About Producer

The interface for the KafkaProducer.
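
Before the collected examples, here is a minimal quick-start sketch of the interface in use. It is an illustration rather than an excerpt from the projects below; the broker address and topic name are placeholders:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class ProducerQuickStart {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

            // Producer is the interface; KafkaProducer is the standard implementation.
            try (Producer<String, String> producer = new KafkaProducer<>(props)) {
                producer.send(new ProducerRecord<>("demo-topic", "key", "value")); // asynchronous send
                producer.flush(); // block until buffered records have been transmitted
            }
        }
    }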

Code Examples

Code example source: QNJR-GROUP/EasyTransaction

    public Future<RecordMetadata> publishKafkaMessage(ProducerRecord<String, byte[]> record) {
        // Asynchronous send; the returned Future completes with the record's metadata.
        return kafkaProducer.send(record);
    }

Code example source: apache/storm

    @Override
    public void cleanup() {
        producer.close();
    }

Code example source: uber-common/jvm-profiler

    @Override
    public void close() {
        synchronized (this) {
            if (producer == null) {
                return;
            }
            producer.flush();
            producer.close();
            producer = null;
        }
    }

Code example source: apache/nifi

    void beginTransaction() {
        if (!useTransactions) {
            return;
        }
        if (!transactionsInitialized) {
            // initTransactions() must be called exactly once before the first transaction.
            producer.initTransactions();
            transactionsInitialized = true;
        }
        producer.beginTransaction();
        activeTransaction = true;
    }
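
The snippet above only begins a transaction; for context, the full transactional lifecycle looks roughly like the following minimal sketch, assuming a configured transactional.id (broker address, id, and topic are placeholders):

    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "demo-tx-id");      // placeholder id
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

    Producer<String, String> producer = new KafkaProducer<>(props);
    producer.initTransactions(); // once per producer instance, before the first transaction
    try {
        producer.beginTransaction();
        producer.send(new ProducerRecord<>("demo-topic", "key", "value"));
        producer.commitTransaction();
    } catch (ProducerFencedException e) {
        // Another producer with the same transactional.id took over; this instance must close.
        producer.close();
    } catch (KafkaException e) {
        producer.abortTransaction(); // roll back, then retry or give up
    }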

Code example source: spring-projects/spring-kafka

    // Excerpt from a Mockito-based test; producer1 and the rest of the test setup
    // are defined earlier in the original source.
    @SuppressWarnings("unchecked")
    Producer<Object, Object> producer2 = mock(Producer.class);
    producer1.initTransactions();
    AtomicBoolean first = new AtomicBoolean(true);
    InOrder inOrder = inOrder(producer1, producer2); // assumed setup for the verifications below
    inOrder.verify(producer1).beginTransaction();
    inOrder.verify(producer2).beginTransaction();
    inOrder.verify(producer2).commitTransaction();
    inOrder.verify(producer2).close();
    inOrder.verify(producer1).commitTransaction();
    inOrder.verify(producer1).close();
Code example source: apache/kafka

    // Excerpt; the remainder of this test is truncated in the original source.
    try {
        Future<?> future = executor.submit(() -> {
            producer.send(new ProducerRecord<>("topic", "key", "value"));
            try {
                producer.close();
                fail("Close should block and throw.");
            } catch (Exception e) {
                // ...
Code example source: uber-common/jvm-profiler

    @Override
    public void report(String profilerName, Map<String, Object> metrics) {
        ensureProducer();
        String topicName = getTopic(profilerName);
        String str = JsonUtils.serialize(metrics);
        byte[] message = str.getBytes(StandardCharsets.UTF_8);
        Future<RecordMetadata> future = producer.send(
                new ProducerRecord<String, byte[]>(topicName, message));
        if (syncMode) {
            producer.flush();
            try {
                future.get();
            } catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException(e);
            }
        }
    }
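
In sync mode the method above blocks on future.get(); alternatively, Producer#send accepts a Callback for asynchronous result handling. A brief sketch (topic name is a placeholder):

    producer.send(new ProducerRecord<>("demo-topic", "key", "value"),
            (metadata, exception) -> {
                if (exception != null) {
                    // Send failed; metadata is null here.
                } else {
                    // Send succeeded; metadata.partition() and metadata.offset() are available.
                }
            });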

Code example source: confluentinc/kafka-streams-examples

    /**
     * @param topic          Kafka topic to write the data records to
     * @param records        Data records to write to Kafka
     * @param producerConfig Kafka producer configuration
     * @param <K>            Key type of the data records
     * @param <V>            Value type of the data records
     */
    public static <K, V> void produceKeyValuesSynchronously(
            String topic, Collection<KeyValue<K, V>> records, Properties producerConfig)
            throws ExecutionException, InterruptedException {
        Producer<K, V> producer = new KafkaProducer<>(producerConfig);
        for (KeyValue<K, V> record : records) {
            Future<RecordMetadata> f = producer.send(
                    new ProducerRecord<>(topic, record.key, record.value));
            f.get();
        }
        producer.flush();
        producer.close();
    }
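
A hypothetical call to this helper could look as follows (broker address and topic are placeholders; KeyValue is org.apache.kafka.streams.KeyValue):

    Properties config = new Properties();
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

    produceKeyValuesSynchronously("demo-topic",
            Arrays.asList(new KeyValue<>("k1", "v1"), new KeyValue<>("k2", "v2")),
            config);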

Code example source: apache/kafka

    @Test(expected = KafkaException.class)
    public void testOnlyCanExecuteCloseAfterInitTransactionsTimeout() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "bad-transaction");
        configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5);
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
        Time time = new MockTime();
        MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
        Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
        metadata.update(initialUpdateResponse, time.milliseconds());
        MockClient client = new MockClient(time, metadata);
        Producer<String, String> producer = new KafkaProducer<>(configs, new StringSerializer(), new StringSerializer(),
                metadata, client, null, time);
        try {
            producer.initTransactions();
        } catch (TimeoutException e) {
            // expected
        }
        // Other transactional operations should not be allowed if we catch the error after initTransactions failed.
        try {
            producer.beginTransaction();
        } finally {
            producer.close(Duration.ofMillis(0));
        }
    }
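
Note the producer.close(Duration.ofMillis(0)) in the finally block: a zero timeout tells the producer to shut down immediately, failing any in-flight requests rather than waiting for them to complete.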

Code example source: apache/incubator-druid

    @Override
    public void flush()
    {
        producer.flush();
    }

Code example source: org.springframework.cloud/spring-cloud-stream-binder-kafka11

    @Override
    public Collection<PartitionInfo> call() throws Exception {
        Producer<byte[], byte[]> producer = producerFB.createProducer();
        List<PartitionInfo> partitionsFor = producer.partitionsFor(destination.getName());
        producer.close();
        ((DisposableBean) producerFB).destroy();
        return partitionsFor;
    }

Code example source: apache/nifi

    public PublishResult complete() {
        if (tracker == null) {
            if (messagesSent.get() == 0L) {
                return PublishResult.EMPTY;
            }
            rollback();
            throw new IllegalStateException("Cannot complete publishing to Kafka because Publisher Lease was already closed");
        }
        producer.flush();
        if (activeTransaction) {
            producer.commitTransaction();
            activeTransaction = false;
        }
        try {
            tracker.awaitCompletion(maxAckWaitMillis);
            return tracker.createPublishResult();
        } catch (final InterruptedException e) {
            logger.warn("Interrupted while waiting for an acknowledgement from Kafka; some FlowFiles may be transferred to 'failure' even though they were received by Kafka");
            Thread.currentThread().interrupt();
            return tracker.failOutstanding(e);
        } catch (final TimeoutException e) {
            logger.warn("Timed out while waiting for an acknowledgement from Kafka; some FlowFiles may be transferred to 'failure' even though they were received by Kafka");
            return tracker.failOutstanding(e);
        } finally {
            tracker = null;
        }
    }

Code example source: apache/kylin

    public void tryFetchMetadataFor(String topic) {
        producer.partitionsFor(topic);
    }
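
partitionsFor returns the partition metadata for a topic, which is what makes it usable as a cheap metadata probe here. A small sketch of inspecting the result (topic name is a placeholder):

    List<PartitionInfo> partitions = producer.partitionsFor("demo-topic");
    for (PartitionInfo p : partitions) {
        // Each PartitionInfo exposes the partition id and the current leader node.
        System.out.printf("partition %d, leader %s%n", p.partition(), p.leader());
    }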

Code example source: reactor/reactor-kafka

    /**
     * Tests invocation of methods on KafkaProducer using {@link KafkaSender#doOnProducer(java.util.function.Function)}.
     */
    @Test
    public void producerMethods() {
        testProducerMethod(p -> assertEquals(0, p.metrics().size()));
        testProducerMethod(p -> assertEquals(2, p.partitionsFor(topic).size()));
        testProducerMethod(p -> p.flush());
    }

Code example source: openzipkin/brave

    @Override public void initTransactions() {
        delegate.initTransactions();
    }

Code example source: spring-projects/spring-kafka

    @Test
    public void testDeadLetterPublisherWhileTransactionActive() {
        @SuppressWarnings("unchecked")
        Producer<Object, Object> producer1 = mock(Producer.class);
        @SuppressWarnings("unchecked")
        Producer<Object, Object> producer2 = mock(Producer.class);
        producer1.initTransactions();
        @SuppressWarnings("unchecked")
        ProducerFactory<Object, Object> pf = mock(ProducerFactory.class);
        given(pf.transactionCapable()).willReturn(true);
        given(pf.createProducer()).willReturn(producer1).willReturn(producer2);
        KafkaTemplate<Object, Object> template = spy(new KafkaTemplate<>(pf));
        template.setDefaultTopic(STRING_KEY_TOPIC);
        KafkaTransactionManager<Object, Object> tm = new KafkaTransactionManager<>(pf);
        new TransactionTemplate(tm).execute(s -> {
            new DeadLetterPublishingRecoverer(template).accept(
                    new ConsumerRecord<>(STRING_KEY_TOPIC, 0, 0L, "key", "foo"),
                    new RuntimeException("foo"));
            return null;
        });
        verify(producer1).beginTransaction();
        verify(producer1).commitTransaction();
        verify(producer1).close();
        verify(producer2, never()).beginTransaction();
        verify(template, never()).executeInTransaction(any());
    }

Code example source: apache/kafka

    // Excerpt; the enclosing try block is truncated in the original source.
        producer.send(new ProducerRecord<>(topicName, "key", "value"));
        fail();
    } catch (Exception e) {
        producer.close(Duration.ofMillis(0));
        TestUtils.waitForCondition(() -> sendException.get() != null, "No producer exception within timeout");
        assertEquals(KafkaException.class, sendException.get().getClass());
        // ...

Code example source: linkedin/cruise-control

    // Excerpt; the Callback bodies and several closing braces are truncated in the original source.
    final AtomicInteger metricSampleCount = new AtomicInteger(0);
    for (PartitionMetricSample sample : samples.partitionMetricSamples()) {
        _producer.send(new ProducerRecord<>(_partitionMetricSampleStoreTopic, null, sample.sampleTime(), null, sample.toBytes()),
                new Callback() {
                    @Override
                    // ...
        _producer.send(new ProducerRecord<>(_brokerMetricSampleStoreTopic, sample.toBytes()),
                new Callback() {
                    @Override
                    // ...
        _producer.flush();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Stored {} partition metric samples and {} broker metric samples to Kafka",
                    // ...

Code example source: apache/incubator-gobblin

    @Override
    public void flush()
            throws IOException {
        this.producer.flush();
    }

