This document describes the current stable version of pytest_celery (1.0).

rabbitmq_management

Release: 1.0

Date: May 16, 2024

Description

This example project demonstrates how to reconfigure the default RabbitMQ broker to use the rabbitmq:management image, which ships with the management plugin enabled. This is useful for debugging the broker during development, but it is not recommended for production use unless you know what you’re doing.

Breakdown

File Structure

The following diagram lists the relevant files in the project.

rabbitmq_management/
├── tests/
│   ├── __init__.py
│   ├── conftest.py
│   └── test_management_broker.py
└── requirements.txt

conftest.py

To reconfigure the default rabbitmq broker container, we override the following fixtures with our desired values.

First, let’s pick the Docker image and tag.

@pytest.fixture
def default_rabbitmq_broker_image() -> str:
    return "rabbitmq:management"

Then, we need to expose the management UI port.

@pytest.fixture
def default_rabbitmq_broker_ports() -> dict:
    # Expose the management UI port
    ports = RABBITMQ_PORTS.copy()
    ports.update({"15672/tcp": None})
    return ports
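To make the port mapping concrete, here is a minimal sketch of what the override produces. The RABBITMQ_PORTS value below is a stand-in assumed for illustration (the real constant is imported from pytest_celery), and with_management_port is a hypothetical helper name. In Docker's Python SDK, mapping a container port to None asks Docker to choose a random free host port.

```python
# Stand-in for pytest_celery's RABBITMQ_PORTS (assumed value, for illustration only).
RABBITMQ_PORTS = {"5672/tcp": None}


def with_management_port(base_ports: dict) -> dict:
    """Return a copy of base_ports that also publishes the management UI port."""
    ports = base_ports.copy()  # copy so the shared default is not mutated
    ports["15672/tcp"] = None  # None lets Docker pick a random free host port
    return ports


ports = with_management_port(RABBITMQ_PORTS)
print(ports)
```

Copying before updating matters here: the defaults are shared across fixtures, so mutating them in place could leak the extra port into unrelated tests.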

The RabbitMQTestBroker class represents a logical broker in the testing environment, so we create our own subclass to represent the management broker. We add a method that returns the base URL of the management REST API, which our tests can then call directly.

class RabbitMQManagementTestBroker(RabbitMQTestBroker):
    def get_management_url(self) -> str:
        ip = self.container.attrs["NetworkSettings"]["Ports"]["15672/tcp"][0]["HostIp"]
        port = self.container.attrs["NetworkSettings"]["Ports"]["15672/tcp"][0]["HostPort"]
        return f"http://{ip}:{port}"
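The lookup above reads the published port bindings from the container's inspect data. The sketch below shows the same lookup against a hand-written sample dictionary, so it can be run without Docker. The helper name management_url_from_attrs and the sample values are hypothetical, and the substitution of localhost for a wildcard bind address is an optional variation that is not part of the example project.

```python
def management_url_from_attrs(attrs: dict, port_key: str = "15672/tcp") -> str:
    """Build the management base URL from Docker inspect-style attributes."""
    bindings = attrs["NetworkSettings"]["Ports"].get(port_key)
    if not bindings:
        # The port is either not exposed or not yet bound to a host port.
        raise RuntimeError(f"port {port_key} is not published")
    binding = bindings[0]  # use the first host binding, as in the class above
    host = binding["HostIp"]
    if host in ("", "0.0.0.0"):
        # A wildcard bind address is not always reachable as a URL host.
        host = "localhost"
    return f"http://{host}:{binding['HostPort']}"


# Made-up sample resembling the relevant part of the inspect output.
sample_attrs = {
    "NetworkSettings": {
        "Ports": {
            "5672/tcp": [{"HostIp": "0.0.0.0", "HostPort": "32770"}],
            "15672/tcp": [{"HostIp": "0.0.0.0", "HostPort": "32771"}],
        }
    }
}

print(management_url_from_attrs(sample_attrs))
```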

We need to override the rabbitmq broker fixture to apply our custom class. This allows us to extend the broker API with our own implementation.

@pytest.fixture
def celery_rabbitmq_broker(default_rabbitmq_broker: RabbitMQContainer) -> RabbitMQTestBroker:
    broker = RabbitMQManagementTestBroker(default_rabbitmq_broker)
    yield broker
    broker.teardown()

The celery_broker fixture is a special parametrized fixture that provides all of the available broker fixtures, and by default celery_broker_cluster uses it. For the sake of this example, we override celery_broker_cluster to use only one broker: our own custom RabbitMQ management broker.

@pytest.fixture
def celery_broker_cluster(celery_rabbitmq_broker: RabbitMQTestBroker) -> CeleryBrokerCluster:
    cluster = CeleryBrokerCluster(celery_rabbitmq_broker)
    yield cluster
    cluster.teardown()

test_management_broker.py

In the first test, we don’t load a full setup; we use only the broker, through the celery_rabbitmq_broker fixture. We test that we can access the management API using the default guest credentials.

def test_login_to_broker_alone(celery_rabbitmq_broker: RabbitMQManagementTestBroker):
    api = celery_rabbitmq_broker.get_management_url() + "/api/whoami"
    response = requests.get(api, auth=HTTPBasicAuth("guest", "guest"))
    assert response.status_code == 200
    assert response.json()["name"] == "guest"
    assert response.json()["tags"] == ["administrator"]

Note

While debugging, you can call celery_rabbitmq_broker.get_management_url() and open the returned link in your browser to see the RabbitMQ management UI for the broker under test.
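For readers who want to see what the authenticated call consists of, the following sketch builds the same kind of HTTP Basic Auth request using only the standard library instead of requests. The helper name build_management_request and the base URL are hypothetical; the request is constructed but not sent, so the snippet runs without a broker.

```python
import base64
import urllib.request


def build_management_request(
    base_url: str, path: str, username: str = "guest", password: str = "guest"
) -> urllib.request.Request:
    """Build a GET request for the management API with a Basic Auth header."""
    url = base_url.rstrip("/") + "/" + path.lstrip("/")
    # Basic Auth is the base64 encoding of "username:password".
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    request = urllib.request.Request(url)
    request.add_header("Authorization", f"Basic {token}")
    return request


# The base URL here is a placeholder; in a test it would come from get_management_url().
request = build_management_request("http://localhost:15672", "/api/whoami")
print(request.full_url)
```

With a running broker, passing the request to urllib.request.urlopen would perform the call.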

In the second test, we load a full setup, and test that the broker is indeed the one we configured.

def test_broker_in_setup(celery_setup: CeleryTestSetup):
    assert isinstance(celery_setup.broker, RabbitMQManagementTestBroker)
    api = celery_setup.broker.get_management_url() + "/api/queues"
    response = requests.get(api, auth=HTTPBasicAuth("guest", "guest"))
    assert response.status_code == 200
    res = response.json()
    assert isinstance(res, list)
    assert len(list(filter(lambda queue: celery_setup.worker.hostname() in queue["name"], res))) == 1
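The final assertion checks that exactly one queue name contains the worker's hostname. The sketch below isolates that filtering step on made-up data so it can be run without a broker. The helper name queues_for_worker, the hostname, and the queue names are all hypothetical, and real /api/queues entries carry many more fields than name.

```python
def queues_for_worker(queues: list[dict], hostname: str) -> list[dict]:
    """Return the queue entries whose name contains the worker hostname."""
    return [queue for queue in queues if hostname in queue["name"]]


# Made-up response body; only the "name" field is used by the filter.
sample_queues = [
    {"name": "celery"},
    {"name": "celery@example-worker.celery.pidbox"},
    {"name": "celeryev.0000"},
]

matches = queues_for_worker(sample_queues, "celery@example-worker")
print([queue["name"] for queue in matches])
```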