在java中使用executorservice执行第一个Runnable时,是否可以阻止其他Runnable?

uttx8gqw  于 2021-06-08  发布在  Java
关注(0)|答案(2)|浏览(388)

我正试图处理一个相对巨大的 StreamList 在多线程中,使用 ExecutorService . 这个方法看起来像这样。

public void initMigration() {
    ExecutorService executorService = Executors.newCachedThreadPool();
    try (Stream<List<Record4<Integer, Integer, String, byte[]>>> streamOfLists = getStreamOfLists()) {            
        streamOfLists.forEach(record4List -> {
            Runnable runnable = () -> {
                try {
                    final List<Attachment> attachments = RecordProcessor.prepareAttachmentsToPost(record4List);
                    LOGGER.info("Invoking POST with payload {}", attachments);
                    Collection<UploadLink> uploadLinks = restClient.postAttachments(attachments);
                    restClient.processUploadLinksAndUpload(RecordProcessor.recordsIntoPojo(record4List), uploadLinks);
                } catch (ExceptionA | ExceptionB e) {
                    e.printStackTrace();
                }
            };
            executorService.submit(runnable);
        });
    }
    LOGGER.info("Shutting down the ExecutorService");
    executorService.shutdown();
}

基本上,我要做的是 ListStream ,一个 Runnable 正在创建并提交给 ExecutorService . 它似乎工作正常。但是,我现在真正想做的是,看看有没有办法 ExecutorService 跑第一个 Runnable 从第一个 ListStream 同时阻止其他人 Runnables 直到它执行,并继续运行其他 Runnables (并行)之后。真的需要一些帮助。

hyrbngr7

hyrbngr71#

@阿列克谢的方法是(imo)解决这个问题的正确方法。别挡着跑。而是在运行runnables的前提条件已经满足时提交它们。
让一个可运行的块与其他块的问题是,你很容易堵塞执行器的线程池与任务被阻止等待另一个任务完成。实际上,如果线程池是有界的,那么您甚至可能会遇到这样的情况:所有线程都处于这种状态,并且执行器无法启动将取消阻止它们的任务。结果:死锁!
如果您仍然想阻止runnable(尽管有上述内容),那么可以使用 CountDownLatch .
在示例化 Runnable s、 创建 CountDownLatch 初始计数器为的示例 1 . 此示例必须由所有 Runnable s。
代码一 Runnable 这样它就能得到一个 List ,处理它,然后调用 latch.count() ;
请稍等 Runnable 打电话 latch.await() 然后提取并处理 List .
使用第一个任务提交一个任务 Runnable 剩下的用第二个。

vsnjm48y

vsnjm48y2#

您可以获取第一个runnable,执行它,然后提交其他runnable。

try (Stream<List<Record4<Integer, Integer, String, byte[]>>> streamOfLists = getStreamOfLists()) {
        Iterator<List<Record4<Integer, Integer, String, byte[]>>> it = streamOfLists.iterator();
        if (it.hasNext()) {
            List<Record4<Integer, Integer, String, byte[]>> list = it.next();
            Runnable runnable = new MyRunnable(record4List);
            runnable.run();
        }
        while (it.hasNext()) {
            List<Record4<Integer, Integer, String, byte[]>> list = it.next();
            Runnable runnable = new MyRunnable(record4List);
            executorService.submit(runnable);
        }
    }

哪里

class MyRunnable implements Runnable {
    Record4<Integer, Integer, String, byte[]> record4List;

    MyRunnable(Record4<Integer, Integer, String, byte[]> record4List) {
        this.record4List = record4List;
    }

    @Override
    public void run() {
        try {
            final List<Attachment> attachments = RecordProcessor.prepareAttachmentsToPost(record4List);
            LOGGER.info("Invoking POST with payload {}", attachments);
            Collection<UploadLink> uploadLinks = restClient.postAttachments(attachments);
            restClient.processUploadLinksAndUpload(RecordProcessor.recordsIntoPojo(record4List), uploadLinks);
        } catch (ExceptionA | ExceptionB e) {
            e.printStackTrace();
        }
    }
}

相关问题