python pytest中的Celery_worker夹具,连接已关闭错误

brc7rcf0  于 2023-02-21  发布在  Python
关注(0)|答案(1)|浏览(128)

我有如此完美的结构

import pytest

@pytest.mark.django_db
class TestClass:
    def test_celery_mht_notification_create(self, celery_worker, user):
        # some test logic

当我使用celery_worker fixture时,我得到这样的错误psycopg2.InterfaceError: connection already closed
怎么解决呢?

ctzwtxfj

ctzwtxfj1#

根据他们在pytest-django上的issue created中提供的解决方案:
另一方面,我的一个变通方法是使用@pytest.mark.django_db(transaction=True)作为事务测试运行
在第一个链接的评论线程中再挖掘一点,IMHO是一个更干净的解决方案,沿着解释了为什么会发生这种情况:
例如,如果您正在使用Py.test,并且希望使用进程内工作线程,则可以执行以下操作

def pytest_configure():
    from celery.fixups.django import DjangoWorkerFixup
    DjangoWorkerFixup.install = lambda x: None

禁用工作进程链接地址信息,在此上下文中不需要这些链接地址信息。

相关问题