Source code for pyatlan.model.assets.core.flow_reusable_unit

# SPDX-License-Identifier: Apache-2.0
# Copyright 2025 Atlan Pte. Ltd.


from __future__ import annotations

from typing import ClassVar, List, Optional

from pydantic.v1 import Field, validator

from pyatlan.model.fields.atlan_fields import NumericField, RelationField

from .flow import Flow


[docs] class FlowReusableUnit(Flow): """Description""" type_name: str = Field(default="FlowReusableUnit", allow_mutation=False) @validator("type_name") def validate_type_name(cls, v): if v != "FlowReusableUnit": raise ValueError("must be FlowReusableUnit") return v def __setattr__(self, name, value): if name in FlowReusableUnit._convenience_properties: return object.__setattr__(self, name, value) super().__setattr__(name, value) FLOW_DATASET_COUNT: ClassVar[NumericField] = NumericField( "flowDatasetCount", "flowDatasetCount" ) """ Count of the number of ephemeral datasets contained within this reusable unit. """ FLOW_CONTROL_OPERATION_COUNT: ClassVar[NumericField] = NumericField( "flowControlOperationCount", "flowControlOperationCount" ) """ Count of the number of control flow operations that execute this reusable unit. """ FLOW_DATA_FLOWS: ClassVar[RelationField] = RelationField("flowDataFlows") """ TBC """ FLOW_ABSTRACTS: ClassVar[RelationField] = RelationField("flowAbstracts") """ TBC """ FLOW_DATASETS: ClassVar[RelationField] = RelationField("flowDatasets") """ TBC """ _convenience_properties: ClassVar[List[str]] = [ "flow_dataset_count", "flow_control_operation_count", "flow_data_flows", "flow_abstracts", "flow_datasets", ] @property def flow_dataset_count(self) -> Optional[int]: return None if self.attributes is None else self.attributes.flow_dataset_count @flow_dataset_count.setter def flow_dataset_count(self, flow_dataset_count: Optional[int]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.flow_dataset_count = flow_dataset_count @property def flow_control_operation_count(self) -> Optional[int]: return ( None if self.attributes is None else self.attributes.flow_control_operation_count ) @flow_control_operation_count.setter def flow_control_operation_count(self, flow_control_operation_count: Optional[int]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.flow_control_operation_count = flow_control_operation_count @property def flow_data_flows(self) -> Optional[List[FlowDatasetOperation]]: return None if self.attributes is None else self.attributes.flow_data_flows @flow_data_flows.setter def flow_data_flows(self, flow_data_flows: Optional[List[FlowDatasetOperation]]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.flow_data_flows = flow_data_flows @property def flow_abstracts(self) -> Optional[List[FlowDataset]]: return None if self.attributes is None else self.attributes.flow_abstracts @flow_abstracts.setter def flow_abstracts(self, flow_abstracts: Optional[List[FlowDataset]]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.flow_abstracts = flow_abstracts @property def flow_datasets(self) -> Optional[List[FlowDataset]]: return None if self.attributes is None else self.attributes.flow_datasets @flow_datasets.setter def flow_datasets(self, flow_datasets: Optional[List[FlowDataset]]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.flow_datasets = flow_datasets class Attributes(Flow.Attributes): flow_dataset_count: Optional[int] = Field(default=None, description="") flow_control_operation_count: Optional[int] = Field( default=None, description="" ) flow_data_flows: Optional[List[FlowDatasetOperation]] = Field( default=None, description="" ) # relationship flow_abstracts: Optional[List[FlowDataset]] = Field( default=None, description="" ) # relationship flow_datasets: Optional[List[FlowDataset]] = Field( default=None, description="" ) # relationship attributes: FlowReusableUnit.Attributes = Field( default_factory=lambda: FlowReusableUnit.Attributes(), description=( "Map of attributes in the instance and their values. " "The specific keys of this map will vary by type, " "so are described in the sub-types of this schema." ), )
from .flow_dataset import FlowDataset # noqa: E402, F401 from .flow_dataset_operation import FlowDatasetOperation # noqa: E402, F401