Source code for pyatlan.model.assets.core.process

# SPDX-License-Identifier: Apache-2.0
# Copyright 2025 Atlan Pte. Ltd.


from __future__ import annotations

import hashlib
from io import StringIO
from typing import TYPE_CHECKING, ClassVar, List, Optional, Set
from warnings import warn

from pydantic.v1 import Field, validator

from pyatlan.model.enums import AIDatasetType
from pyatlan.model.fields.atlan_fields import KeywordField, RelationField, TextField
from pyatlan.utils import init_guid, validate_required_fields

from .asset import Asset

if TYPE_CHECKING:
    from pyatlan.model.assets import (
        BigqueryRoutine,
        FabricActivity,
        Function,
        Procedure,
    )


class Process(Asset, type_name="Process"):
    """Description"""

    @classmethod
    @init_guid
    def creator(
        cls,
        name: str,
        connection_qualified_name: str,
        inputs: List["Catalog"],
        outputs: List["Catalog"],
        process_id: Optional[str] = None,
        parent: Optional[Process] = None,
        extra_hash_params: Optional[Set[str]] = None,
    ) -> Process:
        return Process(
            attributes=Process.Attributes.create(
                name=name,
                connection_qualified_name=connection_qualified_name,
                process_id=process_id,
                inputs=inputs,
                outputs=outputs,
                parent=parent,
                extra_hash_params=extra_hash_params,
            )
        )

    @classmethod
    @init_guid
    def create(
        cls,
        name: str,
        connection_qualified_name: str,
        inputs: List["Catalog"],
        outputs: List["Catalog"],
        process_id: Optional[str] = None,
        parent: Optional[Process] = None,
        extra_hash_params: Optional[Set[str]] = None,
    ) -> Process:
        warn(
            (
                "This method is deprecated, please use 'creator' "
                "instead, which offers identical functionality."
            ),
            DeprecationWarning,
            stacklevel=2,
        )
        return cls.creator(
            name=name,
            connection_qualified_name=connection_qualified_name,
            inputs=inputs,
            outputs=outputs,
            process_id=process_id,
            parent=parent,
            extra_hash_params=extra_hash_params,
        )

    type_name: str = Field(default="Process", allow_mutation=False)

    @validator("type_name")
    def validate_type_name(cls, v):
        if v != "Process":
            raise ValueError("must be Process")
        return v

    def __setattr__(self, name, value):
        if name in Process._convenience_properties:
            return object.__setattr__(self, name, value)
        super().__setattr__(name, value)

    CODE: ClassVar[TextField] = TextField("code", "code")
    """
    Code that ran within the process.
    """
    SQL: ClassVar[TextField] = TextField("sql", "sql")
    """
    SQL query that ran to produce the outputs.
    """
    PARENT_CONNECTION_PROCESS_QUALIFIED_NAME: ClassVar[KeywordField] = KeywordField(
        "parentConnectionProcessQualifiedName", "parentConnectionProcessQualifiedName"
    )
    """

    """
    AST: ClassVar[TextField] = TextField("ast", "ast")
    """
    Parsed AST of the code or SQL statements that describe the logic of this process.
    """
    ADDITIONAL_ETL_CONTEXT: ClassVar[TextField] = TextField(
        "additionalEtlContext", "additionalEtlContext"
    )
    """
    Additional Context of the ETL pipeline/notebook which creates the process.
    """
    AI_DATASET_TYPE: ClassVar[KeywordField] = KeywordField(
        "aiDatasetType", "aiDatasetType"
    )
    """
    Dataset type for AI Model - dataset process.
    """

    FLOW_ORCHESTRATED_BY: ClassVar[RelationField] = RelationField("flowOrchestratedBy")
    """
    TBC
    """
    SQL_PROCEDURES: ClassVar[RelationField] = RelationField("sqlProcedures")
    """
    TBC
    """
    FABRIC_ACTIVITIES: ClassVar[RelationField] = RelationField("fabricActivities")
    """
    TBC
    """
    ADF_ACTIVITY: ClassVar[RelationField] = RelationField("adfActivity")
    """
    TBC
    """
    BIGQUERY_ROUTINES: ClassVar[RelationField] = RelationField("bigqueryRoutines")
    """
    TBC
    """
    SPARK_JOBS: ClassVar[RelationField] = RelationField("sparkJobs")
    """
    TBC
    """
    SQL_FUNCTIONS: ClassVar[RelationField] = RelationField("sqlFunctions")
    """
    TBC
    """
    MATILLION_COMPONENT: ClassVar[RelationField] = RelationField("matillionComponent")
    """
    TBC
    """
    AIRFLOW_TASKS: ClassVar[RelationField] = RelationField("airflowTasks")
    """
    TBC
    """
    FIVETRAN_CONNECTOR: ClassVar[RelationField] = RelationField("fivetranConnector")
    """
    TBC
    """
    POWER_BI_DATAFLOW: ClassVar[RelationField] = RelationField("powerBIDataflow")
    """
    TBC
    """
    COLUMN_PROCESSES: ClassVar[RelationField] = RelationField("columnProcesses")
    """
    TBC
    """

    _convenience_properties: ClassVar[List[str]] = [
        "inputs",
        "outputs",
        "code",
        "sql",
        "parent_connection_process_qualified_name",
        "ast",
        "additional_etl_context",
        "ai_dataset_type",
        "flow_orchestrated_by",
        "sql_procedures",
        "fabric_activities",
        "adf_activity",
        "bigquery_routines",
        "spark_jobs",
        "sql_functions",
        "matillion_component",
        "airflow_tasks",
        "fivetran_connector",
        "power_b_i_dataflow",
        "column_processes",
    ]

    @property
    def inputs(self) -> Optional[List[Catalog]]:
        return None if self.attributes is None else self.attributes.inputs

    @inputs.setter
    def inputs(self, inputs: Optional[List[Catalog]]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.inputs = inputs

    @property
    def outputs(self) -> Optional[List[Catalog]]:
        return None if self.attributes is None else self.attributes.outputs

    @outputs.setter
    def outputs(self, outputs: Optional[List[Catalog]]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.outputs = outputs

    @property
    def code(self) -> Optional[str]:
        return None if self.attributes is None else self.attributes.code

    @code.setter
    def code(self, code: Optional[str]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.code = code

    @property
    def sql(self) -> Optional[str]:
        return None if self.attributes is None else self.attributes.sql

    @sql.setter
    def sql(self, sql: Optional[str]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.sql = sql

    @property
    def parent_connection_process_qualified_name(self) -> Optional[Set[str]]:
        return (
            None
            if self.attributes is None
            else self.attributes.parent_connection_process_qualified_name
        )

    @parent_connection_process_qualified_name.setter
    def parent_connection_process_qualified_name(
        self, parent_connection_process_qualified_name: Optional[Set[str]]
    ):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.parent_connection_process_qualified_name = (
            parent_connection_process_qualified_name
        )

    @property
    def ast(self) -> Optional[str]:
        return None if self.attributes is None else self.attributes.ast

    @ast.setter
    def ast(self, ast: Optional[str]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.ast = ast

    @property
    def additional_etl_context(self) -> Optional[str]:
        return (
            None if self.attributes is None else self.attributes.additional_etl_context
        )

    @additional_etl_context.setter
    def additional_etl_context(self, additional_etl_context: Optional[str]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.additional_etl_context = additional_etl_context

    @property
    def ai_dataset_type(self) -> Optional[AIDatasetType]:
        return None if self.attributes is None else self.attributes.ai_dataset_type

    @ai_dataset_type.setter
    def ai_dataset_type(self, ai_dataset_type: Optional[AIDatasetType]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.ai_dataset_type = ai_dataset_type

    @property
    def flow_orchestrated_by(self) -> Optional[FlowControlOperation]:
        return (
            None if self.attributes is None else self.attributes.flow_orchestrated_by
        )

    @flow_orchestrated_by.setter
    def flow_orchestrated_by(
        self, flow_orchestrated_by: Optional[FlowControlOperation]
    ):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.flow_orchestrated_by = flow_orchestrated_by

    @property
    def sql_procedures(self) -> Optional[List[Procedure]]:
        return None if self.attributes is None else self.attributes.sql_procedures

    @sql_procedures.setter
    def sql_procedures(self, sql_procedures: Optional[List[Procedure]]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.sql_procedures = sql_procedures

    @property
    def fabric_activities(self) -> Optional[List[FabricActivity]]:
        return None if self.attributes is None else self.attributes.fabric_activities

    @fabric_activities.setter
    def fabric_activities(self, fabric_activities: Optional[List[FabricActivity]]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.fabric_activities = fabric_activities

    @property
    def adf_activity(self) -> Optional[AdfActivity]:
        return None if self.attributes is None else self.attributes.adf_activity

    @adf_activity.setter
    def adf_activity(self, adf_activity: Optional[AdfActivity]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.adf_activity = adf_activity

    @property
    def bigquery_routines(self) -> Optional[List[BigqueryRoutine]]:
        return None if self.attributes is None else self.attributes.bigquery_routines

    @bigquery_routines.setter
    def bigquery_routines(self, bigquery_routines: Optional[List[BigqueryRoutine]]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.bigquery_routines = bigquery_routines

    @property
    def spark_jobs(self) -> Optional[List[SparkJob]]:
        return None if self.attributes is None else self.attributes.spark_jobs

    @spark_jobs.setter
    def spark_jobs(self, spark_jobs: Optional[List[SparkJob]]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.spark_jobs = spark_jobs

    @property
    def sql_functions(self) -> Optional[List[Function]]:
        return None if self.attributes is None else self.attributes.sql_functions

    @sql_functions.setter
    def sql_functions(self, sql_functions: Optional[List[Function]]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.sql_functions = sql_functions

    @property
    def matillion_component(self) -> Optional[MatillionComponent]:
        return (
            None if self.attributes is None else self.attributes.matillion_component
        )

    @matillion_component.setter
    def matillion_component(self, matillion_component: Optional[MatillionComponent]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.matillion_component = matillion_component

    @property
    def airflow_tasks(self) -> Optional[List[AirflowTask]]:
        return None if self.attributes is None else self.attributes.airflow_tasks

    @airflow_tasks.setter
    def airflow_tasks(self, airflow_tasks: Optional[List[AirflowTask]]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.airflow_tasks = airflow_tasks

    @property
    def fivetran_connector(self) -> Optional[FivetranConnector]:
        return None if self.attributes is None else self.attributes.fivetran_connector

    @fivetran_connector.setter
    def fivetran_connector(self, fivetran_connector: Optional[FivetranConnector]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.fivetran_connector = fivetran_connector

    @property
    def power_b_i_dataflow(self) -> Optional[PowerBIDataflow]:
        return None if self.attributes is None else self.attributes.power_b_i_dataflow

    @power_b_i_dataflow.setter
    def power_b_i_dataflow(self, power_b_i_dataflow: Optional[PowerBIDataflow]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.power_b_i_dataflow = power_b_i_dataflow

    @property
    def column_processes(self) -> Optional[List[ColumnProcess]]:
        return None if self.attributes is None else self.attributes.column_processes

    @column_processes.setter
    def column_processes(self, column_processes: Optional[List[ColumnProcess]]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.column_processes = column_processes

    class Attributes(Asset.Attributes):
        inputs: Optional[List[Catalog]] = Field(default=None, description="")
        outputs: Optional[List[Catalog]] = Field(default=None, description="")
        code: Optional[str] = Field(default=None, description="")
        sql: Optional[str] = Field(default=None, description="")
        parent_connection_process_qualified_name: Optional[Set[str]] = Field(
            default=None, description=""
        )
        ast: Optional[str] = Field(default=None, description="")
        additional_etl_context: Optional[str] = Field(default=None, description="")
        ai_dataset_type: Optional[AIDatasetType] = Field(default=None, description="")
        flow_orchestrated_by: Optional[FlowControlOperation] = Field(
            default=None, description=""
        )  # relationship
        sql_procedures: Optional[List[Procedure]] = Field(
            default=None, description=""
        )  # relationship
        fabric_activities: Optional[List[FabricActivity]] = Field(
            default=None, description=""
        )  # relationship
        adf_activity: Optional[AdfActivity] = Field(
            default=None, description=""
        )  # relationship
        bigquery_routines: Optional[List[BigqueryRoutine]] = Field(
            default=None, description=""
        )  # relationship
        spark_jobs: Optional[List[SparkJob]] = Field(
            default=None, description=""
        )  # relationship
        sql_functions: Optional[List[Function]] = Field(
            default=None, description=""
        )  # relationship
        matillion_component: Optional[MatillionComponent] = Field(
            default=None, description=""
        )  # relationship
        airflow_tasks: Optional[List[AirflowTask]] = Field(
            default=None, description=""
        )  # relationship
        fivetran_connector: Optional[FivetranConnector] = Field(
            default=None, description=""
        )  # relationship
        power_b_i_dataflow: Optional[PowerBIDataflow] = Field(
            default=None, description=""
        )  # relationship
        column_processes: Optional[List[ColumnProcess]] = Field(
            default=None, description=""
        )  # relationship

        @staticmethod
        def generate_qualified_name(
            name: str,
            connection_qualified_name: str,
            inputs: List["Catalog"],
            outputs: List["Catalog"],
            parent: Optional["Process"] = None,
            process_id: Optional[str] = None,
            extra_hash_params: Optional[Set[str]] = None,
        ) -> str:
            def append_relationship(output: StringIO, relationship: Asset):
                if relationship.guid:
                    output.write(relationship.guid)

            def append_relationships(output: StringIO, relationships: List["Catalog"]):
                for catalog in relationships:
                    append_relationship(output, catalog)

            validate_required_fields(
                ["name", "connection_qualified_name", "inputs", "outputs"],
                [name, connection_qualified_name, inputs, outputs],
            )
            extra_hash_params = extra_hash_params or set()
            if process_id and process_id.strip():
                return f"{connection_qualified_name}/{process_id}"
            buffer = StringIO()
            buffer.write(name)
            buffer.write(connection_qualified_name)
            if parent:
                append_relationship(buffer, parent)
            append_relationships(buffer, inputs)
            append_relationships(buffer, outputs)
            # Handles edge case where identical name, connection, input, and output
            # caused hash collisions, resulting in duplicate qualified names and the
            # backend skipping process creation.
            if extra_hash_params:
                for param in extra_hash_params:
                    buffer.write(param)
            # file deepcode ignore InsecureHash: this is not used for generating security keys
            ret_value = hashlib.md5(  # noqa: S303, S324
                buffer.getvalue().encode()
            ).hexdigest()
            buffer.close()
            return f"{connection_qualified_name}/{ret_value}"

        @classmethod
        @init_guid
        def create(
            cls,
            name: str,
            connection_qualified_name: str,
            inputs: List["Catalog"],
            outputs: List["Catalog"],
            process_id: Optional[str] = None,
            parent: Optional[Process] = None,
            extra_hash_params: Optional[Set[str]] = None,
        ) -> Process.Attributes:
            qualified_name = Process.Attributes.generate_qualified_name(
                name=name,
                connection_qualified_name=connection_qualified_name,
                process_id=process_id,
                inputs=inputs,
                outputs=outputs,
                parent=parent,
                extra_hash_params=extra_hash_params,
            )
            connector_name = connection_qualified_name.split("/")[1]
            return Process.Attributes(
                name=name,
                qualified_name=qualified_name,
                connector_name=connector_name,
                connection_qualified_name=connection_qualified_name,
                inputs=inputs,
                outputs=outputs,
            )

    attributes: Process.Attributes = Field(
        default_factory=lambda: Process.Attributes(),
        description=(
            "Map of attributes in the instance and their values. "
            "The specific keys of this map will vary by type, "
            "so are described in the sub-types of this schema."
        ),
    )
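
# ---------------------------------------------------------------------------
# Illustrative sketch (not part of this module): how the deterministic
# qualified name from Process.Attributes.generate_qualified_name behaves when
# no process_id is supplied. The process name, connection qualified name, and
# the GUIDs of any parent/input/output assets are concatenated and MD5-hashed,
# so the same lineage always maps to the same qualified name. All literal
# values below are made-up examples.
#
#     import hashlib
#
#     connection_qualified_name = "default/snowflake/1234567890"
#     digest = hashlib.md5(
#         (
#             "daily load"                 # process name
#             + connection_qualified_name  # connection qualified name
#             + "guid-of-input-table"      # each input's GUID, in order
#             + "guid-of-output-table"     # each output's GUID, in order
#         ).encode()
#     ).hexdigest()
#     qualified_name = f"{connection_qualified_name}/{digest}"
# ---------------------------------------------------------------------------
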
from .adf_activity import AdfActivity  # noqa: E402, F401
from .airflow_task import AirflowTask  # noqa: E402, F401
from .catalog import Catalog  # noqa: E402, F401
from .column_process import ColumnProcess  # noqa: E402, F401
from .fivetran_connector import FivetranConnector  # noqa: E402, F401
from .flow_control_operation import FlowControlOperation  # noqa: E402, F401
from .matillion_component import MatillionComponent  # noqa: E402, F401
from .power_b_i_dataflow import PowerBIDataflow  # noqa: E402, F401
from .spark_job import SparkJob  # noqa: E402, F401
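
# ---------------------------------------------------------------------------
# Usage sketch (not part of this module): one plausible way to build and save
# a lineage Process with this class. The connection and table qualified names
# are illustrative assumptions; ref_by_qualified_name creates a reference to
# an existing asset rather than a full asset, and AtlanClient() assumes the
# ATLAN_BASE_URL and ATLAN_API_KEY environment variables are set.
#
#     from pyatlan.client.atlan import AtlanClient
#     from pyatlan.model.assets import Process, Table
#
#     client = AtlanClient()
#     process = Process.creator(
#         name="daily load",
#         connection_qualified_name="default/snowflake/1234567890",
#         inputs=[
#             Table.ref_by_qualified_name(
#                 "default/snowflake/1234567890/DB/SCHEMA/SRC_TABLE"
#             )
#         ],
#         outputs=[
#             Table.ref_by_qualified_name(
#                 "default/snowflake/1234567890/DB/SCHEMA/TGT_TABLE"
#             )
#         ],
#     )
#     response = client.asset.save(process)
# ---------------------------------------------------------------------------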