diff --git a/STREAM_COMPOSITION.md b/STREAM_COMPOSITION.md new file mode 100644 index 0000000..05a37f8 --- /dev/null +++ b/STREAM_COMPOSITION.md @@ -0,0 +1,215 @@ +# DreamStack Stream Composition + +Compose independent apps through live signal streams. Each app is a node in a reactive dataflow graph — it can source signals, receive them, derive new ones, and re-stream the results. + +## Core Concepts + +**Source** — an app that publishes signals to the relay: +``` +stream counter on "ws://localhost:9100/peer/counter" { + mode: signal, + output: count, doubled +} +``` + +**Receiver** — an app that subscribes to a stream: +``` +let counter = stream from "ws://localhost:9100/stream/counter" +text "Count: {counter.count}" +``` + +**Composition** — an app that is both receiver AND source: +``` +let counter = stream from ".../stream/counter" +let events = counter.count + stats.total -- derived from upstream +stream metrics on ".../peer/metrics" { output: events } +``` + +## URL Convention + +| Role | URL Pattern | Description | +|------|------------|-------------| +| Source/Peer | `ws://relay/peer/{channel}` | Publishes signals to a named channel | +| Receiver | `ws://relay/stream/{channel}` | Subscribes to a named channel | +| Signaling | `ws://relay/signal/{channel}` | WebRTC signaling exchange | + +Each channel is independent. A source on `/peer/counter` is received at `/stream/counter`. + +## The `select` Clause + +Filter which signals a receiver cares about: + +``` +-- Receive everything (default) +let counter = stream from "ws://.../stream/counter" + +-- Receive only specific fields +let counter = stream from "ws://.../stream/counter" { + select: count, doubled +} +``` + +**What happens:** +1. **Client-side** — `_connectStream` filters incoming state to only selected keys +2. **Relay-side** — Receiver sends a `0x33 SubscribeFilter` frame; relay strips unwanted fields from signal frames before forwarding (saving bandwidth) + +## `stream` Declaration + +``` +stream on "" { + mode: signal, -- signal | pixel + output: field1, field2 -- which signals to publish (private signals excluded) +} +``` + +| Key | Values | Description | +|-----|--------|-------------| +| `mode` | `signal` (default) | Signal mode streams JSON state; pixel mode streams RGBA frames | +| `output` | comma-separated names | Only listed signals are sent; unlisted ones stay local | + +**Private signals** — any signal not in `output` is never transmitted: +``` +let count = 0 -- public (in output) +let doubled = count * 2 -- public (in output) +let message = "hello" -- private (not in output) + +stream counter on "..." { output: count, doubled } +``` + +## `stream from` Expression + +``` +let = stream from "" +let = stream from "" { select: field1, field2 } +``` + +Returns a reactive signal proxy. Access fields with dot notation: +``` +text "Count: {counter.count}" +let events = counter.count + stats.total +``` + +The proxy auto-updates when new frames arrive from the relay. All dependent derived signals and UI elements update reactively. + +## Chained Composition + +Apps can be stacked in layers. Each layer receives, derives, and re-streams: + +``` +Layer 0 (Sources): + Counter ──┐ + Clock ───┤ + Stats ───┘ + └──► Layer 1 (Aggregator): + compose-metrics + derives: uptime, events, status + re-streams on /peer/metrics + └──► Layer 2 (Dashboard): + compose-master + receives: metrics + mood +``` + +**Layer 1** — `compose-metrics.ds`: +``` +let counter = stream from ".../stream/counter" +let clock = stream from ".../stream/clock" +let stats = stream from ".../stream/stats" + +let uptime = clock.seconds +let events = counter.count + stats.total +let status = "nominal" + +stream metrics on ".../peer/metrics" { + mode: signal, output: uptime, events, status +} +``` + +**Layer 2** — `compose-master.ds`: +``` +let metrics = stream from ".../stream/metrics" { select: uptime, events } +let mood = stream from ".../stream/mood" { select: mood, energy } +``` + +## Bitstream Protocol + +All frames share a 16-byte header: + +``` +┌──────┬───────┬──────┬───────────┬───────┬────────┬────────┐ +│ type │ flags │ seq │ timestamp │ width │ height │ length │ +│ u8 │ u8 │ u16 │ u32 │ u16 │ u16 │ u32 │ +└──────┴───────┴──────┴───────────┴───────┴────────┴────────┘ +``` + +### Signal Frame Types + +| Code | Name | Direction | Payload | +|------|------|-----------|---------| +| `0x30` | SignalSync | source → relay → receivers | Full JSON state `{ count: 3, doubled: 6 }` | +| `0x31` | SignalDiff | source → relay → receivers | Changed fields only `{ count: 4 }` | +| `0x32` | SchemaAnnounce | source → relay | `{ signals: ["count","doubled"], mode: "signal" }` | +| `0x33` | SubscribeFilter | receiver → relay | `{ select: ["count"] }` | + +### Pixel Frame Types + +| Code | Name | Payload | +|------|------|---------| +| `0x01` | Pixels | Raw RGBA (width × height × 4 bytes) | +| `0x03` | DeltaPixels | XOR delta, RLE-compressed | +| `0xF0` | Keyframe | Reset receiver state | + +### Flags + +| Bit | Name | Meaning | +|-----|------|---------| +| 0 | `FLAG_INPUT` | Frame is input (receiver → source) | +| 1 | `FLAG_KEYFRAME` | Full state, no delta | +| 2 | `FLAG_COMPRESSED` | Payload is compressed | + +## Relay + +```bash +cargo run -p ds-stream +``` + +The relay is a stateless WebSocket router: +- Routes frames from sources/peers to receivers on the same channel +- Caches latest state for late-joining receivers +- Caches schema announcements (0x32) per channel +- Applies per-receiver signal filters (0x33) before forwarding +- Auto-creates channels on first connection +- Cleans up idle channels after grace period + +## Running the Demo + +```bash +# Terminal 1: Start relay +cargo run -p ds-stream + +# Terminal 2-5: Start source apps +dreamstack stream examples/streaming-counter.ds --port 3000 +dreamstack stream examples/streaming-clock.ds --port 3002 +dreamstack stream examples/streaming-stats.ds --port 3003 +dreamstack stream examples/streaming-mood.ds --port 3004 + +# Terminal 6: Start aggregator (Layer 1) +dreamstack stream examples/compose-metrics.ds --port 3006 + +# Terminal 7: Build and serve master dashboard (Layer 2) +dreamstack build examples/compose-master.ds -o /tmp/build-master +python3 -m http.server 3007 -d /tmp/build-master +``` + +Open http://localhost:3007 to see all streams composited into one dashboard. + +## Examples + +| File | Role | Channel | Signals | +|------|------|---------|---------| +| `streaming-counter.ds` | Source | `/peer/counter` | count, doubled | +| `streaming-clock.ds` | Source | `/peer/clock` | hours, minutes, seconds | +| `streaming-stats.ds` | Source | `/peer/stats` | total, average, max | +| `streaming-mood.ds` | Source | `/peer/mood` | mood, energy, color | +| `compose-dashboard.ds` | Receiver | 4 streams | Flat dashboard | +| `compose-metrics.ds` | Receiver+Source | 3 in, 1 out | uptime, events, status | +| `compose-master.ds` | Receiver | 2 streams | Chained dashboard |