Forwarding stdout/stderr
Dependencies
- [2]
KOWYPLMX Nix and config.toml - [3]
HX4TXY2D Fixed-output derivations enable the network - [4]
SI454P2V Documentation and cleanup - [5]
UWQB743K First working shell (with ocaml code) - [6]
ODUDDQRY Adding the OCaml interface - [7]
6MGFBMON Debug and cleanup
Change contents
- edit in src/main.rs at line 8
use tokio_stream::wrappers::ReceiverStream; - replacement in src/main.rs at line 21
tokio::sync::oneshot::Sender<Result<PathBuf, String>>,tokio::sync::mpsc::Sender<crate::container::Msg>, - edit in src/main.rs at line 24
use std::pin::Pin;type ResponseStream =Pin<Box<dyn tokio_stream::Stream<Item = Result<proto::DerivationReply, tonic::Status>> + Send>>; - edit in src/main.rs at line 92
type DerivationStream = ResponseStream; - replacement in src/main.rs at line 98
) -> Result<tonic::Response<proto::DerivationReply>, tonic::Status> {) -> Result<tonic::Response<ResponseStream>, tonic::Status> { - replacement in src/main.rs at line 101
let (tx, rx) = tokio::sync::oneshot::channel();let (tx, rx) = tokio::sync::mpsc::channel(200); - replacement in src/main.rs at line 114
Ok(tonic::Response::new(match rx.await.unwrap() {Ok(out) => proto::DerivationReply {result: Some(proto::derivation_reply::Result::Ok(proto::DerivationResult {destdir: vec![out.to_str().unwrap().to_string()],paths: Vec::new(),path_patterns: Vec::new(),},)),},Err(e) => proto::DerivationReply {result: Some(proto::derivation_reply::Result::Error(e)),},}))use crate::container::Msg;let output_stream = ReceiverStream::new(rx).map(|x| {Ok(match x {Msg::Ok(out) => proto::DerivationReply {result: Some(proto::derivation_reply::Result::Ok(proto::DerivationResult {destdir: vec![out.to_str().unwrap().to_string()],paths: Vec::new(),path_patterns: Vec::new(),},)),},Msg::Error(p) => proto::DerivationReply {result: Some(proto::derivation_reply::Result::Error(p)),},Msg::Stdout(p) => proto::DerivationReply {result: Some(proto::derivation_reply::Result::Stdout(p)),},Msg::Stderr(p) => proto::DerivationReply {result: Some(proto::derivation_reply::Result::Stderr(p)),},})});Ok(tonic::Response::new(Box::pin(output_stream))) - replacement in src/main.rs at line 243
tokio::sync::oneshot::Sender<Result<PathBuf, String>>,tokio::sync::mpsc::Sender<crate::container::Msg>, - replacement in src/lib.rs at line 102
in_release: Arc<Mutex<Option<Arc<InRelease>>>>,in_release: Arc<Mutex<HashMap<std::borrow::Cow<'static, str>, Arc<InRelease>>>>, - edit in src/container.rs at line 5
use std::os::fd::{AsRawFd, RawFd}; - edit in src/container.rs at line 9
use tokio::io::AsyncReadExt; - edit in src/container.rs at line 31
}/// The type of messages received from a contained process.#[derive(bincode::Encode, bincode::Decode, Debug)]pub enum Msg {/// Success, with the result path.Ok(PathBuf),/// Error.Error(String),/// One chunk of stdout.Stdout(Vec<u8>),/// One chunk of stderr.Stderr(Vec<u8>), - replacement in src/container.rs at line 51
tokio::sync::oneshot::Sender<Result<PathBuf, String>>,tokio::sync::mpsc::Sender<Msg>, - replacement in src/container.rs at line 81
let ((id, resp), _) = bincode::decode_from_slice::<(u64, ProcessResult), _>(&msg, bincode::config::standard()).unwrap();let chan = pending.remove(&id).unwrap();chan.send(resp).unwrap_or(());let ((id, resp), _) = bincode::decode_from_slice::<(u64, Msg), _>(&msg, bincode::config::standard()).unwrap();match resp {Msg::Ok(_) | Msg::Error(_) => {let chan = pending.remove(&id).unwrap();chan.send(resp).await.unwrap_or(());}_ => {let chan = pending.get(&id).unwrap();chan.send(resp).await.unwrap_or(());}} - edit in src/container.rs at line 99
type ProcessResult = Result<PathBuf, String>; - edit in src/container.rs at line 151
let (writer_out, mut reader_out) = tokio::net::unix::pipe::pipe().unwrap();let (writer_err, mut reader_err) = tokio::net::unix::pipe::pipe().unwrap(); - replacement in src/container.rs at line 154
let result: ProcessResult =run_in_container(&user, &store, rec_msg).map_err(|e| format!("{:?}", e));debug!("result {:?}", result);let v = bincode::encode_to_vec(&(id, result), bincode::config::standard()).unwrap();let task = tokio::task::spawn_blocking(move || {run_in_container(&user,&store,rec_msg,writer_out.as_raw_fd(),writer_err.as_raw_fd(),).map_err(|e| format!("{:?}", e))});tokio::pin!(task);let mut result = None;let mut stdout_closed = false;let mut stderr_closed = false;let mut out_buf = [0; 4096];let mut err_buf = [0; 4096];while result.is_none() || !stdout_closed || !stderr_closed {tokio::select! {r = &mut task, if result.is_none() => {result = Some(match r {Ok(Ok(r)) => Msg::Ok(r),Ok(Err(r)) => Msg::Error(r),Err(e) => Msg::Error(format!("{:?}", e)),});}r = reader_out.read(&mut out_buf), if !stdout_closed => {if let Ok(result) = r {if result == 0 {stdout_closed = true} else {println!("stdout: {:?}", std::str::from_utf8(&out_buf[..result]));let v = bincode::encode_to_vec(&(id, Msg::Stdout((&out_buf[..result]).to_vec())), bincode::config::standard()).unwrap();let mut bytes = bytes::BytesMut::new();debug!("drv_process replying {:?}", v.len());bytes.extend_from_slice(&v);writer.lock().await.send(bytes.into()).await.unwrap();}}}r = reader_err.read(&mut err_buf), if !stderr_closed => {if let Ok(result) = r {if result == 0 {stderr_closed = true} else {println!("stderr: {:?}", std::str::from_utf8(&err_buf[..result]));let v = bincode::encode_to_vec(&(id, Msg::Stderr((&err_buf[..result]).to_vec())), bincode::config::standard()).unwrap();let mut bytes = bytes::BytesMut::new();debug!("drv_process replying {:?}", v.len());bytes.extend_from_slice(&v);writer.lock().await.send(bytes.into()).await.unwrap();}}}}}let v = bincode::encode_to_vec(&(id, result.unwrap()), bincode::config::standard()).unwrap(); - replacement in src/container.rs at line 215
writer.lock().await.send(bytes.into()).await.unwrap()writer.lock().await.send(bytes.into()).await.unwrap(); - replacement in src/container.rs at line 222
fn run_in_container(user: &str, store: &Path, r: BuildRequest) -> Result<PathBuf, Error> {fn run_in_container(user: &str,store: &Path,r: BuildRequest,stdout: RawFd,stderr: RawFd,) -> Result<PathBuf, Error> { - replacement in src/container.rs at line 301
inner_process(user, &r, &tmp_dir, &dest, &store, &tmp_store, &name)inner_process(user, &r, &tmp_dir, &dest, &store, &tmp_store, &name, stdout, stderr,) - edit in src/container.rs at line 465
stdout: RawFd,stderr: RawFd, - edit in src/container.rs at line 568
unsafe {libc::dup2(stdout, 1);libc::dup2(stderr, 2);} - edit in elpe/lib/elpegrpc.proto at line 50
bytes stdout = 3;bytes stderr = 4; - replacement in elpe/lib/elpegrpc.proto at line 73
rpc Derivation (DerivationRequest) returns (DerivationReply);rpc Derivation (DerivationRequest) returns (stream DerivationReply); - replacement in elpe/lib/elpe.ml at line 71
Client.call ~service:"elpe.Elpe" ~rpc:"Derivation"let result = ref None inlet* _ = Client.call ~service:"elpe.Elpe" ~rpc:"Derivation" - replacement in elpe/lib/elpe.ml at line 77
(Client.Rpc.unary enc ~f:(fun decoder ->let+ decoder = decoder inmatch decoder with| Some decoder -> (Reader.create decoder |> decode |> function| Ok v -> v(Client.Rpc.server_streaming enc ~f:(fun responses ->Lwt_stream.iter_s(fun str ->Reader.create str |> decode |> function| Ok (`Ok path) -> (result := Some(`Ok path); Lwt.return ())| Ok (`Error path) -> (result := Some(`Error path); Lwt.return ())| Ok (`Stdout buf) -> (let buf = Bytes.unsafe_to_string buf inPrintf.printf "%s" buf; Lwt.return ())| Ok (`Stderr buf) -> (let buf = Bytes.unsafe_to_string buf inPrintf.eprintf "%s" buf; Lwt.return ()) - replacement in elpe/lib/elpe.ml at line 92
(Result.show_error e)))| None -> Elpe.Derivation.Response.make ()))()(Result.show_error e))| _ -> Lwt.return ())responses))()inmatch !result withSome v -> Lwt.return v| None -> assert false - replacement in elpe/lib/elpe.ml at line 355
let* res =let* r = - replacement in elpe/lib/elpe.ml at line 359
let res, _ = Result.get_ok res inmatch res withmatch r with - edit in Cargo.toml at line 51
tokio-stream = "0.1.17" - edit in Cargo.nix at line 1445
}{name = "tokio-stream";packageId = "tokio-stream"; - replacement in Cargo.nix at line 6519
resolvedDefaultFeatures = [ "net" ];resolvedDefaultFeatures = [ "default" "net" "time" ]; - edit in Cargo.lock at line 477
"tokio-stream",