我想在java中创建一个由任务组成的dag,其中的任务可能依赖于其他任务的输出。如果两个任务之间没有定向路径,则它们可以并行运行。任务可能被取消。如果任何任务引发异常,则所有任务都将取消。
我想用 CompleteableFuture
为此,尽管实施了 Future
接口(包括 Future.cancel(boolean)
, CompletableFuture
不支持取消-- CompletableFuture.cancel(true)
只是被忽略了(有人知道为什么吗?)
因此,我求助于使用 Future
. 这是一大堆陈词滥调,而且很复杂。还有比这更好的方法吗?
举个例子:
我想打电话 Process process = Runtime.getRuntime().exec(cmd)
要启动命令行进程,请创建 Future<Process>
. 然后我想启动(扇出到)三个子任务:
一个任务消耗来自 process.getInputStream()
一个任务消耗来自 process.getErrorStream()
一项任务 process.waitFor()
,然后等待结果。
然后我要等待所有三个已启动的子任务完成(即扇入/a完成屏障)。这应该在期末考试中收集 Future<Integer> exitCode
它收集 process.waitFor()
任务。两个输入使用者任务只返回 Void
,因此可以忽略它们的输出,但完成屏障仍应等待它们的完成。
我希望任何已启动的子任务中的失败都会导致取消所有子任务,并销毁底层进程。
请注意 Process process = Runtime.getRuntime().exec(cmd)
在第一步中,可以抛出一个异常,这将导致失败一路级联到 exitCode
.
@FunctionalInterface
public static interface ConsumerThrowingIOException<T> {
public void accept(T val) throws IOException;
}
public static Future<Integer> exec(
ConsumerThrowingIOException<InputStream> stdoutConsumer,
ConsumerThrowingIOException<InputStream> stderrConsumer,
String... cmd) {
Future<Process> processFuture = executor.submit(
() -> Runtime.getRuntime().exec(cmd));
AtomicReference<Future<Void>> stdoutProcessorFuture = //
new AtomicReference<>();
AtomicReference<Future<Void>> stderrProcessorFuture = //
new AtomicReference<>();
AtomicReference<Future<Integer>> exitCodeFuture = //
new AtomicReference<>();
Runnable cancel = () -> {
try {
processFuture.get().destroy();
} catch (Exception e) {
// Ignore (exitCodeFuture.get() will still detect exceptions)
}
if (stdoutProcessorFuture.get() != null) {
stdoutProcessorFuture.get().cancel(true);
}
if (stderrProcessorFuture.get() != null) {
stderrProcessorFuture.get().cancel(true);
}
if (exitCodeFuture.get() != null) {
stderrProcessorFuture.get().cancel(true);
}
};
if (stdoutConsumer != null) {
stdoutProcessorFuture.set(executor.submit(() -> {
try {
InputStream inputStream = processFuture.get()
.getInputStream();
stdoutConsumer.accept(inputStream != null
? inputStream
: new ByteArrayInputStream(new byte[0]));
return null;
} catch (Exception e) {
cancel.run();
throw e;
}
}));
}
if (stderrConsumer != null) {
stderrProcessorFuture.set(executor.submit(() -> {
try {
InputStream errorStream = processFuture.get()
.getErrorStream();
stderrConsumer.accept(errorStream != null
? errorStream
: new ByteArrayInputStream(new byte[0]));
return null;
} catch (Exception e) {
cancel.run();
throw e;
}
}));
}
exitCodeFuture.set(executor.submit(() -> {
try {
return processFuture.get().waitFor();
} catch (Exception e) {
cancel.run();
throw e;
}
}));
// Async completion barrier -- wait for process to exit,
// and for output processors to complete
return executor.submit(() -> {
Exception exception = null;
int exitCode = 1;
try {
exitCode = exitCodeFuture.get().get();
} catch (InterruptedException | CancellationException
| ExecutionException e) {
cancel.run();
exception = e;
}
if (stderrProcessorFuture.get() != null) {
try {
stderrProcessorFuture.get().get();
} catch (InterruptedException | CancellationException
| ExecutionException e) {
cancel.run();
if (exception == null) {
exception = e;
} else if (e instanceof ExecutionException) {
exception.addSuppressed(e);
}
}
}
if (stdoutProcessorFuture.get() != null) {
try {
stdoutProcessorFuture.get().get();
} catch (InterruptedException | CancellationException
| ExecutionException e) {
cancel.run();
if (exception == null) {
exception = e;
} else if (e instanceof ExecutionException) {
exception.addSuppressed(e);
}
}
}
if (exception != null) {
throw exception;
} else {
return exitCode;
}
});
}
注:我知道 Runtime.getRuntime().exec(cmd)
应该是非阻塞的,所以不需要自己的 Future
,但我还是用一个来编写代码,以说明dag构造的意义。
1条答案
按热度按时间ercv8c1e1#
不可能。进程没有异步接口(process.onexit()除外)。因此,您必须使用线程来等待进程的创建,并在从inputstreams读取数据时使用线程。dag的其他组件可以是异步任务(completablefutures)。
这不是什么大问题。异步任务相对于线程的唯一优点是占用的内存更少。不管怎样,你的过程消耗了很多内存,所以在这里保存内存没有什么意义。