[英] A controller for the selection of SelectableChannel objects. A selectable channel can be registered with a selector; the registration is represented by a SelectionKey, which is also added to the selector's key set. A selection key can be canceled so that the corresponding channel is no longer registered with the selector.

When the select method is invoked, the key set is checked and all keys that have been canceled since the last select operation are moved to the set of canceled keys. During the select operation, the channels registered with this selector are checked to see whether they are ready for the operations in the interest set of their SelectionKey.


代码示例来源:origin: iluwatar/java-design-patterns

  1. private void eventLoop() throws IOException {
  2. while (true) {
  3. // honor interrupt request
  4. if (Thread.interrupted()) {
  5. break;
  6. }
  7. // honor any pending commands first
  8. processPendingCommands();
  9. /*
  10. * Synchronous event de-multiplexing happens here, this is blocking call which returns when it is possible to
  11. * initiate non-blocking operation on any of the registered channels.
  12. */
  14. /*
  15. * Represents the events that have occurred on registered handles.
  16. */
  17. Set<SelectionKey> keys = selector.selectedKeys();
  18. Iterator<SelectionKey> iterator = keys.iterator();
  19. while (iterator.hasNext()) {
  20. SelectionKey key = iterator.next();
  21. if (!key.isValid()) {
  22. iterator.remove();
  23. continue;
  24. }
  25. processKey(key);
  26. }
  27. keys.clear();
  28. }
  29. }

代码示例来源:origin: iluwatar/java-design-patterns

  1. /**
  2. * Creates a reactor which will use provided {@code dispatcher} to dispatch events. The application can provide
  3. * various implementations of dispatcher which suits its needs.
  4. *
  5. * @param dispatcher
  6. * a non-null dispatcher used to dispatch events on registered channels.
  7. * @throws IOException
  8. * if any I/O error occurs.
  9. */
  10. public NioReactor(Dispatcher dispatcher) throws IOException {
  11. this.dispatcher = dispatcher;
  12. this.selector = Selector.open();
  13. }

代码示例来源:origin: apache/activemq

  1. public void close() throws IOException {
  2. if (rs.isOpen()) {
  3. rs.close();
  4. if (sc.isOpen()) {
  5. sc.socket().shutdownInput();
  6. sc.socket().close();
  7. }
  8. bb = null;
  9. sc = null;
  10. }
  11. }

代码示例来源:origin: netty/netty

  1. int selectNow() throws IOException {
  2. try {
  3. return selector.selectNow();
  4. } finally {
  5. // restore wakeup state if needed
  6. if (wakenUp.get()) {
  7. selector.wakeup();
  8. }
  9. }
  10. }

代码示例来源:origin: iluwatar/java-design-patterns

  1. /**
  2. * Stops the reactor and related resources such as dispatcher.
  3. *
  4. * @throws InterruptedException
  5. * if interrupted while stopping the reactor.
  6. * @throws IOException
  7. * if any I/O error occurs.
  8. */
  9. public void stop() throws InterruptedException, IOException {
  10. reactorMain.shutdownNow();
  11. selector.wakeup();
  12. reactorMain.awaitTermination(4, TimeUnit.SECONDS);
  13. selector.close();
  14. LOGGER.info("Reactor stopped");
  15. }

代码示例来源:origin: apache/kafka

  1. /**
  2. * Check for data, waiting up to the given timeout.
  3. *
  4. * @param timeoutMs Length of time to wait, in milliseconds, which must be non-negative
  5. * @return The number of keys ready
  6. */
  7. private int select(long timeoutMs) throws IOException {
  8. if (timeoutMs < 0L)
  9. throw new IllegalArgumentException("timeout should be >= 0");
  10. if (timeoutMs == 0L)
  11. return this.nioSelector.selectNow();
  12. else
  13. return this.nioSelector.select(timeoutMs);
  14. }

代码示例来源:origin: normanmaurer/netty-in-action

  1. public void serve(int port) throws IOException {
  2. ServerSocketChannel serverChannel = ServerSocketChannel.open();
  3. serverChannel.configureBlocking(false);
  4. ServerSocket ss = serverChannel.socket();
  5. InetSocketAddress address = new InetSocketAddress(port);
  6. ss.bind(address);
  7. Selector selector = Selector.open();
  8. for (;;){
  9. try {
  11. } catch (IOException ex) {
  12. ex.printStackTrace();
  13. Set<SelectionKey> readyKeys = selector.selectedKeys();
  14. Iterator<SelectionKey> iterator = readyKeys.iterator();
  15. while (iterator.hasNext()) {
  16. if (key.isWritable()) {
  17. if (client.write(buffer) == 0) {
  18. break;
  20. } catch (IOException cex) {

代码示例来源:origin: wildfly/wildfly

  1. protected void start(boolean direct) throws Exception {
  4. ch.bind(new InetSocketAddress("", 7500));
  5. ch.configureBlocking(false);
  6. ch.register(selector, SelectionKey.OP_ACCEPT, null);
  7. System.out.println("-- server ready");
  9. Set<SelectionKey> keys=selector.selectedKeys();
  10. for(Iterator<SelectionKey> it=keys.iterator(); it.hasNext();) {
  11. SelectionKey key=it.next();
  12. if(!key.isValid()) {
  13. it.remove();
  14. continue;
  15. if(key.isAcceptable()) {
  16. SocketChannel client_ch=ch.accept();
  17. if(client_ch != null) { // accept() may return null...
  18. System.out.printf("accepted connection from %s\n", client_ch.getRemoteAddress());
  19. client_ch.configureBlocking(false);
  20. client_ch.register(selector, SelectionKey.OP_READ, create(SIZE, direct));
  21. else if(key.isReadable()) {
  22. if(!handle((SocketChannel)key.channel(), (ByteBuffer)key.attachment())) {
  23. key.cancel();

代码示例来源:origin: AsyncHttpClient/async-http-client

  1. public SocksProxy(int runningTime) throws IOException {
  2. ServerSocketChannel socks = ServerSocketChannel.open();
  3. socks.socket().bind(new InetSocketAddress(8000));
  4. socks.configureBlocking(false);
  5. Selector select = Selector.open();
  6. socks.register(select, SelectionKey.OP_ACCEPT);
  8. Set<SelectionKey> keys = select.selectedKeys();
  9. for (SelectionKey k : keys) {
  10. if (!k.isValid())
  11. continue;
  12. if (k.isAcceptable() && k.channel() == socks) {
  13. SocketChannel csock = socks.accept();
  14. continue;
  15. addClient(csock);
  16. csock.register(select, SelectionKey.OP_READ);
  17. } else if (k.isReadable()) {
  18. cl.client.close();
  19. if (cl.remote != null)
  20. cl.remote.close();
  21. k.cancel();
  22. clients.remove(cl);

代码示例来源:origin: alibaba/cobar

  1. public NIOAcceptor(String name, int port, FrontendConnectionFactory factory) throws IOException {
  2. super.setName(name);
  3. this.port = port;
  4. this.selector = Selector.open();
  5. this.serverChannel = ServerSocketChannel.open();
  6. this.serverChannel.socket().bind(new InetSocketAddress(port));
  7. this.serverChannel.configureBlocking(false);
  8. this.serverChannel.register(selector, SelectionKey.OP_ACCEPT);
  9. this.factory = factory;
  10. }

代码示例来源:origin: kilim/kilim

  1. try {
  2. if (!running) {
  3. Iterator<SelectionKey> it = sel.keys().iterator();
  4. while (it.hasNext()) {
  5. SelectionKey sk = it.next();
  6. sk.cancel();
  7. Object o = sk.attachment();
  8. if (o instanceof SockEvent && ((SockEvent)o).ch instanceof ServerSocketChannel) {
  9. ((ServerSocketChannel)((SockEvent)o).ch).close();
  10. } catch (IOException ignore) {}
  11. if (update.get()) n = sel.selectNow();
  12. else n = sel.select();
  13. } catch (IOException ignore) { n = 0; ignore.printStackTrace(); }
  14. if (n > 0) {
  15. Iterator<SelectionKey> it = sel.selectedKeys().iterator();
  16. while (it.hasNext()) {
  17. SelectionKey sk = it.next();
  18. it.remove();
  19. Object o = sk.attachment();
  20. sk.interestOps(0);
  21. assert(o instanceof SockEvent);
  22. if (update.getAndSet(false))

代码示例来源:origin: voldemort/voldemort

  1. socketChannel = SocketChannel.open();
  2. socketChannel.socket().setReceiveBufferSize(this.socketBufferSize);
  3. socketChannel.socket().setSendBufferSize(this.socketBufferSize);
  4. socketChannel.socket().setTcpNoDelay(true);
  5. socketChannel.socket().setSoTimeout(soTimeoutMs);
  6. socketChannel.socket().setKeepAlive(this.socketKeepAlive);
  7. socketChannel.configureBlocking(false);
  8. socketChannel.connect(new InetSocketAddress(dest.getHost(), dest.getPort()));
  9. selector.wakeup();
  10. } catch(Exception e) {

代码示例来源:origin: org.apache.hadoop/hadoop-common

  1. public Listener() throws IOException {
  2. address = new InetSocketAddress(bindAddress, port);
  3. // Create a new server socket and set to non blocking mode
  4. acceptChannel = ServerSocketChannel.open();
  5. acceptChannel.configureBlocking(false);
  6. // Bind the server socket to the local host and port
  7. bind(acceptChannel.socket(), address, backlogLength, conf, portRangeConfig);
  8. port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
  9. // create a selector;
  10. selector= Selector.open();
  11. readers = new Reader[readThreads];
  12. for (int i = 0; i < readThreads; i++) {
  13. Reader reader = new Reader(
  14. "Socket Reader #" + (i + 1) + " for port " + port);
  15. readers[i] = reader;
  16. reader.start();
  17. }
  18. // Register accepts on the server socket with the selector.
  19. acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
  20. this.setName("IPC Server listener on " + port);
  21. this.setDaemon(true);
  22. }

代码示例来源:origin: apache/nifi

  1. @Override
  2. public void open(final InetAddress nicAddress, final int port, final int maxBufferSize) throws IOException {
  3. stopped = false;
  4. executor = Executors.newFixedThreadPool(maxConnections);
  5. final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
  6. serverSocketChannel.configureBlocking(false);
  7. if (maxBufferSize > 0) {
  8. serverSocketChannel.setOption(StandardSocketOptions.SO_RCVBUF, maxBufferSize);
  9. final int actualReceiveBufSize = serverSocketChannel.getOption(StandardSocketOptions.SO_RCVBUF);
  10. if (actualReceiveBufSize < maxBufferSize) {
  11. logger.warn("Attempted to set Socket Buffer Size to " + maxBufferSize + " bytes but could only set to "
  12. + actualReceiveBufSize + "bytes. You may want to consider changing the Operating System's "
  13. + "maximum receive buffer");
  14. }
  15. }
  16. serverSocketChannel.socket().bind(new InetSocketAddress(nicAddress, port));
  17. selector = Selector.open();
  18. serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
  19. }

代码示例来源:origin: rapidoid/rapidoid

  1. private void openSocket() throws IOException {
  2. U.notNull(net.protocol(), "protocol");
  3. U.notNull(net.helperClass(), "helperClass");
  4. String blockingInfo = net.blockingAccept() ? "blocking" : "non-blocking";
  5. Log.debug("Initializing server", "address", net.address(), "port", net.port(), "sync", net.syncBufs(), "accept", blockingInfo);
  6. serverSocketChannel = ServerSocketChannel.open();
  7. if ((serverSocketChannel.isOpen()) && (selector.isOpen())) {
  8. serverSocketChannel.configureBlocking(net.blockingAccept());
  9. ServerSocket socket = serverSocketChannel.socket();
  10. Log.info("!Starting server", "!address", net.address(), "!port", net.port(), "I/O workers", net.workers(), "sync", net.syncBufs(), "accept", blockingInfo);
  11. InetSocketAddress addr = new InetSocketAddress(net.address(), net.port());
  12. socket.setReceiveBufferSize(16 * 1024);
  13. socket.setReuseAddress(true);
  14. socket.bind(addr, MAX_PENDING_CONNECTIONS);
  15. Log.debug("Opened server socket", "address", addr);
  16. if (!net.blockingAccept()) {
  17. Log.debug("Registering accept selector");
  18. serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
  19. }
  20. initWorkers();
  21. } else {
  22. throw U.rte("Cannot open socket!");
  23. }
  24. }

代码示例来源:origin: org.apache.zookeeper/zookeeper

  1. public void run() {
  2. while (!ss.socket().isClosed()) {
  3. try {
  5. Set<SelectionKey> selected;
  6. synchronized (this) {
  7. selected = selector.selectedKeys();
  8. Collections.shuffle(selectedList);
  9. for (SelectionKey k : selectedList) {
  10. if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
  11. SocketChannel sc = ((ServerSocketChannel) k
  12. .channel()).accept();
  13. InetAddress ia = sc.socket().getInetAddress();
  14. int cnxncount = getClientCnxnCount(ia);
  15. if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){
  16. LOG.warn("Too many connections from " + ia
  17. + " - max is " + maxClientCnxns );
  18. sc.close();
  19. } else {
  20. LOG.info("Accepted socket connection from "
  21. + sc.socket().getRemoteSocketAddress());
  22. sc.configureBlocking(false);
  23. SelectionKey sk = sc.register(selector,
  24. SelectionKey.OP_READ);
  25. NIOServerCnxn cnxn = createConnection(sc, sk);
  26. sk.attach(cnxn);
  27. addCnxn(cnxn);

代码示例来源:origin: voldemort/voldemort

  1. public void accept(SocketChannel socketChannel) {
  2. if(isClosed.get())
  3. throw new IllegalStateException("Cannot accept more channels, selector manager closed");
  4. socketChannelQueue.add(socketChannel);
  5. selector.wakeup();
  6. }

代码示例来源:origin: kilim/kilim

  1. public void wake() {
  2. update.set(true);
  3. if (Thread.currentThread() != selectorThread)
  4. sel.wakeup();
  5. }
  6. public void execute() throws Pausable, Exception {

代码示例来源:origin: lealone/Lealone

  1. @Override
  2. public void closeChannel(SocketChannel channel) {
  3. if (channel == null) {
  4. return;
  5. }
  6. for (SelectionKey key : selector.keys()) {
  7. if (key.channel() == channel && key.isValid()) {
  8. key.cancel();
  9. break;
  10. }
  11. }
  12. channels.remove(channel);
  13. Socket socket = channel.socket();
  14. if (socket != null) {
  15. try {
  16. socket.close();
  17. } catch (Exception e) {
  18. }
  19. }
  20. try {
  21. channel.close();
  22. } catch (Exception e) {
  23. }
  24. }

代码示例来源:origin: voldemort/voldemort

  1. public void close() {
  2. if(!isClosed.compareAndSet(false, true))
  3. return;
  4. for(SelectionKey sk: selector.keys()) {
  5. try {
  6. if(logger.isTraceEnabled())
  7. logger.trace("Closing SelectionKey's channel");
  9. Object attachment = sk.attachment();
  10. if(attachment instanceof Closeable) {
  11. IOUtils.closeQuietly((Closeable) attachment);
  12. logger.trace("Cancelling SelectionKey");
  13. sk.cancel();
  14. } catch(Exception e) {
  15. if(logger.isEnabledFor(Level.WARN))
  16. selector.close();
  17. } catch(Exception e) {
  18. if(logger.isEnabledFor(Level.WARN))
