如何在spark中匹配具有阈值的双行?

gxwragnw  于 2021-07-13  发布在  Spark
关注(0)|答案(1)|浏览(306)

我有一个非常简单的Dataframe:

+--+------+
|Id|Amount|
+--+------+
|0 |3.47  |
|1 |-3.47 |
|2 |3.47  |
|3 |3.47  |
|4 |2.02  |
|5 |-2.01 |
|6 |-2.01 |
|7 |7.65  |
|8 |7.65  |
+--+------+

我想匹配给定阈值(假设为0.5)的互相抵消的行。所以在本例中,匹配行0和1,4和5,以及返回行2和3。有几种解决方案,返回第0行和第2行也可以。
一般的想法是,他们应该2对2和剩菜返回。如果每一行都有一个匹配项,它应该不返回任何内容,并且应该返回所有不能以这种方式配对的行。
你知道怎么做吗?
预期结果:

+--+------+
|Id|Amount|
+--+------+
|0 |3.47  |
|2 |3.47  |
|6 |-2.01 |
|7 |7.65  |
|8 |7.65  |
+--+------+

我一直在考虑使用 UserDefinedAggregateFunction ,但我不确定这是否足够。尤其是因为我认为每一组行只能返回一个值。

rdlzhqv9

rdlzhqv91#

我和一个自由民主党人去了。用java编写自定义项非常复杂。。。
如果有人能找到一个方法来简化混乱,请张贴或评论。

private UDF1<WrappedArray<Row>, Row[]> matchData() {
    return (data) -> {
        List<Data> dataList = JavaConversions.seqAsJavaList(data).stream().map(Data::fromRow).collect(Collectors.toList());
        Set<Data> matched = new HashSet<>();

        for (Data element : dataList) {
            if (matched.contains(element)) continue;

            dataList.stream().filter(e -> !matched.contains(e) && e != element)
                    .filter(e -> Math.abs(e.getAmount() + element.getAmount()) < THRESHOLD
                            && Math.signum(e.getAmount()) != Math.signum(element.getAmount()))
                    .min(Comparator.comparingDouble(e -> Math.abs(e.getAmount() + element.getAmount())))
                    .ifPresent(e -> {
                        matched.add(e);
                        matched.add(element);
                    });
        }

        if (matched.size() != dataList.size()) {
            return dataList.stream().map(Data::toRow).toArray(Row[]::new);
        } else {
            return new Row[0];
        }
    };
}

对于数据类(使用lombok):

@AllArgsConstructor
@EqualsAndHashCode
@Data
public final class Data {
    private String name;
    private Double amount;

    public static Data fromRow(Row r) {
        return new Data(
                r.getString(r.fieldIndex("name")),
                r.getDouble(r.fieldIndex("amount")));
    }

    public Row toRow() {
        return RowFactory.create(name, amount);
    }
}

我要把整套都还回去,以防它坏了,这正是我需要的。

相关问题