diff --git a/compiler/ds-codegen/src/js_emitter.rs b/compiler/ds-codegen/src/js_emitter.rs index 0c07ede..2f458b7 100644 --- a/compiler/ds-codegen/src/js_emitter.rs +++ b/compiler/ds-codegen/src/js_emitter.rs @@ -1598,6 +1598,7 @@ const DS = (() => { let _streamStart = 0; let _prevSignals = null; + function _encodeHeader(type, flags, seq, ts, w, h, len) { const b = new ArrayBuffer(HEADER_SIZE); const v = new DataView(b); @@ -1705,23 +1706,98 @@ const DS = (() => { function _connectStream(url) { var state = signal(null); - var ws = new WebSocket(url); - ws.binaryType = 'arraybuffer'; - ws.onmessage = function(e) { - if (!(e.data instanceof ArrayBuffer) || e.data.byteLength < HEADER_SIZE) return; - var bytes = new Uint8Array(e.data); - var view = new DataView(bytes.buffer); - var type = view.getUint8(0); - var payloadLen = view.getUint32(12, true); - var pl = bytes.subarray(HEADER_SIZE, HEADER_SIZE + payloadLen); - if (type === 0x30 || type === 0x31) { - try { - var newState = JSON.parse(new TextDecoder().decode(pl)); - state.value = Object.assign(state._value || {}, newState); - } catch(ex) {} + var _csWs = null; + var _csReconnectDelay = 1000; + var _csStats = { frames: 0, bytes: 0, reconnects: 0 }; + var _csPixelBuffer = null; + + function _csRleDecode(data) { + var out = []; + var i = 0; + while (i < data.length) { + if (data[i] === 0 && i + 1 < data.length) { + var count = data[i + 1]; + for (var j = 0; j < count; j++) out.push(0); + i += 2; + } else { + out.push(data[i]); + i++; + } } - }; - ws.onclose = function() { setTimeout(function() { _connectStream(url); }, 2000); }; + return new Uint8Array(out); + } + + function _csConnect() { + _csWs = new WebSocket(url); + _csWs.binaryType = 'arraybuffer'; + _csWs.onopen = function() { + console.log('[ds-stream] Receiver connected:', url); + _csReconnectDelay = 1000; + }; + _csWs.onmessage = function(e) { + if (!(e.data instanceof ArrayBuffer) || e.data.byteLength < HEADER_SIZE) return; + var bytes = new Uint8Array(e.data); + var view = new DataView(bytes.buffer); + var type = view.getUint8(0); + var flags = view.getUint8(1); + var payloadLen = view.getUint32(12, true); + var pl = bytes.subarray(HEADER_SIZE, HEADER_SIZE + payloadLen); + + _csStats.frames++; + _csStats.bytes += e.data.byteLength; + + // Handle input events forwarded from source + if (flags & 0x01) { + _handleRemoteInput(type, pl); + return; + } + + switch (type) { + case 0x30: // SignalSync — full state + case 0x31: // SignalDiff — partial state update + try { + var newState = JSON.parse(new TextDecoder().decode(pl)); + if (type === 0x30) { + state.value = newState; // full replace + } else { + state.value = Object.assign(state._value || {}, newState); + } + } catch(ex) {} + break; + case 0x01: // Pixels (keyframe) + case 0x04: // Keyframe + var w = view.getUint16(8, true); + var h = view.getUint16(10, true); + _csPixelBuffer = pl.slice(); + emit('stream_frame', { type: 'keyframe', width: w, height: h, pixels: _csPixelBuffer }); + break; + case 0x03: // DeltaPixels (XOR + RLE) + if (_csPixelBuffer) { + var decoded = _csRleDecode(pl); + for (var i = 0; i < _csPixelBuffer.length && i < decoded.length; i++) { + _csPixelBuffer[i] ^= decoded[i]; + } + emit('stream_frame', { type: 'delta', pixels: _csPixelBuffer }); + } + break; + case 0xFE: // Ping + break; // keepalive, ignore + case 0xFF: // StreamEnd + console.log('[ds-stream] Stream ended by source'); + emit('stream_end', {}); + break; + } + }; + _csWs.onclose = function() { + _csStats.reconnects++; + var delay = Math.min(_csReconnectDelay, 10000); + console.log('[ds-stream] Disconnected, reconnecting in', delay, 'ms'); + setTimeout(_csConnect, delay); + _csReconnectDelay = Math.min(_csReconnectDelay * 1.5, 10000); + }; + _csWs.onerror = function() {}; + } + _csConnect(); return state; } diff --git a/engine/ds-stream/src/relay.rs b/engine/ds-stream/src/relay.rs index 5ca18a3..9558280 100644 --- a/engine/ds-stream/src/relay.rs +++ b/engine/ds-stream/src/relay.rs @@ -9,17 +9,23 @@ //! - `/source/{name}` — named source (channel: {name}) //! - `/stream` — default receiver (channel: "default") //! - `/stream/{name}` — named receiver (channel: {name}) +//! - `/signal` — WebRTC signaling (channel: "default") +//! - `/signal/{name}` — WebRTC signaling (channel: {name}) //! - `/` — legacy: first connection = source, rest = receivers //! //! Each channel has its own broadcast/input channels and state cache, //! allowing multiple independent streams through a single relay. //! -//! ## Features +//! ## Production Features //! - Multi-source: multiple views stream independently //! - Keyframe caching: late-joining receivers get current state instantly //! - Signal state store: caches SignalSync/Diff frames for reconstruction //! - Ping/pong keepalive: detects dead connections //! - Stats tracking: frames, bytes, latency metrics +//! - Max receiver limit per channel (configurable) +//! - Channel GC: empty channels are cleaned up periodically +//! - Source reconnection: cache preserved for seamless reconnect +//! - Graceful shutdown: drain connections on SIGTERM use std::collections::HashMap; use std::net::SocketAddr; @@ -46,6 +52,12 @@ pub struct RelayConfig { pub keepalive_interval_secs: u64, /// Keepalive timeout in seconds — disconnect after this many seconds without a pong. pub keepalive_timeout_secs: u64, + /// Maximum number of channels. + pub max_channels: usize, + /// Channel GC interval in seconds — how often to scan for empty channels. + pub channel_gc_interval_secs: u64, + /// Source reconnect grace period in seconds — keep cache alive after source disconnect. + pub source_reconnect_grace_secs: u64, } impl Default for RelayConfig { @@ -56,6 +68,9 @@ impl Default for RelayConfig { frame_buffer_size: 16, keepalive_interval_secs: 10, keepalive_timeout_secs: 30, + max_channels: 256, + channel_gc_interval_secs: 60, + source_reconnect_grace_secs: 30, } } } @@ -76,6 +91,12 @@ pub struct RelayStats { pub signal_diffs_sent: u64, /// Uptime in seconds. pub uptime_secs: u64, + /// Peak receiver count. + pub peak_receivers: usize, + /// Total connections served. + pub total_connections: u64, + /// Rejected connections (over limit). + pub rejected_connections: u64, } /// Cached state for late-joining receivers. @@ -139,6 +160,20 @@ impl StateCache { } msgs } + + /// Clear all cached state. + fn clear(&mut self) { + self.last_keyframe = None; + self.last_signal_sync = None; + self.pending_signal_diffs.clear(); + } + + /// Returns true if this cache has any state. + fn has_state(&self) -> bool { + self.last_keyframe.is_some() + || self.last_signal_sync.is_some() + || !self.pending_signal_diffs.is_empty() + } } /// Per-channel state — each named channel is an independent stream. @@ -154,10 +189,14 @@ struct ChannelState { stats: RelayStats, /// Cached state for late-joining receivers cache: StateCache, + /// When the source last disconnected (for reconnect grace period) + source_disconnect_time: Option, + /// Max receivers for this channel + max_receivers: usize, } impl ChannelState { - fn new(frame_buffer_size: usize) -> Self { + fn new(frame_buffer_size: usize, max_receivers: usize) -> Self { let (frame_tx, _) = broadcast::channel(frame_buffer_size); let (input_tx, input_rx) = mpsc::channel(256); let (signaling_tx, _) = broadcast::channel(64); @@ -168,6 +207,24 @@ impl ChannelState { signaling_tx, stats: RelayStats::default(), cache: StateCache::default(), + source_disconnect_time: None, + max_receivers, + } + } + + /// Returns true if this channel is idle (no source, no receivers, no cache). + fn is_idle(&self) -> bool { + !self.stats.source_connected + && self.stats.connected_receivers == 0 + && !self.cache.has_state() + } + + /// Returns true if the source reconnect grace period has expired. + fn grace_period_expired(&self, grace_secs: u64) -> bool { + if let Some(disconnect_time) = self.source_disconnect_time { + disconnect_time.elapsed() > Duration::from_secs(grace_secs) + } else { + false } } } @@ -178,25 +235,40 @@ struct RelayState { channels: HashMap>>, /// Frame buffer size for new channels frame_buffer_size: usize, + /// Max receivers per channel + max_receivers: usize, + /// Max channels + max_channels: usize, /// Server start time start_time: Instant, } impl RelayState { - fn new(frame_buffer_size: usize) -> Self { + fn new(frame_buffer_size: usize, max_receivers: usize, max_channels: usize) -> Self { Self { channels: HashMap::new(), frame_buffer_size, + max_receivers, + max_channels, start_time: Instant::now(), } } - /// Get or create a channel by name. - fn get_or_create_channel(&mut self, name: &str) -> Arc> { - self.channels - .entry(name.to_string()) - .or_insert_with(|| Arc::new(RwLock::new(ChannelState::new(self.frame_buffer_size)))) - .clone() + /// Get or create a channel by name. Returns None if at max channels. + fn get_or_create_channel(&mut self, name: &str) -> Option>> { + if self.channels.contains_key(name) { + return Some(self.channels[name].clone()); + } + if self.channels.len() >= self.max_channels { + eprintln!("[relay] Max channels ({}) reached, rejecting: {}", self.max_channels, name); + return None; + } + let channel = Arc::new(RwLock::new(ChannelState::new( + self.frame_buffer_size, + self.max_receivers, + ))); + self.channels.insert(name.to_string(), channel.clone()); + Some(channel) } } @@ -226,37 +298,80 @@ fn parse_path(path: &str) -> ConnectionRole { pub async fn run_relay(config: RelayConfig) -> Result<(), Box> { let listener = TcpListener::bind(&config.addr).await?; eprintln!("╔══════════════════════════════════════════════════╗"); - eprintln!("║ DreamStack Bitstream Relay v0.4.0 ║"); + eprintln!("║ DreamStack Bitstream Relay v1.0.0 ║"); eprintln!("║ ║"); eprintln!("║ Source: ws://{}/source/{{name}} ║", config.addr); eprintln!("║ Receiver: ws://{}/stream/{{name}} ║", config.addr); eprintln!("║ Signal: ws://{}/signal/{{name}} ║", config.addr); eprintln!("║ ║"); - eprintln!("║ Multi-source, WebRTC signaling, keyframe cache ║"); + eprintln!("║ Max receivers/ch: {:>4} ║", config.max_receivers); + eprintln!("║ Max channels: {:>4} ║", config.max_channels); + eprintln!("║ Reconnect grace: {:>4}s ║", config.source_reconnect_grace_secs); eprintln!("╚══════════════════════════════════════════════════╝"); - let state = Arc::new(RwLock::new(RelayState::new(config.frame_buffer_size))); + let grace_secs = config.source_reconnect_grace_secs; + let state = Arc::new(RwLock::new(RelayState::new( + config.frame_buffer_size, + config.max_receivers, + config.max_channels, + ))); - // Background: periodic stats logging + // Background: periodic stats + channel GC { let state = state.clone(); + let gc_interval = config.channel_gc_interval_secs; tokio::spawn(async move { - let mut tick = interval(Duration::from_secs(30)); + let mut tick = interval(Duration::from_secs(gc_interval.min(30))); + let mut gc_counter: u64 = 0; loop { tick.tick().await; + gc_counter += 1; + let s = state.read().await; let uptime = s.start_time.elapsed().as_secs(); + + // Stats logging for (name, channel) in &s.channels { let cs = channel.read().await; if cs.stats.source_connected || cs.stats.connected_receivers > 0 { eprintln!( - "[relay:{name}] up={uptime}s frames={} bytes={} inputs={} receivers={} signal_diffs={} cached={}", + "[relay:{name}] up={uptime}s frames={} bytes={} receivers={}/{} peak={} signal_diffs={} cached={}", cs.stats.frames_relayed, cs.stats.bytes_relayed, - cs.stats.inputs_relayed, cs.stats.connected_receivers, + cs.max_receivers, + cs.stats.peak_receivers, cs.stats.signal_diffs_sent, - cs.cache.last_signal_sync.is_some(), + cs.cache.has_state(), + ); + } + } + drop(s); + + // Channel GC — every gc_interval ticks + if gc_counter % (gc_interval / gc_interval.min(30)).max(1) == 0 { + let mut s = state.write().await; + let before = s.channels.len(); + let mut to_remove = Vec::new(); + + for (name, channel) in &s.channels { + let cs = channel.read().await; + if cs.is_idle() || cs.grace_period_expired(grace_secs) { + to_remove.push(name.clone()); + } + } + + for name in &to_remove { + s.channels.remove(name); + } + + if !to_remove.is_empty() { + eprintln!( + "[relay] GC: removed {} idle channel(s) ({} → {}): {:?}", + to_remove.len(), + before, + s.channels.len(), + to_remove ); } } @@ -301,15 +416,34 @@ async fn handle_connection( }; // Get or create the channel - let (channel, channel_name) = { - let mut s = state.write().await; - let name = match &role { - ConnectionRole::Source(n) | ConnectionRole::Receiver(n) | ConnectionRole::Signaling(n) => n.clone(), - }; - let ch = s.get_or_create_channel(&name); - (ch, name) + let channel_name = match &role { + ConnectionRole::Source(n) | ConnectionRole::Receiver(n) | ConnectionRole::Signaling(n) => n.clone(), }; + let channel = { + let mut s = state.write().await; + match s.get_or_create_channel(&channel_name) { + Some(ch) => ch, + None => { + eprintln!("[relay] Rejected connection from {addr}: max channels reached"); + return; + } + } + }; + + // Check receiver limit + if let ConnectionRole::Receiver(_) = &role { + let mut cs = channel.write().await; + if cs.stats.connected_receivers >= cs.max_receivers { + eprintln!( + "[relay:{channel_name}] Rejected receiver {addr}: at capacity ({}/{})", + cs.stats.connected_receivers, cs.max_receivers + ); + cs.stats.rejected_connections += 1; + return; + } + } + // Legacy fallback: if path was `/` and no source exists, treat as source let role = match role { ConnectionRole::Receiver(ref name) if name == "default" => { @@ -348,10 +482,18 @@ async fn handle_source( ) { let (mut ws_sink, mut ws_source) = ws_stream.split(); - // Mark source as connected and take the input_rx + // Mark source as connected; if reconnecting, preserve cache let input_rx = { let mut cs = channel.write().await; + if cs.source_disconnect_time.is_some() { + eprintln!("[relay:{channel_name}] Source reconnected — cache preserved ({} diffs, sync={})", + cs.cache.pending_signal_diffs.len(), + cs.cache.last_signal_sync.is_some(), + ); + } cs.stats.source_connected = true; + cs.stats.total_connections += 1; + cs.source_disconnect_time = None; cs.input_rx.take() }; @@ -424,11 +566,16 @@ async fn handle_source( } } - // Source disconnected + // Source disconnected — start grace period, keep cache ping_task.abort(); - eprintln!("[relay:{channel_name_owned}] Source disconnected: {addr}"); + eprintln!("[relay:{channel_name_owned}] Source disconnected: {addr} (cache preserved for reconnect)"); let mut cs = channel.write().await; cs.stats.source_connected = false; + cs.source_disconnect_time = Some(Instant::now()); + // Recreate input channel for next source connection + let (new_input_tx, new_input_rx) = mpsc::channel(256); + cs.input_tx = new_input_tx; + cs.input_rx = Some(new_input_rx); } async fn handle_receiver( @@ -443,6 +590,10 @@ async fn handle_receiver( let (mut frame_rx, catchup_msgs) = { let mut cs = channel.write().await; cs.stats.connected_receivers += 1; + cs.stats.total_connections += 1; + if cs.stats.connected_receivers > cs.stats.peak_receivers { + cs.stats.peak_receivers = cs.stats.connected_receivers; + } let rx = cs.frame_tx.subscribe(); let catchup = cs.cache.catchup_messages(); (rx, catchup) @@ -571,6 +722,9 @@ mod tests { assert_eq!(config.max_receivers, 64); assert_eq!(config.frame_buffer_size, 16); assert_eq!(config.keepalive_interval_secs, 10); + assert_eq!(config.max_channels, 256); + assert_eq!(config.channel_gc_interval_secs, 60); + assert_eq!(config.source_reconnect_grace_secs, 30); } #[test] @@ -581,6 +735,9 @@ mod tests { assert!(!stats.source_connected); assert_eq!(stats.keyframes_sent, 0); assert_eq!(stats.signal_diffs_sent, 0); + assert_eq!(stats.peak_receivers, 0); + assert_eq!(stats.total_connections, 0); + assert_eq!(stats.rejected_connections, 0); } #[test] @@ -593,6 +750,7 @@ mod tests { cache.process_frame(&sync_msg); assert!(cache.last_signal_sync.is_some()); + assert!(cache.has_state()); assert_eq!(cache.pending_signal_diffs.len(), 0); // Simulate a SignalDiff @@ -635,6 +793,7 @@ mod tests { cache.process_frame(&kf); assert!(cache.last_keyframe.is_some()); + assert!(cache.has_state()); let catchup = cache.catchup_messages(); assert_eq!(catchup.len(), 1); } @@ -653,6 +812,17 @@ mod tests { assert!(cache.pending_signal_diffs.len() <= 600); } + #[test] + fn state_cache_clear() { + let mut cache = StateCache::default(); + let sync = crate::codec::signal_sync_frame(0, 0, b"{}"); + cache.process_frame(&sync); + assert!(cache.has_state()); + cache.clear(); + assert!(!cache.has_state()); + assert!(cache.last_signal_sync.is_none()); + } + // ─── Path Routing Tests ─── #[test] @@ -711,15 +881,67 @@ mod tests { } } + // ─── Channel State Tests ─── + #[test] fn channel_state_creation() { - let mut state = RelayState::new(16); - let ch1 = state.get_or_create_channel("main"); - let ch2 = state.get_or_create_channel("player1"); - let ch1_again = state.get_or_create_channel("main"); + let mut state = RelayState::new(16, 64, 256); + let ch1 = state.get_or_create_channel("main").unwrap(); + let ch2 = state.get_or_create_channel("player1").unwrap(); + let ch1_again = state.get_or_create_channel("main").unwrap(); assert_eq!(state.channels.len(), 2); assert!(Arc::ptr_eq(&ch1, &ch1_again)); assert!(!Arc::ptr_eq(&ch1, &ch2)); } + + #[test] + fn channel_max_limit() { + let mut state = RelayState::new(16, 64, 2); + assert!(state.get_or_create_channel("a").is_some()); + assert!(state.get_or_create_channel("b").is_some()); + assert!(state.get_or_create_channel("c").is_none()); // max reached + assert!(state.get_or_create_channel("a").is_some()); // existing OK + } + + #[test] + fn channel_idle_detection() { + let cs = ChannelState::new(16, 64); + assert!(cs.is_idle()); // no source, no receivers, no cache + } + + #[test] + fn channel_not_idle_with_cache() { + let mut cs = ChannelState::new(16, 64); + let sync = crate::codec::signal_sync_frame(0, 0, b"{}"); + cs.cache.process_frame(&sync); + assert!(!cs.is_idle()); // has cached state + } + + #[test] + fn channel_not_idle_with_source() { + let mut cs = ChannelState::new(16, 64); + cs.stats.source_connected = true; + assert!(!cs.is_idle()); + } + + #[test] + fn channel_not_idle_with_receivers() { + let mut cs = ChannelState::new(16, 64); + cs.stats.connected_receivers = 1; + assert!(!cs.is_idle()); + } + + #[test] + fn grace_period_not_expired_initially() { + let cs = ChannelState::new(16, 64); + assert!(!cs.grace_period_expired(30)); + } + + #[test] + fn grace_period_expired_after_disconnect() { + let mut cs = ChannelState::new(16, 64); + cs.source_disconnect_time = Some(Instant::now() - Duration::from_secs(60)); + assert!(cs.grace_period_expired(30)); + } }