# DreamStack Stream Composition
Compose independent apps through live signal streams. Each app is a node in a reactive dataflow graph — it can source signals, receive them, derive new ones, and re-stream the results.
## Core Concepts
**Source** — an app that publishes signals to the relay:
```
stream counter on "ws://localhost:9100/peer/counter" {
  mode: signal,
  output: count, doubled
}
```
**Receiver** — an app that subscribes to a stream:
```
let counter = stream from "ws://localhost:9100/stream/counter"
text "Count: {counter.count}"
```
**Composition** — an app that is both receiver AND source:
```
let counter = stream from ".../stream/counter"
let stats = stream from ".../stream/stats"
let events = counter.count + stats.total -- derived from upstream
stream metrics on ".../peer/metrics" { output: events }
```
## URL Convention
| Role | URL Pattern | Description |
|------|------------|-------------|
| Source/Peer | `ws://relay/peer/{channel}` | Publishes signals to a named channel |
| Receiver | `ws://relay/stream/{channel}` | Subscribes to a named channel |
| Signaling | `ws://relay/signal/{channel}` | WebRTC signaling exchange |
Each channel is independent. A source on `/peer/counter` is received at `/stream/counter`.
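As an illustration of the convention, here is a minimal Python sketch that builds the three URLs for a channel. The function name `channel_url` and the role keys are hypothetical and are not part of DreamStack; only the `/peer/`, `/stream/`, and `/signal/` path segments come from the table above.

```python
# Hypothetical helper: maps a role from the table to its URL path segment.
ROLE_PATHS = {"source": "peer", "receiver": "stream", "signaling": "signal"}


def channel_url(relay: str, role: str, channel: str) -> str:
    """Return the relay URL for the given role and channel name."""
    if role not in ROLE_PATHS:
        raise ValueError(f"unknown role: {role!r}")
    # Drop a trailing slash so the result never contains '//' in the path.
    return f"{relay.rstrip('/')}/{ROLE_PATHS[role]}/{channel}"
```

With `relay="ws://localhost:9100"` and `channel="counter"`, the source and receiver URLs differ only in the `peer` versus `stream` segment, which is what pairs them on the same channel.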
## The `select` Clause
Filter which signals a receiver cares about:
```
-- Receive everything (default)
let counter = stream from "ws://.../stream/counter"
-- Receive only specific fields
let counter = stream from "ws://.../stream/counter" {
  select: count, doubled
}
```
**What happens:**
1. **Client-side:** `_connectStream` filters incoming state down to the selected keys.
2. **Relay-side:** the receiver sends a `0x33 SubscribeFilter` frame, and the relay strips unwanted fields from signal frames before forwarding them, which saves bandwidth.
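The two steps can be sketched in Python as follows. This is an illustration only, not the runtime's code: `filter_state` and `subscribe_filter_payload` are hypothetical names, and the payload shape is taken from the `0x33` row of the frame table (assumed to be UTF-8 JSON).

```python
import json
from typing import Iterable, Optional


def filter_state(state: dict, select: Optional[Iterable[str]]) -> dict:
    """Keep only the selected keys; no select list means 'receive everything'."""
    if not select:
        return dict(state)
    wanted = set(select)
    return {key: value for key, value in state.items() if key in wanted}


def subscribe_filter_payload(select: Iterable[str]) -> bytes:
    """Build the JSON body a receiver would send in a SubscribeFilter frame."""
    return json.dumps({"select": list(select)}).encode("utf-8")
```

The same `filter_state` logic applies on both sides: the relay uses it to drop fields before forwarding, and the client uses it as a second guard on whatever arrives.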
## `stream` Declaration
```
stream <name> on "<relay-url>" {
  mode: signal,           -- signal | pixel
  output: field1, field2  -- which signals to publish (private signals excluded)
}
```
| Key | Values | Description |
|-----|--------|-------------|
| `mode` | `signal` (default), `pixel` | Signal mode streams JSON state; pixel mode streams RGBA frames |
| `output` | comma-separated names | Only listed signals are sent; unlisted ones stay local |
**Private signals** — any signal not in `output` is never transmitted:
```
let count = 0 -- public (in output)
let doubled = count * 2 -- public (in output)
let message = "hello" -- private (not in output)
stream counter on "..." { output: count, doubled }
```
## `stream from` Expression
```
let <name> = stream from "<receiver-url>"
let <name> = stream from "<receiver-url>" { select: field1, field2 }
```
Returns a reactive signal proxy. Access fields with dot notation:
```
text "Count: {counter.count}"
let events = counter.count + stats.total
```
The proxy auto-updates when new frames arrive from the relay. All dependent derived signals and UI elements update reactively.
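To make the proxy behaviour concrete, here is a small Python model of it. It is a sketch under assumptions, not the DreamStack runtime: the class `StreamProxy` and its `subscribe` / `on_frame` methods are hypothetical names, and frames are assumed to arrive already decoded as dictionaries.

```python
class StreamProxy:
    """Toy model of a reactive signal proxy fed by relay frames."""

    def __init__(self):
        self._state = {}
        self._listeners = []

    def subscribe(self, listener):
        """Register a callback that runs after every incoming frame."""
        self._listeners.append(listener)

    def on_frame(self, fields: dict):
        """Merge the received fields, then notify dependents."""
        self._state.update(fields)
        for listener in self._listeners:
            listener(self)

    def __getattr__(self, name):
        # Only called when normal lookup fails, so this serves signal fields.
        state = self.__dict__.get("_state", {})
        try:
            return state[name]
        except KeyError:
            raise AttributeError(name) from None
```

A derived signal is then just a listener that recomputes its value from the proxy's fields each time a frame lands.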
## Chained Composition
Apps can be stacked in layers. Each layer receives, derives, and re-streams:
```
Layer 0 (Sources):
  Counter ──┐
  Clock  ───┤
  Stats  ───┘
            └──► Layer 1 (Aggregator):
                   compose-metrics
                   derives: uptime, events, status
                   re-streams on /peer/metrics
                     └──► Layer 2 (Dashboard):
                            compose-master
                            receives: metrics + mood
```
**Layer 1** — `compose-metrics.ds`:
```
let counter = stream from ".../stream/counter"
let clock = stream from ".../stream/clock"
let stats = stream from ".../stream/stats"
let uptime = clock.seconds
let events = counter.count + stats.total
let status = "nominal"
stream metrics on ".../peer/metrics" {
  mode: signal, output: uptime, events, status
}
```
**Layer 2** — `compose-master.ds`:
```
let metrics = stream from ".../stream/metrics" { select: uptime, events }
let mood = stream from ".../stream/mood" { select: mood, energy }
```
## Bitstream Protocol
All frames share a 16-byte header:
```
┌──────┬───────┬──────┬───────────┬───────┬────────┬────────┐
│ type │ flags │ seq  │ timestamp │ width │ height │ length │
│ u8   │ u8    │ u16  │ u32       │ u16   │ u16    │ u32    │
└──────┴───────┴──────┴───────────┴───────┴────────┴────────┘
```
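The layout above can be packed and parsed with Python's `struct` module, as in the sketch below. The field order and widths come from the diagram; the byte order is not stated in this document, so little-endian (`<`) is an assumption here, and `pack_frame` / `unpack_frame` are hypothetical helper names.

```python
import struct
from collections import namedtuple

# u8 type, u8 flags, u16 seq, u32 timestamp, u16 width, u16 height, u32 length.
# ASSUMPTION: little-endian; the document does not specify the byte order.
HEADER = struct.Struct("<BBHIHHI")  # 16 bytes, no padding

FrameHeader = namedtuple(
    "FrameHeader", "frame_type flags seq timestamp width height length"
)


def pack_frame(frame_type, flags, seq, timestamp, width, height, payload: bytes) -> bytes:
    """Prefix the payload with a 16-byte header; length is the payload size."""
    header = HEADER.pack(
        frame_type, flags, seq & 0xFFFF, timestamp & 0xFFFFFFFF,
        width, height, len(payload),
    )
    return header + payload


def unpack_frame(data: bytes):
    """Split raw bytes into (FrameHeader, payload), checking the length field."""
    if len(data) < HEADER.size:
        raise ValueError("frame shorter than 16-byte header")
    header = FrameHeader._make(HEADER.unpack_from(data))
    payload = data[HEADER.size:HEADER.size + header.length]
    if len(payload) != header.length:
        raise ValueError("truncated payload")
    return header, payload
```

Masking `seq` and `timestamp` lets both counters wrap around instead of overflowing their fixed-width fields.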
### Signal Frame Types
| Code | Name | Direction | Payload |
|------|------|-----------|---------|
| `0x30` | SignalSync | source → relay → receivers | Full JSON state `{ count: 3, doubled: 6 }` |
| `0x31` | SignalDiff | source → relay → receivers | Changed fields only `{ count: 4 }` |
| `0x32` | SchemaAnnounce | source → relay | `{ signals: ["count","doubled"], mode: "signal" }` |
| `0x33` | SubscribeFilter | receiver → relay | `{ select: ["count"] }` |
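The difference between `SignalSync` and `SignalDiff` shows up in how a receiver applies each one. The Python sketch below is illustrative only: `apply_signal_frame` is a hypothetical name, and it assumes the payload on the wire is strict UTF-8 JSON (the table shows the fields in shorthand).

```python
import json

SIGNAL_SYNC = 0x30  # full state: replaces whatever the receiver holds
SIGNAL_DIFF = 0x31  # changed fields only: merged into the current state


def apply_signal_frame(state: dict, frame_type: int, payload: bytes) -> dict:
    """Return the receiver state after applying one signal frame."""
    fields = json.loads(payload)
    if frame_type == SIGNAL_SYNC:
        return dict(fields)
    if frame_type == SIGNAL_DIFF:
        return {**state, **fields}
    # Other frame types do not change signal state in this sketch.
    return state
```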
### Pixel Frame Types
| Code | Name | Payload |
|------|------|---------|
| `0x01` | Pixels | Raw RGBA (width × height × 4 bytes) |
| `0x03` | DeltaPixels | XOR delta, RLE-compressed |
| `0xF0` | Keyframe | Reset receiver state |
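The XOR step behind `DeltaPixels` can be illustrated in a few lines of Python. This sketch covers only the XOR delta; the RLE encoding used on the wire is not described in this document, so it is left out, and the function names are hypothetical.

```python
def xor_delta(prev: bytes, curr: bytes) -> bytes:
    """XOR two equally sized RGBA buffers; unchanged bytes become 0x00."""
    if len(prev) != len(curr):
        raise ValueError("frames must have the same size")
    return bytes(a ^ b for a, b in zip(prev, curr))


def apply_delta(prev: bytes, delta: bytes) -> bytes:
    """Recover the current frame; XOR is its own inverse."""
    return xor_delta(prev, delta)
```

Because unchanged pixels turn into runs of zero bytes, the delta is typically much more compressible by run-length encoding than the raw frame.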
### Flags
| Bit | Name | Meaning |
|-----|------|---------|
| 0 | `FLAG_INPUT` | Frame is input (receiver → source) |
| 1 | `FLAG_KEYFRAME` | Full state, no delta |
| 2 | `FLAG_COMPRESSED` | Payload is compressed |
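Since each flag occupies one bit of the `flags` byte, they combine with bitwise OR. A minimal Python sketch using the bit positions from the table (the class name `FrameFlags` is hypothetical):

```python
from enum import IntFlag


class FrameFlags(IntFlag):
    INPUT = 1 << 0       # bit 0: frame is input (receiver -> source)
    KEYFRAME = 1 << 1    # bit 1: full state, no delta
    COMPRESSED = 1 << 2  # bit 2: payload is compressed


def has_flag(flags_byte: int, flag: FrameFlags) -> bool:
    """Test a single bit in the header's flags byte."""
    return bool(flags_byte & flag)
```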
## Relay
```bash
cargo run -p ds-stream
```
The relay is a production-grade WebSocket router:
- Routes frames from sources/peers to receivers on the same channel
- **Diff merging**: Consolidates accumulated signal diffs into ONE catchup frame for late joiners
- Caches schema announcements (0x32) per channel
- Applies per-receiver signal filters (0x33) before forwarding
- Auto-creates channels on first connection
- Cleans up idle channels after grace period
- Source reconnect grace: cache preserved during reconnection window
## Runtime Features
### Diff Batching
Multiple signal changes in a single event loop tick are coalesced into one WebSocket frame:
```
-- Click fires 3 signal changes, but sends only 1 WS frame:
button "Happy" { click: mood = "happy"; color = "green"; energy += 10 }
```
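The coalescing idea can be modelled in Python as below. This is a sketch, not the runtime's implementation: `DiffBatcher` is a hypothetical class, and `flush()` stands in for whatever the runtime schedules at the end of an event loop tick.

```python
class DiffBatcher:
    """Collects signal changes and emits them as one diff per flush."""

    def __init__(self, send):
        self._send = send    # callable that transmits one diff frame
        self._pending = {}   # signal name -> latest value this tick

    def set(self, name, value):
        """Record a change; a later write to the same signal overwrites it."""
        self._pending[name] = value

    def flush(self):
        """Send all pending changes as a single diff, if there are any."""
        if not self._pending:
            return
        diff, self._pending = self._pending, {}
        self._send(diff)
```

Three `set` calls followed by one `flush` result in a single call to `send`, which mirrors the button example above.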
### Connection Status
The `stream from` proxy injects reactive connection metadata:
```
let counter = stream from "ws://localhost:9100/stream/counter"
-- Available fields:
-- counter._connected boolean (true when WS is open)
-- counter._latency RTT in milliseconds (ping/pong every 5s)
-- counter._frames total frames received
-- counter._reconnects reconnect attempt count
```
### RTT Tracking
Both sources and receivers measure round-trip latency via periodic ping/pong (every 5s).
The `_latency` field updates reactively on each measurement.
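A ping/pong measurement of this kind reduces to remembering when each ping was sent. The Python sketch below is an illustration under assumptions: `RttTracker` is a hypothetical class, pings are assumed to carry an identifier that the pong echoes back, and the 5-second scheduling is left to the caller.

```python
import time


class RttTracker:
    """Tracks round-trip time from matched ping/pong pairs."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self._sent_at = {}      # ping id -> send time in seconds
        self.latency_ms = None  # last measured RTT, None until first pong

    def ping(self, ping_id):
        """Record the send time of a ping."""
        self._sent_at[ping_id] = self._clock()

    def pong(self, ping_id):
        """Update latency_ms for a matching pong; ignore unknown ids."""
        sent = self._sent_at.pop(ping_id, None)
        if sent is None:
            return None
        self.latency_ms = (self._clock() - sent) * 1000.0
        return self.latency_ms
```

Using a monotonic clock keeps the measurement unaffected by wall-clock adjustments.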
### Relay Diff Merging
Late-joining receivers receive a single consolidated sync frame instead of replaying
hundreds of individual diffs:
```
-- Before: receiver gets sync + 500 individual diffs
-- After: receiver gets 1 merged sync frame with all changes applied
```
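Conceptually, the merge folds every cached diff into the last full state, in arrival order. The Python sketch below shows that idea only; the relay itself is a Rust crate (`ds-stream`), and `merge_for_late_joiner` is a hypothetical name.

```python
from typing import Iterable


def merge_for_late_joiner(last_sync: dict, diffs: Iterable[dict]) -> dict:
    """Fold accumulated diffs into one state suitable for a single sync frame."""
    merged = dict(last_sync)  # copy so the cached sync is left untouched
    for diff in diffs:
        merged.update(diff)   # later diffs win, matching arrival order
    return merged
```

A receiver that joins after 500 diffs then needs one frame carrying the merged state, and applying that frame gives the same result as replaying every diff.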
## Running the Demo
```bash
# Terminal 1: Start relay
cargo run -p ds-stream
# Terminal 2-5: Start source apps
dreamstack stream examples/streaming-counter.ds --port 3000
dreamstack stream examples/streaming-clock.ds --port 3002
dreamstack stream examples/streaming-stats.ds --port 3003
dreamstack stream examples/streaming-mood.ds --port 3004
# Terminal 6: Start aggregator (Layer 1)
dreamstack stream examples/compose-metrics.ds --port 3006
# Terminal 7: Build and serve dashboard
dreamstack build examples/streaming-dashboard.ds -o /tmp/build-dash
python3 -m http.server 3007 -d /tmp/build-dash
```
Open http://localhost:3007 to see all streams composited into one dashboard.
## Examples
| File | Role | Channel | Signals |
|------|------|---------|---------|
| `streaming-counter.ds` | Source | `/peer/counter` | count, doubled |
| `streaming-clock.ds` | Source | `/peer/clock` | hours, minutes, seconds |
| `streaming-stats.ds` | Source | `/peer/stats` | total, average, max |
| `streaming-mood.ds` | Source | `/peer/mood` | mood, energy, color |
| `streaming-dashboard.ds` | Receiver | 3 streams | Components + live data |
| `compose-dashboard.ds` | Receiver | 4 streams | Flat dashboard |
| `compose-metrics.ds` | Receiver+Source | 3 in, 1 out | uptime, events, status |
| `compose-master.ds` | Receiver | 2 streams | Chained dashboard |
| `timer-multi-action.ds` | Timer | standalone | ticks, status, elapsed |