XJJ6HCY4AYXPWPAGUQEKY2XNPEDV36H6OJKYRPKXC2BIASCLCSMAC
VCTIMI5CEMFTOMDWYI7S2ZKSFN5XYUXH4GBAIKSEZ2N3SBLSH26QC
VEN5WJYRT23IT77JAAZ5CJRSW3GUTTNMAECT3WVTHQA34HI4646AC
B2BZZXWMRR2FYPBMSSQ6WBCOWX5R3JOXSESX4LFUAQF3FOS4APBQC
F2QYIRKBFYFWSTB7Z5CNGSZVYI6XQO3MHPUHJLBHPGLIOG7UNVNQC
I5WVRUHGAQGWFZGX7YBKCGQKLHXZHZTCBWFIUCXVGY2WVFB77VQQC
FTI67CGF4MMPDFA6YJN6UKOADQLFAECKGYPTWSPSALVQK76BJMJAC
TUBVSWW3OQX2OJKF2F5ACCPL3G6U6SNJQ3EWOSPBCV6EIZPM2LMAC
IA2CJ4HDSST6MPDLX2FVE2ZZR7EP5O6AIAUFGZIMWJ6NODVMKOKAC
AIF5IVL7B5S2X3N4RLS6GNKUCASQZPOH6NCVGECUFHQ5ZUJVDU6QC
T2HK3ZSDLLLGHROM77MND4UZF663WSHT5J2CT7ZMXDH6MMIZFOAQC
B2MSSEJB4GIBFA6F2HRMK7FXUHJBZIZFK6NOHY7YZUSHRB53URGQC
42MNZNAH4QWQPYFUEMXD2OCDH5PRANP4IXEIK3SBYEOMJXTOKQ7QC
4KBRNRLMAVPUVMU2EYNRJTTOZPQRGV2YHZNMLNGBA4Z3QAXKWNKAC
4LR5AJ4AX277G4G3SRDHNCIJIOCRZ6I4TUNTSZOAVUX4MW5GYFTQC
FKTISISENWXRFIQHUBCSWQAAN7OWUWIOD6R64YEQOBZ6VENZUWYQC
AMFKSNF2C3Y4Z5UNFMUFRSZUOWKQCBMCRFFYW44ZJLSPGVI3VFOAC
K7M77GF5ILC4KKKYPTLZRZ2OND7DOQDQNRYKM3N6XV2DMJURYA3QC
use super::Builder;
use crate::{
client,
message::Message,
store::Store,
};
use crossbeam_utils::sync::WaitGroup;
use flume::{
Receiver,
Sender,
};
use log::warn;
use std::{
os::unix::net::UnixDatagram,
path::PathBuf,
sync::{
atomic::{
AtomicBool,
Ordering,
},
Arc,
},
thread,
};
use thiserror::Error;
/// Size in bytes of the stack buffer that `Server::receive` hands to `recv_from`
/// when reading a single datagram from the socket.
// NOTE(review): 65_527 looks like a maximum datagram payload size — confirm the origin.
const BUFFER_SIZE: usize = 65_527;
/// Errors produced by the server's receive, processing and shutdown paths.
#[derive(Error, Debug)]
pub enum Error {
/// Reading a datagram from the unix socket failed.
#[error("can not receive message from socket: {0}")]
ReceiveFromSocket(std::io::Error),
/// The received bytes could not be handed to the processing channel.
#[error("can not send received data to processing: {0}")]
SendBuffer(flume::SendError<Vec<u8>>),
/// The received bytes are not a valid bincode-encoded `Message`.
#[error("can not deserialize message: {0}")]
DeserializeMessage(bincode::Error),
/// Receiving from the processing channel failed.
#[error("can not receive data from channel: {0}")]
ReceiveData(flume::RecvError),
/// The socket file could not be removed during shutdown.
#[error("can not remove socket: {0}")]
RemoveSocket(std::io::Error),
/// Installing the Ctrl-C handler failed.
#[error("can not setup ctrlc handler: {0}")]
SetupCtrlHandler(ctrlc::Error),
}
/// State owned by a running server; consumed by `Server::run`.
pub struct Server {
/// Sled database handle (not read by `run` in this version).
pub(super) entries: sled::Db,
/// Datagram socket the receiver thread reads messages from.
pub(super) socket: UnixDatagram,
/// Filesystem path of `socket`; the file is removed when `run` finishes.
pub(super) socket_path: PathBuf,
/// History store (not read by `run` in this version).
pub(super) store: Store,
/// Shared flag that tells the worker threads to leave their loops.
pub(super) stopping: Arc<AtomicBool>,
/// Wait group `run` blocks on until every worker thread has dropped its clone.
pub(super) wait_group: WaitGroup,
}
/// Creates a [`Builder`] holding the cache directory, data directory and socket path
/// that the server will be built from.
pub fn builder(cache_dir: PathBuf, data_dir: PathBuf, socket: PathBuf) -> Builder {
    Builder { cache_dir, data_dir, socket }
}
impl Server {
    /// Runs the server until a stop is requested, then removes the socket file.
    ///
    /// Starts the processor thread and the receiver thread, installs the Ctrl-C
    /// handler and blocks until both threads have dropped their wait-group clone.
    ///
    /// # Errors
    ///
    /// Returns [`Error::SetupCtrlHandler`] if the Ctrl-C handler cannot be installed
    /// and [`Error::RemoveSocket`] if the socket file cannot be removed afterwards.
    pub fn run(self) -> Result<(), Error> {
        let data_sender = Self::start_processor(
            Arc::clone(&self.stopping),
            self.wait_group.clone(),
            self.socket_path.clone(),
        )?;
        Self::start_receiver(
            Arc::clone(&self.stopping),
            self.wait_group.clone(),
            self.socket,
            data_sender,
        )?;
        Self::ctrl_c_watcher(self.stopping, self.socket_path.clone())?;
        self.wait_group.wait();
        std::fs::remove_file(&self.socket_path).map_err(Error::RemoveSocket)?;
        Ok(())
    }

    /// Installs a Ctrl-C handler that sets the stop flag and then sends
    /// `Message::Stop` to the server's own socket so the receiver loop gets a
    /// datagram and re-checks the flag.
    fn ctrl_c_watcher(stopping: Arc<AtomicBool>, socket_path: PathBuf) -> Result<(), Error> {
        ctrlc::set_handler(move || {
            stopping.store(true, Ordering::SeqCst);
            let client = client::new(socket_path.clone());
            if let Err(err) = client.send(&Message::Stop) {
                warn!("{}", err);
            }
        })
        .map_err(Error::SetupCtrlHandler)?;
        Ok(())
    }

    /// Spawns the thread that reads datagrams from `socket` and forwards them to
    /// `data_sender` until the stop flag is set. Receive errors are logged and the
    /// loop continues.
    fn start_receiver(
        stopping: Arc<AtomicBool>,
        wait_group: WaitGroup,
        socket: UnixDatagram,
        data_sender: Sender<Vec<u8>>,
    ) -> Result<(), Error> {
        thread::spawn(move || {
            while !stopping.load(Ordering::SeqCst) {
                if let Err(err) = Self::receive(&socket, &data_sender) {
                    warn!("{}", err)
                }
            }
            // Dropping the clone tells `run` that this thread has finished.
            drop(wait_group)
        });
        Ok(())
    }

    /// Reads one datagram from `socket` and sends its payload to `data_sender`.
    fn receive(socket: &UnixDatagram, data_sender: &Sender<Vec<u8>>) -> Result<(), Error> {
        let mut buffer = [0_u8; BUFFER_SIZE];
        let (written, _) = socket
            .recv_from(&mut buffer)
            .map_err(Error::ReceiveFromSocket)?;
        data_sender
            .send(buffer[0..written].to_vec())
            .map_err(Error::SendBuffer)?;
        Ok(())
    }

    /// Spawns the thread that processes received payloads until the stop flag is
    /// set, and returns the sending half of its input channel.
    fn start_processor(
        stopping: Arc<AtomicBool>,
        wait_group: WaitGroup,
        socket_path: PathBuf,
    ) -> Result<Sender<Vec<u8>>, Error> {
        let (data_sender, data_receiver) = flume::unbounded();
        thread::spawn(move || {
            while !stopping.load(Ordering::SeqCst) {
                if let Err(err) = Self::process(&stopping, &data_receiver, &socket_path) {
                    warn!("{}", err)
                }
            }
            // Dropping the clone tells `run` that this thread has finished.
            drop(wait_group)
        });
        Ok(data_sender)
    }

    /// Takes one payload from the channel, decodes it and acts on it.
    ///
    /// Messages other than `Message::Stop` are not handled yet; they are logged and
    /// skipped instead of panicking the processor thread.
    fn process(
        stopping: &AtomicBool,
        data_receiver: &Receiver<Vec<u8>>,
        socket_path: &std::path::Path,
    ) -> Result<(), Error> {
        let data = data_receiver.recv().map_err(Error::ReceiveData)?;
        let message: Message = bincode::deserialize(&data).map_err(Error::DeserializeMessage)?;
        match message {
            Message::Stop => {
                // `swap` returns the previous value. Only the first transition to
                // "stopping" has to wake the receiver, which may still be waiting
                // in `recv_from`; the Ctrl-C handler already sends its own datagram.
                if !stopping.swap(true, Ordering::SeqCst) {
                    let client = client::new(socket_path.to_path_buf());
                    if let Err(err) = client.send(&Message::Stop) {
                        warn!("{}", err);
                    }
                }
            }
            other => warn!("ignoring unsupported message: {:?}", other),
        }
        Ok(())
    }
}
use thiserror::Error;
const BUFFER_SIZE: usize = 65_527;
#[derive(Error, Debug)]
pub enum Error {
#[error("command for session already started")]
SessionCommandAlreadyStarted,
#[error("command for session not started yet")]
SessionCommandNotStarted,
impl Server {
let (data_sender, data_receiver) = flume::unbounded();
let (signal_sender, signal_receiver) = flume::unbounded();
let stopping = Arc::new(AtomicBool::new(false));
Self::ctrl_c_watcher(signal_sender)?;
Self::signal_watcher(signal_receiver, Arc::clone(&stopping));
Self::start_processor(data_receiver);
loop {
}
}
let writer = std::io::BufWriter::new(file);
Ok(self)
}
fn signal_watcher(signal_receiver: Receiver<RunState>, stopping: Arc<AtomicBool>) {
thread::spawn(move || loop {
match signal_receiver.recv() {
Ok(signal) => match signal {
RunState::Stop => stopping.store(true, Ordering::SeqCst),
RunState::Continue => continue,
},
Err(err) => warn!("{}", err),
}
});
serde_json::to_writer(writer, &self.state).map_err(Error::SerializeState)?;
let file = std::fs::File::create(&self.cache_path).map_err(Error::CreateCacheFile)?;
}
match message {
Message::CommandStart(data) => self.command_start(data),
Message::Running => self.command_running(),
Message::Disable(uuid) => self.disable_session(uuid),
Message::Enable(uuid) => self.enable_session(uuid),
}
}
Ok(RunState::Continue)
}
}
}
/// Marks `session_id` as disabled and discards any command start recorded for it.
///
/// Always returns `Ok(RunState::Continue)`.
fn disable_session(&mut self, session_id: Uuid) -> Result<RunState, Error> {
    // A session can be disabled while no command is in flight, so a missing entry
    // is expected here and must not panic the server.
    self.state.entries.remove(&session_id);
    self.state.disabled_session.insert(session_id);
    Ok(RunState::Continue)
}
/// Removes `session_id` from the set of disabled sessions.
///
/// Always returns `Ok(RunState::Continue)`, whether or not the session was disabled.
fn enable_session(&mut self, session_id: Uuid) -> Result<RunState, Error> {
    let disabled = &mut self.state.disabled_session;
    disabled.remove(&session_id);
    Ok(RunState::Continue)
}
fn command_running(&self) -> Result<RunState, Error> {
info!(
"session_id={session_id}, command={command}",
session_id = session_id,
command = entry.command
)
});
Ok(RunState::Continue)
self.state.entries.iter().for_each(|(session_id, entry)| {
self.store.add(&entry).map_err(Error::AddStore)?;
}
return Err(Error::SessionCommandAlreadyStarted);
}
}
return Err(Error::SessionCommandNotStarted);
}
let start = self
.entries
.remove(&finish.session_id)
.expect("already tested if exists");
let entry = Entry::from_messages(start, finish);
.state
fn command_finished(&mut self, finish: &CommandFinished) -> Result<RunState, Error> {
if self.state.disabled_session.contains(&finish.session_id) {
return Err(Error::DisabledSession(finish.session_id));
}
if !self.state.entries.contains_key(&finish.session_id) {
Ok(RunState::Continue)
if self.state.disabled_session.contains(&start.session_id) {
return Err(Error::DisabledSession(start.session_id));
}
self.state.entries.insert(start.session_id, start);
fn command_start(&mut self, start: CommandStart) -> Result<RunState, Error> {
if self.state.entries.contains_key(&start.session_id) {
fn start_processor(data_receiver: Receiver<Vec<u8>>) -> Result<(), Error> {
loop {
let buffer = match data_receiver.recv() {
Ok(b) => b,
Err(err) => {
warn!("{}", err);
continue;
}
};
let message = bincode::deserialize(&buffer).map_err(Error::DeserializeMessage)?;
}
Ok(())
}
let mut buffer = [0_u8; BUFFER_SIZE];
fn receive(socket: &UnixDatagram, data_sender: &Sender<Vec<u8>>) -> Result<(), Error> {
Message::CommandFinished(data) => self.command_finished(&data),
Message::Stop => Ok(RunState::Stop),
let (written, _) = socket
.recv_from(&mut buffer)
.map_err(Error::ReceiveFromSocket)?;
data_sender
.send(buffer[0..written].to_vec())
.map_err(Error::SendBuffer)?;
fn ctrl_c_watcher(signal_sender: Sender<RunState>) -> Result<(), Error> {
ctrlc::set_handler(move || {
if let Err(err) = signal_sender.send(RunState::Stop) {
warn!("{}", err)
}
})
.map_err(Error::SetupCtrlHandler)?;
Ok(())
}
fn process(&mut self, message: Message) -> Result<RunState, Error> {
std::fs::remove_file(&socket_path).map_err(Error::RemoveSocket)?;
if let Err(err) = Self::receive(&socket, &data_sender) {
warn!("{}", err)
if stopping.load(Ordering::SeqCst) {
break;
}
let socket_path_parent = socket_path.parent().ok_or(Error::NoSocketPathParent)?;
std::fs::create_dir_all(socket_path_parent).map_err(Error::CreateSocketPathParent)?;
info!("starting server listening on path {:?}", socket_path);
let socket = UnixDatagram::bind(&socket_path).map_err(Error::BindSocket)?;
pub fn start(mut self, socket_path: &PathBuf) -> Result<Self, Error> {
fn from_cachefile(cache_path: PathBuf, data_dir: PathBuf) -> Result<Server, Error> {
let reader = std::io::BufReader::new(file);
let store = store::new(data_dir);
Ok(Server {
store,
cache_path,
})
}
state,
let state = serde_json::from_reader(reader).map_err(Error::DeserializeState)?;
let file = std::fs::File::open(&cache_path).map_err(Error::OpenCacheFile)?;
}
}
pub fn new(cache_path: PathBuf, data_dir: PathBuf) -> Result<Server, Error> {
if cache_path.exists() {
from_cachefile(cache_path, data_dir)
} else {
Ok(Server {
store: store::new(data_dir),
cache_path,
})
state: State::default(),
/// Server state that is serialized to and deserialized from the cache file as JSON.
#[derive(Debug, Default, Serialize, Deserialize)]
struct State {
/// Started commands, keyed by the session id they were inserted under.
entries: HashMap<Uuid, CommandStart>,
/// Session ids for which start/finish messages are rejected with `DisabledSession`.
disabled_session: HashSet<Uuid>,
}
#[error("can not add to storeo: {0}")]
AddStore(crate::store::Error),
}
match self {
}
}
}
#[derive(Debug)]
pub struct Server {
store: Store,
cache_path: PathBuf,
}
state: State,
Self::Stop => true,
Self::Continue => false,
/// Outcome of handling a message or signal: whether the server should keep running.
#[derive(Debug)]
enum RunState {
/// Stop requested (sent by the Ctrl-C handler and returned for `Message::Stop`).
Stop,
/// Keep running.
Continue,
}
impl RunState {
const fn is_stop(&self) -> bool {
#[error("can not open cache file: {0}")]
OpenCacheFile(std::io::Error),
#[error("can not deserialize cache entries: {0}")]
#[error("can not bind to socket: {0}")]
BindSocket(std::io::Error),
#[error("can not remove socket: {0}")]
RemoveSocket(std::io::Error),
#[error("can not create cache file: {0}")]
CreateCacheFile(std::io::Error),
#[error("can not serialize cache entries: {0}")]
#[error("can not receive message from socket: {0}")]
ReceiveFromSocket(std::io::Error),
#[error("no parent directory for socket path")]
NoSocketPathParent,
#[error("can not create socket parent directory: {0}")]
CreateSocketPathParent(std::io::Error),
#[error("not recording because session {0} is disabled")]
DisabledSession(Uuid),
#[error("can not send received data to processing: {0}")]
SendBuffer(flume::SendError<Vec<u8>>),
#[error("can not setup ctrlc handler: {0}")]
SetupCtrlHandler(ctrlc::Error),
SerializeState(serde_json::Error),
DeserializeState(serde_json::Error),
use uuid::Uuid;
use crate::{
entry::Entry,
message::{
CommandFinished,
CommandStart,
Message,
},
store,
use std::{
os::unix::net::UnixDatagram,
path::PathBuf,
sync::{
atomic::{
AtomicBool,
Ordering,
},
Arc,
},
thread,
};
collections::{
HashMap,
HashSet,
},
store::Store,
};
use flume::{
Receiver,
Sender,
};
use log::{
info,
warn,
};
use serde::{
Deserialize,
Serialize,
};
use crate::message::CommandStart;
use serde::Serialize;
use std::path::Path;
use thiserror::Error;
use uuid::Uuid;
/// Errors produced by the sled-backed [`Db`].
#[derive(Error, Debug)]
pub enum Error {
/// Opening the `entries` database failed.
#[error("can not open entries database: {0}")]
OpenEntriesDatabase(sled::Error),
/// Opening the `disabled_sessions` database failed.
#[error("can not open disabled_sessions database: {0}")]
OpenDisabledSessionsDatabase(sled::Error),
/// Encoding a key or value with bincode failed.
#[error("can not serialize data: {0}")]
SerializeData(bincode::Error),
/// Decoding a stored entry with bincode failed.
#[error("can not deserialize entry: {0}")]
DeserializeEntry(bincode::Error),
/// Any other sled error, converted automatically through `?`.
#[error("{0}")]
Sled(#[from] sled::Error),
/// `remove_entry` found no entry stored under the given key.
#[error("entry does not exist in db")]
EntryNotExist,
}
pub fn new(path: impl AsRef<Path>) -> Result<Db, Error> {
let entries = sled::open(path.as_ref().join("entries")).map_err(Error::OpenEntriesDatabase)?;
let disabled_sessions = sled::open(path.as_ref().join("disabled_sessions"))
.map_err(Error::OpenDisabledSessionsDatabase)?;
Ok(Db {
entries,
disabled_sessions,
})
}
/// Pair of sled databases holding started commands and disabled sessions.
pub struct Db {
/// Started commands, keyed by the bincode-serialized session id.
entries: sled::Db,
/// Disabled sessions, keyed by the bincode-serialized session id.
disabled_sessions: sled::Db,
}
impl Db {
pub fn contains_entry(&self, uuid: &Uuid) -> Result<bool, Error> {
let key = Self::serialize(uuid)?;
let contains = self.entries.contains_key(key)?;
Ok(contains)
}
pub fn is_session_disabled(&self, uuid: &Uuid) -> Result<bool, Error> {
let key = Self::serialize(uuid)?;
let contains = self.disabled_sessions.contains_key(key)?;
Ok(contains)
}
pub fn add_entry(&self, entry: &CommandStart) -> Result<(), Error> {
let key = Self::serialize(&entry.session_id)?;
let value = Self::serialize(&entry)?;
self.entries.insert(key, value)?;
Ok(())
}
pub fn remove_entry(&self, uuid: &Uuid) -> Result<CommandStart, Error> {
let key = Self::serialize(uuid)?;
let data = self.entries.remove(key)?.ok_or(Error::EntryNotExist)?;
let entry = Self::deserialize_entry(&data)?;
Ok(entry)
}
pub fn disable_session(&self, uuid: &Uuid) -> Result<(), Error> {
let key = Self::serialize(uuid)?;
let value = Self::serialize(true)?;
self.disabled_sessions.insert(key, value)?;
self.remove_entry(&uuid)?;
Ok(())
}
pub fn enable_session(&self, uuid: &Uuid) -> Result<(), Error> {
let key = Self::serialize(uuid)?;
self.disabled_sessions.remove(&key)?;
Ok(())
}
fn serialize(data: impl Serialize) -> Result<Vec<u8>, Error> {
let bytes = bincode::serialize(&data).map_err(Error::SerializeData)?;
Ok(bytes)
}
fn deserialize_entry(data: &sled::IVec) -> Result<CommandStart, Error> {
let entry = bincode::deserialize(&data).map_err(Error::DeserializeEntry)?;
Ok(entry)
}
}
pub use server::{
builder,
Error as ServerError,
Server,
pub use Error as ServerError;
use crate::{
client,
entry::Entry,
message::{
CommandFinished,
CommandStart,
Message,
},
store::Store,
};
use crossbeam_utils::sync::WaitGroup;
use db::Db;
use flume::{
Receiver,
Sender,
};
use log::warn;
use std::{
os::unix::net::UnixDatagram,
path::{
Path,
PathBuf,
},
sync::{
atomic::{
AtomicBool,
Ordering,
},
Arc,
},
thread,
use thiserror::Error;
use uuid::Uuid;
/// Size in bytes of the stack buffer that `Server::receive` hands to `recv_from`
/// when reading a single datagram from the socket.
// NOTE(review): 65_527 looks like a maximum datagram payload size — confirm the origin.
const BUFFER_SIZE: usize = 65_527;
#[derive(Error, Debug)]
pub enum Error {
#[error("can not receive message from socket: {0}")]
ReceiveFromSocket(std::io::Error),
#[error("can not send received data to processing: {0}")]
SendBuffer(flume::SendError<Vec<u8>>),
#[error("can not deserialize message: {0}")]
DeserializeMessage(bincode::Error),
#[error("can not receive data from channel: {0}")]
ReceiveData(flume::RecvError),
#[error("can not remove socket: {0}")]
RemoveSocket(std::io::Error),
#[error("can not setup ctrlc handler: {0}")]
SetupCtrlHandler(ctrlc::Error),
#[error("command for session already started")]
SessionCommandAlreadyStarted,
#[error("command for session not started yet")]
SessionCommandNotStarted,
#[error("can not check if key exists in db: {0}")]
CheckContainsEntry(db::Error),
#[error("can not check if session is disabled in db: {0}")]
CheckDisabledSession(db::Error),
#[error("not recording because session {0} is disabled")]
DisabledSession(Uuid),
#[error("can not add entry to db: {0}")]
AddDbEntry(db::Error),
#[error("can not remove entry from db: {0}")]
RemoveDbEntry(db::Error),
#[error("can not add to storeo: {0}")]
AddStore(crate::store::Error),
#[error("db error: {0}")]
Db(#[from] db::Error),
}
/// State owned by a running server; consumed by `Server::run`.
pub struct Server {
/// Database of started commands and disabled sessions; moved into the processor thread.
pub(super) db: Db,
/// Datagram socket the receiver thread reads messages from.
pub(super) socket: UnixDatagram,
/// Filesystem path of `socket`; the file is removed when `run` finishes.
pub(super) socket_path: PathBuf,
/// Store that finished entries are added to; moved into the processor thread.
pub(super) store: Store,
/// Shared flag that tells the worker threads to leave their loops.
pub(super) stopping: Arc<AtomicBool>,
/// Wait group `run` blocks on until every worker thread has dropped its clone.
pub(super) wait_group: WaitGroup,
}
/// Creates a [`Builder`] holding the cache directory, data directory and socket path
/// that the server will be built from.
pub fn builder(cache_dir: PathBuf, data_dir: PathBuf, socket: PathBuf) -> Builder {
    Builder { cache_dir, data_dir, socket }
}
impl Server {
    /// Runs the server until a stop is requested, then removes the socket file.
    ///
    /// Starts the processor thread and the receiver thread, installs the Ctrl-C
    /// handler and blocks until both threads have dropped their wait-group clone.
    ///
    /// # Errors
    ///
    /// Returns [`Error::SetupCtrlHandler`] if the Ctrl-C handler cannot be installed
    /// and [`Error::RemoveSocket`] if the socket file cannot be removed afterwards.
    pub fn run(self) -> Result<(), Error> {
        let data_sender = Self::start_processor(
            Arc::clone(&self.stopping),
            self.wait_group.clone(),
            self.db,
            self.store,
            self.socket_path.clone(),
        )?;
        Self::start_receiver(
            Arc::clone(&self.stopping),
            self.wait_group.clone(),
            self.socket,
            data_sender,
        )?;
        Self::ctrl_c_watcher(self.stopping, self.socket_path.clone())?;
        self.wait_group.wait();
        std::fs::remove_file(&self.socket_path).map_err(Error::RemoveSocket)?;
        Ok(())
    }

    /// Installs a Ctrl-C handler that sets the stop flag and then sends
    /// `Message::Stop` to the server's own socket so the receiver loop gets a
    /// datagram and re-checks the flag.
    fn ctrl_c_watcher(stopping: Arc<AtomicBool>, socket_path: PathBuf) -> Result<(), Error> {
        ctrlc::set_handler(move || {
            stopping.store(true, Ordering::SeqCst);
            let client = client::new(socket_path.clone());
            if let Err(err) = client.send(&Message::Stop) {
                warn!("{}", err);
            }
        })
        .map_err(Error::SetupCtrlHandler)?;
        Ok(())
    }

    /// Spawns the thread that reads datagrams from `socket` and forwards them to
    /// `data_sender` until the stop flag is set. Receive errors are logged and the
    /// loop continues.
    fn start_receiver(
        stopping: Arc<AtomicBool>,
        wait_group: WaitGroup,
        socket: UnixDatagram,
        data_sender: Sender<Vec<u8>>,
    ) -> Result<(), Error> {
        thread::spawn(move || {
            while !stopping.load(Ordering::SeqCst) {
                if let Err(err) = Self::receive(&socket, &data_sender) {
                    warn!("{}", err)
                }
            }
            // Dropping the clone tells `run` that this thread has finished.
            drop(wait_group)
        });
        Ok(())
    }

    /// Reads one datagram from `socket` and sends its payload to `data_sender`.
    fn receive(socket: &UnixDatagram, data_sender: &Sender<Vec<u8>>) -> Result<(), Error> {
        let mut buffer = [0_u8; BUFFER_SIZE];
        let (written, _) = socket
            .recv_from(&mut buffer)
            .map_err(Error::ReceiveFromSocket)?;
        data_sender
            .send(buffer[0..written].to_vec())
            .map_err(Error::SendBuffer)?;
        Ok(())
    }

    /// Spawns the thread that processes received payloads until the stop flag is
    /// set, and returns the sending half of its input channel.
    fn start_processor(
        stopping: Arc<AtomicBool>,
        wait_group: WaitGroup,
        db: Db,
        store: Store,
        socket_path: PathBuf,
    ) -> Result<Sender<Vec<u8>>, Error> {
        let (data_sender, data_receiver) = flume::unbounded();
        thread::spawn(move || {
            while !stopping.load(Ordering::SeqCst) {
                if let Err(err) =
                    Self::process(&stopping, &data_receiver, &db, &store, &socket_path)
                {
                    warn!("{}", err)
                }
            }
            // Dropping the clone tells `run` that this thread has finished.
            drop(wait_group)
        });
        Ok(data_sender)
    }

    /// Takes one payload from the channel, decodes it and dispatches on the message.
    ///
    /// # Errors
    ///
    /// Returns the channel, decoding or handler error; the caller logs it and keeps
    /// looping.
    fn process(
        stopping: &Arc<AtomicBool>,
        data_receiver: &Receiver<Vec<u8>>,
        db: &Db,
        store: &Store,
        socket_path: impl AsRef<Path>,
    ) -> Result<(), Error> {
        let data = data_receiver.recv().map_err(Error::ReceiveData)?;
        let message: Message = bincode::deserialize(&data).map_err(Error::DeserializeMessage)?;
        match message {
            Message::Stop => {
                // `swap` returns the previous value. Only the first transition to
                // "stopping" has to wake the receiver, which may still be waiting
                // in `recv_from`; the Ctrl-C handler already sends its own datagram,
                // so repeating it would only produce a redundant send.
                if !stopping.swap(true, Ordering::SeqCst) {
                    let client = client::new(socket_path.as_ref().to_path_buf());
                    if let Err(err) = client.send(&Message::Stop) {
                        warn!("{}", err);
                    }
                }
                Ok(())
            }
            Message::CommandStart(data) => Self::command_start(db, &data),
            Message::CommandFinished(data) => Self::command_finished(db, store, &data),
            Message::Disable(uuid) => Self::disable_session(db, &uuid),
            Message::Enable(uuid) => Self::enable_session(db, &uuid),
        }
    }

    /// Records a started command for its session.
    ///
    /// # Errors
    ///
    /// Returns [`Error::SessionCommandAlreadyStarted`] if the session already has a
    /// started command and [`Error::DisabledSession`] if the session is disabled.
    fn command_start(db: &Db, data: &CommandStart) -> Result<(), Error> {
        if db
            .contains_entry(&data.session_id)
            .map_err(Error::CheckContainsEntry)?
        {
            return Err(Error::SessionCommandAlreadyStarted);
        }
        if db
            .is_session_disabled(&data.session_id)
            .map_err(Error::CheckDisabledSession)?
        {
            return Err(Error::DisabledSession(data.session_id));
        }
        db.add_entry(data).map_err(Error::AddDbEntry)?;
        Ok(())
    }

    /// Pairs a finish message with the started command of the same session and adds
    /// the resulting entry to the store.
    ///
    /// # Errors
    ///
    /// Returns [`Error::DisabledSession`] if the session is disabled and
    /// [`Error::SessionCommandNotStarted`] if no started command exists for it.
    fn command_finished(db: &Db, store: &Store, data: &CommandFinished) -> Result<(), Error> {
        if db
            .is_session_disabled(&data.session_id)
            .map_err(Error::CheckDisabledSession)?
        {
            return Err(Error::DisabledSession(data.session_id));
        }
        if !db
            .contains_entry(&data.session_id)
            .map_err(Error::CheckContainsEntry)?
        {
            return Err(Error::SessionCommandNotStarted);
        }
        let start = db
            .remove_entry(&data.session_id)
            .map_err(Error::RemoveDbEntry)?;
        let entry = Entry::from_messages(start, data);
        store.add(&entry).map_err(Error::AddStore)?;
        Ok(())
    }

    /// Marks session `uuid` as disabled in the database.
    fn disable_session(db: &Db, uuid: &Uuid) -> Result<(), Error> {
        db.disable_session(uuid)?;
        Ok(())
    }

    /// Clears the disabled marker of session `uuid` in the database.
    fn enable_session(db: &Db, uuid: &Uuid) -> Result<(), Error> {
        db.enable_session(uuid)?;
        Ok(())
    }
}