Celery 缺少的测试库。
项目描述
Celery Python 库缺少的测试库。Graveolens 可以帮助的一些事情包括:
为 Celery 任务调用提供结果,而无需联系您的经纪人。
确保您确切知道调用了哪些任务。
轻松断言任务调用的参数。
在 Celery 中使用send_task时轻松处理结果。
芹菜的二项式名称是Apium Graveolens。
返回结果
from my_app.celery import app
import graveolens
def test_my_task():
with graveolens.activate() as celery:
celery.add('my_app.task', {'done': True, 'status': 'OK'})
result = app.send_task('my_app.task', 'test', id=3)
# The result is an EagerResult from Celery.
assert result.get() == {'done': True, 'status': 'OK'}
# You can also check ALL the calls that Celery received.
assert len(celery.calls) == 1
assert celery.calls[0].name == 'http://twitter.com/api/1/foobar'
assert celery.calls[0].args == ('test', )
assert celery.calls[0].kwargs == {'id': 3}
断言 Celery 调用
默认情况下,如果添加了结果但未使用,则会 在上下文管理器退出时引发AssertionError ,例如:
import graveolens
def test_my_task():
with graveolens.activate() as celery:
celery.add('my_app.task')
# Assertion will be raised here because 'my_app.task' is never called.
这可以使用assert_all_tasks_call标志来 配置activate()。
此外,如果在没有设置结果的情况下调用 Celery 任务, 则会引发Graveolens.NotMockedTask。
from my_app.celery import app
import graveolens
def test_my_task():
with graveolens.activate() as celery:
try:
result = app.send_task('my_app.task', 'test', id=3)
except graveolens.NotMockedTask:
# Exception will be raised since my_app.task has no result.
pass
项目详情
关
Graveolens -0.1.4-py2.py3-none-any.whl 的哈希值
| 算法 | 哈希摘要 | |
|---|---|---|
| SHA256 | d4311cfb664157bbe50e3074ce3ad2421ce69e0d57d815c9af11994254b45547 |
|
| MD5 | b75a8edc8d39b788dbd23ee68b0cc0a4 |
|
| 布莱克2-256 | 7960f7767ce467f24d6787bfc44cc3b6cd199bac016c2fedd0a63dfcafa25200 |