Source code for pyatlan.model.assets.azure_event_hub_consumer_group
# SPDX-License-Identifier: Apache-2.0
# Copyright 2025 Atlan Pte. Ltd.
from __future__ import annotations
from typing import ClassVar, List
from pydantic.v1 import Field, validator
from pyatlan.model.enums import AtlanConnectorType
from pyatlan.utils import init_guid, validate_required_fields
from .azure_event_hub import AzureEventHub
from .kafka_consumer_group import KafkaConsumerGroup
[docs]
class AzureEventHubConsumerGroup(KafkaConsumerGroup):
"""Description"""
@classmethod
@init_guid
def creator(
cls,
*,
name: str,
event_hub_qualified_names: List[str],
) -> AzureEventHubConsumerGroup:
validate_required_fields(
["name", "event_hub_qualified_names"],
[name, event_hub_qualified_names],
)
event_hubs = []
for event_hub_qn in event_hub_qualified_names:
connection_qn, connector_name = AtlanConnectorType.get_connector_name(
event_hub_qn, "event_hub_qualified_names", 5
)
event_hubs.append(AzureEventHub.ref_by_qualified_name(event_hub_qn))
# Following a similar approach to construct the qualified name:
# https://github.com/atlanhq/marketplace-packages/blob/master/packages/atlan/azure-event-hub/transformers/eh-consumer-group.jinja2#L9
first_event_hub_name = event_hub_qualified_names[0].split("/")[4]
attributes = AzureEventHubConsumerGroup.Attributes(
name=name,
connector_name=connector_name,
connection_qualified_name=connection_qn,
kafka_topics=event_hubs, # type:ignore[arg-type]
kafka_topic_qualified_names=set(event_hub_qualified_names),
qualified_name=f"{connection_qn}/consumer-group/{first_event_hub_name}/{name}",
)
return cls(attributes=attributes)
type_name: str = Field(default="AzureEventHubConsumerGroup", allow_mutation=False)
@validator("type_name")
def validate_type_name(cls, v):
if v != "AzureEventHubConsumerGroup":
raise ValueError("must be AzureEventHubConsumerGroup")
return v
def __setattr__(self, name, value):
if name in AzureEventHubConsumerGroup._convenience_properties:
return object.__setattr__(self, name, value)
super().__setattr__(name, value)
_convenience_properties: ClassVar[List[str]] = []
AzureEventHubConsumerGroup.Attributes.update_forward_refs()