Remove test and development scripts from production docs

This commit is contained in:
2025-09-04 19:35:35 +00:00
parent d2fed2cc03
commit ab9a661551
16 changed files with 0 additions and 3384 deletions

View File

@@ -1,210 +0,0 @@
#!/usr/bin/env python3
"""
Force Deployment - Manually deploy approved configurations to SRX
"""
import os
import json
import yaml
import time
import paramiko
from datetime import datetime
from pathlib import Path
class ManualDeployment:
    """Manually deploy an approved (merged) configuration to the SRX."""

    def __init__(self):
        # Load orchestrator configuration; SRX connection details
        # (host/username/ssh_key) live under the 'srx' key.
        with open('/home/netops/orchestrator/config.yaml', 'r') as f:
            self.config = yaml.safe_load(f)
        self.srx_config = self.config['srx']
        # Load persisted pipeline state so we can check for merged PRs.
        # NOTE(review): raises FileNotFoundError if the state file does not
        # exist yet — confirm the orchestrator always creates it first.
        state_file = '/var/lib/orchestrator/state.json'
        with open(state_file, 'r') as f:
            self.state = json.load(f)
def get_latest_merged_config(self):
"""Find the most recent merged configuration"""
# Check if there's a recently merged PR
if self.state.get('last_pr_status') == 'merged':
pr_number = self.state.get('deployment_pr')
print(f"✅ Found merged PR #{pr_number}")
# For now, we'll use the latest response as the config
# In production, this would fetch from the merged PR
response_dir = '/shared/ai-gitops/responses'
if os.path.exists(response_dir):
files = sorted(os.listdir(response_dir), key=lambda x: os.path.getmtime(os.path.join(response_dir, x)))
if files:
latest_file = os.path.join(response_dir, files[-1])
with open(latest_file, 'r') as f:
data = json.load(f)
return data.get('suggestions', data.get('response', ''))
return None
def connect_to_srx(self):
    """Open an SSH session to the SRX; return the client or None on failure."""
    host = self.srx_config['host']
    print(f"\n🔌 Connecting to SRX at {host}...")
    try:
        ssh = paramiko.SSHClient()
        # Accept unknown host keys automatically (lab/automation context).
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        # Key-based authentication only; no password fallback.
        ssh.connect(
            hostname=host,
            username=self.srx_config['username'],
            key_filename=self.srx_config['ssh_key'],
            port=22,
        )
    except Exception as e:
        print(f"❌ Failed to connect: {e}")
        return None
    print("✅ Connected to SRX successfully")
    return ssh
def deploy_config(self, client, config_text):
    """Deploy configuration to SRX with commit confirmed.

    Pushes `config_text` line by line over the given paramiko client,
    commits with `commit confirmed 2` (auto-rollback after 2 minutes),
    waits 30 seconds as a connectivity test, then issues a plain commit
    to make the change permanent.

    Returns True only when the confirming commit reports completion.
    """
    print("\n📤 Deploying configuration to SRX...")
    try:
        # Enter configuration mode.
        # NOTE(review): each exec_command() call runs on a fresh SSH
        # channel, so this bare 'configure' likely does not persist for
        # the later calls — verify against actual device behavior.
        stdin, stdout, stderr = client.exec_command('configure')
        time.sleep(1)
        # Load the configuration one statement at a time, skipping blank
        # lines and '#' comments; each call re-prefixes 'configure'.
        print("Loading configuration...")
        config_lines = config_text.strip().split('\n')
        for line in config_lines:
            if line.strip() and not line.startswith('#'):
                stdin, stdout, stderr = client.exec_command(f'configure\n{line}')
                result = stdout.read().decode()
                # Surface per-line errors but keep going with the rest.
                if 'error' in result.lower():
                    print(f"⚠️ Error with command: {line}")
                    print(f" {result}")
        # Commit with confirmed (2 minute timeout) — device rolls back
        # automatically unless a confirming commit follows.
        print("\n🔄 Committing configuration with 2-minute confirmation timeout...")
        stdin, stdout, stderr = client.exec_command('configure\ncommit confirmed 2\nexit')
        commit_result = stdout.read().decode()
        if 'commit complete' in commit_result.lower():
            print("✅ Configuration committed (pending confirmation)")
            # Wait a bit to test connectivity before confirming.
            print("⏳ Testing configuration (30 seconds)...")
            time.sleep(30)
            # If we're still connected, confirm the commit permanently.
            print("✅ Configuration appears stable, confirming commit...")
            stdin, stdout, stderr = client.exec_command('configure\ncommit\nexit')
            confirm_result = stdout.read().decode()
            if 'commit complete' in confirm_result.lower():
                print("✅ Configuration confirmed and saved!")
                return True
            else:
                print("❌ Failed to confirm configuration")
                return False
        else:
            print("❌ Initial commit failed")
            print(commit_result)
            return False
    except Exception as e:
        print(f"❌ Deployment error: {e}")
        return False
def create_deployment_record(self, success, config_text):
    """Archive the attempted configuration under deployed/ or failed/."""
    stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    if success:
        deploy_dir = '/shared/ai-gitops/deployed'
        filename = f"deployed_{stamp}.conf"
        status = 'SUCCESS'
    else:
        deploy_dir = '/shared/ai-gitops/failed'
        filename = f"failed_{stamp}.conf"
        status = 'FAILED'
    os.makedirs(deploy_dir, exist_ok=True)
    filepath = os.path.join(deploy_dir, filename)
    # Prepend a provenance header before the raw configuration text.
    header = (
        f"# Deployment {status}\n"
        f"# Timestamp: {datetime.now().isoformat()}\n"
        f"# SRX: {self.srx_config['host']}\n\n"
    )
    with open(filepath, 'w') as f:
        f.write(header + config_text)
    print(f"\n📝 Deployment record saved to: {filepath}")
def run(self):
    """Interactive driver: locate an approved config, confirm with the
    operator, deploy it, then record the outcome.

    Returns True only when the deployment succeeded.
    """
    print("\n🚀 MANUAL DEPLOYMENT TO SRX")
    print("="*60)
    # Get configuration to deploy
    print("\n📋 Looking for approved configuration...")
    config_text = self.get_latest_merged_config()
    if not config_text:
        print("❌ No approved configuration found")
        print("\nMake sure you have:")
        print("1. Created a PR using force_pipeline_test.py")
        print("2. Merged the PR in Gitea")
        return False
    print("\n📄 Configuration to deploy:")
    print("-"*40)
    # Preview at most the first 500 characters of the configuration.
    print(config_text[:500] + "..." if len(config_text) > 500 else config_text)
    print("-"*40)
    print("\n⚠️ WARNING: This will apply configuration to your production SRX!")
    print("The configuration will auto-rollback after 2 minutes if not confirmed.")
    print("\nDo you want to continue? (yes/no): ", end="")
    response = input().strip().lower()
    if response != 'yes':
        print("Deployment cancelled.")
        return False
    # Connect to SRX
    client = self.connect_to_srx()
    if not client:
        return False
    try:
        # Deploy the configuration
        success = self.deploy_config(client, config_text)
        # Record the deployment attempt (success or failure) for auditing.
        self.create_deployment_record(success, config_text)
        # Persist the outcome in the shared state file.
        if success:
            self.state['last_successful_deployment'] = datetime.now().isoformat()
            self.state['pending_deployment'] = False
        else:
            self.state['last_failed_deployment'] = datetime.now().isoformat()
        with open('/var/lib/orchestrator/state.json', 'w') as f:
            json.dump(self.state, f, indent=2)
        return success
    finally:
        # Always close the SSH session, even when deployment raised.
        client.close()
        print("\n🔌 Disconnected from SRX")
def main():
    """CLI entry point for the manual deployment flow."""
    deployment = ManualDeployment()
    ok = deployment.run()
    if ok:
        print("\n✅ Deployment completed successfully!")
    else:
        print("\n❌ Deployment failed")


if __name__ == "__main__":
    main()

View File

@@ -1,133 +0,0 @@
#!/usr/bin/env python3
"""
Force Pipeline Test - Manual trigger for complete pipeline cycle
This will analyze all data and create a PR immediately
"""
import os
import sys
import json
import yaml
from datetime import datetime
import time
# Add orchestrator directory to path
sys.path.append('/home/netops/orchestrator')
from orchestrator_main import NetworkOrchestrator
from gitea_integration import GiteaIntegration
def force_pr_creation():
    """Run one full analysis -> AI -> PR cycle immediately, bypassing the
    normal schedule gate.

    Returns:
        True when a PR was created, False on any failure.
    """
    print("🚀 Starting forced pipeline test...")
    print("="*60)
    # The orchestrator loads config.yaml itself, so no separate read of
    # the file is needed here (the previous duplicate load was unused).
    orchestrator = NetworkOrchestrator('/home/netops/orchestrator/config.yaml')
    print("\n📊 Step 1: Setting up Elasticsearch connection...")
    orchestrator.setup_elasticsearch()
    print("\n📈 Step 2: Collecting all available traffic data...")
    # Temporarily widen the analysis window to get ALL data, and restore
    # the configured value afterwards (it was previously left overridden).
    original_window = orchestrator.config['analysis']['window_hours']
    orchestrator.config['analysis']['window_hours'] = 168  # 7 days of data
    try:
        traffic_data = orchestrator.collect_traffic_data()
    finally:
        orchestrator.config['analysis']['window_hours'] = original_window
    if not traffic_data:
        print("❌ No traffic data available")
        return False
    # Show summary of collected data
    top_talkers = traffic_data.get('top_talkers', {}).get('buckets', [])
    print(f"✅ Collected data summary:")
    print(f" - Top talkers: {len(top_talkers)} IPs")
    print(f" - VLANs: {len(traffic_data.get('vlans', {}).get('buckets', []))}")
    print(f" - Protocols: {len(traffic_data.get('protocols', {}).get('buckets', []))}")
    print("\n🤖 Step 3: Requesting AI analysis...")
    ai_response = orchestrator.request_ai_analysis(traffic_data)
    if not ai_response:
        print("❌ Failed to get AI response")
        return False
    print("✅ AI analysis complete")
    # Save the analysis to state
    orchestrator.save_state({
        'last_analysis_run': datetime.now().isoformat(),
        'last_analysis_data': {
            'top_talkers_count': len(top_talkers),
            'response_received': True
        }
    })
    print("\n📝 Step 4: Creating PR in Gitea...")
    # Bypass the schedule gate for this one run; restore the real method
    # in a finally block so an exception cannot leave it patched.
    original_should_create = orchestrator.should_create_pr
    orchestrator.should_create_pr = lambda: True
    try:
        # Clear any stale pending-PR flag so creation is not skipped.
        state = orchestrator.load_state()
        if 'pending_pr' in state:
            orchestrator.save_state({'pending_pr': None})
        success = orchestrator.create_gitea_pr(ai_response)
    finally:
        orchestrator.should_create_pr = original_should_create
    if not success:
        print("❌ Failed to create PR")
        return False
    print("\n✅ PR created successfully!")
    # Report the PR details recorded by the orchestrator.
    state = orchestrator.load_state()
    pr_number = state.get('pending_pr')
    pr_url = state.get('pr_url')
    print(f"\n🔗 PR Details:")
    print(f" - PR Number: #{pr_number}")
    print(f" - URL: {pr_url}")
    print(f"\n📋 Next Steps:")
    print(f" 1. Review the PR at: {pr_url}")
    print(f" 2. Click 'Merge Pull Request' to approve")
    print(f" 3. Run: python3 force_deployment.py")
    return True
def main():
    """Confirm with the operator, then run the forced pipeline cycle."""
    print("\n🔬 FORCE PIPELINE TEST")
    print("This will:")
    print("1. Analyze all traffic data from the past week")
    print("2. Generate AI suggestions")
    print("3. Create a PR in Gitea immediately")
    print("\nDo you want to continue? (yes/no): ", end="")
    answer = input().strip().lower()
    if answer != 'yes':
        print("Cancelled.")
        return
    # Run the test and report the outcome.
    ok = force_pr_creation()
    print("\n✅ Pipeline test successful!" if ok else "\n❌ Pipeline test failed")


if __name__ == "__main__":
    main()

View File

@@ -1,384 +0,0 @@
#!/usr/bin/env python3
"""
Gitea Integration Module for SRX GitOps - Fixed Authentication
Handles Git operations and Gitea API interactions
"""
import os
import json
import logging
import tempfile
import shutil
from datetime import datetime
from typing import Dict, Optional, Tuple
import subprocess
import requests
from urllib.parse import urlparse
logger = logging.getLogger(__name__)
class GiteaIntegration:
    """Handles all Gitea-related operations"""

    def __init__(self, config: Dict):
        """
        Initialize Gitea integration

        Args:
            config: Dictionary containing:
                - url: Gitea instance URL
                - token: API token
                - repo: repository in format "owner/repo"
                - branch: default branch (usually "main")
                - labels: optional list of label names for created PRs
        """
        # Keep the full config dict: create_pr_with_config() reads
        # self.config['labels'] later, and the dict was previously never
        # stored on the instance, making that access an AttributeError.
        self.config = config
        self.url = config['url'].rstrip('/')
        self.token = config['token']
        self.repo = config['repo']
        self.default_branch = config.get('branch', 'main')
        # Parse owner and repo name
        self.owner, self.repo_name = self.repo.split('/')
        # Set up API headers
        self.headers = {
            'Authorization': f'token {self.token}',
            'Content-Type': 'application/json'
        }
        # Git configuration - Fix authentication format
        self.git_url = f"{self.url}/{self.repo}.git"
        logger.info(f"Initialized Gitea integration for {self.repo}")
def _run_git_command(self, cmd: list, cwd: str = None) -> Tuple[bool, str]:
    """
    Run a git command and return success status and output

    Args:
        cmd: List of command arguments
        cwd: Working directory

    Returns:
        Tuple of (success, output)
    """
    try:
        # Create a copy of the command to modify
        auth_cmd = cmd.copy()
        # Add authentication to git commands that need it by rewriting the
        # first http(s) URL argument to embed the oauth2 token.
        if any(action in cmd for action in ['clone', 'push', 'pull', 'fetch']):
            for i, arg in enumerate(auth_cmd):
                if arg.startswith('http'):
                    # Gitea supports multiple auth formats, let's use oauth2
                    parsed = urlparse(arg)
                    # Try oauth2 format which is commonly supported
                    auth_url = f"{parsed.scheme}://oauth2:{self.token}@{parsed.netloc}{parsed.path}"
                    auth_cmd[i] = auth_url
                    break
        # Also set up git credentials via environment.
        # NOTE(review): GIT_USERNAME/GIT_PASSWORD are not variables git
        # itself reads; GIT_ASKPASS='echo' only suppresses the prompt —
        # the embedded-URL auth above is what actually authenticates.
        env = os.environ.copy()
        env['GIT_ASKPASS'] = 'echo'
        env['GIT_USERNAME'] = 'oauth2'
        env['GIT_PASSWORD'] = self.token
        result = subprocess.run(
            auth_cmd,
            cwd=cwd,
            capture_output=True,
            text=True,
            check=True,  # non-zero exit raises CalledProcessError below
            env=env
        )
        return True, result.stdout
    except subprocess.CalledProcessError as e:
        # Logs the *original* command; the token only leaks here if the
        # caller itself passed an authenticated URL.
        logger.error(f"Git command failed: {' '.join(cmd)}")
        logger.error(f"Error: {e.stderr}")
        return False, e.stderr
def test_authentication(self) -> bool:
    """Return True when `git ls-remote` succeeds against the repo URL."""
    try:
        logger.info("Testing Git authentication...")
        # A cheap read-only probe: list the remote HEAD ref.
        ok, _ = self._run_git_command(
            ['git', 'ls-remote', self.git_url, 'HEAD']
        )
        if ok:
            logger.info("Git authentication successful")
        else:
            logger.error("Git authentication failed")
        return ok
    except Exception as e:
        logger.error(f"Authentication test error: {e}")
        return False
def create_pr_with_config(self, srx_config: str, title: str = None,
                          description: str = None) -> Optional[Dict]:
    """
    Create a pull request with SRX configuration

    Clones the repo into a temp dir, writes the config under
    ai-suggestions/, commits it on a fresh branch, pushes, and opens a
    PR via the Gitea API.

    Args:
        srx_config: The SRX configuration content
        title: PR title (auto-generated if not provided)
        description: PR description (auto-generated if not provided)

    Returns:
        PR information dict or None if failed
    """
    # First test authentication
    if not self.test_authentication():
        logger.error("Git authentication test failed, aborting PR creation")
        return None
    timestamp = datetime.now().strftime('%Y%m%d-%H%M%S')
    branch_name = f"ai-suggestions-{timestamp}"
    # Auto-generate title and description if not provided
    if not title:
        title = f"AI Network Configuration Suggestions - {datetime.now().strftime('%Y-%m-%d')}"
    if not description:
        description = self._generate_pr_description(srx_config)
    # Create temporary directory for git operations
    with tempfile.TemporaryDirectory() as temp_dir:
        logger.info(f"Working in temporary directory: {temp_dir}")
        # Step 1: Clone the repository
        logger.info("Cloning repository...")
        success, output = self._run_git_command(
            ['git', 'clone', '--depth', '1', self.git_url, '.'],
            cwd=temp_dir
        )
        if not success:
            logger.error("Failed to clone repository")
            # Try alternative authentication method
            logger.info("Trying alternative clone method...")
            # Use git credential helper.
            # NOTE(review): this writes .git-credentials into temp_dir and
            # then re-clones into '.'; git refuses to clone into a
            # non-empty directory, so this retry likely cannot succeed —
            # verify and clean the directory first if the fallback is kept.
            self._setup_git_credentials(temp_dir)
            success, output = self._run_git_command(
                ['git', 'clone', '--depth', '1', self.git_url, '.'],
                cwd=temp_dir
            )
            if not success:
                logger.error("All clone attempts failed")
                return None
        # Step 2: Configure git user (commit author/committer identity)
        self._run_git_command(
            ['git', 'config', 'user.email', 'ai-orchestrator@srx-gitops.local'],
            cwd=temp_dir
        )
        self._run_git_command(
            ['git', 'config', 'user.name', 'AI Orchestrator'],
            cwd=temp_dir
        )
        # Step 3: Create and checkout new branch
        logger.info(f"Creating branch: {branch_name}")
        success, _ = self._run_git_command(
            ['git', 'checkout', '-b', branch_name],
            cwd=temp_dir
        )
        if not success:
            logger.error("Failed to create branch")
            return None
        # Step 4: Create ai-suggestions directory if it doesn't exist
        suggestions_dir = os.path.join(temp_dir, 'ai-suggestions')
        os.makedirs(suggestions_dir, exist_ok=True)
        # Step 5: Write configuration file with a provenance header
        config_filename = f"suggestion-{timestamp}.conf"
        config_path = os.path.join(suggestions_dir, config_filename)
        with open(config_path, 'w') as f:
            f.write(f"# AI-Generated SRX Configuration\n")
            f.write(f"# Generated: {datetime.now().isoformat()}\n")
            f.write(f"# Analysis Period: Last 7 days\n\n")
            f.write(srx_config)
        logger.info(f"Created config file: {config_filename}")
        # Step 6: Add and commit changes
        self._run_git_command(['git', 'add', '.'], cwd=temp_dir)
        commit_message = f"Add AI-generated configuration suggestions for {datetime.now().strftime('%Y-%m-%d')}"
        success, _ = self._run_git_command(
            ['git', 'commit', '-m', commit_message],
            cwd=temp_dir
        )
        if not success:
            logger.error("Failed to commit changes")
            return None
        # Step 7: Push branch
        logger.info(f"Pushing branch {branch_name}...")
        success, _ = self._run_git_command(
            ['git', 'push', 'origin', branch_name],
            cwd=temp_dir
        )
        if not success:
            logger.error("Failed to push branch")
            return None
        # Step 8: Create pull request via API
        logger.info("Creating pull request via Gitea API...")
        # First, get the label IDs if configured.
        # NOTE(review): self.config is never assigned in this class's
        # __init__ as written — this access raises AttributeError; the
        # constructor should store the incoming config dict.
        label_ids = []
        if 'labels' in self.config:
            label_ids = self.get_label_ids(self.config['labels'])
        pr_data = {
            "title": title,
            "body": description,
            "head": branch_name,
            "base": self.default_branch
        }
        # Only add labels if we found valid IDs
        if label_ids:
            pr_data["labels"] = label_ids
        api_url = f"{self.url}/api/v1/repos/{self.repo}/pulls"
        try:
            response = requests.post(api_url, json=pr_data, headers=self.headers)
            response.raise_for_status()
            pr_info = response.json()
            logger.info(f"Successfully created PR #{pr_info['number']}: {pr_info['title']}")
            return {
                'number': pr_info['number'],
                'url': pr_info['html_url'],
                'branch': branch_name,
                'created_at': pr_info['created_at']
            }
        except requests.exceptions.RequestException as e:
            logger.error(f"Failed to create PR via API: {e}")
            # e.response can be None on pure connection errors; hasattr on
            # None is False, so the guard below is safe.
            if hasattr(e.response, 'text'):
                logger.error(f"Response: {e.response.text}")
            return None
def _setup_git_credentials(self, cwd: str):
    """Configure a repo-local git credential store holding the API token.

    Args:
        cwd: Directory of the working copy to configure.
    """
    cred_file = os.path.join(cwd, '.git-credentials')
    # `credential.helper store` reads ~/.git-credentials by default; it
    # would never see a file written into `cwd`. Point the helper at our
    # repo-local file explicitly so the stored token is actually used.
    self._run_git_command(
        ['git', 'config', '--local', 'credential.helper',
         f'store --file={cred_file}'],
        cwd=cwd
    )
    # Write the credentials file in git's URL format, readable only by us.
    parsed = urlparse(self.git_url)
    cred_url = f"{parsed.scheme}://oauth2:{self.token}@{parsed.netloc}\n"
    with open(cred_file, 'w') as f:
        f.write(cred_url)
    os.chmod(cred_file, 0o600)
def _generate_pr_description(self, srx_config: str) -> str:
"""Generate a descriptive PR body"""
config_lines = srx_config.strip().split('\n')
summary = []
# Parse configuration to create summary
for line in config_lines:
if 'security-zone' in line and 'address' in line:
summary.append(f"- {line.strip()}")
elif 'application' in line and 'destination-port' in line:
summary.append(f"- {line.strip()}")
description = f"""## 🤖 AI-Generated Network Configuration
This pull request contains network configuration suggestions generated by the AI orchestrator based on traffic analysis from the past 7 days.
### 📊 Analysis Summary
- **Analysis Period**: Last 7 days
- **Data Source**: NetFlow/J-Flow from Elasticsearch
- **Generation Time**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
### 🔧 Proposed Changes
{chr(10).join(summary[:10]) if summary else 'Various security zone and application updates'}
{'... and more' if len(summary) > 10 else ''}
### ⚠️ Review Required
Please review these suggestions carefully before approving. The AI has analyzed traffic patterns and suggested optimizations, but human validation is essential.
### 🔄 Deployment
Once approved, these changes will be automatically deployed during the next deployment window (daily at 5 AM).
---
*Generated by SRX AI GitOps Orchestrator*"""
return description
def get_pr_status(self, pr_number: int) -> Optional[Dict]:
    """Fetch a pull request's current status from the Gitea API.

    Args:
        pr_number: PR number to check

    Returns:
        Dict with number/state/merged/mergeable/title/timestamps, or
        None when the request fails.
    """
    endpoint = f"{self.url}/api/v1/repos/{self.repo}/pulls/{pr_number}"
    try:
        resp = requests.get(endpoint, headers=self.headers)
        resp.raise_for_status()
        payload = resp.json()
        wanted = ('number', 'state', 'merged', 'mergeable',
                  'title', 'created_at', 'updated_at')
        return {key: payload[key] for key in wanted}
    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to get PR status: {e}")
        return None
def get_label_ids(self, label_names: list) -> list:
    """Resolve label names to Gitea label IDs; unknown names are skipped.

    Args:
        label_names: List of label names

    Returns:
        List of matching label IDs ([] on API failure).
    """
    endpoint = f"{self.url}/api/v1/repos/{self.repo}/labels"
    try:
        resp = requests.get(endpoint, headers=self.headers)
        resp.raise_for_status()
        by_name = {lbl['name']: lbl['id'] for lbl in resp.json()}
        ids = []
        for wanted in label_names:
            if wanted in by_name:
                ids.append(by_name[wanted])
                logger.info(f"Found label '{wanted}' with ID {by_name[wanted]}")
            else:
                logger.warning(f"Label '{wanted}' not found in repository")
        return ids
    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to get labels: {e}")
        return []

View File

@@ -1,351 +0,0 @@
#!/usr/bin/env python3
"""
Gitea Integration Module for SRX GitOps - Fixed Push Authentication
Handles Git operations and Gitea API interactions
"""
import os
import json
import logging
import tempfile
import shutil
from datetime import datetime
from typing import Dict, Optional, Tuple
import subprocess
import requests
from urllib.parse import urlparse
logger = logging.getLogger(__name__)
class GiteaIntegration:
    """Handles all Gitea-related operations"""

    def __init__(self, config: Dict):
        """
        Initialize Gitea integration

        Args:
            config: Dictionary containing:
                - url: Gitea instance URL
                - token: API token
                - repo: repository in format "owner/repo"
                - branch: default branch (usually "main")
                - labels: optional list of label names for created PRs
        """
        # Keep the full config dict: create_pr_with_config() reads
        # self.config['labels'] later, and the dict was previously never
        # stored on the instance, making that access an AttributeError.
        self.config = config
        self.url = config['url'].rstrip('/')
        self.token = config['token']
        self.repo = config['repo']
        self.default_branch = config.get('branch', 'main')
        # Parse owner and repo name
        self.owner, self.repo_name = self.repo.split('/')
        # Set up API headers
        self.headers = {
            'Authorization': f'token {self.token}',
            'Content-Type': 'application/json'
        }
        # Git configuration: plain URL for display/probes, token-embedded
        # URL for authenticated clone/push.
        self.git_url = f"{self.url}/{self.repo}.git"
        self.auth_git_url = f"https://oauth2:{self.token}@{urlparse(self.url).netloc}/{self.repo}.git"
        logger.info(f"Initialized Gitea integration for {self.repo}")
def _run_git_command(self, cmd: list, cwd: str = None) -> Tuple[bool, str]:
    """
    Run a git command and return success status and output

    Args:
        cmd: List of command arguments
        cwd: Working directory

    Returns:
        Tuple of (success, output)
    """
    def _masked(args: list) -> str:
        # Render the command for logging with the API token redacted
        # (previously duplicated inline in both the try and except paths).
        return ' '.join(
            a.replace(self.token, "***TOKEN***") if self.token in a else a
            for a in args
        )

    try:
        logger.debug(f"Running git command: {_masked(cmd)}")
        result = subprocess.run(
            cmd,
            cwd=cwd,
            capture_output=True,
            text=True,
            check=True  # non-zero exit raises CalledProcessError below
        )
        return True, result.stdout
    except subprocess.CalledProcessError as e:
        logger.error(f"Git command failed: {_masked(cmd)}")
        logger.error(f"Error: {e.stderr}")
        return False, e.stderr
def create_pr_with_config(self, srx_config: str, title: str = None,
                          description: str = None) -> Optional[Dict]:
    """
    Create a pull request with SRX configuration

    Clones the repo into a temp dir using the token-embedded URL, writes
    the config under ai-suggestions/, commits it on a fresh branch,
    pushes, and opens a PR via the Gitea API.

    Args:
        srx_config: The SRX configuration content
        title: PR title (auto-generated if not provided)
        description: PR description (auto-generated if not provided)

    Returns:
        PR information dict or None if failed
    """
    timestamp = datetime.now().strftime('%Y%m%d-%H%M%S')
    branch_name = f"ai-suggestions-{timestamp}"
    # Auto-generate title and description if not provided
    if not title:
        title = f"AI Network Configuration Suggestions - {datetime.now().strftime('%Y-%m-%d')}"
    if not description:
        description = self._generate_pr_description(srx_config)
    # Create temporary directory for git operations
    with tempfile.TemporaryDirectory() as temp_dir:
        logger.info(f"Working in temporary directory: {temp_dir}")
        # Step 1: Clone the repository with authentication
        logger.info("Cloning repository...")
        success, output = self._run_git_command(
            ['git', 'clone', '--depth', '1', self.auth_git_url, '.'],
            cwd=temp_dir
        )
        if not success:
            logger.error("Failed to clone repository")
            return None
        # Step 2: Configure git user (commit author/committer identity)
        self._run_git_command(
            ['git', 'config', 'user.email', 'ai-orchestrator@srx-gitops.local'],
            cwd=temp_dir
        )
        self._run_git_command(
            ['git', 'config', 'user.name', 'AI Orchestrator'],
            cwd=temp_dir
        )
        # IMPORTANT: Set the push URL explicitly with authentication
        # This ensures push uses the authenticated URL
        logger.info("Setting authenticated push URL...")
        self._run_git_command(
            ['git', 'remote', 'set-url', 'origin', self.auth_git_url],
            cwd=temp_dir
        )
        # Step 3: Create and checkout new branch
        logger.info(f"Creating branch: {branch_name}")
        success, _ = self._run_git_command(
            ['git', 'checkout', '-b', branch_name],
            cwd=temp_dir
        )
        if not success:
            logger.error("Failed to create branch")
            return None
        # Step 4: Create ai-suggestions directory if it doesn't exist
        suggestions_dir = os.path.join(temp_dir, 'ai-suggestions')
        os.makedirs(suggestions_dir, exist_ok=True)
        # Step 5: Write configuration file with a provenance header
        config_filename = f"suggestion-{timestamp}.conf"
        config_path = os.path.join(suggestions_dir, config_filename)
        with open(config_path, 'w') as f:
            f.write(f"# AI-Generated SRX Configuration\n")
            f.write(f"# Generated: {datetime.now().isoformat()}\n")
            f.write(f"# Analysis Period: Last 7 days\n\n")
            f.write(srx_config)
        logger.info(f"Created config file: {config_filename}")
        # Step 6: Add and commit changes
        self._run_git_command(['git', 'add', '.'], cwd=temp_dir)
        commit_message = f"Add AI-generated configuration suggestions for {datetime.now().strftime('%Y-%m-%d')}"
        success, _ = self._run_git_command(
            ['git', 'commit', '-m', commit_message],
            cwd=temp_dir
        )
        if not success:
            logger.warning("No changes to commit (file might already exist)")
            # Check if we actually have changes
            status_success, status_output = self._run_git_command(
                ['git', 'status', '--porcelain'],
                cwd=temp_dir
            )
            if not status_output.strip():
                logger.info("No changes detected, skipping PR creation")
                return None
            # NOTE(review): when the commit failed but the tree still shows
            # changes, execution falls through to the push with nothing
            # committed — confirm this fall-through is intended.
        # Step 7: Push branch
        logger.info(f"Pushing branch {branch_name}...")
        success, _ = self._run_git_command(
            ['git', 'push', '-u', 'origin', branch_name],
            cwd=temp_dir
        )
        if not success:
            logger.error("Failed to push branch")
            # Try alternative push command (explicit URL + refspec)
            logger.info("Trying alternative push method...")
            success, _ = self._run_git_command(
                ['git', 'push', self.auth_git_url, f"{branch_name}:{branch_name}"],
                cwd=temp_dir
            )
            if not success:
                logger.error("All push attempts failed")
                return None
        # Step 8: Create pull request via API
        logger.info("Creating pull request via Gitea API...")
        # Get label IDs if configured.
        # NOTE(review): self.config is never assigned in this class's
        # __init__ as written — this access raises AttributeError; the
        # constructor should store the incoming config dict.
        label_ids = []
        if 'labels' in self.config:
            label_ids = self.get_label_ids(self.config['labels'])
        pr_data = {
            "title": title,
            "body": description,
            "head": branch_name,
            "base": self.default_branch
        }
        # Only add labels if we found valid IDs
        if label_ids:
            pr_data["labels"] = label_ids
        api_url = f"{self.url}/api/v1/repos/{self.repo}/pulls"
        try:
            response = requests.post(api_url, json=pr_data, headers=self.headers)
            response.raise_for_status()
            pr_info = response.json()
            logger.info(f"Successfully created PR #{pr_info['number']}: {pr_info['title']}")
            return {
                'number': pr_info['number'],
                'url': pr_info['html_url'],
                'branch': branch_name,
                'created_at': pr_info['created_at']
            }
        except requests.exceptions.RequestException as e:
            logger.error(f"Failed to create PR via API: {e}")
            # e.response can be None on pure connection errors; hasattr on
            # None is False, so the guard below is safe.
            if hasattr(e.response, 'text'):
                logger.error(f"Response: {e.response.text}")
            return None
def _generate_pr_description(self, srx_config: str) -> str:
"""Generate a descriptive PR body"""
config_lines = srx_config.strip().split('\n')
summary = []
# Parse configuration to create summary
for line in config_lines:
if 'security-zone' in line and 'address' in line:
summary.append(f"- {line.strip()}")
elif 'application' in line and 'destination-port' in line:
summary.append(f"- {line.strip()}")
description = f"""## 🤖 AI-Generated Network Configuration
This pull request contains network configuration suggestions generated by the AI orchestrator based on traffic analysis from the past 7 days.
### 📊 Analysis Summary
- **Analysis Period**: Last 7 days
- **Data Source**: NetFlow/J-Flow from Elasticsearch
- **Generation Time**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
### 🔧 Proposed Changes
{chr(10).join(summary[:10]) if summary else 'Various security zone and application updates'}
{'... and more' if len(summary) > 10 else ''}
### ⚠️ Review Required
Please review these suggestions carefully before approving. The AI has analyzed traffic patterns and suggested optimizations, but human validation is essential.
### 🔄 Deployment
Once approved, these changes will be automatically deployed during the next deployment window (daily at 5 AM).
---
*Generated by SRX AI GitOps Orchestrator*"""
return description
def get_pr_status(self, pr_number: int) -> Optional[Dict]:
    """Query the Gitea API for a single PR's current status.

    Args:
        pr_number: PR number to check

    Returns:
        Dictionary with PR status info or None on request failure.
    """
    endpoint = f"{self.url}/api/v1/repos/{self.repo}/pulls/{pr_number}"
    try:
        r = requests.get(endpoint, headers=self.headers)
        r.raise_for_status()
        data = r.json()
        return {
            'number': data['number'],
            'state': data['state'],  # open, closed
            'merged': data['merged'],
            'mergeable': data['mergeable'],
            'title': data['title'],
            'created_at': data['created_at'],
            'updated_at': data['updated_at'],
        }
    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to get PR status: {e}")
        return None
def get_label_ids(self, label_names: list) -> list:
    """Translate label names to their repository label IDs.

    Args:
        label_names: List of label names

    Returns:
        IDs for the names that exist ([] when the API call fails).
    """
    endpoint = f"{self.url}/api/v1/repos/{self.repo}/labels"
    try:
        resp = requests.get(endpoint, headers=self.headers)
        resp.raise_for_status()
        label_map = {label['name']: label['id'] for label in resp.json()}
        found_ids = []
        for name in label_names:
            label_id = label_map.get(name)
            if label_id is None:
                logger.warning(f"Label '{name}' not found in repository")
            else:
                found_ids.append(label_id)
                logger.info(f"Found label '{name}' with ID {label_id}")
        return found_ids
    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to get labels: {e}")
        return []

View File

@@ -1,544 +0,0 @@
#!/usr/bin/env python3
"""
Enhanced Network AI Orchestrator - Production Version
Compatible with Elasticsearch 7.x
"""
import os
import sys
import json
import time
import logging
import signal
import hashlib
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional
import yaml
import uuid
import threading
from elasticsearch import Elasticsearch # Using sync version for ES 7.x
from git import Repo
import requests
# Configure production logging
def setup_logging():
    """Configure day-stamped file logging plus stdout for production runs."""
    log_dir = Path("/home/netops/orchestrator/logs")
    log_dir.mkdir(exist_ok=True)
    log_file = log_dir / f"orchestrator_{datetime.now().strftime('%Y%m%d')}.log"
    handlers = [
        logging.FileHandler(log_file),
        logging.StreamHandler(sys.stdout),
    ]
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=handlers,
    )
    return logging.getLogger(__name__)


logger = setup_logging()
class NetworkOrchestrator:
    def __init__(self, config_path: str = "/home/netops/orchestrator/config.yaml"):
        """Initialize the orchestrator with configuration"""
        self.config = self.load_config(config_path)
        self.es_client = None  # populated by setup_elasticsearch()
        self.git_repo = None   # never populated in the visible code
        self.running = True    # main-loop control flag
        # Shared drop-box directories used to exchange analysis requests
        # and AI responses with the other side of the pipeline.
        self.shared_dir = Path("/shared/ai-gitops")
        self.request_dir = self.shared_dir / "requests"
        self.response_dir = self.shared_dir / "responses"
        # Ensure directories exist
        self.request_dir.mkdir(parents=True, exist_ok=True)
        self.response_dir.mkdir(parents=True, exist_ok=True)
        logger.info("Orchestrator initialized")
def should_create_pr(self):
"""Check if we should create a PR based on schedule and state"""
if not self.config.get('pr_creation', {}).get('enabled', True):
return False
# Load state
state = self.load_state()
# Check if pending PR exists
if self.config['pr_creation'].get('skip_if_pending', True):
if state.get('pending_pr', False):
logger.info("Skipping PR - existing PR is pending")
return False
# Check frequency
frequency = self.config['pr_creation'].get('frequency', 'weekly')
if frequency == 'weekly':
# Check if it's the right day and hour
now = datetime.now()
target_day = self.config['pr_creation'].get('day_of_week', 'monday')
target_hour = self.config['pr_creation'].get('hour_of_day', 9)
# Convert day name to number
days = ['monday', 'tuesday', 'wednesday', 'thursday', 'friday', 'saturday', 'sunday']
target_day_num = days.index(target_day.lower())
# Check if it's the right day and hour
if now.weekday() != target_day_num or now.hour != target_hour:
return False
# Check minimum days between PRs
if state.get('last_pr_created'):
last_pr_date = datetime.fromisoformat(state['last_pr_created'])
days_since = (datetime.now() - last_pr_date).days
min_days = self.config['pr_creation'].get('min_days_between', 7)
if days_since < min_days:
logger.info(f"Skipping PR - only {days_since} days since last PR")
return False
return True
def load_state(self):
"""Load orchestrator state"""
state_file = self.config.get('state_tracking', {}).get('state_file', '/var/lib/orchestrator/state.json')
if Path(state_file).exists():
with open(state_file, 'r') as f:
return json.load(f)
return {}
def save_state(self, updates):
"""Save orchestrator state"""
state_file = self.config.get('state_tracking', {}).get('state_file', '/var/lib/orchestrator/state.json')
state = self.load_state()
state.update(updates)
state['last_updated'] = datetime.now().isoformat()
with open(state_file, 'w') as f:
json.dump(state, f, indent=2)
def load_config(self, config_path: str) -> Dict:
    """Load configuration from YAML file.

    Args:
        config_path: Path to the YAML config file.

    Returns:
        Parsed config dict; falls back to built-in defaults when loading fails.
    """
    try:
        with open(config_path, 'r') as f:
            config = yaml.safe_load(f)
        logger.info(f"Configuration loaded from {config_path}")
        # Replace environment variables
        # (only the literal placeholder '${GITEA_TOKEN}' is substituted).
        if 'gitea' in config and 'token' in config['gitea']:
            if config['gitea']['token'] == '${GITEA_TOKEN}':
                config['gitea']['token'] = os.environ.get('GITEA_TOKEN', '')
        return config
    except Exception as e:
        logger.error(f"Failed to load config: {e}")
        # Use defaults if config fails
        return {
            'elasticsearch': {
                'host': 'INTERNAL_IP:9200',
                'index': 'netflow-*'
            },
            'analysis': {
                'interval_minutes': 60,   # minutes between analysis cycles
                'window_hours': 24        # look-back window for the ES query
            },
            'gitea': {
                'url': 'https://git.salmutt.dev',
                'repo': 'sal/srx-config',
                'token': os.environ.get('GITEA_TOKEN', '')
            }
        }
def setup_elasticsearch(self):
    """Initialize Elasticsearch connection (synchronous for ES 7.x).

    Returns:
        True when the connection is established and verified, else False.
    """
    try:
        es_config = self.config['elasticsearch']
        self.es_client = Elasticsearch(
            hosts=[es_config['host']],
            verify_certs=False,  # NOTE(review): TLS verification disabled — presumably internal-only ES; confirm
            timeout=30
        )
        # Test connection
        info = self.es_client.info()
        logger.info(f"Connected to Elasticsearch: {info['version']['number']}")
        return True
    except Exception as e:
        logger.error(f"Failed to connect to Elasticsearch: {e}")
        self.es_client = None  # callers check this to fall back to mock data
        return False
def collect_traffic_data(self) -> Dict:
    """Collect traffic data from Elasticsearch (synchronous).

    Returns:
        The ES aggregations dict (top_talkers, protocols, vlans,
        hourly_traffic), or mock data when ES is unavailable or the
        query fails.
    """
    if not self.es_client:
        logger.warning("Elasticsearch not connected, using mock data")
        return self.generate_mock_data()
    try:
        window_hours = self.config['analysis']['window_hours']
        # Aggregation-only query ("size": 0): only the rollups are needed, not hits.
        query = {
            "query": {
                "range": {
                    "@timestamp": {
                        "gte": f"now-{window_hours}h",
                        "lte": "now"
                    }
                }
            },
            "size": 0,
            "aggs": {
                "top_talkers": {
                    "terms": {
                        "field": "source_ip",
                        "size": 20
                    },
                    "aggs": {
                        "bytes": {"sum": {"field": "bytes"}},
                        "packets": {"sum": {"field": "packets"}}
                    }
                },
                "protocols": {
                    "terms": {
                        "field": "protocol",
                        "size": 10
                    }
                },
                "vlans": {
                    "terms": {
                        "field": "vlan_id",
                        "size": 20
                    },
                    "aggs": {
                        "bytes": {"sum": {"field": "bytes"}}
                    }
                },
                "hourly_traffic": {
                    "date_histogram": {
                        "field": "@timestamp",
                        "calendar_interval": "hour"
                    },
                    "aggs": {
                        "bytes": {"sum": {"field": "bytes"}}
                    }
                }
            }
        }
        result = self.es_client.search(
            index=self.config['elasticsearch']['index'],
            body=query
        )
        total_hits = result['hits']['total']
        # Handle both ES 6.x and 7.x response formats
        # (7.x returns {"value": N, ...}; 6.x returns a bare int).
        if isinstance(total_hits, dict):
            total_count = total_hits['value']
        else:
            total_count = total_hits
        logger.info(f"Collected traffic data: {total_count} flows")
        return result['aggregations']
    except Exception as e:
        logger.error(f"Error collecting traffic data: {e}")
        # Fall back to mock data so downstream analysis can still run.
        return self.generate_mock_data()
def generate_mock_data(self) -> Dict:
    """Generate mock traffic data for testing (same shape as the ES aggregations)."""
    def bucket(key, count, nbytes=None, packets=None):
        # Build one aggregation bucket; byte/packet sub-aggs only when given.
        entry = {"key": key, "doc_count": count}
        if nbytes is not None:
            entry["bytes"] = {"value": nbytes}
        if packets is not None:
            entry["packets"] = {"value": packets}
        return entry

    return {
        "top_talkers": {"buckets": [
            bucket("INTERNAL_IP", 15000, 5000000, 10000),
            bucket("INTERNAL_IP", 12000, 4000000, 8000),
            bucket("INTERNAL_IP", 8000, 2000000, 5000),
            bucket("10.0.0.5", 6000, 1500000, 3000),
        ]},
        "protocols": {"buckets": [
            bucket("tcp", 25000),
            bucket("udp", 15000),
            bucket("icmp", 2000),
        ]},
        "vlans": {"buckets": [
            bucket(100, 20000, 8000000),
            bucket(200, 15000, 6000000),
            bucket(300, 5000, 2000000),
        ]},
    }
def request_ai_analysis(self, traffic_data: Dict) -> Optional[Dict]:
    """Send traffic data to AI for analysis.

    Writes a request file into the shared directory, then blocks until the
    AI processor drops a matching response file (or the timeout expires).

    Args:
        traffic_data: Aggregated traffic stats (ES aggregations shape).

    Returns:
        The parsed AI response dict, or None on timeout/error.
    """
    request_id = str(uuid.uuid4())  # correlates request and response files
    request_file = self.request_dir / f"{request_id}.json"
    request_data = {
        "request_id": request_id,
        "timestamp": datetime.now().isoformat(),
        "type": "traffic_analysis",
        "data": traffic_data,
        "prompt": self.build_analysis_prompt(traffic_data)
    }
    try:
        with open(request_file, 'w') as f:
            json.dump(request_data, f, indent=2)
        logger.info(f"AI request created: {request_id}")
        # Wait for response (with timeout)
        response = self.wait_for_ai_response(request_id, timeout=120)
        return response
    except Exception as e:
        logger.error(f"Error requesting AI analysis: {e}")
        return None
def build_analysis_prompt(self, traffic_data: Dict) -> str:
    """Build the natural-language prompt sent to the AI for traffic analysis."""
    template = """Analyze this network traffic data and suggest optimizations for a Juniper SRX firewall:
Traffic Summary:
- Top Talkers: {}
- Active VLANs: {}
- Protocol Distribution: {}
Based on this data, please provide:
1. Security rule optimizations (as Juniper SRX configuration commands)
2. QoS improvements for high-traffic hosts
3. VLAN segmentation recommendations
4. Potential security concerns or anomalies
Format your response with specific Juniper SRX configuration commands that can be applied.
Include comments explaining each change."""
    # Pull the headline metrics out of the aggregation buckets.
    def buckets(section):
        return traffic_data.get(section, {}).get('buckets', [])

    top_ips = [b['key'] for b in buckets('top_talkers')][:5]
    vlans = [str(b['key']) for b in buckets('vlans')][:5]
    protocols = [b['key'] for b in buckets('protocols')][:3]
    # ''.join of an empty list is falsy, so `or` supplies the placeholder text.
    return template.format(
        ', '.join(top_ips) or 'No data',
        ', '.join(vlans) or 'No VLANs',
        ', '.join(protocols) or 'No protocols',
    )
def wait_for_ai_response(self, request_id: str, timeout: int = 120) -> Optional[Dict]:
    """Wait for AI response file.

    Polls the shared response directory every 2 seconds until a matching
    response file appears or *timeout* seconds elapse.

    Args:
        request_id: UUID used in the request/response filenames.
        timeout: Maximum seconds to wait.

    Returns:
        Parsed response dict, or None on timeout or read error.
    """
    response_file = self.response_dir / f"{request_id}_response.json"
    start_time = time.time()
    while time.time() - start_time < timeout:
        if response_file.exists():
            try:
                time.sleep(1)  # Give AI time to finish writing
                with open(response_file, 'r') as f:
                    response = json.load(f)
                logger.info(f"AI response received: {request_id}")
                # Log a snippet of the response
                if 'response' in response:
                    snippet = response['response'][:200] + '...' if len(response['response']) > 200 else response['response']
                    logger.info(f"AI suggestion snippet: {snippet}")
                # Clean up files so the shared dirs don't accumulate stale entries
                response_file.unlink()
                (self.request_dir / f"{request_id}.json").unlink(missing_ok=True)
                return response
            except Exception as e:
                logger.error(f"Error reading AI response: {e}")
                return None
        time.sleep(2)  # poll interval
    logger.warning(f"AI response timeout for {request_id}")
    return None
def create_gitea_pr(self, ai_response: Dict) -> bool:
    """Create pull request in Gitea with suggested changes.

    NOTE(review): PR creation is currently stubbed — this builds the PR
    payload and logs it, but the Git branch/commit/push and the actual
    API call are still TODO (see comment near the end).

    Args:
        ai_response: AI response dict ('suggestions' preferred over 'response').

    Returns:
        True when the (stubbed) flow completed, False on error or missing token.
    """
    try:
        gitea_config = self.config['gitea']
        if not gitea_config.get('token'):
            logger.error("Gitea token not configured")
            return False
        # Extract configuration from AI response
        # Use 'suggestions' field if available, fallback to 'response'
        config_changes = ai_response.get('suggestions', ai_response.get('response', 'No configuration suggested'))
        # Create a unique branch name
        branch_name = f"ai-suggestions-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
        # Format the PR body
        pr_data = {
            "title": f"AI Network Optimizations - {datetime.now().strftime('%Y-%m-%d %H:%M')}",
            "body": f"""## AI-Generated Network Optimizations
### Analysis Summary
Analysis completed at {datetime.now().isoformat()}
### Traffic Patterns Analyzed
- Analysis Window: {self.config['analysis']['window_hours']} hours
- Data Source: NetFlow/J-Flow from SRX
### Proposed Configuration Changes
```junos
{config_changes}
```
### Review Instructions
1. Review the proposed changes carefully
2. Test in lab environment if possible
3. Schedule maintenance window if approved
4. Monitor after deployment
**This PR was automatically generated by the AI Network Orchestrator**
""",
            "base": "main",
            "head": branch_name
        }
        # NOTE(review): headers are currently unused until the API call below
        # is implemented.
        headers = {
            "Authorization": f"token {gitea_config['token']}",
            "Content-Type": "application/json"
        }
        # For now, log what would be sent (since branch creation needs more setup)
        logger.info(f"Would create PR with title: {pr_data['title']}")
        logger.info(f"Configuration changes proposed: {len(config_changes)} characters")
        # TODO: Implement actual Git operations and PR creation
        # This requires cloning the repo, creating branch, committing changes, pushing
        return True
    except Exception as e:
        logger.error(f"Error creating Gitea PR: {e}")
        return False
def run_analysis_cycle(self):
    """Run a complete analysis cycle.

    Steps: collect traffic data → request AI analysis → persist state →
    optionally open a Gitea PR when the schedule allows.
    """
    logger.info("="*60)
    logger.info("Starting traffic analysis cycle")
    logger.info("="*60)
    try:
        # Always collect traffic data
        logger.info("Step 1: Collecting traffic data from Elasticsearch...")
        traffic_data = self.collect_traffic_data()
        if not traffic_data:
            logger.warning("No traffic data available, skipping analysis")
            return
        # Log summary of collected data
        top_talkers = traffic_data.get('top_talkers', {}).get('buckets', [])
        if top_talkers:
            logger.info(f"Found {len(top_talkers)} top talkers")
            logger.info(f"Top IP: {top_talkers[0]['key']} with {top_talkers[0]['doc_count']} flows")
        # Always request AI analysis
        logger.info("Step 2: Requesting AI analysis...")
        ai_response = self.request_ai_analysis(traffic_data)
        # Save state for analysis (records whether the AI responded)
        self.save_state({
            'last_analysis_run': datetime.now().isoformat(),
            'last_analysis_data': {
                'top_talkers_count': len(top_talkers),
                'response_received': bool(ai_response)
            }
        })
        # Check if we should create PR (weekly schedule + pending-PR guard)
        if self.should_create_pr():
            if ai_response and (ai_response.get('response') or ai_response.get('suggestions')):
                logger.info("Step 3: Creating PR with AI suggestions...")
                if self.create_gitea_pr(ai_response):
                    logger.info("✓ PR created successfully")
                    # Mark the PR pending so the next cycle won't stack another one.
                    self.save_state({
                        'last_pr_created': datetime.now().isoformat(),
                        'pending_pr': True
                    })
                else:
                    logger.warning("Failed to create Gitea PR")
            else:
                logger.info("No actionable suggestions from AI analysis")
        else:
            logger.info("Not time for PR creation - analysis data saved for future use")
    except Exception as e:
        logger.error(f"Error in analysis cycle: {e}")
    logger.info("="*60)
def main_loop(self):
    """Main orchestrator loop.

    Runs an analysis cycle immediately, then repeats every
    `analysis.interval_minutes` until shutdown() clears self.running.
    """
    logger.info("Starting Network AI Orchestrator")
    # Setup Elasticsearch connection
    if not self.setup_elasticsearch():
        logger.warning("Running without Elasticsearch connection - using mock data")
    interval = self.config['analysis'].get('interval_minutes', 60) * 60  # seconds
    # Run first analysis immediately
    self.run_analysis_cycle()
    while self.running:
        try:
            logger.info(f"Next analysis scheduled in {interval/60} minutes")
            logger.info(f"Next run at: {(datetime.now() + timedelta(seconds=interval)).strftime('%H:%M:%S')}")
            time.sleep(interval)
            if self.running:  # Check again after sleep
                self.run_analysis_cycle()
        except KeyboardInterrupt:
            logger.info("Received keyboard interrupt")
            break
        except Exception as e:
            logger.error(f"Error in main loop: {e}")
            time.sleep(60)  # Wait before retry
    logger.info("Orchestrator shutdown complete")
def shutdown(self, signum=None, frame=None):
    """Graceful shutdown handler (also usable as a signal handler).

    Args:
        signum: Signal number when invoked via the signal module, else None.
        frame: Current stack frame (unused), supplied by the signal module.
    """
    if signum:
        logger.info(f"Received signal {signum}, initiating shutdown...")
    else:
        logger.info("Initiating shutdown...")
    self.running = False  # tells main_loop to stop after the current wait
    if self.es_client:
        try:
            # Close Elasticsearch connection
            self.es_client.transport.close()
        except Exception:
            # Fix: narrowed from a bare `except:` so SystemExit/KeyboardInterrupt
            # are not swallowed during shutdown; close errors stay best-effort.
            pass
def main():
    """Main entry point: wire signal handlers and run the orchestrator loop."""
    orchestrator = NetworkOrchestrator()
    # Set up signal handlers so SIGTERM/SIGINT trigger a graceful shutdown
    signal.signal(signal.SIGTERM, orchestrator.shutdown)
    signal.signal(signal.SIGINT, orchestrator.shutdown)
    try:
        orchestrator.main_loop()
    except Exception as e:
        logger.error(f"Fatal error: {e}", exc_info=True)
        sys.exit(1)  # non-zero exit signals failure to the service manager
    sys.exit(0)


if __name__ == "__main__":
    main()

View File

@@ -1,621 +0,0 @@
#!/usr/bin/env python3
"""
Enhanced Network AI Orchestrator - Production Version with Gitea Integration
Compatible with Elasticsearch 7.x
"""
import os
import sys
import json
import time
import logging
import signal
import hashlib
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional
import yaml
import uuid
import threading
from elasticsearch import Elasticsearch # Using sync version for ES 7.x
from git import Repo
import requests
# ADD THIS IMPORT - New for Phase 3
from gitea_integration import GiteaIntegration
# Configure production logging
def setup_logging():
    """Configure comprehensive logging for production.

    Logs to a per-day file under the orchestrator log directory and to stdout.

    Returns:
        The module logger.
    """
    log_dir = Path("/home/netops/orchestrator/logs")
    log_dir.mkdir(exist_ok=True)
    # One log file per day, e.g. orchestrator_20250904.log
    log_file = log_dir / f"orchestrator_{datetime.now().strftime('%Y%m%d')}.log"
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(log_file),
            logging.StreamHandler(sys.stdout)
        ]
    )
    return logging.getLogger(__name__)


# Module-wide logger, configured once at import time.
logger = setup_logging()
class NetworkOrchestrator:
def __init__(self, config_path: str = "/home/netops/orchestrator/config.yaml"):
    """Initialize the orchestrator with configuration.

    Args:
        config_path: Path to the YAML configuration file.
    """
    self.config = self.load_config(config_path)
    self.es_client = None   # Elasticsearch client; set later by setup_elasticsearch()
    self.git_repo = None    # reserved for Git operations (never assigned in this file)
    self.running = True     # main-loop flag; cleared by shutdown()
    self.shared_dir = Path("/shared/ai-gitops")
    self.request_dir = self.shared_dir / "requests"    # outgoing AI request files
    self.response_dir = self.shared_dir / "responses"  # incoming AI response files
    # Cached copy of the persisted state, used for PR tracking (Phase 3)
    self.state = self.load_state()
    # Ensure directories exist
    self.request_dir.mkdir(parents=True, exist_ok=True)
    self.response_dir.mkdir(parents=True, exist_ok=True)
    logger.info("Orchestrator initialized")
def should_create_pr(self):
"""Check if we should create a PR based on schedule and state"""
if not self.config.get('pr_creation', {}).get('enabled', True):
return False
# Load state
state = self.load_state()
# Check if pending PR exists
if self.config['pr_creation'].get('skip_if_pending', True):
if state.get('pending_pr', False):
logger.info("Skipping PR - existing PR is pending")
return False
# Check frequency
frequency = self.config['pr_creation'].get('frequency', 'weekly')
if frequency == 'weekly':
# Check if it's the right day and hour
now = datetime.now()
target_day = self.config['pr_creation'].get('day_of_week', 'saturday')
target_hour = self.config['pr_creation'].get('hour_of_day', 5)
# Convert day name to number
days = ['monday', 'tuesday', 'wednesday', 'thursday', 'friday', 'saturday', 'sunday']
target_day_num = days.index(target_day.lower())
# Check if it's the right day and hour
if now.weekday() != target_day_num or now.hour != target_hour:
return False
# Check minimum days between PRs
if state.get('last_pr_created'):
last_pr_date = datetime.fromisoformat(state['last_pr_created'])
days_since = (datetime.now() - last_pr_date).days
min_days = self.config['pr_creation'].get('min_days_between', 7)
if days_since < min_days:
logger.info(f"Skipping PR - only {days_since} days since last PR")
return False
return True
def load_state(self):
"""Load orchestrator state"""
state_file = self.config.get('state_tracking', {}).get('state_file', '/var/lib/orchestrator/state.json')
if Path(state_file).exists():
with open(state_file, 'r') as f:
return json.load(f)
return {}
def save_state(self, updates):
"""Save orchestrator state"""
state_file = self.config.get('state_tracking', {}).get('state_file', '/var/lib/orchestrator/state.json')
state = self.load_state()
state.update(updates)
state['last_updated'] = datetime.now().isoformat()
with open(state_file, 'w') as f:
json.dump(state, f, indent=2)
def load_config(self, config_path: str) -> Dict:
    """Load configuration from YAML file.

    Args:
        config_path: Path to the YAML config file.

    Returns:
        Parsed config dict; falls back to built-in defaults when loading fails.
    """
    try:
        with open(config_path, 'r') as f:
            config = yaml.safe_load(f)
        logger.info(f"Configuration loaded from {config_path}")
        # Replace environment variables
        # (only the literal placeholder '${GITEA_TOKEN}' is substituted).
        if 'gitea' in config and 'token' in config['gitea']:
            if config['gitea']['token'] == '${GITEA_TOKEN}':
                config['gitea']['token'] = os.environ.get('GITEA_TOKEN', '')
        return config
    except Exception as e:
        logger.error(f"Failed to load config: {e}")
        # Use defaults if config fails
        return {
            'elasticsearch': {
                'host': 'INTERNAL_IP:9200',
                'index': 'netflow-*'
            },
            'analysis': {
                'interval_minutes': 60,   # minutes between analysis cycles
                'window_hours': 24        # look-back window for the ES query
            },
            'gitea': {
                'url': 'https://git.salmutt.dev',
                'repo': 'sal/srx-config',
                'token': os.environ.get('GITEA_TOKEN', '')
            }
        }
def setup_elasticsearch(self):
    """Initialize Elasticsearch connection (synchronous for ES 7.x).

    Returns:
        True when the connection is established and verified, else False.
    """
    try:
        es_config = self.config['elasticsearch']
        self.es_client = Elasticsearch(
            hosts=[es_config['host']],
            verify_certs=False,  # NOTE(review): TLS verification disabled — presumably internal-only ES; confirm
            timeout=30
        )
        # Test connection
        info = self.es_client.info()
        logger.info(f"Connected to Elasticsearch: {info['version']['number']}")
        return True
    except Exception as e:
        logger.error(f"Failed to connect to Elasticsearch: {e}")
        self.es_client = None  # callers check this to fall back to mock data
        return False
def collect_traffic_data(self) -> Dict:
    """Collect traffic data from Elasticsearch (synchronous).

    Returns:
        The ES aggregations dict (top_talkers, protocols, vlans,
        hourly_traffic), or mock data when ES is unavailable or the
        query fails.
    """
    if not self.es_client:
        logger.warning("Elasticsearch not connected, using mock data")
        return self.generate_mock_data()
    try:
        window_hours = self.config['analysis']['window_hours']
        # Aggregation-only query ("size": 0): only the rollups are needed, not hits.
        query = {
            "query": {
                "range": {
                    "@timestamp": {
                        "gte": f"now-{window_hours}h",
                        "lte": "now"
                    }
                }
            },
            "size": 0,
            "aggs": {
                "top_talkers": {
                    "terms": {
                        "field": "source_ip",
                        "size": 20
                    },
                    "aggs": {
                        "bytes": {"sum": {"field": "bytes"}},
                        "packets": {"sum": {"field": "packets"}}
                    }
                },
                "protocols": {
                    "terms": {
                        "field": "protocol",
                        "size": 10
                    }
                },
                "vlans": {
                    "terms": {
                        "field": "vlan_id",
                        "size": 20
                    },
                    "aggs": {
                        "bytes": {"sum": {"field": "bytes"}}
                    }
                },
                "hourly_traffic": {
                    "date_histogram": {
                        "field": "@timestamp",
                        "calendar_interval": "hour"
                    },
                    "aggs": {
                        "bytes": {"sum": {"field": "bytes"}}
                    }
                }
            }
        }
        result = self.es_client.search(
            index=self.config['elasticsearch']['index'],
            body=query
        )
        total_hits = result['hits']['total']
        # Handle both ES 6.x and 7.x response formats
        # (7.x returns {"value": N, ...}; 6.x returns a bare int).
        if isinstance(total_hits, dict):
            total_count = total_hits['value']
        else:
            total_count = total_hits
        logger.info(f"Collected traffic data: {total_count} flows")
        return result['aggregations']
    except Exception as e:
        logger.error(f"Error collecting traffic data: {e}")
        # Fall back to mock data so downstream analysis can still run.
        return self.generate_mock_data()
def generate_mock_data(self) -> Dict:
    """Generate mock traffic data for testing (same shape as the ES aggregations)."""
    def bucket(key, count, nbytes=None, packets=None):
        # Build one aggregation bucket; byte/packet sub-aggs only when given.
        entry = {"key": key, "doc_count": count}
        if nbytes is not None:
            entry["bytes"] = {"value": nbytes}
        if packets is not None:
            entry["packets"] = {"value": packets}
        return entry

    return {
        "top_talkers": {"buckets": [
            bucket("INTERNAL_IP", 15000, 5000000, 10000),
            bucket("INTERNAL_IP", 12000, 4000000, 8000),
            bucket("INTERNAL_IP", 8000, 2000000, 5000),
            bucket("10.0.0.5", 6000, 1500000, 3000),
        ]},
        "protocols": {"buckets": [
            bucket("tcp", 25000),
            bucket("udp", 15000),
            bucket("icmp", 2000),
        ]},
        "vlans": {"buckets": [
            bucket(100, 20000, 8000000),
            bucket(200, 15000, 6000000),
            bucket(300, 5000, 2000000),
        ]},
    }
def request_ai_analysis(self, traffic_data: Dict) -> Optional[Dict]:
    """Send traffic data to AI for analysis.

    Writes a request file into the shared directory, then blocks until the
    AI processor drops a matching response file (or the timeout expires).

    Args:
        traffic_data: Aggregated traffic stats (ES aggregations shape).

    Returns:
        The parsed AI response dict, or None on timeout/error.
    """
    request_id = str(uuid.uuid4())  # correlates request and response files
    request_file = self.request_dir / f"{request_id}.json"
    request_data = {
        "request_id": request_id,
        "timestamp": datetime.now().isoformat(),
        "type": "traffic_analysis",
        "data": traffic_data,
        "prompt": self.build_analysis_prompt(traffic_data)
    }
    try:
        with open(request_file, 'w') as f:
            json.dump(request_data, f, indent=2)
        logger.info(f"AI request created: {request_id}")
        # Wait for response (with timeout)
        response = self.wait_for_ai_response(request_id, timeout=120)
        return response
    except Exception as e:
        logger.error(f"Error requesting AI analysis: {e}")
        return None
def build_analysis_prompt(self, traffic_data: Dict) -> str:
    """Build the natural-language prompt sent to the AI for traffic analysis."""
    template = """Analyze this network traffic data and suggest optimizations for a Juniper SRX firewall:
Traffic Summary:
- Top Talkers: {}
- Active VLANs: {}
- Protocol Distribution: {}
Based on this data, please provide:
1. Security rule optimizations (as Juniper SRX configuration commands)
2. QoS improvements for high-traffic hosts
3. VLAN segmentation recommendations
4. Potential security concerns or anomalies
Format your response with specific Juniper SRX configuration commands that can be applied.
Include comments explaining each change."""
    # Pull the headline metrics out of the aggregation buckets.
    def buckets(section):
        return traffic_data.get(section, {}).get('buckets', [])

    top_ips = [b['key'] for b in buckets('top_talkers')][:5]
    vlans = [str(b['key']) for b in buckets('vlans')][:5]
    protocols = [b['key'] for b in buckets('protocols')][:3]
    # ''.join of an empty list is falsy, so `or` supplies the placeholder text.
    return template.format(
        ', '.join(top_ips) or 'No data',
        ', '.join(vlans) or 'No VLANs',
        ', '.join(protocols) or 'No protocols',
    )
def wait_for_ai_response(self, request_id: str, timeout: int = 120) -> Optional[Dict]:
    """Wait for AI response file.

    Polls the shared response directory every 2 seconds until a matching
    response file appears or *timeout* seconds elapse.

    Args:
        request_id: UUID used in the request/response filenames.
        timeout: Maximum seconds to wait.

    Returns:
        Parsed response dict, or None on timeout or read error.
    """
    response_file = self.response_dir / f"{request_id}_response.json"
    start_time = time.time()
    while time.time() - start_time < timeout:
        if response_file.exists():
            try:
                time.sleep(1)  # Give AI time to finish writing
                with open(response_file, 'r') as f:
                    response = json.load(f)
                logger.info(f"AI response received: {request_id}")
                # Log a snippet of the response
                if 'response' in response:
                    snippet = response['response'][:200] + '...' if len(response['response']) > 200 else response['response']
                    logger.info(f"AI suggestion snippet: {snippet}")
                # Clean up files so the shared dirs don't accumulate stale entries
                response_file.unlink()
                (self.request_dir / f"{request_id}.json").unlink(missing_ok=True)
                return response
            except Exception as e:
                logger.error(f"Error reading AI response: {e}")
                return None
        time.sleep(2)  # poll interval
    logger.warning(f"AI response timeout for {request_id}")
    return None
# Enhanced PR creation (Phase 3): delegates the Git/API work to GiteaIntegration.
def create_gitea_pr(self, ai_response: Dict = None) -> bool:
    """Create pull request in Gitea with suggested changes.

    When *ai_response* is omitted, the newest response file from the shared
    directory is used. Respects the PR schedule (should_create_pr) and the
    pending-PR guard in saved state.

    Args:
        ai_response: AI response dict, or None to load the latest from disk.

    Returns:
        True when a PR was created, False otherwise.
    """
    try:
        # If no AI response provided, get the latest one
        if not ai_response:
            latest_suggestion = self._get_latest_ai_suggestion()
            if not latest_suggestion:
                logger.warning("No AI suggestions found to create PR")
                return False
            # Read the suggestion file
            with open(latest_suggestion['path'], 'r') as f:
                ai_response = json.load(f)
        # Check if we should create a PR
        if not self.should_create_pr():
            logger.info("Skipping PR creation - conditions not met")
            return False
        # Check for existing pending PR
        if self.state.get('pending_pr'):
            logger.info(f"Skipping PR creation - pending PR exists: {self.state['pending_pr']}")
            return False
        logger.info("Creating Gitea pull request with AI suggestions...")
        # Initialize Gitea integration
        gitea = GiteaIntegration(self.config['gitea'])
        # Extract the SRX configuration ('suggestions' preferred over 'response')
        srx_config = ai_response.get('suggestions', ai_response.get('response', ''))
        if not srx_config or srx_config.strip() == '':
            logger.warning("Empty or invalid suggestions, skipping PR creation")
            return False
        # Create the PR
        pr_info = gitea.create_pr_with_config(
            srx_config=srx_config,
            title=f"AI Network Configuration Suggestions - {datetime.now().strftime('%B %d, %Y')}",
            description=None  # Will auto-generate
        )
        if pr_info:
            # Update state with PR information so later cycles see the pending PR
            self.state['pending_pr'] = pr_info['number']
            self.state['last_pr_created'] = datetime.now().isoformat()
            self.state['pr_url'] = pr_info['url']
            self.save_state(self.state)
            logger.info(f"Successfully created PR #{pr_info['number']}: {pr_info['url']}")
            # Log to a separate file for notifications/monitoring
            with open('/var/log/orchestrator/pr_created.log', 'a') as f:
                f.write(f"{datetime.now().isoformat()} - Created PR #{pr_info['number']} - {pr_info['url']}\n")
            return True
        else:
            logger.error("Failed to create PR in Gitea")
            return False
    except Exception as e:
        logger.error(f"Error creating Gitea PR: {e}", exc_info=True)
        return False
def _get_latest_ai_suggestion(self) -> Optional[Dict]:
    """Return metadata (path/filename/mtime) of the newest AI response file, or None."""
    response_dir = '/shared/ai-gitops/responses'
    try:
        # Collect every response_*.json file with its modification time.
        candidates = [
            {
                'path': os.path.join(response_dir, name),
                'filename': name,
                'mtime': os.path.getmtime(os.path.join(response_dir, name)),
            }
            for name in os.listdir(response_dir)
            if name.startswith('response_') and name.endswith('.json')
        ]
        if not candidates:
            return None
        # Newest modification time wins.
        return max(candidates, key=lambda item: item['mtime'])
    except Exception as e:
        logger.error(f"Error finding latest AI suggestion: {e}")
        return None
# Polls Gitea for the pending PR's state; called periodically from main_loop.
def check_pr_status(self):
    """Check the status of pending pull requests.

    Clears the pending-PR flag when the PR closes, and marks a merged PR
    for deployment in the saved state.
    """
    if not self.state.get('pending_pr'):
        return
    try:
        gitea = GiteaIntegration(self.config['gitea'])
        pr_status = gitea.get_pr_status(self.state['pending_pr'])
        if pr_status:
            logger.info(f"PR #{pr_status['number']} status: {pr_status['state']} (merged: {pr_status['merged']})")
            # If PR is closed or merged, clear the pending_pr flag
            if pr_status['state'] == 'closed':
                logger.info(f"PR #{pr_status['number']} has been closed")
                self.state['pending_pr'] = None
                self.state['last_pr_status'] = 'closed'
                self.state['last_pr_closed'] = datetime.now().isoformat()
                if pr_status['merged']:
                    self.state['last_pr_status'] = 'merged'
                    logger.info(f"PR #{pr_status['number']} was merged!")
                    # Mark for deployment (picked up by the deployment job)
                    self.state['pending_deployment'] = True
                    self.state['deployment_pr'] = pr_status['number']
                self.save_state(self.state)
    except Exception as e:
        logger.error(f"Error checking PR status: {e}")
def run_analysis_cycle(self):
    """Run a complete analysis cycle.

    Steps: collect traffic data → request AI analysis → persist state →
    optionally open a Gitea PR when the schedule allows (PR bookkeeping
    itself is handled inside create_gitea_pr).
    """
    logger.info("="*60)
    logger.info("Starting traffic analysis cycle")
    logger.info("="*60)
    try:
        # Always collect traffic data
        logger.info("Step 1: Collecting traffic data from Elasticsearch...")
        traffic_data = self.collect_traffic_data()
        if not traffic_data:
            logger.warning("No traffic data available, skipping analysis")
            return
        # Log summary of collected data
        top_talkers = traffic_data.get('top_talkers', {}).get('buckets', [])
        if top_talkers:
            logger.info(f"Found {len(top_talkers)} top talkers")
            logger.info(f"Top IP: {top_talkers[0]['key']} with {top_talkers[0]['doc_count']} flows")
        # Always request AI analysis
        logger.info("Step 2: Requesting AI analysis...")
        ai_response = self.request_ai_analysis(traffic_data)
        # Save state for analysis (records whether the AI responded)
        self.save_state({
            'last_analysis_run': datetime.now().isoformat(),
            'last_analysis_data': {
                'top_talkers_count': len(top_talkers),
                'response_received': bool(ai_response)
            }
        })
        # Check if we should create PR (weekly schedule + pending-PR guard)
        if self.should_create_pr():
            if ai_response and (ai_response.get('response') or ai_response.get('suggestions')):
                logger.info("Step 3: Creating PR with AI suggestions...")
                if self.create_gitea_pr(ai_response):
                    logger.info("✓ PR created successfully")
                else:
                    logger.warning("Failed to create Gitea PR")
            else:
                logger.info("No actionable suggestions from AI analysis")
        else:
            logger.info("Not time for PR creation - analysis data saved for future use")
    except Exception as e:
        logger.error(f"Error in analysis cycle: {e}")
    logger.info("="*60)
def main_loop(self):
    """Main orchestrator loop.

    Runs an analysis cycle immediately, then repeats every
    `analysis.interval_minutes`. While waiting it wakes once a minute and
    polls the pending PR's status every 15 minutes.
    """
    logger.info("Starting Network AI Orchestrator")
    # Setup Elasticsearch connection
    if not self.setup_elasticsearch():
        logger.warning("Running without Elasticsearch connection - using mock data")
    interval = self.config['analysis'].get('interval_minutes', 60) * 60  # seconds
    # Run first analysis immediately
    self.run_analysis_cycle()
    while self.running:
        try:
            logger.info(f"Next analysis scheduled in {interval/60} minutes")
            logger.info(f"Next run at: {(datetime.now() + timedelta(seconds=interval)).strftime('%H:%M:%S')}")
            # Check PR status every 15 minutes during the wait
            for i in range(int(interval / 60)):  # wake once per minute
                if not self.running:
                    break
                time.sleep(60)
                # i % 15 == 14 fires on minutes 15, 30, 45, ...
                if i % 15 == 14 and self.state.get('pending_pr'):
                    logger.info("Checking PR status...")
                    self.check_pr_status()
            if self.running:  # Check again after sleep
                self.run_analysis_cycle()
        except KeyboardInterrupt:
            logger.info("Received keyboard interrupt")
            break
        except Exception as e:
            logger.error(f"Error in main loop: {e}")
            time.sleep(60)  # Wait before retry
    logger.info("Orchestrator shutdown complete")
def shutdown(self, signum=None, frame=None):
    """Graceful shutdown handler (also usable as a signal handler).

    Args:
        signum: Signal number when invoked via the signal module, else None.
        frame: Current stack frame (unused), supplied by the signal module.
    """
    if signum:
        logger.info(f"Received signal {signum}, initiating shutdown...")
    else:
        logger.info("Initiating shutdown...")
    self.running = False  # tells main_loop to stop after the current wait
    if self.es_client:
        try:
            # Close Elasticsearch connection
            self.es_client.transport.close()
        except Exception:
            # Fix: narrowed from a bare `except:` so SystemExit/KeyboardInterrupt
            # are not swallowed during shutdown; close errors stay best-effort.
            pass
def main():
    """Main entry point: wire signal handlers and run the orchestrator loop."""
    orchestrator = NetworkOrchestrator()
    # Set up signal handlers so SIGTERM/SIGINT trigger a graceful shutdown
    signal.signal(signal.SIGTERM, orchestrator.shutdown)
    signal.signal(signal.SIGINT, orchestrator.shutdown)
    try:
        orchestrator.main_loop()
    except Exception as e:
        logger.error(f"Fatal error: {e}", exc_info=True)
        sys.exit(1)  # non-zero exit signals failure to the service manager
    sys.exit(0)


if __name__ == "__main__":
    main()

View File

@@ -1,204 +0,0 @@
#!/usr/bin/env python3
"""
SRX AI GitOps Pipeline Status Monitor - Final Version
Shows the complete status of the automation pipeline
"""
import os
import json
import yaml
from datetime import datetime, timedelta
from tabulate import tabulate
import requests
import subprocess
class PipelineMonitor:
def __init__(self):
    """Load orchestrator configuration and persisted state for status reporting."""
    # Load configuration
    with open('/home/netops/orchestrator/config.yaml', 'r') as f:
        self.config = yaml.safe_load(f)
    # Load state
    state_file = self.config.get('state_tracking', {}).get('state_file', '/var/lib/orchestrator/state.json')
    if os.path.exists(state_file):
        try:
            with open(state_file, 'r') as f:
                self.state = json.load(f)
        except (OSError, json.JSONDecodeError):
            # Fix: narrowed from a bare `except:` — a corrupt or unreadable
            # state file degrades to empty state instead of masking other errors.
            self.state = {}
    else:
        self.state = {}
def check_services(self):
    """Check if the pipeline services are running.

    Returns:
        List of [service-name, status-string] rows for tabular display.
    """
    status = []
    # Check local orchestrator
    try:
        result = subprocess.run(
            ['systemctl', 'is-active', 'orchestrator.service'],
            capture_output=True,
            text=True
        )
        is_active = result.stdout.strip() == 'active'
        status.append(['Orchestrator (Local)', '✅ Active' if is_active else '❌ Inactive'])
    except Exception:
        # Fix: narrowed from a bare `except:` (e.g. systemctl missing on
        # non-systemd hosts) so KeyboardInterrupt is not swallowed.
        status.append(['Orchestrator (Local)', '❓ Unknown'])
    # Check AI Processor by looking at recent activity
    ai_status = self.check_ai_processor_activity()
    status.append(['AI Processor (INTERNAL_IP)', ai_status])
    # Check deployment timer
    try:
        result = subprocess.run(
            ['systemctl', 'is-active', 'srx-deployment.timer'],
            capture_output=True,
            text=True
        )
        is_active = result.stdout.strip() == 'active'
        status.append(['Deployment Timer', '✅ Active' if is_active else '❌ Not configured'])
    except Exception:
        status.append(['Deployment Timer', '❓ Unknown'])
    return status
def check_ai_processor_activity(self):
"""Check AI processor activity through shared files and state"""
# Check if we've had recent AI responses
if self.state.get('last_analysis_data', {}).get('response_received'):
last_analysis = self.state.get('last_analysis_run', '')
if last_analysis:
try:
last_time = datetime.fromisoformat(last_analysis)
if datetime.now() - last_time < timedelta(hours=2):
return '✅ Active (Recent activity)'
except:
pass
# Check response directory
response_dir = '/shared/ai-gitops/responses'
if os.path.exists(response_dir):
files = os.listdir(response_dir)
if len(files) > 0:
return '✅ Active (Has responses)'
# Check if requests are pending
request_dir = '/shared/ai-gitops/requests'
if os.path.exists(request_dir):
files = os.listdir(request_dir)
if len(files) > 0:
return '⏳ Processing requests'
return '💤 Idle'
def check_pr_status(self):
"""Check current PR status"""
if self.state.get('pending_pr'):
pr_num = self.state['pending_pr']
return f"PR #{pr_num} - Pending Review"
else:
return "No pending PR"
def get_next_events(self):
    """Compute the next scheduled pipeline events.

    Returns:
        list[list[str]]: ``[event_name, 'YYYY-MM-DD HH:MM']`` rows.
    """
    now = datetime.now()
    five_am = now.replace(hour=5, minute=0, second=0, microsecond=0)

    # Analysis runs hourly, on the hour.
    next_analysis = now.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)

    # PR creation runs Saturdays at 05:00 (weekday() == 5 is Saturday).
    days_ahead = (5 - now.weekday()) % 7
    if days_ahead == 0 and now.hour >= 5:
        days_ahead = 7  # already past this Saturday's slot
    next_pr = five_am + timedelta(days=days_ahead)

    # The deployment check runs every day at 05:00.
    next_deploy = five_am + (timedelta(days=1) if now.hour >= 5 else timedelta())

    fmt = '%Y-%m-%d %H:%M'
    return [
        ['Next Analysis', next_analysis.strftime(fmt)],
        ['Next PR Creation', next_pr.strftime(fmt)],
        ['Next Deployment Check', next_deploy.strftime(fmt)],
    ]
def get_recent_activity(self):
    """Summarize the most recent pipeline events recorded in state.

    Returns:
        list[list[str]]: ``[event, detail]`` rows; a single placeholder
        row when no activity has been recorded yet.
    """
    def _fmt(iso_string):
        # Parse an ISO timestamp from state into display form.
        # Returns None when the value is absent or unparseable, which
        # mirrors the original silent-skip behavior.
        if not iso_string:
            return None
        try:
            return datetime.fromisoformat(iso_string).strftime('%Y-%m-%d %H:%M')
        except Exception:
            return None

    activity = []

    when = _fmt(self.state.get('last_analysis_run'))
    if when is not None:
        activity.append(['Last Analysis', when])
        # Whether the AI answered that analysis request.
        received = self.state.get('last_analysis_data', {}).get('response_received')
        activity.append(['AI Response', '✅ Received' if received else '❌ Not received'])

    when = _fmt(self.state.get('last_pr_created'))
    if when is not None:
        activity.append(['Last PR Created', when])

    when = _fmt(self.state.get('last_successful_deployment'))
    if when is not None:
        activity.append(['Last Deployment', when])

    return activity if activity else [['Status', 'No recent activity']]
def display_status(self):
    """Print the full pipeline status dashboard to stdout.

    Aggregates live service health, PR/deployment state, the upcoming
    schedule and recent activity, followed by static reference sections
    (directory layout, VM roles, pipeline flow).
    """
    print("\n" + "="*60)
    print("🚀 SRX AI GitOps Pipeline Status")
    print("="*60)
    print(f"Current Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    # Live data: systemd units plus AI drop-box heuristics.
    print("\n📊 Service Status:")
    print(tabulate(self.check_services(), headers=['Service', 'Status']))
    print("\n🔄 Current State:")
    print(f"PR Status: {self.check_pr_status()}")
    print(f"Pending Deployment: {'Yes' if self.state.get('pending_deployment') else 'No'}")
    print("\n📅 Scheduled Events:")
    print(tabulate(self.get_next_events(), headers=['Event', 'Time']))
    print("\n📜 Recent Activity:")
    print(tabulate(self.get_recent_activity(), headers=['Event', 'Details']))
    # Static reference information below this point.
    print("\n💾 Data Locations:")
    print("Requests: /shared/ai-gitops/requests/")
    print("Responses: /shared/ai-gitops/responses/")
    print("Approved: /shared/ai-gitops/approved/")
    print("Deployed: /shared/ai-gitops/deployed/")
    print("\n🏗️ Architecture:")
    print("Orchestrator VM: INTERNAL_IP (this VM)")
    print("AI Processor VM: INTERNAL_IP")
    print("Elasticsearch VM: INTERNAL_IP")
    print("Gitea Server: git.salmutt.dev")
    print("\n📋 Pipeline Flow:")
    print("1. Every 60 min → Analyze traffic → Generate suggestions")
    print("2. Saturday 5 AM → Create PR if suggestions exist")
    print("3. Manual → Review and approve/reject PR")
    print("4. Daily 5 AM → Deploy approved configurations")
    print("="*60 + "\n")
if __name__ == "__main__":
    # CLI entry point: print a one-shot status report and exit.
    monitor = PipelineMonitor()
    monitor.display_status()

View File

@@ -1,193 +0,0 @@
#!/usr/bin/env python3
"""
Test script to verify context-aware AI processing
Run this before full deployment to ensure everything works
"""
import json
import time
from pathlib import Path
from datetime import datetime
def create_test_context(focus_area="security"):
    """Write a context file steering the AI toward *focus_area*.

    The file is dropped at the well-known path the AI processor polls.

    Returns:
        Path: the written context file.
    """
    # Focus area -> priority feature list; unknown areas get no priorities.
    priorities_by_area = {
        "security": ["rate_limiting", "ddos_protection", "ids_ips_rules"],
        "performance": ["qos_policies", "traffic_shaping", "bandwidth_management"],
        "monitoring": ["syslog_enhancements", "snmp_traps", "flow_analytics"],
    }
    context = {
        "timestamp": datetime.now().isoformat(),
        "focus_area": focus_area,
        "skip_basic": True,
        "existing_features": [
            "zones_configured",
            "gaming_optimizations",
            "vpn_configured"
        ],
        "priority_features": priorities_by_area.get(focus_area, []),
        "instructions": [
            "DO NOT suggest basic connectivity policies",
            "DO NOT suggest any/any/any rules",
            f"FOCUS on {focus_area} optimizations"
        ]
    }
    # Persist where the AI processor expects to find it.
    context_dir = Path('/shared/ai-gitops/context')
    context_dir.mkdir(parents=True, exist_ok=True)
    context_file = context_dir / 'current_context.json'
    with open(context_file, 'w') as f:
        json.dump(context, f, indent=2)
    print(f"✅ Created context file for {focus_area}")
    return context_file
def create_test_request():
    """Drop a synthetic traffic-analysis request into the request queue.

    Returns:
        str: the generated request id (also the response file's stem).
    """
    request_id = f"test_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
    request = {
        "request_id": request_id,
        "timestamp": datetime.now().isoformat(),
        "data": {
            "top_talkers": {
                "buckets": [
                    {"key": "INTERNAL_IP", "doc_count": 1000},
                    {"key": "INTERNAL_IP", "doc_count": 800}
                ]
            },
            "vlans": {"buckets": []},
            "protocols": {"buckets": []}
        }
    }
    # Write it where the AI processor polls for new work.
    request_dir = Path('/shared/ai-gitops/requests')
    request_dir.mkdir(parents=True, exist_ok=True)
    (request_dir / f"{request_id}.json").write_text(json.dumps(request, indent=2))
    print(f"✅ Created test request: {request_id}")
    return request_id
def check_response(request_id, focus_area):
    """Poll up to 30 s for the AI response and verify the context took.

    Prints a short report (focus area, feedback-awareness, sample
    suggestions).  Returns True once a response file appears, False on
    timeout.
    """
    response_file = Path('/shared/ai-gitops/responses') / f"{request_id}_response.json"
    for attempt in range(30):
        if response_file.exists():
            response = json.loads(response_file.read_text())
            print(f"\n✅ Response generated!")
            print(f" Focus area: {response.get('focus_area', 'unknown')}")
            print(f" Feedback aware: {response.get('feedback_aware', False)}")
            # Did the processor honour the requested focus area?
            if response.get('focus_area') == focus_area:
                print(f" ✅ Context correctly applied: {focus_area}")
            else:
                print(f" ❌ Context mismatch! Expected: {focus_area}, Got: {response.get('focus_area')}")
            # Show the first few non-empty suggestion lines.
            print(f"\n Sample suggestions:")
            for line in response.get('suggestions', '').split('\n')[:5]:
                if line.strip():
                    print(f" {line}")
            return True
        time.sleep(1)
        print(f" Waiting for response... ({attempt+1}/30)")
    print(f"❌ No response generated after 30 seconds")
    return False
def run_test(focus_area="security"):
    """Run one complete context-test cycle for *focus_area*.

    Returns:
        bool: True when the AI produced a response for the test request.
    """
    print(f"\n{'='*60}")
    print(f"Testing Context System - Focus: {focus_area.upper()}")
    print(f"{'='*60}")
    # 1) Publish the context the AI should honour.
    create_test_context(focus_area)
    # 2) Drop a synthetic analysis request.
    request_id = create_test_request()
    # 3) The AI processor (running separately) should pick it up.
    print("\n⏳ Waiting for AI processor to pick up request...")
    print(" (Make sure ai_processor.py is running)")
    # 4) Verify the response honoured the context.
    success = check_response(request_id, focus_area)
    if success:
        print(f"\n🎉 Test PASSED for {focus_area} context!")
        return success
    print(f"\n❌ Test FAILED for {focus_area} context")
    print("\nTroubleshooting:")
    print("1. Is ai_processor.py running?")
    print("2. Check logs: tail -f /var/log/ai-processor/ai-processor.log")
    print("3. Verify Ollama is running: curl http://localhost:11434/api/tags")
    return success
def main():
    """CLI entry point: run one focus-area test, or all of them."""
    print("🧪 AI Context System Test Suite")
    print("================================")
    import argparse
    parser = argparse.ArgumentParser(description='Test context-aware AI processing')
    parser.add_argument('--focus',
                        choices=['security', 'performance', 'monitoring', 'automation'],
                        default='security',
                        help='Focus area to test')
    parser.add_argument('--all',
                        action='store_true',
                        help='Test all focus areas')
    args = parser.parse_args()

    if not args.all:
        # Test single focus area and return.
        run_test(args.focus)
        return

    # Exercise every focus area in sequence, then print a summary table.
    results = {}
    for area in ['security', 'performance', 'monitoring', 'automation']:
        results[area] = run_test(area)
        time.sleep(5)  # Wait between tests
    print(f"\n{'='*60}")
    print("TEST SUMMARY")
    print(f"{'='*60}")
    for area, passed in results.items():
        print(f"{area.capitalize():15} {'✅ PASSED' if passed else '❌ FAILED'}")


if __name__ == "__main__":
    main()

View File

@@ -1,364 +0,0 @@
#!/usr/bin/env python3
"""
Complete Feedback Loop Test
Tests creating a PR, rejecting it with feedback, and AI learning from it
"""
import os
import sys
import json
import time
import subprocess
from datetime import datetime
from pathlib import Path
import requests
class FeedbackLoopTester:
    """End-to-end harness for the PR-rejection feedback loop.

    Flow: create a deliberately insecure PR, record a simulated rejection
    with structured reviewer feedback, persist that feedback where the AI
    processor reads it, then issue a fresh request and check whether the
    new suggestions avoid the rejected patterns.
    """

    def __init__(self):
        # Shared drop-box tree used to exchange files with the AI VM.
        self.shared_dir = Path('/shared/ai-gitops')
        self.feedback_dir = self.shared_dir / 'feedback'
        self.pending_prs_dir = self.shared_dir / 'pending_prs'
        self.responses_dir = self.shared_dir / 'responses'
        # Ensure feedback directory exists
        self.feedback_dir.mkdir(parents=True, exist_ok=True)
        # Load config for Gitea (if exists); the test degrades gracefully
        # to an empty config when the orchestrator config is absent.
        self.config_file = Path('/home/netops/orchestrator/config.yaml')
        if self.config_file.exists():
            import yaml
            with open(self.config_file, 'r') as f:
                self.config = yaml.safe_load(f)
        else:
            self.config = {}

    def step1_create_test_pr_data(self):
        """Create a test PR with intentionally problematic config.

        Returns the PR-data dict, including the expected rejection
        reasons used for reporting.
        """
        print("\n" + "="*60)
        print("STEP 1: Creating Test PR with Problematic Config")
        print("="*60)
        # Create intentionally bad config for testing.  Each rule below
        # violates a policy the reviewer feedback will call out.
        bad_suggestions = """# TEST: Intentionally problematic config for feedback testing
# This should be rejected for security reasons
# ❌ BAD: Any/Any/Any rule (security risk)
set security policies from-zone trust to-zone untrust policy ALLOW-ALL match source-address any
set security policies from-zone trust to-zone untrust policy ALLOW-ALL match destination-address any
set security policies from-zone trust to-zone untrust policy ALLOW-ALL match application any
set security policies from-zone trust to-zone untrust policy ALLOW-ALL then permit
# ❌ BAD: No logging enabled
set security policies from-zone dmz to-zone untrust policy DMZ-OUT match source-address any
set security policies from-zone dmz to-zone untrust policy DMZ-OUT match destination-address any
set security policies from-zone dmz to-zone untrust policy DMZ-OUT then permit
# ❌ BAD: Overly permissive IoT access
set security policies from-zone IOT to-zone HOME policy IOT-ACCESS match source-address any
set security policies from-zone IOT to-zone HOME policy IOT-ACCESS match destination-address any
set security policies from-zone IOT to-zone HOME policy IOT-ACCESS match application any
set security policies from-zone IOT to-zone HOME policy IOT-ACCESS then permit"""
        pr_data = {
            'pr_number': f'TEST-{datetime.now().strftime("%Y%m%d-%H%M%S")}',
            'title': 'TEST: AI Network Optimization for Feedback Testing',
            'description': 'This is a test PR with intentionally problematic config to test feedback learning',
            'suggestions': bad_suggestions,
            'timestamp': datetime.now().isoformat(),
            'test_pr': True,
            'expected_rejection_reasons': [
                'Any/any/any rule detected',
                'No logging enabled',
                'IoT to HOME unrestricted access'
            ]
        }
        # Save test PR into the pending-PRs drop box.
        pr_file = self.pending_prs_dir / f"test_pr_{pr_data['pr_number']}.json"
        self.pending_prs_dir.mkdir(parents=True, exist_ok=True)
        with open(pr_file, 'w') as f:
            json.dump(pr_data, f, indent=2)
        print(f"✅ Created test PR: {pr_file.name}")
        print("\n📋 Problematic configurations included:")
        for reason in pr_data['expected_rejection_reasons']:
            print(f"{reason}")
        return pr_data

    def step2_simulate_pr_rejection(self, pr_data):
        """Simulate rejecting the PR with specific, structured feedback.

        Returns the rejection-feedback dict for the given *pr_data*.
        """
        print("\n" + "="*60)
        print("STEP 2: Simulating PR Rejection with Feedback")
        print("="*60)
        # Structured feedback mirrors what a human reviewer would file:
        # per-line issues plus general recommendations for the AI.
        rejection_feedback = {
            'pr_number': pr_data['pr_number'],
            'timestamp': datetime.now().isoformat(),
            'feedback_type': 'rejected',
            'reviewer': 'security_team',
            'details': {
                'reason': 'Security policy violations detected',
                'specific_issues': 'Multiple any/any/any rules found which violate zero-trust principles',
                'configuration_issues': [
                    {
                        'line': 'policy ALLOW-ALL match source-address any',
                        'issue': 'Never use source-address any in permit rules',
                        'type': 'security',
                        'severity': 'critical'
                    },
                    {
                        'line': 'policy DMZ-OUT then permit',
                        'issue': 'No logging enabled for DMZ traffic',
                        'type': 'security',
                        'severity': 'high'
                    },
                    {
                        'line': 'from-zone IOT to-zone HOME',
                        'issue': 'IoT devices should never have unrestricted access to HOME zone',
                        'type': 'security',
                        'severity': 'critical'
                    }
                ],
                'recommendations': [
                    'Use specific address-sets instead of any',
                    'Always enable logging with "then log session-init"',
                    'IoT devices should only access specific services, not entire zones',
                    'Implement proper zone segmentation'
                ]
            }
        }
        print(f"📝 PR Number: {rejection_feedback['pr_number']}")
        print(f"❌ Status: REJECTED")
        print(f"👤 Reviewer: {rejection_feedback['reviewer']}")
        print(f"\n📋 Issues identified:")
        for issue in rejection_feedback['details']['configuration_issues']:
            print(f"{issue['issue']}")
            print(f" Severity: {issue['severity'].upper()}")
        return rejection_feedback

    def step3_save_feedback(self, feedback):
        """Append *feedback* to the persistent feedback-history file.

        The AI processor reads this file to learn from past reviews.
        Returns the Path of the history file.
        """
        print("\n" + "="*60)
        print("STEP 3: Saving Feedback to History")
        print("="*60)
        feedback_file = self.feedback_dir / 'pr_feedback_history.json'
        # Load existing feedback if exists
        if feedback_file.exists():
            with open(feedback_file, 'r') as f:
                feedback_history = json.load(f)
            print(f"📂 Loaded existing feedback history ({len(feedback_history)} entries)")
        else:
            feedback_history = []
            print("📂 Creating new feedback history")
        # Add new feedback
        feedback_history.append(feedback)
        # Save updated history
        with open(feedback_file, 'w') as f:
            json.dump(feedback_history, f, indent=2)
        print(f"✅ Saved feedback to: {feedback_file}")
        print(f"📊 Total feedback entries: {len(feedback_history)}")
        # Count types (NOTE: the comprehension variable shadows the file
        # handle name 'f' above; harmless here as the file is closed).
        rejected = len([f for f in feedback_history if f.get('feedback_type') == 'rejected'])
        approved = len([f for f in feedback_history if f.get('feedback_type') == 'approved'])
        print(f" • Rejected: {rejected}")
        print(f" • Approved: {approved}")
        return feedback_file

    def step4_trigger_new_ai_request(self):
        """Create a new AI request to test if it learned from feedback.

        Returns the new request id.
        """
        print("\n" + "="*60)
        print("STEP 4: Creating New AI Request to Test Learning")
        print("="*60)
        # Create a new request that should avoid the rejected patterns.
        test_request = {
            'request_id': f'feedback_test_{datetime.now().strftime("%Y%m%d_%H%M%S")}',
            'timestamp': datetime.now().isoformat(),
            'type': 'feedback_test',
            'data': {
                'message': 'Testing if AI learned from rejection feedback',
                'zones_to_configure': ['IOT', 'HOME', 'DMZ'],
                'requirements': [
                    'Configure IoT to HOME access',
                    'Configure DMZ outbound rules',
                    'Ensure security best practices'
                ]
            },
            'context': {
                'test_feedback_learning': True,
                'previous_rejection': True
            }
        }
        request_file = self.shared_dir / 'requests' / f"{test_request['request_id']}.json"
        request_file.parent.mkdir(parents=True, exist_ok=True)
        with open(request_file, 'w') as f:
            json.dump(test_request, f, indent=2)
        print(f"✅ Created test request: {request_file.name}")
        print(f" Request ID: {test_request['request_id']}")
        print("\n🎯 This request specifically asks for:")
        for req in test_request['data']['requirements']:
            print(f"{req}")
        print("\n⏳ AI should now avoid the mistakes from the rejection...")
        return test_request['request_id']

    def step5_wait_and_verify_learning(self, request_id, timeout=150):
        """Wait for the AI response and verify it learned from feedback.

        Polls for up to *timeout* seconds, then checks the suggestions
        against good/bad patterns.  Returns True only when every check
        passes.
        """
        print("\n" + "="*60)
        print("STEP 5: Waiting for AI Response and Verifying Learning")
        print("="*60)
        response_file = self.responses_dir / f"{request_id}_response.json"
        start_time = time.time()
        print(f"⏳ Waiting for AI response (timeout: {timeout}s)...")
        # Wait for response (while/else: the else branch fires only when
        # the loop exhausts without a break, i.e. on timeout).
        while time.time() - start_time < timeout:
            if response_file.exists():
                print(f"✅ Response received after {int(time.time() - start_time)} seconds")
                break
            if int(time.time() - start_time) % 20 == 0 and time.time() - start_time > 0:
                print(f" ... still waiting ({int(time.time() - start_time)}s elapsed)")
            time.sleep(2)
        else:
            print(f"❌ Timeout waiting for response")
            return False
        # Analyze response
        with open(response_file, 'r') as f:
            response = json.load(f)
        print(f"\n📋 AI Response Analysis:")
        print(f" Model: {response.get('model')}")
        print(f" Feedback aware: {response.get('feedback_aware')}")
        suggestions = response.get('suggestions', '')
        # Check if AI avoided the mistakes.  Each entry is
        # (substring, description, should_exist-in-suggestions).
        print("\n🔍 Checking if AI learned from feedback:")
        learned_correctly = True
        checks = [
            ('source-address any', 'Still using "any" in source-address', False),
            ('destination-address any', 'Still using "any" in destination-address', False),
            ('application any', 'Still using "any" in application', False),
            ('then log', 'Now includes logging', True),
            ('address-set', 'Uses address-sets', True),
            ('specific', 'Uses specific addresses/applications', True)
        ]
        for pattern, description, should_exist in checks:
            if should_exist:
                if pattern in suggestions.lower():
                    print(f" ✅ LEARNED: {description}")
                else:
                    print(f" ❌ NOT LEARNED: {description}")
                    learned_correctly = False
            else:
                if pattern not in suggestions.lower():
                    print(f" ✅ AVOIDED: Not {description}")
                else:
                    print(f" ❌ MISTAKE: {description}")
                    learned_correctly = False
        # Show sample of new suggestions
        print("\n📝 Sample of new AI suggestions:")
        print("-" * 50)
        for line in suggestions.split('\n')[:10]:
            if line.strip():
                print(f" {line}")
        print("-" * 50)
        return learned_correctly

    def run_complete_test(self):
        """Run the complete feedback-loop test (steps 1-5, interactive).

        Blocks on user confirmation, then returns True when the AI's new
        suggestions pass all learning checks.
        """
        print("\n" + "="*70)
        print(" 🔄 COMPLETE FEEDBACK LOOP TEST")
        print("="*70)
        print("\nThis test will:")
        print("1. Create a PR with intentionally bad config")
        print("2. Simulate rejection with specific feedback")
        print("3. Save feedback for AI learning")
        print("4. Create new request to test learning")
        print("5. Verify AI avoided previous mistakes")
        input("\nPress Enter to start the test...")
        # Run all steps
        pr_data = self.step1_create_test_pr_data()
        feedback = self.step2_simulate_pr_rejection(pr_data)
        self.step3_save_feedback(feedback)
        request_id = self.step4_trigger_new_ai_request()
        learned = self.step5_wait_and_verify_learning(request_id)
        # Final summary
        print("\n" + "="*70)
        print(" 📊 FEEDBACK LOOP TEST RESULTS")
        print("="*70)
        if learned:
            print("\n🎉 SUCCESS! The AI learned from the rejection feedback!")
            print("\nThe AI now:")
            print(" ✅ Avoids any/any/any rules")
            print(" ✅ Includes logging in policies")
            print(" ✅ Uses specific address-sets")
            print(" ✅ Implements proper zone segmentation")
        else:
            print("\n⚠️ PARTIAL SUCCESS - AI needs more training")
            print("\nRecommendations:")
            print(" • Add more rejected examples")
            print(" • Adjust the prompt in ai_processor.py")
            print(" • Consider using a larger model")
        print("\n📁 Files created during test:")
        print(f" • Test PR: {self.pending_prs_dir}/test_pr_*.json")
        print(f" • Feedback: {self.feedback_dir}/pr_feedback_history.json")
        print(f" • AI Response: {self.responses_dir}/{request_id}_response.json")
        return learned
def main():
    """Entry point: sanity-check the AI processor, then run the full test."""
    tester = FeedbackLoopTester()
    # The feedback test needs ai_processor.py polling on the AI VM;
    # warn (but allow override) when it does not appear in the process list.
    probe = subprocess.run(['pgrep', '-f', 'ai_processor.py'],
                           capture_output=True, text=True)
    if not probe.stdout:
        print("⚠️ AI processor not running on AI VM")
        print(" Start it with: ssh netops@INTERNAL_IP")
        print(" Then: sudo systemctl start ai-processor")
        if input("\nContinue anyway? (y/n): ").lower() != 'y':
            return
    # Run the test and report the overall verdict.
    if tester.run_complete_test():
        print("\n✅ Your feedback learning system is working correctly!")
    else:
        print("\n⚠️ Review the feedback and adjust as needed")


if __name__ == "__main__":
    main()

View File

@@ -1,90 +0,0 @@
#!/usr/bin/env python3
"""Test Git authentication for Gitea"""
import subprocess
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Your Gitea configuration
GITEA_URL = "https://git.salmutt.dev"
# SECURITY: a live API token is hard-coded and committed to source control.
# This token must be treated as compromised — revoke it in Gitea and load
# the replacement from an environment variable or secrets store instead.
GITEA_TOKEN = "da3be18aad877edb94e896e6c1e7c449581444420"
REPO = "sal/srx-config"
def test_auth_methods():
    """Try each Gitea HTTPS credential format until one authenticates.

    Runs ``git ls-remote <url> HEAD`` against the repository with three
    URL-embedded credential formats (``oauth2:<token>``, bare ``<token>``,
    and ``git:<token>``), reporting the outcome of each.

    Returns:
        bool: True as soon as one format succeeds, False if all fail.
    """
    host_repo = f"git.salmutt.dev/{REPO}.git"
    # (step header, label used in result messages, candidate URL).
    # Previously three copy-pasted try/except blocks; data-driven now.
    attempts = [
        ("1. Testing oauth2 authentication format...", "OAuth2",
         f"https://oauth2:{GITEA_TOKEN}@{host_repo}"),
        ("2. Testing direct token authentication...", "Direct token",
         f"https://{GITEA_TOKEN}@{host_repo}"),
        ("3. Testing username:token format...", "Username:token",
         f"https://git:{GITEA_TOKEN}@{host_repo}"),
    ]
    for header, label, auth_url in attempts:
        print(f"\n{header}")
        try:
            result = subprocess.run(
                ['git', 'ls-remote', auth_url, 'HEAD'],
                capture_output=True,
                text=True
            )
            if result.returncode == 0:
                print(f"✅ {label} authentication successful!")
                print(f" HEAD: {result.stdout.strip()}")
                return True
            print(f"❌ {label} authentication failed")
            print(f" Error: {result.stderr}")
        except Exception as e:
            # Keep going: a failure of one format should not stop the rest.
            print(f"❌ {label} test error: {e}")
    return False
if __name__ == "__main__":
    print("Testing Gitea authentication methods...")
    print(f"Repository: {REPO}")
    # NOTE(review): this "masked" display still reveals the first 10
    # characters of the token; combined with the hard-coded value above,
    # the secret must be rotated regardless.
    print(f"Token: {GITEA_TOKEN[:10]}..." + "*" * (len(GITEA_TOKEN) - 10))
    if test_auth_methods():
        print("\n✅ At least one authentication method works!")
    else:
        print("\n❌ All authentication methods failed")
        print("\nPlease verify:")
        print("1. The token is correct and has appropriate permissions")
        print("2. The repository exists and is accessible")
        print("3. Network connectivity to Gitea is working")

View File

@@ -1,26 +0,0 @@
# NOTE(review): ad-hoc integration test — this creates a *real* PR on the
# Gitea server; the resulting PR is titled "Test PR - Please Close" and
# must be closed manually afterwards.
import yaml
from gitea_integration import GiteaIntegration

# Load your config
with open('/home/netops/orchestrator/config.yaml', 'r') as f:
    config = yaml.safe_load(f)

# Test PR creation
gitea = GiteaIntegration(config['gitea'])

# Test with sample config (minimal but syntactically valid SRX set-commands).
test_config = """# Test configuration
set security zones security-zone DMZ address 192.168.50.0/24
set applications application TEST-APP destination-port 8080"""

pr_info = gitea.create_pr_with_config(
    srx_config=test_config,
    title="Test PR - Please Close",
    description="This is a test PR to verify Gitea integration"
)

# create_pr_with_config presumably returns None on failure — verify
# against gitea_integration before relying on this.
if pr_info:
    print(f"Success! Created PR #{pr_info['number']}")
    print(f"URL: {pr_info['url']}")
else:
    print("Failed to create PR")

View File

@@ -1,11 +0,0 @@
#!/usr/bin/env python3
# Quick diagnostic: instantiate the orchestrator and print its current
# PR-scheduling decision and persisted state.
import sys
sys.path.append('/home/netops/orchestrator')  # make the orchestrator package importable
from orchestrator_main import NetworkOrchestrator
from datetime import datetime

orch = NetworkOrchestrator()
print(f"Current time: {datetime.now()}")
print(f"Current day: {datetime.now().strftime('%A')}")
print(f"Should create PR: {orch.should_create_pr()}")
print(f"State: {orch.load_state()}")

View File

@@ -1,47 +0,0 @@
#!/usr/bin/env python3
"""
Create a simple test request for the AI processor
Run this on the orchestrator VM to create a test request
"""
import json
from datetime import datetime
from pathlib import Path

print("Creating test request for AI processor...")

# Assemble a minimal, clearly-labelled synthetic payload.
request_data = {
    'request_id': f'test_{datetime.now().strftime("%Y%m%d_%H%M%S")}',
    'timestamp': datetime.now().isoformat(),
    'type': 'test_request',
    'data': {
        'top_talkers': {
            'buckets': [
                {'key': 'INTERNAL_IP', 'doc_count': 1000},
                {'key': '192.168.10.100', 'doc_count': 500},
                {'key': '10.0.1.25', 'doc_count': 250}
            ]
        },
        'message': 'This is a test request to verify AI processor is working'
    },
    'context': {
        'config_available': True,
        'analysis_available': True,
        'test': True
    }
}

# Drop the request where the AI processor polls for new work.
request_dir = Path('/shared/ai-gitops/requests')
request_dir.mkdir(parents=True, exist_ok=True)
request_file = request_dir / f"{request_data['request_id']}.json"
request_file.write_text(json.dumps(request_data, indent=2))

print(f"✅ Test request created: {request_file}")
print(f" Request ID: {request_data['request_id']}")
print("\nNow check if AI processor picks it up:")
print(" 1. Wait 10-20 seconds")
print(" 2. Check for response: ls -la /shared/ai-gitops/responses/")
print(f" 3. Look for: {request_data['request_id']}_response.json")

View File

@@ -1,54 +0,0 @@
#!/usr/bin/env python3
# Ad-hoc check that the Gitea token has *write* (push) permission:
# clones the repo into a temp dir, commits a throwaway file on a test
# branch, and attempts to push it.
import subprocess
import tempfile
import os

# SECURITY: a live API token is hard-coded and committed to source control.
# Treat it as compromised — revoke it and read the value from the
# environment instead.  It is also embedded in the remote URL below, so it
# may leak into git/shell output.
TOKEN = "da3be18aad877edb94e896e6c1e7c449581444420"
REPO_URL = f"https://oauth2:{TOKEN}@git.salmutt.dev/sal/srx-config.git"

print("Testing simple git push...")

with tempfile.TemporaryDirectory() as tmpdir:
    print(f"Working in: {tmpdir}")
    # Clone (shallow: history is irrelevant for a permission probe)
    print("1. Cloning...")
    result = subprocess.run(['git', 'clone', '--depth', '1', REPO_URL, '.'],
                            cwd=tmpdir, capture_output=True, text=True)
    if result.returncode != 0:
        print(f"Clone failed: {result.stderr}")
        exit(1)
    print("✅ Clone successful")
    # Configure git identity for the throwaway commit
    subprocess.run(['git', 'config', 'user.email', 'test@example.com'], cwd=tmpdir)
    subprocess.run(['git', 'config', 'user.name', 'Test User'], cwd=tmpdir)
    # Set push URL explicitly
    print("2. Setting push URL...")
    subprocess.run(['git', 'remote', 'set-url', 'origin', REPO_URL], cwd=tmpdir)
    # Create test branch
    test_branch = "test-push-permissions"
    print(f"3. Creating branch {test_branch}...")
    subprocess.run(['git', 'checkout', '-b', test_branch], cwd=tmpdir)
    # Create a test file
    test_file = os.path.join(tmpdir, 'test-permissions.txt')
    with open(test_file, 'w') as f:
        f.write("Testing push permissions\n")
    # Add and commit
    subprocess.run(['git', 'add', '.'], cwd=tmpdir)
    subprocess.run(['git', 'commit', '-m', 'Test push permissions'], cwd=tmpdir)
    # Try to push — success proves the token has write access.
    print("4. Attempting push...")
    result = subprocess.run(['git', 'push', '-u', 'origin', test_branch],
                            cwd=tmpdir, capture_output=True, text=True)
    if result.returncode == 0:
        print("✅ Push successful! Token has write permissions.")
        print(" You may want to delete the test branch from Gitea")
    else:
        print(f"❌ Push failed: {result.stderr}")

View File

@@ -1,118 +0,0 @@
#!/usr/bin/env python3
"""
Simple test script to verify the split architecture communication
Run this on Orchestrator VM to test AI processing
"""
import json
import time
from pathlib import Path
from datetime import datetime
REQUEST_DIR = Path("/shared/ai-gitops/requests")
RESPONSE_DIR = Path("/shared/ai-gitops/responses")
def test_ai_communication():
    """Smoke-test the file-based request/response channel to the AI VM.

    Performs two round trips through the shared directories: a trivial
    echo message (up to 30 s wait) and a simulated traffic-analysis
    request (up to 60 s wait).  Each test deletes the response file once
    read so reruns start clean.

    Returns:
        bool: True when both exchanges succeed, False on any timeout.
    """
    print("Testing Split Architecture Communication")
    print("=" * 50)
    # Test 1: Simple message
    print("\nTest 1: Simple message exchange")
    request_id = f"test_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
    request_file = REQUEST_DIR / f"{request_id}.json"
    request_data = {
        "type": "test",
        "message": "Hello AI, please confirm you received this",
        "timestamp": datetime.now().isoformat()
    }
    print(f"Sending request: {request_id}")
    with open(request_file, 'w') as f:
        json.dump(request_data, f, indent=2)
    # Wait for response (for/else: the else branch runs only when the
    # loop exhausts without a break, i.e. on timeout).
    response_file = RESPONSE_DIR / f"{request_id}.json"
    print("Waiting for response...", end="")
    for i in range(30):  # Wait up to 30 seconds
        if response_file.exists():
            print(" Received!")
            with open(response_file, 'r') as f:
                response = json.load(f)
            print(f"Response: {json.dumps(response, indent=2)}")
            response_file.unlink()  # clean up so reruns start fresh
            break
        print(".", end="", flush=True)
        time.sleep(1)
    else:
        print(" Timeout!")
        return False
    # Test 2: Traffic analysis request
    print("\nTest 2: Simulated traffic analysis")
    request_id = f"analyze_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
    request_file = REQUEST_DIR / f"{request_id}.json"
    request_data = {
        "type": "analyze_traffic",
        "traffic_data": {
            "top_sources": [
                {"ip": "INTERNAL_IP", "bytes": 1000000},
                {"ip": "INTERNAL_IP", "bytes": 500000}
            ],
            "top_ports": [443, 80, 22, 3389]
        },
        "srx_context": {
            "zones": ["trust", "untrust", "dmz"],
            "rule_count": 15
        }
    }
    print(f"Sending analysis request: {request_id}")
    with open(request_file, 'w') as f:
        json.dump(request_data, f, indent=2)
    # Wait for response — analysis is slower, so allow a longer window.
    response_file = RESPONSE_DIR / f"{request_id}.json"
    print("Waiting for AI analysis...", end="")
    for i in range(60):  # Wait up to 60 seconds for analysis
        if response_file.exists():
            print(" Received!")
            with open(response_file, 'r') as f:
                response = json.load(f)
            if response.get("status") == "success":
                print("AI Analysis completed successfully!")
                print(f"Analysis preview: {response.get('analysis', '')[:200]}...")
            else:
                print(f"Analysis failed: {response}")
            response_file.unlink()
            break
        print(".", end="", flush=True)
        time.sleep(1)
    else:
        print(" Timeout!")
        return False
    print("\n" + "=" * 50)
    print("All tests completed successfully!")
    return True
if __name__ == "__main__":
    # Make sure AI processor is running on AI VM first — otherwise both
    # tests below will simply time out.
    print("Make sure ai_processor.py is running on the AI VM!")
    print("Press Ctrl+C to cancel, or Enter to continue...")
    try:
        input()
    except KeyboardInterrupt:
        print("\nCancelled")
        exit(1)
    if test_ai_communication():
        print("\n✅ Split architecture is working correctly!")
    else:
        print("\n❌ Communication test failed")

View File

@@ -1,34 +0,0 @@
#!/usr/bin/env python3
import json
from pathlib import Path
# Get latest response
responses = sorted(Path('/shared/ai-gitops/responses').glob('pipeline_*.json'),
key=lambda x: x.stat().st_mtime, reverse=True)
if responses:
with open(responses[0], 'r') as f:
data = json.load(f)
print(f"Checking: {responses[0].name}")
print(f"Feedback aware: {data.get('feedback_aware')}")
suggestions = data.get('suggestions', '')
# Count issues
dest_any_count = suggestions.count('destination-address any')
src_any_count = suggestions.count('source-address any')
app_any_count = suggestions.count('application any')
log_count = suggestions.count('then log')
print(f"\n📊 Analysis:")
print(f" source-address any: {src_any_count} {'' if src_any_count else ''}")
print(f" destination-address any: {dest_any_count} {'' if dest_any_count else ''}")
print(f" application any: {app_any_count} {'' if app_any_count else ''}")
print(f" logging statements: {log_count} {'' if log_count else ''}")
if dest_any_count > 0:
print(f"\n⚠️ AI still needs to learn about destination-address!")
print(" Add more feedback and update the prompt")
else:
print(f"\n✅ AI has learned to avoid any/any/any!")