feat: multi-wallet architecture with localStorage state, EIP-712 verification, cross-tab WS leadership
- wallets.js: WalletManager state management with chain validation - verifier.js: WalletVerifier EIP-712 signing via window.ethereum - stream.js: WalletStreamManager with BroadcastChannel leader election, backoff + jitter - AGENTS.md: updated with coding guidelines
This commit is contained in:
489
stream.js
Normal file
489
stream.js
Normal file
@ -0,0 +1,489 @@
|
||||
/**
|
||||
* WalletStreamManager — Single WebSocket, multiplexed wallet subscriptions.
|
||||
* Cross-tab leader election via BroadcastChannel.
|
||||
*
|
||||
* One leader owns the physical WebSocket; followers relay
|
||||
* subscriptions and receive broadcasts through BroadcastChannel.
|
||||
* Automatic failover when the leader tab closes.
|
||||
*
|
||||
* Usage:
|
||||
* import { WalletStreamManager } from './stream.js';
|
||||
* const stream = new WalletStreamManager(walletManagerInstance);
|
||||
* stream.connect();
|
||||
*/
|
||||
|
||||
import { WalletManager } from './wallets.js';
|
||||
|
||||
/* ------------------------------------------------------------------ */
|
||||
/* Connection Config */
|
||||
/* ------------------------------------------------------------------ */
|
||||
|
||||
const WS_URL = `${location.protocol === 'https:' ? 'wss:' : 'ws:'}//${location.host}/ws/portfolio`;
|
||||
const CHANNEL_NAME = 'cbBTC-portfolio-sync';
|
||||
const HEARTBEAT_INTERVAL = 200;
|
||||
const HEARTBEAT_KEY = 'cbbtc_leadership';
|
||||
|
||||
const BASE_DELAY = 1_000;
|
||||
const MAX_DELAY = 30_000;
|
||||
const JITTER_RATIO = 0.25;
|
||||
|
||||
const WS_CONNECTING = 0;
|
||||
const WS_OPEN = 1;
|
||||
const WS_CLOSING = 2;
|
||||
const WS_CLOSED = 3;
|
||||
|
||||
/* ------------------------------------------------------------------ */
|
||||
/* WalletStreamManager */
|
||||
/* ------------------------------------------------------------------ */
|
||||
|
||||
export class WalletStreamManager {
|
||||
constructor(walletManager) {
|
||||
if (!(walletManager instanceof WalletManager)) {
|
||||
throw new TypeError('WalletStreamManager requires a WalletManager instance.');
|
||||
}
|
||||
|
||||
/** @type {WalletManager} */
|
||||
this._wm = walletManager;
|
||||
|
||||
/** @type {WebSocket|null} */
|
||||
this._ws = null;
|
||||
this._state = WS_CLOSED;
|
||||
this._retryCount = 0;
|
||||
this._reconnectTimer = null;
|
||||
this._listeners = new Map();
|
||||
this._messageBuffer = [];
|
||||
|
||||
/* Cross-tab state */
|
||||
/** @type {boolean} */
|
||||
this._isLeader = false;
|
||||
|
||||
/** @type {BroadcastChannel|null} */
|
||||
this._channel = null;
|
||||
|
||||
/** @type {ReturnType<typeof setInterval>|null} */
|
||||
this._heartbeatTimer = null;
|
||||
|
||||
/** @type {boolean} */
|
||||
this._wasIntentionalClose = false;
|
||||
|
||||
/** @type {boolean} */
|
||||
this._wasConnectedBefore = false;
|
||||
}
|
||||
|
||||
/* ---------------------------------------------------------------- */
|
||||
/* Connection Lifecycle */
|
||||
/* ---------------------------------------------------------------- */
|
||||
|
||||
connect() {
|
||||
if (this._state === WS_OPEN || this._state === WS_CONNECTING) {
|
||||
return;
|
||||
}
|
||||
|
||||
/* Cross-tab leadership — only run on initial boot, not reconnect */
|
||||
this._initBroadcastChannel();
|
||||
if (!this._wasConnectedBefore) {
|
||||
this._checkLeadership();
|
||||
this._wasConnectedBefore = true;
|
||||
}
|
||||
|
||||
/* If leader, open WebSocket and start heartbeats */
|
||||
if (this._isLeader) {
|
||||
this._state = WS_CONNECTING;
|
||||
this._wasIntentionalClose = false;
|
||||
|
||||
this._ws = new WebSocket(WS_URL);
|
||||
this._ws.binaryType = 'arraybuffer';
|
||||
|
||||
this._ws.onopen = () => {
|
||||
this._state = WS_OPEN;
|
||||
this._retryCount = 0;
|
||||
this._sendSubscription();
|
||||
this._notify('connected', { url: WS_URL });
|
||||
};
|
||||
|
||||
this._ws.onmessage = (event) => {
|
||||
const data = this._parseSafe(event.data);
|
||||
if (!data) return;
|
||||
/* Only leader broadcasts to other tabs */
|
||||
if (this._isLeader && this._channel) {
|
||||
this._channel.postMessage(JSON.stringify(data));
|
||||
}
|
||||
this._route(data);
|
||||
};
|
||||
|
||||
this._ws.onerror = () => {
|
||||
/* handled by onclose */
|
||||
};
|
||||
|
||||
this._ws.onclose = (evt) => {
|
||||
this._state = WS_CLOSED;
|
||||
|
||||
if (!this._wasIntentionalClose) {
|
||||
this._scheduleReconnect();
|
||||
}
|
||||
};
|
||||
|
||||
this._startHeartbeats();
|
||||
} else {
|
||||
/* Follower — no WebSocket */
|
||||
this._state = WS_CLOSED;
|
||||
}
|
||||
}
|
||||
|
||||
disconnect() {
|
||||
this._wasIntentionalClose = true;
|
||||
|
||||
/* Broadcast LEADER_DEATH before channel close so followers can elect a new leader */
|
||||
if (this._isLeader && this._channel) {
|
||||
try {
|
||||
this._channel.postMessage(JSON.stringify({ type: 'LEADER_DEATH' }));
|
||||
} catch { /* channel already closed */ }
|
||||
}
|
||||
|
||||
if (this._reconnectTimer) {
|
||||
clearTimeout(this._reconnectTimer);
|
||||
this._reconnectTimer = null;
|
||||
}
|
||||
|
||||
if (this._heartbeatTimer) {
|
||||
clearInterval(this._heartbeatTimer);
|
||||
this._heartbeatTimer = null;
|
||||
}
|
||||
|
||||
if (this._channel) {
|
||||
this._channel.close();
|
||||
this._channel = null;
|
||||
}
|
||||
|
||||
if (this._ws) {
|
||||
this._ws.onclose = null;
|
||||
try { this._ws.close(); } catch { /* already closed */ }
|
||||
this._ws = null;
|
||||
}
|
||||
|
||||
/* Release leadership */
|
||||
if (this._isLeader) {
|
||||
try { localStorage.removeItem(HEARTBEAT_KEY); } catch {}
|
||||
this._isLeader = false;
|
||||
}
|
||||
|
||||
this._state = WS_CLOSED;
|
||||
}
|
||||
|
||||
/**
|
||||
* Broadcast LEADER_DEATH when this tab is closing and it holds leadership.
|
||||
* Used by the test to simulate the unload event.
|
||||
*/
|
||||
_handleExit() {
|
||||
if (this._isLeader && this._channel) {
|
||||
try {
|
||||
this._channel.postMessage(JSON.stringify({ type: 'LEADER_DEATH' }));
|
||||
} catch { /* channel already closed */ }
|
||||
}
|
||||
}
|
||||
|
||||
/* ---------------------------------------------------------------- */
|
||||
/* Subscriptions */
|
||||
/* ---------------------------------------------------------------- */
|
||||
|
||||
subscribeToNewWallet(wallet) {
|
||||
/* If follower, proxy the request to the leader */
|
||||
if (!this._isLeader && this._channel) {
|
||||
this._channel.postMessage(JSON.stringify({
|
||||
type: 'SUBSCRIBE_REQUEST',
|
||||
wallet
|
||||
}));
|
||||
return;
|
||||
}
|
||||
|
||||
/* Leader handles directly */
|
||||
if (this._state !== WS_OPEN || !this._ws) return;
|
||||
|
||||
if (!this._wm.findWallet(wallet.address, wallet.chain)) {
|
||||
const result = this._wm.addWallet(wallet.address, wallet.chain);
|
||||
if (result.success) {
|
||||
this._subscribeWallet(wallet);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
unsubscribeFromWallet(address, chain) {
|
||||
if (!this._isLeader && this._channel) {
|
||||
this._channel.postMessage(JSON.stringify({
|
||||
type: 'UNSUBSCRIBE_REQUEST',
|
||||
wallet: { address, chain }
|
||||
}));
|
||||
return;
|
||||
}
|
||||
|
||||
if (this._state !== WS_OPEN || !this._ws) return;
|
||||
this._send({
|
||||
action: 'unsubscribe_wallet',
|
||||
wallet: { address, chain },
|
||||
});
|
||||
}
|
||||
|
||||
/* ---------------------------------------------------------------- */
|
||||
/* Callback Registration */
|
||||
/* ---------------------------------------------------------------- */
|
||||
|
||||
on(event, callback) {
|
||||
if (!this._listeners.has(event)) {
|
||||
this._listeners.set(event, new Set());
|
||||
}
|
||||
const set = this._listeners.get(event);
|
||||
set.add(callback);
|
||||
return () => set.delete(callback);
|
||||
}
|
||||
|
||||
/* ---------------------------------------------------------------- */
|
||||
/* Internal helpers */
|
||||
/* ---------------------------------------------------------------- */
|
||||
|
||||
_sendSubscription() {
|
||||
const wallets = this._wm.getWallets();
|
||||
this._send({
|
||||
action: 'subscribe',
|
||||
subscriptions: wallets.map((w) => ({
|
||||
address: w.address,
|
||||
chain: w.chain,
|
||||
signature: w.signature ?? null,
|
||||
messageData: w.messageData ?? null,
|
||||
})),
|
||||
});
|
||||
}
|
||||
|
||||
_subscribeWallet(wallet) {
|
||||
this._send({
|
||||
action: 'subscribe_wallet',
|
||||
wallet: {
|
||||
address: wallet.address,
|
||||
chain: wallet.chain,
|
||||
signature: wallet.signature ?? null,
|
||||
messageData: wallet.messageData ?? null,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
_parseSafe(raw) {
|
||||
if (typeof raw === 'string') {
|
||||
try { return JSON.parse(raw); } catch { return null; }
|
||||
}
|
||||
if (typeof raw === 'object' && raw !== null) return raw;
|
||||
return null;
|
||||
}
|
||||
|
||||
_route(data) {
|
||||
const type = data?.event ?? data?.type ?? 'message';
|
||||
|
||||
const typeCallbacks = this._listeners.get(type);
|
||||
if (typeCallbacks) {
|
||||
typeCallbacks.forEach(fn => {
|
||||
try { fn(data); } catch (err) {
|
||||
console.error(`Callback error for event "${type}":`, err);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
const allCallbacks = this._listeners.get('message');
|
||||
if (allCallbacks) {
|
||||
allCallbacks.forEach(fn => {
|
||||
try { fn(data); } catch (err) {
|
||||
console.error('Callback error for "message":', err);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
_notify(event, payload) {
|
||||
this._route({ event, ...payload });
|
||||
}
|
||||
|
||||
_send(payload) {
|
||||
try {
|
||||
this._ws.send(JSON.stringify(payload));
|
||||
} catch {
|
||||
/* ws closing/closed */
|
||||
}
|
||||
}
|
||||
|
||||
/* ---------------------------------------------------------------- */
|
||||
/* Reconnect + Backoff */
|
||||
/* ---------------------------------------------------------------- */
|
||||
|
||||
_scheduleReconnect() {
|
||||
if (this._reconnectTimer) {
|
||||
clearTimeout(this._reconnectTimer);
|
||||
this._reconnectTimer = null;
|
||||
}
|
||||
|
||||
const delay = this._jitteredBackoff(this._retryCount);
|
||||
this._retryCount++;
|
||||
|
||||
this._reconnectTimer = setTimeout(() => {
|
||||
this._reconnectTimer = null;
|
||||
this.connect();
|
||||
}, delay);
|
||||
}
|
||||
|
||||
_jitteredBackoff(attempt) {
|
||||
const base = Math.min(BASE_DELAY * (1 << attempt), MAX_DELAY);
|
||||
const jitter = base * JITTER_RATIO * (Math.random() * 2 - 1);
|
||||
return Math.max(100, Math.round(base + jitter));
|
||||
}
|
||||
|
||||
/* ---------------------------------------------------------------- */
|
||||
/* Leader Election & Heartbeat */
|
||||
/* ------------------------------------------------------------------ */
|
||||
|
||||
_initBroadcastChannel() {
|
||||
if (this._channel) return;
|
||||
|
||||
/* BroadcastChannel may not be available (Node.js / non-browser) */
|
||||
/* eslint-disable-next-line no-undef */
|
||||
if (typeof BroadcastChannel === 'undefined') return;
|
||||
|
||||
this._channel = new BroadcastChannel(CHANNEL_NAME);
|
||||
|
||||
this._channel.onmessage = (ev) => {
|
||||
this._handleBroadcastMessage(ev.data);
|
||||
};
|
||||
}
|
||||
|
||||
_handleBroadcastMessage(raw) {
|
||||
const data = this._parseSafe(raw);
|
||||
if (!data) return;
|
||||
|
||||
switch (data.type) {
|
||||
case 'LEADER_ELECTION':
|
||||
/* Someone else is leader — we step down */
|
||||
if (this._isLeader) {
|
||||
this._isLeader = false;
|
||||
if (this._ws) {
|
||||
try { this._ws.close(); } catch {}
|
||||
this._ws = null;
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
||||
case 'LEADER_DEATH':
|
||||
/* Leader tab closed — we hold emergency election */
|
||||
if (this._isLeader) {
|
||||
this._notify('disconnected', { code: null, reason: 'leader death' });
|
||||
} else {
|
||||
/* Become the new leader — claim leadership immediately */
|
||||
this._isLeader = true;
|
||||
try { localStorage.removeItem(HEARTBEAT_KEY); } catch {}
|
||||
try { this._setHeartbeat(); } catch {}
|
||||
this._state = WS_CONNECTING;
|
||||
this._wasIntentionalClose = false;
|
||||
/* Start heartbeats before opening socket */
|
||||
this._startHeartbeats();
|
||||
this._ws = new WebSocket(WS_URL);
|
||||
this._ws.binaryType = 'arraybuffer';
|
||||
this._ws.onopen = () => {
|
||||
this._state = WS_OPEN;
|
||||
this._sendSubscription();
|
||||
};
|
||||
this._ws.onclose = () => {
|
||||
this._state = WS_CLOSED;
|
||||
};
|
||||
}
|
||||
break;
|
||||
|
||||
case 'SUBSCRIBE_REQUEST':
|
||||
if (this._isLeader) {
|
||||
const { wallet } = data;
|
||||
if (wallet) this._subscribeWallet(wallet);
|
||||
}
|
||||
break;
|
||||
|
||||
case 'UNSUBSCRIBE_REQUEST':
|
||||
if (this._isLeader) {
|
||||
const { wallet: uw } = data;
|
||||
if (uw) {
|
||||
this._wm.removeWallet(uw.address, uw.chain);
|
||||
this._send({
|
||||
action: 'unsubscribe_wallet',
|
||||
wallet: { address: uw.address, chain: uw.chain },
|
||||
});
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
||||
default:
|
||||
/* Raw server data broadcast from leader → relay locally */
|
||||
this._route(data);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
_checkLeadership() {
|
||||
const stored = localStorage.getItem(HEARTBEAT_KEY);
|
||||
|
||||
if (stored) {
|
||||
/* Heartbeat from another tab — we follow */
|
||||
this._isLeader = false;
|
||||
} else {
|
||||
/* No one alive — claim leadership */
|
||||
this._isLeader = true;
|
||||
try {
|
||||
this._setHeartbeat();
|
||||
} catch {}
|
||||
if (this._channel) {
|
||||
this._channel.postMessage(JSON.stringify({ type: 'LEADER_ELECTION', timestamp: Date.now() }));
|
||||
}
|
||||
}
|
||||
|
||||
/* Start heartbeat to keep leadership alive */
|
||||
this._startHeartbeats();
|
||||
}
|
||||
|
||||
_setHeartbeat() {
|
||||
localStorage.setItem(HEARTBEAT_KEY, JSON.stringify({
|
||||
timestamp: Date.now(),
|
||||
id: `${Date.now()}`
|
||||
}));
|
||||
}
|
||||
|
||||
_startHeartbeats() {
|
||||
if (this._heartbeatTimer) return;
|
||||
|
||||
this._heartbeatTimer = setInterval(() => {
|
||||
if (this._isLeader) {
|
||||
this._setHeartbeat();
|
||||
}
|
||||
}, HEARTBEAT_INTERVAL);
|
||||
}
|
||||
|
||||
_handleLeaderDeath() {
|
||||
if (this._channel) {
|
||||
this._channel.postMessage(JSON.stringify({ type: 'LEADER_DEATH' }));
|
||||
}
|
||||
}
|
||||
|
||||
/* ---------------------------------------------------------------- */
|
||||
/* Public Accessors */
|
||||
/* ------------------------------------------------------------------ */
|
||||
|
||||
get connectionState() {
|
||||
if (this._state === WS_CONNECTING) return 'CONNECTING';
|
||||
if (this._state === WS_OPEN) return 'OPEN';
|
||||
if (this._state === WS_CLOSING) return 'CLOSING';
|
||||
return 'CLOSED';
|
||||
}
|
||||
|
||||
get isConnected() {
|
||||
return this._state === WS_OPEN;
|
||||
}
|
||||
|
||||
get isLeader() {
|
||||
return this._isLeader;
|
||||
}
|
||||
|
||||
get ws() {
|
||||
return this._ws;
|
||||
}
|
||||
}
|
||||
|
||||
export default WalletStreamManager;
|
||||
Reference in New Issue
Block a user