Usage and code examples for the org.apache.kafka.clients.producer.Producer.initTransactions() method

x33g5p2x, reposted on 2022-01-26 under Other

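This article collects code examples of the Java method org.apache.kafka.clients.producer.Producer.initTransactions() and shows how Producer.initTransactions() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as useful reference material. Details of the Producer.initTransactions() method:
Package path: org.apache.kafka.clients.producer.Producer
Class name: Producer
Method name: initTransactions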

Producer.initTransactions overview

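See KafkaProducer#initTransactions(). Per the Kafka client Javadoc, this method needs to be called before any other transactional method when transactional.id is set in the producer configuration.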

Code examples

Code example source: openzipkin/brave

  // Wrapper that forwards the call to the wrapped producer.
  @Override public void initTransactions() {
      delegate.initTransactions();
  }

Code example source: apache/nifi

  void beginTransaction() {
      if (!useTransactions) {
          return;
      }
      // initTransactions() runs only once, before the first transaction.
      if (!transactionsInitialized) {
          producer.initTransactions();
          transactionsInitialized = true;
      }
      producer.beginTransaction();
      activeTransaction = true;
  }
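The guard in the NiFi snippet above (initialize once, then begin a transaction on every call) can be exercised without a broker. The following is a minimal sketch under stated assumptions: `GuardedTxProducer`, `RecordingTxProducer`, and `LazyTxStarter` are hypothetical names introduced here for illustration, and the interface only stands in for the two Kafka `Producer` methods involved. It is not NiFi's or Kafka's actual code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the two Producer methods used by the guard.
interface GuardedTxProducer {
    void initTransactions();
    void beginTransaction();
}

// Records the order of calls so it can be inspected without a Kafka broker.
class RecordingTxProducer implements GuardedTxProducer {
    final List<String> calls = new ArrayList<>();

    @Override
    public void initTransactions() {
        calls.add("init");
    }

    @Override
    public void beginTransaction() {
        calls.add("begin");
    }
}

// Mirrors the guard shown above: initialize lazily once, begin every time.
class LazyTxStarter {
    private final GuardedTxProducer producer;
    private final boolean useTransactions;
    private boolean transactionsInitialized = false;

    LazyTxStarter(GuardedTxProducer producer, boolean useTransactions) {
        this.producer = producer;
        this.useTransactions = useTransactions;
    }

    void beginTransaction() {
        if (!useTransactions) {
            return; // non-transactional mode: nothing to do
        }
        if (!transactionsInitialized) {
            producer.initTransactions();
            transactionsInitialized = true;
        }
        producer.beginTransaction();
    }
}
```

Calling `beginTransaction()` twice on a `LazyTxStarter` built with `useTransactions = true` records one `init` followed by two `begin` calls. Note that the flag is not synchronized, so this sketch assumes single-threaded use.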

Code example source: alibaba/canal

      producer.initTransactions();
  } else {
      producer2.initTransactions();

Code example source: apache/kafka

  @Test(expected = TimeoutException.class)
  public void testInitTransactionTimeout() {
      Map<String, Object> configs = new HashMap<>();
      configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "bad-transaction");
      // A very short max.block.ms makes initTransactions() time out quickly.
      configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5);
      configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
      Time time = new MockTime(1);
      MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
      Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
      metadata.update(initialUpdateResponse, time.milliseconds());
      MockClient client = new MockClient(time, metadata);
      try (Producer<String, String> producer = new KafkaProducer<>(configs, new StringSerializer(),
              new StringSerializer(), metadata, client, null, time)) {
          producer.initTransactions();
          fail("initTransactions() should have raised TimeoutException");
      }
  }

Code example source: apache/kafka

  @Test(expected = KafkaException.class)
  public void testOnlyCanExecuteCloseAfterInitTransactionsTimeout() {
      Map<String, Object> configs = new HashMap<>();
      configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "bad-transaction");
      configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5);
      configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
      Time time = new MockTime();
      MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
      Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
      metadata.update(initialUpdateResponse, time.milliseconds());
      MockClient client = new MockClient(time, metadata);
      Producer<String, String> producer = new KafkaProducer<>(configs, new StringSerializer(), new StringSerializer(),
              metadata, client, null, time);
      try {
          producer.initTransactions();
      } catch (TimeoutException e) {
          // expected
      }
      // other transactional operations should not be allowed if we catch the error after initTransactions failed
      try {
          producer.beginTransaction();
      } finally {
          producer.close(Duration.ofMillis(0));
      }
  }
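In the test above, `beginTransaction()` is expected to fail with a `KafkaException` once `initTransactions()` has timed out, and the producer is then closed. One conservative way for calling code to react is to close the producer as soon as initialization fails. The sketch below illustrates that idea only. `ClosableTxProducer` and `SafeTxInit` are hypothetical names, the interface stands in for the real `Producer`, and `RuntimeException` stands in for Kafka's `TimeoutException` and `KafkaException` (both are unchecked). Whether a retry is allowed instead may depend on the client version, so treat this as an assumption rather than the library's prescribed handling.

```java
// Hypothetical stand-in for the two Producer methods this sketch needs.
interface ClosableTxProducer {
    void initTransactions();
    void close();
}

class SafeTxInit {
    /**
     * Tries to initialize transactions.
     * Returns true if the producer is ready; otherwise closes it and returns false.
     */
    static boolean initOrClose(ClosableTxProducer producer) {
        try {
            producer.initTransactions();
            return true;
        } catch (RuntimeException e) {
            // With the real client this would typically be a TimeoutException
            // or another KafkaException; the producer is not reused afterwards.
            producer.close();
            return false;
        }
    }
}
```

A caller would check the return value and create a new producer, or give up, when it is `false`.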

Code example source: spring-projects/spring-kafka

  @Override
  public void initTransactions() {
      this.delegate.initTransactions();
  }

Code example source: spring-projects/spring-kafka

  private CloseSafeProducer<K, V> doCreateTxProducer(String suffix, Consumer<CloseSafeProducer<K, V>> remover) {
      Producer<K, V> newProducer;
      Map<String, Object> newProducerConfigs = new HashMap<>(this.configs);
      // Each transactional producer gets its own transactional.id (prefix + suffix).
      newProducerConfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, this.transactionIdPrefix + suffix);
      newProducer = new KafkaProducer<K, V>(newProducerConfigs, this.keySerializer, this.valueSerializer);
      newProducer.initTransactions();
      return new CloseSafeProducer<K, V>(newProducer, this.cache, remover,
              (String) newProducerConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG));
  }

Code example source: spring-projects/spring-kafka

  @Override
  protected Producer createTransactionalProducer() {
      producer.initTransactions();
      BlockingQueue<Producer> cache = getCache();
      Producer cached = cache.poll();
      return cached == null ? new CloseSafeProducer(producer, cache) : cached;
  }

Code example source: io.opentracing.contrib/opentracing-kafka-client

  @Override
  public void initTransactions() {
      producer.initTransactions();
  }

Code example source: com.github.combinedmq/combinedmq

  @Override
  public void initTransactions() {
      producer.initTransactions();
  }

Code example source: spring-projects/spring-kafka

  @Test
  public void testDeadLetterPublisherWhileTransactionActive() {
      @SuppressWarnings("unchecked")
      Producer<Object, Object> producer1 = mock(Producer.class);
      @SuppressWarnings("unchecked")
      Producer<Object, Object> producer2 = mock(Producer.class);
      producer1.initTransactions();
      @SuppressWarnings("unchecked")
      ProducerFactory<Object, Object> pf = mock(ProducerFactory.class);
      given(pf.transactionCapable()).willReturn(true);
      given(pf.createProducer()).willReturn(producer1).willReturn(producer2);
      KafkaTemplate<Object, Object> template = spy(new KafkaTemplate<>(pf));
      template.setDefaultTopic(STRING_KEY_TOPIC);
      KafkaTransactionManager<Object, Object> tm = new KafkaTransactionManager<>(pf);
      new TransactionTemplate(tm).execute(s -> {
          new DeadLetterPublishingRecoverer(template).accept(
                  new ConsumerRecord<>(STRING_KEY_TOPIC, 0, 0L, "key", "foo"),
                  new RuntimeException("foo"));
          return null;
      });
      // Only the first producer should take part in the transaction.
      verify(producer1).beginTransaction();
      verify(producer1).commitTransaction();
      verify(producer1).close();
      verify(producer2, never()).beginTransaction();
      verify(template, never()).executeInTransaction(any());
  }

Code example source: spring-projects/spring-kafka

  @SuppressWarnings("unchecked")
  Producer<Object, Object> producer2 = mock(Producer.class);
  producer1.initTransactions();
  AtomicBoolean first = new AtomicBoolean(true);

Code example source: spring-projects/spring-kafka

  // Verifies the call order: init, then begin, then send.
  inOrder.verify(producer).initTransactions();
  inOrder.verify(producer).beginTransaction();
  inOrder.verify(producer).send(any(), any());

Code example source: org.apache.nifi/nifi-kafka-2-0-processors

  void beginTransaction() {
      if (!useTransactions) {
          return;
      }
      if (!transactionsInitialized) {
          producer.initTransactions();
          transactionsInitialized = true;
      }
      producer.beginTransaction();
      activeTransaction = true;
  }

Code example source: org.apache.nifi/nifi-kafka-1-0-processors

  void beginTransaction() {
      if (!useTransactions) {
          return;
      }
      if (!transactionsInitialized) {
          producer.initTransactions();
          transactionsInitialized = true;
      }
      producer.beginTransaction();
      activeTransaction = true;
  }

Code example source: org.axonframework.extensions.kafka/axon-kafka

  private Producer<K, V> createTransactionalProducer() {
      // Reuse a cached producer if one is available; it has already been initialized.
      Producer<K, V> producer = this.cache.poll();
      if (producer != null) {
          return producer;
      }
      Map<String, Object> configs = new HashMap<>(this.configuration);
      configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
              this.transactionIdPrefix + this.transactionIdSuffix.getAndIncrement());
      producer = new CloseLazyProducer<>(createKafkaProducer(configs), cache, closeTimeout, timeUnit);
      producer.initTransactions();
      return producer;
  }

Code example source: org.axonframework/axon-kafka

  private Producer<K, V> createTransactionalProducer() {
      Producer<K, V> producer = this.cache.poll();
      if (producer != null) {
          return producer;
      }
      Map<String, Object> configs = new HashMap<>(this.configs);
      configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
              this.transactionIdPrefix + this.transactionIdSuffix.getAndIncrement());
      producer = new CloseLazyProducer<>(createKafkaProducer(configs), cache, closeTimeout, unit);
      producer.initTransactions();
      return producer;
  }
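The two Axon snippets above share one pattern: take a producer from a cache if possible, otherwise create one with a unique transactional id and call `initTransactions()` exactly once at creation. The following is a minimal sketch of that pattern, assuming a hypothetical `PooledTxProducer` class in place of a real Kafka producer and a hypothetical `TxProducerPool`. It illustrates the caching logic only and is not Axon's implementation.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a transactional producer; not the Kafka class.
class PooledTxProducer {
    final String transactionalId;
    int initCalls = 0; // counts how often initTransactions() was invoked

    PooledTxProducer(String transactionalId) {
        this.transactionalId = transactionalId;
    }

    void initTransactions() {
        initCalls++;
    }
}

class TxProducerPool {
    private final Queue<PooledTxProducer> cache = new ConcurrentLinkedQueue<>();
    private final AtomicInteger transactionIdSuffix = new AtomicInteger();
    private final String transactionIdPrefix;

    TxProducerPool(String transactionIdPrefix) {
        this.transactionIdPrefix = transactionIdPrefix;
    }

    // Returns a cached producer, or creates and initializes a new one.
    PooledTxProducer acquire() {
        PooledTxProducer cached = cache.poll();
        if (cached != null) {
            return cached; // already initialized when it was created
        }
        PooledTxProducer created = new PooledTxProducer(
                transactionIdPrefix + transactionIdSuffix.getAndIncrement());
        created.initTransactions(); // once per producer instance
        return created;
    }

    // Hands a producer back to the pool for later reuse.
    void release(PooledTxProducer producer) {
        cache.offer(producer);
    }
}
```

Because initialization happens only on the creation path, a producer that is released and acquired again is not re-initialized, and each newly created producer receives the next suffix.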
