Usage and code examples of org.apache.kafka.clients.producer.Producer.flush()

This article collects code examples of the org.apache.kafka.clients.producer.Producer.flush() method in Java and shows how Producer.flush() is used in practice. The examples come mainly from selected projects on platforms such as GitHub, Stack Overflow, and Maven, so they should serve as useful references. Details of the Producer.flush() method are as follows:
Package path: org.apache.kafka.clients.producer.Producer
Class name: Producer
Method name: flush

Producer.flush overview

See KafkaProducer#flush()
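
As a quick, self-contained illustration of those semantics: send() only hands a record to the producer's internal buffer, while flush() blocks until every record sent so far has completed (acknowledged or failed). The sketch below is not taken from any of the projects quoted later; the broker address, topic name, and serializer choices are assumptions for illustration.

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class FlushExample {
      public static void main(String[] args) {
        Properties props = new Properties();
        // Assumed local broker and serializers, for illustration only.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        Producer<String, String> producer = new KafkaProducer<>(props);
        try {
          // send() is asynchronous: the record is only buffered here.
          producer.send(new ProducerRecord<>("demo-topic", "key", "value"));
          // flush() blocks until all buffered records have completed.
          producer.flush();
        } finally {
          producer.close();
        }
      }
    }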

Code examples

Code example source: apache/incubator-druid

    @Override
    public void flush()
    {
      producer.flush();
    }

Code example source: apache/incubator-gobblin

    @Override
    public void flush()
        throws IOException {
      this.producer.flush();
    }

Code example source: openzipkin/brave

    @Override public void flush() {
      delegate.flush();
    }

Code example source: uber-common/jvm-profiler

    @Override
    public void close() {
      synchronized (this) {
        if (producer == null) {
          return;
        }
        producer.flush();
        producer.close();
        producer = null;
      }
    }

Code example source: uber-common/jvm-profiler

    @Override
    public void report(String profilerName, Map<String, Object> metrics) {
      ensureProducer();
      String topicName = getTopic(profilerName);
      String str = JsonUtils.serialize(metrics);
      byte[] message = str.getBytes(StandardCharsets.UTF_8);
      Future<RecordMetadata> future = producer.send(
          new ProducerRecord<String, byte[]>(topicName, message));
      if (syncMode) {
        producer.flush();
        try {
          future.get();
        } catch (InterruptedException | ExecutionException e) {
          throw new RuntimeException(e);
        }
      }
    }

Code example source: apache/nifi

    public PublishResult complete() {
      if (tracker == null) {
        if (messagesSent.get() == 0L) {
          return PublishResult.EMPTY;
        }
        throw new IllegalStateException("Cannot complete publishing to Kafka because Publisher Lease was already closed");
      }
      producer.flush();
      try {
        tracker.awaitCompletion(maxAckWaitMillis);
        return tracker.createPublishResult();
      } catch (final InterruptedException e) {
        logger.warn("Interrupted while waiting for an acknowledgement from Kafka; some FlowFiles may be transferred to 'failure' even though they were received by Kafka");
        Thread.currentThread().interrupt();
        return tracker.failOutstanding(e);
      } catch (final TimeoutException e) {
        logger.warn("Timed out while waiting for an acknowledgement from Kafka; some FlowFiles may be transferred to 'failure' even though they were received by Kafka");
        return tracker.failOutstanding(e);
      } finally {
        tracker = null;
      }
    }

Code example source: apache/nifi

    public PublishResult complete() {
      if (tracker == null) {
        throw new IllegalStateException("Cannot complete publishing to Kafka because Publisher Lease was already closed");
      }
      producer.flush();
      try {
        tracker.awaitCompletion(maxAckWaitMillis);
        return tracker.createPublishResult();
      } catch (final InterruptedException e) {
        logger.warn("Interrupted while waiting for an acknowledgement from Kafka; some FlowFiles may be transferred to 'failure' even though they were received by Kafka");
        Thread.currentThread().interrupt();
        return tracker.failOutstanding(e);
      } catch (final TimeoutException e) {
        logger.warn("Timed out while waiting for an acknowledgement from Kafka; some FlowFiles may be transferred to 'failure' even though they were received by Kafka");
        return tracker.failOutstanding(e);
      } finally {
        tracker = null;
      }
    }

Code example source: linkedin/cruise-control

    _producer.flush();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Stored {} partition metric samples and {} broker metric samples to Kafka",

Code example source: apache/nifi

    public PublishResult complete() {
      if (tracker == null) {
        if (messagesSent.get() == 0L) {
          return PublishResult.EMPTY;
        }
        rollback();
        throw new IllegalStateException("Cannot complete publishing to Kafka because Publisher Lease was already closed");
      }
      producer.flush();
      if (activeTransaction) {
        producer.commitTransaction();
        activeTransaction = false;
      }
      try {
        tracker.awaitCompletion(maxAckWaitMillis);
        return tracker.createPublishResult();
      } catch (final InterruptedException e) {
        logger.warn("Interrupted while waiting for an acknowledgement from Kafka; some FlowFiles may be transferred to 'failure' even though they were received by Kafka");
        Thread.currentThread().interrupt();
        return tracker.failOutstanding(e);
      } catch (final TimeoutException e) {
        logger.warn("Timed out while waiting for an acknowledgement from Kafka; some FlowFiles may be transferred to 'failure' even though they were received by Kafka");
        return tracker.failOutstanding(e);
      } finally {
        tracker = null;
      }
    }

Code example source: spring-projects/spring-kafka

    @Override
    public void flush() {
      this.delegate.flush();
    }

Code example source: apache/incubator-gobblin

    producer.flush();
    producer.flush();
    producer.flush();

Code example source: spring-projects/spring-kafka

    /**
     * {@inheritDoc}
     * <p><b>Note</b> It only makes sense to invoke this method if the
     * {@link ProducerFactory} serves up a singleton producer (such as the
     * {@link DefaultKafkaProducerFactory}).
     */
    @Override
    public void flush() {
      Producer<K, V> producer = getTheProducer();
      try {
        producer.flush();
      }
      finally {
        closeProducer(producer, inTransaction());
      }
    }

Code example source: confluentinc/kafka-streams-examples

    /**
     * @param topic          Kafka topic to write the data records to
     * @param records        Data records to write to Kafka
     * @param producerConfig Kafka producer configuration
     * @param <K>            Key type of the data records
     * @param <V>            Value type of the data records
     */
    public static <K, V> void produceKeyValuesSynchronously(
        String topic, Collection<KeyValue<K, V>> records, Properties producerConfig)
        throws ExecutionException, InterruptedException {
      Producer<K, V> producer = new KafkaProducer<>(producerConfig);
      for (KeyValue<K, V> record : records) {
        Future<RecordMetadata> f = producer.send(
            new ProducerRecord<>(topic, record.key, record.value));
        f.get();
      }
      producer.flush();
      producer.close();
    }

Code example source: org.apache.kafka/connect-runtime

    /**
     * Flush the underlying producer to ensure that all pending writes have been sent.
     */
    public void flush() {
      producer.flush();
    }

Code example source: org.apache.kafka/kafka-streams

    @Override
    public void flush() {
      log.debug("Flushing producer");
      producer.flush();
      checkForException();
    }

Code example source: org.jbpm.contrib/kafka-workitem

    @Override
    public void close() {
      if (producer != null) {
        producer.flush();
        producer.close();
      }
    }

Code example source: apache/incubator-rya

    @Override
    public void fromCollection(final Collection<VisibilityStatement> statements) throws RyaStreamsException {
      requireNonNull(statements);
      for (final VisibilityStatement statement : statements) {
        producer.send(new ProducerRecord<>(topic, statement));
      }
      producer.flush();
    }

Code example source: reactor/reactor-kafka

    /**
     * Tests invocation of methods on KafkaProducer using {@link KafkaSender#doOnProducer(java.util.function.Function)}
     */
    @Test
    public void producerMethods() {
      testProducerMethod(p -> assertEquals(0, p.metrics().size()));
      testProducerMethod(p -> assertEquals(2, p.partitionsFor(topic).size()));
      testProducerMethod(p -> p.flush());
    }
