AnalysisRun Audit Logging Implementation Plan
For Claude: REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
Goal: Wire AnalysisRun audit records into every provider execution path and collapse the two-method provider interface into a single analyze().
Architecture: Add BaseProvider.run() (mirrors BaseChecker.run()) to manage AnalysisRun lifecycle. Collapse analyze()+get_recommendations() into single analyze(incident=None, analysis_type=""). Update all call sites to use run().
Tech Stack: Django ORM, Python ABC, time.perf_counter() for timing
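The plan leans on lifecycle helpers on the `AnalysisRun` model (`mark_started()`, `mark_failed()`, `mark_succeeded()`) and assumes they already exist in `apps.intelligence.models`. For orientation, a rough sketch of the interface the Task 1 tests exercise; field and method shapes are inferred from this plan, not copied from the repo:

```python
from django.db import models


class AnalysisRunStatus(models.TextChoices):
    PENDING = "pending"
    RUNNING = "running"
    SUCCEEDED = "succeeded"
    FAILED = "failed"


class AnalysisRun(models.Model):
    """Sketch of the assumed audit model; the real one lives in apps.intelligence.models."""

    trace_id = models.CharField(max_length=255, blank=True, default="")
    pipeline_run_id = models.CharField(max_length=255, blank=True, default="")
    provider = models.CharField(max_length=100)
    provider_config = models.JSONField(default=dict)
    incident = models.ForeignKey(
        "alerts.Incident", null=True, blank=True, on_delete=models.SET_NULL
    )
    status = models.CharField(max_length=20, default=AnalysisRunStatus.PENDING)
    started_at = models.DateTimeField(null=True)
    completed_at = models.DateTimeField(null=True)
    duration_ms = models.IntegerField(null=True)
    recommendations = models.JSONField(default=list)
    recommendations_count = models.IntegerField(default=0)
    error_message = models.TextField(blank=True, default="")

    def mark_started(self): ...  # sets started_at, status=RUNNING
    def mark_failed(self, message): ...  # sets completed_at, duration_ms, error_message, status=FAILED
    def mark_succeeded(self, recommendations): ...  # stores dicts, count, timing, status=SUCCEEDED
```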
Task 1: Add BaseProvider.run() and _redact_config() with tests
Files:
- Modify: `apps/intelligence/providers/base.py` — add `run()`, `_redact_config()`; add `analysis_type` to `analyze()`
- Create: `apps/intelligence/_tests/providers/test_base.py` — tests for `run()` and `_redact_config()`
Step 1: Write failing tests in test_base.py
"""Tests for BaseProvider.run() audit logging."""
from unittest.mock import MagicMock, patch
import pytest
from django.test import TestCase
from apps.intelligence.models import AnalysisRun, AnalysisRunStatus
from apps.intelligence.providers.base import (
BaseProvider,
Recommendation,
RecommendationPriority,
RecommendationType,
)
class FakeProvider(BaseProvider):
"""Concrete provider for testing."""
name = "fake"
description = "Fake provider for tests"
def __init__(self, recommendations=None, error=None):
self._recommendations = recommendations or [
Recommendation(
type=RecommendationType.GENERAL,
priority=RecommendationPriority.LOW,
title="Test",
description="Test recommendation",
)
]
self._error = error
def analyze(self, incident=None, analysis_type=""):
if self._error:
raise self._error
return self._recommendations
class BaseProviderRunTests(TestCase):
"""Tests for BaseProvider.run() audit logging."""
def test_run_returns_recommendations(self):
expected = [
Recommendation(
type=RecommendationType.MEMORY,
priority=RecommendationPriority.HIGH,
title="Mem",
description="High mem",
)
]
provider = FakeProvider(recommendations=expected)
result = provider.run()
self.assertEqual(result, expected)
def test_run_creates_analysis_run_record(self):
provider = FakeProvider()
provider.run()
self.assertEqual(AnalysisRun.objects.count(), 1)
row = AnalysisRun.objects.first()
self.assertEqual(row.provider, "fake")
self.assertEqual(row.status, AnalysisRunStatus.SUCCEEDED)
self.assertEqual(row.recommendations_count, 1)
def test_run_records_succeeded_status(self):
provider = FakeProvider()
provider.run()
row = AnalysisRun.objects.first()
self.assertEqual(row.status, AnalysisRunStatus.SUCCEEDED)
self.assertIsNotNone(row.started_at)
self.assertIsNotNone(row.completed_at)
self.assertGreaterEqual(row.duration_ms, 0)
def test_run_records_failed_status_and_reraises(self):
provider = FakeProvider(error=RuntimeError("boom"))
with self.assertRaises(RuntimeError):
provider.run()
row = AnalysisRun.objects.first()
self.assertEqual(row.status, AnalysisRunStatus.FAILED)
self.assertIn("boom", row.error_message)
def test_run_accepts_trace_id(self):
provider = FakeProvider()
provider.run(trace_id="trace-abc")
row = AnalysisRun.objects.first()
self.assertEqual(row.trace_id, "trace-abc")
def test_run_accepts_pipeline_run_id(self):
provider = FakeProvider()
provider.run(pipeline_run_id="run-123")
row = AnalysisRun.objects.first()
self.assertEqual(row.pipeline_run_id, "run-123")
def test_run_default_ids_empty(self):
provider = FakeProvider()
provider.run()
row = AnalysisRun.objects.first()
self.assertEqual(row.trace_id, "")
self.assertEqual(row.pipeline_run_id, "")
def test_run_stores_incident_fk(self):
from apps.alerts.models import AlertSeverity, Incident, IncidentStatus
incident = Incident.objects.create(
title="Test",
description="Test",
status=IncidentStatus.OPEN,
severity=AlertSeverity.WARNING,
)
provider = FakeProvider()
provider.run(incident=incident)
row = AnalysisRun.objects.first()
self.assertEqual(row.incident_id, incident.id)
def test_run_incident_none(self):
provider = FakeProvider()
provider.run()
row = AnalysisRun.objects.first()
self.assertIsNone(row.incident)
def test_run_stores_recommendations_as_dicts(self):
recs = [
Recommendation(
type=RecommendationType.DISK,
priority=RecommendationPriority.MEDIUM,
title="Disk",
description="Disk issue",
actions=["Clean up"],
)
]
provider = FakeProvider(recommendations=recs)
provider.run()
row = AnalysisRun.objects.first()
self.assertEqual(len(row.recommendations), 1)
self.assertEqual(row.recommendations[0]["type"], "disk")
self.assertEqual(row.recommendations[0]["title"], "Disk")
def test_run_passes_incident_to_analyze(self):
provider = FakeProvider()
with patch.object(provider, "analyze", wraps=provider.analyze) as mock_analyze:
provider.run(incident="mock-incident")
mock_analyze.assert_called_once_with("mock-incident", "")
def test_run_passes_analysis_type_to_analyze(self):
provider = FakeProvider()
with patch.object(provider, "analyze", wraps=provider.analyze) as mock_analyze:
provider.run(analysis_type="memory")
mock_analyze.assert_called_once_with(None, "memory")
def test_run_returns_recommendations_when_db_fails(self):
provider = FakeProvider()
with patch("apps.intelligence.models.AnalysisRun.objects") as mock_objects:
mock_objects.create.side_effect = RuntimeError("DB down")
result = provider.run()
self.assertEqual(len(result), 1)
self.assertEqual(result[0].title, "Test")
def test_run_stores_redacted_config(self):
provider = FakeProvider()
provider.run(provider_config={"api_key": "secret123", "model": "gpt-4"})
row = AnalysisRun.objects.first()
self.assertEqual(row.provider_config["api_key"], "***")
self.assertEqual(row.provider_config["model"], "gpt-4")
class RedactConfigTests(TestCase):
"""Tests for _redact_config."""
def test_redacts_key_patterns(self):
config = {
"api_key": "sk-abc123",
"secret": "mysecret",
"token": "tok-xyz",
"password": "hunter2",
"model": "gpt-4",
"max_tokens_count": "1024",
}
result = BaseProvider._redact_config(config)
self.assertEqual(result["api_key"], "***")
self.assertEqual(result["secret"], "***")
self.assertEqual(result["token"], "***")
self.assertEqual(result["password"], "***")
self.assertEqual(result["model"], "gpt-4")
# "max_tokens_count" contains "token" → redacted
self.assertEqual(result["max_tokens_count"], "***")
def test_empty_config(self):
result = BaseProvider._redact_config({})
self.assertEqual(result, {})
Step 2: Run tests to verify they fail
```bash
uv run pytest apps/intelligence/_tests/providers/test_base.py -v
```

Expected: FAIL — `BaseProvider` has no `run()` method, and `FakeProvider` doesn't match the new `analyze()` signature.
Step 3: Implement in base.py
Add `import logging`, a module-level `logger` and `SENSITIVE_PATTERNS` set, and the `run()`, `_create_analysis_run()`, and `_redact_config()` methods to `BaseProvider`. Change the `analyze()` signature to accept `analysis_type`. Remove the `get_recommendations()` abstract method.
"""
Base provider interface for intelligence providers.
"""
import logging
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import Enum
from typing import Any
logger = logging.getLogger(__name__)
SENSITIVE_PATTERNS = {"key", "secret", "token", "password", "api"}
# ... CheckStatus, Recommendation unchanged ...
class BaseProvider(ABC):
"""
Abstract base class for intelligence providers.
Intelligence providers analyze system state and incidents to generate
actionable recommendations.
"""
name: str = "base"
description: str = "Base intelligence provider"
@abstractmethod
def analyze(
self, incident: Any | None = None, analysis_type: str = ""
) -> list[Recommendation]:
"""
Analyze system state and/or incident to generate recommendations.
Args:
incident: Optional incident object to analyze.
analysis_type: Optional type hint to narrow analysis (e.g. "memory", "disk").
Returns:
List of recommendations.
"""
...
def run(
self,
*,
incident: Any | None = None,
analysis_type: str = "",
trace_id: str = "",
pipeline_run_id: str = "",
provider_config: dict | None = None,
) -> list[Recommendation]:
"""
Run analysis, manage AnalysisRun lifecycle, return recommendations.
Creates an AnalysisRun record tracking the execution. DB failures
are caught and logged — they never break analysis.
"""
analysis_run = self._create_analysis_run(
trace_id=trace_id,
pipeline_run_id=pipeline_run_id,
incident=incident,
provider_config=provider_config,
)
if analysis_run:
analysis_run.mark_started()
try:
recommendations = self.analyze(incident, analysis_type)
except Exception as exc:
if analysis_run:
try:
analysis_run.mark_failed(str(exc))
except Exception:
logger.warning(
"Failed to mark AnalysisRun as failed for '%s'",
self.name,
exc_info=True,
)
raise
if analysis_run:
try:
analysis_run.mark_succeeded(
recommendations=[r.to_dict() for r in recommendations],
)
except Exception:
logger.warning(
"Failed to mark AnalysisRun as succeeded for '%s'",
self.name,
exc_info=True,
)
return recommendations
def _create_analysis_run(
self,
trace_id: str,
pipeline_run_id: str,
incident: Any | None,
provider_config: dict | None,
):
"""Create an AnalysisRun record. Returns None if DB fails."""
try:
from apps.intelligence.models import AnalysisRun
incident_obj = incident if incident and hasattr(incident, "pk") else None
return AnalysisRun.objects.create(
trace_id=trace_id,
pipeline_run_id=pipeline_run_id,
provider=self.name,
provider_config=self._redact_config(provider_config or {}),
incident=incident_obj,
)
except Exception:
logger.warning(
"Failed to create AnalysisRun for '%s'", self.name, exc_info=True
)
return None
@staticmethod
def _redact_config(config: dict) -> dict:
"""Mask sensitive keys in provider config."""
return {
k: "***" if any(p in k.lower() for p in SENSITIVE_PATTERNS) else v
for k, v in config.items()
}
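For orientation, a quick sketch of what a call site looks like once Tasks 4-6 land (the config values here are illustrative, not from the repo):

```python
from apps.intelligence.providers.local import LocalRecommendationProvider

provider = LocalRecommendationProvider()
recommendations = provider.run(
    analysis_type="memory",                  # or: incident=some_incident
    trace_id="trace-abc",                    # propagated from the caller; "" by default
    pipeline_run_id="run-123",
    provider_config={"api_key": "sk-demo"},  # stored redacted as {"api_key": "***"}
)
```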
Step 4: Run tests to verify they pass
```bash
uv run pytest apps/intelligence/_tests/providers/test_base.py -v
```

Expected: ALL PASS
Step 5: Commit
```bash
git add apps/intelligence/providers/base.py apps/intelligence/_tests/providers/test_base.py
git commit -m "feat: add BaseProvider.run() with AnalysisRun audit logging"
```
Task 2: Refactor LocalRecommendationProvider
Files:
- Modify: `apps/intelligence/providers/local.py` — merge `get_recommendations()` into `analyze()`, add `analysis_type`
- Modify: `apps/intelligence/_tests/providers/test_local.py` — update tests
Step 1: Update existing tests
In `test_local.py`, update the tests that call or cover `get_recommendations()`:
- `test_get_recommendations_low_memory` (line 235): change `provider.get_recommendations()` → `provider.analyze()`
- `TestIntegration.test_analyze_with_incident` (line 261): no change needed — `analysis_type=""` is implicit
Also update any mocks that stub get_recommendations to stub analyze instead.
```python
# In test_get_recommendations_low_memory (around lines 235-254):
# Change:
#   recommendations = provider.get_recommendations()
# To:
#   recommendations = provider.analyze()
```
Step 2: Run tests to verify they fail
```bash
uv run pytest apps/intelligence/_tests/providers/test_local.py -v
```

Expected: FAIL — `get_recommendations()` no longer exists.
Step 3: Implement in local.py
Update analyze() signature and merge get_recommendations() body:
```python
def analyze(self, incident: Any | None = None, analysis_type: str = "") -> list[Recommendation]:
    """
    Analyze an incident and generate targeted recommendations.

    Args:
        incident: An Incident object from apps.alerts.models.
        analysis_type: Optional type hint ("memory", "disk") to narrow analysis.

    Returns:
        List of recommendations relevant to the incident.
    """
    # Targeted analysis by type (no incident required)
    if analysis_type == "memory":
        return self._get_memory_recommendations()
    elif analysis_type == "disk":
        return self._get_disk_recommendations()

    if incident is None:
        # General system scan (was get_recommendations)
        return self._general_recommendations()

    # Check incident type based on title/description/alerts
    incident_type = self._detect_incident_type(incident)
    if incident_type == "memory":
        return self._analyze_memory_incident(incident)
    elif incident_type == "disk":
        return self._analyze_disk_incident(incident)
    elif incident_type == "cpu":
        return self._analyze_cpu_incident(incident)
    else:
        return self._general_recommendations()


def _general_recommendations(self) -> list[Recommendation]:
    """Get general recommendations based on current system state."""
    recommendations = []
    mem = psutil.virtual_memory()
    if mem.percent > 70:
        recommendations.extend(self._get_memory_recommendations())
    for partition in psutil.disk_partitions():
        try:
            usage = psutil.disk_usage(partition.mountpoint)
            if usage.percent > 70:
                recommendations.extend(self._get_disk_recommendations(partition.mountpoint))
                break
        except (PermissionError, OSError):
            continue
    return recommendations
```
Delete get_recommendations() method entirely.
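The routing above goes through `_detect_incident_type()`. If `local.py` does not already have such a helper, a minimal sketch of the keyword routing it implies (the name and heuristics are assumptions, not existing code):

```python
def _detect_incident_type(self, incident) -> str:
    """Crude keyword routing over the incident title/description (hypothetical helper)."""
    text = f"{incident.title} {incident.description}".lower()
    keyword_map = {
        "memory": ("memory", "oom", "swap"),
        "disk": ("disk", "filesystem", "no space"),
        "cpu": ("cpu", "load average"),
    }
    for kind, keywords in keyword_map.items():
        if any(word in text for word in keywords):
            return kind
    return ""
```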
Update the `get_local_recommendations()` convenience function at the bottom of the file:

```python
def get_local_recommendations(incident=None) -> list[Recommendation]:
    provider = LocalRecommendationProvider()
    return provider.analyze(incident)
```
Step 4: Run tests to verify they pass
```bash
uv run pytest apps/intelligence/_tests/providers/test_local.py -v
```

Expected: ALL PASS
Step 5: Commit
```bash
git add apps/intelligence/providers/local.py apps/intelligence/_tests/providers/test_local.py
git commit -m "refactor: collapse local provider get_recommendations into analyze"
```
Task 3: Refactor OpenAIRecommendationProvider
Files:
- Modify: `apps/intelligence/providers/openai.py` — remove `get_recommendations()`, add `analysis_type`
- Modify: `apps/intelligence/_tests/providers/test_openai.py` — update tests
Step 1: Update existing tests
In test_openai.py:
- `test_analyze_without_incident` (line 347): keep — `provider.analyze(None)` should still return `[]`
- `test_get_recommendations_returns_empty` (line 355): change to test that `provider.analyze()` returns `[]`
```python
# Change test_get_recommendations_returns_empty:
def test_analyze_without_incident_returns_empty(self):
    """Test analyze returns empty list without incident."""
    provider = OpenAIRecommendationProvider(api_key="test-key")
    recommendations = provider.analyze(None)
    assert recommendations == []
```
Step 2: Run tests to verify they fail
```bash
uv run pytest apps/intelligence/_tests/providers/test_openai.py -v
```

Expected: FAIL — `get_recommendations()` no longer exists.
Step 3: Implement in openai.py
```python
def analyze(self, incident: Any | None = None, analysis_type: str = "") -> list[Recommendation]:
    """
    Analyze an incident using OpenAI and generate recommendations.

    Args:
        incident: An Incident object from apps.alerts.models.
        analysis_type: Ignored — the OpenAI provider requires incident context.

    Returns:
        List of AI-generated recommendations.
    """
    if incident is None:
        return []
    prompt = self._build_prompt(incident)
    try:
        response = self._call_openai(prompt)
        return self._parse_response(response, incident_id=incident.id)
    except Exception as e:
        logger.error(f"OpenAI API error: {e}")
        return self._get_fallback_recommendation(incident, str(e))
```
Delete get_recommendations() method.
Step 4: Run tests to verify they pass
```bash
uv run pytest apps/intelligence/_tests/providers/test_openai.py -v
```

Expected: ALL PASS
Step 5: Commit
```bash
git add apps/intelligence/providers/openai.py apps/intelligence/_tests/providers/test_openai.py
git commit -m "refactor: collapse openai provider get_recommendations into analyze"
```
Task 4: Update AnalyzeExecutor call site
Files:
- Modify: `apps/orchestration/executors.py:159-230` — replace `provider.analyze()`/`get_recommendations()` routing with `provider.run()`
Step 1: Update the executor
In AnalyzeExecutor.execute(), replace lines 174-182:
```python
# Before:
recommendations = []
if incident_id:
    from apps.alerts.models import Incident

    incident = Incident.objects.filter(id=incident_id).first()
    if incident:
        recommendations = provider.analyze(incident)
else:
    recommendations = provider.get_recommendations()

# After:
incident = None
if incident_id:
    from apps.alerts.models import Incident

    incident = Incident.objects.filter(id=incident_id).first()
recommendations = provider.run(
    incident=incident,
    trace_id=ctx.trace_id,
    pipeline_run_id=ctx.run_id,
    provider_config=provider_config,
)
```
Step 2: Run all tests
```bash
uv run pytest -v
```

Expected: the orchestration executor tests will fail until their mocks are updated, because they mock the provider. Each mock needs `.run.return_value` instead of `.analyze.return_value` / `.get_recommendations.return_value`. Check `apps/orchestration/_tests/` for mocks that need updating; everything else should pass.
Step 3: Fix any broken executor tests
Search for mocks of `provider.analyze` or `provider.get_recommendations` in the orchestration tests and update them to mock `provider.run`, as in the sketch below.
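The mechanical change is the same in every test: stub `run` instead of the two old methods, and assert on it. A minimal illustration of the mock semantics (the surrounding test names and fixtures in the real suite will differ); the same pattern applies to the command tests in Task 5:

```python
from unittest.mock import MagicMock

mock_provider = MagicMock()

# Old stubs, these attributes no longer exist on real providers:
#   mock_provider.analyze.return_value = [...]
#   mock_provider.get_recommendations.return_value = [...]

# New stub: one entry point, regardless of incident/analysis_type routing.
mock_provider.run.return_value = []

# The code under test now calls provider.run(...); assert on that call.
mock_provider.run(incident=None, trace_id="t-1", pipeline_run_id="r-1")
mock_provider.run.assert_called_once()
```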
Step 4: Commit
```bash
git add apps/orchestration/executors.py apps/orchestration/_tests/
git commit -m "refactor: update AnalyzeExecutor to use provider.run()"
```
Task 5: Update management command call site
Files:
- Modify: `apps/intelligence/management/commands/get_recommendations.py:143-172` — route all modes through `provider.run()`
- Modify: `apps/intelligence/_tests/commands/test_get_recommendations.py` — update mocks
Step 1: Update the command
Replace the if/elif chain (lines 143-172):
```python
# Before:
recommendations = []
if options["incident_id"]:
    ...
    recommendations = provider.analyze(incident)
elif options["memory"]:
    recommendations = provider._get_memory_recommendations()
elif options["disk"]:
    recommendations = provider._get_disk_recommendations(options["path"])
elif options["all"]:
    recommendations.extend(provider._get_memory_recommendations())
    recommendations.extend(provider._get_disk_recommendations(options["path"]))
else:
    recommendations = provider.get_recommendations()

# After:
recommendations = []
if options["incident_id"]:
    from apps.alerts.models import Incident

    try:
        incident = Incident.objects.get(id=options["incident_id"])
        self.stdout.write(self.style.SUCCESS(f"Analyzing incident: {incident.title}"))
        recommendations = provider.run(incident=incident)
    except Incident.DoesNotExist:
        self.stderr.write(self.style.ERROR(f"Incident {options['incident_id']} not found"))
        return
elif options["memory"]:
    recommendations = provider.run(analysis_type="memory")
elif options["disk"]:
    recommendations = provider.run(analysis_type="disk")
elif options["all"]:
    recommendations.extend(provider.run(analysis_type="memory"))
    recommendations.extend(provider.run(analysis_type="disk"))
else:
    recommendations = provider.run()
```
Step 2: Update tests
In test_get_recommendations.py, update mocks:
- `test_default_provider_is_local` (line 38): `mock_provider.get_recommendations.return_value = []` → `mock_provider.run.return_value = []`
- `test_custom_provider` (line 53): same
- `test_provider_configuration_options` (line 67): same
- `test_analyze_incident_by_id` (line 115): `mock_provider.analyze.return_value` → `mock_provider.run.return_value`; assert `mock_provider.run.assert_called_once()`
- `test_memory_recommendations` (line 159): `mock_provider._get_memory_recommendations.return_value` → `mock_provider.run.return_value`; assert `mock_provider.run.assert_called_once()`
- `test_disk_recommendations` (line 178): similar
- `test_disk_recommendations_with_path` (line 197): similar
- `test_all_recommendations` (line 215): `mock_provider.run` called twice
- `test_default_calls_get_recommendations` (line 228): `mock_provider.get_recommendations.return_value` → `mock_provider.run.return_value`
- All `TestOutputFormats` and `TestPrintRecommendations` tests: change `.get_recommendations.return_value` → `.run.return_value`
Step 3: Run tests
```bash
uv run pytest apps/intelligence/_tests/commands/test_get_recommendations.py -v
```

Expected: ALL PASS
Step 4: Commit
```bash
git add apps/intelligence/management/commands/get_recommendations.py apps/intelligence/_tests/commands/test_get_recommendations.py
git commit -m "refactor: route management command through provider.run()"
```
Task 6: Update view call sites
Files:
- Modify: `apps/intelligence/views/recommendations.py:40,47,82,88` — use `provider.run()`
- Modify: `apps/intelligence/views/memory.py:26` — use `provider.run(analysis_type="memory")`
- Modify: `apps/intelligence/views/disk.py:35` — use `provider.run(analysis_type="disk")`
Step 1: Update recommendations.py
```python
# GET handler — replace lines 39-47:
# Before:
provider = get_provider(provider_name)
recommendations = provider.analyze(incident)
# ...
provider = get_provider(provider_name)
recommendations = provider.get_recommendations()

# After:
provider = get_provider(provider_name)
recommendations = provider.run(incident=incident)
# ...
provider = get_provider(provider_name)
recommendations = provider.run()

# POST handler — replace lines 75-88:
# Before:
provider = get_provider(provider_name, **provider_config)
# ...
recommendations = provider.analyze(incident)
# ...
recommendations = provider.get_recommendations()

# After:
provider = get_provider(provider_name, **provider_config)
# ...
recommendations = provider.run(incident=incident, provider_config=provider_config)
# ...
recommendations = provider.run(provider_config=provider_config)
```
Step 2: Update memory.py
```python
# Line 26 — replace:
recommendations = provider._get_memory_recommendations()
# With:
recommendations = provider.run(analysis_type="memory")
```
Step 3: Update disk.py
```python
# Line 35 — replace:
recommendations = provider._get_disk_recommendations(path)
# With:
recommendations = provider.run(analysis_type="disk")
```
Note: the view's `path` query param needs care. With `analysis_type="disk"`, `analyze()` calls `_get_disk_recommendations()` with no path argument, so it falls back to its default of `"/"`, and the `path` query param is no longer forwarded. (The disk view also passes path-related config as kwargs to `get_provider("local", ...)`, but `scan_paths` is not the same thing as the `path` argument to `_get_disk_recommendations()`.) If the query param must keep working, extend `analysis_type` to carry the path or accept it as a kwarg; otherwise the default `"/"` matches the existing default behavior. Verify which behavior the view actually needs before committing; one option is sketched below.
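A sketch of the first option, carrying the path inside `analysis_type` (e.g. `"disk:/var/log"`) so the `run()`/`analyze()` signatures stay unchanged. The helper name and encoding are hypothetical, not part of this plan's committed design:

```python
def split_analysis_type(analysis_type: str) -> tuple[str, str]:
    """Split 'disk:/var/log' into ('disk', '/var/log'); the default path is '/'."""
    kind, _, path = analysis_type.partition(":")
    return kind, path or "/"


assert split_analysis_type("disk") == ("disk", "/")
assert split_analysis_type("disk:/var/log") == ("disk", "/var/log")
```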
Step 4: Update view tests if any exist
Check apps/intelligence/_tests/views/ for tests that mock provider methods and update accordingly.
Step 5: Run tests
```bash
uv run pytest apps/intelligence/_tests/views/ -v
```

Expected: ALL PASS
Step 6: Commit
```bash
git add apps/intelligence/views/recommendations.py apps/intelligence/views/memory.py apps/intelligence/views/disk.py apps/intelligence/_tests/views/
git commit -m "refactor: route all views through provider.run()"
```
Task 7: Clean up provider __init__.py
Files:
- Modify: `apps/intelligence/providers/__init__.py` — remove `get_local_recommendations` from exports if it changed
Step 1: Verify __init__.py imports are correct
Task 2 left `get_local_recommendations()` in `local.py` calling `provider.analyze(incident)`, which matches the new signature, so no change should be needed here. Verify that nothing still imports the deleted `get_recommendations()` and that the `__init__.py` exports resolve.
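A quick smoke check for the exports (run through `manage.py shell` so Django settings load; the import path follows this plan's file layout):

```bash
uv run python manage.py shell -c "from apps.intelligence.providers import get_local_recommendations; print(get_local_recommendations)"
```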
Step 2: Run full test suite
```bash
uv run pytest -v
```
Step 3: Commit if any changes
```bash
git add apps/intelligence/providers/__init__.py
git commit -m "chore: clean up provider __init__.py exports"
```
Task 8: Full verification
Step 1: Run all tests
```bash
uv run pytest -v
```

Expected: ALL PASS
Step 2: Django system checks
```bash
uv run python manage.py check
```

Expected: No issues
Step 3: Lint and format
```bash
uv run black .
uv run ruff check . --fix
```

Expected: Clean
Step 4: Verify AnalysisRun creation manually
```bash
uv run python manage.py get_recommendations --memory
# Then check /admin/intelligence/analysisrun/ for a new row
```
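If you prefer the shell to the admin, a one-liner to spot-check the newest row (field names per the Task 1 tests):

```bash
uv run python manage.py shell -c "
from apps.intelligence.models import AnalysisRun
run = AnalysisRun.objects.order_by('-id').first()
print(run.provider, run.status, run.duration_ms, run.recommendations_count)
"
```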
Step 5: Final commit if needed
Files Summary
| File | Change |
|---|---|
| `apps/intelligence/providers/base.py` | Remove `get_recommendations()` abstract, add `analysis_type` to `analyze()`, add `run()`, `_create_analysis_run()`, `_redact_config()` |
| `apps/intelligence/providers/local.py` | Merge `get_recommendations()` into `analyze()`, add `analysis_type`, extract `_general_recommendations()` |
| `apps/intelligence/providers/openai.py` | Remove `get_recommendations()`, add `analysis_type` to `analyze()` |
| `apps/orchestration/executors.py` | Replace `analyze()`/`get_recommendations()` routing → `provider.run()` |
| `apps/intelligence/management/commands/get_recommendations.py` | Route all modes through `provider.run()` |
| `apps/intelligence/views/recommendations.py` | Use `provider.run()` |
| `apps/intelligence/views/memory.py` | Use `provider.run(analysis_type="memory")` |
| `apps/intelligence/views/disk.py` | Use `provider.run(analysis_type="disk")` |
| `apps/intelligence/_tests/providers/test_base.py` | NEW — `FakeProvider` + ~15 tests for `run()` and `_redact_config()` |
| `apps/intelligence/_tests/providers/test_local.py` | Update `get_recommendations()` → `analyze()` |
| `apps/intelligence/_tests/providers/test_openai.py` | Remove `get_recommendations` test, update mocks |
| `apps/intelligence/_tests/commands/test_get_recommendations.py` | Update all mocks to use `provider.run()` |
| `apps/intelligence/_tests/views/` | Update mocks if needed |
| `apps/intelligence/providers/__init__.py` | Clean up exports |