# SPDX-License-Identifier: Apache-2.0
# Copyright 2025 Atlan Pte. Ltd.
from __future__ import annotations

import hashlib
from io import StringIO
from typing import TYPE_CHECKING, ClassVar, List, Optional, Set
from warnings import warn

from pydantic.v1 import Field, validator

from pyatlan.model.enums import AIDatasetType
from pyatlan.model.fields.atlan_fields import KeywordField, RelationField, TextField
from pyatlan.utils import init_guid, validate_required_fields

from .asset import Asset

if TYPE_CHECKING:
    from pyatlan.model.assets import (
        BigqueryRoutine,
        FabricActivity,
        Function,
        Procedure,
    )

class Process(Asset, type_name="Process"):
    """Description"""

    @classmethod
    @init_guid
    def creator(
        cls,
        name: str,
        connection_qualified_name: str,
        inputs: List["Catalog"],
        outputs: List["Catalog"],
        process_id: Optional[str] = None,
        parent: Optional[Process] = None,
        extra_hash_params: Optional[Set[str]] = None,
    ) -> Process:
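        """
        Builds the minimal object necessary to create a lineage process.

        (Parameter descriptions below are inferred from this module's own
        ``generate_qualified_name`` logic, not from official documentation.)

        :param name: human-readable name of the process
        :param connection_qualified_name: unique name of the connection in which the process ran
        :param inputs: assets from which the process reads
        :param outputs: assets that the process produces
        :param process_id: optional tool-specific identifier; when provided, it is
            used verbatim in the qualified name instead of a content hash
        :param parent: optional parent process in which this sub-process ran
        :param extra_hash_params: optional extra values mixed into the hash, to
            disambiguate processes that share the same name, connection, inputs and outputs
        :returns: the minimal request necessary to create the process
        """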
        return Process(
            attributes=Process.Attributes.create(
                name=name,
                connection_qualified_name=connection_qualified_name,
                process_id=process_id,
                inputs=inputs,
                outputs=outputs,
                parent=parent,
                extra_hash_params=extra_hash_params,
            )
        )

    @classmethod
    @init_guid
    def create(
        cls,
        name: str,
        connection_qualified_name: str,
        inputs: List["Catalog"],
        outputs: List["Catalog"],
        process_id: Optional[str] = None,
        parent: Optional[Process] = None,
        extra_hash_params: Optional[Set[str]] = None,
    ) -> Process:
        warn(
            (
                "This method is deprecated, please use 'creator' "
                "instead, which offers identical functionality."
            ),
            DeprecationWarning,
            stacklevel=2,
        )
        return cls.creator(
            name=name,
            connection_qualified_name=connection_qualified_name,
            inputs=inputs,
            outputs=outputs,
            process_id=process_id,
            parent=parent,
            extra_hash_params=extra_hash_params,
        )

    type_name: str = Field(default="Process", allow_mutation=False)

    @validator("type_name")
    def validate_type_name(cls, v):
        if v != "Process":
            raise ValueError("must be Process")
        return v

    def __setattr__(self, name, value):
        # Convenience properties are routed through object.__setattr__ so that
        # their class-level property setters run (populating ``attributes``)
        # instead of pydantic's own __setattr__.
        if name in Process._convenience_properties:
            return object.__setattr__(self, name, value)
        super().__setattr__(name, value)

    CODE: ClassVar[TextField] = TextField("code", "code")
    """
    Code that ran within the process.
    """
    SQL: ClassVar[TextField] = TextField("sql", "sql")
    """
    SQL query that ran to produce the outputs.
    """
    PARENT_CONNECTION_PROCESS_QUALIFIED_NAME: ClassVar[KeywordField] = KeywordField(
        "parentConnectionProcessQualifiedName", "parentConnectionProcessQualifiedName"
    )
    """
    Unique name(s) of the connection-level parent process(es) of this process.
    """
    AST: ClassVar[TextField] = TextField("ast", "ast")
    """
    Parsed AST of the code or SQL statements that describe the logic of this process.
    """
    ADDITIONAL_ETL_CONTEXT: ClassVar[TextField] = TextField(
        "additionalEtlContext", "additionalEtlContext"
    )
    """
    Additional context of the ETL pipeline/notebook that creates the process.
    """
    AI_DATASET_TYPE: ClassVar[KeywordField] = KeywordField(
        "aiDatasetType", "aiDatasetType"
    )
    """
    Dataset type for AI Model - dataset process.
    """

    FLOW_ORCHESTRATED_BY: ClassVar[RelationField] = RelationField("flowOrchestratedBy")
    """
    TBC
    """
    SQL_PROCEDURES: ClassVar[RelationField] = RelationField("sqlProcedures")
    """
    TBC
    """
    FABRIC_ACTIVITIES: ClassVar[RelationField] = RelationField("fabricActivities")
    """
    TBC
    """
    ADF_ACTIVITY: ClassVar[RelationField] = RelationField("adfActivity")
    """
    TBC
    """
    BIGQUERY_ROUTINES: ClassVar[RelationField] = RelationField("bigqueryRoutines")
    """
    TBC
    """
    SPARK_JOBS: ClassVar[RelationField] = RelationField("sparkJobs")
    """
    TBC
    """
    SQL_FUNCTIONS: ClassVar[RelationField] = RelationField("sqlFunctions")
    """
    TBC
    """
    MATILLION_COMPONENT: ClassVar[RelationField] = RelationField("matillionComponent")
    """
    TBC
    """
    AIRFLOW_TASKS: ClassVar[RelationField] = RelationField("airflowTasks")
    """
    TBC
    """
    FIVETRAN_CONNECTOR: ClassVar[RelationField] = RelationField("fivetranConnector")
    """
    TBC
    """
    POWER_BI_DATAFLOW: ClassVar[RelationField] = RelationField("powerBIDataflow")
    """
    TBC
    """
    COLUMN_PROCESSES: ClassVar[RelationField] = RelationField("columnProcesses")
    """
    TBC
    """

    _convenience_properties: ClassVar[List[str]] = [
        "inputs",
        "outputs",
        "code",
        "sql",
        "parent_connection_process_qualified_name",
        "ast",
        "additional_etl_context",
        "ai_dataset_type",
        "flow_orchestrated_by",
        "sql_procedures",
        "fabric_activities",
        "adf_activity",
        "bigquery_routines",
        "spark_jobs",
        "sql_functions",
        "matillion_component",
        "airflow_tasks",
        "fivetran_connector",
        "power_b_i_dataflow",
        "column_processes",
    ]
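    # Note: these snake_case names are convenience views over the camelCase
    # Atlan attributes declared above; assignments to any of them are
    # intercepted by __setattr__ and stored on ``attributes``.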

    @property
    def inputs(self) -> Optional[List[Catalog]]:
        return None if self.attributes is None else self.attributes.inputs

    @inputs.setter
    def inputs(self, inputs: Optional[List[Catalog]]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.inputs = inputs

    @property
    def outputs(self) -> Optional[List[Catalog]]:
        return None if self.attributes is None else self.attributes.outputs

    @outputs.setter
    def outputs(self, outputs: Optional[List[Catalog]]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.outputs = outputs

    @property
    def code(self) -> Optional[str]:
        return None if self.attributes is None else self.attributes.code

    @code.setter
    def code(self, code: Optional[str]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.code = code

    @property
    def sql(self) -> Optional[str]:
        return None if self.attributes is None else self.attributes.sql

    @sql.setter
    def sql(self, sql: Optional[str]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.sql = sql

    @property
    def parent_connection_process_qualified_name(self) -> Optional[Set[str]]:
        return (
            None
            if self.attributes is None
            else self.attributes.parent_connection_process_qualified_name
        )

    @parent_connection_process_qualified_name.setter
    def parent_connection_process_qualified_name(
        self, parent_connection_process_qualified_name: Optional[Set[str]]
    ):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.parent_connection_process_qualified_name = (
            parent_connection_process_qualified_name
        )

    @property
    def ast(self) -> Optional[str]:
        return None if self.attributes is None else self.attributes.ast

    @ast.setter
    def ast(self, ast: Optional[str]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.ast = ast

    @property
    def additional_etl_context(self) -> Optional[str]:
        return (
            None if self.attributes is None else self.attributes.additional_etl_context
        )

    @additional_etl_context.setter
    def additional_etl_context(self, additional_etl_context: Optional[str]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.additional_etl_context = additional_etl_context

    @property
    def ai_dataset_type(self) -> Optional[AIDatasetType]:
        return None if self.attributes is None else self.attributes.ai_dataset_type

    @ai_dataset_type.setter
    def ai_dataset_type(self, ai_dataset_type: Optional[AIDatasetType]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.ai_dataset_type = ai_dataset_type

    @property
    def flow_orchestrated_by(self) -> Optional[FlowControlOperation]:
        return None if self.attributes is None else self.attributes.flow_orchestrated_by

    @flow_orchestrated_by.setter
    def flow_orchestrated_by(
        self, flow_orchestrated_by: Optional[FlowControlOperation]
    ):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.flow_orchestrated_by = flow_orchestrated_by

    @property
    def sql_procedures(self) -> Optional[List[Procedure]]:
        return None if self.attributes is None else self.attributes.sql_procedures

    @sql_procedures.setter
    def sql_procedures(self, sql_procedures: Optional[List[Procedure]]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.sql_procedures = sql_procedures

    @property
    def fabric_activities(self) -> Optional[List[FabricActivity]]:
        return None if self.attributes is None else self.attributes.fabric_activities

    @fabric_activities.setter
    def fabric_activities(self, fabric_activities: Optional[List[FabricActivity]]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.fabric_activities = fabric_activities

    @property
    def adf_activity(self) -> Optional[AdfActivity]:
        return None if self.attributes is None else self.attributes.adf_activity

    @adf_activity.setter
    def adf_activity(self, adf_activity: Optional[AdfActivity]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.adf_activity = adf_activity

    @property
    def bigquery_routines(self) -> Optional[List[BigqueryRoutine]]:
        return None if self.attributes is None else self.attributes.bigquery_routines

    @bigquery_routines.setter
    def bigquery_routines(self, bigquery_routines: Optional[List[BigqueryRoutine]]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.bigquery_routines = bigquery_routines

    @property
    def spark_jobs(self) -> Optional[List[SparkJob]]:
        return None if self.attributes is None else self.attributes.spark_jobs

    @spark_jobs.setter
    def spark_jobs(self, spark_jobs: Optional[List[SparkJob]]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.spark_jobs = spark_jobs

    @property
    def sql_functions(self) -> Optional[List[Function]]:
        return None if self.attributes is None else self.attributes.sql_functions

    @sql_functions.setter
    def sql_functions(self, sql_functions: Optional[List[Function]]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.sql_functions = sql_functions

    @property
    def matillion_component(self) -> Optional[MatillionComponent]:
        return None if self.attributes is None else self.attributes.matillion_component

    @matillion_component.setter
    def matillion_component(self, matillion_component: Optional[MatillionComponent]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.matillion_component = matillion_component

    @property
    def airflow_tasks(self) -> Optional[List[AirflowTask]]:
        return None if self.attributes is None else self.attributes.airflow_tasks

    @airflow_tasks.setter
    def airflow_tasks(self, airflow_tasks: Optional[List[AirflowTask]]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.airflow_tasks = airflow_tasks

    @property
    def fivetran_connector(self) -> Optional[FivetranConnector]:
        return None if self.attributes is None else self.attributes.fivetran_connector

    @fivetran_connector.setter
    def fivetran_connector(self, fivetran_connector: Optional[FivetranConnector]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.fivetran_connector = fivetran_connector

    @property
    def power_b_i_dataflow(self) -> Optional[PowerBIDataflow]:
        return None if self.attributes is None else self.attributes.power_b_i_dataflow

    @power_b_i_dataflow.setter
    def power_b_i_dataflow(self, power_b_i_dataflow: Optional[PowerBIDataflow]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.power_b_i_dataflow = power_b_i_dataflow

    @property
    def column_processes(self) -> Optional[List[ColumnProcess]]:
        return None if self.attributes is None else self.attributes.column_processes

    @column_processes.setter
    def column_processes(self, column_processes: Optional[List[ColumnProcess]]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.column_processes = column_processes

    class Attributes(Asset.Attributes):
        inputs: Optional[List[Catalog]] = Field(default=None, description="")
        outputs: Optional[List[Catalog]] = Field(default=None, description="")
        code: Optional[str] = Field(default=None, description="")
        sql: Optional[str] = Field(default=None, description="")
        parent_connection_process_qualified_name: Optional[Set[str]] = Field(
            default=None, description=""
        )
        ast: Optional[str] = Field(default=None, description="")
        additional_etl_context: Optional[str] = Field(default=None, description="")
        ai_dataset_type: Optional[AIDatasetType] = Field(default=None, description="")
        flow_orchestrated_by: Optional[FlowControlOperation] = Field(
            default=None, description=""
        )  # relationship
        sql_procedures: Optional[List[Procedure]] = Field(
            default=None, description=""
        )  # relationship
        fabric_activities: Optional[List[FabricActivity]] = Field(
            default=None, description=""
        )  # relationship
        adf_activity: Optional[AdfActivity] = Field(
            default=None, description=""
        )  # relationship
        bigquery_routines: Optional[List[BigqueryRoutine]] = Field(
            default=None, description=""
        )  # relationship
        spark_jobs: Optional[List[SparkJob]] = Field(
            default=None, description=""
        )  # relationship
        sql_functions: Optional[List[Function]] = Field(
            default=None, description=""
        )  # relationship
        matillion_component: Optional[MatillionComponent] = Field(
            default=None, description=""
        )  # relationship
        airflow_tasks: Optional[List[AirflowTask]] = Field(
            default=None, description=""
        )  # relationship
        fivetran_connector: Optional[FivetranConnector] = Field(
            default=None, description=""
        )  # relationship
        power_b_i_dataflow: Optional[PowerBIDataflow] = Field(
            default=None, description=""
        )  # relationship
        column_processes: Optional[List[ColumnProcess]] = Field(
            default=None, description=""
        )  # relationship

        @staticmethod
        def generate_qualified_name(
            name: str,
            connection_qualified_name: str,
            inputs: List["Catalog"],
            outputs: List["Catalog"],
            parent: Optional["Process"] = None,
            process_id: Optional[str] = None,
            extra_hash_params: Optional[Set[str]] = None,
        ) -> str:
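            """
            Derive a deterministic unique name for this process.

            If ``process_id`` is supplied, it is used verbatim and the result is
            ``{connection_qualified_name}/{process_id}``. Otherwise the name,
            connection, parent GUID (if any), input and output GUIDs, and any
            ``extra_hash_params`` are concatenated and MD5-hashed (MD5 serves
            only as a fingerprint here, not as a security measure), so identical
            lineage always yields the same qualified name. For example (with
            hypothetical values), the result might look like
            ``default/snowflake/1234567890/6f1ed002ab5595859014ebf0951522d9``.
            """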
            def append_relationship(output: StringIO, relationship: Asset):
                if relationship.guid:
                    output.write(relationship.guid)

            def append_relationships(output: StringIO, relationships: List["Catalog"]):
                for catalog in relationships:
                    append_relationship(output, catalog)

            validate_required_fields(
                ["name", "connection_qualified_name", "inputs", "outputs"],
                [name, connection_qualified_name, inputs, outputs],
            )
            extra_hash_params = extra_hash_params or set()
            if process_id and process_id.strip():
                return f"{connection_qualified_name}/{process_id}"
            buffer = StringIO()
            buffer.write(name)
            buffer.write(connection_qualified_name)
            if parent:
                append_relationship(buffer, parent)
            append_relationships(buffer, inputs)
            append_relationships(buffer, outputs)
            # Handles the edge case where identical name, connection, inputs and
            # outputs caused hash collisions, resulting in duplicate qualified
            # names and the backend skipping process creation.
            if extra_hash_params:
                for param in extra_hash_params:
                    buffer.write(param)
            # file deepcode ignore InsecureHash: this is not used for generating security keys
            ret_value = hashlib.md5(  # noqa: S303, S324
                buffer.getvalue().encode()
            ).hexdigest()
            buffer.close()
            return f"{connection_qualified_name}/{ret_value}"

        @classmethod
        @init_guid
        def create(
            cls,
            name: str,
            connection_qualified_name: str,
            inputs: List["Catalog"],
            outputs: List["Catalog"],
            process_id: Optional[str] = None,
            parent: Optional[Process] = None,
            extra_hash_params: Optional[Set[str]] = None,
        ) -> Process.Attributes:
            qualified_name = Process.Attributes.generate_qualified_name(
                name=name,
                connection_qualified_name=connection_qualified_name,
                process_id=process_id,
                inputs=inputs,
                outputs=outputs,
                parent=parent,
                extra_hash_params=extra_hash_params,
            )
            connector_name = connection_qualified_name.split("/")[1]
            return Process.Attributes(
                name=name,
                qualified_name=qualified_name,
                connector_name=connector_name,
                connection_qualified_name=connection_qualified_name,
                inputs=inputs,
                outputs=outputs,
            )

    attributes: Process.Attributes = Field(
        default_factory=lambda: Process.Attributes(),
        description=(
            "Map of attributes in the instance and their values. "
            "The specific keys of this map will vary by type, "
            "so are described in the sub-types of this schema."
        ),
    )

from .adf_activity import AdfActivity # noqa: E402, F401
from .airflow_task import AirflowTask # noqa: E402, F401
from .catalog import Catalog # noqa: E402, F401
from .column_process import ColumnProcess # noqa: E402, F401
from .fivetran_connector import FivetranConnector # noqa: E402, F401
from .flow_control_operation import FlowControlOperation # noqa: E402, F401
from .matillion_component import MatillionComponent # noqa: E402, F401
from .power_b_i_dataflow import PowerBIDataflow # noqa: E402, F401
from .spark_job import SparkJob # noqa: E402, F401
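
# Usage sketch: a minimal, hypothetical example of registering lineage with
# Process.creator(). The connection and table qualified names below are
# made-up placeholders; Asset.ref_by_qualified_name() is the standard pyatlan
# way to reference an existing asset without fetching it first.
#
#     from pyatlan.model.assets import Process, Table
#
#     process = Process.creator(
#         name="daily_load",
#         connection_qualified_name="default/snowflake/1234567890",
#         inputs=[Table.ref_by_qualified_name("default/snowflake/1234567890/DB/RAW/ORDERS")],
#         outputs=[Table.ref_by_qualified_name("default/snowflake/1234567890/DB/MART/ORDERS_CLEAN")],
#     )
#
# Without a process_id, process.qualified_name will be the connection name
# followed by an MD5 fingerprint of the name, connection, and input/output GUIDs.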