#!/usr/bin/env bb
;; xm-ducklake.bb - Neighborhood-aware unified history with DuckLake
;; Creates xm.duckdb with semantic deconfliction across all sources
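;; Intended to be encrypted with the xm key and to use DuckLake time travel.
;; NOTE: as written, this script enables neither: the encryption PRAGMA in the
;; schema is commented out and no DuckLake catalog is attached.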
(require '[babashka.process :refer [shell]]
'[clojure.string :as str]
'[cheshire.core :as json]
'[babashka.fs :as fs])
;;; ============================================================
;;; Configuration
;;; ============================================================
;; Location of the unified database: the XM_DB_PATH environment variable wins,
;; otherwise xm.duckdb inside the user's home directory.
(def XM_DB_PATH
  (if-let [from-env (System/getenv "XM_DB_PATH")]
    from-env
    (str (System/getProperty "user.home") "/xm.duckdb")))

;; The key that binds all meanings.
;; NOTE(review): not referenced anywhere else in the visible part of this file.
(def XM_ENCRYPTION_KEY "xm")

;; Balanced ternary seed: [+1, -1, -1, +1, +1, +1, +1]
;; (least-significant trit first: 1 - 3 - 9 + 27 + 81 + 243 + 729 = 1069).
(def SEED 1069)
;; Registry of history files to ingest, keyed by source name.
;; Each entry carries:
;;   :path     - <home>/<dot-directory>/history.jsonl
;;   :schema   - :claude or :standard (selects the ingestion routine)
;;   :priority - integer rank stored alongside every ingested row
(def HISTORY_SOURCES
  (let [home-dir (System/getProperty "user.home")
        entry    (fn [dot-dir schema priority]
                   {:path     (str home-dir "/" dot-dir "/history.jsonl")
                    :schema   schema
                    :priority priority})]
    {:claude        (entry ".claude" :claude 1)
     :duck          (entry ".duck" :standard 2)
     :codex         (entry ".codex" :standard 3)
     :postduck      (entry ".postduck" :standard 4)
     :postpostduck  (entry ".postpostduck" :standard 5)
     :preprepreduck (entry ".preprepreduck" :standard 6)
     :postcodex     (entry ".postcodex" :standard 7)}))
;;; ============================================================
;;; DuckDB Helpers
;;; ============================================================
(defn duck-exec
  "Run `sql` through the `duckdb` command-line client and return its output.

  Trailing keyword options:
    :db    - database file to open (defaults to XM_DB_PATH)
    :json? - when true, pass `-json` to the CLI and parse stdout as JSON with
             keyword keys; if parsing throws, the raw stdout string is returned

  Returns stdout (string or parsed JSON) when the CLI exits with status 0.
  On any other exit status, prints the CLI's stderr and returns nil."
  [sql & {:keys [db json?] :or {db XM_DB_PATH json? false}}]
  (let [args   (if json?
                 ["duckdb" db "-json" "-c" sql]
                 ["duckdb" db "-c" sql])
        ;; :continue stops `shell` from throwing on a non-zero exit so the
        ;; failure can be reported below instead.
        result (apply shell {:out :string :err :string :continue true} args)]
    (if (= 0 (:exit result))
      (if json?
        (try (json/parse-string (:out result) true)
             (catch Exception _ (:out result)))
        (:out result))
      (do
        (println "DuckDB Error:" (:err result))
        nil))))
;;; ============================================================
;;; Schema Definition - Neighborhood-Aware Deconfliction
;;; ============================================================
;; DDL for the whole database, sent to DuckDB as one multi-statement script.
;; It declares five tables (xm_interactions, xm_neighborhoods, xm_edges,
;; xm_deconfliction_log, xm_sources), seven indexes on xm_interactions and
;; four sequences. Every statement uses IF NOT EXISTS, so the script can be
;; applied repeatedly.
;;
;; Notes for maintainers:
;; - The encryption PRAGMA is commented out inside the script, so nothing here
;;   turns encryption on.
;; - The SQL comment on content_hash says SHA3-256 of normalized content, but
;;   the ingestion functions in this file fill it with md5() of the raw text.
;; - xm_edges has no uniqueness constraint over (from_xm_id, to_xm_id,
;;   edge_type); edge_id is its only key.
;; - xm_id_seq starts at 1069, the same value as SEED.
(def XM_SCHEMA "
-- XM DuckLake: Neighborhood-aware unified history
-- Seed 1069: [+1, -1, -1, +1, +1, +1, +1]
-- Enable encryption (if supported)
-- PRAGMA encryption_key = 'xm';
-- Core interactions table with content-addressing
CREATE TABLE IF NOT EXISTS xm_interactions (
-- Identity
xm_id UBIGINT PRIMARY KEY,
content_hash VARCHAR(64) NOT NULL, -- SHA3-256 of normalized content
-- Source tracking
source VARCHAR NOT NULL,
source_priority INTEGER DEFAULT 0,
original_id VARCHAR,
-- Temporal
ts_unix BIGINT,
ts_iso TIMESTAMP,
session_id VARCHAR,
-- Content
content TEXT,
content_normalized TEXT, -- Lowercased, whitespace-normalized
content_length INTEGER,
-- Semantic fingerprint (for neighborhood detection)
trigram_hash VARCHAR(16), -- First 16 chars of trigram set hash
word_count INTEGER,
char_entropy DOUBLE,
-- Project/context
project VARCHAR,
-- Categorical structure (from unified_interactions)
primary_meaning VARCHAR,
pattern_tags VARCHAR,
monad_layer VARCHAR,
morphism_type VARCHAR,
tool_usage VARCHAR,
code_context VARCHAR,
-- Neighborhood links (populated by deconfliction)
canonical_id UBIGINT, -- Points to canonical version if duplicate
neighborhood_id INTEGER, -- Cluster ID for similar content
-- Balanced ternary trit for GF(3) operations
trit INTEGER DEFAULT 0 CHECK (trit IN (-1, 0, 1)),
-- Metadata
ingested_at TIMESTAMP DEFAULT current_timestamp,
UNIQUE(content_hash, source)
);
-- Neighborhood clusters for semantic similarity
CREATE TABLE IF NOT EXISTS xm_neighborhoods (
neighborhood_id INTEGER PRIMARY KEY,
centroid_hash VARCHAR(64), -- Content hash of centroid
member_count INTEGER DEFAULT 0,
avg_entropy DOUBLE,
dominant_source VARCHAR,
created_at TIMESTAMP DEFAULT current_timestamp,
-- Representative content snippet
representative_content VARCHAR(500)
);
-- Explicit neighbor edges (for graph traversal)
CREATE TABLE IF NOT EXISTS xm_edges (
edge_id UBIGINT PRIMARY KEY,
from_xm_id UBIGINT REFERENCES xm_interactions(xm_id),
to_xm_id UBIGINT REFERENCES xm_interactions(xm_id),
-- Edge type
edge_type VARCHAR NOT NULL, -- 'temporal', 'semantic', 'session', 'duplicate'
-- Similarity metrics
similarity_score DOUBLE,
temporal_distance_ms BIGINT,
-- GF(3) edge weight
trit_weight INTEGER DEFAULT 0 CHECK (trit_weight IN (-1, 0, 1)),
created_at TIMESTAMP DEFAULT current_timestamp
);
-- Deconfliction log
CREATE TABLE IF NOT EXISTS xm_deconfliction_log (
log_id UBIGINT PRIMARY KEY,
ts TIMESTAMP DEFAULT current_timestamp,
operation VARCHAR,
source_a VARCHAR,
source_b VARCHAR,
content_hash VARCHAR(64),
resolution VARCHAR, -- 'merged', 'kept_a', 'kept_b', 'both_kept'
details JSON
);
-- Source metadata
CREATE TABLE IF NOT EXISTS xm_sources (
source VARCHAR PRIMARY KEY,
priority INTEGER,
schema_type VARCHAR,
last_ingested TIMESTAMP,
record_count INTEGER,
unique_count INTEGER
);
-- Create indexes for neighborhood queries
CREATE INDEX IF NOT EXISTS idx_xm_content_hash ON xm_interactions(content_hash);
CREATE INDEX IF NOT EXISTS idx_xm_trigram ON xm_interactions(trigram_hash);
CREATE INDEX IF NOT EXISTS idx_xm_neighborhood ON xm_interactions(neighborhood_id);
CREATE INDEX IF NOT EXISTS idx_xm_canonical ON xm_interactions(canonical_id);
CREATE INDEX IF NOT EXISTS idx_xm_source ON xm_interactions(source);
CREATE INDEX IF NOT EXISTS idx_xm_session ON xm_interactions(session_id);
CREATE INDEX IF NOT EXISTS idx_xm_ts ON xm_interactions(ts_unix);
-- Sequences
CREATE SEQUENCE IF NOT EXISTS xm_id_seq START 1069;
CREATE SEQUENCE IF NOT EXISTS edge_id_seq START 1;
CREATE SEQUENCE IF NOT EXISTS log_id_seq START 1;
CREATE SEQUENCE IF NOT EXISTS neighborhood_id_seq START 1;
")
(defn init-schema
  "Apply XM_SCHEMA to the database at XM_DB_PATH.

  Prints a progress line, runs the DDL through `duck-exec`, then reports the
  outcome. `duck-exec` returns nil when the DuckDB CLI fails, so success is
  only announced when it returned something. Returns nil."
  []
  (println "Initializing xm.duckdb schema...")
  (if (some? (duck-exec XM_SCHEMA))
    (println "Schema initialized at:" XM_DB_PATH)
    (println "Schema initialization FAILED for:" XM_DB_PATH)))
;;; ============================================================
;;; Content Normalization & Hashing
;;; ============================================================
(defn normalize-content
  "Normalize `text` for deduplication.

  Lower-cases the text, collapses every run of whitespace into a single
  space and trims both ends. Returns nil when `text` is nil."
  [text]
  (when text
    (-> text
        str/lower-case
        (str/replace #"\s+" " ")
        str/trim)))
(defn compute-trigrams
  "Return the set of character trigrams found in `text`.

  A sliding window of width 3 and step 1 is moved over the characters of
  `text`; each window becomes a 3-character string. Returns nil when `text`
  is nil or shorter than three characters."
  [text]
  (when (and text (>= (count text) 3))
    (->> (partition 3 1 text)
         (map #(apply str %))
         set)))
(defn char-entropy
  "Shannon entropy of the character distribution of `text`, in nats
  (natural logarithm).

  Returns a double, or nil when `text` is nil or empty. Text made of a single
  repeated character yields 0.0."
  [text]
  (when (and text (pos? (count text)))
    (let [total (double (count text))]
      (->> (frequencies text)
           vals
           (map (fn [n]
                  (let [p (/ n total)]
                    ;; p * ln(1/p) equals -p * ln(p) but keeps every term
                    ;; non-negative, so a one-symbol text gives 0.0, not -0.0.
                    (* p (Math/log (/ 1.0 p))))))
           (reduce +)))))
;;; ============================================================
;;; Source Ingestion
;;; ============================================================
(defn ingest-claude-source
  "Load the :claude history file into xm_interactions.

  Reads the file configured under :claude in HISTORY_SOURCES with DuckDB's
  read_ndjson and inserts one row per record whose `display` field is not
  NULL. The content hash is md5 of the raw `display` text; rows that collide
  on (content_hash, source) are skipped. Does nothing (returns nil) when the
  file is missing; otherwise returns whatever `duck-exec` returns.

  NOTE(review): ts_iso is filled via try_cast(timestamp AS TIMESTAMP) and
  ts_unix is left NULL for this source - confirm that matches the actual
  format of the `timestamp` field."
  []
  (let [{:keys [path priority]} (:claude HISTORY_SOURCES)]
    (when (and path (fs/exists? path))
      (println "Ingesting claude from" path "...")
      (let [;; The path lands inside a single-quoted SQL literal, so embedded
            ;; quotes must be doubled.
            sql-path (str/replace (str path) "'" "''")
            ;; Take the priority from the configuration (currently 1) rather
            ;; than repeating the number here.
            sql-priority (long (or priority 1))]
        (duck-exec (str "
INSERT INTO xm_interactions (
xm_id, content_hash, source, source_priority,
ts_iso, content, content_normalized, content_length,
word_count, project
)
SELECT
nextval('xm_id_seq'),
md5(COALESCE(display, ''))::VARCHAR,
'claude',
" sql-priority ",
CASE
WHEN timestamp IS NOT NULL THEN
try_cast(timestamp AS TIMESTAMP)
ELSE NULL
END,
display,
lower(regexp_replace(COALESCE(display, ''), '\\s+', ' ', 'g')),
length(display),
length(display) - length(replace(display, ' ', '')) + 1,
project
FROM read_ndjson('" sql-path "', ignore_errors=true, maximum_object_size=10485760)
WHERE display IS NOT NULL
ON CONFLICT (content_hash, source) DO NOTHING
"))))))
(defn ingest-standard-source
  "Load one standard-schema history file (fields session_id, text, ts) into
  xm_interactions.

  `source-key` is a key of HISTORY_SOURCES; its :path and :priority are used.
  One row is inserted per record whose `text` is not NULL, hashed with md5 of
  the raw text; rows colliding on (content_hash, source) are skipped. Returns
  nil without touching the database when the key is unknown or the file does
  not exist; otherwise returns whatever `duck-exec` returns."
  [source-key]
  (let [{:keys [path priority]} (get HISTORY_SOURCES source-key)]
    ;; `path` is nil for an unknown key; check it before asking the filesystem.
    (when (and path (fs/exists? path))
      (println "Ingesting" (name source-key) "from" path "...")
      (let [;; Both values land inside single-quoted SQL literals.
            sql-path     (str/replace (str path) "'" "''")
            sql-source   (str/replace (name source-key) "'" "''")
            ;; 0 mirrors the column default of source_priority.
            sql-priority (long (or priority 0))]
        (duck-exec (str "
INSERT INTO xm_interactions (
xm_id, content_hash, source, source_priority,
ts_unix, session_id, content, content_normalized, content_length,
word_count
)
SELECT
nextval('xm_id_seq'),
md5(COALESCE(text, ''))::VARCHAR,
'" sql-source "',
" sql-priority ",
ts,
session_id,
text,
lower(regexp_replace(COALESCE(text, ''), '\\s+', ' ', 'g')),
length(text),
length(text) - length(replace(text, ' ', '')) + 1
FROM read_ndjson('" sql-path "', ignore_errors=true)
WHERE text IS NOT NULL
ON CONFLICT (content_hash, source) DO NOTHING
"))))))
(defn ingest-all-sources
  "Ingest every configured history source, then refresh xm_sources.

  The claude source has its own schema and goes first. Every HISTORY_SOURCES
  entry whose :schema is :standard follows in ascending :priority order. The
  per-source summary table xm_sources is then rebuilt from xm_interactions.
  Returns nil."
  []
  (println "\nIngesting all sources into xm.duckdb...")
  (println (str/join (repeat 60 "=")))
  ;; Claude (special schema)
  (ingest-claude-source)
  ;; Standard sources, derived from the configuration so that adding an entry
  ;; to HISTORY_SOURCES is enough to have it ingested.
  (doseq [source-key (->> HISTORY_SOURCES
                          (filter (fn [[_ cfg]] (= :standard (:schema cfg))))
                          (sort-by (fn [[_ cfg]] (:priority cfg)))
                          (map key))]
    (ingest-standard-source source-key))
  ;; Update source metadata
  (duck-exec "
INSERT OR REPLACE INTO xm_sources (source, priority, schema_type, last_ingested, record_count, unique_count)
SELECT
source,
MIN(source_priority),
CASE WHEN source = 'claude' THEN 'claude' ELSE 'standard' END,
current_timestamp,
COUNT(*),
COUNT(DISTINCT content_hash)
FROM xm_interactions
GROUP BY source
")
  (println (str/join (repeat 60 "=")))
  (println "Ingestion complete."))
;;; ============================================================
;;; Neighborhood Detection & Deconfliction
;;; ============================================================
(defn detect-duplicates
  "Link content that occurs in more than one source to a canonical row.

  For every content_hash seen in at least two distinct sources, the row with
  the smallest xm_id is treated as canonical and every other row with that
  hash gets canonical_id set to it. Rows that already carry the right value
  are left alone, so repeated runs do not rewrite them. Returns whatever
  `duck-exec` returns."
  []
  (println "\nDetecting duplicates across sources...")
  (duck-exec "
-- Find content that appears in multiple sources
WITH duplicates AS (
SELECT
content_hash,
MIN(xm_id) as canonical_id
FROM xm_interactions
GROUP BY content_hash
HAVING COUNT(DISTINCT source) > 1
)
UPDATE xm_interactions xi
SET canonical_id = d.canonical_id
FROM duplicates d
WHERE xi.content_hash = d.content_hash
AND xi.xm_id != d.canonical_id
AND xi.canonical_id IS DISTINCT FROM d.canonical_id
"))
(defn create-duplicate-edges
  "Insert a 'duplicate' edge (similarity 1.0) for every pair of rows that
  share a content_hash, directed from the lower xm_id to the higher one.

  Pairs that already have a 'duplicate' edge are skipped, so the function can
  be run repeatedly. Returns whatever `duck-exec` returns."
  []
  (println "Creating duplicate edges...")
  ;; edge_id always comes fresh from the sequence and is the only key on
  ;; xm_edges, so ON CONFLICT could never detect a repeated pair; the
  ;; NOT EXISTS filter does that job explicitly.
  (duck-exec "
INSERT INTO xm_edges (edge_id, from_xm_id, to_xm_id, edge_type, similarity_score)
SELECT
nextval('edge_id_seq'),
a.xm_id,
b.xm_id,
'duplicate',
1.0
FROM xm_interactions a
JOIN xm_interactions b ON a.content_hash = b.content_hash AND a.xm_id < b.xm_id
WHERE NOT EXISTS (
SELECT 1 FROM xm_edges e
WHERE e.from_xm_id = a.xm_id
AND e.to_xm_id = b.xm_id
AND e.edge_type = 'duplicate'
)
"))
(defn create-session-edges
  "Insert a 'session' edge for every pair of rows that share both session_id
  and source, directed from the lower xm_id to the higher one.

  temporal_distance_ms is the absolute difference of the two ts_unix values,
  or NULL when either timestamp is missing. Pairs that already have a
  'session' edge are skipped. Returns whatever `duck-exec` returns.

  NOTE(review): this links all pairs within a session, so the edge count
  grows quadratically with session size."
  []
  (println "Creating session edges...")
  ;; - A missing timestamp must not be treated as 0: that would store the
  ;;   other row's raw timestamp as a 'distance'. Plain subtraction gives NULL.
  ;; - edge_id is always a fresh sequence value, so ON CONFLICT could never
  ;;   catch a repeated pair; NOT EXISTS does.
  (duck-exec "
INSERT INTO xm_edges (edge_id, from_xm_id, to_xm_id, edge_type, temporal_distance_ms)
SELECT
nextval('edge_id_seq'),
a.xm_id,
b.xm_id,
'session',
ABS(a.ts_unix - b.ts_unix)
FROM xm_interactions a
JOIN xm_interactions b ON a.session_id = b.session_id
AND a.source = b.source
AND a.xm_id < b.xm_id
WHERE a.session_id IS NOT NULL
AND NOT EXISTS (
SELECT 1 FROM xm_edges e
WHERE e.from_xm_id = a.xm_id
AND e.to_xm_id = b.xm_id
AND e.edge_type = 'session'
)
"))
(defn create-temporal_edges
  "Insert a 'temporal' edge from each timestamped row to the row that follows
  it in ts_unix order, when the gap between them is below 3600000.

  Rows without ts_unix are ignored. Ties on ts_unix are ordered by xm_id so
  the chosen predecessor is the same on every run. Pairs that already have a
  'temporal' edge are skipped. Returns whatever `duck-exec` returns.

  NOTE(review): the 3600000 threshold is one hour only if ts_unix is in
  milliseconds - confirm the unit written by the ingested sources."
  []
  (println "Creating temporal edges...")
  ;; edge_id is always a fresh sequence value, so ON CONFLICT could never
  ;; catch a repeated pair; NOT EXISTS does.
  (duck-exec "
WITH ordered AS (
SELECT
xm_id,
ts_unix,
LAG(xm_id) OVER (ORDER BY ts_unix, xm_id) as prev_id,
LAG(ts_unix) OVER (ORDER BY ts_unix, xm_id) as prev_ts
FROM xm_interactions
WHERE ts_unix IS NOT NULL
)
INSERT INTO xm_edges (edge_id, from_xm_id, to_xm_id, edge_type, temporal_distance_ms)
SELECT
nextval('edge_id_seq'),
o.prev_id,
o.xm_id,
'temporal',
o.ts_unix - o.prev_ts
FROM ordered o
WHERE o.prev_id IS NOT NULL
AND o.ts_unix - o.prev_ts < 3600000 -- Within 1 hour
AND NOT EXISTS (
SELECT 1 FROM xm_edges e
WHERE e.from_xm_id = o.prev_id
AND e.to_xm_id = o.xm_id
AND e.edge_type = 'temporal'
)
"))
(defn assign-neighborhoods
  "Group rows into neighborhoods and record the groups in xm_neighborhoods.

  Steps:
    1. Rows without a trigram_hash get the first 16 characters of their
       content_hash as a stand-in (no real trigram similarity is computed).
    2. Each trigram_hash shared by more than one row, and not yet present in
       xm_neighborhoods, gets a new neighborhood row.
    3. member_count of existing neighborhoods is brought up to date.
    4. Every row is pointed at the neighborhood whose centroid_hash equals
       its trigram_hash; rows already pointing there are left alone.

  Safe to run repeatedly. Returns whatever the last `duck-exec` returns."
  []
  (println "Assigning neighborhoods...")
  ;; For now, use content_hash prefix as simple neighborhood
  (duck-exec "
UPDATE xm_interactions
SET trigram_hash = LEFT(content_hash, 16)
WHERE trigram_hash IS NULL
")
  ;; Create neighborhood clusters. neighborhood_id always comes fresh from the
  ;; sequence, so ON CONFLICT could never stop a second row for the same
  ;; trigram_hash; hashes that already have a neighborhood are filtered out.
  (duck-exec "
INSERT INTO xm_neighborhoods (neighborhood_id, centroid_hash, member_count, dominant_source, representative_content)
SELECT
nextval('neighborhood_id_seq'),
trigram_hash,
COUNT(*),
MODE() WITHIN GROUP (ORDER BY source),
LEFT(MAX(content), 500)
FROM xm_interactions
WHERE trigram_hash IS NOT NULL
AND trigram_hash NOT IN (
SELECT centroid_hash FROM xm_neighborhoods WHERE centroid_hash IS NOT NULL
)
GROUP BY trigram_hash
HAVING COUNT(*) > 1
")
  ;; Existing neighborhoods are no longer re-inserted, so refresh their sizes.
  (duck-exec "
UPDATE xm_neighborhoods
SET member_count = c.member_count
FROM (
SELECT trigram_hash, COUNT(*) AS member_count
FROM xm_interactions
WHERE trigram_hash IS NOT NULL
GROUP BY trigram_hash
) c
WHERE xm_neighborhoods.centroid_hash = c.trigram_hash
AND xm_neighborhoods.member_count IS DISTINCT FROM c.member_count
")
  ;; Assign neighborhood IDs
  (duck-exec "
UPDATE xm_interactions xi
SET neighborhood_id = n.neighborhood_id
FROM xm_neighborhoods n
WHERE xi.trigram_hash = n.centroid_hash
AND xi.neighborhood_id IS DISTINCT FROM n.neighborhood_id
"))
(defn assign-trits
  "Set the balanced-ternary `trit` of every row from its source_priority:
  priority <= 2 gives +1, priority 3..5 gives 0, anything higher gives -1.

  Returns whatever `duck-exec` returns."
  []
  (println "Assigning GF(3) trits...")
  (duck-exec "
UPDATE xm_interactions
SET trit = CASE
WHEN source_priority <= 2 THEN 1 -- PLUS: claude, duck
WHEN source_priority <= 5 THEN 0 -- ERGODIC: codex, postduck, postpostduck
ELSE -1 -- MINUS: preprepreduck, postcodex
END
"))
(defn run-deconfliction
  "Run the full deconfliction pipeline and log a summary row.

  Order: link duplicates, assign neighborhoods, assign trits, then create
  duplicate, session and temporal edges. A 'full_deconfliction' entry with
  row, hash, duplicate, neighborhood and edge counts is appended to
  xm_deconfliction_log. Returns nil."
  []
  (println "\n" (str/join (repeat 60 "=")) "\n Deconfliction Pipeline\n" (str/join (repeat 60 "=")))
  ;; All updates to xm_interactions run BEFORE any edges are created for newly
  ;; ingested rows. xm_edges references xm_interactions by foreign key, and
  ;; canonical_id / trigram_hash / neighborhood_id are indexed columns.
  ;; NOTE(review): DuckDB documents over-eager foreign-key checking when an
  ;; indexed column of a referenced row is updated; keeping the updates first
  ;; avoids relying on that case. The steps do not read each other's output,
  ;; so the final state is the same.
  (detect-duplicates)
  (assign-neighborhoods)
  (assign-trits)
  (create-duplicate-edges)
  (create-session-edges)
  (create-temporal_edges)
  ;; Log deconfliction summary
  (duck-exec "
INSERT INTO xm_deconfliction_log (log_id, operation, details)
SELECT
nextval('log_id_seq'),
'full_deconfliction',
json_object(
'total_interactions', (SELECT COUNT(*) FROM xm_interactions),
'unique_hashes', (SELECT COUNT(DISTINCT content_hash) FROM xm_interactions),
'duplicates_found', (SELECT COUNT(*) FROM xm_interactions WHERE canonical_id IS NOT NULL),
'neighborhoods', (SELECT COUNT(*) FROM xm_neighborhoods),
'edges', (SELECT COUNT(*) FROM xm_edges)
)
")
  (println "\nDeconfliction complete."))
;;; ============================================================
;;; Statistics & Reporting
;;; ============================================================
(defn show-stats
  "Print a statistics report for the database.

  Sections, each produced by one query whose CLI output is printed as-is:
  per-source record counts, number of rows linked to a canonical duplicate,
  neighborhood count and sizes, edge counts by type, and the distribution of
  trit values. Returns nil."
  []
  (println "\n" (str/join (repeat 60 "=")) "\n xm.duckdb Statistics\n" (str/join (repeat 60 "=")))
  ;; Source breakdown
  (println "\nSource Breakdown:")
  (println (duck-exec "
SELECT
source,
record_count as records,
unique_count as unique_hashes,
record_count - unique_count as internal_dups
FROM xm_sources
ORDER BY priority
"))
  ;; Duplicate analysis
  (println "Cross-Source Duplicates:")
  (println (duck-exec "
SELECT COUNT(*) as duplicate_interactions
FROM xm_interactions
WHERE canonical_id IS NOT NULL
"))
  ;; Neighborhood stats
  (println "Neighborhood Clusters:")
  (println (duck-exec "
SELECT
COUNT(*) as neighborhoods,
AVG(member_count) as avg_size,
MAX(member_count) as max_size
FROM xm_neighborhoods
"))
  ;; Edge stats
  (println "Edge Types:")
  (println (duck-exec "
SELECT edge_type, COUNT(*) as count
FROM xm_edges
GROUP BY edge_type
ORDER BY count DESC
"))
  ;; Trit distribution (GF(3))
  (println "GF(3) Trit Distribution:")
  (println (duck-exec "
SELECT
trit,
CASE trit
WHEN 1 THEN 'PLUS (+)'
WHEN 0 THEN 'ERGODIC (0)'
WHEN -1 THEN 'MINUS (-)'
END as label,
COUNT(*) as count
FROM xm_interactions
GROUP BY trit
ORDER BY trit DESC
"))
  (println (str/join (repeat 60 "="))))
(defn show-neighborhoods
  "Print the `limit` largest neighborhoods, biggest first.

  `limit` is coerced to a non-negative integer before it is placed in the
  query; anything that is not a number falls back to 10. Returns nil."
  [limit]
  (println "\n" (str/join (repeat 60 "=")) "\n Largest Neighborhoods\n" (str/join (repeat 60 "=")))
  (let [;; Only a plain integer may reach the SQL text.
        row-limit (if (number? limit) (max 0 (long limit)) 10)]
    (println (duck-exec (str "
SELECT
neighborhood_id,
member_count,
dominant_source,
LEFT(representative_content, 80) as preview
FROM xm_neighborhoods
ORDER BY member_count DESC
LIMIT " row-limit)))))
(defn search-xm
  "Print up to 20 rows whose normalized content contains `query`
  (case-insensitive ILIKE match).

  Single quotes in `query` are escaped before it is embedded in the SQL
  literal. The LIKE wildcards % and _ keep their pattern meaning. Returns nil."
  [query]
  (println "\n" (str/join (repeat 60 "-")) "\n Search: " query "\n" (str/join (repeat 60 "-")))
  (let [;; The query comes straight from the command line; doubling single
        ;; quotes keeps it inside the string literal.
        sql-query (str/replace (str query) "'" "''")]
    (println (duck-exec (str "
SELECT
source,
LEFT(content, 100) as preview,
neighborhood_id,
trit
FROM xm_interactions
WHERE content_normalized ILIKE '%" sql-query "%'
LIMIT 20
")))))
(defn query-neighbors
  "Print up to 20 edges touching the interaction `xm-id`, with the edge type,
  the id of the row at the other end, the edge metrics and a content preview.

  `xm-id` may be a number or a string holding an integer (as it arrives from
  the command line). Anything else prints an error and runs no query.
  Returns nil."
  [xm-id]
  (let [;; Only a parsed integer may reach the SQL text; a missing or
        ;; malformed argument would otherwise yield broken or injected SQL.
        id (cond
             (number? xm-id) (long xm-id)
             (string? xm-id) (parse-long (str/trim xm-id))
             :else           nil)]
    (if (nil? id)
      (println "Invalid xm_id:" (pr-str xm-id) "- expected an integer")
      (do
        (println (str "\nNeighbors of xm_id=" id ":"))
        (println (duck-exec (str "
SELECT
e.edge_type,
CASE WHEN e.from_xm_id = " id " THEN e.to_xm_id ELSE e.from_xm_id END as neighbor_id,
e.similarity_score,
e.temporal_distance_ms,
LEFT(xi.content, 60) as preview
FROM xm_edges e
JOIN xm_interactions xi ON xi.xm_id = CASE WHEN e.from_xm_id = " id " THEN e.to_xm_id ELSE e.from_xm_id END
WHERE e.from_xm_id = " id " OR e.to_xm_id = " id "
LIMIT 20
")))))))
;;; ============================================================
;;; Main
;;; ============================================================
;; Prints the command-line usage text: a leading blank line, the sixteen help
;; lines below, and a trailing blank line.
(defn print-help []
  (let [help-lines ["xm-ducklake.bb - Neighborhood-aware unified history"
                    "Usage: bb xm-ducklake.bb [command]"
                    "Commands:"
                    "init Initialize xm.duckdb schema"
                    "ingest Ingest all history sources"
                    "deconflict Run deconfliction pipeline"
                    "full Init + ingest + deconflict (full rebuild)"
                    "stats Show statistics"
                    "neighborhoods [n] Show top n neighborhoods (default 10)"
                    "search <query> Search across content"
                    "neighbors <id> Find neighbors of interaction"
                    "query <sql> Execute custom SQL"
                    "Environment:"
                    "XM_DB_PATH Database path (default: ~/xm.duckdb)"
                    "The xm key binds: claude + duck + codex + postduck + postpostduck + preprepreduck + postcodex"
                    "Seed 1069: [+1, -1, -1, +1, +1, +1, +1]"]]
    ;; The empty strings at both ends produce the leading and trailing
    ;; newline of the help text.
    (println (str/join "\n" (concat [""] help-lines [""])))))
;; Command-line entry point: the first argument picks a command from the
;; dispatch table, the remaining arguments are handed to that command.
;; An unknown or missing command prints the help text.
(let [[cmd & more] *command-line-args*
      ;; Remaining arguments glued back into one space-separated string.
      joined-rest (fn [] (str/join " " more))
      commands    {"init"          init-schema
                   "ingest"        ingest-all-sources
                   "deconflict"    run-deconfliction
                   "full"          (fn []
                                     (init-schema)
                                     (ingest-all-sources)
                                     (run-deconfliction)
                                     (show-stats))
                   "stats"         show-stats
                   "neighborhoods" (fn []
                                     (show-neighborhoods
                                      (or (some-> (first more) parse-long) 10)))
                   "search"        (fn [] (search-xm (joined-rest)))
                   "neighbors"     (fn [] (query-neighbors (first more)))
                   "query"         (fn [] (println (duck-exec (joined-rest))))}
      handler     (get commands cmd print-help)]
  (handler))