Subscribe an Agent to Your Event Stream
Point an outbound webhook subscription at an AI agent's HTTP endpoint and let the agent decide, per event, whether to act. This is a reactive, durable path: Centrali queues and retries deliveries, so the agent reacts to workspace changes without polling.
This pattern needs no platform change — it uses the Outbound Webhooks primitive that Centrali already ships. You create a subscription, stand up an endpoint that verifies the signature, and route each event into your agent's decision loop.
When to Use This
Centrali exposes three ways for an agent to consume workspace events. Pick by the agent's runtime and how it should react:
| Path | Mechanism | Best when |
|---|---|---|
| Webhook subscription (this guide) | Centrali POSTs each event to your endpoint | The agent runs as a service and needs queued, durable, retried delivery. Survives agent downtime via replay. |
| Realtime (SSE) | Agent holds an open stream | The agent is online and wants low-latency events while connected. See Realtime Quickstart. |
| MCP tools | Agent pulls via tool calls | The agent is conversational / on-demand and queries state when prompted. See MCP Server. |
These compose. A common shape: an MCP-driven agent for ad-hoc questions plus a webhook subscription so the same agent reacts autonomously to record changes.
Tip
Need code to run inside Centrali when an event happens? Use event-driven triggers or automations instead. Use a webhook subscription when the agent runs outside Centrali.
How It Works
record change ─▶ Centrali ─▶ signed POST ─▶ your endpoint ─▶ verify ─▶ ack 200
                 (queue,                                       │
                  retry,                                       └─▶ enqueue ─▶ agent.decide(event)
                  replay)                                                       │
                                                                                ├─ act (call a tool, write back, notify)
                                                                                └─ skip (no-op, log only)
Centrali delivers at least once. The agent endpoint should acknowledge fast (return 200 as soon as the event is verified and durably enqueued) and run the decision asynchronously, so a slow model call never blocks delivery or triggers a retry.
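The ack-then-decide split can be sketched independently of any web framework. The in-process queue below only illustrates the shape: `DecisionQueue`, its retry count, and the handler signature are hypothetical names chosen for this sketch, and because the queue lives in memory it is not durable. A production deployment would swap it for a persistent queue.

```typescript
type Handler<T> = (item: T) => Promise<void>;

// Hypothetical in-memory queue: accepts work synchronously, processes it later.
class DecisionQueue<T> {
  private readonly items: Array<{ item: T; attempts: number }> = [];
  private readonly handler: Handler<T>;
  private readonly maxAttempts: number;
  private draining = false;

  constructor(handler: Handler<T>, maxAttempts = 3) {
    this.handler = handler;
    this.maxAttempts = maxAttempts;
  }

  /** Called from the request handler: constant time, never awaits the model. */
  enqueue(item: T): void {
    this.items.push({ item, attempts: 0 });
  }

  /** Processes queued items one at a time; a failed item is re-queued up to maxAttempts. */
  async drain(): Promise<void> {
    if (this.draining) return; // a drain is already running
    this.draining = true;
    try {
      let job = this.items.shift();
      while (job) {
        job.attempts += 1;
        try {
          await this.handler(job.item);
        } catch (err) {
          if (job.attempts < this.maxAttempts) this.items.push(job);
          else console.error('[queue] giving up after', job.attempts, 'attempts', err);
        }
        job = this.items.shift();
      }
    } finally {
      this.draining = false;
    }
  }
}
```

In an endpoint, the handler would call `enqueue(event)`, respond with 200, and start `drain()` without awaiting it, so the response never waits on the decision.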
Prerequisites
- A Centrali workspace and a service account (client ID + secret)
- A publicly reachable HTTPS endpoint for the agent (for local development, a tunnel such as ngrok works)
- Node.js 18+
Step 1 — Create the Subscription
Point the subscription url at the agent's endpoint and subscribe to the record events the agent should react to. Scope with recordSlugs so the agent only sees the collections it cares about.
curl -X POST https://api.centrali.io/data/workspace/my-workspace/api/v1/webhook-subscriptions \
  -H "Authorization: Bearer YOUR_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "Order triage agent",
    "url": "https://your-agent.example.com/webhooks/centrali",
    "events": ["record_created", "record_updated"],
    "recordSlugs": ["orders"]
  }'
The 201 response includes the signing secret (prefixed whsec_) exactly once. Store it in the agent's environment as WEBHOOK_SECRET — you cannot read it back later, only rotate it.
Available record events are record_created, record_updated, record_deleted, and records_bulk_created. See Outbound Webhooks for the full delivery contract.
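If the subscription is provisioned from code rather than a shell, the same request can be sketched with the `fetch` API built into Node.js 18+. The helper names `buildSubscriptionRequest` and `createSubscription` are illustrative, not part of a Centrali SDK; the URL shape and body fields mirror the curl call above. This guide does not name the response field that carries the secret, so the sketch returns the parsed body untyped.

```typescript
interface SubscriptionInput {
  name: string;
  url: string;
  events: string[];
  recordSlugs?: string[];
}

interface SubscriptionRequest {
  endpoint: string;
  init: { method: string; headers: Record<string, string>; body: string };
}

// Hypothetical helper: builds the same request as the curl example.
function buildSubscriptionRequest(workspace: string, token: string, input: SubscriptionInput): SubscriptionRequest {
  return {
    endpoint: `https://api.centrali.io/data/workspace/${encodeURIComponent(workspace)}/api/v1/webhook-subscriptions`,
    init: {
      method: 'POST',
      headers: {
        Authorization: `Bearer ${token}`,
        'Content-Type': 'application/json',
      },
      body: JSON.stringify(input),
    },
  };
}

// Hypothetical helper: sends the request and insists on the documented 201.
async function createSubscription(workspace: string, token: string, input: SubscriptionInput): Promise<unknown> {
  const { endpoint, init } = buildSubscriptionRequest(workspace, token, input);
  const res = await fetch(endpoint, init);
  if (res.status !== 201) {
    throw new Error(`Subscription create failed: ${res.status} ${await res.text()}`);
  }
  // The body includes the signing secret exactly once; persist it immediately.
  return res.json();
}
```

Keeping request construction separate from the network call makes the payload easy to check in a unit test before anything is sent.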
Step 2 — Verify, Then Decide
Every delivery is signed with the Standard Webhooks model. Verify the signature against the raw request body before doing anything with the event, then hand the parsed event to the agent's decision step.
The verification below is identical to the canonical verifier in Outbound Webhooks — the agent layer is what's added on top.
import crypto from 'node:crypto';
import express from 'express';

const app = express();

// --- Signature verification (Standard Webhooks) ---
function deriveKey(secret: string): Buffer {
  // Centrali secrets are `whsec_`-prefixed and base64url-encoded.
  return Buffer.from(secret.replace(/^whsec_/, ''), 'base64url');
}

function timingSafeEqual(a: string, b: string): boolean {
  const left = Buffer.from(a);
  const right = Buffer.from(b);
  return left.length === right.length && crypto.timingSafeEqual(left, right);
}

function verify(headers: Record<string, unknown>, rawBody: Buffer, secret: string): boolean {
  const id = (headers['webhook-id'] ?? headers['centrali-id']) as string | undefined;
  const ts = (headers['webhook-timestamp'] ?? headers['centrali-timestamp']) as string | undefined;
  const sig = (headers['webhook-signature'] ?? headers['centrali-signature']) as string | undefined;
  if (!id || !ts || !sig) return false;
  if (Math.abs(Date.now() / 1000 - Number(ts)) > 300) return false; // 5-minute tolerance

  const expected = `v1,${crypto
    .createHmac('sha256', deriveKey(secret))
    .update(`${id}.${ts}.${rawBody.toString('utf8')}`)
    .digest('base64')}`;

  // During secret rotation the header carries multiple space-separated signatures.
  return String(sig)
    .split(/\s+/)
    .filter(Boolean)
    .some((candidate) => timingSafeEqual(candidate, expected));
}

// --- The agent decision step ---
interface CentraliEvent {
  event: string;
  workspaceSlug: string;
  recordSlug?: string;
  recordId?: string;
  data?: Record<string, unknown>;
  timestamp: string;
  correlationId?: string;
}

async function decide(event: CentraliEvent): Promise<void> {
  // Per-event triage. Cheap, deterministic checks first; only escalate to the
  // model (and its cost/latency) for events that actually warrant a decision.
  if (event.event !== 'record_created' || event.recordSlug !== 'orders') return;

  const order = event.data?.data as { total?: number } | undefined;
  if (!order || (order.total ?? 0) < 1000) return; // small orders: no-op

  // High-value order → let the agent decide what to do.
  //
  //   import Anthropic from '@anthropic-ai/sdk';
  //   const client = new Anthropic();
  //   const decision = await client.messages.create({
  //     model: 'claude-sonnet-4-6',
  //     max_tokens: 512,
  //     messages: [{ role: 'user', content: `Triage this order: ${JSON.stringify(order)}` }],
  //   });
  //   // ...then act on `decision`: write back to Centrali, notify a human, call a tool.
  console.log(`[agent] reviewing high-value order ${event.recordId}`);
}

// --- Endpoint: verify, ack fast, decide async ---
const seen = new Set<string>(); // swap for Redis/DB in production

app.post('/webhooks/centrali', express.raw({ type: 'application/json' }), (req, res) => {
  if (!verify(req.headers, req.body, process.env.WEBHOOK_SECRET!)) {
    return res.status(401).send('Invalid signature');
  }

  // Idempotency: Centrali delivers at least once. Dedupe on webhook-id.
  const id = String(req.headers['webhook-id'] ?? req.headers['centrali-id']);
  if (seen.has(id)) return res.status(200).send('Duplicate ignored');
  seen.add(id);

  const event = JSON.parse(req.body.toString('utf8')) as CentraliEvent;

  // Acknowledge immediately; never block delivery on a model call.
  res.status(200).send('OK');

  // Run the decision out of band. A real deployment enqueues here (BullMQ, SQS, …)
  // so a crash mid-decision is retried without re-fetching from Centrali.
  decide(event).catch((err) => console.error('[agent] decide failed', err));
});

app.listen(3000, () => console.log('Agent listener on :3000'));
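To exercise the endpoint locally without waiting for a real delivery, a test can sign its own payload the same way the verifier checks it. The sketch below is a test helper, not part of Centrali's tooling: `signDelivery`, the sample secret, and the sample record ID are made up for illustration. It assumes only what the verifier above already encodes, namely an HMAC-SHA256 over `id.timestamp.body`, keyed with the base64url-decoded secret and emitted as `v1,<base64>`.

```typescript
import { createHmac } from 'node:crypto';

// Hypothetical test helper: produces headers the verifier above should accept.
function signDelivery(secret: string, id: string, body: string, nowMs: number = Date.now()): Record<string, string> {
  const key = Buffer.from(secret.replace(/^whsec_/, ''), 'base64url');
  const ts = String(Math.floor(nowMs / 1000)); // the verifier compares against seconds
  const signature = createHmac('sha256', key).update(`${id}.${ts}.${body}`).digest('base64');
  return {
    'content-type': 'application/json',
    'webhook-id': id,
    'webhook-timestamp': ts,
    'webhook-signature': `v1,${signature}`,
  };
}

// A made-up secret and event, for local tests only.
const testSecret = `whsec_${Buffer.from('local-test-key').toString('base64url')}`;
const testBody = JSON.stringify({
  event: 'record_created',
  workspaceSlug: 'my-workspace',
  recordSlug: 'orders',
  recordId: 'rec_123', // illustrative ID
  data: { data: { total: 2500 } },
  timestamp: new Date().toISOString(),
});
const testHeaders = signDelivery(testSecret, 'msg_test_1', testBody);
```

With the listener running under the same `WEBHOOK_SECRET`, posting `testBody` with `testHeaders` to `http://localhost:3000/webhooks/centrali` should return 200; changing one byte of the body should return 401.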
Production Notes
- Acknowledge before you think. Return 200 once the event is verified and durably enqueued. A model call inside the request handler will eventually exceed Centrali's delivery timeout and cause needless retries.
- Dedupe on webhook-id. Delivery is at-least-once; the same event can arrive more than once (notably after a retry or manual replay). The example uses an in-memory Set; use Redis or a unique DB constraint in production.
- Survive downtime with replay. If the agent is offline, deliveries fail and can be replayed from the console once it recovers, so you don't lose events. This is the durability advantage over SSE.
- Pin the payload version. Each delivery carries a Centrali-Payload-Version header naming the contract that built it. Branch on it if you ever support more than one version of your handler.
- Keep the model on the hot path small. Filter cheaply (event type, collection, thresholds) before invoking the model, so cost and latency scale with decisions made, not events received.
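The dedupe note can be sketched without external infrastructure. The class below is an in-memory stand-in with expiry, so the set of seen IDs does not grow without bound. `SeenStore` and the choice of TTL are assumptions made for this sketch, not Centrali requirements, and a deployment with more than one instance still needs a shared store such as Redis or a unique database constraint.

```typescript
// Hypothetical in-memory dedupe store with expiry (single process only).
class SeenStore {
  private readonly expiresAt = new Map<string, number>();
  private readonly ttlMs: number;

  constructor(ttlMs: number) {
    this.ttlMs = ttlMs;
  }

  /** Returns true the first time an ID is seen within the TTL, false for duplicates. */
  markIfNew(id: string, nowMs: number = Date.now()): boolean {
    this.evictExpired(nowMs);
    if (this.expiresAt.has(id)) return false;
    this.expiresAt.set(id, nowMs + this.ttlMs);
    return true;
  }

  private evictExpired(nowMs: number): void {
    // A Map iterates in insertion order and every entry shares one TTL, so with
    // a non-decreasing clock the oldest entries come first and the loop can stop early.
    for (const [id, expiry] of this.expiresAt) {
      if (expiry > nowMs) break;
      this.expiresAt.delete(id);
    }
  }
}
```

In the endpoint, a store created with a TTL longer than the window in which retries or replays are expected would replace the `Set`: a delivery whose ID makes `markIfNew` return false is acknowledged with 200 and skipped.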
Related
- Outbound Webhooks — the full delivery contract, signature details, retries, and DLQ
- MCP Server — let an agent pull workspace state on demand
- Realtime Quickstart — low-latency SSE for online agents