#!/usr/bin/env python3
"""
Enhanced AI Processor for Network Analysis with Feedback Learning and Context Support
This processor reads feedback from rejected PRs and improves its suggestions
Now supports context-aware analysis for focused improvements
"""

import os
import sys
import json
import re
import time
import logging
import signal
from datetime import datetime
from pathlib import Path
from typing import Dict, Optional

import requests

# Set up logging.  The file handler is best-effort: if /var/log/ai-processor
# does not exist (fresh container, local dev, tests) we fall back to
# stderr-only logging instead of crashing at import time.
_handlers = [logging.StreamHandler()]
try:
    _handlers.append(logging.FileHandler('/var/log/ai-processor/ai-processor.log'))
except OSError:
    pass
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=_handlers
)
logger = logging.getLogger(__name__)


class AIProcessor:
    """Watches a shared request directory, queries a local Ollama model for
    Juniper SRX configuration suggestions, and writes validated responses.

    The processor is "feedback aware" (it folds rejection history from
    previous PRs into the prompt) and "context aware" (a current_context.json
    file steers the focus area: security / performance / monitoring /
    automation / general).
    """

    def __init__(self):
        """Initialize the AI processor"""
        self.request_dir = Path('/shared/ai-gitops/requests')
        self.response_dir = Path('/shared/ai-gitops/responses')
        self.feedback_dir = Path('/shared/ai-gitops/feedback')
        self.config_dir = Path('/shared/ai-gitops/configs')
        self.context_dir = Path('/shared/ai-gitops/context')  # NEW: Context directory
        self.running = True

        # Ollama settings
        self.ollama_url = "http://localhost:11434"
        self.model = "llama2:13b"

        # Ensure the directories this process writes to exist.
        # (request_dir / feedback_dir are owned by the producer side.)
        self.response_dir.mkdir(parents=True, exist_ok=True)
        self.config_dir.mkdir(parents=True, exist_ok=True)
        self.context_dir.mkdir(parents=True, exist_ok=True)  # NEW

        logger.info("AI Processor initialized with feedback learning and context support enabled")

    def load_context(self) -> Dict:
        """Load context from current_context.json if available.

        Returns the parsed context dict, or a permissive default
        (focus_area='general') when the file is missing or unreadable.
        """
        context_file = self.context_dir / 'current_context.json'
        if context_file.exists():
            try:
                with open(context_file, 'r') as f:
                    context = json.load(f)
                logger.info(f"✓ Loaded context: focus on {context.get('focus_area', 'general')}")
                return context
            except Exception as e:
                logger.warning(f"Could not load context: {e}")

        # Default context if none provided
        return {
            "focus_area": "general",
            "skip_basic": True,
            "priority_features": []
        }

    def load_feedback_learning(self) -> str:
        """Load feedback from rejected PRs and enforce MANDATORY rules.

        Returns a prompt fragment.  If rejected PRs exist in the history
        file, the fragment contains hard "forbidden / mandatory pattern"
        rules plus the most recent rejection reasons; otherwise it contains
        default best-practice guidance.
        """
        feedback_file = self.feedback_dir / 'pr_feedback_history.json'
        learning_prompt = ""

        if feedback_file.exists():
            try:
                with open(feedback_file, 'r') as f:
                    feedback_history = json.load(f)

                # Count rejections - the data structure uses 'feedback_type'
                rejected_prs = [f for f in feedback_history if f.get('feedback_type') == 'rejected']

                if rejected_prs:
                    # CRITICAL: Make rules MANDATORY, not suggestions
                    learning_prompt += """
################################################################################
# ⚠️ CRITICAL MANDATORY RULES - VIOLATION = AUTOMATIC REJECTION ⚠️
################################################################################
# YOU HAVE HAD {} PREVIOUS CONFIGURATIONS REJECTED!
#
# FORBIDDEN PATTERNS THAT WILL CAUSE REJECTION:
# ❌ NEVER use: source-address any
# ❌ NEVER use: destination-address any
# ❌ NEVER use: application any
# ❌ NEVER use: threshold values > 100 (use 10-50 range)
# NEVER use XML notation, YANG models, or namespace syntax
#
# MANDATORY PATTERNS YOU MUST USE:
# ✅ ALWAYS define address-sets first:
#    set security address-book global address-set INTERNAL-NETS address 192.168.100.0/24
#    set security address-book global address-set EXTERNAL-NETS address 0.0.0.0/8
# ✅ ALWAYS use specific addresses from address-sets
# ✅ ALWAYS enable logging with session-init and session-close
# ✅ ALWAYS use IDS thresholds between 10-50
#
# REPLACEMENT RULES (AUTOMATIC):
# • Replace "source-address any" with "source-address INTERNAL-NETS"
# • Replace "destination-address any" with "destination-address EXTERNAL-NETS"
# • Replace "application any" with "application [ junos-https junos-ssh ]"
# • Replace "threshold 1000" with "threshold 20"
#
""".format(len(rejected_prs))

                    # Add specific rejection reasons (last 3 only, most recent history)
                    learning_prompt += "# SPECIFIC REJECTION REASONS FROM YOUR HISTORY:\n"
                    for i, pr in enumerate(rejected_prs[-3:], 1):
                        details = pr.get('details', {})
                        reason = details.get('reason', 'Unknown')
                        # BUGFIX: include a trailing newline so reasons don't
                        # run together on a single line.
                        learning_prompt += f"# Rejection {i}: {reason}\n"

                    learning_prompt += """#
# IF YOU USE 'ANY' OR HIGH THRESHOLDS, THIS PR WILL BE REJECTED!
# THE ORCHESTRATOR WILL NOT ACCEPT CONFIGS WITH THESE VIOLATIONS!
################################################################################

"""

                    # Log enforcement
                    logger.info(f"⚠️ ENFORCING MANDATORY RULES from {len(rejected_prs)} rejections")
                    logger.info("✓ Forbidden patterns: any keywords, high thresholds")
                    logger.info("✓ Required patterns: address-sets, specific addresses, logging")
                else:
                    learning_prompt = "# No rejected PRs found - following best practices\n\n"

            except Exception as e:
                logger.warning(f"Could not load feedback: {e}")
                learning_prompt = "# Could not load feedback - using strict security rules\n\n"
        else:
            logger.info("No feedback history found - using default best practices")
            learning_prompt = """# No feedback history - using STRICT SECURITY DEFAULTS
# ✅ Never use 'any' for addresses or applications
# ✅ Always define address-sets
# ✅ Keep IDS thresholds between 10-50
# ✅ Enable logging on all policies

"""
        return learning_prompt

    def get_current_srx_config(self) -> str:
        """Get current SRX configuration and analysis - ENHANCED VERSION.

        Prefers the pre-computed analysis JSON; falls back to a raw config
        excerpt, then to a "please collect config" hint.  Always returns a
        commented text block suitable for embedding in a prompt.
        """
        try:
            # First try to read the analysis file
            analysis_file = self.config_dir / 'srx_config_analysis_latest.json'
            config_file = self.config_dir / 'srx_config_latest.txt'

            context = "# Current SRX Configuration Context:\n"

            # Read analysis if available
            if analysis_file.exists():
                with open(analysis_file, 'r') as f:
                    analysis = json.load(f)

                # Build comprehensive context
                context += f"# Timestamp: {analysis.get('timestamp', 'Unknown')}\n"
                context += f"# Configured Zones: {', '.join(analysis.get('zones', []))}\n\n"

                # Show networks per zone
                context += "# Networks by Zone:\n"
                for zone, networks in analysis.get('networks', {}).items():
                    if networks:
                        context += f"#   {zone}: {', '.join(networks)}\n"

                # Show address book entries (cap at 10 to keep the prompt short)
                context += "\n# Address Book Entries:\n"
                address_book = analysis.get('address_book', {})
                for name, addr in list(address_book.items())[:10]:
                    context += f"#   {name}: {addr}\n"
                if len(address_book) > 10:
                    context += f"#   ... and {len(address_book) - 10} more entries\n"

                # Show interfaces
                context += "\n# Configured Interfaces:\n"
                for iface, data in analysis.get('interfaces', {}).items():
                    if 'addresses' in data:
                        context += f"#   {iface}: {', '.join(data['addresses'])}\n"

                context += f"\n# Total Policies: {analysis.get('policy_count', 0)}\n"

                # NEW: Add existing features summary
                context += "\n# EXISTING FEATURES (DO NOT DUPLICATE):\n"
                context += "# - Basic connectivity: COMPLETE\n"
                context += "# - Zone segmentation: CONFIGURED\n"
                context += "# - Gaming optimizations: ACTIVE\n"
                context += "# - VPN (WireGuard): CONFIGURED\n"
                context += "# - DNS infrastructure: Pi-hole at 192.168.100.11\n"
            else:
                context += "# No analysis file found - reading raw config\n"

            # Add sample of actual config
            if config_file.exists():
                with open(config_file, 'r') as f:
                    lines = f.readlines()

                # Find and extract key sections
                context += "\n# Sample Security Policies:\n"
                policy_lines = []
                for i, line in enumerate(lines):
                    if 'security policies' in line:
                        policy_lines = lines[i:i + 30]  # Get next 30 lines
                        break
                if policy_lines:
                    context += ''.join(policy_lines[:20])  # First 20 lines of policies
                else:
                    context += "# No policies section found\n"
            else:
                context += "# No configuration file available\n"
                context += "# Please run: python3 /home/netops/orchestrator/collect_srx_config.py\n"

            return context

        except Exception as e:
            logger.error(f"Error reading SRX config: {e}")
            return f"# Unable to read configuration: {str(e)}"

    def build_context_aware_prompt(self, focus_area: str) -> str:
        """Build focus-specific instructions based on context.

        Each known focus area maps to a static instruction block; anything
        else falls through to general guidance.
        """
        if focus_area == "security":
            return """
# SECURITY FOCUS: Generate ONLY advanced security enhancements

PRIORITY FEATURES TO IMPLEMENT:
1. Rate Limiting per source IP:
   - Connection rate limits
   - SYN flood protection thresholds
   - Session limits per zone

2. DDoS Protection Screens:
   - Zone-specific screen options
   - ICMP flood thresholds (e.g., 1000/sec)
   - UDP flood protection (e.g., 10000/sec)
   - TCP SYN flood (attack-threshold: 200, alarm-threshold: 100)
   - Port scan detection

3. IDS/IPS Features:
   - Application identification rules
   - Threat detection policies
   - Anomaly-based detection

Example commands to generate:
set security screen ids-option WAN-screen icmp flood threshold 1000
set security screen ids-option WAN-screen tcp syn-flood attack-threshold 200
set security zones security-zone WAN screen WAN-screen
"""
        elif focus_area == "performance":
            return """
# PERFORMANCE FOCUS: Generate ONLY QoS and traffic optimization

PRIORITY FEATURES TO IMPLEMENT:
1. QoS for Gaming/Streaming:
   - Class-based queuing
   - DSCP marking for gaming traffic
   - Bandwidth guarantees (40% for gaming)
   - Priority queues for real-time traffic

2. Traffic Shaping:
   - Rate limiting for bulk transfers
   - TCP optimization
   - MSS clamping

3. Resource Optimization:
   - Session timeout tuning
   - Flow table optimization
   - CPU affinity settings

Example commands to generate:
set class-of-service classifiers dscp gaming-classifier forwarding-class gaming loss-priority low code-points ef
set class-of-service interfaces ge-0/0/0 scheduler-map gaming-priority
set class-of-service schedulers gaming-scheduler priority strict-high
"""
        elif focus_area == "monitoring":
            return """
# MONITORING FOCUS: Generate ONLY visibility and logging enhancements

PRIORITY FEATURES TO IMPLEMENT:
1. Enhanced Syslog:
   - Structured logging to Elasticsearch (192.168.100.85)
   - Security event correlation
   - Flow logging optimization

2. SNMP Configuration:
   - Trap configuration for alerts
   - Performance counter exposure
   - Custom MIBs

3. Traffic Analytics:
   - Application visibility
   - Session tracking
   - Detailed flow exports

Example commands to generate:
set system syslog host 192.168.100.85 any info
set system syslog host 192.168.100.85 structured-data
set snmp trap-group monitoring targets 192.168.100.85
"""
        elif focus_area == "automation":
            return """
# AUTOMATION FOCUS: Generate ONLY automation and dynamic features

PRIORITY FEATURES TO IMPLEMENT:
1. Event Scripts:
   - Auto-response to security events
   - Dynamic blacklisting
   - Scheduled tasks

2. API Configuration:
   - NETCONF/REST setup
   - Webhook integrations
   - Automated backup triggers

3. Dynamic Policies:
   - Time-based rules
   - Threshold triggers
   - Self-healing configs

Example commands to generate:
set event-options policy block-attacker events SECURITY_SCREEN_TCP_SYN_FLOOD
set event-options policy block-attacker then execute-commands "set security zones security-zone untrust address-book address attacker"
"""
        else:  # general
            return """
# GENERAL IMPROVEMENTS: Focus on most needed enhancements
Generate improvements based on traffic analysis, avoiding basic connectivity.
Focus on security hardening, performance optimization, or monitoring enhancements.
"""

    def build_prompt(self, request_data: Dict) -> str:
        """Build prompt for AI with traffic data, feedback learning, AND context."""
        # Load context to determine focus area
        context = self.load_context()
        focus_area = context.get('focus_area', 'general')
        logger.info(f"Building prompt with focus area: {focus_area}")

        # IMPORTANT: Load feedback learning first
        learning_prompt = self.load_feedback_learning()

        # Get current SRX configuration if available
        srx_config = self.get_current_srx_config()

        # Get context-specific instructions
        context_instructions = self.build_context_aware_prompt(focus_area)

        # Extract traffic summary
        traffic_data = request_data.get('data', {})
        top_talkers = traffic_data.get('top_talkers', {}).get('buckets', [])
        vlans = traffic_data.get('vlans', {}).get('buckets', [])
        protocols = traffic_data.get('protocols', {}).get('buckets', [])

        # Build traffic summary
        traffic_summary = "Traffic Analysis Summary:\n"
        if top_talkers:
            traffic_summary += f"- Top {len(top_talkers)} source IPs detected\n"
            for talker in top_talkers[:5]:
                traffic_summary += f"  - {talker['key']}: {talker['doc_count']} flows\n"
        else:
            traffic_summary += "- No specific traffic patterns detected (using best practices)\n"

        # Build the enhanced prompt with context
        prompt = f"""You are a Juniper SRX security expert. Generate ONLY valid SRX configuration commands.

CURRENT CONTEXT: {focus_area.upper()} IMPROVEMENTS REQUESTED
===============================================

{learning_prompt}

{context_instructions}

Current SRX Configuration Context:
{srx_config[:800]}...

CRITICAL REQUIREMENTS FOR ALL SUGGESTIONS:
==========================================
1. This is a MATURE network with ALL basic connectivity COMPLETE
2. DO NOT suggest:
   - Basic zone-to-zone policies (already configured)
   - Standard internet access (already working)
   - Basic NAT rules (already implemented)
   - Gaming optimizations (PS5/Xbox already optimized)
3. SYNTAX REQUIREMENTS - THIS IS CRITICAL:
   - Each set command must be COMPLETE on a single line
   - NO opening braces at the end of lines
   - NO multi-line commands
   - Example CORRECT: set security screen ids-option WAN-screen icmp flood threshold 1000
   - Example WRONG: set security screen ids-option WAN-screen icmp flood threshold 1000 {{
4. ONLY suggest ADVANCED features for {focus_area}:
   - NOT already in the configuration
   - Specific to the {focus_area} context
   - Production-ready with real values
5. ABSOLUTE FORBIDDEN PATTERNS:
   - NEVER use "any any any" rules
   - NEVER use "source-address any" with "destination-address any"
   - NEVER suggest redundant features
   - NEVER ignore the focus area ({focus_area})
6. MUST follow these patterns:
   - Use specific address-sets
   - Name policies descriptively
   - Include logging
   - Add comments explaining purpose

{traffic_summary}

Based on the analysis and {focus_area} focus, generate configuration that:
1. Enhances {focus_area} capabilities
2. Uses specific values (no placeholders)
3. Avoids any previously rejected patterns
4. Does NOT duplicate existing features

Output ONLY the set commands and comments. Focus on {focus_area} improvements only.
"""
        return prompt

    def query_ollama(self, prompt: str) -> str:
        """Query Ollama API for AI suggestions.

        Falls back to a static, focus-aware configuration on any failure
        (model missing, HTTP error, timeout).  Successful responses are run
        through validate_response() before being returned.
        """
        try:
            # First check if model is loaded.  Bounded timeout so a hung
            # Ollama instance cannot stall the monitor loop forever.
            models_response = requests.get(f"{self.ollama_url}/api/tags", timeout=10)
            if models_response.status_code == 200:
                models = models_response.json().get('models', [])
                model_names = [m['name'] for m in models]
                if self.model not in model_names:
                    logger.warning(f"Model {self.model} not found, available: {model_names}")
                    return self.generate_fallback_config()

            # Query the model
            response = requests.post(
                f"{self.ollama_url}/api/generate",
                json={
                    "model": self.model,
                    "prompt": prompt,
                    "stream": False,
                    "options": {
                        "temperature": 0.7,
                        # BUGFIX: Ollama's option for output length is
                        # "num_predict"; "max_tokens" is silently ignored.
                        "num_predict": 2000
                    }
                },
                timeout=600
            )

            if response.status_code == 200:
                result = response.json()
                validated, _ = self.validate_response(result.get("response", ""))
                return validated
            else:
                logger.error(f"Ollama API error: {response.status_code}")
                return self.generate_fallback_config()

        except requests.Timeout:
            logger.warning("AI query timeout - using enhanced fallback")
            return self.generate_fallback_config()
        except Exception as e:
            logger.error(f"Error querying Ollama: {e}")
            return self.generate_fallback_config()

    def generate_fallback_config(self) -> str:
        """Generate context-aware fallback configuration."""
        context = self.load_context()
        focus_area = context.get('focus_area', 'security')
        timestamp = datetime.now().isoformat()

        if focus_area == "security":
            return f"""# AI-Generated SRX Security Configuration (Fallback)
# Generated: {timestamp}
# Focus: Advanced Security Features

# DDoS Protection Screens for WAN
set security screen ids-option WAN-screen icmp flood threshold 1000
set security screen ids-option WAN-screen udp flood threshold 10000
set security screen ids-option WAN-screen tcp syn-flood attack-threshold 200
set security screen ids-option WAN-screen tcp syn-flood alarm-threshold 100
set security screen ids-option WAN-screen tcp port-scan threshold 3000
set security zones security-zone WAN screen WAN-screen

# Rate limiting for HOME zone
set security screen ids-option HOME-screen tcp syn-flood attack-threshold 500
set security screen ids-option HOME-screen limit-session source-ip-based 100
set security zones security-zone HOME screen HOME-screen"""
        elif focus_area == "performance":
            return f"""# AI-Generated SRX Performance Configuration (Fallback)
# Generated: {timestamp}
# Focus: QoS and Traffic Optimization

# QoS for Gaming Traffic
set class-of-service classifiers dscp gaming-classifier forwarding-class gaming loss-priority low code-points ef
set class-of-service forwarding-classes class gaming queue-num 0
set class-of-service interfaces ge-0/0/0 scheduler-map gaming-priority
set class-of-service scheduler-maps gaming-priority forwarding-class gaming scheduler gaming-scheduler
set class-of-service schedulers gaming-scheduler priority strict-high
set class-of-service schedulers gaming-scheduler transmit-rate percent 40"""
        elif focus_area == "monitoring":
            return f"""# AI-Generated SRX Monitoring Configuration (Fallback)
# Generated: {timestamp}
# Focus: Enhanced Logging and Visibility

# Syslog to Elasticsearch
set system syslog host 192.168.100.85 any info
set system syslog host 192.168.100.85 structured-data
set system syslog file security-log security info

# SNMP monitoring
set snmp community monitoring authorization read-only
set snmp trap-group monitoring targets 192.168.100.85"""
        else:
            return f"""# AI-Generated SRX Configuration (Fallback)
# Generated: {timestamp}
# Focus: General Improvements

# Basic security hardening
set security screen ids-option GENERAL-screen tcp syn-flood attack-threshold 200
set security zones security-zone WAN screen GENERAL-screen"""

    def extract_srx_commands(self, ai_response: str) -> str:
        """Extract valid SRX commands from AI response.

        Keeps comment lines and `set ` commands, stripping trailing braces
        the model sometimes emits and skipping lines that look incomplete.
        Falls back to generate_fallback_config() when nothing usable remains.
        """
        lines = ai_response.split('\n')
        commands = []

        for line in lines:
            line = line.strip()
            # Keep comments
            if line.startswith('#'):
                commands.append(line)
            # Process set commands
            elif line.startswith('set '):
                # CRITICAL FIX: Remove any trailing braces or incomplete syntax
                clean_line = line.rstrip(' {')  # Remove trailing " {"
                clean_line = clean_line.rstrip('{')  # Remove trailing "{"

                # Skip if line appears incomplete
                if clean_line.count('(') != clean_line.count(')'):
                    logger.warning(f"Skipping unbalanced parentheses: {line}")
                    continue

                # Only add if it looks complete
                if len(clean_line) > 10:  # Basic sanity check
                    commands.append(clean_line)
                else:
                    logger.warning(f"Skipping too-short command: {line}")

        if commands:
            logger.info(f"Extracted {len(commands)} valid commands")
            return '\n'.join(commands)

        # Otherwise, log issue and return fallback
        logger.warning("No valid SRX commands found in AI response")
        return self.generate_fallback_config()

    def validate_response(self, config: str) -> tuple[str, list]:
        """Validate and auto-fix configuration before returning.

        Returns:
            (fixed_config, list_of_violations)
        """
        violations = []
        lines = config.split('\n')
        fixed_lines = []

        for line in lines:
            original = line

            # Check and fix 'any' violations.  BUGFIX: replace the specific
            # phrase (case-insensitively) rather than every occurrence of
            # the substring "any" anywhere in the line.
            if 'source-address any' in line.lower():
                line = re.sub(r'source-address\s+any', 'source-address INTERNAL-NETS',
                              line, flags=re.IGNORECASE)
                violations.append(f"Fixed 'source-address any' on line: {original.strip()}")

            if 'destination-address any' in line.lower():
                line = re.sub(r'destination-address\s+any', 'destination-address EXTERNAL-NETS',
                              line, flags=re.IGNORECASE)
                violations.append(f"Fixed 'destination-address any' on line: {original.strip()}")

            if 'application any' in line.lower():
                line = re.sub(r'application\s+any', 'application [ junos-https junos-ssh ]',
                              line, flags=re.IGNORECASE)
                violations.append(f"Fixed 'application any' on line: {original.strip()}")

            # Fix high thresholds (anything above 100 is clamped to 20)
            if 'threshold' in line.lower():
                def fix_threshold(match):
                    val = int(match.group(2))
                    if val > 100:
                        violations.append(f"Fixed threshold {val} -> 20")
                        return match.group(1) + '20'
                    return match.group(0)

                line = re.sub(r'(threshold\s+)(\d+)', fix_threshold, line)

            fixed_lines.append(line)

        # Check if address-sets are defined
        fixed_config = '\n'.join(fixed_lines)
        if 'address-set' not in fixed_config.lower():
            # Prepend required address-sets
            address_sets = """# MANDATORY: Address-set definitions
set security address-book global address-set INTERNAL-NETS address 192.168.100.0/24
set security address-book global address-set EXTERNAL-NETS address 0.0.0.0/8
set security address-book global address-set DMZ-NETS address 10.0.0.0/8

"""
            fixed_config = address_sets + fixed_config
            violations.append("Added mandatory address-sets")

        if violations:
            logger.warning(f"⚠️ Fixed {len(violations)} violations in generated config")
            for v in violations[:5]:
                logger.info(f"  • {v}")

        return fixed_config, violations

    def process_request(self, request_file: Path) -> Optional[Dict]:
        """Process a single analysis request with context awareness.

        Returns the response dict written to disk, or None on failure.
        """
        logger.info(f"Processing request: {request_file}")
        start_time = time.time()  # BUGFIX: measure duration, not a timestamp

        try:
            # Load request
            with open(request_file, 'r') as f:
                request_data = json.load(f)

            request_id = request_data.get('request_id', 'unknown')
            logger.info(f"Request ID: {request_id}")

            # Load context to understand focus area
            context = self.load_context()
            focus_area = context.get('focus_area', 'general')
            logger.info(f"Focus area: {focus_area}")

            # Check for feedback
            feedback_file = self.feedback_dir / 'pr_feedback_history.json'
            if feedback_file.exists():
                logger.info("✓ Found feedback history - will incorporate learning")
            else:
                logger.info("No feedback history found - using best practices")

            # Build prompt with feedback and context
            logger.info(f"Building enhanced prompt with traffic data, feedback, and {focus_area} focus...")
            prompt = self.build_prompt(request_data)

            # Log what we're doing.  BUGFIX: match the banner text that
            # load_feedback_learning() actually emits.
            if "CRITICAL MANDATORY RULES" in prompt:
                logger.info("✓ Applying feedback learning to avoid previous mistakes")
            logger.info(f"✓ Focusing on {focus_area} improvements only")

            # Get AI suggestions
            logger.info(f"Querying {self.model} for {focus_area} suggestions...")
            ai_response = self.query_ollama(prompt)

            # Extract commands
            suggestions = self.extract_srx_commands(ai_response)

            # Build response with context info
            response_data = {
                'request_id': request_id,
                'timestamp': datetime.now().isoformat(),
                'model': self.model,
                'focus_area': focus_area,  # NEW: Include focus area
                'context': context,  # NEW: Include full context
                'response': ai_response,
                'suggestions': suggestions,
                'feedback_aware': feedback_file.exists(),
                'processing_time': round(time.time() - start_time, 3)
            }

            # Save response
            response_file = self.response_dir / f"{request_id}_response.json"
            with open(response_file, 'w') as f:
                json.dump(response_data, f, indent=2)

            logger.info(f"✓ Response saved to: {response_file}")
            logger.info(f"✓ Generated {focus_area} improvements")

            # Remove processed request
            request_file.unlink()
            logger.info("✓ Removed processed request")

            return response_data

        except Exception as e:
            logger.error(f"Error processing request: {e}")
            return None

    def monitor_requests(self):
        """Monitor for new analysis requests (polling loop, 5s interval)."""
        logger.info("Starting request monitor with context support...")

        while self.running:
            try:
                # Check for request files
                request_files = list(self.request_dir.glob('*.json'))

                if request_files:
                    logger.info(f"Found {len(request_files)} request(s) to process")
                    for request_file in request_files:
                        self.process_request(request_file)

                # Wait before next check
                time.sleep(5)

            except KeyboardInterrupt:
                logger.info("Received interrupt signal")
                break
            except Exception as e:
                logger.error(f"Error in monitor loop: {e}")
                time.sleep(10)

    def shutdown(self, signum=None, frame=None):
        """Graceful shutdown (signal-handler compatible signature)."""
        logger.info("Shutting down AI processor...")
        self.running = False


def main():
    """Main entry point"""
    processor = AIProcessor()

    # Set up signal handlers
    signal.signal(signal.SIGTERM, processor.shutdown)
    signal.signal(signal.SIGINT, processor.shutdown)

    try:
        processor.monitor_requests()
    except Exception as e:
        logger.error(f"Fatal error: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()