/**
* Database batch processing utilities for efficient bulk operations
* Optimized for Cloudflare Workers and Neon HTTP driver
*/
import { nanoid } from "nanoid";
import {
processInChunks,
ChunkProcessorOptions,
ChunkProcessResult,
ChunkProgress
} from "./chunkProcessor";
import type { Database } from "./database";
// Optimal batch sizes for Cloudflare Workers and Neon HTTP driver.
// Tuned per table: wider rows (selections) get smaller chunks so each
// HTTP round trip stays within the driver's comfortable payload size.
export const BATCH_SIZES = {
  SELECTIONS: 50, // Larger records with more fields
  LABELS: 100, // Medium-sized records
  LABEL_SUBTYPES: 100 // Smaller records
} as const;
/** A selection row prepared for batch insertion, with audit fields pre-filled. */
export interface SelectionBatchRecord {
  id: string;
  fileId: string;
  datasetId: string;
  // Times are carried as strings; prepareBatchData stringifies the numeric inputs.
  startTime: string;
  endTime: string;
  createdBy: string;
  createdAt: Date;
  lastModified: Date;
  modifiedBy: string;
}
/** A label row linking a selection to a species, prepared for batch insertion. */
export interface LabelBatchRecord {
  id: string;
  selectionId: string;
  speciesId: string;
  filterId: string;
  createdBy: string;
  createdAt: Date;
  lastModified: Date;
  modifiedBy: string;
}
/** A label-subtype (call type) row linked to a label, prepared for batch insertion. */
export interface LabelSubtypeBatchRecord {
  id: string;
  labelId: string;
  calltypeId: string;
  filterId: string;
  createdBy: string;
  createdAt: Date;
  lastModified: Date;
  modifiedBy: string;
}
/** Outcome of one batch-insert chunk as reported by the insert helpers. */
export interface BatchInsertResult {
  insertedCount: number;
  failedCount: number;
  errors: Error[];
  // Set when some rows succeeded before a failure occurred.
  partialSuccess?: boolean;
  failedRecords?: unknown[];
}
/** All record arrays produced by prepareBatchData, ready for insertion. */
export interface SelectionImportBatchData {
  selections: SelectionBatchRecord[];
  labels: LabelBatchRecord[];
  labelSubtypes: LabelSubtypeBatchRecord[];
}
/** Progress snapshot emitted by processSelectionImport's onProgress callback. */
export interface SelectionImportProgress {
  phase: 'pre-generation' | 'selections' | 'labels' | 'labelSubtypes' | 'complete';
  totalOperations: number;
  completedOperations: number;
  currentTable: string;
  // Fine-grained per-chunk progress for the current phase, when available.
  chunkProgress?: ChunkProgress;
}
/**
 * Pre-generate IDs and build the selection/label/labelSubtype batch records
 * for a selection import in a single pass.
 *
 * All records share one timestamp and the caller-supplied audit fields so the
 * import is uniform in its audit columns.
 *
 * Note: the original implementation kept two bookkeeping Maps (`selectionIds`,
 * `labelIds`); `labelIds` was written but never read, and the selection-ID
 * pre-pass only existed to feed a non-null-asserted lookup. Generating IDs
 * inline produces identical output without the dead state.
 *
 * @param selectionsData - Raw selections; each carries its species (and
 *   optional call types) to be expanded into label / labelSubtype records.
 * @param datasetId - Dataset every selection belongs to.
 * @param filterId - Filter recorded on every label and label subtype.
 * @param userId - User recorded as creator/modifier on every record.
 * @returns Fully-populated batch arrays ready for the batch insert helpers.
 */
export function prepareBatchData(
  selectionsData: Array<{
    fileId: string;
    startTime: number;
    endTime: number;
    species: Array<{
      speciesId: string;
      callTypes?: string[];
    }>;
  }>,
  datasetId: string,
  filterId: string,
  userId: string
): SelectionImportBatchData {
  const timestamp = new Date();
  const selections: SelectionBatchRecord[] = [];
  const labels: LabelBatchRecord[] = [];
  const labelSubtypes: LabelSubtypeBatchRecord[] = [];
  for (const selectionData of selectionsData) {
    const selectionId = nanoid();
    // Create selection record; numeric times are stored as strings.
    selections.push({
      id: selectionId,
      fileId: selectionData.fileId,
      datasetId: datasetId,
      startTime: selectionData.startTime.toString(),
      endTime: selectionData.endTime.toString(),
      createdBy: userId,
      createdAt: timestamp,
      lastModified: timestamp,
      modifiedBy: userId
    });
    // Expand each species into a label row linked to this selection.
    for (const speciesData of selectionData.species) {
      const labelId = nanoid();
      labels.push({
        id: labelId,
        selectionId: selectionId,
        speciesId: speciesData.speciesId,
        filterId: filterId,
        createdBy: userId,
        createdAt: timestamp,
        lastModified: timestamp,
        modifiedBy: userId
      });
      // Expand optional call types into label-subtype rows linked to the label.
      for (const callTypeId of speciesData.callTypes ?? []) {
        labelSubtypes.push({
          id: nanoid(),
          labelId: labelId,
          calltypeId: callTypeId,
          filterId: filterId,
          createdBy: userId,
          createdAt: timestamp,
          lastModified: timestamp,
          modifiedBy: userId
        });
      }
    }
  }
  return {
    selections,
    labels,
    labelSubtypes
  };
}
/**
 * Batch insert selections with chunking and exponential-backoff retry.
 *
 * Fix: the schema table is now dynamically imported once, up front, instead
 * of re-awaiting `import("../../../db/schema")` inside every chunk callback.
 *
 * @param db - Database handle used for the inserts.
 * @param selections - Prepared selection records to insert.
 * @param onProgress - Optional per-chunk progress callback.
 * @returns Aggregated chunk-processing result; each chunk reports how many
 *   rows it inserted.
 */
export async function batchInsertSelections(
  db: Database,
  selections: SelectionBatchRecord[],
  onProgress?: (progress: ChunkProgress) => void
): Promise<ChunkProcessResult<BatchInsertResult>> {
  // Resolve the selection table once for all chunks.
  const { selection } = await import("../../../db/schema");
  const options: Partial<ChunkProcessorOptions> = {
    chunkSize: BATCH_SIZES.SELECTIONS,
    maxRetries: 3,
    baseDelayMs: 1000,
    exponentialBackoff: true
  };
  return processInChunks(
    selections,
    async (chunk: SelectionBatchRecord[]) => {
      try {
        await db.insert(selection).values(
          chunk.map(record => ({
            id: record.id,
            fileId: record.fileId,
            datasetId: record.datasetId,
            startTime: record.startTime,
            endTime: record.endTime,
            // Columns the import does not supply are written explicitly.
            freqLow: null,
            freqHigh: null,
            description: null,
            upload: false,
            approved: false,
            createdBy: record.createdBy,
            createdAt: record.createdAt,
            lastModified: record.lastModified,
            modifiedBy: record.modifiedBy,
            active: true
          }))
        );
        return [{
          insertedCount: chunk.length,
          failedCount: 0,
          errors: []
        }];
      } catch (error) {
        // Log with context, then rethrow so processInChunks can retry the chunk.
        console.error('Batch insert selections error:', error);
        throw error;
      }
    },
    options,
    onProgress
  );
}
/**
 * Batch insert labels with chunking and exponential-backoff retry.
 *
 * Fix: the schema table is now dynamically imported once, up front, instead
 * of re-awaiting `import("../../../db/schema")` inside every chunk callback.
 *
 * @param db - Database handle used for the inserts.
 * @param labels - Prepared label records to insert.
 * @param onProgress - Optional per-chunk progress callback.
 * @returns Aggregated chunk-processing result; each chunk reports how many
 *   rows it inserted.
 */
export async function batchInsertLabels(
  db: Database,
  labels: LabelBatchRecord[],
  onProgress?: (progress: ChunkProgress) => void
): Promise<ChunkProcessResult<BatchInsertResult>> {
  // Resolve the label table once for all chunks.
  const { label } = await import("../../../db/schema");
  const options: Partial<ChunkProcessorOptions> = {
    chunkSize: BATCH_SIZES.LABELS,
    maxRetries: 3,
    baseDelayMs: 1000,
    exponentialBackoff: true
  };
  return processInChunks(
    labels,
    async (chunk: LabelBatchRecord[]) => {
      try {
        await db.insert(label).values(
          chunk.map(record => ({
            id: record.id,
            selectionId: record.selectionId,
            speciesId: record.speciesId,
            filterId: record.filterId,
            // Certainty is not captured by the import.
            certainty: null,
            createdBy: record.createdBy,
            createdAt: record.createdAt,
            lastModified: record.lastModified,
            modifiedBy: record.modifiedBy,
            active: true
          }))
        );
        return [{
          insertedCount: chunk.length,
          failedCount: 0,
          errors: []
        }];
      } catch (error) {
        // Log with context, then rethrow so processInChunks can retry the chunk.
        console.error('Batch insert labels error:', error);
        throw error;
      }
    },
    options,
    onProgress
  );
}
/**
 * Batch insert label subtypes with chunking and exponential-backoff retry.
 *
 * Fix: the schema table is now dynamically imported once, up front, instead
 * of re-awaiting `import("../../../db/schema")` inside every chunk callback.
 *
 * @param db - Database handle used for the inserts.
 * @param labelSubtypes - Prepared label-subtype records to insert.
 * @param onProgress - Optional per-chunk progress callback.
 * @returns Aggregated chunk-processing result; each chunk reports how many
 *   rows it inserted.
 */
export async function batchInsertLabelSubtypes(
  db: Database,
  labelSubtypes: LabelSubtypeBatchRecord[],
  onProgress?: (progress: ChunkProgress) => void
): Promise<ChunkProcessResult<BatchInsertResult>> {
  // Resolve the labelSubtype table once for all chunks.
  const { labelSubtype } = await import("../../../db/schema");
  const options: Partial<ChunkProcessorOptions> = {
    chunkSize: BATCH_SIZES.LABEL_SUBTYPES,
    maxRetries: 3,
    baseDelayMs: 1000,
    exponentialBackoff: true
  };
  return processInChunks(
    labelSubtypes,
    async (chunk: LabelSubtypeBatchRecord[]) => {
      try {
        await db.insert(labelSubtype).values(
          chunk.map(record => ({
            id: record.id,
            labelId: record.labelId,
            calltypeId: record.calltypeId,
            filterId: record.filterId,
            // Certainty is not captured by the import.
            certainty: null,
            createdBy: record.createdBy,
            createdAt: record.createdAt,
            lastModified: record.lastModified,
            modifiedBy: record.modifiedBy,
            active: true
          }))
        );
        return [{
          insertedCount: chunk.length,
          failedCount: 0,
          errors: []
        }];
      } catch (error) {
        // Log with context, then rethrow so processInChunks can retry the chunk.
        console.error('Batch insert label subtypes error:', error);
        throw error;
      }
    },
    options,
    onProgress
  );
}
/**
 * Process complete selection import with batch operations and comprehensive error handling.
 *
 * Runs the import in ordered phases — (1) pre-generate all records,
 * (2) insert selections, (3) insert labels, (4) insert label subtypes,
 * (5) complete — so that parent rows exist before their children. A failure
 * in any phase aborts the remaining phases and returns the failing phase
 * plus recovery guidance. There is no rollback here: rows inserted by
 * earlier phases remain in the database, which is surfaced to the caller
 * via `partialSuccess`.
 *
 * @param db - Database handle passed through to the batch insert helpers.
 * @param selectionsData - Raw selections, each with species and optional call types.
 * @param datasetId - Dataset the selections belong to.
 * @param filterId - Filter recorded on labels and label subtypes.
 * @param userId - User recorded as creator/modifier on all records.
 * @param onProgress - Optional callback invoked at each phase boundary and
 *   on every completed chunk within a phase.
 * @returns Summary with per-table created counts, accumulated errors, and,
 *   on failure, the failing phase and recovery instructions.
 */
export async function processSelectionImport(
  db: Database,
  selectionsData: Array<{
    fileId: string;
    startTime: number;
    endTime: number;
    species: Array<{
      speciesId: string;
      callTypes?: string[];
    }>;
  }>,
  datasetId: string,
  filterId: string,
  userId: string,
  onProgress?: (progress: SelectionImportProgress) => void
): Promise<{
  success: boolean;
  selectionsCreated: number;
  labelsCreated: number;
  labelSubtypesCreated: number;
  errors: Error[];
  partialSuccess?: boolean;
  failurePhase?: string;
  recoveryInstructions?: string;
}> {
  const errors: Error[] = [];
  let selectionsCreated = 0;
  let labelsCreated = 0;
  let labelSubtypesCreated = 0;
  // Tracks the phase in flight so the catch block can report where we failed.
  let failurePhase: string | undefined;
  let partialSuccess = false;
  try {
    // Phase 1: Pre-generate all data
    console.log(`Starting selection import for ${selectionsData.length} selections`);
    if (onProgress) {
      onProgress({
        phase: 'pre-generation',
        // NOTE(review): reported as 3 (one unit per table) during preparation;
        // later phases switch to counting individual records — confirm
        // progress consumers expect this change of unit.
        totalOperations: 3,
        completedOperations: 0,
        currentTable: 'preparation'
      });
    }
    const batchData = prepareBatchData(selectionsData, datasetId, filterId, userId);
    // From here on, progress is measured in records across all three tables.
    const totalOperations = batchData.selections.length + batchData.labels.length + batchData.labelSubtypes.length;
    console.log(`Batch data prepared: ${batchData.selections.length} selections, ${batchData.labels.length} labels, ${batchData.labelSubtypes.length} label subtypes`);
    // Phase 2: Insert selections
    failurePhase = 'selections';
    if (onProgress) {
      onProgress({
        phase: 'selections',
        totalOperations,
        completedOperations: 0,
        currentTable: 'selections'
      });
    }
    console.log(`Starting batch insert of ${batchData.selections.length} selections`);
    const selectionsResult = await batchInsertSelections(
      db,
      batchData.selections,
      (chunkProgress) => {
        console.log(`Selections progress: ${chunkProgress.processedItems}/${chunkProgress.totalItems} (${chunkProgress.completedChunks}/${chunkProgress.totalChunks} chunks)`);
        if (onProgress) {
          onProgress({
            phase: 'selections',
            totalOperations,
            completedOperations: chunkProgress.processedItems,
            currentTable: 'selections',
            chunkProgress
          });
        }
      }
    );
    if (!selectionsResult.success) {
      // Abort before labels: children would reference missing selections.
      console.error('Selections batch insert failed:', selectionsResult.errors.length, 'errors');
      errors.push(...selectionsResult.errors.map(e => e.error));
      return {
        success: false,
        // Some chunks may have committed before the failure.
        selectionsCreated: selectionsResult.processedCount,
        labelsCreated: 0,
        labelSubtypesCreated: 0,
        errors,
        partialSuccess: selectionsResult.processedCount > 0,
        failurePhase,
        recoveryInstructions: 'Some selections were created but labels and label subtypes were not. You may need to clean up partial data.'
      };
    }
    selectionsCreated = selectionsResult.processedCount;
    console.log(`Successfully created ${selectionsCreated} selections`);
    // Phase 3: Insert labels
    failurePhase = 'labels';
    if (onProgress) {
      onProgress({
        phase: 'labels',
        totalOperations,
        completedOperations: selectionsCreated,
        currentTable: 'labels'
      });
    }
    console.log(`Starting batch insert of ${batchData.labels.length} labels`);
    const labelsResult = await batchInsertLabels(
      db,
      batchData.labels,
      (chunkProgress) => {
        console.log(`Labels progress: ${chunkProgress.processedItems}/${chunkProgress.totalItems} (${chunkProgress.completedChunks}/${chunkProgress.totalChunks} chunks)`);
        if (onProgress) {
          onProgress({
            phase: 'labels',
            totalOperations,
            // Cumulative across phases: selections already done + labels so far.
            completedOperations: selectionsCreated + chunkProgress.processedItems,
            currentTable: 'labels',
            chunkProgress
          });
        }
      }
    );
    if (!labelsResult.success) {
      console.error('Labels batch insert failed:', labelsResult.errors.length, 'errors');
      errors.push(...labelsResult.errors.map(e => e.error));
      // Selections committed successfully, so this is always a partial success.
      partialSuccess = true;
      return {
        success: false,
        selectionsCreated,
        labelsCreated: labelsResult.processedCount,
        labelSubtypesCreated: 0,
        errors,
        partialSuccess,
        failurePhase,
        recoveryInstructions: 'Selections were created but some labels failed. You may need to clean up partial data or retry the label creation.'
      };
    }
    labelsCreated = labelsResult.processedCount;
    console.log(`Successfully created ${labelsCreated} labels`);
    // Phase 4: Insert label subtypes (if any)
    if (batchData.labelSubtypes.length > 0) {
      failurePhase = 'labelSubtypes';
      if (onProgress) {
        onProgress({
          phase: 'labelSubtypes',
          totalOperations,
          completedOperations: selectionsCreated + labelsCreated,
          currentTable: 'labelSubtypes'
        });
      }
      console.log(`Starting batch insert of ${batchData.labelSubtypes.length} label subtypes`);
      const labelSubtypesResult = await batchInsertLabelSubtypes(
        db,
        batchData.labelSubtypes,
        (chunkProgress) => {
          console.log(`Label subtypes progress: ${chunkProgress.processedItems}/${chunkProgress.totalItems} (${chunkProgress.completedChunks}/${chunkProgress.totalChunks} chunks)`);
          if (onProgress) {
            onProgress({
              phase: 'labelSubtypes',
              totalOperations,
              completedOperations: selectionsCreated + labelsCreated + chunkProgress.processedItems,
              currentTable: 'labelSubtypes',
              chunkProgress
            });
          }
        }
      );
      if (!labelSubtypesResult.success) {
        console.error('Label subtypes batch insert failed:', labelSubtypesResult.errors.length, 'errors');
        errors.push(...labelSubtypesResult.errors.map(e => e.error));
        partialSuccess = true;
        return {
          success: false,
          selectionsCreated,
          labelsCreated,
          labelSubtypesCreated: labelSubtypesResult.processedCount,
          errors,
          partialSuccess,
          failurePhase,
          recoveryInstructions: 'Selections and labels were created but some label subtypes failed. The import is mostly successful but call type associations may be incomplete.'
        };
      }
      labelSubtypesCreated = labelSubtypesResult.processedCount;
      console.log(`Successfully created ${labelSubtypesCreated} label subtypes`);
    }
    // Phase 5: Complete
    // Clear the phase marker so an exception after this point is not blamed
    // on labelSubtypes.
    failurePhase = undefined;
    if (onProgress) {
      onProgress({
        phase: 'complete',
        totalOperations,
        completedOperations: totalOperations,
        currentTable: 'complete'
      });
    }
    console.log(`Selection import completed successfully: ${selectionsCreated} selections, ${labelsCreated} labels, ${labelSubtypesCreated} label subtypes`);
    return {
      success: true,
      selectionsCreated,
      labelsCreated,
      labelSubtypesCreated,
      errors
    };
  } catch (error) {
    // Unexpected failure (thrown rather than reported via a result object).
    console.error('Selection import batch processing error:', error);
    const errorInstance = error instanceof Error ? error : new Error(String(error));
    errors.push(errorInstance);
    // Anything already counted as created means data exists in the database.
    partialSuccess = selectionsCreated > 0 || labelsCreated > 0 || labelSubtypesCreated > 0;
    const result: {
      success: boolean;
      selectionsCreated: number;
      labelsCreated: number;
      labelSubtypesCreated: number;
      errors: Error[];
      partialSuccess?: boolean;
      failurePhase?: string;
      recoveryInstructions?: string;
    } = {
      success: false,
      selectionsCreated,
      labelsCreated,
      labelSubtypesCreated,
      errors,
      partialSuccess,
      recoveryInstructions: partialSuccess
        ? 'Partial data was created before failure. Consider cleaning up or retrying from the failed phase.'
        : 'No data was created. Safe to retry the entire import.'
    };
    // Only attach failurePhase when a phase was actually in flight.
    if (failurePhase) {
      result.failurePhase = failurePhase;
    }
    return result;
  }
}