diff --git a/compiler/ds-codegen/src/js_emitter.rs b/compiler/ds-codegen/src/js_emitter.rs index a3cce47..f26155f 100644 --- a/compiler/ds-codegen/src/js_emitter.rs +++ b/compiler/ds-codegen/src/js_emitter.rs @@ -2316,10 +2316,8 @@ const DS = (() => { for (const eff of effects) { eff._run(); } - // After effects recompute derived signals, sync all values to stream - if (_streamWs && _streamWs.readyState === 1 && _streamMode === 'signal' && !_applyingRemoteDiff) { - _streamSync(_signalRegistry); - } + // After effects recompute derived signals, flush batched diffs to stream + // (individual _streamDiff calls already batched changes — no full sync needed) } // ── Event System ── @@ -2640,6 +2638,18 @@ const DS = (() => { let _streamStart = 0; let _prevSignals = null; + // ── Diff Batching — coalesce multiple signal changes into one WS frame ── + var _pendingDiffs = {}; // accumulated changes: { name: value } + var _pendingVersions = {}; // version for each pending change + var _diffFlushTimer = null; // microtask coalescing + + // ── Connection Status — reactive signals for stream health ── + var _streamConnected = false; + var _streamLatency = 0; // RTT in milliseconds + var _streamReconnects = 0; + var _streamFrameCount = 0; + var _streamByteCount = 0; + var _lastPingTime = 0; // for RTT calculation function _encodeHeader(type, flags, seq, ts, w, h, len) { const b = new ArrayBuffer(HEADER_SIZE); @@ -2712,15 +2722,32 @@ const DS = (() => { var bytes = new Uint8Array(e.data); var view = new DataView(bytes.buffer); var type = view.getUint8(0); + _streamFrameCount++; + _streamByteCount += e.data.byteLength; // Signal diff from another peer — apply to local signals if (type === 0x31) { var payload = bytes.subarray(HEADER_SIZE); var json = new TextDecoder().decode(payload); _applyRemoteDiff(json); } + // Pong response — calculate RTT + if (type === 0xFE) { + if (_lastPingTime > 0) { + _streamLatency = Math.round(performance.now() - _lastPingTime); + } + } + }; + _streamWs.onclose = function() { + _streamConnected = false; + _streamReconnects++; + console.log('[ds-stream] Disconnected, reconnecting in 2s (attempt ' + _streamReconnects + ')'); + setTimeout(function() { _initStream(url, mode); }, 2000); + }; + _streamWs.onerror = function() { + _streamConnected = false; }; - _streamWs.onclose = function() { setTimeout(function() { _initStream(url, mode); }, 2000); }; _streamWs.onopen = function() { + _streamConnected = true; console.log('[ds-stream] Peer connected:', peerUrl); // Send schema announcement (0x32) with output signal list var outputNames = Object.keys(_signalRegistry); @@ -2737,6 +2764,16 @@ const DS = (() => { fullState._v[name] = _signalVersions[name] || 0; } _streamSend(0x31, 0, new TextEncoder().encode(JSON.stringify(fullState))); + + // Start periodic ping for RTT measurement (every 5s) + if (!_streamWs._pingInterval) { + _streamWs._pingInterval = setInterval(function() { + if (_streamWs && _streamWs.readyState === 1) { + _lastPingTime = performance.now(); + _streamSend(0xFE, 0, new Uint8Array(0)); + } + }, 5000); + } }; } @@ -2754,15 +2791,35 @@ const DS = (() => { _streamWs.send(msg.buffer); } + // ── Batched diff: coalesce multiple signal changes into one WS frame ── function _streamDiff(name, value) { if (!_streamWs || _streamWs.readyState !== 1 || _streamMode !== 'signal') return; if (_applyingRemoteDiff) return; // prevent echo loops // Increment version for conflict resolution _signalVersions[name] = (_signalVersions[name] || 0) + 1; + // Accumulate into pending batch + _pendingDiffs[name] = (typeof value === 'object' && value !== null && 'value' in value) ? value.value : value; + _pendingVersions[name] = _signalVersions[name]; + // Schedule flush on next microtask (coalesces all changes in current event loop tick) + if (!_diffFlushTimer) { + _diffFlushTimer = Promise.resolve().then(_flushDiffBatch); + } + } + + function _flushDiffBatch() { + _diffFlushTimer = null; + var keys = Object.keys(_pendingDiffs); + if (keys.length === 0) return; + // Build single frame with all changes var obj = { _pid: _peerId, _v: {} }; - obj._v[name] = _signalVersions[name]; - obj[name] = (typeof value === 'object' && value !== null && 'value' in value) ? value.value : value; + for (var i = 0; i < keys.length; i++) { + var k = keys[i]; + obj[k] = _pendingDiffs[k]; + obj._v[k] = _pendingVersions[k]; + } _streamSend(0x31, 0, new TextEncoder().encode(JSON.stringify(obj))); + _pendingDiffs = {}; + _pendingVersions = {}; } function _streamSync(signals) { diff --git a/engine/ds-stream/src/relay.rs b/engine/ds-stream/src/relay.rs index 51eb387..979aa2a 100644 --- a/engine/ds-stream/src/relay.rs +++ b/engine/ds-stream/src/relay.rs @@ -144,17 +144,117 @@ impl StateCache { } /// Get all messages a late-joining receiver needs to reconstruct current state. + /// + /// Instead of replaying hundreds of individual diffs, merges all accumulated + /// diffs into the last sync frame to produce ONE consolidated state message. + /// This dramatically reduces catchup time and bandwidth. fn catchup_messages(&self) -> Vec> { let mut msgs = Vec::new(); - // Send last signal sync first (if available) - if let Some(ref sync) = self.last_signal_sync { - msgs.push(sync.clone()); + + // Strategy: merge all signal diffs into last sync to build a consolidated frame + if let Some(ref sync_frame) = self.last_signal_sync { + if !self.pending_signal_diffs.is_empty() { + // Parse the sync frame's JSON payload + if sync_frame.len() >= HEADER_SIZE { + let payload_len = u32::from_le_bytes([ + sync_frame[12], sync_frame[13], sync_frame[14], sync_frame[15], + ]) as usize; + let sync_end = HEADER_SIZE + payload_len.min(sync_frame.len() - HEADER_SIZE); + let sync_payload = &sync_frame[HEADER_SIZE..sync_end]; + + if let Ok(mut merged) = serde_json::from_slice::(sync_payload) { + // Apply each diff on top + for diff_frame in &self.pending_signal_diffs { + if diff_frame.len() < HEADER_SIZE { continue; } + let diff_pl_len = u32::from_le_bytes([ + diff_frame[12], diff_frame[13], diff_frame[14], diff_frame[15], + ]) as usize; + let diff_end = HEADER_SIZE + diff_pl_len.min(diff_frame.len() - HEADER_SIZE); + let diff_payload = &diff_frame[HEADER_SIZE..diff_end]; + if let Ok(diff_obj) = serde_json::from_slice::(diff_payload) { + if let (Some(merged_map), Some(diff_map)) = (merged.as_object_mut(), diff_obj.as_object()) { + for (k, v) in diff_map { + if k != "_pid" && k != "_v" { + merged_map.insert(k.clone(), v.clone()); + } + } + // Merge version counters + if let (Some(merged_v), Some(diff_v)) = ( + merged_map.get_mut("_v").and_then(|v| v.as_object_mut()), + diff_map.get("_v").and_then(|v| v.as_object()), + ) { + for (k, v) in diff_v { + merged_v.insert(k.clone(), v.clone()); + } + } + } + } + } + // Build a consolidated SignalSync (0x30) frame + let merged_json = serde_json::to_vec(&merged).unwrap_or_default(); + let mut frame = Vec::with_capacity(HEADER_SIZE + merged_json.len()); + // Copy sync frame header but set type to SignalSync and update length + frame.extend_from_slice(&sync_frame[..HEADER_SIZE]); + frame[0] = 0x30; // SignalSync + frame[1] = 0x02; // FLAG_KEYFRAME + let len_bytes = (merged_json.len() as u32).to_le_bytes(); + frame[12] = len_bytes[0]; + frame[13] = len_bytes[1]; + frame[14] = len_bytes[2]; + frame[15] = len_bytes[3]; + frame.truncate(HEADER_SIZE); + frame.extend_from_slice(&merged_json); + msgs.push(frame); + } else { + // Fallback: can't parse, send raw sync + all diffs + msgs.push(sync_frame.clone()); + for diff in &self.pending_signal_diffs { + msgs.push(diff.clone()); + } + } + } else { + msgs.push(sync_frame.clone()); + } + } else { + // No diffs accumulated — just send the sync + msgs.push(sync_frame.clone()); + } + } else if !self.pending_signal_diffs.is_empty() { + // No sync frame but we have diffs — merge diffs into one frame + let mut merged = serde_json::Map::new(); + let mut merged_v = serde_json::Map::new(); + for diff_frame in &self.pending_signal_diffs { + if diff_frame.len() < HEADER_SIZE { continue; } + let diff_pl_len = u32::from_le_bytes([ + diff_frame[12], diff_frame[13], diff_frame[14], diff_frame[15], + ]) as usize; + let diff_end = HEADER_SIZE + diff_pl_len.min(diff_frame.len() - HEADER_SIZE); + let diff_payload = &diff_frame[HEADER_SIZE..diff_end]; + if let Ok(diff_obj) = serde_json::from_slice::(diff_payload) { + if let Some(diff_map) = diff_obj.as_object() { + for (k, v) in diff_map { + if k == "_v" { + if let Some(v_map) = v.as_object() { + for (vk, vv) in v_map { + merged_v.insert(vk.clone(), vv.clone()); + } + } + } else if k != "_pid" { + merged.insert(k.clone(), v.clone()); + } + } + } + } + } + if !merged_v.is_empty() { + merged.insert("_v".to_string(), serde_json::Value::Object(merged_v)); + } + let merged_json = serde_json::to_vec(&serde_json::Value::Object(merged)).unwrap_or_default(); + let frame = crate::codec::signal_sync_frame(0, 0, &merged_json); + msgs.push(frame); } - // Then all accumulated diffs - for diff in &self.pending_signal_diffs { - msgs.push(diff.clone()); - } - // Then last pixel keyframe (if available) + + // Pixel keyframe (if available) if let Some(ref kf) = self.last_keyframe { msgs.push(kf.clone()); } @@ -961,9 +1061,17 @@ mod tests { assert_eq!(cache.pending_signal_diffs.len(), 1); - // Catchup should contain sync + diff + // Catchup should contain ONE merged frame (sync + diff consolidated) let catchup = cache.catchup_messages(); - assert_eq!(catchup.len(), 2); + assert_eq!(catchup.len(), 1); + + // Verify the merged payload contains the updated count + let frame = &catchup[0]; + assert!(frame.len() >= HEADER_SIZE); + let payload_len = u32::from_le_bytes([frame[12], frame[13], frame[14], frame[15]]) as usize; + let payload = &frame[HEADER_SIZE..HEADER_SIZE + payload_len]; + let merged: serde_json::Value = serde_json::from_slice(payload).unwrap(); + assert_eq!(merged["count"], 1); // diff applied on top of sync } #[test]