Skip to Content

Workflow

This page covers building custom workflow providers. If you only need to configure workflow storage for a deployment, use Providers > Workflow.

Workflow providers back global workflow runs, schedules, and triggers. Gestalt uses them to persist workflow state and to call back into target plugin operations through the workflow host socket.

This runtime is not a declarative DAG planner. Gestalt does not hand a workflow provider a graph of named steps. A provider stores schedules, triggers, runs, and events, then asks Gestalt to invoke one explicit target operation at a time. Larger flows are composed above this layer by publishing events and attaching more triggers to those events.

Manifest

A workflow provider manifest declares kind: workflow:

kind: workflow source: github.com/your-org/workflow/example version: 0.0.1 displayName: Example Workflow description: Workflow provider with custom run and schedule storage. spec: configSchemaPath: ./workflow_config.json

Provider interface

The workflow service includes:

MethodPurpose
StartRun / GetRun / ListRuns / CancelRunManage workflow runs
UpsertSchedule / GetSchedule / ListSchedules / DeleteSchedule / PauseSchedule / ResumeScheduleManage schedules
UpsertEventTrigger / GetEventTrigger / ListEventTriggers / DeleteEventTrigger / PauseEventTrigger / ResumeEventTriggerManage triggers
PublishEventDeliver an event into the provider

The examples below show the core methods. The remaining methods follow the same request/response shape.

package exampleworkflow import ( "context" gestalt "github.com/valon-technologies/gestalt/sdk/go" proto "github.com/valon-technologies/gestalt/sdk/go/gen/v1" ) type Provider struct{} func New() *Provider { return &Provider{} } func (p *Provider) Configure(_ context.Context, _ string, config map[string]any) error { return nil } func (p *Provider) StartRun(_ context.Context, req *proto.StartWorkflowProviderRunRequest) (*proto.BoundWorkflowRun, error) { return &proto.BoundWorkflowRun{ Id: req.GetTarget().GetPlugin().GetPluginName() + ":run-1", Status: proto.WorkflowRunStatus_WORKFLOW_RUN_STATUS_PENDING, Target: req.GetTarget(), }, nil } func (p *Provider) GetRun(_ context.Context, req *proto.GetWorkflowProviderRunRequest) (*proto.BoundWorkflowRun, error) { return &proto.BoundWorkflowRun{ Id: req.GetRunId(), }, nil } func (p *Provider) ListRuns(context.Context, *proto.ListWorkflowProviderRunsRequest) (*proto.ListWorkflowProviderRunsResponse, error) { return &proto.ListWorkflowProviderRunsResponse{}, nil } func (p *Provider) CancelRun(_ context.Context, req *proto.CancelWorkflowProviderRunRequest) (*proto.BoundWorkflowRun, error) { return &proto.BoundWorkflowRun{ Id: req.GetRunId(), Status: proto.WorkflowRunStatus_WORKFLOW_RUN_STATUS_CANCELED, }, nil } func (p *Provider) UpsertSchedule(_ context.Context, req *proto.UpsertWorkflowProviderScheduleRequest) (*proto.BoundWorkflowSchedule, error) { return &proto.BoundWorkflowSchedule{ Id: req.GetScheduleId(), Cron: req.GetCron(), Target: req.GetTarget(), Paused: req.GetPaused(), }, nil } func (p *Provider) GetSchedule(_ context.Context, req *proto.GetWorkflowProviderScheduleRequest) (*proto.BoundWorkflowSchedule, error) { return &proto.BoundWorkflowSchedule{Id: req.GetScheduleId()}, nil } func (p *Provider) ListSchedules(context.Context, *proto.ListWorkflowProviderSchedulesRequest) (*proto.ListWorkflowProviderSchedulesResponse, error) { return &proto.ListWorkflowProviderSchedulesResponse{}, nil } func (p *Provider) DeleteSchedule(context.Context, *proto.DeleteWorkflowProviderScheduleRequest) error { return nil } func (p *Provider) PauseSchedule(_ context.Context, req *proto.PauseWorkflowProviderScheduleRequest) (*proto.BoundWorkflowSchedule, error) { return &proto.BoundWorkflowSchedule{Id: req.GetScheduleId(), Paused: true}, nil } func (p *Provider) ResumeSchedule(_ context.Context, req *proto.ResumeWorkflowProviderScheduleRequest) (*proto.BoundWorkflowSchedule, error) { return &proto.BoundWorkflowSchedule{Id: req.GetScheduleId(), Paused: false}, nil } func (p *Provider) UpsertEventTrigger(_ context.Context, req *proto.UpsertWorkflowProviderEventTriggerRequest) (*proto.BoundWorkflowEventTrigger, error) { return &proto.BoundWorkflowEventTrigger{ Id: req.GetTriggerId(), Match: req.GetMatch(), Target: req.GetTarget(), Paused: req.GetPaused(), }, nil } func (p *Provider) GetEventTrigger(_ context.Context, req *proto.GetWorkflowProviderEventTriggerRequest) (*proto.BoundWorkflowEventTrigger, error) { return &proto.BoundWorkflowEventTrigger{Id: req.GetTriggerId()}, nil } func (p *Provider) ListEventTriggers(context.Context, *proto.ListWorkflowProviderEventTriggersRequest) (*proto.ListWorkflowProviderEventTriggersResponse, error) { return &proto.ListWorkflowProviderEventTriggersResponse{}, nil } func (p *Provider) DeleteEventTrigger(context.Context, *proto.DeleteWorkflowProviderEventTriggerRequest) error { return nil } func (p *Provider) PauseEventTrigger(_ context.Context, req *proto.PauseWorkflowProviderEventTriggerRequest) (*proto.BoundWorkflowEventTrigger, error) { return &proto.BoundWorkflowEventTrigger{Id: req.GetTriggerId(), Paused: true}, nil } func (p *Provider) ResumeEventTrigger(_ context.Context, req *proto.ResumeWorkflowProviderEventTriggerRequest) (*proto.BoundWorkflowEventTrigger, error) { return &proto.BoundWorkflowEventTrigger{Id: req.GetTriggerId(), Paused: false}, nil } func (p *Provider) PublishEvent(context.Context, *proto.PublishWorkflowProviderEventRequest) error { return nil } func main() { if err := gestalt.ServeWorkflowProvider(context.Background(), New()); err != nil { panic(err) } }

Workflow host

Workflow providers call back into Gestalt through the workflow host socket to invoke target plugin operations.

host, err := gestalt.WorkflowHost() if err != nil { return nil, err } defer host.Close() resp, err := host.InvokeOperation(ctx, &proto.InvokeWorkflowOperationRequest{ Target: &proto.BoundWorkflowTarget{ Plugin: &proto.BoundWorkflowPluginTarget{ PluginName: "roadmap", Operation: "sync_items", }, }, RunId: "run-1", })

Config reference

To use a workflow provider in a deployment, configure it under providers.workflow, then reference it from top-level workflows: or the global workflow API/CLI:

providers: indexeddb: workflow_state: source: https://artifacts.example.com/indexeddb/relationaldb/v0.0.1-alpha.1/provider-release.yaml config: dsn: ${DATABASE_URL} workflow: local: source: ./workflow/manifest.yaml indexeddb: provider: workflow_state db: workflow config: pollInterval: 1s workflows: schedules: nightly_sync: provider: local cron: "0 3 * * *" timezone: America/New_York target: plugin: name: roadmap operation: sync_items

providers.workflow.<name>.indexeddb is optional. When present, Gestalt exposes a host IndexedDB binding to the workflow provider over the usual IndexedDB SDK socket.