DreamStack Stream Composition
Compose independent apps through live signal streams. Each app is a node in a reactive dataflow graph — it can source signals, receive them, derive new ones, and re-stream the results.
Core Concepts
Source — an app that publishes signals to the relay:
stream counter on "ws://localhost:9100/peer/counter" {
mode: signal,
output: count, doubled
}
Receiver — an app that subscribes to a stream:
let counter = stream from "ws://localhost:9100/stream/counter"
text "Count: {counter.count}"
Composition — an app that is both receiver AND source:
let counter = stream from ".../stream/counter"
let stats = stream from ".../stream/stats"
let events = counter.count + stats.total -- derived from upstream
stream metrics on ".../peer/metrics" { output: events }
URL Convention
| Role | URL Pattern | Description |
|---|---|---|
| Source/Peer | ws://relay/peer/{channel} | Publishes signals to a named channel |
| Receiver | ws://relay/stream/{channel} | Subscribes to a named channel |
| Signaling | ws://relay/signal/{channel} | WebRTC signaling exchange |
Each channel is independent. A source on /peer/counter is received at /stream/counter.
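The mapping between the three URL roles can be sketched in a few lines of Python. This is only an illustration of the convention in the table; `channel_url` and `receiver_url_for` are hypothetical helper names, not part of DreamStack.

```python
from urllib.parse import urlsplit

# Path prefix for each role, taken from the URL convention table.
ROLE_PREFIX = {"source": "peer", "receiver": "stream", "signaling": "signal"}


def channel_url(relay: str, role: str, channel: str) -> str:
    """Build the WebSocket URL for a role on a named channel."""
    return f"ws://{relay}/{ROLE_PREFIX[role]}/{channel}"


def receiver_url_for(source_url: str) -> str:
    """Map a /peer/{channel} URL to the matching /stream/{channel} URL."""
    parts = urlsplit(source_url)
    prefix, channel = parts.path.strip("/").split("/", 1)
    if prefix != "peer":
        raise ValueError(f"not a source URL: {source_url}")
    return f"{parts.scheme}://{parts.netloc}/stream/{channel}"
```

For example, `receiver_url_for("ws://localhost:9100/peer/counter")` yields the `/stream/counter` URL on the same relay.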
The select Clause
Filter which signals a receiver cares about:
-- Receive everything (default)
let counter = stream from "ws://.../stream/counter"
-- Receive only specific fields
let counter = stream from "ws://.../stream/counter" {
select: count, doubled
}
What happens:
- Client-side: _connectStream filters incoming state to only the selected keys.
- Relay-side: the receiver sends a 0x33 SubscribeFilter frame; the relay strips unwanted fields from signal frames before forwarding, which saves bandwidth.
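Both halves of the filter come down to the same key selection. The Python sketch below shows the idea under stated assumptions: the payload shape follows the `{ select: [...] }` example listed for 0x33 in this document, and the function names are hypothetical rather than the actual _connectStream or relay code.

```python
import json


def subscribe_filter_payload(select):
    """JSON body a receiver might send in a 0x33 SubscribeFilter frame."""
    return json.dumps({"select": list(select)}).encode("utf-8")


def apply_select(state, select=None):
    """Keep only the selected keys; no select clause means receive everything."""
    if select is None:
        return dict(state)
    wanted = set(select)
    return {key: value for key, value in state.items() if key in wanted}
```

Applying `apply_select` on the relay saves bandwidth; applying it again on the client keeps the proxy clean even if the relay forwards extra fields.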
stream Declaration
stream <name> on "<relay-url>" {
mode: signal, -- signal | pixel
output: field1, field2 -- which signals to publish (private signals excluded)
}
| Key | Values | Description |
|---|---|---|
| mode | signal (default), pixel | Signal mode streams JSON state; pixel mode streams RGBA frames |
| output | comma-separated names | Only listed signals are sent; unlisted ones stay local |
Private signals — any signal not in output is never transmitted:
let count = 0 -- public (in output)
let doubled = count * 2 -- public (in output)
let message = "hello" -- private (not in output)
stream counter on "..." { output: count, doubled }
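On the source side, the output list acts as an allow-list. A rough Python sketch, assuming the schema payload shape shown for 0x32 in this document (`published_state` and `schema_announce` are hypothetical names):

```python
import json


def schema_announce(output, mode="signal"):
    """JSON body for a SchemaAnnounce frame listing the public signals."""
    return json.dumps({"signals": list(output), "mode": mode}).encode("utf-8")


def published_state(signals, output):
    """Return only the signals named in output; everything else stays local."""
    return {name: signals[name] for name in output}


# Mirrors the counter example: message is private and never leaves the app.
signals = {"count": 0, "doubled": 0, "message": "hello"}
frame_state = published_state(signals, ["count", "doubled"])
```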
stream from Expression
let <name> = stream from "<receiver-url>"
let <name> = stream from "<receiver-url>" { select: field1, field2 }
Returns a reactive signal proxy. Access fields with dot notation:
text "Count: {counter.count}"
let events = counter.count + stats.total
The proxy auto-updates when new frames arrive from the relay. All dependent derived signals and UI elements update reactively.
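The proxy behaviour can be modelled roughly as follows. This is a conceptual Python sketch, not the DreamStack runtime: `StreamProxy`, `on_change`, and `receive` are hypothetical names, and treating a field that has not arrived yet as 0 is an assumption made for the example.

```python
class StreamProxy:
    """Holds the latest state of one stream and notifies dependents on change."""

    def __init__(self):
        self._state = {}
        self._listeners = []

    def __getattr__(self, name):
        # Only called for names not found normally, i.e. signal fields.
        state = self.__dict__.get("_state", {})
        if name in state:
            return state[name]
        raise AttributeError(name)

    def on_change(self, callback):
        self._listeners.append(callback)

    def receive(self, fields):
        """Merge a frame's fields, then re-run every dependent."""
        self._state.update(fields)
        for callback in self._listeners:
            callback()


counter, stats = StreamProxy(), StreamProxy()
derived = {}


def recompute():
    # Equivalent of: let events = counter.count + stats.total
    derived["events"] = getattr(counter, "count", 0) + getattr(stats, "total", 0)


counter.on_change(recompute)
stats.on_change(recompute)
counter.receive({"count": 3})
stats.receive({"total": 10})
```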
Chained Composition
Apps can be stacked in layers. Each layer receives, derives, and re-streams:
Layer 0 (Sources):
  Counter ──┐
  Clock ────┤
  Stats ────┘
            └──► Layer 1 (Aggregator):
                   compose-metrics
                   derives: uptime, events, status
                   re-streams on /peer/metrics
                     └──► Layer 2 (Dashboard):
                            compose-master
                            receives: metrics + mood
Layer 1 — compose-metrics.ds:
let counter = stream from ".../stream/counter"
let clock = stream from ".../stream/clock"
let stats = stream from ".../stream/stats"
let uptime = clock.seconds
let events = counter.count + stats.total
let status = "nominal"
stream metrics on ".../peer/metrics" {
mode: signal, output: uptime, events, status
}
Layer 2 — compose-master.ds:
let metrics = stream from ".../stream/metrics" { select: uptime, events }
let mood = stream from ".../stream/mood" { select: mood, energy }
Bitstream Protocol
All frames share a 16-byte header:
┌──────┬───────┬──────┬───────────┬───────┬────────┬────────┐
│ type │ flags │ seq │ timestamp │ width │ height │ length │
│ u8 │ u8 │ u16 │ u32 │ u16 │ u16 │ u32 │
└──────┴───────┴──────┴───────────┴───────┴────────┴────────┘
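A minimal sketch of packing and parsing this header with Python's struct module. The field order and widths come from the diagram; the byte order is not stated there, so little-endian is an assumption, and `encode_frame` / `decode_frame` are hypothetical helper names.

```python
import struct
from typing import NamedTuple

# ASSUMPTION: little-endian with no padding. Only order and widths are documented.
# type u8, flags u8, seq u16, timestamp u32, width u16, height u16, length u32
HEADER = struct.Struct("<BBHIHHI")


class FrameHeader(NamedTuple):
    type: int
    flags: int
    seq: int
    timestamp: int
    width: int
    height: int
    length: int


def encode_frame(frame_type, payload, seq=0, timestamp=0, flags=0, width=0, height=0):
    """Prefix payload with a 16-byte header; length is the payload size."""
    header = HEADER.pack(frame_type, flags, seq & 0xFFFF,
                         timestamp & 0xFFFFFFFF, width, height, len(payload))
    return header + payload


def decode_frame(data):
    """Split a frame into its parsed header and payload bytes."""
    header = FrameHeader(*HEADER.unpack_from(data))
    body = data[HEADER.size:HEADER.size + header.length]
    if len(body) != header.length:
        raise ValueError("truncated payload")
    return header, body
```

The field widths sum to 1 + 1 + 2 + 4 + 2 + 2 + 4 = 16 bytes, which matches `HEADER.size`.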
Signal Frame Types
| Code | Name | Direction | Payload |
|---|---|---|---|
| 0x30 | SignalSync | source → relay → receivers | Full JSON state { count: 3, doubled: 6 } |
| 0x31 | SignalDiff | source → relay → receivers | Changed fields only { count: 4 } |
| 0x32 | SchemaAnnounce | source → relay | { signals: ["count","doubled"], mode: "signal" } |
| 0x33 | SubscribeFilter | receiver → relay | { select: ["count"] } |
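The difference between SignalSync and SignalDiff is easiest to see on the receiving side. A hedged Python sketch, assuming payloads are strict JSON objects (`apply_signal_frame` is a hypothetical name):

```python
import json

SIGNAL_SYNC = 0x30
SIGNAL_DIFF = 0x31


def apply_signal_frame(state, frame_type, payload):
    """Return the receiver state after one signal frame."""
    fields = json.loads(payload)
    if frame_type == SIGNAL_SYNC:
        # Full state: replace whatever the receiver had before.
        return dict(fields)
    if frame_type == SIGNAL_DIFF:
        # Changed fields only: merge over the existing state.
        return {**state, **fields}
    return state  # other frame types do not carry receiver state


state = apply_signal_frame({}, SIGNAL_SYNC, b'{"count": 3, "doubled": 6}')
state = apply_signal_frame(state, SIGNAL_DIFF, b'{"count": 4}')
```

After the diff, `doubled` keeps its last synced value until the source sends a new one.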
Pixel Frame Types
| Code | Name | Payload |
|---|---|---|
| 0x01 | Pixels | Raw RGBA (width × height × 4 bytes) |
| 0x03 | DeltaPixels | XOR delta, RLE-compressed |
| 0xF0 | Keyframe | Reset receiver state |
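To illustrate why an XOR delta compresses well, here is a small Python sketch. The XOR step follows the table; the RLE layout used here, (run length, byte value) pairs with runs capped at 255, is purely an assumption for the example and may differ from the actual DeltaPixels wire format.

```python
def xor_delta(prev, cur):
    """XOR two equally sized RGBA buffers; unchanged bytes become 0."""
    if len(prev) != len(cur):
        raise ValueError("frame size changed; a keyframe would be needed")
    return bytes(a ^ b for a, b in zip(prev, cur))


def rle_encode(data):
    """HYPOTHETICAL RLE: emit (run length, byte value) pairs, run <= 255."""
    out = bytearray()
    i = 0
    while i < len(data):
        run = 1
        while i + run < len(data) and run < 255 and data[i + run] == data[i]:
            run += 1
        out += bytes((run, data[i]))
        i += run
    return bytes(out)


def rle_decode(data):
    """Inverse of rle_encode."""
    out = bytearray()
    for i in range(0, len(data), 2):
        out += bytes([data[i + 1]]) * data[i]
    return bytes(out)


# A 2 x 2 RGBA frame where only the first pixel changes.
prev = bytes(16)
cur = b"\xff\x00\x00\xff" + bytes(12)
packed = rle_encode(xor_delta(prev, cur))
restored = xor_delta(prev, rle_decode(packed))
```

Because XOR is its own inverse, the receiver recovers the new frame by XOR-ing the decoded delta onto its previous frame.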
Flags
| Bit | Name | Meaning |
|---|---|---|
| 0 | FLAG_INPUT | Frame is input (receiver → source) |
| 1 | FLAG_KEYFRAME | Full state, no delta |
| 2 | FLAG_COMPRESSED | Payload is compressed |
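The flags byte can be decoded as a bit field. A short Python sketch, assuming bit 0 is the least significant bit of the byte (the table does not say so explicitly):

```python
from enum import IntFlag


class FrameFlags(IntFlag):
    # ASSUMPTION: bit N in the table maps to 1 << N.
    INPUT = 1 << 0
    KEYFRAME = 1 << 1
    COMPRESSED = 1 << 2


# A flags byte with bits 0 and 2 set: an input frame with a compressed payload.
flags = FrameFlags(0b101)
is_keyframe = FrameFlags.KEYFRAME in flags
```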
Relay
cargo run -p ds-stream
The relay is a WebSocket router that keeps only lightweight per-channel state:
- Routes frames from sources/peers to receivers on the same channel
- Caches latest state for late-joining receivers
- Caches schema announcements (0x32) per channel
- Applies per-receiver signal filters (0x33) before forwarding
- Auto-creates channels on first connection
- Cleans up idle channels after a grace period
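The routing rules above can be modelled without any networking. The following Python sketch is an in-memory illustration, not the ds-stream implementation: the `Relay` and `Channel` classes are hypothetical, idle cleanup is omitted, and skipping a delivery when a filter leaves nothing to send is an assumption.

```python
class Channel:
    def __init__(self):
        self.latest = {}     # cached state for late-joining receivers
        self.receivers = {}  # receiver id -> set of selected keys, or None


class Relay:
    def __init__(self):
        self.channels = {}

    def _channel(self, name):
        # Channels are created on first use.
        return self.channels.setdefault(name, Channel())

    @staticmethod
    def _filtered(fields, select):
        if select is None:
            return dict(fields)
        return {k: v for k, v in fields.items() if k in select}

    def subscribe(self, name, receiver_id, select=None):
        """Register a receiver and return the cached state it should see."""
        channel = self._channel(name)
        channel.receivers[receiver_id] = None if select is None else set(select)
        return self._filtered(channel.latest, channel.receivers[receiver_id])

    def publish(self, name, fields):
        """Cache the update and return what each receiver is sent."""
        channel = self._channel(name)
        channel.latest.update(fields)
        deliveries = {}
        for receiver_id, select in channel.receivers.items():
            out = self._filtered(fields, select)
            if out:  # ASSUMPTION: nothing is sent if the filter removes every field
                deliveries[receiver_id] = out
        return deliveries
```

A receiver that subscribes after the source has already published still gets the cached state from `subscribe`, which is the late-joiner behaviour described in the list.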
Running the Demo
# Terminal 1: Start relay
cargo run -p ds-stream
# Terminal 2-5: Start source apps
dreamstack stream examples/streaming-counter.ds --port 3000
dreamstack stream examples/streaming-clock.ds --port 3002
dreamstack stream examples/streaming-stats.ds --port 3003
dreamstack stream examples/streaming-mood.ds --port 3004
# Terminal 6: Start aggregator (Layer 1)
dreamstack stream examples/compose-metrics.ds --port 3006
# Terminal 7: Build and serve master dashboard (Layer 2)
dreamstack build examples/compose-master.ds -o /tmp/build-master
python3 -m http.server 3007 -d /tmp/build-master
Open http://localhost:3007 to see all streams composited into one dashboard.
Examples
| File | Role | Channel | Signals |
|---|---|---|---|
| streaming-counter.ds | Source | /peer/counter | count, doubled |
| streaming-clock.ds | Source | /peer/clock | hours, minutes, seconds |
| streaming-stats.ds | Source | /peer/stats | total, average, max |
| streaming-mood.ds | Source | /peer/mood | mood, energy, color |
| compose-dashboard.ds | Receiver | 4 streams | Flat dashboard |
| compose-metrics.ds | Receiver+Source | 3 in, 1 out | uptime, events, status |
| compose-master.ds | Receiver | 2 streams | Chained dashboard |