Usage and code examples of the org.apache.kafka.clients.producer.Producer.commitTransaction() method

x33g5p2x, reposted on 2022-01-26 under Other

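This article collects code examples of the org.apache.kafka.clients.producer.Producer.commitTransaction() method in Java and shows how Producer.commitTransaction() is used in practice. The examples are drawn mainly from GitHub, Stack Overflow, Maven and similar platforms and were extracted from selected projects, so they should serve as useful references. Details of the Producer.commitTransaction() method are as follows:
Package: org.apache.kafka.clients.producer
Class name: Producer
Method name: commitTransaction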

Introduction to Producer.commitTransaction

See KafkaProducer#commitTransaction(), which commits the ongoing transaction and flushes any unsent records before doing so.
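To show where commitTransaction() sits in a transaction's lifecycle, here is a minimal sketch of the begin / send / commit-or-abort pattern. So that it runs without a broker or the kafka-clients dependency, it uses a hypothetical stand-in interface, TxProducer, that mirrors a few method names of Producer, plus an in-memory implementation. Neither type is part of Kafka, and the helper runInTransaction is likewise an assumption made for illustration. A real KafkaProducer also requires initTransactions() before the first transaction, and some of its exceptions (for example ProducerFencedException) are fatal: the producer should then be closed rather than aborted. The sketch does not model either point.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class CommitOrAbortSketch {
    // Runs `work` inside a transaction: commit on success, abort and rethrow on failure.
    static void runInTransaction(TxProducer producer, Consumer<TxProducer> work) {
        producer.beginTransaction();
        try {
            work.accept(producer);
            producer.commitTransaction();
        } catch (RuntimeException e) {
            producer.abortTransaction();
            throw e;
        }
    }

    public static void main(String[] args) {
        InMemoryTxProducer producer = new InMemoryTxProducer();
        // First transaction succeeds, so both records are committed.
        runInTransaction(producer, p -> { p.send("foo", "bar"); p.send("baz", "qux"); });
        try {
            // Second transaction fails after sending, so its record is discarded.
            runInTransaction(producer, p -> { p.send("fiz", "buz"); throw new RuntimeException("boom"); });
        } catch (RuntimeException expected) {
            // The helper has already aborted the transaction before rethrowing.
        }
        System.out.println(producer.committed);
    }
}

// Hypothetical stand-in that mirrors a few method names of Kafka's Producer;
// it is NOT the real org.apache.kafka.clients.producer.Producer interface.
interface TxProducer {
    void beginTransaction();
    void send(String topic, String value);
    void commitTransaction();
    void abortTransaction();
}

// In-memory implementation for illustration: records become visible only on commit.
class InMemoryTxProducer implements TxProducer {
    private final List<String> pending = new ArrayList<>();
    final List<String> committed = new ArrayList<>();
    private boolean active = false;

    @Override public void beginTransaction() {
        if (active) throw new IllegalStateException("transaction already active");
        active = true;
    }

    @Override public void send(String topic, String value) {
        if (!active) throw new IllegalStateException("no active transaction");
        pending.add(topic + ":" + value);
    }

    @Override public void commitTransaction() {
        if (!active) throw new IllegalStateException("no active transaction");
        committed.addAll(pending);
        pending.clear();
        active = false;
    }

    @Override public void abortTransaction() {
        if (!active) throw new IllegalStateException("no active transaction");
        pending.clear();
        active = false;
    }
}
```

Running main prints [foo:bar, baz:qux]: the record sent in the failed transaction never becomes visible because that transaction was aborted instead of committed.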

Code examples

Code example source: openzipkin/brave

@Override public void commitTransaction() {
 delegate.commitTransaction();
}

Code example source: alibaba/canal

producerTmp.commitTransaction();

Code example source: apache/nifi

public PublishResult complete() {
  if (tracker == null) {
    if (messagesSent.get() == 0L) {
      return PublishResult.EMPTY;
    }
    rollback();
    throw new IllegalStateException("Cannot complete publishing to Kafka because Publisher Lease was already closed");
  }
  producer.flush();
  if (activeTransaction) {
    producer.commitTransaction();
    activeTransaction = false;
  }
  try {
    tracker.awaitCompletion(maxAckWaitMillis);
    return tracker.createPublishResult();
  } catch (final InterruptedException e) {
    logger.warn("Interrupted while waiting for an acknowledgement from Kafka; some FlowFiles may be transferred to 'failure' even though they were received by Kafka");
    Thread.currentThread().interrupt();
    return tracker.failOutstanding(e);
  } catch (final TimeoutException e) {
    logger.warn("Timed out while waiting for an acknowledgement from Kafka; some FlowFiles may be transferred to 'failure' even though they were received by Kafka");
    return tracker.failOutstanding(e);
  } finally {
    tracker = null;
  }
}

Code example source: spring-projects/spring-kafka

public void commit() {
  this.producer.commitTransaction();
}

Code example source: spring-projects/spring-kafka

@Override
public void commitTransaction() throws ProducerFencedException {
  if (logger.isDebugEnabled()) {
    logger.debug("commitTransaction: " + this);
  }
  try {
    this.delegate.commitTransaction();
  }
  catch (RuntimeException e) {
    if (logger.isErrorEnabled()) {
      logger.error("commitTransaction failed: " + this, e);
    }
    this.txFailed = true;
    throw e;
  }
}

Code example source: spring-projects/spring-kafka

@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void testDeclarative() {
  AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext(DeclarativeConfig.class);
  Tx1 tx1 = ctx.getBean(Tx1.class);
  tx1.txMethod();
  ProducerFactory producerFactory = ctx.getBean(ProducerFactory.class);
  verify(producerFactory, times(2)).createProducer();
  Producer producer1 = ctx.getBean("producer1", Producer.class);
  Producer producer2 = ctx.getBean("producer2", Producer.class);
  InOrder inOrder = inOrder(producer1, producer2);
  inOrder.verify(producer1).beginTransaction();
  inOrder.verify(producer1).send(eq(new ProducerRecord("foo", "bar")), any(Callback.class));
  inOrder.verify(producer1).send(eq(new ProducerRecord("baz", "qux")), any(Callback.class));
  inOrder.verify(producer2).beginTransaction();
  inOrder.verify(producer2).send(eq(new ProducerRecord("fiz", "buz")), any(Callback.class));
  inOrder.verify(producer2).commitTransaction();
  inOrder.verify(producer1).commitTransaction();
  ctx.close();
}

Code example source: spring-projects/spring-kafka

@SuppressWarnings("unchecked")
@Test
public void discardRemainingRecordsFromPollAndSeek() throws Exception {
  assertThat(this.config.deliveryLatch.await(10, TimeUnit.SECONDS)).isTrue();
  assertThat(this.config.pollLatch.await(10, TimeUnit.SECONDS)).isTrue();
  this.registry.stop();
  assertThat(this.config.closeLatch.await(10, TimeUnit.SECONDS)).isTrue();
  InOrder inOrder = inOrder(this.consumer, this.producer);
  inOrder.verify(this.consumer).subscribe(any(Collection.class), any(ConsumerRebalanceListener.class));
  inOrder.verify(this.consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
  inOrder.verify(this.producer).beginTransaction();
  inOrder.verify(this.consumer).seek(new TopicPartition("foo", 0), 0L);
  inOrder.verify(this.consumer).seek(new TopicPartition("foo", 1), 0L);
  inOrder.verify(this.consumer).seek(new TopicPartition("foo", 2), 0L);
  inOrder.verify(this.producer).abortTransaction();
  inOrder.verify(this.consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
  inOrder.verify(this.producer).beginTransaction();
  Map<TopicPartition, OffsetAndMetadata> offsets = new LinkedHashMap<>();
  offsets.put(new TopicPartition("foo", 0), new OffsetAndMetadata(2L));
  offsets.put(new TopicPartition("foo", 1), new OffsetAndMetadata(2L));
  offsets.put(new TopicPartition("foo", 2), new OffsetAndMetadata(2L));
  inOrder.verify(this.producer).sendOffsetsToTransaction(offsets, CONTAINER_ID);
  inOrder.verify(this.producer).commitTransaction();
  assertThat(this.config.ehException).isInstanceOf(ListenerExecutionFailedException.class);
  assertThat(((ListenerExecutionFailedException) this.config.ehException).getGroupId()).isEqualTo(CONTAINER_ID);
}

Code example source: spring-projects/spring-kafka

T result = callback.doInOperations(this);
try {
  producer.commitTransaction();

Code example source: spring-projects/spring-kafka

offsets.put(new TopicPartition("foo", 0), new OffsetAndMetadata(1L));
inOrder.verify(this.producer).sendOffsetsToTransaction(offsets, CONTAINER_ID);
inOrder.verify(this.producer).commitTransaction();
offsets.clear();
offsets.put(new TopicPartition("foo", 0), new OffsetAndMetadata(2L));
inOrder.verify(this.producer).sendOffsetsToTransaction(offsets, CONTAINER_ID);
inOrder.verify(this.producer).commitTransaction();
offsets.clear();
offsets.put(new TopicPartition("foo", 1), new OffsetAndMetadata(1L));
inOrder.verify(this.producer).sendOffsetsToTransaction(offsets, CONTAINER_ID);
inOrder.verify(this.producer).commitTransaction();
inOrder.verify(this.consumer).seek(new TopicPartition("foo", 1), 1L);
inOrder.verify(this.consumer).seek(new TopicPartition("foo", 2), 0L);
offsets.put(new TopicPartition("foo", 1), new OffsetAndMetadata(2L));
inOrder.verify(this.producer).sendOffsetsToTransaction(offsets, CONTAINER_ID);
inOrder.verify(this.producer).commitTransaction();
offsets.clear();
offsets.put(new TopicPartition("foo", 2), new OffsetAndMetadata(1L));
inOrder.verify(this.producer).sendOffsetsToTransaction(offsets, CONTAINER_ID);
inOrder.verify(this.producer).commitTransaction();
offsets.clear();
offsets.put(new TopicPartition("foo", 2), new OffsetAndMetadata(2L));
inOrder.verify(this.producer).sendOffsetsToTransaction(offsets, CONTAINER_ID);
inOrder.verify(this.producer).commitTransaction();
inOrder.verify(this.consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
assertThat(this.config.count).isEqualTo(7);

Code example source: spring-projects/spring-kafka

inOrder.verify(producer1).beginTransaction();
inOrder.verify(producer2).beginTransaction();
inOrder.verify(producer2).commitTransaction();
inOrder.verify(producer2).close();
inOrder.verify(producer1).commitTransaction();
inOrder.verify(producer1).close();

Code example source: spring-projects/spring-kafka

@Test
public void testDeadLetterPublisherWhileTransactionActive() {
  @SuppressWarnings("unchecked")
  Producer<Object, Object> producer1 = mock(Producer.class);
  @SuppressWarnings("unchecked")
  Producer<Object, Object> producer2 = mock(Producer.class);
  producer1.initTransactions();
  @SuppressWarnings("unchecked")
  ProducerFactory<Object, Object> pf = mock(ProducerFactory.class);
  given(pf.transactionCapable()).willReturn(true);
  given(pf.createProducer()).willReturn(producer1).willReturn(producer2);
  KafkaTemplate<Object, Object> template = spy(new KafkaTemplate<>(pf));
  template.setDefaultTopic(STRING_KEY_TOPIC);
  KafkaTransactionManager<Object, Object> tm = new KafkaTransactionManager<>(pf);
  new TransactionTemplate(tm).execute(s -> {
    new DeadLetterPublishingRecoverer(template).accept(
        new ConsumerRecord<>(STRING_KEY_TOPIC, 0, 0L, "key", "foo"),
        new RuntimeException("foo"));
    return null;
  });
  verify(producer1).beginTransaction();
  verify(producer1).commitTransaction();
  verify(producer1).close();
  verify(producer2, never()).beginTransaction();
  verify(template, never()).executeInTransaction(any());
}

Code example source: spring-projects/spring-kafka

inOrder.verify(producer).beginTransaction();
inOrder.verify(producer).send(any(), any());
inOrder.verify(producer).commitTransaction();
inOrder.verify(producer).beginTransaction();
inOrder.verify(producer).close();

Code example source: spring-projects/spring-kafka

inOrder.verify(producer).sendOffsetsToTransaction(Collections.singletonMap(topicPartition,
    new OffsetAndMetadata(0)), "group");
inOrder.verify(producer).commitTransaction();
inOrder.verify(producer).close();
inOrder.verify(producer).beginTransaction();
inOrder.verify(producer).sendOffsetsToTransaction(Collections.singletonMap(topicPartition,
    new OffsetAndMetadata(1)), "group");
inOrder.verify(producer).commitTransaction();
inOrder.verify(producer).close();
container.stop();

Code example source: spring-projects/spring-kafka

assertThat(captor.getValue()).isEqualTo(new ProducerRecord("bar", "baz"));
inOrder.verify(producer, never()).sendOffsetsToTransaction(anyMap(), anyString());
inOrder.verify(producer, never()).commitTransaction();
inOrder.verify(producer).abortTransaction();
inOrder.verify(producer).close();

Code example source: spring-projects/spring-kafka

inOrder.verify(producer).sendOffsetsToTransaction(Collections.singletonMap(topicPartition,
    new OffsetAndMetadata(1)), "group");
inOrder.verify(producer).commitTransaction();
inOrder.verify(producer).close();
container.stop();

Code example source: org.axonframework.extensions.kafka/axon-kafka

private void tryCommit(Producer<?, ?> producer,
    Map<? super EventMessage<?>, MonitorCallback> monitorCallbacks) {
  try {
    producer.commitTransaction();
    monitorCallbacks.forEach((k, v) -> v.reportSuccess());
  } catch (ProducerFencedException e) {
    logger.warn("Unable to commit transaction", e);
    monitorCallbacks.forEach((k, v) -> v.reportFailure(e));
    throw new EventPublicationFailedException(
        "Event publication failed: Exception occurred while committing kafka transaction",
        e);
  }
}
