From 84eb00e899c74a40b1d1544383a78b34758bddfd Mon Sep 17 00:00:00 2001 From: tobias-wilfert Date: Thu, 19 Mar 2026 16:15:13 +0100 Subject: [PATCH 01/19] correctly handle attachment references in the stores --- relay-server/src/envelope/attachment.rs | 5 ++- relay-server/src/envelope/item.rs | 2 +- relay-server/src/services/objectstore.rs | 17 +++++--- relay-server/src/services/store.rs | 54 +++++++++++++++++++++++- relay-server/src/services/upload.rs | 8 +++- tests/integration/test_playstation.py | 4 +- 6 files changed, 77 insertions(+), 13 deletions(-) diff --git a/relay-server/src/envelope/attachment.rs b/relay-server/src/envelope/attachment.rs index 78eb2131556..22c9614f2ef 100644 --- a/relay-server/src/envelope/attachment.rs +++ b/relay-server/src/envelope/attachment.rs @@ -1,6 +1,6 @@ use std::fmt; -use serde::Serialize; +use serde::{Deserialize, Serialize}; use crate::envelope::ContentType; @@ -78,8 +78,9 @@ impl fmt::Display for AttachmentType { /// Represents the payload of an [attachment placeholder item]( /// https://develop.sentry.dev/sdk/telemetry/attachments/#attachment-placeholder-item). #[cfg_attr(not(sentry), expect(unused))] -#[derive(Serialize)] +#[derive(Serialize, Deserialize)] pub struct AttachmentPlaceholder<'a> { + #[serde(borrow)] pub location: &'a str, #[serde(skip_serializing_if = "Option::is_none")] pub content_type: Option, diff --git a/relay-server/src/envelope/item.rs b/relay-server/src/envelope/item.rs index 6b4c2a46039..7f016681a68 100644 --- a/relay-server/src/envelope/item.rs +++ b/relay-server/src/envelope/item.rs @@ -558,7 +558,7 @@ impl Item { } /// Returns `true` if this item is an attachment placeholder. 
- fn is_attachment_ref(&self) -> bool { + pub fn is_attachment_ref(&self) -> bool { self.ty() == &ItemType::Attachment && self.content_type() == Some(ContentType::AttachmentRef) } diff --git a/relay-server/src/services/objectstore.rs b/relay-server/src/services/objectstore.rs index 60806f91312..8c1ac1c2e99 100644 --- a/relay-server/src/services/objectstore.rs +++ b/relay-server/src/services/objectstore.rs @@ -17,7 +17,7 @@ use relay_system::{ use sentry_protos::snuba::v1::TraceItem; use crate::constants::DEFAULT_ATTACHMENT_RETENTION; -use crate::envelope::ItemType; +use crate::envelope::{Item, ItemType}; use crate::managed::{ Counted, Managed, ManagedEnvelope, ManagedResult, OutcomeError, Quantities, Rejected, }; @@ -294,8 +294,7 @@ impl ObjectstoreServiceInner { } Ok(session) => { for attachment in attachments { - // we are not storing zero-size attachments in objectstore - if attachment.is_empty() { + if Self::should_skip_upload(&attachment) { continue; } let result = self @@ -326,8 +325,7 @@ impl ObjectstoreServiceInner { /// This mutates the attachment item in-place, setting the `stored_key` field to the key in the /// objectstore. async fn handle_event_attachment(&self, mut attachment: Managed) { - // we are not storing zero-size attachments in objectstore - if attachment.attachment.is_empty() { + if Self::should_skip_upload(&attachment.attachment) { self.store.send(attachment); return; } @@ -520,4 +518,13 @@ impl ObjectstoreServiceInner { Ok(ObjectstoreKey(response.key)) } + + /// Returns `true` if the item should **not** be uploaded to the objectstore. 
+ /// + /// The case for: + /// - Zero-size attachments + /// - Attachment placeholders + fn should_skip_upload(item: &Item) -> bool { + item.is_empty() || item.is_attachment_ref() + } } diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index 74aa5d746e0..782f74a726a 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -33,12 +33,13 @@ use relay_statsd::metric; use relay_system::{Addr, FromMessage, Interface, NoResponse, Service}; use relay_threading::AsyncPool; -use crate::envelope::{AttachmentType, ContentType, Item, ItemType}; +use crate::envelope::{AttachmentPlaceholder, AttachmentType, ContentType, Item, ItemType}; use crate::managed::{Counted, Managed, ManagedEnvelope, OutcomeError, Quantities}; use crate::metrics::{ArrayEncoding, BucketEncoder, MetricOutcomes}; use crate::service::ServiceError; use crate::services::global_config::GlobalConfigHandle; use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome}; +use crate::services::upload::SignedLocation; use crate::statsd::{RelayCounters, RelayGauges, RelayTimers}; use crate::utils::{self, FormDataIter}; @@ -55,6 +56,8 @@ pub enum StoreError { EncodingFailed(std::io::Error), #[error("failed to store event because event id was missing")] NoEventId, + #[error("invalid attachment reference")] + InvalidAttachmentRef, } impl OutcomeError for StoreError { @@ -958,6 +961,44 @@ impl StoreService { Ok(()) } + fn produce_attachment_ref( + &self, + event_id: EventId, + project_id: ProjectId, + item: &Item, + send_individual_attachments: bool, + ) -> Result, StoreError> { + let payload = item.payload(); + let placeholder: AttachmentPlaceholder<'_> = + serde_json::from_slice(&payload).map_err(|_| StoreError::InvalidAttachmentRef)?; + let store_key = SignedLocation::try_from_str(&placeholder.location) + .ok_or(StoreError::InvalidAttachmentRef)? 
+ .unverified_key() + .to_owned(); + + let attachment = ChunkedAttachment { + id: Uuid::new_v4().to_string(), + name: item.filename().unwrap_or(UNNAMED_ATTACHMENT).to_owned(), + rate_limited: item.rate_limited(), + content_type: placeholder.content_type.map(|c| c.as_str().to_owned()), + attachment_type: item.attachment_type().unwrap_or_default(), + size: item.attachment_body_size(), + payload: AttachmentPayload::Stored(store_key), + }; + + if send_individual_attachments { + let message = KafkaMessage::Attachment(AttachmentKafkaMessage { + event_id, + project_id, + attachment, + }); + self.produce(KafkaTopic::Attachments, message)?; + Ok(None) + } else { + Ok(Some(attachment)) + } + } + /// Produces Kafka messages for the content and metadata of an attachment item. /// /// The `send_individual_attachments` controls whether the metadata of an attachment @@ -976,6 +1017,17 @@ impl StoreService { item: &Item, send_individual_attachments: bool, ) -> Result, StoreError> { + // If the attachment is actually a placeholder we need to do some logic that does not neatly + // fit into this function. + if item.is_attachment_ref() { + return self.produce_attachment_ref( + event_id, + project_id, + item, + send_individual_attachments, + ); + } + let id = Uuid::new_v4().to_string(); let payload = item.payload(); diff --git a/relay-server/src/services/upload.rs b/relay-server/src/services/upload.rs index 742d99b247e..19ef2841e16 100644 --- a/relay-server/src/services/upload.rs +++ b/relay-server/src/services/upload.rs @@ -381,7 +381,7 @@ impl SignedLocation { /// /// Fails if the signature is outdated or incorrect. 
#[cfg(feature = "processing")] - fn verify(self, received: DateTime, config: &Config) -> Result { + pub fn verify(self, received: DateTime, config: &Config) -> Result { let public_key = config.public_key().ok_or(Error::SigningFailed)?; let is_valid = self.signature.verify( self.location.as_uri().as_bytes(), @@ -411,7 +411,7 @@ impl SignedLocation { } } - fn try_from_str(uri: &str) -> Option { + pub fn try_from_str(uri: &str) -> Option { static ROUTER: std::sync::LazyLock> = std::sync::LazyLock::new(|| { let mut router = matchit::Router::new(); router @@ -432,6 +432,10 @@ impl SignedLocation { Some(Self::from_parts(project_id, key, length, signature)) } + + pub fn unverified_key(&self) -> &str { + &self.location.key + } } enum RequestKind { diff --git a/tests/integration/test_playstation.py b/tests/integration/test_playstation.py index 409ce272fb1..98d24a04d38 100644 --- a/tests/integration/test_playstation.py +++ b/tests/integration/test_playstation.py @@ -26,6 +26,7 @@ def playstation_project_config(): "features": [ "organizations:relay-playstation-ingestion", "projects:relay-upload-endpoint", + "organizations:relay-new-error-processing", ], } } @@ -442,8 +443,7 @@ def test_playstation_large_attachments( video_attachment = [ a for a in event["attachments"] if a["name"] == "crash-video.webm" ][0] - location_uri = chunks[video_attachment["id"]].decode() - video_key = location_uri.split("/upload/")[1].split("/")[0] + video_key = video_attachment["stored_id"] objectstore_session = objectstore("attachments", PROJECT_ID) assert objectstore_session.get(video_key).payload.read() == video_content.encode() From 0d396724fe38c023b06c6f3bc7b516262421307b Mon Sep 17 00:00:00 2001 From: tobias-wilfert Date: Fri, 20 Mar 2026 10:48:47 +0100 Subject: [PATCH 02/19] assert `stored_key` is none before it is set --- relay-server/src/services/store.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index 
782f74a726a..4e11738544f 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -968,6 +968,11 @@ impl StoreService { item: &Item, send_individual_attachments: bool, ) -> Result, StoreError> { + debug_assert!( + item.stored_key().is_none(), + "AttachmentRef should not have been uploaded to objectstore" + ); + let payload = item.payload(); let placeholder: AttachmentPlaceholder<'_> = serde_json::from_slice(&payload).map_err(|_| StoreError::InvalidAttachmentRef)?; From 15c82942efd06161132c17ed4b34353ce3b6185e Mon Sep 17 00:00:00 2001 From: tobias-wilfert Date: Fri, 20 Mar 2026 10:50:05 +0100 Subject: [PATCH 03/19] add happy path test --- tests/integration/test_attachment_ref.py | 116 +++++++++++++++++++++++ 1 file changed, 116 insertions(+) diff --git a/tests/integration/test_attachment_ref.py b/tests/integration/test_attachment_ref.py index e4f17651e9f..03cc44bb731 100644 --- a/tests/integration/test_attachment_ref.py +++ b/tests/integration/test_attachment_ref.py @@ -1,9 +1,13 @@ +import json import pytest import uuid +from unittest import mock from requests.exceptions import HTTPError from sentry_sdk.envelope import Envelope, Item, PayloadRef +from .test_store import make_transaction + def make_envelope(event_id): """Create an envelope with a single AttachmentRef item.""" @@ -21,6 +25,43 @@ def make_envelope(event_id): return envelope +def upload_and_make_ref( + relay, + project_id, + project_key, + data, + filename="test.txt", + content_type="text/plain", + attachment_type="event.attachment", +): + """Upload data via TUS, then construct an AttachmentRef item pointing at it.""" + # TODO: We want to use the new (POST and PATCH) approach here as well or is the old way fine? 
+ response = relay.post( + f"/api/{project_id}/upload/?sentry_key={project_key}", + headers={ + "Tus-Resumable": "1.0.0", + "Content-Type": "application/offset+octet-stream", + "Upload-Length": str(len(data)), + }, + data=data, + ) + assert response.status_code == 201 + location = response.headers["Location"] + + payload = json.dumps({"location": location, "content_type": content_type}) + return Item( + payload=PayloadRef(bytes=payload.encode()), + headers={ + "type": "attachment", + "content_type": "application/vnd.sentry.attachment-ref", + "length": len(payload), + "attachment_length": len(data), + "filename": filename, + "attachment_type": attachment_type, + }, + ) + + @pytest.mark.parametrize("data_category", ["attachment", "attachment_item"]) def test_attachment_ref_ratelimit( mini_sentry, @@ -70,3 +111,78 @@ def test_attachment_ref_ratelimit( "attachment_ref_exceeded", categories={"attachment": 1, "attachment_item": 1}, ) + + +@pytest.mark.parametrize( + "event_type", + [ + pytest.param("none", id="standalone"), + pytest.param("event", id="with_event"), + pytest.param("transaction", id="with_transaction"), + ], +) +def test_attachment_ref( + mini_sentry, + relay_with_processing, + attachments_consumer, + objectstore, + event_type, +): + event_id = "515539018c9b4260a6f999572f1661ee" + project_id = 42 + project_config = mini_sentry.add_full_project_config(project_id) + project_config["config"].setdefault("features", []).append( + "projects:relay-upload-endpoint" + ) + mini_sentry.global_config["options"][ + "relay.objectstore-attachments.sample-rate" + ] = 1.0 + + relay = relay_with_processing() + attachments_consumer = attachments_consumer() + project_key = mini_sentry.get_dsn_public_key(project_id) + + attachment_data = b"" + envelope = Envelope(headers=[["event_id", event_id]]) + envelope.add_item( + upload_and_make_ref( + relay, + project_id, + project_key, + attachment_data, + ) + ) + + if event_type == "event": + envelope.add_event({"message": "Hello, 
World!"}) + elif event_type == "transaction": + envelope.add_transaction(make_transaction({"event_id": event_id})) + + relay.send_envelope(project_id, envelope) + expected_attachment = { + "id": mock.ANY, + "name": "test.txt", + "content_type": "text/plain", + "attachment_type": "event.attachment", + "size": len(attachment_data), + "rate_limited": False, + "stored_id": mock.ANY, + } + + if event_type == "event": + _, event_message = attachments_consumer.get_event() + assert len(event_message["attachments"]) == 1 + assert event_message["attachments"][0] == expected_attachment + stored_id = event_message["attachments"][0]["stored_id"] + else: + attachment = attachments_consumer.get_individual_attachment() + assert attachment == { + "type": "attachment", + "event_id": event_id, + "project_id": project_id, + "attachment": expected_attachment, + } + stored_id = attachment["attachment"]["stored_id"] + + objectstore_session = objectstore("attachments", project_id) + assert objectstore_session.get(stored_id).payload.read() == attachment_data From 12da644a76c2424c2d178d66faf92a012b5057a6 Mon Sep 17 00:00:00 2001 From: tobias-wilfert Date: Fri, 20 Mar 2026 15:27:06 +0100 Subject: [PATCH 04/19] fix test (broke due to me adding `relay-new-error-processing`) --- tests/integration/test_playstation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_playstation.py b/tests/integration/test_playstation.py index 98d24a04d38..4277855cc53 100644 --- a/tests/integration/test_playstation.py +++ b/tests/integration/test_playstation.py @@ -727,7 +727,7 @@ def test_event_merging( "id": "legacy:2019-03-12", }, "_metrics": { - "bytes.ingested.event": 725, + "bytes.ingested.event": 836, "bytes.ingested.event.minidump": 60446, "bytes.ingested.event.attachment": 158008, }, From e246ebe5c3d8d176469713f0bcf2995a33ecd185 Mon Sep 17 00:00:00 2001 From: tobias-wilfert Date: Fri, 20 Mar 2026 15:36:53 +0100 Subject: [PATCH 05/19] add validation logic --- 
.../src/processing/attachments/mod.rs | 4 ++- .../src/processing/attachments/process.rs | 25 +++++++++++++++++ relay-server/src/processing/errors/mod.rs | 2 ++ relay-server/src/processing/errors/process.rs | 22 +++++++++++++++ .../src/processing/transactions/mod.rs | 6 ++++ .../src/processing/transactions/process.rs | 22 +++++++++++++++ .../src/processing/utils/attachments.rs | 28 +++++++++++++++++++ relay-server/src/services/outcome.rs | 6 ++++ relay-server/src/services/processor.rs | 8 ++++++ 9 files changed, 122 insertions(+), 1 deletion(-) diff --git a/relay-server/src/processing/attachments/mod.rs b/relay-server/src/processing/attachments/mod.rs index 3ea4281353d..3846707cdb5 100644 --- a/relay-server/src/processing/attachments/mod.rs +++ b/relay-server/src/processing/attachments/mod.rs @@ -88,7 +88,7 @@ impl processing::Processor for AttachmentProcessor { async fn process( &self, - attachments: Managed, + #[allow(unused_mut)] mut attachments: Managed, ctx: processing::Context<'_>, ) -> Result, Rejected> { let client_name = crate::utils::client_name_tag(attachments.headers.meta().client_name()); @@ -125,6 +125,8 @@ impl processing::Processor for AttachmentProcessor { } } + #[cfg(feature = "processing")] + process::validate_attachments(&mut attachments, ctx); let mut attachments = self.limiter.enforce_quotas(attachments, ctx).await?; process::scrub(&mut attachments, ctx)?; diff --git a/relay-server/src/processing/attachments/process.rs b/relay-server/src/processing/attachments/process.rs index 3bcb70c1ed6..d0a90512480 100644 --- a/relay-server/src/processing/attachments/process.rs +++ b/relay-server/src/processing/attachments/process.rs @@ -16,3 +16,28 @@ pub fn scrub( Ok::<_, Error>(()) }) } + +/// Validates the attachments and drop any invalid ones. +/// +/// An attachment might be a placeholder, in which case it needs to be validated. 
+#[cfg(feature = "processing")] +pub fn validate_attachments( + attachments: &mut Managed, + ctx: processing::Context<'_>, +) { + if !ctx.is_processing() { + return; + } + + attachments.modify(|attachments, records| { + attachments.attachments.retain_mut(|attachment| { + match utils::attachments::validate(attachment, ctx.config) { + Ok(()) => true, + Err(err) => { + records.reject_err(err, &*attachment); + false + } + } + }); + }); +} diff --git a/relay-server/src/processing/errors/mod.rs b/relay-server/src/processing/errors/mod.rs index 71b0ec7db66..8c9c48c8efa 100644 --- a/relay-server/src/processing/errors/mod.rs +++ b/relay-server/src/processing/errors/mod.rs @@ -108,6 +108,8 @@ impl processing::Processor for ErrorsProcessor { ctx: Context<'_>, ) -> Result, Rejected> { let mut error = process::expand(error, ctx)?; + #[cfg(feature = "processing")] + process::validate_attachments(&mut error, ctx); process::process(&mut error)?; diff --git a/relay-server/src/processing/errors/process.rs b/relay-server/src/processing/errors/process.rs index 110955b9b26..0c08f5dca3b 100644 --- a/relay-server/src/processing/errors/process.rs +++ b/relay-server/src/processing/errors/process.rs @@ -141,3 +141,25 @@ pub fn scrub(error: &mut Managed, ctx: Context<'_>) -> Result<(), Ok::<_, Error>(()) }) } + +/// Validates the attachments and drop any invalid ones. +/// +/// An attachment might be a placeholder, in which case it needs to be validated. 
+#[cfg(feature = "processing")] +pub fn validate_attachments(error: &mut Managed, ctx: Context<'_>) { + if !ctx.is_processing() { + return; + } + + error.modify(|error, records| { + error.attachments.retain_mut(|attachment| { + match processing::utils::attachments::validate(attachment, ctx.config) { + Ok(()) => true, + Err(err) => { + records.reject_err(err, &*attachment); + false + } + } + }); + }); +} diff --git a/relay-server/src/processing/transactions/mod.rs b/relay-server/src/processing/transactions/mod.rs index a460e6eb804..280295dc9fa 100644 --- a/relay-server/src/processing/transactions/mod.rs +++ b/relay-server/src/processing/transactions/mod.rs @@ -134,6 +134,12 @@ impl Processor for TransactionProcessor { relay_log::trace!("Expand transaction"); let mut tx = process::expand(tx)?; + #[cfg(feature = "processing")] + { + relay_log::trace!("Validate attachments"); + process::validate_attachments(&mut tx, ctx); + } + relay_log::trace!("Prepare transaction data"); process::prepare_data(&mut tx, &mut ctx, &mut metrics)?; diff --git a/relay-server/src/processing/transactions/process.rs b/relay-server/src/processing/transactions/process.rs index ad1e5de5500..e151dd3304c 100644 --- a/relay-server/src/processing/transactions/process.rs +++ b/relay-server/src/processing/transactions/process.rs @@ -445,3 +445,25 @@ impl Counted for IndexedSpans { smallvec![(DataCategory::SpanIndexed, self.0)] } } + +/// Validates the attachments and drop any invalid ones. +/// +/// An attachment might be a placeholder, in which case it needs to be validated. 
+#[cfg(feature = "processing")] +pub fn validate_attachments(transaction: &mut Managed>, ctx: Context<'_>) { + if !ctx.is_processing() { + return; + } + + transaction.modify(|transaction, records| { + transaction.attachments.retain_mut(|attachment| { + match utils::attachments::validate(attachment, ctx.config) { + Ok(()) => true, + Err(err) => { + records.reject_err(err, &*attachment); + false + } + } + }); + }); +} diff --git a/relay-server/src/processing/utils/attachments.rs b/relay-server/src/processing/utils/attachments.rs index 2cc612e026c..5843e52a87a 100644 --- a/relay-server/src/processing/utils/attachments.rs +++ b/relay-server/src/processing/utils/attachments.rs @@ -1,16 +1,44 @@ use std::error::Error; use std::time::Instant; +#[cfg(feature = "processing")] +use relay_config::Config; use relay_pii::{PiiAttachmentsProcessor, SelectorPathItem, SelectorSpec}; use relay_statsd::metric; use crate::envelope::{AttachmentType, ContentType, Item, ItemType}; use crate::managed::RecordKeeper; +#[cfg(feature = "processing")] +use crate::services::processor::ProcessingError; use crate::statsd::RelayTimers; use crate::services::projects::project::ProjectInfo; use relay_dynamic_config::Feature; +#[cfg(feature = "processing")] +pub fn validate<'a>(item: &Item, config: &Config) -> Result<(), ProcessingError> { + if !item.is_attachment_ref() { + return Ok(()); + } + let payload = item.payload(); + let payload: crate::envelope::AttachmentPlaceholder = + serde_json::from_slice(&payload).map_err(|_| ProcessingError::InvalidAttachmentRef)?; + let signed_location = crate::services::upload::SignedLocation::try_from_str(payload.location) + .ok_or(ProcessingError::InvalidAttachmentRef)?; + // NOTE: Using the received timestamp here breaks tests without a pop-relay. 
+ let location = signed_location + .verify(chrono::Utc::now(), config) + .map_err(|_| ProcessingError::InvalidAttachmentRef)?; + let signed_length = location + .length + .ok_or(ProcessingError::InvalidAttachmentRef)?; + + match item.attachment_body_size() == signed_length { + true => Ok(()), + false => Err(ProcessingError::InvalidAttachmentRef), + } +} + /// Apply data privacy rules to attachments in the envelope. /// /// This only applies the new PII rules that explicitly select `ValueType::Binary` or one of the diff --git a/relay-server/src/services/outcome.rs b/relay-server/src/services/outcome.rs index d2b0c69904d..6223e5c8981 100644 --- a/relay-server/src/services/outcome.rs +++ b/relay-server/src/services/outcome.rs @@ -485,6 +485,10 @@ pub enum DiscardReason { /// (Relay) A trace attachment that has invalid item headers or attachment meta-data. InvalidTraceAttachment, + /// (Relay) An attachment ref that has invalid item headers or payload. + #[cfg(feature = "processing")] + InvalidAttachmentRef, + /// (Relay) A required feature is not enabled. 
FeatureDisabled(Feature), @@ -555,6 +559,8 @@ impl DiscardReason { DiscardReason::InvalidSpan => "invalid_span", DiscardReason::InvalidSpanAttachment => "invalid_span_attachment", DiscardReason::InvalidTraceAttachment => "invalid_trace_attachment", + #[cfg(feature = "processing")] + DiscardReason::InvalidAttachmentRef => "invalid_placeholder_attachment", DiscardReason::FeatureDisabled(_) => "feature_disabled", DiscardReason::TransactionAttachment => "transaction_attachment", DiscardReason::InvalidCheckIn => "invalid_check_in", diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index 1474906eff1..d09d4273dd9 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -620,6 +620,10 @@ pub enum ProcessingError { ProcessingGroupMismatch, #[error("new processing pipeline failed")] ProcessingFailure, + + #[cfg(feature = "processing")] + #[error("invalid attachment reference")] + InvalidAttachmentRef, } impl ProcessingError { @@ -662,6 +666,10 @@ impl ProcessingError { Self::ProcessingGroupMismatch => Some(Outcome::Invalid(DiscardReason::Internal)), // Outcomes are emitted in the new processing pipeline already. 
Self::ProcessingFailure => None, + #[cfg(feature = "processing")] + Self::InvalidAttachmentRef => { + Some(Outcome::Invalid(DiscardReason::InvalidAttachmentRef)) + } } } From a3db734f5d2203c4ec84824d871e922c4ca04c67 Mon Sep 17 00:00:00 2001 From: tobias-wilfert Date: Fri, 20 Mar 2026 15:37:43 +0100 Subject: [PATCH 06/19] add test for validation logic and update ratelimit test --- tests/integration/test_attachment_ref.py | 118 +++++++++++++++++++---- 1 file changed, 101 insertions(+), 17 deletions(-) diff --git a/tests/integration/test_attachment_ref.py b/tests/integration/test_attachment_ref.py index 03cc44bb731..0b7bdbe3833 100644 --- a/tests/integration/test_attachment_ref.py +++ b/tests/integration/test_attachment_ref.py @@ -1,30 +1,58 @@ import json import pytest import uuid - from unittest import mock + from requests.exceptions import HTTPError +from sentry_relay.consts import DataCategory from sentry_sdk.envelope import Envelope, Item, PayloadRef from .test_store import make_transaction -def make_envelope(event_id): - """Create an envelope with a single AttachmentRef item.""" +def make_envelope(event_id, relay, project_id, project_key): + """Create an envelope with a single valid AttachmentRef item.""" envelope = Envelope(headers=[["event_id", event_id]]) - envelope.add_item( - Item( - payload=PayloadRef(bytes=b""), - headers={ - "type": "attachment", - "content_type": "application/vnd.sentry.attachment-ref", - "attachment_length": 1, - }, - ) - ) + envelope.add_item(upload_and_make_ref(relay, project_id, project_key, data=b"x")) return envelope +def make_invalid_attachment_ref(offense, relay=None, project_id=None, project_key=None): + if offense == "invalid_payload": + return ( + Item( + payload=PayloadRef(bytes=b"not a valid payload"), + headers={ + "type": "attachment", + "content_type": "application/vnd.sentry.attachment-ref", + "length": 16, + "attachment_length": 100, + "filename": "test.txt", + }, + ), + 100, + ) + + if offense == "tampered_signature": 
+ data = b"hello world" + ref_item = upload_and_make_ref(relay, project_id, project_key, data) + payload = json.loads(ref_item.payload.bytes) + location = payload["location"] + payload["location"] = location[:-1] + ("b" if location.endswith("a") else "a") + tampered = json.dumps(payload).encode() + ref_item.payload = PayloadRef(bytes=tampered) + ref_item.headers["length"] = len(tampered) + return ref_item, len(data) + + if offense == "spoofed_size": + data = b"" + ref_item = upload_and_make_ref(relay, project_id, project_key, data) + ref_item.headers["attachment_length"] = 1 + return ref_item, 1 + + raise ValueError(f"Unknown offense: {offense}") + + def upload_and_make_ref( relay, project_id, @@ -74,7 +102,9 @@ def test_attachment_ref_ratelimit( project_id = 42 project_config = mini_sentry.add_full_project_config(project_id) - + project_config["config"].setdefault("features", []).append( + "projects:relay-upload-endpoint" + ) project_config["config"]["quotas"] = [ { "id": f"test_rate_limiting_{uuid.uuid4().hex}", @@ -88,14 +118,14 @@ def test_attachment_ref_ratelimit( relay = relay_with_processing() attachments_consumer = attachments_consumer() outcomes_consumer = outcomes_consumer() + project_key = mini_sentry.get_dsn_public_key(project_id) # First envelope: should go through (200 response) - envelope = make_envelope(event_id) + envelope = make_envelope(event_id, relay, project_id, project_key) relay.send_envelope(project_id, envelope) attachments_consumer.get_individual_attachment() # Second envelope: rate limited but returns 200 - envelope = make_envelope(event_id) relay.send_envelope(project_id, envelope) outcomes_consumer.assert_rate_limited( "attachment_ref_exceeded", @@ -103,7 +133,6 @@ def test_attachment_ref_ratelimit( ) # Third envelope: returns 429 - envelope = make_envelope(event_id) with pytest.raises(HTTPError) as excinfo: relay.send_envelope(project_id, envelope) assert excinfo.value.response.status_code == 429 @@ -186,3 +215,58 @@ def 
test_attachment_ref( objectstore_session = objectstore("attachments", project_id) assert objectstore_session.get(stored_id).payload.read() == attachment_data + + +@pytest.mark.parametrize( + "event_type", + [ + pytest.param("none", id="standalone"), + pytest.param("event", id="with_event"), + pytest.param("transaction", id="with_transaction"), + ], +) +@pytest.mark.parametrize( + "offense", + [ + "invalid_payload", + "tampered_signature", + "spoofed_size", + ], +) +def test_attachment_ref_validation( + mini_sentry, + relay_with_processing, + outcomes_consumer, + event_type, + offense, +): + event_id = "515539018c9b4260a6f999572f1661ee" + project_id = 42 + project_config = mini_sentry.add_full_project_config(project_id) + project_config["config"].setdefault("features", []).append( + "projects:relay-upload-endpoint" + ) + + relay = relay_with_processing() + outcomes_consumer = outcomes_consumer() + project_key = mini_sentry.get_dsn_public_key(project_id) + + ref_item, expected_bytes_quantity = make_invalid_attachment_ref( + offense, relay, project_id, project_key + ) + envelope = Envelope(headers=[["event_id", event_id]]) + envelope.add_item(ref_item) + + if event_type == "event": + envelope.add_event({"message": "Hello, World!"}) + elif event_type == "transaction": + envelope.add_transaction(make_transaction({"event_id": event_id})) + + relay.send_envelope(project_id, envelope) + + outcomes = outcomes_consumer.get_outcomes(n=3 if event_type == "transaction" else 2) + o = {DataCategory(o["category"]): o for o in outcomes} + assert o[DataCategory.ATTACHMENT]["reason"] == "invalid_placeholder_attachment" + assert o[DataCategory.ATTACHMENT]["quantity"] == expected_bytes_quantity + assert o[DataCategory.ATTACHMENT_ITEM]["reason"] == "invalid_placeholder_attachment" + assert o[DataCategory.ATTACHMENT_ITEM]["quantity"] == 1 From 51f35609ef22f24ac9a89e1f51a520269c49fd92 Mon Sep 17 00:00:00 2001 From: tobias-wilfert Date: Fri, 20 Mar 2026 15:49:11 +0100 Subject: [PATCH 
07/19] appease clippy --- relay-server/src/processing/utils/attachments.rs | 2 +- relay-server/src/services/objectstore.rs | 2 +- relay-server/src/services/store.rs | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/relay-server/src/processing/utils/attachments.rs b/relay-server/src/processing/utils/attachments.rs index 5843e52a87a..8766a9eed9e 100644 --- a/relay-server/src/processing/utils/attachments.rs +++ b/relay-server/src/processing/utils/attachments.rs @@ -16,7 +16,7 @@ use crate::services::projects::project::ProjectInfo; use relay_dynamic_config::Feature; #[cfg(feature = "processing")] -pub fn validate<'a>(item: &Item, config: &Config) -> Result<(), ProcessingError> { +pub fn validate(item: &Item, config: &Config) -> Result<(), ProcessingError> { if !item.is_attachment_ref() { return Ok(()); } diff --git a/relay-server/src/services/objectstore.rs b/relay-server/src/services/objectstore.rs index 8c1ac1c2e99..eadb605a2af 100644 --- a/relay-server/src/services/objectstore.rs +++ b/relay-server/src/services/objectstore.rs @@ -294,7 +294,7 @@ impl ObjectstoreServiceInner { } Ok(session) => { for attachment in attachments { - if Self::should_skip_upload(&attachment) { + if Self::should_skip_upload(attachment) { continue; } let result = self diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index 4e11738544f..a1a02c583e6 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -976,7 +976,7 @@ impl StoreService { let payload = item.payload(); let placeholder: AttachmentPlaceholder<'_> = serde_json::from_slice(&payload).map_err(|_| StoreError::InvalidAttachmentRef)?; - let store_key = SignedLocation::try_from_str(&placeholder.location) + let store_key = SignedLocation::try_from_str(placeholder.location) .ok_or(StoreError::InvalidAttachmentRef)? 
.unverified_key() .to_owned(); From 7b90e9747505c3904d6dc25f7a98267b60664993 Mon Sep 17 00:00:00 2001 From: tobias-wilfert Date: Mon, 23 Mar 2026 13:02:07 +0100 Subject: [PATCH 08/19] use `application/vnd.sentry.attachment-ref+json` --- tests/integration/test_attachment_ref.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/integration/test_attachment_ref.py b/tests/integration/test_attachment_ref.py index 0b7bdbe3833..b67f96f1c0b 100644 --- a/tests/integration/test_attachment_ref.py +++ b/tests/integration/test_attachment_ref.py @@ -24,7 +24,7 @@ def make_invalid_attachment_ref(offense, relay=None, project_id=None, project_ke payload=PayloadRef(bytes=b"not a valid payload"), headers={ "type": "attachment", - "content_type": "application/vnd.sentry.attachment-ref", + "content_type": "application/vnd.sentry.attachment-ref+json", "length": 16, "attachment_length": 100, "filename": "test.txt", @@ -81,7 +81,7 @@ def upload_and_make_ref( payload=PayloadRef(bytes=payload.encode()), headers={ "type": "attachment", - "content_type": "application/vnd.sentry.attachment-ref", + "content_type": "application/vnd.sentry.attachment-ref+json", "length": len(payload), "attachment_length": len(data), "filename": filename, From efaa2530f65f500cf1c962173e4f727092ca9ae4 Mon Sep 17 00:00:00 2001 From: tobias-wilfert Date: Mon, 23 Mar 2026 13:07:40 +0100 Subject: [PATCH 09/19] use `verify` in store to get key --- relay-server/src/services/store.rs | 8 +++++--- relay-server/src/services/upload.rs | 4 ---- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index a1a02c583e6..e9ab9430f93 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -976,10 +976,12 @@ impl StoreService { let payload = item.payload(); let placeholder: AttachmentPlaceholder<'_> = serde_json::from_slice(&payload).map_err(|_| StoreError::InvalidAttachmentRef)?; - let 
store_key = SignedLocation::try_from_str(placeholder.location) + let location = SignedLocation::try_from_str(placeholder.location) .ok_or(StoreError::InvalidAttachmentRef)? - .unverified_key() - .to_owned(); + .verify(Utc::now(), &self.config) + .map_err(|_| StoreError::InvalidAttachmentRef)?; + + let store_key = location.key; let attachment = ChunkedAttachment { id: Uuid::new_v4().to_string(), diff --git a/relay-server/src/services/upload.rs b/relay-server/src/services/upload.rs index 19ef2841e16..a4628a4f547 100644 --- a/relay-server/src/services/upload.rs +++ b/relay-server/src/services/upload.rs @@ -432,10 +432,6 @@ impl SignedLocation { Some(Self::from_parts(project_id, key, length, signature)) } - - pub fn unverified_key(&self) -> &str { - &self.location.key - } } enum RequestKind { From 6fcb34dffe6aeff7ae8765b9d7e1a718be78fd16 Mon Sep 17 00:00:00 2001 From: tobias-wilfert Date: Mon, 23 Mar 2026 13:10:01 +0100 Subject: [PATCH 10/19] use post + patch in test helper --- tests/integration/test_attachment_ref.py | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/tests/integration/test_attachment_ref.py b/tests/integration/test_attachment_ref.py index b67f96f1c0b..77b3d261378 100644 --- a/tests/integration/test_attachment_ref.py +++ b/tests/integration/test_attachment_ref.py @@ -62,19 +62,29 @@ def upload_and_make_ref( content_type="text/plain", attachment_type="event.attachment", ): - """Upload data via TUS, then construct an AttachmentRef item pointing at it.""" - # TODO: We want to use the new (POST and PATCH) approach here as well or is the old way fine? 
- response = relay.post( + """Upload data via TUS (POST + PATCH), then construct an AttachmentRef item pointing at it.""" + create_response = relay.post( f"/api/{project_id}/upload/?sentry_key={project_key}", headers={ + "Content-Length": "0", "Tus-Resumable": "1.0.0", - "Content-Type": "application/offset+octet-stream", "Upload-Length": str(len(data)), }, + ) + assert create_response.status_code == 201 + location = create_response.headers["Location"] + + patch_response = relay.patch( + f"{location}&sentry_key={project_key}", + headers={ + "Content-Length": str(len(data)), + "Content-Type": "application/offset+octet-stream", + "Tus-Resumable": "1.0.0", + "Upload-Offset": "0", + }, data=data, ) - assert response.status_code == 201 - location = response.headers["Location"] + assert patch_response.status_code == 204 payload = json.dumps({"location": location, "content_type": content_type}) return Item( From fab22b8dbd6ff891359a06c1f02a9201169cf001 Mon Sep 17 00:00:00 2001 From: tobias-wilfert Date: Mon, 23 Mar 2026 13:14:46 +0100 Subject: [PATCH 11/19] add changelog entry --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7d8ddfd669e..ea1c7316a22 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ - Calculate and track accepted bytes per individual trace metric item via `TraceMetricByte` data category. ([#5744](https://github.com/getsentry/relay/pull/5744)) - Use new processor architecture to process standalone profiles. ([#5741](https://github.com/getsentry/relay/pull/5741)) - TUS: Disallow creation with upload. ([#5734](https://github.com/getsentry/relay/pull/5734)) +- Add logic to verify attachment placeholders and correctly store them. 
([#5747](https://github.com/getsentry/relay/pull/5747)) ## 26.3.1 From dec60e7dc15196aed83485d26b71904c9d2f198d Mon Sep 17 00:00:00 2001 From: Tobias Wilfert <36408720+tobias-wilfert@users.noreply.github.com> Date: Tue, 24 Mar 2026 10:12:37 +0100 Subject: [PATCH 12/19] Update relay-server/src/services/objectstore.rs Co-authored-by: Sebastian Zivota --- relay-server/src/services/objectstore.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/relay-server/src/services/objectstore.rs b/relay-server/src/services/objectstore.rs index eadb605a2af..30d1439141c 100644 --- a/relay-server/src/services/objectstore.rs +++ b/relay-server/src/services/objectstore.rs @@ -521,7 +521,7 @@ impl ObjectstoreServiceInner { /// Returns `true` if the item should **not** be uploaded to the objectstore. /// - /// The case for: + /// This is the case for: /// - Zero-size attachments /// - Attachment placeholders fn should_skip_upload(item: &Item) -> bool { From 6612cb319290e49570550c69952b4becee16a7ef Mon Sep 17 00:00:00 2001 From: Tobias Wilfert <36408720+tobias-wilfert@users.noreply.github.com> Date: Tue, 24 Mar 2026 10:13:06 +0100 Subject: [PATCH 13/19] Update relay-server/src/services/outcome.rs Co-authored-by: Sebastian Zivota --- relay-server/src/services/outcome.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/relay-server/src/services/outcome.rs b/relay-server/src/services/outcome.rs index 6223e5c8981..346563fd2b4 100644 --- a/relay-server/src/services/outcome.rs +++ b/relay-server/src/services/outcome.rs @@ -485,7 +485,7 @@ pub enum DiscardReason { /// (Relay) A trace attachment that has invalid item headers or attachment meta-data. InvalidTraceAttachment, - /// (Relay) A attachment ref that has invalid item headers or payload. + /// (Relay) An attachment ref that has invalid item headers or payload. 
#[cfg(feature = "processing")] InvalidAttachmentRef, From 0dd5dd56c9f426d4b69850ef7ed56585e4f8ac55 Mon Sep 17 00:00:00 2001 From: Tobias Wilfert <36408720+tobias-wilfert@users.noreply.github.com> Date: Tue, 24 Mar 2026 10:13:21 +0100 Subject: [PATCH 14/19] Update relay-server/src/processing/attachments/process.rs Co-authored-by: Joris Bayer --- relay-server/src/processing/attachments/process.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/relay-server/src/processing/attachments/process.rs b/relay-server/src/processing/attachments/process.rs index d0a90512480..fc2afa98c06 100644 --- a/relay-server/src/processing/attachments/process.rs +++ b/relay-server/src/processing/attachments/process.rs @@ -19,7 +19,7 @@ pub fn scrub( /// Validates the attachments and drop any invalid ones. /// -/// An attachment might be a placeholder, in which case it needs to be validated. +/// An attachment might be a placeholder, in which case its signature needs to be verified. #[cfg(feature = "processing")] pub fn validate_attachments( attachments: &mut Managed, From 56171c9714917a1c563c612081c3ef204291e5b4 Mon Sep 17 00:00:00 2001 From: Tobias Wilfert <36408720+tobias-wilfert@users.noreply.github.com> Date: Tue, 24 Mar 2026 10:15:44 +0100 Subject: [PATCH 15/19] Update relay-server/src/processing/utils/attachments.rs Co-authored-by: Joris Bayer --- relay-server/src/processing/utils/attachments.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/relay-server/src/processing/utils/attachments.rs b/relay-server/src/processing/utils/attachments.rs index 8766a9eed9e..59bc2b4f892 100644 --- a/relay-server/src/processing/utils/attachments.rs +++ b/relay-server/src/processing/utils/attachments.rs @@ -21,7 +21,7 @@ pub fn validate(item: &Item, config: &Config) -> Result<(), ProcessingError> { return Ok(()); } let payload = item.payload(); - let payload: crate::envelope::AttachmentPlaceholder = + let payload: AttachmentPlaceholder = 
serde_json::from_slice(&payload).map_err(|_| ProcessingError::InvalidAttachmentRef)?; let signed_location = crate::services::upload::SignedLocation::try_from_str(payload.location) .ok_or(ProcessingError::InvalidAttachmentRef)?; From 688ebaab54a9efece188855a0b1cebe46e2d4f95 Mon Sep 17 00:00:00 2001 From: tobias-wilfert Date: Mon, 30 Mar 2026 13:50:11 +0200 Subject: [PATCH 16/19] add org id --- relay-server/src/services/store.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index d9a3b7aefc8..c49414a68cb 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -979,6 +979,7 @@ impl StoreService { &self, event_id: EventId, project_id: ProjectId, + org_id: OrganizationId, item: &Item, send_individual_attachments: bool, ) -> Result, StoreError> { @@ -1012,6 +1013,7 @@ impl StoreService { event_id, project_id, attachment, + org_id, }); self.produce(KafkaTopic::Attachments, message)?; Ok(None) @@ -1045,6 +1047,7 @@ impl StoreService { return self.produce_attachment_ref( event_id, project_id, + org_id, item, send_individual_attachments, ); From 1345053a88e8513368ad179771d42087f07bb4e6 Mon Sep 17 00:00:00 2001 From: tobias-wilfert Date: Mon, 30 Mar 2026 13:50:42 +0200 Subject: [PATCH 17/19] add correct import --- relay-server/src/processing/utils/attachments.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/relay-server/src/processing/utils/attachments.rs b/relay-server/src/processing/utils/attachments.rs index 59bc2b4f892..26452042d13 100644 --- a/relay-server/src/processing/utils/attachments.rs +++ b/relay-server/src/processing/utils/attachments.rs @@ -6,6 +6,8 @@ use relay_config::Config; use relay_pii::{PiiAttachmentsProcessor, SelectorPathItem, SelectorSpec}; use relay_statsd::metric; +#[cfg(feature = "processing")] +use crate::envelope::AttachmentPlaceholder; use crate::envelope::{AttachmentType, ContentType, Item, ItemType}; use 
crate::managed::RecordKeeper; #[cfg(feature = "processing")] From ab7163bdf52d9bd24acaefe636514005b5a44925 Mon Sep 17 00:00:00 2001 From: tobias-wilfert Date: Mon, 30 Mar 2026 14:25:09 +0200 Subject: [PATCH 18/19] move `#[cfg(feature = "processing")]` into validate function --- .../src/processing/attachments/mod.rs | 3 +- .../src/processing/attachments/process.rs | 1 - relay-server/src/processing/errors/mod.rs | 2 +- relay-server/src/processing/errors/process.rs | 1 - .../src/processing/transactions/mod.rs | 7 +-- .../src/processing/transactions/process.rs | 1 - .../src/processing/utils/attachments.rs | 50 +++++++++++-------- 7 files changed, 32 insertions(+), 33 deletions(-) diff --git a/relay-server/src/processing/attachments/mod.rs b/relay-server/src/processing/attachments/mod.rs index 3846707cdb5..d3c91d6e425 100644 --- a/relay-server/src/processing/attachments/mod.rs +++ b/relay-server/src/processing/attachments/mod.rs @@ -88,7 +88,7 @@ impl processing::Processor for AttachmentProcessor { async fn process( &self, - #[allow(unused_mut)] mut attachments: Managed, + mut attachments: Managed, ctx: processing::Context<'_>, ) -> Result, Rejected> { let client_name = crate::utils::client_name_tag(attachments.headers.meta().client_name()); @@ -125,7 +125,6 @@ impl processing::Processor for AttachmentProcessor { } } - #[cfg(feature = "processing")] process::validate_attachments(&mut attachments, ctx); let mut attachments = self.limiter.enforce_quotas(attachments, ctx).await?; process::scrub(&mut attachments, ctx)?; diff --git a/relay-server/src/processing/attachments/process.rs b/relay-server/src/processing/attachments/process.rs index fc2afa98c06..d799fa4803b 100644 --- a/relay-server/src/processing/attachments/process.rs +++ b/relay-server/src/processing/attachments/process.rs @@ -20,7 +20,6 @@ pub fn scrub( /// Validates the attachments and drop any invalid ones. /// /// An attachment might be a placeholder, in which case its signature needs to be verified. 
-#[cfg(feature = "processing")] pub fn validate_attachments( attachments: &mut Managed, ctx: processing::Context<'_>, diff --git a/relay-server/src/processing/errors/mod.rs b/relay-server/src/processing/errors/mod.rs index 8c9c48c8efa..6a2ee103be4 100644 --- a/relay-server/src/processing/errors/mod.rs +++ b/relay-server/src/processing/errors/mod.rs @@ -108,7 +108,7 @@ impl processing::Processor for ErrorsProcessor { ctx: Context<'_>, ) -> Result, Rejected> { let mut error = process::expand(error, ctx)?; - #[cfg(feature = "processing")] + process::validate_attachments(&mut error, ctx); process::process(&mut error)?; diff --git a/relay-server/src/processing/errors/process.rs b/relay-server/src/processing/errors/process.rs index 0c08f5dca3b..f20c23a4e7d 100644 --- a/relay-server/src/processing/errors/process.rs +++ b/relay-server/src/processing/errors/process.rs @@ -145,7 +145,6 @@ pub fn scrub(error: &mut Managed, ctx: Context<'_>) -> Result<(), /// Validates the attachments and drop any invalid ones. /// /// An attachment might be a placeholder, in which case it needs to be validated. 
-#[cfg(feature = "processing")] pub fn validate_attachments(error: &mut Managed, ctx: Context<'_>) { if !ctx.is_processing() { return; diff --git a/relay-server/src/processing/transactions/mod.rs b/relay-server/src/processing/transactions/mod.rs index 280295dc9fa..6f106e454b1 100644 --- a/relay-server/src/processing/transactions/mod.rs +++ b/relay-server/src/processing/transactions/mod.rs @@ -134,11 +134,8 @@ impl Processor for TransactionProcessor { relay_log::trace!("Expand transaction"); let mut tx = process::expand(tx)?; - #[cfg(feature = "processing")] - { - relay_log::trace!("Validate attachments"); - process::validate_attachments(&mut tx, ctx); - } + relay_log::trace!("Validate attachments"); + process::validate_attachments(&mut tx, ctx); relay_log::trace!("Prepare transaction data"); process::prepare_data(&mut tx, &mut ctx, &mut metrics)?; diff --git a/relay-server/src/processing/transactions/process.rs b/relay-server/src/processing/transactions/process.rs index e151dd3304c..99f6ea77312 100644 --- a/relay-server/src/processing/transactions/process.rs +++ b/relay-server/src/processing/transactions/process.rs @@ -449,7 +449,6 @@ impl Counted for IndexedSpans { /// Validates the attachments and drop any invalid ones. /// /// An attachment might be a placeholder, in which case it needs to be validated. 
-#[cfg(feature = "processing")] pub fn validate_attachments(transaction: &mut Managed>, ctx: Context<'_>) { if !ctx.is_processing() { return; diff --git a/relay-server/src/processing/utils/attachments.rs b/relay-server/src/processing/utils/attachments.rs index 26452042d13..6f989c24321 100644 --- a/relay-server/src/processing/utils/attachments.rs +++ b/relay-server/src/processing/utils/attachments.rs @@ -1,7 +1,6 @@ use std::error::Error; use std::time::Instant; -#[cfg(feature = "processing")] use relay_config::Config; use relay_pii::{PiiAttachmentsProcessor, SelectorPathItem, SelectorSpec}; use relay_statsd::metric; @@ -10,34 +9,41 @@ use relay_statsd::metric; use crate::envelope::AttachmentPlaceholder; use crate::envelope::{AttachmentType, ContentType, Item, ItemType}; use crate::managed::RecordKeeper; -#[cfg(feature = "processing")] use crate::services::processor::ProcessingError; use crate::statsd::RelayTimers; use crate::services::projects::project::ProjectInfo; use relay_dynamic_config::Feature; -#[cfg(feature = "processing")] +#[cfg_attr(not(feature = "processing"), expect(unused_variables))] pub fn validate(item: &Item, config: &Config) -> Result<(), ProcessingError> { - if !item.is_attachment_ref() { - return Ok(()); - } - let payload = item.payload(); - let payload: AttachmentPlaceholder = - serde_json::from_slice(&payload).map_err(|_| ProcessingError::InvalidAttachmentRef)?; - let signed_location = crate::services::upload::SignedLocation::try_from_str(payload.location) - .ok_or(ProcessingError::InvalidAttachmentRef)?; - // NOTE: Using the received timestamp here breaks tests without a pop-relay. 
- let location = signed_location - .verify(chrono::Utc::now(), config) - .map_err(|_| ProcessingError::InvalidAttachmentRef)?; - let signed_length = location - .length - .ok_or(ProcessingError::InvalidAttachmentRef)?; - - match item.attachment_body_size() == signed_length { - true => Ok(()), - false => Err(ProcessingError::InvalidAttachmentRef), + #[cfg(not(feature = "processing"))] + return Ok(()); + + #[cfg(feature = "processing")] + { + if !item.is_attachment_ref() { + return Ok(()); + } + + let payload = item.payload(); + let payload: AttachmentPlaceholder = + serde_json::from_slice(&payload).map_err(|_| ProcessingError::InvalidAttachmentRef)?; + let signed_location = + crate::services::upload::SignedLocation::try_from_str(payload.location) + .ok_or(ProcessingError::InvalidAttachmentRef)?; + // NOTE: Using the received timestamp here breaks tests without a pop-relay. + let location = signed_location + .verify(chrono::Utc::now(), config) + .map_err(|_| ProcessingError::InvalidAttachmentRef)?; + let signed_length = location + .length + .ok_or(ProcessingError::InvalidAttachmentRef)?; + + match item.attachment_body_size() == signed_length { + true => Ok(()), + false => Err(ProcessingError::InvalidAttachmentRef), + } } } From d2d48f6bd04754b519e2ebab374ca6cfa1f4ada9 Mon Sep 17 00:00:00 2001 From: tobias-wilfert Date: Mon, 30 Mar 2026 14:25:52 +0200 Subject: [PATCH 19/19] extract common logic for `produce_attachment` --- relay-server/src/services/store.rs | 98 +++++++++++++++--------------- 1 file changed, 49 insertions(+), 49 deletions(-) diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index c49414a68cb..0bf63d744e0 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -64,7 +64,15 @@ impl OutcomeError for StoreError { type Error = Self; fn consume(self) -> (Option, Self::Error) { - (Some(Outcome::Invalid(DiscardReason::Internal)), self) + let outcome = match self { + 
StoreError::SendFailed(_) | StoreError::EncodingFailed(_) | StoreError::NoEventId => { + Some(Outcome::Invalid(DiscardReason::Internal)) + } + StoreError::InvalidAttachmentRef => { + Some(Outcome::Invalid(DiscardReason::InvalidAttachmentRef)) + } + }; + (outcome, self) } } @@ -975,14 +983,10 @@ impl StoreService { Ok(()) } - fn produce_attachment_ref( + fn chunked_attachment_from_placeholder( &self, - event_id: EventId, - project_id: ProjectId, - org_id: OrganizationId, item: &Item, - send_individual_attachments: bool, - ) -> Result, StoreError> { + ) -> Result { debug_assert!( item.stored_key().is_none(), "AttachmentRef should not have been uploaded to objectstore" @@ -998,7 +1002,7 @@ impl StoreService { let store_key = location.key; - let attachment = ChunkedAttachment { + Ok(ChunkedAttachment { id: Uuid::new_v4().to_string(), name: item.filename().unwrap_or(UNNAMED_ATTACHMENT).to_owned(), rate_limited: item.rate_limited(), @@ -1006,53 +1010,17 @@ impl StoreService { attachment_type: item.attachment_type().unwrap_or_default(), size: item.attachment_body_size(), payload: AttachmentPayload::Stored(store_key), - }; - - if send_individual_attachments { - let message = KafkaMessage::Attachment(AttachmentKafkaMessage { - event_id, - project_id, - attachment, - org_id, - }); - self.produce(KafkaTopic::Attachments, message)?; - Ok(None) - } else { - Ok(Some(attachment)) - } + }) } - /// Produces Kafka messages for the content and metadata of an attachment item. - /// - /// The `send_individual_attachments` controls whether the metadata of an attachment - /// is produced directly as an individual `attachment` message, or returned from this function - /// to be later sent as part of an `event` message. - /// - /// Attachment contents are chunked and sent as multiple `attachment_chunk` messages, - /// unless the `send_individual_attachments` flag is set, and the content is small enough - /// to fit inside a message. 
- /// In that case, no `attachment_chunk` is produced, but the content is sent as part - /// of the `attachment` message instead. - fn produce_attachment( + fn chunked_attachment_from_attachment( &self, event_id: EventId, project_id: ProjectId, org_id: OrganizationId, item: &Item, send_individual_attachments: bool, - ) -> Result<Option<ChunkedAttachment>, StoreError> { - // If the attachment is actually a placeholder we need to do some logic that does not neatly - // fit into this function. - if item.is_attachment_ref() { - return self.produce_attachment_ref( - event_id, - project_id, - org_id, - item, - send_individual_attachments, - ); - } - + ) -> Result<ChunkedAttachment, StoreError> { let id = Uuid::new_v4().to_string(); let payload = item.payload(); @@ -1097,7 +1065,7 @@ impl StoreService { AttachmentPayload::Chunked(chunk_index) }; - let attachment = ChunkedAttachment { + Ok(ChunkedAttachment { id, name: match item.filename() { Some(name) => name.to_owned(), @@ -1108,7 +1076,39 @@ impl StoreService { attachment_type: item.attachment_type().unwrap_or_default(), size, payload, - }; + }) + } + + /// Produces Kafka messages for the content and metadata of an attachment item. + /// + /// The `send_individual_attachments` flag controls whether the metadata of an attachment + /// is produced directly as an individual `attachment` message, or returned from this function + /// to be later sent as part of an `event` message. + /// + /// Attachment contents are chunked and sent as multiple `attachment_chunk` messages, + /// unless the `send_individual_attachments` flag is set, and the content is small enough + /// to fit inside a message. + /// In that case, no `attachment_chunk` is produced, but the content is sent as part + /// of the `attachment` message instead.
+ fn produce_attachment( + &self, + event_id: EventId, + project_id: ProjectId, + org_id: OrganizationId, + item: &Item, + send_individual_attachments: bool, + ) -> Result, StoreError> { + let attachment = if item.is_attachment_ref() { + self.chunked_attachment_from_placeholder(item) + } else { + self.chunked_attachment_from_attachment( + event_id, + project_id, + org_id, + item, + send_individual_attachments, + ) + }?; if send_individual_attachments { let message = KafkaMessage::Attachment(AttachmentKafkaMessage {