kafka.javaapi.producer.Producer.close()方法的使用及代码示例

x33g5p2x  于2022-01-26 转载在 其他  
字(4.9k)|赞(0)|评价(0)|浏览(199)

本文整理了Java中kafka.javaapi.producer.Producer.close()方法的一些代码示例,展示了Producer.close()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Producer.close()方法的具体详情如下:
包路径:kafka.javaapi.producer.Producer
类名称:Producer
方法名:close

Producer.close介绍

暂无

代码示例

代码示例来源:origin: apache/incubator-pinot

  1. public void shutdown() {
  2. keepIndexing = false;
  3. avroDataStream = null;
  4. producer.close();
  5. producer = null;
  6. service.shutdown();
  7. }

代码示例来源:origin: org.apache.kafka/kafka_2.9.2

  1. public void awaitShutdown() {
  2. try {
  3. shutdownComplete.await();
  4. producer.close();
  5. logger.info("Producer thread " + threadName + " shutdown complete");
  6. } catch(InterruptedException ie) {
  7. logger.warn("Interrupt during shutdown of ProducerThread", ie);
  8. }
  9. }
  10. }

代码示例来源:origin: linkedin/camus

  1. } finally {
  2. if (producer != null) {
  3. producer.close();

代码示例来源:origin: linkedin/camus

  1. private static List<Message> writeKafka(String topic, int numOfMessages) {
  2. List<Message> messages = new ArrayList<Message>();
  3. List<KeyedMessage<String, String>> kafkaMessages = new ArrayList<KeyedMessage<String, String>>();
  4. for (int i = 0; i < numOfMessages; i++) {
  5. Message msg = new Message(RANDOM.nextInt());
  6. messages.add(msg);
  7. kafkaMessages.add(new KeyedMessage<String, String>(topic, Integer.toString(i), gson.toJson(msg)));
  8. }
  9. Properties producerProps = cluster.getProps();
  10. producerProps.setProperty("serializer.class", StringEncoder.class.getName());
  11. producerProps.setProperty("key.serializer.class", StringEncoder.class.getName());
  12. Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(producerProps));
  13. try {
  14. producer.send(kafkaMessages);
  15. } finally {
  16. producer.close();
  17. }
  18. return messages;
  19. }

代码示例来源:origin: linkedin/camus

  1. private Producer mockProducerSendThrowsException() {
  2. Producer mockProducer = EasyMock.createMock(Producer.class);
  3. mockProducer.send((KeyedMessage) EasyMock.anyObject());
  4. EasyMock.expectLastCall().andThrow(new RuntimeException("dummyException")).anyTimes();
  5. mockProducer.close();
  6. EasyMock.expectLastCall().anyTimes();
  7. EasyMock.replay(mockProducer);
  8. return mockProducer;
  9. }

代码示例来源:origin: HuaweiBigData/StreamCQL

  1. /**
  2. * {@inheritDoc}
  3. */
  4. @Override
  5. public void destroy()
  6. throws StreamingException
  7. {
  8. if (producer != null)
  9. {
  10. producer.close();
  11. }
  12. }

代码示例来源:origin: gwenshap/kafka-examples

  1. @Override
  2. public void close() {
  3. producer.close();
  4. }
  5. }

代码示例来源:origin: apache/eagle

  1. @Override
  2. public void cleanup() {
  3. if (this.producer != null) {
  4. this.producer.close();
  5. }
  6. }

代码示例来源:origin: com.github.hackerwin7/jlib-utils

  1. /**
  2. * close producer client
  3. */
  4. public void close() {
  5. if(producer != null)
  6. producer.close();
  7. }

代码示例来源:origin: org.apache.apex/malhar-contrib

  1. /**
  2. * Implement Component Interface.
  3. */
  4. @Override
  5. public void teardown()
  6. {
  7. producer.close();
  8. }

代码示例来源:origin: linkedin/camus

  1. private Producer mockProducerThirdSendSucceed() {
  2. Producer mockProducer = EasyMock.createMock(Producer.class);
  3. mockProducer.send((KeyedMessage) EasyMock.anyObject());
  4. EasyMock.expectLastCall().andThrow(new RuntimeException("dummyException")).times(2);
  5. mockProducer.send((KeyedMessage) EasyMock.anyObject());
  6. EasyMock.expectLastCall().times(1);
  7. mockProducer.close();
  8. EasyMock.expectLastCall().anyTimes();
  9. EasyMock.replay(mockProducer);
  10. return mockProducer;
  11. }

代码示例来源:origin: apache/eagle

  1. @Override
  2. public void close() throws IOException {
  3. if (this.producer != null) {
  4. LOGGER.info("Closing kafka producer for stream {}", this.streamId);
  5. this.producer.close();
  6. }
  7. }
  8. }

代码示例来源:origin: org.apache.apex/malhar-contrib

  1. public void stopServer() throws IOException {
  2. serverSocket.close();
  3. connectionSocket.close();
  4. producer.close();
  5. }
  6. }

代码示例来源:origin: apache/apex-malhar

  1. public void stopServer() throws IOException
  2. {
  3. serverSocket.close();
  4. connectionSocket.close();
  5. producer.close();
  6. }
  7. }

代码示例来源:origin: locationtech/geowave

  1. public synchronized void close() {
  2. for (final Producer<String, T> producer : cachedProducers.values()) {
  3. try {
  4. producer.close();
  5. } catch (final Exception e) {
  6. LOGGER.warn("Unable to close kafka producer", e);
  7. }
  8. }
  9. cachedProducers.clear();
  10. }
  11. }

代码示例来源:origin: com.ebay.jetstream/jetstream-messaging

  1. private void closeProducers(List<Producer<byte[], byte[]>> producers) {
  2. Iterator<Producer<byte[], byte[]>> it = producers.iterator();
  3. while (it.hasNext()) {
  4. Producer<byte[], byte[]> producer = it.next();
  5. if (producer != null) {
  6. producer.close();
  7. producer = null;
  8. }
  9. it.remove();
  10. }
  11. }

代码示例来源:origin: org.apache.twill/twill-core

  1. @Override
  2. public void run() {
  3. // Call from cancel() through executor only.
  4. cancelChangeListener.cancel();
  5. Producer<Integer, ByteBuffer> kafkaProducer = producer.get();
  6. kafkaProducer.close();
  7. executor.shutdownNow();
  8. }
  9. }

代码示例来源:origin: thilinamb/flume-ng-kafka-sink

  1. @Override
  2. public synchronized void stop() {
  3. producer.close();
  4. super.stop();
  5. }

代码示例来源:origin: com.netflix.suro/suro-kafka

  1. @Override
  2. protected void innerClose() {
  3. super.innerClose();
  4. producer.close();
  5. }

代码示例来源:origin: buildlackey/cep

  1. public void shutdown() {
  2. producer.close();
  3. try { // Give producer some time...
  4. Thread.sleep(1000);
  5. } catch (InterruptedException e) {
  6. e.printStackTrace();
  7. }
  8. kafkaServer.shutdown();
  9. kafkaServer.awaitShutdown();
  10. }

相关文章