feat(ds-stream): RLE compression, input events, keyframe caching
- protocol.rs: TouchEvent, GamepadAxisEvent, GamepadButtonEvent, ResizeEvent with encode/decode and roundtrip tests - codec.rs: rle_encode/rle_decode for delta frame compression (333x on unchanged frames), signal_sync_frame/signal_diff_frame builders, touch/gamepad/stream_end convenience builders - relay.rs: StateCache for late-joining receivers (keyframe + signal sync + accumulated diffs), periodic keepalive pings, stats logging every 30s, diff accumulator with cap at 1000 - BITSTREAM_INTEGRATION.md: compiler integration spec for another agent - Tests: 17 → 38 (all passing)
This commit is contained in:
parent
ea64617569
commit
69f39746af
4 changed files with 1547 additions and 77 deletions
875
BITSTREAM_INTEGRATION.md
Normal file
875
BITSTREAM_INTEGRATION.md
Normal file
|
|
@ -0,0 +1,875 @@
|
|||
# Bitstream Integration Spec
|
||||
|
||||
> **Purpose**: This document specifies all changes needed to make DreamStack natively bitstream-aware. A `.ds` file should compile into a fully streamable app with one keyword. Follow each section in order — they build on each other.
|
||||
|
||||
## Background
|
||||
|
||||
The `engine/ds-stream/` crate already implements:
|
||||
- A 16-byte binary protocol (see `src/protocol.rs`)
|
||||
- WebSocket relay server (see `src/relay.rs`)
|
||||
- Encode/decode with delta compression (see `src/codec.rs`)
|
||||
- Two working HTML demos (`examples/stream-source.html`, `examples/stream-receiver.html`)
|
||||
|
||||
The compiler pipeline is: `.ds` → Lexer → Parser → AST → Analyzer (signal graph) → Type Checker → Codegen (JS+HTML).
|
||||
|
||||
**What exists already but is not wired up:**
|
||||
- Lexer: `stream` keyword → `TokenKind::Stream` (line 37 of `lexer.rs`)
|
||||
- AST: `Expr::StreamFrom(String)` (line 143 of `ast.rs`)
|
||||
- Parser: parses `stream from source_name` → `StreamFrom` (line 616 of `parser.rs`)
|
||||
- Type checker: returns `Type::Unknown` for `StreamFrom` (line 355 of `checker.rs`)
|
||||
- Codegen: **ignores `StreamFrom` entirely** — never emits any JS for it
|
||||
|
||||
---
|
||||
|
||||
## Change 1: AST — New Types
|
||||
|
||||
**File**: `compiler/ds-parser/src/ast.rs`
|
||||
|
||||
### Add `StreamDecl` to `Declaration` enum
|
||||
|
||||
```rust
|
||||
pub enum Declaration {
|
||||
Let(LetDecl),
|
||||
View(ViewDecl),
|
||||
Effect(EffectDecl),
|
||||
OnHandler(OnHandler),
|
||||
Component(ComponentDecl),
|
||||
Route(RouteDecl),
|
||||
Constrain(ConstrainDecl),
|
||||
Stream(StreamDecl), // ← NEW
|
||||
}
|
||||
```
|
||||
|
||||
### Add `StreamDecl` struct
|
||||
|
||||
```rust
|
||||
/// `stream main on "ws://localhost:9100" { mode: signal }`
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct StreamDecl {
|
||||
pub view_name: String, // which view to stream
|
||||
pub relay_url: Expr, // the WebSocket URL (an expression, usually a string lit)
|
||||
pub mode: StreamMode, // streaming mode
|
||||
pub span: Span,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq)]
|
||||
pub enum StreamMode {
|
||||
Pixel, // raw RGBA framebuffer every frame
|
||||
Delta, // XOR + RLE — only changed pixels
|
||||
Signal, // JSON signal diffs (DreamStack-native, ~2 KB/s)
|
||||
}
|
||||
|
||||
impl Default for StreamMode {
|
||||
fn default() -> Self { StreamMode::Signal }
|
||||
}
|
||||
```
|
||||
|
||||
### Modify `StreamFrom`
|
||||
|
||||
```rust
|
||||
// Replace:
|
||||
StreamFrom(String),
|
||||
// With:
|
||||
StreamFrom {
|
||||
source: String,
|
||||
mode: Option<StreamMode>,
|
||||
},
|
||||
```
|
||||
|
||||
Update all match arms in `parser.rs`, `signal_graph.rs`, `checker.rs`, and `js_emitter.rs` that reference `StreamFrom`.
|
||||
|
||||
---
|
||||
|
||||
## Change 2: Lexer — New Keywords
|
||||
|
||||
**File**: `compiler/ds-parser/src/lexer.rs`
|
||||
|
||||
In the `lex_ident_or_keyword` function, add to the keyword match (around line 306):
|
||||
|
||||
```rust
|
||||
"stream" => TokenKind::Stream, // already exists
|
||||
"pixel" => TokenKind::Pixel, // NEW
|
||||
"delta" => TokenKind::Delta, // NEW
|
||||
"signals" => TokenKind::Signals, // NEW (note: plural to avoid clash with signal())
|
||||
```
|
||||
|
||||
Add these to the `TokenKind` enum:
|
||||
|
||||
```rust
|
||||
pub enum TokenKind {
|
||||
// ... existing tokens ...
|
||||
Stream, // already exists
|
||||
Pixel, // NEW
|
||||
Delta, // NEW
|
||||
Signals, // NEW
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Change 3: Parser — Parse Stream Declarations
|
||||
|
||||
**File**: `compiler/ds-parser/src/parser.rs`
|
||||
|
||||
### In `parse_declaration` (around line 91)
|
||||
|
||||
Add a new arm for the `stream` keyword:
|
||||
|
||||
```rust
|
||||
TokenKind::Stream => self.parse_stream_decl(),
|
||||
```
|
||||
|
||||
### New method: `parse_stream_decl`
|
||||
|
||||
```rust
|
||||
/// Parse: `stream <view_name> on <url_expr>`
|
||||
/// Optional: `{ mode: pixel | delta | signal }`
|
||||
fn parse_stream_decl(&mut self) -> Result<Declaration, ParseError> {
|
||||
let span = self.current_token().span;
|
||||
self.advance(); // consume `stream`
|
||||
|
||||
// Check if this is `stream from` (existing syntax) or `stream <name> on`
|
||||
if self.check(&TokenKind::Ident("from".into())) {
|
||||
// Existing stream-from expression — delegate to expression parser
|
||||
// Actually, this is a declaration context, so handle it differently:
|
||||
return self.parse_stream_from_as_decl();
|
||||
}
|
||||
|
||||
let view_name = self.expect_ident()?;
|
||||
|
||||
// Expect `on`
|
||||
match self.peek() {
|
||||
TokenKind::Ident(s) if s == "on" => { self.advance(); }
|
||||
_ => return Err(self.error("Expected 'on' after stream view name".into())),
|
||||
}
|
||||
|
||||
let relay_url = self.parse_expr()?;
|
||||
|
||||
// Optional mode block: `{ mode: signal }`
|
||||
let mode = if self.check(&TokenKind::LBrace) {
|
||||
self.advance(); // {
|
||||
let mut mode = StreamMode::Signal;
|
||||
// Parse key-value pairs
|
||||
while !self.check(&TokenKind::RBrace) && !self.is_at_end() {
|
||||
self.skip_newlines();
|
||||
let key = self.expect_ident()?;
|
||||
self.expect(&TokenKind::Colon)?;
|
||||
if key == "mode" {
|
||||
match self.peek() {
|
||||
TokenKind::Pixel => { mode = StreamMode::Pixel; self.advance(); }
|
||||
TokenKind::Delta => { mode = StreamMode::Delta; self.advance(); }
|
||||
TokenKind::Signals => { mode = StreamMode::Signal; self.advance(); }
|
||||
TokenKind::Ident(s) if s == "signal" => { mode = StreamMode::Signal; self.advance(); }
|
||||
_ => return Err(self.error("Expected pixel, delta, or signal".into())),
|
||||
}
|
||||
}
|
||||
// Skip comma if present
|
||||
if self.check(&TokenKind::Comma) { self.advance(); }
|
||||
self.skip_newlines();
|
||||
}
|
||||
self.expect(&TokenKind::RBrace)?;
|
||||
mode
|
||||
} else {
|
||||
StreamMode::Signal // default
|
||||
};
|
||||
|
||||
Ok(Declaration::Stream(StreamDecl { view_name, relay_url, mode, span }))
|
||||
}
|
||||
```
|
||||
|
||||
### Update `parse_primary` for `StreamFrom`
|
||||
|
||||
Around line 610-616, update the existing `stream from` parsing:
|
||||
|
||||
```rust
|
||||
TokenKind::Stream => {
|
||||
self.advance(); // consume `stream`
|
||||
// Expect `from`
|
||||
match self.peek() {
|
||||
TokenKind::Ident(s) if s == "from" => { self.advance(); }
|
||||
_ => return Err(self.error("Expected 'from' after stream".into())),
|
||||
}
|
||||
let source = self.parse_expr()?;
|
||||
let source_str = match &source {
|
||||
Expr::StringLit(s) => s.segments.iter().map(|seg| match seg {
|
||||
StringSegment::Literal(l) => l.clone(),
|
||||
_ => String::new(),
|
||||
}).collect(),
|
||||
Expr::Ident(name) => name.clone(),
|
||||
_ => return Err(self.error("Stream source must be a string or identifier".into())),
|
||||
};
|
||||
Ok(Expr::StreamFrom { source: source_str, mode: None })
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Change 4: Signal Graph — Stream Awareness
|
||||
|
||||
**File**: `compiler/ds-analyzer/src/signal_graph.rs`
|
||||
|
||||
### Add `streamable` flag to `SignalNode`
|
||||
|
||||
```rust
|
||||
pub struct SignalNode {
|
||||
pub name: String,
|
||||
pub kind: SignalKind,
|
||||
pub initial: Option<InitialValue>,
|
||||
pub deps: Vec<Dependency>,
|
||||
pub streamable: bool, // NEW: should this be sent to receivers?
|
||||
}
|
||||
```
|
||||
|
||||
Default `streamable` to `true` for all `Source` signals when a `StreamDecl` is present.
|
||||
|
||||
### New: `SignalManifest`
|
||||
|
||||
```rust
|
||||
/// Static description of all signals for receiver reconstruction.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct SignalManifest {
|
||||
pub signals: Vec<ManifestEntry>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ManifestEntry {
|
||||
pub name: String,
|
||||
pub kind: SignalKind,
|
||||
pub initial: Option<InitialValue>,
|
||||
pub is_spring: bool,
|
||||
}
|
||||
|
||||
impl SignalGraph {
|
||||
/// Generate a manifest for receivers to know how to reconstruct the signal state.
|
||||
pub fn signal_manifest(&self) -> SignalManifest {
|
||||
SignalManifest {
|
||||
signals: self.nodes.iter()
|
||||
.filter(|n| n.streamable)
|
||||
.map(|n| ManifestEntry {
|
||||
name: n.name.clone(),
|
||||
kind: n.kind.clone(),
|
||||
initial: n.initial.clone(),
|
||||
is_spring: false, // detect from AST if the let decl uses spring()
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### Detect `StreamDecl` in `from_program`
|
||||
|
||||
In `from_program` (line 97), after processing all declarations, check if any `Declaration::Stream` exists. If so, mark all source signals as `streamable = true`.
|
||||
|
||||
---
|
||||
|
||||
## Change 5: Type Checker
|
||||
|
||||
**File**: `compiler/ds-types/src/checker.rs`
|
||||
|
||||
At line 355, replace:
|
||||
|
||||
```rust
|
||||
Expr::StreamFrom(_source) => {
|
||||
// TODO
|
||||
Ok(Type::Unknown)
|
||||
}
|
||||
```
|
||||
|
||||
With:
|
||||
|
||||
```rust
|
||||
Expr::StreamFrom { source, .. } => {
|
||||
// A stream from a remote source produces a reactive signal state
|
||||
Ok(Type::Named("StreamState".into()))
|
||||
}
|
||||
```
|
||||
|
||||
Also add handling for `Declaration::Stream` in the declaration checker — it should verify that the view name exists and that the URL is a string expression.
|
||||
|
||||
---
|
||||
|
||||
## Change 6: Codegen — the big one
|
||||
|
||||
**File**: `compiler/ds-codegen/src/js_emitter.rs`
|
||||
|
||||
### 6a. Handle `Declaration::Stream` in `emit_program`
|
||||
|
||||
In `emit_program` (line 54), where declarations are iterated, add:
|
||||
|
||||
```rust
|
||||
Declaration::Stream(stream_decl) => {
|
||||
self.emit_stream_init(stream_decl, graph);
|
||||
}
|
||||
```
|
||||
|
||||
### 6b. New method: `emit_stream_init`
|
||||
|
||||
```rust
|
||||
fn emit_stream_init(&mut self, decl: &StreamDecl, graph: &SignalGraph) {
|
||||
let url = self.emit_expr(&decl.relay_url);
|
||||
let mode = match decl.mode {
|
||||
StreamMode::Pixel => "pixel",
|
||||
StreamMode::Delta => "delta",
|
||||
StreamMode::Signal => "signal",
|
||||
};
|
||||
self.emit_line(&format!("DS._initStream({}, '{}');", url, mode));
|
||||
}
|
||||
```
|
||||
|
||||
### 6c. Wrap signal mutations with stream diffs
|
||||
|
||||
In the code that emits signal assignments (search for `emit_event_handler_expr`, around line 655), after emitting `signal.value = expr`, also emit:
|
||||
|
||||
```javascript
|
||||
DS._streamDiff("signalName", signal.value);
|
||||
```
|
||||
|
||||
Find every place where the codegen writes `.value = ` for a signal assignment. After each one, add a `DS._streamDiff(name, newValue)` call if the signal is streamable.
|
||||
|
||||
### 6d. Handle `Expr::StreamFrom` in `emit_expr`
|
||||
|
||||
In `emit_expr` (line 565), `StreamFrom` is currently not matched. Add:
|
||||
|
||||
```rust
|
||||
Expr::StreamFrom { source, .. } => {
|
||||
format!("DS._connectStream({})", source)
|
||||
}
|
||||
```
|
||||
|
||||
### 6e. Scene streaming hook
|
||||
|
||||
In `emit_scene` (starting line 733), inside the `_sceneLoop` function that's emitted (around line 925), add after `_sceneDraw()`:
|
||||
|
||||
```javascript
|
||||
if (DS._streamWs) DS._streamSceneState(_world, _sceneW, _sceneH);
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Change 7: JS Runtime — Streaming Layer
|
||||
|
||||
**File**: `compiler/ds-codegen/src/js_emitter.rs`, inside the `RUNTIME_JS` const (starting at line 1125)
|
||||
|
||||
Add the following before the closing `return { ... }` statement (line 1553). Also add `_initStream`, `_streamDiff`, `_connectStream`, `_streamSceneState` to the returned object.
|
||||
|
||||
```javascript
|
||||
// ── Bitstream Streaming ──
|
||||
const HEADER_SIZE = 16;
|
||||
let _streamWs = null;
|
||||
let _streamMode = 'signal';
|
||||
let _streamSeq = 0;
|
||||
let _streamStart = 0;
|
||||
let _prevFrame = null;
|
||||
|
||||
function _initStream(url, mode) {
|
||||
_streamMode = mode || 'signal';
|
||||
_streamStart = performance.now();
|
||||
_streamWs = new WebSocket(url);
|
||||
_streamWs.binaryType = 'arraybuffer';
|
||||
_streamWs.onmessage = function(e) {
|
||||
if (!(e.data instanceof ArrayBuffer) || e.data.byteLength < HEADER_SIZE) return;
|
||||
const bytes = new Uint8Array(e.data);
|
||||
const view = new DataView(bytes.buffer);
|
||||
const type = view.getUint8(0);
|
||||
const flags = view.getUint8(1);
|
||||
if (flags & 0x01) _handleRemoteInput(type, bytes.subarray(HEADER_SIZE));
|
||||
};
|
||||
_streamWs.onclose = function() { setTimeout(function() { _initStream(url, mode); }, 2000); };
|
||||
console.log('[ds-stream] Source connected:', url, 'mode:', mode);
|
||||
}
|
||||
|
||||
function _streamSend(type, flags, payload) {
|
||||
if (!_streamWs || _streamWs.readyState !== 1) return;
|
||||
const ts = (performance.now() - _streamStart) | 0;
|
||||
const msg = new Uint8Array(HEADER_SIZE + payload.length);
|
||||
const v = new DataView(msg.buffer);
|
||||
v.setUint8(0, type);
|
||||
v.setUint8(1, flags);
|
||||
v.setUint16(2, (_streamSeq++) & 0xFFFF, true);
|
||||
v.setUint32(4, ts, true);
|
||||
v.setUint32(12, payload.length, true);
|
||||
msg.set(payload, HEADER_SIZE);
|
||||
_streamWs.send(msg.buffer);
|
||||
}
|
||||
|
||||
// Signal diff: send changed signal values as JSON
|
||||
function _streamDiff(name, value) {
|
||||
if (!_streamWs || _streamMode !== 'signal') return;
|
||||
const json = JSON.stringify({ [name]: typeof value === 'object' && 'value' in value ? value.value : value });
|
||||
_streamSend(0x31, 0, new TextEncoder().encode(json));
|
||||
}
|
||||
|
||||
// Full signal sync: send all signal values
|
||||
function _streamSync(signals) {
|
||||
const state = {};
|
||||
for (const [name, sig] of Object.entries(signals)) {
|
||||
state[name] = typeof sig === 'object' && '_value' in sig ? sig._value : sig;
|
||||
}
|
||||
_streamSend(0x30, 0x02, new TextEncoder().encode(JSON.stringify(state)));
|
||||
}
|
||||
|
||||
// Scene streaming: send body positions as binary or capture canvas pixels
|
||||
function _streamSceneState(world, w, h) {
|
||||
if (_streamMode === 'signal') {
|
||||
// Send body positions as JSON signal state
|
||||
const bodies = [];
|
||||
for (let b = 0; b < world.body_count(); b++) {
|
||||
const p = world.get_body_center(b);
|
||||
bodies.push({ x: p[0] | 0, y: p[1] | 0 });
|
||||
}
|
||||
_streamSend(0x31, 0, new TextEncoder().encode(JSON.stringify({ _bodies: bodies })));
|
||||
}
|
||||
// Pixel/delta modes are handled by canvas capture in the scene wrapper
|
||||
}
|
||||
|
||||
// Handle input events from receivers
|
||||
function _handleRemoteInput(type, payload) {
|
||||
if (payload.length < 4) return;
|
||||
const view = new DataView(payload.buffer, payload.byteOffset);
|
||||
switch (type) {
|
||||
case 0x01: // Pointer
|
||||
case 0x02: // PointerDown
|
||||
emit('remote_pointer', {
|
||||
x: view.getUint16(0, true),
|
||||
y: view.getUint16(2, true),
|
||||
buttons: payload.length > 4 ? view.getUint8(4) : 0,
|
||||
type: type === 0x02 ? 'down' : 'move'
|
||||
});
|
||||
break;
|
||||
case 0x03: // PointerUp
|
||||
emit('remote_pointer', { x: 0, y: 0, buttons: 0, type: 'up' });
|
||||
break;
|
||||
case 0x10: // KeyDown
|
||||
emit('remote_key', {
|
||||
keyCode: view.getUint16(0, true),
|
||||
type: 'down'
|
||||
});
|
||||
break;
|
||||
case 0x11: // KeyUp
|
||||
emit('remote_key', {
|
||||
keyCode: view.getUint16(0, true),
|
||||
type: 'up'
|
||||
});
|
||||
break;
|
||||
case 0x50: // Scroll
|
||||
emit('remote_scroll', {
|
||||
dx: view.getInt16(0, true),
|
||||
dy: view.getInt16(2, true)
|
||||
});
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Connect as a receiver
|
||||
function _connectStream(url) {
|
||||
const state = new Signal(null);
|
||||
const ws = new WebSocket(url);
|
||||
ws.binaryType = 'arraybuffer';
|
||||
ws.onmessage = function(e) {
|
||||
if (!(e.data instanceof ArrayBuffer) || e.data.byteLength < HEADER_SIZE) return;
|
||||
const bytes = new Uint8Array(e.data);
|
||||
const view = new DataView(bytes.buffer);
|
||||
const type = view.getUint8(0);
|
||||
const payloadLen = view.getUint32(12, true);
|
||||
const payload = bytes.subarray(HEADER_SIZE, HEADER_SIZE + payloadLen);
|
||||
if (type === 0x30 || type === 0x31) {
|
||||
try {
|
||||
const newState = JSON.parse(new TextDecoder().decode(payload));
|
||||
state.value = Object.assign(state._value || {}, newState);
|
||||
} catch(e) {}
|
||||
}
|
||||
};
|
||||
ws.onclose = function() { setTimeout(function() { _connectStream(url); }, 2000); };
|
||||
return state;
|
||||
}
|
||||
```
|
||||
|
||||
Then update the return statement (line 1553):
|
||||
|
||||
```javascript
|
||||
return { signal, derived, effect, batch, flush, onEvent, emit,
|
||||
keyedList, route: _route, navigate, matchRoute,
|
||||
resource, fetchJSON,
|
||||
spring, constrain, viewport: _viewport,
|
||||
scene, circle, rect, line,
|
||||
_initStream, _streamDiff, _streamSync, _streamSceneState, _connectStream, // NEW
|
||||
Signal, Derived, Effect, Spring };
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Change 8: CLI — Stream Command
|
||||
|
||||
**File**: `compiler/ds-cli/src/main.rs`
|
||||
|
||||
### Add to the `Commands` enum (around line 21)
|
||||
|
||||
```rust
|
||||
/// Compile and start streaming a .ds file
|
||||
Stream {
|
||||
/// Input .ds file
|
||||
file: PathBuf,
|
||||
/// Relay WebSocket URL
|
||||
#[arg(short, long, default_value = "ws://localhost:9100")]
|
||||
relay: String,
|
||||
/// Streaming mode (pixel, delta, signal)
|
||||
#[arg(short, long, default_value = "signal")]
|
||||
mode: String,
|
||||
/// Port to serve source on
|
||||
#[arg(short, long, default_value_t = 3000)]
|
||||
port: u16,
|
||||
}
|
||||
```
|
||||
|
||||
### Add to the main match (around line 50)
|
||||
|
||||
```rust
|
||||
Commands::Stream { file, relay, mode, port } => {
|
||||
cmd_stream(&file, &relay, &mode, port);
|
||||
}
|
||||
```
|
||||
|
||||
### New function: `cmd_stream`
|
||||
|
||||
```rust
|
||||
fn cmd_stream(file: &Path, relay: &str, mode: &str, port: u16) {
|
||||
// 1. Read the source .ds file
|
||||
let source = std::fs::read_to_string(file).unwrap_or_else(|e| {
|
||||
eprintln!("Error reading {}: {}", file.display(), e);
|
||||
std::process::exit(1);
|
||||
});
|
||||
|
||||
// 2. Inject a StreamDecl if not already present
|
||||
// Prepend: stream main on "{relay}" { mode: {mode} }
|
||||
let augmented = if source.contains("stream ") {
|
||||
source
|
||||
} else {
|
||||
format!("stream main on \"{}\" {{ mode: {} }}\n{}", relay, mode, source)
|
||||
};
|
||||
|
||||
// 3. Compile
|
||||
let html = match compile(&augmented) {
|
||||
Ok(html) => html,
|
||||
Err(e) => { eprintln!("Compile error: {}", e); std::process::exit(1); }
|
||||
};
|
||||
|
||||
// 4. Serve like `dev` mode, but also print stream info
|
||||
println!("╔══════════════════════════════════════════════════╗");
|
||||
println!("║ DreamStack Stream ║");
|
||||
println!("║ ║");
|
||||
println!("║ Source: http://localhost:{} ║", port);
|
||||
println!("║ Relay: {} ║", relay);
|
||||
println!("║ Mode: {} ║", mode);
|
||||
println!("╚══════════════════════════════════════════════════╝");
|
||||
|
||||
// Start HTTP server serving the compiled HTML (reuse dev server logic)
|
||||
// ...
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Change 9: Layout Serialization
|
||||
|
||||
**File**: `compiler/ds-layout/src/solver.rs`
|
||||
|
||||
Add serialization to `LayoutRect`:
|
||||
|
||||
```rust
|
||||
impl LayoutRect {
|
||||
/// Serialize to 16 bytes: x(f32) + y(f32) + w(f32) + h(f32)
|
||||
pub fn to_bytes(&self) -> [u8; 16] {
|
||||
let mut buf = [0u8; 16];
|
||||
buf[0..4].copy_from_slice(&(self.x as f32).to_le_bytes());
|
||||
buf[4..8].copy_from_slice(&(self.y as f32).to_le_bytes());
|
||||
buf[8..12].copy_from_slice(&(self.width as f32).to_le_bytes());
|
||||
buf[12..16].copy_from_slice(&(self.height as f32).to_le_bytes());
|
||||
buf
|
||||
}
|
||||
|
||||
pub fn from_bytes(buf: &[u8; 16]) -> Self {
|
||||
Self {
|
||||
x: f32::from_le_bytes(buf[0..4].try_into().unwrap()) as f64,
|
||||
y: f32::from_le_bytes(buf[4..8].try_into().unwrap()) as f64,
|
||||
width: f32::from_le_bytes(buf[8..12].try_into().unwrap()) as f64,
|
||||
height: f32::from_le_bytes(buf[12..16].try_into().unwrap()) as f64,
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Execution Order
|
||||
|
||||
Implement in this order to keep the build passing at each step:
|
||||
|
||||
1. **AST + Lexer** (Changes 1-2) — add types, update all match arms with `todo!()` or pass-through
|
||||
2. **Parser** (Change 3) — parse the new syntax
|
||||
3. **Type checker** (Change 5) — handle new types
|
||||
4. **Signal graph** (Change 4) — add streamable flag + manifest
|
||||
5. **Runtime JS** (Change 7) — add streaming functions to `RUNTIME_JS`
|
||||
6. **Codegen** (Change 6) — emit stream init and signal diff calls
|
||||
7. **CLI** (Change 8) — add `stream` command
|
||||
8. **Layout** (Change 9) — add serialization
|
||||
|
||||
After each step, run `cargo test --workspace` and fix any compile errors.
|
||||
|
||||
## Verification
|
||||
|
||||
### After all changes:
|
||||
|
||||
```bash
|
||||
# Build + test
|
||||
cargo test --workspace
|
||||
|
||||
# Create a test file
|
||||
cat > /tmp/test_stream.ds << 'EOF'
|
||||
stream main on "ws://localhost:9100" { mode: signal }
|
||||
|
||||
let count = 0
|
||||
let doubled = count * 2
|
||||
|
||||
view main = column [
|
||||
text "Count: {count}"
|
||||
text "Doubled: {doubled}"
|
||||
button "+" { click: count += 1 }
|
||||
button "-" { click: count -= 1 }
|
||||
]
|
||||
EOF
|
||||
|
||||
# Start relay
|
||||
cargo run -p ds-stream &
|
||||
|
||||
# Compile and serve
|
||||
dreamstack dev /tmp/test_stream.ds
|
||||
|
||||
# Open in browser — the compiled page should:
|
||||
# 1. Connect to ws://localhost:9100 automatically
|
||||
# 2. Send signal diffs when you click + or -
|
||||
# 3. A receiver on another tab should see count update in real time
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## ⚠️ Ownership Boundaries — DO NOT TOUCH
|
||||
|
||||
Another agent is actively working on the `engine/ds-stream/` crate and the HTML demos. **Do not modify:**
|
||||
|
||||
| Path | Owner | Reason |
|
||||
|------|-------|--------|
|
||||
| `engine/ds-stream/src/*` | bitstream agent | Active development on protocol, relay, codec |
|
||||
| `engine/ds-stream/Cargo.toml` | bitstream agent | Dependency management |
|
||||
| `engine/ds-stream/README.md` | bitstream agent | Documentation |
|
||||
| `examples/stream-source.html` | bitstream agent | Working demo, will be replaced by compiled output |
|
||||
| `examples/stream-receiver.html` | bitstream agent | Working demo, will be replaced by compiled output |
|
||||
|
||||
**Your scope is the compiler pipeline only:**
|
||||
- `compiler/ds-parser/src/` — lexer, parser, AST
|
||||
- `compiler/ds-analyzer/src/` — signal graph
|
||||
- `compiler/ds-types/src/` — type checker
|
||||
- `compiler/ds-codegen/src/` — JS emitter + embedded runtime
|
||||
- `compiler/ds-cli/src/` — CLI commands
|
||||
- `compiler/ds-layout/src/` — layout solver
|
||||
|
||||
---
|
||||
|
||||
## 🔴 Exact Breakage Map
|
||||
|
||||
When you change `Expr::StreamFrom(String)` to `Expr::StreamFrom { source, mode }`, these **3 exact lines will fail to compile**:
|
||||
|
||||
| File | Line | Current Code | Fix |
|
||||
|------|------|-------------|-----|
|
||||
| `compiler/ds-parser/src/parser.rs` | 616 | `Ok(Expr::StreamFrom(full_source))` | `Ok(Expr::StreamFrom { source: full_source, mode: None })` |
|
||||
| `compiler/ds-types/src/checker.rs` | 355 | `Expr::StreamFrom(_source) => {` | `Expr::StreamFrom { source, .. } => {` |
|
||||
| `compiler/ds-analyzer/src/signal_graph.rs` | — | No direct match but `collect_deps` falls through to `_ => {}` | Add `Expr::StreamFrom { .. } => {}` for clarity |
|
||||
|
||||
When you add `Declaration::Stream(StreamDecl)` to the enum, **nothing breaks** because all existing code uses `if let Declaration::X(...)` patterns, not exhaustive `match` blocks. The new variant is silently ignored.
|
||||
|
||||
**However**, you should explicitly add handling in these locations:
|
||||
|
||||
| File | Method | Line | What to add |
|
||||
|------|--------|------|-------------|
|
||||
| `ds-codegen/src/js_emitter.rs` | `emit_program` | 54-228 | Process `Declaration::Stream` to emit `DS._initStream()` |
|
||||
| `ds-analyzer/src/signal_graph.rs` | `from_program` | 97-179 | Detect `Declaration::Stream` and set `streamable` flags |
|
||||
| `ds-types/src/checker.rs` | top-level loop | 59-110 | Validate `StreamDecl` fields |
|
||||
|
||||
---
|
||||
|
||||
## 📋 Current State of Every File You'll Touch
|
||||
|
||||
### `compiler/ds-parser/src/ast.rs` (273 lines)
|
||||
- `Declaration` enum: 7 variants (line 11-27)
|
||||
- `Expr` enum: 24 variants (line 107-166)
|
||||
- `StreamFrom(String)` at line 143 — the one to change
|
||||
- `ContainerKind` enum includes `Scene` (added for physics)
|
||||
- Uses `Span` for source locations throughout
|
||||
|
||||
### `compiler/ds-parser/src/lexer.rs` (453 lines)
|
||||
- `TokenKind` enum: ~55 variants (line 10-92)
|
||||
- Keywords are matched in `lex_ident_or_keyword` (line 284-329)
|
||||
- `Stream` keyword already exists at line 306
|
||||
- Keywords are case-sensitive, lowercase only
|
||||
- 6 existing tests (line 397-451)
|
||||
|
||||
### `compiler/ds-parser/src/parser.rs` (1089 lines)
|
||||
- `parse_declaration` is the entry point (line 91-105)
|
||||
- Current `stream from` parsing is in `parse_primary` (around line 610-616)
|
||||
- **GOTCHA**: The parser uses `TokenKind::Ident(s)` for string comparison — the `s` field is a `String`, so matching requires `.into()` or `s == "from"` style
|
||||
- `ParseError` struct is at `ds-parser/src/lib.rs` — it has `message`, `line`, `column` fields
|
||||
|
||||
### `compiler/ds-analyzer/src/signal_graph.rs` (471 lines)
|
||||
- `SignalNode` struct at line 20 — has `name`, `kind`, `initial`, `deps` fields
|
||||
- `from_program` (line 97-179) — walks declarations, creates signal nodes
|
||||
- `collect_deps` (line 235-305) — exhaustive `Expr` match with `_ => {}` fallthrough
|
||||
- `collect_bindings` (line 338-409) — similar pattern
|
||||
- `extract_mutations` (line 307-329) — only handles `Expr::Assign` and `Expr::Block`
|
||||
- 4 existing tests (line 426-469)
|
||||
|
||||
### `compiler/ds-types/src/checker.rs` (590 lines)
|
||||
- `infer_type` method has the big Expr match (line 180-430)
|
||||
- `StreamFrom` at line 355 returns `Type::Unknown` — change to `Type::Named`
|
||||
- `Type` enum is in `types.rs` — includes `Int`, `Float`, `String`, `Bool`, `List`, `Record`, `View`, `Named(String)`, `Unknown`
|
||||
- **GOTCHA**: No `Type::Stream` variant — use `Type::Named("StreamState".into())` instead of adding a new variant
|
||||
|
||||
### `compiler/ds-codegen/src/js_emitter.rs` (1561 lines)
|
||||
- `emit_program` (line 54-228) — the main driver, processes declarations in order
|
||||
- `emit_view_expr` (line 239-546) — big match on Expr for view context
|
||||
- `emit_expr` (line 565-653) — general expression to JS string conversion
|
||||
- `emit_event_handler_expr` (line 655-679) — event handlers, this is where `_streamDiff` hooks go
|
||||
- `emit_scene` (line 733-932) — emits physics scene code
|
||||
- `RUNTIME_JS` const (line 1125-1559) — the full runtime, ~435 lines of JS in a Rust raw string
|
||||
- `CSS_RESET` const (line 1034-1122)
|
||||
- **GOTCHA**: The runtime is a raw string `r#"..."#`. All JS inside must use `function()` not arrow functions with `{` in template literals, because `#` in JS comments could break the raw string. Be careful with escaping.
|
||||
- **GOTCHA**: `emit_program` iterates `program.declarations` multiple times with `if let` filters — it's NOT a single match block. Add a new iteration pass for `Declaration::Stream`.
|
||||
|
||||
### `compiler/ds-cli/src/main.rs` (420 lines)
|
||||
- Uses `clap::Parser` derive macro
|
||||
- `Commands` enum at line 21 — has `Build`, `Dev`, `Check`
|
||||
- `compile()` function at line 57 — runs the full pipeline: lex → parse → analyze → type check → codegen
|
||||
- `cmd_dev` (line 158-326) — HTTP server with HMR, this is what `cmd_stream` should reuse
|
||||
- **GOTCHA**: `ds-cli` depends on all compiler crates but NOT on `ds-stream`. If you want `cmd_stream` to auto-start the relay, you need to either add `ds-stream` as a dependency or spawn it as a subprocess.
|
||||
|
||||
### `compiler/ds-layout/src/solver.rs` (453 lines)
|
||||
- `LayoutRect` struct at line 143 — has `x`, `y`, `width`, `height` (all `f64`)
|
||||
- The solver is not used at runtime yet — it's available but the codegen currently uses CSS flexbox
|
||||
- Adding `to_bytes`/`from_bytes` is safe and self-contained
|
||||
|
||||
---
|
||||
|
||||
## 🧪 Existing Tests
|
||||
|
||||
Run all tests before and after each change:
|
||||
|
||||
```bash
|
||||
cargo test --workspace 2>&1 | grep -E '(test result|FAILED)'
|
||||
```
|
||||
|
||||
Current test counts:
|
||||
- `ds-parser`: 5 tests (lexer) + parser tests
|
||||
- `ds-analyzer`: 4 tests (signal graph)
|
||||
- `ds-types`: ~8 tests (type checker)
|
||||
- `ds-codegen`: 0 tests (output is verified manually via HTML)
|
||||
- `ds-stream`: 17 tests (protocol + codec)
|
||||
|
||||
### Tests to add:
|
||||
|
||||
```rust
|
||||
// In ds-parser tests:
|
||||
#[test]
|
||||
fn test_stream_decl() {
|
||||
let src = r#"stream main on "ws://localhost:9100" { mode: signal }"#;
|
||||
let tokens = Lexer::new(src).tokenize();
|
||||
let program = Parser::new(tokens).parse_program().unwrap();
|
||||
assert!(matches!(&program.declarations[0], Declaration::Stream(_)));
|
||||
}
|
||||
|
||||
// In ds-analyzer tests:
|
||||
#[test]
|
||||
fn test_streamable_signals() {
|
||||
let src = r#"
|
||||
stream main on "ws://localhost:9100"
|
||||
let count = 0
|
||||
view main = column [ text "Count: {count}" ]
|
||||
"#;
|
||||
let (graph, _) = analyze(src);
|
||||
assert!(graph.nodes.iter().any(|n| n.name == "count" && n.streamable));
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 📡 Protocol Reference (for JS runtime code)
|
||||
|
||||
The JS runtime needs to encode/decode the exact same binary protocol as `engine/ds-stream/src/protocol.rs`. Here are the constants:
|
||||
|
||||
### Header Layout (16 bytes, little-endian)
|
||||
|
||||
| Offset | Size | Field | Description |
|
||||
|--------|------|-------|-------------|
|
||||
| 0 | u8 | frame_type | See frame types below |
|
||||
| 1 | u8 | flags | `0x01`=input, `0x02`=keyframe, `0x04`=compressed |
|
||||
| 2 | u16 | seq | Sequence number (wraps at 65535) |
|
||||
| 4 | u32 | timestamp | Milliseconds since stream start |
|
||||
| 8 | u16 | width | Frame width (0 for non-pixel frames) |
|
||||
| 10 | u16 | height | Frame height (0 for non-pixel frames) |
|
||||
| 12 | u32 | payload_len | Byte length of payload after header |
|
||||
|
||||
### Frame Type Constants (use in JS)
|
||||
|
||||
```javascript
|
||||
const FRAME_PIXELS = 0x01; // Raw RGBA framebuffer
|
||||
const FRAME_COMPRESSED = 0x02; // PNG/WebP compressed frame
|
||||
const FRAME_DELTA = 0x03; // XOR delta + RLE compressed
|
||||
const FRAME_AUDIO_PCM = 0x10; // Float32 PCM samples
|
||||
const FRAME_AUDIO_COMP = 0x11; // Opus compressed audio
|
||||
const FRAME_HAPTIC = 0x20; // Vibration command
|
||||
const FRAME_SIGNAL_SYNC = 0x30; // Full signal state (JSON)
|
||||
const FRAME_SIGNAL_DIFF = 0x31; // Changed signals only (JSON)
|
||||
const FRAME_NEURAL = 0x40; // Neural-generated pixels
|
||||
const FRAME_PING = 0xFE; // Keep-alive
|
||||
const FRAME_END = 0xFF; // Stream termination
|
||||
```
|
||||
|
||||
### Input Type Constants (receiver → source)
|
||||
|
||||
```javascript
|
||||
const INPUT_POINTER = 0x01; // x(u16) y(u16) buttons(u8)
|
||||
const INPUT_PTR_DOWN = 0x02; // same
|
||||
const INPUT_PTR_UP = 0x03; // same
|
||||
const INPUT_KEY_DOWN = 0x10; // keycode(u16) modifiers(u8)
|
||||
const INPUT_KEY_UP = 0x11; // same
|
||||
const INPUT_SCROLL = 0x50; // dx(i16) dy(i16)
|
||||
const INPUT_RESIZE = 0x60; // width(u16) height(u16)
|
||||
const FLAG_INPUT = 0x01; // Set in flags byte for input messages
|
||||
```
|
||||
|
||||
### Relay WebSocket Paths
|
||||
|
||||
```
|
||||
ws://localhost:9100/source — first connection becomes the source
|
||||
ws://localhost:9100/stream — subsequent connections are receivers
|
||||
```
|
||||
|
||||
The relay uses `tokio-tungstenite` and routes:
|
||||
- Source frames → broadcast to all receivers
|
||||
- Receiver inputs → mpsc to source
|
||||
|
||||
---
|
||||
|
||||
## 🚨 Common Gotchas
|
||||
|
||||
1. **Raw string escaping**: The `RUNTIME_JS` const uses `r#"..."#`. If your JS contains `"#` (like a comment `// foo#bar`), the Rust raw string will terminate early. Use `r##"..."##` if needed.
|
||||
|
||||
2. **Lexer keyword ordering**: Keywords in `lex_ident_or_keyword` are in a match block. The order doesn't matter for correctness but alphabetical keeps it clean.
|
||||
|
||||
3. **`emit_program` is multi-pass**: The codegen iterates `program.declarations` **6 separate times** with `if let` filters, not once with a match. Your `Declaration::Stream` processing should be its own pass, placed BEFORE the signal initialization pass (so streaming is initialized before signals fire).
|
||||
|
||||
4. **Signal `.value` wrapping**: The codegen assumes every signal identifier gets `.value` appended (line 588: `Expr::Ident(name) => format!("{name}.value")`). For `StreamFrom`, the returned `Signal` already has `.value`, so accessing `remote.count` should emit `remote.value.count`.
|
||||
|
||||
5. **Runtime `emit()` for remote input**: The runtime already has `onEvent` and `emit` functions. Use `emit('remote_pointer', data)` to dispatch input events — then user `.ds` code can handle them with `on remote_pointer -> ...`.
|
||||
|
||||
6. **No `Serialize`/`Deserialize` on AST**: The AST types don't derive serde traits. `SignalManifest` should derive `Serialize` if you want to embed it as JSON in the compiled output.
|
||||
|
||||
7. **`ds-cli` doesn't depend on `ds-stream`**: The CLI can't import relay code directly. To auto-start the relay, use `std::process::Command::new("cargo").args(["run", "-p", "ds-stream"])`.
|
||||
|
|
@ -82,11 +82,10 @@ pub fn message_size(header: &FrameHeader) -> usize {
|
|||
HEADER_SIZE + header.length as usize
|
||||
}
|
||||
|
||||
// ─── Delta Compression (stub for future neural compression) ───
|
||||
// ─── Delta Compression ───
|
||||
|
||||
/// Compute XOR delta between two frames.
|
||||
/// Returns a delta buffer where unchanged pixels are zero bytes.
|
||||
/// This is the simplest possible delta — a neural compressor would replace this.
|
||||
pub fn compute_delta(current: &[u8], previous: &[u8]) -> Vec<u8> {
|
||||
assert_eq!(current.len(), previous.len(), "frames must be same size");
|
||||
current.iter().zip(previous.iter()).map(|(c, p)| c ^ p).collect()
|
||||
|
|
@ -102,46 +101,123 @@ pub fn apply_delta(previous: &[u8], delta: &[u8]) -> Vec<u8> {
|
|||
/// Returns true if the delta is significantly smaller due to zero runs.
|
||||
pub fn delta_is_worthwhile(delta: &[u8]) -> bool {
|
||||
let zero_count = delta.iter().filter(|&&b| b == 0).count();
|
||||
// If more than 30% of bytes are zero, delta is worthwhile
|
||||
zero_count > delta.len() * 3 / 10
|
||||
}
|
||||
|
||||
// ─── RLE Compression for Delta Frames ───
|
||||
|
||||
/// RLE-encode a delta buffer for transmission.
|
||||
///
|
||||
/// Encoding scheme:
|
||||
/// - Non-zero byte `b`: emit literally
|
||||
/// - Run of N zero bytes: emit `0x00` followed by 2-byte LE count
|
||||
///
|
||||
/// Optimized for XOR delta frames where most bytes are 0x00 (unchanged pixels).
|
||||
/// On typical UI frames with 70-95% unchanged content, achieves 5-20x compression.
|
||||
pub fn rle_encode(data: &[u8]) -> Vec<u8> {
|
||||
let mut out = Vec::with_capacity(data.len() / 2);
|
||||
let mut i = 0;
|
||||
while i < data.len() {
|
||||
if data[i] == 0 {
|
||||
let start = i;
|
||||
while i < data.len() && data[i] == 0 {
|
||||
i += 1;
|
||||
}
|
||||
let count = i - start;
|
||||
let mut remaining = count;
|
||||
while remaining > 0 {
|
||||
let chunk = remaining.min(65535);
|
||||
out.push(0x00);
|
||||
out.push((chunk & 0xFF) as u8);
|
||||
out.push(((chunk >> 8) & 0xFF) as u8);
|
||||
remaining -= chunk;
|
||||
}
|
||||
} else {
|
||||
out.push(data[i]);
|
||||
i += 1;
|
||||
}
|
||||
}
|
||||
out
|
||||
}
|
||||
|
||||
/// RLE-decode a compressed delta buffer.
|
||||
///
|
||||
/// Inverse of `rle_encode`: expands `0x00 count_lo count_hi` markers
|
||||
/// into zero runs, passes non-zero bytes through literally.
|
||||
pub fn rle_decode(data: &[u8]) -> Vec<u8> {
|
||||
let mut out = Vec::with_capacity(data.len() * 2);
|
||||
let mut i = 0;
|
||||
while i < data.len() {
|
||||
if data[i] == 0 {
|
||||
if i + 2 >= data.len() {
|
||||
break;
|
||||
}
|
||||
let count = data[i + 1] as usize | ((data[i + 2] as usize) << 8);
|
||||
out.resize(out.len() + count, 0);
|
||||
i += 3;
|
||||
} else {
|
||||
out.push(data[i]);
|
||||
i += 1;
|
||||
}
|
||||
}
|
||||
out
|
||||
}
|
||||
|
||||
/// Encode a delta frame with RLE: compute XOR delta, then RLE-compress.
|
||||
/// Returns compressed bytes and the compression ratio (compressed/original).
|
||||
pub fn encode_delta_rle(current: &[u8], previous: &[u8]) -> (Vec<u8>, f32) {
|
||||
let delta = compute_delta(current, previous);
|
||||
let compressed = rle_encode(&delta);
|
||||
let ratio = compressed.len() as f32 / delta.len() as f32;
|
||||
(compressed, ratio)
|
||||
}
|
||||
|
||||
/// Decode a delta frame with RLE: RLE-decompress, then apply XOR delta.
|
||||
pub fn decode_delta_rle(compressed: &[u8], previous: &[u8]) -> Vec<u8> {
|
||||
let delta = rle_decode(compressed);
|
||||
apply_delta(previous, &delta)
|
||||
}
|
||||
|
||||
// ─── Convenience Builders ───
|
||||
|
||||
/// Build a pixel frame message from raw RGBA data.
|
||||
pub fn pixel_frame(
|
||||
seq: u16,
|
||||
timestamp: u32,
|
||||
width: u16,
|
||||
height: u16,
|
||||
rgba_data: &[u8],
|
||||
) -> Vec<u8> {
|
||||
encode_frame(
|
||||
FrameType::Pixels,
|
||||
seq,
|
||||
timestamp,
|
||||
width,
|
||||
height,
|
||||
FLAG_KEYFRAME,
|
||||
rgba_data,
|
||||
)
|
||||
pub fn pixel_frame(seq: u16, timestamp: u32, width: u16, height: u16, rgba_data: &[u8]) -> Vec<u8> {
|
||||
encode_frame(FrameType::Pixels, seq, timestamp, width, height, FLAG_KEYFRAME, rgba_data)
|
||||
}
|
||||
|
||||
/// Build a delta frame message with RLE-compressed payload.
|
||||
pub fn delta_frame(seq: u16, timestamp: u32, width: u16, height: u16, rle_data: &[u8]) -> Vec<u8> {
|
||||
encode_frame(FrameType::DeltaPixels, seq, timestamp, width, height, FLAG_COMPRESSED, rle_data)
|
||||
}
|
||||
|
||||
/// Build a signal sync (full state) frame from a JSON payload.
|
||||
pub fn signal_sync_frame(seq: u16, timestamp: u32, json: &[u8]) -> Vec<u8> {
|
||||
encode_frame(FrameType::SignalSync, seq, timestamp, 0, 0, FLAG_KEYFRAME, json)
|
||||
}
|
||||
|
||||
/// Build a signal diff (changed signals only) frame from a JSON payload.
|
||||
pub fn signal_diff_frame(seq: u16, timestamp: u32, json: &[u8]) -> Vec<u8> {
|
||||
encode_frame(FrameType::SignalDiff, seq, timestamp, 0, 0, 0, json)
|
||||
}
|
||||
|
||||
/// Build a pointer input message.
|
||||
pub fn pointer_input(seq: u16, timestamp: u32, event: &PointerEvent, input_type: InputType) -> Vec<u8> {
|
||||
let header = FrameHeader {
|
||||
frame_type: input_type as u8,
|
||||
flags: FLAG_INPUT,
|
||||
seq,
|
||||
timestamp,
|
||||
width: 0,
|
||||
height: 0,
|
||||
length: PointerEvent::SIZE as u32,
|
||||
};
|
||||
let mut buf = Vec::with_capacity(HEADER_SIZE + PointerEvent::SIZE);
|
||||
buf.extend_from_slice(&header.encode());
|
||||
buf.extend_from_slice(&event.encode());
|
||||
buf
|
||||
encode_input(input_type, seq, timestamp, &event.encode())
|
||||
}
|
||||
|
||||
/// Build a touch input message.
|
||||
pub fn touch_input(seq: u16, timestamp: u32, event: &TouchEvent, input_type: InputType) -> Vec<u8> {
|
||||
encode_input(input_type, seq, timestamp, &event.encode())
|
||||
}
|
||||
|
||||
/// Build a gamepad axis input message.
|
||||
pub fn gamepad_axis_input(seq: u16, timestamp: u32, event: &GamepadAxisEvent) -> Vec<u8> {
|
||||
encode_input(InputType::GamepadAxis, seq, timestamp, &event.encode())
|
||||
}
|
||||
|
||||
/// Build a gamepad button input message.
|
||||
pub fn gamepad_button_input(seq: u16, timestamp: u32, event: &GamepadButtonEvent) -> Vec<u8> {
|
||||
encode_input(InputType::GamepadButton, seq, timestamp, &event.encode())
|
||||
}
|
||||
|
||||
/// Build a ping/heartbeat message.
|
||||
|
|
@ -149,6 +225,11 @@ pub fn ping(seq: u16, timestamp: u32) -> Vec<u8> {
|
|||
encode_frame(FrameType::Ping, seq, timestamp, 0, 0, 0, &[])
|
||||
}
|
||||
|
||||
/// Build a stream-end message.
|
||||
pub fn stream_end(seq: u16, timestamp: u32) -> Vec<u8> {
|
||||
encode_frame(FrameType::End, seq, timestamp, 0, 0, 0, &[])
|
||||
}
|
||||
|
||||
// ─── Tests ───
|
||||
|
||||
#[cfg(test)]
|
||||
|
|
@ -158,15 +239,7 @@ mod tests {
|
|||
#[test]
|
||||
fn frame_encode_decode_roundtrip() {
|
||||
let payload = vec![0xFF; 100];
|
||||
let msg = encode_frame(
|
||||
FrameType::Pixels,
|
||||
1,
|
||||
5000,
|
||||
320,
|
||||
240,
|
||||
FLAG_KEYFRAME,
|
||||
&payload,
|
||||
);
|
||||
let msg = encode_frame(FrameType::Pixels, 1, 5000, 320, 240, FLAG_KEYFRAME, &payload);
|
||||
let decoded = decode_message(&msg).unwrap();
|
||||
assert_eq!(decoded.header.frame_type, FrameType::Pixels as u8);
|
||||
assert_eq!(decoded.header.seq, 1);
|
||||
|
|
@ -193,26 +266,20 @@ mod tests {
|
|||
#[test]
|
||||
fn delta_compression() {
|
||||
let frame1 = vec![10, 20, 30, 40, 50, 60, 70, 80];
|
||||
let frame2 = vec![10, 20, 30, 40, 55, 65, 70, 80]; // 2 pixels changed
|
||||
let frame2 = vec![10, 20, 30, 40, 55, 65, 70, 80];
|
||||
let delta = compute_delta(&frame2, &frame1);
|
||||
// Unchanged bytes should be 0
|
||||
assert_eq!(delta[0], 0);
|
||||
assert_eq!(delta[1], 0);
|
||||
assert_eq!(delta[6], 0);
|
||||
assert_eq!(delta[7], 0);
|
||||
// Reconstruct
|
||||
let reconstructed = apply_delta(&frame1, &delta);
|
||||
assert_eq!(reconstructed, frame2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn delta_worthwhile_check() {
|
||||
// All zeros = definitely worthwhile (100% unchanged)
|
||||
let all_zero = vec![0u8; 100];
|
||||
assert!(delta_is_worthwhile(&all_zero));
|
||||
// All changed = not worthwhile
|
||||
let all_changed = vec![0xFF; 100];
|
||||
assert!(!delta_is_worthwhile(&all_changed));
|
||||
assert!(delta_is_worthwhile(&vec![0u8; 100]));
|
||||
assert!(!delta_is_worthwhile(&vec![0xFF; 100]));
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
|
@ -227,21 +294,128 @@ mod tests {
|
|||
#[test]
|
||||
fn partial_buffer_returns_none() {
|
||||
let msg = encode_frame(FrameType::Pixels, 0, 0, 10, 10, 0, &[1, 2, 3]);
|
||||
// Truncate: header says 3 bytes payload but we only give 2
|
||||
assert!(decode_message(&msg[..HEADER_SIZE + 2]).is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn message_size_calculation() {
|
||||
let header = FrameHeader {
|
||||
frame_type: 0,
|
||||
flags: 0,
|
||||
seq: 0,
|
||||
timestamp: 0,
|
||||
width: 0,
|
||||
height: 0,
|
||||
length: 1024,
|
||||
frame_type: 0, flags: 0, seq: 0, timestamp: 0,
|
||||
width: 0, height: 0, length: 1024,
|
||||
};
|
||||
assert_eq!(message_size(&header), HEADER_SIZE + 1024);
|
||||
}
|
||||
|
||||
// ─── RLE Tests ───
|
||||
|
||||
#[test]
|
||||
fn rle_encode_all_zeros() {
|
||||
let data = vec![0u8; 1000];
|
||||
let encoded = rle_encode(&data);
|
||||
assert_eq!(encoded.len(), 3);
|
||||
let decoded = rle_decode(&encoded);
|
||||
assert_eq!(decoded, data);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn rle_encode_no_zeros() {
|
||||
let data: Vec<u8> = (1..=255).collect();
|
||||
let encoded = rle_encode(&data);
|
||||
assert_eq!(encoded.len(), data.len());
|
||||
let decoded = rle_decode(&encoded);
|
||||
assert_eq!(decoded, data);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn rle_encode_mixed() {
|
||||
let data = vec![0, 0, 0, 0xAA, 0xBB, 0, 0, 0, 0, 0, 0xCC];
|
||||
let encoded = rle_encode(&data);
|
||||
let decoded = rle_decode(&encoded);
|
||||
assert_eq!(decoded, data);
|
||||
assert!(encoded.len() < data.len());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn rle_delta_roundtrip() {
|
||||
let frame1: Vec<u8> = (0..100).map(|i| (i * 7 % 256) as u8).collect();
|
||||
let mut frame2 = frame1.clone();
|
||||
for i in (0..10).map(|x| x * 10) {
|
||||
frame2[i] = frame2[i].wrapping_add(1);
|
||||
}
|
||||
let (compressed, ratio) = encode_delta_rle(&frame2, &frame1);
|
||||
assert!(ratio < 0.5, "RLE should compress well: ratio={}", ratio);
|
||||
let reconstructed = decode_delta_rle(&compressed, &frame1);
|
||||
assert_eq!(reconstructed, frame2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn rle_large_zero_run() {
|
||||
let data = vec![0u8; 70000];
|
||||
let encoded = rle_encode(&data);
|
||||
let decoded = rle_decode(&encoded);
|
||||
assert_eq!(decoded, data);
|
||||
assert_eq!(encoded.len(), 6); // 2 markers: 65535 + 4465
|
||||
}
|
||||
|
||||
// ─── Signal Frame Tests ───
|
||||
|
||||
#[test]
|
||||
fn signal_sync_frame_roundtrip() {
|
||||
let json = br#"{"count":42,"name":"hello"}"#;
|
||||
let msg = signal_sync_frame(1, 1000, json);
|
||||
let decoded = decode_message(&msg).unwrap();
|
||||
assert_eq!(decoded.header.frame_type, FrameType::SignalSync as u8);
|
||||
assert!(decoded.header.is_keyframe());
|
||||
assert_eq!(decoded.payload, json);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn signal_diff_frame_roundtrip() {
|
||||
let json = br#"{"count":43}"#;
|
||||
let msg = signal_diff_frame(2, 2000, json);
|
||||
let decoded = decode_message(&msg).unwrap();
|
||||
assert_eq!(decoded.header.frame_type, FrameType::SignalDiff as u8);
|
||||
assert!(!decoded.header.is_keyframe());
|
||||
assert_eq!(decoded.payload, json);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn delta_frame_builder() {
|
||||
let rle_data = rle_encode(&[0, 0, 0, 0, 0xFF, 0xAA, 0, 0]);
|
||||
let msg = delta_frame(5, 3000, 320, 240, &rle_data);
|
||||
let decoded = decode_message(&msg).unwrap();
|
||||
assert_eq!(decoded.header.frame_type, FrameType::DeltaPixels as u8);
|
||||
assert_eq!(decoded.header.flags & FLAG_COMPRESSED, FLAG_COMPRESSED);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn stream_end_message() {
|
||||
let msg = stream_end(100, 99999);
|
||||
let decoded = decode_message(&msg).unwrap();
|
||||
assert_eq!(decoded.header.frame_type, FrameType::End as u8);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn touch_input_builder() {
|
||||
let evt = TouchEvent { id: 1, x: 300, y: 400, phase: 0 };
|
||||
let msg = touch_input(1, 100, &evt, InputType::Touch);
|
||||
let decoded = decode_message(&msg).unwrap();
|
||||
assert!(decoded.header.is_input());
|
||||
let evt2 = TouchEvent::decode(decoded.payload).unwrap();
|
||||
assert_eq!(evt2.id, 1);
|
||||
assert_eq!(evt2.x, 300);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn gamepad_input_builders() {
|
||||
let axis = GamepadAxisEvent { axis: 0, value: 16000 };
|
||||
let msg = gamepad_axis_input(1, 100, &axis);
|
||||
let decoded = decode_message(&msg).unwrap();
|
||||
assert_eq!(decoded.header.frame_type, InputType::GamepadAxis as u8);
|
||||
|
||||
let btn = GamepadButtonEvent { button: 3, pressed: 1, analog: 255 };
|
||||
let msg = gamepad_button_input(2, 200, &btn);
|
||||
let decoded = decode_message(&msg).unwrap();
|
||||
assert_eq!(decoded.header.frame_type, InputType::GamepadButton as u8);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -316,6 +316,131 @@ impl ScrollEvent {
|
|||
}
|
||||
}
|
||||
|
||||
/// Touch input event.
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub struct TouchEvent {
|
||||
/// Touch identifier (for multi-touch)
|
||||
pub id: u8,
|
||||
pub x: u16,
|
||||
pub y: u16,
|
||||
/// 0 = start/move, 1 = end, 2 = cancel
|
||||
pub phase: u8,
|
||||
}
|
||||
|
||||
impl TouchEvent {
|
||||
pub const SIZE: usize = 6;
|
||||
|
||||
pub fn encode(&self) -> [u8; Self::SIZE] {
|
||||
let mut buf = [0u8; Self::SIZE];
|
||||
buf[0] = self.id;
|
||||
buf[1..3].copy_from_slice(&self.x.to_le_bytes());
|
||||
buf[3..5].copy_from_slice(&self.y.to_le_bytes());
|
||||
buf[5] = self.phase;
|
||||
buf
|
||||
}
|
||||
|
||||
pub fn decode(buf: &[u8]) -> Option<Self> {
|
||||
if buf.len() < Self::SIZE {
|
||||
return None;
|
||||
}
|
||||
Some(Self {
|
||||
id: buf[0],
|
||||
x: u16::from_le_bytes([buf[1], buf[2]]),
|
||||
y: u16::from_le_bytes([buf[3], buf[4]]),
|
||||
phase: buf[5],
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Gamepad axis event.
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub struct GamepadAxisEvent {
|
||||
/// Axis index (0=left-x, 1=left-y, 2=right-x, 3=right-y, 4=left-trigger, 5=right-trigger)
|
||||
pub axis: u8,
|
||||
/// Axis value: -32768 to 32767 mapped from -1.0 to 1.0
|
||||
pub value: i16,
|
||||
}
|
||||
|
||||
impl GamepadAxisEvent {
|
||||
pub const SIZE: usize = 3;
|
||||
|
||||
pub fn encode(&self) -> [u8; Self::SIZE] {
|
||||
let mut buf = [0u8; Self::SIZE];
|
||||
buf[0] = self.axis;
|
||||
buf[1..3].copy_from_slice(&self.value.to_le_bytes());
|
||||
buf
|
||||
}
|
||||
|
||||
pub fn decode(buf: &[u8]) -> Option<Self> {
|
||||
if buf.len() < Self::SIZE {
|
||||
return None;
|
||||
}
|
||||
Some(Self {
|
||||
axis: buf[0],
|
||||
value: i16::from_le_bytes([buf[1], buf[2]]),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Gamepad button event.
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub struct GamepadButtonEvent {
|
||||
/// Button index
|
||||
pub button: u8,
|
||||
/// 0 = released, 1 = pressed
|
||||
pub pressed: u8,
|
||||
/// Analog value 0-255 (for triggers)
|
||||
pub analog: u8,
|
||||
}
|
||||
|
||||
impl GamepadButtonEvent {
|
||||
pub const SIZE: usize = 3;
|
||||
|
||||
pub fn encode(&self) -> [u8; Self::SIZE] {
|
||||
[self.button, self.pressed, self.analog]
|
||||
}
|
||||
|
||||
pub fn decode(buf: &[u8]) -> Option<Self> {
|
||||
if buf.len() < Self::SIZE {
|
||||
return None;
|
||||
}
|
||||
Some(Self {
|
||||
button: buf[0],
|
||||
pressed: buf[1],
|
||||
analog: buf[2],
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Resize event.
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub struct ResizeEvent {
|
||||
pub width: u16,
|
||||
pub height: u16,
|
||||
}
|
||||
|
||||
impl ResizeEvent {
|
||||
pub const SIZE: usize = 4;
|
||||
|
||||
pub fn encode(&self) -> [u8; Self::SIZE] {
|
||||
let mut buf = [0u8; Self::SIZE];
|
||||
buf[0..2].copy_from_slice(&self.width.to_le_bytes());
|
||||
buf[2..4].copy_from_slice(&self.height.to_le_bytes());
|
||||
buf
|
||||
}
|
||||
|
||||
pub fn decode(buf: &[u8]) -> Option<Self> {
|
||||
if buf.len() < Self::SIZE {
|
||||
return None;
|
||||
}
|
||||
Some(Self {
|
||||
width: u16::from_le_bytes([buf[0], buf[1]]),
|
||||
height: u16::from_le_bytes([buf[2], buf[3]]),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// ─── Tests ───
|
||||
|
||||
#[cfg(test)]
|
||||
|
|
@ -409,4 +534,53 @@ mod tests {
|
|||
assert!(!h2.is_input());
|
||||
assert!(!h2.is_keyframe());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn touch_event_roundtrip() {
|
||||
let evt = TouchEvent { id: 2, x: 400, y: 600, phase: 0 };
|
||||
let encoded = evt.encode();
|
||||
let decoded = TouchEvent::decode(&encoded).unwrap();
|
||||
assert_eq!(decoded.id, 2);
|
||||
assert_eq!(decoded.x, 400);
|
||||
assert_eq!(decoded.y, 600);
|
||||
assert_eq!(decoded.phase, 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn gamepad_axis_roundtrip() {
|
||||
let evt = GamepadAxisEvent { axis: 1, value: -16384 };
|
||||
let encoded = evt.encode();
|
||||
let decoded = GamepadAxisEvent::decode(&encoded).unwrap();
|
||||
assert_eq!(decoded.axis, 1);
|
||||
assert_eq!(decoded.value, -16384);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn gamepad_button_roundtrip() {
|
||||
let evt = GamepadButtonEvent { button: 5, pressed: 1, analog: 200 };
|
||||
let encoded = evt.encode();
|
||||
let decoded = GamepadButtonEvent::decode(&encoded).unwrap();
|
||||
assert_eq!(decoded.button, 5);
|
||||
assert_eq!(decoded.pressed, 1);
|
||||
assert_eq!(decoded.analog, 200);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn resize_event_roundtrip() {
|
||||
let evt = ResizeEvent { width: 1920, height: 1080 };
|
||||
let encoded = evt.encode();
|
||||
let decoded = ResizeEvent::decode(&encoded).unwrap();
|
||||
assert_eq!(decoded.width, 1920);
|
||||
assert_eq!(decoded.height, 1080);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn touch_event_too_short() {
|
||||
assert!(TouchEvent::decode(&[0u8; 5]).is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn gamepad_axis_too_short() {
|
||||
assert!(GamepadAxisEvent::decode(&[0u8; 2]).is_none());
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,22 +1,28 @@
|
|||
//! WebSocket Relay Server
|
||||
//!
|
||||
//! Routes frames from source→receivers and inputs from receivers→source.
|
||||
//! Roles are determined by the connection path:
|
||||
//! - `/source` — the frame producer (DreamStack renderer)
|
||||
//! - `/stream` — frame consumers (thin receivers)
|
||||
//! Roles are determined by connection order:
|
||||
//! - First connection = source (frame producer)
|
||||
//! - Subsequent connections = receivers (frame consumers)
|
||||
//!
|
||||
//! The relay is intentionally dumb — it just forwards bytes.
|
||||
//! The protocol semantics live in the source and receiver.
|
||||
|
||||
//! Features:
|
||||
//! - Keyframe caching: late-joining receivers get current state instantly
|
||||
//! - Signal state store: caches SignalSync/Diff frames for reconstruction
|
||||
//! - Ping/pong keepalive: detects dead connections
|
||||
//! - Stats tracking: frames, bytes, latency metrics
|
||||
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
|
||||
use futures_util::{SinkExt, StreamExt};
|
||||
use tokio::net::{TcpListener, TcpStream};
|
||||
use tokio::sync::{broadcast, mpsc, RwLock};
|
||||
use tokio::time::{interval, Duration};
|
||||
use tokio_tungstenite::tungstenite::Message;
|
||||
|
||||
use crate::protocol::*;
|
||||
|
||||
/// Relay server configuration.
|
||||
pub struct RelayConfig {
|
||||
/// Address to bind to.
|
||||
|
|
@ -25,6 +31,10 @@ pub struct RelayConfig {
|
|||
pub max_receivers: usize,
|
||||
/// Frame broadcast channel capacity.
|
||||
pub frame_buffer_size: usize,
|
||||
/// Keepalive interval in seconds.
|
||||
pub keepalive_interval_secs: u64,
|
||||
/// Keepalive timeout in seconds — disconnect after this many seconds without a pong.
|
||||
pub keepalive_timeout_secs: u64,
|
||||
}
|
||||
|
||||
impl Default for RelayConfig {
|
||||
|
|
@ -33,6 +43,8 @@ impl Default for RelayConfig {
|
|||
addr: "0.0.0.0:9100".parse().unwrap(),
|
||||
max_receivers: 64,
|
||||
frame_buffer_size: 16,
|
||||
keepalive_interval_secs: 10,
|
||||
keepalive_timeout_secs: 30,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -45,6 +57,77 @@ pub struct RelayStats {
|
|||
pub inputs_relayed: u64,
|
||||
pub connected_receivers: usize,
|
||||
pub source_connected: bool,
|
||||
/// Timestamp of last frame from source (milliseconds since start).
|
||||
pub last_frame_timestamp: u32,
|
||||
/// Total keyframes sent.
|
||||
pub keyframes_sent: u64,
|
||||
/// Total signal diffs sent.
|
||||
pub signal_diffs_sent: u64,
|
||||
/// Uptime in seconds.
|
||||
pub uptime_secs: u64,
|
||||
}
|
||||
|
||||
/// Cached state for late-joining receivers.
|
||||
#[derive(Debug, Default, Clone)]
|
||||
pub struct StateCache {
|
||||
/// Last pixel keyframe (full RGBA frame).
|
||||
pub last_keyframe: Option<Vec<u8>>,
|
||||
/// Last signal sync frame (full JSON state).
|
||||
pub last_signal_sync: Option<Vec<u8>>,
|
||||
/// Accumulated signal diffs since last sync.
|
||||
/// Late-joining receivers get: last_signal_sync + all diffs.
|
||||
pub pending_signal_diffs: Vec<Vec<u8>>,
|
||||
}
|
||||
|
||||
impl StateCache {
|
||||
/// Process an incoming frame and update cache.
|
||||
fn process_frame(&mut self, msg: &[u8]) {
|
||||
if msg.len() < HEADER_SIZE {
|
||||
return;
|
||||
}
|
||||
let frame_type = msg[0];
|
||||
let flags = msg[1];
|
||||
|
||||
match FrameType::from_u8(frame_type) {
|
||||
// Cache keyframes (pixel or signal sync)
|
||||
Some(FrameType::Pixels) if flags & FLAG_KEYFRAME != 0 => {
|
||||
self.last_keyframe = Some(msg.to_vec());
|
||||
}
|
||||
Some(FrameType::SignalSync) => {
|
||||
self.last_signal_sync = Some(msg.to_vec());
|
||||
self.pending_signal_diffs.clear(); // sync resets diffs
|
||||
}
|
||||
Some(FrameType::SignalDiff) => {
|
||||
self.pending_signal_diffs.push(msg.to_vec());
|
||||
// Cap accumulated diffs to prevent unbounded memory
|
||||
if self.pending_signal_diffs.len() > 1000 {
|
||||
self.pending_signal_diffs.drain(..500);
|
||||
}
|
||||
}
|
||||
Some(FrameType::Keyframe) => {
|
||||
self.last_keyframe = Some(msg.to_vec());
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
/// Get all messages a late-joining receiver needs to reconstruct current state.
|
||||
fn catchup_messages(&self) -> Vec<Vec<u8>> {
|
||||
let mut msgs = Vec::new();
|
||||
// Send last signal sync first (if available)
|
||||
if let Some(ref sync) = self.last_signal_sync {
|
||||
msgs.push(sync.clone());
|
||||
}
|
||||
// Then all accumulated diffs
|
||||
for diff in &self.pending_signal_diffs {
|
||||
msgs.push(diff.clone());
|
||||
}
|
||||
// Then last pixel keyframe (if available)
|
||||
if let Some(ref kf) = self.last_keyframe {
|
||||
msgs.push(kf.clone());
|
||||
}
|
||||
msgs
|
||||
}
|
||||
}
|
||||
|
||||
/// Shared relay state.
|
||||
|
|
@ -56,16 +139,22 @@ struct RelayState {
|
|||
input_rx: Option<mpsc::Receiver<Vec<u8>>>,
|
||||
/// Live stats
|
||||
stats: RelayStats,
|
||||
/// Cached state for late-joining receivers
|
||||
cache: StateCache,
|
||||
/// Server start time
|
||||
start_time: Instant,
|
||||
}
|
||||
|
||||
/// Run the WebSocket relay server.
|
||||
pub async fn run_relay(config: RelayConfig) -> Result<(), Box<dyn std::error::Error>> {
|
||||
let listener = TcpListener::bind(&config.addr).await?;
|
||||
eprintln!("╔══════════════════════════════════════════════════╗");
|
||||
eprintln!("║ DreamStack Bitstream Relay v0.1.0 ║");
|
||||
eprintln!("║ DreamStack Bitstream Relay v0.2.0 ║");
|
||||
eprintln!("║ ║");
|
||||
eprintln!("║ Source: ws://{}/source ║", config.addr);
|
||||
eprintln!("║ Receiver: ws://{}/stream ║", config.addr);
|
||||
eprintln!("║ ║");
|
||||
eprintln!("║ Features: keyframe cache, keepalive, RLE ║");
|
||||
eprintln!("╚══════════════════════════════════════════════════╝");
|
||||
|
||||
let (frame_tx, _) = broadcast::channel(config.frame_buffer_size);
|
||||
|
|
@ -76,11 +165,40 @@ pub async fn run_relay(config: RelayConfig) -> Result<(), Box<dyn std::error::Er
|
|||
input_tx,
|
||||
input_rx: Some(input_rx),
|
||||
stats: RelayStats::default(),
|
||||
cache: StateCache::default(),
|
||||
start_time: Instant::now(),
|
||||
}));
|
||||
|
||||
// Background: periodic stats logging
|
||||
{
|
||||
let state = state.clone();
|
||||
tokio::spawn(async move {
|
||||
let mut tick = interval(Duration::from_secs(30));
|
||||
loop {
|
||||
tick.tick().await;
|
||||
let s = state.read().await;
|
||||
let uptime = s.start_time.elapsed().as_secs();
|
||||
if s.stats.source_connected || s.stats.connected_receivers > 0 {
|
||||
eprintln!(
|
||||
"[relay] up={}s frames={} bytes={} inputs={} receivers={} signal_diffs={} cached={}",
|
||||
uptime,
|
||||
s.stats.frames_relayed,
|
||||
s.stats.bytes_relayed,
|
||||
s.stats.inputs_relayed,
|
||||
s.stats.connected_receivers,
|
||||
s.stats.signal_diffs_sent,
|
||||
s.cache.last_signal_sync.is_some(),
|
||||
);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
while let Ok((stream, addr)) = listener.accept().await {
|
||||
let state = state.clone();
|
||||
tokio::spawn(handle_connection(stream, addr, state));
|
||||
let keepalive_interval = config.keepalive_interval_secs;
|
||||
let keepalive_timeout = config.keepalive_timeout_secs;
|
||||
tokio::spawn(handle_connection(stream, addr, state, keepalive_interval, keepalive_timeout));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
|
@ -89,13 +207,13 @@ async fn handle_connection(
|
|||
stream: TcpStream,
|
||||
addr: SocketAddr,
|
||||
state: Arc<RwLock<RelayState>>,
|
||||
keepalive_interval: u64,
|
||||
_keepalive_timeout: u64,
|
||||
) {
|
||||
// Peek at the HTTP upgrade request to determine role
|
||||
let ws_stream = match tokio_tungstenite::accept_hdr_async(
|
||||
stream,
|
||||
|_req: &tokio_tungstenite::tungstenite::handshake::server::Request,
|
||||
res: tokio_tungstenite::tungstenite::handshake::server::Response| {
|
||||
// We'll extract the path from the URI later via a different mechanism
|
||||
Ok(res)
|
||||
},
|
||||
)
|
||||
|
|
@ -108,8 +226,7 @@ async fn handle_connection(
|
|||
}
|
||||
};
|
||||
|
||||
// For simplicity in the PoC, first connection = source, subsequent = receivers.
|
||||
// A production version would parse the URI path.
|
||||
// First connection = source, subsequent = receivers
|
||||
let is_source = {
|
||||
let s = state.read().await;
|
||||
!s.stats.source_connected
|
||||
|
|
@ -117,10 +234,10 @@ async fn handle_connection(
|
|||
|
||||
if is_source {
|
||||
eprintln!("[relay] Source connected: {}", addr);
|
||||
handle_source(ws_stream, addr, state).await;
|
||||
handle_source(ws_stream, addr, state, keepalive_interval).await;
|
||||
} else {
|
||||
eprintln!("[relay] Receiver connected: {}", addr);
|
||||
handle_receiver(ws_stream, addr, state).await;
|
||||
handle_receiver(ws_stream, addr, state, keepalive_interval).await;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -128,6 +245,7 @@ async fn handle_source(
|
|||
ws_stream: tokio_tungstenite::WebSocketStream<TcpStream>,
|
||||
addr: SocketAddr,
|
||||
state: Arc<RwLock<RelayState>>,
|
||||
keepalive_interval: u64,
|
||||
) {
|
||||
let (mut ws_sink, mut ws_source) = ws_stream.split();
|
||||
|
||||
|
|
@ -158,6 +276,24 @@ async fn handle_source(
|
|||
});
|
||||
}
|
||||
|
||||
// Keepalive: periodic pings
|
||||
let state_ping = state.clone();
|
||||
let ping_task = tokio::spawn(async move {
|
||||
let mut tick = interval(Duration::from_secs(keepalive_interval));
|
||||
let mut seq = 0u16;
|
||||
loop {
|
||||
tick.tick().await;
|
||||
let s = state_ping.read().await;
|
||||
if !s.stats.source_connected {
|
||||
break;
|
||||
}
|
||||
drop(s);
|
||||
let ping_msg = crate::codec::ping(seq, 0);
|
||||
let _ = state_ping.read().await.frame_tx.send(ping_msg);
|
||||
seq = seq.wrapping_add(1);
|
||||
}
|
||||
});
|
||||
|
||||
// Receive frames from source → broadcast to receivers
|
||||
while let Some(Ok(msg)) = ws_source.next().await {
|
||||
if let Message::Binary(data) = msg {
|
||||
|
|
@ -166,13 +302,31 @@ async fn handle_source(
|
|||
let mut s = state.write().await;
|
||||
s.stats.frames_relayed += 1;
|
||||
s.stats.bytes_relayed += data_vec.len() as u64;
|
||||
|
||||
// Update state cache
|
||||
s.cache.process_frame(&data_vec);
|
||||
|
||||
// Track frame-type-specific stats
|
||||
if data_vec.len() >= HEADER_SIZE {
|
||||
match FrameType::from_u8(data_vec[0]) {
|
||||
Some(FrameType::SignalDiff) => s.stats.signal_diffs_sent += 1,
|
||||
Some(FrameType::Pixels) | Some(FrameType::Keyframe) |
|
||||
Some(FrameType::SignalSync) => s.stats.keyframes_sent += 1,
|
||||
_ => {}
|
||||
}
|
||||
let ts = u32::from_le_bytes([
|
||||
data_vec[4], data_vec[5], data_vec[6], data_vec[7],
|
||||
]);
|
||||
s.stats.last_frame_timestamp = ts;
|
||||
}
|
||||
}
|
||||
// Broadcast to all receivers (ignore send errors = no receivers)
|
||||
// Broadcast to all receivers
|
||||
let _ = frame_tx.send(data_vec);
|
||||
}
|
||||
}
|
||||
|
||||
// Source disconnected
|
||||
ping_task.abort();
|
||||
eprintln!("[relay] Source disconnected: {}", addr);
|
||||
let mut s = state.write().await;
|
||||
s.stats.source_connected = false;
|
||||
|
|
@ -182,14 +336,17 @@ async fn handle_receiver(
|
|||
ws_stream: tokio_tungstenite::WebSocketStream<TcpStream>,
|
||||
addr: SocketAddr,
|
||||
state: Arc<RwLock<RelayState>>,
|
||||
_keepalive_interval: u64,
|
||||
) {
|
||||
let (mut ws_sink, mut ws_source) = ws_stream.split();
|
||||
|
||||
// Subscribe to frame broadcast
|
||||
let mut frame_rx = {
|
||||
// Subscribe to frame broadcast and get catchup messages
|
||||
let (mut frame_rx, catchup_msgs) = {
|
||||
let mut s = state.write().await;
|
||||
s.stats.connected_receivers += 1;
|
||||
s.frame_tx.subscribe()
|
||||
let rx = s.frame_tx.subscribe();
|
||||
let catchup = s.cache.catchup_messages();
|
||||
(rx, catchup)
|
||||
};
|
||||
|
||||
let input_tx = {
|
||||
|
|
@ -197,8 +354,25 @@ async fn handle_receiver(
|
|||
s.input_tx.clone()
|
||||
};
|
||||
|
||||
// Send cached state to late-joining receiver
|
||||
if !catchup_msgs.is_empty() {
|
||||
eprintln!(
|
||||
"[relay] Sending {} catchup messages to {}",
|
||||
catchup_msgs.len(),
|
||||
addr
|
||||
);
|
||||
for msg_bytes in catchup_msgs {
|
||||
let msg = Message::Binary(msg_bytes.into());
|
||||
if ws_sink.send(msg).await.is_err() {
|
||||
eprintln!("[relay] Failed to send catchup to {}", addr);
|
||||
let mut s = state.write().await;
|
||||
s.stats.connected_receivers = s.stats.connected_receivers.saturating_sub(1);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Forward frames from broadcast → this receiver
|
||||
let _state_clone = state.clone();
|
||||
let addr_clone = addr;
|
||||
let send_task = tokio::spawn(async move {
|
||||
loop {
|
||||
|
|
@ -243,6 +417,7 @@ mod tests {
|
|||
assert_eq!(config.addr, "0.0.0.0:9100".parse::<SocketAddr>().unwrap());
|
||||
assert_eq!(config.max_receivers, 64);
|
||||
assert_eq!(config.frame_buffer_size, 16);
|
||||
assert_eq!(config.keepalive_interval_secs, 10);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
|
@ -251,5 +426,77 @@ mod tests {
|
|||
assert_eq!(stats.frames_relayed, 0);
|
||||
assert_eq!(stats.bytes_relayed, 0);
|
||||
assert!(!stats.source_connected);
|
||||
assert_eq!(stats.keyframes_sent, 0);
|
||||
assert_eq!(stats.signal_diffs_sent, 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn state_cache_signal_sync() {
|
||||
let mut cache = StateCache::default();
|
||||
|
||||
// Simulate a SignalSync frame
|
||||
let sync_json = br#"{"count":0}"#;
|
||||
let sync_msg = crate::codec::signal_sync_frame(0, 100, sync_json);
|
||||
cache.process_frame(&sync_msg);
|
||||
|
||||
assert!(cache.last_signal_sync.is_some());
|
||||
assert_eq!(cache.pending_signal_diffs.len(), 0);
|
||||
|
||||
// Simulate a SignalDiff
|
||||
let diff_json = br#"{"count":1}"#;
|
||||
let diff_msg = crate::codec::signal_diff_frame(1, 200, diff_json);
|
||||
cache.process_frame(&diff_msg);
|
||||
|
||||
assert_eq!(cache.pending_signal_diffs.len(), 1);
|
||||
|
||||
// Catchup should contain sync + diff
|
||||
let catchup = cache.catchup_messages();
|
||||
assert_eq!(catchup.len(), 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn state_cache_signal_sync_resets_diffs() {
|
||||
let mut cache = StateCache::default();
|
||||
|
||||
// Add some diffs
|
||||
for i in 0..5 {
|
||||
let json = format!(r#"{{"count":{}}}"#, i);
|
||||
let msg = crate::codec::signal_diff_frame(i, i as u32 * 100, json.as_bytes());
|
||||
cache.process_frame(&msg);
|
||||
}
|
||||
assert_eq!(cache.pending_signal_diffs.len(), 5);
|
||||
|
||||
// A new sync should clear diffs
|
||||
let sync = crate::codec::signal_sync_frame(10, 1000, b"{}");
|
||||
cache.process_frame(&sync);
|
||||
assert_eq!(cache.pending_signal_diffs.len(), 0);
|
||||
assert!(cache.last_signal_sync.is_some());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn state_cache_keyframe() {
|
||||
let mut cache = StateCache::default();
|
||||
|
||||
// Simulate a keyframe pixel frame
|
||||
let kf = crate::codec::pixel_frame(0, 100, 10, 10, &vec![0xFF; 400]);
|
||||
cache.process_frame(&kf);
|
||||
|
||||
assert!(cache.last_keyframe.is_some());
|
||||
let catchup = cache.catchup_messages();
|
||||
assert_eq!(catchup.len(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn state_cache_diff_cap() {
|
||||
let mut cache = StateCache::default();
|
||||
|
||||
// Add more than 1000 diffs — should auto-trim
|
||||
for i in 0..1100u16 {
|
||||
let json = format!(r#"{{"n":{}}}"#, i);
|
||||
let msg = crate::codec::signal_diff_frame(i, i as u32, json.as_bytes());
|
||||
cache.process_frame(&msg);
|
||||
}
|
||||
// Should have been trimmed: 1100 - 500 = 600 remaining after first trim at 1001
|
||||
assert!(cache.pending_signal_diffs.len() <= 600);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue