TUZICZZDQKNR7ZJGOEFQ4LJZDA5H4UHG4ATBAQOBXLQNY2R6IO5QC
H5IHJUKQYZNZZ4FMUCOAPH4XCALZ7TSUYFIVMYBOI2MBRV4GVUVQC
NUXZXM3D64JDAP7TVBAY23Z43RW27JL4XHVOEXFLT5CLDU6CVOOAC
6LABQWDWWQUEDTXZBKETGLG65FZ66FJLLY5IW3IFCMYJUO65Q4BAC
AGX35J5GL7C33UWYITQZEP6QRYBUTFJV3ANW6YMEMZ6WVWG7ZLXAC
WGQH6HZG7HZNKJYKXV6DACDXMPRA6YPXBPA6BOHZDMGVF2DLBQ6QC
DIIDKMIXQENUQIOIFHT62KAX4WL5MJWEB5BXHGUYCDSKIPRQVPOAC
QUPSHDL6MQBVCRQPZ3O7MMG27FJYCASSOWN6LI6X5K3VOL7TCAVAC
BMWVMVJ527SZSVE7VKP5IAAOKNZPAFJ2RKQKXHWU7QTOX4OZUPHQC
KZMW4JDYUYSQHRZMLNL7EOVWYJWOI7L573QAL3KUMMMG6IWKBS6AC
WXLXWHPVYBBSZ76TUABERVLQAN26QTZLF6NGZZMB56FUTCLPSB4QC
Q5GUHJ4O6GNSL23U77Y3HR2WW7V3LTYVHXPKOMJY76JKIZUU5YWQC
[package]
name = "zhur_invk"
version = "0.1.0"
authors = ["oreganoli <3611916+oreganoli@users.noreply.github.com>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
zhur_common = { path = "../zhur_common" }
serde = { version = "1.0.118", features = ["derive"] }
use zhur_common::*;
use serde::{Deserialize, Serialize};
pub mod err;
pub use err::InvocationError;
pub mod http;
pub use http::*;
/// A request to run a single Zhur app.
///
/// The app is addressed by the `owner`/`app_name` pair; `payload` carries its input.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Invocation {
    /// Name of the user or organisation that owns the app.
    pub owner: String,
    /// Name of the app being invoked.
    pub app_name: String,
    /// Input bytes for the app.
    pub payload: Vec<u8>,
}
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
/// A Serde-friendly representation of an HTTP request.
#[derive(Debug, Serialize, Deserialize)]
pub struct HttpReq {
    /// Request method, as text.
    pub method: String,
    /// Path component of the request.
    pub path: String,
    /// Request headers as name/value pairs of `String`s.
    pub headers: BTreeMap<String, String>,
    /// Query-string parameters as name/value pairs of `String`s.
    pub query_params: BTreeMap<String, String>,
    /// Cookies as name/value pairs of `String`s.
    pub cookies: BTreeMap<String, String>,
    /// IP address of the requester.
    pub ip_addr: String,
    /// Request body, as raw bytes.
    pub body: Vec<u8>,
}
/// A Serde-friendly representation of an HTTP response.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HttpRes {
    /// Response headers as name/value pairs of `String`s.
    pub headers: BTreeMap<String, String>,
    /// An optional `Set-Cookie` header.
    ///
    /// Only a single cookie can be set, and additional cookie attributes are not supported yet.
    pub set_cookie: Option<(String, String)>,
    /// Response body, as raw bytes.
    pub body: Vec<u8>,
    /// HTTP status code.
    pub status: u16,
}
impl Default for HttpRes {
    /// Builds an empty response: no headers, no cookie, an empty body and status `200`.
    fn default() -> Self {
        let headers = BTreeMap::new();
        let body = Vec::new();
        Self {
            headers,
            set_cookie: None,
            body,
            status: 200,
        }
    }
}
use std::fmt::Display;
use super::*;
/// Every way an invocation can fail. Errors can be produced by the gateway or by the core.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum InvocationError {
    // Gateway-side variants:
    /// The HTTP request from Hyper could not be converted into a Serde-friendly form.
    MalformedRequest,
    /// The request did not specify any app.
    NoId,
    /// The information given does not identify an app; carries the offending ID text.
    MalformedId(String),
    /// The gateway cannot reach the core.
    NoCore,
    /// The core's reply was not a correct one.
    MalformedReply,
    /// The core did not respond within the timeout period.
    TimedOut,
    /// The invocation could not be serialized for transport.
    SerializeErr,
    // Core-side variants:
    /// No app identified by the two ID substrings exists, or it is disabled/hidden.
    NoSuchApp(String, String),
    /// An internal problem occurred within the core.
    OtherInternal,
}
impl Display for InvocationError {
    /// Writes a human-readable description of the error into `f`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Variants that carry data are formatted straight into `f`;
        // every other variant maps to a fixed message.
        let fixed = match self {
            Self::MalformedId(id) => {
                return write!(f, "The ID \"{}\" is not a valid Zhur app ID.", id);
            }
            Self::NoSuchApp(owner, app_name) => {
                return write!(
                    f,
                    "The Zhur core could not find an app named {}:{}. It may have been disabled.",
                    owner, app_name
                );
            }
            Self::MalformedRequest => {
                "The HTTP request from Hyper could not be converted into a Serde-friendly form."
            }
            Self::NoId => "No app was specified at all.",
            Self::NoCore => "The Zhur gateway could not connect to the Zhur core.",
            Self::MalformedReply => {
                "The reply from the Zhur core server could not be properly deserialized as an app's HTTP output."
            }
            Self::TimedOut => "The Zhur core could be reached, but timed out before responding.",
            Self::SerializeErr => "The invocation could not be serialized for transport.",
            Self::OtherInternal => {
                "The core encountered an internal error that prevented it from returning a proper reply."
            }
        };
        f.write_str(fixed)
    }
}
use zhur_common::*;
use serde::{Deserialize, Serialize};
pub mod err;
pub use err::InvocationError;
pub mod http;
pub use http::*;
/// A request to run a single Zhur app.
///
/// The app is addressed by the `owner`/`app_name` pair; `payload` carries its input.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Invocation {
    /// Name of the user or organisation that owns the app.
    pub owner: String,
    /// Name of the app being invoked.
    pub app_name: String,
    /// Input bytes for the app.
    pub payload: Vec<u8>,
}
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
/// A Serde-friendly representation of an HTTP request.
#[derive(Debug, Serialize, Deserialize)]
pub struct HttpReq {
    /// Request method, as text.
    pub method: String,
    /// Path component of the request.
    pub path: String,
    /// Request headers as name/value pairs of `String`s.
    pub headers: BTreeMap<String, String>,
    /// Query-string parameters as name/value pairs of `String`s.
    pub query_params: BTreeMap<String, String>,
    /// Cookies as name/value pairs of `String`s.
    pub cookies: BTreeMap<String, String>,
    /// IP address of the requester.
    pub ip_addr: String,
    /// Request body, as raw bytes.
    pub body: Vec<u8>,
}
/// A Serde-friendly representation of an HTTP response.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HttpRes {
    /// Response headers as name/value pairs of `String`s.
    pub headers: BTreeMap<String, String>,
    /// An optional `Set-Cookie` header.
    ///
    /// Only a single cookie can be set, and additional cookie attributes are not supported yet.
    pub set_cookie: Option<(String, String)>,
    /// Response body, as raw bytes.
    pub body: Vec<u8>,
    /// HTTP status code.
    pub status: u16,
}
impl Default for HttpRes {
fn default() -> Self {
Self {
headers: BTreeMap::new(),
set_cookie: None,
body: vec![],
status: 200
}
}
}\
use std::fmt::Display;
use super::*;
/// Every way an invocation can fail. Errors can be produced by the gateway or by the core.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum InvocationError {
    // Gateway-side variants:
    /// The HTTP request from Hyper could not be converted into a Serde-friendly form.
    MalformedRequest,
    /// The request did not specify any app.
    NoId,
    /// The information given does not identify an app; carries the offending ID text.
    MalformedId(String),
    /// The gateway cannot reach the core.
    NoCore,
    /// The core's reply was not a correct one.
    MalformedReply,
    /// The core did not respond within the timeout period.
    TimedOut,
    /// The invocation could not be serialized for transport.
    SerializeErr,
    // Core-side variants:
    /// No app identified by the two ID substrings exists, or it is disabled/hidden.
    NoSuchApp(String, String),
    /// An internal problem occurred within the core.
    OtherInternal,
}
impl Display for InvocationError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let text = match self {
Self::MalformedRequest => "The HTTP request from Hyper could not be converted into a Serde-friendly form.".to_owned(),
Self::NoId => "No app was specified at all.".to_owned(),
Self::MalformedId(id) => format!("The ID \"{}\" is not a valid Zhur app ID.", id),
Self::NoCore => "The Zhur gateway could not connect to the Zhur core.".to_owned(),
Self::MalformedReply => "The reply from the Zhur core server could not be properly deserialized as an app's HTTP output.".to_owned(),
Self::TimedOut => "The Zhur core could be reached, but timed out before responding.".to_owned(),
Self::SerializeErr => "The invocation could not be serialized for transport.".to_owned(),
Self::NoSuchApp(owner, app_name) => format!("The Zhur core could not find an app named {}:{}. It may have been disabled.", owner, app_name),
Self::OtherInternal => "The core encountered an internal error that prevented it from returning a proper reply.".to_owned()
};
f.write_str(&text)
}
}\
# zhur_invk
Types common to `zhur_core` and `zhur_gate`.\
[package]
name = "zhur_invk"
version = "0.1.0"
authors = ["oreganoli <3611916+oreganoli@users.noreply.github.com>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
zhur_common = { path = "../zhur_common" }
serde = { version = "1.0.118", features = ["derive"] }\
# zhur_gate
This is the HTTP gateway for Zhur. Its task is to receive HTTP requests, transform them into Zhur invocations, and return the appropriate response.
[package]
name = "zhur_gate"
description = "Zhur HTTP gateway"
version = "0.1.0"
authors = ["oreganoli <3611916+oreganoli@users.noreply.github.com>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
zhur_common = { path = "../zhur_common" }
hyper = { version = "0.14.2", features = ["full"] }
tokio = { version = "1.0.1", features = ["full"] }
serde = { version = "1.0.118", features = ["derive"] }
zhur_invk = { path = "../zhur_invk" }
http = { version = "0.2.3" }
use zhur_common::{init_logger, zmq,
msg::chan::{
client_server,
}
};
use zhur_gate::comms::Gate2CoreServer;
use zhur_gate::start_server;
/// Entry point of the gateway binary.
///
/// Initialises logging, spawns a thread that serves gateway-to-core requests,
/// then runs the HTTP server until it stops.
#[tokio::main]
async fn main() {
    // Initialise logging before anything else: `Gate2CoreServer::new` already
    // emits a warning when `ZHUR_CORE_REP_URI` is unset, and that message
    // should reach an initialised logger.
    init_logger();
    let zmq_ctx = zmq::Context::new();
    let gate_core_server = Gate2CoreServer::new(&zmq_ctx);
    let (client, server) = client_server(gate_core_server);
    // The relay to the core runs on its own OS thread and handles channel
    // requests in an endless loop.
    std::thread::spawn(move || {
        let mut server = server;
        loop {
            server.handle()
        }
    });
    start_server(client).await;
}
use std::convert::Infallible;
use std::net::SocketAddr;
use hyper::{Server, server::conn::AddrStream, service::{
service_fn,
make_service_fn
}};
use zhur_common::log::*;
/// HTTP request handling code.
mod handle;
use handle::{FullRequest, handle_req};
/// Communication with the core module.
pub mod comms;
use zhur_common::msg::chan::*;
use zhur_invk::{HttpRes, Invocation, InvocationError};
/// Runs a Hyper HTTP server.
pub async fn start_server(client: ChannelClient<Invocation, Result<HttpRes, InvocationError>>) {
let port = match std::env::var("ZHUR_GATE_PORT") {
Ok(v) => match v.parse::<u16>() {
Ok(n) => n,
_ => {
error!("ZHUR_GATE_PORT set to invalid value \"{}\", exiting.", &v);
return;
}
},
_ => {
warn!("ZHUR_GATE_PORT env var not set. Assuming port 8080.");
8080
}
};
let addr = SocketAddr::from(([127, 0, 0, 1], port));
let make_svc = make_service_fn(move |conn: &AddrStream| {
let ip = conn.remote_addr().to_string();
let client = client.clone();
async move {
Ok::<_, Infallible>(service_fn(move |req| handle_req(
FullRequest{ req, ip: ip.clone() },
client.clone() // TODO: remove this horrific hack
)))
}
}
);
let server = Server::bind(&addr).serve(make_svc);
if let Err(e) = server.await {
error!("HTTP server error: {}", e);
}
}
use hyper::{
Body,
Request,
Response
};
use std::convert::Infallible;
use zhur_invk::*;
use zhur_common::{log::*, msg::chan::ChannelClient};
mod conversions;
/// Everything needed to produce a Zhur invocation from an HTTP request.
pub struct FullRequest {
    /// The Hyper request itself.
    pub req: Request<Body>,
    /// The IP address the request came from, as a string.
    pub ip: String,
}
/// Transforms an HTTP request into an HTTP response.
pub async fn handle_req(req: FullRequest, mut client: ChannelClient<Invocation, Result<HttpRes, InvocationError>>) -> Result<Response<Body>, Infallible> {
let invocation = match req.into_invoc().await {
Ok(i) => {
let text = format!("Got an OK invocation for {}:{} of length {}", &i.owner, &i.app_name, &i.payload.len());
info!("{}", &text);
i
},
Err(e) => {
let text = format!("Got an invocation error: {}", e);
warn!("{}", &text);
return Ok(Response::new(text.into())) // TODO: Error pages
}
};
let reply = client.request(invocation);
match reply {
Ok(res) => {
info!("Got a well-formed HttpRes as an invocation result!");
return Ok(Response::new(
format!("Response with body length {}", res.body.len())
.into()
)) // TODO actually handle the damn HttpRes
},
Err(e) => {
let text = format!("Got an invocation error: {}", e);
warn!("{}", &text);
return Ok(Response::new(text.into())) // TODO: Error pages
// TODO: Not brazen copypaste
}
}
}
// TODO: actually send the invocation to the core and try to get something back.
use bincode::deserialize;
use zhur_common::{bincode, log::warn, msg::chan::*, zmq};
use zhur_invk::{HttpRes, Invocation, InvocationError};
use zmq::{Context, Socket, SocketType};
/// Relays requests from the gateway to the core.
pub struct Gate2CoreServer {
    /// The ZMQ REQ socket used for the relay.
    req_socket: Socket,
}
impl Gate2CoreServer {
    /// Creates a REQ socket in `zmq_ctx` and connects it to the core.
    ///
    /// The endpoint is read from the `ZHUR_CORE_REP_URI` environment variable;
    /// when it cannot be read, `tcp://127.0.0.1:8081` is used and a warning is logged.
    ///
    /// # Panics
    ///
    /// Panics if the socket cannot be constructed or connected.
    pub fn new(zmq_ctx: &Context) -> Self {
        let req_socket = zmq_ctx
            .socket(SocketType::REQ)
            .expect("Expected to be able to construct a socket.");
        let endpoint = std::env::var("ZHUR_CORE_REP_URI").unwrap_or_else(|_| {
            warn!("ZHUR_CORE_REP_URI not set - assuming default value of tcp://127.0.0.1:8081!");
            "tcp://127.0.0.1:8081".to_owned()
        });
        req_socket
            .connect(&endpoint)
            .expect("Expected to be able to connect a REQ socket from the gateway to the core.");
        Self { req_socket }
    }
}
impl HandleRequest<Invocation, Result<HttpRes, InvocationError>> for Gate2CoreServer {
    /// Sends `msg` over the REQ socket and returns the decoded reply.
    ///
    /// # Errors
    ///
    /// * `SerializeErr` when `msg` cannot be serialized,
    /// * `NoCore` when sending fails,
    /// * `MalformedReply` when receiving or deserializing the reply fails,
    /// * whatever error the reply itself carries.
    fn handle(&mut self, msg: Invocation) -> Result<HttpRes, InvocationError> {
        let invoc_bytes = bincode::serialize(&msg).map_err(|_| InvocationError::SerializeErr)?;
        self.req_socket
            .send(invoc_bytes, 0)
            .map_err(|_| InvocationError::NoCore)?;
        let response_bytes = self
            .req_socket
            .recv_bytes(0)
            .map_err(|_| InvocationError::MalformedReply)?;
        // The reply is itself a serialized `Result`, so a successful decode is returned as-is.
        deserialize::<Result<HttpRes, InvocationError>>(&response_bytes)
            .unwrap_or(Err(InvocationError::MalformedReply))
    }
}
use zhur_common::{init_logger, zmq,
msg::chan::{
client_server,
}
};
use zhur_gate::comms::Gate2CoreServer;
use zhur_gate::start_server;
/// Entry point of the gateway binary.
///
/// Initialises logging, spawns a thread that serves gateway-to-core requests,
/// then runs the HTTP server until it stops.
#[tokio::main]
async fn main() {
    // Initialise logging before anything else: `Gate2CoreServer::new` already
    // emits a warning when `ZHUR_CORE_REP_URI` is unset, and that message
    // should reach an initialised logger.
    init_logger();
    let zmq_ctx = zmq::Context::new();
    let gate_core_server = Gate2CoreServer::new(&zmq_ctx);
    let (client, server) = client_server(gate_core_server);
    // The relay to the core runs on its own OS thread and handles channel
    // requests in an endless loop.
    std::thread::spawn(move || {
        let mut server = server;
        loop {
            server.handle()
        }
    });
    start_server(client).await;
}
use std::convert::Infallible;
use std::net::SocketAddr;
use hyper::{Server, server::conn::AddrStream, service::{
service_fn,
make_service_fn
}};
use zhur_common::log::*;
/// HTTP request handling code.
mod handle;
use handle::{FullRequest, handle_req};
/// Communication with the core module.
pub mod comms;
use zhur_common::msg::chan::*;
use zhur_invk::{HttpRes, Invocation, InvocationError};
/// Runs a Hyper HTTP server.
pub async fn start_server(client: ChannelClient<Invocation, Result<HttpRes, InvocationError>>) {
let port = match std::env::var("ZHUR_GATE_PORT") {
Ok(v) => match v.parse::<u16>() {
Ok(n) => n,
_ => {
error!("ZHUR_GATE_PORT set to invalid value \"{}\", exiting.", &v);
return;
}
},
_ => {
warn!("ZHUR_GATE_PORT env var not set. Assuming port 8080.");
8080
}
};
let addr = SocketAddr::from(([127, 0, 0, 1], port));
let make_svc = make_service_fn(move |conn: &AddrStream| {
let ip = conn.remote_addr().to_string();
let client = client.clone();
async move {
Ok::<_, Infallible>(service_fn(move |req| handle_req(
FullRequest{ req, ip: ip.clone() },
client.clone() // TODO: remove this horrific hack
)))
}
}
);
let server = Server::bind(&addr).serve(make_svc);
if let Err(e) = server.await {
error!("HTTP server error: {}", e);
}
}\
use hyper::{
Body,
Request,
Response
};
use std::convert::Infallible;
use zhur_invk::*;
use zhur_common::{log::*, msg::chan::ChannelClient};
mod conversions;
/// Everything needed to produce a Zhur invocation from an HTTP request.
pub struct FullRequest {
    /// The Hyper request itself.
    pub req: Request<Body>,
    /// The IP address the request came from, as a string.
    pub ip: String,
}
/// Transforms an HTTP request into an HTTP response.
pub async fn handle_req(req: FullRequest, mut client: ChannelClient<Invocation, Result<HttpRes, InvocationError>>) -> Result<Response<Body>, Infallible> {
let invocation = match req.into_invoc().await {
Ok(i) => {
let text = format!("Got an OK invocation for {}:{} of length {}", &i.owner, &i.app_name, &i.payload.len());
info!("{}", &text);
i
},
Err(e) => {
let text = format!("Got an invocation error: {}", e);
warn!("{}", &text);
return Ok(Response::new(text.into())) // TODO: Error pages
}
};
let reply = client.request(invocation);
match reply {
Ok(res) => {
info!("Got a well-formed HttpRes as an invocation result!");
return Ok(Response::new(
format!("Response with body length {}", res.body.len())
.into()
)) // TODO actually handle the damn HttpRes
},
Err(e) => {
let text = format!("Got an invocation error: {}", e);
warn!("{}", &text);
return Ok(Response::new(text.into())) // TODO: Error pages
// TODO: Not brazen copypaste
}
}
}\
use std::collections::BTreeMap;
use super::{FullRequest, HttpReq, InvocationError};
use zhur_common::log::*;
use zhur_invk::Invocation;
/// Simplifies a `FullRequest` into an `HttpReq` of ours.
pub async fn simplify_req(req: FullRequest) -> Result<HttpReq, InvocationError> {
use http::header::COOKIE;
let method = req.req.method().to_string();
let uri = req.req.uri();
let path = uri.path().to_owned();
let query_params = match uri.query() {
Some(s) => parse_query_string(s),
None => BTreeMap::new()
};
// dbg!(&query_params);
let cookie_str = match req.req.headers().get(COOKIE) {
Some(hv) => match hv.to_str() {
Ok(s) => Some(s),
Err(_) => {
warn!("Got a Cookie string that is not a valid ASCII string. Discarding.");
None
}
},
None => None
};
let cookies = match cookie_str {
Some(s) => cookie_map(s),
None => BTreeMap::new()
};
let mut headers = BTreeMap::new();
for (name, val) in req.req.headers().iter() {
if name == "Cookie" {
continue;
}
match val.to_str() {
Ok(s) => {
headers.insert(name.to_string(), s.to_owned());
},
Err(_) => {
warn!("Found a header that could not be parsed as a string. Discarding.");
continue;
}
}
}
let body = match hyper::body::to_bytes(req.req.into_body()).await {
Ok(b) => b.to_vec(),
Err(_) => return Err(InvocationError::MalformedRequest)
};
Ok(HttpReq {
body,
path,
method,
cookies,
headers,
query_params,
ip_addr: req.ip.clone()
})
}
impl FullRequest {
    /// Turns a `FullRequest` into an `Invocation`.
    ///
    /// The app is identified by the `Host` header: ignoring any `:port`
    /// suffix, its first dot-separated label becomes the owner and the second
    /// one the app name. The simplified request (see `simplify_req`) is
    /// serialized with bincode and becomes the invocation payload.
    ///
    /// # Errors
    ///
    /// * `NoId` when the `Host` header is missing or empty,
    /// * `MalformedId` when the header is not readable text or has fewer than two labels,
    /// * `MalformedRequest` when the request cannot be simplified or serialized.
    pub async fn into_invoc(self) -> Result<Invocation, InvocationError> {
        use zhur_common::bincode::serialize;
        use http::header::HOST;
        let host = match self.req.headers().get(HOST) {
            Some(value) => match value.to_str() {
                Ok(s) => s.to_owned(),
                Err(_) => {
                    warn!("Received an HTTP request with a non-UTF-text Host header, returning malformed ID error.");
                    return Err(InvocationError::MalformedId("(not valid UTF-8 text)".into()));
                }
            },
            None => {
                warn!("Received an HTTP request with no Host header, returning a no ID error.");
                return Err(InvocationError::NoId);
            }
        };
        // `str::split` always yields at least one item, so an empty header has
        // to be detected on the string itself rather than on the split result.
        if host.is_empty() {
            warn!("Received an HTTP request with an empty Host header, returning a no ID error.");
            return Err(InvocationError::NoId);
        }
        let ids = {
            // A Host header may carry a `:port` suffix, which must not end up in the app name.
            let hostname = host.split(':').next().unwrap_or("");
            let mut labels = hostname.split('.');
            let parsed = match (labels.next(), labels.next()) {
                (Some(owner), Some(app_name)) => Some((owner.to_owned(), app_name.to_owned())),
                _ => None,
            };
            parsed
        };
        let (owner, app_name) = match ids {
            Some(pair) => pair,
            None => {
                warn!("Received an HTTP request with a Host header that could not be transformed into an app ID: \"{}\"", host);
                return Err(InvocationError::MalformedId(host));
            }
        };
        let req_simple = simplify_req(self).await?;
        let payload = serialize(&req_simple).map_err(|_| InvocationError::MalformedRequest)?;
        Ok(Invocation {
            owner,
            app_name,
            payload,
        })
    }
}
/// Parses a query string of the form "a=b&c=d" into params.
///
/// Note: Hyper already extracts the query string from the URI, so the leading
/// `?` is not part of `s`.
///
/// Each pair is split on its first `=` only, so a value that itself contains
/// `=` is kept whole. Pairs without any `=` are skipped. When a parameter
/// occurs more than once, the last value wins. No percent-decoding is done.
fn parse_query_string(s: &str) -> BTreeMap<String, String> {
    s.split('&')
        .filter_map(|pair| {
            let mut parts = pair.splitn(2, '=');
            let param = parts.next()?;
            let val = parts.next()?;
            Some((param.to_owned(), val.to_owned()))
        })
        .collect()
}
/// Produces a map of cookies to their values given a cookie string.
///
/// Cookies are separated by `;`, with any surrounding whitespace ignored, so
/// both `a=1; b=2` and `a=1;b=2` are understood. Each cookie is split on its
/// first `=` only, which keeps values containing `=` (such as padded base64)
/// whole. Entries without any `=` are skipped. When a name occurs more than
/// once, the last value wins.
fn cookie_map(cookie_str: &str) -> BTreeMap<String, String> {
    cookie_str
        .split(';')
        .filter_map(|cookie| {
            let mut parts = cookie.trim().splitn(2, '=');
            let name = parts.next()?;
            let val = parts.next()?;
            Some((name.to_owned(), val.to_owned()))
        })
        .collect()
}
use std::collections::BTreeMap;
use super::{FullRequest, HttpReq, InvocationError};
use zhur_common::log::*;
use zhur_invk::Invocation;
/// Simplifies a `FullRequest` into an `HttpReq` of ours.
pub async fn simplify_req(req: FullRequest) -> Result<HttpReq, InvocationError> {
use http::header::COOKIE;
let method = req.req.method().to_string();
let uri = req.req.uri();
let path = uri.path().to_owned();
let query_params = match uri.query() {
Some(s) => parse_query_string(s),
None => BTreeMap::new()
};
// dbg!(&query_params);
let cookie_str = match req.req.headers().get(COOKIE) {
Some(hv) => match hv.to_str() {
Ok(s) => Some(s),
Err(_) => {
warn!("Got a Cookie string that is not a valid ASCII string. Discarding.");
None
}
},
None => None
};
let cookies = match cookie_str {
Some(s) => cookie_map(s),
None => BTreeMap::new()
};
let mut headers = BTreeMap::new();
for (name, val) in req.req.headers().iter() {
if name == "Cookie" {
continue;
}
match val.to_str() {
Ok(s) => {
headers.insert(name.to_string(), s.to_owned());
},
Err(_) => {
warn!("Found a header that could not be parsed as a string. Discarding.");
continue;
}
}
}
let body = match hyper::body::to_bytes(req.req.into_body()).await {
Ok(b) => b.to_vec(),
Err(_) => return Err(InvocationError::MalformedRequest)
};
Ok(HttpReq {
body,
path,
method,
cookies,
headers,
query_params,
ip_addr: req.ip.clone()
})
}
impl FullRequest {
    /// Turns a `FullRequest` into an `Invocation`.
    ///
    /// The app is identified by the `Host` header: ignoring any `:port`
    /// suffix, its first dot-separated label becomes the owner and the second
    /// one the app name. The simplified request (see `simplify_req`) is
    /// serialized with bincode and becomes the invocation payload.
    ///
    /// # Errors
    ///
    /// * `NoId` when the `Host` header is missing or empty,
    /// * `MalformedId` when the header is not readable text or has fewer than two labels,
    /// * `MalformedRequest` when the request cannot be simplified or serialized.
    pub async fn into_invoc(self) -> Result<Invocation, InvocationError> {
        use zhur_common::bincode::serialize;
        use http::header::HOST;
        let host = match self.req.headers().get(HOST) {
            Some(value) => match value.to_str() {
                Ok(s) => s.to_owned(),
                Err(_) => {
                    warn!("Received an HTTP request with a non-UTF-text Host header, returning malformed ID error.");
                    return Err(InvocationError::MalformedId("(not valid UTF-8 text)".into()));
                }
            },
            None => {
                warn!("Received an HTTP request with no Host header, returning a no ID error.");
                return Err(InvocationError::NoId);
            }
        };
        // `str::split` always yields at least one item, so an empty header has
        // to be detected on the string itself rather than on the split result.
        if host.is_empty() {
            warn!("Received an HTTP request with an empty Host header, returning a no ID error.");
            return Err(InvocationError::NoId);
        }
        let ids = {
            // A Host header may carry a `:port` suffix, which must not end up in the app name.
            let hostname = host.split(':').next().unwrap_or("");
            let mut labels = hostname.split('.');
            let parsed = match (labels.next(), labels.next()) {
                (Some(owner), Some(app_name)) => Some((owner.to_owned(), app_name.to_owned())),
                _ => None,
            };
            parsed
        };
        let (owner, app_name) = match ids {
            Some(pair) => pair,
            None => {
                warn!("Received an HTTP request with a Host header that could not be transformed into an app ID: \"{}\"", host);
                return Err(InvocationError::MalformedId(host));
            }
        };
        let req_simple = simplify_req(self).await?;
        let payload = serialize(&req_simple).map_err(|_| InvocationError::MalformedRequest)?;
        Ok(Invocation {
            owner,
            app_name,
            payload,
        })
    }
}
/// Parses a query string of the form "a=b&c=d" into params.
///
/// Note: Hyper already extracts the query string from the URI, so the leading
/// `?` is not part of `s`.
///
/// Each pair is split on its first `=` only, so a value that itself contains
/// `=` is kept whole. Pairs without any `=` are skipped. When a parameter
/// occurs more than once, the last value wins. No percent-decoding is done.
fn parse_query_string(s: &str) -> BTreeMap<String, String> {
    s.split('&')
        .filter_map(|pair| {
            let mut parts = pair.splitn(2, '=');
            let param = parts.next()?;
            let val = parts.next()?;
            Some((param.to_owned(), val.to_owned()))
        })
        .collect()
}
/// Produces a map of cookies to their values given a cookie string.
///
/// Cookies are separated by `;`, with any surrounding whitespace ignored, so
/// both `a=1; b=2` and `a=1;b=2` are understood. Each cookie is split on its
/// first `=` only, which keeps values containing `=` (such as padded base64)
/// whole. Entries without any `=` are skipped. When a name occurs more than
/// once, the last value wins.
fn cookie_map(cookie_str: &str) -> BTreeMap<String, String> {
    cookie_str
        .split(';')
        .filter_map(|cookie| {
            let mut parts = cookie.trim().splitn(2, '=');
            let name = parts.next()?;
            let val = parts.next()?;
            Some((name.to_owned(), val.to_owned()))
        })
        .collect()
}
use bincode::deserialize;
use zhur_common::{bincode, log::warn, msg::chan::*, zmq};
use zhur_invk::{HttpRes, Invocation, InvocationError};
use zmq::{Context, Socket, SocketType};
/// Relays requests from the gateway to the core.
pub struct Gate2CoreServer {
    /// The ZMQ REQ socket used for the relay.
    req_socket: Socket,
}
impl Gate2CoreServer {
    /// Creates a REQ socket in `zmq_ctx` and connects it to the core.
    ///
    /// The endpoint is read from the `ZHUR_CORE_REP_URI` environment variable;
    /// when it cannot be read, `tcp://127.0.0.1:8081` is used and a warning is logged.
    ///
    /// # Panics
    ///
    /// Panics if the socket cannot be constructed or connected.
    pub fn new(zmq_ctx: &Context) -> Self {
        let req_socket = zmq_ctx
            .socket(SocketType::REQ)
            .expect("Expected to be able to construct a socket.");
        let endpoint = std::env::var("ZHUR_CORE_REP_URI").unwrap_or_else(|_| {
            warn!("ZHUR_CORE_REP_URI not set - assuming default value of tcp://127.0.0.1:8081!");
            "tcp://127.0.0.1:8081".to_owned()
        });
        req_socket
            .connect(&endpoint)
            .expect("Expected to be able to connect a REQ socket from the gateway to the core.");
        Self { req_socket }
    }
}
impl HandleRequest<Invocation, Result<HttpRes, InvocationError>> for Gate2CoreServer {
    /// Sends `msg` over the REQ socket and returns the decoded reply.
    ///
    /// # Errors
    ///
    /// * `SerializeErr` when `msg` cannot be serialized,
    /// * `NoCore` when sending fails,
    /// * `MalformedReply` when receiving or deserializing the reply fails,
    /// * whatever error the reply itself carries.
    fn handle(&mut self, msg: Invocation) -> Result<HttpRes, InvocationError> {
        let invoc_bytes = bincode::serialize(&msg).map_err(|_| InvocationError::SerializeErr)?;
        self.req_socket
            .send(invoc_bytes, 0)
            .map_err(|_| InvocationError::NoCore)?;
        let response_bytes = self
            .req_socket
            .recv_bytes(0)
            .map_err(|_| InvocationError::MalformedReply)?;
        // The reply is itself a serialized `Result`, so a successful decode is returned as-is.
        deserialize::<Result<HttpRes, InvocationError>>(&response_bytes)
            .unwrap_or(Err(InvocationError::MalformedReply))
    }
}
# zhur_gate
This is the HTTP gateway for Zhur. Its task is to receive HTTP requests, transform them into Zhur invocations, and return the appropriate response.\
[package]
name = "zhur_gate"
description = "Zhur HTTP gateway"
version = "0.1.0"
authors = ["oreganoli <3611916+oreganoli@users.noreply.github.com>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
zhur_common = { path = "../zhur_common" }
zhur_invk = { path = "../zhur_invk" }
http = { version = "0.2.3" }
hyper = { version = "0.14.2", features = ["full"] }
tokio = { version = "1.0.1", features = ["full"] }
serde = { version = "1.0.118", features = ["derive"] }\
[package]
name = "zhur_core"
version = "0.1.0"
authors = ["oreganoli <3611916+oreganoli@users.noreply.github.com>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
zhur_common = { path = "../zhur_common" }
zhur_invk = { path = "../zhur_invk" }
use zhur_core::CoreServer;
use zhur_common::zmq::Context;
use zhur_common::init_logger;
/// Entry point of the core binary: sets up logging, builds the `CoreServer`
/// and then handles requests in an endless loop.
fn main() {
    init_logger();
    let context = Context::new();
    let core = CoreServer::new(&context);
    loop {
        core.handle();
    }
}
mod wasm;
use zhur_common::bincode::{deserialize, serialize};
use zhur_common::log::*;
use zhur_common::zmq::{Context, Socket, SocketType};
use zhur_invk::{HttpRes, Invocation, InvocationError};
/// The ZMQ server that receives invocations from the gateway and replies with
/// either an `HttpRes` or an `InvocationError`.
pub struct CoreServer {
    /// The ZMQ REP socket that requests arrive on and replies leave through.
    rep_socket: Socket,
}
impl CoreServer {
    /// Creates a REP socket in `zmq_ctx` and binds it.
    ///
    /// The endpoint is read from the `ZHUR_CORE_REP_URI` environment variable;
    /// when it cannot be read, `tcp://127.0.0.1:8081` is used and a warning is logged.
    ///
    /// # Panics
    ///
    /// Panics if the socket cannot be constructed or bound.
    pub fn new(zmq_ctx: &Context) -> Self {
        let rep_socket = zmq_ctx
            .socket(SocketType::REP)
            .expect("Expected to be able to construct a ZMQ socket.");
        let endpoint = std::env::var("ZHUR_CORE_REP_URI").unwrap_or_else(|_| {
            warn!("ZHUR_CORE_REP_URI not set - assuming default value of tcp://127.0.0.1:8081!");
            "tcp://127.0.0.1:8081".to_owned()
        });
        rep_socket
            .bind(&endpoint)
            .expect("Expected to be able to bind the core server socket.");
        Self { rep_socket }
    }

    /// Serves one request: receives the bytes of an invocation, handles them
    /// and sends the serialized result back.
    ///
    /// # Panics
    ///
    /// Panics if receiving, serializing the response, or sending fails.
    pub fn handle(&self) {
        let bytes = self
            .rep_socket
            .recv_bytes(0)
            .unwrap_or_else(|_| panic!("CoreServer could not receive any bytes."));
        trace!("CoreServer received some bytes.");

        let response = handle_invoke_bytes(&bytes);
        let res_bytes =
            serialize(&response).unwrap_or_else(|_| panic!("Could not serialize a response."));

        if self.rep_socket.send(res_bytes, 0).is_err() {
            panic!("Could not send a reply.");
        }
        trace!("Sent a response!");
    }
}
/// Deserializes `bytes` into an `Invocation` and handles it.
///
/// Returns `InvocationError::MalformedRequest` when the bytes cannot be deserialized.
fn handle_invoke_bytes(bytes: &[u8]) -> Result<HttpRes, InvocationError> {
    let invocation =
        deserialize::<Invocation>(bytes).map_err(|_| InvocationError::MalformedRequest)?;
    handle_invocation(&invocation)
}
fn handle_invocation(i: &Invocation) -> Result<HttpRes, InvocationError> {
trace!("Got an invocation for {}:{}", &i.owner, &i.app_name);
let mut res = HttpRes::default();
res.body = b"Hello world from Zhur!".to_vec();
Ok(res)
}
mod wasm;
use std::thread::JoinHandle;
use zhur_common::{flume::Sender, log::*, msg::chan::Envelope};
use zhur_invk::{HttpRes, Invocation, InvocationError};
/// An executor holding one app, driven through messages sent to its inner thread.
pub struct Executor {
    /// Identifying number of this executor.
    pub id: usize,
    /// Username of the owner of the app held here.
    pub owner: String,
    /// Name of the app currently held in the executor.
    pub app_name: String,
    /// Sender used to pass messages to the executor's `inner_thread`.
    msg_tx: Sender<ExecutorMsg>,
    /// Handle of the thread that holds the actual code engine.
    inner_thread: JoinHandle<()>,
}
/// Messages the outer executor sends to its inner thread.
pub enum ExecutorMsg {
    /// Replace the WASM code currently loaded. Issued on app updates or substitutions.
    LoadCode(Vec<u8>),
    /// Run an invocation; the envelope pairs the `Invocation` with its result type.
    Invoke(Envelope<Invocation, Result<HttpRes, InvocationError>>),
    /// Shut the inner thread down.
    Shutdown,
}
impl Executor {
    /// Shuts the executor down: sends `ExecutorMsg::Shutdown` to the inner
    /// thread and then joins it.
    ///
    /// # Panics
    ///
    /// Panics, after logging the same message as an error, if the shutdown
    /// message cannot be sent or the inner thread cannot be joined.
    pub fn shutdown(self) {
        if self.msg_tx.send(ExecutorMsg::Shutdown).is_err() {
            let text = format!(
                "Executor #{} could not send a shutdown message to its inner thread!",
                self.id
            );
            error!("{}", &text);
            panic!("{}", &text);
        }
        if self.inner_thread.join().is_err() {
            let text = format!("Executor #{} could not join on its inner thread!", self.id);
            error!("{}", &text);
            panic!("{}", &text);
        }
    }
}
use zhur_core::CoreServer;
use zhur_common::zmq::Context;
use zhur_common::init_logger;
/// Entry point of the core binary: sets up logging, builds the `CoreServer`
/// and then handles requests in an endless loop.
fn main() {
    init_logger();
    let context = Context::new();
    let core = CoreServer::new(&context);
    loop {
        core.handle();
    }
}
use zhur_common::bincode::{deserialize, serialize};
use zhur_common::log::*;
use zhur_common::zmq::{Context, Socket, SocketType};
use zhur_invk::{HttpRes, Invocation, InvocationError};
mod wasm;
/// The ZMQ server that receives invocations from the gateway and replies with
/// either an `HttpRes` or an `InvocationError`.
pub struct CoreServer {
    /// The ZMQ REP socket that requests arrive on and replies leave through.
    rep_socket: Socket,
}
impl CoreServer {
    /// Creates a REP socket in `zmq_ctx` and binds it.
    ///
    /// The endpoint is read from the `ZHUR_CORE_REP_URI` environment variable;
    /// when it cannot be read, `tcp://127.0.0.1:8081` is used and a warning is logged.
    ///
    /// # Panics
    ///
    /// Panics if the socket cannot be constructed or bound.
    pub fn new(zmq_ctx: &Context) -> Self {
        let rep_socket = zmq_ctx
            .socket(SocketType::REP)
            .expect("Expected to be able to construct a ZMQ socket.");
        let endpoint = std::env::var("ZHUR_CORE_REP_URI").unwrap_or_else(|_| {
            warn!("ZHUR_CORE_REP_URI not set - assuming default value of tcp://127.0.0.1:8081!");
            "tcp://127.0.0.1:8081".to_owned()
        });
        rep_socket
            .bind(&endpoint)
            .expect("Expected to be able to bind the core server socket.");
        Self { rep_socket }
    }

    /// Serves one request: receives the bytes of an invocation, handles them
    /// and sends the serialized result back.
    ///
    /// # Panics
    ///
    /// Panics if receiving, serializing the response, or sending fails.
    pub fn handle(&self) {
        let bytes = self
            .rep_socket
            .recv_bytes(0)
            .unwrap_or_else(|_| panic!("CoreServer could not receive any bytes."));
        trace!("CoreServer received some bytes.");

        let response = handle_invoke_bytes(&bytes);
        // TODO: There is no need to deserialize into HttpReses or InvocationErrors within the core. The gateway does that anyway.
        let res_bytes =
            serialize(&response).unwrap_or_else(|_| panic!("Could not serialize a response."));

        if self.rep_socket.send(res_bytes, 0).is_err() {
            panic!("Could not send a reply.");
        }
        trace!("Sent a response!");
    }
}
/// Deserializes `bytes` into an `Invocation` and handles it.
///
/// Returns `InvocationError::MalformedRequest` when the bytes cannot be deserialized.
fn handle_invoke_bytes(bytes: &[u8]) -> Result<HttpRes, InvocationError> {
    let invocation =
        deserialize::<Invocation>(bytes).map_err(|_| InvocationError::MalformedRequest)?;
    handle_invocation(&invocation)
}
fn handle_invocation(i: &Invocation) -> Result<HttpRes, InvocationError> {
trace!("Got an invocation for {}:{}", &i.owner, &i.app_name);
let mut res = HttpRes::default();
res.body = b"Hello world from Zhur!".to_vec();
Ok(res)
}\
# zhur_core
The core module of a Zhur installation. Handles incoming invocations with user-uploaded WASM code.\
[package]
name = "zhur_core"
version = "0.1.0"
authors = ["oreganoli <3611916+oreganoli@users.noreply.github.com>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
zhur_common = { path = "../zhur_common" }
zhur_invk = { path = "../zhur_invk" }\