#!/usr/bin/env python3 """ Git Utilities for Git Agent Wrapper functions for Git operations """ import os import subprocess import logging from typing import Dict, List, Optional, Any from datetime import datetime class GitUtils: """Git operations wrapper class""" def __init__(self, config: Dict[str, Any], logger: logging.Logger): self.config = config self.logger = logger self.project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) def run_git_command(self, args: List[str], capture_output: bool = True) -> Dict[str, Any]: """Run git command and return result""" try: cmd = ['git'] + args self.logger.debug(f"Running: {' '.join(cmd)}") if capture_output: result = subprocess.run( cmd, cwd=self.project_root, capture_output=True, text=True, check=False ) return { 'success': result.returncode == 0, 'stdout': result.stdout.strip(), 'stderr': result.stderr.strip(), 'returncode': result.returncode } else: result = subprocess.run(cmd, cwd=self.project_root, check=False) return { 'success': result.returncode == 0, 'returncode': result.returncode } except Exception as e: self.logger.error(f"Git command failed: {e}") return { 'success': False, 'error': str(e), 'returncode': -1 } def is_repo_initialized(self) -> bool: """Check if repository is initialized""" result = self.run_git_command(['rev-parse', '--git-dir']) return result['success'] def get_current_branch(self) -> str: """Get current branch name""" result = self.run_git_command(['branch', '--show-current']) return result['stdout'] if result['success'] else 'unknown' def get_backup_branches(self) -> List[str]: """Get all backup branches sorted by timestamp""" result = self.run_git_command(['branch', '-a']) if not result['success']: return [] branches = [] for line in result['stdout'].split('\n'): branch = line.strip().replace('* ', '').replace('remotes/origin/', '') if branch.startswith('backup-'): branches.append(branch) # Sort by timestamp (extract from branch name) branches.sort(key=lambda x: x.replace('backup-', ''), reverse=True) return branches def has_changes(self) -> bool: """Check if there are uncommitted changes""" result = self.run_git_command(['status', '--porcelain']) return bool(result['stdout'].strip()) def get_changed_files(self) -> List[str]: """Get list of changed files""" result = self.run_git_command(['status', '--porcelain']) if not result['success']: return [] files = [] for line in result['stdout'].split('\n'): if line.strip(): # Extract filename (remove status codes) filename = line.strip()[2:] if len(line.strip()) > 2 else line.strip() if filename: files.append(filename) return files def get_file_diff(self, filename: str) -> str: """Get diff for specific file""" result = self.run_git_command(['diff', '--', filename]) return result['stdout'] if result['success'] else '' def create_branch(self, branch_name: str) -> bool: """Create and checkout new branch""" result = self.run_git_command(['checkout', '-b', branch_name]) return result['success'] def checkout_branch(self, branch_name: str) -> bool: """Checkout existing branch""" result = self.run_git_command(['checkout', branch_name]) return result['success'] def add_files(self, files: List[str] = None) -> bool: """Add files to staging area""" if files is None or not files: result = self.run_git_command(['add', '.']) else: result = self.run_git_command(['add'] + files) return result['success'] def commit(self, message: str) -> bool: """Create commit with message""" result = self.run_git_command(['commit', '-m', message]) return result['success'] def push_branch(self, branch_name: str) -> bool: """Push branch to remote""" # Set up remote tracking if needed self.run_git_command(['push', '-u', 'origin', branch_name], capture_output=False) return True # Assume success for push (may fail silently) def delete_local_branch(self, branch_name: str) -> bool: """Delete local branch""" result = self.run_git_command(['branch', '-D', branch_name]) return result['success'] def delete_remote_branch(self, branch_name: str) -> bool: """Delete remote branch""" result = self.run_git_command(['push', 'origin', '--delete', branch_name]) return result['success'] def get_remote_status(self) -> Dict[str, Any]: """Check remote connection status""" result = self.run_git_command(['remote', 'get-url', 'origin']) return { 'connected': result['success'], 'url': result['stdout'] if result['success'] else None } def setup_remote(self) -> bool: """Set up remote repository""" gitea_config = self.config.get('gitea', {}) server_url = gitea_config.get('server_url') username = gitea_config.get('username') repository = gitea_config.get('repository') if not all([server_url, username, repository]): self.logger.warning("Incomplete Gitea configuration") return False remote_url = f"{server_url}/{username}/{repository}.git" # Check if remote already exists existing_remote = self.run_git_command(['remote', 'get-url', 'origin']) if existing_remote['success']: self.logger.info("Remote already configured") return True # Add remote result = self.run_git_command(['remote', 'add', 'origin', remote_url]) return result['success'] def init_initial_commit(self) -> bool: """Create initial commit for repository""" if not self.is_repo_initialized(): # Initialize repository result = self.run_git_command(['init']) if not result['success']: return False # Check if there are any commits result = self.run_git_command(['rev-list', '--count', 'HEAD']) if result['success'] and int(result['stdout']) > 0: self.logger.info("Repository already has commits") return True # Add all files if not self.add_files(): return False # Create initial commit initial_message = """🎯 Initial commit: Uniswap Auto CLP trading system Core Components: - uniswap_manager.py: V3 concentrated liquidity position manager - clp_hedger.py: Hyperliquid perpetuals hedging bot - requirements.txt: Python dependencies - .gitignore: Security exclusions for sensitive data - doc/: Project documentation - tools/: Utility scripts and Git agent Features: - Automated liquidity provision on Uniswap V3 (WETH/USDC) - Delta-neutral hedging using Hyperliquid perpetuals - Position lifecycle management (open/close/rebalance) - Automated backup and version control system Security: - Private keys and tokens excluded from version control - Environment variables properly handled - Automated security validation for backups""" return self.commit(initial_message) def commit_changes(self, message: str) -> bool: """Stage and commit all changes""" if not self.add_files(): return False return self.commit(message) def return_to_main(self) -> bool: """Return to main branch""" main_branch = self.config.get('main_branch', {}).get('name', 'main') return self.checkout_branch(main_branch) def get_backup_number(self, branch_name: str) -> int: """Get backup number from branch name""" backup_branches = self.get_backup_branches() try: return backup_branches.index(branch_name) + 1 except ValueError: return 0