This document describes the current stable version of pytest_celery (1.0). For development docs, go here.

Vendors

Release: 1.0

Date: Apr 29, 2024

Built-in Brokers and Backends

The plugin comes with support for several brokers and backends out of the box. This page lists the supported vendors and their status.

Brokers

Name        Status    Enabled
RabbitMQ    Stable    Yes
Redis       Stable    Yes

Backends

Name        Status          Enabled
Redis       Stable          Yes
Memcache    Experimental    No

Experimental vendors may be functional but are not confirmed to be production ready.

Enabled means that the vendor is automatically added to the Test Setup Matrix when running the test suite, provided the vendor dependencies are installed.

Warning

Enabling a new vendor will automatically add it globally to every test suite that relies on the default vendors detection. Be careful when enabling new vendors and make sure they are stable and production ready.
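To illustrate what adding a vendor to the matrix means, the following minimal sketch models the matrix as the Cartesian product of the enabled brokers and backends. The `Vendor` dataclass and the `setup_matrix` function are hypothetical names used for illustration only and are not part of the plugin's API; the statuses mirror the tables above.

```python
from dataclasses import dataclass
from itertools import product


@dataclass(frozen=True)
class Vendor:
    """Hypothetical record mirroring one row of the vendor tables."""

    name: str
    status: str
    enabled: bool


BROKERS = [Vendor("RabbitMQ", "Stable", True), Vendor("Redis", "Stable", True)]
BACKENDS = [Vendor("Redis", "Stable", True), Vendor("Memcache", "Experimental", False)]


def setup_matrix(brokers, backends):
    """Return every (broker, backend) name pair built from enabled vendors only."""
    enabled_brokers = [b.name for b in brokers if b.enabled]
    enabled_backends = [b.name for b in backends if b.enabled]
    return list(product(enabled_brokers, enabled_backends))


matrix = setup_matrix(BROKERS, BACKENDS)
```

In this model the defaults give two combinations per test. Enabling Memcache would raise that to four for every suite that relies on default detection, which is the cost the warning describes.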

Built-in Celery Worker

The plugin provides a built-in Celery worker to run tests against. It uses the latest stable version of Celery and can be used as is, replaced, or extended by the user.

The Dockerfile is published with the source code and can be found using WORKER_DOCKERFILE_ROOTDIR.

pytest_celery.vendors.worker.Dockerfile
FROM python:3.10-slim-buster

# Create a user to run the worker
RUN adduser --disabled-password --gecos "" test_user

# Install system dependencies
RUN apt-get update && apt-get install -y build-essential \
    git \
    wget \
    make \
    curl \
    apt-utils \
    debconf \
    lsb-release \
    libmemcached-dev \
    libffi-dev \
    ca-certificates \
    pypy3 \
    pypy3-lib \
    sudo

# Set arguments
ARG CELERY_VERSION=""
ARG CELERY_LOG_LEVEL=INFO
ARG CELERY_WORKER_NAME=celery_test_worker
ARG CELERY_WORKER_QUEUE=celery
ENV WORKER_VERSION=$CELERY_VERSION
ENV LOG_LEVEL=$CELERY_LOG_LEVEL
ENV WORKER_NAME=$CELERY_WORKER_NAME
ENV WORKER_QUEUE=$CELERY_WORKER_QUEUE

ENV PYTHONUNBUFFERED=1
ENV PYTHONDONTWRITEBYTECODE=1

EXPOSE 5678

# Install Python dependencies
RUN pip install --no-cache-dir --upgrade \
    pip \
    celery[redis,pymemcache,gevent]${WORKER_VERSION:+==$WORKER_VERSION} \
    pytest-celery@git+https://github.com/celery/pytest-celery.git

# The workdir must be /app
WORKDIR /app

COPY content/ .

# Switch to the test_user
USER test_user

# Start the celery worker
CMD celery -A app worker --loglevel=$LOG_LEVEL -n $WORKER_NAME@%h -Q $WORKER_QUEUE
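The `${WORKER_VERSION:+==$WORKER_VERSION}` expansion in the `pip install` step appends a version pin only when `CELERY_VERSION` is set and non-empty; otherwise the requirement stays unpinned. A minimal Python sketch of that shell rule follows. The helper name `celery_requirement` is hypothetical and the version number is an arbitrary example.

```python
def celery_requirement(worker_version: str = "") -> str:
    """Mimic the shell expansion ${WORKER_VERSION:+==$WORKER_VERSION}.

    The ':+' form yields the alternate text only when the variable is set
    and non-empty, so an empty version leaves the requirement unpinned.
    """
    base = "celery[redis,pymemcache,gevent]"
    pin = f"=={worker_version}" if worker_version else ""
    return base + pin


unpinned = celery_requirement()        # default build argument (empty string)
pinned = celery_requirement("5.3.6")   # arbitrary example version
```

Building the image with `--build-arg CELERY_VERSION=5.3.6` would therefore pin the worker to that release, while omitting the argument leaves the choice to pip.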

Custom Vendors

Injected brokers, backends and workers can extend the built-in ones or provide completely new ones. The plugin provides a set of base classes that can be used to implement custom vendors.

Custom Broker

class pytest_celery.api.broker.CeleryTestBroker(container: CeleryTestContainer, app: Celery = None)

Bases: CeleryTestNode

This is a specialized node type for handling Celery broker nodes. It is used to encapsulate a broker instance.

Responsibility Scope:

Handling broker specific requirements and configuration.

property app: Celery

Celery app for the node if available.

assert_log_does_not_exist(log: str, message: str = '', timeout: int = 1) -> None

Assert that a log does not exist in the container.

Parameters:
  • log (str) – Log to assert.

  • message (str, optional) – Message to display while waiting. Defaults to “”.

  • timeout (int, optional) – Timeout in seconds. Defaults to 1.

assert_log_exists(log: str, message: str = '', timeout: int = 60) -> None

Assert that a log exists in the container.

Parameters:
  • log (str) – Log to assert.

  • message (str, optional) – Message to display while waiting. Defaults to “”.

  • timeout (int, optional) – Timeout in seconds. Defaults to RESULT_TIMEOUT.

config(*args: tuple, **kwargs: dict) -> dict

Compile the configurations required for Celery from this node.

property container: CeleryTestContainer

Underlying container for the node.

classmethod default_config() -> dict

Default node configurations if not overridden by the user.

hostname() -> str

Get the hostname of this node.

kill(signal: str | int = 'SIGKILL', reload_container: bool = True) -> None

Kill the underlying container.

Parameters:
  • signal (str | int, optional) – Signal to send to the container. Defaults to “SIGKILL”.

  • reload_container (bool, optional) – Reload the container object after killing it. Defaults to True.

logs() -> str

Get the logs of the underlying container.

name() -> str

Get the name of this node.

ready() -> bool

Wait until the node is ready, or raise an exception if it fails to boot up.

restart(reload_container: bool = True, force: bool = False) -> None

Override the restart method to update the app's broker URL with the new container values.

teardown() -> None

Tear down the node.

wait_for_log(log: str, message: str = '', timeout: int = 60) -> None

Wait for a log to appear in the container.

Parameters:
  • log (str) – Log to wait for.

  • message (str, optional) – Message to display while waiting. Defaults to “”.

  • timeout (int, optional) – Timeout in seconds. Defaults to RESULT_TIMEOUT.
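The log helpers above share a poll-until-timeout pattern. The sketch below models that behaviour in plain Python. It is not the plugin's implementation: `FakeNode`, its in-memory logs, and the `poll_interval` parameter are hypothetical stand-ins for a real node backed by a container.

```python
import time


class FakeNode:
    """Hypothetical stand-in for a node; keeps logs in memory, not in a container."""

    def __init__(self, logs: str = "") -> None:
        self._logs = logs

    def logs(self) -> str:
        return self._logs

    def wait_for_log(self, log: str, message: str = "", timeout: int = 60,
                     poll_interval: float = 0.05) -> None:
        """Poll logs() until `log` appears, or raise TimeoutError."""
        deadline = time.monotonic() + timeout
        while log not in self.logs():
            if time.monotonic() >= deadline:
                raise TimeoutError(message or f"'{log}' not found within {timeout}s")
            time.sleep(poll_interval)

    def assert_log_exists(self, log: str, message: str = "", timeout: int = 60) -> None:
        """Fail if `log` never shows up within `timeout` seconds."""
        try:
            self.wait_for_log(log, message, timeout)
        except TimeoutError as exc:
            raise AssertionError(str(exc)) from exc

    def assert_log_does_not_exist(self, log: str, message: str = "", timeout: int = 1) -> None:
        """Fail if `log` shows up within `timeout` seconds."""
        try:
            self.wait_for_log(log, message, timeout)
        except TimeoutError:
            return  # the log never appeared, which is the expected outcome
        raise AssertionError(message or f"'{log}' unexpectedly found")


node = FakeNode("worker ready\ntask succeeded")
node.assert_log_exists("ready", timeout=1)
node.assert_log_does_not_exist("Traceback", timeout=0)
```

In this model a negative assertion can only pass after waiting out its whole window, which may be why its default timeout is much shorter than the positive one.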

Custom Backend

class pytest_celery.api.backend.CeleryTestBackend(container: CeleryTestContainer, app: Celery = None)

Bases: CeleryTestNode

This is a specialized node type for handling Celery backend nodes. It is used to encapsulate a backend instance.

Responsibility Scope:

Handling backend specific requirements and configuration.

property app: Celery

Celery app for the node if available.

assert_log_does_not_exist(log: str, message: str = '', timeout: int = 1) -> None

Assert that a log does not exist in the container.

Parameters:
  • log (str) – Log to assert.

  • message (str, optional) – Message to display while waiting. Defaults to “”.

  • timeout (int, optional) – Timeout in seconds. Defaults to 1.

assert_log_exists(log: str, message: str = '', timeout: int = 60) -> None

Assert that a log exists in the container.

Parameters:
  • log (str) – Log to assert.

  • message (str, optional) – Message to display while waiting. Defaults to “”.

  • timeout (int, optional) – Timeout in seconds. Defaults to RESULT_TIMEOUT.

config(*args: tuple, **kwargs: dict) -> dict

Compile the configurations required for Celery from this node.

property container: CeleryTestContainer

Underlying container for the node.

classmethod default_config() -> dict

Default node configurations if not overridden by the user.

hostname() -> str

Get the hostname of this node.

kill(signal: str | int = 'SIGKILL', reload_container: bool = True) -> None

Kill the underlying container.

Parameters:
  • signal (str | int, optional) – Signal to send to the container. Defaults to “SIGKILL”.

  • reload_container (bool, optional) – Reload the container object after killing it. Defaults to True.

logs() -> str

Get the logs of the underlying container.

name() -> str

Get the name of this node.

ready() -> bool

Wait until the node is ready, or raise an exception if it fails to boot up.

restart(reload_container: bool = True, force: bool = False) -> None

Override the restart method to update the app's result backend with the new container values.

teardown() -> None

Tear down the node.

wait_for_log(log: str, message: str = '', timeout: int = 60) -> None

Wait for a log to appear in the container.

Parameters:
  • log (str) – Log to wait for.

  • message (str, optional) – Message to display while waiting. Defaults to “”.

  • timeout (int, optional) – Timeout in seconds. Defaults to RESULT_TIMEOUT.

Custom Worker

class pytest_celery.api.worker.CeleryTestWorker(container: CeleryTestContainer, app: Celery)

Bases: CeleryTestNode

This is a specialized node type for handling Celery worker nodes. It is used to encapsulate a worker instance.

Responsibility Scope:

Managing a celery worker.

property app: Celery

Celery app for the node if available.

assert_log_does_not_exist(log: str, message: str = '', timeout: int = 1) -> None

Assert that a log does not exist in the container.

Parameters:
  • log (str) – Log to assert.

  • message (str, optional) – Message to display while waiting. Defaults to “”.

  • timeout (int, optional) – Timeout in seconds. Defaults to 1.

assert_log_exists(log: str, message: str = '', timeout: int = 60) -> None

Assert that a log exists in the container.

Parameters:
  • log (str) – Log to assert.

  • message (str, optional) – Message to display while waiting. Defaults to “”.

  • timeout (int, optional) – Timeout in seconds. Defaults to RESULT_TIMEOUT.

config(*args: tuple, **kwargs: dict) -> dict

Compile the configurations required for Celery from this node.

property container: CeleryTestContainer

Underlying container for the node.

classmethod default_config() -> dict

Default node configurations if not overridden by the user.

get_running_processes_info(columns: list[str] | None = None, filters: dict[str, str] | None = None) -> list[dict]

Get running processes info on the container of this node.

Possible columns:
  • pid

  • name

  • username

  • cmdline

  • cpu_percent

  • memory_percent

  • create_time

Parameters:
  • columns (list[str] | None, optional) – Columns to query. Defaults to None (all).

  • filters (dict[str, str] | None, optional) – Filters to apply. Defaults to None.

Raises:

RuntimeError – If the command fails.

Returns:

List of processes info per requested columns.

Return type:

list[dict]
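The `columns` and `filters` arguments read as a projection and a selection over per-process records. The sketch below shows those semantics on in-memory data. `select_processes` and the sample records are hypothetical, and the exact-match filter rule is an assumption rather than documented behaviour.

```python
from __future__ import annotations

# Hypothetical sample records using a subset of the possible columns.
PROCESSES = [
    {"pid": 1, "name": "celery", "username": "test_user", "cmdline": "celery -A app worker"},
    {"pid": 7, "name": "sh", "username": "root", "cmdline": "sh -c ps"},
]


def select_processes(
    processes: list[dict],
    columns: list[str] | None = None,
    filters: dict[str, str] | None = None,
) -> list[dict]:
    """Keep rows matching every filter, then keep only the requested columns."""
    rows = [
        p for p in processes
        if all(str(p.get(key)) == value for key, value in (filters or {}).items())
    ]
    if columns is None:  # None means "all columns"
        return [dict(p) for p in rows]
    return [{c: p[c] for c in columns if c in p} for p in rows]


workers = select_processes(PROCESSES, columns=["pid", "name"], filters={"name": "celery"})
```

A test could use a query of this shape to check, for example, that exactly one worker process is running under the expected user.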

hostname() -> str

Hostname of the worker node.

kill(signal: str | int = 'SIGKILL', reload_container: bool = True) -> None

Kill the underlying container.

Parameters:
  • signal (str | int, optional) – Signal to send to the container. Defaults to “SIGKILL”.

  • reload_container (bool, optional) – Reload the container object after killing it. Defaults to True.

property log_level: str

Celery log level of this worker node.

logs() -> str

Get the logs of the underlying container.

name() -> str

Get the name of this node.

ready() -> bool

Wait until the node is ready, or raise an exception if it fails to boot up.

restart(reload_container: bool = True, force: bool = False) -> None

Restart the underlying container.

Parameters:
  • reload_container (bool, optional) – Reload the container object after restarting it. Defaults to True.

  • force (bool, optional) – Kill the container before restarting it. Defaults to False.

teardown() -> None

Tear down the node.

property version: str

Celery version of this worker node.

wait_for_log(log: str, message: str = '', timeout: int = 60) -> None

Wait for a log to appear in the container.

Parameters:
  • log (str) – Log to wait for.

  • message (str, optional) – Message to display while waiting. Defaults to “”.

  • timeout (int, optional) – Timeout in seconds. Defaults to RESULT_TIMEOUT.

property worker_name: str

Celery test worker node name.

property worker_queue: str

Celery queue for this worker node.

Vendor Class

The Vendor Class is an optional mechanism for OOP-style configuration of the plugin’s vendors. It allows registering a class that defines how the vendor behaves and how it is configured.

The vendor class represents the vendor’s container class that is used automatically by the plugin.

The following diagram shows the relationship between the vendor class and the vendor’s infrastructure.

graph TD;
    Vendor[Vendor] --> BrokerComponent[Broker Component]
    Vendor --> BackendComponent[Backend Component]
    Vendor --> WorkerComponent[Worker Component]
    BrokerComponent --> Comp
    BackendComponent --> Comp
    WorkerComponent --> Comp
    subgraph Comp["Component"]
        Node[Node] --> Container[Container]
    end
    Comp --> DefaultFixtures[Default Fixtures]
    Comp --> VendorClass[Vendor Class]
    VendorClass -. "You are here" .-> VendorClass
    DefaultFixtures <.-> VendorClass
    style Vendor fill:#f9f,stroke:#333,stroke-width:4px
    style Comp fill:#ddf,stroke:#333,stroke-width:2px
    style Node fill:#eeffdd,stroke:#333
    style Container fill:#ffffee,stroke:#333
    style VendorClass fill:#ffeedd,stroke:#333

Use Cases

Warning

The vendor class is used only to override the containers of the built-in vendors.

Registering a Vendor Class

The plugin uses the vendor class to implement the default fixtures of the vendor. To override it, create your own vendor class and subclass the matching built-in vendor class to include the built-in fixtures implementation.

Worker Example
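# Imports shared by the snippets in this section
from typing import Any, Type

import celery
import pytest
from pytest_celery import CeleryTestSetup, CeleryWorkerContainer


class MyWorkerContainer(CeleryWorkerContainer):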
    @property
    def client(self) -> Any:
        return self

    @classmethod
    def version(cls) -> str:
        return celery.__version__

    @classmethod
    def log_level(cls) -> str:
        return "INFO"

    @classmethod
    def worker_name(cls) -> str:
        return "my_tests_worker"

    @classmethod
    def worker_queue(cls) -> str:
        return "my_tests_queue"

    def post_initialization_logic(self) -> None:
        pass

Then register it using the matching default fixture.

@pytest.fixture
def default_worker_container_cls() -> Type[CeleryWorkerContainer]:
    return MyWorkerContainer

Warning

The worker vendor requires another fixture to be registered to allow configuring the worker before it gets built.

@pytest.fixture(scope="session")
def default_worker_container_session_cls() -> Type[CeleryWorkerContainer]:
    return MyWorkerContainer

There’s no session vendor class for other vendors.
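One plausible reason the worker needs a session-scoped class is that its class-level values must be known before the image is built, for example to fill the `ARG` values in the worker Dockerfile. The sketch below models that idea in plain Python. `WorkerDefaults`, `MyWorkerDefaults` and `build_args` are hypothetical names for illustration, not the plugin's API.

```python
class WorkerDefaults:
    """Hypothetical base class exposing worker settings as classmethods."""

    @classmethod
    def version(cls) -> str:
        return ""  # empty string: leave Celery unpinned

    @classmethod
    def log_level(cls) -> str:
        return "INFO"

    @classmethod
    def worker_name(cls) -> str:
        return "celery_test_worker"

    @classmethod
    def worker_queue(cls) -> str:
        return "celery"

    @classmethod
    def build_args(cls) -> dict:
        """Map the class-level settings onto the Dockerfile's ARG names."""
        return {
            "CELERY_VERSION": cls.version(),
            "CELERY_LOG_LEVEL": cls.log_level(),
            "CELERY_WORKER_NAME": cls.worker_name(),
            "CELERY_WORKER_QUEUE": cls.worker_queue(),
        }


class MyWorkerDefaults(WorkerDefaults):
    """Override only what differs, as in the worker example above."""

    @classmethod
    def worker_name(cls) -> str:
        return "my_tests_worker"

    @classmethod
    def worker_queue(cls) -> str:
        return "my_tests_queue"


args = MyWorkerDefaults.build_args()
```

Because classmethods need no instance, these values can be read before any container exists, and overriding a single classmethod in a subclass changes the corresponding build argument without touching the rest.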

Accessing the Vendor Class

Once a vendor class has been registered, it can be accessed using the Test Setup. Any additional API added to the class can be accessed as well.

For example,

def test_accessing_post_initialization_logic(celery_setup: CeleryTestSetup):
    worker: MyWorkerContainer = celery_setup.worker
    worker.post_initialization_logic()