#!/usr/bin/env python3 """ Gitea PR Creation and Feedback Handler Creates real PRs in Gitea and handles rejection feedback """ import os import sys import json import yaml import requests from datetime import datetime from pathlib import Path import subprocess class GiteaPRManager: def __init__(self, config_path='/home/netops/orchestrator/config.yaml'): """Initialize with Gitea configuration""" # Load config with open(config_path, 'r') as f: self.config = yaml.safe_load(f) self.gitea_config = self.config.get('gitea', {}) self.base_url = self.gitea_config.get('url', 'http://localhost:3000') self.token = self.gitea_config.get('token', '') self.repo_owner = self.gitea_config.get('owner', 'netops') self.repo_name = self.gitea_config.get('repo', 'srx-config') self.headers = { 'Authorization': f'token {self.token}', 'Content-Type': 'application/json' } self.pending_prs_dir = Path('/shared/ai-gitops/pending_prs') self.feedback_dir = Path('/shared/ai-gitops/feedback') def create_pr_from_ai_suggestions(self, pr_file=None): """Create a PR in Gitea from AI suggestions""" print("\n" + "="*60) print("Creating Gitea PR from AI Suggestions") print("="*60) # Find latest PR file if not specified if pr_file is None: pr_files = sorted(self.pending_prs_dir.glob('pr_*.json'), key=lambda x: x.stat().st_mtime, reverse=True) if not pr_files: print("āŒ No pending PR files found") return None pr_file = pr_files[0] print(f"šŸ“„ Using PR file: {pr_file.name}") # Load PR data with open(pr_file, 'r') as f: pr_data = json.load(f) # Create a new branch branch_name = f"ai-suggestions-{datetime.now().strftime('%Y%m%d-%H%M%S')}" # Create the configuration file content config_content = f"""# AI-Generated Network Configuration # Generated: {pr_data.get('timestamp', datetime.now().isoformat())} # Model: {pr_data.get('model', 'llama2:13b')} # Feedback Aware: {pr_data.get('feedback_aware', False)} {pr_data.get('suggestions', '')} """ # Create branch and file via Gitea API try: # First, get the default branch SHA repo_url = f"{self.base_url}/api/v1/repos/{self.repo_owner}/{self.repo_name}" repo_response = requests.get(repo_url, headers=self.headers) if repo_response.status_code != 200: print(f"āŒ Failed to get repo info: {repo_response.status_code}") print(f" Response: {repo_response.text}") return None default_branch = repo_response.json().get('default_branch', 'main') # Get the SHA of the default branch branch_url = f"{self.base_url}/api/v1/repos/{self.repo_owner}/{self.repo_name}/branches/{default_branch}" branch_response = requests.get(branch_url, headers=self.headers) if branch_response.status_code != 200: print(f"āŒ Failed to get branch info: {branch_response.status_code}") return None base_sha = branch_response.json()['commit']['id'] # Create new branch create_branch_url = f"{self.base_url}/api/v1/repos/{self.repo_owner}/{self.repo_name}/branches" branch_data = { 'new_branch_name': branch_name, 'old_branch_name': default_branch } branch_create = requests.post(create_branch_url, headers=self.headers, json=branch_data) if branch_create.status_code not in [201, 200]: print(f"āŒ Failed to create branch: {branch_create.status_code}") print(f" Response: {branch_create.text}") return None print(f"āœ… Created branch: {branch_name}") # Create or update file in the new branch file_path = f"ai-suggestions/config_{datetime.now().strftime('%Y%m%d_%H%M%S')}.conf" file_url = f"{self.base_url}/api/v1/repos/{self.repo_owner}/{self.repo_name}/contents/{file_path}" import base64 file_data = { 'branch': branch_name, 'content': base64.b64encode(config_content.encode()).decode(), 'message': f"AI suggestions: {pr_data.get('title', 'Network optimization')}" } file_response = requests.post(file_url, headers=self.headers, json=file_data) if file_response.status_code not in [201, 200]: print(f"āš ļø Could not create file via API, trying alternative method") else: print(f"āœ… Created config file: {file_path}") # Create Pull Request pr_url = f"{self.base_url}/api/v1/repos/{self.repo_owner}/{self.repo_name}/pulls" pr_body = f"""## AI-Generated Network Configuration ### Analysis Context - **Zones Analyzed**: {', '.join(pr_data.get('network_context', {}).get('zones', []))} - **Policies Reviewed**: {pr_data.get('network_context', {}).get('policies', 0)} - **Feedback Aware**: {pr_data.get('feedback_aware', False)} ### Suggested Changes ```junos {pr_data.get('suggestions', '')} ``` ### Review Checklist - [ ] No any/any/any rules - [ ] Logging enabled on all policies - [ ] Proper zone segmentation - [ ] Address-sets used instead of individual IPs - [ ] Applications are specific (not "any") ### How to Test 1. Apply to lab SRX first 2. Verify traffic flow 3. Check logs for any issues 4. Apply to production if tests pass --- *This PR was automatically generated by the AI Network Automation system* """ pr_request = { 'title': pr_data.get('title', 'AI Network Configuration Suggestions'), 'head': branch_name, 'base': default_branch, 'body': pr_body } pr_response = requests.post(pr_url, headers=self.headers, json=pr_request) if pr_response.status_code == 201: pr_info = pr_response.json() pr_number = pr_info['number'] pr_html_url = pr_info['html_url'] print(f"\nāœ… Pull Request created successfully!") print(f" PR Number: #{pr_number}") print(f" URL: {pr_html_url}") # Save PR info for tracking pr_tracking = { 'pr_number': pr_number, 'pr_url': pr_html_url, 'branch': branch_name, 'created_at': datetime.now().isoformat(), 'ai_request_id': pr_data.get('request_id'), 'suggestions_file': str(pr_file) } tracking_file = self.pending_prs_dir / f"gitea_pr_{pr_number}.json" with open(tracking_file, 'w') as f: json.dump(pr_tracking, f, indent=2) return pr_number else: print(f"āŒ Failed to create PR: {pr_response.status_code}") print(f" Response: {pr_response.text}") return None except Exception as e: print(f"āŒ Error creating PR: {e}") return None def reject_pr_with_feedback(self, pr_number, feedback_message): """Reject a PR and save feedback for AI learning""" print("\n" + "="*60) print(f"Rejecting PR #{pr_number} with Feedback") print("="*60) # Close the PR via API pr_url = f"{self.base_url}/api/v1/repos/{self.repo_owner}/{self.repo_name}/pulls/{pr_number}" # Add comment with feedback comment_url = f"{pr_url}/reviews" comment_data = { 'body': feedback_message, 'event': 'REJECT' # or 'REQUEST_CHANGES' } comment_response = requests.post(comment_url, headers=self.headers, json=comment_data) if comment_response.status_code not in [200, 201]: # Try alternative: just add a comment issue_comment_url = f"{self.base_url}/api/v1/repos/{self.repo_owner}/{self.repo_name}/issues/{pr_number}/comments" comment_data = { 'body': f"āŒ **REJECTED**\n\n{feedback_message}" } requests.post(issue_comment_url, headers=self.headers, json=comment_data) # Close the PR close_data = { 'state': 'closed' } close_response = requests.patch(pr_url, headers=self.headers, json=close_data) if close_response.status_code == 200: print(f"āœ… PR #{pr_number} closed") else: print(f"āš ļø Could not close PR via API") # Save feedback for AI learning feedback_entry = { 'pr_number': pr_number, 'timestamp': datetime.now().isoformat(), 'feedback_type': 'rejected', 'reviewer': 'security_team', 'details': { 'reason': feedback_message, 'specific_issues': self.parse_feedback_for_issues(feedback_message) } } # Load and update feedback history feedback_file = self.feedback_dir / 'pr_feedback_history.json' self.feedback_dir.mkdir(parents=True, exist_ok=True) if feedback_file.exists(): with open(feedback_file, 'r') as f: history = json.load(f) else: history = [] history.append(feedback_entry) with open(feedback_file, 'w') as f: json.dump(history, f, indent=2) print(f"āœ… Feedback saved for AI learning") print(f" Total feedback entries: {len(history)}") return feedback_entry def parse_feedback_for_issues(self, feedback_text): """Parse feedback text to extract specific issues""" issues = [] # Common security issues to look for patterns = [ ('any/any/any', 'Never use any/any/any rules'), ('no logging', 'Always enable logging'), ('source-address any', 'Avoid using source-address any'), ('destination-address any', 'Avoid using destination-address any'), ('application any', 'Specify applications instead of any'), ('overly permissive', 'Rules are too permissive'), ('zone segmentation', 'Improper zone segmentation'), ('iot', 'IoT security concerns') ] feedback_lower = feedback_text.lower() for pattern, description in patterns: if pattern in feedback_lower: issues.append({ 'pattern': pattern, 'description': description, 'type': 'security' }) return issues if issues else feedback_text def main(): """Main entry point for testing""" print("\n" + "="*60) print(" GITEA PR FEEDBACK TESTING") print("="*60) manager = GiteaPRManager() print("\nOptions:") print("1. Create a new PR from latest AI suggestions") print("2. Reject a PR with feedback") print("3. Run complete test cycle") choice = input("\nSelect option (1-3): ") if choice == '1': pr_number = manager.create_pr_from_ai_suggestions() if pr_number: print(f"\nāœ… Successfully created PR #{pr_number}") print("\nYou can now:") print(f"1. Review it in Gitea") print(f"2. Reject it with: python3 gitea_pr_feedback.py") elif choice == '2': pr_number = input("Enter PR number to reject: ") print("\nEnter rejection feedback (press Ctrl+D when done):") feedback_lines = [] try: while True: feedback_lines.append(input()) except EOFError: pass feedback = '\n'.join(feedback_lines) if not feedback: feedback = """This configuration has security issues: 1. Any/any/any rules detected - this violates zero-trust principles 2. No logging enabled on some policies 3. Overly permissive access between zones Please revise to: - Use specific address-sets - Enable logging on all policies - Implement proper zone segmentation""" manager.reject_pr_with_feedback(pr_number, feedback) elif choice == '3': print("\nšŸ“‹ Complete test cycle:") print("1. Creating PR from AI suggestions...") pr_number = manager.create_pr_from_ai_suggestions() if pr_number: print(f"\n2. Waiting for review...") input(" Press Enter to simulate rejection...") feedback = """Security Review Failed: āŒ Critical Issues Found: - Any/any/any rule in policy ALLOW-ALL - No logging on DMZ policies - IoT zone has unrestricted access to HOME zone Requirements: - All policies must use specific addresses - Logging must be enabled - IoT devices need strict access control """ print("\n3. Rejecting PR with feedback...") manager.reject_pr_with_feedback(pr_number, feedback) print("\n4. AI will learn from this feedback in next run") print(" Run: python3 run_pipeline.py --skip-netflow") print(" The AI should avoid these mistakes next time!") if __name__ == "__main__": main()