Fork channel

Create a new channel as a copy of main.

Rename channel

Rename main to:

Delete channel

Delete main? This cannot be undone.

lib.rs
//! An implementation of an rsync-like protocol (not compatible with
//! rsync), in pure Rust.
//!
//! ```
//! extern crate rand;
//! extern crate rustsync;
//! use rustsync::*;
//! use rand::Rng;
//! fn main() {
//!   // Create 4 different random strings first.
//!   let chunk_size = 1000;
//!   let a = rand::thread_rng()
//!           .gen_ascii_chars()
//!           .take(chunk_size)
//!           .collect::<String>();
//!   let b = rand::thread_rng()
//!           .gen_ascii_chars()
//!           .take(50)
//!           .collect::<String>();
//!   let b_ = rand::thread_rng()
//!           .gen_ascii_chars()
//!           .take(100)
//!           .collect::<String>();
//!   let c = rand::thread_rng()
//!           .gen_ascii_chars()
//!           .take(chunk_size)
//!           .collect::<String>();
//!
//!   // Now concatenate them in two different ways.
//!
//!   let mut source = a.clone() + &b + &c;
//!   let mut modified = a + &b_ + &c;
//!
//!   // Suppose we want to download `modified`, and we already have
//!   // `source`, which only differs by a few characters in the
//!   // middle.
//!
//!   // We first have to choose a block size, which will be recorded
//!   // in the signature below. Blocks should normally be much bigger
//!   // than this in order to be efficient on large files.
//!
//!   let block = [0; 32];
//!
//!   // We then create a signature of `source`, to be uploaded to the
//!   // remote machine. Signatures are typically much smaller than
//!   // files, with just a few bytes per block.
//!
//!   let source_sig = signature(source.as_bytes(), block).unwrap();
//!
//!   // Then, we let the server compare our signature with their
//!   // version.
//!
//!   let comp = compare(&source_sig, modified.as_bytes(), block).unwrap();
//!
//!   // We finally download the result of that comparison, and
//!   // restore their file from that.
//!
//!   let mut restored = Vec::new();
//!   restore_seek(&mut restored, std::io::Cursor::new(source.as_bytes()), vec![0; 1000], &comp).unwrap();
//!   assert_eq!(&restored[..], modified.as_bytes())
//! }
//! ```

extern crate adler32;
extern crate blake2_rfc;
extern crate futures;
#[cfg(test)]
extern crate rand;
extern crate serde;
#[macro_use]
extern crate serde_derive;
#[cfg(test)]
extern crate tokio_core;
extern crate tokio_io;

use std::collections::HashMap;
use std::io::{Read, Seek, SeekFrom, Write};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_io::io::{write_all, WriteAll};
use futures::{Async, Future, Poll};

const BLAKE2_SIZE: usize = 32;

#[derive(Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
struct Blake2b([u8; BLAKE2_SIZE]);

impl std::borrow::Borrow<[u8]> for Blake2b {
    fn borrow(&self) -> &[u8] {
        &self.0
    }
}

#[derive(Debug, Serialize, Deserialize, PartialEq)]
/// The "signature" of the file, which is essentially a
/// content-indexed description of the blocks in the file.
pub struct Signature {
    pub window: usize,
    chunks: HashMap<u32, HashMap<Blake2b, usize>>
}

/// Create the "signature" of a file, essentially a content-indexed
/// map of blocks. The first step of the protocol is to run this
/// function on the "source" (the remote file when downloading, the
/// local file while uploading).
pub fn signature<R: Read, B: AsRef<[u8]>+AsMut<[u8]>>(mut r: R, mut block: B) -> Result<Signature, std::io::Error> {
    let mut chunks = HashMap::new();

    let mut i = 0;
    let block = block.as_mut();
    let mut eof = false;
    while !eof {
        let mut j = 0;
        while j < block.len() {
            let r = r.read(&mut block[j..])?;
            if r == 0 {
                eof = true;
                break;
            }
            j += r
        }
        let block = &block[..j];
        let hash = adler32::RollingAdler32::from_buffer(block);
        let mut blake2 = [0; BLAKE2_SIZE];
        blake2.clone_from_slice(blake2_rfc::blake2b::blake2b(BLAKE2_SIZE, &[], &block).as_bytes());
        println!("{:?} {:?}", block, blake2);
        chunks
            .entry(hash.hash())
            .or_insert(HashMap::new())
            .insert(Blake2b(blake2), i);

        i += block.len()
    }

    Ok(Signature {
        window: block.len(),
        chunks
    })
}

pub struct ReadBlock<R: AsyncRead, B: AsRef<[u8]>+AsMut<[u8]>> {
    block: Option<(R, B)>,
    first: usize,
}

impl<R: AsyncRead, B: AsRef<[u8]>+AsMut<[u8]>> ReadBlock<R, B> {
    fn new(r: R, block: B) -> Self {
        ReadBlock {
            block: Some((r, block)),
            first: 0,
        }
    }
}

impl<R: AsyncRead, B: AsRef<[u8]>+AsMut<[u8]>> Future for ReadBlock<R, B> {
    type Item = (R, B, usize);
    type Error = std::io::Error;
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        loop {
            if let Some((mut r, mut block)) = self.block.take() {
                let n = {
                    let block = block.as_mut();
                    if self.first == block.len() {
                        0
                    } else {
                        match r.read(&mut block[self.first..]) {
                            Ok(n) => n,
                            Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                                return Ok(Async::NotReady)
                            }
                            Err(e) => return Err(e),
                        }
                    }
                };
                if n == 0 {
                    return Ok(Async::Ready((r, block, self.first)));
                } else {
                    self.first += n;
                    self.block = Some((r, block));
                }
            } else {
                panic!("future polled after completion")
            }
        }
    }
}

pub struct WriteBlock<W: AsyncWrite, B: AsRef<[u8]>> {
    block: Option<(W, B)>,
    first: usize,
    len: usize,
}

impl<W: AsyncWrite, B: AsRef<[u8]>> WriteBlock<W, B> {
    fn new(w: W, block: B, first: usize, len: usize) -> Self {
        WriteBlock {
            block: Some((w, block)),
            first,
            len,
        }
    }
}

impl<W: AsyncWrite, B: AsRef<[u8]>> Future for WriteBlock<W, B> {
    type Item = (W, B);
    type Error = std::io::Error;
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        loop {
            if let Some((mut w, block)) = self.block.take() {
                match w.write(&block.as_ref()[self.first..self.len]) {
                    Ok(n) => {
                        self.first += n;
                        if self.first >= self.len {
                            return Ok(Async::Ready((w, block)));
                        } else {
                            self.block = Some((w, block))
                        }
                    }
                    Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                        return Ok(Async::NotReady)
                    }
                    Err(e) => return Err(e),
                }
            } else {
                panic!("future polled after completion")
            }
        }
    }
}

pub struct FutureSignature<R: AsyncRead, B:AsRef<[u8]>+AsMut<[u8]>> {
    chunks: HashMap<u32, HashMap<Blake2b, usize>>,
    i: usize,
    eof: bool,
    state: Option<ReadBlock<R, B>>,
}

impl<R: AsyncRead, B: AsRef<[u8]>+AsMut<[u8]>> Future for FutureSignature<R, B> {
    type Item = (R, Signature);
    type Error = std::io::Error;
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        loop {
            if let Some(mut reading) = self.state.take() {
                if let Async::Ready((r, mut block, len)) = reading.poll()? {
                    {
                        let block_ = block.as_ref();
                        self.eof = len < block_.len();
                        let b = &block_[..len];
                        let hash = adler32::RollingAdler32::from_buffer(b);
                        let mut blake2 = [0; BLAKE2_SIZE];
                        blake2.clone_from_slice(
                            blake2_rfc::blake2b::blake2b(BLAKE2_SIZE, &[], &b).as_bytes(),
                        );
                        self.chunks
                            .entry(hash.hash())
                            .or_insert(HashMap::new())
                            .insert(Blake2b(blake2), self.i);
                        self.i += block_.len();
                    }
                    if self.eof {
                        return Ok(Async::Ready((
                            r,
                            Signature {
                                chunks: std::mem::replace(&mut self.chunks, HashMap::new()),
                                window: block.as_ref().len(),
                            },
                        )));
                    } else {
                        self.state = Some(ReadBlock::new(r, block))
                    }
                } else {
                    self.state = Some(reading);
                    return Ok(Async::NotReady);
                }
            }
        }
    }
}

/// This is the same as [`signature`](fn.signature.html), except that
/// this function reads the input source asynchronously.
pub fn signature_fut<R: AsyncRead, B:AsRef<[u8]>+AsMut<[u8]>>(r: R, b: B) -> FutureSignature<R, B> {
    FutureSignature {
        state: Some(ReadBlock::new(r, b)),
        i: 0,
        eof: false,
        chunks: HashMap::new(),
    }
}

#[derive(Debug, Serialize, Deserialize, PartialEq)]
pub enum Block {
    FromSource(u64),
    Literal(Vec<u8>),
}

struct State {
    result: Vec<Block>,
    block_oldest: usize,
    block_len: usize,
    pending: Vec<u8>,
}

impl State {
    fn new() -> Self {
        State {
            result: Vec::new(),
            block_oldest: 0,
            block_len: 1,
            pending: Vec::new(),
        }
    }
}

#[derive(Default, Debug, PartialEq)]
/// The result of comparing two files
pub struct Delta {
    /// Description of the new file in terms of blocks.
    pub blocks: Vec<Block>,
    /// Size of the window.
    pub window: usize,
}

/// Compare a signature with an existing file. This is the second step
/// of the protocol, `r` is the local file when downloading, and the
/// remote file when uploading.
///
/// `block` must be a buffer the same size as `sig.window`.
pub fn compare<R: Read, B:AsRef<[u8]>+AsMut<[u8]>>(sig: &Signature, mut r: R, mut block: B) -> Result<Delta, std::io::Error> {
    let mut st = State::new();
    let block = block.as_mut();
    assert_eq!(block.len(), sig.window);
    while st.block_len > 0 {
        let mut hash = {
            let mut j = 0;
            let block = {
                while j < sig.window {
                    let r = r.read(&mut block[..])?;
                    if r == 0 {
                        break;
                    }
                    j += r
                }
                st.block_oldest = 0;
                st.block_len = j;
                &block[..j]
            };
            adler32::RollingAdler32::from_buffer(block)
        };

        // Starting from the current block (with hash `hash`), find
        // the next block with a hash that appears in the signature.
        loop {
            if matches(&mut st, sig, &block, &hash) {
                break;
            }
            // The blocks are not equal. Move the hash by one byte
            // until finding an equal block.
            let oldest = block[st.block_oldest];
            hash.remove(st.block_len, oldest);
            let r = r.read(&mut block[st.block_oldest..st.block_oldest + 1])?;
            if r > 0 {
                // If there are still bytes to read, update the hash.
                hash.update(block[st.block_oldest]);
            } else if st.block_len > 0 {
                // Else, just shrink the window, so that the current
                // block's blake2 hash can be compared with the
                // signature.
                st.block_len -= 1;
            } else {
                // We're done reading the file.
                break;
            }
            st.pending.push(oldest);
            st.block_oldest = (st.block_oldest + 1) % sig.window;
        }
        if !st.pending.is_empty() {
            // We've reached the end of the file, and have never found
            // a matching block again.
            st.result.push(Block::Literal(std::mem::replace(
                &mut st.pending,
                Vec::new(),
            )))
        }
    }
    Ok(Delta {
        blocks: st.result,
        window: sig.window,
    })
}

pub struct FutureCompare<R: AsyncRead, B: AsRef<[u8]>+AsMut<[u8]>> {
    state: Option<CompareState<R, B>>,
    st: State,
}

enum CompareState<R: AsyncRead, B: AsRef<[u8]>+AsMut<[u8]>> {
    ReadBlock {
        readblock: ReadBlock<R, B>,
        sig: Signature,
    },
    FindNext {
        hash: adler32::RollingAdler32,
        r: R,
        block: B,
        sig: Signature,
    },
    FindNextRead {
        hash: adler32::RollingAdler32,
        reading: ReadBlock<R, [u8; 1]>,
        block: B,
        sig: Signature,
    },
    EndLiteralBlock {
        r: R,
        block: B,
        sig: Signature,
    },
}

impl<R: AsyncRead, B: AsRef<[u8]>+AsMut<[u8]>> Future for FutureCompare<R, B> {
    type Item = (R, Signature, Delta);
    type Error = std::io::Error;
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        loop {
            match self.state.take() {
                Some(CompareState::ReadBlock { mut readblock, sig }) => {
                    if let Async::Ready((r, block, len)) = readblock.poll()? {
                        self.st.block_oldest = 0;
                        self.st.block_len = len;
                        let mut hash = adler32::RollingAdler32::from_buffer(&block.as_ref()[..len]);
                        self.state = Some(CompareState::FindNext {
                            hash,
                            block,
                            r,
                            sig,
                        });
                    } else {
                        self.state = Some(CompareState::ReadBlock { readblock, sig });
                        return Ok(Async::NotReady);
                    }
                }
                Some(CompareState::FindNext {
                    mut hash,
                    r,
                    block,
                    sig,
                }) => {
                    if matches(&mut self.st, &sig, block.as_ref(), &hash) {
                        self.state = Some(CompareState::EndLiteralBlock { r, block, sig })
                    } else {
                        let oldest = block.as_ref()[self.st.block_oldest];
                        hash.remove(self.st.block_len, oldest);
                        self.state = Some(CompareState::FindNextRead {
                            hash,
                            reading: ReadBlock::new(r, [0; 1]),
                            block,
                            sig,
                        })
                    }
                }
                Some(CompareState::FindNextRead {
                    mut hash,
                    mut reading,
                    mut block,
                    sig,
                }) => {
                    if let Async::Ready((r, b, len)) = reading.poll()? {
                        if len > 0 || self.st.block_len > 0 {
                            let oldest = block.as_ref()[self.st.block_oldest];
                            if len > 0 {
                                block.as_mut()[self.st.block_oldest] = b[0];
                                hash.update(b[0])
                            } else {
                                self.st.block_len -= 1
                            }

                            self.st.pending.push(oldest);
                            self.st.block_oldest = (self.st.block_oldest + 1) % sig.window;
                            self.state = Some(CompareState::FindNext {
                                hash,
                                r,
                                block,
                                sig,
                            })
                        } else {
                            self.state = Some(CompareState::EndLiteralBlock { r, block, sig })
                        }
                    } else {
                        self.state = Some(CompareState::FindNextRead {
                            hash,
                            reading,
                            block,
                            sig,
                        })
                    }
                }
                Some(CompareState::EndLiteralBlock { r, block, sig }) => {
                    if !self.st.pending.is_empty() {
                        // We've reached the end of the file, and have never found
                        // a matching block again.
                        self.st.result.push(Block::Literal(std::mem::replace(
                            &mut self.st.pending,
                            Vec::new(),
                        )))
                    }
                    if self.st.block_len > 0 {
                        self.state = Some(CompareState::ReadBlock {
                            readblock: ReadBlock::new(r, block),
                            sig,
                        })
                    } else {
                        let window = sig.window;
                        return Ok(Async::Ready((
                            r,
                            sig,
                            Delta {
                                blocks: std::mem::replace(&mut self.st.result, Vec::new()),
                                window,
                            },
                        )));
                    }
                }
                None => panic!(""),
            }
        }
    }
}

/// Same as [`compare`](fn.compare.html), except that this function
/// reads the file asynchronously.
pub fn compare_fut<R: AsyncRead, B: AsRef<[u8]>+AsMut<[u8]>>(sig: Signature, r: R, block: B) -> FutureCompare<R, B> {
    assert_eq!(block.as_ref().len(), sig.window);
    FutureCompare {
        state: Some(CompareState::ReadBlock {
            readblock: ReadBlock::new(r, block),
            sig,
        }),
        st: State::new(),
    }
}

fn matches(st: &mut State, sig: &Signature, block: &[u8], hash: &adler32::RollingAdler32) -> bool {
    if let Some(h) = sig.chunks.get(&hash.hash()) {
        let blake2 = {
            let mut b = blake2_rfc::blake2b::Blake2b::new(BLAKE2_SIZE);
            if st.block_oldest + st.block_len > sig.window {
                b.update(&block[st.block_oldest..]);
                b.update(&block[..(st.block_oldest + st.block_len) % sig.window]);
            } else {
                b.update(&block[st.block_oldest..st.block_oldest + st.block_len])
            }
            b.finalize()
        };

        if let Some(&index) = h.get(blake2.as_bytes()) {
            // Matching hash found! If we have non-matching
            // material before the match, add it.
            if !st.pending.is_empty() {
                st.result.push(Block::Literal(std::mem::replace(
                    &mut st.pending,
                    Vec::new(),
                )));
            }
            st.result.push(Block::FromSource(index as u64));
            return true;
        }
    }
    false
}

/// Restore a file, using a "delta" (resulting from
/// [`compare`](fn.compare.html))
pub fn restore<W: Write>(mut w: W, s: &[u8], delta: &Delta) -> Result<(), std::io::Error> {
    for d in delta.blocks.iter() {
        match *d {
            Block::FromSource(i) => {
                let i = i as usize;
                if i + delta.window <= s.len() {
                    w.write(&s[i..i + delta.window])?
                } else {
                    w.write(&s[i..])?
                }
            }
            Block::Literal(ref l) => w.write(l)?,
        };
    }
    Ok(())
}

/// Same as [`restore`](fn.restore.html), except that this function
/// uses a seekable, readable stream instead of the entire file in a
/// slice.
///
/// `buf` must be a buffer the same size as `sig.window`.
pub fn restore_seek<W: Write, R: Read + Seek, B: AsRef<[u8]>+AsMut<[u8]>>(
    mut w: W,
    mut s: R,
    mut buf: B,
    delta: &Delta,
) -> Result<(), std::io::Error> {
    let buf = buf.as_mut();

    for d in delta.blocks.iter() {
        match *d {
            Block::FromSource(i) => {
                s.seek(SeekFrom::Start(i as u64))?;
                // fill the buffer from r.
                let mut n = 0;
                loop {
                    let r = s.read(&mut buf[n..delta.window])?;
                    if r == 0 {
                        break;
                    }
                    n += r
                }
                // write the buffer to w.
                let mut m = 0;
                while m < n {
                    m += w.write(&buf[m..n])?;
                }
            }
            Block::Literal(ref l) => {
                w.write(l)?;
            }
        }
    }
    Ok(())
}

pub struct FutureRestore<W: AsyncWrite, R: Read + Seek, B: AsMut<[u8]> + AsRef<[u8]>> {
    state: Option<RestoreState<W, R, B>>,
    delta: Delta,
    delta_pos: usize,
}

enum RestoreState<W: AsyncWrite, R: Read + Seek, B: AsMut<[u8]> + AsRef<[u8]>> {
    Delta {
        w: W,
        r: R,
        buf: B,
    },
    WriteBuf {
        write: WriteBlock<W, B>,
        r: R,
    },
    WriteVec {
        write: WriteAll<W, Vec<u8>>,
        r: R,
        buf: B,
    },
}

/// Same as [`restore_seek`](fn.restore_seek.html), except that this
/// function writes its output asynchronously.
pub fn restore_seek_fut<W: AsyncWrite, R: Read + Seek, B: AsMut<[u8]> + AsRef<[u8]>>(
    w: W,
    r: R,
    buf: B,
    delta: Delta,
) -> FutureRestore<W, R, B> {
    FutureRestore {
        state: Some(RestoreState::Delta { w, r, buf }),
        delta,
        delta_pos: 0,
    }
}

impl<W: AsyncWrite, R: Read + Seek, B: AsMut<[u8]> + AsRef<[u8]>> Future
    for FutureRestore<W, R, B> {
    type Item = (W, R, Delta);
    type Error = std::io::Error;
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        loop {
            match self.state.take() {
                Some(RestoreState::Delta { w, mut r, mut buf }) => {
                    if self.delta_pos >= self.delta.blocks.len() {
                        return Ok(Async::Ready((
                            w,
                            r,
                            std::mem::replace(&mut self.delta, Delta::default()),
                        )));
                    }
                    match self.delta.blocks[self.delta_pos] {
                        Block::FromSource(i) => {
                            r.seek(SeekFrom::Start(i as u64))?;
                            // fill the buffer from r.
                            let mut n = 0;
                            {
                                let buf_ = buf.as_mut();
                                loop {
                                    let k = r.read(&mut buf_[n..self.delta.window])?;
                                    if k == 0 {
                                        break;
                                    }
                                    n += k
                                }
                            }
                            // write the buffer to w.
                            self.state = Some(RestoreState::WriteBuf {
                                write: WriteBlock::new(w, buf, 0, n),
                                r,
                            })
                        }
                        Block::Literal(ref mut l) => {
                            let vec = std::mem::replace(l, Vec::new());
                            self.state = Some(RestoreState::WriteVec {
                                write: write_all(w, vec),
                                r,
                                buf,
                            })
                        }
                    }
                }
                Some(RestoreState::WriteBuf { mut write, r }) => match write.poll()? {
                    Async::Ready((w, buf)) => {
                        self.delta_pos = self.delta_pos + 1;
                        self.state = Some(RestoreState::Delta { w, r, buf })
                    }
                    Async::NotReady => {
                        self.state = Some(RestoreState::WriteBuf { write, r });
                        return Ok(Async::NotReady);
                    }
                },
                Some(RestoreState::WriteVec { mut write, r, buf }) => match write.poll()? {
                    Async::Ready((w, vec)) => {
                        self.delta.blocks[self.delta_pos] = Block::Literal(vec);
                        self.delta_pos += 1;
                        self.state = Some(RestoreState::Delta { w, r, buf })
                    }
                    Async::NotReady => {
                        self.state = Some(RestoreState::WriteVec { write, r, buf });
                        return Ok(Async::NotReady);
                    }
                },
                None => panic!(""),
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use rand;
    use super::*;
    use rand::Rng;
    use tokio_core::reactor::Core;
    const WINDOW: usize = 32;
    #[test]
    fn basic() {
        for index in 0..10 {
            let source = rand::thread_rng()
                .gen_ascii_chars()
                .take(WINDOW * 10 + 8)
                .collect::<String>();
            let mut modified = source.clone();
            let index = WINDOW * index + 3;
            unsafe {
                modified.as_bytes_mut()[index] =
                    ((source.as_bytes()[index] as usize + 1) & 255) as u8
            }
            let block = [0; WINDOW];
            let source_sig = signature(source.as_bytes(), block).unwrap();
            let comp = compare(&source_sig, modified.as_bytes(), block).unwrap();

            let mut restored = Vec::new();
            let source = std::io::Cursor::new(source.as_bytes());
            restore_seek(&mut restored, source, [0; WINDOW], &comp).unwrap();
            if &restored[..] != modified.as_bytes() {
                for i in 0..10 {
                    let a = &restored[i * WINDOW..(i + 1) * WINDOW];
                    let b = &modified.as_bytes()[i * WINDOW..(i + 1) * WINDOW];
                    println!("{:?}\n{:?}\n", a, b);
                    if a != b {
                        println!(">>>>>>>>");
                    }
                }
                panic!("different");
            }
        }
    }
    #[test]
    fn futures() {
        let source = rand::thread_rng()
            .gen_ascii_chars()
            .take(WINDOW * 10 + 8)
            .collect::<String>();
        let mut modified = source.clone();
        let index = WINDOW + 3;
        unsafe {
            modified.as_bytes_mut()[index] = ((source.as_bytes()[index] as usize + 1) & 255) as u8
        }

        let mut l = Core::new().unwrap();
        let block = [0; WINDOW];
        let source_sig = signature(source.as_bytes(), block).unwrap();
        println!("==================\n");
        let (_, source_sig_) = l.run(signature_fut(source.as_bytes(), block)).unwrap();
        assert_eq!(source_sig, source_sig_);
        println!("{:?} {:?}", source_sig, source_sig_);

        let comp = compare(&source_sig, modified.as_bytes(), block).unwrap();
        let (_, _, comp_) = l.run(compare_fut(source_sig, modified.as_bytes(), block)).unwrap();
        assert_eq!(comp, comp_);
        println!("{:?}", comp);

        let v = Vec::new();
        let (rest_, _, _) = l.run(restore_seek_fut(
            std::io::Cursor::new(v),
            std::io::Cursor::new(source.as_bytes()),
            [0; WINDOW],
            comp,
        )).unwrap();
        assert_eq!(rest_.into_inner().as_slice(), modified.as_bytes());
    }
}