本文整理了Java中org.apache.kafka.clients.producer.Producer.flush()
方法的一些代码示例,展示了Producer.flush()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Producer.flush()
方法的具体详情如下:
包路径:org.apache.kafka.clients.producer.Producer
类名称:Producer
方法名:flush
[英]See KafkaProducer#flush()
[中]参见《卡夫卡制作人》#flush()
代码示例来源:origin: apache/incubator-druid
@Override
public void flush()
{
producer.flush();
}
代码示例来源:origin: apache/incubator-gobblin
@Override
public void flush()
throws IOException {
this.producer.flush();
}
代码示例来源:origin: openzipkin/brave
@Override public void flush() {
delegate.flush();
}
代码示例来源:origin: uber-common/jvm-profiler
@Override
public void close() {
synchronized (this) {
if (producer == null) {
return;
}
producer.flush();
producer.close();
producer = null;
}
}
代码示例来源:origin: uber-common/jvm-profiler
@Override
public void report(String profilerName, Map<String, Object> metrics) {
ensureProducer();
String topicName = getTopic(profilerName);
String str = JsonUtils.serialize(metrics);
byte[] message = str.getBytes(StandardCharsets.UTF_8);
Future<RecordMetadata> future = producer.send(
new ProducerRecord<String, byte[]>(topicName, message));
if (syncMode) {
producer.flush();
try {
future.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
}
代码示例来源:origin: apache/nifi
public PublishResult complete() {
if (tracker == null) {
if (messagesSent.get() == 0L) {
return PublishResult.EMPTY;
}
throw new IllegalStateException("Cannot complete publishing to Kafka because Publisher Lease was already closed");
}
producer.flush();
try {
tracker.awaitCompletion(maxAckWaitMillis);
return tracker.createPublishResult();
} catch (final InterruptedException e) {
logger.warn("Interrupted while waiting for an acknowledgement from Kafka; some FlowFiles may be transferred to 'failure' even though they were received by Kafka");
Thread.currentThread().interrupt();
return tracker.failOutstanding(e);
} catch (final TimeoutException e) {
logger.warn("Timed out while waiting for an acknowledgement from Kafka; some FlowFiles may be transferred to 'failure' even though they were received by Kafka");
return tracker.failOutstanding(e);
} finally {
tracker = null;
}
}
代码示例来源:origin: apache/nifi
public PublishResult complete() {
if (tracker == null) {
throw new IllegalStateException("Cannot complete publishing to Kafka because Publisher Lease was already closed");
}
producer.flush();
try {
tracker.awaitCompletion(maxAckWaitMillis);
return tracker.createPublishResult();
} catch (final InterruptedException e) {
logger.warn("Interrupted while waiting for an acknowledgement from Kafka; some FlowFiles may be transferred to 'failure' even though they were received by Kafka");
Thread.currentThread().interrupt();
return tracker.failOutstanding(e);
} catch (final TimeoutException e) {
logger.warn("Timed out while waiting for an acknowledgement from Kafka; some FlowFiles may be transferred to 'failure' even though they were received by Kafka");
return tracker.failOutstanding(e);
} finally {
tracker = null;
}
}
代码示例来源:origin: linkedin/cruise-control
_producer.flush();
if (LOG.isDebugEnabled()) {
LOG.debug("Stored {} partition metric samples and {} broker metric samples to Kafka",
代码示例来源:origin: apache/nifi
public PublishResult complete() {
if (tracker == null) {
if (messagesSent.get() == 0L) {
return PublishResult.EMPTY;
}
rollback();
throw new IllegalStateException("Cannot complete publishing to Kafka because Publisher Lease was already closed");
}
producer.flush();
if (activeTransaction) {
producer.commitTransaction();
activeTransaction = false;
}
try {
tracker.awaitCompletion(maxAckWaitMillis);
return tracker.createPublishResult();
} catch (final InterruptedException e) {
logger.warn("Interrupted while waiting for an acknowledgement from Kafka; some FlowFiles may be transferred to 'failure' even though they were received by Kafka");
Thread.currentThread().interrupt();
return tracker.failOutstanding(e);
} catch (final TimeoutException e) {
logger.warn("Timed out while waiting for an acknowledgement from Kafka; some FlowFiles may be transferred to 'failure' even though they were received by Kafka");
return tracker.failOutstanding(e);
} finally {
tracker = null;
}
}
代码示例来源:origin: apache/nifi
public PublishResult complete() {
if (tracker == null) {
if (messagesSent.get() == 0L) {
return PublishResult.EMPTY;
}
rollback();
throw new IllegalStateException("Cannot complete publishing to Kafka because Publisher Lease was already closed");
}
producer.flush();
if (activeTransaction) {
producer.commitTransaction();
activeTransaction = false;
}
try {
tracker.awaitCompletion(maxAckWaitMillis);
return tracker.createPublishResult();
} catch (final InterruptedException e) {
logger.warn("Interrupted while waiting for an acknowledgement from Kafka; some FlowFiles may be transferred to 'failure' even though they were received by Kafka");
Thread.currentThread().interrupt();
return tracker.failOutstanding(e);
} catch (final TimeoutException e) {
logger.warn("Timed out while waiting for an acknowledgement from Kafka; some FlowFiles may be transferred to 'failure' even though they were received by Kafka");
return tracker.failOutstanding(e);
} finally {
tracker = null;
}
}
代码示例来源:origin: apache/nifi
public PublishResult complete() {
if (tracker == null) {
if (messagesSent.get() == 0L) {
return PublishResult.EMPTY;
}
rollback();
throw new IllegalStateException("Cannot complete publishing to Kafka because Publisher Lease was already closed");
}
producer.flush();
if (activeTransaction) {
producer.commitTransaction();
activeTransaction = false;
}
try {
tracker.awaitCompletion(maxAckWaitMillis);
return tracker.createPublishResult();
} catch (final InterruptedException e) {
logger.warn("Interrupted while waiting for an acknowledgement from Kafka; some FlowFiles may be transferred to 'failure' even though they were received by Kafka");
Thread.currentThread().interrupt();
return tracker.failOutstanding(e);
} catch (final TimeoutException e) {
logger.warn("Timed out while waiting for an acknowledgement from Kafka; some FlowFiles may be transferred to 'failure' even though they were received by Kafka");
return tracker.failOutstanding(e);
} finally {
tracker = null;
}
}
代码示例来源:origin: spring-projects/spring-kafka
@Override
public void flush() {
this.delegate.flush();
}
代码示例来源:origin: apache/incubator-gobblin
producer.flush();
producer.flush();
producer.flush();
代码示例来源:origin: spring-projects/spring-kafka
/**
* {@inheritDoc}
* <p><b>Note</b> It only makes sense to invoke this method if the
* {@link ProducerFactory} serves up a singleton producer (such as the
* {@link DefaultKafkaProducerFactory}).
*/
@Override
public void flush() {
Producer<K, V> producer = getTheProducer();
try {
producer.flush();
}
finally {
closeProducer(producer, inTransaction());
}
}
代码示例来源:origin: confluentinc/kafka-streams-examples
/**
* @param topic Kafka topic to write the data records to
* @param records Data records to write to Kafka
* @param producerConfig Kafka producer configuration
* @param <K> Key type of the data records
* @param <V> Value type of the data records
*/
public static <K, V> void produceKeyValuesSynchronously(
String topic, Collection<KeyValue<K, V>> records, Properties producerConfig)
throws ExecutionException, InterruptedException {
Producer<K, V> producer = new KafkaProducer<>(producerConfig);
for (KeyValue<K, V> record : records) {
Future<RecordMetadata> f = producer.send(
new ProducerRecord<>(topic, record.key, record.value));
f.get();
}
producer.flush();
producer.close();
}
代码示例来源:origin: org.apache.kafka/connect-runtime
/**
* Flush the underlying producer to ensure that all pending writes have been sent.
*/
public void flush() {
producer.flush();
}
代码示例来源:origin: org.apache.kafka/kafka-streams
@Override
public void flush() {
log.debug("Flushing producer");
producer.flush();
checkForException();
}
代码示例来源:origin: org.jbpm.contrib/kafka-workitem
@Override
public void close() {
if (producer != null) {
producer.flush();
producer.close();
}
}
}
代码示例来源:origin: apache/incubator-rya
@Override
public void fromCollection(final Collection<VisibilityStatement> statements) throws RyaStreamsException {
requireNonNull(statements);
for(final VisibilityStatement statement : statements) {
producer.send(new ProducerRecord<>(topic, statement));
}
producer.flush();
}
}
代码示例来源:origin: reactor/reactor-kafka
/**
* Tests invocation of methods on KafkaProducer using {@link KafkaSender#doOnProducer(java.util.function.Function)}
*/
@Test
public void producerMethods() {
testProducerMethod(p -> assertEquals(0, p.metrics().size()));
testProducerMethod(p -> assertEquals(2, p.partitionsFor(topic).size()));
testProducerMethod(p -> p.flush());
}
内容来源于网络,如有侵权,请联系作者删除!