This document describes the current stable version of pytest_celery (1.0). For development docs, go here.

How to connect signal handlers

Release:

1.0

Date:

Apr 29, 2024

Signal handlers may be defined in the publisher or the consumer side or both. When done on the publisher side, they can be connected inside the scope of the test function using the standard Celery API. When done on the consumer side, they can be connected using injected signal handlers modules, which we’ll cover in this guide.

The plugin uses its Code Generation mechanism to inject signal handlers modules into the worker container. The available signal handlers can be configured differently for each test case using the Fixture availability feature of pytest.

This guide will teach you how to utilize this mechanism to connect signal handlers to your Celery workers in your test cases.

Note

If you already understand how the initialization pipeline works, you can skip to the Signal handlers modules injection section.

Worker Pipeline Breakdown

New in version 1.0.0.

Each worker component is built using a pipeline of fixtures that control each layer and is responsible for preparing the worker for the test case. Let’s see how our Built-in Celery Worker is built to understand each step in the pipeline.

Initialization Pipeline

The worker component is initialized using a container and a node that is responsible for integrating the component with the test case.

The component’s node creation fixture receives the worker container during its initialization.

@pytest.fixture
def celery_setup_worker(
    ...
    default_worker_container: CeleryWorkerContainer,
    default_worker_app: Celery,
    ...
) -> CeleryTestWorker:

Container

The default worker container receives its configuration from the default worker fixtures. Each fixture is responsible for a different layer of the initialization procedure.

default_worker_container = container(
    image="{celery_base_worker_image.id}",
    ports=fxtr("default_worker_ports"),
    environment=fxtr("default_worker_env"),
    network="{default_pytest_celery_network.name}",
    volumes={"{default_worker_volume.name}": DEFAULT_WORKER_VOLUME},
    wrapper_class=CeleryWorkerContainer,
    timeout=DEFAULT_WORKER_CONTAINER_TIMEOUT,
    command=fxtr("default_worker_command"),
)
Image

The image is built using the built-in Dockerfile and is provided to the container using the following fixture.

celery_base_worker_image = build(
    path=WORKER_DOCKERFILE_ROOTDIR,
    tag="pytest-celery/components/worker:default",
    buildargs={
        "CELERY_VERSION": fxtr("default_worker_celery_version"),
        "CELERY_LOG_LEVEL": fxtr("default_worker_celery_log_level"),
        "CELERY_WORKER_NAME": fxtr("default_worker_celery_worker_name"),
        "CELERY_WORKER_QUEUE": fxtr("default_worker_celery_worker_queue"),
    },
)
Environment

Environment variables are provided to the worker container during initialization using the Vendor Class.

The worker receives the broker and result backend configurations from the environment variables by default using the celery_worker_cluster_config fixture, which is initialized using celery_broker_cluster_config and celery_backend_cluster_config fixtures, to provide the worker with the broker and result backend configurations according to the configured broker and backend clusters.

Network

The worker will use the default network that will be created for each test case to allow the worker component to communicate with the other components.

The network isolation allows multiple setups to run in parallel without interfering with each other.

Volumes

The plugin provides a special volume that is designed to provide improved testing control over the worker component initialization and functionality.

To practically install the pytest-celery plugin inside the worker component, the worker container needs to be using the default volume.

default_worker_container = container(
    ...
    volumes={"{default_worker_volume.name}": DEFAULT_WORKER_VOLUME},
    ...
)

This will use the following binding to mount the plugin volume into the worker container.

WORKER_VOLUME = {
    "bind": "/app",
    "mode": "rw",
}

Note

The default volume may be replaced or reconfigured if needed, by providing your own volume configuration dict to the worker container.

More volumes can be added to the worker container to accommodate more complex testing scenarios, or to provide additional configuration options to the worker component. For example, the current project can be added as a mounted volume alongside the default volume to provide the worker with the project code and configuration.

volumes={
    "{default_worker_volume.name}": defaults.DEFAULT_WORKER_VOLUME,
    os.path.abspath(os.getcwd()): {
        "bind": "/target/path/in/worker",
        "mode": "rw",
    },
},

Tip

When Debugging with VSCode, the bind value is what should be set for the remoteRoot in the launch.json configuration.

Wrapper Class

The wrapper_class is responsible for providing the configuration class that will be used to initialize the worker container instance.

The wrapper_class must be a subclass of CeleryWorkerContainer.

See more: Fixture wrappers.

Timeout

The timeout defines the time pytest will wait for the worker container to be ready before raising a timeout exception.

By default, the timeout is set to accommodate parallel test runs and to provide a reasonable time for the worker to be ready in most cases. Feel free to experiment and adjust the timeout according to your needs, or use DEFAULT_WORKER_CONTAINER_TIMEOUT to apply the default timeout.

Command

The command field allows to override the worker container CMD instruction instead of the CMD defined in the Dockerfile using the default_worker_command fixture.

If the CMD instruction is provided in the Dockerfile, the command field can be omitted.

Sequence Diagram

The following diagram describes the worker component initialization pipeline described above.

sequenceDiagram autonumber participant WCI as Worker Component Initialization participant CF as Container Fixture participant DF as Dockerfile participant EV as Environment Variables participant Net as Network participant Vol as Volumes participant PV as Plugin Volume participant TO as Timeout participant Cmd as Command participant WC as Wrapper Class participant CWCC as CeleryWorkerContainer Class participant NF as Node Fixture WCI->>CF: Initiates CF->>DF: Builds Image From CF->>EV: Sets CF->>Net: Connects to CF->>Vol: Mounts Vol->>PV: Includes CF->>TO: Sets CF->>Cmd: Sets CF->>WC: Manages with WC->>CWCC: Inherits from CF->>WCI: Create Worker Container WCI->>NF: Integrates the container into its node NF->>WCI: Node Ready, worker initialization completed

Configuration Pipeline

The worker uses the default_worker_initial_content fixture to provide the worker with the initial content that will be used to configure the worker component’s container volume.

@pytest.fixture
def default_worker_initial_content(
    ...
    default_worker_app_module: ModuleType,
    default_worker_utils_module: ModuleType,
    default_worker_tasks: set,
    default_worker_signals: set,
    default_worker_app: Celery,
    ...
) -> dict:

It uses the default worker fixtures to allow configuring every part of the volume using the standard pytest fixtures mechanism by itself without hooking into the default_worker_initial_content fixture directly.

The volume initialization integrates into the initialization pipeline by injecting worker configurations and files into the worker container to control the Celery app instance and provide enhanced testing capabilities.

sequenceDiagram autonumber participant WCI as Worker Component<br>Initialization participant CF as Container Fixture participant V as Volumes participant DCI as Default Configuration Injection participant WN as Worker Node WCI->>CF: Initializes Container CF->>V: Prepares Volumes V->>DCI: Injects<br>default_worker_app_module,<br>default_worker_utils_module,<br>default_worker_tasks,<br>default_worker_signals,<br>default_worker_app DCI->>CF: Finishes Volume<br>Configuration CF->>WN: Finalizes Worker<br>Container Initialization WN->>WCI: Add container to node,<br>Worker Component Initialization Completed

Signal handlers modules injection

New in version 1.0.0.

To add your own signal handlers, use the default_worker_signals fixture.

@pytest.fixture
def default_worker_signals(default_worker_signals: set) -> set:
    from tests import signals

    default_worker_signals.add(signals)
    return default_worker_signals

For example, we can review the plugin’s tests to see how the signal handlers are connected.

signals.py

This module contain our signal handlers which we want to connect on the consumer side.

tests.smoke.signals

from celery.signals import worker_init
from celery.signals import worker_process_init
from celery.signals import worker_process_shutdown
from celery.signals import worker_ready
from celery.signals import worker_shutdown


@worker_init.connect
def worker_init_handler(sender, **kwargs):  # type: ignore
    print("worker_init_handler")


@worker_process_init.connect
def worker_process_init_handler(sender, **kwargs):  # type: ignore
    print("worker_process_init_handler")


@worker_process_shutdown.connect
def worker_process_shutdown_handler(sender, pid, exitcode, **kwargs):  # type: ignore
    print("worker_process_shutdown_handler")


@worker_ready.connect
def worker_ready_handler(sender, **kwargs):  # type: ignore
    print("worker_ready_handler")


@worker_shutdown.connect
def worker_shutdown_handler(sender, **kwargs):  # type: ignore
    print("worker_shutdown_handler")

test_signals.py

These tests demonstrate how to query the output of the signal handlers that were injected into the worker container alongside inline signal handlers connected on the publisher side.

tests.smoke.test_signals
    @pytest.mark.parametrize(
        "log, control",
        [
            ("worker_init_handler", None),
            ("worker_process_init_handler", None),
            ("worker_ready_handler", None),
            ("worker_process_shutdown_handler", "shutdown"),
            ("worker_shutdown_handler", "shutdown"),
        ],
    )
    def test_sanity(self, celery_setup: CeleryTestSetup, log: str, control: str):
        if control:
            celery_setup.app.control.broadcast(control)
        celery_setup.worker.assert_log_exists(log)

    def test_before_task_publish(self, celery_setup: CeleryTestSetup):
        @before_task_publish.connect
        def before_task_publish_handler(*args, **kwargs):
            nonlocal signal_was_called
            signal_was_called = True

        signal_was_called = False
        noop.s().apply_async(queue=celery_setup.worker.worker_queue)
        assert signal_was_called is True

    def test_after_task_publish(self, celery_setup: CeleryTestSetup):
        @after_task_publish.connect
        def after_task_publish_handler(*args, **kwargs):
            nonlocal signal_was_called
            signal_was_called = True

        signal_was_called = False
        noop.s().apply_async(queue=celery_setup.worker.worker_queue)
        assert signal_was_called is True