tools/ refactor
Dependencies
- [2]
XO5DF6WRI tidied up readme - [3]
T2WZBTVFcyclo 22 - [4]
E27ZWCDPcyclo over 18 - [5]
WKQ7LFTPrefactor of utils/ - [6]
RUVJ3V4Ncyclo to 14 now - [7]
KLUEQ6X5cyclo 21+ - [8]
GPQSOVBPcyclo complexity over 25 - [9]
LQLC7S3Atrying gemini: Inconsistent Standards in @utils/ refactoring - [10]
5FUZQJW5readme - [11]
KZKLAINJrun out of space on nest, cleaned out - [12]
2P27XV3Dfixed cyclo over 30 - [13]
JAT3DXOLcyclo over 15
Change contents
- replacement in tools/sql.go at line 76[6.261553]→[3.2248:2388](∅→∅),[3.2388]→[6.261803:261806](∅→∅),[6.261803]→[6.261803:261806](∅→∅),[6.261806]→[3.2389:2413](∅→∅)
database, err := db.OpenReadOnlyDB(dbPath)if err != nil {return ExecuteSQLOutput{}, fmt.Errorf("database connection failed: %w", err)}defer database.Close()var output ExecuteSQLOutputerr := db.WithReadDB(dbPath, func(database *sql.DB) error {rows, rerr := executeSQLQuery(ctx, database, query, input.Parameters)if rerr != nil {return rerr}defer rows.Close() - replacement in tools/sql.go at line 84[6.261807]→[3.2414:2534](∅→∅),[3.2534]→[6.262042:262045](∅→∅),[6.262042]→[6.262042:262045](∅→∅),[6.262045]→[3.2535:2555](∅→∅)
rows, err := executeSQLQuery(ctx, database, query, input.Parameters)if err != nil {return ExecuteSQLOutput{}, err}defer rows.Close()columnInfo, columns, cerr := buildColumnInfo(rows)if cerr != nil {return cerr} - replacement in tools/sql.go at line 89
columnInfo, columns, err := buildColumnInfo(rows)if err != nil {return ExecuteSQLOutput{}, err}results, serr := scanResultRows(rows, columns)if serr != nil {return serr} - replacement in tools/sql.go at line 94[6.262316]→[3.2658:2705](∅→∅),[3.2705]→[6.262413:262430](∅→∅),[6.262413]→[6.262413:262430](∅→∅),[6.262430]→[3.2706:2739](∅→∅),[3.2739]→[6.262509:262512](∅→∅),[6.262509]→[6.262509:262512](∅→∅)
results, err := scanResultRows(rows, columns)if err != nil {return ExecuteSQLOutput{}, err}// Handle empty results (return empty array, not error)if results == nil {results = []map[string]any{}} - replacement in tools/sql.go at line 99[6.262563]→[3.2740:2849](∅→∅),[3.2849]→[6.262776:262779](∅→∅),[6.262776]→[6.262776:262779](∅→∅),[6.262779]→[3.2850:3036](∅→∅),[3.3036]→[6.262871:262874](∅→∅),[6.262871]→[6.262871:262874](∅→∅)
// Handle empty results (return empty array, not error)if results == nil {results = []map[string]any{}}// Detect truncation: if we auto-added limit+1 and got more than limit rowslimited := falseif autoAddedLimit && len(results) > limit {limited = trueresults = results[:limit]}// Detect truncation: if we auto-added limit+1 and got more than limit rowslimited := falseif autoAddedLimit && len(results) > limit {limited = trueresults = results[:limit]} - replacement in tools/sql.go at line 106
queryReported := buildQueryReported(input.Query, autoAddedLimit, limit)queryReported := buildQueryReported(input.Query, autoAddedLimit, limit) - replacement in tools/sql.go at line 108
return ExecuteSQLOutput{Rows: results,RowCount: len(results),Columns: columnInfo,Limited: limited,Query: queryReported,}, niloutput = ExecuteSQLOutput{Rows: results,RowCount: len(results),Columns: columnInfo,Limited: limited,Query: queryReported,}return nil})return output, err - edit in tools/pattern.go at line 99
}// Open writable database connectiondatabase, err := db.OpenWriteableDB(dbPath)if err != nil {return output, fmt.Errorf("database connection failed: %w", err) - edit in tools/pattern.go at line 100
defer database.Close() - replacement in tools/pattern.go at line 101
// Begin logged transactiontx, err := db.BeginLoggedTx(ctx, database, "create_or_update_pattern")if err != nil {return output, fmt.Errorf("failed to begin transaction: %w", err)}defer func() {if err != nil {tx.Rollback()err := db.WithWriteTx(ctx, dbPath, "create_or_update_pattern", func(database *sql.DB, tx *db.LoggedTx) error {// Check if pattern with same record_s/sleep_s already existsexisting, found, ferr := findExistingPattern(ctx, tx, *input.RecordSeconds, *input.SleepSeconds)if ferr != nil {return fmt.Errorf("failed to check for existing pattern: %w", ferr) - replacement in tools/pattern.go at line 107[6.282699]→[6.282699:282768](∅→∅),[6.282768]→[6.7606:7816](∅→∅),[6.7816]→[6.283487:283594](∅→∅),[6.283487]→[6.283487:283594](∅→∅)
}()// Check if pattern with same record_s/sleep_s already existsif existing, found, err := findExistingPattern(ctx, tx, *input.RecordSeconds, *input.SleepSeconds); err != nil {return output, fmt.Errorf("failed to check for existing pattern: %w", err)} else if found {if err = tx.Commit(); err != nil {return output, fmt.Errorf("failed to commit transaction: %w", err)if found {output.Pattern = existingoutput.Message = fmt.Sprintf("Pattern already exists with ID %s (record %ds, sleep %ds) - returning existing pattern",existing.ID, existing.RecordS, existing.SleepS)return nil // commit transaction - edit in tools/pattern.go at line 113[6.283598]→[6.7817:7845](∅→∅),[6.7845]→[6.283626:283747](∅→∅),[6.283626]→[6.283626:283747](∅→∅),[6.283747]→[6.7846:7897](∅→∅),[6.7897]→[6.283796:283817](∅→∅),[6.283796]→[6.283796:283817](∅→∅),[6.283928]→[6.283928:284415](∅→∅)
output.Pattern = existingoutput.Message = fmt.Sprintf("Pattern already exists with ID %s (record %ds, sleep %ds) - returning existing pattern",existing.ID, existing.RecordS, existing.SleepS)return output, nil}// Generate IDid, err := utils.GenerateShortID()if err != nil {return output, fmt.Errorf("failed to generate ID: %w", err)}// Insert pattern_, err = tx.ExecContext(ctx,"INSERT INTO cyclic_recording_pattern (id, record_s, sleep_s, created_at, last_modified, active) VALUES (?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, TRUE)",id, *input.RecordSeconds, *input.SleepSeconds,)if err != nil {return output, fmt.Errorf("failed to create pattern: %w", err)} - replacement in tools/pattern.go at line 114
// Fetch the created patternvar pattern db.CyclicRecordingPatternerr = tx.QueryRowContext(ctx,"SELECT id, record_s, sleep_s, created_at, last_modified, active FROM cyclic_recording_pattern WHERE id = ?",id,).Scan(&pattern.ID, &pattern.RecordS, &pattern.SleepS, &pattern.CreatedAt, &pattern.LastModified, &pattern.Active)if err != nil {return output, fmt.Errorf("failed to fetch created pattern: %w", err)}// Generate IDid, gerr := utils.GenerateShortID()if gerr != nil {return fmt.Errorf("failed to generate ID: %w", gerr)} - replacement in tools/pattern.go at line 120
if err = tx.Commit(); err != nil {return output, fmt.Errorf("failed to commit transaction: %w", err)}// Insert patternif _, err := tx.ExecContext(ctx,"INSERT INTO cyclic_recording_pattern (id, record_s, sleep_s, created_at, last_modified, active) VALUES (?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, TRUE)",id, *input.RecordSeconds, *input.SleepSeconds,); err != nil {return fmt.Errorf("failed to create pattern: %w", err)} - replacement in tools/pattern.go at line 128
output.Pattern = patternoutput.Message = fmt.Sprintf("Successfully created cyclic recording pattern with ID %s (record %ds, sleep %ds)",pattern.ID, pattern.RecordS, pattern.SleepS)// Fetch the created patternvar pattern db.CyclicRecordingPatternif err := tx.QueryRowContext(ctx,"SELECT id, record_s, sleep_s, created_at, last_modified, active FROM cyclic_recording_pattern WHERE id = ?",id,).Scan(&pattern.ID, &pattern.RecordS, &pattern.SleepS, &pattern.CreatedAt, &pattern.LastModified, &pattern.Active); err != nil {return fmt.Errorf("failed to fetch created pattern: %w", err)} - replacement in tools/pattern.go at line 137
return output, niloutput.Pattern = patternoutput.Message = fmt.Sprintf("Successfully created cyclic recording pattern with ID %s (record %ds, sleep %ds)",pattern.ID, pattern.RecordS, pattern.SleepS)return nil})return output, err - replacement in tools/pattern.go at line 145[6.285163]→[6.285163:285274](∅→∅),[6.285298]→[6.285298:285299](∅→∅),[6.285299]→[6.7898:7957](∅→∅),[6.7957]→[6.285394:285419](∅→∅),[6.285394]→[6.285394:285419](∅→∅),[6.285754]→[6.285754:285985](∅→∅)
func updatePattern(ctx context.Context, input PatternInput) (PatternOutput, error) {var output PatternOutputif err := validateUpdatePatternInput(input); err != nil {return output, err}// Open writable databasedatabase, err := db.OpenWriteableDB(dbPath)if err != nil {return output, fmt.Errorf("failed to open database: %w", err)}defer database.Close()// Verify pattern exists and check active status// verifyPatternExistsAndActive checks that a pattern exists and is active.func verifyPatternExistsAndActive(database *sql.DB, patternID string) error { - replacement in tools/pattern.go at line 148
err = database.QueryRow(err := database.QueryRow( - replacement in tools/pattern.go at line 150
*input.ID, *input.ID,patternID, patternID, - replacement in tools/pattern.go at line 153
return output, fmt.Errorf("failed to query pattern: %w", err)return fmt.Errorf("failed to query pattern: %w", err) - replacement in tools/pattern.go at line 156
return output, fmt.Errorf("pattern not found: %s", *input.ID)return fmt.Errorf("pattern not found: %s", patternID) - replacement in tools/pattern.go at line 159
return output, fmt.Errorf("pattern '%s' is not active (cannot update inactive patterns)", *input.ID)return fmt.Errorf("pattern '%s' is not active (cannot update inactive patterns)", patternID) - edit in tools/pattern.go at line 161
return nil} - replacement in tools/pattern.go at line 164
// Build dynamic UPDATE query// buildPatternUpdateQuery builds the dynamic UPDATE query and args for pattern fields.func buildPatternUpdateQuery(input PatternInput) (string, []any, error) { - replacement in tools/pattern.go at line 179
return output, fmt.Errorf("no fields provided to update")return "", nil, fmt.Errorf("no fields provided to update") - edit in tools/pattern.go at line 182
// Always update last_modified - edit in tools/pattern.go at line 186
return query, args, nil} - replacement in tools/pattern.go at line 189
// Begin logged transaction for updatetx, err := db.BeginLoggedTx(ctx, database, "create_or_update_pattern")if err != nil {return output, fmt.Errorf("failed to begin transaction: %w", err)func updatePattern(ctx context.Context, input PatternInput) (PatternOutput, error) {var output PatternOutputif err := validateUpdatePatternInput(input); err != nil {return output, err - replacement in tools/pattern.go at line 195
defer func() {if err != nil {tx.Rollback()err := db.WithWriteTx(ctx, dbPath, "create_or_update_pattern", func(database *sql.DB, tx *db.LoggedTx) error {if err := verifyPatternExistsAndActive(database, *input.ID); err != nil {return err - edit in tools/pattern.go at line 200
}() - replacement in tools/pattern.go at line 201
_, err = tx.Exec(query, args...)if err != nil {return output, fmt.Errorf("failed to update pattern: %w", err)}query, args, qerr := buildPatternUpdateQuery(input)if qerr != nil {return qerr} - replacement in tools/pattern.go at line 206[6.287532]→[6.287532:287733](∅→∅),[6.287733]→[6.8185:8198](∅→∅),[6.8198]→[6.287746:287954](∅→∅),[6.287746]→[6.287746:287954](∅→∅)
// Fetch the updated patternvar pattern db.CyclicRecordingPatternerr = tx.QueryRow("SELECT id, record_s, sleep_s, created_at, last_modified, active FROM cyclic_recording_pattern WHERE id = ?",*input.ID,).Scan(&pattern.ID, &pattern.RecordS, &pattern.SleepS, &pattern.CreatedAt, &pattern.LastModified, &pattern.Active)if err != nil {return output, fmt.Errorf("failed to fetch updated pattern: %w", err)}if _, err := tx.Exec(query, args...); err != nil {return fmt.Errorf("failed to update pattern: %w", err)} - replacement in tools/pattern.go at line 210
if err = tx.Commit(); err != nil {return output, fmt.Errorf("failed to commit transaction: %w", err)}// Fetch the updated patternvar pattern db.CyclicRecordingPatternif err := tx.QueryRow("SELECT id, record_s, sleep_s, created_at, last_modified, active FROM cyclic_recording_pattern WHERE id = ?",*input.ID,).Scan(&pattern.ID, &pattern.RecordS, &pattern.SleepS, &pattern.CreatedAt, &pattern.LastModified, &pattern.Active); err != nil {return fmt.Errorf("failed to fetch updated pattern: %w", err)} - replacement in tools/pattern.go at line 219
output.Pattern = patternoutput.Message = fmt.Sprintf("Successfully updated pattern (ID: %s, record %ds, sleep %ds)",pattern.ID, pattern.RecordS, pattern.SleepS)return output, niloutput.Pattern = patternoutput.Message = fmt.Sprintf("Successfully updated pattern (ID: %s, record %ds, sleep %ds)",pattern.ID, pattern.RecordS, pattern.SleepS)return nil})return output, err - replacement in tools/location.go at line 112
// Open writable database connectiondatabase, err := db.OpenWriteableDB(dbPath)if err != nil {return output, fmt.Errorf("database connection failed: %w", err)}defer database.Close()// Begin logged transactiontx, err := db.BeginLoggedTx(ctx, database, "create_or_update_location")if err != nil {return output, fmt.Errorf("failed to begin transaction: %w", err)}defer func() {if err != nil {tx.Rollback()err := db.WithWriteTx(ctx, dbPath, "create_or_update_location", func(database *sql.DB, tx *db.LoggedTx) error {if err := verifyDatasetExistsAndActive(ctx, tx, *input.DatasetID); err != nil {return err - edit in tools/location.go at line 116
}() - replacement in tools/location.go at line 117[6.291755]→[3.7039:7141](∅→∅),[3.7141]→[6.292249:292252](∅→∅),[6.292249]→[6.292249:292252](∅→∅),[6.292356]→[6.292356:292642](∅→∅),[6.292642]→[3.7142:7203](∅→∅),[3.7203]→[6.293577:293580](∅→∅),[6.293577]→[6.293577:293580](∅→∅)
if err := verifyDatasetExistsAndActive(ctx, tx, *input.DatasetID); err != nil {return output, err}// Check for existing location with same name in dataset (UNIQUE constraint)var existingID stringerr = tx.QueryRowContext(ctx,"SELECT id FROM location WHERE dataset_id = ? AND name = ? AND active = true",*input.DatasetID, *input.Name,).Scan(&existingID)if err == nil {return returnExistingLocation(ctx, tx, existingID, output)}// Check for existing location with same name in dataset (UNIQUE constraint)var existingID stringqerr := tx.QueryRowContext(ctx,"SELECT id FROM location WHERE dataset_id = ? AND name = ? AND active = true",*input.DatasetID, *input.Name,).Scan(&existingID) - replacement in tools/location.go at line 124
// Generate IDid, err := utils.GenerateShortID()if err != nil {return output, fmt.Errorf("failed to generate ID: %w", err)}if qerr == nil {result, rerr := returnExistingLocation(ctx, tx, existingID)if rerr != nil {return rerr}output = resultreturn nil} - replacement in tools/location.go at line 133
// Insert location_, err = tx.ExecContext(ctx,"INSERT INTO location (id, dataset_id, name, latitude, longitude, timezone_id, description, created_at, last_modified, active) VALUES (?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, TRUE)",id, *input.DatasetID, *input.Name, *input.Latitude, *input.Longitude, *input.TimezoneID, input.Description,)if err != nil {return output, fmt.Errorf("failed to create location: %w", err)}// Generate IDid, gerr := utils.GenerateShortID()if gerr != nil {return fmt.Errorf("failed to generate ID: %w", gerr)} - replacement in tools/location.go at line 139[6.294170]→[6.294170:294201](∅→∅),[6.294201]→[3.7204:7253](∅→∅),[3.7253]→[6.294616:294709](∅→∅),[6.294616]→[6.294616:294709](∅→∅)
// Fetch the created locationlocation, err := fetchLocationByID(ctx, tx, id)if err != nil {return output, fmt.Errorf("failed to fetch created location: %w", err)}// Insert locationif _, err := tx.ExecContext(ctx,"INSERT INTO location (id, dataset_id, name, latitude, longitude, timezone_id, description, created_at, last_modified, active) VALUES (?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, TRUE)",id, *input.DatasetID, *input.Name, *input.Latitude, *input.Longitude, *input.TimezoneID, input.Description,); err != nil {return fmt.Errorf("failed to create location: %w", err)} - replacement in tools/location.go at line 147
if err = tx.Commit(); err != nil {return output, fmt.Errorf("failed to commit transaction: %w", err)}// Fetch the created locationlocation, ferr := fetchLocationByID(ctx, tx, id)if ferr != nil {return fmt.Errorf("failed to fetch created location: %w", ferr)} - replacement in tools/location.go at line 153
output.Location = locationoutput.Message = fmt.Sprintf("Successfully created location '%s' with ID %s (%.6f, %.6f, %s)",location.Name, location.ID, location.Latitude, location.Longitude, location.TimezoneID)return output, niloutput.Location = locationoutput.Message = fmt.Sprintf("Successfully created location '%s' with ID %s (%.6f, %.6f, %s)",location.Name, location.ID, location.Latitude, location.Longitude, location.TimezoneID)return nil})return output, err - replacement in tools/location.go at line 162
func returnExistingLocation(ctx context.Context, tx *db.LoggedTx, existingID string, output LocationOutput) (LocationOutput, error) {// Caller is responsible for committing the transaction.func returnExistingLocation(ctx context.Context, tx *db.LoggedTx, existingID string) (LocationOutput, error) {var output LocationOutput - edit in tools/location.go at line 168
}if err = tx.Commit(); err != nil {return output, fmt.Errorf("failed to commit transaction: %w", err) - replacement in tools/location.go at line 175
func verifyDatasetExistsAndActive(ctx context.Context, queryer interface {QueryRowContext(context.Context, string, ...any) *sql.Row}, datasetID string) error {var exists, active boolerr := queryer.QueryRowContext(ctx,"SELECT EXISTS(SELECT 1 FROM dataset WHERE id = ?), COALESCE((SELECT active FROM dataset WHERE id = ?), false)",datasetID, datasetID,).Scan(&exists, &active)if err != nil {return fmt.Errorf("failed to verify dataset: %w", err)}if !exists {return fmt.Errorf("dataset with ID '%s' does not exist", datasetID)}if !active {return fmt.Errorf("dataset (ID: %s) is not active", datasetID)}return nilfunc verifyDatasetExistsAndActive(ctx context.Context, q db.Querier, datasetID string) error {_, err := db.DatasetExistsAndActive(q, datasetID)return err - replacement in tools/location.go at line 198
// Open writable databasedatabase, err := db.OpenWriteableDB(dbPath)if err != nil {return output, fmt.Errorf("failed to open database: %w", err)}defer database.Close()err := db.WithWriteTx(ctx, dbPath, "create_or_update_location", func(database *sql.DB, tx *db.LoggedTx) error {if err := verifyLocationExistsAndActive(database, locationID); err != nil {return err} - replacement in tools/location.go at line 203
if err := verifyLocationExistsAndActive(database, locationID); err != nil {return output, err}// Verify dataset exists if DatasetID provided (relationship consistency)if input.DatasetID != nil {if err := verifyDatasetExistsAndActive(ctx, database, *input.DatasetID); err != nil {return err}} - replacement in tools/location.go at line 210
// Verify dataset exists if DatasetID provided (relationship consistency)if input.DatasetID != nil {if err := verifyDatasetExistsAndActive(context.Background(), database, *input.DatasetID); err != nil {return output, errupdates, args, uerr := buildLocationUpdates(input, locationID)if uerr != nil {return uerr - replacement in tools/location.go at line 214
}updates, args, err := buildLocationUpdates(input, locationID)if err != nil {return output, err}query := fmt.Sprintf("UPDATE location SET %s WHERE id = ?", strings.Join(updates, ", "))query := fmt.Sprintf("UPDATE location SET %s WHERE id = ?", strings.Join(updates, ", ")) - replacement in tools/location.go at line 216[3.9087]→[3.9087:9304](∅→∅),[3.9304]→[6.296787:296805](∅→∅),[6.296787]→[6.296787:296805](∅→∅),[6.296805]→[3.9305:9322](∅→∅)
// Begin logged transaction for updatetx, err := db.BeginLoggedTx(ctx, database, "create_or_update_location")if err != nil {return output, fmt.Errorf("failed to begin transaction: %w", err)}defer func() {if err != nil {tx.Rollback()if _, err := tx.ExecContext(ctx, query, args...); err != nil {return fmt.Errorf("failed to update location: %w", err) - edit in tools/location.go at line 219
}()_, err = tx.ExecContext(ctx, query, args...)if err != nil {return output, fmt.Errorf("failed to update location: %w", err)}// Fetch the updated locationlocation, err := fetchLocationByID(ctx, tx, locationID)if err != nil {return output, fmt.Errorf("failed to fetch updated location: %w", err)} - replacement in tools/location.go at line 220
if err = tx.Commit(); err != nil {return output, fmt.Errorf("failed to commit transaction: %w", err)}// Fetch the updated locationlocation, ferr := fetchLocationByID(ctx, tx, locationID)if ferr != nil {return fmt.Errorf("failed to fetch updated location: %w", ferr)} - replacement in tools/location.go at line 226
output.Location = locationoutput.Message = fmt.Sprintf("Successfully updated location '%s' (ID: %s)", location.Name, location.ID)return output, niloutput.Location = locationoutput.Message = fmt.Sprintf("Successfully updated location '%s' (ID: %s)", location.Name, location.ID)return nil})return output, err - edit in tools/import_unstructured.go at line 5
"database/sql" - replacement in tools/import_unstructured.go at line 56
// Open databasedatabase, err := db.OpenWriteableDB(dbPath)if err != nil {return output, fmt.Errorf("failed to open database: %w", err)}defer database.Close()// Scan for WAV files// Scan for WAV files (no DB needed) - replacement in tools/import_unstructured.go at line 66
// Begin logged transactiontx, err := db.BeginLoggedTx(ctx, database, "import_unstructured")if err != nil {return output, fmt.Errorf("failed to begin transaction: %w", err)}defer func() {if err != nil {tx.Rollback()}}()err := db.WithWriteTx(ctx, dbPath, "import_unstructured", func(database *sql.DB, tx *db.LoggedTx) error {// Process each filefor _, filePath := range files {fileResult, procErr := processUnstructuredFile(tx, filePath, input.DatasetID) - replacement in tools/import_unstructured.go at line 71
// Process each filefor _, filePath := range files {fileResult, procErr := processUnstructuredFile(tx, filePath, input.DatasetID)if procErr != nil {output.FailedFiles++output.Errors = append(output.Errors, utils.FileImportError{FileName: filepath.Base(filePath),Error: procErr.Error(),Stage: utils.StageProcess,})continue} - replacement in tools/import_unstructured.go at line 81[6.310152]→[6.310152:310332](∅→∅),[6.310332]→[5.2792:2826](∅→∅),[5.2826]→[6.310357:310375](∅→∅),[6.310357]→[6.310357:310375](∅→∅)
if procErr != nil {output.FailedFiles++output.Errors = append(output.Errors, utils.FileImportError{FileName: filepath.Base(filePath),Error: procErr.Error(),Stage: utils.StageProcess,})continueif fileResult.Skipped {output.SkippedFiles++} else {output.ImportedFiles++output.TotalDuration += fileResult.Duration} - replacement in tools/import_unstructured.go at line 88
if fileResult.Skipped {output.SkippedFiles++} else {output.ImportedFiles++output.TotalDuration += fileResult.Duration}return nil})if err != nil {return output, err - edit in tools/import_unstructured.go at line 94
// Commit transactionif err = tx.Commit(); err != nil {return output, fmt.Errorf("failed to commit transaction: %w", err)} - edit in tools/import_unstructured.go at line 188
}// Open database for validationdatabase, err := db.OpenReadOnlyDB(dbPath)if err != nil {return fmt.Errorf("failed to open database: %w", err) - edit in tools/import_unstructured.go at line 189
defer database.Close() - replacement in tools/import_unstructured.go at line 190
// Verify dataset exists and is activevar datasetExists boolerr = database.QueryRow("SELECT EXISTS(SELECT 1 FROM dataset WHERE id = ? AND active = true)",input.DatasetID,).Scan(&datasetExists)if err != nil {return fmt.Errorf("failed to query dataset: %w", err)}if !datasetExists {return fmt.Errorf("dataset not found or inactive: %s", input.DatasetID)}return db.WithReadDB(dbPath, func(database *sql.DB) error {// Verify dataset exists and is activeif _, err := db.DatasetExistsAndActive(database, input.DatasetID); err != nil {return err} - replacement in tools/import_unstructured.go at line 196[6.313965]→[6.313965:314007](∅→∅),[6.314007]→[6.5401:5488](∅→∅),[6.5488]→[6.314097:314113](∅→∅),[6.314097]→[6.314097:314113](∅→∅)
// Verify dataset is 'unstructured' typeif err := db.ValidateDatasetTypeUnstructured(database, input.DatasetID); err != nil {return err}// Verify dataset is 'unstructured' typeif err := db.ValidateDatasetTypeUnstructured(database, input.DatasetID); err != nil {return err} - replacement in tools/import_unstructured.go at line 201
return nilreturn nil}) - replacement in tools/import_segments.go at line 243
var datasetType stringerr := dbConn.QueryRow(`SELECT type FROM dataset WHERE id = ? AND active = true`, datasetID).Scan(&datasetType)if err == sql.ErrNoRows {return fmt.Errorf("dataset not found: %s", datasetID)}if err != nil {return fmt.Errorf("failed to query dataset: %w", err)if err := db.ValidateDatasetTypeForImport(dbConn, datasetID); err != nil {return err - edit in tools/import_segments.go at line 246
if datasetType != "structured" {return fmt.Errorf("dataset must be 'structured' type, got: %s", datasetType)} - replacement in tools/import_segments.go at line 248
var locationExists boolerr = dbConn.QueryRow(`SELECT EXISTS(SELECT 1 FROM location WHERE id = ? AND dataset_id = ? AND active = true)`, locationID, datasetID).Scan(&locationExists)if err != nil {return fmt.Errorf("failed to query location: %w", err)if err := db.ValidateLocationBelongsToDataset(dbConn, locationID, datasetID); err != nil {return err - edit in tools/import_segments.go at line 251
if !locationExists {return fmt.Errorf("location not found or not linked to dataset: %s", locationID)} - replacement in tools/import_segments.go at line 253
var clusterExists boolerr = dbConn.QueryRow(`SELECT EXISTS(SELECT 1 FROM cluster WHERE id = ? AND location_id = ? AND active = true)`, clusterID, locationID).Scan(&clusterExists)if err != nil {return fmt.Errorf("failed to query cluster: %w", err)}if !clusterExists {return fmt.Errorf("cluster not found or not linked to location: %s", clusterID)if err := db.ClusterBelongsToLocation(dbConn, clusterID, locationID); err != nil {return err - replacement in tools/import_files.go at line 130
// Open database for validation queriesdatabase, err := db.OpenReadOnlyDB(dbPath)if err != nil {return fmt.Errorf("failed to open database: %w", err)}defer database.Close()return db.WithReadDB(dbPath, func(database *sql.DB) error {// Verify dataset exists, is active, and is 'structured' typeif err := db.ValidateDatasetTypeForImport(database, datasetID); err != nil {return err} - replacement in tools/import_files.go at line 136
// Verify dataset exists and is activevar datasetExists boolerr = database.QueryRow("SELECT EXISTS(SELECT 1 FROM dataset WHERE id = ? AND active = true)", datasetID).Scan(&datasetExists)if err != nil {return fmt.Errorf("failed to query dataset: %w", err)}if !datasetExists {return fmt.Errorf("dataset not found or inactive: %s", datasetID)}// Verify location exists and belongs to datasetif err := db.ValidateLocationBelongsToDataset(database, locationID, datasetID); err != nil {return err} - replacement in tools/import_files.go at line 141[6.349907]→[6.349907:349995](∅→∅),[6.349995]→[6.5726:5804](∅→∅),[6.5804]→[6.350076:351117](∅→∅),[6.350076]→[6.350076:351117](∅→∅)
// Verify dataset is 'structured' type (file imports only support structured datasets)if err := db.ValidateDatasetTypeForImport(database, datasetID); err != nil {return err}// Verify location exists and belongs to datasetvar locationDatasetID stringerr = database.QueryRow("SELECT dataset_id FROM location WHERE id = ? AND active = true", locationID).Scan(&locationDatasetID)if err == sql.ErrNoRows {return fmt.Errorf("location not found or inactive: %s", locationID)}if err != nil {return fmt.Errorf("failed to query location: %w", err)}if locationDatasetID != datasetID {return fmt.Errorf("location %s does not belong to dataset %s", locationID, datasetID)}// Verify cluster exists and belongs to locationvar clusterLocationID stringerr = database.QueryRow("SELECT location_id FROM cluster WHERE id = ? AND active = true", clusterID).Scan(&clusterLocationID)if err == sql.ErrNoRows {return fmt.Errorf("cluster not found or inactive: %s", clusterID)}if err != nil {return fmt.Errorf("failed to query cluster: %w", err)}if clusterLocationID != locationID {return fmt.Errorf("cluster %s does not belong to location %s", clusterID, locationID)}// Verify cluster exists and belongs to locationif err := db.ClusterBelongsToLocation(database, clusterID, locationID); err != nil {return err} - replacement in tools/import_files.go at line 146
return nilreturn nil}) - replacement in tools/dataset.go at line 59
// Open writable database connectiondatabase, err := db.OpenWriteableDB(dbPath)if err != nil {return output, fmt.Errorf("database connection failed: %w", err)}defer database.Close()err = db.WithWriteTx(ctx, dbPath, "create_or_update_dataset", func(database *sql.DB, tx *db.LoggedTx) error {// Check for existing dataset with same name (UNIQUE constraint)var existingID stringqerr := tx.QueryRowContext(ctx,"SELECT id FROM dataset WHERE name = ? AND active = true",*input.Name,).Scan(&existingID) - replacement in tools/dataset.go at line 67
// Begin logged transactiontx, err := db.BeginLoggedTx(ctx, database, "create_or_update_dataset")if err != nil {return output, fmt.Errorf("failed to begin transaction: %w", err)}defer func() {if err != nil {tx.Rollback()if qerr == nil {result, herr := handleExistingDataset(ctx, tx, existingID)if herr != nil {return herr}output = resultreturn nil - edit in tools/dataset.go at line 75
}() - replacement in tools/dataset.go at line 76
// Check for existing dataset with same name (UNIQUE constraint)var existingID stringerr = tx.QueryRowContext(ctx,"SELECT id FROM dataset WHERE name = ? AND active = true",*input.Name,).Scan(&existingID)if err == nil {return handleExistingDataset(ctx, tx, existingID)}return insertNewDataset(ctx, tx, *input.Name, input.Description, datasetType)result, insErr := insertNewDataset(ctx, tx, *input.Name, input.Description, datasetType)if insErr != nil {return insErr}output = resultreturn nil})return output, err - edit in tools/dataset.go at line 108
// Caller is responsible for committing the transaction. - edit in tools/dataset.go at line 117
}if err = tx.Commit(); err != nil {return DatasetOutput{}, fmt.Errorf("failed to commit transaction: %w", err) - edit in tools/dataset.go at line 126
// Caller is responsible for committing the transaction. - edit in tools/dataset.go at line 148
}if err = tx.Commit(); err != nil {return DatasetOutput{}, fmt.Errorf("failed to commit transaction: %w", err) - replacement in tools/dataset.go at line 226
// Open writable databasedatabase, err := db.OpenWriteableDB(dbPath)if err != nil {return output, fmt.Errorf("failed to open database: %w", err)}defer database.Close()err := db.WithWriteTx(ctx, dbPath, "create_or_update_dataset", func(database *sql.DB, tx *db.LoggedTx) error {// Verify dataset exists and check active statusif err := verifyDatasetActive(database, datasetID); err != nil {return err} - replacement in tools/dataset.go at line 232
// Verify dataset exists and check active statusif err := verifyDatasetActive(database, datasetID); err != nil {return output, err}// Build dynamic UPDATE queryquery, args, qerr := buildUpdateQuery(input, datasetID)if qerr != nil {return qerr} - replacement in tools/dataset.go at line 238
// Build dynamic UPDATE queryquery, args, err := buildUpdateQuery(input, datasetID)if err != nil {return output, err}if _, err := tx.Exec(query, args...); err != nil {return fmt.Errorf("failed to update dataset: %w", err)} - replacement in tools/dataset.go at line 242
// Begin logged transaction for updatetx, err := db.BeginLoggedTx(ctx, database, "create_or_update_dataset")if err != nil {return output, fmt.Errorf("failed to begin transaction: %w", err)}defer func() {// Fetch the updated datasetvar dataset db.Dataseterr := tx.QueryRow("SELECT id, name, description, created_at, last_modified, active, type FROM dataset WHERE id = ?",datasetID,).Scan(&dataset.ID, &dataset.Name, &dataset.Description, &dataset.CreatedAt, &dataset.LastModified, &dataset.Active, &dataset.Type) - replacement in tools/dataset.go at line 249
tx.Rollback()return fmt.Errorf("failed to fetch updated dataset: %w", err) - edit in tools/dataset.go at line 251
}()_, err = tx.Exec(query, args...)if err != nil {return output, fmt.Errorf("failed to update dataset: %w", err)}// Fetch the updated datasetvar dataset db.Dataseterr = tx.QueryRow("SELECT id, name, description, created_at, last_modified, active, type FROM dataset WHERE id = ?",datasetID,).Scan(&dataset.ID, &dataset.Name, &dataset.Description, &dataset.CreatedAt, &dataset.LastModified, &dataset.Active, &dataset.Type)if err != nil {return output, fmt.Errorf("failed to fetch updated dataset: %w", err)}if err = tx.Commit(); err != nil {return output, fmt.Errorf("failed to commit transaction: %w", err)}output.Dataset = datasetoutput.Message = fmt.Sprintf("Successfully updated dataset '%s' (ID: %s)", dataset.Name, dataset.ID) - replacement in tools/dataset.go at line 252
return output, niloutput.Dataset = datasetoutput.Message = fmt.Sprintf("Successfully updated dataset '%s' (ID: %s)", dataset.Name, dataset.ID)return nil})return output, err - replacement in tools/cluster.go at line 85
// Open writable database connectiondatabase, err := db.OpenWriteableDB(dbPath)if err != nil {return output, fmt.Errorf("database connection failed: %w", err)}defer database.Close()// Begin logged transactiontx, err := db.BeginLoggedTx(ctx, database, "create_or_update_cluster")if err != nil {return output, fmt.Errorf("failed to begin transaction: %w", err)}defer func() {if err != nil {tx.Rollback()err := db.WithWriteTx(ctx, dbPath, "create_or_update_cluster", func(database *sql.DB, tx *db.LoggedTx) error {// Verify parent references exist and are activedatasetName, locationName, verr := verifyClusterParentRefs(ctx, tx, input)if verr != nil {return verr - edit in tools/cluster.go at line 91
}() - replacement in tools/cluster.go at line 92[6.383066]→[4.6055:6180](∅→∅),[6.2277]→[6.383659:383676](∅→∅),[4.6180]→[6.383659:383676](∅→∅),[6.383659]→[6.383659:383676](∅→∅),[6.383676]→[6.2278:2299](∅→∅),[6.385120]→[6.385120:385123](∅→∅)
// Verify parent references exist and are activedatasetName, locationName, err := verifyClusterParentRefs(ctx, tx, input)if err != nil {return output, err}// Check for existing cluster with same name in location (UNIQUE constraint)existing, findErr := findExistingClusterInLocation(ctx, tx, *input.LocationID, *input.Name)if findErr == nil {output.Cluster = existingoutput.Message = fmt.Sprintf("Cluster '%s' already exists in location '%s' (ID: %s) - returning existing cluster", existing.Name, locationName, existing.ID)return nil // commit transaction} - replacement in tools/cluster.go at line 100[6.385124]→[6.385124:385202](∅→∅),[6.385202]→[6.2411:2500](∅→∅),[6.2500]→[6.385393:385410](∅→∅),[6.385393]→[6.385393:385410](∅→∅),[6.2562]→[6.386066:386173](∅→∅),[6.386066]→[6.386066:386173](∅→∅)
// Check for existing cluster with same name in location (UNIQUE constraint)existing, err := findExistingClusterInLocation(ctx, tx, *input.LocationID, *input.Name)if err == nil {if err = tx.Commit(); err != nil {return output, fmt.Errorf("failed to commit transaction: %w", err)result, insErr := insertNewCluster(ctx, tx, input, datasetName, locationName)if insErr != nil {return insErr - replacement in tools/cluster.go at line 104[6.386177]→[6.2563:2750](∅→∅),[6.2750]→[6.386362:386387](∅→∅),[6.386362]→[6.386362:386387](∅→∅),[6.386387]→[4.6181:6249](∅→∅)
output.Cluster = existingoutput.Message = fmt.Sprintf("Cluster '%s' already exists in location '%s' (ID: %s) - returning existing cluster", existing.Name, locationName, existing.ID)return output, nil}return insertNewCluster(ctx, tx, input, datasetName, locationName)output = resultreturn nil // commit transaction})return output, err - replacement in tools/cluster.go at line 112
datasetName, err := verifyDatasetForCluster(ctx, tx, *input.DatasetID)datasetName, err := db.DatasetExistsAndActive(tx, *input.DatasetID) - replacement in tools/cluster.go at line 117
locationName, err := verifyLocationForCluster(ctx, tx, *input.LocationID, *input.DatasetID, datasetName)locationName, err := db.LocationBelongsToDataset(tx, *input.LocationID, *input.DatasetID) - edit in tools/cluster.go at line 132
// Caller is responsible for committing the transaction. - edit in tools/cluster.go at line 150
}if err = tx.Commit(); err != nil {return ClusterOutput{}, fmt.Errorf("failed to commit transaction: %w", err) - edit in tools/cluster.go at line 184
// verifyDatasetForCluster verifies dataset exists and is active within a transactionfunc verifyDatasetForCluster(ctx context.Context, tx *db.LoggedTx, datasetID string) (string, error) {var exists, active boolvar name stringerr := tx.QueryRowContext(ctx,"SELECT EXISTS(SELECT 1 FROM dataset WHERE id = ?), COALESCE((SELECT active FROM dataset WHERE id = ?), false), COALESCE((SELECT name FROM dataset WHERE id = ?), '')",datasetID, datasetID, datasetID,).Scan(&exists, &active, &name)if err != nil {return "", fmt.Errorf("failed to verify dataset: %w", err)}if !exists {return "", fmt.Errorf("dataset with ID '%s' does not exist", datasetID)}if !active {return "", fmt.Errorf("dataset '%s' (ID: %s) is not active", name, datasetID)}return name, nil}// verifyLocationForCluster verifies location exists, is active, and belongs to the datasetfunc verifyLocationForCluster(ctx context.Context, tx *db.LoggedTx, locationID, datasetID, datasetName string) (string, error) {var exists, active boolvar name, locDatasetID stringerr := tx.QueryRowContext(ctx,"SELECT EXISTS(SELECT 1 FROM location WHERE id = ?), COALESCE((SELECT active FROM location WHERE id = ?), false), COALESCE((SELECT name FROM location WHERE id = ?), ''), COALESCE((SELECT dataset_id FROM location WHERE id = ?), '')",locationID, locationID, locationID, locationID,).Scan(&exists, &active, &name, &locDatasetID)if err != nil {return "", fmt.Errorf("failed to verify location: %w", err)}if !exists {return "", fmt.Errorf("location with ID '%s' does not exist", locationID)}if !active {return "", fmt.Errorf("location '%s' (ID: %s) is not active", name, locationID)}if locDatasetID != datasetID {return "", fmt.Errorf("location '%s' (ID: %s) does not belong to dataset '%s' (ID: %s) - it belongs to dataset ID '%s'",name, locationID, datasetName, datasetID, locDatasetID)}return name, nil} - replacement in tools/cluster.go at line 
348[6.12965]→[6.12965:13212](∅→∅),[6.13212]→[6.9536:9628](∅→∅),[6.9628]→[6.13468:13576](∅→∅),[6.13468]→[6.13468:13576](∅→∅)
database, err := db.OpenWriteableDB(dbPath)if err != nil {return output, fmt.Errorf("failed to open database: %w", err)}defer database.Close()if err := validateClusterActive(database, clusterID); err != nil {return output, err}if err := validateClusterCyclicPattern(database, input); err != nil {return output, err}query, args, err := buildClusterUpdateQuery(input, clusterID)if err != nil {return output, err}err = db.WithWriteTx(ctx, dbPath, "create_or_update_cluster", func(database *sql.DB, tx *db.LoggedTx) error {if err := validateClusterActive(database, clusterID); err != nil {return err} - replacement in tools/cluster.go at line 353
tx, err := db.BeginLoggedTx(ctx, database, "create_or_update_cluster")if err != nil {return output, fmt.Errorf("failed to begin transaction: %w", err)}defer func() {if err != nil {tx.Rollback()if err := validateClusterCyclicPattern(database, input); err != nil {return err - edit in tools/cluster.go at line 356
}() - replacement in tools/cluster.go at line 357
if _, err = tx.Exec(query, args...); err != nil {return output, fmt.Errorf("failed to update cluster: %w", err)}query, args, qerr := buildClusterUpdateQuery(input, clusterID)if qerr != nil {return qerr} - replacement in tools/cluster.go at line 362
cluster, err := fetchClusterByID(ctx, tx, clusterID)if err != nil {return output, fmt.Errorf("failed to fetch updated cluster: %w", err)}if _, err := tx.Exec(query, args...); err != nil {return fmt.Errorf("failed to update cluster: %w", err)} - replacement in tools/cluster.go at line 366
if err = tx.Commit(); err != nil {return output, fmt.Errorf("failed to commit transaction: %w", err)}cluster, ferr := fetchClusterByID(ctx, tx, clusterID)if ferr != nil {return fmt.Errorf("failed to fetch updated cluster: %w", ferr)} - replacement in tools/cluster.go at line 371
output.Cluster = clusteroutput.Message = fmt.Sprintf("Successfully updated cluster '%s' (ID: %s)", cluster.Name, cluster.ID)return output, niloutput.Cluster = clusteroutput.Message = fmt.Sprintf("Successfully updated cluster '%s' (ID: %s)", cluster.Name, cluster.ID)return nil})return output, err - replacement in tools/calls_from_raven.go at line 8
"sort""strconv" - edit in tools/calls_from_raven.go at line 10
"sync""sync/atomic" - edit in tools/calls_from_raven.go at line 33[6.459304]→[6.459304:459778](∅→∅),[6.459778]→[6.9638:9980](∅→∅),[6.9980]→[6.459778:460898](∅→∅),[6.459778]→[6.459778:460898](∅→∅)
}// RavenSelection represents a single Raven selectiontype RavenSelection struct {StartTime float64EndTime float64FreqLow float64FreqHigh float64Species string}// ravenJob represents a single Raven file to processtype ravenJob struct {ravenFile string}// ravenResult represents the result of processing a single Raven filetype ravenResult struct {ravenFile stringcalls []ClusteredCallwritten boolskipped boolerr error}func (r ravenResult) filePath() string { return r.ravenFile }func (r ravenResult) getCalls() []ClusteredCall { return r.calls }func (r ravenResult) wasWritten() bool { return r.written }func (r ravenResult) wasSkipped() bool { return r.skipped }func (r ravenResult) getError() error { return r.err }// CallsFromRaven processes Raven selection files and writes .data filesfunc CallsFromRaven(input CallsFromRavenInput) (CallsFromRavenOutput, error) {var output CallsFromRavenOutputoutput.Filter = "Raven"// Collect Raven files to processvar ravenFiles []stringif input.File != "" {ravenFiles = []string{input.File}} else if input.Folder != "" {var err errorravenFiles, err = findRavenFiles(input.Folder)if err != nil {errMsg := fmt.Sprintf("Failed to find Raven files: %v", err)output.Error = &errMsgreturn output, fmt.Errorf("%s", errMsg)}} else {errMsg := "Either --folder or --file must be specified"output.Error = &errMsgreturn output, fmt.Errorf("%s", errMsg)}if len(ravenFiles) == 0 {errMsg := "No Raven files found"output.Error = &errMsgreturn output, fmt.Errorf("%s", errMsg)}// Single file or small batch: process sequentially (avoid goroutine overhead)if len(ravenFiles) < 10 {return callsFromRavenSequential(input, ravenFiles)}// Large batch: parallel processing with DirCachereturn callsFromRavenParallel(input, ravenFiles) - edit in tools/calls_from_raven.go at line 34
// callsFromRavenSequential processes Raven files one at a time (for small batches)func callsFromRavenSequential(input CallsFromRavenInput, ravenFiles []string) (CallsFromRavenOutput, error) {var output CallsFromRavenOutputoutput.Filter = "Raven"// Build DirCache once for the folder (even sequential benefits from avoiding repeated dir scans)dirCaches := make(map[string]*DirCache)if input.Folder != "" {dirCaches[input.Folder] = NewDirCache(input.Folder)}speciesCount := make(map[string]int)var allCalls []ClusteredCalldataFilesWritten := 0dataFilesSkipped := 0filesProcessed := 0filesDeleted := 0for _, ravenFile := range ravenFiles {dir := filepath.Dir(ravenFile)cache := dirCaches[dir]if cache == nil {cache = NewDirCache(dir)dirCaches[dir] = cache}calls, written, skipped, err := processRavenFileCached(ravenFile, cache)if err != nil {errMsg := fmt.Sprintf("Error processing %s: %v", ravenFile, err)output.Error = &errMsgreturn output, fmt.Errorf("%s", errMsg)}if written {dataFilesWritten++}if skipped {dataFilesSkipped++} - replacement in tools/calls_from_raven.go at line 35
for _, call := range calls {allCalls = append(allCalls, call)speciesCount[call.EbirdCode]++}// ravenSource implements CallSource for Raven selection filestype ravenSource struct{} - replacement in tools/calls_from_raven.go at line 38
filesProcessed++func (ravenSource) Name() string { return "Raven" } - replacement in tools/calls_from_raven.go at line 40[6.462155]→[6.462155:463660](∅→∅),[6.463660]→[6.9981:10026](∅→∅),[6.10026]→[6.463702:464093](∅→∅),[6.463702]→[6.463702:464093](∅→∅),[6.464093]→[6.10027:10119](∅→∅),[6.10119]→[6.464897:464898](∅→∅),[6.464897]→[6.464897:464898](∅→∅),[6.464898]→[6.10120:10183](∅→∅),[6.10183]→[6.465103:465128](∅→∅),[6.465103]→[6.465103:465128](∅→∅),[6.465128]→[6.10184:10216](∅→∅),[6.10216]→[6.465154:465158](∅→∅),[6.465154]→[6.465154:465158](∅→∅),[6.465158]→[6.10217:10254](∅→∅),[6.10254]→[6.465399:465400](∅→∅),[6.465399]→[6.465399:465400](∅→∅),[6.465400]→[6.10255:10551](∅→∅),[6.10551]→[6.465660:465743](∅→∅),[6.465660]→[6.465660:465743](∅→∅),[6.465743]→[6.10552:10665](∅→∅),[6.10665]→[6.465853:466500](∅→∅),[6.465853]→[6.465853:466500](∅→∅)
// Delete if requested and successfully processedif input.Delete && written {if err := os.Remove(ravenFile); err != nil {errMsg := fmt.Sprintf("Failed to delete %s: %v", ravenFile, err)output.Error = &errMsgreturn output, fmt.Errorf("%s", errMsg)}filesDeleted++}if input.ProgressHandler != nil {input.ProgressHandler(filesProcessed, len(ravenFiles), filepath.Base(ravenFile))}}// Sort all calls by file, then start timesort.Slice(allCalls, func(i, j int) bool {if allCalls[i].File != allCalls[j].File {return allCalls[i].File < allCalls[j].File}return allCalls[i].StartTime < allCalls[j].StartTime})output.Calls = allCallsoutput.TotalCalls = len(allCalls)output.SpeciesCount = speciesCountoutput.DataFilesWritten = dataFilesWrittenoutput.DataFilesSkipped = dataFilesSkippedoutput.FilesProcessed = filesProcessedoutput.FilesDeleted = filesDeletedreturn output, nil}// callsFromRavenParallel processes Raven files concurrently using a worker pool and DirCachefunc callsFromRavenParallel(input CallsFromRavenInput, ravenFiles []string) (CallsFromRavenOutput, error) {var output CallsFromRavenOutputoutput.Filter = "Raven"total := len(ravenFiles)var processed atomic.Int32// Build DirCache for the folderdirCaches := &sync.Map{}if input.Folder != "" {cache := NewDirCache(input.Folder)dirCaches.Store(input.Folder, cache)}// Create job and result channelsjobs := make(chan ravenJob, total)results := make(chan parallelResult, total)// Start workersvar wg sync.WaitGroupfor range DOT_DATA_WORKERS {wg.Add(1)go ravenWorker(dirCaches, jobs, results, &wg)}// Send jobsfor _, ravenFile := range ravenFiles {jobs <- ravenJob{ravenFile: ravenFile}}close(jobs)// Wait for workers to finish, then close resultsgo func() {wg.Wait()close(results)}()// Collect results with progress reportingstats := aggregateResults(results, total, &processed, input.Delete, input.ProgressHandler)if stats.firstErr != nil {errMsg := stats.firstErr.Error()output.Error = &errMsgreturn output, 
stats.firstErr}sortCallsByFileAndTime(stats.calls)output.Calls = stats.callsoutput.TotalCalls = len(stats.calls)output.SpeciesCount = stats.speciesCountoutput.DataFilesWritten = stats.dataFilesWrittenoutput.DataFilesSkipped = stats.dataFilesSkippedoutput.FilesProcessed = stats.filesProcessedoutput.FilesDeleted = stats.filesDeletedreturn output, nil}// ravenWorker processes Raven files from the jobs channelfunc ravenWorker(dirCaches *sync.Map, jobs <-chan ravenJob, results chan<- parallelResult, wg *sync.WaitGroup) {defer wg.Done()for job := range jobs {dir := filepath.Dir(job.ravenFile)// Get or create DirCache for this directoryvar cache *DirCacheif cached, ok := dirCaches.Load(dir); ok {cache = cached.(*DirCache)} else {cache = NewDirCache(dir)dirCaches.Store(dir, cache)}calls, written, skipped, err := processRavenFileCached(job.ravenFile, cache)results <- ravenResult{ravenFile: job.ravenFile,calls: calls,written: written,skipped: skipped,err: err,}}}// findRavenFiles finds all Raven selection files in a folderfunc findRavenFiles(folder string) ([]string, error) {func (ravenSource) FindFiles(folder string) ([]string, error) { - edit in tools/calls_from_raven.go at line 57
func (ravenSource) ProcessFile(ravenFile string, cache *DirCache) ([]ClusteredCall, bool, bool, error) {return processRavenFileCached(ravenFile, cache)}// CallsFromRaven processes Raven selection files and writes .data filesfunc CallsFromRaven(input CallsFromRavenInput) (CallsFromRavenOutput, error) {src := ravenSource{}commonInput := CallsFromSourceInput(input) - edit in tools/calls_from_raven.go at line 67
commonOutput, err := callsFromSource(src, commonInput)// Convert to Raven-specific output typevar output CallsFromRavenOutputoutput.Calls = commonOutput.Callsoutput.TotalCalls = commonOutput.TotalCallsoutput.SpeciesCount = commonOutput.SpeciesCountoutput.DataFilesWritten = commonOutput.DataFilesWrittenoutput.DataFilesSkipped = commonOutput.DataFilesSkippedoutput.FilesProcessed = commonOutput.FilesProcessedoutput.FilesDeleted = commonOutput.FilesDeletedoutput.Filter = commonOutput.Filteroutput.Error = commonOutput.Errorreturn output, err}// RavenSelection represents a single Raven selectiontype RavenSelection struct {StartTime float64EndTime float64FreqLow float64FreqHigh float64Species string} - replacement in tools/calls_from_raven.go at line 153
if _, err := fmt.Sscanf(fields[idx.beginTimeIdx], "%f", &sel.StartTime); err != nil {startTime, err := strconv.ParseFloat(fields[idx.beginTimeIdx], 64)if err != nil { - replacement in tools/calls_from_raven.go at line 157
if _, err := fmt.Sscanf(fields[idx.endTimeIdx], "%f", &sel.EndTime); err != nil {sel.StartTime = startTimeendTime, err := strconv.ParseFloat(fields[idx.endTimeIdx], 64)if err != nil { - edit in tools/calls_from_raven.go at line 163
sel.EndTime = endTime - replacement in tools/calls_from_raven.go at line 166
if _, err := fmt.Sscanf(fields[idx.lowFreqIdx], "%f", &sel.FreqLow); err != nil {freqLow, err := strconv.ParseFloat(fields[idx.lowFreqIdx], 64)if err != nil { - edit in tools/calls_from_raven.go at line 170
sel.FreqLow = freqLow - replacement in tools/calls_from_raven.go at line 173
if _, err := fmt.Sscanf(fields[idx.highFreqIdx], "%f", &sel.FreqHigh); err != nil {freqHigh, err := strconv.ParseFloat(fields[idx.highFreqIdx], 64)if err != nil { - edit in tools/calls_from_raven.go at line 177
sel.FreqHigh = freqHigh - edit in tools/calls_from_preds.go at line 300
}// extractFilename extracts just the filename from a path// "./C05/2025-11-08/20250518_210000.WAV" -> "20250518_210000.WAV"func extractFilename(path string) string {return filepath.Base(path) - replacement in tools/calls_from_preds.go at line 384
filename := extractFilename(call.File)filename := filepath.Base(call.File) - file addition: calls_from_common.go[6.248737]
package toolsimport ("fmt""os""path/filepath""sort""sync""sync/atomic")// CallsFromSourceInput defines the common input for calls-from-source toolstype CallsFromSourceInput struct {Folder string `json:"folder"`File string `json:"file"`Delete bool `json:"delete"`ProgressHandler ProgressHandler `json:"-"` // Optional progress callback}// CallsFromSourceOutput defines the common output for calls-from-source toolstype CallsFromSourceOutput struct {Calls []ClusteredCall `json:"calls"`TotalCalls int `json:"total_calls"`SpeciesCount map[string]int `json:"species_count"`DataFilesWritten int `json:"data_files_written"`DataFilesSkipped int `json:"data_files_skipped"`FilesProcessed int `json:"files_processed"`FilesDeleted int `json:"files_deleted"`Filter string `json:"filter"`Error *string `json:"error,omitempty"`}// CallSource abstracts a source of bird call data (Raven, BirdNET, etc.)type CallSource interface {// Name returns the display name (e.g. "Raven", "BirdNET")Name() string// FindFiles discovers source files in the given folderFindFiles(folder string) ([]string, error)// ProcessFile processes a single source file and returns calls, write/skip statusProcessFile(path string, cache *DirCache) (calls []ClusteredCall, written, skipped bool, err error)}// callsFromSource is the shared entry point for all call source tools.func callsFromSource(src CallSource, input CallsFromSourceInput) (CallsFromSourceOutput, error) {var output CallsFromSourceOutputoutput.Filter = src.Name()// Collect source files to processvar files []stringif input.File != "" {files = []string{input.File}} else if input.Folder != "" {var err errorfiles, err = src.FindFiles(input.Folder)if err != nil {errMsg := fmt.Sprintf("Failed to find %s files: %v", src.Name(), err)output.Error = &errMsgreturn output, fmt.Errorf("%s", errMsg)}} else {errMsg := "Either --folder or --file must be specified"output.Error = &errMsgreturn output, fmt.Errorf("%s", errMsg)}if len(files) == 0 {errMsg := fmt.Sprintf("No %s 
files found", src.Name())output.Error = &errMsgreturn output, fmt.Errorf("%s", errMsg)}// Single file or small batch: process sequentially (avoid goroutine overhead)if len(files) < 10 {return callsFromSourceSequential(src, input, files)}// Large batch: parallel processing with DirCachereturn callsFromSourceParallel(src, input, files)}// callsFromSourceSequential processes source files one at a time (for small batches)func callsFromSourceSequential(src CallSource, input CallsFromSourceInput, files []string) (CallsFromSourceOutput, error) {var output CallsFromSourceOutputoutput.Filter = src.Name()// Build DirCache once for the folderdirCaches := make(map[string]*DirCache)if input.Folder != "" {dirCaches[input.Folder] = NewDirCache(input.Folder)}speciesCount := make(map[string]int)var allCalls []ClusteredCalldataFilesWritten := 0dataFilesSkipped := 0filesProcessed := 0filesDeleted := 0for _, file := range files {dir := filepath.Dir(file)cache := dirCaches[dir]if cache == nil {cache = NewDirCache(dir)dirCaches[dir] = cache}calls, written, skipped, err := src.ProcessFile(file, cache)if err != nil {errMsg := fmt.Sprintf("Error processing %s: %v", file, err)output.Error = &errMsgreturn output, fmt.Errorf("%s", errMsg)}if written {dataFilesWritten++}if skipped {dataFilesSkipped++}for _, call := range calls {allCalls = append(allCalls, call)speciesCount[call.EbirdCode]++}filesProcessed++// Delete if requested and successfully processedif input.Delete && written {if err := os.Remove(file); err != nil {errMsg := fmt.Sprintf("Failed to delete %s: %v", file, err)output.Error = &errMsgreturn output, fmt.Errorf("%s", errMsg)}filesDeleted++}if input.ProgressHandler != nil {input.ProgressHandler(filesProcessed, len(files), filepath.Base(file))}}// Sort all calls by file, then start timesort.Slice(allCalls, func(i, j int) bool {if allCalls[i].File != allCalls[j].File {return allCalls[i].File < allCalls[j].File}return allCalls[i].StartTime < allCalls[j].StartTime})output.Calls = 
allCallsoutput.TotalCalls = len(allCalls)output.SpeciesCount = speciesCountoutput.DataFilesWritten = dataFilesWrittenoutput.DataFilesSkipped = dataFilesSkippedoutput.FilesProcessed = filesProcessedoutput.FilesDeleted = filesDeletedreturn output, nil}// sourceJob represents a single file to process (generic over CallSource)type sourceJob struct {filePath string}// sourceResult represents the result of processing a single source filetype sourceResult struct {path stringcalls []ClusteredCallwritten boolskipped boolerr error}func (r sourceResult) filePath() string { return r.path }func (r sourceResult) getCalls() []ClusteredCall { return r.calls }func (r sourceResult) wasWritten() bool { return r.written }func (r sourceResult) wasSkipped() bool { return r.skipped }func (r sourceResult) getError() error { return r.err }// callsFromSourceParallel processes source files concurrently using a worker pool and DirCachefunc callsFromSourceParallel(src CallSource, input CallsFromSourceInput, files []string) (CallsFromSourceOutput, error) {var output CallsFromSourceOutputoutput.Filter = src.Name()total := len(files)var processed atomic.Int32// Build DirCache for the folderdirCaches := &sync.Map{}if input.Folder != "" {cache := NewDirCache(input.Folder)dirCaches.Store(input.Folder, cache)}// Create job and result channelsjobs := make(chan sourceJob, total)results := make(chan parallelResult, total)// Start workersvar wg sync.WaitGroupfor range DOT_DATA_WORKERS {wg.Add(1)go sourceWorker(src, dirCaches, jobs, results, &wg)}// Send jobsfor _, file := range files {jobs <- sourceJob{filePath: file}}close(jobs)// Wait for workers to finish, then close resultsgo func() {wg.Wait()close(results)}()// Collect results with progress reportingstats := aggregateResults(results, total, &processed, input.Delete, input.ProgressHandler)if stats.firstErr != nil {errMsg := stats.firstErr.Error()output.Error = &errMsgreturn output, stats.firstErr}sortCallsByFileAndTime(stats.calls)output.Calls = 
stats.callsoutput.TotalCalls = len(stats.calls)output.SpeciesCount = stats.speciesCountoutput.DataFilesWritten = stats.dataFilesWrittenoutput.DataFilesSkipped = stats.dataFilesSkippedoutput.FilesProcessed = stats.filesProcessedoutput.FilesDeleted = stats.filesDeletedreturn output, nil}// sourceWorker processes source files from the jobs channelfunc sourceWorker(src CallSource, dirCaches *sync.Map, jobs <-chan sourceJob, results chan<- parallelResult, wg *sync.WaitGroup) {defer wg.Done()for job := range jobs {dir := filepath.Dir(job.filePath)// Get or create DirCache for this directoryvar cache *DirCacheif cached, ok := dirCaches.Load(dir); ok {cache = cached.(*DirCache)} else {cache = NewDirCache(dir)dirCaches.Store(dir, cache)}calls, written, skipped, err := src.ProcessFile(job.filePath, cache)results <- sourceResult{path: job.filePath,calls: calls,written: written,skipped: skipped,err: err,}}} - replacement in tools/calls_from_birda.go at line 9
"sort""strconv" - edit in tools/calls_from_birda.go at line 11
"sync""sync/atomic" - edit in tools/calls_from_birda.go at line 34
}// BirdNETDetection represents a single BirdNET detectiontype BirdNETDetection struct {StartTime float64EndTime float64ScientificName stringCommonName stringConfidence float64WAVPath string}// birdaJob represents a single BirdNET file to processtype birdaJob struct {birdaFile string}// birdaResult represents the result of processing a single BirdNET filetype birdaResult struct {birdaFile stringcalls []ClusteredCallwritten boolskipped boolerr error - edit in tools/calls_from_birda.go at line 35
func (r birdaResult) filePath() string { return r.birdaFile }func (r birdaResult) getCalls() []ClusteredCall { return r.calls }func (r birdaResult) wasWritten() bool { return r.written }func (r birdaResult) wasSkipped() bool { return r.skipped }func (r birdaResult) getError() error { return r.err }// CallsFromBirda processes BirdNET results files and writes .data filesfunc CallsFromBirda(input CallsFromBirdaInput) (CallsFromBirdaOutput, error) {var output CallsFromBirdaOutputoutput.Filter = "BirdNET"// Collect BirdNET files to processvar birdaFiles []stringif input.File != "" {birdaFiles = []string{input.File}} else if input.Folder != "" {var err errorbirdaFiles, err = findBirdaFiles(input.Folder)if err != nil {errMsg := fmt.Sprintf("Failed to find BirdNET files: %v", err)output.Error = &errMsgreturn output, fmt.Errorf("%s", errMsg)}} else {errMsg := "Either --folder or --file must be specified"output.Error = &errMsgreturn output, fmt.Errorf("%s", errMsg)} - replacement in tools/calls_from_birda.go at line 36
if len(birdaFiles) == 0 {errMsg := "No BirdNET files found"output.Error = &errMsgreturn output, fmt.Errorf("%s", errMsg)}// birdaSource implements CallSource for BirdNET results filestype birdaSource struct{} - replacement in tools/calls_from_birda.go at line 39
// Single file or small batch: process sequentially (avoid goroutine overhead)if len(birdaFiles) < 10 {return callsFromBirdaSequential(input, birdaFiles)}func (birdaSource) Name() string { return "BirdNET" } - replacement in tools/calls_from_birda.go at line 41[6.518601]→[6.518601:521412](∅→∅),[6.521412]→[6.11013:11058](∅→∅),[6.11058]→[6.521454:521845](∅→∅),[6.521454]→[6.521454:521845](∅→∅),[6.521845]→[6.11059:11151](∅→∅),[6.11151]→[6.522649:522650](∅→∅),[6.522649]→[6.522649:522650](∅→∅),[6.522650]→[6.11152:11215](∅→∅),[6.11215]→[6.522855:522880](∅→∅),[6.522855]→[6.522855:522880](∅→∅),[6.522880]→[6.11216:11248](∅→∅),[6.11248]→[6.522906:522910](∅→∅),[6.522906]→[6.522906:522910](∅→∅),[6.522910]→[6.11249:11286](∅→∅),[6.11286]→[6.523151:523152](∅→∅),[6.523151]→[6.523151:523152](∅→∅),[6.523152]→[6.11287:11583](∅→∅),[6.11583]→[6.523412:523497](∅→∅),[6.523412]→[6.523412:523497](∅→∅),[6.523497]→[6.11584:11697](∅→∅),[6.11697]→[6.523607:524254](∅→∅),[6.523607]→[6.523607:524254](∅→∅)
// Large batch: parallel processing with DirCachereturn callsFromBirdaParallel(input, birdaFiles)}// callsFromBirdaSequential processes BirdNET files one at a time (for small batches)func callsFromBirdaSequential(input CallsFromBirdaInput, birdaFiles []string) (CallsFromBirdaOutput, error) {var output CallsFromBirdaOutputoutput.Filter = "BirdNET"// Build DirCache once for the folderdirCaches := make(map[string]*DirCache)if input.Folder != "" {dirCaches[input.Folder] = NewDirCache(input.Folder)}speciesCount := make(map[string]int)var allCalls []ClusteredCalldataFilesWritten := 0dataFilesSkipped := 0filesProcessed := 0filesDeleted := 0for _, birdaFile := range birdaFiles {dir := filepath.Dir(birdaFile)cache := dirCaches[dir]if cache == nil {cache = NewDirCache(dir)dirCaches[dir] = cache}calls, written, skipped, err := processBirdaFileCached(birdaFile, cache)if err != nil {errMsg := fmt.Sprintf("Error processing %s: %v", birdaFile, err)output.Error = &errMsgreturn output, fmt.Errorf("%s", errMsg)}if written {dataFilesWritten++}if skipped {dataFilesSkipped++}for _, call := range calls {allCalls = append(allCalls, call)speciesCount[call.EbirdCode]++}filesProcessed++// Delete if requested and successfully processedif input.Delete && written {if err := os.Remove(birdaFile); err != nil {errMsg := fmt.Sprintf("Failed to delete %s: %v", birdaFile, err)output.Error = &errMsgreturn output, fmt.Errorf("%s", errMsg)}filesDeleted++}if input.ProgressHandler != nil {input.ProgressHandler(filesProcessed, len(birdaFiles), filepath.Base(birdaFile))}}// Sort all calls by file, then start timesort.Slice(allCalls, func(i, j int) bool {if allCalls[i].File != allCalls[j].File {return allCalls[i].File < allCalls[j].File}return allCalls[i].StartTime < allCalls[j].StartTime})output.Calls = allCallsoutput.TotalCalls = len(allCalls)output.SpeciesCount = speciesCountoutput.DataFilesWritten = dataFilesWrittenoutput.DataFilesSkipped = dataFilesSkippedoutput.FilesProcessed = 
filesProcessedoutput.FilesDeleted = filesDeletedreturn output, nil}// callsFromBirdaParallel processes BirdNET files concurrently using a worker pool and DirCachefunc callsFromBirdaParallel(input CallsFromBirdaInput, birdaFiles []string) (CallsFromBirdaOutput, error) {var output CallsFromBirdaOutputoutput.Filter = "BirdNET"total := len(birdaFiles)var processed atomic.Int32// Build DirCache for the folderdirCaches := &sync.Map{}if input.Folder != "" {cache := NewDirCache(input.Folder)dirCaches.Store(input.Folder, cache)}// Create job and result channelsjobs := make(chan birdaJob, total)results := make(chan parallelResult, total)// Start workersvar wg sync.WaitGroupfor range DOT_DATA_WORKERS {wg.Add(1)go birdaWorker(dirCaches, jobs, results, &wg)}// Send jobsfor _, birdaFile := range birdaFiles {jobs <- birdaJob{birdaFile: birdaFile}}close(jobs)// Wait for workers to finish, then close resultsgo func() {wg.Wait()close(results)}()// Collect results with progress reportingstats := aggregateResults(results, total, &processed, input.Delete, input.ProgressHandler)if stats.firstErr != nil {errMsg := stats.firstErr.Error()output.Error = &errMsgreturn output, stats.firstErr}sortCallsByFileAndTime(stats.calls)output.Calls = stats.callsoutput.TotalCalls = len(stats.calls)output.SpeciesCount = stats.speciesCountoutput.DataFilesWritten = stats.dataFilesWrittenoutput.DataFilesSkipped = stats.dataFilesSkippedoutput.FilesProcessed = stats.filesProcessedoutput.FilesDeleted = stats.filesDeletedreturn output, nil}// birdaWorker processes BirdNET files from the jobs channelfunc birdaWorker(dirCaches *sync.Map, jobs <-chan birdaJob, results chan<- parallelResult, wg *sync.WaitGroup) {defer wg.Done()for job := range jobs {dir := filepath.Dir(job.birdaFile)// Get or create DirCache for this directoryvar cache *DirCacheif cached, ok := dirCaches.Load(dir); ok {cache = cached.(*DirCache)} else {cache = NewDirCache(dir)dirCaches.Store(dir, cache)}calls, written, skipped, err := 
processBirdaFileCached(job.birdaFile, cache)results <- birdaResult{birdaFile: job.birdaFile,calls: calls,written: written,skipped: skipped,err: err,}}}// findBirdaFiles finds all BirdNET results files in a folderfunc findBirdaFiles(folder string) ([]string, error) {func (birdaSource) FindFiles(folder string) ([]string, error) { - edit in tools/calls_from_birda.go at line 57
}func (birdaSource) ProcessFile(birdaFile string, cache *DirCache) ([]ClusteredCall, bool, bool, error) {return processBirdaFileCached(birdaFile, cache) - edit in tools/calls_from_birda.go at line 62
// CallsFromBirda processes BirdNET results files and writes .data filesfunc CallsFromBirda(input CallsFromBirdaInput) (CallsFromBirdaOutput, error) {src := birdaSource{}commonInput := CallsFromSourceInput(input) - edit in tools/calls_from_birda.go at line 68
commonOutput, err := callsFromSource(src, commonInput)// Convert to Birda-specific output typevar output CallsFromBirdaOutputoutput.Calls = commonOutput.Callsoutput.TotalCalls = commonOutput.TotalCallsoutput.SpeciesCount = commonOutput.SpeciesCountoutput.DataFilesWritten = commonOutput.DataFilesWrittenoutput.DataFilesSkipped = commonOutput.DataFilesSkippedoutput.FilesProcessed = commonOutput.FilesProcessedoutput.FilesDeleted = commonOutput.FilesDeletedoutput.Filter = commonOutput.Filteroutput.Error = commonOutput.Errorreturn output, err}// BirdNETDetection represents a single BirdNET detectiontype BirdNETDetection struct {StartTime float64EndTime float64ScientificName stringCommonName stringConfidence float64WAVPath string} - replacement in tools/calls_from_birda.go at line 146
if _, err := fmt.Sscanf(record[idx.startIdx], "%f", &det.StartTime); err != nil {return nil, fmt.Errorf("failed to parse start time %q: %w", record[idx.startIdx], err)startTime, perr := strconv.ParseFloat(record[idx.startIdx], 64)if perr != nil {return nil, fmt.Errorf("failed to parse start time %q: %w", record[idx.startIdx], perr) - replacement in tools/calls_from_birda.go at line 150
if _, err := fmt.Sscanf(record[idx.endIdx], "%f", &det.EndTime); err != nil {return nil, fmt.Errorf("failed to parse end time %q: %w", record[idx.endIdx], err)det.StartTime = startTimeendTime, perr := strconv.ParseFloat(record[idx.endIdx], 64)if perr != nil {return nil, fmt.Errorf("failed to parse end time %q: %w", record[idx.endIdx], perr) - edit in tools/calls_from_birda.go at line 156
det.EndTime = endTime - replacement in tools/calls_from_birda.go at line 159
if _, err := fmt.Sscanf(record[idx.confidenceIdx], "%f", &det.Confidence); err != nil {return nil, fmt.Errorf("failed to parse confidence %q: %w", record[idx.confidenceIdx], err)confidence, perr := strconv.ParseFloat(record[idx.confidenceIdx], 64)if perr != nil {return nil, fmt.Errorf("failed to parse confidence %q: %w", record[idx.confidenceIdx], perr) - edit in tools/calls_from_birda.go at line 164
det.Confidence = confidence - edit in tools/bulk_file_import.go at line 205
// Verify dataset exists and is activevar datasetExists boolerr = database.QueryRow("SELECT EXISTS(SELECT 1 FROM dataset WHERE id = ? AND active = true)", input.DatasetID).Scan(&datasetExists)if err != nil {return fmt.Errorf("failed to query dataset: %w", err)}if !datasetExists {return fmt.Errorf("dataset not found or inactive: %s", input.DatasetID)} - replacement in tools/bulk_file_import.go at line 206
// Verify dataset is 'structured' type (file imports only support structured datasets)// Verify dataset exists and is structured - edit in db/validation.go at line 4
"context" - edit in db/validation.go at line 8
// Querier is the common interface for *sql.DB and *LoggedTx query operations.type Querier interface {QueryRow(query string, args ...any) *sql.RowQueryRowContext(ctx context.Context, query string, args ...any) *sql.Row} - replacement in db/validation.go at line 17
func GetDatasetType(database *sql.DB, datasetID string) (string, bool, error) {func GetDatasetType(q Querier, datasetID string) (string, bool, error) { - replacement in db/validation.go at line 19
err := database.QueryRow("SELECT type FROM dataset WHERE id = ?", datasetID).Scan(&datasetType)err := q.QueryRow("SELECT type FROM dataset WHERE id = ?", datasetID).Scan(&datasetType) - replacement in db/validation.go at line 31
func ValidateDatasetTypeForImport(database *sql.DB, datasetID string) error {datasetType, exists, err := GetDatasetType(database, datasetID)func ValidateDatasetTypeForImport(q Querier, datasetID string) error {datasetType, exists, err := GetDatasetType(q, datasetID) - replacement in db/validation.go at line 47
func ValidateDatasetTypeUnstructured(database *sql.DB, datasetID string) error {datasetType, exists, err := GetDatasetType(database, datasetID)func ValidateDatasetTypeUnstructured(q Querier, datasetID string) error {datasetType, exists, err := GetDatasetType(q, datasetID) - replacement in db/validation.go at line 63
func ValidateLocationBelongsToDataset(database *sql.DB, locationID, datasetID string) error {func ValidateLocationBelongsToDataset(q Querier, locationID, datasetID string) error { - replacement in db/validation.go at line 65
err := database.QueryRow("SELECT dataset_id FROM location WHERE id = ? AND active = true", locationID).Scan(&locationDatasetID)err := q.QueryRow("SELECT dataset_id FROM location WHERE id = ? AND active = true", locationID).Scan(&locationDatasetID) - edit in db/validation.go at line 77[6.2458]
// DatasetExistsAndActive checks that a dataset exists and is active.// Returns the dataset name if found.func DatasetExistsAndActive(q Querier, datasetID string) (name string, err error) {var exists, active boolerr = q.QueryRow("SELECT EXISTS(SELECT 1 FROM dataset WHERE id = ?), COALESCE((SELECT active FROM dataset WHERE id = ?), false), COALESCE((SELECT name FROM dataset WHERE id = ?), '')",datasetID, datasetID, datasetID,).Scan(&exists, &active, &name)if err != nil {return "", fmt.Errorf("failed to verify dataset: %w", err)}if !exists {return "", fmt.Errorf("dataset with ID '%s' does not exist", datasetID)}if !active {return "", fmt.Errorf("dataset '%s' (ID: %s) is not active", name, datasetID)}return name, nil}// LocationBelongsToDataset checks that a location exists, is active, and belongs to the dataset.// Returns the location name if found.func LocationBelongsToDataset(q Querier, locationID, datasetID string) (name string, err error) {var exists, active boolvar locDatasetID stringerr = q.QueryRow("SELECT EXISTS(SELECT 1 FROM location WHERE id = ?), COALESCE((SELECT active FROM location WHERE id = ?), false), COALESCE((SELECT name FROM location WHERE id = ?), ''), COALESCE((SELECT dataset_id FROM location WHERE id = ?), '')",locationID, locationID, locationID, locationID,).Scan(&exists, &active, &name, &locDatasetID)if err != nil {return "", fmt.Errorf("failed to verify location: %w", err)}if !exists {return "", fmt.Errorf("location with ID '%s' does not exist", locationID)}if !active {return "", fmt.Errorf("location '%s' (ID: %s) is not active", name, locationID)}if locDatasetID != datasetID {return "", fmt.Errorf("location '%s' (ID: %s) does not belong to dataset ID '%s'",name, locationID, locDatasetID)}return name, nil}// ClusterBelongsToLocation checks that a cluster exists, is active, and belongs to the location.func ClusterBelongsToLocation(q Querier, clusterID, locationID string) error {var exists, active boolvar clusterLocationID stringerr := 
q.QueryRow("SELECT EXISTS(SELECT 1 FROM cluster WHERE id = ?), COALESCE((SELECT active FROM cluster WHERE id = ?), false), COALESCE((SELECT location_id FROM cluster WHERE id = ?), '')",clusterID, clusterID, clusterID,).Scan(&exists, &active, &clusterLocationID)if err != nil {return fmt.Errorf("failed to verify cluster: %w", err)}if !exists {return fmt.Errorf("cluster with ID '%s' does not exist", clusterID)}if !active {return fmt.Errorf("cluster '%s' is not active", clusterID)}if clusterLocationID != locationID {return fmt.Errorf("cluster '%s' does not belong to location '%s'", clusterID, locationID)}return nil} - edit in db/db.go at line 4
"context" - edit in db/db.go at line 30
}// WithReadDB opens a read-only DB connection, calls fn, and closes the connection.// This is a convenience wrapper that ensures the connection is always closed.func WithReadDB(dbPath string, fn func(*sql.DB) error) error {database, err := OpenReadOnlyDB(dbPath)if err != nil {return fmt.Errorf("database connection failed: %w", err)}defer database.Close()return fn(database) - edit in db/db.go at line 43
// WithWriteTx opens a writeable DB connection, begins a logged transaction, calls fn,// and commits on success (or rollbacks on error). The connection is always closed.// The fn callback receives both the *sql.DB (for pre-validation queries) and the// *LoggedTx (for mutation operations).func WithWriteTx(ctx context.Context, dbPath, toolName string, fn func(*sql.DB, *LoggedTx) error) error {database, err := OpenWriteableDB(dbPath)if err != nil {return fmt.Errorf("database connection failed: %w", err)}defer database.Close()tx, err := BeginLoggedTx(ctx, database, toolName)if err != nil {return fmt.Errorf("failed to begin transaction: %w", err)}defer tx.Rollback() // no-op after commitif err := fn(database, tx); err != nil {return err}return tx.Commit()} - replacement in README.md at line 364
# Lines of code
make count
# Go unit tests
make unit
```
- edit in README.md at line 366
# Shell script integration tests (make sure db/test.duckdb is available and FKs are applied)
make shell
- edit in README.md at line 367
# Keep cyclomatic complexity low
gocyclo -over 10 .
```
- edit in CHANGELOG.md at line 4
## [2026-05-05] tools/ refactoring: WithWriteTx, CallSource interface, hierarchy primitives, strconv - edit in CHANGELOG.md at line 7
Four refactoring changes in tools/ and db/:

- **Added `db.WithWriteTx` and `db.WithReadDB` helpers** (db/db.go): Extracted the
  open-DB→begin-tx→defer-rollback→commit→close-DB boilerplate that was repeated across
  14+ tool entry points. `WithWriteTx(ctx, dbPath, name, fn)` opens a writeable DB, begins
  a logged transaction, calls fn, and commits on success / rolls back on error.
  `WithReadDB(dbPath, fn)` is the read-only counterpart: it opens a read-only connection,
  calls fn, and closes the connection (no transaction). Applied to all create/update
  functions in cluster, dataset, location, and pattern, plus import_unstructured,
  import_files (validation), sql, and bulk_file_import (validation). Eliminates
  inconsistent rollback handling (some used `defer tx.Rollback()`, others
  `defer func() { if err != nil { tx.Rollback() } }()`) and removes ~100 lines of boilerplate.
- **Introduced `CallSource` interface** (tools/calls_from_common.go): Extracted shared
  scaffolding from `calls_from_raven.go` and `calls_from_birda.go` into a `CallSource`
  interface with `Name()`, `FindFiles()`, and `ProcessFile()` methods. Both files now
  implement the interface and delegate to `callsFromSource()`, which handles the
  sequential/parallel dispatch, DirCache management, worker pool, and result aggregation.
  Public API (`CallsFromRaven`, `CallsFromBirda`) unchanged. ~100 lines saved.
- **Extracted hierarchy validation primitives** (db/validation.go): Added `Querier`
  interface, `DatasetExistsAndActive()`, `LocationBelongsToDataset()`, and
  `ClusterBelongsToLocation()` to db/validation.go. Replaced three near-duplicate
  functions: `validateSegmentHierarchy` (import_segments.go, 14→7 cyclomatic),
  `validateHierarchyIDs` (import_files.go, 14→7 cyclomatic), and
  `verifyClusterParentRefs` plus its helpers `verifyDatasetForCluster` and
  `verifyLocationForCluster` (cluster.go, deleted ~50 lines). 
Also replaced inline dataset-exists-and-active checks in bulk_file_import.go and
import_unstructured.go.
- **`fmt.Sscanf("%f", ...)` → `strconv.ParseFloat` and deleted `extractFilename`**: Replaced
  7 `fmt.Sscanf` calls in calls_from_raven.go and calls_from_birda.go with
  `strconv.ParseFloat` (faster, idiomatic). Deleted `extractFilename()` from
  calls_from_preds.go — it was a one-line wrapper over `filepath.Base`.

Functions at cyclomatic >13 reduced from 9 to 6.