flink gelly扩展edge类并在dataset中使用

igetnqfo  于 2021-06-21  发布在  Flink


  1. public class TemporalEdgev3<K, V> extends Edge<K, Tuple3<V,Integer,Integer>> {
  2. /*
  3. Creates new temporaledge with only null values
  4. */
  5. public TemporalEdgev3() {}
  6. /*
  7. * Constructor to make a temporal edge version 2, has 5 input values but makes a
  8. * typle 3 which is compatible with Gelly
  9. * */
  10. public TemporalEdgev3(K src, K trg, V val, Integer start, Integer end) {
  11. this.f0 = src;
  12. this.f1 = trg;
  13. this.f2 = new Tuple3<V,Integer,Integer>(val,start,end);
  14. }


  1. // a temporal set created with Flink, now we need to make it into a temporal set into gelly
  2. DataSet<Tuple5<Long,Long, Double,Integer, Integer>> temporalset = env.readCsvFile("./datasets/testdata")
  3. .fieldDelimiter(",") // node IDs are separated by spaces
  4. .ignoreComments("%") // comments start with "%"
  5. .types(Long.class,Long.class,Double.class,Integer.class,Integer.class); // read the node IDs as Longs
  6. DataSet<TemporalEdgev3<Long,Double>> edgeset3 = temporalset.map(new MapFunction<Tuple5<Long, Long, Double, Integer, Integer>, TemporalEdgev3<Long, Double>>() {
  7. @Override
  8. public TemporalEdgev3<Long, Double> map(Tuple5<Long, Long, Double, Integer, Integer> value) throws Exception {
  9. return new TemporalEdgev3<Long, Double>(value.f0,value.f1,value.f2,value.f3,value.f4);
  10. }
  11. });
  12. DataSet<Edge<Long,Tuple3<Double,Integer,Integer>>> edgeset4 = temporalset.map(new MapFunction<Tuple5<Long, Long, Double, Integer, Integer>, Edge<Long, Tuple3<Double, Integer, Integer>>>() {
  13. @Override
  14. public Edge<Long, Tuple3<Double, Integer, Integer>> map(Tuple5<Long, Long, Double, Integer, Integer> value) throws Exception {
  15. return new Edge<Long, Tuple3<Double, Integer, Integer>>(value.f0,value.f1, new Tuple3<Double, Integer, Integer>(value.f2,value.f3,value.f4));
  16. }
  17. });
  18. Graph<Long, NullValue, Tuple3<Double,Integer,Integer>> temporalgraph = Graph.fromDataSet(edgeset4,env);
  19. Graph<Long,NullValue, Tuple3<Double,Integer,Integer>> temporalgraph2 = Graph.fromDataSet(edgeset3,env);


  1. fromDataSet
  2. (org.apache.flink.api.java.DataSet<org.apache.flink.graph.Edge<K,EV>>,
  3. ExecutionEnvironment)
  4. in Graph cannot be applied
  5. to
  6. (org.apache.flink.api.java.DataSet<flink.gelly.school.TemporalEdgev3<java.lang.Long,java.lang.Double>>,
  7. ExecutionEnvironment)
  9. reason: no instance(s) of type variable(s) EV, K exist so that TemporalEdgev3<Long, Double> conforms to Edge<K, EV>




你不需要延长时间 Edge 类型。你可以简单地使用 Tuple3 或自定义边值类型。
您的图形声明 Graph<Long, NullValue, Tuple3<Double,Integer,Integer>> 期望 DataSet<Edge<Long, Tuple3<Double,Integer,Integer>>> 作为边输入。所以你的 temporalgraph2 申报有效,但 temporalgraph 不。
