Skip to content
Draft
303 changes: 296 additions & 7 deletions crates/zap-stream/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ use zap_stream_core::egress::hls::HlsEgress;
use zap_stream_core::listen::ListenerEndpoint;
use zap_stream_core::overseer::Overseer;
use zap_stream_db::ZapStreamDb;
use futures_util::{SinkExt, StreamExt};
use hyper_tungstenite::{HyperWebsocket, tungstenite::Message};
use tokio::fs::File;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::time::{interval, Duration};
use tungstenite::Utf8Bytes;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Route {
Expand Down Expand Up @@ -560,21 +566,80 @@ impl Api {
Ok(base.body(Self::body_json(&())?)?)
}
(&Method::GET, Route::AdminPipelineLog) => {
// Check if this is a WebSocket upgrade request
if hyper_tungstenite::is_upgrade_request(&req) {
// For WebSocket, extract auth token from query parameter
// since browsers can't add Authorization headers to WebSocket connections
let full_url = format!(
"{}{}",
self.settings.public_url.trim_end_matches('/'),
req.uri()
);
let url: url::Url = full_url.parse()?;
let auth_token = url
.query_pairs()
.find_map(|(k, v)| if k == "auth" { Some(v.to_string()) } else { None })
.ok_or_else(|| anyhow!("Missing auth query parameter for WebSocket"))?;

let auth = self.check_nip98_auth_from_token(&auth_token, &url).await?;
let admin_uid = self.check_admin_access(&auth.pubkey).await?;
let stream_id = params
.get("stream_id")
.ok_or_else(|| anyhow!("Missing stream_id"))?;
return self.handle_pipeline_log_websocket(req, admin_uid, stream_id).await;
}

let auth = check_nip98_auth(&req, &self.settings, &self.db).await?;
let admin_uid = self.check_admin_access(&auth.pubkey).await?;
let stream_id = params
.get("stream_id")
.ok_or_else(|| anyhow!("Missing stream_id"))?;
let log_content = self.admin_get_pipeline_log(admin_uid, stream_id).await?;
let response = Response::builder()

// Parse query parameters
let full_url = format!(
"{}{}",
self.settings.public_url.trim_end_matches('/'),
req.uri()
);
let url: url::Url = full_url.parse()?;

let tail_lines: Option<usize> = url
.query_pairs()
.find_map(|(k, v)| if k == "tail" { Some(v) } else { None })
.and_then(|v| v.parse().ok());

let download: bool = url
.query_pairs()
.find_map(|(k, v)| if k == "download" { Some(v) } else { None })
.and_then(|v| v.parse().ok())
.unwrap_or(false);

let log_content = self
.admin_get_pipeline_log(admin_uid, stream_id, tail_lines, download)
.await?;

let mut response_builder = Response::builder()
.header("server", "zap-stream")
.header("content-type", "text/plain; charset=utf-8")
.header("access-control-allow-origin", "*")
.header("access-control-allow-headers", "*")
.header(
"access-control-allow-methods",
"HEAD, GET, PATCH, DELETE, POST, OPTIONS",
)
);

if download {
response_builder = response_builder
.header("content-type", "text/plain; charset=utf-8")
.header(
"content-disposition",
format!("attachment; filename=\"pipeline-{}.log\"", stream_id),
);
} else {
response_builder = response_builder
.header("content-type", "text/plain; charset=utf-8");
}

let response = response_builder
.body(Full::from(log_content)
.map_err(|e| match e {})
.boxed())?;
Expand Down Expand Up @@ -1043,6 +1108,26 @@ impl Api {
Ok(uid)
}

/// Authenticate using a base64 NIP-98 token from query parameter
/// Used for WebSocket connections where Authorization header cannot be set
async fn check_nip98_auth_from_token(
&self,
token: &str,
expected_url: &url::Url,
) -> Result<crate::auth::AuthResult> {
use crate::auth::{AuthRequest, TokenSource, authenticate_nip98};

let auth_request = AuthRequest {
token_source: TokenSource::WebSocketToken(token.to_string()),
expected_url: expected_url.clone(),
expected_method: "GET".to_string(),
skip_url_check: self.settings.ignore_auth_url.unwrap_or(false),
admin_pubkey: self.settings.admin_pubkey.clone(),
};

authenticate_nip98(auth_request, &self.db).await
}

async fn delete_stream(&self, pubkey: &PublicKey, stream_id: &str) -> Result<()> {
let uid = self.db.upsert_user(&pubkey.to_bytes()).await?;
let stream_uuid = Uuid::parse_str(stream_id)?;
Expand Down Expand Up @@ -1611,7 +1696,13 @@ impl Api {
Ok(())
}

async fn admin_get_pipeline_log(&self, admin_uid: u64, stream_id: &str) -> Result<String> {
async fn admin_get_pipeline_log(
&self,
admin_uid: u64,
stream_id: &str,
tail_lines: Option<usize>,
download: bool,
) -> Result<String> {
use tokio::fs;

// Validate stream_id is a valid UUID to prevent path traversal attacks
Expand All @@ -1626,7 +1717,22 @@ impl Api {

// Try to read the log file
let log_content = match fs::read_to_string(&log_path).await {
Ok(content) => content,
Ok(content) => {
if download {
// Return entire file for download
content
} else {
// Return last N lines (default 200)
let lines_to_return = tail_lines.unwrap_or(200);
let lines: Vec<&str> = content.lines().collect();
let start_index = if lines.len() > lines_to_return {
lines.len() - lines_to_return
} else {
0
};
lines[start_index..].join("\n")
}
}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
// Return helpful message if file doesn't exist
String::from("Pipeline log file not found. This may be because the stream has not been started yet or the stream ID is invalid.")
Expand All @@ -1638,19 +1744,202 @@ impl Api {
};

// Log admin action
let action_desc = if download {
"downloaded"
} else {
"viewed"
};
self.db
.log_admin_action(
admin_uid,
"view_pipeline_log",
Some("stream"),
Some(stream_id),
&format!("Admin viewed pipeline log for stream {}", stream_id),
&format!("Admin {} pipeline log for stream {}", action_desc, stream_id),
None,
)
.await?;

Ok(log_content)
}

async fn handle_pipeline_log_websocket(
&self,
req: Request<Incoming>,
admin_uid: u64,
stream_id: &str,
) -> Result<Response<BoxBody<Bytes, anyhow::Error>>> {
// Validate stream_id is a valid UUID to prevent path traversal attacks
let stream_uuid = Uuid::parse_str(stream_id)
.context("Invalid stream_id format, must be a valid UUID")?;

// Construct path to pipeline.log in stream's output directory
let log_path = std::path::Path::new(&self.settings.output_dir)
.join(stream_uuid.to_string())
.join("pipeline.log");

// Upgrade the connection to WebSocket
let (response, websocket) = hyper_tungstenite::upgrade(req, None)?;

// Log admin action
let db = self.db.clone();
let stream_id_str = stream_id.to_string();
self.db
.log_admin_action(
admin_uid,
"tail_pipeline_log",
Some("stream"),
Some(stream_id),
&format!("Admin started tailing pipeline log for stream {}", stream_id),
None,
)
.await?;

// Spawn a task to handle the WebSocket connection
tokio::spawn(async move {
if let Err(e) =
Self::handle_log_tail_websocket(websocket, log_path, stream_id_str).await
{
error!("Pipeline log WebSocket error: {}", e);
}
});

Ok(response.map(|body| body.map_err(|e| anyhow::anyhow!("{}", e)).boxed()))
}

async fn send_ws_error_and_close(
ws_sender: &mut futures_util::stream::SplitSink<
hyper_tungstenite::WebSocketStream<hyper_util::rt::TokioIo<hyper::upgrade::Upgraded>>,
Message,
>,
msg: &str,
) -> Result<()> {
ws_sender
.send(Message::Text(Utf8Bytes::from(msg)))
.await?;
ws_sender.send(Message::Close(None)).await?;
Ok(())
}

async fn handle_log_tail_websocket(
websocket: HyperWebsocket,
log_path: std::path::PathBuf,
stream_id: String,
) -> Result<()> {
let ws_stream = websocket.await?;
let (mut ws_sender, mut ws_receiver) = ws_stream.split();

info!(
"WebSocket connection established for pipeline log tailing: stream={}",
stream_id
);

// Open the log file
let file = match File::open(&log_path).await {
Ok(f) => f,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
Self::send_ws_error_and_close(
&mut ws_sender,
"Pipeline log file not found. This may be because the stream has not been started yet or the stream ID is invalid."
).await?;
return Ok(());
}
Err(e) => {
let msg = format!("Failed to open pipeline log: {}", e);
Self::send_ws_error_and_close(&mut ws_sender, &msg).await?;
return Err(anyhow!(msg));
}
};

// Send existing log content first (last 200 lines) using the same file handle
// Note: This reads the entire file into memory to get the last N lines.
// For typical pipeline logs (< 100MB), this is acceptable and provides good UX.
// Alternative approaches (seeking from end, circular buffer) would be more complex
// and wouldn't improve the common case significantly.
let mut reader = BufReader::new(file);
let mut line = String::new();
let mut lines_buffer = Vec::new();

// Read all lines into a buffer
loop {
line.clear();
match reader.read_line(&mut line).await {
Ok(0) => break, // EOF
Ok(_) => {
lines_buffer.push(line.clone());
}
Err(e) => {
warn!("Error reading log file during initial load: {}", e);
break;
}
}
}

// Send last 200 lines
let start_index = if lines_buffer.len() > 200 {
lines_buffer.len() - 200
} else {
0
};

if !lines_buffer.is_empty() {
let existing_content = lines_buffer[start_index..].join("");
ws_sender
.send(Message::Text(Utf8Bytes::from(&existing_content)))
.await?;
}

// Now tail new lines
let mut check_interval = interval(Duration::from_millis(100));

loop {
tokio::select! {
// Check for new lines in the log file
_ = check_interval.tick() => {
loop {
line.clear();
match reader.read_line(&mut line).await {
Ok(0) => break, // EOF
Ok(_) => {
if !line.is_empty() {
if let Err(e) = ws_sender
.send(Message::Text(Utf8Bytes::from(&line)))
.await
{
error!("Failed to send log line: {}", e);
return Ok(());
}
}
}
Err(e) => {
error!("Error reading log file: {}", e);
return Err(anyhow!(e));
}
}
}
}

// Handle incoming WebSocket messages (for close/ping/pong)
msg = ws_receiver.next() => {
match msg {
Some(Ok(Message::Close(_))) => {
info!("WebSocket connection closed by client: stream={}", stream_id);
return Ok(());
}
Some(Err(e)) => {
error!("WebSocket error: {}", e);
return Err(anyhow!(e));
}
None => {
info!("WebSocket connection ended: stream={}", stream_id);
return Ok(());
}
_ => {}
}
}
}
}
}
}

impl HttpServerPlugin for Api {
Expand Down