feat: peer mode relay + self-echo filtering for true bidirectional sync
- Added /peer/{name} route to relay: all clients are equal peers
- handle_peer: binary broadcast to all other peers, catchup for late joiners
- Simplified runtime: single /peer/ WS replaces dual source+receiver
- _peerId: random 8-char ID prevents self-echo from broadcast
- _pid in each diff JSON, filtered in _applyRemoteDiff
This commit is contained in:
parent
5f09886c3c
commit
a943d2e2e9
3 changed files with 112 additions and 55 deletions
|
|
@ -1819,16 +1819,20 @@ const DS = (() => {
|
|||
// ── Signal registry for bidirectional sync ──
|
||||
var _signalRegistry = {};
|
||||
var _applyingRemoteDiff = false;
|
||||
var _peerId = Math.random().toString(36).substr(2, 8); // unique per client
|
||||
|
||||
function _registerSignal(name, sig) {
|
||||
_signalRegistry[name] = sig;
|
||||
}
|
||||
|
||||
function _applyRemoteDiff(json) {
|
||||
_applyingRemoteDiff = true;
|
||||
try {
|
||||
var data = JSON.parse(json);
|
||||
// Ignore our own diffs echoed back by the relay
|
||||
if (data._pid === _peerId) return;
|
||||
_applyingRemoteDiff = true;
|
||||
for (var name in data) {
|
||||
if (name === '_pid') continue;
|
||||
var sig = _signalRegistry[name];
|
||||
if (sig) {
|
||||
sig.value = data[name];
|
||||
|
|
@ -1843,50 +1847,27 @@ const DS = (() => {
|
|||
_streamMode = mode || 'signal';
|
||||
_streamStart = performance.now();
|
||||
|
||||
// Ensure proper source path: ws://host:port → ws://host:port/source/default
|
||||
var sourceUrl = url;
|
||||
if (!url.match(/\/(source|stream)\//)) {
|
||||
sourceUrl = url.replace(/\/?$/, '/source/default');
|
||||
// Use peer path for bidirectional sync: ws://host:port → ws://host:port/peer/default
|
||||
var peerUrl = url;
|
||||
if (!url.match(/\/(source|stream|peer)\//)) {
|
||||
peerUrl = url.replace(/\/?$/, '/peer/default');
|
||||
}
|
||||
_streamWs = new WebSocket(sourceUrl);
|
||||
_streamWs = new WebSocket(peerUrl);
|
||||
_streamWs.binaryType = 'arraybuffer';
|
||||
_streamWs.onmessage = function(e) {
|
||||
if (!(e.data instanceof ArrayBuffer) || e.data.byteLength < HEADER_SIZE) return;
|
||||
var bytes = new Uint8Array(e.data);
|
||||
var view = new DataView(bytes.buffer);
|
||||
var type = view.getUint8(0);
|
||||
var flags = view.getUint8(1);
|
||||
// Source receives input from receivers (relay forwards receiver input to source)
|
||||
if (flags & 0x01) _handleRemoteInput(type, bytes.subarray(HEADER_SIZE));
|
||||
};
|
||||
_streamWs.onclose = function() { setTimeout(function() { _initStream(url, mode); }, 2000); };
|
||||
console.log('[ds-stream] Source connected:', sourceUrl);
|
||||
|
||||
// Also open a receiver connection for bidirectional sync
|
||||
var receiverUrl = sourceUrl.replace('/source/', '/stream/');
|
||||
_initStreamReceiver(receiverUrl);
|
||||
}
|
||||
|
||||
var _recvWs = null;
|
||||
|
||||
// Receiver WS — listens for diffs from other sources via relay
|
||||
function _initStreamReceiver(url) {
|
||||
_recvWs = new WebSocket(url);
|
||||
_recvWs.binaryType = 'arraybuffer';
|
||||
_recvWs.onmessage = function(e) {
|
||||
if (!(e.data instanceof ArrayBuffer) || e.data.byteLength < HEADER_SIZE) return;
|
||||
var bytes = new Uint8Array(e.data);
|
||||
var view = new DataView(bytes.buffer);
|
||||
var type = view.getUint8(0);
|
||||
// Signal diff from source — apply to local signals
|
||||
// Signal diff from another peer — apply to local signals
|
||||
if (type === 0x31) {
|
||||
var payload = bytes.subarray(HEADER_SIZE);
|
||||
var json = new TextDecoder().decode(payload);
|
||||
_applyRemoteDiff(json);
|
||||
}
|
||||
};
|
||||
_recvWs.onclose = function() { setTimeout(function() { _initStreamReceiver(url); }, 2000); };
|
||||
console.log('[ds-stream] Receiver connected:', url);
|
||||
_streamWs.onclose = function() { setTimeout(function() { _initStream(url, mode); }, 2000); };
|
||||
console.log('[ds-stream] Peer connected:', peerUrl);
|
||||
}
|
||||
|
||||
function _streamSend(type, flags, payload) {
|
||||
|
|
@ -1904,28 +1885,11 @@ const DS = (() => {
|
|||
}
|
||||
|
||||
function _streamDiff(name, value) {
|
||||
if (_streamMode !== 'signal') return;
|
||||
if (!_streamWs || _streamWs.readyState !== 1 || _streamMode !== 'signal') return;
|
||||
if (_applyingRemoteDiff) return; // prevent echo loops
|
||||
var obj = {};
|
||||
var obj = { _pid: _peerId };
|
||||
obj[name] = (typeof value === 'object' && value !== null && 'value' in value) ? value.value : value;
|
||||
var payload = new TextEncoder().encode(JSON.stringify(obj));
|
||||
// Send via source WS (broadcasts to all receivers)
|
||||
if (_streamWs && _streamWs.readyState === 1) {
|
||||
_streamSend(0x31, 0, payload);
|
||||
}
|
||||
// Also send via receiver WS with INPUT flag (relay forwards to source for rebroadcast)
|
||||
if (_recvWs && _recvWs.readyState === 1) {
|
||||
var ts = (performance.now() - _streamStart) | 0;
|
||||
var msg = new Uint8Array(HEADER_SIZE + payload.length);
|
||||
var v = new DataView(msg.buffer);
|
||||
v.setUint8(0, 0x31);
|
||||
v.setUint8(1, 0x01); // IS_INPUT flag
|
||||
v.setUint16(2, (_streamSeq++) & 0xFFFF, true);
|
||||
v.setUint32(4, ts, true);
|
||||
v.setUint32(12, payload.length, true);
|
||||
msg.set(payload, HEADER_SIZE);
|
||||
_recvWs.send(msg.buffer);
|
||||
}
|
||||
_streamSend(0x31, 0, new TextEncoder().encode(JSON.stringify(obj)));
|
||||
}
|
||||
|
||||
function _streamSync(signals) {
|
||||
|
|
|
|||
|
|
@ -278,6 +278,7 @@ enum ConnectionRole {
|
|||
Source(String), // channel name
|
||||
Receiver(String), // channel name
|
||||
Signaling(String), // channel name — WebRTC signaling
|
||||
Peer(String), // channel name — bidirectional sync
|
||||
}
|
||||
|
||||
/// Parse the WebSocket URI path to determine connection role and channel.
|
||||
|
|
@ -290,6 +291,8 @@ fn parse_path(path: &str) -> ConnectionRole {
|
|||
["stream", name] => ConnectionRole::Receiver(name.to_string()),
|
||||
["signal"] => ConnectionRole::Signaling("default".to_string()),
|
||||
["signal", name] => ConnectionRole::Signaling(name.to_string()),
|
||||
["peer"] => ConnectionRole::Peer("default".to_string()),
|
||||
["peer", name] => ConnectionRole::Peer(name.to_string()),
|
||||
_ => ConnectionRole::Receiver("default".to_string()), // legacy: `/` = receiver
|
||||
}
|
||||
}
|
||||
|
|
@ -417,7 +420,7 @@ async fn handle_connection(
|
|||
|
||||
// Get or create the channel
|
||||
let channel_name = match &role {
|
||||
ConnectionRole::Source(n) | ConnectionRole::Receiver(n) | ConnectionRole::Signaling(n) => n.clone(),
|
||||
ConnectionRole::Source(n) | ConnectionRole::Receiver(n) | ConnectionRole::Signaling(n) | ConnectionRole::Peer(n) => n.clone(),
|
||||
};
|
||||
|
||||
let channel = {
|
||||
|
|
@ -470,6 +473,10 @@ async fn handle_connection(
|
|||
eprintln!("[relay:{channel_name}] Signaling peer connected: {addr}");
|
||||
handle_signaling(ws_stream, addr, channel, &channel_name).await;
|
||||
}
|
||||
ConnectionRole::Peer(ref _name) => {
|
||||
eprintln!("[relay:{channel_name}] Peer connected: {addr}");
|
||||
handle_peer(ws_stream, addr, channel, &channel_name).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -709,6 +716,92 @@ async fn handle_signaling(
|
|||
eprintln!("[relay:{channel_name}] Signaling peer disconnected: {addr}");
|
||||
}
|
||||
|
||||
/// Handle a peer connection for bidirectional sync.
|
||||
///
|
||||
/// Peers are equal — every binary frame sent by any peer is broadcast to all
|
||||
/// other peers on the same channel. No source/receiver distinction.
|
||||
async fn handle_peer(
|
||||
ws_stream: tokio_tungstenite::WebSocketStream<TcpStream>,
|
||||
addr: SocketAddr,
|
||||
channel: Arc<RwLock<ChannelState>>,
|
||||
channel_name: &str,
|
||||
) {
|
||||
let (mut ws_sink, mut ws_source) = ws_stream.split();
|
||||
|
||||
// Subscribe to the frame broadcast channel (same one sources use)
|
||||
let mut frame_rx = {
|
||||
let cs = channel.read().await;
|
||||
cs.frame_tx.subscribe()
|
||||
};
|
||||
|
||||
let frame_tx = {
|
||||
let cs = channel.read().await;
|
||||
cs.frame_tx.clone()
|
||||
};
|
||||
|
||||
// Track peer count
|
||||
{
|
||||
let mut cs = channel.write().await;
|
||||
cs.stats.connected_receivers += 1;
|
||||
cs.stats.peak_receivers = cs.stats.peak_receivers.max(cs.stats.connected_receivers);
|
||||
}
|
||||
|
||||
// Send catchup state to late-joining peers
|
||||
{
|
||||
let cs = channel.read().await;
|
||||
if cs.cache.has_state() {
|
||||
let msgs = cs.cache.catchup_messages();
|
||||
eprintln!("[relay:{channel_name}] Sending {} catchup messages to peer {addr}", msgs.len());
|
||||
for msg in msgs {
|
||||
let _ = ws_sink.send(Message::Binary(msg.into())).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Forward frames from broadcast → this peer
|
||||
let channel_name_owned = channel_name.to_string();
|
||||
let send_task = tokio::spawn(async move {
|
||||
loop {
|
||||
match frame_rx.recv().await {
|
||||
Ok(frame_data) => {
|
||||
if ws_sink.send(Message::Binary(frame_data.into())).await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(broadcast::error::RecvError::Lagged(n)) => {
|
||||
eprintln!("[relay:{channel_name_owned}] Peer lagged by {n} frames");
|
||||
}
|
||||
Err(_) => break,
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Forward frames from this peer → broadcast to all others
|
||||
let channel_for_cache = channel.clone();
|
||||
let channel_name_cache = channel_name.to_string();
|
||||
while let Some(Ok(msg)) = ws_source.next().await {
|
||||
if let Message::Binary(data) = msg {
|
||||
let data_vec: Vec<u8> = data.into();
|
||||
// Cache for late joiners
|
||||
{
|
||||
let mut cs = channel_for_cache.write().await;
|
||||
cs.cache.process_frame(&data_vec);
|
||||
cs.stats.frames_relayed += 1;
|
||||
cs.stats.bytes_relayed += data_vec.len() as u64;
|
||||
}
|
||||
// Broadcast to all peers (they'll filter out their own msgs by content)
|
||||
let _ = frame_tx.send(data_vec);
|
||||
}
|
||||
}
|
||||
|
||||
send_task.abort();
|
||||
{
|
||||
let mut cs = channel.write().await;
|
||||
cs.stats.connected_receivers = cs.stats.connected_receivers.saturating_sub(1);
|
||||
}
|
||||
eprintln!("[relay:{channel_name}] Peer disconnected: {addr}");
|
||||
}
|
||||
|
||||
// ─── Tests ───
|
||||
|
||||
#[cfg(test)]
|
||||
|
|
|
|||
|
|
@ -22,7 +22,7 @@ view app =
|
|||
-- Add new todo
|
||||
row [
|
||||
input "" { bind: input_text, placeholder: "What needs to be done?" }
|
||||
button "Add" { click: push(todos, input_text) }
|
||||
button "Add" { click: push(todos, input_text), click: push(done, 0) }
|
||||
]
|
||||
|
||||
-- Todo list
|
||||
|
|
@ -35,6 +35,6 @@ view app =
|
|||
-- Controls
|
||||
row [
|
||||
text "{total} items"
|
||||
button "Clear All" { click: todos = [] }
|
||||
button "Clear All" { click: todos = [], click: done = [] }
|
||||
]
|
||||
]
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue