From a943d2e2e904c61efdc879b0226edd2eaeb94b4c Mon Sep 17 00:00:00 2001 From: enzotar Date: Wed, 25 Feb 2026 21:37:17 -0800 Subject: [PATCH] feat: peer mode relay + self-echo filtering for true bidirectional sync - Added /peer/{name} route to relay: all clients are equal peers - handle_peer: binary broadcast to all other peers, catchup for late joiners - Simplified runtime: single /peer/ WS replaces dual source+receiver - _peerId: random 8-char ID prevents self-echo from broadcast - _pid in each diff JSON, filtered in _applyRemoteDiff --- compiler/ds-codegen/src/js_emitter.rs | 68 +++++-------------- engine/ds-stream/src/relay.rs | 95 ++++++++++++++++++++++++++- examples/todo.ds | 4 +- 3 files changed, 112 insertions(+), 55 deletions(-) diff --git a/compiler/ds-codegen/src/js_emitter.rs b/compiler/ds-codegen/src/js_emitter.rs index 9257104..2151f4c 100644 --- a/compiler/ds-codegen/src/js_emitter.rs +++ b/compiler/ds-codegen/src/js_emitter.rs @@ -1819,16 +1819,20 @@ const DS = (() => { // ── Signal registry for bidirectional sync ── var _signalRegistry = {}; var _applyingRemoteDiff = false; + var _peerId = Math.random().toString(36).substr(2, 8); // unique per client function _registerSignal(name, sig) { _signalRegistry[name] = sig; } function _applyRemoteDiff(json) { - _applyingRemoteDiff = true; try { var data = JSON.parse(json); + // Ignore our own diffs echoed back by the relay + if (data._pid === _peerId) return; + _applyingRemoteDiff = true; for (var name in data) { + if (name === '_pid') continue; var sig = _signalRegistry[name]; if (sig) { sig.value = data[name]; @@ -1843,50 +1847,27 @@ const DS = (() => { _streamMode = mode || 'signal'; _streamStart = performance.now(); - // Ensure proper source path: ws://host:port → ws://host:port/source/default - var sourceUrl = url; - if (!url.match(/\/(source|stream)\//)) { - sourceUrl = url.replace(/\/?$/, '/source/default'); + // Use peer path for bidirectional sync: ws://host:port → ws://host:port/peer/default + var peerUrl = url; + if (!url.match(/\/(source|stream|peer)\//)) { + peerUrl = url.replace(/\/?$/, '/peer/default'); } - _streamWs = new WebSocket(sourceUrl); + _streamWs = new WebSocket(peerUrl); _streamWs.binaryType = 'arraybuffer'; _streamWs.onmessage = function(e) { if (!(e.data instanceof ArrayBuffer) || e.data.byteLength < HEADER_SIZE) return; var bytes = new Uint8Array(e.data); var view = new DataView(bytes.buffer); var type = view.getUint8(0); - var flags = view.getUint8(1); - // Source receives input from receivers (relay forwards receiver input to source) - if (flags & 0x01) _handleRemoteInput(type, bytes.subarray(HEADER_SIZE)); - }; - _streamWs.onclose = function() { setTimeout(function() { _initStream(url, mode); }, 2000); }; - console.log('[ds-stream] Source connected:', sourceUrl); - - // Also open a receiver connection for bidirectional sync - var receiverUrl = sourceUrl.replace('/source/', '/stream/'); - _initStreamReceiver(receiverUrl); - } - - var _recvWs = null; - - // Receiver WS — listens for diffs from other sources via relay - function _initStreamReceiver(url) { - _recvWs = new WebSocket(url); - _recvWs.binaryType = 'arraybuffer'; - _recvWs.onmessage = function(e) { - if (!(e.data instanceof ArrayBuffer) || e.data.byteLength < HEADER_SIZE) return; - var bytes = new Uint8Array(e.data); - var view = new DataView(bytes.buffer); - var type = view.getUint8(0); - // Signal diff from source — apply to local signals + // Signal diff from another peer — apply to local signals if (type === 0x31) { var payload = bytes.subarray(HEADER_SIZE); var json = new TextDecoder().decode(payload); _applyRemoteDiff(json); } }; - _recvWs.onclose = function() { setTimeout(function() { _initStreamReceiver(url); }, 2000); }; - console.log('[ds-stream] Receiver connected:', url); + _streamWs.onclose = function() { setTimeout(function() { _initStream(url, mode); }, 2000); }; + console.log('[ds-stream] Peer connected:', peerUrl); } function _streamSend(type, flags, payload) { @@ -1904,28 +1885,11 @@ const DS = (() => { } function _streamDiff(name, value) { - if (_streamMode !== 'signal') return; + if (!_streamWs || _streamWs.readyState !== 1 || _streamMode !== 'signal') return; if (_applyingRemoteDiff) return; // prevent echo loops - var obj = {}; + var obj = { _pid: _peerId }; obj[name] = (typeof value === 'object' && value !== null && 'value' in value) ? value.value : value; - var payload = new TextEncoder().encode(JSON.stringify(obj)); - // Send via source WS (broadcasts to all receivers) - if (_streamWs && _streamWs.readyState === 1) { - _streamSend(0x31, 0, payload); - } - // Also send via receiver WS with INPUT flag (relay forwards to source for rebroadcast) - if (_recvWs && _recvWs.readyState === 1) { - var ts = (performance.now() - _streamStart) | 0; - var msg = new Uint8Array(HEADER_SIZE + payload.length); - var v = new DataView(msg.buffer); - v.setUint8(0, 0x31); - v.setUint8(1, 0x01); // IS_INPUT flag - v.setUint16(2, (_streamSeq++) & 0xFFFF, true); - v.setUint32(4, ts, true); - v.setUint32(12, payload.length, true); - msg.set(payload, HEADER_SIZE); - _recvWs.send(msg.buffer); - } + _streamSend(0x31, 0, new TextEncoder().encode(JSON.stringify(obj))); } function _streamSync(signals) { diff --git a/engine/ds-stream/src/relay.rs b/engine/ds-stream/src/relay.rs index 9558280..479cd09 100644 --- a/engine/ds-stream/src/relay.rs +++ b/engine/ds-stream/src/relay.rs @@ -278,6 +278,7 @@ enum ConnectionRole { Source(String), // channel name Receiver(String), // channel name Signaling(String), // channel name — WebRTC signaling + Peer(String), // channel name — bidirectional sync } /// Parse the WebSocket URI path to determine connection role and channel. @@ -290,6 +291,8 @@ fn parse_path(path: &str) -> ConnectionRole { ["stream", name] => ConnectionRole::Receiver(name.to_string()), ["signal"] => ConnectionRole::Signaling("default".to_string()), ["signal", name] => ConnectionRole::Signaling(name.to_string()), + ["peer"] => ConnectionRole::Peer("default".to_string()), + ["peer", name] => ConnectionRole::Peer(name.to_string()), _ => ConnectionRole::Receiver("default".to_string()), // legacy: `/` = receiver } } @@ -417,7 +420,7 @@ async fn handle_connection( // Get or create the channel let channel_name = match &role { - ConnectionRole::Source(n) | ConnectionRole::Receiver(n) | ConnectionRole::Signaling(n) => n.clone(), + ConnectionRole::Source(n) | ConnectionRole::Receiver(n) | ConnectionRole::Signaling(n) | ConnectionRole::Peer(n) => n.clone(), }; let channel = { @@ -470,6 +473,10 @@ async fn handle_connection( eprintln!("[relay:{channel_name}] Signaling peer connected: {addr}"); handle_signaling(ws_stream, addr, channel, &channel_name).await; } + ConnectionRole::Peer(ref _name) => { + eprintln!("[relay:{channel_name}] Peer connected: {addr}"); + handle_peer(ws_stream, addr, channel, &channel_name).await; + } } } @@ -709,6 +716,92 @@ async fn handle_signaling( eprintln!("[relay:{channel_name}] Signaling peer disconnected: {addr}"); } +/// Handle a peer connection for bidirectional sync. +/// +/// Peers are equal — every binary frame sent by any peer is broadcast to all +/// other peers on the same channel. No source/receiver distinction. +async fn handle_peer( + ws_stream: tokio_tungstenite::WebSocketStream, + addr: SocketAddr, + channel: Arc>, + channel_name: &str, +) { + let (mut ws_sink, mut ws_source) = ws_stream.split(); + + // Subscribe to the frame broadcast channel (same one sources use) + let mut frame_rx = { + let cs = channel.read().await; + cs.frame_tx.subscribe() + }; + + let frame_tx = { + let cs = channel.read().await; + cs.frame_tx.clone() + }; + + // Track peer count + { + let mut cs = channel.write().await; + cs.stats.connected_receivers += 1; + cs.stats.peak_receivers = cs.stats.peak_receivers.max(cs.stats.connected_receivers); + } + + // Send catchup state to late-joining peers + { + let cs = channel.read().await; + if cs.cache.has_state() { + let msgs = cs.cache.catchup_messages(); + eprintln!("[relay:{channel_name}] Sending {} catchup messages to peer {addr}", msgs.len()); + for msg in msgs { + let _ = ws_sink.send(Message::Binary(msg.into())).await; + } + } + } + + // Forward frames from broadcast → this peer + let channel_name_owned = channel_name.to_string(); + let send_task = tokio::spawn(async move { + loop { + match frame_rx.recv().await { + Ok(frame_data) => { + if ws_sink.send(Message::Binary(frame_data.into())).await.is_err() { + break; + } + } + Err(broadcast::error::RecvError::Lagged(n)) => { + eprintln!("[relay:{channel_name_owned}] Peer lagged by {n} frames"); + } + Err(_) => break, + } + } + }); + + // Forward frames from this peer → broadcast to all others + let channel_for_cache = channel.clone(); + let channel_name_cache = channel_name.to_string(); + while let Some(Ok(msg)) = ws_source.next().await { + if let Message::Binary(data) = msg { + let data_vec: Vec = data.into(); + // Cache for late joiners + { + let mut cs = channel_for_cache.write().await; + cs.cache.process_frame(&data_vec); + cs.stats.frames_relayed += 1; + cs.stats.bytes_relayed += data_vec.len() as u64; + } + // Broadcast to all peers (they'll filter out their own msgs by content) + let _ = frame_tx.send(data_vec); + } + } + + send_task.abort(); + { + let mut cs = channel.write().await; + cs.stats.connected_receivers = cs.stats.connected_receivers.saturating_sub(1); + } + eprintln!("[relay:{channel_name}] Peer disconnected: {addr}"); +} + // ─── Tests ─── #[cfg(test)] diff --git a/examples/todo.ds b/examples/todo.ds index 83ef5e3..cd9d32b 100644 --- a/examples/todo.ds +++ b/examples/todo.ds @@ -22,7 +22,7 @@ view app = -- Add new todo row [ input "" { bind: input_text, placeholder: "What needs to be done?" } - button "Add" { click: push(todos, input_text) } + button "Add" { click: push(todos, input_text), click: push(done, 0) } ] -- Todo list @@ -35,6 +35,6 @@ view app = -- Controls row [ text "{total} items" - button "Clear All" { click: todos = [] } + button "Clear All" { click: todos = [], click: done = [] } ] ]