io.netty.channel.EventLoop类的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(10.3k)|赞(0)|评价(0)|浏览(246)

本文整理了Java中io.netty.channel.EventLoop类的一些代码示例,展示了EventLoop类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。EventLoop类的具体详情如下:
包路径:io.netty.channel.EventLoop
类名称:EventLoop

EventLoop介绍

[英]Will handle all the I/O operations for a Channel once registered. One EventLoop instance will usually handle more than one Channel but this may depend on implementation details and internals.
[中]一旦注册,将处理通道的所有I/O操作。一个EventLoop实例通常会处理多个通道,但这可能取决于实现细节和内部结构。

代码示例

代码示例来源:origin: line/armeria

/**
 * Arms a session-creation timeout on the connected channel and installs an
 * {@code HttpSessionHandler} on its pipeline.
 *
 * <p>Both assertions document the caller's obligations: the connect future has already
 * succeeded, and the call is made from the channel's own event loop.
 *
 * @param desiredProtocol protocol reported in the timeout exception
 * @param connectFuture   completed, successful connect future supplying the channel
 * @param sessionPromise  promise failed by the timeout task if it fires first
 */
private void initSession(SessionProtocol desiredProtocol, ChannelFuture connectFuture,
                         Promise<Channel> sessionPromise) {
  assert connectFuture.isSuccess();

  final Channel channel = connectFuture.channel();
  final EventLoop loop = channel.eventLoop();
  assert loop.inEventLoop();

  // The channel is closed only when this task is the one that fails the promise.
  final Runnable onTimeout = () -> {
    final boolean failedByTimeout = sessionPromise.tryFailure(new SessionProtocolNegotiationException(
        desiredProtocol, "connection established, but session creation timed out: " + channel));
    if (failedByTimeout) {
      channel.close();
    }
  };
  final ScheduledFuture<?> timeoutFuture =
      loop.schedule(onTimeout, connectTimeoutMillis, TimeUnit.MILLISECONDS);

  // The handler receives the timeout handle together with the promise.
  channel.pipeline().addLast(new HttpSessionHandler(this, channel, sessionPromise, timeoutFuture));
}

代码示例来源:origin: netty/netty

/**
 * Adds the configured handler (when one exists) to the channel's pipeline, then submits a
 * task to the channel's event loop that appends the {@code ServerBootstrapAcceptor}.
 *
 * @param ch the channel being initialized
 */
@Override
  public void initChannel(final Channel ch) throws Exception {
    final ChannelPipeline p = ch.pipeline();

    final ChannelHandler configured = config.handler();
    if (configured != null) {
      p.addLast(configured);
    }

    // The acceptor is not added inline; it is appended by a task run on the event loop.
    ch.eventLoop().execute(() -> p.addLast(new ServerBootstrapAcceptor(
        ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)));
  }
});

代码示例来源:origin: redisson/redisson

/**
 * Submits {@code flushTask} to the channel's event loop unless {@code nextScheduledFlush}
 * already holds a previously submitted flush.
 *
 * @param ctx handler context whose channel's event loop runs the flush
 */
private void scheduleFlush(final ChannelHandlerContext ctx) {
  if (nextScheduledFlush != null) {
    // A flush has already been submitted; do not queue another.
    return;
  }
  // Run as soon as possible, but still yield to give a chance for additional writes to enqueue.
  nextScheduledFlush = ctx.channel().eventLoop().submit(flushTask);
}

代码示例来源:origin: netty/netty

/**
 * Runs {@code shutdownOutput0(promise)} on this channel's event loop: directly when the
 * caller is already on that loop, otherwise as a submitted task.
 *
 * @param promise promise handed to {@code shutdownOutput0}
 * @return the same {@code promise} instance
 */
@Override
public ChannelFuture shutdownOutput(final ChannelPromise promise) {
  final EventLoop channelLoop = eventLoop();
  if (!channelLoop.inEventLoop()) {
    // Called from another thread: hand the work over to the event loop.
    channelLoop.execute(() -> shutdownOutput0(promise));
    return promise;
  }
  shutdownOutput0(promise);
  return promise;
}

代码示例来源:origin: eclipse-vertx/vert.x

/**
 * Stores the timer state and schedules this handler on the context's Netty event loop.
 *
 * @param timerID  identifier stored for this timer
 * @param runnable handler stored in {@code handler}
 * @param periodic {@code true} schedules at a fixed rate, {@code false} schedules once
 * @param delay    initial delay in milliseconds; also the period when {@code periodic}
 * @param context  context that supplies the event loop and later runs this handler
 */
InternalTimerHandler(long timerID, Handler<Long> runnable, boolean periodic, long delay, ContextImpl context) {
 this.context = context;
 this.timerID = timerID;
 this.handler = runnable;
 this.periodic = periodic;
 this.cancelled = new AtomicBoolean();

 final EventLoop loop = context.nettyEventLoop();
 // Each firing hops back onto the context instead of running directly on the event loop.
 final Runnable dispatch = () -> context.runOnContext(this);
 future = periodic
   ? loop.scheduleAtFixedRate(dispatch, delay, delay, TimeUnit.MILLISECONDS)
   : loop.schedule(dispatch, delay, TimeUnit.MILLISECONDS);
}

代码示例来源:origin: redisson/redisson

// NOTE(review): this is an elided excerpt — statements and closing braces are missing
// between several of the lines below, so it does not compile as shown.
nameServerComparator = new NameServerComparator(preferredAddressType.addressType());
// Configure a bootstrap that uses executor() as its group and the supplied channelFactory.
Bootstrap b = new Bootstrap();
b.group(executor());
b.channelFactory(channelFactory);
b.option(ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION, true);
// The response handler is constructed with a new promise obtained from executor().
final DnsResponseHandler responseHandler = new DnsResponseHandler(executor().<Channel>newPromise());
b.handler(new ChannelInitializer<DatagramChannel>() {
  @Override
// Register the channel and read the future's cause() to detect a registration failure.
ChannelFuture future = b.register();
Throwable cause = future.cause();
if (cause != null) {
  if (cause instanceof RuntimeException) {
ch = future.channel();
// Receive buffers are allocated at a fixed size of maxPayloadSize.
ch.config().setRecvByteBufAllocator(new FixedRecvByteBufAllocator(maxPayloadSize));
// A listener is attached to the channel's close future (its body is elided here).
ch.closeFuture().addListener(new ChannelFutureListener() {
  @Override
  public void operationComplete(ChannelFuture future) {

代码示例来源:origin: apache/hive

// NOTE(review): this is an elided excerpt — listener bodies and closing braces are missing,
// so it does not compile as shown.
// connectTimeoutMs is not used in the visible lines; presumably it is consumed by elided code.
int connectTimeoutMs = (int) rpcConf.getConnectTimeoutMs();
// Start connecting to host:port on the eloop group with SO_KEEPALIVE enabled and an
// empty inbound handler adapter as the handler.
final ChannelFuture cf = new Bootstrap()
  .group(eloop)
  .handler(new ChannelInboundHandlerAdapter() { })
  .channel(NioSocketChannel.class)
  .option(ChannelOption.SO_KEEPALIVE, true)
  .connect(host, port);
// The promise is created from one of the group's event loops (eloop.next()).
final Promise<Rpc> promise = eloop.next().newPromise();
final AtomicReference<Rpc> rpc = new AtomicReference<Rpc>();
// Listeners are attached to both the connect future and the promise (bodies elided).
cf.addListener(new ChannelFutureListener() {
 @Override
 public void operationComplete(ChannelFuture cf) throws Exception {
promise.addListener(new GenericFutureListener<Promise<Rpc>>() {
 @Override
 public void operationComplete(Promise<Rpc> p) {

代码示例来源:origin: wildfly/wildfly

// NOTE(review): this is an elided excerpt — statements and closing braces are missing
// between several of the lines below, so it does not compile as shown.
nameServerComparator = new NameServerComparator(preferredAddressType.addressType());
// Configure a bootstrap that uses executor() as its group and the supplied channelFactory.
Bootstrap b = new Bootstrap();
b.group(executor());
b.channelFactory(channelFactory);
b.option(ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION, true);
// The response handler is constructed with a new promise obtained from executor().
final DnsResponseHandler responseHandler = new DnsResponseHandler(executor().<Channel>newPromise());
b.handler(new ChannelInitializer<DatagramChannel>() {
  @Override
// The channel is taken straight from the registration future; the future itself is not
// inspected in the visible lines.
ch = (DatagramChannel) b.register().channel();
// Receive buffers are allocated at a fixed size of maxPayloadSize.
ch.config().setRecvByteBufAllocator(new FixedRecvByteBufAllocator(maxPayloadSize));
// A listener is attached to the channel's close future (its body is elided here).
ch.closeFuture().addListener(new ChannelFutureListener() {
  @Override
  public void operationComplete(ChannelFuture future) {

代码示例来源:origin: eclipse-vertx/vert.x

// NOTE(review): this is an elided excerpt — the closing braces of the listener are missing.
// Register fileChannel on the same event loop as handlerContext's channel, then react to
// the registration result.
handlerContext.channel()
 .eventLoop()
 .register(fileChannel)
 .addListener((ChannelFutureListener) future -> {
  if (future.isSuccess()) {
   // On success, raf is fired through fileChannel's pipeline as a user event.
   fileChannel.pipeline().fireUserEventTriggered(raf);
  } else {
   // On failure, the registration cause is passed to result.
   result.tryFail(future.cause());

代码示例来源:origin: yu199195/Raincat

/**
 * Connects to the transaction manager unless the current channel is already active.
 *
 * <p>{@code host} and {@code port} are refreshed from the located {@code TxManagerServer}
 * when it provides both; otherwise their existing values are used. A failed attempt
 * reschedules this method on the failed channel's event loop after 5 seconds.
 */
@Override
public void doConnect() {
  final boolean alreadyConnected = channel != null && channel.isActive();
  if (alreadyConnected) {
    return;
  }

  final TxManagerServer located = TxManagerLocator.getInstance().locator();
  final boolean hasEndpoint = Objects.nonNull(located)
      && StringUtils.isNoneBlank(located.getHost())
      && Objects.nonNull(located.getPort());
  if (hasEndpoint) {
    host = located.getHost();
    port = located.getPort();
  }

  final ChannelFuture connectAttempt = bootstrap.connect(host, port);
  LogUtil.info(LOGGER, ".....connect txManager-socket -> host:port:{}", () -> host + ":" + port);
  connectAttempt.addListener((ChannelFutureListener) outcome -> {
    if (!outcome.isSuccess()) {
      LogUtil.info(LOGGER, "Failed to connect to server, try connect after 5s-> host:port:{}", () -> host + ":" + port);
      // Retry on the failed channel's event loop.
      outcome.channel().eventLoop().schedule(this::doConnect, 5, TimeUnit.SECONDS);
      return;
    }
    channel = outcome.channel();
    LogUtil.info(LOGGER, "Connect to server successfully!-> host:port:{}", () -> host + ":" + port);
  });
}

代码示例来源:origin: Exslims/MercuryTrade

/**
 * Handles the outcome of a connect attempt.
 *
 * <p>On success the channel is stored, {@code connectionEstablished} is set and a close
 * detection listener is attached. On failure the channel is closed and a reconnect, with
 * this listener re-attached, is scheduled on the failed channel's event loop after a
 * random delay of 0 to 4 minutes.
 *
 * @param future the completed connect future
 */
@Override
public void operationComplete(ChannelFuture future) throws Exception {
  if (future.isSuccess()) {
    channel = future.channel();
    connectionEstablished = true;
    addCLoseDetectListener(channel);
    LOGGER.debug("Connection established");
    return;
  }

  connectionEstablished = false;
  future.channel().close();
  // ThreadLocalRandom avoids allocating and seeding a new Random on every failed attempt.
  // The range is unchanged: nextInt(5) yields 0..4, so a delay of 0 minutes is possible.
  final int delayMinutes = java.util.concurrent.ThreadLocalRandom.current().nextInt(5);
  future.channel().eventLoop().schedule(() -> {
    bootstrap.connect().addListener(this);
  }, delayMinutes, TimeUnit.MINUTES);
}

代码示例来源:origin: wildfly/wildfly

/**
 * Creates the stream channel for an HTTP/2 upgrade stream, installs the upgrade handler
 * and registers the channel on the parent channel's event loop.
 *
 * @param ctx    context whose channel supplies the event loop
 * @param stream upgrade stream; asserted to be in state {@code HALF_CLOSED_LOCAL}
 */
private void onHttp2UpgradeStreamInitialized(ChannelHandlerContext ctx, Http2MultiplexCodecStream stream) {
  assert stream.state() == Http2Stream.State.HALF_CLOSED_LOCAL;

  final DefaultHttp2StreamChannel upgradeChannel = new DefaultHttp2StreamChannel(stream, true);
  upgradeChannel.outboundClosed = true;

  // Add our upgrade handler to the channel and then register the channel.
  // The register call fires the channelActive, etc.
  upgradeChannel.pipeline().addLast(upgradeStreamHandler);
  final ChannelFuture registration = ctx.channel().eventLoop().register(upgradeChannel);

  if (!registration.isDone()) {
    // Still pending: finish the work from the shared registration listener.
    registration.addListener(CHILD_CHANNEL_REGISTRATION_LISTENER);
    return;
  }
  registerDone(registration);
}

代码示例来源:origin: line/armeria

// NOTE(review): this is an elided excerpt — closing braces, the loop over channels and the
// declarations of latch, closeFutures and ch are missing, so it does not compile as shown.
private void doCloseSync() {
  // The task is submitted to eventLoop and its result is awaited uninterruptibly.
  final CountDownLatch outerLatch = eventLoop.submit(() -> {
    if (allChannels.isEmpty()) {
      // No channels: the task yields null instead of a latch.
      return null;
      final ChannelFuture f = ch.closeFuture();
      closeFutures.add(f);
      // Each completed close future counts the latch down once.
      f.addListener((ChannelFutureListener) future -> latch.countDown());
    // Close every channel whose close future was collected.
    closeFutures.forEach(f -> f.channel().close());
    return latch;
  }).syncUninterruptibly().getNow();

代码示例来源:origin: com.simplyti.cloud/simple-server-core

/**
 * Binds the bootstrap to {@code port} and reports the outcome through a promise.
 *
 * @param executor event loop used to create the returned promise
 * @param port     port to listen on
 * @return a future that succeeds once the bind succeeds (the bound channel is then added to
 *         {@code serverChannels}) and fails with the bind's cause otherwise
 */
private Future<Void> bind(EventLoop executor, int port) {
  final Promise<Void> bound = executor.newPromise();
  log.info("Starting service listener on port {}", port);

  final ChannelFuture channelFuture = bootstrap.bind(port);
  channelFuture.addListener((ChannelFuture outcome) -> {
    if (!outcome.isSuccess()) {
      final Throwable cause = outcome.cause();
      log.warn("Error listening on port {}: {}", port, cause.getMessage());
      bound.setFailure(cause);
      return;
    }
    log.info("Listening on {}", outcome.channel().localAddress());
    this.serverChannels.add(channelFuture.channel());
    bound.setSuccess(null);
  });
  return bound;
}

代码示例来源:origin: netty/netty

/**
 * Turns auto-read off when it is currently on, schedules {@code enableAutoReadTask} one
 * second later, and then forwards the exception down the pipeline.
 *
 * @param ctx   context of the channel on which the exception was raised
 * @param cause the exception to forward
 */
@Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    final ChannelConfig channelConfig = ctx.channel().config();
    final boolean autoReadWasOn = channelConfig.isAutoRead();
    if (autoReadWasOn) {
      // stop accept new connections for 1 second to allow the channel to recover
      // See https://github.com/netty/netty/issues/1328
      channelConfig.setAutoRead(false);
      ctx.channel().eventLoop().schedule(enableAutoReadTask, 1, TimeUnit.SECONDS);
    }

    // still let the exceptionCaught event flow through the pipeline to give the user
    // a chance to do something with it
    ctx.fireExceptionCaught(cause);
  }
}

代码示例来源:origin: wildfly/wildfly

/**
 * Creates an outbound HTTP/2 stream channel, initializes it, registers it on the parent
 * channel's event loop, and completes {@code promise} with the outcome.
 *
 * <p>The assertion documents that the caller must be on the context's executor.
 *
 * @param ctx     context whose handler is the {@code Http2MultiplexCodec} creating the stream
 * @param promise completed with the stream channel on success, cancelled when the
 *                registration was cancelled, and failed otherwise
 */
public void open0(ChannelHandlerContext ctx, final Promise<Http2StreamChannel> promise) {
  assert ctx.executor().inEventLoop();

  final Http2StreamChannel streamChannel = ((Http2MultiplexCodec) ctx.handler()).newOutboundStream();
  try {
    init(streamChannel);
  } catch (Exception initFailure) {
    // The channel was never registered, so it is closed forcibly.
    streamChannel.unsafe().closeForcibly();
    promise.setFailure(initFailure);
    return;
  }

  ctx.channel().eventLoop().register(streamChannel).addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
      if (future.isSuccess()) {
        promise.setSuccess(streamChannel);
        return;
      }
      if (future.isCancelled()) {
        promise.cancel(false);
        return;
      }
      // Registration failed: close normally when registered, forcibly when not.
      if (streamChannel.isRegistered()) {
        streamChannel.close();
      } else {
        streamChannel.unsafe().closeForcibly();
      }
      promise.setFailure(future.cause());
    }
  });
}

代码示例来源:origin: netty/netty

// NOTE(review): this is an elided excerpt — listener bodies, closing braces and the
// surrounding try/catch are missing, so it does not compile as shown.
if (ch == null) {
  // No channel is available here: connect a new one using a clone of the bootstrap that
  // carries this object under POOL_KEY.
  Bootstrap bs = bootstrap.clone();
  bs.attr(POOL_KEY, this);
  ChannelFuture f = connectChannel(bs);
  if (f.isDone()) {
    // Already complete: notify directly instead of attaching a listener.
    notifyConnect(f, promise);
  } else {
    f.addListener(new ChannelFutureListener() {
      @Override
      public void operationComplete(ChannelFuture future) throws Exception {
// The health check runs on the channel's event loop: inline when already on that loop,
// otherwise as a submitted task.
EventLoop loop = ch.eventLoop();
if (loop.inEventLoop()) {
  doHealthCheck(ch, promise);
} else {
  loop.execute(new Runnable() {
    @Override
    public void run() {
// tryFailure is used, so this line has no effect on a promise that is already complete.
promise.tryFailure(cause);

代码示例来源:origin: AsyncHttpClient/async-http-client

/**
 * Writes and flushes {@code LastHttpContent.EMPTY_LAST_CONTENT} from the channel's event
 * loop, and calls {@code removeFromPipeline()} once that write's future completes.
 */
@Override
protected void complete() {
 channel.eventLoop().execute(() -> {
  channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT)
    .addListener(future -> removeFromPipeline());
 });
}

代码示例来源:origin: blynkkk/blynk-server

// NOTE(review): excerpt from an anonymous listener whose enclosing expression is elided.
// Registers the completed future's channel on session.initialEventLoop and attaches
// completeHandler to the resulting registration future.
@Override
  public void operationComplete(ChannelFuture channelFuture) {
    session.initialEventLoop.register(channelFuture.channel()).addListener(completeHandler);
  }
});

代码示例来源:origin: com.microsoft.rest.v2/client-runtime

/**
 * Writes the request from the channel's event loop and reports a failed write through
 * {@code emitError}.
 *
 * @param raw the request to write; it is written but not flushed here
 */
private void writeRequest(final DefaultHttpRequest raw) {
  channel.eventLoop().execute(() -> {
    channel.write(raw).addListener((Future<Void> writeResult) -> {
      if (writeResult.isSuccess()) {
        return;
      }
      emitError(writeResult.cause());
    });
  });
}

相关文章