#!/usr/bin/env python3
"""
SRX Configuration Collector
Pulls current configuration from SRX and stores it for AI analysis
"""
import os
import sys
import json
import logging
from datetime import datetime
from pathlib import Path

# NOTE: the third-party modules (yaml, paramiko) are imported inside the
# methods that use them, so the pure parsing/saving code in this module can be
# imported and exercised on a machine that has neither package installed.

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class SRXConfigCollector:
    """Fetch an SRX configuration over SSH, summarise it and save it to disk."""

    def __init__(self, config_path='/home/netops/orchestrator/config.yaml'):
        """Initialize with orchestrator config.

        Args:
            config_path: YAML file whose top-level ``srx`` mapping provides
                ``host``, ``username``, ``ssh_key`` and, optionally, ``port``.

        Raises:
            OSError: if the file cannot be read or the output directory
                cannot be created.
            KeyError: if the YAML document has no ``srx`` entry.
        """
        import yaml

        with open(config_path, 'r', encoding='utf-8') as f:
            self.config = yaml.safe_load(f)

        self.srx_config = self.config['srx']
        self.config_dir = Path('/shared/ai-gitops/configs')
        self.config_dir.mkdir(parents=True, exist_ok=True)

    def connect_to_srx(self):
        """Establish SSH connection to SRX.

        Returns:
            A connected ``paramiko.SSHClient``, or ``None`` when the
            connection attempt failed (the failure is logged).
        """
        import paramiko

        client = paramiko.SSHClient()
        # SECURITY NOTE(review): AutoAddPolicy trusts any host key it has not
        # seen before. Consider loading known_hosts and rejecting unknown keys.
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

        try:
            # Connect using SSH key
            client.connect(
                hostname=self.srx_config['host'],
                username=self.srx_config['username'],
                key_filename=self.srx_config['ssh_key'],
                port=self.srx_config.get('port', 22),
            )
        except Exception as e:
            logger.error(f"Failed to connect: {e}")
            # Release the half-open client instead of leaking it.
            client.close()
            return None

        logger.info(f"Connected to SRX at {self.srx_config['host']}")
        return client

    @staticmethod
    def _run_command(client, command):
        """Run *command* on *client* and return its stdout as text.

        Undecodable bytes are replaced rather than raising, and anything the
        device writes to stderr is logged as a warning.
        """
        _stdin, stdout, stderr = client.exec_command(command)
        output = stdout.read().decode('utf-8', errors='replace')
        err_text = stderr.read().decode('utf-8', errors='replace').strip()
        if err_text:
            logger.warning(f"Command '{command}' reported: {err_text}")
        return output

    def get_full_config(self, client):
        """Get complete SRX configuration.

        Returns:
            The output of ``show configuration | no-more``, or ``None`` when
            the device returned nothing.
        """
        logger.info("Fetching full SRX configuration...")

        config_output = self._run_command(client, 'show configuration | no-more')
        if not config_output:
            logger.error("Failed to retrieve configuration")
            return None

        logger.info(f"Retrieved {len(config_output)} bytes of configuration")
        return config_output

    def get_security_config(self, client):
        """Get security-specific configuration.

        Returns:
            A dict mapping the last word of each command (``policies``,
            ``zones``, ``address-book``, ``applications``, ``nat``,
            ``interfaces``) to that command's output.
        """
        logger.info("Fetching security policies...")

        commands = (
            'show configuration security policies',
            'show configuration security zones',
            'show configuration security address-book',
            'show configuration applications',
            'show configuration security nat',
            'show configuration interfaces',
        )

        security_config = {}
        for cmd in commands:
            section = cmd.split()[-1]  # Get last word as section name
            security_config[section] = self._run_command(client, f'{cmd} | no-more')
            logger.info(f"Retrieved {section} configuration")

        return security_config

    def analyze_config(self, full_config, security_config):
        """Analyze configuration and extract key information.

        Args:
            full_config: complete configuration text (currently not inspected).
            security_config: dict as returned by ``get_security_config``;
                missing or empty sections are skipped.

        Returns:
            A dict with the keys ``timestamp``, ``zones``, ``networks``,
            ``policies``, ``policy_count``, ``applications``, ``interfaces``,
            ``nat_rules`` and ``address_book``.
        """
        analysis = {
            'timestamp': datetime.now().isoformat(),
            'zones': [],
            'networks': {},
            'policies': [],
            'policy_count': 0,
            'applications': [],
            'interfaces': {},
            # NOTE(review): stays a list when there is no NAT section but is
            # replaced by a dict of counters otherwise; kept as-is so existing
            # consumers of the JSON are not broken.
            'nat_rules': [],
            'address_book': {}
        }

        section_parsers = (
            ('zones', self._parse_zones),
            ('policies', self._parse_policies),
            ('applications', self._parse_applications),
            ('interfaces', self._parse_interfaces),
            ('nat', self._parse_nat),
        )
        for section, parse in section_parsers:
            content = security_config.get(section)
            if content:
                parse(content, analysis)

        return analysis

    @staticmethod
    def _parse_zones(content, analysis):
        """Record zone names and the addresses in each zone's address-book."""
        current_zone = None
        in_address_book = False

        for raw_line in content.splitlines():
            line = raw_line.strip()
            parts = line.split()

            # Format: "security-zone WAN {"
            if 'security-zone' in line and '{' in line:
                if len(parts) >= 2:
                    current_zone = parts[1]
                    in_address_book = False
                    # Register only real names, not a bare opening bracket.
                    if parts[0] == 'security-zone' and current_zone != '{':
                        analysis['zones'].append(current_zone)
                        analysis['networks'].setdefault(current_zone, [])
            elif 'address-book' in line and '{' in line:
                in_address_book = True
            elif in_address_book and 'address ' in line and current_zone:
                # Format: "address GAMING-NETWORK 192.168.10.0/24;"
                if len(parts) >= 3 and parts[0] == 'address':
                    addr_name = parts[1]
                    addr_value = parts[2].rstrip(';')
                    if '/' in addr_value or '.' in addr_value:
                        analysis['address_book'][addr_name] = addr_value
                        if current_zone in analysis['networks']:
                            analysis['networks'][current_zone].append(addr_value)

    @staticmethod
    def _parse_policies(content, analysis):
        """Record every policy together with its from-zone/to-zone pair."""
        from_zone = None
        to_zone = None

        for raw_line in content.splitlines():
            line = raw_line.strip()
            parts = line.split()

            # Format: "from-zone HOME to-zone WAN {"
            if 'from-zone' in line and 'to-zone' in line:
                if len(parts) >= 4 and 'from-zone' in parts and 'to-zone' in parts:
                    from_pos = parts.index('from-zone') + 1
                    to_pos = parts.index('to-zone') + 1
                    from_zone = parts[from_pos] if from_pos < len(parts) else None
                    to_zone = parts[to_pos].rstrip('{') if to_pos < len(parts) else None
            # Format: "policy GAMING-VLAN-PRIORITY {"
            elif 'policy ' in line and '{' in line and from_zone and to_zone:
                if len(parts) >= 2 and parts[0] == 'policy':
                    analysis['policies'].append({
                        'name': parts[1].rstrip('{'),
                        'from_zone': from_zone,
                        'to_zone': to_zone
                    })
                    analysis['policy_count'] += 1

    @staticmethod
    def _parse_applications(content, analysis):
        """Record the name of every "application NAME {" stanza."""
        for raw_line in content.splitlines():
            if 'application ' not in raw_line or '{' not in raw_line:
                continue
            parts = raw_line.strip().split()
            if len(parts) >= 2 and parts[0] == 'application':
                app_name = parts[1].rstrip('{')
                if app_name and app_name != 'application':
                    analysis['applications'].append(app_name)

    @staticmethod
    def _parse_interfaces(content, analysis):
        """Record ge-*/reth* interfaces and the addresses configured on them.

        Stanza depth is tracked so that addresses belonging to any other
        top-level stanza (for example ``lo0 { ... }``) are not attributed to
        the ge-/reth interface that happened to precede it.
        """
        current_interface = None
        depth = 0

        for raw_line in content.splitlines():
            line = raw_line.strip()

            # Interface line (e.g., "ge-0/0/0 {" or "reth0 {")
            if line.startswith(('ge-', 'reth')) and '{' in line:
                current_interface = line.split()[0]
                analysis['interfaces'][current_interface] = {'addresses': []}
                depth = 1  # resynchronise: we are now inside this stanza
                continue

            if depth == 0 and line.endswith('{'):
                # A different top-level stanza starts; stop collecting.
                current_interface = None
            elif current_interface and 'address ' in line and '/' in line:
                # IP address line (e.g., "address 192.168.1.1/24;")
                for part in line.split():
                    if '/' in part:
                        analysis['interfaces'][current_interface]['addresses'].append(
                            part.rstrip(';')
                        )

            # Only a trailing "{" opens and a lone "}" closes a stanza, so
            # braces inside quoted descriptions do not disturb the count.
            if line.endswith('{'):
                depth += 1
            elif line == '}':
                depth = max(depth - 1, 0)

    @staticmethod
    def _parse_nat(content, analysis):
        """Count occurrences of "source pool" / "destination pool"."""
        source_nat_count = content.count('source pool')
        dest_nat_count = content.count('destination pool')
        analysis['nat_rules'] = {
            'source_nat': source_nat_count,
            'destination_nat': dest_nat_count,
            'total': source_nat_count + dest_nat_count
        }

    @staticmethod
    def _refresh_symlink(link_path, target_name):
        """Point *link_path* at *target_name*, replacing any previous link.

        ``Path.exists()`` follows symlinks and is False for a dangling one,
        so ``is_symlink()`` is checked as well; otherwise a stale link whose
        target was deleted would make ``symlink_to`` fail.
        """
        if link_path.is_symlink() or link_path.exists():
            link_path.unlink()
        link_path.symlink_to(target_name)

    def save_config(self, full_config, security_config, analysis):
        """Save configuration and analysis.

        Writes three timestamped files into ``self.config_dir`` (full config
        text, security sections as JSON, analysis as JSON) and refreshes the
        ``srx_config_latest.txt`` and ``srx_config_analysis_latest.json``
        symlinks.

        Returns:
            The *analysis* object that was passed in.
        """
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')

        # Save full config
        full_config_path = self.config_dir / f'srx_config_{timestamp}.txt'
        with open(full_config_path, 'w', encoding='utf-8') as f:
            f.write(full_config)
        logger.info(f"Saved full config to {full_config_path}")

        # Save latest symlink
        self._refresh_symlink(
            self.config_dir / 'srx_config_latest.txt', full_config_path.name
        )

        # Save security config sections
        security_config_path = self.config_dir / f'srx_security_config_{timestamp}.json'
        with open(security_config_path, 'w', encoding='utf-8') as f:
            json.dump(security_config, f, indent=2)

        # Save analysis
        analysis_path = self.config_dir / f'srx_config_analysis_{timestamp}.json'
        with open(analysis_path, 'w', encoding='utf-8') as f:
            json.dump(analysis, f, indent=2)
        logger.info(f"Saved config analysis to {analysis_path}")

        # Save latest analysis symlink
        self._refresh_symlink(
            self.config_dir / 'srx_config_analysis_latest.json', analysis_path.name
        )

        return analysis

    def collect(self):
        """Main collection process.

        Connects, fetches the configuration, analyzes and saves it, prints a
        short summary and always closes the SSH client.

        Returns:
            The analysis dict, or ``None`` when the connection failed or no
            configuration could be retrieved.
        """
        logger.info("Starting SRX configuration collection...")

        # Connect to SRX
        client = self.connect_to_srx()
        if not client:
            return None

        try:
            # Get configurations
            full_config = self.get_full_config(client)
            if not full_config:
                # Nothing to analyze; skip the remaining device queries.
                return None
            security_config = self.get_security_config(client)

            # Analyze configuration
            analysis = self.analyze_config(full_config, security_config)

            # Save everything
            self.save_config(full_config, security_config, analysis)

            # Print summary
            subnet_count = sum(len(nets) for nets in analysis['networks'].values())
            print("\n📊 Configuration Summary:")
            print(f"Zones: {', '.join(analysis['zones'])}")
            print(f"Networks: {subnet_count} subnets across {len(analysis['zones'])} zones")
            print(f"Policies: {analysis.get('policy_count', 0)} security policies")
            print(f"Address Book: {len(analysis['address_book'])} entries")
            print(f"Interfaces: {len(analysis['interfaces'])} configured")

            return analysis
        finally:
            client.close()
            logger.info("Disconnected from SRX")


def main():
    """Run one collection and exit with status 1 if it fails."""
    collector = SRXConfigCollector()
    analysis = collector.collect()

    if analysis:
        print("\n✅ Configuration collected successfully!")
        print(f"Files saved in: {collector.config_dir}/")
    else:
        print("\n❌ Failed to collect configuration")
        sys.exit(1)


if __name__ == "__main__":
    main()