Source code for pyatlan.model.assets.core.data_product

# SPDX-License-Identifier: Apache-2.0
# Copyright 2025 Atlan Pte. Ltd.


from __future__ import annotations

import json
from datetime import datetime
from typing import TYPE_CHECKING, ClassVar, List, Optional, Set
from warnings import warn

from pydantic.v1 import Field, StrictStr, validator

from pyatlan.errors import ErrorCode
from pyatlan.model.data_mesh import DataProductsAssetsDSL
from pyatlan.model.enums import (
    DataProductCriticality,
    DataProductLineageStatus,
    DataProductSensitivity,
    DataProductStatus,
    DataProductVisibility,
)
from pyatlan.model.fields.atlan_fields import (
    KeywordField,
    NumericField,
    RelationField,
    TextField,
)
from pyatlan.model.search import IndexSearchRequest
from pyatlan.utils import init_guid, validate_required_fields

from .asset import SelfAsset
from .data_mesh import DataMesh

if TYPE_CHECKING:
    from pyatlan.client.atlan import AtlanClient


[docs] class DataProduct(DataMesh): """Description""" @classmethod @init_guid def creator( cls, *, name: StrictStr, domain_qualified_name: StrictStr, asset_selection: IndexSearchRequest, ) -> DataProduct: attributes = DataProduct.Attributes.create( name=name, domain_qualified_name=domain_qualified_name, asset_selection=asset_selection, ) return cls(attributes=attributes) @classmethod @init_guid def create( cls, *, name: StrictStr, domain_qualified_name: StrictStr, asset_selection: IndexSearchRequest, ) -> DataProduct: warn( ( "This method is deprecated, please use 'creator' " "instead, which offers identical functionality." ), DeprecationWarning, stacklevel=2, ) return cls.creator( name=name, domain_qualified_name=domain_qualified_name, asset_selection=asset_selection, ) @classmethod @init_guid def updater( cls, qualified_name: str = "", name: str = "", asset_selection: Optional[IndexSearchRequest] = None, ) -> DataProduct: validate_required_fields( ["name", "qualified_name"], [name, qualified_name], ) # Split the data product qualified_name to extract data mesh info fields = qualified_name.split("/") if len(fields) < 5: raise ValueError(f"Invalid data product qualified_name: {qualified_name}") product = cls( attributes=cls.Attributes( qualified_name=qualified_name, name=name, ) ) if asset_selection: product.data_product_assets_d_s_l = ( DataProductsAssetsDSL.get_asset_selection(asset_selection) ) return product @classmethod def create_for_modification( cls: type[SelfAsset], qualified_name: str = "", name: str = "", ) -> SelfAsset: warn( ( "This method is deprecated, please use 'updater' " "instead, which offers identical functionality." ), DeprecationWarning, stacklevel=2, ) return cls.updater( qualified_name=qualified_name, name=name, )
[docs] def get_assets(self, client: AtlanClient): """ Retrieves list of all assets linked to the provided data product. :param client: connectivity to an Atlan tenant :raises InvalidRequestError: if DataProduct asset DSL cannot be found (does not exist) in Atlan :raises AtlanError: if there is an issue interacting with the API :returns: instance of `IndexSearchResults` with list of all assets linked to the provided data product """ dp_dsl = self.data_product_assets_d_s_l if not dp_dsl: raise ErrorCode.MISSING_DATA_PRODUCT_ASSET_DSL.exception_with_parameters() json_object = json.loads(dp_dsl) request = IndexSearchRequest(**json_object.get("query", {})) response = client.asset.search(request) return response
type_name: str = Field(default="DataProduct", allow_mutation=False) @validator("type_name") def validate_type_name(cls, v): if v != "DataProduct": raise ValueError("must be DataProduct") return v def __setattr__(self, name, value): if name in DataProduct._convenience_properties: return object.__setattr__(self, name, value) super().__setattr__(name, value) DATA_PRODUCT_STATUS: ClassVar[KeywordField] = KeywordField( "dataProductStatus", "dataProductStatus" ) """ Status of this data product. """ DAAP_STATUS: ClassVar[KeywordField] = KeywordField("daapStatus", "daapStatus") """ Status of this data product. """ DATA_PRODUCT_CRITICALITY: ClassVar[KeywordField] = KeywordField( "dataProductCriticality", "dataProductCriticality" ) """ Criticality of this data product. """ DAAP_CRITICALITY: ClassVar[KeywordField] = KeywordField( "daapCriticality", "daapCriticality" ) """ Criticality of this data product. """ DATA_PRODUCT_SENSITIVITY: ClassVar[KeywordField] = KeywordField( "dataProductSensitivity", "dataProductSensitivity" ) """ Information sensitivity of this data product. """ DAAP_SENSITIVITY: ClassVar[KeywordField] = KeywordField( "daapSensitivity", "daapSensitivity" ) """ Information sensitivity of this data product. """ DATA_PRODUCT_VISIBILITY: ClassVar[KeywordField] = KeywordField( "dataProductVisibility", "dataProductVisibility" ) """ Visibility of a data product. """ DAAP_VISIBILITY: ClassVar[KeywordField] = KeywordField( "daapVisibility", "daapVisibility" ) """ Visibility of a data product. """ DATA_PRODUCT_ASSETS_DSL: ClassVar[TextField] = TextField( "dataProductAssetsDSL", "dataProductAssetsDSL" ) """ Search DSL used to define which assets are part of this data product. """ DATA_PRODUCT_ASSETS_PLAYBOOK_FILTER: ClassVar[TextField] = TextField( "dataProductAssetsPlaybookFilter", "dataProductAssetsPlaybookFilter" ) """ Playbook filter to define which assets are part of this data product. """ DATA_PRODUCT_SCORE_VALUE: ClassVar[NumericField] = NumericField( "dataProductScoreValue", "dataProductScoreValue" ) """ Score of this data product. """ DATA_PRODUCT_SCORE_UPDATED_AT: ClassVar[NumericField] = NumericField( "dataProductScoreUpdatedAt", "dataProductScoreUpdatedAt" ) """ Timestamp when the score of this data product was last updated. """ DAAP_VISIBILITY_USERS: ClassVar[KeywordField] = KeywordField( "daapVisibilityUsers", "daapVisibilityUsers" ) """ list of users for product visibility control """ DAAP_VISIBILITY_GROUPS: ClassVar[KeywordField] = KeywordField( "daapVisibilityGroups", "daapVisibilityGroups" ) """ list of groups for product visibility control """ DAAP_OUTPUT_PORT_GUIDS: ClassVar[KeywordField] = KeywordField( "daapOutputPortGuids", "daapOutputPortGuids" ) """ Output ports guids for this data product. """ DAAP_INPUT_PORT_GUIDS: ClassVar[KeywordField] = KeywordField( "daapInputPortGuids", "daapInputPortGuids" ) """ Input ports guids for this data product. """ DAAP_LINEAGE_STATUS: ClassVar[KeywordField] = KeywordField( "daapLineageStatus", "daapLineageStatus" ) """ Status of this data product lineage. """ OUTPUT_PORTS: ClassVar[RelationField] = RelationField("outputPorts") """ TBC """ DATA_DOMAIN: ClassVar[RelationField] = RelationField("dataDomain") """ TBC """ INPUT_PORTS: ClassVar[RelationField] = RelationField("inputPorts") """ TBC """ _convenience_properties: ClassVar[List[str]] = [ "data_product_status", "daap_status", "data_product_criticality", "daap_criticality", "data_product_sensitivity", "daap_sensitivity", "data_product_visibility", "daap_visibility", "data_product_assets_d_s_l", "data_product_assets_playbook_filter", "data_product_score_value", "data_product_score_updated_at", "daap_visibility_users", "daap_visibility_groups", "daap_output_port_guids", "daap_input_port_guids", "daap_lineage_status", "output_ports", "data_domain", "input_ports", ] @property def data_product_status(self) -> Optional[DataProductStatus]: return None if self.attributes is None else self.attributes.data_product_status @data_product_status.setter def data_product_status(self, data_product_status: Optional[DataProductStatus]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.data_product_status = data_product_status @property def daap_status(self) -> Optional[DataProductStatus]: return None if self.attributes is None else self.attributes.daap_status @daap_status.setter def daap_status(self, daap_status: Optional[DataProductStatus]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.daap_status = daap_status @property def data_product_criticality(self) -> Optional[DataProductCriticality]: return ( None if self.attributes is None else self.attributes.data_product_criticality ) @data_product_criticality.setter def data_product_criticality( self, data_product_criticality: Optional[DataProductCriticality] ): if self.attributes is None: self.attributes = self.Attributes() self.attributes.data_product_criticality = data_product_criticality @property def daap_criticality(self) -> Optional[DataProductCriticality]: return None if self.attributes is None else self.attributes.daap_criticality @daap_criticality.setter def daap_criticality(self, daap_criticality: Optional[DataProductCriticality]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.daap_criticality = daap_criticality @property def data_product_sensitivity(self) -> Optional[DataProductSensitivity]: return ( None if self.attributes is None else self.attributes.data_product_sensitivity ) @data_product_sensitivity.setter def data_product_sensitivity( self, data_product_sensitivity: Optional[DataProductSensitivity] ): if self.attributes is None: self.attributes = self.Attributes() self.attributes.data_product_sensitivity = data_product_sensitivity @property def daap_sensitivity(self) -> Optional[DataProductSensitivity]: return None if self.attributes is None else self.attributes.daap_sensitivity @daap_sensitivity.setter def daap_sensitivity(self, daap_sensitivity: Optional[DataProductSensitivity]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.daap_sensitivity = daap_sensitivity @property def data_product_visibility(self) -> Optional[DataProductVisibility]: return ( None if self.attributes is None else self.attributes.data_product_visibility ) @data_product_visibility.setter def data_product_visibility( self, data_product_visibility: Optional[DataProductVisibility] ): if self.attributes is None: self.attributes = self.Attributes() self.attributes.data_product_visibility = data_product_visibility @property def daap_visibility(self) -> Optional[DataProductVisibility]: return None if self.attributes is None else self.attributes.daap_visibility @daap_visibility.setter def daap_visibility(self, daap_visibility: Optional[DataProductVisibility]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.daap_visibility = daap_visibility @property def data_product_assets_d_s_l(self) -> Optional[str]: return ( None if self.attributes is None else self.attributes.data_product_assets_d_s_l ) @data_product_assets_d_s_l.setter def data_product_assets_d_s_l(self, data_product_assets_d_s_l: Optional[str]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.data_product_assets_d_s_l = data_product_assets_d_s_l @property def data_product_assets_playbook_filter(self) -> Optional[str]: return ( None if self.attributes is None else self.attributes.data_product_assets_playbook_filter ) @data_product_assets_playbook_filter.setter def data_product_assets_playbook_filter( self, data_product_assets_playbook_filter: Optional[str] ): if self.attributes is None: self.attributes = self.Attributes() self.attributes.data_product_assets_playbook_filter = ( data_product_assets_playbook_filter ) @property def data_product_score_value(self) -> Optional[float]: return ( None if self.attributes is None else self.attributes.data_product_score_value ) @data_product_score_value.setter def data_product_score_value(self, data_product_score_value: Optional[float]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.data_product_score_value = data_product_score_value @property def data_product_score_updated_at(self) -> Optional[datetime]: return ( None if self.attributes is None else self.attributes.data_product_score_updated_at ) @data_product_score_updated_at.setter def data_product_score_updated_at( self, data_product_score_updated_at: Optional[datetime] ): if self.attributes is None: self.attributes = self.Attributes() self.attributes.data_product_score_updated_at = data_product_score_updated_at @property def daap_visibility_users(self) -> Optional[Set[str]]: return ( None if self.attributes is None else self.attributes.daap_visibility_users ) @daap_visibility_users.setter def daap_visibility_users(self, daap_visibility_users: Optional[Set[str]]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.daap_visibility_users = daap_visibility_users @property def daap_visibility_groups(self) -> Optional[Set[str]]: return ( None if self.attributes is None else self.attributes.daap_visibility_groups ) @daap_visibility_groups.setter def daap_visibility_groups(self, daap_visibility_groups: Optional[Set[str]]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.daap_visibility_groups = daap_visibility_groups @property def daap_output_port_guids(self) -> Optional[Set[str]]: return ( None if self.attributes is None else self.attributes.daap_output_port_guids ) @daap_output_port_guids.setter def daap_output_port_guids(self, daap_output_port_guids: Optional[Set[str]]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.daap_output_port_guids = daap_output_port_guids @property def daap_input_port_guids(self) -> Optional[Set[str]]: return ( None if self.attributes is None else self.attributes.daap_input_port_guids ) @daap_input_port_guids.setter def daap_input_port_guids(self, daap_input_port_guids: Optional[Set[str]]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.daap_input_port_guids = daap_input_port_guids @property def daap_lineage_status(self) -> Optional[DataProductLineageStatus]: return None if self.attributes is None else self.attributes.daap_lineage_status @daap_lineage_status.setter def daap_lineage_status( self, daap_lineage_status: Optional[DataProductLineageStatus] ): if self.attributes is None: self.attributes = self.Attributes() self.attributes.daap_lineage_status = daap_lineage_status @property def output_ports(self) -> Optional[List[Asset]]: return None if self.attributes is None else self.attributes.output_ports @output_ports.setter def output_ports(self, output_ports: Optional[List[Asset]]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.output_ports = output_ports @property def data_domain(self) -> Optional[DataDomain]: return None if self.attributes is None else self.attributes.data_domain @data_domain.setter def data_domain(self, data_domain: Optional[DataDomain]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.data_domain = data_domain @property def input_ports(self) -> Optional[List[Asset]]: return None if self.attributes is None else self.attributes.input_ports @input_ports.setter def input_ports(self, input_ports: Optional[List[Asset]]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.input_ports = input_ports class Attributes(DataMesh.Attributes): data_product_status: Optional[DataProductStatus] = Field( default=None, description="" ) daap_status: Optional[DataProductStatus] = Field(default=None, description="") data_product_criticality: Optional[DataProductCriticality] = Field( default=None, description="" ) daap_criticality: Optional[DataProductCriticality] = Field( default=None, description="" ) data_product_sensitivity: Optional[DataProductSensitivity] = Field( default=None, description="" ) daap_sensitivity: Optional[DataProductSensitivity] = Field( default=None, description="" ) data_product_visibility: Optional[DataProductVisibility] = Field( default=None, description="" ) daap_visibility: Optional[DataProductVisibility] = Field( default=None, description="" ) data_product_assets_d_s_l: Optional[str] = Field(default=None, description="") data_product_assets_playbook_filter: Optional[str] = Field( default=None, description="" ) data_product_score_value: Optional[float] = Field(default=None, description="") data_product_score_updated_at: Optional[datetime] = Field( default=None, description="" ) daap_visibility_users: Optional[Set[str]] = Field(default=None, description="") daap_visibility_groups: Optional[Set[str]] = Field(default=None, description="") daap_output_port_guids: Optional[Set[str]] = Field(default=None, description="") daap_input_port_guids: Optional[Set[str]] = Field(default=None, description="") daap_lineage_status: Optional[DataProductLineageStatus] = Field( default=None, description="" ) output_ports: Optional[List[Asset]] = Field( default=None, description="" ) # relationship data_domain: Optional[DataDomain] = Field( default=None, description="" ) # relationship input_ports: Optional[List[Asset]] = Field( default=None, description="" ) # relationship @classmethod @init_guid def create( cls, *, name: StrictStr, domain_qualified_name: StrictStr, asset_selection: IndexSearchRequest, ) -> DataProduct.Attributes: validate_required_fields( ["name", "domain_qualified_name", "asset_selection"], [name, domain_qualified_name, asset_selection], ) ASSETS_PLAYBOOK_FILTER = ( '{"condition":"AND","isGroupLocked":false,"rules":[]}' ) return DataProduct.Attributes( name=name, data_product_assets_d_s_l=DataProductsAssetsDSL.get_asset_selection( asset_selection ), data_domain=DataDomain.ref_by_qualified_name(domain_qualified_name), qualified_name=f"{domain_qualified_name}/product/{name}", data_product_assets_playbook_filter=ASSETS_PLAYBOOK_FILTER, parent_domain_qualified_name=domain_qualified_name, super_domain_qualified_name=DataMesh.get_super_domain_qualified_name( domain_qualified_name ), daap_status=DataProductStatus.ACTIVE, ) attributes: DataProduct.Attributes = Field( default_factory=lambda: DataProduct.Attributes(), description=( "Map of attributes in the instance and their values. " "The specific keys of this map will vary by type, " "so are described in the sub-types of this schema." ), )
from .asset import Asset # noqa: E402, F401 from .data_domain import DataDomain # noqa: E402, F401