"""Provider base classes for non-integration Gestalt runtimes.
The generated request and response protobuf messages for authentication and
catalog data remain available through the public :mod:`gestalt` package, but
these helpers document the handwritten provider interfaces that wrap those
messages.
"""
from __future__ import annotations
import datetime as dt
from enum import Enum
from typing import TYPE_CHECKING, Any, Callable, Protocol, runtime_checkable
if TYPE_CHECKING:
from ._cache import CacheEntry
[docs]
class ProviderKind(str, Enum):
    """Enumerate the runtime kinds the Python SDK can serve.

    Members also subclass :class:`str`, so each one compares equal to its
    plain string value (for example ``ProviderKind.CACHE == "cache"``).
    """

    # The base integration runtime.
    INTEGRATION = "integration"

    # Provider-style runtimes.
    AUTHENTICATION = "authentication"
    CACHE = "cache"
    S3 = "s3"
    AGENT = "agent"
    WORKFLOW = "workflow"
    SECRETS = "secrets"
    TELEMETRY = "telemetry"
[docs]
class PluginProvider:
    """Common root type for provider-style runtimes.

    The class defines no behaviour of its own; concrete provider base classes
    derive from it so they can be handled through one shared type.
    """
[docs]
class HealthChecker:
    """Opt-in mixin for providers that can report their own health."""

    def health_check(self) -> None:
        """Signal an unhealthy provider by raising; return normally otherwise.

        Raises:
            NotImplementedError: Always, in this base implementation.
                Subclasses override the method with a real check.
        """
        raise NotImplementedError
@runtime_checkable
class Starter(Protocol):
    """Structural type for providers with a post-configure start phase.

    The protocol is runtime-checkable, so ``isinstance(obj, Starter)`` is true
    for any object that exposes a ``start`` attribute.
    """

    def start(self) -> None:
        """Begin provider-owned background work once the host is ready."""
        ...
[docs]
class WarningsProvider:
    """Opt-in mixin for providers that produce startup warnings."""

    def warnings(self) -> list[str]:
        """Return human-readable warnings for the host to surface.

        Raises:
            NotImplementedError: Always, in this base implementation.
                Subclasses override the method to supply the warnings.
        """
        raise NotImplementedError
[docs]
class Closer:
    """Opt-in mixin for providers that need explicit shutdown work."""

    def close(self) -> None:
        """Release provider-held resources before the process exits.

        Raises:
            NotImplementedError: Always, in this base implementation.
                Subclasses override the method with their cleanup logic.
        """
        raise NotImplementedError
# Signature of the registration callback held by PluginProviderAdapter: it
# takes an opaque first argument (presumably the gRPC server being assembled
# -- TODO confirm against the runtime) plus the provider instance, and
# returns nothing.
RegisterServices = Callable[[Any, PluginProvider], None]
[docs]
class PluginProviderAdapter:
    """Pair a provider with its service-registration callback.

    Instances are plain records: ``kind`` is the runtime kind (an enum member
    or raw string, stored as given), ``provider`` is the wrapped provider, and
    ``register_services`` is the callback stored alongside it.
    """

    # Fixed attribute set; instances carry no per-instance ``__dict__``.
    __slots__ = ("kind", "provider", "register_services")

    def __init__(
        self,
        kind: ProviderKind | str,
        provider: PluginProvider,
        register_services: RegisterServices,
    ) -> None:
        """Store the three constructor arguments unchanged."""
        self.register_services = register_services
        self.provider = provider
        self.kind = kind

    def serve(self) -> None:
        """Start the provider's gRPC runtime by passing this adapter to it."""
        # The runtime module is imported at call time, not at module import.
        from . import _runtime

        _runtime.serve(self)
[docs]
class AuthenticationProvider(PluginProvider):
    """Base class that authentication providers derive from.

    Subclasses override :meth:`begin_login` and :meth:`complete_login`; the
    base versions only raise :class:`NotImplementedError`.
    """

    def begin_login(self, request: Any) -> Any:
        """Begin an interactive login flow for ``request``.

        Raises:
            NotImplementedError: Always, until a subclass overrides it.
        """
        raise NotImplementedError

    def complete_login(self, request: Any) -> Any:
        """Complete an interactive login flow for ``request``.

        Raises:
            NotImplementedError: Always, until a subclass overrides it.
        """
        raise NotImplementedError

    def serve(self) -> None:
        """Start the authentication runtime with this provider."""
        # The runtime module is imported at call time, not at module import.
        from . import _runtime

        _runtime.serve(self, runtime_kind=ProviderKind.AUTHENTICATION)
[docs]
class ExternalTokenValidator:
    """Opt-in mixin for providers that can validate external bearer tokens."""

    def validate_external_token(self, token: str) -> Any:
        """Validate ``token`` and return the authenticated subject.

        Raises:
            NotImplementedError: Always, in this base implementation.
                Subclasses override the method with real validation.
        """
        raise NotImplementedError
[docs]
class SessionTTLProvider:
    """Opt-in mixin for providers that decide how long sessions live."""

    def session_ttl(self) -> dt.timedelta:
        """Return the session time-to-live this provider requests.

        Raises:
            NotImplementedError: Always, in this base implementation.
                Subclasses override the method to supply a duration.
        """
        raise NotImplementedError
[docs]
class SecretsProvider(PluginProvider):
    """Base class that secret-provider runtimes derive from."""

    def get_secret(self, name: str) -> str:
        """Look up the secret called ``name`` and return its value.

        Raises:
            NotImplementedError: Always, until a subclass overrides it.
        """
        raise NotImplementedError

    def serve(self) -> None:
        """Start the secrets runtime with this provider."""
        # The runtime module is imported at call time, not at module import.
        from . import _runtime

        _runtime.serve(self, runtime_kind=ProviderKind.SECRETS)
[docs]
class CacheProvider(PluginProvider):
    """Base class that cache-provider runtimes derive from.

    The single-key operations (:meth:`get`, :meth:`set`, :meth:`delete`,
    :meth:`touch`) raise :class:`NotImplementedError` here and are meant to be
    overridden. The ``*_many`` helpers have default implementations that loop
    over the matching single-key operation.
    """

    def get(self, key: str) -> bytes | None:
        """Return the value cached under ``key``, or ``None`` if it is absent.

        Raises:
            NotImplementedError: Always, until a subclass overrides it.
        """
        raise NotImplementedError

    def get_many(self, keys: list[str]) -> dict[str, bytes]:
        """Return a mapping holding only the ``keys`` that currently exist.

        Each key is looked up with :meth:`get`, in the order given; keys whose
        lookup returns ``None`` are left out of the result.
        """
        found: dict[str, bytes] = {}
        for name in keys:
            payload = self.get(name)
            if payload is None:
                continue
            # Coerce to ``bytes`` so the result always holds real bytes objects.
            found[name] = bytes(payload)
        return found

    def set(self, key: str, value: bytes, ttl: dt.timedelta | None = None) -> None:
        """Store ``value`` under ``key`` with an optional time-to-live.

        Raises:
            NotImplementedError: Always, until a subclass overrides it.
        """
        raise NotImplementedError

    def set_many(
        self, entries: list[CacheEntry], ttl: dt.timedelta | None = None
    ) -> None:
        """Store every entry with one :meth:`set` call each.

        The same ``ttl`` is forwarded for every entry.
        """
        for item in entries:
            self.set(item.key, item.value, ttl)

    def delete(self, key: str) -> bool:
        """Remove ``key`` and report whether it was present.

        Raises:
            NotImplementedError: Always, until a subclass overrides it.
        """
        raise NotImplementedError

    def delete_many(self, keys: list[str]) -> int:
        """Delete a batch of keys and return how many were removed.

        Repeated keys are passed to :meth:`delete` only once, at their first
        occurrence.
        """
        removed = 0
        visited: set[str] = set()
        for name in keys:
            if name not in visited:
                visited.add(name)
                if self.delete(name):
                    removed += 1
        return removed

    def touch(self, key: str, ttl: dt.timedelta) -> bool:
        """Refresh the time-to-live of an existing ``key``.

        Raises:
            NotImplementedError: Always, until a subclass overrides it.
        """
        raise NotImplementedError

    def serve(self) -> None:
        """Start the cache runtime with this provider."""
        # The runtime module is imported at call time, not at module import.
        from . import _runtime

        _runtime.serve(self, runtime_kind=ProviderKind.CACHE)
[docs]
class S3Provider(PluginProvider):
    """Base class that S3-compatible object store runtimes derive from."""

    def serve(self) -> None:
        """Start the S3 runtime with this provider."""
        # The runtime module is imported at call time, not at module import.
        from . import _runtime

        _runtime.serve(self, runtime_kind=ProviderKind.S3)
class AgentProvider(PluginProvider):
    """Base class that agent-provider runtimes derive from."""

    def serve(self) -> None:
        """Start the agent runtime with this provider."""
        # The runtime module is imported at call time, not at module import.
        from . import _runtime

        _runtime.serve(self, runtime_kind=ProviderKind.AGENT)
class WorkflowProvider(PluginProvider):
    """Base class for workflow-provider runtimes."""

    def serve(self) -> None:
        """Start the workflow runtime."""
        # Imported at call time, matching the other providers' ``serve``.
        from . import _runtime

        _runtime.serve(self, runtime_kind=ProviderKind.WORKFLOW)