feat: add florida module for unified hedging and monitoring
This commit is contained in:
238
florida/tools/git_utils.py
Normal file
238
florida/tools/git_utils.py
Normal file
@ -0,0 +1,238 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Git Utilities for Git Agent
|
||||
Wrapper functions for Git operations
|
||||
"""
|
||||
|
||||
import os
|
||||
import subprocess
|
||||
import logging
|
||||
from typing import Dict, List, Optional, Any
|
||||
from datetime import datetime
|
||||
|
||||
class GitUtils:
|
||||
"""Git operations wrapper class"""
|
||||
|
||||
def __init__(self, config: Dict[str, Any], logger: logging.Logger):
|
||||
self.config = config
|
||||
self.logger = logger
|
||||
self.project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
||||
|
||||
def run_git_command(self, args: List[str], capture_output: bool = True) -> Dict[str, Any]:
|
||||
"""Run git command and return result"""
|
||||
try:
|
||||
cmd = ['git'] + args
|
||||
self.logger.debug(f"Running: {' '.join(cmd)}")
|
||||
|
||||
if capture_output:
|
||||
result = subprocess.run(
|
||||
cmd,
|
||||
cwd=self.project_root,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
check=False
|
||||
)
|
||||
return {
|
||||
'success': result.returncode == 0,
|
||||
'stdout': result.stdout.strip(),
|
||||
'stderr': result.stderr.strip(),
|
||||
'returncode': result.returncode
|
||||
}
|
||||
else:
|
||||
result = subprocess.run(cmd, cwd=self.project_root, check=False)
|
||||
return {
|
||||
'success': result.returncode == 0,
|
||||
'returncode': result.returncode
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
self.logger.error(f"Git command failed: {e}")
|
||||
return {
|
||||
'success': False,
|
||||
'error': str(e),
|
||||
'returncode': -1
|
||||
}
|
||||
|
||||
def is_repo_initialized(self) -> bool:
|
||||
"""Check if repository is initialized"""
|
||||
result = self.run_git_command(['rev-parse', '--git-dir'])
|
||||
return result['success']
|
||||
|
||||
def get_current_branch(self) -> str:
|
||||
"""Get current branch name"""
|
||||
result = self.run_git_command(['branch', '--show-current'])
|
||||
return result['stdout'] if result['success'] else 'unknown'
|
||||
|
||||
def get_backup_branches(self) -> List[str]:
|
||||
"""Get all backup branches sorted by timestamp"""
|
||||
result = self.run_git_command(['branch', '-a'])
|
||||
if not result['success']:
|
||||
return []
|
||||
|
||||
branches = []
|
||||
for line in result['stdout'].split('\n'):
|
||||
branch = line.strip().replace('* ', '').replace('remotes/origin/', '')
|
||||
if branch.startswith('backup-'):
|
||||
branches.append(branch)
|
||||
|
||||
# Sort by timestamp (extract from branch name)
|
||||
branches.sort(key=lambda x: x.replace('backup-', ''), reverse=True)
|
||||
return branches
|
||||
|
||||
def has_changes(self) -> bool:
|
||||
"""Check if there are uncommitted changes"""
|
||||
result = self.run_git_command(['status', '--porcelain'])
|
||||
return bool(result['stdout'].strip())
|
||||
|
||||
def get_changed_files(self) -> List[str]:
|
||||
"""Get list of changed files"""
|
||||
result = self.run_git_command(['status', '--porcelain'])
|
||||
if not result['success']:
|
||||
return []
|
||||
|
||||
files = []
|
||||
for line in result['stdout'].split('\n'):
|
||||
if line.strip():
|
||||
# Extract filename (remove status codes)
|
||||
filename = line.strip()[2:] if len(line.strip()) > 2 else line.strip()
|
||||
if filename:
|
||||
files.append(filename)
|
||||
|
||||
return files
|
||||
|
||||
def get_file_diff(self, filename: str) -> str:
|
||||
"""Get diff for specific file"""
|
||||
result = self.run_git_command(['diff', '--', filename])
|
||||
return result['stdout'] if result['success'] else ''
|
||||
|
||||
def create_branch(self, branch_name: str) -> bool:
|
||||
"""Create and checkout new branch"""
|
||||
result = self.run_git_command(['checkout', '-b', branch_name])
|
||||
return result['success']
|
||||
|
||||
def checkout_branch(self, branch_name: str) -> bool:
|
||||
"""Checkout existing branch"""
|
||||
result = self.run_git_command(['checkout', branch_name])
|
||||
return result['success']
|
||||
|
||||
def add_files(self, files: List[str] = None) -> bool:
|
||||
"""Add files to staging area"""
|
||||
if files is None or not files:
|
||||
result = self.run_git_command(['add', '.'])
|
||||
else:
|
||||
result = self.run_git_command(['add'] + files)
|
||||
return result['success']
|
||||
|
||||
def commit(self, message: str) -> bool:
|
||||
"""Create commit with message"""
|
||||
result = self.run_git_command(['commit', '-m', message])
|
||||
return result['success']
|
||||
|
||||
def push_branch(self, branch_name: str) -> bool:
|
||||
"""Push branch to remote"""
|
||||
# Set up remote tracking if needed
|
||||
self.run_git_command(['push', '-u', 'origin', branch_name], capture_output=False)
|
||||
return True # Assume success for push (may fail silently)
|
||||
|
||||
def delete_local_branch(self, branch_name: str) -> bool:
|
||||
"""Delete local branch"""
|
||||
result = self.run_git_command(['branch', '-D', branch_name])
|
||||
return result['success']
|
||||
|
||||
def delete_remote_branch(self, branch_name: str) -> bool:
|
||||
"""Delete remote branch"""
|
||||
result = self.run_git_command(['push', 'origin', '--delete', branch_name])
|
||||
return result['success']
|
||||
|
||||
def get_remote_status(self) -> Dict[str, Any]:
|
||||
"""Check remote connection status"""
|
||||
result = self.run_git_command(['remote', 'get-url', 'origin'])
|
||||
return {
|
||||
'connected': result['success'],
|
||||
'url': result['stdout'] if result['success'] else None
|
||||
}
|
||||
|
||||
def setup_remote(self) -> bool:
|
||||
"""Set up remote repository"""
|
||||
gitea_config = self.config.get('gitea', {})
|
||||
server_url = gitea_config.get('server_url')
|
||||
username = gitea_config.get('username')
|
||||
repository = gitea_config.get('repository')
|
||||
|
||||
if not all([server_url, username, repository]):
|
||||
self.logger.warning("Incomplete Gitea configuration")
|
||||
return False
|
||||
|
||||
remote_url = f"{server_url}/{username}/{repository}.git"
|
||||
|
||||
# Check if remote already exists
|
||||
existing_remote = self.run_git_command(['remote', 'get-url', 'origin'])
|
||||
if existing_remote['success']:
|
||||
self.logger.info("Remote already configured")
|
||||
return True
|
||||
|
||||
# Add remote
|
||||
result = self.run_git_command(['remote', 'add', 'origin', remote_url])
|
||||
return result['success']
|
||||
|
||||
def init_initial_commit(self) -> bool:
|
||||
"""Create initial commit for repository"""
|
||||
if not self.is_repo_initialized():
|
||||
# Initialize repository
|
||||
result = self.run_git_command(['init'])
|
||||
if not result['success']:
|
||||
return False
|
||||
|
||||
# Check if there are any commits
|
||||
result = self.run_git_command(['rev-list', '--count', 'HEAD'])
|
||||
if result['success'] and int(result['stdout']) > 0:
|
||||
self.logger.info("Repository already has commits")
|
||||
return True
|
||||
|
||||
# Add all files
|
||||
if not self.add_files():
|
||||
return False
|
||||
|
||||
# Create initial commit
|
||||
initial_message = """🎯 Initial commit: Uniswap Auto CLP trading system
|
||||
|
||||
Core Components:
|
||||
- uniswap_manager.py: V3 concentrated liquidity position manager
|
||||
- clp_hedger.py: Hyperliquid perpetuals hedging bot
|
||||
- requirements.txt: Python dependencies
|
||||
- .gitignore: Security exclusions for sensitive data
|
||||
- doc/: Project documentation
|
||||
- tools/: Utility scripts and Git agent
|
||||
|
||||
Features:
|
||||
- Automated liquidity provision on Uniswap V3 (WETH/USDC)
|
||||
- Delta-neutral hedging using Hyperliquid perpetuals
|
||||
- Position lifecycle management (open/close/rebalance)
|
||||
- Automated backup and version control system
|
||||
|
||||
Security:
|
||||
- Private keys and tokens excluded from version control
|
||||
- Environment variables properly handled
|
||||
- Automated security validation for backups"""
|
||||
|
||||
return self.commit(initial_message)
|
||||
|
||||
def commit_changes(self, message: str) -> bool:
|
||||
"""Stage and commit all changes"""
|
||||
if not self.add_files():
|
||||
return False
|
||||
|
||||
return self.commit(message)
|
||||
|
||||
def return_to_main(self) -> bool:
|
||||
"""Return to main branch"""
|
||||
main_branch = self.config.get('main_branch', {}).get('name', 'main')
|
||||
return self.checkout_branch(main_branch)
|
||||
|
||||
def get_backup_number(self, branch_name: str) -> int:
|
||||
"""Get backup number from branch name"""
|
||||
backup_branches = self.get_backup_branches()
|
||||
try:
|
||||
return backup_branches.index(branch_name) + 1
|
||||
except ValueError:
|
||||
return 0
|
||||
Reference in New Issue
Block a user