This article collects Java code examples for the org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler class and shows how the class is used. The examples were extracted from selected open-source projects found on GitHub, Stack Overflow, and Maven, so they should serve as useful references. Details of the ZookeeperOffsetHandler class:

Package: org.apache.flink.streaming.connectors.kafka.internals
Class name: ZookeeperOffsetHandler
Description: Handler for committing Kafka offsets to Zookeeper and to retrieve them again.
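Taken together, the examples below show a simple lifecycle: the handler is constructed from the Kafka consumer configuration, asked for previously committed offsets, fed new offsets to commit, and finally closed. The following is a minimal end-to-end sketch of that lifecycle; the ZooKeeper address, group id, topic name, and offset value are invented for illustration and are not taken from the examples.

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;

public class ZookeeperOffsetHandlerSketch {

    public static void main(String[] args) throws Exception {
        // The handler is built from the Kafka consumer configuration; it needs
        // the ZooKeeper connection string and the consumer group id.
        Properties kafkaConfig = new Properties();
        kafkaConfig.setProperty("zookeeper.connect", "localhost:2181");  // assumed address
        kafkaConfig.setProperty("group.id", "example-group");            // assumed group id

        ZookeeperOffsetHandler handler = new ZookeeperOffsetHandler(kafkaConfig);
        try {
            KafkaTopicPartition partition = new KafkaTopicPartition("example-topic", 0);

            // Returns null when no offset has been committed for this partition yet.
            Long committed = handler.getCommittedOffset(partition);
            System.out.println("Committed offset: " + committed);

            // Hand over the offsets of the last processed records; the handler adds 1
            // before writing, so ZooKeeper ends up storing the next offset to read.
            Map<KafkaTopicPartition, Long> lastProcessedOffsets = new HashMap<>();
            lastProcessedOffsets.put(partition, 41L);
            handler.prepareAndCommitOffsets(lastProcessedOffsets);
        } finally {
            handler.close();
        }
    }
}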
Code example source: apache/flink (the same snippet also appears verbatim in org.apache.flink/flink-connector-kafka-0.8, flink-connector-kafka-0.8_2.10, and flink-connector-kafka-0.8_2.11)
// Non-contiguous excerpt: the fetcher creates the handler from the Kafka
// config, reads a partition's committed offset, and closes it on shutdown.
final ZookeeperOffsetHandler zookeeperOffsetHandler = new ZookeeperOffsetHandler(kafkaConfig);
this.zookeeperOffsetHandler = zookeeperOffsetHandler;
// ...
Long committedOffset = zookeeperOffsetHandler.getCommittedOffset(partition.getKafkaTopicPartition());
if (committedOffset != null) {
    // ...
}
zookeeperOffsetHandler.close();
Code example source: apache/flink (also verbatim in org.apache.flink/flink-connector-kafka-0.8, flink-connector-kafka-0.8_2.10, and flink-connector-kafka-0.8_2.11)
/**
 * @param partition The partition to read the offset for.
 * @return The committed offset for the given partition, or null if no offset has been committed.
 * @throws Exception This method forwards exceptions.
 */
public Long getCommittedOffset(KafkaTopicPartition partition) throws Exception {
    return getOffsetFromZooKeeper(curatorClient, groupId, partition.getTopic(), partition.getPartition());
}
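Since a consumer group that has never committed has nothing stored in ZooKeeper, the returned Long can be null and callers must branch on it (as the fetcher excerpt above does). A hedged sketch of that check; the helper method and the fallback constant are assumptions for illustration:

import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;

public class StartOffsetLookup {

    // Illustrative fallback when ZooKeeper holds no offset for the partition.
    private static final long EARLIEST = 0L;

    // Resolve the position to start reading from for one partition.
    static long resolveStartOffset(ZookeeperOffsetHandler handler, KafkaTopicPartition partition)
            throws Exception {
        Long committedOffset = handler.getCommittedOffset(partition);
        // null means no offset was committed yet; fall back to a default position.
        return committedOffset != null ? committedOffset : EARLIEST;
    }
}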
Code example source: apache/flink (also verbatim in org.apache.flink/flink-connector-kafka-0.8 and flink-connector-kafka-0.8_2.11; the slightly different flink-connector-kafka-0.8_2.10 variant is shown further below)
@Override
public void run() {
    try {
        while (running) {
            Thread.sleep(commitInterval);

            // create a deep copy of the current offsets
            HashMap<KafkaTopicPartition, Long> offsetsToCommit = new HashMap<>(partitionStates.size());
            for (KafkaTopicPartitionState<?> partitionState : partitionStates) {
                offsetsToCommit.put(partitionState.getKafkaTopicPartition(), partitionState.getOffset());
            }

            offsetHandler.prepareAndCommitOffsets(offsetsToCommit);
        }
    }
    catch (Throwable t) {
        if (running) {
            errorHandler.reportError(
                new Exception("The periodic offset committer encountered an error: " + t.getMessage(), t));
        }
    }
}
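The run() method above is the body of a periodic committer thread: it sleeps for the commit interval, snapshots the current offsets into a fresh HashMap, and hands the copy to the offset handler; errors are only reported while the fetcher is still running. The following self-contained sketch shows how such a thread is typically started and shut down; the interval, names, and the stand-in "commit" (a println instead of prepareAndCommitOffsets) are assumptions for illustration, not taken from the Flink source.

import java.util.concurrent.atomic.AtomicLong;

public class PeriodicCommitterSketch {

    private volatile boolean running = true;           // shutdown flag, mirroring the Flink thread
    private final AtomicLong currentOffset = new AtomicLong();

    public static void main(String[] args) throws Exception {
        PeriodicCommitterSketch sketch = new PeriodicCommitterSketch();

        Thread committer = new Thread(() -> {
            try {
                while (sketch.running) {
                    Thread.sleep(100);                 // commit interval, assumed 100 ms
                    // stand-in for offsetHandler.prepareAndCommitOffsets(copyOfOffsets)
                    System.out.println("committing offset " + sketch.currentOffset.get());
                }
            } catch (InterruptedException e) {
                // interrupted during shutdown; simply exit the loop
            }
        }, "Periodic offset committer");
        committer.setDaemon(true);
        committer.start();

        for (int i = 0; i < 5; i++) {                  // simulate record processing
            sketch.currentOffset.incrementAndGet();
            Thread.sleep(50);
        }

        sketch.running = false;                        // signal shutdown ...
        committer.interrupt();                         // ... and wake the thread if it is sleeping
        committer.join();
    }
}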
Code example source: apache/flink (also verbatim in org.apache.flink/flink-connector-kafka-0.8, flink-connector-kafka-0.8_2.10, and flink-connector-kafka-0.8_2.11)
/**
 * Commits offsets for Kafka partitions to ZooKeeper. The offsets given to this method should be the offsets
 * of the last processed records; this method will take care of incrementing the offsets by 1 before
 * committing them, so that the committed offsets in ZooKeeper represent the next record to process.
 *
 * @param internalOffsets The internal offsets (representing last processed records) for the partitions to commit.
 * @throws Exception The method forwards exceptions.
 */
public void prepareAndCommitOffsets(Map<KafkaTopicPartition, Long> internalOffsets) throws Exception {
    for (Map.Entry<KafkaTopicPartition, Long> entry : internalOffsets.entrySet()) {
        KafkaTopicPartition tp = entry.getKey();
        Long lastProcessedOffset = entry.getValue();

        if (lastProcessedOffset != null && lastProcessedOffset >= 0) {
            setOffsetInZooKeeper(curatorClient, groupId, tp.getTopic(), tp.getPartition(), lastProcessedOffset + 1);
        }
    }
}
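Concretely: if the last record processed from a partition had offset 41, the value written to ZooKeeper is 42, so a consumer resuming from the committed offset starts exactly at the first unprocessed record. Entries whose offset is null or negative (nothing processed yet for that partition) are skipped rather than committed.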
Code example source: apache/flink (also verbatim in org.apache.flink/flink-connector-kafka-0.8 and flink-connector-kafka-0.8_2.11; the flink-connector-kafka-0.8_2.10 variant is shown further below)
@Override
protected void doCommitInternalOffsetsToKafka(
        Map<KafkaTopicPartition, Long> offsets,
        @Nonnull KafkaCommitCallback commitCallback) throws Exception {

    ZookeeperOffsetHandler zkHandler = this.zookeeperOffsetHandler;
    if (zkHandler != null) {
        try {
            // the ZK handler takes care of incrementing the offsets by 1 before committing
            zkHandler.prepareAndCommitOffsets(offsets);
            commitCallback.onSuccess();
        }
        catch (Exception e) {
            if (running) {
                commitCallback.onException(e);
                throw e;
            } else {
                return;
            }
        }
    }

    // Set committed offsets in topic partition state
    for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitionStates()) {
        Long offset = offsets.get(partition.getKafkaTopicPartition());
        if (offset != null) {
            partition.setCommittedOffset(offset);
        }
    }
}
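The commit path reports its outcome through the KafkaCommitCallback argument. Below is a minimal sketch of a callback implementation, assuming the interface exposes exactly the two methods invoked above, onSuccess() and onException(Throwable); the class name and the counters are illustrative, not part of the Flink API.

import java.util.concurrent.atomic.AtomicLong;

import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;

public class LoggingCommitCallback implements KafkaCommitCallback {

    private final AtomicLong successfulCommits = new AtomicLong();
    private final AtomicLong failedCommits = new AtomicLong();

    @Override
    public void onSuccess() {
        // called after the offsets were written to ZooKeeper
        successfulCommits.incrementAndGet();
    }

    @Override
    public void onException(Throwable cause) {
        // called when prepareAndCommitOffsets threw; the fetcher rethrows afterwards
        failedCommits.incrementAndGet();
        System.err.println("Offset commit failed: " + cause.getMessage());
    }
}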
Code example source: org.apache.flink/flink-connector-kafka-0.8_2.10 (variant of the periodic committer above; here the partition states are held in an array, hence partitionStates.length)
@Override
public void run() {
    try {
        while (running) {
            Thread.sleep(commitInterval);

            // create a deep copy of the current offsets
            HashMap<KafkaTopicPartition, Long> offsetsToCommit = new HashMap<>(partitionStates.length);
            for (KafkaTopicPartitionState<?> partitionState : partitionStates) {
                offsetsToCommit.put(partitionState.getKafkaTopicPartition(), partitionState.getOffset());
            }

            offsetHandler.prepareAndCommitOffsets(offsetsToCommit);
        }
    }
    catch (Throwable t) {
        if (running) {
            errorHandler.reportError(
                new Exception("The periodic offset committer encountered an error: " + t.getMessage(), t));
        }
    }
}
Code example source: org.apache.flink/flink-connector-kafka-0.8_2.10 (variant of doCommitInternalOffsetsToKafka above: the method is public, named commitInternalOffsetsToKafka, and reads the partition states from an explicit array)
@Override
public void commitInternalOffsetsToKafka(
        Map<KafkaTopicPartition, Long> offsets,
        @Nonnull KafkaCommitCallback commitCallback) throws Exception {

    ZookeeperOffsetHandler zkHandler = this.zookeeperOffsetHandler;
    if (zkHandler != null) {
        try {
            // the ZK handler takes care of incrementing the offsets by 1 before committing
            zkHandler.prepareAndCommitOffsets(offsets);
            commitCallback.onSuccess();
        }
        catch (Exception e) {
            if (running) {
                commitCallback.onException(e);
                throw e;
            } else {
                return;
            }
        }
    }

    // Set committed offsets in topic partition state
    KafkaTopicPartitionState<TopicAndPartition>[] partitions = subscribedPartitionStates();
    for (KafkaTopicPartitionState<TopicAndPartition> partition : partitions) {
        Long offset = offsets.get(partition.getKafkaTopicPartition());
        if (offset != null) {
            partition.setCommittedOffset(offset);
        }
    }
}
Content collected from the web; in case of infringement, please contact the author for removal.