# SPDX-License-Identifier: Apache-2.0
# Copyright 2025 Atlan Pte. Ltd.
from __future__ import annotations
from datetime import datetime
from typing import ClassVar, List, Optional, Set
from pydantic.v1 import Field, validator
from pyatlan.model.enums import WorkflowRunStatus, WorkflowRunType
from pyatlan.model.fields.atlan_fields import KeywordField, NumericField, TextField
from .core.asset import Asset
class WorkflowRun(Asset, type_name="WorkflowRun"):
    """Atlan asset model for a single workflow run.

    The ``WORKFLOW_RUN_*`` class-level fields name the searchable attributes
    of a run (the workflow it was created from, the asset it was created for,
    its type, status, comment, configuration, expiry and audit details).
    Each has a matching convenience property on the instance whose value is
    stored on the nested ``attributes`` model.
    """

    type_name: str = Field(default="WorkflowRun", allow_mutation=False)

    @validator("type_name")
    def validate_type_name(cls, v):
        """Reject any ``type_name`` other than ``"WorkflowRun"``.

        :param v: candidate value for ``type_name``
        :returns: ``v`` unchanged when it is valid
        :raises ValueError: if ``v`` is not ``"WorkflowRun"``
        """
        if v != "WorkflowRun":
            raise ValueError("must be WorkflowRun")
        return v

    def __setattr__(self, name, value):
        """Send assignments to convenience properties through their setters.

        Names listed in ``_convenience_properties`` are set with
        ``object.__setattr__`` so that the property setter defined on this
        class runs; every other name is handled by the parent class.

        :param name: attribute name being assigned
        :param value: value to assign
        """
        if name in WorkflowRun._convenience_properties:
            # NOTE(review): presumably the inherited (pydantic) __setattr__
            # would not dispatch to these properties -- confirm against Asset.
            return object.__setattr__(self, name, value)
        super().__setattr__(name, value)

    WORKFLOW_RUN_WORKFLOW_GUID: ClassVar[KeywordField] = KeywordField(
        "workflowRunWorkflowGuid", "workflowRunWorkflowGuid"
    )
    """
    GUID of the workflow from which this run was created.
    """
    WORKFLOW_RUN_TYPE: ClassVar[KeywordField] = KeywordField(
        "workflowRunType", "workflowRunType"
    )
    """
    Type of the workflow from which this run was created.
    """
    WORKFLOW_RUN_ACTION_CHOICES: ClassVar[KeywordField] = KeywordField(
        "workflowRunActionChoices", "workflowRunActionChoices"
    )
    """
    List of workflow run action choices.
    """
    WORKFLOW_RUN_ON_ASSET_GUID: ClassVar[KeywordField] = KeywordField(
        "workflowRunOnAssetGuid", "workflowRunOnAssetGuid"
    )
    """
    The asset for which this run was created.
    """
    WORKFLOW_RUN_COMMENT: ClassVar[TextField] = TextField(
        "workflowRunComment", "workflowRunComment"
    )
    """
    The comment added by the requester.
    """
    WORKFLOW_RUN_CONFIG: ClassVar[TextField] = TextField(
        "workflowRunConfig", "workflowRunConfig"
    )
    """
    Details of the approval workflow run.
    """
    WORKFLOW_RUN_STATUS: ClassVar[KeywordField] = KeywordField(
        "workflowRunStatus", "workflowRunStatus"
    )
    """
    Status of the run.
    """
    WORKFLOW_RUN_EXPIRES_AT: ClassVar[NumericField] = NumericField(
        "workflowRunExpiresAt", "workflowRunExpiresAt"
    )
    """
    Time at which this run will expire.
    """
    WORKFLOW_RUN_CREATED_BY: ClassVar[KeywordField] = KeywordField(
        "workflowRunCreatedBy", "workflowRunCreatedBy"
    )
    """
    Username of the user who created this workflow run.
    """
    WORKFLOW_RUN_UPDATED_BY: ClassVar[KeywordField] = KeywordField(
        "workflowRunUpdatedBy", "workflowRunUpdatedBy"
    )
    """
    Username of the user who updated this workflow run.
    """
    WORKFLOW_RUN_DELETED_AT: ClassVar[NumericField] = NumericField(
        "workflowRunDeletedAt", "workflowRunDeletedAt"
    )
    """
    Deletion time of this workflow run.
    """

    # Names that __setattr__ routes to the property setters below.
    _convenience_properties: ClassVar[List[str]] = [
        "workflow_run_workflow_guid",
        "workflow_run_type",
        "workflow_run_action_choices",
        "workflow_run_on_asset_guid",
        "workflow_run_comment",
        "workflow_run_config",
        "workflow_run_status",
        "workflow_run_expires_at",
        "workflow_run_created_by",
        "workflow_run_updated_by",
        "workflow_run_deleted_at",
    ]

    # Every property below follows the same shape: the getter returns None
    # when ``self.attributes`` is None, otherwise the value held on it; the
    # setter creates an empty ``Attributes`` first if none exists yet.

    @property
    def workflow_run_workflow_guid(self) -> Optional[str]:
        """GUID of the workflow from which this run was created."""
        return (
            None
            if self.attributes is None
            else self.attributes.workflow_run_workflow_guid
        )

    @workflow_run_workflow_guid.setter
    def workflow_run_workflow_guid(self, workflow_run_workflow_guid: Optional[str]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.workflow_run_workflow_guid = workflow_run_workflow_guid

    @property
    def workflow_run_type(self) -> Optional[WorkflowRunType]:
        """Type of the workflow from which this run was created."""
        return None if self.attributes is None else self.attributes.workflow_run_type

    @workflow_run_type.setter
    def workflow_run_type(self, workflow_run_type: Optional[WorkflowRunType]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.workflow_run_type = workflow_run_type

    @property
    def workflow_run_action_choices(self) -> Optional[Set[str]]:
        """Workflow run action choices."""
        return (
            None
            if self.attributes is None
            else self.attributes.workflow_run_action_choices
        )

    @workflow_run_action_choices.setter
    def workflow_run_action_choices(
        self, workflow_run_action_choices: Optional[Set[str]]
    ):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.workflow_run_action_choices = workflow_run_action_choices

    @property
    def workflow_run_on_asset_guid(self) -> Optional[str]:
        """The asset for which this run was created."""
        return (
            None
            if self.attributes is None
            else self.attributes.workflow_run_on_asset_guid
        )

    @workflow_run_on_asset_guid.setter
    def workflow_run_on_asset_guid(self, workflow_run_on_asset_guid: Optional[str]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.workflow_run_on_asset_guid = workflow_run_on_asset_guid

    @property
    def workflow_run_comment(self) -> Optional[str]:
        """The comment added by the requester."""
        return None if self.attributes is None else self.attributes.workflow_run_comment

    @workflow_run_comment.setter
    def workflow_run_comment(self, workflow_run_comment: Optional[str]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.workflow_run_comment = workflow_run_comment

    @property
    def workflow_run_config(self) -> Optional[str]:
        """Details of the approval workflow run."""
        return None if self.attributes is None else self.attributes.workflow_run_config

    @workflow_run_config.setter
    def workflow_run_config(self, workflow_run_config: Optional[str]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.workflow_run_config = workflow_run_config

    @property
    def workflow_run_status(self) -> Optional[WorkflowRunStatus]:
        """Status of the run."""
        return None if self.attributes is None else self.attributes.workflow_run_status

    @workflow_run_status.setter
    def workflow_run_status(self, workflow_run_status: Optional[WorkflowRunStatus]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.workflow_run_status = workflow_run_status

    @property
    def workflow_run_expires_at(self) -> Optional[datetime]:
        """Time at which this run will expire."""
        return (
            None if self.attributes is None else self.attributes.workflow_run_expires_at
        )

    @workflow_run_expires_at.setter
    def workflow_run_expires_at(self, workflow_run_expires_at: Optional[datetime]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.workflow_run_expires_at = workflow_run_expires_at

    @property
    def workflow_run_created_by(self) -> Optional[str]:
        """Username of the user who created this workflow run."""
        return (
            None if self.attributes is None else self.attributes.workflow_run_created_by
        )

    @workflow_run_created_by.setter
    def workflow_run_created_by(self, workflow_run_created_by: Optional[str]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.workflow_run_created_by = workflow_run_created_by

    @property
    def workflow_run_updated_by(self) -> Optional[str]:
        """Username of the user who updated this workflow run."""
        return (
            None if self.attributes is None else self.attributes.workflow_run_updated_by
        )

    @workflow_run_updated_by.setter
    def workflow_run_updated_by(self, workflow_run_updated_by: Optional[str]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.workflow_run_updated_by = workflow_run_updated_by

    @property
    def workflow_run_deleted_at(self) -> Optional[datetime]:
        """Deletion time of this workflow run."""
        return (
            None if self.attributes is None else self.attributes.workflow_run_deleted_at
        )

    @workflow_run_deleted_at.setter
    def workflow_run_deleted_at(self, workflow_run_deleted_at: Optional[datetime]):
        if self.attributes is None:
            self.attributes = self.Attributes()
        self.attributes.workflow_run_deleted_at = workflow_run_deleted_at

    # Backing model for the convenience properties above: one optional field
    # per property, all defaulting to None.
    class Attributes(Asset.Attributes):
        workflow_run_workflow_guid: Optional[str] = Field(default=None, description="")
        workflow_run_type: Optional[WorkflowRunType] = Field(
            default=None, description=""
        )
        workflow_run_action_choices: Optional[Set[str]] = Field(
            default=None, description=""
        )
        workflow_run_on_asset_guid: Optional[str] = Field(default=None, description="")
        workflow_run_comment: Optional[str] = Field(default=None, description="")
        workflow_run_config: Optional[str] = Field(default=None, description="")
        workflow_run_status: Optional[WorkflowRunStatus] = Field(
            default=None, description=""
        )
        workflow_run_expires_at: Optional[datetime] = Field(
            default=None, description=""
        )
        workflow_run_created_by: Optional[str] = Field(default=None, description="")
        workflow_run_updated_by: Optional[str] = Field(default=None, description="")
        workflow_run_deleted_at: Optional[datetime] = Field(
            default=None, description=""
        )

    # A fresh Attributes instance is built per WorkflowRun via the factory.
    attributes: WorkflowRun.Attributes = Field(
        default_factory=lambda: WorkflowRun.Attributes(),
        description=(
            "Map of attributes in the instance and their values. "
            "The specific keys of this map will vary by type, "
            "so are described in the sub-types of this schema."
        ),
    )
# Resolve forward references on the nested Attributes model once WorkflowRun
# is fully defined. NOTE(review): presumably required because the module uses
# ``from __future__ import annotations`` (annotations are kept as strings) --
# confirm against the pydantic v1 postponed-annotations documentation.
WorkflowRun.Attributes.update_forward_refs()