Java: executing millions of Runnables with a small memory footprint

Posted by x8goxv8g on 2021-07-09 in Java

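I have N longs that are IDs. For each ID I need to execute a Runnable (that is, I don't care about a return value) and then wait for all of them to finish. Each Runnable can take anywhere from a few seconds to a few minutes, and it is safe to run about 100 threads in parallel.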
Our current solution uses Executors.newFixedThreadPool(), calls submit() for each ID, and then calls get() on each returned Future.
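The code works well and is very simple, because I don't have to deal with threads, complex waiting logic, and so on. It has one drawback: its memory footprint.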
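For reference, here is a minimal sketch of the pattern described above. It assumes a hypothetical `process(long, AtomicLong)` in place of the real task classes. The executor's unbounded queue holds every task that has not started yet, and the list holds every Future until the final waiting loop. That is where the per-ID memory goes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

public class Baseline {
    // Hypothetical stand-in for the real task: it only counts processed ids
    static void process(long id, AtomicLong counter) {
        counter.incrementAndGet();
    }

    // One submit() per id, then get() on every Future.
    // newFixedThreadPool keeps pending tasks in an unbounded queue, and the
    // list below keeps all N Futures alive until the waiting loop ends.
    static long runAll(long[] ids, int threads) throws Exception {
        AtomicLong processed = new AtomicLong();
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        try {
            List<Future<?>> futures = new ArrayList<>(ids.length);
            for (long id : ids) {
                futures.add(executor.submit(() -> process(id, processed)));
            }
            for (Future<?> future : futures) {
                future.get(); // only used to wait; the result is ignored
            }
        } finally {
            executor.shutdown();
        }
        return processed.get();
    }

    public static void main(String[] args) throws Exception {
        long[] ids = new long[1_000];
        for (int i = 0; i < ids.length; i++) {
            ids[i] = i;
        }
        System.out.println(runAll(ids, 100)); // prints 1000
    }
}
```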
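Every Runnable that is still queued consumes memory: far more than the 8 bytes a long needs, because these are my Java classes with some internal state. All N Future instances consume memory too. They are also Java classes with state, and I only use them for waiting; I don't need their results. From a heap dump, I estimate that N = 10 million takes slightly more than 1 GiB of memory. By contrast, 10 million longs in an array would consume only 76 MiB.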
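The 76 MiB figure is simply N × 8 bytes expressed in mebibytes, ignoring the array's small header. Here is a quick check:

```java
import java.util.Locale;

public class FootprintEstimate {
    // Size of n primitive longs in a long[], in MiB (array header ignored)
    static double mib(long n) {
        long bytes = n * Long.BYTES;      // Long.BYTES == 8
        return bytes / (1024.0 * 1024.0); // 1 MiB = 2^20 bytes
    }

    public static void main(String[] args) {
        long n = 10_000_000L;
        // prints "80000000 bytes = 76.3 MiB"
        System.out.println(String.format(Locale.ROOT, "%d bytes = %.1f MiB", n * Long.BYTES, mib(n)));
    }
}
```

Compared with the slightly more than 1 GiB seen in the heap dump, that is roughly a 13× difference.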
Is there a way to solve this so that only the IDs are kept in memory, preferably without resorting to low-level concurrent programming?

Answer 1 (velaa5lx)

What about an ExecutorCompletionService? Something like the following (it may contain bugs; I haven't tested it):

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.function.LongFunction;

public class Foo {

  private final ExecutorCompletionService<Void> completionService;
  private final LongFunction<Runnable> taskCreator;
  private final long maxRunning; // max tasks running or queued

  public Foo(Executor executor, LongFunction<Runnable> taskCreator, long maxRunning) {
    this.completionService = new ExecutorCompletionService<>(executor);
    this.taskCreator = taskCreator;
    this.maxRunning = maxRunning;
  }

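  public synchronized void processIds(long[] ids) throws InterruptedException {
    int completed = 0;
    int running = 0;

    for (long id : ids) {
      if (running >= maxRunning) {
        // At the limit: wait for any task to finish before submitting another,
        // so at most maxRunning tasks and futures exist at any time.
        // take() returns the completed Future; call get() on it to rethrow task failures.
        completionService.take();
        running--;
        completed++;
      }
      completionService.submit(taskCreator.apply(id), null);
      running++;
    }

    // Drain the tasks that are still running or queued
    while (completed < ids.length) {
      completionService.take();
      completed++;
    }
  }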

}

Another version of the above could use a Semaphore and a CountDownLatch instead of a CompletionService.

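import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;
import java.util.function.LongFunction;

public static void processIds(long[] ids, Executor executor,
                              int max, LongFunction<Runnable> taskSup) throws InterruptedException {
  // Reaches zero once every task has finished
  CountDownLatch latch = new CountDownLatch(ids.length);
  // Bounds the number of tasks that are running or queued in the executor
  Semaphore semaphore = new Semaphore(max);

  for (long id : ids) {
    // Blocks the submitting thread while max tasks are already in flight
    semaphore.acquire();

    Runnable task = taskSup.apply(id);
    executor.execute(() -> {
      try {
        task.run();
      } finally {
        // Free the slot and record completion even if the task throws
        semaphore.release();
        latch.countDown();
      }
    });

  }

  latch.await();
}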

Answer 2 (lh80um4z)

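This is something I would usually do with a producer/consumer pattern, coordinating the two sides with a BlockingQueue, or with Akka actors if the project already has them.
But I thought I'd suggest something quite different that relies on the behavior of Java streams.
The intuition is that the lazy execution of streams will limit how many work units, futures, and results are created at any one time.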
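You can see the laziness this answer relies on in isolation. The sketch below uses a sequential stream so the count is deterministic. A short-circuiting findFirst() stops the pipeline at the matching element, so only six "work units" are ever created from an effectively unbounded ID range. With .parallel(), how far ahead each chunk maps is implementation-dependent, which is the part the answer admits is not guaranteed. The counter is an illustration device, not part of the answer.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.LongStream;

public class LazyStreamDemo {
    // Counts how many elements a lazy pipeline actually maps before a
    // short-circuiting terminal operation stops it
    static int mappedBeforeMatch(long target) {
        AtomicInteger created = new AtomicInteger();
        LongStream.range(0, Long.MAX_VALUE)      // effectively unbounded id source
                .mapToObj(id -> {
                    created.incrementAndGet();   // stands in for "create a work unit"
                    return id;
                })
                .filter(id -> id == target)
                .findFirst();
        return created.get();
    }

    public static void main(String[] args) {
        System.out.println(mappedBeforeMatch(5)); // sequential stream: prints 6
    }
}
```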

import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.LongStream;

// WorkUnit stands for the asker's task class: a Runnable with a constructor taking the long id
public static void main(String[] args) {
    // So we have a list of ids; I stream it
    // (note: if you have an iterator instead, you could group it into batches of, say, 100,
    // and then flatMap each batch)
    LongStream ids = LongStream.range(0, 10_000_000L);
    // This is where the actual tasks will be dispatched
    ExecutorService executor = Executors.newFixedThreadPool(4);

    // For each id to compute, create a Runnable, which I call "WorkUnit"
    Optional<Exception> error = ids.mapToObj(WorkUnit::new)
            // Create a parallel stream:
            // this allows the stream engine to run the next steps concurrently
            .parallel()
            // Dispatch the work units (in parallel) to the executor and have them run
            .map(workUnit -> CompletableFuture.runAsync(workUnit, executor))
            // Then wait for each unit of work to complete
            .map(future -> {
                try {
                    future.get();
                } catch (Exception e) {
                    // we do care about exceptions
                    return e;
                } finally {
                    System.out.println("Done with a work unit");
                }
                // we do not care about the result
                return null;
            })
            // Keep only the exceptions on the stream
            .filter(Objects::nonNull)
            // Stop as soon as one is found
            .findFirst();

    executor.shutdown();
    System.out.println(error.isPresent());
}

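Honestly, I'm not sure this behavior is guaranteed by the specification, but in my experience it works. Each parallel "chunk" takes a few IDs and feeds them into the pipeline (map to a work unit, dispatch to the thread pool, wait for the result, filter exceptions). This means an equilibrium is reached quite quickly, matching the number of active work units to the capacity of the executor.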
If you want to fine-tune the number of parallel "chunks", see: Custom thread pool in Java 8 parallel stream
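The approach usually discussed under that title is to start the terminal operation from inside a dedicated ForkJoinPool. Below is a minimal sketch of that trick, with the stream's work reduced to a sum for illustration. The same caveat applies as above: that the stream's subtasks run in the submitting pool is observed JDK behavior, not part of the Stream specification.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.stream.LongStream;

public class CustomPoolDemo {
    // Runs a parallel stream from inside a dedicated ForkJoinPool.
    // Relying on the stream's subtasks running in this pool (instead of the
    // common pool) is observed JDK behavior, not a documented guarantee.
    static long sumInPool(int parallelism, long n) throws Exception {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            return pool.submit(() -> LongStream.range(0, n).parallel().sum()).get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(sumInPool(4, 1_000L)); // 0 + 1 + ... + 999 = 499500
    }
}
```

In the answer's pipeline, the whole `ids.mapToObj(...)...findFirst()` chain would go inside the submitted lambda.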

Answer 3 (fkvaft9z)

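Yes: you can have a shared queue of longs. Submit n Runnables to the executor, where n is the number of threads in the executor. In the run method, take the next long from the queue, process it, and then resubmit a new Runnable.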
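Here is a minimal sketch of this approach. It assumes the IDs are already in a ConcurrentLinkedQueue&lt;Long&gt; and that the per-ID work fits a LongConsumer; both choices are mine, not the answer's. Each chain resubmits the same Runnable instance rather than allocating a new one, and a CountDownLatch counting finished chains replaces Future.get() for waiting.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;

public class ResubmittingRunner {
    // Starts `threads` task chains: each run() takes one id from the shared queue,
    // processes it, then resubmits the same Runnable, so the executor never holds
    // more than `threads` Runnables no matter how many ids are queued.
    static void runAll(Queue<Long> ids, int threads, LongConsumer action) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        CountDownLatch chainsDone = new CountDownLatch(threads);
        for (int i = 0; i < threads; i++) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    Long id = ids.poll();
                    if (id == null) {
                        chainsDone.countDown(); // queue drained: this chain ends
                        return;
                    }
                    try {
                        action.accept(id);
                    } finally {
                        executor.execute(this); // resubmit for the next id
                    }
                }
            });
        }
        chainsDone.await(); // every chain has found the queue empty
        executor.shutdown();
    }

    public static void main(String[] args) throws InterruptedException {
        Queue<Long> ids = new ConcurrentLinkedQueue<>();
        for (long id = 0; id < 10_000; id++) {
            ids.add(id);
        }
        AtomicLong sum = new AtomicLong();
        runAll(ids, 100, sum::addAndGet);
        System.out.println(sum.get()); // prints 49995000
    }
}
```

Note that a queue of boxed Long objects still costs more per ID than a plain long[].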

Answer 4 (piwo6bdm)

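Instead of creating millions of Runnables, create a dedicated thread pool whose tasks are the longs themselves. Instead of waiting for each task with Future.get(), use a CountDownLatch.
The thread pool could be implemented like this: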

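import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;

// createNumber and processNumber stand for your own code;
// latch and queue must be visible to WorkingThread (e.g. as static fields)
int N = 1000000; // number of tasks
int T = 100;     // number of threads
CountDownLatch latch = new CountDownLatch(N);
// ArrayBlockingQueue requires a capacity; size it to hold all ids
BlockingQueue<Long> queue = new ArrayBlockingQueue<>(N);

for (int k = 0; k < N; k++) {
   queue.put(createNumber(k));
}
for (int k = 0; k < T; k++) {
  new WorkingThread().start();
}
latch.await();

class WorkingThread extends Thread {
  public void run() {
      Long id;
      // The queue is filled before the threads start, so an empty poll() means
      // no work is left (take() could block forever once the last ids are claimed)
      while ((id = queue.poll()) != null) {
           try {
               processNumber(id);
           } finally {
               latch.countDown(); // count the id even if processing fails
           }
      }
  }
}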
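Here is a variant of this answer that keeps only the primitive long[] in memory, as the question asks. It swaps the queue of boxed Longs for a shared AtomicInteger cursor (my substitution, not the answer's) and lets an ExecutorService own the T threads. It is a minimal sketch, assuming the per-ID work fits a LongConsumer.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;

public class CursorWorkers {
    // `threads` long-lived worker tasks share one cursor into a primitive long[];
    // each claims the next index with getAndIncrement(), so apart from the array
    // only `threads` Runnables exist at any time.
    static void processAll(long[] ids, int threads, LongConsumer action) throws InterruptedException {
        AtomicInteger next = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(threads);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        try {
            for (int t = 0; t < threads; t++) {
                executor.execute(() -> {
                    try {
                        int i;
                        while ((i = next.getAndIncrement()) < ids.length) {
                            action.accept(ids[i]);
                        }
                    } finally {
                        // A worker whose action throws stops here; the remaining
                        // ids are left to the other workers.
                        done.countDown();
                    }
                });
            }
            done.await();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        long[] ids = new long[10_000];
        for (int i = 0; i < ids.length; i++) {
            ids[i] = i;
        }
        AtomicLong sum = new AtomicLong();
        processAll(ids, 100, sum::addAndGet);
        System.out.println(sum.get()); // prints 49995000
    }
}
```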
