Source code for pyatlan.cache.role_cache
# SPDX-License-Identifier: Apache-2.0
# Copyright 2025 Atlan Pte. Ltd.
from __future__ import annotations
from threading import Lock
from typing import TYPE_CHECKING, Dict, Iterable, Optional
from pyatlan.cache.common import RoleCacheCommon
from pyatlan.client.constants import GET_WHOAMI_USER
from pyatlan.model.role import AtlanRole
if TYPE_CHECKING:
from pyatlan.client.atlan import AtlanClient
lock: Lock = Lock()
[docs]
class RoleCache:
"""
Lazily-loaded cache for translating Atlan-internal roles into their various IDs.
"""
def __init__(self, client: AtlanClient):
self.client: AtlanClient = client
self.cache_by_id: Dict[str, AtlanRole] = {}
self.map_id_to_name: Dict[str, str] = {}
self.map_name_to_id: Dict[str, str] = {}
self.lock: Lock = Lock()
self._is_api_token_user: Optional[bool] = None
[docs]
def get_id_for_name(self, name: str) -> Optional[str]:
"""
Translate the provided human-readable role name to its GUID.
:param name: human-readable name of the role
:returns: unique identifier (GUID) of the role
"""
return self._get_id_for_name(name=name)
[docs]
def get_name_for_id(self, idstr: str) -> Optional[str]:
"""
Translate the provided role GUID to the human-readable role name.
:param idstr: unique identifier (GUID) of the role
:returns: human-readable name of the role
"""
return self._get_name_for_id(idstr=idstr)
[docs]
def validate_idstrs(self, idstrs: Iterable[str]):
"""
Validate that the given role GUIDs are valid. A ValueError will be raised in any are not.
:param idstrs: a collection of unique identifiers (GUID) of the roles to be checked
"""
return self._validate_idstrs(idstrs=idstrs)
def _refresh_cache(self) -> None:
with self.lock:
response = self.client.role.get(
limit=100, post_filter='{"name":{"$ilike":"$%"}}'
)
if not response:
return
# Process response using shared logic
(self.cache_by_id, self.map_id_to_name, self.map_name_to_id) = (
RoleCacheCommon.refresh_cache_data(response.records)
)
def _get_id_for_name(self, name: str) -> Optional[str]:
"""
Translate the provided human-readable role name to its GUID.
:param name: human-readable name of the role
:returns: unique identifier (GUID) of the role
"""
if role_id := self.map_name_to_id.get(name):
return role_id
self._refresh_cache()
return self.map_name_to_id.get(name)
def _get_name_for_id(self, idstr: str) -> Optional[str]:
"""
Translate the provided role GUID to the human-readable role name.
:param idstr: unique identifier (GUID) of the role
:returns: human-readable name of the role
"""
if role_name := self.map_id_to_name.get(idstr):
return role_name
self._refresh_cache()
return self.map_id_to_name.get(idstr)
def _validate_idstrs(self, idstrs: Iterable[str]):
"""
Validate that the given role GUIDs are valid. A ValueError will be raised in any are not.
:param idstrs: a collection of unique identifiers (GUID) of the roles to be checked
"""
for role_id in idstrs:
if not self.get_name_for_id(role_id):
raise ValueError(f"Provided role ID {role_id} was not found in Atlan.")
[docs]
def is_api_token_user(self) -> bool:
"""
Check if the current user is authenticated via an API token or OAuth client.
This method checks for the presence of $api-token-default-access or $admin roles.
:returns: True if the user is an API token user, False otherwise
"""
if self._is_api_token_user is not None:
return self._is_api_token_user
# Fetch role mappings for the current user
raw_json = self.client._call_api(api=GET_WHOAMI_USER)
# Check if the user has $api-token-default-access or $admin roles
for role_mapping in raw_json["roles"]:
role_name = role_mapping.get("name")
if role_name in ["$api-token-default-access", "$admin"]:
self._is_api_token_user = True
return self._is_api_token_user
self._is_api_token_user = False
return self._is_api_token_user