Source code for pyatlan.model.assets.cassandra_keyspace
# SPDX-License-Identifier: Apache-2.0
# Copyright 2025 Atlan Pte. Ltd.
from __future__ import annotations
from typing import ClassVar, Dict, List, Optional
from pydantic.v1 import Field, validator
from pyatlan.model.fields.atlan_fields import BooleanField, KeywordField, RelationField
from .cassandra import Cassandra
[docs]
class CassandraKeyspace(Cassandra):
"""Description"""
type_name: str = Field(default="CassandraKeyspace", allow_mutation=False)
@validator("type_name")
def validate_type_name(cls, v):
if v != "CassandraKeyspace":
raise ValueError("must be CassandraKeyspace")
return v
def __setattr__(self, name, value):
if name in CassandraKeyspace._convenience_properties:
return object.__setattr__(self, name, value)
super().__setattr__(name, value)
CASSANDRA_KEYSPACE_DURABLE_WRITES: ClassVar[BooleanField] = BooleanField(
"cassandraKeyspaceDurableWrites", "cassandraKeyspaceDurableWrites"
)
"""
Indicates whether durable writes are enabled for the CassandraKeyspace.
"""
CASSANDRA_KEYSPACE_REPLICATION: ClassVar[KeywordField] = KeywordField(
"cassandraKeyspaceReplication", "cassandraKeyspaceReplication"
)
"""
Replication class for the CassandraKeyspace.
"""
CASSANDRA_KEYSPACE_VIRTUAL: ClassVar[BooleanField] = BooleanField(
"cassandraKeyspaceVirtual", "cassandraKeyspaceVirtual"
)
"""
Indicates whether the CassandraKeyspace is virtual.
"""
CASSANDRA_KEYSPACE_QUERY: ClassVar[KeywordField] = KeywordField(
"cassandraKeyspaceQuery", "cassandraKeyspaceQuery"
)
"""
Query associated with the CassandraKeyspace.
"""
CASSANDRA_TABLES: ClassVar[RelationField] = RelationField("cassandraTables")
"""
TBC
"""
CASSANDRA_VIEWS: ClassVar[RelationField] = RelationField("cassandraViews")
"""
TBC
"""
_convenience_properties: ClassVar[List[str]] = [
"cassandra_keyspace_durable_writes",
"cassandra_keyspace_replication",
"cassandra_keyspace_virtual",
"cassandra_keyspace_query",
"cassandra_tables",
"cassandra_views",
]
@property
def cassandra_keyspace_durable_writes(self) -> Optional[bool]:
return (
None
if self.attributes is None
else self.attributes.cassandra_keyspace_durable_writes
)
@cassandra_keyspace_durable_writes.setter
def cassandra_keyspace_durable_writes(
self, cassandra_keyspace_durable_writes: Optional[bool]
):
if self.attributes is None:
self.attributes = self.Attributes()
self.attributes.cassandra_keyspace_durable_writes = (
cassandra_keyspace_durable_writes
)
@property
def cassandra_keyspace_replication(self) -> Optional[Dict[str, str]]:
return (
None
if self.attributes is None
else self.attributes.cassandra_keyspace_replication
)
@cassandra_keyspace_replication.setter
def cassandra_keyspace_replication(
self, cassandra_keyspace_replication: Optional[Dict[str, str]]
):
if self.attributes is None:
self.attributes = self.Attributes()
self.attributes.cassandra_keyspace_replication = cassandra_keyspace_replication
@property
def cassandra_keyspace_virtual(self) -> Optional[bool]:
return (
None
if self.attributes is None
else self.attributes.cassandra_keyspace_virtual
)
@cassandra_keyspace_virtual.setter
def cassandra_keyspace_virtual(self, cassandra_keyspace_virtual: Optional[bool]):
if self.attributes is None:
self.attributes = self.Attributes()
self.attributes.cassandra_keyspace_virtual = cassandra_keyspace_virtual
@property
def cassandra_keyspace_query(self) -> Optional[str]:
return (
None
if self.attributes is None
else self.attributes.cassandra_keyspace_query
)
@cassandra_keyspace_query.setter
def cassandra_keyspace_query(self, cassandra_keyspace_query: Optional[str]):
if self.attributes is None:
self.attributes = self.Attributes()
self.attributes.cassandra_keyspace_query = cassandra_keyspace_query
@property
def cassandra_tables(self) -> Optional[List[CassandraTable]]:
return None if self.attributes is None else self.attributes.cassandra_tables
@cassandra_tables.setter
def cassandra_tables(self, cassandra_tables: Optional[List[CassandraTable]]):
if self.attributes is None:
self.attributes = self.Attributes()
self.attributes.cassandra_tables = cassandra_tables
@property
def cassandra_views(self) -> Optional[List[CassandraView]]:
return None if self.attributes is None else self.attributes.cassandra_views
@cassandra_views.setter
def cassandra_views(self, cassandra_views: Optional[List[CassandraView]]):
if self.attributes is None:
self.attributes = self.Attributes()
self.attributes.cassandra_views = cassandra_views
class Attributes(Cassandra.Attributes):
cassandra_keyspace_durable_writes: Optional[bool] = Field(
default=None, description=""
)
cassandra_keyspace_replication: Optional[Dict[str, str]] = Field(
default=None, description=""
)
cassandra_keyspace_virtual: Optional[bool] = Field(default=None, description="")
cassandra_keyspace_query: Optional[str] = Field(default=None, description="")
cassandra_tables: Optional[List[CassandraTable]] = Field(
default=None, description=""
) # relationship
cassandra_views: Optional[List[CassandraView]] = Field(
default=None, description=""
) # relationship
attributes: CassandraKeyspace.Attributes = Field(
default_factory=lambda: CassandraKeyspace.Attributes(),
description=(
"Map of attributes in the instance and their values. "
"The specific keys of this map will vary by type, "
"so are described in the sub-types of this schema."
),
)
from .cassandra_table import CassandraTable # noqa: E402, F401
from .cassandra_view import CassandraView # noqa: E402, F401
CassandraKeyspace.Attributes.update_forward_refs()