Add an importer component for CSV files

korrat
Jun 16, 2023, 3:21 PM
GVEI7KNDSO5KHFV5GIT2FLNIF7I3UQNH6KHUDM4QLUSWBSMDJO6AC

Dependencies

  • [2] KB7Y4PJI Implement importers for Amazon accounts

Change contents

  • file addition: csv (d--r------)
    [2.1]
  • file addition: src (d--r------)
    [0.1]
  • file addition: lib.rs (---r------)
    [0.18]
    use beancount_types::common_keys;
    pub use csv::ReaderBuilder;
    pub use csv::StringRecord as Record;
    use beancount_types::Directive;
    use camino::Utf8Path;
    use camino::Utf8PathBuf;
    use delegate::delegate;
    use miette::Diagnostic;
    use miette::SourceOffset;
    use miette::SourceSpan;
    use snafu::ResultExt as _;
    use snafu::Snafu;
    use tap::Pipe as _;
    use tap::Tap as _;
    use tap::TapFallible;
    use time::Date;
    /// Error returned by [`Importer`] when a CSV file cannot be processed.
    ///
    /// `E` is the error type of the [`RecordImporter`] that handles individual records.
    #[derive(Debug, Diagnostic, Snafu)]
    pub enum Error<E>
    where
    E: Diagnostic + Send + Sync + 'static,
    {
    /// The record importer rejected at least one record.
    ///
    /// Every rejected record is collected rather than stopping at the first one.
    #[snafu(display("error(s) while extracting data from records"))]
    Extracting {
    /// One diagnostic per rejected record, in the order the records appear in the file.
    #[related]
    errors: Vec<RecordError<E>>,
    /// Complete contents of the file, attached as the diagnostic's source code so the
    /// per-record labels can be rendered against it.
    #[source_code]
    data: String,
    },
    /// The CSV reader reported an error while reading a record.
    #[snafu(display("error while parsing CSV record"))]
    Parsing { source: csv::Error },
    /// `file` could not be read into memory as a string.
    ///
    /// `std::fs::read_to_string` also reports non-UTF-8 contents this way.
    #[snafu(display("error while reading CSV file {file}"))]
    Reading {
    file: Utf8PathBuf,
    source: std::io::Error,
    },
    }
    /// Reads CSV files with a configurable [`ReaderBuilder`] and passes each record to a
    /// [`RecordImporter`].
    ///
    /// The [`Default`] importer uses the default [`ReaderBuilder`] settings.
    #[derive(Default, Debug)]
    pub struct Importer {
        /// Reader configuration (delimiter, quoting, header handling, ...) used for every file.
        builder: ReaderBuilder,
    }
    impl Importer {
    pub fn new(builder: ReaderBuilder) -> Self {
    Self { builder }
    }
    pub fn semicolon_delimited() -> Self {
    ReaderBuilder::new()
    .tap_mut(|builder| {
    builder.delimiter(b';');
    })
    .pipe(Self::new)
    }
    }
    impl Importer {
    pub fn date<R>(&self, file: &Utf8Path, inner: R) -> Option<Result<Date, Error<R::Error>>>
    where
    R: RecordImporter,
    {
    self.fold(
    file,
    None,
    |record| inner.date(record).transpose(),
    |date, extracted| {
    if let Some(new_date) = extracted {
    let date = date.get_or_insert(new_date);
    *date = Date::max(*date, new_date);
    };
    },
    )
    .transpose()
    }
    pub fn extract<R>(
    &self,
    file: &Utf8Path,
    existing: &[Directive],
    inner: R,
    ) -> Result<Vec<Directive>, Error<R::Error>>
    where
    R: RecordImporter,
    {
    self.fold(
    file,
    Vec::new(),
    |record| {
    let position = record.position().expect("position is set by csv::Reader");
    inner.extract(existing, record).tap_ok_mut(|records| {
    records.iter_mut().for_each(|directive| {
    directive
    .add_meta(common_keys::FILENAME, file.as_str())
    .add_meta(common_keys::LINE_NUMBER, position.line());
    });
    })
    },
    |directives, extracted| {
    directives.extend(extracted);
    },
    )
    }
    pub fn identify(&self, file: &Utf8Path, expected_headers: &[&str]) -> Result<bool, csv::Error> {
    if !matches!(file.extension(), Some(ext) if ext.eq_ignore_ascii_case("csv")) {
    return Ok(false);
    }
    let mut reader = self.builder.from_path(file)?;
    let headers = reader.headers()?;
    Ok(headers == expected_headers)
    }
    }
    impl Importer {
        /// Reads `file`, applies `extractor` to every CSV record, and merges each result into
        /// `state` with `folder`.
        ///
        /// A record that `extractor` rejects does not stop the scan. Every rejection is collected,
        /// labelled with the record's position in the file, and all of them are returned together
        /// as [`Error::Extracting`]. Once one record has failed, successful results are no longer
        /// merged, because the state will be discarded anyway.
        ///
        /// Failing to read the file or to parse the CSV aborts immediately.
        fn fold<State, Err>(
            &self,
            file: &Utf8Path,
            mut state: State,
            extractor: impl Fn(&Record) -> Result<State, Err>,
            folder: impl Fn(&mut State, State),
        ) -> Result<State, Error<Err>>
        where
            Err: Diagnostic + Send + Sync,
        {
            // The whole file stays in memory so it can be attached to diagnostics as source code.
            let data = std::fs::read_to_string(file).context(ReadingSnafu { file })?;
            let mut reader = self.builder.from_reader(data.as_bytes());
            let mut failures = Vec::new();
            let mut record = Record::new();

            while reader.read_record(&mut record).context(ParsingSnafu {})? {
                let start = record
                    .position()
                    .expect("position is set by csv::Reader")
                    .byte();
                // NOTE(review): the label is placed one byte after the record's reported start
                // offset. The reason for the `+ 1` is not visible here; confirm it against real
                // input files before changing it.
                let offset = SourceOffset::from((start + 1) as usize);

                match extractor(&record).context(RecordSnafu { offset }) {
                    Err(failure) => failures.push(failure),
                    Ok(new_state) => {
                        if failures.is_empty() {
                            folder(&mut state, new_state);
                        }
                    }
                }
            }

            if !failures.is_empty() {
                return ExtractingSnafu { errors: failures, data }.fail();
            }
            Ok(state)
        }
    }
    /// A single record that a [`RecordImporter`] rejected, together with its location in the file.
    #[derive(Debug, Diagnostic, Snafu)]
    #[snafu(display("encountered error while extracting record"))]
    pub struct RecordError<E>
    where
    E: Diagnostic + 'static,
    {
    /// The record importer's error, forwarded as this diagnostic's source.
    #[diagnostic_source]
    source: E,
    /// Where the record is located in the file contents, rendered with the label
    /// "in this record".
    ///
    /// Despite the name this is a `SourceSpan`; [`Importer`] builds it from a single
    /// `SourceOffset` derived from the record's byte position.
    #[label("in this record")]
    offset: SourceSpan,
    }
    /// Converts individual CSV records into beancount directives.
    ///
    /// [`Importer`] drives implementations by calling them once for each record of a file.
    pub trait RecordImporter {
        /// Error produced when a record cannot be handled; reported as a diagnostic.
        type Error: Diagnostic + Send + Sync + 'static;

        /// Extracts the directives represented by `record`.
        ///
        /// `existing` holds directives supplied by the caller. Presumably it lets implementations
        /// detect entries that are already known; confirm this against the implementors.
        fn extract(
            &self,
            existing: &[Directive],
            record: &Record,
        ) -> Result<Vec<Directive>, Self::Error>;

        /// Returns the date associated with `record`, if there is one.
        ///
        /// [`Importer::date`] reports the latest of these dates for a file. By default no record
        /// provides a date.
        fn date(&self, _record: &Record) -> Option<Result<Date, Self::Error>> {
            None
        }
    }
    /// Lets a shared reference to a [`RecordImporter`] be used as an importer itself.
    ///
    /// For example, `&importer` can be passed to [`Importer::extract`] without giving up
    /// ownership of `importer`.
    impl<R> RecordImporter for &R
    where
        R: RecordImporter,
    {
        type Error = R::Error;

        delegate! {
            // `**self` is the referenced importer itself, so each call goes to `R`'s
            // implementation rather than recursing into this one.
            to (**self) {
                fn extract(
                    &self,
                    existing: &[Directive],
                    record: &Record,
                ) -> Result<Vec<Directive>, Self::Error>;
                fn date(&self, record: &Record) -> Option<Result<Date, Self::Error>>;
            }
        }
    }
  • file addition: Cargo.toml (---r------)
    [0.1]
    [package]
    name = "csv"
    authors.workspace = true
    edition.workspace = true
    publish.workspace = true
    rust-version.workspace = true
    version.workspace = true
    [dependencies]
    # Workspace dependencies
    beancount-importers-framework.path = "../../framework"
    beancount-types.path = "../../common/beancount-types"
    german-decimal.path = "../../common/german-decimal"
    # Inherited dependencies
    camino.workspace = true
    csv.workspace = true
    delegate.workspace = true
    itertools.workspace = true
    miette.workspace = true
    serde.workspace = true
    snafu.workspace = true
    tap.workspace = true
    time.workspace = true