pijul nest
guest [sign in]

Forwarding log to frontend

pmeunier
Jul 2, 2025, 9:15 PM
AJLMC7UMTMBYBXLOSMGT5PDNX4D7O7LV4GDV7AN25CPOBPRUFYVAC

Dependencies

  • [2] LIUJQXB7 Allow merging two packages based on regular expressions of their name
  • [3] 5IT7CXFG Debugging the dependency resolution (Tarjan's SCC algorithm)
  • [4] BDEVQIAU Handle cyclic Ubuntu dependencies
  • [5] 6BIW5YDC Rust builder: compiling the crates
  • [6] UWQB743K First working shell (with ocaml code)
  • [7] SI454P2V Documentation and cleanup
  • [8] BQ4E3XLA Forwarding stdout/stderr
  • [9] R7J4254Z Reorganisation of the frontend
  • [10] ODUDDQRY Adding the OCaml interface
  • [11] F2C3TLOI Fetching URLs (server-side)

Change contents

  • edit in src/main.rs at line 9
    [3.44]
    [3.1587]
    use tokio_util::sync::PollSender;
  • edit in src/main.rs at line 38
    [3.2038]
    [3.2038]
    debug!("add_path");
  • replacement in src/main.rs at line 41
    [3.387][3.2127:2179](),[3.2127][3.2127:2179]()
    let tmp_dir = tempfile::tempdir_in(store)?;
    [3.387]
    [3.2179]
    debug!("store");
    let tmp_dir = tempfile::tempdir_in(store).unwrap();
    debug!("store {:?}", tmp_dir);
  • replacement in src/main.rs at line 46
    [3.2228][3.2000:2001](),[3.2000][3.2000:2001]()
    [3.2228]
    [3.2229]
    debug!("loop");
  • edit in src/main.rs at line 53
    [3.2482]
    [3.2482]
    info!("Adding file {:?}", f.name);
  • edit in src/main.rs at line 62
    [3.2922]
    [3.2922]
    info!("Adding file {:?}", d.name);
  • edit in src/main.rs at line 76
    [3.3477]
    [3.3477]
    debug!("loop done");
  • edit in src/main.rs at line 220
    [3.2951]
    [3.2951]
    type UbuntuPackageStream = ResponseStream;
  • replacement in src/main.rs at line 225
    [3.6060][3.6060:6134]()
    ) -> Result<tonic::Response<proto::DerivationReply>, tonic::Status> {
    [3.6060]
    [3.3117]
    ) -> Result<tonic::Response<Self::UbuntuPackageStream>, tonic::Status> {
  • replacement in src/main.rs at line 245
    [2.294][2.294:380](),[2.380][3.3465:3507](),[3.3465][3.3465:3507](),[3.3507][3.6135:6180](),[3.6180][3.3538:3595](),[3.3538][3.3538:3595](),[3.3595][3.6181:6646]()
    let p = download_extract_deps(&index, &self.deb_client, &r.name, &link_extra)
    .await
    .unwrap();
    info!("path {:?} {:#?}", r.name, p);
    Ok(tonic::Response::new(proto::DerivationReply {
    result: Some(proto::derivation_reply::Result::Ok(
    proto::DerivationResult {
    destdir: p
    .result
    .iter()
    .rev()
    .map(|x| x.to_str().unwrap().to_string())
    .collect(),
    paths: p.paths.into_iter().filter_map(Arc::into_inner).collect(),
    path_patterns: Vec::new(),
    [2.294]
    [3.6646]
    let (tx, rx) = tokio::sync::mpsc::channel(200);
    use crate::extract::Msg;
    let output_stream = ReceiverStream::new(rx).map(|x| {
    Ok(match x {
    Msg::Downloading(p) => proto::DerivationReply {
    result: Some(proto::derivation_reply::Result::Loading(p)),
  • replacement in src/main.rs at line 253
    [3.6665][3.6665:6681](),[3.6681][3.3773:3785](),[3.3773][3.3773:3785]()
    )),
    }))
    [3.6665]
    [3.4845]
    Msg::Ok(p) => proto::DerivationReply {
    result: Some(proto::derivation_reply::Result::Ok(
    proto::DerivationResult {
    destdir: p
    .result
    .iter()
    .rev()
    .map(|x| x.to_str().unwrap().to_string())
    .collect(),
    paths: p.paths.into_iter().filter_map(Arc::into_inner).collect(),
    path_patterns: Vec::new(),
    },
    )),
    },
    Msg::Error(e) => proto::DerivationReply {
    result: Some(proto::derivation_reply::Result::Error(e.to_string())),
    },
    })
    });
    match download_extract_deps(
    &index,
    &self.deb_client,
    &r.name,
    &link_extra,
    PollSender::new(tx.clone()),
    )
    .await
    {
    Ok(p) => {
    info!("path {:?} {:#?}", r.name, p);
    tx.send(Msg::Ok(p)).await.unwrap();
    }
    Err(e) => tx.send(Msg::Error(e)).await.unwrap(),
    }
    Ok(tonic::Response::new(Box::pin(output_stream)))
  • edit in src/lib.rs at line 10
    [3.8271]
    [3.10327]
    use futures::SinkExt;
  • edit in src/lib.rs at line 36
    [3.8732]
    [3.10537]
    mod dummy_sink;
    use dummy_sink::*;
  • edit in src/lib.rs at line 93
    [3.10103]
    [3.11210]
    /// Package not found
    #[error("Package not found: {pkg}")]
    PackageNotFound {
    /// The package name.
    pkg: String,
    },
  • edit in src/lib.rs at line 311
    [3.1497]
    [3.11035]
    std::fs::create_dir_all(store_path.as_ref()).unwrap();
  • replacement in src/lib.rs at line 366
    [3.12451][2.381:452]()
    let p = extract::download_extract_deps(&index, self, pkg, &[])
    [3.12451]
    [3.12517]
    let p = extract::download_extract_deps(&index, self, pkg, &[], DummySink {})
  • replacement in src/lib.rs at line 607
    [3.14073][3.18409:18510](),[3.18409][3.18409:18510]()
    async fn download_url<'a>(&self, url: &str, sha256: &str) -> Result<(Downloaded, bool), Error> {
    [3.14073]
    [3.18510]
    async fn download_url<'a, S: futures::Sink<extract::Msg> + Unpin>(
    &self,
    url: &str,
    sha256: &str,
    mut tx: S,
    ) -> Result<(Downloaded, bool), Error>
    where
    S::Error: std::fmt::Debug,
    {
  • edit in src/lib.rs at line 624
    [3.14177]
    [3.14177]
    tx.send(extract::Msg::Downloading(url.to_string()))
    .await
    .unwrap();
  • edit in src/extract.rs at line 42
    [2.610]
    [3.502]
    }
    /// Messages from the extractor.
    #[derive(Debug)]
    pub enum Msg {
    /// Downloading
    Downloading(String),
    /// Result
    Ok(Packages),
    /// Error
    Error(Error),
  • replacement in src/extract.rs at line 57
    [3.3343][3.19837:19873](),[3.17671][3.19837:19873](),[3.19837][3.19837:19873]()
    pub async fn download_extract_deps(
    [3.3343]
    [3.19873]
    pub async fn download_extract_deps<F: futures::Sink<Msg> + Clone + Send + Unpin + 'static>(
  • replacement in src/extract.rs at line 62
    [2.660][3.17694:17725](),[3.19944][3.17694:17725](),[3.20068][3.17726:17786]()
    ) -> Result<Packages, Error> {
    let pkg = multi_lookup(index, &package).await.unwrap();
    [2.660]
    [3.506]
    tx: F,
    ) -> Result<Packages, Error>
    where
    F::Error: std::fmt::Debug,
    {
    let Some(pkg) = multi_lookup(index, &package).await else {
    return Err(Error::PackageNotFound {
    pkg: package.to_string(),
    });
    };
  • replacement in src/extract.rs at line 107
    [3.1773][3.1773:1837]()
    let rx = spawn_extract(client.clone(), &pkg).await;
    [3.1773]
    [3.1837]
    let rx = spawn_extract(client.clone(), &pkg, tx.clone()).await;
  • replacement in src/extract.rs at line 323
    [3.5478][3.5478:5506]()
    async fn spawn_extract<'a>(
    [3.5478]
    [3.5506]
    async fn spawn_extract<'a, S: futures::Sink<Msg> + Send + Unpin + 'static>(
  • replacement in src/extract.rs at line 326
    [3.5556][3.5556:5621]()
    ) -> tokio::sync::oneshot::Receiver<Result<Downloaded, Error>> {
    [3.5556]
    [3.5621]
    log_tx: S,
    ) -> tokio::sync::oneshot::Receiver<Result<Downloaded, Error>>
    where
    S::Error: std::fmt::Debug,
    {
  • replacement in src/extract.rs at line 337
    [3.5932][3.5932:6009]()
    let (mut task, _) = match client.download_url(&url, &sha256).await {
    [3.5932]
    [3.6009]
    let (mut task, _) = match client.download_url(&url, &sha256, log_tx).await {
  • file addition: dummy_sink.rs (----------)
    [3.15]
    use std::convert::Infallible;
    use std::pin::Pin;
    #[derive(Clone, Copy)]
    pub struct DummySink {}
    impl<T> futures::Sink<T> for DummySink {
    type Error = Infallible;
    fn poll_ready(
    self: Pin<&mut Self>,
    _cx: &mut futures::task::Context<'_>,
    ) -> futures::task::Poll<Result<(), Infallible>> {
    futures::task::Poll::Ready(Ok(()))
    }
    fn start_send(self: Pin<&mut Self>, _item: T) -> Result<(), Infallible> {
    Ok(())
    }
    fn poll_flush(
    self: Pin<&mut Self>,
    _cx: &mut futures::task::Context<'_>,
    ) -> futures::task::Poll<Result<(), Infallible>> {
    futures::task::Poll::Ready(Ok(()))
    }
    fn poll_close(
    self: Pin<&mut Self>,
    _cx: &mut futures::task::Context<'_>,
    ) -> futures::task::Poll<Result<(), Infallible>> {
    futures::task::Poll::Ready(Ok(()))
    }
    }
  • edit in elpe/lib/elpegrpc.proto at line 64
    [3.6581]
    [3.63852]
    string loading = 5;
  • replacement in elpe/lib/elpegrpc.proto at line 95
    [3.56055][3.56055:56125]()
    rpc UbuntuPackage (UbuntuPackageRequest) returns (DerivationReply);
    [3.56055]
    [3.56125]
    rpc UbuntuPackage (UbuntuPackageRequest) returns (stream DerivationReply);
  • replacement in elpe/lib/elpe.ml at line 115
    [3.72631][3.72631:72662]()
    | `Error e -> failwith e
    [3.72631]
    [3.72662]
    | `Error e -> raise (DerivationError e)
  • edit in elpe/lib/derivation.ml at line 2
    [3.462]
    [3.462]
    exception DerivationError of string
  • replacement in elpe/lib/derivation.ml at line 80
    [3.2579][3.2579:3246]()
    Client.call ~service:"elpe.Elpe" ~rpc:"UbuntuPackage"
    ~do_request:
    (H2_lwt_unix.Client.request connection ~error_handler:(fun _ ->
    failwith "Error"))
    ~handler:
    (Client.Rpc.unary enc ~f:(fun decoder ->
    let+ decoder = decoder in
    match decoder with
    | Some decoder -> (
    Reader.create decoder |> decode |> function
    | Ok v -> v
    | Error e ->
    failwith
    (Printf.sprintf "Could not decode request: %s"
    (Result.show_error e)))
    | None -> Elpegrpc.Elpe.Elpe.Derivation.Response.make ()))
    ()
    [3.2579]
    [3.3246]
    let result = ref None in
    let* x =
    Client.call ~service:"elpe.Elpe" ~rpc:"UbuntuPackage"
    ~do_request:
    (H2_lwt_unix.Client.request connection ~error_handler:(fun _ ->
    failwith "Error"))
    ~handler:
    (Client.Rpc.server_streaming enc ~f:(fun responses ->
    Lwt_stream.iter_s
    (fun str ->
    Reader.create str |> decode |> function
    | Ok (`Ok v) ->
    result := Some v;
    Lwt.return ()
    | Ok (`Loading v) ->
    Printf.eprintf "Downloading %s\n" v;
    Stdlib.flush Stdlib.stderr;
    Lwt.return ()
    | Error e ->
    failwith
    (Printf.sprintf "Could not decode request: %s"
    (Result.show_error e))
    | _ -> Lwt.return ())
    responses))
    ()
    in
    match x with
    | Ok _ -> Lwt.return (Ok (Option.get !result))
    | Error e -> Lwt.return (Error e)
    type arch = Amd64 | Aarch64
  • replacement in elpe/lib/derivation.ml at line 112
    [3.3247][3.3247:3287]()
    let ubuntu ?(release = "plucky") name =
    [3.3247]
    [3.3287]
    let ubuntu ?(release = "plucky") ?(arch = Amd64) name =
  • replacement in elpe/lib/derivation.ml at line 191
    [3.5575][3.5575:5625]()
    Printf.printf "%s" buf
    [3.5575]
    [3.5625]
    Printf.printf "%s" buf;
    Stdlib.flush Stdlib.stdout
  • replacement in elpe/lib/derivation.ml at line 201
    [3.5944][3.5944:5994]()
    Printf.printf "%s" buf
    [3.5944]
    [3.5994]
    Printf.eprintf "%s" buf;
    Stdlib.flush Stdlib.stderr