calls_from_common.go
package calls
import (
"fmt"
"os"
"path/filepath"
"sort"
"strings"
"sync"
"sync/atomic"
)
// CallsFromSourceInput is the shared input for every calls-from-source tool.
// At least one of File or Folder must be set; when both are set, File wins
// and Folder is not searched for source files.
type CallsFromSourceInput struct {
// Folder is the directory handed to CallSource.FindFiles to discover source files.
Folder string `json:"folder"`
// File is a single source file to process; it takes precedence over Folder.
File string `json:"file"`
// Delete requests removal of a source file after it has been processed.
// In the sequential path a file is only removed when ProcessFile reported
// that its data file was written.
Delete bool `json:"delete"`
ProgressHandler ProgressHandler `json:"-"` // Optional progress callback; never serialized to JSON
}
// CallsFromSourceOutput is the shared result of every calls-from-source tool.
// When a run fails, Filter and Error are populated and the remaining fields
// keep their zero values.
type CallsFromSourceOutput struct {
// Calls holds every call found, ordered by File and then StartTime.
Calls []ClusteredCall `json:"calls"`
// TotalCalls is len(Calls).
TotalCalls int `json:"total_calls"`
// SpeciesCount maps an eBird code to the number of calls carrying it.
SpeciesCount map[string]int `json:"species_count"`
// DataFilesWritten counts source files for which ProcessFile reported "written".
DataFilesWritten int `json:"data_files_written"`
// DataFilesSkipped counts source files for which ProcessFile reported "skipped".
DataFilesSkipped int `json:"data_files_skipped"`
// FilesProcessed counts source files that were processed without error.
FilesProcessed int `json:"files_processed"`
// FilesDeleted counts source files removed because deletion was requested.
FilesDeleted int `json:"files_deleted"`
// Filter is the display name of the call source (CallSource.Name).
Filter string `json:"filter"`
// Error carries the failure message; it is nil (and omitted from JSON) on success.
Error *string `json:"error,omitempty"`
}
// CallSource abstracts a source of bird call data (Raven, BirdNET, etc.).
// Implementations are driven by callsFromSource, which processes small
// batches sequentially and larger ones from several worker goroutines, so
// ProcessFile may be invoked concurrently.
type CallSource interface {
// Name returns the display name (e.g. "Raven", "BirdNET").
// It is reported as the output filter and embedded in error messages.
Name() string
// FindFiles discovers source files in the given folder.
// An empty result is reported to the caller as an error by callsFromSource.
FindFiles(folder string) ([]string, error)
// ProcessFile processes a single source file and returns its calls plus
// whether a data file was written or skipped. cache is the DirCache for
// the directory that contains path.
ProcessFile(path string, cache *DirCache) (calls []ClusteredCall, written, skipped bool, err error)
}
// callsFromSource is the shared entry point for all call source tools.
//
// It resolves the list of source files (input.File takes precedence over
// input.Folder), rejects an empty selection, and then dispatches to the
// sequential implementation for fewer than 10 files or to the parallel
// implementation otherwise. On failure the returned output carries the
// source name in Filter and the message in Error, and the same message is
// returned as the error value.
func callsFromSource(src CallSource, input CallsFromSourceInput) (CallsFromSourceOutput, error) {
	output := CallsFromSourceOutput{Filter: src.Name()}

	// fail records msg on the output and returns it as the error as well.
	fail := func(msg string) (CallsFromSourceOutput, error) {
		output.Error = &msg
		return output, fmt.Errorf("%s", msg)
	}

	// Work out which source files to process.
	var files []string
	switch {
	case input.File != "":
		files = []string{input.File}
	case input.Folder != "":
		found, err := src.FindFiles(input.Folder)
		if err != nil {
			return fail(fmt.Sprintf("Failed to find %s files: %v", src.Name(), err))
		}
		files = found
	default:
		return fail("Either --folder or --file must be specified")
	}

	if len(files) == 0 {
		return fail(fmt.Sprintf("No %s files found", src.Name()))
	}

	// Large batches go through the worker pool; a single file or a small
	// batch is handled inline to avoid goroutine overhead.
	if len(files) >= 10 {
		return callsFromSourceParallel(src, input, files)
	}
	return callsFromSourceSequential(src, input, files)
}
// sequentialFileResult holds the outcome of processing a single file sequentially.
// The three counters are per-file contributions (0 or 1) that the caller adds
// to its running totals.
type sequentialFileResult struct {
// calls are the calls returned by CallSource.ProcessFile for this file.
calls []ClusteredCall
// dataFilesWritten is 1 when ProcessFile reported the data file as written.
dataFilesWritten int
// dataFilesSkipped is 1 when ProcessFile reported the data file as skipped.
dataFilesSkipped int
// filesDeleted is 1 when the source file was removed after processing.
filesDeleted int
}
// processSequentialFile handles one source file in the sequential path.
//
// It looks up (or lazily creates and stores) the DirCache for the file's
// directory in dirCaches, runs src.ProcessFile, and, when shouldDelete is set
// and the data file was written, removes the source file.
//
// Errors from ProcessFile and os.Remove are wrapped with %w, so callers can
// still inspect the underlying cause with errors.Is / errors.As; the message
// text is unchanged. If deletion fails, the returned result still describes
// the processing that already happened.
func processSequentialFile(src CallSource, file string, dirCaches map[string]*DirCache, shouldDelete bool) (sequentialFileResult, error) {
	var res sequentialFileResult

	// One cache per directory: scan it the first time a file from it shows up.
	dir := filepath.Dir(file)
	cache := dirCaches[dir]
	if cache == nil {
		cache = NewDirCache(dir)
		dirCaches[dir] = cache
	}

	calls, written, skipped, err := src.ProcessFile(file, cache)
	if err != nil {
		return res, fmt.Errorf("error processing %s: %w", file, err)
	}

	if written {
		res.dataFilesWritten = 1
	}
	if skipped {
		res.dataFilesSkipped = 1
	}
	res.calls = calls

	// Only remove the source once its data file is known to have been written.
	if shouldDelete && written {
		if err := os.Remove(file); err != nil {
			return res, fmt.Errorf("failed to delete %s: %w", file, err)
		}
		res.filesDeleted = 1
	}

	return res, nil
}
// callsFromSourceSequential processes source files one at a time (for small batches).
//
// Processing stops at the first failing file; in that case the returned
// output only carries Filter and Error. On success the calls of all files are
// merged, sorted by File and then StartTime, and the per-file counters are
// summed. input.ProgressHandler, when set, is invoked after every file with
// the number of files done, the total, and the file's base name.
func callsFromSourceSequential(src CallSource, input CallsFromSourceInput, files []string) (CallsFromSourceOutput, error) {
	var output CallsFromSourceOutput
	output.Filter = src.Name()

	// Build the DirCache for the folder once, up front. Per-file lookups use
	// filepath.Dir(file), which always yields a cleaned path, so the pre-built
	// cache must be keyed by the cleaned folder as well; otherwise inputs such
	// as "./data" or "data/" never match and the directory is scanned twice.
	dirCaches := make(map[string]*DirCache)
	if input.Folder != "" {
		folder := filepath.Clean(input.Folder)
		dirCaches[folder] = NewDirCache(folder)
	}

	var allCalls []ClusteredCall
	speciesCount := make(map[string]int)
	dataFilesWritten := 0
	dataFilesSkipped := 0
	filesProcessed := 0
	filesDeleted := 0

	for _, file := range files {
		res, err := processSequentialFile(src, file, dirCaches, input.Delete)
		if err != nil {
			errMsg := err.Error()
			output.Error = &errMsg
			return output, err
		}

		allCalls = append(allCalls, res.calls...)
		for _, call := range res.calls {
			speciesCount[call.EbirdCode]++
		}
		dataFilesWritten += res.dataFilesWritten
		dataFilesSkipped += res.dataFilesSkipped
		filesDeleted += res.filesDeleted
		filesProcessed++

		if input.ProgressHandler != nil {
			input.ProgressHandler(filesProcessed, len(files), filepath.Base(file))
		}
	}

	sort.Slice(allCalls, func(i, j int) bool {
		if allCalls[i].File != allCalls[j].File {
			return allCalls[i].File < allCalls[j].File
		}
		return allCalls[i].StartTime < allCalls[j].StartTime
	})

	output.Calls = allCalls
	output.TotalCalls = len(allCalls)
	output.SpeciesCount = speciesCount
	output.DataFilesWritten = dataFilesWritten
	output.DataFilesSkipped = dataFilesSkipped
	output.FilesProcessed = filesProcessed
	output.FilesDeleted = filesDeleted
	return output, nil
}
// sourceJob represents a single file to process (generic over CallSource).
// Jobs are queued on a channel and consumed by sourceWorker goroutines.
type sourceJob struct {
// filePath is the path of the source file handed to CallSource.ProcessFile.
filePath string
}
// sourceResult is what a worker reports after running CallSource.ProcessFile
// on one source file. Its accessor methods expose the fields to the result
// aggregation code.
type sourceResult struct {
	path    string          // source file that was processed
	calls   []ClusteredCall // calls returned by ProcessFile
	written bool            // ProcessFile reported the data file as written
	skipped bool            // ProcessFile reported the data file as skipped
	err     error           // error returned by ProcessFile, if any
}

// filePath returns the path of the processed source file.
func (r sourceResult) filePath() string {
	return r.path
}

// getCalls returns the calls produced for the file.
func (r sourceResult) getCalls() []ClusteredCall {
	return r.calls
}

// wasWritten reports whether the data file was written.
func (r sourceResult) wasWritten() bool {
	return r.written
}

// wasSkipped reports whether the data file was skipped.
func (r sourceResult) wasSkipped() bool {
	return r.skipped
}

// getError returns the processing error, or nil on success.
func (r sourceResult) getError() error {
	return r.err
}
// callsFromSourceParallel processes source files concurrently using a worker
// pool and a shared, per-directory DirCache.
//
// DOT_DATA_WORKERS goroutines run sourceWorker over a job channel; both the
// job and the result channel are buffered to len(files), so neither queueing
// the jobs nor reporting results can block. Results are folded by
// aggregateResults (defined elsewhere in the package), which also receives
// the delete flag and the progress handler. If any file failed, the first
// error is returned and the output only carries Filter and Error.
func callsFromSourceParallel(src CallSource, input CallsFromSourceInput, files []string) (CallsFromSourceOutput, error) {
	var output CallsFromSourceOutput
	output.Filter = src.Name()

	total := len(files)
	var processed atomic.Int32

	// Build the DirCache for the folder up front. Workers look caches up by
	// filepath.Dir(file), which always yields a cleaned path, so the key has
	// to be the cleaned folder too; otherwise inputs such as "./data" or
	// "data/" never match and the directory is scanned again by a worker.
	dirCaches := &sync.Map{}
	if input.Folder != "" {
		folder := filepath.Clean(input.Folder)
		dirCaches.Store(folder, NewDirCache(folder))
	}

	// Create job and result channels.
	jobs := make(chan sourceJob, total)
	results := make(chan parallelResult, total)

	// Start workers.
	var wg sync.WaitGroup
	for range DOT_DATA_WORKERS {
		wg.Add(1)
		go sourceWorker(src, dirCaches, jobs, results, &wg)
	}

	// Queue every file, then signal that no more jobs are coming.
	for _, file := range files {
		jobs <- sourceJob{filePath: file}
	}
	close(jobs)

	// Close results once all workers are done so the aggregation loop ends.
	go func() {
		wg.Wait()
		close(results)
	}()

	// Collect results with progress reporting.
	stats := aggregateResults(results, total, &processed, input.Delete, input.ProgressHandler)
	if stats.firstErr != nil {
		errMsg := stats.firstErr.Error()
		output.Error = &errMsg
		return output, stats.firstErr
	}

	sortCallsByFileAndTime(stats.calls)

	output.Calls = stats.calls
	output.TotalCalls = len(stats.calls)
	output.SpeciesCount = stats.speciesCount
	output.DataFilesWritten = stats.dataFilesWritten
	output.DataFilesSkipped = stats.dataFilesSkipped
	output.FilesProcessed = stats.filesProcessed
	output.FilesDeleted = stats.filesDeleted
	return output, nil
}
// sourceWorker processes source files from the jobs channel until it is
// closed, sending exactly one sourceResult per job to results and calling
// wg.Done on exit.
//
// dirCaches maps a directory path to its *DirCache and is shared by all
// workers. A missing cache is built by the worker that needs it and published
// with LoadOrStore, so when several workers race on the same directory they
// all end up using the single instance that won, instead of overwriting each
// other's entry.
func sourceWorker(src CallSource, dirCaches *sync.Map, jobs <-chan sourceJob, results chan<- parallelResult, wg *sync.WaitGroup) {
	defer wg.Done()

	for job := range jobs {
		dir := filepath.Dir(job.filePath)

		// Get or create the DirCache for this directory.
		cached, ok := dirCaches.Load(dir)
		if !ok {
			cached, _ = dirCaches.LoadOrStore(dir, NewDirCache(dir))
		}
		cache := cached.(*DirCache)

		calls, written, skipped, err := src.ProcessFile(job.filePath, cache)
		results <- sourceResult{
			path:    job.filePath,
			calls:   calls,
			written: written,
			skipped: skipped,
			err:     err,
		}
	}
}
// ClusteredCall represents a clustered bird call detection.
// Shared by all call source implementations (preds, birda, raven).
// NOTE(review): StartTime and EndTime are presumably seconds from the start
// of the recording; confirm against the source implementations.
type ClusteredCall struct {
// File identifies the recording the call belongs to; results are sorted by it first.
File string `json:"file"`
// StartTime is where the call begins (the first detection of the cluster in clusterDetections).
StartTime float64 `json:"start_time"`
// EndTime is where the call ends (last detection start plus clip duration in clusterDetections).
EndTime float64 `json:"end_time"`
// EbirdCode is the species code; per-species counts are keyed by it.
EbirdCode string `json:"ebird_code"`
// Segments is the number of detections merged into this call (the cluster size in clusterDetections).
Segments int `json:"segments"`
}
// DirCache is a snapshot of the regular entries of one directory, indexed for
// case-insensitive lookup by file name without extension.
// The directory is read exactly once, by NewDirCache; later changes on disk
// are not reflected. Safe for concurrent read-only use after construction.
type DirCache struct {
	dir    string
	wavMap map[string]string // lowercase basename -> filename with original case (e.g. "20230610_150000" -> "20230610_150000.WAV")
	dirMap map[string]string // lowercase basename -> filename for any file (used by from-raven for .selections.txt etc.)
}

// NewDirCache scans dir once and indexes its non-directory entries.
// If the directory cannot be read, an empty cache is returned, so every
// lookup on it reports "not found". When several entries share the same
// lowercased basename, the one listed last by os.ReadDir wins.
func NewDirCache(dir string) *DirCache {
	entries, err := os.ReadDir(dir)
	cache := &DirCache{
		dir:    dir,
		wavMap: make(map[string]string, len(entries)),
		dirMap: make(map[string]string, len(entries)),
	}
	if err != nil {
		return cache
	}

	for _, entry := range entries {
		if entry.IsDir() {
			continue
		}
		fileName := entry.Name()
		ext := filepath.Ext(fileName)
		key := strings.ToLower(strings.TrimSuffix(fileName, ext))

		cache.dirMap[key] = fileName
		if strings.EqualFold(ext, ".wav") {
			cache.wavMap[key] = fileName
		}
	}
	return cache
}

// FindWAV looks up a WAV file by basename (case-insensitive).
// It returns the full path using the on-disk spelling of the file name, or
// an empty string if the directory holds no such WAV file.
func (dc *DirCache) FindWAV(baseName string) string {
	name, found := dc.wavMap[strings.ToLower(baseName)]
	if !found {
		return ""
	}
	return filepath.Join(dc.dir, name)
}

// FindFile looks up a file of any type by basename (case-insensitive).
// It returns the full path using the on-disk spelling of the file name, or
// an empty string if nothing matches.
func (dc *DirCache) FindFile(baseName string) string {
	name, found := dc.dirMap[strings.ToLower(baseName)]
	if !found {
		return ""
	}
	return filepath.Join(dc.dir, name)
}
// findWAVFile finds a WAV file in the directory with case-insensitive matching
// of both the basename and the ".wav" extension, mirroring DirCache.FindWAV.
// baseName is the filename without extension (e.g., "20230610_150000").
// An entry whose basename matches exactly is preferred; otherwise the first
// entry (in os.ReadDir order) that matches ignoring case is used.
// Returns the full path with correct case, or empty string if not found or if
// the directory cannot be read.
//
// Deprecated: Use DirCache.FindWAV for batch operations to avoid repeated directory scans.
func findWAVFile(dir, baseName string) string {
	entries, err := os.ReadDir(dir)
	if err != nil {
		return ""
	}

	// fallback remembers the first case-insensitive match while the scan
	// keeps looking for an exact one.
	fallback := ""
	for _, entry := range entries {
		if entry.IsDir() {
			continue
		}
		name := entry.Name()
		ext := filepath.Ext(name)
		if !strings.EqualFold(ext, ".wav") {
			continue
		}
		stem := strings.TrimSuffix(name, ext)
		if stem == baseName {
			return filepath.Join(dir, name)
		}
		if fallback == "" && strings.EqualFold(stem, baseName) {
			fallback = name
		}
	}

	if fallback == "" {
		return ""
	}
	return filepath.Join(dir, fallback)
}
// ParseFilterFromFilename extracts the filter name from a preds CSV filename.
// The name (directory and ".csv" suffix removed) must consist of exactly three
// underscore-separated parts; the middle part is the filter:
// "predsST_opensoundscape-kiwi-1.2_2025-11-12.csv" -> "opensoundscape-kiwi-1.2"
// Returns empty string if the name does not have that shape.
func ParseFilterFromFilename(csvPath string) string {
	stem := strings.TrimSuffix(filepath.Base(csvPath), ".csv")

	// Exactly two underscores means exactly three parts.
	if strings.Count(stem, "_") != 2 {
		return ""
	}

	first := strings.IndexByte(stem, '_')
	last := strings.LastIndexByte(stem, '_')
	return stem[first+1 : last]
}
// clusterStartTimes splits a sequence of start times into runs of neighbours.
// Walking the slice in the order given, a value joins the current cluster when
// its distance to the previous value is <= gapThreshold and opens a new
// cluster otherwise. The input is not sorted here, so callers are expected to
// pass it in ascending order. Returns nil for empty input; each returned
// cluster is a freshly allocated slice.
func clusterStartTimes(startTimes []float64, gapThreshold float64) [][]float64 {
	if len(startTimes) == 0 {
		return nil
	}

	// The first value always opens the first cluster.
	groups := [][]float64{{startTimes[0]}}
	prev := startTimes[0]

	for _, t := range startTimes[1:] {
		if gap := t - prev; gap <= gapThreshold {
			// Close enough: extend the cluster currently being built.
			last := len(groups) - 1
			groups[last] = append(groups[last], t)
		} else {
			// Too far from its predecessor: start a new cluster.
			groups = append(groups, []float64{t})
		}
		prev = t
	}

	return groups
}
// predFileSpeciesKey groups detections by file and ebird code.
// It is the map key of the detections passed to clusterDetections, so each
// (file, species) pair is clustered independently.
type predFileSpeciesKey struct {
// File identifies the recording the detections belong to.
File string
// EbirdCode is the species code of the detections.
EbirdCode string
}
// clusterDetections groups detections into clusters and produces sorted ClusteredCalls.
//
// For every (file, species) key the start times are sorted in place (the
// caller's slices are reordered) and split with clusterStartTimes using
// gapThreshold. Each surviving cluster becomes one call that starts at the
// cluster's first detection, ends clipDuration after its last detection and
// records the cluster size in Segments.
//
// The returned calls are ordered by File, then StartTime, then EbirdCode, then
// EndTime. The extra tie-breakers make the order reproducible: without them,
// calls of different species that start at the same time in the same file
// would come out in random map-iteration order. The second return value
// counts the produced calls per eBird code.
func clusterDetections(detections map[predFileSpeciesKey][]float64, clipDuration, gapThreshold float64, minDetections int) ([]ClusteredCall, map[string]int) {
	var allCalls []ClusteredCall
	speciesCount := make(map[string]int)

	for key, startTimes := range detections {
		sort.Float64s(startTimes)

		for _, cluster := range clusterStartTimes(startTimes, gapThreshold) {
			// NOTE(review): a cluster is kept only when it has MORE than
			// minDetections entries; clusters of exactly minDetections are
			// dropped. Confirm this strictness is intended.
			if len(cluster) <= minDetections {
				continue
			}

			allCalls = append(allCalls, ClusteredCall{
				File:      key.File,
				StartTime: cluster[0],
				EndTime:   cluster[len(cluster)-1] + clipDuration,
				EbirdCode: key.EbirdCode,
				Segments:  len(cluster),
			})
			speciesCount[key.EbirdCode]++
		}
	}

	sort.Slice(allCalls, func(i, j int) bool {
		a, b := allCalls[i], allCalls[j]
		if a.File != b.File {
			return a.File < b.File
		}
		if a.StartTime != b.StartTime {
			return a.StartTime < b.StartTime
		}
		if a.EbirdCode != b.EbirdCode {
			return a.EbirdCode < b.EbirdCode
		}
		return a.EndTime < b.EndTime
	})

	return allCalls, speciesCount
}