Real Node Handlers for Definition-Based Pipeline
Date: 2026-02-22
Status: Completed (implemented 2026-02-22)
Overview
Replace the stub ContextNodeHandler and NotifyNodeHandler with real implementations that call actual system checkers and notification drivers. This makes any PipelineDefinition (including local-monitor from setup_instance) fully executable.
Scope
Two node handlers need real implementations:
| Node | Current State | Target State |
|---|---|---|
| context | Returns fake data {"cpu": {"load": 0.5}} | Runs real checkers via CHECKER_REGISTRY |
| notify | Returns {"delivered": True} without sending | Sends via NotifySelector + driver.send() |
Two nodes are already functional and unchanged: ingest, intelligence.
Context Node Handler
Config: {"checker_names": ["cpu", "memory", "disk"]} (optional)
If checker_names is missing or empty, the handler uses get_enabled_checkers(), which respects CHECKERS_SKIP.
Execution flow:
- Resolve checker names from config or get_enabled_checkers()
- For each checker: instantiate class, call .check(), catch exceptions
- Count passed (OK) vs failed (WARNING/CRITICAL/UNKNOWN)
- Return all results
Output:
{
"checks_run": 3,
"checks_passed": 2,
"checks_failed": 1,
"results": {
"cpu": {"status": "ok", "message": "CPU usage: 12%", "metrics": {"usage_percent": 12.0}},
"memory": {"status": "warning", "message": "...", "metrics": {...}},
}
}
Errors from individual checkers don’t fail the node — they’re recorded in results with status "unknown" and an error message. The node only adds to result.errors if zero checkers could run.
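The flow and error handling described above can be sketched as follows. This is a minimal illustration, not the actual handler: FakeCheckResult, the two fake checkers, and run_context_node are hypothetical names, CHECKER_REGISTRY and get_enabled_checkers() are local stand-ins for the project's own, and the reading of "zero checkers could run" as "no checker completed without raising" is an assumption.

```python
from dataclasses import dataclass, field


@dataclass
class FakeCheckResult:
    # Assumed shape of what a checker's .check() returns.
    status: str
    message: str
    metrics: dict = field(default_factory=dict)


class FakeCpuChecker:
    def check(self):
        return FakeCheckResult("ok", "CPU usage: 12%", {"usage_percent": 12.0})


class FakeDiskChecker:
    def check(self):
        raise OSError("disk probe failed")


# Stand-in for the project's CHECKER_REGISTRY (checker name -> checker class).
CHECKER_REGISTRY = {"cpu": FakeCpuChecker, "disk": FakeDiskChecker}


def get_enabled_checkers():
    # Stand-in; the real helper is described as respecting CHECKERS_SKIP.
    return list(CHECKER_REGISTRY)


def run_context_node(config):
    # An absent or empty checker_names falls back to the enabled checkers.
    names = config.get("checker_names") or get_enabled_checkers()
    results = {}
    completed = 0
    for name in names:
        try:
            outcome = CHECKER_REGISTRY[name]().check()
        except Exception as exc:
            # A failing checker is recorded as "unknown" instead of failing the node.
            results[name] = {
                "status": "unknown",
                "message": f"{type(exc).__name__}: {exc}",
                "metrics": {},
            }
            continue
        completed += 1
        results[name] = {
            "status": outcome.status,
            "message": outcome.message,
            "metrics": outcome.metrics,
        }

    passed = sum(1 for r in results.values() if r["status"] == "ok")
    output = {
        "checks_run": len(results),
        "checks_passed": passed,
        "checks_failed": len(results) - passed,
        "results": results,
    }
    # Assumption: a node-level error is raised only when no checker completed.
    errors = [] if completed else ["No checkers could run"]
    return output, errors
```

Returning the node-level errors separately from the output keeps a single failing checker visible in results without marking the whole node as failed.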
validate_config: No required fields (empty config = run all enabled checkers).
Notify Node Handler
Config: {"drivers": ["slack", "email"]} (list of driver types to send to)
Queries NotificationChannel.objects.filter(driver__in=drivers, is_active=True) to find channels created by setup_instance or manually.
Execution flow:
- Get drivers list from config
- Query active NotificationChannel records matching those driver types
- Build a NotificationMessage from previous node outputs (title, message body, severity)
- For each channel: instantiate driver via DRIVER_REGISTRY, call validate_config(), call driver.send(message, channel.config)
- Track attempted/succeeded/failed counts
Message building:
- Title: derived from checker results or “Pipeline Notification”
- Body: markdown summary of all previous_outputs (checker results, intelligence, etc.)
- Severity: highest severity from checker results, or “info” if no checkers ran
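One way the message-building rules could be expressed is sketched below. Several details are assumptions made only for illustration: the function name build_message, the "context" key under which checker results are looked up in previous_outputs, the status-to-severity mapping, the exact title wording, and the plain dict returned in place of a NotificationMessage.

```python
# Assumed mapping from checker status to notification severity.
STATUS_TO_SEVERITY = {
    "ok": "info",
    "unknown": "warning",
    "warning": "warning",
    "critical": "critical",
}
SEVERITY_RANK = {"info": 0, "warning": 1, "critical": 2}


def build_message(previous_outputs):
    # Assumption: the context node's output is stored under the key "context".
    results = previous_outputs.get("context", {}).get("results", {})

    # Highest severity across checker results, or "info" if no checkers ran.
    severities = [
        STATUS_TO_SEVERITY.get(r.get("status"), "warning") for r in results.values()
    ]
    severity = max(severities, key=SEVERITY_RANK.__getitem__, default="info")

    # Title derived from checker results, with the documented generic fallback.
    failing = [name for name, r in results.items() if r.get("status") != "ok"]
    if failing:
        title = f"{len(failing)} of {len(results)} checks need attention"
    elif results:
        title = "All checks passed"
    else:
        title = "Pipeline Notification"

    # Body: a markdown summary with one section per previous node.
    lines = []
    for node_name, output in previous_outputs.items():
        lines.append(f"## {node_name}")
        if node_name == "context":
            for name, r in results.items():
                lines.append(f"- **{name}**: {r.get('status')} ({r.get('message', '')})")
        else:
            lines.append(f"- {output}")

    return {"title": title, "body": "\n".join(lines), "severity": severity}
```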
Output:
{
"channels_attempted": 2,
"channels_succeeded": 1,
"channels_failed": 1,
"deliveries": [
{"driver": "slack", "channel": "ops-slack", "status": "success", "message_id": "..."},
{"driver": "email", "channel": "ops-email", "status": "failed", "error": "..."},
]
}
Fallback: If no matching NotificationChannel records exist, fall back to NotifySelector.resolve(None), which picks the first active channel.
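The per-channel delivery loop and its counters could look roughly like the sketch below; the fallback path is left out. The fake drivers, the local DRIVER_REGISTRY, and the deliver function are hypothetical, channels are plain dicts standing in for NotificationChannel rows, and the assumed return shape of send() (a dict with "success" and "message_id") may differ from the real drivers.

```python
class FakeSlackDriver:
    def validate_config(self, config):
        return "webhook_url" in config

    def send(self, message, config):
        # Assumed return shape; the real drivers may differ.
        return {"success": True, "message_id": "fake-123"}


class FakeEmailDriver:
    def validate_config(self, config):
        return True

    def send(self, message, config):
        raise ConnectionError("SMTP unreachable")


# Stand-in for the project's DRIVER_REGISTRY (driver type -> driver class).
DRIVER_REGISTRY = {"slack": FakeSlackDriver, "email": FakeEmailDriver}


def deliver(message, channels):
    deliveries = []
    for channel in channels:
        entry = {"driver": channel["driver"], "channel": channel["name"]}
        try:
            driver = DRIVER_REGISTRY[channel["driver"]]()
            if not driver.validate_config(channel["config"]):
                raise ValueError("invalid channel config")
            result = driver.send(message, channel["config"])
            if not result.get("success"):
                raise RuntimeError(result.get("error", "send failed"))
            entry.update(status="success", message_id=result.get("message_id"))
        except Exception as exc:
            # One failing channel must not prevent delivery to the others.
            entry.update(status="failed", error=str(exc))
        deliveries.append(entry)

    succeeded = sum(1 for d in deliveries if d["status"] == "success")
    return {
        "channels_attempted": len(deliveries),
        "channels_succeeded": succeeded,
        "channels_failed": len(deliveries) - succeeded,
        "deliveries": deliveries,
    }
```

Catching exceptions per channel mirrors the context node's approach: the output always reports every attempted channel, and a single broken driver shows up as one "failed" entry.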
validate_config: Requires drivers (list) with at least one entry, OR a single driver string (backwards compat with existing tests/configs).
Config Compatibility
setup_instance writes {"drivers": ["slack"]} (plural). The old stub expected {"driver": "slack"} (singular). The new handler accepts both:
- drivers (list): preferred
- driver (string): normalized to [driver]
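A small sketch of that normalization, assuming a hypothetical helper named normalize_drivers that raises ValueError on invalid input (the real validate_config may report errors differently):

```python
def normalize_drivers(config):
    """Return the configured driver types as a non-empty list of strings."""
    drivers = config.get("drivers")
    if drivers is None and "driver" in config:
        # Backwards compatibility: a single driver string becomes a one-item list.
        drivers = [config["driver"]]
    if (
        not isinstance(drivers, list)
        or not drivers
        or not all(isinstance(d, str) and d for d in drivers)
    ):
        raise ValueError("config requires 'drivers' (non-empty list) or 'driver' (string)")
    return drivers
```

When both keys are present, this sketch lets drivers win, matching its "preferred" status above.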
Testing
- Mock CHECKER_REGISTRY to use fake checkers for context node tests
- Mock NotificationChannel.objects and driver.send() for notify node tests
- Integration: run run_pipeline --definition local-monitor end-to-end with mocked drivers
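As a rough illustration of the registry-mocking approach, the sketch below uses unittest.mock.patch.dict to swap fake checkers into a registry for the duration of a test. The module-level CHECKER_REGISTRY and the run_checkers helper are local stand-ins defined only for this example; in the real test suite the patch would target the project's own registry, whose import path is not given here.

```python
import unittest
from unittest.mock import MagicMock, patch

# Stand-in for the project's registry (checker name -> checker class).
CHECKER_REGISTRY = {}


def run_checkers(names):
    # Hypothetical helper: instantiate each checker and collect its result.
    return {name: CHECKER_REGISTRY[name]().check() for name in names}


class ContextNodeRegistryTests(unittest.TestCase):
    def test_fake_checker_is_used(self):
        fake_cls = MagicMock()
        fake_cls.return_value.check.return_value = {"status": "ok"}

        # patch.dict restores the registry's original contents on exit.
        with patch.dict(CHECKER_REGISTRY, {"cpu": fake_cls}, clear=True):
            out = run_checkers(["cpu"])

        self.assertEqual(out, {"cpu": {"status": "ok"}})
        fake_cls.return_value.check.assert_called_once_with()
        self.assertEqual(CHECKER_REGISTRY, {})
```

The same pattern should carry over to the notify node tests by patching the driver registry and asserting on the mocked send() call.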
Files
apps/orchestration/nodes/context.py # Replace stub
apps/orchestration/nodes/notify.py # Replace stub
apps/orchestration/nodes/_tests/ # Tests (new directory or existing)