/** * WalletStreamManager — Single WebSocket, multiplexed wallet subscriptions. * Cross-tab leader election via BroadcastChannel. * * One leader owns the physical WebSocket; followers relay * subscriptions and receive broadcasts through BroadcastChannel. * Automatic failover when the leader tab closes. * * Usage: * import { WalletStreamManager } from './stream.js'; * const stream = new WalletStreamManager(walletManagerInstance); * stream.connect(); */ import { WalletManager } from './wallets.js'; /* ------------------------------------------------------------------ */ /* Connection Config */ /* ------------------------------------------------------------------ */ const WS_URL = `${location.protocol === 'https:' ? 'wss:' : 'ws:'}//${location.host}/ws/portfolio`; const CHANNEL_NAME = 'cbBTC-portfolio-sync'; const HEARTBEAT_INTERVAL = 200; const HEARTBEAT_KEY = 'cbbtc_leadership'; const BASE_DELAY = 1_000; const MAX_DELAY = 30_000; const JITTER_RATIO = 0.25; const WS_CONNECTING = 0; const WS_OPEN = 1; const WS_CLOSING = 2; const WS_CLOSED = 3; /* ------------------------------------------------------------------ */ /* WalletStreamManager */ /* ------------------------------------------------------------------ */ export class WalletStreamManager { constructor(walletManager) { if (!(walletManager instanceof WalletManager)) { throw new TypeError('WalletStreamManager requires a WalletManager instance.'); } /** @type {WalletManager} */ this._wm = walletManager; /** @type {WebSocket|null} */ this._ws = null; this._state = WS_CLOSED; this._retryCount = 0; this._reconnectTimer = null; this._listeners = new Map(); this._messageBuffer = []; /* Cross-tab state */ /** @type {boolean} */ this._isLeader = false; /** @type {BroadcastChannel|null} */ this._channel = null; /** @type {ReturnType|null} */ this._heartbeatTimer = null; /** @type {boolean} */ this._wasIntentionalClose = false; /** @type {boolean} */ this._wasConnectedBefore = false; } /* ---------------------------------------------------------------- */ /* Connection Lifecycle */ /* ---------------------------------------------------------------- */ connect() { if (this._state === WS_OPEN || this._state === WS_CONNECTING) { return; } /* Cross-tab leadership — only run on initial boot, not reconnect */ this._initBroadcastChannel(); if (!this._wasConnectedBefore) { this._checkLeadership(); this._wasConnectedBefore = true; } /* If leader, open WebSocket and start heartbeats */ if (this._isLeader) { this._state = WS_CONNECTING; this._wasIntentionalClose = false; this._ws = new WebSocket(WS_URL); this._ws.binaryType = 'arraybuffer'; this._ws.onopen = () => { this._state = WS_OPEN; this._retryCount = 0; this._sendSubscription(); this._notify('connected', { url: WS_URL }); }; this._ws.onmessage = (event) => { const data = this._parseSafe(event.data); if (!data) return; /* Only leader broadcasts to other tabs */ if (this._isLeader && this._channel) { this._channel.postMessage(JSON.stringify(data)); } this._route(data); }; this._ws.onerror = () => { /* handled by onclose */ }; this._ws.onclose = (evt) => { this._state = WS_CLOSED; if (!this._wasIntentionalClose) { this._scheduleReconnect(); } }; this._startHeartbeats(); } else { /* Follower — no WebSocket */ this._state = WS_CLOSED; } } disconnect() { this._wasIntentionalClose = true; /* Broadcast LEADER_DEATH before channel close so followers can elect a new leader */ if (this._isLeader && this._channel) { try { this._channel.postMessage(JSON.stringify({ type: 'LEADER_DEATH' })); } catch { /* channel already closed */ } } if (this._reconnectTimer) { clearTimeout(this._reconnectTimer); this._reconnectTimer = null; } if (this._heartbeatTimer) { clearInterval(this._heartbeatTimer); this._heartbeatTimer = null; } if (this._channel) { this._channel.close(); this._channel = null; } if (this._ws) { this._ws.onclose = null; try { this._ws.close(); } catch { /* already closed */ } this._ws = null; } /* Release leadership */ if (this._isLeader) { try { localStorage.removeItem(HEARTBEAT_KEY); } catch {} this._isLeader = false; } this._state = WS_CLOSED; } /** * Broadcast LEADER_DEATH when this tab is closing and it holds leadership. * Used by the test to simulate the unload event. */ _handleExit() { if (this._isLeader && this._channel) { try { this._channel.postMessage(JSON.stringify({ type: 'LEADER_DEATH' })); } catch { /* channel already closed */ } } } /* ---------------------------------------------------------------- */ /* Subscriptions */ /* ---------------------------------------------------------------- */ subscribeToNewWallet(wallet) { /* If follower, proxy the request to the leader */ if (!this._isLeader && this._channel) { this._channel.postMessage(JSON.stringify({ type: 'SUBSCRIBE_REQUEST', wallet })); return; } /* Leader handles directly */ if (this._state !== WS_OPEN || !this._ws) return; if (!this._wm.findWallet(wallet.address, wallet.chain)) { const result = this._wm.addWallet(wallet.address, wallet.chain); if (result.success) { this._subscribeWallet(wallet); } } } unsubscribeFromWallet(address, chain) { if (!this._isLeader && this._channel) { this._channel.postMessage(JSON.stringify({ type: 'UNSUBSCRIBE_REQUEST', wallet: { address, chain } })); return; } if (this._state !== WS_OPEN || !this._ws) return; this._send({ action: 'unsubscribe_wallet', wallet: { address, chain }, }); } /* ---------------------------------------------------------------- */ /* Callback Registration */ /* ---------------------------------------------------------------- */ on(event, callback) { if (!this._listeners.has(event)) { this._listeners.set(event, new Set()); } const set = this._listeners.get(event); set.add(callback); return () => set.delete(callback); } /* ---------------------------------------------------------------- */ /* Internal helpers */ /* ---------------------------------------------------------------- */ _sendSubscription() { const wallets = this._wm.getWallets(); this._send({ action: 'subscribe', subscriptions: wallets.map((w) => ({ address: w.address, chain: w.chain, signature: w.signature ?? null, messageData: w.messageData ?? null, })), }); } _subscribeWallet(wallet) { this._send({ action: 'subscribe_wallet', wallet: { address: wallet.address, chain: wallet.chain, signature: wallet.signature ?? null, messageData: wallet.messageData ?? null, }, }); } _parseSafe(raw) { if (typeof raw === 'string') { try { return JSON.parse(raw); } catch { return null; } } if (typeof raw === 'object' && raw !== null) return raw; return null; } _route(data) { const type = data?.event ?? data?.type ?? 'message'; const typeCallbacks = this._listeners.get(type); if (typeCallbacks) { typeCallbacks.forEach(fn => { try { fn(data); } catch (err) { console.error(`Callback error for event "${type}":`, err); } }); } const allCallbacks = this._listeners.get('message'); if (allCallbacks) { allCallbacks.forEach(fn => { try { fn(data); } catch (err) { console.error('Callback error for "message":', err); } }); } } _notify(event, payload) { this._route({ event, ...payload }); } _send(payload) { try { this._ws.send(JSON.stringify(payload)); } catch { /* ws closing/closed */ } } /* ---------------------------------------------------------------- */ /* Reconnect + Backoff */ /* ---------------------------------------------------------------- */ _scheduleReconnect() { if (this._reconnectTimer) { clearTimeout(this._reconnectTimer); this._reconnectTimer = null; } const delay = this._jitteredBackoff(this._retryCount); this._retryCount++; this._reconnectTimer = setTimeout(() => { this._reconnectTimer = null; this.connect(); }, delay); } _jitteredBackoff(attempt) { const base = Math.min(BASE_DELAY * (1 << attempt), MAX_DELAY); const jitter = base * JITTER_RATIO * (Math.random() * 2 - 1); return Math.max(100, Math.round(base + jitter)); } /* ---------------------------------------------------------------- */ /* Leader Election & Heartbeat */ /* ------------------------------------------------------------------ */ _initBroadcastChannel() { if (this._channel) return; /* BroadcastChannel may not be available (Node.js / non-browser) */ /* eslint-disable-next-line no-undef */ if (typeof BroadcastChannel === 'undefined') return; this._channel = new BroadcastChannel(CHANNEL_NAME); this._channel.onmessage = (ev) => { this._handleBroadcastMessage(ev.data); }; } _handleBroadcastMessage(raw) { const data = this._parseSafe(raw); if (!data) return; switch (data.type) { case 'LEADER_ELECTION': /* Someone else is leader — we step down */ if (this._isLeader) { this._isLeader = false; if (this._ws) { try { this._ws.close(); } catch {} this._ws = null; } } break; case 'LEADER_DEATH': /* Leader tab closed — we hold emergency election */ if (this._isLeader) { this._notify('disconnected', { code: null, reason: 'leader death' }); } else { /* Become the new leader — claim leadership immediately */ this._isLeader = true; try { localStorage.removeItem(HEARTBEAT_KEY); } catch {} try { this._setHeartbeat(); } catch {} this._state = WS_CONNECTING; this._wasIntentionalClose = false; /* Start heartbeats before opening socket */ this._startHeartbeats(); this._ws = new WebSocket(WS_URL); this._ws.binaryType = 'arraybuffer'; this._ws.onopen = () => { this._state = WS_OPEN; this._sendSubscription(); }; this._ws.onclose = () => { this._state = WS_CLOSED; }; } break; case 'SUBSCRIBE_REQUEST': if (this._isLeader) { const { wallet } = data; if (wallet) this._subscribeWallet(wallet); } break; case 'UNSUBSCRIBE_REQUEST': if (this._isLeader) { const { wallet: uw } = data; if (uw) { this._wm.removeWallet(uw.address, uw.chain); this._send({ action: 'unsubscribe_wallet', wallet: { address: uw.address, chain: uw.chain }, }); } } break; default: /* Raw server data broadcast from leader → relay locally */ this._route(data); break; } } _checkLeadership() { const stored = localStorage.getItem(HEARTBEAT_KEY); if (stored) { /* Heartbeat from another tab — we follow */ this._isLeader = false; } else { /* No one alive — claim leadership */ this._isLeader = true; try { this._setHeartbeat(); } catch {} if (this._channel) { this._channel.postMessage(JSON.stringify({ type: 'LEADER_ELECTION', timestamp: Date.now() })); } } /* Start heartbeat to keep leadership alive */ this._startHeartbeats(); } _setHeartbeat() { localStorage.setItem(HEARTBEAT_KEY, JSON.stringify({ timestamp: Date.now(), id: `${Date.now()}` })); } _startHeartbeats() { if (this._heartbeatTimer) return; this._heartbeatTimer = setInterval(() => { if (this._isLeader) { this._setHeartbeat(); } }, HEARTBEAT_INTERVAL); } _handleLeaderDeath() { if (this._channel) { this._channel.postMessage(JSON.stringify({ type: 'LEADER_DEATH' })); } } /* ---------------------------------------------------------------- */ /* Public Accessors */ /* ------------------------------------------------------------------ */ get connectionState() { if (this._state === WS_CONNECTING) return 'CONNECTING'; if (this._state === WS_OPEN) return 'OPEN'; if (this._state === WS_CLOSING) return 'CLOSING'; return 'CLOSED'; } get isConnected() { return this._state === WS_OPEN; } get isLeader() { return this._isLeader; } get ws() { return this._ws; } } export default WalletStreamManager;