Files
srx-ai-automation-docs/scripts/orchestrator/srx/srx_manager.py

310 lines
10 KiB
Python
Executable File

#!/usr/bin/env python3
"""
SRX Configuration Manager
Handles all interactions with the Juniper SRX device
"""
import subprocess
import logging
import json
from datetime import datetime
from typing import Dict, Optional, List
import re
logger = logging.getLogger(__name__)
class SRXManager:
"""Manages SRX configuration retrieval and deployment"""
def __init__(self, host: str, user: str, ssh_key: str):
"""
Initialize SRX Manager
Args:
host: SRX IP address
user: SSH username
ssh_key: Path to SSH private key
"""
self.host = host
self.user = user
self.ssh_key = ssh_key
def _execute_ssh_command(self, command: str) -> tuple[bool, str]:
"""
Execute command on SRX via SSH
Returns:
(success, output) tuple
"""
ssh_cmd = [
'ssh',
'-i', self.ssh_key,
'-o', 'StrictHostKeyChecking=no',
'-o', 'ConnectTimeout=10',
f'{self.user}@{self.host}',
command
]
try:
result = subprocess.run(
ssh_cmd,
capture_output=True,
text=True,
timeout=30
)
if result.returncode == 0:
logger.info(f"Successfully executed: {command[:50]}...")
return True, result.stdout
else:
logger.error(f"Command failed: {result.stderr}")
return False, result.stderr
except subprocess.TimeoutExpired:
logger.error("SSH command timed out")
return False, "Command timed out"
except Exception as e:
logger.error(f"SSH execution error: {e}")
return False, str(e)
def get_current_config(self, format: str = "set") -> Optional[str]:
"""
Retrieve current SRX configuration
Args:
format: Configuration format ('set', 'json', 'xml')
Returns:
Configuration string or None if failed
"""
format_map = {
"set": "display set",
"json": "display json",
"xml": "display xml"
}
display_format = format_map.get(format, "display set")
command = f"show configuration | {display_format} | no-more"
logger.info(f"Pulling SRX configuration in {format} format")
success, output = self._execute_ssh_command(command)
if success:
logger.info(f"Retrieved {len(output)} characters of configuration")
return output
else:
logger.error("Failed to retrieve configuration")
return None
def get_config_section(self, section: str) -> Optional[str]:
"""
Get specific configuration section
Args:
section: Config section (e.g., 'security policies', 'interfaces')
Returns:
Configuration section or None
"""
command = f"show configuration {section} | display set | no-more"
success, output = self._execute_ssh_command(command)
if success:
return output
return None
def parse_security_policies(self, config: str) -> Dict:
"""
Parse security policies from configuration
Returns:
Dictionary of policies organized by zones
"""
policies = {
"zone_pairs": {},
"total_policies": 0,
"applications": set(),
"addresses": set()
}
# Regex patterns for parsing
policy_pattern = r'set security policies from-zone (\S+) to-zone (\S+) policy (\S+)'
app_pattern = r'set security policies .* application (\S+)'
addr_pattern = r'set security policies .* (source|destination)-address (\S+)'
for line in config.split('\n'):
# Parse policy definitions
policy_match = re.match(policy_pattern, line)
if policy_match:
from_zone, to_zone, policy_name = policy_match.groups()
zone_pair = f"{from_zone}->{to_zone}"
if zone_pair not in policies["zone_pairs"]:
policies["zone_pairs"][zone_pair] = []
if policy_name not in policies["zone_pairs"][zone_pair]:
policies["zone_pairs"][zone_pair].append(policy_name)
policies["total_policies"] += 1
# Parse applications
app_match = re.search(app_pattern, line)
if app_match:
policies["applications"].add(app_match.group(1))
# Parse addresses
addr_match = re.search(addr_pattern, line)
if addr_match:
policies["addresses"].add(addr_match.group(2))
# Convert sets to lists for JSON serialization
policies["applications"] = list(policies["applications"])
policies["addresses"] = list(policies["addresses"])
return policies
def validate_config_syntax(self, config_lines: List[str]) -> tuple[bool, List[str]]:
"""
Validate SRX configuration syntax
Args:
config_lines: List of configuration commands
Returns:
(valid, errors) tuple
"""
errors = []
valid_commands = [
'set security policies',
'set security zones',
'set security address-book',
'set applications application',
'set firewall policer',
'set firewall filter',
'set class-of-service',
'set interfaces',
'set routing-options'
]
for i, line in enumerate(config_lines, 1):
line = line.strip()
# Skip comments and empty lines
if not line or line.startswith('#'):
continue
# Check if line starts with valid command
if not any(line.startswith(cmd) for cmd in valid_commands):
errors.append(f"Line {i}: Invalid command prefix: {line[:50]}")
# Check for required keywords in policies
if 'security policies' in line and 'policy' in line:
if not any(keyword in line for keyword in ['match', 'then', 'from-zone', 'to-zone']):
errors.append(f"Line {i}: Policy missing required keywords: {line[:50]}")
return len(errors) == 0, errors
def test_connectivity(self) -> bool:
"""
Test SSH connectivity to SRX
Returns:
True if connected successfully
"""
logger.info(f"Testing connectivity to {self.host}")
success, output = self._execute_ssh_command("show version | match Junos:")
if success and "Junos:" in output:
version = output.strip()
logger.info(f"Connected successfully: {version}")
return True
else:
logger.error("Connectivity test failed")
return False
def get_traffic_statistics(self) -> Optional[Dict]:
"""
Get interface traffic statistics
Returns:
Dictionary of traffic stats or None
"""
command = "show interfaces statistics | display json"
success, output = self._execute_ssh_command(command)
if success:
try:
# Parse JSON output
stats = json.loads(output)
return stats
except json.JSONDecodeError:
logger.error("Failed to parse traffic statistics JSON")
return None
return None
def create_config_diff(self, current_config: str, proposed_config: List[str]) -> Dict:
"""
Create a diff between current and proposed configurations
Args:
current_config: Current SRX configuration
proposed_config: List of proposed configuration lines
Returns:
Dictionary with additions and analysis
"""
current_lines = set(current_config.split('\n'))
proposed_set = set(proposed_config)
# Find truly new configurations
new_configs = []
duplicate_configs = []
for config in proposed_set:
if config.strip() and not config.startswith('#'):
if config not in current_lines:
new_configs.append(config)
else:
duplicate_configs.append(config)
return {
"new_configurations": new_configs,
"duplicate_configurations": duplicate_configs,
"total_proposed": len(proposed_config),
"total_new": len(new_configs),
"total_duplicates": len(duplicate_configs)
}
# Test function for standalone execution
if __name__ == "__main__":
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Test the SRX Manager
srx = SRXManager(
host="192.168.100.1",
user="netops",
ssh_key="/home/netops/.ssh/srx_key"
)
# Test connectivity
if srx.test_connectivity():
print("✅ Connectivity test passed")
# Get current config
config = srx.get_current_config()
if config:
print(f"✅ Retrieved {len(config)} characters of configuration")
# Parse policies
policies = srx.parse_security_policies(config)
print(f"📊 Found {policies['total_policies']} security policies")
print(f"📊 Zone pairs: {list(policies['zone_pairs'].keys())}")
else:
print("❌ Failed to retrieve configuration")
else:
print("❌ Connectivity test failed")