本文整理了Java中org.apache.kafka.clients.producer.Producer.initTransactions()
方法的一些代码示例,展示了Producer.initTransactions()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Producer.initTransactions()
方法的具体详情如下:
包路径:org.apache.kafka.clients.producer.Producer
类名称:Producer
方法名:initTransactions
[英]See KafkaProducer#initTransactions()
[中]参见 KafkaProducer#initTransactions()
代码示例来源:origin: openzipkin/brave
/**
 * Forwards the call unchanged to {@code delegate}.
 */
@Override
public void initTransactions() {
    delegate.initTransactions();
}
代码示例来源:origin: apache/nifi
/**
 * Begins a producer transaction when transactions are enabled.
 *
 * <p>Does nothing if {@code useTransactions} is false. Otherwise
 * {@code producer.initTransactions()} is invoked the first time through (tracked by
 * {@code transactionsInitialized}), after which a transaction is begun and
 * {@code activeTransaction} is set.
 */
void beginTransaction() {
    if (useTransactions) {
        // initTransactions() is only issued until the flag records a successful call.
        if (!transactionsInitialized) {
            producer.initTransactions();
            transactionsInitialized = true;
        }

        producer.beginTransaction();
        activeTransaction = true;
    }
}
代码示例来源:origin: apache/nifi
/**
 * Begins a producer transaction when transactions are enabled.
 *
 * <p>Does nothing if {@code useTransactions} is false. Otherwise
 * {@code producer.initTransactions()} is invoked the first time through (tracked by
 * {@code transactionsInitialized}), after which a transaction is begun and
 * {@code activeTransaction} is set.
 */
void beginTransaction() {
    if (useTransactions) {
        // initTransactions() is only issued until the flag records a successful call.
        if (!transactionsInitialized) {
            producer.initTransactions();
            transactionsInitialized = true;
        }

        producer.beginTransaction();
        activeTransaction = true;
    }
}
代码示例来源:origin: apache/nifi
/**
 * Begins a producer transaction when transactions are enabled.
 *
 * <p>Does nothing if {@code useTransactions} is false. Otherwise
 * {@code producer.initTransactions()} is invoked the first time through (tracked by
 * {@code transactionsInitialized}), after which a transaction is begun and
 * {@code activeTransaction} is set.
 */
void beginTransaction() {
    if (useTransactions) {
        // initTransactions() is only issued until the flag records a successful call.
        if (!transactionsInitialized) {
            producer.initTransactions();
            transactionsInitialized = true;
        }

        producer.beginTransaction();
        activeTransaction = true;
    }
}
代码示例来源:origin: alibaba/canal
producer.initTransactions();
} else {
producer2.initTransactions();
代码示例来源:origin: apache/kafka
/**
 * Expects {@code initTransactions()} to throw {@code TimeoutException} for a transactional
 * producer configured with a 5 ms max block time and backed by a {@code MockClient}.
 * Reaching the {@code fail(...)} call means no exception was raised.
 */
@Test(expected = TimeoutException.class)
public void testInitTransactionTimeout() {
    Map<String, Object> producerProps = new HashMap<>();
    producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "bad-transaction");
    producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5);
    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");

    Time mockTime = new MockTime(1);

    // Seed the metadata with a single-topic update before handing it to the mock client.
    MetadataResponse seedResponse = TestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
    Metadata clusterMetadata = new Metadata(0, Long.MAX_VALUE, true);
    clusterMetadata.update(seedResponse, mockTime.milliseconds());

    MockClient mockClient = new MockClient(mockTime, clusterMetadata);

    try (Producer<String, String> txProducer = new KafkaProducer<>(producerProps, new StringSerializer(),
            new StringSerializer(), clusterMetadata, mockClient, null, mockTime)) {
        txProducer.initTransactions();
        fail("initTransactions() should have raised TimeoutException");
    }
}
代码示例来源:origin: apache/kafka
/**
 * Checks that, once {@code initTransactions()} has timed out, a further transactional call
 * ({@code beginTransaction()}) raises {@code KafkaException}.
 *
 * <p>The producer is closed with a zero timeout on every exit path, including the case where
 * {@code initTransactions()} fails with something other than {@code TimeoutException}, so the
 * test never leaves a producer open behind it.
 */
@Test(expected = KafkaException.class)
public void testOnlyCanExecuteCloseAfterInitTransactionsTimeout() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "bad-transaction");
    configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5);
    configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");

    Time time = new MockTime();
    MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
    Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
    metadata.update(initialUpdateResponse, time.milliseconds());

    MockClient client = new MockClient(time, metadata);

    Producer<String, String> producer = new KafkaProducer<>(configs, new StringSerializer(), new StringSerializer(),
            metadata, client, null, time);
    try {
        try {
            producer.initTransactions();
        } catch (TimeoutException e) {
            // expected
        }
        // other transactional operations should not be allowed if we catch the error after initTransactions failed
        producer.beginTransaction();
    } finally {
        // Single cleanup point: runs whether initTransactions() or beginTransaction() threw.
        producer.close(Duration.ofMillis(0));
    }
}
代码示例来源:origin: spring-projects/spring-kafka
/**
 * Passes the call straight through to the {@code delegate} field.
 */
@Override
public void initTransactions() {
    delegate.initTransactions();
}
代码示例来源:origin: spring-projects/spring-kafka
/**
 * Creates a transactional {@code KafkaProducer}, initializes its transactions and wraps it in a
 * {@code CloseSafeProducer}.
 *
 * <p>The transactional id is {@code transactionIdPrefix + suffix}; it is written into a copy of
 * {@code configs} so the shared configuration map is not modified.
 *
 * <p>If {@code initTransactions()} fails, the freshly created producer is closed before the
 * original exception is rethrown, so a failed initialization does not leave an unreachable
 * producer open. A failure while closing is attached as a suppressed exception.
 *
 * @param suffix  appended to {@code transactionIdPrefix} to form the transactional id
 * @param remover passed through to the {@code CloseSafeProducer} constructor
 * @return the wrapped, transaction-initialized producer
 */
private CloseSafeProducer<K, V> doCreateTxProducer(String suffix, Consumer<CloseSafeProducer<K, V>> remover) {
    String transactionalId = this.transactionIdPrefix + suffix;
    Map<String, Object> newProducerConfigs = new HashMap<>(this.configs);
    newProducerConfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);

    Producer<K, V> newProducer = new KafkaProducer<K, V>(newProducerConfigs, this.keySerializer, this.valueSerializer);
    try {
        newProducer.initTransactions();
    } catch (RuntimeException initFailure) {
        // Nothing else holds a reference to newProducer yet, so this is the only chance to release it.
        try {
            newProducer.close();
        } catch (RuntimeException closeFailure) {
            initFailure.addSuppressed(closeFailure);
        }
        throw initFailure;
    }
    return new CloseSafeProducer<K, V>(newProducer, this.cache, remover, transactionalId);
}
代码示例来源:origin: spring-projects/spring-kafka
/**
 * Calls {@code initTransactions()} on {@code producer}, then returns a producer polled from
 * the cache if one is available; otherwise wraps {@code producer} in a new
 * {@code CloseSafeProducer} bound to that cache.
 */
@Override
protected Producer createTransactionalProducer() {
    producer.initTransactions();
    BlockingQueue<Producer> pool = getCache();
    Producer pooled = pool.poll();
    if (pooled != null) {
        return pooled;
    }
    return new CloseSafeProducer(producer, pool);
}
代码示例来源:origin: spring-projects/spring-kafka
/**
 * Calls {@code initTransactions()} on {@code producer}, then returns a producer polled from
 * the cache if one is available; otherwise wraps {@code producer} in a new
 * {@code CloseSafeProducer} bound to that cache.
 */
@Override
protected Producer createTransactionalProducer() {
    producer.initTransactions();
    BlockingQueue<Producer> pool = getCache();
    Producer pooled = pool.poll();
    if (pooled != null) {
        return pooled;
    }
    return new CloseSafeProducer(producer, pool);
}
代码示例来源:origin: io.opentracing.contrib/opentracing-kafka-client
/**
 * Forwards the call unchanged to {@code producer}.
 */
@Override
public void initTransactions() {
    producer.initTransactions();
}
代码示例来源:origin: com.github.combinedmq/combinedmq
/**
 * Forwards the call unchanged to {@code producer}.
 */
@Override
public void initTransactions() {
    producer.initTransactions();
}
代码示例来源:origin: spring-projects/spring-kafka
/**
 * Runs a {@code DeadLetterPublishingRecoverer} inside a transaction started by a
 * {@code KafkaTransactionManager} and verifies that the first producer handed out by the
 * factory begins, commits and is closed, that the second producer never begins a transaction,
 * and that the template's {@code executeInTransaction} is never invoked.
 */
@Test
public void testDeadLetterPublisherWhileTransactionActive() {
    @SuppressWarnings("unchecked")
    Producer<Object, Object> firstProducer = mock(Producer.class);
    @SuppressWarnings("unchecked")
    Producer<Object, Object> secondProducer = mock(Producer.class);
    firstProducer.initTransactions();

    // The factory hands out firstProducer on the first call and secondProducer afterwards.
    @SuppressWarnings("unchecked")
    ProducerFactory<Object, Object> producerFactory = mock(ProducerFactory.class);
    given(producerFactory.transactionCapable()).willReturn(true);
    given(producerFactory.createProducer()).willReturn(firstProducer).willReturn(secondProducer);

    KafkaTemplate<Object, Object> template = spy(new KafkaTemplate<>(producerFactory));
    template.setDefaultTopic(STRING_KEY_TOPIC);

    KafkaTransactionManager<Object, Object> txManager = new KafkaTransactionManager<>(producerFactory);
    new TransactionTemplate(txManager).execute(status -> {
        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
        recoverer.accept(
                new ConsumerRecord<>(STRING_KEY_TOPIC, 0, 0L, "key", "foo"),
                new RuntimeException("foo"));
        return null;
    });

    verify(firstProducer).beginTransaction();
    verify(firstProducer).commitTransaction();
    verify(firstProducer).close();
    verify(secondProducer, never()).beginTransaction();
    verify(template, never()).executeInTransaction(any());
}
代码示例来源:origin: spring-projects/spring-kafka
@SuppressWarnings("unchecked")
Producer<Object, Object> producer2 = mock(Producer.class);
producer1.initTransactions();
AtomicBoolean first = new AtomicBoolean(true);
代码示例来源:origin: spring-projects/spring-kafka
inOrder.verify(producer).initTransactions();
inOrder.verify(producer).beginTransaction();
inOrder.verify(producer).send(any(), any());
代码示例来源:origin: org.apache.nifi/nifi-kafka-2-0-processors
/**
 * Begins a producer transaction when transactions are enabled.
 *
 * <p>Does nothing if {@code useTransactions} is false. Otherwise
 * {@code producer.initTransactions()} is invoked the first time through (tracked by
 * {@code transactionsInitialized}), after which a transaction is begun and
 * {@code activeTransaction} is set.
 */
void beginTransaction() {
    if (useTransactions) {
        // initTransactions() is only issued until the flag records a successful call.
        if (!transactionsInitialized) {
            producer.initTransactions();
            transactionsInitialized = true;
        }

        producer.beginTransaction();
        activeTransaction = true;
    }
}
代码示例来源:origin: org.apache.nifi/nifi-kafka-1-0-processors
/**
 * Begins a producer transaction when transactions are enabled.
 *
 * <p>Does nothing if {@code useTransactions} is false. Otherwise
 * {@code producer.initTransactions()} is invoked the first time through (tracked by
 * {@code transactionsInitialized}), after which a transaction is begun and
 * {@code activeTransaction} is set.
 */
void beginTransaction() {
    if (useTransactions) {
        // initTransactions() is only issued until the flag records a successful call.
        if (!transactionsInitialized) {
            producer.initTransactions();
            transactionsInitialized = true;
        }

        producer.beginTransaction();
        activeTransaction = true;
    }
}
代码示例来源:origin: org.axonframework.extensions.kafka/axon-kafka
/**
 * Returns a producer polled from {@code cache} when one is available. Otherwise builds a new
 * one: the configuration is copied, a transactional id of
 * {@code transactionIdPrefix + transactionIdSuffix.getAndIncrement()} is set, the Kafka
 * producer is wrapped in a {@code CloseLazyProducer}, and {@code initTransactions()} is called
 * on it before it is returned.
 */
private Producer<K, V> createTransactionalProducer() {
    Producer<K, V> cached = this.cache.poll();
    if (cached != null) {
        return cached;
    }

    // Work on a copy so the shared configuration never carries a transactional id.
    Map<String, Object> txConfigs = new HashMap<>(this.configuration);
    txConfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
            this.transactionIdPrefix + this.transactionIdSuffix.getAndIncrement());

    Producer<K, V> created = new CloseLazyProducer<>(createKafkaProducer(txConfigs), cache, closeTimeout, timeUnit);
    created.initTransactions();
    return created;
}
代码示例来源:origin: org.axonframework/axon-kafka
/**
 * Returns a producer polled from {@code cache} when one is available. Otherwise builds a new
 * one: the configuration is copied, a transactional id of
 * {@code transactionIdPrefix + transactionIdSuffix.getAndIncrement()} is set, the Kafka
 * producer is wrapped in a {@code CloseLazyProducer}, and {@code initTransactions()} is called
 * on it before it is returned.
 */
private Producer<K, V> createTransactionalProducer() {
    Producer<K, V> cached = this.cache.poll();
    if (cached != null) {
        return cached;
    }

    // Work on a copy so the shared configuration never carries a transactional id.
    Map<String, Object> txConfigs = new HashMap<>(this.configs);
    txConfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
            this.transactionIdPrefix + this.transactionIdSuffix.getAndIncrement());

    Producer<K, V> created = new CloseLazyProducer<>(createKafkaProducer(txConfigs), cache, closeTimeout, unit);
    created.initTransactions();
    return created;
}
内容来源于网络,如有侵权,请联系作者删除!