Usage and code examples of the com.spotify.helios.servicescommon.coordination.ZooKeeperUpdatingPersistentDirectory class

Reposted by x33g5p2x on 2022-02-05, under Other

This article collects code examples for the Java class com.spotify.helios.servicescommon.coordination.ZooKeeperUpdatingPersistentDirectory and shows how the class is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they can serve as a reference. Details of the ZooKeeperUpdatingPersistentDirectory class follow:
Package: com.spotify.helios.servicescommon.coordination
Class name: ZooKeeperUpdatingPersistentDirectory

About ZooKeeperUpdatingPersistentDirectory

A map that persists modifications locally on disk and attempts to replicate them to ZooKeeper, retrying forever until successful. Note that ZooKeeper is only written to and never read from, so this is not a distributed map. Multiple changes to the same key are folded, and only the last value is written to ZooKeeper.
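
The folding behaviour described above can be illustrated with a minimal sketch. This is not the Helios implementation: `RemoteSink` and `FoldingReplicatedMap` are hypothetical names, the remote store is reduced to a single-method interface, and on-disk persistence is left out. The sketch only shows how tracking dirty keys, rather than queuing every change, means that repeated writes to one key result in a single remote write of the latest value.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for the remote store (ZooKeeper in Helios). */
interface RemoteSink {
  /** Writes the value for a key; a null value means the key should be deleted. */
  void write(String key, byte[] value) throws Exception;
}

/** Illustrative write-only replicating map; an assumption-laden sketch, not Helios code. */
final class FoldingReplicatedMap {
  private final Map<String, byte[]> local = new HashMap<>();
  // Keys whose latest local state has not yet reached the remote store.
  private final Set<String> dirty = new LinkedHashSet<>();

  synchronized void put(String key, byte[] value) {
    local.put(key, value);
    dirty.add(key); // adding an already-dirty key is a no-op, which folds the changes
  }

  synchronized void remove(String key) {
    local.remove(key);
    dirty.add(key);
  }

  synchronized byte[] get(String key) {
    return local.get(key); // reads are served locally, never from the remote store
  }

  /** Makes one replication attempt; returns true once no dirty keys remain. */
  synchronized boolean syncOnce(RemoteSink sink) {
    for (String key : new ArrayList<>(dirty)) {
      try {
        sink.write(key, local.get(key));
        dirty.remove(key);
      } catch (Exception e) {
        return false; // the key stays dirty so a later attempt can retry it
      }
    }
    return true;
  }
}
```

A caller that wants the "retry forever" behaviour would invoke `syncOnce` in a loop with some back-off until it returns true.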

Code examples

Code example source: spotify/helios

public static ZooKeeperUpdatingPersistentDirectory create(
    final String name,
    final ZooKeeperClientProvider client,
    final Path stateFile,
    final String path) throws IOException, InterruptedException {
  return new ZooKeeperUpdatingPersistentDirectory(name, client, stateFile, path);
}

Code example source: spotify/helios

/**
 * Returns the {@link TaskStatus}es for all tasks assigned to the current agent.
 */
@Override
public Map<JobId, TaskStatus> getTaskStatuses() {
  final Map<JobId, TaskStatus> statuses = Maps.newHashMap();
  for (final Map.Entry<String, byte[]> entry : this.taskStatuses.entrySet()) {
    try {
      final JobId id = JobId.fromString(entry.getKey());
      final TaskStatus status = Json.read(entry.getValue(), TaskStatus.class);
      statuses.put(id, status);
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }
  return statuses;
}

Code example source: spotify/helios

/**
 * Get the {@link TaskStatus} for the job identified by {@code jobId}.
 */
@Override
public TaskStatus getTaskStatus(final JobId jobId) {
  final byte[] data = taskStatuses.get(jobId.toString());
  if (data == null) {
    return null;
  }
  try {
    return parse(data, TaskStatus.class);
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
}

Code example source: spotify/helios

public byte[] remove(final Object key) throws InterruptedException {
  if (!(key instanceof String)) {
    return null;
  }
  return remove((String) key);
}
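
The untyped `remove(Object)` overload above relies on Java overload resolution: the explicit cast to `String` picks the typed overload at compile time, and any non-String key is treated as absent. A minimal sketch of that pattern follows, using a hypothetical in-memory `StringKeyedStore` in place of the real class.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative only: a String-keyed store with an untyped remove overload. */
final class StringKeyedStore {
  private final Map<String, byte[]> entries = new HashMap<>();

  byte[] put(String key, byte[] value) {
    return entries.put(key, value);
  }

  /** Typed removal: returns the previous value, or null if the key was absent. */
  byte[] remove(String key) {
    return entries.remove(key);
  }

  /** Untyped removal: a non-String object can never be a key, so nothing is removed. */
  byte[] remove(Object key) {
    if (!(key instanceof String)) {
      return null;
    }
    // The cast selects the remove(String) overload, so this does not recurse.
    return remove((String) key);
  }
}
```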

Code example source: spotify/helios

@Override
protected void startUp() throws Exception {
  client("startUp").getConnectionStateListenable().addListener(connectionStateListener);
  reactor.startAsync().awaitRunning();
  reactor.signal();
}

Code example source: spotify/helios

@Override
protected void shutDown() throws Exception {
  tasks.stopAsync().awaitTerminated();
  taskStatuses.stopAsync().awaitTerminated();
  if (historyWriter != null) {
    historyWriter.stopAsync().awaitTerminated();
  }
}

Code example source: spotify/helios

@Override
protected void startUp() throws Exception {
  tasks.startAsync().awaitRunning();
  taskStatuses.startAsync().awaitRunning();
  if (historyWriter != null) {
    historyWriter.startAsync().awaitRunning();
  }
}

Code example source: spotify/helios

private boolean isAlive() {
  return state().ordinal() < STOPPING.ordinal();
}
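
The `isAlive()` check above compares enum ordinals, which only works because the lifecycle states are declared in progression order. The sketch below uses a local `LifecycleState` enum (a hypothetical name) that mirrors the declared order of Guava's `Service.State` constants, so it runs without Guava on the classpath.

```java
/** Local mirror of the declared order of Guava's Service.State constants. */
enum LifecycleState { NEW, STARTING, RUNNING, STOPPING, TERMINATED, FAILED }

final class LifecycleChecks {
  private LifecycleChecks() {}

  /** True for every state declared before STOPPING, that is NEW, STARTING and RUNNING. */
  static boolean isAlive(LifecycleState state) {
    return state.ordinal() < LifecycleState.STOPPING.ordinal();
  }
}
```

The comparison is compact, but it depends on declaration order: reordering the constants would silently change the result, which an explicit set of "alive" states would avoid.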

Code example source: spotify/helios

public ZooKeeperAgentModel(final ZooKeeperClientProvider provider,
                           final String host,
                           final Path stateDirectory,
                           final TaskHistoryWriter historyWriter,
                           final List<EventSender> eventSenders,
                           final String taskStatusEventTopic)
    throws IOException, InterruptedException {
  // TODO(drewc): we're constructing too many heavyweight things in the ctor, these kinds of
  // things should be passed in/provider'd/etc.
  final ZooKeeperClient client = provider.get("ZooKeeperAgentModel_ctor");
  this.agent = checkNotNull(host);
  final Path taskConfigFile = stateDirectory.resolve(TASK_CONFIG_FILENAME);
  this.tasks = client.pathChildrenCache(Paths.configHostJobs(host), taskConfigFile,
      Json.type(Task.class));
  tasks.addListener(new JobsListener());
  final Path taskStatusFile = stateDirectory.resolve(TASK_STATUS_FILENAME);
  this.taskStatuses = ZooKeeperUpdatingPersistentDirectory.create("agent-model-task-statuses",
      provider,
      taskStatusFile,
      Paths.statusHostJobs(host));
  this.historyWriter = historyWriter;
  this.eventSenders = eventSenders;
  this.taskStatusEventTopic = taskStatusEventTopic;
}

Code example source: spotify/helios

/**
 * Set the {@link TaskStatus} for the job identified by {@code jobId}.
 */
@Override
public void setTaskStatus(final JobId jobId, final TaskStatus status)
    throws InterruptedException {
  log.debug("setting task status: {}", status);
  taskStatuses.put(jobId.toString(), status.toJsonBytes());
  if (historyWriter != null) {
    try {
      historyWriter.saveHistoryItem(status);
    } catch (Exception e) {
      // Log error here and keep going as saving task history is not critical.
      // This is to prevent bad data in the queue from screwing up the actually important Helios
      // agent operations.
      log.error("Error saving task status {} to ZooKeeper: {}", status, e);
    }
  }
  final TaskStatusEvent event = new TaskStatusEvent(status, System.currentTimeMillis(), agent);
  final byte[] message = event.toJsonBytes();
  for (final EventSender sender : eventSenders) {
    sender.send(taskStatusEventTopic, message);
  }
}
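
The method above treats history as best effort: a failure while saving a history item is recorded and swallowed so that the status event still reaches every sender. The following sketch isolates that pattern. `HistorySink`, `StatusSender`, and `StatusPublisher` are hypothetical names, and errors are collected in a list rather than logged so that the block has no logging dependency.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical optional sink for history records. */
interface HistorySink {
  void save(String status) throws Exception;
}

/** Hypothetical event sender. */
interface StatusSender {
  void send(String topic, byte[] message);
}

/** Illustrative only: a non-critical side effect followed by a fan-out to all senders. */
final class StatusPublisher {
  private final HistorySink history; // may be null when history is disabled
  private final List<StatusSender> senders;
  private final String topic;
  private final List<String> errors = new ArrayList<>();

  StatusPublisher(HistorySink history, List<StatusSender> senders, String topic) {
    this.history = history;
    this.senders = senders;
    this.topic = topic;
  }

  void publish(String status) {
    if (history != null) {
      try {
        history.save(status);
      } catch (Exception e) {
        // History is not critical: record the failure and keep going.
        errors.add("history failed for " + status + ": " + e.getMessage());
      }
    }
    final byte[] message = status.getBytes(StandardCharsets.UTF_8);
    for (StatusSender sender : senders) {
      sender.send(topic, message);
    }
  }

  List<String> errors() {
    return errors;
  }
}
```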

Code example source: spotify/helios

/**
 * Remove the {@link TaskStatus} for the job identified by {@code jobId}.
 */
@Override
public void removeTaskStatus(final JobId jobId) throws InterruptedException {
  taskStatuses.remove(jobId.toString());
}

Code example source: at.molindo/helios-services

@Override
protected void startUp() throws Exception {
  client("startUp").getConnectionStateListenable().addListener(connectionStateListener);
  reactor.startAsync().awaitRunning();
  reactor.signal();
}

Code example source: at.molindo/helios-services

@Override
protected void shutDown() throws Exception {
  tasks.stopAsync().awaitTerminated();
  taskStatuses.stopAsync().awaitTerminated();
  historyWriter.stopAsync().awaitTerminated();
}

Code example source: at.molindo/helios-services

@Override
protected void startUp() throws Exception {
  tasks.startAsync().awaitRunning();
  taskStatuses.startAsync().awaitRunning();
  historyWriter.startAsync().awaitRunning();
}

Code example source: at.molindo/helios-services

private boolean isAlive() {
  return state().ordinal() < STOPPING.ordinal();
}

Code example source: at.molindo/helios-services

public ZooKeeperAgentModel(final ZooKeeperClientProvider provider,
                           final KafkaClientProvider kafkaProvider, final String host,
                           final Path stateDirectory) throws IOException, InterruptedException {
  // TODO(drewc): we're constructing too many heavyweight things in the ctor, these kinds of
  // things should be passed in/provider'd/etc.
  final ZooKeeperClient client = provider.get("ZooKeeperAgentModel_ctor");
  this.agent = checkNotNull(host);
  final Path taskConfigFile = stateDirectory.resolve(TASK_CONFIG_FILENAME);
  this.tasks = client.pathChildrenCache(Paths.configHostJobs(host), taskConfigFile,
      Json.type(Task.class));
  tasks.addListener(new JobsListener());
  final Path taskStatusFile = stateDirectory.resolve(TASK_STATUS_FILENAME);
  this.taskStatuses = ZooKeeperUpdatingPersistentDirectory.create("agent-model-task-statuses",
      provider,
      taskStatusFile,
      Paths.statusHostJobs(host));
  this.historyWriter = new TaskHistoryWriter(
      host, client, stateDirectory.resolve(TASK_HISTORY_FILENAME));
  this.kafkaSender = new KafkaSender(
      kafkaProvider.getProducer(new StringSerializer(), new ByteArraySerializer()));
}

Code example source: at.molindo/helios-services

/**
 * Set the {@link TaskStatus} for the job identified by {@code jobId}.
 */
@Override
public void setTaskStatus(final JobId jobId, final TaskStatus status)
    throws InterruptedException {
  log.debug("setting task status: {}", status);
  taskStatuses.put(jobId.toString(), status.toJsonBytes());
  try {
    historyWriter.saveHistoryItem(status);
  } catch (Exception e) {
    // Log error here and keep going as saving task history is not critical.
    // This is to prevent bad data in the queue from screwing up the actually important Helios
    // agent operations.
    log.error("Error saving task status {} to ZooKeeper: {}", status, e);
  }
  final TaskStatusEvent event = new TaskStatusEvent(status, System.currentTimeMillis(), agent);
  kafkaSender.send(KafkaRecord.of(TaskStatusEvent.KAFKA_TOPIC, event.toJsonBytes()));
}

Code example source: at.molindo/helios-services

public byte[] remove(final Object key) throws InterruptedException {
  if (!(key instanceof String)) {
    return null;
  }
  return remove((String) key);
}

Code example source: at.molindo/helios-services

public static ZooKeeperUpdatingPersistentDirectory create(
    final String name,
    final ZooKeeperClientProvider client,
    final Path stateFile,
    final String path) throws IOException, InterruptedException {
  return new ZooKeeperUpdatingPersistentDirectory(name, client, stateFile, path);
}

Code example source: at.molindo/helios-services

/**
 * Get the {@link TaskStatus} for the job identified by {@code jobId}.
 */
@Override
public TaskStatus getTaskStatus(final JobId jobId) {
  final byte[] data = taskStatuses.get(jobId.toString());
  if (data == null) {
    return null;
  }
  try {
    return parse(data, TaskStatus.class);
  } catch (IOException e) {
    throw Throwables.propagate(e);
  }
}
