# SPDX-License-Identifier: Apache-2.0
# Copyright 2025 Atlan Pte. Ltd.
from __future__ import annotations
from typing import TYPE_CHECKING, ClassVar, List, Optional, Set
from warnings import warn
from pydantic.v1 import Field, validator
from pyatlan.model.core import AtlanTagName
from pyatlan.model.enums import (
AuthPolicyCategory,
AuthPolicyResourceCategory,
AuthPolicyType,
DataAction,
PurposeMetadataAction,
)
from pyatlan.model.fields.atlan_fields import KeywordField
from pyatlan.utils import init_guid, validate_required_fields
from .core.access_control import AccessControl
from .core.asset import SelfAsset
from .core.auth_policy import AuthPolicy
if TYPE_CHECKING:
from pyatlan.client.atlan import AtlanClient
[docs]
class Purpose(AccessControl):
"""Description"""
@classmethod
@init_guid
def creator(cls, *, name: str, atlan_tags: List[AtlanTagName]) -> Purpose:
validate_required_fields(["name", "atlan_tags"], [name, atlan_tags])
attributes = Purpose.Attributes.create(name=name, atlan_tags=atlan_tags)
return cls(attributes=attributes)
@classmethod
@init_guid
def create(cls, *, name: str, atlan_tags: List[AtlanTagName]) -> Purpose:
warn(
(
"This method is deprecated, please use 'creator' "
"instead, which offers identical functionality."
),
DeprecationWarning,
stacklevel=2,
)
return cls.creator(name=name, atlan_tags=atlan_tags)
@classmethod
def create_metadata_policy(
cls,
*,
client: AtlanClient,
name: str,
purpose_id: str,
policy_type: AuthPolicyType,
actions: Set[PurposeMetadataAction],
policy_groups: Optional[Set[str]] = None,
policy_users: Optional[Set[str]] = None,
all_users: bool = False,
) -> AuthPolicy:
validate_required_fields(
["client", "name", "purpose_id", "policy_type", "actions"],
[client, name, purpose_id, policy_type, actions],
)
target_found = False
policy = AuthPolicy._AuthPolicy__create(name=name) # type: ignore[attr-defined]
policy.policy_actions = {x.value for x in actions}
policy.policy_category = AuthPolicyCategory.PURPOSE.value
policy.policy_type = policy_type
policy.policy_resource_category = AuthPolicyResourceCategory.TAG.value
policy.policy_service_name = "atlas_tag"
policy.policy_sub_category = "metadata"
purpose = Purpose()
purpose.guid = purpose_id
policy.access_control = purpose
if all_users:
target_found = True
policy.policy_groups = {"public"}
else:
if policy_groups:
for group_name in policy_groups:
if not client.group_cache.get_id_for_name(group_name):
raise ValueError(
f"Provided group name {group_name} was not found in Atlan."
)
target_found = True
policy.policy_groups = policy_groups
else:
policy.policy_groups = None
if policy_users:
for username in policy_users:
if not client.user_cache.get_id_for_name(username):
raise ValueError(
f"Provided username {username} was not found in Atlan."
)
target_found = True
policy.policy_users = policy_users
else:
policy.policy_users = None
if target_found:
return policy
else:
raise ValueError("No user or group specified for the policy.")
@classmethod
def create_data_policy(
cls,
*,
client: AtlanClient,
name: str,
purpose_id: str,
policy_type: AuthPolicyType,
policy_groups: Optional[Set[str]] = None,
policy_users: Optional[Set[str]] = None,
all_users: bool = False,
) -> AuthPolicy:
validate_required_fields(
["client", "name", "purpose_id", "policy_type"],
[client, name, purpose_id, policy_type],
)
policy = AuthPolicy._AuthPolicy__create(name=name) # type: ignore[attr-defined]
policy.policy_actions = {DataAction.SELECT.value}
policy.policy_category = AuthPolicyCategory.PURPOSE.value
policy.policy_type = policy_type
policy.policy_resource_category = AuthPolicyResourceCategory.TAG.value
policy.policy_service_name = "atlas_tag"
policy.policy_sub_category = "data"
purpose = Purpose()
purpose.guid = purpose_id
policy.access_control = purpose
if all_users:
target_found = True
policy.policy_groups = {"public"}
else:
if policy_groups:
for group_name in policy_groups:
if not client.group_cache.get_id_for_name(group_name):
raise ValueError(
f"Provided group name {group_name} was not found in Atlan."
)
target_found = True
policy.policy_groups = policy_groups
else:
policy.policy_groups = None
if policy_users:
for username in policy_users:
if not client.user_cache.get_id_for_name(username):
raise ValueError(
f"Provided username {username} was not found in Atlan."
)
target_found = True
policy.policy_users = policy_users
else:
policy.policy_users = None
if target_found:
return policy
else:
raise ValueError("No user or group specified for the policy.")
@classmethod
@init_guid
def updater(
cls: type[SelfAsset],
qualified_name: str = "",
name: str = "",
is_enabled: bool = True,
) -> SelfAsset:
validate_required_fields(
["name", "qualified_name", "is_enabled"],
[name, qualified_name, is_enabled],
)
return cls(
attributes=cls.Attributes(
qualified_name=qualified_name,
name=name,
is_access_control_enabled=is_enabled,
)
)
@classmethod
def create_for_modification(
cls,
qualified_name: str = "",
name: str = "",
is_enabled: bool = True,
) -> Purpose:
warn(
(
"This method is deprecated, please use 'updater' "
"instead, which offers identical functionality."
),
DeprecationWarning,
stacklevel=2,
)
return cls.updater(
qualified_name=qualified_name, name=name, is_enabled=is_enabled
)
type_name: str = Field(default="Purpose", allow_mutation=False)
@validator("type_name")
def validate_type_name(cls, v):
if v != "Purpose":
raise ValueError("must be Purpose")
return v
def __setattr__(self, name, value):
if name in Purpose._convenience_properties:
return object.__setattr__(self, name, value)
super().__setattr__(name, value)
PURPOSE_CLASSIFICATIONS: ClassVar[KeywordField] = KeywordField(
"purposeClassifications", "purposeClassifications"
)
"""
TBC
"""
_convenience_properties: ClassVar[List[str]] = [
"purpose_atlan_tags",
]
@property
def purpose_atlan_tags(self) -> Optional[List[AtlanTagName]]:
return None if self.attributes is None else self.attributes.purpose_atlan_tags
@purpose_atlan_tags.setter
def purpose_atlan_tags(self, purpose_atlan_tags: Optional[List[AtlanTagName]]):
if self.attributes is None:
self.attributes = self.Attributes()
self.attributes.purpose_atlan_tags = purpose_atlan_tags
class Attributes(AccessControl.Attributes):
purpose_atlan_tags: Optional[List[AtlanTagName]] = Field(
default=None, description=""
)
@classmethod
@init_guid
def create(
cls, name: str, atlan_tags: List[AtlanTagName]
) -> Purpose.Attributes:
validate_required_fields(["name", "atlan_tags"], [name, atlan_tags])
return Purpose.Attributes(
qualified_name=name,
name=name,
display_name=name,
is_access_control_enabled=True,
description="",
purpose_atlan_tags=atlan_tags,
)
attributes: Purpose.Attributes = Field(
default_factory=lambda: Purpose.Attributes(),
description=(
"Map of attributes in the instance and their values. "
"The specific keys of this map will vary by type, "
"so are described in the sub-types of this schema."
),
)
Purpose.Attributes.update_forward_refs()