java—在上下文线程中执行完成时如何防止completablefuture#

tzcvj98z  于 2021-07-03  发布在  Java
关注(0)|答案(1)|浏览(538)

我有以下代码:

ConcurrentHashMap taskMap= new ConcurrentHashMap();
....
taskMap.compute(key, (k, queue) -> {
        CompletableFuture<Void> future = (queue == null)
                ? CompletableFuture.runAsync(myTask, poolExecutor)
                : queue.whenCompleteAsync((r, e) -> myTask.run(), poolExecutor);
        //to prevent OutOfMemoryError in case if we will have too much keys
        future.whenComplete((r, e) -> taskMap.remove(key, future));            
        return future;
    });

这个代码的问题 future 已完成 whenComplete 函数参数与在同一线程中调用 compute 调用。在这个方法的主体中,我们从Map中删除条目。但是计算方法文档禁止这样做,应用程序冻结。
如何解决此问题?

qltillow

qltillow1#

最明显的解决办法是 whenCompleteAsync 而不是 whenComplete ,因为前者保证使用提供的 Executor 而不是调用线程。这可以用

Executor ex = r -> { System.out.println("job scheduled"); new Thread(r).start(); };
for(int run = 0; run<2; run++) {
    boolean completed = run==0;
    System.out.println("***"+(completed? "with already completed": "with async"));
    CompletableFuture<String> source = completed?
        CompletableFuture.completedFuture("created   in "+Thread.currentThread()):
        CompletableFuture.supplyAsync(() -> {
            LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
            return "created   in "+Thread.currentThread();
        }, ex);

    source.thenApplyAsync(s -> s+"\nprocessed in "+Thread.currentThread(), ex)
          .whenCompleteAsync((s,t) -> {
                if(t!=null) t.printStackTrace(); else System.out.println(s);
                System.out.println("consumed  in "+Thread.currentThread());
            }, ex)
          .join();
}

它会打印出


***with already completed

job scheduled
job scheduled
created   in Thread[main,5,main]
processed in Thread[Thread-0,5,main]
consumed  in Thread[Thread-1,5,main]

***with async

job scheduled
job scheduled
job scheduled
created   in Thread[Thread-2,5,main]
processed in Thread[Thread-3,5,main]
consumed  in Thread[Thread-4,5,main]

所以你可以用

taskMap.compute(key, (k, queue) -> {
        CompletableFuture<Void> future = (queue == null)
                ? CompletableFuture.runAsync(myTask, poolExecutor)
                : queue.whenCompleteAsync((r, e) -> myTask.run(), poolExecutor);
        //to prevent OutOfMemoryError in case if we will have too much keys
        future.whenCompleteAsync((r, e) -> taskMap.remove(key, future), poolExecutor);
        return future;
    });

如果提前完成的可能性很大,那么可以使用

taskMap.compute(key, (k, queue) -> {
        CompletableFuture<Void> future = (queue == null)
                ? CompletableFuture.runAsync(myTask, poolExecutor)
                : queue.whenCompleteAsync((r, e) -> myTask.run(), poolExecutor);
        //to prevent OutOfMemoryError in case if we will have too much keys
        if(future.isDone()) future = null;
        else future.whenCompleteAsync((r, e) -> taskMap.remove(key, future), poolExecutor);
        return future;
    });

也许,您没有找到这个明显的解决方案,因为您不喜欢依赖操作总是被安排为池中的新任务,即使完成已经发生在不同的任务中。您可以使用专门的执行器来解决此问题,该执行器只会在必要时重新安排任务:

Executor inPlace = Runnable::run;
Thread forbidden = Thread.currentThread();
Executor forceBackground
       = r -> (Thread.currentThread()==forbidden? poolExecutor: inPlace).execute(r);

…

future.whenCompleteAsync((r, e) -> taskMap.remove(key, future), forceBackground);

但您可能会重新考虑是否真的需要这种复杂的per-mapping清理逻辑。这不仅很复杂,而且可能会产生显著的开销,可能会安排大量的清理操作,而这些操作在执行时已经过时,实际上并不需要。
它可能更简单,甚至更有效地执行

taskMap.values().removeIf(CompletableFuture::isDone);

不时清理整个Map。

相关问题