java 如何为每个请求项创建多个线程

8e2ybdfx  于 2023-01-11  发布在  Java
关注(0)|答案(5)|浏览(150)

我试图在订单级别使用多线程处理下面的代码。

List<String> orders = Arrays.asList("order1", "order2", 
                   "order3", "order4", "order1");
    • 当前顺序执行:**
orders.stream().forEach(order -> {
    rules.forEach(rule -> {
        finalList.add(beanMapper.getBean(rule)
                .applyRule(createTemplate.apply(getMetaData.apply(rule), command),
                           order));
    });
});

我曾尝试使用:

orders.parallelStream().forEach(order -> {}} // code snippet.

但是它正在更改**rules. forEach(rule-〉{}}**的顺序。
例如:
输入:

List<String> orders = Arrays.asList("order1", "order2", 
                         "order3", "order4", "order1");
 List<String> rules = Arrays.asList("rule1", "rule2", "rule3");

预期产出:

order1 with rule1, rule2, rule3
order2 with rule1, rule2, rule3

parallelStream()的实际输出:

order1 with rule3, rule1, rule2
order1 with rule2, rule1, rule3

我不关心orders的顺序,但关心ehrules的顺序。订单可以按任何顺序处理,但规则应按相同顺序执行每个订单。
请帮帮我。

7lrncoxx

7lrncoxx1#

您可以用途:

orders.stream().parallel().forEachOrdered(// Your rules logic goes here. )

ForEachOrdered保证维护流的顺序。

因此,供您参考:

orders.stream().parallel().forEachOrdered( order -> {

            rules.stream().parallel().forEachOrdered ( rule -> {

                 System.out.println( " Order : " + order + " rule :" + rule);
            });

        });

**注:**虽然我们可以做到这一点,但应密切关注性能,因为平行性和秩序并不完美结合!
产出

Order : order1 rule :rule1
 Order : order1 rule :rule2
 Order : order1 rule :rule3
 Order : order2 rule :rule1
 Order : order2 rule :rule2
 Order : order2 rule :rule3
 Order : order3 rule :rule1
 Order : order3 rule :rule2
 Order : order3 rule :rule3
 Order : order4 rule :rule1
 Order : order4 rule :rule2
 Order : order4 rule :rule3
 Order : order1 rule :rule1
 Order : order1 rule :rule2
 Order : order1 rule :rule3
bzzcjhmw

bzzcjhmw2#

您同时从不同的线程向finalList添加元素,这会导致将规则应用于不同顺序的混合结果(规则没有按顺序分组)。
您可以通过为每个order创建一个临时列表,然后同步合并finalList中的所有临时列表来解决这个问题。
以下是使用Stream-API(Java 9+)实现此操作的方法:

List<AppliedRule> finalList = orders.parallelStream().map(order ->
        rules.stream().map(rule -> applyRule(order, rule)).collect(Collectors.toList())
).collect(Collectors.flatMapping(Collection::stream, Collectors.toList()));

注意:这里使用Collectors.flatMapping()而不是简单的flatMap,以便在流收集期间同步运行平面Map。
Java 8模拟版本:

List<AppliedRule> finalList = orders.parallelStream().map(order ->
        rules.stream().map(rule -> applyRule(order, rule)).collect(Collectors.toList())
).collect(Collectors.toList())
        .stream()
        .flatMap(Collection::stream)
        .collect(Collectors.toList());
uinbv5nw

uinbv5nw3#

这样行吗?

final int rulesSize = rules.size();
AtomicInteger atomicInteger = new AtomicInteger(0);
orders.stream().parallel().forEach(order -> {
    IntStream.range(0, rulesSize).parallel().forEach( i -> {
        synchronized (atomicInteger) {
            System.out.println(" Order : " + order + " rule :" + rules.get(atomicInteger.getAndIncrement() % rulesSize));
        }
    });
});
    • 产出**
Order : order1 rule :rule1
 Order : order4 rule :rule2
 Order : order1 rule :rule3
 Order : order3 rule :rule1
 Order : order3 rule :rule2
 Order : order3 rule :rule3
 Order : order2 rule :rule1
 Order : order2 rule :rule2
 Order : order2 rule :rule3
 Order : order1 rule :rule1
 Order : order1 rule :rule2
 Order : order4 rule :rule3
 Order : order1 rule :rule1
 Order : order4 rule :rule2
 Order : order1 rule :rule3
uklbhaso

uklbhaso4#

订单的顺序可以是任何顺序,但规则的顺序不应该改变。同样,对于特定的顺序,规则应该成组出现。
如果是这样,就没有实际并行的余地。
何时

order1-rule1
order1-rule2
order2-rule1
order2-rule2

以及

order2-rule1
order2-rule2
order1-rule1
order1-rule2

是2个订单和2个规则的唯一有效运行,
以及

order1-rule1
order2-rule1
order1-rule2
order2-rule2

被认为是无效的,这不是并行性,只是order的随机化,可能没有任何好处。如果你对order1总是排在第一感到"厌烦",你可以打乱列表,但仅此而已:

public static void main (String[] args) throws java.lang.Exception
{
    List<String> orders = Arrays.asList("order1", "order2", "order3", "order4");
    List<String> rules = Arrays.asList("rule1", "rule2", "rule3");
    Collections.shuffle(orders);
    orders.forEach(order->{
        rules.forEach(rule->{
            System.out.println(order+"-"+rule);
        });
    });
}

甚至不需要流,只需要两个嵌套循环。测试:https://ideone.com/qI3dqd

order2-rule1
order2-rule2
order2-rule3
order4-rule1
order4-rule2
order4-rule3
order1-rule1
order1-rule2
order1-rule3
order3-rule1
order3-rule2
order3-rule3
    • 原答复**

但是它正在改变规则。forEach(rule-〉{}}的顺序。
不,它不会。order可以重叠,但每个顺序的rule的顺序都保持不变。非并行forEach为什么要做其他事情呢?
示例代码:

public static void main (String[] args) throws java.lang.Exception
{
    List<String> orders = Arrays.asList("order1", "order2", "order3", "order4");
    List<String> rules = Arrays.asList("rule1", "rule2", "rule3");
    orders.stream().parallel().forEach(order->{
        rules.forEach(rule->{
            System.out.println(order+"-"+rule);
        });
    });
}

测试:https://ideone.com/95Cybg
输出示例:

order2-rule1
order2-rule2
order2-rule3
order1-rule1
order1-rule2
order1-rule3
order4-rule1
order4-rule2
order4-rule3
order3-rule1
order3-rule2
order3-rule3

order的顺序是混合的,但rule总是1 - 2 - 3。我认为您的输出只是隐藏了配对(实际上您没有显示它是如何生成的)。
当然,它可以被扩展一些延迟,所以order s的处理实际上会重叠:

public static void delay(){
    try{
        Thread.sleep(ThreadLocalRandom.current().nextInt(100,300));
    }catch(Exception ex){}
}

public static void main (String[] args) throws java.lang.Exception
{
    List<String> orders = Arrays.asList("order1", "order2", "order3", "order4");
    List<String> rules = Arrays.asList("rule1", "rule2", "rule3");
    orders.stream().parallel().forEach(order->{
        rules.forEach(rule->{
            delay();
            System.out.println(order+"-"+rule);
        });
    });
}

测试:https://ideone.com/cSFaqS
输出示例:

order3-rule1
order2-rule1
order2-rule2
order3-rule2
order3-rule3
order2-rule3
order1-rule1
order4-rule1
order1-rule2
order4-rule2
order4-rule3
order1-rule3

这可能是你已经看到的东西,只是没有orderx部分。随着order的可见,可以跟踪到rule不断出现1 - 2 - 3,* 每个order *。而且,你的示例列表包含order1两次,这肯定无助于看到发生了什么。

kcwpcxri

kcwpcxri5#

如果你不介意尝试第三方库。这里是我的库的样本:abacus-common

StreamEx.of(orders).parallelStream().forEach(order -> {}}

您甚至可以指定线程数:

StreamEx.of(orders).parallelStream(maxThreadNum).forEach(order -> {}}

将保留rule的顺序。
顺便说一下,由于它是并行流,所以...finalList.add(...代码很可能无法工作,我认为最好收集结果以列出:

StreamEx.of(orders).parallelStream().map/flatMap(order -> {...}}.toList()

即使你以后出于某种原因想保持order的顺序,这也是可行的:

StreamEx.of(orders).indexed().parallelStream()
      .map/flatMap(order -> {...}}.sortedBy(...index).toList()

相关问题