Skip to content

Storages

Laykery storages are used to persist data from a serializer. All storage implementations must support persisting singular blobs of data as well as streams of data when subclassing the Storage base class.

Defining a Storage

To define a storage you need to implement the Storage interface with the following:

  • name - a string that uniquely and permanently identifies the storage.
  • write_data - a method that saves a single blob of data to the storage.
  • read_data - a method that reads a single blob of data from the storage.
  • write_data_stream - a method that saves a stream of data to the storage.
  • read_data_stream - a method that reads a stream of data from the storage.

The code snippets below show a storage that saves data to files. You can start by implementing the write_data and read_data methods:

from pathlib import Path

from labox import Digest
from labox import Storage
from labox import TagMap


class FileStorage(Storage):
    name = "temp-file-storage@v1"

    def __init__(self, prefix: Path, *, read_chunk_size: int = 1024):
        self._prefix = prefix
        self._read_chunk_size = read_chunk_size

    async def write_data(self, data: bytes, digest: Digest, tags: TagMap) -> str:
        path = self._prefix / digest["content_hash"]
        with path.open("wb") as f:
            f.write(data)
        return str(path)

    async def read_data(self, path: str) -> bytes:
        with Path(path).open("rb") as f:
            return f.read()

You can then implement the write_data_stream and read_data_stream methods to handle streams of data. This is a bit trickier since the conten_hash of the serialized data is not known until the stream has been fully read. Consequently, the data must be written to a temporary file first, and then the final content_hash can be computed and the file renamed to the final key.

from collections.abc import AsyncGenerator
from collections.abc import AsyncIterator
from tempfile import NamedTemporaryFile


class FileStorage(Storage):
    ...

    async def write_data_stream(
        self,
        stream: AsyncIterator[bytes],
        get_digest: GetStreamDigest,
        tags: TagMap,
    ) -> str:
        with NamedTemporaryFile(dir=self._root) as temp_file:
            async for chunk in stream:
                temp_file.write(chunk)
            temp_file.flush()
            temp_file.seek(0)
            key = get_digest()["content_hash"]
            Path(temp_file.name).rename(self._root / key)
        return key

    async def read_data_stream(self, key: str) -> AsyncGenerator[bytes]:
        with (self._root / key).open("rb") as f:
            while chunk := f.read(self._read_chunk_size):
                yield chunk

Content Digest

The Digest for a piece of content passed to the write_* methods of a storage provides extra information about the content (e.g. its size, type, and hash). In the case of streamed data a function (GetStreamDigest) to retrieve the digest is passed as an argument instead. If the stream has not been fully consumed allow_incomplete=True must be passed when retrieving the digest to avoid a ValueError.

Best Practices

When implementing a storage, the most important thing to keep in mind is that a storage implementation must be able to read from any location is has written to. So, for example, if one of the configuration options your storage accepts is a path prefix (as in the example above), then this prefix must be included in the storage config returned by the write_data and write_data_stream methods. This way, when reading data, the storage can reconstruct the full path to the data even if the prefix may have changed since the data was written.

A pattern used within Labox when implementing a storage is to allow users to configure their storages with a "router" function that takes in the Digest and tags of the data being saved and returns a dictionary with the storage-specific information needed to locate the data later. In the case of the S3Storage, the router function must return an S3Pointer dictionary with the bucket and key where the data is stored. This forces the storage implementation to be agnostic about where it's been configured to save data while still allowing it to save data in a location that can be reconstructed later.

Storage Tags

In the example above, the write_data and write_data_stream methods accept atags argument. This is a dictionary of tags that were provided when saving the data.

Storage Config

When a storage saves data via its write_data and write_data_stream methods, information that is used to retrieve it later is returned. This information is called "storage config" and is distinct from the data which is being stored remotely. In the example above, the storage config is a string that forms part of a file path where the data was put. More generally though, these methods may return anything which is JSON serializable. You may customize how this data is serialized and deserialized by replacing the default Storage.serialize_config and Storage.deserialize_config methods. Ultimately this config is saved within the ContentRecord.storage_config column in the database.

Storage Names

Storages are identified by a globally unique name associated with each storage class within a registry. Once a storage has been used to saved data this name must never be changed and the implementation of the storage must always remain backwards compatible. If you need to change the implementation of a storage you should create a new one with a new name. In order to continue loading old data you'll have to preserve and registry the old storage.