BQ4E3XLAVS2AQAUYRGN6SZ4J35KQ2KIH7BBT2JNZXSPKKCGX7DYAC
// Await the single result delivered on `rx` and wrap it in a gRPC response.
// NOTE: `rx.await.unwrap()` panics if awaiting `rx` yields an error.
Ok(tonic::Response::new(match rx.await.unwrap() {
// Success: the output path becomes the only `destdir` entry; `paths` and
// `path_patterns` are left empty. `to_str().unwrap()` panics if `to_str()`
// returns `None`.
Ok(out) => proto::DerivationReply {
result: Some(proto::derivation_reply::Result::Ok(
proto::DerivationResult {
destdir: vec![out.to_str().unwrap().to_string()],
paths: Vec::new(),
path_patterns: Vec::new(),
},
)),
},
// Failure: the error value is forwarded unchanged in the `Error` variant.
Err(e) => proto::DerivationReply {
result: Some(proto::derivation_reply::Result::Error(e)),
},
}))
use crate::container::Msg;
// Turn the receiver `rx` into a stream and translate every `Msg` into a
// `proto::DerivationReply`; each item is wrapped in `Ok`.
let output_stream = ReceiverStream::new(rx).map(|x| {
Ok(match x {
// Success: the output path becomes the only `destdir` entry.
// `to_str().unwrap()` panics if `to_str()` returns `None`.
Msg::Ok(out) => proto::DerivationReply {
result: Some(proto::derivation_reply::Result::Ok(
proto::DerivationResult {
destdir: vec![out.to_str().unwrap().to_string()],
paths: Vec::new(),
path_patterns: Vec::new(),
},
)),
},
// Error text is forwarded unchanged.
Msg::Error(p) => proto::DerivationReply {
result: Some(proto::derivation_reply::Result::Error(p)),
},
// Output chunks are forwarded as raw bytes, one reply per chunk.
Msg::Stdout(p) => proto::DerivationReply {
result: Some(proto::derivation_reply::Result::Stdout(p)),
},
Msg::Stderr(p) => proto::DerivationReply {
result: Some(proto::derivation_reply::Result::Stderr(p)),
},
})
});
// The stream is boxed and pinned before being handed to tonic as the response body.
Ok(tonic::Response::new(Box::pin(output_stream)))
}
/// The type of messages received from a contained process.
///
/// Values are serialised with `bincode` (see the derived `Encode`/`Decode`).
#[derive(bincode::Encode, bincode::Decode, Debug)]
pub enum Msg {
/// Success, carrying the result path.
Ok(PathBuf),
/// Failure, carrying a textual description of the error.
Error(String),
/// One chunk of stdout, as raw bytes.
Stdout(Vec<u8>),
/// One chunk of stderr, as raw bytes.
Stderr(Vec<u8>),
// Decode an `(id, ProcessResult)` pair from the raw message; the second tuple
// element returned by bincode is discarded. Panics if decoding fails.
let ((id, resp), _) = bincode::decode_from_slice::<(u64, ProcessResult), _>(&msg, bincode::config::standard()).unwrap();
// Take the channel registered for this id out of `pending`; panics if the id is unknown.
let chan = pending.remove(&id).unwrap();
// A failed send is deliberately ignored.
chan.send(resp).unwrap_or(());
// Decode an `(id, Msg)` pair from the raw message; the second tuple element
// returned by bincode is discarded. Panics if decoding fails.
let ((id, resp), _) = bincode::decode_from_slice::<(u64, Msg), _>(&msg, bincode::config::standard()).unwrap();
match resp {
// `Ok`/`Error`: the channel is removed from `pending` before the message
// is forwarded, so no later message for this id can be routed to it.
Msg::Ok(_) | Msg::Error(_) => {
let chan = pending.remove(&id).unwrap();
chan.send(resp).await.unwrap_or(());
}
// Any other message (stdout/stderr chunks): the channel stays registered
// in `pending`.
_ => {
let chan = pending.get(&id).unwrap();
chan.send(resp).await.unwrap_or(());
}
}
// In both arms `unwrap()` panics if `id` is not in `pending`, and a failed
// send is ignored via `unwrap_or(())`.
// Run the request in the container; any error is converted to its `Debug`
// representation as a `String`.
let result: ProcessResult =
run_in_container(&user, &store, rec_msg).map_err(|e| format!("{:?}", e));
debug!("result {:?}", result);
// Serialise the reply together with the request id; panics if encoding fails.
let v = bincode::encode_to_vec(&(id, result), bincode::config::standard()).unwrap();
// Run the container on tokio's blocking pool, passing it the raw file
// descriptors of `writer_out` / `writer_err`. Any error is converted to its
// `Debug` representation as a `String`.
let task = tokio::task::spawn_blocking(move || {
run_in_container(
&user,
&store,
rec_msg,
writer_out.as_raw_fd(),
writer_err.as_raw_fd(),
)
.map_err(|e| format!("{:?}", e))
});
// Pin the join handle so it can be polled by reference inside the loop below.
tokio::pin!(task);
// Final message (`Msg::Ok` / `Msg::Error`), set once the task has finished.
let mut result = None;
let mut stdout_closed = false;
let mut stderr_closed = false;
// Fixed-size scratch buffers: each forwarded chunk is at most 4096 bytes.
let mut out_buf = [0; 4096];
let mut err_buf = [0; 4096];
// Keep going until the task has produced a result AND both readers have
// reported end-of-stream (a read of 0 bytes).
while result.is_none() || !stdout_closed || !stderr_closed {
tokio::select! {
// Task completion; this branch is disabled once `result` is set.
r = &mut task, if result.is_none() => {
result = Some(match r {
Ok(Ok(r)) => Msg::Ok(r),
Ok(Err(r)) => Msg::Error(r),
// The spawned task itself failed to join; report it as an error message.
Err(e) => Msg::Error(format!("{:?}", e)),
});
}
// Stdout chunk. NOTE(review): a read error is silently ignored and does not
// set `stdout_closed`, so this branch stays enabled — confirm that is intended.
r = reader_out.read(&mut out_buf), if !stdout_closed => {
// Here `result` shadows the outer variable and holds the number of bytes read.
if let Ok(result) = r {
if result == 0 {
stdout_closed = true
} else {
println!("stdout: {:?}", std::str::from_utf8(&out_buf[..result]));
// Encode the chunk with the request id and send it through the shared writer.
let v = bincode::encode_to_vec(&(id, Msg::Stdout((&out_buf[..result]).to_vec())), bincode::config::standard()).unwrap();
let mut bytes = bytes::BytesMut::new();
debug!("drv_process replying {:?}", v.len());
bytes.extend_from_slice(&v);
// Panics if the send fails.
writer.lock().await.send(bytes.into()).await.unwrap();
}
}
}
// Stderr chunk: same handling as stdout, but wrapped in `Msg::Stderr`.
r = reader_err.read(&mut err_buf), if !stderr_closed => {
if let Ok(result) = r {
if result == 0 {
stderr_closed = true
} else {
println!("stderr: {:?}", std::str::from_utf8(&err_buf[..result]));
let v = bincode::encode_to_vec(&(id, Msg::Stderr((&err_buf[..result]).to_vec())), bincode::config::standard()).unwrap();
let mut bytes = bytes::BytesMut::new();
debug!("drv_process replying {:?}", v.len());
bytes.extend_from_slice(&v);
writer.lock().await.send(bytes.into()).await.unwrap();
}
}
}
}
}
// The loop only exits once `result` is `Some`, so this `unwrap()` cannot fail;
// the final message is encoded after all output chunks have been sent.
let v = bincode::encode_to_vec(&(id, result.unwrap()), bincode::config::standard())
.unwrap();
(Client.Rpc.unary enc ~f:(fun decoder ->
let+ decoder = decoder in
match decoder with
| Some decoder -> (
Reader.create decoder |> decode |> function
| Ok v -> v
(Client.Rpc.server_streaming enc ~f:(fun responses ->
Lwt_stream.iter_s
(fun str ->
Reader.create str |> decode |> function
| Ok (`Ok path) -> (result := Some(`Ok path); Lwt.return ())
| Ok (`Error path) -> (result := Some(`Error path); Lwt.return ())
| Ok (`Stdout buf) -> (
let buf = Bytes.unsafe_to_string buf in
Printf.printf "%s" buf; Lwt.return ())
| Ok (`Stderr buf) -> (
let buf = Bytes.unsafe_to_string buf in
Printf.eprintf "%s" buf; Lwt.return ())