Usage of the org.apache.storm.utils.Utils.reverseMap() method, with code examples


This article collects code examples of the Java method org.apache.storm.utils.Utils.reverseMap() and shows how it is used in practice. The examples were extracted from selected open-source projects found on platforms such as GitHub, Stack Overflow, and Maven, and should serve as useful references. Details of the Utils.reverseMap() method follow:
Package path: org.apache.storm.utils.Utils
Class name: Utils
Method name: reverseMap

Introduction to Utils.reverseMap

The Javadoc describes the method in Clojure-style notation: [[:a 1] [:b 1] [:c 2]] -> {1 [:a :b], 2 [:c]}. In other words, it reverses an assoc-list style Map: each distinct value becomes a key in the result, mapped to the list of original keys that carried that value. In the Map overload used throughout the examples below, a Map<K, V> becomes a Map<V, List<K>>.
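
To illustrate the reversal, here is a minimal, self-contained sketch (not taken from the projects below); the sample keys and values are made up for illustration, and the printed order may vary:

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.storm.utils.Utils;

public class ReverseMapDemo {
    public static void main(String[] args) {
        // Hypothetical input: keys "a" and "b" share the value 1, "c" maps to 2.
        Map<String, Integer> input = new HashMap<>();
        input.put("a", 1);
        input.put("b", 1);
        input.put("c", 2);

        // Keys that shared a value are collected into a list under that value.
        Map<Integer, List<String>> reversed = Utils.reverseMap(input);
        System.out.println(reversed); // prints something like {1=[a, b], 2=[c]}
    }
}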

Code examples

Code example source: apache/storm

private Set<Set<ExecutorDetails>> computeWorkerSpecs(TopologyDetails topology) {
  Map<String, List<ExecutorDetails>> compExecutors = Utils.reverseMap(topology.getExecutorToComponent());
  List<ExecutorDetails> allExecutors = new ArrayList<ExecutorDetails>();
  Collection<List<ExecutorDetails>> values = compExecutors.values();
  for (List<ExecutorDetails> eList : values) {
    allExecutors.addAll(eList);
  }
  int numWorkers = topology.getNumWorkers();
  int bucketIndex = 0;
  Map<Integer, Set<ExecutorDetails>> bucketExecutors = new HashMap<Integer, Set<ExecutorDetails>>(numWorkers);
  for (ExecutorDetails executor : allExecutors) {
    Set<ExecutorDetails> executors = bucketExecutors.get(bucketIndex);
    if (executors == null) {
      executors = new HashSet<ExecutorDetails>();
      bucketExecutors.put(bucketIndex, executors);
    }
    executors.add(executor);
    bucketIndex = (bucketIndex + 1) % numWorkers;
  }
  return new HashSet<Set<ExecutorDetails>>(bucketExecutors.values());
}

Code example source: apache/storm

/**
 * @return seq of task ids that receive messages from this worker
 */
private Set<Integer> workerOutboundTasks() {
  WorkerTopologyContext context = getWorkerTopologyContext();
  Set<String> components = new HashSet<>();
  for (Integer taskId : localTaskIds) {
    for (Map<String, Grouping> value : context.getTargets(context.getComponentId(taskId)).values()) {
      components.addAll(value.keySet());
    }
  }
  Set<Integer> outboundTasks = new HashSet<>();
  for (Map.Entry<String, List<Integer>> entry : Utils.reverseMap(taskToComponent).entrySet()) {
    if (components.contains(entry.getKey())) {
      outboundTasks.addAll(entry.getValue());
    }
  }
  return outboundTasks;
}

Code example source: apache/storm

private static List<List<Long>> changedExecutors(Map<List<Long>, NodeInfo> map, Map<List<Long>,
  List<Object>> newExecToNodePort) {
  HashMap<NodeInfo, List<List<Long>>> tmpSlotAssigned = map == null ? new HashMap<>() : Utils.reverseMap(map);
  HashMap<List<Object>, List<List<Long>>> slotAssigned = new HashMap<>();
  for (Entry<NodeInfo, List<List<Long>>> entry : tmpSlotAssigned.entrySet()) {
    NodeInfo ni = entry.getKey();
    List<Object> key = new ArrayList<>(2);
    key.add(ni.get_node());
    key.add(ni.get_port_iterator().next());
    List<List<Long>> value = new ArrayList<>(entry.getValue());
    value.sort(Comparator.comparing(a -> a.get(0)));
    slotAssigned.put(key, value);
  }
  HashMap<List<Object>, List<List<Long>>> tmpNewSlotAssigned = newExecToNodePort == null ? new HashMap<>() :
    Utils.reverseMap(newExecToNodePort);
  HashMap<List<Object>, List<List<Long>>> newSlotAssigned = new HashMap<>();
  for (Entry<List<Object>, List<List<Long>>> entry : tmpNewSlotAssigned.entrySet()) {
    List<List<Long>> value = new ArrayList<>(entry.getValue());
    value.sort(Comparator.comparing(a -> a.get(0)));
    newSlotAssigned.put(entry.getKey(), value);
  }
  Map<List<Object>, List<List<Long>>> diff = mapDiff(slotAssigned, newSlotAssigned);
  List<List<Long>> ret = new ArrayList<>();
  for (List<List<Long>> val : diff.values()) {
    ret.addAll(val);
  }
  return ret;
}

Code example source: apache/storm

public static Map<WorkerSlot, List<ExecutorDetails>> getAliveAssignedWorkerSlotExecutors(Cluster cluster, String topologyId) {
  SchedulerAssignment existingAssignment = cluster.getAssignmentById(topologyId);
  Map<ExecutorDetails, WorkerSlot> executorToSlot = null;
  if (existingAssignment != null) {
    executorToSlot = existingAssignment.getExecutorToSlot();
  }
  return Utils.reverseMap(executorToSlot);
}
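
Note that in this helper, executorToSlot remains null when the topology has no existing assignment, and the possibly-null map is passed straight to Utils.reverseMap(); the code therefore relies on reverseMap() accepting a null argument (presumably returning an empty map) rather than throwing.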

Code example source: apache/storm

private List<List<Integer>> computeExecutors(String topoId, StormBase base, Map<String, Object> topoConf,
                       StormTopology topology)
  throws KeyNotFoundException, AuthorizationException, IOException, InvalidTopologyException {
  assert (base != null);
  Map<String, Integer> compToExecutors = base.get_component_executors();
  List<List<Integer>> ret = new ArrayList<>();
  if (compToExecutors != null) {
    Map<Integer, String> taskInfo = StormCommon.stormTaskInfo(topology, topoConf);
    Map<String, List<Integer>> compToTaskList = Utils.reverseMap(taskInfo);
    for (Entry<String, List<Integer>> entry : compToTaskList.entrySet()) {
      List<Integer> comps = entry.getValue();
      comps.sort(null);
      Integer numExecutors = compToExecutors.get(entry.getKey());
      if (numExecutors != null) {
        List<List<Integer>> partitioned = Utils.partitionFixed(numExecutors, comps);
        for (List<Integer> partition : partitioned) {
          ret.add(Arrays.asList(partition.get(0), partition.get(partition.size() - 1)));
        }
      }
    }
  }
  return ret;
}

Code example source: apache/storm

public static void scheduleTopologiesEvenly(Topologies topologies, Cluster cluster) {
  for (TopologyDetails topology : cluster.needsSchedulingTopologies()) {
    String topologyId = topology.getId();
    Map<ExecutorDetails, WorkerSlot> newAssignment = scheduleTopology(topology, cluster);
    Map<WorkerSlot, List<ExecutorDetails>> nodePortToExecutors = Utils.reverseMap(newAssignment);
    for (Map.Entry<WorkerSlot, List<ExecutorDetails>> entry : nodePortToExecutors.entrySet()) {
      WorkerSlot nodePort = entry.getKey();
      List<ExecutorDetails> executors = entry.getValue();
      cluster.assign(nodePort, topologyId, executors);
    }
  }
}

Code example source: apache/storm

/**
 * need to take executor->node+port in explicitly so that we don't run into a situation where a long dead worker with a skewed clock
 * overrides all the timestamps. By only checking heartbeats with an assigned node+port, and only reading executors from that heartbeat
 * that are actually assigned, we avoid situations like that.
 *
 * @param stormId          topology id
 * @param executorNodePort executor id -> node + port
 * @return mapping of executorInfo -> executor beat
 */
@Override
public Map<ExecutorInfo, ExecutorBeat> executorBeats(String stormId, Map<List<Long>, NodeInfo> executorNodePort) {
  Map<ExecutorInfo, ExecutorBeat> executorWhbs = new HashMap<>();
  Map<NodeInfo, List<List<Long>>> nodePortExecutors = Utils.reverseMap(executorNodePort);
  for (Map.Entry<NodeInfo, List<List<Long>>> entry : nodePortExecutors.entrySet()) {
    String node = entry.getKey().get_node();
    Long port = entry.getKey().get_port_iterator().next();
    ClusterWorkerHeartbeat whb = getWorkerHeartbeat(stormId, node, port);
    List<ExecutorInfo> executorInfoList = new ArrayList<>();
    for (List<Long> list : entry.getValue()) {
      executorInfoList.add(new ExecutorInfo(list.get(0).intValue(), list.get(list.size() - 1).intValue()));
    }
    if (whb != null) {
      executorWhbs.putAll(ClusterUtils.convertExecutorBeats(executorInfoList, whb));
    }
  }
  return executorWhbs;
}

Code example source: apache/storm

private Map<String, List<AssignmentInfo>> hostAssignments(Cluster cluster) {
  Collection<SchedulerAssignment> assignmentValues = cluster.getAssignments().values();
  Map<String, List<AssignmentInfo>> hostAssignments = new HashMap<String, List<AssignmentInfo>>();
  for (SchedulerAssignment sa : assignmentValues) {
    Map<WorkerSlot, List<ExecutorDetails>> slotExecutors = Utils.reverseMap(sa.getExecutorToSlot());
    Set<Map.Entry<WorkerSlot, List<ExecutorDetails>>> entries = slotExecutors.entrySet();
    for (Map.Entry<WorkerSlot, List<ExecutorDetails>> entry : entries) {
      WorkerSlot slot = entry.getKey();
      List<ExecutorDetails> executors = entry.getValue();
      String host = cluster.getHost(slot.getNodeId());
      AssignmentInfo ass = new AssignmentInfo(slot, sa.getTopologyId(), new HashSet<ExecutorDetails>(executors));
      List<AssignmentInfo> executorList = hostAssignments.get(host);
      if (executorList == null) {
        executorList = new ArrayList<AssignmentInfo>();
        hostAssignments.put(host, executorList);
      }
      executorList.add(ass);
    }
  }
  return hostAssignments;
}

Code example source: apache/storm

this.componentToSortedTasks = Utils.reverseMap(taskToComponent);
this.componentToSortedTasks.values().forEach(Collections::sort);
this.endpointSocketLock = new ReentrantReadWriteLock();

Code example source: apache/storm

Map<String, List<Integer>> compToTasks = Utils.reverseMap(info.taskToComponent);
if (compToTasks.containsKey(StormCommon.EVENTLOGGER_COMPONENT_ID)) {
  List<Integer> tasks = compToTasks.get(StormCommon.EVENTLOGGER_COMPONENT_ID);

Code example source: org.apache.storm/storm-core

/**
 * need to take executor->node+port in explicitly so that we don't run into a situation where a long dead worker with a skewed clock overrides all the
 * timestamps. By only checking heartbeats with an assigned node+port, and only reading executors from that heartbeat that are actually assigned, we avoid
 * situations like that
 * 
 * @param stormId
 * @param executorNodePort
 * @return
 */
@Override
public Map<ExecutorInfo, ExecutorBeat> executorBeats(String stormId, Map<List<Long>, NodeInfo> executorNodePort) {
  Map<ExecutorInfo, ExecutorBeat> executorWhbs = new HashMap<>();
  Map<NodeInfo, List<List<Long>>> nodePortExecutors = Utils.reverseMap(executorNodePort);
  for (Map.Entry<NodeInfo, List<List<Long>>> entry : nodePortExecutors.entrySet()) {
    String node = entry.getKey().get_node();
    Long port = entry.getKey().get_port_iterator().next();
    ClusterWorkerHeartbeat whb = getWorkerHeartbeat(stormId, node, port);
    List<ExecutorInfo> executorInfoList = new ArrayList<>();
    for (List<Long> list : entry.getValue()) {
      executorInfoList.add(new ExecutorInfo(list.get(0).intValue(), list.get(list.size() - 1).intValue()));
    }
    if (whb != null)
      executorWhbs.putAll(ClusterUtils.convertExecutorBeats(executorInfoList, whb));
  }
  return executorWhbs;
}
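
For reference, judging purely from how the method is used in the examples above (a Map<K, V> in, a Map<V, List<K>> out, with a null map tolerated), the Map overload behaves roughly like the sketch below. This is an illustrative equivalent, not the actual Storm implementation, and the class name is made up:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative equivalent of the Map overload of Utils.reverseMap():
// group the keys of the input map by their value. Not the actual Storm source.
public final class ReverseMapSketch {
    public static <K, V> HashMap<V, List<K>> reverseMap(Map<K, V> map) {
        HashMap<V, List<K>> reversed = new HashMap<>();
        if (map == null) {
            return reversed; // a null input yields an empty result
        }
        for (Map.Entry<K, V> entry : map.entrySet()) {
            reversed.computeIfAbsent(entry.getValue(), v -> new ArrayList<>())
                    .add(entry.getKey());
        }
        return reversed;
    }
}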
