如何在React流中重新分配变量?

wribegjk  于 2021-07-13  发布在  Java
关注(0)|答案(2)|浏览(351)

我将请求发送到Web服务,将结果转换为一个大的 csv 并将csv行保存到数据库中。
由于请求是长时间运行的(10-20秒),我想并行化请求。我一次收集了所有的数据 StringBuilder 保存转换的csv行的。
问题:如果在csv中达到了1000行的数据块,那么如何将数据取出进行持久化,而任何其他并发响应都将写入新的 StringBuilder ?
因为,流的最终变量无法重新初始化。

  1. final StringBuilder sb = new StringBuilder();
  2. AtomicInteger count = new AtomicInteger();
  3. Flux.fromIterable(requests)
  4. .flatMap(req -> {
  5. return webClientService.send(req); //assume long running response
  6. }, 8) //send 8 requests in parallel, as response takes up to 10s
  7. .map(rsp -> {
  8. //convert response to csv values and add to StringBuilder
  9. int c = addCsv(sb, rsp);
  10. if (count.addAndGet(c) > 1000) {
  11. //TODO how can I assign a new StringBuilder,
  12. //so that all further finished responses will append the csv to the new builder?
  13. //same problem with the counter.
  14. databaseWriter.write(sb.build()); //writes the content so far to db, but not threadsafe so far
  15. }
  16. return c;
  17. })
  18. .blockLast();
h5qlskok

h5qlskok1#

或许你可以尝试完全避免副作用,例如:

  1. .map(x -> toCsv(x))
  2. .reduce((a, b) -> {
  3. if (length(a) < 1000) {
  4. return concat(a, b);
  5. }
  6. databaseWriter.write(a);
  7. return b;
  8. })
  9. .doOnNext(x -> databaseWriter.write(x))
nxagd54h

nxagd54h2#

在我看来,您可以使用内置运算符来实现相同的结果:

  1. Flux.fromIterable(requests)
  2. .flatMap(req -> webClientService
  3. .send(req)
  4. .subscribeOn(Schedulers.boundedElastic()), 8)// subscribeOn to subscribe from different threads
  5. .map(resp -> converToCsvLine(resp)) //make some transformations on the respnse
  6. .window(1000) //split incoming data into 1000 lines
  7. .flatMap(stringFlux -> stringFlux.collect(Collectors.joining("\n")))// collect last 1000
  8. .flatMap(s -> Mono.fromRunnable(() -> writeToDb(s))) //do some logic on the collected 1000 lines
  9. .blockLast();

相关问题