lib.rs
use beancount_types::common_keys;
pub use csv::ReaderBuilder;
pub use csv::StringRecord as Record;
use beancount_types::Directive;
use delegate::delegate;
use miette::Diagnostic;
use miette::SourceOffset;
use miette::SourceSpan;
use serde::Deserialize;
use snafu::ResultExt as _;
use snafu::Snafu;
use tap::Pipe as _;
use tap::Tap as _;
use time::Date;
/// Top-level error returned by [`Importer::date`] and [`Importer::extract`].
///
/// `E` is the error type of the [`RecordImporter`] driving the import.
#[derive(Debug, Diagnostic, Snafu)]
pub enum Error<E>
where
E: Diagnostic + Send + Sync + 'static,
{
/// At least one record could not be deserialized or extracted.
#[snafu(display("error(s) while processing CSV records"))]
Processing {
/// Every per-record failure collected while reading the buffer.
#[related]
errors: Vec<RecordError<E>>,
/// Owned copy of the input buffer, attached as the diagnostic's source code.
#[source_code]
data: Vec<u8>,
},
/// The CSV reader itself failed (while reading the headers or a record).
#[snafu(display("error while parsing CSV file"))]
Parsing { source: csv::Error },
}
/// CSV importer that reads records from a byte buffer and hands them to a
/// [`RecordImporter`].
#[derive(Debug)]
pub struct Importer {
// Reader configuration; each public method builds a fresh reader from it.
builder: ReaderBuilder,
// Flag set through `Importer::pad_records`; `Importer::new` initialises it to `false`.
pad_records: bool,
}
impl Default for Importer {
    /// Creates an importer from a fresh [`ReaderBuilder`] configured with
    /// `csv::Trim::Fields`; every other reader option keeps the builder's default.
    fn default() -> Self {
        Self::new(ReaderBuilder::new().tap_mut(|reader_builder| {
            reader_builder.trim(csv::Trim::Fields);
        }))
    }
}
impl Importer {
    /// Wraps an already configured [`ReaderBuilder`].
    ///
    /// The `pad_records` flag starts out as `false`.
    pub fn new(builder: ReaderBuilder) -> Self {
        let pad_records = false;
        Self {
            builder,
            pad_records,
        }
    }

    /// Creates an importer whose reader uses `;` as the field delimiter and is
    /// configured with `csv::Trim::Fields`.
    pub fn semicolon_delimited() -> Self {
        let mut builder = ReaderBuilder::new();
        builder.delimiter(b';').trim(csv::Trim::Fields);
        builder.pipe(Self::new)
    }
}
impl Importer {
    /// Stores the `pad_records` flag and returns `self` so calls can be chained.
    ///
    /// NOTE(review): judging by the name, `yes` is meant to enable padding of
    /// records that are shorter than the header row — confirm against the code
    /// that consumes the flag.
    pub fn pad_records(&mut self, yes: bool) -> &mut Self {
        let Self { pad_records, .. } = self;
        *pad_records = yes;
        self
    }
}
impl Importer {
pub fn date<R>(&self, buffer: &[u8], inner: R) -> Option<Result<Date, Error<R::Error>>>
where
R: RecordImporter,
{
(|| {
let mut date = None;
let mut reader = self.builder.from_reader(buffer);
let headers = reader.headers().context(ParsingSnafu {})?.clone();
let mut errors = Vec::new();
let mut record = Record::new();
while reader.read_record(&mut record).context(ParsingSnafu {})? {
let new_date = self
.deserialize_record::<R>(&mut record, &headers)
.map(|record| inner.date(record));
match new_date {
Ok(new_date) if errors.is_empty() => {
let date = date.get_or_insert(new_date);
*date = Date::max(*date, new_date);
}
// we can skip updating our state when we already have at least one error
Ok(_) => {}
Err(error) => {
errors.push(error);
}
}
}
if errors.is_empty() {
Ok(date)
} else {
ProcessingSnafu {
errors,
data: buffer,
}
.fail()
}
})()
.transpose()
}
pub fn extract<R>(
&self,
buffer: &[u8],
existing: &[Directive],
inner: R,
) -> Result<Vec<Directive>, Error<R::Error>>
where
R: RecordImporter,
{
let mut directives = Vec::new();
let mut reader = self.builder.from_reader(buffer);
let headers = reader.headers().context(ParsingSnafu {})?.clone();
let mut errors = Vec::new();
let mut record = Record::new();
while reader.read_record(&mut record).context(ParsingSnafu {})? {
let position = record.position().expect("position is set by csv::Reader");
let offset = SourceOffset::from((position.byte() + 1) as usize);
let record_index = position.record();
let extracted = self
.deserialize_record::<R>(&mut record, &headers)
.and_then(|record| {
inner
.extract(existing, record)
.context(ExtractionSnafu { offset })
});
match extracted {
Ok(extracted) if errors.is_empty() => {
directives.extend(extracted.into_iter().map(|mut directive| {
directive.add_meta(common_keys::IMPORTED_RECORD, record_index);
directive
}));
}
// we can skip updating our state when we already have at least one error
Ok(_) => {}
Err(error) => {
errors.push(error);
}
}
}
if errors.is_empty() {
Ok(directives)
} else {
ProcessingSnafu {
errors,
data: buffer,
}
.fail()
}
}
pub fn identify(&self, buffer: &[u8], expected_headers: &[&str]) -> Result<bool, csv::Error> {
let mut reader = self.builder.from_reader(buffer);
let headers = reader.headers()?;
Ok(headers == expected_headers)
}
}
impl Importer {
    /// Deserializes `record` into `R::Record`, using `headers` for field names.
    ///
    /// When the `pad_records` flag is set and the record has fewer fields than the
    /// header row, the record is first padded with empty fields up to the header
    /// length. Records that are already as long as (or longer than) the header row
    /// are left untouched.
    ///
    /// # Errors
    ///
    /// Returns [`RecordError::Deserialization`], labelled with the record's byte
    /// offset, when deserialization fails.
    ///
    /// # Panics
    ///
    /// Panics if `record` carries no position, i.e. it was not produced by a CSV reader.
    fn deserialize_record<'de, 'h: 'de, R>(
        &self,
        record: &'de mut Record,
        headers: &'h Record,
    ) -> Result<R::Record<'de>, RecordError<R::Error>>
    where
        R: RecordImporter,
    {
        if self.pad_records {
            // `saturating_sub` keeps this well-defined for records that have *more*
            // fields than the header row; a plain subtraction would underflow there.
            let missing = headers.len().saturating_sub(record.len());
            record.extend(core::iter::repeat("").take(missing));
        }

        let position = record.position().expect("position is set by csv::Reader");
        // NOTE(review): the `+ 1` shifts the label one byte past the reported start of
        // the record — reason not visible here, confirm before changing.
        let offset = SourceOffset::from((position.byte() + 1) as usize);

        record
            .deserialize(Some(headers))
            .context(DeserializationSnafu { offset })
    }
}
/// Failure tied to a single CSV record.
///
/// `E` is the error type of the [`RecordImporter`] driving the import.
#[derive(Debug, Diagnostic, Snafu)]
pub enum RecordError<E>
where
E: Diagnostic + 'static,
{
/// The record could not be deserialized into the importer's record type.
#[snafu(display("error while deserializing CSV record"))]
Deserialization {
source: csv::Error,
/// Location in the input that the diagnostic label points at.
#[label("in this record")]
offset: SourceSpan,
},
/// The record was deserialized, but [`RecordImporter::extract`] returned an error.
#[snafu(display("error while extracting record"))]
Extraction {
/// The importer's own error, surfaced as the diagnostic source.
#[diagnostic_source]
source: E,
/// Location in the input that the diagnostic label points at.
#[label("in this record")]
offset: SourceSpan,
},
}
/// Per-record logic plugged into [`Importer`].
pub trait RecordImporter {
/// Error returned by [`RecordImporter::extract`].
type Error: Diagnostic + Send + Sync + 'static;
/// Typed form of a single CSV record, deserialized with lifetime `'de`.
type Record<'de>: Deserialize<'de>;
/// Returns the date associated with the given record.
fn date(&self, _record: Self::Record<'_>) -> Date;
/// Turns one record into zero or more directives.
///
/// `existing` is the slice the caller passed to [`Importer::extract`].
fn extract(
&self,
existing: &[Directive],
record: Self::Record<'_>,
) -> Result<Vec<Directive>, Self::Error>;
}
/// Lets a shared reference to a [`RecordImporter`] be used wherever an importer is
/// expected; the associated types are those of `R` and both methods are forwarded
/// through `delegate!` to `*self`.
impl<R> RecordImporter for &R
where
R: RecordImporter,
{
type Error = R::Error;
type Record<'de> = R::Record<'de>;
delegate! {
// forward to the referenced importer
to (*self) {
fn date(&self, record: Self::Record<'_>) -> Date;
fn extract(
&self,
existing: &[Directive],
record: Self::Record<'_>,
) -> Result<Vec<Directive>, Self::Error>;
}
}
}