B5KI2L3WNZGSA6AHO3GR3LAKEER3PFRCYYFFZKOEYLLV4P7FG5WAC RDJ6UCAG2OHWUKWQUNMDPC4ZJRIO53BATIM5AE5FOKMHWZSGQ4XAC NQPVZ3PPQG6EPTTAEHXOXXGK27HZCISHZCOZU6K6RKWTRTOHMY6QC V2HX6HEB2OBNI4IMWD5XJN3RKAZYHAFJAJAPFP3BFYFZVZVEYN6AC 3DVPQOKB6BX63XSBIYYCPWBL2RBG3LXZS3XPQBANJP2FWVRAOVZQC PXQDGTR53ST5T4EV6XFRCAOC7N5RQX23GWVKMJGS2J35VUQLZL4AC ZKLAOPURUGKKG4KC7C5NEQ5WSZSFTZM7SCV7PIYJMWN4UKI7UI3QC func validateMappedCalltypes(queryer Reader, mappedCalltypes map[string]map[string]string, result *mapping.ValidationResult) error {
func validateMappedCalltypes(q db.Querier, mappedCalltypes map[string]map[string]string, result *mapping.ValidationResult) error {
if ctErr := importCalltype(ctx, tx, ids.labelID, label, ids.dbSpecies, ids.filterID, mapping, calltypeIDMap, sf); ctErr != nil {
if ctErr := importCalltype(ctx, m, ids.labelID, label, ids.dbSpecies, ids.filterID, mapping, calltypeIDMap, sf); ctErr != nil {
func insertLabel(ctx context.Context, tx *db.LoggedTx, ids resolvedLabelIDs, segmentID string, label *datafile.Label) error {_, err := tx.ExecContext(ctx, `
func insertLabel(ctx context.Context, m Mutator, ids resolvedLabelIDs, segmentID string, label *datafile.Label) error {_, err := m.ExecContext(ctx, `
segImp, labelIDs, subtypes, segErrs := importSegment(ctx, tx, seg, segIdx, sf, datasetID, mapping, filterIDMap, speciesIDMap, calltypeIDMap)
segImp, labelIDs, subtypes, segErrs := importSegment(ctx, m, seg, segIdx, sf, datasetID, mapping, filterIDMap, speciesIDMap, calltypeIDMap)
result := importSingleLabel(ctx, tx, label, segmentID, segIdx, labelIdx, sf, mapping, filterIDMap, speciesIDMap, calltypeIDMap)
result := importSingleLabel(ctx, m, label, segmentID, segIdx, labelIdx, sf, mapping, filterIDMap, speciesIDMap, calltypeIDMap)
// Phase B+C: Parse data files and validate against DBdatabase, err := db.OpenWriteableDB(db.ResolveDBPath(input.DBPath, ""))
// Phase B+C: Parse data files and validate against DB (read-only)var val *segmentValidationerr = db.WithReadDB(db.ResolveDBPath(input.DBPath, ""), func(database *sql.DB) error {var valErrors []ImportSegmentErrorvar err errorval, valErrors, err = validateAndPrepareSegments(database, input, mapping, dataFiles)output.Errors = append(output.Errors, valErrors...)return err})
importedSegments, importedLabels, importedSubtypes, fileUpdates, importErrors := importSegmentsIntoDB(ctx, database, val.fileIDMap, val.scannedFiles, mapping, val.filterIDMap, val.speciesIDMap, val.calltypeIDMap, input.DatasetID, input.ProgressHandler,)
var importedSegments []SegmentImportvar importedLabels, importedSubtypes intvar fileUpdates []dataFileUpdatevar importErrors []ImportSegmentErrorerr = db.WithWriteTx(ctx, db.ResolveDBPath(input.DBPath, ""), "import_segments", func(_ *sql.DB, tx *db.LoggedTx) error {importedSegments, importedLabels, importedSubtypes, fileUpdates, importErrors = importSegmentsIntoDB(ctx, tx, val.fileIDMap, val.scannedFiles, mapping, val.filterIDMap, val.speciesIDMap, val.calltypeIDMap, input.DatasetID, input.ProgressHandler,)return nil})
// Phase 3: Open database connection (single connection for all DB operations)database, err := db.OpenWriteableDB(db.ResolveDBPath(input.DBPath, ""))if err != nil {return output, fmt.Errorf("database connection failed: %w", err)}defer database.Close()// Phase 4: Get location data for astronomical calculationslocData, err := GetLocationData(database, input.LocationID)if err != nil {return output, fmt.Errorf("failed to get location data: %w", err)}
// Phase 3: Get location data and process file metadatavar locData *LocationDatavar result *wav.FileProcessingResulterr = db.WithReadDB(db.ResolveDBPath(input.DBPath, ""), func(database *sql.DB) error {var err errorlocData, err = GetLocationData(database, input.LocationID)if err != nil {return fmt.Errorf("failed to get location data: %w", err)}
// Phase 5: Process file metadataresult, err := wav.ProcessSingleFile(input.FilePath, locData.Latitude, locData.Longitude, locData.TimezoneID, true)
result, err = wav.ProcessSingleFile(input.FilePath, locData.Latitude, locData.Longitude, locData.TimezoneID, true)if err != nil {return fmt.Errorf("file processing failed: %w", err)}return nil})
// Phase 6: Insert into database (includes EnsureClusterPath)fileID, isDuplicate, err := insertFileIntoDB(ctx, database, result, input)
// Phase 4: Insert into database (includes EnsureClusterPath)var fileID stringvar isDuplicate boolerr = db.WithWriteTx(ctx, db.ResolveDBPath(input.DBPath, ""), "import_audio_file", func(_ *sql.DB, tx *db.LoggedTx) error {var err errorfileID, isDuplicate, err = insertFileIntoDB(ctx, tx, result, input)return err})
// Begin logged transactiontx, err := db.BeginLoggedTx(ctx, database, "import_audio_file")if err != nil {return "", false, fmt.Errorf("failed to begin transaction: %w", err)}defer tx.Rollback() // Rollback if not committed// Ensure cluster path is set (inside transaction for logging + rollback safety)if err := EnsureClusterPath(tx, input.ClusterID, filepath.Dir(input.FilePath)); err != nil {
// Ensure cluster path is setif err := EnsureClusterPath(m, input.ClusterID, filepath.Dir(input.FilePath)); err != nil {
func bulkCreateClusters(ctx context.Context, database *sql.DB, logger *progressLogger, locations []bulkLocationData, datasetID string) (map[string]string, int, int, error) {
func bulkCreateClusters(ctx context.Context, m Mutator, logger *progressLogger, locations []bulkLocationData, datasetID string) (map[string]string, int, int, error) {
func bulkImportAllFiles(database *sql.DB, logger *progressLogger, locations []bulkLocationData, clusterIDMap map[string]string, datasetID string) (bulkImportStats, []string) {
func bulkImportAllFiles(m Mutator, logger *progressLogger, locations []bulkLocationData, clusterIDMap map[string]string, datasetID string) (bulkImportStats, []string) {
func bulkCreateCluster(ctx context.Context, database *sql.DB, datasetID, locationID, name string, sampleRate int) (string, error) {
func bulkCreateCluster(ctx context.Context, m Mutator, datasetID, locationID, name string, sampleRate int) (string, error) {
tx, err := db.BeginLoggedTx(ctx, database, "bulk_file_import")if err != nil {return "", fmt.Errorf("failed to begin transaction: %w", err)
// If m is a LoggedTx, use it directly; otherwise begin a new transactionvar tx *db.LoggedTxswitch v := m.(type) {case *db.LoggedTx:tx = vdefault:tx, err = db.BeginLoggedTx(ctx, v.(*sql.DB), "bulk_file_import")if err != nil {return "", fmt.Errorf("failed to begin transaction: %w", err)}defer tx.Rollback()
if err = tx.Commit(); err != nil {return "", fmt.Errorf("failed to commit cluster creation: %w", err)
// Only commit if we created the transactionif _, isLoggedTx := m.(*db.LoggedTx); !isLoggedTx {if err = tx.Commit(); err != nil {return "", fmt.Errorf("failed to commit cluster creation: %w", err)}
// resolveTransaction extracts transaction info from a Mutator.// Returns the LoggedTx, whether caller should commit, and optionally a *sql.DB if available.func resolveTransaction(ctx context.Context, m Mutator, txName string) (*db.LoggedTx, bool, *sql.DB, error) {switch v := m.(type) {case *db.LoggedTx:// Already in a transaction, use itreturn v, false, nil, nilcase *sql.DB:// Need to create a transactiontx, err := db.BeginLoggedTx(ctx, v, txName)if err != nil {return nil, false, nil, fmt.Errorf("failed to begin transaction: %w", err)}return tx, true, v, nildefault:return nil, false, nil, fmt.Errorf("unsupported Mutator type")}}// runImportCluster executes ImportCluster with appropriate Reader/Mutator.func runImportCluster(database *sql.DB, tx *db.LoggedTx, input ClusterImportInput) (*ClusterImportOutput, error) {if database != nil {return ImportCluster(database, tx, input)}// tx is both Reader and Mutatorreturn ImportCluster(tx, tx, input)}
func bulkImportFilesForCluster(database *sql.DB, logger *progressLogger, folderPath, datasetID, locationID, clusterID string) (*bulkImportStats, error) {
func bulkImportFilesForCluster(m Mutator, logger *progressLogger, folderPath, datasetID, locationID, clusterID string) (*bulkImportStats, error) {