KJK5LRQDGEQLVSS2SIOINZNNH6T4NKAJB24QWWSFLI3KZLLIDYPAC
while let Some(line) = lines.next_line().await.unwrap() {}
while let Some(line) = lines.next_line().await.unwrap() {
if let Ok(mut message) = serde_json::from_str::<Message>(&line) {
if message.from.is_none() {
message.from = Some(uuid);
}
if let Some(other) = ACTORS.lock().await.get(&message.to) {
let mut other = other.lock().await;
let message = serde_json::to_string(&message).unwrap();
if let Err(e) = other.stdin.write_all(message.as_bytes()).await {
eprintln!("{e}");
}
}
} else if let Ok(message) = serde_json::from_str::<SpawnActor>(&line) {
let mut spawn_requests = SPAWN_REQUESTS.lock().await;
spawn_requests.push((uuid, message));
} else {
eprintln!("Unknown message type: {line}");
}
}
let message = serde_json::from_str::<LogMessage>(&line).unwrap();
eprintln!(
"Child {}: {} [{}]",
uuid,
message.message,
message
.tags
.into_iter()
.map(|(k, v)| format!("{}: {}", k, v))
.collect::<Vec<_>>()
.join(", ")
);
if let Ok(message) = serde_json::from_str::<LogMessage>(&line) {
eprintln!(
"Child {}: {} [{}]",
uuid,
message.message,
message
.tags
.into_iter()
.map(|(k, v)| format!("{}: {}", k, v))
.collect::<Vec<_>>()
.join(", ")
);
} else {
eprintln!("Child {}: {}", uuid, line);
}
}
if let Some(mut stdin) = child.stdin.take() {
let init_message = Message {
to: uuid,
from: None,
payload: serde_json::json!({
"id": uuid,
}),
};
let msg = serde_json::to_string(&init_message)?;
stdin
.write_all(msg.as_bytes())
.await
.expect("Failed to write to child stdin");
let mut actors = ACTORS.lock().await;
actors.insert(uuid, Arc::new(Mutex::new(Actor::new(stdin))));
_ = spawn_actor(executable).await?;
spawn(async {
let mut spawn_requests = SPAWN_REQUESTS.lock().await;
loop {
if let Some((uuid, request)) = spawn_requests.pop() {
let new_uid = spawn_actor(request.executable, false).await.unwrap();
let message = Message {
from: None,
to: uuid,
payload: serde_json::json!({
"type": "spawned",
"status": "ok",
"id": new_uid,
}),
};
let actors = ACTORS.lock().await;
let mut actor = actors.get(&uuid).unwrap().lock().await;
actor
.stdin
.write_all(serde_json::to_string(&message).unwrap().as_bytes())
.await
.unwrap();
}
}
});
use std::io::{self, BufRead};
use indexmap::IndexMap;
use tokio::io::AsyncWriteExt;
use uaf::{LogMessage, Message};
fn log(level: &str, message: &str) {
let log_message = LogMessage {
level: level.to_string(),
message: message.to_string(),
tags: IndexMap::new(),
};
let serialized = serde_json::to_string(&log_message).unwrap();
eprintln!("{}", serialized);
}
fn send_message(message: &Message) {
let serialized = serde_json::to_string(&message).unwrap();
println!("{}", serialized);
}
#[tokio::main]
async fn main() {
let stdin = io::stdin();
let mut reader = stdin.lock();
let mut line = String::new();
let mut count = 0;
while let Ok(_) = reader.read_line(&mut line) {
count += 1;
let message: Message = serde_json::from_str(&line).expect("Failed to parse message");
// Handle incoming data message
log("info", &format!("Received message({count}): {message:?}"));
// Example of sending a message back
let response = Message {
from: None,
to: message.to,
payload: serde_json::json!({"status": "ok", "count": count}),
};
send_message(&response);
}
log("info", "Actor shutting down");
tokio::io::stdout().flush().await.unwrap();
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}