import_segments_db.go
package imp
import (
	"context"
	"encoding/json"
	"fmt"
	"path/filepath"
	"strings"

	"skraak/datafile"
	"skraak/utils"
)
// dataFileUpdate collects what has to be written back into one .data file
// once its segments have been imported.
type dataFileUpdate struct {
	// DataPath is the .data file to rewrite; WavHash is the value stored in
	// that file's metadata under "skraak_hash".
	DataPath, WavHash string

	// LabelIDs maps segment index -> label index -> database label ID.
	LabelIDs map[int]map[int]string
}
// importLabelResult holds the result of importing a single label.
// When hasError is true only err is meaningful; otherwise the remaining
// fields describe the label that was written.
type importLabelResult struct {
// labelImport is the summary of the imported label.
labelImport LabelImport
// labelID is the database ID generated for the inserted label row.
labelID string
// subtypesImported is 1 when a label_subtype row was inserted for the
// label's calltype, and 0 otherwise.
subtypesImported int
// err describes the failure; it is only populated when hasError is true.
err ImportSegmentError
// hasError reports whether importing the label failed.
hasError bool
}
// resolvedLabelIDs bundles the identifiers needed to insert one label row.
type resolvedLabelIDs struct {
	// speciesID and filterID are the database IDs looked up for the label's
	// species and filter.
	speciesID, filterID string

	// labelID is the freshly generated ID for the label row; dbSpecies is the
	// database species name the .data species was mapped to.
	labelID, dbSpecies string
}
// resolveLabelIDs translates a .data label into the identifiers required to
// insert it: the mapped database species name, the species and filter IDs
// taken from the supplied lookup maps, and a newly generated label ID.
//
// The lookups run in that order and the first one that fails aborts the call
// with an error and a zero resolvedLabelIDs. The label ID is only generated
// once every lookup has succeeded. sf is currently unused.
func resolveLabelIDs(
	label *datafile.Label,
	sf scannedDataFile,
	mapping MappingFile,
	filterIDMap map[string]string,
	speciesIDMap map[string]string,
) (resolvedLabelIDs, error) {
	var none resolvedLabelIDs

	species, found := mapping.GetDBSpecies(label.Species)
	if !found {
		return none, fmt.Errorf("species not found in mapping: %s", label.Species)
	}

	resolved := resolvedLabelIDs{dbSpecies: species}
	if resolved.speciesID, found = speciesIDMap[species]; !found {
		return none, fmt.Errorf("species ID not found: %s", species)
	}
	if resolved.filterID, found = filterIDMap[label.Filter]; !found {
		return none, fmt.Errorf("filter ID not found: %s", label.Filter)
	}

	newID, err := utils.GenerateLongID()
	if err != nil {
		return none, fmt.Errorf("failed to generate label ID: %w", err)
	}
	resolved.labelID = newID

	return resolved, nil
}
// importSingleLabel inserts a single label, plus its optional comment
// metadata and calltype subtype, into the DB.
//
// Steps, in order:
//  1. resolve species, filter and label IDs;
//  2. if the label has a calltype, check that its calltype ID can be resolved;
//  3. insert the label row;
//  4. insert a label_metadata row when the label carries a comment;
//  5. insert a label_subtype row when the label carries a calltype.
//
// Step 2 deliberately runs before any write: a label whose calltype is
// unknown is rejected without leaving a label row behind that the caller
// would report as failed and never record.
//
// On failure the result has hasError set and err filled in. On success it
// carries the label summary, the new label ID and the number of subtype rows
// written (0 or 1). segIdx and labelIdx are currently unused.
func importSingleLabel(
	ctx context.Context,
	m Mutator,
	label *datafile.Label,
	segmentID string,
	segIdx, labelIdx int,
	sf scannedDataFile,
	mapping MappingFile,
	filterIDMap map[string]string,
	speciesIDMap map[string]string,
	calltypeIDMap map[string]map[string]string,
) importLabelResult {
	fileName := filepath.Base(sf.DataPath)
	failure := func(message string) importLabelResult {
		return importLabelResult{
			err:      ImportSegmentError{File: fileName, Stage: StageImport, Message: message},
			hasError: true,
		}
	}

	// Resolve all IDs first.
	ids, err := resolveLabelIDs(label, sf, mapping, filterIDMap, speciesIDMap)
	if err != nil {
		return failure(err.Error())
	}

	// Reject an unresolvable calltype before touching the database. Indexing
	// a missing species yields a nil inner map, whose lookup returns "".
	// The message matches the one importCalltype reports for the same case.
	if label.CallType != "" {
		dbCalltype := mapping.GetDBCalltype(label.Species, label.CallType)
		if calltypeIDMap[ids.dbSpecies][dbCalltype] == "" {
			return failure(fmt.Sprintf("calltype ID not found: %s/%s", ids.dbSpecies, dbCalltype))
		}
	}

	// Insert the label.
	if err := insertLabel(ctx, m, ids, segmentID, label); err != nil {
		return failure(err.Error())
	}

	imported := LabelImport{
		LabelID:   ids.labelID,
		Species:   ids.dbSpecies,
		Filter:    label.Filter,
		Certainty: label.Certainty,
	}

	// Insert label_metadata only when there is a comment to store.
	if label.Comment != "" {
		if err := insertLabelMetadata(ctx, m, ids.labelID, label.Comment); err != nil {
			return failure(err.Error())
		}
		imported.Comment = label.Comment
	}

	result := importLabelResult{labelID: ids.labelID}

	// Insert label_subtype only when there is a calltype.
	if label.CallType != "" {
		if ctErr := importCalltype(ctx, m, ids.labelID, label, ids.dbSpecies, ids.filterID, mapping, calltypeIDMap, sf); ctErr != nil {
			return importLabelResult{err: *ctErr, hasError: true}
		}
		imported.CallType = mapping.GetDBCalltype(label.Species, label.CallType)
		result.subtypesImported = 1
	}

	result.labelImport = imported
	return result
}
// insertLabel writes one row to the label table for the given segment, using
// the resolved label, species and filter IDs and the label's certainty. The
// row is created active with created_at and last_modified set to now().
// A failed insert is returned wrapped.
func insertLabel(ctx context.Context, m Mutator, ids resolvedLabelIDs, segmentID string, label *datafile.Label) error {
	const insertLabelSQL = `
INSERT INTO label (id, segment_id, species_id, filter_id, certainty, created_at, last_modified, active)
VALUES (?, ?, ?, ?, ?, now(), now(), true)
`
	if _, err := m.ExecContext(ctx, insertLabelSQL, ids.labelID, segmentID, ids.speciesID, ids.filterID, label.Certainty); err != nil {
		return fmt.Errorf("failed to insert label: %w", err)
	}
	return nil
}
// insertLabelMetadata inserts a label_metadata row for a comment. The json
// column receives the object {"comment": "<comment>"}.
//
// The comment value is serialized with encoding/json so that quotes,
// backslashes, newlines and other control characters are escaped and the
// stored document is always valid JSON. HTML escaping is turned off so that
// <, > and & are stored as written. Comments without special characters
// produce exactly the same bytes as before.
//
// Returns a wrapped error if the comment cannot be encoded or the insert fails.
func insertLabelMetadata(ctx context.Context, m Mutator, labelID, comment string) error {
	var encoded strings.Builder
	enc := json.NewEncoder(&encoded)
	enc.SetEscapeHTML(false)
	if err := enc.Encode(comment); err != nil {
		return fmt.Errorf("failed to encode label_metadata comment: %w", err)
	}

	// Encode terminates its output with a newline; strip it before embedding
	// the quoted value into the surrounding object.
	metadataJSON := fmt.Sprintf(`{"comment": %s}`, strings.TrimSuffix(encoded.String(), "\n"))

	_, err := m.ExecContext(ctx, `
INSERT INTO label_metadata (label_id, json, created_at, last_modified, active)
VALUES (?, ?, now(), now(), true)
`, labelID, metadataJSON)
	if err != nil {
		return fmt.Errorf("failed to insert label_metadata: %w", err)
	}
	return nil
}
// importCalltype writes a label_subtype row linking labelID to the calltype
// of the given label.
//
// The label's calltype is mapped to its database name, then looked up under
// dbSpecies in calltypeIDMap. An unknown species and an unknown calltype are
// reported identically. A nil return means the row was inserted; otherwise
// the returned error describes which step failed (ID lookup, ID generation
// or the insert itself).
func importCalltype(
	ctx context.Context,
	m Mutator,
	labelID string,
	label *datafile.Label,
	dbSpecies string,
	filterID string,
	mapping MappingFile,
	calltypeIDMap map[string]map[string]string,
	sf scannedDataFile,
) *ImportSegmentError {
	fail := func(message string) *ImportSegmentError {
		return &ImportSegmentError{
			File:    filepath.Base(sf.DataPath),
			Stage:   StageImport,
			Message: message,
		}
	}

	dbCalltype := mapping.GetDBCalltype(label.Species, label.CallType)

	// Reading from a missing (nil) inner map yields "", so no separate
	// existence check for the species is needed.
	calltypeID := calltypeIDMap[dbSpecies][dbCalltype]
	if calltypeID == "" {
		return fail(fmt.Sprintf("calltype ID not found: %s/%s", dbSpecies, dbCalltype))
	}

	subtypeID, genErr := utils.GenerateLongID()
	if genErr != nil {
		return fail(fmt.Sprintf("failed to generate label_subtype ID: %v", genErr))
	}

	const insertSubtypeSQL = `
INSERT INTO label_subtype (id, label_id, calltype_id, filter_id, certainty, created_at, last_modified, active)
VALUES (?, ?, ?, ?, ?, now(), now(), true)
`
	if _, execErr := m.ExecContext(ctx, insertSubtypeSQL, subtypeID, labelID, calltypeID, filterID, label.Certainty); execErr != nil {
		return fail(fmt.Sprintf("failed to insert label_subtype: %v", execErr))
	}
	return nil
}
// importSegmentsIntoDB imports the segments of every file in fileIDMap using
// the provided mutator.
//
// Files without a FileID are skipped. For each remaining file the optional
// progressHandler is invoked with the running count, len(fileIDMap) and the
// .data file's base name, after which every segment is imported via
// importSegment. A segment that was inserted but ended up with no imported
// labels is deleted again. Segments that were never inserted (rejected by
// validation, or whose insert failed) have no ID and therefore trigger no
// DELETE.
//
// Returns, in order: the imported segments, the number of imported labels,
// the number of imported subtypes, one dataFileUpdate per processed file, and
// all errors collected along the way. Files are visited in map iteration
// order, so the order of the returned slices is not stable between runs.
// scannedFiles is currently unused.
func importSegmentsIntoDB(
	ctx context.Context,
	m Mutator,
	fileIDMap map[string]scannedDataFile,
	scannedFiles []scannedDataFile,
	mapping MappingFile,
	filterIDMap map[string]string,
	speciesIDMap map[string]string,
	calltypeIDMap map[string]map[string]string,
	datasetID string,
	progressHandler func(processed, total int, message string),
) ([]SegmentImport, int, int, []dataFileUpdate, []ImportSegmentError) {
	var (
		importedSegments []SegmentImport
		importErrs       []ImportSegmentError
		fileUpdates      []dataFileUpdate
		importedLabels   int
		importedSubtypes int
	)

	totalFiles := len(fileIDMap)
	processedFiles := 0

	for _, sf := range fileIDMap {
		if sf.FileID == "" {
			continue
		}

		processedFiles++
		if progressHandler != nil {
			progressHandler(processedFiles, totalFiles, filepath.Base(sf.DataPath))
		}

		fileUpdate := dataFileUpdate{
			DataPath: sf.DataPath,
			WavHash:  sf.WavHash,
			LabelIDs: make(map[int]map[int]string),
		}

		for segIdx, seg := range sf.Segments {
			segImp, labelIDs, subtypes, segErrs := importSegment(ctx, m, seg, segIdx, sf, datasetID, mapping, filterIDMap, speciesIDMap, calltypeIDMap)
			importErrs = append(importErrs, segErrs...)
			importedSubtypes += subtypes

			if len(segImp.Labels) > 0 {
				importedSegments = append(importedSegments, segImp)
				importedLabels += len(labelIDs)
				fileUpdate.LabelIDs[segIdx] = labelIDs
				continue
			}

			// An empty SegmentID means importSegment bailed out before (or
			// while) inserting the segment row, so there is nothing to
			// clean up and a DELETE would only be a wasted round-trip.
			if segImp.SegmentID == "" {
				continue
			}

			// Delete orphaned segment (no labels succeeded).
			if _, err := m.ExecContext(ctx, `DELETE FROM segment WHERE id = ?`, segImp.SegmentID); err != nil {
				importErrs = append(importErrs, ImportSegmentError{
					File:    filepath.Base(sf.DataPath),
					Stage:   StageImport,
					Message: fmt.Sprintf("failed to delete orphaned segment: %v", err),
				})
			}
		}

		fileUpdates = append(fileUpdates, fileUpdate)
	}

	return importedSegments, importedLabels, importedSubtypes, fileUpdates, importErrs
}
// importSegment validates one segment, inserts it, and then imports each of
// its labels.
//
// The segment is rejected, with nothing written, when its start time is not
// strictly before its end time or when its end time exceeds the file's
// duration. If validation, ID generation or the segment insert fails, the
// function returns a zero SegmentImport, a nil label map, 0 and a single
// error.
//
// Otherwise it returns the segment summary (whose Labels slice is non-nil but
// may be empty), a map from label index to generated label ID for the labels
// that were imported, the number of subtype rows written, and the errors of
// the labels that failed.
func importSegment(
	ctx context.Context,
	m Mutator,
	seg *datafile.Segment,
	segIdx int,
	sf scannedDataFile,
	datasetID string,
	mapping MappingFile,
	filterIDMap map[string]string,
	speciesIDMap map[string]string,
	calltypeIDMap map[string]map[string]string,
) (SegmentImport, map[int]string, int, []ImportSegmentError) {
	// abort builds the return values for a segment that could not be created.
	abort := func(message string) (SegmentImport, map[int]string, int, []ImportSegmentError) {
		return SegmentImport{}, nil, 0, []ImportSegmentError{{
			File:    filepath.Base(sf.DataPath),
			Stage:   StageImport,
			Message: message,
		}}
	}

	switch {
	case seg.StartTime >= seg.EndTime:
		return abort(fmt.Sprintf("invalid segment bounds: start=%.2f >= end=%.2f", seg.StartTime, seg.EndTime))
	case seg.EndTime > sf.Duration:
		return abort(fmt.Sprintf("segment end time (%.2f) exceeds file duration (%.2f)", seg.EndTime, sf.Duration))
	}

	segmentID, err := utils.GenerateLongID()
	if err != nil {
		return abort(fmt.Sprintf("failed to generate segment ID: %v", err))
	}

	const insertSegmentSQL = `
INSERT INTO segment (id, file_id, dataset_id, start_time, end_time, freq_low, freq_high, created_at, last_modified, active)
VALUES (?, ?, ?, ?, ?, ?, ?, now(), now(), true)
`
	if _, err := m.ExecContext(ctx, insertSegmentSQL, segmentID, sf.FileID, datasetID, seg.StartTime, seg.EndTime, seg.FreqLow, seg.FreqHigh); err != nil {
		return abort(fmt.Sprintf("failed to insert segment: %v", err))
	}

	imported := SegmentImport{
		SegmentID: segmentID,
		FileName:  filepath.Base(sf.WavPath),
		StartTime: seg.StartTime,
		EndTime:   seg.EndTime,
		FreqLow:   seg.FreqLow,
		FreqHigh:  seg.FreqHigh,
		Labels:    []LabelImport{},
	}

	var (
		labelErrs    []ImportSegmentError
		subtypeCount int
	)
	labelIDs := map[int]string{}

	// A failing label is recorded and skipped; the remaining labels are
	// still attempted.
	for labelIdx, label := range seg.Labels {
		res := importSingleLabel(ctx, m, label, segmentID, segIdx, labelIdx, sf, mapping, filterIDMap, speciesIDMap, calltypeIDMap)
		if res.hasError {
			labelErrs = append(labelErrs, res.err)
		} else {
			labelIDs[labelIdx] = res.labelID
			imported.Labels = append(imported.Labels, res.labelImport)
			subtypeCount += res.subtypesImported
		}
	}

	return imported, labelIDs, subtypeCount, labelErrs
}
// writeIDsToDataFiles persists import results back into the .data files.
//
// Each file named in fileUpdates is re-parsed, its metadata gains a
// "skraak_hash" entry holding the update's WavHash, every recorded label gains
// a "skraak_label_id" entry holding its database ID, and the file is written
// back to the same path. Segment or label indexes that fall outside the
// re-parsed file are ignored. A file that cannot be parsed or written is
// reported in the returned slice and does not stop the remaining files from
// being processed.
func writeIDsToDataFiles(fileUpdates []dataFileUpdate) []ImportSegmentError {
	var failures []ImportSegmentError
	record := func(path, message string) {
		failures = append(failures, ImportSegmentError{
			File:    filepath.Base(path),
			Stage:   StageImport,
			Message: message,
		})
	}

	for _, update := range fileUpdates {
		df, parseErr := datafile.ParseDataFile(update.DataPath)
		if parseErr != nil {
			record(update.DataPath, fmt.Sprintf("failed to re-parse .data file for writing: %v", parseErr))
			continue
		}

		// Stamp the file-level hash.
		if df.Meta.Extra == nil {
			df.Meta.Extra = make(map[string]any)
		}
		df.Meta.Extra["skraak_hash"] = update.WavHash

		// Stamp every imported label with its database ID.
		for segIdx, idsByLabel := range update.LabelIDs {
			if segIdx >= len(df.Segments) {
				continue
			}
			segment := df.Segments[segIdx]

			for labelIdx, id := range idsByLabel {
				if labelIdx >= len(segment.Labels) {
					continue
				}
				target := segment.Labels[labelIdx]
				if target.Extra == nil {
					target.Extra = make(map[string]any)
				}
				target.Extra["skraak_label_id"] = id
			}
		}

		if writeErr := df.Write(update.DataPath); writeErr != nil {
			record(update.DataPath, fmt.Sprintf("failed to write updated .data file: %v", writeErr))
		}
	}

	return failures
}