Source code for gestalt._providers

"""Provider base classes for non-integration Gestalt runtimes.

The generated request and response protobuf messages for authentication and
catalog data remain available through the public :mod:`gestalt` package, but
these helpers document the handwritten provider interfaces that wrap those
messages.
"""

from __future__ import annotations

import datetime as dt
from enum import Enum
from typing import TYPE_CHECKING, Any, Callable, Protocol, runtime_checkable

if TYPE_CHECKING:
    from ._cache import CacheEntry


[docs] class ProviderKind(str, Enum): """Runtime kinds supported by the Python SDK.""" INTEGRATION = "integration" AUTHENTICATION = "authentication" CACHE = "cache" S3 = "s3" AGENT = "agent" WORKFLOW = "workflow" SECRETS = "secrets" TELEMETRY = "telemetry"
[docs] class ProviderMetadata: """Descriptive metadata returned by :class:`MetadataProvider`.""" __slots__ = ("kind", "name", "display_name", "description", "version") def __init__( self, kind: ProviderKind | str, name: str = "", display_name: str = "", description: str = "", version: str = "", ) -> None: self.kind = kind self.name = name self.display_name = display_name self.description = description self.version = version
[docs] class PluginProvider: """Base interface shared by provider-style runtimes."""
[docs] def configure(self, name: str, config: dict[str, Any]) -> None: """Apply the host-provided provider name and parsed configuration.""" pass
[docs] class MetadataProvider: """Optional mixin for providers that expose descriptive metadata."""
[docs] def metadata(self) -> ProviderMetadata: """Return metadata for the running provider instance.""" raise NotImplementedError
[docs] class HealthChecker: """Optional mixin for providers that support health checks."""
[docs] def health_check(self) -> None: """Raise if the provider is unhealthy.""" raise NotImplementedError
@runtime_checkable class Starter(Protocol): """Optional mixin for providers with an explicit post-configure start phase.""" def start(self) -> None: """Start provider-owned background work after host readiness.""" ...
[docs] class WarningsProvider: """Optional mixin for providers that emit startup warnings."""
[docs] def warnings(self) -> list[str]: """Return human-readable warnings for the host to surface.""" raise NotImplementedError
[docs] class Closer: """Optional mixin for providers with explicit shutdown work."""
[docs] def close(self) -> None: """Release any provider resources before the process exits.""" raise NotImplementedError
RegisterServices = Callable[[Any, PluginProvider], None]
[docs] class PluginProviderAdapter: """Wrap a provider and registration callback for integration runtimes.""" __slots__ = ("kind", "provider", "register_services") def __init__( self, kind: ProviderKind | str, provider: PluginProvider, register_services: RegisterServices, ) -> None: self.kind = kind self.provider = provider self.register_services = register_services
[docs] def serve(self) -> None: """Start the provider's gRPC runtime.""" from . import _runtime _runtime.serve(self)
[docs] class AuthenticationProvider(PluginProvider): """Base class for authentication providers."""
[docs] def begin_login(self, request: Any) -> Any: """Begin an interactive login flow.""" raise NotImplementedError
[docs] def complete_login(self, request: Any) -> Any: """Complete an interactive login flow.""" raise NotImplementedError
[docs] def serve(self) -> None: """Start the authentication runtime.""" from . import _runtime _runtime.serve(self, runtime_kind=ProviderKind.AUTHENTICATION)
[docs] class ExternalTokenValidator: """Optional mixin for providers that validate external bearer tokens."""
[docs] def validate_external_token(self, token: str) -> Any: """Validate a bearer token and return the authenticated subject.""" raise NotImplementedError
[docs] class SessionTTLProvider: """Optional mixin for providers that control session lifetimes."""
[docs] def session_ttl(self) -> dt.timedelta: """Return the requested session time-to-live.""" raise NotImplementedError
[docs] class SecretsProvider(PluginProvider): """Base class for secret-provider runtimes."""
[docs] def get_secret(self, name: str) -> str: """Return a secret value by name.""" raise NotImplementedError
[docs] def serve(self) -> None: """Start the secrets runtime.""" from . import _runtime _runtime.serve(self, runtime_kind=ProviderKind.SECRETS)
[docs] class CacheProvider(PluginProvider): """Base class for cache-provider runtimes."""
[docs] def get(self, key: str) -> bytes | None: """Return a cached value or ``None`` if the key is missing.""" raise NotImplementedError
[docs] def get_many(self, keys: list[str]) -> dict[str, bytes]: """Return the subset of ``keys`` that currently exist.""" values: dict[str, bytes] = {} for key in keys: value = self.get(key) if value is not None: values[key] = bytes(value) return values
[docs] def set(self, key: str, value: bytes, ttl: dt.timedelta | None = None) -> None: """Store ``value`` for ``key`` with an optional time-to-live.""" raise NotImplementedError
[docs] def set_many( self, entries: list[CacheEntry], ttl: dt.timedelta | None = None ) -> None: """Store multiple cache entries using repeated :meth:`set` calls.""" for entry in entries: self.set(entry.key, entry.value, ttl)
[docs] def delete(self, key: str) -> bool: """Delete a cache entry and report whether it existed.""" raise NotImplementedError
[docs] def delete_many(self, keys: list[str]) -> int: """Delete a batch of cache keys and return the number removed.""" deleted = 0 seen: set[str] = set() for key in keys: if key in seen: continue seen.add(key) if self.delete(key): deleted += 1 return deleted
[docs] def touch(self, key: str, ttl: dt.timedelta) -> bool: """Refresh the TTL for an existing key.""" raise NotImplementedError
[docs] def serve(self) -> None: """Start the cache runtime.""" from . import _runtime _runtime.serve(self, runtime_kind=ProviderKind.CACHE)
[docs] class S3Provider(PluginProvider): """Base class for S3-compatible object store runtimes."""
[docs] def serve(self) -> None: """Start the S3 runtime.""" from . import _runtime _runtime.serve(self, runtime_kind=ProviderKind.S3)
class AgentProvider(PluginProvider): """Base class for agent-provider runtimes.""" def serve(self) -> None: """Start the agent runtime.""" from . import _runtime _runtime.serve(self, runtime_kind=ProviderKind.AGENT) class WorkflowProvider(PluginProvider): def serve(self) -> None: from . import _runtime _runtime.serve(self, runtime_kind=ProviderKind.WORKFLOW)