AGX35J5GL7C33UWYITQZEP6QRYBUTFJV3ANW6YMEMZ6WVWG7ZLXAC
Self::MalformedReply => "The reply from the Zhur core server could not be properly deserialized as an app's output.".to_owned(),
Self::MalformedReply => "The reply from the Zhur core server could not be properly deserialized as an app's HTTP output.".to_owned(),
pub async fn handle_req(req: FullRequest) -> Result<Response<Body>, Infallible> {
let invocation = req.into_invoc().await;
// TODO: actually send the invocation to the core and try to get something back.
// Also TODO: extract all this stuff into its own function so we can ? it.
match &invocation {
pub async fn handle_req(req: FullRequest, mut client: ChannelClient<Invocation, Result<HttpRes, InvocationError>>) -> Result<Response<Body>, Infallible> {
let invocation = match req.into_invoc().await {
Ok(Response::new(text.into()))
return Ok(Response::new(text.into())) // TODO: Error pages
}
};
// TODO: actually send the invocation to the core and try to get something back.
let reply = client.request(invocation);
match reply {
Ok(res) => {
info!("Got a well-formed HttpRes as an invocation result!");
return Ok(Response::new(
format!("Response with body length {}", res.body.len())
.into()
)) // TODO actually handle the damn HttpRes
},
Err(e) => {
let text = format!("Got an invocation error: {}", e);
warn!("{}", &text);
return Ok(Response::new(text.into())) // TODO: Error pages
// TODO: Not brazen copypaste
use bincode::deserialize;
use zhur_common::{bincode, log::warn, msg::chan::*, zmq};
use zhur_invk::{HttpRes, Invocation, InvocationError};
use zmq::{Context, Socket, SocketType};
/// Struct responsible for relaying requests from the gateway to the core.
pub struct Gate2CoreServer {
/// ZMQ REQ socket.
req_socket: Socket,
}
impl Gate2CoreServer {
pub fn new(zmq_ctx: &Context) -> Self {
Self {
req_socket: {
let sck = zmq_ctx.socket(SocketType::REQ)
.expect("Expected to be able to construct a socket.");
let endpoint = match std::env::var("ZHUR_CORE_REP_URI") {
Ok(s) => s,
Err(_) => {
warn!("ZHUR_CORE_REP_URI not set - assuming default value of tcp://127.0.0.1:8081!");
"tcp://127.0.0.1:8081".to_owned()
}
};
sck.connect(&endpoint)
.expect("Expected to be able to connect a REQ socket from the gateway to the core.");
sck
}
}
}
}
impl HandleRequest<Invocation, Result<HttpRes, InvocationError>> for Gate2CoreServer {
fn handle(&mut self, msg: Invocation) -> Result<HttpRes, InvocationError> {
let invoc_bytes = match bincode::serialize(&msg) {
Ok(b) => b,
Err(_) => return Err(InvocationError::SerializeErr)
};
match self.req_socket.send(invoc_bytes, 0) {
Ok(_) => (),
Err(_) => return Err(InvocationError::NoCore)
};
let response_bytes = match self.req_socket.recv_bytes(0) {
Ok(b) => b,
Err(_) => return Err(InvocationError::MalformedReply)
};
match deserialize(&response_bytes) {
Ok(r) => r,
Err(_) => Err(InvocationError::MalformedReply)
}
}
}
impl<T: Send, U: Send, V: HandleRequest<T, U>> ChannelServer<T, U, V> {
pub fn handle(&mut self) {
// TODO: handle timeouts
let (req, reply_tx) = match self.request_rx.recv() {
Ok(t) => {
trace!("A ChannelServer got a request!");
t
},
Err(_) => {
panic!("A ChannelServer could not receive a request!");
}
};
let reply = self.handler.handle(req);
match reply_tx.send(reply) {
Ok(_) => {
trace!("A ChannelServer replied to a request!");
},
Err(_) => {
panic!("A ChannelServer could not respond to a request!");
}
}
}
}