Reactive observers
Pattern: entities that watch other entities and react to changes.
Source: examples/durable-agents-playground/src/reactive/
Core mechanism
An entity calls ctx.observe(entity(entityUrl), { wake: { on: 'change', collections: [...] } }) to start watching another entity. The entity() helper (imported from @durable-streams/darix-runtime) wraps a raw URL into the correct observe target. When the observed entity has new activity in the specified collections, the observer is woken.
Monitor example
The monitor watches multiple entities and reports status changes.
Source: examples/durable-agents-playground/src/reactive/monitor.ts
export function registerMonitor(registry: EntityRegistry) {
registry.define(`monitor`, {
description: `Health dashboard agent that watches multiple entities and reports status changes and anomalies`,
state: {
status: { primaryKey: `key` },
},
async handler(ctx) {
if (ctx.firstWake) {
ctx.db.actions.status_insert({ row: { key: `current`, value: `idle` } })
}
const baseObserveTool = createObserveTool(ctx)
const observeTool = {
...baseObserveTool,
execute: async (toolCallId: string, params: unknown) => {
ctx.db.actions.status_update({
key: `current`,
updater: (draft) => {
draft.value = `observing`
},
})
return baseObserveTool.execute(toolCallId, params)
},
}
ctx.useAgent({
systemPrompt: MONITOR_SYSTEM_PROMPT,
model: `claude-sonnet-4-5-20250929`,
tools: [...ctx.darixTools, observeTool],
})
await ctx.agent.run()
},
})
}The monitor wraps the base observe tool to also transition its own state to observing.
The observe tool
The observe_entity tool lets the LLM decide what to watch:
import { entity } from "@durable-streams/darix-runtime"
export function createObserveTool(ctx: HandlerContext): AgentTool {
return {
name: `observe_entity`,
label: `Observe Entity`,
description: `Start observing another entity by its URL. The current entity will receive observation_update messages when the observed entity has new activity.`,
parameters: Type.Object({
entity_url: Type.String({
description: `The URL of the entity to observe`,
}),
collections: Type.Optional(
Type.Array(Type.String(), {
description: `Which collections to watch (default: all). Options: texts, textDeltas, runs, toolCalls, childStatus`,
})
),
}),
execute: async (_toolCallId, params) => {
const { entity_url, collections } = params as {
entity_url: string
collections?: string[]
}
try {
await ctx.observe(entity(entity_url), {
wake: { on: `change`, collections },
})
return {
content: [
{
type: `text`,
text: `Now observing entity: ${entity_url}. You will receive observation_update messages when new activity is detected.`,
},
],
details: {},
}
} catch (err) {
return {
content: [
{
type: `text`,
text: `Error observing entity: ${err instanceof Error ? err.message : `Unknown error`}`,
},
],
details: {},
}
}
},
}
}Other reactive examples
- Summarizer (
reactive/summarizer.ts) -- observes an entity'stextsandtextDeltascollections and produces progressive summaries of its output. - Guardian (
reactive/guardian.ts) -- observes an entity'stextsandtoolCallscollections and evaluates output quality, checking for hallucination signals, safety issues, and formatting problems.
All three follow the same structure: register an entity, wrap createObserveTool with a state transition, configure the agent with the observe tool, and run.