diff --git a/Cargo.lock b/Cargo.lock index 5c049e144..cf62a1150 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -486,7 +486,7 @@ dependencies = [ "aws-sdk-sts", "aws-smithy-async", "aws-smithy-http 0.63.6", - "aws-smithy-json 0.62.5", + "aws-smithy-json 0.62.6", "aws-smithy-runtime", "aws-smithy-runtime-api", "aws-smithy-types", @@ -536,9 +536,9 @@ dependencies = [ [[package]] name = "aws-runtime" -version = "1.7.3" +version = "1.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5dcd93c82209ac7413532388067dce79be5a8780c1786e5fae3df22e4dee2864" +checksum = "77ed8e8c52d2dc2390ad9f15647fe663f71e9780b4262c190fbb823a32721566" dependencies = [ "aws-credential-types", "aws-sigv4", @@ -596,6 +596,30 @@ dependencies = [ "url", ] +[[package]] +name = "aws-sdk-sqs" +version = "1.99.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ce61cf7e451891862a315dc96e1dbeb5e6a6f3740b354b5243217602b7e437b" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http 0.63.6", + "aws-smithy-json 0.62.6", + "aws-smithy-observability", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand", + "http 0.2.12", + "http 1.4.0", + "regex-lite", + "tracing", +] + [[package]] name = "aws-sdk-sts" version = "1.103.0" @@ -606,7 +630,7 @@ dependencies = [ "aws-runtime", "aws-smithy-async", "aws-smithy-http 0.63.6", - "aws-smithy-json 0.62.5", + "aws-smithy-json 0.62.6", "aws-smithy-observability", "aws-smithy-query", "aws-smithy-runtime", @@ -623,9 +647,9 @@ dependencies = [ [[package]] name = "aws-sigv4" -version = "1.4.3" +version = "1.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68dc0b907359b120170613b5c09ccc61304eac3998ff6274b97d93ee6490115a" +checksum = "b7083fb918b38474ac65ffbf8a69fc8792d36879f4ac5f1667b43aec61efe9a5" dependencies = [ "aws-credential-types", "aws-smithy-eventstream", @@ -764,10 +788,12 @@ dependencies = [ [[package]] name = "aws-smithy-json" -version = "0.62.5" +version = "0.62.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9648b0bb82a2eedd844052c6ad2a1a822d1f8e3adee5fbf668366717e428856a" +checksum = "517089205f18ab4adc5a3e02888cb139bbbbb2e168eac9f396216925d1fbeaf5" dependencies = [ + "aws-smithy-runtime-api", + "aws-smithy-schema", "aws-smithy-types", ] @@ -792,15 +818,16 @@ dependencies = [ [[package]] name = "aws-smithy-runtime" -version = "1.11.1" +version = "1.11.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0504b1ab12debb5959e5165ee5fe97dd387e7aa7ea6a477bfd7635dfe769a4f5" +checksum = "b8e6f5caf6fea86f8c2206541ab5857cfcda9013426cdbe8fa0098b9e2d32182" dependencies = [ "aws-smithy-async", "aws-smithy-http 0.63.6", "aws-smithy-http-client", "aws-smithy-observability", "aws-smithy-runtime-api", + "aws-smithy-schema", "aws-smithy-types", "bytes", "fastrand", @@ -817,9 +844,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime-api" -version = "1.12.0" +version = "1.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b71a13df6ada0aafbf21a73bdfcdf9324cfa9df77d96b8446045be3cde61b42e" +checksum = "dc117c179ecf39a62a0a3f49f600e9ac26a7ad7dd172177999f83933af776c32" dependencies = [ "aws-smithy-async", "aws-smithy-runtime-api-macros", @@ -844,11 +871,22 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "aws-smithy-schema" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7442cb268338f0eb8278140a107c046756aa01093d8ef5e99628d34ae09c94f5" +dependencies = [ + "aws-smithy-runtime-api", + "aws-smithy-types", + "http 1.4.0", +] + [[package]] name = "aws-smithy-types" -version = "1.4.7" +version = "1.4.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d73dbfbaa8e4bc57b9045137680b958d274823509a360abfd8e1d514d40c95c" +checksum = "056b66dbce2f81cc0c1e2b05bb402eb58f8a3530479d650efadd5bbae9a4050b" dependencies = [ "base64-simd", "bytes", @@ -891,13 +929,14 @@ dependencies = [ [[package]] name = "aws-types" -version = "1.3.15" +version = "1.3.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f4bbcaa9304ea40902d3d5f42a0428d1bd895a2b0f6999436fb279ffddc58ac" +checksum = "d16bf10b03a3c01e6b3b7d47cd964e873ffe9e7d4e80fad16bd4c077cb068531" dependencies = [ "aws-credential-types", "aws-smithy-async", "aws-smithy-runtime-api", + "aws-smithy-schema", "aws-smithy-types", "rustc_version", "tracing", @@ -1245,8 +1284,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c673075a2e0e5f4a1dde27ce9dee1ea4558c7ffe648f576438a20ca1d2acc4b0" dependencies = [ "iana-time-zone", + "js-sys", "num-traits", "serde", + "wasm-bindgen", "windows-link", ] @@ -2107,6 +2148,16 @@ dependencies = [ "tokio", ] +[[package]] +name = "docs_rs_crates_io" +version = "0.1.0" +dependencies = [ + "chrono", + "semver", + "serde", + "serde_json", +] + [[package]] name = "docs_rs_database" version = "0.0.0" @@ -2445,6 +2496,7 @@ name = "docs_rs_watcher" version = "0.6.0" dependencies = [ "anyhow", + "aws-sdk-sqs", "clap", "crates-index", "crates-index-diff", @@ -2471,6 +2523,7 @@ dependencies = [ "test-case", "tokio", "tracing", + "url", ] [[package]] @@ -6852,9 +6905,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.149" +version = "1.0.150" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "83fc039473c5595ace860d8c4fafa220ff474b3fc6bfdb4293327f1a37e94d86" +checksum = "e8014e44b4736ed0538adeecded0fce2a272f22dc9578a7eb6b2d9993c74cfb9" dependencies = [ "itoa 1.0.18", "memchr", diff --git a/crates/bin/docs_rs_watcher/Cargo.toml b/crates/bin/docs_rs_watcher/Cargo.toml index 0b0f9ba6a..caeaefa97 100644 --- a/crates/bin/docs_rs_watcher/Cargo.toml +++ b/crates/bin/docs_rs_watcher/Cargo.toml @@ -8,6 +8,7 @@ edition.workspace = true [dependencies] anyhow = { workspace = true } +aws-sdk-sqs = { version = "1.99.0", default-features = false, features = ["default-https-client", "rt-tokio"] } clap = { workspace = true } # NOTE: on the new infra, switch back from `git-https-reqwest` to `git-https` (curl) once the curl version is new enough crates-index = { version = "3.0.0", default-features = false, features = ["git", "git-https-reqwest", "git-performance", "parallel"] } @@ -32,6 +33,7 @@ rayon = "1.6.1" sqlx = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } +url = { workspace = true } [dev-dependencies] docs_rs_config = { path = "../../lib/docs_rs_config", features = ["testing"] } diff --git a/crates/bin/docs_rs_watcher/src/config.rs b/crates/bin/docs_rs_watcher/src/config.rs index 7b5f17976..404ade8f5 100644 --- a/crates/bin/docs_rs_watcher/src/config.rs +++ b/crates/bin/docs_rs_watcher/src/config.rs @@ -2,11 +2,14 @@ use anyhow::Result; use docs_rs_config::AppConfig; use docs_rs_env_vars::{env, maybe_env, require_env}; use std::{path::PathBuf, time::Duration}; +use url::Url; #[derive(Debug)] pub struct Config { pub registry_index_path: PathBuf, pub registry_url: Option, + pub sqs_queue_url: Option, + pub sqs_region: Option, /// How long to wait between registry checks pub delay_between_registry_fetches: Duration, @@ -29,6 +32,8 @@ impl AppConfig for Config { Ok(Self { registry_index_path: env("REGISTRY_INDEX_PATH", prefix.join("crates.io-index"))?, registry_url: maybe_env("REGISTRY_URL")?, + sqs_queue_url: maybe_env("DOCSRS_SQS_QUEUE_URL")?, + sqs_region: maybe_env("DOCSRS_SQS_REGION")?, delay_between_registry_fetches: Duration::from_secs(env::( "DOCSRS_DELAY_BETWEEN_REGISTRY_FETCHES", 60, diff --git a/crates/bin/docs_rs_watcher/src/db/delete.rs b/crates/bin/docs_rs_watcher/src/db/delete.rs index dbfa0e58e..742bbb634 100644 --- a/crates/bin/docs_rs_watcher/src/db/delete.rs +++ b/crates/bin/docs_rs_watcher/src/db/delete.rs @@ -170,14 +170,18 @@ async fn delete_version_from_database( format!("DELETE FROM {table} WHERE {column} IN (SELECT id FROM releases WHERE crate_id = $1 AND version = $2)").as_str()) .bind(crate_id).bind(version).execute(&mut *transaction).await?; } - let is_library: bool = sqlx::query_scalar!( + let Some(is_library) = sqlx::query_scalar!( "DELETE FROM releases WHERE crate_id = $1 AND version = $2 RETURNING is_library", crate_id.0, version as _, ) - .fetch_one(&mut *transaction) + .fetch_optional(&mut *transaction) .await? - .unwrap_or(false); + else { + transaction.commit().await?; + return Ok(false); + }; + let is_library = is_library.unwrap_or(false); sqlx::query!( "DELETE FROM queue WHERE name = $1 AND version = $2;", @@ -690,6 +694,32 @@ mod tests { Ok(()) } + #[tokio::test(flavor = "multi_thread")] + async fn test_delete_already_deleted_version_doesnt_error() -> Result<()> { + let env = TestEnvironment::new().await?; + let mut conn = env.async_conn().await?; + + env.fake_release() + .await + .name(&KRATE) + .version(V1) + .create() + .await?; + env.fake_release() + .await + .name(&KRATE) + .version(V2) + .create() + .await?; + + delete_version(&mut conn, env.storage()?, env.config(), &KRATE, &V1).await?; + delete_version(&mut conn, env.storage()?, env.config(), &KRATE, &V1).await?; + + assert!(crate_exists(&mut conn, &KRATE).await?); + + Ok(()) + } + #[tokio::test(flavor = "multi_thread")] async fn test_delete_version_waits_for_locked_queue_rows() -> Result<()> { let env = TestEnvironment::new().await?; diff --git a/crates/lib/docs_rs_crates_io/Cargo.toml b/crates/lib/docs_rs_crates_io/Cargo.toml new file mode 100644 index 000000000..96b373c89 --- /dev/null +++ b/crates/lib/docs_rs_crates_io/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "docs_rs_crates_io" +version = "0.1.0" +description = "types & logic for the direct integration between docs.rs & crates.io" + +authors.workspace = true +license.workspace = true +repository.workspace = true +edition.workspace = true + +[dependencies] +chrono = { version = "0.4", features = ["serde"] } +serde = { version = "1", features = ["derive"] } +semver = { version = "1", features = ["serde"] } + +[dev-dependencies] +serde_json = "1.0" + +[lints] +workspace = true diff --git a/crates/lib/docs_rs_crates_io/src/events.rs b/crates/lib/docs_rs_crates_io/src/events.rs new file mode 100644 index 000000000..f01933db6 --- /dev/null +++ b/crates/lib/docs_rs_crates_io/src/events.rs @@ -0,0 +1,207 @@ +#![allow(clippy::disallowed_types)] + +use chrono::{DateTime, Utc}; +use std::fmt; + +/// A change that can happen to a crate on our index. +#[derive(Clone, serde::Serialize, serde::Deserialize, Eq, PartialEq, Debug)] +#[serde(tag = "type", content = "payload", rename_all = "snake_case")] +pub enum IndexChangeV1 { + /// A crate version was added. + Added(CrateVersion), + /// A crate version was unyanked. + Unyanked(CrateVersion), + /// A crate version was yanked. + Yanked(CrateVersion), + /// The name of the crate whose file was deleted, which implies all versions were deleted as well. + CrateDeleted { name: String }, + /// A crate version was deleted. + VersionDeleted(CrateVersion), +} + +impl fmt::Display for IndexChangeV1 { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "{}", + match *self { + IndexChangeV1::Added(_) => "added", + IndexChangeV1::Yanked(_) => "yanked", + IndexChangeV1::CrateDeleted { .. } => "crate deleted", + IndexChangeV1::VersionDeleted(_) => "version deleted", + IndexChangeV1::Unyanked(_) => "unyanked", + } + ) + } +} + +/// A conventional event envelope for our events between crates.io & docs.rs +#[derive(Clone, serde::Serialize, serde::Deserialize, Eq, PartialEq, Debug)] +pub struct Event { + /// Unique event identifier for deduplication and tracing. + pub id: String, + /// Timestamp when the event occured + pub occurred_at: DateTime, + /// The typed payload. + #[serde(flatten)] + pub change: T, +} + +/// The first version of the public event wire format. +pub type IndexChangeEventV1 = Event; + +/// Pack all information we know about a change made to a version of a crate. +#[derive(Clone, serde::Serialize, serde::Deserialize, Eq, PartialEq, Debug)] +pub struct CrateVersion { + /// The crate name, i.e. `clap`. + pub name: String, + /// is the release yanked? + pub yanked: bool, + /// The semantic version of the crate. + #[serde(rename = "vers")] + pub version: semver::Version, +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + fn crate_version() -> CrateVersion { + CrateVersion { + name: "clap".into(), + yanked: false, + version: semver::Version::new(4, 5, 0), + } + } + + fn event(change: IndexChangeV1) -> IndexChangeEventV1 { + IndexChangeEventV1 { + id: "evt_123".into(), + occurred_at: DateTime::parse_from_rfc3339("2026-05-22T12:34:56Z") + .unwrap() + .with_timezone(&Utc), + change, + } + } + + #[test] + fn crate_version_serializes_with_vers_field() { + let event = crate_version(); + + assert_eq!( + serde_json::to_value(&event).unwrap(), + json!({ + "name": "clap", + "yanked": false, + "vers": "4.5.0", + }) + ); + } + + #[test] + fn change_serializes_with_expected_variant_shapes() { + let crate_version = crate_version(); + + let cases = [ + ( + IndexChangeV1::Added(crate_version.clone()), + json!({ + "type": "added", + "payload": { + "name": "clap", + "yanked": false, + "vers": "4.5.0", + } + }), + ), + ( + IndexChangeV1::Unyanked(crate_version.clone()), + json!({ + "type": "unyanked", + "payload": { + "name": "clap", + "yanked": false, + "vers": "4.5.0", + } + }), + ), + ( + IndexChangeV1::Yanked(crate_version.clone()), + json!({ + "type": "yanked", + "payload": { + "name": "clap", + "yanked": false, + "vers": "4.5.0", + } + }), + ), + ( + IndexChangeV1::CrateDeleted { + name: "old-crate".into(), + }, + json!({ + "type": "crate_deleted", + "payload": { + "name": "old-crate" + } + }), + ), + ( + IndexChangeV1::VersionDeleted(crate_version), + json!({ + "type": "version_deleted", + "payload": { + "name": "clap", + "yanked": false, + "vers": "4.5.0", + } + }), + ), + ]; + + for (event, expected) in cases { + assert_eq!(serde_json::to_value(&event).unwrap(), expected); + } + } + + #[test] + fn event_serializes_with_minimum_metadata() { + let event = event(IndexChangeV1::CrateDeleted { + name: "old-crate".into(), + }); + + assert_eq!( + serde_json::to_value(&event).unwrap(), + json!({ + "id": "evt_123", + "occurred_at": "2026-05-22T12:34:56Z", + "type": "crate_deleted", + "payload": { + "name": "old-crate" + } + }) + ); + } + + #[test] + fn event_deserializes_rfc3339_occurred_at() { + let event: IndexChangeEventV1 = serde_json::from_value(json!({ + "id": "evt_123", + "occurred_at": "2026-05-22T12:34:56Z", + "type": "crate_deleted", + "payload": { + "name": "old-crate" + } + })) + .unwrap(); + + assert_eq!( + event.occurred_at, + DateTime::parse_from_rfc3339("2026-05-22T12:34:56Z") + .unwrap() + .with_timezone(&Utc) + ); + } +} diff --git a/crates/lib/docs_rs_crates_io/src/lib.rs b/crates/lib/docs_rs_crates_io/src/lib.rs new file mode 100644 index 000000000..a9970c28f --- /dev/null +++ b/crates/lib/docs_rs_crates_io/src/lib.rs @@ -0,0 +1 @@ +pub mod events;