diff --git a/compiler/ds-codegen/src/js_emitter.rs b/compiler/ds-codegen/src/js_emitter.rs index 90f79d5..0c07ede 100644 --- a/compiler/ds-codegen/src/js_emitter.rs +++ b/compiler/ds-codegen/src/js_emitter.rs @@ -236,10 +236,20 @@ impl JsEmitter { StreamMode::Signal => "signal", }; let url = self.emit_expr(&stream.relay_url); - self.emit_line(&format!( - "DS._initStream({}, '{}');", - url, mode - )); + + match stream.transport { + StreamTransport::WebRTC => { + // Derive signaling URL: replace /source/ with /signal/ + self.emit_line(&format!( + "DS._initWebRTC({url}.replace('/source/', '/signal/'), {url}, '{mode}');" + )); + } + StreamTransport::WebSocket => { + self.emit_line(&format!( + "DS._initStream({url}, '{mode}');" + )); + } + } } } @@ -1715,6 +1725,110 @@ const DS = (() => { return state; } + // ── WebRTC Data Channel Transport ── + var _rtcPc = null; + var _rtcDc = null; + + function _initWebRTC(signalingUrl, streamUrl, mode) { + var sigWs = new WebSocket(signalingUrl); + var pc = new RTCPeerConnection({ + iceServers: [{ urls: 'stun:stun.l.google.com:19302' }] + }); + _rtcPc = pc; + + // Create data channel for frames (source side) + var dc = pc.createDataChannel('ds-frames', { ordered: false, maxRetransmits: 0 }); + dc.binaryType = 'arraybuffer'; + dc.onopen = function() { + console.log('[ds-webrtc] Data channel open'); + _rtcDc = dc; + // Override stream send to use data channel + var origSend = _streamSend; + _streamSend = function(type, flags, payload) { + if (_rtcDc && _rtcDc.readyState === 'open') { + var ts = (performance.now() - _streamStart) | 0; + var msg = new Uint8Array(16 + payload.length); + var v = new DataView(msg.buffer); + v.setUint8(0, type); + v.setUint8(1, flags); + v.setUint16(2, (_streamSeq++) & 0xFFFF, true); + v.setUint32(4, ts, true); + v.setUint32(12, payload.length, true); + msg.set(payload, 16); + _rtcDc.send(msg.buffer); + } else { + origSend(type, flags, payload); + } + }; + }; + dc.onclose = function() { + console.log('[ds-webrtc] Data channel closed, falling back to WebSocket'); + _rtcDc = null; + }; + + // Handle incoming data channels (receiver side) + pc.ondatachannel = function(event) { + var incoming = event.channel; + incoming.binaryType = 'arraybuffer'; + incoming.onmessage = function(e) { + if (_streamWs && _streamWs.onmessage) { + _streamWs.onmessage(e); + } + }; + incoming.onopen = function() { + console.log('[ds-webrtc] Incoming data channel open'); + _rtcDc = incoming; + }; + }; + + // ICE candidate exchange + pc.onicecandidate = function(e) { + if (e.candidate && sigWs.readyState === 1) { + sigWs.send(JSON.stringify({ type: 'ice', candidate: e.candidate })); + } + }; + + // Signaling messages + sigWs.onmessage = function(e) { + try { + var msg = JSON.parse(e.data); + if (msg.type === 'offer') { + pc.setRemoteDescription(new RTCSessionDescription(msg.sdp)) + .then(function() { return pc.createAnswer(); }) + .then(function(answer) { + pc.setLocalDescription(answer); + sigWs.send(JSON.stringify({ type: 'answer', sdp: answer })); + }); + } else if (msg.type === 'answer') { + pc.setRemoteDescription(new RTCSessionDescription(msg.sdp)); + } else if (msg.type === 'ice' && msg.candidate) { + pc.addIceCandidate(new RTCIceCandidate(msg.candidate)); + } + } catch(ex) {} + }; + + sigWs.onopen = function() { + console.log('[ds-webrtc] Signaling connected:', signalingUrl); + // Source creates offer + pc.createOffer() + .then(function(offer) { + pc.setLocalDescription(offer); + sigWs.send(JSON.stringify({ type: 'offer', sdp: offer })); + }); + }; + + // Fallback: if WebRTC doesn't connect in 5s, use WebSocket + setTimeout(function() { + if (!_rtcDc || _rtcDc.readyState !== 'open') { + console.log('[ds-webrtc] Timeout, falling back to WebSocket'); + _initStream(streamUrl, mode); + } + }, 5000); + + // Also init WebSocket as immediate fallback + _initStream(streamUrl, mode); + } + var _ds = { signal: signal, derived: derived, effect: effect, batch: batch, flush: flush, onEvent: onEvent, emit: emit, keyedList: keyedList, route: _route, navigate: navigate, matchRoute: matchRoute, resource: resource, fetchJSON: fetchJSON, @@ -1722,8 +1836,10 @@ const DS = (() => { scene: scene, circle: circle, rect: rect, line: line, _initStream: _initStream, _streamDiff: _streamDiff, _streamSync: _streamSync, _streamSceneState: _streamSceneState, _connectStream: _connectStream, + _initWebRTC: _initWebRTC, Signal: Signal, Derived: Derived, Effect: Effect, Spring: Spring }; Object.defineProperty(_ds, '_streamWs', { get: function() { return _streamWs; } }); + Object.defineProperty(_ds, '_rtcDc', { get: function() { return _rtcDc; } }); return _ds; })(); "#; diff --git a/compiler/ds-parser/src/ast.rs b/compiler/ds-parser/src/ast.rs index b9b6044..cdbb3cc 100644 --- a/compiler/ds-parser/src/ast.rs +++ b/compiler/ds-parser/src/ast.rs @@ -96,6 +96,7 @@ pub struct StreamDecl { pub view_name: String, pub relay_url: Expr, pub mode: StreamMode, + pub transport: StreamTransport, pub span: Span, } @@ -110,6 +111,16 @@ impl Default for StreamMode { fn default() -> Self { StreamMode::Signal } } +#[derive(Debug, Clone, Copy, PartialEq)] +pub enum StreamTransport { + WebSocket, // default — frames over WebSocket via relay + WebRTC, // peer-to-peer data channels, relay for signaling only +} + +impl Default for StreamTransport { + fn default() -> Self { StreamTransport::WebSocket } +} + /// Function/view parameter. #[derive(Debug, Clone)] pub struct Param { diff --git a/compiler/ds-parser/src/parser.rs b/compiler/ds-parser/src/parser.rs index 80ca411..37cb0b9 100644 --- a/compiler/ds-parser/src/parser.rs +++ b/compiler/ds-parser/src/parser.rs @@ -281,10 +281,12 @@ impl Parser { let relay_url = self.parse_expr()?; - // Optional mode block: `{ mode: signal }` - let mode = if self.check(&TokenKind::LBrace) { + // Optional mode block: `{ mode: signal, transport: webrtc }` + let mut mode = StreamMode::Signal; + let mut transport = StreamTransport::WebSocket; + + if self.check(&TokenKind::LBrace) { self.advance(); // { - let mut mode = StreamMode::Signal; while !self.check(&TokenKind::RBrace) && !self.is_at_end() { self.skip_newlines(); let key = self.expect_ident()?; @@ -295,22 +297,32 @@ impl Parser { TokenKind::Delta => { mode = StreamMode::Delta; self.advance(); } TokenKind::Signals => { mode = StreamMode::Signal; self.advance(); } TokenKind::Ident(s) if s == "signal" => { mode = StreamMode::Signal; self.advance(); } - _ => return Err(self.error("Expected pixel, delta, or signals".into())), + _ => return Err(self.error("Expected pixel, delta, or signal".into())), + } + } else if key == "transport" { + match self.peek() { + TokenKind::Ident(s) if s == "webrtc" || s == "WebRTC" => { + transport = StreamTransport::WebRTC; + self.advance(); + } + TokenKind::Ident(s) if s == "websocket" || s == "ws" => { + transport = StreamTransport::WebSocket; + self.advance(); + } + _ => return Err(self.error("Expected webrtc or websocket".into())), } } if self.check(&TokenKind::Comma) { self.advance(); } self.skip_newlines(); } self.expect(&TokenKind::RBrace)?; - mode - } else { - StreamMode::Signal - }; + } Ok(Declaration::Stream(StreamDecl { view_name, relay_url, mode, + transport, span: Span { start: 0, end: 0, line }, })) } diff --git a/engine/ds-stream/src/relay.rs b/engine/ds-stream/src/relay.rs index b7e37ef..5ca18a3 100644 --- a/engine/ds-stream/src/relay.rs +++ b/engine/ds-stream/src/relay.rs @@ -148,6 +148,8 @@ struct ChannelState { /// Channel: receivers → source (input events) input_tx: mpsc::Sender>, input_rx: Option>>, + /// Broadcast channel for WebRTC signaling (SDP/ICE as text) + signaling_tx: broadcast::Sender, /// Live stats for this channel stats: RelayStats, /// Cached state for late-joining receivers @@ -158,10 +160,12 @@ impl ChannelState { fn new(frame_buffer_size: usize) -> Self { let (frame_tx, _) = broadcast::channel(frame_buffer_size); let (input_tx, input_rx) = mpsc::channel(256); + let (signaling_tx, _) = broadcast::channel(64); Self { frame_tx, input_tx, input_rx: Some(input_rx), + signaling_tx, stats: RelayStats::default(), cache: StateCache::default(), } @@ -199,8 +203,9 @@ impl RelayState { /// Parsed connection role from the WebSocket URI path. #[derive(Debug, Clone)] enum ConnectionRole { - Source(String), // channel name - Receiver(String), // channel name + Source(String), // channel name + Receiver(String), // channel name + Signaling(String), // channel name — WebRTC signaling } /// Parse the WebSocket URI path to determine connection role and channel. @@ -211,6 +216,8 @@ fn parse_path(path: &str) -> ConnectionRole { ["source", name] => ConnectionRole::Source(name.to_string()), ["stream"] => ConnectionRole::Receiver("default".to_string()), ["stream", name] => ConnectionRole::Receiver(name.to_string()), + ["signal"] => ConnectionRole::Signaling("default".to_string()), + ["signal", name] => ConnectionRole::Signaling(name.to_string()), _ => ConnectionRole::Receiver("default".to_string()), // legacy: `/` = receiver } } @@ -219,12 +226,13 @@ fn parse_path(path: &str) -> ConnectionRole { pub async fn run_relay(config: RelayConfig) -> Result<(), Box> { let listener = TcpListener::bind(&config.addr).await?; eprintln!("╔══════════════════════════════════════════════════╗"); - eprintln!("║ DreamStack Bitstream Relay v0.3.0 ║"); + eprintln!("║ DreamStack Bitstream Relay v0.4.0 ║"); eprintln!("║ ║"); eprintln!("║ Source: ws://{}/source/{{name}} ║", config.addr); eprintln!("║ Receiver: ws://{}/stream/{{name}} ║", config.addr); + eprintln!("║ Signal: ws://{}/signal/{{name}} ║", config.addr); eprintln!("║ ║"); - eprintln!("║ Multi-source, keyframe cache, keepalive, RLE ║"); + eprintln!("║ Multi-source, WebRTC signaling, keyframe cache ║"); eprintln!("╚══════════════════════════════════════════════════╝"); let state = Arc::new(RwLock::new(RelayState::new(config.frame_buffer_size))); @@ -296,7 +304,7 @@ async fn handle_connection( let (channel, channel_name) = { let mut s = state.write().await; let name = match &role { - ConnectionRole::Source(n) | ConnectionRole::Receiver(n) => n.clone(), + ConnectionRole::Source(n) | ConnectionRole::Receiver(n) | ConnectionRole::Signaling(n) => n.clone(), }; let ch = s.get_or_create_channel(&name); (ch, name) @@ -324,6 +332,10 @@ async fn handle_connection( eprintln!("[relay:{channel_name}] Receiver connected: {addr}"); handle_receiver(ws_stream, addr, channel, &channel_name).await; } + ConnectionRole::Signaling(ref _name) => { + eprintln!("[relay:{channel_name}] Signaling peer connected: {addr}"); + handle_signaling(ws_stream, addr, channel, &channel_name).await; + } } } @@ -491,6 +503,61 @@ async fn handle_receiver( cs.stats.connected_receivers = cs.stats.connected_receivers.saturating_sub(1); } +/// Handle a WebRTC signaling connection. +/// +/// Signaling peers exchange JSON messages (SDP offers/answers, ICE candidates) +/// over WebSocket. The relay broadcasts all text messages to all other peers +/// on the same channel, enabling peer-to-peer WebRTC setup. +async fn handle_signaling( + ws_stream: tokio_tungstenite::WebSocketStream, + addr: SocketAddr, + channel: Arc>, + channel_name: &str, +) { + let (mut ws_sink, mut ws_source) = ws_stream.split(); + + // Subscribe to signaling broadcast + let mut sig_rx = { + let cs = channel.read().await; + cs.signaling_tx.subscribe() + }; + + let sig_tx = { + let cs = channel.read().await; + cs.signaling_tx.clone() + }; + + // Forward signaling messages from broadcast → this peer + let channel_name_owned = channel_name.to_string(); + let send_task = tokio::spawn(async move { + loop { + match sig_rx.recv().await { + Ok(msg_text) => { + let msg = Message::Text(msg_text.into()); + if ws_sink.send(msg).await.is_err() { + break; + } + } + Err(broadcast::error::RecvError::Lagged(n)) => { + eprintln!("[relay:{channel_name_owned}] Signaling peer lagged by {n} messages"); + } + Err(_) => break, + } + } + }); + + // Forward signaling messages from this peer → broadcast to others + while let Some(Ok(msg)) = ws_source.next().await { + if let Message::Text(text) = msg { + let text_str: String = text.into(); + let _ = sig_tx.send(text_str); + } + } + + send_task.abort(); + eprintln!("[relay:{channel_name}] Signaling peer disconnected: {addr}"); +} + // ─── Tests ─── #[cfg(test)] @@ -620,6 +687,22 @@ mod tests { } } + #[test] + fn parse_path_signal_default() { + match parse_path("/signal") { + ConnectionRole::Signaling(name) => assert_eq!(name, "default"), + _ => panic!("Expected Signaling"), + } + } + + #[test] + fn parse_path_signal_named() { + match parse_path("/signal/main") { + ConnectionRole::Signaling(name) => assert_eq!(name, "main"), + _ => panic!("Expected Signaling"), + } + } + #[test] fn parse_path_legacy_root() { match parse_path("/") { diff --git a/examples/streaming-webrtc.ds b/examples/streaming-webrtc.ds new file mode 100644 index 0000000..040930b --- /dev/null +++ b/examples/streaming-webrtc.ds @@ -0,0 +1,26 @@ +-- DreamStack WebRTC Streaming Counter +-- Same as streaming-counter.ds but uses peer-to-peer WebRTC +-- data channels for sub-frame latency, with WebSocket fallback. +-- +-- Run with: +-- cargo run -p ds-stream & +-- dreamstack stream examples/streaming-webrtc.ds + +let count = 0 +let doubled = count * 2 +let message = "WebRTC Counter" + +stream counter on "ws://localhost:9100/source/webrtc-demo" { mode: signal, transport: webrtc } + +view counter = + column [ + text message + text count + text doubled + row [ + button "-" { click: count -= 1 } + button "+" { click: count += 1 } + ] + when count > 10 -> + text "On fire!" + ]