Fork channel

Create a new channel as a copy of main.

Rename channel

Rename main to:

Delete channel

Delete main? This cannot be undone.

parallel_aggregate_test.go
package calls

import (
	"errors"
	"sync/atomic"
	"testing"
)

// fakeResult is a canned test double used wherever these tests need a
// parallelResult: each accessor simply hands back the matching field.
type fakeResult struct {
	path    string          // returned by filePath
	calls   []ClusteredCall // returned by getCalls
	written bool            // returned by wasWritten
	skipped bool            // returned by wasSkipped
	err     error           // returned by getError
}

// filePath returns the configured path.
func (f fakeResult) filePath() string {
	return f.path
}

// getCalls returns the configured calls slice (nil when unset).
func (f fakeResult) getCalls() []ClusteredCall {
	return f.calls
}

// wasWritten returns the configured written flag.
func (f fakeResult) wasWritten() bool {
	return f.written
}

// wasSkipped returns the configured skipped flag.
func (f fakeResult) wasSkipped() bool {
	return f.skipped
}

// getError returns the configured error (nil when unset).
func (f fakeResult) getError() error {
	return f.err
}

// sendAll returns a receive-only channel that already holds every element of
// results, in order, and is closed by the time the caller gets it. The buffer
// is sized to len(results) so none of the sends can block.
func sendAll(results []parallelResult) <-chan parallelResult {
	out := make(chan parallelResult, len(results))
	defer close(out)
	for i := range results {
		out <- results[i]
	}
	return out
}

func TestAggregateResults_CountsAndSpecies(t *testing.T) {
	results := []parallelResult{
		fakeResult{path: "a.wav", written: true, calls: []ClusteredCall{
			{File: "a.wav", EbirdCode: "tomtit1", StartTime: 1},
			{File: "a.wav", EbirdCode: "tomtit1", StartTime: 2},
		}},
		fakeResult{path: "b.wav", skipped: true},
		fakeResult{path: "c.wav", written: true, calls: []ClusteredCall{
			{File: "c.wav", EbirdCode: "bellbird1", StartTime: 0.5},
		}},
	}
	var processed atomic.Int32
	var progressCalls int
	stats := aggregateResults(sendAll(results), len(results), &processed, false,
		func(cur, total int, msg string) { progressCalls++ })

	if stats.filesProcessed != 3 {
		t.Errorf("filesProcessed: got %d want 3", stats.filesProcessed)
	}
	if stats.dataFilesWritten != 2 {
		t.Errorf("written: got %d want 2", stats.dataFilesWritten)
	}
	if stats.dataFilesSkipped != 1 {
		t.Errorf("skipped: got %d want 1", stats.dataFilesSkipped)
	}
	if len(stats.calls) != 3 {
		t.Errorf("calls: got %d want 3", len(stats.calls))
	}
	if stats.speciesCount["tomtit1"] != 2 || stats.speciesCount["bellbird1"] != 1 {
		t.Errorf("speciesCount: got %v", stats.speciesCount)
	}
	if stats.firstErr != nil {
		t.Errorf("firstErr: got %v want nil", stats.firstErr)
	}
	if progressCalls != 3 {
		t.Errorf("progressCalls: got %d want 3", progressCalls)
	}
	if stats.filesDeleted != 0 {
		t.Errorf("filesDeleted: got %d want 0 (deleteFiles=false)", stats.filesDeleted)
	}
}

// TestAggregateResults_KeepsFirstError sends two failing results followed by
// a clean one and checks that stats.firstErr reports the first failure and
// not the later one.
func TestAggregateResults_KeepsFirstError(t *testing.T) {
	err1 := errors.New("first")
	err2 := errors.New("second")
	results := []parallelResult{
		fakeResult{path: "a", err: err1},
		fakeResult{path: "b", err: err2},
		fakeResult{path: "c"},
	}
	var processed atomic.Int32
	stats := aggregateResults(sendAll(results), len(results), &processed, false, nil)

	// Match with errors.Is instead of != so the assertion still holds if the
	// error is ever returned wrapped with extra context.
	if !errors.Is(stats.firstErr, err1) {
		t.Errorf("firstErr: got %v want %v", stats.firstErr, err1)
	}
	// errors.Is alone would also accept a value that joins both errors, so
	// explicitly reject the later one to keep the "first only" guarantee.
	if errors.Is(stats.firstErr, err2) {
		t.Errorf("firstErr: got %v, must not include later error %v", stats.firstErr, err2)
	}
}

func TestAggregateResults_NilProgressHandler(t *testing.T) {
	var processed atomic.Int32
	stats := aggregateResults(sendAll([]parallelResult{fakeResult{path: "a"}}), 1, &processed, false, nil)
	if stats.filesProcessed != 1 {
		t.Errorf("filesProcessed: got %d want 1", stats.filesProcessed)
	}
}

// TestSortCallsByFileAndTime passes an unordered slice to
// sortCallsByFileAndTime and checks the slice ends up ordered by File, with
// StartTime ascending within each file.
func TestSortCallsByFileAndTime(t *testing.T) {
	type expectation struct {
		file  string
		start float64
	}
	expected := []expectation{
		{"a.wav", 1},
		{"a.wav", 2},
		{"b.wav", 0.5},
		{"b.wav", 1},
	}

	got := []ClusteredCall{
		{File: "b.wav", StartTime: 1},
		{File: "a.wav", StartTime: 2},
		{File: "a.wav", StartTime: 1},
		{File: "b.wav", StartTime: 0.5},
	}
	sortCallsByFileAndTime(got)

	for idx := range expected {
		exp, actual := expected[idx], got[idx]
		if actual.File == exp.file && actual.StartTime == exp.start {
			continue
		}
		t.Errorf("calls[%d]: got %+v want %+v", idx, actual, exp)
	}
}