Source code for pyatlan.model.assets.core.snowflake_stream

# SPDX-License-Identifier: Apache-2.0
# Copyright 2025 Atlan Pte. Ltd.


from __future__ import annotations

from datetime import datetime
from typing import ClassVar, List, Optional

from pydantic.v1 import Field, validator

from pyatlan.model.fields.atlan_fields import (
    BooleanField,
    KeywordField,
    NumericField,
    RelationField,
)

from .s_q_l import SQL


[docs] class SnowflakeStream(SQL): """Description""" type_name: str = Field(default="SnowflakeStream", allow_mutation=False) @validator("type_name") def validate_type_name(cls, v): if v != "SnowflakeStream": raise ValueError("must be SnowflakeStream") return v def __setattr__(self, name, value): if name in SnowflakeStream._convenience_properties: return object.__setattr__(self, name, value) super().__setattr__(name, value) SNOWFLAKE_STREAM_TYPE: ClassVar[KeywordField] = KeywordField( "snowflakeStreamType", "snowflakeStreamType" ) """ Type of this stream, for example: standard, append-only, insert-only, etc. """ SNOWFLAKE_STREAM_SOURCE_TYPE: ClassVar[KeywordField] = KeywordField( "snowflakeStreamSourceType", "snowflakeStreamSourceType" ) """ Type of the source of this stream. """ SNOWFLAKE_STREAM_MODE: ClassVar[KeywordField] = KeywordField( "snowflakeStreamMode", "snowflakeStreamMode" ) """ Mode of this stream. """ SNOWFLAKE_STREAM_IS_STALE: ClassVar[BooleanField] = BooleanField( "snowflakeStreamIsStale", "snowflakeStreamIsStale" ) """ Whether this stream is stale (true) or not (false). """ SNOWFLAKE_STREAM_STALE_AFTER: ClassVar[NumericField] = NumericField( "snowflakeStreamStaleAfter", "snowflakeStreamStaleAfter" ) """ Time (epoch) after which this stream will be stale, in milliseconds. """ ATLAN_SCHEMA: ClassVar[RelationField] = RelationField("atlanSchema") """ TBC """ _convenience_properties: ClassVar[List[str]] = [ "snowflake_stream_type", "snowflake_stream_source_type", "snowflake_stream_mode", "snowflake_stream_is_stale", "snowflake_stream_stale_after", "atlan_schema", ] @property def snowflake_stream_type(self) -> Optional[str]: return ( None if self.attributes is None else self.attributes.snowflake_stream_type ) @snowflake_stream_type.setter def snowflake_stream_type(self, snowflake_stream_type: Optional[str]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.snowflake_stream_type = snowflake_stream_type @property def snowflake_stream_source_type(self) -> Optional[str]: return ( None if self.attributes is None else self.attributes.snowflake_stream_source_type ) @snowflake_stream_source_type.setter def snowflake_stream_source_type(self, snowflake_stream_source_type: Optional[str]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.snowflake_stream_source_type = snowflake_stream_source_type @property def snowflake_stream_mode(self) -> Optional[str]: return ( None if self.attributes is None else self.attributes.snowflake_stream_mode ) @snowflake_stream_mode.setter def snowflake_stream_mode(self, snowflake_stream_mode: Optional[str]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.snowflake_stream_mode = snowflake_stream_mode @property def snowflake_stream_is_stale(self) -> Optional[bool]: return ( None if self.attributes is None else self.attributes.snowflake_stream_is_stale ) @snowflake_stream_is_stale.setter def snowflake_stream_is_stale(self, snowflake_stream_is_stale: Optional[bool]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.snowflake_stream_is_stale = snowflake_stream_is_stale @property def snowflake_stream_stale_after(self) -> Optional[datetime]: return ( None if self.attributes is None else self.attributes.snowflake_stream_stale_after ) @snowflake_stream_stale_after.setter def snowflake_stream_stale_after( self, snowflake_stream_stale_after: Optional[datetime] ): if self.attributes is None: self.attributes = self.Attributes() self.attributes.snowflake_stream_stale_after = snowflake_stream_stale_after @property def atlan_schema(self) -> Optional[Schema]: return None if self.attributes is None else self.attributes.atlan_schema @atlan_schema.setter def atlan_schema(self, atlan_schema: Optional[Schema]): if self.attributes is None: self.attributes = self.Attributes() self.attributes.atlan_schema = atlan_schema class Attributes(SQL.Attributes): snowflake_stream_type: Optional[str] = Field(default=None, description="") snowflake_stream_source_type: Optional[str] = Field( default=None, description="" ) snowflake_stream_mode: Optional[str] = Field(default=None, description="") snowflake_stream_is_stale: Optional[bool] = Field(default=None, description="") snowflake_stream_stale_after: Optional[datetime] = Field( default=None, description="" ) atlan_schema: Optional[Schema] = Field( default=None, description="" ) # relationship attributes: SnowflakeStream.Attributes = Field( default_factory=lambda: SnowflakeStream.Attributes(), description=( "Map of attributes in the instance and their values. " "The specific keys of this map will vary by type, " "so are described in the sub-types of this schema." ), )
from .schema import Schema # noqa: E402, F401