diff --git a/.github/workflows/rust_tests.yml b/.github/workflows/rust_tests.yml index d0ffe986..aa97c70f 100644 --- a/.github/workflows/rust_tests.yml +++ b/.github/workflows/rust_tests.yml @@ -60,14 +60,14 @@ jobs: with: command: build toolchain: ${{ matrix.toolchain }} - args: --target ${{ matrix.target }} --workspace + args: --target ${{ matrix.target }} --workspace --all-features - uses: actions-rs/cargo@v1 name: "Test" if: matrix.target == 'x86_64-unknown-linux-gnu' with: command: test toolchain: ${{ matrix.toolchain }} - args: --target ${{ matrix.target }} --workspace + args: --target ${{ matrix.target }} --workspace --all-features # If musl, compile and test all - uses: actions-rs/cargo@v1 @@ -76,7 +76,7 @@ jobs: with: command: build toolchain: ${{ matrix.toolchain }} - args: --target ${{ matrix.target }} --workspace + args: --target ${{ matrix.target }} --workspace --all-features env: CC: musl-gcc CXX: g++ @@ -86,19 +86,18 @@ jobs: with: command: test toolchain: ${{ matrix.toolchain }} - args: --target ${{ matrix.target }} --workspace + args: --target ${{ matrix.target }} --workspace --all-features env: CC: musl-gcc CXX: g++ - # If wasm, then we test only the main module and cloudevents-sdk-reqwest - uses: actions-rs/cargo@v1 name: "Build" if: matrix.target == 'wasm32-unknown-unknown' with: command: build toolchain: ${{ matrix.toolchain }} - args: --target wasm32-unknown-unknown --package cloudevents-sdk --package cloudevents-sdk-reqwest + args: --target wasm32-unknown-unknown --features cloudevents-reqwest # Build examples - uses: actions-rs/cargo@v1 diff --git a/Cargo.toml b/Cargo.toml index 3c0badb4..510e047a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,12 @@ categories = ["web-programming", "encoding", "data-structures"] [lib] name = "cloudevents" +[features] +cloudevents-actix = ["actix-web", "async-trait", "lazy_static", "bytes", "futures"] +cloudevents-reqwest = ["reqwest", "async-trait", "lazy_static", "bytes"] +cloudevents-rdkafka = ["rdkafka", "lazy_static", "bytes"] +cloudevents-warp = ["warp", "lazy_static", "bytes", "http", "hyper"] + [dependencies] serde = { version = "^1.0", features = ["derive"] } serde_json = "^1.0" @@ -26,6 +32,18 @@ url = { version = "^2.1", features = ["serde"] } snafu = "^0.6" bitflags = "^1.2" +# runtime optional deps +actix-web = { version = "^3", default-features = false, optional = true } +reqwest = { version = "^0.11", default-features = false, features = ["rustls-tls"], optional = true } +rdkafka = { version = "^0.25", features = ["cmake-build"], optional = true } +warp = { version = "^0.3", optional = true } +async-trait = { version = "^0.1.33", optional = true } +lazy_static = { version = "1.4.0", optional = true } +bytes = { version = "^1.0", optional = true } +futures = { version = "^0.3", optional = true } +http = { version = "0.2", optional = true } +hyper = { version = "^0.14", optional = true } + [target."cfg(not(target_arch = \"wasm32\"))".dependencies] hostname = "^0.3" uuid = { version = "^0.8", features = ["v4"] } @@ -40,13 +58,22 @@ claim = "0.3.1" version-sync = "^0.9" serde_yaml = "0.8" +# runtime dev-deps +actix-rt = { version = "^1" } +url = { version = "^2.1", features = ["serde"] } +serde_json = { version = "^1.0" } +chrono = { version = "^0.4", features = ["serde"] } +mockito = "0.25.1" +tokio = { version = "^1.0", features = ["full"] } +mime = "0.3" + [workspace] members = [ ".", "cloudevents-sdk-actix-web", "cloudevents-sdk-reqwest", "cloudevents-sdk-rdkafka", - "cloudevents-sdk-warp" + "cloudevents-sdk-warp", ] exclude = [ "example-projects/actix-web-example", diff --git a/cloudevents-sdk-actix-web/Cargo.toml b/cloudevents-sdk-actix-web/Cargo.toml index 6e72acd8..36344825 100644 --- a/cloudevents-sdk-actix-web/Cargo.toml +++ b/cloudevents-sdk-actix-web/Cargo.toml @@ -17,7 +17,7 @@ cloudevents-sdk = { version = "0.3.0", path = ".." } actix-web = { version = "^3", default-features = false } async-trait = "^0.1.33" lazy_static = "1.4.0" -bytes = "^0.5" +bytes = "^1.0" futures = "^0.3" [dev-dependencies] diff --git a/cloudevents-sdk-rdkafka/Cargo.toml b/cloudevents-sdk-rdkafka/Cargo.toml index 3c1d6221..5030765a 100644 --- a/cloudevents-sdk-rdkafka/Cargo.toml +++ b/cloudevents-sdk-rdkafka/Cargo.toml @@ -22,5 +22,5 @@ rdkafka = { version = "^0.25", features = ["cmake-build"] } url = { version = "^2.1" } serde_json = "^1.0" chrono = { version = "^0.4", features = ["serde"] } -futures = "0.3.5" +futures = "^0.3" version-sync = "^0.9" diff --git a/example-projects/actix-web-example/Cargo.toml b/example-projects/actix-web-example/Cargo.toml index 34668a55..f8ef30c8 100644 --- a/example-projects/actix-web-example/Cargo.toml +++ b/example-projects/actix-web-example/Cargo.toml @@ -5,8 +5,7 @@ authors = ["Francesco Guardiani "] edition = "2018" [dependencies] -cloudevents-sdk = { path = "../.." } -cloudevents-sdk-actix-web = { path = "../../cloudevents-sdk-actix-web" } +cloudevents-sdk = { path = "../..", features = ["cloudevents-actix"] } actix-web = "^3" actix-cors = "^0.5" lazy_static = "1.4.0" diff --git a/example-projects/actix-web-example/src/main.rs b/example-projects/actix-web-example/src/main.rs index 1c1f5517..8abc2913 100644 --- a/example-projects/actix-web-example/src/main.rs +++ b/example-projects/actix-web-example/src/main.rs @@ -1,6 +1,6 @@ use actix_web::{get, post, web, App, HttpRequest, HttpResponse, HttpServer}; +use cloudevents::actix::{HttpRequestExt, HttpResponseBuilderExt}; use cloudevents::{EventBuilder, EventBuilderV10}; -use cloudevents_sdk_actix_web::{HttpResponseBuilderExt, HttpRequestExt}; use serde_json::json; #[post("/")] diff --git a/example-projects/rdkafka-example/Cargo.toml b/example-projects/rdkafka-example/Cargo.toml index a999960c..30ccee4f 100644 --- a/example-projects/rdkafka-example/Cargo.toml +++ b/example-projects/rdkafka-example/Cargo.toml @@ -8,8 +8,7 @@ edition = "2018" [dependencies] async-trait = "^0.1.33" -cloudevents-sdk = { path = "../.." } -cloudevents-sdk-rdkafka = { path = "../../cloudevents-sdk-rdkafka" } +cloudevents-sdk = { path = "../..", features = ["cloudevents-rdkafka"] } lazy_static = "1.4.0" bytes = "^1.0" url = { version = "^2.1", features = ["serde"] } diff --git a/example-projects/rdkafka-example/src/main.rs b/example-projects/rdkafka-example/src/main.rs index ea250627..82781ab0 100644 --- a/example-projects/rdkafka-example/src/main.rs +++ b/example-projects/rdkafka-example/src/main.rs @@ -3,7 +3,7 @@ use futures::StreamExt; use serde_json::json; use cloudevents::{EventBuilder, EventBuilderV10}; -use cloudevents_sdk_rdkafka::{FutureRecordExt, MessageExt, MessageRecord}; +use cloudevents::rdkafka::{FutureRecordExt, MessageExt, MessageRecord}; use rdkafka::config::{ClientConfig, RDKafkaLogLevel}; use rdkafka::consumer::stream_consumer::StreamConsumer; diff --git a/example-projects/reqwest-wasm-example/Cargo.toml b/example-projects/reqwest-wasm-example/Cargo.toml index f3c635ad..85dd1bf1 100644 --- a/example-projects/reqwest-wasm-example/Cargo.toml +++ b/example-projects/reqwest-wasm-example/Cargo.toml @@ -11,8 +11,7 @@ crate-type = ["cdylib"] [dependencies] reqwest = "^0.11" -cloudevents-sdk = { path = "../.." } -cloudevents-sdk-reqwest = { path = "../../cloudevents-sdk-reqwest" } +cloudevents-sdk = { path = "../..", features = ["cloudevents-reqwest"] } url = { version = "^2.1" } web-sys = { version = "0.3.39", features = ["Window", "Location"] } wasm-bindgen-futures = "0.4.12" diff --git a/example-projects/reqwest-wasm-example/src/lib.rs b/example-projects/reqwest-wasm-example/src/lib.rs index 2efe5840..8d9e6fbd 100644 --- a/example-projects/reqwest-wasm-example/src/lib.rs +++ b/example-projects/reqwest-wasm-example/src/lib.rs @@ -1,5 +1,5 @@ +use cloudevents::reqwest::RequestBuilderExt; use cloudevents::{EventBuilder, EventBuilderV10}; -use cloudevents_sdk_reqwest::RequestBuilderExt; use wasm_bindgen::prelude::*; #[wasm_bindgen] diff --git a/example-projects/warp-example/Cargo.toml b/example-projects/warp-example/Cargo.toml index ab6f86c6..131ce926 100644 --- a/example-projects/warp-example/Cargo.toml +++ b/example-projects/warp-example/Cargo.toml @@ -7,8 +7,7 @@ categories = ["web-programming", "encoding"] license-file = "../LICENSE" [dependencies] -cloudevents-sdk = { path = "../.." } -cloudevents-sdk-warp = { path = "../../cloudevents-sdk-warp"} +cloudevents-sdk = { path = "../..", features = ["cloudevents-warp"] } warp = "^0.3" tokio = { version = "^1.0", features = ["full"] } diff --git a/example-projects/warp-example/src/main.rs b/example-projects/warp-example/src/main.rs index a5db9af8..7b7ec458 100644 --- a/example-projects/warp-example/src/main.rs +++ b/example-projects/warp-example/src/main.rs @@ -1,4 +1,4 @@ -use cloudevents_sdk_warp::{filter, reply}; +use cloudevents::warp::{filter, reply}; use warp::Filter; #[tokio::main] diff --git a/src/actix/headers.rs b/src/actix/headers.rs new file mode 100644 index 00000000..12d40677 --- /dev/null +++ b/src/actix/headers.rs @@ -0,0 +1,68 @@ +use crate::event::SpecVersion; +use actix_web::http::header; +use actix_web::http::{HeaderName, HeaderValue}; +use lazy_static::lazy_static; +use std::collections::HashMap; +use std::str::FromStr; + +macro_rules! unwrap_optional_header { + ($headers:expr, $name:expr) => { + $headers + .get::<&'static HeaderName>(&$name) + .map(|a| header_value_to_str!(a)) + }; +} + +macro_rules! header_value_to_str { + ($header_value:expr) => { + $header_value + .to_str() + .map_err(|e| crate::message::Error::Other { + source: Box::new(e), + }) + }; +} + +macro_rules! str_to_header_value { + ($header_value:expr) => { + HeaderValue::from_str($header_value).map_err(|e| crate::message::Error::Other { + source: Box::new(e), + }) + }; +} + +macro_rules! str_name_to_header { + ($attribute:expr) => { + HeaderName::from_str($attribute).map_err(|e| crate::message::Error::Other { + source: Box::new(e), + }) + }; +} + +macro_rules! attribute_name_to_header { + ($attribute:expr) => { + str_name_to_header!(&["ce-", $attribute].concat()) + }; +} + +fn attributes_to_headers( + it: impl Iterator, +) -> HashMap<&'static str, HeaderName> { + it.map(|s| { + if s == "datacontenttype" { + (s, header::CONTENT_TYPE) + } else { + (s, attribute_name_to_header!(s).unwrap()) + } + }) + .collect() +} + +lazy_static! { + pub(crate) static ref ATTRIBUTES_TO_HEADERS: HashMap<&'static str, HeaderName> = + attributes_to_headers(SpecVersion::all_attribute_names()); + pub(crate) static ref SPEC_VERSION_HEADER: HeaderName = + HeaderName::from_static("ce-specversion"); + pub(crate) static ref CLOUDEVENTS_JSON_HEADER: HeaderValue = + HeaderValue::from_static("application/cloudevents+json"); +} diff --git a/src/actix/mod.rs b/src/actix/mod.rs new file mode 100644 index 00000000..ea3f322b --- /dev/null +++ b/src/actix/mod.rs @@ -0,0 +1,54 @@ +//! This module integrates the [cloudevents-sdk](https://docs.rs/cloudevents-sdk) with [Actix web](https://docs.rs/actix-web/) to easily send and receive CloudEvents. +//! +//! To deserialize an HTTP request as CloudEvent: +//! +//! ``` +//! use cloudevents::actix::HttpRequestExt; +//! use actix_web::{HttpRequest, web, post}; +//! +//! #[post("/")] +//! async fn post_event(req: HttpRequest, payload: web::Payload) -> Result { +//! let event = req.to_event(payload).await?; +//! println!("Received Event: {:?}", event); +//! Ok(format!("{:?}", event)) +//! } +//! ``` +//! +//! To serialize a CloudEvent to an HTTP response: +//! +//! ``` +//! use cloudevents::actix::HttpResponseBuilderExt; +//! use actix_web::{HttpRequest, web, get, HttpResponse}; +//! use cloudevents::{EventBuilderV10, EventBuilder}; +//! use serde_json::json; +//! +//! #[get("/")] +//! async fn get_event() -> Result { +//! Ok(HttpResponse::Ok() +//! .event( +//! EventBuilderV10::new() +//! .id("0001") +//! .ty("example.test") +//! .source("http://localhost/") +//! .data("application/json", json!({"hello": "world"})) +//! .build() +//! .expect("No error while building the event"), +//! ) +//! .await? +//! ) +//! } +//! ``` + +#![deny(broken_intra_doc_links)] + +#[macro_use] +mod headers; +mod server_request; +mod server_response; + +pub use server_request::request_to_event; +pub use server_request::HttpRequestDeserializer; +pub use server_request::HttpRequestExt; +pub use server_response::event_to_response; +pub use server_response::HttpResponseBuilderExt; +pub use server_response::HttpResponseSerializer; diff --git a/src/actix/server_request.rs b/src/actix/server_request.rs new file mode 100644 index 00000000..8a0e3c2b --- /dev/null +++ b/src/actix/server_request.rs @@ -0,0 +1,210 @@ +use super::headers; +use crate::event::SpecVersion; +use crate::message::{ + BinaryDeserializer, BinarySerializer, Encoding, MessageAttributeValue, MessageDeserializer, + Result, StructuredDeserializer, StructuredSerializer, +}; +use crate::{message, Event}; +use actix_web::http::HeaderName; +use actix_web::web::{Bytes, BytesMut}; +use actix_web::{web, HttpMessage, HttpRequest}; +use async_trait::async_trait; +use futures::StreamExt; +use std::convert::TryFrom; + +/// Wrapper for [`HttpRequest`] that implements [`MessageDeserializer`] trait. +pub struct HttpRequestDeserializer<'a> { + req: &'a HttpRequest, + body: Bytes, +} + +impl HttpRequestDeserializer<'_> { + pub fn new(req: &HttpRequest, body: Bytes) -> HttpRequestDeserializer { + HttpRequestDeserializer { req, body } + } +} + +impl<'a> BinaryDeserializer for HttpRequestDeserializer<'a> { + fn deserialize_binary>(self, mut visitor: V) -> Result { + if self.encoding() != Encoding::BINARY { + return Err(message::Error::WrongEncoding {}); + } + + let spec_version = SpecVersion::try_from( + unwrap_optional_header!(self.req.headers(), headers::SPEC_VERSION_HEADER).unwrap()?, + )?; + + visitor = visitor.set_spec_version(spec_version.clone())?; + + let attributes = spec_version.attribute_names(); + + for (hn, hv) in + self.req.headers().iter().filter(|(hn, _)| { + headers::SPEC_VERSION_HEADER.ne(hn) && hn.as_str().starts_with("ce-") + }) + { + let name = &hn.as_str()["ce-".len()..]; + + if attributes.contains(&name) { + visitor = visitor.set_attribute( + name, + MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)), + )? + } else { + visitor = visitor.set_extension( + name, + MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)), + )? + } + } + + if let Some(hv) = self.req.headers().get("content-type") { + visitor = visitor.set_attribute( + "datacontenttype", + MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)), + )? + } + + if !self.body.is_empty() { + visitor.end_with_data(self.body.to_vec()) + } else { + visitor.end() + } + } +} + +impl<'a> StructuredDeserializer for HttpRequestDeserializer<'a> { + fn deserialize_structured>(self, visitor: V) -> Result { + if self.encoding() != Encoding::STRUCTURED { + return Err(message::Error::WrongEncoding {}); + } + visitor.set_structured_event(self.body.to_vec()) + } +} + +impl<'a> MessageDeserializer for HttpRequestDeserializer<'a> { + fn encoding(&self) -> Encoding { + if self.req.content_type() == "application/cloudevents+json" { + Encoding::STRUCTURED + } else if self + .req + .headers() + .get::<&'static HeaderName>(&super::headers::SPEC_VERSION_HEADER) + .is_some() + { + Encoding::BINARY + } else { + Encoding::UNKNOWN + } + } +} + +/// Method to transform an incoming [`HttpRequest`] to [`Event`]. +pub async fn request_to_event( + req: &HttpRequest, + mut payload: web::Payload, +) -> std::result::Result { + let mut bytes = BytesMut::new(); + while let Some(item) = payload.next().await { + bytes.extend_from_slice(&item?); + } + MessageDeserializer::into_event(HttpRequestDeserializer::new(req, bytes.freeze())) + .map_err(actix_web::error::ErrorBadRequest) +} + +/// Extention Trait for [`HttpRequest`] which acts as a wrapper for the function [`request_to_event()`]. +/// +/// This trait is sealed and cannot be implemented for types outside of this crate. +#[async_trait(?Send)] +pub trait HttpRequestExt: private::Sealed { + /// Convert this [`HttpRequest`] into an [`Event`]. + async fn to_event( + &self, + mut payload: web::Payload, + ) -> std::result::Result; +} + +#[async_trait(?Send)] +impl HttpRequestExt for HttpRequest { + async fn to_event( + &self, + payload: web::Payload, + ) -> std::result::Result { + request_to_event(self, payload).await + } +} + +mod private { + // Sealing the RequestExt + pub trait Sealed {} + impl Sealed for actix_web::HttpRequest {} +} + +#[cfg(test)] +mod tests { + use super::*; + use actix_web::test; + + use crate::{EventBuilder, EventBuilderV10}; + use chrono::Utc; + use serde_json::json; + + #[actix_rt::test] + async fn test_request() { + let time = Utc::now(); + let expected = EventBuilderV10::new() + .id("0001") + .ty("example.test") + .source("http://localhost/") + //TODO this is required now because the message deserializer implictly set default values + // As soon as this defaulting doesn't happen anymore, we can remove it (Issues #40/#41) + .time(time) + .extension("someint", "10") + .build() + .unwrap(); + + let (req, payload) = test::TestRequest::post() + .header("ce-specversion", "1.0") + .header("ce-id", "0001") + .header("ce-type", "example.test") + .header("ce-source", "http://localhost/") + .header("ce-someint", "10") + .header("ce-time", time.to_rfc3339()) + .to_http_parts(); + + let resp = req.to_event(web::Payload(payload)).await.unwrap(); + assert_eq!(expected, resp); + } + + #[actix_rt::test] + async fn test_request_with_full_data() { + let time = Utc::now(); + let j = json!({"hello": "world"}); + + let expected = EventBuilderV10::new() + .id("0001") + .ty("example.test") + .source("http://localhost") + //TODO this is required now because the message deserializer implictly set default values + // As soon as this defaulting doesn't happen anymore, we can remove it (Issues #40/#41) + .time(time) + .data("application/json", j.to_string().into_bytes()) + .extension("someint", "10") + .build() + .unwrap(); + + let (req, payload) = test::TestRequest::post() + .header("ce-specversion", "1.0") + .header("ce-id", "0001") + .header("ce-type", "example.test") + .header("ce-source", "http://localhost") + .header("ce-someint", "10") + .header("ce-time", time.to_rfc3339()) + .header("content-type", "application/json") + .set_json(&j) + .to_http_parts(); + + let resp = req.to_event(web::Payload(payload)).await.unwrap(); + assert_eq!(expected, resp); + } +} diff --git a/src/actix/server_response.rs b/src/actix/server_response.rs new file mode 100644 index 00000000..eec5f1ce --- /dev/null +++ b/src/actix/server_response.rs @@ -0,0 +1,214 @@ +use super::headers; +use crate::event::SpecVersion; +use crate::message::{ + BinaryDeserializer, BinarySerializer, MessageAttributeValue, Result, StructuredSerializer, +}; +use crate::Event; +use actix_web::dev::HttpResponseBuilder; +use actix_web::http::{HeaderName, HeaderValue}; +use actix_web::HttpResponse; +use async_trait::async_trait; +use std::str::FromStr; + +/// Wrapper for [`HttpResponseBuilder`] that implements [`StructuredSerializer`] and [`BinarySerializer`]. +pub struct HttpResponseSerializer { + builder: HttpResponseBuilder, +} + +impl HttpResponseSerializer { + pub fn new(builder: HttpResponseBuilder) -> HttpResponseSerializer { + HttpResponseSerializer { builder } + } +} + +impl BinarySerializer for HttpResponseSerializer { + fn set_spec_version(mut self, spec_version: SpecVersion) -> Result { + self.builder.set_header( + headers::SPEC_VERSION_HEADER.clone(), + str_to_header_value!(spec_version.as_str())?, + ); + Ok(self) + } + + fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> Result { + self.builder.set_header( + headers::ATTRIBUTES_TO_HEADERS.get(name).unwrap().clone(), + str_to_header_value!(value.to_string().as_str())?, + ); + Ok(self) + } + + fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> Result { + self.builder.set_header( + attribute_name_to_header!(name)?, + str_to_header_value!(value.to_string().as_str())?, + ); + Ok(self) + } + + fn end_with_data(mut self, bytes: Vec) -> Result { + Ok(self.builder.body(bytes)) + } + + fn end(mut self) -> Result { + Ok(self.builder.finish()) + } +} + +impl StructuredSerializer for HttpResponseSerializer { + fn set_structured_event(mut self, bytes: Vec) -> Result { + Ok(self + .builder + .set_header( + actix_web::http::header::CONTENT_TYPE, + headers::CLOUDEVENTS_JSON_HEADER.clone(), + ) + .body(bytes)) + } +} + +/// Method to fill an [`HttpResponseBuilder`] with an [`Event`]. +pub async fn event_to_response( + event: Event, + response: HttpResponseBuilder, +) -> std::result::Result { + BinaryDeserializer::deserialize_binary(event, HttpResponseSerializer::new(response)) + .map_err(actix_web::error::ErrorBadRequest) +} + +/// Extension Trait for [`HttpResponseBuilder`] which acts as a wrapper for the function [`event_to_response()`]. +/// +/// This trait is sealed and cannot be implemented for types outside of this crate. +#[async_trait(?Send)] +pub trait HttpResponseBuilderExt: private::Sealed { + /// Fill this [`HttpResponseBuilder`] with an [`Event`]. + async fn event( + self, + event: Event, + ) -> std::result::Result; +} + +#[async_trait(?Send)] +impl HttpResponseBuilderExt for HttpResponseBuilder { + async fn event( + self, + event: Event, + ) -> std::result::Result { + event_to_response(event, self).await + } +} + +// Sealing the HttpResponseBuilderExt +mod private { + pub trait Sealed {} + impl Sealed for actix_web::dev::HttpResponseBuilder {} +} + +#[cfg(test)] +mod tests { + use super::*; + + use crate::{EventBuilder, EventBuilderV10}; + use actix_web::http::StatusCode; + use actix_web::test; + use futures::TryStreamExt; + use serde_json::json; + + #[actix_rt::test] + async fn test_response() { + let input = EventBuilderV10::new() + .id("0001") + .ty("example.test") + .source("http://localhost/") + .extension("someint", "10") + .build() + .unwrap(); + + let resp = HttpResponseBuilder::new(StatusCode::OK) + .event(input) + .await + .unwrap(); + + assert_eq!( + resp.headers() + .get("ce-specversion") + .unwrap() + .to_str() + .unwrap(), + "1.0" + ); + assert_eq!( + resp.headers().get("ce-id").unwrap().to_str().unwrap(), + "0001" + ); + assert_eq!( + resp.headers().get("ce-type").unwrap().to_str().unwrap(), + "example.test" + ); + assert_eq!( + resp.headers().get("ce-source").unwrap().to_str().unwrap(), + "http://localhost/" + ); + assert_eq!( + resp.headers().get("ce-someint").unwrap().to_str().unwrap(), + "10" + ); + } + + #[actix_rt::test] + async fn test_response_with_full_data() { + let j = json!({"hello": "world"}); + + let input = EventBuilderV10::new() + .id("0001") + .ty("example.test") + .source("http://localhost") + .data("application/json", j.clone()) + .extension("someint", "10") + .build() + .unwrap(); + + let mut resp = HttpResponseBuilder::new(StatusCode::OK) + .event(input) + .await + .unwrap(); + + assert_eq!( + resp.headers() + .get("ce-specversion") + .unwrap() + .to_str() + .unwrap(), + "1.0" + ); + assert_eq!( + resp.headers().get("ce-id").unwrap().to_str().unwrap(), + "0001" + ); + assert_eq!( + resp.headers().get("ce-type").unwrap().to_str().unwrap(), + "example.test" + ); + assert_eq!( + resp.headers().get("ce-source").unwrap().to_str().unwrap(), + "http://localhost" + ); + assert_eq!( + resp.headers() + .get("content-type") + .unwrap() + .to_str() + .unwrap(), + "application/json" + ); + assert_eq!( + resp.headers().get("ce-someint").unwrap().to_str().unwrap(), + "10" + ); + + let bytes = test::load_stream(resp.take_body().into_stream()) + .await + .unwrap(); + assert_eq!(j.to_string().as_bytes(), bytes.as_ref()) + } +} diff --git a/src/lib.rs b/src/lib.rs index 89c6ca41..d78c7b91 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -29,17 +29,20 @@ //! * The [`EventBuilder`] trait and implementations, to create [`Event`] instances //! * The implementation of [`serde::Serialize`] and [`serde::Deserialize`] for [`Event`] to serialize/deserialize CloudEvents to/from JSON //! * Traits and utilities in [`message`] to implement Protocol Bindings -//! -//! If you're looking for Protocol Binding implementations, look at crates: -//! -//! * [cloudevents-sdk-actix-web](https://docs.rs/cloudevents-sdk-actix-web): Integration with [Actix Web](https://github.com/actix/actix-web) -//! * [cloudevents-sdk-reqwest](https://docs.rs/cloudevents-sdk-reqwest): Integration with [reqwest](https://github.com/seanmonstar/reqwest) -//! * [cloudevents-sdk-rdkafka](https://docs.rs/cloudevents-sdk-rdkafka): Integration with [rdkafka](https://fede1024.github.io/rust-rdkafka) +//! * Feature-guarded modules for various Protocol Binding implementations, e.g. actix, reqwest, warp, rdkafka //! -#![doc(html_root_url = "https://docs.rs/cloudevents-sdk/0.3.1")] #![deny(broken_intra_doc_links)] +#[cfg(feature = "cloudevents-actix")] +pub mod actix; +#[cfg(feature = "cloudevents-rdkafka")] +pub mod rdkafka; +#[cfg(feature = "cloudevents-reqwest")] +pub mod reqwest; +#[cfg(feature = "cloudevents-warp")] +pub mod warp; + pub mod event; pub mod message; diff --git a/src/rdkafka/headers.rs b/src/rdkafka/headers.rs new file mode 100644 index 00000000..6f662382 --- /dev/null +++ b/src/rdkafka/headers.rs @@ -0,0 +1,29 @@ +use crate::event::SpecVersion; +use lazy_static::lazy_static; +use std::collections::HashMap; + +macro_rules! attribute_name_to_header { + ($attribute:expr) => { + format!("ce_{}", $attribute) + }; +} + +fn attributes_to_headers(it: impl Iterator) -> HashMap<&'static str, String> { + it.map(|s| { + if s == "datacontenttype" { + (s, String::from("content-type")) + } else { + (s, attribute_name_to_header!(s)) + } + }) + .collect() +} + +lazy_static! { + pub(crate) static ref ATTRIBUTES_TO_HEADERS: HashMap<&'static str, String> = + attributes_to_headers(SpecVersion::all_attribute_names()); +} + +pub(crate) static SPEC_VERSION_HEADER: &str = "ce_specversion"; +pub(crate) static CLOUDEVENTS_JSON_HEADER: &str = "application/cloudevents+json"; +pub(crate) static CONTENT_TYPE: &str = "content-type"; diff --git a/src/rdkafka/kafka_consumer_record.rs b/src/rdkafka/kafka_consumer_record.rs new file mode 100644 index 00000000..6a399e20 --- /dev/null +++ b/src/rdkafka/kafka_consumer_record.rs @@ -0,0 +1,258 @@ +use super::headers; +use crate::event::SpecVersion; +use crate::message::{ + BinaryDeserializer, BinarySerializer, Encoding, MessageAttributeValue, MessageDeserializer, + Result, StructuredDeserializer, StructuredSerializer, +}; +use crate::{message, Event}; +use rdkafka::message::{BorrowedMessage, Headers, Message, OwnedMessage}; +use std::collections::HashMap; +use std::convert::TryFrom; +use std::str; + +/// Wrapper for [`Message`] that implements [`MessageDeserializer`] trait. +pub struct ConsumerRecordDeserializer { + pub(crate) headers: HashMap>, + pub(crate) payload: Option>, +} + +impl ConsumerRecordDeserializer { + fn get_kafka_headers(message: &impl Message) -> Result>> { + let mut hm = HashMap::new(); + let headers = message + .headers() + // TODO create an error variant for invalid headers + .ok_or(crate::message::Error::WrongEncoding {})?; + for i in 0..headers.count() { + let header = headers.get(i).unwrap(); + hm.insert(header.0.to_string(), Vec::from(header.1)); + } + Ok(hm) + } + + pub fn new(message: &impl Message) -> Result { + Ok(ConsumerRecordDeserializer { + headers: Self::get_kafka_headers(message)?, + payload: message.payload().map(Vec::from), + }) + } +} + +impl BinaryDeserializer for ConsumerRecordDeserializer { + fn deserialize_binary>(mut self, mut visitor: V) -> Result { + if self.encoding() != Encoding::BINARY { + return Err(message::Error::WrongEncoding {}); + } + + let spec_version = SpecVersion::try_from( + str::from_utf8(&self.headers.remove(headers::SPEC_VERSION_HEADER).unwrap()[..]) + .map_err(|e| crate::message::Error::Other { + source: Box::new(e), + })?, + )?; + + visitor = visitor.set_spec_version(spec_version.clone())?; + + let attributes = spec_version.attribute_names(); + + if let Some(hv) = self.headers.remove(headers::CONTENT_TYPE) { + visitor = visitor.set_attribute( + "datacontenttype", + MessageAttributeValue::String(String::from_utf8(hv).map_err(|e| { + crate::message::Error::Other { + source: Box::new(e), + } + })?), + )? + } + + for (hn, hv) in self + .headers + .into_iter() + .filter(|(hn, _)| headers::SPEC_VERSION_HEADER != *hn && hn.starts_with("ce_")) + { + let name = &hn["ce_".len()..]; + + if attributes.contains(&name) { + visitor = visitor.set_attribute( + name, + MessageAttributeValue::String(String::from_utf8(hv).map_err(|e| { + crate::message::Error::Other { + source: Box::new(e), + } + })?), + )? + } else { + visitor = visitor.set_extension( + name, + MessageAttributeValue::String(String::from_utf8(hv).map_err(|e| { + crate::message::Error::Other { + source: Box::new(e), + } + })?), + )? + } + } + + if self.payload != None { + visitor.end_with_data(self.payload.unwrap()) + } else { + visitor.end() + } + } +} + +impl StructuredDeserializer for ConsumerRecordDeserializer { + fn deserialize_structured>(self, visitor: V) -> Result { + if self.encoding() != Encoding::STRUCTURED { + return Err(message::Error::WrongEncoding {}); + } + visitor.set_structured_event(self.payload.unwrap()) + } +} + +impl MessageDeserializer for ConsumerRecordDeserializer { + fn encoding(&self) -> Encoding { + match ( + self.headers + .get("content-type") + .map(|s| String::from_utf8(s.to_vec()).ok()) + .flatten() + .map(|s| s.starts_with(headers::CLOUDEVENTS_JSON_HEADER)) + .unwrap_or(false), + self.headers.get(headers::SPEC_VERSION_HEADER), + ) { + (true, _) => Encoding::STRUCTURED, + (_, Some(_)) => Encoding::BINARY, + _ => Encoding::UNKNOWN, + } + } +} + +/// Method to transform a [`Message`] to [`Event`]. +pub fn record_to_event(msg: &impl Message) -> Result { + MessageDeserializer::into_event(ConsumerRecordDeserializer::new(msg)?) +} + +/// Extension Trait for [`Message`] which acts as a wrapper for the function [`record_to_event()`]. +/// +/// This trait is sealed and cannot be implemented for types outside of this crate. +pub trait MessageExt: private::Sealed { + /// Generates [`Event`] from [`BorrowedMessage`]. + fn to_event(&self) -> Result; +} + +impl MessageExt for BorrowedMessage<'_> { + fn to_event(&self) -> Result { + record_to_event(self) + } +} + +impl MessageExt for OwnedMessage { + fn to_event(&self) -> Result { + record_to_event(self) + } +} + +mod private { + // Sealing the MessageExt + pub trait Sealed {} + impl Sealed for rdkafka::message::OwnedMessage {} + impl Sealed for rdkafka::message::BorrowedMessage<'_> {} +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::rdkafka::kafka_producer_record::MessageRecord; + + use crate::{EventBuilder, EventBuilderV10}; + use chrono::Utc; + use serde_json::json; + + #[test] + fn test_binary_record() { + let time = Utc::now(); + + let expected = EventBuilderV10::new() + .id("0001") + .ty("example.test") + .time(time) + .source("http://localhost") + .extension("someint", "10") + .build() + .unwrap(); + + // Since there is neither a way provided by rust-rdkafka to convert FutureProducer back into + // OwnedMessage or BorrowedMessage, nor is there a way to create a BorrowedMessage struct, + // the test uses OwnedMessage instead, which consumes the message instead of borrowing it like + // in the case of BorrowedMessage + + let message_record = MessageRecord::from_event( + EventBuilderV10::new() + .id("0001") + .ty("example.test") + .time(time) + .source("http://localhost") + .extension("someint", "10") + .build() + .unwrap(), + ) + .unwrap(); + + let owned_message = OwnedMessage::new( + message_record.payload, + Some(String::from("test key").into_bytes()), + String::from("test topic"), + rdkafka::message::Timestamp::NotAvailable, + 10, + 10, + Some(message_record.headers), + ); + + assert_eq!(owned_message.to_event().unwrap(), expected) + } + + #[test] + fn test_structured_record() { + let j = json!({"hello": "world"}); + + let expected = EventBuilderV10::new() + .id("0001") + .ty("example.test") + .source("http://localhost") + .data("application/json", j.clone()) + .extension("someint", "10") + .build() + .unwrap(); + + // Since there is neither a way provided by rust-rdkafka to convert FutureProducer back into + // OwnedMessage or BorrowedMessage, nor is there a way to create a BorrowedMessage struct, + // the test uses OwnedMessage instead, which consumes the message instead of borrowing it like + // in the case of BorrowedMessage + + let input = EventBuilderV10::new() + .id("0001") + .ty("example.test") + .source("http://localhost") + .data("application/json", j.clone()) + .extension("someint", "10") + .build() + .unwrap(); + + let serialized_event = + StructuredDeserializer::deserialize_structured(input, MessageRecord::new()).unwrap(); + + let owned_message = OwnedMessage::new( + serialized_event.payload, + Some(String::from("test key").into_bytes()), + String::from("test topic"), + rdkafka::message::Timestamp::NotAvailable, + 10, + 10, + Some(serialized_event.headers), + ); + + assert_eq!(owned_message.to_event().unwrap(), expected) + } +} diff --git a/src/rdkafka/kafka_producer_record.rs b/src/rdkafka/kafka_producer_record.rs new file mode 100644 index 00000000..c3e89514 --- /dev/null +++ b/src/rdkafka/kafka_producer_record.rs @@ -0,0 +1,153 @@ +use super::headers; +use crate::event::SpecVersion; +use crate::message::{ + BinaryDeserializer, BinarySerializer, MessageAttributeValue, Result, StructuredSerializer, +}; +use crate::Event; +use rdkafka::message::{OwnedHeaders, ToBytes}; +use rdkafka::producer::{BaseRecord, FutureRecord}; + +/// This struct contains a serialized CloudEvent message in the Kafka shape. +/// Implements [`StructuredSerializer`] & [`BinarySerializer`] traits. +/// +/// To instantiate a new `MessageRecord` from an [`Event`], +/// look at [`Self::from_event`] or use [`StructuredDeserializer::deserialize_structured`](crate::message::StructuredDeserializer::deserialize_structured) +/// or [`BinaryDeserializer::deserialize_binary`]. +pub struct MessageRecord { + pub(crate) headers: OwnedHeaders, + pub(crate) payload: Option>, +} + +impl MessageRecord { + /// Create a new empty [`MessageRecord`] + pub fn new() -> Self { + MessageRecord { + headers: OwnedHeaders::new(), + payload: None, + } + } + + /// Create a new [`MessageRecord`], filled with `event` serialized in binary mode. + pub fn from_event(event: Event) -> Result { + BinaryDeserializer::deserialize_binary(event, MessageRecord::new()) + } +} + +impl Default for MessageRecord { + fn default() -> Self { + Self::new() + } +} + +impl BinarySerializer for MessageRecord { + fn set_spec_version(mut self, spec_version: SpecVersion) -> Result { + self.headers = self + .headers + .add(headers::SPEC_VERSION_HEADER, spec_version.as_str()); + + Ok(self) + } + + fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> Result { + self.headers = self.headers.add( + &headers::ATTRIBUTES_TO_HEADERS + .get(name) + .ok_or(crate::message::Error::UnknownAttribute { + name: String::from(name), + })? + .clone()[..], + &value.to_string()[..], + ); + + Ok(self) + } + + fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> Result { + self.headers = self + .headers + .add(&attribute_name_to_header!(name)[..], &value.to_string()[..]); + + Ok(self) + } + + fn end_with_data(mut self, bytes: Vec) -> Result { + self.payload = Some(bytes); + + Ok(self) + } + + fn end(self) -> Result { + Ok(self) + } +} + +impl StructuredSerializer for MessageRecord { + fn set_structured_event(mut self, bytes: Vec) -> Result { + self.headers = self + .headers + .add(headers::CONTENT_TYPE, headers::CLOUDEVENTS_JSON_HEADER); + + self.payload = Some(bytes); + + Ok(self) + } +} + +/// Extension Trait for [`BaseRecord`] that fills the record with a [`MessageRecord`]. +/// +/// This trait is sealed and cannot be implemented for types outside of this crate. +pub trait BaseRecordExt<'a, K: ToBytes + ?Sized>: private::Sealed { + /// Fill this [`BaseRecord`] with a [`MessageRecord`]. + fn message_record( + self, + message_record: &'a MessageRecord, + ) -> Result>>; +} + +impl<'a, K: ToBytes + ?Sized> BaseRecordExt<'a, K> for BaseRecord<'a, K, Vec> { + fn message_record( + mut self, + message_record: &'a MessageRecord, + ) -> Result>> { + self = self.headers(message_record.headers.clone()); + + if let Some(s) = message_record.payload.as_ref() { + self = self.payload(s); + } + + Ok(self) + } +} + +/// Extension Trait for [`FutureRecord`] that fills the record with a [`MessageRecord`]. +/// +/// This trait is sealed and cannot be implemented for types outside of this crate. +pub trait FutureRecordExt<'a, K: ToBytes + ?Sized>: private::Sealed { + /// Fill this [`FutureRecord`] with a [`MessageRecord`]. + fn message_record(self, message_record: &'a MessageRecord) -> FutureRecord<'a, K, Vec>; +} + +impl<'a, K: ToBytes + ?Sized> FutureRecordExt<'a, K> for FutureRecord<'a, K, Vec> { + fn message_record(mut self, message_record: &'a MessageRecord) -> FutureRecord<'a, K, Vec> { + self = self.headers(message_record.headers.clone()); + + if let Some(s) = message_record.payload.as_ref() { + self = self.payload(s); + } + + self + } +} + +mod private { + // Sealing the FutureRecordExt and BaseRecordExt + pub trait Sealed {} + impl Sealed + for rdkafka::producer::FutureRecord<'_, K, V> + { + } + impl Sealed + for rdkafka::producer::BaseRecord<'_, K, V> + { + } +} diff --git a/src/rdkafka/mod.rs b/src/rdkafka/mod.rs new file mode 100644 index 00000000..31c279e1 --- /dev/null +++ b/src/rdkafka/mod.rs @@ -0,0 +1,64 @@ +//! This library provides Kafka protocol bindings for CloudEvents +//! using the [rust-rdkafka](https://fede1024.github.io/rust-rdkafka) library. +//! +//! To produce Cloudevents: +//! +//! ``` +//! +//! use cloudevents::Event; +//! use rdkafka::producer::{FutureProducer, FutureRecord}; +//! use rdkafka::util::Timeout; +//! use cloudevents::rdkafka::{MessageRecord, FutureRecordExt}; +//! +//! # async fn produce(producer: &FutureProducer, event: Event) -> Result<(), Box> { +//! let message_record = MessageRecord::from_event(event)?; +//! +//! producer.send( +//! FutureRecord::to("topic") +//! .key("some_event") +//! .message_record(&message_record), +//! Timeout::Never +//! ).await; +//! # Ok(()) +//! # } +//! +//! ``` +//! +//! To consume Cloudevents: +//! +//! ``` +//! use rdkafka::consumer::{StreamConsumer, DefaultConsumerContext, Consumer, CommitMode}; +//! use cloudevents::rdkafka::MessageExt; +//! use futures::StreamExt; +//! +//! # async fn consume(consumer: StreamConsumer) -> Result<(), Box> { +//! let mut message_stream = consumer.start(); +//! +//! while let Some(message) = message_stream.next().await { +//! match message { +//! Err(e) => println!("Kafka error: {}", e), +//! Ok(m) => { +//! let event = m.to_event()?; +//! println!("Received Event: {}", event); +//! consumer.commit_message(&m, CommitMode::Async)?; +//! } +//! }; +//! } +//! # Ok(()) +//! # } +//! ``` + +#![deny(broken_intra_doc_links)] + +#[macro_use] +mod headers; +mod kafka_consumer_record; +mod kafka_producer_record; + +pub use kafka_consumer_record::record_to_event; +pub use kafka_consumer_record::ConsumerRecordDeserializer; +pub use kafka_consumer_record::MessageExt; + +pub use kafka_producer_record::BaseRecordExt; +pub use kafka_producer_record::FutureRecordExt; +pub use kafka_producer_record::MessageRecord; diff --git a/src/reqwest/client_request.rs b/src/reqwest/client_request.rs new file mode 100644 index 00000000..64b37037 --- /dev/null +++ b/src/reqwest/client_request.rs @@ -0,0 +1,199 @@ +use super::headers; +use crate::event::SpecVersion; +use crate::message::{ + BinaryDeserializer, BinarySerializer, MessageAttributeValue, Result, StructuredSerializer, +}; +use crate::Event; +use reqwest::RequestBuilder; +use std::str::FromStr; + +/// Wrapper for [`RequestBuilder`] that implements [`StructuredSerializer`] & [`BinarySerializer`] traits. +pub struct RequestSerializer { + req: RequestBuilder, +} + +impl RequestSerializer { + pub fn new(req: RequestBuilder) -> RequestSerializer { + RequestSerializer { req } + } +} + +impl BinarySerializer for RequestSerializer { + fn set_spec_version(mut self, spec_version: SpecVersion) -> Result { + self.req = self + .req + .header(headers::SPEC_VERSION_HEADER.clone(), spec_version.as_str()); + Ok(self) + } + + fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> Result { + self.req = self.req.header( + headers::ATTRIBUTES_TO_HEADERS.get(name).unwrap().clone(), + value.to_string(), + ); + Ok(self) + } + + fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> Result { + self.req = self + .req + .header(attribute_name_to_header!(name)?, value.to_string()); + Ok(self) + } + + fn end_with_data(self, bytes: Vec) -> Result { + Ok(self.req.body(bytes)) + } + + fn end(self) -> Result { + Ok(self.req) + } +} + +impl StructuredSerializer for RequestSerializer { + fn set_structured_event(self, bytes: Vec) -> Result { + Ok(self + .req + .header( + reqwest::header::CONTENT_TYPE, + headers::CLOUDEVENTS_JSON_HEADER.clone(), + ) + .body(bytes)) + } +} + +/// Method to fill a [`RequestBuilder`] with an [`Event`]. +pub fn event_to_request(event: Event, request_builder: RequestBuilder) -> Result { + BinaryDeserializer::deserialize_binary(event, RequestSerializer::new(request_builder)) +} + +/// Extension Trait for [`RequestBuilder`] which acts as a wrapper for the function [`event_to_request()`]. +/// +/// This trait is sealed and cannot be implemented for types outside of this crate. +pub trait RequestBuilderExt: private::Sealed { + /// Write in this [`RequestBuilder`] the provided [`Event`]. Similar to invoking [`Event`]. + fn event(self, event: Event) -> Result; +} + +impl RequestBuilderExt for RequestBuilder { + fn event(self, event: Event) -> Result { + event_to_request(event, self) + } +} + +// Sealing the RequestBuilderExt +mod private { + pub trait Sealed {} + impl Sealed for reqwest::RequestBuilder {} +} + +#[cfg(test)] +mod tests { + use super::*; + use mockito::{mock, Matcher}; + + use crate::message::StructuredDeserializer; + use crate::{EventBuilder, EventBuilderV10}; + use serde_json::json; + + #[tokio::test] + async fn test_request() { + let url = mockito::server_url(); + let m = mock("POST", "/") + .match_header("ce-specversion", "1.0") + .match_header("ce-id", "0001") + .match_header("ce-type", "example.test") + .match_header("ce-source", "http://localhost/") + .match_header("ce-someint", "10") + .match_body(Matcher::Missing) + .create(); + + let input = EventBuilderV10::new() + .id("0001") + .ty("example.test") + .source("http://localhost/") + .extension("someint", "10") + .build() + .unwrap(); + + let client = reqwest::Client::new(); + client + .post(&url) + .event(input) + .unwrap() + .send() + .await + .unwrap(); + + m.assert(); + } + + #[tokio::test] + async fn test_request_with_full_data() { + let j = json!({"hello": "world"}); + + let url = mockito::server_url(); + let m = mock("POST", "/") + .match_header("ce-specversion", "1.0") + .match_header("ce-id", "0001") + .match_header("ce-type", "example.test") + .match_header("ce-source", "http://localhost/") + .match_header("content-type", "application/json") + .match_header("ce-someint", "10") + .match_body(Matcher::Exact(j.to_string())) + .create(); + + let input = EventBuilderV10::new() + .id("0001") + .ty("example.test") + .source("http://localhost/") + .data("application/json", j.clone()) + .extension("someint", "10") + .build() + .unwrap(); + + let client = reqwest::Client::new(); + + client + .post(&url) + .event(input) + .unwrap() + .send() + .await + .unwrap(); + + m.assert(); + } + + #[tokio::test] + async fn test_structured_request_with_full_data() { + let j = json!({"hello": "world"}); + + let input = EventBuilderV10::new() + .id("0001") + .ty("example.test") + .source("http://localhost") + .data("application/json", j.clone()) + .extension("someint", "10") + .build() + .unwrap(); + + let url = mockito::server_url(); + let m = mock("POST", "/") + .match_header("content-type", "application/cloudevents+json") + .match_body(Matcher::Exact(serde_json::to_string(&input).unwrap())) + .create(); + + let client = reqwest::Client::new(); + StructuredDeserializer::deserialize_structured( + input, + RequestSerializer::new(client.post(&url)), + ) + .unwrap() + .send() + .await + .unwrap(); + + m.assert(); + } +} diff --git a/src/reqwest/client_response.rs b/src/reqwest/client_response.rs new file mode 100644 index 00000000..7aa2ab92 --- /dev/null +++ b/src/reqwest/client_response.rs @@ -0,0 +1,263 @@ +use super::headers; +use crate::event::SpecVersion; +use crate::message::{ + BinaryDeserializer, BinarySerializer, Encoding, Error, MessageAttributeValue, + MessageDeserializer, Result, StructuredDeserializer, StructuredSerializer, +}; +use crate::{message, Event}; +use async_trait::async_trait; +use bytes::Bytes; +use reqwest::header::{HeaderMap, HeaderName}; +use reqwest::Response; +use std::convert::TryFrom; + +/// Wrapper for [`Response`] that implements [`MessageDeserializer`] trait. +pub struct ResponseDeserializer { + headers: HeaderMap, + body: Bytes, +} + +impl ResponseDeserializer { + pub fn new(headers: HeaderMap, body: Bytes) -> ResponseDeserializer { + ResponseDeserializer { headers, body } + } +} + +impl BinaryDeserializer for ResponseDeserializer { + fn deserialize_binary>(self, mut visitor: V) -> Result { + if self.encoding() != Encoding::BINARY { + return Err(message::Error::WrongEncoding {}); + } + + let spec_version = SpecVersion::try_from( + unwrap_optional_header!(self.headers, headers::SPEC_VERSION_HEADER).unwrap()?, + )?; + + visitor = visitor.set_spec_version(spec_version.clone())?; + + let attributes = spec_version.attribute_names(); + + for (hn, hv) in self + .headers + .iter() + .filter(|(hn, _)| headers::SPEC_VERSION_HEADER.ne(hn) && hn.as_str().starts_with("ce-")) + { + let name = &hn.as_str()["ce-".len()..]; + + if attributes.contains(&name) { + visitor = visitor.set_attribute( + name, + MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)), + )? + } else { + visitor = visitor.set_extension( + name, + MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)), + )? + } + } + + if let Some(hv) = self.headers.get("content-type") { + visitor = visitor.set_attribute( + "datacontenttype", + MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)), + )? + } + + if !self.body.is_empty() { + visitor.end_with_data(self.body.to_vec()) + } else { + visitor.end() + } + } +} + +impl StructuredDeserializer for ResponseDeserializer { + fn deserialize_structured>(self, visitor: V) -> Result { + if self.encoding() != Encoding::STRUCTURED { + return Err(message::Error::WrongEncoding {}); + } + visitor.set_structured_event(self.body.to_vec()) + } +} + +impl MessageDeserializer for ResponseDeserializer { + fn encoding(&self) -> Encoding { + match ( + #[allow(clippy::borrow_interior_mutable_const)] + unwrap_optional_header!(self.headers, reqwest::header::CONTENT_TYPE) + .map(|r| r.ok()) + .flatten() + .map(|e| e.starts_with("application/cloudevents+json")), + self.headers + .get::<&'static HeaderName>(&headers::SPEC_VERSION_HEADER), + ) { + (Some(true), _) => Encoding::STRUCTURED, + (_, Some(_)) => Encoding::BINARY, + _ => Encoding::UNKNOWN, + } + } +} + +/// Method to transform an incoming [`Response`] to [`Event`]. +pub async fn response_to_event(res: Response) -> Result { + let h = res.headers().to_owned(); + let b = res.bytes().await.map_err(|e| Error::Other { + source: Box::new(e), + })?; + + MessageDeserializer::into_event(ResponseDeserializer::new(h, b)) +} + +/// Extension Trait for [`Response`] which acts as a wrapper for the function [`response_to_event()`]. +/// +/// This trait is sealed and cannot be implemented for types outside of this crate. +#[async_trait(?Send)] +pub trait ResponseExt: private::Sealed { + /// Convert this [`Response`] to [`Event`]. + async fn into_event(self) -> Result; +} + +#[async_trait(?Send)] +impl ResponseExt for Response { + async fn into_event(self) -> Result { + response_to_event(self).await + } +} + +// Sealing the ResponseExt +mod private { + pub trait Sealed {} + impl Sealed for reqwest::Response {} +} + +#[cfg(test)] +mod tests { + use super::*; + use mockito::mock; + + use crate::{EventBuilder, EventBuilderV10}; + use chrono::Utc; + use serde_json::json; + + #[tokio::test] + async fn test_response() { + let time = Utc::now(); + let url = mockito::server_url(); + let _m = mock("GET", "/") + .with_status(200) + .with_header("ce-specversion", "1.0") + .with_header("ce-id", "0001") + .with_header("ce-type", "example.test") + .with_header("ce-source", "http://localhost") + .with_header("ce-someint", "10") + .with_header("ce-time", &time.to_rfc3339()) + .create(); + + let expected = EventBuilderV10::new() + .id("0001") + .ty("example.test") + //TODO this is required now because the message deserializer implictly set default values + // As soon as this defaulting doesn't happen anymore, we can remove it (Issues #40/#41) + .time(time) + .source("http://localhost") + .extension("someint", "10") + .build() + .unwrap(); + + let client = reqwest::Client::new(); + let res = client + .get(&url) + .send() + .await + .unwrap() + .into_event() + .await + .unwrap(); + + assert_eq!(expected, res); + } + + #[tokio::test] + async fn test_response_with_full_data() { + let time = Utc::now(); + let j = json!({"hello": "world"}); + + let url = mockito::server_url(); + let _m = mock("GET", "/") + .with_status(200) + .with_header("ce-specversion", "1.0") + .with_header("ce-id", "0001") + .with_header("ce-type", "example.test") + .with_header("ce-source", "http://localhost/") + .with_header("content-type", "application/json") + .with_header("ce-someint", "10") + .with_header("ce-time", &time.to_rfc3339()) + .with_body(j.to_string()) + .create(); + + let expected = EventBuilderV10::new() + .id("0001") + .ty("example.test") + //TODO this is required now because the message deserializer implictly set default values + // As soon as this defaulting doesn't happen anymore, we can remove it (Issues #40/#41) + .time(time) + .source("http://localhost/") + .data("application/json", j.to_string().into_bytes()) + .extension("someint", "10") + .build() + .unwrap(); + + let client = reqwest::Client::new(); + let res = client + .get(&url) + .send() + .await + .unwrap() + .into_event() + .await + .unwrap(); + + assert_eq!(expected, res); + } + + #[tokio::test] + async fn test_structured_response_with_full_data() { + let time = Utc::now(); + + let j = json!({"hello": "world"}); + let expected = EventBuilderV10::new() + .id("0001") + .ty("example.test") + //TODO this is required now because the message deserializer implictly set default values + // As soon as this defaulting doesn't happen anymore, we can remove it (Issues #40/#41) + .time(time) + .source("http://localhost") + .data("application/json", j.clone()) + .extension("someint", "10") + .build() + .unwrap(); + + let url = mockito::server_url(); + let _m = mock("GET", "/") + .with_status(200) + .with_header( + "content-type", + "application/cloudevents+json; charset=utf-8", + ) + .with_body(serde_json::to_string(&expected).unwrap()) + .create(); + + let client = reqwest::Client::new(); + let res = client + .get(&url) + .send() + .await + .unwrap() + .into_event() + .await + .unwrap(); + + assert_eq!(expected, res); + } +} diff --git a/src/reqwest/headers.rs b/src/reqwest/headers.rs new file mode 100644 index 00000000..4cb98d6a --- /dev/null +++ b/src/reqwest/headers.rs @@ -0,0 +1,61 @@ +use crate::event::SpecVersion; +use lazy_static::lazy_static; +use reqwest::header::{HeaderName, HeaderValue}; +use std::collections::HashMap; +use std::str::FromStr; + +macro_rules! unwrap_optional_header { + ($headers:expr, $name:expr) => { + $headers + .get::<&'static reqwest::header::HeaderName>(&$name) + .map(|a| header_value_to_str!(a)) + }; +} + +macro_rules! header_value_to_str { + ($header_value:expr) => { + $header_value + .to_str() + .map_err(|e| crate::message::Error::Other { + source: Box::new(e), + }) + }; +} + +macro_rules! str_name_to_header { + ($attribute:expr) => { + reqwest::header::HeaderName::from_str($attribute).map_err(|e| { + crate::message::Error::Other { + source: Box::new(e), + } + }) + }; +} + +macro_rules! attribute_name_to_header { + ($attribute:expr) => { + str_name_to_header!(&["ce-", $attribute].concat()) + }; +} + +fn attributes_to_headers( + it: impl Iterator, +) -> HashMap<&'static str, HeaderName> { + it.map(|s| { + if s == "datacontenttype" { + (s, reqwest::header::CONTENT_TYPE) + } else { + (s, attribute_name_to_header!(s).unwrap()) + } + }) + .collect() +} + +lazy_static! { + pub(crate) static ref ATTRIBUTES_TO_HEADERS: HashMap<&'static str, HeaderName> = + attributes_to_headers(SpecVersion::all_attribute_names()); + pub(crate) static ref SPEC_VERSION_HEADER: HeaderName = + HeaderName::from_static("ce-specversion"); + pub(crate) static ref CLOUDEVENTS_JSON_HEADER: HeaderValue = + HeaderValue::from_static("application/cloudevents+json"); +} diff --git a/src/reqwest/mod.rs b/src/reqwest/mod.rs new file mode 100644 index 00000000..544218fe --- /dev/null +++ b/src/reqwest/mod.rs @@ -0,0 +1,42 @@ +//! This module integrates the [cloudevents-sdk](https://docs.rs/cloudevents-sdk) with [reqwest](https://docs.rs/reqwest/) to easily send and receive CloudEvents. +//! +//! ``` +//! use cloudevents::reqwest::{RequestBuilderExt, ResponseExt}; +//! use cloudevents::{EventBuilderV10, EventBuilder}; +//! use serde_json::json; +//! +//! # async fn example() -> Result<(), Box> { +//! let client = reqwest::Client::new(); +//! +//! // Prepare the event to send +//! let event_to_send = EventBuilderV10::new() +//! .id("0001") +//! .ty("example.test") +//! .source("http://localhost/") +//! .data("application/json", json!({"hello": "world"})) +//! .build()?; +//! +//! // Send request +//! let response = client.post("http://localhost") +//! .event(event_to_send)? +//! .send().await?; +//! // Parse response as event +//! let received_event = response +//! .into_event().await?; +//! # Ok(()) +//! # } +//! ``` + +#![deny(broken_intra_doc_links)] + +#[macro_use] +mod headers; +mod client_request; +mod client_response; + +pub use client_request::event_to_request; +pub use client_request::RequestBuilderExt; +pub use client_request::RequestSerializer; +pub use client_response::response_to_event; +pub use client_response::ResponseDeserializer; +pub use client_response::ResponseExt; diff --git a/src/warp/filter.rs b/src/warp/filter.rs new file mode 100644 index 00000000..3a9c51d1 --- /dev/null +++ b/src/warp/filter.rs @@ -0,0 +1,134 @@ +use super::server_request::request_to_event; + +use crate::Event; +use warp::http::HeaderMap; +use warp::Filter; +use warp::Rejection; + +#[derive(Debug)] +pub struct EventFilterError { + error: crate::message::Error, +} + +impl warp::reject::Reject for EventFilterError {} + +/// +/// # Extracts [`crate::Event`] from incoming request +/// +/// ``` +/// use cloudevents::warp::filter::to_event; +/// use warp::Filter; +/// use warp::Reply; +/// +/// let routes = warp::any() +/// .and(to_event()) +/// .map(|event| { +/// // do something with the event +/// } +/// ); +/// ``` +/// +pub fn to_event() -> impl Filter + Copy { + warp::header::headers_cloned() + .and(warp::body::bytes()) + .and_then(create_event) +} + +async fn create_event(headers: HeaderMap, body: bytes::Bytes) -> Result { + request_to_event(headers, body) + .map_err(|error| warp::reject::custom(EventFilterError { error })) +} + +#[cfg(test)] +mod tests { + use super::to_event; + use warp::test; + + use crate::{EventBuilder, EventBuilderV10}; + use chrono::Utc; + use serde_json::json; + + #[tokio::test] + async fn test_request() { + let time = Utc::now(); + let expected = EventBuilderV10::new() + .id("0001") + .ty("example.test") + .source("http://localhost/") + .time(time) + .extension("someint", "10") + .build() + .unwrap(); + + let result = test::request() + .method("POST") + .header("ce-specversion", "1.0") + .header("ce-id", "0001") + .header("ce-type", "example.test") + .header("ce-source", "http://localhost/") + .header("ce-someint", "10") + .header("ce-time", time.to_rfc3339()) + .filter(&to_event()) + .await + .unwrap(); + + assert_eq!(expected, result); + } + + #[tokio::test] + async fn test_bad_request() { + let time = Utc::now(); + + let result = test::request() + .method("POST") + .header("ce-specversion", "BAD SPECIFICATION") + .header("ce-id", "0001") + .header("ce-type", "example.test") + .header("ce-source", "http://localhost/") + .header("ce-someint", "10") + .header("ce-time", time.to_rfc3339()) + .filter(&to_event()) + .await; + + assert!(result.is_err()); + let rejection = result.unwrap_err(); + + let reason = rejection.find::().unwrap(); + assert_eq!( + reason.error.to_string(), + "Invalid specversion BAD SPECIFICATION" + ) + } + + #[tokio::test] + async fn test_request_with_full_data() { + let time = Utc::now(); + let j = json!({"hello": "world"}); + + let expected = EventBuilderV10::new() + .id("0001") + .ty("example.test") + .source("http://localhost") + .time(time) + .data("application/json", j.to_string().into_bytes()) + .extension("someint", "10") + .build() + .unwrap(); + + let result = test::request() + .method("POST") + .header("ce-specversion", "1.0") + .header("ce-id", "0001") + .header("ce-type", "example.test") + .header("ce-source", "http://localhost") + .header("ce-someint", "10") + .header("ce-time", time.to_rfc3339()) + .header("content-type", "application/json") + .json(&j) + .filter(&to_event()) + .await + .unwrap(); + + assert_eq!(expected, result); + } +} diff --git a/src/warp/headers.rs b/src/warp/headers.rs new file mode 100644 index 00000000..ea5c9981 --- /dev/null +++ b/src/warp/headers.rs @@ -0,0 +1,61 @@ +use crate::event::SpecVersion; +use http::header::HeaderName; +use lazy_static::lazy_static; +use warp::http::HeaderValue; + +use std::collections::HashMap; +use std::str::FromStr; + +macro_rules! unwrap_optional_header { + ($headers:expr, $name:expr) => { + $headers + .get::<&'static HeaderName>(&$name) + .map(|a| header_value_to_str!(a)) + }; +} + +macro_rules! header_value_to_str { + ($header_value:expr) => { + $header_value + .to_str() + .map_err(|e| crate::message::Error::Other { + source: Box::new(e), + }) + }; +} + +macro_rules! str_name_to_header { + ($attribute:expr) => { + HeaderName::from_str($attribute).map_err(|e| crate::message::Error::Other { + source: Box::new(e), + }) + }; +} + +macro_rules! attribute_name_to_header { + ($attribute:expr) => { + str_name_to_header!(&["ce-", $attribute].concat()) + }; +} + +fn attributes_to_headers( + it: impl Iterator, +) -> HashMap<&'static str, HeaderName> { + it.map(|s| { + if s == "datacontenttype" { + (s, http::header::CONTENT_TYPE) + } else { + (s, attribute_name_to_header!(s).unwrap()) + } + }) + .collect() +} + +lazy_static! { + pub(crate) static ref ATTRIBUTES_TO_HEADERS: HashMap<&'static str, HeaderName> = + attributes_to_headers(SpecVersion::all_attribute_names()); + pub(crate) static ref SPEC_VERSION_HEADER: HeaderName = + HeaderName::from_static("ce-specversion"); + pub(crate) static ref CLOUDEVENTS_JSON_HEADER: HeaderValue = + HeaderValue::from_static("application/cloudevents+json"); +} diff --git a/src/warp/mod.rs b/src/warp/mod.rs new file mode 100644 index 00000000..64867e5a --- /dev/null +++ b/src/warp/mod.rs @@ -0,0 +1,68 @@ +//! This module integrates the [cloudevents-sdk](https://docs.rs/cloudevents-sdk) with [Warp web service framework](https://docs.rs/warp/) +//! to easily send and receive CloudEvents. +//! +//! To deserialize an HTTP request as CloudEvent +//! +//! To echo events: +//! +//! ``` +//! use warp::{Filter, Reply}; +//! use cloudevents::warp::reply::from_event; +//! use cloudevents::warp::filter::to_event; +//! +//! let routes = warp::any() +//! // extracting event from request +//! .and(to_event()) +//! // returning event back +//! .map(|event| from_event(event)); +//! +//! warp::serve(routes).run(([127, 0, 0, 1], 3030)); +//! ``` +//! +//! To create event inside request handlers and send them as responses: +//! +//! ``` +//! use cloudevents::{Event, EventBuilder, EventBuilderV10}; +//! use http::StatusCode; +//! use serde_json::json; +//! use warp::{Filter, Reply}; +//! use cloudevents::warp::reply::from_event; +//! +//! let routes = warp::any().map(|| { +//! let event = EventBuilderV10::new() +//! .id("1") +//! .source("url://example_response/") +//! .ty("example.ce") +//! .data( +//! mime::APPLICATION_JSON.to_string(), +//! json!({ +//! "name": "John Doe", +//! "age": 43, +//! "phones": [ +//! "+44 1234567", +//! "+44 2345678" +//! ] +//! }), +//! ) +//! .build(); +//! +//! match event { +//! Ok(event) => Ok(from_event(event)), +//! Err(e) => Ok(warp::reply::with_status( +//! e.to_string(), +//! StatusCode::INTERNAL_SERVER_ERROR, +//! ) +//! .into_response()), +//! } +//! }); +//! warp::serve(routes).run(([127, 0, 0, 1], 3030)); +//! ``` + +#[macro_use] +mod headers; + +mod server_request; +mod server_response; + +pub mod filter; +pub mod reply; diff --git a/src/warp/reply.rs b/src/warp/reply.rs new file mode 100644 index 00000000..4f91abfb --- /dev/null +++ b/src/warp/reply.rs @@ -0,0 +1,126 @@ +use super::server_response::event_to_response; + +use crate::Event; +use http::StatusCode; +use warp::reply::Response; + +/// +/// # Serializes [`crate::Event`] as a http response +/// +/// ``` +/// use cloudevents::warp::reply::from_event; +/// use cloudevents::Event; +/// use warp::Filter; +/// use warp::Reply; +/// +/// let routes = warp::any() +/// .map(|| from_event(Event::default())); +/// ``` +pub fn from_event(event: Event) -> Response { + match event_to_response(event) { + Ok(response) => response, + Err(e) => warp::http::response::Response::builder() + .status(StatusCode::INTERNAL_SERVER_ERROR) + .body(hyper::body::Body::from(e.to_string())) + .unwrap(), + } +} + +#[cfg(test)] +mod tests { + + use crate::{EventBuilder, EventBuilderV10}; + use serde_json::json; + + #[test] + fn test_response() { + let input = EventBuilderV10::new() + .id("0001") + .ty("example.test") + .source("http://localhost/") + .extension("someint", "10") + .build() + .unwrap(); + + let resp = super::from_event(input); + + assert_eq!( + resp.headers() + .get("ce-specversion") + .unwrap() + .to_str() + .unwrap(), + "1.0" + ); + assert_eq!( + resp.headers().get("ce-id").unwrap().to_str().unwrap(), + "0001" + ); + assert_eq!( + resp.headers().get("ce-type").unwrap().to_str().unwrap(), + "example.test" + ); + assert_eq!( + resp.headers().get("ce-source").unwrap().to_str().unwrap(), + "http://localhost/" + ); + assert_eq!( + resp.headers().get("ce-someint").unwrap().to_str().unwrap(), + "10" + ); + } + + #[tokio::test] + async fn test_response_with_full_data() { + let j = json!({"hello": "world"}); + + let input = EventBuilderV10::new() + .id("0001") + .ty("example.test") + .source("http://localhost") + .data("application/json", j.clone()) + .extension("someint", "10") + .build() + .unwrap(); + + let resp = super::from_event(input); + + assert_eq!( + resp.headers() + .get("ce-specversion") + .unwrap() + .to_str() + .unwrap(), + "1.0" + ); + assert_eq!( + resp.headers().get("ce-id").unwrap().to_str().unwrap(), + "0001" + ); + assert_eq!( + resp.headers().get("ce-type").unwrap().to_str().unwrap(), + "example.test" + ); + assert_eq!( + resp.headers().get("ce-source").unwrap().to_str().unwrap(), + "http://localhost" + ); + assert_eq!( + resp.headers() + .get("content-type") + .unwrap() + .to_str() + .unwrap(), + "application/json" + ); + assert_eq!( + resp.headers().get("ce-someint").unwrap().to_str().unwrap(), + "10" + ); + + let (_, body) = resp.into_parts(); + let body = hyper::body::to_bytes(body).await.unwrap(); + + assert_eq!(j.to_string().as_bytes(), body); + } +} diff --git a/src/warp/server_request.rs b/src/warp/server_request.rs new file mode 100644 index 00000000..c1c7a471 --- /dev/null +++ b/src/warp/server_request.rs @@ -0,0 +1,107 @@ +use super::headers; +use bytes::Bytes; +use http::{header::HeaderName, HeaderMap}; + +use crate::event::SpecVersion; +use crate::message::{ + BinaryDeserializer, BinarySerializer, Encoding, Error, MessageAttributeValue, + MessageDeserializer, Result, StructuredDeserializer, StructuredSerializer, +}; + +use crate::{message, Event}; +use std::convert::TryFrom; + +pub struct RequestDeserializer { + headers: HeaderMap, + body: Bytes, +} + +impl RequestDeserializer { + pub fn new(headers: HeaderMap, body: Bytes) -> RequestDeserializer { + RequestDeserializer { headers, body } + } +} + +impl BinaryDeserializer for RequestDeserializer { + fn deserialize_binary>(self, mut visitor: V) -> Result { + if self.encoding() != Encoding::BINARY { + return Err(message::Error::WrongEncoding {}); + } + + let spec_version = SpecVersion::try_from( + unwrap_optional_header!(self.headers, headers::SPEC_VERSION_HEADER).unwrap()?, + )?; + + visitor = visitor.set_spec_version(spec_version.clone())?; + + let attributes = spec_version.attribute_names(); + + for (hn, hv) in self + .headers + .iter() + .filter(|(hn, _)| headers::SPEC_VERSION_HEADER.ne(hn) && hn.as_str().starts_with("ce-")) + { + let name = &hn.as_str()["ce-".len()..]; + + if attributes.contains(&name) { + visitor = visitor.set_attribute( + name, + MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)), + )? + } else { + visitor = visitor.set_extension( + name, + MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)), + )? + } + } + + if let Some(hv) = self.headers.get("content-type") { + visitor = visitor.set_attribute( + "datacontenttype", + MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)), + )? + } + + if self.body.len() != 0 { + visitor.end_with_data(self.body.to_vec()) + } else { + visitor.end() + } + } +} + +impl StructuredDeserializer for RequestDeserializer { + fn deserialize_structured>(self, visitor: V) -> Result { + if self.encoding() != Encoding::STRUCTURED { + return Err(message::Error::WrongEncoding {}); + } + visitor.set_structured_event(self.body.to_vec()) + } +} + +impl MessageDeserializer for RequestDeserializer { + fn encoding(&self) -> Encoding { + if self + .headers + .get("content-type") + .map(|v| v.to_str().unwrap_or("")) + .unwrap_or("") + == "application/cloudevents+json" + { + Encoding::STRUCTURED + } else if self + .headers + .get::<&'static HeaderName>(&super::headers::SPEC_VERSION_HEADER) + .is_some() + { + Encoding::BINARY + } else { + Encoding::UNKNOWN + } + } +} + +pub fn request_to_event(req: HeaderMap, bytes: bytes::Bytes) -> std::result::Result { + MessageDeserializer::into_event(RequestDeserializer::new(req, bytes)) +} diff --git a/src/warp/server_response.rs b/src/warp/server_response.rs new file mode 100644 index 00000000..edcfd7f4 --- /dev/null +++ b/src/warp/server_response.rs @@ -0,0 +1,102 @@ +use super::headers; + +use crate::event::SpecVersion; +use crate::message::{ + BinaryDeserializer, BinarySerializer, Error, MessageAttributeValue, Result, + StructuredSerializer, +}; +use crate::Event; + +use warp::http::HeaderValue; +use warp::hyper::Body; +use warp::reply::Response; + +use http::header::HeaderName; +use http::response::Builder; + +use std::{convert::TryFrom, str::FromStr}; + +pub struct ResponseSerializer { + builder: Builder, +} + +impl ResponseSerializer { + fn new() -> Self { + ResponseSerializer { + builder: http::Response::builder(), + } + } +} + +impl BinarySerializer for ResponseSerializer { + fn set_spec_version(mut self, spec_version: SpecVersion) -> Result { + self.builder = self.builder.header( + headers::SPEC_VERSION_HEADER.clone(), + HeaderValue::try_from(spec_version.to_string().as_str()).map_err(|e| { + crate::message::Error::Other { + source: Box::new(e), + } + })?, + ); + Ok(self) + } + + fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> Result { + self.builder = self.builder.header( + headers::ATTRIBUTES_TO_HEADERS.get(name).unwrap().clone(), + HeaderValue::try_from(value.to_string().as_str()).map_err(|e| { + crate::message::Error::Other { + source: Box::new(e), + } + })?, + ); + Ok(self) + } + + fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> Result { + self.builder = self.builder.header( + attribute_name_to_header!(name)?, + HeaderValue::try_from(value.to_string().as_str()).map_err(|e| { + crate::message::Error::Other { + source: Box::new(e), + } + })?, + ); + Ok(self) + } + + fn end_with_data(self, bytes: Vec) -> Result { + self.builder + .body(Body::from(bytes)) + .map_err(|e| crate::message::Error::Other { + source: Box::new(e), + }) + } + + fn end(self) -> Result { + self.builder + .body(Body::empty()) + .map_err(|e| crate::message::Error::Other { + source: Box::new(e), + }) + } +} + +impl StructuredSerializer for ResponseSerializer { + fn set_structured_event(self, bytes: Vec) -> Result { + Ok(self + .builder + .header( + http::header::CONTENT_TYPE, + headers::CLOUDEVENTS_JSON_HEADER.clone(), + ) + .body(Body::from(bytes)) + .map_err(|e| crate::message::Error::Other { + source: Box::new(e), + })?) + } +} + +pub fn event_to_response(event: Event) -> std::result::Result { + BinaryDeserializer::deserialize_binary(event, ResponseSerializer::new()) +}