本文整理了Java中com.spotify.helios.servicescommon.coordination.ZooKeeperUpdatingPersistentDirectory
类的一些代码示例,展示了ZooKeeperUpdatingPersistentDirectory
类的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。ZooKeeperUpdatingPersistentDirectory
类的具体详情如下:
包路径:com.spotify.helios.servicescommon.coordination.ZooKeeperUpdatingPersistentDirectory
类名称:ZooKeeperUpdatingPersistentDirectory
[英]A map that persists modification locally on disk and attempt to replicate modifications to ZooKeeper, retrying forever until successful. Note that ZooKeeper is only written to and never read from, so this is not a distributed map. Multiple changes to the same key are folded and only the last value is written to ZooKeeper.
[中]在磁盘上本地保存修改并尝试将修改复制到ZooKeeper的映射,永远重试,直到成功。请注意,ZooKeeper只被写入,从不被读取,所以这不是一个分布式地图。对同一个键的多个更改被折叠,只有最后一个值被写入ZooKeeper。
代码示例来源:origin: spotify/helios
public static ZooKeeperUpdatingPersistentDirectory create(final String name,
final ZooKeeperClientProvider client,
final Path stateFile,
final String path)
throws IOException, InterruptedException {
return new ZooKeeperUpdatingPersistentDirectory(name, client, stateFile, path);
}
代码示例来源:origin: spotify/helios
/**
* Returns the {@link TaskStatus}es for all tasks assigned to the current agent.
*/
@Override
public Map<JobId, TaskStatus> getTaskStatuses() {
final Map<JobId, TaskStatus> statuses = Maps.newHashMap();
for (final Map.Entry<String, byte[]> entry : this.taskStatuses.entrySet()) {
try {
final JobId id = JobId.fromString(entry.getKey());
final TaskStatus status = Json.read(entry.getValue(), TaskStatus.class);
statuses.put(id, status);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
return statuses;
}
代码示例来源:origin: spotify/helios
/**
* Get the {@link TaskStatus} for the job identified by {@code jobId}.
*/
@Override
public TaskStatus getTaskStatus(final JobId jobId) {
final byte[] data = taskStatuses.get(jobId.toString());
if (data == null) {
return null;
}
try {
return parse(data, TaskStatus.class);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
代码示例来源:origin: spotify/helios
public byte[] remove(final Object key) throws InterruptedException {
if (!(key instanceof String)) {
return null;
}
return remove((String) key);
}
代码示例来源:origin: spotify/helios
@Override
protected void startUp() throws Exception {
client("startUp").getConnectionStateListenable().addListener(connectionStateListener);
reactor.startAsync().awaitRunning();
reactor.signal();
}
代码示例来源:origin: spotify/helios
@Override
protected void shutDown() throws Exception {
tasks.stopAsync().awaitTerminated();
taskStatuses.stopAsync().awaitTerminated();
if (historyWriter != null) {
historyWriter.stopAsync().awaitTerminated();
}
}
代码示例来源:origin: spotify/helios
@Override
protected void startUp() throws Exception {
tasks.startAsync().awaitRunning();
taskStatuses.startAsync().awaitRunning();
if (historyWriter != null) {
historyWriter.startAsync().awaitRunning();
}
}
代码示例来源:origin: spotify/helios
private boolean isAlive() {
return state().ordinal() < STOPPING.ordinal();
}
代码示例来源:origin: spotify/helios
public ZooKeeperAgentModel(final ZooKeeperClientProvider provider,
final String host,
final Path stateDirectory,
final TaskHistoryWriter historyWriter,
final List<EventSender> eventSenders,
final String taskStatusEventTopic)
throws IOException, InterruptedException {
// TODO(drewc): we're constructing too many heavyweight things in the ctor, these kinds of
// things should be passed in/provider'd/etc.
final ZooKeeperClient client = provider.get("ZooKeeperAgentModel_ctor");
this.agent = checkNotNull(host);
final Path taskConfigFile = stateDirectory.resolve(TASK_CONFIG_FILENAME);
this.tasks = client.pathChildrenCache(Paths.configHostJobs(host), taskConfigFile,
Json.type(Task.class));
tasks.addListener(new JobsListener());
final Path taskStatusFile = stateDirectory.resolve(TASK_STATUS_FILENAME);
this.taskStatuses = ZooKeeperUpdatingPersistentDirectory.create("agent-model-task-statuses",
provider,
taskStatusFile,
Paths.statusHostJobs(host));
this.historyWriter = historyWriter;
this.eventSenders = eventSenders;
this.taskStatusEventTopic = taskStatusEventTopic;
}
代码示例来源:origin: spotify/helios
/**
* Set the {@link TaskStatus} for the job identified by {@code jobId}.
*/
@Override
public void setTaskStatus(final JobId jobId, final TaskStatus status)
throws InterruptedException {
log.debug("setting task status: {}", status);
taskStatuses.put(jobId.toString(), status.toJsonBytes());
if (historyWriter != null) {
try {
historyWriter.saveHistoryItem(status);
} catch (Exception e) {
// Log error here and keep going as saving task history is not critical.
// This is to prevent bad data in the queue from screwing up the actually important Helios
// agent operations.
log.error("Error saving task status {} to ZooKeeper: {}", status, e);
}
}
final TaskStatusEvent event = new TaskStatusEvent(status, System.currentTimeMillis(), agent);
final byte[] message = event.toJsonBytes();
for (final EventSender sender : eventSenders) {
sender.send(taskStatusEventTopic, message);
}
}
代码示例来源:origin: spotify/helios
/**
* Remove the {@link TaskStatus} for the job identified by {@code jobId}.
*/
@Override
public void removeTaskStatus(final JobId jobId) throws InterruptedException {
taskStatuses.remove(jobId.toString());
}
代码示例来源:origin: at.molindo/helios-services
@Override
protected void startUp() throws Exception {
client("startUp").getConnectionStateListenable().addListener(connectionStateListener);
reactor.startAsync().awaitRunning();
reactor.signal();
}
代码示例来源:origin: at.molindo/helios-services
@Override
protected void shutDown() throws Exception {
tasks.stopAsync().awaitTerminated();
taskStatuses.stopAsync().awaitTerminated();
historyWriter.stopAsync().awaitTerminated();
}
代码示例来源:origin: at.molindo/helios-services
@Override
protected void startUp() throws Exception {
tasks.startAsync().awaitRunning();
taskStatuses.startAsync().awaitRunning();
historyWriter.startAsync().awaitRunning();
}
代码示例来源:origin: at.molindo/helios-services
private boolean isAlive() {
return state().ordinal() < STOPPING.ordinal();
}
代码示例来源:origin: at.molindo/helios-services
public ZooKeeperAgentModel(final ZooKeeperClientProvider provider,
final KafkaClientProvider kafkaProvider, final String host,
final Path stateDirectory) throws IOException, InterruptedException {
// TODO(drewc): we're constructing too many heavyweight things in the ctor, these kinds of
// things should be passed in/provider'd/etc.
final ZooKeeperClient client = provider.get("ZooKeeperAgentModel_ctor");
this.agent = checkNotNull(host);
final Path taskConfigFile = stateDirectory.resolve(TASK_CONFIG_FILENAME);
this.tasks = client.pathChildrenCache(Paths.configHostJobs(host), taskConfigFile,
Json.type(Task.class));
tasks.addListener(new JobsListener());
final Path taskStatusFile = stateDirectory.resolve(TASK_STATUS_FILENAME);
this.taskStatuses = ZooKeeperUpdatingPersistentDirectory.create("agent-model-task-statuses",
provider,
taskStatusFile,
Paths.statusHostJobs(host));
this.historyWriter = new TaskHistoryWriter(
host, client, stateDirectory.resolve(TASK_HISTORY_FILENAME));
this.kafkaSender = new KafkaSender(
kafkaProvider.getProducer(new StringSerializer(), new ByteArraySerializer()));
}
代码示例来源:origin: at.molindo/helios-services
/**
* Set the {@link TaskStatus} for the job identified by {@code jobId}.
*/
@Override
public void setTaskStatus(final JobId jobId, final TaskStatus status)
throws InterruptedException {
log.debug("setting task status: {}", status);
taskStatuses.put(jobId.toString(), status.toJsonBytes());
try {
historyWriter.saveHistoryItem(status);
} catch (Exception e) {
// Log error here and keep going as saving task history is not critical.
// This is to prevent bad data in the queue from screwing up the actually important Helios
// agent operations.
log.error("Error saving task status {} to ZooKeeper: {}", status, e);
}
final TaskStatusEvent event = new TaskStatusEvent(status, System.currentTimeMillis(), agent);
kafkaSender.send(KafkaRecord.of(TaskStatusEvent.KAFKA_TOPIC, event.toJsonBytes()));
}
代码示例来源:origin: at.molindo/helios-services
public byte[] remove(final Object key) throws InterruptedException {
if (!(key instanceof String)) {
return null;
}
return remove((String) key);
}
代码示例来源:origin: at.molindo/helios-services
public static ZooKeeperUpdatingPersistentDirectory create(final String name,
final ZooKeeperClientProvider client,
final Path stateFile,
final String path)
throws IOException, InterruptedException {
return new ZooKeeperUpdatingPersistentDirectory(name, client, stateFile, path);
}
代码示例来源:origin: at.molindo/helios-services
/**
* Get the {@link TaskStatus} for the job identified by {@code jobId}.
*/
@Override
public TaskStatus getTaskStatus(final JobId jobId) {
final byte[] data = taskStatuses.get(jobId.toString());
if (data == null) {
return null;
}
try {
return parse(data, TaskStatus.class);
} catch (IOException e) {
throw Throwables.propagate(e);
}
}
内容来源于网络,如有侵权,请联系作者删除!