# DreamStack Stream Composition

Compose independent apps through live signal streams. Each app is a node in a reactive dataflow graph — it can source signals, receive them, derive new ones, and re-stream the results.

## Core Concepts

**Source** — an app that publishes signals to the relay:
```
stream counter on "ws://localhost:9100/peer/counter" {
  mode: signal,
  output: count, doubled
}
```

**Receiver** — an app that subscribes to a stream:
```
let counter = stream from "ws://localhost:9100/stream/counter"
text "Count: {counter.count}"
```

**Composition** — an app that is both receiver AND source:
```
let counter = stream from ".../stream/counter"
let stats = stream from ".../stream/stats"
let events = counter.count + stats.total   -- derived from upstream
stream metrics on ".../peer/metrics" { output: events }
```

## URL Convention

| Role | URL Pattern | Description |
|------|------------|-------------|
| Source/Peer | `ws://relay/peer/{channel}` | Publishes signals to a named channel |
| Receiver | `ws://relay/stream/{channel}` | Subscribes to a named channel |
| Signaling | `ws://relay/signal/{channel}` | WebRTC signaling exchange |

Each channel is independent. A source on `/peer/counter` is received at `/stream/counter`.

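The peer-to-stream mapping above can be sketched as a small helper. This is an illustration only: `receiver_url` is a hypothetical name and not part of DreamStack; it simply rewrites the path prefix described in the table.

```python
from urllib.parse import urlsplit, urlunsplit


def receiver_url(peer_url: str) -> str:
    """Map ws://relay/peer/{channel} to ws://relay/stream/{channel}.

    Hypothetical helper for illustration; it relies only on the URL
    convention in the table above.
    """
    parts = urlsplit(peer_url)
    prefix = "/peer/"
    if not parts.path.startswith(prefix):
        raise ValueError(f"not a peer URL: {peer_url}")
    channel = parts.path[len(prefix):]
    return urlunsplit(parts._replace(path=f"/stream/{channel}"))


print(receiver_url("ws://localhost:9100/peer/counter"))
```

The channel name is the only part shared between the two URLs, which is why channels stay independent of one another.
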
## The `select` Clause

Filter which signals a receiver cares about:

```
-- Receive everything (default)
let counter = stream from "ws://.../stream/counter"

-- Receive only specific fields
let counter = stream from "ws://.../stream/counter" {
  select: count, doubled
}
```

**What happens:**
1. **Client-side** — `_connectStream` filters incoming state to only selected keys
2. **Relay-side** — Receiver sends a `0x33 SubscribeFilter` frame; relay strips unwanted fields from signal frames before forwarding (saving bandwidth)

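The two filtering steps can be sketched roughly as follows. This is not the runtime's or the relay's actual code: `filter_state` and `subscribe_filter_payload` are hypothetical names, and the payload shape is taken from the `0x33` example `{ select: ["count"] }` in the protocol tables, assuming it is UTF-8 encoded JSON.

```python
import json
from typing import Any, Dict, Iterable, Optional


def filter_state(state: Dict[str, Any], select: Optional[Iterable[str]]) -> Dict[str, Any]:
    """Keep only the selected keys; no select clause means keep everything."""
    if not select:
        return dict(state)
    wanted = set(select)
    return {key: value for key, value in state.items() if key in wanted}


def subscribe_filter_payload(select: Iterable[str]) -> bytes:
    """Build the JSON body of a SubscribeFilter frame (assumed UTF-8 JSON)."""
    return json.dumps({"select": list(select)}).encode("utf-8")


state = {"count": 3, "doubled": 6, "label": "demo"}
print(filter_state(state, ["count", "doubled"]))
print(subscribe_filter_payload(["count", "doubled"]))
```

Applying the same filter on both sides means a receiver never sees unselected keys, even if a relay forwards them unfiltered.
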
## `stream` Declaration

```
stream <name> on "<relay-url>" {
  mode: signal,            -- signal | pixel
  output: field1, field2   -- which signals to publish (private signals excluded)
}
```

| Key | Values | Description |
|-----|--------|-------------|
| `mode` | `signal` (default), `pixel` | Signal mode streams JSON state; pixel mode streams RGBA frames |
| `output` | comma-separated names | Only listed signals are sent; unlisted ones stay local |

**Private signals** — any signal not in `output` is never transmitted:
```
let count = 0            -- public (in output)
let doubled = count * 2  -- public (in output)
let message = "hello"    -- private (not in output)

stream counter on "..." { output: count, doubled }
```

## `stream from` Expression

```
let <name> = stream from "<receiver-url>"
let <name> = stream from "<receiver-url>" { select: field1, field2 }
```

Returns a reactive signal proxy. Access fields with dot notation:
```
text "Count: {counter.count}"
let events = counter.count + stats.total
```

The proxy auto-updates when new frames arrive from the relay. All dependent derived signals and UI elements update reactively.

## Chained Composition

Apps can be stacked in layers. Each layer receives, derives, and re-streams:

```
Layer 0 (Sources):
  Counter ──┐
  Clock   ──┤
  Stats   ──┘
            └──► Layer 1 (Aggregator):
                 compose-metrics
                 derives: uptime, events, status
                 re-streams on /peer/metrics
                       └──► Layer 2 (Dashboard):
                            compose-master
                            receives: metrics + mood
```

**Layer 1** — `compose-metrics.ds`:
```
let counter = stream from ".../stream/counter"
let clock = stream from ".../stream/clock"
let stats = stream from ".../stream/stats"

let uptime = clock.seconds
let events = counter.count + stats.total
let status = "nominal"

stream metrics on ".../peer/metrics" {
  mode: signal, output: uptime, events, status
}
```

**Layer 2** — `compose-master.ds`:
```
let metrics = stream from ".../stream/metrics" { select: uptime, events }
let mood = stream from ".../stream/mood" { select: mood, energy }
```

## Bitstream Protocol

All frames share a 16-byte header:

```
┌──────┬───────┬──────┬───────────┬───────┬────────┬────────┐
│ type │ flags │ seq  │ timestamp │ width │ height │ length │
│ u8   │ u8    │ u16  │ u32       │ u16   │ u16    │ u32    │
└──────┴───────┴──────┴───────────┴───────┴────────┴────────┘
```

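A minimal sketch of packing and unpacking this header with Python's `struct` module is shown here. Two things are assumptions, not facts from this document: the byte order (little-endian is used here) and that `length` is the payload size in bytes. The names `encode_frame` and `decode_frame` are hypothetical.

```python
import struct
from typing import NamedTuple, Tuple

# u8 type, u8 flags, u16 seq, u32 timestamp, u16 width, u16 height, u32 length
# "<" = little-endian without padding (ASSUMED byte order); 16 bytes total.
HEADER = struct.Struct("<BBHIHHI")


class FrameHeader(NamedTuple):
    frame_type: int
    flags: int
    seq: int
    timestamp: int
    width: int
    height: int
    length: int


def encode_frame(frame_type: int, payload: bytes, seq: int = 0, timestamp: int = 0,
                 flags: int = 0, width: int = 0, height: int = 0) -> bytes:
    """Prefix the payload with a 16-byte header; seq and timestamp wrap around."""
    header = HEADER.pack(frame_type, flags, seq & 0xFFFF, timestamp & 0xFFFFFFFF,
                         width, height, len(payload))
    return header + payload


def decode_frame(data: bytes) -> Tuple[FrameHeader, bytes]:
    """Split a frame into header and payload, rejecting truncated input."""
    if len(data) < HEADER.size:
        raise ValueError("frame shorter than header")
    header = FrameHeader(*HEADER.unpack_from(data))
    payload = data[HEADER.size:HEADER.size + header.length]
    if len(payload) != header.length:
        raise ValueError("truncated payload")
    return header, payload


frame = encode_frame(0x31, b'{"count": 4}', seq=7)
print(decode_frame(frame))
```

The field widths add up to 1 + 1 + 2 + 4 + 2 + 2 + 4 = 16 bytes, which matches the header size stated above.
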
### Signal Frame Types

| Code | Name | Direction | Payload |
|------|------|-----------|---------|
| `0x30` | SignalSync | source → relay → receivers | Full JSON state `{ count: 3, doubled: 6 }` |
| `0x31` | SignalDiff | source → relay → receivers | Changed fields only `{ count: 4 }` |
| `0x32` | SchemaAnnounce | source → relay | `{ signals: ["count","doubled"], mode: "signal" }` |
| `0x33` | SubscribeFilter | receiver → relay | `{ select: ["count"] }` |

### Pixel Frame Types

| Code | Name | Payload |
|------|------|---------|
| `0x01` | Pixels | Raw RGBA (width × height × 4 bytes) |
| `0x03` | DeltaPixels | XOR delta, RLE-compressed |
| `0xF0` | Keyframe | Reset receiver state |

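The XOR step behind `DeltaPixels` can be illustrated as below. This sketch covers only the XOR delta. The RLE layout is not specified in this document, so it is deliberately left out. `xor_delta` and `apply_delta` are hypothetical names.

```python
def xor_delta(prev: bytes, curr: bytes) -> bytes:
    """XOR two equally sized RGBA buffers; unchanged bytes become 0."""
    if len(prev) != len(curr):
        raise ValueError("frames must have the same size")
    return bytes(a ^ b for a, b in zip(prev, curr))


def apply_delta(prev: bytes, delta: bytes) -> bytes:
    """XOR is its own inverse, so applying the delta restores the new frame."""
    return xor_delta(prev, delta)


# Two 2x1 RGBA frames; only the red channel of the second pixel changes.
prev = bytes([10, 20, 30, 255, 40, 50, 60, 255])
curr = bytes([10, 20, 30, 255, 41, 50, 60, 255])

delta = xor_delta(prev, curr)
print(list(delta))
print(apply_delta(prev, delta) == curr)
```

Because unchanged pixels turn into runs of zero bytes, the delta is a good fit for run-length encoding before it goes on the wire.
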
### Flags

| Bit | Name | Meaning |
|-----|------|---------|
| 0 | `FLAG_INPUT` | Frame is input (receiver → source) |
| 1 | `FLAG_KEYFRAME` | Full state, no delta |
| 2 | `FLAG_COMPRESSED` | Payload is compressed |

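Reading the flags byte amounts to testing individual bits. The sketch below uses the bit positions from the table. The `FrameFlags` class and `describe` function are hypothetical names chosen for illustration.

```python
from enum import IntFlag
from typing import List


class FrameFlags(IntFlag):
    """Bit positions taken from the Flags table; the class name is hypothetical."""
    INPUT = 1 << 0       # bit 0: frame is input (receiver -> source)
    KEYFRAME = 1 << 1    # bit 1: full state, no delta
    COMPRESSED = 1 << 2  # bit 2: payload is compressed


def describe(flags_byte: int) -> List[str]:
    """Return the names of all known flags set in the header's flags byte."""
    flags = FrameFlags(flags_byte)
    return [flag.name for flag in FrameFlags if flag in flags]


print(describe(0b110))
print(FrameFlags.INPUT in FrameFlags(0b110))
```
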
## Relay

```bash
cargo run -p ds-stream
```

The relay is a WebSocket router with these responsibilities:
- Routes frames from sources/peers to receivers on the same channel
- **Diff merging**: Consolidates accumulated signal diffs into ONE catchup frame for late joiners
- Caches schema announcements (0x32) per channel
- Applies per-receiver signal filters (0x33) before forwarding
- Auto-creates channels on first connection
- Cleans up idle channels after a grace period
- Source reconnect grace: the cache is preserved during the reconnection window

## Runtime Features

### Diff Batching

Multiple signal changes in a single event loop tick are coalesced into one WebSocket frame:
```
-- Click fires 3 signal changes, but sends only 1 WS frame:
button "Happy" { click: mood = "happy"; color = "green"; energy += 10 }
```

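The coalescing idea can be sketched with `asyncio`: the first change in a tick schedules one flush, and later changes in the same tick only update the pending diff. This is an illustration of the technique, not the DreamStack runtime itself. `DiffBatcher` is a hypothetical name, and the `send` callback stands in for the WebSocket.

```python
import asyncio
import json
from typing import Any, Callable, Dict, List


class DiffBatcher:
    """Collect signal changes and emit one diff per event loop tick."""

    def __init__(self, send: Callable[[str], None]) -> None:
        self._send = send
        self._pending: Dict[str, Any] = {}
        self._scheduled = False

    def set(self, name: str, value: Any) -> None:
        self._pending[name] = value  # a later write to the same signal wins
        if not self._scheduled:
            self._scheduled = True
            asyncio.get_running_loop().call_soon(self._flush)

    def _flush(self) -> None:
        diff, self._pending = self._pending, {}
        self._scheduled = False
        if diff:
            self._send(json.dumps(diff))


async def main() -> List[str]:
    sent: List[str] = []
    batcher = DiffBatcher(sent.append)
    # Three changes in the same tick, as in the button example above.
    batcher.set("mood", "happy")
    batcher.set("color", "green")
    batcher.set("energy", 10)
    await asyncio.sleep(0)  # yield once so the scheduled flush can run
    return sent


sent = asyncio.run(main())
print(len(sent), sent)
```

Only one message is handed to `send`, and it contains all three fields.
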
### Connection Status

The `stream from` proxy injects reactive connection metadata:
```
let counter = stream from "ws://localhost:9100/stream/counter"

-- Available fields:
--   counter._connected   boolean (true when WS is open)
--   counter._latency     RTT in milliseconds (ping/pong every 5s)
--   counter._frames      total frames received
--   counter._reconnects  reconnect attempt count
```

### RTT Tracking

Both sources and receivers measure round-trip latency via periodic ping/pong (every 5s).
The `_latency` field updates reactively on each measurement.

### Relay Diff Merging

Late-joining receivers receive a single consolidated sync frame instead of replaying
hundreds of individual diffs:
```
-- Before: receiver gets sync + 500 individual diffs
-- After:  receiver gets 1 merged sync frame with all changes applied
```

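Conceptually, the merge folds the cached sync state and every later diff into one dictionary. The sketch below assumes diffs are flat maps of top-level fields, as in the `{ count: 4 }` example, and that a channel may have diffs but no sync yet. `merge_catchup` is a hypothetical name, and the relay's real logic may differ.

```python
from typing import Any, Dict, Iterable, Optional


def merge_catchup(sync: Optional[Dict[str, Any]],
                  diffs: Iterable[Dict[str, Any]]) -> Dict[str, Any]:
    """Fold a sync state and later diffs into one state for a late joiner.

    Diffs are applied in arrival order, so the newest value of each field wins.
    """
    merged = dict(sync) if sync is not None else {}
    for diff in diffs:
        merged.update(diff)
    return merged


sync = {"count": 3, "doubled": 6}
diffs = [{"count": 4}, {"doubled": 8}, {"count": 5}]
print(merge_catchup(sync, diffs))
print(merge_catchup(None, [{"count": 1}]))
```

A late joiner then needs one frame regardless of how many diffs accumulated, and the cost of catching up depends on the number of signals rather than on the history length.
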
## Running the Demo

```bash
# Terminal 1: Start relay
cargo run -p ds-stream

# Terminal 2-5: Start source apps
dreamstack stream examples/streaming-counter.ds --port 3000
dreamstack stream examples/streaming-clock.ds --port 3002
dreamstack stream examples/streaming-stats.ds --port 3003
dreamstack stream examples/streaming-mood.ds --port 3004

# Terminal 6: Start aggregator (Layer 1)
dreamstack stream examples/compose-metrics.ds --port 3006

# Terminal 7: Build and serve dashboard
dreamstack build examples/streaming-dashboard.ds -o /tmp/build-dash
python3 -m http.server 3007 -d /tmp/build-dash
```

Open http://localhost:3007 to see all streams composited into one dashboard.

## Examples

| File | Role | Channel | Signals |
|------|------|---------|---------|
| `streaming-counter.ds` | Source | `/peer/counter` | count, doubled |
| `streaming-clock.ds` | Source | `/peer/clock` | hours, minutes, seconds |
| `streaming-stats.ds` | Source | `/peer/stats` | total, average, max |
| `streaming-mood.ds` | Source | `/peer/mood` | mood, energy, color |
| `streaming-dashboard.ds` | Receiver | 3 streams | Components + live data |
| `compose-dashboard.ds` | Receiver | 4 streams | Flat dashboard |
| `compose-metrics.ds` | Receiver+Source | 3 in, 1 out | uptime, events, status |
| `compose-master.ds` | Receiver | 2 streams | Chained dashboard |
| `timer-multi-action.ds` | Timer | standalone | ticks, status, elapsed |