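This article collects code examples for the Java method org.apache.storm.utils.Utils.serialize() and shows how Utils.serialize() is used in practice. The examples are drawn mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of Utils.serialize() are as follows:
Package path: org.apache.storm.utils.Utils
Class name: Utils
Method name: serialize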
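Description: Serialize an object using the configured serialization and then base64 encode it into a string. (This wording appears to describe a string-returning variant; in every example below, the result of Utils.serialize() is used as a byte[].)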
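The description names two steps, serialization and base64 encoding, while the examples that follow use the byte[] form directly. The following is a minimal sketch of both forms using only the JDK. Plain Java serialization stands in for Storm's configured serialization (an assumption; Storm's own mechanism is not shown here), and SerializeSketch and its method names are hypothetical, not part of Storm.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Base64;

// Hypothetical helper for illustration only; not Storm's Utils class.
class SerializeSketch {

    // Step 1: object -> byte[] (assumption: Java serialization as the stand-in).
    static byte[] toBytes(Serializable obj) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        }
        return bos.toByteArray();
    }

    // Inverse of step 1: byte[] -> object.
    static Object fromBytes(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return ois.readObject();
        }
    }

    // Step 2: byte[] -> base64 string, for places that can only carry text.
    static String toBase64String(Serializable obj) throws IOException {
        return Base64.getEncoder().encodeToString(toBytes(obj));
    }

    // Inverse of step 2: base64 string -> object.
    static Object fromBase64String(String encoded) throws IOException, ClassNotFoundException {
        return fromBytes(Base64.getDecoder().decode(encoded));
    }

    public static void main(String[] args) throws Exception {
        String encoded = toBase64String("heartbeat");
        System.out.println(fromBase64String(encoded)); // prints "heartbeat"
    }
}
```

The byte[] form suits stores that accept raw bytes, as the state-storage and blob-store calls in the examples do; the base64 form is only needed where the payload must travel as text.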
Code example source: origin: apache/storm
/**
 * Turn a WorkerTokenInfo into a byte array.
 *
 * @param wti what to serialize.
 * @return the resulting byte array.
 */
public static byte[] serializeWorkerTokenInfo(WorkerTokenInfo wti) {
    return Utils.serialize(wti);
}
Code example source: origin: apache/storm
private static void assignedAssignmentsToLocal(IStormClusterState clusterState, SupervisorAssignments assignments) {
    if (null == assignments) {
        // unknown error, just skip
        return;
    }
    Map<String, byte[]> serAssignments = new HashMap<>();
    for (Map.Entry<String, Assignment> entry : assignments.get_storm_assignment().entrySet()) {
        serAssignments.put(entry.getKey(), Utils.serialize(entry.getValue()));
    }
    clusterState.syncRemoteAssignments(serAssignments);
}
Code example source: origin: apache/storm
@Override
public void workerHeartbeat(String stormId, String node, Long port, ClusterWorkerHeartbeat info) {
    if (info != null) {
        String path = ClusterUtils.workerbeatPath(stormId, node, port);
        stateStorage.set_worker_hb(path, Utils.serialize(info), defaultAcls);
    }
}
Code example source: origin: apache/storm
@Override
public void supervisorHeartbeat(String supervisorId, SupervisorInfo info) {
    String path = ClusterUtils.supervisorPath(supervisorId);
    stateStorage.set_ephemeral_node(path, Utils.serialize(info), defaultAcls);
}
Code example source: origin: apache/storm
@Override
public void setAssignment(String stormId, Assignment info, Map<String, Object> topoConf) {
    // Serialize once and reuse the bytes instead of serializing the assignment twice.
    byte[] serAssignment = Utils.serialize(info);
    stateStorage.mkdirs(ClusterUtils.ASSIGNMENTS_SUBTREE, defaultAcls);
    stateStorage.set_data(ClusterUtils.assignmentPath(stormId), serAssignment, ClusterUtils.mkTopoReadOnlyAcls(topoConf));
    this.assignmentsBackend.keepOrUpdateAssignment(stormId, info);
}
Code example source: origin: apache/storm
@Override
public void setCredentials(String stormId, Credentials creds, Map<String, Object> topoConf) {
    List<ACL> aclList = ClusterUtils.mkTopoReadOnlyAcls(topoConf);
    String path = ClusterUtils.credentialsPath(stormId);
    stateStorage.set_data(path, Utils.serialize(creds), aclList);
}
Code example source: origin: apache/storm
@Override
public void setTopologyLogConfig(String stormId, LogConfig logConfig, Map<String, Object> topoConf) {
    stateStorage.mkdirs(ClusterUtils.LOGCONFIG_SUBTREE, defaultAcls);
    stateStorage.set_data(ClusterUtils.logConfigPath(stormId), Utils.serialize(logConfig), ClusterUtils.mkTopoReadOnlyAcls(topoConf));
}
Code example source: origin: apache/storm
@Override
public void addPrivateWorkerKey(WorkerTokenServiceType type, String topologyId, long keyVersion, PrivateWorkerKey key) {
    assert context.getDaemonType() == DaemonType.NIMBUS;
    stateStorage.mkdirs(ClusterUtils.SECRET_KEYS_SUBTREE, defaultAcls);
    List<ACL> secretAcls = context.getZkSecretAcls(type);
    String path = ClusterUtils.secretKeysPath(type, topologyId, keyVersion);
    LOG.info("Storing private key for {} connecting to a {} at {} with ACL {}", topologyId, type, path, secretAcls);
    stateStorage.set_data(path, Utils.serialize(key), secretAcls);
}
Code example source: origin: apache/storm
@Override
public void addNimbusHost(final String nimbusId, final NimbusSummary nimbusSummary) {
    // explicit delete for ephemeral node to ensure this session creates the entry.
    stateStorage.delete_node(ClusterUtils.nimbusPath(nimbusId));
    stateStorage.add_listener((curatorFramework, connectionState) -> {
        LOG.info("Connection state listener invoked, zookeeper connection state has changed to {}", connectionState);
        if (connectionState.equals(ConnectionState.RECONNECTED)) {
            LOG.info("Connection state has changed to reconnected so setting nimbuses entry one more time");
            // explicit delete for ephemeral node to ensure this session creates the entry.
            stateStorage.delete_node(ClusterUtils.nimbusPath(nimbusId));
            stateStorage.set_ephemeral_node(ClusterUtils.nimbusPath(nimbusId), Utils.serialize(nimbusSummary), defaultAcls);
        }
    });
    stateStorage.set_ephemeral_node(ClusterUtils.nimbusPath(nimbusId), Utils.serialize(nimbusSummary), defaultAcls);
}
Code example source: origin: apache/storm
/**
 * Update an existing topology.
 * @param topoId the id of the topology
 * @param who who is doing it
 * @param topo the new topology to save
 * @throws AuthorizationException if who is not allowed to update a topology
 * @throws KeyNotFoundException if the topology is not found in the blob store
 * @throws IOException on any error interacting with the blob store
 */
public void updateTopology(final String topoId, final Subject who, final StormTopology topo)
    throws AuthorizationException, KeyNotFoundException, IOException {
    final String key = ConfigUtils.masterStormCodeKey(topoId);
    store.updateBlob(key, Utils.serialize(topo), who);
    List<AccessControl> acl = BlobStoreAclHandler.DEFAULT;
    WithAcl<StormTopology> old = topos.get(topoId);
    if (old != null) {
        acl = old.acl;
    } else {
        acl = store.getBlobMeta(key, who).get_settable().get_acl();
    }
    topos.put(topoId, new WithAcl<>(acl, topo));
}
Code example source: origin: apache/storm
/**
 * Add a new topology.
 * @param topoId the id of the topology
 * @param who who is doing it
 * @param topo the topology itself
 * @throws AuthorizationException if who is not allowed to add a topology
 * @throws KeyAlreadyExistsException if the topology already exists
 * @throws IOException on any error interacting with the blob store
 */
public void addTopology(final String topoId, final Subject who, final StormTopology topo)
    throws AuthorizationException, KeyAlreadyExistsException, IOException {
    final String key = ConfigUtils.masterStormCodeKey(topoId);
    final List<AccessControl> acl = BlobStoreAclHandler.DEFAULT;
    SettableBlobMeta meta = new SettableBlobMeta(acl);
    store.createBlob(key, Utils.serialize(topo), meta, who);
    topos.put(topoId, new WithAcl<>(meta.get_acl(), topo));
}
Code example source: origin: apache/storm
@Override
public void setWorkerProfileRequest(String stormId, ProfileRequest profileRequest) {
    ProfileAction profileAction = profileRequest.get_action();
    String host = profileRequest.get_nodeInfo().get_node();
    Long port = profileRequest.get_nodeInfo().get_port_iterator().next();
    String path = ClusterUtils.profilerConfigPath(stormId, host, port, profileAction);
    stateStorage.set_data(path, Utils.serialize(profileRequest), defaultAcls);
}
Code example source: origin: apache/storm
@Override
public void activateStorm(String stormId, StormBase stormBase, Map<String, Object> topoConf) {
    String path = ClusterUtils.stormPath(stormId);
    stateStorage.mkdirs(ClusterUtils.STORMS_SUBTREE, defaultAcls);
    stateStorage.set_data(path, Utils.serialize(stormBase), ClusterUtils.mkTopoReadOnlyAcls(topoConf));
    this.assignmentsBackend.keepStormId(stormBase.get_name(), stormId);
}
Code example source: origin: apache/storm
errorInfo.set_host(node);
errorInfo.set_port(port.intValue());
byte[] serData = Utils.serialize(errorInfo);
stateStorage.mkdirs(path, defaultAcls);
stateStorage.create_sequential(path + ClusterUtils.ZK_SEPERATOR + "e", serData, defaultAcls);
Code example source: origin: apache/storm
newElems.set_status(stormBase.get_status());
stateStorage.set_data(ClusterUtils.stormPath(stormId), Utils.serialize(newElems), defaultAcls);
Code example source: origin: apache/storm
@Test
public void testGetWorkerHb() throws Exception {
    HBPulse hbPulse = new HBPulse();
    hbPulse.set_id("/foo");
    ClusterWorkerHeartbeat cwh = new ClusterWorkerHeartbeat("some-storm-id", new HashMap<>(), 1, 1);
    hbPulse.set_details(Utils.serialize(cwh));
    createPaceMakerStateStorage(HBServerMessageType.GET_PULSE_RESPONSE, HBMessageData.pulse(hbPulse));
    stateStorage.get_worker_hb("/foo", false);
    verify(clientMock).send(hbMessageCaptor.capture());
    HBMessage sent = hbMessageCaptor.getValue();
    Assert.assertEquals(HBServerMessageType.GET_PULSE, sent.get_type());
    Assert.assertEquals("/foo", sent.get_data().get_path());
}
Code example source: origin: org.apache.storm/storm-core
@Override
public void workerHeartbeat(String stormId, String node, Long port, ClusterWorkerHeartbeat info) {
    if (info != null) {
        String path = ClusterUtils.workerbeatPath(stormId, node, port);
        stateStorage.set_worker_hb(path, Utils.serialize(info), acls);
    }
}
Code example source: origin: org.apache.storm/storm-core
@Override
public void supervisorHeartbeat(String supervisorId, SupervisorInfo info) {
    String path = ClusterUtils.supervisorPath(supervisorId);
    stateStorage.set_ephemeral_node(path, Utils.serialize(info), acls);
}
Code example source: origin: org.apache.storm/storm-core
@Override
public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
    LOG.info("Connection state listener invoked, zookeeper connection state has changed to {}", connectionState);
    if (connectionState.equals(ConnectionState.RECONNECTED)) {
        LOG.info("Connection state has changed to reconnected so setting nimbuses entry one more time");
        // explicit delete for ephemeral node to ensure this session creates the entry.
        stateStorage.delete_node(ClusterUtils.nimbusPath(nimbusId));
        stateStorage.set_ephemeral_node(ClusterUtils.nimbusPath(nimbusId), Utils.serialize(nimbusSummary), acls);
    }
}
});
Code example source: origin: org.apache.storm/storm-core
@Override
public void activateStorm(String stormId, StormBase stormBase, Map<String, Object> topoConf) {
    String path = ClusterUtils.stormPath(stormId);
    stateStorage.mkdirs(ClusterUtils.STORMS_SUBTREE, acls);
    stateStorage.set_data(path, Utils.serialize(stormBase), ClusterUtils.mkTopoReadOnlyAcls(topoConf));
}