UV2RTMNSNI3AJFVVD66OT3CEF7WU7TLAPJKACGX7BBSSWAEBAAAAC
// NOTE(review): truncated fragment — the body of `write_vec` is cut off below.
// Const parameter `Size` should follow the `SIZE` convention used elsewhere in
// this file; confirm and unify.
impl<const Size: usize> F32Block<Size> {
    // NOTE(review): `&Vec<f32>` should be `&[f32]` (see `write_slice` below).
    fn write_vec(&mut self, vec: &Vec<f32>, start: usize) -> Result<(), F32BlockError> {
        let end = start + vec.len();
        // NOTE(review): `end >= Size` looks off by one — `end` is an exclusive
        // bound, so `end == Size` should be allowed. The `end < start` clause
        // guards wrap-around of the addition above.
        if end < start || (end) >= Size {
            Err(F32BlockError::overflow(start, vec.len(), Size))
/// A borrowed `f32` slice paired with a target dimension. Iterating pads
/// short data with zeros and truncates long data, so it always yields
/// exactly `dimension` items.
struct Padded<'a> {
    /// Number of elements iteration must yield.
    dimension: u16,
    /// Backing data; may be shorter or longer than `dimension`.
    data: &'a [f32],
}
impl<'a> Padded<'a> {
    /// Yield exactly `self.dimension` values: the data first, then zeros.
    fn iterator_with_dim(self) -> impl Iterator<Item = f32> + 'a {
        self.data
            .iter()
            .copied() // idiomatic replacement for `.map(|x| *x)`
            .chain(repeat(0.0))
            .take(self.dimension as usize)
    }
}
/// This will write over a piece of the block
impl<const SIZE: usize> F32Block<SIZE> {
/// Overwrite `vec.len()` elements of the block, starting at `start`.
///
/// Errors with `overflow` when the target range extends past the data the
/// block currently holds; on error nothing is modified.
fn write_slice<'a, 'b>(&'a mut self, vec: &'b [f32], start: usize) -> BlockResult<()> {
    let occupied = self.0.len();
    let target_end = start + vec.len();
    // Guard clause: the whole target range must lie inside existing data.
    if target_end > occupied {
        return Err(F32BlockError::overflow(start, vec.len(), occupied));
    }
    // Lengths match by construction, so this cannot panic.
    self.0[start..target_end].copy_from_slice(vec);
    Ok(())
}
fn write_iter<IterType: Iterator<Item = f32>>(
&mut self,
start: usize,
iter: IterType,
) -> BlockResult<()> {
if let (_, Some(upper_bound)) = iter.size_hint() {
let data_len = self.0.len();
if start + upper_bound > data_len {
for (old_val, new_val) in self.0[start..].iter_mut().zip(iter) {
*old_val = new_val;
}
Ok(())
} else {
Err(F32BlockError::overflow(start, upper_bound, data_len))
}
let vec_data = &mut self.0;
let data_len = vec_data.len();
if start > data_len {
(data_len..start)
.map(|_| &0.0)
.chain(vec.iter())
.map(|x| vec_data.push(*x))
.collect::<Result<Vec<()>, f32>>()
Err(F32BlockError::PushUnboundedIter)
}
}
/// Append `vec` to the end of the block's data.
///
/// Fails with `limit_exceded` when the remaining capacity cannot hold all
/// of `vec` (the backing store's `extend_from_slice` returns `Err` on
/// capacity exhaustion — presumably heapless semantics; confirm).
// FIX: dropped the needless explicit lifetimes — elision covers this
// signature, and callers are unaffected.
fn push_slice(&mut self, vec: &[f32]) -> BlockResult<()> {
    let data_len = self.0.len();
    let vec_len = vec.len();
    self.0
        .extend_from_slice(vec)
        .map_err(|_| F32BlockError::limit_exceded(data_len, vec_len, SIZE))
}
/// We allow pushing iterators, but we require an upper bound for the size hint of the
/// iterator.
fn push_iter<IterType: Iterator<Item = f32>>(&mut self, iter: IterType) -> BlockResult<()> {
if let (_, Some(upper_bound)) = iter.size_hint() {
let data_len = self.0.len();
if data_len + upper_bound <= SIZE {
iter.map(|el| self.0.push(el))
.collect::<Result<Vec<_>, _>>()
.map_err(|_| F32BlockError::PushUnboundedIter)
let intersect = data_len.min(end);
let intersect_shift = intersect - start;
vec_data[start..intersect]
.iter_mut()
.zip(vec.iter())
.for_each(|(buffer, new)| *buffer = *new);
vec[intersect_shift..]
.iter()
.map(|x| vec_data.push(*x))
.collect::<Result<Vec<()>, f32>>()
.map_err(|_| F32BlockError::overflow(start, vec.len(), Size))?;
Ok(())
Err(F32BlockError::limit_exceded(data_len, upper_bound, SIZE))
let mut tmp = [0.0; SEGMENT_SIZE];
let to_move = data_len - end;
(end..data_len).step_by(SEGMENT_SIZE).for_each(|i| {
let vec_data = &self.0;
tmp.iter_mut()
.zip(vec_data[i..(i + SEGMENT_SIZE)].iter())
.for_each(|(tmp_val, vec_data_val)| *tmp_val = *vec_data_val);
let vec_data = &mut self.0;
vec_data[(i - len)..(i - len + SEGMENT_SIZE)]
.iter_mut()
.zip(tmp.iter())
.for_each(|(vec_data_val, tmp_val)| *vec_data_val = *tmp_val);
});
(0..len)
.map(|_| {
self.0.pop().ok_or(F32BlockError::DeleteEmpty {
data_len,
start,
len,
})
})
.collect::<Result<Vec<f32>, F32BlockError>>()
.map(|_| ())
let mut loop_start = start;
loop {
let (initial, tail) = self.0.split_at_mut(end);
for (to_delete, to_assign) in initial[loop_start..].iter_mut().zip(tail.iter()) {
*to_delete = *to_assign;
}
end += len;
loop_start += len;
if end >= data_len {
break;
}
}
self.0.truncate(data_len - len);
Ok(())
let mem = RefCell::new(Box::new(F32Block::<BLOCK_SIZE>::default()));
assert!(mem
.borrow_mut()
.write_vec(&vec, good_start)
.map(|_| true)
.unwrap());
let mut mem = F32Block::<BLOCK_SIZE>::default();
mem.push_iter((0..end).map(|x| (x as f32))).unwrap();
mem.write_slice(&vec, good_start).unwrap();
let read = mem.read_data(good_start, vec_size).unwrap();
let diff: f32 = read
.iter()
.zip(vec.iter())
.map(|(x, y)| (x - y).abs())
.sum();
assert!(diff <= TOL);
mem.borrow_mut().write_vec(&vec, bad_start).unwrap_err(),
F32BlockError::overflow(bad_start, vec_size, BLOCK_SIZE)
mem.write_slice(&vec, bad_start).unwrap_err(),
F32BlockError::overflow(bad_start, vec_size, end)
);
assert_eq!(
mem.write_slice(&vec, BLOCK_SIZE + vec_size).unwrap_err(),
F32BlockError::overflow(BLOCK_SIZE + vec_size, vec_size, end)
fn test_delete() -> Result<(), F32BlockError> {
const BLOCK_SIZE: usize = 100000;
let good_start = 10000; // something less than BLOCK_SIZE;
let len = 10000; // something less than BLOCK_SIZE - good_start;
let mem = RefCell::new(Box::new(F32Block::<BLOCK_SIZE>::default()));
fn test_push() {
    // Repeatedly pushing more data than the block can hold must surface an
    // error at some point.
    let mut block = F32Block::<BLOCK_SIZE>::default();
    let sample_len = 512;
    let sample = (0..sample_len).map(|i| (i as f32).sin()).collect::<Vec<_>>();
    let mut outcome: Result<(), F32BlockError> = Ok(());
    for _ in 0..20 {
        // Stop at the first failure, mirroring `collect::<Result<_, _>>`'s
        // short-circuit behaviour.
        if let Err(e) = block.push_slice(&sample) {
            outcome = Err(e);
            break;
        }
    }
    assert!(outcome.is_err());
}
/// Freeing a region must succeed once, and fail when repeated (the data no
/// longer extends past `good_start`).
///
/// FIX(review): the original span spliced two revisions together — it used
/// `?` in a `()`-returning test, called `borrow_mut` on a plain value, and
/// had live statements after `Ok(())`. Reconstructed from the newer,
/// non-RefCell variant.
#[test]
fn test_delete() {
    let good_start = 1024; // something less than BLOCK_SIZE;
    let len = good_start * 1024; // something less than BLOCK_SIZE - good_start;
    let mut mem = F32Block::<BLOCK_SIZE>::default();
    // Fill enough data to cover the region [good_start, good_start + len).
    mem.push_iter((0..(good_start + len)).map(|x| (x as f32).cos()))
        .unwrap();
    assert!(mem.free(good_start, len).is_ok());
    assert!(mem.free(good_start, len).is_err());
}
let end = self.end();
let write_vec = vec[..self.dim()].iter().map(|x| *x).collect::<Vec<_>>(); // Unecessary allocation ! need to implement as iters
self.f32_data.write_vec(&write_vec, end)?;
let padded = Padded {
dimension: self.dimension,
data: vec,
};
self.f32_data.push_iter(padded.iterator_with_dim())?;
let slice_start = self.start + (dimension) * index;
let slice_end = slice_start + (dimension);
self.f32_data.read_data(slice_start, slice_end)
let slice_start = dimension * index;
let slice_end = slice_start + dimension;
self.f32_data.read_data(slice_start, dimension)
}
/// Wrap `data` in a `Padded` view carrying this collection's dimension, so
/// iterating yields exactly `dimension` elements (zero-padded/truncated).
// FIX: `&'_ self` replaced by the idiomatic `&self` — the anonymous
// lifetime added nothing.
fn pad<'b>(&self, data: &'b [f32]) -> Padded<'b> {
    Padded {
        dimension: self.dimension,
        data,
    }
}
pub fn update_vec(&mut self, index: usize, data: &[f32]) -> Result<(), F32BlockError> {
let padded = self.pad(data);
self.f32_data
.write_iter(index * self.dim(), padded.iterator_with_dim())?;
Ok(())
/// Index-AM `amvalidate` entry point.
///
/// NOTE(review): unconditionally reports the operator class as valid — a
/// stub until real validation is implemented.
#[pg_guard]
pub extern "C" fn amvalidate(_opclassoid: pg_sys::Oid) -> bool {
    true
}
/// ```sql
/// CREATE OR REPLACE FUNCTION amhandler(internal) RETURNS index_am_handler PARALLEL SAFE IMMUTABLE STRICT COST 0.0001 LANGUAGE c AS 'MODULE_PATHNAME', '@FUNCTION_NAME@';
/// CREATE ACCESS METHOD vector TYPE INDEX HANDLER amhandler;
/// ```
/*
amroutine.ambuildempty = todo!
amroutine.aminsert = todo!;
amroutine.ambulkdelete = todo!;
amroutine.amvacuumcleanup = todo!;
amroutine.amcostestimate = todo!;
amroutine.amoptions = todo!;
amroutine.amvalidate = todo!;
amroutine.ambeginscan = todo!;
amroutine.amrescan = todo!;
amroutine.amgettuple = todo!;
amroutine.amendscan = todo!;
*/
amroutine.ambuildempty = Some(build::vec_mem_cache_buildempty);
amroutine.aminsert = None; // todo!();
amroutine.ambulkdelete = None; // todo!();
amroutine.amvacuumcleanup = None; // todo!();
amroutine.amcostestimate = None; //todo!();
amroutine.amoptions = Some(options::vec_mem_cache_options);
amroutine.amvalidate = Some(amvalidate);
amroutine.ambeginscan = None; //todo!();
amroutine.amrescan = None; //todo!();
amroutine.amgettuple = None; //todo!();
amroutine.amendscan = None; //todo!();
#[cfg(test)]
mod test {
    // We want to try out something like
    // ```sql
    // CREATE TABLE public.vecs_2
    //(
    //  idx bigserial,
    //  vec real[] CONSTRAINT vecs_2_dim512 CHECK (cardinality(vec) = 512),
    //  PRIMARY KEY (idx)
    //);
    //
    // ```
    // Stub: should create the table above and check that the dimension
    // constraint rejects vectors whose cardinality != 512.
    // NOTE(review): carries no `#[test]`/`#[pg_test]` attribute, so it never
    // runs as written — confirm intent.
    fn test_constraints() {
        todo!() // YOLO
    }
}
}
/// Mutable state threaded through the index-build heap scan.
struct BuildState<'a> {
    // Tuple descriptor for the rows being scanned.
    tupdesc: &'a PgTupleDesc<'a>,
    // Expected vector dimension (hard-coded to 512 by `do_heap_scan`).
    dimension: u16,
    // attributes: Vec<CategorizedAttribute<'a>>,
    // Memory context the build callback switches into while decoding tuples.
    memcxt: PgMemoryContexts,
    // Rows successfully added; returned by `do_heap_scan`.
    pub rows_added: u32,
}
impl<'a> BuildState<'a> {
    /// Initialise scan state with a fresh memory context and a zeroed row
    /// counter.
    fn new(
        tupdesc: &'a PgTupleDesc,
        dimension: u16,
        // attributes: Vec<CategorizedAttribute<'a>>,
    ) -> Self {
        Self {
            tupdesc,
            dimension,
            memcxt: PgMemoryContexts::new("pgvec_rs context"),
            rows_added: 0,
        }
    }
}
/// Per-tuple callback registered with the heap scan (PG13 ABI); forwards to
/// `build_callback_internal`. `pg_guard` converts Rust panics into Postgres
/// errors.
#[cfg(feature = "pg13")]
#[pg_guard]
unsafe extern "C" fn build_callback(
    _index: pg_sys::Relation,
    ctid: pg_sys::ItemPointer,
    values: *mut pg_sys::Datum,
    _isnull: *mut bool,
    _tuple_is_alive: bool,
    state: *mut std::os::raw::c_void,
) {
    // NOTE(review): assumes `ctid` is non-null — presumably guaranteed by
    // the heap-scan machinery that invokes this callback; confirm.
    build_callback_internal(*ctid, values, state);
}
/// Decode one heap tuple and add its vector to the shared allocator,
/// bumping `state.rows_added`.
#[inline(always)]
unsafe extern "C" fn build_callback_internal(
    ctid: pg_sys::ItemPointerData,
    values: *mut pg_sys::Datum,
    state: *mut std::os::raw::c_void,
) {
    check_for_interrupts!();
    // NOTE(review): assumes `state` really points at a live `BuildState` —
    // upheld by `do_heap_scan`, which passes `&mut BuildState`.
    let state = (state as *mut BuildState).as_mut().unwrap();
    // NOTE(review): `old_context` is never used to switch back — presumably
    // the previous memory context should be restored before returning; verify.
    let old_context = state.memcxt.set_as_current();
    // TODO: Add check for structure
    // NOTE(review): only the first datum is examined; assumes the index has
    // exactly one keyed column — confirm.
    let values = std::slice::from_raw_parts(values, 1);
    let VecRow { id: _, data: vec } = row_to_vec_iter(&state.tupdesc, values[0]).unwrap();
    let mut vec_allocator = VECTOR_ALLOCATOR00.exclusive();
    vec_allocator.add_vec(&vec);
    state.rows_added += 1;
}
/// Scan every heap tuple of `heap_relation`, feeding each row into the
/// index-build callback; returns the number of rows added.
fn do_heap_scan<'a>(
    index_info: *mut pg_sys::IndexInfo,
    heap_relation: &'a PgRelation,
    index_relation: &'a PgRelation,
    tupdesc: &'a PgTupleDesc,
) -> u32 {
    // Should be able to get a dimension from the description, but it is
    // hard-coded for now. TODO: derive from the tuple descriptor.
    let dimension = 512;
    // FIX: pass `tupdesc` directly — it is already a reference; the original
    // `&tupdesc` created a needless `&&PgTupleDesc` that only compiled via
    // deref coercion.
    let mut state = BuildState::new(tupdesc, dimension);
    // NOTE(review): FFI into Postgres; pointers come from valid PgRelation
    // wrappers and `state` outlives the scan.
    unsafe {
        pg_sys::IndexBuildHeapScan(
            heap_relation.as_ptr(),
            index_relation.as_ptr(),
            index_info,
            Some(build_callback),
            &mut state,
        );
    }
    state.rows_added
}
/// A row in the middle of being decoded: either field may still be absent.
#[derive(Default, Clone)]
struct VecRowPartial {
    id: Option<i64>,
    data: Option<Vec<f32>>,
}
impl VecRowPartial {
    /// Promote a partially-filled row to a complete `VecRow`, or report
    /// which piece is missing.
    fn check(self) -> Result<VecRow, &'static str> {
        // Restyled as a tuple match; the error strings are unchanged.
        match (self.id, self.data) {
            (Some(id), Some(data)) => Ok(VecRow { id, data }),
            (None, Some(_)) => Err("Missing Id for vec row"),
            (Some(_), None) => Err("Missing vector data for vec row"),
            (None, None) => Err("Missing all data for vector"),
        }
    }
}
/// A fully-decoded row: a bigint id plus its `real[]` vector payload.
// FIX: the closing brace was missing in the original (truncation).
struct VecRow {
    id: i64,
    data: Vec<f32>,
}
/// Decode a composite row datum into a `VecRow` (bigint id + `real[]` data).
///
/// Deforms the heap tuple, then walks the tuple descriptor classifying each
/// attribute: exactly one `int8` id and one `float4[]` vector are accepted;
/// anything else (or a duplicate) yields a descriptive `Err`.
#[inline]
unsafe fn row_to_vec_iter<'a>(
    tupdesc: &'a PgTupleDesc,
    row: pg_sys::Datum,
) -> Result<VecRow, &'static str> {
    // Detoast the datum and view it as a heap tuple header.
    let td = pg_sys::pg_detoast_datum(row as *mut pg_sys::varlena) as pg_sys::HeapTupleHeader;
    let mut tmptup = pg_sys::HeapTupleData {
        t_len: varsize(td as *mut pg_sys::varlena) as u32,
        t_self: Default::default(),
        t_tableOid: 0,
        t_data: td,
    };
    // One datum/null flag slot per attribute in the descriptor.
    let mut datums = vec![0 as pg_sys::Datum; tupdesc.natts as usize];
    let mut nulls = vec![false; tupdesc.natts as usize];
    pg_sys::heap_deform_tuple(
        &mut tmptup,
        tupdesc.as_ptr(),
        datums.as_mut_ptr(),
        nulls.as_mut_ptr(),
    );
    // NOTE(review): `drop_cnt` is never read or updated — dead code?
    let mut drop_cnt = 0;
    let mut vec_row = VecRowPartial::default();
    tupdesc
        .iter()
        .map(|attribute| {
            // NOTE(review): `is_dropped` is computed but unused — dropped
            // attributes are presumably meant to be skipped; confirm.
            let is_dropped = attribute.is_dropped();
            // Distinguish array attributes by resolving their element type.
            let array_type = unsafe { pg_sys::get_element_type(attribute.type_oid().value()) };
            let (base_oid, is_array) = if array_type != pg_sys::InvalidOid {
                (PgOid::from(array_type), true)
            } else {
                (attribute.type_oid(), false)
            };
            let typoid = base_oid;
            let fill_process: Result<(), &'static str> = match &typoid {
                PgOid::BuiltIn(builtin) => match (builtin, is_array) {
                    (PgBuiltInOids::FLOAT4OID, true) => {
                        if let None = vec_row.data {
                            // NOTE(review): decodes from the whole-row datum
                            // `row`, not the deformed `datums[..]` entry —
                            // verify this is intentional.
                            let data =
                                unsafe { Vec::<f32>::from_datum(row, false, builtin.value()) }
                                    .unwrap();
                            vec_row.data = Some(data);
                            Ok(())
                        } else {
                            Err("Received more than one vector data entries in row")
                        }
                    }
                    (PgBuiltInOids::INT8OID, false) => {
                        if let None = vec_row.id {
                            let id =
                                unsafe { i64::from_datum(row, false, builtin.value()) }.unwrap();
                            vec_row.id = Some(id);
                            Ok(())
                        } else {
                            Err("Received more than one id in the row")
                        }
                    }
                    (_, _) => {
                        Err("This row is not the right shape. It should be (bigint, real[])")
                    }
                },
                _ => Err("This element is not an accepted type"), // todo: communicate the right and wrong types
            };
            fill_process
        })
        // Short-circuits on the first attribute-level error.
        .collect::<Result<Vec<()>, &'static str>>()?;
    vec_row.check()
}
/// Index-AM `ambuildempty` entry point; currently a no-op for this
/// in-memory cache.
#[pg_guard]
pub extern "C" fn vec_mem_cache_buildempty(_index_relation: pg_sys::Relation) {}
#[macro_use]
extern crate lazy_static;
const MAX_SPACE: usize = 131_072;
lazy_static! {
static ref SHMEM_STEP: Arc<Mutex<usize>> = Arc::new(Mutex::new(0));
static ref LOCKS: Arc<Mutex<Vec<PgLwLock<FiniteDimAllocator<MAX_SPACE>>>>> =
Arc::new(Mutex::new(Vec::new()));
static ref HOOK_LOCK: Mutex<bool> = Mutex::new(false);
}
/// Inner (dot) product of two Postgres `real[]` arrays, paired elementwise.
#[pg_extern]
fn ip(a: Array<f32>, b: Array<f32>) -> f32 {
    let mut acc = 0.0f32;
    // Accumulate in iteration order, exactly as `.sum()` would.
    for pair in a.iter().zip(b.iter()) {
        let (x, y) = unwrap_pair(pair);
        acc += x * y;
    }
    acc
}
const TOL: f32 = 1.0e-8;
/// Cosine similarity of two `real[]` arrays, clamped to [-1, 1]; `TOL` in
/// the denominators guards against division by zero.
#[pg_extern]
fn cosine_similarity(a: Array<f32>, b: Array<f32>) -> f32 {
    let mut norm_a = 0.0f32;
    let mut norm_b = 0.0f32;
    let mut dot = 0.0f32;
    // Single pass: per pair, update both squared norms, then the dot
    // product — the same order of operations as the original closure chain.
    for pair in a.iter().zip(b.iter()) {
        let (x, y) = unwrap_pair(pair);
        norm_a += x * x;
        norm_b += y * y;
        dot += x * y;
    }
    dot /= TOL + norm_a.sqrt();
    dot /= TOL + norm_b.sqrt();
    dot.min(1.).max(-1.0)
}
static VECTOR_ALLOCATOR0: PgLwLock<FiniteDimAllocator<MAX_SPACE>> = PgLwLock::new();
// static VECTOR_ALLOCATOR1: PgLwLock<FiniteDimAllocator<MAX_SPACE>> = PgLwLock::new();
// static VECTOR_ALLOCATOR2: PgLwLock<FiniteDimAllocator<MAX_SPACE>> = PgLwLock::new();
// static VECTOR_ALLOCATOR3: PgLwLock<FiniteDimAllocator<MAX_SPACE>> = PgLwLock::new();
// static VECTOR_ALLOCATOR4: PgLwLock<FiniteDimAllocator<MAX_SPACE>> = PgLwLock::new();
// static VECTOR_ALLOCATOR5: PgLwLock<FiniteDimAllocator<MAX_SPACE>> = PgLwLock::new();
// static VECTOR_ALLOCATOR6: PgLwLock<FiniteDimAllocator<MAX_SPACE>> = PgLwLock::new();
// static VECTOR_ALLOCATOR7: PgLwLock<FiniteDimAllocator<MAX_SPACE>> = PgLwLock::new();
// static VECTOR_ALLOCATOR8: PgLwLock<FiniteDimAllocator<MAX_SPACE>> = PgLwLock::new();
// static VECTOR_ALLOCATOR9: PgLwLock<FiniteDimAllocator<MAX_SPACE>> = PgLwLock::new();
// static VECTOR_ALLOCATOR10: PgLwLock<FiniteDimAllocator<MAX_SPACE>> = PgLwLock::new();
// static VECTOR_ALLOCATOR11: PgLwLock<FiniteDimAllocator<MAX_SPACE>> = PgLwLock::new();
// static VECTOR_ALLOCATOR12: PgLwLock<FiniteDimAllocator<MAX_SPACE>> = PgLwLock::new();
// static VECTOR_ALLOCATOR13: PgLwLock<FiniteDimAllocator<MAX_SPACE>> = PgLwLock::new();
// static VECTOR_ALLOCATOR14: PgLwLock<FiniteDimAllocator<MAX_SPACE>> = PgLwLock::new();
// static VECTOR_ALLOCATOR15: PgLwLock<FiniteDimAllocator<MAX_SPACE>> = PgLwLock::new();
// static VECTOR_ALLOCATOR16: PgLwLock<FiniteDimAllocator<MAX_SPACE>> = PgLwLock::new();
// static VECTOR_ALLOCATOR17: PgLwLock<FiniteDimAllocator<MAX_SPACE>> = PgLwLock::new();
// static VECTOR_ALLOCATOR18: PgLwLock<FiniteDimAllocator<MAX_SPACE>> = PgLwLock::new();
// static VECTOR_ALLOCATOR19: PgLwLock<FiniteDimAllocator<MAX_SPACE>> = PgLwLock::new();
// static VECTOR_ALLOCATOR20: PgLwLock<FiniteDimAllocator<MAX_SPACE>> = PgLwLock::new();
static VECTOR_ALLOCATOR00: PgLwLock<FiniteDimAllocator<MAX_SPACE>> = PgLwLock::new();
static SHMEM_BLOCKS: shmem_blocks::ShMemBlocks<MAX_SPACE> =
shmem_blocks::ShMemBlocks::<MAX_SPACE>::new();
pg_shmem_init!(VECTOR_ALLOCATOR0);
// pg_shmem_init!(VECTOR_ALLOCATOR1);
// pg_shmem_init!(VECTOR_ALLOCATOR2);
// pg_shmem_init!(VECTOR_ALLOCATOR3);
// pg_shmem_init!(VECTOR_ALLOCATOR4);
// pg_shmem_init!(VECTOR_ALLOCATOR5);
// pg_shmem_init!(VECTOR_ALLOCATOR6);
// pg_shmem_init!(VECTOR_ALLOCATOR7);
// pg_shmem_init!(VECTOR_ALLOCATOR8);
// pg_shmem_init!(VECTOR_ALLOCATOR9);
// pg_shmem_init!(VECTOR_ALLOCATOR10);
// pg_shmem_init!(VECTOR_ALLOCATOR11);
// pg_shmem_init!(VECTOR_ALLOCATOR12);
// pg_shmem_init!(VECTOR_ALLOCATOR13);
// pg_shmem_init!(VECTOR_ALLOCATOR14);
// pg_shmem_init!(VECTOR_ALLOCATOR15);
// pg_shmem_init!(VECTOR_ALLOCATOR16);
// pg_shmem_init!(VECTOR_ALLOCATOR17);
// pg_shmem_init!(VECTOR_ALLOCATOR18);
// pg_shmem_init!(VECTOR_ALLOCATOR19);
// pg_shmem_init!(VECTOR_ALLOCATOR20);
// pg_shmem_init!(VECTOR_ALLOCATOR00);
unsafe {
let lock = std::ffi::CString::new(SHMEM_BLOCKS.get_name()).expect("CString::new failed");
let size = SHMEM_BLOCKS.get_size();
for _ in 0..size {
let pg_lw_lock = PgLwLock::<FiniteDimAllocator<MAX_SPACE>>::new();
let name = pg_lw_lock.get_name();
let lock_name = std::ffi::CString::new(name).expect("CString::new failed");
pg_sys::RequestAddinShmemSpace(
size * std::mem::size_of::<FiniteDimAllocator<MAX_SPACE>>(),
);
pg_sys::RequestNamedLWLockTranche(lock_name.as_ptr(), 1);
let mut locks = LOCKS.lock().unwrap();
locks.push(pg_lw_lock)
}
pg_sys::RequestAddinShmemSpace(
size * std::mem::size_of::<PgLwLock<FiniteDimAllocator<MAX_SPACE>>>(),
);
pg_sys::RequestNamedLWLockTranche(lock.as_ptr(), 1);
}
unsafe {
let size = SHMEM_BLOCKS.get_size();
for _ in 0..size {
let hook_lock = HOOK_LOCK.lock().expect("LOCKING SHMEM HOOKS");
static mut PREV_SHMEM_STARTUP_HOOK: Option<(unsafe extern "C" fn(), usize)> = None;
PREV_SHMEM_STARTUP_HOOK = pg_sys::shmem_startup_hook.map(|i| (i, 0));
pg_sys::shmem_startup_hook = Some(shmem_hook);
println!("startup hook: {:?}", PREV_SHMEM_STARTUP_HOOK);
println!("DONE SETTING SHMEM_HOOKS: {:?}", hook_lock);
#[pg_guard]
extern "C" fn shmem_hook() {
unsafe {
PREV_SHMEM_STARTUP_HOOK =
PREV_SHMEM_STARTUP_HOOK.map(|(i, count)| (i, count + 1));
if let Some((i, count)) = PREV_SHMEM_STARTUP_HOOK {
if count < SHMEM_BLOCKS.get_size() {
i()
};
}
let addin_shmem_init_lock: *mut pg_sys::LWLock =
&mut (*pg_sys::MainLWLockArray.add(21)).lock;
pg_sys::LWLockAcquire(addin_shmem_init_lock, pg_sys::LWLockMode_LW_EXCLUSIVE);
let mut found = false;
let mut step_num = SHMEM_STEP.lock().unwrap();
let locks_vec = LOCKS.lock().expect("Unable to lock locks vector");
let lock = locks_vec.get(*step_num).unwrap();
let lock_name = lock.get_name();
let shm_name = std::ffi::CString::new(lock_name).expect("CString::new failed");
println!("About to attach pointer");
let fv_shmem = pg_sys::ShmemInitStruct(
shm_name.into_raw(),
std::mem::size_of::<FiniteDimAllocator<MAX_SPACE>>(),
&mut found,
) as *mut FiniteDimAllocator<MAX_SPACE>;
lock.attach(fv_shmem);
println!("About to write pointer");
*step_num += 1;
pg_sys::LWLockRelease(addin_shmem_init_lock);
}
}
}
static mut PREV_SHMEM_STARTUP_HOOK: Option<unsafe extern "C" fn()> = None;
let hook_lock = HOOK_LOCK.lock().unwrap();
PREV_SHMEM_STARTUP_HOOK = pg_sys::shmem_startup_hook;
pg_sys::shmem_startup_hook = Some(shmem_hook);
#[pg_guard]
extern "C" fn shmem_hook() {
unsafe {
if let Some(i) = PREV_SHMEM_STARTUP_HOOK {
i();
}
let mut found = false;
let addin_shmem_init_lock: *mut pg_sys::LWLock =
&mut (*pg_sys::MainLWLockArray.add(21)).lock;
pg_sys::LWLockAcquire(addin_shmem_init_lock, pg_sys::LWLockMode_LW_EXCLUSIVE);
let shm_name =
std::ffi::CString::new(SHMEM_BLOCKS.get_name()).expect("CString::new failed");
let fv_shmem = pg_sys::ShmemInitStruct(
shm_name.into_raw(),
SHMEM_BLOCKS.get_size()
* std::mem::size_of::<PgLwLock<FiniteDimAllocator<MAX_SPACE>>>(),
&mut found,
) as *mut PgLwLock<FiniteDimAllocator<MAX_SPACE>>;
SHMEM_BLOCKS
.inner
.set(ShMemBlocksInner { start: fv_shmem })
.expect("Failed to set inner shmem");
for offset in 0..SHMEM_BLOCKS.get_size() {
let lock = LOCKS
.lock()
.expect("Locks should be set")
.pop()
.expect("Locks should be initialised at this point");
*lock.exclusive() = FiniteDimAllocator::<MAX_SPACE>::default();
println!("written!");
std::ptr::write(fv_shmem.add(offset), lock);
let lock_ref = fv_shmem.add(offset).as_ref().unwrap();
println!("{:?}", *lock_ref.share())
}
pg_sys::LWLockRelease(addin_shmem_init_lock);
}
}
}
fn set_allocator_dim(dimemsion: i16) {
VECTOR_ALLOCATOR0.exclusive().set_dim(dimemsion as u16)
fn fill_vec_allocator(block: i32, vecs: i32) {
let mut rng = thread_rng();
let unif = Uniform::new(0.0, 1.0);
set_allocator_dim(block, 512);
(0..vecs).for_each(|_| {
let vec = (0..512).map(|_| rng.sample(unif)).collect::<Vec<_>>();
SHMEM_BLOCKS
.get_block(block as usize)
.unwrap()
.exclusive()
.add_vec(&vec);
});
#[cfg(any(test, feature = "pg_test"))]
mod tests {
    use crate::Vector;
    use pgx::*;

    /// The L2 distance between a vector and its clone must vanish.
    #[pg_test]
    fn test_hello_pgvector_rs() {
        let data: Vec<f32> = (0..512).map(|i| (i as f32).sin()).collect();
        let vec1 = Vector(data);
        let vec2 = vec1.clone();
        assert!(crate::l2_dist(vec1, vec2).abs() <= 1.0e-10);
    }
}