This document describes the current stable version of pytest_celery (1.0). For development docs, go here.

Source code for pytest_celery.api.worker

"""Worker components represents Celery's worker instances.

This module provides the base API for creating new worker components by
defining the base classes for worker nodes and clusters.
"""

from __future__ import annotations

import json

from celery import Celery

from pytest_celery.api.base import CeleryTestCluster
from pytest_celery.api.base import CeleryTestNode
from pytest_celery.api.container import CeleryTestContainer
from pytest_celery.vendors.worker.container import CeleryWorkerContainer


class CeleryTestWorker(CeleryTestNode):
    """Specialized node type that wraps a single celery worker instance.

    Responsibility Scope:
        Managing a celery worker.
    """

    def __init__(self, container: CeleryTestContainer, app: Celery):
        """Create a worker node bound to a container and a celery app.

        Args:
            container (CeleryTestContainer): Container to use for the node.
            app (Celery): Celery app to be accessed from the tests.
        """
        super().__init__(container, app)

        # Annotation only (no assignment): narrows the attribute type so
        # IDEs can autocomplete the worker-specific container API.
        self.container: CeleryWorkerContainer

    @property
    def version(self) -> str:
        """Celery version reported by this node's container."""
        container = self.container
        return container.version()

    @property
    def log_level(self) -> str:
        """Celery log level reported by this node's container."""
        container = self.container
        return container.log_level()

    @property
    def worker_name(self) -> str:
        """Worker name reported by this node's container."""
        container = self.container
        return container.worker_name()

    @property
    def worker_queue(self) -> str:
        """Queue name reported by this node's container."""
        container = self.container
        return container.worker_queue()

    def hostname(self) -> str:
        """Return the worker hostname as ``<worker_name>@<base hostname>``."""
        # Resolve the worker name before the base hostname, in that order.
        name = self.worker_name
        base_hostname = super().hostname()
        return f"{name}@{base_hostname}"

    def get_running_processes_info(
        self,
        columns: list[str] | None = None,
        filters: dict[str, str] | None = None,
    ) -> list[dict]:
        """Query the processes running inside this node's container.

        Possible columns:
            - pid
            - name
            - username
            - cmdline
            - cpu_percent
            - memory_percent
            - create_time

        Args:
            columns (list[str] | None, optional): Columns to query.
                Defaults to None (all).
            filters (dict[str, str] | None, optional): Column/value pairs a
                process must match exactly to be kept. Defaults to None.

        Raises:
            RuntimeError: If the command exits with a non-zero code.

        Returns:
            list[dict]: List of processes info per requested columns.
        """
        # The in-container ``utils`` module is the helper shipped as
        # vendors/worker/content/utils.py; its printed output is parsed as JSON.
        command = (
            f'python -c "from utils import get_running_processes_info; print(get_running_processes_info({columns!r}))"'
        )
        exit_code, output = self.container.exec_run(command)

        if exit_code != 0:
            raise RuntimeError(f"Failed to get processes info: {output}")

        processes = json.loads(output.decode("utf-8"))

        # No (or empty) filters: hand back everything that was parsed.
        if not filters:
            return processes

        # Keep only the entries whose values match every requested filter.
        selected = []
        for process in processes:
            if all(process.get(column) == expected for column, expected in filters.items()):
                selected.append(process)
        return selected
class CeleryWorkerCluster(CeleryTestCluster):
    """Specialized cluster type for handling celery workers.

    It is used to define which worker instances are available for the test.

    Responsibility Scope:
        Provide useful methods for managing a cluster of celery workers.
    """

    @property
    def versions(self) -> set[str]:
        """Distinct celery versions across all workers in this cluster."""
        unique_versions: set[str] = set()
        for worker in self:  # type: ignore
            unique_versions.add(worker.version)
        return unique_versions