我需要Dart中的互斥锁

toiithl6  于 2023-01-28  发布在  其他
关注(0)|答案(2)|浏览(337)

我有一个应用程序,使用了大量的socketio请求,我做批量请求。当接收数据时,我必须迭代通过一个列表来添加和删除项目:

List carlist;

void receiveData(Map data) {
  // Need a lock here
  for (var i = 0; i < carlist.lenght;) {
    if (data[carlist[i]['name']]['color'] == carlist[i]['color']) {
      carlist.removeAt(i);
      onDeleteCar(data); // Update the UI.
    }
  }
  // Need a release lock here
}

因为我需要修改列表,所以没有互斥锁就无法工作。
有人有主意吗?

2ic8powd

2ic8powd1#

dart没有并发线程,因此一个简单的布尔变量就可以作为互斥体工作。
如果您需要另一个异步执行来等待锁被释放并继续执行,而不是仅仅返回,那么它会变得有点复杂。

bool mutex = false;

void receiveData(Map data) {
  // Need a lock here
  if(mutex) {
    return;
  } else {
    mutex = true;
  }

  for (var i = 0; i < carlist.lenght;) {
    if (data[carlist[i]['name']]['color'] == carlist[i]['color']) {
      carlist.removeAt(i);
      onDeleteCar(data); // Update the UI.
    }
  }
  // Need a release lock here
  mutex = false;
}

如果您的方法是从一个事件处理程序调用的,该事件处理程序会非常快地生成大量事件,并且您希望处理每个执行,但仍然只能一个接一个地处理,则可以将调用排队,如下所示:

import 'dart:async' as async;
import 'dart:collection' as coll;
import 'dart:math' as math;

void main() {
  int j = 0;
  int delay = 0;
  for(int i = 0; i < 100; i++) {
    new async.Future.delayed(
        new Duration(milliseconds: delay += rnd.nextInt(150)),
        () => receiveData({'${j++}': j}, j)
    ).catchError((e) => print('Error: $e'));
  }
}

//List carlist = [];

WorkQueue workQueue = new WorkQueue(timeout: new Duration(milliseconds: 270));
math.Random rnd = new math.Random();

// had to made this async to make the timeout functionality work
// otherwise Dart would have evaluated the timeouts only after all other
// code has already been finished
async.Future receiveData(Map data, int id) {

  // ensure that the code passed to add isn't entered by another async thread
  // while it is already executed
  // id is just for debugging purposes
  return workQueue.add(id, () {

    async.Completer completer = new async.Completer();
    print('$id start');
//    for (var i = 0; i < carlist.length;) {
//      carlist.add(data);
//      if (data[carlist[i]['name']]['color'] == carlist[i]['color']) {
//        carlist.removeAt(i);
//        onDeleteCar(data); // Update the UI.
//      }
//    }

    // dummy task to burn time
    new async.Future.delayed(new Duration(milliseconds: rnd.nextInt(100)), () {
      var val = rnd.nextDouble();
      // add some fun - check if code that throws is handled correctly
      if(val > 0.9) {
        completer.completeError('artifical error ${id}');
        print('$id end with error');
      } else {
        completer.complete();
        print('$id end');
      }
    });

    return completer.future;
  });
  // Need a release lock here
}

// just holds a completer and a closure
class Task {
  // complete after f was executed
  async.Completer completer = new async.Completer();
  // this code should only be entered by one thread at a time
  Function f;
  // only for debugging purposes
  int id;
  // ignore timeout if f has already been invoked
  bool isInvoked = false;

  Task(this.id, this.f, Duration timeout){
    if(timeout != null) {
      completer.future.timeout(timeout, onTimeout: () {
        if(!completer.isCompleted && !isInvoked) {
          completer.completeError('${id} timed out');
        }
      })
      // future.timeout creates a new Future which also throws when
      // the completer is completed with completeError
      // not handling this error ends the app with unhandled exception
      .catchError((_) {});
    }
  }
}

class WorkQueue {
  // enque all calls
  coll.Queue q = new coll.Queue();
  // currently executing?
  bool isExecuting = false;

  // throw when the execution is delayed longer than the provide timeout
  Duration timeout;

  WorkQueue({this.timeout});

  // enqueue a new execution
  async.Future add(int id, Function f) {
    print('add $id - queue length: ${q.length}');
    var t = new Task(id, f, timeout);
    q.add(t);
    // ensure that the queue is processed
    new async.Future(release);
    return t.completer.future;
  }

  // execute next waiting thread if any
  void release() {
    // do nothing if closure is currently being executed or queue is empty
    if(!isExecuting && !q.isEmpty) {
      isExecuting = true;
      Task t = q.removeFirst();
      // check if t hasn't alredy timed out
      if(!t.completer.isCompleted) {
        // ignore timeout because we are already being invoked
        t.isInvoked = true;
        // invoke the closure
        new async.Future(t.f)
        // handle errors in closure
        .catchError((e) {
          t.completer.completeError(e);
        })
        // process next Task in queue
        .then((_) {
          isExecuting = false;
          new async.Future(release);
          if(!t.completer.isCompleted) {
            t.completer.complete();
          }
        });
      }
    }
  }
}

示例输出

add 1 - queue length: 0
1 start
1 end
add 2 - queue length: 0
2 start
add 3 - queue length: 0
2 end
3 start
3 end
add 4 - queue length: 0
4 start
4 end with error
Error: artifical error 4
add 5 - queue length: 0
5 start
5 end
add 6 - queue length: 0
6 start
add 7 - queue length: 0
6 end
7 start
7 end
add 8 - queue length: 0
8 start
add 9 - queue length: 0
add 10 - queue length: 1
8 end with error
Error: artifical error 8
9 start
9 end
10 start
add 11 - queue length: 0
10 end
11 start
add 12 - queue length: 0
add 13 - queue length: 1
11 end
12 start
12 end
13 start
13 end
add 14 - queue length: 0
14 start
14 end
add 15 - queue length: 0
15 start
add 16 - queue length: 0
15 end
16 start
16 end with error
Error: artifical error 16
add 17 - queue length: 0
17 start
17 end
add 18 - queue length: 0
18 start
18 end
add 19 - queue length: 0
19 start
add 20 - queue length: 0
19 end
20 start
add 21 - queue length: 0
20 end
21 start
add 22 - queue length: 0
21 end
22 start
22 end
add 23 - queue length: 0
23 start
23 end
add 24 - queue length: 0
24 start
add 25 - queue length: 0
add 26 - queue length: 1
add 27 - queue length: 2
24 end with error
Error: artifical error 24
25 start
add 28 - queue length: 2
add 29 - queue length: 3
25 end
26 start
add 30 - queue length: 3
26 end
27 start
27 end
28 start
add 31 - queue length: 2
28 end
29 start
29 end
30 start
30 end
31 start
add 32 - queue length: 0
31 end
32 start
32 end
add 33 - queue length: 0
33 start
33 end
add 34 - queue length: 0
34 start
34 end
add 35 - queue length: 0
35 start
35 end
add 36 - queue length: 0
36 start
add 37 - queue length: 0
36 end
37 start
add 38 - queue length: 0
37 end
38 start
add 39 - queue length: 0
38 end with error
Error: artifical error 38
39 start
39 end with error
Error: artifical error 39
add 40 - queue length: 0
40 start
add 41 - queue length: 0
40 end
41 start
41 end with error
Error: artifical error 41
add 42 - queue length: 0
42 start
add 43 - queue length: 0
add 44 - queue length: 1
add 45 - queue length: 2
42 end with error
Error: artifical error 42
43 start
add 46 - queue length: 2
43 end
44 start
add 47 - queue length: 2
add 48 - queue length: 3
add 49 - queue length: 4
44 end
45 start
add 50 - queue length: 4
45 end with error
Error: artifical error 45
46 start
46 end
47 start
add 51 - queue length: 3
47 end
48 start
Error: 49 timed out
48 end
add 52 - queue length: 2
Error: 50 timed out
add 53 - queue length: 3
add 54 - queue length: 4
add 55 - queue length: 5
Error: 51 timed out
add 56 - queue length: 6
Error: 52 timed out
add 57 - queue length: 7
Error: 53 timed out
Error: 54 timed out
add 58 - queue length: 8
Error: 55 timed out
add 59 - queue length: 9
Error: 56 timed out
Error: 57 timed out
add 60 - queue length: 10
Error: 58 timed out
add 61 - queue length: 11
Error: 59 timed out
add 62 - queue length: 12
add 63 - queue length: 13
Error: 60 timed out
add 64 - queue length: 14
Error: 61 timed out
Error: 62 timed out
add 65 - queue length: 15
Error: 63 timed out
add 66 - queue length: 16
Error: 64 timed out
add 67 - queue length: 17
Error: 65 timed out
add 68 - queue length: 18
Error: 66 timed out
add 69 - queue length: 19
add 70 - queue length: 20
add 71 - queue length: 21
Error: 67 timed out
add 72 - queue length: 22
Error: 68 timed out
add 73 - queue length: 23
Error: 69 timed out
Error: 70 timed out
Error: 71 timed out
add 74 - queue length: 24
Error: 72 timed out
Error: 73 timed out
add 75 - queue length: 25
add 76 - queue length: 26
Error: 74 timed out
add 77 - queue length: 27
add 78 - queue length: 28
Error: 75 timed out
Error: 76 timed out
add 79 - queue length: 29
add 80 - queue length: 30
Error: 77 timed out
Error: 78 timed out
add 81 - queue length: 31
add 82 - queue length: 32
add 83 - queue length: 33
Error: 79 timed out
Error: 80 timed out
add 84 - queue length: 34
Error: 81 timed out
add 85 - queue length: 35
Error: 82 timed out
Error: 83 timed out
add 86 - queue length: 36
Error: 84 timed out
add 87 - queue length: 37
Error: 85 timed out
add 88 - queue length: 38
add 89 - queue length: 39
add 90 - queue length: 40
add 91 - queue length: 41
Error: 86 timed out
add 92 - queue length: 42
Error: 87 timed out
Error: 88 timed out
add 93 - queue length: 43
Error: 89 timed out
Error: 90 timed out
Error: 91 timed out
Error: 92 timed out
add 94 - queue length: 44
add 95 - queue length: 45
add 96 - queue length: 46
add 97 - queue length: 47
Error: 93 timed out
add 98 - queue length: 48
Error: 94 timed out
Error: 95 timed out
Error: 96 timed out
add 99 - queue length: 49
Error: 97 timed out
add 100 - queue length: 50
Error: 98 timed out
Error: 99 timed out
Error: 100 timed out
4uqofj5v

4uqofj5v2#

只需使用包互斥体!
flutter pub add mutex

List carlist;
final locker = Mutex();

void receiveData(Map data) {
  // Need a lock here
  locker.acquire();
  try {
    for (var i = 0; i < carlist.lenght;) {
      if (data[carlist[i]['name']]['color'] == carlist[i]['color']) {
        carlist.removeAt(i);
        onDeleteCar(data); // Update the UI.
      }
    }
  } finally {
    // Need a release lock here
    locker.release();
  }
}

相关问题