Files
dione/stream.js
Dione a61e0b0457 feat: multi-wallet architecture with localStorage state, EIP-712 verification, cross-tab WS leadership
- wallets.js: WalletManager state management with chain validation
- verifier.js: WalletVerifier EIP-712 signing via window.ethereum
- stream.js: WalletStreamManager with BroadcastChannel leader election, backoff + jitter
- AGENTS.md: updated with coding guidelines
2026-06-06 12:16:41 +00:00

490 lines
13 KiB
JavaScript

/**
* WalletStreamManager — Single WebSocket, multiplexed wallet subscriptions.
* Cross-tab leader election via BroadcastChannel.
*
* One leader owns the physical WebSocket; followers relay
* subscriptions and receive broadcasts through BroadcastChannel.
* Automatic failover when the leader tab closes.
*
* Usage:
* import { WalletStreamManager } from './stream.js';
* const stream = new WalletStreamManager(walletManagerInstance);
* stream.connect();
*/
import { WalletManager } from './wallets.js';
/* ------------------------------------------------------------------ */
/* Connection Config */
/* ------------------------------------------------------------------ */
const WS_URL = `${location.protocol === 'https:' ? 'wss:' : 'ws:'}//${location.host}/ws/portfolio`;
const CHANNEL_NAME = 'cbBTC-portfolio-sync';
const HEARTBEAT_INTERVAL = 200;
const HEARTBEAT_KEY = 'cbbtc_leadership';
const BASE_DELAY = 1_000;
const MAX_DELAY = 30_000;
const JITTER_RATIO = 0.25;
const WS_CONNECTING = 0;
const WS_OPEN = 1;
const WS_CLOSING = 2;
const WS_CLOSED = 3;
/* ------------------------------------------------------------------ */
/* WalletStreamManager */
/* ------------------------------------------------------------------ */
export class WalletStreamManager {
constructor(walletManager) {
if (!(walletManager instanceof WalletManager)) {
throw new TypeError('WalletStreamManager requires a WalletManager instance.');
}
/** @type {WalletManager} */
this._wm = walletManager;
/** @type {WebSocket|null} */
this._ws = null;
this._state = WS_CLOSED;
this._retryCount = 0;
this._reconnectTimer = null;
this._listeners = new Map();
this._messageBuffer = [];
/* Cross-tab state */
/** @type {boolean} */
this._isLeader = false;
/** @type {BroadcastChannel|null} */
this._channel = null;
/** @type {ReturnType<typeof setInterval>|null} */
this._heartbeatTimer = null;
/** @type {boolean} */
this._wasIntentionalClose = false;
/** @type {boolean} */
this._wasConnectedBefore = false;
}
/* ---------------------------------------------------------------- */
/* Connection Lifecycle */
/* ---------------------------------------------------------------- */
connect() {
if (this._state === WS_OPEN || this._state === WS_CONNECTING) {
return;
}
/* Cross-tab leadership — only run on initial boot, not reconnect */
this._initBroadcastChannel();
if (!this._wasConnectedBefore) {
this._checkLeadership();
this._wasConnectedBefore = true;
}
/* If leader, open WebSocket and start heartbeats */
if (this._isLeader) {
this._state = WS_CONNECTING;
this._wasIntentionalClose = false;
this._ws = new WebSocket(WS_URL);
this._ws.binaryType = 'arraybuffer';
this._ws.onopen = () => {
this._state = WS_OPEN;
this._retryCount = 0;
this._sendSubscription();
this._notify('connected', { url: WS_URL });
};
this._ws.onmessage = (event) => {
const data = this._parseSafe(event.data);
if (!data) return;
/* Only leader broadcasts to other tabs */
if (this._isLeader && this._channel) {
this._channel.postMessage(JSON.stringify(data));
}
this._route(data);
};
this._ws.onerror = () => {
/* handled by onclose */
};
this._ws.onclose = (evt) => {
this._state = WS_CLOSED;
if (!this._wasIntentionalClose) {
this._scheduleReconnect();
}
};
this._startHeartbeats();
} else {
/* Follower — no WebSocket */
this._state = WS_CLOSED;
}
}
disconnect() {
this._wasIntentionalClose = true;
/* Broadcast LEADER_DEATH before channel close so followers can elect a new leader */
if (this._isLeader && this._channel) {
try {
this._channel.postMessage(JSON.stringify({ type: 'LEADER_DEATH' }));
} catch { /* channel already closed */ }
}
if (this._reconnectTimer) {
clearTimeout(this._reconnectTimer);
this._reconnectTimer = null;
}
if (this._heartbeatTimer) {
clearInterval(this._heartbeatTimer);
this._heartbeatTimer = null;
}
if (this._channel) {
this._channel.close();
this._channel = null;
}
if (this._ws) {
this._ws.onclose = null;
try { this._ws.close(); } catch { /* already closed */ }
this._ws = null;
}
/* Release leadership */
if (this._isLeader) {
try { localStorage.removeItem(HEARTBEAT_KEY); } catch {}
this._isLeader = false;
}
this._state = WS_CLOSED;
}
/**
* Broadcast LEADER_DEATH when this tab is closing and it holds leadership.
* Used by the test to simulate the unload event.
*/
_handleExit() {
if (this._isLeader && this._channel) {
try {
this._channel.postMessage(JSON.stringify({ type: 'LEADER_DEATH' }));
} catch { /* channel already closed */ }
}
}
/* ---------------------------------------------------------------- */
/* Subscriptions */
/* ---------------------------------------------------------------- */
subscribeToNewWallet(wallet) {
/* If follower, proxy the request to the leader */
if (!this._isLeader && this._channel) {
this._channel.postMessage(JSON.stringify({
type: 'SUBSCRIBE_REQUEST',
wallet
}));
return;
}
/* Leader handles directly */
if (this._state !== WS_OPEN || !this._ws) return;
if (!this._wm.findWallet(wallet.address, wallet.chain)) {
const result = this._wm.addWallet(wallet.address, wallet.chain);
if (result.success) {
this._subscribeWallet(wallet);
}
}
}
unsubscribeFromWallet(address, chain) {
if (!this._isLeader && this._channel) {
this._channel.postMessage(JSON.stringify({
type: 'UNSUBSCRIBE_REQUEST',
wallet: { address, chain }
}));
return;
}
if (this._state !== WS_OPEN || !this._ws) return;
this._send({
action: 'unsubscribe_wallet',
wallet: { address, chain },
});
}
/* ---------------------------------------------------------------- */
/* Callback Registration */
/* ---------------------------------------------------------------- */
on(event, callback) {
if (!this._listeners.has(event)) {
this._listeners.set(event, new Set());
}
const set = this._listeners.get(event);
set.add(callback);
return () => set.delete(callback);
}
/* ---------------------------------------------------------------- */
/* Internal helpers */
/* ---------------------------------------------------------------- */
_sendSubscription() {
const wallets = this._wm.getWallets();
this._send({
action: 'subscribe',
subscriptions: wallets.map((w) => ({
address: w.address,
chain: w.chain,
signature: w.signature ?? null,
messageData: w.messageData ?? null,
})),
});
}
_subscribeWallet(wallet) {
this._send({
action: 'subscribe_wallet',
wallet: {
address: wallet.address,
chain: wallet.chain,
signature: wallet.signature ?? null,
messageData: wallet.messageData ?? null,
},
});
}
_parseSafe(raw) {
if (typeof raw === 'string') {
try { return JSON.parse(raw); } catch { return null; }
}
if (typeof raw === 'object' && raw !== null) return raw;
return null;
}
_route(data) {
const type = data?.event ?? data?.type ?? 'message';
const typeCallbacks = this._listeners.get(type);
if (typeCallbacks) {
typeCallbacks.forEach(fn => {
try { fn(data); } catch (err) {
console.error(`Callback error for event "${type}":`, err);
}
});
}
const allCallbacks = this._listeners.get('message');
if (allCallbacks) {
allCallbacks.forEach(fn => {
try { fn(data); } catch (err) {
console.error('Callback error for "message":', err);
}
});
}
}
_notify(event, payload) {
this._route({ event, ...payload });
}
_send(payload) {
try {
this._ws.send(JSON.stringify(payload));
} catch {
/* ws closing/closed */
}
}
/* ---------------------------------------------------------------- */
/* Reconnect + Backoff */
/* ---------------------------------------------------------------- */
_scheduleReconnect() {
if (this._reconnectTimer) {
clearTimeout(this._reconnectTimer);
this._reconnectTimer = null;
}
const delay = this._jitteredBackoff(this._retryCount);
this._retryCount++;
this._reconnectTimer = setTimeout(() => {
this._reconnectTimer = null;
this.connect();
}, delay);
}
_jitteredBackoff(attempt) {
const base = Math.min(BASE_DELAY * (1 << attempt), MAX_DELAY);
const jitter = base * JITTER_RATIO * (Math.random() * 2 - 1);
return Math.max(100, Math.round(base + jitter));
}
/* ---------------------------------------------------------------- */
/* Leader Election & Heartbeat */
/* ------------------------------------------------------------------ */
_initBroadcastChannel() {
if (this._channel) return;
/* BroadcastChannel may not be available (Node.js / non-browser) */
/* eslint-disable-next-line no-undef */
if (typeof BroadcastChannel === 'undefined') return;
this._channel = new BroadcastChannel(CHANNEL_NAME);
this._channel.onmessage = (ev) => {
this._handleBroadcastMessage(ev.data);
};
}
_handleBroadcastMessage(raw) {
const data = this._parseSafe(raw);
if (!data) return;
switch (data.type) {
case 'LEADER_ELECTION':
/* Someone else is leader — we step down */
if (this._isLeader) {
this._isLeader = false;
if (this._ws) {
try { this._ws.close(); } catch {}
this._ws = null;
}
}
break;
case 'LEADER_DEATH':
/* Leader tab closed — we hold emergency election */
if (this._isLeader) {
this._notify('disconnected', { code: null, reason: 'leader death' });
} else {
/* Become the new leader — claim leadership immediately */
this._isLeader = true;
try { localStorage.removeItem(HEARTBEAT_KEY); } catch {}
try { this._setHeartbeat(); } catch {}
this._state = WS_CONNECTING;
this._wasIntentionalClose = false;
/* Start heartbeats before opening socket */
this._startHeartbeats();
this._ws = new WebSocket(WS_URL);
this._ws.binaryType = 'arraybuffer';
this._ws.onopen = () => {
this._state = WS_OPEN;
this._sendSubscription();
};
this._ws.onclose = () => {
this._state = WS_CLOSED;
};
}
break;
case 'SUBSCRIBE_REQUEST':
if (this._isLeader) {
const { wallet } = data;
if (wallet) this._subscribeWallet(wallet);
}
break;
case 'UNSUBSCRIBE_REQUEST':
if (this._isLeader) {
const { wallet: uw } = data;
if (uw) {
this._wm.removeWallet(uw.address, uw.chain);
this._send({
action: 'unsubscribe_wallet',
wallet: { address: uw.address, chain: uw.chain },
});
}
}
break;
default:
/* Raw server data broadcast from leader → relay locally */
this._route(data);
break;
}
}
_checkLeadership() {
const stored = localStorage.getItem(HEARTBEAT_KEY);
if (stored) {
/* Heartbeat from another tab — we follow */
this._isLeader = false;
} else {
/* No one alive — claim leadership */
this._isLeader = true;
try {
this._setHeartbeat();
} catch {}
if (this._channel) {
this._channel.postMessage(JSON.stringify({ type: 'LEADER_ELECTION', timestamp: Date.now() }));
}
}
/* Start heartbeat to keep leadership alive */
this._startHeartbeats();
}
_setHeartbeat() {
localStorage.setItem(HEARTBEAT_KEY, JSON.stringify({
timestamp: Date.now(),
id: `${Date.now()}`
}));
}
_startHeartbeats() {
if (this._heartbeatTimer) return;
this._heartbeatTimer = setInterval(() => {
if (this._isLeader) {
this._setHeartbeat();
}
}, HEARTBEAT_INTERVAL);
}
_handleLeaderDeath() {
if (this._channel) {
this._channel.postMessage(JSON.stringify({ type: 'LEADER_DEATH' }));
}
}
/* ---------------------------------------------------------------- */
/* Public Accessors */
/* ------------------------------------------------------------------ */
get connectionState() {
if (this._state === WS_CONNECTING) return 'CONNECTING';
if (this._state === WS_OPEN) return 'OPEN';
if (this._state === WS_CLOSING) return 'CLOSING';
return 'CLOSED';
}
get isConnected() {
return this._state === WS_OPEN;
}
get isLeader() {
return this._isLeader;
}
get ws() {
return this._ws;
}
}
export default WalletStreamManager;