378 lines
14 KiB
Python
Executable File
378 lines
14 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Gitea PR Creation and Feedback Handler
|
|
Creates real PRs in Gitea and handles rejection feedback
|
|
"""
|
|
import os
|
|
import sys
|
|
import json
|
|
import yaml
|
|
import requests
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
import subprocess
|
|
|
|
class GiteaPRManager:
|
|
def __init__(self, config_path='/home/netops/orchestrator/config.yaml'):
|
|
"""Initialize with Gitea configuration"""
|
|
# Load config
|
|
with open(config_path, 'r') as f:
|
|
self.config = yaml.safe_load(f)
|
|
|
|
self.gitea_config = self.config.get('gitea', {})
|
|
self.base_url = self.gitea_config.get('url', 'http://localhost:3000')
|
|
self.token = self.gitea_config.get('token', '')
|
|
self.repo_owner = self.gitea_config.get('owner', 'netops')
|
|
self.repo_name = self.gitea_config.get('repo', 'srx-config')
|
|
|
|
self.headers = {
|
|
'Authorization': f'token {self.token}',
|
|
'Content-Type': 'application/json'
|
|
}
|
|
|
|
self.pending_prs_dir = Path('/shared/ai-gitops/pending_prs')
|
|
self.feedback_dir = Path('/shared/ai-gitops/feedback')
|
|
|
|
def create_pr_from_ai_suggestions(self, pr_file=None):
|
|
"""Create a PR in Gitea from AI suggestions"""
|
|
print("\n" + "="*60)
|
|
print("Creating Gitea PR from AI Suggestions")
|
|
print("="*60)
|
|
|
|
# Find latest PR file if not specified
|
|
if pr_file is None:
|
|
pr_files = sorted(self.pending_prs_dir.glob('pr_*.json'),
|
|
key=lambda x: x.stat().st_mtime, reverse=True)
|
|
if not pr_files:
|
|
print("❌ No pending PR files found")
|
|
return None
|
|
pr_file = pr_files[0]
|
|
|
|
print(f"📄 Using PR file: {pr_file.name}")
|
|
|
|
# Load PR data
|
|
with open(pr_file, 'r') as f:
|
|
pr_data = json.load(f)
|
|
|
|
# Create a new branch
|
|
branch_name = f"ai-suggestions-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
|
|
|
|
# Create the configuration file content
|
|
config_content = f"""# AI-Generated Network Configuration
|
|
# Generated: {pr_data.get('timestamp', datetime.now().isoformat())}
|
|
# Model: {pr_data.get('model', 'llama2:13b')}
|
|
# Feedback Aware: {pr_data.get('feedback_aware', False)}
|
|
|
|
{pr_data.get('suggestions', '')}
|
|
"""
|
|
|
|
# Create branch and file via Gitea API
|
|
try:
|
|
# First, get the default branch SHA
|
|
repo_url = f"{self.base_url}/api/v1/repos/{self.repo_owner}/{self.repo_name}"
|
|
repo_response = requests.get(repo_url, headers=self.headers)
|
|
|
|
if repo_response.status_code != 200:
|
|
print(f"❌ Failed to get repo info: {repo_response.status_code}")
|
|
print(f" Response: {repo_response.text}")
|
|
return None
|
|
|
|
default_branch = repo_response.json().get('default_branch', 'main')
|
|
|
|
# Get the SHA of the default branch
|
|
branch_url = f"{self.base_url}/api/v1/repos/{self.repo_owner}/{self.repo_name}/branches/{default_branch}"
|
|
branch_response = requests.get(branch_url, headers=self.headers)
|
|
|
|
if branch_response.status_code != 200:
|
|
print(f"❌ Failed to get branch info: {branch_response.status_code}")
|
|
return None
|
|
|
|
base_sha = branch_response.json()['commit']['id']
|
|
|
|
# Create new branch
|
|
create_branch_url = f"{self.base_url}/api/v1/repos/{self.repo_owner}/{self.repo_name}/branches"
|
|
branch_data = {
|
|
'new_branch_name': branch_name,
|
|
'old_branch_name': default_branch
|
|
}
|
|
|
|
branch_create = requests.post(create_branch_url,
|
|
headers=self.headers,
|
|
json=branch_data)
|
|
|
|
if branch_create.status_code not in [201, 200]:
|
|
print(f"❌ Failed to create branch: {branch_create.status_code}")
|
|
print(f" Response: {branch_create.text}")
|
|
return None
|
|
|
|
print(f"✅ Created branch: {branch_name}")
|
|
|
|
# Create or update file in the new branch
|
|
file_path = f"ai-suggestions/config_{datetime.now().strftime('%Y%m%d_%H%M%S')}.conf"
|
|
file_url = f"{self.base_url}/api/v1/repos/{self.repo_owner}/{self.repo_name}/contents/{file_path}"
|
|
|
|
import base64
|
|
file_data = {
|
|
'branch': branch_name,
|
|
'content': base64.b64encode(config_content.encode()).decode(),
|
|
'message': f"AI suggestions: {pr_data.get('title', 'Network optimization')}"
|
|
}
|
|
|
|
file_response = requests.post(file_url, headers=self.headers, json=file_data)
|
|
|
|
if file_response.status_code not in [201, 200]:
|
|
print(f"⚠️ Could not create file via API, trying alternative method")
|
|
else:
|
|
print(f"✅ Created config file: {file_path}")
|
|
|
|
# Create Pull Request
|
|
pr_url = f"{self.base_url}/api/v1/repos/{self.repo_owner}/{self.repo_name}/pulls"
|
|
|
|
pr_body = f"""## AI-Generated Network Configuration
|
|
|
|
### Analysis Context
|
|
- **Zones Analyzed**: {', '.join(pr_data.get('network_context', {}).get('zones', []))}
|
|
- **Policies Reviewed**: {pr_data.get('network_context', {}).get('policies', 0)}
|
|
- **Feedback Aware**: {pr_data.get('feedback_aware', False)}
|
|
|
|
### Suggested Changes
|
|
```junos
|
|
{pr_data.get('suggestions', '')}
|
|
```
|
|
|
|
### Review Checklist
|
|
- [ ] No any/any/any rules
|
|
- [ ] Logging enabled on all policies
|
|
- [ ] Proper zone segmentation
|
|
- [ ] Address-sets used instead of individual IPs
|
|
- [ ] Applications are specific (not "any")
|
|
|
|
### How to Test
|
|
1. Apply to lab SRX first
|
|
2. Verify traffic flow
|
|
3. Check logs for any issues
|
|
4. Apply to production if tests pass
|
|
|
|
---
|
|
*This PR was automatically generated by the AI Network Automation system*
|
|
"""
|
|
|
|
pr_request = {
|
|
'title': pr_data.get('title', 'AI Network Configuration Suggestions'),
|
|
'head': branch_name,
|
|
'base': default_branch,
|
|
'body': pr_body
|
|
}
|
|
|
|
pr_response = requests.post(pr_url, headers=self.headers, json=pr_request)
|
|
|
|
if pr_response.status_code == 201:
|
|
pr_info = pr_response.json()
|
|
pr_number = pr_info['number']
|
|
pr_html_url = pr_info['html_url']
|
|
|
|
print(f"\n✅ Pull Request created successfully!")
|
|
print(f" PR Number: #{pr_number}")
|
|
print(f" URL: {pr_html_url}")
|
|
|
|
# Save PR info for tracking
|
|
pr_tracking = {
|
|
'pr_number': pr_number,
|
|
'pr_url': pr_html_url,
|
|
'branch': branch_name,
|
|
'created_at': datetime.now().isoformat(),
|
|
'ai_request_id': pr_data.get('request_id'),
|
|
'suggestions_file': str(pr_file)
|
|
}
|
|
|
|
tracking_file = self.pending_prs_dir / f"gitea_pr_{pr_number}.json"
|
|
with open(tracking_file, 'w') as f:
|
|
json.dump(pr_tracking, f, indent=2)
|
|
|
|
return pr_number
|
|
else:
|
|
print(f"❌ Failed to create PR: {pr_response.status_code}")
|
|
print(f" Response: {pr_response.text}")
|
|
return None
|
|
|
|
except Exception as e:
|
|
print(f"❌ Error creating PR: {e}")
|
|
return None
|
|
|
|
def reject_pr_with_feedback(self, pr_number, feedback_message):
|
|
"""Reject a PR and save feedback for AI learning"""
|
|
print("\n" + "="*60)
|
|
print(f"Rejecting PR #{pr_number} with Feedback")
|
|
print("="*60)
|
|
|
|
# Close the PR via API
|
|
pr_url = f"{self.base_url}/api/v1/repos/{self.repo_owner}/{self.repo_name}/pulls/{pr_number}"
|
|
|
|
# Add comment with feedback
|
|
comment_url = f"{pr_url}/reviews"
|
|
comment_data = {
|
|
'body': feedback_message,
|
|
'event': 'REJECT' # or 'REQUEST_CHANGES'
|
|
}
|
|
|
|
comment_response = requests.post(comment_url, headers=self.headers, json=comment_data)
|
|
|
|
if comment_response.status_code not in [200, 201]:
|
|
# Try alternative: just add a comment
|
|
issue_comment_url = f"{self.base_url}/api/v1/repos/{self.repo_owner}/{self.repo_name}/issues/{pr_number}/comments"
|
|
comment_data = {
|
|
'body': f"❌ **REJECTED**\n\n{feedback_message}"
|
|
}
|
|
requests.post(issue_comment_url, headers=self.headers, json=comment_data)
|
|
|
|
# Close the PR
|
|
close_data = {
|
|
'state': 'closed'
|
|
}
|
|
close_response = requests.patch(pr_url, headers=self.headers, json=close_data)
|
|
|
|
if close_response.status_code == 200:
|
|
print(f"✅ PR #{pr_number} closed")
|
|
else:
|
|
print(f"⚠️ Could not close PR via API")
|
|
|
|
# Save feedback for AI learning
|
|
feedback_entry = {
|
|
'pr_number': pr_number,
|
|
'timestamp': datetime.now().isoformat(),
|
|
'feedback_type': 'rejected',
|
|
'reviewer': 'security_team',
|
|
'details': {
|
|
'reason': feedback_message,
|
|
'specific_issues': self.parse_feedback_for_issues(feedback_message)
|
|
}
|
|
}
|
|
|
|
# Load and update feedback history
|
|
feedback_file = self.feedback_dir / 'pr_feedback_history.json'
|
|
self.feedback_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
if feedback_file.exists():
|
|
with open(feedback_file, 'r') as f:
|
|
history = json.load(f)
|
|
else:
|
|
history = []
|
|
|
|
history.append(feedback_entry)
|
|
|
|
with open(feedback_file, 'w') as f:
|
|
json.dump(history, f, indent=2)
|
|
|
|
print(f"✅ Feedback saved for AI learning")
|
|
print(f" Total feedback entries: {len(history)}")
|
|
|
|
return feedback_entry
|
|
|
|
def parse_feedback_for_issues(self, feedback_text):
|
|
"""Parse feedback text to extract specific issues"""
|
|
issues = []
|
|
|
|
# Common security issues to look for
|
|
patterns = [
|
|
('any/any/any', 'Never use any/any/any rules'),
|
|
('no logging', 'Always enable logging'),
|
|
('source-address any', 'Avoid using source-address any'),
|
|
('destination-address any', 'Avoid using destination-address any'),
|
|
('application any', 'Specify applications instead of any'),
|
|
('overly permissive', 'Rules are too permissive'),
|
|
('zone segmentation', 'Improper zone segmentation'),
|
|
('iot', 'IoT security concerns')
|
|
]
|
|
|
|
feedback_lower = feedback_text.lower()
|
|
for pattern, description in patterns:
|
|
if pattern in feedback_lower:
|
|
issues.append({
|
|
'pattern': pattern,
|
|
'description': description,
|
|
'type': 'security'
|
|
})
|
|
|
|
return issues if issues else feedback_text
|
|
|
|
def main():
|
|
"""Main entry point for testing"""
|
|
print("\n" + "="*60)
|
|
print(" GITEA PR FEEDBACK TESTING")
|
|
print("="*60)
|
|
|
|
manager = GiteaPRManager()
|
|
|
|
print("\nOptions:")
|
|
print("1. Create a new PR from latest AI suggestions")
|
|
print("2. Reject a PR with feedback")
|
|
print("3. Run complete test cycle")
|
|
|
|
choice = input("\nSelect option (1-3): ")
|
|
|
|
if choice == '1':
|
|
pr_number = manager.create_pr_from_ai_suggestions()
|
|
if pr_number:
|
|
print(f"\n✅ Successfully created PR #{pr_number}")
|
|
print("\nYou can now:")
|
|
print(f"1. Review it in Gitea")
|
|
print(f"2. Reject it with: python3 gitea_pr_feedback.py")
|
|
|
|
elif choice == '2':
|
|
pr_number = input("Enter PR number to reject: ")
|
|
print("\nEnter rejection feedback (press Ctrl+D when done):")
|
|
feedback_lines = []
|
|
try:
|
|
while True:
|
|
feedback_lines.append(input())
|
|
except EOFError:
|
|
pass
|
|
|
|
feedback = '\n'.join(feedback_lines)
|
|
|
|
if not feedback:
|
|
feedback = """This configuration has security issues:
|
|
|
|
1. Any/any/any rules detected - this violates zero-trust principles
|
|
2. No logging enabled on some policies
|
|
3. Overly permissive access between zones
|
|
|
|
Please revise to:
|
|
- Use specific address-sets
|
|
- Enable logging on all policies
|
|
- Implement proper zone segmentation"""
|
|
|
|
manager.reject_pr_with_feedback(pr_number, feedback)
|
|
|
|
elif choice == '3':
|
|
print("\n📋 Complete test cycle:")
|
|
print("1. Creating PR from AI suggestions...")
|
|
pr_number = manager.create_pr_from_ai_suggestions()
|
|
|
|
if pr_number:
|
|
print(f"\n2. Waiting for review...")
|
|
input(" Press Enter to simulate rejection...")
|
|
|
|
feedback = """Security Review Failed:
|
|
|
|
❌ Critical Issues Found:
|
|
- Any/any/any rule in policy ALLOW-ALL
|
|
- No logging on DMZ policies
|
|
- IoT zone has unrestricted access to HOME zone
|
|
|
|
Requirements:
|
|
- All policies must use specific addresses
|
|
- Logging must be enabled
|
|
- IoT devices need strict access control
|
|
"""
|
|
|
|
print("\n3. Rejecting PR with feedback...")
|
|
manager.reject_pr_with_feedback(pr_number, feedback)
|
|
|
|
print("\n4. AI will learn from this feedback in next run")
|
|
print(" Run: python3 run_pipeline.py --skip-netflow")
|
|
print(" The AI should avoid these mistakes next time!")
|
|
|
|
if __name__ == "__main__":
|
|
main()
|