Source code for pyatlan.cache.user_cache
# SPDX-License-Identifier: Apache-2.0
# Copyright 2025 Atlan Pte. Ltd.
from __future__ import annotations
from threading import Lock
from typing import TYPE_CHECKING, Dict, Iterable, Optional
from pyatlan.cache.common import UserCacheCommon
from pyatlan.errors import ErrorCode
from pyatlan.model.constants import SERVICE_ACCOUNT_
if TYPE_CHECKING:
from pyatlan.client.atlan import AtlanClient
lock = Lock()
[docs]
class UserCache:
"""
Lazily-loaded cache for translating Atlan-internal users into their various IDs.
"""
def __init__(self, client: AtlanClient):
self.client: AtlanClient = client
self.map_id_to_name: Dict[str, str] = {}
self.map_name_to_id: Dict[str, str] = {}
self.map_email_to_id: Dict[str, str] = {}
self.lock: Lock = Lock()
[docs]
def get_id_for_name(self, name: str) -> Optional[str]:
"""
Translate the provided human-readable username to its GUID.
:param name: human-readable name of the user
:returns: unique identifier (GUID) of the user
"""
return self._get_id_for_name(name=name)
[docs]
def get_id_for_email(self, email: str) -> Optional[str]:
"""
Translate the provided email to its GUID.
:param email: email address of the user
:returns: unique identifier (GUID) of the user
"""
return self._get_id_for_email(email=email)
[docs]
def get_name_for_id(self, idstr: str) -> Optional[str]:
"""
Translate the provided user GUID to the human-readable username.
:param idstr: unique identifier (GUID) of the user
:returns: username of the user
"""
return self._get_name_for_id(idstr=idstr)
[docs]
def validate_names(self, names: Iterable[str]):
"""
Validate that the given human-readable usernames are valid. A ValueError will be raised in any are not.
:param names: a collection of usernames to be checked
"""
return self._validate_names(names)
def _refresh_cache(self) -> None:
with self.lock:
users = [user for user in self.client.user.get_all()]
if not users:
return
(self.map_id_to_name, self.map_name_to_id, self.map_email_to_id) = (
UserCacheCommon.refresh_cache_data(users)
)
def _get_id_for_name(self, name: str) -> Optional[str]:
"""
Translate the provided human-readable username to its GUID.
:param name: human-readable name of the user
:returns: unique identifier (GUID) of the user
"""
if user_id := self.map_name_to_id.get(name):
return user_id
# If we are translating an API token,
# short-circuit any further cache refresh
if name.startswith(SERVICE_ACCOUNT_):
token = self.client.token.get_by_id(client_id=name)
if token and token.guid:
self.map_name_to_id[name] = token.guid
return token.guid
else:
raise ErrorCode.API_TOKEN_NOT_FOUND_BY_NAME.exception_with_parameters(
name
)
self._refresh_cache()
return self.map_name_to_id.get(name)
def _get_id_for_email(self, email: str) -> Optional[str]:
"""
Translate the provided email to its GUID.
:param email: email address of the user
:returns: unique identifier (GUID) of the user
"""
if user_id := self.map_email_to_id.get(email):
return user_id
self._refresh_cache()
return self.map_email_to_id.get(email)
def _get_name_for_id(self, idstr: str) -> Optional[str]:
"""
Translate the provided user GUID to the human-readable username.
:param idstr: unique identifier (GUID) of the user
:returns: username of the user
"""
if username := self.map_id_to_name.get(idstr):
return username
# If the username isn't found, check if it is an API token
token = self.client.token.get_by_guid(guid=idstr)
if token and token.client_id:
return token.username
else:
self._refresh_cache()
return self.map_id_to_name.get(idstr)
def _validate_names(self, names: Iterable[str]):
"""
Validate that the given human-readable usernames are valid. A ValueError will be raised in any are not.
:param names: a collection of usernames to be checked
"""
for username in names:
if not self.get_id_for_name(username) and not self.client.token.get_by_id(
username
):
raise ValueError(
f"Provided username {username} was not found in Atlan."
)