# SPDX-License-Identifier: Apache-2.0
# Copyright 2025 Atlan Pte. Ltd.
from __future__ import annotations
from typing import ClassVar, Dict, List, Optional, Set
from pydantic.v1 import Field, validator
from pyatlan.model.enums import AdfActivityState
from pyatlan.model.fields.atlan_fields import (
BooleanField,
KeywordField,
KeywordTextField,
NumericField,
RelationField,
TextField,
)
from .a_d_f import ADF
[docs]
class AdfActivity(ADF):
"""Description"""
type_name: str = Field(default="AdfActivity", allow_mutation=False)
@validator("type_name")
def validate_type_name(cls, v):
if v != "AdfActivity":
raise ValueError("must be AdfActivity")
return v
def __setattr__(self, name, value):
if name in AdfActivity._convenience_properties:
return object.__setattr__(self, name, value)
super().__setattr__(name, value)
ADF_ACTIVITY_TYPE: ClassVar[KeywordField] = KeywordField(
"adfActivityType", "adfActivityType"
)
"""
The type of the ADF activity.
"""
ADF_ACTIVITY_PRECEDING_DEPENDENCY: ClassVar[TextField] = TextField(
"adfActivityPrecedingDependency", "adfActivityPrecedingDependency"
)
"""
The list of ADF activities on which this ADF activity depends on.
"""
ADF_ACTIVITY_POLICY_TIMEOUT: ClassVar[TextField] = TextField(
"adfActivityPolicyTimeout", "adfActivityPolicyTimeout"
)
"""
The timout defined for the ADF activity.
"""
ADF_ACTIVITY_POLICT_RETRY_INTERVAL: ClassVar[NumericField] = NumericField(
"adfActivityPolictRetryInterval", "adfActivityPolictRetryInterval"
)
"""
The retry interval in seconds for the ADF activity.
"""
ADF_ACTIVITY_STATE: ClassVar[KeywordField] = KeywordField(
"adfActivityState", "adfActivityState"
)
"""
Defines the state (Active or Inactive) of an ADF activity whether it is active or not.
"""
ADF_ACTIVITY_SOURCES: ClassVar[TextField] = TextField(
"adfActivitySources", "adfActivitySources"
)
"""
The list of names of sources for the ADF activity.
"""
ADF_ACTIVITY_SINKS: ClassVar[TextField] = TextField(
"adfActivitySinks", "adfActivitySinks"
)
"""
The list of names of sinks for the ADF activity.
"""
ADF_ACTIVITY_SOURCE_TYPE: ClassVar[TextField] = TextField(
"adfActivitySourceType", "adfActivitySourceType"
)
"""
Defines the type of the source of the ADF activtity.
"""
ADF_ACTIVITY_SINK_TYPE: ClassVar[TextField] = TextField(
"adfActivitySinkType", "adfActivitySinkType"
)
"""
Defines the type of the sink of the ADF activtity.
"""
ADF_ACTIVITY_RUNS: ClassVar[KeywordField] = KeywordField(
"adfActivityRuns", "adfActivityRuns"
)
"""
List of objects of activity runs for a particular activity.
"""
ADF_ACTIVITY_NOTEBOOK_PATH: ClassVar[TextField] = TextField(
"adfActivityNotebookPath", "adfActivityNotebookPath"
)
"""
Defines the path of the notebook in the databricks notebook activity.
"""
ADF_ACTIVITY_MAIN_CLASS_NAME: ClassVar[TextField] = TextField(
"adfActivityMainClassName", "adfActivityMainClassName"
)
"""
Defines the main class of the databricks spark activity.
"""
ADF_ACTIVITY_PYTHON_FILE_PATH: ClassVar[TextField] = TextField(
"adfActivityPythonFilePath", "adfActivityPythonFilePath"
)
"""
Defines the python file path for databricks python activity.
"""
ADF_ACTIVITY_FIRST_ROW_ONLY: ClassVar[BooleanField] = BooleanField(
"adfActivityFirstRowOnly", "adfActivityFirstRowOnly"
)
"""
Indicates whether to import only first row only or not in Lookup activity.
"""
ADF_ACTIVITY_BATCH_COUNT: ClassVar[NumericField] = NumericField(
"adfActivityBatchCount", "adfActivityBatchCount"
)
"""
Defines the batch count of activity to runs in ForEach activity.
"""
ADF_ACTIVITY_IS_SEQUENTIAL: ClassVar[BooleanField] = BooleanField(
"adfActivityIsSequential", "adfActivityIsSequential"
)
"""
Indicates whether the activity processing is sequential or not inside the ForEach activity.
"""
ADF_ACTIVITY_SUB_ACTIVITIES: ClassVar[TextField] = TextField(
"adfActivitySubActivities", "adfActivitySubActivities"
)
"""
The list of activities to be run inside a ForEach activity.
"""
ADF_ACTIVITY_REFERENCE_DATAFLOW: ClassVar[TextField] = TextField(
"adfActivityReferenceDataflow", "adfActivityReferenceDataflow"
)
"""
Defines the dataflow that is to be used in dataflow activity.
"""
ADF_PIPELINE_QUALIFIED_NAME: ClassVar[KeywordTextField] = KeywordTextField(
"adfPipelineQualifiedName",
"adfPipelineQualifiedName",
"adfPipelineQualifiedName.text",
)
"""
Unique name of the pipeline in which this activity exists.
"""
ADF_LINKEDSERVICES: ClassVar[RelationField] = RelationField("adfLinkedservices")
"""
TBC
"""
ADF_DATASETS: ClassVar[RelationField] = RelationField("adfDatasets")
"""
TBC
"""
PROCESSES: ClassVar[RelationField] = RelationField("processes")
"""
TBC
"""
ADF_PIPELINE: ClassVar[RelationField] = RelationField("adfPipeline")
"""
TBC
"""
ADF_DATAFLOW: ClassVar[RelationField] = RelationField("adfDataflow")
"""
TBC
"""
_convenience_properties: ClassVar[List[str]] = [
"adf_activity_type",
"adf_activity_preceding_dependency",
"adf_activity_policy_timeout",
"adf_activity_polict_retry_interval",
"adf_activity_state",
"adf_activity_sources",
"adf_activity_sinks",
"adf_activity_source_type",
"adf_activity_sink_type",
"adf_activity_runs",
"adf_activity_notebook_path",
"adf_activity_main_class_name",
"adf_activity_python_file_path",
"adf_activity_first_row_only",
"adf_activity_batch_count",
"adf_activity_is_sequential",
"adf_activity_sub_activities",
"adf_activity_reference_dataflow",
"adf_pipeline_qualified_name",
"adf_linkedservices",
"adf_datasets",
"processes",
"adf_pipeline",
"adf_dataflow",
]
@property
def adf_activity_type(self) -> Optional[str]:
return None if self.attributes is None else self.attributes.adf_activity_type
@adf_activity_type.setter
def adf_activity_type(self, adf_activity_type: Optional[str]):
if self.attributes is None:
self.attributes = self.Attributes()
self.attributes.adf_activity_type = adf_activity_type
@property
def adf_activity_preceding_dependency(self) -> Optional[Set[str]]:
return (
None
if self.attributes is None
else self.attributes.adf_activity_preceding_dependency
)
@adf_activity_preceding_dependency.setter
def adf_activity_preceding_dependency(
self, adf_activity_preceding_dependency: Optional[Set[str]]
):
if self.attributes is None:
self.attributes = self.Attributes()
self.attributes.adf_activity_preceding_dependency = (
adf_activity_preceding_dependency
)
@property
def adf_activity_policy_timeout(self) -> Optional[str]:
return (
None
if self.attributes is None
else self.attributes.adf_activity_policy_timeout
)
@adf_activity_policy_timeout.setter
def adf_activity_policy_timeout(self, adf_activity_policy_timeout: Optional[str]):
if self.attributes is None:
self.attributes = self.Attributes()
self.attributes.adf_activity_policy_timeout = adf_activity_policy_timeout
@property
def adf_activity_polict_retry_interval(self) -> Optional[int]:
return (
None
if self.attributes is None
else self.attributes.adf_activity_polict_retry_interval
)
@adf_activity_polict_retry_interval.setter
def adf_activity_polict_retry_interval(
self, adf_activity_polict_retry_interval: Optional[int]
):
if self.attributes is None:
self.attributes = self.Attributes()
self.attributes.adf_activity_polict_retry_interval = (
adf_activity_polict_retry_interval
)
@property
def adf_activity_state(self) -> Optional[AdfActivityState]:
return None if self.attributes is None else self.attributes.adf_activity_state
@adf_activity_state.setter
def adf_activity_state(self, adf_activity_state: Optional[AdfActivityState]):
if self.attributes is None:
self.attributes = self.Attributes()
self.attributes.adf_activity_state = adf_activity_state
@property
def adf_activity_sources(self) -> Optional[Set[str]]:
return None if self.attributes is None else self.attributes.adf_activity_sources
@adf_activity_sources.setter
def adf_activity_sources(self, adf_activity_sources: Optional[Set[str]]):
if self.attributes is None:
self.attributes = self.Attributes()
self.attributes.adf_activity_sources = adf_activity_sources
@property
def adf_activity_sinks(self) -> Optional[Set[str]]:
return None if self.attributes is None else self.attributes.adf_activity_sinks
@adf_activity_sinks.setter
def adf_activity_sinks(self, adf_activity_sinks: Optional[Set[str]]):
if self.attributes is None:
self.attributes = self.Attributes()
self.attributes.adf_activity_sinks = adf_activity_sinks
@property
def adf_activity_source_type(self) -> Optional[str]:
return (
None
if self.attributes is None
else self.attributes.adf_activity_source_type
)
@adf_activity_source_type.setter
def adf_activity_source_type(self, adf_activity_source_type: Optional[str]):
if self.attributes is None:
self.attributes = self.Attributes()
self.attributes.adf_activity_source_type = adf_activity_source_type
@property
def adf_activity_sink_type(self) -> Optional[str]:
return (
None if self.attributes is None else self.attributes.adf_activity_sink_type
)
@adf_activity_sink_type.setter
def adf_activity_sink_type(self, adf_activity_sink_type: Optional[str]):
if self.attributes is None:
self.attributes = self.Attributes()
self.attributes.adf_activity_sink_type = adf_activity_sink_type
@property
def adf_activity_runs(self) -> Optional[List[Dict[str, str]]]:
return None if self.attributes is None else self.attributes.adf_activity_runs
@adf_activity_runs.setter
def adf_activity_runs(self, adf_activity_runs: Optional[List[Dict[str, str]]]):
if self.attributes is None:
self.attributes = self.Attributes()
self.attributes.adf_activity_runs = adf_activity_runs
@property
def adf_activity_notebook_path(self) -> Optional[str]:
return (
None
if self.attributes is None
else self.attributes.adf_activity_notebook_path
)
@adf_activity_notebook_path.setter
def adf_activity_notebook_path(self, adf_activity_notebook_path: Optional[str]):
if self.attributes is None:
self.attributes = self.Attributes()
self.attributes.adf_activity_notebook_path = adf_activity_notebook_path
@property
def adf_activity_main_class_name(self) -> Optional[str]:
return (
None
if self.attributes is None
else self.attributes.adf_activity_main_class_name
)
@adf_activity_main_class_name.setter
def adf_activity_main_class_name(self, adf_activity_main_class_name: Optional[str]):
if self.attributes is None:
self.attributes = self.Attributes()
self.attributes.adf_activity_main_class_name = adf_activity_main_class_name
@property
def adf_activity_python_file_path(self) -> Optional[str]:
return (
None
if self.attributes is None
else self.attributes.adf_activity_python_file_path
)
@adf_activity_python_file_path.setter
def adf_activity_python_file_path(
self, adf_activity_python_file_path: Optional[str]
):
if self.attributes is None:
self.attributes = self.Attributes()
self.attributes.adf_activity_python_file_path = adf_activity_python_file_path
@property
def adf_activity_first_row_only(self) -> Optional[bool]:
return (
None
if self.attributes is None
else self.attributes.adf_activity_first_row_only
)
@adf_activity_first_row_only.setter
def adf_activity_first_row_only(self, adf_activity_first_row_only: Optional[bool]):
if self.attributes is None:
self.attributes = self.Attributes()
self.attributes.adf_activity_first_row_only = adf_activity_first_row_only
@property
def adf_activity_batch_count(self) -> Optional[int]:
return (
None
if self.attributes is None
else self.attributes.adf_activity_batch_count
)
@adf_activity_batch_count.setter
def adf_activity_batch_count(self, adf_activity_batch_count: Optional[int]):
if self.attributes is None:
self.attributes = self.Attributes()
self.attributes.adf_activity_batch_count = adf_activity_batch_count
@property
def adf_activity_is_sequential(self) -> Optional[bool]:
return (
None
if self.attributes is None
else self.attributes.adf_activity_is_sequential
)
@adf_activity_is_sequential.setter
def adf_activity_is_sequential(self, adf_activity_is_sequential: Optional[bool]):
if self.attributes is None:
self.attributes = self.Attributes()
self.attributes.adf_activity_is_sequential = adf_activity_is_sequential
@property
def adf_activity_sub_activities(self) -> Optional[Set[str]]:
return (
None
if self.attributes is None
else self.attributes.adf_activity_sub_activities
)
@adf_activity_sub_activities.setter
def adf_activity_sub_activities(
self, adf_activity_sub_activities: Optional[Set[str]]
):
if self.attributes is None:
self.attributes = self.Attributes()
self.attributes.adf_activity_sub_activities = adf_activity_sub_activities
@property
def adf_activity_reference_dataflow(self) -> Optional[str]:
return (
None
if self.attributes is None
else self.attributes.adf_activity_reference_dataflow
)
@adf_activity_reference_dataflow.setter
def adf_activity_reference_dataflow(
self, adf_activity_reference_dataflow: Optional[str]
):
if self.attributes is None:
self.attributes = self.Attributes()
self.attributes.adf_activity_reference_dataflow = (
adf_activity_reference_dataflow
)
@property
def adf_pipeline_qualified_name(self) -> Optional[str]:
return (
None
if self.attributes is None
else self.attributes.adf_pipeline_qualified_name
)
@adf_pipeline_qualified_name.setter
def adf_pipeline_qualified_name(self, adf_pipeline_qualified_name: Optional[str]):
if self.attributes is None:
self.attributes = self.Attributes()
self.attributes.adf_pipeline_qualified_name = adf_pipeline_qualified_name
@property
def adf_linkedservices(self) -> Optional[List[AdfLinkedservice]]:
return None if self.attributes is None else self.attributes.adf_linkedservices
@adf_linkedservices.setter
def adf_linkedservices(self, adf_linkedservices: Optional[List[AdfLinkedservice]]):
if self.attributes is None:
self.attributes = self.Attributes()
self.attributes.adf_linkedservices = adf_linkedservices
@property
def adf_datasets(self) -> Optional[List[AdfDataset]]:
return None if self.attributes is None else self.attributes.adf_datasets
@adf_datasets.setter
def adf_datasets(self, adf_datasets: Optional[List[AdfDataset]]):
if self.attributes is None:
self.attributes = self.Attributes()
self.attributes.adf_datasets = adf_datasets
@property
def processes(self) -> Optional[List[Process]]:
return None if self.attributes is None else self.attributes.processes
@processes.setter
def processes(self, processes: Optional[List[Process]]):
if self.attributes is None:
self.attributes = self.Attributes()
self.attributes.processes = processes
@property
def adf_pipeline(self) -> Optional[AdfPipeline]:
return None if self.attributes is None else self.attributes.adf_pipeline
@adf_pipeline.setter
def adf_pipeline(self, adf_pipeline: Optional[AdfPipeline]):
if self.attributes is None:
self.attributes = self.Attributes()
self.attributes.adf_pipeline = adf_pipeline
@property
def adf_dataflow(self) -> Optional[AdfDataflow]:
return None if self.attributes is None else self.attributes.adf_dataflow
@adf_dataflow.setter
def adf_dataflow(self, adf_dataflow: Optional[AdfDataflow]):
if self.attributes is None:
self.attributes = self.Attributes()
self.attributes.adf_dataflow = adf_dataflow
class Attributes(ADF.Attributes):
adf_activity_type: Optional[str] = Field(default=None, description="")
adf_activity_preceding_dependency: Optional[Set[str]] = Field(
default=None, description=""
)
adf_activity_policy_timeout: Optional[str] = Field(default=None, description="")
adf_activity_polict_retry_interval: Optional[int] = Field(
default=None, description=""
)
adf_activity_state: Optional[AdfActivityState] = Field(
default=None, description=""
)
adf_activity_sources: Optional[Set[str]] = Field(default=None, description="")
adf_activity_sinks: Optional[Set[str]] = Field(default=None, description="")
adf_activity_source_type: Optional[str] = Field(default=None, description="")
adf_activity_sink_type: Optional[str] = Field(default=None, description="")
adf_activity_runs: Optional[List[Dict[str, str]]] = Field(
default=None, description=""
)
adf_activity_notebook_path: Optional[str] = Field(default=None, description="")
adf_activity_main_class_name: Optional[str] = Field(
default=None, description=""
)
adf_activity_python_file_path: Optional[str] = Field(
default=None, description=""
)
adf_activity_first_row_only: Optional[bool] = Field(
default=None, description=""
)
adf_activity_batch_count: Optional[int] = Field(default=None, description="")
adf_activity_is_sequential: Optional[bool] = Field(default=None, description="")
adf_activity_sub_activities: Optional[Set[str]] = Field(
default=None, description=""
)
adf_activity_reference_dataflow: Optional[str] = Field(
default=None, description=""
)
adf_pipeline_qualified_name: Optional[str] = Field(default=None, description="")
adf_linkedservices: Optional[List[AdfLinkedservice]] = Field(
default=None, description=""
) # relationship
adf_datasets: Optional[List[AdfDataset]] = Field(
default=None, description=""
) # relationship
processes: Optional[List[Process]] = Field(
default=None, description=""
) # relationship
adf_pipeline: Optional[AdfPipeline] = Field(
default=None, description=""
) # relationship
adf_dataflow: Optional[AdfDataflow] = Field(
default=None, description=""
) # relationship
attributes: AdfActivity.Attributes = Field(
default_factory=lambda: AdfActivity.Attributes(),
description=(
"Map of attributes in the instance and their values. "
"The specific keys of this map will vary by type, "
"so are described in the sub-types of this schema."
),
)
from .adf_dataflow import AdfDataflow # noqa: E402, F401
from .adf_dataset import AdfDataset # noqa: E402, F401
from .adf_linkedservice import AdfLinkedservice # noqa: E402, F401
from .adf_pipeline import AdfPipeline # noqa: E402, F401
from .process import Process # noqa: E402, F401