# DreamStack Stream Composition

Compose independent apps through live signal streams. Each app is a node in a reactive dataflow graph — it can source signals, receive them, derive new ones, and re-stream the results.

## Core Concepts

**Source** — an app that publishes signals to the relay:

```
stream counter on "ws://localhost:9100/peer/counter" {
  mode: signal,
  output: count, doubled
}
```

**Receiver** — an app that subscribes to a stream:

```
let counter = stream from "ws://localhost:9100/stream/counter"
text "Count: {counter.count}"
```

**Composition** — an app that is both receiver AND source:

```
let counter = stream from ".../stream/counter"
let stats = stream from ".../stream/stats"
let events = counter.count + stats.total   -- derived from upstream
stream metrics on ".../peer/metrics" { output: events }
```

## URL Convention

| Role | URL Pattern | Description |
|------|------------|-------------|
| Source/Peer | `ws://relay/peer/{channel}` | Publishes signals to a named channel |
| Receiver | `ws://relay/stream/{channel}` | Subscribes to a named channel |
| Signaling | `ws://relay/signal/{channel}` | WebRTC signaling exchange |

Each channel is independent. A source on `/peer/counter` is received at `/stream/counter`.

## The `select` Clause

Filter which signals a receiver cares about:

```
-- Receive everything (default)
let counter = stream from "ws://.../stream/counter"

-- Receive only specific fields
let counter = stream from "ws://.../stream/counter" { select: count, doubled }
```

**What happens:**

1. **Client-side** — `_connectStream` filters incoming state to only selected keys
2. **Relay-side** — Receiver sends a `0x33 SubscribeFilter` frame; relay strips unwanted fields from signal frames before forwarding (saving bandwidth)

## `stream` Declaration

```
stream <name> on "<url>" {
  mode: signal,            -- signal | pixel
  output: field1, field2   -- which signals to publish (private signals excluded)
}
```

| Key | Values | Description |
|-----|--------|-------------|
| `mode` | `signal` (default), `pixel` | Signal mode streams JSON state; pixel mode streams RGBA frames |
| `output` | comma-separated names | Only listed signals are sent; unlisted ones stay local |

**Private signals** — any signal not in `output` is never transmitted:

```
let count = 0            -- public (in output)
let doubled = count * 2  -- public (in output)
let message = "hello"    -- private (not in output)

stream counter on "..." { output: count, doubled }
```

## `stream from` Expression

```
let <name> = stream from "<url>"
let <name> = stream from "<url>" { select: field1, field2 }
```

Returns a reactive signal proxy. Access fields with dot notation:

```
text "Count: {counter.count}"
let events = counter.count + stats.total
```

The proxy auto-updates when new frames arrive from the relay. All dependent derived signals and UI elements update reactively.

## Chained Composition

Apps can be stacked in layers.
Each layer receives, derives, and re-streams:

```
Layer 0 (Sources):
  Counter ──┐
  Clock ────┤
  Stats ────┘
            └──► Layer 1 (Aggregator): compose-metrics
                   derives: uptime, events, status
                   re-streams on /peer/metrics
                        └──► Layer 2 (Dashboard): compose-master
                               receives: metrics + mood
```

**Layer 1** — `compose-metrics.ds`:

```
let counter = stream from ".../stream/counter"
let clock = stream from ".../stream/clock"
let stats = stream from ".../stream/stats"

let uptime = clock.seconds
let events = counter.count + stats.total
let status = "nominal"

stream metrics on ".../peer/metrics" {
  mode: signal,
  output: uptime, events, status
}
```

**Layer 2** — `compose-master.ds`:

```
let metrics = stream from ".../stream/metrics" { select: uptime, events }
let mood = stream from ".../stream/mood" { select: mood, energy }
```

## Bitstream Protocol

All frames share a 16-byte header:

```
┌──────┬───────┬──────┬───────────┬───────┬────────┬────────┐
│ type │ flags │ seq  │ timestamp │ width │ height │ length │
│ u8   │ u8    │ u16  │ u32       │ u16   │ u16    │ u32    │
└──────┴───────┴──────┴───────────┴───────┴────────┴────────┘
```

### Signal Frame Types

| Code | Name | Direction | Payload |
|------|------|-----------|---------|
| `0x30` | SignalSync | source → relay → receivers | Full JSON state `{ count: 3, doubled: 6 }` |
| `0x31` | SignalDiff | source → relay → receivers | Changed fields only `{ count: 4 }` |
| `0x32` | SchemaAnnounce | source → relay | `{ signals: ["count","doubled"], mode: "signal" }` |
| `0x33` | SubscribeFilter | receiver → relay | `{ select: ["count"] }` |

### Pixel Frame Types

| Code | Name | Payload |
|------|------|---------|
| `0x01` | Pixels | Raw RGBA (width × height × 4 bytes) |
| `0x03` | DeltaPixels | XOR delta, RLE-compressed |
| `0xF0` | Keyframe | Reset receiver state |

### Flags

| Bit | Name | Meaning |
|-----|------|---------|
| 0 | `FLAG_INPUT` | Frame is input (receiver → source) |
| 1 | `FLAG_KEYFRAME` | Full state, no delta |
| 2 | `FLAG_COMPRESSED` | Payload is compressed |

## Relay

```bash
cargo run -p ds-stream
```

The relay is a production-grade WebSocket router:

- Routes frames from sources/peers to receivers on the same channel
- **Diff merging**: Consolidates accumulated signal diffs into ONE catchup frame for late joiners
- Caches schema announcements (0x32) per channel
- Applies per-receiver signal filters (0x33) before forwarding
- Auto-creates channels on first connection
- Cleans up idle channels after grace period
- Source reconnect grace: cache preserved during reconnection window

## Runtime Features

### Diff Batching

Multiple signal changes in a single event loop tick are coalesced into one WebSocket frame:

```
-- Click fires 3 signal changes, but sends only 1 WS frame:
button "Happy" {
  click: mood = "happy"; color = "green"; energy += 10
}
```

### Connection Status

The `stream from` proxy injects reactive connection metadata:

```
let counter = stream from "ws://localhost:9100/stream/counter"

-- Available fields:
-- counter._connected    boolean (true when WS is open)
-- counter._latency      RTT in milliseconds (ping/pong every 5s)
-- counter._frames       total frames received
-- counter._reconnects   reconnect attempt count
```

### RTT Tracking

Both sources and receivers measure round-trip latency via periodic ping/pong (every 5s). The `_latency` field updates reactively on each measurement.
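As a rough illustration of the ping/pong measurement described above, the sketch below records when each ping was sent and turns the matching pong into a latency in milliseconds. It is not the DreamStack runtime: the `RttTracker` class, its method names, and the idea of tagging each ping with an integer id are assumptions made for the example, and the actual wire format of pings is not specified here.

```python
import time
from typing import Callable, Dict, Optional


class RttTracker:
    """Hypothetical helper: derives round-trip latency from ping/pong pairs."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock                      # injectable so tests can fake time
        self._pending: Dict[int, float] = {}     # ping id -> send time in seconds
        self._next_id = 0
        self.latency_ms: Optional[float] = None  # plays the role of `_latency`

    def send_ping(self) -> int:
        """Record the send time and return the id to embed in the ping."""
        ping_id = self._next_id
        self._next_id += 1
        self._pending[ping_id] = self._clock()
        return ping_id

    def on_pong(self, ping_id: int) -> Optional[float]:
        """Update the latency when the pong for a known ping arrives."""
        sent_at = self._pending.pop(ping_id, None)
        if sent_at is None:
            # Unknown or duplicate pong: keep the last measurement.
            return self.latency_ms
        self.latency_ms = (self._clock() - sent_at) * 1000.0
        return self.latency_ms
```

In a real client, `send_ping` would be called from a 5-second timer and `on_pong` from the socket's message handler. Using a monotonic clock keeps the measurement immune to wall-clock adjustments.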
### Relay Diff Merging

Late-joining receivers receive a single consolidated sync frame instead of replaying hundreds of individual diffs:

```
-- Before: receiver gets sync + 500 individual diffs
-- After:  receiver gets 1 merged sync frame with all changes applied
```

## Running the Demo

```bash
# Terminal 1: Start relay
cargo run -p ds-stream

# Terminal 2-5: Start source apps
dreamstack stream examples/streaming-counter.ds --port 3000
dreamstack stream examples/streaming-clock.ds --port 3002
dreamstack stream examples/streaming-stats.ds --port 3003
dreamstack stream examples/streaming-mood.ds --port 3004

# Terminal 6: Start aggregator (Layer 1)
dreamstack stream examples/compose-metrics.ds --port 3006

# Terminal 7: Build and serve dashboard
dreamstack build examples/streaming-dashboard.ds -o /tmp/build-dash
python3 -m http.server 3007 -d /tmp/build-dash
```

Open http://localhost:3007 to see all streams composited into one dashboard.

## Examples

| File | Role | Channel | Signals |
|------|------|---------|---------|
| `streaming-counter.ds` | Source | `/peer/counter` | count, doubled |
| `streaming-clock.ds` | Source | `/peer/clock` | hours, minutes, seconds |
| `streaming-stats.ds` | Source | `/peer/stats` | total, average, max |
| `streaming-mood.ds` | Source | `/peer/mood` | mood, energy, color |
| `streaming-dashboard.ds` | Receiver | 3 streams | Components + live data |
| `compose-dashboard.ds` | Receiver | 4 streams | Flat dashboard |
| `compose-metrics.ds` | Receiver+Source | 3 in, 1 out | uptime, events, status |
| `compose-master.ds` | Receiver | 2 streams | Chained dashboard |
| `timer-multi-action.ds` | Timer | standalone | ticks, status, elapsed |
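
For readers who want a concrete picture of the relay diff merging and per-receiver filtering described earlier, here is a minimal sketch. It assumes that a diff is a flat JSON object whose keys overwrite the same keys in the last full state, and that a `select` filter simply drops unlisted keys. The function name `merge_catchup` is hypothetical, and the relay's real implementation may differ.

```python
from typing import Any, Dict, Iterable, Optional, Sequence


def merge_catchup(
    sync: Dict[str, Any],
    diffs: Iterable[Dict[str, Any]],
    select: Optional[Sequence[str]] = None,
) -> Dict[str, Any]:
    """Fold a full state and later diffs into one catch-up state.

    Assumes shallow, last-write-wins merging of flat signal objects.
    """
    merged = dict(sync)          # copy so the cached sync state is not mutated
    for diff in diffs:
        merged.update(diff)      # later diffs overwrite earlier values
    if select is not None:
        # Keep only the fields this receiver subscribed to.
        merged = {k: v for k, v in merged.items() if k in select}
    return merged


# A late joiner gets one merged state instead of the sync plus every diff.
state = merge_catchup(
    {"count": 3, "doubled": 6},
    [{"count": 4}, {"count": 5, "doubled": 10}],
)
print(state)  # {'count': 5, 'doubled': 10}
```

Passing `select=["count"]` to the same call would yield `{'count': 5}`, which mirrors what a receiver with a `0x33 SubscribeFilter` for `count` would be sent under these assumptions.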