本文整理了Java中kafka.javaapi.producer.Producer.close()
方法的一些代码示例,展示了Producer.close()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Producer.close()
方法的具体详情如下:
包路径:kafka.javaapi.producer.Producer
类名称:Producer
方法名:close
暂无
代码示例来源:origin: apache/incubator-pinot
public void shutdown() {
keepIndexing = false;
avroDataStream = null;
producer.close();
producer = null;
service.shutdown();
}
代码示例来源:origin: org.apache.kafka/kafka_2.9.2
public void awaitShutdown() {
try {
shutdownComplete.await();
producer.close();
logger.info("Producer thread " + threadName + " shutdown complete");
} catch(InterruptedException ie) {
logger.warn("Interrupt during shutdown of ProducerThread", ie);
}
}
}
代码示例来源:origin: linkedin/camus
} finally {
if (producer != null) {
producer.close();
代码示例来源:origin: linkedin/camus
private static List<Message> writeKafka(String topic, int numOfMessages) {
List<Message> messages = new ArrayList<Message>();
List<KeyedMessage<String, String>> kafkaMessages = new ArrayList<KeyedMessage<String, String>>();
for (int i = 0; i < numOfMessages; i++) {
Message msg = new Message(RANDOM.nextInt());
messages.add(msg);
kafkaMessages.add(new KeyedMessage<String, String>(topic, Integer.toString(i), gson.toJson(msg)));
}
Properties producerProps = cluster.getProps();
producerProps.setProperty("serializer.class", StringEncoder.class.getName());
producerProps.setProperty("key.serializer.class", StringEncoder.class.getName());
Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(producerProps));
try {
producer.send(kafkaMessages);
} finally {
producer.close();
}
return messages;
}
代码示例来源:origin: linkedin/camus
private Producer mockProducerSendThrowsException() {
Producer mockProducer = EasyMock.createMock(Producer.class);
mockProducer.send((KeyedMessage) EasyMock.anyObject());
EasyMock.expectLastCall().andThrow(new RuntimeException("dummyException")).anyTimes();
mockProducer.close();
EasyMock.expectLastCall().anyTimes();
EasyMock.replay(mockProducer);
return mockProducer;
}
代码示例来源:origin: HuaweiBigData/StreamCQL
/**
* {@inheritDoc}
*/
@Override
public void destroy()
throws StreamingException
{
if (producer != null)
{
producer.close();
}
}
代码示例来源:origin: gwenshap/kafka-examples
@Override
public void close() {
producer.close();
}
}
代码示例来源:origin: apache/eagle
@Override
public void cleanup() {
if (this.producer != null) {
this.producer.close();
}
}
代码示例来源:origin: com.github.hackerwin7/jlib-utils
/**
* close producer client
*/
public void close() {
if(producer != null)
producer.close();
}
代码示例来源:origin: org.apache.apex/malhar-contrib
/**
* Implement Component Interface.
*/
@Override
public void teardown()
{
producer.close();
}
代码示例来源:origin: linkedin/camus
private Producer mockProducerThirdSendSucceed() {
Producer mockProducer = EasyMock.createMock(Producer.class);
mockProducer.send((KeyedMessage) EasyMock.anyObject());
EasyMock.expectLastCall().andThrow(new RuntimeException("dummyException")).times(2);
mockProducer.send((KeyedMessage) EasyMock.anyObject());
EasyMock.expectLastCall().times(1);
mockProducer.close();
EasyMock.expectLastCall().anyTimes();
EasyMock.replay(mockProducer);
return mockProducer;
}
代码示例来源:origin: apache/eagle
@Override
public void close() throws IOException {
if (this.producer != null) {
LOGGER.info("Closing kafka producer for stream {}", this.streamId);
this.producer.close();
}
}
}
代码示例来源:origin: org.apache.apex/malhar-contrib
public void stopServer() throws IOException {
serverSocket.close();
connectionSocket.close();
producer.close();
}
}
代码示例来源:origin: apache/apex-malhar
public void stopServer() throws IOException
{
serverSocket.close();
connectionSocket.close();
producer.close();
}
}
代码示例来源:origin: locationtech/geowave
public synchronized void close() {
for (final Producer<String, T> producer : cachedProducers.values()) {
try {
producer.close();
} catch (final Exception e) {
LOGGER.warn("Unable to close kafka producer", e);
}
}
cachedProducers.clear();
}
}
代码示例来源:origin: com.ebay.jetstream/jetstream-messaging
private void closeProducers(List<Producer<byte[], byte[]>> producers) {
Iterator<Producer<byte[], byte[]>> it = producers.iterator();
while (it.hasNext()) {
Producer<byte[], byte[]> producer = it.next();
if (producer != null) {
producer.close();
producer = null;
}
it.remove();
}
}
代码示例来源:origin: org.apache.twill/twill-core
@Override
public void run() {
// Call from cancel() through executor only.
cancelChangeListener.cancel();
Producer<Integer, ByteBuffer> kafkaProducer = producer.get();
kafkaProducer.close();
executor.shutdownNow();
}
}
代码示例来源:origin: thilinamb/flume-ng-kafka-sink
@Override
public synchronized void stop() {
producer.close();
super.stop();
}
代码示例来源:origin: com.netflix.suro/suro-kafka
@Override
protected void innerClose() {
super.innerClose();
producer.close();
}
代码示例来源:origin: buildlackey/cep
public void shutdown() {
producer.close();
try { // Give producer some time...
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
kafkaServer.shutdown();
kafkaServer.awaitShutdown();
}
内容来源于网络,如有侵权,请联系作者删除!