本文整理了Java中io.netty.channel.EventLoop类的一些代码示例,展示了EventLoop类的具体用法。这些代码示例主要来源于GitHub/Stack Overflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。EventLoop类的具体详情如下:
包路径:io.netty.channel.EventLoop
类名称:EventLoop
[英]Will handle all the I/O operations for a Channel once registered. One EventLoop instance will usually handle more than one Channel but this may depend on implementation details and internals.
[中]一旦注册,将处理通道的所有I/O操作。一个EventLoop实例通常会处理多个通道,但这可能取决于实现细节和内部结构。
代码示例来源:origin: line/armeria
/**
 * Installs an {@code HttpSessionHandler} on a connected channel and schedules a timeout task
 * that fails {@code sessionPromise} if it has not been completed by then.
 *
 * <p>Asserts that the connect attempt succeeded and that the caller is running on the
 * channel's event loop.
 *
 * @param desiredProtocol protocol reported in the timeout exception
 * @param connectFuture   the connect future whose channel is being initialised
 * @param sessionPromise  promise failed by the timeout task; also passed to the session handler
 */
private void initSession(SessionProtocol desiredProtocol, ChannelFuture connectFuture,
                         Promise<Channel> sessionPromise) {
    assert connectFuture.isSuccess();

    final Channel channel = connectFuture.channel();
    final EventLoop loop = channel.eventLoop();
    assert loop.inEventLoop();

    // The channel is closed only when this task is the one that failed the promise.
    final Runnable onTimeout = () -> {
        final SessionProtocolNegotiationException timedOut = new SessionProtocolNegotiationException(
                desiredProtocol, "connection established, but session creation timed out: " + channel);
        if (sessionPromise.tryFailure(timedOut)) {
            channel.close();
        }
    };
    final ScheduledFuture<?> timeoutFuture =
            loop.schedule(onTimeout, connectTimeoutMillis, TimeUnit.MILLISECONDS);

    channel.pipeline().addLast(new HttpSessionHandler(this, channel, sessionPromise, timeoutFuture));
}
代码示例来源:origin: netty/netty
/**
 * Adds the configured handler (when one is set) to the channel's pipeline, then submits a task
 * to the channel's event loop that appends a {@code ServerBootstrapAcceptor}.
 *
 * @param ch the channel being initialised
 */
@Override
public void initChannel(final Channel ch) throws Exception {
    final ChannelPipeline pipeline = ch.pipeline();

    final ChannelHandler configuredHandler = config.handler();
    if (configuredHandler != null) {
        pipeline.addLast(configuredHandler);
    }

    // The acceptor is appended from a task executed by the event loop, not inline.
    ch.eventLoop().execute(() -> {
        pipeline.addLast(new ServerBootstrapAcceptor(
                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
    });
}
});
代码示例来源:origin: redisson/redisson
/**
 * Submits {@code flushTask} to the channel's event loop unless {@code nextScheduledFlush}
 * already holds a previously submitted task.
 *
 * @param ctx context whose channel's event loop receives the task
 */
private void scheduleFlush(final ChannelHandlerContext ctx) {
    if (nextScheduledFlush != null) {
        // A task has already been submitted and recorded; do not submit another.
        return;
    }
    // Run as soon as possible, but still yield to give a chance for additional writes to enqueue.
    nextScheduledFlush = ctx.channel().eventLoop().submit(flushTask);
}
代码示例来源:origin: netty/netty
/**
 * Runs {@code shutdownOutput0(promise)} on this channel's event loop: directly when the caller
 * is already on that loop, otherwise as a task handed to it.
 *
 * @param promise passed through to {@code shutdownOutput0}
 * @return the same {@code promise} instance
 */
@Override
public ChannelFuture shutdownOutput(final ChannelPromise promise) {
    final EventLoop loop = eventLoop();
    if (!loop.inEventLoop()) {
        // Called from another thread: hand the work over to the event loop.
        loop.execute(() -> {
            shutdownOutput0(promise);
        });
        return promise;
    }
    shutdownOutput0(promise);
    return promise;
}
代码示例来源:origin: eclipse-vertx/vert.x
/**
 * Stores the timer state and schedules this handler on the context's Netty event loop.
 *
 * @param timerID  identifier stored for this timer
 * @param runnable handler stored in {@code handler}
 * @param periodic {@code true} schedules at a fixed rate, {@code false} schedules once
 * @param delay    delay in milliseconds; also used as the period when {@code periodic}
 * @param context  context that supplies the event loop and on which this handler is run
 */
InternalTimerHandler(long timerID, Handler<Long> runnable, boolean periodic, long delay, ContextImpl context) {
    this.timerID = timerID;
    this.handler = runnable;
    this.periodic = periodic;
    this.context = context;
    this.cancelled = new AtomicBoolean();

    // Each firing on the event loop re-dispatches this handler through the context.
    final Runnable dispatch = () -> context.runOnContext(this);
    final EventLoop loop = context.nettyEventLoop();
    future = periodic
            ? loop.scheduleAtFixedRate(dispatch, delay, delay, TimeUnit.MILLISECONDS)
            : loop.schedule(dispatch, delay, TimeUnit.MILLISECONDS);
}
代码示例来源:origin: redisson/redisson
nameServerComparator = new NameServerComparator(preferredAddressType.addressType());
Bootstrap b = new Bootstrap();
b.group(executor());
b.channelFactory(channelFactory);
b.option(ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION, true);
final DnsResponseHandler responseHandler = new DnsResponseHandler(executor().<Channel>newPromise());
b.handler(new ChannelInitializer<DatagramChannel>() {
@Override
ChannelFuture future = b.register();
Throwable cause = future.cause();
if (cause != null) {
if (cause instanceof RuntimeException) {
ch = future.channel();
ch.config().setRecvByteBufAllocator(new FixedRecvByteBufAllocator(maxPayloadSize));
ch.closeFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
代码示例来源:origin: apache/hive
int connectTimeoutMs = (int) rpcConf.getConnectTimeoutMs();
final ChannelFuture cf = new Bootstrap()
.group(eloop)
.handler(new ChannelInboundHandlerAdapter() { })
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.connect(host, port);
final Promise<Rpc> promise = eloop.next().newPromise();
final AtomicReference<Rpc> rpc = new AtomicReference<Rpc>();
cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture cf) throws Exception {
promise.addListener(new GenericFutureListener<Promise<Rpc>>() {
@Override
public void operationComplete(Promise<Rpc> p) {
代码示例来源:origin: wildfly/wildfly
nameServerComparator = new NameServerComparator(preferredAddressType.addressType());
Bootstrap b = new Bootstrap();
b.group(executor());
b.channelFactory(channelFactory);
b.option(ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION, true);
final DnsResponseHandler responseHandler = new DnsResponseHandler(executor().<Channel>newPromise());
b.handler(new ChannelInitializer<DatagramChannel>() {
@Override
ch = (DatagramChannel) b.register().channel();
ch.config().setRecvByteBufAllocator(new FixedRecvByteBufAllocator(maxPayloadSize));
ch.closeFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
代码示例来源:origin: eclipse-vertx/vert.x
handlerContext.channel()
.eventLoop()
.register(fileChannel)
.addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
fileChannel.pipeline().fireUserEventTriggered(raf);
} else {
result.tryFail(future.cause());
代码示例来源:origin: yu199195/Raincat
/**
 * Connects to the located TxManager server unless an active channel already exists.
 *
 * <p>When the locator returns a server with a non-blank host and a non-null port, those values
 * replace {@code host} and {@code port} before connecting. On connect failure, another
 * {@code doConnect} call is scheduled on the failed channel's event loop after 5 seconds.
 */
@Override
public void doConnect() {
    final boolean alreadyConnected = channel != null && channel.isActive();
    if (alreadyConnected) {
        return;
    }

    final TxManagerServer located = TxManagerLocator.getInstance().locator();
    final boolean hasUsableAddress = Objects.nonNull(located)
            && StringUtils.isNoneBlank(located.getHost())
            && Objects.nonNull(located.getPort());
    if (hasUsableAddress) {
        host = located.getHost();
        port = located.getPort();
    }

    final ChannelFuture connectAttempt = bootstrap.connect(host, port);
    LogUtil.info(LOGGER, ".....connect txManager-socket -> host:port:{}", () -> host + ":" + port);
    connectAttempt.addListener((ChannelFutureListener) outcome -> {
        if (!outcome.isSuccess()) {
            LogUtil.info(LOGGER, "Failed to connect to server, try connect after 5s-> host:port:{}", () -> host + ":" + port);
            // Retry on the event loop of the channel that failed to connect.
            outcome.channel().eventLoop().schedule(this::doConnect, 5, TimeUnit.SECONDS);
            return;
        }
        channel = outcome.channel();
        LogUtil.info(LOGGER, "Connect to server successfully!-> host:port:{}", () -> host + ":" + port);
    });
}
代码示例来源:origin: Exslims/MercuryTrade
/**
 * Handles the result of a connect attempt.
 *
 * <p>On success the channel is stored, {@code connectionEstablished} is set and a close-detect
 * listener is attached. On failure the channel is closed and a new connect attempt, with this
 * listener attached again, is scheduled on the failed channel's event loop after a random
 * delay of 0 to 4 minutes.
 *
 * @param future the completed connect future
 */
@Override
public void operationComplete(ChannelFuture future) throws Exception {
    if (future.isSuccess()) {
        channel = future.channel();
        connectionEstablished = true;
        addCLoseDetectListener(channel);
        LOGGER.debug("Connection established");
        return;
    }

    connectionEstablished = false;
    future.channel().close();

    // ThreadLocalRandom avoids allocating and seeding a new Random on every failed attempt.
    // NOTE(review): the delay can be 0 minutes, i.e. an immediate retry - confirm this is intended.
    final int retryDelayMinutes = java.util.concurrent.ThreadLocalRandom.current().nextInt(5);
    future.channel().eventLoop().schedule(() -> {
        bootstrap.connect().addListener(this);
    }, retryDelayMinutes, TimeUnit.MINUTES);
}
代码示例来源:origin: wildfly/wildfly
/**
 * Creates the stream channel for an HTTP/2 upgrade stream, installs the upgrade handler and
 * registers the channel with the parent channel's event loop.
 *
 * <p>Asserts that the stream is in the {@code HALF_CLOSED_LOCAL} state.
 *
 * @param ctx    context whose channel's event loop the new channel is registered with
 * @param stream the upgrade stream backing the new channel
 */
private void onHttp2UpgradeStreamInitialized(ChannelHandlerContext ctx, Http2MultiplexCodecStream stream) {
    assert stream.state() == Http2Stream.State.HALF_CLOSED_LOCAL;

    final DefaultHttp2StreamChannel upgradeChannel = new DefaultHttp2StreamChannel(stream, true);
    upgradeChannel.outboundClosed = true;

    // Add our upgrade handler to the channel and then register the channel.
    // The register call fires the channelActive, etc.
    upgradeChannel.pipeline().addLast(upgradeStreamHandler);
    final ChannelFuture registration = ctx.channel().eventLoop().register(upgradeChannel);

    if (!registration.isDone()) {
        // Registration still pending: let the shared listener handle completion.
        registration.addListener(CHILD_CHANNEL_REGISTRATION_LISTENER);
        return;
    }
    registerDone(registration);
}
代码示例来源:origin: line/armeria
private void doCloseSync() {
final CountDownLatch outerLatch = eventLoop.submit(() -> {
if (allChannels.isEmpty()) {
return null;
final ChannelFuture f = ch.closeFuture();
closeFutures.add(f);
f.addListener((ChannelFutureListener) future -> latch.countDown());
closeFutures.forEach(f -> f.channel().close());
return latch;
}).syncUninterruptibly().getNow();
代码示例来源:origin: com.simplyti.cloud/simple-server-core
/**
 * Binds the bootstrap to {@code port} and reports the outcome through a promise.
 *
 * <p>On success the bound channel is added to {@code serverChannels}. The returned promise is
 * completed with {@code trySuccess}/{@code tryFailure}, so a promise the caller has already
 * completed (for example by cancelling it) does not cause an {@link IllegalStateException}
 * inside the listener.
 *
 * @param executor event loop used to create the returned promise
 * @param port     port to bind to
 * @return a future that succeeds once listening, or fails with the bind failure cause
 */
private Future<Void> bind(EventLoop executor, int port) {
    final Promise<Void> futureBind = executor.newPromise();
    log.info("Starting service listener on port {}", port);

    final ChannelFuture channelFuture = bootstrap.bind(port);
    channelFuture.addListener((ChannelFuture future) -> {
        if (future.isSuccess()) {
            log.info("Listening on {}", future.channel().localAddress());
            this.serverChannels.add(future.channel());
            futureBind.trySuccess(null);
        } else {
            final Throwable cause = future.cause();
            log.warn("Error listening on port {}: {}", port, cause.getMessage());
            futureBind.tryFailure(cause);
        }
    });
    return futureBind;
}
代码示例来源:origin: netty/netty
/**
 * When auto-read is enabled, disables it and schedules {@code enableAutoReadTask} on the
 * channel's event loop one second later; the exception is then forwarded down the pipeline in
 * every case.
 *
 * @param ctx   context of the channel on which the exception was raised
 * @param cause the exception being propagated
 */
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    final ChannelConfig channelConfig = ctx.channel().config();
    final boolean autoReadEnabled = channelConfig.isAutoRead();
    if (autoReadEnabled) {
        // stop accept new connections for 1 second to allow the channel to recover
        // See https://github.com/netty/netty/issues/1328
        channelConfig.setAutoRead(false);
        ctx.channel().eventLoop().schedule(enableAutoReadTask, 1, TimeUnit.SECONDS);
    }

    // still let the exceptionCaught event flow through the pipeline to give the user
    // a chance to do something with it
    ctx.fireExceptionCaught(cause);
}
}
代码示例来源:origin: wildfly/wildfly
/**
 * Creates a new outbound HTTP/2 stream channel, initialises it and registers it with the parent
 * channel's event loop, completing {@code promise} with the outcome.
 *
 * <p>Asserts that the caller is on the context's executor. If {@code init} throws, the stream
 * channel is forcibly closed and the promise is failed with that exception.
 *
 * @param ctx     context whose handler is the {@code Http2MultiplexCodec} creating the stream
 * @param promise completed with the stream channel, cancelled, or failed
 */
public void open0(ChannelHandlerContext ctx, final Promise<Http2StreamChannel> promise) {
    assert ctx.executor().inEventLoop();

    final Http2StreamChannel streamChannel = ((Http2MultiplexCodec) ctx.handler()).newOutboundStream();
    try {
        init(streamChannel);
    } catch (Exception e) {
        streamChannel.unsafe().closeForcibly();
        promise.setFailure(e);
        return;
    }

    ctx.channel().eventLoop().register(streamChannel).addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture registration) throws Exception {
            if (registration.isSuccess()) {
                promise.setSuccess(streamChannel);
                return;
            }
            if (registration.isCancelled()) {
                promise.cancel(false);
                return;
            }

            // Registration failed: close the channel, forcibly if it was never registered.
            if (streamChannel.isRegistered()) {
                streamChannel.close();
            } else {
                streamChannel.unsafe().closeForcibly();
            }
            promise.setFailure(registration.cause());
        }
    });
}
代码示例来源:origin: netty/netty
if (ch == null) {
Bootstrap bs = bootstrap.clone();
bs.attr(POOL_KEY, this);
ChannelFuture f = connectChannel(bs);
if (f.isDone()) {
notifyConnect(f, promise);
} else {
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
EventLoop loop = ch.eventLoop();
if (loop.inEventLoop()) {
doHealthCheck(ch, promise);
} else {
loop.execute(new Runnable() {
@Override
public void run() {
promise.tryFailure(cause);
代码示例来源:origin: AsyncHttpClient/async-http-client
/**
 * Submits a task to the channel's event loop that writes and flushes the empty last HTTP
 * content, and calls {@code removeFromPipeline()} once that write completes.
 */
@Override
protected void complete() {
    channel.eventLoop().execute(() -> {
        channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT)
                .addListener(future -> removeFromPipeline());
    });
}
代码示例来源:origin: blynkkk/blynk-server
/**
 * Registers the completed future's channel with the session's initial event loop and attaches
 * {@code completeHandler} to the resulting registration future.
 *
 * @param channelFuture the completed future whose channel is registered
 */
@Override
public void operationComplete(ChannelFuture channelFuture) {
    final ChannelFuture registration = session.initialEventLoop.register(channelFuture.channel());
    registration.addListener(completeHandler);
}
});
代码示例来源:origin: com.microsoft.rest.v2/client-runtime
/**
 * Writes the request on the channel from the channel's event loop and reports a failed write
 * through {@code emitError}.
 *
 * @param raw the request to write
 */
private void writeRequest(final DefaultHttpRequest raw) {
    channel.eventLoop().execute(() -> {
        channel.write(raw).addListener((Future<Void> future) -> {
            if (future.isSuccess()) {
                return;
            }
            emitError(future.cause());
        });
    });
}
内容来源于网络,如有侵权,请联系作者删除!