From 66d97011ab29e3843c01fc7fb839ee7345149082 Mon Sep 17 00:00:00 2001 From: netops Date: Thu, 4 Sep 2025 02:19:22 +0000 Subject: [PATCH] Initial documentation structure --- README.md | 28 + collect_scripts.sh | 41 + docs/operations/README.md | 50 ++ infrastructure/systemd/gitea-webhook.service | 18 + infrastructure/systemd/orchestrator.service | 47 ++ scripts/README.md | 29 + .../orchestrator/core/config.yaml.template | 56 ++ .../orchestrator/core/orchestrator_main.py | 771 ++++++++++++++++++ scripts/orchestrator/core/requirements.txt | 30 + .../gitea/close_pr_with_feedback.py | 141 ++++ .../orchestrator/gitea/gitea_pr_feedback.py | 377 +++++++++ .../orchestrator/gitea/webhook_listener.py | 316 +++++++ scripts/orchestrator/pipeline/create_ai_pr.py | 160 ++++ scripts/orchestrator/pipeline/prepare_pr.py | 66 ++ scripts/orchestrator/pipeline/run_pipeline.py | 330 ++++++++ .../orchestrator/srx/collect_srx_config.py | 317 +++++++ scripts/orchestrator/srx/deploy_approved.py | 28 + scripts/orchestrator/srx/srx_manager.py | 309 +++++++ 18 files changed, 3114 insertions(+) create mode 100644 README.md create mode 100755 collect_scripts.sh create mode 100644 docs/operations/README.md create mode 100644 infrastructure/systemd/gitea-webhook.service create mode 100644 infrastructure/systemd/orchestrator.service create mode 100644 scripts/README.md create mode 100644 scripts/orchestrator/core/config.yaml.template create mode 100644 scripts/orchestrator/core/orchestrator_main.py create mode 100644 scripts/orchestrator/core/requirements.txt create mode 100755 scripts/orchestrator/gitea/close_pr_with_feedback.py create mode 100755 scripts/orchestrator/gitea/gitea_pr_feedback.py create mode 100755 scripts/orchestrator/gitea/webhook_listener.py create mode 100755 scripts/orchestrator/pipeline/create_ai_pr.py create mode 100755 scripts/orchestrator/pipeline/prepare_pr.py create mode 100755 scripts/orchestrator/pipeline/run_pipeline.py create mode 100755 scripts/orchestrator/srx/collect_srx_config.py create mode 100755 scripts/orchestrator/srx/deploy_approved.py create mode 100755 scripts/orchestrator/srx/srx_manager.py diff --git a/README.md b/README.md new file mode 100644 index 0000000..f7e733e --- /dev/null +++ b/README.md @@ -0,0 +1,28 @@ +# SRX AI-Driven Network Automation System + +## Production System Documentation + +This repository documents the AI-driven network automation system managing Juniper SRX firewall configurations. + +### System Components +- **Orchestrator VM** (192.168.100.87): Main automation hub +- **AI Processor VM** (192.168.100.86): Ollama llama2:13b +- **Elasticsearch VM** (192.168.100.85): NetFlow analytics +- **Gitea Server** (git.salmutt.dev): Git repository and PR management +- **Juniper SRX** (192.168.100.1): Target firewall + +### Key Metrics +- 850,000+ NetFlow records processed daily +- 19+ days uptime (AI processor) +- 8 security zones configured +- 100% syntax accuracy after feedback learning + +## Documentation Structure +- [Architecture Overview](docs/architecture/README.md) +- [Operations Guide](docs/operations/README.md) +- [API Reference](docs/api/README.md) +- [Script Inventory](scripts/README.md) + +--- +*Repository maintained by: netops* +*Last Updated: September 2025* diff --git a/collect_scripts.sh b/collect_scripts.sh new file mode 100755 index 0000000..88fd00c --- /dev/null +++ b/collect_scripts.sh @@ -0,0 +1,41 @@ +#!/bin/bash + +echo "Collecting production scripts..." 
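+
+# Assumed addition (sketch): the service-file copies at the end of this script
+# target infrastructure/systemd/, which the script never creates; on a fresh
+# checkout those copies would fail silently (errors go to /dev/null).
+# Creating the directory up front avoids that.
+mkdir -p infrastructure/systemd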
+ +# Create directories for organized scripts +mkdir -p scripts/orchestrator/{core,pipeline,gitea,srx,monitoring,utilities} + +# Core orchestrator files +echo "Collecting core orchestrator files..." +if [ -f ~/orchestrator/orchestrator_main.py ]; then + cp ~/orchestrator/orchestrator_main.py scripts/orchestrator/core/ + cp ~/orchestrator/requirements.txt scripts/orchestrator/core/ +fi + +# Configuration (sanitized) +if [ -f ~/orchestrator/config.yaml ]; then + cp ~/orchestrator/config.yaml scripts/orchestrator/core/config.yaml.template +fi + +# Pipeline scripts +echo "Collecting pipeline scripts..." +for script in run_pipeline.py prepare_pr.py create_ai_pr.py; do + [ -f ~/orchestrator/$script ] && cp ~/orchestrator/$script scripts/orchestrator/pipeline/ +done + +# Gitea integration scripts +for script in webhook_listener.py gitea_pr_feedback.py close_pr_with_feedback.py; do + [ -f ~/orchestrator/$script ] && cp ~/orchestrator/$script scripts/orchestrator/gitea/ +done + +# SRX management scripts +for script in collect_srx_config.py srx_manager.py deploy_approved.py; do + [ -f ~/orchestrator/$script ] && cp ~/orchestrator/$script scripts/orchestrator/srx/ +done + +# Service files +sudo cp /etc/systemd/system/orchestrator.service infrastructure/systemd/ 2>/dev/null +sudo cp /etc/systemd/system/gitea-webhook.service infrastructure/systemd/ 2>/dev/null +sudo chown $USER:$USER infrastructure/systemd/*.service 2>/dev/null + +echo "Collection complete!" diff --git a/docs/operations/README.md b/docs/operations/README.md new file mode 100644 index 0000000..7cd4eaf --- /dev/null +++ b/docs/operations/README.md @@ -0,0 +1,50 @@ +# Operations Guide + +## Daily Operations + +### Morning Health Check + +Check services: +systemctl status orchestrator +systemctl status gitea-webhook + +Check for pending PRs: +curl -s https://git.salmutt.dev/api/v1/repos/sal/srx-config/pulls?state=open + +View recent activity: +journalctl -u orchestrator --since "12 hours ago" | grep -E "anomaly|trigger" + +### Manual Operations + +Trigger Analysis: +cd /home/netops/orchestrator +python3 run_pipeline.py --context security --timeout 120 + +View AI Responses: +ls -lt /shared/ai-gitops/responses/ | head -10 + +Check Learning Patterns: +cat /shared/ai-gitops/learning/patterns.json + +## Troubleshooting + +### Service Issues + +Check logs: +journalctl -u orchestrator -n 50 + +Verify NFS mount: +df -h | grep shared + +Test Elasticsearch: +curl -s 192.168.100.85:9200/_cluster/health + +### AI Not Responding + +SSH to AI processor: +ssh netops@192.168.100.86 +systemctl status ai-processor +systemctl status ollama + +Test Ollama: +curl http://localhost:11434/api/tags diff --git a/infrastructure/systemd/gitea-webhook.service b/infrastructure/systemd/gitea-webhook.service new file mode 100644 index 0000000..bb6b137 --- /dev/null +++ b/infrastructure/systemd/gitea-webhook.service @@ -0,0 +1,18 @@ +[Unit] +Description=Gitea Webhook Listener for AI Feedback +After=network.target + +[Service] +Type=simple +User=netops +WorkingDirectory=/home/netops/orchestrator +#ExecStart=/usr/bin/python3 /home/netops/orchestrator/webhook_listener.py +ExecStart=/home/netops/orchestrator/venv/bin/python /home/netops/orchestrator/webhook_listener.py +Restart=always +RestartSec=10 +StandardOutput=journal +StandardError=journal +EnvironmentFile=-/home/netops/.env + +[Install] +WantedBy=multi-user.target diff --git a/infrastructure/systemd/orchestrator.service b/infrastructure/systemd/orchestrator.service new file mode 100644 index 0000000..274ca99 --- 
/dev/null
+++ b/infrastructure/systemd/orchestrator.service
@@ -0,0 +1,47 @@
+[Unit]
+Description=Network AI Orchestrator Service
+Documentation=https://git.salmutt.dev/sal/srx-config
+After=network-online.target
+Wants=network-online.target
+RequiresMountsFor=/shared/ai-gitops
+
+[Service]
+Type=simple
+User=netops
+Group=netops
+WorkingDirectory=/home/netops/orchestrator
+
+# Load environment variables
+EnvironmentFile=-/home/netops/.env
+
+# Python virtual environment activation and script execution
+ExecStart=/home/netops/orchestrator/venv/bin/python /home/netops/orchestrator/orchestrator_main.py
+
+# Restart configuration
+Restart=on-failure
+RestartSec=30
+StartLimitInterval=200
+StartLimitBurst=5
+
+# Resource limits
+MemoryLimit=8G
+CPUQuota=50%
+
+# Environment variables
+Environment="PYTHONUNBUFFERED=1"
+Environment="ORCHESTRATOR_ENV=production"
+
+# Logging
+StandardOutput=journal
+StandardError=journal
+SyslogIdentifier=orchestrator
+
+# Security hardening
+PrivateTmp=yes
+NoNewPrivileges=true
+ProtectSystem=strict
+ProtectHome=read-only
+ReadWritePaths=/shared/ai-gitops /home/netops/orchestrator/logs /var/lib/orchestrator /var/log/orchestrator
+
+[Install]
+WantedBy=multi-user.target
diff --git a/scripts/README.md b/scripts/README.md
new file mode 100644
index 0000000..da3ff28
--- /dev/null
+++ b/scripts/README.md
@@ -0,0 +1,29 @@
+# Production Scripts Inventory
+
+## Orchestrator Scripts
+
+### Core Service
+- orchestrator_main.py - Main orchestration loop
+- config.yaml.template - Sanitized configuration template (deployed as config.yaml)
+- requirements.txt - Python dependencies
+
+### Pipeline Scripts
+- run_pipeline.py - Manual pipeline trigger with context
+- prepare_pr.py - Prepares PR content from AI response
+- create_ai_pr.py - Creates PR in Gitea
+
+### Webhook Integration
+- webhook_listener.py - Flask server listening on port 5000
+- gitea_pr_feedback.py - Processes PR feedback
+- deploy_approved.py - Deploys merged PRs to the SRX (invoked by the webhook listener on merge)
+
+## Usage Examples
+
+Manual security analysis:
+python3 run_pipeline.py --context security --timeout 120
+
+Create PR from latest response:
+python3 prepare_pr.py && python3 create_ai_pr.py
+
+Deploy approved configuration:
+python3 deploy_approved.py
diff --git a/scripts/orchestrator/core/config.yaml.template b/scripts/orchestrator/core/config.yaml.template
new file mode 100644
index 0000000..7cdc95a
--- /dev/null
+++ b/scripts/orchestrator/core/config.yaml.template
@@ -0,0 +1,56 @@
+# Network AI Orchestrator Configuration
+elasticsearch:
+  host: "192.168.100.85:9200"
+  index: "netflow-*"
+  verify_certs: false
+  timeout: 30
+
+analysis:
+  interval_minutes: 60
+  window_hours: 168
+  min_traffic_bytes: 1000000
+
+pr_creation:
+  enabled: true
+  frequency: "smart"         # Options: weekly, daily, manual, smart
+  triggers:
+    - high_traffic_anomaly   # Create a PR on a traffic spike
+    - security_event         # Create a PR on a security event
+    - scheduled: "weekly"
+  thresholds:
+    traffic_spike: 200       # 200% flow increase triggers a PR
+    new_hosts: 10            # 10+ new source IPs trigger a PR
+  day_of_week: "saturday"    # Day name; used when frequency is "weekly"
+  hour_of_day: 22            # 24-hour format (22 = 10 PM)
+  skip_if_pending: true      # Don't create a PR if one is already open
+  min_days_between: 7        # Minimum days between PRs
+
+gitea:
+  url: "https://git.salmutt.dev"
+  repo: "sal/srx-config"
+  token: "${GITEA_TOKEN}"    # Substituted from the GITEA_TOKEN environment variable
+  branch: "main"
+  labels: ["ai-generated", "auto-config", "pending-review"]
+
+srx:
+  host: "192.168.100.1"
+  port: 830
+  username: "netops"
+  ssh_key: "/home/netops/.ssh/srx_key"
+
+shared_storage:
+  path: "/shared/ai-gitops"
+
+state_tracking:
+  
enabled: true + state_file: '/shared/ai-gitops/state/orchestrator_state.json' + track_pr_history: true + +ai: + request_timeout: 120 + max_retries: 3 + +logging: + level: "INFO" + max_file_size: "100MB" + retention_days: 30 diff --git a/scripts/orchestrator/core/orchestrator_main.py b/scripts/orchestrator/core/orchestrator_main.py new file mode 100644 index 0000000..16a88e0 --- /dev/null +++ b/scripts/orchestrator/core/orchestrator_main.py @@ -0,0 +1,771 @@ +#!/usr/bin/env python3 +""" +Enhanced Network AI Orchestrator - Production Version with Gitea Integration +Compatible with Elasticsearch 7.x +""" +import os +import sys +import json +import time +import logging +import signal +import hashlib +from datetime import datetime, timedelta +from pathlib import Path +from typing import Dict, List, Optional +import yaml +import uuid +import threading +from pathlib import Path +from elasticsearch import Elasticsearch # Using sync version for ES 7.x +from git import Repo +import requests + +# Load environment variables from home directory .env +from dotenv import load_dotenv +env_path = Path.home() / '.env' # This gets /home/netops/.env +load_dotenv(env_path) + +# ADD THIS IMPORT - New for Phase 3 +from gitea_integration import GiteaIntegration + +# Configure production logging +def setup_logging(): + """Configure comprehensive logging for production""" + log_dir = Path("/home/netops/orchestrator/logs") + log_dir.mkdir(exist_ok=True) + + log_file = log_dir / f"orchestrator_{datetime.now().strftime('%Y%m%d')}.log" + + logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[ + logging.FileHandler(log_file), + logging.StreamHandler(sys.stdout) + ] + ) + return logging.getLogger(__name__) + +logger = setup_logging() + +class NetworkOrchestrator: + def __init__(self, config_path: str = "/home/netops/orchestrator/config.yaml"): + """Initialize the orchestrator with configuration""" + self.config = self.load_config(config_path) + self.es_client = None + self.git_repo = None + self.running = True + self.shared_dir = Path("/shared/ai-gitops") + self.request_dir = self.shared_dir / "requests" + self.response_dir = self.shared_dir / "responses" + + # ADD THIS - Initialize state for Phase 3 + self.state = self.load_state() + + # Ensure directories exist + self.request_dir.mkdir(parents=True, exist_ok=True) + self.response_dir.mkdir(parents=True, exist_ok=True) + + logger.info("Orchestrator initialized") + + def should_create_pr(self): + """Check if we should create a PR based on schedule and state""" + if not self.config.get('pr_creation', {}).get('enabled', True): + return False + + # Load state + state = self.load_state() + + # Check if pending PR exists + if self.config['pr_creation'].get('skip_if_pending', True): + if state.get('pending_pr', False): + logger.info("Skipping PR - existing PR is pending") + return False + + # Check frequency + frequency = self.config['pr_creation'].get('frequency', 'weekly') + + if frequency == 'weekly': + # Check if it's the right day and hour + now = datetime.now() + target_day = self.config['pr_creation'].get('day_of_week', 'saturday') + target_hour = self.config['pr_creation'].get('hour_of_day', 5) + + # Convert day name to number + days = ['monday', 'tuesday', 'wednesday', 'thursday', 'friday', 'saturday', 'sunday'] + target_day_num = days.index(target_day.lower()) + + # Check if it's the right day and hour + if now.weekday() != target_day_num or now.hour != target_hour: + return False + + # Check minimum 
days between PRs + if state.get('last_pr_created'): + last_pr_date = datetime.fromisoformat(state['last_pr_created']) + days_since = (datetime.now() - last_pr_date).days + min_days = self.config['pr_creation'].get('min_days_between', 7) + + if days_since < min_days: + logger.info(f"Skipping PR - only {days_since} days since last PR") + return False + + return True + + def load_state(self): + """Load orchestrator state""" + state_file = self.config.get('state_tracking', {}).get('state_file', '/var/lib/orchestrator/state.json') + if Path(state_file).exists(): + with open(state_file, 'r') as f: + return json.load(f) + return {} + + def check_smart_triggers(self, traffic_data: Dict) -> bool: + """Check if any smart triggers are met""" + if self.config['pr_creation'].get('frequency') != 'smart': + return False + + triggers = self.config['pr_creation'].get('triggers', []) + thresholds = self.config['pr_creation'].get('thresholds', {}) + + # Load previous state for comparison + state = self.load_state() + last_data = state.get('last_traffic_data', {}) + + # Check traffic spike + if 'high_traffic_anomaly' in str(triggers): + current_flows = sum( + b['doc_count'] + for b in traffic_data.get('top_talkers', {}).get('buckets', []) + ) + last_flows = last_data.get('total_flows', current_flows) + + if last_flows > 0: + spike_percent = ((current_flows - last_flows) / last_flows) * 100 + if spike_percent >= thresholds.get('traffic_spike', 200): + logger.info(f"🚨 Traffic spike detected: {spike_percent:.1f}% increase!") + return True + + # Check new hosts + if 'security_event' in str(triggers): + current_ips = set( + b['key'] + for b in traffic_data.get('top_talkers', {}).get('buckets', []) + ) + last_ips = set(last_data.get('top_ips', [])) + + new_hosts = current_ips - last_ips + if len(new_hosts) >= thresholds.get('new_hosts', 10): + logger.info(f"🚨 Security event: {len(new_hosts)} new hosts detected!") + logger.info(f" New IPs: {', '.join(list(new_hosts)[:5])}...") + return True + + # Check weekly schedule fallback + if any('scheduled' in str(t) for t in triggers): + if self.should_create_pr(): # Use existing weekly logic + logger.info("šŸ“… Weekly schedule triggered") + return True + + # Save current data for next comparison + self.save_state({ + 'last_traffic_data': { + 'total_flows': sum( + b['doc_count'] + for b in traffic_data.get('top_talkers', {}).get('buckets', []) + ), + 'top_ips': [ + b['key'] + for b in traffic_data.get('top_talkers', {}).get('buckets', []) + ] + } + }) + + return False + + def save_state(self, updates): + """Save orchestrator state""" + state_file = self.config.get('state_tracking', {}).get('state_file', '/var/lib/orchestrator/state.json') + state = self.load_state() + state.update(updates) + state['last_updated'] = datetime.now().isoformat() + + with open(state_file, 'w') as f: + json.dump(state, f, indent=2) + + def load_config(self, config_path: str) -> Dict: + """Load configuration from YAML file""" + try: + with open(config_path, 'r') as f: + config = yaml.safe_load(f) + logger.info(f"Configuration loaded from {config_path}") + + # Replace environment variables + if 'gitea' in config and 'token' in config['gitea']: + if config['gitea']['token'] == '${GITEA_TOKEN}': + config['gitea']['token'] = os.environ.get('GITEA_TOKEN', '') + + return config + except Exception as e: + logger.error(f"Failed to load config: {e}") + # Use defaults if config fails + return { + 'elasticsearch': { + 'host': '192.168.100.85:9200', + 'index': 'netflow-*' + }, + 'analysis': { + 
'interval_minutes': 60, + 'window_hours': 24 + }, + 'gitea': { + 'url': 'https://git.salmutt.dev', + 'repo': 'sal/srx-config', + 'token': os.environ.get('GITEA_TOKEN', '') + } + } + + def setup_elasticsearch(self): + """Initialize Elasticsearch connection (synchronous for ES 7.x)""" + try: + es_config = self.config['elasticsearch'] + self.es_client = Elasticsearch( + hosts=[es_config['host']], + verify_certs=False, + timeout=30 + ) + # Test connection + info = self.es_client.info() + logger.info(f"Connected to Elasticsearch: {info['version']['number']}") + return True + except Exception as e: + logger.error(f"Failed to connect to Elasticsearch: {e}") + self.es_client = None + return False + + def collect_traffic_data(self) -> Dict: + """Collect traffic data from Elasticsearch (synchronous)""" + if not self.es_client: + logger.warning("Elasticsearch not connected, using mock data") + return self.generate_mock_data() + + try: + window_hours = self.config['analysis']['window_hours'] + query = { + "query": { + "range": { + "@timestamp": { + "gte": f"now-{window_hours}h", + "lte": "now" + } + } + }, + "size": 0, + "aggs": { + "top_talkers": { + "terms": { + "field": "netflow.ipv4_src_addr", + "size": 20 + }, + "aggs": { + "bytes": {"sum": {"field": "netflow.in_bytes"}}, + "packets": {"sum": {"field": "netflow.in_pkts"}} + } + }, + "protocols": { + "terms": { + "field": "netflow.protocol", + "size": 10 + } + }, + "top_destinations": { + "terms": { + "field": "netflow.ipv4_dst_addr", # NEW + "size": 20 + } + }, + "vlans": { + "terms": { + "field": "vlan_id", + "size": 20 + }, + "aggs": { + "bytes": {"sum": {"field": "bytes"}} + } + }, + "hourly_traffic": { + "date_histogram": { + "field": "@timestamp", + "calendar_interval": "hour" + }, + "aggs": { + "bytes": {"sum": {"field": "bytes"}} + } + } + } + } + + result = self.es_client.search( + index=self.config['elasticsearch']['index'], + body=query + ) + + total_hits = result['hits']['total'] + # Handle both ES 6.x and 7.x response formats + if isinstance(total_hits, dict): + total_count = total_hits['value'] + else: + total_count = total_hits + + logger.info(f"Collected traffic data: {total_count} flows") + return result['aggregations'] + + except Exception as e: + logger.error(f"Error collecting traffic data: {e}") + return self.generate_mock_data() + + def generate_mock_data(self) -> Dict: + """Generate mock traffic data for testing""" + return { + "top_talkers": { + "buckets": [ + {"key": "192.168.100.50", "doc_count": 15000, + "bytes": {"value": 5000000}, "packets": {"value": 10000}}, + {"key": "192.168.100.51", "doc_count": 12000, + "bytes": {"value": 4000000}, "packets": {"value": 8000}}, + {"key": "192.168.100.11", "doc_count": 8000, + "bytes": {"value": 2000000}, "packets": {"value": 5000}}, + {"key": "10.0.0.5", "doc_count": 6000, + "bytes": {"value": 1500000}, "packets": {"value": 3000}} + ] + }, + "protocols": { + "buckets": [ + {"key": "tcp", "doc_count": 25000}, + {"key": "udp", "doc_count": 15000}, + {"key": "icmp", "doc_count": 2000} + ] + }, + "vlans": { + "buckets": [ + {"key": 100, "doc_count": 20000, "bytes": {"value": 8000000}}, + {"key": 200, "doc_count": 15000, "bytes": {"value": 6000000}}, + {"key": 300, "doc_count": 5000, "bytes": {"value": 2000000}} + ] + } + } + +# def request_ai_analysis(self, traffic_data: Dict) -> Optional[Dict]: +# """Send traffic data to AI for analysis""" +# request_id = str(uuid.uuid4()) +# request_file = self.request_dir / f"{request_id}.json" +# +# request_data = { +# "request_id": request_id, +# 
"timestamp": datetime.now().isoformat(), +# "type": "traffic_analysis", +# "data": traffic_data, +# "prompt": self.build_analysis_prompt(traffic_data) +# } +# try: +# with open(request_file, 'w') as f: +# json.dump(request_data, f, indent=2) +# logger.info(f"AI request created: {request_id}") +# +# # Wait for response (with timeout) +# response = self.wait_for_ai_response(request_id, timeout=600) +# return response +# +# except Exception as e: +# logger.error(f"Error requesting AI analysis: {e}") +# return None + def request_ai_analysis(self, traffic_data: Dict) -> Optional[Dict]: + """Send traffic data to AI for analysis""" + request_id = str(uuid.uuid4()) + request_file = self.request_dir / f"{request_id}.json" + + request_data = { + "request_id": request_id, + "timestamp": datetime.now().isoformat(), + "type": "traffic_analysis", + "data": traffic_data, + "prompt": self.build_analysis_prompt(traffic_data) + } + + try: + with open(request_file, 'w') as f: + json.dump(request_data, f, indent=2) + logger.info(f"AI request created: {request_id}") + + # Wait for response (with timeout) + response = self.wait_for_ai_response(request_id, timeout=600) + + # Convert to pipeline format for PR creation + if response and 'response' in response: + # Create pipeline-compatible format + pipeline_response = { + "suggestions": [], + "focus_area": "security", + "feedback_aware": True, + "timestamp": datetime.now().isoformat() + } + + # Extract SRX config lines from AI response + ai_text = response.get('response', '') + lines = ai_text.split('\n') + + for line in lines: + line = line.strip() + # Capture any line that looks like SRX config + if line.startswith('set '): + # Remove any trailing braces + clean_line = line.rstrip(' {') + clean_line = clean_line.rstrip('{') + pipeline_response['suggestions'].append({ + "config": clean_line, + "reason": "AI-generated optimization" + }) + + # Only save if we found config lines + if pipeline_response['suggestions']: + pipeline_file = self.response_dir / f"pipeline_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{request_id[:8]}_response.json" + with open(pipeline_file, 'w') as f: + json.dump(pipeline_response, f, indent=2) + logger.info(f"Saved pipeline format response with {len(pipeline_response['suggestions'])} configs") + + return response + + except Exception as e: + logger.error(f"Error requesting AI analysis: {e}") + return None + + def build_analysis_prompt(self, traffic_data: Dict) -> str: + """Build prompt for AI analysis""" + prompt = """Analyze this network traffic data and suggest optimizations for a Juniper SRX firewall: + +Traffic Summary: +- Top Talkers: {} +- Active VLANs: {} +- Protocol Distribution: {} + +Based on this data, please provide: +1. Security rule optimizations (as Juniper SRX configuration commands) +2. QoS improvements for high-traffic hosts +3. VLAN segmentation recommendations +4. Potential security concerns or anomalies + +Format your response with specific Juniper SRX configuration commands that can be applied. 
+Include comments explaining each change.""" + + # Extract key metrics + top_ips = [b['key'] for b in traffic_data.get('top_talkers', {}).get('buckets', [])][:5] + vlans = [str(b['key']) for b in traffic_data.get('vlans', {}).get('buckets', [])][:5] + protocols = [str(b['key']) for b in traffic_data.get('protocols', {}).get('buckets', [])][:3] + + return prompt.format( + ', '.join(top_ips) if top_ips else 'No data', + ', '.join(vlans) if vlans else 'No VLANs', + ', '.join(protocols) if protocols else 'No protocols' + ) + + def wait_for_ai_response(self, request_id: str, timeout: int = 120) -> Optional[Dict]: + """Wait for AI response file""" + response_file = self.response_dir / f"{request_id}_response.json" + start_time = time.time() + + while time.time() - start_time < timeout: + if response_file.exists(): + try: + time.sleep(1) # Give AI time to finish writing + with open(response_file, 'r') as f: + response = json.load(f) + logger.info(f"AI response received: {request_id}") + + # Log a snippet of the response + if 'response' in response: + snippet = response['response'][:200] + '...' if len(response['response']) > 200 else response['response'] + logger.info(f"AI suggestion snippet: {snippet}") + + # Clean up files + response_file.unlink() + (self.request_dir / f"{request_id}.json").unlink(missing_ok=True) + return response + except Exception as e: + logger.error(f"Error reading AI response: {e}") + return None + time.sleep(2) + + logger.warning(f"AI response timeout for {request_id}") + return None + + # REPLACE THE EXISTING create_gitea_pr METHOD WITH THIS ENHANCED VERSION + def create_gitea_pr(self, ai_response: Dict = None) -> bool: + """Create pull request in Gitea with suggested changes""" + try: + # If no AI response provided, get the latest one + if not ai_response: + latest_suggestion = self._get_latest_ai_suggestion() + if not latest_suggestion: + logger.warning("No AI suggestions found to create PR") + return False + + # Read the suggestion file + with open(latest_suggestion['path'], 'r') as f: + ai_response = json.load(f) + + # Check if we should create a PR + if not self.should_create_pr(): + logger.info("Skipping PR creation - conditions not met") + return False + + # Check for existing pending PR + if self.state.get('pending_pr'): + logger.info(f"Skipping PR creation - pending PR exists: {self.state['pending_pr']}") + return False + + logger.info("Creating Gitea pull request with AI suggestions...") + + # Initialize Gitea integration + gitea = GiteaIntegration(self.config['gitea']) + + # Extract the SRX configuration + srx_config = ai_response.get('suggestions', ai_response.get('response', '')) + if not srx_config or srx_config.strip() == '': + logger.warning("Empty or invalid suggestions, skipping PR creation") + return False + + # Create the PR + pr_info = gitea.create_pr_with_config( + srx_config=srx_config, + title=f"AI Network Configuration Suggestions - {datetime.now().strftime('%B %d, %Y')}", + description=None # Will auto-generate + ) + + if pr_info: + # Update state with PR information + self.state['pending_pr'] = pr_info['number'] + self.state['last_pr_created'] = datetime.now().isoformat() + self.state['pr_url'] = pr_info['url'] + self.save_state(self.state) + + logger.info(f"Successfully created PR #{pr_info['number']}: {pr_info['url']}") + + # Log to a separate file for notifications/monitoring + with open('/var/log/orchestrator/pr_created.log', 'a') as f: + f.write(f"{datetime.now().isoformat()} - Created PR #{pr_info['number']} - {pr_info['url']}\n") + + 
return True + else: + logger.error("Failed to create PR in Gitea") + return False + + except Exception as e: + logger.error(f"Error creating Gitea PR: {e}", exc_info=True) + return False + + # ADD THIS NEW METHOD + def _get_latest_ai_suggestion(self) -> Optional[Dict]: + """Get the most recent AI suggestion file""" + response_dir = '/shared/ai-gitops/responses' + + try: + # List all response files + response_files = [] + for filename in os.listdir(response_dir): + if filename.startswith('response_') and filename.endswith('.json'): + filepath = os.path.join(response_dir, filename) + # Get file modification time + mtime = os.path.getmtime(filepath) + response_files.append({ + 'path': filepath, + 'filename': filename, + 'mtime': mtime + }) + + if not response_files: + return None + + # Sort by modification time and get the latest + response_files.sort(key=lambda x: x['mtime'], reverse=True) + return response_files[0] + + except Exception as e: + logger.error(f"Error finding latest AI suggestion: {e}") + return None + + # ADD THIS NEW METHOD + def check_pr_status(self): + """Check the status of pending pull requests""" + if not self.state.get('pending_pr'): + return + + try: + gitea = GiteaIntegration(self.config['gitea']) + pr_status = gitea.get_pr_status(self.state['pending_pr']) + + if pr_status: + logger.info(f"PR #{pr_status['number']} status: {pr_status['state']} (merged: {pr_status['merged']})") + + # If PR is closed or merged, clear the pending_pr flag + if pr_status['state'] == 'closed': + logger.info(f"PR #{pr_status['number']} has been closed") + self.state['pending_pr'] = None + self.state['last_pr_status'] = 'closed' + self.state['last_pr_closed'] = datetime.now().isoformat() + + if pr_status['merged']: + self.state['last_pr_status'] = 'merged' + logger.info(f"PR #{pr_status['number']} was merged!") + # Mark for deployment + self.state['pending_deployment'] = True + self.state['deployment_pr'] = pr_status['number'] + + self.save_state(self.state) + + except Exception as e: + logger.error(f"Error checking PR status: {e}") + + def run_analysis_cycle(self): + """Run a complete analysis cycle""" + logger.info("="*60) + logger.info("Starting traffic analysis cycle") + logger.info("="*60) + + try: + # Always collect traffic data + logger.info("Step 1: Collecting traffic data from Elasticsearch...") + traffic_data = self.collect_traffic_data() + + if not traffic_data: + logger.warning("No traffic data available, skipping analysis") + return + + # Log summary of collected data + top_talkers = traffic_data.get('top_talkers', {}).get('buckets', []) + if top_talkers: + logger.info(f"Found {len(top_talkers)} top talkers") + logger.info(f"Top IP: {top_talkers[0]['key']} with {top_talkers[0]['doc_count']} flows") + + # Check smart triggers if configured + trigger_fired = False + if self.config['pr_creation'].get('frequency') == 'smart': + trigger_fired = self.check_smart_triggers(traffic_data) + if trigger_fired: + logger.info("šŸŽÆ Smart trigger activated - will create PR") + + # Always request AI analysis + logger.info("Step 2: Requesting AI analysis...") + ai_response = self.request_ai_analysis(traffic_data) + + # Save state for analysis + self.save_state({ + 'last_analysis_run': datetime.now().isoformat(), + 'last_analysis_data': { + 'top_talkers_count': len(top_talkers), + 'response_received': bool(ai_response) + } + }) + + # Determine if we should create PR + should_create = False + + if self.config['pr_creation'].get('frequency') == 'smart': + # Smart mode: create if trigger fired + 
should_create = trigger_fired + else: + # Regular mode: use existing schedule logic + should_create = self.should_create_pr() + + # Create PR if conditions met + if should_create: + if ai_response and (ai_response.get('response') or ai_response.get('suggestions')): + logger.info("Step 3: Creating PR with AI suggestions...") + if self.create_gitea_pr(ai_response): + logger.info("āœ“ PR created successfully") + if trigger_fired: + logger.info(" Reason: Smart trigger fired") + else: + logger.warning("Failed to create Gitea PR") + else: + logger.info("No actionable suggestions from AI analysis") + else: + logger.info("No triggers met - analysis saved for future use") + + except Exception as e: + logger.error(f"Error in analysis cycle: {e}") + + logger.info("="*60) + + def main_loop(self): + """Main orchestrator loop""" + logger.info("Starting Network AI Orchestrator") + + # Setup Elasticsearch connection + if not self.setup_elasticsearch(): + logger.warning("Running without Elasticsearch connection - using mock data") + + interval = self.config['analysis'].get('interval_minutes', 60) * 60 + + # Run first analysis immediately + self.run_analysis_cycle() + + while self.running: + try: + logger.info(f"Next analysis scheduled in {interval/60} minutes") + logger.info(f"Next run at: {(datetime.now() + timedelta(seconds=interval)).strftime('%H:%M:%S')}") + + # MODIFIED: Check PR status every 15 minutes during the wait + for i in range(int(interval / 60)): # Check every minute + if not self.running: + break + + time.sleep(60) + + # Check PR status every 15 minutes + if i % 15 == 14 and self.state.get('pending_pr'): + logger.info("Checking PR status...") + self.check_pr_status() + + if self.running: # Check again after sleep + self.run_analysis_cycle() + + except KeyboardInterrupt: + logger.info("Received keyboard interrupt") + break + except Exception as e: + logger.error(f"Error in main loop: {e}") + time.sleep(60) # Wait before retry + + logger.info("Orchestrator shutdown complete") + + def shutdown(self, signum=None, frame=None): + """Graceful shutdown handler""" + if signum: + logger.info(f"Received signal {signum}, initiating shutdown...") + else: + logger.info("Initiating shutdown...") + self.running = False + if self.es_client: + try: + # Close Elasticsearch connection + self.es_client.transport.close() + except: + pass + +def main(): + """Main entry point""" + orchestrator = NetworkOrchestrator() + + # Set up signal handlers + signal.signal(signal.SIGTERM, orchestrator.shutdown) + signal.signal(signal.SIGINT, orchestrator.shutdown) + + try: + orchestrator.main_loop() + except Exception as e: + logger.error(f"Fatal error: {e}", exc_info=True) + sys.exit(1) + + sys.exit(0) + +if __name__ == "__main__": + main() diff --git a/scripts/orchestrator/core/requirements.txt b/scripts/orchestrator/core/requirements.txt new file mode 100644 index 0000000..0e9c361 --- /dev/null +++ b/scripts/orchestrator/core/requirements.txt @@ -0,0 +1,30 @@ +aiofiles==24.1.0 +bcrypt==4.3.0 +blinker==1.9.0 +certifi==2025.8.3 +cffi==1.17.1 +charset-normalizer==3.4.2 +click==8.2.1 +cryptography==45.0.5 +elastic-transport==9.1.0 +elasticsearch==7.17.9 +Flask==3.1.1 +gitdb==4.0.12 +GitPython==3.1.45 +idna==3.10 +itsdangerous==2.2.0 +Jinja2==3.1.6 +MarkupSafe==3.0.2 +paramiko==3.5.1 +pycparser==2.22 +PyNaCl==1.5.0 +python-dateutil==2.9.0.post0 +python-dotenv==1.1.1 +PyYAML==6.0.2 +requests==2.32.4 +six==1.17.0 +smmap==5.0.2 +tabulate==0.9.0 +typing_extensions==4.14.1 +urllib3==1.26.20 +Werkzeug==3.1.3 diff --git 
a/scripts/orchestrator/gitea/close_pr_with_feedback.py b/scripts/orchestrator/gitea/close_pr_with_feedback.py new file mode 100755 index 0000000..b3930a8 --- /dev/null +++ b/scripts/orchestrator/gitea/close_pr_with_feedback.py @@ -0,0 +1,141 @@ +#!/usr/bin/env python3 +""" +Close PR with Feedback - Reject a PR and help AI learn +""" +import sys +import json +import yaml +import requests +from datetime import datetime +from typing import List + +sys.path.append('/home/netops/orchestrator') +from gitea_integration import GiteaIntegration +from pr_feedback import PRFeedbackSystem + +def close_pr_with_feedback(pr_number: int, reason: str, issues: List[str]): + """Close a PR and record feedback for AI learning""" + + # Load config + with open('/home/netops/orchestrator/config.yaml', 'r') as f: + config = yaml.safe_load(f) + + # Initialize systems + gitea = GiteaIntegration(config['gitea']) + feedback_system = PRFeedbackSystem() + + print(f"\n🚫 Closing PR #{pr_number} with feedback...") + + # First, add a comment to the PR explaining why it's being closed + comment = f"""## šŸ¤– AI Configuration Review - Rejected + +**Reason**: {reason} + +**Issues Found**: +{chr(10).join(f'- {issue}' for issue in issues)} + +This feedback has been recorded to improve future AI suggestions. The AI will learn from these issues and avoid them in future configurations. + +### Specific Problems: +- **Security**: The any/any/any permit rule is too permissive +- **Best Practice**: Source addresses should be specific, not 'any' +- **Risk**: This configuration could expose the network to threats + +The AI will generate better suggestions next time based on this feedback. +""" + + # Post comment to PR (using Gitea API) + api_url = f"{config['gitea']['url']}/api/v1/repos/{config['gitea']['repo']}/issues/{pr_number}/comments" + headers = { + 'Authorization': f"token {config['gitea']['token']}", + 'Content-Type': 'application/json' + } + + comment_data = {"body": comment} + + try: + response = requests.post(api_url, json=comment_data, headers=headers) + if response.status_code in [200, 201]: + print("āœ… Added feedback comment to PR") + else: + print(f"āš ļø Could not add comment: {response.status_code}") + except Exception as e: + print(f"āš ļø Error adding comment: {e}") + + # Record feedback for AI learning + feedback_details = { + 'reason': reason, + 'specific_issues': '\n'.join(issues), + 'configuration_issues': [ + {'type': 'security_permissive', 'description': 'Rules too permissive (any/any/any)'}, + {'type': 'security_missing', 'description': 'Missing source address restrictions'} + ] + } + + feedback_system.record_pr_feedback(pr_number, 'rejected', feedback_details) + + # Update orchestrator state + state_file = '/var/lib/orchestrator/state.json' + try: + with open(state_file, 'r') as f: + state = json.load(f) + + state['pending_pr'] = None + state['last_pr_status'] = 'rejected' + state['last_pr_rejected'] = datetime.now().isoformat() + + with open(state_file, 'w') as f: + json.dump(state, f, indent=2) + + print("āœ… Updated orchestrator state") + except Exception as e: + print(f"āš ļø Could not update state: {e}") + + # Show AI learning summary + patterns = feedback_system.analyze_feedback_patterns() + print(f"\nšŸ“Š AI Learning Summary:") + print(f"Total feedback entries: {patterns['total_prs']}") + print(f"Rejected PRs: {patterns['rejected']}") + print(f"Security concerns: {patterns['security_concerns']}") + + print("\nāœ… PR closed with feedback. 
The AI will learn from this!") + print("\nNext time the AI generates a configuration, it will:") + print("- Avoid any/any/any permit rules") + print("- Use specific source addresses") + print("- Follow security best practices") + + print("\nāš ļø IMPORTANT: Now manually close the PR in Gitea!") + print(f"Go to: {config['gitea']['url']}/{config['gitea']['repo']}/pulls/{pr_number}") + print("Click the 'Close Pull Request' button") + +# Quick reject function for current PR +def reject_current_pr(): + """Reject PR #2 with specific feedback""" + close_pr_with_feedback( + pr_number=2, + reason="Security policy too permissive - any/any/any permit rule is dangerous", + issues=[ + "ALLOW-ESTABLISHED policy permits all traffic from trust to untrust", + "Source address should not be 'any' - use specific networks", + "Application should not be 'any' - specify required services only", + "This configuration could expose internal network to threats" + ] + ) + +if __name__ == "__main__": + if len(sys.argv) > 1 and sys.argv[1] == "--current": + # Reject the current PR #2 + reject_current_pr() + else: + # Interactive mode + pr_num = input("Enter PR number to reject: ") + reason = input("Reason for rejection: ") + issues = [] + print("Enter specific issues (empty line to finish):") + while True: + issue = input("- ") + if not issue: + break + issues.append(issue) + + close_pr_with_feedback(int(pr_num), reason, issues) diff --git a/scripts/orchestrator/gitea/gitea_pr_feedback.py b/scripts/orchestrator/gitea/gitea_pr_feedback.py new file mode 100755 index 0000000..31cfe61 --- /dev/null +++ b/scripts/orchestrator/gitea/gitea_pr_feedback.py @@ -0,0 +1,377 @@ +#!/usr/bin/env python3 +""" +Gitea PR Creation and Feedback Handler +Creates real PRs in Gitea and handles rejection feedback +""" +import os +import sys +import json +import yaml +import requests +from datetime import datetime +from pathlib import Path +import subprocess + +class GiteaPRManager: + def __init__(self, config_path='/home/netops/orchestrator/config.yaml'): + """Initialize with Gitea configuration""" + # Load config + with open(config_path, 'r') as f: + self.config = yaml.safe_load(f) + + self.gitea_config = self.config.get('gitea', {}) + self.base_url = self.gitea_config.get('url', 'http://localhost:3000') + self.token = self.gitea_config.get('token', '') + self.repo_owner = self.gitea_config.get('owner', 'netops') + self.repo_name = self.gitea_config.get('repo', 'srx-config') + + self.headers = { + 'Authorization': f'token {self.token}', + 'Content-Type': 'application/json' + } + + self.pending_prs_dir = Path('/shared/ai-gitops/pending_prs') + self.feedback_dir = Path('/shared/ai-gitops/feedback') + + def create_pr_from_ai_suggestions(self, pr_file=None): + """Create a PR in Gitea from AI suggestions""" + print("\n" + "="*60) + print("Creating Gitea PR from AI Suggestions") + print("="*60) + + # Find latest PR file if not specified + if pr_file is None: + pr_files = sorted(self.pending_prs_dir.glob('pr_*.json'), + key=lambda x: x.stat().st_mtime, reverse=True) + if not pr_files: + print("āŒ No pending PR files found") + return None + pr_file = pr_files[0] + + print(f"šŸ“„ Using PR file: {pr_file.name}") + + # Load PR data + with open(pr_file, 'r') as f: + pr_data = json.load(f) + + # Create a new branch + branch_name = f"ai-suggestions-{datetime.now().strftime('%Y%m%d-%H%M%S')}" + + # Create the configuration file content + config_content = f"""# AI-Generated Network Configuration +# Generated: {pr_data.get('timestamp', 
datetime.now().isoformat())} +# Model: {pr_data.get('model', 'llama2:13b')} +# Feedback Aware: {pr_data.get('feedback_aware', False)} + +{pr_data.get('suggestions', '')} +""" + + # Create branch and file via Gitea API + try: + # First, get the default branch SHA + repo_url = f"{self.base_url}/api/v1/repos/{self.repo_owner}/{self.repo_name}" + repo_response = requests.get(repo_url, headers=self.headers) + + if repo_response.status_code != 200: + print(f"āŒ Failed to get repo info: {repo_response.status_code}") + print(f" Response: {repo_response.text}") + return None + + default_branch = repo_response.json().get('default_branch', 'main') + + # Get the SHA of the default branch + branch_url = f"{self.base_url}/api/v1/repos/{self.repo_owner}/{self.repo_name}/branches/{default_branch}" + branch_response = requests.get(branch_url, headers=self.headers) + + if branch_response.status_code != 200: + print(f"āŒ Failed to get branch info: {branch_response.status_code}") + return None + + base_sha = branch_response.json()['commit']['id'] + + # Create new branch + create_branch_url = f"{self.base_url}/api/v1/repos/{self.repo_owner}/{self.repo_name}/branches" + branch_data = { + 'new_branch_name': branch_name, + 'old_branch_name': default_branch + } + + branch_create = requests.post(create_branch_url, + headers=self.headers, + json=branch_data) + + if branch_create.status_code not in [201, 200]: + print(f"āŒ Failed to create branch: {branch_create.status_code}") + print(f" Response: {branch_create.text}") + return None + + print(f"āœ… Created branch: {branch_name}") + + # Create or update file in the new branch + file_path = f"ai-suggestions/config_{datetime.now().strftime('%Y%m%d_%H%M%S')}.conf" + file_url = f"{self.base_url}/api/v1/repos/{self.repo_owner}/{self.repo_name}/contents/{file_path}" + + import base64 + file_data = { + 'branch': branch_name, + 'content': base64.b64encode(config_content.encode()).decode(), + 'message': f"AI suggestions: {pr_data.get('title', 'Network optimization')}" + } + + file_response = requests.post(file_url, headers=self.headers, json=file_data) + + if file_response.status_code not in [201, 200]: + print(f"āš ļø Could not create file via API, trying alternative method") + else: + print(f"āœ… Created config file: {file_path}") + + # Create Pull Request + pr_url = f"{self.base_url}/api/v1/repos/{self.repo_owner}/{self.repo_name}/pulls" + + pr_body = f"""## AI-Generated Network Configuration + +### Analysis Context +- **Zones Analyzed**: {', '.join(pr_data.get('network_context', {}).get('zones', []))} +- **Policies Reviewed**: {pr_data.get('network_context', {}).get('policies', 0)} +- **Feedback Aware**: {pr_data.get('feedback_aware', False)} + +### Suggested Changes +```junos +{pr_data.get('suggestions', '')} +``` + +### Review Checklist +- [ ] No any/any/any rules +- [ ] Logging enabled on all policies +- [ ] Proper zone segmentation +- [ ] Address-sets used instead of individual IPs +- [ ] Applications are specific (not "any") + +### How to Test +1. Apply to lab SRX first +2. Verify traffic flow +3. Check logs for any issues +4. 
Apply to production if tests pass + +--- +*This PR was automatically generated by the AI Network Automation system* +""" + + pr_request = { + 'title': pr_data.get('title', 'AI Network Configuration Suggestions'), + 'head': branch_name, + 'base': default_branch, + 'body': pr_body + } + + pr_response = requests.post(pr_url, headers=self.headers, json=pr_request) + + if pr_response.status_code == 201: + pr_info = pr_response.json() + pr_number = pr_info['number'] + pr_html_url = pr_info['html_url'] + + print(f"\nāœ… Pull Request created successfully!") + print(f" PR Number: #{pr_number}") + print(f" URL: {pr_html_url}") + + # Save PR info for tracking + pr_tracking = { + 'pr_number': pr_number, + 'pr_url': pr_html_url, + 'branch': branch_name, + 'created_at': datetime.now().isoformat(), + 'ai_request_id': pr_data.get('request_id'), + 'suggestions_file': str(pr_file) + } + + tracking_file = self.pending_prs_dir / f"gitea_pr_{pr_number}.json" + with open(tracking_file, 'w') as f: + json.dump(pr_tracking, f, indent=2) + + return pr_number + else: + print(f"āŒ Failed to create PR: {pr_response.status_code}") + print(f" Response: {pr_response.text}") + return None + + except Exception as e: + print(f"āŒ Error creating PR: {e}") + return None + + def reject_pr_with_feedback(self, pr_number, feedback_message): + """Reject a PR and save feedback for AI learning""" + print("\n" + "="*60) + print(f"Rejecting PR #{pr_number} with Feedback") + print("="*60) + + # Close the PR via API + pr_url = f"{self.base_url}/api/v1/repos/{self.repo_owner}/{self.repo_name}/pulls/{pr_number}" + + # Add comment with feedback + comment_url = f"{pr_url}/reviews" + comment_data = { + 'body': feedback_message, + 'event': 'REJECT' # or 'REQUEST_CHANGES' + } + + comment_response = requests.post(comment_url, headers=self.headers, json=comment_data) + + if comment_response.status_code not in [200, 201]: + # Try alternative: just add a comment + issue_comment_url = f"{self.base_url}/api/v1/repos/{self.repo_owner}/{self.repo_name}/issues/{pr_number}/comments" + comment_data = { + 'body': f"āŒ **REJECTED**\n\n{feedback_message}" + } + requests.post(issue_comment_url, headers=self.headers, json=comment_data) + + # Close the PR + close_data = { + 'state': 'closed' + } + close_response = requests.patch(pr_url, headers=self.headers, json=close_data) + + if close_response.status_code == 200: + print(f"āœ… PR #{pr_number} closed") + else: + print(f"āš ļø Could not close PR via API") + + # Save feedback for AI learning + feedback_entry = { + 'pr_number': pr_number, + 'timestamp': datetime.now().isoformat(), + 'feedback_type': 'rejected', + 'reviewer': 'security_team', + 'details': { + 'reason': feedback_message, + 'specific_issues': self.parse_feedback_for_issues(feedback_message) + } + } + + # Load and update feedback history + feedback_file = self.feedback_dir / 'pr_feedback_history.json' + self.feedback_dir.mkdir(parents=True, exist_ok=True) + + if feedback_file.exists(): + with open(feedback_file, 'r') as f: + history = json.load(f) + else: + history = [] + + history.append(feedback_entry) + + with open(feedback_file, 'w') as f: + json.dump(history, f, indent=2) + + print(f"āœ… Feedback saved for AI learning") + print(f" Total feedback entries: {len(history)}") + + return feedback_entry + + def parse_feedback_for_issues(self, feedback_text): + """Parse feedback text to extract specific issues""" + issues = [] + + # Common security issues to look for + patterns = [ + ('any/any/any', 'Never use any/any/any rules'), + ('no logging', 
'Always enable logging'), + ('source-address any', 'Avoid using source-address any'), + ('destination-address any', 'Avoid using destination-address any'), + ('application any', 'Specify applications instead of any'), + ('overly permissive', 'Rules are too permissive'), + ('zone segmentation', 'Improper zone segmentation'), + ('iot', 'IoT security concerns') + ] + + feedback_lower = feedback_text.lower() + for pattern, description in patterns: + if pattern in feedback_lower: + issues.append({ + 'pattern': pattern, + 'description': description, + 'type': 'security' + }) + + return issues if issues else feedback_text + +def main(): + """Main entry point for testing""" + print("\n" + "="*60) + print(" GITEA PR FEEDBACK TESTING") + print("="*60) + + manager = GiteaPRManager() + + print("\nOptions:") + print("1. Create a new PR from latest AI suggestions") + print("2. Reject a PR with feedback") + print("3. Run complete test cycle") + + choice = input("\nSelect option (1-3): ") + + if choice == '1': + pr_number = manager.create_pr_from_ai_suggestions() + if pr_number: + print(f"\nāœ… Successfully created PR #{pr_number}") + print("\nYou can now:") + print(f"1. Review it in Gitea") + print(f"2. Reject it with: python3 gitea_pr_feedback.py") + + elif choice == '2': + pr_number = input("Enter PR number to reject: ") + print("\nEnter rejection feedback (press Ctrl+D when done):") + feedback_lines = [] + try: + while True: + feedback_lines.append(input()) + except EOFError: + pass + + feedback = '\n'.join(feedback_lines) + + if not feedback: + feedback = """This configuration has security issues: + +1. Any/any/any rules detected - this violates zero-trust principles +2. No logging enabled on some policies +3. Overly permissive access between zones + +Please revise to: +- Use specific address-sets +- Enable logging on all policies +- Implement proper zone segmentation""" + + manager.reject_pr_with_feedback(pr_number, feedback) + + elif choice == '3': + print("\nšŸ“‹ Complete test cycle:") + print("1. Creating PR from AI suggestions...") + pr_number = manager.create_pr_from_ai_suggestions() + + if pr_number: + print(f"\n2. Waiting for review...") + input(" Press Enter to simulate rejection...") + + feedback = """Security Review Failed: + + āŒ Critical Issues Found: + - Any/any/any rule in policy ALLOW-ALL + - No logging on DMZ policies + - IoT zone has unrestricted access to HOME zone + + Requirements: + - All policies must use specific addresses + - Logging must be enabled + - IoT devices need strict access control + """ + + print("\n3. Rejecting PR with feedback...") + manager.reject_pr_with_feedback(pr_number, feedback) + + print("\n4. 
AI will learn from this feedback in next run") + print(" Run: python3 run_pipeline.py --skip-netflow") + print(" The AI should avoid these mistakes next time!") + +if __name__ == "__main__": + main() diff --git a/scripts/orchestrator/gitea/webhook_listener.py b/scripts/orchestrator/gitea/webhook_listener.py new file mode 100755 index 0000000..ea26961 --- /dev/null +++ b/scripts/orchestrator/gitea/webhook_listener.py @@ -0,0 +1,316 @@ +#!/usr/bin/env python3 +""" +Gitea Webhook Listener - Automatically captures PR approvals/rejections +Runs on orchestrator VM to capture feedback in real-time +""" +from flask import Flask, request, jsonify +import json +import logging +import subprocess +from datetime import datetime +from pathlib import Path +from dotenv import load_dotenv +import os +import hmac +import hashlib + +# Load environment variables from home directory +env_path = Path.home() / '.env' +load_dotenv(env_path) + +from flask import Flask, request, jsonify + +# This loads from .env file +WEBHOOK_SECRET = os.environ.get('WEBHOOK_SECRET', '') + +app = Flask(__name__) + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[ + logging.FileHandler('/var/log/webhook-listener.log'), + logging.StreamHandler() + ] +) +logger = logging.getLogger(__name__) + +# Configuration +FEEDBACK_FILE = "/shared/ai-gitops/feedback/pr_feedback_history.json" +LEARNING_FILE = "/shared/ai-gitops/learning/patterns.json" + +def ensure_directories(): + """Ensure required directories exist""" + Path(FEEDBACK_FILE).parent.mkdir(parents=True, exist_ok=True) + Path(LEARNING_FILE).parent.mkdir(parents=True, exist_ok=True) + # Ensure deployment log directory exists + Path('/var/log/orchestrator').mkdir(parents=True, exist_ok=True) + +def load_feedback_history(): + """Load existing feedback history""" + if Path(FEEDBACK_FILE).exists(): + try: + with open(FEEDBACK_FILE, 'r') as f: + return json.load(f) + except: + return [] + return [] + +def save_feedback_history(feedback): + """Save updated feedback history""" + with open(FEEDBACK_FILE, 'w') as f: + json.dump(feedback, f, indent=2) + logger.info(f"Saved feedback history with {len(feedback)} entries") + +def load_learning_patterns(): + """Load learning patterns""" + if Path(LEARNING_FILE).exists(): + try: + with open(LEARNING_FILE, 'r') as f: + return json.load(f) + except: + pass + return {"avoid_patterns": [], "successful_patterns": []} + +def save_learning_patterns(patterns): + """Save learning patterns""" + with open(LEARNING_FILE, 'w') as f: + json.dump(patterns, f, indent=2) + logger.info("Updated learning patterns") + +def extract_config_changes(pr_body): + """Extract SRX config commands from PR body""" + if not pr_body: + return [] + + configs = [] + lines = pr_body.split('\n') + in_code_block = False + + for line in lines: + line = line.strip() + if line.startswith('```'): + in_code_block = not in_code_block + elif in_code_block and line.startswith('set '): + configs.append(line) + elif not in_code_block and line.startswith('set '): + configs.append(line) + + return configs + +def update_learning(feedback_entry): + """Update AI learning patterns based on feedback""" + patterns = load_learning_patterns() + + if feedback_entry["status"] == "rejected": + # Add rejected patterns + for config in feedback_entry.get("config_changes", []): + if config not in patterns["avoid_patterns"]: + patterns["avoid_patterns"].append(config) + + # Mark common rejection reasons + reason = 
feedback_entry.get("reason", "").lower() + if "any any any" in reason or "any/any/any" in reason: + patterns["avoid_patterns"].append("any-any-any-pattern") + if "redundant" in reason or "already configured" in reason: + patterns["avoid_patterns"].append("redundant-config") + if "too broad" in reason or "overly permissive" in reason: + patterns["avoid_patterns"].append("overly-permissive") + + elif feedback_entry["status"] == "approved": + # Track successful patterns + for config in feedback_entry.get("config_changes", []): + if config not in patterns["successful_patterns"]: + patterns["successful_patterns"].append(config) + + save_learning_patterns(patterns) + logger.info(f"Learning updated: {len(patterns['avoid_patterns'])} patterns to avoid") + +@app.route('/webhook', methods=['POST']) +def handle_webhook(): + """Main webhook handler for Gitea PR events""" + try: + # Verify webhook signature for security + if WEBHOOK_SECRET: + signature = request.headers.get('X-Gitea-Signature', '') + if not signature: + logger.warning("No signature provided in webhook request") + return jsonify({"error": "No signature"}), 403 + + # Calculate expected signature + expected = 'sha256=' + hmac.new( + WEBHOOK_SECRET.encode(), + request.data, + hashlib.sha256 + ).hexdigest() + + # Compare signatures + if not hmac.compare_digest(signature, expected): + logger.warning(f"Invalid signature from {request.remote_addr}") + return jsonify({"error": "Invalid signature"}), 403 + + logger.debug("Webhook signature verified successfully") + + # Get event data + data = request.json + event = request.headers.get('X-Gitea-Event', '') + + logger.info(f"Received event: {event}") + + if event != "pull_request": + return jsonify({"status": "ignored", "reason": "Not a PR event"}), 200 + + action = data.get('action', '') + pr = data.get('pull_request', {}) + + # Check if this is an AI-generated PR + pr_title = pr.get('title', '') + if 'AI-Generated' not in pr_title and 'Network Configuration Update' not in pr_title: + logger.info(f"Ignoring non-AI PR: {pr_title}") + return jsonify({"status": "ignored", "reason": "Not AI-generated"}), 200 + + # Process closed PRs (either merged or rejected) + if action == "closed": + pr_number = pr.get('number', 0) + pr_body = pr.get('body', '') + merged = pr.get('merged', False) + + # Extract config changes from PR body + config_changes = extract_config_changes(pr_body) + + # Create feedback entry + feedback_entry = { + "timestamp": datetime.now().isoformat(), + "pr_number": pr_number, + "pr_title": pr_title, + "status": "approved" if merged else "rejected", + "config_changes": config_changes, + "merged": merged + } + + # For rejected PRs, try to extract reason from PR comments or description + if not merged: + feedback_entry["feedback_type"] = "rejected" # For compatibility + # Look for common rejection patterns in title or last comment + if "any" in str(config_changes).lower(): + feedback_entry["reason"] = "Contains any/any/any patterns" + else: + feedback_entry["reason"] = "Changes not needed or incorrect" + + logger.info(f"āŒ PR #{pr_number} REJECTED - {pr_title}") + else: + feedback_entry["feedback_type"] = "approved" # For compatibility + logger.info(f"āœ… PR #{pr_number} APPROVED - {pr_title}") + + # Save feedback + feedback = load_feedback_history() + feedback.append(feedback_entry) + save_feedback_history(feedback) + + # Update learning patterns + update_learning(feedback_entry) + + # AUTO-DEPLOYMENT CODE - If PR was merged, trigger deployment + if merged: + logger.info(f"PR 
#{pr_number} was merged - triggering auto-deployment") + try: + result = subprocess.run( + [ + '/home/netops/orchestrator/venv/bin/python', + '/home/netops/orchestrator/deploy_approved.py' + ], + capture_output=True, + text=True, + timeout=300 + ) + + if result.returncode == 0: + logger.info(f"āœ… Successfully auto-deployed PR #{pr_number}") + # Log deployment + with open('/var/log/orchestrator/deployments.log', 'a') as f: + f.write(f"{datetime.now().isoformat()} - Auto-deployed PR #{pr_number}\n") + else: + logger.error(f"āŒ Auto-deployment failed: {result.stderr}") + + except subprocess.TimeoutExpired: + logger.error("Deployment timed out after 5 minutes") + except Exception as e: + logger.error(f"Deployment error: {e}") + + return jsonify({ + "status": "recorded", + "pr_number": pr_number, + "decision": feedback_entry["status"], + "configs_captured": len(config_changes), + "deployed": merged # Indicate if deployment was triggered + }), 200 + + return jsonify({"status": "ignored", "reason": f"Action {action} not processed"}), 200 + + except Exception as e: + logger.error(f"Error processing webhook: {e}") + return jsonify({"error": str(e)}), 500 + +@app.route('/health', methods=['GET']) +def health_check(): + """Health check endpoint""" + return jsonify({ + "status": "healthy", + "service": "webhook-listener", + "feedback_file": str(Path(FEEDBACK_FILE).exists()), + "learning_file": str(Path(LEARNING_FILE).exists()) + }), 200 + +@app.route('/stats', methods=['GET']) +def get_stats(): + """Get feedback statistics""" + try: + feedback = load_feedback_history() + patterns = load_learning_patterns() + + approved = len([f for f in feedback if f.get("status") == "approved"]) + rejected = len([f for f in feedback if f.get("status") == "rejected"]) + + return jsonify({ + "total_prs": len(feedback), + "approved": approved, + "rejected": rejected, + "approval_rate": f"{(approved/len(feedback)*100):.1f}%" if feedback else "0%", + "patterns_to_avoid": len(patterns.get("avoid_patterns", [])), + "successful_patterns": len(patterns.get("successful_patterns", [])), + "last_feedback": feedback[-1]["timestamp"] if feedback else None + }), 200 + except Exception as e: + return jsonify({"error": str(e)}), 500 + +@app.route('/feedback/recent', methods=['GET']) +def recent_feedback(): + """Get recent feedback entries""" + try: + feedback = load_feedback_history() + recent = feedback[-5:] if len(feedback) > 5 else feedback + recent.reverse() # Newest first + return jsonify(recent), 200 + except Exception as e: + return jsonify({"error": str(e)}), 500 + +@app.route('/learning/patterns', methods=['GET']) +def get_patterns(): + """Get current learning patterns""" + try: + patterns = load_learning_patterns() + return jsonify(patterns), 200 + except Exception as e: + return jsonify({"error": str(e)}), 500 + +if __name__ == "__main__": + # Ensure directories exist + ensure_directories() + + logger.info("Starting Gitea webhook listener...") + logger.info(f"Feedback file: {FEEDBACK_FILE}") + logger.info(f"Learning file: {LEARNING_FILE}") + + # Run Flask app + app.run(host='0.0.0.0', port=5000, debug=False) diff --git a/scripts/orchestrator/pipeline/create_ai_pr.py b/scripts/orchestrator/pipeline/create_ai_pr.py new file mode 100755 index 0000000..c2c670f --- /dev/null +++ b/scripts/orchestrator/pipeline/create_ai_pr.py @@ -0,0 +1,160 @@ +#!/usr/bin/env python3 +""" +Create Gitea PR from AI suggestions using existing gitea_integration module +""" +import json +import yaml +from pathlib import Path +from datetime import 
datetime +from gitea_integration import GiteaIntegration + +def get_latest_pr_file(): + """Find the most recent PR file from AI suggestions""" + pr_dir = Path('/shared/ai-gitops/pending_prs') + pr_files = sorted(pr_dir.glob('*.json'), key=lambda x: x.stat().st_mtime, reverse=True) + + if pr_files: + return pr_files[0] + return None + +def main(): + """Create PR from latest AI suggestions""" + print("="*60) + print(" CREATE GITEA PR FROM AI SUGGESTIONS") + print("="*60) + + # Load config + with open('/home/netops/orchestrator/config.yaml', 'r') as f: + config = yaml.safe_load(f) + + # Initialize Gitea integration + gitea = GiteaIntegration(config['gitea']) + + # Get latest PR file + pr_file = get_latest_pr_file() + if not pr_file: + print("āŒ No pending PR files found") + print(" Run: python3 run_pipeline.py --skip-netflow") + return + + print(f"šŸ“„ Found PR file: {pr_file.name}") + + # Load PR data + with open(pr_file, 'r') as f: + pr_data = json.load(f) + + # Extract details + suggestions = pr_data.get('suggestions', '') + timestamp = datetime.now().strftime("%Y-%m-%d %H:%M") + + # Show preview + print("\nšŸ“‹ PR Preview:") + print(f" Title: {pr_data.get('title', 'AI Network Optimization')}") + print(f" Model: {pr_data.get('model', 'llama2:13b')}") + print(f" Feedback aware: {pr_data.get('feedback_aware', False)}") + print(f" Config lines: {len(suggestions.split(chr(10)))}") + + # Show first few lines of suggestions + print("\nšŸ“ First few suggestions:") + for line in suggestions.split('\n')[:5]: + if line.strip(): + print(f" {line}") + print(" ...") + + # Confirm creation + print(f"\nā“ Create PR from these AI suggestions? (y/n): ", end="") + if input().lower() != 'y': + print("āŒ Cancelled") + return + + # Create PR title and description + pr_title = f"AI Network Optimization - {timestamp}" + pr_description = f"""## šŸ¤– AI-Generated Network Configuration + +**Generated:** {timestamp} +**Model:** {pr_data.get('model', 'llama2:13b')} +**Feedback Learning:** {'āœ… Applied' if pr_data.get('feedback_aware') else 'āŒ Not applied'} + +### šŸ“Š Security Compliance Check: +- āœ… No source-address any +- āœ… No destination-address any +- āœ… No application any +- āœ… Logging enabled +- āœ… Address-sets defined + +### šŸ“‹ Configuration Summary: +This AI-generated configuration includes: +- Address-set definitions for network segmentation +- Security policies with specific source/destination +- Logging enabled for audit compliance +- No any/any/any rules (security best practice) + +### šŸ” Changes Overview: +Total configuration lines: {len(suggestions.split(chr(10)))} + +### šŸ“ Full Configuration: +```junos +{suggestions} +``` + +### āœ… Review Checklist: +- [ ] Verify address-sets match network architecture +- [ ] Confirm zone assignments are correct +- [ ] Check application definitions +- [ ] Validate logging configuration +- [ ] Test in lab environment first + +--- +*Generated by AI Network Automation System* +*Feedback learning from {pr_data.get('feedback_count', 5)} previous reviews* +""" + + # Create the PR + print("\nšŸ“¤ Creating PR in Gitea...") + try: + pr_info = gitea.create_pr_with_config( + srx_config=suggestions, + title=pr_title, + description=pr_description + ) + + if pr_info: + print(f"\nāœ… SUCCESS! Created PR #{pr_info['number']}") + print(f" Title: {pr_info.get('title')}") + print(f" URL: {pr_info.get('url', config['gitea']['url'] + '/' + config['gitea']['repo'] + '/pulls/' + str(pr_info['number']))}") + print(f"\nšŸ“‹ Next steps:") + print(f" 1. 
Review PR at: {pr_info.get('url', 'Gitea URL')}") + print(f" 2. Test configuration in lab") + print(f" 3. Approve or provide feedback") + print(f" 4. If approved, run: python3 deploy_approved.py") + + # Save PR tracking info + tracking_file = Path('/shared/ai-gitops/pr_tracking') / f"pr_{pr_info['number']}_created.json" + tracking_file.parent.mkdir(exist_ok=True) + with open(tracking_file, 'w') as f: + json.dump({ + 'pr_number': pr_info['number'], + 'created_at': datetime.now().isoformat(), + 'pr_file': str(pr_file), + 'title': pr_title, + 'model': pr_data.get('model'), + 'feedback_aware': pr_data.get('feedback_aware') + }, f, indent=2) + + return True + else: + print("āŒ Failed to create PR") + print(" Check logs for details") + return False + + except Exception as e: + print(f"āŒ Error creating PR: {e}") + + # Try to get more details + import traceback + print("\nšŸ” Debug information:") + print(traceback.format_exc()) + return False + +if __name__ == "__main__": + main() diff --git a/scripts/orchestrator/pipeline/prepare_pr.py b/scripts/orchestrator/pipeline/prepare_pr.py new file mode 100755 index 0000000..9ddb802 --- /dev/null +++ b/scripts/orchestrator/pipeline/prepare_pr.py @@ -0,0 +1,66 @@ +#!/usr/bin/env python3 +""" +Prepare PR from AI response +Converts response format to PR format +""" +import json +from pathlib import Path +from datetime import datetime +import sys + +def convert_response_to_pr(): + """Convert AI response to PR format""" + + # Find latest response + response_dir = Path('/shared/ai-gitops/responses') + response_files = list(response_dir.glob('*_response.json')) + + if not response_files: + print("No response files found") + return False + + latest = max(response_files, key=lambda p: p.stat().st_mtime) + print(f"Converting response: {latest.name}") + + with open(latest, 'r') as f: + response = json.load(f) + + # Extract suggestions and build config + suggestions = response.get('suggestions', []) + config_lines = [] + + for suggestion in suggestions: + if 'config' in suggestion: + config_lines.append(suggestion['config']) + + if not config_lines: + print("No configuration in response") + return False + + # Create pending PR directory and file + pr_dir = Path('/shared/ai-gitops/pending_prs') + pr_dir.mkdir(parents=True, exist_ok=True) + + pr_file = pr_dir / f"pr_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" + + pr_data = { + "title": f"AI Network Optimization - {response.get('focus_area', 'general').title()}", + "suggestions": '\n'.join(config_lines), + "model": "llama2:13b", + "feedback_aware": response.get('feedback_aware', True), + "feedback_count": 6, + "timestamp": datetime.now().isoformat(), + "focus_area": response.get('focus_area', 'security') + } + + with open(pr_file, 'w') as f: + json.dump(pr_data, f, indent=2) + + print(f"āœ… Created PR file: {pr_file.name}") + return True + +if __name__ == "__main__": + if convert_response_to_pr(): + sys.exit(0) + else: + sys.exit(1) diff --git a/scripts/orchestrator/pipeline/run_pipeline.py b/scripts/orchestrator/pipeline/run_pipeline.py new file mode 100755 index 0000000..82267ec --- /dev/null +++ b/scripts/orchestrator/pipeline/run_pipeline.py @@ -0,0 +1,330 @@ +#!/usr/bin/env python3 +""" +Enhanced pipeline runner with context support for split architecture +Works with AI processor running on separate VM (192.168.100.86) +""" +import argparse +import json +import subprocess +import sys +import time +from pathlib import Path +from datetime import datetime +import uuid + +def load_feedback_history(): + """Load 
PR feedback history to understand what's already configured""" + feedback_path = Path('/shared/ai-gitops/feedback/pr_feedback_history.json') + if feedback_path.exists(): + with open(feedback_path, 'r') as f: + return json.load(f) + return [] + +def load_existing_config(): + """Load current SRX config to identify already-configured features""" + config_path = Path('/shared/ai-gitops/configs/current_srx_config.conf') + if config_path.exists(): + with open(config_path, 'r') as f: + return f.read() + return "" + +def build_ai_context(args): + """Build comprehensive context for AI based on arguments and history""" + context = { + "timestamp": datetime.now().isoformat(), + "focus_area": args.context, + "skip_basic": True, # Always skip basic connectivity suggestions + "existing_features": [], + "priority_features": [], + "constraints": [] + } + + # Load existing configuration to prevent redundant suggestions + current_config = load_existing_config() + + # Identify already-configured features + if "security-zone" in current_config: + context["existing_features"].append("zones_configured") + if "port-forwarding" in current_config: + context["existing_features"].append("gaming_optimizations") + if "wireguard" in current_config.lower(): + context["existing_features"].append("vpn_configured") + + # Set priorities based on context argument + if args.context == "performance": + context["priority_features"] = [ + "qos_policies", + "traffic_shaping", + "bandwidth_management", + "flow_optimization" + ] + context["constraints"].append("Focus on QoS and traffic optimization") + + elif args.context == "security": + context["priority_features"] = [ + "rate_limiting", + "ddos_protection", + "ids_ips_rules", + "geo_blocking", + "threat_feeds" + ] + context["constraints"].append("Focus on advanced security features") + + elif args.context == "monitoring": + context["priority_features"] = [ + "syslog_enhancements", + "snmp_traps", + "flow_analytics", + "performance_metrics" + ] + context["constraints"].append("Focus on visibility and monitoring") + + elif args.context == "automation": + context["priority_features"] = [ + "event_scripts", + "automated_responses", + "dynamic_policies", + "api_integrations" + ] + context["constraints"].append("Focus on automation capabilities") + + # Add learned constraints from feedback history + feedback = load_feedback_history() + if feedback: + # Extract patterns AI should avoid + rejected_patterns = [] + for entry in feedback: + if entry.get("status") == "rejected" or entry.get("feedback_type") == "rejected": + rejected_patterns.append(entry.get("reason", "")) + + if rejected_patterns: + context["constraints"].append("Avoid patterns that were previously rejected") + context["rejected_patterns"] = rejected_patterns[-5:] # Last 5 rejections + + # Add instruction to avoid redundant suggestions + context["instructions"] = [ + "DO NOT suggest basic connectivity policies - all zones are properly configured", + "DO NOT suggest any/any/any rules - this has been rejected multiple times", + "FOCUS on advanced features that enhance the existing configuration", + "CHECK if feature already exists before suggesting", + f"PRIORITY: {args.context} optimizations and enhancements" + ] + + return context + +def run_collection(): + """Run the config collection script""" + print("šŸ“Š Collecting current SRX configuration...") + result = subprocess.run( + ["python3", "/home/netops/orchestrator/collect_srx_config.py"], + capture_output=True, + text=True + ) + if result.returncode != 0: + print(f"āŒ 
Collection failed: {result.stderr}") + return False + print("āœ… Configuration collected successfully") + return True + +def create_ai_request(context): + """Create an AI analysis request in the shared directory""" + print(f"šŸ¤– Creating AI analysis request with context: {context['focus_area']}...") + + # Generate unique request ID + request_id = f"pipeline_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:8]}" + + # Create request data + request_data = { + "request_id": request_id, + "timestamp": datetime.now().isoformat(), + "type": "analyze_config", + "context": context, + "data": { + "config_file": "/shared/ai-gitops/configs/srx_config_latest.txt", + "analysis_file": "/shared/ai-gitops/configs/srx_config_analysis_latest.json", + "top_talkers": {"buckets": []}, # Empty for context-based analysis + "vlans": {"buckets": []}, + "protocols": {"buckets": []} + } + } + + # Save context to shared location + context_path = Path('/shared/ai-gitops/context/current_context.json') + context_path.parent.mkdir(parents=True, exist_ok=True) + with open(context_path, 'w') as f: + json.dump(context, f, indent=2) + + # Save request to trigger AI processor + request_path = Path('/shared/ai-gitops/requests') / f"{request_id}.json" + request_path.parent.mkdir(parents=True, exist_ok=True) + with open(request_path, 'w') as f: + json.dump(request_data, f, indent=2) + + print(f"āœ… Request created: {request_id}") + return request_id + +def wait_for_ai_response(request_id, timeout=60): + """Wait for AI processor to complete analysis""" + print(f"ā³ Waiting for AI processor response (timeout: {timeout}s)...") + + response_path = Path('/shared/ai-gitops/responses') / f"{request_id}_response.json" + + for i in range(timeout): + if response_path.exists(): + print("āœ… AI analysis completed") + + # Read and display key info + with open(response_path, 'r') as f: + response = json.load(f) + + if 'focus_area' in response: + print(f" Focus area: {response['focus_area']}") + if 'feedback_aware' in response: + print(f" Feedback aware: {response['feedback_aware']}") + + return True + + # Show progress every 5 seconds + if i % 5 == 0 and i > 0: + print(f" Still waiting... 
({i}/{timeout}s)") + + time.sleep(1) + + print(f"āŒ Timeout waiting for AI response after {timeout} seconds") + print(" Check AI processor logs: ssh netops@192.168.100.86 'sudo tail /var/log/ai-processor/ai-processor.log'") + return False + +def create_pr(): + """Create pull request in Gitea""" + print("šŸ“ Creating pull request...") + + # Check if create_ai_pr.py exists + create_pr_script = Path('/home/netops/orchestrator/create_ai_pr.py') + if not create_pr_script.exists(): + print("āŒ create_ai_pr.py not found - using placeholder") + print(" To create PRs, ensure create_ai_pr.py is available") + return False + + result = subprocess.run( + ["python3", str(create_pr_script)], + capture_output=True, + text=True + ) + if result.returncode != 0: + print(f"āŒ PR creation failed: {result.stderr}") + return False + print("āœ… Pull request created") + return True + +def check_ai_processor_status(): + """Check if AI processor service is running on remote VM""" + print("šŸ” Checking AI processor status...") + + # Try without sudo first (systemctl can check status without sudo) + result = subprocess.run( + ["ssh", "netops@192.168.100.86", "systemctl is-active ai-processor"], + capture_output=True, + text=True + ) + + if result.stdout.strip() == "active": + print("āœ… AI processor service is running") + return True + else: + # Try checking if the process is running another way + result = subprocess.run( + ["ssh", "netops@192.168.100.86", "ps aux | grep -v grep | grep ai_processor"], + capture_output=True, + text=True + ) + + if "ai_processor.py" in result.stdout: + print("āœ… AI processor is running (detected via process)") + return True + else: + print("āš ļø Cannot verify AI processor status (but it may still be running)") + print(" Continuing anyway...") + return True # Continue anyway since we know it's running + +def main(): + parser = argparse.ArgumentParser( + description='Run AI-driven network optimization pipeline with context' + ) + parser.add_argument( + '--context', + choices=['performance', 'security', 'monitoring', 'automation'], + default='security', + help='Focus area for AI analysis (default: security)' + ) + parser.add_argument( + '--skip-collection', + action='store_true', + help='Skip config collection (use existing)' + ) + parser.add_argument( + '--dry-run', + action='store_true', + help='Run analysis but do not create PR' + ) + parser.add_argument( + '--verbose', + action='store_true', + help='Enable verbose output' + ) + parser.add_argument( + '--timeout', + type=int, + default=60, + help='Timeout waiting for AI response (default: 60s)' + ) + + args = parser.parse_args() + + print(f"šŸš€ Starting pipeline with context: {args.context}") + print("=" * 50) + + # Step 0: Check AI processor is running + if not check_ai_processor_status(): + print("\nāš ļø Please start the AI processor service first") + sys.exit(1) + + # Step 1: Collect current config (unless skipped) + if not args.skip_collection: + if not run_collection(): + sys.exit(1) + + # Step 2: Build context for AI + context = build_ai_context(args) + + if args.verbose: + print("\nšŸ“‹ AI Context:") + print(json.dumps(context, indent=2)) + + # Step 3: Create AI request (this triggers the remote AI processor) + request_id = create_ai_request(context) + + # Step 4: Wait for AI processor to complete + if not wait_for_ai_response(request_id, args.timeout): + print("\nāš ļø AI processor may be busy or not running properly") + print(" Check status: ssh netops@192.168.100.86 'sudo systemctl status ai-processor'") + 
sys.exit(1) + + # Step 5: Create PR (unless dry-run) + if not args.dry_run: + if not create_pr(): + print("āš ļø PR creation failed but analysis is complete") + print(f" View results: cat /shared/ai-gitops/responses/{request_id}_response.json") + else: + print("⚔ Dry run - skipping PR creation") + print(f" View analysis: cat /shared/ai-gitops/responses/{request_id}_response.json | jq .suggestions") + + print("\n✨ Pipeline completed successfully!") + print(f"Focus area: {args.context}") + + if not args.dry_run: + print("Next steps: Review the PR in Gitea") + else: + print(f"Next steps: Review the suggestions and run without --dry-run to create PR") + +if __name__ == "__main__": + main() diff --git a/scripts/orchestrator/srx/collect_srx_config.py b/scripts/orchestrator/srx/collect_srx_config.py new file mode 100755 index 0000000..2e08c10 --- /dev/null +++ b/scripts/orchestrator/srx/collect_srx_config.py @@ -0,0 +1,317 @@ +#!/usr/bin/env python3 +""" +SRX Configuration Collector +Pulls current configuration from SRX and stores it for AI analysis +""" +import os +import sys +import json +import yaml +import paramiko +from datetime import datetime +from pathlib import Path +import logging + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +class SRXConfigCollector: + def __init__(self, config_path='/home/netops/orchestrator/config.yaml'): + """Initialize with orchestrator config""" + with open(config_path, 'r') as f: + self.config = yaml.safe_load(f) + + self.srx_config = self.config['srx'] + self.config_dir = Path('/shared/ai-gitops/configs') + self.config_dir.mkdir(parents=True, exist_ok=True) + + def connect_to_srx(self): + """Establish SSH connection to SRX""" + try: + client = paramiko.SSHClient() + client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + + # Connect using SSH key + client.connect( + hostname=self.srx_config['host'], + username=self.srx_config['username'], + key_filename=self.srx_config['ssh_key'], + port=22 + ) + + logger.info(f"Connected to SRX at {self.srx_config['host']}") + return client + + except Exception as e: + logger.error(f"Failed to connect: {e}") + return None + + def get_full_config(self, client): + """Get complete SRX configuration""" + logger.info("Fetching full SRX configuration...") + + stdin, stdout, stderr = client.exec_command('show configuration | no-more') + config_output = stdout.read().decode('utf-8') + + if config_output: + logger.info(f"Retrieved {len(config_output)} bytes of configuration") + return config_output + else: + logger.error("Failed to retrieve configuration") + return None + + def get_security_config(self, client): + """Get security-specific configuration""" + logger.info("Fetching security policies...") + + commands = [ + 'show configuration security policies', + 'show configuration security zones', + 'show configuration security address-book', + 'show configuration applications', + 'show configuration security nat', + 'show configuration interfaces' + ] + + security_config = {} + + for cmd in commands: + stdin, stdout, stderr = client.exec_command(f'{cmd} | no-more') + output = stdout.read().decode('utf-8') + section = cmd.split()[-1] # Get last word as section name + security_config[section] = output + logger.info(f"Retrieved {section} configuration") + + return security_config + + def analyze_config(self, full_config, security_config): + """Analyze configuration and extract key information - FIXED VERSION""" + analysis = { + 'timestamp': datetime.now().isoformat(), + 'zones': [], + 
'networks': {}, + 'policies': [], + 'policy_count': 0, + 'applications': [], + 'interfaces': {}, + 'nat_rules': [], + 'address_book': {} + } + + # Extract zones - FIXED parsing for your format + if 'zones' in security_config: + zones_content = security_config['zones'] + if zones_content: + lines = zones_content.split('\n') + for line in lines: + # Your format: "security-zone WAN {" or "security-zone HOME {" + if 'security-zone' in line and '{' in line: + # Extract zone name between 'security-zone' and '{' + parts = line.strip().split() + if len(parts) >= 2 and parts[0] == 'security-zone': + zone_name = parts[1] + if zone_name != '{': # Make sure it's not just the bracket + analysis['zones'].append(zone_name) + analysis['networks'][zone_name] = [] + + # Extract address-book entries from zones section + if 'zones' in security_config: + lines = security_config['zones'].split('\n') + current_zone = None + in_address_book = False + + for line in lines: + line = line.strip() + + # Track current zone + if 'security-zone' in line and '{' in line: + parts = line.split() + if len(parts) >= 2: + current_zone = parts[1] + in_address_book = False + + # Check if we're in address-book section + elif 'address-book' in line and '{' in line: + in_address_book = True + + # Parse addresses within address-book + elif in_address_book and 'address ' in line and current_zone: + # Format: "address GAMING-NETWORK 192.168.10.0/24;" + parts = line.split() + if len(parts) >= 3 and parts[0] == 'address': + addr_name = parts[1] + addr_value = parts[2].rstrip(';') + if '/' in addr_value or '.' in addr_value: + analysis['address_book'][addr_name] = addr_value + if current_zone in analysis['networks']: + analysis['networks'][current_zone].append(addr_value) + + # Extract policies - FIXED for your format + if 'policies' in security_config: + policies_content = security_config['policies'] + if policies_content: + lines = policies_content.split('\n') + from_zone = None + to_zone = None + current_policy = None + + for line in lines: + line = line.strip() + + # Format: "from-zone HOME to-zone WAN {" + if 'from-zone' in line and 'to-zone' in line: + parts = line.split() + if len(parts) >= 4: + from_idx = parts.index('from-zone') if 'from-zone' in parts else -1 + to_idx = parts.index('to-zone') if 'to-zone' in parts else -1 + if from_idx >= 0 and to_idx >= 0: + from_zone = parts[from_idx + 1] if from_idx + 1 < len(parts) else None + to_zone = parts[to_idx + 1] if to_idx + 1 < len(parts) else None + to_zone = to_zone.rstrip('{') if to_zone else None + + # Format: "policy GAMING-VLAN-PRIORITY {" + elif 'policy ' in line and '{' in line and from_zone and to_zone: + parts = line.split() + if len(parts) >= 2 and parts[0] == 'policy': + policy_name = parts[1].rstrip('{') + analysis['policies'].append({ + 'name': policy_name, + 'from_zone': from_zone, + 'to_zone': to_zone + }) + analysis['policy_count'] += 1 + + # Extract applications + if 'applications' in security_config: + apps_content = security_config['applications'] + if apps_content: + lines = apps_content.split('\n') + for line in lines: + # Format: "application PS5-HTTP {" + if 'application ' in line and '{' in line: + parts = line.strip().split() + if len(parts) >= 2 and parts[0] == 'application': + app_name = parts[1].rstrip('{') + if app_name and app_name != 'application': + analysis['applications'].append(app_name) + + # Extract interfaces with IPs + if 'interfaces' in security_config: + interfaces_content = security_config['interfaces'] + if interfaces_content: + lines = 
interfaces_content.split('\n') + current_interface = None + + for line in lines: + line = line.strip() + + # Interface line (e.g., "ge-0/0/0 {" or "reth0 {") + if (line.startswith('ge-') or line.startswith('reth')) and '{' in line: + current_interface = line.split()[0] + analysis['interfaces'][current_interface] = {'addresses': []} + + # IP address line (e.g., "address 192.168.1.1/24;") + elif current_interface and 'address ' in line and '/' in line: + parts = line.split() + for part in parts: + if '/' in part: + addr = part.rstrip(';') + analysis['interfaces'][current_interface]['addresses'].append(addr) + + # Extract NAT rules + if 'nat' in security_config: + nat_content = security_config['nat'] + if nat_content: + source_nat_count = nat_content.count('source pool') + dest_nat_count = nat_content.count('destination pool') + analysis['nat_rules'] = { + 'source_nat': source_nat_count, + 'destination_nat': dest_nat_count, + 'total': source_nat_count + dest_nat_count + } + + return analysis + + def save_config(self, full_config, security_config, analysis): + """Save configuration and analysis""" + timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') + + # Save full config + full_config_path = self.config_dir / f'srx_config_{timestamp}.txt' + with open(full_config_path, 'w') as f: + f.write(full_config) + logger.info(f"Saved full config to {full_config_path}") + + # Save latest symlink + latest_path = self.config_dir / 'srx_config_latest.txt' + if latest_path.exists(): + latest_path.unlink() + latest_path.symlink_to(full_config_path.name) + + # Save security config sections + security_config_path = self.config_dir / f'srx_security_config_{timestamp}.json' + with open(security_config_path, 'w') as f: + json.dump(security_config, f, indent=2) + + # Save analysis + analysis_path = self.config_dir / f'srx_config_analysis_{timestamp}.json' + with open(analysis_path, 'w') as f: + json.dump(analysis, f, indent=2) + logger.info(f"Saved config analysis to {analysis_path}") + + # Save latest analysis symlink + latest_analysis = self.config_dir / 'srx_config_analysis_latest.json' + if latest_analysis.exists(): + latest_analysis.unlink() + latest_analysis.symlink_to(analysis_path.name) + + return analysis + + def collect(self): + """Main collection process""" + logger.info("Starting SRX configuration collection...") + + # Connect to SRX + client = self.connect_to_srx() + if not client: + return None + + try: + # Get configurations + full_config = self.get_full_config(client) + security_config = self.get_security_config(client) + + if full_config: + # Analyze configuration + analysis = self.analyze_config(full_config, security_config) + + # Save everything + self.save_config(full_config, security_config, analysis) + + # Print summary + print("\nšŸ“Š Configuration Summary:") + print(f"Zones: {', '.join(analysis['zones'])}") + print(f"Networks: {len([n for nets in analysis['networks'].values() for n in nets])} subnets across {len(analysis['zones'])} zones") + print(f"Policies: {analysis.get('policy_count', 0)} security policies") + print(f"Address Book: {len(analysis['address_book'])} entries") + print(f"Interfaces: {len(analysis['interfaces'])} configured") + + return analysis + + finally: + client.close() + logger.info("Disconnected from SRX") + +def main(): + collector = SRXConfigCollector() + analysis = collector.collect() + + if analysis: + print("\nāœ… Configuration collected successfully!") + print(f"Files saved in: /shared/ai-gitops/configs/") + else: + print("\nāŒ Failed to collect configuration") 
+ sys.exit(1) + +if __name__ == "__main__": + main() diff --git a/scripts/orchestrator/srx/deploy_approved.py b/scripts/orchestrator/srx/deploy_approved.py new file mode 100755 index 0000000..10b7d9b --- /dev/null +++ b/scripts/orchestrator/srx/deploy_approved.py @@ -0,0 +1,28 @@ +#!/usr/bin/env python3 +""" +Simple deployment script placeholder +Full version will deploy approved configs +""" +import logging +from datetime import datetime + +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(message)s', + handlers=[ + logging.FileHandler('/var/log/orchestrator/deployment.log'), + logging.StreamHandler() + ] +) + +logger = logging.getLogger(__name__) + +def main(): + logger.info("Deployment check started") + logger.info("Looking for approved configurations...") + # TODO: Implement actual deployment logic + logger.info("No approved configurations found") + logger.info("Deployment check complete") + +if __name__ == "__main__": + main() diff --git a/scripts/orchestrator/srx/srx_manager.py b/scripts/orchestrator/srx/srx_manager.py new file mode 100755 index 0000000..6f59b82 --- /dev/null +++ b/scripts/orchestrator/srx/srx_manager.py @@ -0,0 +1,309 @@ +#!/usr/bin/env python3 +""" +SRX Configuration Manager +Handles all interactions with the Juniper SRX device +""" + +import subprocess +import logging +import json +from datetime import datetime +from typing import Dict, Optional, List +import re + +logger = logging.getLogger(__name__) + +class SRXManager: + """Manages SRX configuration retrieval and deployment""" + + def __init__(self, host: str, user: str, ssh_key: str): + """ + Initialize SRX Manager + + Args: + host: SRX IP address + user: SSH username + ssh_key: Path to SSH private key + """ + self.host = host + self.user = user + self.ssh_key = ssh_key + + def _execute_ssh_command(self, command: str) -> tuple[bool, str]: + """ + Execute command on SRX via SSH + + Returns: + (success, output) tuple + """ + ssh_cmd = [ + 'ssh', + '-i', self.ssh_key, + '-o', 'StrictHostKeyChecking=no', + '-o', 'ConnectTimeout=10', + f'{self.user}@{self.host}', + command + ] + + try: + result = subprocess.run( + ssh_cmd, + capture_output=True, + text=True, + timeout=30 + ) + + if result.returncode == 0: + logger.info(f"Successfully executed: {command[:50]}...") + return True, result.stdout + else: + logger.error(f"Command failed: {result.stderr}") + return False, result.stderr + + except subprocess.TimeoutExpired: + logger.error("SSH command timed out") + return False, "Command timed out" + except Exception as e: + logger.error(f"SSH execution error: {e}") + return False, str(e) + + def get_current_config(self, format: str = "set") -> Optional[str]: + """ + Retrieve current SRX configuration + + Args: + format: Configuration format ('set', 'json', 'xml') + + Returns: + Configuration string or None if failed + """ + format_map = { + "set": "display set", + "json": "display json", + "xml": "display xml" + } + + display_format = format_map.get(format, "display set") + command = f"show configuration | {display_format} | no-more" + + logger.info(f"Pulling SRX configuration in {format} format") + success, output = self._execute_ssh_command(command) + + if success: + logger.info(f"Retrieved {len(output)} characters of configuration") + return output + else: + logger.error("Failed to retrieve configuration") + return None + + def get_config_section(self, section: str) -> Optional[str]: + """ + Get specific configuration section + + Args: + section: Config section (e.g., 'security policies', 
'interfaces') + + Returns: + Configuration section or None + """ + command = f"show configuration {section} | display set | no-more" + success, output = self._execute_ssh_command(command) + + if success: + return output + return None + + def parse_security_policies(self, config: str) -> Dict: + """ + Parse security policies from configuration + + Returns: + Dictionary of policies organized by zones + """ + policies = { + "zone_pairs": {}, + "total_policies": 0, + "applications": set(), + "addresses": set() + } + + # Regex patterns for parsing + policy_pattern = r'set security policies from-zone (\S+) to-zone (\S+) policy (\S+)' + app_pattern = r'set security policies .* application (\S+)' + addr_pattern = r'set security policies .* (source|destination)-address (\S+)' + + for line in config.split('\n'): + # Parse policy definitions + policy_match = re.match(policy_pattern, line) + if policy_match: + from_zone, to_zone, policy_name = policy_match.groups() + zone_pair = f"{from_zone}->{to_zone}" + + if zone_pair not in policies["zone_pairs"]: + policies["zone_pairs"][zone_pair] = [] + + if policy_name not in policies["zone_pairs"][zone_pair]: + policies["zone_pairs"][zone_pair].append(policy_name) + policies["total_policies"] += 1 + + # Parse applications + app_match = re.search(app_pattern, line) + if app_match: + policies["applications"].add(app_match.group(1)) + + # Parse addresses + addr_match = re.search(addr_pattern, line) + if addr_match: + policies["addresses"].add(addr_match.group(2)) + + # Convert sets to lists for JSON serialization + policies["applications"] = list(policies["applications"]) + policies["addresses"] = list(policies["addresses"]) + + return policies + + def validate_config_syntax(self, config_lines: List[str]) -> tuple[bool, List[str]]: + """ + Validate SRX configuration syntax + + Args: + config_lines: List of configuration commands + + Returns: + (valid, errors) tuple + """ + errors = [] + valid_commands = [ + 'set security policies', + 'set security zones', + 'set security address-book', + 'set applications application', + 'set firewall policer', + 'set firewall filter', + 'set class-of-service', + 'set interfaces', + 'set routing-options' + ] + + for i, line in enumerate(config_lines, 1): + line = line.strip() + + # Skip comments and empty lines + if not line or line.startswith('#'): + continue + + # Check if line starts with valid command + if not any(line.startswith(cmd) for cmd in valid_commands): + errors.append(f"Line {i}: Invalid command prefix: {line[:50]}") + + # Check for required keywords in policies + if 'security policies' in line and 'policy' in line: + if not any(keyword in line for keyword in ['match', 'then', 'from-zone', 'to-zone']): + errors.append(f"Line {i}: Policy missing required keywords: {line[:50]}") + + return len(errors) == 0, errors + + def test_connectivity(self) -> bool: + """ + Test SSH connectivity to SRX + + Returns: + True if connected successfully + """ + logger.info(f"Testing connectivity to {self.host}") + success, output = self._execute_ssh_command("show version | match Junos:") + + if success and "Junos:" in output: + version = output.strip() + logger.info(f"Connected successfully: {version}") + return True + else: + logger.error("Connectivity test failed") + return False + + def get_traffic_statistics(self) -> Optional[Dict]: + """ + Get interface traffic statistics + + Returns: + Dictionary of traffic stats or None + """ + command = "show interfaces statistics | display json" + success, output = 
self._execute_ssh_command(command) + + if success: + try: + # Parse JSON output + stats = json.loads(output) + return stats + except json.JSONDecodeError: + logger.error("Failed to parse traffic statistics JSON") + return None + return None + + def create_config_diff(self, current_config: str, proposed_config: List[str]) -> Dict: + """ + Create a diff between current and proposed configurations + + Args: + current_config: Current SRX configuration + proposed_config: List of proposed configuration lines + + Returns: + Dictionary with additions and analysis + """ + current_lines = set(current_config.split('\n')) + proposed_set = set(proposed_config) + + # Find truly new configurations + new_configs = [] + duplicate_configs = [] + + for config in proposed_set: + if config.strip() and not config.startswith('#'): + if config not in current_lines: + new_configs.append(config) + else: + duplicate_configs.append(config) + + return { + "new_configurations": new_configs, + "duplicate_configurations": duplicate_configs, + "total_proposed": len(proposed_config), + "total_new": len(new_configs), + "total_duplicates": len(duplicate_configs) + } + + +# Test function for standalone execution +if __name__ == "__main__": + # Configure logging + logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' + ) + + # Test the SRX Manager + srx = SRXManager( + host="192.168.100.1", + user="netops", + ssh_key="/home/netops/.ssh/srx_key" + ) + + # Test connectivity + if srx.test_connectivity(): + print("āœ… Connectivity test passed") + + # Get current config + config = srx.get_current_config() + if config: + print(f"āœ… Retrieved {len(config)} characters of configuration") + + # Parse policies + policies = srx.parse_security_policies(config) + print(f"šŸ“Š Found {policies['total_policies']} security policies") + print(f"šŸ“Š Zone pairs: {list(policies['zone_pairs'].keys())}") + else: + print("āŒ Failed to retrieve configuration") + else: + print("āŒ Connectivity test failed")
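
For reference, a minimal usage sketch of how SRXManager's validation helpers could be chained to pre-check an AI-proposed configuration before a PR is opened. This is illustrative only and not part of the collected scripts: the policy name EXAMPLE-POLICY is hypothetical, while the host, user, SSH key, zone names, and address-set name mirror values already shown in this patch.

```python
# Illustrative pre-check of an AI-proposed config line (hypothetical policy name).
from srx_manager import SRXManager

srx = SRXManager(
    host="192.168.100.1",
    user="netops",
    ssh_key="/home/netops/.ssh/srx_key"
)

proposed = [
    "set security policies from-zone HOME to-zone WAN policy EXAMPLE-POLICY "
    "match source-address GAMING-NETWORK"
]

# Syntax check against the allowed command prefixes in validate_config_syntax()
valid, errors = srx.validate_config_syntax(proposed)
if not valid:
    print("Syntax check failed:", errors)
else:
    # Pull the live config in 'set' format and flag duplicates vs. new lines
    current = srx.get_current_config()
    if current:
        diff = srx.create_config_diff(current, proposed)
        print(f"{diff['total_new']} new lines, "
              f"{diff['total_duplicates']} already present on the SRX")
```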