3 changes: 2 additions & 1 deletion codegen/src/v1/ops.rs
@@ -609,7 +609,8 @@ fn codegen_op_http_de_multipart(op: &Operation, rust_types: &RustTypes) {
"",
"let vec_stream = req.s3ext.vec_stream.take().expect(\"missing vec stream\");",
"",
"let content_length = i64::try_from(vec_stream.exact_remaining_length()).map_err(|e|s3_error!(e, InvalidArgument, \"content-length overflow\"))?;",
"let content_length = i64::try_from(vec_stream.exact_remaining_length())",
" .map_err(|e| s3_error!(e, InvalidArgument, \"content-length overflow\"))?;",
"let content_length = (content_length != 0).then_some(content_length);",
"",
"let body: Option<StreamingBlob> = Some(StreamingBlob::new(vec_stream));",
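The generated handler above derives Content-Length from the aggregated body. A minimal sketch of that derivation, assuming only that the aggregated body is a slice of Bytes chunks; the helper name is illustrative, not part of the PR:

use bytes::Bytes;

// Sketch only: mirrors what the generated code computes from
// `vec_stream.exact_remaining_length()` and `then_some` above.
fn content_length_of(chunks: &[Bytes]) -> Option<i64> {
    let total: u64 = chunks.iter().map(|b| b.len() as u64).sum();
    // Checked conversion, as in the generated `i64::try_from` call.
    let len = i64::try_from(total).ok()?;
    // Zero-length bodies map to None, matching `(content_length != 0).then_some(...)`.
    (len != 0).then_some(len)
}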
197 changes: 195 additions & 2 deletions crates/s3s/src/http/multipart.rs
@@ -16,6 +16,23 @@ use hyper::body::Bytes;
use memchr::memchr_iter;
use transform_stream::{AsyncTryStream, Yielder};

/// Maximum size per form field (1 MB)
/// This prevents `DoS` attacks via oversized individual fields
const MAX_FORM_FIELD_SIZE: usize = 1024 * 1024;

/// Maximum total size for all form fields combined (20 MB)
/// This prevents `DoS` attacks via accumulation of many fields
const MAX_FORM_FIELDS_SIZE: usize = 20 * 1024 * 1024;

/// Maximum number of parts in multipart form
/// This prevents `DoS` attacks via excessive part count
const MAX_FORM_PARTS: usize = 1000;

/// Maximum file size for POST object (5 GB)
/// This prevents `DoS` attacks via oversized file uploads
/// Note: S3 caps a single PUT object at 5 GB, so this is a reasonable default
pub const MAX_POST_OBJECT_FILE_SIZE: u64 = 5 * 1024 * 1024 * 1024;
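A quick sanity check on how these limits relate to each other; these const assertions are a sketch, not part of the PR:

// Not in the PR: compile-time checks of the intended ordering of the limits.
const _: () = assert!(MAX_FORM_FIELD_SIZE <= MAX_FORM_FIELDS_SIZE);
const _: () = assert!(MAX_FORM_FIELDS_SIZE as u64 <= MAX_POST_OBJECT_FILE_SIZE);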

/// Form file
#[derive(Debug)]
pub struct File {
@@ -68,6 +85,32 @@ pub enum MultipartError {
Underlying(StdError),
#[error("MultipartError: InvalidFormat")]
InvalidFormat,
#[error("MultipartError: FieldTooLarge: field size {0} bytes exceeds limit of {1} bytes")]
FieldTooLarge(usize, usize),
#[error("MultipartError: TotalSizeTooLarge: total form fields size {0} bytes exceeds limit of {1} bytes")]
TotalSizeTooLarge(usize, usize),
#[error("MultipartError: TooManyParts: part count {0} exceeds limit of {1}")]
TooManyParts(usize, usize),
#[error("MultipartError: FileTooLarge: file size {0} bytes exceeds limit of {1} bytes")]
FileTooLarge(u64, u64),
}
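A hedged sketch of how a caller might map the new variants onto S3-style error codes; the mapping is an assumption, not taken from s3s:

// Sketch only: illustrative mapping, not part of the PR.
fn s3_error_code(err: &MultipartError) -> &'static str {
    match err {
        MultipartError::FieldTooLarge(..)
        | MultipartError::TotalSizeTooLarge(..)
        | MultipartError::FileTooLarge(..) => "EntityTooLarge",
        // Underlying, InvalidFormat, TooManyParts, and any future variants.
        _ => "MalformedPOSTRequest",
    }
}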

/// Aggregates a file stream into a `Vec<Bytes>` with a size limit.
/// Returns an error if the total size exceeds the limit.
pub async fn aggregate_file_stream_limited(mut stream: FileStream, max_size: u64) -> Result<Vec<Bytes>, MultipartError> {
use futures::stream::StreamExt;
let mut vec = Vec::new();
let mut total_size: u64 = 0;

while let Some(result) = stream.next().await {
let bytes = result.map_err(|e| MultipartError::Underlying(Box::new(e)))?;
total_size = total_size.saturating_add(bytes.len() as u64);
if total_size > max_size {
return Err(MultipartError::FileTooLarge(total_size, max_size));
}
vec.push(bytes);
}
Ok(vec)
}
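A usage sketch for the helper above, assuming a parsed Multipart; the wrapper function is illustrative, while take_file_stream is the accessor this PR itself uses in crates/s3s/src/ops/mod.rs:

// Sketch only, not part of the PR.
async fn read_post_object_file(mut multipart: Multipart) -> Result<Vec<Bytes>, MultipartError> {
    let stream = multipart.take_file_stream().expect("missing file stream");
    aggregate_file_stream_limited(stream, MAX_POST_OBJECT_FILE_SIZE).await
}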

/// transform multipart
@@ -90,17 +133,29 @@ where
};

let mut fields = Vec::new();
let mut total_fields_size: usize = 0;
let mut parts_count: usize = 0;

loop {
// copy bytes to buf
match body.as_mut().next().await {
None => return Err(MultipartError::InvalidFormat),
Some(Err(e)) => return Err(MultipartError::Underlying(e)),
Some(Ok(bytes)) => buf.extend_from_slice(&bytes),
Some(Ok(bytes)) => {
// Check if adding these bytes would exceed reasonable buffer size
// We use MAX_FORM_FIELDS_SIZE as the limit for the parsing buffer
if buf.len().saturating_add(bytes.len()) > MAX_FORM_FIELDS_SIZE {
return Err(MultipartError::TotalSizeTooLarge(
buf.len().saturating_add(bytes.len()),
MAX_FORM_FIELDS_SIZE,
));
}
buf.extend_from_slice(&bytes);
}
}

// try to parse
match try_parse(body, pat, &buf, &mut fields, boundary) {
match try_parse(body, pat, &buf, &mut fields, boundary, &mut total_fields_size, &mut parts_count) {
Err((b, p)) => {
body = b;
pat = p;
@@ -118,6 +173,8 @@ fn try_parse<S>(
buf: &'_ [u8],
fields: &'_ mut Vec<(String, String)>,
boundary: &'_ [u8],
total_fields_size: &'_ mut usize,
parts_count: &'_ mut usize,
) -> Result<Result<Multipart, MultipartError>, (Pin<Box<S>>, Box<[u8]>)>
where
S: Stream<Item = Result<Bytes, StdError>> + Send + Sync + 'static,
@@ -126,6 +183,9 @@ where
let pat_without_crlf = &pat[..pat.len().wrapping_sub(2)];

fields.clear();
// Reset counters since we're re-parsing from scratch
*total_fields_size = 0;
*parts_count = 0;

let mut lines = CrlfLines { slice: buf };

@@ -152,6 +212,12 @@

let mut headers = [httparse::EMPTY_HEADER; 2];
loop {
// Check parts count limit
*parts_count += 1;
if *parts_count > MAX_FORM_PARTS {
return Ok(Err(MultipartError::TooManyParts(*parts_count, MAX_FORM_PARTS)));
}

let (idx, parsed_headers) = match httparse::parse_headers(lines.slice, &mut headers) {
Ok(httparse::Status::Complete(ans)) => ans,
Ok(_) => return Err((body, pat)),
@@ -184,6 +250,17 @@ where
#[allow(clippy::indexing_slicing)]
let b = &b[..b.len().saturating_sub(2)];

// Check per-field size limit
if b.len() > MAX_FORM_FIELD_SIZE {
return Ok(Err(MultipartError::FieldTooLarge(b.len(), MAX_FORM_FIELD_SIZE)));
}

// Check total fields size limit
*total_fields_size = total_fields_size.saturating_add(b.len());
if *total_fields_size > MAX_FORM_FIELDS_SIZE {
return Ok(Err(MultipartError::TotalSizeTooLarge(*total_fields_size, MAX_FORM_FIELDS_SIZE)));
}

match std::str::from_utf8(b) {
Err(_) => return Ok(Err(MultipartError::InvalidFormat)),
Ok(s) => s,
@@ -668,4 +745,120 @@ mod tests {
assert_eq!(file_bytes, file_content);
}
}

#[tokio::test]
async fn test_field_too_large() {
let boundary = "boundary123";

// Create a field value just over MAX_FORM_FIELD_SIZE (1 MB)
let field_size = MAX_FORM_FIELD_SIZE + 1000;
let large_value = "x".repeat(field_size);

let body_bytes = vec![
Bytes::from(format!("--{boundary}\r\n")),
Bytes::from("Content-Disposition: form-data; name=\"large_field\"\r\n\r\n"),
Bytes::from(large_value),
Bytes::from(format!("\r\n--{boundary}--\r\n")),
];

let body_stream = futures::stream::iter(body_bytes.into_iter().map(Ok::<_, StdError>));

let result = transform_multipart(body_stream, boundary.as_bytes()).await;
// Either FieldTooLarge or TotalSizeTooLarge is acceptable - both indicate oversized input
assert!(result.is_err(), "Should fail when field exceeds limits");
}

#[tokio::test]
async fn test_total_size_too_large() {
let boundary = "boundary123";

// Create multiple fields that together exceed MAX_FORM_FIELDS_SIZE (20 MB)
let field_size = MAX_FORM_FIELD_SIZE; // 1 MB per field
let num_fields = 21; // 21 fields = 21 MB > 20 MB limit

let mut body_bytes = Vec::new();

for i in 0..num_fields {
body_bytes.push(format!("--{boundary}\r\n"));
body_bytes.push(format!("Content-Disposition: form-data; name=\"field{i}\"\r\n\r\n"));
body_bytes.push("x".repeat(field_size));
body_bytes.push("\r\n".to_string());
}
body_bytes.push(format!("--{boundary}--\r\n"));

let body_stream = futures::stream::iter(body_bytes.into_iter().map(|s| Ok::<_, StdError>(Bytes::from(s))));

let result = transform_multipart(body_stream, boundary.as_bytes()).await;
match result {
Err(MultipartError::TotalSizeTooLarge(size, limit)) => {
assert_eq!(limit, MAX_FORM_FIELDS_SIZE);
assert!(size > MAX_FORM_FIELDS_SIZE);
}
_ => panic!("Expected TotalSizeTooLarge error"),
}
}
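A further test that could complement these, exercising the parse-buffer cap in transform_multipart with a body that never terminates; hypothetical, not in the PR:

#[tokio::test]
async fn test_unterminated_body_is_rejected() {
    // Hypothetical test: a body that opens a field but never sends the
    // closing boundary. The parse-buffer cap in `transform_multipart`
    // should fail fast instead of buffering without bound.
    use futures::StreamExt;
    let boundary = "boundary123";
    let head = Bytes::from(format!(
        "--{boundary}\r\nContent-Disposition: form-data; name=\"a\"\r\n\r\n"
    ));
    let filler = Bytes::from_static(&[b'x'; 4096]);
    let evil = futures::stream::iter([head])
        .chain(futures::stream::repeat_with(move || filler.clone()).take(10_000)) // ~40 MB
        .map(Ok::<_, StdError>);
    let result = transform_multipart(evil, boundary.as_bytes()).await;
    assert!(result.is_err(), "unterminated body must not be buffered unboundedly");
}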

#[tokio::test]
async fn test_too_many_parts() {
let boundary = "boundary123";

// Create more parts than MAX_FORM_PARTS (1000)
let num_parts = MAX_FORM_PARTS + 1;

let mut body_bytes = Vec::new();

for i in 0..num_parts {
body_bytes.push(format!("--{boundary}\r\n"));
body_bytes.push(format!("Content-Disposition: form-data; name=\"field{i}\"\r\n\r\n"));
body_bytes.push("value".to_string());
body_bytes.push("\r\n".to_string());
}
body_bytes.push(format!("--{boundary}--\r\n"));

let body_stream = futures::stream::iter(body_bytes.into_iter().map(|s| Ok::<_, StdError>(Bytes::from(s))));

let result = transform_multipart(body_stream, boundary.as_bytes()).await;
match result {
Err(MultipartError::TooManyParts(count, limit)) => {
assert_eq!(limit, MAX_FORM_PARTS);
assert!(count > MAX_FORM_PARTS);
}
_ => panic!("Expected TooManyParts error"),
}
}

#[tokio::test]
async fn test_limits_within_bounds() {
let boundary = "boundary123";

// Create fields within limits
let field_count = 10;
let field_size = 100; // Small fields

let mut body_bytes = Vec::new();

for i in 0..field_count {
body_bytes.push(format!("--{boundary}\r\n"));
body_bytes.push(format!("Content-Disposition: form-data; name=\"field{i}\"\r\n\r\n"));
body_bytes.push("x".repeat(field_size));
body_bytes.push("\r\n".to_string());
}

// Add a file
body_bytes.push(format!("--{boundary}\r\n"));
body_bytes.push("Content-Disposition: form-data; name=\"file\"; filename=\"test.txt\"\r\n".to_string());
body_bytes.push("Content-Type: text/plain\r\n\r\n".to_string());
body_bytes.push("file content".to_string());
body_bytes.push(format!("\r\n--{boundary}--\r\n"));

let body_stream = futures::stream::iter(body_bytes.into_iter().map(|s| Ok::<_, StdError>(Bytes::from(s))));

let result = transform_multipart(body_stream, boundary.as_bytes()).await;
assert!(result.is_ok(), "Should succeed when within limits");

let multipart = result.unwrap();
assert_eq!(multipart.fields().len(), field_count);
assert!(multipart.file.stream.is_some());
}
}
10 changes: 6 additions & 4 deletions crates/s3s/src/ops/mod.rs
@@ -31,8 +31,6 @@ use crate::path::{ParseS3PathError, S3Path};
use crate::protocol::S3Request;
use crate::route::S3Route;
use crate::s3_trait::S3;
use crate::stream::VecByteStream;
use crate::stream::aggregate_unlimited;
use crate::validation::{AwsNameValidation, NameValidation};

use std::mem;
@@ -383,8 +381,12 @@ async fn prepare(req: &mut Request, ccx: &CallContext<'_>) -> S3Result<Prepare>
// POST object
debug!(?multipart);
let file_stream = multipart.take_file_stream().expect("missing file stream");
let vec_bytes = aggregate_unlimited(file_stream).await.map_err(S3Error::internal_error)?;
let vec_stream = VecByteStream::new(vec_bytes);
// Aggregate file stream with size limit to get known length
// This is required because downstream handlers (like s3s-proxy) need content-length
let vec_bytes = http::aggregate_file_stream_limited(file_stream, http::MAX_POST_OBJECT_FILE_SIZE)
.await
.map_err(|e| invalid_request!(e, "failed to read file stream"))?;
let vec_stream = crate::stream::VecByteStream::new(vec_bytes);
req.s3ext.vec_stream = Some(vec_stream);
break 'resolve (&PutObject as &'static dyn Operation, false);
}
4 changes: 2 additions & 2 deletions crates/s3s/src/service.rs
@@ -210,8 +210,8 @@

// In case the futures are made too large accidentally
assert!(output_size(&crate::ops::call) <= 1600);
assert!(output_size(&S3Service::call) <= 2900);
assert!(output_size(&S3Service::call_owned) <= 3200);
assert!(output_size(&S3Service::call) <= 3000);
assert!(output_size(&S3Service::call_owned) <= 3300);
}
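The bounds moved up slightly, presumably because prepare now embeds the bounded aggregation. One possible shape of the output_size helper used above, reconstructed as an assumption for a one-argument function; the real helper in the s3s test module may differ:

// Reconstruction (assumption): size of the future a function returns,
// measured without running it.
fn output_size<F, A, Fut>(_f: &F) -> usize
where
    F: Fn(A) -> Fut,
{
    std::mem::size_of::<Fut>()
}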

// Test validation functionality
15 changes: 1 addition & 14 deletions crates/s3s/src/stream.rs
@@ -6,7 +6,7 @@ use std::pin::Pin;
use std::task::{Context, Poll};

use bytes::Bytes;
use futures::{Stream, StreamExt, pin_mut};
use futures::Stream;

pub trait ByteStream: Stream {
fn remaining_length(&self) -> RemainingLength {
@@ -132,19 +132,6 @@ where
}
}

// FIXME: unbounded memory allocation
pub(crate) async fn aggregate_unlimited<S, E>(stream: S) -> Result<Vec<Bytes>, E>
where
S: ByteStream<Item = Result<Bytes, E>>,
{
let mut vec = Vec::new();
pin_mut!(stream);
while let Some(result) = stream.next().await {
vec.push(result?);
}
Ok(vec)
}

pub(crate) struct VecByteStream {
queue: VecDeque<Bytes>,
remaining_bytes: usize,