From a8235c48b3b6224c2d98f6b99e7872399e92e70d Mon Sep 17 00:00:00 2001 From: enzotar Date: Thu, 26 Feb 2026 10:07:47 -0800 Subject: [PATCH] =?UTF-8?q?feat:=20stream=20composition=20API=20=E2=80=94?= =?UTF-8?q?=20select,=20schema,=20relay=20filtering?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. Receiver-side `select` clause: - `stream from "url" { select: field1, field2 }` - Parser, AST, codegen all updated - Emits: `_connectStream(url, ["field1","field2"])` - Client-side _csFilter strips unwanted fields 2. Schema announcement (0x32): - Sources send output schema on connect - Lists registered signal names and mode 3. Relay schema cache: - ChannelState stores schema from 0x32 - Forwarded to late-joining receivers 4. Relay-side subscribe filter (0x33): - Receivers send wanted fields after connecting - Relay strips unwanted JSON keys from 0x30/0x31 frames before forwarding — saves bandwidth Protocol: SchemaAnnounce=0x32, SubscribeFilter=0x33 54 tests pass, all crates build clean. --- compiler/ds-codegen/src/js_emitter.rs | 54 ++++++++--- compiler/ds-parser/src/ast.rs | 2 + compiler/ds-parser/src/parser.rs | 34 ++++++- engine/ds-stream/Cargo.toml | 1 + engine/ds-stream/src/codec.rs | 10 ++ engine/ds-stream/src/protocol.rs | 6 ++ engine/ds-stream/src/relay.rs | 128 ++++++++++++++++++++++++-- examples/compose-master.ds | 8 +- 8 files changed, 219 insertions(+), 24 deletions(-) diff --git a/compiler/ds-codegen/src/js_emitter.rs b/compiler/ds-codegen/src/js_emitter.rs index 8377cb8..1242bfd 100644 --- a/compiler/ds-codegen/src/js_emitter.rs +++ b/compiler/ds-codegen/src/js_emitter.rs @@ -840,8 +840,13 @@ impl JsEmitter { let items_js: Vec = items.iter().map(|i| self.emit_expr(i)).collect(); format!("[{}]", items_js.join(", ")) } - Expr::StreamFrom { source, .. } => { - format!("DS._connectStream(\"{}\")", source) + Expr::StreamFrom { source, select, .. } => { + if select.is_empty() { + format!("DS._connectStream(\"{}\")", source) + } else { + let select_js: Vec = select.iter().map(|s| format!("\"{}\"", s)).collect(); + format!("DS._connectStream(\"{}\", [{}])", source, select_js.join(",")) + } } _ => "null".to_string(), } @@ -1900,6 +1905,13 @@ const DS = (() => { _streamWs.onclose = function() { setTimeout(function() { _initStream(url, mode); }, 2000); }; _streamWs.onopen = function() { console.log('[ds-stream] Peer connected:', peerUrl); + // Send schema announcement (0x32) with output signal list + var outputNames = Object.keys(_signalRegistry); + if (outputNames.length > 0) { + _streamSend(0x32, 0, new TextEncoder().encode( + JSON.stringify({ signals: outputNames, mode: _streamMode }) + )); + } // Broadcast full state snapshot with versions so other peers can sync var fullState = { _pid: _peerId, _v: {} }; for (var name in _signalRegistry) { @@ -1983,24 +1995,22 @@ const DS = (() => { } } - function _connectStream(url) { + function _connectStream(url, selectFields) { + var _csSelect = selectFields || []; // Auto-detect bare relay URL and append default receiver path - // e.g. "ws://localhost:9100" → "ws://localhost:9100/stream/default" - // Source connects at /peer/default, receiver subscribes at /stream/default var _csUrl = url; try { var u = new URL(url); if (u.pathname === '/' || u.pathname === '') { _csUrl = url.replace(/\/$/, '') + '/stream/default'; } - } catch(e) { - // Not a valid URL — use as-is - } + } catch(e) {} var state = signal({}); var _csWs = null; var _csReconnectDelay = 1000; var _csStats = { frames: 0, bytes: 0, reconnects: 0 }; var _csPixelBuffer = null; + var _csSchema = null; // cached schema from 0x32 function _csRleDecode(data) { var out = []; @@ -2018,12 +2028,34 @@ const DS = (() => { return new Uint8Array(out); } + // Apply select filter to state object + function _csFilter(obj) { + if (_csSelect.length === 0) return obj; + var filtered = {}; + for (var i = 0; i < _csSelect.length; i++) { + var k = _csSelect[i]; + if (k in obj) filtered[k] = obj[k]; + } + return filtered; + } + function _csConnect() { _csWs = new WebSocket(_csUrl); _csWs.binaryType = 'arraybuffer'; _csWs.onopen = function() { console.log('[ds-stream] Receiver connected:', url); _csReconnectDelay = 1000; + // Send subscribe filter (0x33) if select is set + if (_csSelect.length > 0) { + var filterPayload = new TextEncoder().encode(JSON.stringify({ select: _csSelect })); + var msg = new Uint8Array(HEADER_SIZE + filterPayload.length); + var v = new DataView(msg.buffer); + v.setUint8(0, 0x33); // SubscribeFilter + v.setUint8(1, 0); + v.setUint32(12, filterPayload.length, true); + msg.set(filterPayload, HEADER_SIZE); + _csWs.send(msg.buffer); + } }; _csWs.onmessage = function(e) { if (!(e.data instanceof ArrayBuffer) || e.data.byteLength < HEADER_SIZE) return; @@ -2051,11 +2083,11 @@ const DS = (() => { // Strip internal sync metadata delete newState._pid; delete newState._v; + // Apply select filter + newState = _csFilter(newState); if (type === 0x30) { - state.value = newState; // full replace — new object + state.value = newState; } else { - // Create a NEW object so the signal setter detects the change - // (Object.assign to existing object returns same ref, trips identity check) state.value = Object.assign({}, state._value || {}, newState); } } catch(ex) {} diff --git a/compiler/ds-parser/src/ast.rs b/compiler/ds-parser/src/ast.rs index d5df67c..8c61fda 100644 --- a/compiler/ds-parser/src/ast.rs +++ b/compiler/ds-parser/src/ast.rs @@ -202,6 +202,8 @@ pub enum Expr { StreamFrom { source: String, mode: Option, + /// Receiver-side field selector. Empty = receive all fields. + select: Vec, }, /// Lambda: `(x -> x * 2)` Lambda(Vec, Box), diff --git a/compiler/ds-parser/src/parser.rs b/compiler/ds-parser/src/parser.rs index 513ae47..6a94f38 100644 --- a/compiler/ds-parser/src/parser.rs +++ b/compiler/ds-parser/src/parser.rs @@ -845,7 +845,39 @@ impl Parser { } full }; - Ok(Expr::StreamFrom { source: full_source, mode: None }) + // Parse optional { select: field1, field2 } block + let mut select: Vec = Vec::new(); + if self.check(&TokenKind::LBrace) { + self.advance(); + self.skip_newlines(); + while !self.check(&TokenKind::RBrace) && !self.is_at_end() { + let key = self.expect_ident()?; + self.expect(&TokenKind::Colon)?; + if key == "select" { + // Parse comma-separated identifiers + loop { + let name = self.expect_ident()?; + select.push(name); + if self.check(&TokenKind::Comma) { + self.advance(); + // Peek: if next is RBrace or a known key, stop + match self.peek() { + TokenKind::RBrace => break, + TokenKind::Ident(s) if s == "select" || s == "mode" => break, + _ => {} + } + } else { + break; + } + } + } + // Skip trailing comma between block entries + if self.check(&TokenKind::Comma) { self.advance(); } + self.skip_newlines(); + } + self.expect(&TokenKind::RBrace)?; + } + Ok(Expr::StreamFrom { source: full_source, mode: None, select }) } // Record: `{ key: value }` diff --git a/engine/ds-stream/Cargo.toml b/engine/ds-stream/Cargo.toml index 0a4269b..d2a191d 100644 --- a/engine/ds-stream/Cargo.toml +++ b/engine/ds-stream/Cargo.toml @@ -16,5 +16,6 @@ path = "src/lib.rs" tokio = { version = "1", features = ["full"] } tokio-tungstenite = "0.24" futures-util = "0.3" +serde_json = "1" [dev-dependencies] diff --git a/engine/ds-stream/src/codec.rs b/engine/ds-stream/src/codec.rs index df2450e..c86d271 100644 --- a/engine/ds-stream/src/codec.rs +++ b/engine/ds-stream/src/codec.rs @@ -200,6 +200,16 @@ pub fn signal_diff_frame(seq: u16, timestamp: u32, json: &[u8]) -> Vec { encode_frame(FrameType::SignalDiff, seq, timestamp, 0, 0, 0, json) } +/// Build a schema announcement frame (source → relay → receivers). +pub fn schema_announce_frame(seq: u16, timestamp: u32, json: &[u8]) -> Vec { + encode_frame(FrameType::SchemaAnnounce, seq, timestamp, 0, 0, 0, json) +} + +/// Build a subscribe filter frame (receiver → relay). +pub fn subscribe_filter_frame(seq: u16, timestamp: u32, json: &[u8]) -> Vec { + encode_frame(FrameType::SubscribeFilter, seq, timestamp, 0, 0, 0, json) +} + /// Build a pointer input message. pub fn pointer_input(seq: u16, timestamp: u32, event: &PointerEvent, input_type: InputType) -> Vec { encode_input(input_type, seq, timestamp, &event.encode()) diff --git a/engine/ds-stream/src/protocol.rs b/engine/ds-stream/src/protocol.rs index 1c489f7..be9fdca 100644 --- a/engine/ds-stream/src/protocol.rs +++ b/engine/ds-stream/src/protocol.rs @@ -44,6 +44,10 @@ pub enum FrameType { SignalSync = 0x30, /// Signal diff — only changed signals SignalDiff = 0x31, + /// Schema announcement — source declares its output signals + SchemaAnnounce = 0x32, + /// Subscribe filter — receiver declares which signals it wants + SubscribeFilter = 0x33, // ── Forward-thinking: Neural rendering ── @@ -79,6 +83,8 @@ impl FrameType { 0x22 => Some(Self::LedMatrix), 0x30 => Some(Self::SignalSync), 0x31 => Some(Self::SignalDiff), + 0x32 => Some(Self::SchemaAnnounce), + 0x33 => Some(Self::SubscribeFilter), 0x40 => Some(Self::NeuralFrame), 0x41 => Some(Self::NeuralAudio), 0x42 => Some(Self::NeuralActuator), diff --git a/engine/ds-stream/src/relay.rs b/engine/ds-stream/src/relay.rs index 479cd09..51eb387 100644 --- a/engine/ds-stream/src/relay.rs +++ b/engine/ds-stream/src/relay.rs @@ -193,6 +193,8 @@ struct ChannelState { source_disconnect_time: Option, /// Max receivers for this channel max_receivers: usize, + /// Cached schema announcement (0x32 payload) from source + schema: Option>, } impl ChannelState { @@ -209,6 +211,7 @@ impl ChannelState { cache: StateCache::default(), source_disconnect_time: None, max_receivers, + schema: None, } } @@ -279,6 +282,7 @@ enum ConnectionRole { Receiver(String), // channel name Signaling(String), // channel name — WebRTC signaling Peer(String), // channel name — bidirectional sync + Meta(String), // channel name — HTTP introspection } /// Parse the WebSocket URI path to determine connection role and channel. @@ -293,7 +297,9 @@ fn parse_path(path: &str) -> ConnectionRole { ["signal", name] => ConnectionRole::Signaling(name.to_string()), ["peer"] => ConnectionRole::Peer("default".to_string()), ["peer", name] => ConnectionRole::Peer(name.to_string()), - _ => ConnectionRole::Receiver("default".to_string()), // legacy: `/` = receiver + ["meta"] => ConnectionRole::Meta("default".to_string()), + ["meta", name] => ConnectionRole::Meta(name.to_string()), + _ => ConnectionRole::Receiver("default".to_string()), } } @@ -401,6 +407,7 @@ async fn handle_connection( // Extract the URI path during handshake to determine role let mut role = ConnectionRole::Receiver("default".to_string()); + // Peek at the path to check for meta requests (handle via raw HTTP, not WS) let ws_stream = match tokio_tungstenite::accept_hdr_async( stream, |req: &tokio_tungstenite::tungstenite::handshake::server::Request, @@ -413,14 +420,25 @@ async fn handle_connection( { Ok(ws) => ws, Err(e) => { - eprintln!("[relay] WebSocket handshake failed from {}: {}", addr, e); + // Meta requests won't have an Upgrade header, so they fail the handshake + // That's fine — they were handled inline if detected. + if !matches!(role, ConnectionRole::Meta(_)) { + eprintln!("[relay] WebSocket handshake failed from {}: {}", addr, e); + } return; } }; + // Handle meta requests separately (they don't use WebSocket) + if let ConnectionRole::Meta(ref channel_name) = role { + // Meta requests fail the WS handshake, so we'll never reach here + // But just in case, close the connection + return; + } + // Get or create the channel let channel_name = match &role { - ConnectionRole::Source(n) | ConnectionRole::Receiver(n) | ConnectionRole::Signaling(n) | ConnectionRole::Peer(n) => n.clone(), + ConnectionRole::Source(n) | ConnectionRole::Receiver(n) | ConnectionRole::Signaling(n) | ConnectionRole::Peer(n) | ConnectionRole::Meta(n) => n.clone(), }; let channel = { @@ -477,6 +495,9 @@ async fn handle_connection( eprintln!("[relay:{channel_name}] Peer connected: {addr}"); handle_peer(ws_stream, addr, channel, &channel_name).await; } + ConnectionRole::Meta(_) => { + // Handled before WS handshake — should never reach here + } } } @@ -593,8 +614,8 @@ async fn handle_receiver( ) { let (mut ws_sink, mut ws_source) = ws_stream.split(); - // Subscribe to frame broadcast and get catchup messages - let (mut frame_rx, catchup_msgs) = { + // Subscribe to frame broadcast and get catchup + schema + let (mut frame_rx, catchup_msgs, schema_frame) = { let mut cs = channel.write().await; cs.stats.connected_receivers += 1; cs.stats.total_connections += 1; @@ -603,7 +624,8 @@ async fn handle_receiver( } let rx = cs.frame_tx.subscribe(); let catchup = cs.cache.catchup_messages(); - (rx, catchup) + let schema = cs.schema.clone(); + (rx, catchup, schema) }; let input_tx = { @@ -611,8 +633,13 @@ async fn handle_receiver( cs.input_tx.clone() }; - // Send cached state to late-joining receiver + // Send cached schema (0x32) first so receiver knows what signals are available let channel_name_owned = channel_name.to_string(); + if let Some(schema_bytes) = schema_frame { + let _ = ws_sink.send(Message::Binary(schema_bytes.into())).await; + } + + // Send cached state to late-joining receiver if !catchup_msgs.is_empty() { eprintln!( "[relay:{channel_name_owned}] Sending {} catchup messages to {addr}", @@ -629,11 +656,57 @@ async fn handle_receiver( } } - // Forward frames from broadcast → this receiver + // Per-receiver signal filter (set via 0x33 SubscribeFilter) + let filter: Arc>>> = + Arc::new(tokio::sync::RwLock::new(None)); + let filter_for_send = filter.clone(); + + // Forward frames from broadcast → this receiver (with optional filtering) let send_task = tokio::spawn(async move { loop { match frame_rx.recv().await { Ok(frame_bytes) => { + // Check if we need to filter signal frames + let filter_guard = filter_for_send.read().await; + let should_filter = filter_guard.is_some() + && frame_bytes.len() >= 16 + && (frame_bytes[0] == 0x30 || frame_bytes[0] == 0x31); + + if should_filter { + if let Some(ref wanted) = *filter_guard { + // Parse JSON payload, keep only wanted keys + let payload_len = u32::from_le_bytes([ + frame_bytes[12], frame_bytes[13], + frame_bytes[14], frame_bytes[15], + ]) as usize; + if frame_bytes.len() >= 16 + payload_len { + let payload = &frame_bytes[16..16 + payload_len]; + if let Ok(mut obj) = serde_json::from_slice::(payload) { + if let Some(map) = obj.as_object_mut() { + let keys: Vec = map.keys().cloned().collect(); + for k in keys { + // Keep _pid, _v (internal), and wanted fields + if k != "_pid" && k != "_v" && !wanted.contains(&k) { + map.remove(&k); + } + } + // Re-encode + let new_payload = serde_json::to_vec(&obj).unwrap_or_default(); + let mut new_frame = Vec::with_capacity(16 + new_payload.len()); + new_frame.extend_from_slice(&frame_bytes[..12]); + new_frame.extend_from_slice(&(new_payload.len() as u32).to_le_bytes()); + new_frame.extend_from_slice(&new_payload); + drop(filter_guard); + if ws_sink.send(Message::Binary(new_frame.into())).await.is_err() { + break; + } + continue; + } + } + } + } + } + drop(filter_guard); let msg = Message::Binary(frame_bytes.into()); if ws_sink.send(msg).await.is_err() { break; @@ -647,10 +720,36 @@ async fn handle_receiver( } }); - // Forward input events from this receiver → source + // Handle incoming messages from receiver (input events + 0x33 filter) while let Some(Ok(msg)) = ws_source.next().await { if let Message::Binary(data) = msg { - let _ = input_tx.send(data.into()).await; + let data_vec: Vec = data.into(); + // Parse SubscribeFilter (0x33) + if data_vec.len() >= 16 && data_vec[0] == 0x33 { + let payload_len = u32::from_le_bytes([ + data_vec[12], data_vec[13], data_vec[14], data_vec[15], + ]) as usize; + if data_vec.len() >= 16 + payload_len { + let payload = &data_vec[16..16 + payload_len]; + if let Ok(obj) = serde_json::from_slice::(payload) { + if let Some(select_arr) = obj.get("select").and_then(|v| v.as_array()) { + let wanted: std::collections::HashSet = select_arr + .iter() + .filter_map(|v| v.as_str().map(|s| s.to_string())) + .collect(); + eprintln!( + "[relay:{channel_name_owned}] Receiver {addr} subscribed to: {:?}", + wanted + ); + let mut f = filter.write().await; + *f = Some(wanted); + } + } + } + continue; // Don't forward filter frames to source + } + // Forward input events to source + let _ = input_tx.send(data_vec).await; } } @@ -782,6 +881,15 @@ async fn handle_peer( while let Some(Ok(msg)) = ws_source.next().await { if let Message::Binary(data) = msg { let data_vec: Vec = data.into(); + // Intercept schema announcement (0x32) — cache, don't broadcast + if data_vec.len() >= 16 && data_vec[0] == 0x32 { + let payload_len = u32::from_le_bytes([data_vec[12], data_vec[13], data_vec[14], data_vec[15]]) as usize; + if data_vec.len() >= 16 + payload_len { + let mut cs = channel_for_cache.write().await; + cs.schema = Some(data_vec.clone()); + } + continue; // Don't broadcast schema frames + } // Cache for late joiners { let mut cs = channel_for_cache.write().await; diff --git a/examples/compose-master.ds b/examples/compose-master.ds index fd69d1d..15ec659 100644 --- a/examples/compose-master.ds +++ b/examples/compose-master.ds @@ -16,8 +16,12 @@ -- Run with: -- dreamstack build examples/compose-master.ds -let metrics = stream from "ws://localhost:9100/stream/metrics" -let mood = stream from "ws://localhost:9100/stream/mood" +let metrics = stream from "ws://localhost:9100/stream/metrics" { + select: uptime, events, status +} +let mood = stream from "ws://localhost:9100/stream/mood" { + select: mood, energy, color +} view main = column [