#!/usr/bin/env python3
"""
Change Detector for Git Agent
Detects and analyzes file changes for detailed commit messages
"""

import os
import re
import subprocess
import logging
from typing import Dict, Any, List, Iterator, Tuple
from decimal import Decimal


class ChangeDetector:
    """Detects and categorizes file changes"""

    # Parameters tracked in Python diffs. In every pattern group 1 is the
    # parameter name and group 2 is the literal value assigned to it.
    _PARAM_PATTERNS = {
        'TARGET_INVESTMENT_VALUE_USDC': r'(TARGET_INVESTMENT_VALUE_USDC)\s*=\s*(\d+)',
        'RANGE_WIDTH_PCT': r'(RANGE_WIDTH_PCT)\s*=\s*Decimal\("([^"]+)"\)',
        'SLIPPAGE_TOLERANCE': r'(SLIPPAGE_TOLERANCE)\s*=\s*Decimal\("([^"]+)"\)',
        'LEVERAGE': r'(LEVERAGE)\s*=\s*(\d+)',
        'MIN_THRESHOLD_ETH': r'(MIN_THRESHOLD_ETH)\s*=\s*Decimal\("([^"]+)"\)',
        'CHECK_INTERVAL': r'(CHECK_INTERVAL)\s*=\s*(\d+)',
        'PRICE_BUFFER_PCT': r'(PRICE_BUFFER_PCT)\s*=\s*Decimal\("([^"]+)"\)'
    }

    def __init__(self, config: Dict[str, Any], logger: logging.Logger):
        """Store the configuration and logger and resolve the project root.

        The project root is the directory one level above the directory that
        contains this file; every git command is run with it as ``cwd``.
        """
        self.config = config
        self.logger = logger
        self.project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

    def detect_changes(self) -> Dict[str, Any]:
        """Detect all changes in the repository.

        Returns:
            A dict with the keys ``has_changes`` (bool), ``files`` (one
            detail dict per changed file), ``categories`` (detail dicts
            grouped under ``python`` / ``config`` / ``docs`` / ``other``) and
            ``parameter_changes`` (path -> tracked parameter changes, Python
            files only). If anything raises, the error is logged and a
            "no changes" result carrying an extra ``error`` key is returned.
        """
        try:
            changed_files = self._get_changed_files()

            if not changed_files:
                return {
                    'has_changes': False,
                    'files': [],
                    'categories': {},
                    'parameter_changes': {}
                }

            file_details = []
            categories = {
                'python': [],
                'config': [],
                'docs': [],
                'other': []
            }
            parameter_changes = {}

            for file_path in changed_files:
                details = self._analyze_file_changes(file_path)
                file_details.append(details)

                category = self._categorize_file(file_path)
                categories[category].append(details)

                # Parameter tracking only applies to Python sources.
                if category == 'python':
                    params = self._extract_parameter_changes(file_path, details.get('diff', ''))
                    if params:
                        parameter_changes[file_path] = params

            return {
                'has_changes': True,
                'files': file_details,
                'categories': categories,
                'parameter_changes': parameter_changes
            }

        except Exception as e:
            self.logger.error(f"❌ Error detecting changes: {e}")
            return {
                'has_changes': False,
                'files': [],
                'categories': {},
                'parameter_changes': {},
                'error': str(e)
            }

    @staticmethod
    def _parse_porcelain_path(line: str) -> str:
        """Extract the path from one ``git status --porcelain`` record.

        A record is two status columns, one space, then the path. The line
        must NOT be stripped beforehand: the first status column is a blank
        for unstaged-only changes, and stripping shifts the path offset.

        Returns:
            The path (the destination for renames/copies), or ``''`` when
            the line is too short to hold one.
        """
        if len(line) < 4:
            return ''

        path = line[3:]

        # Renames and copies are reported as "old -> new"; keep the new path.
        if 'R' in line[:2] or 'C' in line[:2]:
            _, arrow, destination = path.partition(' -> ')
            if arrow:
                path = destination

        # Git wraps paths with special characters (e.g. spaces) in double
        # quotes. Backslash escapes inside the quotes are not decoded here.
        if len(path) >= 2 and path[0] == '"' and path[-1] == '"':
            path = path[1:-1]

        return path

    def _get_changed_files(self) -> List[str]:
        """Get list of changed files using git status.

        Returns:
            Paths reported by ``git status --porcelain`` relative to the
            project root, or an empty list when git exits non-zero or an
            exception occurs (the exception is logged).
        """
        try:
            result = subprocess.run(
                ['git', 'status', '--porcelain'],
                cwd=self.project_root,
                capture_output=True,
                text=True,
                check=False
            )

            if result.returncode != 0:
                return []

            files = []
            for line in result.stdout.splitlines():
                filename = self._parse_porcelain_path(line)
                if filename and filename not in ('.git', '__pycache__'):
                    files.append(filename)

            return files

        except Exception as e:
            self.logger.error(f"Error getting changed files: {e}")
            return []

    @staticmethod
    def _status_from_code(code: str) -> str:
        """Map a two-character porcelain status code to a status label.

        ``??`` is ``untracked``; otherwise the first non-blank column
        decides: ``A`` is ``added``, ``D`` is ``deleted``, anything else is
        ``modified``.
        """
        if code == '??':
            return 'untracked'

        first = code.strip()[:1]
        if first == 'A':
            return 'added'
        if first == 'D':
            return 'deleted'
        return 'modified'

    @staticmethod
    def _iter_changed_lines(diff: str) -> Iterator[Tuple[str, str]]:
        """Yield ``(sign, text)`` for each added/removed line of a unified diff.

        Only lines inside a hunk (after an ``@@`` header) are considered, so
        the ``---`` / ``+++`` file headers are never mistaken for changes and
        changed lines whose own text starts with ``+`` or ``-`` are kept.
        """
        in_hunk = False
        for line in diff.split('\n'):
            if line.startswith('@@'):
                in_hunk = True
            elif line.startswith('diff --git'):
                # A new file section begins; its header lines are not changes.
                in_hunk = False
            elif in_hunk and line[:1] in ('+', '-'):
                yield line[0], line[1:]

    @classmethod
    def _count_diff_lines(cls, diff: str) -> Tuple[int, int]:
        """Return ``(lines_added, lines_deleted)`` for a unified diff."""
        added = deleted = 0
        for sign, _ in cls._iter_changed_lines(diff):
            if sign == '+':
                added += 1
            else:
                deleted += 1
        return added, deleted

    def _analyze_file_changes(self, file_path: str) -> Dict[str, Any]:
        """Analyze changes for a specific file.

        Runs ``git diff`` and ``git status --porcelain`` for ``file_path``.

        Returns:
            A dict with ``path``, ``status`` (``modified`` / ``added`` /
            ``deleted`` / ``untracked``), ``lines_added``, ``lines_deleted``
            and the raw ``diff`` text. On an exception the error is logged
            and a dict with ``status`` ``'error'`` and an ``error`` message
            is returned instead.
        """
        try:
            result = subprocess.run(
                ['git', 'diff', '--', file_path],
                cwd=self.project_root,
                capture_output=True,
                text=True,
                check=False
            )
            diff = result.stdout if result.returncode == 0 else ''

            status_result = subprocess.run(
                ['git', 'status', '--porcelain', '--', file_path],
                cwd=self.project_root,
                capture_output=True,
                text=True,
                check=False
            )

            status = 'modified'
            if status_result.returncode == 0 and status_result.stdout.strip():
                # Use the raw (unstripped) output so both status columns are
                # seen; "??" can only be recognised as a two-character code.
                status = self._status_from_code(status_result.stdout[:2])

            lines_added, lines_deleted = self._count_diff_lines(diff)

            return {
                'path': file_path,
                'status': status,
                'lines_added': lines_added,
                'lines_deleted': lines_deleted,
                'diff': diff
            }

        except Exception as e:
            self.logger.error(f"Error analyzing {file_path}: {e}")
            return {
                'path': file_path,
                'status': 'error',
                'lines_added': 0,
                'lines_deleted': 0,
                'diff': '',
                'error': str(e)
            }

    def _categorize_file(self, file_path: str) -> str:
        """Categorize file type by extension.

        Returns ``'python'``, ``'config'``, ``'docs'`` or ``'other'``.
        """
        if file_path.endswith('.py'):
            return 'python'
        if file_path.endswith(('.json', '.yaml', '.yml', '.toml', '.ini')):
            return 'config'
        if file_path.endswith(('.md', '.txt', '.rst')):
            return 'docs'
        return 'other'

    @staticmethod
    def _percent_change(old_val: str, new_val: str) -> float:
        """Return the percentage change from ``old_val`` to ``new_val``.

        Values are parsed as floats when either contains a ``.``, otherwise
        as ints. Unparseable values and a zero old value both yield 0.
        """
        convert = float if ('.' in old_val or '.' in new_val) else int
        try:
            old_num = convert(old_val)
            new_num = convert(new_val)
        except ValueError:
            return 0

        if old_num == 0:
            return 0
        return ((new_num - old_num) / abs(old_num)) * 100

    def _extract_parameter_changes(self, file_path: str, diff: str) -> Dict[str, Any]:
        """Extract parameter changes from Python files.

        Old values are taken from removed (``-``) lines and new values from
        added (``+``) lines of the diff, so unchanged context lines never
        produce a reported change.

        Args:
            file_path: Path of the changed file; non-``.py`` paths yield ``{}``.
            diff: Unified diff text for the file.

        Returns:
            Mapping of parameter name to ``{'old', 'new', 'pct_change'}``
            for every tracked parameter that has both a removed and an added
            assignment. ``old`` is the first removed value, ``new`` is the
            last added value, and ``pct_change`` is rounded to one decimal.
        """
        if not diff or not file_path.endswith('.py'):
            return {}

        removed_lines = []
        added_lines = []
        for sign, text in self._iter_changed_lines(diff):
            (added_lines if sign == '+' else removed_lines).append(text)

        removed_text = '\n'.join(removed_lines)
        added_text = '\n'.join(added_lines)

        parameters = {}
        for param_name, pattern in self._PARAM_PATTERNS.items():
            old_values = [match[1] for match in re.findall(pattern, removed_text)]
            new_values = [match[1] for match in re.findall(pattern, added_text)]
            if not old_values or not new_values:
                continue

            old_val = old_values[0]
            new_val = new_values[-1]  # Last added value is the current one
            parameters[param_name] = {
                'old': old_val,
                'new': new_val,
                'pct_change': round(self._percent_change(old_val, new_val), 1)
            }

        return parameters