更具体的问题是为什么flux.fromiterable()不能与reactor-netty-httpclient一起工作?这个简单的例子很好用。所有10项都由flux publisher发出:
public class ConcurrentLinkedQueueFluxTest {
public static final void main(String[] args) {
List<Integer> aList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
ConcurrentLinkedQueue<Integer> clq = new ConcurrentLinkedQueue<>();
aList.stream().forEach(i -> clq.add(i));
Flux.fromIterable(clq)
.subscribe(new ReactiveSubscriber<Integer>());
}
}
使用reactivesubscriber:
class ReactiveSubscriber<T> extends BaseSubscriber<T> {
private Subscription subscription;
@Override
public void hookOnSubscribe(Subscription s) {
System.out.println("In hookOnSubscribe");
this.subscription = s;
subscription.request(1);
}
@Override
public void hookOnNext(T response) {
System.out.println("In hookOnNext: "+response.toString());
subscription.request(1);
}
@Override
public void hookOnError(Throwable t) {
System.out.println(t.getLocalizedMessage());
}
@Override
public void hookOnComplete() {
System.out.println("In hookOnComplete");
subscription.cancel();
}
}
如下所示,如果我将类似的订阅服务器与httpclient.send(flux.fromiterable())一起使用,则只会发出一个项,而不是队列中的所有项。因此,由于创建通量的flux.fromiterable()方法不能与httpclient一起使用,因此有些配置不正确。
对于实际的生产代码问题,我已经包括了队列定义、客户机方法、订阅者、服务器方法和一个日志,其中只显示5个项目中的1个从客户机发送到服务器。看起来,即使httpclient send()方法从队列中加载了一个flux对象,也只发送一个项,尽管队列中有5个项。
要发送到服务器的项目位于bytebuf类型的队列中:
private ConcurrentLinkedQueue<ByteBuf> electionRequestQueue;
public ElectionTransactionRequest() {
electionRequestQueue = new ConcurrentLinkedQueue<ByteBuf>();
}
客户端方法是:
public void task() {
log.debug("Queue size: "+electionRequestQueue.size());
ElectionTransactionSubscriber etSubscriber = new ElectionTransactionSubscriber();
HttpClient.create()
.tcpConfiguration(tcpClient -> tcpClient.host("localhost"))
.port(61005)
.protocol(HttpProtocol.HTTP11)
.post()
.uri("/election/transaction")
.send(Flux.fromIterable(electionRequestQueue))
.responseContent()
.aggregate()
.asByteArray()
.subscribe(etSubscriber);
}
订户定义为:
class ElectionTransactionSubscriber extends BaseSubscriber<byte[]> {
private static final Logger log = LoggerFactory.getLogger(ElectionTransactionSubscriber.class);
private Subscription subscription;
@Override
public void hookOnSubscribe(Subscription s) {
log.debug("In hookOnSubscribe");
this.subscription = s;
subscription.request(1);
}
@Override
public void hookOnNext(byte[] response) {
log.info("In hookOnNext");
subscription.request(1);
}
@Override
public void hookOnError(Throwable t) {
log.error(t.getLocalizedMessage());
}
@Override
public void hookOnComplete() {
log.debug("In hookOnComplete");
subscription.cancel();
}
}
服务器端在方法中定义:
public void start() {
disposableServer =
HttpServer.create()
.host("localhost")
.port(61005)
.protocol(HttpProtocol.HTTP11)
.route(routes ->
routes
.post("/election/transaction",
(request, response) -> response.send(request
.receive()
.aggregate()
.flatMap(aggregatedBody ->
electionTransactionHandler.electionTransactionResponse(aggregatedBody)))))
.bindNow();
disposableServer.onDispose().block();
}
当客户机运行时,队列中有5个项目,但只有一个项目被发送到服务器,如日志中所示。在订阅服务器中,只发送来自flux发布服务器的1个项目后,将调用hookoncomplete()方法。
2020-12-01 12:03:56,442 DEBUG [main] com.dd.vbc.business.services.client.requests.ElectionTransactionRequest: Queue size: 5
2020-12-01 12:03:56,539 DEBUG [main] com.dd.vbc.business.services.client.requests.ElectionTransactionSubscriber: In hookOnSubscribe
2020-12-01 12:03:56,746 INFO [reactor-http-epoll-1] com.dd.vbc.business.services.client.requests.ElectionTransactionSubscriber: In hookOnNext
2020-12-01 12:03:56,746 DEBUG [reactor-http-epoll-1] com.dd.vbc.business.services.client.requests.ElectionTransactionSubscriber: In hookOnComplete
1条答案
按热度按时间3zwtqj6y1#
缓冲可能是最容易使用的。它发出缓冲区(它们是集合) — 默认情况下,列表)。所以在您的情况下,订阅者将收到一个列表
=========================
这直接来自React堆堆芯的文件:
三种配料
当您有很多元素并且希望将它们分批处理时,reactor中有三种广泛的解决方案:分组、窗口和缓冲。这三者在概念上是相近的,因为它们将通量重新分配到一个集合中。分组和窗口创建通量,同时将聚合缓冲到集合中。使用通量分组
分组是将源流拆分为多个批的行为,每个批都匹配一个键。
关联的运算符是groupby。
每个组都表示为groupedflux,它允许您通过调用其key()方法来检索键。
这些小组的内容没有必要的连续性。一旦一个源元素产生了一个新的键,这个键的组就会被打开,并且与该键匹配的元素最终会出现在这个组中(可以同时打开几个组)。
这意味着:
以下示例按值是偶数还是奇数对值进行分组:
警告分组最适合于组数从中到低的情况。还必须强制使用这些组(例如由flatmap使用),以便groupby继续从上游获取数据并为更多的组提供数据。有时,这两个约束会成倍增加并导致挂起,例如当基数较高且使用组的flatmap的并发性太低时。使用flux
窗口化是根据大小、时间、边界定义 predicate 或边界定义发布者的标准将源流拆分为窗口的行为。
关联的运算符有window、windowtimeout、windowuntil、windowwhile和windowwhen。
与groupby不同,groupby根据传入的键随机重叠,窗口(大多数时间)是按顺序打开的。
不过,有些变体仍然可以重叠。例如,在window(int maxsize,int skip)中,maxsize参数是窗口关闭后的元素数,skip参数是源中打开新窗口后的元素数。因此,如果maxsize>skip,则会在前一个窗口关闭之前打开一个新窗口,两个窗口重叠。
以下示例显示重叠窗口:
注意:使用反向配置(maxsize<skip),源中的一些元素被删除,并且不是任何窗口的一部分。
在通过windowuntil和windowwhile进行基于 predicate 的窗口化的情况下,具有与 predicate 不匹配的后续源元素也会导致空窗口,如下例所示:
通量缓冲
缓冲与窗口类似,但有以下扭曲:它不发射窗口(每个窗口都是通量),而是发射缓冲区(每个窗口都是集合) — 默认情况下,列表)。
用于缓冲的运算符与用于窗口的运算符相同:buffer、buffertimeout、bufferuntil、bufferwhile和bufferwhen。
当相应的窗口操作符打开一个窗口时,缓冲操作符创建一个新集合并开始向其中添加元素。当窗口关闭时,缓冲操作符发出集合。
缓冲还可能导致源元素丢失或缓冲区重叠,如下例所示:
与窗口化不同,bufferuntil和bufferwhile不会发出空缓冲区,如下例所示:
https://github.com/reactor/reactor-core/blob/master/docs/asciidoc/advancedfeatures.adoc