Skip to content

Orchestrations

Overview

Orchestrations are multi-step workflows that coordinate complex business processes. Unlike single compute functions that react to one event, orchestrations chain multiple steps together with conditional logic, delays, and decision branches.

What are Orchestrations?

Orchestrations allow you to: - Chain multiple steps: Execute a series of compute functions in sequence - Add conditional logic: Branch workflows based on data conditions - Introduce delays: Wait for specific durations between steps - React to events: Trigger workflows from record changes, schedules, or HTTP requests - Handle complex flows: Build approval workflows, multi-stage processing, and more

Key Features

Step Types

Step Type Description Use Case
Compute Execute a compute function Data processing, API calls, calculations
Decision Branch based on conditions Route workflows based on data values
Delay Wait for a duration Rate limiting, scheduled follow-ups

Trigger Types

Trigger Description Example
Event-Driven React to record events Process new orders when created
Scheduled Run on a schedule Daily reports, cleanup jobs
On-Demand Manual invocation Ad-hoc workflow execution
HTTP Trigger External HTTP requests Webhook endpoints

Workflow Context

Orchestrations maintain context across steps: - input: Original trigger payload (immutable) - context: Shared mutable state across steps - steps.\<id>.output: Output from completed steps


Creating an Orchestration

Via Console UI

  1. Navigate to Orchestrations in your workspace
  2. Click + Orchestration
  3. Configure the trigger type and settings
  4. Add steps using the visual builder
  5. Connect steps and configure conditions
  6. Save and activate

Via API

curl -X POST https://api.centrali.io/orchestration/api/v1/workspaces/my-workspace/orchestrations \
  -H "Authorization: Bearer YOUR_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "slug": "order-processing",
    "name": "Order Processing Workflow",
    "description": "Process new orders through validation, fulfillment, and notification",
    "trigger": {
      "type": "event-driven",
      "eventType": "record.created",
      "structureSlug": "orders"
    },
    "steps": [
      {
        "id": "validate",
        "name": "Validate Order",
        "type": "compute",
        "functionId": "func_validate_order",
        "nextStepId": "check-result"
      },
      {
        "id": "check-result",
        "name": "Check Validation Result",
        "type": "decision",
        "cases": [
          {
            "conditions": [
              { "path": "steps.validate.output.isValid", "op": "eq", "value": true }
            ],
            "nextStepId": "fulfill"
          },
          {
            "conditions": [
              { "path": "steps.validate.output.isValid", "op": "eq", "value": false }
            ],
            "nextStepId": "reject"
          }
        ],
        "defaultNextStepId": "reject"
      },
      {
        "id": "fulfill",
        "name": "Fulfill Order",
        "type": "compute",
        "functionId": "func_fulfill_order",
        "nextStepId": "notify"
      },
      {
        "id": "reject",
        "name": "Reject Order",
        "type": "compute",
        "functionId": "func_reject_order"
      },
      {
        "id": "notify",
        "name": "Send Notification",
        "type": "compute",
        "functionId": "func_send_notification"
      }
    ]
  }'

Step Configuration

Compute Step

Executes a compute function and captures its output.

{
  "id": "process-payment",
  "name": "Process Payment",
  "type": "compute",
  "functionId": "func_payment_processor",
  "timeoutMs": 30000,
  "nextStepId": "send-receipt"
}
Field Type Required Description
id string Yes Unique step identifier
name string No Human-readable name
type string Yes Must be "compute"
functionId string Yes ID of the compute function to execute
timeoutMs number No Timeout in milliseconds (default: 300000)
nextStepId string No Next step to execute (null = end workflow)

Decision Step

Evaluates conditions and routes to different steps based on results.

{
  "id": "check-amount",
  "name": "Check Order Amount",
  "type": "decision",
  "cases": [
    {
      "conditions": [
        { "path": "input.data.data.total", "op": "gt", "value": 1000 }
      ],
      "nextStepId": "manual-review"
    },
    {
      "conditions": [
        { "path": "input.data.data.total", "op": "lte", "value": 1000 }
      ],
      "nextStepId": "auto-approve"
    }
  ],
  "defaultNextStepId": "auto-approve"
}
Field Type Required Description
id string Yes Unique step identifier
name string No Human-readable name
type string Yes Must be "decision"
cases array Yes Array of condition cases
defaultNextStepId string Yes Fallback step if no case matches

Delay Step

Pauses workflow execution for a specified duration.

{
  "id": "wait-for-processing",
  "name": "Wait 5 Minutes",
  "type": "delay",
  "delayMs": 300000,
  "nextStepId": "check-status"
}
Field Type Required Description
id string Yes Unique step identifier
name string No Human-readable name
type string Yes Must be "delay"
delayMs number Yes Delay duration in milliseconds
nextStepId string No Next step after delay

Condition Operators

Use these operators in decision step conditions:

Operator Description Example
eq Equals { "path": "input.status", "op": "eq", "value": "approved" }
neq Not equals { "path": "input.status", "op": "neq", "value": "rejected" }
gt Greater than { "path": "input.amount", "op": "gt", "value": 100 }
gte Greater than or equal { "path": "input.amount", "op": "gte", "value": 100 }
lt Less than { "path": "input.amount", "op": "lt", "value": 100 }
lte Less than or equal { "path": "input.amount", "op": "lte", "value": 100 }
in Value in array { "path": "input.status", "op": "in", "value": ["pending", "review"] }
notIn Value not in array { "path": "input.status", "op": "notIn", "value": ["rejected"] }
exists Field exists { "path": "input.email", "op": "exists" }
notExists Field does not exist { "path": "input.phone", "op": "notExists" }

Path Syntax

Access data in conditions using dot-notation paths:

Available Paths

Path Prefix Description Example
input.* Original trigger payload input.recordId, input.data.data.status
context.* Shared mutable state context.retryCount, context.approvalStatus
steps.<id>.output.* Output from a previous step steps.validate.output.isValid

Input (Trigger Payload)

The input object contains the original event that triggered the orchestration:

// For record.created event:
"input.event"              // "record_created"
"input.workspaceSlug"      // "my-workspace"
"input.recordSlug"         // "orders"
"input.recordId"           // "uuid-here"
"input.data.id"            // Record ID
"input.data.data.status"   // Field from record data
"input.data.version"       // Version number
"input.timestamp"          // ISO 8601 timestamp
"input.createdBy"          // User ID

Context (Shared State)

The context object is mutable and shared across all steps. Compute functions can read from and write to context:

// In a compute function:
async function run() {
  // Read from context
  const retryCount = triggerParams.context?.retryCount || 0;

  // Write to context (returned values are merged into context)
  return {
    context: {
      retryCount: retryCount + 1,
      lastProcessedAt: new Date().toISOString()
    }
  };
}

In decision step conditions:

"context.retryCount"       // Number of retries
"context.approvalStatus"   // Custom state
"context.lastProcessedAt"  // Custom timestamp

Step Outputs

Each compute step's return value is stored and accessible to subsequent steps:

// If "validate" step returns: { isValid: true, errors: [] }

// In decision step conditions:
"steps.validate.output"           // { isValid: true, errors: [] }
"steps.validate.output.isValid"   // true
"steps.validate.output.errors"    // []

Example workflow using step outputs:

{
  "steps": [
    {
      "id": "validate",
      "type": "compute",
      "functionId": "func_validate",
      "nextStepId": "check-valid"
    },
    {
      "id": "check-valid",
      "type": "decision",
      "cases": [
        {
          "conditions": [
            { "path": "steps.validate.output.isValid", "op": "eq", "value": true }
          ],
          "nextStepId": "process"
        },
        {
          "conditions": [
            { "path": "steps.validate.output.errorCount", "op": "gt", "value": 0 }
          ],
          "nextStepId": "handle-errors"
        }
      ],
      "defaultNextStepId": "reject"
    }
  ]
}

Path Examples

// Access record ID from trigger
"input.recordId"

// Access nested field in record data
"input.data.data.orderStatus"

// Access output from "validate" step
"steps.validate.output.isValid"

// Access shared context
"context.approvalLevel"

Compute Functions in Orchestrations

When a compute function is executed as part of an orchestration, it receives enriched data via triggerParams:

What Compute Functions Receive

// triggerParams contains:
{
  // Original trigger input (event payload)
  event: "record_created",
  workspaceSlug: "my-workspace",
  recordSlug: "orders",
  recordId: "uuid-here",
  data: { ... },
  timestamp: "2025-01-15T10:00:00Z",
  createdBy: "user-id",

  // Outputs from previous steps (keyed by step ID)
  validate: { isValid: true, errors: [] },
  transform: { processedData: { ... } },

  // Shared context (if set by previous steps)
  context: {
    retryCount: 1,
    customState: "value"
  }
}

Accessing Data in Functions

async function run() {
  // Access original trigger data
  const recordId = triggerParams.recordId;
  const recordData = triggerParams.data;

  // Access output from previous "validate" step
  const validationResult = triggerParams.validate;
  const isValid = validationResult?.isValid;

  // Access shared context
  const retryCount = triggerParams.context?.retryCount || 0;

  // Your processing logic
  const result = await processOrder(recordId, recordData);

  // Return outputs (will be available as steps.<thisStepId>.output)
  return {
    success: true,
    orderId: result.id,
    // Optionally update context
    context: {
      ...triggerParams.context,
      lastProcessedOrder: result.id
    }
  };
}

Returning Data from Functions

Return Field Purpose Available As
Any field Step output steps.<stepId>.output.<field> in decision conditions
context Update shared state context.<field> in subsequent steps

Input Payloads by Trigger Type

The structure of input depends on how the orchestration was triggered.

On-Demand Trigger

For manually triggered runs, input contains whatever JSON you pass:

# Trigger with custom input
curl -X POST /orchestrations/{id}/runs \
  -d '{ "orderId": "123", "priority": "high", "options": { "notify": true } }'
// In conditions and functions:
"input.orderId"          // "123"
"input.priority"         // "high"
"input.options.notify"   // true

You define the schema - pass any JSON structure your workflow needs.


HTTP Trigger (Webhook)

For webhooks, input contains the request body plus metadata:

{
  // Your JSON payload from request body
  customField: "value",
  data: { ... },

  // Webhook metadata (automatically added)
  _webhook: {
    method: "POST",           // HTTP method
    path: "/webhooks/stripe", // Request path
    contentType: "application/json",
    userAgent: "Stripe/1.0",
    query: {                  // Query parameters
      token: "abc123"
    }
  },

  // If body is not valid JSON:
  _raw: "raw request body string"
}

Path examples: - input.customField - Your JSON payload fields - input._webhook.method - HTTP method used - input._webhook.query.token - Query parameter value - input._raw - Raw body if not JSON


Event-Driven Trigger

When triggered by record events, input contains the event payload.

record.created

{
  event: "record_created",
  workspaceSlug: string,
  recordSlug: string,        // Structure slug (e.g., "orders")
  recordId: string,          // UUID of created record
  data: {
    id: string,
    data: { ... },           // Record field values
    status: "active",
    version: 1,
    createdAt: string,
    createdBy: string
  },
  timestamp: string,
  createdBy: string
}

Path examples: - input.recordId - The record UUID - input.recordSlug - The structure slug - input.data.data.status - A field from the record - input.createdBy - User who created the record

record.updated

{
  event: "record_updated",
  workspaceSlug: string,
  recordSlug: string,
  recordId: string,
  data: {
    before: { ... },         // Record before update
    after: { ... }           // Record after update
  },
  timestamp: string,
  updatedBy: string
}

Path examples: - input.data.before.data.status - Status before update - input.data.after.data.status - Status after update - input.updatedBy - User who updated

record.deleted

{
  event: "record_deleted",
  workspaceSlug: string,
  recordSlug: string,
  recordId: string,
  data: { ... },             // The deleted record
  timestamp: string,
  deletedBy: string,
  hardDelete: boolean
}

Path examples: - input.recordId - The deleted record ID - input.data.data.customerEmail - Field from deleted record - input.hardDelete - Whether permanently deleted


Trigger Configuration

Event-Driven Trigger

React to record events:

{
  "type": "event-driven",
  "eventType": "record.created",
  "structureSlug": "orders"
}
Field Type Required Description
type string Yes "event-driven"
eventType string Yes "record.created", "record.updated", "record.deleted", "record.restored", or "records.bulk_created"
structureSlug string No Filter to specific structure

Scheduled Trigger

Run on a schedule:

{
  "type": "scheduled",
  "scheduleType": "cron",
  "cronExpression": "0 9 * * *",
  "timezone": "America/New_York"
}
Field Type Required Description
type string Yes "scheduled"
scheduleType string Yes "cron", "interval", or "once"
cronExpression string For cron Cron expression (e.g., "0 9 * * *")
interval number For interval Interval in milliseconds
scheduledAt string For once ISO 8601 timestamp
timezone string No Timezone (default: UTC)

Common Cron Examples: - 0 * * * * - Every hour - 0 0 * * * - Daily at midnight - 0 9 * * 1 - Every Monday at 9 AM - */15 * * * * - Every 15 minutes

On-Demand Trigger

Manual invocation:

{
  "type": "on-demand"
}

Invoke via API:

curl -X POST https://api.centrali.io/orchestration/api/v1/workspaces/my-workspace/orchestrations/orch_abc123/trigger \
  -H "Authorization: Bearer YOUR_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "input": {
      "customField": "value"
    }
  }'

HTTP Trigger

External webhook endpoint:

{
  "type": "http-trigger",
  "path": "/webhooks/stripe-payment",
  "validateSignature": true
}

Orchestration Examples

Example 1: Order Processing Workflow

{
  "slug": "order-processing",
  "name": "Order Processing",
  "trigger": {
    "type": "event-driven",
    "eventType": "record.created",
    "structureSlug": "orders"
  },
  "steps": [
    {
      "id": "validate",
      "type": "compute",
      "functionId": "func_validate_order",
      "nextStepId": "check-valid"
    },
    {
      "id": "check-valid",
      "type": "decision",
      "cases": [
        {
          "conditions": [
            { "path": "steps.validate.output.isValid", "op": "eq", "value": true }
          ],
          "nextStepId": "process-payment"
        }
      ],
      "defaultNextStepId": "reject-order"
    },
    {
      "id": "process-payment",
      "type": "compute",
      "functionId": "func_process_payment",
      "nextStepId": "send-confirmation"
    },
    {
      "id": "send-confirmation",
      "type": "compute",
      "functionId": "func_send_email"
    },
    {
      "id": "reject-order",
      "type": "compute",
      "functionId": "func_reject_order"
    }
  ]
}

Example 2: Approval Workflow with Delay

{
  "slug": "expense-approval",
  "name": "Expense Approval",
  "trigger": {
    "type": "event-driven",
    "eventType": "record.created",
    "structureSlug": "expenses"
  },
  "steps": [
    {
      "id": "check-amount",
      "type": "decision",
      "cases": [
        {
          "conditions": [
            { "path": "input.data.data.amount", "op": "lte", "value": 100 }
          ],
          "nextStepId": "auto-approve"
        },
        {
          "conditions": [
            { "path": "input.data.data.amount", "op": "gt", "value": 100 }
          ],
          "nextStepId": "request-approval"
        }
      ],
      "defaultNextStepId": "request-approval"
    },
    {
      "id": "auto-approve",
      "type": "compute",
      "functionId": "func_approve_expense"
    },
    {
      "id": "request-approval",
      "type": "compute",
      "functionId": "func_send_approval_request",
      "nextStepId": "wait-for-response"
    },
    {
      "id": "wait-for-response",
      "type": "delay",
      "delayMs": 86400000,
      "nextStepId": "check-response"
    },
    {
      "id": "check-response",
      "type": "compute",
      "functionId": "func_check_approval_status"
    }
  ]
}

Example 3: Scheduled Report

{
  "slug": "daily-report",
  "name": "Daily Sales Report",
  "trigger": {
    "type": "scheduled",
    "scheduleType": "cron",
    "cronExpression": "0 8 * * *",
    "timezone": "America/New_York"
  },
  "steps": [
    {
      "id": "gather-data",
      "type": "compute",
      "functionId": "func_gather_sales_data",
      "nextStepId": "generate-report"
    },
    {
      "id": "generate-report",
      "type": "compute",
      "functionId": "func_generate_report",
      "nextStepId": "send-report"
    },
    {
      "id": "send-report",
      "type": "compute",
      "functionId": "func_email_report"
    }
  ]
}

Monitoring Runs

View Orchestration Runs

curl -X GET "https://api.centrali.io/orchestration/api/v1/workspaces/my-workspace/orchestrations/orch_abc123/runs" \
  -H "Authorization: Bearer YOUR_TOKEN"

Response:

{
  "data": [
    {
      "id": "run_xyz789",
      "status": "completed",
      "triggerType": "event-driven",
      "currentStepId": null,
      "startedAt": "2025-01-15T10:00:00Z",
      "completedAt": "2025-01-15T10:00:15Z"
    }
  ],
  "meta": {
    "total": 1,
    "page": 1,
    "pageSize": 20
  }
}

View Run Details

curl -X GET "https://api.centrali.io/orchestration/api/v1/workspaces/my-workspace/orchestrations/orch_abc123/runs/run_xyz789" \
  -H "Authorization: Bearer YOUR_TOKEN"

Best Practices

Workflow Design

  • Keep workflows focused on a single business process
  • Use descriptive step IDs and names
  • Add decision steps to handle edge cases
  • Use delays sparingly (consider async patterns for long waits)

Error Handling

  • Design for failure - add fallback paths
  • Use decision steps to check function outputs
  • Log context at each step for debugging

Performance

  • Keep individual functions fast
  • Use appropriate timeouts
  • Consider splitting long workflows

Testing

  • Test individual functions before adding to orchestrations
  • Use on-demand triggers for testing workflows
  • Verify decision conditions with different inputs