我有一个程序,可以解析来自多个设备(大约1000个设备)的报告,将它们保存到数据库中,然后对它们进行额外的处理。
分析报告可以同时进行,但是保存到db和附加处理需要基于它们来自哪个设备id进行一些同步(因为可能需要更新db上的相同数据)。
因此,只要线程处理来自不同设备id的报告,我就可以并行运行处理。
处理这个问题最有效的方法是什么?
示例
我最初考虑使用线程池并锁定设备id,但如果从单个设备收到大量报告,那就没有效率了。
例如,考虑具有4个线程和10个传入报告的线程池:
报告#设备A2A3A4A5A6B7C8D9E10F
线程1将开始处理a的报告,线程2-4将等待线程1完成,其余的报告将进入队列。
如果a的其余报告可以排队,允许b/c/d报告同时处理,那么效率会更高。有没有有效的方法?
2条答案
按热度按时间9w11ddsr1#
织布机项目
在youtube.com上观看了oracle project loom负责人ron pressler的一些2020年后期视频后,解决方案非常简单,java的未来版本将提供新的虚拟线程(光纤)功能:
叫一个新的
Executors
方法创建使用虚拟线程(光纤)而不是平台/内核线程的执行器服务。将所有传入的报表处理任务提交给该执行器服务。
在每个任务中,尝试获取一个信号量,为1000个设备中的每一个获取一个信号量。
该信号量将是一次只处理每个设备一个输入的方式,以并行化每个源设备。如果表示特定设备的信号量不可用,只需阻塞-让报表处理线程等待信号量可用。
projectloom维护许多轻量级虚拟线程(fibres),甚至数百万个,它们运行在一些重量级平台/内核线程上。这使得阻塞线程变得便宜。
早期版本的jdk二进制文件带有内置于macos/linux/windows的projectloom,现在可以使用了。
警告:我既不是并发方面的Maven,也不是projectloom方面的Maven。但您的特定用例似乎与ronpressler在视频中提出的一些具体建议相匹配。
示例代码
下面是一些我用过的示例代码。我不确定这是不是一个好例子。
我使用了java 16的早期access构建,特别是使用project loom技术构建的:
Build 16-loom+9-316 (2020/11/30)
对于macos intel。mwngjboj2#
尝试使用优先级队列。线程池将选择队列中优先级最高的项进行处理。例如:
注意:我知道优先级队列通常不使用数组实现,有些优先级队列使用较小的索引值来获得更高的优先级。为了简单起见,我用这个符号。
let(设备ID,优先级)。让当前线程池为空->[]
假设,我们得到一个传入的10个报告->[(a,1),(a,1),(a,1),(b,1),(b,1),(c,1),(d,1),(e,1),(f,1),(g,1)](表示接收到报告时填充的优先级队列)。
因此,您将第一个项目出列并将其交给线程池。然后使用deviceid a降低优先级队列中所有项目的优先级。如下所示:
(a,1)已退出队列,因此您只需获得a。然后,优先级队列将在降低仍在队列中的a的优先级之后发生移动[(b、 (1),(b,1),(c,1),(d,1),(e,1),(f,1),(g,1),(a,0),(a,0)]