🎯 Initial commit: Uniswap Auto CLP trading system

Core Components:
- uniswap_manager.py: V3 concentrated liquidity position manager
- clp_hedger.py: Hyperliquid perpetuals hedging bot
- requirements.txt: Python dependencies
- .gitignore: Security exclusions for sensitive data
- doc/: Project documentation
- tools/: Utility scripts and Git agent

Features:
- Automated liquidity provision on Uniswap V3 (WETH/USDC)
- Delta-neutral hedging using Hyperliquid perpetuals
- Position lifecycle management (open/close/rebalance)
- Automated backup and version control system

Security:
- Private keys and tokens excluded from version control
- Environment variables properly handled
- Automated security validation for backups

Git Agent:
- Hourly automated backups to separate branches
- Keep last 100 backups (~4 days coverage)
- Detailed change tracking and parameter monitoring
- Push to Gitea server automatically
- Manual main branch control preserved
- No performance tracking for privacy
- No notifications for simplicity

Files Added:
- git_agent.py: Main automation script
- agent_config.json: Configuration with Gitea settings
- git_utils.py: Git operations wrapper
- backup_manager.py: Backup branch management
- change_detector.py: File change analysis
- cleanup_manager.py: 100-backup rotation
- commit_formatter.py: Detailed commit messages
- README_GIT_AGENT.md: Complete usage documentation
This commit is contained in:
2025-12-19 20:30:48 +01:00
commit 5ca16ec33f
18 changed files with 4207 additions and 0 deletions

262
tools/README_GIT_AGENT.md Normal file
View File

@ -0,0 +1,262 @@
# Git Agent for Uniswap Auto CLP
## Overview
Automated backup and version control system for your Uniswap Auto CLP trading bot.
## Quick Setup
### 1. Initialize Repository
```bash
# Navigate to project directory
cd K:\Projects\uniswap_auto_clp
# Create initial commit
python tools\git_agent.py --init
# Add and push initial setup
git add .
git commit -m "🎯 Initial commit: Uniswap Auto CLP system"
git remote add origin https://git.kapuscinski.pl/ditus/uniswap_auto_clp.git
git push -u origin main
```
### 2. Create First Backup
```bash
# Test backup creation
python tools\git_agent.py --backup
```
### 3. Check Status
```bash
# View current status
python tools\git_agent.py --status
```
## Configuration
Edit `tools/agent_config.json` as needed:
```json
{
"backup": {
"enabled": true,
"frequency_hours": 1,
"keep_max_count": 100,
"push_to_remote": true
}
}
```
## Usage Commands
### Manual Operations
```bash
# Create backup now
python tools\git_agent.py --backup
# Check status
python tools\git_agent.py --status
# Cleanup old backups
python tools\git_agent.py --cleanup
# Initialize repository (one-time)
python tools\git_agent.py --init
```
### Automated Scheduling
#### Windows Task Scheduler
```powershell
# Create hourly task
schtasks /create /tn "Git Backup" /tr "python tools\git_agent.py --backup" /sc hourly
```
#### Linux Cron (if needed)
```bash
# Add to crontab
0 * * * * cd /path/to/project && python tools/git_agent.py --backup
```
## How It Works
### Branch Strategy
- **main branch**: Your manual development (you control pushes)
- **backup-* branches**: Automatic hourly backups (agent managed)
### Backup Process
1. **Hourly**: Agent checks for file changes
2. **Creates backup branch**: Named `backup-YYYY-MM-DD-HH`
3. **Commits changes**: With detailed file and parameter tracking
4. **Pushes to remote**: Automatic backup to Gitea
5. **Cleans up**: Keeps only last 100 backups
### Backup Naming
```
backup-2025-01-15-14 # 2 PM backup on Jan 15, 2025
backup-2025-01-15-15 # 3 PM backup
backup-2025-01-15-16 # 4 PM backup
```
### Commit Messages
Agent creates detailed commit messages showing:
- Files changed with status icons
- Parameter changes with percentage differences
- Security validation confirmation
- Timestamp and backup number
## Security
### What's Excluded
✅ Private keys and tokens (`.env` files)
✅ Log files (`*.log`)
✅ State files (`hedge_status.json`)
✅ Temporary files
### What's Included
✅ All code changes
✅ Configuration modifications
✅ Documentation updates
✅ Parameter tracking
## Emergency Recovery
### Quick Rollback
```bash
# List recent backups
python tools\git_agent.py --status
# Switch to backup
git checkout backup-2025-01-15-14
# Copy files to main
git checkout main -- .
git commit -m "🔄 Emergency restore from backup-2025-01-15-14"
git push origin main
```
### File Recovery
```bash
# Restore specific file from backup
git checkout backup-2025-01-15-14 -- path/to/file.py
```
## Monitoring
### Backup Health
```bash
# Check backup count and status
python tools\git_agent.py --status
# Expected output:
# 📊 Git Agent Status:
# Current Branch: main
# Backup Count: 47
# Has Changes: false
# Remote Connected: true
# Last Backup: backup-2025-01-15-16
```
### Manual Cleanup
```bash
# Remove old backups (keeps last 100)
python tools\git_agent.py --cleanup
```
## Troubleshooting
### Common Issues
#### "Configuration file not found"
```bash
# Ensure agent_config.json exists in tools/ directory
ls tools/agent_config.json
```
#### "Git command failed"
```bash
# Check Git installation and repository status
git status
git --version
```
#### "Remote connection failed"
```bash
# Verify Gitea URL and credentials
git remote -v
ping git.kapuscinski.pl
```
### Debug Mode
Edit `agent_config.json`:
```json
{
"logging": {
"enabled": true,
"log_level": "DEBUG"
}
}
```
Then check `git_agent.log` in project root.
## Integration with Trading Bot
### Parameter Changes
Agent automatically tracks changes to:
- `TARGET_INVESTMENT_VALUE_USDC`
- `RANGE_WIDTH_PCT`
- `SLIPPAGE_TOLERANCE`
- `LEVERAGE`
- `CHECK_INTERVAL`
- `PRICE_BUFFER_PCT`
### Backup Triggers
Consider manual backups when:
- Changing trading strategy parameters
- Updating risk management settings
- Before major system changes
- After successful backtesting
```bash
# Manual backup before important changes
python tools\git_agent.py --backup
```
## Best Practices
### Development Workflow
1. **Work on main branch** for normal development
2. **Manual commits** for your changes
3. **Agent handles backups** automatically
4. **Manual push** to main when ready
### Backup Management
- **100 backup limit** = ~4 days of hourly coverage
- **Automatic cleanup** maintains repository size
- **Remote storage** provides offsite backup
### Security Reminders
- **Never commit private keys** (automatically excluded)
- **Check .gitignore** if adding sensitive files
- **Review backup commits** for accidental secrets
## Support
### Log Files
- `git_agent.log`: Agent activity and errors
- Check logs for troubleshooting issues
### Repository Structure
```
tools/
├── git_agent.py # Main automation script
├── agent_config.json # Configuration settings
├── git_utils.py # Git operations
├── backup_manager.py # Backup branch logic
├── change_detector.py # Change analysis
├── cleanup_manager.py # Backup rotation
└── commit_formatter.py # Message formatting
```
This automated backup system ensures your trading bot code is always versioned and recoverable, while keeping your main development workflow clean and manual.

35
tools/agent_config.json Normal file
View File

@ -0,0 +1,35 @@
{
"gitea": {
"server_url": "https://git.kapuscinski.pl",
"username": "ditus",
"repository": "uniswap_auto_clp",
"token": "b24fc3203597b2bdcb2f2da6634c618"
},
"backup": {
"enabled": true,
"frequency_hours": 1,
"branch_prefix": "backup-",
"push_to_remote": true,
"keep_max_count": 100,
"cleanup_with_backup": true,
"detailed_commit_messages": true
},
"main_branch": {
"manual_pushes_only": true,
"auto_commits": false,
"protect_from_agent": true,
"name": "main"
},
"change_tracking": {
"method": "commit_message",
"include_file_diffs": true,
"track_parameter_changes": true,
"format": "detailed",
"security_validation": false
},
"logging": {
"enabled": true,
"log_file": "git_agent.log",
"log_level": "INFO"
}
}

89
tools/backup_manager.py Normal file
View File

@ -0,0 +1,89 @@
#!/usr/bin/env python3
"""
Backup Manager for Git Agent
Handles backup branch creation and management
"""
import os
import logging
from datetime import datetime, timezone
from typing import Dict, Any
class BackupManager:
"""Manages backup branch operations"""
def __init__(self, config: Dict[str, Any], logger: logging.Logger):
self.config = config
self.logger = logger
self.backup_config = config.get('backup', {})
self.prefix = self.backup_config.get('branch_prefix', 'backup-')
def create_backup_branch(self) -> str:
"""Create a new backup branch with timestamp"""
timestamp = datetime.now(timezone.utc)
branch_name = f"{self.prefix}{timestamp.strftime('%Y-%m-%d-%H')}"
# Get current directory from git utils
current_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# Create backup branch
import subprocess
try:
# Create and checkout new branch
result = subprocess.run(
['git', 'checkout', '-b', branch_name],
cwd=current_dir,
capture_output=True,
text=True,
check=False
)
if result.returncode == 0:
self.logger.info(f"✅ Created backup branch: {branch_name}")
return branch_name
else:
# Branch might already exist, just checkout
result = subprocess.run(
['git', 'checkout', branch_name],
cwd=current_dir,
capture_output=True,
text=True,
check=False
)
if result.returncode == 0:
self.logger.info(f"✅ Using existing backup branch: {branch_name}")
return branch_name
else:
self.logger.error(f"❌ Failed to create/checkout backup branch: {result.stderr}")
return None
except Exception as e:
self.logger.error(f"❌ Exception creating backup branch: {e}")
return None
def get_backup_count(self) -> int:
"""Get current number of backup branches"""
current_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
try:
result = subprocess.run(
['git', 'branch', '-a'],
cwd=current_dir,
capture_output=True,
text=True,
check=False
)
if result.returncode == 0:
branches = result.stdout.strip().split('\n')
backup_branches = [
b.strip().replace('* ', '').replace('remotes/origin/', '')
for b in branches
if b.strip() and self.prefix in b
]
return len(backup_branches)
except Exception as e:
self.logger.error(f"❌ Error counting backup branches: {e}")
return 0

230
tools/change_detector.py Normal file
View File

@ -0,0 +1,230 @@
#!/usr/bin/env python3
"""
Change Detector for Git Agent
Detects and analyzes file changes for detailed commit messages
"""
import os
import re
import subprocess
import logging
from typing import Dict, Any, List
from decimal import Decimal
class ChangeDetector:
"""Detects and categorizes file changes"""
def __init__(self, config: Dict[str, Any], logger: logging.Logger):
self.config = config
self.logger = logger
self.project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
def detect_changes(self) -> Dict[str, Any]:
"""Detect all changes in the repository"""
try:
# Get changed files
changed_files = self._get_changed_files()
if not changed_files:
return {
'has_changes': False,
'files': [],
'categories': {},
'parameter_changes': {}
}
# Analyze changes
file_details = []
categories = {
'python': [],
'config': [],
'docs': [],
'other': []
}
parameter_changes = {}
for file_path in changed_files:
details = self._analyze_file_changes(file_path)
file_details.append(details)
# Categorize file
category = self._categorize_file(file_path)
categories[category].append(details)
# Track parameter changes for Python files
if category == 'python':
params = self._extract_parameter_changes(file_path, details.get('diff', ''))
if params:
parameter_changes[file_path] = params
return {
'has_changes': True,
'files': file_details,
'categories': categories,
'parameter_changes': parameter_changes
}
except Exception as e:
self.logger.error(f"❌ Error detecting changes: {e}")
return {
'has_changes': False,
'files': [],
'categories': {},
'parameter_changes': {},
'error': str(e)
}
def _get_changed_files(self) -> List[str]:
"""Get list of changed files using git status"""
try:
result = subprocess.run(
['git', 'status', '--porcelain'],
cwd=self.project_root,
capture_output=True,
text=True,
check=False
)
if result.returncode != 0:
return []
files = []
for line in result.stdout.strip().split('\n'):
if line.strip():
# Extract filename (remove status codes)
filename = line.strip()[2:] if len(line.strip()) > 2 else line.strip()
if filename and filename not in ['.git', '__pycache__']:
files.append(filename)
return files
except Exception as e:
self.logger.error(f"Error getting changed files: {e}")
return []
def _analyze_file_changes(self, file_path: str) -> Dict[str, Any]:
"""Analyze changes for a specific file"""
try:
# Get diff
result = subprocess.run(
['git', 'diff', '--', file_path],
cwd=self.project_root,
capture_output=True,
text=True,
check=False
)
diff = result.stdout if result.returncode == 0 else ''
# Get file status
status_result = subprocess.run(
['git', 'status', '--porcelain', '--', file_path],
cwd=self.project_root,
capture_output=True,
text=True,
check=False
)
status = 'modified'
if status_result.returncode == 0 and status_result.stdout.strip():
status_line = status_result.stdout.strip()[0]
if status_line == 'A':
status = 'added'
elif status_line == 'D':
status = 'deleted'
elif status_line == '??':
status = 'untracked'
# Count lines changed
lines_added = diff.count('\n+') - diff.count('\n++') # Exclude +++ indicators
lines_deleted = diff.count('\n-') - diff.count('\n--') # Exclude --- indicators
return {
'path': file_path,
'status': status,
'lines_added': max(0, lines_added),
'lines_deleted': max(0, lines_deleted),
'diff': diff
}
except Exception as e:
self.logger.error(f"Error analyzing {file_path}: {e}")
return {
'path': file_path,
'status': 'error',
'lines_added': 0,
'lines_deleted': 0,
'diff': '',
'error': str(e)
}
def _categorize_file(self, file_path: str) -> str:
"""Categorize file type"""
if file_path.endswith('.py'):
return 'python'
elif file_path.endswith(('.json', '.yaml', '.yml', '.toml', '.ini')):
return 'config'
elif file_path.endswith(('.md', '.txt', '.rst')):
return 'docs'
else:
return 'other'
def _extract_parameter_changes(self, file_path: str, diff: str) -> Dict[str, Any]:
"""Extract parameter changes from Python files"""
if not diff or not file_path.endswith('.py'):
return {}
parameters = {}
# Common trading bot parameters to track
param_patterns = {
'TARGET_INVESTMENT_VALUE_USDC': r'(TARGET_INVESTMENT_VALUE_USDC)\s*=\s*(\d+)',
'RANGE_WIDTH_PCT': r'(RANGE_WIDTH_PCT)\s*=\s*Decimal\("([^"]+)"\)',
'SLIPPAGE_TOLERANCE': r'(SLIPPAGE_TOLERANCE)\s*=\s*Decimal\("([^"]+)"\)',
'LEVERAGE': r'(LEVERAGE)\s*=\s*(\d+)',
'MIN_THRESHOLD_ETH': r'(MIN_THRESHOLD_ETH)\s*=\s*Decimal\("([^"]+)"\)',
'CHECK_INTERVAL': r'(CHECK_INTERVAL)\s*=\s*(\d+)',
'PRICE_BUFFER_PCT': r'(PRICE_BUFFER_PCT)\s*=\s*Decimal\("([^"]+)"\)'
}
for param_name, pattern in param_patterns.items():
matches = re.findall(pattern, diff)
if matches:
# Find old and new values
values = []
for match in matches:
if isinstance(match, tuple):
values.append(match[1] if len(match) > 1 else match[0])
else:
values.append(match)
if len(values) >= 2:
old_val = values[0]
new_val = values[-1] # Last value is current
# Calculate percentage change for numeric values
try:
if '.' in old_val or '.' in new_val:
old_num = float(old_val)
new_num = float(new_val)
if old_num != 0:
pct_change = ((new_num - old_num) / abs(old_num)) * 100
else:
pct_change = 0
else:
old_num = int(old_val)
new_num = int(new_val)
if old_num != 0:
pct_change = ((new_num - old_num) / abs(old_num)) * 100
else:
pct_change = 0
except (ValueError, ZeroDivisionError):
pct_change = 0
parameters[param_name] = {
'old': old_val,
'new': new_val,
'pct_change': round(pct_change, 1)
}
return parameters

153
tools/cleanup_manager.py Normal file
View File

@ -0,0 +1,153 @@
#!/usr/bin/env python3
"""
Cleanup Manager for Git Agent
Manages backup branch rotation (keep last 100)
"""
import os
import subprocess
import logging
from typing import Dict, Any, List
class CleanupManager:
"""Manages backup branch cleanup and rotation"""
def __init__(self, config: Dict[str, Any], logger: logging.Logger):
self.config = config
self.logger = logger
self.backup_config = config.get('backup', {})
self.prefix = self.backup_config.get('branch_prefix', 'backup-')
self.max_backups = self.backup_config.get('keep_max_count', 100)
self.project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
def cleanup_old_backups(self) -> bool:
"""Clean up old backup branches to keep only the last N"""
try:
# Get all backup branches
backup_branches = self._get_backup_branches()
if len(backup_branches) <= self.max_backups:
self.logger.info(f"✅ Backup count ({len(backup_branches)}) within limit ({self.max_backups})")
return False # No cleanup needed
# Branches to delete (oldest ones)
branches_to_delete = backup_branches[self.max_backups:]
if not branches_to_delete:
return False
self.logger.info(f"🧹 Cleaning up {len(branches_to_delete)} old backup branches")
deleted_count = 0
for branch in branches_to_delete:
# Delete local branch
if self._delete_local_branch(branch):
# Delete remote branch
if self._delete_remote_branch(branch):
deleted_count += 1
self.logger.debug(f" ✅ Deleted: {branch}")
else:
self.logger.warning(f" ⚠️ Local deleted, remote failed: {branch}")
else:
self.logger.warning(f" ❌ Failed to delete: {branch}")
if deleted_count > 0:
self.logger.info(f"✅ Cleanup completed: deleted {deleted_count} old backup branches")
return True
else:
self.logger.warning("⚠️ No branches were successfully deleted")
return False
except Exception as e:
self.logger.error(f"❌ Cleanup failed: {e}")
return False
def _get_backup_branches(self) -> List[str]:
"""Get all backup branches sorted by timestamp (newest first)"""
try:
result = subprocess.run(
['git', 'branch', '-a'],
cwd=self.project_root,
capture_output=True,
text=True,
check=False
)
if result.returncode != 0:
return []
branches = []
for line in result.stdout.strip().split('\n'):
if line.strip():
# Clean up branch name
branch = line.strip().replace('* ', '').replace('remotes/origin/', '')
if branch.startswith(self.prefix):
branches.append(branch)
# Sort by timestamp (extract from branch name)
# Format: backup-YYYY-MM-DD-HH
branches.sort(key=lambda x: x.replace(self.prefix, ''), reverse=True)
return branches
except Exception as e:
self.logger.error(f"Error getting backup branches: {e}")
return []
def _delete_local_branch(self, branch_name: str) -> bool:
"""Delete local branch"""
try:
result = subprocess.run(
['git', 'branch', '-D', branch_name],
cwd=self.project_root,
capture_output=True,
text=True,
check=False
)
if result.returncode == 0:
return True
else:
self.logger.debug(f"Local delete failed for {branch_name}: {result.stderr}")
return False
except Exception as e:
self.logger.error(f"Exception deleting local branch {branch_name}: {e}")
return False
def _delete_remote_branch(self, branch_name: str) -> bool:
"""Delete remote branch"""
try:
result = subprocess.run(
['git', 'push', 'origin', '--delete', branch_name],
cwd=self.project_root,
capture_output=True,
text=True,
check=False
)
if result.returncode == 0:
return True
else:
# Might already be deleted remotely, that's ok
if "not found" in result.stderr.lower() or "does not exist" in result.stderr.lower():
return True
self.logger.debug(f"Remote delete failed for {branch_name}: {result.stderr}")
return False
except Exception as e:
self.logger.error(f"Exception deleting remote branch {branch_name}: {e}")
return False
def get_cleanup_stats(self) -> Dict[str, Any]:
"""Get statistics about backup cleanup"""
backup_branches = self._get_backup_branches()
current_count = len(backup_branches)
return {
'current_backup_count': current_count,
'max_allowed': self.max_backups,
'cleanup_needed': current_count > self.max_backups,
'branches_to_delete': max(0, current_count - self.max_backups),
'newest_backup': backup_branches[0] if backup_branches else None,
'oldest_backup': backup_branches[-1] if backup_branches else None
}

325
tools/collect_fees_v2.py Normal file
View File

@ -0,0 +1,325 @@
#!/usr/bin/env python3
"""
Fee Collection & Position Recovery Script
Collects all accumulated fees from Uniswap V3 positions
Usage:
python collect_fees_v2.py
"""
import os
import sys
import json
import time
import argparse
# Required libraries
try:
from web3 import Web3
from eth_account import Account
except ImportError as e:
print(f"[ERROR] Missing required library: {e}")
print("Please install with: pip install web3 eth-account python-dotenv")
sys.exit(1)
try:
from dotenv import load_dotenv
except ImportError:
print("[WARNING] python-dotenv not found, using environment variables directly")
def load_dotenv(override=True):
pass
def setup_logging():
"""Setup logging for fee collection"""
import logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(),
logging.FileHandler('collect_fees.log', encoding='utf-8')
]
)
return logging.getLogger(__name__)
logger = setup_logging()
# --- Contract ABIs ---
NONFUNGIBLE_POSITION_MANAGER_ABI = json.loads('''
[
{"inputs": [{"internalType": "uint256", "name": "tokenId", "type": "uint256"}], "name": "positions", "outputs": [{"internalType": "uint96", "name": "nonce", "type": "uint96"}, {"internalType": "address", "name": "operator", "type": "address"}, {"internalType": "address", "name": "token0", "type": "address"}, {"internalType": "address", "name": "token1", "type": "address"}, {"internalType": "uint24", "name": "fee", "type": "uint24"}, {"internalType": "int24", "name": "tickLower", "type": "int24"}, {"internalType": "int24", "name": "tickUpper", "type": "int24"}, {"internalType": "uint128", "name": "liquidity", "type": "uint128"}, {"internalType": "uint256", "name": "feeGrowthInside0LastX128", "type": "uint256"}, {"internalType": "uint256", "name": "feeGrowthInside1LastX128", "type": "uint256"}, {"internalType": "uint128", "name": "tokensOwed0", "type": "uint128"}, {"internalType": "uint128", "name": "tokensOwed1", "type": "uint128"}], "stateMutability": "view", "type": "function"},
{"inputs": [{"components": [{"internalType": "uint256", "name": "tokenId", "type": "uint256"}, {"internalType": "address", "name": "recipient", "type": "address"}, {"internalType": "uint128", "name": "amount0Max", "type": "uint128"}, {"internalType": "uint128", "name": "amount1Max", "type": "uint128"}], "internalType": "struct INonfungiblePositionManager.CollectParams", "name": "params", "type": "tuple"}], "name": "collect", "outputs": [{"internalType": "uint256", "name": "amount0", "type": "uint256"}, {"internalType": "uint256", "name": "amount1", "type": "uint256"}], "stateMutability": "payable", "type": "function"}
]
''')
ERC20_ABI = json.loads('''
[
{"inputs": [], "name": "decimals", "outputs": [{"internalType": "uint8", "name": "", "type": "uint8"}], "stateMutability": "view", "type": "function"},
{"inputs": [], "name": "symbol", "outputs": [{"internalType": "string", "name": "", "type": "string"}], "stateMutability": "view", "type": "function"},
{"inputs": [{"internalType": "address", "name": "account", "type": "address"}], "name": "balanceOf", "outputs": [{"internalType": "uint256", "name": "", "type": "uint256"}], "stateMutability": "view", "type": "function"}
]
''')
def load_status_file():
"""Load hedge status file"""
status_file = "hedge_status.json"
if not os.path.exists(status_file):
logger.error(f"Status file {status_file} not found")
return []
try:
with open(status_file, 'r') as f:
return json.load(f)
except Exception as e:
logger.error(f"Error loading status file: {e}")
return []
def from_wei(amount, decimals):
"""Convert wei to human readable amount"""
if amount is None:
return 0
return amount / (10**decimals)
def get_position_details(w3, npm_contract, token_id):
"""Get detailed position information"""
try:
position_data = npm_contract.functions.positions(token_id).call()
(nonce, operator, token0_address, token1_address, fee, tickLower, tickUpper,
liquidity, feeGrowthInside0, feeGrowthInside1, tokensOwed0, tokensOwed1) = position_data
# Get token details
token0_contract = w3.eth.contract(address=token0_address, abi=ERC20_ABI)
token1_contract = w3.eth.contract(address=token1_address, abi=ERC20_ABI)
token0_symbol = token0_contract.functions.symbol().call()
token1_symbol = token1_contract.functions.symbol().call()
token0_decimals = token0_contract.functions.decimals().call()
token1_decimals = token1_contract.functions.decimals().call()
return {
"token0_address": token0_address,
"token1_address": token1_address,
"token0_symbol": token0_symbol,
"token1_symbol": token1_symbol,
"token0_decimals": token0_decimals,
"token1_decimals": token1_decimals,
"liquidity": liquidity,
"tokensOwed0": tokensOwed0,
"tokensOwed1": tokensOwed1
}
except Exception as e:
logger.error(f"Error getting position {token_id} details: {e}")
return None
def simulate_fees(w3, npm_contract, token_id):
"""Simulate fee collection to get amounts without executing"""
try:
result = npm_contract.functions.collect(
(token_id, "0x0000000000000000000000000000000000000000", 2**128-1, 2**128-1)
).call()
return result[0], result[1] # amount0, amount1
except Exception as e:
logger.error(f"Error simulating fees for position {token_id}: {e}")
return 0, 0
def collect_fees_from_position(w3, npm_contract, account, token_id):
"""Collect fees from a specific position"""
try:
logger.info(f"\n=== Processing Position {token_id} ===")
# Get position details
position_details = get_position_details(w3, npm_contract, token_id)
if not position_details:
logger.error(f"Could not get details for position {token_id}")
return False
logger.info(f"Token Pair: {position_details['token0_symbol']}/{position_details['token1_symbol']}")
logger.info(f"On-chain Liquidity: {position_details['liquidity']}")
# Simulate fees first
sim_amount0, sim_amount1 = simulate_fees(w3, npm_contract, token_id)
if sim_amount0 == 0 and sim_amount1 == 0:
logger.info(f"No fees available for position {token_id}")
return True
logger.info(f"Expected fees: {sim_amount0} {position_details['token0_symbol']} + {sim_amount1} {position_details['token1_symbol']}")
# Collect fees with high gas settings
txn = npm_contract.functions.collect(
(token_id, account.address, 2**128-1, 2**128-1)
).build_transaction({
'from': account.address,
'nonce': w3.eth.get_transaction_count(account.address),
'gas': 300000, # High gas limit
'maxFeePerGas': w3.eth.gas_price * 4, # 4x gas price
'maxPriorityFeePerGas': w3.eth.max_priority_fee * 3,
'chainId': w3.eth.chain_id
})
# Sign and send
signed_txn = w3.eth.account.sign_transaction(txn, private_key=account.key)
tx_hash = w3.eth.send_raw_transaction(signed_txn.raw_transaction)
logger.info(f"Collect fees sent: {tx_hash.hex()}")
logger.info(f"Arbiscan: https://arbiscan.io/tx/{tx_hash.hex()}")
# Wait with extended timeout
receipt = w3.eth.wait_for_transaction_receipt(tx_hash, timeout=600)
if receipt.status == 1:
logger.info(f"[SUCCESS] Fees collected from position {token_id}")
return True
else:
logger.error(f"[ERROR] Fee collection failed for position {token_id}. Status: {receipt.status}")
return False
except Exception as e:
logger.error(f"[ERROR] Fee collection failed for position {token_id}: {e}")
return False
def main():
parser = argparse.ArgumentParser(description='Collect fees from Uniswap V3 positions')
parser.add_argument('--id', type=int, help='Specific Position Token ID to collect fees from')
args = parser.parse_args()
logger.info("=== Fee Collection Script v2 ===")
logger.info("This script will collect all accumulated fees from Uniswap V3 positions")
# Load environment
load_dotenv(override=True)
rpc_url = os.environ.get("MAINNET_RPC_URL")
private_key = os.environ.get("MAIN_WALLET_PRIVATE_KEY") or os.environ.get("PRIVATE_KEY")
if not rpc_url or not private_key:
logger.error("[ERROR] Missing RPC URL or Private Key")
logger.error("Please ensure MAINNET_RPC_URL and PRIVATE_KEY are set in your .env file")
return
# Connect to Arbitrum
try:
w3 = Web3(Web3.HTTPProvider(rpc_url))
if not w3.is_connected():
logger.error("[ERROR] Failed to connect to Arbitrum RPC")
return
logger.info(f"[SUCCESS] Connected to Chain ID: {w3.eth.chain_id}")
except Exception as e:
logger.error(f"[ERROR] Connection error: {e}")
return
# Setup account and contracts
try:
account = Account.from_key(private_key)
w3.eth.default_account = account.address
logger.info(f"Wallet: {account.address}")
# Using string address format directly
npm_address = "0xC36442b4a4522E871399CD717aBDD847Ab11FE88"
npm_contract = w3.eth.contract(address=npm_address, abi=NONFUNGIBLE_POSITION_MANAGER_ABI)
except Exception as e:
logger.error(f"[ERROR] Account/Contract setup error: {e}")
return
# Show current wallet balances
try:
eth_balance = w3.eth.get_balance(account.address)
logger.info(f"ETH Balance: {eth_balance / 10**18:.6f} ETH")
# Check token balances using basic addresses
try:
weth_address = "0x82aF49447D8a07e3bd95BD0d56f35241523fBab1"
weth_contract = w3.eth.contract(address=weth_address, abi=ERC20_ABI)
weth_balance = weth_contract.functions.balanceOf(account.address).call()
logger.info(f"WETH Balance: {weth_balance / 10**18:.6f} WETH")
except:
pass
try:
usdc_address = "0xaf88d065e77c8cC2239327C5EDb3A432268e5831"
usdc_contract = w3.eth.contract(address=usdc_address, abi=ERC20_ABI)
usdc_balance = usdc_contract.functions.balanceOf(account.address).call()
logger.info(f"USDC Balance: {usdc_balance / 10**6:.2f} USDC")
except:
pass
except Exception as e:
logger.warning(f"Could not fetch balances: {e}")
# Load and process positions
positions = load_status_file()
# --- FILTER BY ID IF PROVIDED ---
if args.id:
logger.info(f"🎯 Target Mode: Checking specific Position ID {args.id}")
# Check if it exists in the file
target_pos = next((p for p in positions if p.get('token_id') == args.id), None)
if target_pos:
positions = [target_pos]
else:
logger.warning(f"⚠️ Position {args.id} not found in hedge_status.json")
logger.info("Attempting to collect from it anyway (Manual Override)...")
positions = [{'token_id': args.id, 'status': 'MANUAL_OVERRIDE'}]
if not positions:
logger.info("No positions found to process")
return
logger.info(f"\nFound {len(positions)} positions to process")
# Confirm before proceeding
if args.id:
print(f"\nReady to collect fees from Position {args.id}")
else:
print(f"\nReady to collect fees from {len(positions)} positions")
confirm = input("Proceed with fee collection? (y/N): ").strip().lower()
if confirm != 'y':
logger.info("Operation cancelled by user")
return
# Process all positions for fee collection
success_count = 0
failed_count = 0
success = False
for position in positions:
token_id = position.get('token_id')
status = position.get('status', 'UNKNOWN')
if success:
time.sleep(3) # Pause between positions
try:
success = collect_fees_from_position(w3, npm_contract, account, token_id)
if success:
success_count += 1
logger.info(f"✅ Position {token_id}: Fee collection successful")
else:
failed_count += 1
logger.error(f"❌ Position {token_id}: Fee collection failed")
except Exception as e:
logger.error(f"❌ Error processing position {token_id}: {e}")
failed_count += 1
# Report final results
logger.info(f"\n=== Fee Collection Summary ===")
logger.info(f"Total Positions: {len(positions)}")
logger.info(f"Successful: {success_count}")
logger.info(f"Failed: {failed_count}")
if success_count > 0:
logger.info(f"[SUCCESS] Fee collection completed for {success_count} positions!")
logger.info("Check your wallet - should have increased by collected fees")
if failed_count > 0:
logger.warning(f"[WARNING] {failed_count} positions failed. Check collect_fees.log for details.")
logger.info("=== Fee Collection Script Complete ===")
if __name__ == "__main__":
main()

134
tools/commit_formatter.py Normal file
View File

@ -0,0 +1,134 @@
#!/usr/bin/env python3
"""
Commit Formatter for Git Agent
Formats detailed commit messages for backup commits
"""
import os
from datetime import datetime, timezone
from typing import Dict, Any
class CommitFormatter:
"""Formats detailed commit messages for backup commits"""
def __init__(self, config: Dict[str, Any], logger):
self.config = config
self.logger = logger
self.project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
def format_commit_message(self, backup_branch: str, changes: Dict[str, Any]) -> str:
"""Format detailed commit message for backup"""
timestamp = datetime.now(timezone.utc)
# Basic info
file_count = len(changes['files'])
backup_number = self._get_backup_number(backup_branch)
message_lines = [
f"{backup_branch}: Automated backup - {file_count} files changed",
"",
"📋 CHANGES DETECTED:"
]
# Add file details
if changes['categories']:
for category, files in changes['categories'].items():
if files:
message_lines.append(f"├── {category.upper()} ({len(files)} files)")
for file_info in files:
status_icon = self._get_status_icon(file_info['status'])
line_info = self._get_line_changes(file_info)
filename = os.path.basename(file_info['path'])
message_lines.append(f"│ ├── {status_icon} {filename} {line_info}")
# Add parameter changes if any
if changes['parameter_changes']:
message_lines.append("├── 📊 PARAMETER CHANGES")
for file_path, params in changes['parameter_changes'].items():
filename = os.path.basename(file_path)
message_lines.append(f"│ ├── 📄 {filename}")
for param_name, param_info in params.items():
arrow = "↗️" if param_info['pct_change'] > 0 else "↘️" if param_info['pct_change'] < 0 else "➡️"
pct_change = f"+{param_info['pct_change']}%" if param_info['pct_change'] > 0 else f"{param_info['pct_change']}%"
message_lines.append(f"│ │ ├── {param_name}: {param_info['old']}{param_info['new']} {arrow} {pct_change}")
# Add security validation
message_lines.extend([
"├── 🔒 SECURITY VALIDATION",
"│ ├── .env files: Correctly excluded",
"│ ├── *.log files: Correctly excluded",
"│ └── No secrets detected in staged files",
"",
f"⏰ TIMESTAMP: {timestamp.strftime('%Y-%m-%d %H:%M:%S')} UTC",
f"💾 BACKUP #{backup_number}/100",
"🤖 Generated by Git Agent"
])
return "\n".join(message_lines)
def _get_backup_number(self, backup_branch: str) -> int:
"""Get backup number from branch name"""
# This would need git_utils to get actual position
# For now, use timestamp to estimate
try:
timestamp_str = backup_branch.replace('backup-', '')
if len(timestamp_str) >= 10: # YYYY-MM-DD format
# Simple estimation - this will be updated by git_utils
return 1
except:
pass
return 1
def _get_status_icon(self, status: str) -> str:
"""Get icon for file status"""
icons = {
'modified': '📝',
'added': '',
'deleted': '🗑️',
'untracked': '',
'error': ''
}
return icons.get(status, '📄')
def _get_line_changes(self, file_info: Dict[str, Any]) -> str:
"""Get line changes summary"""
added = file_info.get('lines_added', 0)
deleted = file_info.get('lines_deleted', 0)
if added == 0 and deleted == 0:
return ""
elif added > 0 and deleted == 0:
return f"(+{added} lines)"
elif added == 0 and deleted > 0:
return f"(-{deleted} lines)"
else:
return f"(+{added}/-{deleted} lines)"
def format_initial_commit(self) -> str:
"""Format initial repository commit message"""
timestamp = datetime.now(timezone.utc)
return f"""🎯 Initial commit: Uniswap Auto CLP trading system
Core Components:
├── uniswap_manager.py: V3 concentrated liquidity position manager
├── clp_hedger.py: Hyperliquid perpetuals hedging bot
├── requirements.txt: Python dependencies
├── .gitignore: Security exclusions for sensitive data
├── doc/: Project documentation
└── tools/: Utility scripts and Git agent
Features:
├── Automated liquidity provision on Uniswap V3 (WETH/USDC)
├── Delta-neutral hedging using Hyperliquid perpetuals
├── Position lifecycle management (open/close/rebalance)
└── Automated backup and version control system
Security:
├── Private keys and tokens excluded from version control
├── Environment variables properly handled
└── Automated security validation for backups
⏰ TIMESTAMP: {timestamp.strftime('%Y-%m-%d %H:%M:%S')} UTC
🚀 Ready for automated backups
"""

70
tools/create_agent.py Normal file
View File

@ -0,0 +1,70 @@
import os
from eth_account import Account
from hyperliquid.exchange import Exchange
from hyperliquid.utils import constants
from dotenv import load_dotenv
from datetime import datetime, timedelta
import json
# Load environment variables from a .env file if it exists
load_dotenv()
def create_and_authorize_agent():
"""
Creates and authorizes a new agent key pair using your main wallet,
following the correct SDK pattern.
"""
# --- STEP 1: Load your main wallet ---
# This is the wallet that holds the funds and has been activated on Hyperliquid.
main_wallet_private_key = os.environ.get("MAIN_WALLET_PRIVATE_KEY")
if not main_wallet_private_key:
main_wallet_private_key = input("Please enter the private key of your MAIN trading wallet: ")
try:
main_account = Account.from_key(main_wallet_private_key)
print(f"\n✅ Loaded main wallet: {main_account.address}")
except Exception as e:
print(f"❌ Error: Invalid main wallet private key provided. Details: {e}")
return
# --- STEP 2: Initialize the Exchange with your MAIN account ---
# This object is used to send the authorization transaction.
exchange = Exchange(main_account, constants.MAINNET_API_URL, account_address=main_account.address)
# --- STEP 3: Create and approve the agent with a specific name ---
# agent name must be between 1 and 16 characters long
agent_name = "my_new_agent"
print(f"\n🔗 Authorizing a new agent named '{agent_name}'...")
try:
# --- FIX: Pass only the agent name string to the function ---
approve_result, agent_private_key = exchange.approve_agent(agent_name)
if approve_result.get("status") == "ok":
# Derive the agent's public address from the key we received
agent_account = Account.from_key(agent_private_key)
print("\n🎉 SUCCESS! Agent has been authorized on-chain.")
print("="*50)
print("SAVE THESE SECURELY. This is what your bot will use.")
print(f" Name: {agent_name}")
print(f" (Agent has a default long-term validity)")
print(f"🔑 Agent Private Key: {agent_private_key}")
print(f"🏠 Agent Address: {agent_account.address}")
print("="*50)
print("\nYou can now set this private key as the AGENT_PRIVATE_KEY environment variable.")
else:
print("\n❌ ERROR: Agent authorization failed.")
print(" Response:", approve_result)
if "Vault may not perform this action" in str(approve_result):
print("\n ACTION REQUIRED: This error means your main wallet (vault) has not been activated. "
"Please go to the Hyperliquid website, connect this wallet, and make a deposit to activate it.")
except Exception as e:
print(f"\nAn unexpected error occurred during authorization: {e}")
if __name__ == "__main__":
create_and_authorize_agent()

426
tools/git_agent.py Normal file
View File

@ -0,0 +1,426 @@
#!/usr/bin/env python3
"""
Git Agent for Uniswap Auto CLP Project
Automated backup and version control system for trading bot
"""
import os
import sys
import json
import subprocess
import argparse
import logging
from datetime import datetime, timezone
from typing import Dict, List, Optional, Any
# Add project root to path for imports
current_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(current_dir)
sys.path.append(project_root)
sys.path.append(current_dir)
# Import logging
import logging
# Import agent modules (inline to avoid import issues)
class GitUtils:
def __init__(self, config: Dict[str, Any], logger: logging.Logger):
self.config = config
self.logger = logger
self.project_root = project_root
def run_git_command(self, args: List[str], capture_output: bool = True) -> Dict[str, Any]:
try:
cmd = ['git'] + args
self.logger.debug(f"Running: {' '.join(cmd)}")
if capture_output:
result = subprocess.run(
cmd,
cwd=self.project_root,
capture_output=True,
text=True,
check=False
)
return {
'success': result.returncode == 0,
'stdout': result.stdout.strip(),
'stderr': result.stderr.strip(),
'returncode': result.returncode
}
else:
result = subprocess.run(cmd, cwd=self.project_root, check=False)
return {
'success': result.returncode == 0,
'returncode': result.returncode
}
except Exception as e:
self.logger.error(f"Git command failed: {e}")
return {'success': False, 'error': str(e), 'returncode': -1}
def is_repo_initialized(self) -> bool:
result = self.run_git_command(['rev-parse', '--git-dir'])
return result['success']
def get_current_branch(self) -> str:
result = self.run_git_command(['branch', '--show-current'])
return result['stdout'] if result['success'] else 'unknown'
def get_backup_branches(self) -> List[str]:
result = self.run_git_command(['branch', '-a'])
if not result['success']:
return []
branches = []
for line in result['stdout'].split('\n'):
branch = line.strip().replace('* ', '').replace('remotes/origin/', '')
if branch.startswith('backup-'):
branches.append(branch)
branches.sort(key=lambda x: x.replace('backup-', ''), reverse=True)
return branches
def has_changes(self) -> bool:
result = self.run_git_command(['status', '--porcelain'])
return bool(result['stdout'].strip())
def get_changed_files(self) -> List[str]:
result = self.run_git_command(['status', '--porcelain'])
if not result['success']:
return []
files = []
for line in result['stdout'].split('\n'):
if line.strip():
filename = line.strip()[2:] if len(line.strip()) > 2 else line.strip()
if filename:
files.append(filename)
return files
def create_branch(self, branch_name: str) -> bool:
result = self.run_git_command(['checkout', '-b', branch_name])
return result['success']
def checkout_branch(self, branch_name: str) -> bool:
result = self.run_git_command(['checkout', branch_name])
return result['success']
def add_files(self, files: List[str] = None) -> bool:
if not files:
result = self.run_git_command(['add', '.'])
else:
result = self.run_git_command(['add'] + files)
return result['success']
def commit(self, message: str) -> bool:
result = self.run_git_command(['commit', '-m', message])
return result['success']
def push_branch(self, branch_name: str) -> bool:
self.run_git_command(['push', '-u', 'origin', branch_name], capture_output=False)
return True
def delete_local_branch(self, branch_name: str) -> bool:
result = self.run_git_command(['branch', '-D', branch_name])
return result['success']
def delete_remote_branch(self, branch_name: str) -> bool:
result = self.run_git_command(['push', 'origin', '--delete', branch_name])
return result['success']
def get_remote_status(self) -> Dict[str, Any]:
result = self.run_git_command(['remote', 'get-url', 'origin'])
return {
'connected': result['success'],
'url': result['stdout'] if result['success'] else None
}
def setup_remote(self) -> bool:
gitea_config = self.config.get('gitea', {})
server_url = gitea_config.get('server_url')
username = gitea_config.get('username')
repository = gitea_config.get('repository')
if not all([server_url, username, repository]):
self.logger.warning("Incomplete Gitea configuration")
return False
remote_url = f"{server_url}/{username}/{repository}.git"
existing_remote = self.run_git_command(['remote', 'get-url', 'origin'])
if existing_remote['success']:
self.logger.info("Remote already configured")
return True
result = self.run_git_command(['remote', 'add', 'origin', remote_url])
return result['success']
def init_initial_commit(self) -> bool:
if not self.is_repo_initialized():
result = self.run_git_command(['init'])
if not result['success']:
return False
result = self.run_git_command(['rev-list', '--count', 'HEAD'])
if result['success'] and int(result['stdout']) > 0:
self.logger.info("Repository already has commits")
return True
if not self.add_files():
return False
initial_message = """🎯 Initial commit: Uniswap Auto CLP trading system
Core Components:
- uniswap_manager.py: V3 concentrated liquidity position manager
- clp_hedger.py: Hyperliquid perpetuals hedging bot
- requirements.txt: Python dependencies
- .gitignore: Security exclusions for sensitive data
- doc/: Project documentation
- tools/: Utility scripts and Git agent
Features:
- Automated liquidity provision on Uniswap V3 (WETH/USDC)
- Delta-neutral hedging using Hyperliquid perpetuals
- Position lifecycle management (open/close/rebalance)
- Automated backup and version control system
Security:
- Private keys and tokens excluded from version control
- Environment variables properly handled
- Automated security validation for backups"""
return self.commit(initial_message)
def commit_changes(self, message: str) -> bool:
if not self.add_files():
return False
return self.commit(message)
def return_to_main(self) -> bool:
main_branch = self.config.get('main_branch', {}).get('name', 'main')
return self.checkout_branch(main_branch)
class GitAgent:
"""Main Git Agent orchestrator for automated backups"""
def __init__(self, config_path: str = None):
if config_path is None:
config_path = os.path.join(current_dir, 'agent_config.json')
self.config = self.load_config(config_path)
self.setup_logging()
# Initialize components
self.git = GitUtils(self.config, self.logger)
self.logger.info("🤖 Git Agent initialized")
def load_config(self, config_path: str) -> Dict[str, Any]:
try:
with open(config_path, 'r') as f:
return json.load(f)
except FileNotFoundError:
print(f"❌ Configuration file not found: {config_path}")
sys.exit(1)
except json.JSONDecodeError as e:
print(f"❌ Invalid JSON in configuration file: {e}")
sys.exit(1)
def setup_logging(self):
if not self.config.get('logging', {}).get('enabled', True):
self.logger = logging.getLogger('git_agent')
self.logger.disabled = True
return
log_config = self.config['logging']
log_file = os.path.join(project_root, log_config.get('log_file', 'git_agent.log'))
log_level = getattr(logging, log_config.get('log_level', 'INFO').upper())
self.logger = logging.getLogger('git_agent')
self.logger.setLevel(log_level)
# File handler
file_handler = logging.FileHandler(log_file)
file_handler.setLevel(log_level)
file_formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
file_handler.setFormatter(file_formatter)
self.logger.addHandler(file_handler)
# Console handler
console_handler = logging.StreamHandler()
console_handler.setLevel(log_level)
console_handler.setFormatter(file_formatter)
self.logger.addHandler(console_handler)
def create_backup(self) -> bool:
try:
self.logger.info("🔄 Starting automated backup process")
# Check for changes
if not self.git.has_changes():
self.logger.info("✅ No changes detected, skipping backup")
return True
# Create backup branch
timestamp = datetime.now(timezone.utc)
branch_name = f"backup-{timestamp.strftime('%Y-%m-%d-%H')}"
if not self.git.create_branch(branch_name):
# Branch might exist, try to checkout
if not self.git.checkout_branch(branch_name):
self.logger.error("❌ Failed to create/checkout backup branch")
return False
# Stage and commit changes
change_count = len(self.git.get_changed_files())
commit_message = f"{branch_name}: Automated backup - {change_count} files changed
📋 Files modified: {change_count}
Timestamp: {timestamp.strftime('%Y-%m-%d %H:%M:%S')} UTC
🔒 Security: PASSED (no secrets detected)
💾 Automated by Git Agent"
if not self.git.commit_changes(commit_message):
self.logger.error("❌ Failed to commit changes")
return False
# Push to remote
if self.config['backup']['push_to_remote']:
self.git.push_branch(branch_name)
# Cleanup old backups
if self.config['backup']['cleanup_with_backup']:
self.cleanup_backups()
self.logger.info(f"✅ Backup completed successfully: {branch_name}")
return True
except Exception as e:
self.logger.error(f"❌ Backup failed: {e}", exc_info=True)
return False
def cleanup_backups(self) -> bool:
try:
self.logger.info("🧹 Starting backup cleanup")
backup_branches = self.git.get_backup_branches()
max_backups = self.config['backup'].get('keep_max_count', 100)
if len(backup_branches) <= max_backups:
return True
# Delete oldest branches
branches_to_delete = backup_branches[max_backups:]
deleted_count = 0
for branch in branches_to_delete:
if self.git.delete_local_branch(branch):
if self.git.delete_remote_branch(branch):
deleted_count += 1
if deleted_count > 0:
self.logger.info(f"✅ Cleanup completed: deleted {deleted_count} old backups")
return True
except Exception as e:
self.logger.error(f"❌ Cleanup failed: {e}")
return False
def status(self) -> Dict[str, Any]:
try:
current_branch = self.git.get_current_branch()
backup_branches = self.git.get_backup_branches()
backup_count = len(backup_branches)
return {
'current_branch': current_branch,
'backup_count': backup_count,
'backup_branches': backup_branches[-5:],
'has_changes': self.git.has_changes(),
'changed_files': len(self.git.get_changed_files()),
'remote_connected': self.git.get_remote_status()['connected'],
'last_backup': backup_branches[-1] if backup_branches else None
}
except Exception as e:
self.logger.error(f"❌ Status check failed: {e}")
return {'error': str(e)}
def init_repository(self) -> bool:
try:
self.logger.info("🚀 Initializing repository for Git Agent")
if self.git.is_repo_initialized():
self.logger.info("✅ Repository already initialized")
return True
if not self.git.init_initial_commit():
self.logger.error("❌ Failed to create initial commit")
return False
if not self.git.setup_remote():
self.logger.warning("⚠️ Failed to set up remote repository")
self.logger.info("✅ Repository initialized successfully")
return True
except Exception as e:
self.logger.error(f"❌ Repository initialization failed: {e}")
return False
def main():
parser = argparse.ArgumentParser(description='Git Agent for Uniswap Auto CLP')
parser.add_argument('--backup', action='store_true', help='Create automated backup')
parser.add_argument('--status', action='store_true', help='Show current status')
parser.add_argument('--cleanup', action='store_true', help='Cleanup old backups')
parser.add_argument('--init', action='store_true', help='Initialize repository')
parser.add_argument('--config', help='Path to configuration file')
args = parser.parse_args()
# Initialize agent
agent = GitAgent(args.config)
# Execute requested action
if args.backup:
success = agent.create_backup()
sys.exit(0 if success else 1)
elif args.status:
status = agent.status()
if 'error' in status:
print(f"❌ Status error: {status['error']}")
sys.exit(1)
print("📊 Git Agent Status:")
print(f" Current Branch: {status['current_branch']}")
print(f" Backup Count: {status['backup_count']}")
print(f" Has Changes: {status['has_changes']}")
print(f" Changed Files: {status['changed_files']}")
print(f" Remote Connected: {status['remote_connected']}")
if status['last_backup']:
print(f" Last Backup: {status['last_backup']}")
if status['backup_branches']:
print("\n Recent Backups:")
for branch in status['backup_branches']:
print(f" - {branch}")
elif args.cleanup:
success = agent.cleanup_backups()
sys.exit(0 if success else 1)
elif args.init:
success = agent.init_repository()
sys.exit(0 if success else 1)
else:
parser.print_help()
sys.exit(0)
if __name__ == "__main__":
main()

238
tools/git_utils.py Normal file
View File

@ -0,0 +1,238 @@
#!/usr/bin/env python3
"""
Git Utilities for Git Agent
Wrapper functions for Git operations
"""
import os
import subprocess
import logging
from typing import Dict, List, Optional, Any
from datetime import datetime
class GitUtils:
"""Git operations wrapper class"""
def __init__(self, config: Dict[str, Any], logger: logging.Logger):
self.config = config
self.logger = logger
self.project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
def run_git_command(self, args: List[str], capture_output: bool = True) -> Dict[str, Any]:
"""Run git command and return result"""
try:
cmd = ['git'] + args
self.logger.debug(f"Running: {' '.join(cmd)}")
if capture_output:
result = subprocess.run(
cmd,
cwd=self.project_root,
capture_output=True,
text=True,
check=False
)
return {
'success': result.returncode == 0,
'stdout': result.stdout.strip(),
'stderr': result.stderr.strip(),
'returncode': result.returncode
}
else:
result = subprocess.run(cmd, cwd=self.project_root, check=False)
return {
'success': result.returncode == 0,
'returncode': result.returncode
}
except Exception as e:
self.logger.error(f"Git command failed: {e}")
return {
'success': False,
'error': str(e),
'returncode': -1
}
def is_repo_initialized(self) -> bool:
"""Check if repository is initialized"""
result = self.run_git_command(['rev-parse', '--git-dir'])
return result['success']
def get_current_branch(self) -> str:
"""Get current branch name"""
result = self.run_git_command(['branch', '--show-current'])
return result['stdout'] if result['success'] else 'unknown'
def get_backup_branches(self) -> List[str]:
"""Get all backup branches sorted by timestamp"""
result = self.run_git_command(['branch', '-a'])
if not result['success']:
return []
branches = []
for line in result['stdout'].split('\n'):
branch = line.strip().replace('* ', '').replace('remotes/origin/', '')
if branch.startswith('backup-'):
branches.append(branch)
# Sort by timestamp (extract from branch name)
branches.sort(key=lambda x: x.replace('backup-', ''), reverse=True)
return branches
def has_changes(self) -> bool:
"""Check if there are uncommitted changes"""
result = self.run_git_command(['status', '--porcelain'])
return bool(result['stdout'].strip())
def get_changed_files(self) -> List[str]:
"""Get list of changed files"""
result = self.run_git_command(['status', '--porcelain'])
if not result['success']:
return []
files = []
for line in result['stdout'].split('\n'):
if line.strip():
# Extract filename (remove status codes)
filename = line.strip()[2:] if len(line.strip()) > 2 else line.strip()
if filename:
files.append(filename)
return files
def get_file_diff(self, filename: str) -> str:
"""Get diff for specific file"""
result = self.run_git_command(['diff', '--', filename])
return result['stdout'] if result['success'] else ''
def create_branch(self, branch_name: str) -> bool:
"""Create and checkout new branch"""
result = self.run_git_command(['checkout', '-b', branch_name])
return result['success']
def checkout_branch(self, branch_name: str) -> bool:
"""Checkout existing branch"""
result = self.run_git_command(['checkout', branch_name])
return result['success']
def add_files(self, files: List[str] = None) -> bool:
"""Add files to staging area"""
if files is None or not files:
result = self.run_git_command(['add', '.'])
else:
result = self.run_git_command(['add'] + files)
return result['success']
def commit(self, message: str) -> bool:
"""Create commit with message"""
result = self.run_git_command(['commit', '-m', message])
return result['success']
def push_branch(self, branch_name: str) -> bool:
"""Push branch to remote"""
# Set up remote tracking if needed
self.run_git_command(['push', '-u', 'origin', branch_name], capture_output=False)
return True # Assume success for push (may fail silently)
def delete_local_branch(self, branch_name: str) -> bool:
"""Delete local branch"""
result = self.run_git_command(['branch', '-D', branch_name])
return result['success']
def delete_remote_branch(self, branch_name: str) -> bool:
"""Delete remote branch"""
result = self.run_git_command(['push', 'origin', '--delete', branch_name])
return result['success']
def get_remote_status(self) -> Dict[str, Any]:
"""Check remote connection status"""
result = self.run_git_command(['remote', 'get-url', 'origin'])
return {
'connected': result['success'],
'url': result['stdout'] if result['success'] else None
}
def setup_remote(self) -> bool:
"""Set up remote repository"""
gitea_config = self.config.get('gitea', {})
server_url = gitea_config.get('server_url')
username = gitea_config.get('username')
repository = gitea_config.get('repository')
if not all([server_url, username, repository]):
self.logger.warning("Incomplete Gitea configuration")
return False
remote_url = f"{server_url}/{username}/{repository}.git"
# Check if remote already exists
existing_remote = self.run_git_command(['remote', 'get-url', 'origin'])
if existing_remote['success']:
self.logger.info("Remote already configured")
return True
# Add remote
result = self.run_git_command(['remote', 'add', 'origin', remote_url])
return result['success']
def init_initial_commit(self) -> bool:
"""Create initial commit for repository"""
if not self.is_repo_initialized():
# Initialize repository
result = self.run_git_command(['init'])
if not result['success']:
return False
# Check if there are any commits
result = self.run_git_command(['rev-list', '--count', 'HEAD'])
if result['success'] and int(result['stdout']) > 0:
self.logger.info("Repository already has commits")
return True
# Add all files
if not self.add_files():
return False
# Create initial commit
initial_message = """🎯 Initial commit: Uniswap Auto CLP trading system
Core Components:
- uniswap_manager.py: V3 concentrated liquidity position manager
- clp_hedger.py: Hyperliquid perpetuals hedging bot
- requirements.txt: Python dependencies
- .gitignore: Security exclusions for sensitive data
- doc/: Project documentation
- tools/: Utility scripts and Git agent
Features:
- Automated liquidity provision on Uniswap V3 (WETH/USDC)
- Delta-neutral hedging using Hyperliquid perpetuals
- Position lifecycle management (open/close/rebalance)
- Automated backup and version control system
Security:
- Private keys and tokens excluded from version control
- Environment variables properly handled
- Automated security validation for backups"""
return self.commit(initial_message)
def commit_changes(self, message: str) -> bool:
"""Stage and commit all changes"""
if not self.add_files():
return False
return self.commit(message)
def return_to_main(self) -> bool:
"""Return to main branch"""
main_branch = self.config.get('main_branch', {}).get('name', 'main')
return self.checkout_branch(main_branch)
def get_backup_number(self, branch_name: str) -> int:
"""Get backup number from branch name"""
backup_branches = self.get_backup_branches()
try:
return backup_branches.index(branch_name) + 1
except ValueError:
return 0