From ab9a661551ec05ad1edf12f6e78fd2eac81ec0c8 Mon Sep 17 00:00:00 2001 From: netops Date: Thu, 4 Sep 2025 19:35:35 +0000 Subject: [PATCH] Remove test and development scripts from production docs --- scripts/orchestrator/core/force_deployment.py | 210 ------ .../orchestrator/core/force_pipeline_test.py | 133 ---- .../core/gitea_integration_newer.py | 384 ----------- .../core/gitea_integration_newerer.py | 351 ---------- .../core/orchestrator_main_enhanced.py | 544 --------------- .../core/orchestrator_main_newer.py | 621 ------------------ .../core/pipeline_status_enhanced.py | 204 ------ scripts/orchestrator/core/test_context.py | 193 ------ .../orchestrator/core/test_feedback_loop.py | 364 ---------- scripts/orchestrator/core/test_git_auth.py | 90 --- scripts/orchestrator/core/test_pr_creation.py | 26 - scripts/orchestrator/core/test_pr_schedule.py | 11 - scripts/orchestrator/core/test_request.py | 47 -- scripts/orchestrator/core/test_simple_push.py | 54 -- .../core/test_split_architecture.py | 118 ---- scripts/orchestrator/core/validate_latest.py | 34 - 16 files changed, 3384 deletions(-) delete mode 100755 scripts/orchestrator/core/force_deployment.py delete mode 100755 scripts/orchestrator/core/force_pipeline_test.py delete mode 100644 scripts/orchestrator/core/gitea_integration_newer.py delete mode 100644 scripts/orchestrator/core/gitea_integration_newerer.py delete mode 100644 scripts/orchestrator/core/orchestrator_main_enhanced.py delete mode 100644 scripts/orchestrator/core/orchestrator_main_newer.py delete mode 100644 scripts/orchestrator/core/pipeline_status_enhanced.py delete mode 100755 scripts/orchestrator/core/test_context.py delete mode 100755 scripts/orchestrator/core/test_feedback_loop.py delete mode 100755 scripts/orchestrator/core/test_git_auth.py delete mode 100755 scripts/orchestrator/core/test_pr_creation.py delete mode 100644 scripts/orchestrator/core/test_pr_schedule.py delete mode 100755 scripts/orchestrator/core/test_request.py delete mode 100644 scripts/orchestrator/core/test_simple_push.py delete mode 100755 scripts/orchestrator/core/test_split_architecture.py delete mode 100644 scripts/orchestrator/core/validate_latest.py diff --git a/scripts/orchestrator/core/force_deployment.py b/scripts/orchestrator/core/force_deployment.py deleted file mode 100755 index 6cbc091..0000000 --- a/scripts/orchestrator/core/force_deployment.py +++ /dev/null @@ -1,210 +0,0 @@ -#!/usr/bin/env python3 -""" -Force Deployment - Manually deploy approved configurations to SRX -""" -import os -import json -import yaml -import time -import paramiko -from datetime import datetime -from pathlib import Path - -class ManualDeployment: - def __init__(self): - # Load configuration - with open('/home/netops/orchestrator/config.yaml', 'r') as f: - self.config = yaml.safe_load(f) - - self.srx_config = self.config['srx'] - - # Load state to check for merged PRs - state_file = '/var/lib/orchestrator/state.json' - with open(state_file, 'r') as f: - self.state = json.load(f) - - def get_latest_merged_config(self): - """Find the most recent merged configuration""" - # Check if there's a recently merged PR - if self.state.get('last_pr_status') == 'merged': - pr_number = self.state.get('deployment_pr') - print(f"โœ… Found merged PR #{pr_number}") - - # For now, we'll use the latest response as the config - # In production, this would fetch from the merged PR - response_dir = '/shared/ai-gitops/responses' - if os.path.exists(response_dir): - files = sorted(os.listdir(response_dir), key=lambda x: os.path.getmtime(os.path.join(response_dir, x))) - if files: - latest_file = os.path.join(response_dir, files[-1]) - with open(latest_file, 'r') as f: - data = json.load(f) - return data.get('suggestions', data.get('response', '')) - - return None - - def connect_to_srx(self): - """Establish SSH connection to SRX""" - print(f"\n๐Ÿ”Œ Connecting to SRX at {self.srx_config['host']}...") - - try: - client = paramiko.SSHClient() - client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - - # Connect using SSH key - client.connect( - hostname=self.srx_config['host'], - username=self.srx_config['username'], - key_filename=self.srx_config['ssh_key'], - port=22 - ) - - print("โœ… Connected to SRX successfully") - return client - - except Exception as e: - print(f"โŒ Failed to connect: {e}") - return None - - def deploy_config(self, client, config_text): - """Deploy configuration to SRX with commit confirmed""" - print("\n๐Ÿ“ค Deploying configuration to SRX...") - - try: - # Enter configuration mode - stdin, stdout, stderr = client.exec_command('configure') - time.sleep(1) - - # Load the configuration - print("Loading configuration...") - config_lines = config_text.strip().split('\n') - - for line in config_lines: - if line.strip() and not line.startswith('#'): - stdin, stdout, stderr = client.exec_command(f'configure\n{line}') - result = stdout.read().decode() - if 'error' in result.lower(): - print(f"โš ๏ธ Error with command: {line}") - print(f" {result}") - - # Commit with confirmed (2 minute timeout) - print("\n๐Ÿ”„ Committing configuration with 2-minute confirmation timeout...") - stdin, stdout, stderr = client.exec_command('configure\ncommit confirmed 2\nexit') - commit_result = stdout.read().decode() - - if 'commit complete' in commit_result.lower(): - print("โœ… Configuration committed (pending confirmation)") - - # Wait a bit to test connectivity - print("โณ Testing configuration (30 seconds)...") - time.sleep(30) - - # If we're still connected, confirm the commit - print("โœ… Configuration appears stable, confirming commit...") - stdin, stdout, stderr = client.exec_command('configure\ncommit\nexit') - confirm_result = stdout.read().decode() - - if 'commit complete' in confirm_result.lower(): - print("โœ… Configuration confirmed and saved!") - return True - else: - print("โŒ Failed to confirm configuration") - return False - else: - print("โŒ Initial commit failed") - print(commit_result) - return False - - except Exception as e: - print(f"โŒ Deployment error: {e}") - return False - - def create_deployment_record(self, success, config_text): - """Record the deployment attempt""" - timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') - - if success: - deploy_dir = '/shared/ai-gitops/deployed' - filename = f"deployed_{timestamp}.conf" - else: - deploy_dir = '/shared/ai-gitops/failed' - filename = f"failed_{timestamp}.conf" - - os.makedirs(deploy_dir, exist_ok=True) - filepath = os.path.join(deploy_dir, filename) - - with open(filepath, 'w') as f: - f.write(f"# Deployment {'SUCCESS' if success else 'FAILED'}\n") - f.write(f"# Timestamp: {datetime.now().isoformat()}\n") - f.write(f"# SRX: {self.srx_config['host']}\n\n") - f.write(config_text) - - print(f"\n๐Ÿ“ Deployment record saved to: {filepath}") - - def run(self): - """Run the manual deployment""" - print("\n๐Ÿš€ MANUAL DEPLOYMENT TO SRX") - print("="*60) - - # Get configuration to deploy - print("\n๐Ÿ“‹ Looking for approved configuration...") - config_text = self.get_latest_merged_config() - - if not config_text: - print("โŒ No approved configuration found") - print("\nMake sure you have:") - print("1. Created a PR using force_pipeline_test.py") - print("2. Merged the PR in Gitea") - return False - - print("\n๐Ÿ“„ Configuration to deploy:") - print("-"*40) - print(config_text[:500] + "..." if len(config_text) > 500 else config_text) - print("-"*40) - - print("\nโš ๏ธ WARNING: This will apply configuration to your production SRX!") - print("The configuration will auto-rollback after 2 minutes if not confirmed.") - print("\nDo you want to continue? (yes/no): ", end="") - - response = input().strip().lower() - if response != 'yes': - print("Deployment cancelled.") - return False - - # Connect to SRX - client = self.connect_to_srx() - if not client: - return False - - try: - # Deploy the configuration - success = self.deploy_config(client, config_text) - - # Record the deployment - self.create_deployment_record(success, config_text) - - # Update state - if success: - self.state['last_successful_deployment'] = datetime.now().isoformat() - self.state['pending_deployment'] = False - else: - self.state['last_failed_deployment'] = datetime.now().isoformat() - - with open('/var/lib/orchestrator/state.json', 'w') as f: - json.dump(self.state, f, indent=2) - - return success - - finally: - client.close() - print("\n๐Ÿ”Œ Disconnected from SRX") - -def main(): - deployment = ManualDeployment() - if deployment.run(): - print("\nโœ… Deployment completed successfully!") - else: - print("\nโŒ Deployment failed") - -if __name__ == "__main__": - main() diff --git a/scripts/orchestrator/core/force_pipeline_test.py b/scripts/orchestrator/core/force_pipeline_test.py deleted file mode 100755 index 73baf12..0000000 --- a/scripts/orchestrator/core/force_pipeline_test.py +++ /dev/null @@ -1,133 +0,0 @@ -#!/usr/bin/env python3 -""" -Force Pipeline Test - Manual trigger for complete pipeline cycle -This will analyze all data and create a PR immediately -""" -import os -import sys -import json -import yaml -from datetime import datetime -import time - -# Add orchestrator directory to path -sys.path.append('/home/netops/orchestrator') - -from orchestrator_main import NetworkOrchestrator -from gitea_integration import GiteaIntegration - -def force_pr_creation(): - """Force the creation of a PR with all accumulated data""" - print("๐Ÿš€ Starting forced pipeline test...") - print("="*60) - - # Load configuration - with open('/home/netops/orchestrator/config.yaml', 'r') as f: - config = yaml.safe_load(f) - - # Initialize orchestrator - orchestrator = NetworkOrchestrator('/home/netops/orchestrator/config.yaml') - - print("\n๐Ÿ“Š Step 1: Setting up Elasticsearch connection...") - orchestrator.setup_elasticsearch() - - print("\n๐Ÿ“ˆ Step 2: Collecting all available traffic data...") - # Temporarily override the analysis window to get ALL data - original_window = orchestrator.config['analysis']['window_hours'] - orchestrator.config['analysis']['window_hours'] = 168 # 7 days of data - - traffic_data = orchestrator.collect_traffic_data() - - if not traffic_data: - print("โŒ No traffic data available") - return False - - # Show summary of collected data - top_talkers = traffic_data.get('top_talkers', {}).get('buckets', []) - print(f"โœ… Collected data summary:") - print(f" - Top talkers: {len(top_talkers)} IPs") - print(f" - VLANs: {len(traffic_data.get('vlans', {}).get('buckets', []))}") - print(f" - Protocols: {len(traffic_data.get('protocols', {}).get('buckets', []))}") - - print("\n๐Ÿค– Step 3: Requesting AI analysis...") - ai_response = orchestrator.request_ai_analysis(traffic_data) - - if not ai_response: - print("โŒ Failed to get AI response") - return False - - print("โœ… AI analysis complete") - - # Save the analysis to state - orchestrator.save_state({ - 'last_analysis_run': datetime.now().isoformat(), - 'last_analysis_data': { - 'top_talkers_count': len(top_talkers), - 'response_received': True - } - }) - - print("\n๐Ÿ“ Step 4: Creating PR in Gitea...") - - # Force PR creation by temporarily overriding the schedule check - # Save original should_create_pr method - original_should_create = orchestrator.should_create_pr - - # Override to always return True - orchestrator.should_create_pr = lambda: True - - # Clear any pending PR flag - state = orchestrator.load_state() - if 'pending_pr' in state: - del state['pending_pr'] - orchestrator.save_state({'pending_pr': None}) - - # Create PR - success = orchestrator.create_gitea_pr(ai_response) - - # Restore original method - orchestrator.should_create_pr = original_should_create - - if success: - print("\nโœ… PR created successfully!") - - # Get the PR number from state - state = orchestrator.load_state() - pr_number = state.get('pending_pr') - pr_url = state.get('pr_url') - - print(f"\n๐Ÿ”— PR Details:") - print(f" - PR Number: #{pr_number}") - print(f" - URL: {pr_url}") - print(f"\n๐Ÿ“‹ Next Steps:") - print(f" 1. Review the PR at: {pr_url}") - print(f" 2. Click 'Merge Pull Request' to approve") - print(f" 3. Run: python3 force_deployment.py") - - return True - else: - print("โŒ Failed to create PR") - return False - -def main(): - """Main function""" - print("\n๐Ÿ”ฌ FORCE PIPELINE TEST") - print("This will:") - print("1. Analyze all traffic data from the past week") - print("2. Generate AI suggestions") - print("3. Create a PR in Gitea immediately") - print("\nDo you want to continue? (yes/no): ", end="") - - response = input().strip().lower() - if response != 'yes': - print("Cancelled.") - return - - # Run the test - if force_pr_creation(): - print("\nโœ… Pipeline test successful!") - else: - print("\nโŒ Pipeline test failed") - -if __name__ == "__main__": - main() diff --git a/scripts/orchestrator/core/gitea_integration_newer.py b/scripts/orchestrator/core/gitea_integration_newer.py deleted file mode 100644 index 8978eb8..0000000 --- a/scripts/orchestrator/core/gitea_integration_newer.py +++ /dev/null @@ -1,384 +0,0 @@ -#!/usr/bin/env python3 -""" -Gitea Integration Module for SRX GitOps - Fixed Authentication -Handles Git operations and Gitea API interactions -""" -import os -import json -import logging -import tempfile -import shutil -from datetime import datetime -from typing import Dict, Optional, Tuple -import subprocess -import requests -from urllib.parse import urlparse - -logger = logging.getLogger(__name__) - -class GiteaIntegration: - """Handles all Gitea-related operations""" - - def __init__(self, config: Dict): - """ - Initialize Gitea integration - - Args: - config: Dictionary containing: - - url: Gitea instance URL - - token: API token - - repo: repository in format "owner/repo" - - branch: default branch (usually "main") - """ - self.url = config['url'].rstrip('/') - self.token = config['token'] - self.repo = config['repo'] - self.default_branch = config.get('branch', 'main') - - # Parse owner and repo name - self.owner, self.repo_name = self.repo.split('/') - - # Set up API headers - self.headers = { - 'Authorization': f'token {self.token}', - 'Content-Type': 'application/json' - } - - # Git configuration - Fix authentication format - self.git_url = f"{self.url}/{self.repo}.git" - - logger.info(f"Initialized Gitea integration for {self.repo}") - - def _run_git_command(self, cmd: list, cwd: str = None) -> Tuple[bool, str]: - """ - Run a git command and return success status and output - - Args: - cmd: List of command arguments - cwd: Working directory - - Returns: - Tuple of (success, output) - """ - try: - # Create a copy of the command to modify - auth_cmd = cmd.copy() - - # Add authentication to git commands that need it - if any(action in cmd for action in ['clone', 'push', 'pull', 'fetch']): - for i, arg in enumerate(auth_cmd): - if arg.startswith('http'): - # Gitea supports multiple auth formats, let's use oauth2 - parsed = urlparse(arg) - # Try oauth2 format which is commonly supported - auth_url = f"{parsed.scheme}://oauth2:{self.token}@{parsed.netloc}{parsed.path}" - auth_cmd[i] = auth_url - break - - # Also set up git credentials via environment - env = os.environ.copy() - env['GIT_ASKPASS'] = 'echo' - env['GIT_USERNAME'] = 'oauth2' - env['GIT_PASSWORD'] = self.token - - result = subprocess.run( - auth_cmd, - cwd=cwd, - capture_output=True, - text=True, - check=True, - env=env - ) - return True, result.stdout - except subprocess.CalledProcessError as e: - logger.error(f"Git command failed: {' '.join(cmd)}") - logger.error(f"Error: {e.stderr}") - return False, e.stderr - - def test_authentication(self) -> bool: - """Test if Git authentication is working""" - try: - logger.info("Testing Git authentication...") - # Try to list remote refs - success, output = self._run_git_command( - ['git', 'ls-remote', self.git_url, 'HEAD'] - ) - if success: - logger.info("Git authentication successful") - return True - else: - logger.error("Git authentication failed") - return False - except Exception as e: - logger.error(f"Authentication test error: {e}") - return False - - def create_pr_with_config(self, srx_config: str, title: str = None, - description: str = None) -> Optional[Dict]: - """ - Create a pull request with SRX configuration - - Args: - srx_config: The SRX configuration content - title: PR title (auto-generated if not provided) - description: PR description (auto-generated if not provided) - - Returns: - PR information dict or None if failed - """ - # First test authentication - if not self.test_authentication(): - logger.error("Git authentication test failed, aborting PR creation") - return None - - timestamp = datetime.now().strftime('%Y%m%d-%H%M%S') - branch_name = f"ai-suggestions-{timestamp}" - - # Auto-generate title and description if not provided - if not title: - title = f"AI Network Configuration Suggestions - {datetime.now().strftime('%Y-%m-%d')}" - - if not description: - description = self._generate_pr_description(srx_config) - - # Create temporary directory for git operations - with tempfile.TemporaryDirectory() as temp_dir: - logger.info(f"Working in temporary directory: {temp_dir}") - - # Step 1: Clone the repository - logger.info("Cloning repository...") - success, output = self._run_git_command( - ['git', 'clone', '--depth', '1', self.git_url, '.'], - cwd=temp_dir - ) - if not success: - logger.error("Failed to clone repository") - # Try alternative authentication method - logger.info("Trying alternative clone method...") - # Use git credential helper - self._setup_git_credentials(temp_dir) - success, output = self._run_git_command( - ['git', 'clone', '--depth', '1', self.git_url, '.'], - cwd=temp_dir - ) - if not success: - logger.error("All clone attempts failed") - return None - - # Step 2: Configure git user - self._run_git_command( - ['git', 'config', 'user.email', 'ai-orchestrator@srx-gitops.local'], - cwd=temp_dir - ) - self._run_git_command( - ['git', 'config', 'user.name', 'AI Orchestrator'], - cwd=temp_dir - ) - - # Step 3: Create and checkout new branch - logger.info(f"Creating branch: {branch_name}") - success, _ = self._run_git_command( - ['git', 'checkout', '-b', branch_name], - cwd=temp_dir - ) - if not success: - logger.error("Failed to create branch") - return None - - # Step 4: Create ai-suggestions directory if it doesn't exist - suggestions_dir = os.path.join(temp_dir, 'ai-suggestions') - os.makedirs(suggestions_dir, exist_ok=True) - - # Step 5: Write configuration file - config_filename = f"suggestion-{timestamp}.conf" - config_path = os.path.join(suggestions_dir, config_filename) - - with open(config_path, 'w') as f: - f.write(f"# AI-Generated SRX Configuration\n") - f.write(f"# Generated: {datetime.now().isoformat()}\n") - f.write(f"# Analysis Period: Last 7 days\n\n") - f.write(srx_config) - - logger.info(f"Created config file: {config_filename}") - - # Step 6: Add and commit changes - self._run_git_command(['git', 'add', '.'], cwd=temp_dir) - - commit_message = f"Add AI-generated configuration suggestions for {datetime.now().strftime('%Y-%m-%d')}" - success, _ = self._run_git_command( - ['git', 'commit', '-m', commit_message], - cwd=temp_dir - ) - if not success: - logger.error("Failed to commit changes") - return None - - # Step 7: Push branch - logger.info(f"Pushing branch {branch_name}...") - success, _ = self._run_git_command( - ['git', 'push', 'origin', branch_name], - cwd=temp_dir - ) - if not success: - logger.error("Failed to push branch") - return None - - # Step 8: Create pull request via API - logger.info("Creating pull request via Gitea API...") - - # First, get the label IDs if configured - label_ids = [] - if 'labels' in self.config: - label_ids = self.get_label_ids(self.config['labels']) - - pr_data = { - "title": title, - "body": description, - "head": branch_name, - "base": self.default_branch - } - - # Only add labels if we found valid IDs - if label_ids: - pr_data["labels"] = label_ids - - api_url = f"{self.url}/api/v1/repos/{self.repo}/pulls" - - try: - response = requests.post(api_url, json=pr_data, headers=self.headers) - response.raise_for_status() - - pr_info = response.json() - logger.info(f"Successfully created PR #{pr_info['number']}: {pr_info['title']}") - - return { - 'number': pr_info['number'], - 'url': pr_info['html_url'], - 'branch': branch_name, - 'created_at': pr_info['created_at'] - } - - except requests.exceptions.RequestException as e: - logger.error(f"Failed to create PR via API: {e}") - if hasattr(e.response, 'text'): - logger.error(f"Response: {e.response.text}") - return None - - def _setup_git_credentials(self, cwd: str): - """Setup git credentials using credential helper""" - # Configure credential helper - self._run_git_command( - ['git', 'config', '--local', 'credential.helper', 'store'], - cwd=cwd - ) - - # Write credentials file - cred_file = os.path.join(cwd, '.git-credentials') - parsed = urlparse(self.git_url) - cred_url = f"{parsed.scheme}://oauth2:{self.token}@{parsed.netloc}\n" - - with open(cred_file, 'w') as f: - f.write(cred_url) - - os.chmod(cred_file, 0o600) - - def _generate_pr_description(self, srx_config: str) -> str: - """Generate a descriptive PR body""" - config_lines = srx_config.strip().split('\n') - summary = [] - - # Parse configuration to create summary - for line in config_lines: - if 'security-zone' in line and 'address' in line: - summary.append(f"- {line.strip()}") - elif 'application' in line and 'destination-port' in line: - summary.append(f"- {line.strip()}") - - description = f"""## ๐Ÿค– AI-Generated Network Configuration - -This pull request contains network configuration suggestions generated by the AI orchestrator based on traffic analysis from the past 7 days. - -### ๐Ÿ“Š Analysis Summary -- **Analysis Period**: Last 7 days -- **Data Source**: NetFlow/J-Flow from Elasticsearch -- **Generation Time**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - -### ๐Ÿ”ง Proposed Changes -{chr(10).join(summary[:10]) if summary else 'Various security zone and application updates'} -{'... and more' if len(summary) > 10 else ''} - -### โš ๏ธ Review Required -Please review these suggestions carefully before approving. The AI has analyzed traffic patterns and suggested optimizations, but human validation is essential. - -### ๐Ÿ”„ Deployment -Once approved, these changes will be automatically deployed during the next deployment window (daily at 5 AM). - ---- -*Generated by SRX AI GitOps Orchestrator*""" - - return description - - def get_pr_status(self, pr_number: int) -> Optional[Dict]: - """ - Get the status of a pull request - - Args: - pr_number: PR number to check - - Returns: - Dictionary with PR status info or None - """ - api_url = f"{self.url}/api/v1/repos/{self.repo}/pulls/{pr_number}" - - try: - response = requests.get(api_url, headers=self.headers) - response.raise_for_status() - - pr_data = response.json() - - return { - 'number': pr_data['number'], - 'state': pr_data['state'], # open, closed - 'merged': pr_data['merged'], - 'mergeable': pr_data['mergeable'], - 'title': pr_data['title'], - 'created_at': pr_data['created_at'], - 'updated_at': pr_data['updated_at'] - } - - except requests.exceptions.RequestException as e: - logger.error(f"Failed to get PR status: {e}") - return None - - def get_label_ids(self, label_names: list) -> list: - """ - Get label IDs from label names - - Args: - label_names: List of label names - - Returns: - List of label IDs - """ - api_url = f"{self.url}/api/v1/repos/{self.repo}/labels" - - try: - response = requests.get(api_url, headers=self.headers) - response.raise_for_status() - - labels = response.json() - label_map = {label['name']: label['id'] for label in labels} - - found_ids = [] - for name in label_names: - if name in label_map: - found_ids.append(label_map[name]) - logger.info(f"Found label '{name}' with ID {label_map[name]}") - else: - logger.warning(f"Label '{name}' not found in repository") - - return found_ids - - except requests.exceptions.RequestException as e: - logger.error(f"Failed to get labels: {e}") - return [] diff --git a/scripts/orchestrator/core/gitea_integration_newerer.py b/scripts/orchestrator/core/gitea_integration_newerer.py deleted file mode 100644 index 56d514a..0000000 --- a/scripts/orchestrator/core/gitea_integration_newerer.py +++ /dev/null @@ -1,351 +0,0 @@ -#!/usr/bin/env python3 -""" -Gitea Integration Module for SRX GitOps - Fixed Push Authentication -Handles Git operations and Gitea API interactions -""" -import os -import json -import logging -import tempfile -import shutil -from datetime import datetime -from typing import Dict, Optional, Tuple -import subprocess -import requests -from urllib.parse import urlparse - -logger = logging.getLogger(__name__) - -class GiteaIntegration: - """Handles all Gitea-related operations""" - - def __init__(self, config: Dict): - """ - Initialize Gitea integration - - Args: - config: Dictionary containing: - - url: Gitea instance URL - - token: API token - - repo: repository in format "owner/repo" - - branch: default branch (usually "main") - """ - self.url = config['url'].rstrip('/') - self.token = config['token'] - self.repo = config['repo'] - self.default_branch = config.get('branch', 'main') - - # Parse owner and repo name - self.owner, self.repo_name = self.repo.split('/') - - # Set up API headers - self.headers = { - 'Authorization': f'token {self.token}', - 'Content-Type': 'application/json' - } - - # Git configuration - self.git_url = f"{self.url}/{self.repo}.git" - self.auth_git_url = f"https://oauth2:{self.token}@{urlparse(self.url).netloc}/{self.repo}.git" - - logger.info(f"Initialized Gitea integration for {self.repo}") - - def _run_git_command(self, cmd: list, cwd: str = None) -> Tuple[bool, str]: - """ - Run a git command and return success status and output - - Args: - cmd: List of command arguments - cwd: Working directory - - Returns: - Tuple of (success, output) - """ - try: - # Log the command (but hide token) - safe_cmd = [] - for arg in cmd: - if self.token in arg: - safe_cmd.append(arg.replace(self.token, "***TOKEN***")) - else: - safe_cmd.append(arg) - logger.debug(f"Running git command: {' '.join(safe_cmd)}") - - result = subprocess.run( - cmd, - cwd=cwd, - capture_output=True, - text=True, - check=True - ) - return True, result.stdout - except subprocess.CalledProcessError as e: - safe_cmd = [] - for arg in cmd: - if self.token in arg: - safe_cmd.append(arg.replace(self.token, "***TOKEN***")) - else: - safe_cmd.append(arg) - logger.error(f"Git command failed: {' '.join(safe_cmd)}") - logger.error(f"Error: {e.stderr}") - return False, e.stderr - - def create_pr_with_config(self, srx_config: str, title: str = None, - description: str = None) -> Optional[Dict]: - """ - Create a pull request with SRX configuration - - Args: - srx_config: The SRX configuration content - title: PR title (auto-generated if not provided) - description: PR description (auto-generated if not provided) - - Returns: - PR information dict or None if failed - """ - timestamp = datetime.now().strftime('%Y%m%d-%H%M%S') - branch_name = f"ai-suggestions-{timestamp}" - - # Auto-generate title and description if not provided - if not title: - title = f"AI Network Configuration Suggestions - {datetime.now().strftime('%Y-%m-%d')}" - - if not description: - description = self._generate_pr_description(srx_config) - - # Create temporary directory for git operations - with tempfile.TemporaryDirectory() as temp_dir: - logger.info(f"Working in temporary directory: {temp_dir}") - - # Step 1: Clone the repository with authentication - logger.info("Cloning repository...") - success, output = self._run_git_command( - ['git', 'clone', '--depth', '1', self.auth_git_url, '.'], - cwd=temp_dir - ) - if not success: - logger.error("Failed to clone repository") - return None - - # Step 2: Configure git user - self._run_git_command( - ['git', 'config', 'user.email', 'ai-orchestrator@srx-gitops.local'], - cwd=temp_dir - ) - self._run_git_command( - ['git', 'config', 'user.name', 'AI Orchestrator'], - cwd=temp_dir - ) - - # IMPORTANT: Set the push URL explicitly with authentication - # This ensures push uses the authenticated URL - logger.info("Setting authenticated push URL...") - self._run_git_command( - ['git', 'remote', 'set-url', 'origin', self.auth_git_url], - cwd=temp_dir - ) - - # Step 3: Create and checkout new branch - logger.info(f"Creating branch: {branch_name}") - success, _ = self._run_git_command( - ['git', 'checkout', '-b', branch_name], - cwd=temp_dir - ) - if not success: - logger.error("Failed to create branch") - return None - - # Step 4: Create ai-suggestions directory if it doesn't exist - suggestions_dir = os.path.join(temp_dir, 'ai-suggestions') - os.makedirs(suggestions_dir, exist_ok=True) - - # Step 5: Write configuration file - config_filename = f"suggestion-{timestamp}.conf" - config_path = os.path.join(suggestions_dir, config_filename) - - with open(config_path, 'w') as f: - f.write(f"# AI-Generated SRX Configuration\n") - f.write(f"# Generated: {datetime.now().isoformat()}\n") - f.write(f"# Analysis Period: Last 7 days\n\n") - f.write(srx_config) - - logger.info(f"Created config file: {config_filename}") - - # Step 6: Add and commit changes - self._run_git_command(['git', 'add', '.'], cwd=temp_dir) - - commit_message = f"Add AI-generated configuration suggestions for {datetime.now().strftime('%Y-%m-%d')}" - success, _ = self._run_git_command( - ['git', 'commit', '-m', commit_message], - cwd=temp_dir - ) - if not success: - logger.warning("No changes to commit (file might already exist)") - # Check if we actually have changes - status_success, status_output = self._run_git_command( - ['git', 'status', '--porcelain'], - cwd=temp_dir - ) - if not status_output.strip(): - logger.info("No changes detected, skipping PR creation") - return None - - # Step 7: Push branch - logger.info(f"Pushing branch {branch_name}...") - success, _ = self._run_git_command( - ['git', 'push', '-u', 'origin', branch_name], - cwd=temp_dir - ) - if not success: - logger.error("Failed to push branch") - # Try alternative push command - logger.info("Trying alternative push method...") - success, _ = self._run_git_command( - ['git', 'push', self.auth_git_url, f"{branch_name}:{branch_name}"], - cwd=temp_dir - ) - if not success: - logger.error("All push attempts failed") - return None - - # Step 8: Create pull request via API - logger.info("Creating pull request via Gitea API...") - - # Get label IDs if configured - label_ids = [] - if 'labels' in self.config: - label_ids = self.get_label_ids(self.config['labels']) - - pr_data = { - "title": title, - "body": description, - "head": branch_name, - "base": self.default_branch - } - - # Only add labels if we found valid IDs - if label_ids: - pr_data["labels"] = label_ids - - api_url = f"{self.url}/api/v1/repos/{self.repo}/pulls" - - try: - response = requests.post(api_url, json=pr_data, headers=self.headers) - response.raise_for_status() - - pr_info = response.json() - logger.info(f"Successfully created PR #{pr_info['number']}: {pr_info['title']}") - - return { - 'number': pr_info['number'], - 'url': pr_info['html_url'], - 'branch': branch_name, - 'created_at': pr_info['created_at'] - } - - except requests.exceptions.RequestException as e: - logger.error(f"Failed to create PR via API: {e}") - if hasattr(e.response, 'text'): - logger.error(f"Response: {e.response.text}") - return None - - def _generate_pr_description(self, srx_config: str) -> str: - """Generate a descriptive PR body""" - config_lines = srx_config.strip().split('\n') - summary = [] - - # Parse configuration to create summary - for line in config_lines: - if 'security-zone' in line and 'address' in line: - summary.append(f"- {line.strip()}") - elif 'application' in line and 'destination-port' in line: - summary.append(f"- {line.strip()}") - - description = f"""## ๐Ÿค– AI-Generated Network Configuration - -This pull request contains network configuration suggestions generated by the AI orchestrator based on traffic analysis from the past 7 days. - -### ๐Ÿ“Š Analysis Summary -- **Analysis Period**: Last 7 days -- **Data Source**: NetFlow/J-Flow from Elasticsearch -- **Generation Time**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - -### ๐Ÿ”ง Proposed Changes -{chr(10).join(summary[:10]) if summary else 'Various security zone and application updates'} -{'... and more' if len(summary) > 10 else ''} - -### โš ๏ธ Review Required -Please review these suggestions carefully before approving. The AI has analyzed traffic patterns and suggested optimizations, but human validation is essential. - -### ๐Ÿ”„ Deployment -Once approved, these changes will be automatically deployed during the next deployment window (daily at 5 AM). - ---- -*Generated by SRX AI GitOps Orchestrator*""" - - return description - - def get_pr_status(self, pr_number: int) -> Optional[Dict]: - """ - Get the status of a pull request - - Args: - pr_number: PR number to check - - Returns: - Dictionary with PR status info or None - """ - api_url = f"{self.url}/api/v1/repos/{self.repo}/pulls/{pr_number}" - - try: - response = requests.get(api_url, headers=self.headers) - response.raise_for_status() - - pr_data = response.json() - - return { - 'number': pr_data['number'], - 'state': pr_data['state'], # open, closed - 'merged': pr_data['merged'], - 'mergeable': pr_data['mergeable'], - 'title': pr_data['title'], - 'created_at': pr_data['created_at'], - 'updated_at': pr_data['updated_at'] - } - - except requests.exceptions.RequestException as e: - logger.error(f"Failed to get PR status: {e}") - return None - - def get_label_ids(self, label_names: list) -> list: - """ - Get label IDs from label names - - Args: - label_names: List of label names - - Returns: - List of label IDs - """ - api_url = f"{self.url}/api/v1/repos/{self.repo}/labels" - - try: - response = requests.get(api_url, headers=self.headers) - response.raise_for_status() - - labels = response.json() - label_map = {label['name']: label['id'] for label in labels} - - found_ids = [] - for name in label_names: - if name in label_map: - found_ids.append(label_map[name]) - logger.info(f"Found label '{name}' with ID {label_map[name]}") - else: - logger.warning(f"Label '{name}' not found in repository") - - return found_ids - - except requests.exceptions.RequestException as e: - logger.error(f"Failed to get labels: {e}") - return [] diff --git a/scripts/orchestrator/core/orchestrator_main_enhanced.py b/scripts/orchestrator/core/orchestrator_main_enhanced.py deleted file mode 100644 index 08aad41..0000000 --- a/scripts/orchestrator/core/orchestrator_main_enhanced.py +++ /dev/null @@ -1,544 +0,0 @@ -#!/usr/bin/env python3 -""" -Enhanced Network AI Orchestrator - Production Version -Compatible with Elasticsearch 7.x -""" -import os -import sys -import json -import time -import logging -import signal -import hashlib -from datetime import datetime, timedelta -from pathlib import Path -from typing import Dict, List, Optional -import yaml -import uuid -import threading - -from elasticsearch import Elasticsearch # Using sync version for ES 7.x -from git import Repo -import requests - -# Configure production logging -def setup_logging(): - """Configure comprehensive logging for production""" - log_dir = Path("/home/netops/orchestrator/logs") - log_dir.mkdir(exist_ok=True) - - log_file = log_dir / f"orchestrator_{datetime.now().strftime('%Y%m%d')}.log" - - logging.basicConfig( - level=logging.INFO, - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', - handlers=[ - logging.FileHandler(log_file), - logging.StreamHandler(sys.stdout) - ] - ) - return logging.getLogger(__name__) - -logger = setup_logging() - -class NetworkOrchestrator: - def __init__(self, config_path: str = "/home/netops/orchestrator/config.yaml"): - """Initialize the orchestrator with configuration""" - self.config = self.load_config(config_path) - self.es_client = None - self.git_repo = None - self.running = True - self.shared_dir = Path("/shared/ai-gitops") - self.request_dir = self.shared_dir / "requests" - self.response_dir = self.shared_dir / "responses" - - # Ensure directories exist - self.request_dir.mkdir(parents=True, exist_ok=True) - self.response_dir.mkdir(parents=True, exist_ok=True) - - logger.info("Orchestrator initialized") - - def should_create_pr(self): - """Check if we should create a PR based on schedule and state""" - if not self.config.get('pr_creation', {}).get('enabled', True): - return False - - # Load state - state = self.load_state() - - # Check if pending PR exists - if self.config['pr_creation'].get('skip_if_pending', True): - if state.get('pending_pr', False): - logger.info("Skipping PR - existing PR is pending") - return False - - # Check frequency - frequency = self.config['pr_creation'].get('frequency', 'weekly') - - if frequency == 'weekly': - # Check if it's the right day and hour - now = datetime.now() - target_day = self.config['pr_creation'].get('day_of_week', 'monday') - target_hour = self.config['pr_creation'].get('hour_of_day', 9) - - # Convert day name to number - days = ['monday', 'tuesday', 'wednesday', 'thursday', 'friday', 'saturday', 'sunday'] - target_day_num = days.index(target_day.lower()) - - # Check if it's the right day and hour - if now.weekday() != target_day_num or now.hour != target_hour: - return False - - # Check minimum days between PRs - if state.get('last_pr_created'): - last_pr_date = datetime.fromisoformat(state['last_pr_created']) - days_since = (datetime.now() - last_pr_date).days - min_days = self.config['pr_creation'].get('min_days_between', 7) - - if days_since < min_days: - logger.info(f"Skipping PR - only {days_since} days since last PR") - return False - - return True - - def load_state(self): - """Load orchestrator state""" - state_file = self.config.get('state_tracking', {}).get('state_file', '/var/lib/orchestrator/state.json') - if Path(state_file).exists(): - with open(state_file, 'r') as f: - return json.load(f) - return {} - - def save_state(self, updates): - """Save orchestrator state""" - state_file = self.config.get('state_tracking', {}).get('state_file', '/var/lib/orchestrator/state.json') - state = self.load_state() - state.update(updates) - state['last_updated'] = datetime.now().isoformat() - - with open(state_file, 'w') as f: - json.dump(state, f, indent=2) - - def load_config(self, config_path: str) -> Dict: - """Load configuration from YAML file""" - try: - with open(config_path, 'r') as f: - config = yaml.safe_load(f) - logger.info(f"Configuration loaded from {config_path}") - - # Replace environment variables - if 'gitea' in config and 'token' in config['gitea']: - if config['gitea']['token'] == '${GITEA_TOKEN}': - config['gitea']['token'] = os.environ.get('GITEA_TOKEN', '') - - return config - except Exception as e: - logger.error(f"Failed to load config: {e}") - # Use defaults if config fails - return { - 'elasticsearch': { - 'host': 'INTERNAL_IP:9200', - 'index': 'netflow-*' - }, - 'analysis': { - 'interval_minutes': 60, - 'window_hours': 24 - }, - 'gitea': { - 'url': 'https://git.salmutt.dev', - 'repo': 'sal/srx-config', - 'token': os.environ.get('GITEA_TOKEN', '') - } - } - - def setup_elasticsearch(self): - """Initialize Elasticsearch connection (synchronous for ES 7.x)""" - try: - es_config = self.config['elasticsearch'] - self.es_client = Elasticsearch( - hosts=[es_config['host']], - verify_certs=False, - timeout=30 - ) - # Test connection - info = self.es_client.info() - logger.info(f"Connected to Elasticsearch: {info['version']['number']}") - return True - except Exception as e: - logger.error(f"Failed to connect to Elasticsearch: {e}") - self.es_client = None - return False - - def collect_traffic_data(self) -> Dict: - """Collect traffic data from Elasticsearch (synchronous)""" - if not self.es_client: - logger.warning("Elasticsearch not connected, using mock data") - return self.generate_mock_data() - - try: - window_hours = self.config['analysis']['window_hours'] - query = { - "query": { - "range": { - "@timestamp": { - "gte": f"now-{window_hours}h", - "lte": "now" - } - } - }, - "size": 0, - "aggs": { - "top_talkers": { - "terms": { - "field": "source_ip", - "size": 20 - }, - "aggs": { - "bytes": {"sum": {"field": "bytes"}}, - "packets": {"sum": {"field": "packets"}} - } - }, - "protocols": { - "terms": { - "field": "protocol", - "size": 10 - } - }, - "vlans": { - "terms": { - "field": "vlan_id", - "size": 20 - }, - "aggs": { - "bytes": {"sum": {"field": "bytes"}} - } - }, - "hourly_traffic": { - "date_histogram": { - "field": "@timestamp", - "calendar_interval": "hour" - }, - "aggs": { - "bytes": {"sum": {"field": "bytes"}} - } - } - } - } - - result = self.es_client.search( - index=self.config['elasticsearch']['index'], - body=query - ) - - total_hits = result['hits']['total'] - # Handle both ES 6.x and 7.x response formats - if isinstance(total_hits, dict): - total_count = total_hits['value'] - else: - total_count = total_hits - - logger.info(f"Collected traffic data: {total_count} flows") - return result['aggregations'] - - except Exception as e: - logger.error(f"Error collecting traffic data: {e}") - return self.generate_mock_data() - - def generate_mock_data(self) -> Dict: - """Generate mock traffic data for testing""" - return { - "top_talkers": { - "buckets": [ - {"key": "INTERNAL_IP", "doc_count": 15000, - "bytes": {"value": 5000000}, "packets": {"value": 10000}}, - {"key": "INTERNAL_IP", "doc_count": 12000, - "bytes": {"value": 4000000}, "packets": {"value": 8000}}, - {"key": "INTERNAL_IP", "doc_count": 8000, - "bytes": {"value": 2000000}, "packets": {"value": 5000}}, - {"key": "10.0.0.5", "doc_count": 6000, - "bytes": {"value": 1500000}, "packets": {"value": 3000}} - ] - }, - "protocols": { - "buckets": [ - {"key": "tcp", "doc_count": 25000}, - {"key": "udp", "doc_count": 15000}, - {"key": "icmp", "doc_count": 2000} - ] - }, - "vlans": { - "buckets": [ - {"key": 100, "doc_count": 20000, "bytes": {"value": 8000000}}, - {"key": 200, "doc_count": 15000, "bytes": {"value": 6000000}}, - {"key": 300, "doc_count": 5000, "bytes": {"value": 2000000}} - ] - } - } - - def request_ai_analysis(self, traffic_data: Dict) -> Optional[Dict]: - """Send traffic data to AI for analysis""" - request_id = str(uuid.uuid4()) - request_file = self.request_dir / f"{request_id}.json" - - request_data = { - "request_id": request_id, - "timestamp": datetime.now().isoformat(), - "type": "traffic_analysis", - "data": traffic_data, - "prompt": self.build_analysis_prompt(traffic_data) - } - - try: - with open(request_file, 'w') as f: - json.dump(request_data, f, indent=2) - logger.info(f"AI request created: {request_id}") - - # Wait for response (with timeout) - response = self.wait_for_ai_response(request_id, timeout=120) - return response - - except Exception as e: - logger.error(f"Error requesting AI analysis: {e}") - return None - - def build_analysis_prompt(self, traffic_data: Dict) -> str: - """Build prompt for AI analysis""" - prompt = """Analyze this network traffic data and suggest optimizations for a Juniper SRX firewall: - -Traffic Summary: -- Top Talkers: {} -- Active VLANs: {} -- Protocol Distribution: {} - -Based on this data, please provide: -1. Security rule optimizations (as Juniper SRX configuration commands) -2. QoS improvements for high-traffic hosts -3. VLAN segmentation recommendations -4. Potential security concerns or anomalies - -Format your response with specific Juniper SRX configuration commands that can be applied. -Include comments explaining each change.""" - - # Extract key metrics - top_ips = [b['key'] for b in traffic_data.get('top_talkers', {}).get('buckets', [])][:5] - vlans = [str(b['key']) for b in traffic_data.get('vlans', {}).get('buckets', [])][:5] - protocols = [b['key'] for b in traffic_data.get('protocols', {}).get('buckets', [])][:3] - - return prompt.format( - ', '.join(top_ips) if top_ips else 'No data', - ', '.join(vlans) if vlans else 'No VLANs', - ', '.join(protocols) if protocols else 'No protocols' - ) - - def wait_for_ai_response(self, request_id: str, timeout: int = 120) -> Optional[Dict]: - """Wait for AI response file""" - response_file = self.response_dir / f"{request_id}_response.json" - start_time = time.time() - - while time.time() - start_time < timeout: - if response_file.exists(): - try: - time.sleep(1) # Give AI time to finish writing - with open(response_file, 'r') as f: - response = json.load(f) - logger.info(f"AI response received: {request_id}") - - # Log a snippet of the response - if 'response' in response: - snippet = response['response'][:200] + '...' if len(response['response']) > 200 else response['response'] - logger.info(f"AI suggestion snippet: {snippet}") - - # Clean up files - response_file.unlink() - (self.request_dir / f"{request_id}.json").unlink(missing_ok=True) - return response - except Exception as e: - logger.error(f"Error reading AI response: {e}") - return None - time.sleep(2) - - logger.warning(f"AI response timeout for {request_id}") - return None - - def create_gitea_pr(self, ai_response: Dict) -> bool: - """Create pull request in Gitea with suggested changes""" - try: - gitea_config = self.config['gitea'] - - if not gitea_config.get('token'): - logger.error("Gitea token not configured") - return False - - # Extract configuration from AI response - # Use 'suggestions' field if available, fallback to 'response' - config_changes = ai_response.get('suggestions', ai_response.get('response', 'No configuration suggested')) - - # Create a unique branch name - branch_name = f"ai-suggestions-{datetime.now().strftime('%Y%m%d-%H%M%S')}" - - # Format the PR body - pr_data = { - "title": f"AI Network Optimizations - {datetime.now().strftime('%Y-%m-%d %H:%M')}", - "body": f"""## AI-Generated Network Optimizations - -### Analysis Summary -Analysis completed at {datetime.now().isoformat()} - -### Traffic Patterns Analyzed -- Analysis Window: {self.config['analysis']['window_hours']} hours -- Data Source: NetFlow/J-Flow from SRX - -### Proposed Configuration Changes -```junos -{config_changes} -``` - -### Review Instructions -1. Review the proposed changes carefully -2. Test in lab environment if possible -3. Schedule maintenance window if approved -4. Monitor after deployment - -**This PR was automatically generated by the AI Network Orchestrator** -""", - "base": "main", - "head": branch_name - } - - headers = { - "Authorization": f"token {gitea_config['token']}", - "Content-Type": "application/json" - } - - # For now, log what would be sent (since branch creation needs more setup) - logger.info(f"Would create PR with title: {pr_data['title']}") - logger.info(f"Configuration changes proposed: {len(config_changes)} characters") - - # TODO: Implement actual Git operations and PR creation - # This requires cloning the repo, creating branch, committing changes, pushing - - return True - - except Exception as e: - logger.error(f"Error creating Gitea PR: {e}") - return False - - def run_analysis_cycle(self): - """Run a complete analysis cycle""" - logger.info("="*60) - logger.info("Starting traffic analysis cycle") - logger.info("="*60) - - try: - # Always collect traffic data - logger.info("Step 1: Collecting traffic data from Elasticsearch...") - traffic_data = self.collect_traffic_data() - - if not traffic_data: - logger.warning("No traffic data available, skipping analysis") - return - - # Log summary of collected data - top_talkers = traffic_data.get('top_talkers', {}).get('buckets', []) - if top_talkers: - logger.info(f"Found {len(top_talkers)} top talkers") - logger.info(f"Top IP: {top_talkers[0]['key']} with {top_talkers[0]['doc_count']} flows") - - # Always request AI analysis - logger.info("Step 2: Requesting AI analysis...") - ai_response = self.request_ai_analysis(traffic_data) - - # Save state for analysis - self.save_state({ - 'last_analysis_run': datetime.now().isoformat(), - 'last_analysis_data': { - 'top_talkers_count': len(top_talkers), - 'response_received': bool(ai_response) - } - }) - - # Check if we should create PR - if self.should_create_pr(): - if ai_response and (ai_response.get('response') or ai_response.get('suggestions')): - logger.info("Step 3: Creating PR with AI suggestions...") - if self.create_gitea_pr(ai_response): - logger.info("โœ“ PR created successfully") - self.save_state({ - 'last_pr_created': datetime.now().isoformat(), - 'pending_pr': True - }) - else: - logger.warning("Failed to create Gitea PR") - else: - logger.info("No actionable suggestions from AI analysis") - else: - logger.info("Not time for PR creation - analysis data saved for future use") - - except Exception as e: - logger.error(f"Error in analysis cycle: {e}") - - logger.info("="*60) - - def main_loop(self): - """Main orchestrator loop""" - logger.info("Starting Network AI Orchestrator") - - # Setup Elasticsearch connection - if not self.setup_elasticsearch(): - logger.warning("Running without Elasticsearch connection - using mock data") - - interval = self.config['analysis'].get('interval_minutes', 60) * 60 - - # Run first analysis immediately - self.run_analysis_cycle() - - while self.running: - try: - logger.info(f"Next analysis scheduled in {interval/60} minutes") - logger.info(f"Next run at: {(datetime.now() + timedelta(seconds=interval)).strftime('%H:%M:%S')}") - time.sleep(interval) - - if self.running: # Check again after sleep - self.run_analysis_cycle() - - except KeyboardInterrupt: - logger.info("Received keyboard interrupt") - break - except Exception as e: - logger.error(f"Error in main loop: {e}") - time.sleep(60) # Wait before retry - - logger.info("Orchestrator shutdown complete") - - def shutdown(self, signum=None, frame=None): - """Graceful shutdown handler""" - if signum: - logger.info(f"Received signal {signum}, initiating shutdown...") - else: - logger.info("Initiating shutdown...") - self.running = False - if self.es_client: - try: - # Close Elasticsearch connection - self.es_client.transport.close() - except: - pass - -def main(): - """Main entry point""" - orchestrator = NetworkOrchestrator() - - # Set up signal handlers - signal.signal(signal.SIGTERM, orchestrator.shutdown) - signal.signal(signal.SIGINT, orchestrator.shutdown) - - try: - orchestrator.main_loop() - except Exception as e: - logger.error(f"Fatal error: {e}", exc_info=True) - sys.exit(1) - - sys.exit(0) - -if __name__ == "__main__": - main() diff --git a/scripts/orchestrator/core/orchestrator_main_newer.py b/scripts/orchestrator/core/orchestrator_main_newer.py deleted file mode 100644 index 4973db9..0000000 --- a/scripts/orchestrator/core/orchestrator_main_newer.py +++ /dev/null @@ -1,621 +0,0 @@ -#!/usr/bin/env python3 -""" -Enhanced Network AI Orchestrator - Production Version with Gitea Integration -Compatible with Elasticsearch 7.x -""" -import os -import sys -import json -import time -import logging -import signal -import hashlib -from datetime import datetime, timedelta -from pathlib import Path -from typing import Dict, List, Optional -import yaml -import uuid -import threading - -from elasticsearch import Elasticsearch # Using sync version for ES 7.x -from git import Repo -import requests - -# ADD THIS IMPORT - New for Phase 3 -from gitea_integration import GiteaIntegration - -# Configure production logging -def setup_logging(): - """Configure comprehensive logging for production""" - log_dir = Path("/home/netops/orchestrator/logs") - log_dir.mkdir(exist_ok=True) - - log_file = log_dir / f"orchestrator_{datetime.now().strftime('%Y%m%d')}.log" - - logging.basicConfig( - level=logging.INFO, - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', - handlers=[ - logging.FileHandler(log_file), - logging.StreamHandler(sys.stdout) - ] - ) - return logging.getLogger(__name__) - -logger = setup_logging() - -class NetworkOrchestrator: - def __init__(self, config_path: str = "/home/netops/orchestrator/config.yaml"): - """Initialize the orchestrator with configuration""" - self.config = self.load_config(config_path) - self.es_client = None - self.git_repo = None - self.running = True - self.shared_dir = Path("/shared/ai-gitops") - self.request_dir = self.shared_dir / "requests" - self.response_dir = self.shared_dir / "responses" - - # ADD THIS - Initialize state for Phase 3 - self.state = self.load_state() - - # Ensure directories exist - self.request_dir.mkdir(parents=True, exist_ok=True) - self.response_dir.mkdir(parents=True, exist_ok=True) - - logger.info("Orchestrator initialized") - - def should_create_pr(self): - """Check if we should create a PR based on schedule and state""" - if not self.config.get('pr_creation', {}).get('enabled', True): - return False - - # Load state - state = self.load_state() - - # Check if pending PR exists - if self.config['pr_creation'].get('skip_if_pending', True): - if state.get('pending_pr', False): - logger.info("Skipping PR - existing PR is pending") - return False - - # Check frequency - frequency = self.config['pr_creation'].get('frequency', 'weekly') - - if frequency == 'weekly': - # Check if it's the right day and hour - now = datetime.now() - target_day = self.config['pr_creation'].get('day_of_week', 'saturday') - target_hour = self.config['pr_creation'].get('hour_of_day', 5) - - # Convert day name to number - days = ['monday', 'tuesday', 'wednesday', 'thursday', 'friday', 'saturday', 'sunday'] - target_day_num = days.index(target_day.lower()) - - # Check if it's the right day and hour - if now.weekday() != target_day_num or now.hour != target_hour: - return False - - # Check minimum days between PRs - if state.get('last_pr_created'): - last_pr_date = datetime.fromisoformat(state['last_pr_created']) - days_since = (datetime.now() - last_pr_date).days - min_days = self.config['pr_creation'].get('min_days_between', 7) - - if days_since < min_days: - logger.info(f"Skipping PR - only {days_since} days since last PR") - return False - - return True - - def load_state(self): - """Load orchestrator state""" - state_file = self.config.get('state_tracking', {}).get('state_file', '/var/lib/orchestrator/state.json') - if Path(state_file).exists(): - with open(state_file, 'r') as f: - return json.load(f) - return {} - - def save_state(self, updates): - """Save orchestrator state""" - state_file = self.config.get('state_tracking', {}).get('state_file', '/var/lib/orchestrator/state.json') - state = self.load_state() - state.update(updates) - state['last_updated'] = datetime.now().isoformat() - - with open(state_file, 'w') as f: - json.dump(state, f, indent=2) - - def load_config(self, config_path: str) -> Dict: - """Load configuration from YAML file""" - try: - with open(config_path, 'r') as f: - config = yaml.safe_load(f) - logger.info(f"Configuration loaded from {config_path}") - - # Replace environment variables - if 'gitea' in config and 'token' in config['gitea']: - if config['gitea']['token'] == '${GITEA_TOKEN}': - config['gitea']['token'] = os.environ.get('GITEA_TOKEN', '') - - return config - except Exception as e: - logger.error(f"Failed to load config: {e}") - # Use defaults if config fails - return { - 'elasticsearch': { - 'host': 'INTERNAL_IP:9200', - 'index': 'netflow-*' - }, - 'analysis': { - 'interval_minutes': 60, - 'window_hours': 24 - }, - 'gitea': { - 'url': 'https://git.salmutt.dev', - 'repo': 'sal/srx-config', - 'token': os.environ.get('GITEA_TOKEN', '') - } - } - - def setup_elasticsearch(self): - """Initialize Elasticsearch connection (synchronous for ES 7.x)""" - try: - es_config = self.config['elasticsearch'] - self.es_client = Elasticsearch( - hosts=[es_config['host']], - verify_certs=False, - timeout=30 - ) - # Test connection - info = self.es_client.info() - logger.info(f"Connected to Elasticsearch: {info['version']['number']}") - return True - except Exception as e: - logger.error(f"Failed to connect to Elasticsearch: {e}") - self.es_client = None - return False - - def collect_traffic_data(self) -> Dict: - """Collect traffic data from Elasticsearch (synchronous)""" - if not self.es_client: - logger.warning("Elasticsearch not connected, using mock data") - return self.generate_mock_data() - - try: - window_hours = self.config['analysis']['window_hours'] - query = { - "query": { - "range": { - "@timestamp": { - "gte": f"now-{window_hours}h", - "lte": "now" - } - } - }, - "size": 0, - "aggs": { - "top_talkers": { - "terms": { - "field": "source_ip", - "size": 20 - }, - "aggs": { - "bytes": {"sum": {"field": "bytes"}}, - "packets": {"sum": {"field": "packets"}} - } - }, - "protocols": { - "terms": { - "field": "protocol", - "size": 10 - } - }, - "vlans": { - "terms": { - "field": "vlan_id", - "size": 20 - }, - "aggs": { - "bytes": {"sum": {"field": "bytes"}} - } - }, - "hourly_traffic": { - "date_histogram": { - "field": "@timestamp", - "calendar_interval": "hour" - }, - "aggs": { - "bytes": {"sum": {"field": "bytes"}} - } - } - } - } - - result = self.es_client.search( - index=self.config['elasticsearch']['index'], - body=query - ) - - total_hits = result['hits']['total'] - # Handle both ES 6.x and 7.x response formats - if isinstance(total_hits, dict): - total_count = total_hits['value'] - else: - total_count = total_hits - - logger.info(f"Collected traffic data: {total_count} flows") - return result['aggregations'] - - except Exception as e: - logger.error(f"Error collecting traffic data: {e}") - return self.generate_mock_data() - - def generate_mock_data(self) -> Dict: - """Generate mock traffic data for testing""" - return { - "top_talkers": { - "buckets": [ - {"key": "INTERNAL_IP", "doc_count": 15000, - "bytes": {"value": 5000000}, "packets": {"value": 10000}}, - {"key": "INTERNAL_IP", "doc_count": 12000, - "bytes": {"value": 4000000}, "packets": {"value": 8000}}, - {"key": "INTERNAL_IP", "doc_count": 8000, - "bytes": {"value": 2000000}, "packets": {"value": 5000}}, - {"key": "10.0.0.5", "doc_count": 6000, - "bytes": {"value": 1500000}, "packets": {"value": 3000}} - ] - }, - "protocols": { - "buckets": [ - {"key": "tcp", "doc_count": 25000}, - {"key": "udp", "doc_count": 15000}, - {"key": "icmp", "doc_count": 2000} - ] - }, - "vlans": { - "buckets": [ - {"key": 100, "doc_count": 20000, "bytes": {"value": 8000000}}, - {"key": 200, "doc_count": 15000, "bytes": {"value": 6000000}}, - {"key": 300, "doc_count": 5000, "bytes": {"value": 2000000}} - ] - } - } - - def request_ai_analysis(self, traffic_data: Dict) -> Optional[Dict]: - """Send traffic data to AI for analysis""" - request_id = str(uuid.uuid4()) - request_file = self.request_dir / f"{request_id}.json" - - request_data = { - "request_id": request_id, - "timestamp": datetime.now().isoformat(), - "type": "traffic_analysis", - "data": traffic_data, - "prompt": self.build_analysis_prompt(traffic_data) - } - - try: - with open(request_file, 'w') as f: - json.dump(request_data, f, indent=2) - logger.info(f"AI request created: {request_id}") - - # Wait for response (with timeout) - response = self.wait_for_ai_response(request_id, timeout=120) - return response - - except Exception as e: - logger.error(f"Error requesting AI analysis: {e}") - return None - - def build_analysis_prompt(self, traffic_data: Dict) -> str: - """Build prompt for AI analysis""" - prompt = """Analyze this network traffic data and suggest optimizations for a Juniper SRX firewall: - -Traffic Summary: -- Top Talkers: {} -- Active VLANs: {} -- Protocol Distribution: {} - -Based on this data, please provide: -1. Security rule optimizations (as Juniper SRX configuration commands) -2. QoS improvements for high-traffic hosts -3. VLAN segmentation recommendations -4. Potential security concerns or anomalies - -Format your response with specific Juniper SRX configuration commands that can be applied. -Include comments explaining each change.""" - - # Extract key metrics - top_ips = [b['key'] for b in traffic_data.get('top_talkers', {}).get('buckets', [])][:5] - vlans = [str(b['key']) for b in traffic_data.get('vlans', {}).get('buckets', [])][:5] - protocols = [b['key'] for b in traffic_data.get('protocols', {}).get('buckets', [])][:3] - - return prompt.format( - ', '.join(top_ips) if top_ips else 'No data', - ', '.join(vlans) if vlans else 'No VLANs', - ', '.join(protocols) if protocols else 'No protocols' - ) - - def wait_for_ai_response(self, request_id: str, timeout: int = 120) -> Optional[Dict]: - """Wait for AI response file""" - response_file = self.response_dir / f"{request_id}_response.json" - start_time = time.time() - - while time.time() - start_time < timeout: - if response_file.exists(): - try: - time.sleep(1) # Give AI time to finish writing - with open(response_file, 'r') as f: - response = json.load(f) - logger.info(f"AI response received: {request_id}") - - # Log a snippet of the response - if 'response' in response: - snippet = response['response'][:200] + '...' if len(response['response']) > 200 else response['response'] - logger.info(f"AI suggestion snippet: {snippet}") - - # Clean up files - response_file.unlink() - (self.request_dir / f"{request_id}.json").unlink(missing_ok=True) - return response - except Exception as e: - logger.error(f"Error reading AI response: {e}") - return None - time.sleep(2) - - logger.warning(f"AI response timeout for {request_id}") - return None - - # REPLACE THE EXISTING create_gitea_pr METHOD WITH THIS ENHANCED VERSION - def create_gitea_pr(self, ai_response: Dict = None) -> bool: - """Create pull request in Gitea with suggested changes""" - try: - # If no AI response provided, get the latest one - if not ai_response: - latest_suggestion = self._get_latest_ai_suggestion() - if not latest_suggestion: - logger.warning("No AI suggestions found to create PR") - return False - - # Read the suggestion file - with open(latest_suggestion['path'], 'r') as f: - ai_response = json.load(f) - - # Check if we should create a PR - if not self.should_create_pr(): - logger.info("Skipping PR creation - conditions not met") - return False - - # Check for existing pending PR - if self.state.get('pending_pr'): - logger.info(f"Skipping PR creation - pending PR exists: {self.state['pending_pr']}") - return False - - logger.info("Creating Gitea pull request with AI suggestions...") - - # Initialize Gitea integration - gitea = GiteaIntegration(self.config['gitea']) - - # Extract the SRX configuration - srx_config = ai_response.get('suggestions', ai_response.get('response', '')) - if not srx_config or srx_config.strip() == '': - logger.warning("Empty or invalid suggestions, skipping PR creation") - return False - - # Create the PR - pr_info = gitea.create_pr_with_config( - srx_config=srx_config, - title=f"AI Network Configuration Suggestions - {datetime.now().strftime('%B %d, %Y')}", - description=None # Will auto-generate - ) - - if pr_info: - # Update state with PR information - self.state['pending_pr'] = pr_info['number'] - self.state['last_pr_created'] = datetime.now().isoformat() - self.state['pr_url'] = pr_info['url'] - self.save_state(self.state) - - logger.info(f"Successfully created PR #{pr_info['number']}: {pr_info['url']}") - - # Log to a separate file for notifications/monitoring - with open('/var/log/orchestrator/pr_created.log', 'a') as f: - f.write(f"{datetime.now().isoformat()} - Created PR #{pr_info['number']} - {pr_info['url']}\n") - - return True - else: - logger.error("Failed to create PR in Gitea") - return False - - except Exception as e: - logger.error(f"Error creating Gitea PR: {e}", exc_info=True) - return False - - # ADD THIS NEW METHOD - def _get_latest_ai_suggestion(self) -> Optional[Dict]: - """Get the most recent AI suggestion file""" - response_dir = '/shared/ai-gitops/responses' - - try: - # List all response files - response_files = [] - for filename in os.listdir(response_dir): - if filename.startswith('response_') and filename.endswith('.json'): - filepath = os.path.join(response_dir, filename) - # Get file modification time - mtime = os.path.getmtime(filepath) - response_files.append({ - 'path': filepath, - 'filename': filename, - 'mtime': mtime - }) - - if not response_files: - return None - - # Sort by modification time and get the latest - response_files.sort(key=lambda x: x['mtime'], reverse=True) - return response_files[0] - - except Exception as e: - logger.error(f"Error finding latest AI suggestion: {e}") - return None - - # ADD THIS NEW METHOD - def check_pr_status(self): - """Check the status of pending pull requests""" - if not self.state.get('pending_pr'): - return - - try: - gitea = GiteaIntegration(self.config['gitea']) - pr_status = gitea.get_pr_status(self.state['pending_pr']) - - if pr_status: - logger.info(f"PR #{pr_status['number']} status: {pr_status['state']} (merged: {pr_status['merged']})") - - # If PR is closed or merged, clear the pending_pr flag - if pr_status['state'] == 'closed': - logger.info(f"PR #{pr_status['number']} has been closed") - self.state['pending_pr'] = None - self.state['last_pr_status'] = 'closed' - self.state['last_pr_closed'] = datetime.now().isoformat() - - if pr_status['merged']: - self.state['last_pr_status'] = 'merged' - logger.info(f"PR #{pr_status['number']} was merged!") - # Mark for deployment - self.state['pending_deployment'] = True - self.state['deployment_pr'] = pr_status['number'] - - self.save_state(self.state) - - except Exception as e: - logger.error(f"Error checking PR status: {e}") - - def run_analysis_cycle(self): - """Run a complete analysis cycle""" - logger.info("="*60) - logger.info("Starting traffic analysis cycle") - logger.info("="*60) - - try: - # Always collect traffic data - logger.info("Step 1: Collecting traffic data from Elasticsearch...") - traffic_data = self.collect_traffic_data() - - if not traffic_data: - logger.warning("No traffic data available, skipping analysis") - return - - # Log summary of collected data - top_talkers = traffic_data.get('top_talkers', {}).get('buckets', []) - if top_talkers: - logger.info(f"Found {len(top_talkers)} top talkers") - logger.info(f"Top IP: {top_talkers[0]['key']} with {top_talkers[0]['doc_count']} flows") - - # Always request AI analysis - logger.info("Step 2: Requesting AI analysis...") - ai_response = self.request_ai_analysis(traffic_data) - - # Save state for analysis - self.save_state({ - 'last_analysis_run': datetime.now().isoformat(), - 'last_analysis_data': { - 'top_talkers_count': len(top_talkers), - 'response_received': bool(ai_response) - } - }) - - # Check if we should create PR - if self.should_create_pr(): - if ai_response and (ai_response.get('response') or ai_response.get('suggestions')): - logger.info("Step 3: Creating PR with AI suggestions...") - if self.create_gitea_pr(ai_response): - logger.info("โœ“ PR created successfully") - else: - logger.warning("Failed to create Gitea PR") - else: - logger.info("No actionable suggestions from AI analysis") - else: - logger.info("Not time for PR creation - analysis data saved for future use") - - except Exception as e: - logger.error(f"Error in analysis cycle: {e}") - - logger.info("="*60) - - def main_loop(self): - """Main orchestrator loop""" - logger.info("Starting Network AI Orchestrator") - - # Setup Elasticsearch connection - if not self.setup_elasticsearch(): - logger.warning("Running without Elasticsearch connection - using mock data") - - interval = self.config['analysis'].get('interval_minutes', 60) * 60 - - # Run first analysis immediately - self.run_analysis_cycle() - - while self.running: - try: - logger.info(f"Next analysis scheduled in {interval/60} minutes") - logger.info(f"Next run at: {(datetime.now() + timedelta(seconds=interval)).strftime('%H:%M:%S')}") - - # MODIFIED: Check PR status every 15 minutes during the wait - for i in range(int(interval / 60)): # Check every minute - if not self.running: - break - - time.sleep(60) - - # Check PR status every 15 minutes - if i % 15 == 14 and self.state.get('pending_pr'): - logger.info("Checking PR status...") - self.check_pr_status() - - if self.running: # Check again after sleep - self.run_analysis_cycle() - - except KeyboardInterrupt: - logger.info("Received keyboard interrupt") - break - except Exception as e: - logger.error(f"Error in main loop: {e}") - time.sleep(60) # Wait before retry - - logger.info("Orchestrator shutdown complete") - - def shutdown(self, signum=None, frame=None): - """Graceful shutdown handler""" - if signum: - logger.info(f"Received signal {signum}, initiating shutdown...") - else: - logger.info("Initiating shutdown...") - self.running = False - if self.es_client: - try: - # Close Elasticsearch connection - self.es_client.transport.close() - except: - pass - -def main(): - """Main entry point""" - orchestrator = NetworkOrchestrator() - - # Set up signal handlers - signal.signal(signal.SIGTERM, orchestrator.shutdown) - signal.signal(signal.SIGINT, orchestrator.shutdown) - - try: - orchestrator.main_loop() - except Exception as e: - logger.error(f"Fatal error: {e}", exc_info=True) - sys.exit(1) - - sys.exit(0) - -if __name__ == "__main__": - main() diff --git a/scripts/orchestrator/core/pipeline_status_enhanced.py b/scripts/orchestrator/core/pipeline_status_enhanced.py deleted file mode 100644 index 94008b6..0000000 --- a/scripts/orchestrator/core/pipeline_status_enhanced.py +++ /dev/null @@ -1,204 +0,0 @@ -#!/usr/bin/env python3 -""" -SRX AI GitOps Pipeline Status Monitor - Final Version -Shows the complete status of the automation pipeline -""" -import os -import json -import yaml -from datetime import datetime, timedelta -from tabulate import tabulate -import requests -import subprocess - -class PipelineMonitor: - def __init__(self): - # Load configuration - with open('/home/netops/orchestrator/config.yaml', 'r') as f: - self.config = yaml.safe_load(f) - - # Load state - state_file = self.config.get('state_tracking', {}).get('state_file', '/var/lib/orchestrator/state.json') - if os.path.exists(state_file): - try: - with open(state_file, 'r') as f: - self.state = json.load(f) - except: - self.state = {} - else: - self.state = {} - - def check_services(self): - """Check if services are running""" - status = [] - - # Check local orchestrator - try: - result = subprocess.run( - ['systemctl', 'is-active', 'orchestrator.service'], - capture_output=True, - text=True - ) - is_active = result.stdout.strip() == 'active' - status.append(['Orchestrator (Local)', 'โœ… Active' if is_active else 'โŒ Inactive']) - except: - status.append(['Orchestrator (Local)', 'โ“ Unknown']) - - # Check AI Processor by looking at recent activity - ai_status = self.check_ai_processor_activity() - status.append(['AI Processor (INTERNAL_IP)', ai_status]) - - # Check deployment timer - try: - result = subprocess.run( - ['systemctl', 'is-active', 'srx-deployment.timer'], - capture_output=True, - text=True - ) - is_active = result.stdout.strip() == 'active' - status.append(['Deployment Timer', 'โœ… Active' if is_active else 'โŒ Not configured']) - except: - status.append(['Deployment Timer', 'โ“ Unknown']) - - return status - - def check_ai_processor_activity(self): - """Check AI processor activity through shared files and state""" - # Check if we've had recent AI responses - if self.state.get('last_analysis_data', {}).get('response_received'): - last_analysis = self.state.get('last_analysis_run', '') - if last_analysis: - try: - last_time = datetime.fromisoformat(last_analysis) - if datetime.now() - last_time < timedelta(hours=2): - return 'โœ… Active (Recent activity)' - except: - pass - - # Check response directory - response_dir = '/shared/ai-gitops/responses' - if os.path.exists(response_dir): - files = os.listdir(response_dir) - if len(files) > 0: - return 'โœ… Active (Has responses)' - - # Check if requests are pending - request_dir = '/shared/ai-gitops/requests' - if os.path.exists(request_dir): - files = os.listdir(request_dir) - if len(files) > 0: - return 'โณ Processing requests' - - return '๐Ÿ’ค Idle' - - def check_pr_status(self): - """Check current PR status""" - if self.state.get('pending_pr'): - pr_num = self.state['pending_pr'] - return f"PR #{pr_num} - Pending Review" - else: - return "No pending PR" - - def get_next_events(self): - """Calculate next scheduled events""" - now = datetime.now() - - # Next analysis (hourly) - next_analysis = now.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1) - - # Next PR creation (Saturday 5 AM) - days_until_saturday = (5 - now.weekday()) % 7 - if days_until_saturday == 0 and now.hour >= 5: - days_until_saturday = 7 - next_pr = now.replace(hour=5, minute=0, second=0, microsecond=0) - next_pr += timedelta(days=days_until_saturday) - - # Next deployment (Daily 5 AM) - next_deploy = now.replace(hour=5, minute=0, second=0, microsecond=0) - if now.hour >= 5: - next_deploy += timedelta(days=1) - - return [ - ['Next Analysis', next_analysis.strftime('%Y-%m-%d %H:%M')], - ['Next PR Creation', next_pr.strftime('%Y-%m-%d %H:%M')], - ['Next Deployment Check', next_deploy.strftime('%Y-%m-%d %H:%M')] - ] - - def get_recent_activity(self): - """Get recent pipeline activity""" - activity = [] - - # Last analysis - if self.state.get('last_analysis_run'): - try: - last_analysis = datetime.fromisoformat(self.state['last_analysis_run']) - activity.append(['Last Analysis', last_analysis.strftime('%Y-%m-%d %H:%M')]) - - # Check if AI responded - if self.state.get('last_analysis_data', {}).get('response_received'): - activity.append(['AI Response', 'โœ… Received']) - else: - activity.append(['AI Response', 'โŒ Not received']) - except: - pass - - # Last PR created - if self.state.get('last_pr_created'): - try: - last_pr = datetime.fromisoformat(self.state['last_pr_created']) - activity.append(['Last PR Created', last_pr.strftime('%Y-%m-%d %H:%M')]) - except: - pass - - # Last deployment - if self.state.get('last_successful_deployment'): - try: - last_deploy = datetime.fromisoformat(self.state['last_successful_deployment']) - activity.append(['Last Deployment', last_deploy.strftime('%Y-%m-%d %H:%M')]) - except: - pass - - return activity if activity else [['Status', 'No recent activity']] - - def display_status(self): - """Display complete pipeline status""" - print("\n" + "="*60) - print("๐Ÿš€ SRX AI GitOps Pipeline Status") - print("="*60) - print(f"Current Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") - - print("\n๐Ÿ“Š Service Status:") - print(tabulate(self.check_services(), headers=['Service', 'Status'])) - - print("\n๐Ÿ”„ Current State:") - print(f"PR Status: {self.check_pr_status()}") - print(f"Pending Deployment: {'Yes' if self.state.get('pending_deployment') else 'No'}") - - print("\n๐Ÿ“… Scheduled Events:") - print(tabulate(self.get_next_events(), headers=['Event', 'Time'])) - - print("\n๐Ÿ“œ Recent Activity:") - print(tabulate(self.get_recent_activity(), headers=['Event', 'Details'])) - - print("\n๐Ÿ’พ Data Locations:") - print("Requests: /shared/ai-gitops/requests/") - print("Responses: /shared/ai-gitops/responses/") - print("Approved: /shared/ai-gitops/approved/") - print("Deployed: /shared/ai-gitops/deployed/") - - print("\n๐Ÿ—๏ธ Architecture:") - print("Orchestrator VM: INTERNAL_IP (this VM)") - print("AI Processor VM: INTERNAL_IP") - print("Elasticsearch VM: INTERNAL_IP") - print("Gitea Server: git.salmutt.dev") - - print("\n๐Ÿ“‹ Pipeline Flow:") - print("1. Every 60 min โ†’ Analyze traffic โ†’ Generate suggestions") - print("2. Saturday 5 AM โ†’ Create PR if suggestions exist") - print("3. Manual โ†’ Review and approve/reject PR") - print("4. Daily 5 AM โ†’ Deploy approved configurations") - print("="*60 + "\n") - -if __name__ == "__main__": - monitor = PipelineMonitor() - monitor.display_status() diff --git a/scripts/orchestrator/core/test_context.py b/scripts/orchestrator/core/test_context.py deleted file mode 100755 index 6ec3ca7..0000000 --- a/scripts/orchestrator/core/test_context.py +++ /dev/null @@ -1,193 +0,0 @@ -#!/usr/bin/env python3 -""" -Test script to verify context-aware AI processing -Run this before full deployment to ensure everything works -""" -import json -import time -from pathlib import Path -from datetime import datetime - -def create_test_context(focus_area="security"): - """Create a test context file""" - context = { - "timestamp": datetime.now().isoformat(), - "focus_area": focus_area, - "skip_basic": True, - "existing_features": [ - "zones_configured", - "gaming_optimizations", - "vpn_configured" - ], - "priority_features": [], - "instructions": [ - "DO NOT suggest basic connectivity policies", - "DO NOT suggest any/any/any rules", - f"FOCUS on {focus_area} optimizations" - ] - } - - # Add focus-specific priorities - if focus_area == "security": - context["priority_features"] = [ - "rate_limiting", - "ddos_protection", - "ids_ips_rules" - ] - elif focus_area == "performance": - context["priority_features"] = [ - "qos_policies", - "traffic_shaping", - "bandwidth_management" - ] - elif focus_area == "monitoring": - context["priority_features"] = [ - "syslog_enhancements", - "snmp_traps", - "flow_analytics" - ] - - # Save context - context_dir = Path('/shared/ai-gitops/context') - context_dir.mkdir(parents=True, exist_ok=True) - - context_file = context_dir / 'current_context.json' - with open(context_file, 'w') as f: - json.dump(context, f, indent=2) - - print(f"โœ… Created context file for {focus_area}") - return context_file - -def create_test_request(): - """Create a test analysis request""" - request = { - "request_id": f"test_{datetime.now().strftime('%Y%m%d_%H%M%S')}", - "timestamp": datetime.now().isoformat(), - "data": { - "top_talkers": { - "buckets": [ - {"key": "INTERNAL_IP", "doc_count": 1000}, - {"key": "INTERNAL_IP", "doc_count": 800} - ] - }, - "vlans": {"buckets": []}, - "protocols": {"buckets": []} - } - } - - # Save request - request_dir = Path('/shared/ai-gitops/requests') - request_dir.mkdir(parents=True, exist_ok=True) - - request_file = request_dir / f"{request['request_id']}.json" - with open(request_file, 'w') as f: - json.dump(request, f, indent=2) - - print(f"โœ… Created test request: {request['request_id']}") - return request['request_id'] - -def check_response(request_id, focus_area): - """Check if response was generated with correct context""" - response_dir = Path('/shared/ai-gitops/responses') - response_file = response_dir / f"{request_id}_response.json" - - # Wait for response (max 30 seconds) - for i in range(30): - if response_file.exists(): - with open(response_file, 'r') as f: - response = json.load(f) - - print(f"\nโœ… Response generated!") - print(f" Focus area: {response.get('focus_area', 'unknown')}") - print(f" Feedback aware: {response.get('feedback_aware', False)}") - - # Check if context was applied - if response.get('focus_area') == focus_area: - print(f" โœ… Context correctly applied: {focus_area}") - else: - print(f" โŒ Context mismatch! Expected: {focus_area}, Got: {response.get('focus_area')}") - - # Show sample of suggestions - suggestions = response.get('suggestions', '').split('\n')[:5] - print(f"\n Sample suggestions:") - for line in suggestions: - if line.strip(): - print(f" {line}") - - return True - - time.sleep(1) - print(f" Waiting for response... ({i+1}/30)") - - print(f"โŒ No response generated after 30 seconds") - return False - -def run_test(focus_area="security"): - """Run a complete test cycle""" - print(f"\n{'='*60}") - print(f"Testing Context System - Focus: {focus_area.upper()}") - print(f"{'='*60}") - - # Step 1: Create context - context_file = create_test_context(focus_area) - - # Step 2: Create request - request_id = create_test_request() - - # Step 3: Check if AI processor is running - print("\nโณ Waiting for AI processor to pick up request...") - print(" (Make sure ai_processor.py is running)") - - # Step 4: Check response - success = check_response(request_id, focus_area) - - if success: - print(f"\n๐ŸŽ‰ Test PASSED for {focus_area} context!") - else: - print(f"\nโŒ Test FAILED for {focus_area} context") - print("\nTroubleshooting:") - print("1. Is ai_processor.py running?") - print("2. Check logs: tail -f /var/log/ai-processor/ai-processor.log") - print("3. Verify Ollama is running: curl http://localhost:11434/api/tags") - - return success - -def main(): - """Main test function""" - print("๐Ÿงช AI Context System Test Suite") - print("================================") - - import argparse - parser = argparse.ArgumentParser(description='Test context-aware AI processing') - parser.add_argument('--focus', - choices=['security', 'performance', 'monitoring', 'automation'], - default='security', - help='Focus area to test') - parser.add_argument('--all', - action='store_true', - help='Test all focus areas') - - args = parser.parse_args() - - if args.all: - # Test all focus areas - areas = ['security', 'performance', 'monitoring', 'automation'] - results = {} - - for area in areas: - results[area] = run_test(area) - time.sleep(5) # Wait between tests - - # Summary - print(f"\n{'='*60}") - print("TEST SUMMARY") - print(f"{'='*60}") - for area, result in results.items(): - status = "โœ… PASSED" if result else "โŒ FAILED" - print(f"{area.capitalize():15} {status}") - else: - # Test single focus area - run_test(args.focus) - -if __name__ == "__main__": - main() diff --git a/scripts/orchestrator/core/test_feedback_loop.py b/scripts/orchestrator/core/test_feedback_loop.py deleted file mode 100755 index 529b947..0000000 --- a/scripts/orchestrator/core/test_feedback_loop.py +++ /dev/null @@ -1,364 +0,0 @@ -#!/usr/bin/env python3 -""" -Complete Feedback Loop Test -Tests creating a PR, rejecting it with feedback, and AI learning from it -""" -import os -import sys -import json -import time -import subprocess -from datetime import datetime -from pathlib import Path -import requests - -class FeedbackLoopTester: - def __init__(self): - self.shared_dir = Path('/shared/ai-gitops') - self.feedback_dir = self.shared_dir / 'feedback' - self.pending_prs_dir = self.shared_dir / 'pending_prs' - self.responses_dir = self.shared_dir / 'responses' - - # Ensure feedback directory exists - self.feedback_dir.mkdir(parents=True, exist_ok=True) - - # Load config for Gitea (if exists) - self.config_file = Path('/home/netops/orchestrator/config.yaml') - if self.config_file.exists(): - import yaml - with open(self.config_file, 'r') as f: - self.config = yaml.safe_load(f) - else: - self.config = {} - - def step1_create_test_pr_data(self): - """Create a test PR with intentionally problematic config""" - print("\n" + "="*60) - print("STEP 1: Creating Test PR with Problematic Config") - print("="*60) - - # Create intentionally bad config for testing - bad_suggestions = """# TEST: Intentionally problematic config for feedback testing -# This should be rejected for security reasons - -# โŒ BAD: Any/Any/Any rule (security risk) -set security policies from-zone trust to-zone untrust policy ALLOW-ALL match source-address any -set security policies from-zone trust to-zone untrust policy ALLOW-ALL match destination-address any -set security policies from-zone trust to-zone untrust policy ALLOW-ALL match application any -set security policies from-zone trust to-zone untrust policy ALLOW-ALL then permit - -# โŒ BAD: No logging enabled -set security policies from-zone dmz to-zone untrust policy DMZ-OUT match source-address any -set security policies from-zone dmz to-zone untrust policy DMZ-OUT match destination-address any -set security policies from-zone dmz to-zone untrust policy DMZ-OUT then permit - -# โŒ BAD: Overly permissive IoT access -set security policies from-zone IOT to-zone HOME policy IOT-ACCESS match source-address any -set security policies from-zone IOT to-zone HOME policy IOT-ACCESS match destination-address any -set security policies from-zone IOT to-zone HOME policy IOT-ACCESS match application any -set security policies from-zone IOT to-zone HOME policy IOT-ACCESS then permit""" - - pr_data = { - 'pr_number': f'TEST-{datetime.now().strftime("%Y%m%d-%H%M%S")}', - 'title': 'TEST: AI Network Optimization for Feedback Testing', - 'description': 'This is a test PR with intentionally problematic config to test feedback learning', - 'suggestions': bad_suggestions, - 'timestamp': datetime.now().isoformat(), - 'test_pr': True, - 'expected_rejection_reasons': [ - 'Any/any/any rule detected', - 'No logging enabled', - 'IoT to HOME unrestricted access' - ] - } - - # Save test PR - pr_file = self.pending_prs_dir / f"test_pr_{pr_data['pr_number']}.json" - self.pending_prs_dir.mkdir(parents=True, exist_ok=True) - - with open(pr_file, 'w') as f: - json.dump(pr_data, f, indent=2) - - print(f"โœ… Created test PR: {pr_file.name}") - print("\n๐Ÿ“‹ Problematic configurations included:") - for reason in pr_data['expected_rejection_reasons']: - print(f" โŒ {reason}") - - return pr_data - - def step2_simulate_pr_rejection(self, pr_data): - """Simulate rejecting the PR with specific feedback""" - print("\n" + "="*60) - print("STEP 2: Simulating PR Rejection with Feedback") - print("="*60) - - rejection_feedback = { - 'pr_number': pr_data['pr_number'], - 'timestamp': datetime.now().isoformat(), - 'feedback_type': 'rejected', - 'reviewer': 'security_team', - 'details': { - 'reason': 'Security policy violations detected', - 'specific_issues': 'Multiple any/any/any rules found which violate zero-trust principles', - 'configuration_issues': [ - { - 'line': 'policy ALLOW-ALL match source-address any', - 'issue': 'Never use source-address any in permit rules', - 'type': 'security', - 'severity': 'critical' - }, - { - 'line': 'policy DMZ-OUT then permit', - 'issue': 'No logging enabled for DMZ traffic', - 'type': 'security', - 'severity': 'high' - }, - { - 'line': 'from-zone IOT to-zone HOME', - 'issue': 'IoT devices should never have unrestricted access to HOME zone', - 'type': 'security', - 'severity': 'critical' - } - ], - 'recommendations': [ - 'Use specific address-sets instead of any', - 'Always enable logging with "then log session-init"', - 'IoT devices should only access specific services, not entire zones', - 'Implement proper zone segmentation' - ] - } - } - - print(f"๐Ÿ“ PR Number: {rejection_feedback['pr_number']}") - print(f"โŒ Status: REJECTED") - print(f"๐Ÿ‘ค Reviewer: {rejection_feedback['reviewer']}") - print(f"\n๐Ÿ“‹ Issues identified:") - - for issue in rejection_feedback['details']['configuration_issues']: - print(f" โ€ข {issue['issue']}") - print(f" Severity: {issue['severity'].upper()}") - - return rejection_feedback - - def step3_save_feedback(self, feedback): - """Save feedback to the feedback history file""" - print("\n" + "="*60) - print("STEP 3: Saving Feedback to History") - print("="*60) - - feedback_file = self.feedback_dir / 'pr_feedback_history.json' - - # Load existing feedback if exists - if feedback_file.exists(): - with open(feedback_file, 'r') as f: - feedback_history = json.load(f) - print(f"๐Ÿ“‚ Loaded existing feedback history ({len(feedback_history)} entries)") - else: - feedback_history = [] - print("๐Ÿ“‚ Creating new feedback history") - - # Add new feedback - feedback_history.append(feedback) - - # Save updated history - with open(feedback_file, 'w') as f: - json.dump(feedback_history, f, indent=2) - - print(f"โœ… Saved feedback to: {feedback_file}") - print(f"๐Ÿ“Š Total feedback entries: {len(feedback_history)}") - - # Count types - rejected = len([f for f in feedback_history if f.get('feedback_type') == 'rejected']) - approved = len([f for f in feedback_history if f.get('feedback_type') == 'approved']) - - print(f" โ€ข Rejected: {rejected}") - print(f" โ€ข Approved: {approved}") - - return feedback_file - - def step4_trigger_new_ai_request(self): - """Create a new AI request to test if it learned from feedback""" - print("\n" + "="*60) - print("STEP 4: Creating New AI Request to Test Learning") - print("="*60) - - # Create a new request that should avoid the rejected patterns - test_request = { - 'request_id': f'feedback_test_{datetime.now().strftime("%Y%m%d_%H%M%S")}', - 'timestamp': datetime.now().isoformat(), - 'type': 'feedback_test', - 'data': { - 'message': 'Testing if AI learned from rejection feedback', - 'zones_to_configure': ['IOT', 'HOME', 'DMZ'], - 'requirements': [ - 'Configure IoT to HOME access', - 'Configure DMZ outbound rules', - 'Ensure security best practices' - ] - }, - 'context': { - 'test_feedback_learning': True, - 'previous_rejection': True - } - } - - request_file = self.shared_dir / 'requests' / f"{test_request['request_id']}.json" - request_file.parent.mkdir(parents=True, exist_ok=True) - - with open(request_file, 'w') as f: - json.dump(test_request, f, indent=2) - - print(f"โœ… Created test request: {request_file.name}") - print(f" Request ID: {test_request['request_id']}") - print("\n๐ŸŽฏ This request specifically asks for:") - for req in test_request['data']['requirements']: - print(f" โ€ข {req}") - - print("\nโณ AI should now avoid the mistakes from the rejection...") - - return test_request['request_id'] - - def step5_wait_and_verify_learning(self, request_id, timeout=150): - """Wait for AI response and verify it learned from feedback""" - print("\n" + "="*60) - print("STEP 5: Waiting for AI Response and Verifying Learning") - print("="*60) - - response_file = self.responses_dir / f"{request_id}_response.json" - start_time = time.time() - - print(f"โณ Waiting for AI response (timeout: {timeout}s)...") - - # Wait for response - while time.time() - start_time < timeout: - if response_file.exists(): - print(f"โœ… Response received after {int(time.time() - start_time)} seconds") - break - - if int(time.time() - start_time) % 20 == 0 and time.time() - start_time > 0: - print(f" ... still waiting ({int(time.time() - start_time)}s elapsed)") - - time.sleep(2) - else: - print(f"โŒ Timeout waiting for response") - return False - - # Analyze response - with open(response_file, 'r') as f: - response = json.load(f) - - print(f"\n๐Ÿ“‹ AI Response Analysis:") - print(f" Model: {response.get('model')}") - print(f" Feedback aware: {response.get('feedback_aware')}") - - suggestions = response.get('suggestions', '') - - # Check if AI avoided the mistakes - print("\n๐Ÿ” Checking if AI learned from feedback:") - - learned_correctly = True - checks = [ - ('source-address any', 'Still using "any" in source-address', False), - ('destination-address any', 'Still using "any" in destination-address', False), - ('application any', 'Still using "any" in application', False), - ('then log', 'Now includes logging', True), - ('address-set', 'Uses address-sets', True), - ('specific', 'Uses specific addresses/applications', True) - ] - - for pattern, description, should_exist in checks: - if should_exist: - if pattern in suggestions.lower(): - print(f" โœ… LEARNED: {description}") - else: - print(f" โŒ NOT LEARNED: {description}") - learned_correctly = False - else: - if pattern not in suggestions.lower(): - print(f" โœ… AVOIDED: Not {description}") - else: - print(f" โŒ MISTAKE: {description}") - learned_correctly = False - - # Show sample of new suggestions - print("\n๐Ÿ“ Sample of new AI suggestions:") - print("-" * 50) - for line in suggestions.split('\n')[:10]: - if line.strip(): - print(f" {line}") - print("-" * 50) - - return learned_correctly - - def run_complete_test(self): - """Run the complete feedback loop test""" - print("\n" + "="*70) - print(" ๐Ÿ”„ COMPLETE FEEDBACK LOOP TEST") - print("="*70) - print("\nThis test will:") - print("1. Create a PR with intentionally bad config") - print("2. Simulate rejection with specific feedback") - print("3. Save feedback for AI learning") - print("4. Create new request to test learning") - print("5. Verify AI avoided previous mistakes") - - input("\nPress Enter to start the test...") - - # Run all steps - pr_data = self.step1_create_test_pr_data() - feedback = self.step2_simulate_pr_rejection(pr_data) - self.step3_save_feedback(feedback) - request_id = self.step4_trigger_new_ai_request() - learned = self.step5_wait_and_verify_learning(request_id) - - # Final summary - print("\n" + "="*70) - print(" ๐Ÿ“Š FEEDBACK LOOP TEST RESULTS") - print("="*70) - - if learned: - print("\n๐ŸŽ‰ SUCCESS! The AI learned from the rejection feedback!") - print("\nThe AI now:") - print(" โœ… Avoids any/any/any rules") - print(" โœ… Includes logging in policies") - print(" โœ… Uses specific address-sets") - print(" โœ… Implements proper zone segmentation") - else: - print("\nโš ๏ธ PARTIAL SUCCESS - AI needs more training") - print("\nRecommendations:") - print(" โ€ข Add more rejected examples") - print(" โ€ข Adjust the prompt in ai_processor.py") - print(" โ€ข Consider using a larger model") - - print("\n๐Ÿ“ Files created during test:") - print(f" โ€ข Test PR: {self.pending_prs_dir}/test_pr_*.json") - print(f" โ€ข Feedback: {self.feedback_dir}/pr_feedback_history.json") - print(f" โ€ข AI Response: {self.responses_dir}/{request_id}_response.json") - - return learned - -def main(): - """Main entry point""" - tester = FeedbackLoopTester() - - # Check if AI processor is running - result = subprocess.run(['pgrep', '-f', 'ai_processor.py'], - capture_output=True, text=True) - if not result.stdout: - print("โš ๏ธ AI processor not running on AI VM") - print(" Start it with: ssh netops@INTERNAL_IP") - print(" Then: sudo systemctl start ai-processor") - response = input("\nContinue anyway? (y/n): ") - if response.lower() != 'y': - return - - # Run the test - success = tester.run_complete_test() - - if success: - print("\nโœ… Your feedback learning system is working correctly!") - else: - print("\nโš ๏ธ Review the feedback and adjust as needed") - -if __name__ == "__main__": - main() diff --git a/scripts/orchestrator/core/test_git_auth.py b/scripts/orchestrator/core/test_git_auth.py deleted file mode 100755 index 5018960..0000000 --- a/scripts/orchestrator/core/test_git_auth.py +++ /dev/null @@ -1,90 +0,0 @@ -#!/usr/bin/env python3 -"""Test Git authentication for Gitea""" -import subprocess -import logging - -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) - -# Your Gitea configuration -GITEA_URL = "https://git.salmutt.dev" -GITEA_TOKEN = "da3be18aad877edb94e896e6c1e7c449581444420" -REPO = "sal/srx-config" - -def test_auth_methods(): - """Test different authentication methods""" - - git_url = f"{GITEA_URL}/{REPO}.git" - - # Method 1: oauth2 format - print("\n1. Testing oauth2 authentication format...") - auth_url = f"https://oauth2:{GITEA_TOKEN}@git.salmutt.dev/{REPO}.git" - try: - result = subprocess.run( - ['git', 'ls-remote', auth_url, 'HEAD'], - capture_output=True, - text=True - ) - if result.returncode == 0: - print("โœ… OAuth2 authentication successful!") - print(f" HEAD: {result.stdout.strip()}") - return True - else: - print("โŒ OAuth2 authentication failed") - print(f" Error: {result.stderr}") - except Exception as e: - print(f"โŒ OAuth2 test error: {e}") - - # Method 2: Direct token format - print("\n2. Testing direct token authentication...") - auth_url = f"https://{GITEA_TOKEN}@git.salmutt.dev/{REPO}.git" - try: - result = subprocess.run( - ['git', 'ls-remote', auth_url, 'HEAD'], - capture_output=True, - text=True - ) - if result.returncode == 0: - print("โœ… Direct token authentication successful!") - print(f" HEAD: {result.stdout.strip()}") - return True - else: - print("โŒ Direct token authentication failed") - print(f" Error: {result.stderr}") - except Exception as e: - print(f"โŒ Direct token test error: {e}") - - # Method 3: Username:token format (using 'git' as username) - print("\n3. Testing username:token format...") - auth_url = f"https://git:{GITEA_TOKEN}@git.salmutt.dev/{REPO}.git" - try: - result = subprocess.run( - ['git', 'ls-remote', auth_url, 'HEAD'], - capture_output=True, - text=True - ) - if result.returncode == 0: - print("โœ… Username:token authentication successful!") - print(f" HEAD: {result.stdout.strip()}") - return True - else: - print("โŒ Username:token authentication failed") - print(f" Error: {result.stderr}") - except Exception as e: - print(f"โŒ Username:token test error: {e}") - - return False - -if __name__ == "__main__": - print("Testing Gitea authentication methods...") - print(f"Repository: {REPO}") - print(f"Token: {GITEA_TOKEN[:10]}..." + "*" * (len(GITEA_TOKEN) - 10)) - - if test_auth_methods(): - print("\nโœ… At least one authentication method works!") - else: - print("\nโŒ All authentication methods failed") - print("\nPlease verify:") - print("1. The token is correct and has appropriate permissions") - print("2. The repository exists and is accessible") - print("3. Network connectivity to Gitea is working") diff --git a/scripts/orchestrator/core/test_pr_creation.py b/scripts/orchestrator/core/test_pr_creation.py deleted file mode 100755 index 78d2b4b..0000000 --- a/scripts/orchestrator/core/test_pr_creation.py +++ /dev/null @@ -1,26 +0,0 @@ -import yaml -from gitea_integration import GiteaIntegration - -# Load your config -with open('/home/netops/orchestrator/config.yaml', 'r') as f: - config = yaml.safe_load(f) - -# Test PR creation -gitea = GiteaIntegration(config['gitea']) - -# Test with sample config -test_config = """# Test configuration -set security zones security-zone DMZ address 192.168.50.0/24 -set applications application TEST-APP destination-port 8080""" - -pr_info = gitea.create_pr_with_config( - srx_config=test_config, - title="Test PR - Please Close", - description="This is a test PR to verify Gitea integration" -) - -if pr_info: - print(f"Success! Created PR #{pr_info['number']}") - print(f"URL: {pr_info['url']}") -else: - print("Failed to create PR") diff --git a/scripts/orchestrator/core/test_pr_schedule.py b/scripts/orchestrator/core/test_pr_schedule.py deleted file mode 100644 index 07e6447..0000000 --- a/scripts/orchestrator/core/test_pr_schedule.py +++ /dev/null @@ -1,11 +0,0 @@ -#!/usr/bin/env python3 -import sys -sys.path.append('/home/netops/orchestrator') -from orchestrator_main import NetworkOrchestrator -from datetime import datetime - -orch = NetworkOrchestrator() -print(f"Current time: {datetime.now()}") -print(f"Current day: {datetime.now().strftime('%A')}") -print(f"Should create PR: {orch.should_create_pr()}") -print(f"State: {orch.load_state()}") diff --git a/scripts/orchestrator/core/test_request.py b/scripts/orchestrator/core/test_request.py deleted file mode 100755 index eacf30e..0000000 --- a/scripts/orchestrator/core/test_request.py +++ /dev/null @@ -1,47 +0,0 @@ -#!/usr/bin/env python3 -""" -Create a simple test request for the AI processor -Run this on the orchestrator VM to create a test request -""" -import json -from datetime import datetime -from pathlib import Path - -print("Creating test request for AI processor...") - -# Create test request -request_data = { - 'request_id': f'test_{datetime.now().strftime("%Y%m%d_%H%M%S")}', - 'timestamp': datetime.now().isoformat(), - 'type': 'test_request', - 'data': { - 'top_talkers': { - 'buckets': [ - {'key': 'INTERNAL_IP', 'doc_count': 1000}, - {'key': '192.168.10.100', 'doc_count': 500}, - {'key': '10.0.1.25', 'doc_count': 250} - ] - }, - 'message': 'This is a test request to verify AI processor is working' - }, - 'context': { - 'config_available': True, - 'analysis_available': True, - 'test': True - } -} - -# Save request -request_dir = Path('/shared/ai-gitops/requests') -request_dir.mkdir(parents=True, exist_ok=True) - -request_file = request_dir / f"{request_data['request_id']}.json" -with open(request_file, 'w') as f: - json.dump(request_data, f, indent=2) - -print(f"โœ… Test request created: {request_file}") -print(f" Request ID: {request_data['request_id']}") -print("\nNow check if AI processor picks it up:") -print(" 1. Wait 10-20 seconds") -print(" 2. Check for response: ls -la /shared/ai-gitops/responses/") -print(f" 3. Look for: {request_data['request_id']}_response.json") diff --git a/scripts/orchestrator/core/test_simple_push.py b/scripts/orchestrator/core/test_simple_push.py deleted file mode 100644 index 0c5a0d7..0000000 --- a/scripts/orchestrator/core/test_simple_push.py +++ /dev/null @@ -1,54 +0,0 @@ -#!/usr/bin/env python3 -import subprocess -import tempfile -import os - -TOKEN = "da3be18aad877edb94e896e6c1e7c449581444420" -REPO_URL = f"https://oauth2:{TOKEN}@git.salmutt.dev/sal/srx-config.git" - -print("Testing simple git push...") - -with tempfile.TemporaryDirectory() as tmpdir: - print(f"Working in: {tmpdir}") - - # Clone - print("1. Cloning...") - result = subprocess.run(['git', 'clone', '--depth', '1', REPO_URL, '.'], - cwd=tmpdir, capture_output=True, text=True) - if result.returncode != 0: - print(f"Clone failed: {result.stderr}") - exit(1) - print("โœ… Clone successful") - - # Configure git - subprocess.run(['git', 'config', 'user.email', 'test@example.com'], cwd=tmpdir) - subprocess.run(['git', 'config', 'user.name', 'Test User'], cwd=tmpdir) - - # Set push URL explicitly - print("2. Setting push URL...") - subprocess.run(['git', 'remote', 'set-url', 'origin', REPO_URL], cwd=tmpdir) - - # Create test branch - test_branch = "test-push-permissions" - print(f"3. Creating branch {test_branch}...") - subprocess.run(['git', 'checkout', '-b', test_branch], cwd=tmpdir) - - # Create a test file - test_file = os.path.join(tmpdir, 'test-permissions.txt') - with open(test_file, 'w') as f: - f.write("Testing push permissions\n") - - # Add and commit - subprocess.run(['git', 'add', '.'], cwd=tmpdir) - subprocess.run(['git', 'commit', '-m', 'Test push permissions'], cwd=tmpdir) - - # Try to push - print("4. Attempting push...") - result = subprocess.run(['git', 'push', '-u', 'origin', test_branch], - cwd=tmpdir, capture_output=True, text=True) - - if result.returncode == 0: - print("โœ… Push successful! Token has write permissions.") - print(" You may want to delete the test branch from Gitea") - else: - print(f"โŒ Push failed: {result.stderr}") diff --git a/scripts/orchestrator/core/test_split_architecture.py b/scripts/orchestrator/core/test_split_architecture.py deleted file mode 100755 index 019e023..0000000 --- a/scripts/orchestrator/core/test_split_architecture.py +++ /dev/null @@ -1,118 +0,0 @@ -#!/usr/bin/env python3 -""" -Simple test script to verify the split architecture communication -Run this on Orchestrator VM to test AI processing -""" - -import json -import time -from pathlib import Path -from datetime import datetime - -REQUEST_DIR = Path("/shared/ai-gitops/requests") -RESPONSE_DIR = Path("/shared/ai-gitops/responses") - -def test_ai_communication(): - """Test basic communication with AI VM""" - print("Testing Split Architecture Communication") - print("=" * 50) - - # Test 1: Simple message - print("\nTest 1: Simple message exchange") - request_id = f"test_{datetime.now().strftime('%Y%m%d_%H%M%S')}" - request_file = REQUEST_DIR / f"{request_id}.json" - - request_data = { - "type": "test", - "message": "Hello AI, please confirm you received this", - "timestamp": datetime.now().isoformat() - } - - print(f"Sending request: {request_id}") - with open(request_file, 'w') as f: - json.dump(request_data, f, indent=2) - - # Wait for response - response_file = RESPONSE_DIR / f"{request_id}.json" - print("Waiting for response...", end="") - - for i in range(30): # Wait up to 30 seconds - if response_file.exists(): - print(" Received!") - with open(response_file, 'r') as f: - response = json.load(f) - print(f"Response: {json.dumps(response, indent=2)}") - response_file.unlink() - break - print(".", end="", flush=True) - time.sleep(1) - else: - print(" Timeout!") - return False - - # Test 2: Traffic analysis request - print("\nTest 2: Simulated traffic analysis") - request_id = f"analyze_{datetime.now().strftime('%Y%m%d_%H%M%S')}" - request_file = REQUEST_DIR / f"{request_id}.json" - - request_data = { - "type": "analyze_traffic", - "traffic_data": { - "top_sources": [ - {"ip": "INTERNAL_IP", "bytes": 1000000}, - {"ip": "INTERNAL_IP", "bytes": 500000} - ], - "top_ports": [443, 80, 22, 3389] - }, - "srx_context": { - "zones": ["trust", "untrust", "dmz"], - "rule_count": 15 - } - } - - print(f"Sending analysis request: {request_id}") - with open(request_file, 'w') as f: - json.dump(request_data, f, indent=2) - - # Wait for response - response_file = RESPONSE_DIR / f"{request_id}.json" - print("Waiting for AI analysis...", end="") - - for i in range(60): # Wait up to 60 seconds for analysis - if response_file.exists(): - print(" Received!") - with open(response_file, 'r') as f: - response = json.load(f) - - if response.get("status") == "success": - print("AI Analysis completed successfully!") - print(f"Analysis preview: {response.get('analysis', '')[:200]}...") - else: - print(f"Analysis failed: {response}") - - response_file.unlink() - break - print(".", end="", flush=True) - time.sleep(1) - else: - print(" Timeout!") - return False - - print("\n" + "=" * 50) - print("All tests completed successfully!") - return True - -if __name__ == "__main__": - # Make sure AI processor is running on AI VM first - print("Make sure ai_processor.py is running on the AI VM!") - print("Press Ctrl+C to cancel, or Enter to continue...") - try: - input() - except KeyboardInterrupt: - print("\nCancelled") - exit(1) - - if test_ai_communication(): - print("\nโœ… Split architecture is working correctly!") - else: - print("\nโŒ Communication test failed") diff --git a/scripts/orchestrator/core/validate_latest.py b/scripts/orchestrator/core/validate_latest.py deleted file mode 100644 index f65af64..0000000 --- a/scripts/orchestrator/core/validate_latest.py +++ /dev/null @@ -1,34 +0,0 @@ -#!/usr/bin/env python3 -import json -from pathlib import Path - -# Get latest response -responses = sorted(Path('/shared/ai-gitops/responses').glob('pipeline_*.json'), - key=lambda x: x.stat().st_mtime, reverse=True) - -if responses: - with open(responses[0], 'r') as f: - data = json.load(f) - - print(f"Checking: {responses[0].name}") - print(f"Feedback aware: {data.get('feedback_aware')}") - - suggestions = data.get('suggestions', '') - - # Count issues - dest_any_count = suggestions.count('destination-address any') - src_any_count = suggestions.count('source-address any') - app_any_count = suggestions.count('application any') - log_count = suggestions.count('then log') - - print(f"\n๐Ÿ“Š Analysis:") - print(f" source-address any: {src_any_count} {'โŒ' if src_any_count else 'โœ…'}") - print(f" destination-address any: {dest_any_count} {'โŒ' if dest_any_count else 'โœ…'}") - print(f" application any: {app_any_count} {'โŒ' if app_any_count else 'โœ…'}") - print(f" logging statements: {log_count} {'โœ…' if log_count else 'โŒ'}") - - if dest_any_count > 0: - print(f"\nโš ๏ธ AI still needs to learn about destination-address!") - print(" Add more feedback and update the prompt") - else: - print(f"\nโœ… AI has learned to avoid any/any/any!")