org.apache.kafka.clients.producer.Producer.initTransactions()方法的使用及代码示例

x33g5p2x  于2022-01-26 转载在 其他  
字(7.8k)|赞(0)|评价(0)|浏览(144)

本文整理了Java中org.apache.kafka.clients.producer.Producer.initTransactions()方法的一些代码示例,展示了Producer.initTransactions()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Producer.initTransactions()方法的具体详情如下:
包路径:org.apache.kafka.clients.producer.Producer
类名称:Producer
方法名:initTransactions

Producer.initTransactions介绍

[英]See KafkaProducer#initTransactions()
[中]参见《卡夫卡制作人》#initTransactions()

代码示例

代码示例来源:origin: openzipkin/brave

@Override public void initTransactions() {
 delegate.initTransactions();
}

代码示例来源:origin: apache/nifi

void beginTransaction() {
  if (!useTransactions) {
    return;
  }
  if (!transactionsInitialized) {
    producer.initTransactions();
    transactionsInitialized = true;
  }
  producer.beginTransaction();
  activeTransaction = true;
}

代码示例来源:origin: apache/nifi

void beginTransaction() {
  if (!useTransactions) {
    return;
  }
  if (!transactionsInitialized) {
    producer.initTransactions();
    transactionsInitialized = true;
  }
  producer.beginTransaction();
  activeTransaction = true;
}

代码示例来源:origin: apache/nifi

void beginTransaction() {
  if (!useTransactions) {
    return;
  }
  if (!transactionsInitialized) {
    producer.initTransactions();
    transactionsInitialized = true;
  }
  producer.beginTransaction();
  activeTransaction = true;
}

代码示例来源:origin: alibaba/canal

producer.initTransactions();
} else {
  producer2.initTransactions();

代码示例来源:origin: apache/kafka

@Test(expected = TimeoutException.class)
public void testInitTransactionTimeout() {
  Map<String, Object> configs = new HashMap<>();
  configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "bad-transaction");
  configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5);
  configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
  Time time = new MockTime(1);
  MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
  Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
  metadata.update(initialUpdateResponse, time.milliseconds());
  MockClient client = new MockClient(time, metadata);
  try (Producer<String, String> producer = new KafkaProducer<>(configs, new StringSerializer(),
      new StringSerializer(), metadata, client, null, time)) {
    producer.initTransactions();
    fail("initTransactions() should have raised TimeoutException");
  }
}

代码示例来源:origin: apache/kafka

@Test(expected = KafkaException.class)
public void testOnlyCanExecuteCloseAfterInitTransactionsTimeout() {
  Map<String, Object> configs = new HashMap<>();
  configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "bad-transaction");
  configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5);
  configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
  Time time = new MockTime();
  MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
  Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
  metadata.update(initialUpdateResponse, time.milliseconds());
  MockClient client = new MockClient(time, metadata);
  Producer<String, String> producer = new KafkaProducer<>(configs, new StringSerializer(), new StringSerializer(),
      metadata, client, null, time);
  try {
    producer.initTransactions();
  } catch (TimeoutException e) {
    // expected
  }
  // other transactional operations should not be allowed if we catch the error after initTransactions failed
  try {
    producer.beginTransaction();
  } finally {
    producer.close(Duration.ofMillis(0));
  }
}

代码示例来源:origin: spring-projects/spring-kafka

@Override
public void initTransactions() {
  this.delegate.initTransactions();
}

代码示例来源:origin: spring-projects/spring-kafka

private CloseSafeProducer<K, V> doCreateTxProducer(String suffix, Consumer<CloseSafeProducer<K, V>> remover) {
  Producer<K, V> newProducer;
  Map<String, Object> newProducerConfigs = new HashMap<>(this.configs);
  newProducerConfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, this.transactionIdPrefix + suffix);
  newProducer = new KafkaProducer<K, V>(newProducerConfigs, this.keySerializer, this.valueSerializer);
  newProducer.initTransactions();
  return new CloseSafeProducer<K, V>(newProducer, this.cache, remover,
      (String) newProducerConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG));
}

代码示例来源:origin: spring-projects/spring-kafka

@Override
protected Producer createTransactionalProducer() {
  producer.initTransactions();
  BlockingQueue<Producer> cache = getCache();
  Producer cached = cache.poll();
  return cached == null ? new CloseSafeProducer(producer, cache) : cached;
}

代码示例来源:origin: spring-projects/spring-kafka

@Override
protected Producer createTransactionalProducer() {
  producer.initTransactions();
  BlockingQueue<Producer> cache = getCache();
  Producer cached = cache.poll();
  return cached == null ? new CloseSafeProducer(producer, cache) : cached;
}

代码示例来源:origin: io.opentracing.contrib/opentracing-kafka-client

@Override
public void initTransactions() {
 producer.initTransactions();
}

代码示例来源:origin: com.github.combinedmq/combinedmq

@Override
public void initTransactions() {
  producer.initTransactions();
}

代码示例来源:origin: spring-projects/spring-kafka

@Test
public void testDeadLetterPublisherWhileTransactionActive() {
  @SuppressWarnings("unchecked")
  Producer<Object, Object> producer1 = mock(Producer.class);
  @SuppressWarnings("unchecked")
  Producer<Object, Object> producer2 = mock(Producer.class);
  producer1.initTransactions();
  @SuppressWarnings("unchecked")
  ProducerFactory<Object, Object> pf = mock(ProducerFactory.class);
  given(pf.transactionCapable()).willReturn(true);
  given(pf.createProducer()).willReturn(producer1).willReturn(producer2);
  KafkaTemplate<Object, Object> template = spy(new KafkaTemplate<>(pf));
  template.setDefaultTopic(STRING_KEY_TOPIC);
  KafkaTransactionManager<Object, Object> tm = new KafkaTransactionManager<>(pf);
  new TransactionTemplate(tm).execute(s -> {
    new DeadLetterPublishingRecoverer(template).accept(
        new ConsumerRecord<>(STRING_KEY_TOPIC, 0, 0L, "key", "foo"),
        new RuntimeException("foo"));
    return null;
  });
  verify(producer1).beginTransaction();
  verify(producer1).commitTransaction();
  verify(producer1).close();
  verify(producer2, never()).beginTransaction();
  verify(template, never()).executeInTransaction(any());
}

代码示例来源:origin: spring-projects/spring-kafka

@SuppressWarnings("unchecked")
Producer<Object, Object> producer2 = mock(Producer.class);
producer1.initTransactions();
AtomicBoolean first = new AtomicBoolean(true);

代码示例来源:origin: spring-projects/spring-kafka

inOrder.verify(producer).initTransactions();
inOrder.verify(producer).beginTransaction();
inOrder.verify(producer).send(any(), any());

代码示例来源:origin: org.apache.nifi/nifi-kafka-2-0-processors

void beginTransaction() {
  if (!useTransactions) {
    return;
  }
  if (!transactionsInitialized) {
    producer.initTransactions();
    transactionsInitialized = true;
  }
  producer.beginTransaction();
  activeTransaction = true;
}

代码示例来源:origin: org.apache.nifi/nifi-kafka-1-0-processors

void beginTransaction() {
  if (!useTransactions) {
    return;
  }
  if (!transactionsInitialized) {
    producer.initTransactions();
    transactionsInitialized = true;
  }
  producer.beginTransaction();
  activeTransaction = true;
}

代码示例来源:origin: org.axonframework.extensions.kafka/axon-kafka

private Producer<K, V> createTransactionalProducer() {
  Producer<K, V> producer = this.cache.poll();
  if (producer != null) {
    return producer;
  }
  Map<String, Object> configs = new HashMap<>(this.configuration);
  configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
        this.transactionIdPrefix + this.transactionIdSuffix.getAndIncrement());
  producer = new CloseLazyProducer<>(createKafkaProducer(configs), cache, closeTimeout, timeUnit);
  producer.initTransactions();
  return producer;
}

代码示例来源:origin: org.axonframework/axon-kafka

private Producer<K, V> createTransactionalProducer() {
  Producer<K, V> producer = this.cache.poll();
  if (producer != null) {
    return producer;
  }
  Map<String, Object> configs = new HashMap<>(this.configs);
  configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
        this.transactionIdPrefix + this.transactionIdSuffix.getAndIncrement());
  producer = new CloseLazyProducer<>(createKafkaProducer(configs), cache, closeTimeout, unit);
  producer.initTransactions();
  return producer;
}

相关文章