
aws

Classes:

StreamBufferType module-attribute

StreamBufferType = Callable[
    [], AbstractContextManager[IO[bytes]]
]

A function that returns a context manager for a stream buffer.
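As an illustration, any zero-argument callable that returns a context manager yielding a binary file object matches this alias. The sketch below uses only the standard library; `in_memory_buffer`, `spooled_buffer`, and `round_trip` are hypothetical names for this example and are not part of the module.

```python
from collections.abc import Callable
from contextlib import AbstractContextManager
from io import BytesIO
from tempfile import SpooledTemporaryFile
from typing import IO

# Same shape as the alias documented above.
StreamBufferType = Callable[[], AbstractContextManager[IO[bytes]]]


def in_memory_buffer() -> AbstractContextManager[IO[bytes]]:
    # Hypothetical factory: keeps everything in memory.
    return BytesIO()


def spooled_buffer() -> AbstractContextManager[IO[bytes]]:
    # Hypothetical factory: stays in memory up to 1 MiB, then spills to disk.
    return SpooledTemporaryFile(max_size=1024 * 1024)


def round_trip(make_buffer: StreamBufferType, payload: bytes) -> bytes:
    # Write the payload into a fresh buffer, rewind, and read it back.
    with make_buffer() as buf:
        buf.write(payload)
        buf.seek(0)
        return buf.read()
```

Either factory could be passed wherever a `StreamBufferType` is expected. An in-memory buffer avoids disk I/O, while a spooled file bounds memory use for large streams.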

S3Pointer

Bases: TypedDict

A pointer to a location in S3.

Attributes:

  • bucket (str) –

    The S3 bucket where the data is stored.

  • key (str) –

    The S3 object key where the data is stored.

bucket instance-attribute

bucket: str

The S3 bucket where the data is stored.

key instance-attribute

key: str

The S3 object key where the data is stored.

S3Router

Bases: Protocol

A protocol for routing data to S3 buckets by returning an S3Pointer.

S3Storage

S3Storage(
    *,
    s3_client: S3Client,
    s3_router: S3Router | None,
    max_writers: int | None = None,
    max_readers: int | None = None,
    stream_writer_min_part_size: int = _5MB,
    stream_writer_buffer_type: StreamBufferType = lambda: SpooledTemporaryFile(
        max_size=_5MB
    ),
    stream_reader_max_part_size: int = _5MB
)

Bases: Storage['S3Pointer']

Storage for S3 data.

Parameters:

  • s3_client

    (S3Client) –

    The S3 client to use for storage operations.

  • s3_router

    (S3Router | None) –

    The S3 router to use for mapping digests to S3 pointers.

  • max_writers

    (int | None, default: None ) –

    The maximum number of concurrent writes to S3.

  • max_readers

    (int | None, default: None ) –

    The maximum number of concurrent reads from S3.

  • stream_writer_min_part_size

    (int, default: _5MB ) –

    The minimum part size written to S3 while streaming.

  • stream_writer_buffer_type

    (StreamBufferType, default: lambda: SpooledTemporaryFile(max_size=_5MB) ) –

    The buffer type to use for streaming writes.

  • stream_reader_max_part_size

    (int, default: _5MB ) –

    The maximum part size read from S3 while streaming.
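To show what a minimum part size means for a streaming writer, the sketch below groups incoming chunks until at least `min_part_size` bytes have accumulated, then emits them as one part. It is an assumption about the general technique, not the library's implementation, and `batch_parts`, `_demo_chunks`, and `collect_parts` are hypothetical names. The `_5MB` default presumably reflects the S3 multipart upload rule that every part except the last must be at least 5 MiB.

```python
import asyncio
from collections.abc import AsyncIterable, AsyncIterator


async def batch_parts(
    chunks: AsyncIterable[bytes], min_part_size: int
) -> AsyncIterator[bytes]:
    # Accumulate chunks until the buffer reaches the minimum part size.
    buffer = bytearray()
    async for chunk in chunks:
        buffer.extend(chunk)
        if len(buffer) >= min_part_size:
            yield bytes(buffer)
            buffer.clear()
    # The final part may be smaller than the minimum.
    if buffer:
        yield bytes(buffer)


async def _demo_chunks() -> AsyncIterator[bytes]:
    # Hypothetical input stream used only for this example.
    for chunk in (b"aa", b"bb", b"cc", b"d"):
        yield chunk


async def collect_parts(min_part_size: int) -> list[bytes]:
    return [part async for part in batch_parts(_demo_chunks(), min_part_size)]
```

With a minimum of 4 bytes, the four demo chunks collapse into two parts: one of exactly 4 bytes and a shorter trailing part.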

Methods:

deserialize_config

deserialize_config(config: str) -> C

Deserialize the configuration from a JSON string.

read_data async

read_data(pointer: S3Pointer) -> bytes

Load the value from the given location.

read_data_stream async

read_data_stream(
    pointer: S3Pointer,
) -> AsyncGenerator[bytes]

Load the stream from the given location.
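One common way to read an object in bounded pieces is to issue ranged requests no larger than a maximum part size. Whether `read_data_stream` does this internally is an assumption; the sketch only shows how such ranges could be computed, and `byte_ranges` is a hypothetical helper.

```python
def byte_ranges(total_size: int, max_part_size: int) -> list[str]:
    # Build HTTP Range header values; both ends of each range are inclusive.
    if max_part_size <= 0:
        raise ValueError("max_part_size must be positive")
    ranges = []
    start = 0
    while start < total_size:
        end = min(start + max_part_size, total_size) - 1
        ranges.append(f"bytes={start}-{end}")
        start = end + 1
    return ranges
```

For a 10-byte object and a maximum part size of 4, this yields three ranges, the last of which covers the remaining 2 bytes.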

serialize_config

serialize_config(config: C) -> str

Serialize the configuration to a JSON string.

write_data async

write_data(
    data: bytes, digest: Digest, tags: TagMap
) -> S3Pointer

Save the given value.

write_data_stream async

write_data_stream(
    data_stream: AsyncIterable[bytes],
    get_digest: GetStreamDigest,
    tags: TagMap,
) -> S3Pointer

Save the given data stream.

This works by first saving the stream to a temporary key, because the content hash is not known until the stream has been fully read. Once the data has been written to the temporary key, it is copied to its final location, which is derived from the content hash.
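The write-then-copy flow can be sketched against an in-memory stand-in for S3. Everything here is hypothetical: `FakeObjectStore`, `write_stream`, the SHA-256 hash, the key layout, and the deletion of the temporary key are assumptions made for illustration, not the library's behaviour.

```python
import asyncio
import hashlib
import uuid
from collections.abc import AsyncIterable


class FakeObjectStore:
    """Hypothetical in-memory stand-in for an S3 bucket."""

    def __init__(self) -> None:
        self.objects: dict[str, bytes] = {}

    def append(self, key: str, data: bytes) -> None:
        self.objects[key] = self.objects.get(key, b"") + data

    def copy(self, src: str, dst: str) -> None:
        self.objects[dst] = self.objects[src]

    def delete(self, key: str) -> None:
        del self.objects[key]


async def write_stream(
    store: FakeObjectStore, data_stream: AsyncIterable[bytes], prefix: str = "data"
) -> str:
    # The final key depends on the hash, so start with a random temporary key.
    temp_key = f"{prefix}/tmp/{uuid.uuid4().hex}"
    hasher = hashlib.sha256()  # assumption: any content hash would do
    async for chunk in data_stream:
        hasher.update(chunk)
        store.append(temp_key, chunk)
    # Only now is the content hash known; copy to the hash-derived key.
    final_key = f"{prefix}/{hasher.hexdigest()}"
    store.copy(temp_key, final_key)
    store.delete(temp_key)  # assumption: the temporary object is cleaned up
    return final_key
```

The trade-off of this approach is one extra copy per stream in exchange for never holding the full payload in memory just to compute its hash.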

simple_s3_router

simple_s3_router(bucket: str, prefix: str = '') -> S3Router

Create a simple S3 router that routes digests to S3 pointers.

Object paths are of the form:

<prefix>/<content_hash>.<extension>

Parameters:

  • bucket

    (str) –

    The S3 bucket to use for routing.

  • prefix

    (str, default: '' ) –

    An optional prefix to add to the S3 object key.
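The key layout above can be illustrated with a small stand-alone helper. This is a sketch, not the router itself: `make_pointer` and its `content_hash` and `extension` arguments are hypothetical, and the handling of an empty prefix or a trailing slash is an assumption. The local `S3Pointer` mirrors the TypedDict documented earlier so that the example is self-contained.

```python
from typing import TypedDict


class S3Pointer(TypedDict):
    # Local copy of the documented shape, for a self-contained example.
    bucket: str
    key: str


def make_pointer(
    bucket: str, content_hash: str, extension: str, prefix: str = ""
) -> S3Pointer:
    # Build "<prefix>/<content_hash>.<extension>".
    name = f"{content_hash}.{extension}"
    clean_prefix = prefix.strip("/")
    # Assumption: an empty prefix yields a key with no leading slash.
    key = f"{clean_prefix}/{name}" if clean_prefix else name
    return S3Pointer(bucket=bucket, key=key)
```

For example, `make_pointer("my-bucket", "abc123", "json", prefix="results")` produces a pointer whose key is `results/abc123.json`.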