Source code for pyatlan.model.assets.core.flow_field

# SPDX-License-Identifier: Apache-2.0
# Copyright 2025 Atlan Pte. Ltd.


from __future__ import annotations

from datetime import datetime
from typing import ClassVar, List, Optional

from pydantic.v1 import Field, validator

from pyatlan.model.fields.atlan_fields import (
    KeywordField,
    KeywordTextField,
    NumericField,
    RelationField,
)

from .catalog import Catalog


# FlowField is the asset model whose ``type_name`` is pinned to "FlowField".
#
# Layout of the class:
#   * UPPER_CASE ClassVars  - search-field descriptors, one per attribute.
#   * lower_case properties - convenience accessors that read from / write to
#     the nested ``attributes`` object, creating it on first write.
#   * ``Attributes``        - nested model holding the actual attribute values.
#
# NOTE(review): the class docstring is left as the upstream placeholder on
# purpose; pydantic may surface it in generated schemas - confirm before
# rewording it.
class FlowField(Catalog):
    """Description"""

    type_name: str = Field(default="FlowField", allow_mutation=False)

    @validator("type_name")
    def validate_type_name(cls, v):
        # Reject any type name other than the one this class models.
        if v != "FlowField":
            raise ValueError("must be FlowField")
        return v

    def __setattr__(self, name, value):
        if name in FlowField._convenience_properties:
            # Convenience names are properties defined on this class; going
            # through ``object.__setattr__`` lets the property setter handle
            # the assignment.
            object.__setattr__(self, name, value)
        else:
            # Everything else is handled by the parent implementation.
            super().__setattr__(name, value)

    FLOW_DATASET_NAME: ClassVar[KeywordTextField] = KeywordTextField(
        "flowDatasetName", "flowDatasetName.keyword", "flowDatasetName"
    )
    """
    Simple name of the ephemeral dataset in which this field is contained.
    """
    FLOW_DATASET_QUALIFIED_NAME: ClassVar[KeywordField] = KeywordField(
        "flowDatasetQualifiedName", "flowDatasetQualifiedName"
    )
    """
    Unique name of the ephemeral dataset in which this field is contained.
    """
    FLOW_DATA_TYPE: ClassVar[KeywordField] = KeywordField(
        "flowDataType", "flowDataType"
    )
    """
    Type of the data captured in this field.
    """
    FLOW_EXPRESSION: ClassVar[KeywordField] = KeywordField(
        "flowExpression", "flowExpression"
    )
    """
    Logic that is applied, injected or otherwise used as part of producing this ephemeral field of data.
    """
    FLOW_STARTED_AT: ClassVar[NumericField] = NumericField(
        "flowStartedAt", "flowStartedAt"
    )
    """
    Date and time at which this point in the data processing or orchestration started.
    """
    FLOW_FINISHED_AT: ClassVar[NumericField] = NumericField(
        "flowFinishedAt", "flowFinishedAt"
    )
    """
    Date and time at which this point in the data processing or orchestration finished.
    """
    FLOW_STATUS: ClassVar[KeywordField] = KeywordField("flowStatus", "flowStatus")
    """
    Overall status of this point in the data processing or orchestration.
    """
    FLOW_SCHEDULE: ClassVar[KeywordField] = KeywordField("flowSchedule", "flowSchedule")
    """
    Schedule for this point in the data processing or orchestration.
    """
    FLOW_PROJECT_NAME: ClassVar[KeywordTextField] = KeywordTextField(
        "flowProjectName", "flowProjectName.keyword", "flowProjectName"
    )
    """
    Simple name of the project in which this asset is contained.
    """
    FLOW_PROJECT_QUALIFIED_NAME: ClassVar[KeywordField] = KeywordField(
        "flowProjectQualifiedName", "flowProjectQualifiedName"
    )
    """
    Unique name of the project in which this asset is contained.
    """
    FLOW_FOLDER_NAME: ClassVar[KeywordTextField] = KeywordTextField(
        "flowFolderName", "flowFolderName.keyword", "flowFolderName"
    )
    """
    Simple name of the folder in which this asset is contained.
    """
    FLOW_FOLDER_QUALIFIED_NAME: ClassVar[KeywordField] = KeywordField(
        "flowFolderQualifiedName", "flowFolderQualifiedName"
    )
    """
    Unique name of the folder in which this asset is contained.
    """
    FLOW_REUSABLE_UNIT_NAME: ClassVar[KeywordTextField] = KeywordTextField(
        "flowReusableUnitName", "flowReusableUnitName.keyword", "flowReusableUnitName"
    )
    """
    Simple name of the reusable grouping of operations in which this ephemeral data is contained.
    """
    FLOW_REUSABLE_UNIT_QUALIFIED_NAME: ClassVar[KeywordField] = KeywordField(
        "flowReusableUnitQualifiedName", "flowReusableUnitQualifiedName"
    )
    """
    Unique name of the reusable grouping of operations in which this ephemeral data is contained.
    """
    FLOW_ID: ClassVar[KeywordField] = KeywordField("flowId", "flowId")
    """
    Unique ID for this flow asset, which will remain constant throughout the lifecycle of the asset.
    """
    FLOW_RUN_ID: ClassVar[KeywordField] = KeywordField("flowRunId", "flowRunId")
    """
    Unique ID of the flow run, which could change on subsequent runs of the same flow.
    """
    FLOW_ERROR_MESSAGE: ClassVar[KeywordField] = KeywordField(
        "flowErrorMessage", "flowErrorMessage"
    )
    """
    Optional error message of the flow run.
    """

    FLOW_DATASET: ClassVar[RelationField] = RelationField("flowDataset")
    """
    Relationship to a FlowDataset asset (attribute ``flowDataset``); exact semantics TBC.
    """

    # Names that ``__setattr__`` routes to the property setters below.
    _convenience_properties: ClassVar[List[str]] = [
        "flow_dataset_name",
        "flow_dataset_qualified_name",
        "flow_data_type",
        "flow_expression",
        "flow_started_at",
        "flow_finished_at",
        "flow_status",
        "flow_schedule",
        "flow_project_name",
        "flow_project_qualified_name",
        "flow_folder_name",
        "flow_folder_qualified_name",
        "flow_reusable_unit_name",
        "flow_reusable_unit_qualified_name",
        "flow_id",
        "flow_run_id",
        "flow_error_message",
        "flow_dataset",
    ]

    # Every accessor pair below follows the same contract:
    #   getter - returns None when ``attributes`` is None, otherwise the
    #            same-named value stored on ``attributes``.
    #   setter - creates ``attributes`` if it is None, then stores the value.

    @property
    def flow_dataset_name(self) -> Optional[str]:
        if self.attributes is None:
            return None
        return self.attributes.flow_dataset_name

    @flow_dataset_name.setter
    def flow_dataset_name(self, flow_dataset_name: Optional[str]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.flow_dataset_name = flow_dataset_name

    @property
    def flow_dataset_qualified_name(self) -> Optional[str]:
        if self.attributes is None:
            return None
        return self.attributes.flow_dataset_qualified_name

    @flow_dataset_qualified_name.setter
    def flow_dataset_qualified_name(self, flow_dataset_qualified_name: Optional[str]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.flow_dataset_qualified_name = flow_dataset_qualified_name

    @property
    def flow_data_type(self) -> Optional[str]:
        if self.attributes is None:
            return None
        return self.attributes.flow_data_type

    @flow_data_type.setter
    def flow_data_type(self, flow_data_type: Optional[str]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.flow_data_type = flow_data_type

    @property
    def flow_expression(self) -> Optional[str]:
        if self.attributes is None:
            return None
        return self.attributes.flow_expression

    @flow_expression.setter
    def flow_expression(self, flow_expression: Optional[str]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.flow_expression = flow_expression

    @property
    def flow_started_at(self) -> Optional[datetime]:
        if self.attributes is None:
            return None
        return self.attributes.flow_started_at

    @flow_started_at.setter
    def flow_started_at(self, flow_started_at: Optional[datetime]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.flow_started_at = flow_started_at

    @property
    def flow_finished_at(self) -> Optional[datetime]:
        if self.attributes is None:
            return None
        return self.attributes.flow_finished_at

    @flow_finished_at.setter
    def flow_finished_at(self, flow_finished_at: Optional[datetime]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.flow_finished_at = flow_finished_at

    @property
    def flow_status(self) -> Optional[str]:
        if self.attributes is None:
            return None
        return self.attributes.flow_status

    @flow_status.setter
    def flow_status(self, flow_status: Optional[str]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.flow_status = flow_status

    @property
    def flow_schedule(self) -> Optional[str]:
        if self.attributes is None:
            return None
        return self.attributes.flow_schedule

    @flow_schedule.setter
    def flow_schedule(self, flow_schedule: Optional[str]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.flow_schedule = flow_schedule

    @property
    def flow_project_name(self) -> Optional[str]:
        if self.attributes is None:
            return None
        return self.attributes.flow_project_name

    @flow_project_name.setter
    def flow_project_name(self, flow_project_name: Optional[str]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.flow_project_name = flow_project_name

    @property
    def flow_project_qualified_name(self) -> Optional[str]:
        if self.attributes is None:
            return None
        return self.attributes.flow_project_qualified_name

    @flow_project_qualified_name.setter
    def flow_project_qualified_name(self, flow_project_qualified_name: Optional[str]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.flow_project_qualified_name = flow_project_qualified_name

    @property
    def flow_folder_name(self) -> Optional[str]:
        if self.attributes is None:
            return None
        return self.attributes.flow_folder_name

    @flow_folder_name.setter
    def flow_folder_name(self, flow_folder_name: Optional[str]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.flow_folder_name = flow_folder_name

    @property
    def flow_folder_qualified_name(self) -> Optional[str]:
        if self.attributes is None:
            return None
        return self.attributes.flow_folder_qualified_name

    @flow_folder_qualified_name.setter
    def flow_folder_qualified_name(self, flow_folder_qualified_name: Optional[str]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.flow_folder_qualified_name = flow_folder_qualified_name

    @property
    def flow_reusable_unit_name(self) -> Optional[str]:
        if self.attributes is None:
            return None
        return self.attributes.flow_reusable_unit_name

    @flow_reusable_unit_name.setter
    def flow_reusable_unit_name(self, flow_reusable_unit_name: Optional[str]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.flow_reusable_unit_name = flow_reusable_unit_name

    @property
    def flow_reusable_unit_qualified_name(self) -> Optional[str]:
        if self.attributes is None:
            return None
        return self.attributes.flow_reusable_unit_qualified_name

    @flow_reusable_unit_qualified_name.setter
    def flow_reusable_unit_qualified_name(
        self, flow_reusable_unit_qualified_name: Optional[str]
    ):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.flow_reusable_unit_qualified_name = (
            flow_reusable_unit_qualified_name
        )

    @property
    def flow_id(self) -> Optional[str]:
        if self.attributes is None:
            return None
        return self.attributes.flow_id

    @flow_id.setter
    def flow_id(self, flow_id: Optional[str]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.flow_id = flow_id

    @property
    def flow_run_id(self) -> Optional[str]:
        if self.attributes is None:
            return None
        return self.attributes.flow_run_id

    @flow_run_id.setter
    def flow_run_id(self, flow_run_id: Optional[str]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.flow_run_id = flow_run_id

    @property
    def flow_error_message(self) -> Optional[str]:
        if self.attributes is None:
            return None
        return self.attributes.flow_error_message

    @flow_error_message.setter
    def flow_error_message(self, flow_error_message: Optional[str]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.flow_error_message = flow_error_message

    @property
    def flow_dataset(self) -> Optional[FlowDataset]:
        if self.attributes is None:
            return None
        return self.attributes.flow_dataset

    @flow_dataset.setter
    def flow_dataset(self, flow_dataset: Optional[FlowDataset]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.flow_dataset = flow_dataset

    class Attributes(Catalog.Attributes):
        # Storage for the values exposed by the convenience properties of the
        # enclosing class; every field is optional and defaults to None.
        flow_dataset_name: Optional[str] = Field(default=None, description="")
        flow_dataset_qualified_name: Optional[str] = Field(default=None, description="")
        flow_data_type: Optional[str] = Field(default=None, description="")
        flow_expression: Optional[str] = Field(default=None, description="")
        flow_started_at: Optional[datetime] = Field(default=None, description="")
        flow_finished_at: Optional[datetime] = Field(default=None, description="")
        flow_status: Optional[str] = Field(default=None, description="")
        flow_schedule: Optional[str] = Field(default=None, description="")
        flow_project_name: Optional[str] = Field(default=None, description="")
        flow_project_qualified_name: Optional[str] = Field(default=None, description="")
        flow_folder_name: Optional[str] = Field(default=None, description="")
        flow_folder_qualified_name: Optional[str] = Field(default=None, description="")
        flow_reusable_unit_name: Optional[str] = Field(default=None, description="")
        flow_reusable_unit_qualified_name: Optional[str] = Field(
            default=None, description=""
        )
        flow_id: Optional[str] = Field(default=None, description="")
        flow_run_id: Optional[str] = Field(default=None, description="")
        flow_error_message: Optional[str] = Field(default=None, description="")

        flow_dataset: Optional[FlowDataset] = Field(
            default=None, description=""
        )  # relationship

    # The lambda defers the lookup of ``FlowField`` until an instance is
    # created, since the name is not bound while the class body executes.
    attributes: FlowField.Attributes = Field(
        default_factory=lambda: FlowField.Attributes(),
        description=(
            "Map of attributes in the instance and their values. "
            "The specific keys of this map will vary by type, "
            "so are described in the sub-types of this schema."
        ),
    )

from .flow_dataset import FlowDataset # noqa: E402, F401