diff --git a/common/src/api/mod.rs b/common/src/api/mod.rs index 6df5a84..3de011a 100644 --- a/common/src/api/mod.rs +++ b/common/src/api/mod.rs @@ -104,8 +104,7 @@ impl Client { url } - fn get(&self, path: Cow<'static, str>) -> crate::http::RequestBuilder { - let mut req = self.client.get(self.url_join(&path)); + fn add_authentication(&self, mut req: RequestBuilder) -> RequestBuilder { if let Some(auth_cookie) = &self.auth_cookie { req = req.header(AUTH_COOKIE_HEADER, auth_cookie); } @@ -121,38 +120,20 @@ impl Client { req } - fn post(&self, path: Cow<'static, str>) -> crate::http::RequestBuilder { - let mut req = self.client.post(self.url_join(&path)); - if let Some(auth_cookie) = &self.auth_cookie { - req = req.header(AUTH_COOKIE_HEADER, auth_cookie); - } - - if let Some(worker_key) = &self.worker_key { - req = req.header(WORKER_KEY_HEADER, worker_key); - } - - if let Some(signup_secret) = &self.signup_secret { - req = req.header(SIGNUP_SECRET_HEADER, signup_secret); - } - - req + fn get(&self, path: Cow<'static, str>) -> RequestBuilder { + self.add_authentication(self.client.get(self.url_join(&path))) } - fn delete(&self, path: Cow<'static, str>) -> crate::http::RequestBuilder { - let mut req = self.client.delete(self.url_join(&path)); - if let Some(auth_cookie) = &self.auth_cookie { - req = req.header(AUTH_COOKIE_HEADER, auth_cookie); - } - - if let Some(worker_key) = &self.worker_key { - req = req.header(WORKER_KEY_HEADER, worker_key); - } + fn post(&self, path: Cow<'static, str>) -> RequestBuilder { + self.add_authentication(self.client.post(self.url_join(&path))) + } - if let Some(signup_secret) = &self.signup_secret { - req = req.header(SIGNUP_SECRET_HEADER, signup_secret); - } + fn put(&self, path: Cow<'static, str>) -> RequestBuilder { + self.add_authentication(self.client.put(self.url_join(&path))) + } - req + fn delete(&self, path: Cow<'static, str>) -> RequestBuilder { + self.add_authentication(self.client.delete(self.url_join(&path))) } } diff --git 
a/common/src/api/v1/mod.rs b/common/src/api/v1/mod.rs index b1c94a1..ce963f2 100644 --- a/common/src/api/v1/mod.rs +++ b/common/src/api/v1/mod.rs @@ -196,6 +196,20 @@ pub trait WorkerRestApi { async fn register_worker(&self, request: RegisterWorkerRequest) -> Result<()>; async fn get_worker(&self, id: i32) -> Result; async fn unregister_worker(&self, id: i32) -> Result<()>; + async fn get_worker_tags(&self, id: i32) -> Result>; + async fn set_worker_tags(&self, id: i32, tags: Vec) -> Result<()>; + async fn create_worker_tag(&self, id: i32, tag: String) -> Result<()>; + async fn delete_worker_tag(&self, id: i32, tag: String) -> Result<()>; +} + +#[async_trait] +pub trait TagRestApi { + async fn get_tags(&self) -> Result>; + async fn create_tag(&self, request: CreateTagRequest) -> Result; + async fn delete_tag(&self, tag: String) -> Result<()>; + async fn get_tag_rules(&self, tag: String) -> Result>; + async fn create_tag_rule(&self, tag: String, request: CreateTagRuleRequest) -> Result; + async fn delete_tag_rule(&self, tag: String, tag_rule_id: i32) -> Result<()>; } #[async_trait] @@ -664,4 +678,115 @@ impl WorkerRestApi for Client { Ok(()) } + + async fn get_worker_tags(&self, id: i32) -> Result> { + let tags = self + .get(Cow::Owned(format!("api/v1/workers/{id}/tags"))) + .send() + .await? + .error_for_status()? + .json() + .await?; + + Ok(tags) + } + + async fn set_worker_tags(&self, id: i32, tags: Vec) -> Result<()> { + self.put(Cow::Owned(format!("api/v1/workers/{id}/tags"))) + .json(&tags) + .send_encoded() + .await? + .error_for_status()?; + + Ok(()) + } + + async fn create_worker_tag(&self, id: i32, tag: String) -> Result<()> { + self.put(Cow::Owned(format!("api/v1/workers/{id}/tags/{tag}"))) + .send() + .await? + .error_for_status()?; + + Ok(()) + } + + async fn delete_worker_tag(&self, id: i32, tag: String) -> Result<()> { + self.delete(Cow::Owned(format!("api/v1/workers/{id}/tags/{tag}"))) + .send() + .await? 
+ .error_for_status()?; + + Ok(()) + } +} + +#[async_trait] +impl TagRestApi for Client { + async fn get_tags(&self) -> Result> { + let records = self + .get(Cow::Borrowed("api/v1/tags")) + .send() + .await? + .error_for_status()? + .json() + .await?; + + Ok(records) + } + + async fn create_tag(&self, request: CreateTagRequest) -> Result { + let record = self + .post(Cow::Borrowed("api/v1/tags")) + .json(&request) + .send_encoded() + .await? + .error_for_status()? + .json() + .await?; + + Ok(record) + } + + async fn delete_tag(&self, tag: String) -> Result<()> { + self.delete(Cow::Owned(format!("api/v1/tags/{tag}"))) + .send() + .await? + .error_for_status()?; + + Ok(()) + } + + async fn get_tag_rules(&self, tag: String) -> Result> { + let records = self + .get(Cow::Owned(format!("api/v1/tags/{tag}"))) + .send() + .await? + .error_for_status()? + .json() + .await?; + + Ok(records) + } + + async fn create_tag_rule(&self, tag: String, request: CreateTagRuleRequest) -> Result { + let record = self + .post(Cow::Owned(format!("api/v1/tags/{tag}"))) + .json(&request) + .send_encoded() + .await? + .error_for_status()? + .json() + .await?; + + Ok(record) + } + + async fn delete_tag_rule(&self, tag: String, tag_rule_id: i32) -> Result<()> { + self.delete(Cow::Owned(format!("api/v1/tags/{tag}/{tag_rule_id}"))) + .send() + .await? 
+ .error_for_status()?; + + Ok(()) + } } diff --git a/common/src/api/v1/models/mod.rs b/common/src/api/v1/models/mod.rs index a03ecf6..de59e64 100644 --- a/common/src/api/v1/models/mod.rs +++ b/common/src/api/v1/models/mod.rs @@ -3,6 +3,7 @@ mod dashboard; mod meta; mod package; mod queue; +mod tag; mod worker; pub use build::*; @@ -11,6 +12,7 @@ pub use meta::*; pub use package::*; pub use queue::*; use serde::{Deserialize, Serialize}; +pub use tag::*; pub use worker::*; #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/common/src/api/v1/models/tag.rs b/common/src/api/v1/models/tag.rs new file mode 100644 index 0000000..a50ea54 --- /dev/null +++ b/common/src/api/v1/models/tag.rs @@ -0,0 +1,23 @@ +#[cfg(feature = "diesel")] +use diesel::Queryable; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Serialize, Deserialize)] +pub struct CreateTagRequest { + pub tag: String, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct CreateTagRuleRequest { + pub name_pattern: String, + pub version_pattern: Option, +} + +#[derive(Debug, Serialize, Deserialize)] +#[cfg_attr(feature = "diesel", derive(Queryable))] +#[cfg_attr(feature = "diesel", diesel(check_for_backend(diesel::sqlite::Sqlite)))] +pub struct TagRule { + pub id: i32, + pub name_pattern: String, + pub version_pattern: Option, +} diff --git a/daemon/migrations/2025-11-01-202819_worker-tags/down.sql b/daemon/migrations/2025-11-01-202819_worker-tags/down.sql new file mode 100644 index 0000000..d3bbf7b --- /dev/null +++ b/daemon/migrations/2025-11-01-202819_worker-tags/down.sql @@ -0,0 +1,3 @@ +DROP TABLE tag_rules; +DROP TABLE worker_tags; +DROP TABLE tags; diff --git a/daemon/migrations/2025-11-01-202819_worker-tags/up.sql b/daemon/migrations/2025-11-01-202819_worker-tags/up.sql new file mode 100644 index 0000000..570331d --- /dev/null +++ b/daemon/migrations/2025-11-01-202819_worker-tags/up.sql @@ -0,0 +1,32 @@ +CREATE TABLE tags +( + id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + tag TEXT 
NOT NULL +); + +CREATE UNIQUE INDEX tags_unique_idx ON tags (tag); + +CREATE TABLE worker_tags +( + worker_id INTEGER NOT NULL REFERENCES workers ON DELETE CASCADE, + tag_id INTEGER NOT NULL REFERENCES tags ON DELETE CASCADE, + + PRIMARY KEY (worker_id, tag_id) +); + +CREATE INDEX worker_tags_worker_id_idx ON worker_tags (worker_id); +CREATE INDEX worker_tags_tag_id_idx ON worker_tags (tag_id); + +CREATE TABLE tag_rules +( + id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + tag_id INTEGER NOT NULL REFERENCES tags ON DELETE CASCADE, + name_pattern TEXT NOT NULL, + version_pattern TEXT +); + +CREATE UNIQUE INDEX tag_rules_unique_idx ON tag_rules (tag_id, + name_pattern, + COALESCE(version_pattern, 'PLACEHOLDER')); + +CREATE INDEX tag_rules_tag_id_idx ON tag_rules (tag_id); \ No newline at end of file diff --git a/daemon/src/api/v1/mod.rs b/daemon/src/api/v1/mod.rs index e57d2db..6447eb2 100644 --- a/daemon/src/api/v1/mod.rs +++ b/daemon/src/api/v1/mod.rs @@ -3,6 +3,7 @@ mod dashboard; mod meta; mod package; mod queue; +mod tag; mod util; mod worker; @@ -11,4 +12,5 @@ pub use dashboard::*; pub use meta::*; pub use package::*; pub use queue::*; +pub use tag::*; pub use worker::*; diff --git a/daemon/src/api/v1/queue.rs b/daemon/src/api/v1/queue.rs index f8bfc55..b573712 100644 --- a/daemon/src/api/v1/queue.rs +++ b/daemon/src/api/v1/queue.rs @@ -4,12 +4,18 @@ use crate::api::v1::util::pagination::PaginateDsl; use crate::config::Config; use crate::db::Pool; use crate::models::NewQueued; -use crate::schema::{binary_packages, build_inputs, queue, rebuilds, source_packages, workers}; +use crate::schema::{ + binary_packages, build_inputs, queue, rebuilds, source_packages, tag_rules, worker_tags, + workers, +}; use crate::web; use actix_web::{HttpRequest, HttpResponse, Responder, delete, get, post}; use chrono::{Duration, NaiveDateTime, Utc}; use diesel::dsl::update; -use diesel::{BoolExpressionMethods, JoinOnDsl}; +use diesel::sql_types::{Nullable, Text}; +use diesel::{ + 
BoolExpressionMethods, JoinOnDsl, QueryResult, SqliteConnection, TextExpressionMethods, +}; use diesel::{Connection, OptionalExtension, QueryDsl, RunQueryDsl}; use diesel::{ExpressionMethods, SqliteExpressionMethods, define_sql_function}; use rebuilderd_common::api::v1::{ @@ -17,7 +23,7 @@ use rebuilderd_common::api::v1::{ QueueJobRequest, QueuedJob, QueuedJobArtifact, QueuedJobWithArtifacts, ResultPage, }; use rebuilderd_common::config::PING_DEADLINE; -use rebuilderd_common::errors::Error; +use rebuilderd_common::errors::*; use std::collections::HashSet; #[diesel::dsl::auto_type] @@ -298,23 +304,87 @@ define_sql_function! { fn sqlite_random() -> Integer } -#[post("/pop")] -pub async fn request_work( - req: HttpRequest, - cfg: web::Data, - pool: web::Data, - request: web::Json, -) -> web::Result { - let mut connection = pool.get().map_err(Error::from)?; +define_sql_function! { + #[sql_name = "COALESCE"] + fn sqlite_coalesce(x: Nullable, y: Text) -> Text +} - let check_worker = auth::worker(&cfg, &req, connection.as_mut()); - if check_worker.is_err() { - return Ok(HttpResponse::Forbidden().finish()); - } +/// Gets a queued job eligible for rebuilding by the worker. Eligibility is determined by the worker's supported +/// architectures, backends, and tags (if any). 
+fn get_eligible_job( + connection: &mut SqliteConnection, + worker_id: i32, + supported_architectures: &Vec<String>, + supported_backends: &Vec<String>, +) -> QueryResult<Option<QueuedJobWithArtifacts>> { + let worker_tag_ids = worker_tags::table + .filter(worker_tags::worker_id.eq(worker_id)) + .select(worker_tags::tag_id) + .get_results::<i32>(connection)?; + + let mut base_query = queue_base() + .left_join(tag_rules::table.on( + source_packages::name.like(tag_rules::name_pattern).and( + source_packages::version.like(sqlite_coalesce(tag_rules::version_pattern, "%")), + ), + )) + .into_boxed(); - base_query = if !worker_tag_ids.is_empty() { + // worker has tags - only offer tagged work applicable to us + base_query.filter(tag_rules::tag_id.eq_any(worker_tag_ids)) + } else { + // worker has no tags - only offer untagged work + base_query.filter(tag_rules::id.is_null()) + }; - let worker = check_worker?; - // clear any stale jobs before we consider available jobs in the queue + // TODO: this can produce duplicates if multiple rules match a package. It's not a breaking issue, but it does make + // those packages slightly more likely to be picked. Unfortunately, diesel doesn't let us combine into_boxed and + // group_by, which would be the easy solution. 
+ let maybe_job = base_query + .filter(queue::worker.is_null()) + .filter( + build_inputs::next_retry + .is_null() + .or(build_inputs::next_retry.le(diesel::dsl::now)), + ) + .filter(build_inputs::architecture.eq_any(supported_architectures)) + .filter(build_inputs::backend.eq_any(supported_backends)) + .order_by(( + queue::priority, + diesel::dsl::date(queue::queued_at), + sqlite_random(), + )) + .first::(connection) + .optional()?; + + if let Some(job) = maybe_job { + let artifacts = queue::table + .filter(queue::id.is(job.id)) + .inner_join( + binary_packages::table + .on(queue::build_input_id.is(binary_packages::build_input_id)), + ) + .select(( + binary_packages::name, + binary_packages::version, + binary_packages::architecture, + binary_packages::artifact_url, + )) + .get_results::(connection)?; + + Ok(Some(QueuedJobWithArtifacts { job, artifacts })) + } else { + // TODO: offer untagged work to a worker with tags if no tagged work was available? Solve by adding a catch-all + // rule to workers which should also accept untagged work? + Ok(None) + } +} + +/// Reclaims stale jobs, making them available to workers again. A job is considered stale if it hasn't been pinged +/// within the [PING_DEADLINE]. Reclamation is done by removing the worker assignment of the job and resetting the start +/// and ping times. 
+fn reclaim_stale_jobs(connection: &mut SqliteConnection) -> QueryResult<()> { let now = Utc::now(); let then = now - Duration::seconds(PING_DEADLINE); @@ -330,8 +400,29 @@ pub async fn request_work( queue::started_at.eq(None::), queue::last_ping.eq(None::), )) - .execute(connection.as_mut()) - .map_err(Error::from)?; + .execute(connection)?; + + Ok(()) +} + +#[post("/pop")] +pub async fn request_work( + req: HttpRequest, + cfg: web::Data, + pool: web::Data, + request: web::Json, +) -> web::Result { + let mut connection = pool.get().map_err(Error::from)?; + + let check_worker = auth::worker(&cfg, &req, connection.as_mut()); + if check_worker.is_err() { + return Ok(HttpResponse::Forbidden().finish()); + } + + let worker = check_worker?; + + // clear any stale jobs before we consider available jobs in the queue + reclaim_stale_jobs(connection.as_mut()).map_err(Error::from)?; // see if we can dig up any available work for this worker let pop_request = request.into_inner(); @@ -339,51 +430,23 @@ pub async fn request_work( if let Some(record) = connection.transaction::, _, _>(|conn| { - if let Some(record) = queue_base() - .filter(queue::worker.is_null()) - .filter( - build_inputs::next_retry - .is_null() - .or(build_inputs::next_retry.le(diesel::dsl::now)), - ) - .filter(build_inputs::architecture.eq_any(supported_architectures)) - .filter(build_inputs::backend.eq_any(pop_request.supported_backends)) - .order_by(( - queue::priority, - diesel::dsl::date(queue::queued_at), - sqlite_random(), - )) - .first::(conn) - .optional() - .map_err(Error::from)? 
- { - let artifacts = queue::table - .filter(queue::id.is(record.id)) - .inner_join( - binary_packages::table - .on(queue::build_input_id.is(binary_packages::build_input_id)), - ) - .select(( - binary_packages::name, - binary_packages::version, - binary_packages::architecture, - binary_packages::artifact_url, - )) - .get_results::(conn) - .map_err(Error::from)?; - + if let Some(record) = get_eligible_job( + conn.as_mut(), + worker.id, + &supported_architectures, + &pop_request.supported_backends, + )? { let now = Utc::now().naive_utc(); - let status = format!("working hard on {} {}", record.name, record.version); + let status = format!("working hard on {} {}", record.job.name, record.job.version); diesel::update(queue::table) - .filter(queue::id.is(record.id)) + .filter(queue::id.is(record.job.id)) .set(( queue::started_at.eq(now), queue::worker.eq(worker.id), queue::last_ping.eq(now), )) - .execute(conn) - .map_err(Error::from)?; + .execute(conn)?; diesel::update(workers::table) .filter(workers::id.is(worker.id)) @@ -392,13 +455,9 @@ pub async fn request_work( workers::last_ping.eq(now), workers::status.eq(status), )) - .execute(conn) - .map_err(Error::from)?; + .execute(conn)?; - Ok::, Error>(Some(QueuedJobWithArtifacts { - job: record, - artifacts, - })) + Ok::, Error>(Some(record)) } else { Ok(None) } diff --git a/daemon/src/api/v1/tag.rs b/daemon/src/api/v1/tag.rs new file mode 100644 index 0000000..16708b5 --- /dev/null +++ b/daemon/src/api/v1/tag.rs @@ -0,0 +1,146 @@ +use crate::api::v1::util::auth; +use crate::config::Config; +use crate::db::Pool; +use crate::models::{NewSourcePackageTagRule, NewTag}; +use crate::schema::{tag_rules, tags}; +use crate::web; +use actix_web::{delete, get, post, HttpRequest, HttpResponse, Responder}; +use diesel::{delete, ExpressionMethods}; +use diesel::{QueryDsl, RunQueryDsl}; +use rebuilderd_common::api::v1::{CreateTagRequest, CreateTagRuleRequest, TagRule}; +use rebuilderd_common::errors::Error; + +#[get("")] +pub async fn 
get_tags(pool: web::Data) -> web::Result { + let mut connection = pool.get().map_err(Error::from)?; + + let tags = tags::table + .select(tags::tag) + .get_results::(connection.as_mut()) + .map_err(Error::from)?; + + Ok(HttpResponse::Ok().json(tags)) +} + +#[post("")] +pub async fn create_tag( + req: HttpRequest, + cfg: web::Data, + pool: web::Data, + request: web::Json, +) -> web::Result { + if auth::admin(&cfg, &req).is_err() { + return Ok(HttpResponse::Forbidden().finish()); + } + + let mut connection = pool.get().map_err(Error::from)?; + + let tag = NewTag { + tag: request.tag.clone(), + } + .ensure_exists(connection.as_mut())?; + + Ok(HttpResponse::Ok().json(tag.tag)) +} + +#[delete("/{tag}")] +pub async fn delete_tag( + req: HttpRequest, + cfg: web::Data, + pool: web::Data, + tag: web::Path, +) -> web::Result { + if auth::admin(&cfg, &req).is_err() { + return Ok(HttpResponse::Forbidden().finish()); + } + + let mut connection = pool.get().map_err(Error::from)?; + + delete(tags::table.filter(tags::tag.eq(tag.into_inner()))) + .execute(connection.as_mut()) + .map_err(Error::from)?; + + Ok(HttpResponse::NoContent().finish()) +} + +#[get("/{tag}")] +pub async fn get_tag_rules( + pool: web::Data, + tag: web::Path, +) -> web::Result { + let mut connection = pool.get().map_err(Error::from)?; + + let tag_rules = tags::table + .inner_join(tag_rules::table) + .filter(tags::tag.eq(tag.into_inner())) + .select(( + tag_rules::id, + tag_rules::name_pattern, + tag_rules::version_pattern, + )) + .get_results::(connection.as_mut()) + .map_err(Error::from)?; + + Ok(HttpResponse::Ok().json(tag_rules)) +} + +#[post("/{tag}")] +pub async fn create_tag_rule( + req: HttpRequest, + cfg: web::Data, + pool: web::Data, + tag: web::Path, + request: web::Json, +) -> web::Result { + if auth::admin(&cfg, &req).is_err() { + return Ok(HttpResponse::Forbidden().finish()); + } + + let mut connection = pool.get().map_err(Error::from)?; + + let tag_id = tags::table + 
.filter(tags::tag.eq(tag.into_inner())) + .select(tags::id) + .get_result::(connection.as_mut()) + .map_err(Error::from)?; + + let tag_rule = NewSourcePackageTagRule { + tag_id, + name_pattern: request.name_pattern.clone(), + version_pattern: request.version_pattern.clone(), + } + .ensure_exists(connection.as_mut())?; + + Ok(HttpResponse::Ok().json(tag_rule)) +} + +#[delete("/{tag}/{id}")] +pub async fn delete_tag_rule( + req: HttpRequest, + cfg: web::Data, + pool: web::Data, + parameters: web::Path<(String, i32)>, +) -> web::Result { + if auth::admin(&cfg, &req).is_err() { + return Ok(HttpResponse::Forbidden().finish()); + } + + let mut connection = pool.get().map_err(Error::from)?; + let (tag, tag_rule_id) = parameters.into_inner(); + + let tag_id = tags::table + .filter(tags::tag.eq(tag)) + .select(tags::id) + .get_result::(connection.as_mut()) + .map_err(Error::from)?; + + delete( + tag_rules::table + .filter(tag_rules::id.eq(tag_rule_id)) + .filter(tag_rules::tag_id.eq(tag_id)), + ) + .execute(connection.as_mut()) + .map_err(Error::from)?; + + Ok(HttpResponse::NoContent().finish()) +} diff --git a/daemon/src/api/v1/worker.rs b/daemon/src/api/v1/worker.rs index fc9cb6a..939ba0f 100644 --- a/daemon/src/api/v1/worker.rs +++ b/daemon/src/api/v1/worker.rs @@ -3,11 +3,13 @@ use crate::api::v1::util::auth; use crate::api::v1::util::pagination::PaginateDsl; use crate::config::Config; use crate::db::Pool; -use crate::models::NewWorker; -use crate::schema::workers; +use crate::models::{NewTag, NewWorker, NewWorkerTag, Tag, WorkerTag}; +use crate::schema::{tags, worker_tags, workers}; use crate::web; -use actix_web::{HttpRequest, HttpResponse, Responder, delete, get, post}; +use actix_web::{HttpRequest, HttpResponse, Responder, delete, get, post, put}; use chrono::Utc; +use diesel::dsl::{delete, exists, select}; +use diesel::ExpressionMethods; use diesel::{Connection, OptionalExtension, QueryDsl, RunQueryDsl, SqliteExpressionMethods}; use 
rebuilderd_common::api::WORKER_KEY_HEADER; use rebuilderd_common::api::v1::{Page, RegisterWorkerRequest, ResultPage}; @@ -122,3 +124,163 @@ pub async fn unregister_worker( Ok(HttpResponse::NoContent().finish()) } + +#[get("/{id}/tags")] +pub async fn get_worker_tags( + req: HttpRequest, + cfg: web::Data, + pool: web::Data, + id: web::Path, +) -> web::Result { + if auth::admin(&cfg, &req).is_err() { + return Ok(HttpResponse::Forbidden().finish()); + } + + let mut connection = pool.get().map_err(Error::from)?; + + let worker_exists = select(exists(workers::table.filter(workers::id.eq(*id)))) + .get_result::(connection.as_mut()) + .map_err(Error::from)?; + + if !worker_exists { + return Ok(HttpResponse::NotFound().finish()); + } + + let tags = worker_tags::table + .inner_join(tags::table) + .filter(worker_tags::worker_id.eq(id.into_inner())) + .select(tags::tag) + .get_results::(connection.as_mut()) + .map_err(Error::from)?; + + Ok(HttpResponse::Ok().json(tags)) +} + +#[put("/{id}/tags")] +pub async fn set_worker_tags( + req: HttpRequest, + cfg: web::Data, + pool: web::Data, + id: web::Path, + tags: web::Json>, +) -> web::Result { + if auth::admin(&cfg, &req).is_err() { + return Ok(HttpResponse::Forbidden().finish()); + } + + let mut connection = pool.get().map_err(Error::from)?; + + let worker_exists = select(exists(workers::table.filter(workers::id.eq(*id)))) + .get_result::(connection.as_mut()) + .map_err(Error::from)?; + + if !worker_exists { + return Ok(HttpResponse::NotFound().finish()); + } + + let tags = tags + .into_inner() + .into_iter() + .map(|v| NewTag { tag: v }.ensure_exists(connection.as_mut())) + .collect::, _>>()?; + + connection.transaction(|conn| { + // drop all existing tag associations + delete(worker_tags::table.filter(worker_tags::worker_id.eq(*id))).execute(conn)?; + + // create new tag associations for the input set + tags.into_iter() + .map(|t| { + NewWorkerTag { + worker_id: *id, + tag_id: t.id, + } + .ensure_exists(conn.as_mut()) + }) + 
.collect::, _>>()?; + + Ok::<(), Error>(()) + })?; + + Ok(HttpResponse::NoContent().finish()) +} + +#[put("/{id}/tags/{tag}")] +pub async fn create_worker_tag( + req: HttpRequest, + cfg: web::Data, + pool: web::Data, + parameters: web::Path<(i32, String)>, +) -> web::Result { + if auth::admin(&cfg, &req).is_err() { + return Ok(HttpResponse::Forbidden().finish()); + } + + let (worker_id, tag_name) = parameters.into_inner(); + let mut connection = pool.get().map_err(Error::from)?; + + let worker_exists = select(exists(workers::table.filter(workers::id.eq(worker_id)))) + .get_result::(connection.as_mut()) + .map_err(Error::from)?; + + if !worker_exists { + return Ok(HttpResponse::NotFound().finish()); + } + + let tag = NewTag { tag: tag_name }.ensure_exists(connection.as_mut())?; + NewWorkerTag { + worker_id, + tag_id: tag.id, + } + .ensure_exists(connection.as_mut())?; + + Ok(HttpResponse::NoContent().finish()) +} + +#[delete("/{id}/tags/{tag}")] +pub async fn delete_worker_tag( + req: HttpRequest, + cfg: web::Data, + pool: web::Data, + parameters: web::Path<(i32, String)>, +) -> web::Result { + if auth::admin(&cfg, &req).is_err() { + return Ok(HttpResponse::Forbidden().finish()); + } + + let (worker_id, tag_name) = parameters.into_inner(); + let mut connection = pool.get().map_err(Error::from)?; + + let worker_exists = select(exists(workers::table.filter(workers::id.eq(worker_id)))) + .get_result::(connection.as_mut()) + .map_err(Error::from)?; + + if !worker_exists { + return Ok(HttpResponse::NotFound().finish()); + } + + let tag_id = tags::table + .filter(tags::tag.eq(tag_name)) + .select(tags::id) + .get_result::(connection.as_mut()) + .optional() + .map_err(Error::from)?; + + if tag_id.is_none() { + return Ok(HttpResponse::NotFound().finish()); + } + + connection.transaction(|conn| { + // remove the association between this worker and the tag + delete( + worker_tags::table + .filter(worker_tags::worker_id.eq(worker_id)) + 
.filter(worker_tags::tag_id.eq(tag_id.unwrap())), + ) + .execute(conn)?; + + Ok::<(), Error>(()) + })?; + + Ok(HttpResponse::NoContent().finish()) +} diff --git a/daemon/src/lib.rs b/daemon/src/lib.rs index 5f06789..e90509f 100644 --- a/daemon/src/lib.rs +++ b/daemon/src/lib.rs @@ -104,7 +104,20 @@ pub async fn run_config(pool: db::Pool, config: Config, privkey: PrivateKey) -> .service(api::v1::get_workers) .service(api::v1::register_worker) .service(api::v1::get_worker) - .service(api::v1::unregister_worker), + .service(api::v1::unregister_worker) + .service(api::v1::get_worker_tags) + .service(api::v1::set_worker_tags) + .service(api::v1::create_worker_tag) + .service(api::v1::delete_worker_tag), + ) + .service( + scope("/tags") + .service(api::v1::get_tags) + .service(api::v1::create_tag) + .service(api::v1::delete_tag) + .service(api::v1::get_tag_rules) + .service(api::v1::create_tag_rule) + .service(api::v1::delete_tag_rule), ), ), ) diff --git a/daemon/src/models/mod.rs b/daemon/src/models/mod.rs index 64d0cb1..5791e74 100644 --- a/daemon/src/models/mod.rs +++ b/daemon/src/models/mod.rs @@ -12,3 +12,4 @@ import_models!(build_input); import_models!(source_package); import_models!(worker); import_models!(queue); +import_models!(tags); diff --git a/daemon/src/models/tags.rs b/daemon/src/models/tags.rs new file mode 100644 index 0000000..b2bf9f5 --- /dev/null +++ b/daemon/src/models/tags.rs @@ -0,0 +1,137 @@ +use crate::schema::*; +use diesel::{ + AsChangeset, Identifiable, Insertable, OptionalExtension, Queryable, RunQueryDsl, Selectable, + SelectableHelper, SqliteConnection, SqliteExpressionMethods, +}; +use diesel::{ExpressionMethods, QueryDsl}; +use rebuilderd_common::errors::*; +use serde::{Deserialize, Serialize}; + +#[derive(Identifiable, Queryable, AsChangeset, Selectable, Serialize, PartialEq, Eq, Debug)] +#[diesel(check_for_backend(diesel::sqlite::Sqlite))] +#[diesel(treat_none_as_null = true)] +#[diesel(table_name = tags)] +pub struct Tag { + pub id: 
i32, + pub tag: String, +} + +#[derive(Insertable, Serialize, Deserialize, Debug)] +#[diesel(treat_none_as_null = true)] +#[diesel(table_name = tags)] +pub struct NewTag { + pub tag: String, +} + +impl NewTag { + pub fn ensure_exists(self, connection: &mut SqliteConnection) -> Result { + use crate::schema::tags::*; + + let inserted = diesel::insert_into(table) + .values(&self) + .on_conflict_do_nothing() + .returning(Tag::as_select()) + .get_result(connection) + .optional()?; + + if let Some(inserted) = inserted { + return Ok(inserted); + } + + let existing = table + .filter(tag.eq(self.tag)) + .select(Tag::as_select()) + .get_result(connection)?; + + Ok(existing) + } +} + +#[derive(Identifiable, Queryable, Selectable, Serialize, PartialEq, Eq, Debug)] +#[diesel(primary_key(worker_id, tag_id))] +#[diesel(check_for_backend(diesel::sqlite::Sqlite))] +#[diesel(treat_none_as_null = true)] +#[diesel(table_name = worker_tags)] +pub struct WorkerTag { + pub worker_id: i32, + pub tag_id: i32, +} + +#[derive(Insertable, Serialize, Deserialize, Debug)] +#[diesel(treat_none_as_null = true)] +#[diesel(table_name = worker_tags)] +pub struct NewWorkerTag { + pub worker_id: i32, + pub tag_id: i32, +} + +impl NewWorkerTag { + pub fn ensure_exists(self, connection: &mut SqliteConnection) -> Result { + use crate::schema::worker_tags::*; + + let inserted = diesel::insert_into(table) + .values(&self) + .on_conflict_do_nothing() + .returning(WorkerTag::as_select()) + .get_result(connection) + .optional()?; + + if let Some(inserted) = inserted { + return Ok(inserted); + } + + let existing = table + .filter(worker_id.eq(self.worker_id)) + .filter(tag_id.eq(self.tag_id)) + .select(WorkerTag::as_select()) + .get_result(connection)?; + + Ok(existing) + } +} + +#[derive(Identifiable, Queryable, AsChangeset, Selectable, Serialize, PartialEq, Eq, Debug)] +#[diesel(check_for_backend(diesel::sqlite::Sqlite))] +#[diesel(treat_none_as_null = true)] +#[diesel(table_name = tag_rules)] +pub struct 
SourcePackageTagRule { + pub id: i32, + pub tag_id: i32, + pub name_pattern: String, + pub version_pattern: Option, +} + +#[derive(Insertable, Serialize, Deserialize, Debug)] +#[diesel(treat_none_as_null = true)] +#[diesel(table_name = tag_rules)] +pub struct NewSourcePackageTagRule { + pub tag_id: i32, + pub name_pattern: String, + pub version_pattern: Option, +} + +impl NewSourcePackageTagRule { + pub fn ensure_exists(self, connection: &mut SqliteConnection) -> Result { + use crate::schema::tag_rules::*; + + let inserted = diesel::insert_into(table) + .values(&self) + .on_conflict_do_nothing() + .returning(SourcePackageTagRule::as_select()) + .get_result(connection) + .optional()?; + + if let Some(inserted) = inserted { + return Ok(inserted); + } + + let existing = table + .filter(tag_id.eq(self.tag_id)) + .filter(name_pattern.eq(self.name_pattern)) + .filter(version_pattern.is(self.version_pattern)) + .select(SourcePackageTagRule::as_select()) + .get_result(connection)?; + + Ok(existing) + } +} diff --git a/daemon/src/schema.rs b/daemon/src/schema.rs index 0e88516..5159b03 100644 --- a/daemon/src/schema.rs +++ b/daemon/src/schema.rs @@ -79,6 +79,15 @@ diesel::table! { } } +diesel::table! { + tag_rules (id) { + id -> Integer, + tag_id -> Integer, + name_pattern -> Text, + version_pattern -> Nullable, + } +} + diesel::table! { source_packages (id) { id -> Integer, @@ -88,7 +97,21 @@ diesel::table! { release -> Nullable, component -> Nullable, last_seen -> Timestamp, - seen_in_last_sync -> Bool + seen_in_last_sync -> Bool, + } +} + +diesel::table! { + tags (id) { + id -> Integer, + tag -> Text, + } +} + +diesel::table! 
{ + worker_tags (worker_id, tag_id) { + worker_id -> Integer, + tag_id -> Integer, } } @@ -113,6 +136,9 @@ diesel::joinable!(rebuild_artifacts -> diffoscope_logs (diffoscope_log_id)); diesel::joinable!(rebuild_artifacts -> rebuilds (rebuild_id)); diesel::joinable!(rebuilds -> build_inputs (build_input_id)); diesel::joinable!(rebuilds -> build_logs (build_log_id)); +diesel::joinable!(tag_rules -> tags (tag_id)); +diesel::joinable!(worker_tags -> tags (tag_id)); +diesel::joinable!(worker_tags -> workers (worker_id)); diesel::allow_tables_to_appear_in_same_query!( attestation_logs, @@ -123,6 +149,9 @@ diesel::allow_tables_to_appear_in_same_query!( queue, rebuild_artifacts, rebuilds, + tag_rules, source_packages, + tags, + worker_tags, workers, ); diff --git a/tools/src/args.rs b/tools/src/args.rs index ff7346a..8e18def 100644 --- a/tools/src/args.rs +++ b/tools/src/args.rs @@ -35,6 +35,12 @@ pub enum SubCommand { /// Queue related subcommands #[command(subcommand)] Queue(Queue), + /// Worker related subcommands + #[command(subcommand)] + Worker(WorkerCommand), + /// Tag related subcommands + #[command(subcommand)] + Tag(TagCommand), /// Generate shell completions Completions(Completions), } @@ -201,6 +207,116 @@ pub struct QueueDrop { pub version: Option, } +#[derive(Debug, Parser)] +pub enum WorkerCommand { + /// Operate on worker tags + #[command(subcommand)] + Tag(WorkerTagCommand), +} + +#[derive(Debug, Parser)] +pub enum WorkerTagCommand { + /// List tags attached to a worker + List(WorkerTarget), + + /// Set the tags attached to a worker + Set(WorkerSetTags), + + /// Add a single tag to a worker + Add(WorkerTagTarget), + + /// Remove a single tag from a worker + Remove(WorkerTagTarget), +} + +#[derive(Debug, Parser)] +pub struct WorkerTarget { + /// The name of the worker + pub name: String, +} + +#[derive(Debug, Parser)] +pub struct WorkerTagTarget { + /// The name of the worker + pub name: String, + + /// The name of the tag + pub tag: String, +} + 
+#[derive(Debug, Parser)]
+pub struct WorkerSetTags {
+    /// The name of the worker
+    pub name: String,
+
+    /// The tags
+    pub tags: Vec<String>,
+}
+
+#[derive(Debug, Parser)]
+pub enum TagCommand {
+    /// List registered tags
+    List,
+
+    /// Create a new tag
+    Create(TagTarget),
+
+    /// Delete a tag
+    Delete(TagTarget),
+
+    /// Operate on tag rules
+    #[command(subcommand)]
+    Rule(TagRuleCommand),
+}
+
+#[derive(Debug, Parser)]
+pub enum TagRuleCommand {
+    /// List rules for a tag
+    List(OptionalTagTarget),
+
+    /// Create a new tag rule
+    Create(CreateTagRuleCommand),
+
+    /// Delete a tag rule
+    Delete(TagRuleTarget),
+}
+
+#[derive(Debug, Parser)]
+pub struct TagTarget {
+    /// The name of the tag
+    pub tag: String,
+}
+
+#[derive(Debug, Parser)]
+pub struct OptionalTagTarget {
+    /// The name of the tag
+    pub tag: Option<String>,
+}
+
+#[derive(Debug, Parser)]
+pub struct CreateTagRuleCommand {
+    /// The name of the tag
+    pub tag: String,
+
+    /// The pattern to match source package names against. Must be compatible with SQLite's LIKE syntax.
+    #[arg(long)]
+    pub name_pattern: String,
+
+    /// The pattern to match source package versions against. Must be compatible with SQLite's LIKE syntax. Any version
+    /// will match if this pattern is omitted.
+    #[arg(long, default_missing_value(None))]
+    pub version_pattern: Option<String>,
+}
+
+#[derive(Debug, Parser)]
+pub struct TagRuleTarget {
+    /// The name of the tag
+    pub tag: String,
+
+    /// The ID of the rule
+    pub rule_id: i32,
+}
+
 #[derive(Debug, Parser)]
 pub struct Completions {
     pub shell: Shell,
 }
diff --git a/tools/src/main.rs b/tools/src/main.rs
index 854a78f..1a8eb46 100644
--- a/tools/src/main.rs
+++ b/tools/src/main.rs
@@ -9,8 +9,9 @@ use glob::Pattern;
 use nom::AsBytes;
 use rebuilderd_common::api::Client;
 use rebuilderd_common::api::v1::{
-    ArtifactStatus, BinaryPackage, BuildRestApi, IdentityFilter, OriginFilter, PackageReport,
-    PackageRestApi, Page, Priority, QueueJobRequest, QueueRestApi, WorkerRestApi,
+    ArtifactStatus, BinaryPackage, BuildRestApi, CreateTagRequest, CreateTagRuleRequest,
+    IdentityFilter, OriginFilter, PackageReport, PackageRestApi, Page, Priority, QueueJobRequest,
+    QueueRestApi, TagRestApi, Worker, WorkerRestApi,
 };
 use rebuilderd_common::errors::*;
 use rebuilderd_common::http;
@@ -127,6 +128,33 @@ async fn lookup_package(client: &Client, filter: PkgsFilter) -> Result<BinaryPackage> {
+async fn find_worker(
+    authenticated_client: &Client,
+    name: String,
+) -> Result<Option<Worker>> {
+    let mut page = Page {
+        limit: None,
+        before: None,
+        after: None,
+        sort: Some("id".to_string()),
+        direction: None,
+    };
+
+    loop {
+        let results = authenticated_client.get_workers(Some(&page)).await?;
+
+        if let Some(last) = results.records.last() {
+            page.after = Some(last.id);
+
+            let worker = results.records.into_iter().find(|w| w.name.eq(&name));
+
+            if let Some(worker) = worker {
+                return Ok(Some(worker));
+            }
+        } else {
+            // out of results
+            return Ok(None);
+        }
+    }
+}
+
 #[tokio::main]
 async fn main() -> Result<()> {
     let args = Args::parse();
@@ -152,7 +180,18 @@
         SubCommand::Status => {
             let mut stdout = io::stdout();
             for worker in client.with_auth_cookie()?.get_workers(None).await?.records {
-                let label = format!("{} ({})", worker.name.green(), worker.address.yellow());
+                let tags = client.get_worker_tags(worker.id).await?;
+                let label = if tags.is_empty() {
+                    format!("{} ({})", worker.name.green(), worker.address.yellow())
+                } else {
+                    format!(
+                        "{} [{}] ({})",
+                        worker.name.green(),
+                        tags.join(", ").cyan(),
+                        worker.address.yellow(),
+                    )
+                };
+
                 let status = if let Some(status) = worker.status {
                     format!("{:?}", status).bold()
                 } else {
@@ -438,6 +477,164 @@
                 .drop_queued_jobs(Some(&origin_filter), Some(&identity_filter))
                 .await?;
         }
+        SubCommand::Worker(WorkerCommand::Tag(WorkerTagCommand::List(worker_target))) => {
+            let authenticated_client = client.with_auth_cookie()?;
+
+            if let Some(worker) =
+                find_worker(authenticated_client, worker_target.name.clone()).await?
+            {
+                let tags = authenticated_client.get_worker_tags(worker.id).await?;
+
+                if tags.is_empty() {
+                    writeln!(io::stdout(), "{} has no tags", worker_target.name.green())?;
+                } else {
+                    writeln!(io::stdout(), "{}", tags.join("\n").cyan())?;
+                }
+            } else {
+                writeln!(
+                    io::stderr(),
+                    "Worker {} not found",
+                    worker_target.name.green()
+                )?;
+            }
+        }
+        SubCommand::Worker(WorkerCommand::Tag(WorkerTagCommand::Set(worker_target))) => {
+            let authenticated_client = client.with_auth_cookie()?;
+
+            if let Some(worker) =
+                find_worker(authenticated_client, worker_target.name.clone()).await?
+            {
+                authenticated_client
+                    .set_worker_tags(worker.id, worker_target.tags.clone())
+                    .await?;
+
+                writeln!(
+                    io::stdout(),
+                    "{}'s tags have been set to {}",
+                    worker_target.name.green(),
+                    worker_target.tags.join(", ").cyan()
+                )?;
+            } else {
+                writeln!(
+                    io::stderr(),
+                    "Worker {} not found",
+                    worker_target.name.green()
+                )?;
+            }
+        }
+        SubCommand::Worker(WorkerCommand::Tag(WorkerTagCommand::Add(tag_target))) => {
+            let authenticated_client = client.with_auth_cookie()?;
+
+            if let Some(worker) = find_worker(authenticated_client, tag_target.name.clone()).await?
+            {
+                authenticated_client
+                    .create_worker_tag(worker.id, tag_target.tag.clone())
+                    .await?;
+
+                writeln!(
+                    io::stdout(),
+                    "{} added to {}",
+                    tag_target.tag.cyan(),
+                    tag_target.name.green()
+                )?;
+            } else {
+                writeln!(io::stderr(), "Worker {} not found", tag_target.name.green())?;
+            }
+        }
+        SubCommand::Worker(WorkerCommand::Tag(WorkerTagCommand::Remove(tag_target))) => {
+            let authenticated_client = client.with_auth_cookie()?;
+
+            if let Some(worker) = find_worker(authenticated_client, tag_target.name.clone()).await?
+            {
+                authenticated_client
+                    .delete_worker_tag(worker.id, tag_target.tag.clone())
+                    .await?;
+
+                writeln!(
+                    io::stdout(),
+                    "{} removed from {}",
+                    tag_target.tag.cyan(),
+                    tag_target.name.green()
+                )?;
+            } else {
+                writeln!(io::stderr(), "Worker {} not found", tag_target.name.green())?;
+            }
+        }
+        SubCommand::Tag(TagCommand::List) => {
+            let tags = client.get_tags().await?;
+            writeln!(io::stdout(), "{}", tags.join("\n").cyan())?;
+        }
+        SubCommand::Tag(TagCommand::Create(tag_target)) => {
+            let authenticated_client = client.with_auth_cookie()?;
+
+            authenticated_client
+                .create_tag(CreateTagRequest {
+                    tag: tag_target.tag.clone(),
+                })
+                .await?;
+
+            writeln!(io::stdout(), "Tag {} created", tag_target.tag.cyan())?;
+        }
+        SubCommand::Tag(TagCommand::Delete(tag_target)) => {
+            let authenticated_client = client.with_auth_cookie()?;
+
+            authenticated_client
+                .delete_tag(tag_target.tag.clone())
+                .await?;
+
+            writeln!(io::stdout(), "Tag {} deleted", tag_target.tag.cyan())?;
+        }
+        SubCommand::Tag(TagCommand::Rule(TagRuleCommand::List(tag_target))) => {
+            writeln!(io::stdout(), "ID\tTag\tName pattern\tVersion pattern",)?;
+
+            let tags = if let Some(tag) = tag_target.tag {
+                vec![tag]
+            } else {
+                client.get_tags().await?
+            };
+
+            for tag in tags {
+                let tag_rules = client.get_tag_rules(tag.clone()).await?;
+                for tag_rule in tag_rules {
+                    writeln!(
+                        io::stdout(),
+                        "{}\t{}\t{}\t{}",
+                        tag_rule.id.to_string().yellow(),
+                        tag.cyan(),
+                        tag_rule.name_pattern.green(),
+                        tag_rule.version_pattern.unwrap_or("".to_string()).magenta()
+                    )?;
+                }
+            }
+        }
+        SubCommand::Tag(TagCommand::Rule(TagRuleCommand::Create(create_tag))) => {
+            let authenticated_client = client.with_auth_cookie()?;
+
+            let tag_rule = authenticated_client
+                .create_tag_rule(
+                    create_tag.tag,
+                    CreateTagRuleRequest {
+                        name_pattern: create_tag.name_pattern,
+                        version_pattern: create_tag.version_pattern,
+                    },
+                )
+                .await?;
+
+            writeln!(
+                io::stdout(),
+                "Rule created (ID {})",
+                tag_rule.id.to_string().yellow()
+            )?;
+        }
+        SubCommand::Tag(TagCommand::Rule(TagRuleCommand::Delete(tag_rule_target))) => {
+            let authenticated_client = client.with_auth_cookie()?;
+
+            authenticated_client
+                .delete_tag_rule(tag_rule_target.tag, tag_rule_target.rule_id)
+                .await?;
+
+            writeln!(io::stdout(), "Rule deleted")?;
+        }
         SubCommand::Completions(completions) => args::gen_completions(&completions)?,
     }