本文整理了Java中org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.prepareAndCommitOffsets()
方法的一些代码示例,展示了ZookeeperOffsetHandler.prepareAndCommitOffsets()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。ZookeeperOffsetHandler.prepareAndCommitOffsets()
方法的具体详情如下:
包路径:org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler
类名称:ZookeeperOffsetHandler
方法名:prepareAndCommitOffsets
[英]Commits offsets for Kafka partitions to ZooKeeper. The given offsets to this method should be the offsets of the last processed records; this method will take care of incrementing the offsets by 1 before committing them so that the committed offsets to Zookeeper represent the next record to process.
[中]将Kafka分区的偏移提交给ZooKeeper。此方法的给定偏移量应该是最后处理的记录的偏移量;此方法将在提交偏移量之前将偏移量增加1,以便提交给Zookeeper的偏移量代表下一个要处理的记录。
代码示例来源:origin: apache/flink
@Override
public void run() {
try {
while (running) {
Thread.sleep(commitInterval);
// create copy a deep copy of the current offsets
HashMap<KafkaTopicPartition, Long> offsetsToCommit = new HashMap<>(partitionStates.size());
for (KafkaTopicPartitionState<?> partitionState : partitionStates) {
offsetsToCommit.put(partitionState.getKafkaTopicPartition(), partitionState.getOffset());
}
offsetHandler.prepareAndCommitOffsets(offsetsToCommit);
}
}
catch (Throwable t) {
if (running) {
errorHandler.reportError(
new Exception("The periodic offset committer encountered an error: " + t.getMessage(), t));
}
}
}
代码示例来源:origin: apache/flink
@Override
protected void doCommitInternalOffsetsToKafka(
Map<KafkaTopicPartition, Long> offsets,
@Nonnull KafkaCommitCallback commitCallback) throws Exception {
ZookeeperOffsetHandler zkHandler = this.zookeeperOffsetHandler;
if (zkHandler != null) {
try {
// the ZK handler takes care of incrementing the offsets by 1 before committing
zkHandler.prepareAndCommitOffsets(offsets);
commitCallback.onSuccess();
}
catch (Exception e) {
if (running) {
commitCallback.onException(e);
throw e;
} else {
return;
}
}
}
// Set committed offsets in topic partition state
for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitionStates()) {
Long offset = offsets.get(partition.getKafkaTopicPartition());
if (offset != null) {
partition.setCommittedOffset(offset);
}
}
}
代码示例来源:origin: org.apache.flink/flink-connector-kafka-0.8_2.11
@Override
public void run() {
try {
while (running) {
Thread.sleep(commitInterval);
// create copy a deep copy of the current offsets
HashMap<KafkaTopicPartition, Long> offsetsToCommit = new HashMap<>(partitionStates.size());
for (KafkaTopicPartitionState<?> partitionState : partitionStates) {
offsetsToCommit.put(partitionState.getKafkaTopicPartition(), partitionState.getOffset());
}
offsetHandler.prepareAndCommitOffsets(offsetsToCommit);
}
}
catch (Throwable t) {
if (running) {
errorHandler.reportError(
new Exception("The periodic offset committer encountered an error: " + t.getMessage(), t));
}
}
}
代码示例来源:origin: org.apache.flink/flink-connector-kafka-0.8
@Override
public void run() {
try {
while (running) {
Thread.sleep(commitInterval);
// create copy a deep copy of the current offsets
HashMap<KafkaTopicPartition, Long> offsetsToCommit = new HashMap<>(partitionStates.size());
for (KafkaTopicPartitionState<?> partitionState : partitionStates) {
offsetsToCommit.put(partitionState.getKafkaTopicPartition(), partitionState.getOffset());
}
offsetHandler.prepareAndCommitOffsets(offsetsToCommit);
}
}
catch (Throwable t) {
if (running) {
errorHandler.reportError(
new Exception("The periodic offset committer encountered an error: " + t.getMessage(), t));
}
}
}
代码示例来源:origin: org.apache.flink/flink-connector-kafka-0.8_2.10
@Override
public void run() {
try {
while (running) {
Thread.sleep(commitInterval);
// create copy a deep copy of the current offsets
HashMap<KafkaTopicPartition, Long> offsetsToCommit = new HashMap<>(partitionStates.length);
for (KafkaTopicPartitionState<?> partitionState : partitionStates) {
offsetsToCommit.put(partitionState.getKafkaTopicPartition(), partitionState.getOffset());
}
offsetHandler.prepareAndCommitOffsets(offsetsToCommit);
}
}
catch (Throwable t) {
if (running) {
errorHandler.reportError(
new Exception("The periodic offset committer encountered an error: " + t.getMessage(), t));
}
}
}
代码示例来源:origin: org.apache.flink/flink-connector-kafka-0.8
@Override
protected void doCommitInternalOffsetsToKafka(
Map<KafkaTopicPartition, Long> offsets,
@Nonnull KafkaCommitCallback commitCallback) throws Exception {
ZookeeperOffsetHandler zkHandler = this.zookeeperOffsetHandler;
if (zkHandler != null) {
try {
// the ZK handler takes care of incrementing the offsets by 1 before committing
zkHandler.prepareAndCommitOffsets(offsets);
commitCallback.onSuccess();
}
catch (Exception e) {
if (running) {
commitCallback.onException(e);
throw e;
} else {
return;
}
}
}
// Set committed offsets in topic partition state
for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitionStates()) {
Long offset = offsets.get(partition.getKafkaTopicPartition());
if (offset != null) {
partition.setCommittedOffset(offset);
}
}
}
代码示例来源:origin: org.apache.flink/flink-connector-kafka-0.8_2.10
@Override
public void commitInternalOffsetsToKafka(
Map<KafkaTopicPartition, Long> offsets,
@Nonnull KafkaCommitCallback commitCallback) throws Exception {
ZookeeperOffsetHandler zkHandler = this.zookeeperOffsetHandler;
if (zkHandler != null) {
try {
// the ZK handler takes care of incrementing the offsets by 1 before committing
zkHandler.prepareAndCommitOffsets(offsets);
commitCallback.onSuccess();
}
catch (Exception e) {
if (running) {
commitCallback.onException(e);
throw e;
} else {
return;
}
}
}
// Set committed offsets in topic partition state
KafkaTopicPartitionState<TopicAndPartition>[] partitions = subscribedPartitionStates();
for (KafkaTopicPartitionState<TopicAndPartition> partition : partitions) {
Long offset = offsets.get(partition.getKafkaTopicPartition());
if (offset != null) {
partition.setCommittedOffset(offset);
}
}
}
代码示例来源:origin: org.apache.flink/flink-connector-kafka-0.8_2.11
@Override
protected void doCommitInternalOffsetsToKafka(
Map<KafkaTopicPartition, Long> offsets,
@Nonnull KafkaCommitCallback commitCallback) throws Exception {
ZookeeperOffsetHandler zkHandler = this.zookeeperOffsetHandler;
if (zkHandler != null) {
try {
// the ZK handler takes care of incrementing the offsets by 1 before committing
zkHandler.prepareAndCommitOffsets(offsets);
commitCallback.onSuccess();
}
catch (Exception e) {
if (running) {
commitCallback.onException(e);
throw e;
} else {
return;
}
}
}
// Set committed offsets in topic partition state
for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitionStates()) {
Long offset = offsets.get(partition.getKafkaTopicPartition());
if (offset != null) {
partition.setCommittedOffset(offset);
}
}
}
内容来源于网络,如有侵权,请联系作者删除!