Dart -使长时间运行的同步函数异步

nwnhqdif  于 2023-03-27  发布在  其他
关注(0)|答案(3)|浏览(129)

我有一个可能需要几秒钟才能执行的函数,并且它是同步的。

String slowFunction() { ... }
...
Future<String>(() => slowFunction());

是否改为异步?
如果我在下一步中需要它的结果,这段代码有意义吗?

Future<void> anotherFunction() async {
  // other async calls with Futures and await
  ...
  final result = await Future<String>(() => slowFunction());
  print(result);
  ...
  // do something else with result
}

创建一个Future,然后立即在其上执行await,这似乎有点奇怪。我应该直接调用该函数吗?我猜它有点“yields”,并允许其他代码在此之前执行,但这样的代码有任何用处吗?

piah890a

piah890a1#

将一个固有的同步进程伪装成异步进程是没有意义的。(更一般地称为“并发”)工作,不仅仅是在Dart中,而是在一般情况下。并发只是一种编程技巧,使多个操作在同一线程中相互交错运行,给人一种真正并行的错觉(不同的线程或进程同时运行)。这允许那些在等待资源时通常会阻塞的进程推迟到以后,因为程序继续处理其他事情。
如果你想让一个同步进程阻塞 * 由于工作正在被积极地完成 *,要么程序无论如何都会阻塞,因为“异步”代码的执行就像它本来会做的那样,要么程序会阻塞一样长的时间,但在以后的时间。无论哪种方式,你仍然是用一个长时间运行的进程阻塞你的程序。
举个例子,这就是你要问的:获取一个长时间运行同步工作负载并将其 Package 在Future中,从而使其成为“异步”:

Future<String> slowFunction() async { 
  // Simulate a long-running synchronous operation
  int counter = 0;
  for (int i = 0; i < 10000000000; i++) {
    counter++;
  }

  // Return the "asynchronous" result
  return counter.toString();
}

...

String result = await slowFunction();

在正常的并发情况下,这将把slowFunction放入异步队列。下一次程序有一些停机时间(例如在UI绘制调用之间),它将把该函数从队列中拉出来并处理它。并且 * 这是 * 当函数执行时,它将阻塞多少秒。
slowFunction可能被标记为async,但因为它实际上没有await任何东西,Dart无论如何都会尝试同步运行它,从而破坏了整个目的。您不必首先将其 Package 在Future中。
如果你想分解同步函数的操作,你有两个选择。要么你必须把它分解成不同的操作,你可以在它们之间进行await(这本身是一个有点复杂的过程,并不总是可行的,通常是一个很好的代码气味来源),要么你把函数完全卸载到一个单独的线程,使用 parallelism 而不仅仅是 concurrency
Dart是单线程的,但可以通过使用隔离来进行多线程处理。(isolate是Dart对子进程的命名,它与Dart中的真正多线程非常接近。)通过将函数 Package 在Isolate中,您可以在一个完全独立的进程上运行工作。这样,即使该进程阻塞2-3秒,也不会影响应用的大部分。
不过,这里有一个问题。因为隔离菌株是完全不同的进程,所以没有任何内存共享。这意味着隔离菌株可以访问的任何数据都必须通过使用“端口”(即SendPortReceivePort)手动传入。这自然会使隔离菌株编程有点痛苦,但作为交换,你就不会遇到像你的程序有争用条件或陷入死锁这样的事情。(至少是因为共享内存问题。严格地说,有很多其他方法可以得到死锁和争用条件。)
使用Isolate的工作方式如下:

// main process

void createIsolate() async {
  ReceivePort isolateToMain = ReceivePort();

  isolateToMain.listen((data) {
    // Listen for data passed back to the main process
  });

  Isolate myIsolateInstance = await Isolate.spawn(myIsolate, isolateToMain.sendPort);
}

// isolate process

void myIsolate(SendPort mainToIsolate) {
  final result = slowFunction();
  mainToIsolate.send(result);
}
kgsdhlau

kgsdhlau2#

我有一个可能需要几秒钟才能执行的函数,并且它是同步的。

String slowFunction() { ... }
...
Future<String>(() => slowFunction());

是否改为异步?
简单地返回一个Future并不能使你的函数以你可能想要的方式异步。
调用异步函数时,将尽可能以“同步”方式执行该函数。也就是说,该函数将同步执行,直到到达第一个await由于Dart isolate是单线程的,因此如果您希望其他工作能够与长时间运行的操作并发发生,slowFunction在内部需要使用await(这是用于创建Future.then()回调的语法糖)来允许执行产生。
请看下面的代码:

Future<void> longRunningOperation1() async {
  for (var i = 0; i < 100000000; i += 1) {
    if (i % 10000000 == 0) {
      print('longRunningOperation1: $i');
    }
  }
}

Future<void> longRunningOperation2() async {
  for (var i = 0; i < 100000000; i += 1) {
    if (i % 10000000 == 0) {
      print('longRunningOperation2: $i');
    }
  }
}

Future<void> main() async {
  await Future.wait([longRunningOperation1(), longRunningOperation2()]);
}

您将看到longRunningOperation1longRunningOperation2从不重叠;一个总是在另一个开始之前运行到完成。为了允许操作以最小的更改重叠,您可以这样做:

Future<void> longRunningOperation1() async {
  for (var i = 0; i < 100000000; i += 1) {
    if (i % 10000000 == 0) {
      print('longRunningOperation1: $i');
      await Future.delayed(Duration.zero);
    }
  }
}

Future<void> longRunningOperation2() async {
  for (var i = 0; i < 100000000; i += 1) {
    if (i % 10000000 == 0) {
      print('longRunningOperation2: $i');
      await Future.delayed(Duration.zero);
    }
  }
}
yrefmtwq

yrefmtwq3#

我使用一个 Package 器将慢速操作生成到一个单独的Isolate中并返回一个Future。它还允许传递要运行的函数和一些参数。

import 'dart:async';
import 'dart:isolate';

/// Example
///
/// ```
/// main() async {
///   String str;
///   str = await runAsync<String, String Function(String)>(sing, ["lalalala"]);
///   print(str);
///
///   str = await runAsync<String, Function>(song);
///   print(str);
/// }
/// 
/// String sing(String str) => "Singing: " + str;
/// String song() => "lololololo";
/// ```

Future<R> runAsync<R, F>(F func, [List<dynamic> parameters]) async {
  final receivePort = ReceivePort();
  await Isolate.spawn(asyncRunner, receivePort.sendPort);

  // The 'asyncRunner' isolate sends it's SendPort as the first message
  final sendPort = await receivePort.first;

  final responsePort = ReceivePort();
  sendPort.send([responsePort.sendPort, func, parameters ?? []]);
  final res = await responsePort.first;
  if (res is! R)
    return Future.error(res);
  else if (res == null) return null;
  return res as R;
}

// Isolate entry point
void asyncRunner(SendPort sendPort) async {
  // Open the ReceivePort for incoming messages
  final port = ReceivePort();

  // Notify our creator the port we listen to
  sendPort.send(port.sendPort);

  final msg = await port.first;

  // Execute
  final SendPort replyTo = msg[0];
  final Function myFunc = msg[1];
  final List<dynamic> parameters = msg[2] ?? [];

  try {
    switch (parameters.length) {
      case 0:
        replyTo.send(myFunc());
        break;
      case 1:
        replyTo.send(myFunc(parameters[0]));
        break;
      case 2:
        replyTo.send(myFunc(parameters[0], parameters[1]));
        break;
      case 3:
        replyTo.send(myFunc(parameters[0], parameters[1], parameters[2]));
        break;
      case 4:
        replyTo.send(
            myFunc(parameters[0], parameters[1], parameters[2], parameters[3]));
        break;
      case 5:
        replyTo.send(myFunc(parameters[0], parameters[1], parameters[2],
            parameters[3], parameters[4]));
        break;
      default:
        replyTo.send(Exception("Unsupported argument length"));
    }
  } catch (err) {
    replyTo.send(Exception(err.toString()));
  }

  // Done
  port.close();
  Isolate.current.kill();
}

https://github.com/vocdoni/dvote-dart/blob/main/lib/util/asyncify.dart#L16

相关问题