Skip to content

Client

AtlanClient

pyatlan.client.atlan

Classes

AtlanClient(**data)

Bases: BaseSettings

Source code in pyatlan/client/atlan.py
def __init__(self, **data):
    super().__init__(**data)

    if self.oauth_client_id and self.oauth_client_secret and self.api_key is None:
        LOGGER.debug("API KEY not provided. Using OAuth flow for authentication")

        final_base_url = self.base_url or os.environ.get(
            "ATLAN_BASE_URL", "INTERNAL"
        )
        final_oauth_client_id = self.oauth_client_id or os.environ.get(
            "ATLAN_OAUTH_CLIENT_ID"
        )
        final_oauth_client_secret = self.oauth_client_secret or os.environ.get(
            "ATLAN_OAUTH_CLIENT_SECRET"
        )
        self._oauth_token_manager = OAuthTokenManager(
            base_url=final_base_url,
            client_id=final_oauth_client_id,
            client_secret=final_oauth_client_secret,
            connect_timeout=self.connect_timeout,
            read_timeout=self.read_timeout,
        )
        self._request_params = {"headers": {}}
    else:
        self._request_params = (
            {"headers": {"authorization": f"Bearer {self.api_key}"}}
            if self.api_key and self.api_key.strip()
            else {"headers": {}}
        )

    # Build proxy/SSL configuration with environment variable fallback
    transport_kwargs = self._build_transport_proxy_config(data)
    # Configure httpx client with custom transport that supports retry and proxy
    # Note: We pass proxy/SSL config to the transport, not the client,
    # so that retry logic properly respects these settings
    self._session = httpx.Client(
        transport=PyatlanSyncTransport(retry=self.retry, **transport_kwargs),
        headers={
            "x-atlan-agent": "sdk",
            "x-atlan-agent-id": "python",
            "x-atlan-client-origin": "product_sdk",
            "x-atlan-python-version": get_python_version(),
            "x-atlan-client-type": "sync",
            "User-Agent": f"Atlan-PythonSDK/{VERSION}",
        },
        event_hooks={"response": [log_response]},
    )
    self._401_has_retried.set(False)
Functions
add_api_token_as_admin(asset_guid: str, impersonation_token: str) -> Optional[AssetMutationResponse]

Deprecated - use user.add_as_admin() instead.

Source code in pyatlan/client/atlan.py
def add_api_token_as_admin(
    self, asset_guid: str, impersonation_token: str
) -> Optional[AssetMutationResponse]:
    """Deprecated - use user.add_as_admin() instead."""
    warn(
        "This method is deprecated, please use 'user.add_as_admin' instead, which offers identical functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.user.add_as_admin(
        asset_guid=asset_guid, impersonation_token=impersonation_token
    )
add_api_token_as_viewer(asset_guid: str, impersonation_token: str) -> Optional[AssetMutationResponse]

Deprecated - use user.add_as_viewer() instead.

Source code in pyatlan/client/atlan.py
def add_api_token_as_viewer(
    self, asset_guid: str, impersonation_token: str
) -> Optional[AssetMutationResponse]:
    """Deprecated - use user.add_as_viewer() instead."""
    warn(
        "This method is deprecated, please use 'user.add_as_viewer' instead, which offers identical functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.user.add_as_viewer(
        asset_guid=asset_guid, impersonation_token=impersonation_token
    )
add_atlan_tags(asset_type: Type[A], qualified_name: str, atlan_tag_names: List[str], propagate: bool = False, remove_propagation_on_delete: bool = True, restrict_lineage_propagation: bool = False, restrict_propagation_through_hierarchy: bool = False) -> None

Deprecated - use asset.add_atlan_tags() instead.

Source code in pyatlan/client/atlan.py
@validate_arguments
def add_atlan_tags(
    self,
    asset_type: Type[A],
    qualified_name: str,
    atlan_tag_names: List[str],
    propagate: bool = False,
    remove_propagation_on_delete: bool = True,
    restrict_lineage_propagation: bool = False,
    restrict_propagation_through_hierarchy: bool = False,
) -> None:
    """Deprecated - use asset.add_atlan_tags() instead."""
    warn(
        "This method is deprecated, please use 'asset.add_atlan_tags' instead, which offers identical "
        "functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    self.asset.add_atlan_tags(
        asset_type=asset_type,
        qualified_name=qualified_name,
        atlan_tag_names=atlan_tag_names,
        propagate=propagate,
        remove_propagation_on_delete=remove_propagation_on_delete,
        restrict_lineage_propagation=restrict_lineage_propagation,
        restrict_propagation_through_hierarchy=restrict_propagation_through_hierarchy,
    )
add_user_to_groups(guid: str, group_ids: List[str]) -> None

Deprecated - use user.add_to_groups() instead.

Source code in pyatlan/client/atlan.py
def add_user_to_groups(
    self,
    guid: str,
    group_ids: List[str],
) -> None:
    """Deprecated - use user.add_to_groups() instead."""
    warn(
        "This method is deprecated, please use 'user.add_to_groups' instead, which offers identical functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    self.user.add_to_groups(guid=guid, group_ids=group_ids)
append_terms(asset_type: Type[A], terms: List[AtlasGlossaryTerm], guid: Optional[str] = None, qualified_name: Optional[str] = None) -> A

Deprecated - use asset.append_terms() instead.

Source code in pyatlan/client/atlan.py
@validate_arguments
def append_terms(
    self,
    asset_type: Type[A],
    terms: List[AtlasGlossaryTerm],
    guid: Optional[str] = None,
    qualified_name: Optional[str] = None,
) -> A:
    """Deprecated - use asset.append_terms() instead."""
    warn(
        "This method is deprecated, please use 'asset.append_terms' instead, which offers identical functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.asset.append_terms(
        asset_type=asset_type, terms=terms, guid=guid, qualified_name=qualified_name
    )
change_user_role(guid: str, role_id: str) -> None

Deprecated - use user.change_role() instead.

Source code in pyatlan/client/atlan.py
def change_user_role(
    self,
    guid: str,
    role_id: str,
) -> None:
    """Deprecated - use user.change_role() instead."""
    warn(
        "This method is deprecated, please use 'user.change_role' instead, which offers identical functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    self.user.change_role(guid=guid, role_id=role_id)
create_api_token(display_name: str, description: str = '', personas: Optional[Set[str]] = None, validity_seconds: int = -1) -> ApiToken

Deprecated - use token.create() instead.

Source code in pyatlan/client/atlan.py
def create_api_token(
    self,
    display_name: str,
    description: str = "",
    personas: Optional[Set[str]] = None,
    validity_seconds: int = -1,
) -> ApiToken:
    """Deprecated - use token.create() instead."""
    warn(
        "This method is deprecated, please use 'token.create' instead, which offers identical functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.token.create(
        display_name=display_name,
        description=description,
        personas=personas,
        validity_seconds=validity_seconds,
    )
create_group(group: AtlanGroup, user_ids: Optional[List[str]] = None) -> CreateGroupResponse

Deprecated - use group.create() instead.

Source code in pyatlan/client/atlan.py
def create_group(
    self,
    group: AtlanGroup,
    user_ids: Optional[List[str]] = None,
) -> CreateGroupResponse:
    """Deprecated - use group.create() instead."""
    warn(
        "This method is deprecated, please use 'group.create' instead, which offers identical functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.group.create(group=group, user_ids=user_ids)
create_typedef(typedef: TypeDef) -> TypeDefResponse

Deprecated - use typedef.create() instead.

Source code in pyatlan/client/atlan.py
def create_typedef(self, typedef: TypeDef) -> TypeDefResponse:
    """Deprecated - use typedef.create() instead."""
    warn(
        "This method is deprecated, please use 'typedef.create' instead, which offers identical functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.typedef.create(typedef=typedef)
create_users(users: List[AtlanUser]) -> None

Deprecated - use user.create() instead.

Source code in pyatlan/client/atlan.py
def create_users(
    self,
    users: List[AtlanUser],
) -> None:
    """Deprecated - use user.create() instead."""
    warn(
        "This method is deprecated, please use 'user.create' instead, which offers identical functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    self.user.create(users=users)
delete_entity_by_guid(guid: Union[str, List[str]]) -> AssetMutationResponse

Deprecated - use asset.delete_by_guid() instead.

Source code in pyatlan/client/atlan.py
def delete_entity_by_guid(
    self, guid: Union[str, List[str]]
) -> AssetMutationResponse:
    """Deprecated - use asset.delete_by_guid() instead."""
    warn(
        "This method is deprecated, please use 'asset.delete_by_guid' instead, which offers identical "
        "functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.asset.delete_by_guid(guid=guid)
find_category_by_name(name: constr(strip_whitespace=True, min_length=1, strict=True), glossary_name: constr(strip_whitespace=True, min_length=1, strict=True), attributes: Optional[List[StrictStr]] = None) -> List[AtlasGlossaryCategory]

Deprecated - use asset.find_category_by_name() instead.

Source code in pyatlan/client/atlan.py
@validate_arguments
def find_category_by_name(
    self,
    name: constr(strip_whitespace=True, min_length=1, strict=True),  # type: ignore
    glossary_name: constr(strip_whitespace=True, min_length=1, strict=True),  # type: ignore
    attributes: Optional[List[StrictStr]] = None,
) -> List[AtlasGlossaryCategory]:
    """Deprecated - use asset.find_category_by_name() instead."""
    warn(
        "This method is deprecated, please use 'asset.find_category_by_name' instead, which offers identical "
        "functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.asset.find_category_by_name(
        name=name, glossary_name=glossary_name, attributes=attributes
    )
find_category_fast_by_name(name: constr(strip_whitespace=True, min_length=1, strict=True), glossary_qualified_name: constr(strip_whitespace=True, min_length=1, strict=True), attributes: Optional[List[StrictStr]] = None) -> List[AtlasGlossaryCategory]

Deprecated - use asset.find_category_fast_by_name() instead.

Source code in pyatlan/client/atlan.py
@validate_arguments
def find_category_fast_by_name(
    self,
    name: constr(strip_whitespace=True, min_length=1, strict=True),  # type: ignore
    glossary_qualified_name: constr(  # type: ignore
        strip_whitespace=True, min_length=1, strict=True
    ),
    attributes: Optional[List[StrictStr]] = None,
) -> List[AtlasGlossaryCategory]:
    """Deprecated - use asset.find_category_fast_by_name() instead."""
    warn(
        "This method is deprecated, please use 'asset.find_category_fast_by_name' instead, which offers identical "
        "functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.asset.find_category_fast_by_name(
        name=name,
        glossary_qualified_name=glossary_qualified_name,
        attributes=attributes,
    )
find_connections_by_name(name: str, connector_type: AtlanConnectorType, attributes: Optional[List[str]] = None) -> List[Connection]

Deprecated - use asset.find_connections_by_name() instead.

Source code in pyatlan/client/atlan.py
@validate_arguments
def find_connections_by_name(
    self,
    name: str,
    connector_type: AtlanConnectorType,
    attributes: Optional[List[str]] = None,
) -> List[Connection]:
    """Deprecated - use asset.find_connections_by_name() instead."""
    warn(
        "This method is deprecated, please use 'asset.find_connections_by_name' instead, which offers identical "
        "functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.asset.find_connections_by_name(
        name=name, connector_type=connector_type, attributes=attributes
    )
find_glossary_by_name(name: constr(strip_whitespace=True, min_length=1, strict=True), attributes: Optional[List[StrictStr]] = None) -> AtlasGlossary

Deprecated - use asset.find_glossary_by_name() instead.

Source code in pyatlan/client/atlan.py
@validate_arguments
def find_glossary_by_name(
    self,
    name: constr(strip_whitespace=True, min_length=1, strict=True),  # type: ignore
    attributes: Optional[List[StrictStr]] = None,
) -> AtlasGlossary:
    """Deprecated - use asset.find_glossary_by_name() instead."""
    warn(
        "This method is deprecated, please use 'asset.find_glossary_by_name' instead, which offers identical "
        "functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.asset.find_glossary_by_name(name=name, attributes=attributes)
find_personas_by_name(name: str, attributes: Optional[List[str]] = None) -> List[Persona]

Deprecated - use asset.find_personas_by_name() instead.

Source code in pyatlan/client/atlan.py
@validate_arguments
def find_personas_by_name(
    self,
    name: str,
    attributes: Optional[List[str]] = None,
) -> List[Persona]:
    """Deprecated - use asset.find_personas_by_name() instead."""
    warn(
        "This method is deprecated, please use 'asset.find_personas_by_name' instead, which offers identical "
        "functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.asset.find_personas_by_name(name=name, attributes=attributes)
find_purposes_by_name(name: str, attributes: Optional[List[str]] = None) -> List[Purpose]

Deprecated - use asset.find_personas_by_name() instead.

Source code in pyatlan/client/atlan.py
def find_purposes_by_name(
    self,
    name: str,
    attributes: Optional[List[str]] = None,
) -> List[Purpose]:
    """Deprecated - use asset.find_personas_by_name() instead."""
    warn(
        "This method is deprecated, please use 'asset.find_purposes_by_name' instead, which offers identical "
        "functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.asset.find_purposes_by_name(name=name, attributes=attributes)
find_term_by_name(name: constr(strip_whitespace=True, min_length=1, strict=True), glossary_name: constr(strip_whitespace=True, min_length=1, strict=True), attributes: Optional[List[StrictStr]] = None) -> AtlasGlossaryTerm

Deprecated - use asset.find_term_by_name() instead.

Source code in pyatlan/client/atlan.py
@validate_arguments
def find_term_by_name(
    self,
    name: constr(strip_whitespace=True, min_length=1, strict=True),  # type: ignore
    glossary_name: constr(strip_whitespace=True, min_length=1, strict=True),  # type: ignore
    attributes: Optional[List[StrictStr]] = None,
) -> AtlasGlossaryTerm:
    """Deprecated - use asset.find_term_by_name() instead."""
    warn(
        "This method is deprecated, please use 'asset.find_term_by_name' instead, which offers identical "
        "functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.asset.find_term_by_name(
        name=name, glossary_name=glossary_name, attributes=attributes
    )
find_term_fast_by_name(name: constr(strip_whitespace=True, min_length=1, strict=True), glossary_qualified_name: constr(strip_whitespace=True, min_length=1, strict=True), attributes: Optional[List[StrictStr]] = None) -> AtlasGlossaryTerm

Deprecated - use asset.find_category_by_name() instead.

Source code in pyatlan/client/atlan.py
@validate_arguments
def find_term_fast_by_name(
    self,
    name: constr(strip_whitespace=True, min_length=1, strict=True),  # type: ignore
    glossary_qualified_name: constr(  # type: ignore
        strip_whitespace=True, min_length=1, strict=True
    ),
    attributes: Optional[List[StrictStr]] = None,
) -> AtlasGlossaryTerm:
    """Deprecated - use asset.find_category_by_name() instead."""
    warn(
        "This method is deprecated, please use 'asset.find_category_by_name' instead, which offers identical "
        "functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.asset.find_term_fast_by_name(
        name=name,
        glossary_qualified_name=glossary_qualified_name,
        attributes=attributes,
    )
from_token_guid(guid: str, base_url: Optional[str] = None, client_id: Optional[str] = None, client_secret: Optional[str] = None) -> AtlanClient classmethod

Create an AtlanClient instance using an API token GUID.

This method performs a multi-step authentication flow: 1. Obtains Atlan-Argo (superuser) access token 2. Uses Argo token to retrieve the API token's client credentials 3. Exchanges those credentials for an access token 4. Returns a new AtlanClient authenticated with the resolved token

:param guid: API token GUID to resolve :param base_url: Optional base URL for the Atlan service(overrides ATLAN_BASE_URL environment variable) :param client_id: Optional client ID for authentication (overrides CLIENT_ID environment variable) :param client_secret: Optional client secret for authentication (overrides CLIENT_SECRET environment variable) :returns: a new client instance authenticated with the resolved token :raises: ErrorCode.UNABLE_TO_ESCALATE_WITH_PARAM: If any step in the token resolution fails

Source code in pyatlan/client/atlan.py
@classmethod
def from_token_guid(
    cls,
    guid: str,
    base_url: Optional[str] = None,
    client_id: Optional[str] = None,
    client_secret: Optional[str] = None,
) -> AtlanClient:
    """
    Create an AtlanClient instance using an API token GUID.

    This method performs a multi-step authentication flow:
    1. Obtains Atlan-Argo (superuser) access token
    2. Uses Argo token to retrieve the API token's client credentials
    3. Exchanges those credentials for an access token
    4. Returns a new AtlanClient authenticated with the resolved token

    :param guid: API token GUID to resolve
    :param base_url: Optional base URL for the Atlan service(overrides ATLAN_BASE_URL environment variable)
    :param client_id: Optional client ID for authentication (overrides CLIENT_ID environment variable)
    :param client_secret: Optional client secret for authentication (overrides CLIENT_SECRET environment variable)
    :returns: a new client instance authenticated with the resolved token
    :raises: ErrorCode.UNABLE_TO_ESCALATE_WITH_PARAM: If any step in the token resolution fails
    """
    final_base_url = base_url or os.environ.get("ATLAN_BASE_URL", "INTERNAL")

    # Step 1: Initialize base client and get Atlan-Argo credentials
    # Note: Using empty api_key as we're bootstrapping authentication
    client = AtlanClient(base_url=final_base_url, api_key="")
    client_info = ImpersonateUser.get_client_info(
        client_id=client_id, client_secret=client_secret
    )

    # Prepare credentials for Atlan-Argo token request
    argo_credentials = {
        "grant_type": "client_credentials",
        "client_id": client_info.client_id,
        "client_secret": client_info.client_secret,
        "scope": "openid",
    }

    # Step 2: Obtain Atlan-Argo (superuser) access token
    try:
        raw_json = client._call_api(GET_TOKEN, request_obj=argo_credentials)
        argo_token = AccessTokenResponse(**raw_json).access_token
        temp_argo_client = AtlanClient(base_url=final_base_url, api_key=argo_token)
    except AtlanError as atlan_err:
        raise ErrorCode.UNABLE_TO_ESCALATE_WITH_PARAM.exception_with_parameters(
            "Failed to obtain Atlan-Argo token"
        ) from atlan_err

    # Step 3: Use Argo client to retrieve API token's credentials
    # Both endpoints require authentication, hence using the Argo token
    token_secret = temp_argo_client.impersonate.get_client_secret(client_guid=guid)
    token_client_id = temp_argo_client.token.get_by_guid(  # type: ignore[union-attr]
        guid=guid
    ).client_id

    # Step 4: Exchange API token credentials for access token
    token_credentials = {
        "grant_type": "client_credentials",
        "client_id": token_client_id,
        "client_secret": token_secret,
        "scope": "openid",
    }

    try:
        raw_json = client._call_api(GET_TOKEN, request_obj=token_credentials)
        token_api_key = AccessTokenResponse(**raw_json).access_token

        # Step 5: Create and return the authenticated client
        return AtlanClient(base_url=final_base_url, api_key=token_api_key)
    except AtlanError as atlan_err:
        raise ErrorCode.UNABLE_TO_ESCALATE_WITH_PARAM.exception_with_parameters(
            "Failed to obtain access token for API token"
        ) from atlan_err
get_admin_events(admin_request: AdminEventRequest) -> AdminEventResponse

Deprecated - use admin.get_admin_events() instead.

Source code in pyatlan/client/atlan.py
def get_admin_events(self, admin_request: AdminEventRequest) -> AdminEventResponse:
    """Deprecated - use admin.get_admin_events() instead."""
    warn(
        "This method is deprecated, please use 'admin.get_admin_events' instead, which offers identical "
        "functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.admin.get_admin_events(admin_request=admin_request)
get_all_groups(limit: int = 20) -> GroupResponse

Deprecated - use group.get_all() instead.

Source code in pyatlan/client/atlan.py
def get_all_groups(
    self,
    limit: int = 20,
) -> GroupResponse:
    """Deprecated - use group.get_all() instead."""
    warn(
        "This method is deprecated, please use 'group.get_all' instead, which offers identical functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.group.get_all(limit=limit)
get_all_roles() -> RoleResponse

Deprecated - use self.role.get_all() instead.

Source code in pyatlan/client/atlan.py
def get_all_roles(self) -> RoleResponse:
    """Deprecated - use self.role.get_all() instead."""
    warn(
        "This method is deprecated, please use 'self.role.get_all' instead, which offers identical functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.role.get_all()
get_all_typedefs() -> TypeDefResponse

Deprecated - use typedef.get_all() instead.

Source code in pyatlan/client/atlan.py
def get_all_typedefs(self) -> TypeDefResponse:
    """Deprecated - use typedef.get_all() instead."""
    warn(
        "This method is deprecated, please use 'typedef.get_all' instead, which offers identical functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.typedef.get_all()
get_all_users(limit: int = 20) -> UserResponse

Deprecated - use user.get_all() instead.

Source code in pyatlan/client/atlan.py
def get_all_users(
    self,
    limit: int = 20,
) -> UserResponse:
    """Deprecated - use user.get_all() instead."""
    warn(
        "This method is deprecated, please use 'user.get_all' instead, which offers identical functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.user.get_all(limit=limit)
get_api_token_by_id(client_id: str) -> Optional[ApiToken]

Deprecated - use token.get_by_id() instead.

Source code in pyatlan/client/atlan.py
def get_api_token_by_id(self, client_id: str) -> Optional[ApiToken]:
    """Deprecated - use token.get_by_id() instead."""
    warn(
        "This method is deprecated, please use 'token.get_by_id' instead, which offers identical functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.token.get_by_id(client_id=client_id)
get_api_token_by_name(display_name: str) -> Optional[ApiToken]

Deprecated - use token.get_by_name() instead.

Source code in pyatlan/client/atlan.py
def get_api_token_by_name(self, display_name: str) -> Optional[ApiToken]:
    """Deprecated - use token.get_by_name() instead."""
    warn(
        "This method is deprecated, please use 'token.get_by_name' instead, which offers identical functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.token.get_by_name(display_name=display_name)
get_api_tokens(limit: Optional[int] = None, post_filter: Optional[str] = None, sort: Optional[str] = None, count: bool = True, offset: int = 0) -> ApiTokenResponse

Deprecated - use token.get() instead.

Source code in pyatlan/client/atlan.py
def get_api_tokens(
    self,
    limit: Optional[int] = None,
    post_filter: Optional[str] = None,
    sort: Optional[str] = None,
    count: bool = True,
    offset: int = 0,
) -> ApiTokenResponse:
    """Deprecated - use token.get() instead."""
    warn(
        "This method is deprecated, please use 'token.get' instead, which offers identical functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.token.get(
        limit=limit, post_filter=post_filter, sort=sort, count=count, offset=offset
    )
get_asset_by_guid(guid: str, asset_type: Type[A], min_ext_info: bool = False, ignore_relationships: bool = False) -> A

Deprecated - use asset.get_by_guid() instead.

Source code in pyatlan/client/atlan.py
@validate_arguments
def get_asset_by_guid(
    self,
    guid: str,
    asset_type: Type[A],
    min_ext_info: bool = False,
    ignore_relationships: bool = False,
) -> A:
    """Deprecated - use asset.get_by_guid() instead."""
    warn(
        "This method is deprecated, please use 'asset.get_by_guid' instead, which offers identical functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.asset.get_by_guid(
        guid=guid,
        asset_type=asset_type,
        min_ext_info=min_ext_info,
        ignore_relationships=ignore_relationships,
    )
get_asset_by_qualified_name(qualified_name: str, asset_type: Type[A], min_ext_info: bool = False, ignore_relationships: bool = False) -> A

Deprecated - use asset.get_by_qualified_name() instead.

Source code in pyatlan/client/atlan.py
@validate_arguments
def get_asset_by_qualified_name(
    self,
    qualified_name: str,
    asset_type: Type[A],
    min_ext_info: bool = False,
    ignore_relationships: bool = False,
) -> A:
    """Deprecated - use asset.get_by_qualified_name() instead."""
    warn(
        "This method is deprecated, please use 'asset.get_by_qualified_name' instead, which offers identical "
        "functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.asset.get_by_qualified_name(
        qualified_name=qualified_name,
        asset_type=asset_type,
        min_ext_info=min_ext_info,
        ignore_relationships=ignore_relationships,
    )
get_current_user() -> UserMinimalResponse

Deprecated - use user.get_current() instead.

Source code in pyatlan/client/atlan.py
def get_current_user(
    self,
) -> UserMinimalResponse:
    """Deprecated - use user.get_current() instead."""
    warn(
        "This method is deprecated, please use 'user.get_current' instead, which offers identical functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.user.get_current()
get_group_by_name(alias: str, limit: int = 20) -> Optional[GroupResponse]

Deprecated - use group.get_by_name() instead.

Source code in pyatlan/client/atlan.py
def get_group_by_name(
    self,
    alias: str,
    limit: int = 20,
) -> Optional[GroupResponse]:
    """Deprecated - use group.get_by_name() instead."""
    warn(
        "This method is deprecated, please use 'group.get_by_name' instead, which offers identical functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.group.get_by_name(alias=alias, limit=limit)
get_group_members(guid: str) -> UserResponse

Deprecated - use group.get_members() instead.

Source code in pyatlan/client/atlan.py
def get_group_members(self, guid: str) -> UserResponse:
    """Deprecated - use group.get_members() instead."""
    warn(
        "This method is deprecated, please use 'group.get_members' instead, which offers identical functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.group.get_members(guid=guid)
get_groups(limit: Optional[int] = None, post_filter: Optional[str] = None, sort: Optional[str] = None, count: bool = True, offset: int = 0) -> GroupResponse

Deprecated - use group.get() instead.

Source code in pyatlan/client/atlan.py
def get_groups(
    self,
    limit: Optional[int] = None,
    post_filter: Optional[str] = None,
    sort: Optional[str] = None,
    count: bool = True,
    offset: int = 0,
) -> GroupResponse:
    """Deprecated - use group.get() instead."""
    warn(
        "This method is deprecated, please use 'group.get' instead, which offers identical functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.group.get(
        limit=limit, post_filter=post_filter, sort=sort, count=count, offset=offset
    )
get_groups_for_user(guid: str) -> GroupResponse

Deprecated - use user.get_groups() instead.

Source code in pyatlan/client/atlan.py
def get_groups_for_user(
    self,
    guid: str,
) -> GroupResponse:
    """Deprecated - use user.get_groups() instead."""
    warn(
        "This method is deprecated, please use 'user.get_groups' instead, which offers identical functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.user.get_groups(guid=guid)
get_keycloak_events(keycloak_request: KeycloakEventRequest) -> KeycloakEventResponse

Deprecated - use admin.get_keycloak_events() instead.

Source code in pyatlan/client/atlan.py
def get_keycloak_events(
    self, keycloak_request: KeycloakEventRequest
) -> KeycloakEventResponse:
    """Deprecated - use admin.get_keycloak_events() instead."""
    warn(
        "This method is deprecated, please use 'admin.get_keycloak_events' instead, which offers identical "
        "functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.admin.get_keycloak_events(keycloak_request=keycloak_request)
get_lineage_list(lineage_request: LineageListRequest) -> LineageListResults

Deprecated - use asset.get_lineage_list() instead.

Source code in pyatlan/client/atlan.py
def get_lineage_list(
    self, lineage_request: LineageListRequest
) -> LineageListResults:
    """Deprecated - use asset.get_lineage_list() instead."""
    warn(
        "This method is deprecated, please use 'asset.get_lineage_list' instead, which offers identical "
        "functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.asset.get_lineage_list(lineage_request=lineage_request)
get_roles(limit: int, post_filter: Optional[str] = None, sort: Optional[str] = None, count: bool = True, offset: int = 0) -> RoleResponse

Deprecated - use role.get() instead.

Source code in pyatlan/client/atlan.py
def get_roles(
    self,
    limit: int,
    post_filter: Optional[str] = None,
    sort: Optional[str] = None,
    count: bool = True,
    offset: int = 0,
) -> RoleResponse:
    """Deprecated - use role.get() instead."""
    warn(
        "This method is deprecated, please use 'role.get' instead, which offers identical functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.role.get(
        limit=limit, post_filter=post_filter, sort=sort, count=count, offset=offset
    )
get_typedefs(type_category: Union[AtlanTypeCategory, List[AtlanTypeCategory]]) -> TypeDefResponse

Deprecated - use typedef.get() instead.

Source code in pyatlan/client/atlan.py
def get_typedefs(
    self, type_category: Union[AtlanTypeCategory, List[AtlanTypeCategory]]
) -> TypeDefResponse:
    """Deprecated - use typedef.get() instead."""
    warn(
        "This method is deprecated, please use 'typedef.get' instead, which offers identical functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.typedef.get(type_category=type_category)
get_user_by_username(username: str) -> Optional[AtlanUser]

Deprecated - use user.get_by_username() instead.

Source code in pyatlan/client/atlan.py
def get_user_by_username(self, username: str) -> Optional[AtlanUser]:
    """Deprecated - use user.get_by_username() instead."""
    warn(
        "This method is deprecated, please use 'user.get_by_username' instead, which offers identical "
        "functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.user.get_by_username(username=username)
get_users(limit: Optional[int] = None, post_filter: Optional[str] = None, sort: Optional[str] = None, count: bool = True, offset: int = 0) -> UserResponse

Deprecated - use user.get() instead.

Source code in pyatlan/client/atlan.py
def get_users(
    self,
    limit: Optional[int] = None,
    post_filter: Optional[str] = None,
    sort: Optional[str] = None,
    count: bool = True,
    offset: int = 0,
) -> UserResponse:
    """Deprecated - use user.get() instead."""
    warn(
        "This method is deprecated, please use 'user.get' instead, which offers identical functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.user.get(
        limit=limit, post_filter=post_filter, sort=sort, count=count, offset=offset
    )
get_users_by_email(email: str, limit: int = 20) -> Optional[UserResponse]

Deprecated - use user.get_by_email() instead.

Source code in pyatlan/client/atlan.py
def get_users_by_email(
    self,
    email: str,
    limit: int = 20,
) -> Optional[UserResponse]:
    """Deprecated - use user.get_by_email() instead."""
    warn(
        "This method is deprecated, please use 'user.get_by_email' instead, which offers identical functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.user.get_by_email(email=email, limit=limit)
max_retries(max_retries: Retry = CONNECTION_RETRY) -> _GeneratorContextManager[None]

Creates a context manger that can used to temporarily change parameters used for retrying connnections. The original Retry information will be restored when the context is exited.

Source code in pyatlan/client/atlan.py
@contextlib.contextmanager  # type: ignore[misc,arg-type]
def max_retries(  # type: ignore[misc]
    self, max_retries: Retry = CONNECTION_RETRY
) -> _GeneratorContextManager[None]:
    """Creates a context manger that can used to temporarily change parameters used for retrying connnections.
    The original Retry information will be restored when the context is exited."""
    # Store current transport and create new one with updated retries
    current_transport = self._session._transport

    # Build transport kwargs with current proxy/SSL settings
    transport_kwargs = {}
    if self.proxy:
        transport_kwargs["proxy"] = self.proxy
    if self.verify is not None:
        transport_kwargs["verify"] = self.verify

    new_transport = PyatlanSyncTransport(retry=max_retries, **transport_kwargs)
    self._session._transport = new_transport

    LOGGER.debug(
        "max_retries set to total: %s force_list: %s",
        max_retries.total,
        max_retries.status_forcelist,
    )
    try:
        LOGGER.debug("Entering max_retries")
        yield None
        LOGGER.debug("Exiting max_retries")
    except httpx.TransportError as err:
        LOGGER.exception("Exception in max retries")
        raise ErrorCode.RETRY_OVERRUN.exception_with_parameters() from err
    finally:
        # Restore original transport
        self._session._transport = current_transport
        LOGGER.debug("max_retries restored %s", self._session._transport.retry)  # type: ignore[attr-defined]
parse_query(query: QueryParserRequest) -> Optional[ParsedQuery]

Parses the provided query to describe its component parts.

:param query: query to parse and configuration options :returns: parsed explanation of the query :raises AtlanError: on any API communication issue

Source code in pyatlan/client/atlan.py
@validate_arguments
def parse_query(self, query: QueryParserRequest) -> Optional[ParsedQuery]:
    """
    Parses the provided query to describe its component parts.

    :param query: query to parse and configuration options
    :returns: parsed explanation of the query
    :raises AtlanError: on any API communication issue
    """
    raw_json = self._call_api(
        PARSE_QUERY,
        request_obj=query,
        exclude_unset=True,
    )
    return ParsedQuery(**raw_json)
purge_api_token(guid: str) -> None

Deprecated - use token.purge() instead.

Source code in pyatlan/client/atlan.py
def purge_api_token(self, guid: str) -> None:
    """Deprecated - use token.purge() instead."""
    warn(
        "This method is deprecated, please use 'token.purge' instead, which offers identical functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    self.token.purge(guid=guid)
purge_entity_by_guid(guid: Union[str, List[str]]) -> AssetMutationResponse

Deprecated - use asset.purge_by_guid() instead.

Source code in pyatlan/client/atlan.py
def purge_entity_by_guid(
    self, guid: Union[str, List[str]]
) -> AssetMutationResponse:
    """Deprecated - use asset.purge_by_guid() instead."""
    warn(
        "This method is deprecated, please use 'asset.purge_by_guid' instead, which offers identical "
        "functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.asset.purge_by_guid(guid=guid)
purge_group(guid: str) -> None

Deprecated - use group.purge() instead.

Source code in pyatlan/client/atlan.py
def purge_group(
    self,
    guid: str,
) -> None:
    """Deprecated - use group.purge() instead."""
    warn(
        "This method is deprecated, please use 'group.purge' instead, which offers identical functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    self.group.purge(guid=guid)
purge_typedef(name: str, typedef_type: type) -> None

Deprecated - use typedef.purge() instead.

Source code in pyatlan/client/atlan.py
def purge_typedef(self, name: str, typedef_type: type) -> None:
    """Deprecated - use typedef.purge() instead."""
    warn(
        "This method is deprecated, please use 'typedef.purge' instead, which offers identical functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    self.typedef.purge(name=name, typedef_type=typedef_type)
remove_announcement(asset_type: Type[A], qualified_name: str, name: str) -> Optional[A]

Deprecated - use asset.remove_announcement() instead.

Source code in pyatlan/client/atlan.py
@validate_arguments
def remove_announcement(
    self, asset_type: Type[A], qualified_name: str, name: str
) -> Optional[A]:
    """Deprecated - use asset.remove_announcement() instead."""
    warn(
        "This method is deprecated, please use 'asset.remove_announcement' instead, which offers identical "
        "functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.asset.remove_announcement(
        asset_type=asset_type, qualified_name=qualified_name, name=name
    )
remove_atlan_tag(asset_type: Type[A], qualified_name: str, atlan_tag_name: str) -> None

Deprecated - use asset.remove_atlan_tag() instead.

Source code in pyatlan/client/atlan.py
@validate_arguments
def remove_atlan_tag(
    self, asset_type: Type[A], qualified_name: str, atlan_tag_name: str
) -> None:
    """Deprecated - use asset.remove_atlan_tag() instead."""
    warn(
        "This method is deprecated, please use 'asset.remove_atlan_tag' instead, which offers identical "
        "functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    self.asset.remove_atlan_tag(
        asset_type=asset_type,
        qualified_name=qualified_name,
        atlan_tag_name=atlan_tag_name,
    )
remove_certificate(asset_type: Type[A], qualified_name: str, name: str) -> Optional[A]

Deprecated - use asset.remove_certificate() instead.

Source code in pyatlan/client/atlan.py
@validate_arguments
def remove_certificate(
    self, asset_type: Type[A], qualified_name: str, name: str
) -> Optional[A]:
    """Deprecated - use asset.remove_certificate() instead."""
    warn(
        "This method is deprecated, please use 'asset.remove_certificate' instead, which offers identical "
        "functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.asset.remove_certificate(
        asset_type=asset_type, qualified_name=qualified_name, name=name
    )
remove_custom_metadata(guid: str, cm_name: str)

Deprecated - use asset.remove_custom_metadata() instead.

Source code in pyatlan/client/atlan.py
def remove_custom_metadata(self, guid: str, cm_name: str):
    """Deprecated - use asset.remove_custom_metadata() instead."""
    warn(
        "This method is deprecated, please use 'asset.remove_custom_metadata' instead, which offers identical "
        "functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    self.asset.remove_custom_metadata(guid=guid, cm_name=cm_name)
remove_terms(asset_type: Type[A], terms: List[AtlasGlossaryTerm], guid: Optional[str] = None, qualified_name: Optional[str] = None) -> A

Deprecated - use asset.remove_terms() instead.

Source code in pyatlan/client/atlan.py
@validate_arguments
def remove_terms(
    self,
    asset_type: Type[A],
    terms: List[AtlasGlossaryTerm],
    guid: Optional[str] = None,
    qualified_name: Optional[str] = None,
) -> A:
    """Deprecated - use asset.remove_terms() instead."""
    warn(
        "This method is deprecated, please use 'asset.remove_terms' instead, which offers identical functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.asset.remove_terms(
        asset_type=asset_type, terms=terms, guid=guid, qualified_name=qualified_name
    )
remove_users_from_group(guid: str, user_ids=List[str]) -> None

Deprecated - use group.remove_users() instead.

Source code in pyatlan/client/atlan.py
def remove_users_from_group(self, guid: str, user_ids=List[str]) -> None:
    """Deprecated - use group.remove_users() instead."""
    warn(
        "This method is deprecated, please use 'group.remove_users' instead, which offers identical functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    self.group.remove_users(guid=guid, user_ids=user_ids)
replace_custom_metadata(guid: str, custom_metadata: CustomMetadataDict)

Deprecated - use asset.replace_custom_metadata() instead.

Source code in pyatlan/client/atlan.py
def replace_custom_metadata(self, guid: str, custom_metadata: CustomMetadataDict):
    """Deprecated - use asset.replace_custom_metadata() instead."""
    warn(
        "This method is deprecated, please use 'asset.replace_custom_metadata' instead, which offers identical "
        "functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    self.asset.replace_custom_metadata(guid=guid, custom_metadata=custom_metadata)
replace_terms(asset_type: Type[A], terms: List[AtlasGlossaryTerm], guid: Optional[str] = None, qualified_name: Optional[str] = None) -> A

Deprecated - use asset.replace_terms() instead.

Source code in pyatlan/client/atlan.py
@validate_arguments
def replace_terms(
    self,
    asset_type: Type[A],
    terms: List[AtlasGlossaryTerm],
    guid: Optional[str] = None,
    qualified_name: Optional[str] = None,
) -> A:
    """Deprecated - use asset.replace_terms() instead."""
    warn(
        "This method is deprecated, please use 'asset.replace_terms' instead, which offers identical "
        "functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.asset.replace_terms(
        asset_type=asset_type, terms=terms, guid=guid, qualified_name=qualified_name
    )
restore(asset_type: Type[A], qualified_name: str) -> bool

Deprecated - use asset.restore() instead.

Source code in pyatlan/client/atlan.py
def restore(self, asset_type: Type[A], qualified_name: str) -> bool:
    """Deprecated - use asset.restore() instead."""
    warn(
        "This method is deprecated, please use 'asset.restore' instead, which offers identical functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.asset.restore(asset_type=asset_type, qualified_name=qualified_name)
retrieve_minimal(guid: str, asset_type: Type[A]) -> A

Deprecated - use asset.retrieve_minimal() instead.

Source code in pyatlan/client/atlan.py
@validate_arguments
def retrieve_minimal(self, guid: str, asset_type: Type[A]) -> A:
    """Deprecated - use asset.retrieve_minimal() instead."""
    warn(
        "This method is deprecated, please use 'asset.retrieve_minimal' instead, which offers identical "
        "functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.asset.retrieve_minimal(guid=guid, asset_type=asset_type)
save(entity: Union[Asset, List[Asset]], replace_atlan_tags: bool = False, replace_custom_metadata: bool = False, overwrite_custom_metadata: bool = False) -> AssetMutationResponse

Deprecated - use asset.save() instead.

Source code in pyatlan/client/atlan.py
def save(
    self,
    entity: Union[Asset, List[Asset]],
    replace_atlan_tags: bool = False,
    replace_custom_metadata: bool = False,
    overwrite_custom_metadata: bool = False,
) -> AssetMutationResponse:
    """Deprecated - use asset.save() instead."""
    warn(
        "This method is deprecated, please use 'asset.save' instead, which offers identical functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.asset.save(
        entity=entity,
        replace_atlan_tags=replace_atlan_tags,
        replace_custom_metadata=replace_custom_metadata,
        overwrite_custom_metadata=overwrite_custom_metadata,
    )
save_merging_cm(entity: Union[Asset, List[Asset]], replace_atlan_tags: bool = False) -> AssetMutationResponse

Deprecated - use asset.save_merging_cm() instead.

Source code in pyatlan/client/atlan.py
def save_merging_cm(
    self, entity: Union[Asset, List[Asset]], replace_atlan_tags: bool = False
) -> AssetMutationResponse:
    """Deprecated - use asset.save_merging_cm() instead."""
    warn(
        "This method is deprecated, please use 'asset.save_merging_cm' instead, which offers identical "
        "functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.asset.save_merging_cm(
        entity=entity, replace_atlan_tags=replace_atlan_tags
    )
save_replacing_cm(entity: Union[Asset, List[Asset]], replace_atlan_tags: bool = False) -> AssetMutationResponse

Deprecated - use asset.save_replacing_cm() instead.

Source code in pyatlan/client/atlan.py
def save_replacing_cm(
    self, entity: Union[Asset, List[Asset]], replace_atlan_tags: bool = False
) -> AssetMutationResponse:
    """Deprecated - use asset.save_replacing_cm() instead."""
    warn(
        "This method is deprecated, please use 'asset.save_replacing_cm' instead, which offers identical "
        "functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.asset.save_replacing_cm(
        entity=entity, replace_atlan_tags=replace_atlan_tags
    )
search(criteria: IndexSearchRequest) -> IndexSearchResults

Deprecated - use asset.search() instead.

Source code in pyatlan/client/atlan.py
def search(self, criteria: IndexSearchRequest) -> IndexSearchResults:
    """Deprecated - use asset.search() instead."""
    warn(
        "This method is deprecated, please use 'asset.search' instead, which offers identical functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.asset.search(criteria=criteria)
update_announcement(asset_type: Type[A], qualified_name: str, name: str, announcement: Announcement) -> Optional[A]

Deprecated - use asset.update_announcement() instead.

Source code in pyatlan/client/atlan.py
@validate_arguments
def update_announcement(
    self,
    asset_type: Type[A],
    qualified_name: str,
    name: str,
    announcement: Announcement,
) -> Optional[A]:
    """Deprecated - use asset.update_announcement() instead."""
    warn(
        "This method is deprecated, please use 'asset.update_announcement' instead, which offers identical "
        "functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.asset.update_announcement(
        asset_type=asset_type,
        qualified_name=qualified_name,
        name=name,
        announcement=announcement,
    )
update_api_token(guid: str, display_name: str, description: str = '', personas: Optional[Set[str]] = None) -> ApiToken

Deprecated - use token.update() instead.

Source code in pyatlan/client/atlan.py
def update_api_token(
    self,
    guid: str,
    display_name: str,
    description: str = "",
    personas: Optional[Set[str]] = None,
) -> ApiToken:
    """Deprecated - use token.update() instead."""
    warn(
        "This method is deprecated, please use 'token.update' instead, which offers identical functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.token.update(
        guid=guid,
        display_name=display_name,
        description=description,
        personas=personas,
    )
update_certificate(asset_type: Type[A], qualified_name: str, name: str, certificate_status: CertificateStatus, message: Optional[str] = None) -> Optional[A]

Deprecated - use asset.update_certificate() instead.

Source code in pyatlan/client/atlan.py
@validate_arguments
def update_certificate(
    self,
    asset_type: Type[A],
    qualified_name: str,
    name: str,
    certificate_status: CertificateStatus,
    message: Optional[str] = None,
) -> Optional[A]:
    """Deprecated - use asset.update_certificate() instead."""
    warn(
        "This method is deprecated, please use 'asset.update_certificate' instead, which offers identical "
        "functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.asset.update_certificate(
        asset_type=asset_type,
        qualified_name=qualified_name,
        name=name,
        certificate_status=certificate_status,
        message=message,
    )
update_custom_metadata_attributes(guid: str, custom_metadata: CustomMetadataDict)

Deprecated - use asset.update_custom_metadata_attributes() instead.

Source code in pyatlan/client/atlan.py
def update_custom_metadata_attributes(
    self, guid: str, custom_metadata: CustomMetadataDict
):
    """Deprecated - use asset.update_custom_metadata_attributes() instead."""
    warn(
        "This method is deprecated, please use 'asset.update_custom_metadata_attributes' instead, which offers "
        "identical functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    self.asset.update_custom_metadata_attributes(
        guid=guid, custom_metadata=custom_metadata
    )
update_group(group: AtlanGroup) -> None

Deprecated - use group.update() instead.

Source code in pyatlan/client/atlan.py
def update_group(
    self,
    group: AtlanGroup,
) -> None:
    """Deprecated - use group.update() instead."""
    warn(
        "This method is deprecated, please use 'group.update' instead, which offers identical functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.group.update(group=group)
update_merging_cm(entity: Asset, replace_atlan_tags: bool = False) -> AssetMutationResponse

Deprecated - use asset.update_merging_cm() instead.

Source code in pyatlan/client/atlan.py
def update_merging_cm(
    self, entity: Asset, replace_atlan_tags: bool = False
) -> AssetMutationResponse:
    """Deprecated - use asset.update_merging_cm() instead."""
    warn(
        "This method is deprecated, please use 'asset.update_merging_cm' instead, which offers identical "
        "functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.asset.update_merging_cm(
        entity=entity, replace_atlan_tags=replace_atlan_tags
    )
update_replacing_cm(entity: Asset, replace_atlan_tags: bool = False) -> AssetMutationResponse

Deprecated - use asset.update_replacing_cm() instead.

Source code in pyatlan/client/atlan.py
def update_replacing_cm(
    self, entity: Asset, replace_atlan_tags: bool = False
) -> AssetMutationResponse:
    """Deprecated - use asset.update_replacing_cm() instead."""
    warn(
        "This method is deprecated, please use 'asset.update_replacing_cm' instead, which offers identical "
        "functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.asset.update_replacing_cm(
        entity=entity, replace_atlan_tags=replace_atlan_tags
    )
update_typedef(typedef: TypeDef) -> TypeDefResponse

Deprecated - use typedef.update() instead.

Source code in pyatlan/client/atlan.py
def update_typedef(self, typedef: TypeDef) -> TypeDefResponse:
    """Deprecated - use typedef.update() instead."""
    warn(
        "This method is deprecated, please use 'typedef.update' instead, which offers identical functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.typedef.update(typedef=typedef)
update_user(guid: str, user: AtlanUser) -> UserMinimalResponse

Deprecated - use user.update() instead.

Source code in pyatlan/client/atlan.py
def update_user(
    self,
    guid: str,
    user: AtlanUser,
) -> UserMinimalResponse:
    """Deprecated - use user.update() instead."""
    warn(
        "This method is deprecated, please use 'user.update' instead, which offers identical functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.user.update(guid=guid, user=user)
upload_image(file, filename: str) -> AtlanImage

Uploads an image from the provided local file.

:param file: local file to upload :param filename: name of the file to be uploaded :returns: details of the uploaded image :raises AtlanError: on any API communication issue

Source code in pyatlan/client/atlan.py
@validate_arguments
def upload_image(self, file, filename: str) -> AtlanImage:
    """
    Uploads an image from the provided local file.

    :param file: local file to upload
    :param filename: name of the file to be uploaded
    :returns: details of the uploaded image
    :raises AtlanError: on any API communication issue
    """
    raw_json = self._upload_file(UPLOAD_IMAGE, file=file, filename=filename)
    return AtlanImage(**raw_json)
upsert(entity: Union[Asset, List[Asset]], replace_atlan_tags: bool = False, replace_custom_metadata: bool = False, overwrite_custom_metadata: bool = False) -> AssetMutationResponse

Deprecated - use asset.save() instead.

Source code in pyatlan/client/atlan.py
def upsert(
    self,
    entity: Union[Asset, List[Asset]],
    replace_atlan_tags: bool = False,
    replace_custom_metadata: bool = False,
    overwrite_custom_metadata: bool = False,
) -> AssetMutationResponse:
    """Deprecated - use asset.save() instead."""
    warn(
        "This method is deprecated, please use 'asset.save' instead, which offers identical functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.asset.save(
        entity=entity,
        replace_atlan_tags=replace_atlan_tags,
        replace_custom_metadata=replace_custom_metadata,
        overwrite_custom_metadata=overwrite_custom_metadata,
    )
upsert_merging_cm(entity: Union[Asset, List[Asset]], replace_atlan_tags: bool = False) -> AssetMutationResponse

Deprecated - use asset.save_merging_cm() instead.

Source code in pyatlan/client/atlan.py
def upsert_merging_cm(
    self, entity: Union[Asset, List[Asset]], replace_atlan_tags: bool = False
) -> AssetMutationResponse:
    """Deprecated - use asset.save_merging_cm() instead."""
    warn(
        "This method is deprecated, please use 'asset.save_merging_cm' instead, which offers identical "
        "functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.asset.save_merging_cm(
        entity=entity, replace_atlan_tags=replace_atlan_tags
    )
upsert_replacing_cm(entity: Union[Asset, List[Asset]], replace_atlan_tagss: bool = False) -> AssetMutationResponse

Deprecated - use asset.save_replacing_cm() instead.

Source code in pyatlan/client/atlan.py
def upsert_replacing_cm(
    self, entity: Union[Asset, List[Asset]], replace_atlan_tagss: bool = False
) -> AssetMutationResponse:
    """Deprecated - use asset.save_replacing_cm() instead."""
    warn(
        "This method is deprecated, please use 'asset.save_replacing_cm' instead, which offers identical "
        "functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.asset.save_replacing_cm(
        entity=entity, replace_atlan_tags=replace_atlan_tagss
    )

Functions

client_connection(client: AtlanClient, base_url: Optional[HttpUrl] = None, api_key: Optional[str] = None, connect_timeout: float = 30.0, read_timeout: float = 120.0, retry: Retry = DEFAULT_RETRY) -> Generator[AtlanClient, None, None]

Creates a new client created with the given base_url and/api_key.

:param base_url: the base_url to be used for the new connection. If not specified the current value will be used :param api_key: the api_key to be used for the new connection. If not specified the current value will be used

Source code in pyatlan/client/atlan.py
@contextlib.contextmanager
def client_connection(
    client: AtlanClient,
    base_url: Optional[HttpUrl] = None,
    api_key: Optional[str] = None,
    connect_timeout: float = 30.0,
    read_timeout: float = 120.0,
    retry: Retry = DEFAULT_RETRY,
) -> Generator[AtlanClient, None, None]:
    """
    Creates a new client created with the given base_url and/api_key.

    :param base_url: the base_url to be used for the new connection.
    If not specified the current value will be used
    :param api_key: the api_key to be used for the new connection.
    If not specified the current value will be used
    """
    tmp_client = AtlanClient(
        base_url=base_url or client.base_url,
        api_key=api_key or client.api_key,
        connect_timeout=connect_timeout,
        read_timeout=read_timeout,
        retry=retry,
    )
    yield tmp_client

get_adapter() -> logging.LoggerAdapter

This function creates a LoggerAdapter that will provide the requestid from the ContextVar request_id_var :returns: the LogAdapter

Source code in pyatlan/client/atlan.py
def get_adapter() -> logging.LoggerAdapter:
    """
    This function creates a LoggerAdapter that will provide the requestid from the ContextVar request_id_var
    :returns: the LogAdapter
    """
    logger = logging.getLogger(__name__)
    logger.addFilter(AuthorizationFilter())
    return RequestIdAdapter(logger=logger, contextvar=request_id_var)

Asset Client

pyatlan.client.asset

Classes

AssetClient(client: ApiCaller)

This class can be used to retrieve information about assets. This class does not need to be instantiated directly but can be obtained through the asset property of AtlanClient.

Source code in pyatlan/client/asset.py
def __init__(self, client: ApiCaller):
    if not isinstance(client, ApiCaller):
        raise ErrorCode.INVALID_PARAMETER_TYPE.exception_with_parameters(
            "client", "ApiCaller"
        )
    self._client = client
Functions
add_atlan_tags(asset_type: Type[A], qualified_name: str, atlan_tag_names: List[str], propagate: bool = False, remove_propagation_on_delete: bool = True, restrict_lineage_propagation: bool = False, restrict_propagation_through_hierarchy: bool = False) -> A

Add one or more Atlan tags to the provided asset.

:param asset_type: type of asset to which to add the Atlan tags :param qualified_name: qualified_name of the asset to which to add the Atlan tags :param atlan_tag_names: human-readable names of the Atlan tags to add to the asset :param propagate: whether to propagate the Atlan tag (True) or not (False) :param remove_propagation_on_delete: whether to remove the propagated Atlan tags when the Atlan tag is removed from this asset (True) or not (False) :param restrict_lineage_propagation: whether to avoid propagating through lineage (True) or do propagate through lineage (False) :param restrict_propagation_through_hierarchy: whether to prevent this Atlan tag from propagating through hierarchy (True) or allow it to propagate through hierarchy (False) :returns: the asset that was updated (note that it will NOT contain details of the added Atlan tags) :raises AtlanError: on any API communication issue

Source code in pyatlan/client/asset.py
@validate_arguments
def add_atlan_tags(
    self,
    asset_type: Type[A],
    qualified_name: str,
    atlan_tag_names: List[str],
    propagate: bool = False,
    remove_propagation_on_delete: bool = True,
    restrict_lineage_propagation: bool = False,
    restrict_propagation_through_hierarchy: bool = False,
) -> A:
    """
    Add one or more Atlan tags to the provided asset.

    :param asset_type: type of asset to which to add the Atlan tags
    :param qualified_name: qualified_name of the asset to which to add the Atlan tags
    :param atlan_tag_names: human-readable names of the Atlan tags to add to the asset
    :param propagate: whether to propagate the Atlan tag (True) or not (False)
    :param remove_propagation_on_delete: whether to remove the propagated Atlan tags
    when the Atlan tag is removed from this asset (True) or not (False)
    :param restrict_lineage_propagation: whether to avoid propagating
    through lineage (True) or do propagate through lineage (False)
    :param restrict_propagation_through_hierarchy: whether to prevent this Atlan tag from
    propagating through hierarchy (True) or allow it to propagate through hierarchy (False)
    :returns: the asset that was updated (note that it will NOT contain details of the added Atlan tags)
    :raises AtlanError: on any API communication issue
    """
    return self._modify_tags(
        asset_type=asset_type,
        qualified_name=qualified_name,
        atlan_tag_names=atlan_tag_names,
        propagate=propagate,
        remove_propagation_on_delete=remove_propagation_on_delete,
        restrict_lineage_propagation=restrict_lineage_propagation,
        restrict_propagation_through_hierarchy=restrict_propagation_through_hierarchy,
        modification_type="add",
        save_parameters={
            "replace_atlan_tags": False,
            "append_atlan_tags": True,
        },
    )
add_dq_rule_schedule(asset_type: Type[A], asset_name: str, asset_qualified_name: str, schedule_crontab: str, schedule_time_zone: str) -> AssetMutationResponse

Add a data quality rule schedule to an asset.

:param asset_type: the type of asset to update (e.g., Table) :param asset_name: the name of the asset to update :param asset_qualified_name: the qualified name of the asset to update :param schedule_crontab: cron expression string defining the schedule for the DQ rules, e.g: 5 4 * * *. :param schedule_time_zone: timezone for the schedule, e.g: Europe/Paris. :returns: the result of the save :raises AtlanError: on any API communication issue

Source code in pyatlan/client/asset.py
@validate_arguments
def add_dq_rule_schedule(
    self,
    asset_type: Type[A],
    asset_name: str,
    asset_qualified_name: str,
    schedule_crontab: str,
    schedule_time_zone: str,
) -> AssetMutationResponse:
    """
    Add a data quality rule schedule to an asset.

    :param asset_type: the type of asset to update (e.g., Table)
    :param asset_name: the name of the asset to update
    :param asset_qualified_name: the qualified name of the asset to update
    :param schedule_crontab: cron expression string defining the schedule for the DQ rules, e.g: `5 4 * * *`.
    :param schedule_time_zone: timezone for the schedule, e.g: `Europe/Paris`.
    :returns: the result of the save
    :raises AtlanError: on any API communication issue
    """
    updated_asset = asset_type.updater(
        qualified_name=asset_qualified_name, name=asset_name
    )
    updated_asset.asset_d_q_schedule_time_zone = schedule_time_zone
    updated_asset.asset_d_q_schedule_crontab = schedule_crontab
    updated_asset.asset_d_q_schedule_type = DataQualityScheduleType.CRON
    response = self.save(updated_asset)
    return response
append_terms(asset_type: Type[A], terms: List[AtlasGlossaryTerm], guid: Optional[str] = None, qualified_name: Optional[str] = None) -> A

Link additional terms to an asset, without replacing existing terms linked to the asset. Note: this operation must make two API calls — one to retrieve the asset's existing terms, and a second to append the new terms. (At least one of the GUID or qualified_name must be supplied, but both are not necessary.)

:param asset_type: type of the asset :param terms: the list of terms to append to the asset :param guid: unique identifier (GUID) of the asset to which to link the terms :param qualified_name: the qualified_name of the asset to which to link the terms :returns: the asset that was updated (note that it will NOT contain details of the appended terms)

Source code in pyatlan/client/asset.py
@validate_arguments
def append_terms(
    self,
    asset_type: Type[A],
    terms: List[AtlasGlossaryTerm],
    guid: Optional[str] = None,
    qualified_name: Optional[str] = None,
) -> A:
    """
    Link additional terms to an asset, without replacing existing terms linked to the asset.
    Note: this operation must make two API calls — one to retrieve the asset's existing terms,
    and a second to append the new terms. (At least one of the GUID or qualified_name must be
    supplied, but both are not necessary.)

    :param asset_type: type of the asset
    :param terms: the list of terms to append to the asset
    :param guid: unique identifier (GUID) of the asset to which to link the terms
    :param qualified_name: the qualified_name of the asset to which to link the terms
    :returns: the asset that was updated (note that it will NOT contain details of the appended terms)
    """
    return self._manage_terms(
        asset_type=asset_type,
        terms=terms,
        save_semantic=SaveSemantic.APPEND,
        guid=guid,
        qualified_name=qualified_name,
    )
delete_by_guid(guid: Union[str, List[str]]) -> AssetMutationResponse

Soft-deletes (archives) one or more assets by their unique identifier (GUID). This operation can be reversed by updating the asset and its status to ACTIVE.

:param guid: unique identifier(s) (GUIDs) of one or more assets to soft-delete :returns: details of the soft-deleted asset(s) :raises AtlanError: on any API communication issue :raises ApiError: if the retry limit is overrun waiting for confirmation the asset is deleted :raises InvalidRequestError: if an asset does not support archiving

Source code in pyatlan/client/asset.py
@validate_arguments
def delete_by_guid(self, guid: Union[str, List[str]]) -> AssetMutationResponse:
    """
    Soft-deletes (archives) one or more assets by their unique identifier (GUID).
    This operation can be reversed by updating the asset and its status to ACTIVE.

    :param guid: unique identifier(s) (GUIDs) of one or more assets to soft-delete
    :returns: details of the soft-deleted asset(s)
    :raises AtlanError: on any API communication issue
    :raises ApiError: if the retry limit is overrun waiting for confirmation the asset is deleted
    :raises InvalidRequestError: if an asset does not support archiving
    """
    guids = DeleteByGuid.prepare_request(guid)

    # Validate each asset can be archived
    assets = []
    for single_guid in guids:
        asset = self.retrieve_minimal(guid=single_guid, asset_type=Asset)
        assets.append(asset)
    DeleteByGuid.validate_assets_can_be_archived(assets)

    # Perform the deletion
    query_params = DeleteByGuid.prepare_delete_request(guids)
    raw_json = self._client._call_api(
        DELETE_ENTITIES_BY_GUIDS, query_params=query_params
    )
    response = DeleteByGuid.process_response(raw_json)

    # Wait for deletion confirmation
    for asset in DeleteByGuid.get_deleted_assets(response):
        try:
            self._wait_till_deleted(asset)
        except RetryError as err:
            raise ErrorCode.RETRY_OVERRUN.exception_with_parameters() from err
    return response
find_category_by_name(name: constr(strip_whitespace=True, min_length=1, strict=True), glossary_name: constr(strip_whitespace=True, min_length=1, strict=True), attributes: Optional[List[StrictStr]] = None) -> List[AtlasGlossaryCategory]

Find a category by its human-readable name. Note: this operation must run two separate queries to first resolve the qualified_name of the glossary, so will be somewhat slower. If you already have the qualified_name of the glossary, use find_category_by_name_fast instead. Note that categories are not unique by name, so there may be multiple results.

:param name: of the category :param glossary_name: human-readable name of the glossary in which the category exists :param attributes: (optional) collection of attributes to retrieve for the category :returns: the category, if found :raises NotFoundError: if no category with the provided name exists in the glossary

Source code in pyatlan/client/asset.py
@validate_arguments
def find_category_by_name(
    self,
    name: constr(strip_whitespace=True, min_length=1, strict=True),  # type: ignore
    glossary_name: constr(strip_whitespace=True, min_length=1, strict=True),  # type: ignore
    attributes: Optional[List[StrictStr]] = None,
) -> List[AtlasGlossaryCategory]:
    """
    Find a category by its human-readable name.
    Note: this operation must run two separate queries to first resolve the qualified_name of the
    glossary, so will be somewhat slower. If you already have the qualified_name of the glossary, use
    find_category_by_name_fast instead. Note that categories are not unique by name, so there may be
    multiple results.

    :param name: of the category
    :param glossary_name: human-readable name of the glossary in which the category exists
    :param attributes: (optional) collection of attributes to retrieve for the category
    :returns: the category, if found
    :raises NotFoundError: if no category with the provided name exists in the glossary
    """
    # First find the glossary by name
    glossary = self.find_glossary_by_name(name=glossary_name)

    # Then find the category in that glossary using the fast method
    return self.find_category_fast_by_name(
        name=name,
        glossary_qualified_name=glossary.qualified_name,
        attributes=attributes,
    )
find_category_fast_by_name(name: constr(strip_whitespace=True, min_length=1, strict=True), glossary_qualified_name: constr(strip_whitespace=True, min_length=1, strict=True), attributes: Optional[List[StrictStr]] = None) -> List[AtlasGlossaryCategory]

Find a category by its human-readable name. Note: this operation requires first knowing the qualified_name of the glossary in which the category exists. Note that categories are not unique by name, so there may be multiple results.

:param name: of the category :param glossary_qualified_name: qualified_name of the glossary in which the category exists :param attributes: (optional) collection of attributes to retrieve for the category :returns: the category, if found :raises NotFoundError: if no category with the provided name exists in the glossary

Source code in pyatlan/client/asset.py
@validate_arguments
def find_category_fast_by_name(
    self,
    name: constr(strip_whitespace=True, min_length=1, strict=True),  # type: ignore
    glossary_qualified_name: constr(  # type: ignore
        strip_whitespace=True, min_length=1, strict=True
    ),
    attributes: Optional[List[StrictStr]] = None,
) -> List[AtlasGlossaryCategory]:
    """
    Find a category by its human-readable name.
    Note: this operation requires first knowing the qualified_name of the glossary in which the
    category exists. Note that categories are not unique by name, so there may be
    multiple results.

    :param name: of the category
    :param glossary_qualified_name: qualified_name of the glossary in which the category exists
    :param attributes: (optional) collection of attributes to retrieve for the category
    :returns: the category, if found
    :raises NotFoundError: if no category with the provided name exists in the glossary
    """
    if attributes is None:
        attributes = []

    # Build query using shared logic
    query = FindCategoryFastByName.build_query(name, glossary_qualified_name)

    # Execute search using shared logic
    return self._search_for_asset_with_name(
        query=query,
        name=name,
        asset_type=AtlasGlossaryCategory,
        attributes=attributes,
        allow_multiple=True,
    )
find_connections_by_name(name: str, connector_type: AtlanConnectorType, attributes: Optional[List[str]] = None) -> List[Connection]

Find a connection by its human-readable name and type.

:param name: of the connection :param connector_type: of the connection :param attributes: (optional) collection of attributes to retrieve for the connection :returns: all connections with that name and type, if found :raises NotFoundError: if the connection does not exist

Source code in pyatlan/client/asset.py
@validate_arguments
def find_connections_by_name(
    self,
    name: str,
    connector_type: AtlanConnectorType,
    attributes: Optional[List[str]] = None,
) -> List[Connection]:
    """
    Find a connection by its human-readable name and type.

    :param name: of the connection
    :param connector_type: of the connection
    :param attributes: (optional) collection of attributes to retrieve for the connection
    :returns: all connections with that name and type, if found
    :raises NotFoundError: if the connection does not exist
    """
    if attributes is None:
        attributes = []

    # Build query using shared logic
    query = FindConnectionsByName.build_query(name, connector_type)

    # Execute search using shared logic
    return self._search_for_asset_with_name(
        query=query,
        name=name,
        asset_type=Connection,
        attributes=attributes,
        allow_multiple=True,
    )
find_domain_by_name(name: constr(strip_whitespace=True, min_length=1, strict=True), attributes: Optional[List[StrictStr]] = None) -> DataDomain

Find a data domain by its human-readable name.

:param name: of the domain :param attributes: (optional) collection of attributes to retrieve for the domain :returns: the domain, if found :raises NotFoundError: if no domain with the provided name exists

Source code in pyatlan/client/asset.py
@validate_arguments
def find_domain_by_name(
    self,
    name: constr(strip_whitespace=True, min_length=1, strict=True),  # type: ignore
    attributes: Optional[List[StrictStr]] = None,
) -> DataDomain:
    """
    Find a data domain by its human-readable name.

    :param name: of the domain
    :param attributes: (optional) collection of attributes to retrieve for the domain
    :returns: the domain, if found
    :raises NotFoundError: if no domain with the provided name exists
    """
    attributes = attributes or []

    # Build query using shared logic
    query = FindDomainByName.build_query(name)

    # Execute search using shared logic
    return self._search_for_asset_with_name(
        query=query, name=name, asset_type=DataDomain, attributes=attributes
    )[0]
find_glossary_by_name(name: constr(strip_whitespace=True, min_length=1, strict=True), attributes: Optional[List[StrictStr]] = None) -> AtlasGlossary

Find a glossary by its human-readable name.

:param name: of the glossary :param attributes: (optional) collection of attributes to retrieve for the glossary :returns: the glossary, if found :raises NotFoundError: if no glossary with the provided name exists

Source code in pyatlan/client/asset.py
@validate_arguments
def find_glossary_by_name(
    self,
    name: constr(strip_whitespace=True, min_length=1, strict=True),  # type: ignore
    attributes: Optional[List[StrictStr]] = None,
) -> AtlasGlossary:
    """
    Find a glossary by its human-readable name.

    :param name: of the glossary
    :param attributes: (optional) collection of attributes to retrieve for the glossary
    :returns: the glossary, if found
    :raises NotFoundError: if no glossary with the provided name exists
    """
    if attributes is None:
        attributes = []

    # Build query using shared logic
    query = FindGlossaryByName.build_query(name)

    # Execute search using shared logic
    return self._search_for_asset_with_name(
        query=query, name=name, asset_type=AtlasGlossary, attributes=attributes
    )[0]
find_personas_by_name(name: str, attributes: Optional[List[str]] = None) -> List[Persona]

Find a persona by its human-readable name.

:param name: of the persona :param attributes: (optional) collection of attributes to retrieve for the persona :returns: all personas with that name, if found :raises NotFoundError: if no persona with the provided name exists

Source code in pyatlan/client/asset.py
@validate_arguments
def find_personas_by_name(
    self,
    name: str,
    attributes: Optional[List[str]] = None,
) -> List[Persona]:
    """
    Find a persona by its human-readable name.

    :param name: of the persona
    :param attributes: (optional) collection of attributes to retrieve for the persona
    :returns: all personas with that name, if found
    :raises NotFoundError: if no persona with the provided name exists
    """
    search_request = FindPersonasByName.prepare_request(name, attributes)
    search_results = self.search(search_request)
    return FindPersonasByName.process_response(
        search_results, name, allow_multiple=True
    )
find_product_by_name(name: constr(strip_whitespace=True, min_length=1, strict=True), attributes: Optional[List[StrictStr]] = None) -> DataProduct

Find a data product by its human-readable name.

:param name: of the product :param attributes: (optional) collection of attributes to retrieve for the product :returns: the product, if found :raises NotFoundError: if no product with the provided name exists

Source code in pyatlan/client/asset.py
@validate_arguments
def find_product_by_name(
    self,
    name: constr(strip_whitespace=True, min_length=1, strict=True),  # type: ignore
    attributes: Optional[List[StrictStr]] = None,
) -> DataProduct:
    """
    Find a data product by its human-readable name.

    :param name: of the product
    :param attributes: (optional) collection of attributes to retrieve for the product
    :returns: the product, if found
    :raises NotFoundError: if no product with the provided name exists
    """
    attributes = attributes or []

    # Build query using shared logic
    query = FindProductByName.build_query(name)

    # Execute search using shared logic
    return self._search_for_asset_with_name(
        query=query, name=name, asset_type=DataProduct, attributes=attributes
    )[0]
find_purposes_by_name(name: str, attributes: Optional[List[str]] = None) -> List[Purpose]

Find a purpose by its human-readable name.

:param name: of the purpose :param attributes: (optional) collection of attributes to retrieve for the purpose :returns: all purposes with that name, if found :raises NotFoundError: if no purpose with the provided name exists

Source code in pyatlan/client/asset.py
@validate_arguments
def find_purposes_by_name(
    self,
    name: str,
    attributes: Optional[List[str]] = None,
) -> List[Purpose]:
    """
    Find a purpose by its human-readable name.

    :param name: of the purpose
    :param attributes: (optional) collection of attributes to retrieve for the purpose
    :returns: all purposes with that name, if found
    :raises NotFoundError: if no purpose with the provided name exists
    """
    search_request = FindPurposesByName.prepare_request(name, attributes)
    search_results = self.search(search_request)
    return FindPurposesByName.process_response(
        search_results, name, allow_multiple=True
    )
find_term_by_name(name: constr(strip_whitespace=True, min_length=1, strict=True), glossary_name: constr(strip_whitespace=True, min_length=1, strict=True), attributes: Optional[List[StrictStr]] = None) -> AtlasGlossaryTerm

Find a term by its human-readable name. Note: this operation must run two separate queries to first resolve the qualified_name of the glossary, so will be somewhat slower. If you already have the qualified_name of the glossary, use find_term_by_name_fast instead.

:param name: of the term :param glossary_name: human-readable name of the glossary in which the term exists :param attributes: (optional) collection of attributes to retrieve for the term :returns: the term, if found :raises NotFoundError: if no term with the provided name exists in the glossary

Source code in pyatlan/client/asset.py
@validate_arguments
def find_term_by_name(
    self,
    name: constr(strip_whitespace=True, min_length=1, strict=True),  # type: ignore
    glossary_name: constr(strip_whitespace=True, min_length=1, strict=True),  # type: ignore
    attributes: Optional[List[StrictStr]] = None,
) -> AtlasGlossaryTerm:
    """
    Find a term by its human-readable name.
    Note: this operation must run two separate queries to first resolve the qualified_name of the
    glossary, so will be somewhat slower. If you already have the qualified_name of the glossary, use
    find_term_by_name_fast instead.

    :param name: of the term
    :param glossary_name: human-readable name of the glossary in which the term exists
    :param attributes: (optional) collection of attributes to retrieve for the term
    :returns: the term, if found
    :raises NotFoundError: if no term with the provided name exists in the glossary
    """
    # First find the glossary by name
    glossary = self.find_glossary_by_name(name=glossary_name)

    # Then find the term in that glossary using the fast method
    return self.find_term_fast_by_name(
        name=name,
        glossary_qualified_name=glossary.qualified_name,
        attributes=attributes,
    )
find_term_fast_by_name(name: constr(strip_whitespace=True, min_length=1, strict=True), glossary_qualified_name: constr(strip_whitespace=True, min_length=1, strict=True), attributes: Optional[List[StrictStr]] = None) -> AtlasGlossaryTerm

Find a term by its human-readable name. Note: this operation requires first knowing the qualified_name of the glossary in which the term exists.

:param name: of the term :param glossary_qualified_name: qualified_name of the glossary in which the term exists :param attributes: (optional) collection of attributes to retrieve for the term :returns: the term, if found :raises NotFoundError: if no term with the provided name exists in the glossary

Source code in pyatlan/client/asset.py
@validate_arguments
def find_term_fast_by_name(
    self,
    name: constr(strip_whitespace=True, min_length=1, strict=True),  # type: ignore
    glossary_qualified_name: constr(  # type: ignore
        strip_whitespace=True, min_length=1, strict=True
    ),
    attributes: Optional[List[StrictStr]] = None,
) -> AtlasGlossaryTerm:
    """
    Find a term by its human-readable name.
    Note: this operation requires first knowing the qualified_name of the glossary in which the
    term exists.

    :param name: of the term
    :param glossary_qualified_name: qualified_name of the glossary in which the term exists
    :param attributes: (optional) collection of attributes to retrieve for the term
    :returns: the term, if found
    :raises NotFoundError: if no term with the provided name exists in the glossary
    """
    if attributes is None:
        attributes = []

    # Build query using shared logic
    query = FindTermFastByName.build_query(name, glossary_qualified_name)

    # Execute search using shared logic
    return self._search_for_asset_with_name(
        query=query, name=name, asset_type=AtlasGlossaryTerm, attributes=attributes
    )[0]
get_by_guid(guid: str, asset_type: Type[A] = Asset, min_ext_info: bool = False, ignore_relationships: bool = True, attributes: Optional[Union[List[str], List[AtlanField]]] = None, related_attributes: Optional[Union[List[str], List[AtlanField]]] = None) -> A

Retrieves an asset by its GUID.

:param guid: unique identifier (GUID) of the asset to retrieve :param asset_type: type of asset to be retrieved, defaults to Asset :param min_ext_info: whether to minimize extra info (True) or not (False) :param ignore_relationships: whether to include relationships (False) or exclude them (True) :param attributes: a specific list of attributes to retrieve for the asset :param related_attributes: a specific list of relationships attributes to retrieve for the asset :returns: the requested asset :raises NotFoundError: if the asset does not exist, or is not of the type requested :raises AtlanError: on any API communication issue

Source code in pyatlan/client/asset.py
@validate_arguments(config=dict(arbitrary_types_allowed=True))
def get_by_guid(
    self,
    guid: str,
    asset_type: Type[A] = Asset,  # type: ignore[assignment]
    min_ext_info: bool = False,
    ignore_relationships: bool = True,
    attributes: Optional[Union[List[str], List[AtlanField]]] = None,
    related_attributes: Optional[Union[List[str], List[AtlanField]]] = None,
) -> A:
    """
    Retrieves an asset by its GUID.

    :param guid: unique identifier (GUID) of the asset to retrieve
    :param asset_type: type of asset to be retrieved, defaults to `Asset`
    :param min_ext_info: whether to minimize extra info (True) or not (False)
    :param ignore_relationships: whether to include relationships (False) or exclude them (True)
    :param attributes: a specific list of attributes to retrieve for the asset
    :param related_attributes: a specific list of relationships attributes to retrieve for the asset
    :returns: the requested asset
    :raises NotFoundError: if the asset does not exist, or is not of the type requested
    :raises AtlanError: on any API communication issue
    """

    # Normalize field inputs
    normalized_attributes = GetByQualifiedName.normalize_search_fields(attributes)
    normalized_related_attributes = GetByQualifiedName.normalize_search_fields(
        related_attributes
    )

    # Use FluentSearch if specific attributes are requested
    if (normalized_attributes and len(normalized_attributes)) or (
        normalized_related_attributes and len(normalized_related_attributes)
    ):
        search = GetByGuid.prepare_fluent_search_request(
            guid, asset_type, normalized_attributes, normalized_related_attributes
        )
        results = search.execute(client=self._client)  # type: ignore[arg-type]
        return GetByGuid.process_fluent_search_response(results, guid, asset_type)

    # Use direct API call for simple requests
    endpoint_path, query_params = GetByGuid.prepare_direct_api_request(
        guid, min_ext_info, ignore_relationships
    )
    raw_json = self._client._call_api(endpoint_path, query_params)
    return GetByGuid.process_direct_api_response(raw_json, guid, asset_type)
get_by_qualified_name(qualified_name: str, asset_type: Type[A], min_ext_info: bool = False, ignore_relationships: bool = True, attributes: Optional[Union[List[str], List[AtlanField]]] = None, related_attributes: Optional[Union[List[str], List[AtlanField]]] = None) -> A

Retrieves an asset by its qualified_name.

:param qualified_name: qualified_name of the asset to be retrieved :param asset_type: type of asset to be retrieved ( must be the actual asset type not a super type) :param min_ext_info: whether to minimize extra info (True) or not (False) :param ignore_relationships: whether to include relationships (False) or exclude them (True) :param attributes: a specific list of attributes to retrieve for the asset :param related_attributes: a specific list of relationships attributes to retrieve for the asset :returns: the requested asset :raises NotFoundError: if the asset does not exist :raises AtlanError: on any API communication issue

Source code in pyatlan/client/asset.py
@validate_arguments(config=dict(arbitrary_types_allowed=True))
def get_by_qualified_name(
    self,
    qualified_name: str,
    asset_type: Type[A],
    min_ext_info: bool = False,
    ignore_relationships: bool = True,
    attributes: Optional[Union[List[str], List[AtlanField]]] = None,
    related_attributes: Optional[Union[List[str], List[AtlanField]]] = None,
) -> A:
    """
    Retrieves an asset by its qualified_name.

    :param qualified_name: qualified_name of the asset to be retrieved
    :param asset_type: type of asset to be retrieved ( must be the actual asset type not a super type)
    :param min_ext_info: whether to minimize extra info (True) or not (False)
    :param ignore_relationships: whether to include relationships (False) or exclude them (True)
    :param attributes: a specific list of attributes to retrieve for the asset
    :param related_attributes: a specific list of relationships attributes to retrieve for the asset
    :returns: the requested asset
    :raises NotFoundError: if the asset does not exist
    :raises AtlanError: on any API communication issue
    """

    # Normalize field inputs
    normalized_attributes = GetByQualifiedName.normalize_search_fields(attributes)
    normalized_related_attributes = GetByQualifiedName.normalize_search_fields(
        related_attributes
    )

    # Use FluentSearch if specific attributes are requested
    if (normalized_attributes and len(normalized_attributes)) or (
        normalized_related_attributes and len(normalized_related_attributes)
    ):
        search = GetByQualifiedName.prepare_fluent_search_request(
            qualified_name,
            asset_type,
            normalized_attributes,
            normalized_related_attributes,
        )
        results = search.execute(client=self._client)  # type: ignore[arg-type]
        return GetByQualifiedName.process_fluent_search_response(
            results, qualified_name, asset_type
        )

    # Use direct API call for simple requests
    endpoint_path, query_params = GetByQualifiedName.prepare_direct_api_request(
        qualified_name, asset_type, min_ext_info, ignore_relationships
    )
    raw_json = self._client._call_api(endpoint_path, query_params)
    return GetByQualifiedName.process_direct_api_response(
        raw_json, qualified_name, asset_type
    )
get_hierarchy(glossary: AtlasGlossary, attributes: Optional[List[Union[AtlanField, str]]] = None, related_attributes: Optional[List[Union[AtlanField, str]]] = None) -> CategoryHierarchy

Retrieve category hierarchy in this Glossary, in a traversable form. You can traverse in either depth_first or breadth_first order. Both return an ordered list of Glossary objects. Note: by default, each category will have a minimal set of information (name, GUID, qualifiedName). If you want additional details about each category, specify the attributes you want in the attributes parameter of this method.

:param glossary: the glossary to retrieve the category hierarchy for :param attributes: attributes to retrieve for each category in the hierarchy :param related_attributes: attributes to retrieve for each related asset in the hierarchy :returns: a traversable category hierarchy

Source code in pyatlan/client/asset.py
def get_hierarchy(
    self,
    glossary: AtlasGlossary,
    attributes: Optional[List[Union[AtlanField, str]]] = None,
    related_attributes: Optional[List[Union[AtlanField, str]]] = None,
) -> CategoryHierarchy:
    """
    Retrieve category hierarchy in this Glossary, in a traversable form. You can traverse in either depth_first
    or breadth_first order. Both return an ordered list of Glossary objects.
    Note: by default, each category will have a minimal set of information (name, GUID, qualifiedName). If you
    want additional details about each category, specify the attributes you want in the attributes parameter
    of this method.

    :param glossary: the glossary to retrieve the category hierarchy for
    :param attributes: attributes to retrieve for each category in the hierarchy
    :param related_attributes: attributes to retrieve for each related asset in the hierarchy
    :returns: a traversable category hierarchy
    """

    # Validate glossary using shared logic
    GetHierarchy.validate_glossary(glossary)

    # Prepare search request using shared logic
    request = GetHierarchy.prepare_search_request(
        glossary, attributes, related_attributes
    )

    # Execute search
    response = self.search(request)

    # Process results using shared logic
    return GetHierarchy.process_search_results(response, glossary)
get_lineage_list(lineage_request: LineageListRequest) -> LineageListResults

Retrieve lineage using the higher-performance "list" API.

:param lineage_request: detailing the lineage query, parameters, and so on to run :returns: the results of the lineage request :raises InvalidRequestError: if the requested lineage direction is 'BOTH' (unsupported for this operation) :raises AtlanError: on any API communication issue

Source code in pyatlan/client/asset.py
def get_lineage_list(
    self, lineage_request: LineageListRequest
) -> LineageListResults:
    """
    Retrieve lineage using the higher-performance "list" API.

    :param lineage_request: detailing the lineage query, parameters, and so on to run
    :returns: the results of the lineage request
    :raises InvalidRequestError: if the requested lineage direction is 'BOTH' (unsupported for this operation)
    :raises AtlanError: on any API communication issue
    """
    endpoint, request_obj = GetLineageList.prepare_request(lineage_request)
    raw_json = self._client._call_api(endpoint, request_obj=request_obj)
    response = GetLineageList.process_response(raw_json, lineage_request)
    return LineageListResults(
        client=self._client,
        criteria=lineage_request,
        start=lineage_request.offset or 0,
        size=lineage_request.size or 10,
        has_more=response["has_more"],
        assets=response["assets"],
    )
process_assets(search: IndexSearchRequestProvider, func: Callable[[Asset], None]) -> int

Process assets matching a search query and apply a processing function to each unique asset.

This function iteratively searches for assets using the search provider and processes each unique asset using the provided callable function. The uniqueness of assets is determined based on their GUIDs. If new assets are found in later iterations that haven't been processed yet, the process continues until no more new assets are available to process.

Parameters:

Name Type Description Default
search IndexSearchRequestProvider

IndexSearchRequestProvider The search provider that generates search queries and contains the criteria for searching the assets such as a FluentSearch.

required
func Callable[[Asset], None]

Callable[[Asset], None] A callable function that receives each unique asset as its parameter and performs the required operations on it.

required

Returns:

Name Type Description
int int

The total number of unique assets that have been processed.

Source code in pyatlan/client/asset.py
def process_assets(
    self, search: IndexSearchRequestProvider, func: Callable[[Asset], None]
) -> int:
    """
    Process assets matching a search query and apply a processing function to each unique asset.

    This function iteratively searches for assets using the search provider and processes each
    unique asset using the provided callable function. The uniqueness of assets is determined
    based on their GUIDs. If new assets are found in later iterations that haven't been
    processed yet, the process continues until no more new assets are available to process.

    Arguments:
        search: IndexSearchRequestProvider
            The search provider that generates search queries and contains the criteria for
            searching the assets such as a FluentSearch.
        func: Callable[[Asset], None]
            A callable function that receives each unique asset as its parameter and performs
            the required operations on it.

    Returns:
        int: The total number of unique assets that have been processed.
    """
    guids_processed: set[str] = set()
    has_assets_to_process: bool = True
    iteration_count = 0
    while has_assets_to_process:
        iteration_count += 1
        has_assets_to_process = False
        response = self.search(search.to_request())
        LOGGER.debug(
            "Iteration %d found %d assets.", iteration_count, response.count
        )
        for asset in response:
            if asset.guid not in guids_processed:
                guids_processed.add(asset.guid)
                has_assets_to_process = True
                func(asset)
    return len(guids_processed)
purge_by_guid(guid: Union[str, List[str]], delete_type: AtlanDeleteType = AtlanDeleteType.PURGE) -> AssetMutationResponse

Deletes one or more assets by their unique identifier (GUID) using the specified delete type.

:param guid: unique identifier(s) (GUIDs) of one or more assets to delete :param delete_type: type of deletion to perform:

- PURGE: completely removes entity and all audit/history traces (default, irreversible)
- HARD: physically removes entity but keeps audit history (irreversible)

:returns: details of the deleted asset(s) :raises AtlanError: on any API communication issue

.. warning:: PURGE and HARD deletions are irreversible operations. Use with caution.

Source code in pyatlan/client/asset.py
@validate_arguments
def purge_by_guid(
    self,
    guid: Union[str, List[str]],
    delete_type: AtlanDeleteType = AtlanDeleteType.PURGE,
) -> AssetMutationResponse:
    """
    Deletes one or more assets by their unique identifier (GUID) using the specified delete type.

    :param guid: unique identifier(s) (GUIDs) of one or more assets to delete
    :param delete_type: type of deletion to perform:

        - PURGE: completely removes entity and all audit/history traces (default, irreversible)
        - HARD: physically removes entity but keeps audit history (irreversible)

    :returns: details of the deleted asset(s)
    :raises AtlanError: on any API communication issue

    .. warning::
        PURGE and HARD deletions are irreversible operations. Use with caution.
    """
    query_params = PurgeByGuid.prepare_request(guid, delete_type)
    raw_json = self._client._call_api(
        DELETE_ENTITIES_BY_GUIDS, query_params=query_params
    )
    return PurgeByGuid.process_response(raw_json)
remove_announcement(asset_type: Type[A], qualified_name: str, name: str, glossary_guid: Optional[str] = None) -> Optional[A]
remove_announcement(
    asset_type: Type[AtlasGlossaryTerm],
    qualified_name: str,
    name: str,
    glossary_guid: str,
) -> Optional[AtlasGlossaryTerm]
remove_announcement(
    asset_type: Type[AtlasGlossaryCategory],
    qualified_name: str,
    name: str,
    glossary_guid: str,
) -> Optional[AtlasGlossaryCategory]
remove_announcement(
    asset_type: Type[A],
    qualified_name: str,
    name: str,
    glossary_guid: Optional[str] = None,
) -> Optional[A]

Remove the announcement from an asset.

:param asset_type: type of asset from which to remove the announcement :param qualified_name: the qualified_name of the asset from which to remove the announcement :param glossary_guid: unique identifier of the glossary, required only when the asset type is AtlasGlossaryTerm or AtlasGlossaryCategory :returns: the result of the removal, or None if the removal failed

Source code in pyatlan/client/asset.py
@validate_arguments
def remove_announcement(
    self,
    asset_type: Type[A],
    qualified_name: str,
    name: str,
    glossary_guid: Optional[str] = None,
) -> Optional[A]:
    """
    Remove the announcement from an asset.

    :param asset_type: type of asset from which to remove the announcement
    :param qualified_name: the qualified_name of the asset from which to remove the announcement
    :param glossary_guid: unique identifier of the glossary, required
    only when the asset type is `AtlasGlossaryTerm` or `AtlasGlossaryCategory`
    :returns: the result of the removal, or None if the removal failed
    """

    # Prepare asset for announcement removal using shared logic
    asset = RemoveAnnouncement.prepare_asset_for_announcement_removal(
        asset_type=asset_type,
        qualified_name=qualified_name,
        name=name,
        glossary_guid=glossary_guid,
    )

    # Execute update using shared logic
    return self._update_asset_by_attribute(asset, asset_type, qualified_name)
remove_atlan_tag(asset_type: Type[A], qualified_name: str, atlan_tag_name: str) -> A

Removes a single Atlan tag from the provided asset.

:param asset_type: type of asset to which to add the Atlan tags :param qualified_name: qualified_name of the asset to which to add the Atlan tags :param atlan_tag_name: human-readable name of the Atlan tag to remove from the asset :returns: the asset that was updated (note that it will NOT contain details of the deleted Atlan tag) :raises AtlanError: on any API communication issue

Source code in pyatlan/client/asset.py
@validate_arguments
def remove_atlan_tag(
    self,
    asset_type: Type[A],
    qualified_name: str,
    atlan_tag_name: str,
) -> A:
    """
    Removes a single Atlan tag from the provided asset.

    :param asset_type: type of asset to which to add the Atlan tags
    :param qualified_name: qualified_name of the asset to which to add the Atlan tags
    :param atlan_tag_name: human-readable name of the Atlan tag to remove from the asset
    :returns: the asset that was updated (note that it will NOT contain details of the deleted Atlan tag)
    :raises AtlanError: on any API communication issue
    """
    return self._modify_tags(
        asset_type=asset_type,
        qualified_name=qualified_name,
        atlan_tag_names=[atlan_tag_name],
        modification_type="remove",
        save_parameters={
            "replace_atlan_tags": False,
            "append_atlan_tags": True,
        },
    )
remove_atlan_tags(asset_type: Type[A], qualified_name: str, atlan_tag_names: List[str]) -> A

Removes one or more Atlan tag from the provided asset.

:param asset_type: type of asset to which to add the Atlan tags :param qualified_name: qualified_name of the asset to which to add the Atlan tags :param atlan_tag_names: human-readable name of the Atlan tag to remove from the asset :returns: the asset that was updated (note that it will NOT contain details of the deleted Atlan tags) :raises AtlanError: on any API communication issue

Source code in pyatlan/client/asset.py
@validate_arguments
def remove_atlan_tags(
    self,
    asset_type: Type[A],
    qualified_name: str,
    atlan_tag_names: List[str],
) -> A:
    """
    Removes one or more Atlan tag from the provided asset.

    :param asset_type: type of asset to which to add the Atlan tags
    :param qualified_name: qualified_name of the asset to which to add the Atlan tags
    :param atlan_tag_names: human-readable name of the Atlan tag to remove from the asset
    :returns: the asset that was updated (note that it will NOT contain details of the deleted Atlan tags)
    :raises AtlanError: on any API communication issue
    """
    return self._modify_tags(
        asset_type=asset_type,
        qualified_name=qualified_name,
        atlan_tag_names=atlan_tag_names,
        modification_type="remove",
        save_parameters={
            "replace_atlan_tags": False,
            "append_atlan_tags": True,
        },
    )
remove_certificate(asset_type: Type[A], qualified_name: str, name: str, glossary_guid: Optional[str] = None) -> Optional[A]
remove_certificate(
    asset_type: Type[AtlasGlossaryTerm],
    qualified_name: str,
    name: str,
    glossary_guid: str,
) -> Optional[AtlasGlossaryTerm]
remove_certificate(
    asset_type: Type[AtlasGlossaryCategory],
    qualified_name: str,
    name: str,
    glossary_guid: str,
) -> Optional[AtlasGlossaryCategory]
remove_certificate(
    asset_type: Type[A],
    qualified_name: str,
    name: str,
    glossary_guid: Optional[str] = None,
) -> Optional[A]

Remove the certificate from an asset.

:param asset_type: type of asset from which to remove the certificate :param qualified_name: the qualified_name of the asset from which to remove the certificate :param name: the name of the asset from which to remove the certificate :param glossary_guid: unique identifier of the glossary, required only when the asset type is AtlasGlossaryTerm or AtlasGlossaryCategory :returns: the result of the removal, or None if the removal failed

Source code in pyatlan/client/asset.py
@validate_arguments
def remove_certificate(
    self,
    asset_type: Type[A],
    qualified_name: str,
    name: str,
    glossary_guid: Optional[str] = None,
) -> Optional[A]:
    """
    Remove the certificate from an asset.

    :param asset_type: type of asset from which to remove the certificate
    :param qualified_name: the qualified_name of the asset from which to remove the certificate
    :param name: the name of the asset from which to remove the certificate
    :param glossary_guid: unique identifier of the glossary, required
    only when the asset type is `AtlasGlossaryTerm` or `AtlasGlossaryCategory`
    :returns: the result of the removal, or None if the removal failed
    """

    # Prepare asset for certificate removal using shared logic
    asset = RemoveCertificate.prepare_asset_for_certificate_removal(
        asset_type=asset_type,
        qualified_name=qualified_name,
        name=name,
        glossary_guid=glossary_guid,
    )

    # Execute update using shared logic
    return self._update_asset_by_attribute(asset, asset_type, qualified_name)
remove_custom_metadata(guid: str, cm_name: str)

Remove specific custom metadata from an asset.

:param guid: unique identifier (GUID) of the asset :param cm_name: human-readable name of the custom metadata to remove :raises AtlanError: on any API communication issue

Source code in pyatlan/client/asset.py
@validate_arguments
def remove_custom_metadata(self, guid: str, cm_name: str):
    """
    Remove specific custom metadata from an asset.

    :param guid: unique identifier (GUID) of the asset
    :param cm_name: human-readable name of the custom metadata to remove
    :raises AtlanError: on any API communication issue
    """

    # Prepare request using shared logic (includes clear_all())
    custom_metadata_request = RemoveCustomMetadata.prepare_request(
        cm_name, self._client
    )

    # Get API endpoint using shared logic
    endpoint = ManageCustomMetadata.get_api_endpoint(
        guid, custom_metadata_request.custom_metadata_set_id
    )

    # Make API call
    self._client._call_api(endpoint, None, custom_metadata_request)
remove_terms(asset_type: Type[A], terms: List[AtlasGlossaryTerm], guid: Optional[str] = None, qualified_name: Optional[str] = None) -> A

Remove terms from an asset, without replacing all existing terms linked to the asset. Note: this operation must make two API calls — one to retrieve the asset's existing terms, and a second to remove the provided terms.

:param asset_type: type of the asset :param terms: the list of terms to remove from the asset (note: these must be references by GUID to efficiently remove any existing terms) :param guid: unique identifier (GUID) of the asset from which to remove the terms :param qualified_name: the qualified_name of the asset from which to remove the terms :returns: the asset that was updated (note that it will NOT contain details of the resulting terms)

Source code in pyatlan/client/asset.py
@validate_arguments
def remove_terms(
    self,
    asset_type: Type[A],
    terms: List[AtlasGlossaryTerm],
    guid: Optional[str] = None,
    qualified_name: Optional[str] = None,
) -> A:
    """
    Remove terms from an asset, without replacing all existing terms linked to the asset.
    Note: this operation must make two API calls — one to retrieve the asset's existing terms,
    and a second to remove the provided terms.

    :param asset_type: type of the asset
    :param terms: the list of terms to remove from the asset (note: these must be references by GUID to efficiently
                  remove any existing terms)
    :param guid: unique identifier (GUID) of the asset from which to remove the terms
    :param qualified_name: the qualified_name of the asset from which to remove the terms
    :returns: the asset that was updated (note that it will NOT contain details of the resulting terms)
    """
    return self._manage_terms(
        asset_type=asset_type,
        terms=terms,
        save_semantic=SaveSemantic.REMOVE,
        guid=guid,
        qualified_name=qualified_name,
    )
replace_custom_metadata(guid: str, custom_metadata: CustomMetadataDict)

Replace specific custom metadata on the asset. This will replace everything within the named custom metadata, but will not change any of hte other named custom metadata on the asset.

:param guid: unique identifier (GUID) of the asset :param custom_metadata: custom metadata to replace, as human-readable names mapped to values :raises AtlanError: on any API communication issue

Source code in pyatlan/client/asset.py
@validate_arguments(config=dict(arbitrary_types_allowed=True))
def replace_custom_metadata(self, guid: str, custom_metadata: CustomMetadataDict):
    """
    Replace specific custom metadata on the asset. This will replace everything within the named
    custom metadata, but will not change any of hte other named custom metadata on the asset.

    :param guid: unique identifier (GUID) of the asset
    :param custom_metadata: custom metadata to replace, as human-readable names mapped to values
    :raises AtlanError: on any API communication issue
    """

    # Prepare request using shared logic (includes clear_unset())
    custom_metadata_request = ReplaceCustomMetadata.prepare_request(custom_metadata)

    # Get API endpoint using shared logic
    endpoint = ManageCustomMetadata.get_api_endpoint(
        guid, custom_metadata_request.custom_metadata_set_id
    )

    # Make API call
    self._client._call_api(endpoint, None, custom_metadata_request)
replace_terms(asset_type: Type[A], terms: List[AtlasGlossaryTerm], guid: Optional[str] = None, qualified_name: Optional[str] = None) -> A

Replace the terms linked to an asset. (At least one of the GUID or qualified_name must be supplied, but both are not necessary.)

:param asset_type: type of the asset :param terms: the list of terms to replace on the asset, or an empty list to remove all terms from an asset :param guid: unique identifier (GUID) of the asset to which to replace the terms :param qualified_name: the qualified_name of the asset to which to replace the terms :returns: the asset that was updated (note that it will NOT contain details of the replaced terms)

Source code in pyatlan/client/asset.py
@validate_arguments
def replace_terms(
    self,
    asset_type: Type[A],
    terms: List[AtlasGlossaryTerm],
    guid: Optional[str] = None,
    qualified_name: Optional[str] = None,
) -> A:
    """
    Replace the terms linked to an asset.
    (At least one of the GUID or qualified_name must be supplied, but both are not necessary.)

    :param asset_type: type of the asset
    :param terms: the list of terms to replace on the asset, or an empty list to remove all terms from an asset
    :param guid: unique identifier (GUID) of the asset to which to replace the terms
    :param qualified_name: the qualified_name of the asset to which to replace the terms
    :returns: the asset that was updated (note that it will NOT contain details of the replaced terms)
    """
    return self._manage_terms(
        asset_type=asset_type,
        terms=terms,
        save_semantic=SaveSemantic.REPLACE,
        guid=guid,
        qualified_name=qualified_name,
    )
restore(asset_type: Type[A], qualified_name: str) -> bool

Restore an archived (soft-deleted) asset to active.

:param asset_type: type of the asset to restore :param qualified_name: of the asset to restore :returns: True if the asset is now restored, or False if not :raises AtlanError: on any API communication issue

Source code in pyatlan/client/asset.py
@validate_arguments
def restore(self, asset_type: Type[A], qualified_name: str) -> bool:
    """
    Restore an archived (soft-deleted) asset to active.

    :param asset_type: type of the asset to restore
    :param qualified_name: of the asset to restore
    :returns: True if the asset is now restored, or False if not
    :raises AtlanError: on any API communication issue
    """
    return self._restore(asset_type, qualified_name, 0)
retrieve_minimal(guid: str, asset_type: Type[A] = Asset) -> A

Retrieves an asset by its GUID, without any of its relationships.

:param guid: unique identifier (GUID) of the asset to retrieve :param asset_type: type of asset to be retrieved, defaults to Asset :returns: the asset, without any of its relationships :raises NotFoundError: if the asset does not exist

Source code in pyatlan/client/asset.py
@validate_arguments
def retrieve_minimal(
    self,
    guid: str,
    asset_type: Type[A] = Asset,  # type: ignore[assignment]
) -> A:
    """
    Retrieves an asset by its GUID, without any of its relationships.

    :param guid: unique identifier (GUID) of the asset to retrieve
    :param asset_type: type of asset to be retrieved, defaults to `Asset`
    :returns: the asset, without any of its relationships
    :raises NotFoundError: if the asset does not exist
    """
    return self.get_by_guid(
        guid=guid,
        asset_type=asset_type,
        min_ext_info=True,
        ignore_relationships=True,
    )
save(entity: Union[Asset, List[Asset]], replace_atlan_tags: bool = False, replace_custom_metadata: bool = False, overwrite_custom_metadata: bool = False, append_atlan_tags: bool = False) -> AssetMutationResponse

If an asset with the same qualified_name exists, updates the existing asset. Otherwise, creates the asset. If an asset does exist, opertionally overwrites any Atlan tags. Custom metadata will either be overwritten or merged depending on the options provided.

:param entity: one or more assets to save :param replace_atlan_tags: whether to replace AtlanTags during an update (True) or not (False) :param replace_custom_metadata: replaces any custom metadata with non-empty values provided :param overwrite_custom_metadata: overwrites any custom metadata, even with empty values :param append_atlan_tags: whether to add/update/remove AtlanTags during an update (True) or not (False) :returns: the result of the save :raises AtlanError: on any API communication issue :raises ApiError: if a connection was created and blocking until policies are synced overruns the retry limit

Source code in pyatlan/client/asset.py
@validate_arguments
def save(
    self,
    entity: Union[Asset, List[Asset]],
    replace_atlan_tags: bool = False,
    replace_custom_metadata: bool = False,
    overwrite_custom_metadata: bool = False,
    append_atlan_tags: bool = False,
) -> AssetMutationResponse:
    """
    If an asset with the same qualified_name exists, updates the existing asset. Otherwise, creates the asset.
    If an asset does exist, opertionally overwrites any Atlan tags. Custom metadata will either be
    overwritten or merged depending on the options provided.

    :param entity: one or more assets to save
    :param replace_atlan_tags: whether to replace AtlanTags during an update (True) or not (False)
    :param replace_custom_metadata: replaces any custom metadata with non-empty values provided
    :param overwrite_custom_metadata: overwrites any custom metadata, even with empty values
    :param append_atlan_tags: whether to add/update/remove AtlanTags during an update (True) or not (False)
    :returns: the result of the save
    :raises AtlanError: on any API communication issue
    :raises ApiError: if a connection was created and blocking until policies are synced overruns the retry limit
    """
    query_params, request = Save.prepare_request(
        entity=entity,
        replace_atlan_tags=replace_atlan_tags,
        replace_custom_metadata=replace_custom_metadata,
        overwrite_custom_metadata=overwrite_custom_metadata,
        append_atlan_tags=append_atlan_tags,
        client=self._client,  # type: ignore[arg-type]
    )
    raw_json = self._client._call_api(BULK_UPDATE, query_params, request)
    response = Save.process_response(raw_json)
    if connections_created := response.assets_created(Connection):
        self._wait_for_connections_to_be_created(connections_created)
    return response
save_merging_cm(entity: Union[Asset, List[Asset]], replace_atlan_tags: bool = False) -> AssetMutationResponse

If no asset exists, has the same behavior as the upsert() method, while also setting any custom metadata provided. If an asset does exist, optionally overwrites any Atlan tags. Will merge any provided custom metadata with any custom metadata that already exists on the asset.

:param entity: one or more assets to save :param replace_atlan_tags: whether to replace AtlanTags during an update (True) or not (False) :returns: details of the created or updated assets

Source code in pyatlan/client/asset.py
@validate_arguments
def save_merging_cm(
    self, entity: Union[Asset, List[Asset]], replace_atlan_tags: bool = False
) -> AssetMutationResponse:
    """
    If no asset exists, has the same behavior as the upsert() method, while also setting
    any custom metadata provided. If an asset does exist, optionally overwrites any Atlan tags.
    Will merge any provided custom metadata with any custom metadata that already exists on the asset.

    :param entity: one or more assets to save
    :param replace_atlan_tags: whether to replace AtlanTags during an update (True) or not (False)
    :returns: details of the created or updated assets
    """
    return self.save(
        entity=entity,
        replace_atlan_tags=replace_atlan_tags,
        replace_custom_metadata=True,
        overwrite_custom_metadata=False,
    )
save_replacing_cm(entity: Union[Asset, List[Asset]], replace_atlan_tags: bool = False) -> AssetMutationResponse

If no asset exists, has the same behavior as the upsert() method, while also setting any custom metadata provided. If an asset does exist, optionally overwrites any Atlan tags. Will overwrite all custom metadata on any existing asset with only the custom metadata provided (wiping out any other custom metadata on an existing asset that is not provided in the request).

:param entity: one or more assets to save :param replace_atlan_tags: whether to replace AtlanTags during an update (True) or not (False) :returns: details of the created or updated assets :raises AtlanError: on any API communication issue

Source code in pyatlan/client/asset.py
@validate_arguments
def save_replacing_cm(
    self, entity: Union[Asset, List[Asset]], replace_atlan_tags: bool = False
) -> AssetMutationResponse:
    """
    If no asset exists, has the same behavior as the upsert() method, while also setting
    any custom metadata provided.
    If an asset does exist, optionally overwrites any Atlan tags.
    Will overwrite all custom metadata on any existing asset with only the custom metadata provided
    (wiping out any other custom metadata on an existing asset that is not provided in the request).

    :param entity: one or more assets to save
    :param replace_atlan_tags: whether to replace AtlanTags during an update (True) or not (False)
    :returns: details of the created or updated assets
    :raises AtlanError: on any API communication issue
    """
    query_params, request = Save.prepare_request_replacing_cm(
        entity=entity,
        replace_atlan_tags=replace_atlan_tags,
        client=self._client,  # type: ignore[arg-type]
    )
    raw_json = self._client._call_api(BULK_UPDATE, query_params, request)
    return Save.process_response_replacing_cm(raw_json)
search(criteria: IndexSearchRequest, bulk=False) -> IndexSearchResults

Search for assets using the provided criteria. Note: if the number of results exceeds the predefined threshold (100,000 assets) this will be automatically converted into a bulk search.

:param criteria: detailing the search query, parameters, and so on to run :param bulk: whether to run the search to retrieve assets that match the supplied criteria, for large numbers of results (> 100,000), defaults to False. Note: this will reorder the results (based on creation timestamp) in order to iterate through a large number (more than 100,000) results. :raises InvalidRequestError:

- if bulk search is enabled (`bulk=True`) and any
  user-specified sorting options are found in the search request.
- if bulk search is disabled (`bulk=False`) and the number of results
  exceeds the predefined threshold (i.e: `100,000` assets)
  and any user-specified sorting options are found in the search request.

:raises AtlanError: on any API communication issue :returns: the results of the search

Source code in pyatlan/client/asset.py
def search(self, criteria: IndexSearchRequest, bulk=False) -> IndexSearchResults:
    """
    Search for assets using the provided criteria.
    `Note:` if the number of results exceeds the predefined threshold
    (100,000 assets) this will be automatically converted into a `bulk` search.

    :param criteria: detailing the search query, parameters, and so on to run
    :param bulk: whether to run the search to retrieve assets that match the supplied criteria,
    for large numbers of results (> `100,000`), defaults to `False`. Note: this will reorder the results
    (based on creation timestamp) in order to iterate through a large number (more than `100,000`) results.
    :raises InvalidRequestError:

        - if bulk search is enabled (`bulk=True`) and any
          user-specified sorting options are found in the search request.
        - if bulk search is disabled (`bulk=False`) and the number of results
          exceeds the predefined threshold (i.e: `100,000` assets)
          and any user-specified sorting options are found in the search request.

    :raises AtlanError: on any API communication issue
    :returns: the results of the search
    """
    endpoint, request_obj = Search.prepare_request(criteria, bulk)
    raw_json = self._client._call_api(
        endpoint,
        request_obj=request_obj,
    )
    response = Search.process_response(raw_json, criteria)
    if Search._check_for_bulk_search(criteria, response["count"], bulk):
        return self.search(criteria)
    return IndexSearchResults(
        client=self._client,
        criteria=criteria,
        start=criteria.dsl.from_,
        size=criteria.dsl.size,
        count=response["count"],
        assets=response["assets"],
        aggregations=response["aggregations"],
        bulk=bulk,
    )
set_dq_row_scope_filter_column(asset_type: Type[A], asset_name: str, asset_qualified_name: str, row_scope_filter_column_qualified_name: str) -> AssetMutationResponse

Set the row scope filter column for data quality rules on an asset.

:param asset_type: the type of asset to update (e.g., Table) :param asset_name: the name of the asset to update :param asset_qualified_name: the qualified name of the asset to update :param row_scope_filter_column_qualified_name: the qualified name of the column to use for row scope filtering :returns: the result of the save :raises AtlanError: on any API communication issue

Source code in pyatlan/client/asset.py
@validate_arguments
def set_dq_row_scope_filter_column(
    self,
    asset_type: Type[A],
    asset_name: str,
    asset_qualified_name: str,
    row_scope_filter_column_qualified_name: str,
) -> AssetMutationResponse:
    """
    Set the row scope filter column for data quality rules on an asset.

    :param asset_type: the type of asset to update (e.g., Table)
    :param asset_name: the name of the asset to update
    :param asset_qualified_name: the qualified name of the asset to update
    :param row_scope_filter_column_qualified_name: the qualified name of the column to use for row scope filtering
    :returns: the result of the save
    :raises AtlanError: on any API communication issue
    """
    updated_asset = asset_type.updater(
        qualified_name=asset_qualified_name, name=asset_name
    )
    updated_asset.asset_d_q_row_scope_filter_column_qualified_name = (
        row_scope_filter_column_qualified_name
    )
    response = self.save(updated_asset)
    return response
update_announcement(asset_type: Type[A], qualified_name: str, name: str, announcement: Announcement, glossary_guid: Optional[str] = None) -> Optional[A]
update_announcement(
    asset_type: Type[AtlasGlossaryTerm],
    qualified_name: str,
    name: str,
    announcement: Announcement,
    glossary_guid: str,
) -> Optional[AtlasGlossaryTerm]
update_announcement(
    asset_type: Type[AtlasGlossaryCategory],
    qualified_name: str,
    name: str,
    announcement: Announcement,
    glossary_guid: str,
) -> Optional[AtlasGlossaryCategory]
update_announcement(
    asset_type: Type[A],
    qualified_name: str,
    name: str,
    announcement: Announcement,
    glossary_guid: Optional[str] = None,
) -> Optional[A]

Update the announcement on an asset.

:param asset_type: type of asset on which to update the announcement :param qualified_name: the qualified_name of the asset on which to update the announcement :param name: the name of the asset on which to update the announcement :param announcement: to apply to the asset :param glossary_guid: unique identifier of the glossary, required only when the asset type is AtlasGlossaryTerm or AtlasGlossaryCategory :returns: the result of the update, or None if the update failed

Source code in pyatlan/client/asset.py
@validate_arguments
def update_announcement(
    self,
    asset_type: Type[A],
    qualified_name: str,
    name: str,
    announcement: Announcement,
    glossary_guid: Optional[str] = None,
) -> Optional[A]:
    """
    Update the announcement on an asset.

    :param asset_type: type of asset on which to update the announcement
    :param qualified_name: the qualified_name of the asset on which to update the announcement
    :param name: the name of the asset on which to update the announcement
    :param announcement: to apply to the asset
    :param glossary_guid: unique identifier of the glossary, required
    only when the asset type is `AtlasGlossaryTerm` or `AtlasGlossaryCategory`
    :returns: the result of the update, or None if the update failed
    """

    # Prepare asset with announcement using shared logic
    asset = UpdateAnnouncement.prepare_asset_with_announcement(
        asset_type=asset_type,
        qualified_name=qualified_name,
        name=name,
        announcement=announcement,
        glossary_guid=glossary_guid,
    )

    # Execute update using shared logic
    return self._update_asset_by_attribute(asset, asset_type, qualified_name)
update_atlan_tags(asset_type: Type[A], qualified_name: str, atlan_tag_names: List[str], propagate: bool = False, remove_propagation_on_delete: bool = True, restrict_lineage_propagation: bool = True, restrict_propagation_through_hierarchy: bool = False) -> A

Update one or more Atlan tags to the provided asset.

:param asset_type: type of asset to which to update the Atlan tags :param qualified_name: qualified_name of the asset to which to update the Atlan tags :param atlan_tag_names: human-readable names of the Atlan tags to update to the asset :param propagate: whether to propagate the Atlan tag (True) or not (False) :param remove_propagation_on_delete: whether to remove the propagated Atlan tags when the Atlan tag is removed from this asset (True) or not (False) :param restrict_lineage_propagation: whether to avoid propagating through lineage (True) or do propagate through lineage (False) :param restrict_propagation_through_hierarchy: whether to prevent this Atlan tag from propagating through hierarchy (True) or allow it to propagate through hierarchy (False) :returns: the asset that was updated (note that it will NOT contain details of the updated Atlan tags) :raises AtlanError: on any API communication issue

Source code in pyatlan/client/asset.py
@validate_arguments
def update_atlan_tags(
    self,
    asset_type: Type[A],
    qualified_name: str,
    atlan_tag_names: List[str],
    propagate: bool = False,
    remove_propagation_on_delete: bool = True,
    restrict_lineage_propagation: bool = True,
    restrict_propagation_through_hierarchy: bool = False,
) -> A:
    """
    Update one or more Atlan tags to the provided asset.

    :param asset_type: type of asset to which to update the Atlan tags
    :param qualified_name: qualified_name of the asset to which to update the Atlan tags
    :param atlan_tag_names: human-readable names of the Atlan tags to update to the asset
    :param propagate: whether to propagate the Atlan tag (True) or not (False)
    :param remove_propagation_on_delete: whether to remove the propagated Atlan tags
    when the Atlan tag is removed from this asset (True) or not (False)
    :param restrict_lineage_propagation: whether to avoid propagating
    through lineage (True) or do propagate through lineage (False)
    :param restrict_propagation_through_hierarchy: whether to prevent this Atlan tag from
    propagating through hierarchy (True) or allow it to propagate through hierarchy (False)
    :returns: the asset that was updated (note that it will NOT contain details of the updated Atlan tags)
    :raises AtlanError: on any API communication issue
    """
    return self._modify_tags(
        asset_type=asset_type,
        qualified_name=qualified_name,
        atlan_tag_names=atlan_tag_names,
        propagate=propagate,
        remove_propagation_on_delete=remove_propagation_on_delete,
        restrict_lineage_propagation=restrict_lineage_propagation,
        restrict_propagation_through_hierarchy=restrict_propagation_through_hierarchy,
        modification_type="update",
        save_parameters={
            "replace_atlan_tags": False,
            "append_atlan_tags": True,
        },
    )
update_certificate(asset_type: Type[A], qualified_name: str, name: str, certificate_status: CertificateStatus, glossary_guid: Optional[str] = None, message: Optional[str] = None) -> Optional[A]
update_certificate(
    asset_type: Type[AtlasGlossaryTerm],
    qualified_name: str,
    name: str,
    certificate_status: CertificateStatus,
    glossary_guid: str,
    message: Optional[str] = None,
) -> Optional[AtlasGlossaryTerm]
update_certificate(
    asset_type: Type[AtlasGlossaryCategory],
    qualified_name: str,
    name: str,
    certificate_status: CertificateStatus,
    glossary_guid: str,
    message: Optional[str] = None,
) -> Optional[AtlasGlossaryCategory]
update_certificate(
    asset_type: Type[A],
    qualified_name: str,
    name: str,
    certificate_status: CertificateStatus,
    glossary_guid: Optional[str] = None,
    message: Optional[str] = None,
) -> Optional[A]

Update the certificate on an asset.

:param asset_type: type of asset on which to update the certificate :param qualified_name: the qualified_name of the asset on which to update the certificate :param name: the name of the asset on which to update the certificate :param certificate_status: specific certificate to set on the asset :param glossary_guid: unique identifier of the glossary, required only when the asset type is AtlasGlossaryTerm or AtlasGlossaryCategory :param message: (optional) message to set (or None for no message) :returns: the result of the update, or None if the update failed :raises AtlanError: on any API communication issue

Source code in pyatlan/client/asset.py
@validate_arguments
def update_certificate(
    self,
    asset_type: Type[A],
    qualified_name: str,
    name: str,
    certificate_status: CertificateStatus,
    glossary_guid: Optional[str] = None,
    message: Optional[str] = None,
) -> Optional[A]:
    """
    Update the certificate on an asset.

    :param asset_type: type of asset on which to update the certificate
    :param qualified_name: the qualified_name of the asset on which to update the certificate
    :param name: the name of the asset on which to update the certificate
    :param certificate_status: specific certificate to set on the asset
    :param glossary_guid: unique identifier of the glossary, required
    only when the asset type is `AtlasGlossaryTerm` or `AtlasGlossaryCategory`
    :param message: (optional) message to set (or None for no message)
    :returns: the result of the update, or None if the update failed
    :raises AtlanError: on any API communication issue
    """

    # Prepare asset with certificate using shared logic
    asset = UpdateCertificate.prepare_asset_with_certificate(
        asset_type=asset_type,
        qualified_name=qualified_name,
        name=name,
        certificate_status=certificate_status,
        message=message,
        glossary_guid=glossary_guid,
    )

    # Execute update using shared logic
    return self._update_asset_by_attribute(asset, asset_type, qualified_name)
update_custom_metadata_attributes(guid: str, custom_metadata: CustomMetadataDict)
ManageCustomMetadata,
UpdateCustomMetadataAttributes,

)

Update only the provided custom metadata attributes on the asset. This will leave all other custom metadata attributes, even within the same named custom metadata, unchanged.

:param guid: unique identifier (GUID) of the asset :param custom_metadata: custom metadata to update, as human-readable names mapped to values :raises AtlanError: on any API communication issue

Source code in pyatlan/client/asset.py
@validate_arguments(config=dict(arbitrary_types_allowed=True))
def update_custom_metadata_attributes(
    self, guid: str, custom_metadata: CustomMetadataDict
):
    """
        ManageCustomMetadata,
        UpdateCustomMetadataAttributes,
    )

    Update only the provided custom metadata attributes on the asset. This will leave all
    other custom metadata attributes, even within the same named custom metadata, unchanged.

    :param guid: unique identifier (GUID) of the asset
    :param custom_metadata: custom metadata to update, as human-readable names mapped to values
    :raises AtlanError: on any API communication issue
    """
    # Prepare request using shared logic
    custom_metadata_request = UpdateCustomMetadataAttributes.prepare_request(
        custom_metadata
    )

    # Get API endpoint using shared logic
    endpoint = ManageCustomMetadata.get_api_endpoint(
        guid, custom_metadata_request.custom_metadata_set_id
    )

    # Make API call
    self._client._call_api(endpoint, None, custom_metadata_request)
update_merging_cm(entity: Asset, replace_atlan_tags: bool = False) -> AssetMutationResponse

If no asset exists, fails with a NotFoundError. Will merge any provided custom metadata with any custom metadata that already exists on the asset. If an asset does exist, optionally overwrites any Atlan tags.

:param entity: the asset to update :param replace_atlan_tags: whether to replace AtlanTags during an update (True) or not (False) :returns: details of the updated asset :raises NotFoundError: if the asset does not exist (will not create it)

Source code in pyatlan/client/asset.py
@validate_arguments
def update_merging_cm(
    self, entity: Asset, replace_atlan_tags: bool = False
) -> AssetMutationResponse:
    """
    If no asset exists, fails with a NotFoundError. Will merge any provided
    custom metadata with any custom metadata that already exists on the asset.
    If an asset does exist, optionally overwrites any Atlan tags.

    :param entity: the asset to update
    :param replace_atlan_tags: whether to replace AtlanTags during an update (True) or not (False)
    :returns: details of the updated asset
    :raises NotFoundError: if the asset does not exist (will not create it)
    """
    UpdateAsset.validate_asset_exists(
        qualified_name=entity.qualified_name or "",
        asset_type=type(entity),
        get_by_qualified_name_func=self.get_by_qualified_name,
    )
    return self.save_merging_cm(
        entity=entity, replace_atlan_tags=replace_atlan_tags
    )
update_replacing_cm(entity: Asset, replace_atlan_tags: bool = False) -> AssetMutationResponse

If no asset exists, fails with a NotFoundError. Will overwrite all custom metadata on any existing asset with only the custom metadata provided (wiping out any other custom metadata on an existing asset that is not provided in the request). If an asset does exist, optionally overwrites any Atlan tags.

:param entity: the asset to update :param replace_atlan_tags: whether to replace AtlanTags during an update (True) or not (False) :returns: details of the updated asset :raises NotFoundError: if the asset does not exist (will not create it)

Source code in pyatlan/client/asset.py
@validate_arguments
def update_replacing_cm(
    self, entity: Asset, replace_atlan_tags: bool = False
) -> AssetMutationResponse:
    """
    If no asset exists, fails with a NotFoundError.
    Will overwrite all custom metadata on any existing asset with only the custom metadata provided
    (wiping out any other custom metadata on an existing asset that is not provided in the request).
    If an asset does exist, optionally overwrites any Atlan tags.

    :param entity: the asset to update
    :param replace_atlan_tags: whether to replace AtlanTags during an update (True) or not (False)
    :returns: details of the updated asset
    :raises NotFoundError: if the asset does not exist (will not create it)
    """
    UpdateAsset.validate_asset_exists(
        qualified_name=entity.qualified_name or "",
        asset_type=type(entity),
        get_by_qualified_name_func=self.get_by_qualified_name,
    )
    return self.save_replacing_cm(
        entity=entity, replace_atlan_tags=replace_atlan_tags
    )
upsert(entity: Union[Asset, List[Asset]], replace_atlan_tags: bool = False, replace_custom_metadata: bool = False, overwrite_custom_metadata: bool = False) -> AssetMutationResponse

Deprecated - use save() instead.

Source code in pyatlan/client/asset.py
@validate_arguments
def upsert(
    self,
    entity: Union[Asset, List[Asset]],
    replace_atlan_tags: bool = False,
    replace_custom_metadata: bool = False,
    overwrite_custom_metadata: bool = False,
) -> AssetMutationResponse:
    """Deprecated - use save() instead."""
    warn(
        "This method is deprecated, please use 'save' instead, which offers identical functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.save(
        entity=entity,
        replace_atlan_tags=replace_atlan_tags,
        replace_custom_metadata=replace_custom_metadata,
        overwrite_custom_metadata=overwrite_custom_metadata,
    )
upsert_merging_cm(entity: Union[Asset, List[Asset]], replace_atlan_tags: bool = False) -> AssetMutationResponse

Deprecated - use save_merging_cm() instead.

Source code in pyatlan/client/asset.py
@validate_arguments
def upsert_merging_cm(
    self, entity: Union[Asset, List[Asset]], replace_atlan_tags: bool = False
) -> AssetMutationResponse:
    """Deprecated - use save_merging_cm() instead."""
    warn(
        "This method is deprecated, please use 'save_merging_cm' instead, which offers identical functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.save_merging_cm(
        entity=entity, replace_atlan_tags=replace_atlan_tags
    )
upsert_replacing_cm(entity: Union[Asset, List[Asset]], replace_atlan_tags: bool = False) -> AssetMutationResponse

Deprecated - use save_replacing_cm() instead.

Source code in pyatlan/client/asset.py
@validate_arguments
def upsert_replacing_cm(
    self, entity: Union[Asset, List[Asset]], replace_atlan_tags: bool = False
) -> AssetMutationResponse:
    """Deprecated - use save_replacing_cm() instead."""
    warn(
        "This method is deprecated, please use 'save_replacing_cm' instead, which offers identical functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.save_replacing_cm(
        entity=entity, replace_atlan_tags=replace_atlan_tags
    )

AssetIdentity(type_name: str, qualified_name: str, case_insensitive: bool = False)

Bases: AtlanObject

Class to uniquely identify an asset by its type and qualifiedName.

Initializes an AssetIdentity.

:param type_name: type of the asset. :param qualified_name: qualified name of the asset. :param case_insensitive: Whether the qualified name should be case insensitive.

Source code in pyatlan/client/asset.py
def __init__(
    self, type_name: str, qualified_name: str, case_insensitive: bool = False
):
    """
    Initializes an AssetIdentity.

    :param type_name: type of the asset.
    :param qualified_name: qualified name of the asset.
    :param case_insensitive: Whether the qualified name should be case insensitive.
    """
    if case_insensitive:
        qualified_name = qualified_name.lower()
    super().__init__(type_name=type_name, qualified_name=qualified_name)  # type: ignore[call-arg]
Functions
from_string(combined: str) -> 'AssetIdentity' staticmethod

Reverse-engineer an asset identity from a string representation.

:param combined: The combined (serialized) asset identity. :return: The actual asset identity.

Source code in pyatlan/client/asset.py
@staticmethod
def from_string(combined: str) -> "AssetIdentity":
    """
    Reverse-engineer an asset identity from a string representation.

    :param combined: The combined (serialized) asset identity.
    :return: The actual asset identity.
    """
    tokens = combined.split("::")
    if len(tokens) != 2:
        raise ValueError(f"Invalid asset identity: {combined}")
    return AssetIdentity(type_name=tokens[0], qualified_name=tokens[1])

Batch(client: AtlanClient, max_size: int, replace_atlan_tags: bool = False, custom_metadata_handling: CustomMetadataHandling = CustomMetadataHandling.IGNORE, capture_failures: bool = False, update_only: bool = False, track: bool = False, case_insensitive: bool = False, table_view_agnostic: bool = False, creation_handling: AssetCreationHandling = AssetCreationHandling.FULL)

Utility class for managing bulk updates in batches.

Create a new batch of assets to be bulk-saved.

:param client: AtlanClient to use :param max_size: maximum size of each batch that should be processed (per API call) :param replace_atlan_tags: if True, all Atlan tags on an existing asset will be overwritten; if False, all Atlan tags will be ignored :param custom_metadata_handling: how to handle custom metadata (ignore it, replace it (wiping out anything pre-existing), or merge it) :param capture_failures: when True, any failed batches will be captured and retained rather than exceptions being raised (for large amounts of processing this could cause memory issues!) :param update_only: whether to allow assets to be created (False) or only allow existing assets to be updated (True) :param track: whether to track the basic information about every asset that is created or updated (True) or only track counts (False) :param case_insensitive: when running with update_only as True, whether to consider only exact matches (False) or ignore case (True). :param table_view_agnostic: whether tables and views should be treated interchangeably (an asset in the batch marked as a table will attempt to match a view if not found as a table, and vice versa) :param creation_handling: when allowing assets to be created, how to handle those creations (full assets or partial assets).

Source code in pyatlan/client/asset.py
def __init__(
    self,
    client: AtlanClient,
    max_size: int,
    replace_atlan_tags: bool = False,
    custom_metadata_handling: CustomMetadataHandling = CustomMetadataHandling.IGNORE,
    capture_failures: bool = False,
    update_only: bool = False,
    track: bool = False,
    case_insensitive: bool = False,
    table_view_agnostic: bool = False,
    creation_handling: AssetCreationHandling = AssetCreationHandling.FULL,
):
    """
    Create a new batch of assets to be bulk-saved.

    :param client: AtlanClient to use
    :param max_size: maximum size of each batch
        that should be processed (per API call)
    :param replace_atlan_tags: if True, all Atlan tags on an existing
        asset will be overwritten; if False, all Atlan tags will be ignored
    :param custom_metadata_handling: how to handle custom metadata
        (ignore it, replace it (wiping out anything pre-existing), or merge it)
    :param capture_failures: when True, any failed batches will be
        captured and retained rather than exceptions being raised
        (for large amounts of processing this could cause memory issues!)
    :param update_only: whether to allow assets to be created (False)
        or only allow existing assets to be updated (True)
    :param track: whether to track the basic information about
        every asset that is created or updated (True) or only track counts (False)
    :param case_insensitive: when running with `update_only` as True,
        whether to consider only exact matches (False) or ignore case (True).
    :param table_view_agnostic: whether tables and views should be treated interchangeably
        (an asset in the batch marked as a table will attempt to match a
        view if not found as a table, and vice versa)
    :param creation_handling: when allowing assets to be created,
        how to handle those creations (full assets or partial assets).
    """
    self._client: AtlanClient = client
    self._max_size: int = max_size
    self._replace_atlan_tags: bool = replace_atlan_tags
    self._custom_metadata_handling: CustomMetadataHandling = (
        custom_metadata_handling
    )
    self._capture_failures: bool = capture_failures
    self._update_only: bool = update_only
    self._track: bool = track
    self._case_insensitive: bool = case_insensitive
    self._table_view_agnostic: bool = table_view_agnostic
    self._creation_handling: AssetCreationHandling = creation_handling
    self._num_created = 0
    self._num_updated = 0
    self._num_restored = 0
    self._num_skipped = 0
    self._resolved_guids: Dict[str, str] = {}
    self._batch: List[Asset] = []
    self._failures: List[FailedBatch] = []
    self._created: List[Asset] = []
    self._updated: List[Asset] = []
    self._restored: List[Asset] = []
    self._skipped: List[Asset] = []
    self._resolved_qualified_names: Dict[str, str] = {}
Attributes
created: List[Asset] property

Get a list of all the Assets that were created

:returns: a list of all the Assets that were created

failures: List[FailedBatch] property

Get information on any failed batches

:returns: a list of FailedBatch objects that contain information about any batches that may have failed an empty list will be returned if there are no failures.

num_created: int property

Number of assets that were created (count only)

num_restored: int property

Number of assets that were restored (count only)

num_skipped: int property

Number of assets that were skipped (count only)

num_updated: int property

Number of assets that were updated (count only)

restored: List[Asset] property

Get a list of all the Assets that were potentially restored from being archived, or otherwise touched without actually being updated (minimal info only).

:returns: a list of all the Assets that were restored

skipped: List[Asset] property

Get a list of all the Assets that were skipped. when update only is requested and the asset does not exist in Atlan

:returns: a list of all the Assets that were skipped

updated: List[Asset] property

Get a list of all the Assets that were updated

:returns: a list of all the Assets that were updated

Functions
add(single: Asset) -> Optional[AssetMutationResponse]

Add an asset to the batch to be processed.

:param single: the asset to add to a batch :returns: an AssetMutationResponse containing the results of the save or None if the batch is still queued.

Source code in pyatlan/client/asset.py
@validate_arguments
def add(self, single: Asset) -> Optional[AssetMutationResponse]:
    """
    Add an asset to the batch to be processed.

    :param single: the asset to add to a batch
    :returns: an AssetMutationResponse containing the results of the save or None if the batch is still queued.
    """
    self._batch.append(single)
    return self._process()
flush() -> Optional[AssetMutationResponse]

Flush any remaining assets in the batch.

:returns: n AssetMutationResponse containing the results of the saving any assets that were flushed

Source code in pyatlan/client/asset.py
def flush(self) -> Optional[AssetMutationResponse]:
    """Flush any remaining assets in the batch.

    :returns: n AssetMutationResponse containing the results of the saving any assets that were flushed
    """
    from pyatlan.model.fluent_search import FluentSearch

    revised: list = []
    response: Optional[AssetMutationResponse] = None
    if self._batch:
        fuzzy_match: bool = False
        if self._table_view_agnostic:
            types_in_batch = {asset.type_name for asset in self._batch}
            fuzzy_match = any(
                type_name in types_in_batch
                for type_name in self._TABLE_LEVEL_ASSETS
            )
        if (
            self._update_only
            or self._creation_handling != AssetCreationHandling.FULL
            or fuzzy_match
        ):
            found: Dict[str, str] = {}
            qualified_names = [asset.qualified_name or "" for asset in self._batch]
            if self._case_insensitive:
                search = FluentSearch().select(include_archived=True).min_somes(1)
                for qn in qualified_names:
                    search = search.where_some(
                        Asset.QUALIFIED_NAME.eq(
                            value=qn or "", case_insensitive=self._case_insensitive
                        )
                    )
            else:
                search = (
                    FluentSearch()
                    .select(include_archived=True)
                    .where(Asset.QUALIFIED_NAME.within(values=qualified_names))
                )
            results = search.page_size(
                max(self._max_size * 2, DSL.__fields__.get("size").default)  # type: ignore[union-attr]
            ).execute(client=self._client)  # type: ignore[arg-type]

            for asset in results:
                asset_id = AssetIdentity(
                    type_name=asset.type_name,
                    qualified_name=asset.qualified_name or "",
                    case_insensitive=self._case_insensitive,
                )
                found[str(asset_id)] = asset.qualified_name or ""

            for asset in self._batch:
                asset_id = AssetIdentity(
                    type_name=asset.type_name,
                    qualified_name=asset.qualified_name or "",
                    case_insensitive=self._case_insensitive,
                )
                # If found, with a type match, go ahead and update it
                if str(asset_id) in found:
                    # Replace the actual qualifiedName on the asset before adding it to the batch
                    # in case it matched case-insensitively, we need the proper case-sensitive name we
                    # found to ensure it's an update, not a create)
                    self.add_fuzzy_matched(
                        asset=asset,
                        actual_qn=found.get(str(asset_id), ""),
                        revised=revised,
                    )
                elif (
                    self._table_view_agnostic
                    and asset.type_name in self._TABLE_LEVEL_ASSETS
                ):
                    # If found as a different (but acceptable) type, update that instead
                    as_table = AssetIdentity(
                        type_name=Table.__name__,
                        qualified_name=asset.qualified_name or "",
                        case_insensitive=self._case_insensitive,
                    )
                    as_view = AssetIdentity(
                        type_name=View.__name__,
                        qualified_name=asset.qualified_name or "",
                        case_insensitive=self._case_insensitive,
                    )
                    as_materialized_view = AssetIdentity(
                        type_name=MaterialisedView.__name__,
                        qualified_name=asset.qualified_name or "",
                        case_insensitive=self._case_insensitive,
                    )

                    if str(as_table) in found:
                        self.add_fuzzy_matched(
                            asset=asset,
                            actual_qn=found.get(str(as_table), ""),
                            revised=revised,
                            type_name=Table.__name__,
                        )
                    elif str(as_view) in found:
                        self.add_fuzzy_matched(
                            asset=asset,
                            actual_qn=found.get(str(as_view), ""),
                            revised=revised,
                            type_name=View.__name__,
                        )
                    elif str(as_materialized_view) in found:
                        self.add_fuzzy_matched(
                            asset=asset,
                            actual_qn=found.get(str(as_materialized_view), ""),
                            revised=revised,
                            type_name=MaterialisedView.__name__,
                        )
                    elif self._creation_handling == AssetCreationHandling.PARTIAL:
                        # Still create it (partial), if not found
                        # and partial asset creation is allowed
                        self.add_partial_asset(asset, revised)
                    elif self._creation_handling == AssetCreationHandling.FULL:
                        # Still create it (full), if not found
                        # and full asset creation is allowed
                        revised.append(asset)
                    else:
                        # Otherwise, if it still does not match any
                        # fallback and cannot be created, skip it
                        self.__track(self._skipped, asset)
                        self._num_skipped += 1
                elif self._creation_handling == AssetCreationHandling.PARTIAL:
                    # Append `is_partial=True` onto the asset
                    # before adding it to the batch, to ensure only
                    # a partial (and not a full) asset is created
                    self.add_partial_asset(asset, revised)
                else:
                    self.__track(self._skipped, asset)
                    self._num_skipped += 1
        else:
            # Otherwise create it (full)
            revised = self._batch.copy()

        if revised:
            try:
                if self._custom_metadata_handling == CustomMetadataHandling.IGNORE:
                    response = self._client.asset.save(
                        revised, replace_atlan_tags=self._replace_atlan_tags
                    )
                elif (
                    self._custom_metadata_handling
                    == CustomMetadataHandling.OVERWRITE
                ):
                    response = self._client.asset.save_replacing_cm(
                        revised, replace_atlan_tags=self._replace_atlan_tags
                    )
                elif self._custom_metadata_handling == CustomMetadataHandling.MERGE:
                    response = self._client.asset.save_merging_cm(
                        revised, replace_atlan_tags=self._replace_atlan_tags
                    )
                else:
                    raise ErrorCode.INVALID_PARAMETER_TYPE.exception_with_parameters(
                        self._custom_metadata_handling,
                        "CustomMetadataHandling.IGNORE, CustomMetadataHandling.OVERWRITE "
                        "or CustomMetadataHandling.MERGE",
                    )
            except AtlanError as er:
                if self._capture_failures:
                    self._failures.append(
                        FailedBatch(failed_assets=self._batch, failure_reason=er)
                    )
                else:
                    raise er
            self._batch = []
            response and self._track_response(response, revised)
    return response

CategoryHierarchy(top_level: Set[str], stub_dict: Dict[str, AtlasGlossaryCategory])

Source code in pyatlan/client/asset.py
def __init__(
    self, top_level: Set[str], stub_dict: Dict[str, AtlasGlossaryCategory]
):
    self._top_level = top_level
    self._root_categories: list = []
    self._categories: Dict[str, AtlasGlossaryCategory] = {}
    self._build_category_dict(stub_dict)
    self._bfs_list: List[AtlasGlossaryCategory] = []
    self._dfs_list: List[AtlasGlossaryCategory] = []
Attributes
breadth_first: List[AtlasGlossaryCategory] property

Retrieve all the categories in the hierarchy in breadth-first traversal order.

:returns: all categories in breadth-first order

depth_first: List[AtlasGlossaryCategory] property

Retrieve all the categories in the hierarchy in depth-first traversal order.

:returns: all categories in depth-first order

root_categories: List[AtlasGlossaryCategory] property

Retrieve only the root-level categories (those with no parents).

:returns: the root-level categories of the Glossary

Functions
get_category(guid: str) -> AtlasGlossaryCategory

Retrieve a specific category from anywhere in the hierarchy by its unique identifier (GUID).

:param guid: guid of the category to retrieve :returns: the requested category

Source code in pyatlan/client/asset.py
@validate_arguments
def get_category(self, guid: str) -> AtlasGlossaryCategory:
    """
    Retrieve a specific category from anywhere in the hierarchy by its unique identifier (GUID).

    :param guid: guid of the category to retrieve
    :returns: the requested category
    """
    return self._categories[guid]

FailedBatch(failed_assets: List[Asset], failure_reason: Exception)

Internal class to capture batch failures.

Source code in pyatlan/client/asset.py
def __init__(self, failed_assets: List[Asset], failure_reason: Exception):
    self.failed_assets = failed_assets
    self.failure_reason = failure_reason

IndexSearchResults(client: ApiCaller, criteria: IndexSearchRequest, start: int, size: int, count: int, assets: List[Asset], aggregations: Optional[Aggregations], bulk: bool = False)

Bases: SearchResults, Iterable

Captures the response from a search against Atlan. Also provides the ability to iteratively page through results, without needing to track or re-run the original query.

Source code in pyatlan/client/asset.py
def __init__(
    self,
    client: ApiCaller,
    criteria: IndexSearchRequest,
    start: int,
    size: int,
    count: int,
    assets: List[Asset],
    aggregations: Optional[Aggregations],
    bulk: bool = False,
):
    super().__init__(
        client,
        INDEX_SEARCH,
        criteria,
        start,
        size,
        assets,
    )
    self._count = count
    self._approximate_count = count
    self._aggregations = aggregations
    self._bulk = bulk
Functions
next_page(start=None, size=None) -> bool

Indicates whether there is a next page of results.

:returns: True if there is a next page of results, otherwise False

Source code in pyatlan/client/asset.py
def next_page(self, start=None, size=None) -> bool:
    """
    Indicates whether there is a next page of results.

    :returns: True if there is a next page of results, otherwise False
    """
    self._start = start or self._start + self._size
    is_bulk_search = (
        self._bulk or self._approximate_count > self._MASS_EXTRACT_THRESHOLD
    )
    if size:
        self._size = size
    if is_bulk_search:
        # Used in the "timestamp-based" paging approach
        # to check if `asset.guid` has already been processed
        # in a previous page of results.
        # If it has,then exclude it from the current results;
        # otherwise, we may encounter duplicate asset records.
        self._processed_guids.update(
            asset.guid for asset in self._assets if asset is not None
        )
    return self._get_next_page() if self._assets else False
presorted_by_timestamp(sorts: List[SortItem]) -> bool staticmethod

Indicates whether the sort options prioritize creation-time in ascending order as the first sorting key (True) or anything else (False).

:param sorts: list of sorting options :returns: True if the sorting options have creation time and ascending as the first option

Source code in pyatlan/client/asset.py
@staticmethod
def presorted_by_timestamp(sorts: List[SortItem]) -> bool:
    """
    Indicates whether the sort options prioritize
    creation-time in ascending order as the first
    sorting key (`True`) or anything else (`False`).

    :param sorts: list of sorting options
    :returns: `True` if the sorting options have
    creation time and ascending as the first option
    """
    return (
        isinstance(sorts, list)
        and len(sorts) > 0
        and isinstance(sorts[0], SortItem)
        and sorts[0].field == Asset.CREATE_TIME.internal_field_name
        and sorts[0].order == SortOrder.ASCENDING
    )
sort_by_timestamp_first(sorts: List[SortItem]) -> List[SortItem] staticmethod

Rewrites the sorting options to ensure that sorting by creation time, ascending, is the top priority. Adds this condition if it does not already exist, or moves it up to the top sorting priority if it does already exist in the list.

:param sorts: list of sorting options :returns: sorting options, making sorting by creation time in ascending order the top priority

Source code in pyatlan/client/asset.py
@staticmethod
def sort_by_timestamp_first(sorts: List[SortItem]) -> List[SortItem]:
    """
    Rewrites the sorting options to ensure that
    sorting by creation time, ascending, is the top
    priority. Adds this condition if it does not
    already exist, or moves it up to the top sorting
    priority if it does already exist in the list.

    :param sorts: list of sorting options
    :returns: sorting options, making sorting by
    creation time in ascending order the top priority
    """
    creation_asc_sort = [Asset.CREATE_TIME.order(SortOrder.ASCENDING)]

    if not sorts:
        return creation_asc_sort

    rewritten_sorts = [
        sort
        for sort in sorts
        if (not sort.field) or (sort.field != Asset.CREATE_TIME.internal_field_name)
    ]
    return creation_asc_sort + rewritten_sorts

LineageListResults(client: ApiCaller, criteria: LineageListRequest, start: int, size: int, has_more: bool, assets: List[Asset])

Bases: SearchResults, Iterable

Captures the response from a lineage retrieval against Atlan. Also provides the ability to iteratively page through results, without needing to track or re-run the original query.

Source code in pyatlan/client/asset.py
def __init__(
    self,
    client: ApiCaller,
    criteria: LineageListRequest,
    start: int,
    size: int,
    has_more: bool,
    assets: List[Asset],
):
    super().__init__(client, GET_LINEAGE_LIST, criteria, start, size, assets)
    self._has_more = has_more

SearchResults(client: ApiCaller, endpoint: API, criteria: SearchRequest, start: int, size: int, assets: List[Asset])

Bases: ABC, Iterable

Abstract class that encapsulates results returned by various searches.

Source code in pyatlan/client/asset.py
def __init__(
    self,
    client: ApiCaller,
    endpoint: API,
    criteria: SearchRequest,
    start: int,
    size: int,
    assets: List[Asset],
):
    self._client = client
    self._endpoint = endpoint
    self._criteria = criteria
    self._start = start
    self._size = size
    self._assets = assets
    self._processed_guids: Set[str] = set()
    self._first_record_creation_time = -2
    self._last_record_creation_time = -2
Functions
current_page() -> List[Asset]

Retrieve the current page of results.

:returns: list of assets on the current page of results

Source code in pyatlan/client/asset.py
def current_page(self) -> List[Asset]:
    """
    Retrieve the current page of results.

    :returns: list of assets on the current page of results
    """
    return self._assets
next_page(start=None, size=None) -> bool

Indicates whether there is a next page of results.

:returns: True if there is a next page of results, otherwise False

Source code in pyatlan/client/asset.py
def next_page(self, start=None, size=None) -> bool:
    """
    Indicates whether there is a next page of results.

    :returns: True if there is a next page of results, otherwise False
    """
    self._start = start or self._start + self._size
    if size:
        self._size = size
    return self._get_next_page() if self._assets else False

Audit Client

pyatlan.client.audit

Classes

AuditClient(client: ApiCaller)

This class can be used to configure and run a search against Atlan's activity log. This class does not need to be instantiated directly but can be obtained through the audit property of AtlanClient.

Source code in pyatlan/client/audit.py
def __init__(self, client: ApiCaller):
    if not isinstance(client, ApiCaller):
        raise ErrorCode.INVALID_PARAMETER_TYPE.exception_with_parameters(
            "client", "ApiCaller"
        )
    self._client = client
Functions
search(criteria: AuditSearchRequest, bulk=False) -> AuditSearchResults

Search for assets using the provided criteria. Note: if the number of results exceeds the predefined threshold (10,000 assets) this will be automatically converted into an audit bulk search.

:param criteria: detailing the search query, parameters, and so on to run :param bulk: whether to run the search to retrieve assets that match the supplied criteria, for large numbers of results (> 10,000), defaults to False. Note: this will reorder the results (based on creation timestamp) in order to iterate through a large number (more than 10,000) results. :raises InvalidRequestError:

- if audit bulk search is enabled (`bulk=True`) and any
  user-specified sorting options are found in the search request.
- if audit bulk search is disabled (`bulk=False`) and the number of results
  exceeds the predefined threshold (i.e: `10,000` assets)
  and any user-specified sorting options are found in the search request.

:raises AtlanError: on any API communication issue :returns: the results of the search

Source code in pyatlan/client/audit.py
@validate_arguments
def search(self, criteria: AuditSearchRequest, bulk=False) -> AuditSearchResults:
    """
    Search for assets using the provided criteria.
    `Note:` if the number of results exceeds the predefined threshold
    (10,000 assets) this will be automatically converted into an audit `bulk` search.

    :param criteria: detailing the search query, parameters, and so on to run
    :param bulk: whether to run the search to retrieve assets that match the supplied criteria,
    for large numbers of results (> `10,000`), defaults to `False`. Note: this will reorder the results
    (based on creation timestamp) in order to iterate through a large number (more than `10,000`) results.
    :raises InvalidRequestError:

        - if audit bulk search is enabled (`bulk=True`) and any
          user-specified sorting options are found in the search request.
        - if audit bulk search is disabled (`bulk=False`) and the number of results
          exceeds the predefined threshold (i.e: `10,000` assets)
          and any user-specified sorting options are found in the search request.

    :raises AtlanError: on any API communication issue
    :returns: the results of the search
    """
    # Prepare request using shared logic
    endpoint, request_obj = AuditSearch.prepare_request(criteria, bulk)

    # Execute API call
    raw_json = self._client._call_api(endpoint, request_obj=request_obj)

    # Process response using shared logic
    response = AuditSearch.process_response(raw_json)

    # Check if we need to convert to bulk search using shared logic
    if AuditSearch.check_for_bulk_search(
        response["count"], criteria, bulk, AuditSearchResults
    ):
        # Recursive call with updated criteria
        return self.search(criteria)

    # Create and return search results
    return AuditSearchResults(
        client=self._client,
        criteria=criteria,
        start=criteria.dsl.from_,
        size=criteria.dsl.size,
        count=response["count"],
        entity_audits=response["entity_audits"],
        bulk=bulk,
        aggregations=response["aggregations"],
    )

File Client

pyatlan.client.file

Classes

FileClient(client: ApiCaller)

A client for operating on Atlan's tenant object storage.

Source code in pyatlan/client/file.py
def __init__(self, client: ApiCaller):
    if not isinstance(client, ApiCaller):
        raise ErrorCode.INVALID_PARAMETER_TYPE.exception_with_parameters(
            "client", "ApiCaller"
        )
    self._client = client
Functions
download_file(presigned_url: str, file_path: str) -> str

Downloads a file from Atlan's tenant object storage.

:param presigned_url: any valid presigned URL. :param file_path: path to the file where you want to download the file. :raises InvalidRequestException: if unable to download the file. :raises AtlanError: on any error during API invocation. :returns: full path to the downloaded file.

Source code in pyatlan/client/file.py
@validate_arguments
def download_file(
    self,
    presigned_url: str,
    file_path: str,
) -> str:
    """
    Downloads a file from Atlan's tenant object storage.

    :param presigned_url: any valid presigned URL.
    :param file_path: path to the file where you want to download the file.
    :raises InvalidRequestException: if unable to download the file.
    :raises AtlanError: on any error during API invocation.
    :returns: full path to the downloaded file.
    """
    # Prepare request using shared logic
    endpoint = FileDownload.prepare_request(presigned_url)

    # Make API call and return result
    return self._client._presigned_url_file_download(
        file_path=file_path, api=endpoint
    )
generate_presigned_url(request: PresignedURLRequest) -> str

Generates a presigned URL based on Atlan's tenant object store.

:param request: instance containing object key, expiry, and method (PUT: upload, GET: download). :raises AtlanError: on any error during API invocation. :returns: a response object containing a presigned URL with its cloud provider.

Source code in pyatlan/client/file.py
@validate_arguments
def generate_presigned_url(self, request: PresignedURLRequest) -> str:
    """
    Generates a presigned URL based on Atlan's tenant object store.

    :param request: instance containing object key,
    expiry, and method (PUT: upload, GET: download).
    :raises AtlanError: on any error during API invocation.
    :returns: a response object containing a presigned URL with its cloud provider.
    """
    # Prepare request using shared logic
    endpoint, request_obj = FilePresignedUrl.prepare_request(request)

    # Make API call
    raw_json = self._client._call_api(endpoint, request_obj=request_obj)

    # Process response using shared logic
    return FilePresignedUrl.process_response(raw_json)
upload_file(presigned_url: str, file_path: str) -> None

Uploads a file to Atlan's object storage.

:param presigned_url: any valid presigned URL. :param file_path: path to the file to be uploaded. :raises AtlanError: on any error during API invocation. :raises InvalidRequestException: if the upload file path is invalid, or when the presigned URL cloud provider is unsupported.

Source code in pyatlan/client/file.py
@validate_arguments
def upload_file(self, presigned_url: str, file_path: str) -> None:
    """
    Uploads a file to Atlan's object storage.

    :param presigned_url: any valid presigned URL.
    :param file_path: path to the file to be uploaded.
    :raises AtlanError: on any error during API invocation.
    :raises InvalidRequestException: if the upload file path is invalid,
    or when the presigned URL cloud provider is unsupported.
    """
    # Validate and open file using shared logic
    upload_file = FileUpload.validate_file_path(file_path)

    # Identify cloud provider using shared logic
    provider = FileUpload.identify_cloud_provider(presigned_url)

    # Prepare request based on provider using shared logic
    if provider == "s3":
        endpoint = FileUpload.prepare_s3_request(presigned_url)
        return self._client._s3_presigned_url_file_upload(
            upload_file=upload_file, api=endpoint
        )
    elif provider == "azure_blob":
        endpoint = FileUpload.prepare_azure_request(presigned_url)
        return self._client._azure_blob_presigned_url_file_upload(
            upload_file=upload_file, api=endpoint
        )
    elif provider == "gcs":
        endpoint = FileUpload.prepare_gcs_request(presigned_url)
        return self._client._gcs_presigned_url_file_upload(
            upload_file=upload_file, api=endpoint
        )

Group Client

pyatlan.client.group

Classes

GroupClient(client: ApiCaller)

This class can be used to retrieve information about groups. This class does not need to be instantiated directly but can be obtained through the group property of AtlanClient.

Source code in pyatlan/client/group.py
def __init__(self, client: ApiCaller):
    if not isinstance(client, ApiCaller):
        raise ErrorCode.INVALID_PARAMETER_TYPE.exception_with_parameters(
            "client", "ApiCaller"
        )
    self._client = client
Functions
create(group: AtlanGroup, user_ids: Optional[List[str]] = None) -> CreateGroupResponse

Create a new group.

:param group: details of the new group :param user_ids: list of unique identifiers (GUIDs) of users to associate with the group :returns: details of the created group and user association :raises AtlanError: on any API communication issue

Source code in pyatlan/client/group.py
@validate_arguments
def create(
    self,
    group: AtlanGroup,
    user_ids: Optional[List[str]] = None,
) -> CreateGroupResponse:
    """
    Create a new group.

    :param group: details of the new group
    :param user_ids: list of unique identifiers (GUIDs) of users to associate with the group
    :returns: details of the created group and user association
    :raises AtlanError: on any API communication issue
    """
    # Prepare request using shared logic
    endpoint, request_obj = GroupCreate.prepare_request(group, user_ids)

    # Make API call
    raw_json = self._client._call_api(
        endpoint, request_obj=request_obj, exclude_unset=True
    )

    # Process response using shared logic
    return GroupCreate.process_response(raw_json)
get(limit: Optional[int] = 20, post_filter: Optional[str] = None, sort: Optional[str] = None, count: bool = True, offset: int = 0, columns: Optional[List[str]] = None) -> GroupResponse

Retrieves a GroupResponse object which contains a list of the groups defined in Atlan.

:param limit: maximum number of results to be returned :param post_filter: which groups to retrieve :param sort: property by which to sort the results :param count: whether to return the total number of records (True) or not (False) :param offset: starting point for results to return, for paging :param columns: provides columns projection support for groups endpoint :returns: a GroupResponse object which contains a list of groups that match the provided criteria :raises AtlanError: on any API communication issue

Source code in pyatlan/client/group.py
@validate_arguments
def get(
    self,
    limit: Optional[int] = 20,
    post_filter: Optional[str] = None,
    sort: Optional[str] = None,
    count: bool = True,
    offset: int = 0,
    columns: Optional[List[str]] = None,
) -> GroupResponse:
    """
    Retrieves a GroupResponse object which contains a list of the groups defined in Atlan.

    :param limit: maximum number of results to be returned
    :param post_filter: which groups to retrieve
    :param sort: property by which to sort the results
    :param count: whether to return the total number of records (True) or not (False)
    :param offset: starting point for results to return, for paging
    :param columns: provides columns projection support for groups endpoint
    :returns: a GroupResponse object which contains a list of groups that match the provided criteria
    :raises AtlanError: on any API communication issue
    """
    # Prepare request using shared logic
    endpoint, request = GroupGet.prepare_request(
        limit, post_filter, sort, count, offset, columns
    )

    # Make API call
    raw_json = self._client._call_api(
        api=endpoint, query_params=request.query_params
    )

    # Process response using shared logic
    response_data = GroupGet.process_response(
        raw_json, self._client, endpoint, request
    )
    return GroupResponse(**response_data)
get_all(limit: int = 20, offset: int = 0, sort: Optional[str] = 'name', columns: Optional[List[str]] = None) -> GroupResponse

Retrieve a GroupResponse object containing a list of all groups defined in Atlan.

:param limit: maximum number of results to be returned :param offset: starting point for the list of groups when paging :param sort: property by which to sort the results, by default : name :param columns: provides columns projection support for groups endpoint :returns: a GroupResponse object with all groups based on the parameters; results are iterable.

Source code in pyatlan/client/group.py
@validate_arguments
def get_all(
    self,
    limit: int = 20,
    offset: int = 0,
    sort: Optional[str] = "name",
    columns: Optional[List[str]] = None,
) -> GroupResponse:
    """
    Retrieve a GroupResponse object containing a list of all groups defined in Atlan.

    :param limit: maximum number of results to be returned
    :param offset: starting point for the list of groups when paging
    :param sort: property by which to sort the results, by default : name
    :param columns: provides columns projection support for groups endpoint
    :returns: a GroupResponse object with all groups based on the parameters; results are iterable.
    """
    response: GroupResponse = self.get(
        offset=offset, limit=limit, sort=sort, columns=columns
    )
    return response
get_by_name(alias: str, limit: int = 20, offset: int = 0) -> Optional[GroupResponse]

Retrieves a GroupResponse object containing a list of groups that match the specified string. (This could include a complete group name, in which case there should be at most a single item in the returned list, or could be a partial group name to retrieve all groups with that naming convention.)

:param alias: name (as it appears in the UI) on which to filter the groups :param limit: maximum number of groups to retrieve :param offset: starting point for the list of groups when paging :returns: a GroupResponse object containing a list of groups whose UI names include the given string; the results are iterable.

Source code in pyatlan/client/group.py
@validate_arguments
def get_by_name(
    self,
    alias: str,
    limit: int = 20,
    offset: int = 0,
) -> Optional[GroupResponse]:
    """
    Retrieves a GroupResponse object containing a list of groups that match the specified string.
    (This could include a complete group name, in which case there should be at most
    a single item in the returned list, or could be a partial group name to retrieve
    all groups with that naming convention.)

    :param alias: name (as it appears in the UI) on which to filter the groups
    :param limit: maximum number of groups to retrieve
    :param offset: starting point for the list of groups when paging
    :returns: a GroupResponse object containing a list of groups whose UI names include the given string; the results are iterable.
    """
    response: GroupResponse = self.get(
        offset=offset,
        limit=limit,
        post_filter='{"$and":[{"alias":{"$ilike":"%' + alias + '%"}}]}',
    )
    return response
get_members(guid: str, request: Optional[UserRequest] = None) -> UserResponse

Retrieves a UserResponse object which contains a list of the members (users) of a group.

:param guid: unique identifier (GUID) of the group from which to retrieve members :param request: request containing details about which members to retrieve :returns: a UserResponse object which contains a list of users that are members of the group :raises AtlanError: on any API communication issue

Source code in pyatlan/client/group.py
@validate_arguments
def get_members(
    self, guid: str, request: Optional[UserRequest] = None
) -> UserResponse:
    """
    Retrieves a UserResponse object which contains a list of the members (users) of a group.

    :param guid: unique identifier (GUID) of the group from which to retrieve members
    :param request: request containing details about which members to retrieve
    :returns: a UserResponse object which contains a list of users that are members of the group
    :raises AtlanError: on any API communication issue
    """
    # Prepare request using shared logic
    endpoint, user_request = GroupGetMembers.prepare_request(guid, request)

    # Make API call
    raw_json = self._client._call_api(
        api=endpoint,
        query_params=user_request.query_params,
    )

    # Process response using shared logic
    response_data = GroupGetMembers.process_response(
        raw_json, self._client, endpoint, user_request
    )
    return UserResponse(**response_data)
purge(guid: str) -> None

Delete a group.

:param guid: unique identifier (GUID) of the group to delete :raises AtlanError: on any API communication issue

Source code in pyatlan/client/group.py
@validate_arguments
def purge(
    self,
    guid: str,
) -> None:
    """
    Delete a group.

    :param guid: unique identifier (GUID) of the group to delete
    :raises AtlanError: on any API communication issue
    """
    # Prepare request using shared logic
    endpoint = GroupPurge.prepare_request(guid)

    # Make API call
    self._client._call_api(endpoint)
remove_users(guid: str, user_ids: Optional[List[str]] = None) -> None

Remove one or more users from a group.

:param guid: unique identifier (GUID) of the group from which to remove users :param user_ids: unique identifiers (GUIDs) of the users to remove from the group :raises AtlanError: on any API communication issue

Source code in pyatlan/client/group.py
@validate_arguments
def remove_users(self, guid: str, user_ids: Optional[List[str]] = None) -> None:
    """
    Remove one or more users from a group.

    :param guid: unique identifier (GUID) of the group from which to remove users
    :param user_ids: unique identifiers (GUIDs) of the users to remove from the group
    :raises AtlanError: on any API communication issue
    """
    # Prepare request using shared logic
    endpoint, request_obj = GroupRemoveUsers.prepare_request(guid, user_ids)

    # Make API call
    self._client._call_api(
        endpoint,
        request_obj=request_obj,
        exclude_unset=True,
    )
update(group: AtlanGroup) -> None

Update a group. Note that the provided 'group' must have its id populated.

:param group: details to update on the group :raises AtlanError: on any API communication issue

Source code in pyatlan/client/group.py
@validate_arguments
def update(
    self,
    group: AtlanGroup,
) -> None:
    """
    Update a group. Note that the provided 'group' must have its id populated.

    :param group: details to update on the group
    :raises AtlanError: on any API communication issue
    """
    # Prepare request using shared logic
    endpoint = GroupUpdate.prepare_request(group)

    # Make API call
    self._client._call_api(
        endpoint,
        request_obj=group,
        exclude_unset=True,
    )

Role Client

pyatlan.client.role

Classes

RoleClient(client: ApiCaller)

This class can be used to retrieve information about roles. This class does not need to be instantiated directly but can be obtained through the role property of AtlanClient.

Source code in pyatlan/client/role.py
def __init__(self, client: ApiCaller):
    if not isinstance(client, ApiCaller):
        raise ErrorCode.INVALID_PARAMETER_TYPE.exception_with_parameters(
            "client", "ApiCaller"
        )
    self._client = client
Functions
get(limit: int, post_filter: Optional[str] = None, sort: Optional[str] = None, count: bool = True, offset: int = 0) -> RoleResponse

Retrieves a RoleResponse which contains a list of the roles defined in Atlan.

:param limit: maximum number of results to be returned :param post_filter: which roles to retrieve :param sort: property by which to sort the results :param count: whether to return the total number of records (True) or not (False) :param offset: starting point for results to return, for paging :returns: None or a RoleResponse object which contains list of roles that match the provided criteria :raises AtlanError: on any API communication issue

Source code in pyatlan/client/role.py
@validate_arguments
def get(
    self,
    limit: int,
    post_filter: Optional[str] = None,
    sort: Optional[str] = None,
    count: bool = True,
    offset: int = 0,
) -> RoleResponse:
    """
    Retrieves a RoleResponse which contains a list of the roles defined in Atlan.

    :param limit: maximum number of results to be returned
    :param post_filter: which roles to retrieve
    :param sort: property by which to sort the results
    :param count: whether to return the total number of records (True) or not (False)
    :param offset: starting point for results to return, for paging
    :returns: None or a RoleResponse object which contains list of roles that match the provided criteria
    :raises AtlanError: on any API communication issue
    """
    # Prepare request using shared logic
    endpoint, query_params = RoleGet.prepare_request(
        limit=limit,
        post_filter=post_filter,
        sort=sort,
        count=count,
        offset=offset,
    )

    # Execute API call
    raw_json = self._client._call_api(endpoint, query_params)

    # Process response using shared logic
    return RoleGet.process_response(raw_json)
get_all() -> RoleResponse

Retrieves a RoleResponse which contains a list of all the roles defined in Atlan.

:returns: a RoleResponse which contains a list of all the roles defined in Atlan :raises AtlanError: on any API communication issue

Source code in pyatlan/client/role.py
def get_all(self) -> RoleResponse:
    """
    Retrieves a RoleResponse which contains a list of all the roles defined in Atlan.

    :returns:  a RoleResponse which contains a list of all the roles defined in Atlan
    :raises AtlanError: on any API communication issue
    """
    # Prepare request using shared logic
    endpoint = RoleGetAll.prepare_request()

    # Execute API call
    raw_json = self._client._call_api(endpoint)

    # Process response using shared logic
    return RoleGetAll.process_response(raw_json)

Token Client

pyatlan.client.token

Classes

TokenClient(client: ApiCaller)

This class can be used to retrieve information pertaining to API tokens. This class does not need to be instantiated directly but can be obtained through the token property of AtlanClient.

Source code in pyatlan/client/token.py
def __init__(self, client: ApiCaller):
    if not isinstance(client, ApiCaller):
        raise ErrorCode.INVALID_PARAMETER_TYPE.exception_with_parameters(
            "client", "ApiCaller"
        )
    self._client = client
Functions
create(display_name: str, description: str = '', personas: Optional[Set[str]] = None, validity_seconds: int = -1) -> ApiToken

Create a new API token with the provided settings.

:param display_name: human-readable name for the API token :param description: optional explanation of the API token :param personas: qualified_names of personas that should be linked to the token :param validity_seconds: time in seconds after which the token should expire (negative numbers are treated as infinite) :returns: the created API token :raises AtlanError: on any API communication issue

Source code in pyatlan/client/token.py
@validate_arguments
def create(
    self,
    display_name: str,
    description: str = "",
    personas: Optional[Set[str]] = None,
    validity_seconds: int = -1,
) -> ApiToken:
    """
    Create a new API token with the provided settings.

    :param display_name: human-readable name for the API token
    :param description: optional explanation of the API token
    :param personas: qualified_names of personas that should  be linked to the token
    :param validity_seconds: time in seconds after which the token should expire (negative numbers are treated as
                             infinite)
    :returns: the created API token
    :raises AtlanError: on any API communication issue
    """
    endpoint, request_obj = TokenCreate.prepare_request(
        display_name, description, personas, validity_seconds
    )
    raw_json = self._client._call_api(endpoint, request_obj=request_obj)
    return TokenCreate.process_response(raw_json)
get(limit: Optional[int] = None, post_filter: Optional[str] = None, sort: Optional[str] = None, count: bool = True, offset: int = 0) -> ApiTokenResponse

Retrieves an ApiTokenResponse which contains a list of API tokens defined in Atlan.

:param limit: maximum number of results to be returned :param post_filter: which API tokens to retrieve :param sort: property by which to sort the results :param count: whether to return the total number of records (True) or not (False) :param offset: starting point for results to return, for paging :returns: an ApiTokenResponse which contains a list of API tokens that match the provided criteria :raises AtlanError: on any API communication issue

Source code in pyatlan/client/token.py
@validate_arguments
def get(
    self,
    limit: Optional[int] = None,
    post_filter: Optional[str] = None,
    sort: Optional[str] = None,
    count: bool = True,
    offset: int = 0,
) -> ApiTokenResponse:
    """
    Retrieves an ApiTokenResponse which contains a list of API tokens defined in Atlan.

    :param limit: maximum number of results to be returned
    :param post_filter: which API tokens to retrieve
    :param sort: property by which to sort the results
    :param count: whether to return the total number of records (True) or not (False)
    :param offset: starting point for results to return, for paging
    :returns: an ApiTokenResponse which contains a list of API tokens that match the provided criteria
    :raises AtlanError: on any API communication issue
    """
    endpoint, query_params = TokenGet.prepare_request(
        limit, post_filter, sort, count, offset
    )
    raw_json = self._client._call_api(endpoint, query_params)
    return TokenGet.process_response(raw_json)
get_by_guid(guid: str) -> Optional[ApiToken]

Retrieves the API token with a unique ID (GUID) that exactly matches the provided string.

:param guid: unique identifier by which to retrieve the API token :returns: the API token whose clientId matches the provided string, or None if there is none

Source code in pyatlan/client/token.py
@validate_arguments
def get_by_guid(self, guid: str) -> Optional[ApiToken]:
    """
    Retrieves the API token with a unique ID (GUID) that exactly matches the provided string.

    :param guid: unique identifier by which to retrieve the API token
    :returns: the API token whose clientId matches the provided string, or None if there is none
    """
    endpoint, query_params = TokenGetByGuid.prepare_request(guid)
    raw_json = self._client._call_api(endpoint, query_params)
    return TokenGetByGuid.process_response(raw_json)
get_by_id(client_id: str) -> Optional[ApiToken]

Retrieves the API token with a client ID that exactly matches the provided string.

:param client_id: unique client identifier by which to retrieve the API token :returns: the API token whose clientId matches the provided string, or None if there is none

Source code in pyatlan/client/token.py
@validate_arguments
def get_by_id(self, client_id: str) -> Optional[ApiToken]:
    """
    Retrieves the API token with a client ID that exactly matches the provided string.

    :param client_id: unique client identifier by which to retrieve the API token
    :returns: the API token whose clientId matches the provided string, or None if there is none
    """
    endpoint, query_params = TokenGetById.prepare_request(client_id)
    raw_json = self._client._call_api(endpoint, query_params)
    return TokenGetById.process_response(raw_json)
get_by_name(display_name: str) -> Optional[ApiToken]

Retrieves the API token with a name that exactly matches the provided string.

:param display_name: name (as it appears in the UI) by which to retrieve the API token :returns: the API token whose name (in the UI) matches the provided string, or None if there is none

Source code in pyatlan/client/token.py
@validate_arguments
def get_by_name(self, display_name: str) -> Optional[ApiToken]:
    """
    Retrieves the API token with a name that exactly matches the provided string.

    :param display_name: name (as it appears in the UI) by which to retrieve the API token
    :returns: the API token whose name (in the UI) matches the provided string, or None if there is none
    """
    endpoint, query_params = TokenGetByName.prepare_request(display_name)
    raw_json = self._client._call_api(endpoint, query_params)
    return TokenGetByName.process_response(raw_json)
purge(guid: str) -> None

Delete (purge) the specified API token.

:param guid: unique identifier (GUID) of the API token to delete :raises AtlanError: on any API communication issue

Source code in pyatlan/client/token.py
@validate_arguments
def purge(self, guid: str) -> None:
    """
    Delete (purge) the specified API token.

    :param guid: unique identifier (GUID) of the API token to delete
    :raises AtlanError: on any API communication issue
    """
    endpoint, _ = TokenPurge.prepare_request(guid)
    self._client._call_api(endpoint)
update(guid: str, display_name: str, description: str = '', personas: Optional[Set[str]] = None) -> ApiToken

Update an existing API token with the provided settings.

:param guid: unique identifier (GUID) of the API token :param display_name: human-readable name for the API token :param description: optional explanation of the API token :param personas: qualified_names of personas that should be linked to the token, note that you MUST provide the complete list on any update (any not included in the list will be removed, so if you do not specify any personas then ALL personas will be unlinked from the API token) :returns: the created API token :raises AtlanError: on any API communication issue

Source code in pyatlan/client/token.py
@validate_arguments
def update(
    self,
    guid: str,
    display_name: str,
    description: str = "",
    personas: Optional[Set[str]] = None,
) -> ApiToken:
    """
    Update an existing API token with the provided settings.

    :param guid: unique identifier (GUID) of the API token
    :param display_name: human-readable name for the API token
    :param description: optional explanation of the API token
    :param personas: qualified_names of personas that should  be linked to the token, note that you MUST
                     provide the complete list on any update (any not included in the list will be removed,
                     so if you do not specify any personas then ALL personas will be unlinked from the API token)
    :returns: the created API token
    :raises AtlanError: on any API communication issue
    """
    endpoint, request_obj = TokenUpdate.prepare_request(
        guid, display_name, description, personas
    )
    raw_json = self._client._call_api(endpoint, request_obj=request_obj)
    return TokenUpdate.process_response(raw_json)

User Client

pyatlan.client.user

Classes

UserClient(client: ApiCaller)

This class can be used to retrieve information pertaining to users. This class does not need to be instantiated directly but can be obtained through the user property of AtlanClient.

Source code in pyatlan/client/user.py
def __init__(self, client: ApiCaller):
    if not isinstance(client, ApiCaller):
        raise ErrorCode.INVALID_PARAMETER_TYPE.exception_with_parameters(
            "client", "ApiCaller"
        )
    self._client = client
Functions
add_as_admin(asset_guid: str, impersonation_token: str) -> Optional[AssetMutationResponse]

Add the API token configured for the default client as an admin to the asset with the provided GUID. This is primarily useful for connections, to allow the API token to manage policies for the connection, and for query collections, to allow the API token to manage the queries in a collection or the collection itself.

:param asset_guid: unique identifier (GUID) of the asset to which we should add this API token as an admin :param impersonation_token: a bearer token for an actual user who is already an admin for the asset, NOT an API token :returns: a AssetMutationResponse which contains the results of the operation :raises NotFoundError: if the asset to which to add the API token as an admin cannot be found

Source code in pyatlan/client/user.py
@validate_arguments
def add_as_admin(
    self, asset_guid: str, impersonation_token: str
) -> Optional[AssetMutationResponse]:
    """
    Add the API token configured for the default client as an admin to the asset with the provided GUID.
    This is primarily useful for connections, to allow the API token to manage policies for the connection, and
    for query collections, to allow the API token to manage the queries in a collection or the collection itself.

    :param asset_guid: unique identifier (GUID) of the asset to which we should add this API token as an admin
    :param impersonation_token: a bearer token for an actual user who is already an admin for the asset,
                                NOT an API token
    :returns: a AssetMutationResponse which contains the results of the operation
    :raises NotFoundError: if the asset to which to add the API token as an admin cannot be found
    """

    return self._add_as(
        asset_guid=asset_guid,
        impersonation_token=impersonation_token,
        keyword_field=Asset.ADMIN_USERS,
    )
add_as_viewer(asset_guid: str, impersonation_token: str) -> Optional[AssetMutationResponse]

Add the API token configured for the default client as a viewer to the asset with the provided GUID. This is primarily useful for query collections, to allow the API token to view or run queries within the collection, but not make any changes to them.

:param asset_guid: unique identifier (GUID) of the asset to which we should add this API token as an admin :param impersonation_token: a bearer token for an actual user who is already an admin for the asset, NOT an API token :returns: a AssetMutationResponse which contains the results of the operation :raises NotFoundError: if the asset to which to add the API token as a viewer cannot be found

Source code in pyatlan/client/user.py
@validate_arguments
def add_as_viewer(
    self, asset_guid: str, impersonation_token: str
) -> Optional[AssetMutationResponse]:
    """
    Add the API token configured for the default client as a viewer to the asset with the provided GUID.
    This is primarily useful for query collections, to allow the API token to view or run queries within the
    collection, but not make any changes to them.

    :param asset_guid: unique identifier (GUID) of the asset to which we should add this API token as an admin
    :param impersonation_token: a bearer token for an actual user who is already an admin for the asset,
                                NOT an API token
    :returns: a AssetMutationResponse which contains the results of the operation
    :raises NotFoundError: if the asset to which to add the API token as a viewer cannot be found
    """

    return self._add_as(
        asset_guid=asset_guid,
        impersonation_token=impersonation_token,
        keyword_field=Asset.VIEWER_USERS,
    )
add_to_groups(guid: str, group_ids: List[str]) -> None

Add a user to one or more groups.

:param guid: unique identifier (GUID) of the user to add into groups :param group_ids: unique identifiers (GUIDs) of the groups to add the user into :raises AtlanError: on any API communication issue

Source code in pyatlan/client/user.py
@validate_arguments
def add_to_groups(
    self,
    guid: str,
    group_ids: List[str],
) -> None:
    """
    Add a user to one or more groups.

    :param guid: unique identifier (GUID) of the user to add into groups
    :param group_ids: unique identifiers (GUIDs) of the groups to add the user into
    :raises AtlanError: on any API communication issue
    """
    endpoint, request_obj = UserAddToGroups.prepare_request(guid, group_ids)
    self._client._call_api(endpoint, request_obj=request_obj, exclude_unset=True)
change_role(guid: str, role_id: str) -> None

Change the role of a user.

:param guid: unique identifier (GUID) of the user whose role should be changed :param role_id: unique identifier (GUID) of the role to move the user into :raises AtlanError: on any API communication issue

Source code in pyatlan/client/user.py
@validate_arguments
def change_role(
    self,
    guid: str,
    role_id: str,
) -> None:
    """
    Change the role of a user.

    :param guid: unique identifier (GUID) of the user whose role should be changed
    :param role_id: unique identifier (GUID) of the role to move the user into
    :raises AtlanError: on any API communication issue
    """
    endpoint, request_obj = UserChangeRole.prepare_request(guid, role_id)
    self._client._call_api(endpoint, request_obj=request_obj, exclude_unset=True)
create(users: List[AtlanUser], return_info: bool = False) -> Optional[UserResponse]

Create one or more new users.

:param users: the details of the new users :param return_info: whether to return the details of created users, defaults to False :raises AtlanError: on any API communication issue :returns: a UserResponse object which contains the list of details of created users if return_info is True, otherwise None

Source code in pyatlan/client/user.py
@validate_arguments
def create(
    self, users: List[AtlanUser], return_info: bool = False
) -> Optional[UserResponse]:
    """
    Create one or more new users.

    :param users: the details of the new users
    :param return_info: whether to return the details of created users, defaults to `False`
    :raises AtlanError: on any API communication issue
    :returns: a UserResponse object which contains the list of details of created users if `return_info` is `True`, otherwise `None`
    """
    endpoint, request_obj = UserCreate.prepare_request(users, self._client)
    self._client._call_api(endpoint, request_obj=request_obj, exclude_unset=True)
    if return_info:
        users_emails = [user.email for user in request_obj.users]
        return self.get_by_emails(emails=users_emails)
    return None
get(limit: Optional[int] = 20, post_filter: Optional[str] = None, sort: Optional[str] = None, count: bool = True, offset: int = 0) -> UserResponse

Retrieves a UserResponse which contains a list of users defined in Atlan.

:param limit: maximum number of results to be returned :param post_filter: which users to retrieve :param sort: property by which to sort the results :param count: whether to return the total number of records (True) or not (False) :param offset: starting point for results to return, for paging :returns: a UserResponse which contains a list of users that match the provided criteria :raises AtlanError: on any API communication issue

Source code in pyatlan/client/user.py
@validate_arguments
def get(
    self,
    limit: Optional[int] = 20,
    post_filter: Optional[str] = None,
    sort: Optional[str] = None,
    count: bool = True,
    offset: int = 0,
) -> UserResponse:
    """
    Retrieves a UserResponse which contains a list of users defined in Atlan.

    :param limit: maximum number of results to be returned
    :param post_filter: which users to retrieve
    :param sort: property by which to sort the results
    :param count: whether to return the total number of records (True) or not (False)
    :param offset: starting point for results to return, for paging
    :returns: a UserResponse which contains a list of users that match the provided criteria
    :raises AtlanError: on any API communication issue
    """
    endpoint, query_params = UserGet.prepare_request(
        limit, post_filter, sort, count, offset
    )
    raw_json = self._client._call_api(api=endpoint, query_params=query_params)

    # Build the request object for response processing

    request = UserRequest(
        post_filter=post_filter,
        limit=limit,
        sort=sort,
        count=count,
        offset=offset,
    )

    response_data = UserGet.process_response(
        raw_json, self._client, endpoint, request, offset, limit
    )
    return UserResponse(**response_data)
get_all(limit: int = 20, offset: int = 0, sort: Optional[str] = 'username') -> UserResponse

Retrieve a UserResponse object containing a list of all users defined in Atlan.

:param limit: maximum number of users to retrieve :param offset: starting point for the list of users when paging :param sort: property by which to sort the results, by default : username :returns: a UserResponse object with all users based on the parameters; results are iterable.

Source code in pyatlan/client/user.py
@validate_arguments
def get_all(
    self,
    limit: int = 20,
    offset: int = 0,
    sort: Optional[str] = "username",
) -> UserResponse:
    """
    Retrieve a UserResponse object containing a list of all users defined in Atlan.

    :param limit: maximum number of users to retrieve
    :param offset: starting point for the list of users when paging
    :param sort: property by which to sort the results, by default : `username`
    :returns: a UserResponse object with all users based on the parameters; results are iterable.
    """
    response: UserResponse = self.get(offset=offset, limit=limit, sort=sort)
    return response
get_by_email(email: str, limit: int = 20, offset: int = 0) -> Optional[UserResponse]

Retrieves a UserResponse object containing a list of users with email addresses that contain the provided email. (This could include a complete email address, in which case there should be at most a single item in the returned list, or could be a partial email address such as "@example.com" to retrieve all users with that domain in their email address.)

:param email: on which to filter the users :param limit: maximum number of users to retrieve :param offset: starting point for the list of users when pagin :returns: a UserResponse object containing a list of users whose email addresses contain the provided string

Source code in pyatlan/client/user.py
@validate_arguments
def get_by_email(
    self,
    email: str,
    limit: int = 20,
    offset: int = 0,
) -> Optional[UserResponse]:
    """
    Retrieves a UserResponse object containing a list of users with email addresses that contain the provided email.
    (This could include a complete email address, in which case there should be at
    most a single item in the returned list, or could be a partial email address
    such as "@example.com" to retrieve all users with that domain in their email
    address.)

    :param email: on which to filter the users
    :param limit: maximum number of users to retrieve
    :param offset: starting point for the list of users when pagin
    :returns: a UserResponse object containing a list of users whose email addresses contain the provided string
    """
    endpoint, query_params = UserGetByEmail.prepare_request(email, limit, offset)
    raw_json = self._client._call_api(api=endpoint, query_params=query_params)

    # Build the request object for response processing

    request = UserRequest(
        post_filter='{"email":{"$ilike":"%' + email + '%"}}',
        limit=limit,
        offset=offset,
    )

    response_data = UserGet.process_response(
        raw_json, self._client, endpoint, request, offset, limit
    )
    return UserResponse(**response_data)
get_by_emails(emails: List[str], limit: int = 20, offset: int = 0) -> Optional[UserResponse]

Retrieves a UserResponse object containing a list of users with email addresses that match the provided list of emails.

:param emails: list of email addresses to filter the users :param limit: maximum number of users to retrieve :param offset: starting point for the list of users when paginating :returns: a UserResponse object containing a list of users whose email addresses match the provided list

Source code in pyatlan/client/user.py
@validate_arguments
def get_by_emails(
    self,
    emails: List[str],
    limit: int = 20,
    offset: int = 0,
) -> Optional[UserResponse]:
    """
    Retrieves a UserResponse object containing a list of users with email addresses that match the provided list of emails.

    :param emails: list of email addresses to filter the users
    :param limit: maximum number of users to retrieve
    :param offset: starting point for the list of users when paginating
    :returns: a UserResponse object containing a list of users whose email addresses match the provided list
    """
    endpoint, query_params = UserGetByEmails.prepare_request(emails, limit, offset)
    raw_json = self._client._call_api(api=endpoint, query_params=query_params)

    # Build the request object for response processing

    email_filter = '{"email":{"$in":' + json.dumps(emails or [""]) + "}}"
    request = UserRequest(
        post_filter=email_filter,
        limit=limit,
        offset=offset,
    )

    response_data = UserGet.process_response(
        raw_json, self._client, endpoint, request, offset, limit
    )
    return UserResponse(**response_data)
get_by_username(username: str) -> Optional[AtlanUser]

Retrieves a user based on the username. (This attempts an exact match on username rather than a contains search.)

:param username: the username by which to find the user :returns: the with that username

Source code in pyatlan/client/user.py
@validate_arguments
def get_by_username(self, username: str) -> Optional[AtlanUser]:
    """
    Retrieves a user based on the username. (This attempts an exact match on username
    rather than a contains search.)

    :param username: the username by which to find the user
    :returns: the with that username
    """
    endpoint, query_params = UserGetByUsername.prepare_request(username)
    raw_json = self._client._call_api(api=endpoint, query_params=query_params)

    # Build the request object for response processing

    request = UserRequest(
        post_filter='{"username":"' + username + '"}',
        limit=5,
        offset=0,
    )

    response_data = UserGet.process_response(
        raw_json, self._client, endpoint, request, 0, 5
    )
    response = UserResponse(**response_data)
    return UserGetByUsername.process_response(response)
get_by_usernames(usernames: List[str], limit: int = 5, offset: int = 0) -> Optional[UserResponse]

Retrieves a UserResponse object containing a list of users based on their usernames.

:param usernames: the list of usernames by which to find the users :param limit: maximum number of users to retrieve :param offset: starting point for the list of users when paginating :returns: a UserResponse object containing list of users with the specified usernames

Source code in pyatlan/client/user.py
@validate_arguments
def get_by_usernames(
    self, usernames: List[str], limit: int = 5, offset: int = 0
) -> Optional[UserResponse]:
    """
    Retrieves a UserResponse object containing a list of users based on their usernames.

    :param usernames: the list of usernames by which to find the users
    :param limit: maximum number of users to retrieve
    :param offset: starting point for the list of users when paginating
    :returns: a UserResponse object containing list of users with the specified usernames
    """
    endpoint, query_params = UserGetByUsernames.prepare_request(
        usernames, limit, offset
    )
    raw_json = self._client._call_api(api=endpoint, query_params=query_params)

    # Build the request object for response processing

    username_filter = '{"username":{"$in":' + json.dumps(usernames or [""]) + "}}"
    request = UserRequest(
        post_filter=username_filter,
        limit=limit,
        offset=offset,
    )

    response_data = UserGet.process_response(
        raw_json, self._client, endpoint, request, offset, limit
    )
    return UserResponse(**response_data)
get_current() -> UserMinimalResponse

Retrieve the current user (representing the API token).

:returns: basic details about the current user (API token) :raises AtlanError: on any API communication issue

Source code in pyatlan/client/user.py
def get_current(
    self,
) -> UserMinimalResponse:
    """
    Retrieve the current user (representing the API token).

    :returns: basic details about the current user (API token)
    :raises AtlanError: on any API communication issue
    """
    endpoint, request_obj = UserGetCurrent.prepare_request()
    raw_json = self._client._call_api(endpoint, request_obj)
    return UserGetCurrent.process_response(raw_json)
get_groups(guid: str, request: Optional[GroupRequest] = None) -> GroupResponse

Retrieve the groups this user belongs to.

:param guid: unique identifier (GUID) of the user :param request: request containing details about which groups to retrieve :returns: a GroupResponse which contains the groups this user belongs to :raises AtlanError: on any API communication issue

Source code in pyatlan/client/user.py
@validate_arguments
def get_groups(
    self, guid: str, request: Optional[GroupRequest] = None
) -> GroupResponse:
    """
    Retrieve the groups this user belongs to.

    :param guid: unique identifier (GUID) of the user
    :param request: request containing details about which groups to retrieve
    :returns: a GroupResponse which contains the groups this user belongs to
    :raises AtlanError: on any API communication issue
    """
    endpoint, query_params = UserGetGroups.prepare_request(guid, request)
    raw_json = self._client._call_api(api=endpoint, query_params=query_params)

    if not request:
        request = GroupRequest()
    response_data = UserGetGroups.process_response(
        raw_json, self._client, endpoint, request
    )
    return GroupResponse(**response_data)
update(guid: str, user: AtlanUser) -> UserMinimalResponse

Update a user. Note: you can only update users that have already signed up to Atlan. Users that are only invited (but have not yet logged in) cannot be updated.

:param guid: unique identifier (GUID) of the user to update :param user: details to update on the user :returns: basic details about the updated user :raises AtlanError: on any API communication issue

Source code in pyatlan/client/user.py
@validate_arguments
def update(
    self,
    guid: str,
    user: AtlanUser,
) -> UserMinimalResponse:
    """
    Update a user.
    Note: you can only update users that have already signed up to Atlan. Users that are
    only invited (but have not yet logged in) cannot be updated.

    :param guid: unique identifier (GUID) of the user to update
    :param user: details to update on the user
    :returns: basic details about the updated user
    :raises AtlanError: on any API communication issue
    """
    endpoint, request_obj = UserUpdate.prepare_request(guid, user)
    raw_json = self._client._call_api(
        endpoint, request_obj=request_obj, exclude_unset=True
    )
    return UserUpdate.process_response(raw_json)

Typedef Client

pyatlan.client.typedef

Classes

TypeDefClient(client: ApiCaller)

This class can be used to retrieve information pertaining to TypeDefs. This class does not need to be instantiated directly but can be obtained through the typedef property of AtlanClient.

Source code in pyatlan/client/typedef.py
def __init__(self, client: ApiCaller):
    if not isinstance(client, ApiCaller):
        raise ErrorCode.INVALID_PARAMETER_TYPE.exception_with_parameters(
            "client", "ApiCaller"
        )
    self._client = client
Functions
create(typedef: TypeDef) -> TypeDefResponse

Create a new type definition in Atlan. Note: only custom metadata, enumerations (options), and Atlan tag type definitions are currently supported. Furthermore, if any of these are created their respective cache will be force-refreshed.

:param typedef: type definition to create :returns: the resulting type definition that was created :raises InvalidRequestError: if the typedef you are trying to create is not one of the allowed types :raises AtlanError: on any API communication issue

Source code in pyatlan/client/typedef.py
@validate_arguments
def create(self, typedef: TypeDef) -> TypeDefResponse:
    """
    Create a new type definition in Atlan.
    Note: only custom metadata, enumerations (options), and Atlan tag type
    definitions are currently supported. Furthermore, if any of these are
    created their respective cache will be force-refreshed.

    :param typedef: type definition to create
    :returns: the resulting type definition that was created
    :raises InvalidRequestError: if the typedef you are
    trying to create is not one of the allowed types
    :raises AtlanError: on any API communication issue
    """
    endpoint, request_obj = TypeDefCreate.prepare_request(typedef)
    raw_json = self._client._call_api(
        endpoint, request_obj=request_obj, exclude_unset=True
    )
    self._refresh_caches(typedef)
    return TypeDefCreate.process_response(raw_json)
get(type_category: Union[AtlanTypeCategory, List[AtlanTypeCategory]]) -> TypeDefResponse

Retrieves a TypeDefResponse object that contain a list of the specified category type definitions in Atlan.

:param type_category: category of type definitions to retrieve :returns: TypeDefResponse object that contain a list that contains the requested list of type definitions :raises AtlanError: on any API communication issue

Source code in pyatlan/client/typedef.py
@validate_arguments
def get(
    self, type_category: Union[AtlanTypeCategory, List[AtlanTypeCategory]]
) -> TypeDefResponse:
    """
    Retrieves a TypeDefResponse object that contain a list of the specified category type definitions in Atlan.

    :param type_category: category of type definitions to retrieve
    :returns: TypeDefResponse object that contain a list that contains the requested list of type definitions
    :raises AtlanError: on any API communication issue
    """
    endpoint, query_params = TypeDefGet.prepare_request_by_category(type_category)
    raw_json = self._client._call_api(endpoint, query_params)
    return TypeDefGet.process_response(raw_json)
get_all() -> TypeDefResponse

Retrieves a TypeDefResponse object that contains a list of all the type definitions in Atlan.

:returns: TypeDefResponse object that contains a list of all the type definitions in Atlan :raises AtlanError: on any API communication issue

Source code in pyatlan/client/typedef.py
def get_all(self) -> TypeDefResponse:
    """
    Retrieves a TypeDefResponse object that contains a list of all the type definitions in Atlan.

    :returns: TypeDefResponse object that contains  a list of all the type definitions in Atlan
    :raises AtlanError: on any API communication issue
    """
    endpoint, query_params = TypeDefGet.prepare_request_all()
    raw_json = self._client._call_api(endpoint, query_params)
    return TypeDefGet.process_response(raw_json)
get_by_name(name: str) -> TypeDef

Retrieves a specific type definition from Atlan.

:name: internal (hashed-string, if used) name of the type definition :returns: details of that specific type definition :raises ApiError: on receiving an unsupported type definition category or when unable to produce a valid response :raises AtlanError: on any API communication issue

Source code in pyatlan/client/typedef.py
@validate_arguments
def get_by_name(self, name: str) -> TypeDef:
    """
    Retrieves a specific type definition from Atlan.

    :name: internal (hashed-string, if used) name of the type definition
    :returns: details of that specific type definition
    :raises ApiError: on receiving an unsupported type definition
    category or when unable to produce a valid response
    :raises AtlanError: on any API communication issue
    """
    endpoint, request_obj = TypeDefGetByName.prepare_request(name)
    raw_json = self._client._call_api(endpoint, request_obj)
    return TypeDefGetByName.process_response(raw_json)
purge(name: str, typedef_type: type) -> None

Delete the type definition. Furthermore, if an Atlan tag, enumeration or custom metadata is deleted their respective cache will be force-refreshed.

:param name: internal hashed-string name of the type definition :param typedef_type: type of the type definition that is being deleted :raises InvalidRequestError: if the typedef you are trying to delete is not one of the allowed types :raises NotFoundError: if the typedef you are trying to delete cannot be found :raises AtlanError: on any API communication issue

Source code in pyatlan/client/typedef.py
@validate_arguments
def purge(self, name: str, typedef_type: type) -> None:
    """
    Delete the type definition.
    Furthermore, if an Atlan tag, enumeration or custom metadata is deleted their
    respective cache will be force-refreshed.

    :param name: internal hashed-string name of the type definition
    :param typedef_type: type of the type definition that is being deleted
    :raises InvalidRequestError: if the typedef you are trying to delete is not one of the allowed types
    :raises NotFoundError: if the typedef you are trying to delete cannot be found
    :raises AtlanError: on any API communication issue
    """
    endpoint, request_obj = TypeDefPurge.prepare_request(
        name, typedef_type, self._client
    )
    self._client._call_api(endpoint, request_obj)
    TypeDefPurge.refresh_caches(typedef_type, self._client)
update(typedef: TypeDef) -> TypeDefResponse

Update an existing type definition in Atlan. Note: only custom metadata, enumerations (options), and Atlan tag type definitions are currently supported. Furthermore, if any of these are updated their respective cache will be force-refreshed.

:param typedef: type definition to update :returns: the resulting type definition that was updated :raises InvalidRequestError: if the typedef you are trying to update is not one of the allowed types :raises AtlanError: on any API communication issue

Source code in pyatlan/client/typedef.py
@validate_arguments
def update(self, typedef: TypeDef) -> TypeDefResponse:
    """
    Update an existing type definition in Atlan.
    Note: only custom metadata, enumerations (options), and Atlan tag type
    definitions are currently supported. Furthermore, if any of these are
    updated their respective cache will be force-refreshed.

    :param typedef: type definition to update
    :returns: the resulting type definition that was updated
    :raises InvalidRequestError: if the typedef you are
    trying to update is not one of the allowed types
    :raises AtlanError: on any API communication issue
    """
    endpoint, request_obj = TypeDefUpdate.prepare_request(typedef)
    raw_json = self._client._call_api(
        endpoint, request_obj=request_obj, exclude_unset=True
    )
    self._refresh_caches(typedef)
    return TypeDefUpdate.process_response(raw_json)

Workflow Client

pyatlan.client.workflow

Classes

WorkflowClient(client: ApiCaller)

This class can be used to retrieve information and rerun workflows. This class does not need to be instantiated directly but can be obtained through the workflow property of AtlanClient.

Source code in pyatlan/client/workflow.py
def __init__(self, client: ApiCaller):
    if not isinstance(client, ApiCaller):
        raise ErrorCode.INVALID_PARAMETER_TYPE.exception_with_parameters(
            "client", "ApiCaller"
        )
    self._client = client
Functions
add_schedule(workflow: Union[WorkflowResponse, WorkflowPackage, WorkflowSearchResult, WorkflowSearchResultDetail], workflow_schedule: WorkflowSchedule) -> WorkflowResponse
add_schedule(
    workflow: WorkflowResponse,
    workflow_schedule: WorkflowSchedule,
) -> WorkflowResponse
add_schedule(
    workflow: WorkflowPackage,
    workflow_schedule: WorkflowSchedule,
) -> WorkflowResponse
add_schedule(
    workflow: WorkflowSearchResult,
    workflow_schedule: WorkflowSchedule,
) -> WorkflowResponse
add_schedule(
    workflow: WorkflowSearchResultDetail,
    workflow_schedule: WorkflowSchedule,
) -> WorkflowResponse

Add a schedule for an existing workflow run.

:param workflow: existing workflow run to schedule. :param workflow_schedule: a WorkflowSchedule object containing: - A cron schedule expression, e.g: 5 4 * * *. - The time zone for the cron schedule, e.g: Europe/Paris.

:returns: a scheduled workflow. :raises AtlanError: on any API communication issue.

Source code in pyatlan/client/workflow.py
def add_schedule(
    self,
    workflow: Union[
        WorkflowResponse,
        WorkflowPackage,
        WorkflowSearchResult,
        WorkflowSearchResultDetail,
    ],
    workflow_schedule: WorkflowSchedule,
) -> WorkflowResponse:
    """
    Add a schedule for an existing workflow run.

    :param workflow: existing workflow run to schedule.
    :param workflow_schedule: a WorkflowSchedule object containing:
        - A cron schedule expression, e.g: `5 4 * * *`.
        - The time zone for the cron schedule, e.g: `Europe/Paris`.

    :returns: a scheduled workflow.
    :raises AtlanError: on any API communication issue.
    """
    validate_type(
        name="workflow",
        _type=(
            WorkflowResponse,
            WorkflowPackage,
            WorkflowSearchResult,
            WorkflowSearchResultDetail,
        ),
        value=workflow,
    )
    workflow_to_update = self._handle_workflow_types(workflow)
    self._add_schedule(workflow_to_update, workflow_schedule)
    # Check if user is API token user to determine endpoint
    use_package_endpoint = not self._client.role_cache.is_api_token_user()  # type: ignore[attr-defined]
    endpoint, request_obj = WorkflowScheduleUtils.prepare_request(
        workflow_to_update, use_package_endpoint
    )
    raw_json = self._client._call_api(endpoint, request_obj=request_obj)
    return WorkflowScheduleUtils.process_response(raw_json)
delete(workflow_name: str) -> None

Archive (delete) the provided workflow.

:param workflow_name: name of the workflow as displayed in the UI (e.g: atlan-snowflake-miner-1714638976). :raises AtlanError: on any API communication issue.

Source code in pyatlan/client/workflow.py
@validate_arguments
def delete(
    self,
    workflow_name: str,
) -> None:
    """
    Archive (delete) the provided workflow.

    :param workflow_name: name of the workflow as displayed
    in the UI (e.g: `atlan-snowflake-miner-1714638976`).
    :raises AtlanError: on any API communication issue.
    """
    # Check if user is API token user to determine endpoint
    use_package_endpoint = not self._client.role_cache.is_api_token_user()  # type: ignore[attr-defined]
    endpoint, request_obj = WorkflowDelete.prepare_request(
        workflow_name, use_package_endpoint
    )
    self._client._call_api(endpoint, request_obj=request_obj)
find_by_id(id: str) -> Optional[WorkflowSearchResult]

Find workflows based on their ID (e.g: atlan-snowflake-miner-1714638976) Note: Only workflows that have been run will be found

:param id: the ID of the workflow to find :returns: the workflow with the provided ID, or None if none is found :raises AtlanError: on any API communication issue

Source code in pyatlan/client/workflow.py
@validate_arguments
def find_by_id(self, id: str) -> Optional[WorkflowSearchResult]:
    """
    Find workflows based on their ID (e.g: `atlan-snowflake-miner-1714638976`)
    Note: Only workflows that have been run will be found

    :param id: the ID of the workflow to find
    :returns: the workflow with the provided ID, or None if none is found
    :raises AtlanError: on any API communication issue
    """
    endpoint, request_obj = WorkflowFindById.prepare_request(id)
    raw_json = self._client._call_api(endpoint, request_obj=request_obj)
    return WorkflowFindById.process_response(raw_json)
find_by_type(prefix: WorkflowPackage, max_results: int = 10) -> List[WorkflowSearchResult]

Find workflows based on their type (prefix). Note: Only workflows that have been run will be found.

:param prefix: name of the specific workflow to find (for example CONNECTION_DELETE) :param max_results: the maximum number of results to retrieve :returns: the list of workflows of the provided type, with the most-recently created first :raises ValidationError: If the provided prefix is invalid workflow package :raises AtlanError: on any API communication issue

Source code in pyatlan/client/workflow.py
@validate_arguments
def find_by_type(
    self, prefix: WorkflowPackage, max_results: int = 10
) -> List[WorkflowSearchResult]:
    """
    Find workflows based on their type (prefix). Note: Only workflows that have been run will be found.

    :param prefix: name of the specific workflow to find (for example CONNECTION_DELETE)
    :param max_results: the maximum number of results to retrieve
    :returns: the list of workflows of the provided type, with the most-recently created first
    :raises ValidationError: If the provided prefix is invalid workflow package
    :raises AtlanError: on any API communication issue
    """
    endpoint, request_obj = WorkflowFindByType.prepare_request(prefix, max_results)
    raw_json = self._client._call_api(endpoint, request_obj=request_obj)
    return WorkflowFindByType.process_response(raw_json)
find_run_by_id(id: str) -> Optional[WorkflowSearchResult]

Find workflows runs based on their ID (e.g: atlan-snowflake-miner-1714638976-t7s8b) Note: Only workflow runs will be found

:param id: the ID of the workflow run to find :returns: the workflow run with the provided ID, or None if none is found :raises AtlanError: on any API communication issue

Source code in pyatlan/client/workflow.py
@validate_arguments
def find_run_by_id(self, id: str) -> Optional[WorkflowSearchResult]:
    """
    Find workflows runs based on their ID (e.g: `atlan-snowflake-miner-1714638976-t7s8b`)
    Note: Only workflow runs will be found

    :param id: the ID of the workflow run to find
    :returns: the workflow run with the provided ID, or None if none is found
    :raises AtlanError: on any API communication issue
    """

    query = Bool(
        filter=[
            Term(
                field="_id",
                value=id,
            ),
        ]
    )
    response = self._find_runs(query, size=1)
    return results[0] if (results := response.hits and response.hits.hits) else None
find_runs_by_status_and_time_range(status: List[AtlanWorkflowPhase], started_at: Optional[str] = None, finished_at: Optional[str] = None, from_: int = 0, size: int = 100) -> WorkflowSearchResponse

Retrieves a WorkflowSearchResponse object containing workflow runs based on their status and time range.

:param status: list of the workflow statuses to filter :param started_at: (optional) lower bound on 'status.startedAt' (e.g 'now-2h') :param finished_at: (optional) lower bound on 'status.finishedAt' (e.g 'now-1h') :param from_:(optional) starting index of the search results (default: 0). :param size: (optional) maximum number of search results to return (default: 100). :returns: a WorkflowSearchResponse object containing a list of workflows matching the filters :raises ValidationError: if inputs are invalid :raises AtlanError: on any API communication issue

Source code in pyatlan/client/workflow.py
@validate_arguments
def find_runs_by_status_and_time_range(
    self,
    status: List[AtlanWorkflowPhase],
    started_at: Optional[str] = None,
    finished_at: Optional[str] = None,
    from_: int = 0,
    size: int = 100,
) -> WorkflowSearchResponse:
    """
    Retrieves a WorkflowSearchResponse object containing workflow runs based on their status and time range.

    :param status: list of the workflow statuses to filter
    :param started_at: (optional) lower bound on 'status.startedAt' (e.g 'now-2h')
    :param finished_at: (optional) lower bound on 'status.finishedAt' (e.g 'now-1h')
    :param from_:(optional) starting index of the search results (default: `0`).
    :param size: (optional) maximum number of search results to return (default: `100`).
    :returns: a WorkflowSearchResponse object containing a list of workflows matching the filters
    :raises ValidationError: if inputs are invalid
    :raises AtlanError: on any API communication issue
    """
    # Use the original implementation since this has a complex custom query

    time_filters = []
    if started_at:
        time_filters.append(Range(field="status.startedAt", gte=started_at))
    if finished_at:
        time_filters.append(Range(field="status.finishedAt", lte=finished_at))

    run_lookup_query = Bool(
        must=[
            NestedQuery(
                query=Terms(
                    field="metadata.labels.workflows.argoproj.io/phase.keyword",
                    values=[s.value for s in status],
                ),
                path="metadata",
            ),
            *time_filters,
            NestedQuery(
                query=Exists(field="metadata.labels.workflows.argoproj.io/creator"),
                path="metadata",
            ),
        ],
    )
    run_lookup_results = self._find_runs(
        query=run_lookup_query, from_=from_, size=size
    )
    return run_lookup_results
find_schedule_query(saved_query_id: str, max_results: int = 10) -> List[WorkflowSearchResult]

Find scheduled query workflows by their saved query identifier.

:param saved_query_id: identifier of the saved query. :param max_results: maximum number of results to retrieve. Defaults to 10. :raises AtlanError: on any API communication issue. :returns: a list of scheduled query workflows.

Source code in pyatlan/client/workflow.py
@validate_arguments
def find_schedule_query(
    self, saved_query_id: str, max_results: int = 10
) -> List[WorkflowSearchResult]:
    """
    Find scheduled query workflows by their saved query identifier.

    :param saved_query_id: identifier of the saved query.
    :param max_results: maximum number of results to retrieve. Defaults to `10`.
    :raises AtlanError: on any API communication issue.
    :returns: a list of scheduled query workflows.
    """
    endpoint, request_obj = WorkflowFindScheduleQuery.prepare_request(
        saved_query_id, max_results
    )
    raw_json = self._client._call_api(endpoint, request_obj=request_obj)
    return WorkflowFindScheduleQuery.process_response(raw_json)
find_schedule_query_between(request: ScheduleQueriesSearchRequest, missed: bool = False) -> Optional[List[WorkflowRunResponse]]

Find scheduled query workflows within the specified duration.

:param request: a ScheduleQueriesSearchRequest object containing start and end dates in ISO 8601 format (e.g: 2024-03-25T16:30:00.000+05:30). :param missed: if True, perform a search for missed scheduled query workflows. Defaults to False. :raises AtlanError: on any API communication issue. :returns: a list of scheduled query workflows found within the specified duration.

Source code in pyatlan/client/workflow.py
@validate_arguments
def find_schedule_query_between(
    self, request: ScheduleQueriesSearchRequest, missed: bool = False
) -> Optional[List[WorkflowRunResponse]]:
    """
    Find scheduled query workflows within the specified duration.

    :param request: a `ScheduleQueriesSearchRequest` object containing
    start and end dates in ISO 8601 format (e.g: `2024-03-25T16:30:00.000+05:30`).
    :param missed: if `True`, perform a search for missed
    scheduled query workflows. Defaults to `False`.
    :raises AtlanError: on any API communication issue.
    :returns: a list of scheduled query workflows found within the specified duration.
    """
    endpoint, request_obj = WorkflowFindScheduleQueryBetween.prepare_request(
        request, missed
    )
    raw_json = self._client._call_api(endpoint, query_params=request_obj)
    return WorkflowFindScheduleQueryBetween.process_response(raw_json)
get_all_scheduled_runs() -> List[WorkflowScheduleResponse]

Get the details of scheduled run for all workflow.

:returns: list of all the workflow schedules :raises AtlanError: on any API communication issue

Source code in pyatlan/client/workflow.py
def get_all_scheduled_runs(self) -> List[WorkflowScheduleResponse]:
    """
    Get the details of scheduled run for all workflow.

    :returns: list of all the workflow schedules
    :raises AtlanError: on any API communication issue
    """
    endpoint, request_obj = WorkflowGetAllScheduledRuns.prepare_request()
    raw_json = self._client._call_api(endpoint, request_obj=request_obj)
    return WorkflowGetAllScheduledRuns.process_response(raw_json)
get_runs(workflow_name: str, workflow_phase: AtlanWorkflowPhase, from_: int = 0, size: int = 100) -> Optional[WorkflowSearchResponse]

Retrieves all workflow runs.

:param workflow_name: name of the workflow as displayed in the UI (e.g: atlan-snowflake-miner-1714638976). :param workflow_phase: phase of the given workflow (e.g: Succeeded, Running, Failed, etc). :param from_: starting index of the search results (default: 0). :param size: maximum number of search results to return (default: 100). :returns: a list of runs of the given workflow. :raises AtlanError: on any API communication issue.

Source code in pyatlan/client/workflow.py
def get_runs(
    self,
    workflow_name: str,
    workflow_phase: AtlanWorkflowPhase,
    from_: int = 0,
    size: int = 100,
) -> Optional[WorkflowSearchResponse]:
    """
    Retrieves all workflow runs.

    :param workflow_name: name of the workflow as displayed
    in the UI (e.g: `atlan-snowflake-miner-1714638976`).
    :param workflow_phase: phase of the given workflow (e.g: Succeeded, Running, Failed, etc).
    :param from_: starting index of the search results (default: `0`).
    :param size: maximum number of search results to return (default: `100`).
    :returns: a list of runs of the given workflow.
    :raises AtlanError: on any API communication issue.
    """
    # Note: this method uses a custom query, so we'll keep the existing implementation

    query = Bool(
        must=[
            NestedQuery(
                query=Term(
                    field="spec.workflowTemplateRef.name.keyword",
                    value=workflow_name,
                ),
                path="spec",
            )
        ],
        filter=[Term(field="status.phase.keyword", value=workflow_phase.value)],
    )
    response = self._find_runs(query, from_=from_, size=size)
    return response
get_scheduled_run(workflow_name: str) -> WorkflowScheduleResponse

Get the details of scheduled run for a specific workflow.

:param workflow_name: name of the workflow for which we want the scheduled run details :returns: details of the workflow schedule :raises AtlanError: on any API communication issue

Source code in pyatlan/client/workflow.py
@validate_arguments
def get_scheduled_run(self, workflow_name: str) -> WorkflowScheduleResponse:
    """
    Get the details of scheduled run for a specific workflow.

    :param workflow_name: name of the workflow for which we want the scheduled run details
    :returns: details of the workflow schedule
    :raises AtlanError: on any API communication issue
    """
    endpoint, request_obj = WorkflowGetScheduledRun.prepare_request(workflow_name)
    raw_json = self._client._call_api(endpoint, request_obj=request_obj)
    return WorkflowGetScheduledRun.process_response(raw_json)
monitor(workflow_response: Optional[WorkflowResponse] = None, logger: Optional[Logger] = None, workflow_name: Optional[str] = None) -> Optional[AtlanWorkflowPhase]

Monitor a workflow until its completion (or the script terminates).

:param workflow_response: The workflow_response returned from running the workflow :param logger: the logger to log status information (logging.INFO for summary info. logging.DEBUG for detail info) :param workflow_name: name of the workflow to be monitored :returns: the status at completion or None if the workflow wasn't run :raises ValidationError: If the provided workflow_response, logger is invalid :raises AtlanError: on any API communication issue

Source code in pyatlan/client/workflow.py
@validate_arguments(config=dict(arbitrary_types_allowed=True))
def monitor(
    self,
    workflow_response: Optional[WorkflowResponse] = None,
    logger: Optional[Logger] = None,
    workflow_name: Optional[str] = None,
) -> Optional[AtlanWorkflowPhase]:
    """
    Monitor a workflow until its completion (or the script terminates).

    :param workflow_response: The workflow_response returned from running the workflow
    :param logger: the logger to log status information
    (logging.INFO for summary info. logging.DEBUG for detail info)
    :param workflow_name: name of the workflow to be monitored
    :returns: the status at completion or None if the workflow wasn't run
    :raises ValidationError: If the provided `workflow_response`, `logger` is invalid
    :raises AtlanError: on any API communication issue
    """
    name = workflow_name or (
        workflow_response.metadata.name
        if workflow_response and workflow_response.metadata
        else None
    )

    if not name:
        if logger:
            logger.info("Skipping workflow monitoring — nothing to monitor.")
        return None

    status: Optional[AtlanWorkflowPhase] = None
    while status not in {
        AtlanWorkflowPhase.SUCCESS,
        AtlanWorkflowPhase.ERROR,
        AtlanWorkflowPhase.FAILED,
    }:
        sleep(MONITOR_SLEEP_SECONDS)
        if run_details := self._get_run_details(name):
            status = run_details.status
            if logger:
                logger.debug("Workflow status: %s", status)

    if logger:
        logger.info("Workflow completion status: %s", status)
    return status
re_run_schedule_query(schedule_query_id: str) -> WorkflowRunResponse

Re-run a scheduled query.

:param schedule_query_id: ID of the scheduled query to re-run :returns: the workflow run response :raises AtlanError: on any API communication issue

Source code in pyatlan/client/workflow.py
@validate_arguments
def re_run_schedule_query(self, schedule_query_id: str) -> WorkflowRunResponse:
    """
    Re-run a scheduled query.

    :param schedule_query_id: ID of the scheduled query to re-run
    :returns: the workflow run response
    :raises AtlanError: on any API communication issue
    """
    endpoint, request_obj = WorkflowReRunScheduleQuery.prepare_request(
        schedule_query_id
    )
    raw_json = self._client._call_api(endpoint, request_obj=request_obj)
    return WorkflowReRunScheduleQuery.process_response(raw_json)
remove_schedule(workflow: Union[WorkflowResponse, WorkflowPackage, WorkflowSearchResult, WorkflowSearchResultDetail]) -> WorkflowResponse
remove_schedule(
    workflow: WorkflowResponse,
) -> WorkflowResponse
remove_schedule(
    workflow: WorkflowPackage,
) -> WorkflowResponse
remove_schedule(
    workflow: WorkflowSearchResult,
) -> WorkflowResponse
remove_schedule(
    workflow: WorkflowSearchResultDetail,
) -> WorkflowResponse

Remove a scheduled run from an existing workflow run.

:param workflow_run: existing workflow run to remove the schedule from. :returns: a workflow. :raises AtlanError: on any API communication issue.

Source code in pyatlan/client/workflow.py
def remove_schedule(
    self,
    workflow: Union[
        WorkflowResponse,
        WorkflowPackage,
        WorkflowSearchResult,
        WorkflowSearchResultDetail,
    ],
) -> WorkflowResponse:
    """
    Remove a scheduled run from an existing workflow run.

    :param workflow_run: existing workflow run to remove the schedule from.
    :returns: a workflow.
    :raises AtlanError: on any API communication issue.
    """
    validate_type(
        name="workflow",
        _type=(
            WorkflowResponse,
            WorkflowPackage,
            WorkflowSearchResult,
            WorkflowSearchResultDetail,
        ),
        value=workflow,
    )
    workflow_to_update = self._handle_workflow_types(workflow)
    WorkflowScheduleUtils.remove_schedule(workflow_to_update)
    # Check if user is API token user to determine endpoint
    use_package_endpoint = not self._client.role_cache.is_api_token_user()  # type: ignore[attr-defined]
    endpoint, request_obj = WorkflowScheduleUtils.prepare_request(
        workflow_to_update, use_package_endpoint
    )
    raw_json = self._client._call_api(endpoint, request_obj=request_obj)
    return WorkflowScheduleUtils.process_response(raw_json)
rerun(workflow: Union[WorkflowPackage, WorkflowSearchResultDetail, WorkflowSearchResult], idempotent: bool = False) -> WorkflowRunResponse
rerun(
    workflow: WorkflowPackage, idempotent: bool = False
) -> WorkflowRunResponse
rerun(
    workflow: WorkflowSearchResultDetail,
    idempotent: bool = False,
) -> WorkflowRunResponse
rerun(
    workflow: WorkflowSearchResult, idempotent: bool = False
) -> WorkflowRunResponse

Rerun the workflow immediately. Note: this must be a workflow that was previously run.

:param workflow: The workflow to rerun. :param idempotent: If True, the workflow will only be rerun if it is not already currently running :returns: the details of the workflow run (if idempotent, will return details of the already-running workflow) :raises ValidationError: If the provided workflow is invalid :raises InvalidRequestException: If no prior runs are available for the provided workflow :raises AtlanError: on any API communication issue

Source code in pyatlan/client/workflow.py
def rerun(
    self,
    workflow: Union[
        WorkflowPackage, WorkflowSearchResultDetail, WorkflowSearchResult
    ],
    idempotent: bool = False,
) -> WorkflowRunResponse:
    """
    Rerun the workflow immediately.
    Note: this must be a workflow that was previously run.

    :param workflow: The workflow to rerun.
    :param idempotent: If `True`, the workflow will only be rerun if it is not already currently running
    :returns: the details of the workflow run (if `idempotent`, will return details of the already-running workflow)
    :raises ValidationError: If the provided workflow is invalid
    :raises InvalidRequestException: If no prior runs are available for the provided workflow
    :raises AtlanError: on any API communication issue
    """
    validate_type(
        name="workflow",
        _type=(WorkflowPackage, WorkflowSearchResultDetail, WorkflowSearchResult),
        value=workflow,
    )
    detail = self._handle_workflow_types(workflow)
    if idempotent and detail and detail.metadata and detail.metadata.name:
        # Introducing a delay before checking the current workflow run
        # since it takes some time to start or stop
        sleep(10)
        if (
            (
                current_run_details := self._find_current_run(
                    workflow_name=detail.metadata.name
                )
            )
            and current_run_details.source
            and current_run_details.source.metadata
            and current_run_details.source.spec
            and current_run_details.source.status
        ):
            return WorkflowParseResponse.parse_response(
                {
                    "metadata": current_run_details.source.metadata,
                    "spec": current_run_details.source.spec,
                    "status": current_run_details.source.status,
                },
                WorkflowRunResponse,
            )
    # Check if user is API token user to determine endpoint
    use_package_endpoint = not self._client.role_cache.is_api_token_user()  # type: ignore[attr-defined]
    endpoint, request_obj = WorkflowRerun.prepare_request(
        detail, use_package_endpoint
    )
    raw_json = self._client._call_api(endpoint, request_obj=request_obj)
    return WorkflowRerun.process_response(raw_json)
run(workflow: Union[Workflow, str], workflow_schedule: Optional[WorkflowSchedule] = None) -> WorkflowResponse
run(
    workflow: Workflow,
    workflow_schedule: Optional[WorkflowSchedule] = None,
) -> WorkflowResponse
run(
    workflow: str,
    workflow_schedule: Optional[WorkflowSchedule] = None,
) -> WorkflowResponse

Run the Atlan workflow with a specific configuration.

Note: This method should only be used to create the workflow for the first time. Each invocation creates a new connection and new assets within that connection. Running the workflow multiple times with the same configuration may lead to duplicate assets. Consider using the "rerun()" method instead to re-execute an existing workflow.

:param workflow: workflow object to run or a raw workflow JSON string. :param workflow_schedule: (Optional) a WorkflowSchedule object containing: - A cron schedule expression, e.g: 5 4 * * *. - The time zone for the cron schedule, e.g: Europe/Paris.

:returns: Details of the workflow run. :raises ValidationError: If the provided workflow is invalid. :raises AtlanError: on any API communication issue.

Source code in pyatlan/client/workflow.py
def run(
    self,
    workflow: Union[Workflow, str],
    workflow_schedule: Optional[WorkflowSchedule] = None,
) -> WorkflowResponse:
    """
    Run the Atlan workflow with a specific configuration.

    Note: This method should only be used to create the workflow for the first time.
    Each invocation creates a new connection and new assets within that connection.
    Running the workflow multiple times with the same configuration may lead to duplicate assets.
    Consider using the "rerun()" method instead to re-execute an existing workflow.

    :param workflow: workflow object to run or a raw workflow JSON string.
    :param workflow_schedule: (Optional) a WorkflowSchedule object containing:
        - A cron schedule expression, e.g: `5 4 * * *`.
        - The time zone for the cron schedule, e.g: `Europe/Paris`.

    :returns: Details of the workflow run.
    :raises ValidationError: If the provided `workflow` is invalid.
    :raises AtlanError: on any API communication issue.
    """
    validate_type(name="workflow", _type=(Workflow, str), value=workflow)
    validate_type(
        name="workflow_schedule",
        _type=(WorkflowSchedule, None),
        value=workflow_schedule,
    )
    if isinstance(workflow, str):
        workflow = Workflow.parse_raw(workflow)
    if workflow_schedule:
        self._add_schedule(workflow, workflow_schedule)
    # Check if user is API token user to determine endpoint
    use_package_endpoint = not self._client.role_cache.is_api_token_user()  # type: ignore[attr-defined]
    endpoint, request_obj = WorkflowRun.prepare_request(
        workflow, workflow_schedule, use_package_endpoint
    )
    raw_json = self._client._call_api(endpoint, request_obj=request_obj)
    return WorkflowRun.process_response(raw_json)
stop(workflow_run_id: str) -> WorkflowRunResponse

Stop the provided, running workflow.

:param workflow_run_id: identifier of the specific workflow run :returns: the stopped workflow run :raises AtlanError: on any API communication issue

Source code in pyatlan/client/workflow.py
@validate_arguments
def stop(
    self,
    workflow_run_id: str,
) -> WorkflowRunResponse:
    """
    Stop the provided, running workflow.

    :param workflow_run_id: identifier of the specific workflow run
    :returns: the stopped workflow run
    :raises AtlanError: on any API communication issue
    """
    endpoint, request_obj = WorkflowStop.prepare_request(workflow_run_id)
    raw_json = self._client._call_api(endpoint, request_obj=request_obj)
    return WorkflowStop.process_response(raw_json)
update(workflow: Workflow) -> WorkflowResponse

Update a given workflow's configuration.

:param workflow: request full details of the workflow's revised configuration. :returns: the updated workflow configuration. :raises ValidationError: If the provided workflow is invalid. :raises AtlanError: on any API communication issue

Source code in pyatlan/client/workflow.py
@validate_arguments
def update(self, workflow: Workflow) -> WorkflowResponse:
    """
    Update a given workflow's configuration.

    :param workflow: request full details of the workflow's revised configuration.
    :returns: the updated workflow configuration.
    :raises ValidationError: If the provided `workflow` is invalid.
    :raises AtlanError: on any API communication issue
    """
    # Check if user is API token user to determine endpoint
    use_package_endpoint = not self._client.role_cache.is_api_token_user()  # type: ignore[attr-defined]
    endpoint, request_obj = WorkflowUpdate.prepare_request(
        workflow, use_package_endpoint
    )
    raw_json = self._client._call_api(endpoint, request_obj=request_obj)
    return WorkflowUpdate.process_response(raw_json)
update_owner(workflow_name: str, username: str) -> WorkflowResponse

Update the owner of a workflow.

:param workflow_name: name of the workflow for which we want to update owner :param username: new username of the user who should own the workflow :returns: workflow response details :raises AtlanError: on any API communication issue

Source code in pyatlan/client/workflow.py
@validate_arguments
def update_owner(self, workflow_name: str, username: str) -> WorkflowResponse:
    """
    Update the owner of a workflow.

    :param workflow_name: name of the workflow for which we want to update owner
    :param username: new username of the user who should own the workflow
    :returns: workflow response details
    :raises AtlanError: on any API communication issue
    """
    endpoint, request_obj = WorkflowUpdateOwner.prepare_request(
        workflow_name, username
    )
    raw_json = self._client._call_api(endpoint, request_obj=request_obj)
    return WorkflowUpdateOwner.process_response(raw_json)

Functions

Credential Client

pyatlan.client.credential

Classes

CredentialClient(client: ApiCaller)

A client for managing credentials within the Atlan platform.

This class provides functionality for interacting with Atlan's credential objects. It allows you to perform operations such as retrieving, testing, and updating given credentials.

Source code in pyatlan/client/credential.py
def __init__(self, client: ApiCaller):
    if not isinstance(client, ApiCaller):
        raise ErrorCode.INVALID_PARAMETER_TYPE.exception_with_parameters(
            "client", "ApiCaller"
        )
    self._client = client
Functions
creator(credential: Credential, test: bool = True) -> CredentialResponse

Create a new credential.

:param credential: provide full details of the credential's to be created. :param test: whether to validate the credentials (True) or skip validation (False) before creation, defaults to True. :returns: A CredentialResponse instance. :raises ValidationError: If the provided credential is invalid. :raises InvalidRequestError: If test is False and the credential contains a username or password.

Source code in pyatlan/client/credential.py
@validate_arguments
def creator(self, credential: Credential, test: bool = True) -> CredentialResponse:
    """
    Create a new credential.

    :param credential: provide full details of the credential's to be created.
    :param test: whether to validate the credentials (`True`) or skip validation
    (`False`) before creation, defaults to `True`.
    :returns: A CredentialResponse instance.
    :raises ValidationError: If the provided `credential` is invalid.
    :raises InvalidRequestError: If `test` is `False` and the credential contains a `username` or `password`.
    """
    # Validate request using shared logic
    CredentialCreate.validate_request(credential, test)

    # Prepare request using shared logic
    endpoint, query_params = CredentialCreate.prepare_request(test)

    # Make API call
    raw_json = self._client._call_api(
        api=endpoint,
        query_params=query_params,
        request_obj=credential,
    )

    # Process response using shared logic
    return CredentialCreate.process_response(raw_json)
get(guid: str) -> CredentialResponse

Retrieves a credential by its unique identifier (GUID). Note that this will never contain sensitive information in the credential, such as usernames, passwords or client secrets or keys.

:param guid: GUID of the credential. :returns: A CredentialResponse instance. :raises: AtlanError on any error during API invocation.

Source code in pyatlan/client/credential.py
@validate_arguments
def get(self, guid: str) -> CredentialResponse:
    """
    Retrieves a credential by its unique identifier (GUID).
    Note that this will never contain sensitive information
    in the credential, such as usernames, passwords or client secrets or keys.

    :param guid: GUID of the credential.
    :returns: A CredentialResponse instance.
    :raises: AtlanError on any error during API invocation.
    """
    # Prepare request using shared logic
    endpoint = CredentialGet.prepare_request(guid)

    # Make API call
    raw_json = self._client._call_api(endpoint)

    # Process response using shared logic
    return CredentialGet.process_response(raw_json)
get_all(filter: Optional[Dict[str, Any]] = None, limit: Optional[int] = None, offset: Optional[int] = None, workflow_name: Optional[str] = None) -> CredentialListResponse

Retrieves all credentials.

:param filter: (optional) dictionary specifying the filter criteria. :param limit: (optional) maximum number of credentials to retrieve. :param offset: (optional) number of credentials to skip before starting retrieval. :param workflow_name: (optional) name of the workflow to retrieve credentials for. :returns: CredentialListResponse instance. :raises: AtlanError on any error during API invocation.

Source code in pyatlan/client/credential.py
@validate_arguments
def get_all(
    self,
    filter: Optional[Dict[str, Any]] = None,
    limit: Optional[int] = None,
    offset: Optional[int] = None,
    workflow_name: Optional[str] = None,
) -> CredentialListResponse:
    """
    Retrieves all credentials.

    :param filter: (optional) dictionary specifying the filter criteria.
    :param limit: (optional) maximum number of credentials to retrieve.
    :param offset: (optional) number of credentials to skip before starting retrieval.
    :param workflow_name: (optional) name of the workflow to retrieve credentials for.
    :returns: CredentialListResponse instance.
    :raises: AtlanError on any error during API invocation.
    """
    # Prepare request using shared logic
    endpoint, params = CredentialGetAll.prepare_request(
        filter, limit, offset, workflow_name
    )

    # Make API call
    raw_json = self._client._call_api(endpoint, query_params=params)

    # Process response using shared logic
    return CredentialGetAll.process_response(raw_json)
purge_by_guid(guid: str) -> CredentialResponse

Hard-deletes (purges) credential by their unique identifier (GUID). This operation is irreversible.

:param guid: unique identifier(s) (GUIDs) of credential to hard-delete :returns: details of the hard-deleted asset(s) :raises AtlanError: on any API communication issue

Source code in pyatlan/client/credential.py
@validate_arguments
def purge_by_guid(self, guid: str) -> CredentialResponse:
    """
    Hard-deletes (purges) credential by their unique identifier (GUID).
    This operation is irreversible.

    :param guid: unique identifier(s) (GUIDs) of credential to hard-delete
    :returns: details of the hard-deleted asset(s)
    :raises AtlanError: on any API communication issue
    """
    # Prepare request using shared logic
    endpoint = CredentialPurge.prepare_request(guid)

    # Make API call
    raw_json = self._client._call_api(endpoint)

    return raw_json
test(credential: Credential) -> CredentialTestResponse

Tests the given credential by sending it to Atlan for validation.

:param credential: The credential to be tested. :type credential: A CredentialTestResponse instance. :returns: The response indicating the test result. :raises ValidationError: If the provided credential is invalid type. :raises AtlanError: On any error during API invocation.

Source code in pyatlan/client/credential.py
@validate_arguments
def test(self, credential: Credential) -> CredentialTestResponse:
    """
    Tests the given credential by sending it to Atlan for validation.

    :param credential: The credential to be tested.
    :type credential: A CredentialTestResponse instance.
    :returns: The response indicating the test result.
    :raises ValidationError: If the provided credential is invalid type.
    :raises AtlanError: On any error during API invocation.
    """
    # Make API call
    raw_json = self._client._call_api(TEST_CREDENTIAL, request_obj=credential)

    # Process response using shared logic
    return CredentialTest.process_response(raw_json)
test_and_update(credential: Credential) -> CredentialResponse

Updates this credential in Atlan after first testing it to confirm its successful validation.

:param credential: The credential to be tested and updated. :returns: An updated CredentialResponse instance. :raises ValidationError: If the provided credential is invalid type. :raises InvalidRequestException: if the provided credentials cannot be validated successfully. :raises InvalidRequestException: If the provided credential does not have an ID. :raises AtlanError: on any error during API invocation.

Source code in pyatlan/client/credential.py
@validate_arguments
def test_and_update(self, credential: Credential) -> CredentialResponse:
    """
    Updates this credential in Atlan after first
    testing it to confirm its successful validation.

    :param credential: The credential to be tested and updated.
    :returns: An updated CredentialResponse instance.
    :raises ValidationError: If the provided credential is invalid type.
    :raises InvalidRequestException: if the provided credentials
    cannot be validated successfully.
    :raises InvalidRequestException: If the provided credential
    does not have an ID.
    :raises AtlanError: on any error during API invocation.
    """
    # Test credential first
    test_response = self.test(credential=credential)

    # Validate test response using shared logic
    CredentialTestAndUpdate.validate_test_response(test_response, credential)

    # Prepare update request using shared logic
    endpoint = CredentialTestAndUpdate.prepare_request(credential)

    # Make API call
    raw_json = self._client._call_api(endpoint, request_obj=credential)

    # Process response using shared logic
    return CredentialTestAndUpdate.process_response(raw_json)

Contract Client

pyatlan.client.contract

Classes

ContractClient(client: ApiCaller)

A client for data contract-specific operations.

Source code in pyatlan/client/contract.py
def __init__(self, client: ApiCaller):
    if not isinstance(client, ApiCaller):
        raise ErrorCode.INVALID_PARAMETER_TYPE.exception_with_parameters(
            "client", "ApiCaller"
        )
    self._client = client
Functions
generate_initial_spec(asset: Asset) -> Optional[str]

Generate an initial contract spec for the provided asset. The asset must have at least its qualifiedName (and typeName) populated.

:param asset: for which to generate the initial contract spec

:raises AtlanError: if there is an issue interacting with the API :returns: YAML for the initial contract spec for the provided asset

Source code in pyatlan/client/contract.py
@validate_arguments
def generate_initial_spec(
    self,
    asset: Asset,
) -> Optional[str]:
    """
    Generate an initial contract spec for the provided asset.
    The asset must have at least its `qualifiedName` (and `typeName`) populated.

    :param asset: for which to generate the initial contract spec

    :raises AtlanError: if there is an issue interacting with the API
    :returns: YAML for the initial contract spec for the provided asset
    """
    # Prepare request using shared logic
    request_obj = ContractInit.prepare_request(asset)

    # Make API call
    response = self._client._call_api(CONTRACT_INIT_API, request_obj=request_obj)

    # Process response using shared logic
    return ContractInit.process_response(response)

Query Client

pyatlan.client.query

Classes

QueryClient(client: ApiCaller)

A client for running SQL queries.

Source code in pyatlan/client/query.py
def __init__(self, client: ApiCaller):
    if not isinstance(client, ApiCaller):
        raise ErrorCode.INVALID_PARAMETER_TYPE.exception_with_parameters(
            "client", "ApiCaller"
        )
    self._client = client
Functions
stream(request: QueryRequest) -> QueryResponse

Runs the provided query and returns its results.

:param: request query to run. :returns: results of the query. :raises : AtlanError on any issues with API communication.

Source code in pyatlan/client/query.py
@validate_arguments
def stream(self, request: QueryRequest) -> QueryResponse:
    """
    Runs the provided query and returns its results.

    :param: request query to run.
    :returns: results of the query.
    :raises : AtlanError on any issues with API communication.
    """
    # Prepare request using shared logic
    endpoint, request_obj = QueryStream.prepare_request(request)

    # Execute API call
    raw_json = self._client._call_api(endpoint, request_obj=request_obj)

    # Process response using shared logic
    return QueryStream.process_response(raw_json)

Search Log Client

pyatlan.client.search_log

Classes

SearchLogClient(client: ApiCaller)

This class can be used to configure and run a search against Atlan's searcg log. This class does not need to be instantiated directly but can be obtained through the search_log property of AtlanClient.

Source code in pyatlan/client/search_log.py
def __init__(self, client: ApiCaller):
    if not isinstance(client, ApiCaller):
        raise ErrorCode.INVALID_PARAMETER_TYPE.exception_with_parameters(
            "client", "ApiCaller"
        )
    self._client = client
Functions
search(criteria: SearchLogRequest, bulk=False) -> Union[SearchLogViewResults, SearchLogResults]

Search for search logs using the provided criteria. Note: if the number of results exceeds the predefined threshold (10,000 search logs) this will be automatically converted into an search log bulk search.

:param criteria: detailing the search query, parameters, and so on to run :param bulk: whether to run the search to retrieve search logs that match the supplied criteria, for large numbers of results (> 10,000), defaults to False. Note: this will reorder the results (based on creation timestamp) in order to iterate through a large number (more than 10,000) results. :raises InvalidRequestError:

- if search log bulk search is enabled (`bulk=True`) and any
  user-specified sorting options are found in the search request.
- if search log bulk search is disabled (`bulk=False`) and the number of results
  exceeds the predefined threshold (i.e: `10,000` assets)
  and any user-specified sorting options are found in the search request.

:raises AtlanError: on any API communication issue :returns: the results of the search

Source code in pyatlan/client/search_log.py
@validate_arguments
def search(
    self, criteria: SearchLogRequest, bulk=False
) -> Union[SearchLogViewResults, SearchLogResults]:
    """
    Search for search logs using the provided criteria.
    `Note:` if the number of results exceeds the predefined threshold
    (10,000 search logs) this will be automatically converted into an search log `bulk` search.

    :param criteria: detailing the search query, parameters, and so on to run
    :param bulk: whether to run the search to retrieve search logs that match the supplied criteria,
    for large numbers of results (> `10,000`), defaults to `False`. Note: this will reorder the results
    (based on creation timestamp) in order to iterate through a large number (more than `10,000`) results.
    :raises InvalidRequestError:

        - if search log bulk search is enabled (`bulk=True`) and any
          user-specified sorting options are found in the search request.
        - if search log bulk search is disabled (`bulk=False`) and the number of results
          exceeds the predefined threshold (i.e: `10,000` assets)
          and any user-specified sorting options are found in the search request.

    :raises AtlanError: on any API communication issue
    :returns: the results of the search
    """
    # Prepare request using shared logic
    endpoint, request_obj = SearchLogSearch.prepare_request(criteria, bulk)

    # Execute API call
    raw_json = self._client._call_api(endpoint, request_obj=request_obj)

    # Process response using shared logic (which returns the final results)
    results = SearchLogSearch.process_response(
        raw_json, criteria, bulk, self._client
    )

    # If it's SearchLogResults (not SearchLogViewResults), check for bulk search conversion
    if isinstance(results, SearchLogResults):
        if SearchLogSearch.check_for_bulk_search(
            results.count, criteria, bulk, SearchLogResults
        ):
            # Recursive call with updated criteria
            return self.search(criteria)

    return results

Task Client

pyatlan.client.task

Classes

TaskClient(client: ApiCaller)

A client for operating on tasks.

Source code in pyatlan/client/task.py
def __init__(self, client: ApiCaller):
    if not isinstance(client, ApiCaller):
        raise ErrorCode.INVALID_PARAMETER_TYPE.exception_with_parameters(
            "client", "ApiCaller"
        )
    self._client = client
Functions
search(request: TaskSearchRequest) -> TaskSearchResponse

Search for tasks using the provided criteria.

:param request: search request for tasks :returns: search results for tasks

Source code in pyatlan/client/task.py
@validate_arguments
def search(self, request: TaskSearchRequest) -> TaskSearchResponse:
    """
    Search for tasks using the provided criteria.

    :param request: search request for tasks
    :returns: search results for tasks
    """
    endpoint, request_obj = TaskSearch.prepare_request(request)
    raw_json = self._client._call_api(endpoint, request_obj=request_obj)
    response_data = TaskSearch.process_response(raw_json)

    return TaskSearchResponse(
        client=self._client,
        endpoint=endpoint,
        criteria=request,
        start=request.dsl.from_,
        size=request.dsl.size,
        count=response_data["count"],
        tasks=response_data["tasks"],
        aggregations=response_data["aggregations"],
    )

SSO Client

pyatlan.client.sso

Classes

SSOClient(client: ApiCaller)

A client for operating on Atlan's single sign-on (SSO).

Source code in pyatlan/client/sso.py
def __init__(self, client: ApiCaller):
    if not isinstance(client, ApiCaller):
        raise ErrorCode.INVALID_PARAMETER_TYPE.exception_with_parameters(
            "client", "ApiCaller"
        )
    self._client = client
Functions
create_group_mapping(sso_alias: str, atlan_group: AtlanGroup, sso_group_name: str) -> SSOMapper

Creates a new Atlan SSO group mapping.

:param sso_alias: name of the SSO provider. :param atlan_group: existing Atlan group. :param sso_group_name: name of the SSO group. :raises AtlanError: on any error during API invocation. :returns: created SSO group mapping instance.

Source code in pyatlan/client/sso.py
@validate_arguments
def create_group_mapping(
    self, sso_alias: str, atlan_group: AtlanGroup, sso_group_name: str
) -> SSOMapper:
    """
    Creates a new Atlan SSO group mapping.

    :param sso_alias: name of the SSO provider.
    :param atlan_group: existing Atlan group.
    :param sso_group_name: name of the SSO group.
    :raises AtlanError: on any error during API invocation.
    :returns: created SSO group mapping instance.
    """
    self._check_existing_group_mappings(sso_alias, atlan_group)
    endpoint, request_obj = SSOCreateGroupMapping.prepare_request(
        sso_alias, atlan_group, sso_group_name
    )
    raw_json = self._client._call_api(endpoint, request_obj=request_obj)
    return SSOCreateGroupMapping.process_response(raw_json)
delete_group_mapping(sso_alias: str, group_map_id: str) -> None

Deletes an existing Atlan SSO group mapping.

:param sso_alias: name of the SSO provider. :param group_map_id: existing SSO group map identifier. :raises AtlanError: on any error during API invocation. :returns: an empty response (None).

Source code in pyatlan/client/sso.py
@validate_arguments
def delete_group_mapping(self, sso_alias: str, group_map_id: str) -> None:
    """
    Deletes an existing Atlan SSO group mapping.

    :param sso_alias: name of the SSO provider.
    :param group_map_id: existing SSO group map identifier.
    :raises AtlanError: on any error during API invocation.
    :returns: an empty response (`None`).
    """
    endpoint, request_obj = SSODeleteGroupMapping.prepare_request(
        sso_alias, group_map_id
    )
    raw_json = self._client._call_api(endpoint, request_obj=request_obj)
    return raw_json
get_all_group_mappings(sso_alias: str) -> List[SSOMapper]

Retrieves all existing Atlan SSO group mappings.

:param sso_alias: name of the SSO provider. :raises AtlanError: on any error during API invocation. :returns: list of existing SSO group mapping instances.

Source code in pyatlan/client/sso.py
@validate_arguments
def get_all_group_mappings(self, sso_alias: str) -> List[SSOMapper]:
    """
    Retrieves all existing Atlan SSO group mappings.

    :param sso_alias: name of the SSO provider.
    :raises AtlanError: on any error during API invocation.
    :returns: list of existing SSO group mapping instances.
    """
    endpoint, request_obj = SSOGetAllGroupMappings.prepare_request(sso_alias)
    raw_json = self._client._call_api(endpoint, request_obj=request_obj)
    return SSOGetAllGroupMappings.process_response(raw_json)
get_group_mapping(sso_alias: str, group_map_id: str) -> SSOMapper

Retrieves an existing Atlan SSO group mapping.

:param sso_alias: name of the SSO provider. :param group_map_id: existing SSO group map identifier. :raises AtlanError: on any error during API invocation. :returns: existing SSO group mapping instance.

Source code in pyatlan/client/sso.py
@validate_arguments
def get_group_mapping(self, sso_alias: str, group_map_id: str) -> SSOMapper:
    """
    Retrieves an existing Atlan SSO group mapping.

    :param sso_alias: name of the SSO provider.
    :param group_map_id: existing SSO group map identifier.
    :raises AtlanError: on any error during API invocation.
    :returns: existing SSO group mapping instance.
    """
    endpoint, request_obj = SSOGetGroupMapping.prepare_request(
        sso_alias, group_map_id
    )
    raw_json = self._client._call_api(endpoint, request_obj=request_obj)
    return SSOGetGroupMapping.process_response(raw_json)
update_group_mapping(sso_alias: str, atlan_group: AtlanGroup, group_map_id: str, group_map_name: str, sso_group_name: str) -> SSOMapper

Update an existing Atlan SSO group mapping.

:param sso_alias: name of the SSO provider. :param atlan_group: existing Atlan group. :param group_map_id: existing SSO group map identifier. :param group_map_name: existing SSO group map name. :param sso_group_name: new SSO group name. :raises AtlanError: on any error during API invocation. :returns: updated SSO group mapping instance.

Source code in pyatlan/client/sso.py
@validate_arguments
def update_group_mapping(
    self,
    sso_alias: str,
    atlan_group: AtlanGroup,
    group_map_id: str,
    group_map_name: str,
    sso_group_name: str,
) -> SSOMapper:
    """
    Update an existing Atlan SSO group mapping.

    :param sso_alias: name of the SSO provider.
    :param atlan_group: existing Atlan group.
    :param group_map_id: existing SSO group map identifier.
    :param group_map_name: existing SSO group map name.
    :param sso_group_name: new SSO group name.
    :raises AtlanError: on any error during API invocation.
    :returns: updated SSO group mapping instance.
    """
    endpoint, request_obj = SSOUpdateGroupMapping.prepare_request(
        sso_alias, atlan_group, group_map_id, group_map_name, sso_group_name
    )
    raw_json = self._client._call_api(endpoint, request_obj=request_obj)
    return SSOUpdateGroupMapping.process_response(raw_json)

Open Lineage Client

pyatlan.client.open_lineage

Classes

OpenLineageClient(client: ApiCaller)

A client for interacting with OpenLineage.

Source code in pyatlan/client/open_lineage.py
def __init__(self, client: ApiCaller):
    if not isinstance(client, ApiCaller):
        raise ErrorCode.INVALID_PARAMETER_TYPE.exception_with_parameters(
            "client", "ApiCaller"
        )
    self._client = client
Functions
create_connection(name: str, connector_type: AtlanConnectorType = AtlanConnectorType.SPARK, admin_users: Optional[List[str]] = None, admin_roles: Optional[List[str]] = None, admin_groups: Optional[List[str]] = None) -> AssetMutationResponse

Creates a connection for OpenLineage.

:param name: name for the new connection :param connector_type: for the new connection to be associated with :param admin_users: list of admin users to associate with this connection :param admin_roles: list of admin roles to associate with this connection :param admin_groups:list of admin groups to associate with this connection :return: details of the connection created

Source code in pyatlan/client/open_lineage.py
@validate_arguments
def create_connection(
    self,
    name: str,
    connector_type: AtlanConnectorType = AtlanConnectorType.SPARK,
    admin_users: Optional[List[str]] = None,
    admin_roles: Optional[List[str]] = None,
    admin_groups: Optional[List[str]] = None,
) -> AssetMutationResponse:
    """
    Creates a connection for OpenLineage.

    :param name:  name for the new connection
    :param connector_type: for the new connection to be associated with
    :param admin_users: list of admin users to associate with this connection
    :param admin_roles: list of admin roles to associate with this connection
    :param admin_groups:list of admin groups to associate with this connection
    :return: details of the connection created
    """
    # Step 1: Create credential using shared logic
    create_credential = OpenLineageCreateCredential.prepare_request(connector_type)
    credential_response = self._client.credentials.creator(  # type: ignore[attr-defined]
        credential=create_credential
    )

    # Step 2: Create connection using shared logic
    connection = OpenLineageCreateConnection.prepare_request(
        client=self._client,
        name=name,
        connector_type=connector_type,
        credential_id=credential_response.id,
        admin_users=admin_users,
        admin_roles=admin_roles,
        admin_groups=admin_groups,
    )

    # Save connection and return response directly
    return self._client.asset.save(connection)  # type: ignore[attr-defined]
send(request: Union[OpenLineageEvent, OpenLineageRawEvent, List[Dict[str, Any]], Dict[str, Any], str], connector_type: AtlanConnectorType) -> None

Sends the OpenLineage event to Atlan to be consumed.

:param request: OpenLineage event to send - can be an OpenLineageEvent, OpenLineageRawEvent, list of dicts, dict, or JSON string :param connector_type: of the connection that should receive the OpenLineage event :raises AtlanError: when OpenLineage is not configured OR on any issues with API communication

Source code in pyatlan/client/open_lineage.py
def send(
    self,
    request: Union[
        OpenLineageEvent,
        OpenLineageRawEvent,
        List[Dict[str, Any]],
        Dict[str, Any],
        str,
    ],
    connector_type: AtlanConnectorType,
) -> None:
    """
    Sends the OpenLineage event to Atlan to be consumed.

    :param request: OpenLineage event to send - can be an OpenLineageEvent, OpenLineageRawEvent, list of dicts, dict, or JSON string
    :param connector_type: of the connection that should receive the OpenLineage event
    :raises AtlanError: when OpenLineage is not configured OR on any issues with API communication
    """
    validate_type(
        name="request",
        _type=(OpenLineageEvent, OpenLineageRawEvent, list, dict, str),
        value=request,
    )
    validate_type(
        name="connector_type",
        _type=(AtlanConnectorType),
        value=connector_type,
    )
    try:
        # Convert raw list/dict/str/ of dicts to OpenLineageRawEvent if needed
        if isinstance(request, (dict, str, list)):
            if isinstance(request, str):
                request = OpenLineageRawEvent.parse_raw(request)
            else:
                # For list or dict, use parse_obj
                request = OpenLineageRawEvent.parse_obj(request)

        # Prepare request using shared logic
        api_endpoint, request_obj, api_options = OpenLineageSend.prepare_request(
            request, connector_type
        )
        # Make API call - _call_api handles JSON conversion automatically
        self._client._call_api(
            request_obj=request_obj, api=api_endpoint, **api_options
        )
    except AtlanError as e:
        # Validate and handle OpenLineage-specific errors using shared logic
        OpenLineageSend.validate_response(e, connector_type)

Functions

Impersonation Client

pyatlan.client.impersonate

Classes

ImpersonationClient(client: ApiCaller)

This class can be used for impersonating users as part of Atlan automations (if desired). Note: this will only work when run as part of Atlan's packaged workflow ecosystem (running in the cluster back-end).

Source code in pyatlan/client/impersonate.py
def __init__(self, client: ApiCaller):
    if not isinstance(client, ApiCaller):
        raise ErrorCode.INVALID_PARAMETER_TYPE.exception_with_parameters(
            "client", "ApiCaller"
        )
    self._client = client
Functions
escalate() -> str

Escalate to a privileged user on a short-term basis. Note: this is only possible from within the Atlan tenant, and only when given the appropriate credentials.

:returns: a short-lived bearer token with escalated privileges :raises AtlanError: on any API communication issue

Source code in pyatlan/client/impersonate.py
def escalate(self) -> str:
    """
    Escalate to a privileged user on a short-term basis.
    Note: this is only possible from within the Atlan tenant, and only when given the appropriate credentials.

    :returns: a short-lived bearer token with escalated privileges
    :raises AtlanError: on any API communication issue
    """
    # Get client info using shared logic
    client_info = ImpersonateEscalate.get_client_info()

    # Prepare escalation request using shared logic
    endpoint, credentials = ImpersonateEscalate.prepare_request(client_info)

    try:
        raw_json = self._client._call_api(endpoint, request_obj=credentials)
        return ImpersonateEscalate.process_response(raw_json)
    except AtlanError as atlan_err:
        raise ErrorCode.UNABLE_TO_ESCALATE.exception_with_parameters() from atlan_err
get_client_secret(client_guid: str) -> Optional[str]

Retrieves the client secret associated with the given client GUID

:param client_guid: GUID of the client whose secret is to be retrieved :returns: client secret if available, otherwise None :raises: - AtlanError: If an API error occurs. - InvalidRequestError: If the provided GUID is invalid or retrieval fails.

Source code in pyatlan/client/impersonate.py
def get_client_secret(self, client_guid: str) -> Optional[str]:
    """
    Retrieves the client secret associated with the given client GUID

    :param client_guid: GUID of the client whose secret is to be retrieved
    :returns: client secret if available, otherwise `None`
    :raises:
        - AtlanError: If an API error occurs.
        - InvalidRequestError: If the provided GUID is invalid or retrieval fails.
    """
    try:
        # Prepare request using shared logic
        endpoint = ImpersonateGetClientSecret.prepare_request(client_guid)

        # Make API call
        raw_json = self._client._call_api(endpoint)

        # Process response using shared logic
        return ImpersonateGetClientSecret.process_response(raw_json)
    except AtlanError as e:
        raise ErrorCode.UNABLE_TO_RETRIEVE_CLIENT_SECRET.exception_with_parameters(
            client_guid
        ) from e
get_user_id(username: str) -> Optional[str]

Retrieves the user ID from Keycloak for the specified username. This method is particularly useful for impersonating API tokens.

:param username: username of the user whose ID needs to be retrieved. :returns: Keycloak user ID :raises: - AtlanError: If an API error occurs. - InvalidRequestError: If an error occurs while fetching the user ID from Keycloak.

Source code in pyatlan/client/impersonate.py
def get_user_id(self, username: str) -> Optional[str]:
    """
    Retrieves the user ID from Keycloak for the specified username.
    This method is particularly useful for impersonating API tokens.

    :param username: username of the user whose ID needs to be retrieved.
    :returns: Keycloak user ID
    :raises:
        - AtlanError: If an API error occurs.
        - InvalidRequestError: If an error occurs while fetching the user ID from Keycloak.
    """
    try:
        # Prepare request using shared logic
        endpoint, query_params = ImpersonateGetUserId.prepare_request(username)

        # Make API call
        raw_json = self._client._call_api(endpoint, query_params=query_params)

        # Process response using shared logic
        return ImpersonateGetUserId.process_response(raw_json)
    except AtlanError as e:
        raise ErrorCode.UNABLE_TO_RETRIEVE_USER_GUID.exception_with_parameters(
            username
        ) from e
user(user_id: str) -> str

Retrieves a bearer token that impersonates the provided user.

:param user_id: unique identifier of the user to impersonate :returns: a bearer token that impersonates the provided user :raises AtlanError: on any API communication issue

Source code in pyatlan/client/impersonate.py
def user(self, user_id: str) -> str:
    """
    Retrieves a bearer token that impersonates the provided user.

    :param user_id: unique identifier of the user to impersonate
    :returns: a bearer token that impersonates the provided user
    :raises AtlanError: on any API communication issue
    """
    # Get client info using shared logic
    client_info = ImpersonateUser.get_client_info()

    # Prepare escalation request using shared logic
    endpoint, credentials = ImpersonateUser.prepare_request(client_info)

    LOGGER.debug("Getting token with client id and secret")
    try:
        raw_json = self._client._call_api(endpoint, request_obj=credentials)
        argo_token = ImpersonateUser.process_response(raw_json)
    except AtlanError as atlan_err:
        raise ErrorCode.UNABLE_TO_ESCALATE.exception_with_parameters() from atlan_err

    LOGGER.debug("Getting token with subject token")
    try:
        # Prepare impersonation request using shared logic
        endpoint, user_credentials = ImpersonateUser.prepare_impersonation_request(
            client_info, argo_token, user_id
        )
        raw_json = self._client._call_api(endpoint, request_obj=user_credentials)
        return ImpersonateUser.process_response(raw_json)
    except AtlanError as atlan_err:
        raise ErrorCode.UNABLE_TO_IMPERSONATE.exception_with_parameters() from atlan_err

Admin Client

pyatlan.client.admin

Classes

AdminClient(client: ApiCaller)

This class can be used to retrieve keycloak and admin events. This class does not need to be instantiated directly but can be obtained through the admin property of AtlanClient.

Source code in pyatlan/client/admin.py
def __init__(self, client: ApiCaller):
    if not isinstance(client, ApiCaller):
        raise ErrorCode.INVALID_PARAMETER_TYPE.exception_with_parameters(
            "client", "ApiCaller"
        )
    self._client = client
Functions
get_admin_events(admin_request: AdminEventRequest) -> AdminEventResponse

Retrieve admin events based on the supplied filters.

:param admin_request: details of the filters to apply when retrieving admin events :returns: the admin events that match the supplied filters :raises AtlanError: on any API communication issue

Source code in pyatlan/client/admin.py
@validate_arguments
def get_admin_events(self, admin_request: AdminEventRequest) -> AdminEventResponse:
    """
    Retrieve admin events based on the supplied filters.

    :param admin_request: details of the filters to apply when retrieving admin events
    :returns: the admin events that match the supplied filters
    :raises AtlanError: on any API communication issue
    """
    endpoint, query_params = AdminGetAdminEvents.prepare_request(admin_request)
    raw_json = self._client._call_api(
        endpoint, query_params=query_params, exclude_unset=True
    )
    response_data = AdminGetAdminEvents.process_response(raw_json, admin_request)

    return AdminEventResponse(client=self._client, **response_data)
get_keycloak_events(keycloak_request: KeycloakEventRequest) -> KeycloakEventResponse

Retrieve all events, based on the supplied filters.

:param keycloak_request: details of the filters to apply when retrieving events :returns: the events that match the supplied filters :raises AtlanError: on any API communication issue

Source code in pyatlan/client/admin.py
@validate_arguments
def get_keycloak_events(
    self, keycloak_request: KeycloakEventRequest
) -> KeycloakEventResponse:
    """
    Retrieve all events, based on the supplied filters.

    :param keycloak_request: details of the filters to apply when retrieving events
    :returns: the events that match the supplied filters
    :raises AtlanError: on any API communication issue
    """
    endpoint, query_params = AdminGetKeycloakEvents.prepare_request(
        keycloak_request
    )
    raw_json = self._client._call_api(
        endpoint,
        query_params=query_params,
        exclude_unset=True,
    )
    response_data = AdminGetKeycloakEvents.process_response(
        raw_json, keycloak_request
    )

    return KeycloakEventResponse(client=self._client, **response_data)

OAuth

pyatlan.client.oauth

Classes

OAuthTokenManager(base_url: str, client_id: str, client_secret: str, http_client: Optional[httpx.Client] = None, connect_timeout: float = 30.0, read_timeout: float = 900.0, write_timeout: float = 30.0, pool_timeout: float = 30.0)

Manages OAuth tokens for HTTP clients. :param base_url: Base URL of the Atlan tenant. :param client_id: OAuth client ID. :param client_secret: OAuth client secret. :param http_client: Optional HTTP client to use. :param connect_timeout: Timeout for establishing connections. :param read_timeout: Timeout for reading data. :param write_timeout: Timeout for writing data. :param pool_timeout: Timeout for acquiring a connection from the pool.

Source code in pyatlan/client/oauth.py
def __init__(
    self,
    base_url: str,
    client_id: str,
    client_secret: str,
    http_client: Optional[httpx.Client] = None,
    connect_timeout: float = 30.0,
    read_timeout: float = 900.0,
    write_timeout: float = 30.0,
    pool_timeout: float = 30.0,
):
    self.base_url = base_url
    self.client_id = client_id
    self.client_secret = client_secret
    self.token_url = self._create_path(GET_OAUTH_CLIENT)
    self._lock = threading.Lock()
    self._http_client = http_client or httpx.Client(
        timeout=httpx.Timeout(
            connect=connect_timeout,
            read=read_timeout,
            write=write_timeout,
            pool=pool_timeout,
        )
    )
    self._token: Optional[OAuth2Token] = None
    self._owns_client = http_client is None
Functions
close()

Closes the underlying HTTP client if owned by this manager.

Source code in pyatlan/client/oauth.py
def close(self):
    """
    Closes the underlying HTTP client if owned by this manager.
    """
    if self._owns_client:
        self._http_client.close()
get_token() -> str

Retrieves a valid OAuth token, refreshing it if necessary.

Source code in pyatlan/client/oauth.py
def get_token(self) -> str:
    """
    Retrieves a valid OAuth token, refreshing it if necessary.
    """
    with self._lock:
        if self._token and not self._token.is_expired():
            return str(self._token["access_token"])

        response = self._http_client.post(
            self.token_url,
            json={
                "clientId": self.client_id,
                "clientSecret": self.client_secret,
            },
            headers={"Content-Type": "application/json"},
        )
        response.raise_for_status()

        data = response.json()
        access_token = data.get("accessToken") or data.get("access_token")

        if not access_token:
            raise ValueError(
                f"OAuth token response missing 'accessToken' field. "
                f"Response keys: {list(data.keys())}"
            )

        expires_in = data.get("expiresIn") or data.get("expires_in", 600)

        self._token = OAuth2Token(
            {
                "access_token": access_token,
                "token_type": data.get("tokenType")
                or data.get("token_type", "Bearer"),
                "expires_in": expires_in,
                "expires_at": int(time.time()) + expires_in,
            }
        )

        return access_token
invalidate_token()

Invalidates the current OAuth token.

Source code in pyatlan/client/oauth.py
def invalidate_token(self):
    """
    Invalidates the current OAuth token.
    """
    with self._lock:
        self._token = None

pyatlan.client.oauth_client

Classes

OAuthClient(client: ApiCaller)

This class can be used to manage OAuth client credentials. This class does not need to be instantiated directly but can be obtained through the oauth_client property of AtlanClient.

Source code in pyatlan/client/oauth_client.py
def __init__(self, client: ApiCaller):
    if not isinstance(client, ApiCaller):
        raise ErrorCode.INVALID_PARAMETER_TYPE.exception_with_parameters(
            "client", "ApiCaller"
        )
    self._client = client
Functions
create(name: str, role: str, description: Optional[str] = None, persona_qualified_names: Optional[List[str]] = None) -> OAuthClientCreateResponse

Create a new OAuth client with the provided settings.

:param name: human-readable name for the OAuth client (displayed in UI) :param role: role description to assign to the OAuth client (e.g., 'Admin', 'Member'). :param description: optional explanation of the OAuth client :param persona_qualified_names: qualified names of personas to associate with the OAuth client :returns: the created OAuthClientCreateResponse (includes client_id and client_secret) :raises AtlanError: on any API communication issue :raises NotFoundError: if the specified role description is not found

Source code in pyatlan/client/oauth_client.py
@validate_arguments
def create(
    self,
    name: str,
    role: str,
    description: Optional[str] = None,
    persona_qualified_names: Optional[List[str]] = None,
) -> OAuthClientCreateResponse:
    """
    Create a new OAuth client with the provided settings.

    :param name: human-readable name for the OAuth client (displayed in UI)
    :param role: role description to assign to the OAuth client (e.g., 'Admin', 'Member').
    :param description: optional explanation of the OAuth client
    :param persona_qualified_names: qualified names of personas to associate with the OAuth client
    :returns: the created OAuthClientCreateResponse (includes client_id and client_secret)
    :raises AtlanError: on any API communication issue
    :raises NotFoundError: if the specified role description is not found
    """
    # Fetch available roles and resolve the user-provided role name
    available_roles = self._fetch_available_roles()
    resolved_role = OAuthClientCreate.resolve_role_name(role, available_roles)

    # Prepare and execute the request
    endpoint, request_obj = OAuthClientCreate.prepare_request(
        display_name=name,
        role=resolved_role,
        description=description,
        persona_qualified_names=persona_qualified_names,
    )
    raw_json = self._client._call_api(endpoint, request_obj=request_obj)
    return OAuthClientCreate.process_response(raw_json)
get(limit: int = 20, offset: int = 0, sort: Optional[str] = None) -> OAuthClientListResponse

Retrieves OAuth clients defined in Atlan with pagination support.

:param limit: maximum number of results to be returned per page (default: 20) :param offset: starting point for results to return, for paging :param sort: property by which to sort the results (e.g., 'createdAt' for descending) :returns: an OAuthClientListResponse containing records and pagination info :raises AtlanError: on any API communication issue

Source code in pyatlan/client/oauth_client.py
@validate_arguments
def get(
    self,
    limit: int = 20,
    offset: int = 0,
    sort: Optional[str] = None,
) -> OAuthClientListResponse:
    """
    Retrieves OAuth clients defined in Atlan with pagination support.

    :param limit: maximum number of results to be returned per page (default: 20)
    :param offset: starting point for results to return, for paging
    :param sort: property by which to sort the results (e.g., 'createdAt' for descending)
    :returns: an OAuthClientListResponse containing records and pagination info
    :raises AtlanError: on any API communication issue
    """
    endpoint, query_params = OAuthClientGet.prepare_request(limit, offset, sort)
    raw_json = self._client._call_api(endpoint, query_params)
    return OAuthClientListResponse(
        **raw_json,
        endpoint=endpoint,
        client=self._client,
        size=limit,
        start=offset,
        sort=sort,
    )
get_by_id(client_id: str) -> OAuthClientResponse

Retrieves the OAuth client with the specified client ID.

:param client_id: unique client identifier (e.g., 'oauth-client-xxx') :returns: the OAuthClientResponse with the specified client ID :raises AtlanError: on any API communication issue

Source code in pyatlan/client/oauth_client.py
@validate_arguments
def get_by_id(self, client_id: str) -> OAuthClientResponse:
    """
    Retrieves the OAuth client with the specified client ID.

    :param client_id: unique client identifier (e.g., 'oauth-client-xxx')
    :returns: the OAuthClientResponse with the specified client ID
    :raises AtlanError: on any API communication issue
    """
    endpoint, query_params = OAuthClientGetById.prepare_request(client_id)
    raw_json = self._client._call_api(endpoint, query_params)
    return OAuthClientGetById.process_response(raw_json)
purge(client_id: str) -> None

Delete (purge) the specified OAuth client.

:param client_id: unique client identifier (e.g., 'oauth-client-xxx') :raises AtlanError: on any API communication issue

Source code in pyatlan/client/oauth_client.py
@validate_arguments
def purge(self, client_id: str) -> None:
    """
    Delete (purge) the specified OAuth client.

    :param client_id: unique client identifier (e.g., 'oauth-client-xxx')
    :raises AtlanError: on any API communication issue
    """
    endpoint, _ = OAuthClientPurge.prepare_request(client_id)
    self._client._call_api(endpoint)
update(client_id: str, display_name: Optional[str] = None, description: Optional[str] = None) -> OAuthClientResponse

Update an existing OAuth client with the provided settings.

:param client_id: unique client identifier (e.g., 'oauth-client-xxx') :param display_name: human-readable name for the OAuth client :param description: optional explanation of the OAuth client :returns: the updated OAuthClientResponse :raises AtlanError: on any API communication issue

Source code in pyatlan/client/oauth_client.py
@validate_arguments
def update(
    self,
    client_id: str,
    display_name: Optional[str] = None,
    description: Optional[str] = None,
) -> OAuthClientResponse:
    """
    Update an existing OAuth client with the provided settings.

    :param client_id: unique client identifier (e.g., 'oauth-client-xxx')
    :param display_name: human-readable name for the OAuth client
    :param description: optional explanation of the OAuth client
    :returns: the updated OAuthClientResponse
    :raises AtlanError: on any API communication issue
    """
    endpoint, request_obj = OAuthClientUpdate.prepare_request(
        client_id, display_name, description
    )
    raw_json = self._client._call_api(endpoint, request_obj=request_obj)
    return OAuthClientUpdate.process_response(raw_json)