Pipeline Inspector Implementation Plan

For Claude: REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

Goal: Extract pipeline display logic into a reusable service layer and expose it via a new show_pipeline management command, then integrate into all CLI consumers.

Architecture: PipelineInspector service in apps/orchestration/services.py with PipelineDetail dataclass. A thin show_pipeline management command wraps it for CLI/bash access. Existing commands (setup_instance, monitor_pipeline) and bash scripts (cli.sh, install.sh) consume the service.

Tech Stack: Django ORM, Python dataclasses, Django management commands, bash


Task 1: PipelineDetail dataclass and PipelineInspector.list_all

Files:

  • Create: apps/orchestration/services.py
  • Create: apps/orchestration/_tests/test_services.py

Step 1: Write the failing tests

"""Tests for pipeline inspector service."""

from dataclasses import asdict
from io import StringIO

from django.core.management.base import BaseCommand
from django.test import TestCase

from apps.notify.models import NotificationChannel
from apps.orchestration.models import PipelineDefinition
from apps.orchestration.services import PipelineDetail, PipelineInspector


class PipelineDetailTests(TestCase):
    """Tests for PipelineDetail dataclass."""

    def test_to_dict_returns_all_fields(self):
        detail = PipelineDetail(
            name="full",
            description="Full pipeline",
            flow=["ingest", "check", "analyze", "notify"],
            checkers=["cpu", "memory"],
            intelligence="openai",
            notify_drivers=["slack"],
            channels=[{"name": "ops-slack", "driver": "slack"}],
            created_at="2026-02-28 14:30",
            is_active=True,
        )
        d = detail.to_dict()
        assert d["name"] == "full"
        assert d["flow"] == ["ingest", "check", "analyze", "notify"]
        assert d["intelligence"] == "openai"
        assert d["is_active"] is True

    def test_to_dict_with_none_intelligence(self):
        detail = PipelineDetail(
            name="direct",
            description="Direct",
            flow=["ingest", "notify"],
            checkers=[],
            intelligence=None,
            notify_drivers=["slack"],
            channels=[],
            created_at="2026-02-28 14:30",
            is_active=True,
        )
        d = detail.to_dict()
        assert d["intelligence"] is None
        assert d["checkers"] == []


class ListAllTests(TestCase):
    """Tests for PipelineInspector.list_all."""

    def test_returns_empty_list_when_no_pipelines(self):
        result = PipelineInspector.list_all()
        assert result == []

    def test_returns_active_pipelines(self):
        PipelineDefinition.objects.create(
            name="full",
            config={
                "version": "1.0",
                "nodes": [
                    {
                        "id": "check_health",
                        "type": "context",
                        "config": {"checker_names": ["cpu", "memory"]},
                        "next": "analyze_incident",
                    },
                    {
                        "id": "analyze_incident",
                        "type": "intelligence",
                        "config": {"provider": "openai"},
                        "next": "notify_channels",
                    },
                    {
                        "id": "notify_channels",
                        "type": "notify",
                        "config": {"drivers": ["slack"]},
                    },
                ],
            },
            created_by="setup_instance",
        )
        result = PipelineInspector.list_all()
        assert len(result) == 1
        detail = result[0]
        assert detail.name == "full"
        assert detail.flow == ["check_health", "analyze_incident", "notify_channels"]
        assert detail.checkers == ["cpu", "memory"]
        assert detail.intelligence == "openai"
        assert detail.notify_drivers == ["slack"]
        assert detail.is_active is True

    def test_excludes_inactive_by_default(self):
        PipelineDefinition.objects.create(
            name="old",
            config={"version": "1.0", "nodes": []},
            is_active=False,
        )
        result = PipelineInspector.list_all()
        assert result == []

    def test_includes_inactive_when_requested(self):
        PipelineDefinition.objects.create(
            name="old",
            config={"version": "1.0", "nodes": []},
            is_active=False,
        )
        result = PipelineInspector.list_all(active_only=False)
        assert len(result) == 1
        assert result[0].is_active is False

    def test_includes_linked_channels(self):
        PipelineDefinition.objects.create(
            name="direct",
            config={
                "version": "1.0",
                "nodes": [
                    {"id": "notify", "type": "notify", "config": {"drivers": ["slack"]}},
                ],
            },
            created_by="setup_instance",
        )
        NotificationChannel.objects.create(
            name="ops-slack",
            driver="slack",
            config={},
            description="[setup_wizard] slack channel",
        )
        result = PipelineInspector.list_all()
        assert len(result[0].channels) == 1
        assert result[0].channels[0]["name"] == "ops-slack"
        assert result[0].channels[0]["driver"] == "slack"

    def test_no_intelligence_when_absent(self):
        PipelineDefinition.objects.create(
            name="direct",
            config={
                "version": "1.0",
                "nodes": [
                    {"id": "ingest", "type": "ingest", "config": {}},
                    {"id": "notify", "type": "notify", "config": {"drivers": ["slack"]}},
                ],
            },
        )
        result = PipelineInspector.list_all()
        assert result[0].intelligence is None

    def test_empty_checkers_when_no_context_node(self):
        PipelineDefinition.objects.create(
            name="direct",
            config={
                "version": "1.0",
                "nodes": [
                    {"id": "notify", "type": "notify", "config": {"drivers": ["slack"]}},
                ],
            },
        )
        result = PipelineInspector.list_all()
        assert result[0].checkers == []

    def test_multiple_pipelines_sorted_by_name(self):
        PipelineDefinition.objects.create(
            name="beta", config={"version": "1.0", "nodes": []},
        )
        PipelineDefinition.objects.create(
            name="alpha", config={"version": "1.0", "nodes": []},
        )
        result = PipelineInspector.list_all()
        assert [d.name for d in result] == ["alpha", "beta"]

Step 2: Run tests to verify they fail

Run: uv run pytest apps/orchestration/_tests/test_services.py -v Expected: ImportError — apps.orchestration.services does not exist

Step 3: Write the implementation

"""
Pipeline inspection services.

Provides reusable data fetching and rendering for pipeline definitions,
used by management commands (show_pipeline, setup_instance, monitor_pipeline)
and bash scripts (cli.sh, install.sh).
"""

from dataclasses import asdict, dataclass, field

from apps.notify.models import NotificationChannel
from apps.orchestration.models import PipelineDefinition


@dataclass
class PipelineDetail:
    """Pipeline definition details as a plain data object."""

    name: str
    description: str
    flow: list[str]
    checkers: list[str]
    intelligence: str | None
    notify_drivers: list[str]
    channels: list[dict] = field(default_factory=list)
    created_at: str = ""
    is_active: bool = True

    def to_dict(self) -> dict:
        """Return a JSON-serializable dict."""
        return asdict(self)


class PipelineInspector:
    """Fetch and render pipeline definition details."""

    @staticmethod
    def _extract_detail(defn: PipelineDefinition) -> PipelineDetail:
        """Extract a PipelineDetail from a PipelineDefinition instance."""
        nodes = defn.get_nodes()

        flow = [n.get("id", n.get("type", "?")) for n in nodes]

        checkers = []
        intelligence = None
        notify_drivers = []

        for node in nodes:
            node_type = node.get("type", "?")
            node_config = node.get("config", {})
            if node_type == "context":
                checkers = node_config.get("checker_names", [])
            elif node_type == "intelligence":
                intelligence = node_config.get("provider")
            elif node_type == "notify":
                notify_drivers = node_config.get("drivers", [])

        # Linked wizard-created channels
        wizard_channels = NotificationChannel.objects.filter(
            description__startswith="[setup_wizard]", is_active=True
        )
        channels = [
            {"name": ch.name, "driver": ch.driver} for ch in wizard_channels
        ]

        return PipelineDetail(
            name=defn.name,
            description=defn.description,
            flow=flow,
            checkers=checkers,
            intelligence=intelligence,
            notify_drivers=notify_drivers,
            channels=channels,
            created_at=defn.created_at.strftime("%Y-%m-%d %H:%M"),
            is_active=defn.is_active,
        )

    @staticmethod
    def list_all(active_only: bool = True) -> list[PipelineDetail]:
        """Return details for all pipeline definitions."""
        qs = PipelineDefinition.objects.all()
        if active_only:
            qs = qs.filter(is_active=True)
        qs = qs.order_by("name")
        return [PipelineInspector._extract_detail(defn) for defn in qs]

    @staticmethod
    def get_by_name(name: str) -> PipelineDetail | None:
        """Return details for a specific pipeline by name, or None."""
        try:
            defn = PipelineDefinition.objects.get(name=name)
        except PipelineDefinition.DoesNotExist:
            return None
        return PipelineInspector._extract_detail(defn)

    @staticmethod
    def render_text(detail: PipelineDetail, stdout) -> None:
        """Write styled pipeline details to a Django command stdout."""
        from django.core.management.base import OutputWrapper
        from django.core.management.color import color_style

        style = color_style()

        stdout.write(style.HTTP_INFO(f'\n--- Pipeline: "{detail.name}" ---'))

        if not detail.is_active:
            stdout.write(style.WARNING("  (inactive)"))

        if detail.flow:
            chain = " \u2192 ".join(detail.flow)
            stdout.write(f"  Flow: {chain}")

        if detail.checkers:
            stdout.write(f"  Checkers: {', '.join(detail.checkers)}")

        if detail.intelligence:
            stdout.write(f"  Intelligence: {detail.intelligence}")

        if detail.notify_drivers:
            stdout.write(f"  Notify drivers: {', '.join(detail.notify_drivers)}")

        if detail.channels:
            stdout.write("  Channels:")
            for ch in detail.channels:
                stdout.write(f"    - {ch['name']} ({ch['driver']})")

        if detail.created_at:
            stdout.write(f"  Created: {detail.created_at}")

Step 4: Run tests to verify they pass

Run: uv run pytest apps/orchestration/_tests/test_services.py -v Expected: All pass

Step 5: Commit

git add apps/orchestration/services.py apps/orchestration/_tests/test_services.py
git commit -m "feat: add PipelineInspector service with list_all and get_by_name"

Task 2: PipelineInspector.get_by_name and render_text tests

Files:

  • Modify: apps/orchestration/_tests/test_services.py

Step 1: Add tests for get_by_name and render_text

Append to test_services.py:

class GetByNameTests(TestCase):
    """Tests for PipelineInspector.get_by_name."""

    def test_returns_detail_for_existing_pipeline(self):
        PipelineDefinition.objects.create(
            name="full",
            config={
                "version": "1.0",
                "nodes": [
                    {
                        "id": "check_health",
                        "type": "context",
                        "config": {"checker_names": ["cpu"]},
                    },
                ],
            },
        )
        detail = PipelineInspector.get_by_name("full")
        assert detail is not None
        assert detail.name == "full"
        assert detail.checkers == ["cpu"]

    def test_returns_none_for_missing_pipeline(self):
        result = PipelineInspector.get_by_name("nonexistent")
        assert result is None

    def test_returns_inactive_pipeline(self):
        PipelineDefinition.objects.create(
            name="old",
            config={"version": "1.0", "nodes": []},
            is_active=False,
        )
        detail = PipelineInspector.get_by_name("old")
        assert detail is not None
        assert detail.is_active is False


class RenderTextTests(TestCase):
    """Tests for PipelineInspector.render_text."""

    def test_renders_full_pipeline(self):
        detail = PipelineDetail(
            name="full",
            description="Full pipeline",
            flow=["ingest", "check", "analyze", "notify"],
            checkers=["cpu", "memory"],
            intelligence="openai",
            notify_drivers=["slack", "email"],
            channels=[{"name": "ops-slack", "driver": "slack"}],
            created_at="2026-02-28 14:30",
            is_active=True,
        )
        stdout = StringIO()
        PipelineInspector.render_text(detail, stdout)
        output = stdout.getvalue()
        assert '"full"' in output
        assert "ingest" in output
        assert "cpu, memory" in output
        assert "openai" in output
        assert "slack, email" in output
        assert "ops-slack (slack)" in output
        assert "2026-02-28 14:30" in output

    def test_renders_minimal_pipeline(self):
        detail = PipelineDetail(
            name="empty",
            description="",
            flow=[],
            checkers=[],
            intelligence=None,
            notify_drivers=[],
            channels=[],
            created_at="",
            is_active=True,
        )
        stdout = StringIO()
        PipelineInspector.render_text(detail, stdout)
        output = stdout.getvalue()
        assert '"empty"' in output
        assert "Flow:" not in output
        assert "Checkers:" not in output
        assert "Intelligence:" not in output

    def test_renders_inactive_marker(self):
        detail = PipelineDetail(
            name="old",
            description="",
            flow=["notify"],
            checkers=[],
            intelligence=None,
            notify_drivers=[],
            channels=[],
            created_at="2026-01-01 00:00",
            is_active=False,
        )
        stdout = StringIO()
        PipelineInspector.render_text(detail, stdout)
        output = stdout.getvalue()
        assert "(inactive)" in output

Step 2: Run tests

Run: uv run pytest apps/orchestration/_tests/test_services.py -v Expected: All pass (implementation already covers these)

Step 3: Check coverage

Run: uv run coverage run -m pytest apps/orchestration/_tests/test_services.py -q && uv run coverage report --include="apps/orchestration/services.py" --show-missing Expected: 100% coverage. If not, add tests for uncovered branches.

Step 4: Commit

git add apps/orchestration/_tests/test_services.py
git commit -m "test: add get_by_name and render_text coverage for PipelineInspector"

Task 3: show_pipeline management command

Files:

  • Create: apps/orchestration/management/commands/show_pipeline.py
  • Create: apps/orchestration/_tests/test_show_pipeline.py

Step 1: Write the failing tests

"""Tests for the show_pipeline management command."""

import json
from io import StringIO

from django.core.management import call_command
from django.test import TestCase

from apps.notify.models import NotificationChannel
from apps.orchestration.models import PipelineDefinition


class ShowPipelineListTests(TestCase):
    """Tests for show_pipeline list mode (default)."""

    def test_empty_state_shows_message(self):
        out = StringIO()
        call_command("show_pipeline", stdout=out)
        assert "No pipeline definitions found" in out.getvalue()

    def test_lists_active_pipelines(self):
        PipelineDefinition.objects.create(
            name="full",
            config={
                "version": "1.0",
                "nodes": [
                    {"id": "ingest", "type": "ingest", "config": {}},
                    {"id": "notify", "type": "notify", "config": {"drivers": ["slack"]}},
                ],
            },
        )
        out = StringIO()
        call_command("show_pipeline", stdout=out)
        output = out.getvalue()
        assert "full" in output
        assert "ingest" in output

    def test_excludes_inactive_by_default(self):
        PipelineDefinition.objects.create(
            name="old",
            config={"version": "1.0", "nodes": []},
            is_active=False,
        )
        out = StringIO()
        call_command("show_pipeline", stdout=out)
        assert "No pipeline definitions found" in out.getvalue()

    def test_all_flag_includes_inactive(self):
        PipelineDefinition.objects.create(
            name="old",
            config={"version": "1.0", "nodes": []},
            is_active=False,
        )
        out = StringIO()
        call_command("show_pipeline", all=True, stdout=out)
        output = out.getvalue()
        assert "old" in output
        assert "(inactive)" in output


class ShowPipelineNameTests(TestCase):
    """Tests for show_pipeline --name mode."""

    def test_shows_specific_pipeline(self):
        PipelineDefinition.objects.create(
            name="full",
            config={
                "version": "1.0",
                "nodes": [
                    {"id": "ingest", "type": "ingest", "config": {}},
                ],
            },
        )
        out = StringIO()
        call_command("show_pipeline", name="full", stdout=out)
        assert "full" in out.getvalue()

    def test_not_found_shows_error(self):
        out = StringIO()
        call_command("show_pipeline", name="nonexistent", stdout=out)
        assert "not found" in out.getvalue().lower()


class ShowPipelineJsonTests(TestCase):
    """Tests for show_pipeline --json mode."""

    def test_json_list_output(self):
        PipelineDefinition.objects.create(
            name="direct",
            config={
                "version": "1.0",
                "nodes": [
                    {"id": "notify", "type": "notify", "config": {"drivers": ["slack"]}},
                ],
            },
        )
        out = StringIO()
        call_command("show_pipeline", json_output=True, stdout=out)
        data = json.loads(out.getvalue())
        assert isinstance(data, list)
        assert len(data) == 1
        assert data[0]["name"] == "direct"

    def test_json_name_output(self):
        PipelineDefinition.objects.create(
            name="full",
            config={"version": "1.0", "nodes": []},
        )
        out = StringIO()
        call_command("show_pipeline", name="full", json_output=True, stdout=out)
        data = json.loads(out.getvalue())
        assert isinstance(data, dict)
        assert data["name"] == "full"

    def test_json_not_found(self):
        out = StringIO()
        call_command("show_pipeline", name="nope", json_output=True, stdout=out)
        data = json.loads(out.getvalue())
        assert data["error"] == "not_found"

    def test_json_empty_list(self):
        out = StringIO()
        call_command("show_pipeline", json_output=True, stdout=out)
        data = json.loads(out.getvalue())
        assert data == []

Step 2: Run tests to verify they fail

Run: uv run pytest apps/orchestration/_tests/test_show_pipeline.py -v Expected: ModuleNotFoundError for show_pipeline command

Step 3: Write the implementation

"""
Display pipeline definitions.

Usage:
    python manage.py show_pipeline              # list all active
    python manage.py show_pipeline --all        # include inactive
    python manage.py show_pipeline --name X     # specific pipeline
    python manage.py show_pipeline --json       # JSON output
"""

import json

from django.core.management.base import BaseCommand

from apps.orchestration.services import PipelineInspector


class Command(BaseCommand):
    help = "Display pipeline definitions with full details."

    def add_arguments(self, parser):
        parser.add_argument(
            "--name",
            type=str,
            help="Show a specific pipeline by name.",
        )
        parser.add_argument(
            "--all",
            action="store_true",
            dest="show_all",
            help="Include inactive pipelines.",
        )
        parser.add_argument(
            "--json",
            action="store_true",
            dest="json_output",
            help="Output as JSON.",
        )

    def handle(self, *args, **options):
        name = options.get("name")
        show_all = options.get("show_all", False)
        json_output = options.get("json_output", False)

        if name:
            self._show_single(name, json_output)
        else:
            self._show_list(show_all, json_output)

    def _show_single(self, name, json_output):
        detail = PipelineInspector.get_by_name(name)
        if detail is None:
            if json_output:
                self.stdout.write(json.dumps({"error": "not_found", "name": name}))
            else:
                self.stdout.write(self.style.ERROR(f'Pipeline "{name}" not found.'))
            return
        if json_output:
            self.stdout.write(json.dumps(detail.to_dict(), indent=2))
        else:
            PipelineInspector.render_text(detail, self.stdout)

    def _show_list(self, show_all, json_output):
        details = PipelineInspector.list_all(active_only=not show_all)
        if json_output:
            self.stdout.write(json.dumps([d.to_dict() for d in details], indent=2))
            return
        if not details:
            self.stdout.write(self.style.WARNING("No pipeline definitions found."))
            return
        for detail in details:
            PipelineInspector.render_text(detail, self.stdout)

Step 4: Run tests to verify they pass

Run: uv run pytest apps/orchestration/_tests/test_show_pipeline.py -v Expected: All pass

Step 5: Check coverage

Run: uv run coverage run -m pytest apps/orchestration/_tests/test_show_pipeline.py -q && uv run coverage report --include="apps/orchestration/management/commands/show_pipeline.py" --show-missing Expected: 100%

Step 6: Commit

git add apps/orchestration/management/commands/show_pipeline.py apps/orchestration/_tests/test_show_pipeline.py
git commit -m "feat: add show_pipeline management command with JSON support"

Task 4: Refactor setup_instance to use PipelineInspector

Files:

  • Modify: apps/orchestration/management/commands/setup_instance.py:602-645
  • Modify: apps/orchestration/_tests/test_setup_instance.py:687-810

Step 1: Replace _show_existing_details body

In setup_instance.py, replace the _show_existing_details method body with a delegation to PipelineInspector:

    def _show_existing_details(self, existing):
        """
        Display details of an existing wizard-created pipeline.

        Args:
            existing: PipelineDefinition instance.
        """
        from apps.orchestration.services import PipelineInspector

        detail = PipelineInspector.get_by_name(existing.name)
        if detail:
            PipelineInspector.render_text(detail, self.stdout)

This replaces the entire ~40-line method body with 4 lines.

Step 2: Run existing tests to verify nothing broke

Run: uv run pytest apps/orchestration/_tests/test_setup_instance.py -v Expected: All 102 tests pass

Step 3: Check coverage on setup_instance

Run: uv run coverage run -m pytest apps/orchestration/_tests/test_setup_instance.py -q && uv run coverage report --include="apps/orchestration/management/commands/setup_instance.py" --show-missing Expected: 100%. The ShowExistingDetailsTests class now exercises the delegation path. The detailed rendering is covered in test_services.py.

Step 4: Remove the now-redundant ShowExistingDetailsTests from test_setup_instance.py

The detailed rendering tests now live in test_services.py. Replace the ShowExistingDetailsTests class in test_setup_instance.py with a minimal test that verifies the delegation:

class ShowExistingDetailsTests(TestCase):
    """Tests for _show_existing_details delegation to PipelineInspector."""

    def setUp(self):
        self.cmd = Command(stdout=StringIO(), stderr=StringIO())

    def test_delegates_to_pipeline_inspector(self):
        defn = PipelineDefinition.objects.create(
            name="full",
            config={
                "version": "1.0",
                "nodes": [
                    {
                        "id": "check_health",
                        "type": "context",
                        "config": {"checker_names": ["cpu", "memory"]},
                        "next": "notify_channels",
                    },
                    {
                        "id": "notify_channels",
                        "type": "notify",
                        "config": {"drivers": ["slack"]},
                    },
                ],
            },
            created_by="setup_instance",
        )
        self.cmd._show_existing_details(defn)
        output = self.cmd.stdout.getvalue()
        # Verify the service rendered the details
        assert "full" in output
        assert "check_health" in output

    def test_handles_missing_pipeline_gracefully(self):
        """get_by_name returns None for a name that doesn't match."""
        defn = PipelineDefinition.objects.create(
            name="test",
            config={"version": "1.0", "nodes": []},
            created_by="setup_instance",
        )
        # Delete so get_by_name returns None
        defn.delete()
        self.cmd._show_existing_details(defn)
        # Should not crash — just no output
        output = self.cmd.stdout.getvalue()
        assert "test" not in output

Step 5: Run all tests

Run: uv run pytest apps/orchestration/_tests/test_setup_instance.py apps/orchestration/_tests/test_services.py -v Expected: All pass

Step 6: Check coverage on both files

Run: uv run coverage run -m pytest apps/orchestration/_tests/ -q && uv run coverage report --include="apps/orchestration/services.py,apps/orchestration/management/commands/setup_instance.py" --show-missing Expected: 100% on both

Step 7: Commit

git add apps/orchestration/management/commands/setup_instance.py apps/orchestration/_tests/test_setup_instance.py
git commit -m "refactor: delegate setup_instance display to PipelineInspector service"

Task 5: Integrate into monitor_pipeline

Files:

  • Modify: apps/orchestration/management/commands/monitor_pipeline.py:70-97

Step 1: Add pipeline definition display to show_run_details

In monitor_pipeline.py, after line 77 (self.stdout.write(self.style.HTTP_INFO(...))), add:

        # Show linked pipeline definition if available
        from apps.orchestration.services import PipelineInspector

        # Try to find a pipeline definition matching this run's source or trace
        details = PipelineInspector.list_all()
        if details:
            for detail in details:
                PipelineInspector.render_text(detail, self.stdout)
            self.stdout.write("")

Note: PipelineRun doesn’t have a FK to PipelineDefinition, so we show all active definitions as context. This is the simplest correct approach given the current schema.

Step 2: Run full test suite

Run: uv run pytest apps/orchestration/_tests/ -v Expected: All pass

Step 3: Commit

git add apps/orchestration/management/commands/monitor_pipeline.py
git commit -m "feat: show pipeline definitions inline in monitor_pipeline detail view"

Task 6: Integrate into cli.sh

Files:

  • Modify: bin/cli.sh:493-512

Step 1: Add “View pipelines” option to pipeline_menu

Replace the pipeline_menu function:

pipeline_menu() {
    show_banner
    echo -e "${BOLD}═══ Pipeline Orchestration ═══${NC}"
    echo ""

    local options=(
        "show_pipeline - View pipeline definitions"
        "run_pipeline - Execute a pipeline"
        "monitor_pipeline - Monitor pipeline execution"
        "Back to main menu"
    )

    select opt in "${options[@]}"; do
        case $REPLY in
            1) show_pipeline_menu ;;
            2) run_pipeline_menu ;;
            3) monitor_pipeline_menu ;;
            4) return ;;
            *) echo -e "${RED}Invalid option${NC}" ;;
        esac
        break
    done
}

Step 2: Add the show_pipeline_menu function

Add after the pipeline_menu function:

show_pipeline_menu() {
    show_banner
    echo -e "${BOLD}═══ show_pipeline ═══${NC}"
    echo ""
    echo "View pipeline definitions and their configuration"
    echo ""

    local options=(
        "Show all active pipelines"
        "Show all pipelines (including inactive)"
        "Show specific pipeline"
        "Show as JSON"
        "Back"
    )

    select opt in "${options[@]}"; do
        case $REPLY in
            1)
                confirm_and_run "uv run python manage.py show_pipeline"
                ;;
            2)
                confirm_and_run "uv run python manage.py show_pipeline --all"
                ;;
            3)
                read -p "Enter pipeline name: " pipeline_name
                if [ -n "$pipeline_name" ]; then
                    confirm_and_run "uv run python manage.py show_pipeline --name $pipeline_name"
                else
                    echo -e "${RED}Pipeline name required${NC}"
                fi
                ;;
            4)
                confirm_and_run "uv run python manage.py show_pipeline --json"
                ;;
            5)
                return
                ;;
            *)
                echo -e "${RED}Invalid option${NC}"
                ;;
        esac
        break
    done
}

Step 3: Commit

git add bin/cli.sh
git commit -m "feat: add show_pipeline menu option to cli.sh"

Task 7: Integrate into install.sh

Files:

  • Modify: bin/install.sh:344-359

Step 1: Add pipeline summary after setup

After the cron setup prompt (line 350) and before the aliases prompt (line 352), add:

# Show existing pipeline definitions if any
echo ""
PIPELINE_COUNT=$(uv run python manage.py show_pipeline --json 2>/dev/null | python3 -c "import sys,json; print(len(json.load(sys.stdin)))" 2>/dev/null || echo "0")
if [ "$PIPELINE_COUNT" != "0" ] && [ "$PIPELINE_COUNT" != "" ]; then
    info "Found $PIPELINE_COUNT configured pipeline(s):"
    uv run python manage.py show_pipeline
    echo ""
else
    info "No pipelines configured yet. Run the setup wizard to create one:"
    echo "  uv run python manage.py setup_instance"
    echo ""
fi

Step 2: Commit

git add bin/install.sh
git commit -m "feat: show pipeline summary in install.sh post-setup"

Task 8: Final verification and cleanup

Step 1: Run full test suite

Run: uv run pytest -v Expected: All tests pass

Step 2: Check coverage on all new/modified files

Run: uv run coverage run -m pytest apps/orchestration/_tests/ -q && uv run coverage report --include="apps/orchestration/services.py,apps/orchestration/management/commands/show_pipeline.py,apps/orchestration/management/commands/setup_instance.py,apps/orchestration/management/commands/monitor_pipeline.py" --show-missing Expected: 100% on services.py, show_pipeline.py, setup_instance.py

Step 3: Lint and format

Run: uv run black . && uv run ruff check . --fix Expected: Clean

Step 4: Run pre-commit

Run: uv run pre-commit run --all-files Expected: All pass

Step 5: Final commit if any formatting changes

git add -A && git commit -m "style: format and lint"

This site uses Just the Docs, a documentation theme for Jekyll.