Camel Splitter并行处理数组列表-并发访问问题

xj3cbfub  于 2022-11-07  发布在  Apache
关注(0)|答案(1)|浏览(240)

使用Camel拆分ArrayList并并行处理每一项,最多10个线程。以下是配置。线程池配置文件设置为最大线程数=10。

  1. <camel:route id="ReportsRoute">
  2. <camel:from uri="direct:processReportsChannel" />
  3. <camel:to uri="bean:reportRepository?method=getPendingTransactions" />
  4. <camel:split parallelProcessing="true" executorServiceRef="ReportThreadPoolProfile">
  5. <camel:simple>${body}</camel:simple>
  6. <camel:doTry>
  7. <camel:to uri="direct:processReportChannel" />
  8. <camel:doCatch>
  9. <camel:exception>java.lang.Exception</camel:exception>
  10. <camel:handled>
  11. <camel:constant>true</camel:constant>
  12. </camel:handled>
  13. <camel:to uri="bean:ReportRepository?method=markAsFailed"/>
  14. <camel:wireTap uri="direct:loggingAndNotificationChannel" />
  15. </camel:doCatch>
  16. </camel:doTry>
  17. </camel:split>
  18. </camel:route>

bean:reportRepository?method=getPendingTransactions获取数组列表并传递给拆分器。
processReportChannel是处理项的处理器。

**问题:**作业开始时启动了10个线程,但有些线程选择的是同一个项目。例如,如果ArrayList中有item_no_1到10个项目,thread_no_1和thread_no_2或有时更多线程选择的是item_no_2。这是因为Array List不是线程安全的,而且Splitter不管理它吗?

我不是这方面的Maven,需要帮助指出问题所在。

qlvxas9a

qlvxas9a1#

我使用以下(更简单的)设置进行了测试:

  1. <camelContext xmlns="http://camel.apache.org/schema/spring">
  2. <route id="ReportsRoute">
  3. <from uri="direct:start" />
  4. <!-- By default a pool size of 10 is used. -->
  5. <split parallelProcessing="true">
  6. <simple>${body}</simple>
  7. <to uri="direct:sub" />
  8. </split>
  9. </route>
  10. <route>
  11. <from uri="direct:sub"/>
  12. <log message="Processing item ${body}" />
  13. </route>
  14. </camelContext>

测试:

  1. List<Object> list = new ArrayList<>();
  2. for (int i = 0; i < 1000; i++) {
  3. list.add("And we go and go: " + (i + 1));
  4. }
  5. template.sendBody("direct:start", list);

在这种设置下,没有条目被处理两次。因此,在您的处理器中一定有什么东西导致了这个问题,即同一个列表项被多个线程拾取。

展开查看全部

相关问题