diff --git a/compiler/ds-codegen/src/js_emitter.rs b/compiler/ds-codegen/src/js_emitter.rs index 3f12807..e7437e4 100644 --- a/compiler/ds-codegen/src/js_emitter.rs +++ b/compiler/ds-codegen/src/js_emitter.rs @@ -104,6 +104,8 @@ impl JsEmitter { } }; self.emit_line(&format!("const {} = DS.signal({});", node.name, init)); + // Register signal for bidirectional streaming sync + self.emit_line(&format!("DS._registerSignal(\"{}\", {});", node.name, node.name)); } SignalKind::Derived => { // Find the let declaration to get the expression @@ -1814,6 +1816,29 @@ const DS = (() => { }; } + // ── Signal registry for bidirectional sync ── + var _signalRegistry = {}; + var _applyingRemoteDiff = false; + + function _registerSignal(name, sig) { + _signalRegistry[name] = sig; + } + + function _applyRemoteDiff(json) { + _applyingRemoteDiff = true; + try { + var data = JSON.parse(json); + for (var name in data) { + var sig = _signalRegistry[name]; + if (sig) { + sig.value = data[name]; + } + } + flush(); + } catch(e) { console.warn('[ds-stream] bad diff:', e); } + _applyingRemoteDiff = false; + } + function _initStream(url, mode) { _streamMode = mode || 'signal'; _streamStart = performance.now(); @@ -1829,6 +1854,30 @@ const DS = (() => { }; _streamWs.onclose = function() { setTimeout(function() { _initStream(url, mode); }, 2000); }; console.log('[ds-stream] Source connected:', url, 'mode:', mode); + + // Also open a receiver connection for bidirectional sync + var receiverUrl = url.replace(/\/source/, '/stream'); + _initStreamReceiver(receiverUrl); + } + + // Receiver WS — listens for diffs from other clients via relay + function _initStreamReceiver(url) { + var recvWs = new WebSocket(url); + recvWs.binaryType = 'arraybuffer'; + recvWs.onmessage = function(e) { + if (!(e.data instanceof ArrayBuffer) || e.data.byteLength < HEADER_SIZE) return; + var bytes = new Uint8Array(e.data); + var view = new DataView(bytes.buffer); + var type = view.getUint8(0); + // Signal diff frame + if (type === 0x31) { + var payload = bytes.subarray(HEADER_SIZE); + var json = new TextDecoder().decode(payload); + _applyRemoteDiff(json); + } + }; + recvWs.onclose = function() { setTimeout(function() { _initStreamReceiver(url); }, 2000); }; + console.log('[ds-stream] Receiver connected:', url); } function _streamSend(type, flags, payload) { @@ -1847,6 +1896,7 @@ const DS = (() => { function _streamDiff(name, value) { if (!_streamWs || _streamMode !== 'signal') return; + if (_applyingRemoteDiff) return; // prevent echo loops var obj = {}; obj[name] = (typeof value === 'object' && value !== null && 'value' in value) ? value.value : value; _streamSend(0x31, 0, new TextEncoder().encode(JSON.stringify(obj))); @@ -1896,6 +1946,12 @@ const DS = (() => { case 0x50: emit('remote_scroll', { dx: view.getInt16(0, true), dy: view.getInt16(2, true) }); break; + case 0x31: { + // Signal diff — apply to local signals + var json = new TextDecoder().decode(payload); + _applyRemoteDiff(json); + break; + } } } @@ -2107,7 +2163,7 @@ const DS = (() => { scene: scene, circle: circle, rect: rect, line: line, _initStream: _initStream, _streamDiff: _streamDiff, _streamSync: _streamSync, _streamSceneState: _streamSceneState, _connectStream: _connectStream, - _initWebRTC: _initWebRTC, + _initWebRTC: _initWebRTC, _registerSignal: _registerSignal, Signal: Signal, Derived: Derived, Effect: Effect, Spring: Spring }; Object.defineProperty(_ds, '_streamWs', { get: function() { return _streamWs; } }); Object.defineProperty(_ds, '_rtcDc', { get: function() { return _rtcDc; } }); diff --git a/examples/todo.ds b/examples/todo.ds index 63a65c2..83ef5e3 100644 --- a/examples/todo.ds +++ b/examples/todo.ds @@ -12,7 +12,7 @@ let filter_mode = "all" let total = len(todos) -- Stream: any device on the network can view / interact -stream todo on "ws://localhost:9100" { mode: signal } +stream todo on "ws://192.168.1.235:9100" { mode: signal } view app = column [