feat: bidirectional signal streaming sync

- _signalRegistry: maps signal names → signal objects
- _registerSignal: called after each DS.signal() declaration
- _applyRemoteDiff: JSON diff → update local signals + flush
- _initStreamReceiver: parallel receiver WS for incoming diffs
- Echo loop guard: _applyingRemoteDiff prevents re-broadcasting
- 0x31 handler in _handleRemoteInput for signal diff frames

Changes sync both directions: laptop → phone, phone → laptop.
This commit is contained in:
enzotar 2026-02-25 21:00:57 -08:00
parent 0369bf831f
commit 69c7ff1e22
2 changed files with 58 additions and 2 deletions

View file

@ -104,6 +104,8 @@ impl JsEmitter {
}
};
self.emit_line(&format!("const {} = DS.signal({});", node.name, init));
// Register signal for bidirectional streaming sync
self.emit_line(&format!("DS._registerSignal(\"{}\", {});", node.name, node.name));
}
SignalKind::Derived => {
// Find the let declaration to get the expression
@ -1814,6 +1816,29 @@ const DS = (() => {
};
}
// ── Signal registry for bidirectional sync ──
var _signalRegistry = {};
var _applyingRemoteDiff = false;
function _registerSignal(name, sig) {
_signalRegistry[name] = sig;
}
function _applyRemoteDiff(json) {
_applyingRemoteDiff = true;
try {
var data = JSON.parse(json);
for (var name in data) {
var sig = _signalRegistry[name];
if (sig) {
sig.value = data[name];
}
}
flush();
} catch(e) { console.warn('[ds-stream] bad diff:', e); }
_applyingRemoteDiff = false;
}
function _initStream(url, mode) {
_streamMode = mode || 'signal';
_streamStart = performance.now();
@ -1829,6 +1854,30 @@ const DS = (() => {
};
_streamWs.onclose = function() { setTimeout(function() { _initStream(url, mode); }, 2000); };
console.log('[ds-stream] Source connected:', url, 'mode:', mode);
// Also open a receiver connection for bidirectional sync
var receiverUrl = url.replace(/\/source/, '/stream');
_initStreamReceiver(receiverUrl);
}
// Receiver WS — listens for diffs from other clients via relay
function _initStreamReceiver(url) {
var recvWs = new WebSocket(url);
recvWs.binaryType = 'arraybuffer';
recvWs.onmessage = function(e) {
if (!(e.data instanceof ArrayBuffer) || e.data.byteLength < HEADER_SIZE) return;
var bytes = new Uint8Array(e.data);
var view = new DataView(bytes.buffer);
var type = view.getUint8(0);
// Signal diff frame
if (type === 0x31) {
var payload = bytes.subarray(HEADER_SIZE);
var json = new TextDecoder().decode(payload);
_applyRemoteDiff(json);
}
};
recvWs.onclose = function() { setTimeout(function() { _initStreamReceiver(url); }, 2000); };
console.log('[ds-stream] Receiver connected:', url);
}
function _streamSend(type, flags, payload) {
@ -1847,6 +1896,7 @@ const DS = (() => {
function _streamDiff(name, value) {
if (!_streamWs || _streamMode !== 'signal') return;
if (_applyingRemoteDiff) return; // prevent echo loops
var obj = {};
obj[name] = (typeof value === 'object' && value !== null && 'value' in value) ? value.value : value;
_streamSend(0x31, 0, new TextEncoder().encode(JSON.stringify(obj)));
@ -1896,6 +1946,12 @@ const DS = (() => {
case 0x50:
emit('remote_scroll', { dx: view.getInt16(0, true), dy: view.getInt16(2, true) });
break;
case 0x31: {
// Signal diff — apply to local signals
var json = new TextDecoder().decode(payload);
_applyRemoteDiff(json);
break;
}
}
}
@ -2107,7 +2163,7 @@ const DS = (() => {
scene: scene, circle: circle, rect: rect, line: line,
_initStream: _initStream, _streamDiff: _streamDiff, _streamSync: _streamSync,
_streamSceneState: _streamSceneState, _connectStream: _connectStream,
_initWebRTC: _initWebRTC,
_initWebRTC: _initWebRTC, _registerSignal: _registerSignal,
Signal: Signal, Derived: Derived, Effect: Effect, Spring: Spring };
Object.defineProperty(_ds, '_streamWs', { get: function() { return _streamWs; } });
Object.defineProperty(_ds, '_rtcDc', { get: function() { return _rtcDc; } });

View file

@ -12,7 +12,7 @@ let filter_mode = "all"
let total = len(todos)
-- Stream: any device on the network can view / interact
stream todo on "ws://localhost:9100" { mode: signal }
stream todo on "ws://192.168.1.235:9100" { mode: signal }
view app =
column [