11use crate :: binding:: http:: { to_event, Headers } ;
22use crate :: Event ;
3+ use actix_web:: dev:: Payload ;
34use actix_web:: web:: BytesMut ;
45use actix_web:: { web, HttpRequest } ;
56use async_trait:: async_trait;
6- use futures:: future:: LocalBoxFuture ;
7- use futures:: { FutureExt , StreamExt } ;
7+ use futures:: { future:: LocalBoxFuture , FutureExt , StreamExt } ;
88use http:: header:: { AsHeaderName , HeaderName , HeaderValue } ;
99
1010/// Implement Headers for the actix HeaderMap
11- impl < ' a > Headers < ' a > for actix_web :: http :: HeaderMap {
11+ impl < ' a > Headers < ' a > for actix_http :: header :: HeaderMap {
1212 type Iterator = Box < dyn Iterator < Item = ( & ' a HeaderName , & ' a HeaderValue ) > + ' a > ;
1313 fn get < K : AsHeaderName > ( & self , key : K ) -> Option < & HeaderValue > {
1414 self . get ( key. as_str ( ) )
@@ -32,14 +32,18 @@ pub async fn request_to_event(
3232
3333/// So that an actix-web handler may take an Event parameter
3434impl actix_web:: FromRequest for Event {
35- type Config = ( ) ;
3635 type Error = actix_web:: Error ;
3736 type Future = LocalBoxFuture < ' static , std:: result:: Result < Self , Self :: Error > > ;
3837
39- fn from_request ( r : & HttpRequest , p : & mut actix_web:: dev:: Payload ) -> Self :: Future {
40- let payload = web:: Payload ( p. take ( ) ) ;
38+ fn from_request ( r : & HttpRequest , p : & mut Payload ) -> Self :: Future {
4139 let request = r. to_owned ( ) ;
42- async move { request_to_event ( & request, payload) . await } . boxed_local ( )
40+ bytes:: Bytes :: from_request ( & request, p)
41+ . map ( move |bytes| match bytes {
42+ Ok ( b) => to_event ( request. headers ( ) , b. to_vec ( ) )
43+ . map_err ( actix_web:: error:: ErrorBadRequest ) ,
44+ Err ( e) => Err ( e) ,
45+ } )
46+ . boxed_local ( )
4347 }
4448}
4549
@@ -74,46 +78,52 @@ mod private {
7478#[ cfg( test) ]
7579mod tests {
7680 use super :: * ;
77- use actix_web:: test;
81+ use actix_web:: { test, FromRequest } ;
7882
7983 use crate :: test:: fixtures;
8084 use serde_json:: json;
85+
86+ async fn to_event ( req : & HttpRequest , mut payload : Payload ) -> Event {
87+ web:: Payload :: from_request ( & req, & mut payload)
88+ . then ( |p| req. to_event ( p. unwrap ( ) ) )
89+ . await
90+ . unwrap ( )
91+ }
92+
8193 #[ actix_rt:: test]
8294 async fn test_request ( ) {
8395 let expected = fixtures:: v10:: minimal_string_extension ( ) ;
8496
8597 let ( req, payload) = test:: TestRequest :: post ( )
86- . header ( "ce-specversion" , "1.0" )
87- . header ( "ce-id" , "0001" )
88- . header ( "ce-type" , "test_event.test_application" )
89- . header ( "ce-source" , "http://localhost/" )
90- . header ( "ce-someint" , "10" )
98+ . insert_header ( ( "ce-specversion" , "1.0" ) )
99+ . insert_header ( ( "ce-id" , "0001" ) )
100+ . insert_header ( ( "ce-type" , "test_event.test_application" ) )
101+ . insert_header ( ( "ce-source" , "http://localhost/" ) )
102+ . insert_header ( ( "ce-someint" , "10" ) )
91103 . to_http_parts ( ) ;
92104
93- let resp = req. to_event ( web:: Payload ( payload) ) . await . unwrap ( ) ;
94- assert_eq ! ( expected, resp) ;
105+ assert_eq ! ( expected, to_event( & req, payload) . await ) ;
95106 }
96107
97108 #[ actix_rt:: test]
98109 async fn test_request_with_full_data ( ) {
99110 let expected = fixtures:: v10:: full_binary_json_data_string_extension ( ) ;
100111
101112 let ( req, payload) = test:: TestRequest :: post ( )
102- . header ( "ce-specversion" , "1.0" )
103- . header ( "ce-id" , "0001" )
104- . header ( "ce-type" , "test_event.test_application" )
105- . header ( "ce-subject" , "cloudevents-sdk" )
106- . header ( "ce-source" , "http://localhost/" )
107- . header ( "ce-time" , fixtures:: time ( ) . to_rfc3339 ( ) )
108- . header ( "ce-string_ex" , "val" )
109- . header ( "ce-int_ex" , "10" )
110- . header ( "ce-bool_ex" , "true" )
111- . header ( "content-type" , "application/json" )
113+ . insert_header ( ( "ce-specversion" , "1.0" ) )
114+ . insert_header ( ( "ce-id" , "0001" ) )
115+ . insert_header ( ( "ce-type" , "test_event.test_application" ) )
116+ . insert_header ( ( "ce-subject" , "cloudevents-sdk" ) )
117+ . insert_header ( ( "ce-source" , "http://localhost/" ) )
118+ . insert_header ( ( "ce-time" , fixtures:: time ( ) . to_rfc3339 ( ) ) )
119+ . insert_header ( ( "ce-string_ex" , "val" ) )
120+ . insert_header ( ( "ce-int_ex" , "10" ) )
121+ . insert_header ( ( "ce-bool_ex" , "true" ) )
122+ . insert_header ( ( "content-type" , "application/json" ) )
112123 . set_json ( & fixtures:: json_data ( ) )
113124 . to_http_parts ( ) ;
114125
115- let resp = req. to_event ( web:: Payload ( payload) ) . await . unwrap ( ) ;
116- assert_eq ! ( expected, resp) ;
126+ assert_eq ! ( expected, to_event( & req, payload) . await ) ;
117127 }
118128
119129 #[ actix_rt:: test]
@@ -136,11 +146,10 @@ mod tests {
136146 let expected = fixtures:: v10:: full_json_data_string_extension ( ) ;
137147
138148 let ( req, payload) = test:: TestRequest :: post ( )
139- . header ( "content-type" , "application/cloudevents+json" )
149+ . insert_header ( ( "content-type" , "application/cloudevents+json" ) )
140150 . set_payload ( bytes)
141151 . to_http_parts ( ) ;
142152
143- let resp = req. to_event ( web:: Payload ( payload) ) . await . unwrap ( ) ;
144- assert_eq ! ( expected, resp) ;
153+ assert_eq ! ( expected, to_event( & req, payload) . await ) ;
145154 }
146155}
0 commit comments