Skip to main content

Celery 缺少的测试库。

项目描述

Celery Python 库缺少的测试库。Graveolens 可以帮助的一些事情包括:

  • 为 Celery 任务调用提供结果,而无需联系您的经纪人。

  • 确保您确切知道调用了哪些任务。

  • 轻松断言任务调用的参数。

  • 在 Celery 中使用send_task时轻松处理结果。

芹菜的二项式名称是Apium Graveolens

返回结果

from my_app.celery import app
import graveolens

def test_my_task():
    with graveolens.activate() as celery:
        celery.add('my_app.task', {'done': True, 'status': 'OK'})

        result = app.send_task('my_app.task', 'test', id=3)

        # The result is an EagerResult from Celery.
        assert result.get() == {'done': True, 'status': 'OK'}

        # You can also check ALL the calls that Celery received.
        assert len(celery.calls) == 1
        assert celery.calls[0].name == 'http://twitter.com/api/1/foobar'
        assert celery.calls[0].args == ('test', )
        assert celery.calls[0].kwargs == {'id': 3}

断言 Celery 调用

默认情况下,如果添加了结果但未使用,则会 在上下文管理器退出时引发AssertionError ,例如:

import graveolens

def test_my_task():
    with graveolens.activate() as celery:
        celery.add('my_app.task')

    # Assertion will be raised here because 'my_app.task' is never called.

这可以使用assert_all_tasks_call标志来 配置activate()

此外,如果在没有设置结果的情况下调用 Celery 任务, 则会引发Graveolens.NotMockedTask

from my_app.celery import app
import graveolens

def test_my_task():
    with graveolens.activate() as celery:
        try:
            result = app.send_task('my_app.task', 'test', id=3)
        except graveolens.NotMockedTask:
            # Exception will be raised since my_app.task has no result.
            pass

项目详情


下载文件

下载适用于您平台的文件。如果您不确定要选择哪个,请了解有关安装包的更多信息。

内置分布

Graveolens-0.1.4-py2.py3-none-any.whl (4.4 kB 查看哈希

已上传 py2 py3