# SPDX-License-Identifier: Apache-2.0
# Copyright 2025 Atlan Pte. Ltd.
from __future__ import annotations
from datetime import datetime
from typing import ClassVar, List, Optional, Set
from pydantic.v1 import Field, validator
from pyatlan.model.enums import WorkflowStatus, WorkflowType
from pyatlan.model.fields.atlan_fields import KeywordField, NumericField, TextField
from .core.asset import Asset
[docs]
class Workflow(Asset, type_name="Workflow"):
"""Description"""
type_name: str = Field(default="Workflow", allow_mutation=False)
@validator("type_name")
def validate_type_name(cls, v):
if v != "Workflow":
raise ValueError("must be Workflow")
return v
def __setattr__(self, name, value):
if name in Workflow._convenience_properties:
return object.__setattr__(self, name, value)
super().__setattr__(name, value)
WORKFLOW_TEMPLATE_GUID: ClassVar[KeywordField] = KeywordField(
"workflowTemplateGuid", "workflowTemplateGuid"
)
"""
GUID of the workflow template from which this workflow was created.
"""
WORKFLOW_TYPE: ClassVar[KeywordField] = KeywordField("workflowType", "workflowType")
"""
Type of the workflow.
"""
WORKFLOW_ACTION_CHOICES: ClassVar[KeywordField] = KeywordField(
"workflowActionChoices", "workflowActionChoices"
)
"""
List of workflow action choices.
"""
WORKFLOW_CONFIG: ClassVar[TextField] = TextField("workflowConfig", "workflowConfig")
"""
Details of the workflow.
"""
WORKFLOW_STATUS: ClassVar[KeywordField] = KeywordField(
"workflowStatus", "workflowStatus"
)
"""
Status of the workflow.
"""
WORKFLOW_RUN_EXPIRES_IN: ClassVar[TextField] = TextField(
"workflowRunExpiresIn", "workflowRunExpiresIn"
)
"""
Time duration after which a run of this workflow will expire.
"""
WORKFLOW_CREATED_BY: ClassVar[KeywordField] = KeywordField(
"workflowCreatedBy", "workflowCreatedBy"
)
"""
Username of the user who created this workflow.
"""
WORKFLOW_UPDATED_BY: ClassVar[KeywordField] = KeywordField(
"workflowUpdatedBy", "workflowUpdatedBy"
)
"""
Username of the user who updated this workflow.
"""
WORKFLOW_DELETED_AT: ClassVar[NumericField] = NumericField(
"workflowDeletedAt", "workflowDeletedAt"
)
"""
Deletion time of this workflow.
"""
_convenience_properties: ClassVar[List[str]] = [
"workflow_template_guid",
"workflow_type",
"workflow_action_choices",
"workflow_config",
"workflow_status",
"workflow_run_expires_in",
"workflow_created_by",
"workflow_updated_by",
"workflow_deleted_at",
]
@property
def workflow_template_guid(self) -> Optional[str]:
return (
None if self.attributes is None else self.attributes.workflow_template_guid
)
@workflow_template_guid.setter
def workflow_template_guid(self, workflow_template_guid: Optional[str]):
if self.attributes is None:
self.attributes = self.Attributes()
self.attributes.workflow_template_guid = workflow_template_guid
@property
def workflow_type(self) -> Optional[WorkflowType]:
return None if self.attributes is None else self.attributes.workflow_type
@workflow_type.setter
def workflow_type(self, workflow_type: Optional[WorkflowType]):
if self.attributes is None:
self.attributes = self.Attributes()
self.attributes.workflow_type = workflow_type
@property
def workflow_action_choices(self) -> Optional[Set[str]]:
return (
None if self.attributes is None else self.attributes.workflow_action_choices
)
@workflow_action_choices.setter
def workflow_action_choices(self, workflow_action_choices: Optional[Set[str]]):
if self.attributes is None:
self.attributes = self.Attributes()
self.attributes.workflow_action_choices = workflow_action_choices
@property
def workflow_config(self) -> Optional[str]:
return None if self.attributes is None else self.attributes.workflow_config
@workflow_config.setter
def workflow_config(self, workflow_config: Optional[str]):
if self.attributes is None:
self.attributes = self.Attributes()
self.attributes.workflow_config = workflow_config
@property
def workflow_status(self) -> Optional[WorkflowStatus]:
return None if self.attributes is None else self.attributes.workflow_status
@workflow_status.setter
def workflow_status(self, workflow_status: Optional[WorkflowStatus]):
if self.attributes is None:
self.attributes = self.Attributes()
self.attributes.workflow_status = workflow_status
@property
def workflow_run_expires_in(self) -> Optional[str]:
return (
None if self.attributes is None else self.attributes.workflow_run_expires_in
)
@workflow_run_expires_in.setter
def workflow_run_expires_in(self, workflow_run_expires_in: Optional[str]):
if self.attributes is None:
self.attributes = self.Attributes()
self.attributes.workflow_run_expires_in = workflow_run_expires_in
@property
def workflow_created_by(self) -> Optional[str]:
return None if self.attributes is None else self.attributes.workflow_created_by
@workflow_created_by.setter
def workflow_created_by(self, workflow_created_by: Optional[str]):
if self.attributes is None:
self.attributes = self.Attributes()
self.attributes.workflow_created_by = workflow_created_by
@property
def workflow_updated_by(self) -> Optional[str]:
return None if self.attributes is None else self.attributes.workflow_updated_by
@workflow_updated_by.setter
def workflow_updated_by(self, workflow_updated_by: Optional[str]):
if self.attributes is None:
self.attributes = self.Attributes()
self.attributes.workflow_updated_by = workflow_updated_by
@property
def workflow_deleted_at(self) -> Optional[datetime]:
return None if self.attributes is None else self.attributes.workflow_deleted_at
@workflow_deleted_at.setter
def workflow_deleted_at(self, workflow_deleted_at: Optional[datetime]):
if self.attributes is None:
self.attributes = self.Attributes()
self.attributes.workflow_deleted_at = workflow_deleted_at
class Attributes(Asset.Attributes):
workflow_template_guid: Optional[str] = Field(default=None, description="")
workflow_type: Optional[WorkflowType] = Field(default=None, description="")
workflow_action_choices: Optional[Set[str]] = Field(
default=None, description=""
)
workflow_config: Optional[str] = Field(default=None, description="")
workflow_status: Optional[WorkflowStatus] = Field(default=None, description="")
workflow_run_expires_in: Optional[str] = Field(default=None, description="")
workflow_created_by: Optional[str] = Field(default=None, description="")
workflow_updated_by: Optional[str] = Field(default=None, description="")
workflow_deleted_at: Optional[datetime] = Field(default=None, description="")
attributes: Workflow.Attributes = Field(
default_factory=lambda: Workflow.Attributes(),
description=(
"Map of attributes in the instance and their values. "
"The specific keys of this map will vary by type, "
"so are described in the sub-types of this schema."
),
)
Workflow.Attributes.update_forward_refs()