pytest-kafka-broker documentation

This is a pytest plugin to run a temporary, local, single-broker Kafka cluster.

You must have Java installed; other than that, there are no dependencies. The plugin downloads and caches Kafka, then starts and stops it automatically when a test uses the kafka_broker fixture.
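Since Java is the one prerequisite the plugin cannot provide itself, it can be useful to check for it up front. The following is a minimal sketch, not part of the plugin: `java_available` is a hypothetical helper, and it assumes that a Java launcher is looked up under `JAVA_HOME` first and on `PATH` otherwise.

```python
import os
import shutil


def java_available(environ=None, which=shutil.which):
    """Return True if a Java launcher can be located.

    Hypothetical helper, not part of pytest-kafka-broker. Assumes that
    JAVA_HOME, when set, takes precedence over PATH.
    """
    environ = os.environ if environ is None else environ
    java_home = environ.get("JAVA_HOME")
    if java_home:
        # Look for the launcher inside JAVA_HOME/bin only.
        return which("java", path=os.path.join(java_home, "bin")) is not None
    # Otherwise fall back to searching PATH.
    return which("java") is not None
```

In a conftest.py, the result could feed `pytest.mark.skipif(not java_available(), reason="Java is required")` so that broker tests are skipped rather than failing on machines without Java.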

Example

import pytest

TOPIC = "topic"
PAYLOAD = b"hello world"
GROUP_ID = "group_id"


def test_sync(kafka_broker):
    """Demonstrate using the kafka_broker fixture in an ordinary test."""
    with kafka_broker.producer() as producer:
        producer.produce(TOPIC, PAYLOAD)

    with kafka_broker.consumer(
        {"group.id": GROUP_ID, "auto.offset.reset": "earliest"}
    ) as consumer:
        consumer.subscribe([TOPIC])
        (message,) = consumer.consume()
        assert message.value() == PAYLOAD

    with kafka_broker.admin() as admin:
        assert TOPIC in admin.list_topics().topics


@pytest.mark.asyncio
async def test_async(kafka_broker):
    """Demonstrate using the kafka_broker fixture in an async test."""
    async with kafka_broker.aio_producer() as producer:
        await producer.produce(TOPIC, PAYLOAD)

    async with kafka_broker.aio_consumer(
        {"group.id": GROUP_ID, "auto.offset.reset": "earliest"}
    ) as consumer:
        await consumer.subscribe([TOPIC])
        (message,) = await consumer.consume()
        assert message.value() == PAYLOAD

    with kafka_broker.admin() as admin:
        assert TOPIC in admin.list_topics().topics

Reference

pytest_kafka_broker Package

Classes

KafkaBrokerContext(bootstrap_server)

Information and convenience methods for a temporary Kafka cluster.
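The constructor signature suggests that the context carries the broker's address. As a hedged sketch, assuming the fixture exposes that address as a `bootstrap_server` attribute in `host:port` form, and that clients take librdkafka-style settings (as the `group.id` and `auto.offset.reset` keys in the example imply), a test could build its own client configuration as follows. `client_config` is a hypothetical helper, not part of the plugin.

```python
def client_config(bootstrap_server, overrides=None):
    """Build a librdkafka-style configuration dict for one broker.

    Hypothetical helper; `bootstrap_server` is assumed to be a
    "host:port" string such as the one held by KafkaBrokerContext.
    """
    config = {"bootstrap.servers": bootstrap_server}
    # Caller-supplied settings win over the default above.
    config.update(overrides or {})
    return config


# Made-up address for illustration; in a test it would come from the fixture.
config = client_config("127.0.0.1:9092", {"group.id": "group_id"})
print(config)
```

Inside a test, `client_config(kafka_broker.bootstrap_server, {...})` could then be passed to a client constructor, provided the attribute-name assumption holds.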

Known Issues on Windows

This plugin includes workarounds for a few Windows quirks that would otherwise make it difficult to run a Kafka broker:

  • Long values of CLASSPATH run up against command-line length limits. As a workaround, the plugin temporarily maps the KAFKA_HOME directory to a drive letter.

  • The Windows version of the kafka-server-start script uses the wmic command to determine an appropriate value for the Java heap size, but this command was removed in recent versions of Windows. This is a known bug in Kafka. As a workaround, the plugin sets the environment variable KAFKA_HEAP_OPTS to an explicit value.
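
The two workarounds can be sketched roughly as follows. This is an illustration under stated assumptions, not the plugin's actual code: the helper names, the `-Xmx1G -Xms1G` heap value, and the use of the Windows `subst` command for the drive mapping are all assumptions made for the example.

```python
import os


def broker_environment(base=None, heap_opts="-Xmx1G -Xms1G"):
    """Return a copy of the environment with KAFKA_HEAP_OPTS set explicitly.

    With an explicit value, the start script does not have to work out
    a heap size itself. The default used here is an assumption.
    """
    env = dict(os.environ if base is None else base)
    # Keep a value the user already chose; otherwise supply one.
    env.setdefault("KAFKA_HEAP_OPTS", heap_opts)
    return env


def subst_commands(drive, kafka_home):
    """Return (map, unmap) argument lists for the Windows `subst` command.

    Mapping KAFKA_HOME to a drive letter shortens every CLASSPATH entry.
    Whether the plugin uses `subst` is an assumption of this sketch.
    """
    drive = drive.rstrip(":\\") + ":"
    return ["subst", drive, kafka_home], ["subst", drive, "/D"]
```

The environment dict could be passed as `env=` to `subprocess.Popen` when launching the broker, and the two argument lists run before startup and after shutdown so that the drive mapping stays temporary.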