本文整理了Java中java.nio.channels.Selector.select()
方法的一些代码示例,展示了Selector.select()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Selector.select()
方法的具体详情如下:
包路径:java.nio.channels.Selector
类名称:Selector
方法名:select
[英]Detects if any of the registered channels is ready for I/O operations according to its SelectionKey. This method does not return until at least one channel is ready, #wakeup() is invoked or the calling thread is interrupted.
[中]根据SelectionKey检测任何已注册的通道是否已准备好进行I/O操作。在至少一个通道就绪、调用#wakeup()或调用线程中断之前,此方法不会返回。
代码示例来源:origin: iluwatar/java-design-patterns
private void eventLoop() throws IOException {
while (true) {
// honor interrupt request
if (Thread.interrupted()) {
break;
}
// honor any pending commands first
processPendingCommands();
/*
* Synchronous event de-multiplexing happens here, this is blocking call which returns when it is possible to
* initiate non-blocking operation on any of the registered channels.
*/
selector.select();
/*
* Represents the events that have occurred on registered handles.
*/
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
if (!key.isValid()) {
iterator.remove();
continue;
}
processKey(key);
}
keys.clear();
}
}
代码示例来源:origin: apache/activemq
private void fill(int n) throws IOException, ClosedChannelException {
int bytesRead = -1;
if ((n <= 0) || (n <= bb.remaining()))
return;
bb.compact();
n = (bb.remaining() < n ? bb.remaining() : n);
for (;;) {
bytesRead = sc.read(bb);
if (bytesRead == -1)
throw new ClosedChannelException();
n -= bytesRead;
if (n <= 0)
break;
rs.select(0);
rs.selectedKeys().clear();
}
bb.flip();
}
}
代码示例来源:origin: wildfly/wildfly
public static void await(NioXnio nioXnio, SelectableChannel channel, int op, long time, TimeUnit unit) throws IOException {
if (time <= 0) {
await(nioXnio, channel, op);
return;
}
Xnio.checkBlockingAllowed();
final Selector selector = nioXnio.getSelector();
final SelectionKey selectionKey;
try {
selectionKey = channel.register(selector, op);
} catch (ClosedChannelException e) {
return;
}
long timeoutInMillis = unit.toMillis(time);
selector.select(timeoutInMillis == 0 ? 1: timeoutInMillis);
selector.selectedKeys().clear();
if (Thread.currentThread().isInterrupted()) {
throw log.interruptedIO();
}
selectionKey.cancel();
selector.selectNow();
}
}
代码示例来源:origin: com.hazelcast/hazelcast-all
private void acceptLoop() throws IOException {
while (!stop) {
// block until new connection or interruption.
int keyCount = selector.select();
if (isInterrupted()) {
break;
}
if (keyCount == 0) {
continue;
}
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
handleSelectionKeys(it);
}
}
代码示例来源:origin: looly/hutool
/**
* 开始监听
*
* @throws IOException IO异常
*/
private void doListen() throws IOException {
while (0 != this.selector.select()) {
// 返回已选择键的集合
final Iterator<SelectionKey> keyIter = selector.selectedKeys().iterator();
while (keyIter.hasNext()) {
handle(keyIter.next());
keyIter.remove();
}
}
}
代码示例来源:origin: hazelcast/hazelcast-jet
private void acceptLoop() throws IOException {
while (!stop) {
// block until new connection or interruption.
int keyCount = selector.select();
if (isInterrupted()) {
break;
}
if (keyCount == 0) {
continue;
}
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
handleSelectionKeys(it);
}
}
代码示例来源:origin: alibaba/cobar
@Override
public void run() {
final Selector selector = this.selector;
for (;;) {
++acceptCount;
try {
selector.select(1000L);
Set<SelectionKey> keys = selector.selectedKeys();
try {
for (SelectionKey key : keys) {
if (key.isValid() && key.isAcceptable()) {
accept();
} else {
key.cancel();
}
}
} finally {
keys.clear();
}
} catch (Throwable e) {
LOGGER.warn(getName(), e);
}
}
}
代码示例来源:origin: wildfly/wildfly
public static void await(NioXnio nioXnio, SelectableChannel channel, int op) throws IOException {
if (NioXnio.IS_HP_UX) {
// HP-UX has buggy write wakeup semantics
await(nioXnio, channel, op, 1, TimeUnit.SECONDS);
return;
}
Xnio.checkBlockingAllowed();
final Selector selector = nioXnio.getSelector();
final SelectionKey selectionKey;
try {
selectionKey = channel.register(selector, op);
} catch (ClosedChannelException e) {
return;
}
selector.select();
selector.selectedKeys().clear();
if (Thread.currentThread().isInterrupted()) {
throw log.interruptedIO();
}
selectionKey.cancel();
selector.selectNow();
}
代码示例来源:origin: looly/hutool
/**
* 开始监听
*
* @throws IOException IO异常
*/
private void doListen() throws IOException {
while (0 != this.selector.select()) {
// 返回已选择键的集合
final Iterator<SelectionKey> keyIter = selector.selectedKeys().iterator();
while (keyIter.hasNext()) {
handle(keyIter.next());
keyIter.remove();
}
}
}
代码示例来源:origin: apache/incubator-gobblin
@Override
public void run() {
try {
Tunnel.this.server.register(this.selector, SelectionKey.OP_ACCEPT,
ExecutorsUtils.loggingDecorator(new AcceptHandler(Tunnel.this.server, this.selector, Tunnel.this.config)));
while (!Thread.interrupted()) {
this.selector.select();
Set<SelectionKey> selectionKeys = this.selector.selectedKeys();
for (SelectionKey selectionKey : selectionKeys) {
dispatch(selectionKey);
}
selectionKeys.clear();
}
} catch (IOException ioe) {
LOG.error("Unhandled IOException. Tunnel will close", ioe);
}
LOG.info("Closing tunnel");
}
代码示例来源:origin: code4craft/netty-learning
public void run() {
try {
while (!Thread.interrupted()) {
//循环,等待事件
selector.select();
Set selected = selector.selectedKeys();
Iterator it = selected.iterator();
while (it.hasNext())
//调用handler,处理事件
dispatch((SelectionKey) (it.next()));
selected.clear();
}
} catch (IOException ex) { /* ... */
}
}
代码示例来源:origin: alibaba/cobar
@Override
public void run() {
final Selector selector = this.selector;
for (;;) {
++connectCount;
try {
selector.select(1000L);
connect(selector);
Set<SelectionKey> keys = selector.selectedKeys();
try {
for (SelectionKey key : keys) {
Object att = key.attachment();
if (att != null && key.isValid() && key.isConnectable()) {
finishConnect(key, att);
} else {
key.cancel();
}
}
} finally {
keys.clear();
}
} catch (Throwable e) {
LOGGER.warn(name, e);
}
}
}
代码示例来源:origin: apache/zookeeper
private void select() {
try {
selector.select();
Iterator<SelectionKey> selectedKeys =
selector.selectedKeys().iterator();
while (!stopped && selectedKeys.hasNext()) {
SelectionKey key = selectedKeys.next();
selectedKeys.remove();
if (!key.isValid()) {
continue;
}
if (key.isAcceptable()) {
if (!doAccept()) {
// If unable to pull a new connection off the accept
// queue, pause accepting to give us time to free
// up file descriptors and so the accept thread
// doesn't spin in a tight loop.
pauseAccept(10);
}
} else {
LOG.warn("Unexpected ops in accept select "
+ key.readyOps());
}
}
} catch (IOException e) {
LOG.warn("Ignoring IOException while selecting", e);
}
}
代码示例来源:origin: apache/nifi
private void selectSocketChannelKeys() throws IOException {
// once a channel associated with a key in this selector is 'ready', it causes this select to immediately return.
// thus, for each trip through the run() we only get hit with one real timeout...the one in selectServerSocketKeys.
int numSelected = socketChannelSelector.select(timeout);
if (numSelected == 0) {
return;
}
for (SelectionKey socketChannelKey : socketChannelSelector.selectedKeys()) {
final SelectableChannel channel = socketChannelKey.channel();
AbstractChannelReader reader = null;
// there are 2 kinds of channels in this selector, both which have their own readers and are executed in their own
// threads. We will get here whenever a new SocketChannel is created due to an incoming connection. However,
// for a DatagramChannel we don't want to create a new reader unless it is a new DatagramChannel. The only
// way to tell if it's new is the lack of an attachment.
if (channel instanceof DatagramChannel && socketChannelKey.attachment() == null) {
reader = new DatagramChannelReader(UUID.randomUUID().toString(), socketChannelKey, emptyBuffers, factory, readSingleDatagram);
socketChannelKey.attach(reader);
final ScheduledFuture<?> readerFuture = executor.scheduleWithFixedDelay(reader, 10L, channelReaderFrequencyMilliseconds.get(),
TimeUnit.MILLISECONDS);
reader.setScheduledFuture(readerFuture);
}
if (reader != null && LOGGER.isDebugEnabled()) {
LOGGER.debug(this + " New Connection established. Server channel: " + channel + " Reader: " + reader);
}
}
}
代码示例来源:origin: rapidoid/rapidoid
private void processNonBlocking() {
try {
selector.select(50);
} catch (IOException e) {
Log.error("Select failed!", e);
}
try {
Set<SelectionKey> selectedKeys = selector.selectedKeys();
synchronized (selectedKeys) {
Iterator<?> iter = selectedKeys.iterator();
while (iter.hasNext()) {
SelectionKey key = (SelectionKey) iter.next();
iter.remove();
acceptChannel((ServerSocketChannel) key.channel());
}
}
} catch (ClosedSelectorException e) {
// do nothing
}
}
代码示例来源:origin: alibaba/cobar
++reactCount;
try {
selector.select(1000L);
register(selector);
Set<SelectionKey> keys = selector.selectedKeys();
try {
for (SelectionKey key : keys) {
代码示例来源:origin: apache/zookeeper
private void select() {
try {
selector.select();
Set<SelectionKey> selected = selector.selectedKeys();
ArrayList<SelectionKey> selectedList =
new ArrayList<SelectionKey>(selected);
Collections.shuffle(selectedList);
Iterator<SelectionKey> selectedKeys = selectedList.iterator();
while(!stopped && selectedKeys.hasNext()) {
SelectionKey key = selectedKeys.next();
selected.remove(key);
if (!key.isValid()) {
cleanupSelectionKey(key);
continue;
}
if (key.isReadable() || key.isWritable()) {
handleIO(key);
} else {
LOG.warn("Unexpected ops in select " + key.readyOps());
}
}
} catch (IOException e) {
LOG.warn("Ignoring IOException while selecting", e);
}
}
代码示例来源:origin: wildfly/wildfly
@Override
protected void doConnect(SocketAddress remoteAddress,
SocketAddress localAddress) throws Exception {
if (localAddress != null) {
ch.bind(localAddress);
}
boolean success = false;
try {
ch.connect(remoteAddress);
boolean finishConnect = false;
while (!finishConnect) {
if (connectSelector.select(SO_TIMEOUT) >= 0) {
final Set<SelectionKey> selectionKeys = connectSelector.selectedKeys();
for (SelectionKey key : selectionKeys) {
if (key.isConnectable()) {
selectionKeys.clear();
finishConnect = true;
break;
}
}
selectionKeys.clear();
}
}
success = ch.finishConnect();
} finally {
if (!success) {
doClose();
}
}
}
代码示例来源:origin: oldmanpushcart/greys-anatomy
@Override
public void run() {
while (!isInterrupted()
&& isBind()) {
try {
while (selector.isOpen()
&& selector.select() > 0) {
final Iterator<SelectionKey> it = selector.selectedKeys().iterator();
while (it.hasNext()) {
final SelectionKey key = it.next();
it.remove();
// do ssc accept
if (key.isValid() && key.isAcceptable()) {
doAccept(key, selector, configure);
}
// do sc read
if (key.isValid() && key.isReadable()) {
doRead(byteBuffer, key);
}
}
}
} catch (IOException e) {
logger.warn("selector failed.", e);
} catch (ClosedSelectorException e) {
logger.debug("selector closed.", e);
}
}
}
};
代码示例来源:origin: org.apache.zookeeper/zookeeper
public void run() {
while (!ss.socket().isClosed()) {
try {
selector.select(1000);
Set<SelectionKey> selected;
synchronized (this) {
selected = selector.selectedKeys();
内容来源于网络,如有侵权,请联系作者删除!