Python队列只保留具有特定条件的最新元素

b1payxdu  于 2024-01-05  发布在  Python
关注(0)|答案(3)|浏览(181)

假设在传统的生产者/消费者模式中,生产者将生产两种类型的元素,AB。缓冲队列应实现以下逻辑:

  • 执行绪安全。
  • FIFO
  • 所有A都可以安全地留在队列中,这表示重要任务。
  • 每当新的B到达时,将队列中所有在前的B踢出,并附加到队列的末尾,这表示最新的非重要任务。
  • 应保留元素的顺序。
  • 消费流行元素就像平常一样。

为简单起见,假设队列的大小是无限的。
不使用2个队列是否可以实现此功能?

ne5o7dgx

ne5o7dgx1#

你可能可以子类化一个collections.deque并得到类似这样的结果:

  1. from collection import deque
  2. class Foo(deque[str]):
  3. def append(self, val: str):
  4. if val == 'A':
  5. super().append(val)
  6. else:
  7. try:
  8. while True:
  9. super().remove('B')
  10. except ValueError:
  11. pass
  12. super().append(val)

字符串

  • 注意:* while True块在这里,以防你从一个可迭代对象初始化队列,并且有超过1个“B”来开始,因为没有什么能阻止你这样做:q = Foo(['B', 'B', 'B', 'B'])。如果你想,你也可以ovverride父对象的__init __,以确保队列中总是只有1个'B'

然后你可以继续把任务放在队列中:

  1. tasks = ['A', 'A', 'A', 'B', 'A', 'B', 'A', 'B', 'A', 'A']
  2. q = Foo()
  3. for i in tasks:
  4. q.append(i)
  5. print(q)
  6. Out:
  7. Foo(['A'])
  8. Foo(['A', 'A'])
  9. Foo(['A', 'A', 'A'])
  10. Foo(['A', 'A', 'A', 'B'])
  11. Foo(['A', 'A', 'A', 'B', 'A'])
  12. Foo(['A', 'A', 'A', 'A', 'B'])
  13. Foo(['A', 'A', 'A', 'A', 'B', 'A'])
  14. Foo(['A', 'A', 'A', 'A', 'A', 'B'])
  15. Foo(['A', 'A', 'A', 'A', 'A', 'B', 'A'])
  16. Foo(['A', 'A', 'A', 'A', 'A', 'B', 'A', 'A'])


最后,使用popleft()以FIFO顺序获取元素。
deque也应该是线程安全的:https://docs.python.org/3/library/collections.html#collections.deque

展开查看全部
gpfsuwkq

gpfsuwkq2#

与其试图从队列中驱逐B,不如在弹出B时检查它是否是队列中唯一的一个,这可能是最简单的方法。

  1. queue = deque()
  2. def push(x):
  3. queue.append(x)
  4. def pop(x):
  5. while True:
  6. result = queue.popleft()
  7. if result == 'B' and 'B' in queue:
  8. continue
  9. return result

字符串
再多做一点努力,你就可以统计队列中B的数量,只在计数下降到0时才处理B。这将保存'B' in queue的成本,但计数必须通过一个锁来访问才是线程安全的。

ddarikpa

ddarikpa3#

从潜在的长任务队列中踢出现有的不重要任务的更有效的方法是使用双向链表作为队列,并维护对不重要任务的节点的引用(如果有的话),以便可以在 O(1) 时间复杂度内删除引用的节点。
使用llist.dllist,一个来自llist模块的双向链表的优秀实现:

  1. from llist import dllist
  2. from dataclasses import dataclass
  3. @dataclass
  4. class Task:
  5. name: str
  6. class UnimportantTask(Task):
  7. pass
  8. class Tasks:
  9. def __init__(self):
  10. self.queue = dllist()
  11. self.unimportant_task = None
  12. def add(self, task):
  13. node = self.queue.appendright(task)
  14. if isinstance(task, UnimportantTask):
  15. if self.unimportant_task:
  16. self.queue.remove(self.unimportant_task)
  17. self.unimportant_task = node
  18. def next(self):
  19. if self.queue:
  20. return self.queue.popleft()

字符串
以便:

  1. tasks = Tasks()
  2. tasks.add(Task('A1'))
  3. tasks.add(Task('A2'))
  4. tasks.add(UnimportantTask('B1'))
  5. tasks.add(Task('A3'))
  6. tasks.add(UnimportantTask('B2'))
  7. tasks.add(UnimportantTask('B3'))
  8. tasks.add(Task('A4'))
  9. while task := tasks.next():
  10. print(task)


产出:

  1. Task(name='A1')
  2. Task(name='A2')
  3. Task(name='A3')
  4. UnimportantTask(name='B3')
  5. Task(name='A4')


演示:https://replit.com/@blhsing1/UnimportantTaskQueue#main.py

展开查看全部

相关问题