Skip to content

Models

pyatlan.model.search

Classes

DSL(__pydantic_self__, **data: Any)

Bases: AtlanObject

Source code in pyatlan/model/search.py
def __init__(__pydantic_self__, **data: Any) -> None:
    """Initialize the DSL model, then force its core fields to count as 'set'."""
    super().__init__(**data)
    # Mark these fields as explicitly set so they are always included when
    # the model is serialized, even if they only hold their default values.
    __pydantic_self__.__fields_set__.update(
        ("from_", "size", "track_total_hits", "sort", "aggregations")
    )
Functions
sortitem_from_dict(sort)

Convert dictionary entries into SortItem instances before validation to ensure proper deserialization and prevent validation issues.

Source code in pyatlan/model/search.py
@validator("sort", pre=True)
def sortitem_from_dict(cls, sort):
    """
    Convert dictionary entries into SortItem instances before validation
    to ensure proper deserialization and prevent validation issues.
    """
    # Only convert when every entry is a plain dict; mixed or already-typed
    # lists are passed through untouched.
    every_entry_is_dict = all(isinstance(entry, dict) for entry in sort)
    if every_entry_is_dict:
        return [SortItem.from_dict(entry) for entry in sort]
    return sort

Classes

CompoundQuery(wheres: Optional[List[Query]] = None, where_nots: Optional[List[Query]] = None, where_somes: Optional[List[Query]] = None, _min_somes: int = 1) dataclass

Class to compose compound queries combining various conditions.

Source code in pyatlan/model/fluent_search.py
def __init__(
    self,
    wheres: Optional[List[Query]] = None,
    where_nots: Optional[List[Query]] = None,
    where_somes: Optional[List[Query]] = None,
    _min_somes: int = 1,
):
    """Store the criteria used to compose this compound query; no validation is performed."""
    self.wheres, self.where_nots = wheres, where_nots
    self.where_somes, self._min_somes = where_somes, _min_somes
Functions
active_assets() -> Query staticmethod

Returns a query that will only match assets that are active in Atlan.

:returns: a query that will only match assets that are active in Atlan

Source code in pyatlan/model/fluent_search.py
@staticmethod
def active_assets() -> Query:
    """
    Returns a query that will only match assets that are active in Atlan.

    :returns: a query that will only match assets that are active in Atlan
    """
    active = EntityStatus.ACTIVE.value
    return Referenceable.STATUS.eq(active)
archived_assets() -> Query staticmethod

Returns a query that will only match assets that are archived (soft-deleted) in Atlan.

:returns: a query that will only match assets that are archived (soft-deleted) in Atlan

Source code in pyatlan/model/fluent_search.py
@staticmethod
def archived_assets() -> Query:
    """
    Returns a query that will only match assets that are archived (soft-deleted) in Atlan.

    :returns: a query that will only match assets that are archived (soft-deleted) in Atlan
    """
    archived = EntityStatus.DELETED.value
    return Referenceable.STATUS.eq(archived)
asset_type(of: type) -> Query staticmethod

Returns a query that will only match assets of the type provided.

:param of: type for assets to match :returns: a query that will only match assets of the type provided

Source code in pyatlan/model/fluent_search.py
@staticmethod
def asset_type(of: type) -> Query:
    """
    Returns a query that will only match assets of the type provided.

    :param of: type for assets to match
    :returns: a query that will only match assets of the type provided
    """
    type_name = of.__name__
    return Referenceable.TYPE_NAME.eq(type_name)
asset_types(one_of: List[type]) -> Query staticmethod

Returns a query that will only match assets that are one of the types provided.

:param one_of: types for assets to match :returns: a query that will only match assets of one of the types provided

Source code in pyatlan/model/fluent_search.py
@staticmethod
def asset_types(one_of: List[type]) -> Query:
    """
    Returns a query that will only match assets that are one of the types provided.

    :param one_of: types for assets to match
    :returns: a query that will only match assets of one of the types provided
    """
    # A comprehension is clearer (and avoids the lambda) compared to map().
    return Referenceable.TYPE_NAME.within([t.__name__ for t in one_of])
assigned_term(qualified_names: Optional[List[str]] = None) -> Query staticmethod

Returns a query that will only match assets that have at least one term assigned. (If a list of qualified_names is specified, the assets that match must have at least one term assigned from within the list of qualified_names.)

:param qualified_names: the qualified_names of the terms :returns: a query that will only match assets that have at least one term assigned

Source code in pyatlan/model/fluent_search.py
@staticmethod
def assigned_term(qualified_names: Optional[List[str]] = None) -> Query:
    """
    Returns a query that will only match assets that have at least one term assigned.
    (If a list of qualified_names is specified, the assets that match must have at least
    one term assigned from within the list of qualified_names.)

    :param qualified_names: the qualified_names of the terms
    :returns: a query that will only match assets that have at least one term assigned
    """
    # No specific terms requested: any assigned term is enough.
    if not qualified_names:
        return Referenceable.ASSIGNED_TERMS.has_any_value()
    return Referenceable.ASSIGNED_TERMS.within(qualified_names)
min_somes(minimum: int) -> SelfQuery

Sets the minimum number of 'where_somes' that must match for a result to be included.

:param minimum: minimum number of 'where_somes' that must match :returns: the compound query with this additional criterion applied

Source code in pyatlan/model/fluent_search.py
def min_somes(self: SelfQuery, minimum: int) -> SelfQuery:
    """
    Sets the minimum number of 'where_somes' that must match for a result to be included.

    :param minimum: minimum number of 'where_somes' that must match
    :returns: the compound query with this additional criterion applied
    """
    # Work on a copy so the original fluent chain stays untouched.
    updated = self._clone()
    updated._min_somes = minimum
    return updated
super_types(one_of: Union[type, List[type]]) -> Query staticmethod

Returns a query that will match all assets that are a subtype of at least one of the types provided.

:param one_of: type name(s) of the supertypes for assets to match :returns: a query that will only match assets of a subtype of the types provided

Source code in pyatlan/model/fluent_search.py
@staticmethod
def super_types(one_of: Union[type, List[type]]) -> Query:
    """
    Returns a query that will match all assets that are a subtype of at least one of
    the types provided.

    :param one_of: type name(s) of the supertypes for assets to match
    :returns: a query that will only match assets of a subtype of the types provided
    """
    if isinstance(one_of, list):
        # A comprehension is clearer (and avoids the lambda) compared to map().
        return Referenceable.SUPER_TYPE_NAMES.within([t.__name__ for t in one_of])
    return Referenceable.SUPER_TYPE_NAMES.eq(one_of.__name__)
tagged(client: AtlanClient, with_one_of: Optional[List[str]] = None, directly: bool = False) -> Query staticmethod

Returns a query that will only match assets that have at least one of the Atlan tags provided. This will match irrespective of the Atlan tag being directly applied to the asset, or if it was propagated to the asset.

:param client: connectivity to an Atlan tenant :param with_one_of: human-readable names of the Atlan tags :param directly: when True, the asset must have at least one Atlan tag directly assigned, otherwise even propagated tags will suffice :returns: a query that will only match assets that have at least one of the Atlan tags provided

Source code in pyatlan/model/fluent_search.py
@staticmethod
def tagged(
    client: AtlanClient,
    with_one_of: Optional[List[str]] = None,
    directly: bool = False,
) -> Query:
    """
    Returns a query that will only match assets that have at least one of the Atlan tags
    provided. This will match irrespective of the Atlan tag being directly applied to the
    asset, or if it was propagated to the asset.

    :param client: connectivity to an Atlan tenant
    :param with_one_of: human-readable names of the Atlan tags
    :param directly: when True, the asset must have at least one Atlan tag directly assigned, otherwise
                     even propagated tags will suffice
    :returns: a query that will only match assets that have at least one of the Atlan tags provided
    """
    # Resolve each human-readable tag name to its internal ID, failing fast
    # on any name the tenant does not recognize.
    values: List[str] = []
    for name in with_one_of or []:
        tag_id = client.atlan_tag_cache.get_id_for_name(name)
        if not tag_id:
            raise ErrorCode.ATLAN_TAG_NOT_FOUND_BY_NAME.exception_with_parameters(
                name
            )
        values.append(tag_id)
    if directly:
        # Only directly-assigned tags count towards a match.
        where = (
            Referenceable.ATLAN_TAGS.within(values)
            if values
            else Referenceable.ATLAN_TAGS.has_any_value()
        )
        return FluentSearch(wheres=[where]).to_query()
    # Propagated tags also count: either field matching is sufficient.
    if values:
        somes = [
            Referenceable.ATLAN_TAGS.within(values),
            Referenceable.PROPAGATED_ATLAN_TAGS.within(values),
        ]
    else:
        somes = [
            Referenceable.ATLAN_TAGS.has_any_value(),
            Referenceable.PROPAGATED_ATLAN_TAGS.has_any_value(),
        ]
    return FluentSearch(where_somes=somes, _min_somes=1).to_query()
tagged_with_value(client: AtlanClient, atlan_tag_name: str, value: str, directly: bool = False, source_tag_qualified_name: Optional[str] = None) -> Query staticmethod

Returns a query that will match assets that have a specific value for the specified tag (for source-synced tags).

:param client: connectivity to an Atlan tenant :param atlan_tag_name: human-readable name of the Atlan tag :param value: tag should have to match the query :param directly: when True, the asset must have the tag and value directly assigned (otherwise even propagated tags with the value will suffice) :param source_tag_qualified_name: (optional) qualified name of the source tag to match (when there are multiple) :raises: AtlanError on any error communicating with the API to refresh the Atlan tag cache :returns: a query that will only match assets that have a particular value assigned for the given Atlan tag

Source code in pyatlan/model/fluent_search.py
@staticmethod
def tagged_with_value(
    client: AtlanClient,
    atlan_tag_name: str,
    value: str,
    directly: bool = False,
    source_tag_qualified_name: Optional[str] = None,
) -> Query:
    """
    Returns a query that will match assets that have a
    specific value for the specified tag (for source-synced tags).

    :param client: connectivity to an Atlan tenant
    :param atlan_tag_name: human-readable name of the Atlan tag
    :param value: tag should have to match the query
    :param directly: when `True`, the asset must have the tag and
    value directly assigned (otherwise even propagated tags with the value will suffice)
    :param source_tag_qualified_name: (optional) qualified name of
    the source tag to match (when there are multiple)
    :raises: AtlanError on any error communicating
    with the API to refresh the Atlan tag cache
    :returns: a query that will only match assets that have
    a particular value assigned for the given Atlan tag
    """
    big_spans = []
    little_spans = []
    # Resolve the human-readable tag name to its internal ID; an empty
    # string is used when the tag is unknown so the query simply matches nothing.
    tag_id = client.atlan_tag_cache.get_id_for_name(atlan_tag_name) or ""
    # Find all source-synced tags mapped to this Atlan tag.
    synced_tags = [
        tag
        for tag in (
            FluentSearch()
            .select()
            .where(Tag.MAPPED_CLASSIFICATION_NAME.eq(tag_id))
            .execute(client=client)
        )
    ]
    if len(synced_tags) > 1 and source_tag_qualified_name is None:
        # Ambiguous mapping: fall back to the first synced tag and warn.
        synced_tag_qn = synced_tags[0].qualified_name or ""
        LOGGER.warning(
            "Multiple mapped source-synced tags found for tag %s -- using only the first: %s. "
            "You can specify the `source_tag_qualified_name` so we can match to the specific one.",
            atlan_tag_name,
            synced_tag_qn,
        )
    elif synced_tags:
        # Prefer the caller-specified qualified name when provided.
        synced_tag_qn = (
            source_tag_qualified_name or synced_tags[0].qualified_name or ""
        )
    else:
        # No synced tags at all: use a placeholder that cannot match.
        synced_tag_qn = "NON_EXISTENT"

    # Construct little spans: the attachment-value marker followed by each
    # whitespace-separated token of the requested value, in order.
    little_spans.append(
        SpanTerm(field="__classificationsText.text", value="tagAttachmentValue")
    )
    for token in value.split(" "):
        little_spans.append(
            SpanTerm(field="__classificationsText.text", value=token)
        )

    # Add span_or clause with tag-related terms that may follow the value
    # in the serialized classification text.
    span_or_clauses = [
        SpanTerm(field="__classificationsText.text", value="tagAttachmentKey"),
        SpanTerm(field="__classificationsText.text", value="sourceTagName"),
        SpanTerm(
            field="__classificationsText.text", value="sourceTagQualifiedName"
        ),
        SpanTerm(field="__classificationsText.text", value="sourceTagGuid"),
        SpanTerm(
            field="__classificationsText.text", value="sourceTagConnectorName"
        ),
        SpanTerm(field="__classificationsText.text", value="isSourceTagSynced"),
        SpanTerm(
            field="__classificationsText.text", value="sourceTagSyncTimestamp"
        ),
        SpanTerm(field="__classificationsText.text", value="sourceTagValue"),
    ]
    little_spans.append(SpanOr(clauses=span_or_clauses))  # type: ignore

    # Construct big spans: the tag ID and the synced tag's qualified name,
    # delimiting the region in which the little spans must occur.
    big_spans.append(SpanTerm(field="__classificationsText.text", value=tag_id))
    big_spans.append(
        SpanTerm(field="__classificationsText.text", value=synced_tag_qn)
    )

    # Construct final span query: little spans must match (in order, adjacent)
    # inside the region bounded by the big spans (large slop = effectively anywhere).
    span = SpanWithin(
        little=SpanNear(clauses=little_spans, slop=0, in_order=True),
        big=SpanNear(clauses=big_spans, slop=10000000, in_order=True),
    )

    # Without atlan tag propagation
    if directly:
        return (
            FluentSearch()
            .where(Referenceable.ATLAN_TAGS.eq(tag_id))
            .where(span)
            .to_query()
        )
    # With atlan tag propagation
    return (
        FluentSearch()
        .where_some(Referenceable.ATLAN_TAGS.eq(tag_id))
        .where_some(Referenceable.PROPAGATED_ATLAN_TAGS.eq(tag_id))
        .min_somes(1)
        .where(span)
        .to_query()
    )
tagged_with_value_async(client: 'AsyncAtlanClient', atlan_tag_name: str, value: str, directly: bool = False, source_tag_qualified_name: Optional[str] = None) -> Query async staticmethod

Async version of tagged_with_value that returns a query matching assets with a specific value for the specified tag (for source-synced tags).

:param client: async connectivity to an Atlan tenant :param atlan_tag_name: human-readable name of the Atlan tag :param value: tag should have to match the query :param directly: when True, the asset must have the tag and value directly assigned (otherwise even propagated tags with the value will suffice) :param source_tag_qualified_name: (optional) qualified name of the source tag to match (when there are multiple) :raises: AtlanError on any error communicating with the API to refresh the Atlan tag cache :returns: a query that will only match assets that have a particular value assigned for the given Atlan tag

Source code in pyatlan/model/fluent_search.py
@staticmethod
async def tagged_with_value_async(
    client: "AsyncAtlanClient",
    atlan_tag_name: str,
    value: str,
    directly: bool = False,
    source_tag_qualified_name: Optional[str] = None,
) -> Query:
    """
    Async version of tagged_with_value that returns a query matching assets
    with a specific value for the specified tag (for source-synced tags).

    :param client: async connectivity to an Atlan tenant
    :param atlan_tag_name: human-readable name of the Atlan tag
    :param value: tag should have to match the query
    :param directly: when `True`, the asset must have the tag and
    value directly assigned (otherwise even propagated tags with the value will suffice)
    :param source_tag_qualified_name: (optional) qualified name of
    the source tag to match (when there are multiple)
    :raises: AtlanError on any error communicating
    with the API to refresh the Atlan tag cache
    :returns: a query that will only match assets that have
    a particular value assigned for the given Atlan tag
    """
    big_spans = []
    little_spans = []
    # Resolve the human-readable tag name to its internal ID; an empty
    # string is used when the tag is unknown so the query simply matches nothing.
    tag_id = await client.atlan_tag_cache.get_id_for_name(atlan_tag_name) or ""
    # Find all source-synced tags mapped to this Atlan tag.
    synced_tags = [
        tag
        async for tag in (
            await FluentSearch()
            .select()
            .where(Tag.MAPPED_CLASSIFICATION_NAME.eq(tag_id))
            .execute_async(client=client)
        )
    ]
    if len(synced_tags) > 1 and source_tag_qualified_name is None:
        # Ambiguous mapping: fall back to the first synced tag and warn.
        synced_tag_qn = synced_tags[0].qualified_name or ""
        LOGGER.warning(
            "Multiple mapped source-synced tags found for tag %s -- using only the first: %s. "
            "You can specify the `source_tag_qualified_name` so we can match to the specific one.",
            atlan_tag_name,
            synced_tag_qn,
        )
    elif synced_tags:
        # Prefer the caller-specified qualified name when provided.
        synced_tag_qn = (
            source_tag_qualified_name or synced_tags[0].qualified_name or ""
        )
    else:
        # No synced tags at all: use a placeholder that cannot match.
        synced_tag_qn = "NON_EXISTENT"

    # Construct little spans: the attachment-value marker followed by each
    # whitespace-separated token of the requested value, in order.
    little_spans.append(
        SpanTerm(field="__classificationsText.text", value="tagAttachmentValue")
    )
    for token in value.split(" "):
        little_spans.append(
            SpanTerm(field="__classificationsText.text", value=token)
        )

    # Add span_or clause with tag-related terms that may follow the value
    # in the serialized classification text.
    span_or_clauses = [
        SpanTerm(field="__classificationsText.text", value="tagAttachmentKey"),
        SpanTerm(field="__classificationsText.text", value="sourceTagName"),
        SpanTerm(
            field="__classificationsText.text", value="sourceTagQualifiedName"
        ),
        SpanTerm(field="__classificationsText.text", value="sourceTagGuid"),
        SpanTerm(
            field="__classificationsText.text", value="sourceTagConnectorName"
        ),
        SpanTerm(field="__classificationsText.text", value="isSourceTagSynced"),
        SpanTerm(
            field="__classificationsText.text", value="sourceTagSyncTimestamp"
        ),
        SpanTerm(field="__classificationsText.text", value="sourceTagValue"),
    ]
    little_spans.append(SpanOr(clauses=span_or_clauses))  # type: ignore

    # Construct big spans: the tag ID and the synced tag's qualified name,
    # delimiting the region in which the little spans must occur.
    big_spans.append(SpanTerm(field="__classificationsText.text", value=tag_id))
    big_spans.append(
        SpanTerm(field="__classificationsText.text", value=synced_tag_qn)
    )

    # Construct final span query: little spans must match (in order, adjacent)
    # inside the region bounded by the big spans (large slop = effectively anywhere).
    span = SpanWithin(
        little=SpanNear(clauses=little_spans, slop=0, in_order=True),
        big=SpanNear(clauses=big_spans, slop=10000000, in_order=True),
    )

    # Without atlan tag propagation
    if directly:
        return (
            FluentSearch()
            .where(Referenceable.ATLAN_TAGS.eq(tag_id))
            .where(span)
            .to_query()
        )
    # With atlan tag propagation
    return (
        FluentSearch()
        .where_some(Referenceable.ATLAN_TAGS.eq(tag_id))
        .where_some(Referenceable.PROPAGATED_ATLAN_TAGS.eq(tag_id))
        .min_somes(1)
        .where(span)
        .to_query()
    )
to_query() -> Query

Translate the Atlan compound query into an Elastic Query object.

:returns: an Elastic Query object that represents the compound query

Source code in pyatlan/model/fluent_search.py
def to_query(self) -> Query:
    """
    Translate the Atlan compound query into an Elastic Query object.

    :returns: an Elastic Query object that represents the compound query
    """
    query = Bool()
    query.filter = self.wheres or []
    query.must_not = self.where_nots or []
    if self.where_somes:
        # 'should' clauses only contribute when at least _min_somes of them match.
        query.should = self.where_somes
        query.minimum_should_match = self._min_somes
    return query
where(query: Query) -> SelfQuery

Add a single criterion that must be present on every search result. (Note: these are translated to filters.)

:param query: the query to set as a criterion that must be present on every search result :returns: the compound query with this additional criterion added

Source code in pyatlan/model/fluent_search.py
def where(self: SelfQuery, query: Query) -> SelfQuery:
    """
    Add a single criterion that must be present on every search result.
    (Note: these are translated to filters.)

    :param query: the query to set as a criterion that must be present on every search result
    :returns: the compound query with this additional criterion added
    """
    # Copy first so the original fluent chain stays untouched.
    updated = self._clone()
    if updated.wheres is None:
        updated.wheres = []
    updated.wheres.append(query)
    return updated
where_not(query: Query) -> SelfQuery

Add a single criterion that must not be present on any search result.

:param query: the query to set as a criterion that must not be present on any search result :returns: the compound query with this additional criterion added

Source code in pyatlan/model/fluent_search.py
def where_not(self: SelfQuery, query: Query) -> SelfQuery:
    """
    Add a single criterion that must not be present on any search result.

    :param query: the query to set as a criterion that must not be present on any search result
    :returns: the compound query with this additional criterion added
    """
    # Copy first so the original fluent chain stays untouched.
    updated = self._clone()
    if updated.where_nots is None:
        updated.where_nots = []
    updated.where_nots.append(query)
    return updated
where_some(query: Query) -> SelfQuery

Add a single criterion at least some of which should be present on each search result. You can control "how many" of the criteria added this way are a minimum for each search result to match through the 'minimum' property.

:param query: the query to set as a criterion some number of which should be present on a search result :returns: the compound query with this additional criterion added

Source code in pyatlan/model/fluent_search.py
def where_some(self: SelfQuery, query: Query) -> SelfQuery:
    """
    Add a single criterion at least some of which should be present on each search result.
    You can control "how many" of the criteria added this way are a minimum for each search
    result to match through the 'minimum' property.

    :param query: the query to set as a criterion some number of which should be present on a search result
    :returns: the compound query with this additional criterion added
    """
    # Copy first so the original fluent chain stays untouched.
    updated = self._clone()
    if updated.where_somes is None:
        updated.where_somes = []
    updated.where_somes.append(query)
    return updated

FluentSearch(wheres: Optional[List[Query]] = None, where_nots: Optional[List[Query]] = None, where_somes: Optional[List[Query]] = None, _min_somes: int = 1, sorts: Optional[List[SortItem]] = None, aggregations: Optional[Dict[str, Aggregation]] = None, _page_size: Optional[int] = None, _includes_on_results: Optional[List[str]] = None, _includes_on_relations: Optional[List[str]] = None, _include_relationship_attributes: Optional[bool] = False, _enable_full_restriction: Optional[bool] = False) dataclass

Bases: CompoundQuery

Class to compose compound queries combining various conditions.

Source code in pyatlan/model/fluent_search.py
def __init__(
    self,
    wheres: Optional[List[Query]] = None,
    where_nots: Optional[List[Query]] = None,
    where_somes: Optional[List[Query]] = None,
    _min_somes: int = 1,
    sorts: Optional[List[SortItem]] = None,
    aggregations: Optional[Dict[str, Aggregation]] = None,
    _page_size: Optional[int] = None,
    _includes_on_results: Optional[List[str]] = None,
    _includes_on_relations: Optional[List[str]] = None,
    _include_relationship_attributes: Optional[bool] = False,
    _enable_full_restriction: Optional[bool] = False,
):
    """Compose a fluent search from compound-query criteria plus search-specific options (sorting, aggregations, paging, and attribute inclusion)."""
    # Compound-query criteria are stored by the parent class.
    super().__init__(wheres, where_nots, where_somes, _min_somes)
    # Search-specific options; plain attribute assignment, no validation here.
    self.sorts = sorts
    self.aggregations = aggregations
    self._page_size = _page_size
    self._includes_on_results = _includes_on_results
    self._includes_on_relations = _includes_on_relations
    self._include_relationship_attributes = _include_relationship_attributes
    self._enable_full_restriction = _enable_full_restriction
Functions
aggregate(key: str, aggregation: Aggregation) -> 'FluentSearch'

Add an aggregation to run against the results of the search. You provide any key you want (you'll use it to look at the results of a specific aggregation).

:param key: you want to use to look at the results of the aggregation :param aggregation: you want to calculate :returns: the fluent search with this aggregation added

Source code in pyatlan/model/fluent_search.py
def aggregate(self, key: str, aggregation: Aggregation) -> "FluentSearch":
    """
    Add an aggregation to run against the results of the search.
    You provide any key you want (you'll use it to look at the results of a specific aggregation).

    :param key: you want to use to look at the results of the aggregation
    :param aggregation: you want to calculate
    :returns: the fluent search with this aggregation added
    """
    # Copy first so the original fluent chain stays untouched.
    updated = self._clone()
    if updated.aggregations is None:
        updated.aggregations = {}
    updated.aggregations[key] = aggregation
    return updated
count(client: AtlanClient) -> int

Return the total number of assets that will match the supplied criteria, using the most minimal query possible (retrieves minimal data).

:param client: client through which to count the assets :returns: the count of assets that will match the supplied criteria

Source code in pyatlan/model/fluent_search.py
def count(self, client: AtlanClient) -> int:
    """
    Return the total number of assets that will match the supplied criteria,
    using the most minimal query possible (retrieves minimal data).

    :param client: client through which to count the assets
    :returns: the count of assets that will match the supplied criteria
    """
    dsl = self._dsl()
    # Only one hit is needed; the total comes from the response metadata.
    dsl.size = 1
    return client.asset.search(IndexSearchRequest(dsl=dsl)).count
enable_full_restriction(enable: bool) -> 'FluentSearch'

Add an attribute to enable full authorization restrictions for search results.

:param enable: when True, applies full authorization restrictions to show only assets the user has access to. By default, this is False and standard access controls apply :returns: the fluent search with this parameter added

Source code in pyatlan/model/fluent_search.py
def enable_full_restriction(self, enable: bool) -> "FluentSearch":
    """
    Add an attribute to enable full authorization restrictions for search results.

    :param enable: when `True`, applies full authorization restrictions to show only assets
    the user has access to. By default, this is `False` and standard access controls apply
    :returns: the fluent search with this parameter added
    """
    clone = self._clone()
    clone._enable_full_restriction = enable
    return clone
execute(client: AtlanClient, bulk: bool = False) -> IndexSearchResults

Run the fluent search to retrieve assets that match the supplied criteria. Note: if the number of results exceeds the predefined threshold (100,000 assets) this will be automatically converted into a bulk search.

:param client: client through which to retrieve the assets. :param bulk: whether to run the search to retrieve assets that match the supplied criteria, for large numbers of results (> 100,000), defaults to False. Note: this will reorder the results (based on creation timestamp) in order to iterate through a large number (more than 100,000) results. :raises InvalidRequestError:

- if bulk search is enabled (`bulk=True`) and any
  user-specified sorting options are found in the search request.
- if bulk search is disabled (`bulk=False`) and the number of results
  exceeds the predefined threshold (i.e: `100,000` assets)
  and any user-specified sorting options are found in the search request.

:raises AtlanError: on any API communication issue :returns: an iterable list of assets that match the supplied criteria, lazily-fetched

Source code in pyatlan/model/fluent_search.py
def execute(self, client: AtlanClient, bulk: bool = False) -> IndexSearchResults:
    """
    Run the fluent search to retrieve assets that match the supplied criteria.
    `Note:` if the number of results exceeds the predefined threshold
    (100,000 assets) this will be automatically converted into a `bulk` search.

    :param client: client through which to retrieve the assets.
    :param bulk: whether to run the search to retrieve assets that match the supplied criteria,
    for large numbers of results (> `100,000`), defaults to `False`. Note: this will reorder the results
    (based on creation timestamp) in order to iterate through a large number (more than `100,000`) results.
    :raises InvalidRequestError:

        - if bulk search is enabled (`bulk=True`) and any
          user-specified sorting options are found in the search request.
        - if bulk search is disabled (`bulk=False`) and the number of results
          exceeds the predefined threshold (i.e: `100,000` assets)
          and any user-specified sorting options are found in the search request.

    :raises AtlanError: on any API communication issue
    :returns: an iterable list of assets that match the supplied criteria, lazily-fetched
    """
    request = self.to_request()
    return client.asset.search(criteria=request, bulk=bulk)
execute_async(client: AsyncAtlanClient, bulk: bool = False) -> AsyncIndexSearchResults async

Run the fluent search asynchronously to retrieve assets that match the supplied criteria. Note: if the number of results exceeds the predefined threshold (100,000 assets) this will be automatically converted into a bulk search.

:param client: async client through which to retrieve the assets. :param bulk: whether to run the search to retrieve assets that match the supplied criteria, for large numbers of results (> 100,000), defaults to False. Note: this will reorder the results (based on creation timestamp) in order to iterate through a large number (more than 100,000) results. :raises InvalidRequestError:

- if bulk search is enabled (`bulk=True`) and any
  user-specified sorting options are found in the search request.
- if bulk search is disabled (`bulk=False`) and the number of results
  exceeds the predefined threshold (i.e: `100,000` assets)
  and any user-specified sorting options are found in the search request.

:raises AtlanError: on any API communication issue :returns: an async iterable list of assets that match the supplied criteria, lazily-fetched

Source code in pyatlan/model/fluent_search.py
async def execute_async(
    self, client: AsyncAtlanClient, bulk: bool = False
) -> AsyncIndexSearchResults:
    """
    Run the fluent search asynchronously to retrieve assets that match the supplied criteria.
    `Note:` if the number of results exceeds the predefined threshold
    (100,000 assets) this will be automatically converted into a `bulk` search.

    :param client: async client through which to retrieve the assets.
    :param bulk: whether to run the search to retrieve assets that match the supplied criteria,
    for large numbers of results (> `100,000`), defaults to `False`. Note: this will reorder the results
    (based on creation timestamp) in order to iterate through a large number (more than `100,000`) results.
    :raises InvalidRequestError:

        - if bulk search is enabled (`bulk=True`) and any
          user-specified sorting options are found in the search request.
        - if bulk search is disabled (`bulk=False`) and the number of results
          exceeds the predefined threshold (i.e: `100,000` assets)
          and any user-specified sorting options are found in the search request.

    :raises AtlanError: on any API communication issue
    :returns: an async iterable list of assets that match the supplied criteria, lazily-fetched
    """
    request = self.to_request()
    return await client.asset.search(criteria=request, bulk=bulk)
include_on_relations(field: Union[str, AtlanField]) -> 'FluentSearch'

Add an attribute to retrieve for each asset related to the assets in the results.

:param field: attribute to retrieve for each related asset of each result :returns: the fluent search with this parameter added

Source code in pyatlan/model/fluent_search.py
def include_on_relations(self, field: Union[str, AtlanField]) -> "FluentSearch":
    """
    Add an attribute to retrieve for each asset related to the assets in the results.

    :param field: attribute to retrieve for each related asset of each result
    :returns: the fluent search with this parameter added
    """
    # Resolve the attribute name up-front, whether given as a field object or string.
    attr_name = field.atlan_field_name if isinstance(field, AtlanField) else field
    clone = self._clone()
    if clone._includes_on_relations is None:
        clone._includes_on_relations = []
    clone._includes_on_relations.append(attr_name)
    return clone
include_on_results(field: Union[str, AtlanField]) -> 'FluentSearch'

Add an attribute to retrieve for each asset in the results.

:param field: attribute to retrieve for each result :returns: the fluent search with this parameter added

Source code in pyatlan/model/fluent_search.py
def include_on_results(self, field: Union[str, AtlanField]) -> "FluentSearch":
    """
    Add an attribute to retrieve for each asset in the results.

    :param field: attribute to retrieve for each result
    :returns: the fluent search with this parameter added
    """
    # Resolve the attribute name up-front, whether given as a field object or string.
    attr_name = field.atlan_field_name if isinstance(field, AtlanField) else field
    clone = self._clone()
    if clone._includes_on_results is None:
        clone._includes_on_results = []
    clone._includes_on_results.append(attr_name)
    return clone
include_relationship_attributes(include: bool) -> 'FluentSearch'

Add an attribute to include relationship attributes on each relationship in the results.

:param include: include relationship attributes on each relationship in the results :returns: the fluent search with this parameter added

Source code in pyatlan/model/fluent_search.py
def include_relationship_attributes(self, include: bool) -> "FluentSearch":
    """
    Add an attribute to include relationship attributes on each relationship in the results.

    :param include: include relationship attributes on each relationship in the results
    :returns: the fluent search with this parameter added
    """
    # When enabling `include_relationship_attributes`
    # it's mandatory to include the "name" field
    # to ensure relationship names are included in the response.
    # Omitting "name" will result in a 500 error from the metastore.
    # NOTE: include_on_relations() already returns a fresh clone, so the
    # previous extra `self._clone()` call here was dead work and is removed.
    clone = self.include_on_relations("name")
    clone._include_relationship_attributes = include
    return clone
page_size(size: int) -> 'FluentSearch'

Set the number of results to retrieve per underlying API request.

:param size: number of results to retrieve per underlying API request :returns: the fluent search with this parameter configured

Source code in pyatlan/model/fluent_search.py
def page_size(self, size: int) -> "FluentSearch":
    """
    Set the number of results to retrieve per underlying API request.

    :param size: number of results to retrieve per underlying API request
    :returns: the fluent search with this parameter configured
    """
    configured = self._clone()
    configured._page_size = size
    return configured
select(include_archived=False) -> 'FluentSearch' classmethod

Start a fluent search that will return all assets. Additional conditions can be chained onto the returned object before any asset retrieval is attempted, ensuring all conditions are pushed-down for optimal retrieval. By default, only active (non-archived) assets will be included; pass include_archived=True to include archived assets as well.

:param include_archived: when True, archived (soft-deleted) assets will be included :returns: a fluent search that includes all assets

Source code in pyatlan/model/fluent_search.py
@classmethod
def select(cls, include_archived=False) -> "FluentSearch":
    """
    Start a fluent search that will return all assets.
    Additional conditions can be chained onto the returned object before any
    asset retrieval is attempted, ensuring all conditions are pushed-down for
    optimal retrieval. By default, only active (non-archived) assets are
    included.

    :param include_archived: when True, archived (soft-deleted) assets will be included
    :returns: a fluent search that includes all assets
    """
    conditions = [Term.with_super_type_names("Asset")]
    if not include_archived:
        # Restrict to active (non-archived) assets unless told otherwise.
        conditions.append(Term.with_state("ACTIVE"))
    return cls(wheres=conditions)
sort(by: SortItem) -> 'FluentSearch'

Add a criterion by which to sort the results.

:param by: criterion by which to sort the results :returns: the fluent search with this sorting criterion added

Source code in pyatlan/model/fluent_search.py
def sort(self, by: SortItem) -> "FluentSearch":
    """
    Add a criterion by which to sort the results.

    :param by: criterion by which to sort the results
    :returns: the fluent search with this sorting criterion added
    """
    clone = self._clone()
    if clone.sorts is None:
        clone.sorts = [by]
    else:
        clone.sorts.append(by)
    return clone
to_request() -> IndexSearchRequest

Translate the Atlan fluent search into an Atlan search request.

:returns: an Atlan search request that encapsulates the fluent search

Source code in pyatlan/model/fluent_search.py
def to_request(self) -> IndexSearchRequest:
    """
    Translate the Atlan fluent search into an Atlan search request.

    :returns: an Atlan search request that encapsulates the fluent search
    """
    dsl = self._dsl()
    # Page size can legitimately be "0", so compare against None, not truthiness.
    if self._page_size is not None:
        dsl.size = self._page_size
    if self.sorts:
        dsl.sort = self.sorts
    if self.aggregations:
        dsl.aggregations.update(self.aggregations)
    request = IndexSearchRequest(dsl=dsl)
    # Copy each optional setting onto the request only when it is truthy.
    optional_settings = (
        ("attributes", self._includes_on_results),
        ("relation_attributes", self._includes_on_relations),
        ("include_relationship_attributes", self._include_relationship_attributes),
        ("enable_full_restriction", self._enable_full_restriction),
    )
    for setting, value in optional_settings:
        if value:
            setattr(request, setting, value)
    return request

Typedef

pyatlan.model.typedef

Classes

AttributeDef(**data)

Bases: AtlanObject

Source code in pyatlan/model/typedef.py
def __init__(self, **data):
    """Initialize via pydantic, then give the nested options a back-reference
    to this attribute definition."""
    super().__init__(**data)
    # Back-reference so option-level logic can reach the owning AttributeDef.
    # NOTE(review): assumes `options` is always populated after pydantic init — confirm.
    self.options._attr_def = self
Attributes
applicable_ai_asset_types: AIAssetTypes property writable

AI asset type names to which this attribute is restricted. Only AI assets of the specified types will have this attribute available.

applicable_asset_types: Union[Set[str], AssetTypes] property writable

Asset type names to which to restrict the attribute. Only assets of one of these types will have this attribute available. To further restrict the assets for this custom metadata by connection, see applicable_connections.

applicable_connections: Set[str] property writable

Qualified names of connections to which to restrict the attribute. Only assets within one of these connections will have this attribute available. To further restrict the types of assets within the connections, see applicable_asset_types.

applicable_domain_types: DomainTypes property writable

Data product type names to which to restrict the attribute. These cover asset types in data products and data domains. Only assets of one of these types will have this attribute available.

applicable_domains: Set[str] property writable

Qualified names of domains to which to restrict the attribute. Only domains and data products within one of these domains will have this attribute available. To further restrict the types of assets within the domains, see applicable_domain_types.

applicable_entity_types: EntityTypes property writable

Set of entities on which this attribute can be applied. Note: generally this should be left as-is. Any overrides should instead be applied through one or more of applicable_asset_types, applicable_glossary_types, or applicable_other_asset_types.

applicable_glossaries: Set[str] property writable

Qualified names of glossaries to which to restrict the attribute. Only glossary assets within one of these glossaries will have this attribute available. To further restrict the types of assets within the glossaries, see applicable_glossary_types.

applicable_glossary_types: GlossaryTypes property writable

Glossary type names to which to restrict the attribute. Only glossary assets of one of these types will have this attribute available. To further restrict the glossary content for this custom metadata by glossary, see applicable_glossaries.

applicable_other_asset_types: OtherAssetTypes property writable

Any other asset type names to which to restrict the attribute. These cover any asset type that is not managed within a connection or a glossary. Only assets of one of these types will have this attribute available.

Functions
create_async(client, display_name: str, attribute_type: AtlanCustomAttributePrimitiveType, multi_valued: bool = False, options_name: Optional[str] = None, applicable_connections: Optional[Set[str]] = None, applicable_asset_types: Optional[Union[Set[str], AssetTypes]] = None, applicable_glossaries: Optional[Set[str]] = None, applicable_glossary_types: Optional[GlossaryTypes] = None, applicable_other_asset_types: Optional[OtherAssetTypes] = None, applicable_domains: Optional[Set[str]] = None, applicable_domain_types: Optional[DomainTypes] = None, applicable_ai_asset_types: Optional[AIAssetTypes] = None, description: Optional[str] = None) -> AttributeDef async staticmethod

Create an AttributeDef with async client support.

:param client: AsyncAtlanClient instance :param display_name: human-readable name for the attribute :param attribute_type: type of the attribute :param multi_valued: whether the attribute can have multiple values :param options_name: name of the enumeration (if type is OPTIONS) :param applicable_connections: connections where this attribute applies :param applicable_asset_types: asset types where this attribute applies :param applicable_glossaries: glossaries where this attribute applies :param applicable_glossary_types: glossary types where this attribute applies :param applicable_other_asset_types: other asset types where this attribute applies :param applicable_domains: domains where this attribute applies :param applicable_domain_types: domain types where this attribute applies :param applicable_ai_asset_types: AI asset types where this attribute applies :param description: description of the attribute :returns: AttributeDef configured for the specified parameters

Source code in pyatlan/model/typedef.py
@staticmethod
async def create_async(
    client,  # AsyncAtlanClient
    display_name: str,
    attribute_type: AtlanCustomAttributePrimitiveType,
    multi_valued: bool = False,
    options_name: Optional[str] = None,
    applicable_connections: Optional[Set[str]] = None,
    applicable_asset_types: Optional[Union[Set[str], AssetTypes]] = None,
    applicable_glossaries: Optional[Set[str]] = None,
    applicable_glossary_types: Optional[GlossaryTypes] = None,
    applicable_other_asset_types: Optional[OtherAssetTypes] = None,
    applicable_domains: Optional[Set[str]] = None,
    applicable_domain_types: Optional[DomainTypes] = None,
    applicable_ai_asset_types: Optional[AIAssetTypes] = None,
    description: Optional[str] = None,
) -> AttributeDef:
    """
    Create an AttributeDef with async client support.

    :param client: AsyncAtlanClient instance
    :param display_name: human-readable name for the attribute
    :param attribute_type: type of the attribute
    :param multi_valued: whether the attribute can have multiple values
    :param options_name: name of the enumeration (if type is OPTIONS)
    :param applicable_connections: connections where this attribute applies
    :param applicable_asset_types: asset types where this attribute applies
    :param applicable_glossaries: glossaries where this attribute applies
    :param applicable_glossary_types: glossary types where this attribute applies
    :param applicable_other_asset_types: other asset types where this attribute applies
    :param applicable_domains: domains where this attribute applies
    :param applicable_domain_types: domain types where this attribute applies
    :param applicable_ai_asset_types: AI asset types where this attribute applies
    :param description: description of the attribute
    :raises InvalidRequestError: if a RICH_TEXT attribute is requested as multi-valued
    :returns: AttributeDef configured for the specified parameters
    """
    from pyatlan.utils import validate_required_fields

    validate_required_fields(
        ["display_name", "attribute_type"],
        [display_name, attribute_type],
    )
    # RichText attributes cannot be multi-valued
    if (
        attribute_type == AtlanCustomAttributePrimitiveType.RICH_TEXT
        and multi_valued
    ):
        raise ErrorCode.INVALID_RICH_TEXT_CREATION.exception_with_parameters(
            display_name
        )

    # Async version of _get_all_qualified_names helper
    async def _get_all_qualified_names_async(asset_type: str):
        from pyatlan.model.assets import Asset
        from pyatlan.model.fluent_search import FluentSearch

        request = (
            FluentSearch.select()
            .where(Asset.TYPE_NAME.eq(asset_type))
            .include_on_results(Asset.QUALIFIED_NAME)
            .to_request()
        )
        results = await client.asset.search(request)
        names = []
        async for result in results:
            names.append(result.qualified_name or "")
        return set(names)

    # Create the AttributeDef - match sync implementation exactly
    attr_def = AttributeDef(
        display_name=display_name,
        options=AttributeDef.Options.create(
            attribute_type=attribute_type, options_name=options_name
        ),
        is_new=True,
        cardinality=Cardinality.SINGLE,
        description=description,
        name="",
        include_in_notification=False,
        is_indexable=True,
        is_optional=True,
        is_unique=False,
        values_min_count=0,
        values_max_count=1,
    )
    add_enum_values = attribute_type == AtlanCustomAttributePrimitiveType.OPTIONS
    # Reuse `add_enum_values` rather than repeating the OPTIONS comparison.
    if add_enum_values:
        base_type = options_name
    elif attribute_type in (
        AtlanCustomAttributePrimitiveType.USERS,
        AtlanCustomAttributePrimitiveType.GROUPS,
        AtlanCustomAttributePrimitiveType.URL,
        AtlanCustomAttributePrimitiveType.SQL,
    ):
        # These pseudo-types are stored as plain strings.
        base_type = AtlanCustomAttributePrimitiveType.STRING.value
    else:
        base_type = attribute_type.value
    if multi_valued:
        attr_def.type_name = f"array<{str(base_type)}>"
        attr_def.options.multi_value_select = True  # type: ignore
    else:
        attr_def.type_name = base_type
    if add_enum_values:
        if enum_def := await client.enum_cache.get_by_name(str(options_name)):
            attr_def.enum_values = enum_def.get_valid_values()
        else:
            attr_def.enum_values = []

    attr_def.applicable_asset_types = applicable_asset_types or _complete_type_list
    attr_def.applicable_glossary_types = (
        applicable_glossary_types or _all_glossary_types
    )
    attr_def.applicable_domain_types = applicable_domain_types or _all_domain_types
    attr_def.applicable_other_asset_types = (
        applicable_other_asset_types or _all_other_types
    )
    attr_def.applicable_connections = (
        applicable_connections or await _get_all_qualified_names_async("Connection")
    )
    attr_def.applicable_glossaries = (
        applicable_glossaries
        or await _get_all_qualified_names_async("AtlasGlossary")
    )
    attr_def.applicable_domains = applicable_domains or _all_domains
    attr_def.applicable_ai_asset_types = applicable_ai_asset_types or set()
    return attr_def

EnumDef

Bases: TypeDef

Classes
ElementDef

Bases: AtlanObject

Functions
extend_elements(current: List[str], new: List[str]) -> List[str] staticmethod

Extends the element definitions without duplications and also retains the order of the current enum values.

:param current: current list of element definitions. :param new: list of new element definitions to be added. :return: list of unique element definitions without duplications.

Source code in pyatlan/model/typedef.py
@staticmethod
def extend_elements(current: List[str], new: List[str]) -> List[str]:
    """
    Extends the element definitions without duplications
    and also retains the order of the current enum values.

    :param current: current list of element definitions.
    :param new: list of new element definitions to be added.
    :return: list of unique element definitions without duplications.
    """
    unique_elements = set(current)
    # Make a copy of current values list
    extended_list = current[:]
    for element in new:
        if element not in unique_elements:
            extended_list.append(element)
            unique_elements.add(element)
    return extended_list
Functions
create(name: str, values: List[str]) -> EnumDef staticmethod

Builds the minimal object necessary to create an enumeration definition.

:param name: display name the human-readable name for the enumeration :param values: the list of additional valid values (as strings) to add to the existing enumeration :returns: the minimal object necessary to create the enumeration typedef

Source code in pyatlan/model/typedef.py
@staticmethod
def create(name: str, values: List[str]) -> EnumDef:
    """
    Builds the minimal object necessary to create an enumeration definition.

    :param name: display name the human-readable name for the enumeration
    :param values: the list of additional valid values
    (as strings) to add to the existing enumeration
    :returns: the minimal object necessary to create the enumeration typedef
    """
    from pyatlan.utils import validate_required_fields

    validate_required_fields(["name", "values"], [name, values])
    # Explicitly set all defaults to ensure
    # inclusion during pydantic serialization
    element_defs = EnumDef.ElementDef.list_from(values)
    return EnumDef(
        name=name,
        category=AtlanTypeCategory.ENUM,
        element_defs=element_defs,
    )
get_valid_values() -> List[str]

Translate the element definitions in this enumeration into simple list of strings.

Source code in pyatlan/model/typedef.py
def get_valid_values(self) -> List[str]:
    """
    Translate the element definitions in this enumeration into a simple list of strings.
    """
    if not self.element_defs:
        return []
    return [element.value for element in self.element_defs]
update(client: AtlanClient, name: str, values: List[str], replace_existing: bool) -> EnumDef staticmethod

Builds the minimal object necessary to update an enumeration definition.

:param client: connectivity to an Atlan tenant :param name: display name the human-readable name for the enumeration :param values: the list of additional valid values (as strings) to add to the existing enumeration :param replace_existing: if True, will replace all existing values in the enumeration with the new ones; or if False the new ones will be appended to the existing set :returns: the minimal object necessary to update the enumeration typedef

Source code in pyatlan/model/typedef.py
@staticmethod
def update(
    client: AtlanClient, name: str, values: List[str], replace_existing: bool
) -> EnumDef:
    """
    Builds the minimal object necessary to update an enumeration definition.

    :param client: connectivity to an Atlan tenant
    :param name: display name the human-readable name for the enumeration
    :param values: the list of additional valid values
    (as strings) to add to the existing enumeration
    :param replace_existing: if `True`, will replace all
    existing values in the enumeration with the new ones;
    or if `False` the new ones will be appended to the existing set
    :returns: the minimal object necessary to update the enumeration typedef
    """
    from pyatlan.utils import validate_required_fields

    validate_required_fields(
        ["name", "values", "replace_existing"],
        [name, values, replace_existing],
    )
    if replace_existing:
        update_values = values
    else:
        # Merge onto the current values, preserving their order
        # and dropping duplicates.
        existing = client.enum_cache.get_by_name(str(name)).get_valid_values()
        update_values = EnumDef.ElementDef.extend_elements(
            current=existing, new=values
        )
    return EnumDef(
        name=name,
        category=AtlanTypeCategory.ENUM,
        element_defs=EnumDef.ElementDef.list_from(update_values),
    )
update_async(client, name: str, values: List[str], replace_existing: bool) -> EnumDef async staticmethod

Builds the minimal object necessary to update an enumeration definition.

:param client: connectivity to an Atlan tenant :param name: display name the human-readable name for the enumeration :param values: the list of additional valid values (as strings) to add to the existing enumeration :param replace_existing: if True, will replace all existing values in the enumeration with the new ones; or if False the new ones will be appended to the existing set :returns: the minimal object necessary to update the enumeration typedef

Source code in pyatlan/model/typedef.py
@staticmethod
async def update_async(
    client, name: str, values: List[str], replace_existing: bool
) -> EnumDef:
    """
    Builds the minimal object necessary to update an enumeration definition.

    :param client: connectivity to an Atlan tenant
    :param name: display name the human-readable name for the enumeration
    :param values: the list of additional valid values
    (as strings) to add to the existing enumeration
    :param replace_existing: if `True`, will replace all
    existing values in the enumeration with the new ones;
    or if `False` the new ones will be appended to the existing set
    :returns: the minimal object necessary to update the enumeration typedef
    """
    from pyatlan.utils import validate_required_fields

    validate_required_fields(
        ["name", "values", "replace_existing"],
        [name, values, replace_existing],
    )
    if replace_existing:
        update_values = values
    else:
        # Merge onto the current values, preserving their order
        # and dropping duplicates.
        enum_def = await client.enum_cache.get_by_name(str(name))
        update_values = EnumDef.ElementDef.extend_elements(
            current=enum_def.get_valid_values(), new=values
        )
    return EnumDef(
        name=name,
        category=AtlanTypeCategory.ENUM,
        element_defs=EnumDef.ElementDef.list_from(update_values),
    )

Audit

pyatlan.model.audit

Classes

AuditSearchRequest(__pydantic_self__, **data: Any)

Bases: SearchRequest

Class from which to configure a search against Atlan's activity log.

Source code in pyatlan/model/audit.py
def __init__(__pydantic_self__, **data: Any) -> None:
    """
    Initialize the request, stamping this class's name onto the supplied
    DSL when one is given without a `req_class_name` already set.
    """
    dsl = data.get("dsl")
    class_name = __pydantic_self__.__class__.__name__
    # Only rebuild the DSL when an actual DSL instance was passed in and it
    # does not already carry a request class name; exclude_unset preserves
    # which fields the caller explicitly provided.
    if dsl and isinstance(dsl, DSL) and not dsl.req_class_name:
        data["dsl"] = DSL(req_class_name=class_name, **dsl.dict(exclude_unset=True))
    super().__init__(**data)
Functions
by_guid(guid: str, *, size: int = 10, _from: int = 0, sort: Union[SortItem, List[SortItem]] = LATEST_FIRST) -> 'AuditSearchRequest' classmethod

Create an audit search request for the last changes to an asset, by its GUID. :param guid: unique identifier of the asset for which to retrieve the audit history :param size: number of changes to retrieve :param _from: starting point for paging. Defaults to 0 (very first result) if not overridden :param sort: sorting criteria for the results. Defaults to LATEST_FIRST(sorting by "created" in desc order). :returns: an AuditSearchRequest that can be used to perform the search

Source code in pyatlan/model/audit.py
@classmethod
def by_guid(
    cls,
    guid: str,
    *,
    size: int = 10,
    _from: int = 0,
    sort: Union[SortItem, List[SortItem]] = LATEST_FIRST,
) -> "AuditSearchRequest":
    """
    Create an audit search request for the last changes to an asset, by its GUID.

    :param guid: unique identifier of the asset for which to retrieve the audit history
    :param size: number of changes to retrieve
    :param _from: starting point for paging. Defaults to 0 (very first result) if not overridden
    :param sort: sorting criteria for the results. Defaults to LATEST_FIRST (sorting by "created" in desc order).
    :returns: an AuditSearchRequest that can be used to perform the search
    """
    dsl = DSL(
        query=Bool(filter=[Term(field="entityId", value=guid)]),
        # Pass the caller's sorting straight through: the previous
        # `sort if LATEST_FIRST else []` tested the module-level constant
        # (always truthy), so it always evaluated to `sort` anyway.
        sort=sort,
        size=size,
        _from=_from,
    )
    return AuditSearchRequest(dsl=dsl)
by_qualified_name(type_name: str, qualified_name: str, *, size: int = 10, _from: int = 0, sort: Union[SortItem, List[SortItem]] = LATEST_FIRST) -> 'AuditSearchRequest' classmethod

Create an audit search request for the last changes to an asset, by its qualifiedName. :param type_name: the type of asset for which to retrieve the audit history :param qualified_name: unique name of the asset for which to retrieve the audit history :param size: number of changes to retrieve :param _from: starting point for paging. Defaults to 0 (very first result) if not overridden :param sort: sorting criteria for the results. Defaults to LATEST_FIRST(sorting by "created" in desc order). :returns: an AuditSearchRequest that can be used to perform the search

Source code in pyatlan/model/audit.py
@classmethod
def by_qualified_name(
    cls,
    type_name: str,
    qualified_name: str,
    *,
    size: int = 10,
    _from: int = 0,
    sort: Union[SortItem, List[SortItem]] = LATEST_FIRST,
) -> "AuditSearchRequest":
    """
    Create an audit search request for the last changes to an asset, by its qualifiedName.

    :param type_name: the type of asset for which to retrieve the audit history
    :param qualified_name: unique name of the asset for which to retrieve the audit history
    :param size: number of changes to retrieve
    :param _from: starting point for paging. Defaults to 0 (very first result) if not overridden
    :param sort: sorting criteria for the results. Defaults to LATEST_FIRST (sorting by "created" in desc order).
    :returns: an AuditSearchRequest that can be used to perform the search
    """
    dsl = DSL(
        query=Bool(
            must=[
                Term(field="entityQualifiedName", value=qualified_name),
                Term(field="typeName", value=type_name),
            ]
        ),
        # Pass the caller's sorting straight through: the previous
        # `sort if LATEST_FIRST else []` tested the module-level constant
        # (always truthy), so it always evaluated to `sort` anyway.
        sort=sort,
        size=size,
        _from=_from,
    )
    return AuditSearchRequest(dsl=dsl)
by_user(user: str, *, size: int = 10, _from: int = 0, sort: Union[SortItem, List[SortItem]] = LATEST_FIRST) -> 'AuditSearchRequest' classmethod

Create an audit search request for the last changes to an asset, by a given user. :param user: the name of the user for which to look for any changes :param size: number of changes to retrieve :param _from: starting point for paging. Defaults to 0 (very first result) if not overridden :param sort: sorting criteria for the results. Defaults to LATEST_FIRST(sorting by "created" in desc order). :returns: an AuditSearchRequest that can be used to perform the search

Source code in pyatlan/model/audit.py
@classmethod
def by_user(
    cls,
    user: str,
    *,
    size: int = 10,
    _from: int = 0,
    sort: Union[SortItem, List[SortItem]] = LATEST_FIRST,
) -> "AuditSearchRequest":
    """
    Create an audit search request for the last changes to an asset, by a given user.

    :param user: the name of the user for which to look for any changes
    :param size: number of changes to retrieve
    :param _from: starting point for paging. Defaults to 0 (very first result) if not overridden
    :param sort: sorting criteria for the results. Defaults to LATEST_FIRST (sorting by "created" in desc order).
    :returns: an AuditSearchRequest that can be used to perform the search
    """
    dsl = DSL(
        query=Bool(filter=[Term(field="user", value=user)]),
        # Pass the caller's sorting straight through: the previous
        # `sort if LATEST_FIRST else []` tested the module-level constant
        # (always truthy), so it always evaluated to `sort` anyway.
        sort=sort,
        size=size,
        _from=_from,
    )
    return AuditSearchRequest(dsl=dsl)

AuditSearchResults(client: ApiCaller, criteria: AuditSearchRequest, start: int, size: int, entity_audits: List[EntityAudit], count: int, bulk: bool = False, aggregations: Optional[Aggregation] = None)

Bases: Iterable

Captures the response from a search against Atlan's activity log.

Source code in pyatlan/model/audit.py
def __init__(
    self,
    client: ApiCaller,
    criteria: AuditSearchRequest,
    start: int,
    size: int,
    entity_audits: List[EntityAudit],
    count: int,
    bulk: bool = False,
    aggregations: Optional[Aggregation] = None,
):
    """
    Capture the response from a search against Atlan's activity log.

    :param client: API caller through which further pages are fetched
    :param criteria: the search request that produced these results
    :param start: starting offset of this page of results
    :param size: page size used for the search
    :param entity_audits: the audit entries on the current page
    :param count: approximate total number of matching entries
    :param bulk: whether the search runs in bulk (timestamp-paged) mode
    :param aggregations: any aggregation results returned by the search
    """
    self._client = client
    self._endpoint = AUDIT_SEARCH
    self._criteria = criteria
    self._start = start
    self._size = size
    self._entity_audits = entity_audits
    self._count = count
    self._approximate_count = count
    self._bulk = bulk
    self._aggregations = aggregations
    # Creation-time markers used by timestamp-based paging.
    # NOTE(review): -2 appears to be a "not yet populated" sentinel — confirm.
    self._first_record_creation_time = -2
    self._last_record_creation_time = -2
    # Event keys already seen, used to de-duplicate entries across pages in bulk mode.
    self._processed_entity_keys: Set[str] = set()
Functions
current_page() -> List[EntityAudit]

Retrieve the current page of results.

:returns: list of assets on the current page of results

Source code in pyatlan/model/audit.py
def current_page(self) -> List[EntityAudit]:
    """
    Retrieve the page of results most recently fetched.

    :returns: list of audit entries on the current page of results
    """
    return self._entity_audits
next_page(start=None, size=None) -> bool

Indicates whether there is a next page of results.

:returns: True if there is a next page of results, otherwise False

Source code in pyatlan/model/audit.py
def next_page(self, start=None, size=None) -> bool:
    """
    Advance to (and fetch) the next page of results.

    :param start: optional offset at which to start the next page; defaults
        to the current offset advanced by the current page size
    :param size: optional override for the page size
    :returns: True if a next page of results was fetched, otherwise False
    """
    self._start = start or self._start + self._size
    is_bulk_search = (
        self._bulk or self._approximate_count > self._MASS_EXTRACT_THRESHOLD
    )
    if size:
        self._size = size

    if is_bulk_search:
        # Used in the "timestamp-based" paging approach
        # to check if audit `entity.event_key` has already been processed
        # in a previous page of results.
        # If it has, then exclude it from the current results;
        # otherwise, we may encounter duplicate audit entity records.
        self._processed_entity_keys.update(
            entity.event_key for entity in self._entity_audits
        )
    return self._get_next_page() if self._entity_audits else False
presorted_by_timestamp(sorts: Optional[List[SortItem]]) -> bool staticmethod

Checks if the sorting options prioritize creation time in ascending order. :param sorts: list of sorting options or None. :returns: True if sorting is already prioritized by creation time, False otherwise.

Source code in pyatlan/model/audit.py
@staticmethod
def presorted_by_timestamp(sorts: Optional[List[SortItem]]) -> bool:
    """
    Checks if the sorting options prioritize creation time in ascending order.

    :param sorts: list of sorting options or None.
    :returns: True if sorting is already prioritized by creation time, False otherwise.
    """
    if not sorts:
        return False
    primary = sorts[0]
    if not isinstance(primary, SortItem):
        return False
    return primary.field == "created" and primary.order == SortOrder.ASCENDING
sort_by_timestamp_first(sorts: List[SortItem]) -> List[SortItem] staticmethod

Rewrites the sorting options to ensure that sorting by creation time, ascending, is the top priority. Adds this condition if it does not already exist, or moves it up to the top sorting priority if it does already exist in the list.

:param sorts: list of sorting options :returns: sorting options, making sorting by creation time in ascending order the top priority

Source code in pyatlan/model/audit.py
@staticmethod
def sort_by_timestamp_first(sorts: List[SortItem]) -> List[SortItem]:
    """
    Rewrites the sorting options to ensure that sorting by creation time,
    ascending, is the top priority. Adds this condition if it does not
    already exist, or moves it up to the top sorting priority if it does
    already exist in the list.

    :param sorts: list of sorting options
    :returns: sorting options, making sorting by
    creation time in ascending order the top priority
    """
    by_creation_asc = [SortItem("created", order=SortOrder.ASCENDING)]
    if not sorts:
        return by_creation_asc
    # Drop any existing creation-time criterion; it is re-added at the front.
    creation_field = Asset.CREATE_TIME.internal_field_name
    retained = [
        item for item in sorts if (not item.field) or (item.field != creation_field)
    ]
    return by_creation_asc + retained

CustomMetadataAttributesAuditDetail

Bases: AtlanObject

Capture the attributes and values for custom metadata as tracked through the audit log.

EntityAudit

Bases: AtlanObject

Detailed entry in the audit log. These objects should be treated as immutable.

Lineage

pyatlan.model.lineage

Classes

FluentLineage(*, starting_guid: StrictStr, depth: StrictInt = 1000000, direction: LineageDirection = LineageDirection.DOWNSTREAM, size: StrictInt = 10, exclude_meanings: StrictBool = True, exclude_atlan_tags: StrictBool = True, immediate_neighbors: StrictBool = False, includes_on_results: Optional[Union[List[str], str, List[AtlanField], AtlanField]] = None, includes_in_results: Optional[Union[List[LineageFilter], LineageFilter]] = None, includes_on_relations: Optional[Union[List[str], str, List[AtlanField], AtlanField]] = None, includes_condition: FilterList.Condition = FilterList.Condition.AND, where_assets: Optional[Union[List[LineageFilter], LineageFilter]] = None, assets_condition: FilterList.Condition = FilterList.Condition.AND, where_relationships: Optional[Union[List[LineageFilter], LineageFilter]] = None, relationships_condition: FilterList.Condition = FilterList.Condition.AND)

Lineage abstraction mechanism, to simplify the most common lineage requests against Atlan (removing the need to understand the guts of Elastic).

Create a FluentLineage request.

:param starting_guid: unique identifier (GUID) of the asset from which to start lineage :param depth: number of degrees of separation (hops) across which lineage should be fetched :param direction: direction of lineage to fetch (upstream or downstream) :param size: number of results to retrieve :param exclude_meanings: whether to include assigned terms for assets (False) or not (True) :param exclude_atlan_tags: whether to include Atlan tags for assets (False) or not (True) :param immediate_neighbors: whether to include immediate neighbors of the starting asset in the response (True), or not (False). :param includes_on_results: attributes to retrieve for each asset in the lineage results :param includes_in_results: assets to include in the results. Any assets not matching these filters will not be included in the results, but will still be traversed in the lineage so that any assets beyond them are still considered for inclusion in the results. :param includes_on_relations: attributes to retrieve for each asset related to the assets in the results (for internal use, unchecked!). :param includes_condition: whether the includes_in_results criteria should be combined (AND) or any matching is sufficient (OR) :param where_assets: filters to apply on assets. Any assets excluded by the filters will exclude all assets beyond, as well :param assets_condition: whether the where_assets criteria should be combined (AND) or any matching is sufficient (OR) :param where_relationships: filters to apply on relationships. Any relationships excluded by the filters will exclude all assets and relationships beyond, as well :param relationships_condition: whether the where_relationships criteria should be combined (AND) or any matching is sufficient (OR)

Source code in pyatlan/model/lineage.py
@validate_arguments(config=dict(arbitrary_types_allowed=True))
def __init__(
    self,
    *,
    starting_guid: StrictStr,
    depth: StrictInt = 1000000,
    direction: LineageDirection = LineageDirection.DOWNSTREAM,
    size: StrictInt = 10,
    exclude_meanings: StrictBool = True,
    exclude_atlan_tags: StrictBool = True,
    immediate_neighbors: StrictBool = False,
    includes_on_results: Optional[
        Union[List[str], str, List[AtlanField], AtlanField]
    ] = None,
    includes_in_results: Optional[Union[List[LineageFilter], LineageFilter]] = None,
    includes_on_relations: Optional[
        Union[List[str], str, List[AtlanField], AtlanField]
    ] = None,
    includes_condition: FilterList.Condition = FilterList.Condition.AND,
    where_assets: Optional[Union[List[LineageFilter], LineageFilter]] = None,
    assets_condition: FilterList.Condition = FilterList.Condition.AND,
    where_relationships: Optional[Union[List[LineageFilter], LineageFilter]] = None,
    relationships_condition: FilterList.Condition = FilterList.Condition.AND,
):
    """
    Create a FluentLineage request.

    :param starting_guid: unique identifier (GUID) of the asset from which to start lineage
    :param depth: number of degrees of separation (hops) across which lineage should be fetched
    :param direction: direction of lineage to fetch (upstream or downstream)
    :param size: number of results to retrieve
    :param exclude_meanings: whether to include assigned terms for assets (False) or not (True)
    :param exclude_atlan_tags: whether to include Atlan tags for assets (False) or not (True)
    :param immediate_neighbors: whether to include immediate neighbors of the
        starting asset in the response (True), or not (False)
    :param includes_on_results: attributes to retrieve for each asset in the lineage results
    :param includes_in_results: assets to include in the results. Any assets not
        matching these filters will not be included in the results, but will still
        be traversed in the lineage so that any assets beyond them are still
        considered for inclusion in the results
    :param includes_on_relations: attributes to retrieve for each asset related
        to the assets in the results (for internal use, unchecked!)
    :param includes_condition: whether the `includes_in_results` criteria
        should be combined (AND) or any matching is sufficient (OR)
    :param where_assets: filters to apply on assets. Any assets excluded
        by the filters will exclude all assets beyond, as well
    :param assets_condition: whether the `where_assets` criteria
        should be combined (AND) or any matching is sufficient (OR)
    :param where_relationships: filters to apply on relationships. Any relationships
        excluded by the filters will exclude all assets and relationships beyond, as well
    :param relationships_condition: whether the `where_relationships` criteria
        should be combined (AND) or any matching is sufficient (OR)
    """
    # Assignments follow the parameter order above; scalar options first.
    self._starting_guid = starting_guid
    self._depth: int = depth
    self._direction: LineageDirection = direction
    self._size: int = size
    self._exclude_meanings: bool = exclude_meanings
    self._exclude_atlan_tags: bool = exclude_atlan_tags
    self._immediate_neighbors: bool = immediate_neighbors
    # List-valued options are normalized via _to_list so single values
    # and lists are handled uniformly.
    self._includes_on_results: List[Union[str, AtlanField]] = self._to_list(
        includes_on_results
    )
    self._includes_in_results: List[LineageFilter] = self._to_list(
        includes_in_results
    )
    self._includes_on_relations: List[Union[str, AtlanField]] = self._to_list(
        includes_on_relations
    )
    self._includes_condition: FilterList.Condition = includes_condition
    self._where_assets: List[LineageFilter] = self._to_list(where_assets)
    self._assets_condition: FilterList.Condition = assets_condition
    self._where_relationships: List[LineageFilter] = self._to_list(
        where_relationships
    )
    self._relationships_condition: FilterList.Condition = relationships_condition
Attributes
request: LineageListRequest property

:returns: the LineageListRequest that encapsulates information specified in this FluentLineage

Functions
assets_condition(assets_condition: FilterList.Condition) -> 'FluentLineage'

Adds the filter condition to where_assets.

:param assets_condition: whether the where_assets criteria should be combined (AND) or any matching is sufficient (OR) :returns: the FluentLineage with this assets_condition criterion added

Source code in pyatlan/model/lineage.py
def assets_condition(
    self, assets_condition: FilterList.Condition
) -> "FluentLineage":
    """
    Sets how the `where_assets` filter criteria are combined.

    :param assets_condition: whether the `where_assets`
    criteria should be combined (AND) or any matching is sufficient (OR)
    :returns: the FluentLineage with this assets_condition criterion added
    """
    validate_type(
        name="assets_condition", _type=FilterList.Condition, value=assets_condition
    )
    result = self._clone()
    result._assets_condition = assets_condition
    return result
depth(depth: StrictInt) -> 'FluentLineage'

Adds the depth to traverse the lineage.

:param depth: number of degrees of separation (hops) across which lineage should be fetched :returns: the FluentLineage with this depth criterion added

Source code in pyatlan/model/lineage.py
def depth(self, depth: StrictInt) -> "FluentLineage":
    """
    Sets the depth to which the lineage should be traversed.

    :param depth: number of degrees of separation (hops) across which lineage should be fetched
    :returns: the FluentLineage with this depth criterion added
    """
    validate_type(name="depth", _type=int, value=depth)
    result = self._clone()
    result._depth = depth
    return result
direction(direction: LineageDirection) -> 'FluentLineage'

Adds the direction to traverse the lineage.

:param direction: direction of lineage to fetch (upstream or downstream) :returns: the FluentLineage with this direction criterion added

Source code in pyatlan/model/lineage.py
def direction(self, direction: LineageDirection) -> "FluentLineage":
    """
    Sets the direction in which the lineage should be traversed.

    :param direction: direction of lineage to fetch (upstream or downstream)
    :returns: the FluentLineage with this direction criterion added
    """
    validate_type(name="direction", _type=LineageDirection, value=direction)
    result = self._clone()
    result._direction = direction
    return result
exclude_atlan_tags(exclude_atlan_tags: StrictBool) -> 'FluentLineage'

Adds the exclude_atlan_tags to traverse the lineage.

:param exclude_atlan_tags: whether to include Atlan tags for assets (False) or not (True) :returns: the FluentLineage with this exclude_atlan_tags criterion added

Source code in pyatlan/model/lineage.py
def exclude_atlan_tags(self, exclude_atlan_tags: StrictBool) -> "FluentLineage":
    """
    Sets whether Atlan tags should be excluded from the lineage results.

    :param exclude_atlan_tags: whether to include Atlan tags for assets (False) or not (True)
    :returns: the FluentLineage with this exclude_atlan_tags criterion added
    """
    validate_type(name="exclude_atlan_tags", _type=bool, value=exclude_atlan_tags)
    result = self._clone()
    result._exclude_atlan_tags = exclude_atlan_tags
    return result
exclude_meanings(exclude_meanings: StrictBool) -> 'FluentLineage'

Adds the exclude_meanings to traverse the lineage.

:param exclude_meanings: whether to include assigned terms for assets (False) or not (True) :returns: the FluentLineage with this exclude_meanings criterion added

Source code in pyatlan/model/lineage.py
def exclude_meanings(self, exclude_meanings: StrictBool) -> "FluentLineage":
    """
    Sets whether assigned terms should be excluded from the lineage results.

    :param exclude_meanings: whether to include assigned terms for assets (False) or not (True)
    :returns: the FluentLineage with this exclude_meanings criterion added
    """
    validate_type(name="exclude_meanings", _type=bool, value=exclude_meanings)
    result = self._clone()
    result._exclude_meanings = exclude_meanings
    return result
immediate_neighbors(immediate_neighbors: StrictBool) -> 'FluentLineage'

Adds the immediate_neighbors to traverse the lineage.

:param immediate_neighbors: whether to include immediate neighbors of the starting asset in the response (True), or not (False) :returns: the FluentLineage with this immediate_neighbors criterion added

Source code in pyatlan/model/lineage.py
def immediate_neighbors(self, immediate_neighbors: StrictBool) -> "FluentLineage":
    """
    Sets whether immediate neighbors of the starting asset
    should be included in the response.

    :param immediate_neighbors: whether to include immediate neighbors
    of the starting asset in the response (True), or not (False)
    :returns: the FluentLineage with this immediate_neighbors criterion added
    """
    validate_type(
        name="immediate_neighbors", _type=bool, value=immediate_neighbors
    )
    result = self._clone()
    result._immediate_neighbors = immediate_neighbors
    return result
include_in_results(lineage_filter: LineageFilter) -> 'FluentLineage'

Adds the include_in_results filter to traverse the lineage.

:param lineage_filter: Assets to include in the results. Any assets not matching these filters will not be included in the results, but will still be traversed in the lineage so that any assets beyond them are still considered for inclusion in the results :returns: the FluentLineage with this include_in_results criterion added

Source code in pyatlan/model/lineage.py
def include_in_results(self, lineage_filter: LineageFilter) -> "FluentLineage":
    """
    Adds a filter deciding which assets appear in the lineage results.

    :param lineage_filter: assets to include in the results.
        Any assets not matching this filter will not be
        included in the results, but will still be traversed
        in the lineage so that any assets beyond them are still
        considered for inclusion in the results
    :returns: the FluentLineage with this include_in_results criterion added
    """
    validate_type(name="lineage_filter", _type=LineageFilter, value=lineage_filter)
    result = self._clone()
    result._includes_in_results.append(lineage_filter)
    return result
include_on_relations(field: Union[str, AtlanField]) -> 'FluentLineage'

Adds the include_on_relations to traverse the lineage.

:param field: attributes to retrieve for each asset related to the assets in the results (for internal use, unchecked!) :returns: the FluentLineage with this include_on_relations criterion added

Source code in pyatlan/model/lineage.py
def include_on_relations(self, field: Union[str, AtlanField]) -> "FluentLineage":
    """
    Adds an attribute to retrieve for each asset related
    to the assets in the results.

    :param field: attributes to retrieve for each asset related
    to the assets in the results (for internal use, unchecked!)
    :returns: the FluentLineage with this include_on_relations criterion added
    """
    validate_type(name="field", _type=(str, AtlanField), value=field)
    result = self._clone()
    result._includes_on_relations.append(field)
    return result
include_on_results(field: Union[str, AtlanField]) -> 'FluentLineage'

Adds the include_on_results to traverse the lineage.

:param field: attributes to retrieve for each asset in the lineage results :returns: the FluentLineage with this include_on_results criterion added

Source code in pyatlan/model/lineage.py
def include_on_results(self, field: Union[str, AtlanField]) -> "FluentLineage":
    """
    Adds an attribute to retrieve for each asset in the lineage results.

    :param field: attributes to retrieve for each asset in the lineage results
    :returns: the FluentLineage with this include_on_results criterion added
    """
    validate_type(name="field", _type=(str, AtlanField), value=field)
    result = self._clone()
    result._includes_on_results.append(field)
    return result
includes_condition(includes_condition: FilterList.Condition) -> 'FluentLineage'

Adds the filter condition to includes_in_results.

:param includes_condition: whether the includes_in_results criteria should be combined (AND) or any matching is sufficient (OR) :returns: the FluentLineage with this includes_condition criterion added

Source code in pyatlan/model/lineage.py
def includes_condition(
    self, includes_condition: FilterList.Condition
) -> "FluentLineage":
    """
    Sets how the `includes_in_results` filter criteria are combined.

    :param includes_condition: whether the `includes_in_results`
    criteria should be combined (AND) or any matching is sufficient (OR)
    :returns: the FluentLineage with this includes_condition criterion added
    """
    validate_type(
        name="includes_condition",
        _type=FilterList.Condition,
        value=includes_condition,
    )
    result = self._clone()
    result._includes_condition = includes_condition
    return result
relationships_condition(relationships_condition: FilterList.Condition) -> 'FluentLineage'

Adds the filter condition to where_relationships.

:param relationships_condition: whether the where_relationships criteria should be combined (AND) or any matching is sufficient (OR) :returns: the FluentLineage with this relationships_condition criterion added

Source code in pyatlan/model/lineage.py
def relationships_condition(
    self, relationships_condition: FilterList.Condition
) -> "FluentLineage":
    """
    Sets how the `where_relationships` filter criteria are combined.

    :param relationships_condition: whether the `where_relationships`
    criteria should be combined (AND) or any matching is sufficient (OR)
    :returns: the FluentLineage with this relationships_condition criterion added
    """
    validate_type(
        name="relationships_condition",
        _type=FilterList.Condition,
        value=relationships_condition,
    )
    result = self._clone()
    result._relationships_condition = relationships_condition
    return result
size(size: StrictInt) -> 'FluentLineage'

Adds the size to traverse the lineage.

:param size: number of results to retrieve :returns: the FluentLineage with this size criterion added

Source code in pyatlan/model/lineage.py
def size(self, size: StrictInt) -> "FluentLineage":
    """
    Sets the number of lineage results to retrieve.

    :param size: number of results to retrieve
    :returns: the FluentLineage with this size criterion added
    """
    validate_type(name="size", _type=int, value=size)
    result = self._clone()
    result._size = size
    return result
where_assets(lineage_filter: LineageFilter) -> 'FluentLineage'

Adds a filter to apply on assets.

:param lineage_filter: a filter to apply on assets. Any assets excluded by the filters will exclude all assets beyond, as well :returns: the FluentLineage with this where_assets criterion added

Source code in pyatlan/model/lineage.py
def where_assets(self, lineage_filter: LineageFilter) -> "FluentLineage":
    """
    Adds a filter to apply on assets.

    :param lineage_filter: a filter to apply on assets.
    Any assets excluded by the filters will exclude all assets beyond, as well
    :returns: the FluentLineage with this where_assets criterion added
    """
    validate_type(name="lineage_filter", _type=LineageFilter, value=lineage_filter)
    result = self._clone()
    result._where_assets.append(lineage_filter)
    return result
where_relationships(lineage_filter: LineageFilter) -> 'FluentLineage'

Filters to apply on relationships.

:param lineage_filter: any relationships excluded by the filter will exclude all assets and relationships beyond, as well. :returns: the FluentLineage with this where_relationships criterion added

Source code in pyatlan/model/lineage.py
def where_relationships(self, lineage_filter: LineageFilter) -> "FluentLineage":
    """
    Adds a filter to apply on relationships.

    :param lineage_filter: any relationships excluded
    by the filter will exclude all assets and relationships beyond, as well.
    :returns: the FluentLineage with this where_relationships criterion added
    """
    validate_type(name="lineage_filter", _type=LineageFilter, value=lineage_filter)
    result = self._clone()
    result._where_relationships.append(lineage_filter)
    return result

Functions

Custom Metadata

pyatlan.model.custom_metadata

Classes

CustomMetadataDict(client: AtlanClient, name: str)

Bases: UserDict

This class allows the manipulation of a set of custom metadata attributes using the human-readable names.

Inits CustomMetadataDict with a string containing the human-readable name of a set of custom metadata

Source code in pyatlan/model/custom_metadata.py
def __init__(self, client: AtlanClient, name: str):
    """Inits CustomMetadataDict with a string containing the human-readable name of a set of custom metadata"""
    super().__init__()
    self._name = name
    self._modified = False
    self._client = client
    cache = self._client.custom_metadata_cache
    set_id = cache.get_id_for_name(name)
    # Valid property names for this set: every non-archived attribute.
    self._names = {
        attr_name
        for attr_id, attr_name in cache.map_attr_id_to_name[set_id].items()
        if not cache.is_attr_archived(attr_id=attr_id)
    }
Attributes
business_attributes: Dict[str, Any] property

Returns a dict containing the metadata set with the human-readable set name and property names resolved to their internal values

modified property

Returns a boolean indicating whether the set has been modified from its initial values

Functions
clear_all()

This method will set all the properties available explicitly to None

Source code in pyatlan/model/custom_metadata.py
def clear_all(self):
    """This method will set all the properties available explicitly to None"""
    # dict.fromkeys maps every valid property name to None in one pass.
    self.data.update(dict.fromkeys(self._names))
    self._modified = True
clear_unset()

This method will set all properties that haven't been set to None

Source code in pyatlan/model/custom_metadata.py
def clear_unset(self):
    """This method will set all properties that haven't been set to None"""
    # setdefault only fills names that are absent, leaving set values intact.
    for attribute_name in self.attribute_names:
        self.data.setdefault(attribute_name, None)
get_deleted_sentinel() -> 'CustomMetadataDict' classmethod

Will return a CustomMetadataDict that is a sentinel object to represent deleted custom metadata.

Source code in pyatlan/model/custom_metadata.py
@classmethod
def get_deleted_sentinel(cls) -> "CustomMetadataDict":
    """Will return a CustomMetadataDict that is a sentinel object to represent deleted custom metadata."""
    if cls._sentinel is None:
        # Because __new__ is being invoked directly __init__ won't be invoked
        return cls.__new__(cls, DELETED_SENTINEL)
    return cls._sentinel
is_set(key: str)

Returns a boolean indicating whether the given property has been set in the metadata set. The key will be validated to ensure that it's a valid property name for this metadata set

Source code in pyatlan/model/custom_metadata.py
def is_set(self, key: str):
    """Returns a boolean indicating whether the given property has been set in the metadata set. The key
    will be validated to ensure that it's a valid property name for this metadata set
    """
    if key in self._names:
        return key in self.data
    raise KeyError(f"'{key}' is not a valid property name for {self._name}")

Structs

pyatlan.model.structs

Classes

Action

Bases: AtlanObject

Description

AppWorkflowRunStep

Bases: AtlanObject

Description

AssetExternalDQMetadata

Bases: AtlanObject

Description

AssetExternalDQScoreBreakdownByDimension

Bases: AtlanObject

Description

AssetExternalDQTestDetails

Bases: AtlanObject

Description

AssetExternalDQTestMetric

Bases: AtlanObject

Description

AssetExternalDQTestRunHistory

Bases: AtlanObject

Description

AssetGCPDataplexAspectMetadata

Bases: AtlanObject

Description

AssetGCPDataplexMetadata

Bases: AtlanObject

Description

AssetHistogram

Bases: AtlanObject

Description

AssetSmusMetadataFormDetails

Bases: AtlanObject

Description

AssetV1ExternalDQTestRule

Bases: AtlanObject

Description

AssetV1ExternalDQTestScoreDimension

Bases: AtlanObject

Description

AtlanAppErrorHandling

Bases: AtlanObject

Description

AtlanObject

Bases: BaseModel

Functions
flatten_structs_attributes(values: Dict[str, Any]) -> Dict[str, Any]

Flatten the 'attributes' of the struct models.

:param values: dictionary containing the attributes. :returns: modified dictionary with attributes flattened.

Source code in pyatlan/model/structs.py
@root_validator(pre=True)
def flatten_structs_attributes(cls, values: Dict[str, Any]) -> Dict[str, Any]:
    """
    Flatten the 'attributes' of the struct models.

    :param values: dictionary containing the attributes.
    :returns: modified dictionary with attributes flattened.
    """
    # Nested keys win over top-level duplicates, matching dict-merge order.
    nested = values.pop("attributes", {})
    return {**values, **nested}

AuthPolicyCondition

Bases: AtlanObject

Description

AuthPolicyValiditySchedule

Bases: AtlanObject

Description

AwsCloudWatchMetric

Bases: AtlanObject

Description

AwsTag

Bases: AtlanObject

Description

AzureTag

Bases: AtlanObject

Description

BadgeCondition

Bases: AtlanObject

Description

BusinessPolicyRule

Bases: AtlanObject

Description

ColumnValueFrequencyMap

Bases: AtlanObject

Description

DataQualityRuleConfigArguments

Bases: AtlanObject

Description

DataQualityRuleTemplateConfig

Bases: AtlanObject

Description

DataQualityRuleThresholdObject

Bases: AtlanObject

Description

DatabricksAIModelVersionMetric

Bases: AtlanObject

Description

DbtInputContext

Bases: AtlanObject

Description

DbtJobRun

Bases: AtlanObject

Description

DbtMetricFilter

Bases: AtlanObject

Description

FormField

Bases: AtlanObject

Description

GoogleLabel

Bases: AtlanObject

Description

GoogleTag

Bases: AtlanObject

Description

Histogram

Bases: AtlanObject

Description

IcebergPartition

Bases: AtlanObject

Description

KafkaTopicConsumption

Bases: AtlanObject

Description

MCRuleComparison

Bases: AtlanObject

Description

MCRuleSchedule

Bases: AtlanObject

Description

PopularityInsights

Bases: AtlanObject

Description

ResponseValue

Bases: AtlanObject

Description

SQLProcedureArgument

Bases: AtlanObject

Description

SQLProcedureReturn

Bases: AtlanObject

Description

SourceTagAttachment

Bases: AtlanObject

Description

Functions
by_name(client: AtlanClient, name: SourceTagName, source_tag_values: List[SourceTagAttachmentValue], source_tag_sync_timestamp: Optional[datetime] = None, is_source_tag_synced: Optional[bool] = None, source_tag_sync_error: Optional[str] = None) classmethod

Create a source-synced tag attachment with a particular value when the attachment is synced to the source.

:param client: connectivity to an Atlan tenant :param name: unique name of the source tag in Atlan :param source_tag_values: value of the tag attachment from the source :param is_source_tag_synced: whether the tag attachment has been synced at the source (True) or not (False) :param source_tag_sync_timestamp: time (epoch) when the tag attachment was synced at the source, in milliseconds :param source_tag_sync_error: error message if the tag attachment sync at the source failed :returns: a SourceTagAttachment with the provided information :raises AtlanError: on any error communicating via the underlying APIs :raises NotFoundError: if the source-synced tag cannot be resolved

Source code in pyatlan/model/structs.py
@classmethod
def by_name(
    cls,
    client: AtlanClient,
    name: SourceTagName,
    source_tag_values: List[SourceTagAttachmentValue],
    source_tag_sync_timestamp: Optional[datetime] = None,
    is_source_tag_synced: Optional[bool] = None,
    source_tag_sync_error: Optional[str] = None,
):
    """
    Create a source-synced tag attachment with
    a particular value when the attachment is synced to the source.

    :param client: connectivity to an Atlan tenant
    :param name: unique name of the source tag in Atlan
    :param source_tag_values: value of the tag attachment from the source
    :param is_source_tag_synced: whether the tag attachment has been synced at the source (True) or not (False)
    :param source_tag_sync_timestamp: time (epoch) when the tag attachment was synced at the source, in milliseconds
    :param source_tag_sync_error: error message if the tag attachment sync at the source failed
    :returns: a SourceTagAttachment with the provided information
    :raises AtlanError: on any error communicating via the underlying APIs
    :raises NotFoundError: if the source-synced tag cannot be resolved
    """
    tag = client.source_tag_cache.get_by_name(name)
    connector = AtlanConnectorType._get_connector_type_from_qualified_name(
        tag.qualified_name or ""
    )
    # Only forward the optional fields the caller actually set.
    optional_fields = select_optional_set_fields(
        dict(
            is_source_tag_synced=is_source_tag_synced,
            source_tag_sync_timestamp=source_tag_sync_timestamp,
            source_tag_sync_error=source_tag_sync_error,
        )
    )
    return cls.of(
        source_tag_name=tag.name,
        source_tag_qualified_name=tag.qualified_name,
        source_tag_guid=tag.guid,
        source_tag_connector_name=connector,
        source_tag_values=source_tag_values,
        **optional_fields,
    )
by_name_async(client: AsyncAtlanClient, name: AsyncSourceTagName, source_tag_values: List[SourceTagAttachmentValue], source_tag_sync_timestamp: Optional[datetime] = None, is_source_tag_synced: Optional[bool] = None, source_tag_sync_error: Optional[str] = None) async classmethod

Async version of by_name that creates a source-synced tag attachment with a particular value when the attachment is synced to the source.

:param client: async connectivity to an Atlan tenant :param name: unique name of the source tag in Atlan :param source_tag_values: value of the tag attachment from the source :param is_source_tag_synced: whether the tag attachment has been synced at the source (True) or not (False) :param source_tag_sync_timestamp: time (epoch) when the tag attachment was synced at the source, in milliseconds :param source_tag_sync_error: error message if the tag attachment sync at the source failed :returns: a SourceTagAttachment with the provided information :raises AtlanError: on any error communicating via the underlying APIs :raises NotFoundError: if the source-synced tag cannot be resolved

Source code in pyatlan/model/structs.py
@classmethod
async def by_name_async(
    cls,
    client: AsyncAtlanClient,
    name: AsyncSourceTagName,
    source_tag_values: List[SourceTagAttachmentValue],
    source_tag_sync_timestamp: Optional[datetime] = None,
    is_source_tag_synced: Optional[bool] = None,
    source_tag_sync_error: Optional[str] = None,
):
    """
    Async version of by_name that creates a source-synced tag attachment with
    a particular value when the attachment is synced to the source.

    :param client: async connectivity to an Atlan tenant
    :param name: unique name of the source tag in Atlan
    :param source_tag_values: value of the tag attachment from the source
    :param is_source_tag_synced: whether the tag attachment has been synced at the source (True) or not (False)
    :param source_tag_sync_timestamp: time (epoch) when the tag attachment was synced at the source, in milliseconds
    :param source_tag_sync_error: error message if the tag attachment sync at the source failed
    :returns: a SourceTagAttachment with the provided information
    :raises AtlanError: on any error communicating via the underlying APIs
    :raises NotFoundError: if the source-synced tag cannot be resolved
    """
    tag = await client.source_tag_cache.get_by_name(name)
    connector = AtlanConnectorType._get_connector_type_from_qualified_name(
        tag.qualified_name or ""
    )
    # Only forward the optional fields the caller actually set.
    optional_fields = select_optional_set_fields(
        dict(
            is_source_tag_synced=is_source_tag_synced,
            source_tag_sync_timestamp=source_tag_sync_timestamp,
            source_tag_sync_error=source_tag_sync_error,
        )
    )
    return cls.of(
        source_tag_name=tag.name,
        source_tag_qualified_name=tag.qualified_name,
        source_tag_guid=tag.guid,
        source_tag_connector_name=connector,
        source_tag_values=source_tag_values,
        **optional_fields,
    )
by_qualified_name(client: AtlanClient, source_tag_qualified_name: str, source_tag_values: List[SourceTagAttachmentValue], source_tag_sync_timestamp: Optional[datetime] = None, is_source_tag_synced: Optional[bool] = None, source_tag_sync_error: Optional[str] = None) classmethod

Create a source-synced tag attachment with a particular value when the attachment is synced to the source.

:param client: connectivity to an Atlan tenant :param source_tag_qualified_name: unique name of the source tag in Atlan :param source_tag_values: value of the tag attachment from the source :param is_source_tag_synced: whether the tag attachment has been synced at the source (True) or not (False) :param source_tag_sync_timestamp: time (epoch) when the tag attachment was synced at the source, in milliseconds :param source_tag_sync_error: error message if the tag attachment sync at the source failed :returns: a SourceTagAttachment with the provided information :raises AtlanError: on any error communicating via the underlying APIs :raises NotFoundError: if the source-synced tag cannot be resolved

Source code in pyatlan/model/structs.py
@classmethod
def by_qualified_name(
    cls,
    client: AtlanClient,
    source_tag_qualified_name: str,
    source_tag_values: List[SourceTagAttachmentValue],
    source_tag_sync_timestamp: Optional[datetime] = None,
    is_source_tag_synced: Optional[bool] = None,
    source_tag_sync_error: Optional[str] = None,
):
    """
    Build a source-synced tag attachment carrying a particular value,
    for an attachment that is synced to the source.

    :param client: connectivity to an Atlan tenant
    :param source_tag_qualified_name: unique name of the source tag in Atlan
    :param source_tag_values: value of the tag attachment from the source
    :param source_tag_sync_timestamp: time (epoch) when the tag attachment was synced at the source, in milliseconds
    :param is_source_tag_synced: whether the tag attachment has been synced at the source (True) or not (False)
    :param source_tag_sync_error: error message if the tag attachment sync at the source failed
    :returns: a SourceTagAttachment with the provided information
    :raises AtlanError: on any error communicating via the underlying APIs
    :raises NotFoundError: if the source-synced tag cannot be resolved
    """
    # Resolve the tag through the client-side cache to obtain its name and GUID.
    resolved_tag = client.source_tag_cache.get_by_qualified_name(
        source_tag_qualified_name
    )
    connector = AtlanConnectorType._get_connector_type_from_qualified_name(
        source_tag_qualified_name or ""
    )
    # Forward only the optional fields that were explicitly provided.
    optional_fields = select_optional_set_fields(
        dict(
            is_source_tag_synced=is_source_tag_synced,
            source_tag_sync_timestamp=source_tag_sync_timestamp,
            source_tag_sync_error=source_tag_sync_error,
        )
    )
    return cls.of(
        source_tag_name=resolved_tag.name,
        source_tag_qualified_name=source_tag_qualified_name,
        source_tag_guid=resolved_tag.guid,
        source_tag_connector_name=connector,
        source_tag_values=source_tag_values,
        **optional_fields,
    )
of(source_tag_name: Optional[str] = None, source_tag_qualified_name: Optional[str] = None, source_tag_guid: Optional[str] = None, source_tag_connector_name: Optional[str] = None, source_tag_values: Optional[List[SourceTagAttachmentValue]] = None, is_source_tag_synced: Optional[bool] = None, source_tag_sync_timestamp: Optional[datetime] = None, source_tag_sync_error: Optional[str] = None) classmethod

Quickly create a new SourceTagAttachment.

:param source_tag_name: simple name of the source tag :param source_tag_qualified_name: unique name of the source tag in Atlan :param source_tag_guid: unique identifier (GUID) of the source tag in Atlan :param source_tag_connector_name: connector that is the source of the tag :param source_tag_values: value of the tag attachment from the source :param is_source_tag_synced: whether the tag attachment has been synced at the source (True) or not (False) :param source_tag_sync_timestamp: time (epoch) when the tag attachment was synced at the source, in milliseconds :param source_tag_sync_error: error message if the tag attachment sync at the source failed :returns: a SourceTagAttachment with the provided information

Source code in pyatlan/model/structs.py
@classmethod
def of(
    cls,
    source_tag_name: Optional[str] = None,
    source_tag_qualified_name: Optional[str] = None,
    source_tag_guid: Optional[str] = None,
    source_tag_connector_name: Optional[str] = None,
    source_tag_values: Optional[List[SourceTagAttachmentValue]] = None,
    is_source_tag_synced: Optional[bool] = None,
    source_tag_sync_timestamp: Optional[datetime] = None,
    source_tag_sync_error: Optional[str] = None,
):
    """
    Quickly create a new SourceTagAttachment.

    :param source_tag_name: simple name of the source tag
    :param source_tag_qualified_name: unique name of the source tag in Atlan
    :param source_tag_guid: unique identifier (GUID) of the source tag in Atlan
    :param source_tag_connector_name: connector that is the source of the tag
    :param source_tag_values: value of the tag attachment from the source
    :param is_source_tag_synced: whether the tag attachment has been synced at the source (True) or not (False)
    :param source_tag_sync_timestamp: time (epoch) when the tag attachment was synced at the source, in milliseconds
    :param source_tag_sync_error: error message if the tag attachment sync at the source failed
    :returns: a SourceTagAttachment with the provided information
    """
    # NOTE: the struct's field is the singular `source_tag_value`, populated
    # here from the plural `source_tag_values` parameter.
    candidate_fields = {
        "source_tag_name": source_tag_name,
        "source_tag_qualified_name": source_tag_qualified_name,
        "source_tag_guid": source_tag_guid,
        "source_tag_connector_name": source_tag_connector_name,
        "source_tag_value": source_tag_values,
        "is_source_tag_synced": is_source_tag_synced,
        "source_tag_sync_timestamp": source_tag_sync_timestamp,
        "source_tag_sync_error": source_tag_sync_error,
    }
    # Drop any fields that were not explicitly set before constructing.
    return SourceTagAttachment(**select_optional_set_fields(candidate_fields))

SourceTagAttachmentValue

Bases: AtlanObject

Description

SourceTagAttribute

Bases: AtlanObject

Description

StarredDetails

Bases: AtlanObject

Description

Functions

Enums

pyatlan.model.enums

Classes

AtlanConnectorType

Bases: str, Enum

Functions
get_connector_name(qualified_name: str, attribute_name: str = 'connection_qualified_name', qualified_name_len: int = 3)

Extracts and returns the connector name from a given qualified name.

:param qualified_name: qualified name to extract the connector name from.
:param attribute_name: name of the attribute. Defaults to connection_qualified_name.
:param qualified_name_len: expected length of the split qualified name. Defaults to 3.
:raises: ValueError if the qualified name is invalid or the connector type is not recognized.
:returns: connector name extracted from the qualified name, or tuple(connector qualified name, connector name).

Source code in pyatlan/model/enums.py
def get_connector_name(
    qualified_name: str,
    attribute_name: str = "connection_qualified_name",
    qualified_name_len: int = 3,
):
    """
    Extract the connector name from a qualified name.

    :param qualified_name: qualified name to extract the connector name from.
    :param attribute_name: name of the attribute. Defaults to `connection_qualified_name`.
    :param qualified_name_len: expected length of the split qualified name. Defaults to `3`.
    :raises: `ValueError` if the qualified name is invalid or the connector type is not recognized.
    :returns: connector name extracted from the qualified name
    or tuple(connector qualified name, connector name).
    """
    # A valid qualified name has a fixed number of "/"-separated segments.
    parts = qualified_name.split("/")
    if len(parts) != qualified_name_len:
        raise ValueError(f"Invalid {attribute_name}")

    raw_connector = parts[1]
    try:
        # Known connector: resolve directly through the enum.
        connector_name = AtlanConnectorType(raw_connector).value  # type: ignore
    except ValueError:
        # Unknown connector: register a custom connector type instead,
        # converting the kebab-case value to UPPER_SNAKE_CASE for the enum name.
        custom = AtlanConnectorType.CREATE_CUSTOM(
            name=raw_connector.replace("-", "_").upper(),
            value=raw_connector,
        )
        connector_name = custom.value
    if attribute_name != "connection_qualified_name":
        # For nested attributes, also return the connection's own qualified
        # name (always the first three segments).
        connection_qn = "/".join(parts[:3])
        return connection_qn, connector_name
    return connector_name

OpenLineageEventType

Bases: Enum

Current transition of the run state. Each run is required to issue exactly one START event and exactly one of the [COMPLETE, ABORT, FAIL] events.

Packages

pyatlan.model.packages

Classes

APITokenConnectionAdmin()

Bases: AbstractCustomPackage

Base configuration for a new API token connection admin package.

Source code in pyatlan/model/packages/base/custom_package.py
def __init__(
    self,
):
    """
    Initialize the package configuration, recording the current epoch
    timestamp used to uniquely name artifacts created by this package run.
    """
    super().__init__()
    self._epoch = int(utils.get_epoch_timestamp())
Functions
config(connection_qualified_name: str, api_token_guid: str) -> APITokenConnectionAdmin

Set up the API token connection admin with the specified configuration.

:param connection_qualified_name: connection qualified name to which you want to add the API token as a connection admin. :param api_token_guid: guid of the API token

:returns: package, with the specified configuration.

Source code in pyatlan/model/packages/api_token_connection_admin.py
def config(
    self, connection_qualified_name: str, api_token_guid: str
) -> APITokenConnectionAdmin:
    """
    Configure the API token connection admin package.

    :param connection_qualified_name: connection qualified name
    to which you want to add the API token as a connection admin.
    :param api_token_guid: guid of the API token

    :returns: package, with the specified configuration.
    """
    # Register both workflow parameters in a single call.
    self._parameters.extend(
        [
            {
                "name": "connection_qualified_name",
                "value": connection_qualified_name,
            },
            {"name": "api_token_guid", "value": api_token_guid},
        ]
    )
    return self

AssetExportBasic()

Bases: AbstractCustomPackage

Base configuration for the Asset Export package.

Source code in pyatlan/model/packages/asset_export_basic.py
def __init__(
    self,
):
    """
    Initialize the package with no delivery type, export scope,
    or workflow parameters selected yet.
    """
    super().__init__()
    self._email_addresses = None  # recipients, when delivery is via email
    self._delivery_type = None  # DIRECT, EMAIL or CLOUD once a delivery method is chosen
    self._export_scope = None  # which assets to export (e.g. ALL, GLOSSARIES_ONLY)
    self._parameters = []  # workflow parameters accumulated by the builder methods
Functions
adls(client_id: str, client_secret: str, tenant_id: str, account_name: str, container: str) -> AssetExportBasic

Set up package to export to Azure Data Lake Storage.

:param client_id: unique application (client) ID assigned by Azure AD when the app was registered :param client_secret: client secret for authentication :param tenant_id: unique ID of the Azure Active Directory instance :param account_name: name of the storage account :param container: container to upload the export file to

:returns: package, set up to export metadata to ADLS

Source code in pyatlan/model/packages/asset_export_basic.py
def adls(
    self,
    client_id: str,
    client_secret: str,
    tenant_id: str,
    account_name: str,
    container: str,
) -> AssetExportBasic:
    """
    Configure the package to export to Azure Data Lake Storage.

    :param client_id: unique application (client) ID assigned by Azure AD when the app was registered
    :param client_secret: client secret for authentication
    :param tenant_id: unique ID of the Azure Active Directory instance
    :param account_name: name of the storage account
    :param container: container to upload the export file to

    :returns: package, set up to export metadata to ADLS
    """
    # Build the credential payload first, then merge it into the body.
    credentials = {
        "name": f"csa-{self._NAME}-{self._epoch}-0",
        "auth_type": "adls",
        "username": client_id,
        "password": client_secret,
        "extra": {
            "azure_tenant_id": tenant_id,
            "storage_account_name": account_name,
            "adls_container": container,
        },
        "connector_config_name": "csa-connectors-objectstore",
    }
    self._credentials_body.update(credentials)
    return self
all_assets(prefix: str, include_description: Optional[bool] = None, include_glossaries: Optional[bool] = None, include_data_products: Optional[bool] = None, include_archived: Optional[bool] = None) -> AssetExportBasic

Set up the package to export all assets.

:param prefix: Starting value for a qualifiedName that will determine which assets to export. :param include_description: Whether to extract only user-entered description (false), or to also include system-level description (true). :param include_glossaries: Whether glossaries (and their terms and categories) should be exported, too. :param include_data_products: Whether data products (and their domains) should be exported, too. :param include_archived: Whether to include archived assets in the export (true) or only active assets (false).

:returns: package, set up to export all assets

Source code in pyatlan/model/packages/asset_export_basic.py
def all_assets(
    self,
    prefix: str,
    include_description: Optional[bool] = None,
    include_glossaries: Optional[bool] = None,
    include_data_products: Optional[bool] = None,
    include_archived: Optional[bool] = None,
) -> AssetExportBasic:
    """
    Configure the package to export all assets.

    :param prefix: Starting value for a qualifiedName that will determine which assets to export.
    :param include_description: Whether to extract only user-entered description (false), or to also include
    system-level description (true).
    :param include_glossaries: Whether glossaries (and their terms and
    categories) should be exported, too.
    :param include_data_products: Whether data products (and their domains)
    should be exported, too.
    :param include_archived: Whether to include archived assets in the export (true) or
    only active assets (false).

    :returns: package, set up to export all assets
    """
    self._export_scope = "ALL"
    self._parameters.append({"name": "export_scope", "value": self._export_scope})
    # Only explicitly-provided optional values are added as parameters.
    self._add_optional_params(
        {
            "qn_prefix": prefix,
            "include_description": include_description,
            "include_glossaries": include_glossaries,
            "include_products": include_data_products,
            "include_archived": include_archived,
        }
    )
    return self
direct() -> AssetExportBasic

Set up the package to deliver the export via direct download.

:returns: package, set up to deliver the export via direct download

Source code in pyatlan/model/packages/asset_export_basic.py
def direct(self) -> AssetExportBasic:
    """
    Configure the package to make the export available as a direct download.

    :returns: package, set up to deliver the export via direct download
    """
    # Record the delivery choice, then emit the shared delivery parameters.
    self._delivery_type = "DIRECT"
    self._add_delivery_parameters()
    return self
email(email_addresses: List[str]) -> AssetExportBasic

Set up the package to deliver the export via email.

:param email_addresses: List of email addresses to send the export to.

:returns: package, set up to deliver the export via email

Source code in pyatlan/model/packages/asset_export_basic.py
def email(self, email_addresses: List[str]) -> AssetExportBasic:
    """
    Configure the package to send the export out via email.

    :param email_addresses: List of email addresses to send the export to.

    :returns: package, set up to deliver the export via email
    """
    # Store the recipients, record the delivery choice, then emit the
    # shared delivery parameters.
    self._email_addresses = email_addresses
    self._delivery_type = "EMAIL"
    self._add_delivery_parameters()
    return self
enriched_only(prefix: str, include_description: Optional[bool] = None, include_glossaries: Optional[bool] = None, include_data_products: Optional[bool] = None, include_archived: Optional[bool] = None) -> AssetExportBasic

Set up the package to export only enriched assets.

:param prefix: Starting value for a qualifiedName that will determine which assets to export. :param include_description: Whether to extract only user-entered description (false), or to also include system-level description (true). :param include_glossaries: Whether glossaries (and their terms and categories) should be exported, too. :param include_data_products: Whether data products (and their domains) should be exported, too. :param include_archived: Whether to include archived assets in the export (true) or only active assets (false).

:returns: package, set up to export only enriched assets

Source code in pyatlan/model/packages/asset_export_basic.py
def enriched_only(
    self,
    prefix: str,
    include_description: Optional[bool] = None,
    include_glossaries: Optional[bool] = None,
    include_data_products: Optional[bool] = None,
    include_archived: Optional[bool] = None,
) -> AssetExportBasic:
    """
    Configure the package to export only assets that have been enriched.

    :param prefix: Starting value for a qualifiedName that will determine which assets to export.
    :param include_description: Whether to extract only user-entered description (false), or to also include
    system-level description (true).
    :param include_glossaries: Whether glossaries (and their terms and
    categories) should be exported, too.
    :param include_data_products: Whether data products (and their domains)
    should be exported, too.
    :param include_archived: Whether to include archived assets in the export (true) or
    only active assets (false).

    :returns: package, set up to export only enriched assets
    """
    self._export_scope = "ENRICHED_ONLY"
    self._parameters.append({"name": "export_scope", "value": self._export_scope})
    # Only explicitly-provided optional values are added as parameters.
    self._add_optional_params(
        {
            "qn_prefix": prefix,
            "include_description": include_description,
            "include_glossaries": include_glossaries,
            "include_products": include_data_products,
            "include_archived": include_archived,
        }
    )
    return self
gcs(project_id: str, service_account_json: str, bucket: str) -> AssetExportBasic

Set up package to export to Google Cloud Storage.

:param project_id: ID of GCP project :param service_account_json: service account credentials in JSON format :param bucket: bucket to upload the export file to

:returns: package, set up to export metadata to GCS

Source code in pyatlan/model/packages/asset_export_basic.py
def gcs(
    self, project_id: str, service_account_json: str, bucket: str
) -> AssetExportBasic:
    """
    Configure the package to export to Google Cloud Storage.

    :param project_id: ID of GCP project
    :param service_account_json: service account credentials in JSON format
    :param bucket: bucket to upload the export file to

    :returns: package, set up to export metadata to GCS
    """
    # Build the credential payload first, then merge it into the body.
    credentials = {
        "name": f"csa-{self._NAME}-{self._epoch}-0",
        "auth_type": "gcs",
        "username": project_id,
        "password": service_account_json,
        "extra": {
            "gcs_bucket": bucket,
        },
        "connector_config_name": "csa-connectors-objectstore",
    }
    self._credentials_body.update(credentials)
    return self
glossaries_only(include_archived: Optional[bool] = None) -> AssetExportBasic

Set up the package to export only glossaries.

:param include_archived: Whether to include archived assets in the export (true) or only active assets (false).

:returns: package, set up to export only glossaries

Source code in pyatlan/model/packages/asset_export_basic.py
def glossaries_only(
    self, include_archived: Optional[bool] = None
) -> AssetExportBasic:
    """
    Configure the package to export glossaries (and their terms and categories) only.

    :param include_archived: Whether to include archived assets in the export (true) or only active assets (false).

    :returns: package, set up to export only glossaries
    """
    self._export_scope = "GLOSSARIES_ONLY"
    # Register the scope and the archived-assets flag together.
    self._parameters.extend(
        [
            {"name": "export_scope", "value": self._export_scope},
            {"name": "include_archived", "value": include_archived},
        ]
    )
    return self
object_store(prefix: Optional[str] = None) -> AssetExportBasic

Set up the package to export to an object storage location.

:param prefix: The directory (path) within the object store to upload the exported file.

:returns: package, set up to export metadata to an object store

Source code in pyatlan/model/packages/asset_export_basic.py
def object_store(self, prefix: Optional[str] = None) -> AssetExportBasic:
    """
    Configure the package to upload the export to an object storage location.

    :param prefix: The directory (path) within the object store to upload the exported file.

    :returns: package, set up to export metadata to an object store
    """
    self._delivery_type = "CLOUD"
    self._add_delivery_parameters()
    # The credential GUID placeholder is resolved by the workflow at runtime.
    self._parameters.extend(
        [
            {"name": "target_prefix", "value": prefix},
            {"name": "cloud_target", "value": "{{credentialGuid}}"},
        ]
    )
    return self
products_only(include_archived: Optional[bool] = None) -> AssetExportBasic

Set up the package to export only data products.

:param include_archived: Whether to include archived assets in the export (true) or only active assets (false).

:returns: package, set up to export only data products

Source code in pyatlan/model/packages/asset_export_basic.py
def products_only(
    self, include_archived: Optional[bool] = None
) -> AssetExportBasic:
    """
    Configure the package to export data products (and their domains) only.

    :param include_archived: Whether to include archived assets in the export (true) or only active assets (false).

    :returns: package, set up to export only data products
    """
    self._export_scope = "PRODUCTS_ONLY"
    # Register the scope and the archived-assets flag together.
    self._parameters.extend(
        [
            {"name": "export_scope", "value": self._export_scope},
            {"name": "include_archived", "value": include_archived},
        ]
    )
    return self
s3(access_key: str, secret_key: str, bucket: str, region: str) -> AssetExportBasic

Set up package to export to S3.

:param access_key: AWS access key :param secret_key: AWS secret key :param bucket: S3 bucket to upload the export file to :param region: AWS region

:returns: package, set up to export metadata to S3

Source code in pyatlan/model/packages/asset_export_basic.py
def s3(
    self,
    access_key: str,
    secret_key: str,
    bucket: str,
    region: str,
) -> AssetExportBasic:
    """
    Configure the package to export to Amazon S3.

    :param access_key: AWS access key
    :param secret_key: AWS secret key
    :param bucket: S3 bucket to upload the export file to
    :param region: AWS region

    :returns: package, set up to export metadata to S3
    """
    # Build the credential payload first, then merge it into the body.
    credentials = {
        "name": f"csa-{self._NAME}-{self._epoch}-0",
        "auth_type": "s3",
        "username": access_key,
        "password": secret_key,
        "extra": {
            "region": region,
            "s3_bucket": bucket,
        },
        "connector_config_name": "csa-connectors-objectstore",
    }
    self._credentials_body.update(credentials)
    return self

AssetImport()

Bases: AbstractCustomPackage

Base configuration for a new Asset Import package.

Source code in pyatlan/model/packages/asset_import.py
def __init__(
    self,
):
    """
    Initialize the package; advanced-configuration flags start disabled
    and are enabled by the corresponding *_advanced() builder methods.
    """
    self._assets_advanced = False
    self._glossaries_advanced = False
    self._data_product_advanced = False
    super().__init__()
Functions
adls(client_id: str, client_secret: str, tenant_id: str, account_name: str, container: str) -> AssetImport

Set up package to import metadata from ADLS.

:param client_id: unique application (client) ID assigned by Azure AD when the app was registered :param client_secret: client secret for authentication :param tenant_id: unique ID of the Azure Active Directory instance :param account_name: name of the storage account :param container: container to retrieve object store objects from

:returns: package, set up to import metadata from ADLS

Source code in pyatlan/model/packages/asset_import.py
def adls(
    self,
    client_id: str,
    client_secret: str,
    tenant_id: str,
    account_name: str,
    container: str,
) -> AssetImport:
    """
    Configure the package to read its input files from Azure Data Lake Storage.

    :param client_id: unique application (client) ID assigned by Azure AD when the app was registered
    :param client_secret: client secret for authentication
    :param tenant_id: unique ID of the Azure Active Directory instance
    :param account_name: name of the storage account
    :param container: container to retrieve object store objects from

    :returns: package, set up to import metadata from ADLS
    """
    # Merge the ADLS credential payload directly into the credentials body.
    self._credentials_body.update(
        {
            "name": f"csa-{self._NAME}-{self._epoch}-0",
            "auth_type": "adls",
            "username": client_id,
            "password": client_secret,
            "extra": {
                "azure_tenant_id": tenant_id,
                "storage_account_name": account_name,
                "adls_container": container,
            },
            "connector_config_name": "csa-connectors-objectstore",
        }
    )
    return self
assets(prefix: str, object_key: str, input_handling: AssetInputHandling = AssetInputHandling.UPDATE) -> AssetImport

Set up package to import assets.

:param prefix: directory (path) within the object store from which to retrieve the file containing asset metadata :param object_key: object key (filename), including its extension, within the object store and prefix :param input_handling: specifies whether to allow the creation of new assets from the input CSV (full or partial assets) or only update existing assets in Atlan

:returns: package, configured to import assets

Source code in pyatlan/model/packages/asset_import.py
def assets(
    self,
    prefix: str,
    object_key: str,
    input_handling: AssetInputHandling = AssetInputHandling.UPDATE,
) -> AssetImport:
    """
    Configure the package to import assets.

    :param prefix: directory (path) within the object store from
        which to retrieve the file containing asset metadata
    :param object_key: object key (filename),
        including its extension, within the object store and prefix
    :param input_handling: specifies whether to allow the creation
        of new assets from the input CSV (full or partial assets)
        or only update existing assets in Atlan

    :returns: package, configured to import assets
    """
    # Register the three asset-import parameters together.
    self._parameters.extend(
        [
            {"name": "assets_prefix", "value": prefix},
            {"name": "assets_key", "value": object_key},
            {"name": "assets_upsert_semantic", "value": input_handling},
        ]
    )
    return self
assets_advanced(remove_attributes: Optional[Union[List[str], List[AtlanField]]] = None, fail_on_errors: Optional[bool] = None, case_sensitive_match: Optional[bool] = None, is_table_view_agnostic: Optional[bool] = None, field_separator: Optional[str] = None, batch_size: Optional[int] = None) -> AssetImport

Set up package to import assets with advanced configuration.

:param remove_attributes: list of attributes to clear (remove) from assets if their value is blank in the provided file. :param fail_on_errors: specifies whether an invalid value in a field should cause the import to fail (True) or log a warning, skip that value, and proceed (False). :param case_sensitive_match: indicates whether to use case-sensitive matching when running in update-only mode (True) or to try case-insensitive matching (False). :param is_table_view_agnostic: specifies whether to treat tables, views, and materialized views as interchangeable (True) or to strictly adhere to specified types in the input (False). :param field_separator: character used to separate fields in the input file (e.g., ',' or ';'). :param batch_size: maximum number of rows to process at a time (per API request).

:returns: package, configured to import assets with advanced configuration.

Source code in pyatlan/model/packages/asset_import.py
def assets_advanced(
    self,
    remove_attributes: Optional[Union[List[str], List[AtlanField]]] = None,
    fail_on_errors: Optional[bool] = None,
    case_sensitive_match: Optional[bool] = None,
    is_table_view_agnostic: Optional[bool] = None,
    field_separator: Optional[str] = None,
    batch_size: Optional[int] = None,
) -> AssetImport:
    """
    Configure advanced options for importing assets.

    :param remove_attributes: list of attributes to clear (remove)
        from assets if their value is blank in the provided file.
    :param fail_on_errors: specifies whether an invalid value
        in a field should cause the import to fail (`True`) or
        log a warning, skip that value, and proceed (`False`).
    :param case_sensitive_match: indicates whether to use
        case-sensitive matching when running in update-only mode (`True`)
        or to try case-insensitive matching (`False`).
    :param is_table_view_agnostic: specifies whether to treat
        tables, views, and materialized views as interchangeable (`True`)
        or to strictly adhere to specified types in the input (`False`).
    :param field_separator: character used to separate
        fields in the input file (e.g., ',' or ';').
    :param batch_size: maximum number of rows
        to process at a time (per API request).

    :returns: package, configured to import
        assets with advanced configuration.
    """
    # Accept either plain attribute names or AtlanField objects;
    # normalize the latter to their Atlan attribute names.
    if isinstance(remove_attributes, list) and all(
        isinstance(attr, AtlanField) for attr in remove_attributes
    ):
        remove_attributes = [attr.atlan_field_name for attr in remove_attributes]  # type: ignore
    # Serialize the attribute list compactly; None values are filtered
    # out by _add_optional_params.
    self._add_optional_params(
        {
            "assets_attr_to_overwrite": dumps(
                remove_attributes, separators=(",", ":")
            ),
            "assets_fail_on_errors": fail_on_errors,
            "assets_case_sensitive": case_sensitive_match,
            "assets_table_view_agnostic": is_table_view_agnostic,
            "assets_field_separator": field_separator,
            "assets_batch_size": batch_size,
        }
    )
    self._assets_advanced = True
    return self
data_product_advanced(remove_attributes: Optional[Union[List[str], List[AtlanField]]] = None, fail_on_errors: Optional[bool] = None, field_separator: Optional[str] = None, batch_size: Optional[int] = None) -> AssetImport

Set up package to import data domain and data products with advanced configuration.

:param remove_attributes: list of attributes to clear (remove) from assets if their value is blank in the provided file. :param fail_on_errors: specifies whether an invalid value in a field should cause the import to fail (True) or log a warning, skip that value, and proceed (False). :param field_separator: character used to separate fields in the input file (e.g., ',' or ';'). :param batch_size: maximum number of rows to process at a time (per API request).

:returns: package, configured to import data domain and data products with advanced configuration.

Source code in pyatlan/model/packages/asset_import.py
def data_product_advanced(
    self,
    remove_attributes: Optional[Union[List[str], List[AtlanField]]] = None,
    fail_on_errors: Optional[bool] = None,
    field_separator: Optional[str] = None,
    batch_size: Optional[int] = None,
) -> AssetImport:
    """
    Configure advanced options for importing data domains and data products.

    :param remove_attributes: list of attributes to clear (remove)
        from assets if their value is blank in the provided file.
    :param fail_on_errors: specifies whether an invalid value
        in a field should cause the import to fail (`True`) or
        log a warning, skip that value, and proceed (`False`).
    :param field_separator: character used to separate
        fields in the input file (e.g., ',' or ';').
    :param batch_size: maximum number of rows
        to process at a time (per API request).

    :returns: package, configured to import
        data domain and data products with advanced configuration.
    """
    # Accept either plain attribute names or AtlanField objects;
    # normalize the latter to their Atlan attribute names.
    if isinstance(remove_attributes, list) and all(
        isinstance(attr, AtlanField) for attr in remove_attributes
    ):
        remove_attributes = [attr.atlan_field_name for attr in remove_attributes]  # type: ignore
    # Serialize the attribute list compactly; None values are filtered
    # out by _add_optional_params.
    self._add_optional_params(
        {
            "data_products_attr_to_overwrite": dumps(
                remove_attributes, separators=(",", ":")
            ),
            "data_products_fail_on_errors": fail_on_errors,
            "data_products_field_separator": field_separator,
            "data_products_batch_size": batch_size,
        }
    )
    self._data_product_advanced = True
    return self
data_products(prefix: str, object_key: str, input_handling: AssetInputHandling = AssetInputHandling.UPDATE) -> AssetImport

Set up package to import data products.

:param prefix: directory (path) within the object store from which to retrieve the file containing data domains, and data products :param object_key: object key (filename), including its extension, within the object store and prefix :param input_handling: specifies whether to allow the creation of new data domains, and data products from the input CSV, or ensure these are only updated if they already exist in Atlan.

:returns: package, configured to import data domain and data products

Source code in pyatlan/model/packages/asset_import.py
def data_products(
    self,
    prefix: str,
    object_key: str,
    input_handling: AssetInputHandling = AssetInputHandling.UPDATE,
) -> AssetImport:
    """
    Configure the package to import data domains and data products.

    :param prefix: directory (path) within the object store
        from which to retrieve the file containing data domains and data products
    :param object_key: object key (filename), including its extension,
        within the object store and prefix
    :param input_handling: whether new data domains and data products may be
        created from the input CSV, or whether only those that already exist
        in Atlan should be updated.

    :returns: package, configured to import data domain and data products
    """
    self._parameters.extend(
        [
            {"name": "data_products_prefix", "value": prefix},
            {"name": "data_products_key", "value": object_key},
            {"name": "data_products_upsert_semantic", "value": input_handling},
        ]
    )
    return self
gcs(project_id: str, service_account_json: str, bucket: str) -> AssetImport

Set up package to import metadata from GCS.

:param project_id: ID of GCP project :param service_account_json: service account credentials in JSON format :param bucket: bucket to retrieve object store object from

:returns: Package set up to import metadata from GCS

Source code in pyatlan/model/packages/asset_import.py
def gcs(
    self, project_id: str, service_account_json: str, bucket: str
) -> AssetImport:
    """
    Configure the package to import metadata from GCS.

    :param project_id: ID of GCP project
    :param service_account_json: service account credentials in JSON format
    :param bucket: bucket to retrieve object store object from

    :returns: Package set up to import metadata from GCS
    """
    # Credential name is scoped to this package run via its creation epoch.
    self._credentials_body.update(
        {
            "name": f"csa-{self._NAME}-{self._epoch}-0",
            "auth_type": "gcs",
            "username": project_id,
            "password": service_account_json,
            "extra": {"gcs_bucket": bucket},
            "connector_config_name": "csa-connectors-objectstore",
        }
    )
    return self
glossaries(prefix: str, object_key: str, input_handling: AssetInputHandling = AssetInputHandling.UPDATE) -> AssetImport

Set up package to import glossaries.

:param prefix: directory (path) within the object store from which to retrieve the file containing glossaries, categories and terms :param object_key: object key (filename), including its extension, within the object store and prefix :param input_handling: specifies whether to allow the creation of new glossaries, categories and terms from the input CSV, or ensure these are only updated if they already exist in Atlan.

:returns: package, configured to import glossaries, categories and terms.

Source code in pyatlan/model/packages/asset_import.py
def glossaries(
    self,
    prefix: str,
    object_key: str,
    input_handling: AssetInputHandling = AssetInputHandling.UPDATE,
) -> AssetImport:
    """
    Configure the package to import glossaries.

    :param prefix: directory (path) within the object store
        from which to retrieve the file containing glossaries, categories and terms
    :param object_key: object key (filename), including its extension,
        within the object store and prefix
    :param input_handling: whether new glossaries, categories and terms may be
        created from the input CSV, or whether only those that already exist
        in Atlan should be updated.

    :returns: package, configured to import glossaries, categories and terms.
    """
    self._parameters.extend(
        [
            {"name": "glossaries_prefix", "value": prefix},
            {"name": "glossaries_key", "value": object_key},
            {"name": "glossaries_upsert_semantic", "value": input_handling},
        ]
    )
    return self
glossaries_advanced(remove_attributes: Optional[Union[List[str], List[AtlanField]]] = None, fail_on_errors: Optional[bool] = None, field_separator: Optional[str] = None, batch_size: Optional[int] = None) -> AssetImport

Set up package to import glossaries with advanced configuration.

:param remove_attributes: list of attributes to clear (remove) from assets if their value is blank in the provided file. :param fail_on_errors: specifies whether an invalid value in a field should cause the import to fail (True) or log a warning, skip that value, and proceed (False). :param field_separator: character used to separate fields in the input file (e.g., ',' or ';'). :param batch_size: maximum number of rows to process at a time (per API request).

:returns: package, configured to import glossaries with advanced configuration.

Source code in pyatlan/model/packages/asset_import.py
def glossaries_advanced(
    self,
    remove_attributes: Optional[Union[List[str], List[AtlanField]]] = None,
    fail_on_errors: Optional[bool] = None,
    field_separator: Optional[str] = None,
    batch_size: Optional[int] = None,
) -> AssetImport:
    """
    Configure the package to import glossaries with advanced options.

    :param remove_attributes: list of attributes to clear (remove)
        from assets if their value is blank in the provided file.
    :param fail_on_errors: specifies whether an invalid value
        in a field should cause the import to fail (`True`) or
        log a warning, skip that value, and proceed (`False`).
    :param field_separator: character used to separate
        fields in the input file (e.g., ',' or ';').
    :param batch_size: maximum number of rows
        to process at a time (per API request).

    :returns: package, configured to import
        glossaries with advanced configuration.
    """
    # Normalize a list of AtlanField objects to their attribute names.
    if isinstance(remove_attributes, list) and all(
        isinstance(attr, AtlanField) for attr in remove_attributes
    ):
        remove_attributes = [attr.atlan_field_name for attr in remove_attributes]  # type: ignore
    self._add_optional_params(
        {
            "glossaries_attr_to_overwrite": dumps(
                remove_attributes, separators=(",", ":")
            ),
            "glossaries_fail_on_errors": fail_on_errors,
            "glossaries_field_separator": field_separator,
            "glossaries_batch_size": batch_size,
        }
    )
    self._glossaries_advanced = True
    return self
object_store() -> AssetImport

Set up the package to import metadata directly from the object store.

Source code in pyatlan/model/packages/asset_import.py
def object_store(self) -> AssetImport:
    """
    Configure the package to import
    metadata directly from the object store.
    """
    # "CLOUD" import type pulls the file via the credential configured
    # by one of the object-store methods (s3/gcs/...).
    self._parameters.extend(
        [
            {"name": "import_type", "value": "CLOUD"},
            {"name": "cloud_source", "value": "{{credentialGuid}}"},
        ]
    )
    return self
s3(access_key: str, secret_key: str, region: str, bucket: str) -> AssetImport

Set up package to import metadata from S3.

:param access_key: AWS access key :param secret_key: AWS secret key :param region: AWS region :param bucket: bucket to retrieve object store object from

:returns: package, set up to import metadata from S3

Source code in pyatlan/model/packages/asset_import.py
def s3(
    self,
    access_key: str,
    secret_key: str,
    region: str,
    bucket: str,
) -> AssetImport:
    """
    Configure the package to import metadata from S3.

    :param access_key: AWS access key
    :param secret_key: AWS secret key
    :param region: AWS region
    :param bucket: bucket to retrieve object store object from

    :returns: package, set up to import metadata from S3
    """
    # Credential name is scoped to this package run via its creation epoch.
    self._credentials_body.update(
        {
            "name": f"csa-{self._NAME}-{self._epoch}-0",
            "auth_type": "s3",
            "username": access_key,
            "password": secret_key,
            "extra": {"region": region, "s3_bucket": bucket},
            "connector_config_name": "csa-connectors-objectstore",
        }
    )
    return self

BigQueryCrawler(client: AtlanClient, connection_name: str, admin_roles: Optional[List[str]] = None, admin_groups: Optional[List[str]] = None, admin_users: Optional[List[str]] = None, allow_query: bool = True, allow_query_preview: bool = True, row_limit: int = 10000)

Bases: AbstractCrawler

Base configuration for a new BigQuery crawler.

:param client: connectivity to an Atlan tenant :param connection_name: name for the connection :param admin_roles: admin roles for the connection :param admin_groups: admin groups for the connection :param admin_users: admin users for the connection :param allow_query: allow data to be queried in the connection (True) or not (False), default: True :param allow_query_preview: allow sample data viewing for assets in the connection (True) or not (False), default: True :param row_limit: maximum number of rows that can be returned by a query, default: 10000

Source code in pyatlan/model/packages/big_query_crawler.py
def __init__(
    self,
    client: AtlanClient,
    connection_name: str,
    admin_roles: Optional[List[str]] = None,
    admin_groups: Optional[List[str]] = None,
    admin_users: Optional[List[str]] = None,
    allow_query: bool = True,
    allow_query_preview: bool = True,
    row_limit: int = 10000,
):
    """
    Create the base configuration for a new BigQuery crawler.

    :param client: connectivity to an Atlan tenant
    :param connection_name: name for the connection
    :param admin_roles: admin roles for the connection
    :param admin_groups: admin groups for the connection
    :param admin_users: admin users for the connection
    :param allow_query: allow data to be queried in the connection (True) or not (False)
    :param allow_query_preview: allow sample data viewing for assets in the connection (True) or not (False)
    :param row_limit: maximum number of rows that can be returned by a query
    """
    # Advanced configuration stays off until a method requiring it is used.
    self._advanced_config = False
    base_settings = dict(
        client=client,
        connection_name=connection_name,
        connection_type=self._CONNECTOR_TYPE,
        admin_roles=admin_roles,
        admin_groups=admin_groups,
        admin_users=admin_users,
        allow_query=allow_query,
        allow_query_preview=allow_query_preview,
        row_limit=row_limit,
        source_logo=self._PACKAGE_LOGO,
    )
    super().__init__(**base_settings)
Functions
custom_config(config: Dict) -> BigQueryCrawler

Defines custom JSON configuration controlling experimental feature flags for the crawler.

:param config: custom configuration dict eg: {"ignore-all-case": True} to enable crawling assets with case-sensitive identifiers. :returns: crawler, set to include custom configuration

Source code in pyatlan/model/packages/big_query_crawler.py
def custom_config(self, config: Dict) -> BigQueryCrawler:
    """
    Defines custom JSON configuration controlling
    experimental feature flags for the crawler.

    :param config: custom configuration dict eg:
    `{"ignore-all-case": True}` to enable crawling
    assets with case-sensitive identifiers.
    :returns: crawler, set to include custom configuration
    """
    # Explicit conditional instead of the short-circuit `config and ...`
    # expression-statement: same behavior (empty configs are skipped),
    # clearer intent.
    if config:
        # NOTE(review): str(config) emits Python-literal syntax (single
        # quotes, True/False), not strict JSON -- confirm the backend
        # accepts this format.
        self._parameters.append(dict(name="control-config", value=str(config)))
    # Advanced config is flagged even for an empty dict, matching prior behavior.
    self._advanced_config = True
    return self
exclude(assets: dict) -> BigQueryCrawler

Defines the filter for assets to exclude when crawling.

:param assets: Map keyed by project name with each value being a list of tables :returns: crawler, set to exclude only those assets specified :raises InvalidRequestException: In the unlikely event the provided filter cannot be translated

Source code in pyatlan/model/packages/big_query_crawler.py
def exclude(self, assets: dict) -> BigQueryCrawler:
    """
    Defines the filter for assets to exclude when crawling.

    :param assets: map keyed by project name,
    with each value being a list of tables
    :returns: crawler, set to exclude only those assets specified
    :raises InvalidRequestException: in the unlikely
    event the provided filter cannot be translated
    """
    # A None/empty map translates to the "{}" (no-op) filter.
    filter_spec = self.build_hierarchical_filter(assets or {})
    self._parameters.append(dict(name="exclude-filter", value=filter_spec or "{}"))
    return self
exclude_regex(regex: str) -> BigQueryCrawler

Defines a regular expression used by the crawler to ignore tables and views based on a naming convention.

:param regex: exclude regex for the crawler :returns: crawler, set to exclude only those assets specified in the regex

Source code in pyatlan/model/packages/big_query_crawler.py
def exclude_regex(self, regex: str) -> BigQueryCrawler:
    """
    Defines a regular expression the crawler uses to ignore
    tables and views based on a naming convention.

    :param regex: exclude regex for the crawler
    :returns: crawler, set to exclude
    only those assets specified in the regex
    """
    self._parameters.append({"name": "temp-table-regex", "value": regex})
    return self
include(assets: dict) -> BigQueryCrawler

Defines the filter for assets to include when crawling.

:param assets: Map keyed by project name with each value being a list of tables :returns: crawler, set to include only those assets specified :raises InvalidRequestException: In the unlikely event the provided filter cannot be translated

Source code in pyatlan/model/packages/big_query_crawler.py
def include(self, assets: dict) -> BigQueryCrawler:
    """
    Defines the filter for assets to include when crawling.

    :param assets: map keyed by project name,
    with each value being a list of tables
    :returns: crawler, set to include only those assets specified
    :raises InvalidRequestException: in the unlikely
    event the provided filter cannot be translated
    """
    # A None/empty map translates to the "{}" (no-op) filter.
    filter_spec = self.build_hierarchical_filter(assets or {})
    self._parameters.append(dict(name="include-filter", value=filter_spec or "{}"))
    return self
service_account_auth(project_id: str, service_account_json: str, service_account_email: str) -> BigQueryCrawler

Set up the crawler to use service account authentication.

:param project_id: project ID of your Google Cloud project :param service_account_json: entire service account json :param service_account_email: service account email :returns: crawler, set up to use service account authentication

Source code in pyatlan/model/packages/big_query_crawler.py
def service_account_auth(
    self,
    project_id: str,
    service_account_json: str,
    service_account_email: str,
) -> BigQueryCrawler:
    """
    Set up the crawler to authenticate with a Google service account.

    :param project_id: project ID of your Google Cloud project
    :param service_account_json: entire service account json
    :param service_account_email: service account email
    :returns: crawler, set up to use service account authentication
    """
    # Service account email/json map onto basic-auth username/password.
    self._credentials_body.update(
        {
            "name": f"default-bigquery-{self._epoch}-0",
            "host": "https://www.googleapis.com/bigquery/v2",
            "port": 443,
            "auth_type": "basic",
            "username": service_account_email,
            "password": service_account_json,
            # NOTE(review): this payload uses the key "extras", while other
            # connectors in this package use "extra" -- confirm intentional.
            "extras": {"project_id": project_id},
            "connector_config_name": f"atlan-connectors-{self._NAME}",
        }
    )
    return self

ConfluentKafkaCrawler(client: AtlanClient, connection_name: str, admin_roles: Optional[List[str]] = None, admin_groups: Optional[List[str]] = None, admin_users: Optional[List[str]] = None, allow_query: bool = False, allow_query_preview: bool = False, row_limit: int = 0)

Bases: AbstractCrawler

Base configuration for a new Confluent Kafka crawler.

:param client: connectivity to an Atlan tenant :param connection_name: name for the connection :param admin_roles: admin roles for the connection :param admin_groups: admin groups for the connection :param admin_users: admin users for the connection :param allow_query: allow data to be queried in the connection (True) or not (False), default: False :param allow_query_preview: allow sample data viewing for assets in the connection (True) or not (False), default: False :param row_limit: maximum number of rows that can be returned by a query, default: 0

Source code in pyatlan/model/packages/confluent_kafka_crawler.py
def __init__(
    self,
    client: AtlanClient,
    connection_name: str,
    admin_roles: Optional[List[str]] = None,
    admin_groups: Optional[List[str]] = None,
    admin_users: Optional[List[str]] = None,
    allow_query: bool = False,
    allow_query_preview: bool = False,
    row_limit: int = 0,
):
    """
    Create the base configuration for a new Confluent Kafka crawler.

    :param client: connectivity to an Atlan tenant
    :param connection_name: name for the connection
    :param admin_roles: admin roles for the connection
    :param admin_groups: admin groups for the connection
    :param admin_users: admin users for the connection
    :param allow_query: allow data to be queried in the connection (True) or not (False)
    :param allow_query_preview: allow sample data viewing for assets in the connection (True) or not (False)
    :param row_limit: maximum number of rows that can be returned by a query
    """
    base_settings = dict(
        client=client,
        connection_name=connection_name,
        connection_type=self._CONNECTOR_TYPE,
        admin_roles=admin_roles,
        admin_groups=admin_groups,
        admin_users=admin_users,
        allow_query=allow_query,
        allow_query_preview=allow_query_preview,
        row_limit=row_limit,
        source_logo=self._PACKAGE_LOGO,
    )
    super().__init__(**base_settings)
Functions
api_token(api_key: str, api_secret: str) -> ConfluentKafkaCrawler

Set up the crawler to use API token-based authentication.

:param api_key: through which to access Kafka :param api_secret: through which to access Kafka :returns: crawler, set up to use API token-based authentication

Source code in pyatlan/model/packages/confluent_kafka_crawler.py
def api_token(
    self,
    api_key: str,
    api_secret: str,
) -> ConfluentKafkaCrawler:
    """
    Set up the crawler to use API token-based authentication.

    :param api_key: through which to access Kafka
    :param api_secret: through which to access Kafka
    :returns: crawler, set up to use API token-based authentication
    """
    # The API key/secret pair maps onto basic-auth username/password.
    self._credentials_body.update(
        {
            "auth_type": "basic",
            "username": api_key,
            "password": api_secret,
        }
    )
    return self
direct(bootstrap: str, encrypted: bool = True) -> ConfluentKafkaCrawler

Set up the crawler to extract directly from Kafka.

:param bootstrap: hostname and port number (host.example.com:9092) for the Kafka bootstrap server :param encrypted: whether to use an encrypted SSL connection (True), or plaintext (False), default: True :returns: crawler, set up to extract directly from Kafka

Source code in pyatlan/model/packages/confluent_kafka_crawler.py
def direct(self, bootstrap: str, encrypted: bool = True) -> ConfluentKafkaCrawler:
    """
    Set up the crawler to extract directly from Kafka.

    :param bootstrap: hostname and port number (host.example.com:9092) for the Kafka bootstrap server
    :param encrypted: whether to use an encrypted SSL connection (True), or plaintext (False), default: True
    :returns: crawler, set up to extract directly from Kafka
    """
    protocol = "SASL_SSL" if encrypted else "SASL_PLAINTEXT"
    self._credentials_body.update(
        {
            "name": f"default-{self._NAME}-{self._epoch}-0",
            "host": bootstrap,
            "port": 9092,
            "extra": {"security_protocol": protocol},
            "connector_config_name": "atlan-connectors-kafka-confluent-cloud",
        }
    )
    self._parameters.append({"name": "extraction-method", "value": "direct"})
    return self
exclude(regex: str = '') -> ConfluentKafkaCrawler

Defines a regular expression to use for excluding topics when crawling.

:param regex: any topic names that match this regular expression will be excluded from crawling :returns: crawler, set to exclude any topics that match the provided regular expression

Source code in pyatlan/model/packages/confluent_kafka_crawler.py
def exclude(self, regex: str = "") -> ConfluentKafkaCrawler:
    """
    Defines a regular expression to use for excluding topics when crawling.

    :param regex: any topic names that match this
    regular expression will be excluded from crawling
    :returns: crawler, set to exclude any topics
    that match the provided regular expression
    """
    # An empty pattern means "no exclusions" -- leave parameters untouched.
    if regex:
        self._parameters.append({"name": "exclude-filter", "value": regex})
    return self
include(regex: str = '') -> ConfluentKafkaCrawler

Defines the filter for topics to include when crawling.

:param regex: any topic names that match this regular expression will be included in crawling :returns: crawler, set to include only those topics specified

Source code in pyatlan/model/packages/confluent_kafka_crawler.py
def include(self, regex: str = "") -> ConfluentKafkaCrawler:
    """
    Defines the filter for topics to include when crawling.

    :param regex: any topic names that match this
    regular expression will be included in crawling
    :returns: crawler, set to include only those topics specified
    """
    # An empty pattern means "no include filter" -- leave parameters untouched.
    if regex:
        self._parameters.append({"name": "include-filter", "value": regex})
    return self
skip_internal(enabled: bool = True) -> ConfluentKafkaCrawler

Whether to skip internal topics when crawling (True) or include them.

:param enabled: if True, internal topics will be skipped when crawling, default: True :returns: crawler, set to include or exclude internal topics

Source code in pyatlan/model/packages/confluent_kafka_crawler.py
def skip_internal(self, enabled: bool = True) -> ConfluentKafkaCrawler:
    """
    Whether to skip internal topics when crawling (True) or include them.

    :param enabled: if True, internal topics
    will be skipped when crawling, default: True
    :returns: crawler, set to include or exclude internal topics
    """
    # The workflow parameter expects a lowercase string, not a boolean.
    flag = "true" if enabled else "false"
    self._parameters.append(dict(name="skip-internal-topics", value=flag))
    return self

ConnectionDelete(qualified_name: str, purge: bool)

Bases: AbstractMiner

Base configuration for a new connection delete workflow.

:param qualified_name: unique name of the connection whose assets should be deleted :param purge: if True, permanently delete the connection and its assets; otherwise (False) only archive (soft-delete) them

Source code in pyatlan/model/packages/connection_delete.py
def __init__(
    self,
    qualified_name: str,
    purge: bool,
):
    """
    Create the base configuration for a new connection delete workflow.

    :param qualified_name: unique name of the connection whose assets should be deleted
    :param purge: if True, permanently delete the connection and its assets;
    otherwise only archive (soft-delete) them
    """
    super().__init__(connection_qualified_name=qualified_name)
    delete_type = "PURGE" if purge else "SOFT"
    self._parameters.extend(
        [
            dict(name="delete-assets", value="true"),
            dict(name="delete-type", value=delete_type),
        ]
    )

DatabricksCrawler(client: AtlanClient, connection_name: str, admin_roles: Optional[List[str]] = None, admin_groups: Optional[List[str]] = None, admin_users: Optional[List[str]] = None, allow_query: bool = True, allow_query_preview: bool = True, row_limit: int = 10000)

Bases: AbstractCrawler

Base configuration for a new Databricks crawler.

:param client: connectivity to an Atlan tenant :param connection_name: name for the connection :param admin_roles: admin roles for the connection :param admin_groups: admin groups for the connection :param admin_users: admin users for the connection :param allow_query: allow data to be queried in the connection (True) or not (False), default: True :param allow_query_preview: allow sample data viewing for assets in the connection (True) or not (False), default: True :param row_limit: maximum number of rows that can be returned by a query, default: 10000

Source code in pyatlan/model/packages/databricks_crawler.py
def __init__(
    self,
    client: AtlanClient,
    connection_name: str,
    admin_roles: Optional[List[str]] = None,
    admin_groups: Optional[List[str]] = None,
    admin_users: Optional[List[str]] = None,
    allow_query: bool = True,
    allow_query_preview: bool = True,
    row_limit: int = 10000,
):
    """
    Create the base configuration for a new Databricks crawler.

    :param client: connectivity to an Atlan tenant
    :param connection_name: name for the connection
    :param admin_roles: admin roles for the connection
    :param admin_groups: admin groups for the connection
    :param admin_users: admin users for the connection
    :param allow_query: allow data to be queried in the connection (True) or not (False)
    :param allow_query_preview: allow sample data viewing for assets in the connection (True) or not (False)
    :param row_limit: maximum number of rows that can be returned by a query
    """
    # Advanced configuration stays off until a method requiring it is used.
    self._advanced_config = False
    base_settings = dict(
        client=client,
        connection_name=connection_name,
        connection_type=self._CONNECTOR_TYPE,
        admin_roles=admin_roles,
        admin_groups=admin_groups,
        admin_users=admin_users,
        allow_query=allow_query,
        allow_query_preview=allow_query_preview,
        row_limit=row_limit,
        source_logo=self._PACKAGE_LOGO,
    )
    super().__init__(**base_settings)
Functions
asset_selection_for_system_tables(selection_criteria: List[DatabricksCrawler.AssetsSelection]) -> DatabricksCrawler

Defines the filter for system table assets to include or exclude when crawling.

This method allows you to configure asset selection specifically for Databricks system tables using various selection criteria including hierarchical filtering and regex-based filtering.

:param selection_criteria: List of selection criteria objects containing the type of selection (include/exclude) and the corresponding values for filtering system table assets :returns: crawler, configured with system table asset selection filters

Source code in pyatlan/model/packages/databricks_crawler.py
def asset_selection_for_system_tables(
    self, selection_criteria: List[DatabricksCrawler.AssetsSelection]
) -> DatabricksCrawler:
    """
    Defines the filter for system table assets to include or exclude when crawling.

    This method allows you to configure asset selection specifically for Databricks
    system tables using various selection criteria including hierarchical filtering
    and regex-based filtering.

    :param selection_criteria: List of selection criteria objects containing
    the type of selection (include/exclude) and the corresponding values
    for filtering system table assets
    :returns: crawler, configured with system table asset selection filters
    :raises ValueError: if a regex-based criterion names an asset_type that
    is not a member of DatabricksCrawler.RegexAssetTypes
    """
    # Each criterion appends its own workflow parameter; later criteria of
    # the same type will simply append again (no de-duplication here).
    for criteria in selection_criteria:
        if (
            criteria.type
            == DatabricksCrawler.AssetsSelectionCriteria.INCLUDE_BY_HIERARCHY
        ):
            # Hierarchical include: criteria.values is a map of assets;
            # None/empty translates to the "{}" (no-op) filter.
            include_assets = criteria.values or {}
            to_include = self.build_selective_hierarchical_filter(include_assets)
            self._parameters.append(
                dict(name=criteria.type.value, value=to_include or "{}")
            )

        elif (
            criteria.type
            == DatabricksCrawler.AssetsSelectionCriteria.EXCLUDE_BY_HIERARCHY
        ):
            # Hierarchical exclude: mirrors the include case above.
            exclude_assets = criteria.values or {}
            to_exclude = self.build_selective_hierarchical_filter(exclude_assets)
            self._parameters.append(
                dict(name=criteria.type.value, value=to_exclude or "{}")
            )

        elif (
            criteria.type
            == DatabricksCrawler.AssetsSelectionCriteria.INCLUDE_BY_REGEX
        ):
            # Regex include: criteria.values is expected to be a dict with
            # "asset_type" (a RegexAssetTypes member) and optional "regex".
            include_regex = criteria.values
            asset_type = include_regex.get("asset_type")
            if asset_type not in DatabricksCrawler.RegexAssetTypes:
                raise ValueError(
                    f"Invalid asset_type: {asset_type}. Must be one of {[e.value for e in DatabricksCrawler.RegexAssetTypes]}"
                )
            self._parameters.append(
                dict(
                    name=f"include-{asset_type.value}-regex",
                    value=include_regex.get("regex", ""),
                )
            )

        elif (
            criteria.type
            == DatabricksCrawler.AssetsSelectionCriteria.EXCLUDE_BY_REGEX
        ):
            # Regex exclude: same value shape as the include-by-regex case.
            exclude_regex = criteria.values
            asset_type = exclude_regex.get("asset_type")
            if asset_type not in DatabricksCrawler.RegexAssetTypes:
                raise ValueError(
                    f"Invalid asset_type: {asset_type}. Must be one of {[e.value for e in DatabricksCrawler.RegexAssetTypes]}"
                )
            self._parameters.append(
                dict(
                    # NOTE: temp-table-regex-system-tables is the name
                    # of the parameter for exclude regex for system tables (TABLE_VIEWS)
                    name="temp-table-regex-system-tables"
                    if asset_type == DatabricksCrawler.RegexAssetTypes.TABLE_VIEWS
                    else f"exclude-{asset_type.value}-regex",
                    value=exclude_regex.get("regex", ""),
                )
            )
    return self
aws_service(client_id: str, client_secret: str) -> DatabricksCrawler

Set up the crawler to use AWS service principal.

:param client_id: client ID for your AWS service principal :param client_secret: client secret for your AWS service principal :returns: crawler, set up to use AWS service principal

Source code in pyatlan/model/packages/databricks_crawler.py
def aws_service(self, client_id: str, client_secret: str) -> DatabricksCrawler:
    """
    Set up the crawler to use AWS service principal.

    :param client_id: client ID for your AWS service principal
    :param client_secret: client secret for your AWS service principal
    :returns: crawler, set up to use AWS service principal
    """
    self._credentials_body.update(
        {
            "authType": "aws_service",
            "username": "",
            "connector_type": "rest",
            "extra": {"client_id": client_id, "client_secret": client_secret},
        }
    )
    return self
azure_service(client_id: str, client_secret: str, tenant_id: str) -> DatabricksCrawler

Set up the crawler to use Azure service principal.

:param client_id: client ID for Azure service principal :param client_secret: client secret for your Azure service principal :param tenant_id: tenant ID (directory ID) for Azure service principal :returns: crawler, set up to use Azure service principal

Source code in pyatlan/model/packages/databricks_crawler.py
def azure_service(
    self, client_id: str, client_secret: str, tenant_id: str
) -> DatabricksCrawler:
    """
    Set up the crawler to use Azure service principal.

    :param client_id: client ID for Azure service principal
    :param client_secret: client secret for your Azure service principal
    :param tenant_id: tenant ID (directory ID) for Azure service principal
    :returns: crawler, set up to use Azure service principal
    """
    self._credentials_body.update(
        {
            "authType": "azure_service",
            "username": "",
            "connector_type": "rest",
            "extra": {
                "client_id": client_id,
                "client_secret": client_secret,
                "tenant_id": tenant_id,
            },
        }
    )
    return self
basic_auth(personal_access_token: str, http_path: str) -> DatabricksCrawler

(DEPRECATED) Set up the crawler to use basic authentication.

:param personal_access_token: through which to access Databricks instance :param http_path: HTTP path of your Databricks instance :returns: crawler, set up to use basic authentication

Source code in pyatlan/model/packages/databricks_crawler.py
def basic_auth(
    self, personal_access_token: str, http_path: str
) -> DatabricksCrawler:
    """
    (DEPRECATED) Set up the crawler to use basic authentication.

    :param personal_access_token: through which to access Databricks instance
    :param http_path: HTTP path of your Databricks instance
    :returns: crawler, set up to use basic authentication
    """
    # Emit the deprecation warning attributed to the caller's frame.
    warn(
        "This method is deprecated, please use 'pat()' instead, which offers identical functionality.",
        DeprecationWarning,
        stacklevel=2,
    )
    self._credentials_body.update(
        {
            "authType": "basic",
            "username": "",
            "password": personal_access_token,
            "connector_type": "dual",
            "extra": {"__http_path": http_path},
        }
    )
    return self
direct(hostname: str, port: int = 443) -> DatabricksCrawler

Set up the crawler to extract directly from the Databricks.

:param hostname: hostname of the Databricks instance :param port: port number of the Databricks instance. default: 443 :returns: crawler, set up to extract directly from the Databricks

Source code in pyatlan/model/packages/databricks_crawler.py
def direct(self, hostname: str, port: int = 443) -> DatabricksCrawler:
    """
    Set up the crawler to extract directly from the Databricks.

    :param hostname: hostname of the Databricks instance
    :param port: port number of the Databricks instance. default: `443`
    :returns: crawler, set up to extract directly from the Databricks
    """
    self._credentials_body.update(
        {
            "name": f"default-{self._NAME}-{self._epoch}-0",
            "host": hostname,
            "port": port,
            "connector_config_name": f"atlan-connectors-{self._NAME}",
        }
    )
    self._parameters.append({"name": "extraction-method", "value": "direct"})
    return self
enable_cross_workspace_discovery(include: bool = False) -> DatabricksCrawler

Whether to enable cross-workspace discovery to discover assets from other workspaces.

:param include: if True, cross-workspace discovery will be included while crawling Databricks, default: False :returns: crawler, set to include or exclude cross-workspace discovery

Source code in pyatlan/model/packages/databricks_crawler.py
def enable_cross_workspace_discovery(
    self, include: bool = False
) -> DatabricksCrawler:
    """
    Whether to enable cross-workspace discovery to
    discover assets from other workspaces.

    :param include: if True, cross-workspace discovery will be
    included while crawling Databricks, default: False
    :returns: crawler, set to include or exclude cross-workspace discovery
    """
    # The workflow template expects this flag as a "true"/"false" string.
    flag = "true" if include else "false"
    self._parameters.append(
        dict(name="enable-cross-workspace-discovery", value=flag)
    )
    return self
enable_incremental_extraction(include: bool = False) -> DatabricksCrawler

Whether to enable or disable schema incremental extraction on source.

:param include: if True, incremental extraction will be included while crawling Databricks, default: False :returns: crawler, set to include or exclude incremental extraction

Source code in pyatlan/model/packages/databricks_crawler.py
def enable_incremental_extraction(self, include: bool = False) -> DatabricksCrawler:
    """
    Whether to enable or disable schema incremental extraction on source.

    :param include: if True, incremental extraction will be
    included while crawling Databricks, default: False
    :returns: crawler, set to include or exclude incremental extraction
    """
    # Serialized as a "true"/"false" string for the workflow parameters.
    setting = "true" if include else "false"
    self._parameters.append(dict(name="incremental-extraction", value=setting))
    return self
enable_source_level_filtering(include: bool = False) -> DatabricksCrawler

Whether to enable or disable schema level filtering on source. schemas selected in the include filter will be fetched.

:param include: if True, schemas selected in the include filter will be fetched while crawling Databricks, default: False :returns: crawler, set to include or exclude source level filtering

Source code in pyatlan/model/packages/databricks_crawler.py
def enable_source_level_filtering(self, include: bool = False) -> DatabricksCrawler:
    """
    Whether to enable or disable schema-level filtering on source;
    when enabled, only schemas selected in the include filter are fetched.

    :param include: if True, schemas selected in the include
    filter will be fetched while crawling Databricks, default: False
    :returns: crawler, set to include or exclude source level filtering
    """
    flag = "true" if include else "false"
    self._parameters.append(
        dict(name="use-source-schema-filtering", value=flag)
    )
    # Source-level filtering is surfaced as an advanced-configuration option.
    self._advanced_config = True
    return self
enable_view_lineage(include: bool = True) -> DatabricksCrawler

Whether to enable view lineage as part of crawling Databricks.

:param include: if True, view lineage will be included while crawling Databricks, default: True :returns: crawler, set to include or exclude view lineage

Source code in pyatlan/model/packages/databricks_crawler.py
def enable_view_lineage(self, include: bool = True) -> DatabricksCrawler:
    """
    Whether to enable view lineage as part of crawling Databricks.

    :param include: if True, view lineage will be included
    while crawling Databricks, default: True
    :returns: crawler, set to include or exclude view lineage
    """
    # NOTE(review): unlike most toggles in this class, the raw boolean is
    # passed (not a "true"/"false" string) — preserved as-is; confirm the
    # workflow template accepts a boolean here.
    parameter = dict(name="enable-view-lineage", value=include)
    self._parameters.append(parameter)
    return self
exclude(assets: dict) -> DatabricksCrawler

Defines the filter for assets to exclude when crawling.

:param assets: Map keyed by database name with each value being a list of schemas :returns: crawler, set to exclude only those assets specified :raises InvalidRequestException: In the unlikely event the provided filter cannot be translated

Source code in pyatlan/model/packages/databricks_crawler.py
def exclude(self, assets: dict) -> DatabricksCrawler:
    """
    Defines the filter for assets to exclude when crawling.

    :param assets: Map keyed by database name with each value being a list of schemas
    :returns: crawler, set to exclude only those assets specified
    :raises InvalidRequestException: In the unlikely
    event the provided filter cannot be translated
    """
    # Translate the database -> schemas map into the filter format the
    # workflow expects; fall back to an empty JSON object when no filter.
    filter_value = self.build_hierarchical_filter(assets or {})
    self._parameters.append(
        dict(name="exclude-filter", value=filter_value or "{}")
    )
    return self
exclude_for_rest_api(assets: List[str]) -> DatabricksCrawler

Defines the filter for assets to exclude when crawling. (When using REST API extraction method).

:param assets: list of database names to exclude when crawling :returns: crawler, set to exclude only those assets specified :raises InvalidRequestException: In the unlikely event the provided filter cannot be translated

Source code in pyatlan/model/packages/databricks_crawler.py
def exclude_for_rest_api(self, assets: List[str]) -> DatabricksCrawler:
    """
    Defines the filter for assets to exclude when crawling.
    (When using REST API extraction method).

    :param assets: list of database names to exclude when crawling
    :returns: crawler, set to exclude only those assets specified
    :raises InvalidRequestException: In the unlikely
    event the provided filter cannot be translated
    """
    # Translate the flat list of database names into the filter format the
    # workflow expects; fall back to an empty JSON object when no filter.
    filter_value = self.build_flat_hierarchical_filter(assets or [])
    self._parameters.append(
        dict(name="exclude-filter-rest", value=filter_value or "{}")
    )
    return self
exclude_regex(regex: str) -> DatabricksCrawler

Defines the exclude regex for the crawler to ignore tables & views based on a naming convention.

:param regex: exclude regex for the crawler :returns: crawler, set to exclude only those assets specified in the regex

Source code in pyatlan/model/packages/databricks_crawler.py
def exclude_regex(self, regex: str) -> DatabricksCrawler:
    """
    Defines the exclude regex for the crawler to ignore
    tables & views based on a naming convention.

    :param regex: exclude regex for the crawler
    :returns: crawler, set to exclude
    only those assets specified in the regex
    """
    # NOTE(review): the underlying workflow parameter is named
    # "temp-table-regex" — confirm this is the intended exclude-regex key.
    self._parameters.append({"name": "temp-table-regex", "value": regex})
    return self
import_tags(include: bool = False) -> DatabricksCrawler

Whether to import tags from Databricks Unity Catalog to Atlan. Tags attached in Databricks will be automatically attached to your Databricks assets in Atlan. (When using REST API extraction method).

:param include: if True, tags will be imported from Databricks Unity Catalog to Atlan, default: False :returns: crawler, set to whether to import tags from Databricks Unity Catalog to Atlan

Source code in pyatlan/model/packages/databricks_crawler.py
def import_tags(self, include: bool = False) -> DatabricksCrawler:
    """
    Whether to import tags from Databricks Unity Catalog to Atlan.
    Tags attached in Databricks will be automatically attached to your
    Databricks assets in Atlan. (When using REST API extraction method).

    :param include: if True, tags will be imported from
    Databricks Unity Catalog to Atlan, default: False
    :returns: crawler, set to whether to import tags
    from Databricks Unity Catalog to Atlan
    """
    # NOTE(review): the raw boolean is passed here (not a "true"/"false"
    # string as other toggles do) — preserved as-is.
    parameter = dict(name="enable-tag-sync", value=include)
    self._parameters.append(parameter)
    return self
include(assets: dict) -> DatabricksCrawler

Defines the filter for assets to include when crawling.

:param assets: Map keyed by database name with each value being a list of schemas :returns: crawler, set to include only those assets specified :raises InvalidRequestException: In the unlikely event the provided filter cannot be translated

Source code in pyatlan/model/packages/databricks_crawler.py
def include(self, assets: dict) -> DatabricksCrawler:
    """
    Defines the filter for assets to include when crawling.

    :param assets: Map keyed by database name with each value being a list of schemas
    :returns: crawler, set to include only those assets specified
    :raises InvalidRequestException: In the unlikely
    event the provided filter cannot be translated
    """
    include_assets = assets or {}
    to_include = self.build_hierarchical_filter(include_assets)
    # Fixed: the original wrapped the parameter in a redundant nested
    # dict(dict(...)) call; a single dict construction is equivalent.
    self._parameters.append(dict(name="include-filter", value=to_include or "{}"))
    return self
include_for_rest_api(assets: List[str]) -> DatabricksCrawler

Defines the filter for assets to include when crawling (When using REST API extraction method).

:param assets: list of database names to include when crawling :returns: crawler, set to include only those assets specified :raises InvalidRequestException: In the unlikely event the provided filter cannot be translated

Source code in pyatlan/model/packages/databricks_crawler.py
def include_for_rest_api(self, assets: List[str]) -> DatabricksCrawler:
    """
    Defines the filter for assets to include when crawling
    (When using REST API extraction method).

    :param assets: list of database names to include when crawling
    :returns: crawler, set to include only those assets specified
    :raises InvalidRequestException: In the unlikely
    event the provided filter cannot be translated
    """
    include_assets = assets or []
    to_include = self.build_flat_hierarchical_filter(include_assets)
    # Fixed: the original wrapped the parameter in a redundant nested
    # dict(dict(...)) call; a single dict construction is equivalent.
    self._parameters.append(
        dict(name="include-filter-rest", value=to_include or "{}")
    )
    return self
metadata_extraction_method(type: DatabricksCrawler.ExtractionMethod = ExtractionMethod.JDBC) -> DatabricksCrawler

Determines the interface that the package will use to extract metadata from Databricks. JDBC is the recommended method (default). REST API method is supported only by Unity Catalog enabled instances.

:param type: extraction method to use. Defaults to DatabricksCrawler.ExtractionMethod.JDBC

Source code in pyatlan/model/packages/databricks_crawler.py
def metadata_extraction_method(
    self,
    type: DatabricksCrawler.ExtractionMethod = ExtractionMethod.JDBC,
) -> DatabricksCrawler:
    """
    Determines the interface the package uses to extract metadata
    from Databricks. JDBC is the recommended (`default`) method;
    the REST API method is supported only by
    Unity Catalog-enabled instances.

    :param type: extraction method to use.
    Defaults to `DatabricksCrawler.ExtractionMethod.JDBC`
    """
    # Serialize the enum member's underlying string value for the workflow.
    self._parameters.append(dict(name="extract-strategy", value=type.value))
    return self
pat(access_token: str, sql_warehouse_id: str) -> DatabricksCrawler

Set up the crawler to use PAT authentication.

:param access_token: through which to access Databricks instance :param sql_warehouse_id: ID of the associated SQL warehouse if this data source is backed by a SQL warehouse. eg: 3d939b0cc668be06 ref: https://docs.databricks.com/api/workspace/datasources/list#warehouse_id :returns: crawler, set up to use PAT

Source code in pyatlan/model/packages/databricks_crawler.py
def pat(self, access_token: str, sql_warehouse_id: str) -> DatabricksCrawler:
    """
    Set up the crawler to use PAT authentication.

    :param access_token: through which to access Databricks instance
    :param sql_warehouse_id: ID of the associated SQL warehouse
        if this data source is backed by a SQL warehouse. eg: `3d939b0cc668be06`
        ref: https://docs.databricks.com/api/workspace/datasources/list#warehouse_id
    :returns: crawler, set up to use PAT
    """
    # The PAT is supplied as the "password" of a basic-auth credential;
    # the warehouse ID is embedded in the SQL-warehouse HTTP path.
    http_path = f"/sql/1.0/warehouses/{sql_warehouse_id}"
    self._credentials_body.update(
        {
            "authType": "basic",
            "username": "",
            "password": access_token,
            "connector_type": "dual",
            "extra": {"__http_path": http_path},
        }
    )
    return self
s3(bucket_name: str, bucket_prefix: str, bucket_region: Optional[str] = None) -> DatabricksCrawler

Set up the crawler to extract from S3 bucket.

:param bucket_name: name of the bucket/storage that contains the extracted metadata files :param bucket_prefix: prefix is everything after the bucket/storage name, including the path :param bucket_region: (Optional) name of the region if applicable :returns: crawler, set up to extract from S3 bucket

Source code in pyatlan/model/packages/databricks_crawler.py
def s3(
    self,
    bucket_name: str,
    bucket_prefix: str,
    bucket_region: Optional[str] = None,
) -> DatabricksCrawler:
    """
    Set up the crawler to extract from an S3 bucket.

    :param bucket_name: name of the bucket/storage
    that contains the extracted metadata files
    :param bucket_prefix: prefix is everything after the
    bucket/storage name, including the `path`
    :param bucket_region: (Optional) name of the region if applicable
    :returns: crawler, set up to extract from S3 bucket
    """
    # NOTE(review): the region parameter is appended even when None —
    # preserved as-is.
    for param_name, param_value in (
        ("extraction-method", "s3"),
        ("offline-extraction-bucket", bucket_name),
        ("offline-extraction-prefix", bucket_prefix),
        ("offline-extraction-region", bucket_region),
    ):
        self._parameters.append(dict(name=param_name, value=param_value))
    return self
sql_warehouse(warehouse_ids: List[str]) -> DatabricksCrawler

Defines the filter for SQL warehouses to include when crawling. (When using REST API extraction method).

:param warehouse_ids: list of warehouse_id values to include when crawling e.g.: [3d939b0cc668be06, 9a289b0cc838ce62] ref: https://docs.databricks.com/api/workspace/datasources/list#warehouse_id :returns: crawler, set to include only those warehouses specified :raises InvalidRequestException: In the unlikely event the provided filter cannot be translated

Source code in pyatlan/model/packages/databricks_crawler.py
def sql_warehouse(self, warehouse_ids: List[str]) -> DatabricksCrawler:
    """
    Defines the filter for SQL warehouses to include when crawling.
    (When using REST API extraction method).

    :param warehouse_ids: list of `warehouse_id` values to include when
    crawling eg: [`3d939b0cc668be06`, `9a289b0cc838ce62`]
    ref: https://docs.databricks.com/api/workspace/datasources/list#warehouse_id
    :returns: crawler, set to include only those warehouses specified
    :raises InvalidRequestException: In the unlikely
    event the provided filter cannot be translated
    """
    # Fixed docstring: it previously documented a nonexistent `assets`
    # parameter and misspelled "warehouse_id"; the code is unchanged.
    warehouse_ids = warehouse_ids or []
    to_include = self.build_flat_hierarchical_filter(warehouse_ids)
    self._parameters.append(dict(name="sql-warehouse", value=to_include or "{}"))
    return self

DatabricksMiner(connection_qualified_name: str)

Bases: AbstractMiner

Base configuration for a new Databricks miner.

:param connection_qualified_name: unique name of the Databricks connection whose assets should be mined

Source code in pyatlan/model/packages/databricks_miner.py
def __init__(
    self,
    connection_qualified_name: str,
):
    """
    Base configuration for a new Databricks miner.

    :param connection_qualified_name: unique name of the
    Databricks connection whose assets should be mined
    """
    self._advanced_config = False
    super().__init__(connection_qualified_name=connection_qualified_name)
    # Seed the workflow with default mining parameters; callers can
    # override these via the other builder methods.
    defaults = [
        dict(name="calculate-popularity", value=False),
        dict(name="popularity-window-days", value=30),
        dict(name="miner-start-time-epoch", value=0),
        dict(
            name="extraction-method-popularity",
            value=self.ExtractionMethod.REST_API.value,
        ),
    ]
    self._parameters.extend(defaults)
Functions
offline(bucket_name: str, bucket_prefix: str)

Sets up the Databricks miner to use the offline extraction method.

This method sets up the miner to extract data from an S3 bucket by specifying the bucket name and prefix.

:param bucket_name: name of the S3 bucket to extract data from. :param bucket_prefix: prefix within the S3 bucket to narrow the extraction scope. :returns: miner, configured for offline extraction.

Source code in pyatlan/model/packages/databricks_miner.py
def offline(self, bucket_name: str, bucket_prefix: str):
    """
    Sets up the Databricks miner to use the offline extraction method.

    This configures the miner to extract data from an S3 bucket
    identified by the given bucket name and prefix.

    :param bucket_name: name of the S3 bucket to extract data from.
    :param bucket_prefix: prefix within the S3 bucket to narrow the extraction scope.
    :returns: miner, configured for offline extraction.
    """
    for param_name, param_value in (
        ("extraction-method", "offline"),
        ("offline-extraction-bucket", bucket_name),
        ("offline-extraction-prefix", bucket_prefix),
    ):
        self._parameters.append(dict(name=param_name, value=param_value))
    return self
popularity_configuration(start_date: str, extraction_method: DatabricksMiner.ExtractionMethod = ExtractionMethod.REST_API, window_days: Optional[int] = None, excluded_users: Optional[List[str]] = None, warehouse_id: Optional[str] = None) -> DatabricksMiner

Configures the Databricks miner to calculate asset popularity metrics.

This method sets up the miner to fetch query history and calculate popularity metrics based on the specified configuration.

:param start_date: epoch timestamp from which queries will be fetched for calculating popularity. This does not affect lineage generation. :param extraction_method: method used to fetch popularity data. Defaults to ExtractionMethod.REST_API. :param window_days: (Optional) number of days to consider for calculating popularity metrics. :param excluded_users: (Optional) list of usernames to exclude from usage metrics calculations. :param warehouse_id: (Optional) unique identifier of the SQL warehouse to use for popularity calculations. Required if extraction_method is ExtractionMethod.SYSTEM_TABLE. :returns: miner, configured with popularity settings.

Source code in pyatlan/model/packages/databricks_miner.py
def popularity_configuration(
    self,
    start_date: str,
    extraction_method: DatabricksMiner.ExtractionMethod = ExtractionMethod.REST_API,
    window_days: Optional[int] = None,
    excluded_users: Optional[List[str]] = None,
    warehouse_id: Optional[str] = None,
) -> DatabricksMiner:
    """
    Configures the Databricks miner to calculate asset popularity metrics.

    Sets up the miner to fetch query history and calculate
    popularity metrics based on the specified configuration.

    :param start_date: epoch timestamp from which queries will be fetched
    for calculating popularity. This does not affect lineage generation.
    :param extraction_method: method used to fetch popularity data.
    Defaults to `ExtractionMethod.REST_API`.
    :param window_days: (Optional) number of days to consider for calculating popularity metrics.
    :param excluded_users: (Optional) list of usernames to exclude from usage metrics calculations.
    :param warehouse_id: (Optional) unique identifier of the SQL warehouse to use for
    popularity calculations. Required if `extraction_method` is `ExtractionMethod.SYSTEM_TABLE`.
    :returns: miner, configured with popularity settings.
    """
    # Overwrite the defaults seeded in __init__ with the caller's settings.
    overrides = {
        "calculate-popularity": True,
        "extraction-method-popularity": extraction_method.value,
        "miner-start-time-epoch": start_date,
        "popularity-window-days": window_days,
    }
    for existing in self._parameters:
        name = existing["name"]
        if name in overrides:
            existing["value"] = overrides[name]
    self._parameters.append(
        dict(
            name="popularity-exclude-user-config",
            value=dumps(excluded_users or []),
        )
    )
    # System-table extraction requires a specific SQL warehouse.
    if extraction_method == self.ExtractionMethod.SYSTEM_TABLE:
        self._parameters.append(
            dict(name="sql-warehouse-popularity", value=warehouse_id)
        )
    return self
rest_api()

Sets up the Databricks miner to use the REST API method for fetching lineage.

:returns: miner, configured to use the REST API extraction method from Databricks.

Source code in pyatlan/model/packages/databricks_miner.py
def rest_api(self):
    """
    Sets up the Databricks miner to use the REST API
    method for fetching lineage.

    :returns: miner, configured to use the REST API
    extraction method from Databricks.
    """
    method = self.ExtractionMethod.REST_API.value
    self._parameters.append(dict(name="extraction-method", value=method))
    return self
system_table(warehouse_id: str)

Sets up the Databricks miner to use the system table extraction method.

This method sets up the miner to extract data using a specific SQL warehouse by providing its unique ID.

:param warehouse_id: unique identifier of the SQL warehouse to be used for system table extraction. :returns: miner, configured for system table extraction.

Source code in pyatlan/model/packages/databricks_miner.py
def system_table(self, warehouse_id: str):
    """
    Sets up the Databricks miner to use the system table extraction method.

    This method sets up the miner to extract data
    using a specific SQL warehouse by providing its unique ID.

    :param warehouse_id: unique identifier of the SQL
    warehouse to be used for system table extraction.
    :returns: miner, configured for system table extraction.
    """
    # Fixed: serialize the enum member's string value (`.value`), matching
    # how `rest_api()` and the crawler's `metadata_extraction_method()`
    # serialize extraction methods; previously the enum object itself was
    # placed into the parameter map.
    self._parameters.append(
        dict(name="extraction-method", value=self.ExtractionMethod.SYSTEM_TABLE.value)
    )
    self._parameters.append(dict(name="sql-warehouse", value=warehouse_id))
    return self

DbtCrawler(client: AtlanClient, connection_name: str, admin_roles: Optional[List[str]] = None, admin_groups: Optional[List[str]] = None, admin_users: Optional[List[str]] = None, allow_query: bool = False, allow_query_preview: bool = False, row_limit: int = 0)

Bases: AbstractCrawler

Base configuration for a new Dbt crawler.

:param client: connectivity to an Atlan tenant :param connection_name: name for the connection :param admin_roles: admin roles for the connection :param admin_groups: admin groups for the connection :param admin_users: admin users for the connection :param allow_query: allow data to be queried in the connection (True) or not (False), default: False :param allow_query_preview: allow sample data viewing for assets in the connection (True) or not (False), default: False :param row_limit: maximum number of rows that can be returned by a query, default: 0

Source code in pyatlan/model/packages/dbt_crawler.py
def __init__(
    self,
    client: AtlanClient,
    connection_name: str,
    admin_roles: Optional[List[str]] = None,
    admin_groups: Optional[List[str]] = None,
    admin_users: Optional[List[str]] = None,
    allow_query: bool = False,
    allow_query_preview: bool = False,
    row_limit: int = 0,
):
    """
    Base configuration for a new dbt crawler.

    :param client: connectivity to an Atlan tenant
    :param connection_name: name for the connection
    :param admin_roles: admin roles for the connection
    :param admin_groups: admin groups for the connection
    :param admin_users: admin users for the connection
    :param allow_query: allow data to be queried in the connection, default: False
    :param allow_query_preview: allow sample data viewing for
    assets in the connection, default: False
    :param row_limit: maximum number of rows that can be returned by a query, default: 0
    """
    # Delegate connection setup to the abstract crawler base, pinning the
    # dbt connector type and package logo.
    base_config = dict(
        client=client,
        connection_name=connection_name,
        connection_type=self._CONNECTOR_TYPE,
        admin_roles=admin_roles,
        admin_groups=admin_groups,
        admin_users=admin_users,
        allow_query=allow_query,
        allow_query_preview=allow_query_preview,
        row_limit=row_limit,
        source_logo=self._PACKAGE_LOGO,
    )
    super().__init__(**base_config)
Functions
cloud(service_token: str, hostname: str = 'https://cloud.getdbt.com', multi_tenant: bool = True) -> DbtCrawler

Set up the crawler to extract using dbt Cloud.

:param service_token: token to use to authenticate against dbt :param hostname: of dbt, default: https://cloud.getdbt.com :param multi_tenant: if True, use a multi-tenant cloud config, otherwise a single-tenant cloud config :returns: crawler, set up to extract using dbt Cloud

Source code in pyatlan/model/packages/dbt_crawler.py
def cloud(
    self,
    service_token: str,
    hostname: str = "https://cloud.getdbt.com",
    multi_tenant: bool = True,
) -> DbtCrawler:
    """
    Set up the crawler to extract using dbt Cloud.

    :param service_token: token to use to authenticate against dbt
    :param hostname: of dbt, default: https://cloud.getdbt.com
    :param multi_tenant: if True, use a multi-tenant
    cloud config, otherwise a single-tenant cloud config
    :returns: crawler, set up to extract using dbt Cloud
    """
    # Token-based credential pointing at the dbt Cloud host.
    self._credentials_body.update(
        {
            "name": f"default-{self._NAME}-{self._epoch}-1",
            "host": hostname,
            "port": 443,
            "auth_type": "token",
            "username": "",
            "password": service_token,
            "connector_config_name": f"atlan-connectors-{self._NAME}",
        }
    )
    deployment = "multi" if multi_tenant else "single"
    for parameter in (
        dict(name="extraction-method", value="api"),
        dict(name="deployment-type", value=deployment),
        {"name": "api-credential-guid", "value": "{{credentialGuid}}"},
        dict(name="control-config-strategy", value="default"),
    ):
        self._parameters.append(parameter)
    return self
core(s3_bucket: str, s3_prefix: str, s3_region: str) -> DbtCrawler

Set up the crawler to extract using dbt Core files in S3.

:param s3_bucket: S3 bucket containing the dbt Core files :param s3_prefix: prefix within the S3 bucket where the dbt Core files are located :param s3_region: S3 region where the bucket is located :returns: crawler, set up to extract using dbt Core files in S3

Source code in pyatlan/model/packages/dbt_crawler.py
def core(self, s3_bucket: str, s3_prefix: str, s3_region: str) -> DbtCrawler:
    """
    Set up the crawler to extract using dbt Core files in S3.

    :param s3_bucket: S3 bucket containing the dbt Core files
    :param s3_prefix: prefix within the S3 bucket where the dbt Core files are located
    :param s3_region: S3 region where the bucket is located
    :returns: crawler, set up to extract using dbt Core files in S3
    """
    for param_name, param_value in (
        ("extraction-method", "core"),
        ("deployment-type", "single"),
        ("core-extraction-s3-bucket", s3_bucket),
        ("core-extraction-s3-prefix", s3_prefix),
        ("core-extraction-s3-region", s3_region),
    ):
        self._parameters.append(dict(name=param_name, value=param_value))
    return self
enrich_materialized_assets(enabled: bool = False) -> DbtCrawler

Whether to enable the enrichment of materialized SQL assets as part of crawling dbt.

:param enabled: if True, any assets that dbt materializes will also be enriched with details from dbt, default: False :returns: crawler, set up to include or exclude enrichment of materialized assets

Source code in pyatlan/model/packages/dbt_crawler.py
def enrich_materialized_assets(self, enabled: bool = False) -> DbtCrawler:
    """
    Whether to enable the enrichment of materialized
    SQL assets as part of crawling dbt.

    :param enabled: if True, any assets that dbt materializes
    will also be enriched with details from dbt, default: False
    :returns: crawler, set up to include
    or exclude enrichment of materialized assets
    """
    flag = "true" if enabled else "false"
    self._parameters.append(
        dict(name="enrich-materialised-sql-assets", value=flag)
    )
    return self
exclude(filter: str = '') -> DbtCrawler

Defines the filter for assets to exclude when crawling.

:param filter: for dbt Core provide a wildcard expression and for dbt Cloud provide a string-encoded map :return: the builder, set to exclude only those assets specified

Source code in pyatlan/model/packages/dbt_crawler.py
def exclude(self, filter: str = "") -> DbtCrawler:
    """
    Defines the filter for assets to exclude when crawling.

    :param filter: for dbt Core provide a wildcard
    expression and for dbt Cloud provide a string-encoded map
    :return: the builder, set to exclude only those assets specified
    """
    # The two extraction modes use different empty-filter sentinels:
    # dbt Cloud expects "{}" and dbt Core expects "*".
    cloud_filter = filter or "{}"
    core_filter = filter or "*"
    self._parameters.append(dict(name="exclude-filter", value=cloud_filter))
    self._parameters.append(dict(name="exclude-filter-core", value=core_filter))
    return self
include(filter: str = '') -> DbtCrawler

Defines the filter for assets to include when crawling.

:param filter: for dbt Core provide a wildcard expression and for dbt Cloud provide a string-encoded map :returns: crawler, set to include only those assets specified

Source code in pyatlan/model/packages/dbt_crawler.py
def include(self, filter: str = "") -> DbtCrawler:
    """
    Defines the filter for assets to include when crawling.

    :param filter: for dbt Core provide a wildcard
    expression and for dbt Cloud provide a string-encoded map
    :returns: crawler, set to include only those assets specified
    """
    # The two extraction modes use different empty-filter sentinels:
    # dbt Cloud expects "{}" and dbt Core expects "*".
    cloud_filter = filter or "{}"
    core_filter = filter or "*"
    self._parameters.append(dict(name="include-filter", value=cloud_filter))
    self._parameters.append(dict(name="include-filter-core", value=core_filter))
    return self
limit_to_connection(connection_qualified_name: str) -> DbtCrawler

Limit the crawling to a single connection's assets. If not specified, crawling will be attempted across all connection's assets.

:param connection_qualified_name: unique name of the connection for whose assets to limit crawling :returns: crawler, set to limit crawling to only those assets in the specified connection

Source code in pyatlan/model/packages/dbt_crawler.py
def limit_to_connection(self, connection_qualified_name: str) -> DbtCrawler:
    """
    Limit the crawling to a single connection's assets.
    If not specified, crawling will be
    attempted across all connection's assets.

    :param connection_qualified_name: unique name
    of the connection for whose assets to limit crawling
    :returns: crawler, set to limit crawling
    to only those assets in the specified connection
    """
    self._parameters.append(
        dict(name="connection-qualified-name", value=connection_qualified_name)
    )
    return self
tags(include: bool = False) -> DbtCrawler

Whether to enable dbt tag syncing as part of crawling dbt.

:param include: if True, tags in dbt will be included while crawling dbt, default: False :returns: crawler, set to include or exclude dbt tags

Source code in pyatlan/model/packages/dbt_crawler.py
def tags(self, include: bool = False) -> DbtCrawler:
    """
    Whether to enable dbt tag syncing as part of crawling dbt.

    :param include: if True, tags in dbt will
    be included while crawling dbt, default: False
    :returns: crawler, set to include or exclude dbt tags
    """
    flag = "true" if include else "false"
    self._parameters.append(dict(name="enable-dbt-tagsync", value=flag))
    return self

DynamoDBCrawler(client: AtlanClient, connection_name: str, admin_roles: Optional[List[str]] = None, admin_groups: Optional[List[str]] = None, admin_users: Optional[List[str]] = None, allow_query: bool = True, allow_query_preview: bool = True, row_limit: int = 10000)

Bases: AbstractCrawler

Base configuration for a new Amazon DynamoDB crawler.

:param client: connectivity to an Atlan tenant :param connection_name: name for the connection :param admin_roles: admin roles for the connection :param admin_groups: admin groups for the connection :param admin_users: admin users for the connection :param allow_query: allow data to be queried in the connection (True) or not (False), default: True :param allow_query_preview: allow sample data viewing for assets in the connection (True) or not (False), default: True :param row_limit: maximum number of rows that can be returned by a query, default: 10000

Source code in pyatlan/model/packages/dynamo_d_b_crawler.py
def __init__(
    self,
    client: AtlanClient,
    connection_name: str,
    admin_roles: Optional[List[str]] = None,
    admin_groups: Optional[List[str]] = None,
    admin_users: Optional[List[str]] = None,
    allow_query: bool = True,
    allow_query_preview: bool = True,
    row_limit: int = 10000,
):
    """
    Base configuration for a new Amazon DynamoDB crawler.

    :param client: connectivity to an Atlan tenant
    :param connection_name: name for the connection
    :param admin_roles: admin roles for the connection
    :param admin_groups: admin groups for the connection
    :param admin_users: admin users for the connection
    :param allow_query: allow data to be queried in the connection, default: True
    :param allow_query_preview: allow sample data viewing for
    assets in the connection, default: True
    :param row_limit: maximum number of rows that can be returned by a query, default: 10000
    """
    # Delegate connection creation to the abstract crawler base, pinning
    # the DynamoDB connector type and package logo.
    base_config = dict(
        client=client,
        connection_name=connection_name,
        connection_type=self._CONNECTOR_TYPE,
        admin_roles=admin_roles,
        admin_groups=admin_groups,
        admin_users=admin_users,
        allow_query=allow_query,
        allow_query_preview=allow_query_preview,
        row_limit=row_limit,
        source_logo=self._PACKAGE_LOGO,
    )
    super().__init__(**base_config)
Functions
direct(region: str) -> DynamoDBCrawler

Set up the crawler to extract directly from the DynamoDB.

:param region: AWS region where database is set up :returns: crawler, set up to extract directly from DynamoDB

Source code in pyatlan/model/packages/dynamo_d_b_crawler.py
def direct(
    self,
    region: str,
) -> DynamoDBCrawler:
    """
    Set up the crawler to extract directly from DynamoDB.

    :param region: AWS region where the database is set up
    :returns: crawler, set up to extract directly from DynamoDB
    """
    # Seed the run-unique credential entry for this connector.
    self._credentials_body.update(
        {
            "name": f"default-{self._NAME}-{self._epoch}-0",
            "extra": {"region": region},
            "connector_config_name": f"atlan-connectors-{self._NAME}",
        }
    )
    self._parameters.append({"name": "extraction-method", "value": "direct"})
    return self
exclude_regex(regex: str) -> DynamoDBCrawler

Defines the regex of tables to ignore. By default, nothing will be excluded. This takes priority over include regex.

:param regex: exclude regex for the crawler :returns: crawler, set to exclude only those assets specified in the regex

Source code in pyatlan/model/packages/dynamo_d_b_crawler.py
def exclude_regex(self, regex: str) -> DynamoDBCrawler:
    """
    Defines the regex of tables to ignore.
    By default, nothing will be excluded.
    This takes priority over the include regex.

    :param regex: exclude regex for the crawler
    :returns: crawler, set to exclude only those assets matching the regex
    """
    # Recorded as a workflow parameter; the backend applies the filter.
    self._parameters.append({"name": "exclude-filter", "value": regex})
    return self
iam_role_auth(arn: str, external_id: str) -> DynamoDBCrawler

Set up the crawler to use IAM role-based authentication.

:param arn: ARN of the AWS role :param external_id: AWS external ID :returns: crawler, set up to use IAM user role-based authentication

Source code in pyatlan/model/packages/dynamo_d_b_crawler.py
def iam_role_auth(self, arn: str, external_id: str) -> DynamoDBCrawler:
    """
    Set up the crawler to use IAM role-based authentication.

    :param arn: ARN of the AWS role
    :param external_id: AWS external ID
    :returns: crawler, set up to use IAM role-based authentication
    """
    # The "extra" sub-dict must already exist (seeded by direct()).
    extra = self._credentials_body["extra"]
    extra["aws_role_arn"] = arn
    extra["aws_external_id"] = external_id
    self._credentials_body["auth_type"] = "role"
    self._credentials_body["connector_type"] = "sdk"
    return self
iam_user_auth(access_key: str, secret_key: str) -> DynamoDBCrawler

Set up the crawler to use IAM user-based authentication.

:param access_key: through which to access DynamoDB :param secret_key: through which to access DynamoDB :returns: crawler, set up to use IAM user-based authentication

Source code in pyatlan/model/packages/dynamo_d_b_crawler.py
def iam_user_auth(self, access_key: str, secret_key: str) -> DynamoDBCrawler:
    """
    Set up the crawler to use IAM user-based authentication.

    :param access_key: AWS access key through which to access DynamoDB
    :param secret_key: AWS secret key through which to access DynamoDB
    :returns: crawler, set up to use IAM user-based authentication
    """
    # Access key pair is stored in the generic username/password slots.
    self._credentials_body.update(
        {
            "auth_type": "iam",
            "username": access_key,
            "password": secret_key,
        }
    )
    return self
include_regex(regex: str) -> DynamoDBCrawler

Defines the regex of tables to include. By default, everything will be included.

:param regex: include regex for the crawler :returns: crawler, set to include only those assets specified in the regex

Source code in pyatlan/model/packages/dynamo_d_b_crawler.py
def include_regex(self, regex: str) -> DynamoDBCrawler:
    """
    Defines the regex of tables to include.
    By default, everything will be included.

    :param regex: include regex for the crawler
    :returns: crawler, set to include
    only those assets specified in the regex
    """
    # Fix: docstring previously described this as an "exclude regex".
    self._parameters.append(dict(name="include-filter", value=regex))
    return self

GlueCrawler(client: AtlanClient, connection_name: str, admin_roles: Optional[List[str]] = None, admin_groups: Optional[List[str]] = None, admin_users: Optional[List[str]] = None, allow_query: bool = False, allow_query_preview: bool = False, row_limit: int = 0)

Bases: AbstractCrawler

Base configuration for a new Glue crawler.

:param client: connectivity to an Atlan tenant :param connection_name: name for the connection :param admin_roles: admin roles for the connection :param admin_groups: admin groups for the connection :param admin_users: admin users for the connection :param allow_query: allow data to be queried in the connection (True) or not (False), default: False :param allow_query_preview: allow sample data viewing for assets in the connection (True) or not (False), default: False :param row_limit: maximum number of rows that can be returned by a query, default: 0

Source code in pyatlan/model/packages/glue_crawler.py
def __init__(
    self,
    client: AtlanClient,
    connection_name: str,
    admin_roles: Optional[List[str]] = None,
    admin_groups: Optional[List[str]] = None,
    admin_users: Optional[List[str]] = None,
    allow_query: bool = False,
    allow_query_preview: bool = False,
    row_limit: int = 0,
):
    """
    Base configuration for a new Glue crawler.

    :param client: connectivity to an Atlan tenant
    :param connection_name: name for the connection
    :param admin_roles: admin roles for the connection
    :param admin_groups: admin groups for the connection
    :param admin_users: admin users for the connection
    :param allow_query: allow data to be queried in the
        connection (True) or not (False), default: False
    :param allow_query_preview: allow sample data viewing for assets
        in the connection (True) or not (False), default: False
    :param row_limit: maximum number of rows that can be
        returned by a query, default: 0
    """
    # Connector type and package logo come from class-level constants
    # declared on the concrete crawler class.
    super().__init__(
        client=client,
        connection_name=connection_name,
        connection_type=self._CONNECTOR_TYPE,
        admin_roles=admin_roles,
        admin_groups=admin_groups,
        admin_users=admin_users,
        allow_query=allow_query,
        allow_query_preview=allow_query_preview,
        row_limit=row_limit,
        source_logo=self._PACKAGE_LOGO,
    )
Functions
direct(region: str) -> GlueCrawler

Set up the crawler to extract directly from Glue.

:param region: AWS region where Glue is set up :returns: crawler, set up to extract directly from Glue

Source code in pyatlan/model/packages/glue_crawler.py
def direct(
    self,
    region: str,
) -> GlueCrawler:
    """
    Set up the crawler to extract directly from Glue.

    :param region: AWS region where Glue is set up
    :returns: crawler, set up to extract directly from Glue
    """
    # Seed the run-unique credential entry for this connector.
    self._credentials_body.update(
        {
            "name": f"default-{self._NAME}-{self._epoch}-0",
            "extra": {"region": region},
            "connector_config_name": f"atlan-connectors-{self._NAME}",
        }
    )
    return self
exclude(assets: List[str]) -> GlueCrawler

Defines the filter for assets to exclude when crawling.

:param assets: list of schema names to exclude when crawling :returns: crawler, set to exclude only those assets specified :raises InvalidRequestException: In the unlikely event the provided filter cannot be translated

Source code in pyatlan/model/packages/glue_crawler.py
def exclude(self, assets: List[str]) -> GlueCrawler:
    """
    Defines the filter for assets to exclude when crawling.

    :param assets: list of schema names to exclude when crawling
    :returns: crawler, set to exclude only those assets specified
    :raises InvalidRequestException: in the unlikely
    event the provided filter cannot be translated
    """
    # Translation of the name list into a filter is delegated to the
    # shared helper on the abstract crawler.
    self._build_asset_filter("exclude", assets)
    return self
iam_user_auth(access_key: str, secret_key: str) -> GlueCrawler

Set up the crawler to use IAM user-based authentication.

:param access_key: through which to access Glue :param secret_key: through which to access Glue :returns: crawler, set up to use IAM user-based authentication

Source code in pyatlan/model/packages/glue_crawler.py
def iam_user_auth(self, access_key: str, secret_key: str) -> GlueCrawler:
    """
    Set up the crawler to use IAM user-based authentication.

    :param access_key: AWS access key through which to access Glue
    :param secret_key: AWS secret key through which to access Glue
    :returns: crawler, set up to use IAM user-based authentication
    """
    # Access key pair is stored in the generic username/password slots.
    self._credentials_body.update(
        {
            "auth_type": "iam",
            "username": access_key,
            "password": secret_key,
        }
    )
    return self
include(assets: List[str]) -> GlueCrawler

Defines the filter for assets to include when crawling.

:param assets: list of schema names to include when crawling :returns: crawler, set to include only those assets specified :raises InvalidRequestException: In the unlikely event the provided filter cannot be translated

Source code in pyatlan/model/packages/glue_crawler.py
def include(self, assets: List[str]) -> GlueCrawler:
    """
    Defines the filter for assets to include when crawling.

    :param assets: list of schema names to include when crawling
    :returns: crawler, set to include only those assets specified
    :raises InvalidRequestException: in the unlikely
    event the provided filter cannot be translated
    """
    # Translation of the name list into a filter is delegated to the
    # shared helper on the abstract crawler.
    self._build_asset_filter("include", assets)
    return self

LineageBuilder()

Bases: AbstractCustomPackage

Base configuration for a new lineage builder package.

Source code in pyatlan/model/packages/base/custom_package.py
def __init__(
    self,
):
    """Initialize the package, recording the current epoch timestamp."""
    super().__init__()
    # Epoch is used to build run-unique credential names (e.g. "csa-...-{epoch}-0").
    self._epoch = int(utils.get_epoch_timestamp())
Functions
adls(client_id: str, client_secret: str, tenant_id: str, account_name: str, container: str) -> LineageBuilder

Set up package to import lineage details from ADLS.

:param client_id: unique application (client) ID assigned by Azure AD when the app was registered :param client_secret: client secret for authentication :param tenant_id: unique ID of the Azure Active Directory instance :param account_name: name of the storage account :param container: container to retrieve object store objects from

:returns: package, set up to import lineage details from ADLS

Source code in pyatlan/model/packages/lineage_builder.py
def adls(
    self,
    client_id: str,
    client_secret: str,
    tenant_id: str,
    account_name: str,
    container: str,
) -> LineageBuilder:
    """
    Set up package to import lineage details from ADLS.

    :param client_id: unique application (client) ID assigned by Azure AD when the app was registered
    :param client_secret: client secret for authentication
    :param tenant_id: unique ID of the Azure Active Directory instance
    :param account_name: name of the storage account
    :param container: container to retrieve object store objects from

    :returns: package, set up to import lineage details from ADLS
    """
    # Object-store coordinates travel in the "extra" sub-dict; the client
    # id/secret ride in the generic username/password slots.
    azure_extra = {
        "azure_tenant_id": tenant_id,
        "storage_account_name": account_name,
        "adls_container": container,
    }
    self._credentials_body.update(
        {
            "name": f"csa-{self._NAME}-{self._epoch}-0",
            "auth_type": "adls",
            "username": client_id,
            "password": client_secret,
            "extra": azure_extra,
            "connector_config_name": "csa-connectors-objectstore",
        }
    )
    return self
gcs(project_id: str, service_account_json: str, bucket: str) -> LineageBuilder

Set up package to import lineage details from GCS.

:param project_id: ID of GCP project :param service_account_json: service account credentials in JSON format :param bucket: bucket to retrieve object store object from

:returns: Package set up to import lineage details from GCS

Source code in pyatlan/model/packages/lineage_builder.py
def gcs(
    self, project_id: str, service_account_json: str, bucket: str
) -> LineageBuilder:
    """
    Set up package to import lineage details from GCS.

    :param project_id: ID of GCP project
    :param service_account_json: service account credentials in JSON format
    :param bucket: bucket to retrieve object store object from

    :returns: Package set up to import lineage details from GCS
    """
    # Project id and the service-account JSON ride in the generic
    # username/password slots; the bucket goes in "extra".
    self._credentials_body.update(
        {
            "name": f"csa-{self._NAME}-{self._epoch}-0",
            "auth_type": "gcs",
            "username": project_id,
            "password": service_account_json,
            "extra": {"gcs_bucket": bucket},
            "connector_config_name": "csa-connectors-objectstore",
        }
    )
    return self
object_store(prefix: str, object_key: str) -> LineageBuilder

Set up the package to retrieve the lineage file from cloud object storage.

:param prefix: directory (path) within the object store from which to retrieve the file containing asset metadata :param object_key: object key (filename), including its extension, within the object store and prefix

:returns: package, set up to import lineage details from the object store

Source code in pyatlan/model/packages/lineage_builder.py
def object_store(
    self,
    prefix: str,
    object_key: str,
) -> LineageBuilder:
    """
    Set up the package to retrieve the lineage file from cloud object storage.

    :param prefix: directory (path) within the object store from
        which to retrieve the file containing asset metadata
    :param object_key: object key (filename),
        including its extension, within the object store and prefix

    :returns: package, set up to import lineage details from the object store
    """
    # Each entry is an independent workflow parameter; "{{credentialGuid}}"
    # is a template placeholder resolved server-side.
    for name, value in (
        ("lineage_prefix", prefix),
        ("lineage_key", object_key),
        ("lineage_import_type", "CLOUD"),
        ("cloud_source", "{{credentialGuid}}"),
    ):
        self._parameters.append({"name": name, "value": value})
    return self
options(input_handling: AssetInputHandling = AssetInputHandling.PARTIAL, fail_on_errors: Optional[bool] = None, case_sensitive_match: Optional[bool] = None, field_separator: Optional[str] = None, batch_size: Optional[int] = None) -> LineageBuilder

Set up the lineage builder with the specified options.

:param input_handling: specifies whether to allow the creation of new assets from the input CSV (full or partial assets) or only update existing (skip) assets in Atlan. :param fail_on_errors: specifies whether an invalid value in a field should cause the import to fail (True) or log a warning, skip that value, and proceed (False). :param case_sensitive_match: indicates whether to use case-sensitive matching when running in update-only mode (True) or to try case-insensitive matching (False). :param field_separator: character used to separate fields in the input file (e.g., ',' or ';'). :param batch_size: maximum number of rows to process at a time (per API request).

:returns: package, configured to import assets with advanced configuration.

Source code in pyatlan/model/packages/lineage_builder.py
def options(
    self,
    input_handling: AssetInputHandling = AssetInputHandling.PARTIAL,
    fail_on_errors: Optional[bool] = None,
    case_sensitive_match: Optional[bool] = None,
    field_separator: Optional[str] = None,
    batch_size: Optional[int] = None,
) -> LineageBuilder:
    """
    Set up the lineage builder with the specified options.

    :param input_handling: whether to allow the creation of new assets
        from the input CSV (full or partial assets) or only update
        existing (skip) assets in Atlan.
    :param fail_on_errors: whether an invalid value in a field should
        cause the import to fail (`True`) or log a warning, skip that
        value, and proceed (`False`).
    :param case_sensitive_match: whether to use case-sensitive matching
        when running in update-only mode (`True`) or to try
        case-insensitive matching (`False`).
    :param field_separator: character used to separate fields in the
        input file (e.g., ',' or ';').
    :param batch_size: maximum number of rows to process at a time
        (per API request).

    :returns: package, configured to import
        assets with advanced configuration.
    """
    # Delegate to the shared optional-parameter helper on the base package.
    self._add_optional_params(
        {
            "lineage_upsert_semantic": input_handling,
            "lineage_fail_on_errors": fail_on_errors,
            "lineage_case_sensitive": case_sensitive_match,
            "field_separator": field_separator,
            "batch_size": batch_size,
        }
    )
    return self
s3(access_key: str, secret_key: str, region: str, bucket: str) -> LineageBuilder

Set up package to import lineage details from S3.

:param access_key: AWS access key :param secret_key: AWS secret key :param region: AWS region :param bucket: bucket to retrieve object store object from

:returns: package, set up to import lineage details from S3

Source code in pyatlan/model/packages/lineage_builder.py
def s3(
    self,
    access_key: str,
    secret_key: str,
    region: str,
    bucket: str,
) -> LineageBuilder:
    """
    Set up package to import lineage details from S3.

    :param access_key: AWS access key
    :param secret_key: AWS secret key
    :param region: AWS region
    :param bucket: bucket to retrieve object store object from

    :returns: package, set up to import lineage details from S3
    """
    # Region and bucket travel in the "extra" sub-dict; the AWS key pair
    # rides in the generic username/password slots.
    aws_extra = {
        "region": region,
        "s3_bucket": bucket,
    }
    self._credentials_body.update(
        {
            "name": f"csa-{self._NAME}-{self._epoch}-0",
            "auth_type": "s3",
            "username": access_key,
            "password": secret_key,
            "extra": aws_extra,
            "connector_config_name": "csa-connectors-objectstore",
        }
    )
    return self

LineageGenerator()

Bases: AbstractCustomPackage

Base configuration for a new lineage generator package.

Source code in pyatlan/model/packages/base/custom_package.py
def __init__(
    self,
):
    """Initialize the package, recording the current epoch timestamp."""
    super().__init__()
    # Epoch is used to generate unique names for this package run.
    self._epoch = int(utils.get_epoch_timestamp())
Functions
config(source_asset_type: SourceAssetType, source_qualified_name: str, target_asset_type: TargetAssetType, target_qualified_name: str, case_sensitive_match: bool = False, match_on_schema: bool = False, output_type: OutputType = OutputType.PREVIEW, generate_on_child_assets: bool = False, regex_match: Optional[str] = None, regex_replace: Optional[str] = None, regex_match_schema: Optional[str] = None, regex_replace_schema: Optional[str] = None, regex_match_schema_name: Optional[str] = None, regex_replace_schema_name: Optional[str] = None, match_prefix: Optional[str] = None, match_suffix: Optional[str] = None, file_advanced_seperator: Optional[str] = None, file_advanced_position: Optional[str] = None, process_connection_qn: Optional[str] = None) -> LineageGenerator

Set up the lineage generator with the specified configuration.

:param source_asset_type: type name of the lineage input assets (sources). :param source_qualified_name: qualified name prefix of the lineage input assets (sources). :param target_asset_type: type name of the lineage output assets (targets). :param target_qualified_name: qualified name prefix of the lineage output assets (targets). :param case_sensitive_match: whether to match asset names using a case sensitive logic, default: False :param match_on_schema: whether to include the schema name to match source and target assets, default: False. If one of "Source asset type" or "Target asset type" is not a relational type (Table, View, Materialized View, Calculation View or Column) or a MongoDB Collection the option is ignored, default: False :param output_type: default to Preview lineage - Preview lineage: to generate a csv with the lineage preview. - Generate lineage: to generate the lineage on Atlan. - Delete lineage: to delete the lineage on Atlan.

:param generate_on_child_assets: whether to generate the lineage on the child assets specified on Source asset type and Target asset type, default: False. :param regex_match (optional): if there is a re-naming happening between the source and the target that can be identified by a regex pattern, use this field to identify the characters to be replaced. :param regex_replace (optional): if there is a re-naming happening between the source and the target that can be identified by a regex pattern, use this field to specify the replacement characters. :param regex_match_schema (optional): if there is a re-naming happening between the source and the target schema that can be identified by a regex pattern, use this field to identify the characters to be replaced. Applicable only if match_on_schema is False. :param regex_replace_schema (optional): if there is a re-naming happening between the source and the target schema that can be identified by a regex pattern, use this field to specify the replacement characters. Applicable only if match_on_schema is True. :param regex_match_schema_name (optional): if there is a re-naming happening between the source and the target name + schema that can be identified by a regex pattern, use this field to identify the characters to be replaced. Applicable only if match_on_schema is True. It overrides any other regex defined. :param regex_replace_schema_name (optional): if there is a re-naming happening between the source and the target name + schema that can be identified by a regex pattern, use this field to specify the replacement characters. Applicable only if match_on_schema is True. It overrides any other regex defined. :param match_prefix (optional): prefix to add to source assets to match with target ones. :param match_suffix (optional): suffix to add to source assets to match with target ones. :param file_advanced_seperator (optional): separator used to split the qualified name. It's applicable to file based assets only. 
eg: if the separator is equal to /: default/s3/1707397085/arn:aws:s3:::mybucket/prefix/myobject.csv -> [default,s3,1707397085,arn:aws:s3:::mybucket,prefix,myobject.csv] :param file_advanced_position (optional): number of substrings (created using "File advanced separator") to use for the asset match. The count is from right to left. It's applicable to file based assets only. In the above example if the value is equal to 3 -> [arn:aws:s3:::mybucket,prefix,myobject.csv] :param process_connection_qn (optional): connection for the process assets. If blank the process assets will be assigned to the source assets connection.

:returns: package, set up lineage generator with the specified configuration.

Source code in pyatlan/model/packages/lineage_generator_nt.py
def config(
    self,
    source_asset_type: SourceAssetType,
    source_qualified_name: str,
    target_asset_type: TargetAssetType,
    target_qualified_name: str,
    case_sensitive_match: bool = False,
    match_on_schema: bool = False,
    output_type: OutputType = OutputType.PREVIEW,
    generate_on_child_assets: bool = False,
    regex_match: Optional[str] = None,
    regex_replace: Optional[str] = None,
    regex_match_schema: Optional[str] = None,
    regex_replace_schema: Optional[str] = None,
    regex_match_schema_name: Optional[str] = None,
    regex_replace_schema_name: Optional[str] = None,
    match_prefix: Optional[str] = None,
    match_suffix: Optional[str] = None,
    file_advanced_seperator: Optional[str] = None,
    file_advanced_position: Optional[str] = None,
    process_connection_qn: Optional[str] = None,
) -> LineageGenerator:
    """
    Set up the lineage generator with the specified configuration.

    :param source_asset_type: type name of the lineage input assets (sources).
    :param source_qualified_name: qualified name prefix of the lineage input assets (sources).
    :param target_asset_type: type name of the lineage output assets (targets).
    :param target_qualified_name: qualified name prefix of the lineage output assets (targets).
    :param case_sensitive_match: whether to match asset names using a case sensitive logic, default: `False`
    :param match_on_schema: whether to include the schema name to match source and target assets, default: `False`.
    If one of `"Source asset type"` or `"Target asset type"`
    is not a relational type (`Table`, `View`, `Materialized View`,
    `Calculation View` or `Column`) or a `MongoDB Collection` the option is ignored, default: `False`
    :param output_type: default to `Preview` lineage
        - `Preview` lineage: to generate a csv with the lineage preview.
        - `Generate` lineage: to generate the lineage on Atlan.
        - `Delete` lineage: to delete the lineage on Atlan.

    :param generate_on_child_assets: whether to generate the lineage on the
    child assets specified on `Source` asset type and `Target` asset type, default: `False`.
    :param regex_match (optional): if there is a re-naming happening between
    the source and the target that can be identified by a regex pattern,
    use this field to identify the characters to be replaced.
    :param regex_replace (optional): if there is a re-naming happening between the source
    and the target that can be identified by a regex pattern, use this field to specify the replacement characters.
    :param regex_match_schema (optional): if there is a re-naming happening between
    the source and the target schema that can be identified by a regex pattern,
    use this field to identify the characters to be replaced. Applicable only if `match_on_schema` is `False`.
    :param regex_replace_schema (optional): if there is a re-naming happening between
    the source and the target schema that can be identified by a regex pattern,
    use this field to specify the replacement characters. Applicable only if `match_on_schema` is `True`.
    :param regex_match_schema_name (optional): if there is a re-naming happening between
    the source and the target name + schema that can be identified by a regex pattern,
    use this field to identify the characters to be replaced. Applicable only if `match_on_schema`
    is `True`. It overrides any other regex defined.
    :param regex_replace_schema_name (optional): if there is a re-naming happening between
    the source and the target name + schema that can be identified by a regex pattern, use this
    field to specify the replacement characters. Applicable only if `match_on_schema` is `True`.
    It overrides any other regex defined.
    :param match_prefix (optional): prefix to add to source assets to match with target ones.
    :param match_suffix (optional): suffix to add to source assets to match with target ones.
    :param file_advanced_seperator (optional): separator used to split the qualified name.
    It's applicable to file based assets only. eg: if the separator is equal to
    `/`: `default/s3/1707397085/arn:aws:s3:::mybucket/prefix/myobject.csv`
    -> [`default,s3,1707397085,arn:aws:s3:::mybucket,prefix,myobject.csv`]
    :param file_advanced_position (optional): number of substrings (created using "File advanced separator")
    to use for the asset match. The count is from right to left. It's applicable to file based assets only.
    In the above example if the value is equal to `3` -> [`arn:aws:s3:::mybucket,prefix,myobject.csv`]
    :param process_connection_qn (optional): connection for the process assets.
    If blank the process assets will be assigned to the source assets connection.

    :returns: package, set up lineage generator with the specified configuration.
    """
    # Boolean options are serialized as "yes"/"no"; None-valued entries are
    # passed through _add_optional_params (presumably dropped there — TODO confirm).
    params = {
        "source-asset-type": source_asset_type.value,
        "source-qualified-name-prefix": source_qualified_name,
        "target-asset-type": target_asset_type.value,
        "target-qualified-name-prefix": target_qualified_name,
        "case-sensitive": "yes" if case_sensitive_match else "no",
        "match-on-schema": "yes" if match_on_schema else "no",
        "output-option": output_type.value,
        "child-lineage": "yes" if generate_on_child_assets else "no",
        "regex-match": regex_match,
        "regex-replace": regex_replace,
        "regex-match-schema": regex_match_schema,
        "regex-replace-schema": regex_replace_schema,
        "regex-match-schema-name": regex_match_schema_name,
        "regex-replace-schema-name": regex_replace_schema_name,
        "name-prefix": match_prefix,
        "name-suffix": match_suffix,
        "file-advanced-separator": file_advanced_seperator,
        "file-advanced-positions": file_advanced_position,
        "connection-qualified-name": process_connection_qn,
    }
    self._add_optional_params(params)
    return self

MongoDBCrawler(client: AtlanClient, connection_name: str, admin_roles: Optional[List[str]] = None, admin_groups: Optional[List[str]] = None, admin_users: Optional[List[str]] = None, allow_query: bool = True, allow_query_preview: bool = True, row_limit: int = 10000)

Bases: AbstractCrawler

Base configuration for a new MongoDB crawler.

:param client: connectivity to an Atlan tenant :param connection_name: name for the connection :param admin_roles: admin roles for the connection :param admin_groups: admin groups for the connection :param admin_users: admin users for the connection :param allow_query: allow data to be queried in the connection (True) or not (False), default: True :param allow_query_preview: allow sample data viewing for assets in the connection (True) or not (False), default: True :param row_limit: maximum number of rows that can be returned by a query, default: 10000

Source code in pyatlan/model/packages/mongodb_crawler.py
def __init__(
    self,
    client: AtlanClient,
    connection_name: str,
    admin_roles: Optional[List[str]] = None,
    admin_groups: Optional[List[str]] = None,
    admin_users: Optional[List[str]] = None,
    allow_query: bool = True,
    allow_query_preview: bool = True,
    row_limit: int = 10000,
):
    """
    Base configuration for a new MongoDB crawler.

    :param client: connectivity to an Atlan tenant
    :param connection_name: name for the connection
    :param admin_roles: admin roles for the connection
    :param admin_groups: admin groups for the connection
    :param admin_users: admin users for the connection
    :param allow_query: allow data to be queried in the
        connection (True) or not (False), default: True
    :param allow_query_preview: allow sample data viewing for assets
        in the connection (True) or not (False), default: True
    :param row_limit: maximum number of rows that can be
        returned by a query, default: 10000
    """
    # Connector type and package logo come from class-level constants
    # declared on the concrete crawler class.
    super().__init__(
        client=client,
        connection_name=connection_name,
        connection_type=self._CONNECTOR_TYPE,
        admin_roles=admin_roles,
        admin_groups=admin_groups,
        admin_users=admin_users,
        allow_query=allow_query,
        allow_query_preview=allow_query_preview,
        row_limit=row_limit,
        source_logo=self._PACKAGE_LOGO,
    )
Functions
basic_auth(username: str, password: str, native_host: str, default_db: str, auth_db: str = 'admin', is_ssl: bool = True) -> MongoDBCrawler

Set up the crawler to use basic authentication.

:param username: through which to access Atlas SQL connection. :param password: through which to access Atlas SQL connection. :param native_host: native host address for the MongoDB connection. :param default_db: default database to connect to. :param auth_db: authentication database to use (default is "admin"). :param is_ssl: whether to use SSL for the connection (default is True). :returns: crawler, set up to use basic authentication

Source code in pyatlan/model/packages/mongodb_crawler.py
def basic_auth(
    self,
    username: str,
    password: str,
    native_host: str,
    default_db: str,
    auth_db: str = "admin",
    is_ssl: bool = True,
) -> MongoDBCrawler:
    """
    Set up the crawler to use basic authentication.

    :param username: through which to access Atlas SQL connection.
    :param password: through which to access Atlas SQL connection.
    :param native_host: native host address for the MongoDB connection.
    :param default_db: default database to connect to.
    :param auth_db: authentication database to use (default is `"admin"`).
    :param is_ssl: whether to use SSL for the connection (default is `True`).
    :returns: crawler, set up to use basic authentication
    """
    connection_extras = {
        "native-host": native_host,
        "default-database": default_db,
        "authsource": auth_db,
        "ssl": is_ssl,
    }
    # NOTE(review): key is camelCase "authType" here, while sibling crawler
    # packages use "auth_type" — confirm the backend expects this spelling.
    self._credentials_body.update(
        {
            "authType": "basic",
            "username": username,
            "password": password,
            "extra": connection_extras,
        }
    )
    return self
direct(hostname: str, port: int = 27017) -> MongoDBCrawler

Set up the crawler to extract directly from the MongoDB Atlas.

:param hostname: hostname of the Atlas SQL connection :param port: port number of the Atlas SQL connection. default: 27017 :returns: crawler, set up to extract directly from the Atlas SQL connection

Source code in pyatlan/model/packages/mongodb_crawler.py
def direct(self, hostname: str, port: int = 27017) -> MongoDBCrawler:
    """
    Set up the crawler to extract directly from the MongoDB Atlas.

    :param hostname: hostname of the Atlas SQL connection
    :param port: port number of the Atlas SQL connection. default: `27017`
    :returns: crawler, set up to extract directly from the Atlas SQL connection
    """
    self._credentials_body.update(
        {
            "name": f"default-{self._NAME}-{self._epoch}-0",
            "host": hostname,
            "port": port,
            "connector_config_name": f"atlan-connectors-{self._NAME}",
        }
    )
    self._parameters.append({"name": "extraction-method", "value": "direct"})
    return self
exclude(assets: List[str]) -> MongoDBCrawler

Defines the filter for assets to exclude when crawling.

:param assets: list of database names to exclude when crawling :returns: crawler, set to exclude only those assets specified :raises InvalidRequestException: In the unlikely event the provided filter cannot be translated

Source code in pyatlan/model/packages/mongodb_crawler.py
def exclude(self, assets: List[str]) -> MongoDBCrawler:
    """
    Defines the filter for assets to exclude when crawling.

    :param assets: list of database names to exclude when crawling
    :returns: crawler, set to exclude only those assets specified
    :raises InvalidRequestException: In the unlikely
    event the provided filter cannot be translated
    """
    db_names = assets or []
    # Databases map to empty schema lists: exclude the whole database.
    exclude_assets = {name: [] for name in db_names}
    filter_value = self.build_hierarchical_filter(exclude_assets)
    self._parameters.append(
        {"name": "exclude-filter", "value": filter_value or "{}"}
    )
    return self
exclude_regex(regex: str) -> MongoDBCrawler

Defines the exclude regex for the crawler to ignore collections based on a naming convention.

:param regex: exclude regex for the crawler :returns: crawler, set to exclude only those assets specified in the regex

Source code in pyatlan/model/packages/mongodb_crawler.py
def exclude_regex(self, regex: str) -> MongoDBCrawler:
    """
    Defines the exclude regex for the crawler to
    ignore collections based on a naming convention.

    :param regex: exclude regex for the crawler
    :returns: crawler, set to exclude
    only those assets specified in the regex
    """
    self._parameters.append({"name": "temp-table-regex", "value": regex})
    return self
include(assets: List[str]) -> MongoDBCrawler

Defines the filter for assets to include when crawling.

:param assets: list of database names to include when crawling :returns: crawler, set to include only those assets specified :raises InvalidRequestException: In the unlikely event the provided filter cannot be translated

Source code in pyatlan/model/packages/mongodb_crawler.py
def include(self, assets: List[str]) -> MongoDBCrawler:
    """
    Defines the filter for assets to include when crawling.

    :param assets: list of database names to include when crawling
    :returns: crawler, set to include only those assets specified
    :raises InvalidRequestException: In the unlikely
    event the provided filter cannot be translated
    """
    assets = assets or []
    include_assets: Dict[str, List[str]] = {asset: [] for asset in assets}
    to_include = self.build_hierarchical_filter(include_assets)
    # Fixed: the previous dict(dict(...)) built the parameter mapping and
    # then copied it for no reason; a single dict() matches exclude().
    self._parameters.append(dict(name="include-filter", value=to_include or "{}"))
    return self

OracleCrawler(client: AtlanClient, connection_name: str, admin_roles: Optional[List[str]] = None, admin_groups: Optional[List[str]] = None, admin_users: Optional[List[str]] = None, allow_query: bool = True, allow_query_preview: bool = True, row_limit: int = 10000)

Bases: AbstractCrawler

Base configuration for a new Oracle crawler.

:param client: connectivity to an Atlan tenant :param connection_name: name for the connection :param admin_roles: admin roles for the connection :param admin_groups: admin groups for the connection :param admin_users: admin users for the connection :param allow_query: allow data to be queried in the connection (True) or not (False), default: True :param allow_query_preview: allow sample data viewing for assets in the connection (True) or not (False), default: True :param row_limit: maximum number of rows that can be returned by a query, default: 10000

Source code in pyatlan/model/packages/oracle_crawler.py
def __init__(
    self,
    client: AtlanClient,
    connection_name: str,
    admin_roles: Optional[List[str]] = None,
    admin_groups: Optional[List[str]] = None,
    admin_users: Optional[List[str]] = None,
    allow_query: bool = True,
    allow_query_preview: bool = True,
    row_limit: int = 10000,
):
    """
    Base configuration for a new Oracle crawler.

    :param client: connectivity to an Atlan tenant
    :param connection_name: name for the connection
    :param admin_roles: admin roles for the connection
    :param admin_groups: admin groups for the connection
    :param admin_users: admin users for the connection
    :param allow_query: allow data to be queried in the connection (`True`) or not (`False`), default: `True`
    :param allow_query_preview: allow sample data viewing for assets in the connection (`True`) or not (`False`), default: `True`
    :param row_limit: maximum number of rows that can be returned by a query, default: `10000`
    """
    # Flags flipped by the builder methods (jdbc_internal_methods,
    # source_level_filtering, agent_config); agent mode selects the
    # "*-agent" parameter names in include()/exclude().
    self._advanced_config = False
    self._agent_config = False
    super().__init__(
        client=client,
        connection_name=connection_name,
        connection_type=self._CONNECTOR_TYPE,
        admin_roles=admin_roles,
        admin_groups=admin_groups,
        admin_users=admin_users,
        allow_query=allow_query,
        allow_query_preview=allow_query_preview,
        row_limit=row_limit,
        source_logo=self._PACKAGE_LOGO,
    )
Functions
agent_config(hostname: str, default_db_name: str, sid: str, agent_name: str, port: int = 1521, user_env_var: Optional[str] = None, password_env_var: Optional[str] = None, secret_store: SecretStore = SecretStore.CUSTOM, auth_type: AuthType = AuthType.BASIC, aws_region: str = 'us-east-1', aws_auth_method: AwsAuthMethod = AwsAuthMethod.IAM, azure_auth_method: AzureAuthMethod = AzureAuthMethod.MANAGED_IDENTITY, secret_path: Optional[str] = None, principal: Optional[str] = None, azure_vault_name: Optional[str] = None, agent_custom_config: Optional[Dict[str, Any]] = None) -> OracleCrawler

Configure the agent for Oracle extraction.

:param hostname: host address of the Oracle instance. :param default_db_name: default database name. :param sid: SID (system identifier) of the Oracle instance. :param agent_name: name of the agent. :param secret_store: secret store to use (e.g. AWS, Azure, GCP, etc) :param port: port number for the Oracle instance. Defaults to 1521. :param user_env_var: (optional) environment variable storing the username. :param password_env_var: (optional) environment variable storing the password. :param auth_type: authentication type (basic or kerberos). Defaults to basic. :param aws_region: AWS region where secrets are stored. Defaults to us-east-1. :param aws_auth_method: AWS authentication method (iam, iam-assume-role, access-key). Defaults to iam. :param azure_auth_method: Azure authentication method (managed_identity or service_principal). Defaults to managed_identity. :param secret_path: (optional) path to the secret in the secret manager. :param principal: (optional) Kerberos principal (required if using Kerberos authentication). :param azure_vault_name: (optional) Azure Key Vault name (required if using Azure secret store). :param agent_custom_config: (optional) Custom JSON configuration for the agent.

:returns: crawler, set up to extract via the offline agent.

Source code in pyatlan/model/packages/oracle_crawler.py
def agent_config(
    self,
    hostname: str,
    default_db_name: str,
    sid: str,
    agent_name: str,
    port: int = 1521,
    user_env_var: Optional[str] = None,
    password_env_var: Optional[str] = None,
    secret_store: SecretStore = SecretStore.CUSTOM,
    auth_type: AuthType = AuthType.BASIC,
    aws_region: str = "us-east-1",
    aws_auth_method: AwsAuthMethod = AwsAuthMethod.IAM,
    azure_auth_method: AzureAuthMethod = AzureAuthMethod.MANAGED_IDENTITY,
    secret_path: Optional[str] = None,
    principal: Optional[str] = None,
    azure_vault_name: Optional[str] = None,
    agent_custom_config: Optional[Dict[str, Any]] = None,
) -> OracleCrawler:
    """
    Configure the agent for Oracle extraction.

    :param hostname: host address of the Oracle instance.
    :param default_db_name: default database name.
    :param sid: SID (system identifier) of the Oracle instance.
    :param agent_name: name of the agent.
    :param secret_store: secret store to use (e.g. AWS, Azure, GCP, etc)
    :param port: port number for the Oracle instance. Defaults to `1521`.
    :param user_env_var: (optional) environment variable storing the username.
    :param password_env_var: (optional) environment variable storing the password.
    :param auth_type: authentication type (`basic` or `kerberos`). Defaults to `basic`.
    :param aws_region: AWS region where secrets are stored. Defaults to `us-east-1`.
    :param aws_auth_method: AWS authentication method (`iam`, `iam-assume-role`, `access-key`). Defaults to `iam`.
    :param azure_auth_method: Azure authentication method (`managed_identity` or `service_principal`). Defaults to `managed_identity`.
    :param secret_path: (optional) path to the secret in the secret manager.
    :param principal: (optional) Kerberos principal (required if using Kerberos authentication).
    :param azure_vault_name: (optional) Azure Key Vault name (required if using Azure secret store).
    :param agent_custom_config: (optional) Custom JSON configuration for the agent.

    :returns: crawler, set up to extract via the offline agent.
    """
    # Mark agent mode so include()/exclude() emit the "*-agent" parameters.
    self._agent_config = True
    _agent_dict = {
        "host": hostname,
        "port": port,
        "auth-type": auth_type,
        "database": default_db_name,
        "extra-service": sid,
        "agent-name": agent_name,
        "secret-manager": secret_store,
        "user-env": user_env_var,
        "password-env": password_env_var,
        "agent-config": agent_custom_config,
        "aws-auth-method": aws_auth_method,
        "aws-region": aws_region,
        "azure-auth-method": azure_auth_method,
    }
    # Optional entries are only added when provided.
    if secret_path:
        _agent_dict["secret-path"] = secret_path
    if principal:
        _agent_dict["extra-principal"] = principal
    # NOTE(review): "agent-config" is already seeded above (possibly None);
    # this conditional reassignment is redundant but harmless — confirm
    # whether a None value is acceptable to the backend before simplifying.
    if agent_custom_config:
        _agent_dict["agent-config"] = agent_custom_config
    if azure_vault_name:
        _agent_dict["azure-vault-name"] = azure_vault_name
    self._parameters.append(dict(name="extraction-method", value="agent"))
    # Agent configuration is shipped as a single JSON-encoded parameter.
    self._parameters.append(dict(name="agent-json", value=dumps(_agent_dict)))
    return self
basic_auth(username: str, password: str, sid: str, database_name: str) -> OracleCrawler

Set up the crawler to use basic authentication.

:param username: through which to access Oracle :param password: through which to access Oracle :param sid: SID (system identifier) of the Oracle instance :param database_name: database name to crawl :returns: crawler, set up to use basic authentication

Source code in pyatlan/model/packages/oracle_crawler.py
def basic_auth(
    self,
    username: str,
    password: str,
    sid: str,
    database_name: str,
) -> OracleCrawler:
    """
    Configure the crawler for basic (username/password) authentication.

    :param username: through which to access Oracle
    :param password: through which to access Oracle
    :param sid: SID (system identifier) of the Oracle instance
    :param database_name: database name to crawl
    :returns: crawler, set up to use basic authentication
    """
    self._credentials_body.update(
        {
            "name": f"default-{self._NAME}-{self._epoch}-0",
            "auth_type": "basic",
            "username": username,
            "password": password,
            "extra": {"sid": sid, "databaseName": database_name},
        }
    )
    return self
direct(hostname: str, port: int = 1521) -> OracleCrawler

Set up the crawler to extract directly from Oracle.

:param hostname: hostname of the Oracle instance :param port: port number of Oracle instance, defaults to 1521 :returns: crawler, set up to extract directly from Oracle.

Source code in pyatlan/model/packages/oracle_crawler.py
def direct(
    self,
    hostname: str,
    port: int = 1521,
) -> OracleCrawler:
    """
    Set up the crawler to extract directly from Oracle.

    :param hostname: hostname of the Oracle instance
    :param port: port number of Oracle instance, defaults to `1521`
    :returns: crawler, set up to extract directly from Oracle.
    """
    self._credentials_body.update(
        {
            "name": f"default-{self._NAME}-{self._epoch}-0",
            "host": hostname,
            "port": port,
            "connector_config_name": f"atlan-connectors-{self._NAME}",
        }
    )
    self._parameters.append({"name": "extraction-method", "value": "direct"})
    return self
exclude(assets: dict) -> OracleCrawler

Defines the filter for assets to exclude when crawling.

:param assets: Map keyed by database name with each value being a list of schemas :returns: crawler, set to exclude only those assets specified :raises InvalidRequestException: In the unlikely event the provided filter cannot be translated

Source code in pyatlan/model/packages/oracle_crawler.py
def exclude(self, assets: dict) -> OracleCrawler:
    """
    Defines the filter for assets to exclude when crawling.

    :param assets: Map keyed by database name with each value being a list of schemas
    :returns: crawler, set to exclude only those assets specified
    :raises InvalidRequestException: In the unlikely
    event the provided filter cannot be translated
    """
    to_exclude = self.build_hierarchical_filter(assets or {})
    # Agent-based extraction consumes a differently named parameter.
    param_name = "exclude-filter-agent" if self._agent_config else "exclude-filter"
    self._parameters.append({"name": param_name, "value": to_exclude or "{}"})
    return self
exclude_regex(regex: str) -> OracleCrawler

Defines the exclude regex for the crawler to ignore tables and views based on a naming convention.

:param regex: exclude regex for the crawler :returns: crawler, set to exclude only those assets specified in the regex

Source code in pyatlan/model/packages/oracle_crawler.py
def exclude_regex(self, regex: str) -> OracleCrawler:
    """
    Defines the exclude regex for the crawler to
    ignore tables and views based on a naming convention.

    :param regex: exclude regex for the crawler
    :returns: crawler, set to exclude
    only those assets specified in the regex
    """
    self._parameters.append({"name": "temp-table-regex", "value": regex})
    return self
include(assets: dict) -> OracleCrawler

Defines the filter for assets to include when crawling.

:param assets: Map keyed by database name with each value being a list of schemas :returns: crawler, set to include only those assets specified :raises InvalidRequestException: In the unlikely event the provided filter cannot be translated

Source code in pyatlan/model/packages/oracle_crawler.py
def include(self, assets: dict) -> OracleCrawler:
    """
    Defines the filter for assets to include when crawling.

    :param assets: Map keyed by database name with each value being a list of schemas
    :returns: crawler, set to include only those assets specified
    :raises InvalidRequestException: In the unlikely
    event the provided filter cannot be translated
    """
    include_assets = assets or {}
    to_include = self.build_hierarchical_filter(include_assets)
    # Fixed: the previous dict(dict(...)) built the parameter mapping and
    # then copied it for no reason; a single dict() matches exclude().
    self._parameters.append(
        dict(
            name="include-filter"
            if not self._agent_config
            else "include-filter-agent",
            value=to_include or "{}",
        )
    )
    return self
jdbc_internal_methods(enable: bool) -> OracleCrawler

Defines whether to enable or disable JDBC internal methods for data extraction.

:param enable: whether to enable (True) or disable (False) JDBC internal methods for data extraction :returns: crawler, with jdbc internal methods for data extraction

Source code in pyatlan/model/packages/oracle_crawler.py
def jdbc_internal_methods(self, enable: bool) -> OracleCrawler:
    """
    Defines whether to enable or disable JDBC
    internal methods for data extraction.

    :param enable: whether to enable (`True`) or
    disable (`False`) JDBC internal methods for data extraction
    :returns: crawler, with jdbc internal methods for data extraction
    """
    self._advanced_config = True
    # The workflow parameter expects a lowercase string flag.
    flag = "true" if enable else "false"
    self._parameters.append({"name": "use-jdbc-internal-methods", "value": flag})
    return self
s3(bucket_name: str, bucket_prefix: str) -> OracleCrawler

Set up the crawler to fetch metadata directly from the S3 bucket.

:param bucket_name: name of the S3 bucket containing the extracted metadata files :param bucket_prefix: prefix within the S3 bucket where the extracted metadata files are located :returns: crawler, configured to fetch metadata directly from the S3 bucket

Source code in pyatlan/model/packages/oracle_crawler.py
def s3(self, bucket_name: str, bucket_prefix: str) -> OracleCrawler:
    """
    Set up the crawler to fetch metadata directly from the S3 bucket.

    :param bucket_name: name of the S3 bucket containing the extracted metadata files
    :param bucket_prefix: prefix within the S3 bucket where the extracted metadata files are located
    :returns: crawler, configured to fetch metadata directly from the S3 bucket
    """
    for name, value in (
        ("extraction-method", "s3"),
        ("metadata-s3-bucket", bucket_name),
        ("metadata-s3-prefix", bucket_prefix),
    ):
        self._parameters.append({"name": name, "value": value})
    # Advanced configuration defaults
    self.jdbc_internal_methods(enable=True)
    self.source_level_filtering(enable=False)
    return self
source_level_filtering(enable: bool) -> OracleCrawler

Defines whether to enable or disable schema level filtering on source. Schemas selected in the include filter will be fetched.

:param enable: whether to enable (True) or disable (False) schema level filtering on source :returns: crawler, with schema level filtering on source

Source code in pyatlan/model/packages/oracle_crawler.py
def source_level_filtering(self, enable: bool) -> OracleCrawler:
    """
    Defines whether to enable or disable schema level filtering on source.
    Schemas selected in the include filter will be fetched.

    :param enable: whether to enable (`True`) or
    disable (`False`) schema level filtering on source
    :returns: crawler, with schema level filtering on source
    """
    self._advanced_config = True
    # The workflow parameter expects a lowercase string flag.
    flag = "true" if enable else "false"
    self._parameters.append({"name": "use-source-schema-filtering", "value": flag})
    return self

PostgresCrawler(client: AtlanClient, connection_name: str, admin_roles: Optional[List[str]] = None, admin_groups: Optional[List[str]] = None, admin_users: Optional[List[str]] = None, allow_query: bool = True, allow_query_preview: bool = True, row_limit: int = 10000)

Bases: AbstractCrawler

Base configuration for a new PostgreSQL crawler.

:param client: connectivity to an Atlan tenant :param connection_name: name for the connection :param admin_roles: admin roles for the connection :param admin_groups: admin groups for the connection :param admin_users: admin users for the connection :param allow_query: allow data to be queried in the connection (True) or not (False), default: True :param allow_query_preview: allow sample data viewing for assets in the connection (True) or not (False), default: True :param row_limit: maximum number of rows that can be returned by a query, default: 10000

Source code in pyatlan/model/packages/postgres_crawler.py
def __init__(
    self,
    client: AtlanClient,
    connection_name: str,
    admin_roles: Optional[List[str]] = None,
    admin_groups: Optional[List[str]] = None,
    admin_users: Optional[List[str]] = None,
    allow_query: bool = True,
    allow_query_preview: bool = True,
    row_limit: int = 10000,
):
    """
    Base configuration for a new PostgreSQL crawler.

    :param client: connectivity to an Atlan tenant
    :param connection_name: name for the connection
    :param admin_roles: admin roles for the connection
    :param admin_groups: admin groups for the connection
    :param admin_users: admin users for the connection
    :param allow_query: allow data to be queried in the connection (`True`) or not (`False`), default: `True`
    :param allow_query_preview: allow sample data viewing for assets in the connection (`True`) or not (`False`), default: `True`
    :param row_limit: maximum number of rows that can be returned by a query, default: `10000`
    """
    super().__init__(
        client=client,
        connection_name=connection_name,
        connection_type=self._CONNECTOR_TYPE,
        admin_roles=admin_roles,
        admin_groups=admin_groups,
        admin_users=admin_users,
        allow_query=allow_query,
        allow_query_preview=allow_query_preview,
        row_limit=row_limit,
        source_logo=self._PACKAGE_LOGO,
    )
Functions
basic_auth(username: str, password: str) -> PostgresCrawler

Set up the crawler to use basic authentication.

:param username: through which to access PostgreSQL :param password: through which to access PostgreSQL :returns: crawler, set up to use basic authentication

Source code in pyatlan/model/packages/postgres_crawler.py
def basic_auth(self, username: str, password: str) -> PostgresCrawler:
    """
    Configure the crawler for basic (username/password) authentication.

    :param username: through which to access PostgreSQL
    :param password: through which to access PostgreSQL
    :returns: crawler, set up to use basic authentication
    """
    self._credentials_body.update(
        {"auth_type": "basic", "username": username, "password": password}
    )
    return self
direct(hostname: str, database: str, port: int = 5432) -> PostgresCrawler

Set up the crawler to extract directly from PostgreSQL.

:param hostname: hostname of the PostgreSQL instance :param database: database name to crawl :param port: port number of PostgreSQL instance, defaults to 5432 :returns: crawler, set up to extract directly from PostgreSQL

Source code in pyatlan/model/packages/postgres_crawler.py
def direct(
    self,
    hostname: str,
    database: str,
    port: int = 5432,
) -> PostgresCrawler:
    """
    Set up the crawler to extract directly from PostgreSQL.

    :param hostname: hostname of the PostgreSQL instance
    :param database: database name to crawl
    :param port: port number of PostgreSQL instance, defaults to `5432`
    :returns: crawler, set up to extract directly from PostgreSQL
    """
    self._credentials_body.update(
        {
            "name": f"default-{self._NAME}-{self._epoch}-0",
            "host": hostname,
            "port": port,
            "extra": {"database": database},
            "connector_config_name": f"atlan-connectors-{self._NAME}",
        }
    )
    self._parameters.append({"name": "extraction-method", "value": "direct"})
    return self
exclude(assets: dict) -> PostgresCrawler

Defines the filter for assets to exclude when crawling.

:param assets: Map keyed by database name with each value being a list of schemas :returns: crawler, set to exclude only those assets specified :raises InvalidRequestException: In the unlikely event the provided filter cannot be translated

Source code in pyatlan/model/packages/postgres_crawler.py
def exclude(self, assets: dict) -> PostgresCrawler:
    """
    Defines the filter for assets to exclude when crawling.

    :param assets: Map keyed by database name with each value being a list of schemas
    :returns: crawler, set to exclude only those assets specified
    :raises InvalidRequestException: In the unlikely
    event the provided filter cannot be translated
    """
    to_exclude = self.build_hierarchical_filter(assets or {})
    self._parameters.append({"name": "exclude-filter", "value": to_exclude or "{}"})
    return self
exclude_regex(regex: str) -> PostgresCrawler

Defines the exclude regex for the crawler to ignore tables and views based on a naming convention.

:param regex: exclude regex for the crawler :returns: crawler, set to exclude only those assets specified in the regex

Source code in pyatlan/model/packages/postgres_crawler.py
def exclude_regex(self, regex: str) -> PostgresCrawler:
    """
    Defines the exclude regex for the crawler to
    ignore tables and views based on a naming convention.

    :param regex: exclude regex for the crawler
    :returns: crawler, set to exclude
    only those assets specified in the regex
    """
    self._parameters.append({"name": "temp-table-regex", "value": regex})
    return self
iam_role_auth(username: str, arn: str, external_id: str) -> PostgresCrawler

Set up the crawler to use IAM role-based authentication.

:param username: database username to extract from :param arn: ARN of the AWS role :param external_id: AWS external ID :returns: crawler, set up to use IAM user role-based authentication

Source code in pyatlan/model/packages/postgres_crawler.py
def iam_role_auth(
    self, username: str, arn: str, external_id: str
) -> PostgresCrawler:
    """
    Set up the crawler to use IAM role-based authentication.

    :param username: database username to extract from
    :param arn: ARN of the AWS role
    :param external_id: AWS external ID
    :returns: crawler, set up to use IAM user role-based authentication
    """
    # The AWS role details live under the nested "extra" mapping, which is
    # expected to already exist on the credentials body.
    self._credentials_body["extra"].update(
        {"username": username, "aws_role_arn": arn, "aws_external_id": external_id}
    )
    self._credentials_body.update(
        {"auth_type": "iam_role", "connector_type": "jdbc"}
    )
    return self
iam_user_auth(username: str, access_key: str, secret_key: str) -> PostgresCrawler

Set up the crawler to use IAM user-based authentication.

:param username: database username to extract from :param access_key: through which to access PostgreSQL :param secret_key: through which to access PostgreSQL :returns: crawler, set up to use IAM user-based authentication

Source code in pyatlan/model/packages/postgres_crawler.py
def iam_user_auth(
    self, username: str, access_key: str, secret_key: str
) -> PostgresCrawler:
    """
    Set up the crawler to use IAM user-based authentication.

    :param username: database username to extract from
    :param access_key: through which to access PostgreSQL
    :param secret_key: through which to access PostgreSQL
    :returns: crawler, set up to use IAM user-based authentication
    """
    # The AWS access key pair is carried in the top-level username/password
    # slots; the database user goes under the nested "extra" mapping.
    self._credentials_body["extra"].update({"username": username})
    self._credentials_body.update(
        {
            "auth_type": "iam_user",
            "connector_type": "jdbc",
            "username": access_key,
            "password": secret_key,
        }
    )
    return self
include(assets: dict) -> PostgresCrawler

Defines the filter for assets to include when crawling.

:param assets: Map keyed by database name with each value being a list of schemas :returns: crawler, set to include only those assets specified :raises InvalidRequestException: In the unlikely event the provided filter cannot be translated

Source code in pyatlan/model/packages/postgres_crawler.py
def include(self, assets: dict) -> PostgresCrawler:
    """
    Defines the filter for assets to include when crawling.

    :param assets: Map keyed by database name with each value being a list of schemas
    :returns: crawler, set to include only those assets specified
    :raises InvalidRequestException: In the unlikely
    event the provided filter cannot be translated
    """
    to_include = self.build_hierarchical_filter(assets or {})
    self._parameters.append({"name": "include-filter", "value": to_include or "{}"})
    return self
jdbc_internal_methods(enable: bool) -> PostgresCrawler

Defines whether to enable or disable JDBC internal methods for data extraction.

:param enable: whether to enable (True) or disable (False) JDBC internal methods for data extraction :returns: crawler, with jdbc internal methods for data extraction

Source code in pyatlan/model/packages/postgres_crawler.py
def jdbc_internal_methods(self, enable: bool) -> PostgresCrawler:
    """
    Defines whether to enable or disable JDBC
    internal methods for data extraction.

    :param enable: whether to enable (`True`) or
    disable (`False`) JDBC internal methods for data extraction
    :returns: crawler, with jdbc internal methods for data extraction
    """
    # NOTE(review): the value is passed as a raw bool here, whereas the
    # Oracle crawler serializes it to the strings "true"/"false" — confirm
    # the workflow backend accepts both representations.
    self._parameters.append(dict(name="use-jdbc-internal-methods", value=enable))
    return self
s3(bucket_name: str, bucket_prefix: str, bucket_region: Optional[str] = None) -> PostgresCrawler

Set up the crawler to extract from S3 bucket

:param bucket_name: name of the bucket/storage that contains the extracted metadata files :param bucket_prefix: prefix is everything after the bucket/storage name, including the path :param bucket_region: (Optional) name of the region if applicable :returns: crawler, set up to extract from S3 bucket

Source code in pyatlan/model/packages/postgres_crawler.py
def s3(
    self,
    bucket_name: str,
    bucket_prefix: str,
    bucket_region: Optional[str] = None,
) -> PostgresCrawler:
    """
    Set up the crawler to extract from S3 bucket

    :param bucket_name: name of the bucket/storage
    that contains the extracted metadata files
    :param bucket_prefix: prefix is everything after the
    bucket/storage name, including the `path`
    :param bucket_region: (Optional) name of the region if applicable
    :returns: crawler, set up to extract from S3 bucket
    """
    s3_settings = (
        ("extraction-method", "s3"),
        ("metadata-s3-bucket", bucket_name),
        ("metadata-s3-prefix", bucket_prefix),
        # NOTE(review): the region parameter is appended even when None —
        # confirm the workflow tolerates a null value here.
        ("metadata-s3-region", bucket_region),
    )
    self._parameters.extend({"name": n, "value": v} for n, v in s3_settings)
    return self
source_level_filtering(enable: bool) -> PostgresCrawler

Defines whether to enable or disable schema level filtering on source. Schemas selected in the include filter will be fetched.

:param enable: whether to enable (True) or disable (False) schema level filtering on source :returns: crawler, with schema level filtering on source

Source code in pyatlan/model/packages/postgres_crawler.py
def source_level_filtering(self, enable: bool) -> PostgresCrawler:
    """
    Defines whether to enable or disable schema level filtering on source.
    Schemas selected in the include filter will be fetched.

    :param enable: whether to enable (`True`) or
    disable (`False`) schema level filtering on source
    :returns: crawler, with schema level filtering on source
    """
    self._parameters.append(
        {"name": "use-source-schema-filtering", "value": enable}
    )
    return self

PowerBICrawler(client: AtlanClient, connection_name: str, admin_roles: Optional[List[str]] = None, admin_groups: Optional[List[str]] = None, admin_users: Optional[List[str]] = None, allow_query: bool = False, allow_query_preview: bool = False, row_limit: int = 0)

Bases: AbstractCrawler

Base configuration for a new PowerBI crawler.

:param client: connectivity to an Atlan tenant :param connection_name: name for the connection :param admin_roles: admin roles for the connection :param admin_groups: admin groups for the connection :param admin_users: admin users for the connection :param allow_query: allow data to be queried in the connection (True) or not (False), default: False :param allow_query_preview: allow sample data viewing for assets in the connection (True) or not (False), default: False :param row_limit: maximum number of rows that can be returned by a query, default: 0

Source code in pyatlan/model/packages/powerbi_crawler.py
def __init__(
    self,
    client: AtlanClient,
    connection_name: str,
    admin_roles: Optional[List[str]] = None,
    admin_groups: Optional[List[str]] = None,
    admin_users: Optional[List[str]] = None,
    allow_query: bool = False,
    allow_query_preview: bool = False,
    row_limit: int = 0,
):
    """
    Base configuration for a new PowerBI crawler.

    :param client: connectivity to an Atlan tenant
    :param connection_name: name for the connection
    :param admin_roles: admin roles for the connection
    :param admin_groups: admin groups for the connection
    :param admin_users: admin users for the connection
    :param allow_query: allow data to be queried in the connection (`True`) or not (`False`), default: `False`
    :param allow_query_preview: allow sample data viewing for assets in the connection (`True`) or not (`False`), default: `False`
    :param row_limit: maximum number of rows that can be returned by a query, default: `0`
    """
    super().__init__(
        client=client,
        connection_name=connection_name,
        connection_type=self._CONNECTOR_TYPE,
        admin_roles=admin_roles,
        admin_groups=admin_groups,
        admin_users=admin_users,
        allow_query=allow_query,
        allow_query_preview=allow_query_preview,
        row_limit=row_limit,
        source_logo=self._PACKAGE_LOGO,
    )
Functions
delegated_user(username: str, password: str, tenant_id: str, client_id: str, client_secret: str) -> PowerBICrawler

Set up the crawler to use delegated user authentication.

:param username: through which to access Power BI :param password: through which to access Power BI :param tenant_id: unique ID (GUID) of the tenant for Power BI :param client_id: unique ID (GUID) of the client for Power BI :param client_secret: through which to access Power BI :returns: crawler, set up to use delegated user authentication

Source code in pyatlan/model/packages/powerbi_crawler.py
def delegated_user(
    self,
    username: str,
    password: str,
    tenant_id: str,
    client_id: str,
    client_secret: str,
) -> PowerBICrawler:
    """
    Set up the crawler to use delegated user authentication.

    :param username: through which to access Power BI
    :param password: through which to access Power BI
    :param tenant_id: unique ID (GUID) of the tenant for Power BI
    :param client_id: unique ID (GUID) of the client for Power BI
    :param client_secret: through which to access Power BI
    :returns: crawler, set up to use delegated user authentication
    """
    # Basic (username/password) authentication, with the Azure AD
    # application details nested under "extra".
    self._credentials_body.update(
        {
            "authType": "basic",
            "username": username,
            "password": password,
            "extra": {
                "tenantId": tenant_id,
                "clientId": client_id,
                "clientSecret": client_secret,
            },
        }
    )
    return self
direct() -> PowerBICrawler

Set up the crawler to extract directly from Power BI.

:returns: crawler, set up to extract directly from Power BI

Source code in pyatlan/model/packages/powerbi_crawler.py
def direct(self) -> PowerBICrawler:
    """
    Set up the crawler to extract directly from Power BI.

    :returns: crawler, set up to extract directly from Power BI
    """
    # Fixed Power BI API endpoint; only the credential name and connector
    # config vary (by package name and workflow epoch).
    local_creds = {
        "name": f"default-{self._NAME}-{self._epoch}-0",
        "host": "api.powerbi.com",
        "port": 443,
        "connector_config_name": f"atlan-connectors-{self._NAME}",
    }
    self._credentials_body.update(local_creds)
    return self
direct_endorsements(enabled: bool = True) -> PowerBICrawler

Whether to directly attach endorsements as certificates (True), or instead raise these as requests

:param enabled: if True, endorsements will be directly set as certificates on assets, otherwise requests will be raised, default: True :returns: crawler, set to directly (or not) set certificates on assets for endorsements

Source code in pyatlan/model/packages/powerbi_crawler.py
def direct_endorsements(self, enabled: bool = True) -> PowerBICrawler:
    """
    Whether to directly attach endorsements as
    certificates (True), or instead raise these as requests

    :param enabled: if True, endorsements will be directly set as
    certificates on assets, otherwise requests will be raised, default: True
    :returns: crawler, set to directly (or not) set certificates on assets for endorsements
    """
    # "metastore" writes certificates directly; "requests" raises them for review.
    mode = "metastore" if enabled else "requests"
    self._parameters.append({"name": "endorsement-attach-mode", "value": mode})
    return self
exclude(workspaces: List[str]) -> PowerBICrawler

Defines the filter for workspaces to exclude when crawling.

:param workspaces: the GUIDs of workspaces to exclude when crawling :return: crawler, set to exclude only those workspaces specified :raises InvalidRequestException: In the unlikely event the provided filter cannot be translated

Source code in pyatlan/model/packages/powerbi_crawler.py
def exclude(self, workspaces: List[str]) -> PowerBICrawler:
    """
    Defines the filter for workspaces to exclude when crawling.

    :param workspaces: the GUIDs of workspaces to exclude when crawling
    :return: crawler, set to exclude only those workspaces specified
    :raises InvalidRequestException: In the unlikely
    event the provided filter cannot be translated
    """
    # An absent/empty filter is sent as the empty JSON object "{}".
    filter_value = self.build_flat_filter(workspaces or []) or "{}"
    self._parameters.append({"name": "exclude-filter", "value": filter_value})
    return self
include(workspaces: List[str]) -> PowerBICrawler

Defines the filter for workspaces to include when crawling.

:param workspaces: the GUIDs of workspaces to include when crawling :return: crawler, set to include only those workspaces specified :raises InvalidRequestException: In the unlikely event the provided filter cannot be translated

Source code in pyatlan/model/packages/powerbi_crawler.py
def include(self, workspaces: List[str]) -> PowerBICrawler:
    """
    Defines the filter for workspaces to include when crawling.

    :param workspaces: the GUIDs of workspaces to include when crawling
    :return: crawler, set to include only those workspaces specified
    :raises InvalidRequestException: In the unlikely
    event the provided filter cannot be translated
    """
    include_workspaces = workspaces or []
    to_include = self.build_flat_filter(include_workspaces)
    # Fixed: the parameter was previously wrapped in a redundant nested
    # dict(dict(...)); a single dict is equivalent and matches exclude().
    self._parameters.append(dict(name="include-filter", value=to_include or "{}"))
    return self
service_principal(tenant_id: str, client_id: str, client_secret: str) -> PowerBICrawler

Set up the crawler to use service principal authentication.

:param tenant_id: unique ID (GUID) of the tenant for Power BI :param client_id: unique ID (GUID) of the client for Power BI :param client_secret: through which to access Power BI :returns: crawler, set up to use service principal authentication

Source code in pyatlan/model/packages/powerbi_crawler.py
def service_principal(
    self, tenant_id: str, client_id: str, client_secret: str
) -> PowerBICrawler:
    """
    Set up the crawler to use service principal authentication.

    :param tenant_id: unique ID (GUID) of the tenant for Power BI
    :param client_id: unique ID (GUID) of the client for Power BI
    :param client_secret: through which to access Power BI
    :returns: crawler, set up to use service principal authentication
    """
    # Service-principal auth goes over the REST connector; the Azure AD
    # application details are nested under "extra".
    self._credentials_body.update(
        {
            "authType": "service_principal",
            "connectorType": "rest",
            "extra": {
                "tenantId": tenant_id,
                "clientId": client_id,
                "clientSecret": client_secret,
            },
        }
    )
    return self

RelationalAssetsBuilder()

Bases: AbstractCustomPackage

Base configuration for the Relational Assets Builder package.

Source code in pyatlan/model/packages/relational_assets_builder.py
def __init__(
    self,
):
    """Base configuration for the Relational Assets Builder package."""
    super().__init__()
Functions
adls(client_id: str, client_secret: str, tenant_id: str, account_name: str, container: str) -> RelationalAssetsBuilder

Set up package to import metadata from ADLS.

:param client_id: unique application (client) ID assigned by Azure AD when the app was registered :param client_secret: client secret for authentication :param tenant_id: unique ID of the Azure Active Directory instance :param account_name: name of the storage account :param container: container to retrieve object store objects from

:returns: package, set up to import metadata from ADLS

Source code in pyatlan/model/packages/relational_assets_builder.py
def adls(
    self,
    client_id: str,
    client_secret: str,
    tenant_id: str,
    account_name: str,
    container: str,
) -> RelationalAssetsBuilder:
    """
    Set up package to import metadata from ADLS.

    :param client_id: unique application (client) ID assigned by Azure AD when the app was registered
    :param client_secret: client secret for authentication
    :param tenant_id: unique ID of the Azure Active Directory instance
    :param account_name: name of the storage account
    :param container: container to retrieve object store objects from

    :returns: package, set up to import metadata from ADLS
    """
    # ADLS credentials reuse username/password for client ID/secret, with the
    # storage details nested under "extra".
    self._credentials_body.update(
        {
            "name": f"csa-{self._NAME}-{self._epoch}-0",
            "auth_type": "adls",
            "username": client_id,
            "password": client_secret,
            "extra": {
                "azure_tenant_id": tenant_id,
                "storage_account_name": account_name,
                "adls_container": container,
            },
            "connector_config_name": "csa-connectors-objectstore",
        }
    )
    return self
assets_semantics(input_handling: AssetInputHandling = AssetInputHandling.UPSERT, delta_handling: AssetDeltaHandling = AssetDeltaHandling.INCREMENTAL, removal_type: AssetRemovalType = AssetRemovalType.ARCHIVE) -> RelationalAssetsBuilder

Set up the package to import metadata with semantics.

:param input_handling: Whether to allow the creation of new (full or partial) assets from the input CSV, or ensure assets are only updated if they already exist in Atlan. :param delta_handling: Whether to treat the input file as an initial load, full replacement (deleting any existing assets not in the file) or only incremental (no deletion of existing assets). :param removal_type: If delta_handling is set to FULL_REPLACEMENT, this parameter specifies whether to delete any assets not found in the latest file by archive (recoverable) or purge (non-recoverable). If delta_handling is set to INCREMENTAL, this parameter is ignored and assets are archived.

:returns: package, set up to import metadata with semantics

Source code in pyatlan/model/packages/relational_assets_builder.py
def assets_semantics(
    self,
    input_handling: AssetInputHandling = AssetInputHandling.UPSERT,
    delta_handling: AssetDeltaHandling = AssetDeltaHandling.INCREMENTAL,
    removal_type: AssetRemovalType = AssetRemovalType.ARCHIVE,
) -> RelationalAssetsBuilder:
    """
    Set up the package to import metadata with semantics.

    :param input_handling: Whether to allow the creation of new (full or partial) assets from the input CSV,
                 or ensure assets are only updated if they already exist in Atlan.
    :param delta_handling: Whether to treat the input file as an initial load, full replacement (deleting any
                 existing assets not in the file) or only incremental (no deletion of existing assets).
    :param removal_type: If `delta_handling` is set to `FULL_REPLACEMENT`, this parameter specifies whether to
                delete any assets not found in the latest file by archive (recoverable) or purge (non-recoverable).
                If `delta_handling` is set to `INCREMENTAL`, this parameter is ignored and assets are archived.

    :returns: package, set up to import metadata with semantics
    """
    self._parameters.append(
        {"name": "assets_upsert_semantic", "value": input_handling}
    )
    self._parameters.append({"name": "delta_semantic", "value": delta_handling})
    # The caller-specified removal type only applies on full replacement;
    # incremental loads always archive.
    effective_removal = (
        removal_type
        if delta_handling == AssetDeltaHandling.FULL_REPLACEMENT
        else AssetRemovalType.ARCHIVE
    )
    self._parameters.append(
        {"name": "delta_removal_type", "value": effective_removal}
    )
    return self
direct() -> RelationalAssetsBuilder

Set up package to directly upload the file.

Source code in pyatlan/model/packages/relational_assets_builder.py
def direct(self) -> RelationalAssetsBuilder:
    """
    Set up package to directly upload the file.
    """
    # Direct upload simply selects the DIRECT import mode.
    self._parameters.append(dict(name="import_type", value="DIRECT"))
    return self
gcs(project_id: str, service_account_json: str, bucket: str) -> RelationalAssetsBuilder

Set up package to import metadata from GCS.

:param project_id: ID of GCP project :param service_account_json: service account credentials in JSON format :param bucket: the bucket from which to retrieve the object store object(s)

:returns: Package set up to import metadata from GCS

Source code in pyatlan/model/packages/relational_assets_builder.py
def gcs(
    self, project_id: str, service_account_json: str, bucket: str
) -> RelationalAssetsBuilder:
    """
    Set up package to import metadata from GCS.

    :param project_id: ID of GCP project
    :param service_account_json: service account credentials in JSON format
    :param bucket: the bucket from which to retrieve the object store object(s)

    :returns: Package set up to import metadata from GCS
    """
    # GCS credentials reuse username/password for project ID and service
    # account JSON, with the bucket nested under "extra".
    self._credentials_body.update(
        {
            "name": f"csa-{self._NAME}-{self._epoch}-0",
            "auth_type": "gcs",
            "username": project_id,
            "password": service_account_json,
            "extra": {
                "gcs_bucket": bucket,
            },
            "connector_config_name": "csa-connectors-objectstore",
        }
    )
    return self
object_store(prefix: Optional[str] = None, object_key: Optional[str] = None) -> RelationalAssetsBuilder

Set up the package to import metadata directly from the object store.

:param prefix: directory (path) within the bucket/container from which to retrieve the object(s). :param object_key: object key (filename), including its extension, within the bucket/container and prefix.

:returns: package, set up to import metadata from object store

Source code in pyatlan/model/packages/relational_assets_builder.py
def object_store(
    self, prefix: Optional[str] = None, object_key: Optional[str] = None
) -> RelationalAssetsBuilder:
    """
    Set up the package to import
    metadata directly from the object store.

    :param prefix: directory (path) within the bucket/container from which to retrieve the object(s).
    :param object_key: object key (filename), including its extension, within the bucket/container and
    prefix.

    :returns: package, set up to import metadata from object store
    """
    # CLOUD import mode, sourcing the file via the workflow's credential GUID.
    for name, value in (
        ("import_type", "CLOUD"),
        ("assets_prefix", prefix),
        ("assets_key", object_key),
        ("cloud_source", "{{credentialGuid}}"),
    ):
        self._parameters.append({"name": name, "value": value})
    return self
options(remove_attributes: Optional[Union[List[str], List[AtlanField]]] = None, fail_on_errors: Optional[bool] = None, field_separator: Optional[str] = None, batch_size: Optional[int] = None) -> RelationalAssetsBuilder

Set up package to import assets with advanced configuration.

:param remove_attributes: list of attributes to clear (remove) from assets if their value is blank in the provided file. :param fail_on_errors: specifies whether an invalid value in a field should cause the import to fail (True) or log a warning, skip that value, and proceed (False). :param field_separator: character used to separate fields in the input file (e.g., ',' or ';'). :param batch_size: maximum number of rows to process at a time (per API request).

:returns: package, set up to import assets with advanced configuration

Source code in pyatlan/model/packages/relational_assets_builder.py
def options(
    self,
    remove_attributes: Optional[Union[List[str], List[AtlanField]]] = None,
    fail_on_errors: Optional[bool] = None,
    field_separator: Optional[str] = None,
    batch_size: Optional[int] = None,
) -> RelationalAssetsBuilder:
    """
    Set up package to import assets with advanced configuration.

    :param remove_attributes: list of attributes to clear (remove)
        from assets if their value is blank in the provided file.
    :param fail_on_errors: specifies whether an invalid value
        in a field should cause the import to fail (`True`) or
        log a warning, skip that value, and proceed (`False`).
    :param field_separator: character used to separate
        fields in the input file (e.g., ',' or ';').
    :param batch_size: maximum number of rows
        to process at a time (per API request).

    :returns: package, set up to import assets with advanced configuration
    """
    attrs = remove_attributes
    # Normalize a list of AtlanField objects to their plain attribute names.
    if isinstance(attrs, list) and all(
        isinstance(field, AtlanField) for field in attrs
    ):
        attrs = [field.atlan_field_name for field in attrs]  # type: ignore
    self._add_optional_params(
        {
            "assets_attr_to_overwrite": dumps(attrs, separators=(",", ":")),
            "assets_fail_on_errors": fail_on_errors,
            "assets_field_separator": field_separator,
            "assets_batch_size": batch_size,
        }
    )
    return self
s3(access_key: str, secret_key: str, region: str, bucket: str) -> RelationalAssetsBuilder

Set up package to import metadata from S3.

:param access_key: AWS access key :param secret_key: AWS secret key :param region: AWS region :param bucket: Enter the bucket from which to retrieve the object store object(s).

:returns: package, set up to import metadata from S3

Source code in pyatlan/model/packages/relational_assets_builder.py
def s3(
    self,
    access_key: str,
    secret_key: str,
    region: str,
    bucket: str,
) -> RelationalAssetsBuilder:
    """
    Set up package to import metadata from S3.

    :param access_key: AWS access key
    :param secret_key: AWS secret key
    :param region: AWS region
    :param bucket: the bucket from which to retrieve the object store object(s).

    :returns: package, set up to import metadata from S3
    """
    # S3 credentials reuse username/password for the AWS key pair, with
    # region and bucket nested under "extra".
    self._credentials_body.update(
        {
            "name": f"csa-{self._NAME}-{self._epoch}-0",
            "auth_type": "s3",
            "username": access_key,
            "password": secret_key,
            "extra": {
                "region": region,
                "s3_bucket": bucket,
            },
            "connector_config_name": "csa-connectors-objectstore",
        }
    )
    return self

SQLServerCrawler(client: AtlanClient, connection_name: str, admin_roles: Optional[List[str]] = None, admin_groups: Optional[List[str]] = None, admin_users: Optional[List[str]] = None, allow_query: bool = True, allow_query_preview: bool = True, row_limit: int = 10000)

Bases: AbstractCrawler

Base configuration for a new Microsoft SQL Server crawler.

:param client: connectivity to an Atlan tenant :param connection_name: name for the connection :param admin_roles: admin roles for the connection :param admin_groups: admin groups for the connection :param admin_users: admin users for the connection :param allow_query: allow data to be queried in the connection (True) or not (False), default: True :param allow_query_preview: allow sample data viewing for assets in the connection (True) or not (False), default: True :param row_limit: maximum number of rows that can be returned by a query, default: 10000

Source code in pyatlan/model/packages/s_q_l_server_crawler.py
def __init__(
    self,
    client: AtlanClient,
    connection_name: str,
    admin_roles: Optional[List[str]] = None,
    admin_groups: Optional[List[str]] = None,
    admin_users: Optional[List[str]] = None,
    allow_query: bool = True,
    allow_query_preview: bool = True,
    row_limit: int = 10000,
):
    """
    Base configuration for a new Microsoft SQL Server crawler.

    :param client: connectivity to an Atlan tenant
    :param connection_name: name for the connection
    :param admin_roles: admin roles for the connection
    :param admin_groups: admin groups for the connection
    :param admin_users: admin users for the connection
    :param allow_query: allow data to be queried in the connection (True) or not (False), default: True
    :param allow_query_preview: allow sample data viewing for assets in the connection (True)
        or not (False), default: True
    :param row_limit: maximum number of rows that can be returned by a query, default: 10000
    """
    # Delegate to the abstract crawler, pinning the connector type and
    # source logo to this package's class constants.
    super().__init__(
        client=client,
        connection_name=connection_name,
        connection_type=self._CONNECTOR_TYPE,
        admin_roles=admin_roles,
        admin_groups=admin_groups,
        admin_users=admin_users,
        allow_query=allow_query,
        allow_query_preview=allow_query_preview,
        row_limit=row_limit,
        source_logo=self._PACKAGE_LOGO,
    )
Functions
basic_auth(username: str, password: str) -> SQLServerCrawler

Set up the crawler to use basic authentication.

:param username: through which to access SQL Server :param password: through which to access SQL Server :returns: crawler, set up to use basic authentication

Source code in pyatlan/model/packages/s_q_l_server_crawler.py
def basic_auth(self, username: str, password: str) -> SQLServerCrawler:
    """
    Set up the crawler to use basic authentication.

    :param username: through which to access SQL Server
    :param password: through which to access SQL Server
    :returns: crawler, set up to use basic authentication
    """
    # Plain username/password auth; no connector-specific extras required.
    self._credentials_body.update(
        {
            "authType": "basic",
            "username": username,
            "password": password,
        }
    )
    return self
direct(hostname: str, database: str, port: int = 1433) -> SQLServerCrawler

Set up the crawler to extract directly from the database.

:param hostname: hostname of the SQL Server host :param database: name of the database to extract :param port: port number of the SQL Server host, default: 1433 :returns: crawler, set up to extract directly from the database

Source code in pyatlan/model/packages/s_q_l_server_crawler.py
def direct(
    self, hostname: str, database: str, port: int = 1433
) -> SQLServerCrawler:
    """
    Set up the crawler to extract directly from the database.

    :param hostname: hostname of the SQL Server host
    :param database: name of the database to extract
    :param port: port number of the SQL Server host, default: `1433`
    :returns: crawler, set up to extract directly from the database
    """
    # Target database is nested under "extra"; credential name and connector
    # config derive from the package name and workflow epoch.
    self._credentials_body.update(
        {
            "name": f"default-{self._NAME}-{self._epoch}-0",
            "host": hostname,
            "port": port,
            "extra": {"database": database},
            "connector_config_name": f"atlan-connectors-{self._NAME}",
        }
    )
    return self
exclude(assets: dict) -> SQLServerCrawler

Defines the filter for assets to exclude when crawling.

:param assets: map keyed by database name with each value being a list of schemas :returns: crawler, set to exclude only those assets specified :raises InvalidRequestException: In the unlikely event the provided filter cannot be translated

Source code in pyatlan/model/packages/s_q_l_server_crawler.py
def exclude(self, assets: dict) -> SQLServerCrawler:
    """
    Defines the filter for assets to exclude when crawling.

    :param assets: map keyed by database name
    with each value being a list of schemas
    :returns: crawler, set to exclude only those assets specified
    :raises InvalidRequestException: In the unlikely
    event the provided filter cannot be translated
    """
    # An absent/empty filter is sent as the empty JSON object "{}".
    filter_value = self.build_hierarchical_filter(assets or {}) or "{}"
    self._parameters.append({"name": "exclude-filter", "value": filter_value})
    return self
include(assets: dict) -> SQLServerCrawler

Defines the filter for assets to include when crawling.

:param assets: map keyed by database name with each value being a list of schemas :returns: crawler, set to include only those assets specified :raises InvalidRequestException: In the unlikely event the provided filter cannot be translated

Source code in pyatlan/model/packages/s_q_l_server_crawler.py
def include(self, assets: dict) -> SQLServerCrawler:
    """
    Defines the filter for assets to include when crawling.

    :param assets: map keyed by database name
    with each value being a list of schemas
    :returns: crawler, set to include only those assets specified
    :raises InvalidRequestException: In the unlikely
    event the provided filter cannot be translated
    """
    # An absent/empty filter is sent as the empty JSON object "{}".
    filter_value = self.build_hierarchical_filter(assets or {}) or "{}"
    self._parameters.append({"name": "include-filter", "value": filter_value})
    return self

SigmaCrawler(client: AtlanClient, connection_name: str, admin_roles: Optional[List[str]] = None, admin_groups: Optional[List[str]] = None, admin_users: Optional[List[str]] = None, allow_query: bool = False, allow_query_preview: bool = False, row_limit: int = 0)

Bases: AbstractCrawler

Base configuration for a new Sigma crawler.

:param client: connectivity to an Atlan tenant :param connection_name: name for the connection :param admin_roles: admin roles for the connection :param admin_groups: admin groups for the connection :param admin_users: admin users for the connection :param allow_query: allow data to be queried in the connection (True) or not (False), default: False :param allow_query_preview: allow sample data viewing for assets in the connection (True) or not (False), default: False :param row_limit: maximum number of rows that can be returned by a query, default: 0

Source code in pyatlan/model/packages/sigma_crawler.py
def __init__(
    self,
    client: AtlanClient,
    connection_name: str,
    admin_roles: Optional[List[str]] = None,
    admin_groups: Optional[List[str]] = None,
    admin_users: Optional[List[str]] = None,
    allow_query: bool = False,
    allow_query_preview: bool = False,
    row_limit: int = 0,
):
    """
    Base configuration for a new Sigma crawler.

    :param client: connectivity to an Atlan tenant
    :param connection_name: name for the connection
    :param admin_roles: admin roles for the connection
    :param admin_groups: admin groups for the connection
    :param admin_users: admin users for the connection
    :param allow_query: allow data to be queried in the connection (True) or not (False), default: False
    :param allow_query_preview: allow sample data viewing for assets in the connection (True)
        or not (False), default: False
    :param row_limit: maximum number of rows that can be returned by a query, default: 0
    """
    # Delegate to the abstract crawler, pinning the connector type and
    # source logo to this package's class constants.
    super().__init__(
        client=client,
        connection_name=connection_name,
        connection_type=self._CONNECTOR_TYPE,
        admin_roles=admin_roles,
        admin_groups=admin_groups,
        admin_users=admin_users,
        allow_query=allow_query,
        allow_query_preview=allow_query_preview,
        row_limit=row_limit,
        source_logo=self._PACKAGE_LOGO,
    )
Functions
api_token(client_id: str, api_token: str) -> SigmaCrawler

Set up the crawler to use API token-based authentication.

:param client_id: through which to access Sigma :param api_token: through which to access Sigma :returns: crawler, set up to use API token-based authentication

Source code in pyatlan/model/packages/sigma_crawler.py
def api_token(
    self,
    client_id: str,
    api_token: str,
) -> SigmaCrawler:
    """
    Set up the crawler to use API token-based authentication.

    :param client_id: through which to access Sigma
    :param api_token: through which to access Sigma
    :returns: crawler, set up to use API token-based authentication
    """
    # Sigma API tokens ride in the username/password slots of the credential.
    self._credentials_body.update(
        {
            "username": client_id,
            "password": api_token,
            "auth_type": "api_token",
        }
    )
    return self
direct(hostname: SigmaCrawler.Hostname, port: int = 443) -> SigmaCrawler

Set up the crawler to extract directly from Sigma.

:param hostname: of the Sigma host, for example SigmaCrawler.Hostname.AWS :param port: of the Sigma host, default: 443 :returns: crawler, set up to extract directly from Sigma

Source code in pyatlan/model/packages/sigma_crawler.py
def direct(self, hostname: SigmaCrawler.Hostname, port: int = 443) -> SigmaCrawler:
    """
    Set up the crawler to extract directly from Sigma.

    :param hostname: of the Sigma host, for example `SigmaCrawler.Hostname.AWS`
    :param port: of the Sigma host, default: `443`
    :returns: crawler, set up to extract directly from Sigma
    """
    # Sigma needs no connector-specific extras; just host/port plus the
    # credential name and connector config derived from the package name.
    self._credentials_body.update(
        {
            "name": f"default-{self._NAME}-{self._epoch}-0",
            "host": hostname,
            "port": port,
            "extra": {},
            "connector_config_name": f"atlan-connectors-{self._NAME}",
        }
    )
    return self
exclude(workbooks: List[str]) -> SigmaCrawler

Defines the filter for Sigma workbooks to exclude when crawling.

:param workbooks: the GUIDs of workbooks to exclude when crawling, default to no workbooks if None are specified :returns: crawler, set to exclude only those workbooks specified :raises InvalidRequestException: In the unlikely event the provided filter cannot be translated

Source code in pyatlan/model/packages/sigma_crawler.py
def exclude(self, workbooks: List[str]) -> SigmaCrawler:
    """
    Defines the filter for Sigma workbooks to exclude when crawling.

    :param workbooks: the GUIDs of workbooks to exclude when crawling,
    default to no workbooks if `None` are specified
    :returns: crawler, set to exclude only those workbooks specified
    :raises InvalidRequestException: In the unlikely
    event the provided filter cannot be translated
    """
    # An absent/empty filter is sent as the empty JSON object "{}".
    filter_value = self.build_flat_filter(workbooks or []) or "{}"
    self._parameters.append({"name": "exclude-filter", "value": filter_value})
    return self
include(workbooks: List[str]) -> SigmaCrawler

Defines the filter for Sigma workbooks to include when crawling.

:param workbooks: the GUIDs of workbooks to include when crawling, default to no workbooks if None are specified :returns: crawler, set to include only those workbooks specified :raises InvalidRequestException: In the unlikely event the provided filter cannot be translated

Source code in pyatlan/model/packages/sigma_crawler.py
def include(self, workbooks: List[str]) -> SigmaCrawler:
    """
    Defines the filter for Sigma workbooks to include when crawling.

    :param workbooks: the GUIDs of workbooks to include when crawling,
    default to no workbooks if `None` are specified
    :returns: crawler, set to include only those workbooks specified
    :raises InvalidRequestException: In the unlikely
    event the provided filter cannot be translated
    """
    include_workbooks = workbooks or []
    to_include = self.build_flat_filter(include_workbooks)
    # Fixed: the parameter was previously wrapped in a redundant nested
    # dict(dict(...)); a single dict is equivalent and matches exclude().
    self._parameters.append(dict(name="include-filter", value=to_include or "{}"))
    return self

SnowflakeCrawler(client: AtlanClient, connection_name: str, admin_roles: Optional[List[str]] = None, admin_groups: Optional[List[str]] = None, admin_users: Optional[List[str]] = None, allow_query: bool = True, allow_query_preview: bool = True, row_limit: int = 10000)

Bases: AbstractCrawler

Base configuration for a new Snowflake crawler.

:param client: connectivity to an Atlan tenant :param connection_name: name for the connection :param admin_roles: admin roles for the connection :param admin_groups: admin groups for the connection :param admin_users: admin users for the connection :param allow_query: allow data to be queried in the connection (True) or not (False), default: True :param allow_query_preview: allow sample data viewing for assets in the connection (True) or not (False), default: True :param row_limit: maximum number of rows that can be returned by a query, default: 10000

Source code in pyatlan/model/packages/snowflake_crawler.py
def __init__(
    self,
    client: AtlanClient,
    connection_name: str,
    admin_roles: Optional[List[str]] = None,
    admin_groups: Optional[List[str]] = None,
    admin_users: Optional[List[str]] = None,
    allow_query: bool = True,
    allow_query_preview: bool = True,
    row_limit: int = 10000,
):
    """
    Base configuration for a new Snowflake crawler.

    :param client: connectivity to an Atlan tenant
    :param connection_name: name for the connection
    :param admin_roles: admin roles for the connection
    :param admin_groups: admin groups for the connection
    :param admin_users: admin users for the connection
    :param allow_query: allow data to be queried in the connection (True) or not (False), default: True
    :param allow_query_preview: allow sample data viewing for assets in the connection (True)
        or not (False), default: True
    :param row_limit: maximum number of rows that can be returned by a query, default: 10000
    """
    # Delegate to the abstract crawler, pinning the connector type and
    # source logo to this package's class constants.
    super().__init__(
        client=client,
        connection_name=connection_name,
        connection_type=self._CONNECTOR_TYPE,
        admin_roles=admin_roles,
        admin_groups=admin_groups,
        admin_users=admin_users,
        allow_query=allow_query,
        allow_query_preview=allow_query_preview,
        row_limit=row_limit,
        source_logo=self._PACKAGE_LOGO,
    )
Functions
account_usage(hostname: str, database_name: str, schema_name: str) -> SnowflakeCrawler

Set the crawler to extract using Snowflake's account usage database and schema.

:param hostname: hostname of the Snowflake instance :param database_name: name of the database to use :param schema_name: name of the schema to use :returns: crawler, set to extract using account usage

Source code in pyatlan/model/packages/snowflake_crawler.py
def account_usage(
    self, hostname: str, database_name: str, schema_name: str
) -> SnowflakeCrawler:
    """
    Set the crawler to extract using Snowflake's account usage database and schema.

    :param hostname: hostname of the Snowflake instance
    :param database_name: name of the database to use
    :param schema_name: name of the schema to use
    :returns: crawler, set to extract using account usage
    """
    # Host goes into the credential; the account-usage database and schema
    # are passed as workflow parameters.
    self._credentials_body.update(
        {
            "host": hostname,
            "name": f"default-snowflake-{self._epoch}-0",
            "connector_config_name": "atlan-connectors-snowflake",
        }
    )
    for name, value in (
        ("account-usage-database-name", database_name),
        ("account-usage-schema-name", schema_name),
    ):
        self._parameters.append({"name": name, "value": value})
    return self
basic_auth(username: str, password: str, role: str, warehouse: str) -> SnowflakeCrawler

Set up the crawler to use basic authentication.

:param username: through which to access Snowflake :param password: through which to access Snowflake :param role: name of the role within Snowflake to crawl through :param warehouse: name of the warehouse within Snowflake to crawl through :returns: crawler, set up to use basic authentication

Source code in pyatlan/model/packages/snowflake_crawler.py
def basic_auth(
    self, username: str, password: str, role: str, warehouse: str
) -> SnowflakeCrawler:
    """
    Set up the crawler to use basic authentication.

    :param username: through which to access Snowflake
    :param password: through which to access Snowflake
    :param role: name of the role within Snowflake to crawl through
    :param warehouse: name of the warehouse within Snowflake to crawl through
    :returns: crawler, set up to use basic authentication
    """
    self._credentials_body.update(
        {
            "name": f"default-snowflake-{self._epoch}-0",
            "port": 443,
            "auth_type": "basic",
            "username": username,
            "password": password,
            "extras": {"role": role, "warehouse": warehouse},
        }
    )
    return self
exclude(assets: dict) -> SnowflakeCrawler

Defines the filter for assets to exclude when crawling.

:param assets: Map keyed by database name with each value being a list of schemas :returns: crawler, set to exclude only those assets specified :raises InvalidRequestException: In the unlikely event the provided filter cannot be translated

Source code in pyatlan/model/packages/snowflake_crawler.py
def exclude(self, assets: dict) -> SnowflakeCrawler:
    """
    Defines the filter for assets to exclude when crawling.

    :param assets: Map keyed by database name with each value being a list of schemas
    :returns: crawler, set to exclude only those assets specified
    :raises InvalidRequestException: In the unlikely
    event the provided filter cannot be translated
    """
    # An empty/None map translates to the empty filter "{}".
    filter_value = self.build_hierarchical_filter(assets or {})
    self._parameters.append(
        {"name": "exclude-filter", "value": filter_value or "{}"}
    )
    return self
include(assets: dict) -> SnowflakeCrawler

Defines the filter for assets to include when crawling.

:param assets: Map keyed by database name with each value being a list of schemas :returns: crawler, set to include only those assets specified :raises InvalidRequestException: In the unlikely event the provided filter cannot be translated

Source code in pyatlan/model/packages/snowflake_crawler.py
def include(self, assets: dict) -> SnowflakeCrawler:
    """
    Defines the filter for assets to include when crawling.

    :param assets: Map keyed by database name with each value being a list of schemas
    :returns: crawler, set to include only those assets specified
    :raises InvalidRequestException: In the unlikely
    event the provided filter cannot be translated
    """
    include_assets = assets or {}
    to_include = self.build_hierarchical_filter(include_assets)
    # Fix: the parameter was previously wrapped in a redundant dict(dict(...));
    # a single dict is equivalent and matches the sibling exclude() method.
    self._parameters.append(dict(name="include-filter", value=to_include or "{}"))
    return self
information_schema(hostname: str) -> SnowflakeCrawler

Set the crawler to extract using Snowflake's information schema.

:param hostname: hostname of the Snowflake instance :returns: crawler, set to extract using information schema

Source code in pyatlan/model/packages/snowflake_crawler.py
def information_schema(self, hostname: str) -> SnowflakeCrawler:
    """
    Set the crawler to extract using Snowflake's information schema.

    :param hostname: hostname of the Snowflake instance
    :returns: crawler, set to extract using information schema
    """
    self._credentials_body.update(
        {
            "host": hostname,
            "name": f"default-snowflake-{self._epoch}-0",
            "connector_config_name": "atlan-connectors-snowflake",
        }
    )
    self._parameters.append(
        {"name": "extract-strategy", "value": "information-schema"}
    )
    return self
keypair_auth(username: str, private_key: str, private_key_password: str, role: str, warehouse: str) -> SnowflakeCrawler

Set up the crawler to use keypair-based authentication.

:param username: through which to access Snowflake :param private_key: encrypted private key for authenticating with Snowflake :param private_key_password: password for the encrypted private key :param role: name of the role within Snowflake to crawl through :param warehouse: name of the warehouse within Snowflake to crawl through :returns: crawler, set up to use keypair-based authentication

Source code in pyatlan/model/packages/snowflake_crawler.py
def keypair_auth(
    self,
    username: str,
    private_key: str,
    private_key_password: str,
    role: str,
    warehouse: str,
) -> SnowflakeCrawler:
    """
    Set up the crawler to use keypair-based authentication.

    :param username: through which to access Snowflake
    :param private_key: encrypted private key for authenticating with Snowflake
    :param private_key_password: password for the encrypted private key
    :param role: name of the role within Snowflake to crawl through
    :param warehouse: name of the warehouse within Snowflake to crawl through
    :returns: crawler, set up to use keypair-based authentication
    """
    extras = {
        "role": role,
        "warehouse": warehouse,
        "private_key_password": private_key_password,
    }
    self._credentials_body.update(
        {
            "name": f"default-snowflake-{self._epoch}-0",
            "port": 443,
            "auth_type": "keypair",
            "username": username,
            # The private key travels in the credential's password field.
            "password": private_key,
            "extras": extras,
        }
    )
    return self
lineage(include: bool = True) -> SnowflakeCrawler

Whether to enable lineage as part of crawling Snowflake.

:param include: if True, lineage will be included while crawling Snowflake, default: True :returns: crawler, set to include or exclude lineage

Source code in pyatlan/model/packages/snowflake_crawler.py
def lineage(self, include: bool = True) -> SnowflakeCrawler:
    """
    Whether to enable lineage as part of crawling Snowflake.

    :param include: if True, lineage will be included while crawling Snowflake, default: True
    :returns: crawler, set to include or exclude lineage
    """
    # The workflow parameter expects a string flag, not a boolean.
    flag = "true" if include else "false"
    self._parameters.append({"name": "enable-lineage", "value": flag})
    return self
tags(include: bool = False) -> SnowflakeCrawler

Whether to enable Snowflake tag syncing as part of crawling Snowflake.

:param include: If true, tags in Snowflake will be included while crawling Snowflake :returns: crawler, set to include or exclude Snowflake tags

Source code in pyatlan/model/packages/snowflake_crawler.py
def tags(self, include: bool = False) -> SnowflakeCrawler:
    """
    Whether to enable Snowflake tag syncing as part of crawling Snowflake.

    :param include: if True, tags in Snowflake will be included while crawling Snowflake, default: False
    :returns: crawler, set to include or exclude Snowflake tags
    """
    # The workflow parameter expects a string flag, not a boolean.
    self._parameters.append(
        {"name": "enable-snowflake-tag", "value": "true" if include else "false"}
    )
    return self

SnowflakeMiner(connection_qualified_name: str)

Bases: AbstractMiner

Base configuration for a new Snowflake miner.

:param connection_qualified_name: unique name of the Snowflake connection whose assets should be mined

Source code in pyatlan/model/packages/snowflake_miner.py
def __init__(
    self,
    connection_qualified_name: str,
):
    """
    Base configuration for a new Snowflake miner.

    :param connection_qualified_name: unique name of the
    Snowflake connection whose assets should be mined
    """
    # Flipped to True by advanced options (e.g. custom_config,
    # native_lineage, popularity_window).
    self._advanced_config = False
    super().__init__(connection_qualified_name=connection_qualified_name)
Functions
custom_config(config: Dict) -> SnowflakeMiner

Defines custom JSON configuration controlling experimental feature flags for the miner.

:param config: custom configuration dict :returns: miner, set to include custom configuration

Source code in pyatlan/model/packages/snowflake_miner.py
def custom_config(self, config: Dict) -> SnowflakeMiner:
    """
    Defines custom JSON configuration controlling
    experimental feature flags for the miner.

    :param config: custom configuration dict
    :returns: miner, set to include custom configuration
    """
    # Use an explicit conditional instead of the `config and append(...)`
    # short-circuit idiom, which hid the side effect inside an expression.
    if config:
        self._parameters.append(dict(name="control-config", value=dumps(config)))
    self._advanced_config = True
    return self
direct(start_epoch: int, database: Optional[str] = None, schema: Optional[str] = None) -> SnowflakeMiner

Set up the miner to extract directly from Snowflake.

:param start_epoch: date and time from which to start mining, as an epoch :param database: name of the database to extract from (cloned database) :param schema: name of the schema to extract from (cloned database) :returns: miner, set up to extract directly from Snowflake

Source code in pyatlan/model/packages/snowflake_miner.py
def direct(
    self,
    start_epoch: int,
    database: Optional[str] = None,
    schema: Optional[str] = None,
) -> SnowflakeMiner:
    """
    Set up the miner to extract directly from Snowflake.

    :param start_epoch: date and time from which to start mining, as an epoch
    :param database: name of the database to extract from (cloned database)
    :param schema: name of the schema to extract from (cloned database)
    :returns: miner, set up to extract directly from Snowflake
    """
    if database or schema:
        # Cloned database: mine the explicitly-named database and schema.
        self._parameters.append({"name": "database-name", "value": database})
        self._parameters.append({"name": "schema-name", "value": schema})
    else:
        # Default database.
        self._parameters.append({"name": "snowflake-database", "value": "default"})
    self._parameters.append({"name": "extraction-method", "value": "query_history"})
    self._parameters.append(
        {"name": "miner-start-time-epoch", "value": str(start_epoch)}
    )
    return self
exclude_users(users: List[str]) -> SnowflakeMiner

Defines users who should be excluded when calculating usage metrics for assets (for example, system accounts).

:param users: list of users to exclude when calculating usage metrics :returns: miner, set to exclude the specified users from usage metrics :raises InvalidRequestException: in the unlikely event the provided list cannot be translated

Source code in pyatlan/model/packages/snowflake_miner.py
def exclude_users(self, users: List[str]) -> SnowflakeMiner:
    """
    Defines users who should be excluded when calculating
    usage metrics for assets (for example, system accounts).

    :param users: list of users to exclude when calculating usage metrics
    :returns: miner, set to exclude the specified users from usage metrics
    :raises InvalidRequestException: in the unlikely event the provided
    list cannot be translated
    """
    to_exclude = users or []
    # Serialize the list as JSON; an empty/None input becomes "[]".
    value = dumps(to_exclude) if to_exclude else "[]"
    self._parameters.append(
        {"name": "popularity-exclude-user-config", "value": value}
    )
    return self
native_lineage(enabled: bool) -> SnowflakeMiner

Whether to enable native lineage from Snowflake, using Snowflake's ACCESS_HISTORY.OBJECTS_MODIFIED Column. Note: this is available only for Snowflake Enterprise customers.

:param enabled: if True, native lineage from Snowflake will be used for crawling :returns: miner, set to include / exclude native lineage from Snowflake

Source code in pyatlan/model/packages/snowflake_miner.py
def native_lineage(self, enabled: bool) -> SnowflakeMiner:
    """
    Whether to enable native lineage from Snowflake, using
    Snowflake's ACCESS_HISTORY.OBJECTS_MODIFIED Column.
    Note: this is available only for Snowflake Enterprise customers.

    :param enabled: if True, native lineage from Snowflake will be used for crawling
    :returns: miner, set to include / exclude native lineage from Snowflake
    """
    # Native lineage is an advanced-configuration option.
    self._advanced_config = True
    self._parameters.append(
        dict(name="native-lineage-active", value="true" if enabled else "false")
    )
    return self
popularity_window(days: int = 30) -> SnowflakeMiner

Defines number of days to consider for calculating popularity.

:param days: number of days to consider, defaults to 30 :returns: miner, set to include popularity window

Source code in pyatlan/model/packages/snowflake_miner.py
def popularity_window(self, days: int = 30) -> SnowflakeMiner:
    """
    Defines number of days to consider for calculating popularity.

    :param days: number of days to consider, defaults to 30
    :returns: miner, set to include popularity window
    """
    # Popularity window is an advanced-configuration option.
    self._advanced_config = True
    self._parameters.append({"name": "popularity-window-days", "value": str(days)})
    return self
s3(s3_bucket: str, s3_prefix: str, sql_query_key: str, default_database_key: str, default_schema_key: str, session_id_key: str, s3_bucket_region: Optional[str] = None) -> SnowflakeMiner

Set up the miner to extract from S3 (using JSON line-separated files).

:param s3_bucket: S3 bucket where the JSON line-separated files are located :param s3_prefix: prefix within the S3 bucket in which the JSON line-separated files are located :param sql_query_key: JSON key containing the query definition :param default_database_key: JSON key containing the default database name to use if a query is not qualified with database name :param default_schema_key: JSON key containing the default schema name to use if a query is not qualified with schema name :param session_id_key: JSON key containing the session ID of the SQL query :param s3_bucket_region: (Optional) region of the S3 bucket if applicable :returns: miner, set up to extract from a set of JSON line-separated files in S3

Source code in pyatlan/model/packages/snowflake_miner.py
def s3(
    self,
    s3_bucket: str,
    s3_prefix: str,
    sql_query_key: str,
    default_database_key: str,
    default_schema_key: str,
    session_id_key: str,
    s3_bucket_region: Optional[str] = None,
) -> SnowflakeMiner:
    """
    Set up the miner to extract from S3 (using JSON line-separated files).

    :param s3_bucket: S3 bucket where the JSON line-separated files are located
    :param s3_prefix: prefix within the S3 bucket in
    which the JSON line-separated files are located
    :param sql_query_key: JSON key containing the query definition
    :param default_database_key: JSON key containing the default
    database name to use if a query is not qualified with database name
    :param default_schema_key: JSON key containing the default schema name
    to use if a query is not qualified with schema name
    :param session_id_key: JSON key containing the session ID of the SQL query
    :param s3_bucket_region: (Optional) region of the S3 bucket if applicable
    :returns: miner, set up to extract from a set of JSON line-separated files in S3
    """
    # All mandatory parameters, in the order the workflow expects them.
    for name, value in (
        ("extraction-method", "s3"),
        ("extraction-s3-bucket", s3_bucket),
        ("extraction-s3-prefix", s3_prefix),
        ("sql-json-key", sql_query_key),
        ("catalog-json-key", default_database_key),
        ("schema-json-key", default_schema_key),
        ("session-json-key", session_id_key),
    ):
        self._parameters.append({"name": name, "value": value})
    # The region is only sent when one was provided.
    if s3_bucket_region:
        self._parameters.append(
            {"name": "extraction-s3-region", "value": s3_bucket_region}
        )
    return self

TableauCrawler(client: AtlanClient, connection_name: str, admin_roles: Optional[List[str]] = None, admin_groups: Optional[List[str]] = None, admin_users: Optional[List[str]] = None, allow_query: bool = False, allow_query_preview: bool = False, row_limit: int = 0)

Bases: AbstractCrawler

Base configuration for a new Tableau crawler.

:param client: connectivity to an Atlan tenant :param connection_name: name for the connection :param admin_roles: admin roles for the connection :param admin_groups: admin groups for the connection :param admin_users: admin users for the connection :param allow_query: allow data to be queried in the connection (True) or not (False), default: False :param allow_query_preview: allow sample data viewing for assets in the connection (True) or not (False), default: False :param row_limit: maximum number of rows that can be returned by a query, default: 0

Source code in pyatlan/model/packages/tableau_crawler.py
def __init__(
    self,
    client: AtlanClient,
    connection_name: str,
    admin_roles: Optional[List[str]] = None,
    admin_groups: Optional[List[str]] = None,
    admin_users: Optional[List[str]] = None,
    allow_query: bool = False,
    allow_query_preview: bool = False,
    row_limit: int = 0,
):
    """
    Base configuration for a new Tableau crawler.

    :param client: connectivity to an Atlan tenant
    :param connection_name: name for the connection
    :param admin_roles: admin roles for the connection
    :param admin_groups: admin groups for the connection
    :param admin_users: admin users for the connection
    :param allow_query: allow data to be queried in the connection (True) or not (False), default: False
    :param allow_query_preview: allow sample data viewing for assets in the connection (True) or not (False), default: False
    :param row_limit: maximum number of rows that can be returned by a query, default: 0
    """
    super().__init__(
        client=client,
        connection_name=connection_name,
        connection_type=self._CONNECTOR_TYPE,
        admin_roles=admin_roles,
        admin_groups=admin_groups,
        admin_users=admin_users,
        allow_query=allow_query,
        allow_query_preview=allow_query_preview,
        row_limit=row_limit,
        source_logo=self._PACKAGE_LOGO,
    )
Functions
alternate_host(hostname: str) -> TableauCrawler

Set an alternate host to use for the "View in Tableau" button for assets in the UI.

:param hostname: alternate hostname to use :returns: crawler, set to use an alternate host for viewing assets in Tableau

Source code in pyatlan/model/packages/tableau_crawler.py
def alternate_host(self, hostname: str) -> TableauCrawler:
    """
    Set an alternate host to use for the "View in Tableau" button for assets in the UI.

    :param hostname: alternate hostname to use
    :returns: crawler, set to use an alternate host for viewing assets in Tableau
    """
    self._parameters.append(dict(name="tableau-alternate-host", value=hostname))
    return self
basic_auth(username: str, password: str) -> TableauCrawler

Set up the crawler to use basic authentication.

:param username: through which to access Tableau :param password: through which to access Tableau :returns: crawler, set up to use basic authentication

Source code in pyatlan/model/packages/tableau_crawler.py
def basic_auth(self, username: str, password: str) -> TableauCrawler:
    """
    Set up the crawler to use basic authentication.

    :param username: through which to access Tableau
    :param password: through which to access Tableau
    :returns: crawler, set up to use basic authentication
    """
    self._credentials_body.update(
        {
            "authType": "basic",
            "username": username,
            "password": password,
        }
    )
    return self
crawl_hidden_fields(enabled: bool = True) -> TableauCrawler

Whether to crawl hidden datasource fields (True) or not.

:param enabled: If True, hidden datasource fields will be crawled, otherwise they will not, default: True :returns: crawler, set to include or exclude hidden datasource fields

Source code in pyatlan/model/packages/tableau_crawler.py
def crawl_hidden_fields(self, enabled: bool = True) -> TableauCrawler:
    """
    Whether to crawl hidden datasource fields (True) or not.

    :param enabled: If True, hidden datasource fields
    will be crawled, otherwise they will not, default: True
    :returns: crawler, set to include or exclude hidden datasource fields
    """
    # The workflow parameter expects a string flag, not a boolean.
    flag = "true" if enabled else "false"
    self._parameters.append(
        {"name": "crawl-hidden-datasource-fields", "value": flag}
    )
    return self
crawl_unpublished(enabled: bool = True) -> TableauCrawler

Whether to crawl unpublished worksheets and dashboards (True) or not.

:param enabled: If True, unpublished worksheets and dashboards will be crawled, otherwise they will not, default: True :returns: crawler, set to include or exclude unpublished worksheets and dashboards

Source code in pyatlan/model/packages/tableau_crawler.py
def crawl_unpublished(self, enabled: bool = True) -> TableauCrawler:
    """
    Whether to crawl unpublished worksheets and dashboards (True) or not.

    :param enabled: If True, unpublished worksheets and dashboards
    will be crawled, otherwise they will not, default: True
    :returns: crawler, set to include or exclude unpublished worksheets and dashboards
    """
    # The workflow parameter expects a string flag, not a boolean.
    flag = "true" if enabled else "false"
    self._parameters.append(
        {"name": "crawl-unpublished-worksheets-dashboard", "value": flag}
    )
    return self
direct(hostname: str, site: str, port: int = 443, ssl_enabled: bool = True) -> TableauCrawler

Set up the crawler to extract directly from Tableau.

:param hostname: hostname of Tableau :param site: site in Tableau from which to extract :param port: port for the connection to Tableau :param ssl_enabled: if True, use SSL for the connection, otherwise do not use SSL :returns: crawler, set up to extract directly from Tableau

Source code in pyatlan/model/packages/tableau_crawler.py
def direct(
    self,
    hostname: str,
    site: str,
    port: int = 443,
    ssl_enabled: bool = True,
) -> TableauCrawler:
    """
    Set up the crawler to extract directly from Tableau.

    :param hostname: hostname of Tableau
    :param site: site in Tableau from which to extract
    :param port: port for the connection to Tableau
    :param ssl_enabled: if True, use SSL for the connection, otherwise do not use SSL
    :returns: crawler, set up to extract directly from Tableau
    """
    protocol = "https" if ssl_enabled else "http"
    self._credentials_body.update(
        {
            "name": f"default-{self._NAME}-{self._epoch}-0",
            "host": hostname,
            "port": port,
            "extra": {"protocol": protocol, "defaultSite": site},
            "connector_config_name": f"atlan-connectors-{self._NAME}",
        }
    )
    self._parameters.append({"name": "extraction-method", "value": "direct"})
    # "{{credentialGuid}}" is a literal template placeholder resolved server-side.
    self._parameters.append(
        {"name": "credential-guid", "value": "{{credentialGuid}}"}
    )
    return self
exclude(projects: List[str]) -> TableauCrawler

Defines the filter for projects to exclude when crawling.

:param projects: the GUIDs of projects to exclude when crawling :returns: crawler, set to exclude only those projects specified :raises InvalidRequestException: In the unlikely event the provided filter cannot be translated

Source code in pyatlan/model/packages/tableau_crawler.py
def exclude(self, projects: List[str]) -> TableauCrawler:
    """
    Defines the filter for projects to exclude when crawling.

    :param projects: the GUIDs of projects to exclude when crawling
    :returns: crawler, set to exclude only those projects specified
    :raises InvalidRequestException: In the unlikely
    event the provided filter cannot be translated
    """
    # An empty/None list translates to the empty filter "{}".
    filter_value = self.build_flat_filter(projects or [])
    self._parameters.append(
        {"name": "exclude-filter", "value": filter_value or "{}"}
    )
    return self
include(projects: List[str]) -> TableauCrawler

Defines the filter for projects to include when crawling.

:param projects: the GUIDs of projects to include when crawling :returns: crawler, set to include only those projects specified :raises InvalidRequestException: In the unlikely event the provided filter cannot be translated

Source code in pyatlan/model/packages/tableau_crawler.py
def include(self, projects: List[str]) -> TableauCrawler:
    """
    Defines the filter for projects to include when crawling.

    :param projects: the GUIDs of projects to include when crawling
    :returns: crawler, set to include only those projects specified
    :raises InvalidRequestException: In the unlikely
    event the provided filter cannot be translated
    """
    include_projects = projects or []
    to_include = self.build_flat_filter(include_projects)
    # Fix: the parameter was previously wrapped in a redundant dict(dict(...));
    # a single dict is equivalent and matches the sibling exclude() method.
    self._parameters.append(dict(name="include-filter", value=to_include or "{}"))
    return self
personal_access_token(username: str, access_token: str) -> TableauCrawler

Set up the crawler to use PAT-based authentication.

:param username: through which to access Tableau :param access_token: personal access token for the user, through which to access Tableau :returns: crawler, set up to use PAT-based authentication

Source code in pyatlan/model/packages/tableau_crawler.py
def personal_access_token(self, username: str, access_token: str) -> TableauCrawler:
    """
    Set up the crawler to use PAT-based authentication.

    :param username: through which to access Tableau
    :param access_token: personal access token for the user, through which to access Tableau
    :returns: crawler, set up to use PAT-based authentication
    """
    # The PAT is carried in the credential's password field.
    self._credentials_body.update(
        {
            "authType": "personal_access_token",
            "username": username,
            "password": access_token,
        }
    )
    return self
s3(bucket_name: str, bucket_prefix: str, bucket_region: Optional[str] = None) -> TableauCrawler

Set up the crawler to fetch metadata directly from the S3 bucket.

:param bucket_name: name of the S3 bucket containing the extracted metadata files :param bucket_prefix: prefix within the S3 bucket where the extracted metadata files are located :param bucket_region: (Optional) region where the S3 bucket is located :returns: crawler, configured to fetch metadata directly from the S3 bucket

Source code in pyatlan/model/packages/tableau_crawler.py
def s3(
    self, bucket_name: str, bucket_prefix: str, bucket_region: Optional[str] = None
) -> TableauCrawler:
    """
    Set up the crawler to fetch metadata directly from the S3 bucket.

    :param bucket_name: name of the S3 bucket containing the extracted metadata files
    :param bucket_prefix: prefix within the S3 bucket where the extracted metadata files are located
    :param bucket_region: (Optional) region where the S3 bucket is located
    :returns: crawler, configured to fetch metadata directly from the S3 bucket
    """
    self._parameters.append(dict(name="extraction-method", value="s3"))
    self._parameters.append(dict(name="metadata-s3-bucket", value=bucket_name))
    self._parameters.append(dict(name="metadata-s3-prefix", value=bucket_prefix))
    # Fix: only send the region when one was provided; previously a
    # {"value": None} parameter was appended unconditionally, unlike the
    # equivalent miner s3() configuration which appends it conditionally.
    if bucket_region:
        self._parameters.append(
            dict(name="metadata-s3-region", value=bucket_region)
        )
    # Advanced configuration defaults
    self.exclude(projects=[])
    self.include(projects=[])
    self.crawl_unpublished(enabled=True)
    self.crawl_hidden_fields(enabled=True)
    return self

Group

pyatlan.model.group

Classes

Role

pyatlan.model.role

Classes

AtlanRole

Bases: AtlanObject

Attributes
id: str = Field(description='Unique identifier for the role (GUID).\n') class-attribute instance-attribute

Unique identifier for the role (GUID).

User

pyatlan.model.user

Classes

UserProvider

Bases: Protocol

Protocol that is implemented by classes that can provide a list of all the users in Atlan

Functions
get_all_users(limit: int = 20) -> List[AtlanUser]

Retrieve all users defined in Atlan.

:returns: a list of all the users in Atlan

Source code in pyatlan/model/user.py
def get_all_users(
    self,
    limit: int = 20,
) -> List[AtlanUser]:
    """
    Retrieve all users defined in Atlan.

    :param limit: maximum number of users to request at a time
    (presumably a page size — confirm against the implementing class)
    :returns: a list of all the users in Atlan
    """
get_api_token_by_id(client_id: str) -> Optional[ApiToken]

Retrieves the API token with a name that exactly matches the provided string.

:param client_id: unique client identifier by which to retrieve the API token :returns: the API token whose client ID matches the provided string, or None if there is none

Source code in pyatlan/model/user.py
def get_api_token_by_id(self, client_id: str) -> Optional[ApiToken]:
    """
    Retrieves the API token whose client ID exactly matches the provided string.

    :param client_id: unique client identifier by which to retrieve the API token
    :returns: the API token whose client ID matches the provided string, or None if there is none
    """

Task

pyatlan.model.task

Classes

AtlanTask

Bases: AtlanObject

Attributes
ATTEMPT_COUNT: NumericField = NumericField('attemptCount', '__task_attemptCount') class-attribute

Number of times the task has been attempted.

CLASSIFICATION_ID: KeywordField = KeywordField('classificationId', '__task_classificationId') class-attribute

TBC

CREATED_BY: KeywordField = KeywordField('createdBy', '__task_createdBy') class-attribute

User who created the task.

CREATED_TIME: NumericField = NumericField('createdTime', '__task_timestamp') class-attribute

Time (epoch) at which the task was created, in milliseconds.

END_TIME: NumericField = NumericField('endTime', '__task_endTime') class-attribute

Time (epoch) at which the task was ended, in milliseconds.

ENTITY_GUID: KeywordField = KeywordField('entityGuid', '__task_entityGuid') class-attribute

Unique identifier of the asset the task originated from.

GUID: KeywordField = KeywordField('guid', '__task_guid') class-attribute

Unique identifier of the task.

START_TIME: NumericField = NumericField('startTime', '__task_startTime') class-attribute

Time (epoch) at which the task was started, in milliseconds.

STATUS: TextField = TextField('status', '__task_status') class-attribute

Status of the task.

TIME_TAKEN_IN_SECONDS: NumericField = NumericField('timeTakenInSeconds', '__task_timeTakenInSeconds') class-attribute

Total time taken to complete the task, in seconds.

TYPE: KeywordField = KeywordField('type', '__task_type') class-attribute

Type of the task.

UPDATED_TIME: NumericField = NumericField('updatedTime', '__task_modificationTimestamp') class-attribute

Time (epoch) at which the task was last updated, in milliseconds.

TaskSearchRequest

Bases: SearchRequest

Class from which to configure and run a search against Atlan's task queue.

TaskSearchResponse(client: 'ApiCaller', endpoint: API, criteria: TaskSearchRequest, start: int, size: int, count: int, tasks: List[AtlanTask], aggregations: Dict[str, Aggregation])

Bases: Iterable

Captures the response from a search against Atlan's task queue.

Source code in pyatlan/model/task.py
def __init__(
    self,
    client: "ApiCaller",
    endpoint: API,
    criteria: TaskSearchRequest,
    start: int,
    size: int,
    count: int,
    tasks: List[AtlanTask],
    aggregations: Dict[str, Aggregation],
):
    """
    Captures the response from a search against Atlan's task queue.

    :param client: client through which subsequent pages can be requested
    :param endpoint: API endpoint the search was run against
    :param criteria: the search request that produced this response
    :param start: starting offset of this page of results
    :param size: page size used for the search
    :param count: total number of results matching the search
    :param tasks: the tasks on the current page of results
    :param aggregations: aggregation results, keyed by aggregation name
    """
    self._client = client
    self._endpoint = endpoint
    self._criteria = criteria
    self._start = start
    self._size = size
    self._count = count
    self._tasks = tasks
    self._aggregations = aggregations
Functions
current_page() -> List[AtlanTask]

Retrieve the current page of results.

:returns: list of tasks on the current page of results

Source code in pyatlan/model/task.py
def current_page(self) -> List[AtlanTask]:
    """
    Retrieve the current page of results.

    :returns: list of tasks on the current page of results
    """
    return self._tasks
next_page(start=None, size=None) -> bool

Advances to the next page of results, fetching it if one exists.

:returns: True if there is a next page of results, otherwise False

Source code in pyatlan/model/task.py
def next_page(self, start=None, size=None) -> bool:
    """
    Advance to the next page of results, fetching it if one exists.

    :param start: (optional) explicit starting offset for the next page;
    defaults to the current offset advanced by the page size
    :param size: (optional) new page size to use from this page onward
    :returns: True if there is a next page of results, otherwise False
    """
    self._start = start or self._start + self._size
    if size:
        self._size = size
    # No tasks on the current page means there is nothing further to fetch.
    return self._get_next_page() if self._tasks else False

Workflow

pyatlan.model.workflow

Classes

Query

pyatlan.model.query

Classes

QueryResponse(events: Optional[List[Dict[str, Any]]] = None)

Bases: AtlanObject

Create a single consolidated response from multiple events related to the same query.

Source code in pyatlan/model/query.py
def __init__(self, events: Optional[List[Dict[str, Any]]] = None):
    super().__init__()
    if not events:
        return
    self.rows = []
    self.columns = []
    # Populate the results from all events that have rows
    for event in events:
        event_rows = event.get("rows")
        event_columns = event.get("columns")
        if event_rows:
            self.rows.extend(event_rows)
        if not self.columns and event_columns:
            # Only need to do this once
            self.columns = event_columns
    # Populate the remainder from the final event
    last_event = events[-1]
    self.request_id = last_event.get("requestId")
    self.error_name = last_event.get("errorName")
    self.error_message = last_event.get("errorMessage")
    self.error_code = last_event.get("errorCode")
    self.query_id = last_event.get("queryId")
    self.details = last_event.get("details")
Classes
ColumnDetails

Bases: AtlanObject

Details about the type of column that was returned from a query that was run.

QueryDetails

Bases: AtlanObject

Details about a query that was run.

Response

pyatlan.model.response

Classes

Contract

pyatlan.model.contract

Classes

DataContractSpec

Bases: AtlanYamlModel

Capture the detailed specification of a data contract for an asset.

Classes
Announcement

Bases: AtlanYamlModel

Announcement details for the dataset.

Certification

Bases: AtlanYamlModel

Certification information for the dataset.

DCColumn

Bases: AtlanYamlModel

Details of individual columns in the dataset.

DCTag

Bases: AtlanYamlModel

Tagging details for the dataset.

Owners

Bases: AtlanYamlModel

Owners of the dataset.

Credential

pyatlan.model.credential

Classes

CredentialListResponse

Bases: AtlanObject

Model representing a response containing a list of CredentialResponse objects.

CredentialResponse

Bases: AtlanObject

Functions
to_credential() -> Credential

Convert this response into a credential instance. Note: The password field must still be populated manually, as it will never be returned by a credential lookup for security reasons.

Source code in pyatlan/model/credential.py
def to_credential(self) -> Credential:
    """
    Convert this response into a credential instance.
    Note: The password field must still be populated manually,
    as it will never be returned by a credential lookup for security reasons.
    """
    return Credential(
        id=self.id,
        name=self.name,
        host=self.host,
        port=self.port,
        auth_type=self.auth_type,
        connector_type=self.connector_type,
        connector_config_name=self.connector_config_name,
        username=self.username,
        extras=self.extras,  # type: ignore[call-arg]
    )

CredentialTestResponse

Bases: AtlanObject

Attributes
is_successful: bool property

Whether the test was successful (True) or failed (False).

Returns:

Name Type Description
bool bool

True if the test was successful, False otherwise.

SSO

pyatlan.model.sso

Classes

API Tokens

pyatlan.model.api_tokens

Classes

Aggregation

pyatlan.model.aggregation

Classes

AggregationBucketDetails

Bases: AtlanObject

Captures the results of a single bucket within an aggregation.

Functions
get_source_value(field: AtlanField) -> Optional[str]

Returns the source value of the specified field for this bucket.

:param field: field in Atlan for which to retrieve the value :returns: the value of the field in Atlan that is represented within this bucket, otherwise None

Source code in pyatlan/model/aggregation.py
def get_source_value(self, field: AtlanField) -> Optional[str]:
    """
    Returns the source value of the specified field for this bucket.

    :param field: in Atlan for which to retrieve the value
    :returns: the value of the field in Atlan that
    is represented within this bucket otherwise `None`
    """
    from pyatlan.model.fields.atlan_fields import (
        AtlanField,
        CustomMetadataField,
        SearchableField,
    )

    validate_type(name="field", _type=AtlanField, value=field)

    if (
        self.nested_results
        and SearchableField.EMBEDDED_SOURCE_VALUE in self.nested_results
    ):
        result = self.nested_results[SearchableField.EMBEDDED_SOURCE_VALUE]
        if (
            isinstance(result, AggregationHitsResult)
            and result.hits
            and result.hits.hits
        ):
            details = result.hits.hits[0]
            if details and details.source:
                if isinstance(field, CustomMetadataField):
                    # Need to handle the hashed-string ID
                    # stuff for custom metadata fields
                    return details.source.get(field.elastic_field_name)
                else:
                    return details.source.get(field.atlan_field_name)
    return None

AggregationBucketResult

Bases: AtlanObject

Captures the results from a bucket aggregation.

AggregationHitsResult

Bases: AtlanObject

Captures the hit results from a bucket aggregation.

Classes
Hits

Bases: AtlanObject

Details of the hits requested.

Stats

Bases: AtlanObject

Attributes
relation: Optional[str] = Field(default=None) class-attribute instance-attribute

Comparison operation used to determine whether the values match.

value: Optional[int] = Field(default=None) class-attribute instance-attribute

Number of search results that matched the hit value.

AggregationMetricResult

Bases: AtlanObject

Captures the results from a metric aggregation.

Aggregations

Bases: AtlanObject

Aggregation results from a search

Functions

Events

pyatlan.model.events

Classes

Keycloak Events

pyatlan.model.keycloak_events

Classes

Suggestions

pyatlan.model.suggestions

Classes

Suggestions

Bases: AtlanObject

Attributes
AGG_TERMS: str = 'group_by_terms' class-attribute

Key used for the terms-based aggregation when grouping suggestion results (presumably the Elasticsearch aggregation name — confirm against pyatlan/model/suggestions.py).

asset: Optional[Asset] = Field(default=None) class-attribute instance-attribute

Asset for which to find suggestions.

include_archived: bool = False class-attribute instance-attribute

Whether to include archived assets as part of suggestions (True) or not (False, default).

includes: List[Suggestions.TYPE] = Field(default_factory=list) class-attribute instance-attribute

Which type(s) of suggestions to include in the search and results.

max_suggestions: int = Field(default=5) class-attribute instance-attribute

Maximum number of suggestions to return (default: 5)

where_nots: Optional[List[Query]] = Field(default_factory=list) class-attribute instance-attribute

By default, we will only match on the name (exactly) of the provided asset. You may want to expand this, for example, to look for assets with the same name as well as without some other context, for example, looking only at columns with the same name that are not in a particular schema (e.g. one used purely for testing purposes).

wheres: Optional[List[Query]] = Field(default_factory=list) class-attribute instance-attribute

By default, we will only match on the name (exactly) of the provided asset. You may want to expand this, for example, to look for assets with the same name as well as with some other context, for example, looking only at columns with the same name that are also in parent tables that have the same name. (Columns like 'ID' may otherwise be insufficiently unique to have very useful suggestions.)

with_other_types: Optional[List[str]] = Field(default_factory=list) class-attribute instance-attribute

By default, we will only look for suggestions on other assets with exactly the same type. You may want to expand this, for example, for suggested metadata for tables you might also want to look at Views. You can add any additional types here you want to consider where an asset with the same name as this asset is likely have similar metadata (and thus be valid for providing suggestions).

Classes
TYPE

Bases: str, Enum

Enum representing suggestion types.

Functions
apply(client: AtlanClient, allow_multiple: bool = False, batch: Optional[Batch] = None) -> Optional[AssetMutationResponse]

Find the requested suggestions and apply the top suggestions as changes to the asset.

Note: this will NOT validate whether there is any existing value for what you are setting, so will clobber any existing value with the suggestion. If you want to be certain you are only updating empty values, you should ensure you are only building a finder for suggestions for values that do not already exist on the asset in question.

:param client: client connectivity to an Atlan tenant :param allow_multiple: if True, allow multiple suggestions to be applied to the asset (up to max_suggestions requested), i.e: for owners, terms and tags :param batch: (optional) the batch in which you want to apply the top suggestions as changes to the asset

Source code in pyatlan/model/suggestions.py
def apply(
    self,
    client: AtlanClient,
    allow_multiple: bool = False,
    batch: Optional[Batch] = None,
) -> Optional[AssetMutationResponse]:
    """
    Find the requested suggestions and apply the top suggestions as changes to the asset.

    Note: this will NOT validate whether there is any existing value for what
    you are setting, so will clobber any existing value with the suggestion.
    If you want to be certain you are only updating empty values, you should ensure
    you are only building a finder for suggestions for values that do not already
    exist on the asset in question.

    :param client: client connectivity to an Atlan tenant
    :param allow_multiple: if `True`, allow multiple suggestions to be applied
    to the asset (up to `max_suggestions` requested), i.e: for owners, terms and tags
    :param batch: (optional) the batch in which you want to apply the top suggestions as changes to the asset
    """
    if batch:
        return batch.add(self._apply(client, allow_multiple).asset)
    result = self._apply(client, allow_multiple)
    return client.save(result.asset, result.include_tags)
finder(asset: Asset) -> Suggestions

Build a suggestion finder against the provided Atlan tenant for the provided asset

:param asset: asset for which to find suggestions :return: the start of a suggestion finder for the provided asset, against the specified tenant

Source code in pyatlan/model/suggestions.py
def finder(self, asset: Asset) -> Suggestions:
    """
    Build a suggestion finder against
    the provided Atlan tenant for the provided asset

    :param: asset for which to find suggestions
    :return: the start of a suggestion finder
    for the provided asset, against the specified tenant
    """
    self.asset = asset
    return self
include(type: Suggestions.TYPE) -> Suggestions

Add a criterion for which type(s) of suggestions to include in the search results.

:param type: type of suggestions to include in the search and results :returns: the Suggestions with this includes criterion added

Source code in pyatlan/model/suggestions.py
def include(self, type: Suggestions.TYPE) -> Suggestions:
    """
    Add a criterion for which type(s)
    of suggestions to include in the search results.

    :param include: criterion by which to sort the results
    :returns: the `Suggestions` with this `includes` criterion added
    """
    validate_type(name="types", _type=Suggestions.TYPE, value=type)
    clone = self._clone()
    if clone.includes is None:
        clone.includes = []
    clone.includes.append(type)
    return clone
include_archive(include: bool) -> Suggestions

Add a criterion to specify whether to include archived assets as part of the suggestions (True) or not (False).

:param include: whether to include archived assets as part of the suggestions (True) or not (False) :returns: the Suggestions with this include_archived criterion added

Source code in pyatlan/model/suggestions.py
def include_archive(self, include: bool) -> Suggestions:
    """
    Add a criterion to specify whether to include archived
    assets as part of the suggestions (`True`) or not (`False`).

    :param include: criterion by which to sort the results
    :returns: the `Suggestions` with this `include_archived` criterion added
    """
    validate_type(name="include", _type=bool, value=include)
    clone = self._clone()
    clone.include_archived = include
    return clone
max_suggestion(value: int) -> Suggestions

Add a criterion for maximum number of suggestions to return.

:param value: maximum number of suggestions to return :returns: the Suggestions with this max_suggestions criterion added

Source code in pyatlan/model/suggestions.py
def max_suggestion(self, value: int) -> Suggestions:
    """
    Add a criterion for maximum number of suggestions to return.

    :param value: maximum number of suggestions to return
    :returns: the `Suggestions` with this `max_suggestions` criterion added
    """
    validate_type(name="value", _type=int, value=value)
    clone = self._clone()
    clone.max_suggestions = value
    return clone
where(query: Query) -> Suggestions

Add a single criterion that must be present on every search result. (Note: these are translated to filters.)

:param query: the query to set as a criterion that must be present on every search result :returns: the Suggestions with this where criterion added

Source code in pyatlan/model/suggestions.py
def where(self, query: Query) -> Suggestions:
    """
    Add a single criterion that must be present on every search result.
    (Note: these are translated to filters.)

    :param query: the query to set as a criterion
    that must be present on every search result
    :returns: the `Suggestions` with this `where` criterion added
    """
    validate_type(name="query", _type=Query, value=query)
    clone = self._clone()
    if clone.wheres is None:
        clone.wheres = []
    clone.wheres.append(query)
    return clone
where_not(query: Query) -> Suggestions

Add a single criterion that must not be present on any search result.

:param query: the query to set as a criterion that must not be present on any search result :returns: the Suggestions with this where_not criterion added

Source code in pyatlan/model/suggestions.py
def where_not(self, query: Query) -> Suggestions:
    """
    Add a single criterion that must not be present on any search result.

    :param query: the query to set as a criterion
    that must not be present on any search result
    :returns: the `Suggestions` with this `where_not` criterion added
    """
    validate_type(name="query", _type=Query, value=query)
    clone = self._clone()
    if clone.where_nots is None:
        clone.where_nots = []
    clone.where_nots.append(query)
    return clone
with_other_type(type: str) -> Suggestions

Add a single criterion to include another asset type in the suggestions.

:param type: the asset type to include :returns: the Suggestions with this with_other_type criterion added

Source code in pyatlan/model/suggestions.py
def with_other_type(self, type: str) -> Suggestions:
    """
    Add a single criterion to include another asset type in the suggestions.

    :param type: the asset type to include
    :returns: the `Suggestions` with this `with_other_type` criterion added
    """
    validate_type(name="type", _type=str, value=type)
    clone = self._clone()
    if clone.with_other_types is None:
        clone.with_other_types = []
    clone.with_other_types.append(type)
    return clone

Functions

Search Log

pyatlan.model.search_log

Classes

AssetViews

Bases: AtlanObject

Captures a specific aggregate result of assets and the views on that asset. Instances of this class should be treated as immutable.

SearchLogEntry

Bases: AtlanObject

Represents a log entry for asset search in the search log. Instances of this class should be treated as immutable.

SearchLogRequest(__pydantic_self__, **data: Any)

Bases: SearchRequest

Class from which to configure a search against Atlan's search log.

Source code in pyatlan/model/search_log.py
def __init__(__pydantic_self__, **data: Any) -> None:
    dsl = data.get("dsl")
    class_name = __pydantic_self__.__class__.__name__
    if dsl and isinstance(dsl, DSL) and not dsl.req_class_name:
        data["dsl"] = DSL(req_class_name=class_name, **dsl.dict(exclude_unset=True))
    super().__init__(**data)
Functions
most_recent_viewers(guid: str, max_users: int = 20, exclude_users: Optional[List[str]] = None) -> SearchLogRequest classmethod

Create a search log request to retrieve views of an asset by its GUID.

:param guid: unique identifier of the asset. :param max_users: maximum number of recent users to consider. Defaults to 20. :param exclude_users: a list containing usernames to be excluded from the search log results (optional).

:returns: A SearchLogRequest that can be used to perform the search.

Source code in pyatlan/model/search_log.py
@classmethod
def most_recent_viewers(
    cls,
    guid: str,
    max_users: int = 20,
    exclude_users: Optional[List[str]] = None,
) -> SearchLogRequest:
    """
    Create a search log request to retrieve views of an asset by its GUID.

    :param guid: unique identifier of the asset.
    :param max_users: maximum number of recent users to consider. Defaults to 20.
    :param exclude_users: a list containing usernames to be excluded from the search log results (optional).

    :returns: A SearchLogRequest that can be used to perform the search.
    """
    query_filter = [
        Term(field="entityGuidsAll", value=guid, case_insensitive=False)
    ]
    dsl = DSL(
        **cls._get_view_dsl_kwargs(
            size=0, from_=0, query_filter=query_filter, exclude_users=exclude_users
        ),
        aggregations=cls._get_recent_viewers_aggs(max_users),
    )
    return SearchLogRequest(dsl=dsl)
most_viewed_assets(max_assets: int = 10, by_different_user: bool = False, exclude_users: Optional[List[str]] = None) -> SearchLogRequest classmethod

Create a search log request to retrieve most viewed assets.

:param max_assets: maximum number of assets to consider. Defaults to 10. :param by_different_user: when True, will consider assets viewed by more users as more important than total view count, otherwise will consider total view count most important. :param exclude_users: a list containing usernames to be excluded from the search log results (optional).

:returns: A SearchLogRequest that can be used to perform the search.

Source code in pyatlan/model/search_log.py
@classmethod
def most_viewed_assets(
    cls,
    max_assets: int = 10,
    by_different_user: bool = False,
    exclude_users: Optional[List[str]] = None,
) -> SearchLogRequest:
    """
    Create a search log request to retrieve most viewed assets.

    :param max_assets: maximum number of assets to consider. Defaults to 10.
    :param by_different_user: when True, will consider assets viewed by more users as more
    important than total view count, otherwise will consider total view count most important.
    :param exclude_users: a list containing usernames to be excluded from the search log results (optional).

    :returns: A SearchLogRequest that can be used to perform the search.
    """
    dsl = DSL(
        **cls._get_view_dsl_kwargs(size=0, from_=0, exclude_users=exclude_users),
        aggregations=cls._get_most_viewed_assets_aggs(
            max_assets, by_different_user
        ),
    )
    return SearchLogRequest(dsl=dsl)
views_by_guid(guid: str, size: int = 20, from_: int = 0, sort: Optional[List[SortItem]] = None, exclude_users: Optional[List[str]] = None) -> SearchLogRequest classmethod

Create a search log request to retrieve recent search logs of an asset.

:param guid: unique identifier of the asset. :param size: number of results to retrieve per page. Defaults to 20. :param from_: starting point for paging. Defaults to 0 (very first result) if not overridden. :param sort: properties by which to sort the results (optional). :param exclude_users: a list containing usernames to be excluded from the search log results (optional).

:returns: A SearchLogRequest that can be used to perform the search.

Source code in pyatlan/model/search_log.py
@classmethod
def views_by_guid(
    cls,
    guid: str,
    size: int = 20,
    from_: int = 0,
    sort: Optional[List[SortItem]] = None,
    exclude_users: Optional[List[str]] = None,
) -> SearchLogRequest:
    """
    Create a search log request to retrieve recent search logs of an assets.

    :param guid: unique identifier of the asset.
    :param size: number of results to retrieve per page. Defaults to 20.
    :param from_: starting point for paging. Defaults to 0 (very first result) if not overridden.
    :param sort: properties by which to sort the results (optional).
    :param exclude_users: a list containing usernames to be excluded from the search log results (optional).

    :returns: A SearchLogRequest that can be used to perform the search.
    """
    query_filter = [
        Term(field="entityGuidsAll", value=guid, case_insensitive=False)
    ]
    dsl = DSL(
        **cls._get_view_dsl_kwargs(
            size=size,
            from_=from_,
            query_filter=query_filter,
            sort=sort,
            exclude_users=exclude_users,
        ),
    )
    return SearchLogRequest(dsl=dsl)

SearchLogResults(client: ApiCaller, criteria: SearchLogRequest, start: int, size: int, count: int, log_entries: List[SearchLogEntry], aggregations: Dict[str, Aggregation], bulk: bool = False, processed_log_entries_count: int = 0)

Bases: Iterable

Captures the response from a search against Atlan's recent search logs.

Source code in pyatlan/model/search_log.py
def __init__(
    self,
    client: ApiCaller,
    criteria: SearchLogRequest,
    start: int,
    size: int,
    count: int,
    log_entries: List[SearchLogEntry],
    aggregations: Dict[str, Aggregation],
    bulk: bool = False,
    processed_log_entries_count: int = 0,
):
    self._client = client
    self._endpoint = SEARCH_LOG
    self._criteria = criteria
    self._start = start
    self._size = size
    self._log_entries = log_entries
    self._count = count
    self._approximate_count = count
    self._aggregations = aggregations
    self._bulk = bulk
    self._first_record_creation_time = -2
    self._last_record_creation_time = -2
    self._duplicate_timestamp_page_count: int = 0
    self._processed_log_entries_count: int = processed_log_entries_count
Functions
current_page() -> List[SearchLogEntry]

Retrieve the current page of results.

:returns: list of assets on the current page of results

Source code in pyatlan/model/search_log.py
def current_page(self) -> List[SearchLogEntry]:
    """
    Retrieve the current page of results.

    :returns: list of assets on the current page of results
    """
    return self._log_entries
next_page(start=None, size=None) -> bool

Indicates whether there is a next page of results.

:returns: True if there is a next page of results, otherwise False

Source code in pyatlan/model/search_log.py
def next_page(self, start=None, size=None) -> bool:
    """
    Indicates whether there is a next page of results.

    :returns: True if there is a next page of results, otherwise False
    """
    self._start = start or self._start + self._size
    if size:
        self._size = size
    return self._get_next_page() if self._log_entries else False
presorted_by_timestamp(sorts: Optional[List[SortItem]]) -> bool staticmethod

Checks if the sorting options prioritize creation time in ascending order. :param sorts: list of sorting options or None. :returns: True if sorting is already prioritized by creation time, False otherwise.

Source code in pyatlan/model/search_log.py
@staticmethod
def presorted_by_timestamp(sorts: Optional[List[SortItem]]) -> bool:
    """
    Checks if the sorting options prioritize creation time in ascending order.
    :param sorts: list of sorting options or None.
    :returns: True if sorting is already prioritized by creation time, False otherwise.
    """
    if sorts and isinstance(sorts[0], SortItem):
        return (
            sorts[0].field == "createdAt" and sorts[0].order == SortOrder.ASCENDING
        )
    return False
sort_by_timestamp_first(sorts: List[SortItem]) -> List[SortItem] staticmethod

Rewrites the sorting options to ensure that sorting by creation time, ascending, is the top priority. Adds this condition if it does not already exist, or moves it up to the top sorting priority if it does already exist in the list.

:param sorts: list of sorting options :returns: sorting options, making sorting by creation time in ascending order the top priority

Source code in pyatlan/model/search_log.py
@staticmethod
def sort_by_timestamp_first(sorts: List[SortItem]) -> List[SortItem]:
    """
    Rewrites the sorting options to ensure that
    sorting by creation time, ascending, is the top
    priority. Adds this condition if it does not
    already exist, or moves it up to the top sorting
    priority if it does already exist in the list.

    :param sorts: list of sorting options
    :returns: sorting options, making sorting by
    creation time in ascending order the top priority
    """
    creation_asc_sort = [SortItem("createdAt", order=SortOrder.ASCENDING)]
    if not sorts:
        return creation_asc_sort

    rewritten_sorts = [
        sort
        for sort in sorts
        # Added a condition to disable "timestamp" sorting when bulk search for logs is enabled,
        # as sorting is already handled based on "createdAt" in this case.
        if (
            (not sort.field)
            or (sort.field != Asset.CREATE_TIME.internal_field_name)
        )
        and (sort not in BY_TIMESTAMP)
    ]
    return creation_asc_sort + rewritten_sorts

SearchLogViewResults(count: int, user_views: Optional[List[UserViews]] = None, asset_views: Optional[List[AssetViews]] = None)

Captures the response from a search against Atlan's search log views.

Source code in pyatlan/model/search_log.py
def __init__(
    self,
    count: int,
    user_views: Optional[List[UserViews]] = None,
    asset_views: Optional[List[AssetViews]] = None,
):
    self._count = count
    self._user_views = user_views
    self._asset_views = asset_views

UserViews

Bases: AtlanObject

Represents unique user views entry in the search log. Instances of this class should be treated as immutable.

Data Mesh

pyatlan.model.data_mesh

Classes

DataProductsAssetsDSL

Bases: AtlanObject

Functions
get_asset_selection(search_request: IndexSearchRequest) -> str staticmethod

Returns the selection of assets for the data product, based on the specified Atlan index search request

:param search_request: index search request that defines the assets to include in the data product :returns: search DSL used to define which assets are part of this data product.

Source code in pyatlan/model/data_mesh.py
@staticmethod
def get_asset_selection(search_request: IndexSearchRequest) -> str:
    """
    Returns the selection of assets for the data product,
    based on the specified Atlan index search request

    :param search_request: index search request that
    defines the assets to include in the data product
    :returns: search DSL used to define
    which assets are part of this data product.
    """
    return DataProductsAssetsDSL(query=search_request).to_string()
to_string()

:returns: selected assets DSL JSON string for the data product. :raises: InvalidRequestError If the query provided in the IndexSearchRequest is invalid.

Source code in pyatlan/model/data_mesh.py
def to_string(self):
    """
    :returns: selected assets DSL JSON string for the data product.
    :raises: `InvalidRequestError` If the query provided
    in the `IndexSearchRequest` is invalid.
    """
    search_request = IndexSearchRequest(
        dsl=DSL(
            track_total_hits=None,
            query=self.query.dsl.query,
        ),
        suppress_logs=True,
        request_metadata=None,
        exclude_meanings=None,
        show_search_score=None,
        exclude_atlan_tags=None,
        allow_deleted_relations=None,
        attributes=self._ATTR_LIST,
    )
    dsl_json_str = DataProductsAssetsDSL(query=search_request).json(
        by_alias=True, exclude={"query": {"dsl": {"sort", "size"}}}
    )
    asset_selection_dsl = dict(loads(dsl_json_str, object_hook=self._exclude_nulls))
    return self._contruct_dsl_str(asset_selection_dsl)

File

pyatlan.model.file

Classes

Fluent Tasks

pyatlan.model.fluent_tasks

Classes

FluentTasks(wheres: Optional[List[Query]] = None, where_nots: Optional[List[Query]] = None, where_somes: Optional[List[Query]] = None, _min_somes: int = 1, sorts: Optional[List[SortItem]] = None, aggregations: Optional[Dict[str, Aggregation]] = None, _page_size: Optional[int] = None) dataclass

Search abstraction mechanism, to simplify the most common searches against Atlan task queue (removing the need to understand the guts of Elastic).

Source code in pyatlan/model/fluent_tasks.py
def __init__(
    self,
    wheres: Optional[List[Query]] = None,
    where_nots: Optional[List[Query]] = None,
    where_somes: Optional[List[Query]] = None,
    _min_somes: int = 1,
    sorts: Optional[List[SortItem]] = None,
    aggregations: Optional[Dict[str, Aggregation]] = None,
    _page_size: Optional[int] = None,
):
    self.wheres = wheres
    self.where_nots = where_nots
    self.where_somes = where_somes
    self._min_somes = _min_somes
    self.sorts = sorts
    self.aggregations = aggregations
    self._page_size = _page_size
Functions
aggregate(key: str, aggregation: Aggregation) -> FluentTasks

Add an aggregation to run against the results of the search. You provide any key you want (you'll use it to look at the results of a specific aggregation).

:param key: you want to use to look at the results of the aggregation :param aggregation: you want to calculate :returns: the FluentTasks with this aggregation added

Source code in pyatlan/model/fluent_tasks.py
def aggregate(self, key: str, aggregation: Aggregation) -> FluentTasks:
    """
    Add an aggregation to run against the results of the search.
    You provide any key you want (you'll use it to look at the
    results of a specific aggregation).

    :param key: you want to use to look at the results of the aggregation
    :param aggregation: you want to calculate
    :returns: the `FluentTasks` with this aggregation added
    """
    validate_type(name="key", _type=str, value=key)
    validate_type(name="aggregation", _type=Aggregation, value=aggregation)
    clone = self._clone()
    if clone.aggregations is None:
        clone.aggregations = {}
    clone.aggregations[key] = aggregation
    return clone
count(client: AtlanClient) -> int

Return the total number of tasks that will match the supplied criteria, using the most minimal query possible (retrieves minimal data).

:param client: client through which to count the tasks :raises: InvalidRequestError if no atlan client has been provided :returns: the count of tasks that will match the supplied criteria

Source code in pyatlan/model/fluent_tasks.py
def count(self, client: AtlanClient) -> int:
    """
    Return the total number of tasks that will match the supplied criteria,
    using the most minimal query possible (retrieves minimal data).

    :param client: client through which to count the tasks
    :raises: `InvalidRequestError` if no atlan client has been provided
    :returns: the count of tasks that will match the supplied criteria
    """
    if not isinstance(client, AtlanClient):
        raise ErrorCode.NO_ATLAN_CLIENT.exception_with_parameters()
    dsl = self._dsl()
    dsl.size = 1
    request = TaskSearchRequest(dsl=dsl)
    return client.tasks.search(request).count
execute(client: AtlanClient) -> TaskSearchResponse

Run the fluent search to retrieve tasks that match the supplied criteria.

:param client: client through which to retrieve the tasks :raises: InvalidRequestError if no atlan client has been provided :returns: an iterable list of tasks that match the supplied criteria, lazily-fetched

Source code in pyatlan/model/fluent_tasks.py
def execute(self, client: AtlanClient) -> TaskSearchResponse:
    """
    Run the fluent search to retrieve tasks that match the supplied criteria.

    :param client: client through which to retrieve the tasks
    :raises: `InvalidRequestError` if no atlan client has been provided
    :returns: an iterable list of tasks that match the supplied criteria, lazily-fetched
    """
    # Guard clause: a real client is required before any search can run.
    if isinstance(client, AtlanClient):
        return client.tasks.search(self.to_request())
    raise ErrorCode.NO_ATLAN_CLIENT.exception_with_parameters()
min_somes(minimum: int) -> FluentTasks

Sets the minimum number of 'where_somes' that must match for a result to be included.

:param minimum: minimum number of 'where_somes' that must match
:returns: the FluentTasks with this min_somes criterion added

Source code in pyatlan/model/fluent_tasks.py
def min_somes(self, minimum: int) -> FluentTasks:
    """
    Set how many of the 'where_somes' criteria must match
    for a result to be included.

    :param minimum: minimum number of 'where_somes' that must match
    :returns: the `FluentTasks` with this `min_somes` criterion added
    """
    validate_type(name="minimum", _type=int, value=minimum)
    # Work on a copy so the original builder stays unchanged (fluent style).
    copied = self._clone()
    copied._min_somes = minimum
    return copied
page_size(size: int) -> FluentTasks

Set the number of results to retrieve per underlying API request.

:param size: number of results to retrieve per underlying API request
:returns: the FluentTasks with this parameter configured

Source code in pyatlan/model/fluent_tasks.py
def page_size(self, size: int) -> FluentTasks:
    """
    Configure how many results each underlying API request retrieves.

    :param size: number of results to retrieve per underlying API request
    :returns: the `FluentTasks` with this parameter configured
    """
    validate_type(name="size", _type=int, value=size)
    # Work on a copy so the original builder stays unchanged (fluent style).
    copied = self._clone()
    copied._page_size = size
    return copied
sort(by: SortItem) -> FluentTasks

Add a criterion by which to sort the results.

:param by: criterion by which to sort the results
:returns: the FluentTasks with this sorting criterion added

Source code in pyatlan/model/fluent_tasks.py
def sort(self, by: SortItem) -> FluentTasks:
    """
    Add a criterion by which to sort the results.

    :param by: criterion by which to sort the results
    :returns: the `FluentTasks` with this sorting criterion added
    """
    validate_type(name="by", _type=SortItem, value=by)
    copied = self._clone()
    # Start a new list on first use; otherwise extend the existing one.
    if copied.sorts is None:
        copied.sorts = [by]
    else:
        copied.sorts.append(by)
    return copied
to_query() -> Query

Translate the Atlan compound query into an Elastic Query object.

:returns: an Elastic Query object that represents the compound query

Source code in pyatlan/model/fluent_tasks.py
def to_query(self) -> Query:
    """
    Translate the Atlan compound query into an Elastic Query object.

    :returns: an Elastic Query object that represents the compound query
    """
    compound = Bool()
    # Wheres become filters (non-scoring musts); where-nots become must_nots.
    compound.filter = self.wheres or []
    compound.must_not = self.where_nots or []
    somes = self.where_somes
    if somes:
        compound.should = somes
        compound.minimum_should_match = self._min_somes
    return compound
to_request() -> TaskSearchRequest

:returns: the TaskSearchRequest that encapsulates the information specified in this FluentTasks

Source code in pyatlan/model/fluent_tasks.py
def to_request(self) -> TaskSearchRequest:
    """
    Build the search request from everything configured on this builder.

    :returns: the TaskSearchRequest that encapsulates
    information specified in this FluentTasks
    """
    search_dsl = self._dsl()
    # Only override the DSL defaults for options that were explicitly set.
    if self._page_size:
        search_dsl.size = self._page_size
    if self.sorts:
        search_dsl.sort = self.sorts
    if self.aggregations:
        search_dsl.aggregations.update(self.aggregations)
    return TaskSearchRequest(dsl=search_dsl)
where(query: Query) -> FluentTasks

Add a single criterion that must be present on every search result. (Note: these are translated to filters.)

:param query: the query to set as a criterion that must be present on every search result
:returns: the FluentTasks with this where criterion added

Source code in pyatlan/model/fluent_tasks.py
def where(self, query: Query) -> FluentTasks:
    """
    Add a single criterion that must be present
    on every search result. (Note: these are translated to filters.)

    :param query: the query to set as a criterion
    that must be present on every search result
    :returns: the `FluentTasks` with this `where` criterion added
    """
    validate_type(name="query", _type=Query, value=query)
    copied = self._clone()
    # Start a new list on first use; otherwise extend the existing one.
    if copied.wheres is None:
        copied.wheres = [query]
    else:
        copied.wheres.append(query)
    return copied
where_not(query: Query) -> FluentTasks

Add a single criterion that must not be present on any search result.

:param query: the query to set as a criterion that must not be present on any search result
:returns: the FluentTasks with this where_not criterion added

Source code in pyatlan/model/fluent_tasks.py
def where_not(self, query: Query) -> FluentTasks:
    """
    Add a single criterion that must not be present on any search result.

    :param query: the query to set as a criterion
    that must not be present on any search result
    :returns: the `FluentTasks` with this `where_not` criterion added
    """
    validate_type(name="query", _type=Query, value=query)
    copied = self._clone()
    # Start a new list on first use; otherwise extend the existing one.
    if copied.where_nots is None:
        copied.where_nots = [query]
    else:
        copied.where_nots.append(query)
    return copied
where_some(query: Query) -> FluentTasks

Add a single criterion at least some of which should be present on each search result. You can control "how many" of the criteria added this way are a minimum for each search result to match through the 'minimum' property.

:param query: the query to set as a criterion, some number of which should be present on a search result
:returns: the FluentTasks with this where_some criterion added

Source code in pyatlan/model/fluent_tasks.py
def where_some(self, query: Query) -> FluentTasks:
    """
    Add a single criterion at least some of which should
    be present on each search result. You can control "how many"
    of the criteria added this way are a minimum for each search
    result to match through the 'minimum' property.

    :param query: the query to set as a criterion
    some number of which should be present on a search result
    :returns: the `FluentTasks` with this `where_some` criterion added
    """
    validate_type(name="query", _type=Query, value=query)
    copied = self._clone()
    # Start a new list on first use; otherwise extend the existing one.
    if copied.where_somes is None:
        copied.where_somes = [query]
    else:
        copied.where_somes.append(query)
    return copied

Functions

Lineage Ref

pyatlan.model.lineage_ref

Classes