System Status Command Implementation Plan
For Claude: REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
Goal: Implement a system_status management command that shows system profile at a glance and flags configuration inconsistencies across .env, settings.py, database state, and filesystem.
Architecture: A new management command (system_status) renders a dashboard (profile, pipeline state, definitions) then runs consistency checks from modular check functions in apps/checkers/status/. Each module exposes run() -> list[CheckResult]. Supports --json, --checks-only, --verbose flags.
Tech Stack: Django management commands, dataclasses, pathlib, re for env parsing. Tests use django.test.TestCase, unittest.mock, call_command.
Design doc: docs/plans/2026-04-05-system-status-design.md
Task 1: CheckResult dataclass and status package init
Files:
- Create:
apps/checkers/status/__init__.py - Test:
apps/checkers/_tests/status/__init__.py - Test:
apps/checkers/_tests/status/test_check_result.py
Step 1: Create test directory and write failing test
Create apps/checkers/_tests/status/__init__.py (empty).
Create apps/checkers/_tests/status/test_check_result.py:
"""Tests for CheckResult dataclass."""
from django.test import TestCase
from apps.checkers.status import CheckResult
class CheckResultTests(TestCase):
def test_defaults(self):
r = CheckResult(level="warn", message="something")
self.assertEqual(r.level, "warn")
self.assertEqual(r.message, "something")
self.assertEqual(r.hint, "")
self.assertEqual(r.category, "")
def test_all_fields(self):
r = CheckResult(
level="error",
message="bad config",
hint="fix it",
category="cluster",
)
self.assertEqual(r.level, "error")
self.assertEqual(r.hint, "fix it")
self.assertEqual(r.category, "cluster")
def test_valid_levels(self):
for level in ("ok", "info", "warn", "error"):
r = CheckResult(level=level, message="test")
self.assertEqual(r.level, level)
Step 2: Run test to verify it fails
Run: uv run pytest apps/checkers/_tests/status/test_check_result.py -v Expected: FAIL — ImportError: cannot import name 'CheckResult' from 'apps.checkers.status'
Step 3: Write minimal implementation
Create apps/checkers/status/__init__.py:
"""System status checks — cross-source configuration consistency."""
from dataclasses import dataclass
@dataclass
class CheckResult:
"""Result from a status check."""
level: str # "ok", "info", "warn", "error"
message: str
hint: str = ""
category: str = "" # "env", "cluster", "runtime", "database", "installation"
Step 4: Run test to verify it passes
Run: uv run pytest apps/checkers/_tests/status/test_check_result.py -v Expected: PASS (3 tests)
Step 5: Commit
git add apps/checkers/status/__init__.py apps/checkers/_tests/status/
git commit -m "feat(status): add CheckResult dataclass and status package"
Task 2: Env file consistency checks
Files:
- Create:
apps/checkers/status/env_checks.py - Test:
apps/checkers/_tests/status/test_env_checks.py
Context: This module parses .env, .env.sample, and config/settings.py to find drift between them. It uses file I/O via pathlib, not Django settings.
Step 1: Write the failing tests
Create apps/checkers/_tests/status/test_env_checks.py:
"""Tests for env file consistency checks."""
from pathlib import Path
from unittest.mock import patch
from django.test import TestCase
from apps.checkers.status import CheckResult
from apps.checkers.status.env_checks import (
parse_env_keys,
parse_sample_keys,
parse_settings_env_refs,
run,
)
class ParseEnvKeysTests(TestCase):
"""Tests for .env file key parsing."""
def test_parses_simple_keys(self):
content = "FOO=bar\nBAZ=qux\n"
self.assertEqual(parse_env_keys(content), {"FOO", "BAZ"})
def test_ignores_comments(self):
content = "# comment\nFOO=bar\n# BAZ=qux\n"
self.assertEqual(parse_env_keys(content), {"FOO"})
def test_ignores_blank_lines(self):
content = "\nFOO=bar\n\n\nBAZ=qux\n"
self.assertEqual(parse_env_keys(content), {"FOO", "BAZ"})
def test_handles_values_with_equals(self):
content = "URL=https://example.com?a=1&b=2\n"
self.assertEqual(parse_env_keys(content), {"URL"})
def test_empty_value(self):
content = "FOO=\n"
self.assertEqual(parse_env_keys(content), {"FOO"})
def test_empty_content(self):
self.assertEqual(parse_env_keys(""), set())
class ParseSampleKeysTests(TestCase):
"""Tests for .env.sample key parsing (includes commented-out keys)."""
def test_parses_active_and_commented_keys(self):
content = "FOO=bar\n# BAZ=qux\n# pure comment no equals\n"
active, commented = parse_sample_keys(content)
self.assertEqual(active, {"FOO"})
self.assertEqual(commented, {"BAZ"})
def test_double_hash_ignored(self):
content = "## heading\nFOO=bar\n"
active, commented = parse_sample_keys(content)
self.assertEqual(active, {"FOO"})
self.assertEqual(commented, set())
def test_commented_with_space(self):
content = "# OPTIONAL_KEY=default_value\n"
active, commented = parse_sample_keys(content)
self.assertEqual(active, set())
self.assertEqual(commented, {"OPTIONAL_KEY"})
class ParseSettingsEnvRefsTests(TestCase):
"""Tests for extracting os.environ.get references from settings.py."""
def test_parses_environ_get(self):
content = 'FOO = os.environ.get("MY_VAR", "default")\n'
self.assertEqual(parse_settings_env_refs(content), {"MY_VAR"})
def test_parses_single_quotes(self):
content = "FOO = os.environ.get('MY_VAR')\n"
self.assertEqual(parse_settings_env_refs(content), {"MY_VAR"})
def test_multiple_refs(self):
content = (
'A = os.environ.get("VAR_A", "")\n'
'B = os.environ.get("VAR_B", "0")\n'
)
self.assertEqual(parse_settings_env_refs(content), {"VAR_A", "VAR_B"})
def test_no_refs(self):
self.assertEqual(parse_settings_env_refs("x = 1\n"), set())
class RunEnvChecksTests(TestCase):
"""Integration tests for the env checks run() function."""
@patch("apps.checkers.status.env_checks._read_file")
def test_missing_env_file(self, mock_read):
mock_read.return_value = None # .env not found
results = run(base_dir=Path("/fake"))
errors = [r for r in results if r.level == "error"]
self.assertTrue(any(".env file not found" in r.message for r in errors))
@patch("apps.checkers.status.env_checks._read_file")
def test_missing_sample_file(self, mock_read):
def side_effect(path):
if path.name == ".env":
return "FOO=bar\n"
return None # .env.sample not found
mock_read.side_effect = side_effect
results = run(base_dir=Path("/fake"))
warns = [r for r in results if r.level == "warn"]
self.assertTrue(any(".env.sample not found" in r.message for r in warns))
@patch("apps.checkers.status.env_checks._read_file")
def test_sample_key_missing_from_env(self, mock_read):
def side_effect(path):
if path.name == ".env":
return "FOO=bar\n"
if path.name == ".env.sample":
return "FOO=bar\nMISSING_KEY=default\n"
if path.name == "settings.py":
return ""
return None
mock_read.side_effect = side_effect
results = run(base_dir=Path("/fake"))
warns = [r for r in results if r.level == "warn"]
self.assertTrue(any("MISSING_KEY" in r.message for r in warns))
@patch("apps.checkers.status.env_checks._read_file")
def test_env_key_not_in_sample(self, mock_read):
def side_effect(path):
if path.name == ".env":
return "FOO=bar\nEXTRA=val\n"
if path.name == ".env.sample":
return "FOO=bar\n"
if path.name == "settings.py":
return ""
return None
mock_read.side_effect = side_effect
results = run(base_dir=Path("/fake"))
warns = [r for r in results if r.level == "warn"]
self.assertTrue(any("EXTRA" in r.message for r in warns))
@patch("apps.checkers.status.env_checks._read_file")
def test_settings_ref_missing_from_sample(self, mock_read):
def side_effect(path):
if path.name == ".env":
return "FOO=bar\n"
if path.name == ".env.sample":
return "FOO=bar\n"
if path.name == "settings.py":
return 'x = os.environ.get("UNDOCUMENTED_VAR", "")\n'
return None
mock_read.side_effect = side_effect
results = run(base_dir=Path("/fake"))
warns = [r for r in results if r.level == "warn"]
self.assertTrue(any("UNDOCUMENTED_VAR" in r.message for r in warns))
@patch("apps.checkers.status.env_checks._read_file")
def test_all_consistent_returns_ok(self, mock_read):
def side_effect(path):
if path.name == ".env":
return "FOO=bar\n"
if path.name == ".env.sample":
return "FOO=default\n"
if path.name == "settings.py":
return 'x = os.environ.get("FOO", "")\n'
return None
mock_read.side_effect = side_effect
results = run(base_dir=Path("/fake"))
levels = {r.level for r in results}
self.assertNotIn("error", levels)
self.assertNotIn("warn", levels)
Step 2: Run test to verify it fails
Run: uv run pytest apps/checkers/_tests/status/test_env_checks.py -v Expected: FAIL — ModuleNotFoundError: No module named 'apps.checkers.status.env_checks'
Step 3: Write minimal implementation
Create apps/checkers/status/env_checks.py:
"""Env file consistency checks.
Compares .env, .env.sample, and config/settings.py to detect drift:
- Keys in .env.sample missing from .env
- Keys in .env not in .env.sample
- Keys referenced in settings.py missing from .env.sample
- Keys in .env.sample never referenced in settings.py
"""
import re
from pathlib import Path
from apps.checkers.status import CheckResult
CATEGORY = "env"
def _read_file(path: Path) -> str | None:
"""Read file contents or return None if missing."""
try:
return path.read_text()
except (FileNotFoundError, PermissionError):
return None
def parse_env_keys(content: str) -> set[str]:
"""Extract variable names from .env file content."""
keys = set()
for line in content.splitlines():
line = line.strip()
if not line or line.startswith("#"):
continue
if "=" in line:
key = line.split("=", 1)[0].strip()
if key:
keys.add(key)
return keys
def parse_sample_keys(content: str) -> tuple[set[str], set[str]]:
"""Extract active and commented-out keys from .env.sample.
Returns:
(active_keys, commented_keys)
"""
active = set()
commented = set()
for line in content.splitlines():
stripped = line.strip()
if not stripped:
continue
# Active key
if not stripped.startswith("#") and "=" in stripped:
key = stripped.split("=", 1)[0].strip()
if key:
active.add(key)
# Single-hash commented key (not ## headings)
elif stripped.startswith("#") and not stripped.startswith("##"):
rest = stripped.lstrip("# ")
if "=" in rest:
key = rest.split("=", 1)[0].strip()
# Filter out prose comments (key must look like an env var)
if re.match(r"^[A-Z][A-Z0-9_]*$", key):
commented.add(key)
return active, commented
def parse_settings_env_refs(content: str) -> set[str]:
"""Extract env var names from os.environ.get() calls in settings.py."""
return set(re.findall(r'os\.environ\.get\(["\']([A-Z_][A-Z0-9_]*)["\']', content))
def run(base_dir: Path) -> list[CheckResult]:
"""Run all env file consistency checks."""
results: list[CheckResult] = []
# Read files
env_content = _read_file(base_dir / ".env")
if env_content is None:
results.append(
CheckResult(
level="error",
message=".env file not found",
hint="Copy .env.sample to .env and configure it.",
category=CATEGORY,
)
)
return results
sample_content = _read_file(base_dir / ".env.sample")
if sample_content is None:
results.append(
CheckResult(
level="warn",
message=".env.sample not found",
hint=".env.sample serves as the reference for expected env vars.",
category=CATEGORY,
)
)
return results
settings_content = _read_file(base_dir / "config" / "settings.py")
# Parse
env_keys = parse_env_keys(env_content)
sample_active, sample_commented = parse_sample_keys(sample_content)
all_sample_keys = sample_active | sample_commented
settings_refs = parse_settings_env_refs(settings_content) if settings_content else set()
# Check: sample keys missing from .env
missing_from_env = sample_active - env_keys
for key in sorted(missing_from_env):
results.append(
CheckResult(
level="warn",
message=f"{key} is in .env.sample but missing from .env",
hint="Add it to .env or remove from .env.sample if no longer needed.",
category=CATEGORY,
)
)
# Check: .env keys not in .env.sample
unknown_in_env = env_keys - all_sample_keys
for key in sorted(unknown_in_env):
results.append(
CheckResult(
level="warn",
message=f"{key} is in .env but not documented in .env.sample",
hint="Add it to .env.sample so others know about it.",
category=CATEGORY,
)
)
# Check: settings.py references not in .env.sample
undocumented = settings_refs - all_sample_keys
for key in sorted(undocumented):
results.append(
CheckResult(
level="warn",
message=f"{key} is referenced in settings.py but missing from .env.sample",
hint="Document it in .env.sample.",
category=CATEGORY,
)
)
# Check: .env.sample keys never referenced in settings.py
if settings_refs:
unreferenced = sample_active - settings_refs
for key in sorted(unreferenced):
results.append(
CheckResult(
level="warn",
message=f"{key} is in .env.sample but never referenced in settings.py",
hint="Remove from .env.sample if no longer used, or it may be used in shell scripts only.",
category=CATEGORY,
)
)
# If no issues found, emit OK
if not results:
results.append(
CheckResult(level="ok", message="All .env keys are consistent", category=CATEGORY)
)
return results
Step 4: Run test to verify it passes
Run: uv run pytest apps/checkers/_tests/status/test_env_checks.py -v Expected: PASS (all tests)
Step 5: Commit
git add apps/checkers/status/env_checks.py apps/checkers/_tests/status/test_env_checks.py
git commit -m "feat(status): add env file consistency checks"
Task 3: Cluster profile coherence checks
Files:
- Create:
apps/checkers/status/cluster_checks.py - Test:
apps/checkers/_tests/status/test_cluster_checks.py
Context: These checks read Django settings (already loaded from .env) to detect cluster role conflicts.
Step 1: Write the failing tests
Create apps/checkers/_tests/status/test_cluster_checks.py:
"""Tests for cluster profile coherence checks."""
from unittest.mock import patch
from django.test import TestCase, override_settings
from apps.checkers.status.cluster_checks import run
class ClusterChecksTests(TestCase):
"""Test cluster profile consistency."""
@override_settings(
HUB_URL="https://hub.example.com",
CLUSTER_ENABLED=True,
WEBHOOK_SECRET_CLUSTER="secret",
INSTANCE_ID="node-1",
)
def test_agent_and_hub_conflict(self):
results = run()
errors = [r for r in results if r.level == "error"]
self.assertTrue(any("conflict" in r.message.lower() for r in errors))
@override_settings(
HUB_URL="https://hub.example.com",
CLUSTER_ENABLED=False,
WEBHOOK_SECRET_CLUSTER="",
INSTANCE_ID="node-1",
)
def test_agent_without_secret(self):
results = run()
warns = [r for r in results if r.level == "warn"]
self.assertTrue(any("WEBHOOK_SECRET_CLUSTER" in r.message for r in warns))
@override_settings(
HUB_URL="https://hub.example.com",
CLUSTER_ENABLED=False,
WEBHOOK_SECRET_CLUSTER="secret",
INSTANCE_ID="",
)
def test_agent_without_instance_id(self):
results = run()
warns = [r for r in results if r.level == "warn"]
self.assertTrue(any("INSTANCE_ID" in r.message for r in warns))
@override_settings(
HUB_URL="",
CLUSTER_ENABLED=True,
WEBHOOK_SECRET_CLUSTER="",
INSTANCE_ID="",
)
def test_hub_without_secret(self):
results = run()
errors = [r for r in results if r.level == "error"]
self.assertTrue(any("WEBHOOK_SECRET_CLUSTER" in r.message for r in errors))
@override_settings(
HUB_URL="",
CLUSTER_ENABLED=False,
WEBHOOK_SECRET_CLUSTER="",
INSTANCE_ID="",
)
def test_standalone_is_ok(self):
results = run()
levels = {r.level for r in results}
self.assertNotIn("error", levels)
self.assertNotIn("warn", levels)
@override_settings(
HUB_URL="https://hub.example.com",
CLUSTER_ENABLED=False,
WEBHOOK_SECRET_CLUSTER="secret",
INSTANCE_ID="node-1",
)
def test_valid_agent_is_ok(self):
results = run()
levels = {r.level for r in results}
self.assertNotIn("error", levels)
self.assertNotIn("warn", levels)
@override_settings(
HUB_URL="",
CLUSTER_ENABLED=True,
WEBHOOK_SECRET_CLUSTER="secret",
INSTANCE_ID="",
)
def test_valid_hub_is_ok(self):
results = run()
levels = {r.level for r in results}
self.assertNotIn("error", levels)
self.assertNotIn("warn", levels)
Step 2: Run test to verify it fails
Run: uv run pytest apps/checkers/_tests/status/test_cluster_checks.py -v Expected: FAIL — ModuleNotFoundError
Step 3: Write minimal implementation
Create apps/checkers/status/cluster_checks.py:
"""Cluster profile coherence checks.
Detects conflicts in cluster role configuration (agent vs hub vs standalone).
"""
from django.conf import settings
from apps.checkers.status import CheckResult
CATEGORY = "cluster"
def _get_role() -> str:
"""Derive cluster role from settings."""
has_hub_url = bool(getattr(settings, "HUB_URL", ""))
cluster_enabled = getattr(settings, "CLUSTER_ENABLED", False)
if has_hub_url and cluster_enabled:
return "conflict"
if has_hub_url:
return "agent"
if cluster_enabled:
return "hub"
return "standalone"
def run() -> list[CheckResult]:
"""Run cluster profile coherence checks."""
results: list[CheckResult] = []
role = _get_role()
hub_url = getattr(settings, "HUB_URL", "")
secret = getattr(settings, "WEBHOOK_SECRET_CLUSTER", "")
instance_id = getattr(settings, "INSTANCE_ID", "")
if role == "conflict":
results.append(
CheckResult(
level="error",
message="Cluster role conflict: both HUB_URL and CLUSTER_ENABLED=1 are set",
hint="An instance cannot be both an agent and a hub. Unset one.",
category=CATEGORY,
)
)
return results
if role == "agent":
if not secret:
results.append(
CheckResult(
level="warn",
message="Agent mode: WEBHOOK_SECRET_CLUSTER is empty",
hint="Set WEBHOOK_SECRET_CLUSTER for signed payloads to the hub.",
category=CATEGORY,
)
)
if not instance_id:
results.append(
CheckResult(
level="warn",
message="Agent mode: INSTANCE_ID is empty",
hint="Set INSTANCE_ID to identify this agent (defaults to hostname at runtime).",
category=CATEGORY,
)
)
if role == "hub":
if not secret:
results.append(
CheckResult(
level="error",
message="Hub mode: WEBHOOK_SECRET_CLUSTER is empty",
hint="Hub must have WEBHOOK_SECRET_CLUSTER to verify agent payloads.",
category=CATEGORY,
)
)
# OK if no issues
if not results:
if role == "standalone":
results.append(
CheckResult(level="ok", message="Standalone mode (no cluster)", category=CATEGORY)
)
else:
results.append(
CheckResult(level="ok", message=f"Cluster role: {role}", category=CATEGORY)
)
return results
Step 4: Run test to verify it passes
Run: uv run pytest apps/checkers/_tests/status/test_cluster_checks.py -v Expected: PASS (7 tests)
Step 5: Commit
git add apps/checkers/status/cluster_checks.py apps/checkers/_tests/status/test_cluster_checks.py
git commit -m "feat(status): add cluster profile coherence checks"
Task 4: Runtime state checks
Files:
- Create:
apps/checkers/status/runtime_checks.py - Test:
apps/checkers/_tests/status/test_runtime_checks.py
Context: Checks for contradictions between environment settings (prod debug, eager celery, metrics misconfiguration).
Step 1: Write the failing tests
Create apps/checkers/_tests/status/test_runtime_checks.py:
"""Tests for runtime state consistency checks."""
import os
from unittest.mock import patch
from django.test import TestCase, override_settings
from apps.checkers.status.runtime_checks import run
class RuntimeChecksTests(TestCase):
@override_settings(DEBUG=True)
@patch.dict(os.environ, {"DJANGO_ENV": "prod"})
def test_debug_on_in_production(self):
results = run()
errors = [r for r in results if r.level == "error"]
self.assertTrue(any("DEBUG" in r.message for r in errors))
@override_settings(DEBUG=False, ALLOWED_HOSTS=[])
@patch.dict(os.environ, {"DJANGO_ENV": "prod"})
def test_no_allowed_hosts_in_production(self):
results = run()
errors = [r for r in results if r.level == "error"]
self.assertTrue(any("ALLOWED_HOSTS" in r.message for r in errors))
@override_settings(CELERY_TASK_ALWAYS_EAGER=True)
@patch.dict(os.environ, {"DJANGO_ENV": "prod"})
def test_celery_eager_in_production(self):
results = run()
warns = [r for r in results if r.level == "warn"]
self.assertTrue(any("eager" in r.message.lower() for r in warns))
@override_settings(
ORCHESTRATION_METRICS_BACKEND="logging",
STATSD_HOST="custom-host",
)
@patch.dict(os.environ, {"DJANGO_ENV": "dev"})
def test_statsd_configured_but_backend_logging(self):
results = run()
infos = [r for r in results if r.level == "info"]
self.assertTrue(any("StatsD" in r.message for r in infos))
@override_settings(
ORCHESTRATION_METRICS_BACKEND="statsd",
STATSD_HOST="localhost",
)
@patch.dict(os.environ, {"DJANGO_ENV": "dev"})
def test_statsd_backend_with_default_host(self):
results = run()
warns = [r for r in results if r.level == "warn"]
self.assertTrue(any("STATSD_HOST" in r.message for r in warns))
@override_settings(
DEBUG=False,
ALLOWED_HOSTS=["example.com"],
CELERY_TASK_ALWAYS_EAGER=False,
ORCHESTRATION_METRICS_BACKEND="logging",
STATSD_HOST="localhost",
)
@patch.dict(os.environ, {"DJANGO_ENV": "prod"})
def test_clean_production(self):
results = run()
levels = {r.level for r in results}
self.assertNotIn("error", levels)
self.assertNotIn("warn", levels)
@override_settings(
DEBUG=True,
CELERY_TASK_ALWAYS_EAGER=True,
ORCHESTRATION_METRICS_BACKEND="logging",
STATSD_HOST="localhost",
)
@patch.dict(os.environ, {"DJANGO_ENV": "dev"})
def test_dev_mode_allows_debug_and_eager(self):
results = run()
levels = {r.level for r in results}
self.assertNotIn("error", levels)
Step 2: Run test to verify it fails
Run: uv run pytest apps/checkers/_tests/status/test_runtime_checks.py -v Expected: FAIL — ModuleNotFoundError
Step 3: Write minimal implementation
Create apps/checkers/status/runtime_checks.py:
"""Environment vs runtime state consistency checks.
Detects contradictions like DEBUG=True in production or Celery eager mode
in production.
"""
import os
from django.conf import settings
from apps.checkers.status import CheckResult
CATEGORY = "runtime"
def _is_production() -> bool:
return os.environ.get("DJANGO_ENV", "dev") in ("prod", "production")
def run() -> list[CheckResult]:
"""Run runtime state consistency checks."""
results: list[CheckResult] = []
prod = _is_production()
# Production-only checks
if prod:
if settings.DEBUG:
results.append(
CheckResult(
level="error",
message="DEBUG is enabled in production",
hint="Set DJANGO_DEBUG=0 in .env for production.",
category=CATEGORY,
)
)
if not settings.ALLOWED_HOSTS:
results.append(
CheckResult(
level="error",
message="ALLOWED_HOSTS is empty in production",
hint="Set DJANGO_ALLOWED_HOSTS in .env.",
category=CATEGORY,
)
)
if settings.CELERY_TASK_ALWAYS_EAGER:
results.append(
CheckResult(
level="warn",
message="Celery is in eager mode in production",
hint="Set CELERY_TASK_ALWAYS_EAGER=0 for real task execution.",
category=CATEGORY,
)
)
# Metrics consistency (any environment)
backend = getattr(settings, "ORCHESTRATION_METRICS_BACKEND", "logging")
statsd_host = getattr(settings, "STATSD_HOST", "localhost")
if backend == "logging" and statsd_host != "localhost":
results.append(
CheckResult(
level="info",
message=f"StatsD host is configured ({statsd_host}) but metrics backend is 'logging'",
hint="Set ORCHESTRATION_METRICS_BACKEND=statsd to use StatsD.",
category=CATEGORY,
)
)
if backend == "statsd" and statsd_host == "localhost":
results.append(
CheckResult(
level="warn",
message="Metrics backend is 'statsd' but STATSD_HOST is still 'localhost'",
hint="Set STATSD_HOST to your StatsD server address.",
category=CATEGORY,
)
)
if not results:
results.append(
CheckResult(level="ok", message="Runtime configuration is consistent", category=CATEGORY)
)
return results
Step 4: Run test to verify it passes
Run: uv run pytest apps/checkers/_tests/status/test_runtime_checks.py -v Expected: PASS (7 tests)
Step 5: Commit
git add apps/checkers/status/runtime_checks.py apps/checkers/_tests/status/test_runtime_checks.py
git commit -m "feat(status): add runtime state consistency checks"
Task 5: Database vs config state checks
Files:
- Create:
apps/checkers/status/database_checks.py - Test:
apps/checkers/_tests/status/test_database_checks.py
Context: Queries PipelineDefinition, NotificationChannel, and IntelligenceProvider models to detect conflicts with env config.
Step 1: Write the failing tests
Create apps/checkers/_tests/status/test_database_checks.py:
"""Tests for database vs config state checks."""
from django.test import TestCase, override_settings
from apps.checkers.status.database_checks import run
from apps.intelligence.models import IntelligenceProvider
from apps.notify.models import NotificationChannel
from apps.orchestration.models import PipelineDefinition
class DatabaseChecksTests(TestCase):
@override_settings(CELERY_TASK_ALWAYS_EAGER=True)
def test_active_pipeline_with_eager_celery(self):
PipelineDefinition.objects.create(name="test", config={}, is_active=True)
results = run()
warns = [r for r in results if r.level == "warn"]
self.assertTrue(any("eager" in r.message.lower() for r in warns))
@override_settings(CELERY_TASK_ALWAYS_EAGER=False)
def test_active_pipeline_without_eager_is_ok(self):
PipelineDefinition.objects.create(name="test", config={}, is_active=True)
NotificationChannel.objects.create(name="ch", driver="slack", is_active=True)
results = run()
errors = [r for r in results if r.level in ("error", "warn")]
# Should not warn about eager
self.assertFalse(any("eager" in r.message.lower() for r in errors))
def test_no_active_channels(self):
results = run()
warns = [r for r in results if r.level == "warn"]
self.assertTrue(any("notification channel" in r.message.lower() for r in warns))
def test_active_channel_present(self):
NotificationChannel.objects.create(name="ch", driver="slack", is_active=True)
results = run()
warns = [r for r in results if r.level == "warn"]
self.assertFalse(any("notification channel" in r.message.lower() for r in warns))
def test_no_active_definitions(self):
results = run()
infos = [r for r in results if r.level == "info"]
self.assertTrue(any("pipeline definition" in r.message.lower() for r in infos))
@override_settings(ORCHESTRATION_INTELLIGENCE_FALLBACK_ENABLED=False)
def test_intelligence_active_fallback_disabled(self):
IntelligenceProvider.objects.create(
name="test-ai", provider="claude", is_active=True
)
results = run()
infos = [r for r in results if r.level == "info"]
self.assertTrue(any("fallback" in r.message.lower() for r in infos))
Step 2: Run test to verify it fails
Run: uv run pytest apps/checkers/_tests/status/test_database_checks.py -v Expected: FAIL — ModuleNotFoundError
Step 3: Write minimal implementation
Create apps/checkers/status/database_checks.py:
"""Database vs config state consistency checks.
Compares database records (pipeline definitions, notification channels,
intelligence providers) against environment configuration.
"""
from django.conf import settings
from apps.checkers.status import CheckResult
CATEGORY = "database"
def run() -> list[CheckResult]:
"""Run database vs config consistency checks."""
from apps.intelligence.models import IntelligenceProvider
from apps.notify.models import NotificationChannel
from apps.orchestration.models import PipelineDefinition
results: list[CheckResult] = []
active_definitions = PipelineDefinition.objects.filter(is_active=True).count()
active_channels = NotificationChannel.objects.filter(is_active=True).count()
active_providers = IntelligenceProvider.objects.filter(is_active=True)
# Active pipelines + Celery eager
if active_definitions > 0 and settings.CELERY_TASK_ALWAYS_EAGER:
results.append(
CheckResult(
level="warn",
message=f"{active_definitions} active pipeline definition(s) but Celery is in eager mode",
hint="Eager mode runs tasks inline — set CELERY_TASK_ALWAYS_EAGER=0 for async execution.",
category=CATEGORY,
)
)
# No active notification channels
if active_channels == 0:
results.append(
CheckResult(
level="warn",
message="No active notification channels configured",
hint="Add a notification channel via Django Admin.",
category=CATEGORY,
)
)
# No active pipeline definitions
if active_definitions == 0:
results.append(
CheckResult(
level="info",
message="No active pipeline definitions",
hint="Create one via Django Admin or run: manage.py setup_instance",
category=CATEGORY,
)
)
# Intelligence provider active but fallback disabled
fallback = getattr(settings, "ORCHESTRATION_INTELLIGENCE_FALLBACK_ENABLED", True)
if active_providers.exists() and not fallback:
results.append(
CheckResult(
level="info",
message="Intelligence provider is active but fallback is disabled",
hint="If the AI provider fails, the pipeline will fail. Set ORCHESTRATION_INTELLIGENCE_FALLBACK_ENABLED=1 to continue on failure.",
category=CATEGORY,
)
)
if not results:
results.append(
CheckResult(level="ok", message="Database state is consistent with config", category=CATEGORY)
)
return results
Step 4: Run test to verify it passes
Run: uv run pytest apps/checkers/_tests/status/test_database_checks.py -v Expected: PASS (6 tests)
Step 5: Commit
git add apps/checkers/status/database_checks.py apps/checkers/_tests/status/test_database_checks.py
git commit -m "feat(status): add database vs config consistency checks"
Task 6: Installation state checks
Files:
- Create:
apps/checkers/status/installation_checks.py - Test:
apps/checkers/_tests/status/test_installation_checks.py
Context: Checks filesystem state (aliases, hooks, cron, directory permissions). Uses pathlib and subprocess for crontab inspection.
Step 1: Write the failing tests
Create apps/checkers/_tests/status/test_installation_checks.py:
"""Tests for installation state checks."""
import os
from pathlib import Path
from unittest.mock import patch, MagicMock
from django.test import TestCase, override_settings
from apps.checkers.status.installation_checks import run
class InstallationChecksTests(TestCase):
@patch.dict(os.environ, {"DJANGO_ENV": "dev"})
@patch("apps.checkers.status.installation_checks._path_exists")
def test_aliases_missing_in_dev(self, mock_exists):
mock_exists.side_effect = lambda p: "aliases" not in str(p)
results = run(base_dir=Path("/fake"))
warns = [r for r in results if r.level == "warn"]
self.assertTrue(any("aliases" in r.message.lower() for r in warns))
@patch.dict(os.environ, {"DJANGO_ENV": "dev"})
@patch("apps.checkers.status.installation_checks._path_exists")
def test_precommit_missing_in_dev(self, mock_exists):
mock_exists.side_effect = lambda p: "pre-commit" not in str(p)
results = run(base_dir=Path("/fake"))
warns = [r for r in results if r.level == "warn"]
self.assertTrue(any("pre-commit" in r.message.lower() for r in warns))
@patch.dict(os.environ, {"DJANGO_ENV": "prod"})
@patch("apps.checkers.status.installation_checks._path_exists")
@patch("apps.checkers.status.installation_checks._check_crontab")
@patch("apps.checkers.status.installation_checks._is_writable")
def test_cron_missing_in_prod(self, mock_writable, mock_cron, mock_exists):
mock_exists.return_value = True
mock_cron.return_value = False
mock_writable.return_value = True
results = run(base_dir=Path("/fake"))
warns = [r for r in results if r.level == "warn"]
self.assertTrue(any("cron" in r.message.lower() for r in warns))
@patch.dict(os.environ, {"DJANGO_ENV": "dev"})
@patch("apps.checkers.status.installation_checks._path_exists")
@override_settings(LOGS_DIR=Path("/fake/logs"))
@patch("apps.checkers.status.installation_checks._is_writable")
def test_logs_dir_not_writable(self, mock_writable, mock_exists):
mock_exists.return_value = True
mock_writable.return_value = False
results = run(base_dir=Path("/fake"))
errors = [r for r in results if r.level == "error"]
self.assertTrue(any("logs" in r.message.lower() for r in errors))
@patch.dict(os.environ, {"DJANGO_ENV": "dev"})
@patch("apps.checkers.status.installation_checks._path_exists")
@override_settings(
LOGS_DIR=Path("/fake/logs"),
DATABASES={"default": {"ENGINE": "django.db.backends.sqlite3", "NAME": Path("/fake/db.sqlite3")}},
)
@patch("apps.checkers.status.installation_checks._is_writable")
def test_db_file_not_writable(self, mock_writable, mock_exists):
mock_exists.return_value = True
mock_writable.side_effect = lambda p: "db" not in str(p)
results = run(base_dir=Path("/fake"))
errors = [r for r in results if r.level == "error"]
self.assertTrue(any("database" in r.message.lower() for r in errors))
@patch.dict(os.environ, {"DJANGO_ENV": "dev"})
@patch("apps.checkers.status.installation_checks._path_exists")
@override_settings(LOGS_DIR=Path("/fake/logs"))
@patch("apps.checkers.status.installation_checks._is_writable")
def test_all_ok_in_dev(self, mock_writable, mock_exists):
mock_exists.return_value = True
mock_writable.return_value = True
results = run(base_dir=Path("/fake"))
levels = {r.level for r in results}
self.assertNotIn("error", levels)
Step 2: Run test to verify it fails
Run: uv run pytest apps/checkers/_tests/status/test_installation_checks.py -v Expected: FAIL — ModuleNotFoundError
Step 3: Write minimal implementation
Create apps/checkers/status/installation_checks.py:
"""Installation state consistency checks.
Checks filesystem state: aliases, pre-commit hooks, cron, directory permissions.
"""
import os
import subprocess
from pathlib import Path
from django.conf import settings
from apps.checkers.status import CheckResult
CATEGORY = "installation"
def _path_exists(path: Path) -> bool:
return path.exists()
def _is_writable(path: Path) -> bool:
return os.access(path, os.W_OK)
def _check_crontab() -> bool:
"""Check if any crontab entries reference this project."""
try:
result = subprocess.run(
["crontab", "-l"],
capture_output=True,
text=True,
timeout=5,
)
return "server-maintanence" in result.stdout or "manage.py" in result.stdout
except (subprocess.TimeoutExpired, FileNotFoundError, OSError):
return False
def _is_production() -> bool:
return os.environ.get("DJANGO_ENV", "dev") in ("prod", "production")
def run(base_dir: Path) -> list[CheckResult]:
"""Run installation state checks."""
results: list[CheckResult] = []
prod = _is_production()
# Dev-only checks
if not prod:
if not _path_exists(base_dir / "bin" / "aliases.sh"):
results.append(
CheckResult(
level="warn",
message="Shell aliases not installed",
hint="Run: bin/install.sh aliases",
category=CATEGORY,
)
)
if not _path_exists(base_dir / ".git" / "hooks" / "pre-commit"):
results.append(
CheckResult(
level="warn",
message="Pre-commit hooks not installed",
hint="Run: uv run pre-commit install",
category=CATEGORY,
)
)
# Production-only checks
if prod:
if not _check_crontab():
results.append(
CheckResult(
level="warn",
message="No cron jobs configured for this project",
hint="Run: bin/install.sh cron",
category=CATEGORY,
)
)
# Directory permission checks (all environments)
logs_dir = getattr(settings, "LOGS_DIR", base_dir / "logs")
if _path_exists(logs_dir) and not _is_writable(logs_dir):
results.append(
CheckResult(
level="error",
message=f"Logs directory is not writable: {logs_dir}",
hint="Fix permissions: chmod u+w on the logs directory.",
category=CATEGORY,
)
)
db_settings = settings.DATABASES.get("default", {})
if db_settings.get("ENGINE", "").endswith("sqlite3"):
db_path = Path(db_settings.get("NAME", ""))
if db_path and _path_exists(db_path) and not _is_writable(db_path):
results.append(
CheckResult(
level="error",
message=f"Database file is not writable: {db_path}",
hint="Fix permissions on the SQLite database file.",
category=CATEGORY,
)
)
if not results:
results.append(
CheckResult(level="ok", message="Installation state is consistent", category=CATEGORY)
)
return results
Step 4: Run test to verify it passes
Run: uv run pytest apps/checkers/_tests/status/test_installation_checks.py -v Expected: PASS (6 tests)
Step 5: Commit
git add apps/checkers/status/installation_checks.py apps/checkers/_tests/status/test_installation_checks.py
git commit -m "feat(status): add installation state consistency checks"
Task 7: Dashboard renderer
Files:
- Create:
apps/checkers/status/dashboard.py - Test:
apps/checkers/_tests/status/test_dashboard.py
Context: Renders the system profile, pipeline state, and pipeline definitions sections. Returns both structured data (for JSON) and human-readable strings.
Step 1: Write the failing tests
Create apps/checkers/_tests/status/test_dashboard.py:
"""Tests for the system status dashboard renderer."""
import os
from unittest.mock import patch
from django.test import TestCase, override_settings
from apps.checkers.status.dashboard import get_profile, get_pipeline_state, render_definition_chain
from apps.intelligence.models import IntelligenceProvider
from apps.notify.models import NotificationChannel
from apps.orchestration.models import PipelineDefinition, PipelineRun
class GetProfileTests(TestCase):
@override_settings(
HUB_URL="https://hub.example.com",
CLUSTER_ENABLED=False,
DEBUG=False,
DATABASES={"default": {"ENGINE": "django.db.backends.sqlite3", "NAME": "/tmp/db.sqlite3"}},
CELERY_BROKER_URL="redis://localhost:6379/0",
CELERY_TASK_ALWAYS_EAGER=False,
ORCHESTRATION_METRICS_BACKEND="logging",
INSTANCE_ID="node-1",
LOGS_DIR="/var/log/sm",
)
@patch.dict(os.environ, {"DJANGO_ENV": "prod", "DEPLOY_METHOD": "bare"})
def test_agent_profile(self):
profile = get_profile()
self.assertEqual(profile["role"], "agent")
self.assertEqual(profile["hub_url"], "https://hub.example.com")
self.assertEqual(profile["environment"], "prod")
self.assertFalse(profile["debug"])
self.assertEqual(profile["deploy_method"], "bare")
self.assertEqual(profile["instance_id"], "node-1")
@override_settings(
HUB_URL="",
CLUSTER_ENABLED=True,
DEBUG=False,
DATABASES={"default": {"ENGINE": "django.db.backends.sqlite3", "NAME": "/tmp/db.sqlite3"}},
CELERY_BROKER_URL="redis://localhost:6379/0",
CELERY_TASK_ALWAYS_EAGER=False,
ORCHESTRATION_METRICS_BACKEND="logging",
INSTANCE_ID="",
LOGS_DIR="/var/log/sm",
)
@patch.dict(os.environ, {"DJANGO_ENV": "prod", "DEPLOY_METHOD": "docker"})
def test_hub_profile(self):
profile = get_profile()
self.assertEqual(profile["role"], "hub")
@override_settings(
HUB_URL="",
CLUSTER_ENABLED=False,
DEBUG=True,
DATABASES={"default": {"ENGINE": "django.db.backends.sqlite3", "NAME": "/tmp/db.sqlite3"}},
CELERY_BROKER_URL="redis://localhost:6379/0",
CELERY_TASK_ALWAYS_EAGER=True,
ORCHESTRATION_METRICS_BACKEND="logging",
INSTANCE_ID="",
LOGS_DIR="/tmp/logs",
)
@patch.dict(os.environ, {"DJANGO_ENV": "dev", "DEPLOY_METHOD": "bare"})
def test_standalone_profile(self):
profile = get_profile()
self.assertEqual(profile["role"], "standalone")
self.assertTrue(profile["debug"])
self.assertTrue(profile["celery_eager"])
class GetPipelineStateTests(TestCase):
def test_empty_state(self):
state = get_pipeline_state()
self.assertEqual(state["channels"], [])
self.assertEqual(state["intelligence"], [])
self.assertIsNone(state["last_run"])
def test_with_channels_and_providers(self):
NotificationChannel.objects.create(name="slack", driver="slack", is_active=True)
NotificationChannel.objects.create(name="email", driver="email", is_active=False)
IntelligenceProvider.objects.create(name="ai", provider="claude", is_active=True)
state = get_pipeline_state()
self.assertEqual(len(state["channels"]), 2)
self.assertEqual(len(state["intelligence"]), 1)
def test_last_run(self):
run = PipelineRun.objects.create(
trace_id="t1", run_id="r1", status="notified"
)
state = get_pipeline_state()
self.assertIsNotNone(state["last_run"])
self.assertEqual(state["last_run"]["status"], "notified")
class RenderDefinitionChainTests(TestCase):
def test_renders_node_chain(self):
defn = PipelineDefinition.objects.create(
name="full",
config={
"nodes": [
{"id": "ingest", "type": "alerts", "config": {"driver": "webhook"}, "next": "check"},
{"id": "check", "type": "checkers", "config": {"checkers": ["cpu", "memory"]}, "next": "notify"},
{"id": "notify", "type": "notify", "config": {"driver": "slack"}},
]
},
is_active=True,
)
chain = render_definition_chain(defn)
self.assertIn("alerts", chain)
self.assertIn("cpu", chain)
self.assertIn("notify", chain)
self.assertIn("→", chain)
def test_empty_config(self):
defn = PipelineDefinition.objects.create(name="empty", config={}, is_active=True)
chain = render_definition_chain(defn)
self.assertEqual(chain, "(no stages)")
Step 2: Run test to verify it fails
Run: uv run pytest apps/checkers/_tests/status/test_dashboard.py -v Expected: FAIL — ModuleNotFoundError
Step 3: Write minimal implementation
Create apps/checkers/status/dashboard.py:
"""System status dashboard data and rendering.
Produces structured data for the system profile, pipeline state,
and pipeline definitions. Used by the system_status command for
both human-readable and JSON output.
"""
import os
from pathlib import Path
from django.conf import settings
def get_profile() -> dict:
"""Build the system profile dictionary."""
hub_url = getattr(settings, "HUB_URL", "")
cluster_enabled = getattr(settings, "CLUSTER_ENABLED", False)
if hub_url and cluster_enabled:
role = "conflict"
elif hub_url:
role = "agent"
elif cluster_enabled:
role = "hub"
else:
role = "standalone"
db_config = settings.DATABASES.get("default", {})
db_name = str(db_config.get("NAME", ""))
return {
"role": role,
"hub_url": hub_url,
"environment": os.environ.get("DJANGO_ENV", "dev"),
"debug": settings.DEBUG,
"deploy_method": os.environ.get("DEPLOY_METHOD", "bare"),
"database": db_name,
"celery_broker": getattr(settings, "CELERY_BROKER_URL", ""),
"celery_eager": getattr(settings, "CELERY_TASK_ALWAYS_EAGER", False),
"metrics_backend": getattr(settings, "ORCHESTRATION_METRICS_BACKEND", "logging"),
"instance_id": getattr(settings, "INSTANCE_ID", ""),
"logs_dir": str(getattr(settings, "LOGS_DIR", "")),
}
def get_pipeline_state() -> dict:
"""Build pipeline state: channels, intelligence, last run."""
from apps.intelligence.models import IntelligenceProvider
from apps.notify.models import NotificationChannel
from apps.orchestration.models import PipelineRun
channels = list(
NotificationChannel.objects.all()
.order_by("name")
.values("name", "driver", "is_active")
)
intelligence = list(
IntelligenceProvider.objects.filter(is_active=True)
.order_by("name")
.values("name", "provider", "is_active")
)
last_run_qs = PipelineRun.objects.order_by("-created_at").first()
last_run = None
if last_run_qs:
last_run = {
"timestamp": last_run_qs.created_at.isoformat() if last_run_qs.created_at else None,
"status": last_run_qs.status,
"run_id": last_run_qs.run_id,
}
return {
"channels": channels,
"intelligence": intelligence,
"last_run": last_run,
}
def get_definitions() -> list[dict]:
"""Build pipeline definitions with stage chains."""
from apps.orchestration.models import PipelineDefinition
definitions = []
for defn in PipelineDefinition.objects.order_by("-is_active", "name"):
definitions.append(
{
"name": defn.name,
"active": defn.is_active,
"chain": render_definition_chain(defn),
"stages": _extract_stages(defn),
}
)
return definitions
def render_definition_chain(defn) -> str:
"""Render a pipeline definition's nodes as a human-readable chain.
Example: 'alerts: webhook → checkers: cpu,memory → notify: slack'
"""
nodes = defn.get_nodes()
if not nodes:
return "(no stages)"
parts = []
for node in nodes:
node_type = node.get("type", "unknown")
config = node.get("config", {})
# Extract the meaningful config values
detail_parts = []
for key in ("driver", "provider"):
if key in config:
detail_parts.append(config[key])
if "checkers" in config:
detail_parts.append(",".join(config["checkers"]))
if "drivers" in config:
detail_parts.append(",".join(config["drivers"]))
if "channels" in config:
detail_parts.append(",".join(config["channels"]))
if detail_parts:
parts.append(f"{node_type}: {','.join(detail_parts)}")
else:
parts.append(node_type)
return " → ".join(parts)
def _extract_stages(defn) -> list[dict]:
"""Extract stages for JSON output."""
nodes = defn.get_nodes()
stages = []
for node in nodes:
stage = {"stage": node.get("type", "unknown")}
config = node.get("config", {})
for key in ("driver", "drivers", "provider", "providers", "checkers", "channels"):
if key in config:
val = config[key]
stage[key] = val if isinstance(val, list) else [val]
stages.append(stage)
return stages
Step 4: Run test to verify it passes
Run: uv run pytest apps/checkers/_tests/status/test_dashboard.py -v Expected: PASS (7 tests)
Step 5: Commit
git add apps/checkers/status/dashboard.py apps/checkers/_tests/status/test_dashboard.py
git commit -m "feat(status): add dashboard renderer for system profile and pipeline state"
Task 8: The system_status management command
Files:
- Create:
apps/checkers/management/commands/system_status.py - Test:
apps/checkers/_tests/status/test_command.py
Context: Ties everything together. Renders dashboard, runs all check modules, formats output. Follow the patterns from preflight.py and check_health.py (using self.style.*, self.stdout.write, JSON support).
Step 1: Write the failing tests
Create apps/checkers/_tests/status/test_command.py:
"""Tests for the system_status management command."""
import json
import os
from io import StringIO
from pathlib import Path
from unittest.mock import patch
from django.core.management import call_command
from django.test import TestCase, override_settings
from apps.notify.models import NotificationChannel
from apps.orchestration.models import PipelineDefinition
class SystemStatusCommandTests(TestCase):
"""Tests for the system_status management command."""
def _call(self, *args, **kwargs):
out = StringIO()
err = StringIO()
call_command("system_status", *args, stdout=out, stderr=err, **kwargs)
return out.getvalue(), err.getvalue()
@patch("apps.checkers.status.env_checks._read_file")
@patch.dict(os.environ, {"DJANGO_ENV": "dev", "DEPLOY_METHOD": "bare"})
def test_human_output_contains_profile(self, mock_read):
mock_read.return_value = None # skip env checks
output, _ = self._call()
self.assertIn("System Profile", output)
self.assertIn("Role:", output)
@patch("apps.checkers.status.env_checks._read_file")
@patch.dict(os.environ, {"DJANGO_ENV": "dev", "DEPLOY_METHOD": "bare"})
def test_json_output_is_valid(self, mock_read):
mock_read.return_value = None
output, _ = self._call("--json")
data = json.loads(output)
self.assertIn("profile", data)
self.assertIn("pipeline", data)
self.assertIn("definitions", data)
self.assertIn("checks", data)
self.assertIn("summary", data)
@patch("apps.checkers.status.env_checks._read_file")
@patch.dict(os.environ, {"DJANGO_ENV": "dev", "DEPLOY_METHOD": "bare"})
def test_checks_only_skips_dashboard(self, mock_read):
mock_read.return_value = None
output, _ = self._call("--checks-only")
self.assertNotIn("System Profile", output)
self.assertIn("Consistency", output)
@patch("apps.checkers.status.env_checks._read_file")
@patch.dict(os.environ, {"DJANGO_ENV": "dev", "DEPLOY_METHOD": "bare"})
def test_verbose_shows_ok_checks(self, mock_read):
mock_read.return_value = None
NotificationChannel.objects.create(name="ch", driver="slack", is_active=True)
output, _ = self._call("--verbose")
self.assertIn("OK", output)
@patch("apps.checkers.status.env_checks._read_file")
@patch.dict(os.environ, {"DJANGO_ENV": "dev", "DEPLOY_METHOD": "bare"})
def test_pipeline_definitions_shown(self, mock_read):
mock_read.return_value = None
PipelineDefinition.objects.create(
name="test-pipe",
config={"nodes": [{"id": "n1", "type": "notify", "config": {"driver": "slack"}}]},
is_active=True,
)
output, _ = self._call()
self.assertIn("Pipeline Definitions", output)
self.assertIn("test-pipe", output)
@patch("apps.checkers.status.env_checks._read_file")
@patch.dict(os.environ, {"DJANGO_ENV": "dev", "DEPLOY_METHOD": "bare"})
def test_json_definitions_include_stages(self, mock_read):
mock_read.return_value = None
PipelineDefinition.objects.create(
name="test-pipe",
config={"nodes": [{"id": "n1", "type": "notify", "config": {"driver": "slack"}}]},
is_active=True,
)
output, _ = self._call("--json")
data = json.loads(output)
self.assertTrue(len(data["definitions"]) > 0)
self.assertIn("stages", data["definitions"][0])
@patch("apps.checkers.status.env_checks._read_file")
@patch.dict(os.environ, {"DJANGO_ENV": "dev", "DEPLOY_METHOD": "bare"})
def test_summary_counts(self, mock_read):
mock_read.return_value = None
output, _ = self._call("--json")
data = json.loads(output)
summary = data["summary"]
self.assertIn("passed", summary)
self.assertIn("warnings", summary)
self.assertIn("errors", summary)
Step 2: Run test to verify it fails
Run: uv run pytest apps/checkers/_tests/status/test_command.py -v Expected: FAIL — Unknown command: 'system_status'
Step 3: Write the command implementation
Create apps/checkers/management/commands/system_status.py:
"""
System status: configuration dashboard and consistency checks.
Usage:
python manage.py system_status # Dashboard + issues
python manage.py system_status --json # Full JSON for CI
python manage.py system_status --checks-only # Skip dashboard, issues only
python manage.py system_status --verbose # Include passing checks
"""
import json
from pathlib import Path
from typing import Any
from django.conf import settings
from django.core.management.base import BaseCommand
from apps.checkers.status import CheckResult
from apps.checkers.status.dashboard import get_definitions, get_pipeline_state, get_profile
# Check modules — each exposes run() -> list[CheckResult]
from apps.checkers.status import (
cluster_checks,
database_checks,
env_checks,
installation_checks,
runtime_checks,
)
BASE_DIR = Path(settings.BASE_DIR)
class Command(BaseCommand):
help = "Show system profile and flag configuration inconsistencies"
requires_system_checks: list[str] = []
def add_arguments(self, parser):
parser.add_argument(
"--json",
action="store_true",
default=False,
dest="json_output",
help="Output as JSON",
)
parser.add_argument(
"--checks-only",
action="store_true",
default=False,
help="Skip dashboard, show only consistency checks",
)
parser.add_argument(
"--verbose",
action="store_true",
default=False,
help="Show passing checks too",
)
def handle(self, *args, **options):
json_output = options["json_output"]
checks_only = options["checks_only"]
verbose = options["verbose"]
# Collect data
profile = get_profile()
pipeline = get_pipeline_state()
definitions = get_definitions()
# Run all consistency checks
all_checks: list[CheckResult] = []
all_checks.extend(env_checks.run(base_dir=BASE_DIR))
all_checks.extend(cluster_checks.run())
all_checks.extend(runtime_checks.run())
all_checks.extend(database_checks.run())
all_checks.extend(installation_checks.run(base_dir=BASE_DIR))
# Summary
passed = sum(1 for c in all_checks if c.level == "ok")
warnings = sum(1 for c in all_checks if c.level == "warn")
errors = sum(1 for c in all_checks if c.level == "error")
if json_output:
self._output_json(profile, pipeline, definitions, all_checks, passed, warnings, errors)
else:
self._output_human(
profile, pipeline, definitions, all_checks, passed, warnings, errors,
checks_only=checks_only, verbose=verbose,
)
def _output_json(
self,
profile: dict,
pipeline: dict,
definitions: list[dict],
checks: list[CheckResult],
passed: int,
warnings: int,
errors: int,
) -> None:
data = {
"profile": profile,
"pipeline": pipeline,
"definitions": definitions,
"checks": [
{
"level": c.level,
"category": c.category,
"message": c.message,
"hint": c.hint,
}
for c in checks
if c.level != "ok"
],
"summary": {"passed": passed, "warnings": warnings, "errors": errors},
}
self.stdout.write(json.dumps(data, indent=2, default=str))
def _output_human(
self,
profile: dict,
pipeline: dict,
definitions: list[dict],
checks: list[CheckResult],
passed: int,
warnings: int,
errors: int,
checks_only: bool = False,
verbose: bool = False,
) -> None:
if not checks_only:
self._render_profile(profile)
self._render_pipeline_state(pipeline)
self._render_definitions(definitions)
self._render_checks(checks, verbose)
self._render_summary(passed, warnings, errors)
def _render_profile(self, profile: dict) -> None:
self.stdout.write(self.style.MIGRATE_HEADING("\n═══ System Profile ══════════════════════════\n"))
role_str = profile["role"]
if role_str == "agent":
role_str = f"agent → hub at {profile['hub_url']}"
elif role_str == "hub":
role_str = "hub (accepting cluster payloads)"
elif role_str == "conflict":
role_str = "CONFLICT (both agent and hub)"
debug_str = "on" if profile["debug"] else "off"
eager_str = "eager" if profile["celery_eager"] else "async"
lines = [
("Role:", role_str),
("Environment:", f"{profile['environment']} (DEBUG={debug_str})"),
("Deploy:", profile["deploy_method"]),
("Database:", profile["database"]),
("Celery:", f"{profile['celery_broker']} ({eager_str})"),
("Metrics:", profile["metrics_backend"]),
("Logging:", profile["logs_dir"]),
]
if profile["instance_id"]:
lines.append(("Instance ID:", profile["instance_id"]))
for label, value in lines:
self.stdout.write(f" {label:<14} {value}")
self.stdout.write("")
def _render_pipeline_state(self, pipeline: dict) -> None:
self.stdout.write(self.style.MIGRATE_HEADING("═══ Pipeline State ══════════════════════════\n"))
if pipeline["channels"]:
ch_parts = []
for ch in pipeline["channels"]:
status = "active" if ch["is_active"] else "inactive"
ch_parts.append(f"{ch['name']} ({status})")
self.stdout.write(f" {'Channels:':<14} {', '.join(ch_parts)}")
else:
self.stdout.write(f" {'Channels:':<14} (none)")
if pipeline["intelligence"]:
int_parts = [f"{p['name']} ({p['provider']})" for p in pipeline["intelligence"]]
self.stdout.write(f" {'Intelligence:':<14} {', '.join(int_parts)}")
else:
self.stdout.write(f" {'Intelligence:':<14} (none)")
if pipeline["last_run"]:
lr = pipeline["last_run"]
self.stdout.write(f" {'Last run:':<14} {lr['timestamp']} — {lr['status']}")
else:
self.stdout.write(f" {'Last run:':<14} (none)")
self.stdout.write("")
def _render_definitions(self, definitions: list[dict]) -> None:
self.stdout.write(self.style.MIGRATE_HEADING("═══ Pipeline Definitions ════════════════════\n"))
if not definitions:
self.stdout.write(" (none)")
self.stdout.write("")
return
for defn in definitions:
status = "active" if defn["active"] else "inactive"
name_line = f" {defn['name']} ({status})"
if defn["active"]:
self.stdout.write(self.style.SUCCESS(name_line))
else:
self.stdout.write(f"\033[2m{name_line}\033[0m") # dim for inactive
self.stdout.write(f" {defn['chain']}")
self.stdout.write("")
def _render_checks(self, checks: list[CheckResult], verbose: bool) -> None:
self.stdout.write(self.style.MIGRATE_HEADING("═══ Consistency ═════════════════════════════\n"))
visible = checks if verbose else [c for c in checks if c.level != "ok"]
if not visible:
self.stdout.write(self.style.SUCCESS(" All checks passed"))
self.stdout.write("")
return
for check in visible:
if check.level == "error":
self.stdout.write(self.style.ERROR(f" ERR {check.message}"))
elif check.level == "warn":
self.stdout.write(self.style.WARNING(f" WARN {check.message}"))
elif check.level == "info":
self.stdout.write(f" \033[34mINFO\033[0m {check.message}")
else:
self.stdout.write(self.style.SUCCESS(f" OK {check.message}"))
if check.hint:
self.stdout.write(f" {check.hint}")
self.stdout.write("")
def _render_summary(self, passed: int, warnings: int, errors: int) -> None:
summary = f" {passed} passed, {warnings} warning(s), {errors} error(s)"
if errors > 0:
self.stdout.write(self.style.ERROR(summary))
elif warnings > 0:
self.stdout.write(self.style.WARNING(summary))
else:
self.stdout.write(self.style.SUCCESS(summary))
Step 4: Run test to verify it passes
Run: uv run pytest apps/checkers/_tests/status/test_command.py -v Expected: PASS (7 tests)
Step 5: Commit
git add apps/checkers/management/commands/system_status.py apps/checkers/_tests/status/test_command.py
git commit -m "feat(status): add system_status management command"
Task 9: CLI integration — add status to bin/cli.sh
Files:
- Modify:
bin/cli/system.sh— add “System status” option - Test: Manual — run
bin/cli.sh systemand verify new option appears
Step 1: Add the status option to system_menu
In bin/cli/system.sh, add “System status (config consistency)” as the first option in the options array:
system_menu() {
show_banner
echo -e "${BOLD}═══ System & Security ═══${NC}"
echo ""
local options=(
"System status (config consistency)"
"System check (full preflight)"
"Security audit"
"Set production mode"
"Back to main menu"
)
# shellcheck disable=SC2034
select opt in "${options[@]}"; do
case $REPLY in
1)
confirm_and_run "uv run python manage.py system_status"
;;
2)
confirm_and_run "$SCRIPT_DIR/check_system.sh"
;;
3)
security_menu
;;
4)
confirm_and_run "$SCRIPT_DIR/set_production.sh"
;;
5)
return
;;
*)
echo -e "${RED}Invalid option${NC}"
;;
esac
break
done
}
Step 2: Verify manually
Run: bin/cli.sh system (or just test the Django command directly)
Run: uv run python manage.py system_status Expected: Dashboard output followed by consistency checks.
Run: uv run python manage.py system_status --json Expected: Valid JSON output.
Step 3: Commit
git add bin/cli/system.sh
git commit -m "feat(cli): add system status option to system menu"
Task 10: Full test suite run and coverage verification
Step 1: Run all status tests
Run: uv run pytest apps/checkers/_tests/status/ -v Expected: All tests pass.
Step 2: Run full test suite
Run: uv run pytest Expected: No regressions.
Step 3: Check coverage
Run: uv run coverage run -m pytest && uv run coverage report --include="apps/checkers/status/*" Expected: 100% branch coverage on all status modules.
Step 4: Run linters
Run: uv run black . && uv run ruff check . --fix Expected: No issues.
Step 5: Final commit (if any formatting changes)
git add -u
git commit -m "style: format system status module"