Source code for pyatlan.model.assets.kafka_topic

# SPDX-License-Identifier: Apache-2.0
# Copyright 2025 Atlan Pte. Ltd.


from __future__ import annotations

from typing import ClassVar, List, Optional

from pydantic.v1 import Field, validator

from pyatlan.model.enums import (
    AtlanConnectorType,
    KafkaTopicCleanupPolicy,
    KafkaTopicCompressionType,
)
from pyatlan.model.fields.atlan_fields import (
    BooleanField,
    KeywordField,
    NumericField,
    RelationField,
)
from pyatlan.utils import init_guid, validate_required_fields

from .kafka import Kafka


[docs] class KafkaTopic(Kafka): """Description""" @classmethod @init_guid def creator(cls, *, name: str, connection_qualified_name: str) -> KafkaTopic: validate_required_fields( ["name", "connection_qualified_name"], [name, connection_qualified_name] ) attributes = KafkaTopic.Attributes.creator( name=name, connection_qualified_name=connection_qualified_name ) return cls(attributes=attributes) type_name: str = Field(default="KafkaTopic", allow_mutation=False) @validator("type_name") def validate_type_name(cls, v): if v != "KafkaTopic": raise ValueError("must be KafkaTopic") return v def __setattr__(self, name, value): if name in KafkaTopic._convenience_properties: return object.__setattr__(self, name, value) super().__setattr__(name, value) KAFKA_TOPIC_IS_INTERNAL: ClassVar[BooleanField] = BooleanField( "kafkaTopicIsInternal", "kafkaTopicIsInternal" ) """ Whether this topic is an internal topic (true) or not (false). """ KAFKA_TOPIC_COMPRESSION_TYPE: ClassVar[KeywordField] = KeywordField( "kafkaTopicCompressionType", "kafkaTopicCompressionType" ) """ Type of compression used for this topic. """ KAFKA_TOPIC_REPLICATION_FACTOR: ClassVar[NumericField] = NumericField( "kafkaTopicReplicationFactor", "kafkaTopicReplicationFactor" ) """ Replication factor for this topic. """ KAFKA_TOPIC_SEGMENT_BYTES: ClassVar[NumericField] = NumericField( "kafkaTopicSegmentBytes", "kafkaTopicSegmentBytes" ) """ Segment size for this topic. """ KAFKA_TOPIC_RETENTION_TIME_IN_MS: ClassVar[NumericField] = NumericField( "kafkaTopicRetentionTimeInMs", "kafkaTopicRetentionTimeInMs" ) """ Amount of time messages will be retained in this topic, in milliseconds. """ KAFKA_TOPIC_PARTITIONS_COUNT: ClassVar[NumericField] = NumericField( "kafkaTopicPartitionsCount", "kafkaTopicPartitionsCount" ) """ Number of partitions for this topic. """ KAFKA_TOPIC_SIZE_IN_BYTES: ClassVar[NumericField] = NumericField( "kafkaTopicSizeInBytes", "kafkaTopicSizeInBytes" ) """ Size of this topic, in bytes. """ KAFKA_TOPIC_RECORD_COUNT: ClassVar[NumericField] = NumericField( "kafkaTopicRecordCount", "kafkaTopicRecordCount" ) """ Number of (unexpired) messages in this topic. """ KAFKA_TOPIC_CLEANUP_POLICY: ClassVar[KeywordField] = KeywordField( "kafkaTopicCleanupPolicy", "kafkaTopicCleanupPolicy" ) """ Cleanup policy for this topic. """ KAFKA_TOPIC_LOG_CLEANUP_POLICY: ClassVar[KeywordField] = KeywordField( "kafkaTopicLogCleanupPolicy", "kafkaTopicLogCleanupPolicy" ) """ Comma seperated Cleanup policy for this topic. """ KAFKA_CONSUMER_GROUPS: ClassVar[RelationField] = RelationField( "kafkaConsumerGroups" ) """ TBC """ _convenience_properties: ClassVar[List[str]] = [ "kafka_topic_is_internal", "kafka_topic_compression_type", "kafka_topic_replication_factor", "kafka_topic_segment_bytes", "kafka_topic_retention_time_in_ms", "kafka_topic_partitions_count", "kafka_topic_size_in_bytes", "kafka_topic_record_count", "kafka_topic_cleanup_policy", "kafka_topic_log_cleanup_policy", "kafka_consumer_groups", ] @property def kafka_topic_is_internal(self) -> Optional[bool]: return ( None if self.attributes is None else self.attributes.kafka_topic_is_internal ) @kafka_topic_is_internal.setter def kafka_topic_is_internal(self, kafka_topic_is_internal: Optional[bool]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.kafka_topic_is_internal = kafka_topic_is_internal @property def kafka_topic_compression_type(self) -> Optional[KafkaTopicCompressionType]: return ( None if self.attributes is None else self.attributes.kafka_topic_compression_type ) @kafka_topic_compression_type.setter def kafka_topic_compression_type( self, kafka_topic_compression_type: Optional[KafkaTopicCompressionType] ): if self.attributes is None: self.attributes = self.Attributes() self.attributes.kafka_topic_compression_type = kafka_topic_compression_type @property def kafka_topic_replication_factor(self) -> Optional[int]: return ( None if self.attributes is None else self.attributes.kafka_topic_replication_factor ) @kafka_topic_replication_factor.setter def kafka_topic_replication_factor( self, kafka_topic_replication_factor: Optional[int] ): if self.attributes is None: self.attributes = self.Attributes() self.attributes.kafka_topic_replication_factor = kafka_topic_replication_factor @property def kafka_topic_segment_bytes(self) -> Optional[int]: return ( None if self.attributes is None else self.attributes.kafka_topic_segment_bytes ) @kafka_topic_segment_bytes.setter def kafka_topic_segment_bytes(self, kafka_topic_segment_bytes: Optional[int]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.kafka_topic_segment_bytes = kafka_topic_segment_bytes @property def kafka_topic_retention_time_in_ms(self) -> Optional[int]: return ( None if self.attributes is None else self.attributes.kafka_topic_retention_time_in_ms ) @kafka_topic_retention_time_in_ms.setter def kafka_topic_retention_time_in_ms( self, kafka_topic_retention_time_in_ms: Optional[int] ): if self.attributes is None: self.attributes = self.Attributes() self.attributes.kafka_topic_retention_time_in_ms = ( kafka_topic_retention_time_in_ms ) @property def kafka_topic_partitions_count(self) -> Optional[int]: return ( None if self.attributes is None else self.attributes.kafka_topic_partitions_count ) @kafka_topic_partitions_count.setter def kafka_topic_partitions_count(self, kafka_topic_partitions_count: Optional[int]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.kafka_topic_partitions_count = kafka_topic_partitions_count @property def kafka_topic_size_in_bytes(self) -> Optional[int]: return ( None if self.attributes is None else self.attributes.kafka_topic_size_in_bytes ) @kafka_topic_size_in_bytes.setter def kafka_topic_size_in_bytes(self, kafka_topic_size_in_bytes: Optional[int]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.kafka_topic_size_in_bytes = kafka_topic_size_in_bytes @property def kafka_topic_record_count(self) -> Optional[int]: return ( None if self.attributes is None else self.attributes.kafka_topic_record_count ) @kafka_topic_record_count.setter def kafka_topic_record_count(self, kafka_topic_record_count: Optional[int]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.kafka_topic_record_count = kafka_topic_record_count @property def kafka_topic_cleanup_policy(self) -> Optional[KafkaTopicCleanupPolicy]: return ( None if self.attributes is None else self.attributes.kafka_topic_cleanup_policy ) @kafka_topic_cleanup_policy.setter def kafka_topic_cleanup_policy( self, kafka_topic_cleanup_policy: Optional[KafkaTopicCleanupPolicy] ): if self.attributes is None: self.attributes = self.Attributes() self.attributes.kafka_topic_cleanup_policy = kafka_topic_cleanup_policy @property def kafka_topic_log_cleanup_policy(self) -> Optional[str]: return ( None if self.attributes is None else self.attributes.kafka_topic_log_cleanup_policy ) @kafka_topic_log_cleanup_policy.setter def kafka_topic_log_cleanup_policy( self, kafka_topic_log_cleanup_policy: Optional[str] ): if self.attributes is None: self.attributes = self.Attributes() self.attributes.kafka_topic_log_cleanup_policy = kafka_topic_log_cleanup_policy @property def kafka_consumer_groups(self) -> Optional[List[KafkaConsumerGroup]]: return ( None if self.attributes is None else self.attributes.kafka_consumer_groups ) @kafka_consumer_groups.setter def kafka_consumer_groups( self, kafka_consumer_groups: Optional[List[KafkaConsumerGroup]] ): if self.attributes is None: self.attributes = self.Attributes() self.attributes.kafka_consumer_groups = kafka_consumer_groups class Attributes(Kafka.Attributes): kafka_topic_is_internal: Optional[bool] = Field(default=None, description="") kafka_topic_compression_type: Optional[KafkaTopicCompressionType] = Field( default=None, description="" ) kafka_topic_replication_factor: Optional[int] = Field( default=None, description="" ) kafka_topic_segment_bytes: Optional[int] = Field(default=None, description="") kafka_topic_retention_time_in_ms: Optional[int] = Field( default=None, description="" ) kafka_topic_partitions_count: Optional[int] = Field( default=None, description="" ) kafka_topic_size_in_bytes: Optional[int] = Field(default=None, description="") kafka_topic_record_count: Optional[int] = Field(default=None, description="") kafka_topic_cleanup_policy: Optional[KafkaTopicCleanupPolicy] = Field( default=None, description="" ) kafka_topic_log_cleanup_policy: Optional[str] = Field( default=None, description="" ) kafka_consumer_groups: Optional[List[KafkaConsumerGroup]] = Field( default=None, description="" ) # relationship @classmethod @init_guid def creator( cls, *, name: str, connection_qualified_name: str ) -> KafkaTopic.Attributes: validate_required_fields( ["name", "connection_qualified_name"], [name, connection_qualified_name] ) return KafkaTopic.Attributes( name=name, qualified_name=f"{connection_qualified_name}/topic/{name}", connection_qualified_name=connection_qualified_name, connector_name=AtlanConnectorType.get_connector_name( connection_qualified_name ), ) attributes: KafkaTopic.Attributes = Field( default_factory=lambda: KafkaTopic.Attributes(), description=( "Map of attributes in the instance and their values. " "The specific keys of this map will vary by type, " "so are described in the sub-types of this schema." ), )
from .kafka_consumer_group import KafkaConsumerGroup # noqa: E402, F401 KafkaTopic.Attributes.update_forward_refs()