```rust
use futures::prelude::*;
use futures::stream::FuturesUnordered;
use log::info;
use std::collections::HashMap;
use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::broadcast::Sender;

pub struct SignalGuard {}

impl SignalGuard {
    pub async fn wait_for_stop(signal_forwarder: &Sender<i32>) -> Option<()> {
        // The default behaviour in UNIX is to terminate the process when any of
        // the following signals are received. In each of these cases we initiate
        // the controlled shutdown instead.
        let mut interrupt_stream = signal(SignalKind::interrupt()).ok()?;
        let mut terminate_stream = signal(SignalKind::terminate()).ok()?;
        let mut alarm_stream = signal(SignalKind::alarm()).ok()?;
        let mut hangup_stream = signal(SignalKind::hangup()).ok()?;
        let mut quit_stream = signal(SignalKind::quit()).ok()?;
        let mut user_defined1_stream = signal(SignalKind::user_defined1()).ok()?;
        let mut user_defined2_stream = signal(SignalKind::user_defined2()).ok()?;

        let interrupt_future = interrupt_stream.recv();
        let terminate_future = terminate_stream.recv();
        let alarm_future = alarm_stream.recv();
        let hangup_future = hangup_stream.recv();
        let quit_future = quit_stream.recv();
        let user_defined1_future = user_defined1_stream.recv();
        let user_defined2_future = user_defined2_stream.recv();

        let all_futures = vec![
            interrupt_future,
            terminate_future,
            alarm_future,
            hangup_future,
            quit_future,
            user_defined1_future,
            user_defined2_future,
        ];

        // Map the position of each future back to the raw value of the signal it listens for.
        let mut signal_map = HashMap::with_capacity(all_futures.len());
        signal_map.insert(0, SignalKind::interrupt().as_raw_value());
        signal_map.insert(1, SignalKind::terminate().as_raw_value());
        signal_map.insert(2, SignalKind::alarm().as_raw_value());
        signal_map.insert(3, SignalKind::hangup().as_raw_value());
        signal_map.insert(4, SignalKind::quit().as_raw_value());
        signal_map.insert(5, SignalKind::user_defined1().as_raw_value());
        signal_map.insert(6, SignalKind::user_defined2().as_raw_value());

        let all_futures = all_futures
            .into_iter()
            .enumerate()
            .map(|(i, fut)| fut.map(move |res| (i, res)))
            .collect::<FuturesUnordered<_>>();

        // Use the collection as a stream and await only the first future to complete.
        let first_one = all_futures.take(1).collect::<Vec<_>>().await;
        let signal_index = first_one[0].0;
        // Whichever of these signals arrived, treat it as a shutdown request
        // and forward its raw value.
        let received = signal_map[&signal_index];
        info!("SIGNAL_GUARD: RECEIVED CODE {} FROM OS", received);
        signal_forwarder.send(received).unwrap();
        Option::Some(())
    }
}
```
```rust
// Adapted from: https://github.com/tokio-rs/mini-redis/blob/069a8e5ee0de445ad3e642548d196ef6b4fa8bc1/src/shutdown.rs
use tokio::sync::broadcast;

pub struct Shutdown {
    /// The receive half of the channel used to listen for shutdown.
    notify: broadcast::Receiver<i32>,
}

impl Shutdown {
    /// Create a new `Shutdown` backed by the given `broadcast::Receiver`.
    pub fn new(notify: broadcast::Receiver<i32>) -> Self {
        Self { notify }
    }

    #[allow(dead_code)]
    pub fn replicate(&self) -> Self {
        Self {
            notify: self.notify.resubscribe(),
        }
    }

    /// Receive the shutdown notice, waiting if necessary.
    pub async fn recv(&mut self) {
        // Cannot receive a "lag error" as only one value is ever sent.
        let _ = self.notify.recv().await;
    }
}
```
```rust
// Adapted from: https://github.com/tokio-rs/mini-redis/blob/069a8e5ee0de445ad3e642548d196ef6b4fa8bc1/src/shutdown.rs
use tokio::sync::broadcast;

pub struct Shutdown {
    /// The receive half of the channel used to listen for shutdown.
    notify: broadcast::Receiver<i32>,
    received: bool,
}

impl Shutdown {
    /// Create a new `Shutdown` backed by the given `broadcast::Receiver`.
    pub fn new(notify: broadcast::Receiver<i32>) -> Self {
        Self {
            notify,
            received: false,
        }
    }

    pub fn replicate(&self) -> Self {
        Self {
            notify: self.notify.resubscribe(),
            received: self.received,
        }
    }

    /// Receive the shutdown notice, waiting if necessary.
    pub async fn recv(&mut self) {
        if self.received {
            return;
        }
        // Cannot receive a "lag error" as only one value is ever sent.
        let _ = self.notify.recv().await;
        self.received = true;
    }
}
```
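As a usage sketch, a long-running task might pair this `Shutdown` handle with its own work via `tokio::select!`, in the spirit of the mini-redis code it was adapted from. The `do_work` function below is a placeholder introduced here for illustration, not part of the project.

```rust
use tokio::time::{sleep, Duration};

// Placeholder for the task's real work.
async fn do_work() {
    sleep(Duration::from_millis(100)).await;
}

// Assumes the `Shutdown` type from the listing above is in scope.
async fn worker(mut shutdown: Shutdown) {
    loop {
        tokio::select! {
            // Leave the loop as soon as the shutdown notice arrives.
            _ = shutdown.recv() => break,
            _ = do_work() => {}
        }
    }
}
```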
```rust
use tokio::time::{sleep, Duration};

pub struct Service {}

pub struct ServiceError {}

impl Service {
    pub fn new() -> Self {
        Self {}
    }

    pub async fn wait(&self) -> Result<u64, ServiceError> {
        sleep(Duration::from_secs(7)).await;
        Result::Ok(117)
    }

    pub async fn serve(&self, _handle: u64) -> Result<(), ServiceError> {
        sleep(Duration::from_secs(15)).await;
        Result::Ok(())
    }
}
```
```rust
use crate::shutdown::Shutdown;
use log::info;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;

pub struct Service {
    control: Shutdown,
}

impl Service {
    pub fn new(control: Shutdown) -> Self {
        Self { control }
    }

    pub async fn serve(&mut self) -> Option<()> {
        info!("SERVICE: RUN");
        let communication_loop = tokio::spawn(async move {
            let listener = TcpListener::bind("127.0.0.1:8080").await.ok()?;
            loop {
                // Wait for an incoming network request, dispatch it to the
                // communicator and repeat. This is the echo server example
                // from the Tokio documentation.
                let (mut socket, _) = listener.accept().await.ok()?;
                tokio::spawn(async move {
                    let mut buf = [0; 1024];
                    // In a loop, read data from the socket and write the data back.
                    loop {
                        let n = match socket.read(&mut buf).await {
                            // Socket closed.
                            Ok(n) if n == 0 => return Option::Some(()),
                            Ok(n) => n,
                            Err(e) => {
                                eprintln!("failed to read from socket; err = {:?}", e);
                                return Option::Some(());
                            }
                        };
                        // Write the data back.
                        if let Err(e) = socket.write_all(&buf[0..n]).await {
                            eprintln!("failed to write to socket; err = {:?}", e);
                            return Option::Some(());
                        }
                    }
                });
            }
            // TODO: Needs some cleanup, as this is required by the compiler
            // but generates an unreachable-code warning.
            Option::Some(())
        });
        self.control.recv().await;
        info!("SERVICE: RECEIVED CONTROL SIGNAL");
        communication_loop.abort();
        info!("COMMUNICATION: CANCELLED");
        info!("SERVICE: STOP");
        Option::Some(())
    }
}
```
```rust
// Fragment: the closing half of the request-handling code; the enclosing
// blocks (acquiring a handle and the async block serving it) open above this excerpt.
                match service_response {
                    Ok(_) => info!("SERVICE: PROCESSED (REASON: Handle {} has been processed.)", handle),
                    Err(_service_error) => error!("SERVICE: FAILURE (REASON: Request handling error for handle {}.)", handle),
                }
            });
        }
        Err(_service_error) => error!("SERVICE: FAILURE (REASON: Unable to acquire new handle for request servicing.)"),
    }
}

network_service.serve().await;
```
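For orientation, the sketch below shows one way the pieces above could be wired together in `main`, assuming the `SignalGuard`, `Shutdown` and the network-facing `Service` from the earlier listings are in scope. The channel capacity and the omission of the request-handling loop are simplifications made here for illustration, not the project's actual entry point.

```rust
use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    // Channel used by the signal guard to forward the received signal value.
    let (signal_forwarder, shutdown_receiver) = broadcast::channel::<i32>(1);

    // Listen for OS signals in the background and broadcast the first one.
    tokio::spawn(async move {
        SignalGuard::wait_for_stop(&signal_forwarder).await;
    });

    // The network-facing service stops once the shutdown notice arrives.
    let mut network_service = Service::new(Shutdown::new(shutdown_receiver));
    network_service.serve().await;
}
```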
# Concepts
This chapter describes the core concepts around the implementation and the IRC specification. Some material may be restated from the source but it is gathered here for reference purposes.
## IRC
For the core IRC concepts see: [IRC concepts](https://modern.ircdocs.horse/#irc-concepts)
## Implementation
### Client
A client is anything that communicates with our server using the IRC protocol but is not itself a server. It lives outside of this implementation and is modelled simply as an endpoint from which we receive bytes and to which we send bytes.
### Server
This is our implementation, which communicates with clients and with other servers in the same network. According to the IRC specification a network consists of servers, and although server-to-server protocols exist, in practice a single network runs one server implementation, so the servers can use whatever protocol they wish to communicate state changes between themselves. We will make use of this in our implementation.
### User
This entity models a real user (either a human or a bot) who uses a client to communicate with our server.
### Channel
Channels represent the logical conversation groups in an IRC network and are synced between the servers in the network. This entity ties users to the channels in which they can exchange messages.
### Capability
When a client connects to the server, the two first exchange information that tells the client which capabilities the server provides. This allows the client to establish sensible communication with the server. In the implementation the different capabilities are implemented as modules that are queried at runtime to provide the capability information to the client.
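To make the modular idea concrete, a capability could be modelled as a trait object that the server queries at runtime when it advertises its capabilities to a connecting client. The `Capability` trait, the `MultiPrefix` example and the `advertise` helper below are hypothetical sketches, not the project's actual types; `multi-prefix` is a standard IRCv3 capability token used here only as an example.

```rust
/// Hypothetical interface each capability module implements.
trait Capability {
    /// The token advertised during capability negotiation, e.g. "multi-prefix".
    fn token(&self) -> &'static str;
}

struct MultiPrefix;

impl Capability for MultiPrefix {
    fn token(&self) -> &'static str {
        "multi-prefix"
    }
}

/// Build the capability list the server sends in reply to the client's query.
fn advertise(capabilities: &[Box<dyn Capability>]) -> String {
    capabilities
        .iter()
        .map(|c| c.token())
        .collect::<Vec<_>>()
        .join(" ")
}

fn main() {
    let caps: Vec<Box<dyn Capability>> = vec![Box::new(MultiPrefix)];
    println!("CAP LS reply: {}", advertise(&caps));
}
```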
# Architecture
As a general principle we follow the modular monolith approach: there is a single executable for the application, but that application is composed of independent modules, each focused on a specific feature (capability), and the runtime ties these together. In addition there are modules that handle the low-level work of communication.
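As a rough illustration of the modular monolith idea, the single executable could be organised as one module per component listed below; these module names simply echo the component names and are not the project's actual file layout.

```rust
// Sketch of the single executable: each module focuses on one component and the
// runtime started in `main` ties them together.
mod server {/* drives the server instance and hosts the capability modules */}
mod pool {/* tracks server instances in the network and propagates state changes */}
mod state_manager {/* users, channels and other per-instance state */}
mod protocol_stack {/* IRC numerics and other protocol reference data */}
mod communicator {/* byte-level client I/O: message formatting and validation */}

fn main() {
    // Wire the modules together and start the async runtime here.
}
```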
## Components
### Server
This is the main high-level component that drives our server instance.
#### Capability
There are multiple capability components, each of which implements a specific piece of functionality for the server.
### Pool
This is the component which tracks the running server instances in the network it is attached to. A network can consist of a single server instance. State changes in this instance are then propagated to the other servers in the pool.
#### Server Communicator
This sub-component of the pool manages the communication between server instances.
### State Manager
This component maintains the state information (users, channels, etc.) for the current instance. It is linked to the pool component: the pool observes changes in the state manager and propagates them onwards, and vice versa, i.e. the pool may receive a state update from a different server instance and then signals those changes to this instance's state manager.
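One way to picture this link is as a pair of event channels between the state manager and the pool, one per direction. The `StateEvent` type, the channel choice and the capacities below are assumptions made for illustration, not the project's actual design.

```rust
use tokio::sync::mpsc;

/// Hypothetical state-change event shared between the state manager and the pool.
#[derive(Debug, Clone)]
enum StateEvent {
    UserJoined { nick: String, channel: String },
    UserParted { nick: String, channel: String },
}

#[tokio::main]
async fn main() {
    // Local changes flow from the state manager to the pool...
    let (to_pool, mut from_state) = mpsc::channel::<StateEvent>(64);
    // ...and remote changes flow from the pool back to the state manager.
    let (to_state, mut from_pool) = mpsc::channel::<StateEvent>(64);

    // Pool side: propagate local changes to the other servers (stubbed with a log line).
    tokio::spawn(async move {
        while let Some(event) = from_state.recv().await {
            println!("POOL: propagating {:?} to the network", event);
        }
    });

    // State-manager side: apply changes received from the network.
    tokio::spawn(async move {
        while let Some(event) = from_pool.recv().await {
            println!("STATE: applying remote {:?}", event);
        }
    });

    // One example event in each direction.
    to_pool
        .send(StateEvent::UserJoined { nick: "alice".into(), channel: "#rust".into() })
        .await
        .unwrap();
    to_state
        .send(StateEvent::UserParted { nick: "bob".into(), channel: "#rust".into() })
        .await
        .unwrap();

    // Give the spawned tasks a moment to drain the channels in this toy example.
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;
}
```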
### Protocol Stack
This component maintains reference information, such as the specific IRC code constants and similar protocol-level details, for the various communication and message components to consult.
### Communicator
This component is responsible for sending and receiving bytes between client and server. It ensures the correct formatting of messages and their validation. This component is also linked to the state manager.
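As a sketch of what correct formatting and validation could mean at the byte level, the helper below checks the basic IRC framing rules (a single CRLF-terminated line of at most 512 bytes, with no stray CR or LF inside). The function name and error type are hypothetical, not the communicator's actual API.

```rust
/// Hypothetical validation error for outgoing IRC lines.
#[derive(Debug)]
enum LineError {
    TooLong(usize),
    MissingCrLf,
    EmbeddedCrOrLf,
}

/// Check the basic IRC framing rules: one CRLF-terminated line of at most
/// 512 bytes, with no CR or LF anywhere except the terminator.
fn validate_line(line: &str) -> Result<(), LineError> {
    if line.len() > 512 {
        return Err(LineError::TooLong(line.len()));
    }
    let Some(body) = line.strip_suffix("\r\n") else {
        return Err(LineError::MissingCrLf);
    };
    if body.contains('\r') || body.contains('\n') {
        return Err(LineError::EmbeddedCrOrLf);
    }
    Ok(())
}

fn main() {
    assert!(validate_line("PRIVMSG #rust :hello\r\n").is_ok());
    assert!(validate_line("PING without terminator").is_err());
}
```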