# SPDX-License-Identifier: Apache-2.0
# Copyright 2025 Atlan Pte. Ltd.
from __future__ import annotations
from typing import ClassVar, List, Optional, overload
from pydantic.v1 import Field, validator
from pyatlan.model.enums import AtlanConnectorType
from pyatlan.model.fields.atlan_fields import (
KeywordField,
KeywordTextField,
NumericField,
RelationField,
TextField,
)
from pyatlan.utils import init_guid, validate_required_fields
from .airflow import Airflow
[docs]
class AirflowTask(Airflow):
"""Description"""
@overload
@classmethod
def creator(
cls,
*,
name: str,
airflow_dag_qualified_name: str,
) -> AirflowTask: ...
@overload
@classmethod
def creator(
cls,
*,
name: str,
airflow_dag_qualified_name: str,
connection_qualified_name: str,
) -> AirflowTask: ...
@classmethod
@init_guid
def creator(
cls,
*,
name: str,
airflow_dag_qualified_name: str,
connection_qualified_name: Optional[str] = None,
) -> AirflowTask:
validate_required_fields(
["name", "airflow_dag_qualified_name"],
[name, airflow_dag_qualified_name],
)
attributes = AirflowTask.Attributes.creator(
name=name,
airflow_dag_qualified_name=airflow_dag_qualified_name,
connection_qualified_name=connection_qualified_name,
)
return cls(attributes=attributes)
type_name: str = Field(default="AirflowTask", allow_mutation=False)
@validator("type_name")
def validate_type_name(cls, v):
if v != "AirflowTask":
raise ValueError("must be AirflowTask")
return v
def __setattr__(self, name, value):
if name in AirflowTask._convenience_properties:
return object.__setattr__(self, name, value)
super().__setattr__(name, value)
AIRFLOW_TASK_OPERATOR_CLASS: ClassVar[KeywordTextField] = KeywordTextField(
"airflowTaskOperatorClass",
"airflowTaskOperatorClass.keyword",
"airflowTaskOperatorClass",
)
"""
Class name for the operator this task uses.
"""
AIRFLOW_DAG_NAME: ClassVar[KeywordTextField] = KeywordTextField(
"airflowDagName", "airflowDagName.keyword", "airflowDagName"
)
"""
Simple name of the DAG this task is contained within.
"""
AIRFLOW_DAG_QUALIFIED_NAME: ClassVar[KeywordField] = KeywordField(
"airflowDagQualifiedName", "airflowDagQualifiedName"
)
"""
Unique name of the DAG this task is contained within.
"""
AIRFLOW_TASK_CONNECTION_ID: ClassVar[KeywordTextField] = KeywordTextField(
"airflowTaskConnectionId",
"airflowTaskConnectionId.keyword",
"airflowTaskConnectionId",
)
"""
Identifier for the connection this task accesses.
"""
AIRFLOW_TASK_SQL: ClassVar[TextField] = TextField(
"airflowTaskSql", "airflowTaskSql"
)
"""
SQL code that executes through this task.
"""
AIRFLOW_TASK_RETRY_NUMBER: ClassVar[NumericField] = NumericField(
"airflowTaskRetryNumber", "airflowTaskRetryNumber"
)
"""
Retry count for this task running.
"""
AIRFLOW_TASK_POOL: ClassVar[KeywordField] = KeywordField(
"airflowTaskPool", "airflowTaskPool"
)
"""
Pool on which this run happened.
"""
AIRFLOW_TASK_POOL_SLOTS: ClassVar[NumericField] = NumericField(
"airflowTaskPoolSlots", "airflowTaskPoolSlots"
)
"""
Pool slots used for the run.
"""
AIRFLOW_TASK_QUEUE: ClassVar[KeywordField] = KeywordField(
"airflowTaskQueue", "airflowTaskQueue"
)
"""
Queue on which this run happened.
"""
AIRFLOW_TASK_PRIORITY_WEIGHT: ClassVar[NumericField] = NumericField(
"airflowTaskPriorityWeight", "airflowTaskPriorityWeight"
)
"""
Priority of the run.
"""
AIRFLOW_TASK_TRIGGER_RULE: ClassVar[KeywordField] = KeywordField(
"airflowTaskTriggerRule", "airflowTaskTriggerRule"
)
"""
Trigger for the run.
"""
AIRFLOW_TASK_GROUP_NAME: ClassVar[KeywordField] = KeywordField(
"airflowTaskGroupName", "airflowTaskGroupName"
)
"""
Group name for the task.
"""
OUTPUTS: ClassVar[RelationField] = RelationField("outputs")
"""
TBC
"""
INPUTS: ClassVar[RelationField] = RelationField("inputs")
"""
TBC
"""
PROCESS: ClassVar[RelationField] = RelationField("process")
"""
TBC
"""
AIRFLOW_DAG: ClassVar[RelationField] = RelationField("airflowDag")
"""
TBC
"""
_convenience_properties: ClassVar[List[str]] = [
"airflow_task_operator_class",
"airflow_dag_name",
"airflow_dag_qualified_name",
"airflow_task_connection_id",
"airflow_task_sql",
"airflow_task_retry_number",
"airflow_task_pool",
"airflow_task_pool_slots",
"airflow_task_queue",
"airflow_task_priority_weight",
"airflow_task_trigger_rule",
"airflow_task_group_name",
"outputs",
"inputs",
"process",
"airflow_dag",
]
@property
def airflow_task_operator_class(self) -> Optional[str]:
return (
None
if self.attributes is None
else self.attributes.airflow_task_operator_class
)
@airflow_task_operator_class.setter
def airflow_task_operator_class(self, airflow_task_operator_class: Optional[str]):
if self.attributes is None:
self.attributes = self.Attributes()
self.attributes.airflow_task_operator_class = airflow_task_operator_class
@property
def airflow_dag_name(self) -> Optional[str]:
return None if self.attributes is None else self.attributes.airflow_dag_name
@airflow_dag_name.setter
def airflow_dag_name(self, airflow_dag_name: Optional[str]):
if self.attributes is None:
self.attributes = self.Attributes()
self.attributes.airflow_dag_name = airflow_dag_name
@property
def airflow_dag_qualified_name(self) -> Optional[str]:
return (
None
if self.attributes is None
else self.attributes.airflow_dag_qualified_name
)
@airflow_dag_qualified_name.setter
def airflow_dag_qualified_name(self, airflow_dag_qualified_name: Optional[str]):
if self.attributes is None:
self.attributes = self.Attributes()
self.attributes.airflow_dag_qualified_name = airflow_dag_qualified_name
@property
def airflow_task_connection_id(self) -> Optional[str]:
return (
None
if self.attributes is None
else self.attributes.airflow_task_connection_id
)
@airflow_task_connection_id.setter
def airflow_task_connection_id(self, airflow_task_connection_id: Optional[str]):
if self.attributes is None:
self.attributes = self.Attributes()
self.attributes.airflow_task_connection_id = airflow_task_connection_id
@property
def airflow_task_sql(self) -> Optional[str]:
return None if self.attributes is None else self.attributes.airflow_task_sql
@airflow_task_sql.setter
def airflow_task_sql(self, airflow_task_sql: Optional[str]):
if self.attributes is None:
self.attributes = self.Attributes()
self.attributes.airflow_task_sql = airflow_task_sql
@property
def airflow_task_retry_number(self) -> Optional[int]:
return (
None
if self.attributes is None
else self.attributes.airflow_task_retry_number
)
@airflow_task_retry_number.setter
def airflow_task_retry_number(self, airflow_task_retry_number: Optional[int]):
if self.attributes is None:
self.attributes = self.Attributes()
self.attributes.airflow_task_retry_number = airflow_task_retry_number
@property
def airflow_task_pool(self) -> Optional[str]:
return None if self.attributes is None else self.attributes.airflow_task_pool
@airflow_task_pool.setter
def airflow_task_pool(self, airflow_task_pool: Optional[str]):
if self.attributes is None:
self.attributes = self.Attributes()
self.attributes.airflow_task_pool = airflow_task_pool
@property
def airflow_task_pool_slots(self) -> Optional[int]:
return (
None if self.attributes is None else self.attributes.airflow_task_pool_slots
)
@airflow_task_pool_slots.setter
def airflow_task_pool_slots(self, airflow_task_pool_slots: Optional[int]):
if self.attributes is None:
self.attributes = self.Attributes()
self.attributes.airflow_task_pool_slots = airflow_task_pool_slots
@property
def airflow_task_queue(self) -> Optional[str]:
return None if self.attributes is None else self.attributes.airflow_task_queue
@airflow_task_queue.setter
def airflow_task_queue(self, airflow_task_queue: Optional[str]):
if self.attributes is None:
self.attributes = self.Attributes()
self.attributes.airflow_task_queue = airflow_task_queue
@property
def airflow_task_priority_weight(self) -> Optional[int]:
return (
None
if self.attributes is None
else self.attributes.airflow_task_priority_weight
)
@airflow_task_priority_weight.setter
def airflow_task_priority_weight(self, airflow_task_priority_weight: Optional[int]):
if self.attributes is None:
self.attributes = self.Attributes()
self.attributes.airflow_task_priority_weight = airflow_task_priority_weight
@property
def airflow_task_trigger_rule(self) -> Optional[str]:
return (
None
if self.attributes is None
else self.attributes.airflow_task_trigger_rule
)
@airflow_task_trigger_rule.setter
def airflow_task_trigger_rule(self, airflow_task_trigger_rule: Optional[str]):
if self.attributes is None:
self.attributes = self.Attributes()
self.attributes.airflow_task_trigger_rule = airflow_task_trigger_rule
@property
def airflow_task_group_name(self) -> Optional[str]:
return (
None if self.attributes is None else self.attributes.airflow_task_group_name
)
@airflow_task_group_name.setter
def airflow_task_group_name(self, airflow_task_group_name: Optional[str]):
if self.attributes is None:
self.attributes = self.Attributes()
self.attributes.airflow_task_group_name = airflow_task_group_name
@property
def outputs(self) -> Optional[List[Catalog]]:
return None if self.attributes is None else self.attributes.outputs
@outputs.setter
def outputs(self, outputs: Optional[List[Catalog]]):
if self.attributes is None:
self.attributes = self.Attributes()
self.attributes.outputs = outputs
@property
def inputs(self) -> Optional[List[Catalog]]:
return None if self.attributes is None else self.attributes.inputs
@inputs.setter
def inputs(self, inputs: Optional[List[Catalog]]):
if self.attributes is None:
self.attributes = self.Attributes()
self.attributes.inputs = inputs
@property
def process(self) -> Optional[Process]:
return None if self.attributes is None else self.attributes.process
@process.setter
def process(self, process: Optional[Process]):
if self.attributes is None:
self.attributes = self.Attributes()
self.attributes.process = process
@property
def airflow_dag(self) -> Optional[AirflowDag]:
return None if self.attributes is None else self.attributes.airflow_dag
@airflow_dag.setter
def airflow_dag(self, airflow_dag: Optional[AirflowDag]):
if self.attributes is None:
self.attributes = self.Attributes()
self.attributes.airflow_dag = airflow_dag
class Attributes(Airflow.Attributes):
airflow_task_operator_class: Optional[str] = Field(default=None, description="")
airflow_dag_name: Optional[str] = Field(default=None, description="")
airflow_dag_qualified_name: Optional[str] = Field(default=None, description="")
airflow_task_connection_id: Optional[str] = Field(default=None, description="")
airflow_task_sql: Optional[str] = Field(default=None, description="")
airflow_task_retry_number: Optional[int] = Field(default=None, description="")
airflow_task_pool: Optional[str] = Field(default=None, description="")
airflow_task_pool_slots: Optional[int] = Field(default=None, description="")
airflow_task_queue: Optional[str] = Field(default=None, description="")
airflow_task_priority_weight: Optional[int] = Field(
default=None, description=""
)
airflow_task_trigger_rule: Optional[str] = Field(default=None, description="")
airflow_task_group_name: Optional[str] = Field(default=None, description="")
outputs: Optional[List[Catalog]] = Field(
default=None, description=""
) # relationship
inputs: Optional[List[Catalog]] = Field(
default=None, description=""
) # relationship
process: Optional[Process] = Field(default=None, description="") # relationship
airflow_dag: Optional[AirflowDag] = Field(
default=None, description=""
) # relationship
@classmethod
@init_guid
def creator(
cls,
*,
name: str,
airflow_dag_qualified_name: str,
connection_qualified_name: Optional[str] = None,
) -> AirflowTask.Attributes:
validate_required_fields(
["name", "airflow_dag_qualified_name"],
[name, airflow_dag_qualified_name],
)
if connection_qualified_name:
connector_name = AtlanConnectorType.get_connector_name(
connection_qualified_name
)
else:
connection_qn, connector_name = AtlanConnectorType.get_connector_name(
airflow_dag_qualified_name, "airflow_dag_qualified_name", 4
)
return AirflowTask.Attributes(
name=name,
airflow_dag_qualified_name=airflow_dag_qualified_name,
connection_qualified_name=connection_qualified_name or connection_qn,
qualified_name=f"{airflow_dag_qualified_name}/{name}",
connector_name=connector_name,
airflow_dag=AirflowDag.ref_by_qualified_name(
airflow_dag_qualified_name
),
)
attributes: AirflowTask.Attributes = Field(
default_factory=lambda: AirflowTask.Attributes(),
description=(
"Map of attributes in the instance and their values. "
"The specific keys of this map will vary by type, "
"so are described in the sub-types of this schema."
),
)
from .airflow_dag import AirflowDag # noqa: E402, F401
from .catalog import Catalog # noqa: E402, F401
from .process import Process # noqa: E402, F401