Files
uniswap_auto_clp/tools/slash_commands_main.py
DiTus 050eeeaff2 backup-2025-12-19-20: Automated backup - 10 files changed
📋 Files modified: 10
 Timestamp: 2025-12-19 20:27:31 UTC
🔒 Security: PASSED (no secrets detected)
💾 Automated by Git Agent
2025-12-19 21:27:31 +01:00

248 lines
10 KiB
Python

#!/usr/bin/env python3
"""
OpenCode Git Agent Integration - Main Slash Commands Handler
Provides all Git Agent slash commands in a single module
"""
import os
import subprocess
import sys
from datetime import datetime, timezone
from typing import List, Optional
class GitSlashCommands:
"""Main slash command handler for Git operations"""
def __init__(self):
self.project_root = "K:\\Projects\\uniswap_auto_clp"
def _run_git_command(self, args: List[str], capture_output: bool = True) -> dict:
"""Helper to run git commands"""
try:
result = subprocess.run(
["git"] + args,
cwd=self.project_root,
capture_output=capture_output,
text=True,
check=False
)
if capture_output:
return {
"success": result.returncode == 0,
"stdout": result.stdout.strip(),
"stderr": result.stderr.strip(),
"returncode": result.returncode
}
else:
return {
"success": result.returncode == 0,
"returncode": result.returncode
}
except Exception as e:
return {
"success": False,
"stdout": "",
"stderr": str(e),
"returncode": -1
}
def _run_agent_command(self, args: List[str]) -> dict:
"""Run git_agent.py command"""
try:
agent_path = os.path.join(self.project_root, "tools", "git_agent.py")
result = subprocess.run(
["python", agent_path] + args,
cwd=self.project_root,
capture_output=True,
text=True,
check=False
)
return {
"success": result.returncode == 0,
"stdout": result.stdout.strip(),
"stderr": result.stderr.strip(),
"returncode": result.returncode
}
except Exception as e:
return {
"success": False,
"stdout": "",
"stderr": str(e),
"returncode": -1
}
def git_backup(self) -> str:
"""Create automated backup"""
result = self._run_agent_command(["--backup"])
if result["success"]:
return "[SUCCESS] Backup completed successfully\n\nAutomated backup created and pushed to remote repository."
else:
error_msg = result["stderr"] or result["stdout"] or "Unknown error"
# Try to extract meaningful error message
if "Backup completed successfully" in result["stdout"]:
return "[SUCCESS] Backup completed successfully\n\nAutomated backup created."
else:
return f"[ERROR] Backup failed\n\nError: {error_msg[:200]}"
def git_status(self) -> str:
"""Show git status"""
# Use our simple implementation to avoid Unicode issues
current_branch = self._run_git_command(["branch", "--show-current"])
status = self._run_git_command(["status", "--porcelain"])
branches = self._run_git_command(["branch", "-a"])
remote = self._run_git_command(["remote", "get-url", "origin"])
backup_branches = []
if branches["success"]:
for line in branches["stdout"].split('\n'):
branch = line.strip().replace('* ', '').replace('remotes/origin/', '')
if branch.startswith('backup-'):
backup_branches.append(branch)
backup_branches.sort(key=lambda x: x.replace('backup-', ''), reverse=True)
current = current_branch["stdout"] if current_branch["success"] else "unknown"
has_changes = bool(status["stdout"].strip()) if status["success"] else False
changed_files = len([line for line in status["stdout"].split('\n') if line.strip()])
remote_connected = remote["success"]
last_backup = backup_branches[0] if backup_branches else None
response = "[INFO] Git Agent Status\n\n"
response += f"• **Current Branch:** {current}\n"
response += f"• **Backup Count:** {len(backup_branches)}\n"
response += f"• **Has Changes:** {has_changes}\n"
response += f"• **Changed Files:** {changed_files}\n"
response += f"• **Remote Connected:** {remote_connected}\n"
if last_backup:
response += f"• **Last Backup:** {last_backup}\n"
if backup_branches:
response += "\n**Recent Backups:**\n"
for branch in backup_branches[:5]:
response += f"{branch}\n"
return response
def git_cleanup(self) -> str:
"""Clean up old backups"""
result = self._run_agent_command(["--cleanup"])
if result["success"]:
return "[SUCCESS] Cleanup completed\n\nOld backup branches have been removed according to retention policy."
else:
error_msg = result["stderr"] or result["stdout"] or "Unknown error"
if "Cleanup completed" in result["stdout"]:
return "[SUCCESS] Cleanup completed\n\nOld backup branches removed."
else:
return f"[ERROR] Cleanup failed\n\nError: {error_msg[:200]}"
def git_restore(self, time_input: Optional[str] = None) -> str:
"""Restore from backup"""
if not time_input:
# Show available backups
branches_result = self._run_git_command(["branch", "-a"])
if not branches_result["success"]:
return "[ERROR] Failed to get backup list"
backup_branches = []
for line in branches_result["stdout"].split('\n'):
branch = line.strip().replace('* ', '').replace('remotes/origin/', '')
if branch.startswith('backup-'):
backup_branches.append(branch)
backup_branches.sort(key=lambda x: x.replace('backup-', ''), reverse=True)
if not backup_branches:
return "[INFO] No backup branches found\n\nUse `/git-backup` to create a backup first."
response = "[INFO] Available Backups\n\nChoose a backup to restore:\n"
for i, branch in enumerate(backup_branches[:10]):
timestamp = branch.replace('backup-', '')
try:
if len(timestamp) >= 13:
formatted = f"{timestamp[:10]} {timestamp[11:13]}:00 UTC"
else:
formatted = timestamp[:10]
except:
formatted = timestamp
response += f"• `{timestamp}` - {formatted}\n"
if len(backup_branches) > 10:
response += f"\n... and {len(backup_branches) - 10} more backups"
response += "\n\n**Usage:** `/git-restore <timestamp>` (e.g., `2025-12-19-14`)"
return response
# Try to restore specific backup
try:
# Format time input to branch name
if len(time_input) == 10: # YYYY-MM-DD
branch_name = f"backup-{time_input}"
elif len(time_input) == 13: # YYYY-MM-DD-HH
branch_name = f"backup-{time_input}"
elif len(time_input) == 8: # MM-DD-HH
current_year = datetime.now().year
branch_name = f"backup-{current_year}-{time_input}"
else:
return "[ERROR] Invalid time format\n\nExpected format: YYYY-MM-DD-HH (e.g., 2025-12-19-14)"
# Check if branch exists
branches_result = self._run_git_command(["branch", "-a"])
matching_branches = []
if branches_result["success"]:
for line in branches_result["stdout"].split('\n'):
branch = line.strip().replace('* ', '').replace('remotes/origin/', '')
if branch_name in branch:
matching_branches.append(branch)
if not matching_branches:
return f"[ERROR] Backup not found\n\nNo backup found for: {time_input}\n\nUse `/git-restore` to see available backups."
target_branch = matching_branches[0]
# Checkout backup branch
checkout_result = self._run_git_command(["checkout", target_branch])
if checkout_result["success"]:
timestamp = target_branch.replace('backup-', '')
try:
if len(timestamp) >= 13:
formatted = f"{timestamp[:10]} {timestamp[11:13]}:00 UTC"
else:
formatted = timestamp[:10]
except:
formatted = timestamp
response = f"[SUCCESS] Restored to backup\n\n"
response += f"Branch: {target_branch}\n"
response += f"Time: {formatted}\n\n"
response += "Note: You're now on a backup branch. Use `git checkout main` to return to the main branch when done."
return response
else:
return f"[ERROR] Restore failed\n\nError: {checkout_result['stderr']}"
except Exception as e:
return f"[ERROR] Restore failed\n\nException: {str(e)}"
def execute_slash_command(command: str, args = None) -> str:
"""Execute a slash command and return formatted response"""
if args is None:
args = []
try:
if command == "git-backup":
return _handler.git_backup()
elif command == "git-status":
return _handler.git_status()
elif command == "git-cleanup":
return _handler.git_cleanup()
elif command == "git-restore":
time_arg = args[0] if args else None
return _handler.git_restore(time_arg)
else:
return f"[ERROR] Unknown command: {command}"
except Exception as e:
return f"[ERROR] Command execution failed: {str(e)}"
# Global handler instance
_handler = GitSlashCommands()