Split stderr/stdout output into smaller chunks when sending it over the websocket

pmeunier
Apr 28, 2026, 10:46 AM
43PI2ASDQFPSI4TKEELSNCYQVKSVHBZCQFP7ZQ6RIFVBQAKHLKUQC

Dependencies

Change contents

  • replacement in geolite2.nix at line 8
    [2.307895][3.58306:58374]()
    hash = "sha256-GtQcO5i29L38HL4vE+9P5nGI2mW1+PSaEhp18mJNmXU=";
    [2.307895]
    [2.307963]
    hash = "sha256-YTzCCpRfaMvVeszKigjqTePaDySZzhVv2POyfW1iXyU=";
  • replacement in api/src/replication.rs at line 447
    [7.5383][7.5383:5439]()
    let n = if let Some(line) = line ?{
    [7.5383]
    [7.5439]
    let n = if let Some(line) = line? {
  • replacement in api/src/jobs.rs at line 19
    [4.32645][4.32645:32690]()
    use tokio::io::{AsyncReadExt, AsyncSeekExt};
    [4.32645]
    [4.32690]
    use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, BufReader};
  • replacement in api/src/jobs.rs at line 167
    [4.36549][6.1286:1618]()
    if let Some((a, b, c)) = send_all(&config, id, &mut remote_stdout, &mut remote_stderr)
    .await
    .unwrap()
    {
    socket.send(a.into()).await.unwrap_or(());
    socket.send(b.into()).await.unwrap_or(());
    if let Some(c) = c {
    socket.send(c.into()).await.unwrap_or(());
    }
    }
    [4.36549]
    [6.1618]
    send_all(
    &config,
    id,
    &mut remote_stdout,
    &mut remote_stderr,
    &mut socket,
    )
    .await
    .unwrap();
  • replacement in api/src/jobs.rs at line 258
    [4.39254][4.39254:39324]()
    ) -> Result<Option<(String, String, Option<String>)>, crate::Error> {
    [4.39254]
    [4.39324]
    socket: &mut WebSocket,
    ) -> Result<(), crate::Error> {
  • replacement in api/src/jobs.rs at line 271
    [4.39666][4.39666:39867](),[4.39867][5.69:157]()
    let mut stdout = String::new();
    let mut stderr = String::new();
    debug!("send_all {:?} {:?}", stdout.len(), stderr.len());
    if let Some(ref path) = config.ci.filesystem {
    let mut outf = tokio::fs::File::open(&path.join(&format!("{}.stdout", id)))
    [4.39666]
    [5.157]
    if let Some(ended) = ended {
    socket
    .send(
    serde_json::to_string(&Msg::Status { ended, status })
    .unwrap()
    .into(),
    )
  • replacement in api/src/jobs.rs at line 280
    [5.207][5.207:295]()
    let mut errf = tokio::fs::File::open(&path.join(&format!("{}.stderr", id)))
    [5.207]
    [5.295]
    }
    let Some(ref path) = config.ci.filesystem else {
    return Ok(());
    };
    let mut outf = BufReader::new(
    tokio::fs::File::open(&path.join(&format!("{}.stdout", id)))
  • replacement in api/src/jobs.rs at line 288
    [5.318][5.318:345](),[5.345][4.40059:40357](),[4.40059][4.40059:40357]()
    .unwrap();
    outf.seek(std::io::SeekFrom::Start(*remote_stdout as u64))
    .await?;
    errf.seek(std::io::SeekFrom::Start(*remote_stderr as u64))
    .await?;
    outf.read_to_string(&mut stdout).await?;
    errf.read_to_string(&mut stderr).await?;
    [5.318]
    [4.40357]
    .unwrap(),
    );
    outf.seek(std::io::SeekFrom::Start(*remote_stdout as u64))
    .await?;
    let mut buf = String::with_capacity(8192);
    while let Ok(n) = outf.read_line(&mut buf).await {
    *remote_stdout += n;
    if buf.len() >= 4096 || n == 0 {
    socket
    .send(
    serde_json::to_string(&Msg::Chunk {
    channel: 0,
    offset: 0,
    content: &buf,
    })
    .unwrap()
    .into(),
    )
    .await
    .unwrap();
    buf.clear()
    }
    if n == 0 {
    break;
    }
  • replacement in api/src/jobs.rs at line 315
    [4.40367][4.40367:40950]()
    *remote_stdout = stdout.len();
    *remote_stderr = stderr.len();
    Ok(Some((
    serde_json::to_string(&Msg::Chunk {
    channel: 0,
    offset: 0,
    content: &stdout,
    })
    .unwrap(),
    serde_json::to_string(&Msg::Chunk {
    channel: 1,
    offset: 0,
    content: &stderr,
    })
    .unwrap(),
    ended.map(|ended| serde_json::to_string(&Msg::Status { ended, status }).unwrap()),
    )))
    } else {
    Ok(None)
    [4.40367]
    [4.40950]
    buf.clear();
    let mut errf = BufReader::new(
    tokio::fs::File::open(&path.join(&format!("{}.stderr", id)))
    .await
    .unwrap(),
    );
    errf.seek(std::io::SeekFrom::Start(*remote_stderr as u64))
    .await?;
    while let Ok(n) = errf.read_line(&mut buf).await {
    *remote_stderr += n;
    if buf.len() >= 4096 || n == 0 {
    socket
    .send(
    serde_json::to_string(&Msg::Chunk {
    channel: 1,
    offset: 0,
    content: &buf,
    })
    .unwrap()
    .into(),
    )
    .await
    .unwrap();
    buf.clear()
    }
    if n == 0 {
    break;
    }
    }
  • edit in api/src/jobs.rs at line 345
    [4.40956]
    [4.40956]
    Ok(())