// cluster_import.go

package imp
import (
"context"
"database/sql"
"fmt"
"os"
"path/filepath"
"time"
"skraak/astro"
"skraak/utils"
"skraak/wav"
)
// Mutator represents a transaction-like object that supports both reads and writes.
// Both *sql.Tx and *db.LoggedTx satisfy this interface.
// Uses Context variants exclusively so all DB-facing interfaces compose as
// compatible subsets of *sql.DB / *sql.Tx.
//
// Only single-row reads (QueryRowContext) are exposed here; there is no
// multi-row QueryContext on this interface.
type Mutator interface {
	// ExecContext runs a statement that does not return rows (INSERT, UPDATE, ...).
	ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
	// QueryRowContext runs a query expected to yield at most one row; any
	// error is reported when Scan is called on the returned *sql.Row.
	QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row
}
// Reader is the read-only interface for database queries within tools/import.
// Both *sql.DB and *db.LoggedTx satisfy it.
//
// It exposes only query methods (no ExecContext), so code that receives a
// Reader cannot issue statements through the Exec path.
type Reader interface {
	// QueryContext runs a query that may return multiple rows; the caller is
	// responsible for closing the returned *sql.Rows.
	QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)
	// QueryRowContext runs a query expected to yield at most one row; any
	// error is reported when Scan is called on the returned *sql.Row.
	QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row
}
// ClusterImportInput defines parameters for importing one cluster.
// All three IDs are passed through to the database unchanged; ImportCluster
// does not validate their format.
type ClusterImportInput struct {
	// FolderPath is the folder that is stat'ed and scanned for WAV files.
	// Expected to be an absolute path (not enforced by ImportCluster).
	FolderPath string // Absolute path to folder with WAV files
	// DatasetID is written to the file_dataset junction row of every imported file.
	DatasetID string // 12-char dataset ID
	// LocationID selects the location row used for latitude/longitude/timezone
	// and is stored on every imported file.
	LocationID string // 12-char location ID
	// ClusterID is stored on every imported file.
	ClusterID string // 12-char cluster ID
	// Recursive is forwarded to the folder scan to control subfolder traversal.
	Recursive bool // Scan subfolders?
}
// ClusterImportOutput provides results and statistics for one ImportCluster run.
type ClusterImportOutput struct {
	// TotalFiles is the number of WAV files returned by the folder scan.
	TotalFiles int
	// ImportedFiles is the number of files for which rows were inserted.
	ImportedFiles int
	// SkippedFiles is the number of files skipped because their hash was
	// already present.
	SkippedFiles int // Duplicates
	// FailedFiles is the number of entries in Errors (processing and insert
	// stage combined), which is not necessarily the number of distinct files.
	FailedFiles int
	// AudioMothFiles counts successfully processed files flagged as AudioMoth,
	// regardless of whether they were later inserted, skipped or failed.
	AudioMothFiles int
	// TotalDuration is the sum of Duration over all successfully processed
	// files (presumably seconds — confirm against wav.WAVMetadata).
	TotalDuration float64
	// ProcessingTime is the elapsed wall-clock time, formatted with
	// time.Duration.String().
	ProcessingTime string
	// Errors lists every per-file problem encountered.
	Errors []FileImportError
}
// LocationData holds location information needed for processing.
// It is loaded from the location table by GetLocationData.
type LocationData struct {
	// Latitude and Longitude are passed to the astronomical calculation
	// (presumably decimal degrees — confirm against the location schema).
	Latitude  float64
	Longitude float64
	// TimezoneID is passed to timestamp resolution and timezone-offset
	// handling (presumably an IANA zone name — confirm).
	TimezoneID string
}
// ImportCluster imports all WAV files from a folder into a cluster.
// The caller must provide an open transaction via tx; this function does NOT
// commit or rollback — the caller owns the transaction lifecycle.
//
// This is the canonical cluster import logic used by both:
//   - import_files.go (single cluster)
//   - bulk_file_import.go (multiple clusters)
//
// Steps:
//  1. Validate folder exists
//  2. Get location metadata (lat/lon/timezone) from database
//  3. Scan folder for WAV files (recursive or not)
//  4. Batch process all files:
//     - Parse WAV headers (includes file mod time)
//     - Batch parse filename timestamps (variance-based)
//     - Resolve timestamps (AudioMoth → filename → file mod time)
//     - Calculate hashes
//     - Calculate astronomical data
//  5. Batch insert using the provided transaction:
//     - Check duplicates
//     - INSERT INTO file
//     - INSERT INTO file_dataset (ALWAYS)
//     - INSERT INTO moth_metadata (if AudioMoth)
//  6. Return summary statistics
//
// Parameters:
//   - database: read-only handle used to look up the location row.
//   - tx: transaction used for the duplicate checks and all inserts.
//   - input: folder and IDs describing the cluster being imported.
//
// A non-nil error is returned only when the folder is unusable, the location
// lookup fails, the folder scan fails, or the insert phase reports a fatal
// error. Per-file problems never abort the import; they are collected in the
// returned output's Errors slice, which is always non-nil.
func ImportCluster(
	database Reader,
	tx Mutator,
	input ClusterImportInput,
) (*ClusterImportOutput, error) {
	startTime := time.Now()

	// Validate folder exists
	info, err := os.Stat(input.FolderPath)
	if err != nil {
		return nil, fmt.Errorf("folder not accessible: %w", err)
	}
	if !info.IsDir() {
		return nil, fmt.Errorf("path is not a directory: %s", input.FolderPath)
	}

	// Get location data for astronomical calculations
	locationData, err := GetLocationData(database, input.LocationID)
	if err != nil {
		return nil, fmt.Errorf("failed to get location data: %w", err)
	}

	// Scan folder for WAV files
	wavFiles, err := utils.FindFiles(input.FolderPath, utils.FindFilesOptions{
		Extension:    ".wav",
		Recursive:    input.Recursive,
		SkipPrefixes: []string{"Clips_"},
		SkipHidden:   true, // Standard to ignore hidden
		MinSize:      1,    // Must have size > 0
	})
	if err != nil {
		return nil, fmt.Errorf("failed to scan folder: %w", err)
	}

	// If no files, return early
	if len(wavFiles) == 0 {
		return &ClusterImportOutput{
			TotalFiles:     0,
			ProcessingTime: time.Since(startTime).String(),
			Errors:         []FileImportError{},
		}, nil
	}

	// Batch process all files
	filesData, processErrors := batchProcessFiles(wavFiles, locationData)

	// Batch insert into database using the provided transaction
	imported, skipped, insertErrors, err := insertClusterFiles(
		tx,
		filesData,
		input.DatasetID,
		input.ClusterID,
		input.LocationID,
	)
	if err != nil {
		return nil, fmt.Errorf("database insertion failed: %w", err)
	}

	// Combine all errors into a fresh slice. Allocating it explicitly keeps
	// Errors non-nil (matching the empty-folder path above, so serialized
	// output is "[]" rather than "null") and avoids appending into
	// processErrors' backing array.
	allErrors := make([]FileImportError, 0, len(processErrors)+len(insertErrors))
	allErrors = append(allErrors, processErrors...)
	allErrors = append(allErrors, insertErrors...)

	// Calculate summary statistics over every successfully processed file,
	// whether it was subsequently inserted, skipped as a duplicate, or failed.
	audiomothCount := 0
	totalDuration := 0.0
	for _, fd := range filesData {
		if fd.IsAudioMoth {
			audiomothCount++
		}
		totalDuration += fd.Duration
	}

	return &ClusterImportOutput{
		TotalFiles:     len(wavFiles),
		ImportedFiles:  imported,
		SkippedFiles:   skipped,
		FailedFiles:    len(allErrors),
		AudioMothFiles: audiomothCount,
		TotalDuration:  totalDuration,
		ProcessingTime: time.Since(startTime).String(),
		Errors:         allErrors,
	}, nil
}
// GetLocationData loads latitude, longitude and timezone_id for the location
// row whose id equals locationID.
//
// It returns a wrapped error when the row cannot be read or scanned (this
// includes the case where no row matches). The query runs with a background
// context.
func GetLocationData(database Reader, locationID string) (*LocationData, error) {
	const query = "SELECT latitude, longitude, timezone_id FROM location WHERE id = ?"

	location := new(LocationData)
	row := database.QueryRowContext(context.Background(), query, locationID)
	if scanErr := row.Scan(&location.Latitude, &location.Longitude, &location.TimezoneID); scanErr != nil {
		return nil, fmt.Errorf("failed to query location data: %w", scanErr)
	}
	return location, nil
}
// EnsureClusterPath sets the cluster's path field if it's currently empty.
// Accepts any type with QueryRowContext and ExecContext (e.g. *sql.DB, *sql.Tx, *db.LoggedTx).
func EnsureClusterPath(database Mutator, clusterID, folderPath string) error {
ctx := context.Background()
// Check if cluster already has a path
var currentPath sql.NullString
err := database.QueryRowContext(ctx, "SELECT path FROM cluster WHERE id = ?", clusterID).Scan(¤tPath)
if err != nil {
return fmt.Errorf("failed to query cluster: %w", err)
}
// If path is already set, skip
if currentPath.Valid && currentPath.String != "" {
return nil
}
// Normalize folder path
normalizedPath := utils.NormalizeFolderPath(folderPath)
// Update cluster with normalized path
_, err = database.ExecContext(ctx,
"UPDATE cluster SET path = ?, last_modified = now() WHERE id = ?",
normalizedPath,
clusterID,
)
if err != nil {
return fmt.Errorf("failed to update cluster path: %w", err)
}
return nil
}
// wavInfo holds WAV metadata and hash for a single file during batch processing.
// One value is created per scanned file, including files whose header could
// not be parsed (err is then non-nil).
type wavInfo struct {
	// path is the file path as returned by the folder scan.
	path string
	// metadata is the parsed WAV header returned by wav.ParseWAVHeaderWithHash.
	metadata *wav.WAVMetadata
	// hash is the content hash returned alongside the header; it is later
	// stored in the file.xxh64_hash column.
	hash string
	// err is the error from header parsing/hashing; entries with a non-nil
	// err are reported and excluded from further processing.
	err error
}
// parseFilenameTimestampsBatch parses the timestamps embedded in filenames as
// one batch and then applies the timezone offset for timezoneID.
//
// filenameIndices and filenames are parallel: filenames[k] belongs to
// wavInfos[filenameIndices[k]]. On success the returned map is keyed by
// wavInfos index and the error slice is nil. If either the batch parse or the
// timezone adjustment fails, the map is empty and one StageParse error is
// returned for every candidate file.
//
// NOTE(review): the final loop assumes wav.ApplyTimezoneOffset returns at
// least one timestamp per entry in filenameIndices — confirm.
func parseFilenameTimestampsBatch(
	wavInfos []wavInfo,
	filenameIndices []int,
	filenames []string,
	timezoneID string,
) (map[int]time.Time, []FileImportError) {
	byIndex := make(map[int]time.Time)

	// failEvery attributes a batch-level failure to each candidate file.
	failEvery := func(message string) []FileImportError {
		var failures []FileImportError
		for _, at := range filenameIndices {
			failures = append(failures, FileImportError{
				FileName: filepath.Base(wavInfos[at].path),
				Error:    message,
				Stage:    StageParse,
			})
		}
		return failures
	}

	parsed, parseErr := wav.ParseFilenameTimestamps(filenames)
	if parseErr != nil {
		return byIndex, failEvery(fmt.Sprintf("filename timestamp parsing failed: %v", parseErr))
	}

	shifted, tzErr := wav.ApplyTimezoneOffset(parsed, timezoneID)
	if tzErr != nil {
		return byIndex, failEvery(fmt.Sprintf("timezone offset failed: %v", tzErr))
	}

	// Translate batch positions back to wavInfos indices.
	for pos := range filenameIndices {
		byIndex[filenameIndices[pos]] = shifted[pos]
	}
	return byIndex, nil
}
// resolveFileData builds the processing result for one WAV file: it resolves
// the recording timestamp via wav.ResolveTimestamp (handing over the
// pre-parsed filename timestamp when one is available, nil otherwise) and
// derives astronomical data from the UTC form of that timestamp, the file's
// duration and the location's coordinates.
//
// The error from timestamp resolution is returned unwrapped.
func resolveFileData(info wavInfo, preParsedTime *time.Time, location *LocationData) (*wav.FileProcessingResult, error) {
	header := info.metadata

	resolved, err := wav.ResolveTimestamp(header, info.path, location.TimezoneID, true, preParsedTime)
	if err != nil {
		return nil, err
	}

	localTime := resolved.Timestamp
	sky := astro.CalculateAstronomicalData(localTime.UTC(), header.Duration, location.Latitude, location.Longitude)

	result := &wav.FileProcessingResult{
		FileName:       filepath.Base(info.path),
		Hash:           info.hash,
		Duration:       header.Duration,
		SampleRate:     header.SampleRate,
		TimestampLocal: localTime,
		IsAudioMoth:    resolved.IsAudioMoth,
		MothData:       resolved.MothData,
		AstroData:      sky,
	}
	return result, nil
}
// batchProcessFiles turns a list of WAV paths into processing results.
//
// For every path it parses the header and hash, batch-parses timestamps for
// the files whose names carry one, and then resolves the final timestamp and
// astronomical data per file. Files whose header cannot be parsed are
// reported with a StageParse error and skipped. Results keep the order of
// wavFiles.
//
// NOTE(review): when the batch filename parse fails, every timestamp-named
// file receives an error entry but is still handed to resolveFileData without
// a pre-parsed time, so the same file can appear both in the errors and in
// the results — confirm this is intended.
func batchProcessFiles(wavFiles []string, location *LocationData) ([]*wav.FileProcessingResult, []FileImportError) {
	var (
		results  []*wav.FileProcessingResult
		failures []FileImportError
	)

	parseFailure := func(path string, cause error) FileImportError {
		return FileImportError{
			FileName: filepath.Base(path),
			Error:    cause.Error(),
			Stage:    StageParse,
		}
	}

	// Pass 1: read header + hash for each file and, for readable files,
	// note which ones have a timestamp in their filename.
	infos := make([]wavInfo, len(wavFiles))
	var (
		stampedNames   []string
		stampedIndices []int
	)
	for i, p := range wavFiles {
		header, sum, parseErr := wav.ParseWAVHeaderWithHash(p)
		infos[i] = wavInfo{path: p, metadata: header, hash: sum, err: parseErr}
		if parseErr != nil {
			failures = append(failures, parseFailure(p, parseErr))
			continue
		}
		if !wav.HasTimestampFilename(p) {
			continue
		}
		stampedNames = append(stampedNames, filepath.Base(p))
		stampedIndices = append(stampedIndices, i)
	}

	// Pass 2: batch-parse the filename timestamps, if there are any.
	// A nil map is fine for the lookups below when nothing was parsed.
	var parsedTimes map[int]time.Time
	if len(stampedNames) > 0 {
		var batchFailures []FileImportError
		parsedTimes, batchFailures = parseFilenameTimestampsBatch(infos, stampedIndices, stampedNames, location.TimezoneID)
		failures = append(failures, batchFailures...)
	}

	// Pass 3: resolve timestamp and astronomical data per readable file.
	for i := range infos {
		current := infos[i]
		if current.err != nil {
			continue
		}

		var hint *time.Time
		if ts, found := parsedTimes[i]; found {
			hint = &ts
		}

		resolved, resolveErr := resolveFileData(current, hint, location)
		if resolveErr != nil {
			failures = append(failures, parseFailure(current.path, resolveErr))
			continue
		}
		results = append(results, resolved)
	}

	return results, failures
}
// insertSingleFile writes one processed file through the given transaction.
//
// Return values:
//   - (true, nil): the file row, its file_dataset row and (for AudioMoth
//     files with metadata) its moth_metadata row were inserted.
//   - (false, nil): the file's hash already exists, so nothing was written.
//   - (false, err): the duplicate check, ID generation or an INSERT failed.
//
// NOTE(review): a failure after the first INSERT leaves the earlier rows in
// the caller's transaction; nothing here undoes them.
func insertSingleFile(
	ctx context.Context,
	tx Mutator,
	fd *wav.FileProcessingResult,
	datasetID, clusterID, locationID string,
) (bool, error) {
	const insertFileSQL = `
INSERT INTO file (
id, file_name, xxh64_hash, location_id, timestamp_local,
cluster_id, duration, sample_rate, maybe_solar_night, maybe_civil_night,
moon_phase, created_at, last_modified, active
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, now(), now(), true)
`
	const insertFileDatasetSQL = `
INSERT INTO file_dataset (file_id, dataset_id, created_at, last_modified)
VALUES (?, ?, now(), now())
`
	const insertMothSQL = `
INSERT INTO moth_metadata (
file_id, timestamp, recorder_id, gain, battery_v, temp_c,
created_at, last_modified, active
) VALUES (?, ?, ?, ?, ?, ?, now(), now(), true)
`

	// A file whose hash is already known is skipped rather than failed.
	_, alreadyStored, dupErr := CheckDuplicateHash(tx, fd.Hash)
	switch {
	case dupErr != nil:
		return false, fmt.Errorf("duplicate check failed: %w", dupErr)
	case alreadyStored:
		return false, nil
	}

	newID, idErr := utils.GenerateLongID()
	if idErr != nil {
		return false, fmt.Errorf("ID generation failed: %w", idErr)
	}

	// Main file record.
	if _, execErr := tx.ExecContext(ctx, insertFileSQL,
		newID, fd.FileName, fd.Hash, locationID,
		fd.TimestampLocal, clusterID, fd.Duration, fd.SampleRate,
		fd.AstroData.SolarNight, fd.AstroData.CivilNight, fd.AstroData.MoonPhase,
	); execErr != nil {
		return false, fmt.Errorf("file insert failed: %w", execErr)
	}

	// Junction row linking the file to its dataset; written for every file.
	if _, execErr := tx.ExecContext(ctx, insertFileDatasetSQL, newID, datasetID); execErr != nil {
		return false, fmt.Errorf("file_dataset insert failed: %w", execErr)
	}

	// Recorder metadata exists only for AudioMoth files.
	if !fd.IsAudioMoth || fd.MothData == nil {
		return true, nil
	}

	moth := fd.MothData
	if _, execErr := tx.ExecContext(ctx, insertMothSQL,
		newID,
		moth.Timestamp,
		&moth.RecorderID,
		&moth.Gain,
		&moth.BatteryV,
		&moth.TempC,
	); execErr != nil {
		return false, fmt.Errorf("moth_metadata insert failed: %w", execErr)
	}
	return true, nil
}
// insertClusterFiles runs insertSingleFile for every processed file using the
// supplied transaction and tallies the outcomes. Committing or rolling back
// remains the caller's job.
//
// A per-file failure is recorded as a StageInsert error and the loop moves on
// to the next file. The final error result is always nil in the current
// implementation. All statements run with a background context.
func insertClusterFiles(
	tx Mutator,
	filesData []*wav.FileProcessingResult,
	datasetID, clusterID, locationID string,
) (imported, skipped int, errors []FileImportError, err error) {
	ctx := context.Background()

	for i := range filesData {
		file := filesData[i]
		inserted, failure := insertSingleFile(ctx, tx, file, datasetID, clusterID, locationID)

		switch {
		case failure != nil:
			errors = append(errors, FileImportError{
				FileName: file.FileName,
				Error:    failure.Error(),
				Stage:    StageInsert,
			})
		case inserted:
			imported++
		default:
			// Not inserted and no error: the file was a duplicate.
			skipped++
		}
	}

	return imported, skipped, errors, nil
}