Commit 9568125
fix(s3s/http): prevent unbounded memory allocation in POST object (#370)
Add defense-in-depth limits to multipart/form-data parsing for POST object to prevent DoS attacks via oversized uploads:

- MAX_FORM_FIELD_SIZE: 1 MB per field (matching MinIO)
- MAX_FORM_FIELDS_SIZE: 20 MB total for all form fields
- MAX_FORM_PARTS: 1000 maximum parts
- MAX_POST_OBJECT_FILE_SIZE: 5 GB for file content (matching S3 single PUT limit)

Why buffering instead of streaming: a pure streaming solution using FileStream was initially attempted, but it breaks compatibility with s3s-proxy and other consumers that use the AWS SDK. The AWS SDK requires a known Content-Length to compute SHA-256 checksums for SigV4 request signing. Since FileStream returns an unknown remaining length, the SDK throws UnsizedRequestBody errors.

MinIO can accept streaming POST uploads directly because its HTTP server parses multipart boundaries at the protocol level without needing the size upfront. However, when requests are proxied through s3s-proxy using the AWS SDK, the SDK's cryptographic signing algorithm requires the body size before transmission.

The buffering approach with size limits provides:

1. Protection against unbounded memory allocation (DoS mitigation)
2. Compatibility with AWS SDK-based consumers (s3s-proxy, etc.)
3. Known content length for downstream request signing

Fixes #370

Signed-off-by: Kefu Chai <[email protected]>
1 parent 85144c5 · commit 9568125

5 files changed: +206 −23
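The signing constraint described in the commit message can be made concrete with a minimal sketch (assumed types throughout; this is not the AWS SDK's actual API). A SigV4-style signer hashes the payload into the signature, so it must see every byte, or at least a declared length, before the request goes out:

// Minimal sketch (assumed types; not the AWS SDK's real API) of why a
// signer cannot handle an unsized body: the payload's SHA-256 goes into
// the signature, so every byte must be known before the request is sent.
use sha2::{Digest, Sha256};

enum Body {
    Sized(Vec<u8>), // length known up front: can be hashed, then signed
    Streaming,      // length unknown until fully read: cannot be pre-hashed
}

fn payload_hash(body: &Body) -> Result<String, &'static str> {
    match body {
        Body::Sized(bytes) => Ok(hex::encode(Sha256::digest(bytes))),
        // This is the failure the commit message reports as `UnsizedRequestBody`.
        Body::Streaming => Err("unsized request body: cannot sign"),
    }
}

fn main() {
    assert!(payload_hash(&Body::Sized(b"file content".to_vec())).is_ok());
    assert!(payload_hash(&Body::Streaming).is_err());
}

Buffering the POST object body into memory (with the limits below) turns the second case into the first, which is why the commit trades streaming for bounded buffering.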

codegen/src/v1/ops.rs

Lines changed: 2 additions & 1 deletion

@@ -609,7 +609,8 @@ fn codegen_op_http_de_multipart(op: &Operation, rust_types: &RustTypes) {
     "",
     "let vec_stream = req.s3ext.vec_stream.take().expect(\"missing vec stream\");",
     "",
-    "let content_length = i64::try_from(vec_stream.exact_remaining_length()).map_err(|e|s3_error!(e, InvalidArgument, \"content-length overflow\"))?;",
+    "let content_length = i64::try_from(vec_stream.exact_remaining_length())",
+    "    .map_err(|e| s3_error!(e, InvalidArgument, \"content-length overflow\"))?;",
     "let content_length = (content_length != 0).then_some(content_length);",
     "",
     "let body: Option<StreamingBlob> = Some(StreamingBlob::new(vec_stream));",

crates/s3s/src/http/multipart.rs

Lines changed: 195 additions & 2 deletions

@@ -16,6 +16,23 @@ use hyper::body::Bytes;
 use memchr::memchr_iter;
 use transform_stream::{AsyncTryStream, Yielder};
 
+/// Maximum size per form field (1 MB)
+/// This prevents `DoS` attacks via oversized individual fields
+const MAX_FORM_FIELD_SIZE: usize = 1024 * 1024;
+
+/// Maximum total size for all form fields combined (20 MB)
+/// This prevents `DoS` attacks via accumulation of many fields
+const MAX_FORM_FIELDS_SIZE: usize = 20 * 1024 * 1024;
+
+/// Maximum number of parts in multipart form
+/// This prevents `DoS` attacks via excessive part count
+const MAX_FORM_PARTS: usize = 1000;
+
+/// Maximum file size for POST object (5 GB - S3 limit for single PUT)
+/// This prevents `DoS` attacks via oversized file uploads
+/// Note: S3 has a 5GB limit for single PUT object, so this is a reasonable default
+pub const MAX_POST_OBJECT_FILE_SIZE: u64 = 5 * 1024 * 1024 * 1024;
+
 /// Form file
 #[derive(Debug)]
 pub struct File {
@@ -68,6 +85,32 @@ pub enum MultipartError {
     Underlying(StdError),
     #[error("MultipartError: InvalidFormat")]
     InvalidFormat,
+    #[error("MultipartError: FieldTooLarge: field size {0} bytes exceeds limit of {1} bytes")]
+    FieldTooLarge(usize, usize),
+    #[error("MultipartError: TotalSizeTooLarge: total form fields size {0} bytes exceeds limit of {1} bytes")]
+    TotalSizeTooLarge(usize, usize),
+    #[error("MultipartError: TooManyParts: part count {0} exceeds limit of {1}")]
+    TooManyParts(usize, usize),
+    #[error("MultipartError: FileTooLarge: file size {0} bytes exceeds limit of {1} bytes")]
+    FileTooLarge(u64, u64),
+}
+
+/// Aggregates a file stream into a Vec<Bytes> with a size limit.
+/// Returns error if the total size exceeds the limit.
+pub async fn aggregate_file_stream_limited(mut stream: FileStream, max_size: u64) -> Result<Vec<Bytes>, MultipartError> {
+    use futures::stream::StreamExt;
+    let mut vec = Vec::new();
+    let mut total_size: u64 = 0;
+
+    while let Some(result) = stream.next().await {
+        let bytes = result.map_err(|e| MultipartError::Underlying(Box::new(e)))?;
+        total_size = total_size.saturating_add(bytes.len() as u64);
+        if total_size >= max_size {
+            return Err(MultipartError::FileTooLarge(total_size, max_size));
+        }
+        vec.push(bytes);
+    }
+    Ok(vec)
 }
 
 /// transform multipart
@@ -90,17 +133,29 @@
     };
 
     let mut fields = Vec::new();
+    let mut total_fields_size: usize = 0;
+    let mut parts_count: usize = 0;
 
     loop {
         // copy bytes to buf
         match body.as_mut().next().await {
             None => return Err(MultipartError::InvalidFormat),
             Some(Err(e)) => return Err(MultipartError::Underlying(e)),
-            Some(Ok(bytes)) => buf.extend_from_slice(&bytes),
+            Some(Ok(bytes)) => {
+                // Check if adding these bytes would exceed reasonable buffer size
+                // We use MAX_FORM_FIELDS_SIZE as the limit for the parsing buffer
+                if buf.len().saturating_add(bytes.len()) > MAX_FORM_FIELDS_SIZE {
+                    return Err(MultipartError::TotalSizeTooLarge(
+                        buf.len().saturating_add(bytes.len()),
+                        MAX_FORM_FIELDS_SIZE,
+                    ));
+                }
+                buf.extend_from_slice(&bytes);
+            }
         }
 
         // try to parse
-        match try_parse(body, pat, &buf, &mut fields, boundary) {
+        match try_parse(body, pat, &buf, &mut fields, boundary, &mut total_fields_size, &mut parts_count) {
             Err((b, p)) => {
                 body = b;
                 pat = p;
@@ -118,6 +173,8 @@ fn try_parse<S>(
     buf: &'_ [u8],
    fields: &'_ mut Vec<(String, String)>,
     boundary: &'_ [u8],
+    total_fields_size: &'_ mut usize,
+    parts_count: &'_ mut usize,
 ) -> Result<Result<Multipart, MultipartError>, (Pin<Box<S>>, Box<[u8]>)>
 where
     S: Stream<Item = Result<Bytes, StdError>> + Send + Sync + 'static,
@@ -126,6 +183,9 @@ where
     let pat_without_crlf = &pat[..pat.len().wrapping_sub(2)];
 
     fields.clear();
+    // Reset counters since we're re-parsing from scratch
+    *total_fields_size = 0;
+    *parts_count = 0;
 
     let mut lines = CrlfLines { slice: buf };
 
@@ -152,6 +212,12 @@ where
 
     let mut headers = [httparse::EMPTY_HEADER; 2];
     loop {
+        // Check parts count limit
+        *parts_count += 1;
+        if *parts_count > MAX_FORM_PARTS {
+            return Ok(Err(MultipartError::TooManyParts(*parts_count, MAX_FORM_PARTS)));
+        }
+
        let (idx, parsed_headers) = match httparse::parse_headers(lines.slice, &mut headers) {
            Ok(httparse::Status::Complete(ans)) => ans,
            Ok(_) => return Err((body, pat)),
@@ -184,6 +250,17 @@ where
        #[allow(clippy::indexing_slicing)]
        let b = &b[..b.len().saturating_sub(2)];
 
+        // Check per-field size limit
+        if b.len() > MAX_FORM_FIELD_SIZE {
+            return Ok(Err(MultipartError::FieldTooLarge(b.len(), MAX_FORM_FIELD_SIZE)));
+        }
+
+        // Check total fields size limit
+        *total_fields_size = total_fields_size.saturating_add(b.len());
+        if *total_fields_size > MAX_FORM_FIELDS_SIZE {
+            return Ok(Err(MultipartError::TotalSizeTooLarge(*total_fields_size, MAX_FORM_FIELDS_SIZE)));
+        }
+
        match std::str::from_utf8(b) {
            Err(_) => return Ok(Err(MultipartError::InvalidFormat)),
            Ok(s) => s,
@@ -663,4 +740,120 @@ mod tests {
             assert_eq!(file_bytes, file_content);
         }
     }
+
+    #[tokio::test]
+    async fn test_field_too_large() {
+        let boundary = "boundary123";
+
+        // Create a field value that exceeds MAX_FORM_FIELD_SIZE (1 MB)
+        // This will exceed MAX_FORM_FIELD_SIZE and trigger FieldTooLarge error
+        let field_size = MAX_FORM_FIELD_SIZE + 1000; // Just over 1 MB
+        let large_value = "x".repeat(field_size);
+
+        let body_bytes = vec![
+            Bytes::from(format!("--{boundary}\r\n")),
+            Bytes::from("Content-Disposition: form-data; name=\"large_field\"\r\n\r\n"),
+            Bytes::from(large_value),
+            Bytes::from(format!("\r\n--{boundary}--\r\n")),
+        ];
+
+        let body_stream = futures::stream::iter(body_bytes.into_iter().map(Ok::<_, StdError>));
+
+        let result = transform_multipart(body_stream, boundary.as_bytes()).await;
+        // Either error is acceptable - both indicate the field/buffer is too large
+        assert!(result.is_err(), "Should fail when field exceeds limits");
+    }
+
+    #[tokio::test]
+    async fn test_total_size_too_large() {
+        let boundary = "boundary123";
+
+        // Create multiple fields that together exceed MAX_FORM_FIELDS_SIZE (20 MB)
+        let field_size = MAX_FORM_FIELD_SIZE; // 1 MB per field
+        let num_fields = 21; // 21 fields = 21 MB > 20 MB limit
+
+        let mut body_bytes = Vec::new();
+
+        for i in 0..num_fields {
+            body_bytes.push(format!("--{boundary}\r\n"));
+            body_bytes.push(format!("Content-Disposition: form-data; name=\"field{i}\"\r\n\r\n"));
+            body_bytes.push("x".repeat(field_size));
+            body_bytes.push("\r\n".to_string());
+        }
+        body_bytes.push(format!("--{boundary}--\r\n"));
+
+        let body_stream = futures::stream::iter(body_bytes.into_iter().map(|s| Ok::<_, StdError>(Bytes::from(s))));
+
+        let result = transform_multipart(body_stream, boundary.as_bytes()).await;
+        match result {
+            Err(MultipartError::TotalSizeTooLarge(size, limit)) => {
+                assert_eq!(limit, MAX_FORM_FIELDS_SIZE);
+                assert!(size > MAX_FORM_FIELDS_SIZE);
+            }
+            _ => panic!("Expected TotalSizeTooLarge error"),
+        }
+    }
+
+    #[tokio::test]
+    async fn test_too_many_parts() {
+        let boundary = "boundary123";
+
+        // Create more parts than MAX_FORM_PARTS (1000)
+        let num_parts = MAX_FORM_PARTS + 1;
+
+        let mut body_bytes = Vec::new();
+
+        for i in 0..num_parts {
+            body_bytes.push(format!("--{boundary}\r\n"));
+            body_bytes.push(format!("Content-Disposition: form-data; name=\"field{i}\"\r\n\r\n"));
+            body_bytes.push("value".to_string());
+            body_bytes.push("\r\n".to_string());
+        }
+        body_bytes.push(format!("--{boundary}--\r\n"));
+
+        let body_stream = futures::stream::iter(body_bytes.into_iter().map(|s| Ok::<_, StdError>(Bytes::from(s))));
+
+        let result = transform_multipart(body_stream, boundary.as_bytes()).await;
+        match result {
+            Err(MultipartError::TooManyParts(count, limit)) => {
+                assert_eq!(limit, MAX_FORM_PARTS);
+                assert!(count > MAX_FORM_PARTS);
+            }
+            _ => panic!("Expected TooManyParts error"),
+        }
+    }
+
+    #[tokio::test]
+    async fn test_limits_within_bounds() {
+        let boundary = "boundary123";
+
+        // Create fields within limits
+        let field_count = 10;
+        let field_size = 100; // Small fields
+
+        let mut body_bytes = Vec::new();
+
+        for i in 0..field_count {
+            body_bytes.push(format!("--{boundary}\r\n"));
+            body_bytes.push(format!("Content-Disposition: form-data; name=\"field{i}\"\r\n\r\n"));
+            body_bytes.push("x".repeat(field_size));
+            body_bytes.push("\r\n".to_string());
+        }
+
+        // Add a file
+        body_bytes.push(format!("--{boundary}\r\n"));
+        body_bytes.push("Content-Disposition: form-data; name=\"file\"; filename=\"test.txt\"\r\n".to_string());
+        body_bytes.push("Content-Type: text/plain\r\n\r\n".to_string());
+        body_bytes.push("file content".to_string());
+        body_bytes.push(format!("\r\n--{boundary}--\r\n"));
+
+        let body_stream = futures::stream::iter(body_bytes.into_iter().map(|s| Ok::<_, StdError>(Bytes::from(s))));
+
+        let result = transform_multipart(body_stream, boundary.as_bytes()).await;
+        assert!(result.is_ok(), "Should succeed when within limits");
+
+        let multipart = result.unwrap();
+        assert_eq!(multipart.fields().len(), field_count);
+        assert!(multipart.file.stream.is_some());
+    }
 }
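The new tests above exercise the limits end to end. The core capped-aggregation pattern of aggregate_file_stream_limited can also be seen in isolation in this standalone sketch (illustrative names; a plain Bytes stream stands in for the crate's FileStream):

// Standalone sketch of the capped-aggregation pattern used by
// `aggregate_file_stream_limited` (names here are illustrative, not s3s API).
use bytes::Bytes;
use futures::{stream, StreamExt};

async fn aggregate_capped(
    mut chunks: impl futures::Stream<Item = Bytes> + Unpin,
    max_size: u64,
) -> Result<Vec<Bytes>, String> {
    let mut vec = Vec::new();
    let mut total: u64 = 0;
    while let Some(bytes) = chunks.next().await {
        // saturating_add so an adversarial stream cannot overflow the counter
        total = total.saturating_add(bytes.len() as u64);
        if total >= max_size {
            return Err(format!("file too large: {total} >= {max_size}"));
        }
        vec.push(bytes);
    }
    Ok(vec)
}

fn main() {
    futures::executor::block_on(async {
        let chunks = stream::iter(vec![Bytes::from_static(b"abc"), Bytes::from_static(b"defg")]);
        // 3 + 4 = 7 bytes total, which trips the 6-byte cap
        assert!(aggregate_capped(chunks, 6).await.is_err());
    });
}

The check runs per chunk, so the buffer never grows past the cap plus one chunk before the error is returned.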

crates/s3s/src/ops/mod.rs

Lines changed: 6 additions & 4 deletions

@@ -31,8 +31,6 @@ use crate::path::{ParseS3PathError, S3Path};
 use crate::protocol::S3Request;
 use crate::route::S3Route;
 use crate::s3_trait::S3;
-use crate::stream::VecByteStream;
-use crate::stream::aggregate_unlimited;
 use crate::validation::{AwsNameValidation, NameValidation};
 
 use std::mem;
@@ -386,8 +384,12 @@ async fn prepare(req: &mut Request, ccx: &CallContext<'_>) -> S3Result<Prepare>
            // POST object
            debug!(?multipart);
            let file_stream = multipart.take_file_stream().expect("missing file stream");
-            let vec_bytes = aggregate_unlimited(file_stream).await.map_err(S3Error::internal_error)?;
-            let vec_stream = VecByteStream::new(vec_bytes);
+            // Aggregate file stream with size limit to get known length
+            // This is required because downstream handlers (like s3s-proxy) need content-length
+            let vec_bytes = http::aggregate_file_stream_limited(file_stream, http::MAX_POST_OBJECT_FILE_SIZE)
+                .await
+                .map_err(|e| invalid_request!(e, "failed to read file stream"))?;
+            let vec_stream = crate::stream::VecByteStream::new(vec_bytes);
            req.s3ext.vec_stream = Some(vec_stream);
            break 'resolve (&PutObject as &'static dyn Operation, false);
        }

crates/s3s/src/service.rs

Lines changed: 2 additions & 2 deletions

@@ -210,8 +210,8 @@
 
        // In case the futures are made too large accidentally
        assert!(output_size(&crate::ops::call) <= 1600);
-        assert!(output_size(&S3Service::call) <= 2900);
-        assert!(output_size(&S3Service::call_owned) <= 3200);
+        assert!(output_size(&S3Service::call) <= 3000);
+        assert!(output_size(&S3Service::call_owned) <= 3300);
    }
 
    // Test validation functionality
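The raised bounds presumably absorb the extra state the new limit checks and aggregation keep across .await points. For readers unfamiliar with the technique, here is a self-contained, std-only sketch of measuring a future's size (the crate's output_size helper is internal and may be shaped differently):

use std::future::ready;
use std::mem::size_of_val;

// Locals that are alive across an .await are stored inside the future's
// state machine, so they directly inflate its size.
async fn handler(n: u64) -> u64 {
    let buf = [0u8; 1024]; // kept alive across the await below
    ready(()).await;
    n + u64::from(buf[0])
}

fn main() {
    // Creating the future does not poll it; its size is known statically.
    let fut = handler(1);
    println!("future size = {} bytes", size_of_val(&fut)); // at least 1024
}

Asserting an upper bound in a test, as above, catches accidental growth of these state machines in CI.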

crates/s3s/src/stream.rs

Lines changed: 1 addition & 14 deletions

@@ -6,7 +6,7 @@ use std::pin::Pin;
 use std::task::{Context, Poll};
 
 use bytes::Bytes;
-use futures::{Stream, StreamExt, pin_mut};
+use futures::Stream;
 
 pub trait ByteStream: Stream {
    fn remaining_length(&self) -> RemainingLength {
@@ -132,19 +132,6 @@
    }
 }
 
-// FIXME: unbounded memory allocation
-pub(crate) async fn aggregate_unlimited<S, E>(stream: S) -> Result<Vec<Bytes>, E>
-where
-    S: ByteStream<Item = Result<Bytes, E>>,
-{
-    let mut vec = Vec::new();
-    pin_mut!(stream);
-    while let Some(result) = stream.next().await {
-        vec.push(result?);
-    }
-    Ok(vec)
-}
-
 pub(crate) struct VecByteStream {
    queue: VecDeque<Bytes>,
    remaining_bytes: usize,
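With the FIXME-marked aggregate_unlimited gone, the buffered path relies on VecByteStream (left intact above) to report an exact length to the generated handler. A hedged sketch of how that can work, using the field names from the diff but assumed method bodies:

// Sketch of how a buffered VecByteStream can report an exact remaining
// length (the property `exact_remaining_length` in the codegen change
// relies on). Field names follow the diff; the method bodies are assumptions.
use std::collections::VecDeque;
use bytes::Bytes;

struct VecByteStream {
    queue: VecDeque<Bytes>,
    remaining_bytes: usize,
}

impl VecByteStream {
    fn new(v: Vec<Bytes>) -> Self {
        // The total is computed once at construction, so the length is
        // exact before any chunk is yielded.
        let remaining_bytes = v.iter().map(Bytes::len).sum();
        Self { queue: v.into(), remaining_bytes }
    }

    fn exact_remaining_length(&self) -> usize {
        self.remaining_bytes
    }
}

fn main() {
    let s = VecByteStream::new(vec![Bytes::from_static(b"hello"), Bytes::from_static(b"!")]);
    assert_eq!(s.queue.len(), 2);
    assert_eq!(s.exact_remaining_length(), 6);
}

This exact length is what becomes the Content-Length that AWS SDK-based consumers need for request signing.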

0 commit comments

Comments
 (0)