Fork channel

Create a new channel as a copy of main.

Rename channel

Rename main to:

Delete channel

Delete main? This cannot be undone.

calls_from_common.go
package calls

import (
	"fmt"
	"os"
	"path/filepath"
	"sort"
	"strings"
	"sync"
	"sync/atomic"
)

// CallsFromSourceInput defines the common input for calls-from-source tools.
//
// Field notes, as the fields are consumed by callsFromSource and its helpers:
//   - File: a single source file to process. When non-empty it takes
//     precedence over Folder.
//   - Folder: directory handed to CallSource.FindFiles when File is empty.
//     Leaving both File and Folder empty is reported as an error.
//   - Delete: in the sequential path a source file is removed after
//     processing, but only when ProcessFile reported its data file as written.
//   - ProgressHandler: optional; when non-nil the sequential path calls it
//     after every file with (files processed so far, total files, base name of
//     the file). It is excluded from JSON.
type CallsFromSourceInput struct {
	Folder          string          `json:"folder"`
	File            string          `json:"file"`
	Delete          bool            `json:"delete"`
	ProgressHandler ProgressHandler `json:"-"` // Optional progress callback
}

// CallsFromSourceOutput defines the common output for calls-from-source tools.
//
// Field notes:
//   - Calls: every call returned by CallSource.ProcessFile; the sequential path
//     sorts them by File and then by StartTime.
//   - TotalCalls: len(Calls).
//   - SpeciesCount: number of calls per ClusteredCall.EbirdCode.
//   - DataFilesWritten / DataFilesSkipped: number of source files for which
//     ProcessFile reported written / skipped.
//   - FilesProcessed: number of source files processed without error.
//   - FilesDeleted: number of source files removed because Delete was set.
//   - Filter: the CallSource name (src.Name()).
//   - Error: the failure message when the run failed; nil (and omitted from
//     JSON) on success. On failure only Filter and Error are populated by the
//     code in this file.
type CallsFromSourceOutput struct {
	Calls            []ClusteredCall `json:"calls"`
	TotalCalls       int             `json:"total_calls"`
	SpeciesCount     map[string]int  `json:"species_count"`
	DataFilesWritten int             `json:"data_files_written"`
	DataFilesSkipped int             `json:"data_files_skipped"`
	FilesProcessed   int             `json:"files_processed"`
	FilesDeleted     int             `json:"files_deleted"`
	Filter           string          `json:"filter"`
	Error            *string         `json:"error,omitempty"`
}

// CallSource abstracts a source of bird call data (Raven, BirdNET, etc.)
//
// callsFromSource drives an implementation in three steps: Name labels the
// output and the error messages, FindFiles enumerates the inputs of a folder,
// and ProcessFile is invoked once per input file. For larger batches the same
// CallSource value is shared by several worker goroutines, so ProcessFile
// presumably has to be safe for concurrent calls (NOTE(review): confirm
// against the implementations).
type CallSource interface {
	// Name returns the display name (e.g. "Raven", "BirdNET")
	Name() string
	// FindFiles discovers source files in the given folder.
	// An empty result is treated by callsFromSource as an error.
	FindFiles(folder string) ([]string, error)
	// ProcessFile processes a single source file and returns calls, write/skip status.
	// cache is the DirCache of the directory that contains path. written and
	// skipped are counted by the caller; only a written file is eligible for
	// deletion in the sequential path.
	ProcessFile(path string, cache *DirCache) (calls []ClusteredCall, written, skipped bool, err error)
}

// callsFromSource is the shared entry point for all call source tools.
//
// It resolves the list of source files (input.File wins over input.Folder,
// which is expanded through src.FindFiles) and then dispatches on batch size:
// fewer than 10 files are handled sequentially, anything larger goes through
// the worker pool. Every failure is reported twice, as the returned error and
// as output.Error, with output.Filter already set to the source name.
func callsFromSource(src CallSource, input CallsFromSourceInput) (CallsFromSourceOutput, error) {
	output := CallsFromSourceOutput{Filter: src.Name()}

	// fail stores msg on the output and hands the same text back as the error.
	fail := func(msg string) (CallsFromSourceOutput, error) {
		output.Error = &msg
		return output, fmt.Errorf("%s", msg)
	}

	// Work out which source files to process.
	var files []string
	switch {
	case input.File != "":
		files = []string{input.File}
	case input.Folder != "":
		found, err := src.FindFiles(input.Folder)
		if err != nil {
			return fail(fmt.Sprintf("Failed to find %s files: %v", src.Name(), err))
		}
		files = found
	default:
		return fail("Either --folder or --file must be specified")
	}

	if len(files) == 0 {
		return fail(fmt.Sprintf("No %s files found", src.Name()))
	}

	// Large batch: parallel processing with DirCache.
	if len(files) >= 10 {
		return callsFromSourceParallel(src, input, files)
	}

	// Single file or small batch: stay sequential to avoid goroutine overhead.
	return callsFromSourceSequential(src, input, files)
}

// sequentialFileResult holds the outcome of processing a single file sequentially.
//
// The three counters are each 0 or 1 for one file, so callsFromSourceSequential
// can add them straight onto its running totals.
type sequentialFileResult struct {
	calls            []ClusteredCall
	dataFilesWritten int
	dataFilesSkipped int
	filesDeleted     int
}

// processSequentialFile handles one source file in the sequential path.
//
// It looks up the DirCache for the file's directory in dirCaches (creating and
// memoizing it on first use), runs src.ProcessFile, and turns the written and
// skipped flags into 0/1 counters that the caller can sum. When shouldDelete is
// set the source file is removed, but only after its data file was written; a
// file that was not written is never deleted.
//
// Errors from ProcessFile and os.Remove are wrapped with %w, so the message is
// unchanged while the underlying cause stays reachable through errors.Is and
// errors.As. On a deletion failure the returned result still carries the calls
// and counters gathered up to that point.
func processSequentialFile(src CallSource, file string, dirCaches map[string]*DirCache, shouldDelete bool) (sequentialFileResult, error) {
	var res sequentialFileResult

	// One DirCache per directory, shared by all files that live in it.
	dir := filepath.Dir(file)
	cache := dirCaches[dir]
	if cache == nil {
		cache = NewDirCache(dir)
		dirCaches[dir] = cache
	}

	calls, written, skipped, err := src.ProcessFile(file, cache)
	if err != nil {
		return res, fmt.Errorf("error processing %s: %w", file, err)
	}

	if written {
		res.dataFilesWritten = 1
	}
	if skipped {
		res.dataFilesSkipped = 1
	}
	res.calls = calls

	// Only delete the source once its data has been written out.
	if shouldDelete && written {
		if err := os.Remove(file); err != nil {
			return res, fmt.Errorf("failed to delete %s: %w", file, err)
		}
		res.filesDeleted = 1
	}

	return res, nil
}

// callsFromSourceSequential processes source files one at a time (for small batches).
//
// Files are handled in the order given. The first failing file aborts the run:
// the returned output then carries only Filter and Error, and the same error is
// returned. On success the calls of all files are merged, sorted by File and
// then StartTime, and the per-file counters are summed. input.ProgressHandler,
// when set, is called after each file with (processed so far, total, base name).
func callsFromSourceSequential(src CallSource, input CallsFromSourceInput, files []string) (CallsFromSourceOutput, error) {
	var output CallsFromSourceOutput
	output.Filter = src.Name()

	// Build the DirCache for the input folder once, up front. Per-file lookups
	// are keyed by filepath.Dir(file), which always yields a cleaned path, so
	// the folder must be cleaned as well; otherwise an input such as "data/"
	// would never be hit and the directory would be scanned a second time.
	dirCaches := make(map[string]*DirCache)
	if input.Folder != "" {
		folder := filepath.Clean(input.Folder)
		dirCaches[folder] = NewDirCache(folder)
	}

	var allCalls []ClusteredCall
	speciesCount := make(map[string]int)
	dataFilesWritten := 0
	dataFilesSkipped := 0
	filesProcessed := 0
	filesDeleted := 0

	for _, file := range files {
		res, err := processSequentialFile(src, file, dirCaches, input.Delete)
		if err != nil {
			// Totals are deliberately not copied to output on failure.
			errMsg := err.Error()
			output.Error = &errMsg
			return output, err
		}

		allCalls = append(allCalls, res.calls...)
		dataFilesWritten += res.dataFilesWritten
		dataFilesSkipped += res.dataFilesSkipped
		filesDeleted += res.filesDeleted
		filesProcessed++

		for _, call := range res.calls {
			speciesCount[call.EbirdCode]++
		}

		if input.ProgressHandler != nil {
			input.ProgressHandler(filesProcessed, len(files), filepath.Base(file))
		}
	}

	sort.Slice(allCalls, func(i, j int) bool {
		if allCalls[i].File != allCalls[j].File {
			return allCalls[i].File < allCalls[j].File
		}
		return allCalls[i].StartTime < allCalls[j].StartTime
	})

	output.Calls = allCalls
	output.TotalCalls = len(allCalls)
	output.SpeciesCount = speciesCount
	output.DataFilesWritten = dataFilesWritten
	output.DataFilesSkipped = dataFilesSkipped
	output.FilesProcessed = filesProcessed
	output.FilesDeleted = filesDeleted

	return output, nil
}

// sourceJob represents a single file to process (generic over CallSource).
// It is the unit of work sent to sourceWorker over the jobs channel.
type sourceJob struct {
	filePath string
}

// sourceResult represents the result of processing a single source file.
//
// sourceWorker fills it with the job's path and the four values returned by
// CallSource.ProcessFile, then sends it on the results channel; a failed file
// is reported through err rather than by dropping the result.
type sourceResult struct {
	path    string
	calls   []ClusteredCall
	written bool
	skipped bool
	err     error
}

// The accessors below expose sourceResult's fields read-only. sourceWorker
// sends sourceResult values on a channel of parallelResult, so these methods
// presumably make up that interface (NOTE(review): confirm against its
// declaration).

// filePath reports the source file this result belongs to.
func (r sourceResult) filePath() string {
	return r.path
}

// getCalls returns the calls produced for the file.
func (r sourceResult) getCalls() []ClusteredCall {
	return r.calls
}

// wasWritten reports whether ProcessFile flagged the file as written.
func (r sourceResult) wasWritten() bool {
	return r.written
}

// wasSkipped reports whether ProcessFile flagged the file as skipped.
func (r sourceResult) wasSkipped() bool {
	return r.skipped
}

// getError returns the processing error, or nil when the file succeeded.
func (r sourceResult) getError() error {
	return r.err
}

// callsFromSourceParallel processes source files concurrently using a worker pool and DirCache.
//
// DOT_DATA_WORKERS goroutines run sourceWorker against a shared jobs channel.
// Both channels are buffered to len(files), so neither queueing the jobs nor
// publishing a result can block. A helper goroutine closes the results channel
// once every worker has returned, which is what lets aggregateResults finish.
//
// If aggregateResults reports an error, the returned output carries only Filter
// and Error and that error is returned; otherwise the aggregated calls are
// sorted with sortCallsByFileAndTime and copied into the output together with
// the counters.
func callsFromSourceParallel(src CallSource, input CallsFromSourceInput, files []string) (CallsFromSourceOutput, error) {
	var output CallsFromSourceOutput
	output.Filter = src.Name()

	total := len(files)
	var processed atomic.Int32

	// Pre-build the DirCache for the input folder so the workers do not each
	// scan it when they start. Workers look caches up by filepath.Dir(file),
	// which always yields a cleaned path, so the key is cleaned as well;
	// otherwise an input such as "data/" would never be hit.
	dirCaches := &sync.Map{}
	if input.Folder != "" {
		folder := filepath.Clean(input.Folder)
		dirCaches.Store(folder, NewDirCache(folder))
	}

	// Create job and result channels
	jobs := make(chan sourceJob, total)
	results := make(chan parallelResult, total)

	// Start workers
	var wg sync.WaitGroup
	for range DOT_DATA_WORKERS {
		wg.Add(1)
		go sourceWorker(src, dirCaches, jobs, results, &wg)
	}

	// Send jobs
	for _, file := range files {
		jobs <- sourceJob{filePath: file}
	}
	close(jobs)

	// Wait for workers to finish, then close results
	go func() {
		wg.Wait()
		close(results)
	}()

	// Collect results with progress reporting
	stats := aggregateResults(results, total, &processed, input.Delete, input.ProgressHandler)

	if stats.firstErr != nil {
		errMsg := stats.firstErr.Error()
		output.Error = &errMsg
		return output, stats.firstErr
	}

	sortCallsByFileAndTime(stats.calls)

	output.Calls = stats.calls
	output.TotalCalls = len(stats.calls)
	output.SpeciesCount = stats.speciesCount
	output.DataFilesWritten = stats.dataFilesWritten
	output.DataFilesSkipped = stats.dataFilesSkipped
	output.FilesProcessed = stats.filesProcessed
	output.FilesDeleted = stats.filesDeleted

	return output, nil
}

// sourceWorker processes source files from the jobs channel.
//
// It runs until jobs is closed and drained, sending exactly one sourceResult per
// job on results; processing errors travel inside the result instead of
// stopping the worker. wg.Done is called when the worker returns.
//
// dirCaches maps a directory path to its *DirCache and is shared by all
// workers. On a miss the new cache is published with LoadOrStore, so workers
// that race on the same unseen directory all end up using the one instance
// that was stored first instead of overwriting each other's entry.
func sourceWorker(src CallSource, dirCaches *sync.Map, jobs <-chan sourceJob, results chan<- parallelResult, wg *sync.WaitGroup) {
	defer wg.Done()

	for job := range jobs {
		dir := filepath.Dir(job.filePath)

		// Get or create DirCache for this directory. The plain Load first
		// keeps the common hit path free of a directory scan.
		var cache *DirCache
		if cached, ok := dirCaches.Load(dir); ok {
			cache = cached.(*DirCache)
		} else {
			actual, _ := dirCaches.LoadOrStore(dir, NewDirCache(dir))
			cache = actual.(*DirCache)
		}

		calls, written, skipped, err := src.ProcessFile(job.filePath, cache)
		results <- sourceResult{
			path:    job.filePath,
			calls:   calls,
			written: written,
			skipped: skipped,
			err:     err,
		}
	}
}

// ClusteredCall represents a clustered bird call detection.
// Shared by all call source implementations (preds, birda, raven).
//
// File identifies the recording the call belongs to and EbirdCode its species
// label (the key used for species counts). As filled in by clusterDetections,
// StartTime is the start of the first detection in the cluster, EndTime is the
// start of the last detection plus the clip duration, and Segments is the
// number of detections merged into the call. Times are presumably seconds from
// the start of the recording (TODO confirm); other sources may populate the
// fields differently.
type ClusteredCall struct {
	File      string  `json:"file"`
	StartTime float64 `json:"start_time"`
	EndTime   float64 `json:"end_time"`
	EbirdCode string  `json:"ebird_code"`
	Segments  int     `json:"segments"`
}

// DirCache caches directory entries for fast WAV file lookup.
// Scans the directory once and builds a map from lowercased basename to full filename.
// Safe for concurrent read-only use after construction.
//
// Keys are built by NewDirCache from the filename with only its final extension
// removed, lowercased; "a.selections.txt" is therefore stored under
// "a.selections". When several files of the directory map to the same key, the
// entry processed last replaces the earlier ones. Subdirectories are not
// indexed, and the cache is never refreshed after construction.
type DirCache struct {
	dir    string
	wavMap map[string]string // lowercase basename -> filename with original case (e.g. "20230610_150000" -> "20230610_150000.WAV")
	dirMap map[string]string // lowercase basename -> filename for any file (used by from-raven for .selections.txt etc.)
}

// NewDirCache creates a DirCache by scanning the directory once.
//
// Every regular entry is indexed in dirMap under its lowercased name without
// the final extension; entries whose extension equals ".wav" in any letter case
// are indexed in wavMap as well. Subdirectories are ignored. A directory that
// cannot be read is not an error: the result is an empty cache for dir, so
// every later lookup simply misses.
func NewDirCache(dir string) *DirCache {
	dc := &DirCache{dir: dir}

	listing, err := os.ReadDir(dir)
	if err != nil {
		// Best effort: hand back a usable, empty cache.
		dc.wavMap = make(map[string]string)
		dc.dirMap = make(map[string]string)
		return dc
	}

	dc.wavMap = make(map[string]string, len(listing))
	dc.dirMap = make(map[string]string, len(listing))
	for i := range listing {
		if listing[i].IsDir() {
			continue
		}
		filename := listing[i].Name()
		suffix := filepath.Ext(filename)
		key := strings.ToLower(strings.TrimSuffix(filename, suffix))

		dc.dirMap[key] = filename
		if strings.EqualFold(suffix, ".wav") {
			dc.wavMap[key] = filename
		}
	}
	return dc
}

// FindWAV looks up a WAV file by basename (case-insensitive).
// baseName is given without extension. The result is the cached directory
// joined with the filename in its on-disk case, or "" when no WAV file with
// that basename was indexed.
func (dc *DirCache) FindWAV(baseName string) string {
	filename, found := dc.wavMap[strings.ToLower(baseName)]
	if !found {
		return ""
	}
	return filepath.Join(dc.dir, filename)
}

// FindFile looks up any file by basename (case-insensitive).
// baseName is the filename without its final extension. The result is the
// cached directory joined with the filename in its on-disk case, or "" when no
// file with that basename was indexed.
func (dc *DirCache) FindFile(baseName string) string {
	filename, found := dc.dirMap[strings.ToLower(baseName)]
	if !found {
		return ""
	}
	return filepath.Join(dc.dir, filename)
}

// findWAVFile finds a WAV file in the directory with case-insensitive matching.
// baseName is the filename without extension (e.g., "20230610_150000").
// Returns the full path with correct case, or empty string if not found.
//
// Both the base name and the ".wav" extension are matched without regard to
// letter case, in line with DirCache.FindWAV. A file whose base name equals
// baseName exactly is preferred; only when there is none does the first entry
// that differs in case alone win. Subdirectories are ignored, and a directory
// that cannot be read yields the empty string.
//
// Deprecated: Use DirCache.FindWAV for batch operations to avoid repeated directory scans.
func findWAVFile(dir, baseName string) string {
	entries, err := os.ReadDir(dir)
	if err != nil {
		return ""
	}

	foldMatch := "" // first entry matching baseName only when case is ignored
	for _, entry := range entries {
		if entry.IsDir() {
			continue
		}
		name := entry.Name()
		ext := filepath.Ext(name)
		if !strings.EqualFold(ext, ".wav") {
			continue
		}
		nameNoExt := strings.TrimSuffix(name, ext)
		if nameNoExt == baseName {
			// Exact base name: same result as before the case-insensitive fallback existed.
			return filepath.Join(dir, name)
		}
		if foldMatch == "" && strings.EqualFold(nameNoExt, baseName) {
			foldMatch = name
		}
	}

	if foldMatch != "" {
		return filepath.Join(dir, foldMatch)
	}
	return ""
}

// ParseFilterFromFilename extracts filter name from preds CSV filename.
// "predsST_opensoundscape-kiwi-1.2_2025-11-12.csv" -> "opensoundscape-kiwi-1.2"
//
// Only the last path element is considered and a trailing ".csv" is dropped.
// The remaining name must consist of exactly three underscore-separated parts;
// the middle one is the filter. Any other shape (including a filter name that
// itself contains an underscore) yields the empty string.
func ParseFilterFromFilename(csvPath string) string {
	stem := strings.TrimSuffix(filepath.Base(csvPath), ".csv")

	// Exactly two underscores <=> exactly three parts.
	if strings.Count(stem, "_") != 2 {
		return ""
	}

	first := strings.Index(stem, "_")
	last := strings.LastIndex(stem, "_")
	return stem[first+1 : last]
}

// clusterStartTimes groups consecutive start times into clusters
// where the gap between consecutive times is <= gapThreshold.
//
// The input is walked in the order given (callers sort it first); a new cluster
// starts whenever the step from the previous value is not <= gapThreshold.
// Every cluster is a fresh slice, so the result does not alias startTimes.
// Empty input yields nil.
func clusterStartTimes(startTimes []float64, gapThreshold float64) [][]float64 {
	n := len(startTimes)
	if n == 0 {
		return nil
	}

	var clusters [][]float64
	begin := 0 // index of the first element of the cluster being built
	for next := 1; next < n; next++ {
		if startTimes[next]-startTimes[next-1] <= gapThreshold {
			// Still inside the current cluster.
			continue
		}
		// Gap too large: close the current cluster and open a new one.
		clusters = append(clusters, append([]float64(nil), startTimes[begin:next]...))
		begin = next
	}

	// The trailing cluster is always non-empty.
	return append(clusters, append([]float64(nil), startTimes[begin:]...))
}

// predFileSpeciesKey groups detections by file and ebird code.
// Used by clusterDetections in calls_from_preds.go.
//
// It serves as the map key of the detections passed to clusterDetections, so
// all start times of one species in one file are clustered together; File and
// EbirdCode are copied unchanged into the resulting ClusteredCall values.
type predFileSpeciesKey struct {
	File      string
	EbirdCode string
}

// clusterDetections groups detections into clusters and produces sorted ClusteredCalls.
//
// For every (file, species) key the start times are sorted and split by
// clusterStartTimes using gapThreshold. A cluster becomes a call only when it
// holds more than minDetections detections; clusters with minDetections or
// fewer are dropped. A call starts at the cluster's first start time and ends
// at its last start time plus clipDuration.
//
// It returns the calls ordered by File, then StartTime, then EbirdCode, plus
// the number of calls per ebird code. The EbirdCode tie-breaker matters because
// the calls are gathered in random map iteration order: without it, calls of
// different species that share a file and a start time would come out in a
// different order from run to run.
//
// Note that the start-time slices stored in detections are sorted in place.
func clusterDetections(detections map[predFileSpeciesKey][]float64, clipDuration, gapThreshold float64, minDetections int) ([]ClusteredCall, map[string]int) {
	var allCalls []ClusteredCall
	speciesCount := make(map[string]int)

	for key, startTimes := range detections {
		sort.Float64s(startTimes)

		for _, cluster := range clusterStartTimes(startTimes, gapThreshold) {
			if len(cluster) <= minDetections {
				continue
			}

			allCalls = append(allCalls, ClusteredCall{
				File:      key.File,
				StartTime: cluster[0],
				EndTime:   cluster[len(cluster)-1] + clipDuration,
				EbirdCode: key.EbirdCode,
				Segments:  len(cluster),
			})
			speciesCount[key.EbirdCode]++
		}
	}

	sort.Slice(allCalls, func(i, j int) bool {
		a, b := allCalls[i], allCalls[j]
		if a.File != b.File {
			return a.File < b.File
		}
		if a.StartTime != b.StartTime {
			return a.StartTime < b.StartTime
		}
		// Deterministic order for calls tied on file and start time.
		return a.EbirdCode < b.EbirdCode
	})

	return allCalls, speciesCount
}