dreamstack/STREAM_COMPOSITION.md
enzotar 598ecde59c feat: comprehensive streaming improvements
Runtime _connectStream improvements:
- Connection status as reactive signals: _connected, _latency, _frames, _reconnects
  injected on stream proxy so UIs can show connection health
- Fixed RLE decoder: 2-byte LE count (was 1-byte, mismatched relay encoder)
- Schema caching: 0x32 SchemaAnnounce frames now parsed and cached
- RTT tracking: receivers send periodic pings (5s), measure round-trip latency
- Better reconnect logging: includes URL and attempt count

Relay tests (57 total):
- catchup_merges_multiple_diffs: sync + 3 diffs → 1 merged frame
- catchup_diffs_only_no_sync: diffs without sync → merged frame
- catchup_preserves_version_counters: conflict resolution versions kept

New example:
- timer-multi-action.ds: every timer + multi-action buttons verified

Documentation:
- STREAM_COMPOSITION.md: 4 new sections (Diff Batching, Connection Status, RTT, Relay Merging)
- Updated example table with streaming-dashboard.ds and timer-multi-action.ds

All 9 examples pass regression (44-70KB each)
2026-02-26 18:09:14 -08:00

# DreamStack Stream Composition
Compose independent apps through live signal streams. Each app is a node in a reactive dataflow graph — it can source signals, receive them, derive new ones, and re-stream the results.
## Core Concepts
**Source** — an app that publishes signals to the relay:
```
stream counter on "ws://localhost:9100/peer/counter" {
  mode: signal,
  output: count, doubled
}
```
**Receiver** — an app that subscribes to a stream:
```
let counter = stream from "ws://localhost:9100/stream/counter"
text "Count: {counter.count}"
```
**Composition** — an app that is both receiver AND source:
```
let counter = stream from ".../stream/counter"
let stats = stream from ".../stream/stats"
let events = counter.count + stats.total -- derived from upstream
stream metrics on ".../peer/metrics" { output: events }
```
## URL Convention
| Role | URL Pattern | Description |
|------|------------|-------------|
| Source/Peer | `ws://relay/peer/{channel}` | Publishes signals to a named channel |
| Receiver | `ws://relay/stream/{channel}` | Subscribes to a named channel |
| Signaling | `ws://relay/signal/{channel}` | WebRTC signaling exchange |
Each channel is independent. A source on `/peer/counter` is received at `/stream/counter`.
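
The path mapping can be sketched as a small helper. This is an illustrative Python snippet, not part of DreamStack: the function name `receiver_url` is hypothetical, and it assumes the channel name is everything after the `/peer/` prefix.

```python
from urllib.parse import urlsplit, urlunsplit


def receiver_url(peer_url: str) -> str:
    """Map a source URL (/peer/{channel}) to the receiver URL (/stream/{channel})."""
    parts = urlsplit(peer_url)
    prefix = "/peer/"
    if not parts.path.startswith(prefix):
        raise ValueError(f"not a peer URL: {peer_url}")
    channel = parts.path[len(prefix):]
    # Keep scheme, host, and port; swap only the role segment of the path.
    return urlunsplit(
        (parts.scheme, parts.netloc, f"/stream/{channel}", parts.query, parts.fragment)
    )


print(receiver_url("ws://localhost:9100/peer/counter"))
```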
## The `select` Clause
Filter which signals a receiver cares about:
```
-- Receive everything (default)
let counter = stream from "ws://.../stream/counter"
-- Receive only specific fields
let counter = stream from "ws://.../stream/counter" {
  select: count, doubled
}
```
**What happens:**
1. **Client-side**: `_connectStream` filters incoming state down to the selected keys.
2. **Relay-side**: the receiver sends a `0x33 SubscribeFilter` frame, and the relay strips unselected fields from signal frames before forwarding them, which saves bandwidth.
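
Both halves of this behaviour amount to a key filter. The sketch below is a Python illustration under stated assumptions, not the runtime's or relay's code: the function names are hypothetical, and the filter payload is assumed to be strict JSON with a `select` array.

```python
import json


def subscribe_filter_payload(select):
    """Build the body of a 0x33 SubscribeFilter frame (assumed JSON shape)."""
    return json.dumps({"select": list(select)}).encode("utf-8")


def filter_state(state, select=None):
    """Keep only the selected keys; with no select, receive everything."""
    if select is None:
        return dict(state)
    wanted = set(select)
    return {key: value for key, value in state.items() if key in wanted}


incoming = {"count": 3, "doubled": 6, "message": "hello"}
print(filter_state(incoming, ["count", "doubled"]))
print(subscribe_filter_payload(["count", "doubled"]))
```

Applying the same filter on both sides means a receiver still sees only its selected keys even if the relay has not yet processed the filter frame.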
## `stream` Declaration
```
stream <name> on "<relay-url>" {
  mode: signal,           -- signal | pixel
  output: field1, field2  -- which signals to publish (private signals excluded)
}
```
| Key | Values | Description |
|-----|--------|-------------|
| `mode` | `signal` (default), `pixel` | Signal mode streams JSON state; pixel mode streams RGBA frames |
| `output` | comma-separated names | Only listed signals are sent; unlisted ones stay local |
**Private signals** — any signal not in `output` is never transmitted:
```
let count = 0 -- public (in output)
let doubled = count * 2 -- public (in output)
let message = "hello" -- private (not in output)
stream counter on "..." { output: count, doubled }
```
## `stream from` Expression
```
let <name> = stream from "<receiver-url>"
let <name> = stream from "<receiver-url>" { select: field1, field2 }
```
Returns a reactive signal proxy. Access fields with dot notation:
```
text "Count: {counter.count}"
let events = counter.count + stats.total
```
The proxy auto-updates when new frames arrive from the relay. All dependent derived signals and UI elements update reactively.
## Chained Composition
Apps can be stacked in layers. Each layer receives, derives, and re-streams:
```
Layer 0 (Sources):
  Counter ──┐
  Clock  ───┤
  Stats  ───┘
            └──► Layer 1 (Aggregator):
                   compose-metrics
                   derives: uptime, events, status
                   re-streams on /peer/metrics
                     └──► Layer 2 (Dashboard):
                            compose-master
                            receives: metrics + mood
**Layer 1** (`compose-metrics.ds`):
```
let counter = stream from ".../stream/counter"
let clock = stream from ".../stream/clock"
let stats = stream from ".../stream/stats"
let uptime = clock.seconds
let events = counter.count + stats.total
let status = "nominal"
stream metrics on ".../peer/metrics" {
  mode: signal,
  output: uptime, events, status
}
```
**Layer 2** (`compose-master.ds`):
```
let metrics = stream from ".../stream/metrics" { select: uptime, events }
let mood = stream from ".../stream/mood" { select: mood, energy }
```
## Bitstream Protocol
All frames share a 16-byte header:
```
┌──────┬───────┬──────┬───────────┬───────┬────────┬────────┐
│ type │ flags │ seq │ timestamp │ width │ height │ length │
│ u8 │ u8 │ u16 │ u32 │ u16 │ u16 │ u32 │
└──────┴───────┴──────┴───────────┴───────┴────────┴────────┘
```
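
To make the layout concrete, here is a minimal Python sketch that packs and unpacks the header with `struct`. The field order and widths come from the diagram. The byte order (little-endian) and the absence of padding are assumptions, as are the function names.

```python
import struct

# Assumption: little-endian, fields packed in diagram order with no padding.
# B = u8, H = u16, I = u32 -> 1 + 1 + 2 + 4 + 2 + 2 + 4 = 16 bytes.
HEADER = struct.Struct("<BBHIHHI")


def pack_header(frame_type, flags, seq, timestamp, width, height, length):
    """Serialize the seven header fields into 16 bytes."""
    return HEADER.pack(frame_type, flags, seq, timestamp, width, height, length)


def unpack_header(data):
    """Return (type, flags, seq, timestamp, width, height, length)."""
    return HEADER.unpack_from(data)


raw = pack_header(0x30, 0, 7, 123456, 0, 0, 24)
print(len(raw), unpack_header(raw))
```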
### Signal Frame Types
| Code | Name | Direction | Payload |
|------|------|-----------|---------|
| `0x30` | SignalSync | source → relay → receivers | Full JSON state `{ count: 3, doubled: 6 }` |
| `0x31` | SignalDiff | source → relay → receivers | Changed fields only `{ count: 4 }` |
| `0x32` | SchemaAnnounce | source → relay | `{ signals: ["count","doubled"], mode: "signal" }` |
| `0x33` | SubscribeFilter | receiver → relay | `{ select: ["count"] }` |
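
On the receiving side, the difference between the two state frames can be sketched as follows. This is an illustrative Python snippet, not the runtime's code: it assumes payloads are strict JSON objects, that a SignalSync replaces the local state, and that a SignalDiff overwrites only the keys it carries. `apply_signal_frame` is a hypothetical name.

```python
import json

SIGNAL_SYNC = 0x30
SIGNAL_DIFF = 0x31


def apply_signal_frame(state, frame_type, payload):
    """Return the receiver state after applying one signal frame."""
    data = json.loads(payload)
    if frame_type == SIGNAL_SYNC:
        # Full state: discard whatever was held before.
        return dict(data)
    if frame_type == SIGNAL_DIFF:
        # Changed fields only: overlay them on the existing state.
        merged = dict(state)
        merged.update(data)
        return merged
    # Other frame types do not carry signal state.
    return state


state = apply_signal_frame({}, SIGNAL_SYNC, b'{"count": 3, "doubled": 6}')
state = apply_signal_frame(state, SIGNAL_DIFF, b'{"count": 4}')
print(state)
```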
### Pixel Frame Types
| Code | Name | Payload |
|------|------|---------|
| `0x01` | Pixels | Raw RGBA (width × height × 4 bytes) |
| `0x03` | DeltaPixels | XOR delta, RLE-compressed |
| `0xF0` | Keyframe | Reset receiver state |
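
The XOR step behind `DeltaPixels` can be sketched in Python as below. This shows only the delta itself. The RLE compression is left out because its byte layout is not specified in this document. The function names are hypothetical.

```python
def xor_delta(previous: bytes, current: bytes) -> bytes:
    """XOR two equally sized RGBA buffers; unchanged bytes become 0."""
    if len(previous) != len(current):
        raise ValueError("frames must have the same size")
    return bytes(a ^ b for a, b in zip(previous, current))


def apply_delta(previous: bytes, delta: bytes) -> bytes:
    """Rebuild the current frame; XOR is its own inverse."""
    return xor_delta(previous, delta)


prev_frame = bytes([10, 20, 30, 255])
next_frame = bytes([10, 21, 30, 0])
delta = xor_delta(prev_frame, next_frame)
print(list(delta))
print(apply_delta(prev_frame, delta) == next_frame)
```

Because unchanged bytes turn into zeros, a mostly static frame yields long zero runs, which is what makes run-length encoding effective on the delta.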
### Flags
| Bit | Name | Meaning |
|-----|------|---------|
| 0 | `FLAG_INPUT` | Frame is input (receiver → source) |
| 1 | `FLAG_KEYFRAME` | Full state, no delta |
| 2 | `FLAG_COMPRESSED` | Payload is compressed |
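
The flag bits combine as an ordinary bitmask. A minimal Python sketch, assuming bit 0 is the least-significant bit of the `flags` byte (`describe_flags` is a hypothetical helper):

```python
FLAG_INPUT = 1 << 0       # frame is input (receiver -> source)
FLAG_KEYFRAME = 1 << 1    # full state, no delta
FLAG_COMPRESSED = 1 << 2  # payload is compressed

_NAMES = [
    (FLAG_INPUT, "FLAG_INPUT"),
    (FLAG_KEYFRAME, "FLAG_KEYFRAME"),
    (FLAG_COMPRESSED, "FLAG_COMPRESSED"),
]


def describe_flags(flags):
    """List the names of all flag bits set in the byte."""
    return [name for bit, name in _NAMES if flags & bit]


print(describe_flags(FLAG_KEYFRAME | FLAG_COMPRESSED))
```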
## Relay
```bash
cargo run -p ds-stream
```
The relay is a WebSocket router with the following responsibilities:
- Routes frames from sources/peers to receivers on the same channel
- **Diff merging**: Consolidates accumulated signal diffs into ONE catchup frame for late joiners
- Caches schema announcements (0x32) per channel
- Applies per-receiver signal filters (0x33) before forwarding
- Auto-creates channels on first connection
- Cleans up idle channels after a grace period
- **Source reconnect grace**: Preserves the channel cache during the reconnection window
## Runtime Features
### Diff Batching
Multiple signal changes in a single event loop tick are coalesced into one WebSocket frame:
```
-- Click fires 3 signal changes, but sends only 1 WS frame:
button "Happy" { click: mood = "happy"; color = "green"; energy += 10 }
```
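
The coalescing idea can be sketched in Python as a small batcher. This is an illustration only, not the runtime's implementation: the `DiffBatcher` class and its explicit `flush()` call (standing in for the end of an event-loop tick) are assumptions.

```python
import json


class DiffBatcher:
    """Collect signal changes and emit them as one diff payload per flush."""

    def __init__(self, send):
        self._send = send   # callable that transmits one frame payload
        self._pending = {}  # signal name -> latest value this tick

    def set(self, name, value):
        # Later writes to the same signal overwrite earlier ones.
        self._pending[name] = value

    def flush(self):
        # Called once per tick; sends nothing if no signal changed.
        if not self._pending:
            return
        self._send(json.dumps(self._pending))
        self._pending = {}


sent = []
batcher = DiffBatcher(sent.append)
batcher.set("mood", "happy")
batcher.set("color", "green")
batcher.set("energy", 10)
batcher.flush()
print(len(sent), sent[0])
```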
### Connection Status
The `stream from` proxy injects reactive connection metadata:
```
let counter = stream from "ws://localhost:9100/stream/counter"
-- Available fields:
-- counter._connected boolean (true when WS is open)
-- counter._latency RTT in milliseconds (ping/pong every 5s)
-- counter._frames total frames received
-- counter._reconnects reconnect attempt count
```
### RTT Tracking
Both sources and receivers measure round-trip latency via periodic ping/pong (every 5s).
The `_latency` field updates reactively on each measurement.
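
A minimal Python sketch of the measurement, assuming each ping carries an identifier that the matching pong echoes back. The wire format of pings is not described here, so `RttTracker` and its methods are hypothetical.

```python
class RttTracker:
    """Track round-trip latency from ping/pong timestamps (milliseconds)."""

    def __init__(self):
        self.latency_ms = None  # last measured RTT, None until the first pong
        self._sent_at = {}      # ping id -> send time in ms

    def ping(self, ping_id, now_ms):
        self._sent_at[ping_id] = now_ms

    def pong(self, ping_id, now_ms):
        sent = self._sent_at.pop(ping_id, None)
        if sent is None:
            # Unknown or duplicate pong: keep the previous measurement.
            return self.latency_ms
        self.latency_ms = now_ms - sent
        return self.latency_ms


tracker = RttTracker()
tracker.ping(1, 1000.0)
print(tracker.pong(1, 1042.0))
```

Passing the clock value in as an argument keeps the sketch deterministic. A real client would read a monotonic clock when sending and receiving.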
### Relay Diff Merging
Late-joining receivers receive a single consolidated sync frame instead of replaying
hundreds of individual diffs:
```
-- Before: receiver gets sync + 500 individual diffs
-- After: receiver gets 1 merged sync frame with all changes applied
```
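
The merge itself can be sketched as folding diffs over the last sync. This Python snippet is an illustration, not the relay's Rust code: `merge_catchup` is a hypothetical name, payloads are assumed to be flat JSON objects, and version counters are not modelled.

```python
def merge_catchup(sync, diffs):
    """Fold accumulated diffs over the last sync into one state.

    `sync` may be None when only diffs have been seen so far.
    """
    merged = dict(sync or {})
    for diff in diffs:
        # Later diffs win for keys that changed more than once.
        merged.update(diff)
    return merged


catchup = merge_catchup(
    {"count": 0, "doubled": 0},
    [{"count": 1}, {"doubled": 2}, {"count": 2, "doubled": 4}],
)
print(catchup)
```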
## Running the Demo
```bash
# Terminal 1: Start relay
cargo run -p ds-stream
# Terminal 2-5: Start source apps
dreamstack stream examples/streaming-counter.ds --port 3000
dreamstack stream examples/streaming-clock.ds --port 3002
dreamstack stream examples/streaming-stats.ds --port 3003
dreamstack stream examples/streaming-mood.ds --port 3004
# Terminal 6: Start aggregator (Layer 1)
dreamstack stream examples/compose-metrics.ds --port 3006
# Terminal 7: Build and serve dashboard
dreamstack build examples/streaming-dashboard.ds -o /tmp/build-dash
python3 -m http.server 3007 -d /tmp/build-dash
```
Open http://localhost:3007 to see all streams composited into one dashboard.
## Examples
| File | Role | Channel | Signals |
|------|------|---------|---------|
| `streaming-counter.ds` | Source | `/peer/counter` | count, doubled |
| `streaming-clock.ds` | Source | `/peer/clock` | hours, minutes, seconds |
| `streaming-stats.ds` | Source | `/peer/stats` | total, average, max |
| `streaming-mood.ds` | Source | `/peer/mood` | mood, energy, color |
| `streaming-dashboard.ds` | Receiver | 3 streams | Components + live data |
| `compose-dashboard.ds` | Receiver | 4 streams | Flat dashboard |
| `compose-metrics.ds` | Receiver+Source | 3 in, 1 out | uptime, events, status |
| `compose-master.ds` | Receiver | 2 streams | Chained dashboard |
| `timer-multi-action.ds` | Timer | standalone | ticks, status, elapsed |