代码示例来源:origin: apache/flink
/**
 * Copies the ID of the given vertex into field {@code f0} of {@code outTuple}
 * and returns that same tuple instance.
 *
 * <p>NOTE(review): this excerpt is truncated (no closing brace). {@code outTuple}
 * is declared outside the visible lines, and its {@code f1} field is not assigned
 * here -- confirm where it is initialized.
 *
 * @param vertex the vertex whose ID is written to {@code outTuple.f0}
 * @return the {@code outTuple} object referenced by this function
 */
public Tuple2<K, Either<NullValue, Message>> map(Vertex<K, VV> vertex) {
outTuple.f0 = vertex.getId();
return outTuple;
代码示例来源:origin: apache/flink
/**
 * Populates the six tuple fields of this projection.
 *
 * <p>NOTE(review): the excerpt ends without a closing brace; the enclosing class
 * is not visible.
 *
 * @param connectingVertex vertex whose ID is stored in {@code f0} and whose value is stored in {@code f1}
 * @param sourceVertexValue stored in {@code f2}
 * @param targetVertexValue stored in {@code f3}
 * @param sourceEdgeValue stored in {@code f4}
 * @param targetEdgeValue stored in {@code f5}
 */
public Projection(
Vertex<KC, VVC> connectingVertex,
VV sourceVertexValue, VV targetVertexValue,
EV sourceEdgeValue, EV targetEdgeValue) {
this.f0 = connectingVertex.getId();
this.f1 = connectingVertex.getValue();
this.f2 = sourceVertexValue;
this.f3 = targetVertexValue;
this.f4 = sourceEdgeValue;
this.f5 = targetEdgeValue;
代码示例来源:origin: apache/flink
/**
 * Co-groups the edges with the vertex state and, when a vertex state is present,
 * hands the edge iterator, the collector and the vertex ID to {@code scatterFunction}.
 *
 * <p>NOTE(review): the excerpt is truncated after the {@code set(...)} call; whatever
 * follows (presumably the actual message sending) is not visible here.
 *
 * @param edges edges of the co-group
 * @param state vertex state of the co-group; only the first element is read in the visible lines
 * @param out collector passed on to {@code scatterFunction}
 * @throws Exception declared by the signature
 */
public void coGroup(Iterable<Edge<K, EV>> edges,
Iterable<Vertex<K, VV>> state,
Collector<Tuple2<K, Message>> out) throws Exception {
final Iterator<Vertex<K, VV>> stateIter = state.iterator();
// Nothing is done in the visible code when the state side is empty.
if (stateIter.hasNext()) {
Vertex<K, VV> newVertexState = stateIter.next();
scatterFunction.set(edges.iterator(), out, newVertexState.getId());
代码示例来源:origin: apache/flink
/**
 * Builds a new vertex with the same ID whose value is only the first component
 * ({@code f0}) of the input vertex's {@code Tuple3} value; the two {@code LongValue}
 * components are dropped.
 *
 * <p>NOTE(review): closing brace is missing from this excerpt.
 *
 * @param vertex vertex carrying a {@code Tuple3<VV, LongValue, LongValue>} value
 * @return a new vertex holding the ID and the {@code f0} component of the value
 */
public Vertex<K, VV> map(Vertex<K, Tuple3<VV, LongValue, LongValue>> vertex) {
return new Vertex<>(vertex.getId(), vertex.getValue().f0);
代码示例来源:origin: apache/flink
/**
 * Co-groups the edges with a vertex state whose value is a
 * {@code Tuple3<VV, LongValue, LongValue>}. When a state element is present, its ID
 * and the {@code f0} component of its value are copied into {@code nextVertex}, and
 * the edge iterator, collector and vertex ID are passed to {@code scatterFunction}.
 *
 * <p>NOTE(review): the excerpt is truncated after the {@code set(...)} call. The two
 * {@code LongValue} components are presumably degrees -- they are not read in the
 * visible lines; confirm against the full source.
 *
 * @param edges edges of the co-group
 * @param state vertex state of the co-group; only the first element is read in the visible lines
 * @param out collector passed on to {@code scatterFunction}
 * @throws Exception declared by the signature
 */
public void coGroup(Iterable<Edge<K, EV>> edges, Iterable<Vertex<K, Tuple3<VV, LongValue, LongValue>>> state,
Collector<Tuple2<K, Message>> out) throws Exception {
final Iterator<Vertex<K, Tuple3<VV, LongValue, LongValue>>> stateIter = state.iterator();
if (stateIter.hasNext()) {
Vertex<K, Tuple3<VV, LongValue, LongValue>> vertexWithDegrees = stateIter.next();
// Direct tuple-field access: f0 is the vertex ID, f1.f0 is the first component of the value.
nextVertex.f0 = vertexWithDegrees.f0;
nextVertex.f1 = vertexWithDegrees.f1.f0;
scatterFunction.set(edges.iterator(), out, vertexWithDegrees.getId());
代码示例来源:origin: apache/flink
/**
 * Maps a triplet to a {@code Tuple3} whose first two fields are the source and
 * target vertex IDs.
 *
 * <p>NOTE(review): the excerpt is cut off in the middle of the return expression;
 * the third (Double) component is not visible here.
 *
 * @param triplet triplet providing the source and target vertices
 * @return a tuple starting with (source ID, target ID)
 * @throws Exception declared by the signature
 */
public Tuple3<Long, Long, Double> map(Triplet<Long, Point, Double> triplet)
throws Exception {
Vertex<Long, Point> srcVertex = triplet.getSrcVertex();
Vertex<Long, Point> trgVertex = triplet.getTrgVertex();
return new Tuple3<>(srcVertex.getId(), trgVertex.getId(),
代码示例来源:origin: apache/flink
/**
 * Computes a candidate minimum distance for the vertex from the incoming messages
 * and sends {@code minDistance + edge value} to edge targets when the candidate is
 * smaller than the vertex's current value.
 *
 * <p>NOTE(review): closing braces are stripped from this excerpt, so the exact
 * nesting of the {@code if} relative to the message loop, and any statement that
 * stores the new value on the vertex, cannot be confirmed from the visible lines.
 *
 * @param vertex the vertex being processed; its value is compared against the candidate distance
 * @param messages incoming Double messages; the minimum of them is folded into the candidate
 */
public void compute(Vertex<Long, Double> vertex, MessageIterator<Double> messages) {
// Start from 0 for the vertex whose ID equals srcId, otherwise from +infinity.
double minDistance = (vertex.getId().equals(srcId)) ? 0d : Double.POSITIVE_INFINITY;
for (Double msg : messages) {
minDistance = Math.min(minDistance, msg);
if (minDistance < vertex.getValue()) {
for (Edge<Long, Double> e: getEdges()) {
sendMessageTo(e.getTarget(), minDistance + e.getValue());
代码示例来源:origin: apache/flink
/**
 * Wraps the vertex's Long value in a {@code Tuple2} together with the constant
 * {@code 1.0} and returns a new vertex with the same ID.
 *
 * <p>NOTE(review): closing brace is missing from this excerpt; the meaning of the
 * {@code 1.0} is not shown here.
 *
 * @param vertex vertex with a Long value
 * @return a new vertex whose value is {@code (original value, 1.0)}
 */
public Vertex<K, Tuple2<Long, Double>> map(Vertex<K, Long> vertex) {
return new Vertex<>(vertex.getId(), new Tuple2<>(vertex.getValue(), 1.0));
代码示例来源:origin: apache/flink
/**
 * Reduces a group of vertices: the ID and value of the first vertex encountered are
 * taken as the group representative, and {@code createGroupRepresentativeTuple} is
 * called with the representative ID, the group value and the group count.
 *
 * <p>NOTE(review): closing braces and possibly further statements are missing from
 * this excerpt. In the visible lines {@code vertexGroupCount} is never incremented
 * and {@code out} is never used directly -- confirm against the full source.
 *
 * @param values vertices belonging to one group
 * @param out collector for the resulting group items
 * @throws Exception declared by the signature
 */
public void reduce(Iterable<Vertex<K, VV>> values, Collector<VertexGroupItem<K, VV>> out) throws Exception {
K vertexGroupRepresentativeID = null;
long vertexGroupCount = 0L;
VV vertexGroupValue = null;
boolean isFirstElement = true;
for (Vertex<K, VV> vertex : values) {
if (isFirstElement) {
// take final group representative vertex id from first tuple
vertexGroupRepresentativeID = vertex.getId();
vertexGroupValue = vertex.getValue();
isFirstElement = false;
// no need to set group value for those tuples
createGroupRepresentativeTuple(vertexGroupRepresentativeID, vertexGroupValue, vertexGroupCount);
代码示例来源:origin: apache/flink
/**
 * Co-groups (vertex, message) pairs with the vertex's edges. When at least one pair
 * is present, the vertex state is read from the first pair, {@code computeFunction}
 * is given the vertex ID, the edge iterator and the collector, and then
 * {@code computeFunction.compute} is invoked with the vertex state and a message iterator.
 *
 * <p>NOTE(review): this excerpt is heavily truncated -- the initializer of
 * {@code vertexIter} is missing, closing braces are stripped, and the statements
 * that feed {@code messageIter} (in both the first-superstep and the later-superstep
 * branch) are not visible.
 *
 * @param messages pairs of vertex state and either a NullValue or a message
 * @param edgesIterator edges of the vertex
 * @param out collector passed on to {@code computeFunction}
 * @throws Exception declared by the signature
 */
public void coGroup(
Iterable<Tuple2<Vertex<K, VV>, Either<NullValue, Message>>> messages,
Iterable<Edge<K, EV>> edgesIterator,
Collector<Either<Vertex<K, VV>, Tuple2<K, Message>>> out) throws Exception {
final Iterator<Tuple2<Vertex<K, VV>, Either<NullValue, Message>>> vertexIter =
if (vertexIter.hasNext()) {
final Tuple2<Vertex<K, VV>, Either<NullValue, Message>> first = vertexIter.next();
final Vertex<K, VV> vertexState = first.f0;
final MessageIterator<Message> messageIter = new MessageIterator<>();
if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
// there are no messages during the 1st superstep
else {
// Double cast through Iterator<?> to view the pairs with a wildcard first component.
Iterator<Tuple2<?, Either<NullValue, Message>>> downcastIter =
(Iterator<Tuple2<?, Either<NullValue, Message>>>) (Iterator<?>) vertexIter;
computeFunction.set(vertexState.getId(), edgesIterator.iterator(), out);
computeFunction.compute(vertexState, messageIter);
代码示例来源:origin: apache/flink
/**
 * Scans the vertex's edges for the one with the largest Integer value and emits
 * the pair (vertex ID, target of that edge).
 *
 * <p>Only values strictly greater than the running maximum (which starts at 0)
 * replace it; if no edge qualifies, {@code topSong} remains the empty string.
 *
 * <p>NOTE(review): closing braces are stripped, so whether {@code out.collect} runs
 * once after the loop or inside it cannot be confirmed from this excerpt.
 *
 * @param vertex the vertex whose ID is emitted
 * @param edges edges of the vertex; each edge value is compared against the running maximum
 * @param out collector receiving (vertex ID, top target) pairs
 * @throws Exception declared by the signature
 */
public void iterateEdges(Vertex<String, NullValue> vertex,
Iterable<Edge<String, Integer>> edges, Collector<Tuple2<String, String>> out) throws Exception {
int maxPlaycount = 0;
String topSong = "";
for (Edge<String, Integer> edge : edges) {
if (edge.getValue() > maxPlaycount) {
maxPlaycount = edge.getValue();
topSong = edge.getTarget();
out.collect(new Tuple2<>(vertex.getId(), topSong));
代码示例来源:origin: apache/flink
/**
 * Emits a new vertex with the same ID whose value is a {@code Tuple3} of the
 * original vertex value and the {@code f1} and {@code f2} fields of {@code degrees}.
 *
 * <p>NOTE(review): closing brace is missing from this excerpt; {@code degrees.f0}
 * is not read in the visible lines.
 *
 * @param vertex the vertex to enrich
 * @param degrees tuple whose {@code f1} and {@code f2} fields are attached to the vertex value
 * @param out collector receiving the enriched vertex
 * @throws Exception declared by the signature
 */
public void join(Vertex<K, VV> vertex, Tuple3<K, LongValue, LongValue> degrees,
Collector<Vertex<K, Tuple3<VV, LongValue, LongValue>>> out) throws Exception {
out.collect(new Vertex<>(vertex.getId(),
new Tuple3<>(vertex.getValue(), degrees.f1, degrees.f2)));
代码示例来源:origin: apache/flink
/**
 * Sends {@code Double.MAX_VALUE} messages depending on the superstep number:
 * in superstep 1, only when the vertex ID equals the source of
 * {@code edgeToBeRemoved}; in later supersteps, once per edge returned by
 * {@code getEdges()}.
 *
 * <p>NOTE(review): closing braces are stripped from this excerpt, so the nesting
 * of the second {@code if} is inferred from the conditions only.
 *
 * @param vertex the vertex being processed
 * @throws Exception declared by the signature
 */
public void sendMessages(Vertex<Long, Double> vertex) throws Exception {
if (getSuperstepNumber() == 1) {
if (vertex.getId().equals(edgeToBeRemoved.getSource())) {
// activate the edge target
// NOTE(review): the comment above says "target", but the message is addressed to
// edgeToBeRemoved.getSource() -- confirm which endpoint is intended.
sendMessageTo(edgeToBeRemoved.getSource(), Double.MAX_VALUE);
if (getSuperstepNumber() > 1) {
// invalidate all edges
// Each message is addressed to edge.getSource(), not edge.getTarget().
for (Edge<Long, Double> edge : getEdges()) {
sendMessageTo(edge.getSource(), Double.MAX_VALUE);
代码示例来源:origin: com.alibaba.blink/flink-gelly
/**
 * Copies the ID of the given vertex into field {@code f0} of {@code outTuple}
 * and returns that same tuple instance.
 *
 * <p>NOTE(review): this excerpt is truncated (no closing brace). {@code outTuple}
 * is declared outside the visible lines, and its {@code f1} field is not assigned
 * here -- confirm where it is initialized.
 *
 * @param vertex the vertex whose ID is written to {@code outTuple.f0}
 * @return the {@code outTuple} object referenced by this function
 */
public Tuple2<K, Either<NullValue, Message>> map(Vertex<K, VV> vertex) {
outTuple.f0 = vertex.getId();
return outTuple;
代码示例来源:origin: vasia/gelly-streaming
/**
 * Accepts a vertex only when its ID is not contained in {@code keys}.
 *
 * <p>NOTE(review): closing braces are stripped from this excerpt; {@code keys} is
 * declared outside the visible lines, and no insertion into it is shown here.
 *
 * @param vertex the vertex to test
 * @return {@code true} if {@code keys} does not contain the vertex ID, {@code false} otherwise
 * @throws Exception declared by the signature
 */
public boolean filter(Vertex<K, NullValue> vertex) throws Exception {
if (!keys.contains(vertex.getId())) {
return true;
return false;
代码示例来源:origin: org.apache.flink/flink-gelly_2.11
/**
 * Populates the six tuple fields of this projection.
 *
 * <p>NOTE(review): the excerpt ends without a closing brace; the enclosing class
 * is not visible.
 *
 * @param connectingVertex vertex whose ID is stored in {@code f0} and whose value is stored in {@code f1}
 * @param sourceVertexValue stored in {@code f2}
 * @param targetVertexValue stored in {@code f3}
 * @param sourceEdgeValue stored in {@code f4}
 * @param targetEdgeValue stored in {@code f5}
 */
public Projection(
Vertex<KC, VVC> connectingVertex,
VV sourceVertexValue, VV targetVertexValue,
EV sourceEdgeValue, EV targetEdgeValue) {
this.f0 = connectingVertex.getId();
this.f1 = connectingVertex.getValue();
this.f2 = sourceVertexValue;
this.f3 = targetVertexValue;
this.f4 = sourceEdgeValue;
this.f5 = targetEdgeValue;
代码示例来源:origin: vasia/gelly-streaming
/**
 * Accepts a vertex only when its Long ID is strictly greater than 1.
 *
 * <p>NOTE(review): closing brace is missing from this excerpt.
 *
 * @param vertex the vertex to test
 * @return {@code true} if the vertex ID is greater than 1
 * @throws Exception declared by the signature
 */
public boolean filter(Vertex<Long, NullValue> vertex) throws Exception {
return vertex.getId() > 1;
代码示例来源:origin: vasia/gelly-streaming
/**
 * Adds the incoming value to the running total kept for the vertex ID in
 * {@code localDegrees} and returns a new vertex carrying the updated total.
 *
 * <p>NOTE(review): closing braces are stripped from this excerpt;
 * {@code localDegrees} is declared outside the visible lines.
 *
 * @param degree vertex whose ID is the key and whose value is the amount to add
 * @return a new vertex with the same ID and the accumulated total for that ID
 * @throws Exception declared by the signature
 */
public Vertex<K, Long> map(Vertex<K, Long> degree) throws Exception {
K key = degree.getId();
// Seed unseen keys with 0 so the addition below always finds an entry.
if (!localDegrees.containsKey(key)) {
localDegrees.put(key, 0L);
localDegrees.put(key, localDegrees.get(key) + degree.getValue());
return new Vertex<>(key, localDegrees.get(key));
代码示例来源:origin: org.apache.flink/flink-gelly_2.10
/**
 * Co-groups the edges with the vertex state and, when a vertex state is present,
 * hands the edge iterator, the collector and the vertex ID to {@code scatterFunction}.
 *
 * <p>NOTE(review): the excerpt is truncated after the {@code set(...)} call; whatever
 * follows (presumably the actual message sending) is not visible here.
 *
 * @param edges edges of the co-group
 * @param state vertex state of the co-group; only the first element is read in the visible lines
 * @param out collector passed on to {@code scatterFunction}
 * @throws Exception declared by the signature
 */
public void coGroup(Iterable<Edge<K, EV>> edges,
Iterable<Vertex<K, VV>> state,
Collector<Tuple2<K, Message>> out) throws Exception {
final Iterator<Vertex<K, VV>> stateIter = state.iterator();
// Nothing is done in the visible code when the state side is empty.
if (stateIter.hasNext()) {
Vertex<K, VV> newVertexState = stateIter.next();
scatterFunction.set(edges.iterator(), out, newVertexState.getId());
代码示例来源:origin: com.alibaba.blink/flink-gelly
/**
 * Emits a new vertex with the same ID whose value is a {@code Tuple3} of the
 * original vertex value and the {@code f1} and {@code f2} fields of {@code degrees}.
 *
 * <p>NOTE(review): closing brace is missing from this excerpt; {@code degrees.f0}
 * is not read in the visible lines.
 *
 * @param vertex the vertex to enrich
 * @param degrees tuple whose {@code f1} and {@code f2} fields are attached to the vertex value
 * @param out collector receiving the enriched vertex
 * @throws Exception declared by the signature
 */
public void join(Vertex<K, VV> vertex, Tuple3<K, LongValue, LongValue> degrees,
Collector<Vertex<K, Tuple3<VV, LongValue, LongValue>>> out) throws Exception {
out.collect(new Vertex<>(vertex.getId(),
new Tuple3<>(vertex.getValue(), degrees.f1, degrees.f2)));