RDJ6UCAG2OHWUKWQUNMDPC4ZJRIO53BATIM5AE5FOKMHWZSGQ4XAC LHZQOX64RAXIJIHFJ5RN5TFJ4VFRRM42W2ZT7MRCNQ2AJVA4BSSAC 3DVPQOKB6BX63XSBIYYCPWBL2RBG3LXZS3XPQBANJP2FWVRAOVZQC NQPVZ3PPQG6EPTTAEHXOXXGK27HZCISHZCOZU6K6RKWTRTOHMY6QC TSOJUMHVLPASHBAVCTUK6WSGZOSBDZIC47FYILGQ2QAU7Z4BUZMAC V2HX6HEB2OBNI4IMWD5XJN3RKAZYHAFJAJAPFP3BFYFZVZVEYN6AC ZCCQ4P5T2AMJAPBDWZVHXIUKLI5U2E5GNDXRCWXEOJQRWPSJJFEQC package impimport ("context""database/sql""fmt""os""path/filepath""strings""skraak/datafile""skraak/db""skraak/utils")// validateAndPrepareSegments performs phases B+C: parse data files, validate DB state, and prepare ID maps.func validateAndPrepareSegments(database *sql.DB,input ImportSegmentsInput,mapping MappingFile,dataFiles []string,) (*segmentValidation, []ImportSegmentError, error) {// Phase B: Parse all .data files and collect unique valuesscannedFiles, parseErrors, uniqueFilters, uniqueSpecies, uniqueCalltypes := scanAllDataFiles(dataFiles, input.Folder)if len(scannedFiles) == 0 {return nil, parseErrors, nil}// Validate dataset/location/cluster hierarchyif err := validateSegmentHierarchy(database, input.DatasetID, input.LocationID, input.ClusterID); err != nil {return nil, parseErrors, err}// Validate all filters existfilterIDMap, err := validateFiltersExist(database, uniqueFilters)if err != nil {return nil, parseErrors, fmt.Errorf("filter validation failed: %w", err)}// Validate mapping covers all species/calltypes and they exist in DBvalidationResult, err := ValidateMappingAgainstDB(database, mapping, uniqueSpecies, uniqueCalltypes)if err != nil {return nil, parseErrors, fmt.Errorf("mapping validation failed: %w", err)}if validationResult.HasErrors() {return nil, parseErrors, fmt.Errorf("mapping validation failed: %s", validationResult.Error())}// Load species and calltype ID mapsspeciesIDMap, calltypeIDMap, err := loadSpeciesCalltypeIDs(database, mapping, uniqueSpecies, uniqueCalltypes)if err != nil {return nil, parseErrors, fmt.Errorf("failed to load species/calltype IDs: %w", 
err)}// Validate files: hash exists, linked to dataset, no existing labelsfileIDMap, hashErrors := validateAndMapFiles(database, scannedFiles, input.ClusterID, input.DatasetID)allErrors := append(parseErrors, hashErrors...)return &segmentValidation{scannedFiles: scannedFiles,filterIDMap: filterIDMap,speciesIDMap: speciesIDMap,calltypeIDMap: calltypeIDMap,fileIDMap: fileIDMap,}, allErrors, nil}// validateSegmentImportInput validates input parametersfunc validateSegmentImportInput(input ImportSegmentsInput) error {// Validate folder existsif info, err := os.Stat(input.Folder); err != nil {return fmt.Errorf("folder does not exist: %s", input.Folder)} else if !info.IsDir() {return fmt.Errorf("path is not a folder: %s", input.Folder)}// Validate mapping file existsif _, err := os.Stat(input.Mapping); err != nil {return fmt.Errorf("mapping file does not exist: %s", input.Mapping)}// Validate IDsif err := utils.ValidateShortID(input.DatasetID, "dataset_id"); err != nil {return err}if err := utils.ValidateShortID(input.LocationID, "location_id"); err != nil {return err}if err := utils.ValidateShortID(input.ClusterID, "cluster_id"); err != nil {return err}return nil}// validateSegmentHierarchy validates dataset/location/cluster relationshipsfunc validateSegmentHierarchy(q db.Querier, datasetID, locationID, clusterID string) error {if err := db.ValidateDatasetTypeForImport(q, datasetID); err != nil {return err}if err := db.ValidateLocationBelongsToDataset(q, locationID, datasetID); err != nil {return err}if err := db.ClusterBelongsToLocation(q, clusterID, locationID); err != nil {return err}return nil}// scanAllDataFiles parses all .data files and collects unique valuesfunc scanAllDataFiles(dataFiles []string, folder string) ([]scannedDataFile,[]ImportSegmentError,map[string]bool,map[string]bool,map[string]map[string]bool,) {var scanned []scannedDataFilevar errors []ImportSegmentErroruniqueFilters := make(map[string]bool)uniqueSpecies := make(map[string]bool)uniqueCalltypes 
:= make(map[string]map[string]bool) // species -> calltype -> truefor _, dataPath := range dataFiles {// Find corresponding WAV filewavPath := strings.TrimSuffix(dataPath, ".data")if _, err := os.Stat(wavPath); err != nil {errors = append(errors, ImportSegmentError{File: filepath.Base(dataPath),Stage: StageValidation,Message: fmt.Sprintf("corresponding WAV file not found: %s", filepath.Base(wavPath)),})continue}// Parse .data filedf, err := datafile.ParseDataFile(dataPath)if err != nil {errors = append(errors, ImportSegmentError{File: filepath.Base(dataPath),Stage: StageValidation,Message: fmt.Sprintf("failed to parse .data file: %v", err),})continue}// Collect unique filters, species, calltypesfor _, seg := range df.Segments {for _, label := range seg.Labels {uniqueFilters[label.Filter] = trueuniqueSpecies[label.Species] = trueif label.CallType != "" {if uniqueCalltypes[label.Species] == nil {uniqueCalltypes[label.Species] = make(map[string]bool)}uniqueCalltypes[label.Species][label.CallType] = true}}}scanned = append(scanned, scannedDataFile{DataPath: dataPath,WavPath: wavPath,Duration: df.Meta.Duration,Segments: df.Segments,})}return scanned, errors, uniqueFilters, uniqueSpecies, uniqueCalltypes}// validateFiltersExist checks all filters exist in DB and returns ID mapfunc validateFiltersExist(q db.Querier, filterNames map[string]bool) (map[string]string, error) {filterIDMap := make(map[string]string)if len(filterNames) == 0 {return filterIDMap, nil}names := make([]string, 0, len(filterNames))for name := range filterNames {names = append(names, name)}query := `SELECT id, name FROM filter WHERE name IN (` + db.Placeholders(len(names)) + `) AND active = true`args := make([]any, len(names))for i, name := range names {args[i] = name}rows, err := q.QueryContext(context.Background(), query, args...)if err != nil {return nil, fmt.Errorf("failed to query filters: %w", err)}defer rows.Close()for rows.Next() {var id, name stringif err := rows.Scan(&id, &name); err == nil 
{filterIDMap[name] = id}}// Check for missing filtersvar missing []stringfor name := range filterNames {if _, exists := filterIDMap[name]; !exists {missing = append(missing, name)}}if len(missing) > 0 {return nil, fmt.Errorf("filters not found in database: [%s]", strings.Join(missing, ", "))}return filterIDMap, nil}// loadSpeciesCalltypeIDs loads species and calltype ID mapsfunc loadSpeciesCalltypeIDs(q db.Querier,mapping MappingFile,uniqueSpecies map[string]bool,uniqueCalltypes map[string]map[string]bool,) (map[string]string, map[string]map[string]string, error) {speciesIDMap, err := loadSpeciesIDs(q, mapping, uniqueSpecies)if err != nil {return nil, nil, err}calltypeIDMap, err := loadCalltypeIDs(q, mapping, uniqueCalltypes)if err != nil {return nil, nil, err}return speciesIDMap, calltypeIDMap, nil}// loadSpeciesIDs queries the DB for species IDs matching the mapped species labels.func loadSpeciesIDs(q db.Querier, mapping MappingFile, uniqueSpecies map[string]bool) (map[string]string, error) {speciesIDMap := make(map[string]string)dbSpeciesSet := make(map[string]bool)for dataSpecies := range uniqueSpecies {if dbSpecies, ok := mapping.GetDBSpecies(dataSpecies); ok {dbSpeciesSet[dbSpecies] = true}}if len(dbSpeciesSet) == 0 {return speciesIDMap, nil}dbSpeciesList := make([]string, 0, len(dbSpeciesSet))for s := range dbSpeciesSet {dbSpeciesList = append(dbSpeciesList, s)}query := `SELECT id, label FROM species WHERE label IN (` + db.Placeholders(len(dbSpeciesList)) + `) AND active = true`args := make([]any, len(dbSpeciesList))for i, s := range dbSpeciesList {args[i] = s}rows, err := q.QueryContext(context.Background(), query, args...)if err != nil {return nil, fmt.Errorf("failed to query species: %w", err)}defer rows.Close()for rows.Next() {var id, label stringif err := rows.Scan(&id, &label); err == nil {speciesIDMap[label] = id}}return speciesIDMap, nil}// loadCalltypeIDs queries the DB for calltype IDs matching the mapped calltype labels.func loadCalltypeIDs(q 
db.Querier, mapping MappingFile, uniqueCalltypes map[string]map[string]bool) (map[string]map[string]string, error) {calltypeIDMap := make(map[string]map[string]string)for dataSpecies, ctSet := range uniqueCalltypes {dbSpecies, ok := mapping.GetDBSpecies(dataSpecies)if !ok {continue}if calltypeIDMap[dbSpecies] == nil {calltypeIDMap[dbSpecies] = make(map[string]string)}for dataCalltype := range ctSet {dbCalltype := mapping.GetDBCalltype(dataSpecies, dataCalltype)var calltypeID stringerr := q.QueryRowContext(context.Background(), `SELECT ct.idFROM call_type ctJOIN species s ON ct.species_id = s.idWHERE s.label = ? AND ct.label = ? AND ct.active = true`, dbSpecies, dbCalltype).Scan(&calltypeID)if err == nil {calltypeIDMap[dbSpecies][dbCalltype] = calltypeID}}}return calltypeIDMap, nil}// validateAndMapFiles validates files exist by hash, are linked to dataset, and have no existing labelsfunc validateAndMapFiles(q db.Querier,scannedFiles []scannedDataFile,clusterID string,datasetID string,) (map[string]scannedDataFile, []ImportSegmentError) {fileIDMap := make(map[string]scannedDataFile)var errors []ImportSegmentErrorfor _, sf := range scannedFiles {// Compute hashhash, err := utils.ComputeXXH64(sf.WavPath)if err != nil {errors = append(errors, ImportSegmentError{File: filepath.Base(sf.WavPath),Stage: StageHash,Message: fmt.Sprintf("failed to compute hash: %v", err),})continue}sf.WavHash = hash// Find file by hash in clustervar fileID stringvar duration float64err = q.QueryRowContext(context.Background(), `SELECT id, duration FROM file WHERE xxh64_hash = ? AND cluster_id = ? 
AND active = true`, hash, clusterID).Scan(&fileID, &duration)if err == sql.ErrNoRows {errors = append(errors, ImportSegmentError{File: filepath.Base(sf.WavPath),Stage: StageValidation,Message: fmt.Sprintf("file hash not found in database for cluster (hash: %s)", hash),})continue}if err != nil {errors = append(errors, ImportSegmentError{File: filepath.Base(sf.WavPath),Stage: StageValidation,Message: fmt.Sprintf("failed to query file: %v", err),})continue}sf.FileID = fileIDsf.Duration = duration// Verify file is linked to dataset via file_dataset junction table (composite FK)var fileLinkedToDataset boolerr = q.QueryRowContext(context.Background(), `SELECT EXISTS(SELECT 1 FROM file_dataset WHERE file_id = ? AND dataset_id = ?)`, fileID, datasetID).Scan(&fileLinkedToDataset)if err != nil {errors = append(errors, ImportSegmentError{File: filepath.Base(sf.WavPath),Stage: StageValidation,Message: fmt.Sprintf("failed to verify file-dataset link: %v", err),})continue}if !fileLinkedToDataset {errors = append(errors, ImportSegmentError{File: filepath.Base(sf.WavPath),Stage: StageValidation,Message: fmt.Sprintf("file exists in cluster but is not linked to dataset %s", datasetID),})continue}// Check no existing labels for this filevar labelCount interr = q.QueryRowContext(context.Background(), `SELECT COUNT(*) FROM label lJOIN segment s ON l.segment_id = s.idWHERE s.file_id = ? 
AND l.active = true`, fileID).Scan(&labelCount)if err != nil {errors = append(errors, ImportSegmentError{File: filepath.Base(sf.WavPath),Stage: StageValidation,Message: fmt.Sprintf("failed to check existing labels: %v", err),})continue}if labelCount > 0 {errors = append(errors, ImportSegmentError{File: filepath.Base(sf.WavPath),Stage: StageValidation,Message: fmt.Sprintf("file already has %d label(s) - fresh imports only", labelCount),})continue}fileIDMap[fileID] = sf}return fileIDMap, errors}// countTotalSegments counts total segments from validated filesfunc countTotalSegments(fileIDMap map[string]scannedDataFile) int {count := 0for _, sf := range fileIDMap {count += len(sf.Segments)}return count}
package imp

import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
	"path/filepath"

	"skraak/datafile"
	"skraak/db"
	"skraak/utils"
)

// dataFileUpdate holds data to write back to .data file after import.
type dataFileUpdate struct {
	DataPath string
	WavHash  string
	LabelIDs map[int]map[int]string // segmentIndex -> labelIndex -> labelID
}

// importLabelResult holds the result of importing a single label.
// When hasError is true only err is meaningful.
type importLabelResult struct {
	labelImport      LabelImport
	labelID          string
	subtypesImported int
	err              ImportSegmentError
	hasError         bool
}

// resolvedLabelIDs holds the resolved database IDs for a label.
type resolvedLabelIDs struct {
	speciesID string
	filterID  string
	labelID   string
	dbSpecies string
}

// resolveLabelIDs looks up species and filter IDs, generates a label ID.
// Returns an error if any lookup fails. The sf parameter is not used.
func resolveLabelIDs(
	label *datafile.Label,
	sf scannedDataFile,
	mapping MappingFile,
	filterIDMap map[string]string,
	speciesIDMap map[string]string,
) (resolvedLabelIDs, error) {
	dbSpecies, ok := mapping.GetDBSpecies(label.Species)
	if !ok {
		return resolvedLabelIDs{}, fmt.Errorf("species not found in mapping: %s", label.Species)
	}

	speciesID, ok := speciesIDMap[dbSpecies]
	if !ok {
		return resolvedLabelIDs{}, fmt.Errorf("species ID not found: %s", dbSpecies)
	}

	filterID, ok := filterIDMap[label.Filter]
	if !ok {
		return resolvedLabelIDs{}, fmt.Errorf("filter ID not found: %s", label.Filter)
	}

	labelID, err := utils.GenerateLongID()
	if err != nil {
		return resolvedLabelIDs{}, fmt.Errorf("failed to generate label ID: %w", err)
	}

	return resolvedLabelIDs{
		speciesID: speciesID,
		filterID:  filterID,
		labelID:   labelID,
		dbSpecies: dbSpecies,
	}, nil
}

// importSingleLabel inserts a single label and its metadata/subtype into the DB.
//
// It inserts the label row, then a label_metadata row when the label has a
// comment, then a label_subtype row when it has a calltype. The first failure
// is returned as a result with hasError set. The segIdx and labelIdx
// parameters are not used here.
func importSingleLabel(
	ctx context.Context,
	tx *db.LoggedTx,
	label *datafile.Label,
	segmentID string,
	segIdx, labelIdx int,
	sf scannedDataFile,
	mapping MappingFile,
	filterIDMap map[string]string,
	speciesIDMap map[string]string,
	calltypeIDMap map[string]map[string]string,
) importLabelResult {
	// failure wraps an error as an import-stage result for this data file.
	failure := func(err error) importLabelResult {
		return importLabelResult{
			err: ImportSegmentError{
				File:    filepath.Base(sf.DataPath),
				Stage:   StageImport,
				Message: err.Error(),
			},
			hasError: true,
		}
	}

	// Resolve all IDs first.
	ids, err := resolveLabelIDs(label, sf, mapping, filterIDMap, speciesIDMap)
	if err != nil {
		return failure(err)
	}

	// Insert the label.
	if err := insertLabel(ctx, tx, ids, segmentID, label); err != nil {
		return failure(err)
	}

	// Insert label_metadata if comment exists.
	if label.Comment != "" {
		if err := insertLabelMetadata(ctx, tx, ids.labelID, label.Comment); err != nil {
			return failure(err)
		}
	}

	labelImport := LabelImport{
		LabelID:   ids.labelID,
		Species:   ids.dbSpecies,
		Filter:    label.Filter,
		Certainty: label.Certainty,
		Comment:   label.Comment,
	}

	// Insert label_subtype if calltype exists.
	if label.CallType == "" {
		return importLabelResult{labelImport: labelImport, labelID: ids.labelID}
	}
	if ctErr := importCalltype(ctx, tx, ids.labelID, label, ids.dbSpecies, ids.filterID, mapping, calltypeIDMap, sf); ctErr != nil {
		return importLabelResult{err: *ctErr, hasError: true}
	}
	labelImport.CallType = mapping.GetDBCalltype(label.Species, label.CallType)
	return importLabelResult{labelImport: labelImport, labelID: ids.labelID, subtypesImported: 1}
}

// insertLabel inserts a label row into the database.
func insertLabel(ctx context.Context, tx *db.LoggedTx, ids resolvedLabelIDs, segmentID string, label *datafile.Label) error {
	_, err := tx.ExecContext(ctx, `
		INSERT INTO label (id, segment_id, species_id, filter_id, certainty, created_at, last_modified, active)
		VALUES (?, ?, ?, ?, ?, now(), now(), true)
	`, ids.labelID, segmentID, ids.speciesID, ids.filterID, label.Certainty)
	if err != nil {
		return fmt.Errorf("failed to insert label: %w", err)
	}
	return nil
}

// insertLabelMetadata inserts a label_metadata row for a comment.
//
// The comment is stored as the JSON object {"comment": "<text>"}. The text is
// encoded with encoding/json so quotes, backslashes, newlines, and control
// characters are escaped correctly and the stored value is always valid JSON.
func insertLabelMetadata(ctx context.Context, tx *db.LoggedTx, labelID, comment string) error {
	encodedComment, err := json.Marshal(comment)
	if err != nil {
		return fmt.Errorf("failed to encode label_metadata comment: %w", err)
	}
	// encodedComment already includes the surrounding double quotes.
	metadataJSON := fmt.Sprintf(`{"comment": %s}`, encodedComment)

	_, err = tx.ExecContext(ctx, `
		INSERT INTO label_metadata (label_id, json, created_at, last_modified, active)
		VALUES (?, ?, now(), now(), true)
	`, labelID, metadataJSON)
	if err != nil {
		return fmt.Errorf("failed to insert label_metadata: %w", err)
	}
	return nil
}

// importCalltype inserts a label_subtype row for a calltype label.
//
// It returns nil on success, or an import-stage error when the mapped calltype
// has no ID in calltypeIDMap, when ID generation fails, or when the insert fails.
func importCalltype(
	ctx context.Context,
	tx *db.LoggedTx,
	labelID string,
	label *datafile.Label,
	dbSpecies string,
	filterID string,
	mapping MappingFile,
	calltypeIDMap map[string]map[string]string,
	sf scannedDataFile,
) *ImportSegmentError {
	// importErr builds an import-stage error for this data file.
	importErr := func(message string) *ImportSegmentError {
		return &ImportSegmentError{
			File:    filepath.Base(sf.DataPath),
			Stage:   StageImport,
			Message: message,
		}
	}

	dbCalltype := mapping.GetDBCalltype(label.Species, label.CallType)

	// Indexing a nil inner map yields "", so no explicit nil check is needed.
	calltypeID := calltypeIDMap[dbSpecies][dbCalltype]
	if calltypeID == "" {
		return importErr(fmt.Sprintf("calltype ID not found: %s/%s", dbSpecies, dbCalltype))
	}

	subtypeID, err := utils.GenerateLongID()
	if err != nil {
		return importErr(fmt.Sprintf("failed to generate label_subtype ID: %v", err))
	}

	_, err = tx.ExecContext(ctx, `
		INSERT INTO label_subtype (id, label_id, calltype_id, filter_id, certainty, created_at, last_modified, active)
		VALUES (?, ?, ?, ?, ?, now(), now(), true)
	`, subtypeID, labelID, calltypeID, filterID, label.Certainty)
	if err != nil {
		return importErr(fmt.Sprintf("failed to insert label_subtype: %v", err))
	}

	return nil
}

// importSegmentsIntoDB performs the transactional import.
//
// All segments of all files in fileIDMap are written inside one logged
// transaction. It returns the imported segments, the number of imported labels
// and subtypes, the per-file updates to write back to the .data files, and the
// collected errors. If the transaction cannot be started or committed, only
// the errors are returned. The scannedFiles parameter is not used here.
func importSegmentsIntoDB(
	ctx context.Context,
	database *sql.DB,
	fileIDMap map[string]scannedDataFile,
	scannedFiles []scannedDataFile,
	mapping MappingFile,
	filterIDMap map[string]string,
	speciesIDMap map[string]string,
	calltypeIDMap map[string]map[string]string,
	datasetID string,
	progressHandler func(processed, total int, message string),
) ([]SegmentImport, int, int, []dataFileUpdate, []ImportSegmentError) {
	var importedSegments []SegmentImport
	var errs []ImportSegmentError
	importedLabels := 0
	importedSubtypes := 0
	var fileUpdates []dataFileUpdate

	tx, err := db.BeginLoggedTx(ctx, database, "import_segments")
	if err != nil {
		errs = append(errs, ImportSegmentError{
			Stage:   StageImport,
			Message: fmt.Sprintf("failed to begin transaction: %v", err),
		})
		return nil, 0, 0, nil, errs
	}
	// Covers every early exit; presumably a no-op once Commit has succeeded —
	// confirm against db.LoggedTx.
	defer tx.Rollback()

	totalFiles := len(fileIDMap)
	processedFiles := 0

	for _, sf := range fileIDMap {
		if sf.FileID == "" {
			continue
		}

		processedFiles++
		if progressHandler != nil {
			progressHandler(processedFiles, totalFiles, filepath.Base(sf.DataPath))
		}

		fileUpdate := dataFileUpdate{
			DataPath: sf.DataPath,
			WavHash:  sf.WavHash,
			LabelIDs: make(map[int]map[int]string),
		}

		for segIdx, seg := range sf.Segments {
			segImp, labelIDs, subtypes, segErrs := importSegment(ctx, tx, seg, segIdx, sf, datasetID, mapping, filterIDMap, speciesIDMap, calltypeIDMap)
			errs = append(errs, segErrs...)
			importedSubtypes += subtypes

			if segImp.SegmentID == "" {
				// importSegment failed before inserting a row, so there is
				// nothing to clean up.
				continue
			}

			if len(segImp.Labels) == 0 {
				// Delete orphaned segment (no labels succeeded).
				if _, err := tx.ExecContext(ctx, `DELETE FROM segment WHERE id = ?`, segImp.SegmentID); err != nil {
					errs = append(errs, ImportSegmentError{
						File:    filepath.Base(sf.DataPath),
						Stage:   StageImport,
						Message: fmt.Sprintf("failed to delete orphaned segment: %v", err),
					})
				}
				continue
			}

			importedSegments = append(importedSegments, segImp)
			importedLabels += len(labelIDs)
			fileUpdate.LabelIDs[segIdx] = labelIDs
		}

		fileUpdates = append(fileUpdates, fileUpdate)
	}

	if err := tx.Commit(); err != nil {
		errs = append(errs, ImportSegmentError{
			Stage:   StageImport,
			Message: fmt.Sprintf("failed to commit transaction: %v", err),
		})
		return nil, 0, 0, nil, errs
	}

	return importedSegments, importedLabels, importedSubtypes, fileUpdates, errs
}

// importSegment inserts a single segment and its labels into the DB.
//
// The segment is rejected before any insert when its start is not before its
// end or when its end exceeds the file duration; in those cases, and when ID
// generation or the insert fails, a zero SegmentImport is returned with the
// error. Otherwise it returns the segment summary, the label IDs keyed by
// label index, the number of subtypes imported, and any per-label errors.
func importSegment(
	ctx context.Context,
	tx *db.LoggedTx,
	seg *datafile.Segment,
	segIdx int,
	sf scannedDataFile,
	datasetID string,
	mapping MappingFile,
	filterIDMap map[string]string,
	speciesIDMap map[string]string,
	calltypeIDMap map[string]map[string]string,
) (SegmentImport, map[int]string, int, []ImportSegmentError) {
	// reject returns the early-exit result carrying a single import error.
	reject := func(message string) (SegmentImport, map[int]string, int, []ImportSegmentError) {
		return SegmentImport{}, nil, 0, []ImportSegmentError{{
			File:    filepath.Base(sf.DataPath),
			Stage:   StageImport,
			Message: message,
		}}
	}

	if seg.StartTime >= seg.EndTime {
		return reject(fmt.Sprintf("invalid segment bounds: start=%.2f >= end=%.2f", seg.StartTime, seg.EndTime))
	}
	if seg.EndTime > sf.Duration {
		return reject(fmt.Sprintf("segment end time (%.2f) exceeds file duration (%.2f)", seg.EndTime, sf.Duration))
	}

	segmentID, err := utils.GenerateLongID()
	if err != nil {
		return reject(fmt.Sprintf("failed to generate segment ID: %v", err))
	}

	_, err = tx.ExecContext(ctx, `
		INSERT INTO segment (id, file_id, dataset_id, start_time, end_time, freq_low, freq_high, created_at, last_modified, active)
		VALUES (?, ?, ?, ?, ?, ?, ?, now(), now(), true)
	`, segmentID, sf.FileID, datasetID, seg.StartTime, seg.EndTime, seg.FreqLow, seg.FreqHigh)
	if err != nil {
		return reject(fmt.Sprintf("failed to insert segment: %v", err))
	}

	segImport := SegmentImport{
		SegmentID: segmentID,
		FileName:  filepath.Base(sf.WavPath),
		StartTime: seg.StartTime,
		EndTime:   seg.EndTime,
		FreqLow:   seg.FreqLow,
		FreqHigh:  seg.FreqHigh,
		Labels:    make([]LabelImport, 0),
	}

	var errs []ImportSegmentError
	labelIDs := make(map[int]string)
	var subtypesImported int

	for labelIdx, label := range seg.Labels {
		result := importSingleLabel(ctx, tx, label, segmentID, segIdx, labelIdx, sf, mapping, filterIDMap, speciesIDMap, calltypeIDMap)
		if result.hasError {
			errs = append(errs, result.err)
			continue
		}
		labelIDs[labelIdx] = result.labelID
		segImport.Labels = append(segImport.Labels, result.labelImport)
		subtypesImported += result.subtypesImported
	}

	return segImport, labelIDs, subtypesImported, errs
}

// writeIDsToDataFiles writes skraak_hash and skraak_label_ids back to .data files.
//
// Each file is re-parsed, updated in memory, and written back to the same
// path. Segment or label indices that no longer exist in the re-parsed file
// are skipped. Parse and write failures are collected and returned.
func writeIDsToDataFiles(fileUpdates []dataFileUpdate) []ImportSegmentError {
	var errs []ImportSegmentError

	for _, fu := range fileUpdates {
		// Parse the .data file.
		df, err := datafile.ParseDataFile(fu.DataPath)
		if err != nil {
			errs = append(errs, ImportSegmentError{
				File:    filepath.Base(fu.DataPath),
				Stage:   StageImport,
				Message: fmt.Sprintf("failed to re-parse .data file for writing: %v", err),
			})
			continue
		}

		// Write skraak_hash to metadata.
		if df.Meta.Extra == nil {
			df.Meta.Extra = make(map[string]any)
		}
		df.Meta.Extra["skraak_hash"] = fu.WavHash

		// Write skraak_label_id to each label.
		for segIdx, labelIDs := range fu.LabelIDs {
			if segIdx >= len(df.Segments) {
				continue
			}
			seg := df.Segments[segIdx]
			for labelIdx, labelID := range labelIDs {
				if labelIdx >= len(seg.Labels) {
					continue
				}
				label := seg.Labels[labelIdx]
				if label.Extra == nil {
					label.Extra = make(map[string]any)
				}
				label.Extra["skraak_label_id"] = labelID
			}
		}

		// Write the updated .data file.
		if err := df.Write(fu.DataPath); err != nil {
			errs = append(errs, ImportSegmentError{
				File:    filepath.Base(fu.DataPath),
				Stage:   StageImport,
				Message: fmt.Sprintf("failed to write updated .data file: %v", err),
			})
		}
	}

	return errs
}
}// validateAndPrepareSegments performs phases B+C: parse data files, validate DB state, and prepare ID maps.func validateAndPrepareSegments(database *sql.DB,input ImportSegmentsInput,mapping MappingFile,dataFiles []string,) (*segmentValidation, []ImportSegmentError, error) {// Phase B: Parse all .data files and collect unique valuesscannedFiles, parseErrors, uniqueFilters, uniqueSpecies, uniqueCalltypes := scanAllDataFiles(dataFiles, input.Folder)if len(scannedFiles) == 0 {return nil, parseErrors, nil}// Validate dataset/location/cluster hierarchyif err := validateSegmentHierarchy(database, input.DatasetID, input.LocationID, input.ClusterID); err != nil {return nil, parseErrors, err}// Validate all filters existfilterIDMap, err := validateFiltersExist(database, uniqueFilters)if err != nil {return nil, parseErrors, fmt.Errorf("filter validation failed: %w", err)}// Validate mapping covers all species/calltypes and they exist in DBvalidationResult, err := ValidateMappingAgainstDB(database, mapping, uniqueSpecies, uniqueCalltypes)if err != nil {return nil, parseErrors, fmt.Errorf("mapping validation failed: %w", err)}if validationResult.HasErrors() {return nil, parseErrors, fmt.Errorf("mapping validation failed: %s", validationResult.Error())}// Load species and calltype ID mapsspeciesIDMap, calltypeIDMap, err := loadSpeciesCalltypeIDs(database, mapping, uniqueSpecies, uniqueCalltypes)if err != nil {return nil, parseErrors, fmt.Errorf("failed to load species/calltype IDs: %w", err)}// Validate files: hash exists, linked to dataset, no existing labelsfileIDMap, hashErrors := validateAndMapFiles(database, scannedFiles, input.ClusterID, input.DatasetID)allErrors := append(parseErrors, hashErrors...)return &segmentValidation{scannedFiles: scannedFiles,filterIDMap: filterIDMap,speciesIDMap: speciesIDMap,calltypeIDMap: calltypeIDMap,fileIDMap: fileIDMap,}, allErrors, nil
}// validateSegmentImportInput validates input parametersfunc validateSegmentImportInput(input ImportSegmentsInput) error {// Validate folder existsif info, err := os.Stat(input.Folder); err != nil {return fmt.Errorf("folder does not exist: %s", input.Folder)} else if !info.IsDir() {return fmt.Errorf("path is not a folder: %s", input.Folder)}// Validate mapping file existsif _, err := os.Stat(input.Mapping); err != nil {return fmt.Errorf("mapping file does not exist: %s", input.Mapping)}// Validate IDsif err := utils.ValidateShortID(input.DatasetID, "dataset_id"); err != nil {return err}if err := utils.ValidateShortID(input.LocationID, "location_id"); err != nil {return err}if err := utils.ValidateShortID(input.ClusterID, "cluster_id"); err != nil {return err}return nil}// validateSegmentHierarchy validates dataset/location/cluster relationshipsfunc validateSegmentHierarchy(q db.Querier, datasetID, locationID, clusterID string) error {if err := db.ValidateDatasetTypeForImport(q, datasetID); err != nil {return err}if err := db.ValidateLocationBelongsToDataset(q, locationID, datasetID); err != nil {return err}if err := db.ClusterBelongsToLocation(q, clusterID, locationID); err != nil {return err}return nil}// scanAllDataFiles parses all .data files and collects unique valuesfunc scanAllDataFiles(dataFiles []string, folder string) ([]scannedDataFile,[]ImportSegmentError,map[string]bool,map[string]bool,map[string]map[string]bool,) {var scanned []scannedDataFilevar errors []ImportSegmentErroruniqueFilters := make(map[string]bool)uniqueSpecies := make(map[string]bool)uniqueCalltypes := make(map[string]map[string]bool) // species -> calltype -> truefor _, dataPath := range dataFiles {// Find corresponding WAV filewavPath := strings.TrimSuffix(dataPath, ".data")if _, err := os.Stat(wavPath); err != nil {errors = append(errors, ImportSegmentError{File: filepath.Base(dataPath),Stage: StageValidation,Message: fmt.Sprintf("corresponding WAV file not found: %s", 
filepath.Base(wavPath)),})continue}// Parse .data filedf, err := datafile.ParseDataFile(dataPath)if err != nil {errors = append(errors, ImportSegmentError{File: filepath.Base(dataPath),Stage: StageValidation,Message: fmt.Sprintf("failed to parse .data file: %v", err),})continue}// Collect unique filters, species, calltypesfor _, seg := range df.Segments {for _, label := range seg.Labels {uniqueFilters[label.Filter] = trueuniqueSpecies[label.Species] = trueif label.CallType != "" {if uniqueCalltypes[label.Species] == nil {uniqueCalltypes[label.Species] = make(map[string]bool)}uniqueCalltypes[label.Species][label.CallType] = true}}}scanned = append(scanned, scannedDataFile{DataPath: dataPath,WavPath: wavPath,Duration: df.Meta.Duration,Segments: df.Segments,})}return scanned, errors, uniqueFilters, uniqueSpecies, uniqueCalltypes
// validateFiltersExist checks all filters exist in DB and returns ID mapfunc validateFiltersExist(q db.Querier, filterNames map[string]bool) (map[string]string, error) {filterIDMap := make(map[string]string)if len(filterNames) == 0 {return filterIDMap, nil}names := make([]string, 0, len(filterNames))for name := range filterNames {names = append(names, name)}query := `SELECT id, name FROM filter WHERE name IN (` + db.Placeholders(len(names)) + `) AND active = true`args := make([]any, len(names))for i, name := range names {args[i] = name}rows, err := q.QueryContext(context.Background(), query, args...)if err != nil {return nil, fmt.Errorf("failed to query filters: %w", err)}defer rows.Close()for rows.Next() {var id, name stringif err := rows.Scan(&id, &name); err == nil {filterIDMap[name] = id}}// Check for missing filtersvar missing []stringfor name := range filterNames {if _, exists := filterIDMap[name]; !exists {missing = append(missing, name)}}if len(missing) > 0 {return nil, fmt.Errorf("filters not found in database: [%s]", strings.Join(missing, ", "))}return filterIDMap, nil}// loadSpeciesCalltypeIDs loads species and calltype ID mapsfunc loadSpeciesCalltypeIDs(q db.Querier,mapping MappingFile,uniqueSpecies map[string]bool,uniqueCalltypes map[string]map[string]bool,) (map[string]string, map[string]map[string]string, error) {speciesIDMap, err := loadSpeciesIDs(q, mapping, uniqueSpecies)if err != nil {return nil, nil, err}calltypeIDMap, err := loadCalltypeIDs(q, mapping, uniqueCalltypes)if err != nil {return nil, nil, err}return speciesIDMap, calltypeIDMap, nil}// loadSpeciesIDs queries the DB for species IDs matching the mapped species labels.func loadSpeciesIDs(q db.Querier, mapping MappingFile, uniqueSpecies map[string]bool) (map[string]string, error) {speciesIDMap := make(map[string]string)dbSpeciesSet := make(map[string]bool)for dataSpecies := range uniqueSpecies {if dbSpecies, ok := mapping.GetDBSpecies(dataSpecies); ok {dbSpeciesSet[dbSpecies] = true}}if 
len(dbSpeciesSet) == 0 {return speciesIDMap, nil}dbSpeciesList := make([]string, 0, len(dbSpeciesSet))for s := range dbSpeciesSet {dbSpeciesList = append(dbSpeciesList, s)}query := `SELECT id, label FROM species WHERE label IN (` + db.Placeholders(len(dbSpeciesList)) + `) AND active = true`args := make([]any, len(dbSpeciesList))for i, s := range dbSpeciesList {args[i] = s}rows, err := q.QueryContext(context.Background(), query, args...)if err != nil {return nil, fmt.Errorf("failed to query species: %w", err)}defer rows.Close()for rows.Next() {var id, label stringif err := rows.Scan(&id, &label); err == nil {speciesIDMap[label] = id}}return speciesIDMap, nil}// loadCalltypeIDs queries the DB for calltype IDs matching the mapped calltype labels.func loadCalltypeIDs(q db.Querier, mapping MappingFile, uniqueCalltypes map[string]map[string]bool) (map[string]map[string]string, error) {calltypeIDMap := make(map[string]map[string]string)for dataSpecies, ctSet := range uniqueCalltypes {dbSpecies, ok := mapping.GetDBSpecies(dataSpecies)if !ok {continue}if calltypeIDMap[dbSpecies] == nil {calltypeIDMap[dbSpecies] = make(map[string]string)}for dataCalltype := range ctSet {dbCalltype := mapping.GetDBCalltype(dataSpecies, dataCalltype)var calltypeID stringerr := q.QueryRowContext(context.Background(), `SELECT ct.idFROM call_type ctJOIN species s ON ct.species_id = s.idWHERE s.label = ? AND ct.label = ? 
AND ct.active = true`, dbSpecies, dbCalltype).Scan(&calltypeID)if err == nil {calltypeIDMap[dbSpecies][dbCalltype] = calltypeID}}}return calltypeIDMap, nil}// validateAndMapFiles validates files exist by hash, are linked to dataset, and have no existing labelsfunc validateAndMapFiles(q db.Querier,scannedFiles []scannedDataFile,clusterID string,datasetID string,) (map[string]scannedDataFile, []ImportSegmentError) {fileIDMap := make(map[string]scannedDataFile)var errors []ImportSegmentErrorfor _, sf := range scannedFiles {// Compute hashhash, err := utils.ComputeXXH64(sf.WavPath)if err != nil {errors = append(errors, ImportSegmentError{File: filepath.Base(sf.WavPath),Stage: StageHash,Message: fmt.Sprintf("failed to compute hash: %v", err),})continue}sf.WavHash = hash// Find file by hash in clustervar fileID stringvar duration float64err = q.QueryRowContext(context.Background(), `SELECT id, duration FROM file WHERE xxh64_hash = ? AND cluster_id = ? AND active = true`, hash, clusterID).Scan(&fileID, &duration)if err == sql.ErrNoRows {errors = append(errors, ImportSegmentError{File: filepath.Base(sf.WavPath),Stage: StageValidation,Message: fmt.Sprintf("file hash not found in database for cluster (hash: %s)", hash),})continue}if err != nil {errors = append(errors, ImportSegmentError{File: filepath.Base(sf.WavPath),Stage: StageValidation,Message: fmt.Sprintf("failed to query file: %v", err),})continue}sf.FileID = fileIDsf.Duration = duration// Verify file is linked to dataset via file_dataset junction table (composite FK)var fileLinkedToDataset boolerr = q.QueryRowContext(context.Background(), `SELECT EXISTS(SELECT 1 FROM file_dataset WHERE file_id = ? 
AND dataset_id = ?)`, fileID, datasetID).Scan(&fileLinkedToDataset)if err != nil {errors = append(errors, ImportSegmentError{File: filepath.Base(sf.WavPath),Stage: StageValidation,Message: fmt.Sprintf("failed to verify file-dataset link: %v", err),})continue}if !fileLinkedToDataset {errors = append(errors, ImportSegmentError{File: filepath.Base(sf.WavPath),Stage: StageValidation,Message: fmt.Sprintf("file exists in cluster but is not linked to dataset %s", datasetID),})continue}// Check no existing labels for this filevar labelCount interr = q.QueryRowContext(context.Background(), `SELECT COUNT(*) FROM label lJOIN segment s ON l.segment_id = s.idWHERE s.file_id = ? AND l.active = true`, fileID).Scan(&labelCount)if err != nil {errors = append(errors, ImportSegmentError{File: filepath.Base(sf.WavPath),Stage: StageValidation,Message: fmt.Sprintf("failed to check existing labels: %v", err),})continue}if labelCount > 0 {errors = append(errors, ImportSegmentError{File: filepath.Base(sf.WavPath),Stage: StageValidation,Message: fmt.Sprintf("file already has %d label(s) - fresh imports only", labelCount),})continue}fileIDMap[fileID] = sf}return fileIDMap, errors}// dataFileUpdate holds data to write back to .data file after importtype dataFileUpdate struct {DataPath stringWavHash stringLabelIDs map[int]map[int]string // segmentIndex -> labelIndex -> labelID}// importLabelResult holds the result of importing a single label.type importLabelResult struct {labelImport LabelImportlabelID stringsubtypesImported interr ImportSegmentErrorhasError bool}// resolvedLabelIDs holds the resolved database IDs for a label.type resolvedLabelIDs struct {speciesID stringfilterID stringlabelID stringdbSpecies string}// resolveLabelIDs looks up species and filter IDs, generates a label ID.// Returns an error if any lookup fails.func resolveLabelIDs(label *datafile.Label,sf scannedDataFile,mapping MappingFile,filterIDMap map[string]string,speciesIDMap map[string]string,) (resolvedLabelIDs, 
error) {dbSpecies, ok := mapping.GetDBSpecies(label.Species)if !ok {return resolvedLabelIDs{}, fmt.Errorf("species not found in mapping: %s", label.Species)}speciesID, ok := speciesIDMap[dbSpecies]if !ok {return resolvedLabelIDs{}, fmt.Errorf("species ID not found: %s", dbSpecies)}filterID, ok := filterIDMap[label.Filter]if !ok {return resolvedLabelIDs{}, fmt.Errorf("filter ID not found: %s", label.Filter)}labelID, err := utils.GenerateLongID()if err != nil {return resolvedLabelIDs{}, fmt.Errorf("failed to generate label ID: %w", err)}return resolvedLabelIDs{speciesID: speciesID,filterID: filterID,labelID: labelID,dbSpecies: dbSpecies,}, nil}// importSingleLabel inserts a single label and its metadata/subtype into the DB.func importSingleLabel(ctx context.Context,tx *db.LoggedTx,label *datafile.Label,segmentID string,segIdx, labelIdx int,sf scannedDataFile,mapping MappingFile,filterIDMap map[string]string,speciesIDMap map[string]string,calltypeIDMap map[string]map[string]string,) importLabelResult {// Resolve all IDs firstids, err := resolveLabelIDs(label, sf, mapping, filterIDMap, speciesIDMap)if err != nil {return importLabelResult{err: ImportSegmentError{File: filepath.Base(sf.DataPath), Stage: StageImport,Message: err.Error(),}, hasError: true}}// Insert the labelif err := insertLabel(ctx, tx, ids, segmentID, label); err != nil {return importLabelResult{err: ImportSegmentError{File: filepath.Base(sf.DataPath), Stage: StageImport,Message: err.Error(),}, hasError: true}}// Insert label_metadata if comment existsif label.Comment != "" {if err := insertLabelMetadata(ctx, tx, ids.labelID, label.Comment); err != nil {return importLabelResult{err: ImportSegmentError{File: filepath.Base(sf.DataPath), Stage: StageImport,Message: err.Error(),}, hasError: true}}}labelImport := LabelImport{LabelID: ids.labelID,Species: ids.dbSpecies,Filter: label.Filter,Certainty: label.Certainty,}if label.Comment != "" {labelImport.Comment = label.Comment}// Insert label_subtype if 
calltype existsif label.CallType != "" {if ctErr := importCalltype(ctx, tx, ids.labelID, label, ids.dbSpecies, ids.filterID, mapping, calltypeIDMap, sf); ctErr != nil {return importLabelResult{err: *ctErr, hasError: true}}labelImport.CallType = mapping.GetDBCalltype(label.Species, label.CallType)return importLabelResult{labelImport: labelImport, labelID: ids.labelID, subtypesImported: 1}}return importLabelResult{labelImport: labelImport, labelID: ids.labelID}}// insertLabel inserts a label row into the database.func insertLabel(ctx context.Context, tx *db.LoggedTx, ids resolvedLabelIDs, segmentID string, label *datafile.Label) error {_, err := tx.ExecContext(ctx, `INSERT INTO label (id, segment_id, species_id, filter_id, certainty, created_at, last_modified, active)VALUES (?, ?, ?, ?, ?, now(), now(), true)`, ids.labelID, segmentID, ids.speciesID, ids.filterID, label.Certainty)if err != nil {return fmt.Errorf("failed to insert label: %w", err)}return nil}// insertLabelMetadata inserts a label_metadata row for a comment.func insertLabelMetadata(ctx context.Context, tx *db.LoggedTx, labelID, comment string) error {escapedComment := strings.ReplaceAll(comment, `"`, `\\"`)metadataJSON := fmt.Sprintf(`{"comment": "%s"}`, escapedComment)_, err := tx.ExecContext(ctx, `INSERT INTO label_metadata (label_id, json, created_at, last_modified, active)VALUES (?, ?, now(), now(), true)`, labelID, metadataJSON)if err != nil {return fmt.Errorf("failed to insert label_metadata: %w", err)}return nil}// importCalltype inserts a label_subtype row for a calltype label.func importCalltype(ctx context.Context,tx *db.LoggedTx,labelID string,label *datafile.Label,dbSpecies string,filterID string,mapping MappingFile,calltypeIDMap map[string]map[string]string,sf scannedDataFile,) *ImportSegmentError {dbCalltype := mapping.GetDBCalltype(label.Species, label.CallType)calltypeID := ""if calltypeIDMap[dbSpecies] != nil {calltypeID = calltypeIDMap[dbSpecies][dbCalltype]}if calltypeID == "" {return 
&ImportSegmentError{File: filepath.Base(sf.DataPath), Stage: StageImport,Message: fmt.Sprintf("calltype ID not found: %s/%s", dbSpecies, dbCalltype),}}subtypeID, err := utils.GenerateLongID()if err != nil {return &ImportSegmentError{File: filepath.Base(sf.DataPath), Stage: StageImport,Message: fmt.Sprintf("failed to generate label_subtype ID: %v", err),}}_, err = tx.ExecContext(ctx, `INSERT INTO label_subtype (id, label_id, calltype_id, filter_id, certainty, created_at, last_modified, active)VALUES (?, ?, ?, ?, ?, now(), now(), true)`, subtypeID, labelID, calltypeID, filterID, label.Certainty)if err != nil {return &ImportSegmentError{File: filepath.Base(sf.DataPath), Stage: StageImport,Message: fmt.Sprintf("failed to insert label_subtype: %v", err),}}return nil}// importSegmentsIntoDB performs the transactional importfunc importSegmentsIntoDB(ctx context.Context,database *sql.DB,fileIDMap map[string]scannedDataFile,scannedFiles []scannedDataFile,mapping MappingFile,filterIDMap map[string]string,speciesIDMap map[string]string,calltypeIDMap map[string]map[string]string,datasetID string,progressHandler func(processed, total int, message string),) ([]SegmentImport, int, int, []dataFileUpdate, []ImportSegmentError) {var importedSegments []SegmentImportvar errors []ImportSegmentErrorimportedLabels := 0importedSubtypes := 0var fileUpdates []dataFileUpdatetx, err := db.BeginLoggedTx(ctx, database, "import_segments")if err != nil {errors = append(errors, ImportSegmentError{Stage: StageImport,Message: fmt.Sprintf("failed to begin transaction: %v", err),})return nil, 0, 0, nil, errors}defer tx.Rollback()totalFiles := len(fileIDMap)processedFiles := 0for _, sf := range fileIDMap {if sf.FileID == "" {continue}processedFiles++if progressHandler != nil {progressHandler(processedFiles, totalFiles, filepath.Base(sf.DataPath))}fileUpdate := dataFileUpdate{DataPath: sf.DataPath,WavHash: sf.WavHash,LabelIDs: make(map[int]map[int]string),}for segIdx, seg := range sf.Segments {segImp, 
labelIDs, subtypes, segErrs := importSegment(ctx, tx, seg, segIdx, sf, datasetID, mapping, filterIDMap, speciesIDMap, calltypeIDMap)errors = append(errors, segErrs...)importedSubtypes += subtypesif len(segImp.Labels) == 0 {// Delete orphaned segment (no labels succeeded)if _, err := tx.ExecContext(ctx, `DELETE FROM segment WHERE id = ?`, segImp.SegmentID); err != nil {errors = append(errors, ImportSegmentError{File: filepath.Base(sf.DataPath), Stage: StageImport,Message: fmt.Sprintf("failed to delete orphaned segment: %v", err),})}} else {importedSegments = append(importedSegments, segImp)importedLabels += len(labelIDs)fileUpdate.LabelIDs[segIdx] = labelIDs}}fileUpdates = append(fileUpdates, fileUpdate)}if err := tx.Commit(); err != nil {errors = append(errors, ImportSegmentError{Stage: StageImport,Message: fmt.Sprintf("failed to commit transaction: %v", err),})return nil, 0, 0, nil, errors}return importedSegments, importedLabels, importedSubtypes, fileUpdates, errors}// importSegment inserts a single segment and its labels into the DB.func importSegment(ctx context.Context,tx *db.LoggedTx,seg *datafile.Segment,segIdx int,sf scannedDataFile,datasetID string,mapping MappingFile,filterIDMap map[string]string,speciesIDMap map[string]string,calltypeIDMap map[string]map[string]string,) (SegmentImport, map[int]string, int, []ImportSegmentError) {var errors []ImportSegmentErrorif seg.StartTime >= seg.EndTime {errors = append(errors, ImportSegmentError{File: filepath.Base(sf.DataPath), Stage: StageImport,Message: fmt.Sprintf("invalid segment bounds: start=%.2f >= end=%.2f", seg.StartTime, seg.EndTime),})return SegmentImport{}, nil, 0, errors}if seg.EndTime > sf.Duration {errors = append(errors, ImportSegmentError{File: filepath.Base(sf.DataPath), Stage: StageImport,Message: fmt.Sprintf("segment end time (%.2f) exceeds file duration (%.2f)", seg.EndTime, sf.Duration),})return SegmentImport{}, nil, 0, errors}segmentID, err := utils.GenerateLongID()if err != nil {errors = 
append(errors, ImportSegmentError{File: filepath.Base(sf.DataPath), Stage: StageImport,Message: fmt.Sprintf("failed to generate segment ID: %v", err),})return SegmentImport{}, nil, 0, errors}_, err = tx.ExecContext(ctx, `INSERT INTO segment (id, file_id, dataset_id, start_time, end_time, freq_low, freq_high, created_at, last_modified, active)VALUES (?, ?, ?, ?, ?, ?, ?, now(), now(), true)`, segmentID, sf.FileID, datasetID, seg.StartTime, seg.EndTime, seg.FreqLow, seg.FreqHigh)if err != nil {errors = append(errors, ImportSegmentError{File: filepath.Base(sf.DataPath), Stage: StageImport,Message: fmt.Sprintf("failed to insert segment: %v", err),})return SegmentImport{}, nil, 0, errors}segImport := SegmentImport{SegmentID: segmentID,FileName: filepath.Base(sf.WavPath),StartTime: seg.StartTime,EndTime: seg.EndTime,FreqLow: seg.FreqLow,FreqHigh: seg.FreqHigh,Labels: make([]LabelImport, 0),}labelIDs := make(map[int]string)var subtypesImported intfor labelIdx, label := range seg.Labels {result := importSingleLabel(ctx, tx, label, segmentID, segIdx, labelIdx, sf, mapping, filterIDMap, speciesIDMap, calltypeIDMap)if result.hasError {errors = append(errors, result.err)continue}labelIDs[labelIdx] = result.labelIDsegImport.Labels = append(segImport.Labels, result.labelImport)subtypesImported += result.subtypesImported}return segImport, labelIDs, subtypesImported, errors}// countTotalSegments counts total segments from validated filesfunc countTotalSegments(fileIDMap map[string]scannedDataFile) int {count := 0for _, sf := range fileIDMap {count += len(sf.Segments)}return count}// writeIDsToDataFiles writes skraak_hash and skraak_label_ids back to .data filesfunc writeIDsToDataFiles(fileUpdates []dataFileUpdate) []ImportSegmentError {var errors []ImportSegmentErrorfor _, fu := range fileUpdates {// Parse the .data filedf, err := datafile.ParseDataFile(fu.DataPath)if err != nil {errors = append(errors, ImportSegmentError{File: filepath.Base(fu.DataPath),Stage: 
StageImport,Message: fmt.Sprintf("failed to re-parse .data file for writing: %v", err),})continue}// Write skraak_hash to metadataif df.Meta.Extra == nil {df.Meta.Extra = make(map[string]any)}df.Meta.Extra["skraak_hash"] = fu.WavHash// Write skraak_label_id to each labelfor segIdx, labelIDs := range fu.LabelIDs {if segIdx >= len(df.Segments) {continue}seg := df.Segments[segIdx]for labelIdx, labelID := range labelIDs {if labelIdx >= len(seg.Labels) {continue}label := seg.Labels[labelIdx]if label.Extra == nil {label.Extra = make(map[string]any)}label.Extra["skraak_label_id"] = labelID}}// Write the updated .data fileif err := df.Write(fu.DataPath); err != nil {errors = append(errors, ImportSegmentError{File: filepath.Base(fu.DataPath),Stage: StageImport,Message: fmt.Sprintf("failed to write updated .data file: %v", err),})continue}}return errors}