replacement in tools/import/mapping.go at line 42
[3.11248]→[3.59032:59049](∅→∅) replacement in tools/import/mapping.go at line 65
[3.11987]→[3.11987:12070](∅→∅) − if err := validateMappedSpecies(queryer, mappedSpeciesSet, &result); err != nil {
+ if err := validateMappedSpecies(q, mappedSpeciesSet, &result); err != nil {
replacement in tools/import/mapping.go at line 70
[3.12130]→[3.12130:12214](∅→∅) − if err := validateMappedCalltypes(queryer, mappedCalltypes, &result); err != nil {
+ if err := validateMappedCalltypes(q, mappedCalltypes, &result); err != nil {
replacement in tools/import/mapping.go at line 130
[3.13814]→[3.59050:59169](∅→∅) − func validateMappedSpecies(queryer Reader, mappedSpeciesSet map[string]bool, result *mapping.ValidationResult) error {
+ func validateMappedSpecies(q db.Querier, mappedSpeciesSet map[string]bool, result *mapping.ValidationResult) error {
replacement in tools/import/mapping.go at line 147
[3.14372]→[3.14372:14445](∅→∅) − rows, err := queryer.QueryContext(context.Background(), query, args...)
+ rows, err := q.QueryContext(context.Background(), query, args...)
replacement in tools/import/mapping.go at line 170
[3.14928]→[3.59288:59421](∅→∅) − func validateMappedCalltypes(queryer Reader, mappedCalltypes map[string]map[string]string, result *mapping.ValidationResult) error {
+ func validateMappedCalltypes(q db.Querier, mappedCalltypes map[string]map[string]string, result *mapping.ValidationResult) error {
replacement in tools/import/mapping.go at line 193
[3.15611]→[3.15611:15685](∅→∅) − rows, err := queryer.QueryContext(context.Background(), query, args...)
+ rows, err := q.QueryContext(context.Background(), query, args...)
replacement in tools/import/import_segments_prepare.go at line 18
replacement in tools/import/import_segments_prepare.go at line 30
− if err := validateSegmentHierarchy(database, input.DatasetID, input.LocationID, input.ClusterID); err != nil {
+ if err := validateSegmentHierarchy(q, input.DatasetID, input.LocationID, input.ClusterID); err != nil {
replacement in tools/import/import_segments_prepare.go at line 35
− filterIDMap, err := validateFiltersExist(database, uniqueFilters)
+ filterIDMap, err := validateFiltersExist(q, uniqueFilters)
replacement in tools/import/import_segments_prepare.go at line 41
[2.1134]→[2.1134:1236](∅→∅) − validationResult, err := ValidateMappingAgainstDB(database, mapping, uniqueSpecies, uniqueCalltypes)
+ validationResult, err := ValidateMappingAgainstDB(q, mapping, uniqueSpecies, uniqueCalltypes)
replacement in tools/import/import_segments_prepare.go at line 50
[2.1506]→[2.1506:1617](∅→∅) − speciesIDMap, calltypeIDMap, err := loadSpeciesCalltypeIDs(database, mapping, uniqueSpecies, uniqueCalltypes)
+ speciesIDMap, calltypeIDMap, err := loadSpeciesCalltypeIDs(q, mapping, uniqueSpecies, uniqueCalltypes)
replacement in tools/import/import_segments_prepare.go at line 56
[2.1795]→[2.1795:1899](∅→∅) − fileIDMap, hashErrors := validateAndMapFiles(database, scannedFiles, input.ClusterID, input.DatasetID)
+ fileIDMap, hashErrors := validateAndMapFiles(q, scannedFiles, input.ClusterID, input.DatasetID)
edit in tools/import/import_segments_db.go at line 5
[2.12523]→[2.12523:12539](∅→∅) edit in tools/import/import_segments_db.go at line 10
[2.12594]→[2.12594:12607](∅→∅) replacement in tools/import/import_segments_db.go at line 77
[2.14376]→[2.14376:14394](∅→∅) replacement in tools/import/import_segments_db.go at line 97
[2.14934]→[2.14934:15003](∅→∅) − if err := insertLabel(ctx, tx, ids, segmentID, label); err != nil {
+ if err := insertLabel(ctx, m, ids, segmentID, label); err != nil {
replacement in tools/import/import_segments_db.go at line 106
[2.15232]→[2.15232:15315](∅→∅) − if err := insertLabelMetadata(ctx, tx, ids.labelID, label.Comment); err != nil {
+ if err := insertLabelMetadata(ctx, m, ids.labelID, label.Comment); err != nil {
replacement in tools/import/import_segments_db.go at line 126
[2.15764]→[2.15764:15895](∅→∅) − if ctErr := importCalltype(ctx, tx, ids.labelID, label, ids.dbSpecies, ids.filterID, mapping, calltypeIDMap, sf); ctErr != nil {
+ if ctErr := importCalltype(ctx, m, ids.labelID, label, ids.dbSpecies, ids.filterID, mapping, calltypeIDMap, sf); ctErr != nil {
replacement in tools/import/import_segments_db.go at line 137
[2.16265]→[2.16265:16424](∅→∅) − func insertLabel(ctx context.Context, tx *db.LoggedTx, ids resolvedLabelIDs, segmentID string, label *datafile.Label) error {
− _, err := tx.ExecContext(ctx, `
+ func insertLabel(ctx context.Context, m Mutator, ids resolvedLabelIDs, segmentID string, label *datafile.Label) error {
+ _, err := m.ExecContext(ctx, `
replacement in tools/import/import_segments_db.go at line 149
[2.16806]→[2.16806:16902](∅→∅) − func insertLabelMetadata(ctx context.Context, tx *db.LoggedTx, labelID, comment string) error {
+ func insertLabelMetadata(ctx context.Context, m Mutator, labelID, comment string) error {
replacement in tools/import/import_segments_db.go at line 152
[2.17027]→[2.17027:17060](∅→∅) − _, err := tx.ExecContext(ctx, `
+ _, err := m.ExecContext(ctx, `
replacement in tools/import/import_segments_db.go at line 165
[2.17414]→[2.17414:17432](∅→∅) replacement in tools/import/import_segments_db.go at line 195
[2.18227]→[2.18227:18259](∅→∅) − _, err = tx.ExecContext(ctx, `
+ _, err = m.ExecContext(ctx, `
replacement in tools/import/import_segments_db.go at line 208
[2.18674]→[2.18674:18732](∅→∅) − // importSegmentsIntoDB performs the transactional import
+ // importSegmentsIntoDB performs the transactional import using the provided mutator.
replacement in tools/import/import_segments_db.go at line 211
[2.18781]→[2.18781:18800](∅→∅) edit in tools/import/import_segments_db.go at line 226
[2.19305]→[2.19305:19583](∅→∅) −
− tx, err := db.BeginLoggedTx(ctx, database, "import_segments")
− if err != nil {
− errors = append(errors, ImportSegmentError{
− Stage: StageImport,
− Message: fmt.Sprintf("failed to begin transaction: %v", err),
− })
− return nil, 0, 0, nil, errors
− }
− defer tx.Rollback()
replacement in tools/import/import_segments_db.go at line 247
[2.20009]→[2.20009:20153](∅→∅) − segImp, labelIDs, subtypes, segErrs := importSegment(ctx, tx, seg, segIdx, sf, datasetID, mapping, filterIDMap, speciesIDMap, calltypeIDMap)
+ segImp, labelIDs, subtypes, segErrs := importSegment(ctx, m, seg, segIdx, sf, datasetID, mapping, filterIDMap, speciesIDMap, calltypeIDMap)
replacement in tools/import/import_segments_db.go at line 253
[2.20310]→[2.20310:20415](∅→∅) − if _, err := tx.ExecContext(ctx, `DELETE FROM segment WHERE id = ?`, segImp.SegmentID); err != nil {
+ if _, err := m.ExecContext(ctx, `DELETE FROM segment WHERE id = ?`, segImp.SegmentID); err != nil {
edit in tools/import/import_segments_db.go at line 269
[2.20821]→[2.20821:21036](∅→∅) − if err := tx.Commit(); err != nil {
− errors = append(errors, ImportSegmentError{
− Stage: StageImport,
− Message: fmt.Sprintf("failed to commit transaction: %v", err),
− })
− return nil, 0, 0, nil, errors
− }
−
replacement in tools/import/import_segments_db.go at line 275
[2.21231]→[2.21231:21249](∅→∅) replacement in tools/import/import_segments_db.go at line 312
[2.22417]→[2.22417:22449](∅→∅) − _, err = tx.ExecContext(ctx, `
+ _, err = m.ExecContext(ctx, `
replacement in tools/import/import_segments_db.go at line 337
[2.23294]→[2.23294:23424](∅→∅) − result := importSingleLabel(ctx, tx, label, segmentID, segIdx, labelIdx, sf, mapping, filterIDMap, speciesIDMap, calltypeIDMap)
+ result := importSingleLabel(ctx, m, label, segmentID, segIdx, labelIdx, sf, mapping, filterIDMap, speciesIDMap, calltypeIDMap)
edit in tools/import/import_segments.go at line 5
replacement in tools/import/import_segments.go at line 118
[3.15683]→[3.15683:15812](∅→∅) − // Phase B+C: Parse data files and validate against DB
− database, err := db.OpenWriteableDB(db.ResolveDBPath(input.DBPath, ""))
+ // Phase B+C: Parse data files and validate against DB (read-only)
+ var val *segmentValidation
+ err = db.WithReadDB(db.ResolveDBPath(input.DBPath, ""), func(database *sql.DB) error {
+ var valErrors []ImportSegmentError
+ var err error
+ val, valErrors, err = validateAndPrepareSegments(database, input, mapping, dataFiles)
+ output.Errors = append(output.Errors, valErrors...)
+ return err
+ })
edit in tools/import/import_segments.go at line 128
[3.15829]→[3.15829:16079](∅→∅) − return output, fmt.Errorf("failed to open database: %w", err)
− }
− defer database.Close()
−
− val, valErrors, err := validateAndPrepareSegments(database, input, mapping, dataFiles)
− output.Errors = append(output.Errors, valErrors...)
− if err != nil {
replacement in tools/import/import_segments.go at line 136
[3.16279]→[3.16279:16539](∅→∅) − importedSegments, importedLabels, importedSubtypes, fileUpdates, importErrors := importSegmentsIntoDB(
− ctx, database, val.fileIDMap, val.scannedFiles, mapping, val.filterIDMap, val.speciesIDMap, val.calltypeIDMap, input.DatasetID, input.ProgressHandler,
− )
+ var importedSegments []SegmentImport
+ var importedLabels, importedSubtypes int
+ var fileUpdates []dataFileUpdate
+ var importErrors []ImportSegmentError
+ err = db.WithWriteTx(ctx, db.ResolveDBPath(input.DBPath, ""), "import_segments", func(_ *sql.DB, tx *db.LoggedTx) error {
+ importedSegments, importedLabels, importedSubtypes, fileUpdates, importErrors = importSegmentsIntoDB(
+ ctx, tx, val.fileIDMap, val.scannedFiles, mapping, val.filterIDMap, val.speciesIDMap, val.calltypeIDMap, input.DatasetID, input.ProgressHandler,
+ )
+ return nil
+ })
edit in tools/import/import_segments.go at line 148
+ if err != nil {
+ return output, err
+ }
replacement in tools/import/import_file.go at line 60
[3.45456]→[3.45456:45782](∅→∅),
[3.45782]→[3.49746:49807](∅→∅),
[3.49807]→[3.45849:45937](∅→∅),
[3.45849]→[3.45849:45937](∅→∅) − // Phase 3: Open database connection (single connection for all DB operations)
− database, err := db.OpenWriteableDB(db.ResolveDBPath(input.DBPath, ""))
− if err != nil {
− return output, fmt.Errorf("database connection failed: %w", err)
− }
− defer database.Close()
−
− // Phase 4: Get location data for astronomical calculations
− locData, err := GetLocationData(database, input.LocationID)
− if err != nil {
− return output, fmt.Errorf("failed to get location data: %w", err)
− }
+ // Phase 3: Get location data and process file metadata
+ var locData *LocationData
+ var result *wav.FileProcessingResult
+ err = db.WithReadDB(db.ResolveDBPath(input.DBPath, ""), func(database *sql.DB) error {
+ var err error
+ locData, err = GetLocationData(database, input.LocationID)
+ if err != nil {
+ return fmt.Errorf("failed to get location data: %w", err)
+ }
replacement in tools/import/import_file.go at line 70
[3.45938]→[3.45938:45973](∅→∅),
[3.45973]→[3.44754:44871](∅→∅) − // Phase 5: Process file metadata
− result, err := wav.ProcessSingleFile(input.FilePath, locData.Latitude, locData.Longitude, locData.TimezoneID, true)
+ result, err = wav.ProcessSingleFile(input.FilePath, locData.Latitude, locData.Longitude, locData.TimezoneID, true)
+ if err != nil {
+ return fmt.Errorf("file processing failed: %w", err)
+ }
+ return nil
+ })
replacement in tools/import/import_file.go at line 80
[3.46215]→[3.46215:46278](∅→∅) − return output, fmt.Errorf("file processing failed: %w", err)
replacement in tools/import/import_file.go at line 91
[3.46768]→[3.3345:3484](∅→∅) − // Phase 6: Insert into database (includes EnsureClusterPath)
− fileID, isDuplicate, err := insertFileIntoDB(ctx, database, result, input)
+ // Phase 4: Insert into database (includes EnsureClusterPath)
+ var fileID string
+ var isDuplicate bool
+ err = db.WithWriteTx(ctx, db.ResolveDBPath(input.DBPath, ""), "import_audio_file", func(_ *sql.DB, tx *db.LoggedTx) error {
+ var err error
+ fileID, isDuplicate, err = insertFileIntoDB(ctx, tx, result, input)
+ return err
+ })
replacement in tools/import/import_file.go at line 143
[3.48100]→[3.48100:48160](∅→∅) − // insertFileIntoDB inserts a single file into the database
+ // insertFileIntoDB inserts a single file into the database using the provided mutator.
replacement in tools/import/import_file.go at line 147
[3.48245]→[3.48245:48264](∅→∅) replacement in tools/import/import_file.go at line 151
[3.48369]→[3.48369:48604](∅→∅),
[3.48604]→[3.3510:3593](∅→∅),
[3.3593]→[3.49808:49902](∅→∅) − // Begin logged transaction
− tx, err := db.BeginLoggedTx(ctx, database, "import_audio_file")
− if err != nil {
− return "", false, fmt.Errorf("failed to begin transaction: %w", err)
− }
− defer tx.Rollback() // Rollback if not committed
−
− // Ensure cluster path is set (inside transaction for logging + rollback safety)
− if err := EnsureClusterPath(tx, input.ClusterID, filepath.Dir(input.FilePath)); err != nil {
+ // Ensure cluster path is set
+ if err := EnsureClusterPath(m, input.ClusterID, filepath.Dir(input.FilePath)); err != nil {
replacement in tools/import/import_file.go at line 157
[3.48634]→[3.49903:49966](∅→∅) − existingID, isDup, err := CheckDuplicateHash(tx, result.Hash)
+ existingID, isDup, err := CheckDuplicateHash(m, result.Hash)
edit in tools/import/import_file.go at line 162
[3.48759]→[3.3767:3849](∅→∅) − // Commit the transaction to log any EnsureClusterPath UPDATE
− _ = tx.Commit()
replacement in tools/import/import_file.go at line 166
[3.3902]→[3.3902:3962](∅→∅) − fileID, err := insertNewFileRecord(ctx, tx, result, input)
+ fileID, err := insertNewFileRecord(ctx, m, result, input)
edit in tools/import/import_file.go at line 169
[3.4003]→[3.48790:48794](∅→∅),
[3.48790]→[3.48790:48794](∅→∅),
[3.48794]→[3.4004:4132](∅→∅) − }
−
− // Commit transaction
− if err = tx.Commit(); err != nil {
− return "", false, fmt.Errorf("transaction commit failed: %w", err)
replacement in tools/import/import_file.go at line 177
[3.4310]→[3.4310:4328](∅→∅) replacement in tools/import/import_file.go at line 187
[3.48962]→[3.48962:48994](∅→∅) − _, err = tx.ExecContext(ctx, `
+ _, err = m.ExecContext(ctx, `
replacement in tools/import/import_file.go at line 203
[3.49600]→[3.49600:49632](∅→∅) − _, err = tx.ExecContext(ctx, `
+ _, err = m.ExecContext(ctx, `
replacement in tools/import/import_file.go at line 213
[3.49942]→[3.49942:49975](∅→∅) − _, err = tx.ExecContext(ctx, `
+ _, err = m.ExecContext(ctx, `
replacement in tools/import/bulk_file_import.go at line 258
[3.59217]→[3.59217:59391](∅→∅) − func bulkCreateClusters(ctx context.Context, database *sql.DB, logger *progressLogger, locations []bulkLocationData, datasetID string) (map[string]string, int, int, error) {
+ func bulkCreateClusters(ctx context.Context, m Mutator, logger *progressLogger, locations []bulkLocationData, datasetID string) (map[string]string, int, int, error) {
replacement in tools/import/bulk_file_import.go at line 267
[3.59614]→[3.49968:50026](∅→∅) − err := database.QueryRowContext(context.Background(), `
+ err := m.QueryRowContext(context.Background(), `
replacement in tools/import/bulk_file_import.go at line 274
[3.59838]→[3.59838:59949](∅→∅) − clusterID, err = bulkCreateCluster(ctx, database, datasetID, loc.LocationID, loc.DateRange, loc.SampleRate)
+ clusterID, err = bulkCreateCluster(ctx, m, datasetID, loc.LocationID, loc.DateRange, loc.SampleRate)
replacement in tools/import/bulk_file_import.go at line 299
[3.60782]→[3.60782:60958](∅→∅) − func bulkImportAllFiles(database *sql.DB, logger *progressLogger, locations []bulkLocationData, clusterIDMap map[string]string, datasetID string) (bulkImportStats, []string) {
+ func bulkImportAllFiles(m Mutator, logger *progressLogger, locations []bulkLocationData, clusterIDMap map[string]string, datasetID string) (bulkImportStats, []string) {
replacement in tools/import/bulk_file_import.go at line 318
[3.61445]→[3.61445:61562](∅→∅) − stats, err := bulkImportFilesForCluster(database, logger, loc.DirectoryPath, datasetID, loc.LocationID, clusterID)
+ stats, err := bulkImportFilesForCluster(m, logger, loc.DirectoryPath, datasetID, loc.LocationID, clusterID)
replacement in tools/import/bulk_file_import.go at line 421
[3.64241]→[3.64241:64373](∅→∅) − func bulkCreateCluster(ctx context.Context, database *sql.DB, datasetID, locationID, name string, sampleRate int) (string, error) {
+ func bulkCreateCluster(ctx context.Context, m Mutator, datasetID, locationID, name string, sampleRate int) (string, error) {
replacement in tools/import/bulk_file_import.go at line 431
[3.64623]→[3.50027:50155](∅→∅) − err = database.QueryRowContext(context.Background(), "SELECT name FROM location WHERE id = ?", locationID).Scan(&locationName)
+ err = m.QueryRowContext(context.Background(), "SELECT name FROM location WHERE id = ?", locationID).Scan(&locationName)
replacement in tools/import/bulk_file_import.go at line 440
[3.64961]→[3.64961:65106](∅→∅) − tx, err := db.BeginLoggedTx(ctx, database, "bulk_file_import")
− if err != nil {
− return "", fmt.Errorf("failed to begin transaction: %w", err)
+ // If m is a LoggedTx, use it directly; otherwise begin a new transaction
+ var tx *db.LoggedTx
+ switch v := m.(type) {
+ case *db.LoggedTx:
+ tx = v
+ default:
+ tx, err = db.BeginLoggedTx(ctx, v.(*sql.DB), "bulk_file_import")
+ if err != nil {
+ return "", fmt.Errorf("failed to begin transaction: %w", err)
+ }
+ defer tx.Rollback()
edit in tools/import/bulk_file_import.go at line 452
[3.65109]→[3.65109:65130](∅→∅) replacement in tools/import/bulk_file_import.go at line 461
[3.65469]→[3.65469:65575](∅→∅) − if err = tx.Commit(); err != nil {
− return "", fmt.Errorf("failed to commit cluster creation: %w", err)
+ // Only commit if we created the transaction
+ if _, isLoggedTx := m.(*db.LoggedTx); !isLoggedTx {
+ if err = tx.Commit(); err != nil {
+ return "", fmt.Errorf("failed to commit cluster creation: %w", err)
+ }
edit in tools/import/bulk_file_import.go at line 471
+ // resolveTransaction returns the *db.LoggedTx to use for m: m itself when it is already a *db.LoggedTx, or a new transaction named txName begun via db.BeginLoggedTx when m is a *sql.DB.
+ // The bool result is true only when a new transaction was begun here (callers use it to decide whether to commit/rollback); the *sql.DB result is non-nil only in that case. Any other Mutator type, or a failed begin, yields an error.
+ func resolveTransaction(ctx context.Context, m Mutator, txName string) (*db.LoggedTx, bool, *sql.DB, error) {
+ switch v := m.(type) {
+ case *db.LoggedTx:
+ // m is already a transaction: hand it back unchanged, report false, and return no *sql.DB
+ return v, false, nil, nil
+ case *sql.DB:
+ // m is a plain *sql.DB: begin a new logged transaction on it and report true
+ tx, err := db.BeginLoggedTx(ctx, v, txName)
+ if err != nil {
+ return nil, false, nil, fmt.Errorf("failed to begin transaction: %w", err)
+ }
+ return tx, true, v, nil
+ default:
+ return nil, false, nil, fmt.Errorf("unsupported Mutator type")
+ }
+ }
+
+ // runImportCluster calls ImportCluster with input, passing database as the first argument when it is non-nil and tx otherwise; tx is always passed as the second argument.
+ func runImportCluster(database *sql.DB, tx *db.LoggedTx, input ClusterImportInput) (*ClusterImportOutput, error) {
+ if database != nil {
+ return ImportCluster(database, tx, input)
+ }
+ // No *sql.DB was supplied: tx is passed for both ImportCluster arguments (it serves as both Reader and Mutator)
+ return ImportCluster(tx, tx, input)
+ }
+
replacement in tools/import/bulk_file_import.go at line 500
[3.65677]→[3.65677:65831](∅→∅) − func bulkImportFilesForCluster(database *sql.DB, logger *progressLogger, folderPath, datasetID, locationID, clusterID string) (*bulkImportStats, error) {
+ func bulkImportFilesForCluster(m Mutator, logger *progressLogger, folderPath, datasetID, locationID, clusterID string) (*bulkImportStats, error) {
replacement in tools/import/bulk_file_import.go at line 513
[3.66162]→[3.66162:66228](∅→∅) − tx, err := db.BeginLoggedTx(ctx, database, "import_audio_files")
+
+ // Determine if we need to create a transaction or use existing one
+ tx, shouldCommit, database, err := resolveTransaction(ctx, m, "import_audio_files")
replacement in tools/import/bulk_file_import.go at line 517
[3.66245]→[3.66245:66310](∅→∅) − return nil, fmt.Errorf("failed to begin transaction: %w", err)
replacement in tools/import/bulk_file_import.go at line 523
[3.4999]→[3.4999:5015](∅→∅) + if shouldCommit {
+ tx.Rollback()
+ }
replacement in tools/import/bulk_file_import.go at line 529
[3.5083]→[3.50209:50280](∅→∅) − clusterOutput, err := ImportCluster(database, tx, ClusterImportInput{
+ // For ImportCluster, we need both Reader and Mutator
+ clusterOutput, err := runImportCluster(database, tx, ClusterImportInput{
replacement in tools/import/bulk_file_import.go at line 538
[3.66555]→[3.66555:66571](∅→∅) + if shouldCommit {
+ tx.Rollback()
+ }
replacement in tools/import/bulk_file_import.go at line 544
[3.66593]→[3.66593:66693](∅→∅) − if err := tx.Commit(); err != nil {
− return nil, fmt.Errorf("transaction commit failed: %w", err)
+ if shouldCommit {
+ if err := tx.Commit(); err != nil {
+ return nil, fmt.Errorf("transaction commit failed: %w", err)
+ }