#!/usr/bin/env python3
"""
SRX Configuration Manager
Handles all interactions with the Juniper SRX device
"""

import subprocess
import logging
import json
from datetime import datetime
from typing import Dict, Optional, List
import re

logger = logging.getLogger(__name__)


class SRXManager:
    """Retrieves, parses and validates Juniper SRX configuration over SSH.

    Every device interaction goes through ``_execute_ssh_command``, which
    shells out to the system ``ssh`` client; the parsing, validation and
    diff helpers are pure functions of their arguments.
    """

    # CLI pipe filter for each configuration format accepted by
    # get_current_config().
    _DISPLAY_FORMATS = {
        "set": "display set",
        "json": "display json",
        "xml": "display xml",
    }

    # Statement prefixes accepted by validate_config_syntax().
    _VALID_COMMAND_PREFIXES = (
        'set security policies',
        'set security zones',
        'set security address-book',
        'set applications application',
        'set firewall policer',
        'set firewall filter',
        'set class-of-service',
        'set interfaces',
        'set routing-options',
    )

    # Substrings of which at least one must appear in a policy statement.
    _POLICY_KEYWORDS = ('match', 'then', 'from-zone', 'to-zone')

    # Compiled once; parse_security_policies() applies them to every line.
    _POLICY_RE = re.compile(
        r'set security policies from-zone (\S+) to-zone (\S+) policy (\S+)'
    )
    _APP_RE = re.compile(r'set security policies .* application (\S+)')
    _ADDR_RE = re.compile(
        r'set security policies .* (source|destination)-address (\S+)'
    )

    def __init__(self, host: str, user: str, ssh_key: str):
        """
        Initialize SRX Manager

        Args:
            host: SRX IP address
            user: SSH username
            ssh_key: Path to SSH private key
        """
        self.host = host
        self.user = user
        self.ssh_key = ssh_key

    def _execute_ssh_command(self, command: str) -> tuple[bool, str]:
        """
        Execute command on SRX via SSH

        Args:
            command: CLI command passed to the remote side as one argument

        Returns:
            (success, output) tuple. On success ``output`` is the command's
            stdout; on a non-zero exit status it is stderr; on timeout or
            any other failure it is a short error description.
        """
        # NOTE(review): StrictHostKeyChecking=no disables host-key
        # verification, so a spoofed device would not be detected. Left
        # unchanged to keep existing deployments connecting; consider a
        # managed known_hosts file instead.
        ssh_cmd = [
            'ssh',
            '-i', self.ssh_key,
            '-o', 'StrictHostKeyChecking=no',
            '-o', 'ConnectTimeout=10',
            f'{self.user}@{self.host}',
            command,
        ]

        try:
            result = subprocess.run(
                ssh_cmd,
                capture_output=True,
                text=True,
                timeout=30,
            )
        except subprocess.TimeoutExpired:
            logger.error("SSH command timed out")
            return False, "Command timed out"
        except Exception as e:
            # Deliberate catch-all: callers rely on the (success, output)
            # contract and never expect an exception from this method
            # (e.g. when the ssh binary is missing).
            logger.error("SSH execution error: %s", e)
            return False, str(e)

        if result.returncode != 0:
            logger.error("Command failed: %s", result.stderr)
            return False, result.stderr

        # Only the first 50 characters of the command are logged.
        logger.info("Successfully executed: %s...", command[:50])
        return True, result.stdout

    def get_current_config(self, format: str = "set") -> Optional[str]:
        """
        Retrieve current SRX configuration

        Args:
            format: Configuration format ('set', 'json', 'xml'). Any other
                value falls back to 'set' and a warning is logged.

        Returns:
            Configuration string or None if failed
        """
        effective_format = format
        display_format = self._DISPLAY_FORMATS.get(format)
        if display_format is None:
            # Keep the historical fallback, but make it visible instead of
            # logging that the unknown format was pulled.
            logger.warning(
                "Unknown configuration format %r; falling back to 'set'",
                format,
            )
            effective_format = "set"
            display_format = self._DISPLAY_FORMATS[effective_format]

        command = f"show configuration | {display_format} | no-more"

        logger.info(
            "Pulling SRX configuration in %s format", effective_format
        )
        success, output = self._execute_ssh_command(command)

        if not success:
            logger.error("Failed to retrieve configuration")
            return None

        logger.info("Retrieved %d characters of configuration", len(output))
        return output

    def get_config_section(self, section: str) -> Optional[str]:
        """
        Get specific configuration section

        Args:
            section: Config section (e.g., 'security policies', 'interfaces').
                It is interpolated into the CLI command verbatim, so it
                must come from a trusted source.

        Returns:
            Configuration section in 'set' format, or None if the command
            failed
        """
        command = f"show configuration {section} | display set | no-more"
        success, output = self._execute_ssh_command(command)
        return output if success else None

    def parse_security_policies(self, config: str) -> Dict:
        """
        Parse security policies from configuration

        Args:
            config: Configuration text in 'set' format, one statement per
                line. Surrounding whitespace on a line is ignored.

        Returns:
            Dictionary with:
                zone_pairs: "<from>-><to>" mapped to the policy names seen
                    for that pair, in order of first appearance
                total_policies: number of distinct (zone pair, policy)
                    combinations
                applications: sorted list of application names referenced
                addresses: sorted list of source/destination addresses
                    referenced
        """
        zone_pairs: Dict[str, List[str]] = {}
        applications = set()
        addresses = set()
        total_policies = 0

        for raw_line in config.splitlines():
            # Strip first so indented or CRLF-terminated lines still parse.
            line = raw_line.strip()
            if not line:
                continue

            policy_match = self._POLICY_RE.match(line)
            if policy_match:
                from_zone, to_zone, policy_name = policy_match.groups()
                names = zone_pairs.setdefault(f"{from_zone}->{to_zone}", [])
                # One policy spans many statements; count it only once.
                if policy_name not in names:
                    names.append(policy_name)
                    total_policies += 1

            app_match = self._APP_RE.search(line)
            if app_match:
                applications.add(app_match.group(1))

            addr_match = self._ADDR_RE.search(line)
            if addr_match:
                addresses.add(addr_match.group(2))

        # Sorted lists are JSON-serializable and stable across runs.
        return {
            "zone_pairs": zone_pairs,
            "total_policies": total_policies,
            "applications": sorted(applications),
            "addresses": sorted(addresses),
        }

    def validate_config_syntax(self, config_lines: List[str]) -> tuple[bool, List[str]]:
        """
        Validate SRX configuration syntax

        Two checks are applied to every non-empty, non-comment line:
        it must begin with one of the allowed statement prefixes (as whole
        words), and a security-policy statement containing the ``policy``
        token must also contain one of 'match', 'then', 'from-zone' or
        'to-zone'.

        Args:
            config_lines: List of configuration commands

        Returns:
            (valid, errors) tuple; ``errors`` holds one message per
            problem, each prefixed with the 1-based line number
        """
        errors = []

        for i, raw_line in enumerate(config_lines, 1):
            line = raw_line.strip()

            # Skip comments and empty lines
            if not line or line.startswith('#'):
                continue

            if not self._has_valid_prefix(line):
                errors.append(f"Line {i}: Invalid command prefix: {line[:50]}")

            # Match "policy" as a whole token so statements such as
            # "default-policy" or "policy-rematch" are not treated as
            # policy definitions.
            if 'security policies' in line and 'policy' in line.split():
                if not any(keyword in line for keyword in self._POLICY_KEYWORDS):
                    errors.append(
                        f"Line {i}: Policy missing required keywords: {line[:50]}"
                    )

        return not errors, errors

    def _has_valid_prefix(self, line: str) -> bool:
        """Return True if ``line`` starts with an allowed prefix followed by
        whitespace or end of line."""
        for prefix in self._VALID_COMMAND_PREFIXES:
            if line.startswith(prefix):
                remainder = line[len(prefix):]
                if not remainder or remainder[0].isspace():
                    return True
        return False

    def test_connectivity(self) -> bool:
        """
        Test SSH connectivity to SRX

        Returns:
            True if the version query succeeds and its output contains
            "Junos:", False otherwise
        """
        logger.info("Testing connectivity to %s", self.host)

        success, output = self._execute_ssh_command("show version | match Junos:")

        if not (success and "Junos:" in output):
            logger.error("Connectivity test failed")
            return False

        logger.info("Connected successfully: %s", output.strip())
        return True

    def get_traffic_statistics(self) -> Optional[Dict]:
        """
        Get interface traffic statistics

        Returns:
            Parsed JSON output of the statistics command, or None if the
            command failed or its output was not valid JSON
        """
        command = "show interfaces statistics | display json"
        success, output = self._execute_ssh_command(command)

        if not success:
            return None

        try:
            return json.loads(output)
        except json.JSONDecodeError:
            logger.error("Failed to parse traffic statistics JSON")
            return None

    def create_config_diff(self, current_config: str, proposed_config: List[str]) -> Dict:
        """
        Create a diff between current and proposed configurations

        Lines are compared after stripping surrounding whitespace, so CRLF
        line endings or stray indentation do not make an existing
        statement look new. Blank lines and '#' comments in the proposal
        are ignored, and repeated proposed lines are reported once.

        Args:
            current_config: Current SRX configuration
            proposed_config: List of proposed configuration lines

        Returns:
            Dictionary with:
                new_configurations: proposed lines absent from the current
                    configuration, in proposal order
                duplicate_configurations: proposed lines already present,
                    in proposal order
                total_proposed: len(proposed_config), including comments
                    and blank lines
                total_new / total_duplicates: lengths of the two lists
        """
        current_lines = {line.strip() for line in current_config.splitlines()}

        new_configs: List[str] = []
        duplicate_configs: List[str] = []
        seen = set()

        for raw_line in proposed_config:
            line = raw_line.strip()
            if not line or line.startswith('#') or line in seen:
                continue
            seen.add(line)

            if line in current_lines:
                duplicate_configs.append(line)
            else:
                new_configs.append(line)

        return {
            "new_configurations": new_configs,
            "duplicate_configurations": duplicate_configs,
            "total_proposed": len(proposed_config),
            "total_new": len(new_configs),
            "total_duplicates": len(duplicate_configs),
        }


def main() -> None:
    """Smoke-test the manager against a hard-coded lab device."""
    # Configure logging
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )

    # Test the SRX Manager
    srx = SRXManager(
        host="192.168.100.1",
        user="netops",
        ssh_key="/home/netops/.ssh/srx_key"
    )

    # Test connectivity
    if not srx.test_connectivity():
        print("❌ Connectivity test failed")
        return

    print("✅ Connectivity test passed")

    # Get current config
    config = srx.get_current_config()
    if not config:
        print("❌ Failed to retrieve configuration")
        return

    print(f"✅ Retrieved {len(config)} characters of configuration")

    # Parse policies
    policies = srx.parse_security_policies(config)
    print(f"📊 Found {policies['total_policies']} security policies")
    print(f"📊 Zone pairs: {list(policies['zone_pairs'].keys())}")


# Test function for standalone execution
if __name__ == "__main__":
    main()