# SPDX-License-Identifier: Apache-2.0
# Copyright 2025 Atlan Pte. Ltd.
from __future__ import annotations
from datetime import datetime
from typing import ClassVar, List, Optional
from warnings import warn
from pydantic.v1 import Field, validator
from pyatlan.model.enums import AtlanConnectorType
from pyatlan.model.fields.atlan_fields import (
KeywordField,
KeywordTextField,
NumericField,
RelationField,
)
from pyatlan.model.utils import construct_object_key
from pyatlan.utils import init_guid, validate_required_fields
from .s3 import S3
class S3Object(S3):
"""Description"""
@classmethod
@init_guid
def creator(
cls,
*,
name: str,
connection_qualified_name: str,
aws_arn: str,
s3_bucket_name: str,
s3_bucket_qualified_name: str,
) -> S3Object:
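        """
        Build a minimal S3Object that can be saved to Atlan, identified by its AWS ARN.

        :param name: human-readable name of the object
        :param connection_qualified_name: unique name of the connection through which the object is accessible
        :param aws_arn: unique ARN of the object in AWS
        :param s3_bucket_name: simple name of the bucket in which the object exists
        :param s3_bucket_qualified_name: unique name of the bucket in which the object exists
        :returns: the minimal object necessary to create the S3 object
        """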
validate_required_fields(
[
"name",
"connection_qualified_name",
"aws_arn",
"s3_bucket_name",
"s3_bucket_qualified_name",
],
[
name,
connection_qualified_name,
aws_arn,
s3_bucket_name,
s3_bucket_qualified_name,
],
)
attributes = S3Object.Attributes.create(
name=name,
connection_qualified_name=connection_qualified_name,
aws_arn=aws_arn,
s3_bucket_name=s3_bucket_name,
s3_bucket_qualified_name=s3_bucket_qualified_name,
)
return cls(attributes=attributes)
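
    # A hedged usage sketch (not taken from the library's docs); the connection
    # qualified name, ARN, and bucket values below are hypothetical
    # placeholders:
    #
    #   obj = S3Object.creator(
    #       name="sales.csv",
    #       connection_qualified_name="default/s3/1234567890",
    #       aws_arn="arn:aws:s3:::my-bucket/reports/sales.csv",
    #       s3_bucket_name="my-bucket",
    #       s3_bucket_qualified_name="default/s3/1234567890/arn:aws:s3:::my-bucket",
    #   )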
@classmethod
@init_guid
def create(
cls,
*,
name: str,
connection_qualified_name: str,
aws_arn: str,
s3_bucket_name: str,
s3_bucket_qualified_name: str,
) -> S3Object:
warn(
(
"This method is deprecated, please use 'creator' "
"instead, which offers identical functionality."
),
DeprecationWarning,
stacklevel=2,
)
return cls.creator(
name=name,
connection_qualified_name=connection_qualified_name,
aws_arn=aws_arn,
s3_bucket_name=s3_bucket_name,
s3_bucket_qualified_name=s3_bucket_qualified_name,
)
@classmethod
@init_guid
def creator_with_prefix(
cls,
*,
name: str,
connection_qualified_name: str,
s3_bucket_name: str,
s3_bucket_qualified_name: str,
prefix: str = "",
) -> S3Object:
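        """
        Build a minimal S3Object that can be saved to Atlan, identified by its bucket
        and prefix rather than an AWS ARN. The object key (and qualified name) are
        derived by combining the prefix with the object's name.

        :param name: human-readable name of the object (typically the filename)
        :param connection_qualified_name: unique name of the connection through which the object is accessible
        :param s3_bucket_name: simple name of the bucket in which the object exists
        :param s3_bucket_qualified_name: unique name of the bucket in which the object exists
        :param prefix: directory-style path under which the object is stored, or empty for the bucket root
        :returns: the minimal object necessary to create the S3 object
        """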
validate_required_fields(
[
"name",
"connection_qualified_name",
"s3_bucket_name",
"s3_bucket_qualified_name",
],
[
name,
connection_qualified_name,
s3_bucket_name,
s3_bucket_qualified_name,
],
)
attributes = S3Object.Attributes.create_with_prefix(
name=name,
connection_qualified_name=connection_qualified_name,
prefix=prefix,
s3_bucket_name=s3_bucket_name,
s3_bucket_qualified_name=s3_bucket_qualified_name,
)
return cls(attributes=attributes)
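
    # A hedged usage sketch: with prefix "reports" the object key becomes
    # "reports/sales.csv", and the qualified name embeds the bucket's simple
    # name to keep keys unique across buckets in the same connection. All
    # values below are hypothetical placeholders:
    #
    #   obj = S3Object.creator_with_prefix(
    #       name="sales.csv",
    #       connection_qualified_name="default/s3/1234567890",
    #       s3_bucket_name="my-bucket",
    #       s3_bucket_qualified_name="default/s3/1234567890/my-bucket",
    #       prefix="reports",
    #   )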
@classmethod
@init_guid
def create_with_prefix(
cls,
*,
name: str,
connection_qualified_name: str,
s3_bucket_name: str,
s3_bucket_qualified_name: str,
prefix: str = "",
) -> S3Object:
warn(
(
"This method is deprecated, please use 'creator_with_prefix' "
"instead, which offers identical functionality."
),
DeprecationWarning,
stacklevel=2,
)
return cls.creator_with_prefix(
name=name,
connection_qualified_name=connection_qualified_name,
s3_bucket_name=s3_bucket_name,
s3_bucket_qualified_name=s3_bucket_qualified_name,
prefix=prefix,
)
type_name: str = Field(default="S3Object", allow_mutation=False)
@validator("type_name")
def validate_type_name(cls, v):
if v != "S3Object":
raise ValueError("must be S3Object")
return v
def __setattr__(self, name, value):
if name in S3Object._convenience_properties:
return object.__setattr__(self, name, value)
super().__setattr__(name, value)
S3OBJECT_LAST_MODIFIED_TIME: ClassVar[NumericField] = NumericField(
"s3ObjectLastModifiedTime", "s3ObjectLastModifiedTime"
)
"""
Time (epoch) at which this object was last updated, in milliseconds, or when it was created if it has never been modified.
""" # noqa: E501
S3BUCKET_NAME: ClassVar[KeywordTextField] = KeywordTextField(
"s3BucketName", "s3BucketName", "s3BucketName.text"
)
"""
Simple name of the bucket in which this object exists.
"""
S3BUCKET_QUALIFIED_NAME: ClassVar[KeywordField] = KeywordField(
"s3BucketQualifiedName", "s3BucketQualifiedName"
)
"""
Unique name of the bucket in which this object exists.
"""
S3OBJECT_SIZE: ClassVar[NumericField] = NumericField("s3ObjectSize", "s3ObjectSize")
"""
Object size in bytes.
"""
S3OBJECT_STORAGE_CLASS: ClassVar[KeywordField] = KeywordField(
"s3ObjectStorageClass", "s3ObjectStorageClass"
)
"""
Storage class used for storing this object, for example: standard, intelligent-tiering, glacier, etc.
"""
S3OBJECT_KEY: ClassVar[KeywordTextField] = KeywordTextField(
"s3ObjectKey", "s3ObjectKey", "s3ObjectKey.text"
)
"""
Unique identity of this object in an S3 bucket. This is usually the concatenation of any prefix (folder) in the S3 bucket with the name of the object (file) itself.
""" # noqa: E501
S3OBJECT_CONTENT_TYPE: ClassVar[KeywordField] = KeywordField(
"s3ObjectContentType", "s3ObjectContentType"
)
"""
Type of content in this object, for example: text/plain, application/json, etc.
"""
S3OBJECT_CONTENT_DISPOSITION: ClassVar[KeywordField] = KeywordField(
"s3ObjectContentDisposition", "s3ObjectContentDisposition"
)
"""
Information about how this object's content should be presented.
"""
S3OBJECT_VERSION_ID: ClassVar[KeywordField] = KeywordField(
"s3ObjectVersionId", "s3ObjectVersionId"
)
"""
Version of this object. This is only applicable when versioning is enabled on the bucket in which this object exists.
""" # noqa: E501
BUCKET: ClassVar[RelationField] = RelationField("bucket")
"""
    S3 bucket in which this object exists.
"""
_convenience_properties: ClassVar[List[str]] = [
"s3_object_last_modified_time",
"s3_bucket_name",
"s3_bucket_qualified_name",
"s3_object_size",
"s3_object_storage_class",
"s3_object_key",
"s3_object_content_type",
"s3_object_content_disposition",
"s3_object_version_id",
"bucket",
]
@property
def s3_object_last_modified_time(self) -> Optional[datetime]:
return (
None
if self.attributes is None
else self.attributes.s3_object_last_modified_time
)
@s3_object_last_modified_time.setter
def s3_object_last_modified_time(
self, s3_object_last_modified_time: Optional[datetime]
):
if self.attributes is None:
self.attributes = self.Attributes()
self.attributes.s3_object_last_modified_time = s3_object_last_modified_time
@property
def s3_bucket_name(self) -> Optional[str]:
return None if self.attributes is None else self.attributes.s3_bucket_name
@s3_bucket_name.setter
def s3_bucket_name(self, s3_bucket_name: Optional[str]):
if self.attributes is None:
self.attributes = self.Attributes()
self.attributes.s3_bucket_name = s3_bucket_name
@property
def s3_bucket_qualified_name(self) -> Optional[str]:
return (
None
if self.attributes is None
else self.attributes.s3_bucket_qualified_name
)
@s3_bucket_qualified_name.setter
def s3_bucket_qualified_name(self, s3_bucket_qualified_name: Optional[str]):
if self.attributes is None:
self.attributes = self.Attributes()
self.attributes.s3_bucket_qualified_name = s3_bucket_qualified_name
@property
def s3_object_size(self) -> Optional[int]:
return None if self.attributes is None else self.attributes.s3_object_size
@s3_object_size.setter
def s3_object_size(self, s3_object_size: Optional[int]):
if self.attributes is None:
self.attributes = self.Attributes()
self.attributes.s3_object_size = s3_object_size
@property
def s3_object_storage_class(self) -> Optional[str]:
return (
None if self.attributes is None else self.attributes.s3_object_storage_class
)
@s3_object_storage_class.setter
def s3_object_storage_class(self, s3_object_storage_class: Optional[str]):
if self.attributes is None:
self.attributes = self.Attributes()
self.attributes.s3_object_storage_class = s3_object_storage_class
@property
def s3_object_key(self) -> Optional[str]:
return None if self.attributes is None else self.attributes.s3_object_key
@s3_object_key.setter
def s3_object_key(self, s3_object_key: Optional[str]):
if self.attributes is None:
self.attributes = self.Attributes()
self.attributes.s3_object_key = s3_object_key
@property
def s3_object_content_type(self) -> Optional[str]:
return (
None if self.attributes is None else self.attributes.s3_object_content_type
)
@s3_object_content_type.setter
def s3_object_content_type(self, s3_object_content_type: Optional[str]):
if self.attributes is None:
self.attributes = self.Attributes()
self.attributes.s3_object_content_type = s3_object_content_type
@property
def s3_object_content_disposition(self) -> Optional[str]:
return (
None
if self.attributes is None
else self.attributes.s3_object_content_disposition
)
@s3_object_content_disposition.setter
def s3_object_content_disposition(
self, s3_object_content_disposition: Optional[str]
):
if self.attributes is None:
self.attributes = self.Attributes()
self.attributes.s3_object_content_disposition = s3_object_content_disposition
@property
def s3_object_version_id(self) -> Optional[str]:
return None if self.attributes is None else self.attributes.s3_object_version_id
@s3_object_version_id.setter
def s3_object_version_id(self, s3_object_version_id: Optional[str]):
if self.attributes is None:
self.attributes = self.Attributes()
self.attributes.s3_object_version_id = s3_object_version_id
@property
def bucket(self) -> Optional[S3Bucket]:
return None if self.attributes is None else self.attributes.bucket
@bucket.setter
def bucket(self, bucket: Optional[S3Bucket]):
if self.attributes is None:
self.attributes = self.Attributes()
self.attributes.bucket = bucket
class Attributes(S3.Attributes):
s3_object_last_modified_time: Optional[datetime] = Field(
default=None, description=""
)
s3_bucket_name: Optional[str] = Field(default=None, description="")
s3_bucket_qualified_name: Optional[str] = Field(default=None, description="")
s3_object_size: Optional[int] = Field(default=None, description="")
s3_object_storage_class: Optional[str] = Field(default=None, description="")
s3_object_key: Optional[str] = Field(default=None, description="")
s3_object_content_type: Optional[str] = Field(default=None, description="")
s3_object_content_disposition: Optional[str] = Field(
default=None, description=""
)
s3_object_version_id: Optional[str] = Field(default=None, description="")
bucket: Optional[S3Bucket] = Field(default=None, description="") # relationship
@classmethod
@init_guid
def create(
cls,
*,
name: str,
connection_qualified_name: str,
aws_arn: str,
s3_bucket_name: str,
s3_bucket_qualified_name: str,
) -> S3Object.Attributes:
validate_required_fields(
[
"name",
"connection_qualified_name",
"aws_arn",
"s3_bucket_name",
"s3_bucket_qualified_name",
],
[
name,
connection_qualified_name,
aws_arn,
s3_bucket_name,
s3_bucket_qualified_name,
],
)
            # A connection qualified name takes the form {tenant}/{connector}/{epoch};
            # reject anything that does not split into exactly those three parts.
            fields = connection_qualified_name.split("/")
            if len(fields) != 3:
                raise ValueError("Invalid connection_qualified_name")
try:
if fields[0].replace(" ", "") == "" or fields[2].replace(" ", "") == "":
raise ValueError("Invalid connection_qualified_name")
connector_type = AtlanConnectorType(fields[1]) # type:ignore
if connector_type != AtlanConnectorType.S3:
raise ValueError("Connector type must be S3")
except ValueError as e:
raise ValueError("Invalid connection_qualified_name") from e
return S3Object.Attributes(
aws_arn=aws_arn,
name=name,
connection_qualified_name=connection_qualified_name,
qualified_name=f"{connection_qualified_name}/{aws_arn}",
connector_name=connector_type.value,
s3_bucket_name=s3_bucket_name,
s3_bucket_qualified_name=s3_bucket_qualified_name,
bucket=S3Bucket.ref_by_qualified_name(s3_bucket_qualified_name),
)
@classmethod
@init_guid
def create_with_prefix(
cls,
*,
name: str,
connection_qualified_name: str,
s3_bucket_name: str,
s3_bucket_qualified_name: str,
prefix: str = "",
) -> S3Object.Attributes:
validate_required_fields(
[
"name",
"connection_qualified_name",
"s3_bucket_name",
"s3_bucket_qualified_name",
],
[
name,
connection_qualified_name,
s3_bucket_name,
s3_bucket_qualified_name,
],
)
            # Same {tenant}/{connector}/{epoch} validation as in create() above.
            fields = connection_qualified_name.split("/")
            if len(fields) != 3:
                raise ValueError("Invalid connection_qualified_name")
try:
if fields[0].replace(" ", "") == "" or fields[2].replace(" ", "") == "":
raise ValueError("Invalid connection_qualified_name")
connector_type = AtlanConnectorType(fields[1]) # type:ignore
if connector_type != AtlanConnectorType.S3:
raise ValueError("Connector type must be S3")
except ValueError as e:
raise ValueError("Invalid connection_qualified_name") from e
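            # construct_object_key presumably joins the prefix and name into a
            # single key, normalizing slashes (e.g. prefix "reports" and name
            # "sales.csv" yielding "reports/sales.csv"); this description is an
            # assumption based on its usage here, not on its documentation.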
object_key = construct_object_key(prefix, name)
return S3Object.Attributes(
name=name,
s3_object_key=object_key,
connection_qualified_name=connection_qualified_name,
# We should include `s3_bucket_name` to make it unique
# and avoid duplicate `path/name` (object keys) in two different buckets
# within the same connection.
qualified_name=f"{connection_qualified_name}/{s3_bucket_name}/{object_key}",
connector_name=connector_type.value,
s3_bucket_name=s3_bucket_name,
s3_bucket_qualified_name=s3_bucket_qualified_name,
bucket=S3Bucket.ref_by_qualified_name(s3_bucket_qualified_name),
)
attributes: S3Object.Attributes = Field(
default_factory=lambda: S3Object.Attributes(),
description=(
"Map of attributes in the instance and their values. "
"The specific keys of this map will vary by type, "
"so are described in the sub-types of this schema."
),
)
from .s3_bucket import S3Bucket # noqa: E402, F401
S3Object.Attributes.update_forward_refs()
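
# A hedged end-to-end sketch, assuming the usual pyatlan client entry point
# (AtlanClient reading credentials from the environment) and hypothetical
# placeholder values:
#
#   from pyatlan.client.atlan import AtlanClient
#
#   client = AtlanClient()
#   obj = S3Object.creator_with_prefix(
#       name="sales.csv",
#       connection_qualified_name="default/s3/1234567890",
#       s3_bucket_name="my-bucket",
#       s3_bucket_qualified_name="default/s3/1234567890/my-bucket",
#       prefix="reports",
#   )
#   response = client.asset.save(obj)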