Files

378 lines
14 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Gitea PR Creation and Feedback Handler
Creates real PRs in Gitea and handles rejection feedback
"""
import os
import sys
import json
import yaml
import requests
from datetime import datetime
from pathlib import Path
import subprocess
class GiteaPRManager:
    """Create Gitea pull requests from AI suggestion files and record review feedback.

    Connection settings come from the ``gitea`` section of a YAML config file.
    Suggestion files are looked up in ``pending_prs_dir``; rejection feedback
    is appended to a JSON history file inside ``feedback_dir``.
    """

    # Seconds to wait on each Gitea API call unless the config overrides it.
    # Without a timeout a request blocks forever on an unresponsive server.
    timeout = 30

    # (substring to look for in lower-cased feedback, guidance recorded for it)
    _ISSUE_PATTERNS = (
        ('any/any/any', 'Never use any/any/any rules'),
        ('no logging', 'Always enable logging'),
        ('source-address any', 'Avoid using source-address any'),
        ('destination-address any', 'Avoid using destination-address any'),
        ('application any', 'Specify applications instead of any'),
        ('overly permissive', 'Rules are too permissive'),
        ('zone segmentation', 'Improper zone segmentation'),
        ('iot', 'IoT security concerns'),
    )

    def __init__(self, config_path='/home/netops/orchestrator/config.yaml'):
        """Load Gitea settings from the YAML file at ``config_path``.

        Keys read from the ``gitea`` section: ``url``, ``token``, ``owner``,
        ``repo`` and ``timeout``. Each one falls back to a default.

        Raises:
            OSError: if the config file cannot be opened.
        """
        with open(config_path, 'r') as f:
            # safe_load returns None for an empty file; treat that as "no settings".
            self.config = yaml.safe_load(f) or {}
        self.gitea_config = self.config.get('gitea') or {}
        # Drop a trailing slash so the joined API URLs never contain '//'.
        self.base_url = self.gitea_config.get('url', 'http://localhost:3000').rstrip('/')
        self.token = self.gitea_config.get('token', '')
        self.repo_owner = self.gitea_config.get('owner', 'netops')
        self.repo_name = self.gitea_config.get('repo', 'srx-config')
        self.timeout = self.gitea_config.get('timeout', self.timeout)
        self.headers = {
            'Authorization': f'token {self.token}',
            'Content-Type': 'application/json'
        }
        self.pending_prs_dir = Path('/shared/ai-gitops/pending_prs')
        self.feedback_dir = Path('/shared/ai-gitops/feedback')

    def _repo_api(self, *parts):
        """Return this repository's API URL with ``parts`` appended as path segments."""
        root = f"{self.base_url}/api/v1/repos/{self.repo_owner}/{self.repo_name}"
        return '/'.join([root, *(str(part) for part in parts)])

    @staticmethod
    def _build_config_content(pr_data):
        """Render the text of the config file committed to the suggestion branch."""
        return f"""# AI-Generated Network Configuration
# Generated: {pr_data.get('timestamp', datetime.now().isoformat())}
# Model: {pr_data.get('model', 'llama2:13b')}
# Feedback Aware: {pr_data.get('feedback_aware', False)}
{pr_data.get('suggestions', '')}
"""

    @staticmethod
    def _build_pr_body(pr_data):
        """Render the Markdown description used for the pull request."""
        context = pr_data.get('network_context') or {}
        # str() each zone so a non-string entry cannot break the join.
        zones_text = ', '.join(str(zone) for zone in context.get('zones', []))
        return f"""## AI-Generated Network Configuration
### Analysis Context
- **Zones Analyzed**: {zones_text}
- **Policies Reviewed**: {context.get('policies', 0)}
- **Feedback Aware**: {pr_data.get('feedback_aware', False)}
### Suggested Changes
```junos
{pr_data.get('suggestions', '')}
```
### Review Checklist
- [ ] No any/any/any rules
- [ ] Logging enabled on all policies
- [ ] Proper zone segmentation
- [ ] Address-sets used instead of individual IPs
- [ ] Applications are specific (not "any")
### How to Test
1. Apply to lab SRX first
2. Verify traffic flow
3. Check logs for any issues
4. Apply to production if tests pass
---
*This PR was automatically generated by the AI Network Automation system*
"""

    def create_pr_from_ai_suggestions(self, pr_file=None):
        """Create a branch, a config file and a pull request from a suggestion file.

        Args:
            pr_file: Path (``str`` or ``Path``) of the suggestion JSON file.
                When ``None``, the most recently modified ``pr_*.json`` in
                ``pending_prs_dir`` is used.

        Returns:
            The new pull request number, or ``None`` when no usable suggestion
            file exists or any step fails. Failures are printed, not raised.
        """
        print("\n" + "="*60)
        print("Creating Gitea PR from AI Suggestions")
        print("="*60)

        if pr_file is None:
            pr_files = sorted(self.pending_prs_dir.glob('pr_*.json'),
                              key=lambda x: x.stat().st_mtime, reverse=True)
            if not pr_files:
                print("❌ No pending PR files found")
                return None
            pr_file = pr_files[0]
        else:
            # Callers may hand over a plain string; normalise so .name works.
            pr_file = Path(pr_file)
        print(f"📄 Using PR file: {pr_file.name}")

        try:
            with open(pr_file, 'r') as f:
                pr_data = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers json.JSONDecodeError and decoding errors.
            print(f"❌ Could not read PR file {pr_file}: {e}")
            return None
        if not isinstance(pr_data, dict):
            print(f"❌ PR file {pr_file} does not contain a JSON object")
            return None

        # One timestamp for both names so branch and file always agree.
        stamp = datetime.now()
        branch_name = f"ai-suggestions-{stamp.strftime('%Y%m%d-%H%M%S')}"
        config_content = self._build_config_content(pr_data)

        # Any failure below is reported and turned into a None result.
        try:
            repo_response = requests.get(self._repo_api(), headers=self.headers,
                                         timeout=self.timeout)
            if repo_response.status_code != 200:
                print(f"❌ Failed to get repo info: {repo_response.status_code}")
                print(f" Response: {repo_response.text}")
                return None
            default_branch = repo_response.json().get('default_branch', 'main')

            # Confirm the default branch exists before branching from it.
            branch_response = requests.get(self._repo_api('branches', default_branch),
                                           headers=self.headers, timeout=self.timeout)
            if branch_response.status_code != 200:
                print(f"❌ Failed to get branch info: {branch_response.status_code}")
                return None

            branch_data = {
                'new_branch_name': branch_name,
                'old_branch_name': default_branch
            }
            branch_create = requests.post(self._repo_api('branches'),
                                          headers=self.headers,
                                          json=branch_data,
                                          timeout=self.timeout)
            if branch_create.status_code not in (200, 201):
                print(f"❌ Failed to create branch: {branch_create.status_code}")
                print(f" Response: {branch_create.text}")
                return None
            print(f"✅ Created branch: {branch_name}")

            # Commit the generated config to the new branch.
            file_path = f"ai-suggestions/config_{stamp.strftime('%Y%m%d_%H%M%S')}.conf"
            import base64
            file_data = {
                'branch': branch_name,
                'content': base64.b64encode(config_content.encode()).decode(),
                'message': f"AI suggestions: {pr_data.get('title', 'Network optimization')}"
            }
            file_response = requests.post(self._repo_api('contents', file_path),
                                          headers=self.headers, json=file_data,
                                          timeout=self.timeout)
            if file_response.status_code not in (200, 201):
                # Best effort: the PR body still carries the suggestions, so
                # carry on, but report what actually went wrong.
                print(f"⚠️ Could not create config file via API: {file_response.status_code}")
                print(f" Response: {file_response.text}")
            else:
                print(f"✅ Created config file: {file_path}")

            pr_request = {
                'title': pr_data.get('title', 'AI Network Configuration Suggestions'),
                'head': branch_name,
                'base': default_branch,
                'body': self._build_pr_body(pr_data)
            }
            pr_response = requests.post(self._repo_api('pulls'), headers=self.headers,
                                        json=pr_request, timeout=self.timeout)
            if pr_response.status_code != 201:
                print(f"❌ Failed to create PR: {pr_response.status_code}")
                print(f" Response: {pr_response.text}")
                return None

            pr_info = pr_response.json()
            pr_number = pr_info['number']
            pr_html_url = pr_info['html_url']
            print("\n✅ Pull Request created successfully!")
            print(f" PR Number: #{pr_number}")
            print(f" URL: {pr_html_url}")

            # Record which suggestion file produced which PR for later tracking.
            pr_tracking = {
                'pr_number': pr_number,
                'pr_url': pr_html_url,
                'branch': branch_name,
                'created_at': datetime.now().isoformat(),
                'ai_request_id': pr_data.get('request_id'),
                'suggestions_file': str(pr_file)
            }
            tracking_file = self.pending_prs_dir / f"gitea_pr_{pr_number}.json"
            with open(tracking_file, 'w') as f:
                json.dump(pr_tracking, f, indent=2)
            return pr_number
        except Exception as e:
            print(f"❌ Error creating PR: {e}")
            return None

    @staticmethod
    def _load_feedback_history(feedback_file):
        """Return the saved feedback list, or ``[]`` when none is usable.

        An unreadable or non-list history file is renamed aside instead of
        being overwritten, so earlier feedback is not silently destroyed.
        """
        if not feedback_file.exists():
            return []
        try:
            with open(feedback_file, 'r') as f:
                history = json.load(f)
        except ValueError:
            history = None
        if isinstance(history, list):
            return history
        suffix = datetime.now().strftime('%Y%m%d-%H%M%S')
        backup = feedback_file.with_name(f"{feedback_file.name}.corrupt-{suffix}")
        feedback_file.replace(backup)
        print(f"⚠️ Feedback history was unreadable; moved to {backup.name}")
        return []

    def reject_pr_with_feedback(self, pr_number, feedback_message):
        """Post rejection feedback on a PR, close it, and record the feedback.

        The feedback is saved to ``pr_feedback_history.json`` in
        ``feedback_dir`` even when the Gitea calls fail; API problems are
        printed as warnings.

        Args:
            pr_number: Number of the pull request to reject.
            feedback_message: Reviewer text explaining the rejection.

        Returns:
            The feedback entry (a dict) appended to the history file.
        """
        print("\n" + "="*60)
        print(f"Rejecting PR #{pr_number} with Feedback")
        print("="*60)

        pr_url = self._repo_api('pulls', pr_number)
        try:
            # 'REQUEST_CHANGES' is the review event Gitea documents for a
            # rejecting review; 'REJECT' is not among its review states.
            review_data = {
                'body': feedback_message,
                'event': 'REQUEST_CHANGES'
            }
            review_response = requests.post(f"{pr_url}/reviews", headers=self.headers,
                                            json=review_data, timeout=self.timeout)
            if review_response.status_code not in (200, 201):
                # Fall back to a plain comment when the review is not accepted.
                comment_data = {
                    'body': f"❌ **REJECTED**\n\n{feedback_message}"
                }
                comment_response = requests.post(
                    self._repo_api('issues', pr_number, 'comments'),
                    headers=self.headers, json=comment_data, timeout=self.timeout)
                if comment_response.status_code not in (200, 201):
                    print(f"⚠️ Could not post feedback on PR: {comment_response.status_code}")

            close_response = requests.patch(pr_url, headers=self.headers,
                                            json={'state': 'closed'},
                                            timeout=self.timeout)
            # Accept 201 as well as 200 for a successful edit.
            if close_response.status_code in (200, 201):
                print(f"✅ PR #{pr_number} closed")
            else:
                print(f"⚠️ Could not close PR via API")
        except requests.RequestException as e:
            # Keep going: the local feedback record must not depend on the network.
            print(f"⚠️ Gitea API request failed: {e}")

        feedback_entry = {
            'pr_number': pr_number,
            'timestamp': datetime.now().isoformat(),
            'feedback_type': 'rejected',
            'reviewer': 'security_team',
            'details': {
                'reason': feedback_message,
                'specific_issues': self.parse_feedback_for_issues(feedback_message)
            }
        }

        feedback_file = self.feedback_dir / 'pr_feedback_history.json'
        self.feedback_dir.mkdir(parents=True, exist_ok=True)
        history = self._load_feedback_history(feedback_file)
        history.append(feedback_entry)
        with open(feedback_file, 'w') as f:
            json.dump(history, f, indent=2)
        print("✅ Feedback saved for AI learning")
        print(f" Total feedback entries: {len(history)}")
        return feedback_entry

    def parse_feedback_for_issues(self, feedback_text):
        """Extract known security issues mentioned in ``feedback_text``.

        Matching is a case-insensitive substring search against a fixed list
        of phrases.

        Returns:
            A list of ``{'pattern', 'description', 'type'}`` dicts, one per
            matched phrase, in list order. When nothing matches, the original
            ``feedback_text`` is returned unchanged instead of a list.
        """
        feedback_lower = feedback_text.lower()
        issues = [
            {
                'pattern': pattern,
                'description': description,
                'type': 'security'
            }
            for pattern, description in self._ISSUE_PATTERNS
            if pattern in feedback_lower
        ]
        return issues if issues else feedback_text
def main():
    """Interactive entry point for exercising the PR feedback loop.

    Shows a three-option menu: create a PR from the latest AI suggestions,
    reject an existing PR with feedback typed on stdin, or run both steps
    back to back with canned feedback.
    """
    print("\n" + "="*60)
    print(" GITEA PR FEEDBACK TESTING")
    print("="*60)
    manager = GiteaPRManager()
    print("\nOptions:")
    print("1. Create a new PR from latest AI suggestions")
    print("2. Reject a PR with feedback")
    print("3. Run complete test cycle")
    # Strip so stray whitespace around the digit does not fall through silently.
    choice = input("\nSelect option (1-3): ").strip()

    if choice == '1':
        pr_number = manager.create_pr_from_ai_suggestions()
        if pr_number:
            print(f"\n✅ Successfully created PR #{pr_number}")
            print("\nYou can now:")
            print("1. Review it in Gitea")
            print("2. Reject it with: python3 gitea_pr_feedback.py")
    elif choice == '2':
        raw_number = input("Enter PR number to reject: ").strip().lstrip('#')
        if not raw_number.isdigit():
            # An empty or non-numeric value would produce a malformed API URL.
            print(f"❌ Invalid PR number: {raw_number!r}")
            return
        # Store an int, matching the PR numbers recorded by the other options.
        pr_number = int(raw_number)
        print("\nEnter rejection feedback (press Ctrl+D when done):")
        feedback_lines = []
        try:
            while True:
                feedback_lines.append(input())
        except EOFError:
            pass
        feedback = '\n'.join(feedback_lines)
        if not feedback:
            # Nothing typed: fall back to a canned security review.
            feedback = """This configuration has security issues:
1. Any/any/any rules detected - this violates zero-trust principles
2. No logging enabled on some policies
3. Overly permissive access between zones
Please revise to:
- Use specific address-sets
- Enable logging on all policies
- Implement proper zone segmentation"""
        manager.reject_pr_with_feedback(pr_number, feedback)
    elif choice == '3':
        print("\n📋 Complete test cycle:")
        print("1. Creating PR from AI suggestions...")
        pr_number = manager.create_pr_from_ai_suggestions()
        if pr_number:
            print("\n2. Waiting for review...")
            input(" Press Enter to simulate rejection...")
            feedback = """Security Review Failed:
❌ Critical Issues Found:
- Any/any/any rule in policy ALLOW-ALL
- No logging on DMZ policies
- IoT zone has unrestricted access to HOME zone
Requirements:
- All policies must use specific addresses
- Logging must be enabled
- IoT devices need strict access control
"""
            print("\n3. Rejecting PR with feedback...")
            manager.reject_pr_with_feedback(pr_number, feedback)
            print("\n4. AI will learn from this feedback in next run")
            print(" Run: python3 run_pipeline.py --skip-netflow")
            print(" The AI should avoid these mistakes next time!")
    else:
        print(f"❌ Invalid option: {choice!r}")
# Run the interactive menu only when executed as a script, not on import.
if __name__ == "__main__":
    main()