MWPA6DALYV4N4EMHBFH5DJ67GMW2R7KOACNCECDD7TDOGZAHKZ4AC
use mer_derive::*;
#[test]
fn test() {
#[frontend]
struct Data<T>
where
T: std::ops::Add<Output = T> + for<'de> serde::Deserialize<'de> + serde::Serialize + Copy + Send,
{
pub offset: T,
}
#[frontend(target = "Data")]
trait Service<T>
where
T: std::ops::Add<Output = T> + for<'de> serde::Deserialize<'de> + serde::Serialize + Copy + Send,
{
fn add(a: T, b: T) -> T::Output {
a + b
}
fn add_with_offset(&self, a: T, b: T) -> T::Output {
a + b + self.offset
}
}
}
use darling::FromMeta;
use proc_macro::TokenStream;
use quote::quote;
use syn::{parse_macro_input, AttributeArgs, ItemStruct, ItemTrait};
#[macro_use]
mod frontend;
fn to_compile_errors(errors: Vec<syn::Error>) -> proc_macro2::TokenStream {
let compile_errors = errors.iter().map(syn::Error::to_compile_error);
quote!(#(#compile_errors)*)
}
#[proc_macro_attribute]
pub fn frontend(args: TokenStream, input: TokenStream) -> TokenStream {
let args = parse_macro_input!(args as AttributeArgs);
let args_parsed = match frontend::AttrArgs::from_list(&args) {
Ok(v) => v,
Err(e) => {
return TokenStream::from(e.write_errors());
}
};
if args_parsed.target.is_some() {
let input = parse_macro_input!(input as ItemTrait);
frontend::expand_trait(&args_parsed, &input).unwrap_or_else(to_compile_errors).into()
} else {
let input = parse_macro_input!(input as ItemStruct);
frontend::expand_struct(&input).unwrap_or_else(to_compile_errors).into()
}
}
use darling::FromMeta;
use proc_macro2::TokenStream;
use quote::{format_ident, quote};
#[derive(Debug, FromMeta)]
pub struct AttrArgs {
#[darling(default)]
pub target: Option<syn::Path>,
}
pub fn expand_trait(args: &AttrArgs, input: &syn::ItemTrait) -> Result<TokenStream, Vec<syn::Error>> {
let trait_name = &input.ident;
let trait_generics = &input.generics;
let service_name = args.target.as_ref().unwrap();
let where_clause = &trait_generics.where_clause;
let items = &input.items;
let item_methods: Vec<syn::TraitItemMethod> = items
.iter()
.filter_map(|i| match i {
syn::TraitItem::Method(m) => {
let mut method = m.clone();
method.default.take();
Some(method)
}
_ => None,
})
.collect();
let mut impl_generic_def = trait_generics.clone();
impl_generic_def.params.insert(0, syn::parse_quote! { __B: mer::interfaces::Backend<'__a> });
impl_generic_def.params.insert(0, syn::parse_quote! { '__a });
let mut impl_generics = trait_generics.clone();
impl_generics.params.insert(0, syn::parse_quote! { __B });
impl_generics.params.insert(0, syn::parse_quote! { '__a });
let receiver_impl_items: Vec<TokenStream> = item_methods
.iter()
.map(|i| {
let item_name = format_ident!("{}", &i.sig.ident);
let mut has_self: bool = false;
let arguments: Vec<&Box<syn::Type>> = i
.sig
.inputs
.iter()
.filter_map(|a| match a {
syn::FnArg::Typed(t) => Some(&t.ty),
syn::FnArg::Receiver(_) => {
has_self = true;
None
}
})
.collect();
let index = (0..arguments.len()).map(syn::Index::from);
let deser = quote! {
let deser_payload = __B::deserialize::<(#( #arguments ),*)>(call.payload)?;
};
let reply = match has_self {
true => quote! {
let reply = <Self as #trait_name #impl_generics>::#item_name(self, #( deser_payload.#index),*);
},
false => quote! {
let reply = <Self as #trait_name #impl_generics>::#item_name(#( deser_payload.#index),*);
},
};
let ser = quote! {
let ser_reply = __B::serialize(&reply)?;
Ok(mer::Reply { payload: ser_reply })
};
quote! {
stringify!(#item_name) => {
log::debug!("frontend procedure receiving: {}", stringify!(#item_name));
#deser
#reply
#ser
}
}
})
.collect();
let caller_impl_items: Vec<TokenStream> = item_methods
.iter()
.map(|i| {
let item_name = &i.sig.ident;
let mut has_self = false;
let arguments: Vec<&syn::Ident> = i
.sig
.inputs
.iter()
.filter_map(|a| match a {
syn::FnArg::Typed(t) => {
if let syn::Pat::Ident(ident) = &*t.pat {
Some(&ident.ident)
} else {
None
}
}
syn::FnArg::Receiver(_) => {
has_self = true;
None
}
})
.collect();
let mut signature = i.sig.inputs.clone();
if !has_self {
signature.insert(0, syn::parse_quote! { &self })
}
let old_return_type: syn::Type = match &i.sig.output {
syn::ReturnType::Default => syn::parse_quote! { () },
syn::ReturnType::Type(_, t) => syn::parse_quote! { #t },
};
let return_type: syn::ReturnType = syn::parse_quote! { -> Result<#old_return_type, mer::frontends::derive::Error<__B::Error>> };
quote! {
pub fn #item_name(#signature) #return_type {
log::debug!("frontend procedure calling: {}", stringify!(#item_name));
let ser_payload = __B::serialize(&(#( #arguments ),*))?;
let reply = self.__call.as_ref().unwrap()(&mer::Call {
procedure: stringify!(#item_name).to_string(),
payload: &ser_payload,
})?
.payload;
let deser_reply = __B::deserialize::<#old_return_type>(&reply);
Ok(deser_reply?)
}
}
})
.collect();
Ok(quote! {
trait #trait_name #impl_generic_def #where_clause {
#( #item_methods )*
}
impl #impl_generic_def #trait_name #impl_generics for #service_name #impl_generics #where_clause {
#( #items )*
}
impl #impl_generic_def #service_name #impl_generics #where_clause {
fn __receive(&self, call: &mer::Call<&__B::Intermediate>) -> Result<mer::Reply<__B::Intermediate>, mer::frontends::derive::Error<__B::Error>> {
match call.procedure.as_str() {
#( #receiver_impl_items ),*
_ => Err(mer::frontends::derive::Error::UnknownProcedure {}),
}
}
}
impl #impl_generic_def #service_name #impl_generics #where_clause {
#( #caller_impl_items )*
}
})
}
pub fn expand_struct(input: &syn::ItemStruct) -> Result<TokenStream, Vec<syn::Error>> {
let struct_name = &input.ident;
let struct_name_init = format_ident!("{}Init", &input.ident);
let struct_generics = &input.generics;
let where_clause = &struct_generics.where_clause;
let fields = match &input.fields {
syn::Fields::Named(named) => {
let fields: Vec<&syn::Field> = named.named.iter().collect();
quote! { #( #fields ),* }
}
rest => quote! { #rest },
};
let field_names = match &input.fields {
syn::Fields::Named(named) => named.named.iter().filter_map(|f| f.ident.clone()).map(|f| quote! { #f: self.#f }).collect(),
_ => vec![],
};
let mut impl_generic_def = struct_generics.clone();
impl_generic_def.params.insert(0, syn::parse_quote! { __B: mer::interfaces::Backend<'__a> });
impl_generic_def.params.insert(0, syn::parse_quote! { '__a });
let mut impl_generics = struct_generics.clone();
impl_generics.params.insert(0, syn::parse_quote! { __B });
impl_generics.params.insert(0, syn::parse_quote! { '__a });
Ok(quote! {
struct #struct_name #impl_generic_def #where_clause {
#fields,
__call: Option<Box<dyn Fn(&mer::Call<&__B::Intermediate>) -> Result<mer::Reply<__B::Intermediate>, __B::Error> + '__a + Send>>
}
#[derive(Default)]
struct #struct_name_init #struct_generics #where_clause {
#fields,
}
impl #struct_generics #struct_name_init #struct_generics #where_clause {
pub fn init<'__a, __B: mer::interfaces::Backend<'__a>>(self) -> #struct_name #impl_generics {
#struct_name {
#( #field_names ),*,
__call: None
}
}
}
impl #impl_generic_def mer::interfaces::Frontend<'__a, __B> for #struct_name #impl_generics #where_clause {
type Intermediate = String;
type Error = mer::frontends::derive::Error<__B::Error>;
fn caller<__T>(&mut self, caller: __T) -> Result<(), mer::frontends::derive::Error<__B::Error>>
where
__T: Fn(&mer::Call<&__B::Intermediate>) -> Result<mer::Reply<__B::Intermediate>, __B::Error> + '__a + Send,
__T: 'static,
{
self.__call = Some(Box::new(caller));
Ok(())
}
fn receive(&self, call: &mer::Call<&__B::Intermediate>) -> Result<mer::Reply<__B::Intermediate>, mer::frontends::derive::Error<__B::Error>> {
log::debug!("receiving: Call {{ prodecure: {:?}, payload: ... }}", &call.procedure);
self.__receive(call).map_err(core::convert::Into::into)
}
}
})
}
[package]
name = "mer_derive"
version = "0.1.0"
authors = ["Paul Volavsek <paul.volavsek@gmail.com>"]
edition = "2018"
[features]
default = ["std"]
std = []
[dependencies]
syn = "1.0"
quote = "1.0"
proc-macro2 = "1.0"
darling = "0.12"
log = { version = "0.4", default-features = false }
[dev-dependencies]
mer = { path = "../mer" }
serde = "1.0"
[lib]
proc-macro = true
[[test]]
name = "test"
path = "test/tests.rs"
use backends::InProcessChannel;
use mer::*;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
fn add(a: i32, b: i32) -> i32 {
a + b
}
#[test]
fn register_http() {
let register_caller = frontends::RegisterInit {}.init();
let register_receiver = frontends::RegisterInit {}.init();
register_caller.register("add", |(a, b)| add(a, b)).unwrap();
register_receiver.register("add", |(a, b)| add(a, b)).unwrap();
let mer_caller = MerInit {
backend: backends::HttpInit {
speak: "http://localhost:8080".parse::<hyper::Uri>().unwrap().into(),
listen: None,
..Default::default()
}
.init()
.unwrap(),
frontend: register_caller,
}
.init();
let mut mer_receiver = MerInit {
backend: backends::HttpInit {
speak: None,
listen: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080).into(),
..Default::default()
}
.init()
.unwrap(),
frontend: register_receiver,
}
.init();
mer_receiver.start().unwrap();
let (a, b) = (rand::random::<i32>() / 2, rand::random::<i32>() / 2);
let result: i32 = mer_caller.frontend(|f| f.call("add", &(a, b)).unwrap()).unwrap();
assert_eq!(result, a + b);
}
#[test]
fn register_in_process() {
use std::sync::mpsc;
use std::sync::mpsc::{Receiver, Sender};
let register_caller = frontends::RegisterInit {}.init();
let register_receiver = frontends::RegisterInit {}.init();
register_caller.register("add", |(a, b)| add(a, b)).unwrap();
register_receiver.register("add", |(a, b)| add(a, b)).unwrap();
let (to, from): (Sender<InProcessChannel>, Receiver<InProcessChannel>) = mpsc::channel();
let mer_caller = MerInit {
backend: backends::InProcessInit { to: to.into(), ..Default::default() }.init().unwrap(),
frontend: register_caller,
}
.init();
let mut mer_receiver = MerInit {
backend: backends::InProcessInit {
from: from.into(),
..Default::default()
}
.init()
.unwrap(),
frontend: register_receiver,
}
.init();
mer_receiver.start().unwrap();
let (a, b) = (rand::random::<i32>() / 2, rand::random::<i32>() / 2);
let result: i32 = mer_caller.frontend(|f| f.call("add", &(a, b)).unwrap()).unwrap();
assert_eq!(result, a + b);
}
#[test]
fn derive_http() {
#[mer_derive::frontend()]
struct Data<T>
where
T: std::ops::Add<Output = T> + for<'de> serde::Deserialize<'de> + serde::Serialize + Copy + Send,
{
pub offset: T,
}
#[mer_derive::frontend(target = "Data")]
trait Receiver<T>
where
T: std::ops::Add<Output = T> + for<'de> serde::Deserialize<'de> + serde::Serialize + Copy + Send,
{
fn add(a: T, b: T) -> T::Output {
a + b
}
fn add_with_offset(&self, a: T, b: T) -> T::Output {
a + b + self.offset
}
}
let mer_call = MerInit {
backend: backends::HttpInit {
speak: "http://localhost:8081".parse::<hyper::Uri>().unwrap().into(),
listen: None,
..Default::default()
}
.init()
.unwrap(),
frontend: DataInit::<i32> { offset: 32 }.init(),
}
.init();
let mut mer_receive = MerInit {
backend: backends::HttpInit {
speak: None,
listen: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8081).into(),
..Default::default()
}
.init()
.unwrap(),
frontend: DataInit::<i32> { offset: 32 }.init(),
}
.init();
mer_receive.start().unwrap();
let (a, b) = (rand::random::<i32>() / 2, rand::random::<i32>() / 2);
assert_eq!(
mer_call
.frontend(|f| {
println!("start");
let tmp = f.add(a, b).unwrap();
tmp
})
.unwrap(),
a + b
);
}
#[test]
fn derive_in_process() {
use std::sync::mpsc;
#[mer_derive::frontend()]
struct Data<T>
where
T: std::ops::Add<Output = T> + for<'de> serde::Deserialize<'de> + serde::Serialize + Copy + Send,
{
pub offset: T,
}
#[mer_derive::frontend(target = "Data")]
trait Receiver<T>
where
T: std::ops::Add<Output = T> + for<'de> serde::Deserialize<'de> + serde::Serialize + Copy + Send,
{
fn add(a: T, b: T) -> T::Output {
a + b
}
fn add_with_offset(&self, a: T, b: T) -> T::Output {
a + b + self.offset
}
}
let (to, from): (mpsc::Sender<InProcessChannel>, mpsc::Receiver<InProcessChannel>) = mpsc::channel();
let mer_caller = MerInit {
backend: backends::InProcessInit { to: to.into(), ..Default::default() }.init().unwrap(),
frontend: DataInit::<i32> { offset: 32 }.init(),
}
.init();
let mut mer_register = MerInit {
backend: backends::InProcessInit {
from: from.into(),
..Default::default()
}
.init()
.unwrap(),
frontend: DataInit::<i32> { offset: 32 }.init(),
}
.init();
mer_register.start().unwrap();
let (a, b) = (rand::random::<i32>() / 2, rand::random::<i32>() / 2);
assert_eq!(mer_caller.frontend(|f| { f.add(a, b).unwrap() }).unwrap(), a + b);
}
#![cfg_attr(not(feature = "std"), no_std)]
extern crate alloc;
#[macro_use]
pub mod helpers;
#[cfg(feature = "backends")]
pub mod backends;
#[cfg(feature = "frontends")]
pub mod frontends;
pub mod interfaces;
use core::marker::PhantomData;
use snafu::Snafu;
use log::trace;
#[derive(Debug, Snafu)]
pub enum Error {
Lock {},
}
pub struct Call<T> {
pub procedure: String,
pub payload: T,
}
unsafe impl<T> Send for Call<T> where T: Send {}
pub struct Reply<T> {
pub payload: T,
}
unsafe impl<T> Send for Reply<T> where T: Send {}
pub struct Mer<'a, B, F>
where
B: interfaces::Backend<'a>,
B: 'a,
F: interfaces::Frontend<'a, B>,
F: 'a,
{
_phantom: PhantomData<&'a B>,
backend: smart_lock_type!(B),
frontend: smart_lock_type!(F),
}
pub struct MerInit<B, F> {
pub backend: B,
pub frontend: F,
}
impl<'a, B, F> MerInit<B, F>
where
B: interfaces::Backend<'a> + 'static,
F: interfaces::Frontend<'a, B> + 'static,
{
pub fn init(self) -> Mer<'a, B, F> {
trace!("MerInit.init()");
let backend = smart_lock!(self.backend);
let frontend = smart_lock!(self.frontend);
let frontend_receiver = clone_lock!(frontend);
let backend_caller = clone_lock!(backend);
access_mut!(backend)
.unwrap()
.receiver(move |call: &Call<&B::Intermediate>| {
trace!("Mer.backend.receiver()");
Ok(access!(frontend_receiver).unwrap().receive(call).unwrap())
}) //TODO: fix error
.unwrap();
access_mut!(frontend)
.unwrap()
.caller(move |call: &Call<&B::Intermediate>| {
trace!("Mer.frontend.caller()");
access!(backend_caller).unwrap().call(call)
})
.unwrap();
Mer {
_phantom: PhantomData,
backend: clone_lock!(backend),
frontend: clone_lock!(frontend),
}
}
}
impl<'a, B: interfaces::Backend<'a>, F: interfaces::Frontend<'a, B>> Mer<'a, B, F> {
pub fn start(&mut self) -> Result<(), B::Error> {
trace!("MerInit.start()");
access!(self.backend).unwrap().start()
}
pub fn stop(&mut self) -> Result<(), B::Error> {
trace!("MerInit.stop()");
access!(self.backend).unwrap().stop()
}
pub fn frontend<T, R>(&self, access: T) -> Result<R, Error>
where
T: Fn(&F) -> R,
{
trace!("MerInit.frontend()");
Ok(access(&*match access!(self.frontend) {
Ok(frontend) => frontend,
Err(_) => return Err(Error::Lock {}),
}))
}
pub fn backend<T, R>(&self, access: T) -> Result<R, Error>
where
T: Fn(&B) -> R,
{
trace!("MerInit.backend()");
Ok(access(&*match access!(self.backend) {
Ok(backend) => backend,
Err(_) => return Err(Error::Lock {}),
}))
}
}
pub mod backend;
pub use backend::Backend;
pub mod frontend;
pub use frontend::Frontend;
use crate::interfaces::backend;
pub trait Frontend<'a, B>: Send
where
B: backend::Backend<'a>,
{
type Intermediate: serde::Serialize + serde::Deserialize<'a>;
type Error: snafu::Error + core::fmt::Debug;
fn caller<T>(&mut self, caller: T) -> Result<(), Self::Error>
where
T: Fn(&crate::Call<&B::Intermediate>) -> Result<crate::Reply<B::Intermediate>, B::Error> + Send + Sync + 'static;
fn receive(&self, call: &crate::Call<&B::Intermediate>) -> Result<crate::Reply<B::Intermediate>, Self::Error>;
}
pub trait Backend<'a>: Send {
type Intermediate: serde::Serialize + serde::Deserialize<'a>;
type Error: snafu::Error + core::fmt::Debug;
fn start(&mut self) -> Result<(), Self::Error>;
fn stop(&mut self) -> Result<(), Self::Error>;
fn receiver<T>(&mut self, receiver: T) -> Result<(), Self::Error>
where
T: Fn(&crate::Call<&Self::Intermediate>) -> Result<crate::Reply<Self::Intermediate>, Self::Error> + Send + Sync + 'static;
fn call(&mut self, call: &crate::Call<&Self::Intermediate>) -> Result<crate::Reply<Self::Intermediate>, Self::Error>;
fn serialize<T: serde::Serialize>(from: &T) -> Result<Self::Intermediate, Self::Error>;
fn deserialize<'b, T>(from: &'b Self::Intermediate) -> Result<T, Self::Error>
where
T: for<'de> serde::Deserialize<'de>;
}
#[cfg(feature = "threadsafe")]
#[allow(unused_macros)]
macro_rules! smart_pointer_type {
($x:ty) => {
alloc::sync::Arc<$x>
}
}
#[cfg(not(feature = "threadsafe"))]
macro_rules! smart_pointer_type {
($x:ty) => {
alloc::rc::Rc<$x>
}
}
#[cfg(feature = "threadsafe")]
#[allow(unused_macros)]
macro_rules! smart_pointer {
($x:expr) => {
alloc::sync::Arc::new($x)
};
}
#[cfg(not(feature = "threadsafe"))]
macro_rules! smart_pointer {
($x:expr) => {
alloc::rc::Rc::new($x)
};
}
#[allow(unused_macros)]
macro_rules! clone_pointer {
($x:expr) => {
$x.clone()
};
}
#[cfg(all(feature = "std", feature = "threadsafe"))]
macro_rules! smart_lock_type {
($x:ty) => {
alloc::sync::Arc<std::sync::Mutex<$x>>
}
}
#[cfg(all(not(feature = "std"), feature = "threadsafe"))]
macro_rules! smart_lock_type {
($x:ty) => {
alloc::sync::Arc<spin::Mutex<$x>>
}
}
#[cfg(not(feature = "threadsafe"))]
macro_rules! smart_lock_type {
($x:ty) => {
alloc::rc::Rc<core::cell::RefCell<$x>>
}
}
#[cfg(all(feature = "std", feature = "threadsafe"))]
macro_rules! smart_lock {
($x:expr) => {
alloc::sync::Arc::new(std::sync::Mutex::new($x))
};
}
#[cfg(all(not(feature = "std"), feature = "threadsafe"))]
#[cfg(feature = "threadsafe")]
macro_rules! smart_lock {
($x:expr) => {
alloc::sync::Arc::new(spin::Mutex::new($x))
};
}
#[cfg(not(feature = "threadsafe"))]
macro_rules! smart_lock {
($x:expr) => {
alloc::rc::Rc::new(core::cell::RefCell::new($x))
};
}
macro_rules! clone_lock {
($x:expr) => {
$x.clone()
};
}
#[cfg(all(feature = "threadsafe", feature = "std"))]
macro_rules! access {
($x:expr) => {
$x.lock()
};
}
#[cfg(all(feature = "threadsafe", not(feature = "std")))]
macro_rules! access {
($x:expr) => {
Ok(*$x.lock())
};
}
#[cfg(feature = "threadsafe")]
macro_rules! access_mut {
($x:expr) => {
access!($x)
};
}
#[cfg(not(feature = "threadsafe"))]
macro_rules! access {
($x:expr) => {
$x.borrow()
};
}
#[cfg(not(feature = "threadsafe"))]
macro_rules! access_mut {
($x:expr) => {
$x.borrow_mut()
};
}
#[macro_use]
pub mod smart_lock;
#[macro_use]
pub mod smart_pointer;
use interfaces::Backend;
use crate::interfaces;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use snafu::Snafu;
#[derive(Debug, Snafu)]
pub enum Error<B: core::fmt::Display> {
FromBackend { from: B },
ProcedureNotRegistered {},
}
impl<B: snafu::Error> From<B> for Error<B> {
fn from(from: B) -> Self {
Error::FromBackend { from }
}
}
pub struct Register<'a, B: Backend<'a>> {
#[allow(clippy::type_complexity)]
procedures: Arc<Mutex<HashMap<String, Box<dyn Fn(&crate::Call<&B::Intermediate>) -> Result<crate::Reply<B::Intermediate>, Error<B::Error>> + 'a>>>>,
#[allow(clippy::type_complexity)]
call: Option<Box<dyn Fn(&crate::Call<&B::Intermediate>) -> Result<crate::Reply<B::Intermediate>, B::Error> + 'a + Send>>,
}
unsafe impl<'a, T: Backend<'a>> Send for Register<'a, T> {}
pub struct RegisterInit {}
impl Default for RegisterInit {
fn default() -> Self {
RegisterInit {}
}
}
impl RegisterInit {
pub fn init<'a, B: Backend<'a>>(self) -> Register<'a, B> {
Register {
procedures: Arc::new(Mutex::new(HashMap::new())),
call: None,
}
}
}
impl<'a, B: Backend<'a>> Register<'a, B> {
pub fn register<P, C: for<'de> serde::Deserialize<'de>, R: serde::Serialize>(&self, name: &str, procedure: P) -> Result<(), Error<B::Error>>
where
P: Fn(C) -> R + 'a,
{
self.procedures.lock().unwrap().insert(
name.to_string(),
Box::new(move |call: &crate::Call<&B::Intermediate>| {
let reply = procedure(B::deserialize::<C>(call.payload)?);
Ok(crate::Reply { payload: B::serialize::<R>(&reply)? })
}),
);
Ok(())
}
pub fn call<C: serde::Serialize, R: for<'de> serde::Deserialize<'de>>(&self, procedure: &str, payload: &C) -> Result<R, Error<B::Error>> {
Ok(B::deserialize(
&self.call.as_ref().unwrap()(&crate::Call {
procedure: procedure.to_string(),
payload: &B::serialize(&payload)?,
})?
.payload,
)?)
}
}
impl<'a, B> interfaces::Frontend<'a, B> for Register<'a, B>
where
B: interfaces::Backend<'a>,
{
type Intermediate = String;
type Error = Error<B::Error>;
fn caller<T>(&mut self, caller: T) -> Result<(), Self::Error>
where
T: Fn(&crate::Call<&B::Intermediate>) -> Result<crate::Reply<B::Intermediate>, B::Error> + 'a + Send,
{
self.call = Some(Box::new(caller));
Ok(())
}
fn receive(&self, call: &crate::Call<&B::Intermediate>) -> Result<crate::Reply<B::Intermediate>, Error<B::Error>> {
self.procedures.lock().unwrap().get(&call.procedure).unwrap()(call)
}
}
pub mod register;
pub use register::{Register, RegisterInit};
pub mod derive;
use snafu::Snafu;
#[derive(Debug, Snafu)]
pub enum Error<B: core::fmt::Display> {
FromBackend { from: B },
UnknownProcedure,
MutexLock,
}
impl<B: snafu::Error> From<B> for Error<B> {
fn from(from: B) -> Self {
Error::FromBackend { from }
}
}
pub mod http;
pub use http::{Http, HttpInit};
pub mod in_process;
pub use in_process::{InProcess, InProcessChannel, InProcessInit};
use crate::interfaces;
use snafu::{OptionExt, ResultExt, Snafu};
use std::sync::mpsc;
use std::sync::mpsc::{Receiver, Sender};
use std::sync::{Arc, Mutex};
use tokio::runtime::Runtime;
pub type InProcessChannel = (crate::Call<String>, Sender<Result<crate::Reply<String>, Error>>);
#[derive(Debug, Snafu)]
pub enum Error {
Serialize { from: serde_json::Error },
Deserialize { from: serde_json::Error },
GetCallLockInReceiver {},
NoReceiver {},
NoCallerChannel {},
NoReceiverChannel {},
CallerRecv { source: std::sync::mpsc::RecvError },
CallerSend { source: std::sync::mpsc::SendError<InProcessChannel> },
Reply { from: String },
RuntimeCreation { source: std::io::Error },
}
pub struct InProcess {
#[allow(clippy::type_complexity)]
receiver: Option<Arc<dyn Fn(Arc<Mutex<crate::Call<&String>>>) -> Arc<tokio::sync::Mutex<Result<crate::Reply<String>, Error>>> + Send + Sync>>,
runtime: Runtime,
to: Option<Sender<InProcessChannel>>,
from: Option<Arc<Mutex<Receiver<InProcessChannel>>>>,
}
pub struct InProcessInit {
pub to: Option<Sender<InProcessChannel>>,
pub from: Option<Receiver<InProcessChannel>>,
}
impl Default for InProcessInit {
fn default() -> Self {
InProcessInit { to: None, from: None }
}
}
impl InProcessInit {
pub fn init(self) -> Result<InProcess, Error> {
Ok(InProcess {
receiver: None,
to: self.to,
from: if let Some(from) = self.from { Some(Arc::new(Mutex::new(from))) } else { None },
runtime: Runtime::new().context(RuntimeCreation)?,
})
}
}
impl<'a> interfaces::Backend<'a> for InProcess {
type Intermediate = String;
type Error = Error;
fn start(&mut self) -> Result<(), Self::Error> {
let from = self.from.as_ref().context(NoReceiverChannel {})?.clone();
let receiver = self.receiver.as_ref().context(NoReceiver {})?.clone();
self.runtime.spawn(async move {
loop {
let (call, tx) = from.lock().unwrap().recv().unwrap();
let reply_mutex = receiver(Arc::new(Mutex::new(crate::Call {
procedure: call.procedure,
payload: &call.payload,
})));
let reply = &*reply_mutex.lock().await;
tx.send(match reply {
Ok(r) => Ok(crate::Reply { payload: r.payload.clone() }),
Err(e) => Err(Error::Reply { from: format!("{:?}", e) }),
})
.unwrap();
}
});
Ok(())
}
fn stop(&mut self) -> Result<(), Self::Error> {
Ok(())
}
fn receiver<T>(&mut self, receiver: T) -> Result<(), Self::Error>
where
T: Fn(&crate::Call<&Self::Intermediate>) -> Result<crate::Reply<Self::Intermediate>, Self::Error> + Send + Sync + 'static,
{
self.receiver = Some(Arc::new(move |call: Arc<Mutex<crate::Call<&String>>>| {
let call = match call.as_ref().lock() {
Ok(c) => c,
Err(_) => return Arc::new(tokio::sync::Mutex::new(Err(Error::GetCallLockInReceiver {}))),
};
match receiver(&*call) {
Ok(reply) => Arc::new(tokio::sync::Mutex::new(Ok(crate::Reply { payload: reply.payload }))),
Err(e) => Arc::new(tokio::sync::Mutex::new(Err(e))),
}
}));
Ok(())
}
fn call(&mut self, call: &crate::Call<&Self::Intermediate>) -> Result<crate::Reply<Self::Intermediate>, Self::Error> {
#[allow(clippy::type_complexity)]
let (tx, rx): (Sender<Result<crate::Reply<String>, Error>>, Receiver<Result<crate::Reply<String>, Error>>) = mpsc::channel();
self
.to
.as_ref()
.context(NoCallerChannel {})?
.send((
crate::Call {
procedure: call.procedure.clone(),
payload: call.payload.clone(),
},
tx,
))
.context(CallerSend {})?;
rx.recv().context(CallerRecv {})?
}
fn serialize<T: serde::Serialize>(from: &T) -> Result<String, Self::Error> {
serde_json::to_string(from).map_err(|e| Error::Serialize { from: e })
}
fn deserialize<'b, T>(from: &'b Self::Intermediate) -> Result<T, Self::Error>
where
T: for<'de> serde::Deserialize<'de>,
{
serde_json::from_str(&from).map_err(|e| Error::Deserialize { from: e })
}
}
use crate::interfaces;
use snafu::{OptionExt, ResultExt, Snafu};
use hyper::{
client::{connect::dns::GaiResolver, HttpConnector},
Body, Client, Method, Request, StatusCode,
};
use hyper::http::Uri;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Response, Server};
use std::sync::{Arc, Mutex};
use std::{fmt::Debug, net::SocketAddr};
use tokio::runtime::Runtime;
use tokio::sync;
use log::{debug, trace};
#[derive(Debug, Snafu)]
pub enum Error {
Serialize { from: serde_json::Error },
Deserialize { from: serde_json::Error },
SpeakingDisabled,
RequestBuilder { source: hyper::http::Error },
ParseResponseBodyBytes { source: hyper::Error },
ParseResponseBody { source: std::string::FromUtf8Error },
ClientRequest { source: hyper::Error },
FailedRequest { status: StatusCode },
NoListen,
NoReceiver,
BindServer { source: hyper::Error },
NoProcedureHeader { source: hyper::http::Error },
GetCallLockInReceiver,
RuntimeCreation { source: std::io::Error },
}
pub struct Http {
client: Client<HttpConnector<GaiResolver>, Body>,
speak: Option<Uri>,
listen: Option<SocketAddr>,
#[allow(clippy::type_complexity)]
receiver: Option<Arc<dyn Fn(Arc<Mutex<crate::Call<&String>>>) -> Arc<sync::Mutex<Result<crate::Reply<String>, Error>>> + Send + Sync>>,
runtime: Runtime,
}
impl Debug for Http {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
f.debug_struct("Http")
.field("client", &self.client)
.field("speak", &self.speak)
.field("listen", &self.listen)
.field("runtime", &self.runtime)
.finish()
}
}
pub struct HttpInit {
pub client: Client<HttpConnector<GaiResolver>, Body>,
pub speak: Option<Uri>,
pub listen: Option<SocketAddr>,
}
impl Default for HttpInit {
fn default() -> Self {
HttpInit {
client: Client::new(),
speak: None,
listen: None,
}
}
}
impl From<HttpInit> for Result<Http, Error> {
fn from(from: HttpInit) -> Self {
from.init()
}
}
impl HttpInit {
pub fn init(self) -> Result<Http, Error> {
trace!("HttpInit.init()");
let http = Http {
client: self.client,
speak: self.speak,
listen: self.listen,
receiver: None,
runtime: Runtime::new().context(RuntimeCreation {})?,
};
debug!("{:?}", &http);
Ok(http)
}
}
impl<'a> interfaces::Backend<'a> for Http {
type Intermediate = String;
type Error = Error;
fn start(&mut self) -> Result<(), Self::Error> {
trace!("Http.start()");
let listen = self.listen.context(NoListen)?;
let receiver = Arc::clone(self.receiver.as_ref().context(NoReceiver)?);
self.runtime.spawn(async move {
trace!("Http.runtime.spawn()");
let receiver = receiver.clone();
Server::bind(&listen)
.serve(make_service_fn(move |_| {
trace!("Http.runtime.spawn.serve()");
let receiver = receiver.clone();
async move {
Ok::<_, hyper::Error>(service_fn(move |request: Request<Body>| {
trace!("Http.runtime.spawn.serve.service_fn()");
debug!("{:?}", &listen);
let receiver = receiver.clone();
async move {
let procedure = if let Some(procedure) = request.headers().get("Procedure") {
match procedure.to_str() {
Ok(procedure) => procedure.to_owned(),
Err(e) => return Response::builder().status(StatusCode::BAD_REQUEST).body(Body::from(e.to_string())),
}
} else {
return Response::builder().status(StatusCode::BAD_REQUEST).body(Body::from("No Procedure provided"));
};
let body_bytes = match hyper::body::to_bytes(request.into_body()).await {
Ok(body_bytes) => body_bytes,
Err(e) => return Response::builder().status(StatusCode::BAD_REQUEST).body(Body::from(e.to_string())),
};
let body = match String::from_utf8(body_bytes.to_vec()) {
Ok(body) => body,
Err(e) => return Response::builder().status(StatusCode::BAD_REQUEST).body(Body::from(e.to_string())),
};
debug!("call Call {{ procedure: {:?}, payload: {:?} }}", &procedure, &body);
let reply_mutex = receiver(Arc::new(Mutex::new(crate::Call { procedure, payload: &body })));
let reply = &*reply_mutex.lock().await;
match reply {
Err(e) => Response::builder().status(StatusCode::BAD_REQUEST).body(Body::from(format!("{:?}", e))),
Ok(reply) => {
debug!("reply Reply {{ payload: {:?} }}", &reply.payload);
Response::builder().status(StatusCode::OK).body(Body::from(reply.payload.to_owned()))
}
}
}
}))
}
}))
.await
.unwrap();
});
Ok(())
}
fn stop(&mut self) -> Result<(), Self::Error> {
trace!("Http.stop()");
Ok(())
}
fn receiver<T>(&mut self, receiver: T) -> Result<(), Self::Error>
where
T: Fn(&crate::Call<&Self::Intermediate>) -> Result<crate::Reply<Self::Intermediate>, Self::Error> + Send + Sync + 'static,
{
trace!("Http.receiver()");
self.receiver = Some(Arc::new(move |call: Arc<Mutex<crate::Call<&String>>>| {
trace!("(Http.receiver)()");
let call = match call.as_ref().lock() {
Ok(c) => c,
Err(_) => return Arc::new(sync::Mutex::new(Err(Error::GetCallLockInReceiver))),
};
debug!("calling receiver");
match receiver(&*call) {
Ok(reply) => Arc::new(sync::Mutex::new(Ok(crate::Reply { payload: reply.payload }))),
Err(e) => Arc::new(sync::Mutex::new(Err(e))),
}
}));
Ok(())
}
fn call(&mut self, call: &crate::Call<&Self::Intermediate>) -> Result<crate::Reply<Self::Intermediate>, Self::Error> {
trace!("Http.call()");
debug!("{:?}", &self.speak);
match &self.speak {
None => Err(Error::SpeakingDisabled),
Some(uri) => self.runtime.block_on(async {
let request = Request::builder()
.method(Method::POST)
.uri(uri)
.header("Procedure", &call.procedure)
.body(Body::from(call.payload.clone()))
.context(RequestBuilder)?;
debug!("request {:?}", &request);
let response = self.client.request(request).await.context(ClientRequest)?;
debug!("response {:?}", &response);
let status = response.status();
let body_bytes = hyper::body::to_bytes(response.into_body()).await.context(ParseResponseBodyBytes)?;
let body = String::from_utf8(body_bytes.to_vec()).context(ParseResponseBody)?;
match status {
StatusCode::OK => Ok(crate::Reply { payload: body }),
_ => Err(Error::FailedRequest { status }),
}
}),
}
}
fn serialize<T: serde::Serialize>(from: &T) -> Result<String, Self::Error> {
trace!("Http.serialize()");
serde_json::to_string(from).map_err(|e| Error::Serialize { from: e })
}
fn deserialize<'b, T>(from: &'b Self::Intermediate) -> Result<T, Self::Error>
where
T: for<'de> serde::Deserialize<'de>,
{
trace!("Http.deserialize()");
serde_json::from_str(&from).map_err(|e| Error::Deserialize { from: e })
}
}
[package]
name = "mer"
version = "0.1.0"
authors = ["Paul Volavsek <paul.volavsek@gmail.com>"]
edition = "2018"
[features]
default = ["std", "backends", "frontends", "derive", "snafu/std"]
std = ["serde/std"
#, "serde-transcode/std", "erased-serde/std"
]
threadsafe = ["spin"]
backends = ["std", "serde_json", "hyper", "hyper/full", "tokio", "tokio/full", "threadsafe"]
frontends = ["std", "mer_derive"]
derive = ["serde/derive"]
[dependencies]
serde = { version = "1.0.116", default-features = false, features = ["alloc"] }
# serde-transcode = { version = "1.2.0", git = "https://github.com/volllly/serde-transcode", default-features = false, features = ["alloc"] }
serde_json = { version = "1.0.57", optional = true }
# erased-serde = { version = "0.3.12", default-features = false, features = ["alloc"] }
hyper = { version = "0.14", optional = true }
tokio = { version = "1.0", optional = true }
spin = { version = "0.7.0", optional = true }
snafu = { version = "0.6.9", default-features = false }
log = { version = "0.4", default-features = false }
mer_derive = { path = "../mer-derive", optional = true }
[dev-dependencies]
rand = "0.8"
flexi_logger = "0.16"
[[test]]
name = "test"
path = "test/tests.rs"
# 'mer
[![CI](https://github.com/volllly/mer/workflows/CI/badge.svg?branch=main)](https://github.com/volllly/mer/actions?query=workflow%3ACI)
minimal extensible rpc framework
MIT License
Copyright (c) 2020 Paul Volavsek
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
[workspace]
members = [
"mer",
"mer-derive"
]
tab_spaces = 2
max_width = 200
merge_imports = true
.vscode
# Generated by Cargo
# will have compiled files and executables
/target/
# Remove Cargo.lock from gitignore if creating an executable, leave it for libraries
# More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html
Cargo.lock
# These are backup files generated by rustfmt
**/*.rs.bk
#Added by cargo
#
#already existing elements were commented out
/target
#Cargo.lock
.pijul
.vscode
# Generated by Cargo
# will have compiled files and executables
/target/
# Remove Cargo.lock from gitignore if creating an executable, leave it for libraries
# More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html
Cargo.lock
# These are backup files generated by rustfmt
**/*.rs.bk
#Added by cargo
#
#already existing elements were commented out
/target
#Cargo.lock
.pijul