Run Pipeline Definitions via CLI
Overview
Extend run_pipeline.py to support definition-based pipelines, allowing execution of PipelineDefinition records or ad-hoc JSON config files from the command line.
Command Interface
# Run a definition from the database
python manage.py run_pipeline --definition health-check --payload '{"server": "web-01"}'
# Run from a JSON config file
python manage.py run_pipeline --config ./pipelines/custom.json --sample
# Existing hardcoded pipeline still works
python manage.py run_pipeline --sample --source grafana
New Arguments
| Argument | Type | Description |
|---|---|---|
--definition | string | Name of a PipelineDefinition in the database |
--config | path | Path to a JSON file containing pipeline config |
Selection Logic
- If
--definitionprovided → load from database, useDefinitionBasedOrchestrator - Else if
--configprovided → load JSON file, useDefinitionBasedOrchestrator - Else → use existing
PipelineOrchestrator(hardcoded)
Constraints
--definitionand--configare mutually exclusive- All existing payload flags (
--sample,--payload,--file) work with both orchestrators
Dry-Run Output
For definition-based pipelines:
=== DRY RUN ===
Pipeline Definition: health-check
Source: cli
Environment: development
Nodes (3):
1. [context] metrics
Config: {"include": ["cpu", "memory", "disk"]}
→ next: analyze
2. [intelligence] analyze
Config: {"provider": "openai"}
→ next: notify
3. [notify] notify
Config: {"driver": "slack"}
→ end
Payload:
{
"server": "web-01"
}
Use without --dry-run to execute
For ad-hoc JSON configs, shows "Pipeline Config: <filepath>" instead of definition name.
Result Display
============================================================
PIPELINE RESULT
============================================================
Status: COMPLETED
Definition: health-check
Trace ID: abc-123
Run ID: 42
Duration: 1523.45ms
--- context (metrics) ---
Checks run: 3
Duration: 234.12ms
--- intelligence (analyze) ---
Summary: System healthy, minor CPU spike detected...
Provider: openai
Duration: 1105.67ms
--- notify (notify) ---
Driver: slack
Channels attempted: 1
Succeeded: 1
Duration: 183.66ms
✓ Pipeline completed successfully
The --json flag outputs raw PipelineRun result as JSON.
Error Handling
Validation Errors
# Definition not found
CommandError: Pipeline definition not found: nonexistent
# Invalid JSON file
CommandError: Invalid JSON in config file: ...
# Config file not found
CommandError: Config file not found: missing.json
# Both flags provided
CommandError: Cannot specify both --definition and --config
Pipeline Validation
Before execution, call orchestrator.validate(). On failure:
CommandError: Pipeline definition invalid:
- Node 'analyze' has unknown type: ai
- Node 'notify' references non-existent next: 'missing'
Runtime Errors
Wrapped as CommandError: Pipeline failed: <message>
Implementation Notes
Files to Modify
apps/orchestration/management/commands/run_pipeline.py- Add new arguments and orchestrator selection logic
Key Changes
- Add
--definitionand--configarguments - Add
_get_definition()method to load definition from database or file - Add
_show_definition_dry_run()for definition-specific dry-run output - Add
_display_definition_result()for definition-specific result display - Update
handle()to select orchestrator based on flags