本文整理了Java中java.nio.channels.Pipe
类的一些代码示例,展示了Pipe
类的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Pipe
类的具体详情如下:
包路径:java.nio.channels.Pipe
类名称:Pipe
[英]A pipe contains two channels, forming a unidirectional pipe. One is the writable sink channel, and the other is the readable source channel. When bytes are written into the writable channel they can be read from the readable channel. Bytes are read in the order in which they were written.
[中]管道包含两个通道,形成单向管道。一个是可写的接收通道,另一个是可读的源通道。当字节写入可写通道时,可以从可读通道读取字节。字节按写入顺序读取。
代码示例来源:origin: wildfly/wildfly
public ChannelPipe<StreamConnection, StreamConnection> createFullDuplexPipeConnection(XnioIoFactory peer) throws IOException {
getWorker().checkShutdown();
boolean ok = false;
final Pipe topPipe = Pipe.open();
try {
topPipe.source().configureBlocking(false);
topPipe.sink().configureBlocking(false);
final Pipe bottomPipe = Pipe.open();
try {
bottomPipe.source().configureBlocking(false);
bottomPipe.sink().configureBlocking(false);
final WorkerThread peerThread = getPeerThread(peer);
final SelectionKey topSourceKey = registerChannel(topPipe.source());
final SelectionKey topSinkKey = peerThread.registerChannel(topPipe.sink());
final SelectionKey bottomSourceKey = peerThread.registerChannel(bottomPipe.source());
final SelectionKey bottomSinkKey = registerChannel(bottomPipe.sink());
final NioPipeStreamConnection leftConnection = new NioPipeStreamConnection(this, bottomSourceKey, topSinkKey);
final NioPipeStreamConnection rightConnection = new NioPipeStreamConnection(this, topSourceKey, bottomSinkKey);
} finally {
if (! ok) {
safeClose(bottomPipe.sink());
safeClose(bottomPipe.source());
safeClose(topPipe.sink());
safeClose(topPipe.source());
代码示例来源:origin: jenkinsci/remoting
/**
* The behaviour of a {@link Pipe} with regard to propagation of the close is counter-intuitive. It doesn't
* propagate the close status to the other side, rather if the {@link Pipe#sink()} is closed, the
* {@link Pipe#source()} will just return {@literal -1} from the {@link Pipe.SourceChannel#read(ByteBuffer)}.
*/
@Test
public void pipeCloseSink() throws Exception {
Pipe pipe = Pipe.open();
assertThat(pipe.source().isOpen(), is(true));
assertThat(pipe.sink().isOpen(), is(true));
pipe.sink().close();
assertThat(pipe.sink().isOpen(), is(false));
assertThat(pipe.source().isOpen(), is(true));
ByteBuffer buffer = ByteBuffer.allocate(1);
assertThat(pipe.source().read(buffer), is(-1));
assertThat("No data read", buffer.remaining(), is(1));
assertThat("No detection of sink closed", pipe.source().isOpen(), is(true));
}
代码示例来源:origin: org.jruby/jruby-complete
if (pendingReads == null && unselectableReads == null && unselectableWrites == null) {
if (has_timeout && timeout == 0) {
for (Selector selector : selectors.values()) selector.selectNow();
} else {
List<Future> futures = new ArrayList<Future>(enxioSelectors.size());
mainSelector.select(has_timeout ? timeout : 0);
for (ENXIOSelector enxioSelector : enxioSelectors) enxioSelector.selector.wakeup();
Pipe.SourceChannel source = enxioSelector.pipe.source();
SelectionKey key = source.keyFor(mainSelector);
if (key != null && mainSelector.selectedKeys().contains(key)) {
mainSelector.selectedKeys().remove(key);
ByteBuffer buf = ByteBuffer.allocate(1);
source.read(buf);
代码示例来源:origin: jenkinsci/remoting
@Theory
public void serverRefuses(NetworkLayerFactory serverFactory, NetworkLayerFactory clientFactory) throws Exception {
ProtocolStack<IOBufferMatcher> client =
ProtocolStack
.on(clientFactory.create(selector.hub(), serverToClient.source(), clientToServer.sink()))
.filter(new ConnectionHeadersFilterLayer(Collections.<String, String>emptyMap(),
new ConnectionHeadersFilterLayer.Listener() {
.on(serverFactory.create(selector.hub(), clientToServer.source(), serverToClient.sink()))
.filter(new ConnectionHeadersFilterLayer(Collections.<String, String>emptyMap(),
new ConnectionHeadersFilterLayer.Listener() {
allOf(instanceOf(ConnectionRefusalException.class), not(instanceOf(PermanentConnectionRefusalException.class)))
);
server.get().awaitClose();
代码示例来源:origin: jenkinsci/remoting
@Theory
public void serverRejectsClient(NetworkLayerFactory serverFactory, NetworkLayerFactory clientFactory) throws Exception {
Logger.getLogger(name.getMethodName()).log(
clientFactory.create(selector.hub(), serverToClient.source(), clientToServer.sink()))
.filter(new SSLEngineFilterLayer(clientEngine, null))
.build(new IOBufferMatcherLayer());
serverFactory.create(selector.hub(), clientToServer.source(), serverToClient.sink()))
.filter(new SSLEngineFilterLayer(serverEngine, new SSLEngineFilterLayer.Listener() {
@Override
Logger.getLogger(name.getMethodName()).log(Level.INFO, "Waiting for server close");
serverMatcher.awaitClose();
assertThat(clientMatcher.getCloseCause(), instanceOf(ClosedChannelException.class));
assertThat(serverMatcher.getCloseCause(), instanceOf(ConnectionRefusalException.class));
Logger.getLogger(name.getMethodName()).log(Level.INFO, "Done");
代码示例来源:origin: org.jruby/jruby-complete
public Object call() throws Exception {
try {
selector.select();
} finally {
ByteBuffer buf = ByteBuffer.allocate(1);
buf.put((byte) 0);
buf.flip();
pipe.sink().write(buf);
}
return null;
}
}
代码示例来源:origin: org.apache.beam/beam-runners-google-cloud-dataflow-java
@Test
public void testPackageUploadWithEmptyDirectorySucceeds() throws Exception {
Pipe pipe = Pipe.open();
File tmpDirectory = tmpFolder.newFolder("folder");
when(mockGcsUtil.getObjects(anyListOf(GcsPath.class)))
.thenReturn(
ImmutableList.of(
StorageObjectOrIOException.create(new FileNotFoundException("some/path"))));
when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
List<DataflowPackage> targets =
defaultPackageUtil.stageClasspathElements(
ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH, createOptions);
DataflowPackage target = Iterables.getOnlyElement(targets);
verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class));
verify(mockGcsUtil).create(any(GcsPath.class), anyString());
verifyNoMoreInteractions(mockGcsUtil);
assertThat(target.getName(), RegexMatcher.matches("folder-" + HASH_PATTERN + ".jar"));
assertThat(target.getLocation(), equalTo(STAGING_PATH + target.getName()));
try (ZipInputStream zipInputStream =
new ZipInputStream(Channels.newInputStream(pipe.source()))) {
assertNull(zipInputStream.getNextEntry());
}
}
代码示例来源:origin: org.jruby/jruby-core
private void tidyUp() throws IOException {
selector.close(); // close unregisters all channels, so we can safely reset blocking modes
enxioSelector.pipe.sink().close();
enxioSelector.pipe.source().close();
代码示例来源:origin: org.apache.beam/beam-runners-google-cloud-dataflow-java
@Test
public void testPackageUploadWithExplicitPackageName() throws Exception {
Pipe pipe = Pipe.open();
File tmpFile = makeFileWithContents("file.txt", "This is a test!");
final String overriddenName = "alias.txt";
when(mockGcsUtil.getObjects(anyListOf(GcsPath.class)))
.thenReturn(
ImmutableList.of(
StorageObjectOrIOException.create(new FileNotFoundException("some/path"))));
when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
List<DataflowPackage> targets =
defaultPackageUtil.stageClasspathElements(
ImmutableList.of(overriddenName + "=" + tmpFile.getAbsolutePath()),
STAGING_PATH,
createOptions);
DataflowPackage target = Iterables.getOnlyElement(targets);
verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class));
verify(mockGcsUtil).create(any(GcsPath.class), anyString());
verifyNoMoreInteractions(mockGcsUtil);
assertThat(target.getName(), equalTo(overriddenName));
assertThat(
target.getLocation(), RegexMatcher.matches(STAGING_PATH + "file-" + HASH_PATTERN + ".txt"));
}
代码示例来源:origin: jenkinsci/remoting
@Theory
public void tooBigHeader(NetworkLayerFactory serverFactory, NetworkLayerFactory clientFactory) throws Exception {
final SettableFuture<Map<String, String>> serverActualHeaders = SettableFuture.create();
Map<String, String> clientExpectedHeaders = new HashMap<String, String>(64);
StringBuilder bigString = new StringBuilder(8*128);
for (int i = 0; i < 128; i++) {
bigString.append("Too Big!");
}
for (int i = 0; i < 64; i++) {
clientExpectedHeaders.put(String.format("key-%d", i), bigString.toString());
}
try {
ProtocolStack
.on(clientFactory.create(selector.hub(), serverToClient.source(), clientToServer.sink()))
.filter(new ConnectionHeadersFilterLayer(clientExpectedHeaders,
new ConnectionHeadersFilterLayer.Listener() {
@Override
public void onReceiveHeaders(Map<String, String> headers)
throws ConnectionRefusalException {
serverActualHeaders.set(headers);
}
}))
.build(new IOBufferMatcherLayer());
fail("IllegalArgumentException expected");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), containsString("less than 65536"));
}
}
代码示例来源:origin: org.apache.beam/beam-runners-google-cloud-dataflow-java
@Test
public void testStagingPreservesClasspath() throws Exception {
File smallFile = makeFileWithContents("small.txt", "small");
File largeFile = makeFileWithContents("large.txt", "large contents");
when(mockGcsUtil.getObjects(anyListOf(GcsPath.class)))
.thenReturn(
ImmutableList.of(
StorageObjectOrIOException.create(new FileNotFoundException("some/path"))));
when(mockGcsUtil.create(any(GcsPath.class), anyString()))
.thenAnswer(invocation -> Pipe.open().sink());
List<DataflowPackage> targets =
defaultPackageUtil.stageClasspathElements(
ImmutableList.of(smallFile.getAbsolutePath(), largeFile.getAbsolutePath()),
STAGING_PATH,
createOptions);
// Verify that the packages are returned small, then large, matching input order even though
// the large file would be uploaded first.
assertThat(targets.get(0).getName(), startsWith("small"));
assertThat(targets.get(1).getName(), startsWith("large"));
}
代码示例来源:origin: org.jruby/jruby-complete
public void cleanup() throws IOException {
pipe.sink().close();
pipe.source().close();
}
}
代码示例来源:origin: org.jruby/jruby-core
private Selector getSelector(ThreadContext context, SelectableChannel channel) throws IOException {
Selector selector = selectors.get(channel.provider());
if (selector == null) {
selector = SelectorFactory.openWithRetryFrom(context.runtime, channel.provider());
if (selectors.isEmpty()) {
selectors = new HashMap<SelectorProvider, Selector>();
}
selectors.put(channel.provider(), selector);
if (!selector.provider().equals(SelectorProvider.provider())) {
// need to create pipe between alt impl selector and native NIO selector
Pipe pipe = Pipe.open();
ENXIOSelector enxioSelector = new ENXIOSelector(selector, pipe);
if (enxioSelectors.isEmpty()) enxioSelectors = new ArrayList<ENXIOSelector>();
enxioSelectors.add(enxioSelector);
pipe.source().configureBlocking(false);
pipe.source().register(getSelector(context, pipe.source()), SelectionKey.OP_READ, enxioSelector);
} else if (mainSelector == null) {
mainSelector = selector;
}
}
return selector;
}
代码示例来源:origin: jenkinsci/remoting
@Theory
public void sendingBiggerAndBiggerBatches(NetworkLayerFactory serverFactory, NetworkLayerFactory clientFactory,
BatchSendBufferingFilterLayer batch)
.on(clientFactory.create(selector.hub(), serverToClient.source(), clientToServer.sink()))
.filter(new SSLEngineFilterLayer(clientEngine, null))
.build(new IOBufferMatcherLayer());
.on(serverFactory.create(selector.hub(), clientToServer.source(), serverToClient.sink()))
.filter(new SSLEngineFilterLayer(serverEngine, null))
.filter(batch)
代码示例来源:origin: org.terracotta/terracotta-l1-ee
public PipeSocket(Socket socket) throws IOException {
this.socket = socket;
this.inputPipe = Pipe.open();
this.outputPipe = Pipe.open();
this.outputPipe.source().configureBlocking(false);
}
代码示例来源:origin: org.apache.qpid/proton-j
@Override
public Pipe pipe() throws IOException {
return Pipe.open();
}
代码示例来源:origin: org.apache.qpid/proton-j
@Override
public void run(Selectable selectable) {
try {
wakeup.source().read(ByteBuffer.allocate(64));
} catch (IOException e) {
throw new RuntimeException(e);
}
expireSelectable(selectable);
}
代码示例来源:origin: Refinitiv/Elektron-SDK
void pipeWrite() throws IOException
{
if (_pipeWriteCount.incrementAndGet() == 1)
_pipe.sink().write(ByteBuffer.wrap(_pipeWriteByte));
}
代码示例来源:origin: jenkinsci/remoting
/**
* The behaviour of a {@link Pipe} with regard to propagation of the close is counter-intuitive. It doesn't
* propagate the close status to the other side, rather if the {@link Pipe#source()} is closed, the
* {@link Pipe#sink()} will just throw an {@link IOException} with {@literal Broken pipe} as the message.
*/
@Test
public void pipeCloseSource() throws Exception {
Pipe pipe = Pipe.open();
assertThat(pipe.source().isOpen(), is(true));
assertThat(pipe.sink().isOpen(), is(true));
pipe.source().close();
assertThat(pipe.source().isOpen(), is(false));
assertThat(pipe.sink().isOpen(), is(true));
try {
pipe.sink().write(ByteBuffer.allocate(1));
} catch (IOException e) {
assertThat(e.getMessage(), containsString("Broken pipe"));
}
assertThat("No detection of source closed", pipe.sink().isOpen(), is(true));
Thread.sleep(1000);
assertThat("No detection of source closed", pipe.sink().isOpen(), is(true));
}
代码示例来源:origin: org.jruby/jruby-core
if (pendingReads == null && unselectableReads == null && unselectableWrites == null) {
if (has_timeout && timeout == 0) {
for (Selector selector : selectors.values()) selector.selectNow();
} else {
List<Future> futures = new ArrayList<Future>(enxioSelectors.size());
mainSelector.select(has_timeout ? timeout : 0);
for (ENXIOSelector enxioSelector : enxioSelectors) enxioSelector.selector.wakeup();
Pipe.SourceChannel source = enxioSelector.pipe.source();
SelectionKey key = source.keyFor(mainSelector);
if (key != null && mainSelector.selectedKeys().contains(key)) {
mainSelector.selectedKeys().remove(key);
ByteBuffer buf = ByteBuffer.allocate(1);
source.read(buf);
内容来源于网络,如有侵权,请联系作者删除!