本文整理了 Java 中 org.apache.kafka.clients.producer.Producer 接口的一些代码示例,展示了 Producer 接口的具体用法。这些代码示例主要来源于 GitHub、Stack Overflow、Maven 等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Producer 接口的具体详情如下:
包路径:org.apache.kafka.clients.producer.Producer
类名称:Producer
[英]The interface for the KafkaProducer
[中]KafkaProducer 的接口
代码示例来源:origin: QNJR-GROUP/EasyTransaction
/**
 * Hands the given record to {@code kafkaProducer.send(...)}.
 *
 * @param record the record (String key, byte[] value) to send
 * @return the {@link Future} returned by the producer's {@code send} call
 */
public Future<RecordMetadata> publishKafkaMessage(ProducerRecord<String,byte[]> record){
return kafkaProducer.send(record);
}
}
代码示例来源:origin: apache/storm
/** Cleanup hook: closes {@code producer}. */
@Override
public void cleanup() {
producer.close();
}
代码示例来源:origin: uber-common/jvm-profiler
/**
 * Flushes and closes the current producer, then clears the reference.
 *
 * <p>The whole sequence runs while holding this object's monitor. When no producer is set,
 * the call is a no-op.
 */
@Override
public void close() {
    synchronized (this) {
        if (producer != null) {
            producer.flush();
            producer.close();
            producer = null;
        }
    }
}
代码示例来源:origin: apache/nifi
/**
 * Starts a producer transaction when transactions are enabled.
 *
 * <p>On the first call {@code initTransactions()} is invoked and remembered via
 * {@code transactionsInitialized}; every call then begins a transaction and marks it active.
 * When {@code useTransactions} is false nothing happens.
 */
void beginTransaction() {
    if (useTransactions) {
        if (!transactionsInitialized) {
            producer.initTransactions();
            transactionsInitialized = true;
        }

        producer.beginTransaction();
        activeTransaction = true;
    }
}
代码示例来源:origin: spring-projects/spring-kafka
@SuppressWarnings("unchecked")
Producer<Object, Object> producer2 = mock(Producer.class);
producer1.initTransactions();
AtomicBoolean first = new AtomicBoolean(true);
inOrder.verify(producer1).beginTransaction();
inOrder.verify(producer2).beginTransaction();
inOrder.verify(producer2).commitTransaction();
inOrder.verify(producer2).close();
inOrder.verify(producer1).commitTransaction();
inOrder.verify(producer1).close();
代码示例来源:origin: apache/kafka
try {
Future<?> future = executor.submit(() -> {
producer.send(new ProducerRecord<>("topic", "key", "value"));
try {
producer.close();
fail("Close should block and throw.");
} catch (Exception e) {
代码示例来源:origin: uber-common/jvm-profiler
/**
 * Serializes the metrics to JSON and sends them as a UTF-8 encoded record to the topic
 * resolved for the given profiler.
 *
 * <p>When {@code syncMode} is set, the producer is flushed and the call waits for the send
 * to complete.
 *
 * @param profilerName name used to look up the target topic via {@code getTopic}
 * @param metrics metric values to serialize and publish
 * @throws RuntimeException if waiting for the send result is interrupted or the send fails;
 *     the original exception is kept as the cause
 */
@Override
public void report(String profilerName, Map<String, Object> metrics) {
    ensureProducer();

    String topicName = getTopic(profilerName);
    String str = JsonUtils.serialize(metrics);
    byte[] message = str.getBytes(StandardCharsets.UTF_8);

    Future<RecordMetadata> future = producer.send(
            new ProducerRecord<String, byte[]>(topicName, message));

    if (syncMode) {
        producer.flush();
        try {
            future.get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can still observe
            // the interruption after it has been wrapped in an unchecked exception.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }
}
代码示例来源:origin: confluentinc/kafka-streams-examples
/**
 * Writes the given records to a Kafka topic one at a time, waiting for each send to
 * complete before issuing the next.
 *
 * <p>The producer created here is always closed, including when a send fails or the
 * calling thread is interrupted while waiting.
 *
 * @param topic Kafka topic to write the data records to
 * @param records Data records to write to Kafka
 * @param producerConfig Kafka producer configuration
 * @param <K> Key type of the data records
 * @param <V> Value type of the data records
 * @throws ExecutionException if a send completes exceptionally
 * @throws InterruptedException if the thread is interrupted while waiting for a send
 */
public static <K, V> void produceKeyValuesSynchronously(
    String topic, Collection<KeyValue<K, V>> records, Properties producerConfig)
    throws ExecutionException, InterruptedException {
  // try-with-resources guarantees close() even when f.get() throws, so the producer
  // is not leaked on failure.
  try (Producer<K, V> producer = new KafkaProducer<>(producerConfig)) {
    for (KeyValue<K, V> record : records) {
      Future<RecordMetadata> f = producer.send(
          new ProducerRecord<>(topic, record.key, record.value));
      f.get();
    }
    producer.flush();
  }
}
代码示例来源:origin: apache/kafka
/**
 * Builds a transactional producer against a mock client with a 5 ms max block time, tolerates
 * a {@code TimeoutException} from {@code initTransactions()}, and then expects
 * {@code beginTransaction()} to raise a {@code KafkaException}. The producer is closed with a
 * zero timeout regardless of the outcome.
 */
@Test(expected = KafkaException.class)
public void testOnlyCanExecuteCloseAfterInitTransactionsTimeout() {
    Map<String, Object> producerProps = new HashMap<>();
    producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "bad-transaction");
    producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5);
    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");

    Time mockTime = new MockTime();
    MetadataResponse firstUpdate = TestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
    Metadata clusterMetadata = new Metadata(0, Long.MAX_VALUE, true);
    clusterMetadata.update(firstUpdate, mockTime.milliseconds());

    MockClient mockClient = new MockClient(mockTime, clusterMetadata);

    Producer<String, String> txProducer = new KafkaProducer<>(
            producerProps,
            new StringSerializer(),
            new StringSerializer(),
            clusterMetadata,
            mockClient,
            null,
            mockTime);

    try {
        txProducer.initTransactions();
    } catch (TimeoutException e) {
        // expected
    }

    // other transactional operations should not be allowed if we catch the error after initTransactions failed
    try {
        txProducer.beginTransaction();
    } finally {
        txProducer.close(Duration.ofMillis(0));
    }
}
代码示例来源:origin: apache/incubator-druid
/** Delegates to {@code producer.flush()}. */
@Override
public void flush()
{
producer.flush();
}
代码示例来源:origin: org.springframework.cloud/spring-cloud-stream-binder-kafka11
/**
 * Creates a short-lived producer from {@code producerFB}, looks up the partitions of the
 * destination, and releases both the producer and the factory afterwards.
 *
 * <p>The producer is closed first and the factory destroyed second, and both steps run even
 * when the partition lookup (or producer creation) throws.
 *
 * @return partition metadata for {@code destination.getName()}
 * @throws Exception if the lookup fails or the factory cannot be destroyed
 */
@Override
public Collection<PartitionInfo> call() throws Exception {
    try (Producer<byte[], byte[]> producer = producerFB.createProducer()) {
        return producer.partitionsFor(destination.getName());
    } finally {
        // Runs after the producer has been closed by try-with-resources, matching the
        // original close-then-destroy order while also covering the failure path.
        ((DisposableBean) producerFB).destroy();
    }
}
代码示例来源:origin: apache/nifi
/**
 * Finishes the current publish cycle and reports its outcome.
 *
 * <p>If the tracker has already been cleared: returns {@code PublishResult.EMPTY} when no
 * messages were sent, otherwise rolls back and throws. With a tracker present, the producer
 * is flushed, an active transaction is committed, and the method waits up to
 * {@code maxAckWaitMillis} for completion. An interrupt or timeout while waiting marks the
 * outstanding work as failed (an interrupt also re-sets the thread's interrupt flag). The
 * tracker is cleared once the wait has been attempted.
 *
 * @return the publish result built by the tracker, or {@code PublishResult.EMPTY}
 * @throws IllegalStateException if the tracker is gone but messages had been sent
 */
public PublishResult complete() {
    if (tracker == null) {
        if (messagesSent.get() != 0L) {
            rollback();
            throw new IllegalStateException("Cannot complete publishing to Kafka because Publisher Lease was already closed");
        }
        return PublishResult.EMPTY;
    }

    producer.flush();

    if (activeTransaction) {
        producer.commitTransaction();
        activeTransaction = false;
    }

    try {
        tracker.awaitCompletion(maxAckWaitMillis);
        return tracker.createPublishResult();
    } catch (final InterruptedException interrupted) {
        logger.warn("Interrupted while waiting for an acknowledgement from Kafka; some FlowFiles may be transferred to 'failure' even though they were received by Kafka");
        Thread.currentThread().interrupt();
        return tracker.failOutstanding(interrupted);
    } catch (final TimeoutException timedOut) {
        logger.warn("Timed out while waiting for an acknowledgement from Kafka; some FlowFiles may be transferred to 'failure' even though they were received by Kafka");
        return tracker.failOutstanding(timedOut);
    } finally {
        tracker = null;
    }
}
代码示例来源:origin: apache/kylin
/**
 * Calls {@code producer.partitionsFor(topic)} and discards the result.
 *
 * @param topic topic name passed to the producer
 */
public void tryFetchMetadataFor(String topic) {
producer.partitionsFor(topic);
}
代码示例来源:origin: reactor/reactor-kafka
/**
 * Tests invocation of methods on KafkaProducer using {@link KafkaSender#doOnProducer(java.util.function.Function)}
 */
@Test
public void producerMethods() {
// Each call hands testProducerMethod a lambda whose argument p is used as the producer;
// NOTE(review): testProducerMethod is defined elsewhere — confirm how it invokes the lambda.
// Expects the producer to report no metrics.
testProducerMethod(p -> assertEquals(0, p.metrics().size()));
// Expects two partitions for the test topic.
testProducerMethod(p -> assertEquals(2, p.partitionsFor(topic).size()));
// Only checks that flush() can be invoked.
testProducerMethod(p -> p.flush());
}
代码示例来源:origin: openzipkin/brave
/** Forwards the call unchanged to {@code delegate.initTransactions()}. */
@Override public void initTransactions() {
delegate.initTransactions();
}
代码示例来源:origin: spring-projects/spring-kafka
/**
 * Runs a DeadLetterPublishingRecoverer inside a TransactionTemplate callback and verifies
 * that only the first mocked producer takes part in a transaction.
 */
@Test
public void testDeadLetterPublisherWhileTransactionActive() {
@SuppressWarnings("unchecked")
Producer<Object, Object> producer1 = mock(Producer.class);
@SuppressWarnings("unchecked")
Producer<Object, Object> producer2 = mock(Producer.class);
producer1.initTransactions();
@SuppressWarnings("unchecked")
ProducerFactory<Object, Object> pf = mock(ProducerFactory.class);
// The factory reports itself as transaction capable and hands out producer1 on the first
// createProducer() call and producer2 on the second.
given(pf.transactionCapable()).willReturn(true);
given(pf.createProducer()).willReturn(producer1).willReturn(producer2);
// The template is wrapped in a spy so calls on it can be verified below.
KafkaTemplate<Object, Object> template = spy(new KafkaTemplate<>(pf));
template.setDefaultTopic(STRING_KEY_TOPIC);
KafkaTransactionManager<Object, Object> tm = new KafkaTransactionManager<>(pf);
// Invoke the recoverer from within the transaction template's callback.
new TransactionTemplate(tm).execute(s -> {
new DeadLetterPublishingRecoverer(template).accept(
new ConsumerRecord<>(STRING_KEY_TOPIC, 0, 0L, "key", "foo"),
new RuntimeException("foo"));
return null;
});
// producer1 must have begun, committed and been closed.
verify(producer1).beginTransaction();
verify(producer1).commitTransaction();
verify(producer1).close();
// producer2 must not have begun a transaction, and the template must not have been
// asked to execute in a transaction of its own.
verify(producer2, never()).beginTransaction();
verify(template, never()).executeInTransaction(any());
}
代码示例来源:origin: apache/kafka
producer.send(new ProducerRecord<>(topicName, "key", "value"));
fail();
} catch (Exception e) {
producer.close(Duration.ofMillis(0));
TestUtils.waitForCondition(() -> sendException.get() != null, "No producer exception within timeout");
assertEquals(KafkaException.class, sendException.get().getClass());
代码示例来源:origin: linkedin/cruise-control
final AtomicInteger metricSampleCount = new AtomicInteger(0);
for (PartitionMetricSample sample : samples.partitionMetricSamples()) {
_producer.send(new ProducerRecord<>(_partitionMetricSampleStoreTopic, null, sample.sampleTime(), null, sample.toBytes()),
new Callback() {
@Override
_producer.send(new ProducerRecord<>(_brokerMetricSampleStoreTopic, sample.toBytes()),
new Callback() {
@Override
_producer.flush();
if (LOG.isDebugEnabled()) {
LOG.debug("Stored {} partition metric samples and {} broker metric samples to Kafka",
代码示例来源:origin: apache/incubator-gobblin
/**
 * Delegates to {@code this.producer.flush()}.
 *
 * @throws IOException declared by the signature; nothing in this body visibly throws it
 */
@Override
public void flush()
throws IOException {
this.producer.flush();
}
代码示例来源:origin: apache/nifi
/**
 * Finishes the current publish cycle and reports its outcome.
 *
 * <p>If the tracker has already been cleared: returns {@code PublishResult.EMPTY} when no
 * messages were sent, otherwise rolls back and throws. With a tracker present, the producer
 * is flushed, an active transaction is committed, and the method waits up to
 * {@code maxAckWaitMillis} for completion. An interrupt or timeout while waiting marks the
 * outstanding work as failed (an interrupt also re-sets the thread's interrupt flag). The
 * tracker is cleared once the wait has been attempted.
 *
 * @return the publish result built by the tracker, or {@code PublishResult.EMPTY}
 * @throws IllegalStateException if the tracker is gone but messages had been sent
 */
public PublishResult complete() {
    if (tracker == null) {
        if (messagesSent.get() != 0L) {
            rollback();
            throw new IllegalStateException("Cannot complete publishing to Kafka because Publisher Lease was already closed");
        }
        return PublishResult.EMPTY;
    }

    producer.flush();

    if (activeTransaction) {
        producer.commitTransaction();
        activeTransaction = false;
    }

    try {
        tracker.awaitCompletion(maxAckWaitMillis);
        return tracker.createPublishResult();
    } catch (final InterruptedException interrupted) {
        logger.warn("Interrupted while waiting for an acknowledgement from Kafka; some FlowFiles may be transferred to 'failure' even though they were received by Kafka");
        Thread.currentThread().interrupt();
        return tracker.failOutstanding(interrupted);
    } catch (final TimeoutException timedOut) {
        logger.warn("Timed out while waiting for an acknowledgement from Kafka; some FlowFiles may be transferred to 'failure' even though they were received by Kafka");
        return tracker.failOutstanding(timedOut);
    } finally {
        tracker = null;
    }
}
内容来源于网络,如有侵权,请联系作者删除!