com.pinterest.secor.common.ZookeeperConnector类的使用及代码示例

x33g5p2x  于2022-02-05 转载在 其他  
字(10.5k)|赞(0)|评价(0)|浏览(153)

本文整理了Java中com.pinterest.secor.common.ZookeeperConnector类的一些代码示例,展示了ZookeeperConnector类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。ZookeeperConnector类的具体详情如下:
包路径:com.pinterest.secor.common.ZookeeperConnector
类名称:ZookeeperConnector

ZookeeperConnector介绍

[英]ZookeeperConnector implements interactions with Zookeeper.
[中]ZookeeperConnector实现与Zookeeper的交互。

代码示例

代码示例来源:origin: pinterest/secor

/**
 * Command-line entry point for deleting committed offsets stored in Zookeeper.
 *
 * <p>The {@code command} option must be {@code delete_committed_offsets}. When the
 * {@code partition} option is present, only that partition of {@code topic} is handled via
 * {@code deleteCommittedOffsetPartitionCount}; otherwise the whole topic is handled via
 * {@code deleteCommittedOffsetTopicCount}. Any failure is logged and the JVM exits with
 * status 1.
 *
 * @param args raw command-line arguments, passed to {@code parseArgs}
 */
public static void main(String[] args) {
  try {
    CommandLine commandLine = parseArgs(args);
    String command = commandLine.getOptionValue("command");
    // Constant-first comparison: if the option value is absent (null) the user gets the
    // descriptive IllegalArgumentException below instead of a NullPointerException.
    if (!"delete_committed_offsets".equals(command)) {
      throw new IllegalArgumentException(
          "command has to be one of \"delete_committed_offsets\"");
    }
    SecorConfig config = SecorConfig.load();
    ZookeeperConnector zookeeperConnector = new ZookeeperConnector(config);
    String topic = commandLine.getOptionValue("topic");
    if (commandLine.hasOption("partition")) {
      int partition =
          ((Number) commandLine.getParsedOptionValue("partition")).intValue();
      TopicPartition topicPartition = new TopicPartition(topic, partition);
      zookeeperConnector.deleteCommittedOffsetPartitionCount(topicPartition);
    } else {
      zookeeperConnector.deleteCommittedOffsetTopicCount(topic);
    }
  } catch (Throwable t) {
    // Top-level boundary: report the failure and signal it through the exit status.
    LOG.error("Zookeeper client failed", t);
    System.exit(1);
  }
}
}

代码示例来源:origin: pinterest/secor

/**
 * Builds the path for a topic by appending {@code "/"} and the topic name to the value of
 * {@code getCommittedOffsetGroupPath()}.
 *
 * @param topic topic name used as the last path segment
 * @return the group path, a slash, and {@code topic}
 */
private String getCommittedOffsetTopicPath(String topic) {
  final String groupPath = getCommittedOffsetGroupPath();
  return groupPath + "/" + topic;
}

代码示例来源:origin: pinterest/secor

/**
 * Writes the committed offset count of a topic partition to Zookeeper.
 *
 * <p>Missing parent nodes of the offset path are created first. The count is stored as its
 * decimal string representation; if the node does not exist yet it is created as a
 * persistent node.
 *
 * @param topicPartition partition whose offset path is written
 * @param count value to store
 * @throws Exception if a Zookeeper call or parent creation fails
 */
public void setCommittedOffsetCount(TopicPartition topicPartition, long count)
    throws Exception {
  ZooKeeper zookeeper = mZookeeperClient.get();
  String offsetPath = getCommittedOffsetPartitionPath(topicPartition);
  LOG.info("creating missing parents for zookeeper path {}", offsetPath);
  createMissingParents(offsetPath);
  // Encode with an explicit charset so the stored bytes do not depend on the JVM's
  // platform default encoding.
  byte[] data = Long.toString(count).getBytes(java.nio.charset.StandardCharsets.UTF_8);
  try {
    LOG.info("setting zookeeper path {} value {}", offsetPath, count);
    // -1 matches any version
    zookeeper.setData(offsetPath, data, -1);
  } catch (KeeperException.NoNodeException exception) {
    // The node is not there yet: create it with the same payload.
    zookeeper.create(offsetPath, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  }
}

代码示例来源:origin: pinterest/secor

/**
 * Deletes the offset node of every partition returned by
 * {@code getCommittedOffsetPartitions(topic)}.
 *
 * @param topic topic whose partition offset paths are deleted
 * @throws Exception if listing the partitions or a delete call fails
 */
public void deleteCommittedOffsetTopicCount(String topic) throws Exception {
  final ZooKeeper client = mZookeeperClient.get();
  for (Integer partitionId : getCommittedOffsetPartitions(topic)) {
    final String partitionPath =
        getCommittedOffsetPartitionPath(new TopicPartition(topic, partitionId));
    LOG.info("deleting path {}", partitionPath);
    // -1 as the version argument, as in the other delete call sites
    client.delete(partitionPath, -1);
  }
}

代码示例来源:origin: pinterest/secor

topicPartition.getPartition());
mZookeeperConnector.lock(lockPath);
try {
  long zookeeperCommittedOffsetCount = mZookeeperConnector.getCommittedOffsetCount(
      topicPartition);
  if (zookeeperCommittedOffsetCount == committedOffsetCount) {
    mZookeeperConnector.setCommittedOffsetCount(topicPartition, lastSeenOffset + 1);
    mOffsetTracker.setCommittedOffsetCount(topicPartition, lastSeenOffset + 1);
    if (isOffsetsStorageKafka) {
  mZookeeperConnector.unlock(lockPath);

代码示例来源:origin: pinterest/secor

Mockito.verify(mZookeeperConnector).lock(lockPath);
PowerMockito.verifyStatic();
FileUtil.moveToCloud(
        + "some_other_partition/10_0_00000000000000000010");
Mockito.verify(mFileRegistry).deleteTopicPartition(mTopicPartition);
Mockito.verify(mZookeeperConnector).setCommittedOffsetCount(
    mTopicPartition, 1L);
Mockito.verify(mOffsetTracker).setCommittedOffsetCount(mTopicPartition,
    1L);
Mockito.verify(mZookeeperConnector).unlock(lockPath);

代码示例来源:origin: pinterest/secor

/**
 * Configures a fresh connector with the given {@code kafka.zookeeper.path} and the consumer
 * group {@code secor_cg}, then asserts the committed-offset group path it reports.
 *
 * @param zookeeperPath value for the {@code kafka.zookeeper.path} property
 * @param expectedOffsetPath group path the connector is expected to return
 */
protected void verify(String zookeeperPath, String expectedOffsetPath) {
  final ZookeeperConnector connector = new ZookeeperConnector();
  final PropertiesConfiguration props = new PropertiesConfiguration();
  props.setProperty("kafka.zookeeper.path", zookeeperPath);
  props.setProperty("secor.kafka.group", "secor_cg");
  connector.setConfig(new SecorConfig(props));
  final String actualOffsetPath = connector.getCommittedOffsetGroupPath();
  Assert.assertEquals(expectedOffsetPath, actualOffsetPath);
}
}

代码示例来源:origin: pinterest/secor

/**
 * Initializes the Uploader with its collaborators, building the
 * {@link ZookeeperConnector} from the supplied configuration and delegating to the
 * overload that takes the connector explicitly.
 *
 * @param config Secor configuration
 * @param offsetTracker Tracker of the current offset of topics partitions
 * @param fileRegistry Registry of log files on a per-topic and per-partition basis
 * @param uploadManager Manager of the physical upload of log files to the remote repository
 * @param messageReader message reader forwarded unchanged to the delegate overload
 * @param metricCollector component that ingest metrics into monitoring system
 */
public void init(SecorConfig config, OffsetTracker offsetTracker, FileRegistry fileRegistry,
         UploadManager uploadManager, MessageReader messageReader, MetricCollector metricCollector) {
  final ZookeeperConnector zookeeperConnector = new ZookeeperConnector(config);
  init(config, offsetTracker, fileRegistry, uploadManager, messageReader,
      zookeeperConnector, metricCollector);
}

代码示例来源:origin: pinterest/secor

/**
 * Checks that applying the upload policy deletes the topic partition from the file registry
 * under the stubbed offsets below.
 */
public void testDeleteTopicPartition() throws Exception {
  // Zookeeper reports a committed offset count of 31.
  Mockito.when(mZookeeperConnector.getCommittedOffsetCount(mTopicPartition)).thenReturn(31L);
  // Setting the tracker's committed offset count to 30 is stubbed to return 11.
  Mockito.when(mOffsetTracker.setCommittedOffsetCount(mTopicPartition, 30L)).thenReturn(11L);
  // The last seen offset is 20.
  Mockito.when(mOffsetTracker.getLastSeenOffset(mTopicPartition)).thenReturn(20L);

  mUploader.applyPolicy(false);

  Mockito.verify(mFileRegistry).deleteTopicPartition(mTopicPartition);
}

代码示例来源:origin: pinterest/secor

/**
 * Reads the committed offset count of a topic partition from Zookeeper.
 *
 * @param topicPartition partition whose offset path is read
 * @return the stored value parsed as a {@code long}, or {@code -1} when the offset path does
 *     not exist
 * @throws Exception if the Zookeeper read fails for another reason or the stored value is
 *     not a parsable number
 */
public long getCommittedOffsetCount(TopicPartition topicPartition) throws Exception {
  ZooKeeper zookeeper = mZookeeperClient.get();
  String offsetPath = getCommittedOffsetPartitionPath(topicPartition);
  try {
    byte[] data = zookeeper.getData(offsetPath, false, null);
    // Decode with an explicit charset so the result does not depend on the JVM's platform
    // default encoding.
    return Long.parseLong(new String(data, java.nio.charset.StandardCharsets.UTF_8));
  } catch (KeeperException.NoNodeException exception) {
    // A missing node is expected (nothing committed yet); report it with the sentinel -1.
    LOG.warn("path {} does not exist in zookeeper", offsetPath);
    return -1;
  }
}

代码示例来源:origin: pinterest/secor

public void testUploadFiles() throws Exception {
  Mockito.when(
      mZookeeperConnector.getCommittedOffsetCount(mTopicPartition))
      .thenReturn(11L);
  Mockito.when(
  Mockito.verify(mZookeeperConnector).lock(lockPath);
  PowerMockito.verifyStatic();
  FileUtil.moveToCloud(
          + "some_other_partition/10_0_00000000000000000010");
  Mockito.verify(mFileRegistry).deleteTopicPartition(mTopicPartition);
  Mockito.verify(mZookeeperConnector).setCommittedOffsetCount(
      mTopicPartition, 21L);
  Mockito.verify(mOffsetTracker).setCommittedOffsetCount(mTopicPartition,
      21L);
  Mockito.verify(mZookeeperConnector).unlock(lockPath);

代码示例来源:origin: pinterest/secor

/**
 * Stores the configuration and builds the Zookeeper connector and the Kafka message
 * timestamp factory from it.
 *
 * @param config Secor configuration
 */
@Override
public void init(SecorConfig config) {
  mConfig = config;
  mZookeeperConnector = new ZookeeperConnector(config);
  mKafkaMessageTimestampFactory =
      new KafkaMessageTimestampFactory(config.getKafkaMessageTimestampClass());
}
}

代码示例来源:origin: pinterest/secor

/**
 * Fetches the message at the last committed offset of a topic partition.
 *
 * @param topicPartition partition to look up
 * @return the message at (committed offset count - 1), or {@code null} when that offset is
 *     negative, no consumer could be created, or the message does not exist
 * @throws Exception if reading the committed offset or fetching the message fails
 */
@Override
public Message getCommittedMessage(TopicPartition topicPartition) throws Exception {
  SimpleConsumer simpleConsumer = null;
  try {
    final long lastCommittedOffset =
        mZookeeperConnector.getCommittedOffsetCount(topicPartition) - 1;
    if (lastCommittedOffset >= 0) {
      simpleConsumer = createConsumer(topicPartition);
      if (simpleConsumer != null) {
        return getMessage(topicPartition, lastCommittedOffset, simpleConsumer);
      }
    }
    return null;
  } catch (MessageDoesNotExistException e) {
    // A MessageDoesNotExistException means the message at the last committed offset is no
    // longer in Kafka, usually because Kafka's log compaction removed it.
    //
    // This is not an exceptional situation - it can be normal when the topic consumed by
    // Secor has a low volume - so simply return null.
    LOG.warn("no committed message for topic {} partition {}", topicPartition.getTopic(), topicPartition.getPartition());
    return null;
  } finally {
    // Close the consumer on every exit path once it has been created.
    if (simpleConsumer != null) {
      simpleConsumer.close();
    }
  }
}

代码示例来源:origin: pinterest/secor

/**
 * Deletes the committed-offset node of a single topic partition.
 *
 * @param topicPartition partition whose offset path is deleted
 * @throws Exception if the Zookeeper delete fails
 */
public void deleteCommittedOffsetPartitionCount(TopicPartition topicPartition)
    throws Exception {
  final String partitionPath = getCommittedOffsetPartitionPath(topicPartition);
  final ZooKeeper client = mZookeeperClient.get();
  LOG.info("deleting path {}", partitionPath);
  // -1 as the version argument, as in the other delete call sites
  client.delete(partitionPath, -1);
}

代码示例来源:origin: pinterest/secor

/**
 * Creates a finalizer from the Secor configuration.
 *
 * <p>The Kafka client is instantiated reflectively from the configured class name and
 * initialized with {@code config}. The file extension is taken from the configured
 * extension when it is non-empty, otherwise from the default extension of the configured
 * compression codec, otherwise it is the empty string.
 *
 * @param config Secor configuration
 * @throws Exception if the Kafka client class cannot be loaded or instantiated, or if any
 *     collaborator fails to initialize
 */
public PartitionFinalizer(SecorConfig config) throws Exception {
  mConfig = config;
  Class<?> kafkaClientClass = Class.forName(mConfig.getKafkaClientClass());
  // Class.newInstance() is deprecated; go through the no-arg constructor explicitly.
  this.mKafkaClient = (KafkaClient) kafkaClientClass.getDeclaredConstructor().newInstance();
  this.mKafkaClient.init(config);
  mZookeeperConnector = new ZookeeperConnector(mConfig);
  mMessageParser = (TimestampedMessageParser) ReflectionUtil.createMessageParser(
      mConfig.getMessageParserClass(), mConfig);
  mQuboleClient = new QuboleClient(mConfig);
  if (mConfig.getFileExtension() != null && !mConfig.getFileExtension().isEmpty()) {
    mFileExtension = mConfig.getFileExtension();
  } else if (mConfig.getCompressionCodec() != null && !mConfig.getCompressionCodec().isEmpty()) {
    CompressionCodec codec = CompressionUtil.createCompressionCodec(mConfig.getCompressionCodec());
    mFileExtension = codec.getDefaultExtension();
  } else {
    mFileExtension = "";
  }
  mLookbackPeriods = config.getFinalizerLookbackPeriods();
  LOG.info("Lookback periods: {}", mLookbackPeriods);
}

代码示例来源:origin: pinterest/secor

public void testTrimFiles() throws Exception {
  Mockito.when(
      mZookeeperConnector.getCommittedOffsetCount(mTopicPartition))
      .thenReturn(21L);

代码示例来源:origin: pinterest/secor

/**
 * Lists the topics found under the committed-offset group path.
 *
 * @return for each child of the group path, the text after its last {@code "/"} separator
 * @throws Exception if the Zookeeper children lookup fails
 */
public List<String> getCommittedOffsetTopics() throws Exception {
  final ZooKeeper client = mZookeeperClient.get();
  final List<String> children = client.getChildren(getCommittedOffsetGroupPath(), false);
  final LinkedList<String> topicNames = new LinkedList<String>();
  for (final String child : children) {
    // Keep only the last path segment of each child entry.
    final String[] segments = child.split("/");
    topicNames.add(segments[segments.length - 1]);
  }
  return topicNames;
}

代码示例来源:origin: pinterest/secor

public ProgressMonitor(SecorConfig config)
    throws Exception
{
  mConfig = config;
  mZookeeperConnector = new ZookeeperConnector(mConfig);
  try {
    Class timestampClass = Class.forName(mConfig.getKafkaClientClass());
    this.mKafkaClient = (KafkaClient) timestampClass.newInstance();
    this.mKafkaClient.init(config);
  } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
    throw new RuntimeException(e);
  }
  mMessageParser = (MessageParser) ReflectionUtil.createMessageParser(
      mConfig.getMessageParserClass(), mConfig);
  mPrefix = mConfig.getMonitoringPrefix();
  if (Strings.isNullOrEmpty(mPrefix)) {
    mPrefix = "secor";
  }
  if (mConfig.getStatsDHostPort() != null && !mConfig.getStatsDHostPort().isEmpty()) {
    HostAndPort hostPort = HostAndPort.fromString(mConfig.getStatsDHostPort());
    mStatsDClient = new NonBlockingStatsDClient(null, hostPort.getHostText(), hostPort.getPort(),
        mConfig.getStatsDDogstatsdConstantTags());
  }
}

代码示例来源:origin: pinterest/secor

modificationAgeSec >= mConfig.getMaxFileAgeSeconds() ||
  isRequiredToUploadAtTime(topicPartition)) {
long newOffsetCount = mZookeeperConnector.getCommittedOffsetCount(topicPartition);
long oldOffsetCount = mOffsetTracker.setCommittedOffsetCount(topicPartition,
    newOffsetCount);

相关文章