本文整理了Java中org.apache.storm.utils.Utils.reverseMap()
方法的一些代码示例,展示了Utils.reverseMap()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Utils.reverseMap()
方法的具体详情如下:
包路径:org.apache.storm.utils.Utils
类名称:Utils
方法名:reverseMap
[英]"[[:a 1] [:b 1] [:c 2]} -> {1 [:a :b] 2 :c}" Reverses an assoc-list style Map like reverseMap(Map...)
[中]“[[:a1][:b1][:c2]}->{1[:a:b]2:c}”反转与reverseMap(映射…)类似的关联列表样式映射
代码示例来源:origin: apache/storm
private Set<Set<ExecutorDetails>> computeWorkerSpecs(TopologyDetails topology) {
Map<String, List<ExecutorDetails>> compExecutors = Utils.reverseMap(topology.getExecutorToComponent());
List<ExecutorDetails> allExecutors = new ArrayList<ExecutorDetails>();
Collection<List<ExecutorDetails>> values = compExecutors.values();
for (List<ExecutorDetails> eList : values) {
allExecutors.addAll(eList);
}
int numWorkers = topology.getNumWorkers();
int bucketIndex = 0;
Map<Integer, Set<ExecutorDetails>> bucketExecutors = new HashMap<Integer, Set<ExecutorDetails>>(numWorkers);
for (ExecutorDetails executor : allExecutors) {
Set<ExecutorDetails> executors = bucketExecutors.get(bucketIndex);
if (executors == null) {
executors = new HashSet<ExecutorDetails>();
bucketExecutors.put(bucketIndex, executors);
}
executors.add(executor);
bucketIndex = (bucketIndex + 1) % numWorkers;
}
return new HashSet<Set<ExecutorDetails>>(bucketExecutors.values());
}
代码示例来源:origin: apache/storm
/**
* @return seq of task ids that receive messages from this worker
*/
private Set<Integer> workerOutboundTasks() {
WorkerTopologyContext context = getWorkerTopologyContext();
Set<String> components = new HashSet<>();
for (Integer taskId : localTaskIds) {
for (Map<String, Grouping> value : context.getTargets(context.getComponentId(taskId)).values()) {
components.addAll(value.keySet());
}
}
Set<Integer> outboundTasks = new HashSet<>();
for (Map.Entry<String, List<Integer>> entry : Utils.reverseMap(taskToComponent).entrySet()) {
if (components.contains(entry.getKey())) {
outboundTasks.addAll(entry.getValue());
}
}
return outboundTasks;
}
代码示例来源:origin: apache/storm
private static List<List<Long>> changedExecutors(Map<List<Long>, NodeInfo> map, Map<List<Long>,
List<Object>> newExecToNodePort) {
HashMap<NodeInfo, List<List<Long>>> tmpSlotAssigned = map == null ? new HashMap<>() : Utils.reverseMap(map);
HashMap<List<Object>, List<List<Long>>> slotAssigned = new HashMap<>();
for (Entry<NodeInfo, List<List<Long>>> entry : tmpSlotAssigned.entrySet()) {
NodeInfo ni = entry.getKey();
List<Object> key = new ArrayList<>(2);
key.add(ni.get_node());
key.add(ni.get_port_iterator().next());
List<List<Long>> value = new ArrayList<>(entry.getValue());
value.sort(Comparator.comparing(a -> a.get(0)));
slotAssigned.put(key, value);
}
HashMap<List<Object>, List<List<Long>>> tmpNewSlotAssigned = newExecToNodePort == null ? new HashMap<>() :
Utils.reverseMap(newExecToNodePort);
HashMap<List<Object>, List<List<Long>>> newSlotAssigned = new HashMap<>();
for (Entry<List<Object>, List<List<Long>>> entry : tmpNewSlotAssigned.entrySet()) {
List<List<Long>> value = new ArrayList<>(entry.getValue());
value.sort(Comparator.comparing(a -> a.get(0)));
newSlotAssigned.put(entry.getKey(), value);
}
Map<List<Object>, List<List<Long>>> diff = mapDiff(slotAssigned, newSlotAssigned);
List<List<Long>> ret = new ArrayList<>();
for (List<List<Long>> val : diff.values()) {
ret.addAll(val);
}
return ret;
}
代码示例来源:origin: apache/storm
public static Map<WorkerSlot, List<ExecutorDetails>> getAliveAssignedWorkerSlotExecutors(Cluster cluster, String topologyId) {
SchedulerAssignment existingAssignment = cluster.getAssignmentById(topologyId);
Map<ExecutorDetails, WorkerSlot> executorToSlot = null;
if (existingAssignment != null) {
executorToSlot = existingAssignment.getExecutorToSlot();
}
return Utils.reverseMap(executorToSlot);
}
代码示例来源:origin: apache/storm
private List<List<Integer>> computeExecutors(String topoId, StormBase base, Map<String, Object> topoConf,
StormTopology topology)
throws KeyNotFoundException, AuthorizationException, IOException, InvalidTopologyException {
assert (base != null);
Map<String, Integer> compToExecutors = base.get_component_executors();
List<List<Integer>> ret = new ArrayList<>();
if (compToExecutors != null) {
Map<Integer, String> taskInfo = StormCommon.stormTaskInfo(topology, topoConf);
Map<String, List<Integer>> compToTaskList = Utils.reverseMap(taskInfo);
for (Entry<String, List<Integer>> entry : compToTaskList.entrySet()) {
List<Integer> comps = entry.getValue();
comps.sort(null);
Integer numExecutors = compToExecutors.get(entry.getKey());
if (numExecutors != null) {
List<List<Integer>> partitioned = Utils.partitionFixed(numExecutors, comps);
for (List<Integer> partition : partitioned) {
ret.add(Arrays.asList(partition.get(0), partition.get(partition.size() - 1)));
}
}
}
}
return ret;
}
代码示例来源:origin: apache/storm
public static void scheduleTopologiesEvenly(Topologies topologies, Cluster cluster) {
for (TopologyDetails topology : cluster.needsSchedulingTopologies()) {
String topologyId = topology.getId();
Map<ExecutorDetails, WorkerSlot> newAssignment = scheduleTopology(topology, cluster);
Map<WorkerSlot, List<ExecutorDetails>> nodePortToExecutors = Utils.reverseMap(newAssignment);
for (Map.Entry<WorkerSlot, List<ExecutorDetails>> entry : nodePortToExecutors.entrySet()) {
WorkerSlot nodePort = entry.getKey();
List<ExecutorDetails> executors = entry.getValue();
cluster.assign(nodePort, topologyId, executors);
}
}
}
代码示例来源:origin: apache/storm
/**
* need to take executor->node+port in explicitly so that we don't run into a situation where a long dead worker with a skewed clock
* overrides all the timestamps. By only checking heartbeats with an assigned node+port, and only reading executors from that heartbeat
* that are actually assigned, we avoid situations like that.
*
* @param stormId topology id
* @param executorNodePort executor id -> node + port
* @return mapping of executorInfo -> executor beat
*/
@Override
public Map<ExecutorInfo, ExecutorBeat> executorBeats(String stormId, Map<List<Long>, NodeInfo> executorNodePort) {
Map<ExecutorInfo, ExecutorBeat> executorWhbs = new HashMap<>();
Map<NodeInfo, List<List<Long>>> nodePortExecutors = Utils.reverseMap(executorNodePort);
for (Map.Entry<NodeInfo, List<List<Long>>> entry : nodePortExecutors.entrySet()) {
String node = entry.getKey().get_node();
Long port = entry.getKey().get_port_iterator().next();
ClusterWorkerHeartbeat whb = getWorkerHeartbeat(stormId, node, port);
List<ExecutorInfo> executorInfoList = new ArrayList<>();
for (List<Long> list : entry.getValue()) {
executorInfoList.add(new ExecutorInfo(list.get(0).intValue(), list.get(list.size() - 1).intValue()));
}
if (whb != null) {
executorWhbs.putAll(ClusterUtils.convertExecutorBeats(executorInfoList, whb));
}
}
return executorWhbs;
}
代码示例来源:origin: apache/storm
private Map<String, List<AssignmentInfo>> hostAssignments(Cluster cluster) {
Collection<SchedulerAssignment> assignmentValues = cluster.getAssignments().values();
Map<String, List<AssignmentInfo>> hostAssignments = new HashMap<String, List<AssignmentInfo>>();
for (SchedulerAssignment sa : assignmentValues) {
Map<WorkerSlot, List<ExecutorDetails>> slotExecutors = Utils.reverseMap(sa.getExecutorToSlot());
Set<Map.Entry<WorkerSlot, List<ExecutorDetails>>> entries = slotExecutors.entrySet();
for (Map.Entry<WorkerSlot, List<ExecutorDetails>> entry : entries) {
WorkerSlot slot = entry.getKey();
List<ExecutorDetails> executors = entry.getValue();
String host = cluster.getHost(slot.getNodeId());
AssignmentInfo ass = new AssignmentInfo(slot, sa.getTopologyId(), new HashSet<ExecutorDetails>(executors));
List<AssignmentInfo> executorList = hostAssignments.get(host);
if (executorList == null) {
executorList = new ArrayList<AssignmentInfo>();
hostAssignments.put(host, executorList);
}
executorList.add(ass);
}
}
return hostAssignments;
}
代码示例来源:origin: apache/storm
this.componentToSortedTasks = Utils.reverseMap(taskToComponent);
this.componentToSortedTasks.values().forEach(Collections::sort);
this.endpointSocketLock = new ReentrantReadWriteLock();
代码示例来源:origin: apache/storm
Map<String, List<Integer>> compToTasks = Utils.reverseMap(info.taskToComponent);
if (compToTasks.containsKey(StormCommon.EVENTLOGGER_COMPONENT_ID)) {
List<Integer> tasks = compToTasks.get(StormCommon.EVENTLOGGER_COMPONENT_ID);
代码示例来源:origin: org.apache.storm/storm-core
/**
* need to take executor->node+port in explicitly so that we don't run into a situation where a long dead worker with a skewed clock overrides all the
* timestamps. By only checking heartbeats with an assigned node+port, and only reading executors from that heartbeat that are actually assigned, we avoid
* situations like that
*
* @param stormId
* @param executorNodePort
* @return
*/
@Override
public Map<ExecutorInfo, ExecutorBeat> executorBeats(String stormId, Map<List<Long>, NodeInfo> executorNodePort) {
Map<ExecutorInfo, ExecutorBeat> executorWhbs = new HashMap<>();
Map<NodeInfo, List<List<Long>>> nodePortExecutors = Utils.reverseMap(executorNodePort);
for (Map.Entry<NodeInfo, List<List<Long>>> entry : nodePortExecutors.entrySet()) {
String node = entry.getKey().get_node();
Long port = entry.getKey().get_port_iterator().next();
ClusterWorkerHeartbeat whb = getWorkerHeartbeat(stormId, node, port);
List<ExecutorInfo> executorInfoList = new ArrayList<>();
for (List<Long> list : entry.getValue()) {
executorInfoList.add(new ExecutorInfo(list.get(0).intValue(), list.get(list.size() - 1).intValue()));
}
if (whb != null)
executorWhbs.putAll(ClusterUtils.convertExecutorBeats(executorInfoList, whb));
}
return executorWhbs;
}
内容来源于网络,如有侵权,请联系作者删除!