Realtime API Reference¶
HTTP API reference for the Centrali Realtime Service. For most use cases, we recommend the SDK, which handles authentication, reconnection, and event parsing automatically.
Overview¶
The Realtime Service uses Server-Sent Events (SSE) to stream record events to clients. SSE is a standard HTTP-based protocol supported by all modern browsers and most HTTP clients.
Base URL: https://api.centrali.io/realtime
Endpoints¶
Subscribe to Events¶
Opens an SSE stream for receiving record and compute function events in the specified workspace. The request is a GET to /workspace/{workspaceSlug}/events under the base URL.
Path Parameters¶
| Parameter | Type | Required | Description |
|---|---|---|---|
| workspaceSlug | string | Yes | The workspace to subscribe to |
Query Parameters¶
| Parameter | Type | Required | Description |
|---|---|---|---|
| access_token | string | Yes* | JWT access token for authentication |
| structures | string | No | Comma-separated structure slugs to filter |
| events | string | No | Comma-separated event types to filter |
| filter | string | No | CFL v1 filter expression |
| jobId | string | No | Filter compute events to a specific job ID |
| triggerType | string | No | Filter compute events by trigger type |
| functionId | string | No | Filter compute events to a specific function |
*The access_token can also be provided via the Authorization: Bearer <token> header.
Note: The jobId, triggerType, and functionId parameters are used for filtering compute function events (function_run_completed, function_run_failed). They have no effect on record events.
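As a rough illustration of how the parameters above combine, the following Python sketch builds the subscribe URL and sends the token in the Authorization header rather than the query string. The helper name build_subscribe_request and its keyword arguments are hypothetical; only the parameter names and base URL come from this reference.

```python
from urllib.parse import urlencode

BASE_URL = "https://api.centrali.io/realtime"


def build_subscribe_request(workspace_slug, token, structures=None, events=None,
                            filter_expr=None, job_id=None, trigger_type=None,
                            function_id=None):
    """Return (url, headers) for the subscribe endpoint (hypothetical helper)."""
    params = {}
    if structures:
        params["structures"] = ",".join(structures)  # comma-separated slugs
    if events:
        params["events"] = ",".join(events)          # comma-separated event types
    if filter_expr:
        params["filter"] = filter_expr               # CFL v1 expression
    if job_id:
        params["jobId"] = job_id
    if trigger_type:
        params["triggerType"] = trigger_type
    if function_id:
        params["functionId"] = function_id

    url = f"{BASE_URL}/workspace/{workspace_slug}/events"
    if params:
        url += "?" + urlencode(params)               # percent-encodes ":" and ","
    headers = {
        "Authorization": f"Bearer {token}",          # alternative to access_token
        "Accept": "text/event-stream",
    }
    return url, headers
```

Passing the token in a header keeps it out of URLs that may end up in logs, which is one reason to prefer it when the client allows custom headers.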
Example Request¶
curl -N "https://api.centrali.io/realtime/workspace/my-workspace/events?\
access_token=eyJhbG...&\
structures=order,invoice&\
events=record_created,record_updated&\
filter=data.status:pending"
Response Headers¶
HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
X-Accel-Buffering: no
SSE Stream Format¶
The server sends events in standard SSE format:
retry: 3000
: connected to workspace my-workspace
event: message
data: {"event":"record_created","workspaceSlug":"my-workspace","recordSlug":"order","recordId":"rec_abc123","data":{"customer":"John","total":99.99},"timestamp":"2025-01-15T10:30:00Z","createdBy":"user_123"}
: ping
event: message
data: {"event":"record_updated","workspaceSlug":"my-workspace","recordSlug":"order","recordId":"rec_abc123","data":{"before":{"status":"pending"},"after":{"status":"shipped"}},"timestamp":"2025-01-15T10:35:00Z","updatedBy":"user_456"}
event: close
data: {"reason":"timeout","reconnect":true}
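For clients without a ready-made SSE library, the stream above can be parsed by hand. The sketch below is a minimal, simplified parser (the name parse_sse is hypothetical), assuming standard SSE framing in which a blank line terminates each event and each data payload is JSON, as in the sample.

```python
import json


def parse_sse(lines):
    """Yield ("retry", ms), ("comment", text) or ("event", name, payload) tuples."""
    event_name, data_lines = "message", []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line dispatches the buffered event, if there is one.
            if data_lines:
                yield ("event", event_name, json.loads("\n".join(data_lines)))
            event_name, data_lines = "message", []
            continue
        if line.startswith(":"):
            # Comment line: keep-alive ping or status message.
            yield ("comment", line[1:].strip())
            continue
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # SSE strips a single leading space
        if field == "event":
            event_name = value
        elif field == "data":
            data_lines.append(value)
        elif field == "retry" and value.isdigit():
            yield ("retry", int(value))
```

Feeding it the lines of an HTTP response body (for example from a streaming request) yields one tuple per directive, comment, or complete event.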
SSE Event Types¶
| Event Name | Description |
|---|---|
| message | Record event (create, update, delete) or compute function event |
| close | Connection closing, includes reconnection hint |
| (comment) | Keep-alive ping or status message |
Event Payloads¶
Record Event¶
{
  "event": "record_created",
  "workspaceSlug": "my-workspace",
  "recordSlug": "order",
  "recordId": "rec_abc123",
  "data": {
    "customer": "John Doe",
    "total": 99.99,
    "status": "pending"
  },
  "timestamp": "2025-01-15T10:30:00Z",
  "createdBy": "user_123"
}
| Field | Type | Description |
|---|---|---|
| event | string | Event type: record_created, record_updated, record_deleted, records_bulk_created |
| workspaceSlug | string | Workspace where event occurred |
| recordSlug | string | Structure's slug (e.g., "order") |
| recordId | string | Unique record identifier (not present in bulk events) |
| recordIds | string[] | Array of record IDs (only in bulk events) |
| count | number | Number of records (only in bulk events) |
| data | object | Record data (see format below, not present in bulk events) |
| timestamp | string | ISO 8601 timestamp |
| createdBy | string | User ID (for create events) |
| updatedBy | string | User ID (for update events) |
| deletedBy | string | User ID (for delete events) |
Data Format by Event Type¶
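record_created: data holds the fields of the new record, e.g. {"customer": "John Doe", "total": 99.99, "status": "pending"}.
record_updated: data holds before and after objects, e.g. {"before": {"status": "pending"}, "after": {"status": "shipped"}}.
record_deleted: data holds the fields of the deleted record.
records_bulk_created: there is no data field; the event carries recordIds and count instead:
{
  "event": "records_bulk_created",
  "workspaceSlug": "my-workspace",
  "recordSlug": "order",
  "structureId": "123e4567-e89b-12d3-a456-426614174000",
  "recordIds": ["rec_abc123", "rec_def456", "rec_ghi789"],
  "count": 3,
  "timestamp": "2025-01-15T10:30:00Z",
  "createdBy": "user_123",
  "schemaDiscoveryMode": "strict"
}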
Note: Bulk events contain only record IDs, not full record data. Use the Records API to fetch full record details if needed.
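Because single-record and bulk events carry their IDs in different fields, a consumer usually needs one code path that handles both. A minimal sketch, assuming the payload shapes documented above (the function name affected_record_ids is hypothetical):

```python
def affected_record_ids(event):
    """Return the record IDs an event refers to, for single and bulk events."""
    if event["event"] == "records_bulk_created":
        # Bulk events list IDs in recordIds and carry no data field.
        return list(event.get("recordIds", []))
    record_id = event.get("recordId")
    return [record_id] if record_id is not None else []
```

The returned IDs can then be passed to whatever record-fetching call the application uses; that call is outside the scope of this sketch.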
Compute Function Event¶
Sent when a compute function completes or fails:
{
  "event": "function_run_completed",
  "workspaceSlug": "my-workspace",
  "jobId": "job_abc123",
  "runId": "run_xyz789",
  "triggerId": "trigger_456",
  "functionId": "func_789",
  "triggerType": "on-demand",
  "status": "completed",
  "outputs": { "result": "success", "processedItems": 42 },
  "duration": 1234,
  "memoryUsage": { "heapUsed": 52428800, "rss": 104857600 },
  "userId": "user_123",
  "timestamp": "2025-01-15T10:30:00Z"
}
| Field | Type | Description |
|---|---|---|
| event | string | Event type: function_run_completed or function_run_failed |
| workspaceSlug | string | Workspace where function ran |
| jobId | string | Unique job identifier (returned from execute call) |
| runId | string | Function run record ID |
| triggerId | string | Trigger that initiated the run |
| functionId | string | Function that was executed |
| triggerType | string | on-demand, event-driven, scheduled, or http-trigger |
| status | string | completed or failure |
| outputs | object | Function return value (if completed) |
| error | object | Error details (if failed) |
| duration | number | Execution time in milliseconds |
| memoryUsage | object | Memory metrics: heapUsed, rss |
| userId | string | User who triggered the run |
| timestamp | string | ISO 8601 timestamp |
Error Object (for failed runs)¶
{
  "error": {
    "message": "TypeError: Cannot read property 'foo' of undefined",
    "code": "EXECUTION_ERROR",
    "stack": "at handler (index.js:10:5)..."
  }
}
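One way to consume compute function events is to turn them into either a return value or an exception. The sketch below assumes the payload fields documented above; FunctionRunError and unwrap_function_event are hypothetical names introduced for illustration.

```python
class FunctionRunError(Exception):
    """Raised for function_run_failed events (hypothetical helper type)."""

    def __init__(self, message, code=None, stack=None):
        super().__init__(message)
        self.code = code
        self.stack = stack


def unwrap_function_event(event):
    """Return outputs for a completed run, raise FunctionRunError for a failed one."""
    kind = event["event"]
    if kind == "function_run_completed":
        return event.get("outputs")
    if kind == "function_run_failed":
        err = event.get("error") or {}
        raise FunctionRunError(
            err.get("message", "Function failed"),
            code=err.get("code"),
            stack=err.get("stack"),
        )
    raise ValueError(f"not a compute function event: {kind}")
```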
Close Event¶
Sent before server-initiated disconnection:
| Field | Type | Description |
|---|---|---|
| reason | string | Reason for closing: timeout, permission_revoked, maintenance |
| reconnect | boolean | Whether client should attempt to reconnect |
Error Responses¶
Error response format varies by error type. Authentication errors return JSON; other errors return plain text.
400 Bad Request¶
Returns a plain text body.
401 Unauthorized¶
Returns a JSON body with an error code.
403 Forbidden¶
Returns a JSON body with an error code.
429 Too Many Requests¶
Returns a JSON body for plan-based rate limits and plain text for workspace connection limits.
500 Internal Server Error¶
Returns a JSON body for auth service errors and plain text for other errors.
503 Service Unavailable¶
Returns a plain text body.
Error Codes¶
Note: Not all errors return a code field. Authentication errors (401, 403) include JSON with codes; other errors may return plain text.
| Code | HTTP Status | Description | Recoverable |
|---|---|---|---|
| MISSING_TOKEN | 401 | No token provided | No |
| TOKEN_EXPIRED | 401 | Token has expired | Yes |
| INVALID_TOKEN | 401 | Token is malformed or signature invalid | No |
| WORKSPACE_MISMATCH | 403 | Token not valid for requested workspace | No |
| FORBIDDEN | 403 | User lacks permission | No |
| AUTH_ERROR | 500 | Authorization service unavailable | Yes |
| RATE_LIMIT_EXCEEDED | 429 | Connection limit reached | Yes |
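Since error bodies may be JSON or plain text, a client needs to tolerate both when deciding whether to retry. The sketch below is one possible approach under two assumptions that this reference does not confirm: that JSON errors expose the code as a top-level "code" field, and that uncoded 429, 500, and 503 responses are worth retrying. The name classify_error is hypothetical.

```python
import json

# Recoverability as listed in the error code table.
RECOVERABLE = {
    "MISSING_TOKEN": False,
    "TOKEN_EXPIRED": True,
    "INVALID_TOKEN": False,
    "WORKSPACE_MISMATCH": False,
    "FORBIDDEN": False,
    "AUTH_ERROR": True,
    "RATE_LIMIT_EXCEEDED": True,
}


def classify_error(status, body):
    """Return (code_or_None, should_retry) for an error response."""
    code = None
    try:
        parsed = json.loads(body)
        if isinstance(parsed, dict):
            code = parsed.get("code")  # assumption: top-level "code" field
    except (ValueError, TypeError):
        pass  # plain text body, no code available
    if code in RECOVERABLE:
        return code, RECOVERABLE[code]
    # Assumption: without a known code, retry only on 429, 500 and 503.
    return code, status in (429, 500, 503)
```

For TOKEN_EXPIRED, retrying only makes sense after obtaining a fresh token.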
CFL Filter Syntax¶
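Centrali Filter Language (CFL) v1 filters events by their data values.
Format¶
An expression takes the form path:value or path:operator:value. When the operator is omitted, eq is assumed.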
Operators¶
| Operator | Description | Example |
|---|---|---|
| eq | Equals (default) | data.status:shipped |
| ne | Not equals | data.status:ne:cancelled |
| gt | Greater than | data.amount:gt:100 |
| lt | Less than | data.amount:lt:50 |
| gte | Greater than or equal | data.priority:gte:5 |
| lte | Less than or equal | data.quantity:lte:10 |
| in | Value in list | data.status:in:a,b,c |
| nin | Not in list | data.status:nin:x,y |
| contains | Contains substring | data.name:contains:test |
| startswith | Starts with | data.sku:startswith:PROD- |
| endswith | Ends with | data.email:endswith:@co.com |
Event Data Structure¶
Filter paths depend on the event type:
| Event | Data Structure | Filter Example |
|---|---|---|
| record_created | { field, ... } | data.status:pending |
| record_updated | { before: {...}, after: {...} } | data.after.status:shipped |
| record_deleted | { field, ... } | data.status:archived |
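The same difference in shape affects client code that reads field values from received events. A small sketch of normalizing it (the helper name current_fields is hypothetical); note that after may contain only the changed fields, as in the sample stream, rather than the whole record:

```python
def current_fields(event):
    """Return the field values an event describes, whatever its type."""
    data = event.get("data") or {}
    if event.get("event") == "record_updated":
        # Update events nest values under "before" and "after".
        return data.get("after") or {}
    # Create and delete events carry the fields directly; bulk events have no data.
    return data
```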
Examples¶
# Simple equality (for create/delete events)
filter=data.status:shipped
# For update events, filter on new value
filter=data.after.status:shipped
# Or filter on old value
filter=data.before.status:pending
# Numeric comparison
filter=data.total:gt:1000
# Multiple values
filter=data.status:in:pending,processing,shipped
# Nested fields (URL-encode spaces and other reserved characters)
filter=data.address.city:New%20York
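Building expressions programmatically avoids typos in operator names and encoding mistakes. The following sketch assumes only the syntax shown in the operator table; the helpers cfl and as_query_value are hypothetical and not part of any SDK.

```python
from urllib.parse import quote

OPERATORS = {"eq", "ne", "gt", "lt", "gte", "lte",
             "in", "nin", "contains", "startswith", "endswith"}


def cfl(path, value, op="eq"):
    """Build a CFL v1 expression such as data.amount:gt:100."""
    if op not in OPERATORS:
        raise ValueError(f"unknown CFL operator: {op}")
    if op in ("in", "nin"):
        value = ",".join(str(v) for v in value)  # list operators take a,b,c
    if op == "eq":
        return f"{path}:{value}"                 # eq is the default operator
    return f"{path}:{op}:{value}"


def as_query_value(expression):
    """Percent-encode an expression for use as the filter query parameter."""
    return quote(expression, safe="")
```

For example, cfl("data.status", ["pending", "processing"], "in") produces data.status:in:pending,processing, matching the multiple-values example above.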
Rate Limits¶
Connection limits are based on workspace plan:
| Plan | Concurrent Connections |
|---|---|
| Free | 10 per workspace |
| Standard | 100 per workspace |
| Pro | 1,000 per workspace |
| Enterprise | Custom |
Connections exceeding the limit receive HTTP 429.
Connection Behavior¶
Keep-Alive¶
The server sends a comment ping every 30 seconds:
: ping
Connection Timeout¶
Connections automatically close after 1 hour with a close event:
event: close
data: {"reason":"timeout","reconnect":true}
Clients should reconnect when reconnect is true.
Retry Directive¶
The server sends a retry directive at connection start:
retry: 3000
This tells SSE clients to wait 3 seconds (3000 ms) before reconnecting after a disconnection.
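Clients that manage the connection themselves can fold the behaviors in this section into a small policy object. The sketch below is illustrative only: the ReconnectPolicy class and the tolerance of two missed pings are assumptions, while the 30-second ping interval and the 3000 ms default come from this reference.

```python
class ReconnectPolicy:
    """Tracks the retry delay and decides what to do when the stream ends."""

    def __init__(self, default_retry_ms=3000, ping_interval_s=30, missed_pings=2):
        self.retry_ms = default_retry_ms
        self.ping_interval_s = ping_interval_s
        self.missed_pings = missed_pings  # assumption: tolerate two missed pings

    def on_retry(self, ms):
        # Called when the server sends a "retry: <ms>" directive.
        self.retry_ms = ms

    def delay_after_close(self, close_payload):
        """Seconds to wait before reconnecting, or None to stay disconnected."""
        if close_payload.get("reconnect"):
            return self.retry_ms / 1000.0
        return None

    def is_stale(self, seconds_since_last_line):
        # No ping or event for several intervals suggests a dead connection.
        return seconds_since_last_line > self.ping_interval_s * self.missed_pings
```

A read loop would call on_retry and delay_after_close as directives and close events arrive, and check is_stale from a timer to detect silently dropped connections.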
Client Implementation¶
JavaScript (Browser)¶
// Assumes `token` holds a valid JWT access token
function connect() {
  const url = new URL('https://api.centrali.io/realtime/workspace/my-workspace/events');
  url.searchParams.set('access_token', token);
  url.searchParams.set('structures', 'order');
  const eventSource = new EventSource(url);
  eventSource.addEventListener('message', (e) => {
    const event = JSON.parse(e.data);
    console.log('Event:', event);
  });
  eventSource.addEventListener('close', (e) => {
    const data = JSON.parse(e.data);
    // Always close, so the browser's automatic reconnect does not kick in
    eventSource.close();
    if (data.reconnect) {
      // Refresh `token` here if needed, then reconnect
      setTimeout(connect, 1000);
    }
  });
  eventSource.onerror = (e) => {
    console.error('Connection error');
  };
}
connect();
Node.js¶
import { EventSource } from 'eventsource';
const url = `https://api.centrali.io/realtime/workspace/my-workspace/events?access_token=${token}`;
const eventSource = new EventSource(url);
eventSource.addEventListener('message', (e) => {
  const event = JSON.parse(e.data);
  console.log('Event:', event);
});
eventSource.addEventListener('error', (e) => {
  console.error('Error:', e);
});
Waiting for Compute Function Result¶
// Trigger a function and wait for the result via SSE
async function executeAndWait(triggerId, params, token) {
  // 1. Execute the function
  const response = await fetch(`https://api.centrali.io/data/workspace/my-workspace/functions/${triggerId}/execute`, {
    method: 'POST',
    headers: { 'Authorization': `Bearer ${token}`, 'Content-Type': 'application/json' },
    body: JSON.stringify({ params })
  });
  if (!response.ok) {
    throw new Error(`Execute request failed: HTTP ${response.status}`);
  }
  const { jobId } = await response.json();
  // 2. Connect to SSE with jobId filter
  return new Promise((resolve, reject) => {
    const url = new URL('https://api.centrali.io/realtime/workspace/my-workspace/events');
    url.searchParams.set('access_token', token);
    url.searchParams.set('events', 'function_run_completed,function_run_failed');
    url.searchParams.set('jobId', jobId);
    const eventSource = new EventSource(url);
    // Cancel the timeout and close the stream before settling the promise
    const finish = (settle, value) => {
      clearTimeout(timer);
      eventSource.close();
      settle(value);
    };
    eventSource.addEventListener('message', (e) => {
      const event = JSON.parse(e.data);
      if (event.event === 'function_run_completed') {
        finish(resolve, event.outputs);
      } else {
        finish(reject, new Error(event.error?.message || 'Function failed'));
      }
    });
    eventSource.onerror = () => finish(reject, new Error('SSE connection error'));
    // Timeout after 60 seconds
    const timer = setTimeout(() => finish(reject, new Error('Function execution timeout')), 60000);
  });
}
// Usage
const result = await executeAndWait('trigger_123', { input: 'data' }, token);
console.log('Function result:', result);
cURL¶
# Stream events (use -N to disable buffering)
curl -N "https://api.centrali.io/realtime/workspace/my-workspace/events?access_token=eyJ..."
# Stream compute function events for a specific job
curl -N "https://api.centrali.io/realtime/workspace/my-workspace/events?access_token=eyJ...&events=function_run_completed,function_run_failed&jobId=job_abc123"
Python¶
# Requires: pip install requests sseclient-py
import json
import requests
import sseclient

token = "eyJ..."  # JWT access token
url = "https://api.centrali.io/realtime/workspace/my-workspace/events"
headers = {"Authorization": f"Bearer {token}"}
response = requests.get(url, headers=headers, stream=True)
response.raise_for_status()  # fail early on 4xx/5xx instead of parsing an error body
client = sseclient.SSEClient(response)
for event in client.events():
    if event.event == "message":
        data = json.loads(event.data)
        print(f"Event: {data}")
    elif event.event == "close":
        data = json.loads(event.data)
        print(f"Connection closing: {data['reason']}")
        if data.get('reconnect'):
            # Reconnect logic here
            pass
Health Endpoints¶
Liveness¶
Returns 200 if service is running.
Readiness¶
Returns 200 if service is ready to accept connections (Redis connected).
Related Documentation¶
- SDK Quickstart - Recommended approach
- Authentication - Token management
- Filtering - CFL filter syntax
- Troubleshooting - Common issues