java.nio.channels.Selector.select()方法的使用及代码示例

x33g5p2x  于2022-01-29 转载在 其他  
字(10.4k)|赞(0)|评价(0)|浏览(159)

本文整理了Java中java.nio.channels.Selector.select()方法的一些代码示例,展示了Selector.select()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Selector.select()方法的具体详情如下:
包路径:java.nio.channels.Selector
类名称:Selector
方法名:select

Selector.select介绍

[英]Detects if any of the registered channels is ready for I/O operations according to its SelectionKey. This method does not return until at least one channel is ready, #wakeup() is invoked or the calling thread is interrupted.
[中]根据SelectionKey检测任何已注册的通道是否已准备好进行I/O操作。在至少一个通道就绪、调用#wakeup()或调用线程中断之前,此方法不会返回。

代码示例

代码示例来源:origin: iluwatar/java-design-patterns

private void eventLoop() throws IOException {
 while (true) {
  // honor interrupt request
  if (Thread.interrupted()) {
   break;
  }
  // honor any pending commands first
  processPendingCommands();
  /*
   * Synchronous event de-multiplexing happens here, this is blocking call which returns when it is possible to
   * initiate non-blocking operation on any of the registered channels.
   */
  selector.select();
  /*
   * Represents the events that have occurred on registered handles.
   */
  Set<SelectionKey> keys = selector.selectedKeys();
  Iterator<SelectionKey> iterator = keys.iterator();
  while (iterator.hasNext()) {
   SelectionKey key = iterator.next();
   if (!key.isValid()) {
    iterator.remove();
    continue;
   }
   processKey(key);
  }
  keys.clear();
 }
}

代码示例来源:origin: apache/activemq

private void fill(int n) throws IOException, ClosedChannelException {
    int bytesRead = -1;

    if ((n <= 0) || (n <= bb.remaining()))
      return;

    bb.compact();

    n = (bb.remaining() < n ? bb.remaining() : n);

    for (;;) {
      bytesRead = sc.read(bb);

      if (bytesRead == -1)
        throw new ClosedChannelException();

      n -= bytesRead;

      if (n <= 0)
        break;

      rs.select(0);
      rs.selectedKeys().clear();
    }

    bb.flip();
  }
}

代码示例来源:origin: wildfly/wildfly

public static void await(NioXnio nioXnio, SelectableChannel channel, int op, long time, TimeUnit unit) throws IOException {
    if (time <= 0) {
      await(nioXnio, channel, op);
      return;
    }
    Xnio.checkBlockingAllowed();
    final Selector selector = nioXnio.getSelector();
    final SelectionKey selectionKey;
    try {
      selectionKey = channel.register(selector, op);
    } catch (ClosedChannelException e) {
      return;
    }
    long timeoutInMillis = unit.toMillis(time);
    selector.select(timeoutInMillis == 0 ? 1: timeoutInMillis);
    selector.selectedKeys().clear();
    if (Thread.currentThread().isInterrupted()) {
      throw log.interruptedIO();
    }
    selectionKey.cancel();
    selector.selectNow();
  }
}

代码示例来源:origin: com.hazelcast/hazelcast-all

private void acceptLoop() throws IOException {
  while (!stop) {
    // block until new connection or interruption.
    int keyCount = selector.select();
    if (isInterrupted()) {
      break;
    }
    if (keyCount == 0) {
      continue;
    }
    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
    handleSelectionKeys(it);
  }
}

代码示例来源:origin: looly/hutool

/**
 * 开始监听
 * 
 * @throws IOException IO异常
 */
private void doListen() throws IOException {
  while (0 != this.selector.select()) {
    // 返回已选择键的集合
    final Iterator<SelectionKey> keyIter = selector.selectedKeys().iterator();
    while (keyIter.hasNext()) {
      handle(keyIter.next());
      keyIter.remove();
    }
  }
}

代码示例来源:origin: hazelcast/hazelcast-jet

private void acceptLoop() throws IOException {
  while (!stop) {
    // block until new connection or interruption.
    int keyCount = selector.select();
    if (isInterrupted()) {
      break;
    }
    if (keyCount == 0) {
      continue;
    }
    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
    handleSelectionKeys(it);
  }
}

代码示例来源:origin: alibaba/cobar

@Override
public void run() {
  final Selector selector = this.selector;
  for (;;) {
    ++acceptCount;
    try {
      selector.select(1000L);
      Set<SelectionKey> keys = selector.selectedKeys();
      try {
        for (SelectionKey key : keys) {
          if (key.isValid() && key.isAcceptable()) {
            accept();
          } else {
            key.cancel();
          }
        }
      } finally {
        keys.clear();
      }
    } catch (Throwable e) {
      LOGGER.warn(getName(), e);
    }
  }
}

代码示例来源:origin: wildfly/wildfly

public static void await(NioXnio nioXnio, SelectableChannel channel, int op) throws IOException {
  if (NioXnio.IS_HP_UX) {
    // HP-UX has buggy write wakeup semantics
    await(nioXnio, channel, op, 1, TimeUnit.SECONDS);
    return;
  }
  Xnio.checkBlockingAllowed();
  final Selector selector = nioXnio.getSelector();
  final SelectionKey selectionKey;
  try {
    selectionKey = channel.register(selector, op);
  } catch (ClosedChannelException e) {
    return;
  }
  selector.select();
  selector.selectedKeys().clear();
  if (Thread.currentThread().isInterrupted()) {
    throw log.interruptedIO();
  }
  selectionKey.cancel();
  selector.selectNow();
}

代码示例来源:origin: looly/hutool

/**
 * 开始监听
 * 
 * @throws IOException IO异常
 */
private void doListen() throws IOException {
  while (0 != this.selector.select()) {
    // 返回已选择键的集合
    final Iterator<SelectionKey> keyIter = selector.selectedKeys().iterator();
    while (keyIter.hasNext()) {
      handle(keyIter.next());
      keyIter.remove();
    }
  }
}

代码示例来源:origin: apache/incubator-gobblin

@Override
public void run() {
 try {
  Tunnel.this.server.register(this.selector, SelectionKey.OP_ACCEPT,
    ExecutorsUtils.loggingDecorator(new AcceptHandler(Tunnel.this.server, this.selector, Tunnel.this.config)));
  while (!Thread.interrupted()) {
   this.selector.select();
   Set<SelectionKey> selectionKeys = this.selector.selectedKeys();
   for (SelectionKey selectionKey : selectionKeys) {
    dispatch(selectionKey);
   }
   selectionKeys.clear();
  }
 } catch (IOException ioe) {
  LOG.error("Unhandled IOException. Tunnel will close", ioe);
 }
 LOG.info("Closing tunnel");
}

代码示例来源:origin: code4craft/netty-learning

public void run() {
  try {
    while (!Thread.interrupted()) {
      //循环,等待事件
      selector.select();
      Set selected = selector.selectedKeys();
      Iterator it = selected.iterator();
      while (it.hasNext())
        //调用handler,处理事件
        dispatch((SelectionKey) (it.next()));
      selected.clear();
    }
  } catch (IOException ex) { /* ... */
  }
}

代码示例来源:origin: alibaba/cobar

@Override
public void run() {
  final Selector selector = this.selector;
  for (;;) {
    ++connectCount;
    try {
      selector.select(1000L);
      connect(selector);
      Set<SelectionKey> keys = selector.selectedKeys();
      try {
        for (SelectionKey key : keys) {
          Object att = key.attachment();
          if (att != null && key.isValid() && key.isConnectable()) {
            finishConnect(key, att);
          } else {
            key.cancel();
          }
        }
      } finally {
        keys.clear();
      }
    } catch (Throwable e) {
      LOGGER.warn(name, e);
    }
  }
}

代码示例来源:origin: apache/zookeeper

private void select() {
  try {
    selector.select();
    Iterator<SelectionKey> selectedKeys =
      selector.selectedKeys().iterator();
    while (!stopped && selectedKeys.hasNext()) {
      SelectionKey key = selectedKeys.next();
      selectedKeys.remove();
      if (!key.isValid()) {
        continue;
      }
      if (key.isAcceptable()) {
        if (!doAccept()) {
          // If unable to pull a new connection off the accept
          // queue, pause accepting to give us time to free
          // up file descriptors and so the accept thread
          // doesn't spin in a tight loop.
          pauseAccept(10);
        }
      } else {
        LOG.warn("Unexpected ops in accept select "
             + key.readyOps());
      }
    }
  } catch (IOException e) {
    LOG.warn("Ignoring IOException while selecting", e);
  }
}

代码示例来源:origin: apache/nifi

private void selectSocketChannelKeys() throws IOException {
  // once a channel associated with a key in this selector is 'ready', it causes this select to immediately return.
  // thus, for each trip through the run() we only get hit with one real timeout...the one in selectServerSocketKeys.
  int numSelected = socketChannelSelector.select(timeout);
  if (numSelected == 0) {
    return;
  }
  for (SelectionKey socketChannelKey : socketChannelSelector.selectedKeys()) {
    final SelectableChannel channel = socketChannelKey.channel();
    AbstractChannelReader reader = null;
    // there are 2 kinds of channels in this selector, both which have their own readers and are executed in their own
    // threads. We will get here whenever a new SocketChannel is created due to an incoming connection. However,
    // for a DatagramChannel we don't want to create a new reader unless it is a new DatagramChannel. The only
    // way to tell if it's new is the lack of an attachment.
    if (channel instanceof DatagramChannel && socketChannelKey.attachment() == null) {
      reader = new DatagramChannelReader(UUID.randomUUID().toString(), socketChannelKey, emptyBuffers, factory, readSingleDatagram);
      socketChannelKey.attach(reader);
      final ScheduledFuture<?> readerFuture = executor.scheduleWithFixedDelay(reader, 10L, channelReaderFrequencyMilliseconds.get(),
          TimeUnit.MILLISECONDS);
      reader.setScheduledFuture(readerFuture);
    }
    if (reader != null && LOGGER.isDebugEnabled()) {
      LOGGER.debug(this + " New Connection established.  Server channel: " + channel + " Reader: " + reader);
    }
  }
}

代码示例来源:origin: rapidoid/rapidoid

private void processNonBlocking() {
  try {
    selector.select(50);
  } catch (IOException e) {
    Log.error("Select failed!", e);
  }
  try {
    Set<SelectionKey> selectedKeys = selector.selectedKeys();
    synchronized (selectedKeys) {
      Iterator<?> iter = selectedKeys.iterator();
      while (iter.hasNext()) {
        SelectionKey key = (SelectionKey) iter.next();
        iter.remove();
        acceptChannel((ServerSocketChannel) key.channel());
      }
    }
  } catch (ClosedSelectorException e) {
    // do nothing
  }
}

代码示例来源:origin: alibaba/cobar

++reactCount;
try {
  selector.select(1000L);
  register(selector);
  Set<SelectionKey> keys = selector.selectedKeys();
  try {
    for (SelectionKey key : keys) {

代码示例来源:origin: apache/zookeeper

private void select() {
  try {
    selector.select();
    Set<SelectionKey> selected = selector.selectedKeys();
    ArrayList<SelectionKey> selectedList =
      new ArrayList<SelectionKey>(selected);
    Collections.shuffle(selectedList);
    Iterator<SelectionKey> selectedKeys = selectedList.iterator();
    while(!stopped && selectedKeys.hasNext()) {
      SelectionKey key = selectedKeys.next();
      selected.remove(key);
      if (!key.isValid()) {
        cleanupSelectionKey(key);
        continue;
      }
      if (key.isReadable() || key.isWritable()) {
        handleIO(key);
      } else {
        LOG.warn("Unexpected ops in select " + key.readyOps());
      }
    }
  } catch (IOException e) {
    LOG.warn("Ignoring IOException while selecting", e);
  }
}

代码示例来源:origin: wildfly/wildfly

@Override
protected void doConnect(SocketAddress remoteAddress,
             SocketAddress localAddress) throws Exception {
  if (localAddress != null) {
    ch.bind(localAddress);
  }
  boolean success = false;
  try {
    ch.connect(remoteAddress);
    boolean  finishConnect = false;
    while (!finishConnect) {
      if (connectSelector.select(SO_TIMEOUT) >= 0) {
        final Set<SelectionKey> selectionKeys = connectSelector.selectedKeys();
        for (SelectionKey key : selectionKeys) {
          if (key.isConnectable()) {
            selectionKeys.clear();
            finishConnect = true;
            break;
          }
        }
        selectionKeys.clear();
      }
    }
    success = ch.finishConnect();
  } finally {
    if (!success) {
      doClose();
    }
  }
}

代码示例来源:origin: oldmanpushcart/greys-anatomy

@Override
  public void run() {
    while (!isInterrupted()
        && isBind()) {
      try {
        while (selector.isOpen()
            && selector.select() > 0) {
          final Iterator<SelectionKey> it = selector.selectedKeys().iterator();
          while (it.hasNext()) {
            final SelectionKey key = it.next();
            it.remove();
            // do ssc accept
            if (key.isValid() && key.isAcceptable()) {
              doAccept(key, selector, configure);
            }
            // do sc read
            if (key.isValid() && key.isReadable()) {
              doRead(byteBuffer, key);
            }
          }
        }
      } catch (IOException e) {
        logger.warn("selector failed.", e);
      } catch (ClosedSelectorException e) {
        logger.debug("selector closed.", e);
      }
    }
  }
};

代码示例来源:origin: org.apache.zookeeper/zookeeper

public void run() {
  while (!ss.socket().isClosed()) {
    try {
      selector.select(1000);
      Set<SelectionKey> selected;
      synchronized (this) {
        selected = selector.selectedKeys();

相关文章