feat: comprehensive streaming improvements

Runtime _connectStream improvements:
- Connection status as reactive signals: _connected, _latency, _frames, _reconnects
  injected on stream proxy so UIs can show connection health
- Fixed RLE decoder: 2-byte LE count (was 1-byte, mismatched relay encoder)
- Schema caching: 0x32 SchemaAnnounce frames now parsed and cached
- RTT tracking: receivers send periodic pings (5s), measure round-trip latency
- Better reconnect logging: includes URL and attempt count

Relay tests (57 total):
- catchup_merges_multiple_diffs: sync + 3 diffs → 1 merged frame
- catchup_diffs_only_no_sync: diffs without sync → merged frame
- catchup_preserves_version_counters: conflict resolution versions kept

New example:
- timer-multi-action.ds: every timer + multi-action buttons verified

Documentation:
- STREAM_COMPOSITION.md: 4 new sections (Diff Batching, Connection Status, RTT, Relay Merging)
- Updated example table with streaming-dashboard.ds and timer-multi-action.ds

All 9 examples pass regression (44-70KB each)
This commit is contained in:
enzotar 2026-02-26 18:09:14 -08:00
parent 746b76fe4f
commit 598ecde59c
4 changed files with 203 additions and 13 deletions

View file

@ -172,13 +172,51 @@ All frames share a 16-byte header:
cargo run -p ds-stream
```
The relay is a stateless WebSocket router:
The relay is a production-grade WebSocket router:
- Routes frames from sources/peers to receivers on the same channel
- Caches latest state for late-joining receivers
- **Diff merging**: Consolidates accumulated signal diffs into ONE catchup frame for late joiners
- Caches schema announcements (0x32) per channel
- Applies per-receiver signal filters (0x33) before forwarding
- Auto-creates channels on first connection
- Cleans up idle channels after grace period
- Source reconnect grace: cache preserved during reconnection window
## Runtime Features
### Diff Batching
Multiple signal changes in a single event loop tick are coalesced into one WebSocket frame:
```
-- Click fires 3 signal changes, but sends only 1 WS frame:
button "Happy" { click: mood = "happy"; color = "green"; energy += 10 }
```
### Connection Status
The `stream from` proxy injects reactive connection metadata:
```
let counter = stream from "ws://localhost:9100/stream/counter"
-- Available fields:
-- counter._connected boolean (true when WS is open)
-- counter._latency RTT in milliseconds (ping/pong every 5s)
-- counter._frames total frames received
-- counter._reconnects reconnect attempt count
```
### RTT Tracking
Both sources and receivers measure round-trip latency via periodic ping/pong (every 5s).
The `_latency` field updates reactively on each measurement.
### Relay Diff Merging
Late-joining receivers receive a single consolidated sync frame instead of replaying
hundreds of individual diffs:
```
-- Before: receiver gets sync + 500 individual diffs
-- After: receiver gets 1 merged sync frame with all changes applied
```
## Running the Demo
@ -195,9 +233,9 @@ dreamstack stream examples/streaming-mood.ds --port 3004
# Terminal 6: Start aggregator (Layer 1)
dreamstack stream examples/compose-metrics.ds --port 3006
# Terminal 7: Build and serve master dashboard (Layer 2)
dreamstack build examples/compose-master.ds -o /tmp/build-master
python3 -m http.server 3007 -d /tmp/build-master
# Terminal 7: Build and serve dashboard
dreamstack build examples/streaming-dashboard.ds -o /tmp/build-dash
python3 -m http.server 3007 -d /tmp/build-dash
```
Open http://localhost:3007 to see all streams composited into one dashboard.
@ -210,6 +248,9 @@ Open http://localhost:3007 to see all streams composited into one dashboard.
| `streaming-clock.ds` | Source | `/peer/clock` | hours, minutes, seconds |
| `streaming-stats.ds` | Source | `/peer/stats` | total, average, max |
| `streaming-mood.ds` | Source | `/peer/mood` | mood, energy, color |
| `streaming-dashboard.ds` | Receiver | 3 streams | Components + live data |
| `compose-dashboard.ds` | Receiver | 4 streams | Flat dashboard |
| `compose-metrics.ds` | Receiver+Source | 3 in, 1 out | uptime, events, status |
| `compose-master.ds` | Receiver | 2 streams | Chained dashboard |
| `timer-multi-action.ds` | Timer | standalone | ticks, status, elapsed |

View file

@ -2885,15 +2885,24 @@ const DS = (() => {
var _csStats = { frames: 0, bytes: 0, reconnects: 0 };
var _csPixelBuffer = null;
var _csSchema = null; // cached schema from 0x32
var _csConnected = false;
var _csLastPingTime = 0;
var _csLatency = 0;
// RLE decoder — 2-byte LE count (matches relay's codec::rle_encode)
function _csRleDecode(data) {
var out = [];
var i = 0;
while (i < data.length) {
if (data[i] === 0 && i + 1 < data.length) {
var count = data[i + 1];
if (data[i] === 0 && i + 2 < data.length) {
// 0x00 followed by 2-byte little-endian count
var count = data[i + 1] | (data[i + 2] << 8);
for (var j = 0; j < count; j++) out.push(0);
i += 2;
i += 3;
} else if (data[i] === 0) {
// Trailing 0x00 without enough bytes for count — output as literal
out.push(0);
i++;
} else {
out.push(data[i]);
i++;
@ -2917,8 +2926,15 @@ const DS = (() => {
_csWs = new WebSocket(_csUrl);
_csWs.binaryType = 'arraybuffer';
_csWs.onopen = function() {
console.log('[ds-stream] Receiver connected:', url);
console.log('[ds-stream] Receiver connected:', _csUrl);
_csReconnectDelay = 1000;
_csConnected = true;
// Update connection status on signal proxy
var cur = state._value || {};
state.value = Object.assign({}, cur, {
_connected: true, _latency: _csLatency,
_frames: _csStats.frames, _reconnects: _csStats.reconnects
});
// Send subscribe filter (0x33) if select is set
if (_csSelect.length > 0) {
var filterPayload = new TextEncoder().encode(JSON.stringify({ select: _csSelect }));
@ -2930,6 +2946,19 @@ const DS = (() => {
msg.set(filterPayload, HEADER_SIZE);
_csWs.send(msg.buffer);
}
// Start periodic ping for RTT measurement
if (!_csWs._pingInterval) {
_csWs._pingInterval = setInterval(function() {
if (_csWs && _csWs.readyState === 1) {
_csLastPingTime = performance.now();
var pingPayload = new Uint8Array(0);
var msg = new Uint8Array(HEADER_SIZE);
var v = new DataView(msg.buffer);
v.setUint8(0, 0xFE); // Ping
_csWs.send(msg.buffer);
}
}, 5000);
}
};
_csWs.onmessage = function(e) {
if (!(e.data instanceof ArrayBuffer) || e.data.byteLength < HEADER_SIZE) return;
@ -2959,6 +2988,10 @@ const DS = (() => {
delete newState._v;
// Apply select filter
newState = _csFilter(newState);
// Inject connection metadata
newState._connected = _csConnected;
newState._latency = _csLatency;
newState._frames = _csStats.frames;
if (type === 0x30) {
state.value = newState;
} else {
@ -2966,6 +2999,12 @@ const DS = (() => {
}
} catch(ex) {}
break;
case 0x32: // SchemaAnnounce — cache signal names
try {
_csSchema = JSON.parse(new TextDecoder().decode(pl));
console.log('[ds-stream] Schema:', _csSchema.signals || []);
} catch(ex) {}
break;
case 0x01: // Pixels (keyframe)
case 0x04: // Keyframe
var w = view.getUint16(8, true);
@ -2982,8 +3021,13 @@ const DS = (() => {
emit('stream_frame', { type: 'delta', pixels: _csPixelBuffer });
}
break;
case 0xFE: // Ping
break; // keepalive, ignore
case 0xFE: // Pong — calculate RTT
if (_csLastPingTime > 0) {
_csLatency = Math.round(performance.now() - _csLastPingTime);
var cur = state._value || {};
state.value = Object.assign({}, cur, { _latency: _csLatency });
}
break;
case 0xFF: // StreamEnd
console.log('[ds-stream] Stream ended by source');
emit('stream_end', {});
@ -2991,13 +3035,21 @@ const DS = (() => {
}
};
_csWs.onclose = function() {
_csConnected = false;
_csStats.reconnects++;
var delay = Math.min(_csReconnectDelay, 10000);
console.log('[ds-stream] Disconnected, reconnecting in', delay, 'ms');
console.log('[ds-stream] Receiver disconnected from', _csUrl, ' reconnecting in', delay, 'ms (attempt', _csStats.reconnects, ')');
// Update connection status on signal proxy
var cur = state._value || {};
state.value = Object.assign({}, cur, {
_connected: false, _reconnects: _csStats.reconnects
});
setTimeout(_csConnect, delay);
_csReconnectDelay = Math.min(_csReconnectDelay * 1.5, 10000);
};
_csWs.onerror = function() {};
_csWs.onerror = function() {
_csConnected = false;
};
}
_csConnect();
return state;

View file

@ -1253,4 +1253,78 @@ mod tests {
cs.source_disconnect_time = Some(Instant::now() - Duration::from_secs(60));
assert!(cs.grace_period_expired(30));
}
// ─── Diff Merging Tests ───
#[test]
fn catchup_merges_multiple_diffs() {
let mut cache = StateCache::default();
// Start with sync: { count: 0, name: "test" }
let sync = crate::codec::signal_sync_frame(0, 0, br#"{"count":0,"name":"test"}"#);
cache.process_frame(&sync);
// Apply 3 diffs
let d1 = crate::codec::signal_diff_frame(1, 100, br#"{"count":1}"#);
let d2 = crate::codec::signal_diff_frame(2, 200, br#"{"count":2}"#);
let d3 = crate::codec::signal_diff_frame(3, 300, br#"{"count":3,"name":"updated"}"#);
cache.process_frame(&d1);
cache.process_frame(&d2);
cache.process_frame(&d3);
// Should produce 1 merged frame, not 4
let catchup = cache.catchup_messages();
assert_eq!(catchup.len(), 1);
// Verify merged state
let frame = &catchup[0];
let payload_len = u32::from_le_bytes([frame[12], frame[13], frame[14], frame[15]]) as usize;
let payload = &frame[HEADER_SIZE..HEADER_SIZE + payload_len];
let merged: serde_json::Value = serde_json::from_slice(payload).unwrap();
assert_eq!(merged["count"], 3);
assert_eq!(merged["name"], "updated");
}
#[test]
fn catchup_diffs_only_no_sync() {
let mut cache = StateCache::default();
// Only diffs, no initial sync (first connection scenario)
let d1 = crate::codec::signal_diff_frame(0, 0, br#"{"mood":"happy"}"#);
let d2 = crate::codec::signal_diff_frame(1, 100, br#"{"energy":75}"#);
cache.process_frame(&d1);
cache.process_frame(&d2);
// Should produce 1 merged frame
let catchup = cache.catchup_messages();
assert_eq!(catchup.len(), 1);
let frame = &catchup[0];
let payload_len = u32::from_le_bytes([frame[12], frame[13], frame[14], frame[15]]) as usize;
let payload = &frame[HEADER_SIZE..HEADER_SIZE + payload_len];
let merged: serde_json::Value = serde_json::from_slice(payload).unwrap();
assert_eq!(merged["mood"], "happy");
assert_eq!(merged["energy"], 75);
}
#[test]
fn catchup_preserves_version_counters() {
let mut cache = StateCache::default();
let sync = crate::codec::signal_sync_frame(0, 0, br#"{"count":0,"_v":{"count":0}}"#);
cache.process_frame(&sync);
let d1 = crate::codec::signal_diff_frame(1, 100, br#"{"count":5,"_v":{"count":3}}"#);
cache.process_frame(&d1);
let catchup = cache.catchup_messages();
assert_eq!(catchup.len(), 1);
let frame = &catchup[0];
let payload_len = u32::from_le_bytes([frame[12], frame[13], frame[14], frame[15]]) as usize;
let payload = &frame[HEADER_SIZE..HEADER_SIZE + payload_len];
let merged: serde_json::Value = serde_json::from_slice(payload).unwrap();
assert_eq!(merged["count"], 5);
assert_eq!(merged["_v"]["count"], 3); // version preserved from diff
}
}

View file

@ -0,0 +1,23 @@
-- DreamStack Timer Multi-Action
-- Tests `every` keyword with multi-action handlers.
--
-- Run with:
-- dreamstack build examples/timer-multi-action.ds -o /tmp/timer-multi
let ticks = 0
let elapsed = 0
let status = "running"
-- Timer fires every 1 second, incrementing ticks AND updating elapsed
every 1000 -> ticks += 1
view timer_demo = column [
text "Timer Multi-Action Demo" { variant: "title" }
text "Ticks: {ticks}"
text "Status: {status}"
row [
button "Reset" { click: ticks = 0; status = "reset" }
button "Mark" { click: status = "marked at tick"; elapsed = ticks }
]
text "Last marked at: {elapsed}"
]