celery 和弦回调的redis超时

ryevplcw  于 2021-06-10  发布在  Redis
关注(0)|答案(0)|浏览(355)

我想我对celery 的和弦回调有意见。我在里面搜索了这么多,但找不到任何有用的信息。
让我展示一些代码,然后解释这个问题。
我有以下celery :

execution_chord = chord(
                [
                    tasks.run_web_scanners.si(information).set(queue='fast_queue'),
                    tasks.run_ip_scans.si(information).set(queue='slow_queue')
                ],
                body=tasks.on_demand_scan_finished.si().set(queue='fast_queue'),
                immutable = True
            )
        execution_chord.apply_async(queue='fast_queue', interval=100)

现在, run_web_scanners 以及 run_ip_scans 两者都有和弦,这和弦不是在异步模式下调用的。主要是因为一个线程产生更多线程的想法是一个灾难的配方。
以下是 run_web_scanners 例如。

@shared_task
def run_web_scanners(scan_information):
    # Deep copy just in case
    web_information = copy.deepcopy(scan_information)

    execution_chord =chord(
        [
            # Fast_scans

            header_scan_task.s(web_information).set(queue='fast_queue')
        ],
        body=web_security_scan_finished.si().set(queue='fast_queue'),
        immutable=True)()
    return

扫描运行得很好(这些扫描有一个指示成功的body函数),但大多数时候我在回调时都会遇到超时错误 on_demand_scan_finished 被称为。
你知道会发生什么吗? run_web_scanners 以及 run_ip_scans 与所呈现的和弦的“构建”方式相同,身体功能每次都很好。
这是我收到的错误信息

[2020-06-22 18:12:07,397: ERROR/ForkPoolWorker-3] Chord '79af732d-8590-4a1a-bc3b-c76ad8edb742' raised: timeout()
...
...
/lib/python3.7/site-packages/kombu/transport/redis.py", line 382, in get
    raise Empty()
_queue.Empty

注意:我搜索了redis错误,但是考虑到它发生的情况,我认为这不是问题所在(可能是其他原因)
编辑:
满栈错误

[2020-06-24 14:05:48,419: ERROR/ForkPoolWorker-3] Chord '5f9d48d7-6531-4da7-97f6-fc530a8d649f' raised: timeout()
Traceback (most recent call last):
  File "/home/handerllon/.local/share/virtualenvs/project-6NphltMm/lib/python3.7/site-packages/kombu/transport/virtual/base.py", line 963, in drain_events
    get(self._deliver, timeout=timeout)
  File "/home/handerllon/.local/share/virtualenvs/project-6NphltMm/lib/python3.7/site-packages/kombu/transport/redis.py", line 382, in get
    raise Empty()
_queue.Empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/handerllon/.local/share/virtualenvs/project-6NphltMm/lib/python3.7/site-packages/celery/app/builtins.py", line 83, in unlock_chord
    propagate=True,
  File "/home/handerllon/.local/share/virtualenvs/project-6NphltMm/lib/python3.7/site-packages/celery/result.py", line 822, in join_native
    on_message, on_interval):
  File "/home/handerllon/.local/share/virtualenvs/project-6NphltMm/lib/python3.7/site-packages/celery/backends/base.py", line 644, in iter_native
    on_message=on_message, on_interval=on_interval,
  File "/home/handerllon/.local/share/virtualenvs/project-6NphltMm/lib/python3.7/site-packages/celery/backends/amqp.py", line 280, in get_many
    wait(timeout=timeout)
  File "/home/handerllon/.local/share/virtualenvs/project-6NphltMm/lib/python3.7/site-packages/kombu/connection.py", line 323, in drain_events
    return self.transport.drain_events(self.connection,**kwargs)
  File "/home/handerllon/.local/share/virtualenvs/project-6NphltMm/lib/python3.7/site-packages/kombu/transport/virtual/base.py", line 966, in drain_events
    raise socket.timeout()
socket.timeout

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题