Skip to content

Reference

datetime_serializer module-attribute

datetime_serializer = DatetimeSerializer().register()

An iso8601 serializer for datetime objects.

json_serializer module-attribute

json_serializer = JsonSerializer().register()

A serializer for JSON.

json_sorted_serializer module-attribute

json_sorted_serializer = JsonSerializer(
    sort_keys=True
).register()

A serializer for JSON with sorted keys.

temp_file_storage module-attribute

temp_file_storage = FileSystemStorage(
    temp_dir.name
).register()

A temporary file storage backend (best for testing).

Artifact

Bases: Node[OrmArtifact], Generic[T]

A wrapper around an ORM artifact record.

Source code in src/artigraph/core/api/artifact.py
class Artifact(Node[OrmArtifact], Generic[T]):
    """A wrapper around an ORM artifact record."""

    # ORM record type this wrapper dumps to and loads from.
    graph_orm_type: ClassVar[type[OrmArtifact]] = OrmArtifact

    value: T
    serializer: Serializer | None = None
    storage: Storage | None = None

    async def graph_dump_self(self) -> OrmArtifact:
        """Dump this artifact into an ORM record.

        The payload is produced by the explicit ``serializer`` when one is
        set, passed through untouched when ``value`` is raw ``bytes``, and
        otherwise serialized by the first serializer registered for the
        value's type. When ``storage`` is configured and there is data, the
        payload is written there and an ``OrmRemoteArtifact`` referencing its
        location is returned; otherwise the data is embedded directly in an
        ``OrmDatabaseArtifact``.
        """
        if self.serializer is not None:
            serializer_name = self.serializer.name
            if self.value is None:
                # Nothing to serialize — record a null payload.
                data = None
            else:
                data = self.serializer.serialize(self.value)
        elif isinstance(self.value, bytes):
            # Raw bytes are stored as-is, with no serializer recorded.
            serializer_name = None
            data = self.value
        else:
            # Fall back to the first serializer registered for this type.
            serializer = get_serializer_by_type(type(self.value))[0]
            data = serializer.serialize(self.value)
            serializer_name = serializer.name

        if data is not None and self.storage is not None:
            # Store the payload externally; keep only its location in the DB.
            location = await self.storage.create(data)
            artifact = OrmRemoteArtifact(
                id=self.graph_id,
                artifact_serializer=serializer_name,
                remote_artifact_storage=self.storage.name,
                remote_artifact_location=location,
            )
        else:
            artifact = OrmDatabaseArtifact(
                id=self.graph_id,
                artifact_serializer=serializer_name,
                database_artifact_data=data,
            )

        return artifact

    @classmethod
    async def _graph_load_extra_kwargs(cls, self_record: OrmArtifact) -> dict[str, Any]:
        """Build the extra constructor kwargs needed to rehydrate an artifact."""
        return {
            **await super()._graph_load_extra_kwargs(self_record),
            "value": await load_deserialized_artifact_value(self_record),
            "serializer": (
                get_serializer_by_name(self_record.artifact_serializer)
                if self_record.artifact_serializer
                else None
            ),
            "storage": (
                # Only remote artifact records carry a storage backend name.
                get_storage_by_name(self_record.remote_artifact_storage)
                if isinstance(self_record, OrmRemoteArtifact)
                else None
            ),
        }

ArtifactFilter

Bases: NodeFilter[A]

Filter artifacts that meet the given conditions.

Source code in src/artigraph/core/api/filter.py
class ArtifactFilter(NodeFilter[A]):
    """Filter artifacts that meet the given conditions."""

    node_type: NodeTypeFilter[A] = field(
        # delay this in case tables are defined late
        default_factory=lambda: NodeTypeFilter(type=[OrmArtifact])  # type: ignore
    )
    """Artifacts must be one of these types."""

node_type class-attribute instance-attribute

node_type: NodeTypeFilter[A] = field(
    default_factory=lambda: NodeTypeFilter(
        type=[OrmArtifact]
    )
)

Artifacts must be one of these types.

DatetimeSerializer

Bases: Serializer

Serializer for datetime.datetime.

Source code in src/artigraph/core/serializer/datetime.py
class DatetimeSerializer(Serializer):
    """ISO 8601 serializer for ``datetime.datetime`` values."""

    name = "artigraph-datetime"
    types = (datetime,)

    def serialize(self, value: datetime) -> bytes:
        """Encode *value* as an ISO 8601 byte string."""
        iso_text = value.isoformat()
        return iso_text.encode()

    def deserialize(self, value: bytes) -> datetime:
        """Decode an ISO 8601 byte string back into a datetime."""
        iso_text = value.decode()
        return datetime.fromisoformat(iso_text)

deserialize

deserialize(value: bytes) -> datetime

Deserialize an ISO 8601 string to a datetime.datetime.

Source code in src/artigraph/core/serializer/datetime.py
def deserialize(self, value: bytes) -> datetime:
    """Decode an ISO 8601 byte string into a datetime.datetime."""
    iso_text = value.decode()
    return datetime.fromisoformat(iso_text)

serialize

serialize(value: datetime) -> bytes

Serialize a datetime.datetime to an ISO 8601 string.

Source code in src/artigraph/core/serializer/datetime.py
def serialize(self, value: datetime) -> bytes:
    """Encode a datetime.datetime as an ISO 8601 byte string."""
    iso_text = value.isoformat()
    return iso_text.encode()

FileSystemStorage

Bases: Storage

A storage backend that saves artifacts to the filesystem.

Parameters:

Name Type Description Default
directory str | Path

The directory to save artifacts to.

required
name str

The name of the storage backend.

''
Source code in src/artigraph/core/storage/file.py
class FileSystemStorage(Storage):
    """A storage backend that keeps each artifact in its own file on disk.

    Parameters:
        directory: The directory to save artifacts to.
        name: The name of the storage backend.
    """

    def __init__(self, directory: str | Path, name: str = "") -> None:
        self.dir = Path(directory)
        self.name = slugify(f"artigraph-file-system-{name or self.dir}")

    async def create(self, data: bytes) -> str:
        """Write *data* to a freshly named file and return its key."""
        token = uuid4().hex
        (self.dir / token).write_bytes(data)
        return token

    async def read(self, key: str) -> bytes:
        """Return the bytes stored under *key*."""
        return (self.dir / key).read_bytes()

    async def update(self, key: str, data: bytes) -> None:
        """Overwrite the file backing *key* with *data*."""
        (self.dir / key).write_bytes(data)

    async def delete(self, key: str) -> None:
        """Remove the file backing *key*."""
        (self.dir / key).unlink()

    async def exists(self, key: str) -> bool:
        """Report whether a file for *key* is present."""
        return (self.dir / key).exists()

create async

create(data: bytes) -> str

Create an artifact in the filesystem and return its location

Source code in src/artigraph/core/storage/file.py
async def create(self, data: bytes) -> str:
    """Write *data* to a new uniquely named file and return its key."""
    token = uuid4().hex
    (self.dir / token).write_bytes(data)
    return token

delete async

delete(key: str) -> None

Delete an artifact from the filesystem.

Source code in src/artigraph/core/storage/file.py
async def delete(self, key: str) -> None:
    """Remove the file backing *key* from the filesystem."""
    target = self.dir / key
    target.unlink()

exists async

exists(key: str) -> bool

Check if an artifact exists in the filesystem.

Source code in src/artigraph/core/storage/file.py
async def exists(self, key: str) -> bool:
    """Report whether a file for *key* is present on disk."""
    return (self.dir / key).exists()

read async

read(key: str) -> bytes

Read an artifact from the filesystem.

Source code in src/artigraph/core/storage/file.py
async def read(self, key: str) -> bytes:
    """Return the bytes stored under *key*."""
    target = self.dir / key
    return target.read_bytes()

update async

update(key: str, data: bytes) -> None

Update an artifact in the filesystem.

Source code in src/artigraph/core/storage/file.py
async def update(self, key: str, data: bytes) -> None:
    """Overwrite the file backing *key* with *data*."""
    target = self.dir / key
    target.write_bytes(data)

Filter

Bases: FrozenDataclass

Base class for where clauses.

Source code in src/artigraph/core/api/filter.py
class Filter(FrozenDataclass):
    """Base class for where clauses."""

    def create(self) -> Expression:
        """Return the condition represented by this filter."""
        # Seed composition with a no-op expression; subclasses AND/OR onto it.
        return self.compose(_NO_OP)

    def compose(self, expr: Expression, /) -> Expression:
        """Apply the where clause to the given query."""
        raise NotImplementedError()

    def __and__(self, other: Filter) -> MultiFilter:
        """Combine this filter with another (both must match)."""
        return MultiFilter(op="and", filters=(self, other))

    def __or__(self, other: Filter) -> MultiFilter:
        """Combine this filter with another (either may match)."""
        return MultiFilter(op="or", filters=(self, other))

    def __str__(self) -> str:
        """Render the filter as literal SQL — handy for debugging/logging."""
        return str(self.create().compile(compile_kwargs={"literal_binds": True}))

__and__

__and__(other: Filter) -> MultiFilter

Combine this filter with another.

Source code in src/artigraph/core/api/filter.py
def __and__(self, other: Filter) -> MultiFilter:
    """Combine this filter with another (both must match)."""
    return MultiFilter(op="and", filters=(self, other))

__or__

__or__(other: Filter) -> MultiFilter

Combine this filter with another.

Source code in src/artigraph/core/api/filter.py
def __or__(self, other: Filter) -> MultiFilter:
    """Combine this filter with another (either may match)."""
    return MultiFilter(op="or", filters=(self, other))

compose

compose(expr: Expression) -> Expression

Apply the where clause to the given query.

Source code in src/artigraph/core/api/filter.py
def compose(self, expr: Expression, /) -> Expression:
    """Apply the where clause to the given query.

    Subclasses combine their conditions onto ``expr`` and return the result.
    """
    raise NotImplementedError()

create

create() -> Expression

Return the condition represented by this filter.

Source code in src/artigraph/core/api/filter.py
def create(self) -> Expression:
    """Return the condition represented by this filter."""
    # Seed composition with a no-op expression.
    return self.compose(_NO_OP)

GraphModel

Bases: GraphObject[OrmModelArtifact, OrmBase, NodeFilter[Any]]

A base for all modeled artifacts.

Source code in src/artigraph/core/model/base.py
class GraphModel(GraphObject[OrmModelArtifact, OrmBase, NodeFilter[Any]]):
    """A base for all modeled artifacts."""

    graph_id: UUID
    """The unique ID of this model."""

    graph_orm_type: ClassVar[type[OrmModelArtifact]] = OrmModelArtifact
    """The ORM type for this model."""

    graph_model_name: ClassVar[str]
    """The name of the artifact model."""

    graph_model_version: ClassVar[int] = 1
    """The version of the artifact model."""

    def graph_model_data(self) -> ModelData:
        """The data for the artifact model.

        Subclasses return a mapping of field label to ``(value, spec)`` pairs
        (see the iteration in ``graph_dump_related``).
        """
        raise NotImplementedError()

    @classmethod
    def graph_model_init(
        cls,
        info: ModelInfo,  # noqa: ARG003
        kwargs: dict[str, Any],
        /,
    ) -> Self:  # nocov
        """Initialize the artifact model, migrating it if necessary.

        The default implementation ignores ``info`` and constructs the model
        directly from ``kwargs``; subclasses may override to migrate records
        written by older model versions.
        """
        return cls(**kwargs)

    def __init_subclass__(cls, version: int, **kwargs: Any):
        """Record the subclass version and register it by model name."""
        cls.graph_model_version = version

        if "graph_model_name" not in cls.__dict__:
            # Default the registered model name to the class's own name.
            cls.graph_model_name = cls.__name__

        if not ALLOW_MODEL_TYPE_OVERWRITES.get() and cls.graph_model_name in MODEL_TYPE_BY_NAME:
            msg = (
                f"Artifact model {cls.graph_model_name!r} already exists "
                f"as {MODEL_TYPE_BY_NAME[cls.graph_model_name]}"
            )
            raise RuntimeError(msg)
        else:
            MODEL_TYPE_BY_NAME[cls.graph_model_name] = cls

        super().__init_subclass__(**kwargs)

    def graph_filter_self(self) -> NodeFilter[Any]:
        """Filter matching this model's own node record."""
        return NodeFilter(id=self.graph_id)

    @classmethod
    def graph_filter_related(cls, where: NodeFilter[Any]) -> dict[type[OrmBase], Filter]:
        """Filters selecting the artifacts and links beneath matching models."""
        return {
            OrmArtifact: NodeFilter(descendant_of=where),
            OrmLink: LinkFilter(ancestor=where),
        }

    async def graph_dump_self(self) -> OrmModelArtifact:
        """Dump this model's metadata into its own ORM artifact record."""
        metadata_dict = ModelMetadata(artigraph_version=artigraph_version)
        return self._graph_make_own_metadata_artifact(metadata_dict)

    async def graph_dump_related(self) -> Sequence[OrmBase]:
        """Dump each model field as an artifact linked to this model."""
        dump_related: TaskBatch[Sequence[OrmBase]] = TaskBatch()
        for label, (value, spec) in self.graph_model_data().items():
            maybe_model = _try_convert_value_to_modeled_type(value)
            if spec.is_empty() and isinstance(maybe_model, GraphObject):
                # Graph objects with no extra spec are dumped as-is.
                dump_related.add(_dump_and_link, maybe_model, self.graph_id, label)
            else:
                art = spec.create_artifact(value)
                dump_related.add(_dump_and_link, art, self.graph_id, label)
        return [r for rs in await dump_related.gather() for r in rs]

    @classmethod
    async def graph_load(
        cls,
        self_records: Sequence[OrmModelArtifact],
        related_records: dict[type[OrmBase], Sequence[OrmBase]],
    ) -> Sequence[Self]:
        """Load model records into instances, keeping only subclasses of ``cls``."""
        arts_dict_by_p_id = _get_labeled_artifacts_by_source_id(self_records, related_records)
        return [
            (
                await model_type._graph_load_from_labeled_artifacts_by_source_id(
                    art, arts_dict_by_p_id
                )
            )
            for art in self_records
            if issubclass(model_type := get_model_type_by_name(art.model_artifact_type_name), cls)
        ]

    @classmethod
    async def _graph_load_from_labeled_artifacts_by_source_id(
        cls,
        model_metadata_artifact: OrmModelArtifact,
        labeled_artifacts_by_source_id: LabeledArtifactsByParentId,
    ) -> Self:
        """Reconstruct one model instance from its metadata record and children."""
        return cls.graph_model_init(
            ModelInfo(
                graph_id=model_metadata_artifact.id,
                version=model_metadata_artifact.model_artifact_version,
                metadata=await load_deserialized_artifact_value(model_metadata_artifact),
            ),
            await cls._graph_load_kwargs_from_labeled_artifacts_by_source_id(
                model_metadata_artifact,
                labeled_artifacts_by_source_id,
            ),
        )

    @classmethod
    async def _graph_load_kwargs_from_labeled_artifacts_by_source_id(
        cls,
        model_metadata_artifact: OrmModelArtifact,
        labeled_artifacts_by_source_id: LabeledArtifactsByParentId,
    ) -> dict[str, Any]:
        """Load this model's labeled child artifacts into constructor kwargs."""
        labeled_children = labeled_artifacts_by_source_id[model_metadata_artifact.id]

        load_field_values: TaskBatch[Any] = TaskBatch()
        for child in labeled_children.values():
            if isinstance(child, OrmModelArtifact):
                # Nested models recurse through their own registered type.
                child_cls = get_model_type_by_name(child.model_artifact_type_name)
                load_field_values.add(
                    child_cls._graph_load_from_labeled_artifacts_by_source_id,
                    child,
                    labeled_artifacts_by_source_id,
                )
            else:
                load_field_values.add(
                    load_deserialized_artifact_value,
                    child,
                )

        return dict(zip(labeled_children.keys(), await load_field_values.gather()))

    def _graph_make_own_metadata_artifact(self, metadata: ModelMetadata) -> OrmModelArtifact:
        """Build the metadata artifact record representing this model node."""
        return OrmModelArtifact(
            id=self.graph_id,
            artifact_serializer=json_sorted_serializer.name,
            database_artifact_data=json_sorted_serializer.serialize(metadata),
            model_artifact_type_name=self.graph_model_name,
            model_artifact_version=self.graph_model_version,
        )

graph_id instance-attribute

graph_id: UUID

The unique ID of this model.

graph_model_name class-attribute

graph_model_name: str

The name of the artifact model.

graph_model_version class-attribute

graph_model_version: int = 1

The version of the artifact model.

graph_orm_type class-attribute

graph_orm_type: type[OrmModelArtifact] = OrmModelArtifact

The ORM type for this model.

graph_model_data

graph_model_data() -> ModelData

The data for the artifact model.

Source code in src/artigraph/core/model/base.py
def graph_model_data(self) -> ModelData:
    """The data for the artifact model.

    Subclasses return a mapping of field label to ``(value, spec)`` pairs.
    """
    raise NotImplementedError()

graph_model_init classmethod

graph_model_init(
    info: ModelInfo, kwargs: dict[str, Any]
) -> Self

Initialize the artifact model, migrating it if necessary.

Source code in src/artigraph/core/model/base.py
@classmethod
def graph_model_init(
    cls,
    info: ModelInfo,  # noqa: ARG003
    kwargs: dict[str, Any],
    /,
) -> Self:  # nocov
    """Initialize the artifact model, migrating it if necessary.

    The default implementation ignores ``info`` and constructs the model
    directly from ``kwargs``; subclasses may override to migrate records
    written by older model versions.
    """
    return cls(**kwargs)

GraphObject

Bases: ABC, Generic[S, R, F]

Base for objects that can be converted to and from Artigraph ORM records.

Source code in src/artigraph/core/api/base.py
class GraphObject(abc.ABC, Generic[S, R, F]):
    """Base for objects that can be converted to and from Artigraph ORM records.

    Type parameters (as used by the methods below): ``S`` is the ORM record
    type for the object itself, ``R`` the type of its related records, and
    ``F`` the filter type that selects the object's own records.
    """

    graph_id: UUID
    """The ID of the object."""

    graph_orm_type: ClassVar[type[OrmBase]]
    """The ORM type that represents this object."""

    @abc.abstractmethod
    def graph_filter_self(self) -> F:
        """Get the filter for records of the ORM type that represent the object."""
        raise NotImplementedError()

    @abc.abstractmethod
    async def graph_dump_self(self) -> S:
        """Dump the object into an ORM record."""
        raise NotImplementedError()

    @abc.abstractmethod
    async def graph_dump_related(self) -> Sequence[R]:
        """Dump all other related objects into ORM records."""
        raise NotImplementedError()

    @classmethod
    @abc.abstractmethod
    def graph_filter_related(cls, self_filter: Filter, /) -> Mapping[type[R], Filter]:
        """Get the filters for records of related ORM records required to construct this object.

        Returns a mapping of related ORM type to the filter selecting its rows.
        """
        raise NotImplementedError()

    @classmethod
    @abc.abstractmethod
    async def graph_load(
        cls,
        self_records: Sequence[S],
        related_records: dict[type[R], Sequence[R]],
        /,
    ) -> Sequence[Self]:
        """Load ORM records into objects."""
        raise NotImplementedError()

graph_id instance-attribute

graph_id: UUID

The ID of the object.

graph_orm_type class-attribute

graph_orm_type: type[OrmBase]

The ORM type that represents this object.

graph_dump_related abstractmethod async

graph_dump_related() -> Sequence[R]

Dump all other related objects into ORM records.

Source code in src/artigraph/core/api/base.py
@abc.abstractmethod
async def graph_dump_related(self) -> Sequence[R]:
    """Dump all other related objects into ORM records for persistence."""
    raise NotImplementedError()

graph_dump_self abstractmethod async

graph_dump_self() -> S

Dump the object into an ORM record.

Source code in src/artigraph/core/api/base.py
@abc.abstractmethod
async def graph_dump_self(self) -> S:
    """Dump this object itself into a single ORM record."""
    raise NotImplementedError()
graph_filter_related abstractmethod classmethod

graph_filter_related(
    self_filter: Filter,
) -> Mapping[type[R], Filter]

Get the filters for records of related ORM records required to construct this object.

Source code in src/artigraph/core/api/base.py
@classmethod
@abc.abstractmethod
def graph_filter_related(cls, self_filter: Filter, /) -> Mapping[type[R], Filter]:
    """Get the filters for records of related ORM records required to construct this object.

    Returns a mapping of related ORM type to the filter selecting its rows.
    """
    raise NotImplementedError()

graph_filter_self abstractmethod

graph_filter_self() -> F

Get the filter for records of the ORM type that represent the object.

Source code in src/artigraph/core/api/base.py
@abc.abstractmethod
def graph_filter_self(self) -> F:
    """Get the filter selecting the ORM records that represent this object."""
    raise NotImplementedError()

graph_load abstractmethod async classmethod

graph_load(
    self_records: Sequence[S],
    related_records: dict[type[R], Sequence[R]],
) -> Sequence[Self]

Load ORM records into objects.

Source code in src/artigraph/core/api/base.py
@classmethod
@abc.abstractmethod
async def graph_load(
    cls,
    self_records: Sequence[S],
    related_records: dict[type[R], Sequence[R]],
    /,
) -> Sequence[Self]:
    """Load ORM records into objects.

    ``related_records`` maps each related ORM type to the rows selected by
    the corresponding ``graph_filter_related`` filter.
    """
    raise NotImplementedError()

JsonSerializer

Bases: Serializer[Any]

A serializer for JSON.

Source code in src/artigraph/core/serializer/json.py
class JsonSerializer(Serializer[Any]):
    """A serializer for JSON.

    Parameters:
        sort_keys: If True, serialize mappings with their keys sorted so the
            byte output is deterministic regardless of insertion order.
    """

    # Accepts any value; json.dumps decides suitability at runtime.
    types = (object,)

    def __init__(self, *, sort_keys: bool = False) -> None:
        # BUG FIX: sort_keys previously influenced only the serializer's
        # name — the output was never actually sorted. Store the flag and
        # honor it in serialize() so the "sorted" serializer is deterministic.
        self.sort_keys = sort_keys
        self.name = f"artigraph-json-{'sorted' if sort_keys else 'unsorted'}"

    def serialize(self, value: Any) -> bytes:
        """Serialize a value to compact, UTF-8 encoded JSON."""
        return json.dumps(
            value,
            separators=(",", ":"),
            allow_nan=False,
            sort_keys=self.sort_keys,
        ).encode("utf-8")

    def deserialize(self, value: bytes) -> Any:
        """Deserialize UTF-8 encoded JSON bytes back into a value."""
        return json.loads(value.decode("utf-8"))

deserialize

deserialize(value: bytes) -> Any

Deserialize a value.

Source code in src/artigraph/core/serializer/json.py
def deserialize(self, value: bytes) -> Any:
    """Parse UTF-8 encoded JSON bytes back into a Python value."""
    text = value.decode("utf-8")
    return json.loads(text)

serialize

serialize(value: Any) -> bytes

Serialize a value.

Source code in src/artigraph/core/serializer/json.py
def serialize(self, value: Any) -> bytes:
    """Render a value as compact JSON encoded to UTF-8 bytes."""
    text = json.dumps(value, separators=(",", ":"), allow_nan=False)
    return text.encode("utf-8")

Link

Bases: FrozenDataclass, GraphObject[L, OrmLink, LinkFilter]

A wrapper around an ORM node link record.

Source code in src/artigraph/core/api/link.py
class Link(FrozenDataclass, GraphObject[L, OrmLink, LinkFilter]):
    """A wrapper around an ORM node link record."""

    graph_orm_type: ClassVar[type[OrmLink]] = OrmLink
    """The ORM type for this node."""

    source_id: UUID
    """The ID of the parent node."""
    target_id: UUID
    """The ID of the child node."""
    label: str | None = None
    """A label for the link."""
    graph_id: UUID = field(default_factory=uuid1)
    """The unique ID of this link"""

    def graph_filter_self(self) -> LinkFilter:
        """Filter matching exactly this link record by ID."""
        return LinkFilter(id=self.graph_id)

    @classmethod
    def graph_filter_related(cls, _: LinkFilter) -> dict:
        """Links have no related records to load."""
        return {}

    async def graph_dump_self(self) -> OrmLink:
        """Dump this link into an ORM record."""
        return OrmLink(
            id=self.graph_id,
            target_id=self.target_id,
            source_id=self.source_id,
            label=self.label,
        )

    async def graph_dump_related(self) -> Sequence[Any]:
        """Links have no related records to dump."""
        return []

    @classmethod
    async def graph_load(cls, self_records: Sequence[OrmLink], _: dict) -> Sequence[Self]:
        """Rehydrate ``Link`` wrappers from ORM link records."""
        return [
            cls(
                graph_id=r.id,
                target_id=r.target_id,
                source_id=r.source_id,
                label=r.label,
            )
            for r in self_records
        ]

graph_id class-attribute instance-attribute

graph_id: UUID = field(default_factory=uuid1)

The unique ID of this link

graph_orm_type class-attribute

graph_orm_type: type[OrmLink] = OrmLink

The ORM type for this node.

label class-attribute instance-attribute

label: str | None = None

A label for the link.

source_id instance-attribute

source_id: UUID

The ID of the parent node.

target_id instance-attribute

target_id: UUID

The ID of the child node.

LinkFilter

Bases: Filter

Filter node links.

Source code in src/artigraph/core/api/filter.py
class LinkFilter(Filter):
    """Filter node links."""

    id: ValueFilter[UUID] | UUID | None = None
    """Links must have this ID or meet this condition."""
    parent: NodeFilter | Sequence[UUID] | UUID | None = None
    """Links must have one of these nodes as their parent."""
    child: NodeFilter | Sequence[UUID] | UUID | None = None
    """Links must have one of these nodes as their child."""
    descendant: NodeFilter | Sequence[UUID] | UUID | None = None
    """Links must have one of these nodes as their descendant."""
    ancestor: NodeFilter | Sequence[UUID] | UUID | None = None
    """Links must have one of these nodes as their ancestor."""
    label: ValueFilter[str] | Sequence[str] | str | None = None

    def compose(self, expr: Expression) -> Expression:
        """AND each configured condition onto ``expr``.

        The ancestor/descendant conditions expand into recursive CTEs that
        walk the link table up or down from the given node IDs.
        """
        # Normalize the user-facing field values into filters/ID selectors.
        link_id = to_value_filter(self.id)
        source_id = to_node_id_selector(self.parent)
        target_id = to_node_id_selector(self.child)
        descendant_id = to_node_id_selector(self.descendant)
        ancestor_id = to_node_id_selector(self.ancestor)
        label = to_value_filter(self.label)

        if link_id is not None:
            expr &= link_id.against(OrmLink.id).create()

        if source_id is not None:
            expr &= OrmLink.source_id.in_(source_id)

        if target_id is not None:
            expr &= OrmLink.target_id.in_(target_id)

        if ancestor_id is not None:
            # Create a CTE to get the descendants recursively
            descendant_node_cte = (
                select(OrmLink.source_id.label("descendant_id"), OrmLink.target_id)
                .where(OrmLink.source_id.in_(ancestor_id))
                .cte(name="descendants", recursive=True)
            )

            # Recursive case: select the children of the current nodes
            child_node = aliased(OrmLink)
            descendant_node_cte = descendant_node_cte.union_all(
                select(child_node.source_id, child_node.target_id).where(
                    child_node.source_id == descendant_node_cte.c.target_id
                )
            )

            # Join the CTE with the actual Node table to get the descendants
            expr &= OrmLink.source_id.in_(
                select(descendant_node_cte.c.descendant_id).where(
                    descendant_node_cte.c.descendant_id.isnot(None)
                )
            )

        if descendant_id is not None:
            # Create a CTE to get the ancestors recursively
            ancestor_node_cte = (
                select(OrmLink.target_id.label("ancestor_id"), OrmLink.source_id)
                .where(OrmLink.target_id.in_(descendant_id))
                .cte(name="ancestors", recursive=True)
            )

            # Recursive case: select the parents of the current nodes
            parent_node = aliased(OrmLink)
            ancestor_node_cte = ancestor_node_cte.union_all(
                select(parent_node.target_id, parent_node.source_id).where(
                    parent_node.target_id == ancestor_node_cte.c.source_id
                )
            )

            # Join the CTE with the actual Node table to get the ancestors
            expr &= OrmLink.target_id.in_(
                select(ancestor_node_cte.c.ancestor_id).where(
                    ancestor_node_cte.c.ancestor_id.isnot(None)
                )
            )

        if label is not None:
            expr &= label.against(OrmLink.label).create()

        return expr

ancestor class-attribute instance-attribute

ancestor: NodeFilter | Sequence[UUID] | UUID | None = None

Links must have one of these nodes as their ancestor.

child class-attribute instance-attribute

child: NodeFilter | Sequence[UUID] | UUID | None = None

Links must have one of these nodes as their child.

descendant class-attribute instance-attribute

descendant: NodeFilter | Sequence[UUID] | UUID | None = None

Links must have one of these nodes as their descendant.

id class-attribute instance-attribute

id: ValueFilter[UUID] | UUID | None = None

Links must have this ID or meet this condition.

parent class-attribute instance-attribute

parent: NodeFilter | Sequence[UUID] | UUID | None = None

Links must have one of these nodes as their parent.

Linker

Bases: AnySyncContextManager['Linker']

A context manager for linking graph objects together

Source code in src/artigraph/core/linker.py
class Linker(AnySyncContextManager["Linker"]):
    """A context manager for linking graph objects together"""

    def __init__(self, node: GraphObject, label: str | None = None) -> None:
        self.node = node
        self.label = label
        # Labels already used for links created by this linker.
        self._labels: set[str] = set()
        self._write_on_enter: list[GraphObject] = [self.node]
        self._write_on_exit: list[GraphObject] = []
        # Assigned in _enter(); initialized here so the attribute always exists.
        self.parent: Linker | None = None

    def link(
        self,
        value: Any,
        label: str | None = None,
        storage: Storage | None = None,
        serializer: Serializer | None = None,
    ) -> None:
        """Link a graph object to the current node

        Raises:
            ValueError: If ``label`` was already used by this linker, or if
                ``storage``/``serializer`` are given alongside a ``GraphObject``.
        """
        if label is not None and label in self._labels:
            msg = f"Label {label} already exists for {self.node}"
            raise ValueError(msg)

        if isinstance(value, GraphObject):
            graph_obj = value
            if storage is not None or serializer is not None:
                msg = "Cannot specify storage or serializer when linking a GraphObject"
                raise ValueError(msg)
        else:
            graph_obj = Artifact(value=value, storage=storage, serializer=serializer)

        # BUG FIX: reserve the label only after all validation has passed so a
        # failed call does not permanently consume it.
        if label is not None:
            self._labels.add(label)

        self._write_on_exit.extend(
            [
                graph_obj,
                Link(
                    source_id=self.node.graph_id,
                    target_id=graph_obj.graph_id,
                    label=label,
                ),
            ]
        )

    async def _aenter(self) -> Self:
        # Persist this linker's node as soon as the context is entered.
        await write_many.a(self._write_on_enter)
        return self

    async def _aexit(self, *_: Any) -> None:
        # Persist everything linked during the context on exit.
        await write_many.a(self._write_on_exit)

    def _enter(self) -> None:
        self.parent = _CURRENT_LINKER.get()
        if self.parent is not None:
            # Nested linkers automatically link their node under the parent's.
            self._write_on_enter.append(
                Link(
                    source_id=self.parent.node.graph_id,
                    target_id=self.node.graph_id,
                    label=self.label,
                )
            )
        self._reset_parent = _CURRENT_LINKER.set(self)

    def _exit(self) -> None:
        _CURRENT_LINKER.reset(self._reset_parent)
link

link(
    value: Any,
    label: str | None = None,
    storage: Storage | None = None,
    serializer: Serializer | None = None,
) -> None

Link a graph object to the current node

Source code in src/artigraph/core/linker.py
def link(
    self,
    value: Any,
    label: str | None = None,
    storage: Storage | None = None,
    serializer: Serializer | None = None,
) -> None:
    """Link a graph object to the current node

    Raises:
        ValueError: If ``label`` was already used by this linker, or if
            ``storage``/``serializer`` are given alongside a ``GraphObject``.
    """
    if label is not None and label in self._labels:
        msg = f"Label {label} already exists for {self.node}"
        raise ValueError(msg)

    if isinstance(value, GraphObject):
        graph_obj = value
        if storage is not None or serializer is not None:
            msg = "Cannot specify storage or serializer when linking a GraphObject"
            raise ValueError(msg)
    else:
        graph_obj = Artifact(value=value, storage=storage, serializer=serializer)

    # BUG FIX: reserve the label only after all validation has passed so a
    # failed call does not permanently consume it.
    if label is not None:
        self._labels.add(label)

    self._write_on_exit.extend(
        [
            graph_obj,
            Link(
                source_id=self.node.graph_id,
                target_id=graph_obj.graph_id,
                label=label,
            ),
        ]
    )

ModelFilter

Bases: ArtifactFilter[OrmModelArtifact], Generic[M]

A filter for models.

Source code in src/artigraph/core/model/filter.py
class ModelFilter(ArtifactFilter[OrmModelArtifact], Generic[M]):
    """A filter for models."""

    node_type: NodeTypeFilter[OrmModelArtifact] = NodeTypeFilter(type=[OrmModelArtifact])
    """Models must be one of these types."""
    model_type: Sequence[ModelTypeFilter[M]] | ModelTypeFilter[M] | type[M] | None = None
    """Models must be one of these types."""

    def compose(self, expr: Expression) -> Expression:
        """Extend the base artifact clause with model-type conditions."""
        expr = super().compose(expr)

        model_type = to_sequence_or_none(self.model_type)

        if model_type:
            # Any one of the requested model types may match (OR semantics).
            expr &= MultiFilter(
                op="or",
                filters=[_to_model_type_filter(mt) for mt in model_type],
            ).create()

        return expr

model_type class-attribute instance-attribute

model_type: Sequence[ModelTypeFilter[M]] | ModelTypeFilter[
    M
] | type[M] | None = None

Models must be one of these types.

node_type class-attribute instance-attribute

node_type: NodeTypeFilter[
    OrmModelArtifact
] = NodeTypeFilter(type=[OrmModelArtifact])

Models must be one of these types.

ModelInfo dataclass

The info for an artifact model.

Source code in src/artigraph/core/model/base.py
@dataclass(frozen=True)
class ModelInfo:
    """The info for an artifact model."""

    graph_id: UUID
    """The unique ID of the artifact model."""
    version: int
    """The version of the artifact model."""
    metadata: ModelMetadata
    """The metadata for the artifact model"""

graph_id instance-attribute

graph_id: UUID

The unique ID of the artifact model.

metadata instance-attribute

metadata: ModelMetadata

The metadata for the artifact model

version instance-attribute

version: int

The version of the artifact model.

ModelMetadata

Bases: TypedDict

The metadata for an artifact model.

Source code in src/artigraph/core/model/base.py
class ModelMetadata(TypedDict):
    """The metadata for an artifact model."""

    artigraph_version: str
    """The version of Artigraph used to generate the model"""

artigraph_version instance-attribute

artigraph_version: str

The version of Artigraph used to generate the model

ModelTypeFilter

Bases: Generic[M], Filter

Filter models by their type and version

Source code in src/artigraph/core/model/filter.py
class ModelTypeFilter(Generic[M], Filter):
    """Filter models by their type and, optionally, their version."""

    type: type[M]
    """Models must be this type."""
    version: ValueFilter | int | None = None
    """Models must be this version."""
    subclasses: bool = True
    """If True, include subclasses of the given model type."""

    def compose(self, expr: Expression) -> Expression:
        # Normalize the raw version value into a ValueFilter (or None/falsy).
        version_filter = to_value_filter(self.version)

        # Constrain the model type name, widened to subclasses when requested.
        if self.subclasses:
            type_names = [sub.graph_model_name for sub in get_subclasses(self.type)]
            expr &= OrmModelArtifact.model_artifact_type_name.in_(type_names)
        else:
            expr &= OrmModelArtifact.model_artifact_type_name == self.type.graph_model_name

        # Constrain the model version only when one was given.
        if version_filter:
            expr &= version_filter.against(OrmModelArtifact.model_artifact_version).create()

        return expr

subclasses class-attribute instance-attribute

subclasses: bool = True

If True, include subclasses of the given model type.

type instance-attribute

type: type[M]

Models must be this type.

version class-attribute instance-attribute

version: ValueFilter | int | None = None

Models must be this version.

Node

Bases: FrozenDataclass, GraphObject[N, OrmLink, NodeFilter[Any]]

A wrapper around an ORM node record.

Source code in src/artigraph/core/api/node.py
class Node(FrozenDataclass, GraphObject[N, OrmLink, NodeFilter[Any]]):
    """A wrapper around an ORM node record."""

    graph_orm_type: ClassVar[type[OrmNode]] = OrmNode
    """The ORM type for this node."""

    graph_id: UUID = field(default_factory=uuid1)
    """The unique ID of this node"""

    def graph_filter_self(self) -> NodeFilter[Any]:
        """Return a filter matching exactly this node (by ID)."""
        return NodeFilter(id=self.graph_id)

    async def graph_dump_self(self) -> OrmNode:
        """Create the ORM record for this node."""
        return OrmNode(id=self.graph_id)

    async def graph_dump_related(self) -> Sequence[Any]:
        """Base nodes have no related records to persist."""
        return []

    @classmethod
    def graph_filter_related(cls, where: NodeFilter[Any]) -> dict[type[OrmLink], Filter]:
        """Select links in which the matched nodes appear as either endpoint."""
        return {OrmLink: LinkFilter(parent=where) | LinkFilter(child=where)}

    @classmethod
    async def graph_load(
        cls,
        self_records: Sequence[N],
        related_records: dict[type[OrmLink], Sequence[OrmLink]],  # noqa: ARG003
    ) -> Sequence[Self]:
        """Construct one wrapper per ORM record, merging in subclass-specific kwargs."""
        return [
            cls(
                graph_id=r.id,
                **(await cls._graph_load_extra_kwargs(r)),
            )
            for r in self_records
        ]

    @classmethod
    async def _graph_load_extra_kwargs(
        cls,
        self_record: N,  # noqa: ARG003
    ) -> dict[str, Any]:
        """Hook for subclasses to supply extra constructor kwargs during graph_load."""
        return {}

graph_id class-attribute instance-attribute

graph_id: UUID = field(default_factory=uuid1)

The unique ID of this node

graph_orm_type class-attribute

graph_orm_type: type[OrmNode] = OrmNode

The ORM type for this node.

NodeFilter

Bases: Filter, Generic[N]

Filter nodes that meet the given conditions

Source code in src/artigraph/core/api/filter.py
class NodeFilter(Filter, Generic[N]):
    """Filter nodes that meet the given conditions"""

    id: ValueFilter[UUID] | Sequence[UUID] | UUID | None = None
    """Nodes must have this ID or meet this condition."""
    node_type: NodeTypeFilter[N] | type[N] | None = None
    """Nodes must be one of these types."""
    created_at: ValueFilter[datetime] | datetime | None = None
    """Filter nodes by their creation time."""
    updated_at: ValueFilter[datetime] | datetime | None = None
    """Filter nodes by their last update time."""
    parent_of: NodeFilter | Sequence[UUID] | UUID | None = None
    """Nodes must be the parent of one of these nodes."""
    child_of: NodeFilter | Sequence[UUID] | UUID | None = None
    """Nodes must be the child of one of these nodes."""
    descendant_of: NodeFilter | Sequence[UUID] | UUID | None = None
    """Nodes must be the descendant of one of these nodes."""
    ancestor_of: NodeFilter | Sequence[UUID] | UUID | None = None
    """Nodes must be the ancestor of one of these nodes."""
    label: ValueFilter[str] | Sequence[str] | str | None = None
    """Nodes must have a link with one of these labels."""

    def compose(self, expr: Expression) -> Expression:
        """AND each configured condition onto ``expr``."""
        # Normalize raw values / sequences into ValueFilter instances.
        node_id = to_value_filter(self.id)
        created_at = to_value_filter(self.created_at)
        updated_at = to_value_filter(self.updated_at)

        # NOTE(review): `id` uses an explicit None check while the timestamp
        # filters below rely on truthiness - confirm the asymmetry is intended.
        if node_id is not None:
            expr &= node_id.against(OrmNode.id).create()

        if self.node_type is not None:
            # Accept either a prebuilt NodeTypeFilter or a bare node class.
            expr &= (
                self.node_type
                if isinstance(self.node_type, NodeTypeFilter)
                else NodeTypeFilter(type=[self.node_type])
            ).create()

        if created_at:
            expr &= created_at.against(OrmNode.created_at).create()

        if updated_at:
            expr &= updated_at.against(OrmNode.updated_at).create()

        # Parents/ancestors appear as the *source* of a matching link.
        if self.parent_of or self.ancestor_of:
            expr &= OrmNode.id.in_(
                select(OrmLink.source_id).where(
                    LinkFilter(child=self.parent_of, descendant=self.ancestor_of).create()
                )
            )

        # Children/descendants appear as the *target* of a matching link.
        if self.child_of or self.descendant_of:
            expr &= OrmNode.id.in_(
                select(OrmLink.target_id).where(
                    LinkFilter(parent=self.child_of, ancestor=self.descendant_of).create()
                )
            )

        # Labeled nodes are the *target* of a link carrying one of the labels.
        if self.label:
            expr &= OrmNode.id.in_(
                select(OrmLink.target_id).where(LinkFilter(label=self.label).create())
            )

        return expr

ancestor_of class-attribute instance-attribute

ancestor_of: NodeFilter | Sequence[
    UUID
] | UUID | None = None

Nodes must be the ancestor of one of these nodes.

child_of class-attribute instance-attribute

child_of: NodeFilter | Sequence[UUID] | UUID | None = None

Nodes must be the child of one of these nodes.

created_at class-attribute instance-attribute

created_at: ValueFilter[datetime] | datetime | None = None

Filter nodes by their creation time.

descendant_of class-attribute instance-attribute

descendant_of: NodeFilter | Sequence[
    UUID
] | UUID | None = None

Nodes must be the descendant of one of these nodes.

id class-attribute instance-attribute

id: ValueFilter[UUID] | Sequence[UUID] | UUID | None = None

Nodes must have this ID or meet this condition.

label class-attribute instance-attribute

label: ValueFilter[str] | Sequence[str] | str | None = None

Nodes must have a link with one of these labels.

node_type class-attribute instance-attribute

node_type: NodeTypeFilter[N] | type[N] | None = None

Nodes must be one of these types.

parent_of class-attribute instance-attribute

parent_of: NodeFilter | Sequence[UUID] | UUID | None = None

Nodes must be the parent of one of these nodes.

updated_at class-attribute instance-attribute

updated_at: ValueFilter[datetime] | datetime | None = None

Filter nodes by their last update time.

NodeTypeFilter

Bases: Filter, Generic[N]

Filter nodes by their type.

Source code in src/artigraph/core/api/filter.py
class NodeTypeFilter(Filter, Generic[N]):
    """Filter nodes by their polymorphic type."""

    subclasses: bool = True
    """Consider subclasses of the given types when filtering."""
    type: Sequence[type[N]] | type[N] | None = None
    """Nodes must be one of these types."""
    not_type: Sequence[type[N]] | type[N] | None = None
    """Nodes must not be one of these types."""

    def compose(self, expr: Expression) -> Expression:
        # Normalize single classes (or None) into sequences.
        include = to_sequence_or_none(self.type)
        exclude = to_sequence_or_none(self.not_type)

        if include is not None:
            # Keep nodes whose polymorphic identity is in the allowed set.
            expr &= OrmNode.node_type.in_(
                get_polymorphic_identities(include, subclasses=self.subclasses)
            )

        if exclude is not None:
            # Drop nodes whose polymorphic identity is in the excluded set.
            expr &= OrmNode.node_type.notin_(
                get_polymorphic_identities(exclude, subclasses=self.subclasses)
            )

        return expr

not_type class-attribute instance-attribute

not_type: Sequence[type[N]] | type[N] | None = None

Nodes must not be one of these types.

subclasses class-attribute instance-attribute

subclasses: bool = True

Consider subclasses of the given types when filtering.

type class-attribute instance-attribute

type: Sequence[type[N]] | type[N] | None = None

Nodes must be one of these types.

OrmArtifact

Bases: OrmNode

A base class for artifacts.

Source code in src/artigraph/core/orm/artifact.py
class OrmArtifact(OrmNode):
    """A base class for artifacts."""

    # NOTE(review): this constraint names columns "node_source_id" and
    # "artifact_label" which are not mapped on this class - confirm they
    # exist on the shared node table.
    __table_args__ = (UniqueConstraint("node_source_id", "artifact_label"),)
    # Abstract member of the polymorphic hierarchy - never loaded directly.
    __mapper_args__: ClassVar[dict[str, Any]] = {"polymorphic_abstract": True}

    artifact_serializer: Mapped[str | None] = mapped_column(index=True)
    """The name of the serializer used to serialize the artifact."""

artifact_serializer class-attribute instance-attribute

artifact_serializer: Mapped[str | None] = mapped_column(
    index=True
)

The name of the serializer used to serialize the artifact.

OrmBase

Bases: MappedAsDataclass, DeclarativeBase

A base class for all database models.

Source code in src/artigraph/core/orm/base.py
class OrmBase(MappedAsDataclass, DeclarativeBase):
    """A base class for all database models."""

    __mapper_args__: ClassVar[dict[str, Any]] = {}

    # All mapped fields declared after this marker are keyword-only in the
    # generated dataclass __init__.
    _: KW_ONLY

    def __init_subclass__(cls, **kwargs: Any) -> None:
        """Register the subclass's polymorphic identity and rank its table's FK depth."""
        super().__init_subclass__(**kwargs)

        tablename = getattr(cls, "__tablename__", None)
        if not tablename:  # nocov
            # Table-less subclasses need no registration or FK ranking.
            return

        if not cls.__mapper_args__.get("polymorphic_abstract"):
            poly_id = cls.__mapper_args__.get("polymorphic_identity")
            if poly_id is not None:
                # Guard against two classes claiming the same (table, identity) pair.
                t_and_p = (cls.__tablename__, poly_id)
                maybe_conflict_cls = _ORM_TYPE_BY_TABLE_AND_POLY_ID.setdefault(t_and_p, cls)
                if cls is not maybe_conflict_cls:  # nocov
                    msg = f"Polymorphic ID {poly_id} exists as {maybe_conflict_cls}"
                    raise ValueError(msg)
        # Rank this table in the FK dependency graph: one more than the highest
        # rank among tables it references (0 if it references none).
        rank = 0
        for c in inspect(cls).columns:
            for fk in c.foreign_keys:
                try:
                    col = fk.column
                except NoReferencedTableError as e:  # nocov
                    msg = "Artigraph does not support deferred foreign keys at this time."
                    raise RuntimeError(msg) from e
                if col.table.name != tablename:  # ignore self-referential FKs
                    other_rank = _FK_DEPENDENCY_RANK_BY_TABLE_NAME[fk.column.table.name] + 1
                    rank = max(rank, other_rank)
        _FK_DEPENDENCY_RANK_BY_TABLE_NAME[tablename] = rank

    created_at: Mapped[datetime] = mapped_column(
        nullable=False,
        default_factory=func.now,
        init=False,
    )
    """The time that this node link was created."""

    updated_at: Mapped[datetime] = mapped_column(
        nullable=False,
        default_factory=func.now,
        onupdate=func.now(),
        init=False,
    )
    """The time that this node link was last updated."""

created_at class-attribute instance-attribute

created_at: Mapped[datetime] = mapped_column(
    nullable=False, default_factory=func.now, init=False
)

The time that this node link was created.

updated_at class-attribute instance-attribute

updated_at: Mapped[datetime] = mapped_column(
    nullable=False,
    default_factory=func.now,
    onupdate=func.now(),
    init=False,
)

The time that this node link was last updated.

OrmDatabaseArtifact

Bases: OrmArtifact

An artifact saved directly in the database.

Source code in src/artigraph/core/orm/artifact.py
class OrmDatabaseArtifact(OrmArtifact):
    """An artifact saved directly in the database."""

    # Discriminator value stored in OrmNode.node_type for rows of this class.
    polymorphic_identity = "database_artifact"
    __mapper_args__: ClassVar[dict[str, Any]] = {"polymorphic_identity": polymorphic_identity}

    database_artifact_data: Mapped[bytes | None]
    """The data of the artifact."""

database_artifact_data instance-attribute

database_artifact_data: Mapped[bytes | None]

The data of the artifact.

OrmLink

Bases: OrmBase

A link between two nodes.

Source code in src/artigraph/core/orm/link.py
class OrmLink(OrmBase):
    """A link between two nodes."""

    __tablename__ = "artigraph_link"
    # At most one link per (source, target) pair, and link labels are
    # unique per source node.
    __table_args__ = (
        UniqueConstraint("source_id", "target_id"),
        UniqueConstraint("source_id", "label"),
    )

    id: Mapped[UUID] = mapped_column(primary_key=True)
    """The ID of the link."""
    target_id: Mapped[UUID] = mapped_column(ForeignKey(OrmNode.id), nullable=False, index=True)
    """The ID of the node to which this link points."""
    source_id: Mapped[UUID] = mapped_column(ForeignKey(OrmNode.id), nullable=False, index=True)
    """The ID of the node from which this link originates."""
    label: Mapped[str | None] = mapped_column(nullable=True, default=None, index=True)
    """A label for the link - labels must be unique for a given source node."""

id class-attribute instance-attribute

id: Mapped[UUID] = mapped_column(primary_key=True)

The ID of the link.

label class-attribute instance-attribute

label: Mapped[str | None] = mapped_column(
    nullable=True, default=None, index=True
)

A label for the link - labels must be unique for a given source node.

source_id class-attribute instance-attribute

source_id: Mapped[UUID] = mapped_column(
    ForeignKey(OrmNode.id), nullable=False, index=True
)

The ID of the node from which this link originates.

target_id class-attribute instance-attribute

target_id: Mapped[UUID] = mapped_column(
    ForeignKey(OrmNode.id), nullable=False, index=True
)

The ID of the node to which this link points.

OrmModelArtifact

Bases: OrmDatabaseArtifact

An artifact that is a model.

Source code in src/artigraph/core/orm/artifact.py
class OrmModelArtifact(OrmDatabaseArtifact):
    """An artifact that is a model."""

    # Discriminator value stored in OrmNode.node_type for rows of this class.
    polymorphic_identity = "model_artifact"
    __mapper_args__: ClassVar[dict[str, Any]] = {"polymorphic_identity": polymorphic_identity}

    # NOTE(review): annotated Mapped[str] but declared nullable=True -
    # confirm whether NULLs are actually expected in these columns.
    model_artifact_type_name: Mapped[str] = mapped_column(nullable=True, index=True)
    """The type of the model."""

    model_artifact_version: Mapped[int] = mapped_column(nullable=True, index=True)
    """The version of the model."""

model_artifact_type_name class-attribute instance-attribute

model_artifact_type_name: Mapped[str] = mapped_column(
    nullable=True, index=True
)

The type of the model.

model_artifact_version class-attribute instance-attribute

model_artifact_version: Mapped[int] = mapped_column(
    nullable=True, index=True
)

The version of the model.

OrmNode

Bases: OrmBase

A base class for describing a node in a graph.

Source code in src/artigraph/core/orm/node.py
class OrmNode(OrmBase, **_node_dataclass_kwargs):
    """A base class for describing a node in a graph."""

    def __init_subclass__(cls, **kwargs: Any) -> None:
        # Shuttle table args and validate the polymorphic identity before
        # the declarative machinery in OrmBase processes the subclass.
        cls._shuttle_table_args()
        cls._set_polymorphic_identity()
        super().__init_subclass__(**kwargs)

    @classmethod
    def is_abstract(cls) -> bool:
        """Returns True if the class is abstract - that is, it does not define its own polymorphic identity."""
        return "polymorphic_identity" not in cls.__dict__

    polymorphic_identity: ClassVar[str] = "node"
    """The type of the node - should be overridden by subclasses and passed to mapper args."""

    __tablename__ = "artigraph_node"
    __mapper_args__: ClassVar[dict[str, Any]] = {
        "polymorphic_identity": polymorphic_identity,
        "polymorphic_on": "node_type",
    }

    id: Mapped[UUID] = mapped_column(primary_key=True)
    """The unique ID of this node"""

    node_type: Mapped[str] = mapped_column(nullable=False, init=False, index=True)
    """The type of the node link."""

    @classmethod
    def _shuttle_table_args(cls: type[OrmNode]) -> None:
        """Transfer table args from non-table subclasses to the base which has a table.

        This method exists because __table_args__ cannot be define on subclasses without
        a __tablename__. Since we're using single table inheritance this effectively means
        subclasses cannot specify __table_args__. To work around this, we transfer the
        any __table_args__ defined on a subclass to the first base that has a __tablename__
        (which is Node) before SQLAlchemy complains.
        """
        if "__table_args__" in cls.__dict__:
            table_args = cls.__table_args__
            for parent_cls in cls.mro():  # nocov (this)
                if hasattr(parent_cls, "__tablename__"):
                    # NOTE(review): cls itself inherits __tablename__, so this
                    # branch fires with parent_cls == cls on the first
                    # iteration; the augmented assignment targets the subclass,
                    # which is then deleted below - confirm the args actually
                    # reach the base table as the docstring describes.
                    cls.__table_args__ += table_args
                    break
            del cls.__table_args__

    @classmethod
    def _set_polymorphic_identity(cls: type[OrmNode]) -> None:
        """Sets a polymorphic identity attribute on the class for easier use."""
        poly_id: str
        # Find the nearest __mapper_args__ in the MRO that declares an identity.
        for c in cls.mro():
            mapper_args = getattr(c, "__mapper_args__", {})
            if "polymorphic_identity" in mapper_args:
                poly_id = mapper_args["polymorphic_identity"]
                break
        else:  # nocov
            msg = "No polymorphic_identity found in mro"
            raise TypeError(msg)
        # The convenience class attribute must agree with the mapper config.
        if poly_id != cls.polymorphic_identity:
            msg = (
                f"polymorphic_identity class attribute {cls.polymorphic_identity!r} "
                f"does not match value from __mapper_args__ {poly_id!r}"
            )
            raise ValueError(msg)

id class-attribute instance-attribute

id: Mapped[UUID] = mapped_column(primary_key=True)

The unique ID of this node

node_type class-attribute instance-attribute

node_type: Mapped[str] = mapped_column(
    nullable=False, init=False, index=True
)

The type of the node link.

polymorphic_identity class-attribute

polymorphic_identity: str = 'node'

The type of the node - should be overridden by subclasses and passed to mapper args.

is_abstract classmethod

is_abstract() -> bool

Returns True if the class is abstract — that is, it does not define its own polymorphic identity.

Source code in src/artigraph/core/orm/node.py
@classmethod
def is_abstract(cls) -> bool:
    """Returns True if the class is abstract - that is, it does not define its own polymorphic identity."""
    return "polymorphic_identity" not in cls.__dict__

OrmRemoteArtifact

Bases: OrmArtifact

An artifact saved via a storage backend.

Source code in src/artigraph/core/orm/artifact.py
class OrmRemoteArtifact(OrmArtifact):
    """An artifact saved via a storage backend."""

    # Discriminator value stored in OrmNode.node_type for rows of this class.
    polymorphic_identity = "remote_artifact"
    __mapper_args__: ClassVar[dict[str, Any]] = {"polymorphic_identity": polymorphic_identity}

    # NOTE(review): annotated Mapped[str] but declared nullable=True -
    # confirm whether NULLs are actually expected in these columns.
    remote_artifact_storage: Mapped[str] = mapped_column(nullable=True, index=True)
    """The name of the storage method for the artifact."""

    remote_artifact_location: Mapped[str] = mapped_column(nullable=True, index=True)
    """A string describing where the artifact is stored."""

remote_artifact_location class-attribute instance-attribute

remote_artifact_location: Mapped[str] = mapped_column(
    nullable=True, index=True
)

A string describing where the artifact is stored.

remote_artifact_storage class-attribute instance-attribute

remote_artifact_storage: Mapped[str] = mapped_column(
    nullable=True, index=True
)

The name of the storage method for the artifact.

SaveSpec dataclass

Information about how to save an artifact.

Source code in src/artigraph/core/api/artifact.py
@dataclass(frozen=True)
class SaveSpec:
    """Information about how to save an artifact."""

    serializers: Sequence[Serializer] = ()
    """The serializers to try when saving the artifact."""

    storage: Storage | None = None
    """The storage to use when saving the artifact."""

    def is_empty(self) -> bool:
        """Return whether this save spec is empty (no serializers and no storage)."""
        return not (self.serializers or self.storage is not None)

    def create_artifact(self, value: T, *, strict: bool = False) -> Artifact[T]:
        """Create an artifact from a value."""
        # Raw bytes are stored verbatim - no serializer needed.
        if isinstance(value, bytes):
            return Artifact(value=value, serializer=None, storage=self.storage)

        # Use the first configured serializer whose types accept the value.
        matching = next((s for s in self.serializers if isinstance(value, s.types)), None)
        if matching is not None:
            return Artifact(value=value, serializer=matching, storage=self.storage)

        if strict:
            if not self.serializers:
                msg = f"No serializers specified for {value!r}"
                raise ValueError(msg)
            allowed_types = ", ".join([t.__name__ for s in self.serializers for t in s.types])
            msg = f"Expected {allowed_types} - got {value!r}"
            raise TypeError(msg)

        # Non-strict fallback: consult the global serializer registry.
        fallback = get_serializer_by_type(type(value))[0]
        return Artifact(value=value, serializer=fallback, storage=self.storage)

serializers class-attribute instance-attribute

serializers: Sequence[Serializer] = ()

The serializers to try when saving the artifact.

storage class-attribute instance-attribute

storage: Storage | None = None

The storage to use when saving the artifact.

create_artifact

create_artifact(
    value: T, *, strict: bool = False
) -> Artifact[T]

Create an artifact from a value.

Source code in src/artigraph/core/api/artifact.py
def create_artifact(self, value: T, *, strict: bool = False) -> Artifact[T]:
    """Create an artifact from a value.

    Args:
        value: The value to wrap. Raw ``bytes`` are stored without a serializer.
        strict: If True, raise instead of falling back to the serializer registry.
    """
    if isinstance(value, bytes):
        # Bytes are stored verbatim - no serializer needed.
        return Artifact(value=value, serializer=None, storage=self.storage)

    # First configured serializer that supports the value's type wins.
    for s in self.serializers:
        if isinstance(value, s.types):
            return Artifact(value=value, serializer=s, storage=self.storage)

    if strict:
        if not self.serializers:
            msg = f"No serializers specified for {value!r}"
            raise ValueError(msg)
        allowed_types = ", ".join([t.__name__ for s in self.serializers for t in s.types])
        msg = f"Expected {allowed_types} - got {value!r}"
        raise TypeError(msg)

    # Non-strict fallback: consult the global serializer registry.
    serializer = get_serializer_by_type(type(value))[0]
    return Artifact(value=value, serializer=serializer, storage=self.storage)

is_empty

is_empty() -> bool

Return whether this save spec is empty.

Source code in src/artigraph/core/api/artifact.py
def is_empty(self) -> bool:
    """Return whether this save spec is empty (no serializers and no storage)."""
    return not self.serializers and self.storage is None

Serializer

Bases: ABC, Generic[T]

A type of artifact that can be serialized to a string or bytes.

Source code in src/artigraph/core/serializer/base.py
class Serializer(ABC, Generic[T]):
    """A type of artifact that can be serialized to a string or bytes."""

    name: str
    """A globally unique name for this serializer.

    This will typically be of the form "library_name.SerializerName". You should avoid
    using dynamic values like `__name__` or `__qualname__` as these may change between
    versions of the library or if you move the class to a different module.

    The serializer name will be used to recover this class from a when deserializing
    artifacts so it must not change between versions of the library. If you need to
    change the name, you should create and register a subclass with the new name and
    deprecate the old one.
    """

    types: tuple[type[T], ...]
    """The types of values this serializer supports."""

    def register(self) -> Self:
        """Register a serializer.

        It's recommended that each serializer be defined and registerd in a separate module
        so that users can select which serializers they want to use by importing the module.
        Thus if a user does not import a serializer if will not be registered. This is
        important for two reasons:

        1. It allows users to avoid importing dependencies they don't need.
        2. Serializers that supprt the same type will override each other - only the last one
        registered will be used unless the user explicitly selects one.
        """
        if not isinstance(self, Serializer):  # nocov
            msg = f"{self} is not of Serializer"
            raise ValueError(msg)

        if self.name in SERIALIZERS_BY_NAME:
            msg = f"Serializer named {self.name!r} already registered."
            raise ValueError(msg)
        SERIALIZERS_BY_NAME[self.name] = self

        for t in self.types:
            SERIALIZERS_BY_TYPE[t] = (*SERIALIZERS_BY_TYPE.get(t, ()), self)

        return self

    @abstractmethod
    def serialize(self, value: T, /) -> bytes:
        """Serialize a value to a string or bytes."""
        raise NotImplementedError()  # nocov

    @abstractmethod
    def deserialize(self, value: bytes, /) -> T:
        """Deserialize a string or bytes to a value."""
        raise NotImplementedError()  # nocov

name instance-attribute

name: str

A globally unique name for this serializer.

This will typically be of the form "library_name.SerializerName". You should avoid using dynamic values like __name__ or __qualname__ as these may change between versions of the library or if you move the class to a different module.

The serializer name will be used to recover this class when deserializing artifacts so it must not change between versions of the library. If you need to change the name, you should create and register a subclass with the new name and deprecate the old one.

types instance-attribute

types: tuple[type[T], ...]

The types of values this serializer supports.

deserialize abstractmethod

deserialize(value: bytes) -> T

Deserialize a string or bytes to a value.

Source code in src/artigraph/core/serializer/base.py
@abstractmethod
def deserialize(self, value: bytes, /) -> T:
    """Deserialize a string or bytes to a value (expected to invert `serialize`)."""
    raise NotImplementedError()  # nocov

register

register() -> Self

Register a serializer.

It's recommended that each serializer be defined and registered in a separate module so that users can select which serializers they want to use by importing the module. Thus if a user does not import a serializer it will not be registered. This is important for two reasons:

  1. It allows users to avoid importing dependencies they don't need.
  2. Serializers that support the same type will override each other - only the last one registered will be used unless the user explicitly selects one.
Source code in src/artigraph/core/serializer/base.py
def register(self) -> Self:
    """Register a serializer.

    It's recommended that each serializer be defined and registered in a separate module
    so that users can select which serializers they want to use by importing the module.
    Thus if a user does not import a serializer it will not be registered. This is
    important for two reasons:

    1. It allows users to avoid importing dependencies they don't need.
    2. Serializers that support the same type will override each other - only the last one
    registered will be used unless the user explicitly selects one.
    """
    if not isinstance(self, Serializer):  # nocov
        msg = f"{self} is not of Serializer"
        raise ValueError(msg)

    if self.name in SERIALIZERS_BY_NAME:
        msg = f"Serializer named {self.name!r} already registered."
        raise ValueError(msg)
    SERIALIZERS_BY_NAME[self.name] = self

    # Newer registrations are appended, so lookups by type can prefer the
    # most recently registered serializer.
    for t in self.types:
        SERIALIZERS_BY_TYPE[t] = (*SERIALIZERS_BY_TYPE.get(t, ()), self)

    return self

serialize abstractmethod

serialize(value: T) -> bytes

Serialize a value to a string or bytes.

Source code in src/artigraph/core/serializer/base.py
@abstractmethod
def serialize(self, value: T, /) -> bytes:
    """Serialize a value to a string or bytes.

    The argument is positional-only so implementations may rename it.
    """
    raise NotImplementedError()  # nocov

Storage

Bases: ABC

A storage backend for artifacts.

Source code in src/artigraph/core/storage/base.py
class Storage(ABC):
    """A storage backend for artifacts."""

    name: str
    """A globally unique name for this storage.

    This will typically be of the form "library_name-storage_name". You should avoid
    using dynamic values like `__name__` or `__qualname__` as these may change between
    versions of the library or if you move the class to a different module.

    The storage name will be used to recover this class when loading data from records.
    It must not change between versions of the library. If you need to change the name,
    you should create and register a subclass with the new name and deprecate the old
    one.
    """

    def register(self) -> Self:
        """Register a storage backend.

        It's recommended that each storage backend be defined and registerd in a separate
        module so that users can select which storage they want to use by importing the module.
        Thus, if a user does not import a storage backend it will not be registered. This is
        important because some storage backends may have dependencies that are not installed.
        """
        if self.name in STORAGE_BY_NAME:
            msg = (
                f"Serializer named {self.name!r} already "
                f"registered as {STORAGE_BY_NAME[self.name]!r}"
            )
            raise ValueError(msg)

        STORAGE_BY_NAME[self.name] = self

        return self

    @abstractmethod
    async def create(self, data: bytes, /) -> str:
        """Create the artifact data and return its location."""
        ...

    @abstractmethod
    async def read(self, location: str, /) -> bytes:
        """Read artifact data from the given location."""
        ...

    @abstractmethod
    async def update(self, location: str, data: bytes, /) -> None:
        """Update artifact data at the given location."""
        ...

    @abstractmethod
    async def delete(self, location: str, /) -> None:
        """Delete artifact data at the given location."""
        ...

    @abstractmethod
    async def exists(self, location: str, /) -> bool:
        """Check if artifact data exists at the given location."""
        ...

name instance-attribute

name: str

A globally unique name for this storage.

This will typically be of the form "library_name-storage_name". You should avoid using dynamic values like __name__ or __qualname__ as these may change between versions of the library or if you move the class to a different module.

The storage name will be used to recover this class when loading data from records. It must not change between versions of the library. If you need to change the name, you should create and register a subclass with the new name and deprecate the old one.

create abstractmethod async

create(data: bytes) -> str

Create the artifact data and return its location.

Source code in src/artigraph/core/storage/base.py
@abstractmethod
async def create(self, data: bytes, /) -> str:
    """Create the artifact data and return its location (a backend-specific string)."""
    ...

delete abstractmethod async

delete(location: str) -> None

Delete artifact data at the given location.

Source code in src/artigraph/core/storage/base.py
@abstractmethod
async def delete(self, location: str, /) -> None:
    """Delete artifact data at the given location."""
    ...

exists abstractmethod async

exists(location: str) -> bool

Check if artifact data exists at the given location.

Source code in src/artigraph/core/storage/base.py
@abstractmethod
async def exists(self, location: str, /) -> bool:
    """Check if artifact data exists at the given location."""
    ...

read abstractmethod async

read(location: str) -> bytes

Read artifact data from the given location.

Source code in src/artigraph/core/storage/base.py
@abstractmethod
async def read(self, location: str, /) -> bytes:
    """Read artifact data from the given location."""
    ...

register

register() -> Self

Register a storage backend.

It's recommended that each storage backend be defined and registered in a separate module so that users can select which storage they want to use by importing the module. Thus, if a user does not import a storage backend it will not be registered. This is important because some storage backends may have dependencies that are not installed.

Source code in src/artigraph/core/storage/base.py
def register(self) -> Self:
    """Register a storage backend.

    It's recommended that each storage backend be defined and registerd in a separate
    module so that users can select which storage they want to use by importing the module.
    Thus, if a user does not import a storage backend it will not be registered. This is
    important because some storage backends may have dependencies that are not installed.
    """
    if self.name in STORAGE_BY_NAME:
        msg = (
            f"Serializer named {self.name!r} already "
            f"registered as {STORAGE_BY_NAME[self.name]!r}"
        )
        raise ValueError(msg)

    STORAGE_BY_NAME[self.name] = self

    return self

update abstractmethod async

update(location: str, data: bytes) -> None

Update artifact data at the given location.

Source code in src/artigraph/core/storage/base.py
@abstractmethod
async def update(self, location: str, data: bytes, /) -> None:
    """Update artifact data at the given location."""
    ...

ValueFilter

Bases: Filter, Generic[T]

Filter a column by comparing it to a value.

Source code in src/artigraph/core/api/filter.py
class ValueFilter(Filter, Generic[T]):
    """Filter a column by comparing it to a value."""

    column: InstrumentedAttribute[T] | None = field(repr=False, default=None)
    """The column to filter."""

    gt: T | None = column_op(default=None, op=operator.gt)
    """The column must be greater than this value."""
    ge: T | None = column_op(default=None, op=operator.ge)
    """The column must be greater than or equal to this value."""
    lt: T | None = column_op(default=None, op=operator.lt)
    """The column must be less than this value."""
    le: T | None = column_op(default=None, op=operator.le)
    """The column must be less than or equal to this value."""
    ne: T | None = column_op(default=None, op=operator.ne)
    """The column must not be equal to this value."""
    eq: T | None = column_op(default=None, op=operator.eq)
    """The column must be equal to this value."""
    in_: Collection[T] | None = column_op(default=None, op=Column.in_)
    """The column must be one of these values."""
    not_in: Collection[T] | None = column_op(default=None, op=Column.notin_)
    """The column must not be one of these values."""
    like: T | None = column_op(default=None, op=Column.like)
    """The column must match this pattern."""
    ilike: T | None = column_op(default=None, op=Column.ilike)
    """The column must match this pattern, case-insensitive."""
    is_: bool | None = column_op(default=None, op=Column.is_)
    """The column must be this value."""
    is_not: bool | None = column_op(default=None, op=Column.isnot)
    """The column must not be this value."""

    def against(self, column: InstrumentedAttribute[T] | InstrumentedAttribute[T | None]) -> Self:
        """Filter against the given column."""
        return replace(self, column=column)

    def compose(self, expr: Expression) -> Expression:
        # InstrumentedAttribute is a descriptor so the type checker thinks
        # self.column is of type T, not InstrumentedAttribute[T]
        column = cast(InstrumentedAttribute[T] | None, self.column)

        if column is None:  # nocov
            msg = "No column to filter against - did you forget to call `against`?"
            raise ValueError(msg)

        for f in fields(self):
            if "op" in f.metadata:
                op_value = getattr(self, f.name, None)
                if op_value is not None:
                    op: Callable[[InstrumentedAttribute[T], T], BinaryExpression] = f.metadata["op"]
                    expr &= op(column, op_value)

        return expr

column class-attribute instance-attribute

column: InstrumentedAttribute[T] | None = field(
    repr=False, default=None
)

The column to filter.

eq class-attribute instance-attribute

eq: T | None = column_op(default=None, op=operator.eq)

The column must be equal to this value.

ge class-attribute instance-attribute

ge: T | None = column_op(default=None, op=operator.ge)

The column must be greater than or equal to this value.

gt class-attribute instance-attribute

gt: T | None = column_op(default=None, op=operator.gt)

The column must be greater than this value.

ilike class-attribute instance-attribute

ilike: T | None = column_op(default=None, op=Column.ilike)

The column must match this pattern, case-insensitive.

in_ class-attribute instance-attribute

in_: Collection[T] | None = column_op(
    default=None, op=Column.in_
)

The column must be one of these values.

is_ class-attribute instance-attribute

is_: bool | None = column_op(default=None, op=Column.is_)

The column must be this value.

is_not class-attribute instance-attribute

is_not: bool | None = column_op(
    default=None, op=Column.isnot
)

The column must not be this value.

le class-attribute instance-attribute

le: T | None = column_op(default=None, op=operator.le)

The column must be less than or equal to this value.

like class-attribute instance-attribute

like: T | None = column_op(default=None, op=Column.like)

The column must match this pattern.

lt class-attribute instance-attribute

lt: T | None = column_op(default=None, op=operator.lt)

The column must be less than this value.

ne class-attribute instance-attribute

ne: T | None = column_op(default=None, op=operator.ne)

The column must not be equal to this value.

not_in class-attribute instance-attribute

not_in: Collection[T] | None = column_op(
    default=None, op=Column.notin_
)

The column must not be one of these values.

against

against(
    column: InstrumentedAttribute[T]
    | InstrumentedAttribute[T | None],
) -> Self

Filter against the given column.

Source code in src/artigraph/core/api/filter.py
def against(self, column: InstrumentedAttribute[T] | InstrumentedAttribute[T | None]) -> Self:
    """Filter against the given column."""
    return replace(self, column=column)

current_engine

current_engine(
    engine: AsyncEngine | str,
    *,
    create_tables: bool = False
) -> Iterator[AsyncEngine]

Define which engine to use in the context.

Source code in src/artigraph/core/db.py
@contextmanager
def current_engine(
    engine: AsyncEngine | str,
    *,
    create_tables: bool = False,
) -> Iterator[AsyncEngine]:
    """Define which engine to use in the context."""
    engine = create_async_engine(engine) if isinstance(engine, str) else engine
    reset = set_engine(engine, create_tables=create_tables)
    try:
        yield engine
    finally:
        reset()

current_linker

current_linker() -> Linker

Get the current linker

Source code in src/artigraph/core/linker.py
def current_linker() -> Linker:
    """Get the current linker"""
    linker = _CURRENT_LINKER.get()
    if linker is None:  # nocov
        msg = "No linker is currently active"
        raise RuntimeError(msg)
    return linker

current_session

current_session(
    session_maker: async_sessionmaker[AsyncSession]
    | None = None,
) -> AsyncContextManager[AsyncSession]

A context manager for an asynchronous database session.

Source code in src/artigraph/core/db.py
def current_session(
    session_maker: async_sessionmaker[AsyncSession] | None = None,
) -> AsyncContextManager[AsyncSession]:
    """A context manager for an asynchronous database session."""
    return _CurrentSession(session_maker)

dataclass

dataclass(
    cls: type[T] | None = None, **kwargs: Any
) -> type[T] | Callable[[type[T]], type[T]]

A decorator that makes a class into a dataclass GraphModel.

See: dataclass

Source code in src/artigraph/core/model/dataclasses.py
@dataclass_transform(field_specifiers=(field,))
def dataclass(cls: type[T] | None = None, **kwargs: Any) -> type[T] | Callable[[type[T]], type[T]]:
    """A decorator that makes a class into a dataclass GraphModel.

    See: [dataclass](https://docs.python.org/3/library/dataclasses.html#dataclasses.dataclass)
    """

    def decorator(cls: type[T]) -> type[T]:
        if not issubclass(cls, GraphModel):
            msg = f"{cls} does not inherit from GraphModel"
            raise TypeError(msg)

        cls = _dataclass(cls, **kwargs)

        with allow_model_type_overwrites():

            @_dataclass(**kwargs)
            class _DataclassModel(cls, version=cls.graph_model_version):
                graph_id: UUID = field(default_factory=uuid1, init=False, compare=False)
                graph_model_name = getattr(cls, "graph_model_name", cls.__name__)

                @classmethod
                def graph_model_init(cls, info: ModelInfo, data: dict[str, Any]) -> Self:
                    self = cls(**data)
                    object.__setattr__(self, "graph_id", info.graph_id)
                    return self

                def graph_model_data(self) -> ModelData:
                    return get_annotated_model_data(
                        self,
                        [
                            f.name
                            for f in fields(self)
                            if f.init
                            # exclude this since it's on the DB record anyway
                            and f.name != "graph_id"
                        ],
                    )

        _DataclassModel.__name__ = cls.__name__
        _DataclassModel.__qualname__ = cls.__qualname__

        return cast(type[T], _DataclassModel)

    return decorator if cls is None else decorator(cls)

delete async

delete(cls: type[GraphObject], where: Filter) -> None

Delete records matching the given filter.

Source code in src/artigraph/core/api/funcs.py
@anysync
async def delete(cls: type[GraphObject], where: Filter) -> None:
    """Delete records matching the given filter."""
    related_filters = cls.graph_filter_related(where)
    async with current_session():
        for o_type in sorted(related_filters, key=get_fk_dependency_rank, reverse=True):
            await orm_delete(o_type, related_filters[o_type])
        # must delete this last since the related deletion queries may depend on it
        await orm_delete(cls.graph_orm_type, where)

delete_many async

delete_many(objs: Sequence[GraphObject]) -> None

Delete records.

Source code in src/artigraph/core/api/funcs.py
@anysync
async def delete_many(objs: Sequence[GraphObject]) -> None:
    """Delete records."""
    filters_by_type: defaultdict[type[GraphObject], list[Filter]] = defaultdict(list)
    for o in objs:
        filters_by_type[type(o)].append(o.graph_filter_self())

    async with current_session() as session:
        for o_type, o_filters in filters_by_type.items():
            where = o_filters[0] if len(o_filters) == 1 else MultiFilter(op="or", filters=o_filters)
            await delete.a(o_type, where)
        await session.commit()

delete_one async

delete_one(obj: GraphObject) -> None

Delete a record.

Source code in src/artigraph/core/api/funcs.py
@anysync
async def delete_one(obj: GraphObject) -> None:
    """Delete a record."""
    return await delete_many.a([obj])

exists async

exists(cls: type[GraphObject], where: Filter) -> bool

Check if records exist.

Source code in src/artigraph/core/api/funcs.py
@anysync
async def exists(cls: type[GraphObject], where: Filter) -> bool:
    """Check if records exist."""
    return await orm_exists(cls.graph_orm_type, where)

get_polymorphic_identities

get_polymorphic_identities(
    node_types: Sequence[type[OrmNode]],
    *,
    subclasses: bool = False
) -> Sequence[str]

Get the polymorphic identities of the given node types and optionally their subclasses.

Source code in src/artigraph/core/orm/node.py
def get_polymorphic_identities(
    node_types: Sequence[type[OrmNode]],
    *,
    subclasses: bool = False,
) -> Sequence[str]:
    """Get the polymorphic identities of the given node types and optionall their subclasses."""
    node_types = [s for c in node_types for s in get_subclasses(c)] if subclasses else node_types
    return [nt.polymorphic_identity for nt in node_types if not nt.is_abstract()]

get_serializer_by_name

get_serializer_by_name(name: str) -> Serializer[Any]

Get a serializer by name.

Source code in src/artigraph/core/serializer/base.py
def get_serializer_by_name(name: str) -> Serializer[Any]:
    """Get a serializer by name."""
    if name not in SERIALIZERS_BY_NAME:  # nocov
        msg = f"No serializer named {name!r}"
        raise ValueError(msg)
    return SERIALIZERS_BY_NAME[name]

get_serializer_by_type

get_serializer_by_type(
    cls: type[T],
) -> Sequence[Serializer[T]]

Get a serializer by type.

Source code in src/artigraph/core/serializer/base.py
def get_serializer_by_type(cls: type[T]) -> Sequence[Serializer[T]]:
    """Get a serializer by type."""
    for c in cls.mro():
        if c in SERIALIZERS_BY_TYPE:
            return SERIALIZERS_BY_TYPE[c]
    msg = f"No serializer for type {cls!r}"  # nocov
    raise ValueError(msg)  # nocov

linked

linked(
    *,
    node_type: Callable[[], Node] = Node,
    is_method: bool = False,
    include: str | Collection[str] = (),
    exclude: str | Collection[str] = ()
) -> Callable[[F], F]

Capture the inputs and outputs of a function using Artigraph

Source code in src/artigraph/core/linker.py
def linked(
    *,
    node_type: Callable[[], Node] = Node,
    is_method: bool = False,
    include: str | Collection[str] = (),
    exclude: str | Collection[str] = (),
) -> Callable[[F], F]:
    """Capture the inputs and outputs of a function using Artigraph"""

    if include and exclude:  # nocov
        msg = "Cannot specify both only_save and do_not_save"
        raise ValueError(msg)

    include = {include} if isinstance(include, str) else set(include)
    exclude = {exclude} if isinstance(exclude, str) else set(exclude)
    call_id = 0

    def decorator(func: F) -> F:
        sig = signature(func)
        hint_info = get_save_specs_from_type_hints(func)

        def _create_label_and_inputs(args, kwargs):
            nonlocal call_id
            call_id += 1
            full_label = f"{func.__qualname__}[{call_id}]"
            bound_args = sig.bind_partial(*args, **kwargs)
            inputs = {
                k: v
                for i, (k, v) in enumerate(bound_args.arguments.items())
                if not is_method or i > 1
            }
            return full_label, inputs

        if iscoroutinefunction(func):

            @wraps(func)
            async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
                # Check that there's an active linker. Without one, it's possible to
                # produce orphaned nodes. It seems better to prevent that.
                current_linker()

                label, inputs = _create_label_and_inputs(args, kwargs)
                async with Linker(node_type(), label) as linker:
                    output = await func(*args, **kwargs)
                    values = {"return": output, **inputs}
                    for k, v in _create_graph_objects(
                        values,
                        hint_info,
                        include,
                        exclude,
                    ).items():
                        linker.link(v, k)
                    return output

            return cast(F, async_wrapper)

        elif isfunction(func):

            @wraps(func)
            def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
                nonlocal call_id

                # Check that there's an active linker. Without one, it's possible to
                # produce orphaned nodes. It seems better to prevent that.
                current_linker()

                call_id += 1
                label, inputs = _create_label_and_inputs(args, kwargs)
                with Linker(node_type(), label) as linker:
                    output = func(*args, **kwargs)
                    values = {"return": output, **inputs}
                    for k, v in _create_graph_objects(
                        values,
                        hint_info,
                        include,
                        exclude,
                    ).items():
                        linker.link(v, k)
                    return output

            return cast(F, sync_wrapper)

        else:  # nocov
            msg = f"Expected a function, got {type(func)}"
            raise TypeError(msg)

    return decorator

load_deserialized_artifact_value async

load_deserialized_artifact_value(obj: OrmArtifact) -> Any

Load the value of an artifact from its ORM record.

Source code in src/artigraph/core/api/artifact.py
async def load_deserialized_artifact_value(obj: OrmArtifact) -> Any:
    """Load the value of an artifact from its ORM record."""

    if isinstance(obj, OrmRemoteArtifact):
        storage = get_storage_by_name(obj.remote_artifact_storage)
        data = await storage.read(obj.remote_artifact_location)
    elif isinstance(obj, OrmDatabaseArtifact):
        data = obj.database_artifact_data
        storage = None
    else:  # nocov
        msg = f"Unknown artifact type: {obj}"
        raise RuntimeError(msg)

    if data is not None and obj.artifact_serializer is not None:
        data = get_serializer_by_name(obj.artifact_serializer).deserialize(data)

    return data

load_extras

load_extras(*names: str) -> None

Load extra modules.

This is useful for registering serializers from 3rd party libraries.

Source code in src/artigraph/extras/__init__.py
def load_extras(*names: str) -> None:  # nocov
    """Load extra modules.

    This is useful for registering serializers from 3rd party libraries.
    """
    invalid = set(names).difference(_MODULE_NAMES)
    if invalid:
        msg = f"Invalid module names: {invalid}"
        raise ValueError(msg)

    for n in names or _MODULE_NAMES:
        try:
            import_module(f"artigraph.extras.{n}")
        except ImportError:
            if names:
                raise
            _logger.debug(f"Failed to load artigraph.extras.{n}", exc_info=True)

read async

read(cls: type[G], where: Filter) -> Sequence[G]

Read records that match the given filter.

Source code in src/artigraph/core/api/funcs.py
@anysync
async def read(cls: type[G], where: Filter) -> Sequence[G]:
    """Read records that match the given filter."""
    records = await orm_read(cls.graph_orm_type, where)
    related_records = {
        graph_orm_type: await orm_read(graph_orm_type, api_filter)
        for graph_orm_type, api_filter in cls.graph_filter_related(where).items()
    }
    return await cls.graph_load(records, related_records)

read_one async

read_one(cls: type[G], where: Filter) -> G

Read a record that matches the given filter.

Source code in src/artigraph/core/api/funcs.py
@anysync
async def read_one(cls: type[G], where: Filter) -> G:
    """Read a record that matches the given filter."""
    one = await read_one_or_none.a(cls, where)
    if one is None:
        msg = f"No record found matching filter {where}"
        raise ValueError(msg)
    return one

read_one_or_none async

read_one_or_none(cls: type[G], where: Filter) -> G | None

Read a record that matches the given filter or None if no record is found.

Source code in src/artigraph/core/api/funcs.py
@anysync
async def read_one_or_none(cls: type[G], where: Filter) -> G | None:
    """Read a record that matches the given filter or None if no record is found."""
    record = await orm_read_one_or_none(cls.graph_orm_type, where)
    if record is None:
        return None
    related_records = {
        graph_orm_type: await orm_read(graph_orm_type, related_filter)
        for graph_orm_type, related_filter in cls.graph_filter_related(where).items()
    }
    return cast(G, (await cls.graph_load([record], related_records))[0])

set_engine

set_engine(
    engine: AsyncEngine | str,
    *,
    create_tables: bool = False
) -> Callable[[], None]

Set the current engine and whether to try creating tables if they don't exist.

Tables are only created when the engine is retrieved for the first time.

Source code in src/artigraph/core/db.py
def set_engine(engine: AsyncEngine | str, *, create_tables: bool = False) -> Callable[[], None]:
    """Set the current engine and whether to try creating tables if they don't exist.

    Tables are only created when the engine is retrieved for the first time.
    """
    engine = create_async_engine(engine) if isinstance(engine, str) else engine
    current_engine_token = _CURRENT_ENGINE.set(engine)
    create_tables_token = _CREATE_TABLES.set(create_tables)

    def reset() -> None:
        _CURRENT_ENGINE.reset(current_engine_token)
        _CREATE_TABLES.reset(create_tables_token)

    return reset

write_many async

write_many(objs: Collection[GraphObject]) -> None

Create records and, if given, refresh their attributes.

Source code in src/artigraph/core/api/funcs.py
@anysync
async def write_many(objs: Collection[GraphObject]) -> None:
    """Create records and, if given, refresh their attributes."""
    await orm_write(await dump(objs))

write_one async

write_one(obj: GraphObject) -> None

Create a record.

Source code in src/artigraph/core/api/funcs.py
@anysync
async def write_one(obj: GraphObject) -> None:
    """Create a record."""
    return await write_many.a([obj])

S3 storage backend for Artigraph.

S3Storage

Bases: Storage

S3 storage backend for Artigraph.

Parameters:

Name Type Description Default
bucket str

The name of the S3 bucket.

required
prefix str

The prefix to use for all S3 keys.

''
s3_client BaseClient

The S3 client to use.

required
Source code in src/artigraph/extras/aws.py
class S3Storage(Storage):
    """S3 storage backend for Artigraph.

    Parameters:
        bucket: The name of the S3 bucket.
        prefix: The prefix to use for all S3 keys.
        s3_client: The S3 client to use.
    """

    def __init__(self, bucket: str, prefix: str = "", *, s3_client: BaseClient) -> None:
        self.name = slugify(f"artigraph-s3-{bucket}-{prefix}")
        self.bucket = bucket
        self.prefix = prefix
        self.client = s3_client

    async def create(self, value: bytes) -> str:
        """Create an S3 object and return is key."""
        hashed_value = hashlib.sha512(value).hexdigest()
        key = f"{self.prefix}/{hashed_value}"

        # Only create the object if it doesn't already exist.
        try:
            await run_in_thread(self.client.head_object, Bucket=self.bucket, Key=key)
        except ClientError as error:
            if error.response["Error"]["Code"] != "404":
                raise  # nocov
            await run_in_thread(self.client.put_object, Bucket=self.bucket, Key=key, Body=value)

        return key

    async def read(self, key: str) -> bytes:
        """Read an S3 object."""
        response = await run_in_thread(self.client.get_object, Bucket=self.bucket, Key=key)
        return cast(bytes, response["Body"].read())

    async def update(self, key: str, value: bytes) -> None:
        """Update an S3 object."""
        await run_in_thread(self.client.put_object, Bucket=self.bucket, Key=key, Body=value)

    async def delete(self, key: str) -> None:
        """Delete an S3 object."""
        await run_in_thread(self.client.delete_object, Bucket=self.bucket, Key=key)

    async def exists(self, key: str) -> bool:
        """Check if an S3 object exists."""
        try:
            await run_in_thread(self.client.head_object, Bucket=self.bucket, Key=key)
        except ClientError as error:
            if error.response["Error"]["Code"] == "404":
                return False
            raise  # nocov
        return True

create async

create(value: bytes) -> str

Create an S3 object and return its key.

Source code in src/artigraph/extras/aws.py
async def create(self, value: bytes) -> str:
    """Create an S3 object and return is key."""
    hashed_value = hashlib.sha512(value).hexdigest()
    key = f"{self.prefix}/{hashed_value}"

    # Only create the object if it doesn't already exist.
    try:
        await run_in_thread(self.client.head_object, Bucket=self.bucket, Key=key)
    except ClientError as error:
        if error.response["Error"]["Code"] != "404":
            raise  # nocov
        await run_in_thread(self.client.put_object, Bucket=self.bucket, Key=key, Body=value)

    return key

delete async

delete(key: str) -> None

Delete an S3 object.

Source code in src/artigraph/extras/aws.py
async def delete(self, key: str) -> None:
    """Delete an S3 object."""
    await run_in_thread(self.client.delete_object, Bucket=self.bucket, Key=key)

exists async

exists(key: str) -> bool

Check if an S3 object exists.

Source code in src/artigraph/extras/aws.py
async def exists(self, key: str) -> bool:
    """Check if an S3 object exists."""
    try:
        await run_in_thread(self.client.head_object, Bucket=self.bucket, Key=key)
    except ClientError as error:
        if error.response["Error"]["Code"] == "404":
            return False
        raise  # nocov
    return True

read async

read(key: str) -> bytes

Read an S3 object.

Source code in src/artigraph/extras/aws.py
async def read(self, key: str) -> bytes:
    """Read an S3 object."""
    response = await run_in_thread(self.client.get_object, Bucket=self.bucket, Key=key)
    return cast(bytes, response["Body"].read())

update async

update(key: str, value: bytes) -> None

Update an S3 object.

Source code in src/artigraph/extras/aws.py
async def update(self, key: str, value: bytes) -> None:
    """Update an S3 object."""
    await run_in_thread(self.client.put_object, Bucket=self.bucket, Key=key, Body=value)

create_graph async

create_graph(root: GraphObject) -> nx.DiGraph

Create a NetworkX graph from an Artigraph node.

Source code in src/artigraph/extras/networkx.py
@anysync
async def create_graph(root: GraphObject) -> nx.DiGraph:
    """Create a NetworkX graph from an Artigraph node."""
    nodes_by_id, relationship, labels = await _read_nodes_relationships_labels(root)

    graph = nx.DiGraph()
    for i, n in _dfs_iter_nodes(root, list(nodes_by_id.values()), relationship):
        graph.add_node(i, obj=n, label=labels.get(i))
    graph.add_edges_from(
        [
            (source, target, {"label": labels[target]})
            for source, targets in relationship.items()
            for target in targets
        ]
    )

    for layer, nodes in enumerate(nx.topological_generations(graph)):
        # `multipartite_layout` expects the layer as a node attribute, so add the
        # numeric layer value as a node attribute
        for node in nodes:
            graph.nodes[node]["subset"] = layer

    return graph

array_serializer module-attribute

array_serializer = ArraySerializer().register()

A serializer for numpy arrays.

ArraySerializer

Bases: Serializer[ndarray]

A serializer for numpy arrays.

Source code in src/artigraph/extras/numpy.py
class ArraySerializer(Serializer[np.ndarray]):
    """A serializer for numpy arrays."""

    types = (np.ndarray,)
    name = "artigraph-numpy"

    @staticmethod
    def serialize(value: np.ndarray) -> bytes:
        """Serialize a numpy array."""
        if len(value.shape) == NP_1D_SHAPE_LEN:
            pd_value = pd.DataFrame({"1darray": value})
        elif len(value.shape) == NP_2D_SHAPE_LEN:
            pd_value = pd.DataFrame(dict(enumerate(value.T)))
        else:
            msg = f"Can only serialize 1D or 2D arrays, not {value.shape}."
            raise ValueError(msg)
        return dataframe_serializer.serialize(pd_value)

    @staticmethod
    def deserialize(value: bytes) -> np.ndarray:
        """Deserialize a numpy array."""
        pd_value = dataframe_serializer.deserialize(value)
        if "1darray" in pd_value.columns:
            return pd_value["1darray"].to_numpy()
        return pd_value.to_numpy()

deserialize staticmethod

deserialize(value: bytes) -> np.ndarray

Deserialize a numpy array.

Source code in src/artigraph/extras/numpy.py
@staticmethod
def deserialize(value: bytes) -> np.ndarray:
    """Deserialize a numpy array."""
    pd_value = dataframe_serializer.deserialize(value)
    if "1darray" in pd_value.columns:
        return pd_value["1darray"].to_numpy()
    return pd_value.to_numpy()

serialize staticmethod

serialize(value: np.ndarray) -> bytes

Serialize a numpy array.

Source code in src/artigraph/extras/numpy.py
@staticmethod
def serialize(value: np.ndarray) -> bytes:
    """Serialize a numpy array."""
    if len(value.shape) == NP_1D_SHAPE_LEN:
        pd_value = pd.DataFrame({"1darray": value})
    elif len(value.shape) == NP_2D_SHAPE_LEN:
        pd_value = pd.DataFrame(dict(enumerate(value.T)))
    else:
        msg = f"Can only serialize 1D or 2D arrays, not {value.shape}."
        raise ValueError(msg)
    return dataframe_serializer.serialize(pd_value)

dataframe_serializer module-attribute

dataframe_serializer = DataFrameSerializer().register()

A serializer for Pandas dataframes.

DataFrameSerializer

Bases: Serializer[DataFrame]

A serializer for Pandas dataframes.

Source code in src/artigraph/extras/pandas.py
class DataFrameSerializer(Serializer[pd.DataFrame]):
    """A serializer for Pandas dataframes."""

    types = (pd.DataFrame,)
    name = "artigraph-pandas"

    @staticmethod
    def serialize(value: pd.DataFrame) -> bytes:
        """Serialize a Pandas dataframe."""
        return value.to_parquet()

    @staticmethod
    def deserialize(value: bytes) -> pd.DataFrame:
        """Deserialize a Pandas dataframe."""
        return pd.read_parquet(BytesIO(value))

deserialize staticmethod

deserialize(value: bytes) -> pd.DataFrame

Deserialize a Pandas dataframe.

Source code in src/artigraph/extras/pandas.py
@staticmethod
def deserialize(value: bytes) -> pd.DataFrame:
    """Deserialize a Pandas dataframe."""
    return pd.read_parquet(BytesIO(value))

serialize staticmethod

serialize(value: pd.DataFrame) -> bytes

Serialize a Pandas dataframe.

Source code in src/artigraph/extras/pandas.py
@staticmethod
def serialize(value: pd.DataFrame) -> bytes:
    """Serialize a Pandas dataframe."""
    return value.to_parquet()

figure_json_serializer module-attribute

figure_json_serializer = FigureJsonSerializer().register()

Serialize a plotly figure

FigureJsonSerializer

Bases: Serializer[Figure | FigureWidget]

Serialize a plotly figure

Source code in src/artigraph/extras/plotly.py
class FigureJsonSerializer(Serializer[Figure | FigureWidget]):
    """Serialize a plotly figure"""

    name = "artigraph-plotly-figure-json"
    types = (Figure, FigureWidget)

    def serialize(self, figure: Figure | FigureWidget) -> bytes:
        result = plotly_io.to_json(figure)
        if result is None:  # no cov
            msg = "Plotly failed to serialize the figure - this is likely an issue with Plotly"
            raise RuntimeError(msg)
        return result.encode()

    def deserialize(self, data: bytes) -> Figure | FigureWidget:
        return plotly_io.from_json(data.decode())

figure_from_networkx

figure_from_networkx(
    graph: nx.Graph, hover_text_line_limit: int = 25
) -> go.Figure

Create a figure from a NetworkX graph

Source code in src/artigraph/extras/plotly.py
def figure_from_networkx(graph: nx.Graph, hover_text_line_limit: int = 25) -> go.Figure:
    """Create a figure from a NetworkX graph.

    Args:
        graph: A NetworkX graph whose nodes carry ``label`` and ``obj`` attributes.
        hover_text_line_limit: Maximum number of ``<br>``-separated lines kept in a
            node's hover text before it is truncated with an ellipsis.

    Returns:
        A plotly figure with one line trace for edges and one marker trace for nodes.
    """
    import networkx as nx

    try:
        import pandas as pd
    except ImportError:  # nocov
        pass
    else:
        # Keep hover text for dataframe-valued artifacts from growing unbounded.
        pd.set_option("display.max_rows", 20)

    pos = nx.multipartite_layout(graph, align="horizontal")

    node_x = []
    node_y = []
    for node in graph.nodes():
        x, y = pos[node]
        node_x.append(x)
        node_y.append(y)

    edge_x = []
    edge_y = []
    for edge in graph.edges():
        x0, y0 = pos[edge[0]]
        x1, y1 = pos[edge[1]]
        # A trailing None breaks the line between consecutive edges.
        edge_x.extend([x0, x1, None])
        edge_y.extend([y0, y1, None])

    # Color nodes by kind: Artifact -> blue, GraphModel -> yellow, other -> green.
    node_colors = [
        "blue"
        if isinstance(graph.nodes[node]["obj"], Artifact)
        else "yellow"
        if isinstance(graph.nodes[node]["obj"], GraphModel)
        else "green"
        for node in graph.nodes()
    ]

    edge_trace = go.Scatter(
        x=edge_x,
        y=edge_y,
        mode="lines",
        line={"width": 3},
    )

    node_trace = go.Scatter(
        x=node_x,
        y=node_y,
        mode="markers",
        hoverinfo="text",
        marker={
            "color": node_colors,
            "size": 12,
            "line_width": 2,
        },
    )

    node_text: list[str] = []
    # generate node text from label of parent edge
    for node in graph.nodes():
        node_attrs = graph.nodes[node]
        text = _create_node_hover_text(node_attrs["label"], node_attrs["obj"])
        br_count = text.count("<br>")
        if br_count > hover_text_line_limit:
            # BUG FIX: truncate at the configured limit instead of a hard-coded
            # 25, so a caller-supplied hover_text_line_limit is honored.
            text = "<br>".join(text.split("<br>")[:hover_text_line_limit] + ["<br>..."])
        node_text.append(text)
    node_trace.text = node_text

    return go.Figure(
        data=[edge_trace, node_trace],
        layout=go.Layout(
            titlefont_size=16,
            showlegend=False,
            hovermode="closest",
            xaxis={"showgrid": False, "zeroline": False, "showticklabels": False},
            yaxis={"showgrid": False, "zeroline": False, "showticklabels": False},
            hoverlabel={"font": {"family": "monospace", "size": 10}},
        ),
    )

dataframe_serializer module-attribute

dataframe_serializer = DataFrameSerializer().register()

A serializer for Polars dataframes that uses the parquet file format.

DataFrameSerializer

Bases: Serializer[DataFrame]

A serializer for Polars dataframes.

Source code in src/artigraph/extras/polars.py
class DataFrameSerializer(Serializer[pl.DataFrame]):
    """A serializer for Polars dataframes."""

    types = (pl.DataFrame,)

    def __init__(self, pyarrow_serializer: ArrowSerializer = parquet_serializer):
        # Delegate the actual byte encoding to a PyArrow serializer and
        # derive this serializer's registered name from it.
        self.pyarrow_serializer = pyarrow_serializer
        self.name = f"artigraph-polars-{pyarrow_serializer.name}"

    def serialize(self, value: pl.DataFrame) -> bytes:
        """Serialize a Polars dataframe."""
        arrow_table = value.to_arrow()
        return self.pyarrow_serializer.serialize(arrow_table)

    def deserialize(self, value: bytes) -> pl.DataFrame:
        """Deserialize a Polars dataframe."""
        arrow_table = self.pyarrow_serializer.deserialize(value)
        return pl.from_arrow(arrow_table)  # type: ignore

deserialize

deserialize(value: bytes) -> pl.DataFrame

Deserialize a Polars dataframe.

Source code in src/artigraph/extras/polars.py
def deserialize(self, value: bytes) -> pl.DataFrame:
    """Deserialize a Polars dataframe."""
    return pl.from_arrow(self.pyarrow_serializer.deserialize(value))  # type: ignore

serialize

serialize(value: pl.DataFrame) -> bytes

Serialize a Polars dataframe.

Source code in src/artigraph/extras/polars.py
def serialize(self, value: pl.DataFrame) -> bytes:
    """Serialize a Polars dataframe."""
    return self.pyarrow_serializer.serialize(value.to_arrow())

feather_serializer module-attribute

feather_serializer = ArrowSerializer('feather').register()

A serializer for PyArrow tables using Feather.

parquet_serializer module-attribute

parquet_serializer = ArrowSerializer('parquet').register()

A serializer for PyArrow tables using Parquet.

ArrowSerializer

Bases: Serializer[Table]

A serializer for PyArrow tables.

Source code in src/artigraph/extras/pyarrow.py
class ArrowSerializer(Serializer[pa.Table]):
    """A serializer for PyArrow tables."""

    types = (pa.Table,)

    def __init__(self, file_format: Literal["feather", "parquet"]):
        # The format name selects the serialize_/deserialize_ method pair
        # and is baked into the serializer's registered name.
        self.file_format = file_format
        self.name = f"artigraph-pyarrow-{file_format}"

    def serialize(self, value: pa.Table) -> bytes:
        """Serialize a PyArrow table.

        Raises:
            TypeError: if *value* is not a PyArrow table.
        """
        if not isinstance(value, pa.Table):
            msg = f"Expected a PyArrow table, got {type(value)}"
            raise TypeError(msg)
        handler = getattr(self, f"serialize_{self.file_format}")
        return handler(value)

    def deserialize(self, value: bytes) -> pa.Table:
        """Deserialize a PyArrow table."""
        handler = getattr(self, f"deserialize_{self.file_format}")
        return handler(value)

    def serialize_parquet(self, value: pa.Table) -> bytes:
        """Serialize a PyArrow table to Parquet."""
        with BytesIO() as buffer:
            parquet.write_table(value, buffer)
            return buffer.getvalue()

    def deserialize_parquet(self, value: bytes) -> pa.Table:
        """Deserialize a PyArrow table from Parquet."""
        return parquet.read_table(BytesIO(value))

    def serialize_feather(self, value: pa.Table) -> bytes:
        """Serialize a PyArrow table to Feather."""
        with BytesIO() as buffer:
            feather.write_feather(value, buffer)
            return buffer.getvalue()

    def deserialize_feather(self, value: bytes) -> pa.Table:
        """Deserialize a PyArrow table from Feather."""
        return feather.read_table(BytesIO(value))

deserialize

deserialize(value: bytes) -> pa.Table

Deserialize a PyArrow table.

Source code in src/artigraph/extras/pyarrow.py
def deserialize(self, value: bytes) -> pa.Table:
    """Deserialize a PyArrow table."""
    method = getattr(self, f"deserialize_{self.file_format}")
    return method(value)

deserialize_feather

deserialize_feather(value: bytes) -> pa.Table

Deserialize a PyArrow table from Feather.

Source code in src/artigraph/extras/pyarrow.py
def deserialize_feather(self, value: bytes) -> pa.Table:
    """Deserialize a PyArrow table from Feather."""
    buffer = BytesIO(value)
    return feather.read_table(buffer)

deserialize_parquet

deserialize_parquet(value: bytes) -> pa.Table

Deserialize a PyArrow table from Parquet.

Source code in src/artigraph/extras/pyarrow.py
def deserialize_parquet(self, value: bytes) -> pa.Table:
    """Deserialize a PyArrow table from Parquet."""
    buffer = BytesIO(value)
    return parquet.read_table(buffer)

serialize

serialize(value: pa.Table) -> bytes

Serialize a PyArrow table.

Source code in src/artigraph/extras/pyarrow.py
def serialize(self, value: pa.Table) -> bytes:
    """Serialize a PyArrow table."""
    if not isinstance(value, pa.Table):
        msg = f"Expected a PyArrow table, got {type(value)}"
        raise TypeError(msg)
    method = getattr(self, f"serialize_{self.file_format}")
    return method(value)

serialize_feather

serialize_feather(value: pa.Table) -> bytes

Serialize a PyArrow table to Feather.

Source code in src/artigraph/extras/pyarrow.py
def serialize_feather(self, value: pa.Table) -> bytes:
    """Serialize a PyArrow table to Feather."""
    buffer = BytesIO()
    feather.write_feather(value, buffer)
    return buffer.getvalue()

serialize_parquet

serialize_parquet(value: pa.Table) -> bytes

Serialize a PyArrow table to Parquet.

Source code in src/artigraph/extras/pyarrow.py
def serialize_parquet(self, value: pa.Table) -> bytes:
    """Serialize a PyArrow table to Parquet."""
    buffer = BytesIO()
    parquet.write_table(value, buffer)
    return buffer.getvalue()

PydanticModel

Bases: GraphModel, BaseModel

A base for all artifacts modeled with Pydantic.

Source code in src/artigraph/extras/pydantic.py
class PydanticModel(GraphModel, _BaseModel, version=1):
    """A base for all artifacts modeled with Pydantic."""

    graph_id: UUID = Field(default_factory=uuid1, exclude=True)
    """The unique ID of this model."""

    def graph_model_data(self) -> ModelData:
        # Only fields not marked exclude participate in the graph data.
        included_fields = [
            field_name
            for field_name, field_info in self.model_fields.items()
            if not field_info.exclude
        ]
        return get_annotated_model_data(self, included_fields)

    @classmethod
    def graph_model_init(cls, info: ModelInfo, kwargs: dict[str, Any]) -> Self:
        """Reconstruct an instance from stored graph info and field values."""
        return cls(graph_id=info.graph_id, **kwargs)

graph_id class-attribute instance-attribute

graph_id: UUID = Field(default_factory=uuid1, exclude=True)

The unique ID of this model.