This document describes the current stable version of pytest_celery (1.0). For development docs, go here.

worker_pool

Release:

1.0

Date:

May 16, 2024

Description

This example project demonstrates how to run the Celery worker with a non-default worker pool. It shows two methods: building a custom worker image for the gevent pool, and reconfiguring the built-in worker for the solo pool.

The following guide will explain each method and how they are used.

Tip

Read the myworker example before continuing with this one.

Breakdown

File Structure

The following diagram lists the relevant files in the project.

worker_pool/
├── tests/
│   ├── __init__.py
│   ├── test_gevent_pool.py
│   └── test_solo_pool.py
├── Dockerfile
├── tasks.py
└── requirements.txt

Dockerfile

To use the gevent pool, we build our own image using a Dockerfile similar to the one in the myworker example. The purpose of this image is to ensure the gevent dependency is installed.

examples.worker_pool.Dockerfile
FROM python:3.11-bookworm

# Create a user to run the worker
RUN adduser --disabled-password --gecos "" test_user

# Install system dependencies.
# NOTE(review): libevent-dev is presumably here so gevent can be compiled if no
# prebuilt wheel matches the platform — confirm.
# The apt package lists are removed in the same layer so they do not bloat the image.
RUN apt-get update && apt-get install -y build-essential git libevent-dev && rm -rf /var/lib/apt/lists/*

# Set arguments.
# ARG values exist only at build time; copying them into ENV makes them
# available to the CMD below when the container runs.
ARG CELERY_LOG_LEVEL=INFO
ARG CELERY_WORKER_NAME=my_worker
ARG CELERY_WORKER_QUEUE=celery
ENV LOG_LEVEL=$CELERY_LOG_LEVEL
ENV WORKER_NAME=$CELERY_WORKER_NAME
ENV WORKER_QUEUE=$CELERY_WORKER_QUEUE

# NOTE(review): the purpose of port 5678 is not evident from this file — confirm.
EXPOSE 5678

# Install packages.
# This COPY runs before WORKDIR is set, so requirements.txt lands in the
# image's default working directory, not in /app.
COPY --chown=test_user:test_user requirements.txt .
RUN pip install --no-cache-dir --upgrade pip
RUN pip install --no-cache-dir -r ./requirements.txt

# The workdir must be /app
# (presumably matching the worker volume mount target used by the test
# containers — confirm against defaults.DEFAULT_WORKER_VOLUME).
WORKDIR /app

# Switch to the test_user
USER test_user

# Start the celery worker.
# Shell form is used so $LOG_LEVEL, $WORKER_NAME and $WORKER_QUEUE are expanded
# at runtime. A container started with an explicit command (as
# gevent_worker_container does) replaces this default.
CMD celery -A app worker --loglevel=$LOG_LEVEL -n $WORKER_NAME@%h -Q $WORKER_QUEUE
examples.worker_pool.requirements.txt
pytest>=7.4.4
pytest-xdist>=3.5.0
pytest-subtests>=0.11.0
pytest-rerunfailures>=14.0
celery[gevent]
pytest-celery[all]@git+https://github.com/celery/pytest-celery.git

tasks.py

Our tasks module uses the example task from the Celery gevent example.

examples.worker_pool.tasks.py
# Based on https://github.com/celery/celery/blob/main/examples/gevent/tasks.py

import requests
from celery import shared_task


@shared_task(ignore_result=True)
def urlopen(url, timeout=10.0):
    """Request ``url`` and report whether the request completed.

    Args:
        url: The URL passed to ``requests.get``.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout, ``requests`` can wait indefinitely on an unresponsive
            server and hang the task. A timeout raises
            ``requests.exceptions.Timeout``, a ``RequestException`` subclass,
            so it is reported like any other request failure.

    Returns:
        ``(url, 1)`` if ``requests.get`` returned, or ``(url, 0)`` if it raised
        a ``RequestException``. The HTTP status code is not checked. Because
        the task is declared with ``ignore_result=True``, this value is not
        stored in the result backend; the printed lines are the observable
        output.
    """
    print(f"Opening: {url}")
    try:
        requests.get(url, timeout=timeout)
    except requests.exceptions.RequestException as exc:
        print(f"Exception for {url}: {exc!r}")
        return url, 0
    print(f"Done with: {url}")
    return url, 1

test_gevent_pool.py

To add a gevent worker, we subclass CeleryWorkerContainer and override its command so the worker runs with the gevent pool.

examples.worker_pool.tests.test_gevent_pool.py
from __future__ import annotations

import pytest
import tasks
from celery import Celery
from celery.canvas import Signature
from celery.canvas import group
from celery.result import AsyncResult
from pytest_docker_tools import build
from pytest_docker_tools import container
from pytest_docker_tools import fxtr

from pytest_celery import RESULT_TIMEOUT
from pytest_celery import CeleryTestSetup
from pytest_celery import CeleryTestWorker
from pytest_celery import CeleryWorkerCluster
from pytest_celery import CeleryWorkerContainer
from pytest_celery import defaults
from pytest_celery import ping


class GeventWorkerContainer(CeleryWorkerContainer):
    """Worker container whose command runs the Celery worker with the gevent pool.

    Only ``command`` is overridden; everything else is inherited from
    ``CeleryWorkerContainer``.
    """

    @classmethod
    def command(cls, *args: str) -> list[str]:
        """Return the worker command line with the gevent pool enabled.

        ``-P gevent`` selects the gevent pool and ``-c 1000`` sets its
        concurrency. Any extra ``args`` are forwarded to the base command after
        these options rather than being silently discarded, so callers can
        still add or override worker options.
        """
        return super().command("-P", "gevent", "-c", "1000", *args)


# Image fixture: builds the example's Dockerfile, which installs the gevent
# extra through requirements.txt.
# NOTE(review): path="." is resolved relative to the directory pytest runs
# from — presumably the example root that contains the Dockerfile; confirm.
# buildargs presumably supplies the CELERY_* ARGs declared in the Dockerfile.
gevent_worker_image = build(
    path=".",
    dockerfile="Dockerfile",
    tag="pytest-celery/examples/worker_pool:gevent",
    buildargs=GeventWorkerContainer.buildargs(),
)


# Container fixture: runs the image built above.
# The "{...}" template strings and fxtr(...) values are resolved by
# pytest_docker_tools from other fixtures: the built image, the default worker
# ports and environment, and the shared network and volume.
# wrapper_class makes the fixture produce a GeventWorkerContainer. command
# replaces the Dockerfile's default CMD with the gevent-pool command line.
gevent_worker_container = container(
    image="{gevent_worker_image.id}",
    ports=fxtr("default_worker_ports"),
    environment=fxtr("default_worker_env"),
    network="{default_pytest_celery_network.name}",
    volumes={"{default_worker_volume.name}": defaults.DEFAULT_WORKER_VOLUME},
    wrapper_class=GeventWorkerContainer,
    timeout=defaults.DEFAULT_WORKER_CONTAINER_TIMEOUT,
    command=GeventWorkerContainer.command(),
)


@pytest.fixture
def gevent_worker(gevent_worker_container: GeventWorkerContainer, celery_setup_app: Celery) -> CeleryTestWorker:
    """Expose the gevent container as a CeleryTestWorker bound to the test app.

    The worker is yielded to the test, and ``teardown()`` is called on it once
    the fixture resumes after the test.
    """
    node = CeleryTestWorker(gevent_worker_container, app=celery_setup_app)
    yield node
    # Runs after the requesting test has finished with the worker.
    node.teardown()


@pytest.fixture
def celery_worker_cluster(gevent_worker: CeleryTestWorker) -> CeleryWorkerCluster:
    """Override the worker cluster so it consists solely of the gevent worker.

    The cluster is yielded to the test, and ``teardown()`` is called on it
    afterwards.
    """
    gevent_cluster = CeleryWorkerCluster(gevent_worker)
    yield gevent_cluster
    # Clean up the cluster once the test is done with it.
    gevent_cluster.teardown()


@pytest.fixture
def default_worker_tasks(default_worker_tasks: set) -> set:
    """Add this example's ``tasks`` module to the inherited worker task set.

    The incoming set is extended in place and the same object is returned.
    Presumably this makes ``tasks.urlopen`` available inside the worker
    container — confirm against pytest-celery's fixture docs.
    """
    default_worker_tasks.update({tasks})
    return default_worker_tasks


Then we can use it in our tests.

examples.worker_pool.tests.test_gevent_pool.py


class TestGeventPool:
    """Tests that run against the gevent-pool worker."""

    def test_celery_banner(self, gevent_worker: CeleryTestWorker):
        # The startup banner reports the pool and concurrency selected by
        # GeventWorkerContainer.command() ("-P gevent -c 1000").
        gevent_worker.assert_log_exists("concurrency: 1000 (gevent)")

    def test_ping(self, celery_setup: CeleryTestSetup):
        sig: Signature = ping.s()
        res: AsyncResult = sig.apply_async()
        assert res.get(timeout=RESULT_TIMEOUT) == "pong"

    def test_celery_gevent_example(self, celery_setup: CeleryTestSetup):
        """Based on https://github.com/celery/celery/tree/main/examples/gevent"""
        urls = [
            "https://github.com/celery",
            "https://github.com/celery/celery",
            "https://github.com/celery/pytest-celery",
        ]
        group(tasks.urlopen.s(url) for url in urls).apply_async()
        # urlopen is declared with ignore_result=True, so there is no result
        # to .get(). Wait for each task's completion line in the worker log
        # instead. Without this wait, the negative assertion below could run
        # before the requests finish and pass without checking anything.
        # NOTE(review): if assert_log_exists matches substrings, the first URL
        # is a prefix of the other two, so its wait may be satisfied by their
        # lines — confirm.
        for url in urls:
            celery_setup.worker.assert_log_exists(f"Done with: {url}")
        celery_setup.worker.assert_log_does_not_exist("Exception for")

test_solo_pool.py

The solo pool example, on the other hand, reconfigures the default built-in Celery worker, as it does not require any additional dependencies.

examples.worker_pool.tests.test_solo_pool.py
from __future__ import annotations

import pytest
from celery.canvas import Signature
from celery.result import AsyncResult

from pytest_celery import RESULT_TIMEOUT
from pytest_celery import CeleryTestSetup
from pytest_celery import CeleryTestWorker
from pytest_celery import CeleryWorkerContainer
from pytest_celery import ping


class SoloPoolWorker(CeleryWorkerContainer):
    """Built-in worker container reconfigured to run with the solo pool.

    Only ``command`` is overridden; everything else is inherited from
    ``CeleryWorkerContainer``.
    """

    @classmethod
    def command(cls, *args: str) -> list[str]:
        """Return the worker command line with ``-P solo`` added.

        Any extra ``args`` are forwarded to the base command after the pool
        option rather than being silently discarded.
        """
        return super().command("-P", "solo", *args)


@pytest.fixture
def default_worker_container_cls() -> type[CeleryWorkerContainer]:
    """Use SoloPoolWorker as the container class for the built-in worker."""
    container_cls: type[CeleryWorkerContainer] = SoloPoolWorker
    return container_cls


@pytest.fixture(scope="session")
def default_worker_container_session_cls() -> type[CeleryWorkerContainer]:
    """Session-scoped counterpart: also select SoloPoolWorker as the container class."""
    container_cls: type[CeleryWorkerContainer] = SoloPoolWorker
    return container_cls


class TestSoloPool:
    """Tests that run against the built-in worker reconfigured with the solo pool."""

    def test_celery_banner(self, celery_worker: CeleryTestWorker):
        # Like the gevent example's "concurrency: 1000 (gevent)" check, the
        # banner is expected to show the pool name in parentheses. Matching
        # "(solo)" instead of the bare word avoids passing on an unrelated log
        # line that merely contains "solo".
        celery_worker.assert_log_exists("(solo)")

    def test_ping(self, celery_setup: CeleryTestSetup):
        sig: Signature = ping.s()
        res: AsyncResult = sig.apply_async()
        assert res.get(timeout=RESULT_TIMEOUT) == "pong"