使Django测试用例数据库对Celery可见

iezvtpos  于 2023-10-21  发布在  Go
关注(0)|答案(4)|浏览(133)

当Django测试用例运行时,它会创建一个隔离的测试数据库,以便在每个测试完成时回滚数据库写入。我试图用Celery创建一个集成测试,但我不知道如何将Celery连接到这个短暂的测试数据库。在简单的设置中,保存在Django中的对象对Celery是不可见的,而保存在Celery中的对象无限期地存在。
下面是一个示例测试用例:

import json
from rest_framework.test import APITestCase
from myapp.models import MyModel
from myapp.util import get_result_from_response

class MyTestCase(APITestCase):
    @classmethod
    def setUpTestData(cls):
        # This object is not visible to Celery
        MyModel(id='test_object').save()

    def test_celery_integration(self):
        # This view spawns a Celery task
        # Task should see MyModel.objects.get(id='test_object'), but can't
        http_response = self.client.post('/', 'test_data', format='json')

        result = get_result_from_response(http_response)
        result.get()  # Wait for task to finish before ending test case
        # Objects saved by Celery task should be deleted, but persist

我有两个问题:
1.如何让Celery看到Django测试用例中的对象?
1.如何确保Celery保存的所有对象在测试完成后自动回滚?
如果自动清理对象是不可能的,我愿意手动清理对象,但是删除tearDown中的对象,即使是APISimpleTestCase中的对象,似乎也会回滚。

wlp8pajw

wlp8pajw1#

这可以通过在Django测试用例中启动Celery worker来实现。

背景

Django的内存数据库是sqlite3。正如the description page for Sqlite in-memory databases上所说,“所有共享内存数据库的数据库连接都需要在同一个进程中。”这意味着,只要Django使用内存测试数据库,并且Celery在单独的进程中启动,Celery和Django就不可能共享测试数据库。
但是,使用celery.contrib.testing.worker.start_worker,可以在同一进程内的单独线程中启动Celery worker。这个工作线程可以访问内存中的数据库。
这里假设Celery已经在the usual way中安装了Django项目。

解决方案

因为Django-Celery涉及到一些跨线程的通信,所以只有不在隔离事务中运行的测试用例才能工作。测试用例必须直接继承自SimpleTestCase或其Rest等效项APISimpleTestCase,并将databases设置为'__all__'或仅设置为测试交互的数据库。
关键是在TestCasesetUpClass方法中启动Celery worker,并在tearDownClass方法中关闭它。关键函数是celery.contrib.testing.worker.start_worker,它需要当前Celery应用程序的示例,可能是从mysite.celery.app获得的,并返回一个Python ContextManager,它有__enter____exit__方法,必须分别在setUpClasstearDownClass中调用。可能有一种方法可以避免使用装饰器或其他东西手动输入和存在ContextManager,但我无法找到它。下面是一个tests.py文件示例:

from celery.contrib.testing.worker import start_worker
from django.test import SimpleTestCase

from mysite.celery import app

class BatchSimulationTestCase(SimpleTestCase):
    databases = '__all__'

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

        # Start up celery worker
        cls.celery_worker = start_worker(app, perform_ping_check=False)
        cls.celery_worker.__enter__()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

        # Close worker
        cls.celery_worker.__exit__(None, None, None)

    def test_my_function(self):
        # my_task.delay() or something

不管出于什么原因,测试工作者尝试使用名为'celery.ping'的任务,可能是为了在工作者失败的情况下提供更好的错误消息。它正在寻找的任务是celery.contrib.testing.tasks.ping,在测试时不可用。将start_workerperform_ping_check参数设置为False将跳过对此的检查并避免相关错误。
现在,当测试运行时,不需要启动单独的Celery过程。Celery worker将在Django测试过程中作为一个单独的线程启动。这个worker可以查看任何内存中的数据库,包括默认的内存中测试数据库。start_worker中提供了一些选项来控制worker的数量,但默认值似乎是单个worker。

ymdaylpp

ymdaylpp2#

对于你的单元测试,我建议跳过celery依赖,下面的两个链接将为你提供启动单元测试的基本信息:

如果你真的想测试celery函数调用,包括一个队列,我可能会设置一个dockercompose与服务器,工人,队列组合,并从django-celery文档扩展自定义的dockerTestRunner。但我不会看到它的好处,因为测试系统是pbly到远离生产的代表性。

i86rm4rw

i86rm4rw3#

我发现了另一个解决方案的解决方案的基础上,@ dragonist的一个:
在调用start_worker(app)之前调用celery.contrib.testing.app.TestApp()

from celery.contrib.testing.worker import start_worker
from celery.contrib.testing.app import TestApp

from myapp.tasks import app, my_task

class TestTasks:
    def setup(self):
        TestApp()
        self.celery_worker = start_worker(app)
        self.celery_worker.__enter__()

    def teardown(self):
        self.celery_worker.__exit__(None, None, None)
ca1c2owp

ca1c2owp4#

我发现了一种有用的方法来测试celery任务,而不需要写入主数据库。
你可以使用unittest.mock.patch来替换celery函数。

import json
from rest_framework.test import APITestCase
from myapp.models import MyModel
from myapp.util import get_result_from_response
from unittest.mock import patch

class MyTestCase(APITestCase):
    @classmethod
    def setUpTestData(cls):
        # This object is not visible to Celery
        MyModel(id='test_object').save()

    @patch('appName.views.yourView.yourCeleryTask.delay', new=yourCeleryTask)
    def test_celery_integration(self):
        # This view spawns a Celery task
        # Task should see MyModel.objects.get(id='test_object'), but can't
        http_response = self.client.post('/', 'test_data', format='json')

        result = get_result_from_response(http_response)
        result.get()  # Wait for task to finish before ending test case
        # Objects saved by Celery task should be deleted, but persist

希望这能有所帮助,而不是调用celery worker来执行任务,视图本身将在测试时执行任务

相关问题