Using the java.nio.channels.Selector class, with code examples

x33g5p2x, reposted on 2022-01-29 under Other

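This article collects code examples for the Java class java.nio.channels.Selector and shows how the class is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they are useful as reference material. Details of the Selector class:
Package: java.nio.channels
Class name: Selector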

Introduction to Selector

A controller for the selection of SelectableChannel objects. Selectable channels can be registered with a selector, and each registration is represented by a SelectionKey. The key is also added to the selector's key set. A selection key can be cancelled so that its channel is no longer registered with the selector.

Cancelling a key does not deregister its channel immediately: the key is placed in the selector's cancelled-key set and is removed from the selector's key sets during the next selection operation. During a selection operation, the selector also checks which registered channels are ready for the operations named in their SelectionKey's interest set, and adds the keys of the ready channels to the selected-key set.
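The register, select, and iterate cycle described above can be sketched without any network setup by using a java.nio.channels.Pipe. This is a minimal illustration rather than code from any of the projects quoted below; the class name PipeSelectDemo and the method readViaSelector are made up for this example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

// Hypothetical demo class, not part of the quoted projects.
final class PipeSelectDemo {

  /** Registers a pipe's read end, makes it readable, and returns what the selector delivers. */
  static String readViaSelector(String message) {
    try (Selector selector = Selector.open()) {
      Pipe pipe = Pipe.open();
      try (Pipe.SinkChannel sink = pipe.sink();
           Pipe.SourceChannel source = pipe.source()) {
        // Only non-blocking channels can be registered with a selector.
        source.configureBlocking(false);
        SelectionKey registration = source.register(selector, SelectionKey.OP_READ);

        // Writing to the sink makes the source end readable.
        sink.write(ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8)));

        StringBuilder received = new StringBuilder();
        if (selector.select(1000) > 0) {
          Iterator<SelectionKey> it = selector.selectedKeys().iterator();
          while (it.hasNext()) {
            SelectionKey key = it.next();
            it.remove(); // the selector never removes selected keys by itself
            if (key == registration && key.isReadable()) {
              ByteBuffer buffer = ByteBuffer.allocate(256);
              source.read(buffer);
              buffer.flip();
              received.append(StandardCharsets.UTF_8.decode(buffer));
            }
          }
        }
        return received.toString();
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(readViaSelector("hello selector"));
  }
}
```

The explicit it.remove() is the part that is easiest to forget: a key stays in the selected-key set until the application takes it out.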

Code examples

Code example source: iluwatar/java-design-patterns

private void eventLoop() throws IOException {
 while (true) {
  // honor interrupt request
  if (Thread.interrupted()) {
   break;
  }
  // honor any pending commands first
  processPendingCommands();
  /*
   * Synchronous event de-multiplexing happens here, this is blocking call which returns when it is possible to
   * initiate non-blocking operation on any of the registered channels.
   */
  selector.select();
  /*
   * Represents the events that have occurred on registered handles.
   */
  Set<SelectionKey> keys = selector.selectedKeys();
  Iterator<SelectionKey> iterator = keys.iterator();
  while (iterator.hasNext()) {
   SelectionKey key = iterator.next();
   if (!key.isValid()) {
    iterator.remove();
    continue;
   }
   processKey(key);
  }
  keys.clear();
 }
}

Code example source: iluwatar/java-design-patterns

/**
 * Creates a reactor which will use provided {@code dispatcher} to dispatch events. The application can provide
 * various implementations of dispatcher which suits its needs.
 * 
 * @param dispatcher
 *          a non-null dispatcher used to dispatch events on registered channels.
 * @throws IOException
 *           if any I/O error occurs.
 */
public NioReactor(Dispatcher dispatcher) throws IOException {
 this.dispatcher = dispatcher;
 this.selector = Selector.open();
}

Code example source: apache/activemq

public void close() throws IOException {
  if (rs.isOpen()) {
    rs.close();
    if (sc.isOpen()) {
      sc.socket().shutdownInput();
      sc.socket().close();
    }
    bb = null;
    sc = null;
  }
}

Code example source: netty/netty

int selectNow() throws IOException {
  try {
    return selector.selectNow();
  } finally {
    // restore wakeup state if needed
    if (wakenUp.get()) {
      selector.wakeup();
    }
  }
}

Code example source: iluwatar/java-design-patterns

/**
 * Stops the reactor and related resources such as dispatcher.
 * 
 * @throws InterruptedException
 *           if interrupted while stopping the reactor.
 * @throws IOException
 *           if any I/O error occurs.
 */
public void stop() throws InterruptedException, IOException {
 reactorMain.shutdownNow();
 selector.wakeup();
 reactorMain.awaitTermination(4, TimeUnit.SECONDS);
 selector.close();
 LOGGER.info("Reactor stopped");
}
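The stop() method above relies on wakeup() to release a thread that is blocked in select(). The following sketch isolates that behavior under simple assumptions: no channels are registered, so only wakeup() can end the call. WakeupDemo and wakeBlockedSelect are illustrative names, not taken from the project.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the quoted projects.
final class WakeupDemo {

  /** Returns what select() returned after wakeup(), or -1 if the thread did not finish in time. */
  static int wakeBlockedSelect() {
    try (Selector selector = Selector.open()) {
      AtomicInteger result = new AtomicInteger(-1);
      CountDownLatch done = new CountDownLatch(1);

      Thread loop = new Thread(() -> {
        try {
          // Nothing is registered, so this call can only end through wakeup(), close(), or interrupt.
          result.set(selector.select());
        } catch (IOException e) {
          throw new UncheckedIOException(e);
        } finally {
          done.countDown();
        }
      }, "selector-loop");
      loop.start();

      // wakeup() is remembered: if it runs before select() blocks, the next select() returns at once.
      selector.wakeup();
      boolean finished = done.await(5, TimeUnit.SECONDS);
      return finished ? result.get() : -1;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println("select() returned " + wakeBlockedSelect());
  }
}
```

Because a woken select() reports zero ready keys, an event loop usually has to check its own shutdown flag or command queue after every return, as the eventLoop() example earlier does.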

Code example source: apache/kafka

/**
 * Check for data, waiting up to the given timeout.
 *
 * @param timeoutMs Length of time to wait, in milliseconds, which must be non-negative
 * @return The number of keys ready
 */
private int select(long timeoutMs) throws IOException {
  if (timeoutMs < 0L)
    throw new IllegalArgumentException("timeout should be >= 0");
  if (timeoutMs == 0L)
    return this.nioSelector.selectNow();
  else
    return this.nioSelector.select(timeoutMs);
}
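The special case for a zero timeout above lines up with the Selector API, in which select(0) blocks indefinitely rather than polling, while selectNow() never blocks and a negative timeout is rejected. A small sketch of those three behaviors on a selector with no registered channels follows; SelectTimeoutDemo and its method names are assumptions made for this illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;

// Hypothetical demo class, not part of the quoted projects.
final class SelectTimeoutDemo {

  /** Returns {selectNow(), select(50)} for a selector with no registered channels. */
  static int[] pollEmptySelector() {
    try (Selector selector = Selector.open()) {
      int immediate = selector.selectNow();    // never blocks
      int afterTimeout = selector.select(50L); // gives up after roughly 50 ms
      return new int[] {immediate, afterTimeout};
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Returns true if select() rejects a negative timeout, as the API documentation specifies. */
  static boolean rejectsNegativeTimeout() {
    try (Selector selector = Selector.open()) {
      selector.select(-1L);
      return false;
    } catch (IllegalArgumentException expected) {
      return true;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    int[] results = pollEmptySelector();
    System.out.println(results[0] + " " + results[1] + " " + rejectsNegativeTimeout());
  }
}
```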

Code example source: normanmaurer/netty-in-action

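public void serve(int port) throws IOException {
  ServerSocketChannel serverChannel = ServerSocketChannel.open();
  serverChannel.configureBlocking(false);
  ServerSocket ss = serverChannel.socket();
  InetSocketAddress address = new InetSocketAddress(port);
  ss.bind(address);
  Selector selector = Selector.open(); // the OP_ACCEPT registration is omitted in this excerpt
  for (;;) {
    try {
      selector.select();
    } catch (IOException ex) {
      ex.printStackTrace();
      break; // assumed: the excerpt omits how the loop reacts to a failed select()
    }
    Set<SelectionKey> readyKeys = selector.selectedKeys();
    Iterator<SelectionKey> iterator = readyKeys.iterator();
    while (iterator.hasNext()) {
      SelectionKey key = iterator.next(); // implied by the use of key below
      iterator.remove();                  // assumed: keeps the key from being handled twice
      try {
        // ... (accept handling omitted in this excerpt)
        if (key.isWritable()) {
          SocketChannel client = (SocketChannel) key.channel(); // assumed declaration
          ByteBuffer buffer = (ByteBuffer) key.attachment();    // assumed declaration
          while (buffer.hasRemaining()) { // assumed enclosing loop for the break below
            if (client.write(buffer) == 0) {
              break;
            }
          }
        }
      } catch (IOException ex) {
        try {
          key.channel().close();
        } catch (IOException cex) {
          // ignored on close
        }
      }
    }
  }
}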

Code example source: wildfly/wildfly

protected void start(boolean direct) throws Exception {
  selector=Selector.open();
  ch=ServerSocketChannel.open();
  ch.bind(new InetSocketAddress("0.0.0.0", 7500));
  ch.configureBlocking(false);
  ch.register(selector, SelectionKey.OP_ACCEPT, null);
  System.out.println("-- server ready");
  for(;;) { // assumed: the loop header is missing from this excerpt
    selector.select();
    Set<SelectionKey> keys=selector.selectedKeys();
    for(Iterator<SelectionKey> it=keys.iterator(); it.hasNext();) {
      SelectionKey key=it.next();
      if(!key.isValid()) {
        it.remove();
        continue;
      }
      it.remove(); // assumed: a handled key has to leave the selected-key set
      if(key.isAcceptable()) {
        SocketChannel client_ch=ch.accept();
        if(client_ch != null) { // accept() may return null...
          System.out.printf("accepted connection from %s\n", client_ch.getRemoteAddress());
          client_ch.configureBlocking(false);
          client_ch.register(selector, SelectionKey.OP_READ, create(SIZE, direct));
        }
      }
      else if(key.isReadable()) {
        if(!handle((SocketChannel)key.channel(), (ByteBuffer)key.attachment())) {
          key.cancel();
          // ... (remaining cleanup omitted in this excerpt)
        }
      }
    }
  }
}

Code example source: AsyncHttpClient/async-http-client

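public SocksProxy(int runningTime) throws IOException {
 ServerSocketChannel socks = ServerSocketChannel.open();
 socks.socket().bind(new InetSocketAddress(8000));
 socks.configureBlocking(false);
 Selector select = Selector.open();
 socks.register(select, SelectionKey.OP_ACCEPT);
 // The loop header is missing from this excerpt; a loop bounded by runningTime is assumed.
 for (long end = System.currentTimeMillis() + runningTime; System.currentTimeMillis() < end;) {
  select.select(5000);
  Set<SelectionKey> keys = select.selectedKeys();
  for (SelectionKey k : keys) {
   if (!k.isValid())
    continue;
   if (k.isAcceptable() && k.channel() == socks) {
    SocketChannel csock = socks.accept();
    if (csock == null) // assumed guard for the continue below
     continue;
    addClient(csock);
    csock.register(select, SelectionKey.OP_READ);
   } else if (k.isReadable()) {
    // ... (omitted in this excerpt: cl is the tracked client that owns k.channel();
    // the cleanup below runs when forwarding its data fails)
    cl.client.close();
    if (cl.remote != null)
     cl.remote.close();
    k.cancel();
    clients.remove(cl);
   }
  }
  // ... (rest of the loop body omitted in this excerpt)
 }
}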

Code example source: alibaba/cobar

public NIOAcceptor(String name, int port, FrontendConnectionFactory factory) throws IOException {
  super.setName(name);
  this.port = port;
  this.selector = Selector.open();
  this.serverChannel = ServerSocketChannel.open();
  this.serverChannel.socket().bind(new InetSocketAddress(port));
  this.serverChannel.configureBlocking(false);
  this.serverChannel.register(selector, SelectionKey.OP_ACCEPT);
  this.factory = factory;
}
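The constructor above only sets up the OP_ACCEPT registration. How such a registration is typically consumed can be sketched as follows, assuming a loopback connection on an ephemeral port so that the example is self-contained; AcceptDemo and acceptOne are hypothetical names and do not appear in cobar.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

// Hypothetical demo class, not part of the quoted projects.
final class AcceptDemo {

  /** Returns true if one loopback connection was reported as acceptable and then accepted. */
  static boolean acceptOne() {
    try (Selector selector = Selector.open();
         ServerSocketChannel server = ServerSocketChannel.open()) {
      // Port 0 asks the operating system for any free port.
      server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
      server.configureBlocking(false);
      server.register(selector, SelectionKey.OP_ACCEPT);

      // A blocking client connect completes against the listen backlog.
      try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
        if (selector.select(2000) == 0) {
          return false; // nothing became acceptable within the timeout
        }
        Iterator<SelectionKey> it = selector.selectedKeys().iterator();
        while (it.hasNext()) {
          SelectionKey key = it.next();
          it.remove();
          if (key.isAcceptable()) {
            // accept() may return null on a non-blocking channel; try-with-resources tolerates that.
            try (SocketChannel accepted = server.accept()) {
              return accepted != null;
            }
          }
        }
        return false;
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println("accepted: " + acceptOne());
  }
}
```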

Code example source: kilim/kilim

// Excerpt from the body of the selector loop; n and the enclosing loop are declared outside it.
try {
  if (!running) {
    Iterator<SelectionKey> it = sel.keys().iterator();
    while (it.hasNext()) {
      SelectionKey sk = it.next();
      sk.cancel();
      Object o = sk.attachment();
      if (o instanceof SockEvent && ((SockEvent)o).ch instanceof ServerSocketChannel) {
        try {
          ((ServerSocketChannel)((SockEvent)o).ch).close();
        } catch (IOException ignore) {}
      }
    }
    // ... (omitted in this excerpt)
  }
  if (update.get()) n = sel.selectNow();
  else n = sel.select();
} catch (IOException ignore) { n = 0; ignore.printStackTrace(); }
if (n > 0) {
  Iterator<SelectionKey> it = sel.selectedKeys().iterator();
  while (it.hasNext()) {
    SelectionKey sk = it.next();
    it.remove();
    Object o = sk.attachment();
    sk.interestOps(0);
    assert(o instanceof SockEvent);
    // ... (delivery of the event omitted in this excerpt)
  }
}
if (update.getAndSet(false))
  regtask.run();

Code example source: voldemort/voldemort

try { // assumed: the opening of this try block is missing from the excerpt
  socketChannel = SocketChannel.open();
  socketChannel.socket().setReceiveBufferSize(this.socketBufferSize);
  socketChannel.socket().setSendBufferSize(this.socketBufferSize);
  socketChannel.socket().setTcpNoDelay(true);
  socketChannel.socket().setSoTimeout(soTimeoutMs);
  socketChannel.socket().setKeepAlive(this.socketKeepAlive);
  socketChannel.configureBlocking(false);
  socketChannel.connect(new InetSocketAddress(dest.getHost(), dest.getPort()));
  selector.wakeup();
} catch(Exception e) {
  // ... (error handling omitted in this excerpt)
}

Code example source: org.apache.hadoop/hadoop-common

public Listener() throws IOException {
 address = new InetSocketAddress(bindAddress, port);
 // Create a new server socket and set to non blocking mode
 acceptChannel = ServerSocketChannel.open();
 acceptChannel.configureBlocking(false);
 // Bind the server socket to the local host and port
 bind(acceptChannel.socket(), address, backlogLength, conf, portRangeConfig);
 port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
 // create a selector;
 selector= Selector.open();
 readers = new Reader[readThreads];
 for (int i = 0; i < readThreads; i++) {
  Reader reader = new Reader(
    "Socket Reader #" + (i + 1) + " for port " + port);
  readers[i] = reader;
  reader.start();
 }
 // Register accepts on the server socket with the selector.
 acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
 this.setName("IPC Server listener on " + port);
 this.setDaemon(true);
}

Code example source: apache/nifi

@Override
public void open(final InetAddress nicAddress, final int port, final int maxBufferSize) throws IOException {
  stopped = false;
  executor = Executors.newFixedThreadPool(maxConnections);
  final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
  serverSocketChannel.configureBlocking(false);
  if (maxBufferSize > 0) {
    serverSocketChannel.setOption(StandardSocketOptions.SO_RCVBUF, maxBufferSize);
    final int actualReceiveBufSize = serverSocketChannel.getOption(StandardSocketOptions.SO_RCVBUF);
    if (actualReceiveBufSize < maxBufferSize) {
      logger.warn("Attempted to set Socket Buffer Size to " + maxBufferSize + " bytes but could only set to "
          + actualReceiveBufSize + " bytes. You may want to consider changing the Operating System's "
          + "maximum receive buffer");
    }
  }
  serverSocketChannel.socket().bind(new InetSocketAddress(nicAddress, port));
  selector = Selector.open();
  serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
}

Code example source: rapidoid/rapidoid

private void openSocket() throws IOException {
  U.notNull(net.protocol(), "protocol");
  U.notNull(net.helperClass(), "helperClass");
  String blockingInfo = net.blockingAccept() ? "blocking" : "non-blocking";
  Log.debug("Initializing server", "address", net.address(), "port", net.port(), "sync", net.syncBufs(), "accept", blockingInfo);
  serverSocketChannel = ServerSocketChannel.open();
  if ((serverSocketChannel.isOpen()) && (selector.isOpen())) {
    serverSocketChannel.configureBlocking(net.blockingAccept());
    ServerSocket socket = serverSocketChannel.socket();
    Log.info("!Starting server", "!address", net.address(), "!port", net.port(), "I/O workers", net.workers(), "sync", net.syncBufs(), "accept", blockingInfo);
    InetSocketAddress addr = new InetSocketAddress(net.address(), net.port());
    socket.setReceiveBufferSize(16 * 1024);
    socket.setReuseAddress(true);
    socket.bind(addr, MAX_PENDING_CONNECTIONS);
    Log.debug("Opened server socket", "address", addr);
    if (!net.blockingAccept()) {
      Log.debug("Registering accept selector");
      serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    }
    initWorkers();
  } else {
    throw U.rte("Cannot open socket!");
  }
}

Code example source: org.apache.zookeeper/zookeeper

public void run() {
  while (!ss.socket().isClosed()) {
    try {
      selector.select(1000);
      Set<SelectionKey> selected;
      synchronized (this) {
        selected = selector.selectedKeys();
      }
      // assumed: the excerpt omits the copy of the selected keys into a list
      ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(selected);
      Collections.shuffle(selectedList);
      for (SelectionKey k : selectedList) {
        if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
          SocketChannel sc = ((ServerSocketChannel) k
              .channel()).accept();
          InetAddress ia = sc.socket().getInetAddress();
          int cnxncount = getClientCnxnCount(ia);
          if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){
            LOG.warn("Too many connections from " + ia
                 + " - max is " + maxClientCnxns );
            sc.close();
          } else {
            LOG.info("Accepted socket connection from "
                 + sc.socket().getRemoteSocketAddress());
            sc.configureBlocking(false);
            SelectionKey sk = sc.register(selector,
                SelectionKey.OP_READ);
            NIOServerCnxn cnxn = createConnection(sc, sk);
            sk.attach(cnxn);
            addCnxn(cnxn);
          }
        }
        // ... (handling of read/write readiness omitted in this excerpt)
      }
      selected.clear(); // assumed: resets the selected-key set for the next pass
    } catch (Exception e) {
      // ... (error handling omitted in this excerpt)
    }
  }
}

Code example source: voldemort/voldemort

public void accept(SocketChannel socketChannel) {
  if(isClosed.get())
    throw new IllegalStateException("Cannot accept more channels, selector manager closed");
  socketChannelQueue.add(socketChannel);
  selector.wakeup();
}

Code example source: kilim/kilim

public void wake() {
  update.set(true);
  if (Thread.currentThread() != selectorThread)
    sel.wakeup();
}
// public void execute() throws Pausable, Exception { ... } follows in the original; its body is not part of this excerpt.

Code example source: lealone/Lealone

@Override
public void closeChannel(SocketChannel channel) {
  if (channel == null) {
    return;
  }
  for (SelectionKey key : selector.keys()) {
    if (key.channel() == channel && key.isValid()) {
      key.cancel();
      break;
    }
  }
  channels.remove(channel);
  Socket socket = channel.socket();
  if (socket != null) {
    try {
      socket.close();
    } catch (Exception e) {
    }
  }
  try {
    channel.close();
  } catch (Exception e) {
  }
}
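The closeChannel method above cancels the key before closing the channel. What cancel() changes right away and what only changes at the next selection operation can be checked with a short sketch; it uses a Pipe in place of a socket, and CancelKeyDemo and cancelThenSelect are names invented for this example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

// Hypothetical demo class, not part of the quoted projects.
final class CancelKeyDemo {

  /** Returns {key.isValid() after cancel(), key still in selector.keys() after the next selectNow()}. */
  static boolean[] cancelThenSelect() {
    try (Selector selector = Selector.open()) {
      Pipe pipe = Pipe.open();
      try (Pipe.SinkChannel sink = pipe.sink();
           Pipe.SourceChannel source = pipe.source()) {
        source.configureBlocking(false);
        SelectionKey key = source.register(selector, SelectionKey.OP_READ);

        key.cancel();
        boolean validAfterCancel = key.isValid(); // invalid as soon as cancel() returns

        selector.selectNow(); // a selection operation processes the cancelled-key set
        boolean listedAfterSelect = selector.keys().contains(key);

        return new boolean[] {validAfterCancel, listedAfterSelect};
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    boolean[] state = cancelThenSelect();
    System.out.println("valid=" + state[0] + ", listed=" + state[1]);
  }
}
```

This is why loops such as eventLoop() earlier in the article test key.isValid() before processing a key: a key cancelled by another thread can still show up in the selected-key set of the current pass.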

Code example source: voldemort/voldemort

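public void close() {
  if(!isClosed.compareAndSet(false, true))
    return;
  try {
    for(SelectionKey sk: selector.keys()) {
      try {
        if(logger.isTraceEnabled())
          logger.trace("Closing SelectionKey's channel");
        sk.channel().close();
        Object attachment = sk.attachment();
        if(attachment instanceof Closeable) {
          IOUtils.closeQuietly((Closeable) attachment);
        }
        if(logger.isTraceEnabled()) // assumed guard, matching the trace call above
          logger.trace("Cancelling SelectionKey");
        sk.cancel();
      } catch(Exception e) {
        if(logger.isEnabledFor(Level.WARN))
          logger.warn(e.getMessage(), e); // assumed: the log call is missing from the excerpt
      }
    }
  } catch(Exception e) {
    // ... (omitted in this excerpt)
  }
  try {
    selector.close();
  } catch(Exception e) {
    if(logger.isEnabledFor(Level.WARN))
      logger.warn(e.getMessage(), e); // assumed, as above
  }
}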
