python 控制流返回到调用者的嵌套的Pencio协程

plupiseo  于 2024-01-05  发布在  Python
关注(0)|答案(1)|浏览(161)

我有一系列的异步步骤要做,我想把它们放在一个方法中。每一步都生成一个子结果,并诱导下一个异步步骤,它再次生成一个子结果,直到到达最后一步。
看看我的代码:

  1. class TextDetector(Detector):
  2. lock: asyncio.Lock = asyncio.Lock()
  3. async def detect(self, input: TextDetectorInput) -> TextDetectorResult:
  4. pass
  5. class TextAugmentationProcedure(AugmentationProcedure):
  6. async def augment(
  7. self, input: AugmentationProcedureTextInput
  8. ) -> AugmentationProcedureTextOutput:
  9. pass
  10. TextDetectorCoolDownAndResultType = Awaitable[Awaitable[ImageDetectorResult]]
  11. RunTextDetectionType = AsyncIterator[tuple[DetectorIdentifier, TextDetector, TextDetectorCoolDownAndResultType]]
  12. GetFilesTextType = AsyncIterator[tuple[InputFileIdentifier, TextInputFile, AugmentationProcedureTextOutput, RunTextDetectionType]]
  13. CheckTextType = AsyncIterator[tuple[AugmentationProcedureIdentifier, TextAugmentationProcedure, GetFilesTextType]]
  14. @dataclass
  15. class TextTestCase(TestCase):
  16. inputs: dict[InputFileIdentifier, TextInputFile]
  17. augmentation_procedures: dict[AugmentationProcedureIdentifier, TextAugmentationProcedure]
  18. detectors: dict[DetectorIdentifier, TextDetector]
  19. async def get_files(self, augmentation_procedure: TextAugmentationProcedure) -> GetFilesTextType:
  20. for input_identifier, text_file in self.inputs.items():
  21. augmentation_input = await read_text_file(
  22. text_file.file_name, text_file.language
  23. )
  24. augmentation_output = await augmentation_procedure.augment(augmentation_input)
  25. detector_input = TextDetectorInput(
  26. augmentation_output.language, augmentation_output.text
  27. )
  28. yield (input_identifier, text_file, augmentation_output, self.run_detection(detector_input))
  29. async def run_detection(self, detector_input: TextDetectorInput) -> RunTextDetectionType:
  30. for detector_identifier, detector in self.detectors.items():
  31. async def cooldown_and_detect(detector: TextDetector, detector_input: TextDetectorInput):
  32. # Acquire lock
  33. with detector.lock
  34. # Cooldown
  35. cooleddown = await detector.cooldown()
  36. return detector.detect(detector_input)
  37. yield (detector_identifier, detector, cooldown_and_detect(detector, detector_input))
  38. async def check(self) -> CheckImageType:
  39. for augmentation_procedure_identifier, augmentation_procedure in self.augmentation_procedures.items():
  40. yield (augmentation_procedure_identifier, augmentation_procedure, self.get_files(augmentation_procedure))

字符串
基本上,我想在TextTestCase的示例上调用方法check(...)时得到子结果。有趣的部分是run_detection()。对于每个检测器,都应该通知调用者。之后,获取一个锁。然后调用detector.cooldown()并等待。如果等待,则应该通知调用者并调用detector.detect()。当结果可用时,则应该通知调用方并且应该释放锁。
目前,我用以下方式调用check()

  1. test_case = TextTestCase()
  2. async for (augmentation_procedure_identifier, augmentation_procedure, augmentation_results) in test_case.check():
  3. async for (file_identifier, image_file, augmentation_output_awaitable, detectors) in augmentation_results:
  4. results.append([image_file.file_name, str(procedure), "Augmenting...", ""])
  5. live.update(update_table_with_results(results))
  6. async for (detector_identifier, detector, cooldown_awaitable) in detectors:
  7. try:
  8. detection_awaitable = await cooldown_awaitable
  9. detection_result = await detection_awaitable
  10. # TODO: Do stuff here
  11. except:
  12. pass
  13. # Error occured
  14. # TODO: print error


因为cooldown_and_detect() * 返回 * 一个可等待的detector.detect(),上下文管理器显然会在可等待的detector.detect()返回时释放锁,也就是在detector.cooldown()被等待并且detector.detect()被触发之后。但是我想在detector.detect()被等待之后释放它,但是我仍然想把控制流传递给调用者。

rqqzpn5f

rqqzpn5f1#

从上面的代码中理解你想要的东西确实有点混乱,但是通过添加适当的回调,一切都应该是可能的。也就是说,如果你的任务可以在一个可以并行运行的普通非循环流中描述,你应该只使用await和上下文管理器,并且只使用一个await.TaskGroup,或者调用gather,其中包含您希望并行运行的所有“根级别”任务。
否则,由于您希望自定义管理中间资源,例如由lock保护的资源,因此可以显式调用acquire,并在相应步骤的done回调中释放锁,而不是将锁用作上下文管理器(with块),或者无条件地在finally块上释放它。
正如我之前所写的,你的例子看起来比它所能写的更做作,所以一个基于你的代码的很好的例子可以被写出来-请阅读关于发布minimal reproducible example before asking your next question -的文章。
也就是说,如果我能正确理解您的意思,那么沿着下面的代码为您的内部cooldown_and_detect协同例程所做的一些事情可能会产生您想要的效果。

  1. async def cooldown_and_detect(detector, detector_input: TextDetectorInput):
  2. # Acquire lock
  3. await detector.lock.acquire()
  4. try: # Cooldown
  5. cooleddown = await detector.cooldown()
  6. except Exception as error:
  7. # something wrong with cooldown - release lcok
  8. detector.lock.release()
  9. raise
  10. detector_task = asyncio.create_task(detector.detect(detector_input))
  11. detector_task.add_done_callback(lambda task: detector.lock.release())
  12. return detector_task

字符串

展开查看全部

相关问题