完成任务后插入celery taskresult

hgc7kmma  于 2021-06-10  发布在  Redis
关注(0)|答案(0)|浏览(295)

我对python中django框架中的celery 任务队列模块有一些奇怪的行为。
当所有变量都正确设置时,我的代码就会工作。但是实现一些独立于celery 的验证代码似乎会干扰celery 。我先解释一下我在编什么程序。我的完整代码在这篇文章的底部。
Jmeter 板用户可以执行通过redis发送给celery 工人的任务。这很管用。接下来,我还成功地实现了celery 进度后端,有了它的progressrecorder,我现在可以设置进度了。一些jquery、ajax、html和css在 Jmeter 板中启用了一个奇特的进度条。
我现在将这个progressrecorder与类中的一些自定义代码组合在一起。这同样有效。通过自定义代码,可以指定哪些python函数定义应该记录进度。我的代码是模块化的,所以函数定义可以在很多级别上重用,并且不是每个函数都应该登录到每个任务中。因此,开发人员应该指定哪些函数应该记录进度。在初始化类时,用户使用“display \u progress \u of \u defs”键执行此操作:

  1. progress_recorder = ProgressRecorder(self)
  2. progress_logging = LogTaskProgressV3(task_id=task_id, app_id=app_id,
  3. progress_recorder=progress_recorder, total_steps=total_steps,
  4. display_progress_of_defs=["function_def_a", "function_def_c"])

在类中,我添加了验证来确认是否设置了“display\u progress\u of \u defs”键。为了测试这一点,我没有设置这个键就运行了代码。任务已执行,但taskresult记录仅在任务完成后插入。从此以后,由于还没有任务的记录,日志记录进度是不可能的。
在完成代码之前,我的问题是:为什么会发生,我如何解决它?
celery 任务:

  1. @shared_task(bind=True)
  2. def go_to_sleep_test(self, duration):
  3. """ Testing out Celery and the custom progress observer-class. """
  4. app_id = 'fictional-app'
  5. total_steps = 3
  6. # Retrieve task id
  7. task_id = current_task.request.id
  8. print(f"Starting 'go_to_sleep_test' with")
  9. print(f" app_id = {app_id}")
  10. print(f" task_id = {task_id}")
  11. print(f" total_steps = {total_steps}")
  12. # Start logging progress
  13. progress_recorder = ProgressRecorder(self)
  14. progress_logging = LogTaskProgressV3(task_id=task_id, app_id=app_id,
  15. progress_recorder=progress_recorder, total_steps=total_steps)
  16. progress_logging.init_function_def(function_def_name="go_to_sleep_test", total_steps=total_steps)
  17. # Initial logging
  18. progress_logging.log_progress(current_step=0,
  19. description="Task started")
  20. ################
  21. # PERFORM TASK #
  22. ################
  23. sleep(2)
  24. progress_logging.log_progress(current_step=1,
  25. description="Step 1")
  26. sleep(2)
  27. progress_logging.log_progress(current_step=2,
  28. description="Step 2")
  29. sleep(2)
  30. progress_logging.log_progress(current_step=3,
  31. description="Step 3")
  32. sleep(2)
  33. # Finish task
  34. return 'Done'

自定义进度观察者类。这与上述任务在同一个文件中。

  1. class LogTaskProgressV3:
  2. """ Log task progress """
  3. def __init__(self, task_id="not_given", app_id="not_given", progress_recorder=None, total_steps=100,
  4. display_progress_of_defs=[]):
  5. if display_progress_of_defs is None:
  6. display_progress_of_defs = []
  7. self.task_id = task_id
  8. self.app_id = app_id
  9. self.function_def_name = "Run init_function_def first."
  10. self.progress_recorder = progress_recorder
  11. self.total_steps = total_steps
  12. self.display_progress_of_defs = display_progress_of_defs
  13. print(f"self.task_id = {self.task_id}")
  14. print(f"self.app_id = {self.app_id}")
  15. print(f"self.function_def_name = {self.function_def_name}")
  16. print(f"self.progress_recorder = {self.progress_recorder}")
  17. print(f"self.total_steps = {self.total_steps}")
  18. print(f"self.display_progress_of_defs = {self.display_progress_of_defs}")
  19. def init_function_def(self, function_def_name="Function definition name not given.", total_steps=100):
  20. """ Each function definition has it's total amount of steps. Therefore set at start of function definition. """
  21. self.function_def_name = function_def_name
  22. self.total_steps = total_steps
  23. print(f"self.function_def_name = {self.function_def_name}")
  24. print(f"self.total_steps = {self.total_steps}")
  25. def log_progress(self, current_step, description):
  26. print("Now logging progress.")
  27. print(f"Confirming progress logging for '{self.function_def_name}' is requested:")
  28. print(f"self.display_progress_of_defs = {self.display_progress_of_defs}")
  29. # Without the below validation, the code works.
  30. if len(self.display_progress_of_defs) == 0:
  31. print("Progress logging not requested.")
  32. return
  33. elif self.function_def_name not in self.display_progress_of_defs:
  34. print("Progress logging not requested.")
  35. return
  36. print("Progress logging is requested.")
  37. # End of validation.
  38. # Log progress in celery model
  39. self.progress_recorder.set_progress(current=current_step,
  40. total=self.total_steps,
  41. description=description)

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题