#!/usr/bin/env python3
"""
Enhanced pipeline runner with context support for split architecture
Works with AI processor running on separate VM (192.168.100.86)
"""
import argparse
import json
import subprocess
import sys
import time
from pathlib import Path
from datetime import datetime
import uuid
def load_feedback_history(feedback_path=None):
    """Load the PR feedback history used to steer later AI suggestions.

    Args:
        feedback_path: Optional path (str or Path) of the history JSON file.
            Defaults to
            ``/shared/ai-gitops/feedback/pr_feedback_history.json``.

    Returns:
        The decoded list of feedback entries, or ``[]`` when the file is
        missing, unreadable, not valid JSON, or does not hold a JSON list.
    """
    if feedback_path is None:
        feedback_path = '/shared/ai-gitops/feedback/pr_feedback_history.json'
    feedback_path = Path(feedback_path)

    # EAFP: open directly instead of exists()+open(), which can race with a
    # writer replacing the file.
    try:
        with open(feedback_path, 'r', encoding='utf-8') as f:
            history = json.load(f)
    except FileNotFoundError:
        return []
    except (OSError, ValueError) as exc:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        # The history is advisory, so a damaged file must not abort the run.
        print(f"⚠️ Ignoring unreadable feedback history {feedback_path}: {exc}")
        return []

    if not isinstance(history, list):
        # Callers iterate the result and call .get() on each entry.
        print(f"⚠️ Ignoring feedback history {feedback_path}: expected a JSON list")
        return []
    return history
def load_existing_config(config_path=None):
    """Load the current SRX config text for already-configured-feature checks.

    Args:
        config_path: Optional path (str or Path) of the config file. Defaults
            to ``/shared/ai-gitops/configs/current_srx_config.conf``.

    Returns:
        The file contents as a string, or ``""`` when the file is missing or
        cannot be read.
    """
    # NOTE(review): the AI request built elsewhere in this file points at
    # configs/srx_config_latest.txt, not current_srx_config.conf. Confirm
    # which file the collector actually writes.
    if config_path is None:
        config_path = '/shared/ai-gitops/configs/current_srx_config.conf'

    try:
        # The text is only searched for marker substrings, so undecodable
        # bytes are replaced rather than treated as fatal.
        return Path(config_path).read_text(encoding='utf-8', errors='replace')
    except FileNotFoundError:
        return ""
    except OSError as exc:
        print(f"⚠️ Could not read existing config {config_path}: {exc}")
        return ""
def build_ai_context(args):
    """Build the context document that steers the AI analysis.

    Combines the requested focus area, features detected in the existing SRX
    config, and reasons from previously rejected PRs.

    Args:
        args: Parsed CLI arguments; only ``args.context`` is read.

    Returns:
        A JSON-serialisable dict with the keys ``timestamp``, ``focus_area``,
        ``skip_basic``, ``existing_features``, ``priority_features``,
        ``constraints``, ``instructions`` and, when the feedback history
        holds rejections, ``rejected_patterns``.
    """
    # Priority features and guiding constraint for each supported focus area.
    focus_profiles = {
        "performance": (
            [
                "qos_policies",
                "traffic_shaping",
                "bandwidth_management",
                "flow_optimization",
            ],
            "Focus on QoS and traffic optimization",
        ),
        "security": (
            [
                "rate_limiting",
                "ddos_protection",
                "ids_ips_rules",
                "geo_blocking",
                "threat_feeds",
            ],
            "Focus on advanced security features",
        ),
        "monitoring": (
            [
                "syslog_enhancements",
                "snmp_traps",
                "flow_analytics",
                "performance_metrics",
            ],
            "Focus on visibility and monitoring",
        ),
        "automation": (
            [
                "event_scripts",
                "automated_responses",
                "dynamic_policies",
                "api_integrations",
            ],
            "Focus on automation capabilities",
        ),
    }

    context = {
        "timestamp": datetime.now().isoformat(),
        "focus_area": args.context,
        "skip_basic": True,  # Always skip basic connectivity suggestions
        "existing_features": [],
        "priority_features": [],
        "constraints": [],
    }

    # Detect already-configured features by marker substrings so the AI does
    # not suggest them again.
    current_config = load_existing_config()
    feature_markers = (
        ("zones_configured", "security-zone" in current_config),
        ("gaming_optimizations", "port-forwarding" in current_config),
        ("vpn_configured", "wireguard" in current_config.lower()),
    )
    context["existing_features"].extend(
        feature for feature, present in feature_markers if present
    )

    # An unknown focus area leaves priority_features empty.
    profile = focus_profiles.get(args.context)
    if profile is not None:
        priority_features, constraint = profile
        context["priority_features"] = list(priority_features)
        context["constraints"].append(constraint)

    # Learn from earlier rejections. Malformed (non-dict) entries are skipped
    # instead of raising on .get().
    feedback = load_feedback_history()
    entries = feedback if isinstance(feedback, list) else []
    rejections = [
        entry for entry in entries
        if isinstance(entry, dict)
        and "rejected" in (entry.get("status"), entry.get("feedback_type"))
    ]
    if rejections:
        context["constraints"].append("Avoid patterns that were previously rejected")
        # Blank reasons carry no information; dropping them keeps the
        # five-entry window for rejections that explain themselves.
        reasons = [entry.get("reason") for entry in rejections if entry.get("reason")]
        context["rejected_patterns"] = reasons[-5:]  # Last 5 rejections

    context["instructions"] = [
        "DO NOT suggest basic connectivity policies - all zones are properly configured",
        "DO NOT suggest any/any/any rules - this has been rejected multiple times",
        "FOCUS on advanced features that enhance the existing configuration",
        "CHECK if feature already exists before suggesting",
        f"PRIORITY: {args.context} optimizations and enhancements",
    ]
    return context
def run_collection():
    """Run the SRX config collection script.

    Returns:
        True when the collector exits with status 0. False when it exits
        non-zero or cannot be launched at all (e.g. ``python3`` is not on
        PATH).
    """
    print("📊 Collecting current SRX configuration...")
    command = ["python3", "/home/netops/orchestrator/collect_srx_config.py"]
    try:
        result = subprocess.run(command, capture_output=True, text=True)
    except OSError as exc:
        # A launch failure is reported through the same True/False contract
        # the caller already handles, not as a traceback.
        print(f"❌ Collection failed: {exc}")
        return False

    if result.returncode != 0:
        print(f"❌ Collection failed: {result.stderr}")
        return False
    print("✅ Configuration collected successfully")
    return True
def _write_json_atomically(target, payload):
    """Write payload as JSON to target via a staging file and a rename."""
    target.parent.mkdir(parents=True, exist_ok=True)
    # Stage next to the target so the rename stays on one filesystem. The
    # ".tmp" suffix is presumably ignored by the processor's watcher;
    # TODO confirm what pattern it polls for.
    staging = target.with_name(target.name + ".tmp")
    try:
        with open(staging, 'w', encoding='utf-8') as f:
            json.dump(payload, f, indent=2)
        staging.replace(target)
    except BaseException:
        # Do not leave a half-written staging file behind.
        if staging.exists():
            staging.unlink()
        raise


def create_ai_request(context, shared_root='/shared/ai-gitops'):
    """Create an AI analysis request in the shared directory.

    Writes the context to ``<shared_root>/context/current_context.json`` and
    the request to ``<shared_root>/requests/<request_id>.json``. Each file is
    staged and renamed into place so a reader never sees it half-written.

    Args:
        context: Context dict; must contain ``focus_area`` and be
            JSON-serialisable.
        shared_root: Root of the shared directory tree. Defaults to
            ``/shared/ai-gitops``.

    Returns:
        The generated request ID (``pipeline_<timestamp>_<8 hex chars>``).
    """
    print(f"🤖 Creating AI analysis request with context: {context['focus_area']}...")
    shared_root = Path(shared_root)
    configs_dir = shared_root / 'configs'

    # Sample the clock once so the ID and the timestamp field agree.
    now = datetime.now()
    request_id = f"pipeline_{now.strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:8]}"

    request_data = {
        "request_id": request_id,
        "timestamp": now.isoformat(),
        "type": "analyze_config",
        "context": context,
        "data": {
            "config_file": str(configs_dir / "srx_config_latest.txt"),
            "analysis_file": str(configs_dir / "srx_config_analysis_latest.json"),
            "top_talkers": {"buckets": []},  # Empty for context-based analysis
            "vlans": {"buckets": []},
            "protocols": {"buckets": []},
        },
    }

    # The context goes first so it is already in place when the request
    # file appears and triggers the AI processor.
    _write_json_atomically(shared_root / 'context' / 'current_context.json', context)
    _write_json_atomically(shared_root / 'requests' / f"{request_id}.json", request_data)

    print(f"✅ Request created: {request_id}")
    return request_id
def _load_response_if_complete(response_path):
    """Return ``(True, payload)`` once response_path holds valid JSON.

    Returns ``(False, None)`` while the file is missing or does not parse,
    which is what a reader sees while the writer is mid-write.
    """
    try:
        with open(response_path, 'r', encoding='utf-8') as f:
            return True, json.load(f)
    except FileNotFoundError:
        return False, None
    except ValueError:
        # json.JSONDecodeError / UnicodeDecodeError: treat as "not ready yet".
        return False, None


def wait_for_ai_response(request_id, timeout=60, poll_interval=1.0,
                         responses_dir='/shared/ai-gitops/responses'):
    """Wait for the AI processor to publish its response file.

    Polls ``<responses_dir>/<request_id>_response.json`` until it contains
    valid JSON or ``timeout`` seconds have elapsed. The file is always
    checked at least once, even when ``timeout`` is 0.

    Args:
        request_id: ID returned when the request was created.
        timeout: Maximum number of seconds to wait.
        poll_interval: Seconds to sleep between checks.
        responses_dir: Directory the AI processor writes responses to.

    Returns:
        True when a complete response was read, False on timeout.
    """
    print(f"⏳ Waiting for AI processor response (timeout: {timeout}s)...")
    response_path = Path(responses_dir) / f"{request_id}_response.json"

    # A monotonic deadline keeps the real wait equal to `timeout` however
    # long each poll takes.
    started = time.monotonic()
    deadline = started + timeout
    next_progress = 5

    while True:
        ready, response = _load_response_if_complete(response_path)
        if ready:
            print("✅ AI analysis completed")
            # Read and display key info
            if isinstance(response, dict):
                if 'focus_area' in response:
                    print(f" Focus area: {response['focus_area']}")
                if 'feedback_aware' in response:
                    print(f" Feedback aware: {response['feedback_aware']}")
            return True

        now = time.monotonic()
        if now >= deadline:
            break

        # Show progress every 5 seconds
        elapsed = int(now - started)
        if elapsed >= next_progress:
            print(f" Still waiting... ({elapsed}/{timeout}s)")
            next_progress = elapsed + 5

        # Never sleep past the deadline; the floor avoids a busy loop when
        # poll_interval is 0 or negative.
        time.sleep(min(max(poll_interval, 0.01), deadline - now))

    print(f"❌ Timeout waiting for AI response after {timeout} seconds")
    if response_path.exists():
        print(f" Response file exists but is not valid JSON: {response_path}")
    print(" Check AI processor logs: ssh netops@192.168.100.86 'sudo tail /var/log/ai-processor/ai-processor.log'")
    return False
def create_pr():
    """Create a pull request in Gitea by running ``create_ai_pr.py``.

    Returns:
        True when the helper script exits with status 0. False when the
        script is missing, cannot be launched, or exits non-zero.
    """
    print("📝 Creating pull request...")

    # Check if create_ai_pr.py exists
    create_pr_script = Path('/home/netops/orchestrator/create_ai_pr.py')
    if not create_pr_script.exists():
        print("❌ create_ai_pr.py not found - using placeholder")
        print(" To create PRs, ensure create_ai_pr.py is available")
        return False

    try:
        result = subprocess.run(
            ["python3", str(create_pr_script)],
            capture_output=True,
            text=True,
        )
    except OSError as exc:
        # main() treats a False return as non-fatal, so a launch failure
        # must not escape as a traceback.
        print(f"❌ PR creation failed: {exc}")
        return False

    if result.returncode != 0:
        print(f"❌ PR creation failed: {result.stderr}")
        return False
    print("✅ Pull request created")
    return True
def _remote_stdout(host, remote_command, timeout=15):
    """Run remote_command on host over SSH; return its stdout, or "" on failure."""
    ssh_command = [
        "ssh",
        # Fail instead of prompting for a password, and bound the connect
        # time, so an unattended run cannot hang here.
        "-o", "BatchMode=yes",
        "-o", "ConnectTimeout=5",
        host,
        remote_command,
    ]
    try:
        result = subprocess.run(
            ssh_command,
            capture_output=True,
            text=True,
            timeout=timeout,
        )
    except (OSError, subprocess.TimeoutExpired):
        # ssh missing, or the remote command stalled: treat as "no answer".
        return ""
    return result.stdout


def check_ai_processor_status():
    """Check whether the AI processor appears to be running on the remote VM.

    Tries ``systemctl is-active ai-processor`` first, then falls back to
    looking for an ``ai_processor.py`` process in ``ps aux`` output.

    Returns:
        Always True. The check is advisory: when the status cannot be
        verified a warning is printed and the pipeline continues anyway.
    """
    print("🔍 Checking AI processor status...")
    host = "netops@192.168.100.86"

    # Try without sudo first (systemctl can check status without sudo)
    if _remote_stdout(host, "systemctl is-active ai-processor").strip() == "active":
        print("✅ AI processor service is running")
        return True

    # Try checking if the process is running another way
    process_listing = _remote_stdout(host, "ps aux | grep -v grep | grep ai_processor")
    if "ai_processor.py" in process_listing:
        print("✅ AI processor is running (detected via process)")
        return True

    print("⚠️ Cannot verify AI processor status (but it may still be running)")
    print(" Continuing anyway...")
    return True  # Deliberately non-blocking: an unverifiable status is not fatal
def _build_arg_parser():
    """Build the command-line parser for the pipeline runner."""
    parser = argparse.ArgumentParser(
        description='Run AI-driven network optimization pipeline with context'
    )
    parser.add_argument(
        '--context',
        choices=['performance', 'security', 'monitoring', 'automation'],
        default='security',
        help='Focus area for AI analysis (default: security)'
    )
    parser.add_argument(
        '--skip-collection',
        action='store_true',
        help='Skip config collection (use existing)'
    )
    parser.add_argument(
        '--dry-run',
        action='store_true',
        help='Run analysis but do not create PR'
    )
    parser.add_argument(
        '--verbose',
        action='store_true',
        help='Enable verbose output'
    )
    parser.add_argument(
        '--timeout',
        type=int,
        default=60,
        help='Timeout waiting for AI response (default: 60s)'
    )
    return parser


def main():
    """Run the pipeline: status check, collection, AI request, wait, PR.

    Exits with status 1 when the status check, the config collection, or the
    wait for the AI response fails. A failed PR creation is reported but does
    not change the exit status, because the analysis itself is complete.
    """
    args = _build_arg_parser().parse_args()

    print(f"🚀 Starting pipeline with context: {args.context}")
    print("=" * 50)

    # Step 0: Check AI processor is running
    if not check_ai_processor_status():
        print("\n⚠️ Please start the AI processor service first")
        sys.exit(1)

    # Step 1: Collect current config (unless skipped)
    if not args.skip_collection and not run_collection():
        sys.exit(1)

    # Step 2: Build context for AI
    context = build_ai_context(args)
    if args.verbose:
        print("\n📋 AI Context:")
        print(json.dumps(context, indent=2))

    # Step 3: Create AI request (this triggers the remote AI processor)
    request_id = create_ai_request(context)

    # Step 4: Wait for AI processor to complete
    if not wait_for_ai_response(request_id, args.timeout):
        print("\n⚠️ AI processor may be busy or not running properly")
        print(" Check status: ssh netops@192.168.100.86 'sudo systemctl status ai-processor'")
        sys.exit(1)

    # Step 5: Create PR (unless dry-run)
    pr_created = False
    if args.dry_run:
        print("⚡ Dry run - skipping PR creation")
        print(f" View analysis: cat /shared/ai-gitops/responses/{request_id}_response.json | jq .suggestions")
    else:
        pr_created = create_pr()
        if not pr_created:
            print("⚠️ PR creation failed but analysis is complete")
            print(f" View results: cat /shared/ai-gitops/responses/{request_id}_response.json")

    # The summary reflects what actually happened: success and "review the
    # PR" are only reported when a PR exists or none was requested.
    if args.dry_run or pr_created:
        print("\n✨ Pipeline completed successfully!")
    else:
        print("\n⚠️ Pipeline completed, but no pull request was created")
    print(f"Focus area: {args.context}")

    if args.dry_run:
        print("Next steps: Review the suggestions and run without --dry-run to create PR")
    elif pr_created:
        print("Next steps: Review the PR in Gitea")
    else:
        print("Next steps: Resolve the PR creation error, then re-run the pipeline")
# Run the pipeline only when executed as a script, not when imported.
if __name__ == "__main__":
    main()