org.apache.storm.utils.Utils.serialize()方法的使用及代码示例

x33g5p2x  于2022-02-01 转载在 其他  
字(9.0k)|赞(0)|评价(0)|浏览(148)

本文整理了Java中org.apache.storm.utils.Utils.serialize()方法的一些代码示例,展示了Utils.serialize()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Utils.serialize()方法的具体详情如下:
包路径:org.apache.storm.utils.Utils
类名称:Utils
方法名:serialize

Utils.serialize介绍

[英]Serialize an object using the configured serialization and then base64 encode it into a string.
[中]使用配置的序列化对对象进行序列化,然后base64将其编码为字符串。

代码示例

代码示例来源:origin: apache/storm

/**
 * Turn a WorkerTokenInfo in a byte array.
 *
 * @param wti what to serialize.
 * @return the resulting byte array.
 */
public static byte[] serializeWorkerTokenInfo(WorkerTokenInfo wti) {
  return Utils.serialize(wti);
}

代码示例来源:origin: apache/storm

private static void assignedAssignmentsToLocal(IStormClusterState clusterState, SupervisorAssignments assignments) {
  if (null == assignments) {
    //unknown error, just skip
    return;
  }
  Map<String, byte[]> serAssignments = new HashMap<>();
  for (Map.Entry<String, Assignment> entry : assignments.get_storm_assignment().entrySet()) {
    serAssignments.put(entry.getKey(), Utils.serialize(entry.getValue()));
  }
  clusterState.syncRemoteAssignments(serAssignments);
}

代码示例来源:origin: apache/storm

@Override
public void workerHeartbeat(String stormId, String node, Long port, ClusterWorkerHeartbeat info) {
  if (info != null) {
    String path = ClusterUtils.workerbeatPath(stormId, node, port);
    stateStorage.set_worker_hb(path, Utils.serialize(info), defaultAcls);
  }
}

代码示例来源:origin: apache/storm

@Override
public void supervisorHeartbeat(String supervisorId, SupervisorInfo info) {
  String path = ClusterUtils.supervisorPath(supervisorId);
  stateStorage.set_ephemeral_node(path, Utils.serialize(info), defaultAcls);
}

代码示例来源:origin: apache/storm

@Override
public void setAssignment(String stormId, Assignment info, Map<String, Object> topoConf) {
  byte[] serAssignment = Utils.serialize(info);
  stateStorage.mkdirs(ClusterUtils.ASSIGNMENTS_SUBTREE, defaultAcls);
  stateStorage.set_data(ClusterUtils.assignmentPath(stormId), Utils.serialize(info), ClusterUtils.mkTopoReadOnlyAcls(topoConf));
  this.assignmentsBackend.keepOrUpdateAssignment(stormId, info);
}

代码示例来源:origin: apache/storm

@Override
public void setCredentials(String stormId, Credentials creds, Map<String, Object> topoConf) {
  List<ACL> aclList = ClusterUtils.mkTopoReadOnlyAcls(topoConf);
  String path = ClusterUtils.credentialsPath(stormId);
  stateStorage.set_data(path, Utils.serialize(creds), aclList);
}

代码示例来源:origin: apache/storm

@Override
public void setTopologyLogConfig(String stormId, LogConfig logConfig, Map<String, Object> topoConf) {
  stateStorage.mkdirs(ClusterUtils.LOGCONFIG_SUBTREE, defaultAcls);
  stateStorage.set_data(ClusterUtils.logConfigPath(stormId), Utils.serialize(logConfig), ClusterUtils.mkTopoReadOnlyAcls(topoConf));
}

代码示例来源:origin: apache/storm

@Override
public void addPrivateWorkerKey(WorkerTokenServiceType type, String topologyId, long keyVersion, PrivateWorkerKey key) {
  assert context.getDaemonType() == DaemonType.NIMBUS;
  stateStorage.mkdirs(ClusterUtils.SECRET_KEYS_SUBTREE, defaultAcls);
  List<ACL> secretAcls = context.getZkSecretAcls(type);
  String path = ClusterUtils.secretKeysPath(type, topologyId, keyVersion);
  LOG.info("Storing private key for {} connecting to a {} at {} with ACL {}", topologyId, type, path, secretAcls);
  stateStorage.set_data(path, Utils.serialize(key), secretAcls);
}

代码示例来源:origin: apache/storm

@Override
public void addNimbusHost(final String nimbusId, final NimbusSummary nimbusSummary) {
  // explicit delete for ephmeral node to ensure this session creates the entry.
  stateStorage.delete_node(ClusterUtils.nimbusPath(nimbusId));
  stateStorage.add_listener((curatorFramework, connectionState) -> {
    LOG.info("Connection state listener invoked, zookeeper connection state has changed to {}", connectionState);
    if (connectionState.equals(ConnectionState.RECONNECTED)) {
      LOG.info("Connection state has changed to reconnected so setting nimbuses entry one more time");
      // explicit delete for ephemeral node to ensure this session creates the entry.
      stateStorage.delete_node(ClusterUtils.nimbusPath(nimbusId));
      stateStorage.set_ephemeral_node(ClusterUtils.nimbusPath(nimbusId), Utils.serialize(nimbusSummary), defaultAcls);
    }
  });
  stateStorage.set_ephemeral_node(ClusterUtils.nimbusPath(nimbusId), Utils.serialize(nimbusSummary), defaultAcls);
}

代码示例来源:origin: apache/storm

/**
 * Update an existing topology .
 * @param topoId the id of the topology
 * @param who who is doing it
 * @param topo the new topology to save
 * @throws AuthorizationException if who is not allowed to update a topology
 * @throws KeyNotFoundException if the topology is not found in the blob store
 * @throws IOException on any error interacting with the blob store
 */
public void updateTopology(final String topoId, final Subject who, final StormTopology topo)
  throws AuthorizationException, KeyNotFoundException, IOException {
  final String key = ConfigUtils.masterStormCodeKey(topoId);
  store.updateBlob(key, Utils.serialize(topo), who);
  List<AccessControl> acl = BlobStoreAclHandler.DEFAULT;
  WithAcl<StormTopology> old = topos.get(topoId);
  if (old != null) {
    acl = old.acl;
  } else {
    acl = store.getBlobMeta(key, who).get_settable().get_acl();
  }
  topos.put(topoId, new WithAcl<>(acl, topo));
}

代码示例来源:origin: apache/storm

/**
 * Add a new topology.
 * @param topoId the id of the topology
 * @param who who is doing it
 * @param topo the topology itself
 * @throws AuthorizationException if who is not allowed to add a topology
 * @throws KeyAlreadyExistsException if the topology already exists
 * @throws IOException on any error interacting with the blob store
 */
public void addTopology(final String topoId, final Subject who, final StormTopology topo)
  throws AuthorizationException, KeyAlreadyExistsException, IOException {
  final String key = ConfigUtils.masterStormCodeKey(topoId);
  final List<AccessControl> acl = BlobStoreAclHandler.DEFAULT;
  SettableBlobMeta meta = new SettableBlobMeta(acl);
  store.createBlob(key, Utils.serialize(topo), meta, who);
  topos.put(topoId, new WithAcl<>(meta.get_acl(), topo));
}

代码示例来源:origin: apache/storm

@Override
public void setWorkerProfileRequest(String stormId, ProfileRequest profileRequest) {
  ProfileAction profileAction = profileRequest.get_action();
  String host = profileRequest.get_nodeInfo().get_node();
  Long port = profileRequest.get_nodeInfo().get_port_iterator().next();
  String path = ClusterUtils.profilerConfigPath(stormId, host, port, profileAction);
  stateStorage.set_data(path, Utils.serialize(profileRequest), defaultAcls);
}

代码示例来源:origin: apache/storm

@Override
public void activateStorm(String stormId, StormBase stormBase, Map<String, Object> topoConf) {
  String path = ClusterUtils.stormPath(stormId);
  stateStorage.mkdirs(ClusterUtils.STORMS_SUBTREE, defaultAcls);
  stateStorage.set_data(path, Utils.serialize(stormBase), ClusterUtils.mkTopoReadOnlyAcls(topoConf));
  this.assignmentsBackend.keepStormId(stormBase.get_name(), stormId);
}

代码示例来源:origin: apache/storm

errorInfo.set_host(node);
errorInfo.set_port(port.intValue());
byte[] serData = Utils.serialize(errorInfo);
stateStorage.mkdirs(path, defaultAcls);
stateStorage.create_sequential(path + ClusterUtils.ZK_SEPERATOR + "e", serData, defaultAcls);

代码示例来源:origin: apache/storm

newElems.set_status(stormBase.get_status());
stateStorage.set_data(ClusterUtils.stormPath(stormId), Utils.serialize(newElems), defaultAcls);

代码示例来源:origin: apache/storm

@Test
public void testGetWorkerHb() throws Exception {
  HBPulse hbPulse = new HBPulse();
  hbPulse.set_id("/foo");
  ClusterWorkerHeartbeat cwh = new ClusterWorkerHeartbeat("some-storm-id", new HashMap(), 1, 1);
  hbPulse.set_details(Utils.serialize(cwh));
  createPaceMakerStateStorage(HBServerMessageType.GET_PULSE_RESPONSE, HBMessageData.pulse(hbPulse));
  stateStorage.get_worker_hb("/foo", false);
  verify(clientMock).send(hbMessageCaptor.capture());
  HBMessage sent = hbMessageCaptor.getValue();
  Assert.assertEquals(HBServerMessageType.GET_PULSE, sent.get_type());
  Assert.assertEquals("/foo", sent.get_data().get_path());
}

代码示例来源:origin: org.apache.storm/storm-core

@Override
public void workerHeartbeat(String stormId, String node, Long port, ClusterWorkerHeartbeat info) {
  if (info != null) {
    String path = ClusterUtils.workerbeatPath(stormId, node, port);
    stateStorage.set_worker_hb(path, Utils.serialize(info), acls);
  }
}

代码示例来源:origin: org.apache.storm/storm-core

@Override
public void supervisorHeartbeat(String supervisorId, SupervisorInfo info) {
  String path = ClusterUtils.supervisorPath(supervisorId);
  stateStorage.set_ephemeral_node(path, Utils.serialize(info), acls);
}

代码示例来源:origin: org.apache.storm/storm-core

@Override
  public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
    LOG.info("Connection state listener invoked, zookeeper connection state has changed to {}", connectionState);
    if (connectionState.equals(ConnectionState.RECONNECTED)) {
      LOG.info("Connection state has changed to reconnected so setting nimbuses entry one more time");
      // explicit delete for ephmeral node to ensure this session creates the entry.
      stateStorage.delete_node(ClusterUtils.nimbusPath(nimbusId));
      stateStorage.set_ephemeral_node(ClusterUtils.nimbusPath(nimbusId), Utils.serialize(nimbusSummary), acls);
    }
  }
});

代码示例来源:origin: org.apache.storm/storm-core

@Override
public void activateStorm(String stormId, StormBase stormBase, Map<String, Object> topoConf) {
  String path = ClusterUtils.stormPath(stormId);
  stateStorage.mkdirs(ClusterUtils.STORMS_SUBTREE, acls);
  stateStorage.set_data(path, Utils.serialize(stormBase), ClusterUtils.mkTopoReadOnlyAcls(topoConf));
}

相关文章

Utils类方法