本文整理了Java中java.nio.channels.Selector
类的一些代码示例,展示了Selector
类的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Selector
类的具体详情如下:
包路径:java.nio.channels.Selector
类名称:Selector
[英]A controller for the selection of SelectableChannel objects. Selectable channels can be registered with a selector and get a SelectionKey that represents the registration. The keys are also added to the selector's key set. Selection keys can be canceled so that the corresponding channel is no longer registered with the selector.
By invoking the select method, the key set is checked and all keys that have been canceled since last select operation are moved to the set of canceled keys. During the select operation, the channels registered with this selector are checked to see whether they are ready for operation according to their SelectionKey.
[中]用于对SelectableChannel(可选择通道)对象进行选择的控制器。可选择通道可以注册到选择器上,并获得代表该注册关系的SelectionKey。这些键也会被添加到选择器的键集中。选择键可以被取消,从而使相应的通道不再注册在该选择器上。
通过调用select方法,将检查键集,并将自上次选择操作以来已取消的所有键移动到已取消键集。在选择操作期间,将根据各自的SelectionKey检查注册到该选择器的通道是否已准备好进行操作。
代码示例来源:origin: iluwatar/java-design-patterns
/**
 * Runs the reactor loop: handles pending commands, waits on the selector and processes every
 * selected key, until the executing thread is interrupted.
 *
 * @throws IOException
 *           if selecting or processing a key fails.
 */
private void eventLoop() throws IOException {
  // Thread.interrupted() also clears the interrupt flag, exactly once per iteration.
  while (!Thread.interrupted()) {
    // Queued commands take priority over waiting for new I/O events.
    processPendingCommands();

    // Blocking call: returns once at least one registered channel is ready,
    // the selector is woken up, or the thread is interrupted.
    selector.select();

    // The selected-key set holds the events that occurred on the registered handles.
    Set<SelectionKey> readyKeys = selector.selectedKeys();
    for (Iterator<SelectionKey> cursor = readyKeys.iterator(); cursor.hasNext();) {
      SelectionKey readyKey = cursor.next();
      if (readyKey.isValid()) {
        processKey(readyKey);
      } else {
        // Invalid keys are dropped without being processed.
        cursor.remove();
      }
    }
    // Reset the set so handled keys are not seen again on the next pass.
    readyKeys.clear();
  }
}
代码示例来源:origin: iluwatar/java-design-patterns
/**
 * Builds a reactor that dispatches events through the supplied {@code dispatcher} and opens
 * the selector the reactor works with. Applications may plug in whichever dispatcher
 * implementation suits them.
 *
 * @param dispatcher
 *          the dispatcher used for events on registered channels; expected to be non-null
 *          (not checked here).
 * @throws IOException
 *           if the selector cannot be opened.
 */
public NioReactor(Dispatcher dispatcher) throws IOException {
  this.selector = Selector.open();
  this.dispatcher = dispatcher;
}
代码示例来源:origin: apache/activemq
/**
 * Closes {@code rs} and the socket behind {@code sc}, then drops the references held in
 * {@code bb} and {@code sc}. Does nothing when {@code rs} is already closed.
 *
 * <p>The socket is closed and the references are cleared even when an earlier step throws,
 * so a failing {@code rs.close()} or {@code shutdownInput()} no longer leaves the socket open.
 *
 * @throws IOException if closing {@code rs} or the socket fails
 */
public void close() throws IOException {
    if (!rs.isOpen()) {
        return;
    }
    try {
        rs.close();
    } finally {
        try {
            // sc may already have been cleared by an earlier, partially failed close.
            if (sc != null && sc.isOpen()) {
                try {
                    sc.socket().shutdownInput();
                } finally {
                    // Always release the socket, even if shutting down the input side failed.
                    sc.socket().close();
                }
            }
        } finally {
            bb = null;
            sc = null;
        }
    }
}
代码示例来源:origin: netty/netty
/**
 * Performs a non-blocking select on the underlying selector.
 *
 * @return the value returned by {@code Selector.selectNow()}
 * @throws IOException if the select operation fails
 */
int selectNow() throws IOException {
    try {
        final int readyCount = selector.selectNow();
        return readyCount;
    } finally {
        // selectNow() clears the effect of earlier wakeup() calls, so re-issue the
        // wakeup when the wakenUp flag says one was requested.
        final boolean wakeupRequested = wakenUp.get();
        if (wakeupRequested) {
            selector.wakeup();
        }
    }
}
代码示例来源:origin: iluwatar/java-design-patterns
/**
 * Stops the reactor and related resources such as dispatcher.
 *
 * <p>Shuts down {@code reactorMain}, wakes the selector so a blocked select call returns,
 * waits up to four seconds for termination and then closes the selector. The selector is
 * closed even when the wait is interrupted, so it cannot leak on that path.
 *
 * @throws InterruptedException
 *           if interrupted while stopping the reactor.
 * @throws IOException
 *           if any I/O error occurs.
 */
public void stop() throws InterruptedException, IOException {
  reactorMain.shutdownNow();
  // Unblock a select() call that may currently be waiting.
  selector.wakeup();
  try {
    reactorMain.awaitTermination(4, TimeUnit.SECONDS);
  } finally {
    // Release the selector regardless of how the wait ended.
    selector.close();
  }
  LOGGER.info("Reactor stopped");
}
代码示例来源:origin: apache/kafka
/**
 * Polls the NIO selector for ready keys, blocking for at most the given timeout.
 *
 * @param timeoutMs Maximum time to wait in milliseconds; must not be negative
 * @return The number of keys ready
 * @throws IllegalArgumentException if {@code timeoutMs} is negative
 * @throws IOException if the underlying select call fails
 */
private int select(long timeoutMs) throws IOException {
    if (timeoutMs < 0L) {
        throw new IllegalArgumentException("timeout should be >= 0");
    }
    // Selector.select(0) would block indefinitely, so a zero timeout maps to selectNow().
    return timeoutMs == 0L
            ? this.nioSelector.selectNow()
            : this.nioSelector.select(timeoutMs);
}
代码示例来源:origin: normanmaurer/netty-in-action
public void serve(int port) throws IOException {
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
ServerSocket ss = serverChannel.socket();
InetSocketAddress address = new InetSocketAddress(port);
ss.bind(address);
Selector selector = Selector.open();
for (;;){
try {
selector.select();
} catch (IOException ex) {
ex.printStackTrace();
Set<SelectionKey> readyKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = readyKeys.iterator();
while (iterator.hasNext()) {
if (key.isWritable()) {
if (client.write(buffer) == 0) {
break;
key.channel().close();
} catch (IOException cex) {
代码示例来源:origin: wildfly/wildfly
protected void start(boolean direct) throws Exception {
selector=Selector.open();
ch=ServerSocketChannel.open();
ch.bind(new InetSocketAddress("0.0.0.0", 7500));
ch.configureBlocking(false);
ch.register(selector, SelectionKey.OP_ACCEPT, null);
System.out.println("-- server ready");
selector.select();
Set<SelectionKey> keys=selector.selectedKeys();
for(Iterator<SelectionKey> it=keys.iterator(); it.hasNext();) {
SelectionKey key=it.next();
if(!key.isValid()) {
it.remove();
continue;
if(key.isAcceptable()) {
SocketChannel client_ch=ch.accept();
if(client_ch != null) { // accept() may return null...
System.out.printf("accepted connection from %s\n", client_ch.getRemoteAddress());
client_ch.configureBlocking(false);
client_ch.register(selector, SelectionKey.OP_READ, create(SIZE, direct));
else if(key.isReadable()) {
if(!handle((SocketChannel)key.channel(), (ByteBuffer)key.attachment())) {
key.cancel();
代码示例来源:origin: AsyncHttpClient/async-http-client
public SocksProxy(int runningTime) throws IOException {
ServerSocketChannel socks = ServerSocketChannel.open();
socks.socket().bind(new InetSocketAddress(8000));
socks.configureBlocking(false);
Selector select = Selector.open();
socks.register(select, SelectionKey.OP_ACCEPT);
select.select(5000);
Set<SelectionKey> keys = select.selectedKeys();
for (SelectionKey k : keys) {
if (!k.isValid())
continue;
if (k.isAcceptable() && k.channel() == socks) {
SocketChannel csock = socks.accept();
continue;
addClient(csock);
csock.register(select, SelectionKey.OP_READ);
} else if (k.isReadable()) {
cl.client.close();
if (cl.remote != null)
cl.remote.close();
k.cancel();
clients.remove(cl);
代码示例来源:origin: alibaba/cobar
public NIOAcceptor(String name, int port, FrontendConnectionFactory factory) throws IOException {
super.setName(name);
this.port = port;
this.selector = Selector.open();
this.serverChannel = ServerSocketChannel.open();
this.serverChannel.socket().bind(new InetSocketAddress(port));
this.serverChannel.configureBlocking(false);
this.serverChannel.register(selector, SelectionKey.OP_ACCEPT);
this.factory = factory;
}
代码示例来源:origin: kilim/kilim
try {
if (!running) {
Iterator<SelectionKey> it = sel.keys().iterator();
while (it.hasNext()) {
SelectionKey sk = it.next();
sk.cancel();
Object o = sk.attachment();
if (o instanceof SockEvent && ((SockEvent)o).ch instanceof ServerSocketChannel) {
((ServerSocketChannel)((SockEvent)o).ch).close();
} catch (IOException ignore) {}
if (update.get()) n = sel.selectNow();
else n = sel.select();
} catch (IOException ignore) { n = 0; ignore.printStackTrace(); }
if (n > 0) {
Iterator<SelectionKey> it = sel.selectedKeys().iterator();
while (it.hasNext()) {
SelectionKey sk = it.next();
it.remove();
Object o = sk.attachment();
sk.interestOps(0);
assert(o instanceof SockEvent);
if (update.getAndSet(false))
regtask.run();
代码示例来源:origin: voldemort/voldemort
socketChannel = SocketChannel.open();
socketChannel.socket().setReceiveBufferSize(this.socketBufferSize);
socketChannel.socket().setSendBufferSize(this.socketBufferSize);
socketChannel.socket().setTcpNoDelay(true);
socketChannel.socket().setSoTimeout(soTimeoutMs);
socketChannel.socket().setKeepAlive(this.socketKeepAlive);
socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress(dest.getHost(), dest.getPort()));
selector.wakeup();
} catch(Exception e) {
代码示例来源:origin: org.apache.hadoop/hadoop-common
public Listener() throws IOException {
address = new InetSocketAddress(bindAddress, port);
// Create a new server socket and set to non blocking mode
acceptChannel = ServerSocketChannel.open();
acceptChannel.configureBlocking(false);
// Bind the server socket to the local host and port
bind(acceptChannel.socket(), address, backlogLength, conf, portRangeConfig);
port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
// create a selector;
selector= Selector.open();
readers = new Reader[readThreads];
for (int i = 0; i < readThreads; i++) {
Reader reader = new Reader(
"Socket Reader #" + (i + 1) + " for port " + port);
readers[i] = reader;
reader.start();
}
// Register accepts on the server socket with the selector.
acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
this.setName("IPC Server listener on " + port);
this.setDaemon(true);
}
代码示例来源:origin: apache/nifi
/**
 * Opens a non-blocking server socket channel bound to the given address and port, creates
 * the worker executor and registers the channel with a new selector for accept events.
 *
 * <p>If any step after opening the channel fails, the channel is closed before the
 * exception is rethrown so the socket does not leak.
 *
 * @param nicAddress    local address to bind to
 * @param port          local port to bind to
 * @param maxBufferSize requested receive buffer size in bytes; ignored when not positive
 * @throws IOException if the channel or selector cannot be opened, configured, bound
 *                     or registered
 */
@Override
public void open(final InetAddress nicAddress, final int port, final int maxBufferSize) throws IOException {
    stopped = false;
    executor = Executors.newFixedThreadPool(maxConnections);

    final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    try {
        serverSocketChannel.configureBlocking(false);
        if (maxBufferSize > 0) {
            serverSocketChannel.setOption(StandardSocketOptions.SO_RCVBUF, maxBufferSize);
            // Read the option back to see what size was actually applied.
            final int actualReceiveBufSize = serverSocketChannel.getOption(StandardSocketOptions.SO_RCVBUF);
            if (actualReceiveBufSize < maxBufferSize) {
                logger.warn("Attempted to set Socket Buffer Size to " + maxBufferSize + " bytes but could only set to "
                        + actualReceiveBufSize + " bytes. You may want to consider changing the Operating System's "
                        + "maximum receive buffer");
            }
        }

        serverSocketChannel.socket().bind(new InetSocketAddress(nicAddress, port));

        selector = Selector.open();
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    } catch (final IOException | RuntimeException e) {
        // Do not leave a half-initialized listening socket behind.
        try {
            serverSocketChannel.close();
        } catch (final IOException closeFailure) {
            e.addSuppressed(closeFailure);
        }
        throw e;
    }
}
代码示例来源:origin: rapidoid/rapidoid
/**
 * Opens the server socket channel, binds it to the configured address and port, registers
 * it for accept events when accepting is non-blocking, and initializes the workers.
 *
 * @throws IOException if opening, configuring, binding or registering the channel fails
 */
private void openSocket() throws IOException {
    U.notNull(net.protocol(), "protocol");
    U.notNull(net.helperClass(), "helperClass");

    final String acceptMode;
    if (net.blockingAccept()) {
        acceptMode = "blocking";
    } else {
        acceptMode = "non-blocking";
    }

    Log.debug("Initializing server", "address", net.address(), "port", net.port(), "sync", net.syncBufs(), "accept", acceptMode);

    serverSocketChannel = ServerSocketChannel.open();

    // Both the new channel and the selector must be usable before going any further.
    if (!serverSocketChannel.isOpen() || !selector.isOpen()) {
        throw U.rte("Cannot open socket!");
    }

    serverSocketChannel.configureBlocking(net.blockingAccept());
    ServerSocket serverSocket = serverSocketChannel.socket();

    Log.info("!Starting server", "!address", net.address(), "!port", net.port(), "I/O workers", net.workers(), "sync", net.syncBufs(), "accept", acceptMode);

    InetSocketAddress bindAddress = new InetSocketAddress(net.address(), net.port());

    // Socket options are applied before binding.
    serverSocket.setReceiveBufferSize(16 * 1024);
    serverSocket.setReuseAddress(true);
    serverSocket.bind(bindAddress, MAX_PENDING_CONNECTIONS);

    Log.debug("Opened server socket", "address", bindAddress);

    // Selector-based accept is only set up when accepting is non-blocking.
    if (!net.blockingAccept()) {
        Log.debug("Registering accept selector");
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    }

    initWorkers();
}
代码示例来源:origin: org.apache.zookeeper/zookeeper
public void run() {
while (!ss.socket().isClosed()) {
try {
selector.select(1000);
Set<SelectionKey> selected;
synchronized (this) {
selected = selector.selectedKeys();
Collections.shuffle(selectedList);
for (SelectionKey k : selectedList) {
if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
SocketChannel sc = ((ServerSocketChannel) k
.channel()).accept();
InetAddress ia = sc.socket().getInetAddress();
int cnxncount = getClientCnxnCount(ia);
if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){
LOG.warn("Too many connections from " + ia
+ " - max is " + maxClientCnxns );
sc.close();
} else {
LOG.info("Accepted socket connection from "
+ sc.socket().getRemoteSocketAddress());
sc.configureBlocking(false);
SelectionKey sk = sc.register(selector,
SelectionKey.OP_READ);
NIOServerCnxn cnxn = createConnection(sc, sk);
sk.attach(cnxn);
addCnxn(cnxn);
代码示例来源:origin: voldemort/voldemort
/**
 * Adds the given channel to the socket channel queue and wakes up the selector
 * (presumably so the selector loop picks the channel up - confirm against that loop).
 *
 * @param socketChannel channel to enqueue
 * @throws IllegalStateException if this manager has already been closed
 */
public void accept(SocketChannel socketChannel) {
    final boolean alreadyClosed = isClosed.get();
    if(alreadyClosed) {
        throw new IllegalStateException("Cannot accept more channels, selector manager closed");
    }

    socketChannelQueue.add(socketChannel);
    selector.wakeup();
}
代码示例来源:origin: kilim/kilim
/**
 * Sets the {@code update} flag and wakes the selector, unless the caller is the
 * selector thread itself.
 */
public void wake() {
    update.set(true);
    final boolean onSelectorThread = Thread.currentThread() == selectorThread;
    if (!onSelectorThread) {
        sel.wakeup();
    }
}
public void execute() throws Pausable, Exception {
代码示例来源:origin: lealone/Lealone
/**
 * Cancels the channel's registration with the selector, removes the channel from
 * {@code channels} and closes its socket and the channel itself. A {@code null} channel
 * is ignored.
 *
 * <p>Closing is best-effort: failures while closing the socket or the channel are
 * deliberately ignored so that cleanup always runs to the end.
 *
 * @param channel the channel to close; may be {@code null}
 */
@Override
public void closeChannel(SocketChannel channel) {
    if (channel == null) {
        return;
    }
    // Look the registration up directly instead of scanning every key of the selector;
    // this also avoids iterating the selector's key set, which is not thread-safe.
    SelectionKey key = channel.keyFor(selector);
    if (key != null && key.isValid()) {
        key.cancel();
    }
    channels.remove(channel);
    Socket socket = channel.socket();
    if (socket != null) {
        try {
            socket.close();
        } catch (Exception ignored) {
            // Best-effort close: the channel is being discarded anyway.
        }
    }
    try {
        channel.close();
    } catch (Exception ignored) {
        // Best-effort close: nothing useful can be done if this fails.
    }
}
代码示例来源:origin: voldemort/voldemort
public void close() {
if(!isClosed.compareAndSet(false, true))
return;
for(SelectionKey sk: selector.keys()) {
try {
if(logger.isTraceEnabled())
logger.trace("Closing SelectionKey's channel");
sk.channel().close();
Object attachment = sk.attachment();
if(attachment instanceof Closeable) {
IOUtils.closeQuietly((Closeable) attachment);
logger.trace("Cancelling SelectionKey");
sk.cancel();
} catch(Exception e) {
if(logger.isEnabledFor(Level.WARN))
selector.close();
} catch(Exception e) {
if(logger.isEnabledFor(Level.WARN))
内容来源于网络,如有侵权,请联系作者删除!