
x33g5p2x  于2022-02-01 转载在 其他  



[英]A Vertex in a graph.

A Vertex has an Oplet instance that will be executed at runtime, zero or more input ports, and zero or more output ports. Each output port is represented by a Connector instance.


代码示例来源:origin: org.apache.edgent/edgent-spi-graph

public <N extends Source<P>, P> Connector<P> source(N oplet) {
  return insert(oplet, 0, 1).getConnectors().get(0);

代码示例来源:origin: apache/incubator-edgent

assertSame(op, v.getInstance());
v.getConnectors().get(0).connect(v2, 0);

代码示例来源:origin: org.apache.edgent/edgent-runtime-etiao

@SuppressWarnings({ "rawtypes", "unchecked" })
public VertexType(Vertex<? extends Oplet<?, ?>, ?, ?> value, IdMapper<String> ids) { = (value instanceof ExecutableVertex) ?
    ids.add(value, ((ExecutableVertex) value).getInvocationId()) :
    // Can't get an id from the vertex, generate unique value
  this.invocation = new InvocationType(value.getInstance());

代码示例来源:origin: apache/incubator-edgent

@SuppressWarnings({ "rawtypes", "unchecked" })
public VertexType(Vertex<? extends Oplet<?, ?>, ?, ?> value, IdMapper<String> ids) { = (value instanceof ExecutableVertex) ?
    ids.add(value, ((ExecutableVertex) value).getInvocationId()) :
    // Can't get an id from the vertex, generate unique value
  this.invocation = new InvocationType(value.getInstance());

代码示例来源:origin: apache/incubator-edgent

public <N extends Source<P>, P> Connector<P> source(N oplet) {
  return insert(oplet, 0, 1).getConnectors().get(0);

代码示例来源:origin: apache/incubator-edgent

   * Add counter metrics to all the topology's streams.
   * <p>
   * {@link CounterOp} oplets are inserted between every two graph
   * vertices with the following exceptions:
   * <ul>
   * <li>Oplets are only inserted upstream from a FanOut oplet.</li>
   * <li>If a chain of Peek oplets exists between oplets A and B, a Metric 
   * oplet is inserted after the last Peek, right upstream from oplet B.</li>
   * <li>If a chain of Peek oplets is followed by a FanOut, a metric oplet is
   * inserted between the last Peek and the FanOut oplet.</li>
   * <li>Oplets are not inserted immediately downstream from another 
   * {@code CounterOp} oplet (but they are inserted upstream from one.)</li>
   * </ul>
   * The implementation is not idempotent: Calling the method twice 
   * will insert a new set of metric oplets into the graph.
   * @param t
   *            The topology
   * @see org.apache.edgent.graph.Graph#peekAll(org.apache.edgent.function.Supplier, org.apache.edgent.function.Predicate) Graph.peekAll()
  public static void counter(Topology t) {
    // peekAll() embodies the above exclusion semantics
        () -> new CounterOp<>(), 
        v -> !(v.getInstance() instanceof CounterOp)

代码示例来源:origin: org.apache.edgent/edgent-spi-graph

public <N extends Oplet<C, P>, C, P> Connector<P> pipe(Connector<C> output, N oplet) {
  Vertex<N, C, P> pipeVertex = insert(oplet, 1, 1);
  output.connect(pipeVertex, 0);
  return pipeVertex.getConnectors().get(0);

代码示例来源:origin: apache/incubator-edgent

public void testMetricsEverywhere() throws Exception {
  Topology t = newTopology();
  TStream<String> s = t.strings("a", "b", "c");
  // Condition inserts a sink
  Condition<Long> tc = t.getTester().tupleCount(s, 3);
  Graph g = t.graph();
  Collection<Vertex<? extends Oplet<?, ?>, ?, ?>> vertices = g.getVertices();
  // Two vertices before submission
  assertEquals(2, vertices.size());
  complete(t, tc);
  // At least three vertices after submission
  // (the provider may have added other oplets as well)
  Collection<Vertex<? extends Oplet<?, ?>, ?, ?>> verticesAfterSubmit = g.getVertices();
  assertTrue("size="+verticesAfterSubmit.size(), verticesAfterSubmit.size() >= 3);
  // There is exactly one vertex for a metric oplet
  int numOplets = 0;
  for (Vertex<? extends Oplet<?, ?>, ?, ?> v : verticesAfterSubmit) {
    Oplet<?,?> oplet = v.getInstance();
    if (oplet instanceof CounterOp) {
  assertEquals(1, numOplets);

代码示例来源:origin: apache/incubator-edgent

public <N extends Oplet<C, P>, C, P> Connector<P> pipe(Connector<C> output, N oplet) {
  Vertex<N, C, P> pipeVertex = insert(oplet, 1, 1);
  output.connect(pipeVertex, 0);
  return pipeVertex.getConnectors().get(0);

代码示例来源:origin: apache/incubator-edgent

PeriodicSource<?> src = null;
for (Vertex<? extends Oplet<?, ?>, ?, ?> v : vertices) {
  Oplet<?,?> op = v.getInstance();
  assertTrue(op instanceof PeriodicSource);
  src = (PeriodicSource<?>) op;

代码示例来源:origin: org.apache.edgent/edgent-spi-topology

public <U> TStream<U> fanin(FanIn<T,U> fanin, List<TStream<T>> others) {
 if (others.isEmpty() || others.size() == 1 && others.contains(this)) 
  throw new IllegalArgumentException("others");  // use pipe()
 if (new HashSet<>(others).size() != others.size())
  throw new IllegalArgumentException("others has dups");
 for (TStream<T> other : others)
 others = new ArrayList<>(others);
 others.add(0, this);
 Vertex<Oplet<T,U>, T, U> fanInVertex = graph().insert(fanin, others.size(), 1);
 int inputPort = 0;
 for (TStream<T> other : others) {
   ConnectorStream<G,T> cs = (ConnectorStream<G, T>) other;
   cs.connector.connect(fanInVertex, inputPort++);
 return derived(fanInVertex.getConnectors().get(0));

代码示例来源:origin: apache/incubator-edgent

Oplet<?,?> oplet = v.getInstance();
if (oplet instanceof StreamScope) {

代码示例来源:origin: apache/incubator-edgent

public <U> TStream<U> fanin(FanIn<T,U> fanin, List<TStream<T>> others) {
 if (others.isEmpty() || others.size() == 1 && others.contains(this)) 
  throw new IllegalArgumentException("others");  // use pipe()
 if (new HashSet<>(others).size() != others.size())
  throw new IllegalArgumentException("others has dups");
 for (TStream<T> other : others)
 others = new ArrayList<>(others);
 others.add(0, this);
 Vertex<Oplet<T,U>, T, U> fanInVertex = graph().insert(fanin, others.size(), 1);
 int inputPort = 0;
 for (TStream<T> other : others) {
   ConnectorStream<G,T> cs = (ConnectorStream<G, T>) other;
   cs.connector.connect(fanInVertex, inputPort++);
 return derived(fanInVertex.getConnectors().get(0));

代码示例来源:origin: org.apache.edgent/edgent-spi-topology

public List<TStream<T>> split(int n, ToIntFunction<T> splitter) {
  if (n <= 0)
    throw new IllegalArgumentException("n <= 0");
  Split<T> splitOp = new Split<T>(splitter);
  Vertex<Split<T>, T, T> splitVertex = graph().insert(splitOp, 1, n);
  connector.connect(splitVertex, 0);
  List<TStream<T>> outputs = new ArrayList<>(n);
  for (int i = 0; i < n; i++) {
  return outputs;

代码示例来源:origin: apache/incubator-edgent

public List<TStream<T>> split(int n, ToIntFunction<T> splitter) {
  if (n <= 0)
    throw new IllegalArgumentException("n <= 0");
  Split<T> splitOp = new Split<T>(splitter);
  Vertex<Split<T>, T, T> splitVertex = graph().insert(splitOp, 1, n);
  connector.connect(splitVertex, 0);
  List<TStream<T>> outputs = new ArrayList<>(n);
  for (int i = 0; i < n; i++) {
  return outputs;

代码示例来源:origin: apache/incubator-edgent

public TStream<T> union(Set<TStream<T>> others) {
  if (others.isEmpty())
    return this;
  if (others.size() == 1 && others.contains(this))
    return this;
  for (TStream<T> other : others)
  // Create a set we can modify and add this stream
  others = new HashSet<>(others);
  Union<T> fanInOp = new Union<T>();
  Vertex<Union<T>, T, T> fanInVertex = graph().insert(fanInOp, others.size(), 1);
  int inputPort = 0;
  for (TStream<T> other : others) {
    ConnectorStream<G,T> cs = (ConnectorStream<G, T>) other;
    cs.connector.connect(fanInVertex, inputPort++);
  return derived(fanInVertex.getConnectors().get(0));

代码示例来源:origin: org.apache.edgent/edgent-spi-topology

public TStream<T> union(Set<TStream<T>> others) {
  if (others.isEmpty())
    return this;
  if (others.size() == 1 && others.contains(this))
    return this;
  for (TStream<T> other : others)
  // Create a set we can modify and add this stream
  others = new HashSet<>(others);
  Union<T> fanInOp = new Union<T>();
  Vertex<Union<T>, T, T> fanInVertex = graph().insert(fanInOp, others.size(), 1);
  int inputPort = 0;
  for (TStream<T> other : others) {
    ConnectorStream<G,T> cs = (ConnectorStream<G, T>) other;
    cs.connector.connect(fanInVertex, inputPort++);
  return derived(fanInVertex.getConnectors().get(0));

代码示例来源:origin: org.apache.edgent/edgent-spi-graph

@SuppressWarnings({ "unchecked", "rawtypes" })
  public void peekAll(Supplier<? extends Peek<?>> supplier, Predicate<Vertex<?, ?, ?>> select) {
    // Select vertices which satisfy the specified predicate
    List<Vertex<?, ?, ?>> vertices = new ArrayList<>();
    for (Vertex<?, ?, ?> v : getVertices()) {
      if (select.test(v)) {
    // Insert peek oplets on isConnected() output ports
    for (Vertex<?, ?, ?> v : vertices) {
      List<? extends Connector<?>> connectors = v.getConnectors();
      for (Connector<?> c : connectors) {
        if (c.isConnected()) {
          Peek<?> oplet = supplier.get();
          c.peek((Peek) oplet);

代码示例来源:origin: apache/incubator-edgent

@SuppressWarnings({ "unchecked", "rawtypes" })
  public void peekAll(Supplier<? extends Peek<?>> supplier, Predicate<Vertex<?, ?, ?>> select) {
    // Select vertices which satisfy the specified predicate
    List<Vertex<?, ?, ?>> vertices = new ArrayList<>();
    for (Vertex<?, ?, ?> v : getVertices()) {
      if (select.test(v)) {
    // Insert peek oplets on isConnected() output ports
    for (Vertex<?, ?, ?> v : vertices) {
      List<? extends Connector<?>> connectors = v.getConnectors();
      for (Connector<?> c : connectors) {
        if (c.isConnected()) {
          Peek<?> oplet = supplier.get();
          c.peek((Peek) oplet);

代码示例来源:origin: apache/incubator-edgent

public void testGraphToJson2() {
  Graph g = getGraph();
  TestOp<String, Integer> op1 = new TestOp<>();
  Vertex<TestOp<String, Integer>, String, Integer> v = g.insert(op1, 1, 1);
  TestOp<Integer, Integer> op2 = new TestOp<>();
  /*Connector<Integer> out2 = */g.pipe(v.getConnectors().get(0), op2);
  Gson gson = new GsonBuilder().setPrettyPrinting().create();
  String json = gson.toJson(new GraphType(g));
  GraphType gt2 = new Gson().fromJson(json, GraphType.class);
  assertEquals(2, gt2.getVertices().size());
  assertEquals(1, gt2.getEdges().size());
