我正在尝试通过vertx事件总线发送大量消息,例如(使用hazelcast群集)而不阻塞:
EventBus eb = vertx.eventBus();
for (int i = 0; i < 100; i++) {
vertx.setPeriodic(1, num -> {
eb.send("clusteredEndpoint", "ping");
});
}
当计时器的数量较少时,它可以正常工作,但是在大约100个计时器时,我会得到这个错误。
我想知道如何在不阻塞的情况下扩展到100k事件/秒(作为参考,我编写了一个vertxwebsocket测试,它可能会超过这个数字)。
如果不可能的话,我想了解什么是阻塞-看起来像是这门课上的东西:https://github.com/eclipse-vertx/vert.x/blob/master/src/main/java/io/vertx/core/eventbus/impl/clustered/serializer.java
参考-此代码不阻塞-即使使用1000个计时器:
HttpClient client = vertx.createHttpClient();
client.webSocket(8080, "localhost", "/", res -> {
for (int i = 0; i < 1000; i++) {
vertx.setPeriodic(1, num -> {
res.result().writeTextMessage("ping");
});
}
});
});
2020年12月15日上午10:54:38 io.vertx.core.impl.blockedthreadchecker警告:线程[vert.x-eventloop-thread-1,5,main]已被阻塞36794毫秒,时间限制为2000毫秒io.vertx.core.vertxexception:线程在io.vertx.core.impl.future.futureimpl.addlistener(futureimpl。java:140)在io.vertx.core.impl.future.promiseimpl.addlistener(promiseimpl。java:23)在io.vertx.core.impl.future.futureimpl.oncomplete(futureimpl。java:133)在io.vertx.core.impl.future.promiseimpl.oncomplete(promiseimpl。java:23)在io.vertx.core.spi.cluster.impl.selector.selectors.withselector(选择器。java:48)在io.vertx.core.spi.cluster.impl.defaultnodeselector.selectforsend(defaultnodeselector。java:42)在io.vertx.core.eventbus.impl.clusteredeventbus$$lambda$1065/195695453.accept(未知在io.vertx.core.eventbus.impl.clustered.serializer$serializerqueue$serializedtask.process(serializer。java:147)位于io.vertx.core.eventbus.impl.clustered.serializer$serializerqueue.checkpending(序列化程序)。java:94)在io.vertx.core.eventbus.impl.clustered.serializer$serializerqueue.add(serializer。java:114)在io.vertx.core.eventbus.impl.clustered.serializer.queue(序列化程序。java:65)在io.vertx.core.eventbus.impl.clusteredeventbus.sendorpub(clusteredeventbus。java:172)在io.vertx.core.eventbus.impl.outbounddeliverycontext.next(outbounddeliverycontext。java:127)在io.vertx.core.eventbus.impl.eventbusimpl.SendorPubiInternal(eventbusimpl。java:394)在io.vertx.core.eventbus.impl.eventbusimpl.sendorpubinternal(eventbusimpl。java:400)在io.vertx.core.eventbus.impl.eventbusimpl.send(eventbusimpl。java:103)在io.vertx.core.eventbus.impl.eventbusimpl.send(eventbusimpl。java:97)在io.vertx.example.ebtestclient.lambda$start$0(ebtestclient。java:22)在io.vertx.example.ebtestclient$$lambda$1056/1487417027.handle(未知源)在io.vertx.core.impl.vertximpl$internaltimerhandler.handle(vertximpl。java:939)在io.vertx.core.impl.vertximpl$internaltimerhandler.handle(vertximpl。java:910)在io.vertx.core.impl.eventloopcontext.emit(eventloopcontext。java:52)在io.vertx.core.impl.contextimpl.emit(contextimpl。java:294)在io.vertx.core.impl.eventloopcontext.emit(eventloopcontext。java:24)在io.vertx.core.impl.abstractcontext.emit(abstractcontext。java:49)在io.vertx.core.impl.eventloopcontext.emit(eventloopcontext。java:24)在io.vertx.core.impl.vertximpl$internaltimerhandler.run(vertximpl。java:933)在io.netty.util.concurrent.promisetask.runtask(promisetask。java:98)在io.netty.util.concurrent.scheduledfuturetask.run(scheduledfuturetask。java:176)在io.netty.util.concurrent.abstracteventexecutor.safeexecute(abstracteventexecutor。java:164)在io.netty.util.concurrent.singlethreadeventexecutor.runalltasks(singlethreadeventexecutor。java:472)在io.netty.channel.nio.nioeventloop.run(nioeventloop。java:500)在io.netty.util.concurrent.singlethreadeventexecutor$4.run(singlethreadeventexecutor。java:989)在io.netty.util.internal.threadexecutormap$2.run(threadexecutormap。java:74)在io.netty.util.concurrent.fastthreadlocalrunnable.run(fastthreadlocalrunnable。java:30)在java.lang.thread.run(线程。java:748)
2条答案
按热度按时间gopyfrb31#
以下是我进一步调查后的分析:
当使用vertx事件总线进行远程通信时,一旦消费者不知所措,它就会停止响应。这会导致生产者阻止,我捕获了3个不同的阻止消息(见下文)。在阻塞警告之后有这样一个警告:
警告:服务器2d1fb2ce-940f-4b60-bf60-39847f31bcaf中没有pong-将认为它已死亡
我的问题的答案是,“为什么”阻塞并不重要,因为它已经死了(因为它已经达到了某个极限)。
我很惊讶vert.x不能更优雅地处理这个问题,比如抛出异常。
阻塞错误#1
在io.vertx.core.impl.future.futureimpl.addlistener(futureimpl。java:140)在io.vertx.core.impl.future.promiseimpl.addlistener(promiseimpl。java:23)在io.vertx.core.impl.future.futureimpl.oncomplete(futureimpl。java:133)在io.vertx.core.impl.future.promiseimpl.oncomplete(promiseimpl。java:23)在io.vertx.core.spi.cluster.impl.selector.selectors.withselector(选择器。java:48)在
阻塞错误#2
io.vertx.core.vertxexception:线程在java.nio.charset.charsetencoder处被阻塞。java:198)在java.nio.charset.charsetencoder.(charsetencoder。java:233)在sun.nio.cs.utf\u8$encoder.(utf\u8。java:558)在sun.nio.cs.utf\u8$encoder.(utf\u8。java:554)在sun.nio.cs.utf\u8.newencoder(utf\u8。java:72)
阻塞错误#3
io.vertx.core.vertxexception:线程在io.vertx.core.eventbus.impl.clustered.connectionholder.writemessage(connectionholder。java:93)在io.vertx.core.eventbus.impl.clusteredeventbus.sendremote(clusteredeventbus。java:332)在io.vertx.core.eventbus.impl.clusteredeventbus.sendtonode(clusteredeventbus。java:283)
6ojccjat2#
首先,您将在同一个线程上运行100个任务,因为vert.x具有线程相关性。如果要避免这种情况,请在单独的垂直轴上运行它们。但是,我仍然不认为你有100个CPU,所以会有很多争论。
将它们全部设置为每1ms执行一次意味着它们需要在10微秒内完成,其中包括网络代码,因为您使用的是集群eventbus。
所以,测试是如何编写的,而不是vert.x在做什么。
如果您真的想测试这种负载(我们这里说的是100k rps),请将您的请求分散到多台机器上。
但是,我不确定hazelcast是否适合处理这种负载。
如果你想知道什么是真正的障碍,我猜这部分代码是:
https://github.com/eclipse-vertx/vert.x/blob/master/src/main/java/io/vertx/core/spi/cluster/impl/defaultnodeselector.java#l43
由于我没有一个集群的vert.x设置,我无法确认我的假设是否正确。