offline file with IDs of openned trigger orders

This commit is contained in:
2025-08-05 21:09:45 +02:00
parent 19f920bb53
commit 064fd7b9ce
4 changed files with 640 additions and 71 deletions

469
app.py
View File

@ -1,6 +1,8 @@
import time
from datetime import datetime
import threading
import json
import os
from datetime import datetime
from market import (
market_info_loop,
get_latest_price,
@ -10,7 +12,8 @@ from market import (
get_price_comparison,
get_market_arbitrage_opportunities
)
from order import create_market_order, create_trigger_order, update_trigger_order, cancel_trigger_order
from order import create_market_order, create_trigger_order, update_trigger_order, cancel_trigger_order, list_trigger_orders
from position import list_open_positions
from flextrade.constants.markets import BASE_MARKET_ETH_USD, BASE_MARKET_SOL_USD, BASE_MARKET_BTC_USD
from flextrade.enum import Action
@ -18,10 +21,20 @@ from flextrade.enum import Action
print(f"✅ Action enum imported: BUY={Action.BUY} (boolean), SELL={Action.SELL} (boolean)")
# Configuration
ENABLE_AUTO_HEDGE = False # Set to True to enable automatic hedging
ENABLE_TRIGGER_ORDERS = False # Set to True to enable trigger order functionality
sol_hedge_price = 161 # Example price for SOL auto-hedging
ENABLE_AUTO_HEDGE = True # Set to True to enable automatic hedging
ENABLE_TRIGGER_ORDERS = True # Set to True to enable trigger order functionality
ORDER_TRACKING_FILE = "opened_trigger_orders.json" # File to store all opened trigger orders
# Single coin configuration
MONITORED_COIN = "SOL" # Change this to monitor different coins: SOL, ETH, BTC, etc.
MONITORED_MARKET_NAME = "SOLUSD" # Market name for price lookup
MONITORED_MARKET_CONSTANT = BASE_MARKET_SOL_USD # Market constant for orders
SUB_ACCOUNT_ID = 1 # Example sub-account ID for testing
HEDGE_PRICE_THRESHOLD = 167.75 # Price threshold for auto-hedging
HEDGE_SIZE = 500.0 # Size of the hedge order
HEDGE_PRICE_SL = 1.0 # Stop-loss threshold for hedging
# Global storage for active orders
active_orders = {
"trigger_orders": [], # List of {order_id, market, account_id, action, size, trigger_price}
@ -30,10 +43,329 @@ active_orders = {
# Track executed hedges to prevent duplicates
executed_hedges = {
"sol_hedge_executed": False,
"last_hedge_price": 0
"hedge_executed": False,
"last_hedge_price": 0,
"initial_position_check_done": False
}
def save_last_trigger_order_id(order_id, account_id, market, action, size, trigger_price):
"""
Save a new trigger order to the opened orders JSON file
Args:
order_id (int): Order ID
account_id (int): Account ID
market (str): Market constant
action (Action): Buy/Sell action
size (float): Order size
trigger_price (float): Trigger price
"""
try:
# Load existing orders
existing_orders = load_opened_trigger_orders_data()
if existing_orders is None:
existing_orders = {"opened_orders": []}
# Create new order data
new_order = {
"order_id": order_id,
"account_id": account_id,
"market": str(market),
"action": safe_action_to_string(action),
"size": size,
"trigger_price": trigger_price,
"created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"coin": MONITORED_COIN
}
# Check if order already exists (prevent duplicates)
existing_order_ids = [order.get("order_id") for order in existing_orders.get("opened_orders", [])]
if order_id not in existing_order_ids:
existing_orders["opened_orders"].append(new_order)
# Save updated orders list
with open(ORDER_TRACKING_FILE, 'w') as f:
json.dump(existing_orders, f, indent=2)
print(f"💾 Added trigger order ID {order_id} to {ORDER_TRACKING_FILE}")
print(f" Total opened orders: {len(existing_orders['opened_orders'])}")
else:
print(f" Order ID {order_id} already exists in {ORDER_TRACKING_FILE}")
except Exception as e:
print(f"❌ Error saving trigger order ID: {e}")
def load_opened_trigger_orders_data():
"""
Load all opened trigger orders from JSON file
Returns:
dict: Orders data or None if file doesn't exist or error
"""
try:
if not os.path.exists(ORDER_TRACKING_FILE):
print(f" No existing order tracking file found: {ORDER_TRACKING_FILE}")
return None
with open(ORDER_TRACKING_FILE, 'r') as f:
orders_data = json.load(f)
return orders_data
except Exception as e:
print(f"❌ Error loading trigger orders: {e}")
return None
def load_last_trigger_order_id():
"""
Load and display all opened trigger orders from JSON file
Returns:
dict: Orders data or None if file doesn't exist or error
"""
try:
orders_data = load_opened_trigger_orders_data()
if not orders_data:
return None
opened_orders = orders_data.get("opened_orders", [])
print(f"📂 Loaded opened trigger orders from {ORDER_TRACKING_FILE}:")
print(f" Total active orders: {len(opened_orders)}")
if opened_orders:
print(" Order details:")
for i, order in enumerate(opened_orders, 1):
print(f" {i}. ✅ Order ID: {order.get('order_id', 'N/A')}")
print(f" Account: {order.get('account_id', 'N/A')} | Market: {order.get('market', 'N/A')}")
print(f" Action: {order.get('action', 'N/A')} | Size: {order.get('size', 'N/A')}")
print(f" Trigger Price: ${order.get('trigger_price', 0):.2f}")
print(f" Created: {order.get('created_at', 'N/A')} | Coin: {order.get('coin', 'N/A')}")
if i < len(opened_orders): # Don't add extra line after last order
print()
return orders_data
except Exception as e:
print(f"❌ Error loading trigger orders: {e}")
return None
def clear_last_trigger_order_file():
"""
Clear the opened trigger orders tracking file
"""
try:
if os.path.exists(ORDER_TRACKING_FILE):
os.remove(ORDER_TRACKING_FILE)
print(f"🗑️ Cleared order tracking file: {ORDER_TRACKING_FILE}")
else:
print(f" Order tracking file doesn't exist: {ORDER_TRACKING_FILE}")
except Exception as e:
print(f"❌ Error clearing order tracking file: {e}")
def mark_order_as_cancelled(order_id):
"""
Remove a cancelled order from the tracking file
Args:
order_id (int): Order ID to remove
"""
try:
orders_data = load_opened_trigger_orders_data()
if not orders_data:
print(f" No orders data found to update")
return False
opened_orders = orders_data.get("opened_orders", [])
original_count = len(opened_orders)
# Remove the order with the matching order_id
updated_orders = [order for order in opened_orders if order.get("order_id") != order_id]
if len(updated_orders) < original_count:
# Order was found and removed
orders_data["opened_orders"] = updated_orders
# Save updated orders list
with open(ORDER_TRACKING_FILE, 'w') as f:
json.dump(orders_data, f, indent=2)
print(f"✅ Removed cancelled order ID {order_id} from {ORDER_TRACKING_FILE}")
print(f" Remaining orders: {len(updated_orders)}")
return True
else:
print(f"⚠️ Order ID {order_id} not found in tracking file")
return False
except Exception as e:
print(f"❌ Error removing cancelled order: {e}")
return False
def get_active_orders_from_file():
"""
Get list of active orders from the tracking file
Returns:
list: List of active orders
"""
try:
orders_data = load_opened_trigger_orders_data()
if not orders_data:
return []
# All orders in the file are active since cancelled ones are removed
opened_orders = orders_data.get("opened_orders", [])
return opened_orders
except Exception as e:
print(f"❌ Error getting active orders: {e}")
return []
def check_existing_positions_for_hedges():
"""
Check existing open positions and mark appropriate hedges as executed
to prevent duplicate hedge orders for the monitored coin.
Also closes all existing trigger orders and creates new stop-loss orders for hedged positions.
"""
try:
print(f"🔍 Checking existing positions for {MONITORED_COIN} hedge requirements...")
# First, close all existing trigger orders (if method is available)
print("🧹 Attempting to close existing trigger orders...")
# Load active trigger orders from file
active_orders_in_file = get_active_orders_from_file()
if active_orders_in_file:
print(f"🗑️ Found {len(active_orders_in_file)} active trigger orders to cancel...")
else:
print(" No active trigger orders found in file.")
for account_id in [SUB_ACCOUNT_ID]:
# Cancel each active trigger order found in the file
for order in active_orders_in_file:
order_id = order.get("order_id")
print(f"🗑️ Attempting to cancel trigger order ID: {order_id}")
try:
cancel_result = handle_cancel_trigger_order(account_id, MONITORED_MARKET_CONSTANT, order_id)
if cancel_result:
print(f"✅ Successfully cancelled order {order_id}")
# Remove from file after successful cancellation
mark_order_as_cancelled(order_id)
else:
print(f"❌ Failed to cancel order {order_id}")
except Exception as e:
print(f"⚠️ Error cancelling trigger order {order_id}: {e}")
time.sleep(8) # Delay between cancellations
print("⏳ Brief wait before position check...")
time.sleep(3)
# Now check positions for hedge requirements
hedge_positions_found = []
for account_id in [SUB_ACCOUNT_ID]:
positions = list_open_positions(account_id, delay=2)
if positions:
print(f"📊 Found {len(positions)} open position(s) in account {account_id}")
for position in positions:
market = position.get("market", "")
size = position.get("position_size", 0)
entry_price = position.get("avg_entry_price", 0)
orderIndex = position.get("orderIndex", 0)
direction = "LONG" if size > 0 else "SHORT"
print(f" - {market} ({direction}): Size={size:.4f}, Entry=${entry_price:.2f}, orderIndex={orderIndex}")
# Check hedge requirements for monitored coin
if MONITORED_MARKET_NAME in market and size > 0 and entry_price > HEDGE_PRICE_THRESHOLD:
print(f"{MONITORED_COIN} hedge requirement met: Position size {size:.4f} at ${entry_price:.2f} (above ${HEDGE_PRICE_THRESHOLD})")
executed_hedges["hedge_executed"] = True
executed_hedges["last_hedge_price"] = entry_price
# Store hedge position info for stop-loss creation
hedge_positions_found.append({
'account_id': account_id,
'size': size,
'entry_price': entry_price
})
# Add delay between account checks to avoid rate limiting
time.sleep(3)
# Create stop-loss trigger orders for hedged positions
if hedge_positions_found:
print(f"\n🛡️ Creating stop-loss orders for {len(hedge_positions_found)} hedged position(s)...")
for hedge_pos in hedge_positions_found:
account_id = hedge_pos['account_id']
size = hedge_pos['size']
entry_price = hedge_pos['entry_price']
# Calculate stop-loss price: entry_price - HEDGE_PRICE_SL
stop_loss_price = entry_price - HEDGE_PRICE_SL
print(f"📉 Creating stop-loss order for position:")
print(f" Account: {account_id}, Size: {size:.4f}")
print(f" Entry Price: ${entry_price:.2f}")
print(f" Stop-Loss Price: ${stop_loss_price:.2f} (Entry - ${HEDGE_PRICE_SL})")
print(f" orderIndex: {hedge_pos.get('orderIndex', 'N/A')}")
# Retry logic for creating stop-loss trigger order
max_retries = 3
retry_delay = 5
trigger_result = None
for attempt in range(max_retries):
try:
print(f"🔄 Attempt {attempt + 1}/{max_retries} to create stop-loss order...")
# Create stop-loss trigger order (SELL when price goes below stop-loss price)
trigger_result = handle_trigger_order(
account_id=account_id,
market=MONITORED_MARKET_CONSTANT,
action=Action.SELL,
size=size,
trigger_price=stop_loss_price,
trigger_above=False # Trigger when price goes BELOW stop-loss price
)
if trigger_result:
print(f"✅ Stop-loss trigger order created successfully!")
break
else:
print(f"❌ Failed to create stop-loss trigger order (attempt {attempt + 1})")
except Exception as e:
print(f"❌ Exception during stop-loss creation (attempt {attempt + 1}): {e}")
# Wait before retry if not the last attempt
if attempt < max_retries - 1:
wait_time = retry_delay * (attempt + 1)
print(f"⏳ Waiting {wait_time} seconds before retry...")
time.sleep(wait_time)
if not trigger_result:
print(f"❌ Failed to create stop-loss trigger order after {max_retries} attempts")
# Add longer delay between different positions
time.sleep(5)
executed_hedges["initial_position_check_done"] = True
print("✅ Initial position check completed")
# Show current hedge status
print(f"\n📋 {MONITORED_COIN} Hedge Status Summary:")
print(f" {MONITORED_COIN} Hedge: {'✅ Executed' if executed_hedges['hedge_executed'] else '❌ Not executed'}")
if executed_hedges["hedge_executed"]:
print(f" Last Hedge Price: ${executed_hedges['last_hedge_price']:.2f}")
print(f" Stop-Loss Orders: {'✅ Created' if hedge_positions_found else '❌ None needed'}")
print()
except Exception as e:
print(f"❌ Error checking existing positions: {e}")
executed_hedges["initial_position_check_done"] = True # Mark as done even on error to prevent infinite retries
def start_market_data_thread(debug=True, include_binance=True):
"""Start market data collection in a separate thread"""
market_thread = threading.Thread(target=lambda: market_info_loop(debug, include_binance), daemon=True)
@ -43,8 +375,9 @@ def start_market_data_thread(debug=True, include_binance=True):
def store_trigger_order(result, market, account_id, action, size, trigger_price):
"""Store trigger order information for later management"""
if result and 'order' in result and 'orderIndex' in result['order']:
order_id = result['order']['orderIndex']
order_info = {
'order_id': result['order']['orderIndex'],
'order_id': order_id,
'market': market,
'account_id': account_id,
'action': action,
@ -54,7 +387,11 @@ def store_trigger_order(result, market, account_id, action, size, trigger_price)
'created_at': datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
active_orders["trigger_orders"].append(order_info)
print(f"📝 Stored trigger order info: ID {order_info['order_id']}")
print(f"📝 Stored trigger order info: ID {order_id}")
# Save to file for persistence
save_last_trigger_order_id(order_id, account_id, market, action, size, trigger_price)
return order_info
return None
@ -63,12 +400,15 @@ def get_active_trigger_orders():
return active_orders["trigger_orders"].copy()
def remove_trigger_order(order_id):
"""Remove a trigger order from active list"""
"""Remove a trigger order from active list and remove from file"""
active_orders["trigger_orders"] = [
order for order in active_orders["trigger_orders"]
if order['order_id'] != order_id
]
print(f"📝 Removed trigger order ID {order_id} from active list")
# Also remove from the persistent file
mark_order_as_cancelled(order_id)
def safe_action_to_string(action):
"""Convert action to string safely, handling both enum and boolean cases"""
@ -303,8 +643,22 @@ def smart_trigger_order(market_name, market_constant, action, size, trigger_pric
def main():
print("Starting app with market data collection...")
print("-" * 40)
print(f"Starting app - monitoring {MONITORED_COIN} with market data collection...")
print(f"Monitored Coin: {MONITORED_COIN}")
print(f"Market: {MONITORED_MARKET_NAME}")
print(f"Hedge Price Threshold: ${HEDGE_PRICE_THRESHOLD}")
print(f"Auto-hedge enabled: {ENABLE_AUTO_HEDGE}")
print("-" * 50)
# Load last trigger order information on startup
print("🔍 Checking for existing opened trigger orders...")
orders_data = load_last_trigger_order_id()
if orders_data:
total_count = len(orders_data.get("opened_orders", []))
print(f"✅ Found {total_count} active trigger orders for {MONITORED_COIN} coin")
else:
print(" No existing opened trigger orders found")
print()
# Start market data collection in background thread with Binance comparison
start_market_data_thread(debug=False, include_binance=True)
@ -319,38 +673,29 @@ def main():
print(f"[{current_time}] update!")
# Show some sample prices with timestamps (FlexTrade and Binance)
eth_data = get_latest_price_with_timestamp("ETHUSD")
btc_data = get_latest_price_with_timestamp("BTCUSD")
sol_data = get_latest_price_with_timestamp("SOLUSD")
monitored_data = get_latest_price_with_timestamp(MONITORED_MARKET_NAME)
# Check if we have received first successful price update and haven't checked positions yet
if (not executed_hedges["initial_position_check_done"] and monitored_data):
print(f"\n🎯 First successful price update received for {MONITORED_COIN} - checking existing positions...")
check_existing_positions_for_hedges()
# Get Binance prices for comparison
eth_binance = get_latest_price_with_timestamp("ETHUSD_BINANCE")
btc_binance = get_latest_price_with_timestamp("BTCUSD_BINANCE")
sol_binance = get_latest_price_with_timestamp("SOLUSD_BINANCE")
monitored_binance = get_latest_price_with_timestamp(f"{MONITORED_MARKET_NAME}_BINANCE")
if eth_data or eth_binance:
ft_price = f"${eth_data['price']:.2f}" if eth_data else "N/A"
bn_price = f"${eth_binance['price']:.2f}" if eth_binance else "N/A"
print(f"ETH/USD: FlexTrade={ft_price} | Binance={bn_price}")
if btc_data or btc_binance:
ft_price = f"${btc_data['price']:.2f}" if btc_data else "N/A"
bn_price = f"${btc_binance['price']:.2f}" if btc_binance else "N/A"
print(f"BTC/USD: FlexTrade={ft_price} | Binance={bn_price}")
if sol_data or sol_binance:
ft_price = f"${sol_data['price']:.2f}" if sol_data else "N/A"
bn_price = f"${sol_binance['price']:.2f}" if sol_binance else "N/A"
ft_time = sol_data['timestamp_str'] if sol_data else "N/A"
bn_time = sol_binance['timestamp_str'] if sol_binance else "N/A"
print(f"SOL/USD: FlexTrade={ft_price} ({ft_time}) | Binance={bn_price} ({bn_time})")
if monitored_data or monitored_binance:
ft_price = f"${monitored_data['price']:.2f}" if monitored_data else "N/A"
bn_price = f"${monitored_binance['price']:.2f}" if monitored_binance else "N/A"
ft_time = monitored_data['timestamp_str'] if monitored_data else "N/A"
bn_time = monitored_binance['timestamp_str'] if monitored_binance else "N/A"
print(f"{MONITORED_COIN}/USD: FlexTrade={ft_price} ({ft_time}) | Binance={bn_price} ({bn_time})")
# Show price difference if both available
if sol_data and sol_binance:
price_diff = sol_data['price'] - sol_binance['price']
diff_pct = (price_diff / sol_binance['price']) * 100 if sol_binance['price'] > 0 else 0
if monitored_data and monitored_binance:
price_diff = monitored_data['price'] - monitored_binance['price']
diff_pct = (price_diff / monitored_binance['price']) * 100 if monitored_binance['price'] > 0 else 0
diff_emoji = "📈" if price_diff > 0 else "📉" if price_diff < 0 else "➡️"
print(f"SOL Difference: {diff_emoji} {price_diff:+.4f} ({diff_pct:+.2f}%)")
print(f"{MONITORED_COIN} Difference: {diff_emoji} {price_diff:+.4f} ({diff_pct:+.2f}%)")
# Show total number of markets with prices
all_prices = get_all_latest_prices()
@ -385,51 +730,33 @@ def main():
# Auto-hedging logic
if ENABLE_AUTO_HEDGE:
# Market order examples - immediate execution
# if eth_data and eth_data['price'] < 3000: # Buy ETH if price drops below $3000
# print("🤖 Auto-hedge triggered: ETH price below $3000")
# smart_market_order("ETHUSD", BASE_MARKET_ETH_USD, Action.BUY, 0.01)
# SOL hedging logic with duplicate prevention
if (sol_data and sol_data['price'] > sol_hedge_price and
not executed_hedges["sol_hedge_executed"]): # Only execute once
# Hedging logic with duplicate prevention for monitored coin
if (monitored_data and monitored_data['price'] > HEDGE_PRICE_THRESHOLD and
not executed_hedges["hedge_executed"]): # Only execute once
print(f"🤖 Auto-hedge triggered: SOL price above ${sol_hedge_price}")
print(f"🤖 Auto-hedge triggered: {MONITORED_COIN} price above ${HEDGE_PRICE_THRESHOLD}")
# Execute market order to buy SOL
market_result = smart_market_order("SOLUSD", BASE_MARKET_SOL_USD, Action.BUY, 100.0)
# Execute market order to buy the monitored coin
market_result = smart_market_order(MONITORED_MARKET_NAME, MONITORED_MARKET_CONSTANT, Action.BUY, 100.0)
if market_result:
print("🤖 Auto-hedge triggered: SOL stop-loss trigger")
print(f"🤖 Auto-hedge triggered: {MONITORED_COIN} stop-loss trigger")
time.sleep(10)
# Create stop-loss trigger order
trigger_result = smart_trigger_order("SOLUSD", BASE_MARKET_SOL_USD, Action.SELL, 100.0, 0, trigger_above=False, offset_percentage=5)
trigger_result = smart_trigger_order(MONITORED_MARKET_NAME, MONITORED_MARKET_CONSTANT, Action.SELL, 100.0, 0, trigger_above=False, offset_percentage=5)
if trigger_result:
# Mark hedge as executed
executed_hedges["sol_hedge_executed"] = True
executed_hedges["last_hedge_price"] = sol_data['price']
print(f"SOL hedge completed at price ${sol_data['price']:.2f}")
executed_hedges["hedge_executed"] = True
executed_hedges["last_hedge_price"] = monitored_data['price']
print(f"{MONITORED_COIN} hedge completed at price ${monitored_data['price']:.2f}")
# Reset hedge flag if price drops significantly below hedge price
if (sol_data and sol_data['price'] < (sol_hedge_price * 0.95) and
executed_hedges["sol_hedge_executed"]):
print(f"🔄 Resetting SOL hedge flag - price dropped to ${sol_data['price']:.2f}")
executed_hedges["sol_hedge_executed"] = False
if (monitored_data and monitored_data['price'] < (HEDGE_PRICE_THRESHOLD * 0.95) and
executed_hedges["hedge_executed"]):
print(f"🔄 Resetting {MONITORED_COIN} hedge flag - price dropped to ${monitored_data['price']:.2f}")
executed_hedges["hedge_executed"] = False
# Trigger order examples - conditional execution
# Create stop-loss orders:
# if eth_data and eth_data['price'] > 3500: # Set stop-loss 5% below current ETH price
# print("🤖 Setting ETH stop-loss trigger")
# smart_trigger_order("ETHUSD", BASE_MARKET_ETH_USD, Action.SELL, 0.01, 0, trigger_above=False, offset_percentage=5)
# Create take-profit orders:
# if btc_data and btc_data['price'] < 50000: # Set take-profit 10% above current BTC price
# print("🤖 Setting BTC take-profit trigger")
# smart_trigger_order("BTCUSD", BASE_MARKET_BTC_USD, Action.SELL, 0.001, 0, trigger_above=True, offset_percentage=10)
# Sleep for 60 seconds (1 minute)
time.sleep(60)

View File

@ -0,0 +1,14 @@
{
"opened_orders": [
{
"order_id": 27,
"account_id": 1,
"market": "2",
"action": "SELL",
"size": 500.0,
"trigger_price": 167.16671529986962,
"created_at": "2025-08-05 13:17:58",
"coin": "SOL"
}
]
}

View File

@ -161,6 +161,62 @@ def cancel_trigger_order(account_id, market, order_id):
return None
def list_trigger_orders(account_id, market):
"""
List all trigger orders for a specific account and market
Args:
account_id (int): Account ID to get orders for
market (str): Market to get orders for (e.g., BASE_MARKET_ETH_USD)
Returns:
list: List of trigger orders with their details, or empty list if method not available
"""
try:
client = Client(
eth_private_key=PRIVATE_KEY,
rpc_url=RPC_URL
)
# Check if the method exists before calling it
if not hasattr(client.private, 'get_all_orders'):
print(f'⚠️ get_all_orders method not available in FlexTrade SDK')
print(f' Cannot list existing trigger orders - this is expected with some SDK versions')
return []
orders = client.private.get_all_orders(account_id, market)
if orders:
print(f'Found {len(orders)} trigger orders for account {account_id} in market {market}:')
for i, order in enumerate(orders, 1):
order_id = order.get('orderIndex', 'N/A')
action = order.get('action', 'N/A')
size = order.get('size', 'N/A')
trigger_price = order.get('triggerPrice', 'N/A')
trigger_above = order.get('triggerAbove', 'N/A')
is_reduce_only = order.get('isReduceOnly', 'N/A')
print(f' {i}. Order ID: {order_id}')
print(f' Action: {action}')
print(f' Size: {size}')
print(f' Trigger Price: ${trigger_price}')
print(f' Trigger Above: {trigger_above}')
print(f' Reduce Only: {is_reduce_only}')
print()
else:
print(f'No trigger orders found for account {account_id} in market {market}')
return orders if orders else []
except AttributeError as e:
print(f"⚠️ Method not available in FlexTrade SDK: {e}")
print(f" Skipping trigger order listing - continuing with position check")
return []
except Exception as e:
print(f"❌ Error listing trigger orders: {e}")
return []
async def main():
client = Client(
eth_private_key=PRIVATE_KEY,

172
position.py Normal file
View File

@ -0,0 +1,172 @@
import os
import asyncio
import time
from flextrade.flextrade_client import Client
from flextrade.constants.markets import (
BASE_MARKET_ETH_USD, BASE_MARKET_BTC_USD, BASE_MARKET_BNB_USD,
BASE_MARKET_SHIB_USD, BASE_MARKET_PEPE_USD, BASE_MARKET_SUI_USD,
BASE_MARKET_DOGE_USD, BASE_MARKET_AAVE_USD, BASE_MARKET_HBAR_USD,
BASE_MARKET_VIRTUAL_USD, BASE_MARKET_ADA_USD, BASE_MARKET_PENDLE_USD,
BASE_MARKET_TRX_USD, BASE_MARKET_AVAX_USD, BASE_MARKET_UNI_USD,
BASE_MARKET_SOL_USD, BASE_MARKET_LINK_USD, BASE_MARKET_XRP_USD,
BASE_MARKET_TON_USD
)
from dotenv import load_dotenv
load_dotenv()
RPC_URL = os.getenv("RPC_URL")
PRIVATE_KEY = os.getenv("PRIVATE_KEY")
def list_open_positions(account_id=0, market=None, delay=1, max_retries=3):
"""
List all open positions for a given account, optionally filtered by market
Args:
account_id (int): Account ID to get positions for (default: 0)
market (str): Optional market constant to filter positions (e.g., BASE_MARKET_ETH_USD)
delay (float): Delay in seconds between requests to avoid rate limiting (default: 1)
max_retries (int): Maximum number of retries on rate limit errors (default: 3)
Returns:
list: List of position dictionaries with details
"""
for attempt in range(max_retries):
try:
client = Client(
eth_private_key=PRIVATE_KEY,
rpc_url=RPC_URL
)
if market:
# Get position for specific market
try:
position = client.public.get_position_info(
client.private.get_public_address(), account_id, market
)
positions = [position] if abs(position.get("position_size", 0)) > 0 else []
except Exception as e:
if "429" in str(e) and attempt < max_retries - 1:
print(f"Rate limit hit, waiting {delay * (attempt + 1)} seconds before retry {attempt + 1}/{max_retries}...")
time.sleep(delay * (attempt + 1))
continue
print(f"Error getting position for market {market}: {e}")
return []
else:
# Get all positions for the account
positions = client.public.get_all_position_info(
client.private.get_public_address(), account_id
)
# Filter only positions with non-zero size (open positions)
positions = [pos for pos in positions if abs(pos.get("position_size", 0)) > 0]
open_positions = positions
if open_positions:
market_filter = f" for {market}" if market else ""
print(f"#### Open Positions for Account {account_id}{market_filter} ####")
print(f"Found {len(open_positions)} open position(s):\n")
for i, position in enumerate(open_positions, 1):
account_info = f'{position["primary_account"]}-{position["sub_account_id"]}'
market_name = position["market"]
size = position["position_size"]
entry_price = position["avg_entry_price"]
pnl = position["pnl"]
# Determine position direction
direction = "LONG" if size > 0 else "SHORT"
print(f"{i}. {market_name} ({direction})")
print(f" Account: {account_info}")
print(f" Size: {size:.4f}")
print(f" Entry Price: ${entry_price:.6f}")
print(f" PnL: ${pnl:.4f}")
print()
else:
market_filter = f" in market {market}" if market else ""
print(f"No open positions found for account {account_id}{market_filter}")
return open_positions
except Exception as e:
if "429" in str(e) and attempt < max_retries - 1:
print(f"Rate limit hit, waiting {delay * (attempt + 1)} seconds before retry {attempt + 1}/{max_retries}...")
time.sleep(delay * (attempt + 1))
continue
print(f"Error listing open positions: {e}")
return []
print(f"Max retries ({max_retries}) reached. Unable to fetch positions due to rate limiting.")
return []
async def main():
client = Client(
eth_private_key=PRIVATE_KEY,
rpc_url=RPC_URL
)
# Test the list_open_positions function with delays to avoid rate limiting
print("Testing list_open_positions function:")
# List all open positions for account 0
list_open_positions(0, delay=5)
time.sleep(3) # Wait 3 seconds between calls
# List all open positions for account 1
list_open_positions(1, delay=5)
time.sleep(3) # Wait 3 seconds between calls
# List open positions for specific markets with longer delays
print("Checking specific markets (with delays to avoid rate limiting):")
markets_to_check = [BASE_MARKET_ETH_USD, BASE_MARKET_BTC_USD, BASE_MARKET_SOL_USD]
accounts_to_check = [0, 1]
for account in accounts_to_check:
for market in markets_to_check:
print(f"Checking {market} for account {account}...")
list_open_positions(account, market, delay=2)
time.sleep(2) # Wait 2 seconds between each market check
print("All position checks completed.")
# print("#### Getting position ID from one market ETHUSD (get_position_id) ####")
# position_id = client.public.get_position_id(
# client.private.get_public_address(), 0, BASE_MARKET_ETH_USD)
# print(''.join(format(x, '02x') for x in position_id))
# print("#### Getting position ID from one market BTCUSD (get_position_id) ####")
# position_id = client.public.get_position_id(
# client.private.get_public_address(), 0, BASE_MARKET_BTC_USD)
# print(''.join(format(x, '02x') for x in position_id))
# print("#### Getting all positions info (get_all_position_info) ####")
# positions = client.public.get_all_position_info(
# client.private.get_public_address(), 0)
# for position in positions:
# print(
# f'Account: {position["primary_account"]}-{position["sub_account_id"]}')
# print(f'Market: {position["market"]}')
# print('Size: {0:.4f}'.format(position["position_size"]))
# print('Entry price: {0:.6f}'.format(position["avg_entry_price"]))
# print('Pnl: {0:.4f}'.format(position["pnl"]))
# print("#### Getting positions from one market (get_position_info) ####")
# position = client.public.get_position_info(
# client.private.get_public_address(), 0, BASE_MARKET_ETH_USD)
# print(
# f'Account: {position["primary_account"]}-{position["sub_account_id"]}')
# print(f'Market: {position["market"]}')
# print('Size: {0:.4f}'.format(position["position_size"]))
# print('Entry price: {0:.6f}'.format(position["avg_entry_price"]))
# print('Pnl: {0:.4f}'.format(position["pnl"]))
if __name__ == '__main__':
asyncio.run(main())