本文整理了Java中org.apache.kafka.clients.producer.Producer.close()
方法的一些代码示例,展示了Producer.close()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Producer.close()
方法的具体详情如下:
包路径:org.apache.kafka.clients.producer.Producer
类名称:Producer
方法名:close
[英]Close this producer
[中]关闭这个制作人
代码示例来源:origin: apache/storm
@Override
public void cleanup() {
producer.close();
}
代码示例来源:origin: OryxProject/oryx
@Override
public synchronized void close() {
if (producer != null) {
producer.close();
}
}
代码示例来源:origin: line/armeria
@Override
protected void close() {
if (needToCloseProducer) {
producer.close();
}
}
}
代码示例来源:origin: OryxProject/oryx
@Override
public synchronized void close() {
if (producer != null) {
producer.close();
}
}
代码示例来源:origin: apache/nifi
/**
* Will close the underlying {@link KafkaProducer}
*/
@Override
public void close() {
this.kafkaProducer.close();
}
代码示例来源:origin: jmxtrans/jmxtrans
@Override
public void close() {
producer.close();
}
}
代码示例来源:origin: jmxtrans/jmxtrans
@Override
public void close() {
producer.close();
}
}
代码示例来源:origin: apache/kylin
public void close() {
producer.close();
}
}
代码示例来源:origin: apache/kafka
/**
* See {@link KafkaProducer#close(Duration)}
*/
void close(Duration timeout);
}
代码示例来源:origin: alibaba/canal
@Override
public void stop() {
try {
logger.info("## stop the kafka producer");
if (producer != null) {
producer.close();
}
if (producer2 != null) {
producer2.close();
}
} catch (Throwable e) {
logger.warn("##something goes wrong when stopping kafka producer:", e);
} finally {
logger.info("## kafka producer is down.");
}
}
代码示例来源:origin: apache/incubator-gobblin
@Override
public void close()
throws IOException {
log.debug("Close called");
this.producer.close();
}
代码示例来源:origin: apache/incubator-druid
@Override
@LifecycleStop
public void close()
{
scheduler.shutdownNow();
producer.close();
}
}
代码示例来源:origin: confluentinc/ksql
public void close() {
commandConsumer.wakeup();
commandConsumer.close();
commandProducer.close();
}
}
代码示例来源:origin: apache/kafka
@Test
public void closeShouldBeIdempotent() {
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
Producer producer = new KafkaProducer<>(producerProps, new ByteArraySerializer(), new ByteArraySerializer());
producer.close();
producer.close();
}
代码示例来源:origin: confluentinc/ksql
@Test
public void shouldCloseAllResources() {
// When:
commandTopic.close();
//Then:
final InOrder ordered = inOrder(commandConsumer);
ordered.verify(commandConsumer).wakeup();
ordered.verify(commandConsumer).close();
verify(commandProducer).close();
}
代码示例来源:origin: line/armeria
@Test
public void closeProducerWhenRequested() {
final KafkaAccessLogWriter<String, String> service =
new KafkaAccessLogWriter<>(producer, TOPIC_NAME, log -> "");
service.shutdown().join();
verify(producer, times(1)).close();
}
}
代码示例来源:origin: line/armeria
@Test
public void testDoNotCloseProducerWhenNotRequested() {
final KafkaStructuredLoggingServiceExposed service =
new KafkaStructuredLoggingServiceExposed(producer, null, false);
service.close();
verify(producer, times(0)).close();
}
}
代码示例来源:origin: line/armeria
@Test
public void testCloseProducerWhenRequested() {
final KafkaStructuredLoggingServiceExposed service =
new KafkaStructuredLoggingServiceExposed(producer, null, true);
service.close();
verify(producer, times(1)).close();
}
代码示例来源:origin: apache/kafka
@Test(expected = KafkaException.class)
public void testOnlyCanExecuteCloseAfterInitTransactionsTimeout() {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "bad-transaction");
configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5);
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
Time time = new MockTime();
MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
metadata.update(initialUpdateResponse, time.milliseconds());
MockClient client = new MockClient(time, metadata);
Producer<String, String> producer = new KafkaProducer<>(configs, new StringSerializer(), new StringSerializer(),
metadata, client, null, time);
try {
producer.initTransactions();
} catch (TimeoutException e) {
// expected
}
// other transactional operations should not be allowed if we catch the error after initTransactions failed
try {
producer.beginTransaction();
} finally {
producer.close(Duration.ofMillis(0));
}
}
代码示例来源:origin: apache/kafka
@Test
public void testSendToInvalidTopic() throws Exception {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "15000");
Time time = new MockTime();
MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, emptyMap());
Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
metadata.update(initialUpdateResponse, time.milliseconds());
MockClient client = new MockClient(time, metadata);
Producer<String, String> producer = new KafkaProducer<>(configs, new StringSerializer(), new StringSerializer(),
metadata, client, null, time);
String invalidTopicName = "topic abc"; // Invalid topic name due to space
ProducerRecord<String, String> record = new ProducerRecord<>(invalidTopicName, "HelloKafka");
List<MetadataResponse.TopicMetadata> topicMetadata = new ArrayList<>();
topicMetadata.add(new MetadataResponse.TopicMetadata(Errors.INVALID_TOPIC_EXCEPTION,
invalidTopicName, false, Collections.emptyList()));
MetadataResponse updateResponse = new MetadataResponse(
new ArrayList<>(initialUpdateResponse.brokers()),
initialUpdateResponse.clusterId(),
initialUpdateResponse.controller().id(),
topicMetadata);
client.prepareMetadataUpdate(updateResponse);
Future<RecordMetadata> future = producer.send(record);
assertEquals("Cluster has incorrect invalid topic list.", Collections.singleton(invalidTopicName),
metadata.fetch().invalidTopics());
TestUtils.assertFutureError(future, InvalidTopicException.class);
producer.close(Duration.ofMillis(0));
}
内容来源于网络,如有侵权,请联系作者删除!