我想要类似的东西 Collectors.maxBy()
,获取集合中的顶级元素的收集器( maxBy
只有一个)。
我有一条小溪 Possibility
可以用一个 Integer score(Possibility)
方法。
首先我试着:
List<Possibity> possibilities = getPossibilityStream()
.parallel()
.collect(Collectors.toList());
if(!possibilities.isEmpty()) {
int bestScore = possibilities.stream()
.mapToInt(p -> score(p))
.max()
.getAsInt();
possibilities = possibilities.stream()
.filter(p -> score(p)==bestScore)
.collect(Collectors.toList());
}
但这样做,我扫描了三次收集。一次构建它,第二次获得最高分数,第三次过滤它,这不是最佳的。此外,可能性的数量可能是巨大的(>1012)。
最好的方法应该是在第一次收集中直接获得最高的可能性,但是似乎没有内置的收集器来做这样的事情。
所以我实现了我自己的 Collector
:
public class BestCollector<E> implements Collector<E, List<E>, List<E>> {
private final Comparator<E> comparator;
private final Class<? extends List> listImpl ;
public BestCollector(Comparator<E> comparator, Class<? extends List> listImpl) {
this.comparator = comparator;
this.listImpl = listImpl;
}
public BestCollector(Comparator<E> comparator) {
this.comparator= comparator;
listImpl = ArrayList.class;
}
@Override
public Supplier<List<E>> supplier() {
return () -> {
try {
return listImpl.newInstance();
} catch (InstantiationException | IllegalAccessException ex) {
throw new RuntimeException(ex);
}
};
}
@Override
public BiConsumer<List<E>, E> accumulator() {
return (list, e) -> {
if (list.isEmpty()) {
list.add(e);
} else {
final int comparison = comparator.compare(list.get(0), e);
if (comparison == 0) {
list.add(e);
} else if (comparison < 0) {
list.clear();
list.add(e);
}
}
};
}
@Override
public BinaryOperator<List<E>> combiner() {
return (l1, l2) -> {
final int comparison = comparator.compare(l1.get(0), l2.get(0));
if (comparison == 0) {
l1.addAll(l2);
return l1;
} else if (comparison < 0) {
return l2;
} else {
return l1;
}
};
}
@Override
public Function<List<E>, List<E>> finisher() {
return Function.identity();
}
@Override
public Set<Characteristics> characteristics() {
return EnumSet.of(Characteristics.IDENTITY_FINISH, Characteristics.CONCURRENT, Characteristics.UNORDERED);
}
}
然后:
List<Possibity> possibilities = getPossibilityStream()
.parallel()
.collect(new BestCollector<Possibility>((p1, p2) -> score(p1).compareTo(score(p2)));
这是以顺序模式完成的(没有 .parallel()
)但在并行模式中,偶尔有两个例外:
一 java.lang.IndexOutOfBoundsException Index: 0, Size: 0
在队列中:
final int comparison = comparator.compare(list.get(0), e);
的 accumulator()
方法
我知道当一个 list.clear()
被称为中间人 list.isEmpty()
以及 list.get(0)
.
一 java.lang.NullPointerException
因为可能性是 null
. 同样地,这条线也包括在内:
final int comparison = comparator.compare(list.get(0), e);
我不明白怎么做 list.get(0)
可能会回来 null
...
在并行模式下,有时 list.get(0)
提高 IndexOutOfBoundsException
有时还会回来 null
.
我知道我的代码不是线程安全的,所以我尝试了几种解决方案:
添加 synchronized
在bestcollector的所有方法中: public synchronized …
使用线程安全集合而不是 ArrayList
: java.util.concurrent.CopyOnWriteArrayList
添加 synchronized
使用 CopyOnWriteArrayList
同时
删除 Characteristics.CONCURRENT
在外面 Set<Characteristics>
的 characteristics()
方法
@Override
public Set<Characteristics> characteristics() {
return EnumSet.of(Characteristics.IDENTITY_FINISH, Characteristics.UNORDERED);
}
但我不知道 Characteristics.CONCURRENT
在这里表示我的代码是线程安全的,或者我的代码将用于并发处理。
但这些解决方案都没有真正解决问题。
事实上,当我去掉并发特征时,有时会有一个 java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
但话说回来:
final int comparison = comparator.compare(l1.get(0), l2.get(0));
的 combiner()
方法。
然而,政府提出的例外情况 accumulator()
方法似乎不再出现了。
@霍尔格的回答是对的。
完整的解决方案是改变两者 combiner()
以及 characteristics()
方法:
@Override
public BinaryOperator<List<E>> combiner() {
return (l1, l2) -> {
if (l1.isEmpty()) {
return l2;
} else if (l2.isEmpty()) {
return l1;
} else {
final int comparison = comparator.compare(l1.get(0), l2.get(0));
if (comparison == 0) {
l1.addAll(l2);
return l1;
} else if (comparison < 0) {
return l2;
} else {
return l1;
}
}
};
}
@Override
public Set<Characteristics> characteristics() {
return EnumSet.of(Characteristics.IDENTITY_FINISH, Characteristics.UNORDERED);
}
1条答案
按热度按时间jtw3ybtb1#
您的代码只有一个重大错误:如果收集器不是线程安全的,则不应报告
Characteristics.CONCURRENT
因为这正是声称它是线程安全的。你必须明白的一点是,对于非-
CONCURRENT
在收集器中,框架将执行必要的步骤,以线程安全但仍然有效的方式使用它:对于每个工作线程,将通过
supplier()
每个工人将使用accumulator()
与自己的本地容器一起运行这个
combiner()
将在两个工作线程完成工作后使用这个
finisher()
将在所有工作线程完成其工作并且合并了所有容器时使用因此,您所要做的就是确保您的供应商在每次调用时真正返回一个新示例,并且所有函数都是无干扰和无副作用的(除了作为参数接收的容器之外的任何其他函数),当然,不报告
Characteristics.CONCURRENT
当您的收集器不是并发收集器时。你不需要
synchronized
关键字或并发集合。顺便说一下,一个
Comparator
窗体的(p1, p2) -> score(p1).compareTo(score(p2))
可以使用Comparator.comparing(p -> score(p))
或者如果得分值是int
:Comparator.comparingInt(p -> score(p))
.最后,combiner函数不会检查其中一个列表是否为空。这很好地解释了
IndexOutOfBoundsException
在combiner
而IndexOutOfBoundsException
在accumulator
是你的收集者报告的结果吗Characteristics.CONCURRENT
…了解添加
synchronized
关键字到accumulator()
或者combiner()
方法不保护通过lambda表达式构造的函数。它将保护构造函数示例的方法,而不是函数的代码本身。与内部类不同的是,在synchronized
关键字到实际函数的实现方法。