From c36c22afd5c0f533ff651140da1d9e618abdbaed Mon Sep 17 00:00:00 2001 From: Kefu Chai Date: Fri, 21 Nov 2025 10:10:48 +0800 Subject: [PATCH] fix(s3s/http): prevent unbounded memory allocation in POST object (#370) Add defense-in-depth limits to multipart/form-data parsing for POST object to prevent DoS attacks via oversized uploads: - MAX_FORM_FIELD_SIZE: 1 MB per field (matching MinIO) - MAX_FORM_FIELDS_SIZE: 20 MB total for all form fields - MAX_FORM_PARTS: 1000 maximum parts - MAX_POST_OBJECT_FILE_SIZE: 5 GB for file content (matching S3 single PUT limit) Why buffering instead of streaming: A pure streaming solution using FileStream was initially attempted, but it breaks compatibility with s3s-proxy and other consumers that use the AWS SDK. The AWS SDK requires a known Content-Length to compute SHA-256 checksums for SigV4 request signing. Since FileStream returns an unknown remaining length, the SDK throws UnsizedRequestBody errors. MinIO can accept streaming POST uploads directly because its HTTP server parses multipart boundaries at the protocol level without needing size upfront. However, when requests are proxied through s3s-proxy using the AWS SDK, the SDK's cryptographic signing algorithm requires the body size before transmission. The buffering approach with size limits provides: 1. Protection against unbounded memory allocation (DoS mitigation) 2. Compatibility with AWS SDK-based consumers (s3s-proxy, etc.) 3. Known content length for downstream request signing Fixes #370 Signed-off-by: Kefu Chai --- codegen/src/v1/ops.rs | 3 +- crates/s3s/src/http/multipart.rs | 197 ++++++++++++++++++++++++++++++- crates/s3s/src/ops/mod.rs | 10 +- crates/s3s/src/service.rs | 4 +- crates/s3s/src/stream.rs | 15 +-- 5 files changed, 206 insertions(+), 23 deletions(-) diff --git a/codegen/src/v1/ops.rs b/codegen/src/v1/ops.rs index d2e6260c..86f319fa 100644 --- a/codegen/src/v1/ops.rs +++ b/codegen/src/v1/ops.rs @@ -609,7 +609,8 @@ fn codegen_op_http_de_multipart(op: &Operation, rust_types: &RustTypes) { "", "let vec_stream = req.s3ext.vec_stream.take().expect(\"missing vec stream\");", "", - "let content_length = i64::try_from(vec_stream.exact_remaining_length()).map_err(|e|s3_error!(e, InvalidArgument, \"content-length overflow\"))?;", + "let content_length = i64::try_from(vec_stream.exact_remaining_length())", + " .map_err(|e| s3_error!(e, InvalidArgument, \"content-length overflow\"))?;", "let content_length = (content_length != 0).then_some(content_length);", "", "let body: Option = Some(StreamingBlob::new(vec_stream));", diff --git a/crates/s3s/src/http/multipart.rs b/crates/s3s/src/http/multipart.rs index 1951e663..1104cfad 100644 --- a/crates/s3s/src/http/multipart.rs +++ b/crates/s3s/src/http/multipart.rs @@ -16,6 +16,23 @@ use hyper::body::Bytes; use memchr::memchr_iter; use transform_stream::{AsyncTryStream, Yielder}; +/// Maximum size per form field (1 MB) +/// This prevents `DoS` attacks via oversized individual fields +const MAX_FORM_FIELD_SIZE: usize = 1024 * 1024; + +/// Maximum total size for all form fields combined (20 MB) +/// This prevents `DoS` attacks via accumulation of many fields +const MAX_FORM_FIELDS_SIZE: usize = 20 * 1024 * 1024; + +/// Maximum number of parts in multipart form +/// This prevents `DoS` attacks via excessive part count +const MAX_FORM_PARTS: usize = 1000; + +/// Maximum file size for POST object (5 GB - S3 limit for single PUT) +/// This prevents `DoS` attacks via oversized file uploads +/// Note: S3 has a 5GB limit for single PUT object, so this is a reasonable default +pub const MAX_POST_OBJECT_FILE_SIZE: u64 = 5 * 1024 * 1024 * 1024; + /// Form file #[derive(Debug)] pub struct File { @@ -68,6 +85,32 @@ pub enum MultipartError { Underlying(StdError), #[error("MultipartError: InvalidFormat")] InvalidFormat, + #[error("MultipartError: FieldTooLarge: field size {0} bytes exceeds limit of {1} bytes")] + FieldTooLarge(usize, usize), + #[error("MultipartError: TotalSizeTooLarge: total form fields size {0} bytes exceeds limit of {1} bytes")] + TotalSizeTooLarge(usize, usize), + #[error("MultipartError: TooManyParts: part count {0} exceeds limit of {1}")] + TooManyParts(usize, usize), + #[error("MultipartError: FileTooLarge: file size {0} bytes exceeds limit of {1} bytes")] + FileTooLarge(u64, u64), +} + +/// Aggregates a file stream into a Vec with a size limit. +/// Returns error if the total size exceeds the limit. +pub async fn aggregate_file_stream_limited(mut stream: FileStream, max_size: u64) -> Result, MultipartError> { + use futures::stream::StreamExt; + let mut vec = Vec::new(); + let mut total_size: u64 = 0; + + while let Some(result) = stream.next().await { + let bytes = result.map_err(|e| MultipartError::Underlying(Box::new(e)))?; + total_size = total_size.saturating_add(bytes.len() as u64); + if total_size > max_size { + return Err(MultipartError::FileTooLarge(total_size, max_size)); + } + vec.push(bytes); + } + Ok(vec) } /// transform multipart @@ -90,17 +133,29 @@ where }; let mut fields = Vec::new(); + let mut total_fields_size: usize = 0; + let mut parts_count: usize = 0; loop { // copy bytes to buf match body.as_mut().next().await { None => return Err(MultipartError::InvalidFormat), Some(Err(e)) => return Err(MultipartError::Underlying(e)), - Some(Ok(bytes)) => buf.extend_from_slice(&bytes), + Some(Ok(bytes)) => { + // Check if adding these bytes would exceed reasonable buffer size + // We use MAX_FORM_FIELDS_SIZE as the limit for the parsing buffer + if buf.len().saturating_add(bytes.len()) > MAX_FORM_FIELDS_SIZE { + return Err(MultipartError::TotalSizeTooLarge( + buf.len().saturating_add(bytes.len()), + MAX_FORM_FIELDS_SIZE, + )); + } + buf.extend_from_slice(&bytes); + } } // try to parse - match try_parse(body, pat, &buf, &mut fields, boundary) { + match try_parse(body, pat, &buf, &mut fields, boundary, &mut total_fields_size, &mut parts_count) { Err((b, p)) => { body = b; pat = p; @@ -118,6 +173,8 @@ fn try_parse( buf: &'_ [u8], fields: &'_ mut Vec<(String, String)>, boundary: &'_ [u8], + total_fields_size: &'_ mut usize, + parts_count: &'_ mut usize, ) -> Result, (Pin>, Box<[u8]>)> where S: Stream> + Send + Sync + 'static, @@ -126,6 +183,9 @@ where let pat_without_crlf = &pat[..pat.len().wrapping_sub(2)]; fields.clear(); + // Reset counters since we're re-parsing from scratch + *total_fields_size = 0; + *parts_count = 0; let mut lines = CrlfLines { slice: buf }; @@ -152,6 +212,12 @@ where let mut headers = [httparse::EMPTY_HEADER; 2]; loop { + // Check parts count limit + *parts_count += 1; + if *parts_count > MAX_FORM_PARTS { + return Ok(Err(MultipartError::TooManyParts(*parts_count, MAX_FORM_PARTS))); + } + let (idx, parsed_headers) = match httparse::parse_headers(lines.slice, &mut headers) { Ok(httparse::Status::Complete(ans)) => ans, Ok(_) => return Err((body, pat)), @@ -184,6 +250,17 @@ where #[allow(clippy::indexing_slicing)] let b = &b[..b.len().saturating_sub(2)]; + // Check per-field size limit + if b.len() > MAX_FORM_FIELD_SIZE { + return Ok(Err(MultipartError::FieldTooLarge(b.len(), MAX_FORM_FIELD_SIZE))); + } + + // Check total fields size limit + *total_fields_size = total_fields_size.saturating_add(b.len()); + if *total_fields_size > MAX_FORM_FIELDS_SIZE { + return Ok(Err(MultipartError::TotalSizeTooLarge(*total_fields_size, MAX_FORM_FIELDS_SIZE))); + } + match std::str::from_utf8(b) { Err(_) => return Ok(Err(MultipartError::InvalidFormat)), Ok(s) => s, @@ -663,4 +740,120 @@ mod tests { assert_eq!(file_bytes, file_content); } } + + #[tokio::test] + async fn test_field_too_large() { + let boundary = "boundary123"; + + // Create a field value that exceeds MAX_FORM_FIELD_SIZE (1 MB) + // This will exceed MAX_FORM_FIELD_SIZE and trigger FieldTooLarge error + let field_size = MAX_FORM_FIELD_SIZE + 1000; // Just over 1 MB + let large_value = "x".repeat(field_size); + + let body_bytes = vec![ + Bytes::from(format!("--{boundary}\r\n")), + Bytes::from("Content-Disposition: form-data; name=\"large_field\"\r\n\r\n"), + Bytes::from(large_value), + Bytes::from(format!("\r\n--{boundary}--\r\n")), + ]; + + let body_stream = futures::stream::iter(body_bytes.into_iter().map(Ok::<_, StdError>)); + + let result = transform_multipart(body_stream, boundary.as_bytes()).await; + // Either error is acceptable - both indicate the field/buffer is too large + assert!(result.is_err(), "Should fail when field exceeds limits"); + } + + #[tokio::test] + async fn test_total_size_too_large() { + let boundary = "boundary123"; + + // Create multiple fields that together exceed MAX_FORM_FIELDS_SIZE (20 MB) + let field_size = MAX_FORM_FIELD_SIZE; // 1 MB per field + let num_fields = 21; // 21 fields = 21 MB > 20 MB limit + + let mut body_bytes = Vec::new(); + + for i in 0..num_fields { + body_bytes.push(format!("--{boundary}\r\n")); + body_bytes.push(format!("Content-Disposition: form-data; name=\"field{i}\"\r\n\r\n")); + body_bytes.push("x".repeat(field_size)); + body_bytes.push("\r\n".to_string()); + } + body_bytes.push(format!("--{boundary}--\r\n")); + + let body_stream = futures::stream::iter(body_bytes.into_iter().map(|s| Ok::<_, StdError>(Bytes::from(s)))); + + let result = transform_multipart(body_stream, boundary.as_bytes()).await; + match result { + Err(MultipartError::TotalSizeTooLarge(size, limit)) => { + assert_eq!(limit, MAX_FORM_FIELDS_SIZE); + assert!(size > MAX_FORM_FIELDS_SIZE); + } + _ => panic!("Expected TotalSizeTooLarge error"), + } + } + + #[tokio::test] + async fn test_too_many_parts() { + let boundary = "boundary123"; + + // Create more parts than MAX_FORM_PARTS (1000) + let num_parts = MAX_FORM_PARTS + 1; + + let mut body_bytes = Vec::new(); + + for i in 0..num_parts { + body_bytes.push(format!("--{boundary}\r\n")); + body_bytes.push(format!("Content-Disposition: form-data; name=\"field{i}\"\r\n\r\n")); + body_bytes.push("value".to_string()); + body_bytes.push("\r\n".to_string()); + } + body_bytes.push(format!("--{boundary}--\r\n")); + + let body_stream = futures::stream::iter(body_bytes.into_iter().map(|s| Ok::<_, StdError>(Bytes::from(s)))); + + let result = transform_multipart(body_stream, boundary.as_bytes()).await; + match result { + Err(MultipartError::TooManyParts(count, limit)) => { + assert_eq!(limit, MAX_FORM_PARTS); + assert!(count > MAX_FORM_PARTS); + } + _ => panic!("Expected TooManyParts error"), + } + } + + #[tokio::test] + async fn test_limits_within_bounds() { + let boundary = "boundary123"; + + // Create fields within limits + let field_count = 10; + let field_size = 100; // Small fields + + let mut body_bytes = Vec::new(); + + for i in 0..field_count { + body_bytes.push(format!("--{boundary}\r\n")); + body_bytes.push(format!("Content-Disposition: form-data; name=\"field{i}\"\r\n\r\n")); + body_bytes.push("x".repeat(field_size)); + body_bytes.push("\r\n".to_string()); + } + + // Add a file + body_bytes.push(format!("--{boundary}\r\n")); + body_bytes.push("Content-Disposition: form-data; name=\"file\"; filename=\"test.txt\"\r\n".to_string()); + body_bytes.push("Content-Type: text/plain\r\n\r\n".to_string()); + body_bytes.push("file content".to_string()); + body_bytes.push(format!("\r\n--{boundary}--\r\n")); + + let body_stream = futures::stream::iter(body_bytes.into_iter().map(|s| Ok::<_, StdError>(Bytes::from(s)))); + + let result = transform_multipart(body_stream, boundary.as_bytes()).await; + assert!(result.is_ok(), "Should succeed when within limits"); + + let multipart = result.unwrap(); + assert_eq!(multipart.fields().len(), field_count); + assert!(multipart.file.stream.is_some()); + } } diff --git a/crates/s3s/src/ops/mod.rs b/crates/s3s/src/ops/mod.rs index 180db387..b9d8ed54 100644 --- a/crates/s3s/src/ops/mod.rs +++ b/crates/s3s/src/ops/mod.rs @@ -31,8 +31,6 @@ use crate::path::{ParseS3PathError, S3Path}; use crate::protocol::S3Request; use crate::route::S3Route; use crate::s3_trait::S3; -use crate::stream::VecByteStream; -use crate::stream::aggregate_unlimited; use crate::validation::{AwsNameValidation, NameValidation}; use std::mem; @@ -386,8 +384,12 @@ async fn prepare(req: &mut Request, ccx: &CallContext<'_>) -> S3Result // POST object debug!(?multipart); let file_stream = multipart.take_file_stream().expect("missing file stream"); - let vec_bytes = aggregate_unlimited(file_stream).await.map_err(S3Error::internal_error)?; - let vec_stream = VecByteStream::new(vec_bytes); + // Aggregate file stream with size limit to get known length + // This is required because downstream handlers (like s3s-proxy) need content-length + let vec_bytes = http::aggregate_file_stream_limited(file_stream, http::MAX_POST_OBJECT_FILE_SIZE) + .await + .map_err(|e| invalid_request!(e, "failed to read file stream"))?; + let vec_stream = crate::stream::VecByteStream::new(vec_bytes); req.s3ext.vec_stream = Some(vec_stream); break 'resolve (&PutObject as &'static dyn Operation, false); } diff --git a/crates/s3s/src/service.rs b/crates/s3s/src/service.rs index 905189e3..1fe7638a 100644 --- a/crates/s3s/src/service.rs +++ b/crates/s3s/src/service.rs @@ -210,8 +210,8 @@ mod tests { // In case the futures are made too large accidentally assert!(output_size(&crate::ops::call) <= 1600); - assert!(output_size(&S3Service::call) <= 2900); - assert!(output_size(&S3Service::call_owned) <= 3200); + assert!(output_size(&S3Service::call) <= 3000); + assert!(output_size(&S3Service::call_owned) <= 3300); } // Test validation functionality diff --git a/crates/s3s/src/stream.rs b/crates/s3s/src/stream.rs index df7c3aff..90af949f 100644 --- a/crates/s3s/src/stream.rs +++ b/crates/s3s/src/stream.rs @@ -6,7 +6,7 @@ use std::pin::Pin; use std::task::{Context, Poll}; use bytes::Bytes; -use futures::{Stream, StreamExt, pin_mut}; +use futures::Stream; pub trait ByteStream: Stream { fn remaining_length(&self) -> RemainingLength { @@ -132,19 +132,6 @@ where } } -// FIXME: unbounded memory allocation -pub(crate) async fn aggregate_unlimited(stream: S) -> Result, E> -where - S: ByteStream>, -{ - let mut vec = Vec::new(); - pin_mut!(stream); - while let Some(result) = stream.next().await { - vec.push(result?); - } - Ok(vec) -} - pub(crate) struct VecByteStream { queue: VecDeque, remaining_bytes: usize,