# SPDX-License-Identifier: Apache-2.0
# Copyright 2025 Atlan Pte. Ltd.
from __future__ import annotations
import json
import time
import uuid
from datetime import datetime
from typing import TYPE_CHECKING, ClassVar, List, Optional, Set
from pydantic.v1 import Field, validator
from pyatlan.errors import ErrorCode
from pyatlan.model.enums import (
DataQualityDimension,
DataQualityResult,
DataQualityRuleAlertPriority,
DataQualityRuleCustomSQLReturnType,
DataQualityRuleStatus,
DataQualityRuleTemplateType,
DataQualityRuleThresholdCompareOperator,
DataQualityRuleThresholdUnit,
DataQualitySourceSyncStatus,
)
from pyatlan.model.fields.atlan_fields import (
BooleanField,
KeywordField,
NumericField,
RelationField,
)
from pyatlan.model.structs import (
DataQualityRuleConfigArguments,
DataQualityRuleThresholdObject,
)
from pyatlan.utils import init_guid, validate_required_fields
from .asset import SelfAsset
from .data_quality import DataQuality
if TYPE_CHECKING:
from pyatlan.client.atlan import AtlanClient
from pyatlan.model.assets import Column
[docs]
class DataQualityRule(DataQuality):
"""Description"""
@classmethod
@init_guid
def custom_sql_creator(
    cls,
    *,
    client: AtlanClient,
    rule_name: str,
    asset: Asset,
    custom_sql: str,
    threshold_compare_operator: DataQualityRuleThresholdCompareOperator,
    threshold_value: int,
    alert_priority: DataQualityRuleAlertPriority,
    dimension: DataQualityDimension,
    custom_sql_return_type: Optional[DataQualityRuleCustomSQLReturnType] = None,
    description: Optional[str] = None,
) -> DataQualityRule:
    """
    Build a data quality rule that is driven by a custom SQL statement.

    The rule is always created with the ``CUSTOM_SQL`` template type and
    without a base column or a threshold unit.

    :param client: client handed on to the attribute creator
    :param rule_name: name of the rule (required)
    :param asset: asset the rule applies to (required)
    :param custom_sql: SQL statement of the rule (required)
    :param threshold_compare_operator: operator used to compare against
        the threshold (required)
    :param threshold_value: threshold value (required)
    :param alert_priority: alert priority of the rule (required)
    :param dimension: data quality dimension of the rule (required)
    :param custom_sql_return_type: optional return type of the custom SQL
    :param description: optional description of the rule
    :returns: a new instance of ``cls`` wrapping the created attributes
    """
    # Keys and values are validated pairwise, so keep them side by side;
    # dict ordering preserves the order in which fields are checked.
    required = {
        "client": client,
        "rule_name": rule_name,
        "asset": asset,
        "threshold_compare_operator": threshold_compare_operator,
        "threshold_value": threshold_value,
        "alert_priority": alert_priority,
        "dimension": dimension,
        "custom_sql": custom_sql,
    }
    validate_required_fields(list(required.keys()), list(required.values()))

    return cls(
        attributes=DataQualityRule.Attributes.creator(
            client=client,
            rule_name=rule_name,
            rule_type=DataQualityRuleTemplateType.CUSTOM_SQL,
            asset=asset,
            threshold_compare_operator=threshold_compare_operator,
            threshold_value=threshold_value,
            alert_priority=alert_priority,
            dimension=dimension,
            custom_sql=custom_sql,
            custom_sql_return_type=custom_sql_return_type,
            description=description,
            # Custom SQL rules carry neither a base column nor a unit.
            column=None,
            threshold_unit=None,
        )
    )
@classmethod
@init_guid
def table_level_rule_creator(
    cls,
    *,
    client: AtlanClient,
    rule_type: DataQualityRuleTemplateType,
    asset: Asset,
    threshold_value: int,
    alert_priority: DataQualityRuleAlertPriority,
    threshold_compare_operator: Optional[
        DataQualityRuleThresholdCompareOperator
    ] = None,
    threshold_unit: Optional[DataQualityRuleThresholdUnit] = None,
    rule_conditions: Optional[str] = None,
    row_scope_filtering_enabled: Optional[bool] = False,
) -> DataQualityRule:
    """
    Build a data quality rule that has no base column.

    The template configuration for ``rule_type`` is read from the client's
    template cache and used to validate the requested features before the
    rule attributes are created.

    :param client: client used for the template cache and asset lookups
    :param rule_type: template type of the rule (required)
    :param asset: asset the rule applies to (required)
    :param threshold_value: threshold value (required)
    :param alert_priority: alert priority of the rule (required)
    :param threshold_compare_operator: optional comparison operator; when
        neither validation nor the caller supplies one,
        ``LESS_THAN_EQUAL`` is used
    :param threshold_unit: optional unit of the threshold
    :param rule_conditions: optional rule conditions
    :param row_scope_filtering_enabled: whether row scope filtering is
        requested
    :returns: a new instance of ``cls`` wrapping the created attributes
    """
    required = {
        "client": client,
        "rule_type": rule_type,
        "asset": asset,
        "threshold_value": threshold_value,
        "alert_priority": alert_priority,
    }
    validate_required_fields(list(required.keys()), list(required.values()))

    config = client.dq_template_config_cache.get_template_config(rule_type.value)

    # The lookup helper expects a real boolean, hence the ``or False``.
    validation_asset, target_table = (
        DataQualityRule.Attributes._fetch_assets_for_row_scope_validation(
            client, asset, rule_conditions, row_scope_filtering_enabled or False
        )
    )
    validated_operator = DataQualityRule.Attributes._validate_template_features(
        rule_type,
        rule_conditions,
        row_scope_filtering_enabled,
        config,
        threshold_compare_operator,
        validation_asset,
        target_table,
    )

    # Precedence: validated operator, then the caller's, then the default.
    operator = validated_operator or threshold_compare_operator
    if not operator:
        operator = DataQualityRuleThresholdCompareOperator.LESS_THAN_EQUAL

    return cls(
        attributes=DataQualityRule.Attributes.creator(
            client=client,
            rule_type=rule_type,
            asset=asset,
            threshold_compare_operator=operator,
            threshold_value=threshold_value,
            alert_priority=alert_priority,
            rule_name=None,
            column=None,
            threshold_unit=threshold_unit,
            dimension=None,
            custom_sql=None,
            description=None,
            rule_conditions=rule_conditions,
            row_scope_filtering_enabled=row_scope_filtering_enabled,
        )
    )
@classmethod
@init_guid
def column_level_rule_creator(
    cls,
    *,
    client: AtlanClient,
    rule_type: DataQualityRuleTemplateType,
    asset: Asset,
    column: Asset,
    threshold_value: int,
    alert_priority: DataQualityRuleAlertPriority,
    threshold_compare_operator: Optional[
        DataQualityRuleThresholdCompareOperator
    ] = None,
    threshold_unit: Optional[DataQualityRuleThresholdUnit] = None,
    rule_conditions: Optional[str] = None,
    row_scope_filtering_enabled: Optional[bool] = False,
) -> DataQualityRule:
    """
    Build a data quality rule that is bound to a base column.

    The template configuration for ``rule_type`` is read from the client's
    template cache and used to validate the requested features before the
    rule attributes are created.

    :param client: client used for the template cache and asset lookups
    :param rule_type: template type of the rule (required)
    :param asset: asset the rule applies to (required)
    :param column: column the rule applies to (required)
    :param threshold_value: threshold value (required)
    :param alert_priority: alert priority of the rule (required)
    :param threshold_compare_operator: optional comparison operator; when
        neither validation nor the caller supplies one,
        ``LESS_THAN_EQUAL`` is used
    :param threshold_unit: optional unit of the threshold
    :param rule_conditions: optional rule conditions
    :param row_scope_filtering_enabled: whether row scope filtering is
        requested
    :returns: a new instance of ``cls`` wrapping the created attributes
    """
    field_names = [
        "client",
        "rule_type",
        "asset",
        "column",
        "threshold_value",
        "alert_priority",
    ]
    field_values = [
        client,
        rule_type,
        asset,
        column,
        threshold_value,
        alert_priority,
    ]
    validate_required_fields(field_names, field_values)

    rule_attributes = DataQualityRule.Attributes
    config = client.dq_template_config_cache.get_template_config(rule_type.value)

    # The lookup helper expects a real boolean, hence the ``or False``.
    fetched = rule_attributes._fetch_assets_for_row_scope_validation(
        client, asset, rule_conditions, row_scope_filtering_enabled or False
    )
    validation_asset, target_table = fetched

    validated_operator = rule_attributes._validate_template_features(
        rule_type,
        rule_conditions,
        row_scope_filtering_enabled,
        config,
        threshold_compare_operator,
        validation_asset,
        target_table,
    )

    # Precedence: validated operator, then the caller's, then the default.
    if validated_operator:
        operator = validated_operator
    elif threshold_compare_operator:
        operator = threshold_compare_operator
    else:
        operator = DataQualityRuleThresholdCompareOperator.LESS_THAN_EQUAL

    created = rule_attributes.creator(
        client=client,
        rule_type=rule_type,
        asset=asset,
        column=column,
        threshold_compare_operator=operator,
        threshold_value=threshold_value,
        alert_priority=alert_priority,
        threshold_unit=threshold_unit,
        rule_name=None,
        dimension=None,
        custom_sql=None,
        description=None,
        rule_conditions=rule_conditions,
        row_scope_filtering_enabled=row_scope_filtering_enabled,
    )
    return cls(attributes=created)
@classmethod
@init_guid
def updater(
    cls: type[SelfAsset],
    client: AtlanClient,
    qualified_name: str,
    threshold_compare_operator: Optional[
        DataQualityRuleThresholdCompareOperator
    ] = None,
    threshold_value: Optional[int] = None,
    alert_priority: Optional[DataQualityRuleAlertPriority] = None,
    threshold_unit: Optional[DataQualityRuleThresholdUnit] = None,
    dimension: Optional[DataQualityDimension] = None,
    custom_sql: Optional[str] = None,
    custom_sql_return_type: Optional[DataQualityRuleCustomSQLReturnType] = None,
    rule_name: Optional[str] = None,
    description: Optional[str] = None,
    rule_conditions: Optional[str] = None,
    row_scope_filtering_enabled: Optional[bool] = False,
) -> SelfAsset:
    """
    Build an update payload for an existing data quality rule.

    The rule is looked up by ``qualified_name``; every value that the
    caller does not supply is taken from the retrieved rule, so that the
    returned object describes the complete rule.

    :param client: client used to search for the existing rule
    :param qualified_name: qualified name of the rule to update
    :param threshold_compare_operator: new comparison operator, if any
    :param threshold_value: new threshold value, if any (``0`` is honoured)
    :param alert_priority: new alert priority, if any
    :param threshold_unit: new threshold unit, if any
    :param dimension: new dimension, if any
    :param custom_sql: new custom SQL, if any
    :param custom_sql_return_type: new custom SQL return type, if any
    :param rule_name: new display name (only applied when the rule has
        custom SQL)
    :param description: new description (only applied when the rule has
        custom SQL)
    :param rule_conditions: new rule conditions, if any
    :param row_scope_filtering_enabled: whether to enable row scope
        filtering; a falsy value keeps the retrieved setting
    :returns: an instance of ``cls`` carrying the merged attributes
    :raises ValueError: if the search does not return exactly one rule
    """
    from pyatlan.model.fluent_search import FluentSearch

    validate_required_fields(
        ["client", "qualified_name"],
        [client, qualified_name],
    )
    request = (
        FluentSearch()
        .where(DataQualityRule.QUALIFIED_NAME.eq(qualified_name))
        .include_on_results(DataQualityRule.NAME)
        .include_on_results(DataQualityRule.DQ_RULE_TEMPLATE_NAME)
        .include_on_results(DataQualityRule.DQ_RULE_TEMPLATE)
        .include_on_results(DataQualityRule.DQ_RULE_BASE_DATASET)
        .include_on_results(DataQualityRule.DQ_RULE_BASE_COLUMN)
        .include_on_results(DataQualityRule.DQ_RULE_ALERT_PRIORITY)
        .include_on_results(DataQualityRule.DISPLAY_NAME)
        .include_on_results(DataQualityRule.DQ_RULE_CUSTOM_SQL)
        .include_on_results(DataQualityRule.DQ_RULE_CUSTOM_SQL_RETURN_TYPE)
        .include_on_results(DataQualityRule.USER_DESCRIPTION)
        .include_on_results(DataQualityRule.DQ_RULE_DIMENSION)
        .include_on_results(DataQualityRule.DQ_RULE_CONFIG_ARGUMENTS)
        .include_on_results(DataQualityRule.DQ_RULE_ROW_SCOPE_FILTERING_ENABLED)
        .include_on_results(DataQualityRule.DQ_RULE_SOURCE_SYNC_STATUS)
        .include_on_results(DataQualityRule.DQ_RULE_STATUS)
    ).to_request()
    results = client.asset.search(request)
    if results.count != 1:
        raise ValueError(
            f"Expected exactly 1 asset for qualified_name: {qualified_name}, "
            f"but found: {results.count}"
        )
    search_result = results.current_page()[0]

    # Current state of the rule, used as fallback for anything not supplied.
    retrieved_custom_sql = search_result.dq_rule_custom_s_q_l  # type: ignore[attr-defined]
    retrieved_custom_sql_return_type = (
        search_result.dq_rule_custom_s_q_l_return_type  # type: ignore[attr-defined]
    )
    retrieved_rule_name = search_result.display_name
    retrieved_dimension = search_result.dq_rule_dimension  # type: ignore[attr-defined]
    retrieved_column = search_result.dq_rule_base_column  # type: ignore[attr-defined]
    retrieved_alert_priority = search_result.dq_rule_alert_priority  # type: ignore[attr-defined]
    retrieved_row_scope_filtering_enabled = (
        search_result.dq_rule_row_scope_filtering_enabled  # type: ignore[attr-defined]
    )
    retrieved_description = search_result.user_description
    retrieved_asset = search_result.dq_rule_base_dataset  # type: ignore[attr-defined]
    retrieved_template_rule_name = search_result.dq_rule_template_name  # type: ignore[attr-defined]
    retrieved_template = search_result.dq_rule_template  # type: ignore[attr-defined]

    # Resolve the nested threshold object once instead of per field.
    config_arguments = search_result.dq_rule_config_arguments  # type: ignore[attr-defined]
    threshold_object = (
        config_arguments.dq_rule_threshold_object
        if config_arguments is not None
        else None
    )
    if threshold_object is not None:
        retrieved_threshold_compare_operator = (
            threshold_object.dq_rule_threshold_compare_operator
        )
        retrieved_threshold_value = threshold_object.dq_rule_threshold_value
        retrieved_threshold_unit = threshold_object.dq_rule_threshold_unit
    else:
        retrieved_threshold_compare_operator = None
        retrieved_threshold_value = None
        retrieved_threshold_unit = None

    template_config = None
    if retrieved_template_rule_name:
        template_config = client.dq_template_config_cache.get_template_config(
            retrieved_template_rule_name
        )

    if rule_conditions:
        final_rule_conditions = rule_conditions
    elif config_arguments is not None:
        final_rule_conditions = config_arguments.dq_rule_config_rule_conditions
    else:
        final_rule_conditions = None

    # NOTE(review): because the parameter defaults to False, a falsy value
    # cannot switch off filtering that is already enabled on the rule.
    final_row_scope_filtering_enabled = (
        row_scope_filtering_enabled or retrieved_row_scope_filtering_enabled
    )

    if retrieved_asset:
        retrieved_asset, target_table_asset = (
            DataQualityRule.Attributes._fetch_assets_for_row_scope_validation(
                client,
                retrieved_asset,
                final_rule_conditions,
                final_row_scope_filtering_enabled,
            )
        )
    else:
        target_table_asset = None

    validated_threshold_operator = None
    if retrieved_template_rule_name and template_config:
        try:
            retrieved_rule_type = DataQualityRuleTemplateType(
                retrieved_template_rule_name
            )
            validated_threshold_operator = (
                DataQualityRule.Attributes._validate_template_features(
                    retrieved_rule_type,
                    final_rule_conditions,
                    final_row_scope_filtering_enabled,
                    template_config,
                    threshold_compare_operator
                    or retrieved_threshold_compare_operator,
                    retrieved_asset,
                    target_table_asset,
                )
            )
        except ValueError:
            # A template name that is not a known template type makes the
            # enum constructor raise ValueError; validation is then skipped.
            # NOTE(review): this also hides a ValueError raised by the
            # validation helper itself - confirm that is intended.
            pass

    final_compare_operator = (
        validated_threshold_operator
        or threshold_compare_operator
        or retrieved_threshold_compare_operator
        or DataQualityRuleThresholdCompareOperator.LESS_THAN_EQUAL
    )

    # ``0`` is a legitimate threshold, so test for None rather than
    # truthiness when deciding whether the caller supplied a value.
    final_threshold_value = (
        threshold_value if threshold_value is not None else retrieved_threshold_value
    )

    # Relations that may be absent on the retrieved rule are only passed on
    # when present, instead of dereferencing None.
    relation_kwargs = {}
    if retrieved_asset:
        relation_kwargs["dq_rule_base_dataset_qualified_name"] = (
            retrieved_asset.qualified_name
        )
        relation_kwargs["dq_rule_base_dataset"] = retrieved_asset
    if retrieved_template is not None:
        relation_kwargs["dq_rule_template"] = (
            DataQualityRuleTemplate.ref_by_qualified_name(
                qualified_name=retrieved_template.qualified_name
            )
        )

    attr_dq = cls.Attributes(
        name="",
        dq_rule_config_arguments=DataQualityRuleConfigArguments(
            dq_rule_threshold_object=DataQualityRuleThresholdObject(
                dq_rule_threshold_compare_operator=final_compare_operator,
                dq_rule_threshold_value=final_threshold_value,
                dq_rule_threshold_unit=threshold_unit or retrieved_threshold_unit,
            ),
            dq_rule_config_rule_conditions=final_rule_conditions,
        ),
        dq_rule_alert_priority=alert_priority or retrieved_alert_priority,
        dq_rule_row_scope_filtering_enabled=final_row_scope_filtering_enabled,
        qualified_name=qualified_name,
        dq_rule_dimension=dimension or retrieved_dimension,
        dq_rule_template_name=retrieved_template_rule_name,
        **relation_kwargs,
    )

    if retrieved_column is not None:
        attr_dq.dq_rule_base_column_qualified_name = retrieved_column.qualified_name
        attr_dq.dq_rule_base_column = retrieved_column  # type: ignore

    custom_sql = custom_sql or retrieved_custom_sql
    if custom_sql is not None:
        attr_dq.dq_rule_custom_s_q_l = custom_sql
        attr_dq.display_name = rule_name or retrieved_rule_name
        attr_dq.dq_rule_custom_s_q_l_return_type = (
            custom_sql_return_type or retrieved_custom_sql_return_type
        )
        # An empty description falls back to the retrieved one; None leaves
        # the description untouched.
        if description is not None:
            attr_dq.user_description = description or retrieved_description

    return cls(attributes=attr_dq)
type_name: str = Field(default="DataQualityRule", allow_mutation=False)

@validator("type_name")
def validate_type_name(cls, v):
    """Accept only the literal type name ``DataQualityRule``."""
    if v == "DataQualityRule":
        return v
    raise ValueError("must be DataQualityRule")
def __setattr__(self, name, value):
    """
    Route attribute assignment.

    Names listed in ``_convenience_properties`` are set through
    ``object.__setattr__`` so that the property setters defined on this
    class run; every other name goes to the parent implementation.
    """
    if name not in DataQualityRule._convenience_properties:
        super().__setattr__(name, value)
        return None
    return object.__setattr__(self, name, value)
# Class-level field descriptors for the data quality rule attributes.
# Each one is a ClassVar built from the attribute's camelCase name; the
# updater in this class uses several of them with ``include_on_results``
# when searching. The string literal following each declaration is its
# description.
DQ_RULE_BASE_DATASET_QUALIFIED_NAME: ClassVar[KeywordField] = KeywordField(
    "dqRuleBaseDatasetQualifiedName", "dqRuleBaseDatasetQualifiedName"
)
"""
Base dataset qualified name that attached to this rule.
"""
DQ_RULE_BASE_COLUMN_QUALIFIED_NAME: ClassVar[KeywordField] = KeywordField(
    "dqRuleBaseColumnQualifiedName", "dqRuleBaseColumnQualifiedName"
)
"""
Base column qualified name that attached to this rule.
"""
DQ_RULE_REFERENCE_DATASET_QUALIFIED_NAMES: ClassVar[KeywordField] = KeywordField(
    "dqRuleReferenceDatasetQualifiedNames", "dqRuleReferenceDatasetQualifiedNames"
)
"""
List of unique reference dataset's qualified names related to this rule.
"""
DQ_RULE_REFERENCE_COLUMN_QUALIFIED_NAMES: ClassVar[KeywordField] = KeywordField(
    "dqRuleReferenceColumnQualifiedNames", "dqRuleReferenceColumnQualifiedNames"
)
"""
List of unique reference column's qualified names related to this rule.
"""
DQ_RULE_SOURCE_SYNC_STATUS: ClassVar[KeywordField] = KeywordField(
    "dqRuleSourceSyncStatus", "dqRuleSourceSyncStatus"
)
"""
Latest sync status of the rule to the source.
"""
# NOTE(review): the two descriptions below name different states
# ("failure" vs "error") - confirm which one the sync status actually uses.
DQ_RULE_SOURCE_SYNC_ERROR_CODE: ClassVar[KeywordField] = KeywordField(
    "dqRuleSourceSyncErrorCode", "dqRuleSourceSyncErrorCode"
)
"""
Error code in the case of state being "failure".
"""
DQ_RULE_SOURCE_SYNC_ERROR_MESSAGE: ClassVar[KeywordField] = KeywordField(
    "dqRuleSourceSyncErrorMessage", "dqRuleSourceSyncErrorMessage"
)
"""
Error message in the case of state being "error".
"""
DQ_RULE_SOURCE_SYNC_RAW_ERROR: ClassVar[KeywordField] = KeywordField(
    "dqRuleSourceSyncRawError", "dqRuleSourceSyncRawError"
)
"""
Raw error message from the source.
"""
DQ_RULE_SOURCE_SYNCED_AT: ClassVar[NumericField] = NumericField(
    "dqRuleSourceSyncedAt", "dqRuleSourceSyncedAt"
)
"""
Time (epoch) at which the rule synced to the source.
"""
DQ_RULE_LATEST_RESULT: ClassVar[KeywordField] = KeywordField(
    "dqRuleLatestResult", "dqRuleLatestResult"
)
"""
Latest result of the rule.
"""
DQ_RULE_LATEST_RESULT_COMPUTED_AT: ClassVar[NumericField] = NumericField(
    "dqRuleLatestResultComputedAt", "dqRuleLatestResultComputedAt"
)
"""
Time (epoch) at which the latest rule result was evaluated.
"""
DQ_RULE_LATEST_RESULT_FETCHED_AT: ClassVar[NumericField] = NumericField(
    "dqRuleLatestResultFetchedAt", "dqRuleLatestResultFetchedAt"
)
"""
Time (epoch) at which the latest rule result was fetched.
"""
DQ_RULE_LATEST_METRIC_VALUE: ClassVar[KeywordField] = KeywordField(
    "dqRuleLatestMetricValue", "dqRuleLatestMetricValue"
)
"""
Last result metrics value of the rule.
"""
DQ_RULE_LATEST_METRIC_VALUE_COMPUTED_AT: ClassVar[NumericField] = NumericField(
    "dqRuleLatestMetricValueComputedAt", "dqRuleLatestMetricValueComputedAt"
)
"""
Time (epoch) at which the latest metric value was evaluated in the source.
"""
DQ_RULE_DIMENSION: ClassVar[KeywordField] = KeywordField(
    "dqRuleDimension", "dqRuleDimension"
)
"""
Dimension of the data quality rule.
"""
DQ_RULE_TEMPLATE_NAME: ClassVar[KeywordField] = KeywordField(
    "dqRuleTemplateName", "dqRuleTemplateName"
)
"""
Name of the rule template corresponding to the rule.
"""
DQ_RULE_STATUS: ClassVar[KeywordField] = KeywordField(
    "dqRuleStatus", "dqRuleStatus"
)
"""
Status of the rule.
"""
DQ_RULE_ALERT_PRIORITY: ClassVar[KeywordField] = KeywordField(
    "dqRuleAlertPriority", "dqRuleAlertPriority"
)
"""
Default priority level for alerts involving this rule.
"""
DQ_RULE_CONFIG_ARGUMENTS: ClassVar[KeywordField] = KeywordField(
    "dqRuleConfigArguments", "dqRuleConfigArguments"
)
"""
Json string of the rule config that contains the rule definitions.
"""
DQ_RULE_CUSTOM_SQL: ClassVar[KeywordField] = KeywordField(
    "dqRuleCustomSQL", "dqRuleCustomSQL"
)
"""
SQL code for custom SQL rules.
"""
DQ_RULE_CUSTOM_SQL_RETURN_TYPE: ClassVar[KeywordField] = KeywordField(
    "dqRuleCustomSQLReturnType", "dqRuleCustomSQLReturnType"
)
"""
Type of result returned by the custom SQL (number of rows or numeric value).
"""
DQ_RULE_FAILED_ROWS_SQL: ClassVar[KeywordField] = KeywordField(
    "dqRuleFailedRowsSQL", "dqRuleFailedRowsSQL"
)
"""
SQL query used to retrieve failed rows.
"""
DQ_RULE_ROW_SCOPE_FILTERING_ENABLED: ClassVar[BooleanField] = BooleanField(
    "dqRuleRowScopeFilteringEnabled", "dqRuleRowScopeFilteringEnabled"
)
"""
Whether row scope filtering is enabled for this data quality rule (true) or not (false).
"""
# Relationship fields. Their descriptions are still placeholders ("TBC").
# NOTE(review): going by the names, these presumably link the rule to its
# base dataset/column, its reference datasets/columns and its template -
# confirm against the type definition before documenting them as fact.
DQ_RULE_BASE_DATASET: ClassVar[RelationField] = RelationField("dqRuleBaseDataset")
"""
TBC
"""
DQ_RULE_REFERENCE_DATASETS: ClassVar[RelationField] = RelationField(
    "dqRuleReferenceDatasets"
)
"""
TBC
"""
DQ_RULE_TEMPLATE: ClassVar[RelationField] = RelationField("dqRuleTemplate")
"""
TBC
"""
DQ_RULE_BASE_COLUMN: ClassVar[RelationField] = RelationField("dqRuleBaseColumn")
"""
TBC
"""
DQ_RULE_REFERENCE_COLUMNS: ClassVar[RelationField] = RelationField(
    "dqRuleReferenceColumns"
)
"""
TBC
"""
# Names of the properties defined on this class that proxy to ``attributes``.
# ``__setattr__`` consults this list: assignments to these names are made
# through ``object.__setattr__`` rather than the parent implementation.
# Every entry must match a property name on this class exactly.
_convenience_properties: ClassVar[List[str]] = [
    "dq_rule_base_dataset_qualified_name",
    "dq_rule_base_column_qualified_name",
    "dq_rule_reference_dataset_qualified_names",
    "dq_rule_reference_column_qualified_names",
    "dq_rule_source_sync_status",
    "dq_rule_source_sync_error_code",
    "dq_rule_source_sync_error_message",
    "dq_rule_source_sync_raw_error",
    "dq_rule_source_synced_at",
    "dq_rule_latest_result",
    "dq_rule_latest_result_computed_at",
    "dq_rule_latest_result_fetched_at",
    "dq_rule_latest_metric_value",
    "dq_rule_latest_metric_value_computed_at",
    "dq_rule_dimension",
    "dq_rule_template_name",
    "dq_rule_status",
    "dq_rule_alert_priority",
    "dq_rule_config_arguments",
    "dq_rule_custom_s_q_l",
    "dq_rule_custom_s_q_l_return_type",
    "dq_rule_failed_rows_s_q_l",
    "dq_rule_row_scope_filtering_enabled",
    "dq_rule_base_dataset",
    "dq_rule_reference_datasets",
    "dq_rule_template",
    "dq_rule_base_column",
    "dq_rule_reference_columns",
]
@property
def dq_rule_base_dataset_qualified_name(self) -> Optional[str]:
    """Base dataset qualified name of this rule, or None without attributes."""
    attrs = self.attributes
    if attrs is None:
        return None
    return attrs.dq_rule_base_dataset_qualified_name

@dq_rule_base_dataset_qualified_name.setter
def dq_rule_base_dataset_qualified_name(
    self, dq_rule_base_dataset_qualified_name: Optional[str]
):
    """Store the value, creating the attributes container when missing."""
    if self.attributes is None:
        self.attributes = self.Attributes()
    self.attributes.dq_rule_base_dataset_qualified_name = dq_rule_base_dataset_qualified_name
@property
def dq_rule_base_column_qualified_name(self) -> Optional[str]:
    """Base column qualified name of this rule, or None without attributes."""
    attrs = self.attributes
    if attrs is None:
        return None
    return attrs.dq_rule_base_column_qualified_name

@dq_rule_base_column_qualified_name.setter
def dq_rule_base_column_qualified_name(
    self, dq_rule_base_column_qualified_name: Optional[str]
):
    """Store the value, creating the attributes container when missing."""
    if self.attributes is None:
        self.attributes = self.Attributes()
    self.attributes.dq_rule_base_column_qualified_name = dq_rule_base_column_qualified_name
@property
def dq_rule_reference_dataset_qualified_names(self) -> Optional[Set[str]]:
    """Reference dataset qualified names, or None without attributes."""
    attrs = self.attributes
    if attrs is None:
        return None
    return attrs.dq_rule_reference_dataset_qualified_names

@dq_rule_reference_dataset_qualified_names.setter
def dq_rule_reference_dataset_qualified_names(
    self, dq_rule_reference_dataset_qualified_names: Optional[Set[str]]
):
    """Store the value, creating the attributes container when missing."""
    if self.attributes is None:
        self.attributes = self.Attributes()
    self.attributes.dq_rule_reference_dataset_qualified_names = dq_rule_reference_dataset_qualified_names
@property
def dq_rule_reference_column_qualified_names(self) -> Optional[Set[str]]:
    """Reference column qualified names, or None without attributes."""
    attrs = self.attributes
    if attrs is None:
        return None
    return attrs.dq_rule_reference_column_qualified_names

@dq_rule_reference_column_qualified_names.setter
def dq_rule_reference_column_qualified_names(
    self, dq_rule_reference_column_qualified_names: Optional[Set[str]]
):
    """Store the value, creating the attributes container when missing."""
    if self.attributes is None:
        self.attributes = self.Attributes()
    self.attributes.dq_rule_reference_column_qualified_names = dq_rule_reference_column_qualified_names
@property
def dq_rule_source_sync_status(self) -> Optional[DataQualitySourceSyncStatus]:
    """Latest sync status of the rule to the source, or None without attributes."""
    attrs = self.attributes
    if attrs is None:
        return None
    return attrs.dq_rule_source_sync_status

@dq_rule_source_sync_status.setter
def dq_rule_source_sync_status(
    self, dq_rule_source_sync_status: Optional[DataQualitySourceSyncStatus]
):
    """Store the value, creating the attributes container when missing."""
    if self.attributes is None:
        self.attributes = self.Attributes()
    self.attributes.dq_rule_source_sync_status = dq_rule_source_sync_status
@property
def dq_rule_source_sync_error_code(self) -> Optional[str]:
    """Source sync error code, or None without attributes."""
    attrs = self.attributes
    if attrs is None:
        return None
    return attrs.dq_rule_source_sync_error_code

@dq_rule_source_sync_error_code.setter
def dq_rule_source_sync_error_code(
    self, dq_rule_source_sync_error_code: Optional[str]
):
    """Store the value, creating the attributes container when missing."""
    if self.attributes is None:
        self.attributes = self.Attributes()
    self.attributes.dq_rule_source_sync_error_code = dq_rule_source_sync_error_code
@property
def dq_rule_source_sync_error_message(self) -> Optional[str]:
    """Source sync error message, or None without attributes."""
    attrs = self.attributes
    if attrs is None:
        return None
    return attrs.dq_rule_source_sync_error_message

@dq_rule_source_sync_error_message.setter
def dq_rule_source_sync_error_message(
    self, dq_rule_source_sync_error_message: Optional[str]
):
    """Store the value, creating the attributes container when missing."""
    if self.attributes is None:
        self.attributes = self.Attributes()
    self.attributes.dq_rule_source_sync_error_message = dq_rule_source_sync_error_message
@property
def dq_rule_source_sync_raw_error(self) -> Optional[str]:
    """Raw error message from the source, or None without attributes."""
    attrs = self.attributes
    if attrs is None:
        return None
    return attrs.dq_rule_source_sync_raw_error

@dq_rule_source_sync_raw_error.setter
def dq_rule_source_sync_raw_error(
    self, dq_rule_source_sync_raw_error: Optional[str]
):
    """Store the value, creating the attributes container when missing."""
    if self.attributes is None:
        self.attributes = self.Attributes()
    self.attributes.dq_rule_source_sync_raw_error = dq_rule_source_sync_raw_error
@property
def dq_rule_source_synced_at(self) -> Optional[datetime]:
    """Time at which the rule synced to the source, or None without attributes."""
    attrs = self.attributes
    if attrs is None:
        return None
    return attrs.dq_rule_source_synced_at

@dq_rule_source_synced_at.setter
def dq_rule_source_synced_at(self, dq_rule_source_synced_at: Optional[datetime]):
    """Store the value, creating the attributes container when missing."""
    if self.attributes is None:
        self.attributes = self.Attributes()
    self.attributes.dq_rule_source_synced_at = dq_rule_source_synced_at
@property
def dq_rule_latest_result(self) -> Optional[DataQualityResult]:
    """Latest result of the rule, or None without attributes."""
    attrs = self.attributes
    if attrs is None:
        return None
    return attrs.dq_rule_latest_result

@dq_rule_latest_result.setter
def dq_rule_latest_result(self, dq_rule_latest_result: Optional[DataQualityResult]):
    """Store the value, creating the attributes container when missing."""
    if self.attributes is None:
        self.attributes = self.Attributes()
    self.attributes.dq_rule_latest_result = dq_rule_latest_result
@property
def dq_rule_latest_result_computed_at(self) -> Optional[datetime]:
    """Time the latest result was evaluated, or None without attributes."""
    attrs = self.attributes
    if attrs is None:
        return None
    return attrs.dq_rule_latest_result_computed_at

@dq_rule_latest_result_computed_at.setter
def dq_rule_latest_result_computed_at(
    self, dq_rule_latest_result_computed_at: Optional[datetime]
):
    """Store the value, creating the attributes container when missing."""
    if self.attributes is None:
        self.attributes = self.Attributes()
    self.attributes.dq_rule_latest_result_computed_at = dq_rule_latest_result_computed_at
@property
def dq_rule_latest_result_fetched_at(self) -> Optional[datetime]:
    """Time the latest result was fetched, or None without attributes."""
    attrs = self.attributes
    if attrs is None:
        return None
    return attrs.dq_rule_latest_result_fetched_at

@dq_rule_latest_result_fetched_at.setter
def dq_rule_latest_result_fetched_at(
    self, dq_rule_latest_result_fetched_at: Optional[datetime]
):
    """Store the value, creating the attributes container when missing."""
    if self.attributes is None:
        self.attributes = self.Attributes()
    self.attributes.dq_rule_latest_result_fetched_at = dq_rule_latest_result_fetched_at
@property
def dq_rule_latest_metric_value(self) -> Optional[str]:
    """Last metric value of the rule, or None without attributes."""
    attrs = self.attributes
    if attrs is None:
        return None
    return attrs.dq_rule_latest_metric_value

@dq_rule_latest_metric_value.setter
def dq_rule_latest_metric_value(self, dq_rule_latest_metric_value: Optional[str]):
    """Store the value, creating the attributes container when missing."""
    if self.attributes is None:
        self.attributes = self.Attributes()
    self.attributes.dq_rule_latest_metric_value = dq_rule_latest_metric_value
@property
def dq_rule_latest_metric_value_computed_at(self) -> Optional[datetime]:
    """Time the latest metric value was evaluated, or None without attributes."""
    attrs = self.attributes
    if attrs is None:
        return None
    return attrs.dq_rule_latest_metric_value_computed_at

@dq_rule_latest_metric_value_computed_at.setter
def dq_rule_latest_metric_value_computed_at(
    self, dq_rule_latest_metric_value_computed_at: Optional[datetime]
):
    """Store the value, creating the attributes container when missing."""
    if self.attributes is None:
        self.attributes = self.Attributes()
    self.attributes.dq_rule_latest_metric_value_computed_at = dq_rule_latest_metric_value_computed_at
@property
def dq_rule_dimension(self) -> Optional[DataQualityDimension]:
    """Dimension of the data quality rule, or None without attributes."""
    attrs = self.attributes
    if attrs is None:
        return None
    return attrs.dq_rule_dimension

@dq_rule_dimension.setter
def dq_rule_dimension(self, dq_rule_dimension: Optional[DataQualityDimension]):
    """Store the value, creating the attributes container when missing."""
    if self.attributes is None:
        self.attributes = self.Attributes()
    self.attributes.dq_rule_dimension = dq_rule_dimension
@property
def dq_rule_template_name(self) -> Optional[str]:
    """Name of the rule template for this rule, or None without attributes."""
    attrs = self.attributes
    if attrs is None:
        return None
    return attrs.dq_rule_template_name

@dq_rule_template_name.setter
def dq_rule_template_name(self, dq_rule_template_name: Optional[str]):
    """Store the value, creating the attributes container when missing."""
    if self.attributes is None:
        self.attributes = self.Attributes()
    self.attributes.dq_rule_template_name = dq_rule_template_name
@property
def dq_rule_status(self) -> Optional[DataQualityRuleStatus]:
    """Status of the rule, or None without attributes."""
    attrs = self.attributes
    if attrs is None:
        return None
    return attrs.dq_rule_status

@dq_rule_status.setter
def dq_rule_status(self, dq_rule_status: Optional[DataQualityRuleStatus]):
    """Store the value, creating the attributes container when missing."""
    if self.attributes is None:
        self.attributes = self.Attributes()
    self.attributes.dq_rule_status = dq_rule_status
@property
def dq_rule_alert_priority(self) -> Optional[DataQualityRuleAlertPriority]:
    """Default alert priority for this rule, or None without attributes."""
    attrs = self.attributes
    if attrs is None:
        return None
    return attrs.dq_rule_alert_priority

@dq_rule_alert_priority.setter
def dq_rule_alert_priority(
    self, dq_rule_alert_priority: Optional[DataQualityRuleAlertPriority]
):
    """Store the value, creating the attributes container when missing."""
    if self.attributes is None:
        self.attributes = self.Attributes()
    self.attributes.dq_rule_alert_priority = dq_rule_alert_priority
@property
def dq_rule_config_arguments(self) -> Optional[DataQualityRuleConfigArguments]:
    """Rule config arguments of this rule, or None without attributes."""
    attrs = self.attributes
    if attrs is None:
        return None
    return attrs.dq_rule_config_arguments

@dq_rule_config_arguments.setter
def dq_rule_config_arguments(
    self, dq_rule_config_arguments: Optional[DataQualityRuleConfigArguments]
):
    """Store the value, creating the attributes container when missing."""
    if self.attributes is None:
        self.attributes = self.Attributes()
    self.attributes.dq_rule_config_arguments = dq_rule_config_arguments
@property
def dq_rule_custom_s_q_l(self) -> Optional[str]:
    """SQL code for custom SQL rules, or None without attributes."""
    attrs = self.attributes
    if attrs is None:
        return None
    return attrs.dq_rule_custom_s_q_l

@dq_rule_custom_s_q_l.setter
def dq_rule_custom_s_q_l(self, dq_rule_custom_s_q_l: Optional[str]):
    """Store the value, creating the attributes container when missing."""
    if self.attributes is None:
        self.attributes = self.Attributes()
    self.attributes.dq_rule_custom_s_q_l = dq_rule_custom_s_q_l
@property
def dq_rule_custom_s_q_l_return_type(
    self,
) -> Optional[DataQualityRuleCustomSQLReturnType]:
    """Result type returned by the custom SQL, or None without attributes."""
    attrs = self.attributes
    if attrs is None:
        return None
    return attrs.dq_rule_custom_s_q_l_return_type

@dq_rule_custom_s_q_l_return_type.setter
def dq_rule_custom_s_q_l_return_type(
    self,
    dq_rule_custom_s_q_l_return_type: Optional[DataQualityRuleCustomSQLReturnType],
):
    """Store the value, creating the attributes container when missing."""
    if self.attributes is None:
        self.attributes = self.Attributes()
    self.attributes.dq_rule_custom_s_q_l_return_type = dq_rule_custom_s_q_l_return_type
@property
def dq_rule_failed_rows_s_q_l(self) -> Optional[str]:
    """SQL query used to retrieve failed rows, or None without attributes."""
    attrs = self.attributes
    if attrs is None:
        return None
    return attrs.dq_rule_failed_rows_s_q_l

@dq_rule_failed_rows_s_q_l.setter
def dq_rule_failed_rows_s_q_l(self, dq_rule_failed_rows_s_q_l: Optional[str]):
    """Store the value, creating the attributes container when missing."""
    if self.attributes is None:
        self.attributes = self.Attributes()
    self.attributes.dq_rule_failed_rows_s_q_l = dq_rule_failed_rows_s_q_l
@property
def dq_rule_row_scope_filtering_enabled(self) -> Optional[bool]:
    """Whether row scope filtering is enabled, or None without attributes."""
    attrs = self.attributes
    if attrs is None:
        return None
    return attrs.dq_rule_row_scope_filtering_enabled

@dq_rule_row_scope_filtering_enabled.setter
def dq_rule_row_scope_filtering_enabled(
    self, dq_rule_row_scope_filtering_enabled: Optional[bool]
):
    """Store the value, creating the attributes container when missing."""
    if self.attributes is None:
        self.attributes = self.Attributes()
    self.attributes.dq_rule_row_scope_filtering_enabled = dq_rule_row_scope_filtering_enabled
@property
def dq_rule_base_dataset(self) -> Optional[Asset]:
    """The ``dq_rule_base_dataset`` relation, or None without attributes."""
    attrs = self.attributes
    if attrs is None:
        return None
    return attrs.dq_rule_base_dataset

@dq_rule_base_dataset.setter
def dq_rule_base_dataset(self, dq_rule_base_dataset: Optional[Asset]):
    """Store the value, creating the attributes container when missing."""
    if self.attributes is None:
        self.attributes = self.Attributes()
    self.attributes.dq_rule_base_dataset = dq_rule_base_dataset
@property
def dq_rule_reference_datasets(self) -> Optional[List[Asset]]:
    """The ``dq_rule_reference_datasets`` relation, or None without attributes."""
    attrs = self.attributes
    if attrs is None:
        return None
    return attrs.dq_rule_reference_datasets

@dq_rule_reference_datasets.setter
def dq_rule_reference_datasets(
    self, dq_rule_reference_datasets: Optional[List[Asset]]
):
    """Store the value, creating the attributes container when missing."""
    if self.attributes is None:
        self.attributes = self.Attributes()
    self.attributes.dq_rule_reference_datasets = dq_rule_reference_datasets
@property
def dq_rule_template(self) -> Optional[DataQualityRuleTemplate]:
    """The ``dq_rule_template`` relation, or None without attributes."""
    attrs = self.attributes
    if attrs is None:
        return None
    return attrs.dq_rule_template

@dq_rule_template.setter
def dq_rule_template(self, dq_rule_template: Optional[DataQualityRuleTemplate]):
    """Store the value, creating the attributes container when missing."""
    if self.attributes is None:
        self.attributes = self.Attributes()
    self.attributes.dq_rule_template = dq_rule_template
@property
def dq_rule_base_column(self) -> Optional[Column]:
    """The ``dq_rule_base_column`` relation, or None without attributes."""
    attrs = self.attributes
    if attrs is None:
        return None
    return attrs.dq_rule_base_column

@dq_rule_base_column.setter
def dq_rule_base_column(self, dq_rule_base_column: Optional[Column]):
    """Store the value, creating the attributes container when missing."""
    if self.attributes is None:
        self.attributes = self.Attributes()
    self.attributes.dq_rule_base_column = dq_rule_base_column
@property
def dq_rule_reference_columns(self) -> Optional[List[Column]]:
    """The ``dq_rule_reference_columns`` relation, or None without attributes."""
    attrs = self.attributes
    if attrs is None:
        return None
    return attrs.dq_rule_reference_columns

@dq_rule_reference_columns.setter
def dq_rule_reference_columns(
    self, dq_rule_reference_columns: Optional[List[Column]]
):
    """Store the value, creating the attributes container when missing."""
    if self.attributes is None:
        self.attributes = self.Attributes()
    self.attributes.dq_rule_reference_columns = dq_rule_reference_columns
class Attributes(DataQuality.Attributes):
"""Typed attribute container for DataQualityRule; every field declared here is optional and defaults to None."""
# Qualified names of the base / reference datasets and columns, held as plain strings
# (the typed relationship counterparts are declared at the end of this list).
dq_rule_base_dataset_qualified_name: Optional[str] = Field(
default=None, description=""
)
dq_rule_base_column_qualified_name: Optional[str] = Field(
default=None, description=""
)
dq_rule_reference_dataset_qualified_names: Optional[Set[str]] = Field(
default=None, description=""
)
dq_rule_reference_column_qualified_names: Optional[Set[str]] = Field(
default=None, description=""
)
# Source sync status, error details and sync timestamp.
dq_rule_source_sync_status: Optional[DataQualitySourceSyncStatus] = Field(
default=None, description=""
)
dq_rule_source_sync_error_code: Optional[str] = Field(
default=None, description=""
)
dq_rule_source_sync_error_message: Optional[str] = Field(
default=None, description=""
)
dq_rule_source_sync_raw_error: Optional[str] = Field(
default=None, description=""
)
dq_rule_source_synced_at: Optional[datetime] = Field(
default=None, description=""
)
# Latest result and latest metric value, each with its own timestamps.
dq_rule_latest_result: Optional[DataQualityResult] = Field(
default=None, description=""
)
dq_rule_latest_result_computed_at: Optional[datetime] = Field(
default=None, description=""
)
dq_rule_latest_result_fetched_at: Optional[datetime] = Field(
default=None, description=""
)
# Note: the metric value is typed as a string, not a number.
dq_rule_latest_metric_value: Optional[str] = Field(default=None, description="")
dq_rule_latest_metric_value_computed_at: Optional[datetime] = Field(
default=None, description=""
)
# Rule definition: dimension, template name, status, alert priority and config arguments.
dq_rule_dimension: Optional[DataQualityDimension] = Field(
default=None, description=""
)
dq_rule_template_name: Optional[str] = Field(default=None, description="")
dq_rule_status: Optional[DataQualityRuleStatus] = Field(
default=None, description=""
)
dq_rule_alert_priority: Optional[DataQualityRuleAlertPriority] = Field(
default=None, description=""
)
dq_rule_config_arguments: Optional[DataQualityRuleConfigArguments] = Field(
default=None, description=""
)
# Custom SQL text, its return type, failed-rows SQL, and the row-scope filtering flag.
dq_rule_custom_s_q_l: Optional[str] = Field(default=None, description="")
dq_rule_custom_s_q_l_return_type: Optional[
DataQualityRuleCustomSQLReturnType
] = Field(default=None, description="")
dq_rule_failed_rows_s_q_l: Optional[str] = Field(default=None, description="")
dq_rule_row_scope_filtering_enabled: Optional[bool] = Field(
default=None, description=""
)
# Relationship attributes: typed asset references rather than qualified-name strings.
dq_rule_base_dataset: Optional[Asset] = Field(
default=None, description=""
) # relationship
dq_rule_reference_datasets: Optional[List[Asset]] = Field(
default=None, description=""
) # relationship
dq_rule_template: Optional[DataQualityRuleTemplate] = Field(
default=None, description=""
) # relationship
dq_rule_base_column: Optional[Column] = Field(
default=None, description=""
) # relationship
dq_rule_reference_columns: Optional[List[Column]] = Field(
default=None, description=""
) # relationship
@staticmethod
def _fetch_assets_for_row_scope_validation(
    client: AtlanClient,
    base_asset: Asset,
    rule_conditions: Optional[str],
    row_scope_filtering_enabled: bool,
) -> tuple[Asset, Optional[Asset]]:
    """
    Look up the assets needed to validate row-scope filtering.

    :param client: client whose ``asset.search`` is used for the lookup
    :param base_asset: asset the rule is defined on
    :param rule_conditions: JSON string; the ``target_table`` of the first
        condition's ``value`` is read from it when present
    :param row_scope_filtering_enabled: when falsy, nothing is searched
    :returns: ``(asset_for_validation, target_table_asset)``; the first item is
        the search hit for the base asset (or ``base_asset`` itself if there was
        no hit), the second is the hit for the target table, or None
    """
    if not row_scope_filtering_enabled:
        return base_asset, None

    # Best-effort extraction: malformed conditions simply mean "no target table".
    target_qn = None
    if rule_conditions:
        try:
            parsed = json.loads(rule_conditions)
            condition_list = parsed.get("conditions", [])
            if condition_list:
                first_value = condition_list[0].get("value", {})
                target_qn = first_value.get("target_table")
        except (json.JSONDecodeError, KeyError, TypeError, AttributeError):
            pass

    wanted_names = [qn for qn in (base_asset.qualified_name, target_qn) if qn]
    validated_base, target_asset = base_asset, None
    if not wanted_names:
        return validated_base, target_asset

    from pyatlan.model.fluent_search import FluentSearch

    query = FluentSearch().where(Asset.QUALIFIED_NAME.within(wanted_names))
    query = query.include_on_results(
        Asset.ASSET_DQ_ROW_SCOPE_FILTER_COLUMN_QUALIFIED_NAME
    )
    response = client.asset.search(query.to_request())
    # Only the first page is inspected.
    for hit in response.current_page():
        if hit.qualified_name == base_asset.qualified_name:
            validated_base = hit
        elif target_qn and hit.qualified_name == target_qn:
            target_asset = hit
    return validated_base, target_asset
@staticmethod
def _get_template_config_value(
    config_value: str,
    property_name: Optional[str] = None,
    value_key: str = "default",
):
    """
    Read a single value out of a JSON-encoded template config string.

    :param config_value: JSON string expected to decode to an object
    :param property_name: when given, the value is read from
        ``properties[property_name][value_key]``; otherwise from the top-level
        ``value_key``
    :param value_key: key to read (``"default"`` unless overridden)
    :returns: the value found, or None when ``config_value`` is empty, is not
        valid JSON, or does not have the expected object structure
    """
    if not config_value:
        return None
    try:
        node = json.loads(config_value)
    except (json.JSONDecodeError, TypeError):
        # TypeError covers a non-string payload (e.g. an already-decoded dict).
        return None

    if property_name:
        # Walk properties -> property_name, tolerating missing, null, or
        # non-object levels instead of raising AttributeError.
        properties = node.get("properties") if isinstance(node, dict) else None
        node = (
            properties.get(property_name) if isinstance(properties, dict) else None
        )
    return node.get(value_key) if isinstance(node, dict) else None
@staticmethod
def _validate_template_features(
rule_type: DataQualityRuleTemplateType,
rule_conditions: Optional[str],
row_scope_filtering_enabled: Optional[bool],
template_config: Optional[dict],
threshold_compare_operator: Optional[
DataQualityRuleThresholdCompareOperator
] = None,
asset: Optional[Asset] = None,
target_table_asset: Optional[Asset] = None,
) -> Optional[DataQualityRuleThresholdCompareOperator]:
"""
Check the requested rule features against the template config and resolve
the threshold compare operator to use.

:param rule_type: template type; its ``value`` is used in error messages
:param rule_conditions: rule conditions string (JSON, or a bare condition type)
:param row_scope_filtering_enabled: whether row-scope filtering was requested
:param template_config: mapping whose ``"config"`` entry carries the
    ``dq_rule_template_config_*`` attributes read below
:param threshold_compare_operator: operator requested by the caller, if any
:param asset: asset checked for a row-scope filter column, if given
:param target_table_asset: target table checked the same way, if given
:returns: None when ``template_config`` is missing or has no ``"config"``;
    otherwise the resolved operator (the final fallback is LESS_THAN_EQUAL)
:raises: errors built from ``ErrorCode.DQ_RULE_TYPE_NOT_SUPPORTED``,
    ``ErrorCode.DQ_ROW_SCOPE_FILTER_COLUMN_MISSING``,
    ``ErrorCode.DQ_RULE_CONDITIONS_INVALID`` and
    ``ErrorCode.INVALID_PARAMETER_VALUE``
"""
# Nothing to validate against without a template config.
if not template_config or not template_config.get("config"):
return None
config = template_config["config"]
# Rule conditions are only allowed when the template defines a rule-conditions config.
if (
rule_conditions
and config.dq_rule_template_config_rule_conditions is None
):
raise ErrorCode.DQ_RULE_TYPE_NOT_SUPPORTED.exception_with_parameters(
rule_type.value, "rule conditions"
)
if row_scope_filtering_enabled:
advanced_settings = (
config.dq_rule_template_config_advanced_settings or ""
)
# Support is detected by a substring match on the stringified advanced settings.
if "dqRuleRowScopeFilteringEnabled" not in str(advanced_settings):
raise ErrorCode.DQ_RULE_TYPE_NOT_SUPPORTED.exception_with_parameters(
rule_type.value, "row scope filtering"
)
# A supplied asset must carry a row-scope filter column qualified name.
if asset and not getattr(
asset,
"asset_d_q_row_scope_filter_column_qualified_name",
None,
):
raise ErrorCode.DQ_ROW_SCOPE_FILTER_COLUMN_MISSING.exception_with_parameters(
getattr(asset, "qualified_name", "unknown")
)
# Same requirement for the target table, when one was supplied.
if target_table_asset:
if not getattr(
target_table_asset,
"asset_d_q_row_scope_filter_column_qualified_name",
None,
):
raise ErrorCode.DQ_ROW_SCOPE_FILTER_COLUMN_MISSING.exception_with_parameters(
getattr(target_table_asset, "qualified_name", "unknown")
)
if rule_conditions:
# Allowed condition types come from the top-level "enum" of the template's rule-conditions config.
allowed_rule_conditions = (
DataQualityRule.Attributes._get_template_config_value(
config.dq_rule_template_config_rule_conditions or "",
None,
"enum",
)
)
if allowed_rule_conditions:
try:
rule_conditions_json = json.loads(rule_conditions)
conditions = rule_conditions_json.get("conditions", [])
if len(conditions) != 1:
raise ErrorCode.DQ_RULE_CONDITIONS_INVALID.exception_with_parameters(
f"exactly one condition required, found {len(conditions)}"
)
condition_type = conditions[0].get("type")
except json.JSONDecodeError:
# Not JSON: treat the raw string itself as the condition type.
condition_type = rule_conditions
if condition_type not in allowed_rule_conditions:
raise ErrorCode.DQ_RULE_CONDITIONS_INVALID.exception_with_parameters(
f"condition type '{condition_type}' not supported, allowed: {allowed_rule_conditions}"
)
# Per the error text below, only EQUAL is accepted when rule conditions are provided.
if threshold_compare_operator is None:
return DataQualityRuleThresholdCompareOperator.EQUAL
elif (
threshold_compare_operator
!= DataQualityRuleThresholdCompareOperator.EQUAL
):
raise ErrorCode.INVALID_PARAMETER_VALUE.exception_with_parameters(
f"threshold_compare_operator={threshold_compare_operator.value}",
"threshold_compare_operator",
"EQUAL when rule_conditions are provided",
)
# An explicit operator must be in the template's allowed list, when the template declares one.
if threshold_compare_operator is not None:
allowed_operators = (
DataQualityRule.Attributes._get_template_config_value(
config.dq_rule_template_config_threshold_object,
"dqRuleTemplateConfigThresholdCompareOperator",
"enum",
)
)
if (
allowed_operators
and threshold_compare_operator.value not in allowed_operators
):
raise ErrorCode.INVALID_PARAMETER_VALUE.exception_with_parameters(
f"threshold_compare_operator={threshold_compare_operator.value}",
"threshold_compare_operator",
f"must be one of {allowed_operators}",
)
# No operator supplied: use the template's "default" for the compare operator, if it has one.
elif threshold_compare_operator is None:
default_value = DataQualityRule.Attributes._get_template_config_value(
config.dq_rule_template_config_threshold_object,
"dqRuleTemplateConfigThresholdCompareOperator",
"default",
)
if default_value:
threshold_compare_operator = (
DataQualityRuleThresholdCompareOperator(default_value)
)
# Final fallback when neither the caller nor the template supplied an operator.
return (
threshold_compare_operator
or DataQualityRuleThresholdCompareOperator.LESS_THAN_EQUAL
)
@staticmethod
def _generate_uuid():
    """
    Return a new random (version 4) UUID as a lowercase, hyphenated string.

    Uses the standard library generator so every one of the 122 random bits
    is drawn independently, rather than deriving several hex digits from the
    same random byte.

    :returns: string of the form ``xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx``
        where ``y`` is one of ``8``, ``9``, ``a`` or ``b``
    """
    return str(uuid.uuid4())
@classmethod
@init_guid
def creator(
cls,
*,
client: AtlanClient,
rule_name: str,
rule_type: DataQualityRuleTemplateType,
asset: Asset,
threshold_compare_operator: DataQualityRuleThresholdCompareOperator,
threshold_value: int,
alert_priority: DataQualityRuleAlertPriority,
column: Optional[Asset] = None,
threshold_unit: Optional[DataQualityRuleThresholdUnit] = None,
dimension: Optional[DataQualityDimension] = None,
custom_sql: Optional[str] = None,
custom_sql_return_type: Optional[DataQualityRuleCustomSQLReturnType] = None,
description: Optional[str] = None,
rule_conditions: Optional[str] = None,
row_scope_filtering_enabled: Optional[bool] = False,
) -> DataQualityRule.Attributes:
"""
Build the attributes for a new data quality rule based on a rule template.

The template config for ``rule_type`` is looked up through
``client.dq_template_config_cache``; ``dimension`` and ``threshold_unit``
fall back to values from that template when they are not supplied.

:param client: client providing the template config cache
:param rule_name: assigned to ``display_name``
:param rule_type: template type whose ``value`` keys the template lookup
:param asset: base dataset; its qualified name prefixes the rule's qualified name
:param threshold_compare_operator: stored in the rule's threshold object
:param threshold_value: stored in the rule's threshold object
:param alert_priority: stored as ``dq_rule_alert_priority``
:param column: optional base column of the rule
:param threshold_unit: optional unit; defaults to the template's value
:param dimension: optional dimension; defaults to the template's value
:param custom_sql: optional custom SQL text
:param custom_sql_return_type: optional return type of the custom SQL
:param description: assigned to ``user_description``
:param rule_conditions: stored as ``dq_rule_config_rule_conditions``
:param row_scope_filtering_enabled: stored as ``dq_rule_row_scope_filtering_enabled``
:returns: the populated attributes
:raises: error built from ``ErrorCode.DQ_RULE_NOT_FOUND`` when no template
    config exists for ``rule_type``
"""
template_config = client.dq_template_config_cache.get_template_config(
rule_type.value
)
if template_config is None:
raise ErrorCode.DQ_RULE_NOT_FOUND.exception_with_parameters(
rule_type.value
)
template_rule_name = template_config.get("name")
template_qualified_name = template_config.get("qualified_name")
# Fall back to the template's dimension when none was supplied.
if dimension is None:
dimension = template_config.get("dimension")
# Fall back to the "default" threshold unit from the template's threshold object.
if threshold_unit is None:
config = template_config.get("config")
if config is not None:
threshold_unit = (
DataQualityRule.Attributes._get_template_config_value(
config.dq_rule_template_config_threshold_object,
"dqRuleTemplateConfigThresholdUnit",
"default",
)
)
# New rules start with an empty name, ACTIVE status, IN_PROGRESS sync status,
# and a qualified name of the form "<asset qualified name>/rule/<uuid>".
attr_dq = DataQualityRule.Attributes(
name="",
dq_rule_config_arguments=DataQualityRuleConfigArguments(
dq_rule_threshold_object=DataQualityRuleThresholdObject(
dq_rule_threshold_compare_operator=threshold_compare_operator,
dq_rule_threshold_value=threshold_value,
dq_rule_threshold_unit=threshold_unit,
),
dq_rule_config_rule_conditions=rule_conditions,
),
dq_rule_base_dataset_qualified_name=asset.qualified_name,
dq_rule_alert_priority=alert_priority,
dq_rule_row_scope_filtering_enabled=row_scope_filtering_enabled,
dq_rule_source_sync_status=DataQualitySourceSyncStatus.IN_PROGRESS,
dq_rule_status=DataQualityRuleStatus.ACTIVE,
dq_rule_base_dataset=asset,
qualified_name=f"{asset.qualified_name}/rule/{str(cls._generate_uuid())}",
dq_rule_dimension=dimension,
dq_rule_template_name=template_rule_name,
dq_rule_template=DataQualityRuleTemplate.ref_by_qualified_name(
qualified_name=template_qualified_name # type: ignore
),
)
# A base column sets both the qualified-name attribute and the relationship.
if column is not None:
attr_dq.dq_rule_base_column_qualified_name = column.qualified_name
attr_dq.dq_rule_base_column = column # type: ignore
if custom_sql is not None:
attr_dq.dq_rule_custom_s_q_l = custom_sql
attr_dq.display_name = rule_name
if custom_sql_return_type is not None:
attr_dq.dq_rule_custom_s_q_l_return_type = custom_sql_return_type
if description is not None:
attr_dq.user_description = description
return attr_dq
# default_factory builds a fresh Attributes object for each instance; the lambda
# defers resolving DataQualityRule.Attributes until the factory is actually called.
attributes: DataQualityRule.Attributes = Field(
default_factory=lambda: DataQualityRule.Attributes(),
description=(
"Map of attributes in the instance and their values. "
"The specific keys of this map will vary by type, "
"so are described in the sub-types of this schema."
),
)
from .asset import Asset # noqa: E402, F401
from .data_quality_rule_template import DataQualityRuleTemplate # noqa: E402, F401