Source code for pytest_kafka_broker

from dataclasses import dataclass
from importlib.metadata import version

from confluent_kafka import Consumer, Producer
from confluent_kafka.admin import AdminClient
from confluent_kafka.aio import AIOConsumer, AIOProducer

__version__ = version(__package__)
__all__ = ("KafkaBrokerContext",)

_doc = """{}

Parameters
----------
config
    Extra Kafka client configuration properties. See list in the
    `librdkafka documentation <https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md>`_.
"""


[docs] @dataclass class KafkaBrokerContext: """Information and convenience methods for a temporary Kafka cluster. This object is returned by :func:`kafka_broker`. """ bootstrap_server: str """Kafka bootstrap server in the form :samp:`{host}:{port}`."""
[docs] def config(self, config: dict | None = None) -> dict: return { **(config or {}), "bootstrap.servers": self.bootstrap_server, "security.protocol": "PLAINTEXT", }
[docs] def admin(self, config: dict | None = None) -> AdminClient: return AdminClient(self.config(config))
[docs] def producer(self, config: dict | None = None) -> Producer: return Producer(self.config(config))
[docs] def consumer(self, config: dict | None = None) -> Consumer: return Consumer(self.config(config))
[docs] def aio_producer(self, config: dict | None = None) -> AIOProducer: return AIOProducer(self.config(config))
[docs] def aio_consumer(self, config: dict | None = None) -> AIOConsumer: return AIOConsumer(self.config(config))
config.__doc__ = _doc.format("Get the configuration for a Kafka client.") admin.__doc__ = _doc.format("Create a Kafka admin client connected to the cluster.") producer.__doc__ = _doc.format("Create a Kafka producer connected to the cluster.") consumer.__doc__ = _doc.format("Create a Kafka consumer connected to the cluster.") aio_producer.__doc__ = _doc.format( "Create an asynchronous Kafka producer connected to the cluster." ) aio_consumer.__doc__ = _doc.format( "Create an asynchronous Kafka consumer connected to the cluster." )
del _doc