Fork channel

Create a new channel as a copy of main.

Rename channel

Rename main to:

Delete channel

Delete main? This cannot be undone.

lib.rs
use beancount_types::common_keys;
pub use csv::ReaderBuilder;
pub use csv::StringRecord as Record;

use beancount_types::Directive;

use delegate::delegate;
use miette::Diagnostic;
use miette::SourceOffset;
use miette::SourceSpan;
use serde::Deserialize;
use snafu::ResultExt as _;
use snafu::Snafu;
use tap::Pipe as _;
use tap::Tap as _;
use time::Date;

/// Error returned by [`Importer::date`] and [`Importer::extract`].
///
/// `E` is the error type of the [`RecordImporter`] driving the import.
#[derive(Debug, Diagnostic, Snafu)]
pub enum Error<E>
where
    E: Diagnostic + Send + Sync + 'static,
{
    /// One or more individual records failed to deserialize or extract.
    #[snafu(display("error(s) while processing CSV records"))]
    Processing {
        /// Every per-record failure, in the order the records were read.
        #[related]
        errors: Vec<RecordError<E>>,

        /// The raw CSV input, attached as source code so the per-record labels can be rendered.
        #[source_code]
        data: Vec<u8>,
    },
    /// The `csv` reader failed while reading the header row or a record.
    #[snafu(display("error while parsing CSV file"))]
    Parsing { source: csv::Error },
}

/// CSV front-end that feeds each record of an input buffer to a [`RecordImporter`].
#[derive(Debug)]
pub struct Importer {
    /// Reader configuration (delimiter, trimming, ...) used for every parse of an input buffer.
    builder: ReaderBuilder,

    /// Padding preference set through [`Importer::pad_records`]; `false` for importers built
    /// with [`Importer::new`].
    pad_records: bool,
}

impl Default for Importer {
    /// Importer using `csv`'s default reader settings, with `csv::Trim::Fields` trimming enabled.
    fn default() -> Self {
        Self::new(ReaderBuilder::new().tap_mut(|config| {
            config.trim(csv::Trim::Fields);
        }))
    }
}

impl Importer {
    /// Wraps an already configured reader builder. Record padding starts disabled.
    pub fn new(builder: ReaderBuilder) -> Self {
        let pad_records = false;
        Self {
            builder,
            pad_records,
        }
    }

    /// Importer for `;`-separated input, with `csv::Trim::Fields` trimming enabled.
    pub fn semicolon_delimited() -> Self {
        let mut config = ReaderBuilder::new();
        config.trim(csv::Trim::Fields).delimiter(b';');
        config.pipe(Self::new)
    }
}

impl Importer {
    /// Opts in to accepting data records that have fewer fields than the header row.
    /// Such records are padded with empty fields before deserialization.
    ///
    /// Enabling padding also switches the underlying `ReaderBuilder` to flexible mode.
    /// Without that, the `csv` reader rejects a short record with an error before it could
    /// ever be padded. Disabling padding leaves the builder's flexibility untouched, so a
    /// flexible builder handed to [`Importer::new`] keeps its setting.
    pub fn pad_records(&mut self, yes: bool) -> &mut Self {
        self.pad_records = yes;
        if yes {
            // Short rows must get past the reader before they can be padded.
            self.builder.flexible(true);
        }
        self
    }
}

impl Importer {
    pub fn date<R>(&self, buffer: &[u8], inner: R) -> Option<Result<Date, Error<R::Error>>>
    where
        R: RecordImporter,
    {
        (|| {
            let mut date = None;

            let mut reader = self.builder.from_reader(buffer);
            let headers = reader.headers().context(ParsingSnafu {})?.clone();

            let mut errors = Vec::new();

            let mut record = Record::new();
            while reader.read_record(&mut record).context(ParsingSnafu {})? {
                let new_date = self
                    .deserialize_record::<R>(&mut record, &headers)
                    .map(|record| inner.date(record));
                match new_date {
                    Ok(new_date) if errors.is_empty() => {
                        let date = date.get_or_insert(new_date);
                        *date = Date::max(*date, new_date);
                    }

                    // we can skip updating our state when we already have at least one error
                    Ok(_) => {}

                    Err(error) => {
                        errors.push(error);
                    }
                }
            }

            if errors.is_empty() {
                Ok(date)
            } else {
                ProcessingSnafu {
                    errors,
                    data: buffer,
                }
                .fail()
            }
        })()
        .transpose()
    }

    pub fn extract<R>(
        &self,
        buffer: &[u8],
        existing: &[Directive],
        inner: R,
    ) -> Result<Vec<Directive>, Error<R::Error>>
    where
        R: RecordImporter,
    {
        let mut directives = Vec::new();
        let mut reader = self.builder.from_reader(buffer);
        let headers = reader.headers().context(ParsingSnafu {})?.clone();

        let mut errors = Vec::new();

        let mut record = Record::new();
        while reader.read_record(&mut record).context(ParsingSnafu {})? {
            let position = record.position().expect("position is set by csv::Reader");
            let offset = SourceOffset::from((position.byte() + 1) as usize);
            let record_index = position.record();

            let extracted = self
                .deserialize_record::<R>(&mut record, &headers)
                .and_then(|record| {
                    inner
                        .extract(existing, record)
                        .context(ExtractionSnafu { offset })
                });
            match extracted {
                Ok(extracted) if errors.is_empty() => {
                    directives.extend(extracted.into_iter().map(|mut directive| {
                        directive.add_meta(common_keys::IMPORTED_RECORD, record_index);
                        directive
                    }));
                }

                // we can skip updating our state when we already have at least one error
                Ok(_) => {}

                Err(error) => {
                    errors.push(error);
                }
            }
        }

        if errors.is_empty() {
            Ok(directives)
        } else {
            ProcessingSnafu {
                errors,
                data: buffer,
            }
            .fail()
        }
    }

    pub fn identify(&self, buffer: &[u8], expected_headers: &[&str]) -> Result<bool, csv::Error> {
        let mut reader = self.builder.from_reader(buffer);
        let headers = reader.headers()?;

        Ok(headers == expected_headers)
    }
}

impl Importer {
    /// Deserializes one CSV `record` into `R::Record`, using `headers` for the field names.
    ///
    /// When padding is enabled through [`Importer::pad_records`], a record with fewer fields
    /// than `headers` is first extended with empty fields so every header has a value. Records
    /// that are as long as, or longer than, the header row are left untouched.
    ///
    /// # Errors
    ///
    /// Returns [`RecordError::Deserialization`], labelled with the record's position in the
    /// input, when the record does not deserialize into `R::Record`.
    ///
    /// # Panics
    ///
    /// Panics if `record` carries no position. Records filled by `csv::Reader::read_record`
    /// always have one.
    fn deserialize_record<'de, 'h: 'de, R>(
        &self,
        record: &'de mut Record,
        headers: &'h Record,
    ) -> Result<R::Record<'de>, RecordError<R::Error>>
    where
        R: RecordImporter,
    {
        if self.pad_records {
            // `saturating_sub`: a flexible reader can also yield records *longer* than the
            // header row, and plain subtraction would underflow for those.
            let missing = headers.len().saturating_sub(record.len());
            record.extend(core::iter::repeat("").take(missing));
        }

        let position = record.position().expect("position is set by csv::Reader");
        let offset = SourceOffset::from((position.byte() + 1) as usize);

        record
            .deserialize(Some(headers))
            .context(DeserializationSnafu { offset })
    }
}

/// A failure tied to a single CSV record.
///
/// Each variant's `offset` is labelled "in this record". The label is rendered against the
/// `data` of `Error::Processing`.
#[derive(Debug, Diagnostic, Snafu)]
pub enum RecordError<E>
where
    E: Diagnostic + 'static,
{
    /// The record could not be deserialized into the importer's record type.
    #[snafu(display("error while deserializing CSV record"))]
    Deserialization {
        source: csv::Error,

        /// Built from the `csv` record position's byte offset plus one.
        #[label("in this record")]
        offset: SourceSpan,
    },

    /// The importer's `RecordImporter::extract` rejected the deserialized record.
    #[snafu(display("error while extracting record"))]
    Extraction {
        #[diagnostic_source]
        source: E,

        /// Built from the `csv` record position's byte offset plus one.
        #[label("in this record")]
        offset: SourceSpan,
    },
}

/// Turns deserialized CSV rows into beancount data.
///
/// `Importer` deserializes every CSV row into [`RecordImporter::Record`] and passes it to one
/// of the methods below.
pub trait RecordImporter {
    /// Row type deserialized from a CSV record. It may borrow from the record for `'de`.
    type Record<'de>: Deserialize<'de>;

    /// Error produced by [`RecordImporter::extract`].
    type Error: Diagnostic + Send + Sync + 'static;

    /// Converts one record into zero or more directives.
    ///
    /// `existing` holds the directives the caller of `Importer::extract` passed in.
    fn extract(
        &self,
        existing: &[Directive],
        record: Self::Record<'_>,
    ) -> Result<Vec<Directive>, Self::Error>;

    /// Returns the date associated with one record.
    fn date(&self, record: Self::Record<'_>) -> Date;
}

/// Lets a borrowed importer be used wherever an `R: RecordImporter` is taken by value, e.g.
/// `importer.extract(buffer, &existing, &inner)`.
///
/// Every method forwards to `R`'s implementation.
impl<R> RecordImporter for &R
where
    R: RecordImporter,
{
    type Error = R::Error;
    type Record<'de> = R::Record<'de>;

    // `(*self)` has type `&R`, so method resolution selects `R`'s impl (receiver `&R`).
    // It does not recurse into this impl (receiver `&&R`).
    delegate! {
        to (*self) {
            fn date(&self, record: Self::Record<'_>) -> Date;
            fn extract(
                &self,
                existing: &[Directive],
                record: Self::Record<'_>,
            ) -> Result<Vec<Directive>, Self::Error>;

        }
    }
}