Orchestrations¶
Overview¶
Orchestrations are multi-step workflows that coordinate complex business processes. Unlike single compute functions that react to one event, orchestrations chain multiple steps together with conditional logic, delays, and decision branches.
What are Orchestrations?¶
Orchestrations allow you to: - Chain multiple steps: Execute a series of compute functions in sequence - Add conditional logic: Branch workflows based on data conditions - Introduce delays: Wait for specific durations between steps - React to events: Trigger workflows from record changes, schedules, or HTTP requests - Handle complex flows: Build approval workflows, multi-stage processing, and more
Key Features¶
Step Types¶
| Step Type | Description | Use Case |
|---|---|---|
| Compute | Execute a compute function | Data processing, API calls, calculations |
| Decision | Branch based on conditions | Route workflows based on data values |
| Delay | Wait for a duration | Rate limiting, scheduled follow-ups |
Trigger Types¶
| Trigger | Description | Example |
|---|---|---|
| Event-Driven | React to record events | Process new orders when created |
| Scheduled | Run on a schedule | Daily reports, cleanup jobs |
| On-Demand | Manual invocation | Ad-hoc workflow execution |
| HTTP Trigger | External HTTP requests | Webhook endpoints |
Workflow Context¶
Orchestrations maintain context across steps: - input: Original trigger payload (immutable) - context: Shared mutable state across steps - steps.\<id>.output: Output from completed steps
Creating an Orchestration¶
Via Console UI¶
- Navigate to Orchestrations in your workspace
- Click + Orchestration
- Configure the trigger type and settings
- Add steps using the visual builder
- Connect steps and configure conditions
- Save and activate
Via API¶
curl -X POST https://api.centrali.io/orchestration/api/v1/workspaces/my-workspace/orchestrations \
-H "Authorization: Bearer YOUR_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"slug": "order-processing",
"name": "Order Processing Workflow",
"description": "Process new orders through validation, fulfillment, and notification",
"trigger": {
"type": "event-driven",
"eventType": "record.created",
"structureSlug": "orders"
},
"steps": [
{
"id": "validate",
"name": "Validate Order",
"type": "compute",
"functionId": "func_validate_order",
"nextStepId": "check-result"
},
{
"id": "check-result",
"name": "Check Validation Result",
"type": "decision",
"cases": [
{
"conditions": [
{ "path": "steps.validate.output.isValid", "op": "eq", "value": true }
],
"nextStepId": "fulfill"
},
{
"conditions": [
{ "path": "steps.validate.output.isValid", "op": "eq", "value": false }
],
"nextStepId": "reject"
}
],
"defaultNextStepId": "reject"
},
{
"id": "fulfill",
"name": "Fulfill Order",
"type": "compute",
"functionId": "func_fulfill_order",
"nextStepId": "notify"
},
{
"id": "reject",
"name": "Reject Order",
"type": "compute",
"functionId": "func_reject_order"
},
{
"id": "notify",
"name": "Send Notification",
"type": "compute",
"functionId": "func_send_notification"
}
]
}'
Step Configuration¶
Compute Step¶
Executes a compute function and captures its output.
{
"id": "process-payment",
"name": "Process Payment",
"type": "compute",
"functionId": "func_payment_processor",
"timeoutMs": 30000,
"nextStepId": "send-receipt"
}
| Field | Type | Required | Description |
|---|---|---|---|
id | string | Yes | Unique step identifier |
name | string | No | Human-readable name |
type | string | Yes | Must be "compute" |
functionId | string | Yes | ID of the compute function to execute |
timeoutMs | number | No | Timeout in milliseconds (default: 300000) |
nextStepId | string | No | Next step to execute (null = end workflow) |
Decision Step¶
Evaluates conditions and routes to different steps based on results.
{
"id": "check-amount",
"name": "Check Order Amount",
"type": "decision",
"cases": [
{
"conditions": [
{ "path": "input.data.data.total", "op": "gt", "value": 1000 }
],
"nextStepId": "manual-review"
},
{
"conditions": [
{ "path": "input.data.data.total", "op": "lte", "value": 1000 }
],
"nextStepId": "auto-approve"
}
],
"defaultNextStepId": "auto-approve"
}
| Field | Type | Required | Description |
|---|---|---|---|
id | string | Yes | Unique step identifier |
name | string | No | Human-readable name |
type | string | Yes | Must be "decision" |
cases | array | Yes | Array of condition cases |
defaultNextStepId | string | Yes | Fallback step if no case matches |
Delay Step¶
Pauses workflow execution for a specified duration.
{
"id": "wait-for-processing",
"name": "Wait 5 Minutes",
"type": "delay",
"delayMs": 300000,
"nextStepId": "check-status"
}
| Field | Type | Required | Description |
|---|---|---|---|
id | string | Yes | Unique step identifier |
name | string | No | Human-readable name |
type | string | Yes | Must be "delay" |
delayMs | number | Yes | Delay duration in milliseconds |
nextStepId | string | No | Next step after delay |
Condition Operators¶
Use these operators in decision step conditions:
| Operator | Description | Example |
|---|---|---|
eq | Equals | { "path": "input.status", "op": "eq", "value": "approved" } |
neq | Not equals | { "path": "input.status", "op": "neq", "value": "rejected" } |
gt | Greater than | { "path": "input.amount", "op": "gt", "value": 100 } |
gte | Greater than or equal | { "path": "input.amount", "op": "gte", "value": 100 } |
lt | Less than | { "path": "input.amount", "op": "lt", "value": 100 } |
lte | Less than or equal | { "path": "input.amount", "op": "lte", "value": 100 } |
in | Value in array | { "path": "input.status", "op": "in", "value": ["pending", "review"] } |
notIn | Value not in array | { "path": "input.status", "op": "notIn", "value": ["rejected"] } |
exists | Field exists | { "path": "input.email", "op": "exists" } |
notExists | Field does not exist | { "path": "input.phone", "op": "notExists" } |
Path Syntax¶
Access data in conditions using dot-notation paths:
Available Paths¶
| Path Prefix | Description | Example |
|---|---|---|
input.* | Original trigger payload | input.recordId, input.data.data.status |
context.* | Shared mutable state | context.retryCount, context.approvalStatus |
steps.<id>.output.* | Output from a previous step | steps.validate.output.isValid |
Input (Trigger Payload)¶
The input object contains the original event that triggered the orchestration:
// For record.created event:
"input.event" // "record_created"
"input.workspaceSlug" // "my-workspace"
"input.recordSlug" // "orders"
"input.recordId" // "uuid-here"
"input.data.id" // Record ID
"input.data.data.status" // Field from record data
"input.data.version" // Version number
"input.timestamp" // ISO 8601 timestamp
"input.createdBy" // User ID
Context (Shared State)¶
The context object is mutable and shared across all steps. Compute functions can read from and write to context:
// In a compute function:
async function run() {
// Read from context
const retryCount = triggerParams.context?.retryCount || 0;
// Write to context (returned values are merged into context)
return {
context: {
retryCount: retryCount + 1,
lastProcessedAt: new Date().toISOString()
}
};
}
In decision step conditions:
"context.retryCount" // Number of retries
"context.approvalStatus" // Custom state
"context.lastProcessedAt" // Custom timestamp
Step Outputs¶
Each compute step's return value is stored and accessible to subsequent steps:
// If "validate" step returns: { isValid: true, errors: [] }
// In decision step conditions:
"steps.validate.output" // { isValid: true, errors: [] }
"steps.validate.output.isValid" // true
"steps.validate.output.errors" // []
Example workflow using step outputs:
{
"steps": [
{
"id": "validate",
"type": "compute",
"functionId": "func_validate",
"nextStepId": "check-valid"
},
{
"id": "check-valid",
"type": "decision",
"cases": [
{
"conditions": [
{ "path": "steps.validate.output.isValid", "op": "eq", "value": true }
],
"nextStepId": "process"
},
{
"conditions": [
{ "path": "steps.validate.output.errorCount", "op": "gt", "value": 0 }
],
"nextStepId": "handle-errors"
}
],
"defaultNextStepId": "reject"
}
]
}
Path Examples¶
// Access record ID from trigger
"input.recordId"
// Access nested field in record data
"input.data.data.orderStatus"
// Access output from "validate" step
"steps.validate.output.isValid"
// Access shared context
"context.approvalLevel"
Compute Functions in Orchestrations¶
When a compute function is executed as part of an orchestration, it receives enriched data via triggerParams:
What Compute Functions Receive¶
// triggerParams contains:
{
// Original trigger input (event payload)
event: "record_created",
workspaceSlug: "my-workspace",
recordSlug: "orders",
recordId: "uuid-here",
data: { ... },
timestamp: "2025-01-15T10:00:00Z",
createdBy: "user-id",
// Outputs from previous steps (keyed by step ID)
validate: { isValid: true, errors: [] },
transform: { processedData: { ... } },
// Shared context (if set by previous steps)
context: {
retryCount: 1,
customState: "value"
}
}
Accessing Data in Functions¶
async function run() {
// Access original trigger data
const recordId = triggerParams.recordId;
const recordData = triggerParams.data;
// Access output from previous "validate" step
const validationResult = triggerParams.validate;
const isValid = validationResult?.isValid;
// Access shared context
const retryCount = triggerParams.context?.retryCount || 0;
// Your processing logic
const result = await processOrder(recordId, recordData);
// Return outputs (will be available as steps.<thisStepId>.output)
return {
success: true,
orderId: result.id,
// Optionally update context
context: {
...triggerParams.context,
lastProcessedOrder: result.id
}
};
}
Returning Data from Functions¶
| Return Field | Purpose | Available As |
|---|---|---|
| Any field | Step output | steps.<stepId>.output.<field> in decision conditions |
context | Update shared state | context.<field> in subsequent steps |
Input Payloads by Trigger Type¶
The structure of input depends on how the orchestration was triggered.
On-Demand Trigger¶
For manually triggered runs, input contains whatever JSON you pass:
# Trigger with custom input
curl -X POST /orchestrations/{id}/runs \
-d '{ "orderId": "123", "priority": "high", "options": { "notify": true } }'
// In conditions and functions:
"input.orderId" // "123"
"input.priority" // "high"
"input.options.notify" // true
You define the schema - pass any JSON structure your workflow needs.
HTTP Trigger (Webhook)¶
For webhooks, input contains the request body plus metadata:
{
// Your JSON payload from request body
customField: "value",
data: { ... },
// Webhook metadata (automatically added)
_webhook: {
method: "POST", // HTTP method
path: "/webhooks/stripe", // Request path
contentType: "application/json",
userAgent: "Stripe/1.0",
query: { // Query parameters
token: "abc123"
}
},
// If body is not valid JSON:
_raw: "raw request body string"
}
Path examples: - input.customField - Your JSON payload fields - input._webhook.method - HTTP method used - input._webhook.query.token - Query parameter value - input._raw - Raw body if not JSON
Event-Driven Trigger¶
When triggered by record events, input contains the event payload.
record.created¶
{
event: "record_created",
workspaceSlug: string,
recordSlug: string, // Structure slug (e.g., "orders")
recordId: string, // UUID of created record
data: {
id: string,
data: { ... }, // Record field values
status: "active",
version: 1,
createdAt: string,
createdBy: string
},
timestamp: string,
createdBy: string
}
Path examples: - input.recordId - The record UUID - input.recordSlug - The structure slug - input.data.data.status - A field from the record - input.createdBy - User who created the record
record.updated¶
{
event: "record_updated",
workspaceSlug: string,
recordSlug: string,
recordId: string,
data: {
before: { ... }, // Record before update
after: { ... } // Record after update
},
timestamp: string,
updatedBy: string
}
Path examples: - input.data.before.data.status - Status before update - input.data.after.data.status - Status after update - input.updatedBy - User who updated
record.deleted¶
{
event: "record_deleted",
workspaceSlug: string,
recordSlug: string,
recordId: string,
data: { ... }, // The deleted record
timestamp: string,
deletedBy: string,
hardDelete: boolean
}
Path examples: - input.recordId - The deleted record ID - input.data.data.customerEmail - Field from deleted record - input.hardDelete - Whether permanently deleted
Trigger Configuration¶
Event-Driven Trigger¶
React to record events:
| Field | Type | Required | Description |
|---|---|---|---|
type | string | Yes | "event-driven" |
eventType | string | Yes | "record.created", "record.updated", "record.deleted", "record.restored", or "records.bulk_created" |
structureSlug | string | No | Filter to specific structure |
Scheduled Trigger¶
Run on a schedule:
{
"type": "scheduled",
"scheduleType": "cron",
"cronExpression": "0 9 * * *",
"timezone": "America/New_York"
}
| Field | Type | Required | Description |
|---|---|---|---|
type | string | Yes | "scheduled" |
scheduleType | string | Yes | "cron", "interval", or "once" |
cronExpression | string | For cron | Cron expression (e.g., "0 9 * * *") |
interval | number | For interval | Interval in milliseconds |
scheduledAt | string | For once | ISO 8601 timestamp |
timezone | string | No | Timezone (default: UTC) |
Common Cron Examples: - 0 * * * * - Every hour - 0 0 * * * - Daily at midnight - 0 9 * * 1 - Every Monday at 9 AM - */15 * * * * - Every 15 minutes
On-Demand Trigger¶
Manual invocation:
Invoke via API:
curl -X POST https://api.centrali.io/orchestration/api/v1/workspaces/my-workspace/orchestrations/orch_abc123/trigger \
-H "Authorization: Bearer YOUR_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"input": {
"customField": "value"
}
}'
HTTP Trigger¶
External webhook endpoint:
Orchestration Examples¶
Example 1: Order Processing Workflow¶
{
"slug": "order-processing",
"name": "Order Processing",
"trigger": {
"type": "event-driven",
"eventType": "record.created",
"structureSlug": "orders"
},
"steps": [
{
"id": "validate",
"type": "compute",
"functionId": "func_validate_order",
"nextStepId": "check-valid"
},
{
"id": "check-valid",
"type": "decision",
"cases": [
{
"conditions": [
{ "path": "steps.validate.output.isValid", "op": "eq", "value": true }
],
"nextStepId": "process-payment"
}
],
"defaultNextStepId": "reject-order"
},
{
"id": "process-payment",
"type": "compute",
"functionId": "func_process_payment",
"nextStepId": "send-confirmation"
},
{
"id": "send-confirmation",
"type": "compute",
"functionId": "func_send_email"
},
{
"id": "reject-order",
"type": "compute",
"functionId": "func_reject_order"
}
]
}
Example 2: Approval Workflow with Delay¶
{
"slug": "expense-approval",
"name": "Expense Approval",
"trigger": {
"type": "event-driven",
"eventType": "record.created",
"structureSlug": "expenses"
},
"steps": [
{
"id": "check-amount",
"type": "decision",
"cases": [
{
"conditions": [
{ "path": "input.data.data.amount", "op": "lte", "value": 100 }
],
"nextStepId": "auto-approve"
},
{
"conditions": [
{ "path": "input.data.data.amount", "op": "gt", "value": 100 }
],
"nextStepId": "request-approval"
}
],
"defaultNextStepId": "request-approval"
},
{
"id": "auto-approve",
"type": "compute",
"functionId": "func_approve_expense"
},
{
"id": "request-approval",
"type": "compute",
"functionId": "func_send_approval_request",
"nextStepId": "wait-for-response"
},
{
"id": "wait-for-response",
"type": "delay",
"delayMs": 86400000,
"nextStepId": "check-response"
},
{
"id": "check-response",
"type": "compute",
"functionId": "func_check_approval_status"
}
]
}
Example 3: Scheduled Report¶
{
"slug": "daily-report",
"name": "Daily Sales Report",
"trigger": {
"type": "scheduled",
"scheduleType": "cron",
"cronExpression": "0 8 * * *",
"timezone": "America/New_York"
},
"steps": [
{
"id": "gather-data",
"type": "compute",
"functionId": "func_gather_sales_data",
"nextStepId": "generate-report"
},
{
"id": "generate-report",
"type": "compute",
"functionId": "func_generate_report",
"nextStepId": "send-report"
},
{
"id": "send-report",
"type": "compute",
"functionId": "func_email_report"
}
]
}
Monitoring Runs¶
View Orchestration Runs¶
curl -X GET "https://api.centrali.io/orchestration/api/v1/workspaces/my-workspace/orchestrations/orch_abc123/runs" \
-H "Authorization: Bearer YOUR_TOKEN"
Response:
{
"data": [
{
"id": "run_xyz789",
"status": "completed",
"triggerType": "event-driven",
"currentStepId": null,
"startedAt": "2025-01-15T10:00:00Z",
"completedAt": "2025-01-15T10:00:15Z"
}
],
"meta": {
"total": 1,
"page": 1,
"pageSize": 20
}
}
View Run Details¶
curl -X GET "https://api.centrali.io/orchestration/api/v1/workspaces/my-workspace/orchestrations/orch_abc123/runs/run_xyz789" \
-H "Authorization: Bearer YOUR_TOKEN"
Best Practices¶
Workflow Design¶
- Keep workflows focused on a single business process
- Use descriptive step IDs and names
- Add decision steps to handle edge cases
- Use delays sparingly (consider async patterns for long waits)
Error Handling¶
- Design for failure - add fallback paths
- Use decision steps to check function outputs
- Log context at each step for debugging
Performance¶
- Keep individual functions fast
- Use appropriate timeouts
- Consider splitting long workflows
Testing¶
- Test individual functions before adding to orchestrations
- Use on-demand triggers for testing workflows
- Verify decision conditions with different inputs
Related Documentation¶
- Event Payloads Reference - Detailed event payload documentation
- Compute Functions - Writing compute functions
- Orchestrations API - Complete API reference