Usage and code examples of the org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler class

Reposted by x33g5p2x on 2022-02-05 under Other

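This article collects code examples for the Java class org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler and shows how the class is used. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they can serve as a practical reference. Details of the ZookeeperOffsetHandler class are as follows: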
Package: org.apache.flink.streaming.connectors.kafka.internals
Class name: ZookeeperOffsetHandler

Introduction to ZookeeperOffsetHandler

Handler for committing Kafka offsets to ZooKeeper and retrieving them again.
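
For orientation, the sketch below shows where such an offset would live, assuming the ZooKeeper layout used by the Kafka 0.8 high-level consumer (`/consumers/<group>/offsets/<topic>/<partition>`). The class and method names are hypothetical and are not part of ZookeeperOffsetHandler; the code only builds the path string.

```java
// Hypothetical helper, not part of Flink or Kafka: builds the ZooKeeper node
// path under which a committed offset is assumed to be stored.
final class OffsetPathSketch {

    private OffsetPathSketch() {}

    // Assumed layout: /consumers/<groupId>/offsets/<topic>/<partition>
    static String offsetPath(String groupId, String topic, int partition) {
        return "/consumers/" + groupId + "/offsets/" + topic + "/" + partition;
    }

    public static void main(String[] args) {
        // Example values are made up for illustration.
        System.out.println(offsetPath("my-group", "events", 3));
    }
}
```

Keeping one node per partition is what lets a single partition's offset be read or written on its own.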

Code examples

Code example source: apache/flink

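// Excerpt: the lines below are not contiguous in the original method; "..." marks omitted code.
final ZookeeperOffsetHandler zookeeperOffsetHandler = new ZookeeperOffsetHandler(kafkaConfig);
this.zookeeperOffsetHandler = zookeeperOffsetHandler;
// ...
Long committedOffset = zookeeperOffsetHandler.getCommittedOffset(partition.getKafkaTopicPartition());
if (committedOffset != null) {
  // ... (branch body not included in the excerpt)
}
// ...
zookeeperOffsetHandler.close();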
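
The null check in the excerpt suggests that a partition may have no committed offset yet. Since the Javadoc of prepareAndCommitOffsets in this article states that committed offsets point at the next record to process, a caller that tracks the last processed record would step back by one after reading. The following is a minimal sketch of that conversion; the class name, method name, and the fallback parameter are assumptions made for illustration.

```java
// Hypothetical sketch: converts an offset read back from ZooKeeper into the
// "last processed record" form that a fetcher might track internally.
final class RestoreOffsetSketch {

    private RestoreOffsetSketch() {}

    // committedOffset: next record to process, or null if nothing was committed.
    // fallbackOffset: value the caller chooses when nothing was committed (an assumption here).
    static long toInternalOffset(Long committedOffset, long fallbackOffset) {
        if (committedOffset != null) {
            return committedOffset - 1; // step back to the last processed record
        }
        return fallbackOffset;
    }

    public static void main(String[] args) {
        System.out.println(toInternalOffset(42L, -1L));  // prints 41
        System.out.println(toInternalOffset(null, -1L)); // prints -1
    }
}
```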

Code example source: apache/flink

/**
 * @param partition The partition to read offset for.
 * @return The offset committed for the given partition, or null if none was found.
 * @throws Exception This method forwards exceptions.
 */
public Long getCommittedOffset(KafkaTopicPartition partition) throws Exception {
  return getOffsetFromZooKeeper(curatorClient, groupId, partition.getTopic(), partition.getPartition());
}

Code example source: apache/flink

@Override
public void run() {
  try {
    while (running) {
      Thread.sleep(commitInterval);
      // create a copy of the current offsets
      HashMap<KafkaTopicPartition, Long> offsetsToCommit = new HashMap<>(partitionStates.size());
      for (KafkaTopicPartitionState<?> partitionState : partitionStates) {
        offsetsToCommit.put(partitionState.getKafkaTopicPartition(), partitionState.getOffset());
      }
      offsetHandler.prepareAndCommitOffsets(offsetsToCommit);
    }
  }
  catch (Throwable t) {
    if (running) {
      errorHandler.reportError(
          new Exception("The periodic offset committer encountered an error: " + t.getMessage(), t));
    }
  }
}
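
The loop above can be reduced to a small, self-contained sketch: sleep for the commit interval, snapshot the current offsets, and pass the snapshot to a commit function. Everything here is hypothetical (string keys instead of KafkaTopicPartition, a `Consumer` instead of the ZooKeeper handler), and error reporting is left out for brevity.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sketch of a periodic committer thread.
final class PeriodicCommitterSketch extends Thread {

    private final Map<String, Long> currentOffsets; // keyed by "topic-partition" for brevity
    private final Consumer<Map<String, Long>> committer;
    private final long commitIntervalMillis;
    private volatile boolean running = true;

    PeriodicCommitterSketch(Map<String, Long> currentOffsets,
                            Consumer<Map<String, Long>> committer,
                            long commitIntervalMillis) {
        this.currentOffsets = currentOffsets;
        this.committer = committer;
        this.commitIntervalMillis = commitIntervalMillis;
    }

    // Copies the live map so later updates cannot change what gets committed.
    Map<String, Long> snapshot() {
        return new HashMap<>(currentOffsets);
    }

    @Override
    public void run() {
        try {
            while (running) {
                Thread.sleep(commitIntervalMillis);
                committer.accept(snapshot());
            }
        } catch (InterruptedException e) {
            // shutdown() interrupts the sleep; nothing to report in that case
        }
    }

    void shutdown() {
        running = false;
        interrupt();
    }
}
```

If the live map is updated by another thread, a concurrent map would be a reasonable choice for it; the snapshot itself can stay a plain HashMap because it is never shared.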

Code example source: apache/flink

/**
 * Commits offsets for Kafka partitions to ZooKeeper. The given offsets to this method should be the offsets of
 * the last processed records; this method will take care of incrementing the offsets by 1 before committing them so
 * that the committed offsets to Zookeeper represent the next record to process.
 *
 * @param internalOffsets The internal offsets (representing last processed records) for the partitions to commit.
 * @throws Exception The method forwards exceptions.
 */
public void prepareAndCommitOffsets(Map<KafkaTopicPartition, Long> internalOffsets) throws Exception {
  for (Map.Entry<KafkaTopicPartition, Long> entry : internalOffsets.entrySet()) {
    KafkaTopicPartition tp = entry.getKey();
    Long lastProcessedOffset = entry.getValue();
    if (lastProcessedOffset != null && lastProcessedOffset >= 0) {
      setOffsetInZooKeeper(curatorClient, groupId, tp.getTopic(), tp.getPartition(), lastProcessedOffset + 1);
    }
  }
}
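
To make the "+1 on commit" convention concrete, here is a minimal sketch that swaps ZooKeeper for an in-memory map. The class, its methods, and the string partition keys are hypothetical; only the filtering (skip null or negative offsets) and the increment follow the method above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for the ZooKeeper-backed store.
final class CommitConventionSketch {

    private final Map<String, Long> committed = new HashMap<>();

    // Null or negative offsets are skipped; valid ones are stored incremented by 1.
    void prepareAndCommit(Map<String, Long> lastProcessedOffsets) {
        for (Map.Entry<String, Long> entry : lastProcessedOffsets.entrySet()) {
            Long lastProcessed = entry.getValue();
            if (lastProcessed != null && lastProcessed >= 0) {
                committed.put(entry.getKey(), lastProcessed + 1);
            }
        }
    }

    // Returns the committed offset (next record to process), or null if none exists.
    Long getCommitted(String partitionKey) {
        return committed.get(partitionKey);
    }

    public static void main(String[] args) {
        Map<String, Long> offsets = new HashMap<>();
        offsets.put("events-0", 41L); // last processed record
        offsets.put("events-1", -1L); // negative value: skipped
        CommitConventionSketch store = new CommitConventionSketch();
        store.prepareAndCommit(offsets);
        System.out.println(store.getCommitted("events-0")); // prints 42
        System.out.println(store.getCommitted("events-1")); // prints null
    }
}
```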

Code example source: apache/flink

@Override
protected void doCommitInternalOffsetsToKafka(
    Map<KafkaTopicPartition, Long> offsets,
    @Nonnull KafkaCommitCallback commitCallback) throws Exception {
  ZookeeperOffsetHandler zkHandler = this.zookeeperOffsetHandler;
  if (zkHandler != null) {
    try {
      // the ZK handler takes care of incrementing the offsets by 1 before committing
      zkHandler.prepareAndCommitOffsets(offsets);
      commitCallback.onSuccess();
    }
    catch (Exception e) {
      if (running) {
        commitCallback.onException(e);
        throw e;
      } else {
        return;
      }
    }
  }
  // Set committed offsets in topic partition state
  for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitionStates()) {
    Long offset = offsets.get(partition.getKafkaTopicPartition());
    if (offset != null) {
      partition.setCommittedOffset(offset);
    }
  }
}
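
The control flow of the method above (commit, notify the callback, and drop failures that arrive after shutdown) can be sketched without any Flink or Kafka types. The interfaces below are hypothetical stand-ins: `Callback` mimics only the two KafkaCommitCallback methods used above, and `CommitAction` replaces the ZooKeeper write.

```java
// Hypothetical sketch of the commit-with-callback flow.
final class CommitCallbackSketch {

    // Stand-in for the commit callback; only the two methods used above.
    interface Callback {
        void onSuccess();
        void onException(Throwable cause);
    }

    // Stand-in for the ZooKeeper write.
    interface CommitAction {
        void commit() throws Exception;
    }

    private volatile boolean running = true;

    void stop() {
        running = false;
    }

    // Reports success or failure through the callback; failures seen after
    // stop() are dropped silently, as in the method above.
    void commit(CommitAction action, Callback callback) throws Exception {
        try {
            action.commit();
            callback.onSuccess();
        } catch (Exception e) {
            if (running) {
                callback.onException(e);
                throw e;
            }
        }
    }
}
```

Dropping late failures keeps a shutdown from being reported as a commit error when the underlying client has already been closed.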

Code example source: org.apache.flink/flink-connector-kafka-0.8_2.10

@Override
public void run() {
  try {
    while (running) {
      Thread.sleep(commitInterval);
      // create a copy of the current offsets
      HashMap<KafkaTopicPartition, Long> offsetsToCommit = new HashMap<>(partitionStates.length);
      for (KafkaTopicPartitionState<?> partitionState : partitionStates) {
        offsetsToCommit.put(partitionState.getKafkaTopicPartition(), partitionState.getOffset());
      }
      
      offsetHandler.prepareAndCommitOffsets(offsetsToCommit);
    }
  }
  catch (Throwable t) {
    if (running) {
      errorHandler.reportError(
          new Exception("The periodic offset committer encountered an error: " + t.getMessage(), t));
    }
  }
}

Code example source: org.apache.flink/flink-connector-kafka-0.8_2.10

@Override
public void commitInternalOffsetsToKafka(
    Map<KafkaTopicPartition, Long> offsets,
    @Nonnull KafkaCommitCallback commitCallback) throws Exception {
  ZookeeperOffsetHandler zkHandler = this.zookeeperOffsetHandler;
  if (zkHandler != null) {
    try {
      // the ZK handler takes care of incrementing the offsets by 1 before committing
      zkHandler.prepareAndCommitOffsets(offsets);
      commitCallback.onSuccess();
    }
    catch (Exception e) {
      if (running) {
        commitCallback.onException(e);
        throw e;
      } else {
        return;
      }
    }
  }
  // Set committed offsets in topic partition state
  KafkaTopicPartitionState<TopicAndPartition>[] partitions = subscribedPartitionStates();
  for (KafkaTopicPartitionState<TopicAndPartition> partition : partitions) {
    Long offset = offsets.get(partition.getKafkaTopicPartition());
    if (offset != null) {
      partition.setCommittedOffset(offset);
    }
  }
}
