split tools/import

quietlight
May 19, 2026, 2:10 AM
RDJ6UCAG2OHWUKWQUNMDPC4ZJRIO53BATIM5AE5FOKMHWZSGQ4XAC

Dependencies

  • [2] LHZQOX64 complexity in tools calls and import
  • [3] TSOJUMHV more tests
  • [4] NQPVZ3PP first phase of utils refactor, all related to db interfaces
  • [5] V2HX6HEB claude going nuts all over the place
  • [6] 3DVPQOKB big tidy up of tools/
  • [7] ZCCQ4P5T reduce complexity to under 14, gocyclo but cilint test still has 3 functions over

Change contents

  • file addition: import_segments_prepare.go (----------)
    [3.1]
    package imp
    import (
    "context"
    "database/sql"
    "fmt"
    "os"
    "path/filepath"
    "strings"
    "skraak/datafile"
    "skraak/db"
    "skraak/utils"
    )
    // validateAndPrepareSegments runs phases B and C of a segment import: it scans the
    // .data files, checks the database state they depend on, and builds the ID lookup maps.
    //
    // It returns (nil, scan errors, nil) when no file could be scanned. A non-nil error
    // means a import-wide check failed (hierarchy, filters, mapping, ID loading); the
    // per-file errors collected so far are still returned alongside it. On success the
    // error slice holds the scan errors followed by the per-file validation errors.
    func validateAndPrepareSegments(
    	database *sql.DB,
    	input ImportSegmentsInput,
    	mapping MappingFile,
    	dataFiles []string,
    ) (*segmentValidation, []ImportSegmentError, error) {
    	// Phase B: scan every .data file and gather the distinct filters/species/calltypes.
    	scanned, scanErrs, filters, species, calltypes := scanAllDataFiles(dataFiles, input.Folder)
    	if len(scanned) == 0 {
    		return nil, scanErrs, nil
    	}
    	// Phase C: the dataset/location/cluster chain must be consistent.
    	if hierarchyErr := validateSegmentHierarchy(database, input.DatasetID, input.LocationID, input.ClusterID); hierarchyErr != nil {
    		return nil, scanErrs, hierarchyErr
    	}
    	// Every filter referenced by a label must exist.
    	filterIDs, err := validateFiltersExist(database, filters)
    	if err != nil {
    		return nil, scanErrs, fmt.Errorf("filter validation failed: %w", err)
    	}
    	// The mapping must cover all species/calltypes, and they must exist in the DB.
    	mappingCheck, err := ValidateMappingAgainstDB(database, mapping, species, calltypes)
    	switch {
    	case err != nil:
    		return nil, scanErrs, fmt.Errorf("mapping validation failed: %w", err)
    	case mappingCheck.HasErrors():
    		return nil, scanErrs, fmt.Errorf("mapping validation failed: %s", mappingCheck.Error())
    	}
    	// Resolve species and calltype labels to database IDs.
    	speciesIDs, calltypeIDs, err := loadSpeciesCalltypeIDs(database, mapping, species, calltypes)
    	if err != nil {
    		return nil, scanErrs, fmt.Errorf("failed to load species/calltype IDs: %w", err)
    	}
    	// Per-file checks: hash known in the cluster, linked to the dataset, no labels yet.
    	fileIDs, fileErrs := validateAndMapFiles(database, scanned, input.ClusterID, input.DatasetID)
    	prepared := &segmentValidation{
    		scannedFiles:  scanned,
    		filterIDMap:   filterIDs,
    		speciesIDMap:  speciesIDs,
    		calltypeIDMap: calltypeIDs,
    		fileIDMap:     fileIDs,
    	}
    	return prepared, append(scanErrs, fileErrs...), nil
    }
    // validateSegmentImportInput checks the caller-supplied parameters before any work
    // starts: the folder must be an existing directory, the mapping file must be
    // stat-able, and the dataset, location and cluster IDs must pass utils.ValidateShortID.
    // The first failing check is returned.
    func validateSegmentImportInput(input ImportSegmentsInput) error {
    	// Folder: must exist and be a directory.
    	folderInfo, statErr := os.Stat(input.Folder)
    	if statErr != nil {
    		return fmt.Errorf("folder does not exist: %s", input.Folder)
    	}
    	if !folderInfo.IsDir() {
    		return fmt.Errorf("path is not a folder: %s", input.Folder)
    	}
    	// Mapping file: must exist.
    	if _, statErr = os.Stat(input.Mapping); statErr != nil {
    		return fmt.Errorf("mapping file does not exist: %s", input.Mapping)
    	}
    	// IDs, checked in dataset -> location -> cluster order.
    	if idErr := utils.ValidateShortID(input.DatasetID, "dataset_id"); idErr != nil {
    		return idErr
    	}
    	if idErr := utils.ValidateShortID(input.LocationID, "location_id"); idErr != nil {
    		return idErr
    	}
    	if idErr := utils.ValidateShortID(input.ClusterID, "cluster_id"); idErr != nil {
    		return idErr
    	}
    	return nil
    }
    // validateSegmentHierarchy runs the three db-package checks in order: dataset type
    // allowed for import, location belongs to dataset, cluster belongs to location.
    // It stops at, and returns, the first failure.
    func validateSegmentHierarchy(q db.Querier, datasetID, locationID, clusterID string) error {
    	err := db.ValidateDatasetTypeForImport(q, datasetID)
    	if err != nil {
    		return err
    	}
    	err = db.ValidateLocationBelongsToDataset(q, locationID, datasetID)
    	if err != nil {
    		return err
    	}
    	err = db.ClusterBelongsToLocation(q, clusterID, locationID)
    	if err != nil {
    		return err
    	}
    	return nil
    }
    // scanAllDataFiles parses each .data file and gathers the distinct label values.
    //
    // The WAV path is the .data path with its ".data" suffix removed; a file whose WAV
    // cannot be stat-ed, or whose .data file fails to parse, is recorded as a
    // validation-stage error and skipped. The folder argument is currently unused.
    //
    // Returns, in order: the successfully scanned files, the per-file errors, the set of
    // filter names, the set of species names, and calltypes grouped as
    // species -> calltype -> true.
    func scanAllDataFiles(dataFiles []string, folder string) (
    	[]scannedDataFile,
    	[]ImportSegmentError,
    	map[string]bool,
    	map[string]bool,
    	map[string]map[string]bool,
    ) {
    	var (
    		accepted []scannedDataFile
    		problems []ImportSegmentError
    	)
    	filters := map[string]bool{}
    	species := map[string]bool{}
    	calltypes := map[string]map[string]bool{} // species -> calltype -> true
    	// reject records a validation error against the given .data file.
    	reject := func(path, msg string) {
    		problems = append(problems, ImportSegmentError{
    			File:    filepath.Base(path),
    			Stage:   StageValidation,
    			Message: msg,
    		})
    	}
    	for _, dataPath := range dataFiles {
    		// The recording sits next to its .data file, minus the suffix.
    		wavPath := strings.TrimSuffix(dataPath, ".data")
    		if _, statErr := os.Stat(wavPath); statErr != nil {
    			reject(dataPath, fmt.Sprintf("corresponding WAV file not found: %s", filepath.Base(wavPath)))
    			continue
    		}
    		parsed, parseErr := datafile.ParseDataFile(dataPath)
    		if parseErr != nil {
    			reject(dataPath, fmt.Sprintf("failed to parse .data file: %v", parseErr))
    			continue
    		}
    		for _, seg := range parsed.Segments {
    			for _, label := range seg.Labels {
    				filters[label.Filter] = true
    				species[label.Species] = true
    				if label.CallType == "" {
    					continue
    				}
    				perSpecies := calltypes[label.Species]
    				if perSpecies == nil {
    					perSpecies = make(map[string]bool)
    					calltypes[label.Species] = perSpecies
    				}
    				perSpecies[label.CallType] = true
    			}
    		}
    		accepted = append(accepted, scannedDataFile{
    			DataPath: dataPath,
    			WavPath:  wavPath,
    			Duration: parsed.Meta.Duration,
    			Segments: parsed.Segments,
    		})
    	}
    	return accepted, problems, filters, species, calltypes
    }
    // validateFiltersExist looks up every name in filterNames among the active rows of
    // the filter table and returns a name -> ID map.
    //
    // An empty input yields an empty map and no query. An error is returned when the
    // query fails, when a row cannot be scanned, when row iteration ends with an error,
    // or when at least one requested name has no active filter row (the missing names
    // are listed in the message, in map-iteration order).
    func validateFiltersExist(q db.Querier, filterNames map[string]bool) (map[string]string, error) {
    	filterIDMap := make(map[string]string, len(filterNames))
    	if len(filterNames) == 0 {
    		return filterIDMap, nil
    	}
    	args := make([]any, 0, len(filterNames))
    	for name := range filterNames {
    		args = append(args, name)
    	}
    	query := `SELECT id, name FROM filter WHERE name IN (` + db.Placeholders(len(args)) + `) AND active = true`
    	rows, err := q.QueryContext(context.Background(), query, args...)
    	if err != nil {
    		return nil, fmt.Errorf("failed to query filters: %w", err)
    	}
    	defer rows.Close()
    	for rows.Next() {
    		var id, name string
    		// A scan failure must not be mistaken for "filter missing" further down.
    		if err := rows.Scan(&id, &name); err != nil {
    			return nil, fmt.Errorf("failed to scan filter row: %w", err)
    		}
    		filterIDMap[name] = id
    	}
    	// Next() returning false can mean "done" or "failed"; Err() tells them apart.
    	if err := rows.Err(); err != nil {
    		return nil, fmt.Errorf("failed to read filters: %w", err)
    	}
    	// Anything requested but not returned does not exist (or is inactive).
    	var missing []string
    	for name := range filterNames {
    		if _, exists := filterIDMap[name]; !exists {
    			missing = append(missing, name)
    		}
    	}
    	if len(missing) > 0 {
    		return nil, fmt.Errorf("filters not found in database: [%s]", strings.Join(missing, ", "))
    	}
    	return filterIDMap, nil
    }
    // loadSpeciesCalltypeIDs builds both lookup maps needed by the import: species IDs
    // (via loadSpeciesIDs) and calltype IDs (via loadCalltypeIDs). If either loader
    // fails, both maps are returned as nil together with that error.
    func loadSpeciesCalltypeIDs(
    	q db.Querier,
    	mapping MappingFile,
    	uniqueSpecies map[string]bool,
    	uniqueCalltypes map[string]map[string]bool,
    ) (map[string]string, map[string]map[string]string, error) {
    	speciesIDs, speciesErr := loadSpeciesIDs(q, mapping, uniqueSpecies)
    	if speciesErr != nil {
    		return nil, nil, speciesErr
    	}
    	calltypeIDs, calltypeErr := loadCalltypeIDs(q, mapping, uniqueCalltypes)
    	if calltypeErr != nil {
    		return nil, nil, calltypeErr
    	}
    	return speciesIDs, calltypeIDs, nil
    }
    // loadSpeciesIDs translates the species names found in the data files to their
    // mapped database labels and returns a label -> ID map for the active species rows
    // matching those labels.
    //
    // Data-file species without a mapping entry are skipped. If nothing maps, an empty
    // map is returned without querying. Labels with no active row are simply absent
    // from the result. An error is returned when the query fails, a row cannot be
    // scanned, or row iteration ends with an error.
    func loadSpeciesIDs(q db.Querier, mapping MappingFile, uniqueSpecies map[string]bool) (map[string]string, error) {
    	speciesIDMap := make(map[string]string)
    	// Several data-file names may map to the same DB label; collapse them first.
    	dbSpeciesSet := make(map[string]bool)
    	for dataSpecies := range uniqueSpecies {
    		if dbSpecies, ok := mapping.GetDBSpecies(dataSpecies); ok {
    			dbSpeciesSet[dbSpecies] = true
    		}
    	}
    	if len(dbSpeciesSet) == 0 {
    		return speciesIDMap, nil
    	}
    	args := make([]any, 0, len(dbSpeciesSet))
    	for s := range dbSpeciesSet {
    		args = append(args, s)
    	}
    	query := `SELECT id, label FROM species WHERE label IN (` + db.Placeholders(len(args)) + `) AND active = true`
    	rows, err := q.QueryContext(context.Background(), query, args...)
    	if err != nil {
    		return nil, fmt.Errorf("failed to query species: %w", err)
    	}
    	defer rows.Close()
    	for rows.Next() {
    		var id, label string
    		// Surface scan failures instead of silently returning a partial map.
    		if err := rows.Scan(&id, &label); err != nil {
    			return nil, fmt.Errorf("failed to scan species row: %w", err)
    		}
    		speciesIDMap[label] = id
    	}
    	// Next() returning false can mean "done" or "failed"; Err() tells them apart.
    	if err := rows.Err(); err != nil {
    		return nil, fmt.Errorf("failed to read species: %w", err)
    	}
    	return speciesIDMap, nil
    }
    // loadCalltypeIDs resolves every (species, calltype) pair found in the data files to
    // an active call_type ID, returned as dbSpecies -> dbCalltype -> ID.
    //
    // Data-file species without a mapping entry are skipped. A pair with no matching
    // active row (sql.ErrNoRows) is left out of the map. Any other query or scan error
    // aborts the load and is returned, so a database failure is no longer reported
    // downstream as a missing calltype.
    func loadCalltypeIDs(q db.Querier, mapping MappingFile, uniqueCalltypes map[string]map[string]bool) (map[string]map[string]string, error) {
    	calltypeIDMap := make(map[string]map[string]string)
    	for dataSpecies, ctSet := range uniqueCalltypes {
    		dbSpecies, ok := mapping.GetDBSpecies(dataSpecies)
    		if !ok {
    			continue
    		}
    		if calltypeIDMap[dbSpecies] == nil {
    			calltypeIDMap[dbSpecies] = make(map[string]string)
    		}
    		for dataCalltype := range ctSet {
    			dbCalltype := mapping.GetDBCalltype(dataSpecies, dataCalltype)
    			var calltypeID string
    			err := q.QueryRowContext(context.Background(), `
    SELECT ct.id
    FROM call_type ct
    JOIN species s ON ct.species_id = s.id
    WHERE s.label = ? AND ct.label = ? AND ct.active = true
    `, dbSpecies, dbCalltype).Scan(&calltypeID)
    			if err == sql.ErrNoRows {
    				// No such active calltype: leave it unmapped.
    				continue
    			}
    			if err != nil {
    				return nil, fmt.Errorf("failed to query calltype %s/%s: %w", dbSpecies, dbCalltype, err)
    			}
    			calltypeIDMap[dbSpecies][dbCalltype] = calltypeID
    		}
    	}
    	return calltypeIDMap, nil
    }
    // validateAndMapFiles decides, per scanned file, whether it may receive imported
    // segments. A file is accepted only if all of the following hold:
    //   - its WAV hash (utils.ComputeXXH64) can be computed;
    //   - an active file row with that hash exists in the given cluster;
    //   - that file row is linked to the dataset through file_dataset;
    //   - no active label is attached to any of its segments yet.
    //
    // Accepted files are returned keyed by database file ID, with WavHash and FileID
    // filled in and Duration replaced by the value stored in the database. Every
    // rejected file contributes one error (hash stage for hashing failures, validation
    // stage otherwise) and processing continues with the next file.
    func validateAndMapFiles(
    	q db.Querier,
    	scannedFiles []scannedDataFile,
    	clusterID string,
    	datasetID string,
    ) (map[string]scannedDataFile, []ImportSegmentError) {
    	accepted := make(map[string]scannedDataFile)
    	var problems []ImportSegmentError
    	ctx := context.Background()
    	for _, candidate := range scannedFiles {
    		wavName := filepath.Base(candidate.WavPath)
    		// reject records a validation-stage error for the current file.
    		reject := func(msg string) {
    			problems = append(problems, ImportSegmentError{
    				File:    wavName,
    				Stage:   StageValidation,
    				Message: msg,
    			})
    		}
    		// Hash the recording; this is the only hash-stage failure.
    		digest, hashErr := utils.ComputeXXH64(candidate.WavPath)
    		if hashErr != nil {
    			problems = append(problems, ImportSegmentError{
    				File:    wavName,
    				Stage:   StageHash,
    				Message: fmt.Sprintf("failed to compute hash: %v", hashErr),
    			})
    			continue
    		}
    		candidate.WavHash = digest
    		// Locate the file row by hash within the cluster.
    		var (
    			fileID   string
    			duration float64
    		)
    		lookupErr := q.QueryRowContext(ctx, `
    SELECT id, duration FROM file WHERE xxh64_hash = ? AND cluster_id = ? AND active = true
    `, digest, clusterID).Scan(&fileID, &duration)
    		switch {
    		case lookupErr == sql.ErrNoRows:
    			reject(fmt.Sprintf("file hash not found in database for cluster (hash: %s)", digest))
    			continue
    		case lookupErr != nil:
    			reject(fmt.Sprintf("failed to query file: %v", lookupErr))
    			continue
    		}
    		candidate.FileID = fileID
    		candidate.Duration = duration
    		// The file must be linked to the dataset via the file_dataset junction table.
    		var linked bool
    		linkErr := q.QueryRowContext(ctx, `
    SELECT EXISTS(SELECT 1 FROM file_dataset WHERE file_id = ? AND dataset_id = ?)
    `, fileID, datasetID).Scan(&linked)
    		if linkErr != nil {
    			reject(fmt.Sprintf("failed to verify file-dataset link: %v", linkErr))
    			continue
    		}
    		if !linked {
    			reject(fmt.Sprintf("file exists in cluster but is not linked to dataset %s", datasetID))
    			continue
    		}
    		// Only fresh imports are allowed: the file must not carry active labels.
    		var existingLabels int
    		countErr := q.QueryRowContext(ctx, `
    SELECT COUNT(*) FROM label l
    JOIN segment s ON l.segment_id = s.id
    WHERE s.file_id = ? AND l.active = true
    `, fileID).Scan(&existingLabels)
    		if countErr != nil {
    			reject(fmt.Sprintf("failed to check existing labels: %v", countErr))
    			continue
    		}
    		if existingLabels > 0 {
    			reject(fmt.Sprintf("file already has %d label(s) - fresh imports only", existingLabels))
    			continue
    		}
    		accepted[fileID] = candidate
    	}
    	return accepted, problems
    }
    // countTotalSegments returns the sum of the segment counts over all validated files.
    func countTotalSegments(fileIDMap map[string]scannedDataFile) int {
    	var total int
    	for fileID := range fileIDMap {
    		total += len(fileIDMap[fileID].Segments)
    	}
    	return total
    }
  • file addition: import_segments_db.go (----------)
    [3.1]
    package imp
    import (
    "context"
    "database/sql"
    "fmt"
    "path/filepath"
    "strings"
    "skraak/datafile"
    "skraak/db"
    "skraak/utils"
    )
    // dataFileUpdate holds the data to write back to one .data file after import.
    // One value is built per imported file and consumed by writeIDsToDataFiles.
    type dataFileUpdate struct {
    // DataPath is the path of the .data file to re-parse and rewrite.
    DataPath string
    // WavHash is stored in the file's metadata under "skraak_hash".
    WavHash string
    // LabelIDs holds the generated label IDs, each stored on its label under "skraak_label_id".
    LabelIDs map[int]map[int]string // segmentIndex -> labelIndex -> labelID
    }
    // importLabelResult holds the result of importing a single label.
    // When hasError is true only err is populated; otherwise err is the zero value.
    type importLabelResult struct {
    // labelImport is the summary of the inserted label.
    labelImport LabelImport
    // labelID is the ID generated for the inserted label row.
    labelID string
    // subtypesImported is 1 when a label_subtype row was inserted for a calltype, else 0.
    subtypesImported int
    // err describes the failure; meaningful only when hasError is true.
    err ImportSegmentError
    // hasError reports whether the label import failed.
    hasError bool
    }
    // resolvedLabelIDs holds the resolved database IDs for a label.
    // It is produced by resolveLabelIDs before any row is inserted.
    type resolvedLabelIDs struct {
    // speciesID is the ID looked up for dbSpecies in the species ID map.
    speciesID string
    // filterID is the ID looked up for the label's filter name.
    filterID string
    // labelID is the freshly generated ID for the label row about to be inserted.
    labelID string
    // dbSpecies is the species label after translation through the mapping file.
    dbSpecies string
    }
    // resolveLabelIDs gathers every ID needed to insert one label: it maps the label's
    // species through the mapping file, looks up the species and filter IDs in the
    // prepared maps, and generates a new label ID.
    //
    // It returns an error (and a zero-valued result) if the species has no mapping, if
    // either lookup misses, or if ID generation fails. The sf argument is currently
    // unused.
    func resolveLabelIDs(
    	label *datafile.Label,
    	sf scannedDataFile,
    	mapping MappingFile,
    	filterIDMap map[string]string,
    	speciesIDMap map[string]string,
    ) (resolvedLabelIDs, error) {
    	var none resolvedLabelIDs
    	dbSpecies, mapped := mapping.GetDBSpecies(label.Species)
    	if !mapped {
    		return none, fmt.Errorf("species not found in mapping: %s", label.Species)
    	}
    	speciesID, haveSpecies := speciesIDMap[dbSpecies]
    	if !haveSpecies {
    		return none, fmt.Errorf("species ID not found: %s", dbSpecies)
    	}
    	filterID, haveFilter := filterIDMap[label.Filter]
    	if !haveFilter {
    		return none, fmt.Errorf("filter ID not found: %s", label.Filter)
    	}
    	newLabelID, genErr := utils.GenerateLongID()
    	if genErr != nil {
    		return none, fmt.Errorf("failed to generate label ID: %w", genErr)
    	}
    	resolved := resolvedLabelIDs{
    		speciesID: speciesID,
    		filterID:  filterID,
    		labelID:   newLabelID,
    		dbSpecies: dbSpecies,
    	}
    	return resolved, nil
    }
    // importSingleLabel writes one label to the database inside tx: the label row
    // itself, a label_metadata row when the label has a comment, and a label_subtype
    // row when it has a calltype.
    //
    // The result has hasError set (and err filled in) as soon as any step fails; rows
    // inserted by earlier steps are not undone here. On success it carries the label
    // summary, the new label ID, and subtypesImported = 1 if a calltype was stored.
    // The segIdx and labelIdx arguments are currently unused.
    func importSingleLabel(
    	ctx context.Context,
    	tx *db.LoggedTx,
    	label *datafile.Label,
    	segmentID string,
    	segIdx, labelIdx int,
    	sf scannedDataFile,
    	mapping MappingFile,
    	filterIDMap map[string]string,
    	speciesIDMap map[string]string,
    	calltypeIDMap map[string]map[string]string,
    ) importLabelResult {
    	// fail wraps an error into an import-stage failure for the current .data file.
    	fail := func(cause error) importLabelResult {
    		return importLabelResult{
    			err: ImportSegmentError{
    				File:    filepath.Base(sf.DataPath),
    				Stage:   StageImport,
    				Message: cause.Error(),
    			},
    			hasError: true,
    		}
    	}
    	// All IDs are resolved up front so nothing is inserted for an unresolvable label.
    	ids, err := resolveLabelIDs(label, sf, mapping, filterIDMap, speciesIDMap)
    	if err != nil {
    		return fail(err)
    	}
    	if err = insertLabel(ctx, tx, ids, segmentID, label); err != nil {
    		return fail(err)
    	}
    	// A comment, when present, is stored as label_metadata.
    	hasComment := label.Comment != ""
    	if hasComment {
    		if err = insertLabelMetadata(ctx, tx, ids.labelID, label.Comment); err != nil {
    			return fail(err)
    		}
    	}
    	imported := LabelImport{
    		LabelID:   ids.labelID,
    		Species:   ids.dbSpecies,
    		Filter:    label.Filter,
    		Certainty: label.Certainty,
    	}
    	if hasComment {
    		imported.Comment = label.Comment
    	}
    	// Without a calltype the label is complete.
    	if label.CallType == "" {
    		return importLabelResult{labelImport: imported, labelID: ids.labelID}
    	}
    	// A calltype is stored as a label_subtype row.
    	if ctErr := importCalltype(ctx, tx, ids.labelID, label, ids.dbSpecies, ids.filterID, mapping, calltypeIDMap, sf); ctErr != nil {
    		return importLabelResult{err: *ctErr, hasError: true}
    	}
    	imported.CallType = mapping.GetDBCalltype(label.Species, label.CallType)
    	return importLabelResult{labelImport: imported, labelID: ids.labelID, subtypesImported: 1}
    }
    // insertLabel writes one active label row for the given segment, using the
    // pre-resolved label, species and filter IDs and the label's certainty.
    // A failed insert is returned wrapped.
    func insertLabel(ctx context.Context, tx *db.LoggedTx, ids resolvedLabelIDs, segmentID string, label *datafile.Label) error {
    	if _, execErr := tx.ExecContext(ctx, `
    INSERT INTO label (id, segment_id, species_id, filter_id, certainty, created_at, last_modified, active)
    VALUES (?, ?, ?, ?, ?, now(), now(), true)
    `, ids.labelID, segmentID, ids.speciesID, ids.filterID, label.Certainty); execErr != nil {
    		return fmt.Errorf("failed to insert label: %w", execErr)
    	}
    	return nil
    }
    // insertLabelMetadata inserts an active label_metadata row whose json column is
    // {"comment": "<comment>"} for the given label.
    //
    // The comment is escaped as a JSON string, so quotes, backslashes, newlines and
    // other control characters in user comments produce valid JSON. A failed insert is
    // returned wrapped.
    func insertLabelMetadata(ctx context.Context, tx *db.LoggedTx, labelID, comment string) error {
    	metadataJSON := `{"comment": "` + escapeCommentForJSON(comment) + `"}`
    	_, err := tx.ExecContext(ctx, `
    INSERT INTO label_metadata (label_id, json, created_at, last_modified, active)
    VALUES (?, ?, now(), now(), true)
    `, labelID, metadataJSON)
    	if err != nil {
    		return fmt.Errorf("failed to insert label_metadata: %w", err)
    	}
    	return nil
    }
    // escapeCommentForJSON returns s escaped for use between the quotes of a JSON string
    // literal: '"' and '\' are backslash-escaped, newline/carriage return/tab use their
    // short escapes, and any other character below U+0020 is written as \u00XX.
    // Invalid UTF-8 bytes come out as U+FFFD because the string is walked rune by rune.
    func escapeCommentForJSON(s string) string {
    	var b strings.Builder
    	b.Grow(len(s))
    	for _, r := range s {
    		switch r {
    		case '"':
    			b.WriteString(`\"`)
    		case '\\':
    			b.WriteString(`\\`)
    		case '\n':
    			b.WriteString(`\n`)
    		case '\r':
    			b.WriteString(`\r`)
    		case '\t':
    			b.WriteString(`\t`)
    		default:
    			if r < 0x20 {
    				// Remaining control characters have no short form in JSON.
    				fmt.Fprintf(&b, `\u%04x`, r)
    			} else {
    				b.WriteRune(r)
    			}
    		}
    	}
    	return b.String()
    }
    // importCalltype stores the label's calltype as an active label_subtype row.
    //
    // The calltype is translated through the mapping file and resolved to an ID via
    // calltypeIDMap (dbSpecies -> dbCalltype -> ID). It returns nil on success, or an
    // import-stage error for the current .data file when the ID is unknown, a subtype
    // ID cannot be generated, or the insert fails.
    func importCalltype(
    	ctx context.Context,
    	tx *db.LoggedTx,
    	labelID string,
    	label *datafile.Label,
    	dbSpecies string,
    	filterID string,
    	mapping MappingFile,
    	calltypeIDMap map[string]map[string]string,
    	sf scannedDataFile,
    ) *ImportSegmentError {
    	// fail builds the import-stage error returned for this file.
    	fail := func(msg string) *ImportSegmentError {
    		return &ImportSegmentError{
    			File:    filepath.Base(sf.DataPath),
    			Stage:   StageImport,
    			Message: msg,
    		}
    	}
    	dbCalltype := mapping.GetDBCalltype(label.Species, label.CallType)
    	// Indexing a missing (nil) inner map yields "", which is treated as "not found".
    	calltypeID := calltypeIDMap[dbSpecies][dbCalltype]
    	if calltypeID == "" {
    		return fail(fmt.Sprintf("calltype ID not found: %s/%s", dbSpecies, dbCalltype))
    	}
    	subtypeID, genErr := utils.GenerateLongID()
    	if genErr != nil {
    		return fail(fmt.Sprintf("failed to generate label_subtype ID: %v", genErr))
    	}
    	if _, execErr := tx.ExecContext(ctx, `
    INSERT INTO label_subtype (id, label_id, calltype_id, filter_id, certainty, created_at, last_modified, active)
    VALUES (?, ?, ?, ?, ?, now(), now(), true)
    `, subtypeID, labelID, calltypeID, filterID, label.Certainty); execErr != nil {
    		return fail(fmt.Sprintf("failed to insert label_subtype: %v", execErr))
    	}
    	return nil
    }
    // importSegmentsIntoDB imports the segments of every validated file inside a single
    // logged transaction named "import_segments".
    //
    // For each file in fileIDMap with a non-empty FileID it reports progress (if
    // progressHandler is non-nil), imports each segment via importSegment, and records
    // the generated label IDs for later write-back. A segment that ends up with no
    // imported label is not counted; if its row was inserted, that row is deleted again.
    // Per-segment and per-label failures are collected and do not abort the import.
    //
    // Returns the imported segments, the number of imported labels, the number of
    // imported subtypes, the per-file write-back data, and all collected errors. If the
    // transaction cannot be started or committed, only the errors are returned and the
    // other results are nil/zero. The scannedFiles argument is currently unused.
    func importSegmentsIntoDB(
    	ctx context.Context,
    	database *sql.DB,
    	fileIDMap map[string]scannedDataFile,
    	scannedFiles []scannedDataFile,
    	mapping MappingFile,
    	filterIDMap map[string]string,
    	speciesIDMap map[string]string,
    	calltypeIDMap map[string]map[string]string,
    	datasetID string,
    	progressHandler func(processed, total int, message string),
    ) ([]SegmentImport, int, int, []dataFileUpdate, []ImportSegmentError) {
    	var importedSegments []SegmentImport
    	var errors []ImportSegmentError
    	importedLabels := 0
    	importedSubtypes := 0
    	var fileUpdates []dataFileUpdate
    	tx, err := db.BeginLoggedTx(ctx, database, "import_segments")
    	if err != nil {
    		errors = append(errors, ImportSegmentError{
    			Stage:   StageImport,
    			Message: fmt.Sprintf("failed to begin transaction: %v", err),
    		})
    		return nil, 0, 0, nil, errors
    	}
    	// Rolls back on every early exit; runs after Commit on the success path.
    	defer tx.Rollback()
    	totalFiles := len(fileIDMap)
    	processedFiles := 0
    	for _, sf := range fileIDMap {
    		if sf.FileID == "" {
    			continue
    		}
    		processedFiles++
    		if progressHandler != nil {
    			progressHandler(processedFiles, totalFiles, filepath.Base(sf.DataPath))
    		}
    		fileUpdate := dataFileUpdate{
    			DataPath: sf.DataPath,
    			WavHash:  sf.WavHash,
    			LabelIDs: make(map[int]map[int]string),
    		}
    		for segIdx, seg := range sf.Segments {
    			segImp, labelIDs, subtypes, segErrs := importSegment(ctx, tx, seg, segIdx, sf, datasetID, mapping, filterIDMap, speciesIDMap, calltypeIDMap)
    			errors = append(errors, segErrs...)
    			importedSubtypes += subtypes
    			if len(segImp.Labels) > 0 {
    				importedSegments = append(importedSegments, segImp)
    				importedLabels += len(labelIDs)
    				fileUpdate.LabelIDs[segIdx] = labelIDs
    				continue
    			}
    			// importSegment returns an empty SegmentID when it rejected the segment
    			// before inserting a row; there is nothing to clean up in that case.
    			if segImp.SegmentID == "" {
    				continue
    			}
    			// Delete orphaned segment (row inserted, but no label succeeded).
    			if _, err := tx.ExecContext(ctx, `DELETE FROM segment WHERE id = ?`, segImp.SegmentID); err != nil {
    				errors = append(errors, ImportSegmentError{
    					File: filepath.Base(sf.DataPath), Stage: StageImport,
    					Message: fmt.Sprintf("failed to delete orphaned segment: %v", err),
    				})
    			}
    		}
    		fileUpdates = append(fileUpdates, fileUpdate)
    	}
    	if err := tx.Commit(); err != nil {
    		errors = append(errors, ImportSegmentError{
    			Stage:   StageImport,
    			Message: fmt.Sprintf("failed to commit transaction: %v", err),
    		})
    		return nil, 0, 0, nil, errors
    	}
    	return importedSegments, importedLabels, importedSubtypes, fileUpdates, errors
    }
    // importSegment writes one segment row and then each of its labels inside tx.
    //
    // The segment is rejected — returning a zero SegmentImport, nil label IDs, 0 and a
    // single error — when start >= end, when end exceeds the file duration, when no
    // segment ID can be generated, or when the segment insert fails. Otherwise every
    // label is attempted via importSingleLabel: failures are collected as errors and
    // skipped, successes are appended to the returned SegmentImport and recorded in the
    // labelIndex -> labelID map. The int result is the number of subtypes imported.
    func importSegment(
    	ctx context.Context,
    	tx *db.LoggedTx,
    	seg *datafile.Segment,
    	segIdx int,
    	sf scannedDataFile,
    	datasetID string,
    	mapping MappingFile,
    	filterIDMap map[string]string,
    	speciesIDMap map[string]string,
    	calltypeIDMap map[string]map[string]string,
    ) (SegmentImport, map[int]string, int, []ImportSegmentError) {
    	var problems []ImportSegmentError
    	// reject records an import-stage error and yields the "nothing imported" result.
    	reject := func(msg string) (SegmentImport, map[int]string, int, []ImportSegmentError) {
    		problems = append(problems, ImportSegmentError{
    			File:    filepath.Base(sf.DataPath),
    			Stage:   StageImport,
    			Message: msg,
    		})
    		return SegmentImport{}, nil, 0, problems
    	}
    	// Bounds are checked before anything touches the database.
    	switch {
    	case seg.StartTime >= seg.EndTime:
    		return reject(fmt.Sprintf("invalid segment bounds: start=%.2f >= end=%.2f", seg.StartTime, seg.EndTime))
    	case seg.EndTime > sf.Duration:
    		return reject(fmt.Sprintf("segment end time (%.2f) exceeds file duration (%.2f)", seg.EndTime, sf.Duration))
    	}
    	segmentID, genErr := utils.GenerateLongID()
    	if genErr != nil {
    		return reject(fmt.Sprintf("failed to generate segment ID: %v", genErr))
    	}
    	if _, execErr := tx.ExecContext(ctx, `
    INSERT INTO segment (id, file_id, dataset_id, start_time, end_time, freq_low, freq_high, created_at, last_modified, active)
    VALUES (?, ?, ?, ?, ?, ?, ?, now(), now(), true)
    `, segmentID, sf.FileID, datasetID, seg.StartTime, seg.EndTime, seg.FreqLow, seg.FreqHigh); execErr != nil {
    		return reject(fmt.Sprintf("failed to insert segment: %v", execErr))
    	}
    	imported := SegmentImport{
    		SegmentID: segmentID,
    		FileName:  filepath.Base(sf.WavPath),
    		StartTime: seg.StartTime,
    		EndTime:   seg.EndTime,
    		FreqLow:   seg.FreqLow,
    		FreqHigh:  seg.FreqHigh,
    		Labels:    []LabelImport{},
    	}
    	idsByLabel := map[int]string{}
    	subtypeCount := 0
    	for labelIdx, label := range seg.Labels {
    		outcome := importSingleLabel(ctx, tx, label, segmentID, segIdx, labelIdx, sf, mapping, filterIDMap, speciesIDMap, calltypeIDMap)
    		if outcome.hasError {
    			// A failed label does not stop the remaining labels of the segment.
    			problems = append(problems, outcome.err)
    			continue
    		}
    		idsByLabel[labelIdx] = outcome.labelID
    		imported.Labels = append(imported.Labels, outcome.labelImport)
    		subtypeCount += outcome.subtypesImported
    	}
    	return imported, idsByLabel, subtypeCount, problems
    }
    // writeIDsToDataFiles records the import results in the .data files themselves.
    //
    // Each file is re-parsed, "skraak_hash" is set in its metadata extras, and
    // "skraak_label_id" is set in the extras of every label that received an ID.
    // Segment or label indices beyond what the re-parsed file contains are ignored.
    // A file that cannot be re-parsed or written yields one import-stage error and the
    // remaining files are still processed.
    func writeIDsToDataFiles(fileUpdates []dataFileUpdate) []ImportSegmentError {
    	var problems []ImportSegmentError
    	// report records an import-stage error against the given .data file.
    	report := func(path, msg string) {
    		problems = append(problems, ImportSegmentError{
    			File:    filepath.Base(path),
    			Stage:   StageImport,
    			Message: msg,
    		})
    	}
    	for _, update := range fileUpdates {
    		parsed, parseErr := datafile.ParseDataFile(update.DataPath)
    		if parseErr != nil {
    			report(update.DataPath, fmt.Sprintf("failed to re-parse .data file for writing: %v", parseErr))
    			continue
    		}
    		// File-level marker: the hash of the WAV this .data file was imported against.
    		if parsed.Meta.Extra == nil {
    			parsed.Meta.Extra = make(map[string]any)
    		}
    		parsed.Meta.Extra["skraak_hash"] = update.WavHash
    		// Label-level markers: the database ID assigned to each imported label.
    		for segIdx, idsByLabel := range update.LabelIDs {
    			if segIdx >= len(parsed.Segments) {
    				continue
    			}
    			labels := parsed.Segments[segIdx].Labels
    			for labelIdx, labelID := range idsByLabel {
    				if labelIdx < len(labels) {
    					target := labels[labelIdx]
    					if target.Extra == nil {
    						target.Extra = make(map[string]any)
    					}
    					target.Extra["skraak_label_id"] = labelID
    				}
    			}
    		}
    		if writeErr := parsed.Write(update.DataPath); writeErr != nil {
    			report(update.DataPath, fmt.Sprintf("failed to write updated .data file: %v", writeErr))
    		}
    	}
    	return problems
    }
  • edit in tools/import/import_segments.go at line 5
    [3.9984][3.9984:10000]()
    "database/sql"
  • edit in tools/import/import_segments.go at line 6
    [3.10007][3.10007:10041]()
    "os"
    "path/filepath"
    "strings"
  • edit in tools/import/import_segments.go at line 10
    [3.10063][3.10063:10079]()
    "skraak/utils"
  • edit in tools/import/import_segments.go at line 86
    [3.12735][3.12735:12927](),[3.12927][3.47380:47402](),[3.47402][3.12955:13734](),[3.12955][3.12955:13734](),[3.13734][3.47403:47505](),[3.47505][3.13842:14754](),[3.13842][3.13842:14754]()
    }
    // validateAndPrepareSegments performs phases B+C: parse data files, validate DB state, and prepare ID maps.
    func validateAndPrepareSegments(
    database *sql.DB,
    input ImportSegmentsInput,
    mapping MappingFile,
    dataFiles []string,
    ) (*segmentValidation, []ImportSegmentError, error) {
    // Phase B: Parse all .data files and collect unique values
    scannedFiles, parseErrors, uniqueFilters, uniqueSpecies, uniqueCalltypes := scanAllDataFiles(dataFiles, input.Folder)
    if len(scannedFiles) == 0 {
    return nil, parseErrors, nil
    }
    // Validate dataset/location/cluster hierarchy
    if err := validateSegmentHierarchy(database, input.DatasetID, input.LocationID, input.ClusterID); err != nil {
    return nil, parseErrors, err
    }
    // Validate all filters exist
    filterIDMap, err := validateFiltersExist(database, uniqueFilters)
    if err != nil {
    return nil, parseErrors, fmt.Errorf("filter validation failed: %w", err)
    }
    // Validate mapping covers all species/calltypes and they exist in DB
    validationResult, err := ValidateMappingAgainstDB(database, mapping, uniqueSpecies, uniqueCalltypes)
    if err != nil {
    return nil, parseErrors, fmt.Errorf("mapping validation failed: %w", err)
    }
    if validationResult.HasErrors() {
    return nil, parseErrors, fmt.Errorf("mapping validation failed: %s", validationResult.Error())
    }
    // Load species and calltype ID maps
    speciesIDMap, calltypeIDMap, err := loadSpeciesCalltypeIDs(database, mapping, uniqueSpecies, uniqueCalltypes)
    if err != nil {
    return nil, parseErrors, fmt.Errorf("failed to load species/calltype IDs: %w", err)
    }
    // Validate files: hash exists, linked to dataset, no existing labels
    fileIDMap, hashErrors := validateAndMapFiles(database, scannedFiles, input.ClusterID, input.DatasetID)
    allErrors := append(parseErrors, hashErrors...)
    return &segmentValidation{
    scannedFiles: scannedFiles,
    filterIDMap: filterIDMap,
    speciesIDMap: speciesIDMap,
    calltypeIDMap: calltypeIDMap,
    fileIDMap: fileIDMap,
    }, allErrors, nil
  • edit in tools/import/import_segments.go at line 155
    [3.17212][3.17212:18127](),[3.18127][3.16855:17019](),[3.17019][3.18344:18361](),[3.18344][3.18344:18361](),[3.18361][3.17020:17107](),[3.17107][3.18494:18511](),[3.18494][3.18494:18511](),[3.18511][3.17108:17187](),[3.17187][3.18636:19391](),[3.18636][3.18636:19391](),[3.19391][3.47555:47585](),[3.47585][3.19427:19562](),[3.19427][3.19427:19562](),[3.19562][3.60135:60181](),[3.60181][3.19605:19708](),[3.19605][3.19605:19708](),[3.19708][3.47586:47616](),[3.47616][3.19744:20481](),[3.19744][3.19744:20481]()
    }
    // validateSegmentImportInput validates input parameters
    func validateSegmentImportInput(input ImportSegmentsInput) error {
    // Validate folder exists
    if info, err := os.Stat(input.Folder); err != nil {
    return fmt.Errorf("folder does not exist: %s", input.Folder)
    } else if !info.IsDir() {
    return fmt.Errorf("path is not a folder: %s", input.Folder)
    }
    // Validate mapping file exists
    if _, err := os.Stat(input.Mapping); err != nil {
    return fmt.Errorf("mapping file does not exist: %s", input.Mapping)
    }
    // Validate IDs
    if err := utils.ValidateShortID(input.DatasetID, "dataset_id"); err != nil {
    return err
    }
    if err := utils.ValidateShortID(input.LocationID, "location_id"); err != nil {
    return err
    }
    if err := utils.ValidateShortID(input.ClusterID, "cluster_id"); err != nil {
    return err
    }
    return nil
    }
    // validateSegmentHierarchy validates dataset/location/cluster relationships
    func validateSegmentHierarchy(q db.Querier, datasetID, locationID, clusterID string) error {
    if err := db.ValidateDatasetTypeForImport(q, datasetID); err != nil {
    return err
    }
    if err := db.ValidateLocationBelongsToDataset(q, locationID, datasetID); err != nil {
    return err
    }
    if err := db.ClusterBelongsToLocation(q, clusterID, locationID); err != nil {
    return err
    }
    return nil
    }
    // scanAllDataFiles parses every path in dataFiles as a .data file and gathers
    // the distinct filter, species and calltype names used by their labels.
    //
    // A file is skipped, with a StageValidation error recorded for it, when the
    // path obtained by trimming the ".data" suffix cannot be stat'ed or when the
    // .data file fails to parse. The folder parameter is not read by this function.
    //
    // Returns, in order: the successfully scanned files, the per-file errors, the
    // set of filter names, the set of species names, and a species -> calltype set.
    func scanAllDataFiles(dataFiles []string, folder string) (
    []scannedDataFile,
    []ImportSegmentError,
    map[string]bool,
    map[string]bool,
    map[string]map[string]bool,
    ) {
    var scanned []scannedDataFile
    var errors []ImportSegmentError
    uniqueFilters := make(map[string]bool)
    uniqueSpecies := make(map[string]bool)
    uniqueCalltypes := make(map[string]map[string]bool) // species -> calltype -> true
    for _, dataPath := range dataFiles {
    // The WAV path is the .data path with its ".data" suffix removed.
    wavPath := strings.TrimSuffix(dataPath, ".data")
    if _, err := os.Stat(wavPath); err != nil {
    errors = append(errors, ImportSegmentError{
    File: filepath.Base(dataPath),
    Stage: StageValidation,
    Message: fmt.Sprintf("corresponding WAV file not found: %s", filepath.Base(wavPath)),
    })
    continue
    }
    // Parse the .data file; a parse failure is recorded and the file is skipped.
    df, err := datafile.ParseDataFile(dataPath)
    if err != nil {
    errors = append(errors, ImportSegmentError{
    File: filepath.Base(dataPath),
    Stage: StageValidation,
    Message: fmt.Sprintf("failed to parse .data file: %v", err),
    })
    continue
    }
    // Record every filter and species seen; calltypes are recorded per species
    // and only when the label actually carries one.
    for _, seg := range df.Segments {
    for _, label := range seg.Labels {
    uniqueFilters[label.Filter] = true
    uniqueSpecies[label.Species] = true
    if label.CallType != "" {
    // Create the inner set lazily on the first calltype for this species.
    if uniqueCalltypes[label.Species] == nil {
    uniqueCalltypes[label.Species] = make(map[string]bool)
    }
    uniqueCalltypes[label.Species][label.CallType] = true
    }
    }
    }
    scanned = append(scanned, scannedDataFile{
    DataPath: dataPath,
    WavPath: wavPath,
    Duration: df.Meta.Duration,
    Segments: df.Segments,
    })
    }
    return scanned, errors, uniqueFilters, uniqueSpecies, uniqueCalltypes
  • edit in tools/import/import_segments.go at line 156
    [3.20483][3.20483:20558](),[3.20558][3.17188:17286](),[3.17286][3.20658:21064](),[3.20658][3.20658:21064](),[3.21064][3.47617:47684](),[3.17325][3.21107:21745](),[3.47684][3.21107:21745](),[3.21107][3.21107:21745](),[3.21745][3.17326:17341](),[3.17341][3.47685:47707](),[3.47707][3.21790:21928](),[3.21790][3.21790:21928](),[3.21928][3.5323:5672](),[3.5672][3.47708:47823](),[3.5793][3.21928:21969](),[3.47823][3.21928:21969](),[3.21928][3.21928:21969](),[3.22064][3.22064:22065](),[3.22112][3.22112:22297](),[3.22297][3.5794:5986](),[3.5986][3.22482:22483](),[3.22482][3.22482:22483](),[3.22483][3.5987:6201](),[3.6201][3.22702:22703](),[3.22702][3.22702:22703](),[3.22703][3.47824:47891](),[3.47891][3.6240:6341](),[3.6240][3.6240:6341](),[3.6341][3.22857:22858](),[3.22857][3.22857:22858](),[3.22858][3.6342:6461](),[3.6461][3.22986:22993](),[3.22986][3.22986:22993](),[3.22993][3.6462:6491](),[3.6491][3.22993:22994](),[3.22993][3.22993:22994](),[3.22994][3.6492:6580](),[3.6580][3.47892:48032](),[3.48032][3.6726:6780](),[3.6726][3.6726:6780](),[3.6780][3.23016:23349](),[3.23016][3.23016:23349](),[3.23373][3.23373:23398](),[3.23398][3.48033:48085](),[3.17405][3.23426:23702](),[3.48085][3.23426:23702](),[3.23426][3.23426:23702](),[3.23702][3.6781:6808](),[3.6808][3.23743:23877](),[3.23743][3.23743:23877](),[3.23877][3.17406:17421](),[3.17421][3.23894:24305](),[3.23894][3.23894:24305](),[3.24305][3.48086:48110](),[3.48110][3.24335:24516](),[3.24335][3.24335:24516](),[3.24516][3.48111:48161](),[3.17443][3.24542:24796](),[3.48161][3.24542:24796](),[3.24542][3.24542:24796](),[3.24796][3.48162:48192](),[3.48192][3.24832:25049](),[3.24832][3.24832:25049](),[3.25049][3.48193:48223](),[3.48223][3.25085:25330](),[3.25085][3.25085:25330](),[3.25330][3.48224:48274](),[3.17465][3.25356:25594](),[3.48274][3.25356:25594](),[3.25356][3.25356:25594](),[3.25594][3.48275:48305](),[3.48305][3.25630:25840](),[3.25630][3.25630:25840](),[3.25840][3.48306:48336](),[3.48336][3.25876:26059](),[3.25
876][3.25876:26059](),[3.26059][3.48337:48387](),[3.17487][3.26085:26338](),[3.48387][3.26085:26338](),[3.26085][3.26085:26338](),[3.26338][3.48388:48418](),[3.48418][3.26374:26576](),[3.26374][3.26374:26576](),[3.26576][3.48419:48449](),[3.48449][3.26612:27230](),[3.26612][3.26612:27230](),[3.27230][2.1357:1528](),[2.1528][3.27230:27233](),[3.27230][3.27230:27233](),[3.27233][2.1529:1666](),[2.1666][3.60182:60206](),[3.27379][3.60182:60206](),[3.27442][3.27442:27463](),[3.27463][3.48450:48472](),[3.48472][3.27491:27556](),[3.27491][3.27491:27556](),[3.27556][2.1667:1697](),[2.1697][3.27623:27687](),[3.27623][3.27623:27687](),[3.27687][2.1698:1789](),[2.1789][3.27899:27955](),[3.27899][3.27899:27955](),[3.27955][2.1790:1869](),[2.1869][3.28155:28212](),[3.28155][3.28155:28212](),[3.28212][2.1870:1951](),[2.1951][3.28414:28458](),[3.28414][3.28414:28458](),[3.28458][2.1952:2672](),[2.2672][3.28458:28527](),[3.28458][3.28458:28527](),[3.28527][3.48647:48704](),[3.48704][2.2673:2698](),[2.2698][3.28655:28680](),[3.28655][3.28655:28680](),[3.28680][2.2699:2789](),[2.2789][3.28942:28994](),[3.28942][3.28942:28994](),[3.28994][3.48705:48762](),[3.48762][2.2790:2815](),[2.2815][3.29117:29212](),[3.29117][3.29117:29212](),[3.29212][2.2816:2899](),[2.2899][3.29542:29595](),[3.29542][3.29542:29595](),[3.29595][3.48763:48821](),[3.48821][2.2900:2926](),[2.2926][3.29729:29788](),[3.29729][3.29729:29788](),[3.29788][2.2927:2981](),[2.2981][3.29834:30033](),[3.29834][3.29834:30033](),[3.30033][2.2982:3170](),[2.3170][3.30203:30285](),[3.30203][3.30203:30285](),[3.30285][2.3171:3267](),[2.3267][3.30377:30380](),[3.30377][3.30377:30380](),[3.30380][2.3268:3345](),[2.3345][3.30380:30381](),[3.30380][3.30380:30381](),[3.30381][2.3346:3871](),[2.3871][3.30451:30454](),[3.30451][3.30451:30454](),[3.30454][2.3872:4436](),[2.4436][3.30454:30600](),[3.30454][3.30454:30600](),[3.30600][3.60207:60231](),[3.60231][3.30621:30658](),[3.30621][3.30621:30658](),[3.30658][3.48822:48844](),[3.4884
4][3.30686:31009](),[3.30686][3.30686:31009](),[3.31009][3.48845:48902](),[3.48902][3.31072:31249](),[3.31072][3.31072:31249](),[3.31249][3.48903:48960](),[3.48960][3.31312:31693](),[3.31312][3.31312:31693](),[3.31693][3.48961:49018](),[3.49018][3.31756:32044](),[3.31756][3.31756:32044](),[3.32044][3.49019:49041](),[3.49041][3.32072:32610](),[3.32072][3.32072:32610](),[3.32610][3.49042:49067](),[3.49067][3.32641:33648](),[3.32641][3.32641:33648](),[3.33648][3.49068:49128](),[3.49128][3.33714:34094](),[3.33714][3.33714:34094](),[3.34094][3.49129:49154](),[3.49154][3.34125:34445](),[3.34125][3.34125:34445](),[3.34445][3.60232:60256](),[3.60256][3.34466:34519](),[3.34466][3.34466:34519](),[3.34519][3.49155:49177](),[3.49177][3.34547:34835](),[3.34547][3.34547:34835](),[3.34835][3.49178:49235](),[3.49235][3.34898:35129](),[3.34898][3.34898:35129](),[3.35129][3.49236:49293](),[3.49293][3.35192:35454](),[3.35192][3.35192:35454](),[3.35454][3.49294:49351](),[3.49351][3.35517:35998](),[3.35517][3.35517:35998](),[3.35998][3.49352:49409](),[3.49409][3.36061:37416](),[3.36061][3.36061:37416](),[3.37416][3.60257:60306](),[3.60306][3.37462:37568](),[3.37462][3.37462:37568](),[3.37568][3.49410:49436](),[3.49436][3.37600:38460](),[3.37600][3.37600:38460](),[3.38460][3.49437:49463](),[3.49463][3.38492:38608](),[3.38492][3.38492:38608]()
    // validateFiltersExist verifies that every name in filterNames matches an
    // active row in the filter table and returns a filter name -> filter ID map.
    //
    // An empty filterNames yields an empty map and no query. An error is returned
    // when the query fails, when a row cannot be scanned, when row iteration ends
    // with an error, or when one or more requested names have no active filter row
    // (the missing names are listed in the message).
    func validateFiltersExist(q db.Querier, filterNames map[string]bool) (map[string]string, error) {
    	filterIDMap := make(map[string]string, len(filterNames))
    	if len(filterNames) == 0 {
    		return filterIDMap, nil
    	}

    	// One placeholder argument per requested name.
    	args := make([]any, 0, len(filterNames))
    	for name := range filterNames {
    		args = append(args, name)
    	}
    	query := `SELECT id, name FROM filter WHERE name IN (` + db.Placeholders(len(args)) + `) AND active = true`

    	rows, err := q.QueryContext(context.Background(), query, args...)
    	if err != nil {
    		return nil, fmt.Errorf("failed to query filters: %w", err)
    	}
    	defer rows.Close()

    	for rows.Next() {
    		var id, name string
    		// A scan failure must surface as itself; dropping the row would later be
    		// misreported as the filter not existing.
    		if err := rows.Scan(&id, &name); err != nil {
    			return nil, fmt.Errorf("failed to scan filter row: %w", err)
    		}
    		filterIDMap[name] = id
    	}
    	// Next returning false can mean an iteration error rather than end of data.
    	if err := rows.Err(); err != nil {
    		return nil, fmt.Errorf("failed to read filter rows: %w", err)
    	}

    	// Any requested name that did not come back is missing or inactive.
    	var missing []string
    	for name := range filterNames {
    		if _, exists := filterIDMap[name]; !exists {
    			missing = append(missing, name)
    		}
    	}
    	if len(missing) > 0 {
    		return nil, fmt.Errorf("filters not found in database: [%s]", strings.Join(missing, ", "))
    	}
    	return filterIDMap, nil
    }
    // loadSpeciesCalltypeIDs builds both lookup tables needed for label import:
    // the species ID map from loadSpeciesIDs and the per-species calltype ID map
    // from loadCalltypeIDs. If either loader fails, both maps are returned as nil
    // together with that loader's error.
    func loadSpeciesCalltypeIDs(
    	q db.Querier,
    	mapping MappingFile,
    	uniqueSpecies map[string]bool,
    	uniqueCalltypes map[string]map[string]bool,
    ) (map[string]string, map[string]map[string]string, error) {
    	speciesIDs, speciesErr := loadSpeciesIDs(q, mapping, uniqueSpecies)
    	if speciesErr != nil {
    		return nil, nil, speciesErr
    	}

    	calltypeIDs, calltypeErr := loadCalltypeIDs(q, mapping, uniqueCalltypes)
    	if calltypeErr != nil {
    		return nil, nil, calltypeErr
    	}

    	return speciesIDs, calltypeIDs, nil
    }
    // loadSpeciesIDs translates the species names found in .data files to their
    // database labels via mapping.GetDBSpecies, then looks up the IDs of the
    // active species rows with those labels.
    //
    // The returned map is keyed by database species label. Species without a
    // mapping entry are ignored here, and labels with no active row are simply
    // absent from the map. An error is returned when the query fails, a row
    // cannot be scanned, or row iteration ends with an error.
    func loadSpeciesIDs(q db.Querier, mapping MappingFile, uniqueSpecies map[string]bool) (map[string]string, error) {
    	speciesIDMap := make(map[string]string)

    	// Several data-file species may map to one DB label; de-duplicate first.
    	dbSpeciesSet := make(map[string]bool)
    	for dataSpecies := range uniqueSpecies {
    		if dbSpecies, ok := mapping.GetDBSpecies(dataSpecies); ok {
    			dbSpeciesSet[dbSpecies] = true
    		}
    	}
    	if len(dbSpeciesSet) == 0 {
    		return speciesIDMap, nil
    	}

    	args := make([]any, 0, len(dbSpeciesSet))
    	for s := range dbSpeciesSet {
    		args = append(args, s)
    	}
    	query := `SELECT id, label FROM species WHERE label IN (` + db.Placeholders(len(args)) + `) AND active = true`

    	rows, err := q.QueryContext(context.Background(), query, args...)
    	if err != nil {
    		return nil, fmt.Errorf("failed to query species: %w", err)
    	}
    	defer rows.Close()

    	for rows.Next() {
    		var id, label string
    		// Surface scan failures instead of returning a silently incomplete map.
    		if err := rows.Scan(&id, &label); err != nil {
    			return nil, fmt.Errorf("failed to scan species row: %w", err)
    		}
    		speciesIDMap[label] = id
    	}
    	// Next returning false can mean an iteration error rather than end of data.
    	if err := rows.Err(); err != nil {
    		return nil, fmt.Errorf("failed to read species rows: %w", err)
    	}
    	return speciesIDMap, nil
    }
    // loadCalltypeIDs resolves every (species, calltype) pair found in the .data
    // files to a call_type ID. Names are first translated with
    // mapping.GetDBSpecies and mapping.GetDBCalltype; the result is keyed by
    // database species label, then database calltype label.
    //
    // Species without a mapping entry are skipped. A pair with no active
    // call_type row is left out of the map, so a later lookup sees it as missing.
    // Any other query failure is returned as an error rather than being treated
    // as "not found".
    func loadCalltypeIDs(q db.Querier, mapping MappingFile, uniqueCalltypes map[string]map[string]bool) (map[string]map[string]string, error) {
    	calltypeIDMap := make(map[string]map[string]string)
    	ctx := context.Background()

    	for dataSpecies, ctSet := range uniqueCalltypes {
    		dbSpecies, ok := mapping.GetDBSpecies(dataSpecies)
    		if !ok {
    			continue
    		}
    		if calltypeIDMap[dbSpecies] == nil {
    			calltypeIDMap[dbSpecies] = make(map[string]string)
    		}

    		for dataCalltype := range ctSet {
    			dbCalltype := mapping.GetDBCalltype(dataSpecies, dataCalltype)

    			var calltypeID string
    			err := q.QueryRowContext(ctx, `
    SELECT ct.id
    FROM call_type ct
    JOIN species s ON ct.species_id = s.id
    WHERE s.label = ? AND ct.label = ? AND ct.active = true
    `, dbSpecies, dbCalltype).Scan(&calltypeID)

    			switch {
    			case err == nil:
    				calltypeIDMap[dbSpecies][dbCalltype] = calltypeID
    			case err == sql.ErrNoRows:
    				// No such active calltype: leave it out of the map.
    			default:
    				// A real database failure must not masquerade as a missing calltype.
    				return nil, fmt.Errorf("failed to query calltype %s/%s: %w", dbSpecies, dbCalltype, err)
    			}
    		}
    	}
    	return calltypeIDMap, nil
    }
    // validateAndMapFiles decides, for each scanned file, whether it may be
    // imported. A file is accepted only if all of the following hold:
    //   - its WAV hash can be computed,
    //   - an active file row with that hash exists in the given cluster,
    //   - that file row is linked to datasetID in file_dataset,
    //   - no active label is attached to any of its segments.
    //
    // Accepted files are returned keyed by file ID, with WavHash, FileID and
    // Duration (the duration stored in the database) filled in. Each rejected
    // file contributes one ImportSegmentError naming the WAV file.
    func validateAndMapFiles(
    	q db.Querier,
    	scannedFiles []scannedDataFile,
    	clusterID string,
    	datasetID string,
    ) (map[string]scannedDataFile, []ImportSegmentError) {
    	var problems []ImportSegmentError
    	accepted := make(map[string]scannedDataFile)
    	ctx := context.Background()

    	for _, candidate := range scannedFiles {
    		wavName := filepath.Base(candidate.WavPath)

    		// Step 1: hash the WAV file.
    		digest, err := utils.ComputeXXH64(candidate.WavPath)
    		if err != nil {
    			problems = append(problems, ImportSegmentError{
    				File:    wavName,
    				Stage:   StageHash,
    				Message: fmt.Sprintf("failed to compute hash: %v", err),
    			})
    			continue
    		}
    		candidate.WavHash = digest

    		// Step 2: locate the file row by hash within the cluster.
    		var (
    			fileID   string
    			duration float64
    		)
    		err = q.QueryRowContext(ctx, `
    SELECT id, duration FROM file WHERE xxh64_hash = ? AND cluster_id = ? AND active = true
    `, digest, clusterID).Scan(&fileID, &duration)
    		switch {
    		case err == sql.ErrNoRows:
    			problems = append(problems, ImportSegmentError{
    				File:    wavName,
    				Stage:   StageValidation,
    				Message: fmt.Sprintf("file hash not found in database for cluster (hash: %s)", digest),
    			})
    			continue
    		case err != nil:
    			problems = append(problems, ImportSegmentError{
    				File:    wavName,
    				Stage:   StageValidation,
    				Message: fmt.Sprintf("failed to query file: %v", err),
    			})
    			continue
    		}
    		candidate.FileID = fileID
    		candidate.Duration = duration

    		// Step 3: the file must be linked to the dataset through file_dataset.
    		var linked bool
    		err = q.QueryRowContext(ctx, `
    SELECT EXISTS(SELECT 1 FROM file_dataset WHERE file_id = ? AND dataset_id = ?)
    `, fileID, datasetID).Scan(&linked)
    		if err != nil {
    			problems = append(problems, ImportSegmentError{
    				File:    wavName,
    				Stage:   StageValidation,
    				Message: fmt.Sprintf("failed to verify file-dataset link: %v", err),
    			})
    			continue
    		}
    		if !linked {
    			problems = append(problems, ImportSegmentError{
    				File:    wavName,
    				Stage:   StageValidation,
    				Message: fmt.Sprintf("file exists in cluster but is not linked to dataset %s", datasetID),
    			})
    			continue
    		}

    		// Step 4: only files without active labels may be imported.
    		var existingLabels int
    		err = q.QueryRowContext(ctx, `
    SELECT COUNT(*) FROM label l
    JOIN segment s ON l.segment_id = s.id
    WHERE s.file_id = ? AND l.active = true
    `, fileID).Scan(&existingLabels)
    		if err != nil {
    			problems = append(problems, ImportSegmentError{
    				File:    wavName,
    				Stage:   StageValidation,
    				Message: fmt.Sprintf("failed to check existing labels: %v", err),
    			})
    			continue
    		}
    		if existingLabels > 0 {
    			problems = append(problems, ImportSegmentError{
    				File:    wavName,
    				Stage:   StageValidation,
    				Message: fmt.Sprintf("file already has %d label(s) - fresh imports only", existingLabels),
    			})
    			continue
    		}

    		accepted[fileID] = candidate
    	}
    	return accepted, problems
    }
    // dataFileUpdate holds the values written back to one .data file after import.
    // importSegmentsIntoDB produces one per processed file and writeIDsToDataFiles
    // consumes them.
    type dataFileUpdate struct {
    // DataPath is the path of the .data file to re-parse and rewrite.
    DataPath string
    // WavHash is stored in the file's metadata under the "skraak_hash" key.
    WavHash string
    // LabelIDs holds the ID of each imported label; each ID is stored on its
    // label under the "skraak_label_id" key.
    LabelIDs map[int]map[int]string // segmentIndex -> labelIndex -> labelID
    }
    // importLabelResult holds the result of importing a single label.
    // When hasError is true only err is meaningful; otherwise labelImport and
    // labelID describe the inserted label.
    type importLabelResult struct {
    // labelImport is the summary of the inserted label.
    labelImport LabelImport
    // labelID is the ID generated for the inserted label row.
    labelID string
    // subtypesImported is 1 when a label_subtype row was inserted, else 0.
    subtypesImported int
    // err describes the failure; it is set only when hasError is true.
    err ImportSegmentError
    // hasError reports whether the import of this label failed.
    hasError bool
    }
    // resolvedLabelIDs holds the resolved database IDs for a label, as produced
    // by resolveLabelIDs.
    type resolvedLabelIDs struct {
    // speciesID is the ID found in the species ID map for dbSpecies.
    speciesID string
    // filterID is the ID found in the filter ID map for the label's filter.
    filterID string
    // labelID is a newly generated ID for the label row to insert.
    labelID string
    // dbSpecies is the database species label the data-file species maps to.
    dbSpecies string
    }
    // resolveLabelIDs gathers everything needed to insert a label row: the
    // database species label for the label's species (via the mapping), the
    // species and filter IDs from the supplied lookup maps, and a freshly
    // generated label ID.
    //
    // It fails with a descriptive error when the species has no mapping, when the
    // species or filter is absent from its lookup map, or when ID generation
    // fails. On failure the returned struct is the zero value. The sf parameter
    // is not read.
    func resolveLabelIDs(
    	label *datafile.Label,
    	sf scannedDataFile,
    	mapping MappingFile,
    	filterIDMap map[string]string,
    	speciesIDMap map[string]string,
    ) (resolvedLabelIDs, error) {
    	var none resolvedLabelIDs

    	dbSpecies, mapped := mapping.GetDBSpecies(label.Species)
    	if !mapped {
    		return none, fmt.Errorf("species not found in mapping: %s", label.Species)
    	}

    	speciesID, haveSpecies := speciesIDMap[dbSpecies]
    	if !haveSpecies {
    		return none, fmt.Errorf("species ID not found: %s", dbSpecies)
    	}

    	filterID, haveFilter := filterIDMap[label.Filter]
    	if !haveFilter {
    		return none, fmt.Errorf("filter ID not found: %s", label.Filter)
    	}

    	newLabelID, genErr := utils.GenerateLongID()
    	if genErr != nil {
    		return none, fmt.Errorf("failed to generate label ID: %w", genErr)
    	}

    	resolved := resolvedLabelIDs{
    		dbSpecies: dbSpecies,
    		speciesID: speciesID,
    		filterID:  filterID,
    		labelID:   newLabelID,
    	}
    	return resolved, nil
    }
    // importSingleLabel writes one label into the database inside tx.
    //
    // Steps, in order: resolve the species/filter IDs and generate a label ID,
    // insert the label row, insert a label_metadata row when the label has a
    // comment, and insert a label_subtype row when the label has a calltype.
    // The first failing step ends the import of this label and is reported via
    // the result's err/hasError fields (Stage is StageImport, File is the .data
    // file's base name). On success the result carries the label summary, the new
    // label ID and the number of subtypes inserted (0 or 1).
    //
    // segIdx and labelIdx are accepted but not read.
    func importSingleLabel(
    	ctx context.Context,
    	tx *db.LoggedTx,
    	label *datafile.Label,
    	segmentID string,
    	segIdx, labelIdx int,
    	sf scannedDataFile,
    	mapping MappingFile,
    	filterIDMap map[string]string,
    	speciesIDMap map[string]string,
    	calltypeIDMap map[string]map[string]string,
    ) importLabelResult {
    	// failed wraps a plain error into the result shape used for failures.
    	failed := func(cause error) importLabelResult {
    		return importLabelResult{
    			hasError: true,
    			err: ImportSegmentError{
    				File:    filepath.Base(sf.DataPath),
    				Stage:   StageImport,
    				Message: cause.Error(),
    			},
    		}
    	}

    	ids, err := resolveLabelIDs(label, sf, mapping, filterIDMap, speciesIDMap)
    	if err != nil {
    		return failed(err)
    	}
    	if err = insertLabel(ctx, tx, ids, segmentID, label); err != nil {
    		return failed(err)
    	}

    	summary := LabelImport{
    		LabelID:   ids.labelID,
    		Species:   ids.dbSpecies,
    		Filter:    label.Filter,
    		Certainty: label.Certainty,
    	}

    	// A comment is stored in label_metadata and echoed in the summary.
    	if label.Comment != "" {
    		if err = insertLabelMetadata(ctx, tx, ids.labelID, label.Comment); err != nil {
    			return failed(err)
    		}
    		summary.Comment = label.Comment
    	}

    	outcome := importLabelResult{labelID: ids.labelID}

    	// A calltype is stored as a label_subtype row.
    	if label.CallType != "" {
    		ctErr := importCalltype(ctx, tx, ids.labelID, label, ids.dbSpecies, ids.filterID, mapping, calltypeIDMap, sf)
    		if ctErr != nil {
    			return importLabelResult{err: *ctErr, hasError: true}
    		}
    		summary.CallType = mapping.GetDBCalltype(label.Species, label.CallType)
    		outcome.subtypesImported = 1
    	}

    	outcome.labelImport = summary
    	return outcome
    }
    // insertLabel adds one active row to the label table using the resolved
    // label, species and filter IDs, the owning segment ID and the label's
    // certainty. Creation and modification timestamps are set by the database.
    // A failed insert is returned wrapped with context.
    func insertLabel(ctx context.Context, tx *db.LoggedTx, ids resolvedLabelIDs, segmentID string, label *datafile.Label) error {
    	const insertStmt = `
    INSERT INTO label (id, segment_id, species_id, filter_id, certainty, created_at, last_modified, active)
    VALUES (?, ?, ?, ?, ?, now(), now(), true)
    `
    	if _, execErr := tx.ExecContext(ctx, insertStmt, ids.labelID, segmentID, ids.speciesID, ids.filterID, label.Certainty); execErr != nil {
    		return fmt.Errorf("failed to insert label: %w", execErr)
    	}
    	return nil
    }
    // insertLabelMetadata stores a label's comment as an active label_metadata
    // row whose json column holds the object {"comment": "<comment>"}.
    //
    // The comment is escaped as a JSON string: quotes and backslashes are
    // backslash-escaped, newline/carriage return/tab use their short escapes, and
    // any other control character below U+0020 is written as \u00XX. Comments
    // without such characters produce exactly the same text as before. A failed
    // insert is returned wrapped with context.
    func insertLabelMetadata(ctx context.Context, tx *db.LoggedTx, labelID, comment string) error {
    	var doc strings.Builder
    	doc.Grow(len(comment) + len(`{"comment": ""}`))
    	doc.WriteString(`{"comment": "`)
    	for _, r := range comment {
    		switch {
    		case r == '"' || r == '\\':
    			// Both characters would otherwise end or corrupt the JSON string.
    			doc.WriteByte('\\')
    			doc.WriteRune(r)
    		case r == '\n':
    			doc.WriteString(`\n`)
    		case r == '\r':
    			doc.WriteString(`\r`)
    		case r == '\t':
    			doc.WriteString(`\t`)
    		case r < 0x20:
    			// Remaining control characters are not allowed raw inside JSON strings.
    			fmt.Fprintf(&doc, `\u%04x`, r)
    		default:
    			doc.WriteRune(r)
    		}
    	}
    	doc.WriteString(`"}`)
    	metadataJSON := doc.String()

    	_, err := tx.ExecContext(ctx, `
    INSERT INTO label_metadata (label_id, json, created_at, last_modified, active)
    VALUES (?, ?, now(), now(), true)
    `, labelID, metadataJSON)
    	if err != nil {
    		return fmt.Errorf("failed to insert label_metadata: %w", err)
    	}
    	return nil
    }
    // importCalltype records a label's calltype as an active label_subtype row.
    //
    // The data-file calltype is translated with mapping.GetDBCalltype and its ID
    // is looked up in calltypeIDMap under dbSpecies. The function returns nil on
    // success, or an ImportSegmentError (Stage StageImport, File set to the .data
    // file's base name) when the calltype ID is unknown, when a subtype ID cannot
    // be generated, or when the insert fails.
    func importCalltype(
    	ctx context.Context,
    	tx *db.LoggedTx,
    	labelID string,
    	label *datafile.Label,
    	dbSpecies string,
    	filterID string,
    	mapping MappingFile,
    	calltypeIDMap map[string]map[string]string,
    	sf scannedDataFile,
    ) *ImportSegmentError {
    	// failure builds the error value shared by every exit path below.
    	failure := func(msg string) *ImportSegmentError {
    		return &ImportSegmentError{
    			File:    filepath.Base(sf.DataPath),
    			Stage:   StageImport,
    			Message: msg,
    		}
    	}

    	dbCalltype := mapping.GetDBCalltype(label.Species, label.CallType)

    	// Reading through a nil inner map yields "", so an unknown species and an
    	// unknown calltype are both treated as "not found".
    	calltypeID := calltypeIDMap[dbSpecies][dbCalltype]
    	if calltypeID == "" {
    		return failure(fmt.Sprintf("calltype ID not found: %s/%s", dbSpecies, dbCalltype))
    	}

    	subtypeID, genErr := utils.GenerateLongID()
    	if genErr != nil {
    		return failure(fmt.Sprintf("failed to generate label_subtype ID: %v", genErr))
    	}

    	if _, execErr := tx.ExecContext(ctx, `
    INSERT INTO label_subtype (id, label_id, calltype_id, filter_id, certainty, created_at, last_modified, active)
    VALUES (?, ?, ?, ?, ?, now(), now(), true)
    `, subtypeID, labelID, calltypeID, filterID, label.Certainty); execErr != nil {
    		return failure(fmt.Sprintf("failed to insert label_subtype: %v", execErr))
    	}
    	return nil
    }
    // importSegmentsIntoDB imports the segments and labels of every validated
    // file inside a single logged transaction named "import_segments".
    //
    // For each entry of fileIDMap that has a FileID, progressHandler (if non-nil)
    // is called with the running count, the total number of entries and the .data
    // file's base name; then each segment is imported with importSegment. A
    // segment that ends up with no labels is not reported as imported: if its row
    // was inserted it is deleted again, and if it was never inserted (importSegment
    // rejected it up front) there is nothing to delete and no DELETE is issued.
    //
    // Per-segment and per-label failures are collected and do not stop the
    // import. Returns the imported segments, the number of imported labels, the
    // number of imported subtypes, one dataFileUpdate per processed file, and all
    // collected errors. If the transaction cannot be started or committed, the
    // first four results are nil/zero and the errors include the cause.
    // The scannedFiles parameter is not read.
    func importSegmentsIntoDB(
    	ctx context.Context,
    	database *sql.DB,
    	fileIDMap map[string]scannedDataFile,
    	scannedFiles []scannedDataFile,
    	mapping MappingFile,
    	filterIDMap map[string]string,
    	speciesIDMap map[string]string,
    	calltypeIDMap map[string]map[string]string,
    	datasetID string,
    	progressHandler func(processed, total int, message string),
    ) ([]SegmentImport, int, int, []dataFileUpdate, []ImportSegmentError) {
    	var importedSegments []SegmentImport
    	var errors []ImportSegmentError
    	importedLabels := 0
    	importedSubtypes := 0
    	var fileUpdates []dataFileUpdate

    	tx, err := db.BeginLoggedTx(ctx, database, "import_segments")
    	if err != nil {
    		errors = append(errors, ImportSegmentError{
    			Stage:   StageImport,
    			Message: fmt.Sprintf("failed to begin transaction: %v", err),
    		})
    		return nil, 0, 0, nil, errors
    	}
    	// Undo everything on any early exit before Commit.
    	defer tx.Rollback()

    	totalFiles := len(fileIDMap)
    	processedFiles := 0
    	for _, sf := range fileIDMap {
    		if sf.FileID == "" {
    			continue
    		}
    		processedFiles++
    		if progressHandler != nil {
    			progressHandler(processedFiles, totalFiles, filepath.Base(sf.DataPath))
    		}

    		fileUpdate := dataFileUpdate{
    			DataPath: sf.DataPath,
    			WavHash:  sf.WavHash,
    			LabelIDs: make(map[int]map[int]string),
    		}

    		for segIdx, seg := range sf.Segments {
    			segImp, labelIDs, subtypes, segErrs := importSegment(ctx, tx, seg, segIdx, sf, datasetID, mapping, filterIDMap, speciesIDMap, calltypeIDMap)
    			errors = append(errors, segErrs...)
    			importedSubtypes += subtypes

    			if len(segImp.Labels) > 0 {
    				importedSegments = append(importedSegments, segImp)
    				importedLabels += len(labelIDs)
    				fileUpdate.LabelIDs[segIdx] = labelIDs
    				continue
    			}

    			// No label succeeded. An empty SegmentID means importSegment failed
    			// before inserting a row, so there is no orphan to remove.
    			if segImp.SegmentID == "" {
    				continue
    			}
    			if _, err := tx.ExecContext(ctx, `DELETE FROM segment WHERE id = ?`, segImp.SegmentID); err != nil {
    				errors = append(errors, ImportSegmentError{
    					File:    filepath.Base(sf.DataPath),
    					Stage:   StageImport,
    					Message: fmt.Sprintf("failed to delete orphaned segment: %v", err),
    				})
    			}
    		}
    		fileUpdates = append(fileUpdates, fileUpdate)
    	}

    	if err := tx.Commit(); err != nil {
    		errors = append(errors, ImportSegmentError{
    			Stage:   StageImport,
    			Message: fmt.Sprintf("failed to commit transaction: %v", err),
    		})
    		return nil, 0, 0, nil, errors
    	}
    	return importedSegments, importedLabels, importedSubtypes, fileUpdates, errors
    }
    // importSegment writes one segment row and then each of its labels.
    //
    // The segment is rejected before any insert when its start time is not
    // strictly before its end time or when its end time is beyond sf.Duration.
    // Rejection, an ID-generation failure, or a failed segment insert returns a
    // zero SegmentImport, a nil label map, zero subtypes and a single error.
    //
    // Otherwise every label is imported with importSingleLabel; failed labels add
    // an error and are skipped. Returns the segment summary (Labels is non-nil
    // even when empty), a label index -> label ID map for the labels that
    // succeeded, the number of subtypes inserted, and the label errors.
    func importSegment(
    	ctx context.Context,
    	tx *db.LoggedTx,
    	seg *datafile.Segment,
    	segIdx int,
    	sf scannedDataFile,
    	datasetID string,
    	mapping MappingFile,
    	filterIDMap map[string]string,
    	speciesIDMap map[string]string,
    	calltypeIDMap map[string]map[string]string,
    ) (SegmentImport, map[int]string, int, []ImportSegmentError) {
    	// abort produces the "nothing imported" result with one error attached.
    	abort := func(msg string) (SegmentImport, map[int]string, int, []ImportSegmentError) {
    		return SegmentImport{}, nil, 0, []ImportSegmentError{{
    			File:    filepath.Base(sf.DataPath),
    			Stage:   StageImport,
    			Message: msg,
    		}}
    	}

    	switch {
    	case seg.StartTime >= seg.EndTime:
    		return abort(fmt.Sprintf("invalid segment bounds: start=%.2f >= end=%.2f", seg.StartTime, seg.EndTime))
    	case seg.EndTime > sf.Duration:
    		return abort(fmt.Sprintf("segment end time (%.2f) exceeds file duration (%.2f)", seg.EndTime, sf.Duration))
    	}

    	segmentID, genErr := utils.GenerateLongID()
    	if genErr != nil {
    		return abort(fmt.Sprintf("failed to generate segment ID: %v", genErr))
    	}

    	if _, execErr := tx.ExecContext(ctx, `
    INSERT INTO segment (id, file_id, dataset_id, start_time, end_time, freq_low, freq_high, created_at, last_modified, active)
    VALUES (?, ?, ?, ?, ?, ?, ?, now(), now(), true)
    `, segmentID, sf.FileID, datasetID, seg.StartTime, seg.EndTime, seg.FreqLow, seg.FreqHigh); execErr != nil {
    		return abort(fmt.Sprintf("failed to insert segment: %v", execErr))
    	}

    	summary := SegmentImport{
    		SegmentID: segmentID,
    		FileName:  filepath.Base(sf.WavPath),
    		StartTime: seg.StartTime,
    		EndTime:   seg.EndTime,
    		FreqLow:   seg.FreqLow,
    		FreqHigh:  seg.FreqHigh,
    		Labels:    []LabelImport{},
    	}

    	var labelErrs []ImportSegmentError
    	insertedIDs := make(map[int]string)
    	subtypeCount := 0
    	for labelIdx, label := range seg.Labels {
    		outcome := importSingleLabel(ctx, tx, label, segmentID, segIdx, labelIdx, sf, mapping, filterIDMap, speciesIDMap, calltypeIDMap)
    		if !outcome.hasError {
    			insertedIDs[labelIdx] = outcome.labelID
    			summary.Labels = append(summary.Labels, outcome.labelImport)
    			subtypeCount += outcome.subtypesImported
    			continue
    		}
    		labelErrs = append(labelErrs, outcome.err)
    	}
    	return summary, insertedIDs, subtypeCount, labelErrs
    }
    // countTotalSegments returns the sum of the segment counts of all files in
    // fileIDMap; an empty or nil map gives 0.
    func countTotalSegments(fileIDMap map[string]scannedDataFile) int {
    	var total int
    	for fileID := range fileIDMap {
    		total += len(fileIDMap[fileID].Segments)
    	}
    	return total
    }
    // writeIDsToDataFiles records import results in the .data files themselves.
    //
    // Each file in fileUpdates is re-parsed, its metadata gets "skraak_hash" set
    // to the WAV hash, every label listed in LabelIDs gets "skraak_label_id" set
    // to its ID, and the file is written back to the same path. Segment or label
    // indexes past the end of what was parsed are ignored. A file that cannot be
    // parsed or written contributes one StageImport error and processing
    // continues with the next file.
    func writeIDsToDataFiles(fileUpdates []dataFileUpdate) []ImportSegmentError {
    	var failures []ImportSegmentError

    	for _, update := range fileUpdates {
    		parsed, parseErr := datafile.ParseDataFile(update.DataPath)
    		if parseErr != nil {
    			failures = append(failures, ImportSegmentError{
    				File:    filepath.Base(update.DataPath),
    				Stage:   StageImport,
    				Message: fmt.Sprintf("failed to re-parse .data file for writing: %v", parseErr),
    			})
    			continue
    		}

    		// File-level marker: the hash of the WAV the labels were imported for.
    		if parsed.Meta.Extra == nil {
    			parsed.Meta.Extra = make(map[string]any)
    		}
    		parsed.Meta.Extra["skraak_hash"] = update.WavHash

    		// Label-level markers: the ID assigned to each imported label.
    		for segIdx, idsByLabel := range update.LabelIDs {
    			if segIdx < len(parsed.Segments) {
    				labels := parsed.Segments[segIdx].Labels
    				for labelIdx, labelID := range idsByLabel {
    					if labelIdx < len(labels) {
    						target := labels[labelIdx]
    						if target.Extra == nil {
    							target.Extra = make(map[string]any)
    						}
    						target.Extra["skraak_label_id"] = labelID
    					}
    				}
    			}
    		}

    		if writeErr := parsed.Write(update.DataPath); writeErr != nil {
    			failures = append(failures, ImportSegmentError{
    				File:    filepath.Base(update.DataPath),
    				Stage:   StageImport,
    				Message: fmt.Sprintf("failed to write updated .data file: %v", writeErr),
    			})
    		}
    	}
    	return failures
    }