read process standard输出和standard错误在swift中并行,不阻塞

ukxgm1gy  于 2023-11-16  发布在  Swift
关注(0)|答案(2)|浏览(107)

在swift5中,我想运行一个Process()读取standardOutputstandardError而不阻塞,这样我就可以解析它们。
此示例代码一旦调用带有for try await line in errorPipe.fileHandleForReading.bytes.lines的行,程序执行将被阻止。standardOutput读取器停止打印

import Foundation

let outputPipe = Pipe()
let errorPipe = Pipe()

let process = Process()
process.executableURL = URL(fileURLWithPath:"/sbin/ping")
process.arguments = ["google.com"]
process.standardOutput = outputPipe
process.standardError = errorPipe

try? process.run()

func processStdOut() async
{
  for i in 0..<5 {
    print("processStdOut X ", i)
    try? await Task.sleep(nanoseconds: 1_000_000_000)
  }

  do {
    for try await line in outputPipe.fileHandleForReading.bytes.lines {
      print("stdout Line: \(line)")
    }
  } catch {
    NSLog("processStdOut Error \(error.localizedDescription)")
  }
  NSLog("processStdOut finished")

}

func processStdErr() async
{
  for i in 0..<5 {
    print("processStdErr X ", i)
    try? await Task.sleep(nanoseconds: 2_000_000_000)
  }
  do {
    for try await line in errorPipe.fileHandleForReading.bytes.lines {
      print("stderr Line: \(line)")
    }
  } catch {
    NSLog("processStdErr Error \(error.localizedDescription)")
  }
  NSLog("processStdErr finished")
}

await withTaskGroup(of: Void.self) { group in
  group.addTask {
    await processStdErr()
  }
  group.addTask {
    await processStdOut()
  }
  group.addTask {
    process.waitUntilExit()
  }
}

字符串
请注意,如果您通过断开wifi或网络standardOutput来强制数据进入standardError,则会再次解除阻塞。
还有什么我应该尝试的吗?

hjzp0vay

hjzp0vay1#

是的,标准的bytes实现在standardError上同时使用bytes时也会阻塞standardOutput
下面是一个简单的bytes实现,它不会阻塞,因为它利用了readabilityHandler

extension Pipe {
    struct AsyncBytes: AsyncSequence {
        typealias Element = UInt8

        let pipe: Pipe

        func makeAsyncIterator() -> AsyncStream<Element>.Iterator {
            AsyncStream { continuation in
                pipe.fileHandleForReading.readabilityHandler = { @Sendable handle in
                    let data = handle.availableData

                    guard !data.isEmpty else {
                        continuation.finish()
                        return
                    }

                    for byte in data {
                        continuation.yield(byte)
                    }
                }

                continuation.onTermination = { _ in
                    pipe.fileHandleForReading.readabilityHandler = nil
                }
            }.makeAsyncIterator()
        }
    }

    var bytes: AsyncBytes { AsyncBytes(pipe: self) }
}

字符串
因此,以下代码在同时处理standardOutputstandardError时不会遇到相同的问题:

let outputPipe = Pipe()
let errorPipe = Pipe()

let process = Process()
process.executableURL = URL(fileURLWithPath: …)
process.standardOutput = outputPipe
process.standardError = errorPipe

func processStandardOutput() async throws {
    for try await line in outputPipe.bytes.lines {
        …
    }
}

func processStandardError() async throws {
    for try await line in errorPipe.bytes.lines {
        …
    }
}

// optionally, you might want to return whatever non-zero termination status code the process returned

process.terminationHandler = { process in
    if process.terminationStatus != 0 {
        exit(process.terminationStatus)
    }
}

try process.run()

try? await withThrowingTaskGroup(of: Void.self) { group in
    group.addTask {
        try await processStandardOutput()
    }
    
    group.addTask {
        try await processStandardError()
    }
    
    …
}

vxf3dgd4

vxf3dgd42#

大多数程序默认默认缓冲策略,由于你无法控制/sbin/ping如何处理输出,其中一个管道可能会阻塞FileHandle.AsyncBytes的实现(不知道为什么)。我通过调用.availableData来避免阻塞,从而同时使用两个管道。

import Foundation

let outputPipe = Pipe()
let errorPipe = Pipe()

let process = Process()

process.executableURL = URL(fileURLWithPath: "/sbin/ping")
process.arguments = ["-c", "10", "diariosur.es"]
process.standardOutput = outputPipe
process.standardError = errorPipe

try? process.run()

func processStdOut() async {
    print("stdout start")
    
    while process.isRunning {
        let data = outputPipe.fileHandleForReading.availableData
        if !data.isEmpty {
            if let line = String(data: data, encoding: .utf8)?.trimmingCharacters(in: .whitespacesAndNewlines) {
                print("stdout data: \(line)")
            }
        }
        
    }
    
    print("stdout finished")
}

func processStdErr() async {
    print("stderr start")
    
    while process.isRunning {
        let data = errorPipe.fileHandleForReading.availableData
        if !data.isEmpty {
            if let line = String(data: data, encoding: .utf8)?.trimmingCharacters(in: .whitespacesAndNewlines) {
                print("stderr data: \(line)")
            }
        }
    }
    
    print("stderr finished")
}
    
    await withTaskGroup(of: Void.self) { group in
        group.addTask {
            await processStdErr()
        }
        
        group.addTask {
            await processStdOut()
        }
    }
    
    process.waitUntilExit()

字符串
然后我得到以下(修剪后的)输出:

stderr start
stdout start
stdout data: PING diariosur.es (23.213.41.6): 56 data bytes
64 bytes from 23.213.41.6: icmp_seq=0 ttl=57 time=7.060 ms
stdout data: 64 bytes from 23.213.41.6: icmp_seq=1 ttl=57 time=6.562 ms
...
stdout data: 64 bytes from 23.213.41.6: icmp_seq=9 ttl=57 time=7.904 ms
stdout data: --- diariosur.es ping statistics ---
10 packets transmitted, 10 packets received, 0.0% packet loss
round-trip min/avg/max/stddev = 6.562/7.327/9.439/0.783 ms
stdout finished
stderr finished


我尝试了默认为stderrcurl,它也没有阻塞:

process.executableURL = URL(fileURLWithPath: "/usr/bin/curl")
process.arguments = ["-N", "--output", "test.zsync", "http://ubuntu.mirror.digitalpacific.com.au/releases/23.04/ubuntu-23.04-desktop-amd64.iso.zsync"]


编辑:
使用以下C程序进行测试:

#include <stdio.h>
#include <unistd.h>

int main() {
    for (int i = 1; i <= 100; ++i) {
        fprintf(stdout, "stdout: %d\n", i); 
        fflush(stdout); 
    
        if (i % 10 == 0) {
                fprintf(stderr, "stderr: %d\n", i); 
                fflush(stderr);
        }   
        usleep(100000);
    }   
    
    return 0;
}


它返回以下输出:

stdout start
stderr start
stdout data: stdout: 1
stdout data: stdout: 2
stdout data: stdout: 3
stdout data: stdout: 4
stdout data: stdout: 5
stdout data: stdout: 6
stdout data: stdout: 7
stdout data: stdout: 8
stdout data: stdout: 9
stderr data: stderr: 10
stdout data: stdout: 10
stdout data: stdout: 11
stdout data: stdout: 12
stdout data: stdout: 13
stdout data: stdout: 14
stdout data: stdout: 15
stdout data: stdout: 16
stdout data: stdout: 17
stdout data: stdout: 18
stdout data: stdout: 19
stderr data: stderr: 20

相关问题