This document describes the current stable version of Celery (5.0). For development docs, go here.
celery.contrib.testing.worker
¶
API Reference¶
Embedded workers for integration tests.
-
class
celery.contrib.testing.worker.
TestWorkController
(*args: Any, **kwargs: Any)[source]¶ Worker that can synchronize on being fully started.
-
ensure_started
() → None[source]¶ Wait for worker to be fully up and running.
Warning
Worker must be started within a thread for this to work, or it will block forever.
-
on_consumer_ready
(consumer: celery.worker.consumer.Consumer) → None[source]¶ Callback called when the Consumer blueprint is fully started.
-
-
celery.contrib.testing.worker.
setup_app_for_worker
(app: Celery, loglevel: Union[str, int], logfile: str) → None[source]¶ Setup the app to be used for starting an embedded worker.
-
celery.contrib.testing.worker.
start_worker
(app: Celery, concurrency: int = 1, pool: str = 'solo', loglevel: Union[str, int] = 'error', logfile: str = None, perform_ping_check: bool = True, ping_task_timeout: float = 10.0, **kwargs: Any) → Iterable[source]¶ Start embedded worker.
- Yields
celery.app.worker.Worker – worker instance.