perf: streaming core improvements — batched diffs, RTT tracking, relay merging

Runtime improvements:
- Diff batching: multiple signal changes coalesced into 1 WS frame
  via _pendingDiffs + microtask Promise.resolve()
- Connection status: _streamConnected, _streamLatency, _streamReconnects,
  _streamFrameCount, _streamByteCount tracked for stream health
- RTT tracking: periodic ping/pong every 5s measures round-trip latency
- Removed redundant _streamSync from flush() — diffs are already batched

Relay improvements:
- Diff merging: late joiners receive 1 consolidated sync frame instead
  of replaying hundreds of individual diffs
- Version counters merged: conflict resolution preserved across catchup
- Fallback: if JSON parse fails, falls back to raw sync + all diffs

Test updates:
- state_cache_signal_sync verifies merged payload (count: 1)
- All 54 relay tests pass
- All 8 examples pass regression
This commit is contained in:
enzotar 2026-02-26 18:02:31 -08:00
parent f29673cbd8
commit 746b76fe4f
2 changed files with 182 additions and 17 deletions

View file

@ -2316,10 +2316,8 @@ const DS = (() => {
for (const eff of effects) {
eff._run();
}
// After effects recompute derived signals, sync all values to stream
if (_streamWs && _streamWs.readyState === 1 && _streamMode === 'signal' && !_applyingRemoteDiff) {
_streamSync(_signalRegistry);
}
// After effects recompute derived signals, flush batched diffs to stream
// (individual _streamDiff calls already batched changes — no full sync needed)
}
// ── Event System ──
@ -2640,6 +2638,18 @@ const DS = (() => {
let _streamStart = 0;
let _prevSignals = null;
// ── Diff Batching — coalesce multiple signal changes into one WS frame ──
var _pendingDiffs = {}; // accumulated changes: { name: value }
var _pendingVersions = {}; // version for each pending change
var _diffFlushTimer = null; // microtask coalescing
// ── Connection Status — reactive signals for stream health ──
var _streamConnected = false;
var _streamLatency = 0; // RTT in milliseconds
var _streamReconnects = 0;
var _streamFrameCount = 0;
var _streamByteCount = 0;
var _lastPingTime = 0; // for RTT calculation
function _encodeHeader(type, flags, seq, ts, w, h, len) {
const b = new ArrayBuffer(HEADER_SIZE);
@ -2712,15 +2722,32 @@ const DS = (() => {
var bytes = new Uint8Array(e.data);
var view = new DataView(bytes.buffer);
var type = view.getUint8(0);
_streamFrameCount++;
_streamByteCount += e.data.byteLength;
// Signal diff from another peer — apply to local signals
if (type === 0x31) {
var payload = bytes.subarray(HEADER_SIZE);
var json = new TextDecoder().decode(payload);
_applyRemoteDiff(json);
}
// Pong response — calculate RTT
if (type === 0xFE) {
if (_lastPingTime > 0) {
_streamLatency = Math.round(performance.now() - _lastPingTime);
}
}
};
// Connection lifecycle: mark disconnected, count the attempt, and retry in 2s.
// NOTE: a second `onclose` assignment previously followed `onerror` and
// overwrote this handler, silently dropping the connected-flag reset,
// the reconnect counter, and the log line — removed.
_streamWs.onclose = function() {
    _streamConnected = false;
    _streamReconnects++;
    console.log('[ds-stream] Disconnected, reconnecting in 2s (attempt ' + _streamReconnects + ')');
    setTimeout(function() { _initStream(url, mode); }, 2000);
};
_streamWs.onerror = function() {
    // onerror fires before onclose; just flip the health flag here and let
    // onclose own the reconnect scheduling.
    _streamConnected = false;
};
_streamWs.onopen = function() {
_streamConnected = true;
console.log('[ds-stream] Peer connected:', peerUrl);
// Send schema announcement (0x32) with output signal list
var outputNames = Object.keys(_signalRegistry);
@ -2737,6 +2764,16 @@ const DS = (() => {
fullState._v[name] = _signalVersions[name] || 0;
}
_streamSend(0x31, 0, new TextEncoder().encode(JSON.stringify(fullState)));
// Start periodic ping for RTT measurement (every 5s)
if (!_streamWs._pingInterval) {
_streamWs._pingInterval = setInterval(function() {
if (_streamWs && _streamWs.readyState === 1) {
_lastPingTime = performance.now();
_streamSend(0xFE, 0, new Uint8Array(0));
}
}, 5000);
}
};
}
@ -2754,15 +2791,35 @@ const DS = (() => {
_streamWs.send(msg.buffer);
}
// ── Batched diff: coalesce multiple signal changes into one WS frame ──
// Queue one signal change for the next batched WS frame.
//
// Guards: requires an OPEN socket (readyState 1) in 'signal' mode, and bails
// while a remote diff is being applied so peers don't echo each other's
// changes back forever. Bumps the per-signal version counter (used for
// conflict resolution on the receiving side), unwraps signal objects of the
// shape { value: ... } to their raw value, and schedules a single microtask
// flush — every change made in the current event-loop tick shares one frame.
function _streamDiff(name, value) {
    var sendable = _streamWs && _streamWs.readyState === 1 && _streamMode === 'signal';
    if (!sendable || _applyingRemoteDiff) return;
    // Version bump for conflict resolution across peers.
    var version = (_signalVersions[name] || 0) + 1;
    _signalVersions[name] = version;
    // Unwrap { value: ... } wrappers; plain values pass through unchanged.
    var raw = value;
    if (value !== null && typeof value === 'object' && 'value' in value) {
        raw = value.value;
    }
    _pendingDiffs[name] = raw;
    _pendingVersions[name] = version;
    // First change this tick arms the microtask; later changes coalesce into it.
    if (!_diffFlushTimer) {
        _diffFlushTimer = Promise.resolve().then(_flushDiffBatch);
    }
}
// Flush all pending signal changes as ONE 0x31 (signal-diff) WS frame.
//
// Payload shape: { _pid: <peer id>, _v: { name: version, ... }, name: value, ... }
// Resets the pending accumulators afterwards so the next tick starts clean.
// FIX: removed two stray lines (`obj._v[name] = ...` / `obj[name] = ...`)
// left over from the old unbatched implementation — `name` and `value` are
// not in scope here and referencing them throws a ReferenceError.
function _flushDiffBatch() {
    _diffFlushTimer = null; // allow the next _streamDiff to re-arm the microtask
    var keys = Object.keys(_pendingDiffs);
    if (keys.length === 0) return; // nothing queued this tick
    // Build a single frame carrying every coalesced change plus its version.
    var obj = { _pid: _peerId, _v: {} };
    for (var i = 0; i < keys.length; i++) {
        var k = keys[i];
        obj[k] = _pendingDiffs[k];
        obj._v[k] = _pendingVersions[k];
    }
    _streamSend(0x31, 0, new TextEncoder().encode(JSON.stringify(obj)));
    _pendingDiffs = {};
    _pendingVersions = {};
}
function _streamSync(signals) {

View file

@ -144,17 +144,117 @@ impl StateCache {
}
/// Get all messages a late-joining receiver needs to reconstruct current state.
///
/// Instead of replaying hundreds of individual diffs, merges all accumulated
/// diffs into the last sync frame to produce ONE consolidated state message.
/// This dramatically reduces catchup time and bandwidth.
fn catchup_messages(&self) -> Vec<Vec<u8>> {
let mut msgs = Vec::new();
// Send last signal sync first (if available)
if let Some(ref sync) = self.last_signal_sync {
msgs.push(sync.clone());
// Strategy: merge all signal diffs into last sync to build a consolidated frame
if let Some(ref sync_frame) = self.last_signal_sync {
if !self.pending_signal_diffs.is_empty() {
// Parse the sync frame's JSON payload
if sync_frame.len() >= HEADER_SIZE {
let payload_len = u32::from_le_bytes([
sync_frame[12], sync_frame[13], sync_frame[14], sync_frame[15],
]) as usize;
let sync_end = HEADER_SIZE + payload_len.min(sync_frame.len() - HEADER_SIZE);
let sync_payload = &sync_frame[HEADER_SIZE..sync_end];
if let Ok(mut merged) = serde_json::from_slice::<serde_json::Value>(sync_payload) {
// Apply each diff on top
for diff_frame in &self.pending_signal_diffs {
if diff_frame.len() < HEADER_SIZE { continue; }
let diff_pl_len = u32::from_le_bytes([
diff_frame[12], diff_frame[13], diff_frame[14], diff_frame[15],
]) as usize;
let diff_end = HEADER_SIZE + diff_pl_len.min(diff_frame.len() - HEADER_SIZE);
let diff_payload = &diff_frame[HEADER_SIZE..diff_end];
if let Ok(diff_obj) = serde_json::from_slice::<serde_json::Value>(diff_payload) {
if let (Some(merged_map), Some(diff_map)) = (merged.as_object_mut(), diff_obj.as_object()) {
for (k, v) in diff_map {
if k != "_pid" && k != "_v" {
merged_map.insert(k.clone(), v.clone());
}
}
// Merge version counters
if let (Some(merged_v), Some(diff_v)) = (
merged_map.get_mut("_v").and_then(|v| v.as_object_mut()),
diff_map.get("_v").and_then(|v| v.as_object()),
) {
for (k, v) in diff_v {
merged_v.insert(k.clone(), v.clone());
}
}
}
}
}
// Build a consolidated SignalSync (0x30) frame
let merged_json = serde_json::to_vec(&merged).unwrap_or_default();
let mut frame = Vec::with_capacity(HEADER_SIZE + merged_json.len());
// Copy sync frame header but set type to SignalSync and update length
frame.extend_from_slice(&sync_frame[..HEADER_SIZE]);
frame[0] = 0x30; // SignalSync
frame[1] = 0x02; // FLAG_KEYFRAME
let len_bytes = (merged_json.len() as u32).to_le_bytes();
frame[12] = len_bytes[0];
frame[13] = len_bytes[1];
frame[14] = len_bytes[2];
frame[15] = len_bytes[3];
frame.truncate(HEADER_SIZE);
frame.extend_from_slice(&merged_json);
msgs.push(frame);
} else {
// Fallback: can't parse, send raw sync + all diffs
msgs.push(sync_frame.clone());
for diff in &self.pending_signal_diffs {
msgs.push(diff.clone());
}
}
} else {
msgs.push(sync_frame.clone());
}
} else {
// No diffs accumulated — just send the sync
msgs.push(sync_frame.clone());
}
} else if !self.pending_signal_diffs.is_empty() {
// No sync frame but we have diffs — merge diffs into one frame
let mut merged = serde_json::Map::new();
let mut merged_v = serde_json::Map::new();
for diff_frame in &self.pending_signal_diffs {
if diff_frame.len() < HEADER_SIZE { continue; }
let diff_pl_len = u32::from_le_bytes([
diff_frame[12], diff_frame[13], diff_frame[14], diff_frame[15],
]) as usize;
let diff_end = HEADER_SIZE + diff_pl_len.min(diff_frame.len() - HEADER_SIZE);
let diff_payload = &diff_frame[HEADER_SIZE..diff_end];
if let Ok(diff_obj) = serde_json::from_slice::<serde_json::Value>(diff_payload) {
if let Some(diff_map) = diff_obj.as_object() {
for (k, v) in diff_map {
if k == "_v" {
if let Some(v_map) = v.as_object() {
for (vk, vv) in v_map {
merged_v.insert(vk.clone(), vv.clone());
}
}
} else if k != "_pid" {
merged.insert(k.clone(), v.clone());
}
}
}
}
}
if !merged_v.is_empty() {
merged.insert("_v".to_string(), serde_json::Value::Object(merged_v));
}
let merged_json = serde_json::to_vec(&serde_json::Value::Object(merged)).unwrap_or_default();
let frame = crate::codec::signal_sync_frame(0, 0, &merged_json);
msgs.push(frame);
}
// Then all accumulated diffs
for diff in &self.pending_signal_diffs {
msgs.push(diff.clone());
}
// Then last pixel keyframe (if available)
// Pixel keyframe (if available)
if let Some(ref kf) = self.last_keyframe {
msgs.push(kf.clone());
}
@ -961,9 +1061,17 @@ mod tests {
assert_eq!(cache.pending_signal_diffs.len(), 1);
// Catchup should contain sync + diff
// Catchup should contain ONE merged frame (sync + diff consolidated)
let catchup = cache.catchup_messages();
assert_eq!(catchup.len(), 2);
assert_eq!(catchup.len(), 1);
// Verify the merged payload contains the updated count
let frame = &catchup[0];
assert!(frame.len() >= HEADER_SIZE);
let payload_len = u32::from_le_bytes([frame[12], frame[13], frame[14], frame[15]]) as usize;
let payload = &frame[HEADER_SIZE..HEADER_SIZE + payload_len];
let merged: serde_json::Value = serde_json::from_slice(payload).unwrap();
assert_eq!(merged["count"], 1); // diff applied on top of sync
}
#[test]