feat: production hardening — relay v1.0.0, receiver protocol completeness

Relay v1.0.0:
- Max receivers per channel (default: 64) with rejection counting
- Max channels limit (default: 256) with over-limit rejection
- Channel GC: periodic scan removes idle channels after grace period (30s)
- Source reconnection: cache preserved on disconnect, new input channels
- Stats: peak_receivers, total_connections, rejected_connections, uptime
- ChannelState::is_idle(), grace_period_expired() for lifecycle mgmt
- StateCache::clear(), has_state() for cache introspection
- Banner shows config: max receivers, max channels, grace period
- 54 tests (+8 new: channel_max_limit, idle detection, grace period, cache ops)

Receiver (_connectStream):
- Complete frame type handling: SignalSync (full replace), SignalDiff (merge),
  Pixels/Keyframe (emit stream_frame), DeltaPixels (XOR + RLE decode),
  Ping (keepalive), StreamEnd (emit stream_end)
- RLE decoder for delta pixel frames
- Exponential backoff reconnect (1s → capped at 10s)
- Connection stats: frames, bytes, reconnects
- Event emissions: stream_frame, stream_end for app-level handling

105 tests, 0 failures across workspace
This commit is contained in:
enzotar 2026-02-25 18:30:40 -08:00
parent 392e478351
commit cde84ae270
2 changed files with 345 additions and 47 deletions

View file

@ -1598,6 +1598,7 @@ const DS = (() => {
let _streamStart = 0;
let _prevSignals = null;
function _encodeHeader(type, flags, seq, ts, w, h, len) {
const b = new ArrayBuffer(HEADER_SIZE);
const v = new DataView(b);
@ -1705,23 +1706,98 @@ const DS = (() => {
function _connectStream(url) {
var state = signal(null);
var ws = new WebSocket(url);
ws.binaryType = 'arraybuffer';
ws.onmessage = function(e) {
if (!(e.data instanceof ArrayBuffer) || e.data.byteLength < HEADER_SIZE) return;
var bytes = new Uint8Array(e.data);
var view = new DataView(bytes.buffer);
var type = view.getUint8(0);
var payloadLen = view.getUint32(12, true);
var pl = bytes.subarray(HEADER_SIZE, HEADER_SIZE + payloadLen);
if (type === 0x30 || type === 0x31) {
try {
var newState = JSON.parse(new TextDecoder().decode(pl));
state.value = Object.assign(state._value || {}, newState);
} catch(ex) {}
var _csWs = null;
var _csReconnectDelay = 1000;
var _csStats = { frames: 0, bytes: 0, reconnects: 0 };
var _csPixelBuffer = null;
function _csRleDecode(data) {
var out = [];
var i = 0;
while (i < data.length) {
if (data[i] === 0 && i + 1 < data.length) {
var count = data[i + 1];
for (var j = 0; j < count; j++) out.push(0);
i += 2;
} else {
out.push(data[i]);
i++;
}
}
};
ws.onclose = function() { setTimeout(function() { _connectStream(url); }, 2000); };
return new Uint8Array(out);
}
function _csConnect() {
_csWs = new WebSocket(url);
_csWs.binaryType = 'arraybuffer';
_csWs.onopen = function() {
console.log('[ds-stream] Receiver connected:', url);
_csReconnectDelay = 1000;
};
_csWs.onmessage = function(e) {
if (!(e.data instanceof ArrayBuffer) || e.data.byteLength < HEADER_SIZE) return;
var bytes = new Uint8Array(e.data);
var view = new DataView(bytes.buffer);
var type = view.getUint8(0);
var flags = view.getUint8(1);
var payloadLen = view.getUint32(12, true);
var pl = bytes.subarray(HEADER_SIZE, HEADER_SIZE + payloadLen);
_csStats.frames++;
_csStats.bytes += e.data.byteLength;
// Handle input events forwarded from source
if (flags & 0x01) {
_handleRemoteInput(type, pl);
return;
}
switch (type) {
case 0x30: // SignalSync — full state
case 0x31: // SignalDiff — partial state update
try {
var newState = JSON.parse(new TextDecoder().decode(pl));
if (type === 0x30) {
state.value = newState; // full replace
} else {
state.value = Object.assign(state._value || {}, newState);
}
} catch(ex) {}
break;
case 0x01: // Pixels (keyframe)
case 0x04: // Keyframe
var w = view.getUint16(8, true);
var h = view.getUint16(10, true);
_csPixelBuffer = pl.slice();
emit('stream_frame', { type: 'keyframe', width: w, height: h, pixels: _csPixelBuffer });
break;
case 0x03: // DeltaPixels (XOR + RLE)
if (_csPixelBuffer) {
var decoded = _csRleDecode(pl);
for (var i = 0; i < _csPixelBuffer.length && i < decoded.length; i++) {
_csPixelBuffer[i] ^= decoded[i];
}
emit('stream_frame', { type: 'delta', pixels: _csPixelBuffer });
}
break;
case 0xFE: // Ping
break; // keepalive, ignore
case 0xFF: // StreamEnd
console.log('[ds-stream] Stream ended by source');
emit('stream_end', {});
break;
}
};
_csWs.onclose = function() {
_csStats.reconnects++;
var delay = Math.min(_csReconnectDelay, 10000);
console.log('[ds-stream] Disconnected, reconnecting in', delay, 'ms');
setTimeout(_csConnect, delay);
_csReconnectDelay = Math.min(_csReconnectDelay * 1.5, 10000);
};
_csWs.onerror = function() {};
}
_csConnect();
return state;
}

View file

@ -9,17 +9,23 @@
//! - `/source/{name}` — named source (channel: {name})
//! - `/stream` — default receiver (channel: "default")
//! - `/stream/{name}` — named receiver (channel: {name})
//! - `/signal` — WebRTC signaling (channel: "default")
//! - `/signal/{name}` — WebRTC signaling (channel: {name})
//! - `/` — legacy: first connection = source, rest = receivers
//!
//! Each channel has its own broadcast/input channels and state cache,
//! allowing multiple independent streams through a single relay.
//!
//! ## Features
//! ## Production Features
//! - Multi-source: multiple views stream independently
//! - Keyframe caching: late-joining receivers get current state instantly
//! - Signal state store: caches SignalSync/Diff frames for reconstruction
//! - Ping/pong keepalive: detects dead connections
//! - Stats tracking: frames, bytes, latency metrics
//! - Max receiver limit per channel (configurable)
//! - Channel GC: empty channels are cleaned up periodically
//! - Source reconnection: cache preserved for seamless reconnect
//! - Graceful shutdown: drain connections on SIGTERM
use std::collections::HashMap;
use std::net::SocketAddr;
@ -46,6 +52,12 @@ pub struct RelayConfig {
pub keepalive_interval_secs: u64,
/// Keepalive timeout in seconds — disconnect after this many seconds without a pong.
pub keepalive_timeout_secs: u64,
/// Maximum number of channels.
pub max_channels: usize,
/// Channel GC interval in seconds — how often to scan for empty channels.
pub channel_gc_interval_secs: u64,
/// Source reconnect grace period in seconds — keep cache alive after source disconnect.
pub source_reconnect_grace_secs: u64,
}
impl Default for RelayConfig {
@ -56,6 +68,9 @@ impl Default for RelayConfig {
frame_buffer_size: 16,
keepalive_interval_secs: 10,
keepalive_timeout_secs: 30,
max_channels: 256,
channel_gc_interval_secs: 60,
source_reconnect_grace_secs: 30,
}
}
}
@ -76,6 +91,12 @@ pub struct RelayStats {
pub signal_diffs_sent: u64,
/// Uptime in seconds.
pub uptime_secs: u64,
/// Peak receiver count.
pub peak_receivers: usize,
/// Total connections served.
pub total_connections: u64,
/// Rejected connections (over limit).
pub rejected_connections: u64,
}
/// Cached state for late-joining receivers.
@ -139,6 +160,20 @@ impl StateCache {
}
msgs
}
/// Clear all cached state.
fn clear(&mut self) {
self.last_keyframe = None;
self.last_signal_sync = None;
self.pending_signal_diffs.clear();
}
/// Returns true if this cache has any state.
fn has_state(&self) -> bool {
self.last_keyframe.is_some()
|| self.last_signal_sync.is_some()
|| !self.pending_signal_diffs.is_empty()
}
}
/// Per-channel state — each named channel is an independent stream.
@ -154,10 +189,14 @@ struct ChannelState {
stats: RelayStats,
/// Cached state for late-joining receivers
cache: StateCache,
/// When the source last disconnected (for reconnect grace period)
source_disconnect_time: Option<Instant>,
/// Max receivers for this channel
max_receivers: usize,
}
impl ChannelState {
fn new(frame_buffer_size: usize) -> Self {
fn new(frame_buffer_size: usize, max_receivers: usize) -> Self {
let (frame_tx, _) = broadcast::channel(frame_buffer_size);
let (input_tx, input_rx) = mpsc::channel(256);
let (signaling_tx, _) = broadcast::channel(64);
@ -168,6 +207,24 @@ impl ChannelState {
signaling_tx,
stats: RelayStats::default(),
cache: StateCache::default(),
source_disconnect_time: None,
max_receivers,
}
}
/// Returns true if this channel is idle (no source, no receivers, no cache).
fn is_idle(&self) -> bool {
!self.stats.source_connected
&& self.stats.connected_receivers == 0
&& !self.cache.has_state()
}
/// Returns true if the source reconnect grace period has expired.
fn grace_period_expired(&self, grace_secs: u64) -> bool {
if let Some(disconnect_time) = self.source_disconnect_time {
disconnect_time.elapsed() > Duration::from_secs(grace_secs)
} else {
false
}
}
}
@ -178,25 +235,40 @@ struct RelayState {
channels: HashMap<String, Arc<RwLock<ChannelState>>>,
/// Frame buffer size for new channels
frame_buffer_size: usize,
/// Max receivers per channel
max_receivers: usize,
/// Max channels
max_channels: usize,
/// Server start time
start_time: Instant,
}
impl RelayState {
fn new(frame_buffer_size: usize) -> Self {
fn new(frame_buffer_size: usize, max_receivers: usize, max_channels: usize) -> Self {
Self {
channels: HashMap::new(),
frame_buffer_size,
max_receivers,
max_channels,
start_time: Instant::now(),
}
}
/// Get or create a channel by name.
fn get_or_create_channel(&mut self, name: &str) -> Arc<RwLock<ChannelState>> {
self.channels
.entry(name.to_string())
.or_insert_with(|| Arc::new(RwLock::new(ChannelState::new(self.frame_buffer_size))))
.clone()
/// Get or create a channel by name. Returns None if at max channels.
fn get_or_create_channel(&mut self, name: &str) -> Option<Arc<RwLock<ChannelState>>> {
if self.channels.contains_key(name) {
return Some(self.channels[name].clone());
}
if self.channels.len() >= self.max_channels {
eprintln!("[relay] Max channels ({}) reached, rejecting: {}", self.max_channels, name);
return None;
}
let channel = Arc::new(RwLock::new(ChannelState::new(
self.frame_buffer_size,
self.max_receivers,
)));
self.channels.insert(name.to_string(), channel.clone());
Some(channel)
}
}
@ -226,37 +298,80 @@ fn parse_path(path: &str) -> ConnectionRole {
pub async fn run_relay(config: RelayConfig) -> Result<(), Box<dyn std::error::Error>> {
let listener = TcpListener::bind(&config.addr).await?;
eprintln!("╔══════════════════════════════════════════════════╗");
eprintln!("║ DreamStack Bitstream Relay v0.4.0 ║");
eprintln!("║ DreamStack Bitstream Relay v1.0.0 ║");
eprintln!("║ ║");
eprintln!("║ Source: ws://{}/source/{{name}}", config.addr);
eprintln!("║ Receiver: ws://{}/stream/{{name}}", config.addr);
eprintln!("║ Signal: ws://{}/signal/{{name}}", config.addr);
eprintln!("║ ║");
eprintln!("║ Multi-source, WebRTC signaling, keyframe cache ║");
eprintln!("║ Max receivers/ch: {:>4}", config.max_receivers);
eprintln!("║ Max channels: {:>4}", config.max_channels);
eprintln!("║ Reconnect grace: {:>4}s ║", config.source_reconnect_grace_secs);
eprintln!("╚══════════════════════════════════════════════════╝");
let state = Arc::new(RwLock::new(RelayState::new(config.frame_buffer_size)));
let grace_secs = config.source_reconnect_grace_secs;
let state = Arc::new(RwLock::new(RelayState::new(
config.frame_buffer_size,
config.max_receivers,
config.max_channels,
)));
// Background: periodic stats logging
// Background: periodic stats + channel GC
{
let state = state.clone();
let gc_interval = config.channel_gc_interval_secs;
tokio::spawn(async move {
let mut tick = interval(Duration::from_secs(30));
let mut tick = interval(Duration::from_secs(gc_interval.min(30)));
let mut gc_counter: u64 = 0;
loop {
tick.tick().await;
gc_counter += 1;
let s = state.read().await;
let uptime = s.start_time.elapsed().as_secs();
// Stats logging
for (name, channel) in &s.channels {
let cs = channel.read().await;
if cs.stats.source_connected || cs.stats.connected_receivers > 0 {
eprintln!(
"[relay:{name}] up={uptime}s frames={} bytes={} inputs={} receivers={} signal_diffs={} cached={}",
"[relay:{name}] up={uptime}s frames={} bytes={} receivers={}/{} peak={} signal_diffs={} cached={}",
cs.stats.frames_relayed,
cs.stats.bytes_relayed,
cs.stats.inputs_relayed,
cs.stats.connected_receivers,
cs.max_receivers,
cs.stats.peak_receivers,
cs.stats.signal_diffs_sent,
cs.cache.last_signal_sync.is_some(),
cs.cache.has_state(),
);
}
}
drop(s);
// Channel GC — every gc_interval ticks
if gc_counter % (gc_interval / gc_interval.min(30)).max(1) == 0 {
let mut s = state.write().await;
let before = s.channels.len();
let mut to_remove = Vec::new();
for (name, channel) in &s.channels {
let cs = channel.read().await;
if cs.is_idle() || cs.grace_period_expired(grace_secs) {
to_remove.push(name.clone());
}
}
for name in &to_remove {
s.channels.remove(name);
}
if !to_remove.is_empty() {
eprintln!(
"[relay] GC: removed {} idle channel(s) ({} → {}): {:?}",
to_remove.len(),
before,
s.channels.len(),
to_remove
);
}
}
@ -301,15 +416,34 @@ async fn handle_connection(
};
// Get or create the channel
let (channel, channel_name) = {
let mut s = state.write().await;
let name = match &role {
ConnectionRole::Source(n) | ConnectionRole::Receiver(n) | ConnectionRole::Signaling(n) => n.clone(),
};
let ch = s.get_or_create_channel(&name);
(ch, name)
let channel_name = match &role {
ConnectionRole::Source(n) | ConnectionRole::Receiver(n) | ConnectionRole::Signaling(n) => n.clone(),
};
let channel = {
let mut s = state.write().await;
match s.get_or_create_channel(&channel_name) {
Some(ch) => ch,
None => {
eprintln!("[relay] Rejected connection from {addr}: max channels reached");
return;
}
}
};
// Check receiver limit
if let ConnectionRole::Receiver(_) = &role {
let mut cs = channel.write().await;
if cs.stats.connected_receivers >= cs.max_receivers {
eprintln!(
"[relay:{channel_name}] Rejected receiver {addr}: at capacity ({}/{})",
cs.stats.connected_receivers, cs.max_receivers
);
cs.stats.rejected_connections += 1;
return;
}
}
// Legacy fallback: if path was `/` and no source exists, treat as source
let role = match role {
ConnectionRole::Receiver(ref name) if name == "default" => {
@ -348,10 +482,18 @@ async fn handle_source(
) {
let (mut ws_sink, mut ws_source) = ws_stream.split();
// Mark source as connected and take the input_rx
// Mark source as connected; if reconnecting, preserve cache
let input_rx = {
let mut cs = channel.write().await;
if cs.source_disconnect_time.is_some() {
eprintln!("[relay:{channel_name}] Source reconnected — cache preserved ({} diffs, sync={})",
cs.cache.pending_signal_diffs.len(),
cs.cache.last_signal_sync.is_some(),
);
}
cs.stats.source_connected = true;
cs.stats.total_connections += 1;
cs.source_disconnect_time = None;
cs.input_rx.take()
};
@ -424,11 +566,16 @@ async fn handle_source(
}
}
// Source disconnected
// Source disconnected — start grace period, keep cache
ping_task.abort();
eprintln!("[relay:{channel_name_owned}] Source disconnected: {addr}");
eprintln!("[relay:{channel_name_owned}] Source disconnected: {addr} (cache preserved for reconnect)");
let mut cs = channel.write().await;
cs.stats.source_connected = false;
cs.source_disconnect_time = Some(Instant::now());
// Recreate input channel for next source connection
let (new_input_tx, new_input_rx) = mpsc::channel(256);
cs.input_tx = new_input_tx;
cs.input_rx = Some(new_input_rx);
}
async fn handle_receiver(
@ -443,6 +590,10 @@ async fn handle_receiver(
let (mut frame_rx, catchup_msgs) = {
let mut cs = channel.write().await;
cs.stats.connected_receivers += 1;
cs.stats.total_connections += 1;
if cs.stats.connected_receivers > cs.stats.peak_receivers {
cs.stats.peak_receivers = cs.stats.connected_receivers;
}
let rx = cs.frame_tx.subscribe();
let catchup = cs.cache.catchup_messages();
(rx, catchup)
@ -571,6 +722,9 @@ mod tests {
assert_eq!(config.max_receivers, 64);
assert_eq!(config.frame_buffer_size, 16);
assert_eq!(config.keepalive_interval_secs, 10);
assert_eq!(config.max_channels, 256);
assert_eq!(config.channel_gc_interval_secs, 60);
assert_eq!(config.source_reconnect_grace_secs, 30);
}
#[test]
@ -581,6 +735,9 @@ mod tests {
assert!(!stats.source_connected);
assert_eq!(stats.keyframes_sent, 0);
assert_eq!(stats.signal_diffs_sent, 0);
assert_eq!(stats.peak_receivers, 0);
assert_eq!(stats.total_connections, 0);
assert_eq!(stats.rejected_connections, 0);
}
#[test]
@ -593,6 +750,7 @@ mod tests {
cache.process_frame(&sync_msg);
assert!(cache.last_signal_sync.is_some());
assert!(cache.has_state());
assert_eq!(cache.pending_signal_diffs.len(), 0);
// Simulate a SignalDiff
@ -635,6 +793,7 @@ mod tests {
cache.process_frame(&kf);
assert!(cache.last_keyframe.is_some());
assert!(cache.has_state());
let catchup = cache.catchup_messages();
assert_eq!(catchup.len(), 1);
}
@ -653,6 +812,17 @@ mod tests {
assert!(cache.pending_signal_diffs.len() <= 600);
}
#[test]
fn state_cache_clear() {
let mut cache = StateCache::default();
let sync = crate::codec::signal_sync_frame(0, 0, b"{}");
cache.process_frame(&sync);
assert!(cache.has_state());
cache.clear();
assert!(!cache.has_state());
assert!(cache.last_signal_sync.is_none());
}
// ─── Path Routing Tests ───
#[test]
@ -711,15 +881,67 @@ mod tests {
}
}
// ─── Channel State Tests ───
#[test]
fn channel_state_creation() {
let mut state = RelayState::new(16);
let ch1 = state.get_or_create_channel("main");
let ch2 = state.get_or_create_channel("player1");
let ch1_again = state.get_or_create_channel("main");
let mut state = RelayState::new(16, 64, 256);
let ch1 = state.get_or_create_channel("main").unwrap();
let ch2 = state.get_or_create_channel("player1").unwrap();
let ch1_again = state.get_or_create_channel("main").unwrap();
assert_eq!(state.channels.len(), 2);
assert!(Arc::ptr_eq(&ch1, &ch1_again));
assert!(!Arc::ptr_eq(&ch1, &ch2));
}
#[test]
fn channel_max_limit() {
let mut state = RelayState::new(16, 64, 2);
assert!(state.get_or_create_channel("a").is_some());
assert!(state.get_or_create_channel("b").is_some());
assert!(state.get_or_create_channel("c").is_none()); // max reached
assert!(state.get_or_create_channel("a").is_some()); // existing OK
}
#[test]
fn channel_idle_detection() {
let cs = ChannelState::new(16, 64);
assert!(cs.is_idle()); // no source, no receivers, no cache
}
#[test]
fn channel_not_idle_with_cache() {
let mut cs = ChannelState::new(16, 64);
let sync = crate::codec::signal_sync_frame(0, 0, b"{}");
cs.cache.process_frame(&sync);
assert!(!cs.is_idle()); // has cached state
}
#[test]
fn channel_not_idle_with_source() {
let mut cs = ChannelState::new(16, 64);
cs.stats.source_connected = true;
assert!(!cs.is_idle());
}
#[test]
fn channel_not_idle_with_receivers() {
let mut cs = ChannelState::new(16, 64);
cs.stats.connected_receivers = 1;
assert!(!cs.is_idle());
}
#[test]
fn grace_period_not_expired_initially() {
let cs = ChannelState::new(16, 64);
assert!(!cs.grace_period_expired(30));
}
#[test]
fn grace_period_expired_after_disconnect() {
let mut cs = ChannelState::new(16, 64);
cs.source_disconnect_time = Some(Instant::now() - Duration::from_secs(60));
assert!(cs.grace_period_expired(30));
}
}