PMKFKETRBAZWKGX2ZBWZAUBXVKD2V22AEHFKA2HCSSTGI75DPEAAC DBCRYUN5Q3DIFGBVVCOWRNBMMWCGILT4TY2OY22IYVH6U7TQIYTAC IFWHYAZTPKL7ZVOV3AFZNIUTRLMVSOACYACA3CI4DSEEZU77YQEQC UV2RTMNSNI3AJFVVD66OT3CEF7WU7TLAPJKACGX7BBSSWAEBAAAAC DH3RVE6JRQBOSRUBVIIMKSJYFS4CK5PI25KV2NAJ7EP4Q7NNKSJQC SIHQ3OG5GIBQLSMOBKSD2IHFTGFXU4VDCX45BH2ABS42BV3N2UIQC LECAQCCL2UKTMH3LQSGEFVBOF6CYCGJGJZM2I2T4R7QPTBVW4VYQC }#[pg_test]fn test_score_vecs() {println!("Starting test");let mut rng = thread_rng();let poisson = Poisson::new(1024.0).expect("Couldn't Poissonify");let n_vecs = poisson.sample(&mut rng) as usize;let dim = 512u16;let mut shmem_handler = SharedMemHandler {};println!("INITIALISING TABLES");init_index_table();clear_allocations();println!("Building vectors");let vectors = VectorsBuilder::new().init("vectors_2", dim);insert_random_vecs(&mut rng, &vectors, n_vecs);println!("DONE Building vectors");let mut mem_cache_vectors = MemCacheVectorsBuilder::new().init(&vectors);mem_cache_vectors.push_all_possible(&mut shmem_handler);let vec = std::iter::repeat(1.0).take(dim as usize).collect::<Vec<_>>();Spi::connect(|client| {let vecs_and_scores_db = vectors.score_vecs_ip(&client, &vec);let vecs_and_scores_cache = mem_cache_vectors.score_vecs_ip(&client, &vec);let score: f32 = vecs_and_scores_cache.iter().map(|(idx, score)| {let db_score = vecs_and_scores_db.get(idx).expect(&format!("Missing index {} from db", idx));(*score - db_score).abs()}).sum();assert!(score <= 1.0e-7);Ok(Some(score))}).expect("Couln't unwrap score");
.map(|heap_tuple| {let vec_table_name = heap_tuple.by_ordinal(1).expect("Unable to read vector table name").value::<String>().expect("Unable to cast vector table name to String");let index_table_name = heap_tuple.by_ordinal(2).expect("Unabled to read index table name").value::<String>().expect("Unable to cast index table name to String");let dim = heap_tuple.by_ordinal(3).expect("Unable to read dimension").value::<i32>().expect("Unable to cast dimension to int");
.select_help().iter_three::<String, String, i32>().map(|(vec_table_opt, index_table_opt, dim_opt)| {let vec_table_name =vec_table_opt.expect("Unable to cast vector table name to String");let index_table_name =index_table_opt.expect("Unable to cast index table name to String");let dim = dim_opt.expect("Unable to cast dimension to int");
}pub(crate) fn score_vecs_ip(&self, client: &SpiClient, vec: &[f32]) -> HashMap<i64, f32> {client.select(&format!("SELECTvec.id, ip(vec.vec, $1) ipFROM {vector_table} vec",vector_table = &self.name),None,Some(vec![(PgBuiltInOids::FLOAT4ARRAYOID.oid(),vec.into_datum(),)]),).select_help().iter_two::<i64, f32>().filter_map(|(idx_opt, score_opt)| {let idx = idx_opt?;let score = score_opt?;Some((idx, score))}).collect()
.ok_or({println!("Doing some Stuff");SpiError::Transaction})?; // Probably unwrap is better
{let vecs_with_ids = self.get_vecs_block(&client, count).collect::<Vec<_>>();let inserts = vecs_with_ids.iter().map(|vec_with_id| {self.push_vector(allocator_index as usize, &mut client, vec_with_id)}).collect::<Result<Vec<()>, String>>().map_err(|_| SpiError::Transaction)?.len(); // todo if one of these fails, there should be a recovery step.println!("Insert n_vecs {}, in allocator {} with dim {}",vecs_with_ids.len(),allocator_index,self.vector_table.dim);
let vecs_with_ids = self.get_vecs_block(&client, count).collect::<Vec<_>>();let inserts = vecs_with_ids.iter().map(|vec_with_id| self.push_vector(allocator_index as usize, &mut client, vec_with_id)).collect::<Result<Vec<()>, String>>().map_err(|_| SpiError::Transaction)?.len(); // todo if one of these fails, there should be a recovery step.println!("Insert n_vecs {}, in allocator {} with dim {}",vecs_with_ids.len(),allocator_index,self.vector_table.dim);Ok(inserts.wrap_with_client(client))
Ok(inserts.wrap_with_client(client))} else {Ok(0.wrap_with_client(client))}
COUNT(vector_id) n_vecsFROM {}GROUP BY allocatorORDER BY n_vecs)SELECTallocator,n_vecsFROM allocators_by_vecWHERE n_vecs < $1",
n_vecsFROM allocators_by_vecWHERE n_vecs < $1",
}pub(crate) fn score_vecs_ip(&self, client: &SpiClient, vec: &[f32]) -> HashMap<i64, f32> {client.select(&format!("SELECTmit.vector_id id,get_block_vector_ip(mit.allocator, mit.allocator_index, $1) ipFROM {memcache_index_table} mitUNIONSELECTvec.id, ip(vec.vec, $1) ipFROM {vector_table} vecLEFT JOIN {memcache_index_table} mit2ON mit2.vector_id = vec.idWHERE mit2.id IS NULL",memcache_index_table = &self.name,vector_table = &self.vector_table.name),None,Some(vec![(PgBuiltInOids::FLOAT4ARRAYOID.oid(),vec.into_datum(),)]),).select_help().iter_two::<i64, f32>().filter_map(|(idx_opt, score_opt)| {let idx = idx_opt?;let score = score_opt?;Some((idx, score))}).collect::<HashMap<_, _>>()
/// Computes the inner product between a vector stored in one of the blocks of
/// `SHMEM_BLOCKS` and `comp_array`.
///
/// * `block_index` - index of the block to read from `SHMEM_BLOCKS`.
/// * `vector_index` - index of the vector's slice inside that block.
/// * `comp_array` - the vector to compare against; elements that are `None`
///   are treated as `0.0`.
///
/// Returns `None` when `SHMEM_BLOCKS.get_block` yields no block for
/// `block_index`. Components are paired with `zip`, so when the two vectors
/// differ in length the surplus components of the longer one are ignored.
///
/// # Panics
///
/// Panics when `block_index` is negative or not smaller than
/// `SHMEM_BLOCKS.get_size()`, when `vector_index` is negative, and when
/// `get_slice` cannot provide a slice for `vector_index`.
fn get_block_vector_ip(block_index: i32, vector_index: i32, comp_array: Array<f32>) -> Option<f32> {
    // A negative index would slip past the upper-bound check below and wrap
    // to a huge `usize` in the `as usize` cast, so reject it explicitly.
    if block_index < 0 {
        panic!(
            "Tried to access negative block index {} when calculating inner product",
            block_index
        )
    }
    if block_index >= SHMEM_BLOCKS.get_size() as i32 {
        panic!(
            "Tried to access block index {} bigger than block size {} when calculating inner product",
            block_index,
            SHMEM_BLOCKS.get_size()
        )
    }
    SHMEM_BLOCKS.get_block(block_index as usize).map(|block| {
        // Same wrap-around hazard as above: never hand a wrapped negative
        // index to `get_slice`. Checked only once a block was found, so a
        // missing block still yields `None` as before.
        if vector_index < 0 {
            panic!("vector_index out of bounds")
        }
        let shared_ref = block.share();
        shared_ref
            .get_slice(vector_index as usize)
            .expect("vector_index out of bounds")
            .into_iter()
            .zip(comp_array.into_iter())
            .map(|(x, y_opt)| x * y_opt.unwrap_or(0.0))
            .sum::<f32>()
    })
}
#[pg_extern]