Multithreading: the best way to process incoming data in parallel (Java/Groovy)

Posted by iq3niunx on 2021-07-03 in Java

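I have a program that parses reports coming in from multiple devices (about 1,000 devices), saves them to a database, and then performs additional processing on them.
Parsing the reports can happen concurrently, but saving to the database and the additional processing need some synchronization based on the device ID a report came from (because they may need to update the same data in the database).
So I can run the processing in parallel as long as the threads are handling reports from different device IDs.
What is the most efficient way to handle this?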

Example

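I initially thought about using a thread pool and locking on the device ID, but that would not be efficient if a large number of reports arrive from a single device.
For example, consider a thread pool with 4 threads and 10 incoming reports:

Report #   Device
1          A
2          A
3          A
4          A
5          A
6          B
7          C
8          D
9          E
10         F

Thread 1 will start processing A's first report, threads 2-4 will wait for thread 1 to finish, and the rest of the reports will sit in the queue.
It would be more efficient if the rest of A's reports could be queued instead, allowing the B/C/D reports to be processed at the same time. Is there an efficient way to do this?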

Answer 1 (9w11ddsr)

Project Loom

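After watching some late-2020 videos on YouTube.com with Ron Pressler, the head of Project Loom at Oracle, the solution looks quite simple using the new virtual threads (fibers) feature coming in a future release of Java:
Call a new `Executors` method to create an executor service that uses virtual threads (fibers) rather than platform/kernel threads.
Submit all incoming report-processing tasks to that executor service.
Inside each task, try to acquire a semaphore, with one semaphore for each of the 1,000 devices.
That semaphore is what ensures only one report per device is processed at a time, while reports from different devices run in parallel. If the semaphore for a particular device is not available, simply block: let the report-processing thread wait until the semaphore becomes available.
Project Loom maintains many lightweight virtual threads (fibers), even millions of them, which run on a few heavyweight platform/kernel threads. This makes blocking a thread cheap.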
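The steps above can be sketched without depending on a particular Loom build by taking the executor as a parameter. This is only a minimal sketch under stated assumptions: `PerDeviceSerializer`, `submit`, and `demoMaxInFlightPerDevice` are hypothetical names, device IDs are assumed to be strings, and semaphores are created lazily instead of up front. The demo uses a cached thread pool as a stand-in; with virtual threads you would pass a virtual-thread executor instead (in released Java 21 that is `Executors.newVirtualThreadPerTaskExecutor()`).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper (not from the answer): serializes work per device ID.
class PerDeviceSerializer {
    private final Map<String, Semaphore> semaphores = new ConcurrentHashMap<>();
    private final ExecutorService executor;

    PerDeviceSerializer(ExecutorService executor) {
        this.executor = executor;
    }

    void submit(String deviceId, Runnable work) {
        // One fair, single-permit semaphore per device, created on first use.
        Semaphore semaphore = semaphores.computeIfAbsent(deviceId, id -> new Semaphore(1, true));
        executor.submit(() -> {
            try {
                semaphore.acquire(); // Blocks while another report from this device is in progress.
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            try {
                work.run();
            } finally {
                semaphore.release(); // Always hand the device back, even if the work throws.
            }
        });
    }

    // Runs 50 simulated reports across 5 devices and returns the highest number of
    // reports observed in progress at the same time for any single device.
    static int demoMaxInFlightPerDevice() {
        ExecutorService pool = Executors.newCachedThreadPool(); // Stand-in for a virtual-thread executor.
        PerDeviceSerializer serializer = new PerDeviceSerializer(pool);
        Map<String, AtomicInteger> inFlight = new ConcurrentHashMap<>();
        AtomicInteger maxSeen = new AtomicInteger();
        for (int i = 0; i < 50; i++) {
            String deviceId = "device-" + (i % 5);
            serializer.submit(deviceId, () -> {
                AtomicInteger counter = inFlight.computeIfAbsent(deviceId, id -> new AtomicInteger());
                int now = counter.incrementAndGet();
                maxSeen.accumulateAndGet(now, Math::max);
                try {
                    Thread.sleep(5); // Simulated database work.
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                counter.decrementAndGet();
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Timed out waiting for reports to finish");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return maxSeen.get();
    }

    public static void main(String[] args) {
        System.out.println("Max reports in progress per device: " + demoMaxInFlightPerDevice());
    }
}
```

With a platform-thread pool like the one in this demo, every blocked task ties up a real thread while it waits for its device's semaphore. That is the cost virtual threads are meant to remove.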
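Early-access builds of the JDK with Project Loom built in are available now for macOS/Linux/Windows.
Caveat: I am not an expert on concurrency, nor on Project Loom. But your particular use case seems to match some specific recommendations Ron Pressler makes in his videos.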

Example code

Here is some example code that I put together. I am not sure it is a good example.
I used an early-access build of Java 16 built specifically with Project Loom technology: Build 16-loom+9-316 (2020/11/30) for macOS Intel.

package work.basil.example;

import java.time.*;
import java.util.*;
import java.util.concurrent.*;

/**
 * An example of using Project Loom virtual threads to more simply process incoming data on background threads.
 * <p>
 * This code was built as a possible solution to this Question at StackOverflow.com: https://stackoverflow.com/q/65327325/642706
 * <p>
 * Posted in my Answer at StackOverflow.com: https://stackoverflow.com/a/65328799/642706
 * <p>
 * ©2020 Basil Bourque. 2020-12.
 * <p>
 * This work by Basil Bourque is licensed under CC BY 4.0. To view a copy of this license, visit https://creativecommons.org/licenses/by/4.0
 * <p>
 * Caveats:
 * - Project Loom is still in early-release, available only as a special build of OpenJDK for Java 16.
 * - I am *not* an expert on concurrency in general, nor Project Loom in particular. This code is merely me guessing and experimenting.
 */
public class App
{
    // FYI, Project Loom links:
    // https://wiki.openjdk.java.net/display/loom/Main
    // http://jdk.java.net/loom/  (special early-access builds of Java 16 with Project Loom built-in)
    // https://download.java.net/java/early_access/loom/docs/api/ (Javadoc)
    // https://www.youtube.com/watch?v=23HjZBOIshY  (Ron Pressler talk, 2020-07)

    public static void main ( String[] args )
    {
        System.out.println( "java.version: " + System.getProperty( "java.version" ) );
        App app = new App();
        app.checkForProjectLoom();
        app.demo();
    }

    public static boolean projectLoomIsPresent ( )
    {
        try
        {
            Thread.class.getDeclaredMethod( "startVirtualThread" , Runnable.class );
            return true;
        }
        catch ( NoSuchMethodException e )
        {
            return false;
        }
    }

    private void checkForProjectLoom ( )
    {
        if ( App.projectLoomIsPresent() )
        {
            System.out.println( "INFO - Running on a JVM with Project Loom technology. " + Instant.now() );
        } else
        {
            throw new IllegalStateException( "Project Loom technology not present in this Java implementation. " + Instant.now() );
        }
    }

    record ReportProcessorRunnable(Semaphore semaphore , Integer deviceIdentifier , boolean printToConsole , Queue < String > fauxDatabase) implements Runnable
    {
        @Override
        public void run ( )
        {
            // Our goal is to serialize the report-processing per device.
            // Each device can have only one report being processed at a time.
            // In Project Loom this can be accomplished simply by spawning virtual threads for all such
            // reports but process them serially by synchronizing on a binary (single-permit) semaphore.
            // Each thread working on a report submitted for that device waits on semaphore assigned to that device.
            // Blocking to wait for the semaphore is cheap in Project Loom using virtual threads. The underlying
            // platform/kernel thread carrying this virtual thread will be assigned other work while this
            // virtual thread is parked.
            try
            {
                semaphore.acquire(); // Blocks until the semaphore for this particular device becomes available. Blocking is cheap on a virtual thread.
            }
            catch ( InterruptedException e )
            {
                Thread.currentThread().interrupt(); // Restore the interrupt flag and give up on this report.
                return;
            }
            try
            {
                // Simulate more lengthy work being done by sleeping the virtual thread handling this task via the executor service.
                Thread.sleep( Duration.ofMillis( 100 ) );
                String fauxData = "Insert into database table for device ID # " + this.deviceIdentifier + " at " + Instant.now();
                fauxDatabase.add( fauxData );
                if ( this.printToConsole ) { System.out.println( fauxData ); }
            }
            catch ( InterruptedException e )
            {
                Thread.currentThread().interrupt(); // Restore the interrupt flag; the report is abandoned.
            }
            finally
            {
                // Release in `finally` so a failure never leaves the device locked forever.
                semaphore.release();  // For fun, comment-out this line to see the effect of the per-device semaphore at runtime.
            }
        }
    }

    record IncomingReportsSimulatorRunnable(Map < Integer, Semaphore > deviceToSemaphoreMap ,
                                            ExecutorService reportProcessingExecutorService ,
                                            int countOfReportsToGeneratePerBatch ,
                                            boolean printToConsole ,
                                            Queue < String > fauxDatabase)
            implements Runnable
    {
        @Override
        public void run ( )
        {
            if ( printToConsole ) System.out.println( "INFO - Generating " + countOfReportsToGeneratePerBatch + " reports at " + Instant.now() );
            // Snapshot the device identifiers once per batch rather than once per report.
            final List < Integer > deviceIdentifiers = List.copyOf( deviceToSemaphoreMap.keySet() );
            for ( int i = 0 ; i < countOfReportsToGeneratePerBatch ; i++ )
            {
                // Make a new Runnable task containing report data to be processed, and submit this task to the executor service using virtual threads.
                // To simulate a device sending in a report, we randomly pick one of the devices to pretend it is our source of report data.
                int randomIndexNumber = ThreadLocalRandom.current().nextInt( 0 , deviceIdentifiers.size() );
                Integer deviceIdentifier = deviceIdentifiers.get( randomIndexNumber );
                Semaphore semaphore = deviceToSemaphoreMap.get( deviceIdentifier );
                Runnable processReport = new ReportProcessorRunnable( semaphore , deviceIdentifier , printToConsole , fauxDatabase );
                reportProcessingExecutorService.submit( processReport );
            }
        }
    }

    private void demo ( )
    {
        // Configure experiment.
        Duration durationOfExperiment = Duration.ofSeconds( 20 );
        int countOfReportsToGeneratePerBatch = 7;  // Would be 40 per the Stack Overflow Question.
        boolean printToConsole = true;

        // To use as a concurrent list, I found this suggestion to use `ConcurrentLinkedQueue`: https://stackoverflow.com/a/25630263/642706
        Queue < String > fauxDatabase = new ConcurrentLinkedQueue < String >();

        // Represent each of the thousand devices that are sending us report data to be processed.
        // We map each device to a Java `Semaphore` object, to serialize the processing of multiple reports per device.
        final int firstDeviceNumber = 1_000;
        final int countDevices = 10; // Would be 1_000 per the Stack Overflow question.
        final Map < Integer, Semaphore > deviceToSemaphoreMap = new TreeMap <>();
        for ( int i = 0 ; i < countDevices ; i++ )
        {
            Integer deviceIdentifier = i + firstDeviceNumber; // Devices are numbered from 1,000 upward (1,000 to 1,999 with the full thousand devices).
            Semaphore semaphore = new Semaphore( 1 , true ); // A single permit to make a binary semaphore, and make it fair.
            deviceToSemaphoreMap.put( deviceIdentifier , semaphore );
        }

        // Run experiment.
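        // Notice that in Project Loom the `ExecutorService` interface is now `AutoCloseable`, for use in try-with-resources syntax.
        // Resources are closed in reverse order of declaration, so the report generator is declared last:
        // it stops first, and the processing executor then closes without receiving further submissions.
        // `Executors.newVirtualThreadExecutor()` is the method name in this early-access build;
        // the released API (Java 21) names it `Executors.newVirtualThreadPerTaskExecutor()`.
        try (
                ExecutorService reportProcessingExecutorService = Executors.newVirtualThreadExecutor() ;
                ScheduledExecutorService reportGeneratingExecutorService = Executors.newSingleThreadScheduledExecutor() ;
        )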
        {
            Runnable simulateIncomingReports = new IncomingReportsSimulatorRunnable( deviceToSemaphoreMap , reportProcessingExecutorService , countOfReportsToGeneratePerBatch , printToConsole , fauxDatabase );
            ScheduledFuture < ? > scheduledFuture = reportGeneratingExecutorService.scheduleAtFixedRate( simulateIncomingReports , 0 , 1 , TimeUnit.SECONDS );
            try {Thread.sleep( durationOfExperiment );} catch ( InterruptedException e ) {e.printStackTrace();}
        }
        // Notice that when reaching this point we block until all submitted tasks still running are finished,
        // because that is the new behavior of `ExecutorService` being `AutoCloseable`.
        System.out.println( "INFO - executor services shut down at this point. " + Instant.now() );

        // Results of experiment
        System.out.println( "fauxDatabase.size(): " + fauxDatabase.size() );
        System.out.println( "fauxDatabase = " + fauxDatabase );
    }
}

Answer 2 (mwngjboj)

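Try using a priority queue. The thread pool picks the highest-priority item in the queue to process. For example:
Note: I know priority queues are not usually implemented with arrays, and that some priority queues use smaller values to mean higher priority. I am using this notation only for simplicity.
Let each entry be (device ID, priority), and let the thread pool start out empty -> []
Say we receive 10 incoming reports -> [(A,1),(A,1),(A,1),(B,1),(B,1),(C,1),(D,1),(E,1),(F,1),(G,1)] (this represents the priority queue being filled as reports arrive).
So you dequeue the first item and hand it to the thread pool. Then you lower the priority of every item in the priority queue that has device ID A. It would look like this:
(A,1) is dequeued, so the thread pool gets A. After lowering the priority of the A entries still in the queue, the priority queue reorders itself: [(B,1),(B,1),(C,1),(D,1),(E,1),(F,1),(G,1),(A,0),(A,0)]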
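The dequeue-and-demote step described above can be sketched as follows. This is only an illustration under stated assumptions: `DemotingReportQueue`, `Entry`, `add`, `poll`, and `snapshot` are hypothetical names, and a sorted list stands in for a real priority queue because `java.util.PriorityQueue` cannot change the priority of an entry in place. The sketch covers only the reordering. It does not restore priorities when a device finishes, and on its own it does not stop a demoted report from being dispatched while an earlier report for the same device is still running.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch of the "demote the rest of that device's reports" idea.
class DemotingReportQueue {
    // A queued report: a device ID plus a mutable priority (higher runs first).
    static final class Entry {
        final String deviceId;
        int priority;

        Entry(String deviceId, int priority) {
            this.deviceId = deviceId;
            this.priority = priority;
        }

        @Override
        public String toString() {
            return "(" + deviceId + "," + priority + ")";
        }
    }

    // Highest priority first. List.sort is stable, so arrival order is kept among equal priorities.
    private static final Comparator<Entry> BY_PRIORITY_DESC =
            Comparator.comparingInt((Entry e) -> e.priority).reversed();

    private final List<Entry> entries = new ArrayList<>();

    // Enqueues a report for the given device with the default priority of 1.
    public synchronized void add(String deviceId) {
        entries.add(new Entry(deviceId, 1));
        entries.sort(BY_PRIORITY_DESC);
    }

    // Removes the highest-priority report and returns its device ID (null if empty),
    // then lowers the priority of every remaining report from that same device.
    public synchronized String poll() {
        if (entries.isEmpty()) {
            return null;
        }
        Entry head = entries.remove(0);
        for (Entry e : entries) {
            if (e.deviceId.equals(head.deviceId)) {
                e.priority--;
            }
        }
        entries.sort(BY_PRIORITY_DESC);
        return head.deviceId;
    }

    // Returns the current queue contents, in dispatch order, as text.
    public synchronized String snapshot() {
        return entries.toString();
    }

    public static void main(String[] args) {
        DemotingReportQueue queue = new DemotingReportQueue();
        for (String id : new String[] { "A", "A", "A", "B", "B", "C", "D", "E", "F", "G" }) {
            queue.add(id);
        }
        System.out.println("Dispatched: " + queue.poll());
        System.out.println("Remaining:  " + queue.snapshot());
    }
}
```

Running `main` on the ten reports from the example dispatches A and leaves the two remaining A reports at the back of the queue with priority 0, matching the ordering shown above.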
