# Problem: Manual K8s Config Reviews Are Slow and Error-Prone
You're deploying 50+ microservices to Kubernetes, and manually checking YAML files for security issues, missing resource limits, and anti-patterns takes hours. You need an autonomous agent that audits configs before they reach production.
**You'll learn how to:**
- Build an AI agent that reads and analyzes K8s manifests
- Implement multi-step reasoning for complex policy checks
- Integrate Claude API for autonomous decision-making
- Output actionable reports with severity ratings
**Time:** 45 min | **Level:** Intermediate
## Why This Happens
Kubernetes manifests grow complex fast. A single deployment can have 20+ configuration options, and teams often copy-paste configs without understanding security implications. Static linters catch syntax errors but miss contextual issues like "why does this nginx pod need root access?"
Common symptoms:
- Pods running as root in production
- Missing CPU/memory limits causing node crashes
- Secrets in plaintext environment variables
- Overly permissive RBAC roles
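For contrast with the contextual analysis this agent performs, a purely mechanical check catches only what it is explicitly coded to find. A minimal sketch of such a check for one symptom, missing resource limits (the `pod_spec` shape mirrors a parsed K8s pod spec; the function name is illustrative):

```python
# A hand-rolled static check: which containers declare no resources.limits?
def containers_missing_limits(pod_spec: dict) -> list:
    """Return names of containers without a resources.limits block."""
    return [
        c['name'] for c in pod_spec.get('containers', [])
        if 'limits' not in c.get('resources', {})
    ]

spec = {'containers': [{'name': 'web', 'image': 'nginx:latest'}]}
print(containers_missing_limits(spec))  # ['web']
```

A linter full of rules like this still can't answer contextual questions such as "why does this nginx pod need root access?", which is where the AI audit below comes in.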
## Solution
We'll build a Python agent that:
- Parses K8s YAML files
- Uses Claude API to analyze each resource
- Checks for security, reliability, and cost issues
- Generates a prioritized audit report
## Prerequisites

```bash
# Install dependencies
pip install anthropic pyyaml --break-system-packages

# Set API key
export ANTHROPIC_API_KEY='your-key-here'
```
### Step 1: Create the K8s Manifest Parser

```python
# k8s_parser.py
import yaml
from pathlib import Path
from typing import List, Dict, Any


class K8sManifestParser:
    """Extracts and categorizes Kubernetes resources from YAML files."""

    def __init__(self, manifest_path: str):
        self.manifest_path = Path(manifest_path)
        self.resources = []

    def parse(self) -> List[Dict[str, Any]]:
        """Load all K8s resources from file, handling multi-doc YAML."""
        with open(self.manifest_path) as f:
            # yaml.safe_load_all handles multiple documents in one file
            docs = yaml.safe_load_all(f)
            for doc in docs:
                if doc and 'kind' in doc:  # Valid K8s resource
                    self.resources.append({
                        'kind': doc['kind'],
                        'name': doc.get('metadata', {}).get('name', 'unnamed'),
                        'namespace': doc.get('metadata', {}).get('namespace', 'default'),
                        'spec': doc.get('spec', {}),
                        'raw': doc
                    })
        return self.resources

    def get_by_kind(self, kind: str) -> List[Dict[str, Any]]:
        """Filter resources by type (Deployment, Service, etc.)."""
        return [r for r in self.resources if r['kind'] == kind]
```
**Why this works:** `yaml.safe_load_all()` handles K8s files with multiple resources (common in production). We extract the metadata the AI agent needs to understand context.
Expected: the parser loads all resources from your manifest. Test it:

```python
parser = K8sManifestParser('deployment.yaml')
resources = parser.parse()
print(f"Found {len(resources)} resources")
# Output: Found 3 resources (Deployment, Service, ConfigMap)
```
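To see the multi-document behavior in isolation, here's a minimal sketch that feeds `yaml.safe_load_all` a string instead of a file (the two-resource document is made up for illustration):

```python
import yaml

multi_doc = """\
kind: Deployment
metadata:
  name: web
---
kind: Service
metadata:
  name: web-svc
"""

# safe_load_all yields one parsed dict per `---`-separated document
docs = [d for d in yaml.safe_load_all(multi_doc) if d and 'kind' in d]
print([d['kind'] for d in docs])  # ['Deployment', 'Service']
```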
### Step 2: Build the Audit Agent

````python
# audit_agent.py
import json
import yaml
from typing import Dict, Any
from anthropic import Anthropic


class K8sAuditAgent:
    """Autonomous agent that analyzes K8s configs using Claude API."""

    def __init__(self, api_key: str):
        self.client = Anthropic(api_key=api_key)
        self.model = "claude-sonnet-4-20250514"

    def audit_resource(self, resource: Dict[str, Any]) -> Dict[str, Any]:
        """
        Analyzes a single K8s resource for security and reliability issues.
        Returns structured findings with severity ratings.
        """
        # Build context-aware prompt
        prompt = self._build_audit_prompt(resource)
        # Ask Claude to analyze as a K8s expert
        message = self.client.messages.create(
            model=self.model,
            max_tokens=2000,
            temperature=0,  # Deterministic for consistency
            messages=[{
                "role": "user",
                "content": prompt
            }]
        )
        # Parse Claude's response into structured data
        return self._parse_findings(message.content[0].text, resource)

    def _build_audit_prompt(self, resource: Dict[str, Any]) -> str:
        """Creates expert-level prompt for K8s analysis."""
        return f"""You are a Kubernetes security and reliability expert. Audit this {resource['kind']} manifest.

Resource: {resource['name']} (namespace: {resource['namespace']})

```yaml
{yaml.dump(resource['raw'], default_flow_style=False)}
```

Analyze for:
- Security: Root containers, privileged mode, secrets exposure, RBAC issues
- Reliability: Missing resource limits, readiness/liveness probes, replica counts
- Cost: Over-provisioned resources, inefficient configurations
- Best Practices: Labels, annotations, naming conventions

For each issue found, provide:
- Severity: critical, high, medium, low
- Issue: One-line description
- Why: Security/reliability/cost impact
- Fix: Exact YAML change needed

Output as JSON: {{"findings": [{{"severity": "high", "issue": "Container runs as root", "why": "Compromised container can access host filesystem", "fix": "Add securityContext:\\n  runAsNonRoot: true\\n  runAsUser: 1000"}}], "summary": "2 critical, 3 high, 1 medium issues found"}}

If no issues, return {{"findings": [], "summary": "No issues found"}}."""

    def _parse_findings(self, response: str, resource: Dict) -> Dict:
        """Extracts JSON from Claude's response, handles parsing errors."""
        try:
            # Claude may wrap JSON in markdown code blocks
            if "```json" in response:
                response = response.split("```json")[1].split("```")[0].strip()
            elif "```" in response:
                response = response.split("```")[1].split("```")[0].strip()
            findings = json.loads(response)
            findings['resource_name'] = resource['name']
            findings['resource_kind'] = resource['kind']
            return findings
        except json.JSONDecodeError as e:
            # Fallback: flag the resource for manual review
            return {
                'resource_name': resource['name'],
                'resource_kind': resource['kind'],
                'findings': [{
                    'severity': 'medium',
                    'issue': 'Failed to parse AI response',
                    'why': str(e),
                    'fix': 'Review manually'
                }],
                'summary': 'Parse error occurred'
            }
````
**Why temperature=0:** Makes Claude's output deterministic. Same config always gets same findings (important for CI/CD).
**Why JSON in prompt:** Structured output is easier to parse and display than prose. We explicitly ask for JSON to avoid unstructured responses.
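The fence-stripping logic in `_parse_findings` can be exercised on its own. A standalone sketch of the same extraction (the helper name `extract_json` is illustrative):

```python
import json

def extract_json(response: str) -> dict:
    # Strip optional markdown code fences before parsing, mirroring _parse_findings
    if "```json" in response:
        response = response.split("```json")[1].split("```")[0].strip()
    elif "```" in response:
        response = response.split("```")[1].split("```")[0].strip()
    return json.loads(response)

wrapped = '```json\n{"findings": [], "summary": "No issues found"}\n```'
print(extract_json(wrapped)["summary"])  # No issues found
```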
**If it fails:**
- **Error: "Invalid JSON":** Claude wrapped response in markdown. The code strips ` ```json ` blocks automatically.
- **Error: "Rate limit":** Add retry logic with exponential backoff (see Step 4).
---
### Step 3: Generate the Audit Report
````python
# report_generator.py
from datetime import datetime
from typing import Dict, List


class AuditReportGenerator:
    """Formats audit findings into human-readable and CI-friendly formats."""

    SEVERITY_ORDER = {'critical': 0, 'high': 1, 'medium': 2, 'low': 3}
    SEVERITY_EMOJI = {'critical': '🚨', 'high': '⚠️', 'medium': '⚡', 'low': 'ℹ️'}

    def generate_markdown(self, all_findings: List[Dict]) -> str:
        """Creates detailed markdown report for documentation."""
        report = [
            "# Kubernetes Audit Report",
            f"**Generated:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
            f"**Resources Scanned:** {len(all_findings)}",
            "",
            "## Summary",
            ""
        ]
        # Count total findings by severity
        severity_counts = {'critical': 0, 'high': 0, 'medium': 0, 'low': 0}
        for result in all_findings:
            for finding in result.get('findings', []):
                severity = finding['severity']
                severity_counts[severity] += 1
        report.append(f"- 🚨 **Critical:** {severity_counts['critical']}")
        report.append(f"- ⚠️ **High:** {severity_counts['high']}")
        report.append(f"- ⚡ **Medium:** {severity_counts['medium']}")
        report.append(f"- ℹ️ **Low:** {severity_counts['low']}")
        report.append("")

        # Detailed findings per resource
        report.append("## Detailed Findings")
        report.append("")
        for result in all_findings:
            if not result.get('findings'):
                continue
            report.append(f"### {result['resource_kind']}: `{result['resource_name']}`")
            report.append("")
            # Sort findings by severity
            sorted_findings = sorted(
                result['findings'],
                key=lambda x: self.SEVERITY_ORDER.get(x['severity'], 99)
            )
            for finding in sorted_findings:
                emoji = self.SEVERITY_EMOJI.get(finding['severity'], '•')
                report.append(f"{emoji} **{finding['severity'].upper()}**: {finding['issue']}")
                report.append(f"  - **Impact:** {finding['why']}")
                report.append("  - **Fix:**")
                report.append("    ```yaml")
                report.append(f"    {finding['fix']}")
                report.append("    ```")
                report.append("")
        return "\n".join(report)

    def generate_ci_output(self, all_findings: List[Dict]) -> str:
        """Creates compact output for CI/CD pipelines (GitHub Actions, GitLab CI)."""
        critical_count = sum(
            1 for r in all_findings
            for f in r.get('findings', [])
            if f['severity'] == 'critical'
        )
        high_count = sum(
            1 for r in all_findings
            for f in r.get('findings', [])
            if f['severity'] == 'high'
        )
        if critical_count > 0:
            return f"❌ FAILED: {critical_count} critical issues found"
        elif high_count > 0:
            return f"⚠️ WARNING: {high_count} high-severity issues found"
        else:
            return "✅ PASSED: No critical or high-severity issues"
````
**Why sort by severity:** Critical issues appear first, so DevOps teams can fix blockers before reading the full report.
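The sort itself is just an ordering dict used as a sort key; in isolation:

```python
# Lower rank sorts first; unknown severities (rank 99) sink to the bottom
SEVERITY_ORDER = {'critical': 0, 'high': 1, 'medium': 2, 'low': 3}

findings = [
    {'severity': 'low', 'issue': 'Missing app label'},
    {'severity': 'critical', 'issue': 'Privileged container'},
    {'severity': 'high', 'issue': 'No resource limits'},
]
ordered = sorted(findings, key=lambda f: SEVERITY_ORDER.get(f['severity'], 99))
print([f['severity'] for f in ordered])  # ['critical', 'high', 'low']
```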
### Step 4: Put It All Together

```python
# main.py
import os
import sys

from k8s_parser import K8sManifestParser
from audit_agent import K8sAuditAgent
from report_generator import AuditReportGenerator


def main(manifest_path: str):
    """Main orchestration: parse -> audit -> report."""
    # Initialize components
    parser = K8sManifestParser(manifest_path)
    agent = K8sAuditAgent(api_key=os.getenv('ANTHROPIC_API_KEY'))
    reporter = AuditReportGenerator()

    # Step 1: Load all resources
    print(f"📋 Parsing {manifest_path}...")
    resources = parser.parse()
    print(f"   Found {len(resources)} Kubernetes resources")

    # Step 2: Audit each resource with AI
    print("\n🤖 Running AI audit...")
    all_findings = []
    for i, resource in enumerate(resources, 1):
        print(f"   [{i}/{len(resources)}] Analyzing {resource['kind']}/{resource['name']}...")
        try:
            findings = agent.audit_resource(resource)
            all_findings.append(findings)
        except Exception as e:
            print(f"   ⚠️ Error auditing {resource['name']}: {e}")
            continue

    # Step 3: Generate reports
    print("\n📊 Generating report...")

    # Markdown for documentation
    markdown_report = reporter.generate_markdown(all_findings)
    with open('audit-report.md', 'w') as f:
        f.write(markdown_report)
    print("   ✅ Saved to audit-report.md")

    # CI output for pipeline decisions
    ci_result = reporter.generate_ci_output(all_findings)
    print(f"\n{ci_result}")

    # Exit code: 1 if critical issues found (fails CI)
    has_critical = any(
        f['severity'] == 'critical'
        for r in all_findings
        for f in r.get('findings', [])
    )
    sys.exit(1 if has_critical else 0)


if __name__ == '__main__':
    if len(sys.argv) != 2:
        print("Usage: python main.py <path-to-k8s-manifest.yaml>")
        sys.exit(1)
    main(sys.argv[1])
```
Expected output:

```
📋 Parsing production-deployment.yaml...
   Found 3 Kubernetes resources

🤖 Running AI audit...
   [1/3] Analyzing Deployment/web-app...
   [2/3] Analyzing Service/web-app-svc...
   [3/3] Analyzing ConfigMap/app-config...

📊 Generating report...
   ✅ Saved to audit-report.md

❌ FAILED: 2 critical issues found
```
### Step 5: Add Retry Logic for API Resilience

```python
# In audit_agent.py, add these imports and this method:
import time
from anthropic import RateLimitError, APIError

def audit_resource_with_retry(self, resource: Dict[str, Any], max_retries=3) -> Dict:
    """Wraps audit_resource with exponential backoff for rate limits."""
    for attempt in range(max_retries):
        try:
            return self.audit_resource(resource)
        except RateLimitError:
            if attempt == max_retries - 1:
                raise  # Final attempt failed
            wait_time = 2 ** attempt  # 1s, then 2s (doubles each retry)
            print(f"   ⏳ Rate limited. Retrying in {wait_time}s...")
            time.sleep(wait_time)
        except APIError as e:
            print(f"   ⚠️ API error: {e}")
            raise
```
**Why exponential backoff:** The Claude API has rate limits. Backing off prevents hammering the API when you hit them.
**Update main.py:** Change `agent.audit_resource(resource)` to `agent.audit_resource_with_retry(resource)`.
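The same backoff pattern, separated from the Anthropic client so it can be tested offline (`ConnectionError` stands in for `RateLimitError`, and the sleep function is injectable; all names here are illustrative):

```python
import time

def retry_with_backoff(fn, max_retries=3, base_delay=1.0, sleep=time.sleep):
    """Retry fn on a transient error, doubling the wait each attempt."""
    for attempt in range(max_retries):
        try:
            return fn()
        except ConnectionError:  # stand-in for RateLimitError
            if attempt == max_retries - 1:
                raise  # Final attempt failed
            sleep(base_delay * 2 ** attempt)

# A fake flaky call that succeeds on the third try
calls = {'n': 0}
def flaky():
    calls['n'] += 1
    if calls['n'] < 3:
        raise ConnectionError("rate limited")
    return "ok"

waits = []
result = retry_with_backoff(flaky, sleep=waits.append)
print(result, waits)  # ok [1.0, 2.0]
```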
## Verification

Test with a deliberately flawed manifest:

```yaml
# bad-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: insecure-app
spec:
  replicas: 1
  selector:
    matchLabels:
      app: insecure
  template:
    metadata:
      labels:
        app: insecure
    spec:
      containers:
      - name: web
        image: nginx:latest
        securityContext:
          privileged: true  # Critical issue
        # Missing resource limits
        # Missing health checks
```
Run the audit:

```bash
python main.py bad-deployment.yaml
```

You should see:

```
📋 Parsing bad-deployment.yaml...
   Found 1 Kubernetes resources

🤖 Running AI audit...
   [1/1] Analyzing Deployment/insecure-app...

📊 Generating report...
   ✅ Saved to audit-report.md

❌ FAILED: 1 critical issues found
```
Check `audit-report.md`:

````markdown
# Kubernetes Audit Report
**Generated:** 2026-02-10 14:30:00
**Resources Scanned:** 1

## Summary

- 🚨 **Critical:** 1
- ⚠️ **High:** 2
- ⚡ **Medium:** 1
- ℹ️ **Low:** 0

## Detailed Findings

### Deployment: `insecure-app`

🚨 **CRITICAL**: Container runs in privileged mode
  - **Impact:** Full host access, can escape container and compromise node
  - **Fix:**
    ```yaml
    securityContext:
      privileged: false
      runAsNonRoot: true
      runAsUser: 1000
    ```

⚠️ **HIGH**: Missing resource limits
  - **Impact:** Pod can consume all node resources, causing crashes
  - **Fix:**
    ```yaml
    resources:
      limits:
        cpu: "500m"
        memory: "512Mi"
      requests:
        cpu: "100m"
        memory: "128Mi"
    ```
````
**If it fails:**
- **Error: "No module named anthropic":** Run `pip install anthropic --break-system-packages`
- **No findings generated:** Check that `ANTHROPIC_API_KEY` is set correctly
- **JSON parse error:** The code handles this automatically; check logs for the raw response
## CI/CD Integration

### GitHub Actions Example

```yaml
# .github/workflows/k8s-audit.yml
name: K8s Config Audit

on:
  pull_request:
    paths:
      - 'k8s/**/*.yaml'

jobs:
  audit:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.12'

      - name: Install dependencies
        run: pip install anthropic pyyaml

      - name: Run K8s audit
        env:
          ANTHROPIC_API_KEY: ${{ secrets.ANTHROPIC_API_KEY }}
        # main.py takes one manifest at a time, so loop over the files;
        # the step fails on the first manifest with critical issues
        run: |
          for f in k8s/production/*.yaml; do
            python main.py "$f"
          done

      - name: Upload report
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: audit-report
          path: audit-report.md
```

This workflow:

- Triggers on K8s manifest changes
- Fails the PR if critical issues are found
- Uploads the full report as an artifact
## What You Learned

- AI agents can autonomously analyze complex configurations using structured prompts
- Claude API with `temperature=0` provides consistent, deterministic audits
- Breaking analysis into parse → audit → report makes the system testable
- Exponential backoff handles API rate limits gracefully
**Limitations:**
- Agent doesn't understand your specific business context (add custom rules to prompt)
- Costs ~$0.01 per resource with Claude Sonnet (100 resources = $1)
- Requires API key management in CI/CD
**When NOT to use this:**
- You need real-time validation (use admission controllers instead)
- Configs are generated by Helm/Kustomize (audit templates, not rendered output)
- Team doesn't review AI suggestions (this augments humans, doesn't replace them)
## Production Enhancements
For real-world usage, add:
- Caching: Store audit results in Redis to avoid re-analyzing unchanged configs
- Custom policies: Extend prompt with org-specific rules (e.g., "all prod deployments must have 3+ replicas")
- Webhook integration: Deploy as a K8s ValidatingWebhookConfiguration for real-time validation
- Cost tracking: Log API usage per team/project for chargeback
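The caching idea can be sketched with an in-memory dict standing in for Redis, keyed by a hash of the resource content (all names here are illustrative, and `fake_audit` stands in for the real API call):

```python
import hashlib
import json

cache = {}  # in production, swap for Redis with a TTL

def cached_audit(resource: dict, audit_fn) -> dict:
    """Skip the API call when an identical resource was already audited."""
    key = hashlib.sha256(
        json.dumps(resource, sort_keys=True).encode()
    ).hexdigest()
    if key not in cache:
        cache[key] = audit_fn(resource)
    return cache[key]

calls = {'n': 0}
def fake_audit(resource):
    calls['n'] += 1
    return {'findings': [], 'summary': 'No issues found'}

r = {'kind': 'Deployment', 'name': 'web'}
cached_audit(r, fake_audit)
cached_audit(r, fake_audit)  # cache hit, no second audit call
print(calls['n'])  # 1
```

Because the key is derived from the full resource content, any edit to the manifest naturally invalidates the cached result.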
Example cost optimization:

```python
# Only audit manifests changed relative to main in Git
import subprocess

def get_changed_manifests():
    result = subprocess.run(
        ['git', 'diff', '--name-only', 'origin/main', 'HEAD'],
        capture_output=True, text=True
    )
    return [f for f in result.stdout.split('\n') if f.endswith('.yaml')]
```
*Tested on Python 3.12, Claude Sonnet 4 (2025-05-14), Kubernetes 1.30+, macOS & Linux*