Source code for pyatlan.model.assets.core.flow_dataset

# SPDX-License-Identifier: Apache-2.0
# Copyright 2025 Atlan Pte. Ltd.


from __future__ import annotations

from datetime import datetime
from typing import ClassVar, List, Optional

from pydantic.v1 import Field, validator

from pyatlan.model.fields.atlan_fields import (
    KeywordField,
    KeywordTextField,
    NumericField,
    RelationField,
)

from .catalog import Catalog


[docs] class FlowDataset(Catalog): """Description""" type_name: str = Field(default="FlowDataset", allow_mutation=False) @validator("type_name") def validate_type_name(cls, v): if v != "FlowDataset": raise ValueError("must be FlowDataset") return v def __setattr__(self, name, value): if name in FlowDataset._convenience_properties: return object.__setattr__(self, name, value) super().__setattr__(name, value) FLOW_FIELD_COUNT: ClassVar[NumericField] = NumericField( "flowFieldCount", "flowFieldCount" ) """ Count of the number of individual fields that make up this ephemeral dataset. """ FLOW_TYPE: ClassVar[KeywordField] = KeywordField("flowType", "flowType") """ Type of the ephemeral piece of data. """ FLOW_EXPRESSION: ClassVar[KeywordField] = KeywordField( "flowExpression", "flowExpression" ) """ Logic that is applied, injected or otherwise used as part of producing this ephemeral piece of data. """ FLOW_QUERY: ClassVar[KeywordField] = KeywordField("flowQuery", "flowQuery") """ Query (e.g. SQL) that was run to produce this ephemeral piece of data. """ FLOW_STARTED_AT: ClassVar[NumericField] = NumericField( "flowStartedAt", "flowStartedAt" ) """ Date and time at which this point in the data processing or orchestration started. """ FLOW_FINISHED_AT: ClassVar[NumericField] = NumericField( "flowFinishedAt", "flowFinishedAt" ) """ Date and time at which this point in the data processing or orchestration finished. """ FLOW_STATUS: ClassVar[KeywordField] = KeywordField("flowStatus", "flowStatus") """ Overall status of this point in the data processing or orchestration. """ FLOW_SCHEDULE: ClassVar[KeywordField] = KeywordField("flowSchedule", "flowSchedule") """ Schedule for this point in the data processing or orchestration. """ FLOW_PROJECT_NAME: ClassVar[KeywordTextField] = KeywordTextField( "flowProjectName", "flowProjectName.keyword", "flowProjectName" ) """ Simple name of the project in which this asset is contained. """ FLOW_PROJECT_QUALIFIED_NAME: ClassVar[KeywordField] = KeywordField( "flowProjectQualifiedName", "flowProjectQualifiedName" ) """ Unique name of the project in which this asset is contained. """ FLOW_FOLDER_NAME: ClassVar[KeywordTextField] = KeywordTextField( "flowFolderName", "flowFolderName.keyword", "flowFolderName" ) """ Simple name of the folder in which this asset is contained. """ FLOW_FOLDER_QUALIFIED_NAME: ClassVar[KeywordField] = KeywordField( "flowFolderQualifiedName", "flowFolderQualifiedName" ) """ Unique name of the folder in which this asset is contained. """ FLOW_REUSABLE_UNIT_NAME: ClassVar[KeywordTextField] = KeywordTextField( "flowReusableUnitName", "flowReusableUnitName.keyword", "flowReusableUnitName" ) """ Simple name of the reusable grouping of operations in which this ephemeral data is contained. """ FLOW_REUSABLE_UNIT_QUALIFIED_NAME: ClassVar[KeywordField] = KeywordField( "flowReusableUnitQualifiedName", "flowReusableUnitQualifiedName" ) """ Unique name of the reusable grouping of operations in which this ephemeral data is contained. """ FLOW_ID: ClassVar[KeywordField] = KeywordField("flowId", "flowId") """ Unique ID for this flow asset, which will remain constant throughout the lifecycle of the asset. """ FLOW_RUN_ID: ClassVar[KeywordField] = KeywordField("flowRunId", "flowRunId") """ Unique ID of the flow run, which could change on subsequent runs of the same flow. """ FLOW_ERROR_MESSAGE: ClassVar[KeywordField] = KeywordField( "flowErrorMessage", "flowErrorMessage" ) """ Optional error message of the flow run. """ FLOW_PARENT_UNIT: ClassVar[RelationField] = RelationField("flowParentUnit") """ TBC """ FLOW_FIELDS: ClassVar[RelationField] = RelationField("flowFields") """ TBC """ FLOW_DETAILED_BY: ClassVar[RelationField] = RelationField("flowDetailedBy") """ TBC """ _convenience_properties: ClassVar[List[str]] = [ "flow_field_count", "flow_type", "flow_expression", "flow_query", "flow_started_at", "flow_finished_at", "flow_status", "flow_schedule", "flow_project_name", "flow_project_qualified_name", "flow_folder_name", "flow_folder_qualified_name", "flow_reusable_unit_name", "flow_reusable_unit_qualified_name", "flow_id", "flow_run_id", "flow_error_message", "flow_parent_unit", "flow_fields", "flow_detailed_by", ] @property def flow_field_count(self) -> Optional[int]: return None if self.attributes is None else self.attributes.flow_field_count @flow_field_count.setter def flow_field_count(self, flow_field_count: Optional[int]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.flow_field_count = flow_field_count @property def flow_type(self) -> Optional[str]: return None if self.attributes is None else self.attributes.flow_type @flow_type.setter def flow_type(self, flow_type: Optional[str]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.flow_type = flow_type @property def flow_expression(self) -> Optional[str]: return None if self.attributes is None else self.attributes.flow_expression @flow_expression.setter def flow_expression(self, flow_expression: Optional[str]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.flow_expression = flow_expression @property def flow_query(self) -> Optional[str]: return None if self.attributes is None else self.attributes.flow_query @flow_query.setter def flow_query(self, flow_query: Optional[str]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.flow_query = flow_query @property def flow_started_at(self) -> Optional[datetime]: return None if self.attributes is None else self.attributes.flow_started_at @flow_started_at.setter def flow_started_at(self, flow_started_at: Optional[datetime]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.flow_started_at = flow_started_at @property def flow_finished_at(self) -> Optional[datetime]: return None if self.attributes is None else self.attributes.flow_finished_at @flow_finished_at.setter def flow_finished_at(self, flow_finished_at: Optional[datetime]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.flow_finished_at = flow_finished_at @property def flow_status(self) -> Optional[str]: return None if self.attributes is None else self.attributes.flow_status @flow_status.setter def flow_status(self, flow_status: Optional[str]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.flow_status = flow_status @property def flow_schedule(self) -> Optional[str]: return None if self.attributes is None else self.attributes.flow_schedule @flow_schedule.setter def flow_schedule(self, flow_schedule: Optional[str]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.flow_schedule = flow_schedule @property def flow_project_name(self) -> Optional[str]: return None if self.attributes is None else self.attributes.flow_project_name @flow_project_name.setter def flow_project_name(self, flow_project_name: Optional[str]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.flow_project_name = flow_project_name @property def flow_project_qualified_name(self) -> Optional[str]: return ( None if self.attributes is None else self.attributes.flow_project_qualified_name ) @flow_project_qualified_name.setter def flow_project_qualified_name(self, flow_project_qualified_name: Optional[str]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.flow_project_qualified_name = flow_project_qualified_name @property def flow_folder_name(self) -> Optional[str]: return None if self.attributes is None else self.attributes.flow_folder_name @flow_folder_name.setter def flow_folder_name(self, flow_folder_name: Optional[str]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.flow_folder_name = flow_folder_name @property def flow_folder_qualified_name(self) -> Optional[str]: return ( None if self.attributes is None else self.attributes.flow_folder_qualified_name ) @flow_folder_qualified_name.setter def flow_folder_qualified_name(self, flow_folder_qualified_name: Optional[str]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.flow_folder_qualified_name = flow_folder_qualified_name @property def flow_reusable_unit_name(self) -> Optional[str]: return ( None if self.attributes is None else self.attributes.flow_reusable_unit_name ) @flow_reusable_unit_name.setter def flow_reusable_unit_name(self, flow_reusable_unit_name: Optional[str]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.flow_reusable_unit_name = flow_reusable_unit_name @property def flow_reusable_unit_qualified_name(self) -> Optional[str]: return ( None if self.attributes is None else self.attributes.flow_reusable_unit_qualified_name ) @flow_reusable_unit_qualified_name.setter def flow_reusable_unit_qualified_name( self, flow_reusable_unit_qualified_name: Optional[str] ): if self.attributes is None: self.attributes = self.Attributes() self.attributes.flow_reusable_unit_qualified_name = ( flow_reusable_unit_qualified_name ) @property def flow_id(self) -> Optional[str]: return None if self.attributes is None else self.attributes.flow_id @flow_id.setter def flow_id(self, flow_id: Optional[str]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.flow_id = flow_id @property def flow_run_id(self) -> Optional[str]: return None if self.attributes is None else self.attributes.flow_run_id @flow_run_id.setter def flow_run_id(self, flow_run_id: Optional[str]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.flow_run_id = flow_run_id @property def flow_error_message(self) -> Optional[str]: return None if self.attributes is None else self.attributes.flow_error_message @flow_error_message.setter def flow_error_message(self, flow_error_message: Optional[str]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.flow_error_message = flow_error_message @property def flow_parent_unit(self) -> Optional[FlowReusableUnit]: return None if self.attributes is None else self.attributes.flow_parent_unit @flow_parent_unit.setter def flow_parent_unit(self, flow_parent_unit: Optional[FlowReusableUnit]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.flow_parent_unit = flow_parent_unit @property def flow_fields(self) -> Optional[List[FlowField]]: return None if self.attributes is None else self.attributes.flow_fields @flow_fields.setter def flow_fields(self, flow_fields: Optional[List[FlowField]]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.flow_fields = flow_fields @property def flow_detailed_by(self) -> Optional[FlowReusableUnit]: return None if self.attributes is None else self.attributes.flow_detailed_by @flow_detailed_by.setter def flow_detailed_by(self, flow_detailed_by: Optional[FlowReusableUnit]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.flow_detailed_by = flow_detailed_by class Attributes(Catalog.Attributes): flow_field_count: Optional[int] = Field(default=None, description="") flow_type: Optional[str] = Field(default=None, description="") flow_expression: Optional[str] = Field(default=None, description="") flow_query: Optional[str] = Field(default=None, description="") flow_started_at: Optional[datetime] = Field(default=None, description="") flow_finished_at: Optional[datetime] = Field(default=None, description="") flow_status: Optional[str] = Field(default=None, description="") flow_schedule: Optional[str] = Field(default=None, description="") flow_project_name: Optional[str] = Field(default=None, description="") flow_project_qualified_name: Optional[str] = Field(default=None, description="") flow_folder_name: Optional[str] = Field(default=None, description="") flow_folder_qualified_name: Optional[str] = Field(default=None, description="") flow_reusable_unit_name: Optional[str] = Field(default=None, description="") flow_reusable_unit_qualified_name: Optional[str] = Field( default=None, description="" ) flow_id: Optional[str] = Field(default=None, description="") flow_run_id: Optional[str] = Field(default=None, description="") flow_error_message: Optional[str] = Field(default=None, description="") flow_parent_unit: Optional[FlowReusableUnit] = Field( default=None, description="" ) # relationship flow_fields: Optional[List[FlowField]] = Field( default=None, description="" ) # relationship flow_detailed_by: Optional[FlowReusableUnit] = Field( default=None, description="" ) # relationship attributes: FlowDataset.Attributes = Field( default_factory=lambda: FlowDataset.Attributes(), description=( "Map of attributes in the instance and their values. " "The specific keys of this map will vary by type, " "so are described in the sub-types of this schema." ), )
from .flow_field import FlowField # noqa: E402, F401 from .flow_reusable_unit import FlowReusableUnit # noqa: E402, F401