From 598ecde59c9481ccb37fe35d6c32fb0c6fe5229b Mon Sep 17 00:00:00 2001 From: enzotar Date: Thu, 26 Feb 2026 18:09:14 -0800 Subject: [PATCH] feat: comprehensive streaming improvements MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Runtime _connectStream improvements: - Connection status as reactive signals: _connected, _latency, _frames, _reconnects injected on stream proxy so UIs can show connection health - Fixed RLE decoder: 2-byte LE count (was 1-byte, mismatched relay encoder) - Schema caching: 0x32 SchemaAnnounce frames now parsed and cached - RTT tracking: receivers send periodic pings (5s), measure round-trip latency - Better reconnect logging: includes URL and attempt count Relay tests (57 total): - catchup_merges_multiple_diffs: sync + 3 diffs → 1 merged frame - catchup_diffs_only_no_sync: diffs without sync → merged frame - catchup_preserves_version_counters: conflict resolution versions kept New example: - timer-multi-action.ds: every timer + multi-action buttons verified Documentation: - STREAM_COMPOSITION.md: 4 new sections (Diff Batching, Connection Status, RTT, Relay Merging) - Updated example table with streaming-dashboard.ds and timer-multi-action.ds All 9 examples pass regression (44-70KB each) --- STREAM_COMPOSITION.md | 51 ++++++++++++++++-- compiler/ds-codegen/src/js_emitter.rs | 68 +++++++++++++++++++++--- engine/ds-stream/src/relay.rs | 74 +++++++++++++++++++++++++++ examples/timer-multi-action.ds | 23 +++++++++ 4 files changed, 203 insertions(+), 13 deletions(-) create mode 100644 examples/timer-multi-action.ds diff --git a/STREAM_COMPOSITION.md b/STREAM_COMPOSITION.md index 05a37f8..727e853 100644 --- a/STREAM_COMPOSITION.md +++ b/STREAM_COMPOSITION.md @@ -172,13 +172,51 @@ All frames share a 16-byte header: cargo run -p ds-stream ``` -The relay is a stateless WebSocket router: +The relay is a production-grade WebSocket router: - Routes frames from sources/peers to receivers on the same channel -- Caches latest state for late-joining receivers +- **Diff merging**: Consolidates accumulated signal diffs into ONE catchup frame for late joiners - Caches schema announcements (0x32) per channel - Applies per-receiver signal filters (0x33) before forwarding - Auto-creates channels on first connection - Cleans up idle channels after grace period +- Source reconnect grace: cache preserved during reconnection window + +## Runtime Features + +### Diff Batching + +Multiple signal changes in a single event loop tick are coalesced into one WebSocket frame: +``` +-- Click fires 3 signal changes, but sends only 1 WS frame: +button "Happy" { click: mood = "happy"; color = "green"; energy += 10 } +``` + +### Connection Status + +The `stream from` proxy injects reactive connection metadata: +``` +let counter = stream from "ws://localhost:9100/stream/counter" + +-- Available fields: +-- counter._connected boolean (true when WS is open) +-- counter._latency RTT in milliseconds (ping/pong every 5s) +-- counter._frames total frames received +-- counter._reconnects reconnect attempt count +``` + +### RTT Tracking + +Both sources and receivers measure round-trip latency via periodic ping/pong (every 5s). +The `_latency` field updates reactively on each measurement. + +### Relay Diff Merging + +Late-joining receivers receive a single consolidated sync frame instead of replaying +hundreds of individual diffs: +``` +-- Before: receiver gets sync + 500 individual diffs +-- After: receiver gets 1 merged sync frame with all changes applied +``` ## Running the Demo @@ -195,9 +233,9 @@ dreamstack stream examples/streaming-mood.ds --port 3004 # Terminal 6: Start aggregator (Layer 1) dreamstack stream examples/compose-metrics.ds --port 3006 -# Terminal 7: Build and serve master dashboard (Layer 2) -dreamstack build examples/compose-master.ds -o /tmp/build-master -python3 -m http.server 3007 -d /tmp/build-master +# Terminal 7: Build and serve dashboard +dreamstack build examples/streaming-dashboard.ds -o /tmp/build-dash +python3 -m http.server 3007 -d /tmp/build-dash ``` Open http://localhost:3007 to see all streams composited into one dashboard. @@ -210,6 +248,9 @@ Open http://localhost:3007 to see all streams composited into one dashboard. | `streaming-clock.ds` | Source | `/peer/clock` | hours, minutes, seconds | | `streaming-stats.ds` | Source | `/peer/stats` | total, average, max | | `streaming-mood.ds` | Source | `/peer/mood` | mood, energy, color | +| `streaming-dashboard.ds` | Receiver | 3 streams | Components + live data | | `compose-dashboard.ds` | Receiver | 4 streams | Flat dashboard | | `compose-metrics.ds` | Receiver+Source | 3 in, 1 out | uptime, events, status | | `compose-master.ds` | Receiver | 2 streams | Chained dashboard | +| `timer-multi-action.ds` | Timer | standalone | ticks, status, elapsed | + diff --git a/compiler/ds-codegen/src/js_emitter.rs b/compiler/ds-codegen/src/js_emitter.rs index f26155f..3bdae7f 100644 --- a/compiler/ds-codegen/src/js_emitter.rs +++ b/compiler/ds-codegen/src/js_emitter.rs @@ -2885,15 +2885,24 @@ const DS = (() => { var _csStats = { frames: 0, bytes: 0, reconnects: 0 }; var _csPixelBuffer = null; var _csSchema = null; // cached schema from 0x32 + var _csConnected = false; + var _csLastPingTime = 0; + var _csLatency = 0; + // RLE decoder — 2-byte LE count (matches relay's codec::rle_encode) function _csRleDecode(data) { var out = []; var i = 0; while (i < data.length) { - if (data[i] === 0 && i + 1 < data.length) { - var count = data[i + 1]; + if (data[i] === 0 && i + 2 < data.length) { + // 0x00 followed by 2-byte little-endian count + var count = data[i + 1] | (data[i + 2] << 8); for (var j = 0; j < count; j++) out.push(0); - i += 2; + i += 3; + } else if (data[i] === 0) { + // Trailing 0x00 without enough bytes for count — output as literal + out.push(0); + i++; } else { out.push(data[i]); i++; @@ -2917,8 +2926,15 @@ const DS = (() => { _csWs = new WebSocket(_csUrl); _csWs.binaryType = 'arraybuffer'; _csWs.onopen = function() { - console.log('[ds-stream] Receiver connected:', url); + console.log('[ds-stream] Receiver connected:', _csUrl); _csReconnectDelay = 1000; + _csConnected = true; + // Update connection status on signal proxy + var cur = state._value || {}; + state.value = Object.assign({}, cur, { + _connected: true, _latency: _csLatency, + _frames: _csStats.frames, _reconnects: _csStats.reconnects + }); // Send subscribe filter (0x33) if select is set if (_csSelect.length > 0) { var filterPayload = new TextEncoder().encode(JSON.stringify({ select: _csSelect })); @@ -2930,6 +2946,19 @@ const DS = (() => { msg.set(filterPayload, HEADER_SIZE); _csWs.send(msg.buffer); } + // Start periodic ping for RTT measurement + if (!_csWs._pingInterval) { + _csWs._pingInterval = setInterval(function() { + if (_csWs && _csWs.readyState === 1) { + _csLastPingTime = performance.now(); + var pingPayload = new Uint8Array(0); + var msg = new Uint8Array(HEADER_SIZE); + var v = new DataView(msg.buffer); + v.setUint8(0, 0xFE); // Ping + _csWs.send(msg.buffer); + } + }, 5000); + } }; _csWs.onmessage = function(e) { if (!(e.data instanceof ArrayBuffer) || e.data.byteLength < HEADER_SIZE) return; @@ -2959,6 +2988,10 @@ const DS = (() => { delete newState._v; // Apply select filter newState = _csFilter(newState); + // Inject connection metadata + newState._connected = _csConnected; + newState._latency = _csLatency; + newState._frames = _csStats.frames; if (type === 0x30) { state.value = newState; } else { @@ -2966,6 +2999,12 @@ const DS = (() => { } } catch(ex) {} break; + case 0x32: // SchemaAnnounce — cache signal names + try { + _csSchema = JSON.parse(new TextDecoder().decode(pl)); + console.log('[ds-stream] Schema:', _csSchema.signals || []); + } catch(ex) {} + break; case 0x01: // Pixels (keyframe) case 0x04: // Keyframe var w = view.getUint16(8, true); @@ -2982,8 +3021,13 @@ const DS = (() => { emit('stream_frame', { type: 'delta', pixels: _csPixelBuffer }); } break; - case 0xFE: // Ping - break; // keepalive, ignore + case 0xFE: // Pong — calculate RTT + if (_csLastPingTime > 0) { + _csLatency = Math.round(performance.now() - _csLastPingTime); + var cur = state._value || {}; + state.value = Object.assign({}, cur, { _latency: _csLatency }); + } + break; case 0xFF: // StreamEnd console.log('[ds-stream] Stream ended by source'); emit('stream_end', {}); @@ -2991,13 +3035,21 @@ const DS = (() => { } }; _csWs.onclose = function() { + _csConnected = false; _csStats.reconnects++; var delay = Math.min(_csReconnectDelay, 10000); - console.log('[ds-stream] Disconnected, reconnecting in', delay, 'ms'); + console.log('[ds-stream] Receiver disconnected from', _csUrl, '— reconnecting in', delay, 'ms (attempt', _csStats.reconnects, ')'); + // Update connection status on signal proxy + var cur = state._value || {}; + state.value = Object.assign({}, cur, { + _connected: false, _reconnects: _csStats.reconnects + }); setTimeout(_csConnect, delay); _csReconnectDelay = Math.min(_csReconnectDelay * 1.5, 10000); }; - _csWs.onerror = function() {}; + _csWs.onerror = function() { + _csConnected = false; + }; } _csConnect(); return state; diff --git a/engine/ds-stream/src/relay.rs b/engine/ds-stream/src/relay.rs index 979aa2a..8f63dab 100644 --- a/engine/ds-stream/src/relay.rs +++ b/engine/ds-stream/src/relay.rs @@ -1253,4 +1253,78 @@ mod tests { cs.source_disconnect_time = Some(Instant::now() - Duration::from_secs(60)); assert!(cs.grace_period_expired(30)); } + + // ─── Diff Merging Tests ─── + + #[test] + fn catchup_merges_multiple_diffs() { + let mut cache = StateCache::default(); + + // Start with sync: { count: 0, name: "test" } + let sync = crate::codec::signal_sync_frame(0, 0, br#"{"count":0,"name":"test"}"#); + cache.process_frame(&sync); + + // Apply 3 diffs + let d1 = crate::codec::signal_diff_frame(1, 100, br#"{"count":1}"#); + let d2 = crate::codec::signal_diff_frame(2, 200, br#"{"count":2}"#); + let d3 = crate::codec::signal_diff_frame(3, 300, br#"{"count":3,"name":"updated"}"#); + cache.process_frame(&d1); + cache.process_frame(&d2); + cache.process_frame(&d3); + + // Should produce 1 merged frame, not 4 + let catchup = cache.catchup_messages(); + assert_eq!(catchup.len(), 1); + + // Verify merged state + let frame = &catchup[0]; + let payload_len = u32::from_le_bytes([frame[12], frame[13], frame[14], frame[15]]) as usize; + let payload = &frame[HEADER_SIZE..HEADER_SIZE + payload_len]; + let merged: serde_json::Value = serde_json::from_slice(payload).unwrap(); + assert_eq!(merged["count"], 3); + assert_eq!(merged["name"], "updated"); + } + + #[test] + fn catchup_diffs_only_no_sync() { + let mut cache = StateCache::default(); + + // Only diffs, no initial sync (first connection scenario) + let d1 = crate::codec::signal_diff_frame(0, 0, br#"{"mood":"happy"}"#); + let d2 = crate::codec::signal_diff_frame(1, 100, br#"{"energy":75}"#); + cache.process_frame(&d1); + cache.process_frame(&d2); + + // Should produce 1 merged frame + let catchup = cache.catchup_messages(); + assert_eq!(catchup.len(), 1); + + let frame = &catchup[0]; + let payload_len = u32::from_le_bytes([frame[12], frame[13], frame[14], frame[15]]) as usize; + let payload = &frame[HEADER_SIZE..HEADER_SIZE + payload_len]; + let merged: serde_json::Value = serde_json::from_slice(payload).unwrap(); + assert_eq!(merged["mood"], "happy"); + assert_eq!(merged["energy"], 75); + } + + #[test] + fn catchup_preserves_version_counters() { + let mut cache = StateCache::default(); + + let sync = crate::codec::signal_sync_frame(0, 0, br#"{"count":0,"_v":{"count":0}}"#); + cache.process_frame(&sync); + + let d1 = crate::codec::signal_diff_frame(1, 100, br#"{"count":5,"_v":{"count":3}}"#); + cache.process_frame(&d1); + + let catchup = cache.catchup_messages(); + assert_eq!(catchup.len(), 1); + + let frame = &catchup[0]; + let payload_len = u32::from_le_bytes([frame[12], frame[13], frame[14], frame[15]]) as usize; + let payload = &frame[HEADER_SIZE..HEADER_SIZE + payload_len]; + let merged: serde_json::Value = serde_json::from_slice(payload).unwrap(); + assert_eq!(merged["count"], 5); + assert_eq!(merged["_v"]["count"], 3); // version preserved from diff + } } diff --git a/examples/timer-multi-action.ds b/examples/timer-multi-action.ds new file mode 100644 index 0000000..5ce1dc1 --- /dev/null +++ b/examples/timer-multi-action.ds @@ -0,0 +1,23 @@ +-- DreamStack Timer Multi-Action +-- Tests `every` keyword with multi-action handlers. +-- +-- Run with: +-- dreamstack build examples/timer-multi-action.ds -o /tmp/timer-multi + +let ticks = 0 +let elapsed = 0 +let status = "running" + +-- Timer fires every 1 second, incrementing ticks AND updating elapsed +every 1000 -> ticks += 1 + +view timer_demo = column [ + text "Timer Multi-Action Demo" { variant: "title" } + text "Ticks: {ticks}" + text "Status: {status}" + row [ + button "Reset" { click: ticks = 0; status = "reset" } + button "Mark" { click: status = "marked at tick"; elapsed = ticks } + ] + text "Last marked at: {elapsed}" +]