pytest-kafka-broker documentation
This is a pytest plugin to run a temporary, local, single-broker Kafka cluster.
You must have Java installed. Other than that, there are no dependencies. The plugin downloads and caches Kafka, then starts and stops it automatically whenever a test uses the fixture.
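Since Java is the one external requirement, it can help to check for it before the test run starts. The following is a minimal sketch, not part of the plugin: `find_java` is a hypothetical helper, and the lookup order (JAVA_HOME first, then PATH) is an assumption based on how Kafka's shell launch scripts pick a JVM, not on anything this plugin documents.

```python
import os
import shutil
from typing import Optional


def find_java() -> Optional[str]:
    """Return the path to a Java launcher, or None if none is found.

    Hypothetical helper, not part of pytest-kafka-broker.
    Assumption: JAVA_HOME takes precedence over PATH.
    """
    java_home = os.environ.get("JAVA_HOME")
    if java_home:
        # Look only inside $JAVA_HOME/bin for the launcher.
        candidate = shutil.which("java", path=os.path.join(java_home, "bin"))
        if candidate:
            return candidate
    # Fall back to whatever "java" resolves to on PATH.
    return shutil.which("java")
```

A test module could then use `pytest.mark.skipif(find_java() is None, reason="Java not found")` to skip broker tests on machines without Java instead of failing them.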
Example
import pytest

TOPIC = "topic"
PAYLOAD = b"hello world"
GROUP_ID = "group_id"


def test_sync(kafka_broker):
    """Demonstrate using the kafka_broker fixture in an ordinary test."""
    with kafka_broker.producer() as producer:
        producer.produce(TOPIC, PAYLOAD)
    with kafka_broker.consumer(
        {"group.id": GROUP_ID, "auto.offset.reset": "earliest"}
    ) as consumer:
        consumer.subscribe([TOPIC])
        (message,) = consumer.consume()
    assert message.value() == PAYLOAD
    with kafka_broker.admin() as admin:
        assert TOPIC in admin.list_topics().topics


@pytest.mark.asyncio
async def test_async(kafka_broker):
    """Demonstrate using the kafka_broker fixture in an async test."""
    async with kafka_broker.aio_producer() as producer:
        await producer.produce(TOPIC, PAYLOAD)
    async with kafka_broker.aio_consumer(
        {"group.id": GROUP_ID, "auto.offset.reset": "earliest"}
    ) as consumer:
        await consumer.subscribe([TOPIC])
        (message,) = await consumer.consume()
    assert message.value() == PAYLOAD
    with kafka_broker.admin() as admin:
        assert TOPIC in admin.list_topics().topics
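Both tests above share the same TOPIC and GROUP_ID. If a broker is reused across tests (an assumption; the fixture's scope is not stated here), a consumer in one test could read messages produced by another. A minimal sketch of one way to avoid that, using hypothetical helpers `unique_name` and `consumer_config` that are not part of the plugin:

```python
import uuid


def unique_name(prefix: str) -> str:
    """Return a name that differs on every call.

    Hypothetical helper, not part of pytest-kafka-broker. The result uses
    only letters, digits, and "-", which are valid in Kafka topic names.
    """
    return f"{prefix}-{uuid.uuid4().hex}"


def consumer_config(group_prefix: str = "group") -> dict:
    """Build a consumer config like the one in the example, with a fresh group.id."""
    return {
        "group.id": unique_name(group_prefix),
        "auto.offset.reset": "earliest",
    }
```

Inside a test, `topic = unique_name("topic")` would replace the module-level TOPIC, and `consumer_config()` would replace the literal dictionary passed to `kafka_broker.consumer`.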
Reference
pytest_kafka_broker Package
Classes
Information and convenience methods for a temporary Kafka cluster.
Known Issues on Windows
This plugin includes workarounds for a few Windows quirks that would otherwise make it difficult to run a Kafka broker:

- Long values of CLASSPATH run up against command-line length limits. As a workaround, the plugin temporarily maps the KAFKA_HOME directory to a drive letter.
- The Windows version of the kafka-server-start script uses the wmic command to determine an appropriate value for the Java heap size, but this command was removed in recent versions of Windows. This is a known bug in Kafka. As a workaround, the plugin sets the environment variable KAFKA_HEAP_OPTS to an explicit value.
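The two workarounds can be illustrated with a short sketch. This is not the plugin's implementation: the helper names are hypothetical, the use of the Windows `subst` command for the drive mapping is an assumption, and the heap value `-Xmx1G -Xms1G` is only an illustrative choice.

```python
from typing import Dict, List, Mapping, Tuple


def subst_commands(drive_letter: str, kafka_home: str) -> Tuple[List[str], List[str]]:
    """Return (map, unmap) argument lists for the Windows `subst` command.

    Hypothetical helper. Assumption: the drive mapping is done with `subst`,
    so that CLASSPATH entries start with e.g. "K:\\" instead of a long path.
    """
    drive = f"{drive_letter.rstrip(':').upper()}:"
    return ["subst", drive, kafka_home], ["subst", drive, "/D"]


def env_with_heap_opts(
    base_env: Mapping[str, str], heap_opts: str = "-Xmx1G -Xms1G"
) -> Dict[str, str]:
    """Return a copy of base_env with KAFKA_HEAP_OPTS set.

    Hypothetical helper. An explicit value means the start script has no
    need to query wmic. A value the caller already set is kept, which is a
    design choice of this sketch rather than documented plugin behavior.
    """
    env = dict(base_env)
    env.setdefault("KAFKA_HEAP_OPTS", heap_opts)
    return env
```

On Windows, the argument lists could be passed to `subprocess.run` before and after the broker runs, and the returned environment passed as `env=` when launching the start script.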