dreamstack/BITSTREAM_INTEGRATION.md
enzotar ebf11889a3 docs: comprehensive documentation update
- DREAMSTACK.md: rewritten with accurate counts (48 examples, 14
  components, 136 tests), full CLI reference, architecture diagrams,
  quick start guide, comparison table, and phased roadmap
- IMPLEMENTATION_PLAN.md: rewritten with all 10 phases showing
  accurate completion status, current capabilities, and next steps
- BITSTREAM_INTEGRATION.md: updated test count (82 → 136)
- USE_CASES.md and STREAM_COMPOSITION.md: already current, unchanged
2026-02-27 11:15:54 -08:00

34 KiB

Bitstream Integration Spec

Purpose: This document specifies all changes needed to make DreamStack natively bitstream-aware. A .ds file should compile into a fully streamable app with one keyword. Follow each section in order — they build on each other.

Implementation Status

All changes in this spec have been implemented. Status per change:

Change Description Status Notes
1. AST StreamDecl, StreamMode, refactored StreamFrom Done ast.rs
2. Lexer Pixel, Delta, Signals keywords Done lexer.rs
3. Parser parse_stream_decl(), mode parsing Done parser.rs
4. Signal Graph streamable flag on SignalNode Done signal_graph.rs
5. Type Checker StreamFromType::Stream Done checker.rs
6. Codegen emit_stream_init, StreamFrom, runtime JS Done js_emitter.rs
7. JS Runtime _initStream, _streamDiff, _streamSync, _connectStream, _streamSceneState Done Embedded in RUNTIME_JS
8. CLI dreamstack stream command Done main.rs
9. Layout to_bytes/from_bytes on LayoutRect Done solver.rs

Test counts: 136 tests passing across full workspace (ds-stream: 38, ds-parser: 15, ds-analyzer: 6, ds-types: 11, ds-codegen: 6, ds-layout: 5, plus integration tests). 48 .ds examples compile successfully.

Background

The engine/ds-stream/ crate already implements:

  • A 16-byte binary protocol (see src/protocol.rs)
  • WebSocket relay server (see src/relay.rs)
  • Encode/decode with delta compression (see src/codec.rs)
  • Two working HTML demos (examples/stream-source.html, examples/stream-receiver.html)

The compiler pipeline is: .ds → Lexer → Parser → AST → Analyzer (signal graph) → Type Checker → Codegen (JS+HTML).

Current state (all wired up):

  • Lexer: stream, pixel, delta, signals keywords → dedicated TokenKind variants
  • AST: StreamDecl, StreamMode enum, Expr::StreamFrom { source, mode }
  • Parser: parse_stream_decl() with full mode block parsing
  • Signal Graph: streamable flag auto-set when Declaration::Stream is present
  • Type checker: returns Type::Stream for StreamFrom
  • Codegen: emits DS._initStream() for Declaration::Stream, DS._connectStream() for StreamFrom, DS._streamDiff() on signal mutations
  • CLI: dreamstack stream command with relay/mode/port flags
  • Runtime JS: full streaming layer (_initStream, _streamDiff, _streamSync, _streamSceneState, _connectStream, _handleRemoteInput)

Change 1: AST — New Types

File: compiler/ds-parser/src/ast.rs

Add StreamDecl to Declaration enum

pub enum Declaration {
    Let(LetDecl),
    View(ViewDecl),
    Effect(EffectDecl),
    OnHandler(OnHandler),
    Component(ComponentDecl),
    Route(RouteDecl),
    Constrain(ConstrainDecl),
    Stream(StreamDecl),       // ← NEW
}

Add StreamDecl struct

/// `stream main on "ws://localhost:9100" { mode: signal }`
#[derive(Debug, Clone)]
pub struct StreamDecl {
    pub view_name: String,         // which view to stream
    pub relay_url: Expr,           // the WebSocket URL (an expression, usually a string lit)
    pub mode: StreamMode,          // streaming mode
    pub span: Span,
}

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum StreamMode {
    Pixel,      // raw RGBA framebuffer every frame
    Delta,      // XOR + RLE — only changed pixels
    Signal,     // JSON signal diffs (DreamStack-native, ~2 KB/s)
}

impl Default for StreamMode {
    fn default() -> Self { StreamMode::Signal }
}

Modify StreamFrom

// Replace:
StreamFrom(String),
// With:
StreamFrom {
    source: String,
    mode: Option<StreamMode>,
},

Update all match arms in parser.rs, signal_graph.rs, checker.rs, and js_emitter.rs that reference StreamFrom.


Change 2: Lexer — New Keywords

File: compiler/ds-parser/src/lexer.rs

In the lex_ident_or_keyword function, add to the keyword match (around line 306):

"stream" => TokenKind::Stream,     // already exists
"pixel" => TokenKind::Pixel,       // NEW
"delta" => TokenKind::Delta,       // NEW
"signals" => TokenKind::Signals,   // NEW (note: plural to avoid clash with signal())

Add these to the TokenKind enum:

pub enum TokenKind {
    // ... existing tokens ...
    Stream,     // already exists
    Pixel,      // NEW
    Delta,      // NEW
    Signals,    // NEW
}

Change 3: Parser — Parse Stream Declarations

File: compiler/ds-parser/src/parser.rs

In parse_declaration (around line 91)

Add a new arm for the stream keyword:

TokenKind::Stream => self.parse_stream_decl(),

New method: parse_stream_decl

/// Parse: `stream <view_name> on <url_expr>`
/// Optional: `{ mode: pixel | delta | signal }`
fn parse_stream_decl(&mut self) -> Result<Declaration, ParseError> {
    let span = self.current_token().span;
    self.advance(); // consume `stream`

    // Check if this is `stream from` (existing syntax) or `stream <name> on`
    if self.check(&TokenKind::Ident("from".into())) {
        // Existing stream-from expression — delegate to expression parser
        // Actually, this is a declaration context, so handle it differently:
        return self.parse_stream_from_as_decl();
    }

    let view_name = self.expect_ident()?;

    // Expect `on`
    match self.peek() {
        TokenKind::Ident(s) if s == "on" => { self.advance(); }
        _ => return Err(self.error("Expected 'on' after stream view name".into())),
    }

    let relay_url = self.parse_expr()?;

    // Optional mode block: `{ mode: signal }`
    let mode = if self.check(&TokenKind::LBrace) {
        self.advance(); // {
        let mut mode = StreamMode::Signal;
        // Parse key-value pairs
        while !self.check(&TokenKind::RBrace) && !self.is_at_end() {
            self.skip_newlines();
            let key = self.expect_ident()?;
            self.expect(&TokenKind::Colon)?;
            if key == "mode" {
                match self.peek() {
                    TokenKind::Pixel => { mode = StreamMode::Pixel; self.advance(); }
                    TokenKind::Delta => { mode = StreamMode::Delta; self.advance(); }
                    TokenKind::Signals => { mode = StreamMode::Signal; self.advance(); }
                    TokenKind::Ident(s) if s == "signal" => { mode = StreamMode::Signal; self.advance(); }
                    _ => return Err(self.error("Expected pixel, delta, or signal".into())),
                }
            }
            // Skip comma if present
            if self.check(&TokenKind::Comma) { self.advance(); }
            self.skip_newlines();
        }
        self.expect(&TokenKind::RBrace)?;
        mode
    } else {
        StreamMode::Signal // default
    };

    Ok(Declaration::Stream(StreamDecl { view_name, relay_url, mode, span }))
}

Update parse_primary for StreamFrom

Around line 610-616, update the existing stream from parsing:

TokenKind::Stream => {
    self.advance(); // consume `stream`
    // Expect `from`
    match self.peek() {
        TokenKind::Ident(s) if s == "from" => { self.advance(); }
        _ => return Err(self.error("Expected 'from' after stream".into())),
    }
    let source = self.parse_expr()?;
    let source_str = match &source {
        Expr::StringLit(s) => s.segments.iter().map(|seg| match seg {
            StringSegment::Literal(l) => l.clone(),
            _ => String::new(),
        }).collect(),
        Expr::Ident(name) => name.clone(),
        _ => return Err(self.error("Stream source must be a string or identifier".into())),
    };
    Ok(Expr::StreamFrom { source: source_str, mode: None })
}

Change 4: Signal Graph — Stream Awareness

File: compiler/ds-analyzer/src/signal_graph.rs

Add streamable flag to SignalNode

pub struct SignalNode {
    pub name: String,
    pub kind: SignalKind,
    pub initial: Option<InitialValue>,
    pub deps: Vec<Dependency>,
    pub streamable: bool,           // NEW: should this be sent to receivers?
}

Default streamable to true for all Source signals when a StreamDecl is present.

New: SignalManifest

/// Static description of all signals for receiver reconstruction.
#[derive(Debug, Clone)]
pub struct SignalManifest {
    pub signals: Vec<ManifestEntry>,
}

#[derive(Debug, Clone)]
pub struct ManifestEntry {
    pub name: String,
    pub kind: SignalKind,
    pub initial: Option<InitialValue>,
    pub is_spring: bool,
}

impl SignalGraph {
    /// Generate a manifest for receivers to know how to reconstruct the signal state.
    pub fn signal_manifest(&self) -> SignalManifest {
        SignalManifest {
            signals: self.nodes.iter()
                .filter(|n| n.streamable)
                .map(|n| ManifestEntry {
                    name: n.name.clone(),
                    kind: n.kind.clone(),
                    initial: n.initial.clone(),
                    is_spring: false, // detect from AST if the let decl uses spring()
                })
                .collect()
        }
    }
}

Detect StreamDecl in from_program

In from_program (line 97), after processing all declarations, check if any Declaration::Stream exists. If so, mark all source signals as streamable = true.


Change 5: Type Checker

File: compiler/ds-types/src/checker.rs

At line 355, replace:

Expr::StreamFrom(_source) => {
    // TODO
    Ok(Type::Unknown)
}

With:

Expr::StreamFrom { source, .. } => {
    // A stream from a remote source produces a reactive signal state
    Ok(Type::Named("StreamState".into()))
}

Also add handling for Declaration::Stream in the declaration checker — it should verify that the view name exists and that the URL is a string expression.


Change 6: Codegen — the big one

File: compiler/ds-codegen/src/js_emitter.rs

6a. Handle Declaration::Stream in emit_program

In emit_program (line 54), where declarations are iterated, add:

Declaration::Stream(stream_decl) => {
    self.emit_stream_init(stream_decl, graph);
}

6b. New method: emit_stream_init

fn emit_stream_init(&mut self, decl: &StreamDecl, graph: &SignalGraph) {
    let url = self.emit_expr(&decl.relay_url);
    let mode = match decl.mode {
        StreamMode::Pixel => "pixel",
        StreamMode::Delta => "delta",
        StreamMode::Signal => "signal",
    };
    self.emit_line(&format!("DS._initStream({}, '{}');", url, mode));
}

6c. Wrap signal mutations with stream diffs

In the code that emits signal assignments (search for emit_event_handler_expr, around line 655), after emitting signal.value = expr, also emit:

DS._streamDiff("signalName", signal.value);

Find every place where the codegen writes .value = for a signal assignment. After each one, add a DS._streamDiff(name, newValue) call if the signal is streamable.

6d. Handle Expr::StreamFrom in emit_expr

In emit_expr (line 565), StreamFrom is currently not matched. Add:

Expr::StreamFrom { source, .. } => {
    format!("DS._connectStream({})", source)
}

6e. Scene streaming hook

In emit_scene (starting line 733), inside the _sceneLoop function that's emitted (around line 925), add after _sceneDraw():

if (DS._streamWs) DS._streamSceneState(_world, _sceneW, _sceneH);

Change 7: JS Runtime — Streaming Layer

File: compiler/ds-codegen/src/js_emitter.rs, inside the RUNTIME_JS const (starting at line 1125)

Add the following before the closing return { ... } statement (line 1553). Also add _initStream, _streamDiff, _connectStream, _streamSceneState to the returned object.

// ── Bitstream Streaming ──
const HEADER_SIZE = 16;
let _streamWs = null;
let _streamMode = 'signal';
let _streamSeq = 0;
let _streamStart = 0;
let _prevFrame = null;

function _initStream(url, mode) {
  _streamMode = mode || 'signal';
  _streamStart = performance.now();
  _streamWs = new WebSocket(url);
  _streamWs.binaryType = 'arraybuffer';
  _streamWs.onmessage = function(e) {
    if (!(e.data instanceof ArrayBuffer) || e.data.byteLength < HEADER_SIZE) return;
    const bytes = new Uint8Array(e.data);
    const view = new DataView(bytes.buffer);
    const type = view.getUint8(0);
    const flags = view.getUint8(1);
    if (flags & 0x01) _handleRemoteInput(type, bytes.subarray(HEADER_SIZE));
  };
  _streamWs.onclose = function() { setTimeout(function() { _initStream(url, mode); }, 2000); };
  console.log('[ds-stream] Source connected:', url, 'mode:', mode);
}

function _streamSend(type, flags, payload) {
  if (!_streamWs || _streamWs.readyState !== 1) return;
  const ts = (performance.now() - _streamStart) | 0;
  const msg = new Uint8Array(HEADER_SIZE + payload.length);
  const v = new DataView(msg.buffer);
  v.setUint8(0, type);
  v.setUint8(1, flags);
  v.setUint16(2, (_streamSeq++) & 0xFFFF, true);
  v.setUint32(4, ts, true);
  v.setUint32(12, payload.length, true);
  msg.set(payload, HEADER_SIZE);
  _streamWs.send(msg.buffer);
}

// Signal diff: send changed signal values as JSON
function _streamDiff(name, value) {
  if (!_streamWs || _streamMode !== 'signal') return;
  const json = JSON.stringify({ [name]: typeof value === 'object' && 'value' in value ? value.value : value });
  _streamSend(0x31, 0, new TextEncoder().encode(json));
}

// Full signal sync: send all signal values
function _streamSync(signals) {
  const state = {};
  for (const [name, sig] of Object.entries(signals)) {
    state[name] = typeof sig === 'object' && '_value' in sig ? sig._value : sig;
  }
  _streamSend(0x30, 0x02, new TextEncoder().encode(JSON.stringify(state)));
}

// Scene streaming: send body positions as binary or capture canvas pixels
function _streamSceneState(world, w, h) {
  if (_streamMode === 'signal') {
    // Send body positions as JSON signal state
    const bodies = [];
    for (let b = 0; b < world.body_count(); b++) {
      const p = world.get_body_center(b);
      bodies.push({ x: p[0] | 0, y: p[1] | 0 });
    }
    _streamSend(0x31, 0, new TextEncoder().encode(JSON.stringify({ _bodies: bodies })));
  }
  // Pixel/delta modes are handled by canvas capture in the scene wrapper
}

// Handle input events from receivers
function _handleRemoteInput(type, payload) {
  if (payload.length < 4) return;
  const view = new DataView(payload.buffer, payload.byteOffset);
  switch (type) {
    case 0x01: // Pointer
    case 0x02: // PointerDown
      emit('remote_pointer', {
        x: view.getUint16(0, true),
        y: view.getUint16(2, true),
        buttons: payload.length > 4 ? view.getUint8(4) : 0,
        type: type === 0x02 ? 'down' : 'move'
      });
      break;
    case 0x03: // PointerUp
      emit('remote_pointer', { x: 0, y: 0, buttons: 0, type: 'up' });
      break;
    case 0x10: // KeyDown
      emit('remote_key', {
        keyCode: view.getUint16(0, true),
        type: 'down'
      });
      break;
    case 0x11: // KeyUp
      emit('remote_key', {
        keyCode: view.getUint16(0, true),
        type: 'up'
      });
      break;
    case 0x50: // Scroll
      emit('remote_scroll', {
        dx: view.getInt16(0, true),
        dy: view.getInt16(2, true)
      });
      break;
  }
}

// Connect as a receiver
function _connectStream(url) {
  const state = new Signal(null);
  const ws = new WebSocket(url);
  ws.binaryType = 'arraybuffer';
  ws.onmessage = function(e) {
    if (!(e.data instanceof ArrayBuffer) || e.data.byteLength < HEADER_SIZE) return;
    const bytes = new Uint8Array(e.data);
    const view = new DataView(bytes.buffer);
    const type = view.getUint8(0);
    const payloadLen = view.getUint32(12, true);
    const payload = bytes.subarray(HEADER_SIZE, HEADER_SIZE + payloadLen);
    if (type === 0x30 || type === 0x31) {
      try {
        const newState = JSON.parse(new TextDecoder().decode(payload));
        state.value = Object.assign(state._value || {}, newState);
      } catch(e) {}
    }
  };
  ws.onclose = function() { setTimeout(function() { _connectStream(url); }, 2000); };
  return state;
}

Then update the return statement (line 1553):

return { signal, derived, effect, batch, flush, onEvent, emit,
         keyedList, route: _route, navigate, matchRoute,
         resource, fetchJSON,
         spring, constrain, viewport: _viewport,
         scene, circle, rect, line,
         _initStream, _streamDiff, _streamSync, _streamSceneState, _connectStream,  // NEW
         Signal, Derived, Effect, Spring };

Change 8: CLI — Stream Command

File: compiler/ds-cli/src/main.rs

Add to the Commands enum (around line 21)

/// Compile and start streaming a .ds file
Stream {
    /// Input .ds file
    file: PathBuf,
    /// Relay WebSocket URL
    #[arg(short, long, default_value = "ws://localhost:9100")]
    relay: String,
    /// Streaming mode (pixel, delta, signal)
    #[arg(short, long, default_value = "signal")]
    mode: String,
    /// Port to serve source on
    #[arg(short, long, default_value_t = 3000)]
    port: u16,
}

Add to the main match (around line 50)

Commands::Stream { file, relay, mode, port } => {
    cmd_stream(&file, &relay, &mode, port);
}

New function: cmd_stream

fn cmd_stream(file: &Path, relay: &str, mode: &str, port: u16) {
    // 1. Read the source .ds file
    let source = std::fs::read_to_string(file).unwrap_or_else(|e| {
        eprintln!("Error reading {}: {}", file.display(), e);
        std::process::exit(1);
    });

    // 2. Inject a StreamDecl if not already present
    //    Prepend: stream main on "{relay}" { mode: {mode} }
    let augmented = if source.contains("stream ") {
        source
    } else {
        format!("stream main on \"{}\" {{ mode: {} }}\n{}", relay, mode, source)
    };

    // 3. Compile
    let html = match compile(&augmented) {
        Ok(html) => html,
        Err(e) => { eprintln!("Compile error: {}", e); std::process::exit(1); }
    };

    // 4. Serve like `dev` mode, but also print stream info
    println!("╔══════════════════════════════════════════════════╗");
    println!("║   DreamStack Stream                             ║");
    println!("║                                                  ║");
    println!("║   Source: http://localhost:{}              ║", port);
    println!("║   Relay:  {}                   ║", relay);
    println!("║   Mode:   {}                             ║", mode);
    println!("╚══════════════════════════════════════════════════╝");

    // Start HTTP server serving the compiled HTML (reuse dev server logic)
    // ...
}

Change 9: Layout Serialization (Deferred)

File: compiler/ds-layout/src/solver.rs

Add serialization to LayoutRect:

impl LayoutRect {
    /// Serialize to 16 bytes: x(f32) + y(f32) + w(f32) + h(f32)
    pub fn to_bytes(&self) -> [u8; 16] {
        let mut buf = [0u8; 16];
        buf[0..4].copy_from_slice(&(self.x as f32).to_le_bytes());
        buf[4..8].copy_from_slice(&(self.y as f32).to_le_bytes());
        buf[8..12].copy_from_slice(&(self.width as f32).to_le_bytes());
        buf[12..16].copy_from_slice(&(self.height as f32).to_le_bytes());
        buf
    }

    pub fn from_bytes(buf: &[u8; 16]) -> Self {
        Self {
            x: f32::from_le_bytes(buf[0..4].try_into().unwrap()) as f64,
            y: f32::from_le_bytes(buf[4..8].try_into().unwrap()) as f64,
            width: f32::from_le_bytes(buf[8..12].try_into().unwrap()) as f64,
            height: f32::from_le_bytes(buf[12..16].try_into().unwrap()) as f64,
        }
    }
}

Execution Order

Implement in this order to keep the build passing at each step:

  1. AST + Lexer (Changes 1-2) — add types, update all match arms with todo!() or pass-through
  2. Parser (Change 3) — parse the new syntax
  3. Type checker (Change 5) — handle new types
  4. Signal graph (Change 4) — add streamable flag + manifest
  5. Runtime JS (Change 7) — add streaming functions to RUNTIME_JS
  6. Codegen (Change 6) — emit stream init and signal diff calls
  7. CLI (Change 8) — add stream command
  8. Layout (Change 9) — add serialization

After each step, run cargo test --workspace and fix any compile errors.

Verification

After all changes:

# Build + test
cargo test --workspace

# Create a test file
cat > /tmp/test_stream.ds << 'EOF'
stream main on "ws://localhost:9100" { mode: signal }

let count = 0
let doubled = count * 2

view main = column [
  text "Count: {count}"
  text "Doubled: {doubled}"
  button "+" { click: count += 1 }
  button "-" { click: count -= 1 }
]
EOF

# Start relay
cargo run -p ds-stream &

# Compile and serve
dreamstack dev /tmp/test_stream.ds

# Open in browser — the compiled page should:
# 1. Connect to ws://localhost:9100 automatically
# 2. Send signal diffs when you click + or -
# 3. A receiver on another tab should see count update in real time

⚠️ Ownership Boundaries — DO NOT TOUCH

Another agent is actively working on the engine/ds-stream/ crate and the HTML demos. Do not modify:

Path Owner Reason
engine/ds-stream/src/* bitstream agent Active development on protocol, relay, codec
engine/ds-stream/Cargo.toml bitstream agent Dependency management
engine/ds-stream/README.md bitstream agent Documentation
examples/stream-source.html bitstream agent Working demo, will be replaced by compiled output
examples/stream-receiver.html bitstream agent Working demo, will be replaced by compiled output

Your scope is the compiler pipeline only:

  • compiler/ds-parser/src/ — lexer, parser, AST
  • compiler/ds-analyzer/src/ — signal graph
  • compiler/ds-types/src/ — type checker
  • compiler/ds-codegen/src/ — JS emitter + embedded runtime
  • compiler/ds-cli/src/ — CLI commands
  • compiler/ds-layout/src/ — layout solver

🔴 Exact Breakage Map

When you change Expr::StreamFrom(String) to Expr::StreamFrom { source, mode }, these 3 exact lines will fail to compile:

File Line Current Code Fix
compiler/ds-parser/src/parser.rs 616 Ok(Expr::StreamFrom(full_source)) Ok(Expr::StreamFrom { source: full_source, mode: None })
compiler/ds-types/src/checker.rs 355 Expr::StreamFrom(_source) => { Expr::StreamFrom { source, .. } => {
compiler/ds-analyzer/src/signal_graph.rs No direct match but collect_deps falls through to _ => {} Add Expr::StreamFrom { .. } => {} for clarity

When you add Declaration::Stream(StreamDecl) to the enum, nothing breaks because all existing code uses if let Declaration::X(...) patterns, not exhaustive match blocks. The new variant is silently ignored.

However, you should explicitly add handling in these locations:

File Method Line What to add
ds-codegen/src/js_emitter.rs emit_program 54-228 Process Declaration::Stream to emit DS._initStream()
ds-analyzer/src/signal_graph.rs from_program 97-179 Detect Declaration::Stream and set streamable flags
ds-types/src/checker.rs top-level loop 59-110 Validate StreamDecl fields

📋 Current State of Every File You'll Touch

compiler/ds-parser/src/ast.rs (273 lines)

  • Declaration enum: 7 variants (line 11-27)
  • Expr enum: 24 variants (line 107-166)
  • StreamFrom(String) at line 143 — the one to change
  • ContainerKind enum includes Scene (added for physics)
  • Uses Span for source locations throughout

compiler/ds-parser/src/lexer.rs (453 lines)

  • TokenKind enum: ~55 variants (line 10-92)
  • Keywords are matched in lex_ident_or_keyword (line 284-329)
  • Stream keyword already exists at line 306
  • Keywords are case-sensitive, lowercase only
  • 6 existing tests (line 397-451)

compiler/ds-parser/src/parser.rs (1089 lines)

  • parse_declaration is the entry point (line 91-105)
  • Current stream from parsing is in parse_primary (around line 610-616)
  • GOTCHA: The parser uses TokenKind::Ident(s) for string comparison — the s field is a String, so matching requires .into() or s == "from" style
  • ParseError struct is at ds-parser/src/lib.rs — it has message, line, column fields

compiler/ds-analyzer/src/signal_graph.rs (471 lines)

  • SignalNode struct at line 20 — has name, kind, initial, deps fields
  • from_program (line 97-179) — walks declarations, creates signal nodes
  • collect_deps (line 235-305) — exhaustive Expr match with _ => {} fallthrough
  • collect_bindings (line 338-409) — similar pattern
  • extract_mutations (line 307-329) — only handles Expr::Assign and Expr::Block
  • 4 existing tests (line 426-469)

compiler/ds-types/src/checker.rs (590 lines)

  • infer_type method has the big Expr match (line 180-430)
  • StreamFrom at line 355 returns Type::Unknown — change to Type::Named
  • Type enum is in types.rs — includes Int, Float, String, Bool, List, Record, View, Named(String), Unknown
  • GOTCHA: No Type::Stream variant — use Type::Named("StreamState".into()) instead of adding a new variant

compiler/ds-codegen/src/js_emitter.rs (1561 lines)

  • emit_program (line 54-228) — the main driver, processes declarations in order
  • emit_view_expr (line 239-546) — big match on Expr for view context
  • emit_expr (line 565-653) — general expression to JS string conversion
  • emit_event_handler_expr (line 655-679) — event handlers, this is where _streamDiff hooks go
  • emit_scene (line 733-932) — emits physics scene code
  • RUNTIME_JS const (line 1125-1559) — the full runtime, ~435 lines of JS in a Rust raw string
  • CSS_RESET const (line 1034-1122)
  • GOTCHA: The runtime is a raw string r#"..."#. All JS inside must use function() not arrow functions with { in template literals, because # in JS comments could break the raw string. Be careful with escaping.
  • GOTCHA: emit_program iterates program.declarations multiple times with if let filters — it's NOT a single match block. Add a new iteration pass for Declaration::Stream.

compiler/ds-cli/src/main.rs (420 lines)

  • Uses clap::Parser derive macro
  • Commands enum at line 21 — has Build, Dev, Check
  • compile() function at line 57 — runs the full pipeline: lex → parse → analyze → type check → codegen
  • cmd_dev (line 158-326) — HTTP server with HMR, this is what cmd_stream should reuse
  • GOTCHA: ds-cli depends on all compiler crates but NOT on ds-stream. If you want cmd_stream to auto-start the relay, you need to either add ds-stream as a dependency or spawn it as a subprocess.

compiler/ds-layout/src/solver.rs (453 lines)

  • LayoutRect struct at line 143 — has x, y, width, height (all f64)
  • The solver is not used at runtime yet — it's available but the codegen currently uses CSS flexbox
  • Adding to_bytes/from_bytes is safe and self-contained

🧪 Existing Tests

Run all tests before and after each change:

cargo test --workspace 2>&1 | grep -E '(test result|FAILED)'

Current test counts:

  • ds-parser: 5 tests (lexer) + parser tests
  • ds-analyzer: 4 tests (signal graph)
  • ds-types: ~8 tests (type checker)
  • ds-codegen: 0 tests (output is verified manually via HTML)
  • ds-stream: 38 tests (protocol + codec + relay + input events)

Tests to add:

// In ds-parser tests:
#[test]
fn test_stream_decl() {
    let src = r#"stream main on "ws://localhost:9100" { mode: signal }"#;
    let tokens = Lexer::new(src).tokenize();
    let program = Parser::new(tokens).parse_program().unwrap();
    assert!(matches!(&program.declarations[0], Declaration::Stream(_)));
}

// In ds-analyzer tests:
#[test]
fn test_streamable_signals() {
    let src = r#"
        stream main on "ws://localhost:9100"
        let count = 0
        view main = column [ text "Count: {count}" ]
    "#;
    let (graph, _) = analyze(src);
    assert!(graph.nodes.iter().any(|n| n.name == "count" && n.streamable));
}

📡 Protocol Reference (for JS runtime code)

The JS runtime needs to encode/decode the exact same binary protocol as engine/ds-stream/src/protocol.rs. Here are the constants:

Header Layout (16 bytes, little-endian)

Offset Size Field Description
0 u8 frame_type See frame types below
1 u8 flags 0x01=input, 0x02=keyframe, 0x04=compressed
2 u16 seq Sequence number (wraps at 65535)
4 u32 timestamp Milliseconds since stream start
8 u16 width Frame width (0 for non-pixel frames)
10 u16 height Frame height (0 for non-pixel frames)
12 u32 payload_len Byte length of payload after header

Frame Type Constants (use in JS)

const FRAME_PIXELS       = 0x01;  // Raw RGBA framebuffer
const FRAME_COMPRESSED   = 0x02;  // PNG/WebP compressed frame
const FRAME_DELTA        = 0x03;  // XOR delta + RLE compressed
const FRAME_AUDIO_PCM    = 0x10;  // Float32 PCM samples
const FRAME_AUDIO_COMP   = 0x11;  // Opus compressed audio
const FRAME_HAPTIC       = 0x20;  // Vibration command
const FRAME_SIGNAL_SYNC  = 0x30;  // Full signal state (JSON)
const FRAME_SIGNAL_DIFF  = 0x31;  // Changed signals only (JSON)
const FRAME_NEURAL       = 0x40;  // Neural-generated pixels
const FRAME_PING         = 0xFE;  // Keep-alive
const FRAME_END          = 0xFF;  // Stream termination

Input Type Constants (receiver → source)

const INPUT_POINTER      = 0x01;  // x(u16) y(u16) buttons(u8)
const INPUT_PTR_DOWN     = 0x02;  //   same
const INPUT_PTR_UP       = 0x03;  //   same
const INPUT_KEY_DOWN     = 0x10;  // keycode(u16) modifiers(u8)
const INPUT_KEY_UP       = 0x11;  //   same
const INPUT_SCROLL       = 0x50;  // dx(i16) dy(i16)
const INPUT_RESIZE       = 0x60;  // width(u16) height(u16)
const FLAG_INPUT         = 0x01;  // Set in flags byte for input messages

Relay WebSocket Paths

ws://localhost:9100/source  — first connection becomes the source
ws://localhost:9100/stream  — subsequent connections are receivers

The relay uses tokio-tungstenite and routes:

  • Source frames → broadcast to all receivers
  • Receiver inputs → mpsc to source

🚨 Common Gotchas

  1. Raw string escaping: The RUNTIME_JS const uses r#"..."#. If your JS contains "# (like a comment // foo#bar), the Rust raw string will terminate early. Use r##"..."## if needed.

  2. Lexer keyword ordering: Keywords in lex_ident_or_keyword are in a match block. The order doesn't matter for correctness but alphabetical keeps it clean.

  3. emit_program is multi-pass: The codegen iterates program.declarations 6 separate times with if let filters, not once with a match. Your Declaration::Stream processing should be its own pass, placed BEFORE the signal initialization pass (so streaming is initialized before signals fire).

  4. Signal .value wrapping: The codegen assumes every signal identifier gets .value appended (line 588: Expr::Ident(name) => format!("{name}.value")). For StreamFrom, the returned Signal already has .value, so accessing remote.count should emit remote.value.count.

  5. Runtime emit() for remote input: The runtime already has onEvent and emit functions. Use emit('remote_pointer', data) to dispatch input events — then user .ds code can handle them with on remote_pointer -> ....

  6. No Serialize/Deserialize on AST: The AST types don't derive serde traits. SignalManifest should derive Serialize if you want to embed it as JSON in the compiled output.

  7. ds-cli doesn't depend on ds-stream: The CLI can't import relay code directly. To auto-start the relay, use std::process::Command::new("cargo").args(["run", "-p", "ds-stream"]).


🚀 Next Steps

All spec changes are implemented. The pipeline is wired end-to-end. Here's the roadmap for what comes next:

Phase A: Prove It Works (High Priority)

1. End-to-End Integration Test

Write a .ds file with stream main on "ws://...", run the relay + dreamstack dev, open two browser tabs, and verify signal changes propagate in real time. This is the first time compiler-generated streaming code talks to the Rust relay.

# Terminal 1: start relay
cargo run -p ds-stream

# Terminal 2: compile and serve
dreamstack stream examples/counter.ds

# Open http://localhost:3000 — click buttons, verify signals stream to relay
# Open stream-receiver.html — verify it receives updates

2. Receiver .ds Syntax Demo

Create a receiver.ds that uses stream from "ws://localhost:9100" to consume a remote stream and render it locally — closing the compiler-to-compiler loop.

let remote = stream from "ws://localhost:9100"

view main = column [
  text "Remote count: {remote.count}"
  text "Remote doubled: {remote.doubled}"
]

Phase B: Infrastructure

3. WASM Codec

engine/ds-stream-wasm/ — 18KB WASM binary with wasm-bindgen. Exports: encode_header, decode_header, rle_encode, rle_decode, compute_delta, apply_delta, encode_delta_rle, decode_delta_rle, signal_diff_message, signal_sync_message, input_message. 7 tests.

4. Multi-Source Routing

relay.rs refactored with ChannelState per channel, HashMap<String, Arc<RwLock<ChannelState>>>. Paths: /source/{name}, /stream/{name}. Backward compatible: /source and /stream use "default" channel. 7 new routing tests.

5. WebRTC Transport

Add WebRTC as a transport option alongside WebSocket for sub-frame latency. WebSocket adds ~1-2 frames of buffering; WebRTC data channels can eliminate this.

Phase C: Polish

6. Language Documentation

Added streaming section to DREAMSTACK.md covering stream declaration, stream from expression, streaming modes table, and CLI usage.

7. Example .ds Files

Created compiler-native streaming examples:

  • examples/streaming-counter.ds — signal streaming with counter (23KB compiled)
  • examples/streaming-receiver.ds — receiver with stream from
  • examples/streaming-physics.ds — physics scene streaming with named channel (27KB compiled)