This document describes the current stable version of pytest_celery (1.0). For development docs, go here.

How to inject your own utility functions

Release: 1.0

Date: Apr 29, 2024

The plugin injects a special utils.py module into the worker component to provide enhanced testing capabilities for the Celery worker. The functions in this module are accessible through the CeleryTestWorker API.

This guide will teach you how to inject your own utility functions into the worker component using this mechanism.

Note

If you already understand how the initialization pipeline works, you can skip to the Custom Utility Functions section.

Worker Pipeline Breakdown

New in version 1.0.0.

Each worker component is built using a pipeline of fixtures. Each fixture controls one layer, and together they are responsible for preparing the worker for the test case. Let’s see how our Built-in Celery Worker is built to understand each step in the pipeline.

Initialization Pipeline

The worker component is initialized using a container and a node that is responsible for integrating the component with the test case.

The component’s node creation fixture receives the worker container during its initialization.

@pytest.fixture
def celery_setup_worker(
    ...
    default_worker_container: CeleryWorkerContainer,
    default_worker_app: Celery,
    ...
) -> CeleryTestWorker:

Container

The default worker container receives its configuration from the default worker fixtures. Each fixture is responsible for a different layer of the initialization procedure.

default_worker_container = container(
    image="{celery_base_worker_image.id}",
    ports=fxtr("default_worker_ports"),
    environment=fxtr("default_worker_env"),
    network="{default_pytest_celery_network.name}",
    volumes={"{default_worker_volume.name}": DEFAULT_WORKER_VOLUME},
    wrapper_class=CeleryWorkerContainer,
    timeout=DEFAULT_WORKER_CONTAINER_TIMEOUT,
    command=fxtr("default_worker_command"),
)

Image

The image is built using the built-in Dockerfile and is provided to the container using the following fixture.

celery_base_worker_image = build(
    path=WORKER_DOCKERFILE_ROOTDIR,
    tag="pytest-celery/components/worker:default",
    buildargs={
        "CELERY_VERSION": fxtr("default_worker_celery_version"),
        "CELERY_LOG_LEVEL": fxtr("default_worker_celery_log_level"),
        "CELERY_WORKER_NAME": fxtr("default_worker_celery_worker_name"),
        "CELERY_WORKER_QUEUE": fxtr("default_worker_celery_worker_queue"),
    },
)

Environment

Environment variables are provided to the worker container during initialization using the Vendor Class.

By default, the worker receives the broker and result backend configurations through environment variables. These are supplied by the celery_worker_cluster_config fixture, which is itself initialized from the celery_broker_cluster_config and celery_backend_cluster_config fixtures, so the worker's configuration matches the configured broker and backend clusters.
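As a rough illustration of the flow described above, the sketch below shows how a worker-side app module could read its broker and result backend from environment variables. The variable names CELERY_BROKER_URL and CELERY_RESULT_BACKEND, the in-memory fallbacks, the sample URL, and the helper name worker_config_from_env are assumptions made for illustration, not the plugin's confirmed internals.

```python
from __future__ import annotations

import os
from collections.abc import Mapping


def worker_config_from_env(env: Mapping[str, str] | None = None) -> dict[str, str]:
    """Build Celery-style config keys from environment variables.

    The variable names and the in-memory fallbacks are assumptions.
    """
    env = os.environ if env is None else env
    return {
        "broker_url": env.get("CELERY_BROKER_URL", "memory://"),
        "result_backend": env.get("CELERY_RESULT_BACKEND", "cache+memory://"),
    }


# Only the broker is configured here, so the backend falls back to its default.
config = worker_config_from_env(
    {"CELERY_BROKER_URL": "redis://redis-broker:6379/0"}  # hypothetical URL
)
```

Passing the mapping explicitly keeps the helper easy to test; calling it without arguments reads the real process environment instead.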

Network

The worker uses the default network, which is created for each test case, so that the worker component can communicate with the other components.

The network isolation allows multiple setups to run in parallel without interfering with each other.

Volumes

The plugin provides a special volume that is designed to provide improved testing control over the worker component initialization and functionality.

To install the pytest-celery plugin inside the worker component, the worker container must use the default volume.

default_worker_container = container(
    ...
    volumes={"{default_worker_volume.name}": DEFAULT_WORKER_VOLUME},
    ...
)

This will use the following binding to mount the plugin volume into the worker container.

WORKER_VOLUME = {
    "bind": "/app",
    "mode": "rw",
}

Note

If needed, the default volume may be replaced or reconfigured by providing your own volume configuration dict to the worker container.

More volumes can be added to the worker container to accommodate more complex testing scenarios, or to provide additional configuration options to the worker component. For example, the current project can be added as a mounted volume alongside the default volume to provide the worker with the project code and configuration.

volumes={
    "{default_worker_volume.name}": defaults.DEFAULT_WORKER_VOLUME,
    os.path.abspath(os.getcwd()): {
        "bind": "/target/path/in/worker",
        "mode": "rw",
    },
},
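The mapping above can also be assembled by a small helper, which makes the host path and the in-container target explicit. This is a minimal sketch: build_worker_volumes and DEFAULT_BINDING are hypothetical names, and DEFAULT_BINDING only mirrors the "/app" binding shown earlier rather than importing the plugin's constant.

```python
from __future__ import annotations

import os

# Stand-in for the plugin's default binding shown above (mounted at /app).
DEFAULT_BINDING = {"bind": "/app", "mode": "rw"}


def build_worker_volumes(
    default_volume_key: str,
    project_dir: str,
    target_path: str,
    mode: str = "rw",
) -> dict[str, dict[str, str]]:
    """Return a volumes mapping with the default volume plus a project mount."""
    if not target_path.startswith("/"):
        raise ValueError("target_path must be an absolute path inside the container")
    return {
        default_volume_key: dict(DEFAULT_BINDING),
        os.path.abspath(project_dir): {"bind": target_path, "mode": mode},
    }


volumes = build_worker_volumes(
    "{default_worker_volume.name}",  # template string, resolved at runtime
    os.getcwd(),
    "/target/path/in/worker",
)
```

The resulting dict has the same shape as the volumes argument shown above, so it could be passed to the container definition in its place.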

Tip

When debugging with VS Code, set remoteRoot in the launch.json configuration to the bind value.

Wrapper Class

The wrapper_class is responsible for providing the configuration class that will be used to initialize the worker container instance.

The wrapper_class must be a subclass of CeleryWorkerContainer.

See more: Fixture wrappers.

Timeout

The timeout defines the time pytest will wait for the worker container to be ready before raising a timeout exception.

By default, the timeout is set to accommodate parallel test runs and to provide a reasonable time for the worker to be ready in most cases. Feel free to experiment and adjust the timeout according to your needs, or use DEFAULT_WORKER_CONTAINER_TIMEOUT to apply the default timeout.

Command

The command field overrides the CMD instruction defined in the worker container's Dockerfile, using the value provided by the default_worker_command fixture.

If the CMD instruction is provided in the Dockerfile, the command field can be omitted.

Sequence Diagram

The following diagram describes the worker component initialization pipeline described above.

sequenceDiagram
    autonumber
    participant WCI as Worker Component Initialization
    participant CF as Container Fixture
    participant DF as Dockerfile
    participant EV as Environment Variables
    participant Net as Network
    participant Vol as Volumes
    participant PV as Plugin Volume
    participant TO as Timeout
    participant Cmd as Command
    participant WC as Wrapper Class
    participant CWCC as CeleryWorkerContainer Class
    participant NF as Node Fixture
    WCI->>CF: Initiates
    CF->>DF: Builds Image From
    CF->>EV: Sets
    CF->>Net: Connects to
    CF->>Vol: Mounts
    Vol->>PV: Includes
    CF->>TO: Sets
    CF->>Cmd: Sets
    CF->>WC: Manages with
    WC->>CWCC: Inherits from
    CF->>WCI: Create Worker Container
    WCI->>NF: Integrates the container into its node
    NF->>WCI: Node Ready, worker initialization completed

Configuration Pipeline

The default_worker_initial_content fixture provides the initial content that is used to configure the worker component’s container volume.

@pytest.fixture
def default_worker_initial_content(
    ...
    default_worker_app_module: ModuleType,
    default_worker_utils_module: ModuleType,
    default_worker_tasks: set,
    default_worker_signals: set,
    default_worker_app: Celery,
    ...
) -> dict:

It depends on the default worker fixtures, so every part of the volume can be configured by overriding those fixtures through the standard pytest mechanism, without hooking into the default_worker_initial_content fixture directly.

The volume initialization integrates into the initialization pipeline by injecting worker configurations and files into the worker container to control the Celery app instance and provide enhanced testing capabilities.
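One way to picture the injection step is that the source of a configured module is read and stored under a fixed filename in the volume content. The sketch below assumes this is done with inspect.getsource and that the utilities land in a file named utils.py. The helper names and the content layout are assumptions for illustration, not the plugin's confirmed implementation.

```python
from __future__ import annotations

import importlib.util
import inspect
import tempfile
from pathlib import Path
from types import ModuleType


def module_to_volume_content(module: ModuleType, filename: str = "utils.py") -> dict[str, bytes]:
    """Map a target filename to the encoded source code of a module."""
    return {filename: inspect.getsource(module).encode()}


def load_module(path: Path, name: str) -> ModuleType:
    """Import a module from a file path (only used to build the demo module)."""
    spec = importlib.util.spec_from_file_location(name, path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module


# Write a tiny stand-in module to disk, import it, and convert it to content.
with tempfile.TemporaryDirectory() as tmp:
    source_file = Path(tmp) / "myutils.py"
    source_file.write_text("def myfunc() -> bool:\n    return True\n")
    content = module_to_volume_content(load_module(source_file, "myutils"))
```

Note that the module keeps its own name on the host (myutils here), while the content is keyed by the filename it would take inside the worker.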

sequenceDiagram
    autonumber
    participant WCI as Worker Component<br>Initialization
    participant CF as Container Fixture
    participant V as Volumes
    participant DCI as Default Configuration Injection
    participant WN as Worker Node
    WCI->>CF: Initializes Container
    CF->>V: Prepares Volumes
    V->>DCI: Injects<br>default_worker_app_module,<br>default_worker_utils_module,<br>default_worker_tasks,<br>default_worker_signals,<br>default_worker_app
    DCI->>CF: Finishes Volume<br>Configuration
    CF->>WN: Finalizes Worker<br>Container Initialization
    WN->>WCI: Add container to node,<br>Worker Component Initialization Completed

Custom Utility Functions

New in version 1.0.0.

To configure your own module, use the default_worker_utils_module fixture.

@pytest.fixture
def default_worker_utils_module() -> ModuleType:
    from tests import myutils

    return myutils

This will inject the myutils module into the worker component, instead of the default module, allowing you to access your own utility functions.

Warning

The module must provide the entire API of the default utils.py module; otherwise, a worker component based on CeleryTestWorker will not function correctly.
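A quick way to guard against this is to compare the public functions of the two modules before running the suite. The following is a minimal sketch using only the standard library. The helpers public_functions and missing_api are hypothetical, and the two modules are in-memory stand-ins rather than the real default module and your replacement.

```python
from __future__ import annotations

import inspect
from types import ModuleType


def public_functions(module: ModuleType) -> set[str]:
    """Names of public functions found in the module namespace."""
    return {
        name
        for name, obj in vars(module).items()
        if inspect.isfunction(obj) and not name.startswith("_")
    }


def missing_api(reference: ModuleType, candidate: ModuleType) -> set[str]:
    """Public functions of `reference` that `candidate` does not provide."""
    return public_functions(reference) - public_functions(candidate)


# Stand-in modules built in memory; in a real test suite `reference` would be
# the default utils module and `candidate` your own module.
reference = ModuleType("utils")
reference.get_running_processes_info = lambda columns=None: "[]"

candidate = ModuleType("myutils")
candidate.myfunc = lambda: True

missing = missing_api(reference, candidate)
```

Here missing contains get_running_processes_info, which signals that the custom module would break the worker API. The check compares names only, so matching signatures still need to be verified by hand.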

For reference, the default utils.py module is defined as follows:

pytest_celery.vendors.worker.content.utils.py
"""The pytest-celery plugin provides a set of built-in components called
:ref:`vendors`.

This module is part of the :ref:`built-in-worker` vendor.
"""

from __future__ import annotations

import json

import psutil


def get_running_processes_info(columns: list[str] | None = None) -> str:
    """Get information about running processes using psutil."""
    if not columns:
        columns = [
            "pid",
            "name",
            "username",
            "cmdline",
            "cpu_percent",
            "memory_percent",
            "create_time",
        ]
    processes = [proc.info for proc in psutil.process_iter(columns)]
    return json.dumps(processes)
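
Because the function returns a JSON string rather than Python objects, the caller has to decode it before use. Below is a minimal sketch of that decoding step, using a hand-written sample payload. The process entries and the helper name find_processes are hypothetical and do not represent real worker output.

```python
from __future__ import annotations

import json


def find_processes(raw: str, name: str) -> list[dict]:
    """Decode the JSON payload and keep processes whose name matches."""
    return [proc for proc in json.loads(raw) if proc.get("name") == name]


# Hand-written sample shaped like a result requested with ["pid", "name"].
sample = json.dumps(
    [
        {"pid": 1, "name": "celery"},
        {"pid": 7, "name": "python"},
    ]
)

celery_processes = find_processes(sample, "celery")
```

A custom utility function that follows the same convention of returning a JSON string can be decoded on the test side in the same way.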

Tip

Check out the myutils example for a demonstration of how to use this feature.