Source code for pyatlan.model.assets.core.adf_dataset

# SPDX-License-Identifier: Apache-2.0
# Copyright 2025 Atlan Pte. Ltd.


from __future__ import annotations

from typing import ClassVar, List, Optional, Set

from pydantic.v1 import Field, validator

from pyatlan.model.fields.atlan_fields import KeywordField, RelationField, TextField

from .a_d_f import ADF


[docs] class AdfDataset(ADF): """Description""" type_name: str = Field(default="AdfDataset", allow_mutation=False) @validator("type_name") def validate_type_name(cls, v): if v != "AdfDataset": raise ValueError("must be AdfDataset") return v def __setattr__(self, name, value): if name in AdfDataset._convenience_properties: return object.__setattr__(self, name, value) super().__setattr__(name, value) ADF_DATASET_TYPE: ClassVar[KeywordField] = KeywordField( "adfDatasetType", "adfDatasetType" ) """ Defines the type of the dataset. """ ADF_DATASET_ANNOTATIONS: ClassVar[TextField] = TextField( "adfDatasetAnnotations", "adfDatasetAnnotations" ) """ The list of annotation assigned to a dataset. """ ADF_DATASET_LINKED_SERVICE: ClassVar[KeywordField] = KeywordField( "adfDatasetLinkedService", "adfDatasetLinkedService" ) """ Defines the name of the linked service used to create this dataset. """ ADF_DATASET_COLLECTION_NAME: ClassVar[TextField] = TextField( "adfDatasetCollectionName", "adfDatasetCollectionName" ) """ Defines the name collection in the cosmos dataset. """ ADF_DATASET_STORAGE_TYPE: ClassVar[TextField] = TextField( "adfDatasetStorageType", "adfDatasetStorageType" ) """ Defines the storage type of storage file system dataset. """ ADF_DATASET_FILE_NAME: ClassVar[TextField] = TextField( "adfDatasetFileName", "adfDatasetFileName" ) """ Defines the name of the file in the storage file system dataset. """ ADF_DATASET_FILE_FOLDER_PATH: ClassVar[TextField] = TextField( "adfDatasetFileFolderPath", "adfDatasetFileFolderPath" ) """ Defines the folder path of the file in the storage file system dataset. """ ADF_DATASET_CONTAINER_NAME: ClassVar[TextField] = TextField( "adfDatasetContainerName", "adfDatasetContainerName" ) """ Defines the container or bucket name in the storage file system dataset. """ ADF_DATASET_SCHEMA_NAME: ClassVar[TextField] = TextField( "adfDatasetSchemaName", "adfDatasetSchemaName" ) """ Defines the name of the schema used in the snowflake, mssql, azure sql database type of dataset. """ ADF_DATASET_TABLE_NAME: ClassVar[TextField] = TextField( "adfDatasetTableName", "adfDatasetTableName" ) """ Defines the name of the table used in the snowflake, mssql, azure sql database type of dataset. """ ADF_DATASET_DATABASE_NAME: ClassVar[TextField] = TextField( "adfDatasetDatabaseName", "adfDatasetDatabaseName" ) """ Defines the name of the database used in the azure delta lake type of dataset. """ ADF_LINKEDSERVICE: ClassVar[RelationField] = RelationField("adfLinkedservice") """ TBC """ ADF_ACTIVITIES: ClassVar[RelationField] = RelationField("adfActivities") """ TBC """ ADF_DATAFLOWS: ClassVar[RelationField] = RelationField("adfDataflows") """ TBC """ ADF_PIPELINES: ClassVar[RelationField] = RelationField("adfPipelines") """ TBC """ _convenience_properties: ClassVar[List[str]] = [ "adf_dataset_type", "adf_dataset_annotations", "adf_dataset_linked_service", "adf_dataset_collection_name", "adf_dataset_storage_type", "adf_dataset_file_name", "adf_dataset_file_folder_path", "adf_dataset_container_name", "adf_dataset_schema_name", "adf_dataset_table_name", "adf_dataset_database_name", "adf_linkedservice", "adf_activities", "adf_dataflows", "adf_pipelines", ] @property def adf_dataset_type(self) -> Optional[str]: return None if self.attributes is None else self.attributes.adf_dataset_type @adf_dataset_type.setter def adf_dataset_type(self, adf_dataset_type: Optional[str]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.adf_dataset_type = adf_dataset_type @property def adf_dataset_annotations(self) -> Optional[Set[str]]: return ( None if self.attributes is None else self.attributes.adf_dataset_annotations ) @adf_dataset_annotations.setter def adf_dataset_annotations(self, adf_dataset_annotations: Optional[Set[str]]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.adf_dataset_annotations = adf_dataset_annotations @property def adf_dataset_linked_service(self) -> Optional[str]: return ( None if self.attributes is None else self.attributes.adf_dataset_linked_service ) @adf_dataset_linked_service.setter def adf_dataset_linked_service(self, adf_dataset_linked_service: Optional[str]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.adf_dataset_linked_service = adf_dataset_linked_service @property def adf_dataset_collection_name(self) -> Optional[str]: return ( None if self.attributes is None else self.attributes.adf_dataset_collection_name ) @adf_dataset_collection_name.setter def adf_dataset_collection_name(self, adf_dataset_collection_name: Optional[str]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.adf_dataset_collection_name = adf_dataset_collection_name @property def adf_dataset_storage_type(self) -> Optional[str]: return ( None if self.attributes is None else self.attributes.adf_dataset_storage_type ) @adf_dataset_storage_type.setter def adf_dataset_storage_type(self, adf_dataset_storage_type: Optional[str]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.adf_dataset_storage_type = adf_dataset_storage_type @property def adf_dataset_file_name(self) -> Optional[str]: return ( None if self.attributes is None else self.attributes.adf_dataset_file_name ) @adf_dataset_file_name.setter def adf_dataset_file_name(self, adf_dataset_file_name: Optional[str]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.adf_dataset_file_name = adf_dataset_file_name @property def adf_dataset_file_folder_path(self) -> Optional[str]: return ( None if self.attributes is None else self.attributes.adf_dataset_file_folder_path ) @adf_dataset_file_folder_path.setter def adf_dataset_file_folder_path(self, adf_dataset_file_folder_path: Optional[str]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.adf_dataset_file_folder_path = adf_dataset_file_folder_path @property def adf_dataset_container_name(self) -> Optional[str]: return ( None if self.attributes is None else self.attributes.adf_dataset_container_name ) @adf_dataset_container_name.setter def adf_dataset_container_name(self, adf_dataset_container_name: Optional[str]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.adf_dataset_container_name = adf_dataset_container_name @property def adf_dataset_schema_name(self) -> Optional[str]: return ( None if self.attributes is None else self.attributes.adf_dataset_schema_name ) @adf_dataset_schema_name.setter def adf_dataset_schema_name(self, adf_dataset_schema_name: Optional[str]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.adf_dataset_schema_name = adf_dataset_schema_name @property def adf_dataset_table_name(self) -> Optional[str]: return ( None if self.attributes is None else self.attributes.adf_dataset_table_name ) @adf_dataset_table_name.setter def adf_dataset_table_name(self, adf_dataset_table_name: Optional[str]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.adf_dataset_table_name = adf_dataset_table_name @property def adf_dataset_database_name(self) -> Optional[str]: return ( None if self.attributes is None else self.attributes.adf_dataset_database_name ) @adf_dataset_database_name.setter def adf_dataset_database_name(self, adf_dataset_database_name: Optional[str]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.adf_dataset_database_name = adf_dataset_database_name @property def adf_linkedservice(self) -> Optional[AdfLinkedservice]: return None if self.attributes is None else self.attributes.adf_linkedservice @adf_linkedservice.setter def adf_linkedservice(self, adf_linkedservice: Optional[AdfLinkedservice]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.adf_linkedservice = adf_linkedservice @property def adf_activities(self) -> Optional[List[AdfActivity]]: return None if self.attributes is None else self.attributes.adf_activities @adf_activities.setter def adf_activities(self, adf_activities: Optional[List[AdfActivity]]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.adf_activities = adf_activities @property def adf_dataflows(self) -> Optional[List[AdfDataflow]]: return None if self.attributes is None else self.attributes.adf_dataflows @adf_dataflows.setter def adf_dataflows(self, adf_dataflows: Optional[List[AdfDataflow]]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.adf_dataflows = adf_dataflows @property def adf_pipelines(self) -> Optional[List[AdfPipeline]]: return None if self.attributes is None else self.attributes.adf_pipelines @adf_pipelines.setter def adf_pipelines(self, adf_pipelines: Optional[List[AdfPipeline]]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.adf_pipelines = adf_pipelines class Attributes(ADF.Attributes): adf_dataset_type: Optional[str] = Field(default=None, description="") adf_dataset_annotations: Optional[Set[str]] = Field( default=None, description="" ) adf_dataset_linked_service: Optional[str] = Field(default=None, description="") adf_dataset_collection_name: Optional[str] = Field(default=None, description="") adf_dataset_storage_type: Optional[str] = Field(default=None, description="") adf_dataset_file_name: Optional[str] = Field(default=None, description="") adf_dataset_file_folder_path: Optional[str] = Field( default=None, description="" ) adf_dataset_container_name: Optional[str] = Field(default=None, description="") adf_dataset_schema_name: Optional[str] = Field(default=None, description="") adf_dataset_table_name: Optional[str] = Field(default=None, description="") adf_dataset_database_name: Optional[str] = Field(default=None, description="") adf_linkedservice: Optional[AdfLinkedservice] = Field( default=None, description="" ) # relationship adf_activities: Optional[List[AdfActivity]] = Field( default=None, description="" ) # relationship adf_dataflows: Optional[List[AdfDataflow]] = Field( default=None, description="" ) # relationship adf_pipelines: Optional[List[AdfPipeline]] = Field( default=None, description="" ) # relationship attributes: AdfDataset.Attributes = Field( default_factory=lambda: AdfDataset.Attributes(), description=( "Map of attributes in the instance and their values. " "The specific keys of this map will vary by type, " "so are described in the sub-types of this schema." ), )
from .adf_activity import AdfActivity # noqa: E402, F401 from .adf_dataflow import AdfDataflow # noqa: E402, F401 from .adf_linkedservice import AdfLinkedservice # noqa: E402, F401 from .adf_pipeline import AdfPipeline # noqa: E402, F401