S2PPKG7OUEVI6UPTGRAQEOZWSS7ZYDNO6NL4A5UH4VV2IFXL37EAC [package]name = "mer_derive"version = "0.1.0"authors = ["Paul Volavsek <paul.volavsek@gmail.com>"]edition = "2018"[features]default = ["std"]std = [][dependencies]syn = "1.0"quote = "1.0"proc-macro2 = "1.0"darling = "0.12"log = { version = "0.4", default-features = false }[dev-dependencies]mer = { path = "../mer" }serde = "1.0"[lib]proc-macro = true[[test]]name = "test"path = "test/tests.rs"
use darling::FromMeta;use proc_macro2::TokenStream;use quote::{format_ident, quote};#[derive(Debug, FromMeta)]pub struct AttrArgs {#[darling(default)]pub target: Option<syn::Path>,}pub fn expand_trait(args: &AttrArgs, input: &syn::ItemTrait) -> Result<TokenStream, Vec<syn::Error>> {let trait_name = &input.ident;let trait_generics = &input.generics;let service_name = args.target.as_ref().unwrap();let where_clause = &trait_generics.where_clause;let items = &input.items;let item_methods: Vec<syn::TraitItemMethod> = items.iter().filter_map(|i| match i {syn::TraitItem::Method(m) => {let mut method = m.clone();method.default.take();Some(method)}_ => None,}).collect();let mut impl_generic_def = trait_generics.clone();impl_generic_def.params.insert(0, syn::parse_quote! { __B: mer::interfaces::Backend<'__a> });impl_generic_def.params.insert(0, syn::parse_quote! { '__a });let mut impl_generics = trait_generics.clone();impl_generics.params.insert(0, syn::parse_quote! { __B });impl_generics.params.insert(0, syn::parse_quote! { '__a });let receiver_impl_items: Vec<TokenStream> = item_methods.iter().map(|i| {let item_name = format_ident!("{}", &i.sig.ident);let mut has_self: bool = false;let arguments: Vec<&Box<syn::Type>> = i.sig.inputs.iter().filter_map(|a| match a {syn::FnArg::Typed(t) => Some(&t.ty),syn::FnArg::Receiver(_) => {has_self = true;None}}).collect();let index = (0..arguments.len()).map(syn::Index::from);let deser = quote! {let deser_payload = __B::deserialize::<(#( #arguments ),*)>(call.payload)?;};let reply = match has_self {true => quote! {let reply = <Self as #trait_name #impl_generics>::#item_name(self, #( deser_payload.#index),*);},false => quote! {let reply = <Self as #trait_name #impl_generics>::#item_name(#( deser_payload.#index),*);},};let ser = quote! {let ser_reply = __B::serialize(&reply)?;Ok(mer::Reply { payload: ser_reply })};quote! 
{stringify!(#item_name) => {log::debug!("frontend procedure receiving: {}", stringify!(#item_name));#deser#reply#ser}}}).collect();let caller_impl_items: Vec<TokenStream> = item_methods.iter().map(|i| {let item_name = &i.sig.ident;let mut has_self = false;let arguments: Vec<&syn::Ident> = i.sig.inputs.iter().filter_map(|a| match a {syn::FnArg::Typed(t) => {if let syn::Pat::Ident(ident) = &*t.pat {Some(&ident.ident)} else {None}}syn::FnArg::Receiver(_) => {has_self = true;None}}).collect();let mut signature = i.sig.inputs.clone();if !has_self {signature.insert(0, syn::parse_quote! { &self })}let old_return_type: syn::Type = match &i.sig.output {syn::ReturnType::Default => syn::parse_quote! { () },syn::ReturnType::Type(_, t) => syn::parse_quote! { #t },};let return_type: syn::ReturnType = syn::parse_quote! { -> Result<#old_return_type, mer::frontends::derive::Error<__B::Error>> };quote! {pub fn #item_name(#signature) #return_type {log::debug!("frontend procedure calling: {}", stringify!(#item_name));let ser_payload = __B::serialize(&(#( #arguments ),*))?;let reply = self.__call.as_ref().unwrap()(&mer::Call {procedure: stringify!(#item_name).to_string(),payload: &ser_payload,})?.payload;let deser_reply = __B::deserialize::<#old_return_type>(&reply);Ok(deser_reply?)}}}).collect();Ok(quote! 
{trait #trait_name #impl_generic_def #where_clause {#( #item_methods )*}impl #impl_generic_def #trait_name #impl_generics for #service_name #impl_generics #where_clause {#( #items )*}impl #impl_generic_def #service_name #impl_generics #where_clause {fn __receive(&self, call: &mer::Call<&__B::Intermediate>) -> Result<mer::Reply<__B::Intermediate>, mer::frontends::derive::Error<__B::Error>> {match call.procedure.as_str() {#( #receiver_impl_items ),*_ => Err(mer::frontends::derive::Error::UnknownProcedure {}),}}}impl #impl_generic_def #service_name #impl_generics #where_clause {#( #caller_impl_items )*}})}pub fn expand_struct(input: &syn::ItemStruct) -> Result<TokenStream, Vec<syn::Error>> {let struct_name = &input.ident;let struct_name_init = format_ident!("{}Init", &input.ident);let struct_generics = &input.generics;let where_clause = &struct_generics.where_clause;let fields = match &input.fields {syn::Fields::Named(named) => {let fields: Vec<&syn::Field> = named.named.iter().collect();quote! { #( #fields ),* }}rest => quote! { #rest },};let field_names = match &input.fields {syn::Fields::Named(named) => named.named.iter().filter_map(|f| f.ident.clone()).map(|f| quote! { #f: self.#f }).collect(),_ => vec![],};let mut impl_generic_def = struct_generics.clone();impl_generic_def.params.insert(0, syn::parse_quote! { __B: mer::interfaces::Backend<'__a> });impl_generic_def.params.insert(0, syn::parse_quote! { '__a });let mut impl_generics = struct_generics.clone();impl_generics.params.insert(0, syn::parse_quote! { __B });impl_generics.params.insert(0, syn::parse_quote! { '__a });Ok(quote! 
{struct #struct_name #impl_generic_def #where_clause {#fields,__call: Option<Box<dyn Fn(&mer::Call<&__B::Intermediate>) -> Result<mer::Reply<__B::Intermediate>, __B::Error> + '__a + Send>>}#[derive(Default)]struct #struct_name_init #struct_generics #where_clause {#fields,}impl #struct_generics #struct_name_init #struct_generics #where_clause {pub fn init<'__a, __B: mer::interfaces::Backend<'__a>>(self) -> #struct_name #impl_generics {#struct_name {#( #field_names ),*,__call: None}}}impl #impl_generic_def mer::interfaces::Frontend<'__a, __B> for #struct_name #impl_generics #where_clause {type Intermediate = String;type Error = mer::frontends::derive::Error<__B::Error>;fn caller<__T>(&mut self, caller: __T) -> Result<(), mer::frontends::derive::Error<__B::Error>>where__T: Fn(&mer::Call<&__B::Intermediate>) -> Result<mer::Reply<__B::Intermediate>, __B::Error> + '__a + Send,__T: 'static,{self.__call = Some(Box::new(caller));Ok(())}fn receive(&self, call: &mer::Call<&__B::Intermediate>) -> Result<mer::Reply<__B::Intermediate>, mer::frontends::derive::Error<__B::Error>> {log::debug!("receiving: Call {{ prodecure: {:?}, payload: ... }}", &call.procedure);self.__receive(call).map_err(core::convert::Into::into)}}})}
use darling::FromMeta;use proc_macro::TokenStream;use quote::quote;use syn::{parse_macro_input, AttributeArgs, ItemStruct, ItemTrait};#[macro_use]mod frontend;fn to_compile_errors(errors: Vec<syn::Error>) -> proc_macro2::TokenStream {let compile_errors = errors.iter().map(syn::Error::to_compile_error);quote!(#(#compile_errors)*)}#[proc_macro_attribute]pub fn frontend(args: TokenStream, input: TokenStream) -> TokenStream {let args = parse_macro_input!(args as AttributeArgs);let args_parsed = match frontend::AttrArgs::from_list(&args) {Ok(v) => v,Err(e) => {return TokenStream::from(e.write_errors());}};if args_parsed.target.is_some() {let input = parse_macro_input!(input as ItemTrait);frontend::expand_trait(&args_parsed, &input).unwrap_or_else(to_compile_errors).into()} else {let input = parse_macro_input!(input as ItemStruct);frontend::expand_struct(&input).unwrap_or_else(to_compile_errors).into()}}
use mer_derive::*;

// Compile-only check for the `frontend` attribute: the body contains no assertions, it
// only defines a struct and a trait that both go through the macro.
#[test]
fn test() {
    // Without arguments the attribute is applied to the struct that carries the state.
    #[frontend]
    struct Data<T>
    where
        T: std::ops::Add<Output = T> + for<'de> serde::Deserialize<'de> + serde::Serialize + Copy + Send,
    {
        pub offset: T,
    }

    // With `target = "Data"` the attribute is applied to the trait describing the
    // procedures; one method takes no receiver, the other reads `self.offset`.
    #[frontend(target = "Data")]
    trait Service<T>
    where
        T: std::ops::Add<Output = T> + for<'de> serde::Deserialize<'de> + serde::Serialize + Copy + Send,
    {
        fn add(a: T, b: T) -> T::Output {
            a + b
        }
        fn add_with_offset(&self, a: T, b: T) -> T::Output {
            a + b + self.offset
        }
    }
}
use backends::InProcessChannel;
use mer::*;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};

// Procedure shared by the register-based tests.
fn add(a: i32, b: i32) -> i32 {
    a + b
}

// Round trip through the HTTP backend with the register frontend: the receiver listens
// on 127.0.0.1:8080 and the caller speaks to http://localhost:8080.
#[test]
fn register_http() {
    let register_caller = frontends::RegisterInit {}.init();
    let register_receiver = frontends::RegisterInit {}.init();

    register_caller.register("add", |(a, b)| add(a, b)).unwrap();
    register_receiver.register("add", |(a, b)| add(a, b)).unwrap();

    // Calling side: only `speak` is configured.
    let mer_caller = MerInit {
        backend: backends::HttpInit {
            speak: "http://localhost:8080".parse::<hyper::Uri>().unwrap().into(),
            listen: None,
            ..Default::default()
        }
        .init()
        .unwrap(),
        frontend: register_caller,
    }
    .init();

    // Receiving side: only `listen` is configured.
    let mut mer_receiver = MerInit {
        backend: backends::HttpInit {
            speak: None,
            listen: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080).into(),
            ..Default::default()
        }
        .init()
        .unwrap(),
        frontend: register_receiver,
    }
    .init();

    // The receiver is started before the call is issued.
    mer_receiver.start().unwrap();

    // Both operands are halved so that `a + b` cannot overflow an `i32`.
    let (a, b) = (rand::random::<i32>() / 2, rand::random::<i32>() / 2);

    let result: i32 = mer_caller.frontend(|f| f.call("add", &(a, b)).unwrap()).unwrap();
    assert_eq!(result, a + b);
}

// Same round trip as `register_http`, but over the in-process backend connected by an
// `mpsc` channel.
#[test]
fn register_in_process() {
    use std::sync::mpsc;
    use std::sync::mpsc::{Receiver, Sender};

    let register_caller = frontends::RegisterInit {}.init();
    let register_receiver = frontends::RegisterInit {}.init();

    register_caller.register("add", |(a, b)| add(a, b)).unwrap();
    register_receiver.register("add", |(a, b)| add(a, b)).unwrap();

    // `to` is handed to the caller, `from` to the receiver.
    let (to, from): (Sender<InProcessChannel>, Receiver<InProcessChannel>) = mpsc::channel();

    let mer_caller = MerInit {
        backend: backends::InProcessInit { to: to.into(), ..Default::default() }.init().unwrap(),
        frontend: register_caller,
    }
    .init();

    let mut mer_receiver = MerInit {
        backend: backends::InProcessInit {
            from: from.into(),
            ..Default::default()
        }
        .init()
        .unwrap(),
        frontend: register_receiver,
    }
    .init();

    mer_receiver.start().unwrap();

    // Both operands are halved so that `a + b` cannot overflow an `i32`.
    let (a, b) = (rand::random::<i32>() / 2, rand::random::<i32>() / 2);

    let result: i32 = mer_caller.frontend(|f| f.call("add", &(a, b)).unwrap()).unwrap();
    assert_eq!(result, a + b);
}

// Round trip through the HTTP backend with the derive frontend; uses port 8081 where
// `register_http` uses 8080.
#[test]
fn derive_http() {
    #[mer_derive::frontend()]
    struct Data<T>
    where
        T: std::ops::Add<Output = T> + for<'de> serde::Deserialize<'de> + serde::Serialize + Copy + Send,
    {
        pub offset: T,
    }

    #[mer_derive::frontend(target = "Data")]
    trait Receiver<T>
    where
        T: std::ops::Add<Output = T> + for<'de> serde::Deserialize<'de> + serde::Serialize + Copy + Send,
    {
        fn add(a: T, b: T) -> T::Output {
            a + b
        }
        // Declared for the macro; this test only calls `add`.
        fn add_with_offset(&self, a: T, b: T) -> T::Output {
            a + b + self.offset
        }
    }

    // `DataInit` is the init struct produced by the attribute on `Data`.
    let mer_call = MerInit {
        backend: backends::HttpInit {
            speak: "http://localhost:8081".parse::<hyper::Uri>().unwrap().into(),
            listen: None,
            ..Default::default()
        }
        .init()
        .unwrap(),
        frontend: DataInit::<i32> { offset: 32 }.init(),
    }
    .init();

    let mut mer_receive = MerInit {
        backend: backends::HttpInit {
            speak: None,
            listen: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8081).into(),
            ..Default::default()
        }
        .init()
        .unwrap(),
        frontend: DataInit::<i32> { offset: 32 }.init(),
    }
    .init();

    mer_receive.start().unwrap();

    // Both operands are halved so that `a + b` cannot overflow an `i32`.
    let (a, b) = (rand::random::<i32>() / 2, rand::random::<i32>() / 2);

    assert_eq!(
        mer_call
            .frontend(|f| {
                println!("start");
                let tmp = f.add(a, b).unwrap();
                tmp
            })
            .unwrap(),
        a + b
    );
}

// Round trip through the in-process backend with the derive frontend.
#[test]
fn derive_in_process() {
    use std::sync::mpsc;

    #[mer_derive::frontend()]
    struct Data<T>
    where
        T: std::ops::Add<Output = T> + for<'de> serde::Deserialize<'de> + serde::Serialize + Copy + Send,
    {
        pub offset: T,
    }

    #[mer_derive::frontend(target = "Data")]
    trait Receiver<T>
    where
        T: std::ops::Add<Output = T> + for<'de> serde::Deserialize<'de> + serde::Serialize + Copy + Send,
    {
        fn add(a: T, b: T) -> T::Output {
            a + b
        }
        // Declared for the macro; this test only calls `add`.
        fn add_with_offset(&self, a: T, b: T) -> T::Output {
            a + b + self.offset
        }
    }

    // The local trait is also called `Receiver`, hence the `mpsc::`-qualified channel types.
    let (to, from): (mpsc::Sender<InProcessChannel>, mpsc::Receiver<InProcessChannel>) = mpsc::channel();

    let mer_caller = MerInit {
        backend: backends::InProcessInit { to: to.into(), ..Default::default() }.init().unwrap(),
        frontend: DataInit::<i32> { offset: 32 }.init(),
    }
    .init();

    let mut mer_register = MerInit {
        backend: backends::InProcessInit {
            from: from.into(),
            ..Default::default()
        }
        .init()
        .unwrap(),
        frontend: DataInit::<i32> { offset: 32 }.init(),
    }
    .init();

    mer_register.start().unwrap();

    // Both operands are halved so that `a + b` cannot overflow an `i32`.
    let (a, b) = (rand::random::<i32>() / 2, rand::random::<i32>() / 2);

    assert_eq!(mer_caller.frontend(|f| { f.add(a, b).unwrap() }).unwrap(), a + b);
}
use crate::interfaces;use snafu::{OptionExt, ResultExt, Snafu};use hyper::{client::{connect::dns::GaiResolver, HttpConnector},Body, Client, Method, Request, StatusCode,};use hyper::http::Uri;use hyper::service::{make_service_fn, service_fn};use hyper::{Response, Server};use std::sync::{Arc, Mutex};use std::{fmt::Debug, net::SocketAddr};use tokio::runtime::Runtime;use tokio::sync;use log::{debug, trace};#[derive(Debug, Snafu)]pub enum Error {Serialize { from: serde_json::Error },Deserialize { from: serde_json::Error },SpeakingDisabled,RequestBuilder { source: hyper::http::Error },ParseResponseBodyBytes { source: hyper::Error },ParseResponseBody { source: std::string::FromUtf8Error },ClientRequest { source: hyper::Error },FailedRequest { status: StatusCode },NoListen,NoReceiver,BindServer { source: hyper::Error },NoProcedureHeader { source: hyper::http::Error },GetCallLockInReceiver,RuntimeCreation { source: std::io::Error },}pub struct Http {client: Client<HttpConnector<GaiResolver>, Body>,speak: Option<Uri>,listen: Option<SocketAddr>,#[allow(clippy::type_complexity)]receiver: Option<Arc<dyn Fn(Arc<Mutex<crate::Call<&String>>>) -> Arc<sync::Mutex<Result<crate::Reply<String>, Error>>> + Send + Sync>>,runtime: Runtime,}impl Debug for Http {fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {f.debug_struct("Http").field("client", &self.client).field("speak", &self.speak).field("listen", &self.listen).field("runtime", &self.runtime).finish()}}pub struct HttpInit {pub client: Client<HttpConnector<GaiResolver>, Body>,pub speak: Option<Uri>,pub listen: Option<SocketAddr>,}impl Default for HttpInit {fn default() -> Self {HttpInit {client: Client::new(),speak: None,listen: None,}}}impl From<HttpInit> for Result<Http, Error> {fn from(from: HttpInit) -> Self {from.init()}}impl HttpInit {pub fn init(self) -> Result<Http, Error> {trace!("HttpInit.init()");let http = Http {client: self.client,speak: self.speak,listen: self.listen,receiver: 
None,runtime: Runtime::new().context(RuntimeCreation {})?,};debug!("{:?}", &http);Ok(http)}}impl<'a> interfaces::Backend<'a> for Http {type Intermediate = String;type Error = Error;fn start(&mut self) -> Result<(), Self::Error> {trace!("Http.start()");let listen = self.listen.context(NoListen)?;let receiver = Arc::clone(self.receiver.as_ref().context(NoReceiver)?);self.runtime.spawn(async move {trace!("Http.runtime.spawn()");let receiver = receiver.clone();Server::bind(&listen).serve(make_service_fn(move |_| {trace!("Http.runtime.spawn.serve()");let receiver = receiver.clone();async move {Ok::<_, hyper::Error>(service_fn(move |request: Request<Body>| {trace!("Http.runtime.spawn.serve.service_fn()");debug!("{:?}", &listen);let receiver = receiver.clone();async move {let procedure = if let Some(procedure) = request.headers().get("Procedure") {match procedure.to_str() {Ok(procedure) => procedure.to_owned(),Err(e) => return Response::builder().status(StatusCode::BAD_REQUEST).body(Body::from(e.to_string())),}} else {return Response::builder().status(StatusCode::BAD_REQUEST).body(Body::from("No Procedure provided"));};let body_bytes = match hyper::body::to_bytes(request.into_body()).await {Ok(body_bytes) => body_bytes,Err(e) => return Response::builder().status(StatusCode::BAD_REQUEST).body(Body::from(e.to_string())),};let body = match String::from_utf8(body_bytes.to_vec()) {Ok(body) => body,Err(e) => return Response::builder().status(StatusCode::BAD_REQUEST).body(Body::from(e.to_string())),};debug!("call Call {{ procedure: {:?}, payload: {:?} }}", &procedure, &body);let reply_mutex = receiver(Arc::new(Mutex::new(crate::Call { procedure, payload: &body })));let reply = &*reply_mutex.lock().await;match reply {Err(e) => Response::builder().status(StatusCode::BAD_REQUEST).body(Body::from(format!("{:?}", e))),Ok(reply) => {debug!("reply Reply {{ payload: {:?} }}", 
&reply.payload);Response::builder().status(StatusCode::OK).body(Body::from(reply.payload.to_owned()))}}}}))}})).await.unwrap();});Ok(())}fn stop(&mut self) -> Result<(), Self::Error> {trace!("Http.stop()");Ok(())}fn receiver<T>(&mut self, receiver: T) -> Result<(), Self::Error>whereT: Fn(&crate::Call<&Self::Intermediate>) -> Result<crate::Reply<Self::Intermediate>, Self::Error> + Send + Sync + 'static,{trace!("Http.receiver()");self.receiver = Some(Arc::new(move |call: Arc<Mutex<crate::Call<&String>>>| {trace!("(Http.receiver)()");let call = match call.as_ref().lock() {Ok(c) => c,Err(_) => return Arc::new(sync::Mutex::new(Err(Error::GetCallLockInReceiver))),};debug!("calling receiver");match receiver(&*call) {Ok(reply) => Arc::new(sync::Mutex::new(Ok(crate::Reply { payload: reply.payload }))),Err(e) => Arc::new(sync::Mutex::new(Err(e))),}}));Ok(())}fn call(&mut self, call: &crate::Call<&Self::Intermediate>) -> Result<crate::Reply<Self::Intermediate>, Self::Error> {trace!("Http.call()");debug!("{:?}", &self.speak);match &self.speak {None => Err(Error::SpeakingDisabled),Some(uri) => self.runtime.block_on(async {let request = Request::builder().method(Method::POST).uri(uri).header("Procedure", &call.procedure).body(Body::from(call.payload.clone())).context(RequestBuilder)?;debug!("request {:?}", &request);let response = self.client.request(request).await.context(ClientRequest)?;debug!("response {:?}", &response);let status = response.status();let body_bytes = hyper::body::to_bytes(response.into_body()).await.context(ParseResponseBodyBytes)?;let body = String::from_utf8(body_bytes.to_vec()).context(ParseResponseBody)?;match status {StatusCode::OK => Ok(crate::Reply { payload: body }),_ => Err(Error::FailedRequest { status }),}}),}}fn serialize<T: serde::Serialize>(from: &T) -> Result<String, Self::Error> {trace!("Http.serialize()");serde_json::to_string(from).map_err(|e| Error::Serialize { from: e })}fn deserialize<'b, T>(from: &'b Self::Intermediate) -> Result<T, 
Self::Error>whereT: for<'de> serde::Deserialize<'de>,{trace!("Http.deserialize()");serde_json::from_str(&from).map_err(|e| Error::Deserialize { from: e })}}
use crate::interfaces;use snafu::{OptionExt, ResultExt, Snafu};use std::sync::mpsc;use std::sync::mpsc::{Receiver, Sender};use std::sync::{Arc, Mutex};use tokio::runtime::Runtime;pub type InProcessChannel = (crate::Call<String>, Sender<Result<crate::Reply<String>, Error>>);#[derive(Debug, Snafu)]pub enum Error {Serialize { from: serde_json::Error },Deserialize { from: serde_json::Error },GetCallLockInReceiver {},NoReceiver {},NoCallerChannel {},NoReceiverChannel {},CallerRecv { source: std::sync::mpsc::RecvError },CallerSend { source: std::sync::mpsc::SendError<InProcessChannel> },Reply { from: String },RuntimeCreation { source: std::io::Error },}pub struct InProcess {#[allow(clippy::type_complexity)]receiver: Option<Arc<dyn Fn(Arc<Mutex<crate::Call<&String>>>) -> Arc<tokio::sync::Mutex<Result<crate::Reply<String>, Error>>> + Send + Sync>>,runtime: Runtime,to: Option<Sender<InProcessChannel>>,from: Option<Arc<Mutex<Receiver<InProcessChannel>>>>,}pub struct InProcessInit {pub to: Option<Sender<InProcessChannel>>,pub from: Option<Receiver<InProcessChannel>>,}impl Default for InProcessInit {fn default() -> Self {InProcessInit { to: None, from: None }}}impl InProcessInit {pub fn init(self) -> Result<InProcess, Error> {Ok(InProcess {receiver: None,to: self.to,from: if let Some(from) = self.from { Some(Arc::new(Mutex::new(from))) } else { None },runtime: Runtime::new().context(RuntimeCreation)?,})}}impl<'a> interfaces::Backend<'a> for InProcess {type Intermediate = String;type Error = Error;fn start(&mut self) -> Result<(), Self::Error> {let from = self.from.as_ref().context(NoReceiverChannel {})?.clone();let receiver = self.receiver.as_ref().context(NoReceiver {})?.clone();self.runtime.spawn(async move {loop {let (call, tx) = from.lock().unwrap().recv().unwrap();let reply_mutex = receiver(Arc::new(Mutex::new(crate::Call {procedure: call.procedure,payload: &call.payload,})));let reply = &*reply_mutex.lock().await;tx.send(match reply {Ok(r) => Ok(crate::Reply { payload: 
r.payload.clone() }),Err(e) => Err(Error::Reply { from: format!("{:?}", e) }),}).unwrap();}});Ok(())}fn stop(&mut self) -> Result<(), Self::Error> {Ok(())}fn receiver<T>(&mut self, receiver: T) -> Result<(), Self::Error>whereT: Fn(&crate::Call<&Self::Intermediate>) -> Result<crate::Reply<Self::Intermediate>, Self::Error> + Send + Sync + 'static,{self.receiver = Some(Arc::new(move |call: Arc<Mutex<crate::Call<&String>>>| {let call = match call.as_ref().lock() {Ok(c) => c,Err(_) => return Arc::new(tokio::sync::Mutex::new(Err(Error::GetCallLockInReceiver {}))),};match receiver(&*call) {Ok(reply) => Arc::new(tokio::sync::Mutex::new(Ok(crate::Reply { payload: reply.payload }))),Err(e) => Arc::new(tokio::sync::Mutex::new(Err(e))),}}));Ok(())}fn call(&mut self, call: &crate::Call<&Self::Intermediate>) -> Result<crate::Reply<Self::Intermediate>, Self::Error> {#[allow(clippy::type_complexity)]let (tx, rx): (Sender<Result<crate::Reply<String>, Error>>, Receiver<Result<crate::Reply<String>, Error>>) = mpsc::channel();self.to.as_ref().context(NoCallerChannel {})?.send((crate::Call {procedure: call.procedure.clone(),payload: call.payload.clone(),},tx,)).context(CallerSend {})?;rx.recv().context(CallerRecv {})?}fn serialize<T: serde::Serialize>(from: &T) -> Result<String, Self::Error> {serde_json::to_string(from).map_err(|e| Error::Serialize { from: e })}fn deserialize<'b, T>(from: &'b Self::Intermediate) -> Result<T, Self::Error>whereT: for<'de> serde::Deserialize<'de>,{serde_json::from_str(&from).map_err(|e| Error::Deserialize { from: e })}}
use snafu::Snafu;#[derive(Debug, Snafu)]pub enum Error<B: core::fmt::Display> {FromBackend { from: B },UnknownProcedure,MutexLock,}impl<B: snafu::Error> From<B> for Error<B> {fn from(from: B) -> Self {Error::FromBackend { from }}}
use interfaces::Backend;use crate::interfaces;use std::collections::HashMap;use std::sync::{Arc, Mutex};use snafu::Snafu;#[derive(Debug, Snafu)]pub enum Error<B: core::fmt::Display> {FromBackend { from: B },ProcedureNotRegistered {},}impl<B: snafu::Error> From<B> for Error<B> {fn from(from: B) -> Self {Error::FromBackend { from }}}pub struct Register<'a, B: Backend<'a>> {#[allow(clippy::type_complexity)]procedures: Arc<Mutex<HashMap<String, Box<dyn Fn(&crate::Call<&B::Intermediate>) -> Result<crate::Reply<B::Intermediate>, Error<B::Error>> + 'a>>>>,#[allow(clippy::type_complexity)]call: Option<Box<dyn Fn(&crate::Call<&B::Intermediate>) -> Result<crate::Reply<B::Intermediate>, B::Error> + 'a + Send>>,}unsafe impl<'a, T: Backend<'a>> Send for Register<'a, T> {}pub struct RegisterInit {}impl Default for RegisterInit {fn default() -> Self {RegisterInit {}}}impl RegisterInit {pub fn init<'a, B: Backend<'a>>(self) -> Register<'a, B> {Register {procedures: Arc::new(Mutex::new(HashMap::new())),call: None,}}}impl<'a, B: Backend<'a>> Register<'a, B> {pub fn register<P, C: for<'de> serde::Deserialize<'de>, R: serde::Serialize>(&self, name: &str, procedure: P) -> Result<(), Error<B::Error>>whereP: Fn(C) -> R + 'a,{self.procedures.lock().unwrap().insert(name.to_string(),Box::new(move |call: &crate::Call<&B::Intermediate>| {let reply = procedure(B::deserialize::<C>(call.payload)?);Ok(crate::Reply { payload: B::serialize::<R>(&reply)? 
})}),);Ok(())}pub fn call<C: serde::Serialize, R: for<'de> serde::Deserialize<'de>>(&self, procedure: &str, payload: &C) -> Result<R, Error<B::Error>> {Ok(B::deserialize(&self.call.as_ref().unwrap()(&crate::Call {procedure: procedure.to_string(),payload: &B::serialize(&payload)?,})?.payload,)?)}}impl<'a, B> interfaces::Frontend<'a, B> for Register<'a, B>whereB: interfaces::Backend<'a>,{type Intermediate = String;type Error = Error<B::Error>;fn caller<T>(&mut self, caller: T) -> Result<(), Self::Error>whereT: Fn(&crate::Call<&B::Intermediate>) -> Result<crate::Reply<B::Intermediate>, B::Error> + 'a + Send,{self.call = Some(Box::new(caller));Ok(())}fn receive(&self, call: &crate::Call<&B::Intermediate>) -> Result<crate::Reply<B::Intermediate>, Error<B::Error>> {self.procedures.lock().unwrap().get(&call.procedure).unwrap()(call)}}
# serde-transcode = { version = "1.2.0", git = "https://github.com/volllly/serde-transcode", default-features = false, features = ["alloc"] }serde_json = { version = "1.0.57", optional = true }# erased-serde = { version = "0.3.12", default-features = false, features = ["alloc"] }hyper = { version = "0.14", optional = true }tokio = { version = "1.0", optional = true }
use mer::*;

// Procedure registered on both sides of the test.
fn add(a: i32, b: i32) -> i32 {
    a + b
}

// Round trip of the register frontend over the in-process backend, with both taken from
// their own crates (`mer_frontend_register`, `mer_backend_in_process`).
#[test]
fn register_in_process() {
    use std::sync::mpsc;
    use std::sync::mpsc::{Receiver, Sender};

    let register_caller = mer_frontend_register::RegisterInit {}.init();
    let register_receiver = mer_frontend_register::RegisterInit {}.init();

    register_caller.register("add", |(a, b)| add(a, b)).unwrap();
    register_receiver.register("add", |(a, b)| add(a, b)).unwrap();

    // `to` is handed to the caller, `from` to the receiver.
    let (to, from): (Sender<mer_backend_in_process::InProcessChannel>, Receiver<mer_backend_in_process::InProcessChannel>) = mpsc::channel();

    let mer_caller = MerInit {
        backend: mer_backend_in_process::InProcessInit { to: to.into(), ..Default::default() }.init().unwrap(),
        frontend: register_caller,
    }
    .init();

    let mut mer_receiver = MerInit {
        backend: mer_backend_in_process::InProcessInit {
            from: from.into(),
            ..Default::default()
        }
        .init()
        .unwrap(),
        frontend: register_receiver,
    }
    .init();

    // The receiver is started before the call is issued.
    mer_receiver.start().unwrap();

    // Both operands are halved so that `a + b` cannot overflow an `i32`.
    let (a, b) = (rand::random::<i32>() / 2, rand::random::<i32>() / 2);

    let result: i32 = mer_caller.frontend(|f| f.call("add", &(a, b)).unwrap()).unwrap();
    assert_eq!(result, a + b);
}
use mer::{interfaces::{Backend, Frontend},Call, Reply,};use std::collections::HashMap;use std::sync::{Arc, Mutex};use snafu::Snafu;#[derive(Debug, Snafu)]pub enum Error<B: core::fmt::Display> {FromBackend { from: B },ProcedureNotRegistered {},}impl<B: snafu::Error> From<B> for Error<B> {fn from(from: B) -> Self {Error::FromBackend { from }}}pub struct Register<'a, B: Backend<'a>> {#[allow(clippy::type_complexity)]procedures: Arc<Mutex<HashMap<String, Box<dyn Fn(&Call<&B::Intermediate>) -> Result<Reply<B::Intermediate>, Error<B::Error>> + 'a>>>>,#[allow(clippy::type_complexity)]call: Option<Box<dyn Fn(&Call<&B::Intermediate>) -> Result<Reply<B::Intermediate>, B::Error> + 'a + Send>>,}unsafe impl<'a, T: Backend<'a>> Send for Register<'a, T> {}pub struct RegisterInit {}impl Default for RegisterInit {fn default() -> Self {RegisterInit {}}}impl RegisterInit {pub fn init<'a, B: Backend<'a>>(self) -> Register<'a, B> {Register {procedures: Arc::new(Mutex::new(HashMap::new())),call: None,}}}impl<'a, B: Backend<'a>> Register<'a, B> {pub fn register<P, C: for<'de> serde::Deserialize<'de>, R: serde::Serialize>(&self, name: &str, procedure: P) -> Result<(), Error<B::Error>>whereP: Fn(C) -> R + 'a,{self.procedures.lock().unwrap().insert(name.to_string(),Box::new(move |call: &Call<&B::Intermediate>| {let reply = procedure(B::deserialize::<C>(call.payload)?);Ok(Reply { payload: B::serialize::<R>(&reply)? 
})}),);Ok(())}pub fn call<C: serde::Serialize, R: for<'de> serde::Deserialize<'de>>(&self, procedure: &str, payload: &C) -> Result<R, Error<B::Error>> {Ok(B::deserialize(&self.call.as_ref().unwrap()(&Call {procedure: procedure.to_string(),payload: &B::serialize(&payload)?,})?.payload,)?)}}impl<'a, B> Frontend<'a, B> for Register<'a, B>whereB: Backend<'a>,{type Intermediate = String;type Error = Error<B::Error>;fn caller<T>(&mut self, caller: T) -> Result<(), Self::Error>whereT: Fn(&Call<&B::Intermediate>) -> Result<Reply<B::Intermediate>, B::Error> + 'a + Send,{self.call = Some(Box::new(caller));Ok(())}fn receive(&self, call: &Call<&B::Intermediate>) -> Result<Reply<B::Intermediate>, Error<B::Error>> {self.procedures.lock().unwrap().get(&call.procedure).unwrap()(call)}}
[package]name = "mer_frontend_register"version = "0.1.0"authors = ["Paul Volavsek <paul.volavsek@gmail.com>"]edition = "2018"[features][dependencies]mer = { path = "../../mer" }serde = "1.0.116"snafu = { version = "0.6.9", default-features = false }log = { version = "0.4", default-features = false }[dev-dependencies]mer_backend_in_process = { path = "../../backends/in-process" }rand = "0.8"flexi_logger = "0.16"[[test]]name = "test"path = "test/tests.rs"
use mer_frontend_derive::*;

// Compile-only check for the `frontend` attribute: the body contains no assertions, it
// only defines a struct and a trait that both go through the macro.
#[test]
fn test() {
    // Without arguments the attribute is applied to the struct that carries the state.
    #[frontend]
    struct Data<T>
    where
        T: std::ops::Add<Output = T> + for<'de> serde::Deserialize<'de> + serde::Serialize + Copy + Send,
    {
        pub offset: T,
    }

    // With `target = "Data"` the attribute is applied to the trait describing the
    // procedures; one method takes no receiver, the other reads `self.offset`.
    #[frontend(target = "Data")]
    trait Service<T>
    where
        T: std::ops::Add<Output = T> + for<'de> serde::Deserialize<'de> + serde::Serialize + Copy + Send,
    {
        fn add(a: T, b: T) -> T::Output {
            a + b
        }
        fn add_with_offset(&self, a: T, b: T) -> T::Output {
            a + b + self.offset
        }
    }
}
use darling::FromMeta;use proc_macro::TokenStream;use quote::quote;use syn::{parse_macro_input, AttributeArgs, ItemStruct, ItemTrait};#[macro_use]mod frontend;fn to_compile_errors(errors: Vec<syn::Error>) -> proc_macro2::TokenStream {let compile_errors = errors.iter().map(syn::Error::to_compile_error);quote!(#(#compile_errors)*)}#[proc_macro_attribute]pub fn frontend(args: TokenStream, input: TokenStream) -> TokenStream {let args = parse_macro_input!(args as AttributeArgs);let args_parsed = match frontend::AttrArgs::from_list(&args) {Ok(v) => v,Err(e) => {return TokenStream::from(e.write_errors());}};if args_parsed.target.is_some() {let input = parse_macro_input!(input as ItemTrait);frontend::expand_trait(&args_parsed, &input).unwrap_or_else(to_compile_errors).into()} else {let input = parse_macro_input!(input as ItemStruct);frontend::expand_struct(&input).unwrap_or_else(to_compile_errors).into()}}
use darling::FromMeta;use proc_macro2::TokenStream;use quote::{format_ident, quote};#[derive(Debug, FromMeta)]pub struct AttrArgs {#[darling(default)]pub target: Option<syn::Path>,}pub fn expand_trait(args: &AttrArgs, input: &syn::ItemTrait) -> Result<TokenStream, Vec<syn::Error>> {let trait_name = &input.ident;let trait_generics = &input.generics;let service_name = args.target.as_ref().unwrap();let where_clause = &trait_generics.where_clause;let items = &input.items;let item_methods: Vec<syn::TraitItemMethod> = items.iter().filter_map(|i| match i {syn::TraitItem::Method(m) => {let mut method = m.clone();method.default.take();Some(method)}_ => None,}).collect();let mut impl_generic_def = trait_generics.clone();impl_generic_def.params.insert(0, syn::parse_quote! { __B: mer::interfaces::Backend<'__a> });impl_generic_def.params.insert(0, syn::parse_quote! { '__a });let mut impl_generics = trait_generics.clone();impl_generics.params.insert(0, syn::parse_quote! { __B });impl_generics.params.insert(0, syn::parse_quote! { '__a });let receiver_impl_items: Vec<TokenStream> = item_methods.iter().map(|i| {let item_name = format_ident!("{}", &i.sig.ident);let mut has_self: bool = false;let arguments: Vec<&Box<syn::Type>> = i.sig.inputs.iter().filter_map(|a| match a {syn::FnArg::Typed(t) => Some(&t.ty),syn::FnArg::Receiver(_) => {has_self = true;None}}).collect();let index = (0..arguments.len()).map(syn::Index::from);let deser = quote! {let deser_payload = __B::deserialize::<(#( #arguments ),*)>(call.payload)?;};let reply = match has_self {true => quote! {let reply = <Self as #trait_name #impl_generics>::#item_name(self, #( deser_payload.#index),*);},false => quote! {let reply = <Self as #trait_name #impl_generics>::#item_name(#( deser_payload.#index),*);},};let ser = quote! {let ser_reply = __B::serialize(&reply)?;Ok(mer::Reply { payload: ser_reply })};quote! 
{stringify!(#item_name) => {log::debug!("frontend procedure receiving: {}", stringify!(#item_name));#deser#reply#ser}}}).collect();let caller_impl_items: Vec<TokenStream> = item_methods.iter().map(|i| {let item_name = &i.sig.ident;let mut has_self = false;let arguments: Vec<&syn::Ident> = i.sig.inputs.iter().filter_map(|a| match a {syn::FnArg::Typed(t) => {if let syn::Pat::Ident(ident) = &*t.pat {Some(&ident.ident)} else {None}}syn::FnArg::Receiver(_) => {has_self = true;None}}).collect();let mut signature = i.sig.inputs.clone();if !has_self {signature.insert(0, syn::parse_quote! { &self })}let old_return_type: syn::Type = match &i.sig.output {syn::ReturnType::Default => syn::parse_quote! { () },syn::ReturnType::Type(_, t) => syn::parse_quote! { #t },};let return_type: syn::ReturnType = syn::parse_quote! { -> Result<#old_return_type, mer_frontend_derive::Error<__B::Error>> };quote! {pub fn #item_name(#signature) #return_type {log::debug!("frontend procedure calling: {}", stringify!(#item_name));let ser_payload = __B::serialize(&(#( #arguments ),*))?;let reply = self.__call.as_ref().unwrap()(&mer::Call {procedure: stringify!(#item_name).to_string(),payload: &ser_payload,})?.payload;let deser_reply = __B::deserialize::<#old_return_type>(&reply);Ok(deser_reply?)}}}).collect();Ok(quote! 
{trait #trait_name #impl_generic_def #where_clause {#( #item_methods )*}impl #impl_generic_def #trait_name #impl_generics for #service_name #impl_generics #where_clause {#( #items )*}impl #impl_generic_def #service_name #impl_generics #where_clause {fn __receive(&self, call: &mer::Call<&__B::Intermediate>) -> Result<mer::Reply<__B::Intermediate>, mer_frontend_derive::Error<__B::Error>> {match call.procedure.as_str() {#( #receiver_impl_items ),*_ => Err(mer_frontend_derive::Error::UnknownProcedure {}),}}}impl #impl_generic_def #service_name #impl_generics #where_clause {#( #caller_impl_items )*}})}pub fn expand_struct(input: &syn::ItemStruct) -> Result<TokenStream, Vec<syn::Error>> {let struct_name = &input.ident;let struct_name_init = format_ident!("{}Init", &input.ident);let struct_generics = &input.generics;let where_clause = &struct_generics.where_clause;let fields = match &input.fields {syn::Fields::Named(named) => {let fields: Vec<&syn::Field> = named.named.iter().collect();quote! { #( #fields ),* }}rest => quote! { #rest },};let field_names = match &input.fields {syn::Fields::Named(named) => named.named.iter().filter_map(|f| f.ident.clone()).map(|f| quote! { #f: self.#f }).collect(),_ => vec![],};let mut impl_generic_def = struct_generics.clone();impl_generic_def.params.insert(0, syn::parse_quote! { __B: mer::interfaces::Backend<'__a> });impl_generic_def.params.insert(0, syn::parse_quote! { '__a });let mut impl_generics = struct_generics.clone();impl_generics.params.insert(0, syn::parse_quote! { __B });impl_generics.params.insert(0, syn::parse_quote! { '__a });Ok(quote! 
{struct #struct_name #impl_generic_def #where_clause {#fields,__call: Option<Box<dyn Fn(&mer::Call<&__B::Intermediate>) -> Result<mer::Reply<__B::Intermediate>, __B::Error> + '__a + Send>>}#[derive(Default)]struct #struct_name_init #struct_generics #where_clause {#fields,}impl #struct_generics #struct_name_init #struct_generics #where_clause {pub fn init<'__a, __B: mer::interfaces::Backend<'__a>>(self) -> #struct_name #impl_generics {#struct_name {#( #field_names ),*,__call: None}}}impl #impl_generic_def mer::interfaces::Frontend<'__a, __B> for #struct_name #impl_generics #where_clause {type Intermediate = String;type Error = mer_frontend_derive::Error<__B::Error>;fn caller<__T>(&mut self, caller: __T) -> Result<(), mer_frontend_derive::Error<__B::Error>>where__T: Fn(&mer::Call<&__B::Intermediate>) -> Result<mer::Reply<__B::Intermediate>, __B::Error> + '__a + Send,__T: 'static,{self.__call = Some(Box::new(caller));Ok(())}fn receive(&self, call: &mer::Call<&__B::Intermediate>) -> Result<mer::Reply<__B::Intermediate>, mer_frontend_derive::Error<__B::Error>> {log::debug!("receiving: Call {{ prodecure: {:?}, payload: ... }}", &call.procedure);self.__receive(call).map_err(core::convert::Into::into)}}})}
[package]name = "mer_frontend_derive_macros"version = "0.1.0"authors = ["Paul Volavsek <paul.volavsek@gmail.com>"]edition = "2018"[features]default = ["std"]std = [ "log/std", "serde/std" ][dependencies]syn = "1.0"quote = "1.0"proc-macro2 = "1.0"darling = "0.12"log = { version = "0.4", default-features = false }[dev-dependencies]mer_frontend_derive = { path = "../core" }mer = { path = "../../../mer" }serde = { version = "1.0", default-features = false }[lib]proc-macro = true[[test]]name = "test"path = "test/tests.rs"
use mer::*;#[test]fn derive_in_process() {use std::sync::mpsc;#[mer_frontend_derive::frontend()]struct Data<T>whereT: std::ops::Add<Output = T> + for<'de> serde::Deserialize<'de> + serde::Serialize + Copy + Send,{pub offset: T,}#[mer_frontend_derive::frontend(target = "Data")]trait Receiver<T>whereT: std::ops::Add<Output = T> + for<'de> serde::Deserialize<'de> + serde::Serialize + Copy + Send,{fn add(a: T, b: T) -> T::Output {a + b}fn add_with_offset(&self, a: T, b: T) -> T::Output {a + b + self.offset}}let (to, from): (mpsc::Sender<mer_backend_in_process::InProcessChannel>, mpsc::Receiver<mer_backend_in_process::InProcessChannel>) = mpsc::channel();let mer_caller = MerInit {backend: mer_backend_in_process::InProcessInit { to: to.into(), ..Default::default() }.init().unwrap(),frontend: DataInit::<i32> { offset: 32 }.init(),}.init();let mut mer_register = MerInit {backend: mer_backend_in_process::InProcessInit {from: from.into(),..Default::default()}.init().unwrap(),frontend: DataInit::<i32> { offset: 32 }.init(),}.init();mer_register.start().unwrap();let (a, b) = (rand::random::<i32>() / 2, rand::random::<i32>() / 2);assert_eq!(mer_caller.frontend(|f| { f.add(a, b).unwrap() }).unwrap(), a + b);}
use snafu::Snafu;pub use mer_frontend_derive_macros::frontend;#[derive(Debug, Snafu)]pub enum Error<B: core::fmt::Display> {FromBackend { from: B },UnknownProcedure,MutexLock,}impl<B: snafu::Error> From<B> for Error<B> {fn from(from: B) -> Self {Error::FromBackend { from }}}
[package]name = "mer_frontend_derive"version = "0.1.0"authors = ["Paul Volavsek <paul.volavsek@gmail.com>"]edition = "2018"[features]default = ["std"]std = ["snafu/std","log/std"][dependencies]mer = { path = "../../../mer" }mer_frontend_derive_macros = { path = "../macros" }snafu = { version = "0.6.9", default-features = false }log = { version = "0.4", default-features = false }[dev-dependencies]mer_backend_in_process = { path = "../../../backends/in-process" }serde = "1.0"rand = "0.8"flexi_logger = "0.16"[[test]]name = "test"path = "test/tests.rs"
use mer::*;

/// Procedure registered on both frontends: returns the sum of its arguments.
fn add(a: i32, b: i32) -> i32 {
    a + b
}

/// Calls the registered `add` procedure across the in-process backend and
/// checks the returned sum.
#[test]
fn register_in_process() {
    use std::sync::mpsc;
    use std::sync::mpsc::{Receiver, Sender};

    let caller_frontend = mer_frontend_register::RegisterInit {}.init();
    let receiver_frontend = mer_frontend_register::RegisterInit {}.init();

    caller_frontend.register("add", |(a, b)| add(a, b)).unwrap();
    receiver_frontend.register("add", |(a, b)| add(a, b)).unwrap();

    // Channel that carries calls from the calling instance to the serving one.
    let (call_tx, call_rx): (
        Sender<mer_backend_in_process::InProcessChannel>,
        Receiver<mer_backend_in_process::InProcessChannel>,
    ) = mpsc::channel();

    let caller_backend = mer_backend_in_process::InProcessInit {
        to: Some(call_tx),
        ..Default::default()
    }
    .init()
    .unwrap();
    let mer_caller = MerInit {
        backend: caller_backend,
        frontend: caller_frontend,
    }
    .init();

    let receiver_backend = mer_backend_in_process::InProcessInit {
        from: Some(call_rx),
        ..Default::default()
    }
    .init()
    .unwrap();
    let mut mer_receiver = MerInit {
        backend: receiver_backend,
        frontend: receiver_frontend,
    }
    .init();
    mer_receiver.start().unwrap();

    // Halved so that the sum cannot overflow an i32.
    let a = rand::random::<i32>() / 2;
    let b = rand::random::<i32>() / 2;

    let result: i32 = mer_caller.frontend(|f| f.call("add", &(a, b)).unwrap()).unwrap();
    assert_eq!(result, a + b);
}
use mer::{interfaces::Backend, Call, Reply};use snafu::{OptionExt, ResultExt, Snafu};use std::sync::mpsc;use std::sync::mpsc::{Receiver, Sender};use std::sync::{Arc, Mutex};use tokio::runtime::Runtime;pub type InProcessChannel = (Call<String>, Sender<Result<Reply<String>, Error>>);#[derive(Debug, Snafu)]pub enum Error {Serialize { from: serde_json::Error },Deserialize { from: serde_json::Error },GetCallLockInReceiver {},NoReceiver {},NoCallerChannel {},NoReceiverChannel {},CallerRecv { source: std::sync::mpsc::RecvError },CallerSend { source: std::sync::mpsc::SendError<InProcessChannel> },ReplyError { from: String },RuntimeCreation { source: std::io::Error },}pub struct InProcess {#[allow(clippy::type_complexity)]receiver: Option<Arc<dyn Fn(Arc<Mutex<Call<&String>>>) -> Arc<tokio::sync::Mutex<Result<Reply<String>, Error>>> + Send + Sync>>,runtime: Runtime,to: Option<Sender<InProcessChannel>>,from: Option<Arc<Mutex<Receiver<InProcessChannel>>>>,}pub struct InProcessInit {pub to: Option<Sender<InProcessChannel>>,pub from: Option<Receiver<InProcessChannel>>,}impl Default for InProcessInit {fn default() -> Self {InProcessInit { to: None, from: None }}}impl InProcessInit {pub fn init(self) -> Result<InProcess, Error> {Ok(InProcess {receiver: None,to: self.to,from: if let Some(from) = self.from { Some(Arc::new(Mutex::new(from))) } else { None },runtime: Runtime::new().context(RuntimeCreation)?,})}}impl<'a> Backend<'a> for InProcess {type Intermediate = String;type Error = Error;fn start(&mut self) -> Result<(), Self::Error> {let from = self.from.as_ref().context(NoReceiverChannel {})?.clone();let receiver = self.receiver.as_ref().context(NoReceiver {})?.clone();self.runtime.spawn(async move {loop {let (call, tx) = from.lock().unwrap().recv().unwrap();let reply_mutex = receiver(Arc::new(Mutex::new(Call {procedure: call.procedure,payload: &call.payload,})));let reply = &*reply_mutex.lock().await;tx.send(match reply {Ok(r) => Ok(Reply { payload: r.payload.clone() }),Err(e) 
=> Err(Error::ReplyError { from: format!("{:?}", e) }),}).unwrap();}});Ok(())}fn stop(&mut self) -> Result<(), Self::Error> {Ok(())}fn receiver<T>(&mut self, receiver: T) -> Result<(), Self::Error>whereT: Fn(&Call<&Self::Intermediate>) -> Result<Reply<Self::Intermediate>, Self::Error> + Send + Sync + 'static,{self.receiver = Some(Arc::new(move |call: Arc<Mutex<Call<&String>>>| {let call = match call.as_ref().lock() {Ok(c) => c,Err(_) => return Arc::new(tokio::sync::Mutex::new(Err(Error::GetCallLockInReceiver {}))),};match receiver(&*call) {Ok(reply) => Arc::new(tokio::sync::Mutex::new(Ok(Reply { payload: reply.payload }))),Err(e) => Arc::new(tokio::sync::Mutex::new(Err(e))),}}));Ok(())}fn call(&mut self, call: &Call<&Self::Intermediate>) -> Result<Reply<Self::Intermediate>, Self::Error> {#[allow(clippy::type_complexity)]let (tx, rx): (Sender<Result<Reply<String>, Error>>, Receiver<Result<Reply<String>, Error>>) = mpsc::channel();self.to.as_ref().context(NoCallerChannel {})?.send((Call {procedure: call.procedure.clone(),payload: call.payload.clone(),},tx,)).context(CallerSend {})?;rx.recv().context(CallerRecv {})?}fn serialize<T: serde::Serialize>(from: &T) -> Result<String, Self::Error> {serde_json::to_string(from).map_err(|e| Error::Serialize { from: e })}fn deserialize<'b, T>(from: &'b Self::Intermediate) -> Result<T, Self::Error>whereT: for<'de> serde::Deserialize<'de>,{serde_json::from_str(&from).map_err(|e| Error::Deserialize { from: e })}}
[package]name = "mer_backend_in_process"version = "0.1.0"authors = ["Paul Volavsek <paul.volavsek@gmail.com>"]edition = "2018"[features][dependencies]mer = { path = "../../mer" }serde = "1.0.116"serde_json = "1.0.57"tokio = { version = "1.0", features = ["rt", "rt-multi-thread"] }snafu = "0.6.9"log = "0.4"[dev-dependencies]mer_frontend_register = { path = "../../frontends/register" }rand = "0.8"flexi_logger = "0.16"[[test]]name = "test"path = "test/tests.rs"
use mer::*;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};

/// Procedure registered on both frontends: returns the sum of its arguments.
fn add(a: i32, b: i32) -> i32 {
    a + b
}

/// Calls the registered `add` procedure over the HTTP backend on
/// `localhost:8080` and checks the returned sum.
#[test]
fn register_http() {
    let caller_frontend = mer_frontend_register::RegisterInit {}.init();
    let receiver_frontend = mer_frontend_register::RegisterInit {}.init();

    caller_frontend.register("add", |(a, b)| add(a, b)).unwrap();
    receiver_frontend.register("add", |(a, b)| add(a, b)).unwrap();

    // Calling side: speaks to the server, does not listen.
    let server_uri = "http://localhost:8080".parse::<hyper::Uri>().unwrap();
    let caller_backend = mer_backend_http::HttpInit {
        speak: Some(server_uri),
        listen: None,
        ..Default::default()
    }
    .init()
    .unwrap();
    let mer_caller = MerInit {
        backend: caller_backend,
        frontend: caller_frontend,
    }
    .init();

    // Serving side: listens on 127.0.0.1:8080, does not speak.
    let listen_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
    let receiver_backend = mer_backend_http::HttpInit {
        speak: None,
        listen: Some(listen_addr),
        ..Default::default()
    }
    .init()
    .unwrap();
    let mut mer_receiver = MerInit {
        backend: receiver_backend,
        frontend: receiver_frontend,
    }
    .init();
    mer_receiver.start().unwrap();

    // Halved so that the sum cannot overflow an i32.
    let a = rand::random::<i32>() / 2;
    let b = rand::random::<i32>() / 2;

    let result: i32 = mer_caller.frontend(|f| f.call("add", &(a, b)).unwrap()).unwrap();
    assert_eq!(result, a + b);
}
use mer::{interfaces::Backend, Call, Reply};use snafu::{OptionExt, ResultExt, Snafu};use hyper::{client::{connect::dns::GaiResolver, HttpConnector},Body, Client, Method, Request, StatusCode,};use hyper::http::Uri;use hyper::service::{make_service_fn, service_fn};use hyper::{Response, Server};use std::sync::{Arc, Mutex};use std::{fmt::Debug, net::SocketAddr};use tokio::runtime::Runtime;use tokio::sync;use log::{debug, trace};#[derive(Debug, Snafu)]pub enum Error {Serialize { from: serde_json::Error },Deserialize { from: serde_json::Error },SpeakingDisabled,RequestBuilder { source: hyper::http::Error },ParseResponseBodyBytes { source: hyper::Error },ParseResponseBody { source: std::string::FromUtf8Error },ClientRequest { source: hyper::Error },FailedRequest { status: StatusCode },NoListen,NoReceiver,BindServer { source: hyper::Error },NoProcedureHeader { source: hyper::http::Error },GetCallLockInReceiver,RuntimeCreation { source: std::io::Error },}pub struct Http {client: Client<HttpConnector<GaiResolver>, Body>,speak: Option<Uri>,listen: Option<SocketAddr>,#[allow(clippy::type_complexity)]receiver: Option<Arc<dyn Fn(Arc<Mutex<Call<&String>>>) -> Arc<sync::Mutex<Result<Reply<String>, Error>>> + Send + Sync>>,runtime: Runtime,}impl Debug for Http {fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {f.debug_struct("Http").field("client", &self.client).field("speak", &self.speak).field("listen", &self.listen).field("runtime", &self.runtime).finish()}}pub struct HttpInit {pub client: Client<HttpConnector<GaiResolver>, Body>,pub speak: Option<Uri>,pub listen: Option<SocketAddr>,}impl Default for HttpInit {fn default() -> Self {HttpInit {client: Client::new(),speak: None,listen: None,}}}impl From<HttpInit> for Result<Http, Error> {fn from(from: HttpInit) -> Self {from.init()}}impl HttpInit {pub fn init(self) -> Result<Http, Error> {trace!("HttpInit.init()");let http = Http {client: self.client,speak: self.speak,listen: 
self.listen,receiver: None,runtime: Runtime::new().context(RuntimeCreation {})?,};debug!("{:?}", &http);Ok(http)}}impl<'a> Backend<'a> for Http {type Intermediate = String;type Error = Error;fn start(&mut self) -> Result<(), Self::Error> {trace!("Http.start()");let listen = self.listen.context(NoListen)?;let receiver = Arc::clone(self.receiver.as_ref().context(NoReceiver)?);self.runtime.spawn(async move {trace!("Http.runtime.spawn()");let receiver = receiver.clone();Server::bind(&listen).serve(make_service_fn(move |_| {trace!("Http.runtime.spawn.serve()");let receiver = receiver.clone();async move {Ok::<_, hyper::Error>(service_fn(move |request: Request<Body>| {trace!("Http.runtime.spawn.serve.service_fn()");debug!("{:?}", &listen);let receiver = receiver.clone();async move {let procedure = if let Some(procedure) = request.headers().get("Procedure") {match procedure.to_str() {Ok(procedure) => procedure.to_owned(),Err(e) => return Response::builder().status(StatusCode::BAD_REQUEST).body(Body::from(e.to_string())),}} else {return Response::builder().status(StatusCode::BAD_REQUEST).body(Body::from("No Procedure provided"));};let body_bytes = match hyper::body::to_bytes(request.into_body()).await {Ok(body_bytes) => body_bytes,Err(e) => return Response::builder().status(StatusCode::BAD_REQUEST).body(Body::from(e.to_string())),};let body = match String::from_utf8(body_bytes.to_vec()) {Ok(body) => body,Err(e) => return Response::builder().status(StatusCode::BAD_REQUEST).body(Body::from(e.to_string())),};debug!("call Call {{ procedure: {:?}, payload: {:?} }}", &procedure, &body);let reply_mutex = receiver(Arc::new(Mutex::new(crate::Call { procedure, payload: &body })));let reply = &*reply_mutex.lock().await;match reply {Err(e) => Response::builder().status(StatusCode::BAD_REQUEST).body(Body::from(format!("{:?}", e))),Ok(reply) => {debug!("reply Reply {{ payload: {:?} }}", 
&reply.payload);Response::builder().status(StatusCode::OK).body(Body::from(reply.payload.to_owned()))}}}}))}})).await.unwrap();});Ok(())}fn stop(&mut self) -> Result<(), Self::Error> {trace!("Http.stop()");Ok(())}fn receiver<T>(&mut self, receiver: T) -> Result<(), Self::Error>whereT: Fn(&Call<&Self::Intermediate>) -> Result<Reply<Self::Intermediate>, Self::Error> + Send + Sync + 'static,{trace!("Http.receiver()");self.receiver = Some(Arc::new(move |call: Arc<Mutex<Call<&String>>>| {trace!("(Http.receiver)()");let call = match call.as_ref().lock() {Ok(c) => c,Err(_) => return Arc::new(sync::Mutex::new(Err(Error::GetCallLockInReceiver))),};debug!("calling receiver");match receiver(&*call) {Ok(reply) => Arc::new(sync::Mutex::new(Ok(Reply { payload: reply.payload }))),Err(e) => Arc::new(sync::Mutex::new(Err(e))),}}));Ok(())}fn call(&mut self, call: &Call<&Self::Intermediate>) -> Result<Reply<Self::Intermediate>, Self::Error> {trace!("Http.call()");debug!("{:?}", &self.speak);match &self.speak {None => Err(Error::SpeakingDisabled),Some(uri) => self.runtime.block_on(async {let request = Request::builder().method(Method::POST).uri(uri).header("Procedure", &call.procedure).body(Body::from(call.payload.clone())).context(RequestBuilder)?;debug!("request {:?}", &request);let response = self.client.request(request).await.context(ClientRequest)?;debug!("response {:?}", &response);let status = response.status();let body_bytes = hyper::body::to_bytes(response.into_body()).await.context(ParseResponseBodyBytes)?;let body = String::from_utf8(body_bytes.to_vec()).context(ParseResponseBody)?;match status {StatusCode::OK => Ok(Reply { payload: body }),_ => Err(Error::FailedRequest { status }),}}),}}fn serialize<T: serde::Serialize>(from: &T) -> Result<String, Self::Error> {trace!("Http.serialize()");serde_json::to_string(from).map_err(|e| Error::Serialize { from: e })}fn deserialize<'b, T>(from: &'b Self::Intermediate) -> Result<T, Self::Error>whereT: for<'de> 
serde::Deserialize<'de>,{trace!("Http.deserialize()");serde_json::from_str(&from).map_err(|e| Error::Deserialize { from: e })}}
[package]name = "mer_backend_http"version = "0.1.0"authors = ["Paul Volavsek <paul.volavsek@gmail.com>"]edition = "2018"[features][dependencies]mer = { path = "../../mer" }serde = "1.0.116"serde_json = "1.0.57"hyper = { version = "0.14", features = ["http2", "client", "server", "tcp"] }tokio = { version = "1.0", features = ["rt", "rt-multi-thread"] }snafu = "0.6.9"log = "0.4"[dev-dependencies]mer_frontend_register = { path = "../../frontends/register" }rand = "0.8"flexi_logger = "0.16"[[test]]name = "test"path = "test/tests.rs"