Java中命名管道的并发读/写(在Windows上)

xmjla07d  于 2023-05-05  发布在  Java
关注(0)|答案(6)|浏览(303)

我尝试使用命名管道在Windows上的C#应用程序和Java应用程序之间提供通信,方法由v01ver在此问题中描述:How to open a Windows named pipe from Java?
我在Java方面遇到了一个问题,因为我有一个读取器线程一直在等待管道上的输入,当我试图从我的主线程写入管道时,它永远卡住了。

final RandomAccessFile pipe;
try {
   pipe = new RandomAccessFile("\\\\.\\pipe\\mypipe", "rw");
}
catch (FileNotFoundException ex) {
   ex.printStackTrace();
   return;
}

Thread readerThread = new Thread(new Runnable() {
   @Override
   public void run() {
      String line = null;
      try {
         while (null != (line = pipe.readLine())) {
            System.out.println(line);
         }
      }
      catch (IOException ex) {
         ex.printStackTrace();
      }
   }
});
readerThread.start();

try { Thread.sleep(500); } catch (InterruptedException e) {}

try {
   System.out.println("Writing a message...");
   pipe.write("Hello there.\n".getBytes());
   System.out.println("Finished.");
}
catch (IOException ex) {
   ex.printStackTrace();
}

输出为:

Writing a message...

然后它永远等待。
如何在等待另一个线程中的输入时写入命名管道?

evrscar2

evrscar21#

这是管道的预期行为。它应该挂起,直到其他进程连接到管道并读取它。

w8f9ii69

w8f9ii692#

我也遇到了同样的问题--在Windows上使用命名管道在C#/Python应用程序和Java应用程序之间进行通信:
我们有一个用Java编写的客户端代码的例子,但是在String echoResponse = pipe.readLine();行中,线程永远等待。

try {
    // Connect to the pipe
    RandomAccessFile pipe = new RandomAccessFile("\\\\.\\pipe\\testpipe", "rw");
    String echoText = "Hello word\n";
    // write to pipe
    pipe.write ( echoText.getBytes() );
    // read response
    String echoResponse = pipe.readLine();
    System.out.println("Response: " + echoResponse );
    pipe.close();

    } catch (Exception e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
    }

问题解决方案:我有一个用Python写的ServerPipe代码,从这里Example Code - Named Pipes:并在Python 2.6.6上运行

from ctypes import *

PIPE_ACCESS_DUPLEX = 0x3
PIPE_TYPE_MESSAGE = 0x4
PIPE_READMODE_MESSAGE = 0x2
PIPE_WAIT = 0
PIPE_UNLIMITED_INSTANCES = 255
BUFSIZE = 4096
NMPWAIT_USE_DEFAULT_WAIT = 0
INVALID_HANDLE_VALUE = -1
ERROR_PIPE_CONNECTED = 535

MESSAGE = "Default answer from server\0"
szPipename = "\\\\.\\pipe\\mynamedpipe"

def ReadWrite_ClientPipe_Thread(hPipe):
    chBuf = create_string_buffer(BUFSIZE)
    cbRead = c_ulong(0)
    while 1:
        fSuccess = windll.kernel32.ReadFile(hPipe, chBuf, BUFSIZE,
byref(cbRead), None)
        if ((fSuccess ==1) or (cbRead.value != 0)):
            print chBuf.value
            cbWritten = c_ulong(0)
            fSuccess = windll.kernel32.WriteFile(hPipe,
                                                 c_char_p(MESSAGE),
                                                 len(MESSAGE),
                                                 byref(cbWritten),
                                                 None
                                                )
        else:
            break
        if ( (not fSuccess) or (len(MESSAGE) != cbWritten.value)):
            print "Could not reply to the client's request from the
pipe"
            break
        else:
            print "Number of bytes written:", cbWritten.value

    windll.kernel32.FlushFileBuffers(hPipe)
    windll.kernel32.DisconnectNamedPipe(hPipe)
    windll.kernel32.CloseHandle(hPipe)
    return 0

def main():
    THREADFUNC = CFUNCTYPE(c_int, c_int)
    thread_func = THREADFUNC(ReadWrite_ClientPipe_Thread)
    while 1:
        hPipe = windll.kernel32.CreateNamedPipeA(szPipename,
                                                 PIPE_ACCESS_DUPLEX,
                                                 PIPE_TYPE_MESSAGE |
                                                 PIPE_READMODE_MESSAGE
|
                                                 PIPE_WAIT,

PIPE_UNLIMITED_INSTANCES,
                                                 BUFSIZE, BUFSIZE,

NMPWAIT_USE_DEFAULT_WAIT,
                                                 None
                                                )
        if (hPipe == INVALID_HANDLE_VALUE):
            print "Error in creating Named Pipe"
            return 0

        fConnected = windll.kernel32.ConnectNamedPipe(hPipe, None)
        if ((fConnected == 0) and (windll.kernel32.GetLastError() ==
ERROR_PIPE_CONNECTED)):
            fConnected = 1
        if (fConnected == 1):
            dwThreadId = c_ulong(0)
            hThread = windll.kernel32.CreateThread(None, 0,
thread_func, hPipe, 0, byref(dwThreadId))
            if (hThread == -1):
                print "Create Thread failed"
                return 0
            else:
                windll.kernel32.CloseHandle(hThread)
        else:
            print "Could not connect to the Named Pipe"
            windll.kernel32.CloseHandle(hPipe)
    return 0

if __name__ == "__main__":
    main()

服务器启动后,您可以使用Java客户端代码的稍微修改版本:

try {
    // Connect to the pipe
    RandomAccessFile pipe = new RandomAccessFile("\\\\.\\pipe\\mynamedpipe", "rw");
    String echoText = "Hello world\n";
    // write to pipe
    pipe.write(echoText.getBytes());

    //String aChar;
    StringBuffer fullString = new StringBuffer();

    while(true){
        int charCode = pipe.read();
        if(charCode == 0) break;
        //aChar = new Character((char)charCode).toString();
        fullString.append((char)charCode);
    }

    System.out.println("Response: " + fullString);
    pipe.close();
}
catch (Exception e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
}

它在NetBeans 6.9.1中运行良好。

mrzz3bfm

mrzz3bfm3#

我认为RandomAccessFile在这里不是正确的API。在Java端尝试FileInputStream + FileOutputStream。但这只是一个猜测,因为我上次使用Windows API时,命名管道还不存在。

esbemjvw

esbemjvw4#

不用担心,使用RandomAccessFile访问一个 named pipe 是正确的。命名管道是文件系统对象。在Linux/Unix下,它也被称为“fifo”。这些对象就像文件一样可读。(与Java Pipe类抽象的进程之间使用的管道不同)。
但是我看到你的程序有两个问题。我不能测试它目前,因为我需要你的测试服务器(随时公布)。你的阅读器线程等待来自另一端的答案(即服务器)。它使用readLine(),我会使用不同的方法(对于调试阅读逐字符读取可能是最好的)。
使用Java(没有JNI),您实际上无法创建命名管道(服务器端)。使用RandomAccessFile使用的泛型方法打开命名管道,您将获得一个字节类型的流,该流可以是单向的,也可以是双向的。
顺便说一句:JTDS(SQL Server的免费JDBC驱动程序)可以选择使用命名管道访问SQL Server,甚至通过网络。它使用的正是RandomAccessFile方法。
BTW 2:在旧的MS SQL Server安装介质上有一个makepipe.exe测试服务器,但是我没有找到一个可信的来源来获取该文件。

luaexgnf

luaexgnf5#

管道是单向的意思,管道只能进行子到父的读操作和父到子的写操作,反之亦然,不能两者都做。为此,需要在C#和Java端使用两个管道,以便它们都执行读写操作。
管道与套接字:https://www.baeldung.com/cs/pipes-vs-sockets

[编辑]

因此,您无法在同一管道上执行读取和写入操作。一些语言如C#提供了双工管道,它可以在同一管道上执行读写操作,但可以肯定的是,这是两个管道“在引擎盖下”,分别用于读写操作。因此,只要是一个抽象,并且您并不真正知道“引擎盖下”发生了什么,您最好将两个管道之间的读写操作分段。另一个可能导致问题的因素是Java不能正式支持管道服务器,为了做到这一点,你需要使用Java Native Access库:RandomAccessFile Java实现使用OS文件系统以便在管道上执行读写操作,因此它具有一些限制,并且因此,如果两个线程正在访问存储器块,则在OS文件系统对象内异步地读写可能导致线程被锁定,并且甚至可能导致存储器损坏(竞争条件)。另一个问题是,当我试图从不同的线程对管道执行两个读操作时,我得到了\\.\pipe\testpipe1 (All pipe instances are busy)异常。这意味着,在您的场景中,您的管道可能很忙碌,并且在执行写操作时被锁定在等待响应状态。由于前面的因素,最好的选择是在管道上执行同步读/写操作,优选地使用两个管道,以便对所执行的读和写操作具有最大的控制。

[C#应用]

using System;
using System.IO;
using System.IO.Pipes;

class C_Sharp_Pipe
{
    static void Main()
    {
        Write_Pipe();
        Read_Pipe();

        Console.ReadLine();
    }

    static async void Write_Pipe()
    {
        using (NamedPipeServerStream pipeServer = new NamedPipeServerStream("testpipe1", PipeDirection.Out))
        {
            try
            {
                pipeServer.WaitForConnection();

                using (StreamWriter sw = new StreamWriter(pipeServer))
                {
                    sw.WriteLine("Hello Java process");
                    Console.WriteLine("Message sent to testpipe1 Java client: " + "\"Hello\"");

                    await sw.FlushAsync();
                }

            }
            catch (Exception e)
            {
                Console.WriteLine("ERROR: " + e.Message);
            }
        }
    }

    static async void Read_Pipe()
    {
        using (NamedPipeServerStream pipeServer = new NamedPipeServerStream("testpipe2", PipeDirection.InOut))
        {
            try
            {
                pipeServer.WaitForConnection();

                using (StreamReader sr = new StreamReader(pipeServer))
                {
                    string received_message = await sr.ReadLineAsync();
                    Console.WriteLine("Message received from Java testpipe2 server: " + received_message);

                }
            }
            catch (Exception e)
            {
                Console.WriteLine("ERROR: " + e.Message);
            }
        }

    }
}

[Java应用程序]

package com.algorithms;

import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;

public class Main {

    public static void main(String[] args)
    {
        Read_Pipe();
        Write_Pipe();

        Scanner s = new Scanner(System.in);
        s.nextLine();
    }

    private static void Read_Pipe()
    {
        try
        {
            RandomAccessFile pipe = new RandomAccessFile("\\\\.\\pipe\\testpipe1", "r");
            byte[] buffer = new byte[1024];

            pipe.read(buffer);

            System.out.println("Response from C# testpipe1 pipe server: " + new String(buffer, StandardCharsets.UTF_8) );
            pipe.close();
        }
        catch (Exception e)
        {
            System.out.println(e.getMessage());
        }
    }


    private static void Write_Pipe()
    {
        try
        {
            RandomAccessFile pipe = new RandomAccessFile("\\\\.\\pipe\\testpipe2", "rw");
            String buffer = "Hello C# process";

            pipe.write(buffer.getBytes(StandardCharsets.UTF_8));

            System.out.println("Message sent to C# testpipe2 pipe client: " + buffer);
            pipe.close();
        }
        catch (Exception e)
        {
            System.out.println(e.getMessage());
        }
    }


}

[ C#进程间通信结果]

[Java进程间通信结果]

emeijp43

emeijp436#

我对JAVA不太熟悉,我的C#也很初级。然而,我在一个多线程C++客户端上遇到了类似的问题,我通过打开重叠IO的管道来解决这个问题。在我这样做之前,Windows串行化了读取和写入,有效地导致了一个不满意(阻塞)的ReadFile,以防止后续WriteFile的完成,直到读取完成。
参见CreateFile function
文件_标志_重叠

相关问题