diff --git a/compiler/ds-codegen/src/js_emitter.rs b/compiler/ds-codegen/src/js_emitter.rs index 3bdae7f..63c779e 100644 --- a/compiler/ds-codegen/src/js_emitter.rs +++ b/compiler/ds-codegen/src/js_emitter.rs @@ -2792,13 +2792,22 @@ const DS = (() => { } // ── Batched diff: coalesce multiple signal changes into one WS frame ── + var _lastSentValues = {}; // deduplication: track last-sent value per signal + var _diffBatchCount = 0; // count batches for periodic auto-sync + function _streamDiff(name, value) { if (!_streamWs || _streamWs.readyState !== 1 || _streamMode !== 'signal') return; if (_applyingRemoteDiff) return; // prevent echo loops + var val = (typeof value === 'object' && value !== null && 'value' in value) ? value.value : value; + // Deduplication: skip if value hasn't changed + var lastStr = JSON.stringify(_lastSentValues[name]); + var newStr = JSON.stringify(val); + if (lastStr === newStr) return; + _lastSentValues[name] = val; // Increment version for conflict resolution _signalVersions[name] = (_signalVersions[name] || 0) + 1; // Accumulate into pending batch - _pendingDiffs[name] = (typeof value === 'object' && value !== null && 'value' in value) ? value.value : value; + _pendingDiffs[name] = val; _pendingVersions[name] = _signalVersions[name]; // Schedule flush on next microtask (coalesces all changes in current event loop tick) if (!_diffFlushTimer) { @@ -2810,6 +2819,15 @@ const DS = (() => { _diffFlushTimer = null; var keys = Object.keys(_pendingDiffs); if (keys.length === 0) return; + _diffBatchCount++; + // Every 50 batches, send a full SignalSync instead of a diff + // This lets the relay compact its cache (merge diffs into one sync) + if (_diffBatchCount % 50 === 0) { + _streamSync(_signalRegistry); + _pendingDiffs = {}; + _pendingVersions = {}; + return; + } // Build single frame with all changes var obj = { _pid: _peerId, _v: {} }; for (var i = 0; i < keys.length; i++) { diff --git a/engine/ds-stream/src/relay.rs b/engine/ds-stream/src/relay.rs index 8f63dab..947a1bc 100644 --- a/engine/ds-stream/src/relay.rs +++ b/engine/ds-stream/src/relay.rs @@ -492,11 +492,147 @@ pub async fn run_relay(config: RelayConfig) -> Result<(), Box n, + Err(_) => return, + }; + let request_line = String::from_utf8_lossy(&peek_buf[..n]); + + // Extract path from "GET /meta/name HTTP/1.1" + if let Some(path) = request_line.lines().next().and_then(|line| { + let parts: Vec<&str> = line.split_whitespace().collect(); + if parts.len() >= 2 { Some(parts[1]) } else { None } + }) { + if path.starts_with("/meta") { + handle_meta_http(stream, addr, &state, path).await; + return; + } + } + + handle_connection(stream, addr, state, keepalive_interval, keepalive_timeout).await; + }); } Ok(()) } +/// Handle an HTTP /meta/{channel} request — return channel stats as JSON. +/// +/// This is NOT a WebSocket connection. We read the full HTTP request, +/// then respond with an HTTP 200 + JSON body. +async fn handle_meta_http( + mut stream: TcpStream, + addr: SocketAddr, + state: &Arc>, + path: &str, +) { + use tokio::io::AsyncWriteExt; + use tokio::io::AsyncReadExt; + + // Drain the HTTP request from the socket + let mut buf = vec![0u8; 4096]; + let _ = stream.read(&mut buf).await; + + // Parse channel name from path + let channel_name = path.trim_start_matches("/meta/").trim_matches('/'); + let channel_name = if channel_name.is_empty() { "__all__" } else { channel_name }; + + let s = state.read().await; + let uptime = s.start_time.elapsed().as_secs(); + + let body = if channel_name == "__all__" { + // Return stats for ALL channels + let mut channels = serde_json::Map::new(); + for (name, channel) in &s.channels { + let cs = channel.read().await; + channels.insert(name.clone(), serde_json::json!({ + "source_connected": cs.stats.source_connected, + "receivers": cs.stats.connected_receivers, + "peak_receivers": cs.stats.peak_receivers, + "frames_relayed": cs.stats.frames_relayed, + "bytes_relayed": cs.stats.bytes_relayed, + "signal_diffs": cs.stats.signal_diffs_sent, + "keyframes": cs.stats.keyframes_sent, + "total_connections": cs.stats.total_connections, + "rejected_connections": cs.stats.rejected_connections, + "cached": cs.cache.has_state(), + "pending_diffs": cs.cache.pending_signal_diffs.len(), + })); + } + serde_json::json!({ + "uptime_secs": uptime, + "channel_count": s.channels.len(), + "channels": serde_json::Value::Object(channels), + }).to_string() + } else if let Some(channel) = s.channels.get(channel_name) { + // Return stats for a specific channel + let cs = channel.read().await; + + // Extract current signal state if available + let mut current_state = serde_json::Value::Null; + if let Some(ref sync_frame) = cs.cache.last_signal_sync { + if sync_frame.len() >= HEADER_SIZE { + let pl_len = u32::from_le_bytes([ + sync_frame[12], sync_frame[13], sync_frame[14], sync_frame[15], + ]) as usize; + let end = HEADER_SIZE + pl_len.min(sync_frame.len() - HEADER_SIZE); + if let Ok(val) = serde_json::from_slice::(&sync_frame[HEADER_SIZE..end]) { + current_state = val; + } + } + } + + serde_json::json!({ + "channel": channel_name, + "uptime_secs": uptime, + "source_connected": cs.stats.source_connected, + "receivers": cs.stats.connected_receivers, + "max_receivers": cs.max_receivers, + "peak_receivers": cs.stats.peak_receivers, + "frames_relayed": cs.stats.frames_relayed, + "bytes_relayed": cs.stats.bytes_relayed, + "signal_diffs": cs.stats.signal_diffs_sent, + "keyframes": cs.stats.keyframes_sent, + "total_connections": cs.stats.total_connections, + "rejected_connections": cs.stats.rejected_connections, + "cached": cs.cache.has_state(), + "pending_diffs": cs.cache.pending_signal_diffs.len(), + "schema": cs.schema.as_ref().and_then(|s| { + if s.len() >= HEADER_SIZE { + let pl_len = u32::from_le_bytes([s[12], s[13], s[14], s[15]]) as usize; + let end = HEADER_SIZE + pl_len.min(s.len() - HEADER_SIZE); + serde_json::from_slice::(&s[HEADER_SIZE..end]).ok() + } else { None } + }), + "current_state": current_state, + }).to_string() + } else { + serde_json::json!({ + "error": "channel not found", + "channel": channel_name, + }).to_string() + }; + + // Write HTTP response + let response = format!( + "HTTP/1.1 200 OK\r\n\ + Content-Type: application/json\r\n\ + Access-Control-Allow-Origin: *\r\n\ + Content-Length: {}\r\n\ + Connection: close\r\n\ + \r\n\ + {}", + body.len(), + body + ); + let _ = stream.write_all(response.as_bytes()).await; + eprintln!("[relay:meta] {addr} → {path}"); +} + async fn handle_connection( stream: TcpStream, addr: SocketAddr,