Source code for pyatlan.model.assets.core.power_b_i_dataflow

# SPDX-License-Identifier: Apache-2.0
# Copyright 2025 Atlan Pte. Ltd.


from __future__ import annotations

from typing import ClassVar, List, Optional, Set

from pydantic.v1 import Field, validator

from pyatlan.model.fields.atlan_fields import (
    KeywordField,
    KeywordTextField,
    RelationField,
    TextField,
)

from .power_b_i import PowerBI


[docs] class PowerBIDataflow(PowerBI): """Description""" type_name: str = Field(default="PowerBIDataflow", allow_mutation=False) @validator("type_name") def validate_type_name(cls, v): if v != "PowerBIDataflow": raise ValueError("must be PowerBIDataflow") return v def __setattr__(self, name, value): if name in PowerBIDataflow._convenience_properties: return object.__setattr__(self, name, value) super().__setattr__(name, value) WORKSPACE_QUALIFIED_NAME: ClassVar[KeywordTextField] = KeywordTextField( "workspaceQualifiedName", "workspaceQualifiedName.keyword", "workspaceQualifiedName", ) """ Unique name of the workspace in which this dataflow exists. """ WEB_URL: ClassVar[TextField] = TextField("webUrl", "webUrl") """ Deprecated. See 'sourceUrl' instead. """ POWER_BI_DATAFLOW_REFRESH_SCHEDULE_FREQUENCY: ClassVar[KeywordField] = KeywordField( "powerBIDataflowRefreshScheduleFrequency", "powerBIDataflowRefreshScheduleFrequency", ) """ Refresh Schedule frequency for a PowerBI Dataflow. """ POWER_BI_DATAFLOW_REFRESH_SCHEDULE_TIMES: ClassVar[KeywordField] = KeywordField( "powerBIDataflowRefreshScheduleTimes", "powerBIDataflowRefreshScheduleTimes" ) """ Time for the refresh schedule set for a PowerBI Dataflow. """ POWER_BI_DATAFLOW_REFRESH_SCHEDULE_TIME_ZONE: ClassVar[KeywordField] = KeywordField( "powerBIDataflowRefreshScheduleTimeZone", "powerBIDataflowRefreshScheduleTimeZone", ) """ Time zone for the refresh schedule set for a PowerBI Dataflow. """ WORKSPACE: ClassVar[RelationField] = RelationField("workspace") """ TBC """ POWER_BI_PROCESSES: ClassVar[RelationField] = RelationField("powerBIProcesses") """ TBC """ DATASETS: ClassVar[RelationField] = RelationField("datasets") """ TBC """ POWER_BI_DATAFLOW_ENTITY_COLUMNS: ClassVar[RelationField] = RelationField( "powerBIDataflowEntityColumns" ) """ TBC """ TABLES: ClassVar[RelationField] = RelationField("tables") """ TBC """ POWER_BI_DATASOURCES: ClassVar[RelationField] = RelationField("powerBIDatasources") """ TBC """ POWER_BI_DATAFLOW_CHILDREN: ClassVar[RelationField] = RelationField( "powerBIDataflowChildren" ) """ TBC """ POWER_BI_DATAFLOW_PARENTS: ClassVar[RelationField] = RelationField( "powerBIDataflowParents" ) """ TBC """ _convenience_properties: ClassVar[List[str]] = [ "workspace_qualified_name", "web_url", "power_b_i_dataflow_refresh_schedule_frequency", "power_b_i_dataflow_refresh_schedule_times", "power_b_i_dataflow_refresh_schedule_time_zone", "workspace", "power_b_i_processes", "datasets", "power_b_i_dataflow_entity_columns", "tables", "power_b_i_datasources", "power_b_i_dataflow_children", "power_b_i_dataflow_parents", ] @property def workspace_qualified_name(self) -> Optional[str]: return ( None if self.attributes is None else self.attributes.workspace_qualified_name ) @workspace_qualified_name.setter def workspace_qualified_name(self, workspace_qualified_name: Optional[str]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.workspace_qualified_name = workspace_qualified_name @property def web_url(self) -> Optional[str]: return None if self.attributes is None else self.attributes.web_url @web_url.setter def web_url(self, web_url: Optional[str]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.web_url = web_url @property def power_b_i_dataflow_refresh_schedule_frequency(self) -> Optional[str]: return ( None if self.attributes is None else self.attributes.power_b_i_dataflow_refresh_schedule_frequency ) @power_b_i_dataflow_refresh_schedule_frequency.setter def power_b_i_dataflow_refresh_schedule_frequency( self, power_b_i_dataflow_refresh_schedule_frequency: Optional[str] ): if self.attributes is None: self.attributes = self.Attributes() self.attributes.power_b_i_dataflow_refresh_schedule_frequency = ( power_b_i_dataflow_refresh_schedule_frequency ) @property def power_b_i_dataflow_refresh_schedule_times(self) -> Optional[Set[str]]: return ( None if self.attributes is None else self.attributes.power_b_i_dataflow_refresh_schedule_times ) @power_b_i_dataflow_refresh_schedule_times.setter def power_b_i_dataflow_refresh_schedule_times( self, power_b_i_dataflow_refresh_schedule_times: Optional[Set[str]] ): if self.attributes is None: self.attributes = self.Attributes() self.attributes.power_b_i_dataflow_refresh_schedule_times = ( power_b_i_dataflow_refresh_schedule_times ) @property def power_b_i_dataflow_refresh_schedule_time_zone(self) -> Optional[str]: return ( None if self.attributes is None else self.attributes.power_b_i_dataflow_refresh_schedule_time_zone ) @power_b_i_dataflow_refresh_schedule_time_zone.setter def power_b_i_dataflow_refresh_schedule_time_zone( self, power_b_i_dataflow_refresh_schedule_time_zone: Optional[str] ): if self.attributes is None: self.attributes = self.Attributes() self.attributes.power_b_i_dataflow_refresh_schedule_time_zone = ( power_b_i_dataflow_refresh_schedule_time_zone ) @property def workspace(self) -> Optional[PowerBIWorkspace]: return None if self.attributes is None else self.attributes.workspace @workspace.setter def workspace(self, workspace: Optional[PowerBIWorkspace]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.workspace = workspace @property def power_b_i_processes(self) -> Optional[List[Process]]: return None if self.attributes is None else self.attributes.power_b_i_processes @power_b_i_processes.setter def power_b_i_processes(self, power_b_i_processes: Optional[List[Process]]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.power_b_i_processes = power_b_i_processes @property def datasets(self) -> Optional[List[PowerBIDataset]]: return None if self.attributes is None else self.attributes.datasets @datasets.setter def datasets(self, datasets: Optional[List[PowerBIDataset]]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.datasets = datasets @property def power_b_i_dataflow_entity_columns( self, ) -> Optional[List[PowerBIDataflowEntityColumn]]: return ( None if self.attributes is None else self.attributes.power_b_i_dataflow_entity_columns ) @power_b_i_dataflow_entity_columns.setter def power_b_i_dataflow_entity_columns( self, power_b_i_dataflow_entity_columns: Optional[List[PowerBIDataflowEntityColumn]], ): if self.attributes is None: self.attributes = self.Attributes() self.attributes.power_b_i_dataflow_entity_columns = ( power_b_i_dataflow_entity_columns ) @property def tables(self) -> Optional[List[PowerBITable]]: return None if self.attributes is None else self.attributes.tables @tables.setter def tables(self, tables: Optional[List[PowerBITable]]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.tables = tables @property def power_b_i_datasources(self) -> Optional[List[PowerBIDatasource]]: return ( None if self.attributes is None else self.attributes.power_b_i_datasources ) @power_b_i_datasources.setter def power_b_i_datasources( self, power_b_i_datasources: Optional[List[PowerBIDatasource]] ): if self.attributes is None: self.attributes = self.Attributes() self.attributes.power_b_i_datasources = power_b_i_datasources @property def power_b_i_dataflow_children(self) -> Optional[List[PowerBIDataflow]]: return ( None if self.attributes is None else self.attributes.power_b_i_dataflow_children ) @power_b_i_dataflow_children.setter def power_b_i_dataflow_children( self, power_b_i_dataflow_children: Optional[List[PowerBIDataflow]] ): if self.attributes is None: self.attributes = self.Attributes() self.attributes.power_b_i_dataflow_children = power_b_i_dataflow_children @property def power_b_i_dataflow_parents(self) -> Optional[List[PowerBIDataflow]]: return ( None if self.attributes is None else self.attributes.power_b_i_dataflow_parents ) @power_b_i_dataflow_parents.setter def power_b_i_dataflow_parents( self, power_b_i_dataflow_parents: Optional[List[PowerBIDataflow]] ): if self.attributes is None: self.attributes = self.Attributes() self.attributes.power_b_i_dataflow_parents = power_b_i_dataflow_parents class Attributes(PowerBI.Attributes): workspace_qualified_name: Optional[str] = Field(default=None, description="") web_url: Optional[str] = Field(default=None, description="") power_b_i_dataflow_refresh_schedule_frequency: Optional[str] = Field( default=None, description="" ) power_b_i_dataflow_refresh_schedule_times: Optional[Set[str]] = Field( default=None, description="" ) power_b_i_dataflow_refresh_schedule_time_zone: Optional[str] = Field( default=None, description="" ) workspace: Optional[PowerBIWorkspace] = Field( default=None, description="" ) # relationship power_b_i_processes: Optional[List[Process]] = Field( default=None, description="" ) # relationship datasets: Optional[List[PowerBIDataset]] = Field( default=None, description="" ) # relationship power_b_i_dataflow_entity_columns: Optional[ List[PowerBIDataflowEntityColumn] ] = Field(default=None, description="") # relationship tables: Optional[List[PowerBITable]] = Field( default=None, description="" ) # relationship power_b_i_datasources: Optional[List[PowerBIDatasource]] = Field( default=None, description="" ) # relationship power_b_i_dataflow_children: Optional[List[PowerBIDataflow]] = Field( default=None, description="" ) # relationship power_b_i_dataflow_parents: Optional[List[PowerBIDataflow]] = Field( default=None, description="" ) # relationship attributes: PowerBIDataflow.Attributes = Field( default_factory=lambda: PowerBIDataflow.Attributes(), description=( "Map of attributes in the instance and their values. " "The specific keys of this map will vary by type, " "so are described in the sub-types of this schema." ), )
from .power_b_i_dataflow_entity_column import ( PowerBIDataflowEntityColumn, # noqa: E402, F401 ) from .power_b_i_dataset import PowerBIDataset # noqa: E402, F401 from .power_b_i_datasource import PowerBIDatasource # noqa: E402, F401 from .power_b_i_table import PowerBITable # noqa: E402, F401 from .power_b_i_workspace import PowerBIWorkspace # noqa: E402, F401 from .process import Process # noqa: E402, F401