Source code for pyatlan.model.assets.data_studio

# SPDX-License-Identifier: Apache-2.0
# Copyright 2025 Atlan Pte. Ltd.


from __future__ import annotations

from typing import ClassVar, List, Optional

from pydantic.v1 import Field, validator

from pyatlan.model.fields.atlan_fields import (
    KeywordField,
    KeywordTextField,
    NumericField,
    RelationField,
)
from pyatlan.model.structs import GoogleLabel, GoogleTag

from .google import Google


class DataStudio(Google):
    """Asset model for the ``DataStudio`` type, built on top of ``Google``.

    Declares searchable field descriptors (the upper-case class variables),
    convenience properties that read and write the nested ``attributes``
    object, and the ``Attributes`` model that actually stores the values.
    """

    type_name: str = Field(default="DataStudio", allow_mutation=False)

    @validator("type_name")
    def validate_type_name(cls, v):
        """Accept only the literal type name ``"DataStudio"``.

        :param v: candidate value for ``type_name``
        :returns: ``v`` unchanged when it equals ``"DataStudio"``
        :raises ValueError: for any other value
        """
        if v == "DataStudio":
            return v
        raise ValueError("must be DataStudio")

    def __setattr__(self, name, value):
        """Route convenience-property names around the parent ``__setattr__``.

        Names listed in ``_convenience_properties`` are set through
        ``object.__setattr__`` so that the matching property setter on this
        class runs; every other name is handed to the parent implementation.
        """
        if name in DataStudio._convenience_properties:
            object.__setattr__(self, name, value)
        else:
            super().__setattr__(name, value)

    # ------------------------------------------------------------------
    # Field descriptors for Google attributes.
    # ------------------------------------------------------------------
    GOOGLE_SERVICE: ClassVar[KeywordField] = KeywordField(
        "googleService", "googleService"
    )
    """Google service in which the asset exists."""

    GOOGLE_PROJECT_NAME: ClassVar[KeywordTextField] = KeywordTextField(
        "googleProjectName", "googleProjectName", "googleProjectName.text"
    )
    """Name of the Google project in which the asset exists."""

    GOOGLE_PROJECT_ID: ClassVar[KeywordTextField] = KeywordTextField(
        "googleProjectId", "googleProjectId", "googleProjectId.text"
    )
    """ID of the Google project in which the asset exists."""

    GOOGLE_PROJECT_NUMBER: ClassVar[NumericField] = NumericField(
        "googleProjectNumber", "googleProjectNumber"
    )
    """Number of the Google project in which the asset exists."""

    GOOGLE_LOCATION: ClassVar[KeywordField] = KeywordField(
        "googleLocation", "googleLocation"
    )
    """Location of this asset in Google."""

    GOOGLE_LOCATION_TYPE: ClassVar[KeywordField] = KeywordField(
        "googleLocationType", "googleLocationType"
    )
    """Type of location of this asset in Google."""

    GOOGLE_LABELS: ClassVar[KeywordField] = KeywordField("googleLabels", "googleLabels")
    """Labels that have been applied to the asset in Google."""

    GOOGLE_TAGS: ClassVar[KeywordField] = KeywordField("googleTags", "googleTags")
    """Tags that have been applied to the asset in Google."""

    # ------------------------------------------------------------------
    # Field descriptors for relationships.
    # ------------------------------------------------------------------
    INPUT_TO_SPARK_JOBS: ClassVar[RelationField] = RelationField("inputToSparkJobs")
    """Relation field ``inputToSparkJobs`` (no description available yet)."""

    INPUT_TO_AIRFLOW_TASKS: ClassVar[RelationField] = RelationField(
        "inputToAirflowTasks"
    )
    """Relation field ``inputToAirflowTasks`` (no description available yet)."""

    INPUT_TO_PROCESSES: ClassVar[RelationField] = RelationField("inputToProcesses")
    """Relation field ``inputToProcesses`` (no description available yet)."""

    MODEL_IMPLEMENTED_ATTRIBUTES: ClassVar[RelationField] = RelationField(
        "modelImplementedAttributes"
    )
    """Relation field ``modelImplementedAttributes`` (no description available yet)."""

    OUTPUT_FROM_AIRFLOW_TASKS: ClassVar[RelationField] = RelationField(
        "outputFromAirflowTasks"
    )
    """Relation field ``outputFromAirflowTasks`` (no description available yet)."""

    OUTPUT_FROM_SPARK_JOBS: ClassVar[RelationField] = RelationField(
        "outputFromSparkJobs"
    )
    """Relation field ``outputFromSparkJobs`` (no description available yet)."""

    MODEL_IMPLEMENTED_ENTITIES: ClassVar[RelationField] = RelationField(
        "modelImplementedEntities"
    )
    """Relation field ``modelImplementedEntities`` (no description available yet)."""

    OUTPUT_FROM_PROCESSES: ClassVar[RelationField] = RelationField(
        "outputFromProcesses"
    )
    """Relation field ``outputFromProcesses`` (no description available yet)."""

    # Names that __setattr__ dispatches to the property setters below.
    _convenience_properties: ClassVar[List[str]] = [
        "google_service",
        "google_project_name",
        "google_project_id",
        "google_project_number",
        "google_location",
        "google_location_type",
        "google_labels",
        "google_tags",
        "input_to_spark_jobs",
        "input_to_airflow_tasks",
        "input_to_processes",
        "model_implemented_attributes",
        "output_from_airflow_tasks",
        "output_from_spark_jobs",
        "model_implemented_entities",
        "output_from_processes",
    ]

    # ------------------------------------------------------------------
    # Convenience properties.
    #
    # Every getter returns None when ``attributes`` is None, otherwise the
    # same-named value stored on ``attributes``. Every setter first creates
    # an empty ``Attributes`` instance if ``attributes`` is None and then
    # stores the value on ``self.attributes``.
    # ------------------------------------------------------------------
    @property
    def google_service(self) -> Optional[str]:
        """``attributes.google_service``, or ``None`` without attributes."""
        if self.attributes is None:
            return None
        return self.attributes.google_service

    @google_service.setter
    def google_service(self, google_service: Optional[str]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.google_service = google_service

    @property
    def google_project_name(self) -> Optional[str]:
        """``attributes.google_project_name``, or ``None`` without attributes."""
        if self.attributes is None:
            return None
        return self.attributes.google_project_name

    @google_project_name.setter
    def google_project_name(self, google_project_name: Optional[str]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.google_project_name = google_project_name

    @property
    def google_project_id(self) -> Optional[str]:
        """``attributes.google_project_id``, or ``None`` without attributes."""
        if self.attributes is None:
            return None
        return self.attributes.google_project_id

    @google_project_id.setter
    def google_project_id(self, google_project_id: Optional[str]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.google_project_id = google_project_id

    @property
    def google_project_number(self) -> Optional[int]:
        """``attributes.google_project_number``, or ``None`` without attributes."""
        if self.attributes is None:
            return None
        return self.attributes.google_project_number

    @google_project_number.setter
    def google_project_number(self, google_project_number: Optional[int]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.google_project_number = google_project_number

    @property
    def google_location(self) -> Optional[str]:
        """``attributes.google_location``, or ``None`` without attributes."""
        if self.attributes is None:
            return None
        return self.attributes.google_location

    @google_location.setter
    def google_location(self, google_location: Optional[str]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.google_location = google_location

    @property
    def google_location_type(self) -> Optional[str]:
        """``attributes.google_location_type``, or ``None`` without attributes."""
        if self.attributes is None:
            return None
        return self.attributes.google_location_type

    @google_location_type.setter
    def google_location_type(self, google_location_type: Optional[str]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.google_location_type = google_location_type

    @property
    def google_labels(self) -> Optional[List[GoogleLabel]]:
        """``attributes.google_labels``, or ``None`` without attributes."""
        if self.attributes is None:
            return None
        return self.attributes.google_labels

    @google_labels.setter
    def google_labels(self, google_labels: Optional[List[GoogleLabel]]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.google_labels = google_labels

    @property
    def google_tags(self) -> Optional[List[GoogleTag]]:
        """``attributes.google_tags``, or ``None`` without attributes."""
        if self.attributes is None:
            return None
        return self.attributes.google_tags

    @google_tags.setter
    def google_tags(self, google_tags: Optional[List[GoogleTag]]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.google_tags = google_tags

    @property
    def input_to_spark_jobs(self) -> Optional[List[SparkJob]]:
        """``attributes.input_to_spark_jobs``, or ``None`` without attributes."""
        if self.attributes is None:
            return None
        return self.attributes.input_to_spark_jobs

    @input_to_spark_jobs.setter
    def input_to_spark_jobs(self, input_to_spark_jobs: Optional[List[SparkJob]]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.input_to_spark_jobs = input_to_spark_jobs

    @property
    def input_to_airflow_tasks(self) -> Optional[List[AirflowTask]]:
        """``attributes.input_to_airflow_tasks``, or ``None`` without attributes."""
        if self.attributes is None:
            return None
        return self.attributes.input_to_airflow_tasks

    @input_to_airflow_tasks.setter
    def input_to_airflow_tasks(
        self, input_to_airflow_tasks: Optional[List[AirflowTask]]
    ):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.input_to_airflow_tasks = input_to_airflow_tasks

    @property
    def input_to_processes(self) -> Optional[List[Process]]:
        """``attributes.input_to_processes``, or ``None`` without attributes."""
        if self.attributes is None:
            return None
        return self.attributes.input_to_processes

    @input_to_processes.setter
    def input_to_processes(self, input_to_processes: Optional[List[Process]]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.input_to_processes = input_to_processes

    @property
    def model_implemented_attributes(self) -> Optional[List[ModelAttribute]]:
        """``attributes.model_implemented_attributes``, or ``None`` without attributes."""
        if self.attributes is None:
            return None
        return self.attributes.model_implemented_attributes

    @model_implemented_attributes.setter
    def model_implemented_attributes(
        self, model_implemented_attributes: Optional[List[ModelAttribute]]
    ):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.model_implemented_attributes = model_implemented_attributes

    @property
    def output_from_airflow_tasks(self) -> Optional[List[AirflowTask]]:
        """``attributes.output_from_airflow_tasks``, or ``None`` without attributes."""
        if self.attributes is None:
            return None
        return self.attributes.output_from_airflow_tasks

    @output_from_airflow_tasks.setter
    def output_from_airflow_tasks(
        self, output_from_airflow_tasks: Optional[List[AirflowTask]]
    ):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.output_from_airflow_tasks = output_from_airflow_tasks

    @property
    def output_from_spark_jobs(self) -> Optional[List[SparkJob]]:
        """``attributes.output_from_spark_jobs``, or ``None`` without attributes."""
        if self.attributes is None:
            return None
        return self.attributes.output_from_spark_jobs

    @output_from_spark_jobs.setter
    def output_from_spark_jobs(self, output_from_spark_jobs: Optional[List[SparkJob]]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.output_from_spark_jobs = output_from_spark_jobs

    @property
    def model_implemented_entities(self) -> Optional[List[ModelEntity]]:
        """``attributes.model_implemented_entities``, or ``None`` without attributes."""
        if self.attributes is None:
            return None
        return self.attributes.model_implemented_entities

    @model_implemented_entities.setter
    def model_implemented_entities(
        self, model_implemented_entities: Optional[List[ModelEntity]]
    ):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.model_implemented_entities = model_implemented_entities

    @property
    def output_from_processes(self) -> Optional[List[Process]]:
        """``attributes.output_from_processes``, or ``None`` without attributes."""
        if self.attributes is None:
            return None
        return self.attributes.output_from_processes

    @output_from_processes.setter
    def output_from_processes(self, output_from_processes: Optional[List[Process]]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.output_from_processes = output_from_processes

    class Attributes(Google.Attributes):
        """Storage model behind the convenience properties of ``DataStudio``.

        Every field is optional and defaults to ``None``.
        """

        # Plain Google attributes.
        google_service: Optional[str] = Field(default=None, description="")
        google_project_name: Optional[str] = Field(default=None, description="")
        google_project_id: Optional[str] = Field(default=None, description="")
        google_project_number: Optional[int] = Field(default=None, description="")
        google_location: Optional[str] = Field(default=None, description="")
        google_location_type: Optional[str] = Field(default=None, description="")
        google_labels: Optional[List[GoogleLabel]] = Field(default=None, description="")
        google_tags: Optional[List[GoogleTag]] = Field(default=None, description="")

        # Relationship attributes; their element types are imported at the
        # bottom of the module.
        input_to_spark_jobs: Optional[List[SparkJob]] = Field(
            default=None, description=""
        )  # relationship
        input_to_airflow_tasks: Optional[List[AirflowTask]] = Field(
            default=None, description=""
        )  # relationship
        input_to_processes: Optional[List[Process]] = Field(
            default=None, description=""
        )  # relationship
        model_implemented_attributes: Optional[List[ModelAttribute]] = Field(
            default=None, description=""
        )  # relationship
        output_from_airflow_tasks: Optional[List[AirflowTask]] = Field(
            default=None, description=""
        )  # relationship
        output_from_spark_jobs: Optional[List[SparkJob]] = Field(
            default=None, description=""
        )  # relationship
        model_implemented_entities: Optional[List[ModelEntity]] = Field(
            default=None, description=""
        )  # relationship
        output_from_processes: Optional[List[Process]] = Field(
            default=None, description=""
        )  # relationship

    # The lambda defers the lookup of ``DataStudio`` until an instance is
    # created; the name is not bound yet while this class body executes.
    attributes: DataStudio.Attributes = Field(
        default_factory=lambda: DataStudio.Attributes(),
        description=(
            "Map of attributes in the instance and their values. "
            "The specific keys of this map will vary by type, "
            "so are described in the sub-types of this schema."
        ),
    )
from .core.airflow_task import AirflowTask # noqa: E402, F401 from .core.model_attribute import ModelAttribute # noqa: E402, F401 from .core.model_entity import ModelEntity # noqa: E402, F401 from .core.process import Process # noqa: E402, F401 from .core.spark_job import SparkJob # noqa: E402, F401 DataStudio.Attributes.update_forward_refs()