org.apache.kafka.clients.producer.Producer.close()方法的使用及代码示例

x33g5p2x  于2022-01-26 转载在 其他  
字(5.9k)|赞(0)|评价(0)|浏览(377)

本文整理了Java中org.apache.kafka.clients.producer.Producer.close()方法的一些代码示例,展示了Producer.close()的具体用法。这些代码示例主要来源于GitHub/Stack Overflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Producer.close()方法的具体详情如下:
包路径:org.apache.kafka.clients.producer.Producer
类名称:Producer
方法名:close

Producer.close介绍

[英]Close this producer
[中]关闭此生产者

代码示例

代码示例来源:origin: apache/storm

  1. @Override
  2. public void cleanup() {
  3. producer.close();
  4. }

代码示例来源:origin: OryxProject/oryx

  1. @Override
  2. public synchronized void close() {
  3. if (producer != null) {
  4. producer.close();
  5. }
  6. }

代码示例来源:origin: line/armeria

  1. @Override
  2. protected void close() {
  3. if (needToCloseProducer) {
  4. producer.close();
  5. }
  6. }
  7. }

代码示例来源:origin: OryxProject/oryx

  1. @Override
  2. public synchronized void close() {
  3. if (producer != null) {
  4. producer.close();
  5. }
  6. }

代码示例来源:origin: apache/nifi

  1. /**
  2. * Will close the underlying {@link KafkaProducer}
  3. */
  4. @Override
  5. public void close() {
  6. this.kafkaProducer.close();
  7. }

代码示例来源:origin: jmxtrans/jmxtrans

  1. @Override
  2. public void close() {
  3. producer.close();
  4. }
  5. }

代码示例来源:origin: jmxtrans/jmxtrans

  1. @Override
  2. public void close() {
  3. producer.close();
  4. }
  5. }

代码示例来源:origin: apache/kylin

  1. public void close() {
  2. producer.close();
  3. }
  4. }

代码示例来源:origin: apache/kafka

  1. /**
  2. * See {@link KafkaProducer#close(Duration)}
  3. */
  4. void close(Duration timeout);
  5. }

代码示例来源:origin: alibaba/canal

  1. @Override
  2. public void stop() {
  3. try {
  4. logger.info("## stop the kafka producer");
  5. if (producer != null) {
  6. producer.close();
  7. }
  8. if (producer2 != null) {
  9. producer2.close();
  10. }
  11. } catch (Throwable e) {
  12. logger.warn("##something goes wrong when stopping kafka producer:", e);
  13. } finally {
  14. logger.info("## kafka producer is down.");
  15. }
  16. }

代码示例来源:origin: apache/incubator-gobblin

  1. @Override
  2. public void close()
  3. throws IOException {
  4. log.debug("Close called");
  5. this.producer.close();
  6. }

代码示例来源:origin: apache/incubator-druid

  1. @Override
  2. @LifecycleStop
  3. public void close()
  4. {
  5. scheduler.shutdownNow();
  6. producer.close();
  7. }
  8. }

代码示例来源:origin: confluentinc/ksql

  1. public void close() {
  2. commandConsumer.wakeup();
  3. commandConsumer.close();
  4. commandProducer.close();
  5. }
  6. }

代码示例来源:origin: apache/kafka

  1. @Test
  2. public void closeShouldBeIdempotent() {
  3. Properties producerProps = new Properties();
  4. producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
  5. Producer producer = new KafkaProducer<>(producerProps, new ByteArraySerializer(), new ByteArraySerializer());
  6. producer.close();
  7. producer.close();
  8. }

代码示例来源:origin: confluentinc/ksql

  1. @Test
  2. public void shouldCloseAllResources() {
  3. // When:
  4. commandTopic.close();
  5. //Then:
  6. final InOrder ordered = inOrder(commandConsumer);
  7. ordered.verify(commandConsumer).wakeup();
  8. ordered.verify(commandConsumer).close();
  9. verify(commandProducer).close();
  10. }

代码示例来源:origin: line/armeria

  1. @Test
  2. public void closeProducerWhenRequested() {
  3. final KafkaAccessLogWriter<String, String> service =
  4. new KafkaAccessLogWriter<>(producer, TOPIC_NAME, log -> "");
  5. service.shutdown().join();
  6. verify(producer, times(1)).close();
  7. }
  8. }

代码示例来源:origin: line/armeria

  1. @Test
  2. public void testDoNotCloseProducerWhenNotRequested() {
  3. final KafkaStructuredLoggingServiceExposed service =
  4. new KafkaStructuredLoggingServiceExposed(producer, null, false);
  5. service.close();
  6. verify(producer, times(0)).close();
  7. }
  8. }

代码示例来源:origin: line/armeria

  1. @Test
  2. public void testCloseProducerWhenRequested() {
  3. final KafkaStructuredLoggingServiceExposed service =
  4. new KafkaStructuredLoggingServiceExposed(producer, null, true);
  5. service.close();
  6. verify(producer, times(1)).close();
  7. }

代码示例来源:origin: apache/kafka

  1. @Test(expected = KafkaException.class)
  2. public void testOnlyCanExecuteCloseAfterInitTransactionsTimeout() {
  3. Map<String, Object> configs = new HashMap<>();
  4. configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "bad-transaction");
  5. configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5);
  6. configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
  7. Time time = new MockTime();
  8. MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
  9. Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
  10. metadata.update(initialUpdateResponse, time.milliseconds());
  11. MockClient client = new MockClient(time, metadata);
  12. Producer<String, String> producer = new KafkaProducer<>(configs, new StringSerializer(), new StringSerializer(),
  13. metadata, client, null, time);
  14. try {
  15. producer.initTransactions();
  16. } catch (TimeoutException e) {
  17. // expected
  18. }
  19. // other transactional operations should not be allowed if we catch the error after initTransactions failed
  20. try {
  21. producer.beginTransaction();
  22. } finally {
  23. producer.close(Duration.ofMillis(0));
  24. }
  25. }

代码示例来源:origin: apache/kafka

  1. @Test
  2. public void testSendToInvalidTopic() throws Exception {
  3. Map<String, Object> configs = new HashMap<>();
  4. configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
  5. configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "15000");
  6. Time time = new MockTime();
  7. MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, emptyMap());
  8. Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
  9. metadata.update(initialUpdateResponse, time.milliseconds());
  10. MockClient client = new MockClient(time, metadata);
  11. Producer<String, String> producer = new KafkaProducer<>(configs, new StringSerializer(), new StringSerializer(),
  12. metadata, client, null, time);
  13. String invalidTopicName = "topic abc"; // Invalid topic name due to space
  14. ProducerRecord<String, String> record = new ProducerRecord<>(invalidTopicName, "HelloKafka");
  15. List<MetadataResponse.TopicMetadata> topicMetadata = new ArrayList<>();
  16. topicMetadata.add(new MetadataResponse.TopicMetadata(Errors.INVALID_TOPIC_EXCEPTION,
  17. invalidTopicName, false, Collections.emptyList()));
  18. MetadataResponse updateResponse = new MetadataResponse(
  19. new ArrayList<>(initialUpdateResponse.brokers()),
  20. initialUpdateResponse.clusterId(),
  21. initialUpdateResponse.controller().id(),
  22. topicMetadata);
  23. client.prepareMetadataUpdate(updateResponse);
  24. Future<RecordMetadata> future = producer.send(record);
  25. assertEquals("Cluster has incorrect invalid topic list.", Collections.singleton(invalidTopicName),
  26. metadata.fetch().invalidTopics());
  27. TestUtils.assertFutureError(future, InvalidTopicException.class);
  28. producer.close(Duration.ofMillis(0));
  29. }

相关文章