org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.prepareAndCommitOffsets()方法的使用及代码示例

x33g5p2x  于2022-02-05 转载在 其他  
字(6.5k)|赞(0)|评价(0)|浏览(153)

本文整理了Java中org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.prepareAndCommitOffsets()方法的一些代码示例,展示了ZookeeperOffsetHandler.prepareAndCommitOffsets()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。ZookeeperOffsetHandler.prepareAndCommitOffsets()方法的具体详情如下:
包路径:org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler
类名称:ZookeeperOffsetHandler
方法名:prepareAndCommitOffsets

ZookeeperOffsetHandler.prepareAndCommitOffsets介绍

[英]Commits offsets for Kafka partitions to ZooKeeper. The given offsets to this method should be the offsets of the last processed records; this method will take care of incrementing the offsets by 1 before committing them so that the committed offsets to Zookeeper represent the next record to process.
[中]将Kafka分区的偏移提交给ZooKeeper。此方法的给定偏移量应该是最后处理的记录的偏移量;此方法将在提交偏移量之前将偏移量增加1,以便提交给Zookeeper的偏移量代表下一个要处理的记录。

代码示例

代码示例来源:origin: apache/flink

@Override
public void run() {
  try {
    while (running) {
      Thread.sleep(commitInterval);
      // create copy a deep copy of the current offsets
      HashMap<KafkaTopicPartition, Long> offsetsToCommit = new HashMap<>(partitionStates.size());
      for (KafkaTopicPartitionState<?> partitionState : partitionStates) {
        offsetsToCommit.put(partitionState.getKafkaTopicPartition(), partitionState.getOffset());
      }
      offsetHandler.prepareAndCommitOffsets(offsetsToCommit);
    }
  }
  catch (Throwable t) {
    if (running) {
      errorHandler.reportError(
          new Exception("The periodic offset committer encountered an error: " + t.getMessage(), t));
    }
  }
}

代码示例来源:origin: apache/flink

@Override
protected void doCommitInternalOffsetsToKafka(
    Map<KafkaTopicPartition, Long> offsets,
    @Nonnull KafkaCommitCallback commitCallback) throws Exception {
  ZookeeperOffsetHandler zkHandler = this.zookeeperOffsetHandler;
  if (zkHandler != null) {
    try {
      // the ZK handler takes care of incrementing the offsets by 1 before committing
      zkHandler.prepareAndCommitOffsets(offsets);
      commitCallback.onSuccess();
    }
    catch (Exception e) {
      if (running) {
        commitCallback.onException(e);
        throw e;
      } else {
        return;
      }
    }
  }
  // Set committed offsets in topic partition state
  for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitionStates()) {
    Long offset = offsets.get(partition.getKafkaTopicPartition());
    if (offset != null) {
      partition.setCommittedOffset(offset);
    }
  }
}

代码示例来源:origin: org.apache.flink/flink-connector-kafka-0.8_2.11

@Override
public void run() {
  try {
    while (running) {
      Thread.sleep(commitInterval);
      // create copy a deep copy of the current offsets
      HashMap<KafkaTopicPartition, Long> offsetsToCommit = new HashMap<>(partitionStates.size());
      for (KafkaTopicPartitionState<?> partitionState : partitionStates) {
        offsetsToCommit.put(partitionState.getKafkaTopicPartition(), partitionState.getOffset());
      }
      offsetHandler.prepareAndCommitOffsets(offsetsToCommit);
    }
  }
  catch (Throwable t) {
    if (running) {
      errorHandler.reportError(
          new Exception("The periodic offset committer encountered an error: " + t.getMessage(), t));
    }
  }
}

代码示例来源:origin: org.apache.flink/flink-connector-kafka-0.8

@Override
public void run() {
  try {
    while (running) {
      Thread.sleep(commitInterval);
      // create copy a deep copy of the current offsets
      HashMap<KafkaTopicPartition, Long> offsetsToCommit = new HashMap<>(partitionStates.size());
      for (KafkaTopicPartitionState<?> partitionState : partitionStates) {
        offsetsToCommit.put(partitionState.getKafkaTopicPartition(), partitionState.getOffset());
      }
      offsetHandler.prepareAndCommitOffsets(offsetsToCommit);
    }
  }
  catch (Throwable t) {
    if (running) {
      errorHandler.reportError(
          new Exception("The periodic offset committer encountered an error: " + t.getMessage(), t));
    }
  }
}

代码示例来源:origin: org.apache.flink/flink-connector-kafka-0.8_2.10

@Override
public void run() {
  try {
    while (running) {
      Thread.sleep(commitInterval);
      // create copy a deep copy of the current offsets
      HashMap<KafkaTopicPartition, Long> offsetsToCommit = new HashMap<>(partitionStates.length);
      for (KafkaTopicPartitionState<?> partitionState : partitionStates) {
        offsetsToCommit.put(partitionState.getKafkaTopicPartition(), partitionState.getOffset());
      }
      
      offsetHandler.prepareAndCommitOffsets(offsetsToCommit);
    }
  }
  catch (Throwable t) {
    if (running) {
      errorHandler.reportError(
          new Exception("The periodic offset committer encountered an error: " + t.getMessage(), t));
    }
  }
}

代码示例来源:origin: org.apache.flink/flink-connector-kafka-0.8

@Override
protected void doCommitInternalOffsetsToKafka(
    Map<KafkaTopicPartition, Long> offsets,
    @Nonnull KafkaCommitCallback commitCallback) throws Exception {
  ZookeeperOffsetHandler zkHandler = this.zookeeperOffsetHandler;
  if (zkHandler != null) {
    try {
      // the ZK handler takes care of incrementing the offsets by 1 before committing
      zkHandler.prepareAndCommitOffsets(offsets);
      commitCallback.onSuccess();
    }
    catch (Exception e) {
      if (running) {
        commitCallback.onException(e);
        throw e;
      } else {
        return;
      }
    }
  }
  // Set committed offsets in topic partition state
  for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitionStates()) {
    Long offset = offsets.get(partition.getKafkaTopicPartition());
    if (offset != null) {
      partition.setCommittedOffset(offset);
    }
  }
}

代码示例来源:origin: org.apache.flink/flink-connector-kafka-0.8_2.10

@Override
public void commitInternalOffsetsToKafka(
    Map<KafkaTopicPartition, Long> offsets,
    @Nonnull KafkaCommitCallback commitCallback) throws Exception {
  ZookeeperOffsetHandler zkHandler = this.zookeeperOffsetHandler;
  if (zkHandler != null) {
    try {
      // the ZK handler takes care of incrementing the offsets by 1 before committing
      zkHandler.prepareAndCommitOffsets(offsets);
      commitCallback.onSuccess();
    }
    catch (Exception e) {
      if (running) {
        commitCallback.onException(e);
        throw e;
      } else {
        return;
      }
    }
  }
  // Set committed offsets in topic partition state
  KafkaTopicPartitionState<TopicAndPartition>[] partitions = subscribedPartitionStates();
  for (KafkaTopicPartitionState<TopicAndPartition> partition : partitions) {
    Long offset = offsets.get(partition.getKafkaTopicPartition());
    if (offset != null) {
      partition.setCommittedOffset(offset);
    }
  }
}

代码示例来源:origin: org.apache.flink/flink-connector-kafka-0.8_2.11

@Override
protected void doCommitInternalOffsetsToKafka(
    Map<KafkaTopicPartition, Long> offsets,
    @Nonnull KafkaCommitCallback commitCallback) throws Exception {
  ZookeeperOffsetHandler zkHandler = this.zookeeperOffsetHandler;
  if (zkHandler != null) {
    try {
      // the ZK handler takes care of incrementing the offsets by 1 before committing
      zkHandler.prepareAndCommitOffsets(offsets);
      commitCallback.onSuccess();
    }
    catch (Exception e) {
      if (running) {
        commitCallback.onException(e);
        throw e;
      } else {
        return;
      }
    }
  }
  // Set committed offsets in topic partition state
  for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitionStates()) {
    Long offset = offsets.get(partition.getKafkaTopicPartition());
    if (offset != null) {
      partition.setCommittedOffset(offset);
    }
  }
}

相关文章