Run Pipeline Definitions CLI Implementation Plan
For Claude: REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
Goal: Extend run_pipeline.py to support definition-based pipelines via --definition and --config flags.
Architecture: Add two new arguments to the existing command. Selection logic determines which orchestrator to use based on flags. Reuse existing payload handling; add definition-specific dry-run and result display methods.
Tech Stack: Django management commands, DefinitionBasedOrchestrator, PipelineDefinition model
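For orientation, a condensed sketch of the selection flow the tasks below build up. It is illustrative only and simply mirrors the handle() changes specified in Tasks 3-8 (dry-run, --json output, and validation branches omitted for brevity):
# Sketch: how handle() chooses an orchestrator once --definition/--config exist (see Tasks 3-8)
def handle(self, *args, **options):
    # Flag validation (Task 3)
    if options.get("definition") and options.get("config"):
        raise CommandError("Cannot specify both --definition and --config")
    # Load a definition from the DB or a config file, if requested (Task 4)
    definition, config_path = self._get_definition(options)
    payload = self._get_payload(options, definition)
    if definition:
        # Definition-based path (Tasks 6-9)
        orchestrator = DefinitionBasedOrchestrator(definition)
        result = orchestrator.execute(
            payload=payload.get("payload", {}),
            source=options["source"],
            trace_id=options.get("trace_id"),
            environment=options["environment"],
        )
        self._display_definition_result(result, definition, config_path)
    else:
        # Existing hardcoded path, unchanged
        orchestrator = PipelineOrchestrator()
        result = orchestrator.run_pipeline(
            payload=payload,
            source=options["source"],
            trace_id=options.get("trace_id"),
            environment=options["environment"],
        )
        self._display_result(result)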
Task 1: Add Tests for Definition Flag Validation
Files:
- Modify:
apps/orchestration/tests/test_run_pipeline_command.py
Step 1: Write failing tests for new argument validation
Add to test_run_pipeline_command.py:
def test_run_pipeline_definition_not_found(self):
    out = io.StringIO()
    with self.assertRaises(CommandError) as ctx:
        call_command("run_pipeline", "--definition", "nonexistent", stdout=out)
    self.assertIn("Pipeline definition not found", str(ctx.exception))

def test_run_pipeline_config_file_not_found(self):
    out = io.StringIO()
    with self.assertRaises(CommandError) as ctx:
        call_command("run_pipeline", "--config", "missing.json", stdout=out)
    self.assertIn("Config file not found", str(ctx.exception))

def test_run_pipeline_definition_and_config_mutually_exclusive(self):
    out = io.StringIO()
    with self.assertRaises(CommandError) as ctx:
        call_command(
            "run_pipeline",
            "--definition", "test",
            "--config", "test.json",
            stdout=out,
        )
    self.assertIn("Cannot specify both", str(ctx.exception))
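These snippets assume the test module already carries the imports its existing tests use; if any are missing, the tests added throughout this plan rely on roughly the following (the unittest.mock import is needed from Task 7 onward):
import io
import json
from unittest import mock

from django.core.management import call_command
from django.core.management.base import CommandError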
Step 2: Run tests to verify they fail
Run: pytest apps/orchestration/tests/test_run_pipeline_command.py -v -k "definition or config"
Expected: FAIL (unrecognized arguments)
Step 3: Commit failing tests
git add apps/orchestration/tests/test_run_pipeline_command.py
git commit -m "test: add failing tests for definition flag validation"
Task 2: Add New Command Arguments
Files:
- Modify:
apps/orchestration/management/commands/run_pipeline.py:34-87
Step 1: Add --definition and --config arguments
In add_arguments method, add after --json:
parser.add_argument(
    "--definition",
    type=str,
    help="Name of a PipelineDefinition to run (from database)",
)
parser.add_argument(
    "--config",
    type=str,
    help="Path to JSON file containing pipeline definition config",
)
Step 2: Run tests to check arguments are recognized
Run: pytest apps/orchestration/tests/test_run_pipeline_command.py -v -k "definition or config"
Expected: FAIL (but now with a different error: the validation logic is still missing)
Step 3: Commit
git add apps/orchestration/management/commands/run_pipeline.py
git commit -m "feat: add --definition and --config arguments to run_pipeline"
Task 3: Add Mutual Exclusivity Validation
Files:
- Modify:
apps/orchestration/management/commands/run_pipeline.py:89-119
Step 1: Add validation in handle() method
At the start of handle() method, before payload = self._get_payload(options):
# Validate mutually exclusive definition options
if options.get("definition") and options.get("config"):
    raise CommandError("Cannot specify both --definition and --config")
Step 2: Run mutual exclusivity test
Run: pytest apps/orchestration/tests/test_run_pipeline_command.py::RunPipelineCommandTest::test_run_pipeline_definition_and_config_mutually_exclusive -v
Expected: PASS
Step 3: Commit
git add apps/orchestration/management/commands/run_pipeline.py
git commit -m "feat: add mutual exclusivity validation for definition flags"
Task 4: Add Definition Loading Logic
Files:
- Modify:
apps/orchestration/management/commands/run_pipeline.py
Step 1: Add imports at top of file
from apps.orchestration.definition_orchestrator import DefinitionBasedOrchestrator
from apps.orchestration.models import PipelineDefinition
Step 2: Add _get_definition() method
Add after _get_sample_payload() method:
def _get_definition(self, options) -> tuple[PipelineDefinition | None, str | None]:
    """
    Load pipeline definition from database or config file.

    Returns:
        Tuple of (PipelineDefinition or None, config_path or None)
    """
    if options.get("definition"):
        name = options["definition"]
        try:
            definition = PipelineDefinition.objects.get(name=name)
            return definition, None
        except PipelineDefinition.DoesNotExist:
            raise CommandError(f"Pipeline definition not found: {name}")
    if options.get("config"):
        config_path = options["config"]
        try:
            with open(config_path) as f:
                config = json.load(f)
        except FileNotFoundError:
            raise CommandError(f"Config file not found: {config_path}")
        except json.JSONDecodeError as e:
            raise CommandError(f"Invalid JSON in config file: {e}")
        # Create an unsaved PipelineDefinition for execution
        definition = PipelineDefinition(
            name=f"__adhoc__{config_path}",
            config=config,
        )
        return definition, config_path
    return None, None
Step 3: Run definition loading tests
Run: pytest apps/orchestration/tests/test_run_pipeline_command.py -v -k "definition_not_found or config_file_not_found"
Expected: PASS
Step 4: Commit
git add apps/orchestration/management/commands/run_pipeline.py
git commit -m "feat: add definition loading from database and config file"
Task 5: Add Tests for Definition Dry-Run
Files:
- Modify:
apps/orchestration/tests/test_run_pipeline_command.py
Step 1: Write failing test for definition dry-run
def test_run_pipeline_definition_dry_run(self):
    from apps.orchestration.models import PipelineDefinition

    PipelineDefinition.objects.create(
        name="test-pipeline",
        config={
            "version": "1.0",
            "nodes": [
                {"id": "ctx", "type": "context", "config": {"include": ["cpu"]}, "next": "notify"},
                {"id": "notify", "type": "notify", "config": {"driver": "slack"}},
            ],
        },
    )
    out = io.StringIO()
    call_command("run_pipeline", "--definition", "test-pipeline", "--dry-run", stdout=out)
    output = out.getvalue()
    self.assertIn("=== DRY RUN ===", output)
    self.assertIn("Pipeline Definition: test-pipeline", output)
    self.assertIn("[context] ctx", output)
    self.assertIn("[notify] notify", output)
Step 2: Run test to verify it fails
Run: pytest apps/orchestration/tests/test_run_pipeline_command.py::RunPipelineCommandTest::test_run_pipeline_definition_dry_run -v
Expected: FAIL
Step 3: Commit
git add apps/orchestration/tests/test_run_pipeline_command.py
git commit -m "test: add failing test for definition dry-run output"
Task 6: Implement Definition Dry-Run Display
Files:
- Modify:
apps/orchestration/management/commands/run_pipeline.py
Step 1: Add _show_definition_dry_run() method
Add after _show_dry_run() method:
def _show_definition_dry_run(
    self,
    definition: "PipelineDefinition",
    payload: dict,
    options: dict,
    config_path: str | None = None,
):
    """Display what would happen in a definition-based dry run."""
    self.stdout.write(self.style.WARNING("=== DRY RUN ==="))
    self.stdout.write("")
    if config_path:
        self.stdout.write(f"Pipeline Config: {config_path}")
    else:
        self.stdout.write(f"Pipeline Definition: {definition.name}")
    self.stdout.write(f"Source: {options['source']}")
    self.stdout.write(f"Environment: {options['environment']}")
    self.stdout.write("")
    nodes = definition.get_nodes()
    self.stdout.write(f"Nodes ({len(nodes)}):")
    for i, node in enumerate(nodes, 1):
        node_id = node.get("id", f"node_{i}")
        node_type = node.get("type", "unknown")
        node_config = node.get("config", {})
        next_node = node.get("next")
        self.stdout.write(f" {i}. [{node_type}] {node_id}")
        if node_config:
            self.stdout.write(f" Config: {json.dumps(node_config)}")
        if next_node:
            self.stdout.write(f" → next: {next_node}")
        else:
            self.stdout.write(" → end")
    self.stdout.write("")
    self.stdout.write("Payload:")
    self.stdout.write(json.dumps(payload, indent=2))
    self.stdout.write("")
    self.stdout.write(self.style.SUCCESS("Use without --dry-run to execute"))
Step 2: Update handle() to use definition dry-run
Update the dry-run section in handle():
if options["dry_run"]:
    definition, config_path = self._get_definition(options)
    if definition:
        self._show_definition_dry_run(definition, payload, options, config_path)
    else:
        self._show_dry_run(payload, options)
    return
Step 3: Move definition loading before payload
Reorder handle() so the definition is loaded once, up front, before the payload is built (this supersedes the inline _get_definition() call added in Step 2):
def handle(self, *args, **options):
    # Validate mutually exclusive definition options
    if options.get("definition") and options.get("config"):
        raise CommandError("Cannot specify both --definition and --config")

    # Load definition (if specified)
    definition, config_path = self._get_definition(options)

    # Build payload
    payload = self._get_payload(options, definition)

    if options["dry_run"]:
        if definition:
            self._show_definition_dry_run(definition, payload, options, config_path)
        else:
            self._show_dry_run(payload, options)
        return

    # ... rest of handle()
Step 4: Update _get_payload() signature
Update to accept optional definition:
def _get_payload(self, options, definition: "PipelineDefinition | None" = None) -> dict:
And update the validation at the end:
elif options["checks_only"]:
    inner_payload = {}
elif definition:
    # Definition-based pipelines can run without explicit payload
    inner_payload = {}
else:
    raise CommandError("Must specify --sample, --payload, --file, --checks-only, --definition, or --config")
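For reference, a rough sketch of how the whole method might read after this change. The --sample/--payload/--file branches below are stand-ins for whatever the existing implementation already does, and the {"payload": ...} wrapper shape is an assumption inferred from the payload.get("payload", {}) call in Task 8:
def _get_payload(self, options, definition: "PipelineDefinition | None" = None) -> dict:
    if options.get("sample"):
        inner_payload = self._get_sample_payload()      # existing branch
    elif options.get("payload"):
        inner_payload = json.loads(options["payload"])  # existing branch (shape assumed)
    elif options.get("file"):
        with open(options["file"]) as f:                # existing branch (shape assumed)
            inner_payload = json.load(f)
    elif options["checks_only"]:
        inner_payload = {}
    elif definition:
        # Definition-based pipelines can run without explicit payload
        inner_payload = {}
    else:
        raise CommandError(
            "Must specify --sample, --payload, --file, --checks-only, --definition, or --config"
        )
    return {"payload": inner_payload}  # wrapper shape assumed; match the existing return format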
Step 5: Run dry-run test
Run: pytest apps/orchestration/tests/test_run_pipeline_command.py::RunPipelineCommandTest::test_run_pipeline_definition_dry_run -v
Expected: PASS
Step 6: Commit
git add apps/orchestration/management/commands/run_pipeline.py
git commit -m "feat: implement definition dry-run display"
Task 7: Add Tests for Definition Execution
Files:
- Modify:
apps/orchestration/tests/test_run_pipeline_command.py
Step 1: Write failing test for definition execution
@mock.patch("apps.orchestration.definition_orchestrator.DefinitionBasedOrchestrator.execute")
def test_run_pipeline_with_definition(self, mock_execute):
    from apps.orchestration.models import PipelineDefinition

    PipelineDefinition.objects.create(
        name="test-exec-pipeline",
        config={
            "version": "1.0",
            "nodes": [
                {"id": "notify", "type": "notify", "config": {"driver": "slack"}},
            ],
        },
    )
    mock_execute.return_value = {
        "trace_id": "trace-456",
        "run_id": "run-456",
        "definition": "test-exec-pipeline",
        "definition_version": 1,
        "status": "completed",
        "executed_nodes": ["notify"],
        "skipped_nodes": [],
        "node_results": {
            "notify": {"node_id": "notify", "node_type": "notify", "duration_ms": 50}
        },
        "duration_ms": 100.0,
        "error": None,
    }
    out = io.StringIO()
    call_command("run_pipeline", "--definition", "test-exec-pipeline", stdout=out)
    output = out.getvalue()
    self.assertIn("PIPELINE RESULT", output)
    self.assertIn("Definition: test-exec-pipeline", output)
    self.assertIn("completed", output.lower())
Step 2: Run test to verify it fails
Run: pytest apps/orchestration/tests/test_run_pipeline_command.py::RunPipelineCommandTest::test_run_pipeline_with_definition -v
Expected: FAIL
Step 3: Commit
git add apps/orchestration/tests/test_run_pipeline_command.py
git commit -m "test: add failing test for definition execution"
Task 8: Implement Definition Execution
Files:
- Modify:
apps/orchestration/management/commands/run_pipeline.py
Step 1: Add definition execution path in handle()
Replace the orchestrator section in handle():
# Run pipeline
self.stdout.write(self.style.NOTICE("Starting pipeline..."))
self.stdout.write(f" Source: {options['source']}")
self.stdout.write(f" Environment: {options['environment']}")
if definition:
    self.stdout.write(f" Definition: {definition.name}")
self.stdout.write("")

try:
    if definition:
        # Definition-based execution
        orchestrator = DefinitionBasedOrchestrator(definition)

        # Validate before execution
        errors = orchestrator.validate()
        if errors:
            raise CommandError(
                "Pipeline definition invalid:\n - " + "\n - ".join(errors)
            )

        result = orchestrator.execute(
            payload=payload.get("payload", {}),
            source=options["source"],
            trace_id=options.get("trace_id"),
            environment=options["environment"],
        )

        if options["json"]:
            self.stdout.write(json.dumps(result, indent=2, default=str))
        else:
            self._display_definition_result(result, definition, config_path)
    else:
        # Hardcoded pipeline execution
        orchestrator = PipelineOrchestrator()
        result = orchestrator.run_pipeline(
            payload=payload,
            source=options["source"],
            trace_id=options.get("trace_id"),
            environment=options["environment"],
        )

        if options["json"]:
            self.stdout.write(json.dumps(result.to_dict(), indent=2, default=str))
        else:
            self._display_result(result)
except CommandError:
    raise
except Exception as e:
    raise CommandError(f"Pipeline failed: {e}")
Step 2: Make config_path available in execution section
No change should be needed here: config_path is a local variable in handle(), already returned by the _get_definition() call earlier in the method, so it is in scope for the execution section above.
Step 3: Run test (will still fail - missing display method)
Run: pytest apps/orchestration/tests/test_run_pipeline_command.py::RunPipelineCommandTest::test_run_pipeline_with_definition -v
Expected: FAIL (AttributeError: _display_definition_result)
Step 4: Commit partial implementation
git add apps/orchestration/management/commands/run_pipeline.py
git commit -m "feat: add definition-based execution path"
Task 9: Implement Definition Result Display
Files:
- Modify:
apps/orchestration/management/commands/run_pipeline.py
Step 1: Add _display_definition_result() method
Add after _display_result() method:
def _display_definition_result(
    self,
    result: dict,
    definition: "PipelineDefinition",
    config_path: str | None = None,
):
    """Display definition-based pipeline result in human-readable format."""
    self.stdout.write("")
    self.stdout.write("=" * 60)
    self.stdout.write(self.style.HTTP_INFO("PIPELINE RESULT"))
    self.stdout.write("=" * 60)
    self.stdout.write("")

    # Overall status
    status = result.get("status", "unknown")
    if status == "completed":
        self.stdout.write(self.style.SUCCESS(f"Status: {status}"))
    else:
        self.stdout.write(self.style.ERROR(f"Status: {status}"))
    if config_path:
        self.stdout.write(f"Config: {config_path}")
    else:
        self.stdout.write(f"Definition: {result.get('definition', definition.name)}")
    self.stdout.write(f"Trace ID: {result.get('trace_id', 'N/A')}")
    self.stdout.write(f"Run ID: {result.get('run_id', 'N/A')}")
    self.stdout.write(f"Duration: {result.get('duration_ms', 0):.2f}ms")
    self.stdout.write("")

    # Node results
    node_results = result.get("node_results", {})
    nodes = definition.get_nodes()
    for node in nodes:
        node_id = node.get("id")
        node_type = node.get("type")
        self.stdout.write(f"--- {node_type} ({node_id}) ---")
        if node_id in result.get("skipped_nodes", []):
            self.stdout.write(self.style.WARNING(" (skipped)"))
        elif node_id in node_results:
            node_result = node_results[node_id]
            # Show key info based on node type
            if node_type == "context":
                self.stdout.write(f" Checks run: {node_result.get('checks_run', 'N/A')}")
            elif node_type == "intelligence":
                summary = node_result.get("summary", node_result.get("output", {}).get("summary", "N/A"))
                if isinstance(summary, str) and len(summary) > 100:
                    summary = summary[:100] + "..."
                self.stdout.write(f" Summary: {summary}")
                provider = node_result.get("provider", node_result.get("output", {}).get("provider"))
                if provider:
                    self.stdout.write(f" Provider: {provider}")
            elif node_type == "notify":
                driver = node.get("config", {}).get("driver", "unknown")
                self.stdout.write(f" Driver: {driver}")
                self.stdout.write(f" Channels attempted: {node_result.get('channels_attempted', 'N/A')}")
                self.stdout.write(f" Succeeded: {node_result.get('channels_succeeded', 'N/A')}")
            elif node_type == "ingest":
                self.stdout.write(f" Incident ID: {node_result.get('incident_id', 'N/A')}")
                self.stdout.write(f" Alerts created: {node_result.get('alerts_created', 'N/A')}")
            # Show errors if any
            errors = node_result.get("errors", [])
            if errors:
                self.stdout.write(self.style.ERROR(f" Errors: {errors}"))
            duration = node_result.get("duration_ms", 0)
            self.stdout.write(f" Duration: {duration:.2f}ms")
        else:
            self.stdout.write(self.style.WARNING(" (not executed)"))
        self.stdout.write("")

    # Final summary
    if status == "completed":
        self.stdout.write(self.style.SUCCESS("✓ Pipeline completed successfully"))
    else:
        self.stdout.write(self.style.ERROR(f"✗ Pipeline failed: {status}"))
        error = result.get("error")
        if error:
            self.stdout.write(self.style.ERROR(f" {error}"))
Step 2: Run execution test
Run: pytest apps/orchestration/tests/test_run_pipeline_command.py::RunPipelineCommandTest::test_run_pipeline_with_definition -v
Expected: PASS
Step 3: Commit
git add apps/orchestration/management/commands/run_pipeline.py
git commit -m "feat: implement definition result display"
Task 10: Add Test for Config File Execution
Files:
- Modify:
apps/orchestration/tests/test_run_pipeline_command.py
Step 1: Write test for config file execution
@mock.patch("apps.orchestration.definition_orchestrator.DefinitionBasedOrchestrator.execute")
def test_run_pipeline_with_config_file(self, mock_execute):
    import tempfile
    import os

    config = {
        "version": "1.0",
        "nodes": [
            {"id": "notify", "type": "notify", "config": {"driver": "slack"}},
        ],
    }
    with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
        json.dump(config, f)
        config_path = f.name
    try:
        mock_execute.return_value = {
            "trace_id": "trace-789",
            "run_id": "run-789",
            "definition": f"__adhoc__{config_path}",
            "definition_version": 1,
            "status": "completed",
            "executed_nodes": ["notify"],
            "skipped_nodes": [],
            "node_results": {},
            "duration_ms": 50.0,
            "error": None,
        }
        out = io.StringIO()
        call_command("run_pipeline", "--config", config_path, stdout=out)
        output = out.getvalue()
        self.assertIn("PIPELINE RESULT", output)
        self.assertIn("completed", output.lower())
    finally:
        os.unlink(config_path)
Step 2: Run test
Run: pytest apps/orchestration/tests/test_run_pipeline_command.py::RunPipelineCommandTest::test_run_pipeline_with_config_file -v
Expected: PASS
Step 3: Commit
git add apps/orchestration/tests/test_run_pipeline_command.py
git commit -m "test: add test for config file execution"
Task 11: Add Test for Invalid Config File
Files:
- Modify:
apps/orchestration/tests/test_run_pipeline_command.py
Step 1: Write test for invalid JSON in config file
def test_run_pipeline_invalid_config_json(self):
    import tempfile
    import os

    with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
        f.write("{invalid json content")
        config_path = f.name
    try:
        out = io.StringIO()
        with self.assertRaises(CommandError) as ctx:
            call_command("run_pipeline", "--config", config_path, stdout=out)
        self.assertIn("Invalid JSON in config file", str(ctx.exception))
    finally:
        os.unlink(config_path)
Step 2: Run test
Run: pytest apps/orchestration/tests/test_run_pipeline_command.py::RunPipelineCommandTest::test_run_pipeline_invalid_config_json -v
Expected: PASS
Step 3: Commit
git add apps/orchestration/tests/test_run_pipeline_command.py
git commit -m "test: add test for invalid config file JSON"
Task 12: Add Test for Definition Validation Errors
Files:
- Modify:
apps/orchestration/tests/test_run_pipeline_command.py
Step 1: Write test for validation errors
def test_run_pipeline_definition_validation_error(self):
    from apps.orchestration.models import PipelineDefinition

    PipelineDefinition.objects.create(
        name="invalid-pipeline",
        config={
            # Missing version and has invalid node type
            "nodes": [
                {"id": "bad", "type": "nonexistent_type"},
            ]
        },
    )
    out = io.StringIO()
    with self.assertRaises(CommandError) as ctx:
        call_command("run_pipeline", "--definition", "invalid-pipeline", stdout=out)
    self.assertIn("Pipeline definition invalid", str(ctx.exception))
Step 2: Run test
Run: pytest apps/orchestration/tests/test_run_pipeline_command.py::RunPipelineCommandTest::test_run_pipeline_definition_validation_error -v
Expected: PASS
Step 3: Commit
git add apps/orchestration/tests/test_run_pipeline_command.py
git commit -m "test: add test for definition validation errors"
Task 13: Update Command Docstring
Files:
- Modify:
apps/orchestration/management/commands/run_pipeline.py:1-22
Step 1: Update module docstring
Replace the docstring at the top of the file:
"""
Management command to run the pipeline end-to-end.

Usage:
    # Run with sample alert payload (hardcoded pipeline)
    python manage.py run_pipeline --sample

    # Run with custom JSON payload
    python manage.py run_pipeline --payload '{"alerts": [...]}'

    # Run with payload from file
    python manage.py run_pipeline --file alert.json

    # Run with specific source
    python manage.py run_pipeline --sample --source grafana

    # Run checks only (no alert ingestion)
    python manage.py run_pipeline --checks-only

    # Dry run (show what would happen)
    python manage.py run_pipeline --sample --dry-run

    # Run a pipeline definition from database
    python manage.py run_pipeline --definition my-pipeline

    # Run a pipeline definition with payload
    python manage.py run_pipeline --definition my-pipeline --payload '{"server": "web-01"}'

    # Run from a JSON config file
    python manage.py run_pipeline --config ./pipelines/custom.json

    # Dry run a definition
    python manage.py run_pipeline --definition my-pipeline --dry-run
"""
Step 2: Update help text
Update the help attribute on the Command class:
help = "Run a pipeline: hardcoded (alerts → checkers → intelligence → notify) or definition-based"
Step 3: Commit
git add apps/orchestration/management/commands/run_pipeline.py
git commit -m "docs: update run_pipeline command docstring and help"
Task 14: Run Full Test Suite
Step 1: Run all run_pipeline tests
Run: pytest apps/orchestration/tests/test_run_pipeline_command.py -v
Expected: All tests PASS
Step 2: Run full orchestration tests
Run: pytest apps/orchestration/ -v
Expected: All tests PASS
Step 3: Final commit if any cleanup needed
git status
# If clean, no commit needed
Task 15: Manual Verification
Step 1: Test dry-run with sample definition
Create a test definition in Django shell:
python manage.py shell -c "
from apps.orchestration.models import PipelineDefinition
PipelineDefinition.objects.get_or_create(
    name='cli-test',
    defaults={
        'config': {
            'version': '1.0',
            'nodes': [
                {'id': 'ctx', 'type': 'context', 'config': {'include': ['cpu']}, 'next': 'notify'},
                {'id': 'notify', 'type': 'notify', 'config': {'driver': 'generic'}},
            ]
        }
    }
)
print('Created cli-test definition')
"
Step 2: Test dry-run
Run: python manage.py run_pipeline --definition cli-test --dry-run
Verify output shows:
- Pipeline Definition: cli-test
- Nodes (2):
- [context] ctx
- [notify] notify
Step 3: Test with JSON config file
Create a test config file and run dry-run:
echo '{"version": "1.0", "nodes": [{"id": "n", "type": "notify", "config": {"driver": "generic"}}]}' > /tmp/test-pipeline.json
python manage.py run_pipeline --config /tmp/test-pipeline.json --dry-run