RabbitMQ Celery First Steps - TimeoutError on result.get()

m1m5dgzv · published 2022-11-08 · in RabbitMQ
Follow (0) | Answers (4) | Views (218)

I am following the Celery First Steps tutorial here: http://celery.readthedocs.org/en/latest/getting-started/first-steps-with-celery.html#keeping-results
I followed the tutorial exactly as written, using RabbitMQ.
When I call result.get(timeout=1), it raises a TimeoutError, even though it is a simple add task, and I can see in another window that the worker is running and produces the correct result (8).

    (venv) C:\Volt\celerytest>ipython
    Python 2.7.6 (default, Nov 10 2013, 19:24:18) [MSC v.1500 32 bit (Intel)]
    Type "copyright", "credits" or "license" for more information.
    IPython 2.1.0 -- An enhanced Interactive Python.
    ?         -> Introduction and overview of IPython's features.
    %quickref -> Quick reference.
    help      -> Python's own help system.
    object?   -> Details about 'object', use 'object??' for extra details.

    In [1]: from tasks import add

    In [2]: a = add(1,3)

    In [3]: a
    Out[3]: 4

    In [4]: a = add.delay(1,3)

    In [5]: a.ready()
    Out[5]: False

    In [6]: a = add.delay(4,4)

    In [7]: a.get(timeout=0.5)
    ---------------------------------------------------------------------------
    TimeoutError                              Traceback (most recent call last)
    <ipython-input-7-2c407a92720e> in <module>()
    ----> 1 a.get(timeout=0.5)

    C:\Users\Som\Envs\venv\lib\site-packages\celery\result.pyc in get(self, timeout, propagate, interval, no_ack, follow_parents)
        167                     interval=interval,
        168                     on_interval=on_interval,
    --> 169                     no_ack=no_ack,
        170                 )
        171             finally:

    C:\Users\Som\Envs\venv\lib\site-packages\celery\backends\amqp.pyc in wait_for(self, task_id, timeout, cache, propagate, no_ack, on_interval, READY_STATES, PROPAGATE_STATES, **kwargs)
        155                                             on_interval=on_interval)
        156         except socket.timeout:
    --> 157             raise TimeoutError('The operation timed out.')
        158
        159         if meta['status'] in PROPAGATE_STATES and propagate:

    TimeoutError: The operation timed out.

    In [8]:

The tasks.py file:

    from celery import Celery

    app = Celery('tasks', backend='amqp', broker='amqp://')

    @app.task
    def add(x, y):
        return x + y

Worker log:

    [tasks]
      . tasks.add

    [2014-07-17 13:00:33,196: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
    [2014-07-17 13:00:33,211: INFO/MainProcess] mingle: searching for neighbors
    [2014-07-17 13:00:34,220: INFO/MainProcess] mingle: all alone
    [2014-07-17 13:00:34,240: WARNING/MainProcess] celery@SomsPC ready.
    [2014-07-17 13:00:34,242: INFO/MainProcess] Received task: tasks.add[85ff75d8-38b5-442a-a574-c8b976a33739]
    [2014-07-17 13:00:34,243: INFO/MainProcess] Task tasks.add[85ff75d8-38b5-442a-a574-c8b976a33739] succeeded in 0.000999927520752s: 4
    [2014-07-17 13:00:46,582: INFO/MainProcess] Received task: tasks.add[49de7c6b-9672-485d-926e-a4e564ccc89a]
    [2014-07-17 13:00:46,588: INFO/MainProcess] Task tasks.add[49de7c6b-9672-485d-926e-a4e564ccc89a] succeeded in 0.00600004196167s: 8

eni9jsuy #1

I ran into exactly the same problem after following "First Steps with Celery".
I think the cause is the tutorial's suggested backend='amqp'.
The setup that worked for me is the following:

    app = Celery('tasks', broker='amqp://guest@localhost//')
    app.conf.CELERY_RESULT_BACKEND = 'db+sqlite:///results.sqlite'

According to the documentation, when the AMQP result backend is used, each result can only be retrieved once (it is literally a single message in the queue).
I suspect that your worker process consumed it in order to print the result to the console:
Task tasks.add[49de7c6b-9672-485d-926e-a4e564ccc89a] succeeded in 0.00600004196167s: 8
so the same result could not be retrieved a second time.
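Putting the two configuration lines together, a complete tasks.py using a persistent SQLite result backend would look roughly like this (a sketch, assuming celery and sqlalchemy are installed; results.sqlite is created on first use):

```python
# tasks.py -- sketch using a persistent result backend instead of 'amqp'
from celery import Celery

app = Celery('tasks', broker='amqp://guest@localhost//')
# Store results in a local SQLite database so reading a result
# does not consume it the way an AMQP message is consumed
app.conf.CELERY_RESULT_BACKEND = 'db+sqlite:///results.sqlite'

@app.task
def add(x, y):
    return x + y
```

With this backend, get() reads a database row rather than a one-shot queue message, so a result can be fetched any number of times.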


n9vozmp4 #2

If you look at this thread, setting --pool=solo also appears to solve the problem.
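The pool is chosen on the worker command line; a typical invocation (a sketch, assuming the app is defined in tasks.py) would be:

```shell
# Run the worker with the single-threaded "solo" pool,
# avoiding the default prefork pool
celery -A tasks worker --loglevel=info --pool=solo
```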


tcbh2hod #3

Sometimes I also get a TimeoutError with Redis, so I implemented a helper function:

    import time

    import redis

    # Fail fast on a stuck Redis connection instead of blocking forever
    celery_app.conf.update(
        redis_socket_timeout=5,
        redis_socket_connect_timeout=5,
    )

    def run_task(task, *args, **kwargs):
        # Retry transient Redis timeouts until the overall deadline expires
        timeout = 2 * 60
        future = task.apply_async(args, kwargs)
        time_end = time.time() + timeout
        while True:
            try:
                return future.get(timeout=timeout)
            except redis.TimeoutError:
                if time.time() < time_end:
                    continue
                raise

6qfn3psc #4

I know my answer is late, but maybe it will help someone.
You just need to restart the already-running worker after configuring the result backend. You can find a note about this on the First Steps page, but only at the very end.
Make sure you don't have any old workers still running.
It's easy to accidentally start multiple workers, so before starting a new one, make sure the previous one has been shut down properly.
An old worker that is not configured with the expected result backend may still be running and hijacking the tasks.
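As a sketch (assuming the app module is named tasks), the standard inspection and control commands can confirm that no stale worker is still consuming tasks before you start a fresh one:

```shell
# See which workers are connected and what they are currently executing
celery -A tasks inspect active

# Gracefully stop every running worker
celery -A tasks control shutdown

# Start a fresh worker that picks up the new result-backend setting
celery -A tasks worker --loglevel=info
```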
