diff --git a/compiler/ds-codegen/src/js_emitter.rs b/compiler/ds-codegen/src/js_emitter.rs index e7437e4..9257104 100644 --- a/compiler/ds-codegen/src/js_emitter.rs +++ b/compiler/ds-codegen/src/js_emitter.rs @@ -1842,7 +1842,13 @@ const DS = (() => { function _initStream(url, mode) { _streamMode = mode || 'signal'; _streamStart = performance.now(); - _streamWs = new WebSocket(url); + + // Ensure proper source path: ws://host:port → ws://host:port/source/default + var sourceUrl = url; + if (!url.match(/\/(source|stream)\//)) { + sourceUrl = url.replace(/\/?$/, '/source/default'); + } + _streamWs = new WebSocket(sourceUrl); _streamWs.binaryType = 'arraybuffer'; _streamWs.onmessage = function(e) { if (!(e.data instanceof ArrayBuffer) || e.data.byteLength < HEADER_SIZE) return; @@ -1850,33 +1856,36 @@ const DS = (() => { var view = new DataView(bytes.buffer); var type = view.getUint8(0); var flags = view.getUint8(1); + // Source receives input from receivers (relay forwards receiver input to source) if (flags & 0x01) _handleRemoteInput(type, bytes.subarray(HEADER_SIZE)); }; _streamWs.onclose = function() { setTimeout(function() { _initStream(url, mode); }, 2000); }; - console.log('[ds-stream] Source connected:', url, 'mode:', mode); + console.log('[ds-stream] Source connected:', sourceUrl); // Also open a receiver connection for bidirectional sync - var receiverUrl = url.replace(/\/source/, '/stream'); + var receiverUrl = sourceUrl.replace('/source/', '/stream/'); _initStreamReceiver(receiverUrl); } - // Receiver WS — listens for diffs from other clients via relay + var _recvWs = null; + + // Receiver WS — listens for diffs from other sources via relay function _initStreamReceiver(url) { - var recvWs = new WebSocket(url); - recvWs.binaryType = 'arraybuffer'; - recvWs.onmessage = function(e) { + _recvWs = new WebSocket(url); + _recvWs.binaryType = 'arraybuffer'; + _recvWs.onmessage = function(e) { if (!(e.data instanceof ArrayBuffer) || e.data.byteLength < HEADER_SIZE) return; var bytes = new Uint8Array(e.data); var view = new DataView(bytes.buffer); var type = view.getUint8(0); - // Signal diff frame + // Signal diff from source — apply to local signals if (type === 0x31) { var payload = bytes.subarray(HEADER_SIZE); var json = new TextDecoder().decode(payload); _applyRemoteDiff(json); } }; - recvWs.onclose = function() { setTimeout(function() { _initStreamReceiver(url); }, 2000); }; + _recvWs.onclose = function() { setTimeout(function() { _initStreamReceiver(url); }, 2000); }; console.log('[ds-stream] Receiver connected:', url); } @@ -1895,11 +1904,28 @@ const DS = (() => { } function _streamDiff(name, value) { - if (!_streamWs || _streamMode !== 'signal') return; + if (_streamMode !== 'signal') return; if (_applyingRemoteDiff) return; // prevent echo loops var obj = {}; obj[name] = (typeof value === 'object' && value !== null && 'value' in value) ? value.value : value; - _streamSend(0x31, 0, new TextEncoder().encode(JSON.stringify(obj))); + var payload = new TextEncoder().encode(JSON.stringify(obj)); + // Send via source WS (broadcasts to all receivers) + if (_streamWs && _streamWs.readyState === 1) { + _streamSend(0x31, 0, payload); + } + // Also send via receiver WS with INPUT flag (relay forwards to source for rebroadcast) + if (_recvWs && _recvWs.readyState === 1) { + var ts = (performance.now() - _streamStart) | 0; + var msg = new Uint8Array(HEADER_SIZE + payload.length); + var v = new DataView(msg.buffer); + v.setUint8(0, 0x31); + v.setUint8(1, 0x01); // IS_INPUT flag + v.setUint16(2, (_streamSeq++) & 0xFFFF, true); + v.setUint32(4, ts, true); + v.setUint32(12, payload.length, true); + msg.set(payload, HEADER_SIZE); + _recvWs.send(msg.buffer); + } } function _streamSync(signals) { @@ -1947,9 +1973,13 @@ const DS = (() => { emit('remote_scroll', { dx: view.getInt16(0, true), dy: view.getInt16(2, true) }); break; case 0x31: { - // Signal diff — apply to local signals + // Signal diff from a receiver — apply locally AND rebroadcast to all receivers var json = new TextDecoder().decode(payload); _applyRemoteDiff(json); + // Rebroadcast via source WS so all other receivers get it + if (_streamWs && _streamWs.readyState === 1) { + _streamSend(0x31, 0, payload); + } break; } }