Workflow
This page covers building custom workflow providers. If you only need to configure workflow storage for a deployment, use Providers > Workflow.
Workflow providers back global workflow runs, schedules, and triggers. Gestalt uses them to persist workflow state and to call back into target plugin operations through the workflow host socket.
This runtime is not a declarative DAG planner. Gestalt does not hand a workflow provider a graph of named steps. A provider stores schedules, triggers, runs, and events, then asks Gestalt to invoke one explicit target operation at a time. Larger flows are composed above this layer by publishing events and attaching more triggers to those events.
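The composition model described above can be sketched without the SDK. The following is a minimal, self-contained illustration, not Gestalt's implementation: Target, Trigger, Invoker, and Dispatch are hypothetical names, and the event types and plugin operations are made up. Each matching trigger causes exactly one target operation to be invoked, and any event that operation publishes is queued for the next round of matching.

```go
package main

import "fmt"

// Target names one explicit plugin operation (hypothetical stand-in for the
// SDK's bound target type).
type Target struct {
	Plugin    string
	Operation string
}

// Trigger fires its target whenever an event of the matching type arrives.
type Trigger struct {
	EventType string
	Target    Target
}

// Invoker stands in for the call back into Gestalt. It returns the types of
// any follow-up events that the invoked operation published.
type Invoker func(t Target) []string

// Dispatch delivers one event, invokes each matching trigger's target one at
// a time, and repeats for follow-up events until the queue is empty.
func Dispatch(triggers []Trigger, invoke Invoker, eventType string) []Target {
	var invoked []Target
	queue := []string{eventType}
	for len(queue) > 0 {
		ev := queue[0]
		queue = queue[1:]
		for _, tr := range triggers {
			if tr.EventType != ev {
				continue
			}
			invoked = append(invoked, tr.Target)
			queue = append(queue, invoke(tr.Target)...)
		}
	}
	return invoked
}

func main() {
	triggers := []Trigger{
		{EventType: "roadmap.synced", Target: Target{Plugin: "reports", Operation: "rebuild"}},
		{EventType: "reports.rebuilt", Target: Target{Plugin: "notify", Operation: "send_digest"}},
	}
	// Hypothetical operations: only "rebuild" publishes a follow-up event.
	invoke := func(t Target) []string {
		if t.Operation == "rebuild" {
			return []string{"reports.rebuilt"}
		}
		return nil
	}
	for _, t := range Dispatch(triggers, invoke, "roadmap.synced") {
		fmt.Printf("%s.%s\n", t.Plugin, t.Operation)
	}
	// Prints:
	// reports.rebuild
	// notify.send_digest
}
```

The sketch has no cycle detection, so triggers that publish each other's events would loop forever. A real provider would also persist the queue and the resulting runs rather than hold them in memory.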
Manifest
A workflow provider manifest declares kind: workflow:
kind: workflow
source: github.com/your-org/workflow/example
version: 0.0.1
displayName: Example Workflow
description: Workflow provider with custom run and schedule storage.
spec:
  configSchemaPath: ./workflow_config.json

Provider interface
The workflow service includes:
| Method | Purpose |
|---|---|
| StartRun / GetRun / ListRuns / CancelRun | Manage workflow runs |
| UpsertSchedule / GetSchedule / ListSchedules / DeleteSchedule / PauseSchedule / ResumeSchedule | Manage schedules |
| UpsertEventTrigger / GetEventTrigger / ListEventTriggers / DeleteEventTrigger / PauseEventTrigger / ResumeEventTrigger | Manage triggers |
| PublishEvent | Deliver an event into the provider |
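As an illustration of what backing the run methods in the table might involve, here is a minimal in-memory sketch. It deliberately avoids the SDK's generated types: Run, RunStatus, RunStore, and ErrRunNotFound are hypothetical names, and the ID format is an assumption. A production provider would persist this state in a database instead of a map.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
	"sync"
)

// RunStatus is a hypothetical, simplified run state.
type RunStatus string

const (
	StatusPending  RunStatus = "pending"
	StatusCanceled RunStatus = "canceled"
)

// Run is a simplified stand-in for a stored workflow run.
type Run struct {
	ID     string
	Target string
	Status RunStatus
}

// ErrRunNotFound is returned when no run has the requested ID.
var ErrRunNotFound = errors.New("run not found")

// RunStore keeps runs in memory behind a mutex.
type RunStore struct {
	mu   sync.Mutex
	next int
	runs map[string]Run
}

func NewRunStore() *RunStore { return &RunStore{runs: make(map[string]Run)} }

// StartRun records a new pending run and assigns it a unique ID.
func (s *RunStore) StartRun(target string) Run {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.next++
	run := Run{ID: fmt.Sprintf("%s:run-%d", target, s.next), Target: target, Status: StatusPending}
	s.runs[run.ID] = run
	return run
}

// GetRun looks up a run by ID.
func (s *RunStore) GetRun(id string) (Run, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	run, ok := s.runs[id]
	if !ok {
		return Run{}, ErrRunNotFound
	}
	return run, nil
}

// ListRuns returns all runs sorted by ID so the output is deterministic.
func (s *RunStore) ListRuns() []Run {
	s.mu.Lock()
	defer s.mu.Unlock()
	out := make([]Run, 0, len(s.runs))
	for _, run := range s.runs {
		out = append(out, run)
	}
	sort.Slice(out, func(i, j int) bool { return out[i].ID < out[j].ID })
	return out
}

// CancelRun marks a run as canceled and returns the updated record.
func (s *RunStore) CancelRun(id string) (Run, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	run, ok := s.runs[id]
	if !ok {
		return Run{}, ErrRunNotFound
	}
	run.Status = StatusCanceled
	s.runs[id] = run
	return run, nil
}

func main() {
	store := NewRunStore()
	first := store.StartRun("roadmap")
	store.StartRun("roadmap")
	if _, err := store.CancelRun(first.ID); err != nil {
		panic(err)
	}
	for _, run := range store.ListRuns() {
		fmt.Println(run.ID, run.Status)
	}
	// Prints:
	// roadmap:run-1 canceled
	// roadmap:run-2 pending
}
```

Schedules and event triggers could follow the same pattern, with a Paused flag in place of the status field.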
The examples below show the core methods. The remaining methods follow the same request/response shape.
Go
package exampleworkflow

import (
	"context"

	gestalt "github.com/valon-technologies/gestalt/sdk/go"
	proto "github.com/valon-technologies/gestalt/sdk/go/gen/v1"
)

// Provider is a stub workflow provider. Every method echoes request fields
// back without storing anything; a real provider would persist runs,
// schedules, and triggers.
type Provider struct{}

func New() *Provider { return &Provider{} }

// Configure receives the provider's config block. This stub ignores it.
func (p *Provider) Configure(_ context.Context, _ string, config map[string]any) error {
	return nil
}

// Runs.

func (p *Provider) StartRun(_ context.Context, req *proto.StartWorkflowProviderRunRequest) (*proto.BoundWorkflowRun, error) {
	return &proto.BoundWorkflowRun{
		Id:     req.GetTarget().GetPlugin().GetPluginName() + ":run-1",
		Status: proto.WorkflowRunStatus_WORKFLOW_RUN_STATUS_PENDING,
		Target: req.GetTarget(),
	}, nil
}

func (p *Provider) GetRun(_ context.Context, req *proto.GetWorkflowProviderRunRequest) (*proto.BoundWorkflowRun, error) {
	return &proto.BoundWorkflowRun{
		Id: req.GetRunId(),
	}, nil
}

func (p *Provider) ListRuns(context.Context, *proto.ListWorkflowProviderRunsRequest) (*proto.ListWorkflowProviderRunsResponse, error) {
	return &proto.ListWorkflowProviderRunsResponse{}, nil
}

func (p *Provider) CancelRun(_ context.Context, req *proto.CancelWorkflowProviderRunRequest) (*proto.BoundWorkflowRun, error) {
	return &proto.BoundWorkflowRun{
		Id:     req.GetRunId(),
		Status: proto.WorkflowRunStatus_WORKFLOW_RUN_STATUS_CANCELED,
	}, nil
}

// Schedules.

func (p *Provider) UpsertSchedule(_ context.Context, req *proto.UpsertWorkflowProviderScheduleRequest) (*proto.BoundWorkflowSchedule, error) {
	return &proto.BoundWorkflowSchedule{
		Id:     req.GetScheduleId(),
		Cron:   req.GetCron(),
		Target: req.GetTarget(),
		Paused: req.GetPaused(),
	}, nil
}

func (p *Provider) GetSchedule(_ context.Context, req *proto.GetWorkflowProviderScheduleRequest) (*proto.BoundWorkflowSchedule, error) {
	return &proto.BoundWorkflowSchedule{Id: req.GetScheduleId()}, nil
}

func (p *Provider) ListSchedules(context.Context, *proto.ListWorkflowProviderSchedulesRequest) (*proto.ListWorkflowProviderSchedulesResponse, error) {
	return &proto.ListWorkflowProviderSchedulesResponse{}, nil
}

func (p *Provider) DeleteSchedule(context.Context, *proto.DeleteWorkflowProviderScheduleRequest) error {
	return nil
}

func (p *Provider) PauseSchedule(_ context.Context, req *proto.PauseWorkflowProviderScheduleRequest) (*proto.BoundWorkflowSchedule, error) {
	return &proto.BoundWorkflowSchedule{Id: req.GetScheduleId(), Paused: true}, nil
}

func (p *Provider) ResumeSchedule(_ context.Context, req *proto.ResumeWorkflowProviderScheduleRequest) (*proto.BoundWorkflowSchedule, error) {
	return &proto.BoundWorkflowSchedule{Id: req.GetScheduleId(), Paused: false}, nil
}

// Event triggers.

func (p *Provider) UpsertEventTrigger(_ context.Context, req *proto.UpsertWorkflowProviderEventTriggerRequest) (*proto.BoundWorkflowEventTrigger, error) {
	return &proto.BoundWorkflowEventTrigger{
		Id:     req.GetTriggerId(),
		Match:  req.GetMatch(),
		Target: req.GetTarget(),
		Paused: req.GetPaused(),
	}, nil
}

func (p *Provider) GetEventTrigger(_ context.Context, req *proto.GetWorkflowProviderEventTriggerRequest) (*proto.BoundWorkflowEventTrigger, error) {
	return &proto.BoundWorkflowEventTrigger{Id: req.GetTriggerId()}, nil
}

func (p *Provider) ListEventTriggers(context.Context, *proto.ListWorkflowProviderEventTriggersRequest) (*proto.ListWorkflowProviderEventTriggersResponse, error) {
	return &proto.ListWorkflowProviderEventTriggersResponse{}, nil
}

func (p *Provider) DeleteEventTrigger(context.Context, *proto.DeleteWorkflowProviderEventTriggerRequest) error {
	return nil
}

func (p *Provider) PauseEventTrigger(_ context.Context, req *proto.PauseWorkflowProviderEventTriggerRequest) (*proto.BoundWorkflowEventTrigger, error) {
	return &proto.BoundWorkflowEventTrigger{Id: req.GetTriggerId(), Paused: true}, nil
}

func (p *Provider) ResumeEventTrigger(_ context.Context, req *proto.ResumeWorkflowProviderEventTriggerRequest) (*proto.BoundWorkflowEventTrigger, error) {
	return &proto.BoundWorkflowEventTrigger{Id: req.GetTriggerId(), Paused: false}, nil
}

// Events.

func (p *Provider) PublishEvent(context.Context, *proto.PublishWorkflowProviderEventRequest) error {
	return nil
}

func main() {
	if err := gestalt.ServeWorkflowProvider(context.Background(), New()); err != nil {
		panic(err)
	}
}

Workflow host
Workflow providers call back into Gestalt through the workflow host socket to invoke target plugin operations.
Go
host, err := gestalt.WorkflowHost()
if err != nil {
	return nil, err
}
defer host.Close()

resp, err := host.InvokeOperation(ctx, &proto.InvokeWorkflowOperationRequest{
	Target: &proto.BoundWorkflowTarget{
		Plugin: &proto.BoundWorkflowPluginTarget{
			PluginName: "roadmap",
			Operation:  "sync_items",
		},
	},
	RunId: "run-1",
})

Config reference
To use a workflow provider in a deployment, configure it under
providers.workflow, then reference it from the top-level workflows: block or
through the global workflow API/CLI:
providers:
  indexeddb:
    workflow_state:
      source: https://artifacts.example.com/indexeddb/relationaldb/v0.0.1-alpha.1/provider-release.yaml
      config:
        dsn: ${DATABASE_URL}
  workflow:
    local:
      source: ./workflow/manifest.yaml
      indexeddb:
        provider: workflow_state
        db: workflow
      config:
        pollInterval: 1s
workflows:
  schedules:
    nightly_sync:
      provider: local
      cron: "0 3 * * *"
      timezone: America/New_York
      target:
        plugin:
          name: roadmap
          operation: sync_items

providers.workflow.<name>.indexeddb is optional. When present, Gestalt
exposes a host IndexedDB binding to the workflow provider over the usual
IndexedDB SDK socket.
What to read next
- Providers > Workflow: configuring workflow providers
- Releasing: packaging and publishing provider releases
- Built-in Providers: first-party workflow provider reference