#!/usr/bin/env python3
"""
Enhanced pipeline runner with context support for the split architecture.

Works with the AI processor running on a separate VM (192.168.100.86).
"""
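
# Example usage (options are defined in main() below; the script filename here
# is illustrative, not taken from the repo):
#   python3 run_pipeline.py --context performance --dry-run
#   python3 run_pipeline.py --context security --skip-collection --verbose
#   python3 run_pipeline.py --context monitoring --timeout 120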

import argparse
import json
import subprocess
import sys
import time
import uuid
from datetime import datetime
from pathlib import Path
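
# Layout of the shared exchange directory, as used throughout this script:
#   /shared/ai-gitops/configs/    current_srx_config.conf, srx_config_latest.txt, analysis JSON
#   /shared/ai-gitops/feedback/   pr_feedback_history.json (PR approve/reject history)
#   /shared/ai-gitops/context/    current_context.json (focus area and constraints)
#   /shared/ai-gitops/requests/   <request_id>.json files that trigger the AI processor
#   /shared/ai-gitops/responses/  <request_id>_response.json files written by the processor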


def load_feedback_history():
    """Load PR feedback history to understand what's already configured."""
    feedback_path = Path('/shared/ai-gitops/feedback/pr_feedback_history.json')
    if feedback_path.exists():
        with open(feedback_path, 'r') as f:
            return json.load(f)
    return []
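
# Assumed shape of a feedback entry (inferred from its use in build_ai_context):
#   {"status": "rejected", "feedback_type": "rejected", "reason": "proposed any/any/any rule"}
# Only the "status"/"feedback_type" and "reason" keys are read; other keys are ignored.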


def load_existing_config():
    """Load the current SRX config to identify already-configured features."""
    config_path = Path('/shared/ai-gitops/configs/current_srx_config.conf')
    if config_path.exists():
        with open(config_path, 'r') as f:
            return f.read()
    return ""


def build_ai_context(args):
    """Build comprehensive context for the AI based on arguments and history."""
    context = {
        "timestamp": datetime.now().isoformat(),
        "focus_area": args.context,
        "skip_basic": True,  # Always skip basic connectivity suggestions
        "existing_features": [],
        "priority_features": [],
        "constraints": []
    }

    # Load the existing configuration to prevent redundant suggestions
    current_config = load_existing_config()

    # Identify already-configured features
    if "security-zone" in current_config:
        context["existing_features"].append("zones_configured")
    if "port-forwarding" in current_config:
        context["existing_features"].append("gaming_optimizations")
    if "wireguard" in current_config.lower():
        context["existing_features"].append("vpn_configured")

    # Set priorities based on the context argument
    if args.context == "performance":
        context["priority_features"] = [
            "qos_policies",
            "traffic_shaping",
            "bandwidth_management",
            "flow_optimization"
        ]
        context["constraints"].append("Focus on QoS and traffic optimization")

    elif args.context == "security":
        context["priority_features"] = [
            "rate_limiting",
            "ddos_protection",
            "ids_ips_rules",
            "geo_blocking",
            "threat_feeds"
        ]
        context["constraints"].append("Focus on advanced security features")

    elif args.context == "monitoring":
        context["priority_features"] = [
            "syslog_enhancements",
            "snmp_traps",
            "flow_analytics",
            "performance_metrics"
        ]
        context["constraints"].append("Focus on visibility and monitoring")

    elif args.context == "automation":
        context["priority_features"] = [
            "event_scripts",
            "automated_responses",
            "dynamic_policies",
            "api_integrations"
        ]
        context["constraints"].append("Focus on automation capabilities")

    # Add learned constraints from the feedback history
    feedback = load_feedback_history()
    if feedback:
        # Extract patterns the AI should avoid, skipping entries without a reason
        rejected_patterns = []
        for entry in feedback:
            if entry.get("status") == "rejected" or entry.get("feedback_type") == "rejected":
                reason = entry.get("reason", "").strip()
                if reason:
                    rejected_patterns.append(reason)

        if rejected_patterns:
            context["constraints"].append("Avoid patterns that were previously rejected")
            context["rejected_patterns"] = rejected_patterns[-5:]  # Last 5 rejections

    # Add instructions to avoid redundant suggestions
    context["instructions"] = [
        "DO NOT suggest basic connectivity policies - all zones are properly configured",
        "DO NOT suggest any/any/any rules - this has been rejected multiple times",
        "FOCUS on advanced features that enhance the existing configuration",
        "CHECK if feature already exists before suggesting",
        f"PRIORITY: {args.context} optimizations and enhancements"
    ]

    return context
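
# Illustrative context payload for --context performance (timestamp and
# existing_features vary per run):
#   {
#     "timestamp": "2025-01-01T00:00:00",
#     "focus_area": "performance",
#     "skip_basic": true,
#     "existing_features": ["zones_configured"],
#     "priority_features": ["qos_policies", "traffic_shaping",
#                           "bandwidth_management", "flow_optimization"],
#     "constraints": ["Focus on QoS and traffic optimization"],
#     "instructions": ["DO NOT suggest basic connectivity policies - ...", "..."]
#   }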


def run_collection():
    """Run the config collection script."""
    print("📊 Collecting current SRX configuration...")
    result = subprocess.run(
        ["python3", "/home/netops/orchestrator/collect_srx_config.py"],
        capture_output=True,
        text=True
    )
    if result.returncode != 0:
        print(f"❌ Collection failed: {result.stderr}")
        return False
    print("✅ Configuration collected successfully")
    return True


def create_ai_request(context):
    """Create an AI analysis request in the shared directory."""
    print(f"🤖 Creating AI analysis request with context: {context['focus_area']}...")

    # Generate a unique request ID
    request_id = f"pipeline_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:8]}"

    # Create the request payload
    request_data = {
        "request_id": request_id,
        "timestamp": datetime.now().isoformat(),
        "type": "analyze_config",
        "context": context,
        "data": {
            "config_file": "/shared/ai-gitops/configs/srx_config_latest.txt",
            "analysis_file": "/shared/ai-gitops/configs/srx_config_analysis_latest.json",
            "top_talkers": {"buckets": []},  # Empty for context-based analysis
            "vlans": {"buckets": []},
            "protocols": {"buckets": []}
        }
    }

    # Save the context to the shared location
    context_path = Path('/shared/ai-gitops/context/current_context.json')
    context_path.parent.mkdir(parents=True, exist_ok=True)
    with open(context_path, 'w') as f:
        json.dump(context, f, indent=2)

    # Save the request to trigger the AI processor
    request_path = Path('/shared/ai-gitops/requests') / f"{request_id}.json"
    request_path.parent.mkdir(parents=True, exist_ok=True)
    with open(request_path, 'w') as f:
        json.dump(request_data, f, indent=2)

    print(f"✅ Request created: {request_id}")
    return request_id
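
# File-drop protocol: writing <request_id>.json into /shared/ai-gitops/requests/
# is what triggers the remote AI processor; it is expected to answer by writing
# <request_id>_response.json into /shared/ai-gitops/responses/, which
# wait_for_ai_response() polls below.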


def wait_for_ai_response(request_id, timeout=60):
    """Wait for the AI processor to complete its analysis."""
    print(f"⏳ Waiting for AI processor response (timeout: {timeout}s)...")

    response_path = Path('/shared/ai-gitops/responses') / f"{request_id}_response.json"

    for i in range(timeout):
        if response_path.exists():
            print("✅ AI analysis completed")

            # Read and display key info
            with open(response_path, 'r') as f:
                response = json.load(f)

            if 'focus_area' in response:
                print(f"   Focus area: {response['focus_area']}")
            if 'feedback_aware' in response:
                print(f"   Feedback aware: {response['feedback_aware']}")

            return True

        # Show progress every 5 seconds
        if i % 5 == 0 and i > 0:
            print(f"   Still waiting... ({i}/{timeout}s)")

        time.sleep(1)

    print(f"❌ Timeout waiting for AI response after {timeout} seconds")
    print("   Check AI processor logs: ssh netops@192.168.100.86 'sudo tail /var/log/ai-processor/ai-processor.log'")
    return False
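
# The response JSON is inspected only opportunistically here: "focus_area" and
# "feedback_aware" are printed when present, and main() points at the
# ".suggestions" key for the full results (see the jq hint in the dry-run branch).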


def create_pr():
    """Create a pull request in Gitea."""
    print("📝 Creating pull request...")

    # Check that create_ai_pr.py exists
    create_pr_script = Path('/home/netops/orchestrator/create_ai_pr.py')
    if not create_pr_script.exists():
        print("❌ create_ai_pr.py not found - using placeholder")
        print("   To create PRs, ensure create_ai_pr.py is available")
        return False

    result = subprocess.run(
        ["python3", str(create_pr_script)],
        capture_output=True,
        text=True
    )
    if result.returncode != 0:
        print(f"❌ PR creation failed: {result.stderr}")
        return False
    print("✅ Pull request created")
    return True


def check_ai_processor_status():
    """Check whether the AI processor service is running on the remote VM."""
    print("🔍 Checking AI processor status...")

    # Try systemctl first (checking a unit's status does not require sudo)
    result = subprocess.run(
        ["ssh", "netops@192.168.100.86", "systemctl is-active ai-processor"],
        capture_output=True,
        text=True
    )

    if result.stdout.strip() == "active":
        print("✅ AI processor service is running")
        return True

    # Fall back to looking for the process directly
    result = subprocess.run(
        ["ssh", "netops@192.168.100.86", "ps aux | grep -v grep | grep ai_processor"],
        capture_output=True,
        text=True
    )

    if "ai_processor.py" in result.stdout:
        print("✅ AI processor is running (detected via process)")
        return True

    # Could not verify either way; assume it is running rather than block the pipeline
    print("⚠️  Cannot verify AI processor status (but it may still be running)")
    print("   Continuing anyway...")
    return True


def main():
    parser = argparse.ArgumentParser(
        description='Run the AI-driven network optimization pipeline with context'
    )
    parser.add_argument(
        '--context',
        choices=['performance', 'security', 'monitoring', 'automation'],
        default='security',
        help='Focus area for AI analysis (default: security)'
    )
    parser.add_argument(
        '--skip-collection',
        action='store_true',
        help='Skip config collection (use existing)'
    )
    parser.add_argument(
        '--dry-run',
        action='store_true',
        help='Run analysis but do not create a PR'
    )
    parser.add_argument(
        '--verbose',
        action='store_true',
        help='Enable verbose output'
    )
    parser.add_argument(
        '--timeout',
        type=int,
        default=60,
        help='Timeout waiting for AI response (default: 60s)'
    )

    args = parser.parse_args()

    print(f"🚀 Starting pipeline with context: {args.context}")
    print("=" * 50)

    # Step 0: Check that the AI processor is running
    if not check_ai_processor_status():
        print("\n⚠️  Please start the AI processor service first")
        sys.exit(1)

    # Step 1: Collect the current config (unless skipped)
    if not args.skip_collection:
        if not run_collection():
            sys.exit(1)

    # Step 2: Build the context for the AI
    context = build_ai_context(args)

    if args.verbose:
        print("\n📋 AI Context:")
        print(json.dumps(context, indent=2))

    # Step 3: Create the AI request (this triggers the remote AI processor)
    request_id = create_ai_request(context)

    # Step 4: Wait for the AI processor to complete
    if not wait_for_ai_response(request_id, args.timeout):
        print("\n⚠️  AI processor may be busy or not running properly")
        print("   Check status: ssh netops@192.168.100.86 'sudo systemctl status ai-processor'")
        sys.exit(1)

    # Step 5: Create the PR (unless this is a dry run)
    if not args.dry_run:
        if not create_pr():
            print("⚠️  PR creation failed but analysis is complete")
            print(f"   View results: cat /shared/ai-gitops/responses/{request_id}_response.json")
    else:
        print("⚡ Dry run - skipping PR creation")
        print(f"   View analysis: cat /shared/ai-gitops/responses/{request_id}_response.json | jq .suggestions")

    print("\n✨ Pipeline completed successfully!")
    print(f"Focus area: {args.context}")

    if not args.dry_run:
        print("Next steps: Review the PR in Gitea")
    else:
        print("Next steps: Review the suggestions and run without --dry-run to create the PR")


if __name__ == "__main__":
    main()