Source code for pyatlan.model.assets.core.process

# SPDX-License-Identifier: Apache-2.0
# Copyright 2025 Atlan Pte. Ltd.


from __future__ import annotations

import hashlib
from io import StringIO
from typing import ClassVar, List, Optional, Set
from warnings import warn

from pydantic.v1 import Field, validator

from pyatlan.model.enums import AIDatasetType
from pyatlan.model.fields.atlan_fields import KeywordField, RelationField, TextField
from pyatlan.utils import init_guid, validate_required_fields

from .asset import Asset


[docs] class Process(Asset, type_name="Process"): """Description""" @classmethod @init_guid def creator( cls, name: str, connection_qualified_name: str, inputs: List["Catalog"], outputs: List["Catalog"], process_id: Optional[str] = None, parent: Optional[Process] = None, ) -> Process: return Process( attributes=Process.Attributes.create( name=name, connection_qualified_name=connection_qualified_name, process_id=process_id, inputs=inputs, outputs=outputs, parent=parent, ) ) @classmethod @init_guid def create( cls, name: str, connection_qualified_name: str, inputs: List["Catalog"], outputs: List["Catalog"], process_id: Optional[str] = None, parent: Optional[Process] = None, ) -> Process: warn( ( "This method is deprecated, please use 'creator' " "instead, which offers identical functionality." ), DeprecationWarning, stacklevel=2, ) return cls.creator( name=name, connection_qualified_name=connection_qualified_name, inputs=inputs, outputs=outputs, process_id=process_id, parent=parent, ) type_name: str = Field(default="Process", allow_mutation=False) @validator("type_name") def validate_type_name(cls, v): if v != "Process": raise ValueError("must be Process") return v def __setattr__(self, name, value): if name in Process._convenience_properties: return object.__setattr__(self, name, value) super().__setattr__(name, value) CODE: ClassVar[TextField] = TextField("code", "code") """ Code that ran within the process. """ SQL: ClassVar[TextField] = TextField("sql", "sql") """ SQL query that ran to produce the outputs. """ PARENT_CONNECTION_PROCESS_QUALIFIED_NAME: ClassVar[KeywordField] = KeywordField( "parentConnectionProcessQualifiedName", "parentConnectionProcessQualifiedName" ) """ """ AST: ClassVar[TextField] = TextField("ast", "ast") """ Parsed AST of the code or SQL statements that describe the logic of this process. """ ADDITIONAL_ETL_CONTEXT: ClassVar[TextField] = TextField( "additionalEtlContext", "additionalEtlContext" ) """ Additional Context of the ETL pipeline/notebook which creates the process. """ AI_DATASET_TYPE: ClassVar[KeywordField] = KeywordField( "aiDatasetType", "aiDatasetType" ) """ Dataset type for AI Model - dataset process. """ FLOW_ORCHESTRATED_BY: ClassVar[RelationField] = RelationField("flowOrchestratedBy") """ TBC """ ADF_ACTIVITY: ClassVar[RelationField] = RelationField("adfActivity") """ TBC """ SPARK_JOBS: ClassVar[RelationField] = RelationField("sparkJobs") """ TBC """ MATILLION_COMPONENT: ClassVar[RelationField] = RelationField("matillionComponent") """ TBC """ AIRFLOW_TASKS: ClassVar[RelationField] = RelationField("airflowTasks") """ TBC """ FIVETRAN_CONNECTOR: ClassVar[RelationField] = RelationField("fivetranConnector") """ TBC """ POWER_BI_DATAFLOW: ClassVar[RelationField] = RelationField("powerBIDataflow") """ TBC """ COLUMN_PROCESSES: ClassVar[RelationField] = RelationField("columnProcesses") """ TBC """ _convenience_properties: ClassVar[List[str]] = [ "inputs", "outputs", "code", "sql", "parent_connection_process_qualified_name", "ast", "additional_etl_context", "ai_dataset_type", "flow_orchestrated_by", "adf_activity", "spark_jobs", "matillion_component", "airflow_tasks", "fivetran_connector", "power_b_i_dataflow", "column_processes", ] @property def inputs(self) -> Optional[List[Catalog]]: return None if self.attributes is None else self.attributes.inputs @inputs.setter def inputs(self, inputs: Optional[List[Catalog]]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.inputs = inputs @property def outputs(self) -> Optional[List[Catalog]]: return None if self.attributes is None else self.attributes.outputs @outputs.setter def outputs(self, outputs: Optional[List[Catalog]]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.outputs = outputs @property def code(self) -> Optional[str]: return None if self.attributes is None else self.attributes.code @code.setter def code(self, code: Optional[str]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.code = code @property def sql(self) -> Optional[str]: return None if self.attributes is None else self.attributes.sql @sql.setter def sql(self, sql: Optional[str]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.sql = sql @property def parent_connection_process_qualified_name(self) -> Optional[Set[str]]: return ( None if self.attributes is None else self.attributes.parent_connection_process_qualified_name ) @parent_connection_process_qualified_name.setter def parent_connection_process_qualified_name( self, parent_connection_process_qualified_name: Optional[Set[str]] ): if self.attributes is None: self.attributes = self.Attributes() self.attributes.parent_connection_process_qualified_name = ( parent_connection_process_qualified_name ) @property def ast(self) -> Optional[str]: return None if self.attributes is None else self.attributes.ast @ast.setter def ast(self, ast: Optional[str]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.ast = ast @property def additional_etl_context(self) -> Optional[str]: return ( None if self.attributes is None else self.attributes.additional_etl_context ) @additional_etl_context.setter def additional_etl_context(self, additional_etl_context: Optional[str]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.additional_etl_context = additional_etl_context @property def ai_dataset_type(self) -> Optional[AIDatasetType]: return None if self.attributes is None else self.attributes.ai_dataset_type @ai_dataset_type.setter def ai_dataset_type(self, ai_dataset_type: Optional[AIDatasetType]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.ai_dataset_type = ai_dataset_type @property def flow_orchestrated_by(self) -> Optional[FlowControlOperation]: return None if self.attributes is None else self.attributes.flow_orchestrated_by @flow_orchestrated_by.setter def flow_orchestrated_by( self, flow_orchestrated_by: Optional[FlowControlOperation] ): if self.attributes is None: self.attributes = self.Attributes() self.attributes.flow_orchestrated_by = flow_orchestrated_by @property def adf_activity(self) -> Optional[AdfActivity]: return None if self.attributes is None else self.attributes.adf_activity @adf_activity.setter def adf_activity(self, adf_activity: Optional[AdfActivity]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.adf_activity = adf_activity @property def spark_jobs(self) -> Optional[List[SparkJob]]: return None if self.attributes is None else self.attributes.spark_jobs @spark_jobs.setter def spark_jobs(self, spark_jobs: Optional[List[SparkJob]]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.spark_jobs = spark_jobs @property def matillion_component(self) -> Optional[MatillionComponent]: return None if self.attributes is None else self.attributes.matillion_component @matillion_component.setter def matillion_component(self, matillion_component: Optional[MatillionComponent]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.matillion_component = matillion_component @property def airflow_tasks(self) -> Optional[List[AirflowTask]]: return None if self.attributes is None else self.attributes.airflow_tasks @airflow_tasks.setter def airflow_tasks(self, airflow_tasks: Optional[List[AirflowTask]]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.airflow_tasks = airflow_tasks @property def fivetran_connector(self) -> Optional[FivetranConnector]: return None if self.attributes is None else self.attributes.fivetran_connector @fivetran_connector.setter def fivetran_connector(self, fivetran_connector: Optional[FivetranConnector]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.fivetran_connector = fivetran_connector @property def power_b_i_dataflow(self) -> Optional[PowerBIDataflow]: return None if self.attributes is None else self.attributes.power_b_i_dataflow @power_b_i_dataflow.setter def power_b_i_dataflow(self, power_b_i_dataflow: Optional[PowerBIDataflow]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.power_b_i_dataflow = power_b_i_dataflow @property def column_processes(self) -> Optional[List[ColumnProcess]]: return None if self.attributes is None else self.attributes.column_processes @column_processes.setter def column_processes(self, column_processes: Optional[List[ColumnProcess]]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.column_processes = column_processes class Attributes(Asset.Attributes): inputs: Optional[List[Catalog]] = Field(default=None, description="") outputs: Optional[List[Catalog]] = Field(default=None, description="") code: Optional[str] = Field(default=None, description="") sql: Optional[str] = Field(default=None, description="") parent_connection_process_qualified_name: Optional[Set[str]] = Field( default=None, description="" ) ast: Optional[str] = Field(default=None, description="") additional_etl_context: Optional[str] = Field(default=None, description="") ai_dataset_type: Optional[AIDatasetType] = Field(default=None, description="") flow_orchestrated_by: Optional[FlowControlOperation] = Field( default=None, description="" ) # relationship adf_activity: Optional[AdfActivity] = Field( default=None, description="" ) # relationship spark_jobs: Optional[List[SparkJob]] = Field( default=None, description="" ) # relationship matillion_component: Optional[MatillionComponent] = Field( default=None, description="" ) # relationship airflow_tasks: Optional[List[AirflowTask]] = Field( default=None, description="" ) # relationship fivetran_connector: Optional[FivetranConnector] = Field( default=None, description="" ) # relationship power_b_i_dataflow: Optional[PowerBIDataflow] = Field( default=None, description="" ) # relationship column_processes: Optional[List[ColumnProcess]] = Field( default=None, description="" ) # relationship @staticmethod def generate_qualified_name( name: str, connection_qualified_name: str, inputs: List["Catalog"], outputs: List["Catalog"], parent: Optional["Process"] = None, process_id: Optional[str] = None, ) -> str: def append_relationship(output: StringIO, relationship: Asset): if relationship.guid: output.write(relationship.guid) def append_relationships(output: StringIO, relationships: List["Catalog"]): for catalog in relationships: append_relationship(output, catalog) validate_required_fields( ["name", "connection_qualified_name", "inputs", "outputs"], [name, connection_qualified_name, inputs, outputs], ) if process_id and process_id.strip(): return f"{connection_qualified_name}/{process_id}" buffer = StringIO() buffer.write(name) buffer.write(connection_qualified_name) if parent: append_relationship(buffer, parent) append_relationships(buffer, inputs) append_relationships(buffer, outputs) ret_value = hashlib.md5( # noqa: S303, S324 buffer.getvalue().encode() ).hexdigest() buffer.close() return f"{connection_qualified_name}/{ret_value}" @classmethod @init_guid def create( cls, name: str, connection_qualified_name: str, inputs: List["Catalog"], outputs: List["Catalog"], process_id: Optional[str] = None, parent: Optional[Process] = None, ) -> Process.Attributes: qualified_name = Process.Attributes.generate_qualified_name( name=name, connection_qualified_name=connection_qualified_name, process_id=process_id, inputs=inputs, outputs=outputs, parent=parent, ) connector_name = connection_qualified_name.split("/")[1] return Process.Attributes( name=name, qualified_name=qualified_name, connector_name=connector_name, connection_qualified_name=connection_qualified_name, inputs=inputs, outputs=outputs, ) attributes: Process.Attributes = Field( default_factory=lambda: Process.Attributes(), description=( "Map of attributes in the instance and their values. " "The specific keys of this map will vary by type, " "so are described in the sub-types of this schema." ), )
from .adf_activity import AdfActivity # noqa: E402, F401 from .airflow_task import AirflowTask # noqa: E402, F401 from .catalog import Catalog # noqa: E402, F401 from .column_process import ColumnProcess # noqa: E402, F401 from .fivetran_connector import FivetranConnector # noqa: E402, F401 from .flow_control_operation import FlowControlOperation # noqa: E402, F401 from .matillion_component import MatillionComponent # noqa: E402, F401 from .power_b_i_dataflow import PowerBIDataflow # noqa: E402, F401 from .spark_job import SparkJob # noqa: E402, F401