本文整理了Java中com.pinterest.secor.common.ZookeeperConnector
类的一些代码示例,展示了ZookeeperConnector
类的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。ZookeeperConnector
类的具体详情如下:
包路径:com.pinterest.secor.common.ZookeeperConnector
类名称:ZookeeperConnector
[英]ZookeeperConnector implements interactions with Zookeeper.
[中]ZookeeperConnector实现与Zookeeper的交互。
代码示例来源:origin: pinterest/secor
/**
 * Command-line entry point for deleting committed offsets stored in Zookeeper.
 *
 * <p>The only accepted value of the {@code command} option is
 * {@code delete_committed_offsets}. When the {@code partition} option is present, the
 * committed offset of that single topic partition is deleted; otherwise the deletion is
 * delegated to {@code deleteCommittedOffsetTopicCount} for the whole topic. Any failure is
 * logged and the process exits with status 1.
 *
 * @param args raw command-line arguments, parsed by {@code parseArgs}
 */
public static void main(String[] args) {
    try {
        CommandLine commandLine = parseArgs(args);
        String command = commandLine.getOptionValue("command");
        // Constant-first comparison: getOptionValue may return null when the option was not
        // supplied, and that case must be reported as an invalid argument instead of
        // surfacing as a NullPointerException.
        if (!"delete_committed_offsets".equals(command)) {
            throw new IllegalArgumentException(
                "command has to be one of \"delete_committed_offsets\"");
        }
        SecorConfig config = SecorConfig.load();
        ZookeeperConnector zookeeperConnector = new ZookeeperConnector(config);
        String topic = commandLine.getOptionValue("topic");
        if (commandLine.hasOption("partition")) {
            int partition =
                ((Number) commandLine.getParsedOptionValue("partition")).intValue();
            TopicPartition topicPartition = new TopicPartition(topic, partition);
            zookeeperConnector.deleteCommittedOffsetPartitionCount(topicPartition);
        } else {
            zookeeperConnector.deleteCommittedOffsetTopicCount(topic);
        }
    } catch (Throwable t) {
        // Top-level boundary: log whatever went wrong and signal failure to the caller.
        LOG.error("Zookeeper client failed", t);
        System.exit(1);
    }
}
}
代码示例来源:origin: pinterest/secor
/**
 * Builds the path of a topic underneath the committed-offset group path.
 *
 * @param topic topic name appended as the last path element
 * @return the group path, a {@code "/"} separator, and the topic
 */
private String getCommittedOffsetTopicPath(String topic) {
    final String groupPath = getCommittedOffsetGroupPath();
    return groupPath + "/" + topic;
}
代码示例来源:origin: pinterest/secor
/**
 * Stores the committed offset count of a topic partition in Zookeeper.
 *
 * <p>Missing parent nodes of the offset path are created first. The value is written as the
 * decimal string form of {@code count}; if the offset node itself does not exist yet, it is
 * created as a persistent node carrying that value.
 *
 * @param topicPartition topic partition whose committed offset count is stored
 * @param count committed offset count to store
 * @throws Exception if any Zookeeper operation fails
 */
public void setCommittedOffsetCount(TopicPartition topicPartition, long count)
        throws Exception {
    ZooKeeper zookeeper = mZookeeperClient.get();
    String offsetPath = getCommittedOffsetPartitionPath(topicPartition);
    LOG.info("creating missing parents for zookeeper path {}", offsetPath);
    createMissingParents(offsetPath);
    // Encode with an explicit charset so the stored bytes do not depend on the JVM's
    // platform default encoding.
    byte[] data = Long.toString(count).getBytes(java.nio.charset.StandardCharsets.UTF_8);
    try {
        LOG.info("setting zookeeper path {} value {}", offsetPath, count);
        // -1 matches any version
        zookeeper.setData(offsetPath, data, -1);
    } catch (KeeperException.NoNodeException exception) {
        // The node has never been written: create it with the value instead.
        zookeeper.create(offsetPath, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }
}
代码示例来源:origin: pinterest/secor
/**
 * Deletes the committed offset node of every partition listed for a topic.
 *
 * @param topic topic whose per-partition committed offset nodes are deleted
 * @throws Exception if looking up the partitions or deleting a node fails
 */
public void deleteCommittedOffsetTopicCount(String topic) throws Exception {
    final ZooKeeper client = mZookeeperClient.get();
    for (Integer partitionId : getCommittedOffsetPartitions(topic)) {
        final String partitionPath =
            getCommittedOffsetPartitionPath(new TopicPartition(topic, partitionId));
        LOG.info("deleting path {}", partitionPath);
        // Version -1 deletes the node regardless of its current version.
        client.delete(partitionPath, -1);
    }
}
代码示例来源:origin: pinterest/secor
topicPartition.getPartition());
mZookeeperConnector.lock(lockPath);
try {
long zookeeperCommittedOffsetCount = mZookeeperConnector.getCommittedOffsetCount(
topicPartition);
if (zookeeperCommittedOffsetCount == committedOffsetCount) {
mZookeeperConnector.setCommittedOffsetCount(topicPartition, lastSeenOffset + 1);
mOffsetTracker.setCommittedOffsetCount(topicPartition, lastSeenOffset + 1);
if (isOffsetsStorageKafka) {
mZookeeperConnector.unlock(lockPath);
代码示例来源:origin: pinterest/secor
Mockito.verify(mZookeeperConnector).lock(lockPath);
PowerMockito.verifyStatic();
FileUtil.moveToCloud(
+ "some_other_partition/10_0_00000000000000000010");
Mockito.verify(mFileRegistry).deleteTopicPartition(mTopicPartition);
Mockito.verify(mZookeeperConnector).setCommittedOffsetCount(
mTopicPartition, 1L);
Mockito.verify(mOffsetTracker).setCommittedOffsetCount(mTopicPartition,
1L);
Mockito.verify(mZookeeperConnector).unlock(lockPath);
代码示例来源:origin: pinterest/secor
/**
 * Asserts that a connector configured with the given Zookeeper path and the consumer group
 * {@code secor_cg} reports the expected committed-offset group path.
 *
 * @param zookeeperPath value used for the {@code kafka.zookeeper.path} property
 * @param expectedOffsetPath group path the connector is expected to return
 */
protected void verify(String zookeeperPath, String expectedOffsetPath) {
    PropertiesConfiguration props = new PropertiesConfiguration();
    props.setProperty("kafka.zookeeper.path", zookeeperPath);
    props.setProperty("secor.kafka.group", "secor_cg");

    ZookeeperConnector connector = new ZookeeperConnector();
    connector.setConfig(new SecorConfig(props));

    String actualOffsetPath = connector.getCommittedOffsetGroupPath();
    Assert.assertEquals(expectedOffsetPath, actualOffsetPath);
}
}
代码示例来源:origin: pinterest/secor
/**
 * Initializes the Uploader with its collaborators, creating a new
 * {@link ZookeeperConnector} from the supplied configuration and delegating to the
 * overload that accepts the connector explicitly.
 *
 * @param config Secor configuration, also used to build the Zookeeper connector
 * @param offsetTracker tracker of the current offset of topic partitions
 * @param fileRegistry registry of log files on a per-topic and per-partition basis
 * @param uploadManager manager of the physical upload of log files to the remote repository
 * @param messageReader message reader passed through to the delegated {@code init}
 * @param metricCollector component that ingests metrics into the monitoring system
 */
public void init(SecorConfig config, OffsetTracker offsetTracker, FileRegistry fileRegistry,
                 UploadManager uploadManager, MessageReader messageReader,
                 MetricCollector metricCollector) {
    final ZookeeperConnector zookeeperConnector = new ZookeeperConnector(config);
    init(config, offsetTracker, fileRegistry, uploadManager, messageReader,
         zookeeperConnector, metricCollector);
}
代码示例来源:origin: pinterest/secor
/**
 * Stubs a committed offset count of 31 in Zookeeper, a previous tracker count of 11 and a
 * last seen offset of 20, applies the upload policy, and verifies that the file registry
 * is asked to delete the topic partition.
 */
public void testDeleteTopicPartition() throws Exception {
    // Zookeeper reports 31 as the committed offset count.
    Mockito.when(mZookeeperConnector.getCommittedOffsetCount(mTopicPartition)).thenReturn(31L);
    // Updating the tracker to 30 yields 11 as the stubbed return value.
    Mockito.when(mOffsetTracker.setCommittedOffsetCount(mTopicPartition, 30L)).thenReturn(11L);
    Mockito.when(mOffsetTracker.getLastSeenOffset(mTopicPartition)).thenReturn(20L);

    mUploader.applyPolicy(false);

    Mockito.verify(mFileRegistry).deleteTopicPartition(mTopicPartition);
}
代码示例来源:origin: pinterest/secor
/**
 * Reads the committed offset count of a topic partition from Zookeeper.
 *
 * @param topicPartition topic partition whose committed offset count is read
 * @return the stored count parsed as a {@code long}, or {@code -1} when the offset node
 *         does not exist
 * @throws Exception if the Zookeeper read fails for a reason other than a missing node, or
 *         if the stored value is not a parsable number
 */
public long getCommittedOffsetCount(TopicPartition topicPartition) throws Exception {
    ZooKeeper zookeeper = mZookeeperClient.get();
    String offsetPath = getCommittedOffsetPartitionPath(topicPartition);
    try {
        byte[] data = zookeeper.getData(offsetPath, false, null);
        // Decode with an explicit charset so parsing does not depend on the JVM's
        // platform default encoding.
        return Long.parseLong(new String(data, java.nio.charset.StandardCharsets.UTF_8));
    } catch (KeeperException.NoNodeException exception) {
        // A missing node is reported to the caller as -1 rather than as an error.
        LOG.warn("path {} does not exist in zookeeper", offsetPath);
        return -1;
    }
}
代码示例来源:origin: pinterest/secor
public void testUploadFiles() throws Exception {
Mockito.when(
mZookeeperConnector.getCommittedOffsetCount(mTopicPartition))
.thenReturn(11L);
Mockito.when(
Mockito.verify(mZookeeperConnector).lock(lockPath);
PowerMockito.verifyStatic();
FileUtil.moveToCloud(
+ "some_other_partition/10_0_00000000000000000010");
Mockito.verify(mFileRegistry).deleteTopicPartition(mTopicPartition);
Mockito.verify(mZookeeperConnector).setCommittedOffsetCount(
mTopicPartition, 21L);
Mockito.verify(mOffsetTracker).setCommittedOffsetCount(mTopicPartition,
21L);
Mockito.verify(mZookeeperConnector).unlock(lockPath);
代码示例来源:origin: pinterest/secor
/**
 * Stores the configuration and builds the Zookeeper connector and the Kafka message
 * timestamp factory from it.
 *
 * @param config Secor configuration backing this client
 */
@Override
public void init(SecorConfig config) {
    mConfig = config;
    mZookeeperConnector = new ZookeeperConnector(config);

    final String timestampClassName = config.getKafkaMessageTimestampClass();
    mKafkaMessageTimestampFactory = new KafkaMessageTimestampFactory(timestampClassName);
}
}
代码示例来源:origin: pinterest/secor
/**
 * Fetches the message located at the last committed offset of a topic partition, i.e. at
 * the committed offset count read from Zookeeper minus one.
 *
 * @param topicPartition topic partition to look up
 * @return the message at the last committed offset, or {@code null} when nothing has been
 *         committed, no consumer could be created, or the message no longer exists
 * @throws Exception if reading the offset or fetching the message fails
 */
@Override
public Message getCommittedMessage(TopicPartition topicPartition) throws Exception {
    SimpleConsumer simpleConsumer = null;
    try {
        final long lastCommittedOffset =
            mZookeeperConnector.getCommittedOffsetCount(topicPartition) - 1;
        if (lastCommittedOffset >= 0) {
            simpleConsumer = createConsumer(topicPartition);
            if (simpleConsumer != null) {
                return getMessage(topicPartition, lastCommittedOffset, simpleConsumer);
            }
        }
        // Nothing committed yet, or no consumer available.
        return null;
    } catch (MessageDoesNotExistException e) {
        // The message at the last committed offset does not exist in Kafka. This is
        // usually because it was removed by the Kafka log compaction process.
        //
        // It is not an exceptional situation - it can be normal when the topic consumed
        // by Secor has a low volume - so simply report that there is no message.
        LOG.warn("no committed message for topic {} partition {}", topicPartition.getTopic(), topicPartition.getPartition());
        return null;
    } finally {
        // Always release the consumer if one was created.
        if (simpleConsumer != null) {
            simpleConsumer.close();
        }
    }
}
代码示例来源:origin: pinterest/secor
/**
 * Deletes the committed offset node of a single topic partition.
 *
 * @param topicPartition topic partition whose committed offset node is deleted
 * @throws Exception if the Zookeeper delete fails
 */
public void deleteCommittedOffsetPartitionCount(TopicPartition topicPartition)
        throws Exception {
    final String partitionPath = getCommittedOffsetPartitionPath(topicPartition);
    final ZooKeeper client = mZookeeperClient.get();
    LOG.info("deleting path {}", partitionPath);
    // Version -1 deletes the node regardless of its current version.
    client.delete(partitionPath, -1);
}
代码示例来源:origin: pinterest/secor
/**
 * Creates a finalizer wired from the given configuration: the reflectively instantiated
 * Kafka client, the Zookeeper connector, the message parser, the Qubole client, the file
 * extension and the lookback periods.
 *
 * <p>The file extension is the configured one when it is non-empty; otherwise the default
 * extension of the configured compression codec when one is set; otherwise empty.
 *
 * @param config Secor configuration
 * @throws Exception if the Kafka client class cannot be loaded or instantiated, or any
 *         collaborator fails to initialize
 */
public PartitionFinalizer(SecorConfig config) throws Exception {
    mConfig = config;

    // The Kafka client implementation is chosen by class name in the configuration.
    Class<?> clientClass = Class.forName(mConfig.getKafkaClientClass());
    this.mKafkaClient = (KafkaClient) clientClass.newInstance();
    this.mKafkaClient.init(config);

    mZookeeperConnector = new ZookeeperConnector(mConfig);
    mMessageParser = (TimestampedMessageParser) ReflectionUtil.createMessageParser(
        mConfig.getMessageParserClass(), mConfig);
    mQuboleClient = new QuboleClient(mConfig);

    final boolean hasExplicitExtension =
        mConfig.getFileExtension() != null && !mConfig.getFileExtension().isEmpty();
    if (hasExplicitExtension) {
        mFileExtension = mConfig.getFileExtension();
    } else {
        final boolean hasCodec =
            mConfig.getCompressionCodec() != null && !mConfig.getCompressionCodec().isEmpty();
        if (hasCodec) {
            CompressionCodec compressionCodec =
                CompressionUtil.createCompressionCodec(mConfig.getCompressionCodec());
            mFileExtension = compressionCodec.getDefaultExtension();
        } else {
            mFileExtension = "";
        }
    }

    mLookbackPeriods = config.getFinalizerLookbackPeriods();
    LOG.info("Lookback periods: " + mLookbackPeriods);
}
代码示例来源:origin: pinterest/secor
public void testTrimFiles() throws Exception {
Mockito.when(
mZookeeperConnector.getCommittedOffsetCount(mTopicPartition))
.thenReturn(21L);
代码示例来源:origin: pinterest/secor
/**
 * Lists the topics that have nodes under the committed-offset group path.
 *
 * @return for each child of the group path, the last {@code "/"}-separated element of the
 *         child name
 * @throws Exception if the Zookeeper children lookup fails
 */
public List<String> getCommittedOffsetTopics() throws Exception {
    final ZooKeeper client = mZookeeperClient.get();
    final String groupPath = getCommittedOffsetGroupPath();
    final List<String> children = client.getChildren(groupPath, false);

    final LinkedList<String> topicNames = new LinkedList<String>();
    for (String child : children) {
        // Keep only the trailing path element of each child entry.
        final String[] segments = child.split("/");
        topicNames.add(segments[segments.length - 1]);
    }
    return topicNames;
}
代码示例来源:origin: pinterest/secor
public ProgressMonitor(SecorConfig config)
throws Exception
{
mConfig = config;
mZookeeperConnector = new ZookeeperConnector(mConfig);
try {
Class timestampClass = Class.forName(mConfig.getKafkaClientClass());
this.mKafkaClient = (KafkaClient) timestampClass.newInstance();
this.mKafkaClient.init(config);
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
throw new RuntimeException(e);
}
mMessageParser = (MessageParser) ReflectionUtil.createMessageParser(
mConfig.getMessageParserClass(), mConfig);
mPrefix = mConfig.getMonitoringPrefix();
if (Strings.isNullOrEmpty(mPrefix)) {
mPrefix = "secor";
}
if (mConfig.getStatsDHostPort() != null && !mConfig.getStatsDHostPort().isEmpty()) {
HostAndPort hostPort = HostAndPort.fromString(mConfig.getStatsDHostPort());
mStatsDClient = new NonBlockingStatsDClient(null, hostPort.getHostText(), hostPort.getPort(),
mConfig.getStatsDDogstatsdConstantTags());
}
}
代码示例来源:origin: pinterest/secor
modificationAgeSec >= mConfig.getMaxFileAgeSeconds() ||
isRequiredToUploadAtTime(topicPartition)) {
long newOffsetCount = mZookeeperConnector.getCommittedOffsetCount(topicPartition);
long oldOffsetCount = mOffsetTracker.setCommittedOffsetCount(topicPartition,
newOffsetCount);
内容来源于网络,如有侵权,请联系作者删除!