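This article collects code examples for the Java method io.advantageous.qbit.queue.Queue.sendQueue and shows how Queue.sendQueue is used in practice. The examples were extracted from selected projects on platforms such as GitHub, Stack Overflow, and Maven, and are intended as a practical reference. Details of the Queue.sendQueue method: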
Package path: io.advantageous.qbit.queue.Queue
Class name: Queue
Method name: sendQueue
Description: This returns a non-thread-safe SendQueue. It is deliberately not thread safe, so that you can batch sends to minimize thread hand-offs and maximize I/O throughput. Each call to this method returns a forwardEvent queue that can only be accessed from one thread. You get multithreaded behavior by having one SendQueue per thread.
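The "one send queue per thread, batch before hand-off" idea in the description can be illustrated with the JDK alone. The following is a minimal sketch, not QBit's implementation: `LocalBatchSender`, its batch size, and the `runDemo` helper are hypothetical names introduced here, and a `LinkedBlockingQueue` stands in for the shared queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class BatchingSendDemo {

    /** Hypothetical stand-in for a per-thread send queue; this is not QBit's SendQueue. */
    static final class LocalBatchSender<T> {
        private final BlockingQueue<List<T>> shared; // thread-safe hand-off point
        private final int batchSize;
        private List<T> buffer = new ArrayList<>();  // touched only by the owning thread

        LocalBatchSender(BlockingQueue<List<T>> shared, int batchSize) {
            this.shared = shared;
            this.batchSize = batchSize;
        }

        /** Buffers the item locally and hands off once a full batch has accumulated. */
        void send(T item) {
            buffer.add(item);
            if (buffer.size() >= batchSize) {
                flushSends();
            }
        }

        /** Hands the current batch to the shared queue in a single operation. */
        void flushSends() {
            if (buffer.isEmpty()) {
                return;
            }
            shared.add(buffer);
            buffer = new ArrayList<>();
        }
    }

    /** Runs two producers, each with its own sender, and returns the number of items received. */
    static int runDemo() {
        final BlockingQueue<List<String>> shared = new LinkedBlockingQueue<>();
        Thread[] producers = new Thread[2];
        for (int t = 0; t < producers.length; t++) {
            final int id = t;
            producers[t] = new Thread(() -> {
                // Each thread creates its own sender, so the buffer needs no locking.
                LocalBatchSender<String> sender = new LocalBatchSender<>(shared, 50);
                for (int i = 0; i < 1000; i++) {
                    sender.send("p" + id + "-" + i);
                }
                sender.flushSends();
            });
            producers[t].start();
        }
        try {
            for (Thread producer : producers) {
                producer.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for producers", e);
        }
        int total = 0;
        for (List<String> batch : shared) {
            total += batch.size();
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println("received items: " + runDemo()); // prints "received items: 2000"
    }
}
```

Only the hand-off to the shared queue is synchronized, and it happens once per batch rather than once per item, which is the trade-off the description points to.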
Code example source: origin: advantageous/qbit
@Override
public SendQueue<MethodCall<Object>> requests() {
    return requestQueue.sendQueue();
}
Code example source: origin: advantageous/qbit
@Override
public SendQueue<MethodCall<Object>> methodSendQueue() {
    return methodQueue.sendQueue();
}
Code example source: origin: advantageous/qbit
default SendQueue<T> sendQueueWithAutoFlush(final PeriodicScheduler periodicScheduler,
                                            final int interval, final TimeUnit timeUnit) {
    // Wrap a fresh send queue so it is flushed on the given schedule.
    SendQueue<T> sendQueue = sendQueue();
    return new AutoFlushingSendQueue<>(sendQueue, periodicScheduler, interval, timeUnit);
}
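To make the auto-flush idea concrete without depending on QBit, here is a small sketch built on `ScheduledExecutorService`. `TimedFlushBuffer` is a hypothetical class invented for this illustration; how QBit's `AutoFlushingSendQueue` is actually implemented is not shown in the source. Because a timer thread and the sending thread both touch the buffer, this sketch guards it with a lock.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class AutoFlushSketch {

    /** Hypothetical buffer that flushes on a timer; not QBit's AutoFlushingSendQueue. */
    static final class TimedFlushBuffer<T> implements AutoCloseable {
        private final Object lock = new Object();
        private final Consumer<List<T>> sink;
        private final ScheduledFuture<?> task;
        private List<T> buffer = new ArrayList<>();

        TimedFlushBuffer(Consumer<List<T>> sink, ScheduledExecutorService scheduler,
                         long interval, TimeUnit unit) {
            this.sink = sink;
            this.task = scheduler.scheduleAtFixedRate(this::flushSends, interval, interval, unit);
        }

        void send(T item) {
            synchronized (lock) {
                buffer.add(item);
            }
        }

        /** Swaps the buffer under the lock, then delivers the batch outside it. */
        void flushSends() {
            List<T> batch;
            synchronized (lock) {
                if (buffer.isEmpty()) {
                    return;
                }
                batch = buffer;
                buffer = new ArrayList<>();
            }
            sink.accept(batch);
        }

        @Override
        public void close() {
            task.cancel(false); // stop the timer without interrupting a running flush
            flushSends();       // deliver whatever is still buffered
        }
    }

    /** Sends 100 items through the buffer and returns how many reached the sink. */
    static int runDemo() {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        List<String> received = Collections.synchronizedList(new ArrayList<>());
        try (TimedFlushBuffer<String> queue =
                     new TimedFlushBuffer<String>(received::addAll, scheduler, 20, TimeUnit.MILLISECONDS)) {
            for (int i = 0; i < 100; i++) {
                queue.send("item" + i);
            }
        }
        scheduler.shutdown();
        try {
            scheduler.awaitTermination(1, TimeUnit.SECONDS); // let an in-flight flush finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received.size();
    }

    public static void main(String[] args) {
        System.out.println("delivered: " + runDemo()); // prints "delivered: 100"
    }
}
```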
Code example source: origin: advantageous/qbit
/**
 * Create a wrapper SendQueue that encodes the objects to JSON
 * before putting them into the queue.
 *
 * @return returns wrapped SendQueue that does JSON encoding.
 */
@Override
public SendQueue<T> sendQueue() {
    final SendQueue<String> sendQueue = queue.sendQueue();
    return createJsonSendQueue(sendQueue);
}
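The wrapper above follows a decorator pattern: encode each object, then forward the resulting string to the underlying queue. A minimal sketch of that pattern using only the JDK might look like the following. `EncodingSender`, `Person`, and the hand-rolled `toJson` are assumptions made for illustration; they are not QBit's `createJsonSendQueue` or its JSON encoder.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

final class EncodingSendSketch {

    /** Hypothetical decorator: encodes each item, then forwards the String downstream. */
    static final class EncodingSender<T> {
        private final Function<T, String> encoder;
        private final Consumer<String> downstream;

        EncodingSender(Function<T, String> encoder, Consumer<String> downstream) {
            this.encoder = encoder;
            this.downstream = downstream;
        }

        void send(T item) {
            downstream.accept(encoder.apply(item));
        }
    }

    /** Demo-only value type. */
    static final class Person {
        final String name;
        final int age;

        Person(String name, int age) {
            this.name = name;
            this.age = age;
        }
    }

    /** Hand-rolled encoder for the demo only; it does not escape special characters. */
    static String toJson(Person person) {
        return "{\"name\":\"" + person.name + "\",\"age\":" + person.age + "}";
    }

    /** Sends one Person through the decorator and returns what arrived downstream. */
    static List<String> runDemo() {
        List<String> wire = new ArrayList<>();
        EncodingSender<Person> sender =
                new EncodingSender<Person>(EncodingSendSketch::toJson, wire::add);
        sender.send(new Person("Ada", 36));
        return wire;
    }

    public static void main(String[] args) {
        System.out.println(runDemo().get(0)); // prints {"name":"Ada","age":36}
    }
}
```

Callers keep working with typed objects while the underlying queue only ever carries strings.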
Code example source: origin: advantageous/qbit
public <T> T createProxy(Class<T> serviceInterface) {
    final SendQueue<MethodCall<Object>> methodCallSendQueue = requestQueue.sendQueue();
    return proxy(serviceInterface, methodCallSendQueue);
}
Code example source: origin: advantageous/qbit
@Override
public HttpClient startClient() {
    sendQueue = messages.sendQueue();
    // Background thread that flushes pending sends roughly every 50 ms until interrupted.
    thread = new Thread(new Runnable() {
        @Override
        public void run() {
            while (true) {
                Sys.sleep(50);
                periodicFlushCallback.accept(null);
                sendQueue.flushSends();
                if (thread.isInterrupted()) {
                    break;
                }
            }
        }
    });
    thread.start();
    return this;
}
Code example source: origin: advantageous/qbit
this.responseQueue.sendQueue().sendAndFlush(response);
Code example source: origin: advantageous/qbit
    // Excerpt: the enclosing statement that opens this anonymous class is not shown in the source.
    @Override
    public void run() {
        final SendQueue<String> sendQueue = queue.sendQueue();
        for (int index = 0; index < 1000; index++) {
            sendQueue.send("this item " + index);
        }
        sendQueue.flushSends();
    }
});
Code example source: origin: advantageous/qbit
    // Excerpt: the enclosing statement that opens this anonymous class is not shown in the source.
    @Override
    public void run() {
        final SendQueue<String> sendQueue = queue.sendQueue();
        for (int index = 0; index < 1000; index++) {
            sendQueue.send("item" + index);
        }
        sendQueue.flushSends();
    }
});
Code example source: origin: advantageous/qbit
public WebSocketTextQueue(final WebSocket webSocket) {
    this.webSocket = webSocket;
    this.stringQueue = QueueBuilder.queueBuilder().setBatchSize(1).setLimit(100).setPollWait(1000).build();
    final SendQueue<String> sendQueue = this.stringQueue.sendQueue();
    // Every incoming text message is forwarded into the queue.
    this.webSocket.setTextMessageConsumer(sendQueue::send);
}
Code example source: origin: advantageous/qbit
final SendQueue<String> sendQueue = queue.sendQueue();
Code example source: origin: advantageous/qbit
@Test
public void integrationTest() throws Exception {
    final Queue<String> queue = QueueBuilder.queueBuilder().build();
    final QueueToStreamUnicast<String> stream = new QueueToStreamUnicast<>(queue);
    final Iterator<String> iterator = toIteratorWithRequestsPipelineSize(stream, 10);
    final SendQueue<String> sendQueue = queue.sendQueue();
    for (int index = 0; index < 100; index++) {
        sendQueue.send("" + index);
    }
    sendQueue.flushSends();
    // Stop the queue after a delay so the iterator terminates.
    Thread thread = new Thread(() -> {
        Sys.sleep(500);
        queue.stop();
    });
    thread.start();
    int i = 0;
    while (iterator.hasNext()) {
        String item = iterator.next();
        assertEquals("" + i, item);
        puts("" + i, item);
        i++;
    }
    assertEquals(100, i);
}
Code example source: origin: advantageous/qbit
@Before
public void setUp() throws Exception {
    personQueue = JsonQueue.createMapQueue(String.class, Person.class, QueueBuilder.queueBuilder()
            .setName("FOO").build());
    personSendQueue = personQueue.sendQueue();
    personReceiveQueue = personQueue.receiveQueue();
    personSendQueue.shouldBatch();
    personSendQueue.name();
    personSendQueue.size();
    personQueue.name();
    personQueue.size();
}
Code example source: origin: advantageous/qbit
@Before
public void setUp() throws Exception {
    personQueue = JsonQueue.createListQueue(Person.class, QueueBuilder.queueBuilder()
            .setName("FOO").build());
    personSendQueue = personQueue.sendQueue();
    personReceiveQueue = personQueue.receiveQueue();
    personSendQueue.shouldBatch();
    personSendQueue.name();
    personSendQueue.size();
    personQueue.name();
    personQueue.size();
}
Code example source: origin: advantageous/qbit
@Before
public void setup() {
    final QueueBuilder queueBuilder = QueueBuilder.queueBuilder();
    queue = queueBuilder.setLinkTransferQueue().setBatchSize(50)
            .setCheckEvery(5).setCheckIfBusy(true)
            .setName("Queue test").setPollTimeUnit(TimeUnit.MILLISECONDS)
            .setPollWait(50).build();
    receiveQueue = queue.receiveQueue();
    sendQueue = queue.sendQueue();
}
Code example source: origin: advantageous/qbit
@Before
public void setup() {
    final QueueBuilder queueBuilder = QueueBuilder.queueBuilder();
    queue = queueBuilder.setArrayBlockingQueue().setBatchSize(10)
            .setCheckEvery(5).setCheckIfBusy(false)
            .setName("Queue test").setPollTimeUnit(TimeUnit.MILLISECONDS)
            .setPollWait(50).build();
    receiveQueue = queue.receiveQueue();
    sendQueue = queue.sendQueue();
}
Code example source: origin: advantageous/qbit
@Before
public void setUp() throws Exception {
    personQueue = new JsonQueue<>(Person.class, QueueBuilder.queueBuilder()
            .setName("FOO").build());
    personSendQueue = personQueue.sendQueue();
    personReceiveQueue = personQueue.receiveQueue();
    personSendQueue.shouldBatch();
    personSendQueue.name();
    personSendQueue.size();
    personQueue.name();
    personQueue.size();
}
Code example source: origin: advantageous/qbit
@Before
public void setup() {
    final QueueBuilder queueBuilder = QueueBuilder.queueBuilder();
    queue = queueBuilder.setLinkTransferQueue().setBatchSize(50)
            .setCheckEvery(5).setCheckIfBusy(true).setTryTransfer(true)
            .setName("Queue test").setPollTimeUnit(TimeUnit.MILLISECONDS)
            .setPollWait(50).build();
    receiveQueue = queue.receiveQueue();
    sendQueue = queue.sendQueue();
}
Code example source: origin: advantageous/qbit
@Before
public void setup() {
    final QueueBuilder queueBuilder = QueueBuilder.queueBuilder();
    queue = queueBuilder.setArrayBlockingQueue().setBatchSize(50)
            .setCheckEvery(5).setCheckIfBusy(true)
            .setEnqueueTimeoutTimeUnit(null).setEnqueueTimeout(0)
            .setName("Queue test").setPollTimeUnit(TimeUnit.MILLISECONDS)
            .setPollWait(50).build();
    receiveQueue = queue.receiveQueue();
    sendQueue = queue.sendQueue();
}
Code example source: origin: advantageous/qbit
@Test(expected = QueueException.class)
public void testTimeout() {
    final QueueBuilder queueBuilder = QueueBuilder.queueBuilder();
    queue = queueBuilder.setArrayBlockingQueue().setBatchSize(5).setSize(5)
            .setEnqueueTimeout(1).setEnqueueTimeoutTimeUnit(TimeUnit.SECONDS)
            .setName("Queue test").setPollTimeUnit(TimeUnit.MILLISECONDS)
            .setPollWait(50).build();
    receiveQueue = queue.receiveQueue();
    sendQueue = queue.sendQueue();
    // Nothing reads from receiveQueue here, so the small queue fills up and a QueueException is expected.
    for (int index = 0; index < 2000; index++) {
        sendQueue.send("" + index);
    }
}
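The timeout test above relies on a bounded queue rejecting an enqueue once it is full and the wait expires. The same effect can be sketched with the JDK's `ArrayBlockingQueue` and its timed `offer`. This is only an analogy under stated assumptions: `sendUntilTimeout` is a hypothetical helper, it enqueues single items rather than batches, and it reports the failure with a return value instead of QBit's `QueueException`.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

final class EnqueueTimeoutSketch {

    /**
     * Offers items to a bounded queue that nobody drains.
     * Returns the index of the first item whose timed offer failed, or -1 if all were accepted.
     */
    static int sendUntilTimeout(int capacity, int attempts, long timeoutMillis) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity);
        for (int index = 0; index < attempts; index++) {
            try {
                // offer(...) waits up to the timeout for free space, then gives up and returns false.
                if (!queue.offer("" + index, timeoutMillis, TimeUnit.MILLISECONDS)) {
                    return index;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while enqueueing", e);
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Capacity 5 and no consumer: items 0..4 fit, item 5 times out.
        System.out.println("first rejected index: " + sendUntilTimeout(5, 2000, 50));
    }
}
```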