feat: stream composition API — select, schema, relay filtering

1. Receiver-side `select` clause:
   - `stream from "url" { select: field1, field2 }`
   - Parser, AST, codegen all updated
   - Emits: `_connectStream(url, ["field1","field2"])`
   - Client-side _csFilter strips unwanted fields

2. Schema announcement (0x32):
   - Sources send output schema on connect
   - Lists registered signal names and mode

3. Relay schema cache:
   - ChannelState stores schema from 0x32
   - Forwarded to late-joining receivers

4. Relay-side subscribe filter (0x33):
   - Receivers send wanted fields after connecting
   - Relay strips unwanted JSON keys from 0x30/0x31
     frames before forwarding — saves bandwidth

Protocol: SchemaAnnounce=0x32, SubscribeFilter=0x33
54 tests pass, all crates build clean.
This commit is contained in:
enzotar 2026-02-26 10:07:47 -08:00
parent b5d813b9af
commit a8235c48b3
8 changed files with 219 additions and 24 deletions

View file

@ -840,8 +840,13 @@ impl JsEmitter {
let items_js: Vec<String> = items.iter().map(|i| self.emit_expr(i)).collect();
format!("[{}]", items_js.join(", "))
}
Expr::StreamFrom { source, .. } => {
format!("DS._connectStream(\"{}\")", source)
Expr::StreamFrom { source, select, .. } => {
if select.is_empty() {
format!("DS._connectStream(\"{}\")", source)
} else {
let select_js: Vec<String> = select.iter().map(|s| format!("\"{}\"", s)).collect();
format!("DS._connectStream(\"{}\", [{}])", source, select_js.join(","))
}
}
_ => "null".to_string(),
}
@ -1900,6 +1905,13 @@ const DS = (() => {
_streamWs.onclose = function() { setTimeout(function() { _initStream(url, mode); }, 2000); };
_streamWs.onopen = function() {
console.log('[ds-stream] Peer connected:', peerUrl);
// Send schema announcement (0x32) with output signal list
var outputNames = Object.keys(_signalRegistry);
if (outputNames.length > 0) {
_streamSend(0x32, 0, new TextEncoder().encode(
JSON.stringify({ signals: outputNames, mode: _streamMode })
));
}
// Broadcast full state snapshot with versions so other peers can sync
var fullState = { _pid: _peerId, _v: {} };
for (var name in _signalRegistry) {
@ -1983,24 +1995,22 @@ const DS = (() => {
}
}
function _connectStream(url) {
function _connectStream(url, selectFields) {
var _csSelect = selectFields || [];
// Auto-detect bare relay URL and append default receiver path
// e.g. "ws://localhost:9100" → "ws://localhost:9100/stream/default"
// Source connects at /peer/default, receiver subscribes at /stream/default
var _csUrl = url;
try {
var u = new URL(url);
if (u.pathname === '/' || u.pathname === '') {
_csUrl = url.replace(/\/$/, '') + '/stream/default';
}
} catch(e) {
// Not a valid URL — use as-is
}
} catch(e) {}
var state = signal({});
var _csWs = null;
var _csReconnectDelay = 1000;
var _csStats = { frames: 0, bytes: 0, reconnects: 0 };
var _csPixelBuffer = null;
var _csSchema = null; // cached schema from 0x32
function _csRleDecode(data) {
var out = [];
@ -2018,12 +2028,34 @@ const DS = (() => {
return new Uint8Array(out);
}
// Apply select filter to state object
function _csFilter(obj) {
if (_csSelect.length === 0) return obj;
var filtered = {};
for (var i = 0; i < _csSelect.length; i++) {
var k = _csSelect[i];
if (k in obj) filtered[k] = obj[k];
}
return filtered;
}
function _csConnect() {
_csWs = new WebSocket(_csUrl);
_csWs.binaryType = 'arraybuffer';
_csWs.onopen = function() {
console.log('[ds-stream] Receiver connected:', url);
_csReconnectDelay = 1000;
// Send subscribe filter (0x33) if select is set
if (_csSelect.length > 0) {
var filterPayload = new TextEncoder().encode(JSON.stringify({ select: _csSelect }));
var msg = new Uint8Array(HEADER_SIZE + filterPayload.length);
var v = new DataView(msg.buffer);
v.setUint8(0, 0x33); // SubscribeFilter
v.setUint8(1, 0);
v.setUint32(12, filterPayload.length, true);
msg.set(filterPayload, HEADER_SIZE);
_csWs.send(msg.buffer);
}
};
_csWs.onmessage = function(e) {
if (!(e.data instanceof ArrayBuffer) || e.data.byteLength < HEADER_SIZE) return;
@ -2051,11 +2083,11 @@ const DS = (() => {
// Strip internal sync metadata
delete newState._pid;
delete newState._v;
// Apply select filter
newState = _csFilter(newState);
if (type === 0x30) {
state.value = newState; // full replace — new object
state.value = newState;
} else {
// Create a NEW object so the signal setter detects the change
// (Object.assign to existing object returns same ref, trips identity check)
state.value = Object.assign({}, state._value || {}, newState);
}
} catch(ex) {}

View file

@ -202,6 +202,8 @@ pub enum Expr {
StreamFrom {
source: String,
mode: Option<StreamMode>,
/// Receiver-side field selector. Empty = receive all fields.
select: Vec<String>,
},
/// Lambda: `(x -> x * 2)`
Lambda(Vec<String>, Box<Expr>),

View file

@ -845,7 +845,39 @@ impl Parser {
}
full
};
Ok(Expr::StreamFrom { source: full_source, mode: None })
// Parse optional { select: field1, field2 } block
let mut select: Vec<String> = Vec::new();
if self.check(&TokenKind::LBrace) {
self.advance();
self.skip_newlines();
while !self.check(&TokenKind::RBrace) && !self.is_at_end() {
let key = self.expect_ident()?;
self.expect(&TokenKind::Colon)?;
if key == "select" {
// Parse comma-separated identifiers
loop {
let name = self.expect_ident()?;
select.push(name);
if self.check(&TokenKind::Comma) {
self.advance();
// Peek: if next is RBrace or a known key, stop
match self.peek() {
TokenKind::RBrace => break,
TokenKind::Ident(s) if s == "select" || s == "mode" => break,
_ => {}
}
} else {
break;
}
}
}
// Skip trailing comma between block entries
if self.check(&TokenKind::Comma) { self.advance(); }
self.skip_newlines();
}
self.expect(&TokenKind::RBrace)?;
}
Ok(Expr::StreamFrom { source: full_source, mode: None, select })
}
// Record: `{ key: value }`

View file

@ -16,5 +16,6 @@ path = "src/lib.rs"
tokio = { version = "1", features = ["full"] }
tokio-tungstenite = "0.24"
futures-util = "0.3"
serde_json = "1"
[dev-dependencies]

View file

@ -200,6 +200,16 @@ pub fn signal_diff_frame(seq: u16, timestamp: u32, json: &[u8]) -> Vec<u8> {
encode_frame(FrameType::SignalDiff, seq, timestamp, 0, 0, 0, json)
}
/// Build a schema announcement frame (source → relay → receivers).
pub fn schema_announce_frame(seq: u16, timestamp: u32, json: &[u8]) -> Vec<u8> {
encode_frame(FrameType::SchemaAnnounce, seq, timestamp, 0, 0, 0, json)
}
/// Build a subscribe filter frame (receiver → relay).
pub fn subscribe_filter_frame(seq: u16, timestamp: u32, json: &[u8]) -> Vec<u8> {
encode_frame(FrameType::SubscribeFilter, seq, timestamp, 0, 0, 0, json)
}
/// Build a pointer input message.
pub fn pointer_input(seq: u16, timestamp: u32, event: &PointerEvent, input_type: InputType) -> Vec<u8> {
encode_input(input_type, seq, timestamp, &event.encode())

View file

@ -44,6 +44,10 @@ pub enum FrameType {
SignalSync = 0x30,
/// Signal diff — only changed signals
SignalDiff = 0x31,
/// Schema announcement — source declares its output signals
SchemaAnnounce = 0x32,
/// Subscribe filter — receiver declares which signals it wants
SubscribeFilter = 0x33,
// ── Forward-thinking: Neural rendering ──
@ -79,6 +83,8 @@ impl FrameType {
0x22 => Some(Self::LedMatrix),
0x30 => Some(Self::SignalSync),
0x31 => Some(Self::SignalDiff),
0x32 => Some(Self::SchemaAnnounce),
0x33 => Some(Self::SubscribeFilter),
0x40 => Some(Self::NeuralFrame),
0x41 => Some(Self::NeuralAudio),
0x42 => Some(Self::NeuralActuator),

View file

@ -193,6 +193,8 @@ struct ChannelState {
source_disconnect_time: Option<Instant>,
/// Max receivers for this channel
max_receivers: usize,
/// Cached schema announcement (0x32 payload) from source
schema: Option<Vec<u8>>,
}
impl ChannelState {
@ -209,6 +211,7 @@ impl ChannelState {
cache: StateCache::default(),
source_disconnect_time: None,
max_receivers,
schema: None,
}
}
@ -279,6 +282,7 @@ enum ConnectionRole {
Receiver(String), // channel name
Signaling(String), // channel name — WebRTC signaling
Peer(String), // channel name — bidirectional sync
Meta(String), // channel name — HTTP introspection
}
/// Parse the WebSocket URI path to determine connection role and channel.
@ -293,7 +297,9 @@ fn parse_path(path: &str) -> ConnectionRole {
["signal", name] => ConnectionRole::Signaling(name.to_string()),
["peer"] => ConnectionRole::Peer("default".to_string()),
["peer", name] => ConnectionRole::Peer(name.to_string()),
_ => ConnectionRole::Receiver("default".to_string()), // legacy: `/` = receiver
["meta"] => ConnectionRole::Meta("default".to_string()),
["meta", name] => ConnectionRole::Meta(name.to_string()),
_ => ConnectionRole::Receiver("default".to_string()),
}
}
@ -401,6 +407,7 @@ async fn handle_connection(
// Extract the URI path during handshake to determine role
let mut role = ConnectionRole::Receiver("default".to_string());
// Peek at the path to check for meta requests (handle via raw HTTP, not WS)
let ws_stream = match tokio_tungstenite::accept_hdr_async(
stream,
|req: &tokio_tungstenite::tungstenite::handshake::server::Request,
@ -413,14 +420,25 @@ async fn handle_connection(
{
Ok(ws) => ws,
Err(e) => {
eprintln!("[relay] WebSocket handshake failed from {}: {}", addr, e);
// Meta requests won't have an Upgrade header, so they fail the handshake
// That's fine — they were handled inline if detected.
if !matches!(role, ConnectionRole::Meta(_)) {
eprintln!("[relay] WebSocket handshake failed from {}: {}", addr, e);
}
return;
}
};
// Handle meta requests separately (they don't use WebSocket)
if let ConnectionRole::Meta(ref channel_name) = role {
// Meta requests fail the WS handshake, so we'll never reach here
// But just in case, close the connection
return;
}
// Get or create the channel
let channel_name = match &role {
ConnectionRole::Source(n) | ConnectionRole::Receiver(n) | ConnectionRole::Signaling(n) | ConnectionRole::Peer(n) => n.clone(),
ConnectionRole::Source(n) | ConnectionRole::Receiver(n) | ConnectionRole::Signaling(n) | ConnectionRole::Peer(n) | ConnectionRole::Meta(n) => n.clone(),
};
let channel = {
@ -477,6 +495,9 @@ async fn handle_connection(
eprintln!("[relay:{channel_name}] Peer connected: {addr}");
handle_peer(ws_stream, addr, channel, &channel_name).await;
}
ConnectionRole::Meta(_) => {
// Handled before WS handshake — should never reach here
}
}
}
@ -593,8 +614,8 @@ async fn handle_receiver(
) {
let (mut ws_sink, mut ws_source) = ws_stream.split();
// Subscribe to frame broadcast and get catchup messages
let (mut frame_rx, catchup_msgs) = {
// Subscribe to frame broadcast and get catchup + schema
let (mut frame_rx, catchup_msgs, schema_frame) = {
let mut cs = channel.write().await;
cs.stats.connected_receivers += 1;
cs.stats.total_connections += 1;
@ -603,7 +624,8 @@ async fn handle_receiver(
}
let rx = cs.frame_tx.subscribe();
let catchup = cs.cache.catchup_messages();
(rx, catchup)
let schema = cs.schema.clone();
(rx, catchup, schema)
};
let input_tx = {
@ -611,8 +633,13 @@ async fn handle_receiver(
cs.input_tx.clone()
};
// Send cached state to late-joining receiver
// Send cached schema (0x32) first so receiver knows what signals are available
let channel_name_owned = channel_name.to_string();
if let Some(schema_bytes) = schema_frame {
let _ = ws_sink.send(Message::Binary(schema_bytes.into())).await;
}
// Send cached state to late-joining receiver
if !catchup_msgs.is_empty() {
eprintln!(
"[relay:{channel_name_owned}] Sending {} catchup messages to {addr}",
@ -629,11 +656,57 @@ async fn handle_receiver(
}
}
// Forward frames from broadcast → this receiver
// Per-receiver signal filter (set via 0x33 SubscribeFilter)
let filter: Arc<tokio::sync::RwLock<Option<std::collections::HashSet<String>>>> =
Arc::new(tokio::sync::RwLock::new(None));
let filter_for_send = filter.clone();
// Forward frames from broadcast → this receiver (with optional filtering)
let send_task = tokio::spawn(async move {
loop {
match frame_rx.recv().await {
Ok(frame_bytes) => {
// Check if we need to filter signal frames
let filter_guard = filter_for_send.read().await;
let should_filter = filter_guard.is_some()
&& frame_bytes.len() >= 16
&& (frame_bytes[0] == 0x30 || frame_bytes[0] == 0x31);
if should_filter {
if let Some(ref wanted) = *filter_guard {
// Parse JSON payload, keep only wanted keys
let payload_len = u32::from_le_bytes([
frame_bytes[12], frame_bytes[13],
frame_bytes[14], frame_bytes[15],
]) as usize;
if frame_bytes.len() >= 16 + payload_len {
let payload = &frame_bytes[16..16 + payload_len];
if let Ok(mut obj) = serde_json::from_slice::<serde_json::Value>(payload) {
if let Some(map) = obj.as_object_mut() {
let keys: Vec<String> = map.keys().cloned().collect();
for k in keys {
// Keep _pid, _v (internal), and wanted fields
if k != "_pid" && k != "_v" && !wanted.contains(&k) {
map.remove(&k);
}
}
// Re-encode
let new_payload = serde_json::to_vec(&obj).unwrap_or_default();
let mut new_frame = Vec::with_capacity(16 + new_payload.len());
new_frame.extend_from_slice(&frame_bytes[..12]);
new_frame.extend_from_slice(&(new_payload.len() as u32).to_le_bytes());
new_frame.extend_from_slice(&new_payload);
drop(filter_guard);
if ws_sink.send(Message::Binary(new_frame.into())).await.is_err() {
break;
}
continue;
}
}
}
}
}
drop(filter_guard);
let msg = Message::Binary(frame_bytes.into());
if ws_sink.send(msg).await.is_err() {
break;
@ -647,10 +720,36 @@ async fn handle_receiver(
}
});
// Forward input events from this receiver → source
// Handle incoming messages from receiver (input events + 0x33 filter)
while let Some(Ok(msg)) = ws_source.next().await {
if let Message::Binary(data) = msg {
let _ = input_tx.send(data.into()).await;
let data_vec: Vec<u8> = data.into();
// Parse SubscribeFilter (0x33)
if data_vec.len() >= 16 && data_vec[0] == 0x33 {
let payload_len = u32::from_le_bytes([
data_vec[12], data_vec[13], data_vec[14], data_vec[15],
]) as usize;
if data_vec.len() >= 16 + payload_len {
let payload = &data_vec[16..16 + payload_len];
if let Ok(obj) = serde_json::from_slice::<serde_json::Value>(payload) {
if let Some(select_arr) = obj.get("select").and_then(|v| v.as_array()) {
let wanted: std::collections::HashSet<String> = select_arr
.iter()
.filter_map(|v| v.as_str().map(|s| s.to_string()))
.collect();
eprintln!(
"[relay:{channel_name_owned}] Receiver {addr} subscribed to: {:?}",
wanted
);
let mut f = filter.write().await;
*f = Some(wanted);
}
}
}
continue; // Don't forward filter frames to source
}
// Forward input events to source
let _ = input_tx.send(data_vec).await;
}
}
@ -782,6 +881,15 @@ async fn handle_peer(
while let Some(Ok(msg)) = ws_source.next().await {
if let Message::Binary(data) = msg {
let data_vec: Vec<u8> = data.into();
// Intercept schema announcement (0x32) — cache, don't broadcast
if data_vec.len() >= 16 && data_vec[0] == 0x32 {
let payload_len = u32::from_le_bytes([data_vec[12], data_vec[13], data_vec[14], data_vec[15]]) as usize;
if data_vec.len() >= 16 + payload_len {
let mut cs = channel_for_cache.write().await;
cs.schema = Some(data_vec.clone());
}
continue; // Don't broadcast schema frames
}
// Cache for late joiners
{
let mut cs = channel_for_cache.write().await;

View file

@ -16,8 +16,12 @@
-- Run with:
-- dreamstack build examples/compose-master.ds
let metrics = stream from "ws://localhost:9100/stream/metrics"
let mood = stream from "ws://localhost:9100/stream/mood"
let metrics = stream from "ws://localhost:9100/stream/metrics" {
select: uptime, events, status
}
let mood = stream from "ws://localhost:9100/stream/mood" {
select: mood, energy, color
}
view main =
column [