#!/usr/bin/env bb
;; xm-ducklake.bb - Neighborhood-aware unified history with DuckLake
;; Creates xm.duckdb with semantic deconfliction across all sources
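;; NOTE: encryption with the xm key and DuckLake time travel are design goals; as far as
;; this file shows, neither is enabled yet (the encryption PRAGMA in XM_SCHEMA is commented out).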

(require '[babashka.process :refer [shell]]
         '[clojure.string :as str]
         '[cheshire.core :as json]
         '[babashka.fs :as fs])

;;; ============================================================
;;; Configuration
;;; ============================================================

;; Location of the DuckDB file: the XM_DB_PATH environment variable when set,
;; otherwise xm.duckdb in the user's home directory.
(def XM_DB_PATH
  (if-some [override (System/getenv "XM_DB_PATH")]
    override
    (str (System/getProperty "user.home") "/xm.duckdb")))

;; Intended database encryption key.
;; NOTE(review): nothing in this file reads this constant, and the matching
;; `PRAGMA encryption_key` line inside XM_SCHEMA is commented out, so the
;; database is not encrypted by this script as written.
(def XM_ENCRYPTION_KEY "xm")  ;; The key that binds all meanings

;; Seed value. The trit list in the trailing comment is the balanced-ternary
;; expansion of 1069 written least-significant trit first:
;;   1 - 3 - 9 + 27 + 81 + 243 + 729 = 1069
;; NOTE(review): this constant is not referenced elsewhere in this file; the
;; schema hard-codes the same number as the start of xm_id_seq.
(def SEED 1069)  ;; Balanced ternary seed: [+1, -1, -1, +1, +1, +1, +1]

;; Registry of history files to ingest, keyed by source name.
;; Each entry carries:
;;   :path     - ~/.<source>/history.jsonl
;;   :schema   - :claude for the claude layout, :standard for all others
;;   :priority - integer rank; lower numbers are listed first
(def HISTORY_SOURCES
  (let [home (System/getProperty "user.home")
        source-entry (fn [dir-name schema priority]
                       {:path (str home "/." dir-name "/history.jsonl")
                        :schema schema
                        :priority priority})]
    {:claude        (source-entry "claude" :claude 1)
     :duck          (source-entry "duck" :standard 2)
     :codex         (source-entry "codex" :standard 3)
     :postduck      (source-entry "postduck" :standard 4)
     :postpostduck  (source-entry "postpostduck" :standard 5)
     :preprepreduck (source-entry "preprepreduck" :standard 6)
     :postcodex     (source-entry "postcodex" :standard 7)}))

;;; ============================================================
;;; DuckDB Helpers
;;; ============================================================

(defn duck-exec
  "Run `sql` through the `duckdb` command-line client and return its output.

  Keyword options:
    :db    - database file to open (default: XM_DB_PATH)
    :json? - when true, pass `-json` to the CLI and parse stdout as JSON with
             keyword keys; if parsing throws, the raw stdout string is returned

  Returns stdout (a string, or parsed data when :json? is set) if the process
  exits with status 0. On any other exit status the CLI's stderr is reported
  on *err* and nil is returned."
  [sql & {:keys [db json?] :or {db XM_DB_PATH json? false}}]
  (let [argv (if json?
               ["duckdb" db "-json" "-c" sql]
               ["duckdb" db "-c" sql])
        ;; :continue true keeps `shell` from throwing on a non-zero exit so the
        ;; failure can be reported and turned into a nil return value instead.
        {:keys [exit out err]} (apply shell {:out :string :err :string :continue true} argv)]
    (cond
      (not= 0 exit)
      ;; Diagnostics go to stderr so they never mix with query output that
      ;; callers print to stdout.
      (binding [*out* *err*]
        (println "DuckDB Error:" err)
        nil)

      json?
      (try
        (json/parse-string out true)
        (catch Exception _ out))

      :else out)))

;;; ============================================================
;;; Schema Definition - Neighborhood-Aware Deconfliction
;;; ============================================================

;; DDL executed by init-schema. It creates (IF NOT EXISTS) five tables,
;; seven indexes on xm_interactions, and four sequences:
;;   xm_interactions      - one row per ingested history entry
;;   xm_neighborhoods     - clusters of entries sharing a trigram_hash
;;   xm_edges             - typed links between pairs of interactions
;;   xm_deconfliction_log - one summary row per deconfliction run
;;   xm_sources           - per-source ingestion counters
;;
;; Review notes, based on the rest of this file:
;; - The encryption PRAGMA below is commented out, so it has no effect.
;; - The inline comment on content_hash says SHA3-256 of normalized content,
;;   but both ingest functions fill it with md5() of the raw text.
;; - xm_edges has no uniqueness constraint other than its edge_id primary key.
;; - xm_id_seq starts at 1069, the same number as SEED (written here as a literal).
(def XM_SCHEMA "
-- XM DuckLake: Neighborhood-aware unified history
-- Seed 1069: [+1, -1, -1, +1, +1, +1, +1]

-- Enable encryption (if supported)
-- PRAGMA encryption_key = 'xm';

-- Core interactions table with content-addressing
CREATE TABLE IF NOT EXISTS xm_interactions (
  -- Identity
  xm_id UBIGINT PRIMARY KEY,
  content_hash VARCHAR(64) NOT NULL,  -- SHA3-256 of normalized content

  -- Source tracking
  source VARCHAR NOT NULL,
  source_priority INTEGER DEFAULT 0,
  original_id VARCHAR,

  -- Temporal
  ts_unix BIGINT,
  ts_iso TIMESTAMP,
  session_id VARCHAR,

  -- Content
  content TEXT,
  content_normalized TEXT,  -- Lowercased, whitespace-normalized
  content_length INTEGER,

  -- Semantic fingerprint (for neighborhood detection)
  trigram_hash VARCHAR(16),  -- First 16 chars of trigram set hash
  word_count INTEGER,
  char_entropy DOUBLE,

  -- Project/context
  project VARCHAR,

  -- Categorical structure (from unified_interactions)
  primary_meaning VARCHAR,
  pattern_tags VARCHAR,
  monad_layer VARCHAR,
  morphism_type VARCHAR,
  tool_usage VARCHAR,
  code_context VARCHAR,

  -- Neighborhood links (populated by deconfliction)
  canonical_id UBIGINT,  -- Points to canonical version if duplicate
  neighborhood_id INTEGER,  -- Cluster ID for similar content

  -- Balanced ternary trit for GF(3) operations
  trit INTEGER DEFAULT 0 CHECK (trit IN (-1, 0, 1)),

  -- Metadata
  ingested_at TIMESTAMP DEFAULT current_timestamp,

  UNIQUE(content_hash, source)
);

-- Neighborhood clusters for semantic similarity
CREATE TABLE IF NOT EXISTS xm_neighborhoods (
  neighborhood_id INTEGER PRIMARY KEY,
  centroid_hash VARCHAR(64),  -- Content hash of centroid
  member_count INTEGER DEFAULT 0,
  avg_entropy DOUBLE,
  dominant_source VARCHAR,
  created_at TIMESTAMP DEFAULT current_timestamp,

  -- Representative content snippet
  representative_content VARCHAR(500)
);

-- Explicit neighbor edges (for graph traversal)
CREATE TABLE IF NOT EXISTS xm_edges (
  edge_id UBIGINT PRIMARY KEY,
  from_xm_id UBIGINT REFERENCES xm_interactions(xm_id),
  to_xm_id UBIGINT REFERENCES xm_interactions(xm_id),

  -- Edge type
  edge_type VARCHAR NOT NULL,  -- 'temporal', 'semantic', 'session', 'duplicate'

  -- Similarity metrics
  similarity_score DOUBLE,
  temporal_distance_ms BIGINT,

  -- GF(3) edge weight
  trit_weight INTEGER DEFAULT 0 CHECK (trit_weight IN (-1, 0, 1)),

  created_at TIMESTAMP DEFAULT current_timestamp
);

-- Deconfliction log
CREATE TABLE IF NOT EXISTS xm_deconfliction_log (
  log_id UBIGINT PRIMARY KEY,
  ts TIMESTAMP DEFAULT current_timestamp,
  operation VARCHAR,
  source_a VARCHAR,
  source_b VARCHAR,
  content_hash VARCHAR(64),
  resolution VARCHAR,  -- 'merged', 'kept_a', 'kept_b', 'both_kept'
  details JSON
);

-- Source metadata
CREATE TABLE IF NOT EXISTS xm_sources (
  source VARCHAR PRIMARY KEY,
  priority INTEGER,
  schema_type VARCHAR,
  last_ingested TIMESTAMP,
  record_count INTEGER,
  unique_count INTEGER
);

-- Create indexes for neighborhood queries
CREATE INDEX IF NOT EXISTS idx_xm_content_hash ON xm_interactions(content_hash);
CREATE INDEX IF NOT EXISTS idx_xm_trigram ON xm_interactions(trigram_hash);
CREATE INDEX IF NOT EXISTS idx_xm_neighborhood ON xm_interactions(neighborhood_id);
CREATE INDEX IF NOT EXISTS idx_xm_canonical ON xm_interactions(canonical_id);
CREATE INDEX IF NOT EXISTS idx_xm_source ON xm_interactions(source);
CREATE INDEX IF NOT EXISTS idx_xm_session ON xm_interactions(session_id);
CREATE INDEX IF NOT EXISTS idx_xm_ts ON xm_interactions(ts_unix);

-- Sequences
CREATE SEQUENCE IF NOT EXISTS xm_id_seq START 1069;
CREATE SEQUENCE IF NOT EXISTS edge_id_seq START 1;
CREATE SEQUENCE IF NOT EXISTS log_id_seq START 1;
CREATE SEQUENCE IF NOT EXISTS neighborhood_id_seq START 1;
")

(defn init-schema
  "Create the xm.duckdb tables, indexes and sequences by running XM_SCHEMA.

  Prints a progress line, then either a success line with the database path
  or a failure line when duck-exec reported an error (returned nil).
  Returns nil."
  []
  (println "Initializing xm.duckdb schema...")
  ;; duck-exec yields a (possibly empty) string on success and nil on failure,
  ;; so only claim success when something non-nil came back.
  (if (some? (duck-exec XM_SCHEMA))
    (println "Schema initialized at:" XM_DB_PATH)
    (println "Schema initialization FAILED for:" XM_DB_PATH)))

;;; ============================================================
;;; Content Normalization & Hashing
;;; ============================================================

(defn normalize-content
  "Normalize `text` for deduplication: lower-case it, collapse every run of
  whitespace into a single space, and trim both ends.

  Returns the normalized string, or nil when `text` is nil."
  [text]
  (some-> text
          str/lower-case
          (str/replace #"\s+" " ")
          str/trim))

(defn compute-trigrams
  "Return the set of all 3-character substrings (sliding window, step 1) of
  `text`.

  Returns nil when `text` is nil or shorter than three characters."
  [text]
  (when (and text (>= (count text) 3))
    (into #{}
          (map #(apply str %))
          (partition 3 1 text))))

(defn char-entropy
  "Shannon entropy of the character distribution of `text`, computed with the
  natural logarithm (so the result is in nats).

  Returns a double, or nil when `text` is nil or empty."
  [text]
  (when (seq text)
    ;; Work in doubles throughout: dividing two longs would otherwise build
    ;; Ratio values that are then handed to Math/log.
    (let [total (double (count text))]
      (reduce (fn [acc n]
                (let [p (/ n total)]
                  (- acc (* p (Math/log p)))))
              ;; Starting from 0.0 also keeps a single-symbol text at 0.0
              ;; rather than -0.0.
              0.0
              (vals (frequencies text))))))

;;; ============================================================
;;; Source Ingestion
;;; ============================================================

(defn ingest-claude-source
  "Load the claude history file into xm_interactions.

  Does nothing (returns nil) when the configured file does not exist.
  Otherwise reads the `display`, `timestamp` and `project` fields of every
  JSON line, skips rows whose `display` is NULL, and inserts one row per
  entry with:
    - content_hash    = md5 of the raw `display` text
    - source_priority = the :priority configured for :claude
    - word_count      = number of spaces + 1 (an estimate)
  Rows whose (content_hash, source) pair already exists are left untouched.

  Returns whatever duck-exec returns (CLI output, or nil on error)."
  []
  (let [{:keys [path priority]} (:claude HISTORY_SOURCES)
        ;; The path is spliced into a SQL string literal, so double any single
        ;; quote to keep the statement well-formed.
        sql-path (str/replace path "'" "''")]
    (when (fs/exists? path)
      (println "Ingesting claude from" path "...")
      (duck-exec (str "
INSERT INTO xm_interactions (
  xm_id, content_hash, source, source_priority,
  ts_iso, content, content_normalized, content_length,
  word_count, project
)
SELECT
  nextval('xm_id_seq'),
  md5(COALESCE(display, ''))::VARCHAR,
  'claude',
  " priority ",
  CASE
    WHEN timestamp IS NOT NULL THEN
      try_cast(timestamp AS TIMESTAMP)
    ELSE NULL
  END,
  display,
  lower(regexp_replace(COALESCE(display, ''), '\\s+', ' ', 'g')),
  length(display),
  length(display) - length(replace(display, ' ', '')) + 1,
  project
FROM read_ndjson('" sql-path "', ignore_errors=true, maximum_object_size=10485760)
WHERE display IS NOT NULL
ON CONFLICT (content_hash, source) DO NOTHING
")))))

(defn ingest-standard-source
  "Load one standard-layout history file into xm_interactions.

  `source-key` is a key of HISTORY_SOURCES whose file has the fields
  `session_id`, `text` and `ts` on each JSON line.

  Does nothing (returns nil) when the key is not configured or its file does
  not exist. Otherwise inserts one row per line with a non-NULL `text`,
  using md5 of the raw text as content_hash and the configured priority as
  source_priority. Rows whose (content_hash, source) pair already exists are
  left untouched.

  Returns whatever duck-exec returns (CLI output, or nil on error)."
  [source-key]
  (let [{:keys [path priority]} (get HISTORY_SOURCES source-key)
        ;; Values spliced into SQL string literals get their single quotes doubled.
        sql-quote (fn [s] (str/replace (str s) "'" "''"))]
    ;; An unknown key yields a nil path; check before handing it to fs/exists?.
    (when (and path (fs/exists? path))
      (println "Ingesting" (name source-key) "from" path "...")
      (duck-exec (str "
INSERT INTO xm_interactions (
  xm_id, content_hash, source, source_priority,
  ts_unix, session_id, content, content_normalized, content_length,
  word_count
)
SELECT
  nextval('xm_id_seq'),
  md5(COALESCE(text, ''))::VARCHAR,
  '" (sql-quote (name source-key)) "',
  " priority ",
  ts,
  session_id,
  text,
  lower(regexp_replace(COALESCE(text, ''), '\\s+', ' ', 'g')),
  length(text),
  length(text) - length(replace(text, ' ', '')) + 1
FROM read_ndjson('" (sql-quote path) "', ignore_errors=true)
WHERE text IS NOT NULL
ON CONFLICT (content_hash, source) DO NOTHING
")))))

(defn ingest-all-sources
  "Ingest every configured history source, then refresh xm_sources.

  The claude file is loaded first through ingest-claude-source; every
  HISTORY_SOURCES entry whose :schema is :standard is then loaded through
  ingest-standard-source in ascending :priority order. Finally one row per
  source (priority, schema type, timestamp, record and distinct-hash counts)
  is written to xm_sources, replacing any earlier row. Returns nil."
  []
  (println "\nIngesting all sources into xm.duckdb...")
  (println (str/join (repeat 60 "=")))

  ;; Claude (special schema)
  (ingest-claude-source)

  ;; Standard sources, taken from the registry so that adding a source to
  ;; HISTORY_SOURCES is enough to have it ingested.
  (doseq [[source-key _] (->> HISTORY_SOURCES
                              (filter (fn [[_ cfg]] (= :standard (:schema cfg))))
                              (sort-by (fn [[_ cfg]] (:priority cfg))))]
    (ingest-standard-source source-key))

  ;; Update source metadata
  (duck-exec "
INSERT OR REPLACE INTO xm_sources (source, priority, schema_type, last_ingested, record_count, unique_count)
SELECT
  source,
  MIN(source_priority),
  CASE WHEN source = 'claude' THEN 'claude' ELSE 'standard' END,
  current_timestamp,
  COUNT(*),
  COUNT(DISTINCT content_hash)
FROM xm_interactions
GROUP BY source
")

  (println (str/join (repeat 60 "=")))
  (println "Ingestion complete."))

;;; ============================================================
;;; Neighborhood Detection & Deconfliction
;;; ============================================================

(defn detect-duplicates
  "Link rows whose content_hash occurs in more than one source.

  For each such hash the row with the smallest xm_id is treated as canonical;
  every other row with that hash gets its canonical_id set to it. The
  canonical row itself keeps canonical_id NULL.

  Returns whatever duck-exec returns (CLI output, or nil on error)."
  []
  (println "\nDetecting duplicates across sources...")
  ;; Only the hash and the chosen canonical id are needed by the UPDATE, so
  ;; the CTE computes nothing else.
  (duck-exec "
-- Find content that appears in multiple sources
WITH duplicates AS (
  SELECT
    content_hash,
    MIN(xm_id) as canonical_id
  FROM xm_interactions
  GROUP BY content_hash
  HAVING COUNT(DISTINCT source) > 1
)
UPDATE xm_interactions xi
SET canonical_id = d.canonical_id
FROM duplicates d
WHERE xi.content_hash = d.content_hash
  AND xi.xm_id != d.canonical_id
"))

(defn create-duplicate-edges
  "Insert a 'duplicate' edge (similarity 1.0) for every pair of interactions
  that share a content_hash, directed from the lower xm_id to the higher one.

  Safe to run repeatedly: a pair that already has a 'duplicate' edge is
  skipped. Returns whatever duck-exec returns (CLI output, or nil on error)."
  []
  (println "Creating duplicate edges...")
  ;; edge_id comes from a sequence, so ON CONFLICT on the primary key can never
  ;; detect a repeated pair; the NOT EXISTS filter is what prevents re-inserting
  ;; the same edge on a second run.
  (duck-exec "
INSERT INTO xm_edges (edge_id, from_xm_id, to_xm_id, edge_type, similarity_score)
SELECT
  nextval('edge_id_seq'),
  a.xm_id,
  b.xm_id,
  'duplicate',
  1.0
FROM xm_interactions a
JOIN xm_interactions b ON a.content_hash = b.content_hash AND a.xm_id < b.xm_id
WHERE NOT EXISTS (
  SELECT 1
  FROM xm_edges e
  WHERE e.from_xm_id = a.xm_id
    AND e.to_xm_id = b.xm_id
    AND e.edge_type = 'duplicate'
)
ON CONFLICT DO NOTHING
"))

(defn create-session-edges
  "Insert a 'session' edge for every pair of interactions that share both
  session_id and source, directed from the lower xm_id to the higher one.

  temporal_distance_ms is the absolute difference of the two ts_unix values,
  or NULL when either timestamp is missing. Safe to run repeatedly: a pair
  that already has a 'session' edge is skipped.

  Returns whatever duck-exec returns (CLI output, or nil on error)."
  []
  (println "Creating session edges...")
  ;; - A missing timestamp must not be treated as 0: that would record the
  ;;   other row's full epoch value as the 'distance'. Plain subtraction
  ;;   yields NULL instead.
  ;; - edge_id is always fresh, so ON CONFLICT alone cannot stop repeated
  ;;   pairs; the NOT EXISTS filter does.
  ;; NOTE(review): every pair within a session is linked, so the edge count
  ;; grows quadratically with session size.
  (duck-exec "
INSERT INTO xm_edges (edge_id, from_xm_id, to_xm_id, edge_type, temporal_distance_ms)
SELECT
  nextval('edge_id_seq'),
  a.xm_id,
  b.xm_id,
  'session',
  ABS(a.ts_unix - b.ts_unix)
FROM xm_interactions a
JOIN xm_interactions b ON a.session_id = b.session_id
  AND a.source = b.source
  AND a.xm_id < b.xm_id
WHERE a.session_id IS NOT NULL
  AND NOT EXISTS (
    SELECT 1
    FROM xm_edges e
    WHERE e.from_xm_id = a.xm_id
      AND e.to_xm_id = b.xm_id
      AND e.edge_type = 'session'
  )
ON CONFLICT DO NOTHING
"))

(defn create-temporal_edges
  "Insert a 'temporal' edge from each interaction to the next one in ts_unix
  order (across all sources), provided the two are less than 3600000 ts_unix
  units apart.

  Rows without ts_unix are ignored. Safe to run repeatedly: a pair that
  already has a 'temporal' edge is skipped.

  Returns whatever duck-exec returns (CLI output, or nil on error)."
  []
  (println "Creating temporal edges...")
  ;; - xm_id is used as a tiebreaker so rows with equal timestamps are paired
  ;;   the same way on every run.
  ;; - edge_id is always fresh, so ON CONFLICT alone cannot stop repeated
  ;;   pairs; the NOT EXISTS filter does.
  ;; NOTE(review): the 3600000 threshold equals one hour only if ts_unix is in
  ;; milliseconds - confirm against the source files.
  (duck-exec "
WITH ordered AS (
  SELECT
    xm_id,
    ts_unix,
    LAG(xm_id) OVER (ORDER BY ts_unix, xm_id) as prev_id,
    LAG(ts_unix) OVER (ORDER BY ts_unix, xm_id) as prev_ts
  FROM xm_interactions
  WHERE ts_unix IS NOT NULL
)
INSERT INTO xm_edges (edge_id, from_xm_id, to_xm_id, edge_type, temporal_distance_ms)
SELECT
  nextval('edge_id_seq'),
  o.prev_id,
  o.xm_id,
  'temporal',
  o.ts_unix - o.prev_ts
FROM ordered o
WHERE o.prev_id IS NOT NULL
  AND o.ts_unix - o.prev_ts < 3600000  -- Within 1 hour
  AND NOT EXISTS (
    SELECT 1
    FROM xm_edges e
    WHERE e.from_xm_id = o.prev_id
      AND e.to_xm_id = o.xm_id
      AND e.edge_type = 'temporal'
  )
ON CONFLICT DO NOTHING
"))

(defn assign-neighborhoods
  "Group interactions into neighborhoods.

  Placeholder clustering: trigram_hash is filled with the first 16 characters
  of content_hash wherever it is still NULL, so a neighborhood currently
  contains rows with the same hash prefix rather than fuzzy-similar text.

  Steps:
    1. fill missing trigram_hash values
    2. create one xm_neighborhoods row for each trigram_hash shared by more
       than one interaction that has no neighborhood yet
    3. refresh member_count on all neighborhoods
    4. write the matching neighborhood_id back onto xm_interactions

  Safe to run repeatedly: existing neighborhoods keep their ids.
  Returns whatever the final duck-exec call returns."
  []
  (println "Assigning neighborhoods...")
  ;; For now, use content_hash prefix as simple neighborhood
  (duck-exec "
UPDATE xm_interactions
SET trigram_hash = LEFT(content_hash, 16)
WHERE trigram_hash IS NULL
")

  ;; Create neighborhood clusters. neighborhood_id is always fresh, so
  ;; ON CONFLICT cannot detect an already-known trigram_hash; without the
  ;; NOT IN filter a second run would add a second row per cluster and make
  ;; the assignment below ambiguous.
  (duck-exec "
INSERT INTO xm_neighborhoods (neighborhood_id, centroid_hash, member_count, dominant_source, representative_content)
SELECT
  nextval('neighborhood_id_seq'),
  trigram_hash,
  COUNT(*),
  MODE() WITHIN GROUP (ORDER BY source),
  LEFT(MAX(content), 500)
FROM xm_interactions
WHERE trigram_hash IS NOT NULL
  AND trigram_hash NOT IN (
    SELECT centroid_hash FROM xm_neighborhoods WHERE centroid_hash IS NOT NULL
  )
GROUP BY trigram_hash
HAVING COUNT(*) > 1
ON CONFLICT DO NOTHING
")

  ;; Existing neighborhoods are skipped by the insert above, so bring their
  ;; member counts up to date separately.
  (duck-exec "
UPDATE xm_neighborhoods
SET member_count = (
  SELECT COUNT(*)
  FROM xm_interactions xi
  WHERE xi.trigram_hash = xm_neighborhoods.centroid_hash
)
")

  ;; Assign neighborhood IDs
  (duck-exec "
UPDATE xm_interactions xi
SET neighborhood_id = n.neighborhood_id
FROM xm_neighborhoods n
WHERE xi.trigram_hash = n.centroid_hash
"))

(defn assign-trits
  "Set the trit column of every interaction from its source_priority:
    priority <= 2 ->  1
    priority <= 5 ->  0
    otherwise     -> -1

  Returns whatever duck-exec returns (CLI output, or nil on error)."
  []
  (println "Assigning GF(3) trits...")
  (duck-exec "
UPDATE xm_interactions
SET trit = CASE
  WHEN source_priority <= 2 THEN 1   -- PLUS: claude, duck
  WHEN source_priority <= 5 THEN 0   -- ERGODIC: codex, postduck, postpostduck
  ELSE -1                            -- MINUS: preprepreduck, postcodex
END
"))

(defn run-deconfliction
  "Run the whole deconfliction pipeline in order: duplicate detection,
  duplicate / session / temporal edge creation, neighborhood assignment and
  trit assignment. Afterwards append one 'full_deconfliction' row to
  xm_deconfliction_log whose JSON details hold the current totals
  (interactions, distinct hashes, duplicates, neighborhoods, edges).

  Returns nil."
  []
  (println "\n" (str/join (repeat 60 "=")) "\n  Deconfliction Pipeline\n" (str/join (repeat 60 "=")))
  ;; Order matters: edges and neighborhoods are built from the rows as they
  ;; stand after duplicate detection.
  (detect-duplicates)
  (create-duplicate-edges)
  (create-session-edges)
  (create-temporal_edges)
  (assign-neighborhoods)
  (assign-trits)

  ;; Log deconfliction summary
  (duck-exec "
INSERT INTO xm_deconfliction_log (log_id, operation, details)
SELECT
  nextval('log_id_seq'),
  'full_deconfliction',
  json_object(
    'total_interactions', (SELECT COUNT(*) FROM xm_interactions),
    'unique_hashes', (SELECT COUNT(DISTINCT content_hash) FROM xm_interactions),
    'duplicates_found', (SELECT COUNT(*) FROM xm_interactions WHERE canonical_id IS NOT NULL),
    'neighborhoods', (SELECT COUNT(*) FROM xm_neighborhoods),
    'edges', (SELECT COUNT(*) FROM xm_edges)
  )
")
  (println "\nDeconfliction complete."))

;;; ============================================================
;;; Statistics & Reporting
;;; ============================================================

(defn show-stats
  "Print a statistics report for xm.duckdb: per-source counts, number of
  cross-source duplicates, neighborhood sizes, edge counts by type and the
  trit distribution.

  Each section prints its heading followed by the raw duckdb CLI output for
  that query (or nil when the query failed). Returns nil."
  []
  (println "\n" (str/join (repeat 60 "=")) "\n  xm.duckdb Statistics\n" (str/join (repeat 60 "=")))

  ;; Report sections as [heading sql] pairs, printed in this order.
  (doseq [[heading sql]
          [;; Source breakdown
           ["\nSource Breakdown:" "
SELECT
  source,
  record_count as records,
  unique_count as unique_hashes,
  record_count - unique_count as internal_dups
FROM xm_sources
ORDER BY priority
"]
           ;; Duplicate analysis
           ["Cross-Source Duplicates:" "
SELECT COUNT(*) as duplicate_interactions
FROM xm_interactions
WHERE canonical_id IS NOT NULL
"]
           ;; Neighborhood stats
           ["Neighborhood Clusters:" "
SELECT
  COUNT(*) as neighborhoods,
  AVG(member_count) as avg_size,
  MAX(member_count) as max_size
FROM xm_neighborhoods
"]
           ;; Edge stats
           ["Edge Types:" "
SELECT edge_type, COUNT(*) as count
FROM xm_edges
GROUP BY edge_type
ORDER BY count DESC
"]
           ;; Trit distribution (GF(3))
           ["GF(3) Trit Distribution:" "
SELECT
  trit,
  CASE trit
    WHEN 1 THEN 'PLUS (+)'
    WHEN 0 THEN 'ERGODIC (0)'
    WHEN -1 THEN 'MINUS (-)'
  END as label,
  COUNT(*) as count
FROM xm_interactions
GROUP BY trit
ORDER BY trit DESC
"]]]
    (println heading)
    (println (duck-exec sql)))

  (println (str/join (repeat 60 "="))))

(defn show-neighborhoods
  "Print the `limit` largest neighborhoods (id, member count, dominant source
  and an 80-character preview), ordered by member count descending.

  `limit` should be a non-negative integer; any other value falls back to 10
  so that nothing but a number is ever spliced into the SQL. Returns nil."
  [limit]
  (let [row-limit (if (and (integer? limit) (not (neg? limit))) limit 10)]
    (println "\n" (str/join (repeat 60 "=")) "\n  Largest Neighborhoods\n" (str/join (repeat 60 "=")))
    (println (duck-exec (str "
SELECT
  neighborhood_id,
  member_count,
  dominant_source,
  LEFT(representative_content, 80) as preview
FROM xm_neighborhoods
ORDER BY member_count DESC
LIMIT " row-limit)))))

(defn search-xm
  "Print up to 20 interactions whose content_normalized contains `query`
  (case-insensitive ILIKE match), showing source, a 100-character preview,
  neighborhood_id and trit.

  `query` is free text from the command line. Single quotes are doubled
  before it is placed in the SQL string literal, so it cannot terminate the
  literal; `%` and `_` keep their LIKE wildcard meaning. Returns nil."
  [query]
  (let [sql-text (str/replace (str query) "'" "''")]
    (println "\n" (str/join (repeat 60 "-")) "\n  Search: " query "\n" (str/join (repeat 60 "-")))
    (println (duck-exec (str "
SELECT
  source,
  LEFT(content, 100) as preview,
  neighborhood_id,
  trit
FROM xm_interactions
WHERE content_normalized ILIKE '%" sql-text "%'
LIMIT 20
")))))

(defn query-neighbors
  "Print up to 20 edges touching the interaction `xm-id`, with the edge type,
  the id of the interaction at the other end, similarity score, temporal
  distance and a 60-character preview of the neighbor's content.

  `xm-id` may be an integer or a string holding one (as it arrives from the
  command line). Anything else is rejected with a message instead of being
  spliced into the SQL. Returns nil."
  [xm-id]
  (let [id (cond
             (integer? xm-id) xm-id
             ;; parse-long yields nil for text that is not a plain integer
             (string? xm-id) (parse-long (str/trim xm-id))
             :else nil)]
    (if (nil? id)
      (println "neighbors: expected a numeric xm_id, got:" (pr-str xm-id))
      (do
        (println (str "\nNeighbors of xm_id=" id ":"))
        (println (duck-exec (str "
SELECT
  e.edge_type,
  CASE WHEN e.from_xm_id = " id " THEN e.to_xm_id ELSE e.from_xm_id END as neighbor_id,
  e.similarity_score,
  e.temporal_distance_ms,
  LEFT(xi.content, 60) as preview
FROM xm_edges e
JOIN xm_interactions xi ON xi.xm_id = CASE WHEN e.from_xm_id = " id " THEN e.to_xm_id ELSE e.from_xm_id END
WHERE e.from_xm_id = " id " OR e.to_xm_id = " id "
LIMIT 20
")))))))

;;; ============================================================
;;; Main
;;; ============================================================

(defn print-help
  "Print the command-line usage text to stdout. Returns nil."
  []
  (let [usage-text "
xm-ducklake.bb - Neighborhood-aware unified history

Usage: bb xm-ducklake.bb [command]

Commands:
  init              Initialize xm.duckdb schema
  ingest            Ingest all history sources
  deconflict        Run deconfliction pipeline
  full              Init + ingest + deconflict (full rebuild)
  stats             Show statistics
  neighborhoods [n] Show top n neighborhoods (default 10)
  search <query>    Search across content
  neighbors <id>    Find neighbors of interaction
  query <sql>       Execute custom SQL

Environment:
  XM_DB_PATH        Database path (default: ~/xm.duckdb)

The xm key binds: claude + duck + codex + postduck + postpostduck + preprepreduck + postcodex
Seed 1069: [+1, -1, -1, +1, +1, +1, +1]
"]
    (println usage-text)))

;; Command dispatch. The first command-line argument selects the command;
;; an unknown or missing command prints the usage text. Commands that need
;; an argument (search, neighbors, query) also print the usage text when it
;; is missing, instead of running a match-all search or sending empty or
;; malformed SQL to duckdb.
(let [[cmd & more] *command-line-args*
      first-arg (first more)
      rest-text (str/join " " more)]
  (case cmd
    "init" (init-schema)
    "ingest" (ingest-all-sources)
    "deconflict" (run-deconfliction)
    "full" (do
             (init-schema)
             (ingest-all-sources)
             (run-deconfliction)
             (show-stats))
    "stats" (show-stats)
    ;; a non-numeric count falls back to the default of 10
    "neighborhoods" (show-neighborhoods (or (some-> first-arg parse-long) 10))
    "search" (if (str/blank? rest-text)
               (print-help)
               (search-xm rest-text))
    "neighbors" (if (str/blank? first-arg)
                  (print-help)
                  (query-neighbors first-arg))
    "query" (if (str/blank? rest-text)
              (print-help)
              (println (duck-exec rest-text)))
    (print-help)))