Files
srx-ai-automation-docs/scripts/ai-processor/ai_processor.py
2025-09-05 04:09:04 +00:00

787 lines
30 KiB
Python

#!/usr/bin/env python3
"""
Enhanced AI Processor for Network Analysis with Feedback Learning and Context Support
This processor reads feedback from rejected PRs and improves its suggestions
Now supports context-aware analysis for focused improvements
"""
import os
import sys
import json
import time
import logging
import signal
from datetime import datetime
from pathlib import Path
from typing import Dict, Optional
import requests
# Set up logging
# Logs go both to a persistent file and to stdout/stderr for interactive runs.
# NOTE(review): FileHandler assumes /var/log/ai-processor/ already exists and is
# writable — confirm the deployment creates it, otherwise this raises at import.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('/var/log/ai-processor/ai-processor.log'),
        logging.StreamHandler()
    ]
)
# Module-wide logger used by the class and main() below.
logger = logging.getLogger(__name__)
class AIProcessor:
    """Watches a shared directory for analysis requests and answers them with
    SRX configuration suggestions generated by a local Ollama model."""

    def __init__(self):
        """Set up shared directories, Ollama settings, and run state."""
        base = Path('/shared/ai-gitops')
        self.request_dir = base / 'requests'
        self.response_dir = base / 'responses'
        self.feedback_dir = base / 'feedback'
        self.config_dir = base / 'configs'
        self.context_dir = base / 'context'  # focus-area context files live here
        self.running = True
        # Local Ollama endpoint and the model queried for suggestions.
        self.ollama_url = "http://localhost:11434"
        self.model = "llama2:13b"
        # Output/working directories must exist before the first request lands.
        for directory in (self.response_dir, self.config_dir, self.context_dir):
            directory.mkdir(parents=True, exist_ok=True)
        logger.info("AI Processor initialized with feedback learning and context support enabled")
def load_context(self) -> Dict:
"""Load context from current_context.json if available"""
context_file = self.context_dir / 'current_context.json'
if context_file.exists():
try:
with open(context_file, 'r') as f:
context = json.load(f)
logger.info(f"✓ Loaded context: focus on {context.get('focus_area', 'general')}")
return context
except Exception as e:
logger.warning(f"Could not load context: {e}")
# Default context if none provided
return {
"focus_area": "general",
"skip_basic": True,
"priority_features": []
}
def load_feedback_learning(self) -> str:
"""Load feedback from rejected PRs and enforce MANDATORY rules"""
feedback_file = self.feedback_dir / 'pr_feedback_history.json'
learning_prompt = ""
if feedback_file.exists():
try:
with open(feedback_file, 'r') as f:
feedback_history = json.load(f)
# Count rejections - YOUR DATA STRUCTURE USES 'feedback_type'
rejected_prs = [f for f in feedback_history if f.get('feedback_type') == 'rejected']
if rejected_prs:
# CRITICAL: Make rules MANDATORY, not suggestions
learning_prompt += """
################################################################################
# ⚠️ CRITICAL MANDATORY RULES - VIOLATION = AUTOMATIC REJECTION ⚠️
################################################################################
# YOU HAVE HAD {} PREVIOUS CONFIGURATIONS REJECTED!
#
# FORBIDDEN PATTERNS THAT WILL CAUSE REJECTION:
# ❌ NEVER use: source-address any
# ❌ NEVER use: destination-address any
# ❌ NEVER use: application any
# ❌ NEVER use: threshold values > 100 (use 10-50 range)
# NEVER use XML notation, YANG models, or namespace syntax
#
# MANDATORY PATTERNS YOU MUST USE:
# ✅ ALWAYS define address-sets first:
# set security address-book global address-set INTERNAL-NETS address 192.168.100.0/24
# set security address-book global address-set EXTERNAL-NETS address 0.0.0.0/8
# ✅ ALWAYS use specific addresses from address-sets
# ✅ ALWAYS enable logging with session-init and session-close
# ✅ ALWAYS use IDS thresholds between 10-50
#
# REPLACEMENT RULES (AUTOMATIC):
# • Replace "source-address any" with "source-address INTERNAL-NETS"
# • Replace "destination-address any" with "destination-address EXTERNAL-NETS"
# • Replace "application any" with "application [ junos-https junos-ssh ]"
# • Replace "threshold 1000" with "threshold 20"
#
""".format(len(rejected_prs))
# Add specific rejection reasons
learning_prompt += "# SPECIFIC REJECTION REASONS FROM YOUR HISTORY:\n"
for i, pr in enumerate(rejected_prs[-3:], 1): # Last 3 rejections
details = pr.get('details', {})
reason = details.get('reason', 'Unknown')
learning_prompt += f"# Rejection {i}: {reason}"
learning_prompt += """#
# IF YOU USE 'ANY' OR HIGH THRESHOLDS, THIS PR WILL BE REJECTED!
# THE ORCHESTRATOR WILL NOT ACCEPT CONFIGS WITH THESE VIOLATIONS!
################################################################################
"""
# Log enforcement
logger.info(f"⚠️ ENFORCING MANDATORY RULES from {len(rejected_prs)} rejections")
logger.info("✓ Forbidden patterns: any keywords, high thresholds")
logger.info("✓ Required patterns: address-sets, specific addresses, logging")
else:
learning_prompt = "# No rejected PRs found - following best practices\n\n"
except Exception as e:
logger.warning(f"Could not load feedback: {e}")
learning_prompt = "# Could not load feedback - using strict security rules\n\n"
else:
logger.info("No feedback history found - using default best practices")
learning_prompt = """# No feedback history - using STRICT SECURITY DEFAULTS
# ✅ Never use 'any' for addresses or applications
# ✅ Always define address-sets
# ✅ Keep IDS thresholds between 10-50
# ✅ Enable logging on all policies
"""
return learning_prompt
    def get_current_srx_config(self) -> str:
        """Get current SRX configuration and analysis - ENHANCED VERSION.

        Builds a '#'-commented context string from the latest collected
        analysis JSON when available, otherwise from a sample of the raw
        config file; returns an error comment if neither can be read.
        """
        try:
            # First try to read the analysis file
            analysis_file = self.config_dir / 'srx_config_analysis_latest.json'
            config_file = self.config_dir / 'srx_config_latest.txt'
            context = "# Current SRX Configuration Context:\n"
            # Read analysis if available
            if analysis_file.exists():
                with open(analysis_file, 'r') as f:
                    analysis = json.load(f)
                # Build comprehensive context
                context += f"# Timestamp: {analysis.get('timestamp', 'Unknown')}\n"
                context += f"# Configured Zones: {', '.join(analysis.get('zones', []))}\n\n"
                # Show networks per zone
                context += "# Networks by Zone:\n"
                for zone, networks in analysis.get('networks', {}).items():
                    if networks:
                        context += f"# {zone}: {', '.join(networks)}\n"
                # Show address book entries (capped to keep the prompt short)
                context += "\n# Address Book Entries:\n"
                address_book = analysis.get('address_book', {})
                for name, addr in list(address_book.items())[:10]:  # First 10 entries
                    context += f"# {name}: {addr}\n"
                if len(address_book) > 10:
                    context += f"# ... and {len(address_book) - 10} more entries\n"
                # Show interfaces
                context += "\n# Configured Interfaces:\n"
                for iface, data in analysis.get('interfaces', {}).items():
                    if 'addresses' in data:
                        context += f"# {iface}: {', '.join(data['addresses'])}\n"
                context += f"\n# Total Policies: {analysis.get('policy_count', 0)}\n"
                # NEW: Add existing features summary so the model does not
                # re-suggest functionality that is already deployed.
                context += "\n# EXISTING FEATURES (DO NOT DUPLICATE):\n"
                context += "# - Basic connectivity: COMPLETE\n"
                context += "# - Zone segmentation: CONFIGURED\n"
                context += "# - Gaming optimizations: ACTIVE\n"
                context += "# - VPN (WireGuard): CONFIGURED\n"
                context += "# - DNS infrastructure: Pi-hole at 192.168.100.11\n"
            else:
                context += "# No analysis file found - reading raw config\n"
            # Add sample of actual config
            if config_file.exists():
                with open(config_file, 'r') as f:
                    lines = f.readlines()
                # Find and extract the security-policies section
                context += "\n# Sample Security Policies:\n"
                # NOTE(review): in_policies is set but never read — dead flag.
                in_policies = False
                policy_lines = []
                for i, line in enumerate(lines):
                    if 'security policies' in line:
                        in_policies = True
                        policy_lines = lines[i:i+30]  # Get next 30 lines
                        break
                if policy_lines:
                    context += ''.join(policy_lines[:20])  # First 20 lines of policies
                else:
                    context += "# No policies section found\n"
            else:
                context += "# No configuration file available\n"
                context += "# Please run: python3 /home/netops/orchestrator/collect_srx_config.py\n"
            return context
        except Exception as e:
            logger.error(f"Error reading SRX config: {e}")
            return f"# Unable to read configuration: {str(e)}"
def build_context_aware_prompt(self, focus_area: str) -> str:
"""Build focus-specific instructions based on context"""
if focus_area == "security":
return """
# SECURITY FOCUS: Generate ONLY advanced security enhancements
PRIORITY FEATURES TO IMPLEMENT:
1. Rate Limiting per source IP:
- Connection rate limits
- SYN flood protection thresholds
- Session limits per zone
2. DDoS Protection Screens:
- Zone-specific screen options
- ICMP flood thresholds (e.g., 1000/sec)
- UDP flood protection (e.g., 10000/sec)
- TCP SYN flood (attack-threshold: 200, alarm-threshold: 100)
- Port scan detection
3. IDS/IPS Features:
- Application identification rules
- Threat detection policies
- Anomaly-based detection
Example commands to generate:
set security screen ids-option WAN-screen icmp flood threshold 1000
set security screen ids-option WAN-screen tcp syn-flood attack-threshold 200
set security zones security-zone WAN screen WAN-screen
"""
elif focus_area == "performance":
return """
# PERFORMANCE FOCUS: Generate ONLY QoS and traffic optimization
PRIORITY FEATURES TO IMPLEMENT:
1. QoS for Gaming/Streaming:
- Class-based queuing
- DSCP marking for gaming traffic
- Bandwidth guarantees (40% for gaming)
- Priority queues for real-time traffic
2. Traffic Shaping:
- Rate limiting for bulk transfers
- TCP optimization
- MSS clamping
3. Resource Optimization:
- Session timeout tuning
- Flow table optimization
- CPU affinity settings
Example commands to generate:
set class-of-service classifiers dscp gaming-classifier forwarding-class gaming loss-priority low code-points ef
set class-of-service interfaces ge-0/0/0 scheduler-map gaming-priority
set class-of-service schedulers gaming-scheduler priority strict-high
"""
elif focus_area == "monitoring":
return """
# MONITORING FOCUS: Generate ONLY visibility and logging enhancements
PRIORITY FEATURES TO IMPLEMENT:
1. Enhanced Syslog:
- Structured logging to Elasticsearch (192.168.100.85)
- Security event correlation
- Flow logging optimization
2. SNMP Configuration:
- Trap configuration for alerts
- Performance counter exposure
- Custom MIBs
3. Traffic Analytics:
- Application visibility
- Session tracking
- Detailed flow exports
Example commands to generate:
set system syslog host 192.168.100.85 any info
set system syslog host 192.168.100.85 structured-data
set snmp trap-group monitoring targets 192.168.100.85
"""
elif focus_area == "automation":
return """
# AUTOMATION FOCUS: Generate ONLY automation and dynamic features
PRIORITY FEATURES TO IMPLEMENT:
1. Event Scripts:
- Auto-response to security events
- Dynamic blacklisting
- Scheduled tasks
2. API Configuration:
- NETCONF/REST setup
- Webhook integrations
- Automated backup triggers
3. Dynamic Policies:
- Time-based rules
- Threshold triggers
- Self-healing configs
Example commands to generate:
set event-options policy block-attacker events SECURITY_SCREEN_TCP_SYN_FLOOD
set event-options policy block-attacker then execute-commands "set security zones security-zone untrust address-book address attacker"
"""
else: # general
return """
# GENERAL IMPROVEMENTS: Focus on most needed enhancements
Generate improvements based on traffic analysis, avoiding basic connectivity.
Focus on security hardening, performance optimization, or monitoring enhancements.
"""
    def build_prompt(self, request_data: Dict) -> str:
        """Build prompt for AI with traffic data, feedback learning, AND context.

        Combines four sections: the feedback-learning preamble, the
        focus-area instructions, a truncated snapshot of the current SRX
        config, and a short traffic summary extracted from *request_data*.
        """
        # Load context to determine focus area
        context = self.load_context()
        focus_area = context.get('focus_area', 'general')
        logger.info(f"Building prompt with focus area: {focus_area}")
        # IMPORTANT: Load feedback learning first
        learning_prompt = self.load_feedback_learning()
        # Get current SRX configuration if available
        srx_config = self.get_current_srx_config()
        # Get context-specific instructions
        context_instructions = self.build_context_aware_prompt(focus_area)
        # Extract traffic summary (Elasticsearch-style aggregation buckets)
        traffic_data = request_data.get('data', {})
        top_talkers = traffic_data.get('top_talkers', {}).get('buckets', [])
        # NOTE(review): vlans and protocols are extracted but never used below.
        vlans = traffic_data.get('vlans', {}).get('buckets', [])
        protocols = traffic_data.get('protocols', {}).get('buckets', [])
        # Build traffic summary
        traffic_summary = "Traffic Analysis Summary:\n"
        if top_talkers:
            traffic_summary += f"- Top {len(top_talkers)} source IPs detected\n"
            for talker in top_talkers[:5]:
                traffic_summary += f" - {talker['key']}: {talker['doc_count']} flows\n"
        else:
            traffic_summary += "- No specific traffic patterns detected (using best practices)\n"
        # Build the enhanced prompt with context (config truncated to 800
        # chars to keep the prompt within the model's comfortable window)
        prompt = f"""You are a Juniper SRX security expert. Generate ONLY valid SRX configuration commands.
CURRENT CONTEXT: {focus_area.upper()} IMPROVEMENTS REQUESTED
===============================================
{learning_prompt}
{context_instructions}
Current SRX Configuration Context:
{srx_config[:800]}...
CRITICAL REQUIREMENTS FOR ALL SUGGESTIONS:
==========================================
1. This is a MATURE network with ALL basic connectivity COMPLETE
2. DO NOT suggest:
- Basic zone-to-zone policies (already configured)
- Standard internet access (already working)
- Basic NAT rules (already implemented)
- Gaming optimizations (PS5/Xbox already optimized)
3. SYNTAX REQUIREMENTS - THIS IS CRITICAL:
- Each set command must be COMPLETE on a single line
- NO opening braces at the end of lines
- NO multi-line commands
- Example CORRECT: set security screen ids-option WAN-screen icmp flood threshold 1000
- Example WRONG: set security screen ids-option WAN-screen icmp flood threshold 1000 {{
4. ONLY suggest ADVANCED features for {focus_area}:
- NOT already in the configuration
- Specific to the {focus_area} context
- Production-ready with real values
5. ABSOLUTE FORBIDDEN PATTERNS:
- NEVER use "any any any" rules
- NEVER use "source-address any" with "destination-address any"
- NEVER suggest redundant features
- NEVER ignore the focus area ({focus_area})
6. MUST follow these patterns:
- Use specific address-sets
- Name policies descriptively
- Include logging
- Add comments explaining purpose
{traffic_summary}
Based on the analysis and {focus_area} focus, generate configuration that:
1. Enhances {focus_area} capabilities
2. Uses specific values (no placeholders)
3. Avoids any previously rejected patterns
4. Does NOT duplicate existing features
Output ONLY the set commands and comments. Focus on {focus_area} improvements only.
"""
        return prompt
def query_ollama(self, prompt: str) -> str:
"""Query Ollama API for AI suggestions"""
try:
# First check if model is loaded
models_response = requests.get(f"{self.ollama_url}/api/tags")
if models_response.status_code == 200:
models = models_response.json().get('models', [])
model_names = [m['name'] for m in models]
if self.model not in model_names:
logger.warning(f"Model {self.model} not found, available: {model_names}")
return self.generate_fallback_config()
# Query the model
response = requests.post(
f"{self.ollama_url}/api/generate",
json={
"model": self.model,
"prompt": prompt,
"stream": False,
"options": {
"temperature": 0.7,
"max_tokens": 2000
}
},
timeout=600
)
if response.status_code == 200:
result = response.json()
validated, _ = self.validate_response(result.get("response", ""))
return validated
else:
logger.error(f"Ollama API error: {response.status_code}")
return self.generate_fallback_config()
except requests.Timeout:
logger.warning("AI query timeout - using enhanced fallback")
return self.generate_fallback_config()
except Exception as e:
logger.error(f"Error querying Ollama: {e}")
return self.generate_fallback_config()
def generate_fallback_config(self) -> str:
"""Generate context-aware fallback configuration"""
context = self.load_context()
focus_area = context.get('focus_area', 'security')
timestamp = datetime.now().isoformat()
if focus_area == "security":
return f"""# AI-Generated SRX Security Configuration (Fallback)
# Generated: {timestamp}
# Focus: Advanced Security Features
# DDoS Protection Screens for WAN
set security screen ids-option WAN-screen icmp flood threshold 1000
set security screen ids-option WAN-screen udp flood threshold 10000
set security screen ids-option WAN-screen tcp syn-flood attack-threshold 200
set security screen ids-option WAN-screen tcp syn-flood alarm-threshold 100
set security screen ids-option WAN-screen tcp port-scan threshold 3000
set security zones security-zone WAN screen WAN-screen
# Rate limiting for HOME zone
set security screen ids-option HOME-screen tcp syn-flood attack-threshold 500
set security screen ids-option HOME-screen limit-session source-ip-based 100
set security zones security-zone HOME screen HOME-screen"""
elif focus_area == "performance":
return f"""# AI-Generated SRX Performance Configuration (Fallback)
# Generated: {timestamp}
# Focus: QoS and Traffic Optimization
# QoS for Gaming Traffic
set class-of-service classifiers dscp gaming-classifier forwarding-class gaming loss-priority low code-points ef
set class-of-service forwarding-classes class gaming queue-num 0
set class-of-service interfaces ge-0/0/0 scheduler-map gaming-priority
set class-of-service scheduler-maps gaming-priority forwarding-class gaming scheduler gaming-scheduler
set class-of-service schedulers gaming-scheduler priority strict-high
set class-of-service schedulers gaming-scheduler transmit-rate percent 40"""
elif focus_area == "monitoring":
return f"""# AI-Generated SRX Monitoring Configuration (Fallback)
# Generated: {timestamp}
# Focus: Enhanced Logging and Visibility
# Syslog to Elasticsearch
set system syslog host 192.168.100.85 any info
set system syslog host 192.168.100.85 structured-data
set system syslog file security-log security info
# SNMP monitoring
set snmp community monitoring authorization read-only
set snmp trap-group monitoring targets 192.168.100.85"""
else:
return f"""# AI-Generated SRX Configuration (Fallback)
# Generated: {timestamp}
# Focus: General Improvements
# Basic security hardening
set security screen ids-option GENERAL-screen tcp syn-flood attack-threshold 200
set security zones security-zone WAN screen GENERAL-screen"""
def extract_srx_commands(self, ai_response: str) -> str:
"""Extract valid SRX commands from AI response"""
lines = ai_response.split('\n')
commands = []
for line in lines:
line = line.strip()
# Keep comments and set commands
# Keep comments
if line.startswith('#'):
commands.append(line)
# Process set commands
elif line.startswith('set '):
# CRITICAL FIX: Remove any trailing braces or incomplete syntax
# Clean up common AI mistakes
clean_line = line.rstrip(' {') # Remove trailing " {"
clean_line = clean_line.rstrip('{') # Remove trailing "{"
# Skip if line appears incomplete
if clean_line.count('(') != clean_line.count(')'):
logger.warning(f"Skipping unbalanced parentheses: {line}")
continue
# Only add if it looks complete
if len(clean_line) > 10: # Basic sanity check
commands.append(clean_line)
else:
logger.warning(f"Skipping too-short command: {line}")
if commands:
logger.info(f"Extracted {len(commands)} valid commands")
return '\n'.join(commands)
# Otherwise, log issue and return fallback
logger.warning("No valid SRX commands found in AI response")
return self.generate_fallback_config()
# if line.startswith('#') or line.startswith('set '):
# commands.append(line)
# # Also keep section headers that look like comments
# elif line.startswith('/*') or line.startswith('*'):
# commands.append(f"# {line}")
# If we got commands, return them
# if commands:
# return '\n'.join(commands)
# Otherwise, log issue and return fallback
# logger.warning("No valid SRX commands found in AI response")
# return self.generate_fallback_config()
def validate_response(self, config: str) -> tuple[str, list]:
"""Validate and auto-fix configuration before returning
Returns: (fixed_config, list_of_violations)
"""
violations = []
lines = config.split('\n')
fixed_lines = []
for line in lines:
original = line
# Check and fix 'any' violations
if 'source-address any' in line.lower():
line = line.replace('any', 'INTERNAL-NETS')
violations.append(f"Fixed 'source-address any' on line: {original.strip()}")
if 'destination-address any' in line.lower():
line = line.replace('any', 'EXTERNAL-NETS')
violations.append(f"Fixed 'destination-address any' on line: {original.strip()}")
if 'application any' in line.lower():
line = line.replace('any', '[ junos-https junos-ssh ]')
violations.append(f"Fixed 'application any' on line: {original.strip()}")
# Fix high thresholds
import re
if 'threshold' in line.lower():
def fix_threshold(match):
val = int(match.group(2))
if val > 100:
violations.append(f"Fixed threshold {val} -> 20")
return match.group(1) + '20'
return match.group(0)
line = re.sub(r'(threshold\s+)(\d+)', fix_threshold, line)
fixed_lines.append(line)
# Check if address-sets are defined
fixed_config = '\n'.join(fixed_lines)
if 'address-set' not in fixed_config.lower():
# Prepend required address-sets
address_sets = """# CRITICAL: YOU MUST START WITH THESE ADDRESS-SETS - DO NOT SKIP!
set security address-book global address-set INTERNAL-NETS address 192.168.100.0/24
set security address-book global address-set EXTERNAL-NETS address 0.0.0.0/8
set security address-book global address-set DMZ-NETS address 10.0.0.0/8
# FORBIDDEN PATTERNS - NEVER GENERATE THESE:
# ❌ NEVER: permit ip any any -> any any any
# ❌ NEVER: deny ip any any -> any any any
# ❌ NEVER: source-address any
# ❌ NEVER: destination-address any
# ❌ NEVER: application any
# REQUIRED - ALWAYS USE THESE PATTERNS:
# ✅ ALWAYS: source-address INTERNAL-NETS (or specific address-set)
# ✅ ALWAYS: destination-address EXTERNAL-NETS (or specific address-set)
# ✅ ALWAYS: application [ junos-https junos-ssh ] (or specific apps)
"""
fixed_config = address_sets + fixed_config
violations.append("Added mandatory address-sets")
if violations:
logger.warning(f"⚠️ Fixed {len(violations)} violations in generated config")
for v in violations[:5]:
logger.info(f"{v}")
return fixed_config, violations
def process_request(self, request_file: Path) -> Dict:
"""Process a single analysis request with context awareness"""
logger.info(f"Processing request: {request_file}")
try:
# Load request
with open(request_file, 'r') as f:
request_data = json.load(f)
request_id = request_data.get('request_id', 'unknown')
logger.info(f"Request ID: {request_id}")
# Load context to understand focus area
context = self.load_context()
focus_area = context.get('focus_area', 'general')
logger.info(f"Focus area: {focus_area}")
# Check for feedback
feedback_file = self.feedback_dir / 'pr_feedback_history.json'
if feedback_file.exists():
logger.info("✓ Found feedback history - will incorporate learning")
else:
logger.info("No feedback history found - using best practices")
# Build prompt with feedback and context
logger.info(f"Building enhanced prompt with traffic data, feedback, and {focus_area} focus...")
prompt = self.build_prompt(request_data)
# Log what we're doing
if "CRITICAL LEARNING FROM REJECTED" in prompt:
logger.info("✓ Applying feedback learning to avoid previous mistakes")
logger.info(f"✓ Focusing on {focus_area} improvements only")
# Get AI suggestions
logger.info(f"Querying {self.model} for {focus_area} suggestions...")
ai_response = self.query_ollama(prompt)
# Extract commands
suggestions = self.extract_srx_commands(ai_response)
# Build response with context info
response_data = {
'request_id': request_id,
'timestamp': datetime.now().isoformat(),
'model': self.model,
'focus_area': focus_area, # NEW: Include focus area
'context': context, # NEW: Include full context
'response': ai_response,
'suggestions': suggestions,
'feedback_aware': feedback_file.exists(),
'processing_time': time.time()
}
# Save response
response_file = self.response_dir / f"{request_id}_response.json"
with open(response_file, 'w') as f:
json.dump(response_data, f, indent=2)
logger.info(f"✓ Response saved to: {response_file}")
logger.info(f"✓ Generated {focus_area} improvements")
# Remove processed request
request_file.unlink()
logger.info("✓ Removed processed request")
return response_data
except Exception as e:
logger.error(f"Error processing request: {e}")
return None
def monitor_requests(self):
"""Monitor for new analysis requests"""
logger.info("Starting request monitor with context support...")
while self.running:
try:
# Check for request files
request_files = list(self.request_dir.glob('*.json'))
if request_files:
logger.info(f"Found {len(request_files)} request(s) to process")
for request_file in request_files:
self.process_request(request_file)
# Wait before next check
time.sleep(5)
except KeyboardInterrupt:
logger.info("Received interrupt signal")
break
except Exception as e:
logger.error(f"Error in monitor loop: {e}")
time.sleep(10)
def shutdown(self, signum=None, frame=None):
"""Graceful shutdown"""
logger.info("Shutting down AI processor...")
self.running = False
def main():
    """Entry point: wire signal handlers, then run the monitor loop."""
    processor = AIProcessor()
    # SIGTERM and SIGINT both trigger a graceful shutdown.
    for sig in (signal.SIGTERM, signal.SIGINT):
        signal.signal(sig, processor.shutdown)
    try:
        processor.monitor_requests()
    except Exception as e:
        logger.error(f"Fatal error: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()