validation.go
package db
import (
"context"
"database/sql"
"fmt"
)
// Querier is the common interface for *sql.DB and *LoggedTx query operations.
// Uses Context variants exclusively so all DB-facing interfaces compose as
// compatible subsets of *sql.DB / *sql.Tx.
type Querier interface {
QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)
QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row
}
// GetDatasetType returns the type of a dataset
// Returns: (type, exists, error)
func GetDatasetType(q Querier, datasetID string) (string, bool, error) {
var datasetType string
err := q.QueryRowContext(context.Background(), "SELECT type FROM dataset WHERE id = ?", datasetID).Scan(&datasetType)
if err == sql.ErrNoRows {
return "", false, nil
}
if err != nil {
return "", false, err
}
return datasetType, true, nil
}
// ValidateDatasetTypeForImport checks that a dataset is 'structured' type for file imports
// Returns error if dataset doesn't exist or is not 'structured'
func ValidateDatasetTypeForImport(q Querier, datasetID string) error {
datasetType, exists, err := GetDatasetType(q, datasetID)
if err != nil {
return fmt.Errorf("failed to query dataset type: %w", err)
}
if !exists {
return fmt.Errorf("dataset not found: %s", datasetID)
}
if datasetType != "structured" {
return fmt.Errorf("dataset '%s' is type '%s' - file imports only support 'structured' datasets", datasetID, datasetType)
}
return nil
}
// ValidateDatasetTypeUnstructured checks that a dataset is 'unstructured' type
// Returns error if dataset doesn't exist or is not 'unstructured'
func ValidateDatasetTypeUnstructured(q Querier, datasetID string) error {
datasetType, exists, err := GetDatasetType(q, datasetID)
if err != nil {
return fmt.Errorf("failed to query dataset type: %w", err)
}
if !exists {
return fmt.Errorf("dataset not found: %s", datasetID)
}
if datasetType != "unstructured" {
return fmt.Errorf("dataset '%s' is type '%s' - this command only supports 'unstructured' datasets", datasetID, datasetType)
}
return nil
}
// ValidateLocationBelongsToDataset checks that a location belongs to a specific dataset
// Returns error if location doesn't exist or belongs to a different dataset
func ValidateLocationBelongsToDataset(q Querier, locationID, datasetID string) error {
var locationDatasetID string
err := q.QueryRowContext(context.Background(), "SELECT dataset_id FROM location WHERE id = ? AND active = true", locationID).Scan(&locationDatasetID)
if err == sql.ErrNoRows {
return fmt.Errorf("location not found or inactive: %s", locationID)
}
if err != nil {
return fmt.Errorf("failed to query location: %w", err)
}
if locationDatasetID != datasetID {
return fmt.Errorf("location %s does not belong to dataset %s", locationID, datasetID)
}
return nil
}
// DatasetExistsAndActive checks that a dataset exists and is active.
// Returns the dataset name if found.
func DatasetExistsAndActive(q Querier, datasetID string) (name string, err error) {
var exists, active bool
err = q.QueryRowContext(context.Background(),
"SELECT EXISTS(SELECT 1 FROM dataset WHERE id = ?), COALESCE((SELECT active FROM dataset WHERE id = ?), false), COALESCE((SELECT name FROM dataset WHERE id = ?), '')",
datasetID, datasetID, datasetID,
).Scan(&exists, &active, &name)
if err != nil {
return "", fmt.Errorf("failed to verify dataset: %w", err)
}
if !exists {
return "", fmt.Errorf("dataset with ID '%s' does not exist", datasetID)
}
if !active {
return "", fmt.Errorf("dataset '%s' (ID: %s) is not active", name, datasetID)
}
return name, nil
}
// LocationBelongsToDataset checks that a location exists, is active, and belongs to the dataset.
// Returns the location name if found.
func LocationBelongsToDataset(q Querier, locationID, datasetID string) (name string, err error) {
var exists, active bool
var locDatasetID string
err = q.QueryRowContext(context.Background(),
"SELECT EXISTS(SELECT 1 FROM location WHERE id = ?), COALESCE((SELECT active FROM location WHERE id = ?), false), COALESCE((SELECT name FROM location WHERE id = ?), ''), COALESCE((SELECT dataset_id FROM location WHERE id = ?), '')",
locationID, locationID, locationID, locationID,
).Scan(&exists, &active, &name, &locDatasetID)
if err != nil {
return "", fmt.Errorf("failed to verify location: %w", err)
}
if !exists {
return "", fmt.Errorf("location with ID '%s' does not exist", locationID)
}
if !active {
return "", fmt.Errorf("location '%s' (ID: %s) is not active", name, locationID)
}
if locDatasetID != datasetID {
return "", fmt.Errorf("location '%s' (ID: %s) does not belong to dataset ID '%s'",
name, locationID, locDatasetID)
}
return name, nil
}
// ClusterExistsAndActive checks that a cluster exists and is active.
func ClusterExistsAndActive(q Querier, clusterID string) error {
var exists, active bool
err := q.QueryRowContext(context.Background(),
"SELECT EXISTS(SELECT 1 FROM cluster WHERE id = ?), COALESCE((SELECT active FROM cluster WHERE id = ?), false)",
clusterID, clusterID,
).Scan(&exists, &active)
if err != nil {
return fmt.Errorf("failed to verify cluster: %w", err)
}
if !exists {
return fmt.Errorf("cluster not found: %s", clusterID)
}
if !active {
return fmt.Errorf("cluster '%s' is not active (cannot update inactive clusters)", clusterID)
}
return nil
}
// PatternExistsAndActive checks that a cyclic recording pattern exists and is active.
func PatternExistsAndActive(q Querier, patternID string) error {
var exists, active bool
err := q.QueryRowContext(context.Background(),
"SELECT EXISTS(SELECT 1 FROM cyclic_recording_pattern WHERE id = ?), COALESCE((SELECT active FROM cyclic_recording_pattern WHERE id = ?), false)",
patternID, patternID,
).Scan(&exists, &active)
if err != nil {
return fmt.Errorf("failed to verify cyclic recording pattern: %w", err)
}
if !exists {
return fmt.Errorf("cyclic recording pattern with ID '%s' does not exist", patternID)
}
if !active {
return fmt.Errorf("cyclic recording pattern with ID '%s' is not active", patternID)
}
return nil
}
// LocationExistsAndActive checks that a location exists and is active.
func LocationExistsAndActive(q Querier, locationID string) error {
var exists, active bool
err := q.QueryRowContext(context.Background(),
"SELECT EXISTS(SELECT 1 FROM location WHERE id = ?), COALESCE((SELECT active FROM location WHERE id = ?), false)",
locationID, locationID,
).Scan(&exists, &active)
if err != nil {
return fmt.Errorf("failed to verify location: %w", err)
}
if !exists {
return fmt.Errorf("location not found: %s", locationID)
}
if !active {
return fmt.Errorf("location '%s' is not active (cannot update inactive locations)", locationID)
}
return nil
}
// ValidateDatasetTypeForExport checks that a dataset exists, is active, and is 'structured'.
// Returns the dataset name if valid.
func ValidateDatasetTypeForExport(q Querier, datasetID string) (string, error) {
name, err := DatasetExistsAndActive(q, datasetID)
if err != nil {
return "", err
}
datasetType, exists, err := GetDatasetType(q, datasetID)
if err != nil {
return "", fmt.Errorf("failed to query dataset type: %w", err)
}
if !exists {
return "", fmt.Errorf("dataset not found: %s", datasetID)
}
if datasetType != "structured" {
return "", fmt.Errorf("cannot export dataset of type '%s': only structured datasets are supported", datasetType)
}
return name, nil
}
// ClusterBelongsToLocation checks that a cluster exists, is active, and belongs to the location.
func ClusterBelongsToLocation(q Querier, clusterID, locationID string) error {
var exists, active bool
var clusterLocationID string
err := q.QueryRowContext(context.Background(),
"SELECT EXISTS(SELECT 1 FROM cluster WHERE id = ?), COALESCE((SELECT active FROM cluster WHERE id = ?), false), COALESCE((SELECT location_id FROM cluster WHERE id = ?), '')",
clusterID, clusterID, clusterID,
).Scan(&exists, &active, &clusterLocationID)
if err != nil {
return fmt.Errorf("failed to verify cluster: %w", err)
}
if !exists {
return fmt.Errorf("cluster with ID '%s' does not exist", clusterID)
}
if !active {
return fmt.Errorf("cluster '%s' is not active", clusterID)
}
if clusterLocationID != locationID {
return fmt.Errorf("cluster '%s' does not belong to location '%s'", clusterID, locationID)
}
return nil
}