在flink数据集上应用多个连接的java分区策略

kqlmhetl  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(285)

我正在使用 Flink 1.4.0 .
假设我有一个 POJO 具体如下:

public class Rating {
    public String name;
    public String labelA;
    public String labelB;
    public String labelC;
    ...        
}

和一个 JOIN 功能:

public class SetLabelA implements JoinFunction<Tuple2<String, Rating>, Tuple2<String, String>, Tuple2<String, Rating>> {

    @Override
        public Tuple2<String, Rating> join(Tuple2<String, Rating> rating, Tuple2<String, String> labelA) {
        rating.f1.setLabelA(labelA)
        return rating;
    }
}

假设我想申请 JOIN 操作来设置 DataSet<Tuple2<String, Rating>> ,我可以这样做:

DataSet<Tuple2<String, Rating>> ratings = // [...]
DataSet<Tuple2<String, Double>> aLabels = // [...]
DataSet<Tuple2<String, Double>> bLabels = // [...]
DataSet<Tuple2<String, Double>> cLabels = // [...]
...
DataSet<Tuple2<String, Rating>>
            newRatings =
            ratings.leftOuterJoin(aLabels, JoinOperatorBase.JoinHint.REPARTITION_SORT_MERGE)

                   // key of the first input
                   .where("f0")

                   // key of the second input
                   .equalTo("f0")

                   // applying the JoinFunction on joining pairs
                   .with(new SetLabelA());

不幸的是,这是必要的评级和所有 xLabels 它们很大 DataSets 我不得不调查每一个 xlabels 为了找到我需要的字段值,同时不是所有评级键都存在于每个字段中 xlabels .
这实际上意味着我必须执行 leftOuterJoinxlabel ,为此,我还需要创建相应的 JoinFunction 使用来自
Rating POJO .
有没有更有效的方法来解决这个问题,任何人都能想到?
至于分区策略,我已经确保对 DataSet<Tuple2<String, Rating>> ratings 使用:

DataSet<Tuple2<String, Rating>> sorted_ratings = ratings.sortPartition(0, Order.ASCENDING).setParallelism(1);

通过将parallelism设置为1,我可以确保整个数据集都是有序的。然后我使用 .partitionByRange :

DataSet<Tuple2<String, Rating>> partitioned_ratings = sorted_ratings.partitionByRange(0).setParallelism(N);

哪里 N 是我的虚拟机上的内核数。另一个问题是 .setParallelism 如果设置为1,那么就如何执行管道的其余部分而言是有限制的,也就是说,后续 .setParallelism(N) 改变 DataSet 是否已处理?
最后,我做了所有这些所以当 partitioned_ratings 与一个
xlabels DataSet ,将使用 JoinOperatorBase.JoinHint.REPARTITION_SORT_MERGE . 根据 Flink 文档 v.1.4.0 :
重新分区\排序\合并:系统对每个输入进行分区(洗牌)(除非输入已经分区)并对每个输入进行排序(除非输入已经排序)。输入通过排序输入的流式合并连接。如果一个或两个输入都已排序,则此策略是好的。
所以就我而言, ratings 我想是这样的
xlabels DataSets 因此,这是最有效的策略。有什么问题吗?有别的办法吗?

y1aodyip

y1aodyip1#

到目前为止我还没能通过这个策略。好像是依靠 JOINs 太麻烦了,因为它们是昂贵的操作,除非真的有必要,否则应该避免它们。
例如, JOINs 如果两者都是,则应使用 Datasets 它们的尺寸很大。如果不是,一个方便的替代方法是使用 BroadCastVariables 两个人中的哪一个 Datasets (最小的一个),无论是出于什么目的,都会广播给员工。下面是一个示例(为了方便起见,从该链接复制)

DataSet<Point> points = env.readCsv(...);

DataSet<Centroid> centroids = ... ; // some computation

points.map(new RichMapFunction<Point, Integer>() {

    private List<Centroid> centroids;

    @Override
    public void open(Configuration parameters) {
        this.centroids = getRuntimeContext().getBroadcastVariable("centroids");
    }

    @Override
    public Integer map(Point p) {
        return selectCentroid(centroids, p);
    }

}).withBroadcastSet("centroids", centroids);

另外,由于填充pojo的字段意味着一个非常相似的代码将被重复利用,因此应该使用 jlens 以避免代码重复,并编写一个更简洁和易于遵循的解决方案。

相关问题