# SPDX-License-Identifier: Apache-2.0
# Copyright 2025 Atlan Pte. Ltd.
from __future__ import annotations
from datetime import datetime
from typing import ClassVar, List, Optional, Set
from pydantic.v1 import Field, validator
from pyatlan.model.enums import AIDatasetType
from pyatlan.model.fields.atlan_fields import (
KeywordField,
KeywordTextField,
NumericField,
TextField,
)
from .column_process import ColumnProcess
class FlowFieldOperation(ColumnProcess):
    """Asset model for the ``FlowFieldOperation`` type.

    Extends ``ColumnProcess`` with flow metadata: start/finish times, status,
    schedule, project / folder / reusable-unit names, identifiers and an
    optional error message. Every value stored on the nested ``Attributes``
    model is also exposed as a convenience property directly on the asset.
    """

    type_name: str = Field(default="FlowFieldOperation", allow_mutation=False)

    @validator("type_name")
    def validate_type_name(cls, v):
        """Reject any type name other than ``FlowFieldOperation``."""
        if v != "FlowFieldOperation":
            raise ValueError("must be FlowFieldOperation")
        return v

    def __setattr__(self, name, value):
        """Assign ``value`` to ``name``.

        Names listed in ``_convenience_properties`` are set through
        ``object.__setattr__`` so that the matching property setter defined
        on this class runs; every other name is delegated to the parent
        class's ``__setattr__``.
        """
        if name not in FlowFieldOperation._convenience_properties:
            super().__setattr__(name, value)
            return None
        return object.__setattr__(self, name, value)

    # ------------------------------------------------------------------
    # Searchable field descriptors (one per attribute).
    # ------------------------------------------------------------------
    CODE: ClassVar[TextField] = TextField("code", "code")
    """
    Code that ran within the process.
    """
    SQL: ClassVar[TextField] = TextField("sql", "sql")
    """
    SQL query that ran to produce the outputs.
    """
    PARENT_CONNECTION_PROCESS_QUALIFIED_NAME: ClassVar[KeywordField] = KeywordField(
        "parentConnectionProcessQualifiedName", "parentConnectionProcessQualifiedName"
    )
    """
    No description is provided for this field. Judging by its name it holds the
    qualified name(s) of the parent connection process(es) - TODO confirm.
    """
    AST: ClassVar[TextField] = TextField("ast", "ast")
    """
    Parsed AST of the code or SQL statements that describe the logic of this process.
    """
    ADDITIONAL_ETL_CONTEXT: ClassVar[TextField] = TextField(
        "additionalEtlContext", "additionalEtlContext"
    )
    """
    Additional Context of the ETL pipeline/notebook which creates the process.
    """
    AI_DATASET_TYPE: ClassVar[KeywordField] = KeywordField(
        "aiDatasetType", "aiDatasetType"
    )
    """
    Dataset type for AI Model - dataset process.
    """
    FLOW_STARTED_AT: ClassVar[NumericField] = NumericField(
        "flowStartedAt", "flowStartedAt"
    )
    """
    Date and time at which this point in the data processing or orchestration started.
    """
    FLOW_FINISHED_AT: ClassVar[NumericField] = NumericField(
        "flowFinishedAt", "flowFinishedAt"
    )
    """
    Date and time at which this point in the data processing or orchestration finished.
    """
    FLOW_STATUS: ClassVar[KeywordField] = KeywordField("flowStatus", "flowStatus")
    """
    Overall status of this point in the data processing or orchestration.
    """
    FLOW_SCHEDULE: ClassVar[KeywordField] = KeywordField(
        "flowSchedule", "flowSchedule"
    )
    """
    Schedule for this point in the data processing or orchestration.
    """
    FLOW_PROJECT_NAME: ClassVar[KeywordTextField] = KeywordTextField(
        "flowProjectName", "flowProjectName.keyword", "flowProjectName"
    )
    """
    Simple name of the project in which this asset is contained.
    """
    FLOW_PROJECT_QUALIFIED_NAME: ClassVar[KeywordField] = KeywordField(
        "flowProjectQualifiedName", "flowProjectQualifiedName"
    )
    """
    Unique name of the project in which this asset is contained.
    """
    FLOW_FOLDER_NAME: ClassVar[KeywordTextField] = KeywordTextField(
        "flowFolderName", "flowFolderName.keyword", "flowFolderName"
    )
    """
    Simple name of the folder in which this asset is contained.
    """
    FLOW_FOLDER_QUALIFIED_NAME: ClassVar[KeywordField] = KeywordField(
        "flowFolderQualifiedName", "flowFolderQualifiedName"
    )
    """
    Unique name of the folder in which this asset is contained.
    """
    FLOW_REUSABLE_UNIT_NAME: ClassVar[KeywordTextField] = KeywordTextField(
        "flowReusableUnitName", "flowReusableUnitName.keyword", "flowReusableUnitName"
    )
    """
    Simple name of the reusable grouping of operations in which this ephemeral data is contained.
    """
    FLOW_REUSABLE_UNIT_QUALIFIED_NAME: ClassVar[KeywordField] = KeywordField(
        "flowReusableUnitQualifiedName", "flowReusableUnitQualifiedName"
    )
    """
    Unique name of the reusable grouping of operations in which this ephemeral data is contained.
    """
    FLOW_ID: ClassVar[KeywordField] = KeywordField("flowId", "flowId")
    """
    Unique ID for this flow asset, which will remain constant throughout the lifecycle of the asset.
    """
    FLOW_RUN_ID: ClassVar[KeywordField] = KeywordField("flowRunId", "flowRunId")
    """
    Unique ID of the flow run, which could change on subsequent runs of the same flow.
    """
    FLOW_ERROR_MESSAGE: ClassVar[KeywordField] = KeywordField(
        "flowErrorMessage", "flowErrorMessage"
    )
    """
    Optional error message of the flow run.
    """

    # Names that __setattr__ hands to the property setters below.
    _convenience_properties: ClassVar[List[str]] = [
        "inputs",
        "outputs",
        "code",
        "sql",
        "parent_connection_process_qualified_name",
        "ast",
        "additional_etl_context",
        "ai_dataset_type",
        "flow_started_at",
        "flow_finished_at",
        "flow_status",
        "flow_schedule",
        "flow_project_name",
        "flow_project_qualified_name",
        "flow_folder_name",
        "flow_folder_qualified_name",
        "flow_reusable_unit_name",
        "flow_reusable_unit_qualified_name",
        "flow_id",
        "flow_run_id",
        "flow_error_message",
    ]

    # ------------------------------------------------------------------
    # Convenience properties.
    #
    # Every getter yields None when ``attributes`` is None and otherwise the
    # like-named value on ``attributes``. Every setter first creates an empty
    # ``Attributes`` instance if none exists, then stores the value on it.
    # ------------------------------------------------------------------
    @property
    def inputs(self) -> Optional[List[Catalog]]:
        """Value of ``attributes.inputs``, or ``None`` without attributes."""
        if self.attributes is None:
            return None
        return self.attributes.inputs

    @inputs.setter
    def inputs(self, inputs: Optional[List[Catalog]]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.inputs = inputs

    @property
    def outputs(self) -> Optional[List[Catalog]]:
        """Value of ``attributes.outputs``, or ``None`` without attributes."""
        if self.attributes is None:
            return None
        return self.attributes.outputs

    @outputs.setter
    def outputs(self, outputs: Optional[List[Catalog]]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.outputs = outputs

    @property
    def code(self) -> Optional[str]:
        """Code that ran within the process."""
        if self.attributes is None:
            return None
        return self.attributes.code

    @code.setter
    def code(self, code: Optional[str]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.code = code

    @property
    def sql(self) -> Optional[str]:
        """SQL query that ran to produce the outputs."""
        if self.attributes is None:
            return None
        return self.attributes.sql

    @sql.setter
    def sql(self, sql: Optional[str]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.sql = sql

    @property
    def parent_connection_process_qualified_name(self) -> Optional[Set[str]]:
        """Value of ``attributes.parent_connection_process_qualified_name``."""
        if self.attributes is None:
            return None
        return self.attributes.parent_connection_process_qualified_name

    @parent_connection_process_qualified_name.setter
    def parent_connection_process_qualified_name(
        self, parent_connection_process_qualified_name: Optional[Set[str]]
    ):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.parent_connection_process_qualified_name = (
            parent_connection_process_qualified_name
        )

    @property
    def ast(self) -> Optional[str]:
        """Parsed AST of the code or SQL statements describing this process."""
        if self.attributes is None:
            return None
        return self.attributes.ast

    @ast.setter
    def ast(self, ast: Optional[str]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.ast = ast

    @property
    def additional_etl_context(self) -> Optional[str]:
        """Additional context of the ETL pipeline/notebook creating the process."""
        if self.attributes is None:
            return None
        return self.attributes.additional_etl_context

    @additional_etl_context.setter
    def additional_etl_context(self, additional_etl_context: Optional[str]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.additional_etl_context = additional_etl_context

    @property
    def ai_dataset_type(self) -> Optional[AIDatasetType]:
        """Dataset type for AI Model - dataset process."""
        if self.attributes is None:
            return None
        return self.attributes.ai_dataset_type

    @ai_dataset_type.setter
    def ai_dataset_type(self, ai_dataset_type: Optional[AIDatasetType]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.ai_dataset_type = ai_dataset_type

    @property
    def flow_started_at(self) -> Optional[datetime]:
        """Date and time at which this point in the processing started."""
        if self.attributes is None:
            return None
        return self.attributes.flow_started_at

    @flow_started_at.setter
    def flow_started_at(self, flow_started_at: Optional[datetime]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.flow_started_at = flow_started_at

    @property
    def flow_finished_at(self) -> Optional[datetime]:
        """Date and time at which this point in the processing finished."""
        if self.attributes is None:
            return None
        return self.attributes.flow_finished_at

    @flow_finished_at.setter
    def flow_finished_at(self, flow_finished_at: Optional[datetime]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.flow_finished_at = flow_finished_at

    @property
    def flow_status(self) -> Optional[str]:
        """Overall status of this point in the processing or orchestration."""
        if self.attributes is None:
            return None
        return self.attributes.flow_status

    @flow_status.setter
    def flow_status(self, flow_status: Optional[str]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.flow_status = flow_status

    @property
    def flow_schedule(self) -> Optional[str]:
        """Schedule for this point in the processing or orchestration."""
        if self.attributes is None:
            return None
        return self.attributes.flow_schedule

    @flow_schedule.setter
    def flow_schedule(self, flow_schedule: Optional[str]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.flow_schedule = flow_schedule

    @property
    def flow_project_name(self) -> Optional[str]:
        """Simple name of the project in which this asset is contained."""
        if self.attributes is None:
            return None
        return self.attributes.flow_project_name

    @flow_project_name.setter
    def flow_project_name(self, flow_project_name: Optional[str]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.flow_project_name = flow_project_name

    @property
    def flow_project_qualified_name(self) -> Optional[str]:
        """Unique name of the project in which this asset is contained."""
        if self.attributes is None:
            return None
        return self.attributes.flow_project_qualified_name

    @flow_project_qualified_name.setter
    def flow_project_qualified_name(self, flow_project_qualified_name: Optional[str]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.flow_project_qualified_name = flow_project_qualified_name

    @property
    def flow_folder_name(self) -> Optional[str]:
        """Simple name of the folder in which this asset is contained."""
        if self.attributes is None:
            return None
        return self.attributes.flow_folder_name

    @flow_folder_name.setter
    def flow_folder_name(self, flow_folder_name: Optional[str]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.flow_folder_name = flow_folder_name

    @property
    def flow_folder_qualified_name(self) -> Optional[str]:
        """Unique name of the folder in which this asset is contained."""
        if self.attributes is None:
            return None
        return self.attributes.flow_folder_qualified_name

    @flow_folder_qualified_name.setter
    def flow_folder_qualified_name(self, flow_folder_qualified_name: Optional[str]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.flow_folder_qualified_name = flow_folder_qualified_name

    @property
    def flow_reusable_unit_name(self) -> Optional[str]:
        """Simple name of the reusable grouping of operations for this data."""
        if self.attributes is None:
            return None
        return self.attributes.flow_reusable_unit_name

    @flow_reusable_unit_name.setter
    def flow_reusable_unit_name(self, flow_reusable_unit_name: Optional[str]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.flow_reusable_unit_name = flow_reusable_unit_name

    @property
    def flow_reusable_unit_qualified_name(self) -> Optional[str]:
        """Unique name of the reusable grouping of operations for this data."""
        if self.attributes is None:
            return None
        return self.attributes.flow_reusable_unit_qualified_name

    @flow_reusable_unit_qualified_name.setter
    def flow_reusable_unit_qualified_name(
        self, flow_reusable_unit_qualified_name: Optional[str]
    ):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.flow_reusable_unit_qualified_name = (
            flow_reusable_unit_qualified_name
        )

    @property
    def flow_id(self) -> Optional[str]:
        """Unique ID for this flow asset, constant across its lifecycle."""
        if self.attributes is None:
            return None
        return self.attributes.flow_id

    @flow_id.setter
    def flow_id(self, flow_id: Optional[str]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.flow_id = flow_id

    @property
    def flow_run_id(self) -> Optional[str]:
        """Unique ID of the flow run, which could change on subsequent runs."""
        if self.attributes is None:
            return None
        return self.attributes.flow_run_id

    @flow_run_id.setter
    def flow_run_id(self, flow_run_id: Optional[str]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.flow_run_id = flow_run_id

    @property
    def flow_error_message(self) -> Optional[str]:
        """Optional error message of the flow run."""
        if self.attributes is None:
            return None
        return self.attributes.flow_error_message

    @flow_error_message.setter
    def flow_error_message(self, flow_error_message: Optional[str]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.flow_error_message = flow_error_message

    # Nested model holding the actual attribute values; all fields are
    # optional and default to None.
    class Attributes(ColumnProcess.Attributes):
        inputs: Optional[List[Catalog]] = Field(default=None, description="")
        outputs: Optional[List[Catalog]] = Field(default=None, description="")
        code: Optional[str] = Field(default=None, description="")
        sql: Optional[str] = Field(default=None, description="")
        parent_connection_process_qualified_name: Optional[Set[str]] = Field(
            default=None, description=""
        )
        ast: Optional[str] = Field(default=None, description="")
        additional_etl_context: Optional[str] = Field(default=None, description="")
        ai_dataset_type: Optional[AIDatasetType] = Field(default=None, description="")
        flow_started_at: Optional[datetime] = Field(default=None, description="")
        flow_finished_at: Optional[datetime] = Field(default=None, description="")
        flow_status: Optional[str] = Field(default=None, description="")
        flow_schedule: Optional[str] = Field(default=None, description="")
        flow_project_name: Optional[str] = Field(default=None, description="")
        flow_project_qualified_name: Optional[str] = Field(
            default=None, description=""
        )
        flow_folder_name: Optional[str] = Field(default=None, description="")
        flow_folder_qualified_name: Optional[str] = Field(
            default=None, description=""
        )
        flow_reusable_unit_name: Optional[str] = Field(default=None, description="")
        flow_reusable_unit_qualified_name: Optional[str] = Field(
            default=None, description=""
        )
        flow_id: Optional[str] = Field(default=None, description="")
        flow_run_id: Optional[str] = Field(default=None, description="")
        flow_error_message: Optional[str] = Field(default=None, description="")

    # The lambda defers the lookup of FlowFieldOperation.Attributes until an
    # instance is created, when the outer class name is bound.
    attributes: FlowFieldOperation.Attributes = Field(
        default_factory=lambda: FlowFieldOperation.Attributes(),
        description=(
            "Map of attributes in the instance and their values. "
            "The specific keys of this map will vary by type, "
            "so are described in the sub-types of this schema."
        ),
    )
from .catalog import Catalog # noqa: E402, F401