feat: explicit signal output API for stream declarations

Add 'output: field1, field2' syntax to stream declarations to control
which signals are exposed over the relay. Only listed signals are
registered in _signalRegistry (and thus streamed). Omitting output
streams all signals (backwards-compatible).

Also strips internal sync metadata (_pid, _v) from receiver state
so composition consumers only see clean signal values.

Parser: parse comma-separated idents after 'output:' key
AST: Vec<String> output field on StreamDecl
Codegen: conditional _registerSignal, delete _pid/_v on receive

Example: stream counter on 'ws://...' { mode: signal, output: count, doubled }
This commit is contained in:
enzotar 2026-02-26 08:56:32 -08:00
parent b0e7de3b2e
commit 627ee44275
4 changed files with 43 additions and 7 deletions

View file

@ -78,6 +78,13 @@ impl JsEmitter {
self.indent += 1;
// Phase 1: Create all signals
// Collect explicit output list from stream declaration (if any)
let stream_outputs: Vec<String> = program.declarations.iter()
.filter_map(|d| if let Declaration::Stream(s) = d { Some(s) } else { None })
.flat_map(|s| s.output.iter().cloned())
.collect();
let has_explicit_output = !stream_outputs.is_empty();
self.emit_line("// ── Signals ──");
for node in &graph.nodes {
match &node.kind {
@ -110,8 +117,10 @@ impl JsEmitter {
}
};
self.emit_line(&format!("const {} = DS.signal({});", node.name, init));
// Register signal for bidirectional streaming sync
self.emit_line(&format!("DS._registerSignal(\"{}\", {});", node.name, node.name));
// Register for streaming only if in output list (or no explicit output = all)
if !has_explicit_output || stream_outputs.contains(&node.name) {
self.emit_line(&format!("DS._registerSignal(\"{}\", {});", node.name, node.name));
}
}
SignalKind::Derived => {
// Find the let declaration to get the expression
@ -122,8 +131,10 @@ impl JsEmitter {
"const {} = DS.derived(() => {});",
node.name, js_expr
));
// Register derived signal so it's included in stream sync
self.emit_line(&format!("DS._registerSignal(\"{}\", {});", node.name, node.name));
// Register for streaming only if in output list (or no explicit output = all)
if !has_explicit_output || stream_outputs.contains(&node.name) {
self.emit_line(&format!("DS._registerSignal(\"{}\", {});", node.name, node.name));
}
}
}
SignalKind::Handler { .. } => {} // Handled later
@ -2037,6 +2048,9 @@ const DS = (() => {
case 0x31: // SignalDiff — partial state update
try {
var newState = JSON.parse(new TextDecoder().decode(pl));
// Strip internal sync metadata
delete newState._pid;
delete newState._v;
if (type === 0x30) {
state.value = newState; // full replace — new object
} else {

View file

@ -113,6 +113,8 @@ pub struct StreamDecl {
pub relay_url: Expr,
pub mode: StreamMode,
pub transport: StreamTransport,
/// Explicit signal output list. Empty = stream all signals (backwards-compat).
pub output: Vec<String>,
pub span: Span,
}

View file

@ -365,7 +365,7 @@ impl Parser {
}))
}
/// Parse: `stream <view_name> on <url_expr> { mode: pixel | delta | signal }`
/// Parse: `stream <view_name> on <url_expr> { mode: signal, output: a, b }`
fn parse_stream_decl(&mut self) -> Result<Declaration, ParseError> {
let line = self.current_token().line;
self.advance(); // consume `stream`
@ -380,9 +380,10 @@ impl Parser {
let relay_url = self.parse_expr()?;
// Optional mode block: `{ mode: signal, transport: webrtc }`
// Optional config block: `{ mode: signal, transport: webrtc, output: a, b }`
let mut mode = StreamMode::Signal;
let mut transport = StreamTransport::WebSocket;
let mut output: Vec<String> = Vec::new();
if self.check(&TokenKind::LBrace) {
self.advance(); // {
@ -410,6 +411,24 @@ impl Parser {
}
_ => return Err(self.error("Expected webrtc or websocket".into())),
}
} else if key == "output" {
// Parse comma-separated identifiers: output: count, doubled
loop {
let name = self.expect_ident()?;
output.push(name);
if self.check(&TokenKind::Comma) {
self.advance();
// Peek ahead: if next is a known key or RBrace, stop
// (the comma was a field separator, not an output separator)
match self.peek() {
TokenKind::RBrace => break,
TokenKind::Ident(s) if s == "mode" || s == "transport" || s == "output" => break,
_ => {} // continue parsing output names
}
} else {
break;
}
}
}
if self.check(&TokenKind::Comma) { self.advance(); }
self.skip_newlines();
@ -422,6 +441,7 @@ impl Parser {
relay_url,
mode,
transport,
output,
span: Span { start: 0, end: 0, line },
}))
}

View file

@ -10,7 +10,7 @@ let count = 0
let doubled = count * 2
let message = "Streaming Counter"
stream counter on "ws://localhost:9100" { mode: signal }
stream counter on "ws://localhost:9100" { mode: signal, output: count, doubled }
view counter =
column [