pijul nest
guest [sign in]

Forwarding stdout/stderr

pmeunier
Jun 15, 2025, 8:41 AM
BQ4E3XLAVS2AQAUYRGN6SZ4J35KQ2KIH7BBT2JNZXSPKKCGX7DYAC

Dependencies

  • [2] KOWYPLMX Nix and config.toml
  • [3] HX4TXY2D Fixed-output derivations enable the network
  • [4] SI454P2V Documentation and cleanup
  • [5] UWQB743K First working shell (with ocaml code)
  • [6] ODUDDQRY Adding the OCaml interface
  • [7] 6MGFBMON Debug and cleanup

Change contents

  • edit in src/main.rs at line 8
    [4.1587]
    [4.1587]
    use tokio_stream::wrappers::ReceiverStream;
  • replacement in src/main.rs at line 21
    [4.1752][4.1752:1815]()
    tokio::sync::oneshot::Sender<Result<PathBuf, String>>,
    [4.1752]
    [4.1815]
    tokio::sync::mpsc::Sender<crate::container::Msg>,
  • edit in src/main.rs at line 24
    [4.1696]
    [4.1711]
    use std::pin::Pin;
    type ResponseStream =
    Pin<Box<dyn tokio_stream::Stream<Item = Result<proto::DerivationReply, tonic::Status>> + Send>>;
  • edit in src/main.rs at line 92
    [4.4356]
    [4.4356]
    type DerivationStream = ResponseStream;
  • replacement in src/main.rs at line 98
    [4.4456][4.4456:4530]()
    ) -> Result<tonic::Response<proto::DerivationReply>, tonic::Status> {
    [4.4456]
    [4.4530]
    ) -> Result<tonic::Response<ResponseStream>, tonic::Status> {
  • replacement in src/main.rs at line 101
    [4.4606][4.4606:4662]()
    let (tx, rx) = tokio::sync::oneshot::channel();
    [4.4606]
    [4.4662]
    let (tx, rx) = tokio::sync::mpsc::channel(200);
  • replacement in src/main.rs at line 114
    [4.4975][4.4975:5566]()
    Ok(tonic::Response::new(match rx.await.unwrap() {
    Ok(out) => proto::DerivationReply {
    result: Some(proto::derivation_reply::Result::Ok(
    proto::DerivationResult {
    destdir: vec![out.to_str().unwrap().to_string()],
    paths: Vec::new(),
    path_patterns: Vec::new(),
    },
    )),
    },
    Err(e) => proto::DerivationReply {
    result: Some(proto::derivation_reply::Result::Error(e)),
    },
    }))
    [4.4975]
    [4.2198]
    use crate::container::Msg;
    let output_stream = ReceiverStream::new(rx).map(|x| {
    Ok(match x {
    Msg::Ok(out) => proto::DerivationReply {
    result: Some(proto::derivation_reply::Result::Ok(
    proto::DerivationResult {
    destdir: vec![out.to_str().unwrap().to_string()],
    paths: Vec::new(),
    path_patterns: Vec::new(),
    },
    )),
    },
    Msg::Error(p) => proto::DerivationReply {
    result: Some(proto::derivation_reply::Result::Error(p)),
    },
    Msg::Stdout(p) => proto::DerivationReply {
    result: Some(proto::derivation_reply::Result::Stdout(p)),
    },
    Msg::Stderr(p) => proto::DerivationReply {
    result: Some(proto::derivation_reply::Result::Stderr(p)),
    },
    })
    });
    Ok(tonic::Response::new(Box::pin(output_stream)))
  • replacement in src/main.rs at line 243
    [4.7219][4.7219:7286]()
    tokio::sync::oneshot::Sender<Result<PathBuf, String>>,
    [4.7219]
    [4.7286]
    tokio::sync::mpsc::Sender<crate::container::Msg>,
  • replacement in src/lib.rs at line 102
    [4.10235][4.10235:10287]()
    in_release: Arc<Mutex<Option<Arc<InRelease>>>>,
    [4.10235]
    [4.10287]
    in_release: Arc<Mutex<HashMap<std::borrow::Cow<'static, str>, Arc<InRelease>>>>,
  • edit in src/container.rs at line 5
    [4.44835]
    [4.44835]
    use std::os::fd::{AsRawFd, RawFd};
  • edit in src/container.rs at line 9
    [4.44926]
    [4.44926]
    use tokio::io::AsyncReadExt;
  • edit in src/container.rs at line 31
    [4.45327]
    [4.45327]
    }
    /// The type of messages received from a contained process.
    #[derive(bincode::Encode, bincode::Decode, Debug)]
    pub enum Msg {
    /// Success, with the result path.
    Ok(PathBuf),
    /// Error.
    Error(String),
    /// One chunk of stdout.
    Stdout(Vec<u8>),
    /// One chunk of stderr.
    Stderr(Vec<u8>),
  • replacement in src/container.rs at line 51
    [4.45526][4.45526:45589]()
    tokio::sync::oneshot::Sender<Result<PathBuf, String>>,
    [4.45526]
    [4.45589]
    tokio::sync::mpsc::Sender<Msg>,
  • replacement in src/container.rs at line 81
    [4.46770][4.46770:47023]()
    let ((id, resp), _) = bincode::decode_from_slice::<(u64, ProcessResult), _>(&msg, bincode::config::standard()).unwrap();
    let chan = pending.remove(&id).unwrap();
    chan.send(resp).unwrap_or(());
    [4.46770]
    [4.47023]
    let ((id, resp), _) = bincode::decode_from_slice::<(u64, Msg), _>(&msg, bincode::config::standard()).unwrap();
    match resp {
    Msg::Ok(_) | Msg::Error(_) => {
    let chan = pending.remove(&id).unwrap();
    chan.send(resp).await.unwrap_or(());
    }
    _ => {
    let chan = pending.get(&id).unwrap();
    chan.send(resp).await.unwrap_or(());
    }
    }
  • edit in src/container.rs at line 99
    [4.47143][4.47143:47190]()
    type ProcessResult = Result<PathBuf, String>;
  • edit in src/container.rs at line 151
    [4.49250]
    [4.49250]
    let (writer_out, mut reader_out) = tokio::net::unix::pipe::pipe().unwrap();
    let (writer_err, mut reader_err) = tokio::net::unix::pipe::pipe().unwrap();
  • replacement in src/container.rs at line 154
    [4.49288][4.49288:49574]()
    let result: ProcessResult =
    run_in_container(&user, &store, rec_msg).map_err(|e| format!("{:?}", e));
    debug!("result {:?}", result);
    let v = bincode::encode_to_vec(&(id, result), bincode::config::standard()).unwrap();
    [4.49288]
    [4.49574]
    let task = tokio::task::spawn_blocking(move || {
    run_in_container(
    &user,
    &store,
    rec_msg,
    writer_out.as_raw_fd(),
    writer_err.as_raw_fd(),
    )
    .map_err(|e| format!("{:?}", e))
    });
    tokio::pin!(task);
    let mut result = None;
    let mut stdout_closed = false;
    let mut stderr_closed = false;
    let mut out_buf = [0; 4096];
    let mut err_buf = [0; 4096];
    while result.is_none() || !stdout_closed || !stderr_closed {
    tokio::select! {
    r = &mut task, if result.is_none() => {
    result = Some(match r {
    Ok(Ok(r)) => Msg::Ok(r),
    Ok(Err(r)) => Msg::Error(r),
    Err(e) => Msg::Error(format!("{:?}", e)),
    });
    }
    r = reader_out.read(&mut out_buf), if !stdout_closed => {
    if let Ok(result) = r {
    if result == 0 {
    stdout_closed = true
    } else {
    println!("stdout: {:?}", std::str::from_utf8(&out_buf[..result]));
    let v = bincode::encode_to_vec(&(id, Msg::Stdout((&out_buf[..result]).to_vec())), bincode::config::standard()).unwrap();
    let mut bytes = bytes::BytesMut::new();
    debug!("drv_process replying {:?}", v.len());
    bytes.extend_from_slice(&v);
    writer.lock().await.send(bytes.into()).await.unwrap();
    }
    }
    }
    r = reader_err.read(&mut err_buf), if !stderr_closed => {
    if let Ok(result) = r {
    if result == 0 {
    stderr_closed = true
    } else {
    println!("stderr: {:?}", std::str::from_utf8(&err_buf[..result]));
    let v = bincode::encode_to_vec(&(id, Msg::Stderr((&err_buf[..result]).to_vec())), bincode::config::standard()).unwrap();
    let mut bytes = bytes::BytesMut::new();
    debug!("drv_process replying {:?}", v.len());
    bytes.extend_from_slice(&v);
    writer.lock().await.send(bytes.into()).await.unwrap();
    }
    }
    }
    }
    }
    let v = bincode::encode_to_vec(&(id, result.unwrap()), bincode::config::standard())
    .unwrap();
  • replacement in src/container.rs at line 215
    [4.49737][4.49737:49807]()
    writer.lock().await.send(bytes.into()).await.unwrap()
    [4.49737]
    [4.49807]
    writer.lock().await.send(bytes.into()).await.unwrap();
  • replacement in src/container.rs at line 222
    [4.49875][4.49875:49966]()
    fn run_in_container(user: &str, store: &Path, r: BuildRequest) -> Result<PathBuf, Error> {
    [4.49875]
    [4.49966]
    fn run_in_container(
    user: &str,
    store: &Path,
    r: BuildRequest,
    stdout: RawFd,
    stderr: RawFd,
    ) -> Result<PathBuf, Error> {
  • replacement in src/container.rs at line 301
    [4.52093][4.9347:9427]()
    inner_process(user, &r, &tmp_dir, &dest, &store, &tmp_store, &name)
    [4.52093]
    [4.52204]
    inner_process(
    user, &r, &tmp_dir, &dest, &store, &tmp_store, &name, stdout, stderr,
    )
  • edit in src/container.rs at line 465
    [4.57164]
    [4.57164]
    stdout: RawFd,
    stderr: RawFd,
  • edit in src/container.rs at line 568
    [4.60960]
    [4.60960]
    unsafe {
    libc::dup2(stdout, 1);
    libc::dup2(stderr, 2);
    }
  • edit in elpe/lib/elpegrpc.proto at line 50
    [4.63852]
    [4.63852]
    bytes stdout = 3;
    bytes stderr = 4;
  • replacement in elpe/lib/elpegrpc.proto at line 73
    [4.55921][4.55921:55985]()
    rpc Derivation (DerivationRequest) returns (DerivationReply);
    [4.55921]
    [4.63895]
    rpc Derivation (DerivationRequest) returns (stream DerivationReply);
  • replacement in elpe/lib/elpe.ml at line 71
    [4.64918][4.64918:64971]()
    Client.call ~service:"elpe.Elpe" ~rpc:"Derivation"
    [4.64918]
    [4.58003]
    let result = ref None in
    let* _ = Client.call ~service:"elpe.Elpe" ~rpc:"Derivation"
  • replacement in elpe/lib/elpe.ml at line 77
    [4.58134][4.58134:58365]()
    (Client.Rpc.unary enc ~f:(fun decoder ->
    let+ decoder = decoder in
    match decoder with
    | Some decoder -> (
    Reader.create decoder |> decode |> function
    | Ok v -> v
    [4.58134]
    [4.58365]
    (Client.Rpc.server_streaming enc ~f:(fun responses ->
    Lwt_stream.iter_s
    (fun str ->
    Reader.create str |> decode |> function
    | Ok (`Ok path) -> (result := Some(`Ok path); Lwt.return ())
    | Ok (`Error path) -> (result := Some(`Error path); Lwt.return ())
    | Ok (`Stdout buf) -> (
    let buf = Bytes.unsafe_to_string buf in
    Printf.printf "%s" buf; Lwt.return ())
    | Ok (`Stderr buf) -> (
    let buf = Bytes.unsafe_to_string buf in
    Printf.eprintf "%s" buf; Lwt.return ())
  • replacement in elpe/lib/elpe.ml at line 92
    [4.58489][4.58489:58600]()
    (Result.show_error e)))
    | None -> Elpe.Derivation.Response.make ()))
    ()
    [4.58489]
    [4.58600]
    (Result.show_error e))
    | _ -> Lwt.return ())
    responses
    ))
    ()
    in
    match !result with
    Some v -> Lwt.return v
    | None -> assert false
  • replacement in elpe/lib/elpe.ml at line 355
    [3.968][3.968:985]()
    let* res =
    [3.968]
    [3.985]
    let* r =
  • replacement in elpe/lib/elpe.ml at line 359
    [3.1103][4.72501:72562](),[4.72501][4.72501:72562]()
    let res, _ = Result.get_ok res in
    match res with
    [3.1103]
    [4.72562]
    match r with
  • edit in Cargo.toml at line 51
    [4.9564]
    [4.71823]
    tokio-stream = "0.1.17"
  • edit in Cargo.nix at line 1445
    [2.48208]
    [2.48208]
    }
    {
    name = "tokio-stream";
    packageId = "tokio-stream";
  • replacement in Cargo.nix at line 6519
    [2.226526][2.226526:226571]()
    resolvedDefaultFeatures = [ "net" ];
    [2.226526]
    [2.226571]
    resolvedDefaultFeatures = [ "default" "net" "time" ];
  • edit in Cargo.lock at line 477
    [4.82419]
    [4.82419]
    "tokio-stream",