S2PPKG7OUEVI6UPTGRAQEOZWSS7ZYDNO6NL4A5UH4VV2IFXL37EAC
[package]
name = "mer_derive"
version = "0.1.0"
authors = ["Paul Volavsek <paul.volavsek@gmail.com>"]
edition = "2018"
[features]
default = ["std"]
std = []
[dependencies]
syn = "1.0"
quote = "1.0"
proc-macro2 = "1.0"
darling = "0.12"
log = { version = "0.4", default-features = false }
[dev-dependencies]
mer = { path = "../mer" }
serde = "1.0"
[lib]
proc-macro = true
[[test]]
name = "test"
path = "test/tests.rs"
use darling::FromMeta;
use proc_macro2::TokenStream;
use quote::{format_ident, quote};
#[derive(Debug, FromMeta)]
pub struct AttrArgs {
#[darling(default)]
pub target: Option<syn::Path>,
}
pub fn expand_trait(args: &AttrArgs, input: &syn::ItemTrait) -> Result<TokenStream, Vec<syn::Error>> {
let trait_name = &input.ident;
let trait_generics = &input.generics;
let service_name = args.target.as_ref().unwrap();
let where_clause = &trait_generics.where_clause;
let items = &input.items;
let item_methods: Vec<syn::TraitItemMethod> = items
.iter()
.filter_map(|i| match i {
syn::TraitItem::Method(m) => {
let mut method = m.clone();
method.default.take();
Some(method)
}
_ => None,
})
.collect();
let mut impl_generic_def = trait_generics.clone();
impl_generic_def.params.insert(0, syn::parse_quote! { __B: mer::interfaces::Backend<'__a> });
impl_generic_def.params.insert(0, syn::parse_quote! { '__a });
let mut impl_generics = trait_generics.clone();
impl_generics.params.insert(0, syn::parse_quote! { __B });
impl_generics.params.insert(0, syn::parse_quote! { '__a });
let receiver_impl_items: Vec<TokenStream> = item_methods
.iter()
.map(|i| {
let item_name = format_ident!("{}", &i.sig.ident);
let mut has_self: bool = false;
let arguments: Vec<&Box<syn::Type>> = i
.sig
.inputs
.iter()
.filter_map(|a| match a {
syn::FnArg::Typed(t) => Some(&t.ty),
syn::FnArg::Receiver(_) => {
has_self = true;
None
}
})
.collect();
let index = (0..arguments.len()).map(syn::Index::from);
let deser = quote! {
let deser_payload = __B::deserialize::<(#( #arguments ),*)>(call.payload)?;
};
let reply = match has_self {
true => quote! {
let reply = <Self as #trait_name #impl_generics>::#item_name(self, #( deser_payload.#index),*);
},
false => quote! {
let reply = <Self as #trait_name #impl_generics>::#item_name(#( deser_payload.#index),*);
},
};
let ser = quote! {
let ser_reply = __B::serialize(&reply)?;
Ok(mer::Reply { payload: ser_reply })
};
quote! {
stringify!(#item_name) => {
log::debug!("frontend procedure receiving: {}", stringify!(#item_name));
#deser
#reply
#ser
}
}
})
.collect();
let caller_impl_items: Vec<TokenStream> = item_methods
.iter()
.map(|i| {
let item_name = &i.sig.ident;
let mut has_self = false;
let arguments: Vec<&syn::Ident> = i
.sig
.inputs
.iter()
.filter_map(|a| match a {
syn::FnArg::Typed(t) => {
if let syn::Pat::Ident(ident) = &*t.pat {
Some(&ident.ident)
} else {
None
}
}
syn::FnArg::Receiver(_) => {
has_self = true;
None
}
})
.collect();
let mut signature = i.sig.inputs.clone();
if !has_self {
signature.insert(0, syn::parse_quote! { &self })
}
let old_return_type: syn::Type = match &i.sig.output {
syn::ReturnType::Default => syn::parse_quote! { () },
syn::ReturnType::Type(_, t) => syn::parse_quote! { #t },
};
let return_type: syn::ReturnType = syn::parse_quote! { -> Result<#old_return_type, mer::frontends::derive::Error<__B::Error>> };
quote! {
pub fn #item_name(#signature) #return_type {
log::debug!("frontend procedure calling: {}", stringify!(#item_name));
let ser_payload = __B::serialize(&(#( #arguments ),*))?;
let reply = self.__call.as_ref().unwrap()(&mer::Call {
procedure: stringify!(#item_name).to_string(),
payload: &ser_payload,
})?
.payload;
let deser_reply = __B::deserialize::<#old_return_type>(&reply);
Ok(deser_reply?)
}
}
})
.collect();
Ok(quote! {
trait #trait_name #impl_generic_def #where_clause {
#( #item_methods )*
}
impl #impl_generic_def #trait_name #impl_generics for #service_name #impl_generics #where_clause {
#( #items )*
}
impl #impl_generic_def #service_name #impl_generics #where_clause {
fn __receive(&self, call: &mer::Call<&__B::Intermediate>) -> Result<mer::Reply<__B::Intermediate>, mer::frontends::derive::Error<__B::Error>> {
match call.procedure.as_str() {
#( #receiver_impl_items ),*
_ => Err(mer::frontends::derive::Error::UnknownProcedure {}),
}
}
}
impl #impl_generic_def #service_name #impl_generics #where_clause {
#( #caller_impl_items )*
}
})
}
pub fn expand_struct(input: &syn::ItemStruct) -> Result<TokenStream, Vec<syn::Error>> {
let struct_name = &input.ident;
let struct_name_init = format_ident!("{}Init", &input.ident);
let struct_generics = &input.generics;
let where_clause = &struct_generics.where_clause;
let fields = match &input.fields {
syn::Fields::Named(named) => {
let fields: Vec<&syn::Field> = named.named.iter().collect();
quote! { #( #fields ),* }
}
rest => quote! { #rest },
};
let field_names = match &input.fields {
syn::Fields::Named(named) => named.named.iter().filter_map(|f| f.ident.clone()).map(|f| quote! { #f: self.#f }).collect(),
_ => vec![],
};
let mut impl_generic_def = struct_generics.clone();
impl_generic_def.params.insert(0, syn::parse_quote! { __B: mer::interfaces::Backend<'__a> });
impl_generic_def.params.insert(0, syn::parse_quote! { '__a });
let mut impl_generics = struct_generics.clone();
impl_generics.params.insert(0, syn::parse_quote! { __B });
impl_generics.params.insert(0, syn::parse_quote! { '__a });
Ok(quote! {
struct #struct_name #impl_generic_def #where_clause {
#fields,
__call: Option<Box<dyn Fn(&mer::Call<&__B::Intermediate>) -> Result<mer::Reply<__B::Intermediate>, __B::Error> + '__a + Send>>
}
#[derive(Default)]
struct #struct_name_init #struct_generics #where_clause {
#fields,
}
impl #struct_generics #struct_name_init #struct_generics #where_clause {
pub fn init<'__a, __B: mer::interfaces::Backend<'__a>>(self) -> #struct_name #impl_generics {
#struct_name {
#( #field_names ),*,
__call: None
}
}
}
impl #impl_generic_def mer::interfaces::Frontend<'__a, __B> for #struct_name #impl_generics #where_clause {
type Intermediate = String;
type Error = mer::frontends::derive::Error<__B::Error>;
fn caller<__T>(&mut self, caller: __T) -> Result<(), mer::frontends::derive::Error<__B::Error>>
where
__T: Fn(&mer::Call<&__B::Intermediate>) -> Result<mer::Reply<__B::Intermediate>, __B::Error> + '__a + Send,
__T: 'static,
{
self.__call = Some(Box::new(caller));
Ok(())
}
fn receive(&self, call: &mer::Call<&__B::Intermediate>) -> Result<mer::Reply<__B::Intermediate>, mer::frontends::derive::Error<__B::Error>> {
log::debug!("receiving: Call {{ prodecure: {:?}, payload: ... }}", &call.procedure);
self.__receive(call).map_err(core::convert::Into::into)
}
}
})
}
use darling::FromMeta;
use proc_macro::TokenStream;
use quote::quote;
use syn::{parse_macro_input, AttributeArgs, ItemStruct, ItemTrait};
#[macro_use]
mod frontend;
fn to_compile_errors(errors: Vec<syn::Error>) -> proc_macro2::TokenStream {
let compile_errors = errors.iter().map(syn::Error::to_compile_error);
quote!(#(#compile_errors)*)
}
#[proc_macro_attribute]
pub fn frontend(args: TokenStream, input: TokenStream) -> TokenStream {
let args = parse_macro_input!(args as AttributeArgs);
let args_parsed = match frontend::AttrArgs::from_list(&args) {
Ok(v) => v,
Err(e) => {
return TokenStream::from(e.write_errors());
}
};
if args_parsed.target.is_some() {
let input = parse_macro_input!(input as ItemTrait);
frontend::expand_trait(&args_parsed, &input).unwrap_or_else(to_compile_errors).into()
} else {
let input = parse_macro_input!(input as ItemStruct);
frontend::expand_struct(&input).unwrap_or_else(to_compile_errors).into()
}
}
use mer_derive::*;
#[test]
fn test() {
#[frontend]
struct Data<T>
where
T: std::ops::Add<Output = T> + for<'de> serde::Deserialize<'de> + serde::Serialize + Copy + Send,
{
pub offset: T,
}
#[frontend(target = "Data")]
trait Service<T>
where
T: std::ops::Add<Output = T> + for<'de> serde::Deserialize<'de> + serde::Serialize + Copy + Send,
{
fn add(a: T, b: T) -> T::Output {
a + b
}
fn add_with_offset(&self, a: T, b: T) -> T::Output {
a + b + self.offset
}
}
}
use backends::InProcessChannel;
use mer::*;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
fn add(a: i32, b: i32) -> i32 {
a + b
}
#[test]
fn register_http() {
let register_caller = frontends::RegisterInit {}.init();
let register_receiver = frontends::RegisterInit {}.init();
register_caller.register("add", |(a, b)| add(a, b)).unwrap();
register_receiver.register("add", |(a, b)| add(a, b)).unwrap();
let mer_caller = MerInit {
backend: backends::HttpInit {
speak: "http://localhost:8080".parse::<hyper::Uri>().unwrap().into(),
listen: None,
..Default::default()
}
.init()
.unwrap(),
frontend: register_caller,
}
.init();
let mut mer_receiver = MerInit {
backend: backends::HttpInit {
speak: None,
listen: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080).into(),
..Default::default()
}
.init()
.unwrap(),
frontend: register_receiver,
}
.init();
mer_receiver.start().unwrap();
let (a, b) = (rand::random::<i32>() / 2, rand::random::<i32>() / 2);
let result: i32 = mer_caller.frontend(|f| f.call("add", &(a, b)).unwrap()).unwrap();
assert_eq!(result, a + b);
}
#[test]
fn register_in_process() {
use std::sync::mpsc;
use std::sync::mpsc::{Receiver, Sender};
let register_caller = frontends::RegisterInit {}.init();
let register_receiver = frontends::RegisterInit {}.init();
register_caller.register("add", |(a, b)| add(a, b)).unwrap();
register_receiver.register("add", |(a, b)| add(a, b)).unwrap();
let (to, from): (Sender<InProcessChannel>, Receiver<InProcessChannel>) = mpsc::channel();
let mer_caller = MerInit {
backend: backends::InProcessInit { to: to.into(), ..Default::default() }.init().unwrap(),
frontend: register_caller,
}
.init();
let mut mer_receiver = MerInit {
backend: backends::InProcessInit {
from: from.into(),
..Default::default()
}
.init()
.unwrap(),
frontend: register_receiver,
}
.init();
mer_receiver.start().unwrap();
let (a, b) = (rand::random::<i32>() / 2, rand::random::<i32>() / 2);
let result: i32 = mer_caller.frontend(|f| f.call("add", &(a, b)).unwrap()).unwrap();
assert_eq!(result, a + b);
}
#[test]
fn derive_http() {
#[mer_derive::frontend()]
struct Data<T>
where
T: std::ops::Add<Output = T> + for<'de> serde::Deserialize<'de> + serde::Serialize + Copy + Send,
{
pub offset: T,
}
#[mer_derive::frontend(target = "Data")]
trait Receiver<T>
where
T: std::ops::Add<Output = T> + for<'de> serde::Deserialize<'de> + serde::Serialize + Copy + Send,
{
fn add(a: T, b: T) -> T::Output {
a + b
}
fn add_with_offset(&self, a: T, b: T) -> T::Output {
a + b + self.offset
}
}
let mer_call = MerInit {
backend: backends::HttpInit {
speak: "http://localhost:8081".parse::<hyper::Uri>().unwrap().into(),
listen: None,
..Default::default()
}
.init()
.unwrap(),
frontend: DataInit::<i32> { offset: 32 }.init(),
}
.init();
let mut mer_receive = MerInit {
backend: backends::HttpInit {
speak: None,
listen: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8081).into(),
..Default::default()
}
.init()
.unwrap(),
frontend: DataInit::<i32> { offset: 32 }.init(),
}
.init();
mer_receive.start().unwrap();
let (a, b) = (rand::random::<i32>() / 2, rand::random::<i32>() / 2);
assert_eq!(
mer_call
.frontend(|f| {
println!("start");
let tmp = f.add(a, b).unwrap();
tmp
})
.unwrap(),
a + b
);
}
#[test]
fn derive_in_process() {
use std::sync::mpsc;
#[mer_derive::frontend()]
struct Data<T>
where
T: std::ops::Add<Output = T> + for<'de> serde::Deserialize<'de> + serde::Serialize + Copy + Send,
{
pub offset: T,
}
#[mer_derive::frontend(target = "Data")]
trait Receiver<T>
where
T: std::ops::Add<Output = T> + for<'de> serde::Deserialize<'de> + serde::Serialize + Copy + Send,
{
fn add(a: T, b: T) -> T::Output {
a + b
}
fn add_with_offset(&self, a: T, b: T) -> T::Output {
a + b + self.offset
}
}
let (to, from): (mpsc::Sender<InProcessChannel>, mpsc::Receiver<InProcessChannel>) = mpsc::channel();
let mer_caller = MerInit {
backend: backends::InProcessInit { to: to.into(), ..Default::default() }.init().unwrap(),
frontend: DataInit::<i32> { offset: 32 }.init(),
}
.init();
let mut mer_register = MerInit {
backend: backends::InProcessInit {
from: from.into(),
..Default::default()
}
.init()
.unwrap(),
frontend: DataInit::<i32> { offset: 32 }.init(),
}
.init();
mer_register.start().unwrap();
let (a, b) = (rand::random::<i32>() / 2, rand::random::<i32>() / 2);
assert_eq!(mer_caller.frontend(|f| { f.add(a, b).unwrap() }).unwrap(), a + b);
}
use crate::interfaces;
use snafu::{OptionExt, ResultExt, Snafu};
use hyper::{
client::{connect::dns::GaiResolver, HttpConnector},
Body, Client, Method, Request, StatusCode,
};
use hyper::http::Uri;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Response, Server};
use std::sync::{Arc, Mutex};
use std::{fmt::Debug, net::SocketAddr};
use tokio::runtime::Runtime;
use tokio::sync;
use log::{debug, trace};
#[derive(Debug, Snafu)]
pub enum Error {
Serialize { from: serde_json::Error },
Deserialize { from: serde_json::Error },
SpeakingDisabled,
RequestBuilder { source: hyper::http::Error },
ParseResponseBodyBytes { source: hyper::Error },
ParseResponseBody { source: std::string::FromUtf8Error },
ClientRequest { source: hyper::Error },
FailedRequest { status: StatusCode },
NoListen,
NoReceiver,
BindServer { source: hyper::Error },
NoProcedureHeader { source: hyper::http::Error },
GetCallLockInReceiver,
RuntimeCreation { source: std::io::Error },
}
pub struct Http {
client: Client<HttpConnector<GaiResolver>, Body>,
speak: Option<Uri>,
listen: Option<SocketAddr>,
#[allow(clippy::type_complexity)]
receiver: Option<Arc<dyn Fn(Arc<Mutex<crate::Call<&String>>>) -> Arc<sync::Mutex<Result<crate::Reply<String>, Error>>> + Send + Sync>>,
runtime: Runtime,
}
impl Debug for Http {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
f.debug_struct("Http")
.field("client", &self.client)
.field("speak", &self.speak)
.field("listen", &self.listen)
.field("runtime", &self.runtime)
.finish()
}
}
pub struct HttpInit {
pub client: Client<HttpConnector<GaiResolver>, Body>,
pub speak: Option<Uri>,
pub listen: Option<SocketAddr>,
}
impl Default for HttpInit {
fn default() -> Self {
HttpInit {
client: Client::new(),
speak: None,
listen: None,
}
}
}
impl From<HttpInit> for Result<Http, Error> {
fn from(from: HttpInit) -> Self {
from.init()
}
}
impl HttpInit {
pub fn init(self) -> Result<Http, Error> {
trace!("HttpInit.init()");
let http = Http {
client: self.client,
speak: self.speak,
listen: self.listen,
receiver: None,
runtime: Runtime::new().context(RuntimeCreation {})?,
};
debug!("{:?}", &http);
Ok(http)
}
}
impl<'a> interfaces::Backend<'a> for Http {
type Intermediate = String;
type Error = Error;
fn start(&mut self) -> Result<(), Self::Error> {
trace!("Http.start()");
let listen = self.listen.context(NoListen)?;
let receiver = Arc::clone(self.receiver.as_ref().context(NoReceiver)?);
self.runtime.spawn(async move {
trace!("Http.runtime.spawn()");
let receiver = receiver.clone();
Server::bind(&listen)
.serve(make_service_fn(move |_| {
trace!("Http.runtime.spawn.serve()");
let receiver = receiver.clone();
async move {
Ok::<_, hyper::Error>(service_fn(move |request: Request<Body>| {
trace!("Http.runtime.spawn.serve.service_fn()");
debug!("{:?}", &listen);
let receiver = receiver.clone();
async move {
let procedure = if let Some(procedure) = request.headers().get("Procedure") {
match procedure.to_str() {
Ok(procedure) => procedure.to_owned(),
Err(e) => return Response::builder().status(StatusCode::BAD_REQUEST).body(Body::from(e.to_string())),
}
} else {
return Response::builder().status(StatusCode::BAD_REQUEST).body(Body::from("No Procedure provided"));
};
let body_bytes = match hyper::body::to_bytes(request.into_body()).await {
Ok(body_bytes) => body_bytes,
Err(e) => return Response::builder().status(StatusCode::BAD_REQUEST).body(Body::from(e.to_string())),
};
let body = match String::from_utf8(body_bytes.to_vec()) {
Ok(body) => body,
Err(e) => return Response::builder().status(StatusCode::BAD_REQUEST).body(Body::from(e.to_string())),
};
debug!("call Call {{ procedure: {:?}, payload: {:?} }}", &procedure, &body);
let reply_mutex = receiver(Arc::new(Mutex::new(crate::Call { procedure, payload: &body })));
let reply = &*reply_mutex.lock().await;
match reply {
Err(e) => Response::builder().status(StatusCode::BAD_REQUEST).body(Body::from(format!("{:?}", e))),
Ok(reply) => {
debug!("reply Reply {{ payload: {:?} }}", &reply.payload);
Response::builder().status(StatusCode::OK).body(Body::from(reply.payload.to_owned()))
}
}
}
}))
}
}))
.await
.unwrap();
});
Ok(())
}
fn stop(&mut self) -> Result<(), Self::Error> {
trace!("Http.stop()");
Ok(())
}
fn receiver<T>(&mut self, receiver: T) -> Result<(), Self::Error>
where
T: Fn(&crate::Call<&Self::Intermediate>) -> Result<crate::Reply<Self::Intermediate>, Self::Error> + Send + Sync + 'static,
{
trace!("Http.receiver()");
self.receiver = Some(Arc::new(move |call: Arc<Mutex<crate::Call<&String>>>| {
trace!("(Http.receiver)()");
let call = match call.as_ref().lock() {
Ok(c) => c,
Err(_) => return Arc::new(sync::Mutex::new(Err(Error::GetCallLockInReceiver))),
};
debug!("calling receiver");
match receiver(&*call) {
Ok(reply) => Arc::new(sync::Mutex::new(Ok(crate::Reply { payload: reply.payload }))),
Err(e) => Arc::new(sync::Mutex::new(Err(e))),
}
}));
Ok(())
}
fn call(&mut self, call: &crate::Call<&Self::Intermediate>) -> Result<crate::Reply<Self::Intermediate>, Self::Error> {
trace!("Http.call()");
debug!("{:?}", &self.speak);
match &self.speak {
None => Err(Error::SpeakingDisabled),
Some(uri) => self.runtime.block_on(async {
let request = Request::builder()
.method(Method::POST)
.uri(uri)
.header("Procedure", &call.procedure)
.body(Body::from(call.payload.clone()))
.context(RequestBuilder)?;
debug!("request {:?}", &request);
let response = self.client.request(request).await.context(ClientRequest)?;
debug!("response {:?}", &response);
let status = response.status();
let body_bytes = hyper::body::to_bytes(response.into_body()).await.context(ParseResponseBodyBytes)?;
let body = String::from_utf8(body_bytes.to_vec()).context(ParseResponseBody)?;
match status {
StatusCode::OK => Ok(crate::Reply { payload: body }),
_ => Err(Error::FailedRequest { status }),
}
}),
}
}
fn serialize<T: serde::Serialize>(from: &T) -> Result<String, Self::Error> {
trace!("Http.serialize()");
serde_json::to_string(from).map_err(|e| Error::Serialize { from: e })
}
fn deserialize<'b, T>(from: &'b Self::Intermediate) -> Result<T, Self::Error>
where
T: for<'de> serde::Deserialize<'de>,
{
trace!("Http.deserialize()");
serde_json::from_str(&from).map_err(|e| Error::Deserialize { from: e })
}
}
use crate::interfaces;
use snafu::{OptionExt, ResultExt, Snafu};
use std::sync::mpsc;
use std::sync::mpsc::{Receiver, Sender};
use std::sync::{Arc, Mutex};
use tokio::runtime::Runtime;
pub type InProcessChannel = (crate::Call<String>, Sender<Result<crate::Reply<String>, Error>>);
#[derive(Debug, Snafu)]
pub enum Error {
Serialize { from: serde_json::Error },
Deserialize { from: serde_json::Error },
GetCallLockInReceiver {},
NoReceiver {},
NoCallerChannel {},
NoReceiverChannel {},
CallerRecv { source: std::sync::mpsc::RecvError },
CallerSend { source: std::sync::mpsc::SendError<InProcessChannel> },
Reply { from: String },
RuntimeCreation { source: std::io::Error },
}
pub struct InProcess {
#[allow(clippy::type_complexity)]
receiver: Option<Arc<dyn Fn(Arc<Mutex<crate::Call<&String>>>) -> Arc<tokio::sync::Mutex<Result<crate::Reply<String>, Error>>> + Send + Sync>>,
runtime: Runtime,
to: Option<Sender<InProcessChannel>>,
from: Option<Arc<Mutex<Receiver<InProcessChannel>>>>,
}
pub struct InProcessInit {
pub to: Option<Sender<InProcessChannel>>,
pub from: Option<Receiver<InProcessChannel>>,
}
impl Default for InProcessInit {
fn default() -> Self {
InProcessInit { to: None, from: None }
}
}
impl InProcessInit {
pub fn init(self) -> Result<InProcess, Error> {
Ok(InProcess {
receiver: None,
to: self.to,
from: if let Some(from) = self.from { Some(Arc::new(Mutex::new(from))) } else { None },
runtime: Runtime::new().context(RuntimeCreation)?,
})
}
}
impl<'a> interfaces::Backend<'a> for InProcess {
type Intermediate = String;
type Error = Error;
fn start(&mut self) -> Result<(), Self::Error> {
let from = self.from.as_ref().context(NoReceiverChannel {})?.clone();
let receiver = self.receiver.as_ref().context(NoReceiver {})?.clone();
self.runtime.spawn(async move {
loop {
let (call, tx) = from.lock().unwrap().recv().unwrap();
let reply_mutex = receiver(Arc::new(Mutex::new(crate::Call {
procedure: call.procedure,
payload: &call.payload,
})));
let reply = &*reply_mutex.lock().await;
tx.send(match reply {
Ok(r) => Ok(crate::Reply { payload: r.payload.clone() }),
Err(e) => Err(Error::Reply { from: format!("{:?}", e) }),
})
.unwrap();
}
});
Ok(())
}
fn stop(&mut self) -> Result<(), Self::Error> {
Ok(())
}
fn receiver<T>(&mut self, receiver: T) -> Result<(), Self::Error>
where
T: Fn(&crate::Call<&Self::Intermediate>) -> Result<crate::Reply<Self::Intermediate>, Self::Error> + Send + Sync + 'static,
{
self.receiver = Some(Arc::new(move |call: Arc<Mutex<crate::Call<&String>>>| {
let call = match call.as_ref().lock() {
Ok(c) => c,
Err(_) => return Arc::new(tokio::sync::Mutex::new(Err(Error::GetCallLockInReceiver {}))),
};
match receiver(&*call) {
Ok(reply) => Arc::new(tokio::sync::Mutex::new(Ok(crate::Reply { payload: reply.payload }))),
Err(e) => Arc::new(tokio::sync::Mutex::new(Err(e))),
}
}));
Ok(())
}
fn call(&mut self, call: &crate::Call<&Self::Intermediate>) -> Result<crate::Reply<Self::Intermediate>, Self::Error> {
#[allow(clippy::type_complexity)]
let (tx, rx): (Sender<Result<crate::Reply<String>, Error>>, Receiver<Result<crate::Reply<String>, Error>>) = mpsc::channel();
self
.to
.as_ref()
.context(NoCallerChannel {})?
.send((
crate::Call {
procedure: call.procedure.clone(),
payload: call.payload.clone(),
},
tx,
))
.context(CallerSend {})?;
rx.recv().context(CallerRecv {})?
}
fn serialize<T: serde::Serialize>(from: &T) -> Result<String, Self::Error> {
serde_json::to_string(from).map_err(|e| Error::Serialize { from: e })
}
fn deserialize<'b, T>(from: &'b Self::Intermediate) -> Result<T, Self::Error>
where
T: for<'de> serde::Deserialize<'de>,
{
serde_json::from_str(&from).map_err(|e| Error::Deserialize { from: e })
}
}
use snafu::Snafu;
#[derive(Debug, Snafu)]
pub enum Error<B: core::fmt::Display> {
FromBackend { from: B },
UnknownProcedure,
MutexLock,
}
impl<B: snafu::Error> From<B> for Error<B> {
fn from(from: B) -> Self {
Error::FromBackend { from }
}
}
use interfaces::Backend;
use crate::interfaces;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use snafu::Snafu;
#[derive(Debug, Snafu)]
pub enum Error<B: core::fmt::Display> {
FromBackend { from: B },
ProcedureNotRegistered {},
}
impl<B: snafu::Error> From<B> for Error<B> {
fn from(from: B) -> Self {
Error::FromBackend { from }
}
}
pub struct Register<'a, B: Backend<'a>> {
#[allow(clippy::type_complexity)]
procedures: Arc<Mutex<HashMap<String, Box<dyn Fn(&crate::Call<&B::Intermediate>) -> Result<crate::Reply<B::Intermediate>, Error<B::Error>> + 'a>>>>,
#[allow(clippy::type_complexity)]
call: Option<Box<dyn Fn(&crate::Call<&B::Intermediate>) -> Result<crate::Reply<B::Intermediate>, B::Error> + 'a + Send>>,
}
unsafe impl<'a, T: Backend<'a>> Send for Register<'a, T> {}
pub struct RegisterInit {}
impl Default for RegisterInit {
fn default() -> Self {
RegisterInit {}
}
}
impl RegisterInit {
pub fn init<'a, B: Backend<'a>>(self) -> Register<'a, B> {
Register {
procedures: Arc::new(Mutex::new(HashMap::new())),
call: None,
}
}
}
impl<'a, B: Backend<'a>> Register<'a, B> {
pub fn register<P, C: for<'de> serde::Deserialize<'de>, R: serde::Serialize>(&self, name: &str, procedure: P) -> Result<(), Error<B::Error>>
where
P: Fn(C) -> R + 'a,
{
self.procedures.lock().unwrap().insert(
name.to_string(),
Box::new(move |call: &crate::Call<&B::Intermediate>| {
let reply = procedure(B::deserialize::<C>(call.payload)?);
Ok(crate::Reply { payload: B::serialize::<R>(&reply)? })
}),
);
Ok(())
}
pub fn call<C: serde::Serialize, R: for<'de> serde::Deserialize<'de>>(&self, procedure: &str, payload: &C) -> Result<R, Error<B::Error>> {
Ok(B::deserialize(
&self.call.as_ref().unwrap()(&crate::Call {
procedure: procedure.to_string(),
payload: &B::serialize(&payload)?,
})?
.payload,
)?)
}
}
impl<'a, B> interfaces::Frontend<'a, B> for Register<'a, B>
where
B: interfaces::Backend<'a>,
{
type Intermediate = String;
type Error = Error<B::Error>;
fn caller<T>(&mut self, caller: T) -> Result<(), Self::Error>
where
T: Fn(&crate::Call<&B::Intermediate>) -> Result<crate::Reply<B::Intermediate>, B::Error> + 'a + Send,
{
self.call = Some(Box::new(caller));
Ok(())
}
fn receive(&self, call: &crate::Call<&B::Intermediate>) -> Result<crate::Reply<B::Intermediate>, Error<B::Error>> {
self.procedures.lock().unwrap().get(&call.procedure).unwrap()(call)
}
}
# serde-transcode = { version = "1.2.0", git = "https://github.com/volllly/serde-transcode", default-features = false, features = ["alloc"] }
serde_json = { version = "1.0.57", optional = true }
# erased-serde = { version = "0.3.12", default-features = false, features = ["alloc"] }
hyper = { version = "0.14", optional = true }
tokio = { version = "1.0", optional = true }
use mer::*;
fn add(a: i32, b: i32) -> i32 {
a + b
}
#[test]
fn register_in_process() {
use std::sync::mpsc;
use std::sync::mpsc::{Receiver, Sender};
let register_caller = mer_frontend_register::RegisterInit {}.init();
let register_receiver = mer_frontend_register::RegisterInit {}.init();
register_caller.register("add", |(a, b)| add(a, b)).unwrap();
register_receiver.register("add", |(a, b)| add(a, b)).unwrap();
let (to, from): (Sender<mer_backend_in_process::InProcessChannel>, Receiver<mer_backend_in_process::InProcessChannel>) = mpsc::channel();
let mer_caller = MerInit {
backend: mer_backend_in_process::InProcessInit { to: to.into(), ..Default::default() }.init().unwrap(),
frontend: register_caller,
}
.init();
let mut mer_receiver = MerInit {
backend: mer_backend_in_process::InProcessInit {
from: from.into(),
..Default::default()
}
.init()
.unwrap(),
frontend: register_receiver,
}
.init();
mer_receiver.start().unwrap();
let (a, b) = (rand::random::<i32>() / 2, rand::random::<i32>() / 2);
let result: i32 = mer_caller.frontend(|f| f.call("add", &(a, b)).unwrap()).unwrap();
assert_eq!(result, a + b);
}
use mer::{
interfaces::{Backend, Frontend},
Call, Reply,
};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use snafu::Snafu;
#[derive(Debug, Snafu)]
pub enum Error<B: core::fmt::Display> {
FromBackend { from: B },
ProcedureNotRegistered {},
}
impl<B: snafu::Error> From<B> for Error<B> {
fn from(from: B) -> Self {
Error::FromBackend { from }
}
}
pub struct Register<'a, B: Backend<'a>> {
#[allow(clippy::type_complexity)]
procedures: Arc<Mutex<HashMap<String, Box<dyn Fn(&Call<&B::Intermediate>) -> Result<Reply<B::Intermediate>, Error<B::Error>> + 'a>>>>,
#[allow(clippy::type_complexity)]
call: Option<Box<dyn Fn(&Call<&B::Intermediate>) -> Result<Reply<B::Intermediate>, B::Error> + 'a + Send>>,
}
unsafe impl<'a, T: Backend<'a>> Send for Register<'a, T> {}
pub struct RegisterInit {}
impl Default for RegisterInit {
fn default() -> Self {
RegisterInit {}
}
}
impl RegisterInit {
pub fn init<'a, B: Backend<'a>>(self) -> Register<'a, B> {
Register {
procedures: Arc::new(Mutex::new(HashMap::new())),
call: None,
}
}
}
impl<'a, B: Backend<'a>> Register<'a, B> {
pub fn register<P, C: for<'de> serde::Deserialize<'de>, R: serde::Serialize>(&self, name: &str, procedure: P) -> Result<(), Error<B::Error>>
where
P: Fn(C) -> R + 'a,
{
self.procedures.lock().unwrap().insert(
name.to_string(),
Box::new(move |call: &Call<&B::Intermediate>| {
let reply = procedure(B::deserialize::<C>(call.payload)?);
Ok(Reply { payload: B::serialize::<R>(&reply)? })
}),
);
Ok(())
}
pub fn call<C: serde::Serialize, R: for<'de> serde::Deserialize<'de>>(&self, procedure: &str, payload: &C) -> Result<R, Error<B::Error>> {
Ok(B::deserialize(
&self.call.as_ref().unwrap()(&Call {
procedure: procedure.to_string(),
payload: &B::serialize(&payload)?,
})?
.payload,
)?)
}
}
impl<'a, B> Frontend<'a, B> for Register<'a, B>
where
B: Backend<'a>,
{
type Intermediate = String;
type Error = Error<B::Error>;
fn caller<T>(&mut self, caller: T) -> Result<(), Self::Error>
where
T: Fn(&Call<&B::Intermediate>) -> Result<Reply<B::Intermediate>, B::Error> + 'a + Send,
{
self.call = Some(Box::new(caller));
Ok(())
}
fn receive(&self, call: &Call<&B::Intermediate>) -> Result<Reply<B::Intermediate>, Error<B::Error>> {
self.procedures.lock().unwrap().get(&call.procedure).unwrap()(call)
}
}
[package]
name = "mer_frontend_register"
version = "0.1.0"
authors = ["Paul Volavsek <paul.volavsek@gmail.com>"]
edition = "2018"
[features]
[dependencies]
mer = { path = "../../mer" }
serde = "1.0.116"
snafu = { version = "0.6.9", default-features = false }
log = { version = "0.4", default-features = false }
[dev-dependencies]
mer_backend_in_process = { path = "../../backends/in-process" }
rand = "0.8"
flexi_logger = "0.16"
[[test]]
name = "test"
path = "test/tests.rs"
use mer_frontend_derive::*;
#[test]
fn test() {
#[frontend]
struct Data<T>
where
T: std::ops::Add<Output = T> + for<'de> serde::Deserialize<'de> + serde::Serialize + Copy + Send,
{
pub offset: T,
}
#[frontend(target = "Data")]
trait Service<T>
where
T: std::ops::Add<Output = T> + for<'de> serde::Deserialize<'de> + serde::Serialize + Copy + Send,
{
fn add(a: T, b: T) -> T::Output {
a + b
}
fn add_with_offset(&self, a: T, b: T) -> T::Output {
a + b + self.offset
}
}
}
use darling::FromMeta;
use proc_macro::TokenStream;
use quote::quote;
use syn::{parse_macro_input, AttributeArgs, ItemStruct, ItemTrait};
#[macro_use]
mod frontend;
fn to_compile_errors(errors: Vec<syn::Error>) -> proc_macro2::TokenStream {
let compile_errors = errors.iter().map(syn::Error::to_compile_error);
quote!(#(#compile_errors)*)
}
#[proc_macro_attribute]
pub fn frontend(args: TokenStream, input: TokenStream) -> TokenStream {
let args = parse_macro_input!(args as AttributeArgs);
let args_parsed = match frontend::AttrArgs::from_list(&args) {
Ok(v) => v,
Err(e) => {
return TokenStream::from(e.write_errors());
}
};
if args_parsed.target.is_some() {
let input = parse_macro_input!(input as ItemTrait);
frontend::expand_trait(&args_parsed, &input).unwrap_or_else(to_compile_errors).into()
} else {
let input = parse_macro_input!(input as ItemStruct);
frontend::expand_struct(&input).unwrap_or_else(to_compile_errors).into()
}
}
use darling::FromMeta;
use proc_macro2::TokenStream;
use quote::{format_ident, quote};
#[derive(Debug, FromMeta)]
pub struct AttrArgs {
#[darling(default)]
pub target: Option<syn::Path>,
}
pub fn expand_trait(args: &AttrArgs, input: &syn::ItemTrait) -> Result<TokenStream, Vec<syn::Error>> {
let trait_name = &input.ident;
let trait_generics = &input.generics;
let service_name = args.target.as_ref().unwrap();
let where_clause = &trait_generics.where_clause;
let items = &input.items;
let item_methods: Vec<syn::TraitItemMethod> = items
.iter()
.filter_map(|i| match i {
syn::TraitItem::Method(m) => {
let mut method = m.clone();
method.default.take();
Some(method)
}
_ => None,
})
.collect();
let mut impl_generic_def = trait_generics.clone();
impl_generic_def.params.insert(0, syn::parse_quote! { __B: mer::interfaces::Backend<'__a> });
impl_generic_def.params.insert(0, syn::parse_quote! { '__a });
let mut impl_generics = trait_generics.clone();
impl_generics.params.insert(0, syn::parse_quote! { __B });
impl_generics.params.insert(0, syn::parse_quote! { '__a });
let receiver_impl_items: Vec<TokenStream> = item_methods
.iter()
.map(|i| {
let item_name = format_ident!("{}", &i.sig.ident);
let mut has_self: bool = false;
let arguments: Vec<&Box<syn::Type>> = i
.sig
.inputs
.iter()
.filter_map(|a| match a {
syn::FnArg::Typed(t) => Some(&t.ty),
syn::FnArg::Receiver(_) => {
has_self = true;
None
}
})
.collect();
let index = (0..arguments.len()).map(syn::Index::from);
let deser = quote! {
let deser_payload = __B::deserialize::<(#( #arguments ),*)>(call.payload)?;
};
let reply = match has_self {
true => quote! {
let reply = <Self as #trait_name #impl_generics>::#item_name(self, #( deser_payload.#index),*);
},
false => quote! {
let reply = <Self as #trait_name #impl_generics>::#item_name(#( deser_payload.#index),*);
},
};
let ser = quote! {
let ser_reply = __B::serialize(&reply)?;
Ok(mer::Reply { payload: ser_reply })
};
quote! {
stringify!(#item_name) => {
log::debug!("frontend procedure receiving: {}", stringify!(#item_name));
#deser
#reply
#ser
}
}
})
.collect();
let caller_impl_items: Vec<TokenStream> = item_methods
.iter()
.map(|i| {
let item_name = &i.sig.ident;
let mut has_self = false;
let arguments: Vec<&syn::Ident> = i
.sig
.inputs
.iter()
.filter_map(|a| match a {
syn::FnArg::Typed(t) => {
if let syn::Pat::Ident(ident) = &*t.pat {
Some(&ident.ident)
} else {
None
}
}
syn::FnArg::Receiver(_) => {
has_self = true;
None
}
})
.collect();
let mut signature = i.sig.inputs.clone();
if !has_self {
signature.insert(0, syn::parse_quote! { &self })
}
let old_return_type: syn::Type = match &i.sig.output {
syn::ReturnType::Default => syn::parse_quote! { () },
syn::ReturnType::Type(_, t) => syn::parse_quote! { #t },
};
let return_type: syn::ReturnType = syn::parse_quote! { -> Result<#old_return_type, mer_frontend_derive::Error<__B::Error>> };
quote! {
pub fn #item_name(#signature) #return_type {
log::debug!("frontend procedure calling: {}", stringify!(#item_name));
let ser_payload = __B::serialize(&(#( #arguments ),*))?;
let reply = self.__call.as_ref().unwrap()(&mer::Call {
procedure: stringify!(#item_name).to_string(),
payload: &ser_payload,
})?
.payload;
let deser_reply = __B::deserialize::<#old_return_type>(&reply);
Ok(deser_reply?)
}
}
})
.collect();
Ok(quote! {
trait #trait_name #impl_generic_def #where_clause {
#( #item_methods )*
}
impl #impl_generic_def #trait_name #impl_generics for #service_name #impl_generics #where_clause {
#( #items )*
}
impl #impl_generic_def #service_name #impl_generics #where_clause {
fn __receive(&self, call: &mer::Call<&__B::Intermediate>) -> Result<mer::Reply<__B::Intermediate>, mer_frontend_derive::Error<__B::Error>> {
match call.procedure.as_str() {
#( #receiver_impl_items ),*
_ => Err(mer_frontend_derive::Error::UnknownProcedure {}),
}
}
}
impl #impl_generic_def #service_name #impl_generics #where_clause {
#( #caller_impl_items )*
}
})
}
pub fn expand_struct(input: &syn::ItemStruct) -> Result<TokenStream, Vec<syn::Error>> {
let struct_name = &input.ident;
let struct_name_init = format_ident!("{}Init", &input.ident);
let struct_generics = &input.generics;
let where_clause = &struct_generics.where_clause;
let fields = match &input.fields {
syn::Fields::Named(named) => {
let fields: Vec<&syn::Field> = named.named.iter().collect();
quote! { #( #fields ),* }
}
rest => quote! { #rest },
};
let field_names = match &input.fields {
syn::Fields::Named(named) => named.named.iter().filter_map(|f| f.ident.clone()).map(|f| quote! { #f: self.#f }).collect(),
_ => vec![],
};
let mut impl_generic_def = struct_generics.clone();
impl_generic_def.params.insert(0, syn::parse_quote! { __B: mer::interfaces::Backend<'__a> });
impl_generic_def.params.insert(0, syn::parse_quote! { '__a });
let mut impl_generics = struct_generics.clone();
impl_generics.params.insert(0, syn::parse_quote! { __B });
impl_generics.params.insert(0, syn::parse_quote! { '__a });
Ok(quote! {
struct #struct_name #impl_generic_def #where_clause {
#fields,
__call: Option<Box<dyn Fn(&mer::Call<&__B::Intermediate>) -> Result<mer::Reply<__B::Intermediate>, __B::Error> + '__a + Send>>
}
#[derive(Default)]
struct #struct_name_init #struct_generics #where_clause {
#fields,
}
impl #struct_generics #struct_name_init #struct_generics #where_clause {
pub fn init<'__a, __B: mer::interfaces::Backend<'__a>>(self) -> #struct_name #impl_generics {
#struct_name {
#( #field_names ),*,
__call: None
}
}
}
impl #impl_generic_def mer::interfaces::Frontend<'__a, __B> for #struct_name #impl_generics #where_clause {
type Intermediate = String;
type Error = mer_frontend_derive::Error<__B::Error>;
fn caller<__T>(&mut self, caller: __T) -> Result<(), mer_frontend_derive::Error<__B::Error>>
where
__T: Fn(&mer::Call<&__B::Intermediate>) -> Result<mer::Reply<__B::Intermediate>, __B::Error> + '__a + Send,
__T: 'static,
{
self.__call = Some(Box::new(caller));
Ok(())
}
fn receive(&self, call: &mer::Call<&__B::Intermediate>) -> Result<mer::Reply<__B::Intermediate>, mer_frontend_derive::Error<__B::Error>> {
log::debug!("receiving: Call {{ prodecure: {:?}, payload: ... }}", &call.procedure);
self.__receive(call).map_err(core::convert::Into::into)
}
}
})
}
[package]
name = "mer_frontend_derive_macros"
version = "0.1.0"
authors = ["Paul Volavsek <paul.volavsek@gmail.com>"]
edition = "2018"
[features]
default = ["std"]
std = [ "log/std", "serde/std" ]
[dependencies]
syn = "1.0"
quote = "1.0"
proc-macro2 = "1.0"
darling = "0.12"
log = { version = "0.4", default-features = false }
[dev-dependencies]
mer_frontend_derive = { path = "../core" }
mer = { path = "../../../mer" }
serde = { version = "1.0", default-features = false }
[lib]
proc-macro = true
[[test]]
name = "test"
path = "test/tests.rs"
use mer::*;
#[test]
fn derive_in_process() {
use std::sync::mpsc;
#[mer_frontend_derive::frontend()]
struct Data<T>
where
T: std::ops::Add<Output = T> + for<'de> serde::Deserialize<'de> + serde::Serialize + Copy + Send,
{
pub offset: T,
}
#[mer_frontend_derive::frontend(target = "Data")]
trait Receiver<T>
where
T: std::ops::Add<Output = T> + for<'de> serde::Deserialize<'de> + serde::Serialize + Copy + Send,
{
fn add(a: T, b: T) -> T::Output {
a + b
}
fn add_with_offset(&self, a: T, b: T) -> T::Output {
a + b + self.offset
}
}
let (to, from): (mpsc::Sender<mer_backend_in_process::InProcessChannel>, mpsc::Receiver<mer_backend_in_process::InProcessChannel>) = mpsc::channel();
let mer_caller = MerInit {
backend: mer_backend_in_process::InProcessInit { to: to.into(), ..Default::default() }.init().unwrap(),
frontend: DataInit::<i32> { offset: 32 }.init(),
}
.init();
let mut mer_register = MerInit {
backend: mer_backend_in_process::InProcessInit {
from: from.into(),
..Default::default()
}
.init()
.unwrap(),
frontend: DataInit::<i32> { offset: 32 }.init(),
}
.init();
mer_register.start().unwrap();
let (a, b) = (rand::random::<i32>() / 2, rand::random::<i32>() / 2);
assert_eq!(mer_caller.frontend(|f| { f.add(a, b).unwrap() }).unwrap(), a + b);
}
use snafu::Snafu;
pub use mer_frontend_derive_macros::frontend;
#[derive(Debug, Snafu)]
pub enum Error<B: core::fmt::Display> {
FromBackend { from: B },
UnknownProcedure,
MutexLock,
}
impl<B: snafu::Error> From<B> for Error<B> {
fn from(from: B) -> Self {
Error::FromBackend { from }
}
}
[package]
name = "mer_frontend_derive"
version = "0.1.0"
authors = ["Paul Volavsek <paul.volavsek@gmail.com>"]
edition = "2018"
[features]
default = ["std"]
std = [
"snafu/std",
"log/std"
]
[dependencies]
mer = { path = "../../../mer" }
mer_frontend_derive_macros = { path = "../macros" }
snafu = { version = "0.6.9", default-features = false }
log = { version = "0.4", default-features = false }
[dev-dependencies]
mer_backend_in_process = { path = "../../../backends/in-process" }
serde = "1.0"
rand = "0.8"
flexi_logger = "0.16"
[[test]]
name = "test"
path = "test/tests.rs"
use mer::*;
fn add(a: i32, b: i32) -> i32 {
a + b
}
#[test]
fn register_in_process() {
use std::sync::mpsc;
use std::sync::mpsc::{Receiver, Sender};
let register_caller = mer_frontend_register::RegisterInit {}.init();
let register_receiver = mer_frontend_register::RegisterInit {}.init();
register_caller.register("add", |(a, b)| add(a, b)).unwrap();
register_receiver.register("add", |(a, b)| add(a, b)).unwrap();
let (to, from): (Sender<mer_backend_in_process::InProcessChannel>, Receiver<mer_backend_in_process::InProcessChannel>) = mpsc::channel();
let mer_caller = MerInit {
backend: mer_backend_in_process::InProcessInit { to: to.into(), ..Default::default() }.init().unwrap(),
frontend: register_caller,
}
.init();
let mut mer_receiver = MerInit {
backend: mer_backend_in_process::InProcessInit {
from: from.into(),
..Default::default()
}
.init()
.unwrap(),
frontend: register_receiver,
}
.init();
mer_receiver.start().unwrap();
let (a, b) = (rand::random::<i32>() / 2, rand::random::<i32>() / 2);
let result: i32 = mer_caller.frontend(|f| f.call("add", &(a, b)).unwrap()).unwrap();
assert_eq!(result, a + b);
}
use mer::{interfaces::Backend, Call, Reply};
use snafu::{OptionExt, ResultExt, Snafu};
use std::sync::mpsc;
use std::sync::mpsc::{Receiver, Sender};
use std::sync::{Arc, Mutex};
use tokio::runtime::Runtime;
pub type InProcessChannel = (Call<String>, Sender<Result<Reply<String>, Error>>);
#[derive(Debug, Snafu)]
pub enum Error {
Serialize { from: serde_json::Error },
Deserialize { from: serde_json::Error },
GetCallLockInReceiver {},
NoReceiver {},
NoCallerChannel {},
NoReceiverChannel {},
CallerRecv { source: std::sync::mpsc::RecvError },
CallerSend { source: std::sync::mpsc::SendError<InProcessChannel> },
ReplyError { from: String },
RuntimeCreation { source: std::io::Error },
}
pub struct InProcess {
#[allow(clippy::type_complexity)]
receiver: Option<Arc<dyn Fn(Arc<Mutex<Call<&String>>>) -> Arc<tokio::sync::Mutex<Result<Reply<String>, Error>>> + Send + Sync>>,
runtime: Runtime,
to: Option<Sender<InProcessChannel>>,
from: Option<Arc<Mutex<Receiver<InProcessChannel>>>>,
}
pub struct InProcessInit {
pub to: Option<Sender<InProcessChannel>>,
pub from: Option<Receiver<InProcessChannel>>,
}
impl Default for InProcessInit {
fn default() -> Self {
InProcessInit { to: None, from: None }
}
}
impl InProcessInit {
pub fn init(self) -> Result<InProcess, Error> {
Ok(InProcess {
receiver: None,
to: self.to,
from: if let Some(from) = self.from { Some(Arc::new(Mutex::new(from))) } else { None },
runtime: Runtime::new().context(RuntimeCreation)?,
})
}
}
impl<'a> Backend<'a> for InProcess {
type Intermediate = String;
type Error = Error;
fn start(&mut self) -> Result<(), Self::Error> {
let from = self.from.as_ref().context(NoReceiverChannel {})?.clone();
let receiver = self.receiver.as_ref().context(NoReceiver {})?.clone();
self.runtime.spawn(async move {
loop {
let (call, tx) = from.lock().unwrap().recv().unwrap();
let reply_mutex = receiver(Arc::new(Mutex::new(Call {
procedure: call.procedure,
payload: &call.payload,
})));
let reply = &*reply_mutex.lock().await;
tx.send(match reply {
Ok(r) => Ok(Reply { payload: r.payload.clone() }),
Err(e) => Err(Error::ReplyError { from: format!("{:?}", e) }),
})
.unwrap();
}
});
Ok(())
}
fn stop(&mut self) -> Result<(), Self::Error> {
Ok(())
}
fn receiver<T>(&mut self, receiver: T) -> Result<(), Self::Error>
where
T: Fn(&Call<&Self::Intermediate>) -> Result<Reply<Self::Intermediate>, Self::Error> + Send + Sync + 'static,
{
self.receiver = Some(Arc::new(move |call: Arc<Mutex<Call<&String>>>| {
let call = match call.as_ref().lock() {
Ok(c) => c,
Err(_) => return Arc::new(tokio::sync::Mutex::new(Err(Error::GetCallLockInReceiver {}))),
};
match receiver(&*call) {
Ok(reply) => Arc::new(tokio::sync::Mutex::new(Ok(Reply { payload: reply.payload }))),
Err(e) => Arc::new(tokio::sync::Mutex::new(Err(e))),
}
}));
Ok(())
}
fn call(&mut self, call: &Call<&Self::Intermediate>) -> Result<Reply<Self::Intermediate>, Self::Error> {
#[allow(clippy::type_complexity)]
let (tx, rx): (Sender<Result<Reply<String>, Error>>, Receiver<Result<Reply<String>, Error>>) = mpsc::channel();
self
.to
.as_ref()
.context(NoCallerChannel {})?
.send((
Call {
procedure: call.procedure.clone(),
payload: call.payload.clone(),
},
tx,
))
.context(CallerSend {})?;
rx.recv().context(CallerRecv {})?
}
fn serialize<T: serde::Serialize>(from: &T) -> Result<String, Self::Error> {
serde_json::to_string(from).map_err(|e| Error::Serialize { from: e })
}
fn deserialize<'b, T>(from: &'b Self::Intermediate) -> Result<T, Self::Error>
where
T: for<'de> serde::Deserialize<'de>,
{
serde_json::from_str(&from).map_err(|e| Error::Deserialize { from: e })
}
}
[package]
name = "mer_backend_in_process"
version = "0.1.0"
authors = ["Paul Volavsek <paul.volavsek@gmail.com>"]
edition = "2018"
[features]
[dependencies]
mer = { path = "../../mer" }
serde = "1.0.116"
serde_json = "1.0.57"
tokio = { version = "1.0", features = ["rt", "rt-multi-thread"] }
snafu = "0.6.9"
log = "0.4"
[dev-dependencies]
mer_frontend_register = { path = "../../frontends/register" }
rand = "0.8"
flexi_logger = "0.16"
[[test]]
name = "test"
path = "test/tests.rs"
use mer::*;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
fn add(a: i32, b: i32) -> i32 {
a + b
}
#[test]
fn register_http() {
let register_caller = mer_frontend_register::RegisterInit {}.init();
let register_receiver = mer_frontend_register::RegisterInit {}.init();
register_caller.register("add", |(a, b)| add(a, b)).unwrap();
register_receiver.register("add", |(a, b)| add(a, b)).unwrap();
let mer_caller = MerInit {
backend: mer_backend_http::HttpInit {
speak: "http://localhost:8080".parse::<hyper::Uri>().unwrap().into(),
listen: None,
..Default::default()
}
.init()
.unwrap(),
frontend: register_caller,
}
.init();
let mut mer_receiver = MerInit {
backend: mer_backend_http::HttpInit {
speak: None,
listen: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080).into(),
..Default::default()
}
.init()
.unwrap(),
frontend: register_receiver,
}
.init();
mer_receiver.start().unwrap();
let (a, b) = (rand::random::<i32>() / 2, rand::random::<i32>() / 2);
let result: i32 = mer_caller.frontend(|f| f.call("add", &(a, b)).unwrap()).unwrap();
assert_eq!(result, a + b);
}
use mer::{interfaces::Backend, Call, Reply};
use snafu::{OptionExt, ResultExt, Snafu};
use hyper::{
client::{connect::dns::GaiResolver, HttpConnector},
Body, Client, Method, Request, StatusCode,
};
use hyper::http::Uri;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Response, Server};
use std::sync::{Arc, Mutex};
use std::{fmt::Debug, net::SocketAddr};
use tokio::runtime::Runtime;
use tokio::sync;
use log::{debug, trace};
#[derive(Debug, Snafu)]
pub enum Error {
Serialize { from: serde_json::Error },
Deserialize { from: serde_json::Error },
SpeakingDisabled,
RequestBuilder { source: hyper::http::Error },
ParseResponseBodyBytes { source: hyper::Error },
ParseResponseBody { source: std::string::FromUtf8Error },
ClientRequest { source: hyper::Error },
FailedRequest { status: StatusCode },
NoListen,
NoReceiver,
BindServer { source: hyper::Error },
NoProcedureHeader { source: hyper::http::Error },
GetCallLockInReceiver,
RuntimeCreation { source: std::io::Error },
}
pub struct Http {
client: Client<HttpConnector<GaiResolver>, Body>,
speak: Option<Uri>,
listen: Option<SocketAddr>,
#[allow(clippy::type_complexity)]
receiver: Option<Arc<dyn Fn(Arc<Mutex<Call<&String>>>) -> Arc<sync::Mutex<Result<Reply<String>, Error>>> + Send + Sync>>,
runtime: Runtime,
}
impl Debug for Http {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
f.debug_struct("Http")
.field("client", &self.client)
.field("speak", &self.speak)
.field("listen", &self.listen)
.field("runtime", &self.runtime)
.finish()
}
}
pub struct HttpInit {
pub client: Client<HttpConnector<GaiResolver>, Body>,
pub speak: Option<Uri>,
pub listen: Option<SocketAddr>,
}
impl Default for HttpInit {
fn default() -> Self {
HttpInit {
client: Client::new(),
speak: None,
listen: None,
}
}
}
impl From<HttpInit> for Result<Http, Error> {
fn from(from: HttpInit) -> Self {
from.init()
}
}
impl HttpInit {
pub fn init(self) -> Result<Http, Error> {
trace!("HttpInit.init()");
let http = Http {
client: self.client,
speak: self.speak,
listen: self.listen,
receiver: None,
runtime: Runtime::new().context(RuntimeCreation {})?,
};
debug!("{:?}", &http);
Ok(http)
}
}
impl<'a> Backend<'a> for Http {
type Intermediate = String;
type Error = Error;
fn start(&mut self) -> Result<(), Self::Error> {
trace!("Http.start()");
let listen = self.listen.context(NoListen)?;
let receiver = Arc::clone(self.receiver.as_ref().context(NoReceiver)?);
self.runtime.spawn(async move {
trace!("Http.runtime.spawn()");
let receiver = receiver.clone();
Server::bind(&listen)
.serve(make_service_fn(move |_| {
trace!("Http.runtime.spawn.serve()");
let receiver = receiver.clone();
async move {
Ok::<_, hyper::Error>(service_fn(move |request: Request<Body>| {
trace!("Http.runtime.spawn.serve.service_fn()");
debug!("{:?}", &listen);
let receiver = receiver.clone();
async move {
let procedure = if let Some(procedure) = request.headers().get("Procedure") {
match procedure.to_str() {
Ok(procedure) => procedure.to_owned(),
Err(e) => return Response::builder().status(StatusCode::BAD_REQUEST).body(Body::from(e.to_string())),
}
} else {
return Response::builder().status(StatusCode::BAD_REQUEST).body(Body::from("No Procedure provided"));
};
let body_bytes = match hyper::body::to_bytes(request.into_body()).await {
Ok(body_bytes) => body_bytes,
Err(e) => return Response::builder().status(StatusCode::BAD_REQUEST).body(Body::from(e.to_string())),
};
let body = match String::from_utf8(body_bytes.to_vec()) {
Ok(body) => body,
Err(e) => return Response::builder().status(StatusCode::BAD_REQUEST).body(Body::from(e.to_string())),
};
debug!("call Call {{ procedure: {:?}, payload: {:?} }}", &procedure, &body);
let reply_mutex = receiver(Arc::new(Mutex::new(crate::Call { procedure, payload: &body })));
let reply = &*reply_mutex.lock().await;
match reply {
Err(e) => Response::builder().status(StatusCode::BAD_REQUEST).body(Body::from(format!("{:?}", e))),
Ok(reply) => {
debug!("reply Reply {{ payload: {:?} }}", &reply.payload);
Response::builder().status(StatusCode::OK).body(Body::from(reply.payload.to_owned()))
}
}
}
}))
}
}))
.await
.unwrap();
});
Ok(())
}
fn stop(&mut self) -> Result<(), Self::Error> {
trace!("Http.stop()");
Ok(())
}
fn receiver<T>(&mut self, receiver: T) -> Result<(), Self::Error>
where
T: Fn(&Call<&Self::Intermediate>) -> Result<Reply<Self::Intermediate>, Self::Error> + Send + Sync + 'static,
{
trace!("Http.receiver()");
self.receiver = Some(Arc::new(move |call: Arc<Mutex<Call<&String>>>| {
trace!("(Http.receiver)()");
let call = match call.as_ref().lock() {
Ok(c) => c,
Err(_) => return Arc::new(sync::Mutex::new(Err(Error::GetCallLockInReceiver))),
};
debug!("calling receiver");
match receiver(&*call) {
Ok(reply) => Arc::new(sync::Mutex::new(Ok(Reply { payload: reply.payload }))),
Err(e) => Arc::new(sync::Mutex::new(Err(e))),
}
}));
Ok(())
}
fn call(&mut self, call: &Call<&Self::Intermediate>) -> Result<Reply<Self::Intermediate>, Self::Error> {
trace!("Http.call()");
debug!("{:?}", &self.speak);
match &self.speak {
None => Err(Error::SpeakingDisabled),
Some(uri) => self.runtime.block_on(async {
let request = Request::builder()
.method(Method::POST)
.uri(uri)
.header("Procedure", &call.procedure)
.body(Body::from(call.payload.clone()))
.context(RequestBuilder)?;
debug!("request {:?}", &request);
let response = self.client.request(request).await.context(ClientRequest)?;
debug!("response {:?}", &response);
let status = response.status();
let body_bytes = hyper::body::to_bytes(response.into_body()).await.context(ParseResponseBodyBytes)?;
let body = String::from_utf8(body_bytes.to_vec()).context(ParseResponseBody)?;
match status {
StatusCode::OK => Ok(Reply { payload: body }),
_ => Err(Error::FailedRequest { status }),
}
}),
}
}
fn serialize<T: serde::Serialize>(from: &T) -> Result<String, Self::Error> {
trace!("Http.serialize()");
serde_json::to_string(from).map_err(|e| Error::Serialize { from: e })
}
fn deserialize<'b, T>(from: &'b Self::Intermediate) -> Result<T, Self::Error>
where
T: for<'de> serde::Deserialize<'de>,
{
trace!("Http.deserialize()");
serde_json::from_str(&from).map_err(|e| Error::Deserialize { from: e })
}
}
[package]
name = "mer_backend_http"
version = "0.1.0"
authors = ["Paul Volavsek <paul.volavsek@gmail.com>"]
edition = "2018"
[features]
[dependencies]
mer = { path = "../../mer" }
serde = "1.0.116"
serde_json = "1.0.57"
hyper = { version = "0.14", features = ["http2", "client", "server", "tcp"] }
tokio = { version = "1.0", features = ["rt", "rt-multi-thread"] }
snafu = "0.6.9"
log = "0.4"
[dev-dependencies]
mer_frontend_register = { path = "../../frontends/register" }
rand = "0.8"
flexi_logger = "0.16"
[[test]]
name = "test"
path = "test/tests.rs"