Files
hedge/position.py

173 lines
7.1 KiB
Python

import os
import asyncio
import time
from flextrade.flextrade_client import Client
from flextrade.constants.markets import (
BASE_MARKET_ETH_USD, BASE_MARKET_BTC_USD, BASE_MARKET_BNB_USD,
BASE_MARKET_SHIB_USD, BASE_MARKET_PEPE_USD, BASE_MARKET_SUI_USD,
BASE_MARKET_DOGE_USD, BASE_MARKET_AAVE_USD, BASE_MARKET_HBAR_USD,
BASE_MARKET_VIRTUAL_USD, BASE_MARKET_ADA_USD, BASE_MARKET_PENDLE_USD,
BASE_MARKET_TRX_USD, BASE_MARKET_AVAX_USD, BASE_MARKET_UNI_USD,
BASE_MARKET_SOL_USD, BASE_MARKET_LINK_USD, BASE_MARKET_XRP_USD,
BASE_MARKET_TON_USD
)
from dotenv import load_dotenv
load_dotenv()
RPC_URL = os.getenv("RPC_URL")
PRIVATE_KEY = os.getenv("PRIVATE_KEY")
def list_open_positions(account_id=0, market=None, delay=1, max_retries=3):
"""
List all open positions for a given account, optionally filtered by market
Args:
account_id (int): Account ID to get positions for (default: 0)
market (str): Optional market constant to filter positions (e.g., BASE_MARKET_ETH_USD)
delay (float): Delay in seconds between requests to avoid rate limiting (default: 1)
max_retries (int): Maximum number of retries on rate limit errors (default: 3)
Returns:
list: List of position dictionaries with details
"""
for attempt in range(max_retries):
try:
client = Client(
eth_private_key=PRIVATE_KEY,
rpc_url=RPC_URL
)
if market:
# Get position for specific market
try:
position = client.public.get_position_info(
client.private.get_public_address(), account_id, market
)
positions = [position] if abs(position.get("position_size", 0)) > 0 else []
except Exception as e:
if "429" in str(e) and attempt < max_retries - 1:
print(f"Rate limit hit, waiting {delay * (attempt + 1)} seconds before retry {attempt + 1}/{max_retries}...")
time.sleep(delay * (attempt + 1))
continue
print(f"Error getting position for market {market}: {e}")
return []
else:
# Get all positions for the account
positions = client.public.get_all_position_info(
client.private.get_public_address(), account_id
)
# Filter only positions with non-zero size (open positions)
positions = [pos for pos in positions if abs(pos.get("position_size", 0)) > 0]
open_positions = positions
if open_positions:
market_filter = f" for {market}" if market else ""
print(f"#### Open Positions for Account {account_id}{market_filter} ####")
print(f"Found {len(open_positions)} open position(s):\n")
for i, position in enumerate(open_positions, 1):
account_info = f'{position["primary_account"]}-{position["sub_account_id"]}'
market_name = position["market"]
size = position["position_size"]
entry_price = position["avg_entry_price"]
pnl = position["pnl"]
# Determine position direction
direction = "LONG" if size > 0 else "SHORT"
print(f"{i}. {market_name} ({direction})")
print(f" Account: {account_info}")
print(f" Size: {size:.4f}")
print(f" Entry Price: ${entry_price:.6f}")
print(f" PnL: ${pnl:.4f}")
print()
else:
market_filter = f" in market {market}" if market else ""
print(f"No open positions found for account {account_id}{market_filter}")
return open_positions
except Exception as e:
if "429" in str(e) and attempt < max_retries - 1:
print(f"Rate limit hit, waiting {delay * (attempt + 1)} seconds before retry {attempt + 1}/{max_retries}...")
time.sleep(delay * (attempt + 1))
continue
print(f"Error listing open positions: {e}")
return []
print(f"Max retries ({max_retries}) reached. Unable to fetch positions due to rate limiting.")
return []
async def main():
client = Client(
eth_private_key=PRIVATE_KEY,
rpc_url=RPC_URL
)
# Test the list_open_positions function with delays to avoid rate limiting
print("Testing list_open_positions function:")
# List all open positions for account 0
list_open_positions(0, delay=5)
time.sleep(3) # Wait 3 seconds between calls
# List all open positions for account 1
list_open_positions(1, delay=5)
time.sleep(3) # Wait 3 seconds between calls
# List open positions for specific markets with longer delays
print("Checking specific markets (with delays to avoid rate limiting):")
markets_to_check = [BASE_MARKET_ETH_USD, BASE_MARKET_BTC_USD, BASE_MARKET_SOL_USD]
accounts_to_check = [0, 1]
for account in accounts_to_check:
for market in markets_to_check:
print(f"Checking {market} for account {account}...")
list_open_positions(account, market, delay=2)
time.sleep(2) # Wait 2 seconds between each market check
print("All position checks completed.")
# print("#### Getting position ID from one market ETHUSD (get_position_id) ####")
# position_id = client.public.get_position_id(
# client.private.get_public_address(), 0, BASE_MARKET_ETH_USD)
# print(''.join(format(x, '02x') for x in position_id))
# print("#### Getting position ID from one market BTCUSD (get_position_id) ####")
# position_id = client.public.get_position_id(
# client.private.get_public_address(), 0, BASE_MARKET_BTC_USD)
# print(''.join(format(x, '02x') for x in position_id))
# print("#### Getting all positions info (get_all_position_info) ####")
# positions = client.public.get_all_position_info(
# client.private.get_public_address(), 0)
# for position in positions:
# print(
# f'Account: {position["primary_account"]}-{position["sub_account_id"]}')
# print(f'Market: {position["market"]}')
# print('Size: {0:.4f}'.format(position["position_size"]))
# print('Entry price: {0:.6f}'.format(position["avg_entry_price"]))
# print('Pnl: {0:.4f}'.format(position["pnl"]))
# print("#### Getting positions from one market (get_position_info) ####")
# position = client.public.get_position_info(
# client.private.get_public_address(), 0, BASE_MARKET_ETH_USD)
# print(
# f'Account: {position["primary_account"]}-{position["sub_account_id"]}')
# print(f'Market: {position["market"]}')
# print('Size: {0:.4f}'.format(position["position_size"]))
# print('Entry price: {0:.6f}'.format(position["avg_entry_price"]))
# print('Pnl: {0:.4f}'.format(position["pnl"]))
if __name__ == '__main__':
asyncio.run(main())