feat: HTTP /meta API, signal dedup, periodic auto-sync

Relay HTTP /meta endpoint:
- GET /meta → JSON with all channel stats
- GET /meta/{name} → JSON with specific channel stats, schema, current state
- Uses TCP peek to intercept raw HTTP before WS handshake
- CORS headers for browser access

Signal deduplication:
- _lastSentValues tracks last-sent value per signal
- JSON.stringify comparison skips unchanged values
- Prevents redundant WS frames from derived signals

Periodic auto-sync:
- Every 50 diff batches, source sends full SignalSync (0x30)
- Relay can compact its cache instead of accumulating an unbounded diff backlog
- Resets pending_signal_diffs in relay cache

All 57 relay tests pass. All 9 examples pass.
This commit is contained in:
enzotar 2026-02-26 18:17:25 -08:00
parent 598ecde59c
commit 08e36573a5
2 changed files with 156 additions and 2 deletions

View file

@ -2792,13 +2792,22 @@ const DS = (() => {
} }
// ── Batched diff: coalesce multiple signal changes into one WS frame ── // ── Batched diff: coalesce multiple signal changes into one WS frame ──
var _lastSentValues = {}; // deduplication: track last-sent value per signal
var _diffBatchCount = 0; // count batches for periodic auto-sync
function _streamDiff(name, value) { function _streamDiff(name, value) {
if (!_streamWs || _streamWs.readyState !== 1 || _streamMode !== 'signal') return; if (!_streamWs || _streamWs.readyState !== 1 || _streamMode !== 'signal') return;
if (_applyingRemoteDiff) return; // prevent echo loops if (_applyingRemoteDiff) return; // prevent echo loops
var val = (typeof value === 'object' && value !== null && 'value' in value) ? value.value : value;
// Deduplication: skip if value hasn't changed
var lastStr = JSON.stringify(_lastSentValues[name]);
var newStr = JSON.stringify(val);
if (lastStr === newStr) return;
_lastSentValues[name] = val;
// Increment version for conflict resolution // Increment version for conflict resolution
_signalVersions[name] = (_signalVersions[name] || 0) + 1; _signalVersions[name] = (_signalVersions[name] || 0) + 1;
// Accumulate into pending batch // Accumulate into pending batch
_pendingDiffs[name] = (typeof value === 'object' && value !== null && 'value' in value) ? value.value : value; _pendingDiffs[name] = val;
_pendingVersions[name] = _signalVersions[name]; _pendingVersions[name] = _signalVersions[name];
// Schedule flush on next microtask (coalesces all changes in current event loop tick) // Schedule flush on next microtask (coalesces all changes in current event loop tick)
if (!_diffFlushTimer) { if (!_diffFlushTimer) {
@ -2810,6 +2819,15 @@ const DS = (() => {
_diffFlushTimer = null; _diffFlushTimer = null;
var keys = Object.keys(_pendingDiffs); var keys = Object.keys(_pendingDiffs);
if (keys.length === 0) return; if (keys.length === 0) return;
_diffBatchCount++;
// Every 50 batches, send a full SignalSync instead of a diff
// This lets the relay compact its cache (merge diffs into one sync)
if (_diffBatchCount % 50 === 0) {
_streamSync(_signalRegistry);
_pendingDiffs = {};
_pendingVersions = {};
return;
}
// Build single frame with all changes // Build single frame with all changes
var obj = { _pid: _peerId, _v: {} }; var obj = { _pid: _peerId, _v: {} };
for (var i = 0; i < keys.length; i++) { for (var i = 0; i < keys.length; i++) {

View file

@ -492,11 +492,147 @@ pub async fn run_relay(config: RelayConfig) -> Result<(), Box<dyn std::error::Er
let state = state.clone(); let state = state.clone();
let keepalive_interval = config.keepalive_interval_secs; let keepalive_interval = config.keepalive_interval_secs;
let keepalive_timeout = config.keepalive_timeout_secs; let keepalive_timeout = config.keepalive_timeout_secs;
tokio::spawn(handle_connection(stream, addr, state, keepalive_interval, keepalive_timeout));
tokio::spawn(async move {
// Peek at the HTTP request to check for /meta requests
// Meta requests get a raw HTTP JSON response, not a WS upgrade
let mut peek_buf = [0u8; 512];
let n = match stream.peek(&mut peek_buf).await {
Ok(n) => n,
Err(_) => return,
};
let request_line = String::from_utf8_lossy(&peek_buf[..n]);
// Extract path from "GET /meta/name HTTP/1.1"
if let Some(path) = request_line.lines().next().and_then(|line| {
let parts: Vec<&str> = line.split_whitespace().collect();
if parts.len() >= 2 { Some(parts[1]) } else { None }
}) {
if path.starts_with("/meta") {
handle_meta_http(stream, addr, &state, path).await;
return;
}
}
handle_connection(stream, addr, state, keepalive_interval, keepalive_timeout).await;
});
} }
Ok(()) Ok(())
} }
/// Handle an HTTP /meta/{channel} request — return channel stats as JSON.
///
/// This is NOT a WebSocket connection. We read the full HTTP request,
/// then respond with an HTTP 200 + JSON body.
async fn handle_meta_http(
mut stream: TcpStream,
addr: SocketAddr,
state: &Arc<RwLock<RelayState>>,
path: &str,
) {
use tokio::io::AsyncWriteExt;
use tokio::io::AsyncReadExt;
// Drain the HTTP request from the socket
let mut buf = vec![0u8; 4096];
let _ = stream.read(&mut buf).await;
// Parse channel name from path
let channel_name = path.trim_start_matches("/meta/").trim_matches('/');
let channel_name = if channel_name.is_empty() { "__all__" } else { channel_name };
let s = state.read().await;
let uptime = s.start_time.elapsed().as_secs();
let body = if channel_name == "__all__" {
// Return stats for ALL channels
let mut channels = serde_json::Map::new();
for (name, channel) in &s.channels {
let cs = channel.read().await;
channels.insert(name.clone(), serde_json::json!({
"source_connected": cs.stats.source_connected,
"receivers": cs.stats.connected_receivers,
"peak_receivers": cs.stats.peak_receivers,
"frames_relayed": cs.stats.frames_relayed,
"bytes_relayed": cs.stats.bytes_relayed,
"signal_diffs": cs.stats.signal_diffs_sent,
"keyframes": cs.stats.keyframes_sent,
"total_connections": cs.stats.total_connections,
"rejected_connections": cs.stats.rejected_connections,
"cached": cs.cache.has_state(),
"pending_diffs": cs.cache.pending_signal_diffs.len(),
}));
}
serde_json::json!({
"uptime_secs": uptime,
"channel_count": s.channels.len(),
"channels": serde_json::Value::Object(channels),
}).to_string()
} else if let Some(channel) = s.channels.get(channel_name) {
// Return stats for a specific channel
let cs = channel.read().await;
// Extract current signal state if available
let mut current_state = serde_json::Value::Null;
if let Some(ref sync_frame) = cs.cache.last_signal_sync {
if sync_frame.len() >= HEADER_SIZE {
let pl_len = u32::from_le_bytes([
sync_frame[12], sync_frame[13], sync_frame[14], sync_frame[15],
]) as usize;
let end = HEADER_SIZE + pl_len.min(sync_frame.len() - HEADER_SIZE);
if let Ok(val) = serde_json::from_slice::<serde_json::Value>(&sync_frame[HEADER_SIZE..end]) {
current_state = val;
}
}
}
serde_json::json!({
"channel": channel_name,
"uptime_secs": uptime,
"source_connected": cs.stats.source_connected,
"receivers": cs.stats.connected_receivers,
"max_receivers": cs.max_receivers,
"peak_receivers": cs.stats.peak_receivers,
"frames_relayed": cs.stats.frames_relayed,
"bytes_relayed": cs.stats.bytes_relayed,
"signal_diffs": cs.stats.signal_diffs_sent,
"keyframes": cs.stats.keyframes_sent,
"total_connections": cs.stats.total_connections,
"rejected_connections": cs.stats.rejected_connections,
"cached": cs.cache.has_state(),
"pending_diffs": cs.cache.pending_signal_diffs.len(),
"schema": cs.schema.as_ref().and_then(|s| {
if s.len() >= HEADER_SIZE {
let pl_len = u32::from_le_bytes([s[12], s[13], s[14], s[15]]) as usize;
let end = HEADER_SIZE + pl_len.min(s.len() - HEADER_SIZE);
serde_json::from_slice::<serde_json::Value>(&s[HEADER_SIZE..end]).ok()
} else { None }
}),
"current_state": current_state,
}).to_string()
} else {
serde_json::json!({
"error": "channel not found",
"channel": channel_name,
}).to_string()
};
// Write HTTP response
let response = format!(
"HTTP/1.1 200 OK\r\n\
Content-Type: application/json\r\n\
Access-Control-Allow-Origin: *\r\n\
Content-Length: {}\r\n\
Connection: close\r\n\
\r\n\
{}",
body.len(),
body
);
let _ = stream.write_all(response.as_bytes()).await;
eprintln!("[relay:meta] {addr}{path}");
}
async fn handle_connection( async fn handle_connection(
stream: TcpStream, stream: TcpStream,
addr: SocketAddr, addr: SocketAddr,