diff --git a/crates/zap-stream/src/api.rs b/crates/zap-stream/src/api.rs index db733b3..a74d194 100644 --- a/crates/zap-stream/src/api.rs +++ b/crates/zap-stream/src/api.rs @@ -28,6 +28,12 @@ use zap_stream_core::egress::hls::HlsEgress; use zap_stream_core::listen::ListenerEndpoint; use zap_stream_core::overseer::Overseer; use zap_stream_db::ZapStreamDb; +use futures_util::{SinkExt, StreamExt}; +use hyper_tungstenite::{HyperWebsocket, tungstenite::Message}; +use tokio::fs::File; +use tokio::io::{AsyncBufReadExt, BufReader}; +use tokio::time::{interval, Duration}; +use tungstenite::Utf8Bytes; #[derive(Debug, Clone, Copy, PartialEq, Eq)] enum Route { @@ -560,21 +566,80 @@ impl Api { Ok(base.body(Self::body_json(&())?)?) } (&Method::GET, Route::AdminPipelineLog) => { + // Check if this is a WebSocket upgrade request + if hyper_tungstenite::is_upgrade_request(&req) { + // For WebSocket, extract auth token from query parameter + // since browsers can't add Authorization headers to WebSocket connections + let full_url = format!( + "{}{}", + self.settings.public_url.trim_end_matches('/'), + req.uri() + ); + let url: url::Url = full_url.parse()?; + let auth_token = url + .query_pairs() + .find_map(|(k, v)| if k == "auth" { Some(v.to_string()) } else { None }) + .ok_or_else(|| anyhow!("Missing auth query parameter for WebSocket"))?; + + let auth = self.check_nip98_auth_from_token(&auth_token, &url).await?; + let admin_uid = self.check_admin_access(&auth.pubkey).await?; + let stream_id = params + .get("stream_id") + .ok_or_else(|| anyhow!("Missing stream_id"))?; + return self.handle_pipeline_log_websocket(req, admin_uid, stream_id).await; + } + let auth = check_nip98_auth(&req, &self.settings, &self.db).await?; let admin_uid = self.check_admin_access(&auth.pubkey).await?; let stream_id = params .get("stream_id") .ok_or_else(|| anyhow!("Missing stream_id"))?; - let log_content = self.admin_get_pipeline_log(admin_uid, stream_id).await?; - let response = Response::builder() + + // Parse query parameters + let full_url = format!( + "{}{}", + self.settings.public_url.trim_end_matches('/'), + req.uri() + ); + let url: url::Url = full_url.parse()?; + + let tail_lines: Option = url + .query_pairs() + .find_map(|(k, v)| if k == "tail" { Some(v) } else { None }) + .and_then(|v| v.parse().ok()); + + let download: bool = url + .query_pairs() + .find_map(|(k, v)| if k == "download" { Some(v) } else { None }) + .and_then(|v| v.parse().ok()) + .unwrap_or(false); + + let log_content = self + .admin_get_pipeline_log(admin_uid, stream_id, tail_lines, download) + .await?; + + let mut response_builder = Response::builder() .header("server", "zap-stream") - .header("content-type", "text/plain; charset=utf-8") .header("access-control-allow-origin", "*") .header("access-control-allow-headers", "*") .header( "access-control-allow-methods", "HEAD, GET, PATCH, DELETE, POST, OPTIONS", - ) + ); + + if download { + response_builder = response_builder + .header("content-type", "text/plain; charset=utf-8") + .header( + "content-disposition", + format!("attachment; filename=\"pipeline-{}.log\"", stream_id), + ); + } else { + response_builder = response_builder + .header("content-type", "text/plain; charset=utf-8"); + } + + let response = response_builder .body(Full::from(log_content) .map_err(|e| match e {}) .boxed())?; @@ -1043,6 +1108,26 @@ impl Api { Ok(uid) } + /// Authenticate using a base64 NIP-98 token from query parameter + /// Used for WebSocket connections where Authorization header cannot be set + async fn check_nip98_auth_from_token( + &self, + token: &str, + expected_url: &url::Url, + ) -> Result { + use crate::auth::{AuthRequest, TokenSource, authenticate_nip98}; + + let auth_request = AuthRequest { + token_source: TokenSource::WebSocketToken(token.to_string()), + expected_url: expected_url.clone(), + expected_method: "GET".to_string(), + skip_url_check: self.settings.ignore_auth_url.unwrap_or(false), + admin_pubkey: self.settings.admin_pubkey.clone(), + }; + + authenticate_nip98(auth_request, &self.db).await + } + async fn delete_stream(&self, pubkey: &PublicKey, stream_id: &str) -> Result<()> { let uid = self.db.upsert_user(&pubkey.to_bytes()).await?; let stream_uuid = Uuid::parse_str(stream_id)?; @@ -1611,7 +1696,13 @@ impl Api { Ok(()) } - async fn admin_get_pipeline_log(&self, admin_uid: u64, stream_id: &str) -> Result { + async fn admin_get_pipeline_log( + &self, + admin_uid: u64, + stream_id: &str, + tail_lines: Option, + download: bool, + ) -> Result { use tokio::fs; // Validate stream_id is a valid UUID to prevent path traversal attacks @@ -1626,7 +1717,22 @@ impl Api { // Try to read the log file let log_content = match fs::read_to_string(&log_path).await { - Ok(content) => content, + Ok(content) => { + if download { + // Return entire file for download + content + } else { + // Return last N lines (default 200) + let lines_to_return = tail_lines.unwrap_or(200); + let lines: Vec<&str> = content.lines().collect(); + let start_index = if lines.len() > lines_to_return { + lines.len() - lines_to_return + } else { + 0 + }; + lines[start_index..].join("\n") + } + } Err(e) if e.kind() == std::io::ErrorKind::NotFound => { // Return helpful message if file doesn't exist String::from("Pipeline log file not found. This may be because the stream has not been started yet or the stream ID is invalid.") @@ -1638,19 +1744,202 @@ impl Api { }; // Log admin action + let action_desc = if download { + "downloaded" + } else { + "viewed" + }; self.db .log_admin_action( admin_uid, "view_pipeline_log", Some("stream"), Some(stream_id), - &format!("Admin viewed pipeline log for stream {}", stream_id), + &format!("Admin {} pipeline log for stream {}", action_desc, stream_id), None, ) .await?; Ok(log_content) } + + async fn handle_pipeline_log_websocket( + &self, + req: Request, + admin_uid: u64, + stream_id: &str, + ) -> Result>> { + // Validate stream_id is a valid UUID to prevent path traversal attacks + let stream_uuid = Uuid::parse_str(stream_id) + .context("Invalid stream_id format, must be a valid UUID")?; + + // Construct path to pipeline.log in stream's output directory + let log_path = std::path::Path::new(&self.settings.output_dir) + .join(stream_uuid.to_string()) + .join("pipeline.log"); + + // Upgrade the connection to WebSocket + let (response, websocket) = hyper_tungstenite::upgrade(req, None)?; + + // Log admin action + let db = self.db.clone(); + let stream_id_str = stream_id.to_string(); + self.db + .log_admin_action( + admin_uid, + "tail_pipeline_log", + Some("stream"), + Some(stream_id), + &format!("Admin started tailing pipeline log for stream {}", stream_id), + None, + ) + .await?; + + // Spawn a task to handle the WebSocket connection + tokio::spawn(async move { + if let Err(e) = + Self::handle_log_tail_websocket(websocket, log_path, stream_id_str).await + { + error!("Pipeline log WebSocket error: {}", e); + } + }); + + Ok(response.map(|body| body.map_err(|e| anyhow::anyhow!("{}", e)).boxed())) + } + + async fn send_ws_error_and_close( + ws_sender: &mut futures_util::stream::SplitSink< + hyper_tungstenite::WebSocketStream>, + Message, + >, + msg: &str, + ) -> Result<()> { + ws_sender + .send(Message::Text(Utf8Bytes::from(msg))) + .await?; + ws_sender.send(Message::Close(None)).await?; + Ok(()) + } + + async fn handle_log_tail_websocket( + websocket: HyperWebsocket, + log_path: std::path::PathBuf, + stream_id: String, + ) -> Result<()> { + let ws_stream = websocket.await?; + let (mut ws_sender, mut ws_receiver) = ws_stream.split(); + + info!( + "WebSocket connection established for pipeline log tailing: stream={}", + stream_id + ); + + // Open the log file + let file = match File::open(&log_path).await { + Ok(f) => f, + Err(e) if e.kind() == std::io::ErrorKind::NotFound => { + Self::send_ws_error_and_close( + &mut ws_sender, + "Pipeline log file not found. This may be because the stream has not been started yet or the stream ID is invalid." + ).await?; + return Ok(()); + } + Err(e) => { + let msg = format!("Failed to open pipeline log: {}", e); + Self::send_ws_error_and_close(&mut ws_sender, &msg).await?; + return Err(anyhow!(msg)); + } + }; + + // Send existing log content first (last 200 lines) using the same file handle + // Note: This reads the entire file into memory to get the last N lines. + // For typical pipeline logs (< 100MB), this is acceptable and provides good UX. + // Alternative approaches (seeking from end, circular buffer) would be more complex + // and wouldn't improve the common case significantly. + let mut reader = BufReader::new(file); + let mut line = String::new(); + let mut lines_buffer = Vec::new(); + + // Read all lines into a buffer + loop { + line.clear(); + match reader.read_line(&mut line).await { + Ok(0) => break, // EOF + Ok(_) => { + lines_buffer.push(line.clone()); + } + Err(e) => { + warn!("Error reading log file during initial load: {}", e); + break; + } + } + } + + // Send last 200 lines + let start_index = if lines_buffer.len() > 200 { + lines_buffer.len() - 200 + } else { + 0 + }; + + if !lines_buffer.is_empty() { + let existing_content = lines_buffer[start_index..].join(""); + ws_sender + .send(Message::Text(Utf8Bytes::from(&existing_content))) + .await?; + } + + // Now tail new lines + let mut check_interval = interval(Duration::from_millis(100)); + + loop { + tokio::select! { + // Check for new lines in the log file + _ = check_interval.tick() => { + loop { + line.clear(); + match reader.read_line(&mut line).await { + Ok(0) => break, // EOF + Ok(_) => { + if !line.is_empty() { + if let Err(e) = ws_sender + .send(Message::Text(Utf8Bytes::from(&line))) + .await + { + error!("Failed to send log line: {}", e); + return Ok(()); + } + } + } + Err(e) => { + error!("Error reading log file: {}", e); + return Err(anyhow!(e)); + } + } + } + } + + // Handle incoming WebSocket messages (for close/ping/pong) + msg = ws_receiver.next() => { + match msg { + Some(Ok(Message::Close(_))) => { + info!("WebSocket connection closed by client: stream={}", stream_id); + return Ok(()); + } + Some(Err(e)) => { + error!("WebSocket error: {}", e); + return Err(anyhow!(e)); + } + None => { + info!("WebSocket connection ended: stream={}", stream_id); + return Ok(()); + } + _ => {} + } + } + } + } + } } impl HttpServerPlugin for Api {