// import.go

package cmd
import (
"context"
"encoding/json"
"flag"
"fmt"
"os"
imp "skraak/tools/import"
)
// RunImport is the entry point for "skraak import".
//
// args[0] names the subcommand (bulk, file, folder, segments or unstructured)
// and the remaining arguments are forwarded untouched to that subcommand's
// handler. If no subcommand is supplied, or the name is not recognised, the
// usage text is printed to stderr and an error is returned.
func RunImport(args []string) error {
	if len(args) == 0 {
		printImportUsage()
		return fmt.Errorf("import subcommand required")
	}

	// Dispatch table: subcommand name -> handler.
	handlers := map[string]func([]string) error{
		"bulk":         runImportBulk,
		"file":         runImportFile,
		"folder":       runImportFolder,
		"segments":     runImportSegments,
		"unstructured": runImportUnstructured,
	}

	name, rest := args[0], args[1:]
	handler, known := handlers[name]
	if !known {
		printImportUsage()
		return fmt.Errorf("unknown import subcommand: %s", name)
	}
	return handler(rest)
}
// printImportUsage writes the help text for the "import" command family to
// stderr: a usage line, the list of subcommands, and one example invocation
// per subcommand.
func printImportUsage() {
	// Each entry is emitted verbatim, in order; entries carry their own newlines.
	helpText := []string{
		"Usage: skraak import <subcommand> [options]\n\n",
		"Subcommands:\n",
		" file Import a single WAV file (structured datasets)\n",
		" folder Import all WAV files from a folder (structured datasets)\n",
		" bulk Bulk import WAV files from CSV (structured datasets)\n",
		" unstructured Import WAV files into unstructured dataset (no location/cluster)\n",
		" segments Import segments from AviaNZ .data files (structured datasets)\n",
		"\nExamples:\n",
		" skraak import bulk --db ./db/skraak.duckdb --dataset abc123 --csv import.csv --log progress.log\n",
		" skraak import file --db ./db/skraak.duckdb --dataset abc123 --location loc456 --cluster clust789 --file /path/to/file.wav\n",
		" skraak import folder --db ./db/skraak.duckdb --dataset abc123 --location loc456 --cluster clust789 --folder /path/to/folder\n",
		" skraak import segments --db ./db/skraak.duckdb --dataset abc123 --location loc456 --cluster clust789 --folder /path/to/folder --mapping mapping.json\n",
		" skraak import unstructured --db ./db/skraak.duckdb --dataset abc123 --folder /path/to/folder\n",
	}
	for _, chunk := range helpText {
		fmt.Fprint(os.Stderr, chunk)
	}
}
// runImportBulk implements "skraak import bulk": it imports WAV files for many
// locations/clusters in one run, driven by a CSV file.
//
// The --db, --dataset, --csv and --log flags are handed to requireFlags for
// validation. Status messages go to stderr; the JSON result is written by
// printJSON. The flag set uses flag.ExitOnError, so malformed flags terminate
// the process inside Parse rather than surfacing as a returned error.
//
// If the import itself fails, whatever result was returned is still printed
// when it reports at least one location or one imported file, and the import
// error is then returned.
//
// JSON output schema:
//
//	{
//	  "total_locations": int,     // Total locations in CSV
//	  "clusters_created": int,    // New clusters created
//	  "clusters_existing": int,   // Existing clusters reused
//	  "total_files_scanned": int, // Total WAV files found
//	  "files_imported": int,      // Successfully imported files
//	  "files_duplicate": int,     // Duplicate files skipped
//	  "files_error": int,         // Files that failed to import
//	  "processing_time": string,  // Human-readable duration
//	  "errors": [string]          // Error messages (omitted if empty)
//	}
func runImportBulk(args []string) error {
	flags := flag.NewFlagSet("import bulk", flag.ExitOnError)
	var (
		db      = flags.String("db", "", "Path to DuckDB database (required)")
		dataset = flags.String("dataset", "", "Dataset ID (required)")
		csvFile = flags.String("csv", "", "Path to CSV file (required)")
		logFile = flags.String("log", "", "Path to progress log file (required)")
	)
	flags.Usage = usagePrinter(flags,
		"skraak import bulk [options]",
		"Bulk import WAV files across multiple locations/clusters using a CSV file.\n\n"+
			"CSV format: location_name,location_id,directory_path,date_range,sample_rate,file_count\n\n"+
			"Monitor progress: tail -f <log-file>",
	)

	if err := flags.Parse(args); err != nil {
		return fmt.Errorf("parsing arguments: %w", err)
	}

	required := map[string]any{
		"--db":      *db,
		"--dataset": *dataset,
		"--csv":     *csvFile,
		"--log":     *logFile,
	}
	if err := requireFlags(flags, required); err != nil {
		return err
	}

	// initEventLog hands back a function that is run when this command returns.
	closeEventLog := initEventLog(*db)
	defer closeEventLog()

	fmt.Fprintf(os.Stderr, "Starting bulk import...\n")
	fmt.Fprintf(os.Stderr, " Database: %s\n", *db)
	fmt.Fprintf(os.Stderr, " Dataset: %s\n", *dataset)
	fmt.Fprintf(os.Stderr, " CSV: %s\n", *csvFile)
	fmt.Fprintf(os.Stderr, " Log: %s\n", *logFile)
	fmt.Fprintf(os.Stderr, "\nMonitor progress: tail -f %s\n\n", *logFile)

	result, err := imp.BulkFileImport(context.Background(), imp.BulkFileImportInput{
		DBPath:      *db,
		DatasetID:   *dataset,
		CSVPath:     *csvFile,
		LogFilePath: *logFile,
	})
	if err == nil {
		return printJSON(result)
	}

	// The import failed part-way: surface whatever progress was recorded.
	hasPartial := result.TotalLocations > 0 || result.FilesImported > 0
	if hasPartial {
		// Best effort only; the import error below is the one that matters.
		_ = printJSON(result)
	}
	return fmt.Errorf("bulk import: %w", err)
}
// runImportFile implements "skraak import file": it imports one WAV file into
// the database under the given dataset, location and cluster.
//
// The --db, --dataset, --location, --cluster and --file flags are handed to
// requireFlags for validation. A status line goes to stderr; on success the
// JSON result is written by printJSON. On failure only the wrapped error is
// returned and nothing is printed as JSON.
//
// JSON output schema:
//
//	{
//	  "file_id": string,         // Generated 21-character nanoid
//	  "file_name": string,       // Base filename
//	  "hash": string,            // XXH64 hash (16-character hex)
//	  "duration_seconds": float, // File duration in seconds
//	  "sample_rate": int,        // Sample rate in Hz
//	  "timestamp_local": string, // Local timestamp (RFC3339)
//	  "is_audiomoth": bool,      // AudioMoth detection
//	  "is_duplicate": bool,      // Skipped as duplicate
//	  "processing_time": string, // Duration string
//	  "error": string            // Error message if failed (omitted if nil)
//	}
func runImportFile(args []string) error {
	flags := flag.NewFlagSet("import file", flag.ExitOnError)
	var (
		db       = flags.String("db", "", "Path to DuckDB database (required)")
		dataset  = flags.String("dataset", "", "Dataset ID (required)")
		location = flags.String("location", "", "Location ID (required)")
		cluster  = flags.String("cluster", "", "Cluster ID (required)")
		wavFile  = flags.String("file", "", "Path to WAV file (required)")
	)
	flags.Usage = usagePrinter(flags,
		"skraak import file [options]",
		"Import a single WAV file into the database.",
		"skraak import file --db ./db/skraak.duckdb --dataset abc123 --location loc456 --cluster clust789 --file /path/to/file.wav",
	)

	if err := flags.Parse(args); err != nil {
		return fmt.Errorf("parsing arguments: %w", err)
	}

	required := map[string]any{
		"--db":       *db,
		"--dataset":  *dataset,
		"--location": *location,
		"--cluster":  *cluster,
		"--file":     *wavFile,
	}
	if err := requireFlags(flags, required); err != nil {
		return err
	}

	// initEventLog hands back a function that is run when this command returns.
	closeEventLog := initEventLog(*db)
	defer closeEventLog()

	fmt.Fprintf(os.Stderr, "Importing file: %s\n", *wavFile)

	result, err := imp.ImportFile(context.Background(), imp.ImportFileInput{
		DBPath:     *db,
		FilePath:   *wavFile,
		DatasetID:  *dataset,
		LocationID: *location,
		ClusterID:  *cluster,
	})
	if err == nil {
		return printJSON(result)
	}
	return fmt.Errorf("import file: %w", err)
}
// runImportFolder implements "skraak import folder": it imports the WAV files
// found in a folder into the database under one dataset/location/cluster.
//
// The --db, --dataset, --location, --cluster and --folder flags are handed to
// requireFlags for validation; --recursive defaults to true. Status lines go
// to stderr and the JSON result is written by printJSON.
//
// If the import fails after at least one file ID was recorded, the partial
// result is printed before the import error is returned.
//
// JSON output schema:
//
//	{
//	  "summary": {
//	    "total_files": int,              // Total WAV files found
//	    "imported_files": int,           // Successfully imported
//	    "skipped_files": int,            // Duplicates skipped
//	    "failed_files": int,             // Failed imports
//	    "audiomoth_files": int,          // AudioMoth files detected
//	    "total_duration_seconds": float, // Total duration imported
//	    "processing_time": string        // Human-readable duration
//	  },
//	  "file_ids": [string],              // List of successfully imported file IDs
//	  "errors": [                        // Import errors (omitted if empty)
//	    {"file_name": string, "error": string, "stage": string}
//	  ]
//	}
func runImportFolder(args []string) error {
	flags := flag.NewFlagSet("import folder", flag.ExitOnError)
	var (
		db       = flags.String("db", "", "Path to DuckDB database (required)")
		dataset  = flags.String("dataset", "", "Dataset ID (required)")
		location = flags.String("location", "", "Location ID (required)")
		cluster  = flags.String("cluster", "", "Cluster ID (required)")
		folder   = flags.String("folder", "", "Path to folder containing WAV files (required)")
		recurse  = flags.Bool("recursive", true, "Scan subfolders recursively (default: true)")
	)
	flags.Usage = usagePrinter(flags,
		"skraak import folder [options]",
		"Import all WAV files from a folder into the database.",
		"skraak import folder --db ./db/skraak.duckdb --dataset abc123 --location loc456 --cluster clust789 --folder /path/to/folder",
	)

	if err := flags.Parse(args); err != nil {
		return fmt.Errorf("parsing arguments: %w", err)
	}

	required := map[string]any{
		"--db":       *db,
		"--dataset":  *dataset,
		"--location": *location,
		"--cluster":  *cluster,
		"--folder":   *folder,
	}
	if err := requireFlags(flags, required); err != nil {
		return err
	}

	// initEventLog hands back a function that is run when this command returns.
	closeEventLog := initEventLog(*db)
	defer closeEventLog()

	fmt.Fprintf(os.Stderr, "Importing from folder: %s\n", *folder)
	if *recurse {
		fmt.Fprintf(os.Stderr, "Scanning recursively...\n")
	}

	result, err := imp.ImportAudioFiles(context.Background(), imp.ImportAudioFilesInput{
		DBPath:     *db,
		FolderPath: *folder,
		DatasetID:  *dataset,
		LocationID: *location,
		ClusterID:  *cluster,
		Recursive:  recurse, // the flag pointer itself is passed, not its value
	})
	if err == nil {
		return printJSON(result)
	}

	// The import failed part-way: report the files that did make it in.
	if len(result.FileIDs) != 0 {
		// Best effort only; the import error below is the one that matters.
		_ = printJSON(result)
	}
	return fmt.Errorf("import folder: %w", err)
}
// runImportSegments implements "skraak import segments": it imports segments
// from AviaNZ .data files into the database, using a JSON mapping file for
// species/calltype names.
//
// The --db, --dataset, --location, --cluster, --folder and --mapping flags are
// handed to requireFlags for validation. Progress is reported on stderr as a
// single line that is rewritten in place (carriage return), followed by a short
// summary; the JSON result is written by printJSON.
//
// If the import fails and the returned result holds any segments or errors,
// that partial result is printed before the import error is returned.
//
// JSON output schema:
//
//	{
//	  "summary": {
//	    "data_files_found": int,     // .data files found
//	    "data_files_processed": int, // .data files processed
//	    "total_segments": int,       // Total segments in .data files
//	    "imported_segments": int,    // Successfully imported segments
//	    "imported_labels": int,      // Successfully imported labels
//	    "imported_subtypes": int,    // Successfully imported subtypes
//	    "processing_time_ms": int    // Processing time in milliseconds
//	  },
//	  "segments": [
//	    {
//	      "segment_id": string,      // Generated segment ID
//	      "file_name": string,       // Source WAV filename
//	      "start_time": float,       // Segment start time in seconds
//	      "end_time": float,         // Segment end time in seconds
//	      "freq_low": float,         // Low frequency bound
//	      "freq_high": float,        // High frequency bound
//	      "labels": [
//	        {
//	          "label_id": string,    // Generated label ID
//	          "species": string,     // Species name
//	          "calltype": string,    // Call type (omitted if empty)
//	          "filter": string,      // Filter name
//	          "certainty": int,      // Certainty level
//	          "comment": string      // Comment (omitted if empty)
//	        }
//	      ]
//	    }
//	  ],
//	  "errors": [                    // Import errors (omitted if empty)
//	    {"file": string, "stage": string, "message": string}
//	  ]
//	}
func runImportSegments(args []string) error {
	flags := flag.NewFlagSet("import segments", flag.ExitOnError)
	var (
		db       = flags.String("db", "", "Path to DuckDB database (required)")
		dataset  = flags.String("dataset", "", "Dataset ID (required)")
		location = flags.String("location", "", "Location ID (required)")
		cluster  = flags.String("cluster", "", "Cluster ID (required)")
		folder   = flags.String("folder", "", "Path to folder containing .data files (required)")
		mapping  = flags.String("mapping", "", "Path to mapping JSON file (required)")
	)
	flags.Usage = usagePrinter(flags,
		"skraak import segments [options]",
		"Import segments from AviaNZ .data files into the database.\n"+
			"Applies species/calltype mapping from JSON file.\n\n"+
			"Mapping file format:\n"+
			" {\n"+
			" \"GSK\": {\"species\": \"Roroa\", \"calltypes\": {\"Male\": \"Male - Solo\"}},\n"+
			" \"Don't Know\": {\"species\": \"Don't Know\"}\n"+
			" }\n\n"+
			"Invariants:\n"+
			" - All file hashes must already exist in database for the cluster\n"+
			" - All files must have no existing labels (fresh imports only)\n"+
			" - All filters, species, and calltypes must exist in database\n"+
			" - Bookmark flags are ignored (not stored in database)",
		"skraak import segments --db ./db/skraak.duckdb --dataset dset_id123 --location loc_id456 --cluster clust_id789 --folder /path/to/data --mapping mapping.json",
	)

	if err := flags.Parse(args); err != nil {
		return fmt.Errorf("parsing arguments: %w", err)
	}

	required := map[string]any{
		"--db":       *db,
		"--dataset":  *dataset,
		"--location": *location,
		"--cluster":  *cluster,
		"--folder":   *folder,
		"--mapping":  *mapping,
	}
	if err := requireFlags(flags, required); err != nil {
		return err
	}

	// initEventLog hands back a function that is run when this command returns.
	closeEventLog := initEventLog(*db)
	defer closeEventLog()

	// reportProgress redraws one status line on stderr; it stays silent when
	// the total is not positive, and ends the line once everything is processed.
	reportProgress := func(done, total int, message string) {
		if total <= 0 {
			return
		}
		percent := float64(done) / float64(total) * 100
		fmt.Fprintf(os.Stderr, "\rProcessing .data files: %d/%d (%.0f%%) - %s", done, total, percent, message)
		if done == total {
			fmt.Fprintf(os.Stderr, "\n")
		}
	}

	fmt.Fprintf(os.Stderr, "Importing segments from: %s\n", *folder)
	fmt.Fprintf(os.Stderr, "Using mapping: %s\n", *mapping)

	result, err := imp.ImportSegments(context.Background(), imp.ImportSegmentsInput{
		DBPath:          *db,
		Folder:          *folder,
		Mapping:         *mapping,
		DatasetID:       *dataset,
		LocationID:      *location,
		ClusterID:       *cluster,
		ProgressHandler: reportProgress,
	})
	if err != nil {
		// Terminate any progress line left open before emitting anything else.
		fmt.Fprintf(os.Stderr, "\n")
		hasPartial := len(result.Segments) > 0 || len(result.Errors) > 0
		if hasPartial {
			// Best effort only; the import error below is the one that matters.
			_ = printJSON(result)
		}
		return fmt.Errorf("import segments: %w", err)
	}

	summary := result.Summary
	fmt.Fprintf(os.Stderr, "\nImport complete:\n")
	fmt.Fprintf(os.Stderr, " Data files processed: %d\n", summary.DataFilesProcessed)
	fmt.Fprintf(os.Stderr, " Segments imported: %d\n", summary.ImportedSegments)
	fmt.Fprintf(os.Stderr, " Labels imported: %d\n", summary.ImportedLabels)
	fmt.Fprintf(os.Stderr, " Subtypes imported: %d\n", summary.ImportedSubtypes)
	return printJSON(result)
}
// runImportUnstructured implements "skraak import unstructured": it imports
// the WAV files found in a folder into an unstructured dataset, which needs no
// location or cluster.
//
// The --db, --dataset and --folder flags are handed to requireFlags for
// validation; --recursive defaults to true. Status lines go to stderr; on
// success the JSON result is written by printJSON. On failure only the wrapped
// error is returned and nothing is printed as JSON.
//
// JSON output schema:
//
//	{
//	  "total_files": int,              // Total WAV files found
//	  "imported_files": int,           // Successfully imported
//	  "skipped_files": int,            // Duplicates skipped
//	  "failed_files": int,             // Failed imports
//	  "total_duration_seconds": float, // Total duration imported
//	  "processing_time": string,       // Human-readable duration
//	  "errors": [                      // Import errors (omitted if empty)
//	    {"file_name": string, "error": string, "stage": string}
//	  ]
//	}
func runImportUnstructured(args []string) error {
	flags := flag.NewFlagSet("import unstructured", flag.ExitOnError)
	var (
		db      = flags.String("db", "", "Path to DuckDB database (required)")
		dataset = flags.String("dataset", "", "Dataset ID (required - must be 'unstructured' type)")
		folder  = flags.String("folder", "", "Path to folder containing WAV files (required)")
		recurse = flags.Bool("recursive", true, "Scan subfolders recursively (default: true)")
	)
	flags.Usage = usagePrinter(flags,
		"skraak import unstructured [options]",
		"Import WAV files into an unstructured dataset.\n"+
			"Files are stored with minimal metadata (hash, duration, sample_rate, file modification time).\n"+
			"No location/cluster hierarchy required.",
		"skraak import unstructured --db ./db/skraak.duckdb --dataset abc123 --folder /path/to/folder",
		"skraak import unstructured --db ./db/skraak.duckdb --dataset abc123 --folder /path/to/folder --recursive=false",
	)

	if err := flags.Parse(args); err != nil {
		return fmt.Errorf("parsing arguments: %w", err)
	}

	required := map[string]any{
		"--db":      *db,
		"--dataset": *dataset,
		"--folder":  *folder,
	}
	if err := requireFlags(flags, required); err != nil {
		return err
	}

	// initEventLog hands back a function that is run when this command returns.
	closeEventLog := initEventLog(*db)
	defer closeEventLog()

	fmt.Fprintf(os.Stderr, "Importing into unstructured dataset: %s\n", *dataset)
	fmt.Fprintf(os.Stderr, "Scanning folder: %s\n", *folder)
	if *recurse {
		fmt.Fprintf(os.Stderr, "Scanning recursively...\n")
	}

	result, err := imp.ImportUnstructured(context.Background(), imp.ImportUnstructuredInput{
		DBPath:     *db,
		DatasetID:  *dataset,
		FolderPath: *folder,
		Recursive:  recurse, // the flag pointer itself is passed, not its value
	})
	if err == nil {
		return printJSON(result)
	}
	return fmt.Errorf("import unstructured: %w", err)
}
// printJSON encodes v as indented JSON and writes it, followed by a newline,
// to stdout. It returns a wrapped error if v cannot be encoded or the write
// fails, and nil otherwise.
func printJSON(v any) error {
	encoder := json.NewEncoder(os.Stdout)
	encoder.SetIndent("", " ")

	err := encoder.Encode(v)
	if err == nil {
		return nil
	}
	return fmt.Errorf("encoding output: %w", err)
}