+ use beancount_types::common_keys;
+ pub use csv::ReaderBuilder;
+ pub use csv::StringRecord as Record;
+
+ use beancount_types::Directive;
+ use camino::Utf8Path;
+ use camino::Utf8PathBuf;
+ use delegate::delegate;
+ use miette::Diagnostic;
+ use miette::SourceOffset;
+ use miette::SourceSpan;
+ use snafu::ResultExt as _;
+ use snafu::Snafu;
+ use tap::Pipe as _;
+ use tap::Tap as _;
+ use tap::TapFallible;
+ use time::Date;
+
+ /// Errors that can occur while importing a CSV file.
+ ///
+ /// `E` is the error type produced by the per-record extractor.
+ #[derive(Debug, Diagnostic, Snafu)]
+ pub enum Error<E: Diagnostic + Send + Sync + 'static> {
+     /// One or more records could not be processed by the extractor.
+     #[snafu(display("error(s) while extracting data from records"))]
+     Extracting {
+         /// One entry per failed record.
+         #[related]
+         errors: Vec<RecordError<E>>,
+
+         /// Complete contents of the imported file, used as the source code the
+         /// record errors are reported against.
+         #[source_code]
+         data: String,
+     },
+
+     /// The CSV reader failed to produce the next record.
+     #[snafu(display("error while parsing CSV record"))]
+     Parsing {
+         /// Error reported by the CSV reader.
+         source: csv::Error,
+     },
+
+     /// The file could not be read into memory.
+     #[snafu(display("error while reading CSV file {file}"))]
+     Reading {
+         /// Path of the file that could not be read.
+         file: Utf8PathBuf,
+
+         /// Underlying I/O error.
+         source: std::io::Error,
+     },
+ }
+
+ /// Importer for CSV files that hands each record to a [`RecordImporter`].
+ #[derive(Debug, Default)]
+ pub struct Importer {
+     /// Configuration used for every CSV reader this importer creates.
+     builder: ReaderBuilder,
+ }
+
+ impl Importer {
+     /// Creates an importer that reads CSV data as configured by `builder`.
+     pub fn new(builder: ReaderBuilder) -> Self {
+         Importer { builder }
+     }
+
+     /// Creates an importer for files that use `;` as the field delimiter.
+     ///
+     /// All other reader settings keep the defaults of [`ReaderBuilder::new`].
+     pub fn semicolon_delimited() -> Self {
+         ReaderBuilder::new()
+             .tap_mut(|csv| {
+                 csv.delimiter(b';');
+             })
+             .pipe(|builder| Self { builder })
+     }
+ }
+
+ impl Importer {
+     /// Determines the latest date found in `file`.
+     ///
+     /// Every record is handed to [`RecordImporter::date`]. Records for which it returns
+     /// `None` are ignored, and the maximum of the remaining dates is returned. The result
+     /// is `None` when no record yields a date.
+     ///
+     /// # Errors
+     ///
+     /// Returns `Some(Err(_))` if the file cannot be read, a record cannot be parsed, or
+     /// `inner` reports an error for at least one record.
+     pub fn date<R>(&self, file: &Utf8Path, inner: R) -> Option<Result<Date, Error<R::Error>>>
+     where
+         R: RecordImporter,
+     {
+         let latest = self.fold(
+             file,
+             None,
+             |record| inner.date(record).transpose(),
+             |latest, candidate| {
+                 // `None` orders before every `Some(_)`, so the maximum of the two options
+                 // keeps the latest date seen and ignores records without a date.
+                 *latest = Ord::max(*latest, candidate);
+             },
+         );
+
+         latest.transpose()
+     }
+
+     /// Extracts the directives of all records in `file`.
+     ///
+     /// Each record is handed to [`RecordImporter::extract`] together with `existing`.
+     /// Every directive it returns is annotated with the name of the file and the line
+     /// number the CSV reader reports for the record.
+     ///
+     /// # Errors
+     ///
+     /// Fails if the file cannot be read, a record cannot be parsed, or `inner` reports an
+     /// error for at least one record.
+     ///
+     /// # Panics
+     ///
+     /// Panics if a record carries no position.
+     pub fn extract<R>(
+         &self,
+         file: &Utf8Path,
+         existing: &[Directive],
+         inner: R,
+     ) -> Result<Vec<Directive>, Error<R::Error>>
+     where
+         R: RecordImporter,
+     {
+         let annotate = |record: &Record| {
+             let position = record.position().expect("position is set by csv::Reader");
+
+             inner.extract(existing, record).tap_ok_mut(|extracted| {
+                 for directive in extracted.iter_mut() {
+                     directive
+                         .add_meta(common_keys::FILENAME, file.as_str())
+                         .add_meta(common_keys::LINE_NUMBER, position.line());
+                 }
+             })
+         };
+
+         self.fold(file, Vec::new(), annotate, |collected, batch| {
+             collected.extend(batch)
+         })
+     }
+
+     /// Checks whether `file` looks like a CSV file with the given header row.
+     ///
+     /// Returns `Ok(false)` without opening the file when its extension is not `csv`
+     /// (compared ASCII case-insensitively). Otherwise the headers are read and compared
+     /// with `expected_headers`.
+     ///
+     /// # Errors
+     ///
+     /// Fails if the file cannot be opened or its headers cannot be read.
+     pub fn identify(&self, file: &Utf8Path, expected_headers: &[&str]) -> Result<bool, csv::Error> {
+         let has_csv_extension = file
+             .extension()
+             .map_or(false, |extension| extension.eq_ignore_ascii_case("csv"));
+         if !has_csv_extension {
+             return Ok(false);
+         }
+
+         let mut reader = self.builder.from_path(file)?;
+
+         reader.headers().map(|found| found == expected_headers)
+     }
+ }
+
+ impl Importer {
+     /// Reads `file` and folds the values extracted from its records into `state`.
+     ///
+     /// `extractor` is called once per record. While no record has failed, `folder`
+     /// merges each extracted value into `state`. Extraction failures are collected
+     /// together with the offset of the offending record, and the remaining records are
+     /// still processed so that all failures are reported at once.
+     ///
+     /// # Errors
+     ///
+     /// * [`Error::Reading`] if the file cannot be read.
+     /// * [`Error::Parsing`] as soon as the CSV reader fails to produce a record.
+     /// * [`Error::Extracting`] if `extractor` failed for at least one record.
+     ///
+     /// # Panics
+     ///
+     /// Panics if a record carries no position.
+     fn fold<State, Err>(
+         &self,
+         file: &Utf8Path,
+         mut state: State,
+         extractor: impl Fn(&Record) -> Result<State, Err>,
+         folder: impl Fn(&mut State, State),
+     ) -> Result<State, Error<Err>>
+     where
+         Err: Diagnostic + Send + Sync,
+     {
+         // The whole file is kept in memory so it can be attached to the error report.
+         let data = std::fs::read_to_string(file).context(ReadingSnafu { file })?;
+         let mut reader = self.builder.from_reader(data.as_bytes());
+
+         let mut errors = Vec::new();
+         let mut record = Record::new();
+
+         loop {
+             let has_record = reader.read_record(&mut record).context(ParsingSnafu {})?;
+             if !has_record {
+                 break;
+             }
+
+             let position = record.position().expect("position is set by csv::Reader");
+             // NOTE(review): the label is placed one byte past the position reported by the
+             // reader; the reason is not visible here -- confirm this shift is intended.
+             let offset = SourceOffset::from((position.byte() + 1) as usize);
+
+             match extractor(&record).context(RecordSnafu { offset }) {
+                 Err(error) => errors.push(error),
+
+                 // The state is discarded once any record has failed, so there is no point
+                 // in updating it any further.
+                 Ok(_) if !errors.is_empty() => {}
+
+                 Ok(extracted) => folder(&mut state, extracted),
+             }
+         }
+
+         if !errors.is_empty() {
+             return ExtractingSnafu { errors, data }.fail();
+         }
+
+         Ok(state)
+     }
+ }
+
+ /// Failure to extract data from a single CSV record.
+ #[derive(Debug, Diagnostic, Snafu)]
+ #[snafu(display("encountered error while extracting record"))]
+ pub struct RecordError<E: Diagnostic + 'static> {
+     /// Error reported by the record extractor.
+     #[diagnostic_source]
+     source: E,
+
+     /// Location within the file contents that is labelled in the report.
+     #[label("in this record")]
+     offset: SourceSpan,
+ }
+
+ /// Per-record logic plugged into an [`Importer`].
+ pub trait RecordImporter {
+     /// Error reported when a record cannot be processed.
+     type Error: Diagnostic + Send + Sync + 'static;
+
+     /// Turns a single `record` into directives.
+     ///
+     /// `existing` is the slice of directives the caller handed to [`Importer::extract`].
+     fn extract(
+         &self,
+         existing: &[Directive],
+         record: &Record,
+     ) -> Result<Vec<Directive>, Self::Error>;
+
+     /// Returns the date associated with a record, if any.
+     ///
+     /// The default implementation reports no date for every record.
+     fn date(&self, _record: &Record) -> Option<Result<Date, Self::Error>> {
+         None
+     }
+ }
+
+ /// Forwards [`RecordImporter`] to the referenced importer, so a reference to an importer
+ /// can be used wherever an importer is expected.
+ impl<R: RecordImporter> RecordImporter for &R {
+     type Error = R::Error;
+
+     delegate! {
+         to (*self) {
+             fn extract(
+                 &self,
+                 existing: &[Directive],
+                 record: &Record,
+             ) -> Result<Vec<Directive>, Self::Error>;
+             fn date(&self, record: &Record) -> Option<Result<Date, Self::Error>>;
+         }
+     }
+ }