#ifndef _LSM_INT_H
# include "lsmInt.h"
#endif
#include <string.h>
/* Number of entries in the TreeCursor.apTreeNode[] and aiCell[] arrays. */
#define MAX_DEPTH 32
typedef struct TreeKey TreeKey;
typedef struct TreeNode TreeNode;
typedef struct TreeLeaf TreeLeaf;
typedef struct NodeVersion NodeVersion;
/*
** NOTE(review): no code visible in this part of the file reads or writes
** this structure. The field names mirror values kept elsewhere for a tree
** (a shm-id, a root offset and a height) - confirm the intended use before
** relying on it.
*/
struct TreeOld {
u32 iShmid;
u32 iRoot;
u32 nHeight;
};
#if 0#endif
static int assert_delete_ranges_match(lsm_db *);
static int treeCountEntries(lsm_db *db);
/*
** Header that precedes every key/value record stored in the tree. The key
** bytes begin immediately after this header (see TKV_KEY) and the value
** bytes follow the key (see TKV_VAL).
*/
struct TreeKey {
/* Size of the key in bytes. */
int nKey;
/* Size of the value in bytes. newTreeKey() stores the caller's nVal here
** unchanged, so it may be negative (lsmTreeInsert() uses nVal<0 for
** point-deletes). */
int nValue;
/* Mask of LSM_XXX flags, e.g. LSM_INSERT, LSM_START_DELETE and
** LSM_CONTIGUOUS. */
u8 flags;
};
/*
** TKV_KEY(p) evaluates to a pointer to the first byte after TreeKey header
** p (the key). TKV_VAL(p) evaluates to a pointer p->nKey bytes beyond that
** (the value). Both assume the record is laid out contiguously in memory.
*/
#define TKV_KEY(p) ((void *)&(p)[1])
#define TKV_VAL(p) ((void *)(((u8 *)&(p)[1]) + (p)->nKey))
/*
** An interior node of the tree. All "pointers" are offsets into tree
** shared-memory that are converted to real pointers with treeShmptr().
*/
struct TreeNode {
/* Offsets of the (up to three) TreeKey records on this node. A zero entry
** is an empty slot. Code in this file asserts that slot 1 is populated. */
u32 aiKeyPtr[3];
/* Offsets of the (up to four) child nodes. */
u32 aiChildPtr[4];
/* If non-zero, readers whose version is >= iV2 use iV2Ptr in place of
** aiChildPtr[iV2Child]. See getChildPtr(). */
u32 iV2;
/* Index of the child pointer overridden by iV2Ptr. */
u8 iV2Child;
/* Replacement child offset used from version iV2 onwards. */
u32 iV2Ptr;
};
/*
** A leaf node. It has the same key array as the start of a TreeNode but no
** child pointers; copyTreeLeaf() returns leaves cast to (TreeNode *).
*/
struct TreeLeaf {
/* Offsets of the (up to three) TreeKey records; zero means empty slot. */
u32 aiKeyPtr[3];
};
typedef struct TreeBlob TreeBlob;
/*
** A growable heap buffer, managed by tblobGrow() and tblobFree(). It is
** used to assemble key records that span more than one shared-memory chunk.
*/
struct TreeBlob {
/* Size in bytes recorded for the allocation at a. */
int n;
/* Buffer obtained from lsmMallocRc(), or NULL. */
u8 *a;
};
/*
** A cursor over one of the two trees (current or old) of a database handle.
*/
struct TreeCursor {
/* Database handle this cursor belongs to. */
lsm_db *pDb;
/* Root of the tree being read: treehdr.root or treehdr.oldroot. */
TreeRoot *pRoot;
/* Index in apTreeNode[]/aiCell[] of the current node, or -1 if the cursor
** does not currently point at an entry. */
int iNode;
/* Nodes on the path from the root (index 0) to the current node. */
TreeNode *apTreeNode[MAX_DEPTH];
/* Cell index within each node of apTreeNode[]. */
u8 aiCell[MAX_DEPTH];
/* Key saved by lsmTreeCursorSave(), or NULL if no position is saved. */
TreeKey *pSave;
/* Scratch buffer used when a key is not contiguous in shared memory. */
TreeBlob blob;
};
/*
** Version number passed to getChildPtr() by the write paths in this file.
** Since getChildPtr() applies the iV2 override when iV2<=iVersion, this
** selects the override whenever one exists - presumably because the value
** is larger than any real transaction id (confirm).
*/
#define WORKING_VERSION (1<<30)
/*
** Ensure that the buffer owned by blob *p is at least n bytes in size.
** When the buffer has to grow, the old allocation is freed first, so any
** existing content is discarded.
**
** Return zero if p->a points to a usable buffer on exit, or non-zero if
** it is NULL. Allocation errors are reported through *pRc by
** lsmMallocRc().
*/
static int tblobGrow(lsm_db *pDb, TreeBlob *p, int n, int *pRc){
  if( n>p->n ){
    lsmFree(pDb->pEnv, p->a);
    p->a = lsmMallocRc(pDb->pEnv, n, pRc);
    /* Only record the new capacity if the allocation succeeded. Recording
    ** it for a NULL buffer would make a later, smaller request skip the
    ** allocation and fail without setting *pRc. */
    p->n = (p->a ? n : 0);
  }
  return (p->a==0);
}
/*
** Free the buffer owned by blob *p and reset the blob to the empty state,
** so that it may safely be passed to tblobGrow() or tblobFree() again.
*/
static void tblobFree(lsm_db *pDb, TreeBlob *p){
  lsmFree(pDb->pEnv, p->a);
  /* Do not leave a dangling pointer (or a stale size) behind. */
  p->a = 0;
  p->n = 0;
}
/*
** Append value iVal to the end of integer array *p, growing the allocation
** if it is full (to 128 entries initially, then by doubling).
**
** Return LSM_OK if successful, or LSM_NOMEM_BKPT if the array could not be
** grown. In the latter case the array is left unmodified.
*/
static int intArrayAppend(lsm_env *pEnv, IntArray *p, u32 iVal){
  assert( p->nArray<=p->nAlloc );

  if( p->nArray>=p->nAlloc ){
    int nGrown;
    u32 *aGrown;

    if( p->nArray ){
      nGrown = p->nArray*2;
    }else{
      nGrown = 128;
    }
    aGrown = lsmRealloc(pEnv, p->aArray, nGrown*sizeof(u32));
    if( aGrown==0 ) return LSM_NOMEM_BKPT;
    p->nAlloc = nGrown;
    p->aArray = aGrown;
  }

  p->aArray[p->nArray] = iVal;
  p->nArray++;
  return LSM_OK;
}
/*
** Mark integer array *p as containing zero entries. The allocation itself
** (p->aArray) is not released by this function; pEnv is unused.
*/
static void intArrayFree(lsm_env *pEnv, IntArray *p){
  (void)pEnv;
  p->nArray = 0;
}
/*
** Return the number of entries currently stored in integer array *p.
*/
static int intArraySize(IntArray *p){
  int nEntry = p->nArray;
  return nEntry;
}
/*
** Return the entry at index iIdx of integer array *p. No bounds checking
** is performed.
*/
static u32 intArrayEntry(IntArray *p, int iIdx){
  u32 *aEntry = p->aArray;
  return aEntry[iIdx];
}
/*
** Set the number of entries in integer array *p to nVal. The allocation
** is not resized.
*/
static void intArrayTruncate(IntArray *p, int nVal){
  int nKeep = nVal;
  p->nArray = nKeep;
}
/*
** Compare key (p1/n1) with key (p2/n2). The common prefix is compared with
** memcmp(); if it is identical the shorter key is considered smaller.
**
** Return a negative, zero or positive value if the first key is less than,
** equal to or greater than the second, respectively.
*/
static int treeKeycmp(void *p1, int n1, void *p2, int n2){
  int nCommon = LSM_MIN(n1, n2);
  int cmp = memcmp(p1, p2, nCommon);
  return (cmp ? cmp : (n1-n2));
}
/*
** Return the shared-memory offset of child iCell of node p, as seen by a
** reader at version iVersion.
**
** If the node carries a "v2" override (p->iV2!=0) that applies to this
** child and was created at or before iVersion, the override pointer is
** returned. Otherwise the regular entry in aiChildPtr[] is returned.
*/
static u32 getChildPtr(TreeNode *p, int iVersion, int iCell){
  assert( iVersion>=0 );
  /* aiChildPtr[] has array_size() entries, so the largest valid index is
  ** one less than that. */
  assert( iCell>=0 && iCell<array_size(p->aiChildPtr) );
  if( p->iV2 && p->iV2<=(u32)iVersion && iCell==p->iV2Child ) return p->iV2Ptr;
  return p->aiChildPtr[iCell];
}
/*
** Return the index of the shared-memory chunk that contains byte offset
** iOff. This relies on chunks being exactly (1<<15) bytes in size.
*/
static int treeOffsetToChunk(u32 iOff){
  int iChunk;
  assert( LSM_SHM_CHUNK_SIZE==(1<<15) );
  iChunk = (int)(iOff>>15);
  return iChunk;
}
/*
** Convert shared-memory offset iPtr to a (u8 *) pointer. The bits above
** the low 15 select the chunk in pDb->apShm[] and the remaining low bits
** are the byte offset within that chunk. Unlike treeShmptr(), this macro
** performs no assert() checks and does not map offset 0 to NULL.
*/
#define treeShmptrUnsafe(pDb, iPtr) \
(&((u8*)((pDb)->apShm[(iPtr)>>15]))[(iPtr) & (LSM_SHM_CHUNK_SIZE-1)])
/*
** Convert shared-memory offset iPtr to a pointer. Offset 0 is treated as
** a NULL pointer. The chunk containing iPtr must already be mapped.
*/
static void *treeShmptr(lsm_db *pDb, u32 iPtr){
  assert( (iPtr>>15)<(u32)pDb->nShm );
  assert( pDb->apShm[iPtr>>15] );
  if( iPtr==0 ) return 0;
  return treeShmptrUnsafe(pDb, iPtr);
}
/*
** Return a pointer to the header of shared-memory chunk iChunk. The chunk
** must already be present in pDb->apShm[].
*/
static ShmChunk * treeShmChunk(lsm_db *pDb, int iChunk){
  void *pChunk = pDb->apShm[iChunk];
  return (ShmChunk *)pChunk;
}
/*
** Return a pointer to the header of shared-memory chunk iChunk, first
** calling lsmShmCacheChunks() to map it if it is not yet available.
**
** *pRc must be LSM_OK on entry. If mapping fails, the error code is
** written to *pRc and NULL is returned.
*/
static ShmChunk * treeShmChunkRc(lsm_db *pDb, int iChunk, int *pRc){
  assert( *pRc==LSM_OK );
  if( iChunk>=pDb->nShm ){
    *pRc = lsmShmCacheChunks(pDb, iChunk+1);
    if( *pRc!=LSM_OK ) return 0;
  }
  return (ShmChunk *)(pDb->apShm[iChunk]);
}
#ifndef NDEBUG
/*
** Debug-only check: assert that pNode is the node reached by following
** child pointer iCell of pParent at WORKING_VERSION.
*/
static void assertIsWorkingChild(
  lsm_db *db,
  TreeNode *pNode,
  TreeNode *pParent,
  int iCell
){
  u32 iChild = getChildPtr(pParent, WORKING_VERSION, iCell);
  TreeNode *pChild = treeShmptr(db, iChild);
  assert( pChild==pNode );
}
#else
# define assertIsWorkingChild(w,x,y,z)
#endif
/*
** Values for the eLoad parameter of treeShmkey(). TKV_LOADKEY requests the
** TreeKey header and key only; TKV_LOADVAL requests the value as well.
*/
#define TKV_LOADKEY 1
#define TKV_LOADVAL 2
/*
** Return a pointer to the TreeKey record stored at shared-memory offset
** iPtr, or NULL if iPtr is zero.
**
** Parameter eLoad must be TKV_LOADKEY (header and key required) or
** TKV_LOADVAL (header, key and value required). If the required bytes all
** lie within the chunk containing iPtr, the returned pointer points
** directly into shared memory. Otherwise the record is assembled, chunk by
** chunk, into buffer *pBlob and a pointer to that buffer is returned. In
** that case the result is NULL if the buffer could not be allocated, with
** any error code left in *pRc.
*/
static TreeKey *treeShmkey(
  lsm_db *pDb,
  u32 iPtr,
  int eLoad,
  TreeBlob *pBlob,
  int *pRc
){
  TreeKey *pKey;
  int nNeed;                      /* Total bytes required */
  int nInChunk;                   /* Bytes available in the current chunk */

  assert( eLoad==TKV_LOADKEY || eLoad==TKV_LOADVAL );
  pKey = (TreeKey *)treeShmptr(pDb, iPtr);
  if( pKey==0 ) return 0;

  nNeed = sizeof(TreeKey) + pKey->nKey;
  if( eLoad==TKV_LOADVAL && pKey->nValue>0 ){
    nNeed += pKey->nValue;
  }

  assert( LSM_SHM_CHUNK_SIZE==(1<<15) );
  nInChunk = LSM_SHM_CHUNK_SIZE - (iPtr & (LSM_SHM_CHUNK_SIZE-1));
  if( nInChunk>=nNeed ){
    /* Everything required is contiguous. Use it in place. */
    return pKey;
  }

  if( tblobGrow(pDb, pBlob, nNeed, pRc)==0 ){
    int nDone = 0;                /* Bytes copied into pBlob so far */
    while( *pRc==LSM_OK ){
      ShmChunk *pChunk;
      void *pSrc = treeShmptr(pDb, iPtr);
      int nCopy = LSM_MIN(nInChunk, nNeed-nDone);

      memcpy(&pBlob->a[nDone], pSrc, nCopy);
      nDone += nCopy;
      if( nDone==nNeed ) break;

      /* Continue just past the header of the next chunk in the list. */
      pChunk = treeShmChunk(pDb, treeOffsetToChunk(iPtr));
      assert( pChunk );
      iPtr = (pChunk->iNext * LSM_SHM_CHUNK_SIZE) + LSM_SHM_CHUNK_HDR;
      nInChunk = LSM_SHM_CHUNK_SIZE - LSM_SHM_CHUNK_HDR;
    }
  }

  return (TreeKey *)(pBlob->a);
}
#if defined(LSM_DEBUG) && defined(LSM_EXPENSIVE_ASSERT)
/*
** Debug-only sanity check on a leaf: its middle key slot must be set.
**
** NOTE(review): this reads pNode->apKey, but the TreeNode structure
** declared in this file has no such member (its key array is named
** aiKeyPtr). This presumably fails to compile when both LSM_DEBUG and
** LSM_EXPENSIVE_ASSERT are defined - confirm.
*/
void assert_leaf_looks_ok(TreeNode *pNode){
assert( pNode->apKey[1] );
}
/*
** Debug-only recursive sanity check: the middle key slot of each node must
** be set and, for nodes above the lowest level (nHeight>1), child pointers
** 1 and 2 must be non-zero.
**
** NOTE(review): like assert_leaf_looks_ok(), this reads pNode->apKey, which
** is not a member of TreeNode as declared in this file. The recursive call
** also passes the u32 returned by getChildPtr() where a (TreeNode *) is
** expected. The routine looks stale - confirm before enabling
** LSM_EXPENSIVE_ASSERT.
*/
void assert_node_looks_ok(TreeNode *pNode, int nHeight){
if( pNode ){
assert( pNode->apKey[1] );
if( nHeight>1 ){
int i;
assert( getChildPtr(pNode, WORKING_VERSION, 1) );
assert( getChildPtr(pNode, WORKING_VERSION, 2) );
for(i=0; i<4; i++){
assert_node_looks_ok(getChildPtr(pNode, WORKING_VERSION, i), nHeight-1);
}
}
}
}
/*
** Placeholder with an empty body: no checks are performed. When
** LSM_DEBUG and LSM_EXPENSIVE_ASSERT are not both defined, a no-op macro
** of the same name is used instead.
*/
void assert_tree_looks_ok(int rc, Tree *pTree){
}
#else
# define assert_tree_looks_ok(x,y)
#endif
/*
** Write a four character, nul-terminated summary of the LSM_XXX bits set
** in flags into buffer zFlags, which must be at least 5 bytes in size:
**
**   zFlags[0]:  ']' if LSM_END_DELETE is set, otherwise '.'
**   zFlags[1]:  '-' for LSM_POINT_DELETE, '+' for LSM_INSERT, '^' for
**               LSM_SEPARATOR, '.' if none of the three is set, or '?' if
**               more than one is set
**   zFlags[2]:  '*' if LSM_SYSTEMKEY is set, otherwise '.'
**   zFlags[3]:  '[' if LSM_START_DELETE is set, otherwise '.'
*/
void lsmFlagsToString(int flags, char *zFlags){
  char cType;

  switch( flags & (LSM_POINT_DELETE|LSM_INSERT|LSM_SEPARATOR) ){
    case 0:
      cType = '.';
      break;
    case LSM_POINT_DELETE:
      cType = '-';
      break;
    case LSM_INSERT:
      cType = '+';
      break;
    case LSM_SEPARATOR:
      cType = '^';
      break;
    default:
      cType = '?';
      break;
  }

  zFlags[0] = (flags & LSM_END_DELETE) ? ']' : '.';
  zFlags[1] = cType;
  zFlags[2] = (flags & LSM_SYSTEMKEY) ? '*' : '.';
  zFlags[3] = (flags & LSM_START_DELETE) ? '[' : '.';
  zFlags[4] = '\0';
}
#ifdef LSM_DEBUG
/*
** Append a printable rendering of blob (pBlob/nBlob) to string pStr.
** Bytes in the range 'a'..'z' are appended as is. Other bytes are appended
** as two lower-case hex digits, except that a trailing 0x00 byte of a blob
** longer than one byte is omitted. If extending the string fails (nAlloc
** is zero afterwards) this is a no-op.
*/
static void lsmAppendStrBlob(LsmString *pStr, void *pBlob, int nBlob){
  static const char zHex[] = "0123456789abcdef";
  const u8 *aByte = (const u8 *)pBlob;
  int iByte;

  lsmStringExtend(pStr, nBlob*2);
  if( pStr->nAlloc==0 ) return;

  for(iByte=0; iByte<nBlob; iByte++){
    u8 c = aByte[iByte];
    if( c>='a' && c<='z' ){
      pStr->z[pStr->n++] = c;
    }else if( c!=0 || nBlob==1 || iByte!=(nBlob-1) ){
      pStr->z[pStr->n++] = zHex[(c>>4)&0xf];
      pStr->z[pStr->n++] = zHex[c&0xf];
    }
  }
  pStr->z[pStr->n] = 0;
}
#if 0#endif
/*
** Append the four character rendering of flags produced by
** lsmFlagsToString(), followed by a ':' character, to string pStr.
*/
static void strAppendFlags(LsmString *pStr, u8 flags){
  char zBuf[8];
  lsmFlagsToString(flags, zBuf);
  zBuf[4] = ':';                  /* Overwrite the nul-terminator */
  lsmStringAppend(pStr, zBuf, 5);
}
/*
** Debugging function. Print the contents of the sub-tree headed by the
** node at shared-memory offset iNode to stdout.
**
** zPath/nPath is a caller supplied buffer holding the textual path to this
** node; it is extended in place while recursing. nHeight is the number of
** levels below this node (0 means the node is printed as a leaf).
*/
void dump_node_contents(
lsm_db *pDb,
u32 iNode,
char *zPath,
int nPath,
int nHeight
){
const char *zSpace = " ";
int i;
int rc = LSM_OK;
LsmString s;
TreeNode *pNode;
TreeBlob b = {0, 0};
pNode = (TreeNode *)treeShmptr(pDb, iNode);
if( nHeight==0 ){
/* Leaf: print all keys present on the node on a single line. */
lsmStringInit(&s, pDb->pEnv);
for(i=0; i<3; i++){
u32 iPtr = pNode->aiKeyPtr[i];
if( iPtr ){
/* NOTE(review): rc is never checked, so pKey is assumed non-NULL. */
TreeKey *pKey = treeShmkey(pDb, pNode->aiKeyPtr[i],TKV_LOADKEY, &b,&rc);
strAppendFlags(&s, pKey->flags);
lsmAppendStrBlob(&s, TKV_KEY(pKey), pKey->nKey);
lsmStringAppend(&s, " ", -1);
}
}
printf("% 6d %.*sleaf%.*s: %s\n",
iNode, nPath, zPath, 20-nPath-4, zSpace, s.z
);
lsmStringClear(&s);
}else{
/* Interior node: visit child i, then print key i, for each cell. */
for(i=0; i<4 && nHeight>0; i++){
u32 iPtr = getChildPtr(pNode, pDb->treehdr.root.iTransId, i);
/* Extend the path with "<cell index>/" before recursing. */
zPath[nPath] = (char)(i+'0');
zPath[nPath+1] = '/';
if( iPtr ){
dump_node_contents(pDb, iPtr, zPath, nPath+2, nHeight-1);
}
if( i!=3 && pNode->aiKeyPtr[i] ){
TreeKey *pKey = treeShmkey(pDb, pNode->aiKeyPtr[i], TKV_LOADKEY,&b,&rc);
lsmStringInit(&s, pDb->pEnv);
strAppendFlags(&s, pKey->flags);
lsmAppendStrBlob(&s, TKV_KEY(pKey), pKey->nKey);
printf("% 6d %.*s%.*s: %s\n",
iNode, nPath+1, zPath, 20-nPath-1, zSpace, s.z);
lsmStringClear(&s);
}
}
}
tblobFree(pDb, &b);
}
/*
** Debugging function. Print caption zCaption followed by the contents of
** the current in-memory tree (pDb->treehdr.root) to stdout.
*/
void dump_tree_contents(lsm_db *pDb, const char *zCaption){
  char zPath[64];
  TreeRoot *pRoot = &pDb->treehdr.root;

  printf("\n%s\n", zCaption);
  zPath[0] = '/';
  if( pRoot->iRoot!=0 ){
    dump_node_contents(pDb, pRoot->iRoot, zPath, 1, pRoot->nHeight-1);
  }
  fflush(stdout);
}
#endif
/*
** Initialize cursor *pCsr so that it reads from database pDb. If bOld is
** non-zero the cursor reads the old tree (treehdr.oldroot), otherwise the
** current tree (treehdr.root). The cursor initially points at no entry.
*/
static void treeCursorInit(lsm_db *pDb, int bOld, TreeCursor *pCsr){
  memset(pCsr, 0, sizeof(TreeCursor));
  pCsr->iNode = -1;
  pCsr->pDb = pDb;
  pCsr->pRoot = (bOld ? &pDb->treehdr.oldroot : &pDb->treehdr.root);
}
/*
** Return a pointer to the TreeKey record that cursor pCsr currently points
** to. The cursor must point at a valid entry.
**
** If the record is flagged LSM_CONTIGUOUS, the returned pointer points
** directly into shared memory. Otherwise the record (including its value)
** is loaded via treeShmkey(), possibly into buffer *pBlob.
*/
static TreeKey *csrGetKey(TreeCursor *pCsr, TreeBlob *pBlob, int *pRc){
  lsm_db *pDb = pCsr->pDb;
  TreeNode *pNode = pCsr->apTreeNode[pCsr->iNode];
  u32 iKey = pNode->aiKeyPtr[pCsr->aiCell[pCsr->iNode]];
  TreeKey *pKey;

  assert( iKey );
  pKey = (TreeKey*)treeShmptrUnsafe(pDb, iKey);
  if( pKey->flags & LSM_CONTIGUOUS ) return pKey;
  return treeShmkey(pDb, iKey, TKV_LOADVAL, pBlob, pRc);
}
/*
** Save the current position of cursor pCsr, if it is not NULL and has no
** position saved already. The key the cursor points at (if any) is stored
** in pCsr->pSave and the cursor is then marked as not pointing at any
** entry.
**
** Return LSM_OK, or an error code if loading the key fails.
*/
int lsmTreeCursorSave(TreeCursor *pCsr){
  int rc = LSM_OK;

  if( pCsr==0 || pCsr->pSave ) return rc;

  if( pCsr->iNode>=0 ){
    pCsr->pSave = csrGetKey(pCsr, &pCsr->blob, &rc);
  }
  pCsr->iNode = -1;
  return rc;
}
/*
** If cursor pCsr has a saved position, discard it. If pRes is not NULL,
** also seek the cursor back to the saved key, writing the result of the
** seek to *pRes (see lsmTreeCursorSeek()). If there is no saved position,
** or pRes is NULL, LSM_OK is returned and *pRes is left untouched.
*/
static int treeCursorRestore(TreeCursor *pCsr, int *pRes){
  TreeKey *pSaved = pCsr->pSave;

  if( pSaved==0 ) return LSM_OK;
  pCsr->pSave = 0;
  if( pRes==0 ) return LSM_OK;
  return lsmTreeCursorSeek(pCsr, TKV_KEY(pSaved), pSaved->nKey, pRes);
}
/*
** Allocate nByte bytes of tree shared-memory and return the offset of the
** allocation. If bAlign is non-zero the allocation is aligned to a 4 byte
** boundary. nByte must not exceed the usable size of a chunk.
**
** If *pRc is not LSM_OK on entry this is a no-op that returns 0. If an
** error occurs, the error code is written to *pRc and 0 is returned.
*/
static u32 treeShmalloc(lsm_db *pDb, int bAlign, int nByte, int *pRc){
u32 iRet = 0;
if( *pRc==LSM_OK ){
const static int CHUNK_SIZE = LSM_SHM_CHUNK_SIZE;
const static int CHUNK_HDR = LSM_SHM_CHUNK_HDR;
/* Current write offset */
u32 iWrite;
/* Offset of the end of the current chunk */
u32 iEof;
/* Index of the current chunk */
int iChunk;
assert( nByte <= (CHUNK_SIZE-CHUNK_HDR) );
iWrite = pDb->treehdr.iWrite;
if( bAlign ){
iWrite = (iWrite + 3) & ~0x0003;
assert( (iWrite % 4)==0 );
}
assert( iWrite );
/* (iWrite-1) is used so that a write offset exactly at the end of a
** chunk is still attributed to that chunk. */
iChunk = treeOffsetToChunk(iWrite-1);
iEof = (iChunk+1) * CHUNK_SIZE;
assert( iEof>=iWrite && (iEof-iWrite)<(u32)CHUNK_SIZE );
if( (iWrite+nByte)>iEof ){
/* The allocation does not fit in the current chunk. Link in another
** chunk and place the allocation at the start of it. */
ShmChunk *pHdr;
ShmChunk *pFirst;
ShmChunk *pNext;
int iNext = 0;
int rc = LSM_OK;
pFirst = treeShmChunk(pDb, pDb->treehdr.iFirst);
assert( shm_sequence_ge(pDb->treehdr.iUsedShmid, pFirst->iShmid) );
assert( (pDb->treehdr.iNextShmid+1-pDb->treehdr.nChunk)==pFirst->iShmid );
/* Reuse the first chunk of the list if lsmTreeInUse() reports that it
** is no longer in use. */
if( pDb->treehdr.iUsedShmid!=pFirst->iShmid ){
int bInUse;
rc = lsmTreeInUse(pDb, pFirst->iShmid, &bInUse);
if( rc!=LSM_OK ){
*pRc = rc;
return 0;
}
if( bInUse==0 ){
iNext = pDb->treehdr.iFirst;
pDb->treehdr.iFirst = pFirst->iNext;
assert( pDb->treehdr.iFirst );
}
}
/* Otherwise, use a brand new chunk index. */
if( iNext==0 ) iNext = pDb->treehdr.nChunk++;
/* Initialize the header of the new chunk. */
pNext = treeShmChunkRc(pDb, iNext, &rc);
if( pNext ){
pNext->iNext = 0;
pNext->iShmid = (pDb->treehdr.iNextShmid++);
}else{
*pRc = rc;
return 0;
}
/* Link the chunk just finished to the new one. */
pHdr = (ShmChunk *)treeShmptr(pDb, iChunk*CHUNK_SIZE);
pHdr->iNext = iNext;
iWrite = iNext * CHUNK_SIZE + CHUNK_HDR;
}
/* Allocate the space at iWrite and account for it. */
iRet = iWrite;
pDb->treehdr.iWrite = iWrite + nByte;
pDb->treehdr.root.nByte += nByte;
}
return iRet;
}
/*
** Allocate and zero nByte bytes of 4-byte aligned tree shared-memory.
** On success, return a pointer to the new space and write its offset to
** *piPtr. On failure, return NULL and leave *piPtr unmodified; the error
** code (if any) is in *pRc.
*/
static void *treeShmallocZero(lsm_db *pDb, int nByte, u32 *piPtr, int *pRc){
  u32 iOff = treeShmalloc(pDb, 1, nByte, pRc);
  void *pMem = treeShmptr(pDb, iOff);

  if( pMem==0 ) return 0;

  assert( *pRc==LSM_OK );
  memset(pMem, 0, nByte);
  *piPtr = iOff;
  return pMem;
}
/*
** Allocate a new, zeroed TreeNode in tree shared-memory. See
** treeShmallocZero() for the meaning of piPtr, pRc and the return value.
*/
static TreeNode *newTreeNode(lsm_db *pDb, u32 *piPtr, int *pRc){
  TreeNode *pNode = treeShmallocZero(pDb, sizeof(TreeNode), piPtr, pRc);
  return pNode;
}
/*
** Allocate a new, zeroed TreeLeaf in tree shared-memory. See
** treeShmallocZero() for the meaning of piPtr, pRc and the return value.
*/
static TreeLeaf *newTreeLeaf(lsm_db *pDb, u32 *piPtr, int *pRc){
  TreeLeaf *pLeaf = treeShmallocZero(pDb, sizeof(TreeLeaf), piPtr, pRc);
  return pLeaf;
}
/*
** Allocate a new TreeKey record in tree shared-memory and populate it with
** key (pKey/nKey) and, if pVal is not NULL, value (pVal/nVal).
**
** The header is allocated first (4-byte aligned). The key and value bytes
** are then appended with unaligned allocations, each one limited to the
** space remaining in the current chunk, so a record may span several
** chunks. The record's flags field is set to LSM_CONTIGUOUS if the start
** and end offsets of the record lie in the same chunk, or to 0 otherwise.
**
** On success, return a pointer to the header and write its offset to
** *piPtr. If an error occurs, return NULL with the error code in *pRc.
*/
static TreeKey *newTreeKey(
  lsm_db *pDb,
  u32 *piPtr,
  void *pKey, int nKey,           /* Key data */
  void *pVal, int nVal,           /* Value data (or pVal==0) */
  int *pRc
){
  TreeKey *p;
  u32 iPtr;
  u32 iEnd;
  int nRem;
  u8 *a;
  int n;

  /* Allocate and populate the header. */
  *piPtr = iPtr = treeShmalloc(pDb, 1, sizeof(TreeKey), pRc);
  p = treeShmptr(pDb, iPtr);
  if( *pRc ) return 0;
  p->nKey = nKey;
  p->nValue = nVal;

  /* Copy the key and then the value. The outer loop runs once for the
  ** key and, if pVal is not NULL, a second time for the value. */
  n = nRem = nKey;
  a = (u8 *)pKey;
  while( a ){
    while( nRem>0 ){
      u8 *aAlloc;
      int nAlloc;
      u32 iWrite;

      /* Never request more than fits in the rest of the current chunk. */
      iWrite = (pDb->treehdr.iWrite & (LSM_SHM_CHUNK_SIZE-1));
      iWrite = LSM_MAX(iWrite, LSM_SHM_CHUNK_HDR);
      nAlloc = LSM_MIN((LSM_SHM_CHUNK_SIZE-iWrite), (u32)nRem);

      aAlloc = treeShmptr(pDb, treeShmalloc(pDb, 0, nAlloc, pRc));
      if( aAlloc==0 ) break;
      memcpy(aAlloc, &a[n-nRem], nAlloc);
      nRem -= nAlloc;
    }
    a = pVal;
    n = nRem = nVal;
    pVal = 0;
  }

  /* Work out whether or not the whole record lies within one chunk. */
  iEnd = iPtr + sizeof(TreeKey) + nKey + LSM_MAX(0, nVal);
  if( (iPtr & ~(LSM_SHM_CHUNK_SIZE-1))!=(iEnd & ~(LSM_SHM_CHUNK_SIZE-1)) ){
    p->flags = 0;
  }else{
    p->flags = LSM_CONTIGUOUS;
  }

  if( *pRc ) return 0;
  return p;
}
/*
** Allocate a new TreeNode that is a copy of pOld as seen by the newest
** version: the key and child arrays are copied and, if pOld carries a v2
** override, the override pointer is folded into the child array of the
** copy. The v2 fields of the copy itself are left zeroed.
**
** Return the new node and write its offset to *piNew, or return NULL if
** the allocation fails.
*/
static TreeNode *copyTreeNode(
  lsm_db *pDb,
  TreeNode *pOld,
  u32 *piNew,
  int *pRc
){
  TreeNode *pCopy = newTreeNode(pDb, piNew, pRc);

  if( pCopy==0 ) return 0;

  memcpy(pCopy->aiKeyPtr, pOld->aiKeyPtr, sizeof(pCopy->aiKeyPtr));
  memcpy(pCopy->aiChildPtr, pOld->aiChildPtr, sizeof(pCopy->aiChildPtr));
  if( pOld->iV2 ){
    pCopy->aiChildPtr[pOld->iV2Child] = pOld->iV2Ptr;
  }
  return pCopy;
}
/*
** Allocate a new TreeLeaf that is a copy of pOld. Return the new leaf,
** cast to (TreeNode *), and write its offset to *piNew. Return NULL if the
** allocation fails.
*/
static TreeNode *copyTreeLeaf(
  lsm_db *pDb,
  TreeLeaf *pOld,
  u32 *piNew,
  int *pRc
){
  TreeLeaf *pCopy = newTreeLeaf(pDb, piNew, pRc);

  if( pCopy!=0 ){
    *pCopy = *pOld;
  }
  return (TreeNode *)pCopy;
}
/*
** The cursor identifies a node (pCsr->iNode) and a child cell within it.
** Update that child pointer so that it refers to offset iNew. If
** pCsr->iNode is negative, iNew becomes the new root of the tree.
**
** If the node's v2 slot is free, the update is made in place using the v2
** fields, after the previous pointer to the node has been appended to the
** pDb->rollback array. Otherwise the node is copied with the new child
** pointer installed, and the pointer to the node is updated in its own
** parent by a recursive call.
*/
static int treeUpdatePtr(lsm_db *pDb, TreeCursor *pCsr, u32 iNew){
  int rc = LSM_OK;
  TreeNode *pTarget;              /* Node to update */
  int iCell;                      /* Child cell of pTarget to update */

  if( pCsr->iNode<0 ){
    /* iNew is the new root node. */
    pDb->treehdr.root.iRoot = iNew;
    return rc;
  }

  pTarget = pCsr->apTreeNode[pCsr->iNode];
  iCell = pCsr->aiCell[pCsr->iNode];

  if( pTarget->iV2==0 ){
    /* The v2 slot is available. Record the current pointer to this node
    ** in the rollback array, then use the slot. */
    u32 iPrev;
    assert( pDb->treehdr.root.iTransId>0 );

    if( pCsr->iNode==0 ){
      iPrev = pDb->treehdr.root.iRoot;
    }else{
      iPrev = getChildPtr(
          pCsr->apTreeNode[pCsr->iNode-1],
          pDb->treehdr.root.iTransId, pCsr->aiCell[pCsr->iNode-1]
      );
    }

    rc = intArrayAppend(pDb->pEnv, &pDb->rollback, iPrev);
    if( rc==LSM_OK ){
      pTarget->iV2 = pDb->treehdr.root.iTransId;
      pTarget->iV2Child = (u8)iCell;
      pTarget->iV2Ptr = iNew;
    }
  }else{
    /* The v2 slot has already been used. Copy the node and link the copy
    ** into the tree one level up. */
    u32 iCopy;
    TreeNode *pCopy = copyTreeNode(pDb, pTarget, &iCopy, &rc);
    if( pCopy ){
      assert( rc==LSM_OK );
      pCopy->aiChildPtr[iCell] = iNew;
      pCsr->iNode--;
      rc = treeUpdatePtr(pDb, pCsr, iCopy);
    }
  }

  return rc;
}
/*
** Insert key iTreeKey, with left and right child pointers iLeftPtr and
** iRightPtr, into the node the cursor currently points at
** (pCsr->apTreeNode[pCsr->iNode]), to the left of key slot iSlot. A zero
** iRightPtr means "keep the existing child pointer on that side".
**
** If the node is full (both outer key slots used) it is split into two new
** nodes and its middle key is pushed up: into a newly created root if the
** node was the root, or into the parent by a recursive call. Otherwise a
** new copy of the node that includes the new key is created and linked
** into the tree with treeUpdatePtr().
**
** Return LSM_OK if successful, or an error code otherwise.
*/
static int treeInsert(
  lsm_db *pDb,
  TreeCursor *pCsr,
  u32 iLeftPtr,
  u32 iTreeKey,
  u32 iRightPtr,
  int iSlot
){
  int rc = LSM_OK;
  TreeNode *pNode = pCsr->apTreeNode[pCsr->iNode];

  /* The middle key slot of a node is always populated. */
  assert( pNode->aiKeyPtr[1] );

  if( pNode->aiKeyPtr[0] && pNode->aiKeyPtr[2] ){
    /* The node is full. Split it into pLeft and pRight. */
    u32 iLeft; TreeNode *pLeft;
    u32 iRight; TreeNode *pRight;

    pLeft = newTreeNode(pDb, &iLeft, &rc);
    pRight = newTreeNode(pDb, &iRight, &rc);
    if( rc ) return rc;

    pLeft->aiChildPtr[1] = getChildPtr(pNode, WORKING_VERSION, 0);
    pLeft->aiKeyPtr[1] = pNode->aiKeyPtr[0];
    pLeft->aiChildPtr[2] = getChildPtr(pNode, WORKING_VERSION, 1);

    pRight->aiChildPtr[1] = getChildPtr(pNode, WORKING_VERSION, 2);
    pRight->aiKeyPtr[1] = pNode->aiKeyPtr[2];
    pRight->aiChildPtr[2] = getChildPtr(pNode, WORKING_VERSION, 3);

    if( pCsr->iNode==0 ){
      /* pNode is the root of the tree. Grow the tree by one level. */
      u32 iRoot; TreeNode *pRoot;

      pRoot = newTreeNode(pDb, &iRoot, &rc);
      /* Bail out if the new root could not be allocated. The tree header
      ** has not been modified at this point. */
      if( pRoot==0 ) return rc;
      pRoot->aiKeyPtr[1] = pNode->aiKeyPtr[1];
      pRoot->aiChildPtr[1] = iLeft;
      pRoot->aiChildPtr[2] = iRight;

      pDb->treehdr.root.iRoot = iRoot;
      pDb->treehdr.root.nHeight++;
    }else{
      /* Push the middle key up into the parent node. */
      pCsr->iNode--;
      rc = treeInsert(pDb, pCsr,
          iLeft, pNode->aiKeyPtr[1], iRight, pCsr->aiCell[pCsr->iNode]
      );
    }

    assert( pLeft->iV2==0 );
    assert( pRight->iV2==0 );

    /* Add the new key to whichever of the two new nodes it belongs on. */
    switch( iSlot ){
      case 0:
        pLeft->aiKeyPtr[0] = iTreeKey;
        pLeft->aiChildPtr[0] = iLeftPtr;
        if( iRightPtr ) pLeft->aiChildPtr[1] = iRightPtr;
        break;
      case 1:
        pLeft->aiChildPtr[3] = (iRightPtr ? iRightPtr : pLeft->aiChildPtr[2]);
        pLeft->aiKeyPtr[2] = iTreeKey;
        pLeft->aiChildPtr[2] = iLeftPtr;
        break;
      case 2:
        pRight->aiKeyPtr[0] = iTreeKey;
        pRight->aiChildPtr[0] = iLeftPtr;
        if( iRightPtr ) pRight->aiChildPtr[1] = iRightPtr;
        break;
      case 3:
        pRight->aiChildPtr[3] = (iRightPtr ? iRightPtr : pRight->aiChildPtr[2]);
        pRight->aiKeyPtr[2] = iTreeKey;
        pRight->aiChildPtr[2] = iLeftPtr;
        break;
    }

  }else{
    /* There is room on the node. Build a replacement node containing the
    ** existing keys plus the new one, in order. */
    TreeNode *pNew;
    u32 *piKey;
    u32 *piChild;
    u32 iStore = 0;
    u32 iNew = 0;
    int i;

    pNew = newTreeNode(pDb, &iNew, &rc);
    if( rc ) return rc;

    piKey = pNew->aiKeyPtr;
    piChild = pNew->aiChildPtr;

    /* Keys (and their left children) that come before the new key. */
    for(i=0; i<iSlot; i++){
      if( pNode->aiKeyPtr[i] ){
        *(piKey++) = pNode->aiKeyPtr[i];
        *(piChild++) = getChildPtr(pNode, WORKING_VERSION, i);
      }
    }

    *piKey++ = iTreeKey;
    *piChild++ = iLeftPtr;

    /* iStore holds the right-hand child of the new key until it has been
    ** written into the next available child slot. */
    iStore = iRightPtr;
    for(i=iSlot; i<3; i++){
      if( pNode->aiKeyPtr[i] ){
        *(piKey++) = pNode->aiKeyPtr[i];
        *(piChild++) = iStore ? iStore : getChildPtr(pNode, WORKING_VERSION, i);
        iStore = 0;
      }
    }

    if( iStore ){
      *piChild = iStore;
    }else{
      *piChild = getChildPtr(pNode, WORKING_VERSION,
          (pNode->aiKeyPtr[2] ? 3 : 2)
      );
    }

    /* Link the replacement node into the tree. */
    pCsr->iNode--;
    rc = treeUpdatePtr(pDb, pCsr, iNew);
  }

  return rc;
}
/*
** Insert key iTreeKey into the leaf node the cursor currently points at,
** to the left of key slot iSlot. The leaf must not be the root node.
**
** If the leaf has a free slot, it is replaced by a new leaf that also
** contains the new key. If it is full, it is split into two new leaves
** and its middle key is inserted into the parent node via treeInsert().
*/
static int treeInsertLeaf(
  lsm_db *pDb,
  TreeCursor *pCsr,
  u32 iTreeKey,
  int iSlot
){
  int rc = LSM_OK;
  TreeNode *pOldLeaf = pCsr->apTreeNode[pCsr->iNode];
  TreeLeaf *pLeft;                /* New leaf (left-hand one after a split) */
  u32 iLeft;

  assert( iSlot>=0 && iSlot<=4 );
  assert( pCsr->iNode>0 );
  assert( pOldLeaf->aiKeyPtr[1] );

  /* From here on the cursor refers to the parent of the leaf. */
  pCsr->iNode--;

  pLeft = newTreeLeaf(pDb, &iLeft, &rc);
  if( pLeft==0 ) return rc;

  if( pOldLeaf->aiKeyPtr[0]==0 || pOldLeaf->aiKeyPtr[2]==0 ){
    /* There is room. Merge the new key into the existing keys. */
    int nOut = 0;
    int i;
    for(i=0; i<4; i++){
      if( i==iSlot ) pLeft->aiKeyPtr[nOut++] = iTreeKey;
      if( i<3 && pOldLeaf->aiKeyPtr[i] ){
        pLeft->aiKeyPtr[nOut++] = pOldLeaf->aiKeyPtr[i];
      }
    }
    rc = treeUpdatePtr(pDb, pCsr, iLeft);
  }else{
    /* The leaf is full. Split it in two. */
    u32 iRight;
    TreeLeaf *pRight = newTreeLeaf(pDb, &iRight, &rc);
    if( pRight ){
      assert( rc==LSM_OK );
      pLeft->aiKeyPtr[1] = pOldLeaf->aiKeyPtr[0];
      pRight->aiKeyPtr[1] = pOldLeaf->aiKeyPtr[2];

      if( iSlot==0 ){
        pLeft->aiKeyPtr[0] = iTreeKey;
      }else if( iSlot==1 ){
        pLeft->aiKeyPtr[2] = iTreeKey;
      }else if( iSlot==2 ){
        pRight->aiKeyPtr[0] = iTreeKey;
      }else if( iSlot==3 ){
        pRight->aiKeyPtr[2] = iTreeKey;
      }

      rc = treeInsert(pDb, pCsr, iLeft, pOldLeaf->aiKeyPtr[1], iRight,
          pCsr->aiCell[pCsr->iNode]
      );
    }
  }

  return rc;
}
/*
** If there is currently no "old" tree (treehdr.iOldShmid==0), make the
** current tree the old tree: save the log position and checksums, the
** shm-id of the last chunk allocated and a copy of the root, then reset
** the current root to describe an empty tree. If an old tree already
** exists this function is a no-op.
*/
void lsmTreeMakeOld(lsm_db *pDb){
  TreeHeader *pHdr = &pDb->treehdr;

  assert( pDb->iReader>=0 );

  if( pHdr->iOldShmid!=0 ) return;

  /* Save the log position; the low bit carries the inverted low bit of
  ** the client's iLogOff. */
  pHdr->iOldLog = (pHdr->log.aRegion[2].iEnd << 1);
  pHdr->iOldLog |= (~(pDb->pClient->iLogOff) & (i64)0x0001);

  pHdr->oldcksum0 = pHdr->log.cksum0;
  pHdr->oldcksum1 = pHdr->log.cksum1;
  pHdr->iOldShmid = pHdr->iNextShmid-1;
  memcpy(&pHdr->oldroot, &pHdr->root, sizeof(TreeRoot));

  /* Start over with an empty tree. */
  pHdr->root.iTransId = 1;
  pHdr->root.iRoot = 0;
  pHdr->root.nHeight = 0;
  pHdr->root.nByte = 0;
}
/*
** Discard the old tree. The caller must hold an exclusive WRITER or DMS2
** lock. The shm-id recorded for the old tree becomes treehdr.iUsedShmid
** and treehdr.iOldShmid is cleared.
*/
void lsmTreeDiscardOld(lsm_db *pDb){
  TreeHeader *pHdr = &pDb->treehdr;

  assert( lsmShmAssertLock(pDb, LSM_LOCK_WRITER, LSM_LOCK_EXCL)
       || lsmShmAssertLock(pDb, LSM_LOCK_DMS2, LSM_LOCK_EXCL)
  );
  pHdr->iUsedShmid = pHdr->iOldShmid;
  pHdr->iOldShmid = 0;
}
/*
** Return non-zero if an old tree currently exists (see lsmTreeMakeOld()),
** or zero otherwise.
*/
int lsmTreeHasOld(lsm_db *pDb){
  return (pDb->treehdr.iOldShmid ? 1 : 0);
}
/*
** Initialize the tree-header of pDb to describe an empty tree that uses a
** single shared-memory chunk (chunk 1), and initialize the header of that
** chunk. Return LSM_OK, or an error code if chunk 1 cannot be mapped.
*/
int lsmTreeInit(lsm_db *pDb){
  int rc = LSM_OK;
  TreeHeader *pHdr = &pDb->treehdr;
  ShmChunk *pFirst;

  memset(pHdr, 0, sizeof(TreeHeader));
  pHdr->root.iTransId = 1;
  pHdr->iFirst = 1;
  pHdr->nChunk = 2;
  /* The first allocation goes just past the header of chunk 1. */
  pHdr->iWrite = LSM_SHM_CHUNK_SIZE + LSM_SHM_CHUNK_HDR;
  pHdr->iNextShmid = 2;
  pHdr->iUsedShmid = 1;

  pFirst = treeShmChunkRc(pDb, 1, &rc);
  if( pFirst!=0 ){
    pFirst->iNext = 0;
    pFirst->iShmid = 1;
  }
  return rc;
}
/*
** Calculate the two 32-bit checksum values for tree-header *pHdr and write
** them to aCksum[0] and aCksum[1]. The checksum covers every 32-bit word
** of the header that precedes the aCksum field, which must be the last
** field of the structure.
*/
static void treeHeaderChecksum(
  TreeHeader *pHdr,
  u32 *aCksum
){
  const u32 *aWord = (const u32 *)pHdr;
  u32 s1 = 0x12345678;
  u32 s2 = 0x9ABCDEF0;
  int iWord = 0;

  assert( (offsetof(TreeHeader, aCksum) + sizeof(u32)*2)==sizeof(TreeHeader) );
  assert( (sizeof(TreeHeader) % (sizeof(u32)*2))==0 );

  /* Process the header two words at a time. */
  while( iWord<(offsetof(TreeHeader, aCksum) / sizeof(u32)) ){
    s1 += aWord[iWord];
    s2 += (s1 + aWord[iWord+1]);
    iWord += 2;
  }

  aCksum[0] = s1;
  aCksum[1] = s2;
}
/*
** Return true if the checksum stored in tree-header *pHdr matches the
** checksum calculated from its content, or false otherwise.
*/
static int treeHeaderChecksumOk(TreeHeader *pHdr){
  u32 aExpect[2];
  int bMatch;

  treeHeaderChecksum(pHdr, aExpect);
  bMatch = (0==memcmp(aExpect, pHdr->aCksum, sizeof(aExpect)));
  return bMatch;
}
typedef struct ShmChunkLoc ShmChunkLoc;
/*
** Used by treeRepairList() to pair a shared-memory chunk header with the
** index of the chunk it belongs to.
*/
struct ShmChunkLoc {
/* Pointer to the chunk header */
ShmChunk *pShm;
/* Index of the chunk */
u32 iLoc;
};
/*
** Check that the linked list of shared-memory chunks that starts at
** treehdr.iFirst is well formed: each iNext value must be a valid chunk
** index, shm-ids must increase by exactly one from each chunk to the next
** and the list must contain (treehdr.nChunk-1) chunks in total.
**
** Return LSM_OK if the list is fine, LSM_CORRUPT_BKPT if it is not, or
** some other error code if a chunk cannot be mapped.
*/
static int treeCheckLinkedList(lsm_db *db){
  int rc = LSM_OK;
  int nSeen = 0;                  /* Number of chunks visited */
  ShmChunk *pChunk;

  pChunk = treeShmChunkRc(db, db->treehdr.iFirst, &rc);
  while( rc==LSM_OK && pChunk ){
    if( pChunk->iNext==0 ){
      /* End of list */
      pChunk = 0;
    }else if( pChunk->iNext>=db->treehdr.nChunk ){
      rc = LSM_CORRUPT_BKPT;
    }else{
      ShmChunk *pNext = treeShmChunkRc(db, pChunk->iNext, &rc);
      if( rc==LSM_OK ){
        if( pNext->iShmid!=pChunk->iShmid+1 ){
          rc = LSM_CORRUPT_BKPT;
        }
        pChunk = pNext;
      }
    }
    nSeen++;
  }

  if( rc==LSM_OK && (u32)nSeen!=db->treehdr.nChunk-1 ){
    rc = LSM_CORRUPT_BKPT;
  }
  return rc;
}
/*
** Iterate through the interior nodes of the current tree, clearing the v2
** fields of any node whose iV2 value is greater than the transaction-id
** in the tree-header. Trees with a height of one or less are left alone.
**
** The nHeight value in the tree-header is decremented for the duration of
** the scan so that the cursor does not descend to the lowest level of the
** tree. It is restored before returning.
*/
static int treeRepairPtrs(lsm_db *db){
  int rc;
  TreeCursor csr;                 /* Cursor used to iterate through tree */
  u32 iTransId;

  if( db->treehdr.root.nHeight<=1 ) return LSM_OK;

  iTransId = db->treehdr.root.iTransId;
  db->treehdr.root.nHeight--;
  treeCursorInit(db, 0, &csr);

  for(rc=lsmTreeCursorEnd(&csr, 0);
      rc==LSM_OK && lsmTreeCursorValid(&csr);
      rc=lsmTreeCursorNext(&csr)
  ){
    TreeNode *pNode = csr.apTreeNode[csr.iNode];
    if( pNode->iV2>iTransId ){
      pNode->iV2Child = 0;
      pNode->iV2Ptr = 0;
      pNode->iV2 = 0;
    }
  }
  tblobFree(csr.pDb, &csr.blob);

  db->treehdr.root.nHeight++;
  return rc;
}
/*
** Repair the linked list of shared-memory chunks: fix the shm-id of any
** chunk whose id is not older than treehdr.iNextShmid, rebuild every
** ShmChunk.iNext pointer so that chunks are linked in shm-id order, then
** verify the result with treeCheckLinkedList().
**
** NOTE(review): pMin is dereferenced unconditionally below, so this
** assumes at least one chunk (treehdr.nChunk>=2) - confirm callers
** guarantee this.
*/
static int treeRepairList(lsm_db *db){
int rc = LSM_OK;
int i;
ShmChunk *p;
ShmChunk *pMin = 0;
u32 iMin = 0;
/* Find the chunk with the smallest shm-id (per shm_sequence_ge()). */
for(i=1; rc==LSM_OK && (u32)i<db->treehdr.nChunk; i++){
p = treeShmChunkRc(db, i, &rc);
if( p && (pMin==0 || shm_sequence_ge(pMin->iShmid, p->iShmid)) ){
pMin = p;
iMin = i;
}
}
if( rc==LSM_OK ){
int nSort;
int nByte;
u32 iPrevShmid;
ShmChunkLoc *aSort;
/* Allocate two arrays of nSort entries each, where nSort is the
** smallest power of two that is >= the number of chunks. */
nSort = 1;
while( (u32)nSort < (db->treehdr.nChunk-1) ) nSort = nSort * 2;
nByte = sizeof(ShmChunkLoc) * nSort * 2;
aSort = lsmMallocZeroRc(db->pEnv, nByte, &rc);
iPrevShmid = pMin->iShmid;
if( rc==LSM_OK ){
/* Fix shm-ids. Each out-of-range chunk is given an id smaller than
** every id assigned so far. */
iPrevShmid = pMin->iShmid-1;
for(i=1; (u32)i<db->treehdr.nChunk; i++){
p = treeShmChunk(db, i);
aSort[i-1].pShm = p;
aSort[i-1].iLoc = i;
if( (u32)i!=db->treehdr.iFirst ){
if( shm_sequence_ge(p->iShmid, db->treehdr.iNextShmid) ){
p->iShmid = iPrevShmid--;
}
}
}
/* Make sure the chunk at treehdr.iFirst has the smallest id of all. */
if( iMin!=db->treehdr.iFirst ){
p = treeShmChunk(db, db->treehdr.iFirst);
p->iShmid = iPrevShmid;
}
}
if( rc==LSM_OK ){
/* Place each chunk in aSpace[] at the index given by its distance
** from the smallest shm-id, which orders the chunks by shm-id. */
ShmChunkLoc *aSpace = &aSort[nSort];
for(i=0; i<nSort; i++){
if( aSort[i].pShm ){
assert( shm_sequence_ge(aSort[i].pShm->iShmid, iPrevShmid) );
assert( aSpace[aSort[i].pShm->iShmid - iPrevShmid].pShm==0 );
aSpace[aSort[i].pShm->iShmid - iPrevShmid] = aSort[i];
}
}
/* Rebuild the iNext pointers from the ordered array. */
if( aSpace[nSort-1].pShm ) aSpace[nSort-1].pShm->iNext = 0;
for(i=0; i<nSort-1; i++){
if( aSpace[i].pShm ){
aSpace[i].pShm->iNext = aSpace[i+1].iLoc;
}
}
rc = treeCheckLinkedList(db);
lsmFree(db->pEnv, aSort);
}
}
return rc;
}
/*
** Repair the in-memory tree. First, if the two copies of the tree-header
** in shared memory differ, copy hdr1 over hdr2 if the checksum of hdr1 is
** good, or hdr2 over hdr1 otherwise. Then repair the v2 pointers of the
** tree and the linked list of chunks.
**
** The connection's private copy of the tree-header (db->treehdr) is saved
** on entry and restored before returning.
*/
int lsmTreeRepair(lsm_db *db){
  int rc;
  TreeHeader saved;
  ShmHeader *pShm = db->pShmhdr;

  /* Ensure that the two tree-headers are consistent. */
  if( memcmp(&pShm->hdr1, &pShm->hdr2, sizeof(TreeHeader))!=0 ){
    if( treeHeaderChecksumOk(&pShm->hdr1) ){
      memcpy(&pShm->hdr2, &pShm->hdr1, sizeof(TreeHeader));
    }else{
      memcpy(&pShm->hdr1, &pShm->hdr2, sizeof(TreeHeader));
    }
  }

  memcpy(&saved, &db->treehdr, sizeof(TreeHeader));

  rc = treeRepairPtrs(db);
  if( rc==LSM_OK ) rc = treeRepairList(db);

  memcpy(&db->treehdr, &saved, sizeof(TreeHeader));
  return rc;
}
/*
** Replace the key that the cursor currently points at with the key at
** shared-memory offset iKey. The node containing the key is copied, the
** key pointer is changed in the copy and the copy is then linked into the
** tree in place of the original with treeUpdatePtr().
**
** This is a no-op if *pRc is not LSM_OK on entry. If an error occurs,
** including a failure to link the copy into the tree, the error code is
** written to *pRc.
*/
static void treeOverwriteKey(lsm_db *db, TreeCursor *pCsr, u32 iKey, int *pRc){
  if( *pRc==LSM_OK ){
    TreeRoot *p = &db->treehdr.root;
    TreeNode *pNew;
    u32 iNew;
    TreeNode *pNode = pCsr->apTreeNode[pCsr->iNode];
    int iCell = pCsr->aiCell[pCsr->iNode];

    /* Create a copy of the node. Nodes on the lowest level of a tree with
    ** more than one level are leaves. */
    if( (pCsr->iNode>0 && (u32)pCsr->iNode==(p->nHeight-1)) ){
      pNew = copyTreeLeaf(db, (TreeLeaf *)pNode, &iNew, pRc);
    }else{
      pNew = copyTreeNode(db, pNode, &iNew, pRc);
    }

    if( pNew ){
      /* Modify the key in the new version. */
      pNew->aiKeyPtr[iCell] = iKey;

      /* Change the pointer in the parent (if any) to point at the copy.
      ** Report any failure to the caller; otherwise the overwrite would
      ** be silently lost. */
      pCsr->iNode--;
      *pRc = treeUpdatePtr(db, pCsr, iNew);
    }
  }
}
/*
** The cursor must point at an entry on the lowest level of the tree.
** Return true if the key that follows the current cursor position has the
** LSM_END_DELETE flag set, or false if it does not or if there is no such
** key. The cursor itself is not modified.
*/
static int treeNextIsEndDelete(lsm_db *db, TreeCursor *pCsr){
  int iNode = pCsr->iNode;
  int iCell = pCsr->aiCell[iNode]+1;

  /* Cursor currently points to a leaf node. */
  assert( (u32)pCsr->iNode==(db->treehdr.root.nHeight-1) );

  while( iNode>=0 ){
    TreeNode *pNode = pCsr->apTreeNode[iNode];
    if( iCell<3 && pNode->aiKeyPtr[iCell] ){
      TreeKey *pKey = treeShmptr(db, pNode->aiKeyPtr[iCell]);
      return ((pKey->flags & LSM_END_DELETE) ? 1 : 0);
    }

    /* No more keys on this node. Move up to the parent, where the next
    ** key is the one at the cell index recorded for that level. Do not
    ** read aiCell[] once the root has been passed. */
    iNode--;
    if( iNode>=0 ) iCell = pCsr->aiCell[iNode];
  }

  return 0;
}
/*
** The cursor must point at an entry on the lowest level of the tree.
** Return true if the key that precedes the current cursor position has
** the LSM_START_DELETE flag set, or false if it does not or if there is
** no such key. The cursor itself is not modified.
*/
static int treePrevIsStartDelete(lsm_db *db, TreeCursor *pCsr){
  int iLevel;

  /* Cursor currently points to a leaf node. */
  assert( (u32)pCsr->iNode==(db->treehdr.root.nHeight-1) );

  /* Walk from the current node towards the root, looking for the first
  ** level that has a key to the left of the cursor position. */
  for(iLevel=pCsr->iNode; iLevel>=0; iLevel--){
    TreeNode *pNode = pCsr->apTreeNode[iLevel];
    int iPrev = pCsr->aiCell[iLevel]-1;
    if( iPrev>=0 && pNode->aiKeyPtr[iPrev] ){
      TreeKey *pKey = treeShmptr(db, pNode->aiKeyPtr[iPrev]);
      return ((pKey->flags & LSM_START_DELETE) ? 1 : 0);
    }
  }

  return 0;
}
/*
** Insert a new entry into the in-memory tree.
**
** Parameter flags must be exactly one of LSM_INSERT, LSM_POINT_DELETE,
** LSM_START_DELETE or LSM_END_DELETE. (pKey/nKey) is the key and, for
** LSM_INSERT, (pVal/nVal) the value. If pVal is NULL, nVal may be
** negative.
**
** If the tree already contains an entry with the same key, the new entry
** replaces it. The flags stored with the new entry are adjusted so that
** they stay consistent with neighbouring delete-range markers, and a
** START_DELETE or END_DELETE entry that would be redundant is not
** inserted at all.
**
** Return LSM_OK if successful, or an error code otherwise.
*/
static int treeInsertEntry(
  lsm_db *pDb,                    /* Database handle */
  int flags,                      /* Flags associated with entry */
  void *pKey,                     /* Pointer to key data */
  int nKey,                       /* Size of key data in bytes */
  void *pVal,                     /* Pointer to value data (or NULL) */
  int nVal                        /* Bytes in value data (or -ve for delete) */
){
  int rc = LSM_OK;
  TreeKey *pTreeKey;              /* New key-value being inserted */
  u32 iTreeKey;
  TreeRoot *p = &pDb->treehdr.root;
  TreeCursor csr;                 /* Cursor to seek to pKey/nKey */
  int res = 0;                    /* Result of seek operation on csr */

  assert( nVal>=0 || pVal==0 );
  assert_tree_looks_ok(LSM_OK, pTree);
  assert( flags==LSM_INSERT       || flags==LSM_POINT_DELETE
       || flags==LSM_START_DELETE || flags==LSM_END_DELETE
  );
  assert( (flags & LSM_CONTIGUOUS)==0 );

  if( p->iRoot ){
    TreeKey *pRes;                /* Key at end of seek operation */
    treeCursorInit(pDb, 0, &csr);

    /* Seek to the node that the new key belongs on. */
    rc = lsmTreeCursorSeek(&csr, pKey, nKey, &res);
    pRes = csrGetKey(&csr, &csr.blob, &rc);
    /* Use the common exit path so that csr.blob is released on error. */
    if( rc!=LSM_OK ) goto insert_entry_out;
    assert( pRes );

    if( flags==LSM_START_DELETE ){
      /* If the key immediately before the new entry is already a
      ** START_DELETE, the new entry is not required. */
      if( (res<=0 && (pRes->flags & LSM_START_DELETE))
       || (res>0  && treePrevIsStartDelete(pDb, &csr))
      ){
        goto insert_entry_out;
      }
    }else if( flags==LSM_END_DELETE ){
      /* If the key immediately after the new entry is already an
      ** END_DELETE, the new entry is not required. */
      if( (res<0  && treeNextIsEndDelete(pDb, &csr))
       || (res>=0 && (pRes->flags & LSM_END_DELETE))
      ){
        goto insert_entry_out;
      }
    }

    /* A delete-range marker that lands on an existing key keeps that
    ** key's flags and, if it is an insert, its value. */
    if( res==0 && (flags & (LSM_END_DELETE|LSM_START_DELETE)) ){
      if( pRes->flags & LSM_INSERT ){
        nVal = pRes->nValue;
        pVal = TKV_VAL(pRes);
      }
      flags = flags | pRes->flags;
    }

    /* An insert or point-delete inherits delete-range markers from the
    ** entry it replaces, or gets both if the entry found by the seek
    ** carries the range marker on the side facing the new key. */
    if( flags & (LSM_INSERT|LSM_POINT_DELETE) ){
      if( (res<0 && (pRes->flags & LSM_START_DELETE))
       || (res>0 && (pRes->flags & LSM_END_DELETE))
      ){
        flags = flags | (LSM_END_DELETE|LSM_START_DELETE);
      }else if( res==0 ){
        flags = flags | (pRes->flags & (LSM_END_DELETE|LSM_START_DELETE));
      }
    }
  }else{
    memset(&csr, 0, sizeof(TreeCursor));
  }

  /* Allocate and populate a new key-value pair structure. */
  pTreeKey = newTreeKey(pDb, &iTreeKey, pKey, nKey, pVal, nVal, &rc);
  /* Again, release csr.blob (which pVal may point into) on error. */
  if( rc!=LSM_OK ) goto insert_entry_out;
  assert( pTreeKey->flags==0 || pTreeKey->flags==LSM_CONTIGUOUS );
  pTreeKey->flags |= flags;

  if( p->iRoot==0 ){
    /* The tree is completely empty. Add a new root node and install the
    ** new key as its middle entry. A full TreeNode is allocated even
    ** though the node has no children yet. */
    TreeNode *pRoot = newTreeNode(pDb, &p->iRoot, &rc);
    if( rc==LSM_OK ){
      assert( p->nHeight==0 );
      pRoot->aiKeyPtr[1] = iTreeKey;
      p->nHeight = 1;
    }
  }else{
    if( res==0 ){
      /* The search found a match within the tree. */
      treeOverwriteKey(pDb, &csr, iTreeKey, &rc);
    }else{
      /* iSlot is the index of the key that the new key should be
      ** inserted to the left of, or one more than the index of the key
      ** found by the seek if the new key is larger than it. */
      int iSlot = csr.aiCell[csr.iNode] + (res<0);
      if( csr.iNode==0 ){
        rc = treeInsert(pDb, &csr, 0, iTreeKey, 0, iSlot);
      }else{
        rc = treeInsertLeaf(pDb, &csr, iTreeKey, iSlot);
      }
    }
  }

 insert_entry_out:
  tblobFree(pDb, &csr.blob);
  assert_tree_looks_ok(rc, pTree);
  return rc;
}
/*
** Insert a new entry into the in-memory tree. If nVal is negative the
** entry is a point-delete for key (pKey/nKey). Otherwise it is an insert
** of value (pVal/nVal) for that key.
**
** Return LSM_OK if successful, or an error code otherwise.
*/
int lsmTreeInsert(
  lsm_db *pDb,                    /* Database handle */
  void *pKey,                     /* Pointer to key data */
  int nKey,                       /* Size of key data in bytes */
  void *pVal,                     /* Pointer to value data (or NULL) */
  int nVal                        /* Bytes in value data (or -ve for delete) */
){
  int flags = (nVal<0) ? LSM_POINT_DELETE : LSM_INSERT;
  return treeInsertEntry(pDb, flags, pKey, nKey, pVal, nVal);
}
/*
** Remove the key that the cursor currently points at from the tree.
**
** If the key is on the lowest level of the tree, iNewptr must be zero.
** Otherwise (when called recursively on an interior node) iNewptr is the
** offset of the node that replaces the two children on either side of the
** removed key.
**
** Return LSM_OK if successful, or an error code otherwise.
**
** NOTE(review): several of the nodes allocated below are written to
** without first checking that the allocation succeeded - confirm how
** allocation failure is expected to be handled here.
*/
static int treeDeleteEntry(lsm_db *db, TreeCursor *pCsr, u32 iNewptr){
TreeRoot *p = &db->treehdr.root;
TreeNode *pNode = pCsr->apTreeNode[pCsr->iNode];
int iSlot = pCsr->aiCell[pCsr->iNode];
int bLeaf;
int rc = LSM_OK;
assert( pNode->aiKeyPtr[1] );
assert( pNode->aiKeyPtr[iSlot] );
assert( iSlot==0 || iSlot==1 || iSlot==2 );
assert( ((u32)pCsr->iNode==(db->treehdr.root.nHeight-1))==(iNewptr==0) );
/* True if pNode is on the lowest level of a tree with more than one
** level, in which case it is a TreeLeaf rather than a TreeNode. */
bLeaf = ((u32)pCsr->iNode==(p->nHeight-1) && p->nHeight>1);
if( pNode->aiKeyPtr[0] || pNode->aiKeyPtr[2] ){
/* Case 1: there are at least two keys on this node. Create a new copy
** of the node with the key removed and link it into the tree. */
TreeNode *pNew;
u32 iNew;
if( bLeaf ){
pNew = (TreeNode *)newTreeLeaf(db, &iNew, &rc);
}else{
pNew = newTreeNode(db, &iNew, &rc);
}
if( pNew ){
int i;
/* Output starts at slot 1, leaving slot 0 of the new node empty. */
int iOut = 1;
for(i=0; i<4; i++){
if( i==iSlot ){
/* Skip the removed key. iNewptr takes the place of the two child
** pointers that were on either side of it. */
i++;
if( bLeaf==0 ) pNew->aiChildPtr[iOut] = iNewptr;
if( i<3 ) pNew->aiKeyPtr[iOut] = pNode->aiKeyPtr[i];
iOut++;
}else if( bLeaf || p->nHeight==1 ){
/* No child pointers to copy on this node. */
if( i<3 && pNode->aiKeyPtr[i] ){
pNew->aiKeyPtr[iOut++] = pNode->aiKeyPtr[i];
}
}else{
if( getChildPtr(pNode, WORKING_VERSION, i) ){
pNew->aiChildPtr[iOut] = getChildPtr(pNode, WORKING_VERSION, i);
if( i<3 ) pNew->aiKeyPtr[iOut] = pNode->aiKeyPtr[i];
iOut++;
}
}
}
assert( iOut<=4 );
assert( bLeaf || pNew->aiChildPtr[0]==0 );
pCsr->iNode--;
rc = treeUpdatePtr(db, pCsr, iNew);
}
}else if( pCsr->iNode==0 ){
/* Case 2: removing the only key in the root node. iNewptr becomes the
** new root and the tree shrinks by one level. */
assert( iSlot==1 );
db->treehdr.root.iRoot = iNewptr;
db->treehdr.root.nHeight--;
}else{
/* Case 3: the node has a single key and is not the root. Find a peer
** (a sibling under the same parent) and redistribute the contents of the
** peer and the separating parent key. */
TreeNode *pParent;
int iPSlot;
u32 iPeer;
int iDir;
TreeNode *pPeer;
TreeNode *pNew1; u32 iNew1;
assert( iSlot==1 );
pParent = pCsr->apTreeNode[pCsr->iNode-1];
iPSlot = pCsr->aiCell[pCsr->iNode-1];
/* Prefer the sibling to the left, if there is one. */
if( iPSlot>0 && getChildPtr(pParent, WORKING_VERSION, iPSlot-1) ){
iDir = -1;
}else{
iDir = +1;
}
iPeer = getChildPtr(pParent, WORKING_VERSION, iPSlot+iDir);
pPeer = (TreeNode *)treeShmptr(db, iPeer);
assertIsWorkingChild(db, pNode, pParent, iPSlot);
/* Allocate the first new node. This is always required. */
if( bLeaf ){
pNew1 = (TreeNode *)newTreeLeaf(db, &iNew1, &rc);
}else{
pNew1 = (TreeNode *)newTreeNode(db, &iNew1, &rc);
}
if( pPeer->aiKeyPtr[0] && pPeer->aiKeyPtr[2] ){
/* The peer is full (three keys). Together with the parent key there
** are four keys to distribute: one goes on pNew1, one replaces the
** parent key and two go on pNew2. A copy of the parent is required. */
TreeNode *pNew2; u32 iNew2;
TreeNode *pNewP; u32 iNewP;
if( bLeaf ){
pNew2 = (TreeNode *)newTreeLeaf(db, &iNew2, &rc);
}else{
pNew2 = (TreeNode *)newTreeNode(db, &iNew2, &rc);
}
pNewP = copyTreeNode(db, pParent, &iNewP, &rc);
if( iDir==-1 ){
/* Peer is to the left of pNode. */
pNew1->aiKeyPtr[1] = pPeer->aiKeyPtr[0];
if( bLeaf==0 ){
pNew1->aiChildPtr[1] = getChildPtr(pPeer, WORKING_VERSION, 0);
pNew1->aiChildPtr[2] = getChildPtr(pPeer, WORKING_VERSION, 1);
}
pNewP->aiChildPtr[iPSlot-1] = iNew1;
pNewP->aiKeyPtr[iPSlot-1] = pPeer->aiKeyPtr[1];
pNewP->aiChildPtr[iPSlot] = iNew2;
pNew2->aiKeyPtr[0] = pPeer->aiKeyPtr[2];
pNew2->aiKeyPtr[1] = pParent->aiKeyPtr[iPSlot-1];
if( bLeaf==0 ){
pNew2->aiChildPtr[0] = getChildPtr(pPeer, WORKING_VERSION, 2);
pNew2->aiChildPtr[1] = getChildPtr(pPeer, WORKING_VERSION, 3);
pNew2->aiChildPtr[2] = iNewptr;
}
}else{
/* Peer is to the right of pNode. */
pNew1->aiKeyPtr[1] = pParent->aiKeyPtr[iPSlot];
if( bLeaf==0 ){
pNew1->aiChildPtr[1] = iNewptr;
pNew1->aiChildPtr[2] = getChildPtr(pPeer, WORKING_VERSION, 0);
}
pNewP->aiChildPtr[iPSlot] = iNew1;
pNewP->aiKeyPtr[iPSlot] = pPeer->aiKeyPtr[0];
pNewP->aiChildPtr[iPSlot+1] = iNew2;
pNew2->aiKeyPtr[0] = pPeer->aiKeyPtr[1];
pNew2->aiKeyPtr[1] = pPeer->aiKeyPtr[2];
if( bLeaf==0 ){
pNew2->aiChildPtr[0] = getChildPtr(pPeer, WORKING_VERSION, 1);
pNew2->aiChildPtr[1] = getChildPtr(pPeer, WORKING_VERSION, 2);
pNew2->aiChildPtr[2] = getChildPtr(pPeer, WORKING_VERSION, 3);
}
}
assert( pCsr->iNode>=1 );
/* Move the cursor up to the parent of pParent, so that treeUpdatePtr()
** replaces the pointer to pParent with a pointer to its copy. */
pCsr->iNode -= 2;
if( rc==LSM_OK ){
assert( pNew1->aiKeyPtr[1] && pNew2->aiKeyPtr[1] );
rc = treeUpdatePtr(db, pCsr, iNewP);
}
}else{
/* The peer has one or two keys. Merge the peer's keys and the
** separating parent key onto pNew1, then remove the separating key
** from the parent by a recursive call. */
int iKOut = 0;
int iPOut = 0;
int i;
pCsr->iNode--;
if( iDir==1 ){
pNew1->aiKeyPtr[iKOut++] = pParent->aiKeyPtr[iPSlot];
if( bLeaf==0 ) pNew1->aiChildPtr[iPOut++] = iNewptr;
}
for(i=0; i<3; i++){
if( pPeer->aiKeyPtr[i] ){
pNew1->aiKeyPtr[iKOut++] = pPeer->aiKeyPtr[i];
}
}
if( bLeaf==0 ){
for(i=0; i<4; i++){
if( getChildPtr(pPeer, WORKING_VERSION, i) ){
pNew1->aiChildPtr[iPOut++] = getChildPtr(pPeer, WORKING_VERSION, i);
}
}
}
if( iDir==-1 ){
/* The separating key is the one to the left of pNode in the parent.
** Point the cursor at it so the recursive call removes that key. */
iPSlot--;
pNew1->aiKeyPtr[iKOut++] = pParent->aiKeyPtr[iPSlot];
if( bLeaf==0 ) pNew1->aiChildPtr[iPOut++] = iNewptr;
pCsr->aiCell[pCsr->iNode] = (u8)iPSlot;
}
rc = treeDeleteEntry(db, pCsr, iNew1);
}
}
return rc;
}
/*
** Delete a range of keys from the in-memory tree. All keys that are
** strictly greater than (pKey1/nKey1) and strictly less than
** (pKey2/nKey2) are removed one at a time. A START_DELETE marker is then
** inserted at pKey1 and an END_DELETE marker at pKey2. pKey1 must be
** smaller than pKey2.
**
** Return LSM_OK if successful, or an error code otherwise.
*/
int lsmTreeDelete(
  lsm_db *db,
  void *pKey1, int nKey1,         /* Start of range */
  void *pKey2, int nKey2          /* End of range */
){
  int rc = LSM_OK;
  int bDone = 0;
  TreeRoot *p = &db->treehdr.root;
  TreeBlob blob = {0, 0};

  /* The range must be sensible - that (key1 < key2). */
  assert( treeKeycmp(pKey1, nKey1, pKey2, nKey2)<0 );
  assert( assert_delete_ranges_match(db) );

  /* Each iteration of this loop removes one key from the range. */
  while( bDone==0 && rc==LSM_OK ){
    int res;
    TreeCursor csr;               /* Cursor to seek to first key in range */
    void *pDel; int nDel;         /* Key to (possibly) delete this iteration */
#ifndef NDEBUG
    int nEntry = treeCountEntries(db);
#endif

    /* Seek the cursor to the first entry in the tree greater than pKey1. */
    treeCursorInit(db, 0, &csr);
    lsmTreeCursorSeek(&csr, pKey1, nKey1, &res);
    if( res<=0 && lsmTreeCursorValid(&csr) ) lsmTreeCursorNext(&csr);

    /* If there is no such entry, or if it is greater than or equal to
    ** pKey2, the tree now contains no keys in the range being deleted. */
    bDone = 1;
    if( lsmTreeCursorValid(&csr) ){
      lsmTreeCursorKey(&csr, 0, &pDel, &nDel);
      if( treeKeycmp(pDel, nDel, pKey2, nKey2)<0 ) bDone = 0;
    }

    if( bDone==0 ){
      if( (u32)csr.iNode==(p->nHeight-1) ){
        /* The element to delete already lies on the lowest level. */
        rc = treeDeleteEntry(db, &csr, 0);
      }else{
        /* The element lies on an interior node:
        **
        ** 1. Overwrite it with a copy of the next key in the tree (key N),
        **    which is on the lowest level.
        **
        ** 2. Seek to key N (the cursor stops at the interior copy), step
        **    to the next entry (the original copy of N) and delete it.
        */
        u32 iKey;
        TreeKey *pKey;
        int iNode = csr.iNode;
        lsmTreeCursorNext(&csr);
        assert( (u32)csr.iNode==(p->nHeight-1) );

        iKey = csr.apTreeNode[csr.iNode]->aiKeyPtr[csr.aiCell[csr.iNode]];
        lsmTreeCursorPrev(&csr);

        treeOverwriteKey(db, &csr, iKey, &rc);
        pKey = treeShmkey(db, iKey, TKV_LOADKEY, &blob, &rc);
        if( pKey ){
          rc = lsmTreeCursorSeek(&csr, TKV_KEY(pKey), pKey->nKey, &res);
        }
        if( rc==LSM_OK ){
          assert( res==0 && csr.iNode==iNode );
          rc = lsmTreeCursorNext(&csr);
          if( rc==LSM_OK ){
            rc = treeDeleteEntry(db, &csr, 0);
          }
        }
      }
    }

    /* Clean up any memory allocated by the cursor. */
    tblobFree(db, &csr.blob);
    assert( bDone || treeCountEntries(db)==(nEntry-1) );
  }

  /* Now insert the START_DELETE and END_DELETE keys. */
  if( rc==LSM_OK ){
    rc = treeInsertEntry(db, LSM_START_DELETE, pKey1, nKey1, 0, -1);
  }
  if( rc==LSM_OK ){
    rc = treeInsertEntry(db, LSM_END_DELETE, pKey2, nKey2, 0, -1);
  }

  tblobFree(db, &blob);
  assert( assert_delete_ranges_match(db) );
  return rc;
}
/*
** Return the value of the nByte counter of the current tree. This counter
** is incremented by the size of each allocation made by treeShmalloc().
*/
int lsmTreeSize(lsm_db *pDb){
  TreeRoot *pRoot = &pDb->treehdr.root;
  return pRoot->nByte;
}
/*
** Allocate and initialize a new tree cursor for database pDb. If bOld is
** non-zero the cursor reads the old tree, otherwise the current tree.
**
** On success, *ppCsr is set to point to the new cursor and LSM_OK is
** returned. If the allocation fails, *ppCsr is set to NULL and
** LSM_NOMEM_BKPT is returned.
*/
int lsmTreeCursorNew(lsm_db *pDb, int bOld, TreeCursor **ppCsr){
  TreeCursor *pNew = lsmMalloc(pDb->pEnv, sizeof(TreeCursor));

  *ppCsr = pNew;
  if( pNew==0 ) return LSM_NOMEM_BKPT;

  treeCursorInit(pDb, bOld, pNew);
  return LSM_OK;
}
/*
** Free tree cursor pCsr, along with the scratch buffer it owns. Passing
** NULL is a harmless no-op.
*/
void lsmTreeCursorDestroy(TreeCursor *pCsr){
  if( pCsr==0 ) return;
  tblobFree(pCsr->pDb, &pCsr->blob);
  lsmFree(pCsr->pDb->pEnv, pCsr);
}
/*
** Reset cursor pCsr so that it points at no entry and has no saved
** position. Passing NULL is a harmless no-op.
*/
void lsmTreeCursorReset(TreeCursor *pCsr){
  if( pCsr==0 ) return;
  pCsr->pSave = 0;
  pCsr->iNode = -1;
}
#ifndef NDEBUG
/*
** Debug-only helper. Compare the key that cursor pCsr currently points at
** with key (pKey/nKey) and return the result of treeKeycmp(). If the
** cursor key cannot be loaded, return 0; any error code is in *pRc.
*/
static int treeCsrCompare(TreeCursor *pCsr, void *pKey, int nKey, int *pRc){
  TreeKey *pCsrKey;

  assert( pCsr->iNode>=0 );
  pCsrKey = csrGetKey(pCsr, &pCsr->blob, pRc);
  if( pCsrKey==0 ) return 0;
  return treeKeycmp(TKV_KEY(pCsrKey), pCsrKey->nKey, pKey, nKey);
}
#endif
/*
** Seek cursor pCsr to key (pKey/nKey), or as close to it as possible. Any
** saved cursor position is discarded.
**
** If the tree is empty, *pRes is set to -1 and the cursor is left pointing
** at no entry. Otherwise *pRes is set to the result of the last key
** comparison made (tree key compared with pKey): zero if the cursor
** points at an exact match, negative if the tree key compared is smaller
** than pKey, positive if it is larger.
**
** Return LSM_OK if successful, or an error code otherwise.
*/
int lsmTreeCursorSeek(TreeCursor *pCsr, void *pKey, int nKey, int *pRes){
int rc = LSM_OK;
lsm_db *pDb = pCsr->pDb;
TreeRoot *pRoot = pCsr->pRoot;
u32 iNodePtr;
/* Discard any saved position data. */
treeCursorRestore(pCsr, 0);
iNodePtr = pRoot->iRoot;
if( iNodePtr==0 ){
/* The tree is completely empty. */
assert( rc!=LSM_OK || pRoot->iRoot==0 );
*pRes = -1;
pCsr->iNode = -1;
}else{
/* Scratch buffer for keys that are not contiguous in shared memory */
TreeBlob b = {0, 0};
/* Result of the most recent key comparison */
int res = 0;
int iNode = -1;
/* Descend from the root, one node per iteration. */
while( iNodePtr ){
TreeNode *pNode;
/* Index of the second key to test: 0 or 2 */
int iTest;
u32 iTreeKey;
TreeKey *pTreeKey;
pNode = (TreeNode *)treeShmptrUnsafe(pDb, iNodePtr);
iNode++;
pCsr->apTreeNode[iNode] = pNode;
/* Compare pKey with the middle key of the node first. If it matches,
** the search is finished. */
pTreeKey = (TreeKey*)treeShmptrUnsafe(pDb, pNode->aiKeyPtr[1]);
if( !(pTreeKey->flags & LSM_CONTIGUOUS) ){
pTreeKey = treeShmkey(pDb, pNode->aiKeyPtr[1], TKV_LOADKEY, &b, &rc);
if( rc!=LSM_OK ) break;
}
res = treeKeycmp((void *)&pTreeKey[1], pTreeKey->nKey, pKey, nKey);
if( res==0 ){
pCsr->aiCell[iNode] = 1;
break;
}
/* Depending on the first comparison, compare pKey with the left (0) or
** right (2) key of the node, if that slot is in use. */
iTest = (res>0 ? 0 : 2);
iTreeKey = pNode->aiKeyPtr[iTest];
if( iTreeKey ){
pTreeKey = (TreeKey*)treeShmptrUnsafe(pDb, iTreeKey);
if( !(pTreeKey->flags & LSM_CONTIGUOUS) ){
pTreeKey = treeShmkey(pDb, iTreeKey, TKV_LOADKEY, &b, &rc);
if( rc ) break;
}
res = treeKeycmp((void *)&pTreeKey[1], pTreeKey->nKey, pKey, nKey);
if( res==0 ){
pCsr->aiCell[iNode] = (u8)iTest;
break;
}
}else{
iTest = 1;
}
/* Follow the child pointer on the appropriate side of key iTest, unless
** this node is already on the lowest level of the tree. */
if( (u32)iNode<(pRoot->nHeight-1) ){
iNodePtr = getChildPtr(pNode, pRoot->iTransId, iTest + (res<0));
}else{
iNodePtr = 0;
}
/* When descending this records the child cell followed; on the last
** node visited it records the key that was compared last. */
pCsr->aiCell[iNode] = (u8)(iTest + (iNodePtr && (res<0)));
}
*pRes = res;
pCsr->iNode = iNode;
tblobFree(pDb, &b);
}
/* In debug builds, check that *pRes agrees in sign with a direct
** comparison of the cursor's key and pKey. */
#ifndef NDEBUG
if( rc==LSM_OK && lsmTreeCursorValid(pCsr) ){
int cmp = treeCsrCompare(pCsr, pKey, nKey, &rc);
assert( rc!=LSM_OK || *pRes==cmp || (*pRes ^ cmp)>0 );
}
#endif
return rc;
}
/*
** Advance cursor pCsr to the next key in the tree version referenced by
** pCsr->pRoot.
**
** If no later key exists the cursor ends up with iNode==-1. Returns
** LSM_OK, or (in debug builds only) an error code reported by csrGetKey().
*/
int lsmTreeCursorNext(TreeCursor *pCsr){
#ifndef NDEBUG
TreeKey *pK1;
TreeBlob key1 = {0, 0};
#endif
lsm_db *pDb = pCsr->pDb;
TreeRoot *pRoot = pCsr->pRoot;
/* Depth index of the bottom level of the tree. */
const int iLeaf = pRoot->nHeight-1;
int iCell;
int rc = LSM_OK;
TreeNode *pNode;
int iRestore = 0;
/* A positive iRestore means no further movement is needed. Presumably the
** restore re-seeks a saved key and landed on a larger one; verify against
** treeCursorRestore(), which is defined outside this chunk. */
treeCursorRestore(pCsr, &iRestore);
if( iRestore>0 ) return LSM_OK;
#ifndef NDEBUG
/* Remember the current key so the assert at the end of this function can
** check that the new key does not sort before it. */
pK1 = csrGetKey(pCsr, &key1, &rc);
if( rc!=LSM_OK ) return rc;
#endif
assert( lsmTreeCursorValid(pCsr) );
assert( pCsr->aiCell[pCsr->iNode]<3 );
pNode = pCsr->apTreeNode[pCsr->iNode];
iCell = ++pCsr->aiCell[pCsr->iNode];
/* Case 1: not at the bottom level and there is a child to the right of the
** key just left. Descend through that child to the bottom level, choosing
** at each new node cell 0, or cell 1 when key slot 0 is empty. */
if( pCsr->iNode<iLeaf && getChildPtr(pNode, pRoot->iTransId, iCell) ){
do {
u32 iNodePtr;
pCsr->iNode++;
iNodePtr = getChildPtr(pNode, pRoot->iTransId, iCell);
pNode = (TreeNode *)treeShmptr(pDb, iNodePtr);
pCsr->apTreeNode[pCsr->iNode] = pNode;
iCell = pCsr->aiCell[pCsr->iNode] = (pNode->aiKeyPtr[0]==0);
}while( pCsr->iNode < iLeaf );
}
/* Case 2: the incremented cell is past the last slot or names an empty
** slot. Climb towards the root until a level is found whose recorded cell
** refers to an existing key. If none is found iNode finishes at -1.
** (Otherwise the incremented cell already names the next key in this
** node and nothing more is required.) */
else if( iCell>=3 || pNode->aiKeyPtr[iCell]==0 ){
while( (--pCsr->iNode)>=0 ){
iCell = pCsr->aiCell[pCsr->iNode];
if( iCell<3 && pCsr->apTreeNode[pCsr->iNode]->aiKeyPtr[iCell] ) break;
}
}
#ifndef NDEBUG
/* Check that the new key compares greater than or equal to the old one. */
if( pCsr->iNode>=0 ){
TreeKey *pK2 = csrGetKey(pCsr, &pCsr->blob, &rc);
assert( rc||treeKeycmp(TKV_KEY(pK2),pK2->nKey,TKV_KEY(pK1),pK1->nKey)>=0 );
}
tblobFree(pDb, &key1);
#endif
return rc;
}
/*
** Move cursor pCsr to the previous key in the tree version referenced by
** pCsr->pRoot.
**
** If no earlier key exists the cursor ends up with iNode==-1. Returns
** LSM_OK, or (in debug builds only) an error code reported by csrGetKey().
*/
int lsmTreeCursorPrev(TreeCursor *pCsr){
#ifndef NDEBUG
  TreeKey *pK1;
  TreeBlob key1 = {0, 0};
#endif
  lsm_db *pDb = pCsr->pDb;
  TreeRoot *pRoot = pCsr->pRoot;
  const int iLeaf = pRoot->nHeight-1;   /* Depth index of the bottom level */
  int iCell;
  int rc = LSM_OK;
  TreeNode *pNode;
  int iRestore = 0;

  /* A negative iRestore means no further movement is needed. Presumably the
  ** restore re-seeks a saved key and landed on a smaller one; verify against
  ** treeCursorRestore(), which is defined elsewhere in this file. */
  treeCursorRestore(pCsr, &iRestore);
  if( iRestore<0 ) return LSM_OK;

#ifndef NDEBUG
  /* Remember the current key so the assert at the end of this function can
  ** check that the new key really is smaller. */
  pK1 = csrGetKey(pCsr, &key1, &rc);
  if( rc!=LSM_OK ) return rc;
#endif

  assert( lsmTreeCursorValid(pCsr) );
  pNode = pCsr->apTreeNode[pCsr->iNode];
  iCell = pCsr->aiCell[pCsr->iNode];
  assert( iCell>=0 && iCell<3 );

  if( pCsr->iNode<iLeaf && getChildPtr(pNode, pRoot->iTransId, iCell) ){
    /* There is a child to the left of the current key. Descend through it
    ** to the bottom level. On each intermediate node pick the child just
    ** past the last populated key; on the bottom node pick the last
    ** populated key itself. */
    do {
      u32 iNodePtr;
      pCsr->iNode++;
      iNodePtr = getChildPtr(pNode, pRoot->iTransId, iCell);
      pNode = (TreeNode *)treeShmptr(pDb, iNodePtr);
      pCsr->apTreeNode[pCsr->iNode] = pNode;
      iCell = 1 + (pNode->aiKeyPtr[2]!=0) + (pCsr->iNode < iLeaf);
      pCsr->aiCell[pCsr->iNode] = (u8)iCell;
    }while( pCsr->iNode < iLeaf );
  }else{
    /* Otherwise climb towards the root until a level is found that has a
    ** populated key slot immediately to the left of its recorded cell. */
    do {
      iCell = pCsr->aiCell[pCsr->iNode]-1;
      if( iCell>=0 && pCsr->apTreeNode[pCsr->iNode]->aiKeyPtr[iCell] ) break;
    }while( (--pCsr->iNode)>=0 );

    /* If the climb ran off the top of the tree, iNode is now -1 and there
    ** is no cell to update. Storing to aiCell[-1] would write outside the
    ** array. */
    if( pCsr->iNode>=0 ){
      pCsr->aiCell[pCsr->iNode] = (u8)iCell;
    }
  }

#ifndef NDEBUG
  /* Check that the new key compares strictly smaller than the old one. */
  if( pCsr->iNode>=0 ){
    TreeKey *pK2 = csrGetKey(pCsr, &pCsr->blob, &rc);
    assert( rc || treeKeycmp(TKV_KEY(pK2),pK2->nKey,TKV_KEY(pK1),pK1->nKey)<0 );
  }
  tblobFree(pDb, &key1);
#endif

  return rc;
}
/*
** Position cursor pCsr on the first entry of the tree (bLast==0) or on the
** last entry (bLast!=0). If the tree has no root node the cursor is left
** with iNode==-1.
**
** Returns the local status code, which is initialized to LSM_OK and not
** modified by any call visible in this function.
*/
int lsmTreeCursorEnd(TreeCursor *pCsr, int bLast){
  lsm_db *pDb = pCsr->pDb;
  TreeRoot *pRoot = pCsr->pRoot;
  int rc = LSM_OK;
  u32 iPtr;

  pCsr->iNode = -1;
  treeCursorRestore(pCsr, 0);

  for(iPtr=pRoot->iRoot; iPtr!=0; ){
    TreeNode *pNode = (TreeNode *)treeShmptr(pDb, iPtr);
    int iDepth;
    int iCell;

    if( rc!=LSM_OK ) break;

    /* Pick the outermost child slot on the requested side. A node whose
    ** outer key slot is empty has one child fewer on that side. */
    if( bLast ){
      iCell = pNode->aiKeyPtr[2] ? 3 : 2;
    }else{
      iCell = pNode->aiKeyPtr[0] ? 0 : 1;
    }

    iDepth = ++pCsr->iNode;
    pCsr->apTreeNode[iDepth] = pNode;

    /* Only nodes above the bottom level have children to follow. */
    iPtr = 0;
    if( (u32)iDepth<pRoot->nHeight-1 ){
      iPtr = getChildPtr(pNode, pRoot->iTransId, iCell);
    }

    /* When the descent stops here while seeking the last entry, convert
    ** the child index into the index of the key to its left. */
    if( iPtr==0 && bLast ){
      iCell--;
    }
    pCsr->aiCell[iDepth] = (u8)iCell;
  }

  return rc;
}
/*
** Return the flags of the entry the cursor currently points at, with the
** LSM_CONTIGUOUS bit masked out. Returns 0 if pCsr is NULL or the cursor
** is not positioned on a node (iNode<0).
*/
int lsmTreeCursorFlags(TreeCursor *pCsr){
  TreeNode *pNode;
  TreeKey *pKey;

  if( pCsr==0 || pCsr->iNode<0 ){
    return 0;
  }

  pNode = pCsr->apTreeNode[pCsr->iNode];
  pKey = (TreeKey *)treeShmptrUnsafe(
      pCsr->pDb, pNode->aiKeyPtr[pCsr->aiCell[pCsr->iNode]]
  );
  return (pKey->flags & ~LSM_CONTIGUOUS);
}
/*
** Retrieve the key of the entry the cursor refers to. The cursor must be
** valid (see lsmTreeCursorValid()).
**
** If the cursor holds a saved key (pCsr->pSave) that key is used;
** otherwise the key is fetched with csrGetKey(). On success *ppKey and
** *pnKey are set to the key data and its size, and if pFlags is not NULL
** *pFlags receives the entry's flags. On error the outputs are left
** unmodified and the error code is returned.
*/
int lsmTreeCursorKey(TreeCursor *pCsr, int *pFlags, void **ppKey, int *pnKey){
  int rc = LSM_OK;
  TreeKey *pEntry;

  assert( lsmTreeCursorValid(pCsr) );

  pEntry = pCsr->pSave;
  if( pEntry==0 ){
    pEntry = csrGetKey(pCsr, &pCsr->blob, &rc);
  }
  if( rc!=LSM_OK ){
    return rc;
  }

  *pnKey = pEntry->nKey;
  if( pFlags ){
    *pFlags = pEntry->flags;
  }
  *ppKey = TKV_KEY(pEntry);
  return rc;
}
/*
** Retrieve the value of the entry the cursor refers to.
**
** The cursor position is first restored with treeCursorRestore(). If that
** call reports an error, the error is returned immediately and the outputs
** are left unmodified (the same as when csrGetKey() fails below); the
** cursor position is not inspected, since it cannot be trusted.
**
** If the restore reports a non-zero comparison result, *ppVal is set to
** NULL and *pnVal to 0. Otherwise:
**
**   * if the entry has the LSM_INSERT flag, *ppVal and *pnVal are set to
**     the value data and its size;
**   * if not, *ppVal is set to NULL and *pnVal to -1.
**
** Returns LSM_OK on success or an error code.
*/
int lsmTreeCursorValue(TreeCursor *pCsr, void **ppVal, int *pnVal){
  int res = 0;
  int rc;

  rc = treeCursorRestore(pCsr, &res);
  if( rc!=LSM_OK ){
    /* Do not call csrGetKey() here: it dereferences apTreeNode[]/aiCell[]
    ** at the cursor's current position, which is unreliable after a failed
    ** restore. */
    return rc;
  }

  if( res==0 ){
    TreeKey *pTreeKey = csrGetKey(pCsr, &pCsr->blob, &rc);
    if( rc==LSM_OK ){
      if( pTreeKey->flags & LSM_INSERT ){
        *pnVal = pTreeKey->nValue;
        *ppVal = TKV_VAL(pTreeKey);
      }else{
        *ppVal = 0;
        *pnVal = -1;
      }
    }
  }else{
    *ppVal = 0;
    *pnVal = 0;
  }

  return rc;
}
/*
** Return non-zero if pCsr is not NULL and either holds a saved key
** (pSave) or is positioned on a node (iNode>=0). Return 0 otherwise.
*/
int lsmTreeCursorValid(TreeCursor *pCsr){
  if( pCsr==0 ) return 0;
  if( pCsr->pSave ) return 1;
  return (pCsr->iNode>=0);
}
/*
** Record the current state of the in-memory tree in *pMark so that
** lsmTreeRollback() can later return to it. The snapshot consists of the
** root pointer and height, the iWrite/nChunk/iNextShmid header fields and
** the current number of entries in the pDb->rollback array.
*/
void lsmTreeMark(lsm_db *pDb, TreeMark *pMark){
  TreeHeader *pHdr = &pDb->treehdr;

  /* Root of the tree */
  pMark->iRoot = pHdr->root.iRoot;
  pMark->nHeight = pHdr->root.nHeight;

  /* Header bookkeeping fields */
  pMark->iWrite = pHdr->iWrite;
  pMark->nChunk = pHdr->nChunk;
  pMark->iNextShmid = pHdr->iNextShmid;

  /* Current length of the rollback list */
  pMark->iRollback = intArraySize(&pDb->rollback);
}
/*
** Return the in-memory tree to the state captured in *pMark by
** lsmTreeMark(). Three things are undone, in this order:
**
**   1. The v2 fields of every node recorded in pDb->rollback at or after
**      index pMark->iRollback are zeroed, and the array is truncated.
**   2. The chain of chunks linked after the chunk containing offset
**      (pMark->iWrite-1) is detached and processed.
**   3. The tree-header fields saved in the mark are restored.
*/
void lsmTreeRollback(lsm_db *pDb, TreeMark *pMark){
int iIdx;
int nIdx;
u32 iNext;
ShmChunk *pChunk;
u32 iChunk;
u32 iShmid;
/* Step 1: clear the v2 fields set since the mark was taken. */
nIdx = intArraySize(&pDb->rollback);
for(iIdx = pMark->iRollback; iIdx<nIdx; iIdx++){
TreeNode *pNode;
pNode = treeShmptr(pDb, intArrayEntry(&pDb->rollback, iIdx));
assert( pNode );
pNode->iV2 = 0;
pNode->iV2Child = 0;
pNode->iV2Ptr = 0;
}
intArrayTruncate(&pDb->rollback, pMark->iRollback);
/* Step 2: find the chunk holding the last byte written before the mark
** (presumably what treeOffsetToChunk() computes; it is defined outside
** this chunk) and cut the chunk chain immediately after it. */
assert( pMark->iWrite!=0 );
iChunk = treeOffsetToChunk(pMark->iWrite-1);
pChunk = treeShmChunk(pDb, iChunk);
iNext = pChunk->iNext;
pChunk->iNext = 0;
/* Ids handed out below count downwards, starting one below the id of the
** chunk currently at treehdr.iFirst. */
pChunk = treeShmChunk(pDb, pDb->treehdr.iFirst);
iShmid = pChunk->iShmid-1;
while( iNext ){
u32 iFree = iNext;
ShmChunk *pFree;
pFree = treeShmChunk(pDb, iFree);
/* Read the successor before pFree->iNext is overwritten below. */
iNext = pFree->iNext;
/* Chunks that existed when the mark was taken (index below pMark->nChunk)
** are pushed onto the front of the list headed by treehdr.iFirst.
** Chunks with larger indexes are not relinked. */
if( iFree<pMark->nChunk ){
pFree->iNext = pDb->treehdr.iFirst;
pFree->iShmid = iShmid--;
pDb->treehdr.iFirst = iFree;
}
}
/* Step 3: restore the header fields saved by lsmTreeMark(). */
pDb->treehdr.root.iRoot = pMark->iRoot;
pDb->treehdr.root.nHeight = pMark->nHeight;
pDb->treehdr.iWrite = pMark->iWrite;
pDb->treehdr.nChunk = pMark->nChunk;
pDb->treehdr.iNextShmid = pMark->iNextShmid;
}
/*
** Load a copy of the tree-header from shared memory into pDb->treehdr.
**
** Each attempt copies hdr1 and then, if its checksum does not verify,
** hdr2. The first copy that passes treeHeaderChecksumOk() is kept and, if
** piRead is not NULL, *piRead is set to 1 or 2 to say which one was used.
** If neither verifies, lsmShmBarrier() is invoked and the attempt is
** repeated, up to LSM_ATTEMPTS_BEFORE_PROTOCOL times in total.
**
** Returns LSM_OK on success, or LSM_PROTOCOL_BKPT if every attempt fails.
*/
int lsmTreeLoadHeader(lsm_db *pDb, int *piRead){
  int nLeft;

  for(nLeft=LSM_ATTEMPTS_BEFORE_PROTOCOL; nLeft>0; nLeft--){
    ShmHeader *pShm = pDb->pShmhdr;
    int iHdr;

    for(iHdr=1; iHdr<=2; iHdr++){
      TreeHeader *pSrc = (iHdr==1) ? &pShm->hdr1 : &pShm->hdr2;
      memcpy(&pDb->treehdr, pSrc, sizeof(TreeHeader));
      if( treeHeaderChecksumOk(&pDb->treehdr) ){
        if( piRead ) *piRead = iHdr;
        return LSM_OK;
      }
    }

    lsmShmBarrier(pDb);
  }

  return LSM_PROTOCOL_BKPT;
}
/*
** Return non-zero if the checksum of the private header copy in
** pDb->treehdr still matches the checksum of shared-memory header number
** iRead (which must be 1 or 2). Only the two aCksum[] words are compared.
*/
int lsmTreeLoadHeaderOk(lsm_db *pDb, int iRead){
  TreeHeader *pShared;

  assert( iRead==1 || iRead==2 );
  if( iRead==1 ){
    pShared = &pDb->pShmhdr->hdr1;
  }else{
    pShared = &pDb->pShmhdr->hdr2;
  }
  return (memcmp(pDb->treehdr.aCksum, pShared->aCksum, sizeof(u32)*2)==0);
}
/*
** Publish the private tree-header to shared memory at the end of a write
** transaction.
**
** The header checksum is recomputed, then the header is copied to hdr2,
** a memory barrier is issued, and the header is copied to hdr1. Finally
** the bWriter flag is cleared and the rollback array is emptied.
**
** The bCommit parameter is not referenced by this function. Always
** returns LSM_OK.
*/
int lsmTreeEndTransaction(lsm_db *pDb, int bCommit){
  TreeHeader *pHdr = &pDb->treehdr;
  ShmHeader *pShm = pDb->pShmhdr;

  treeHeaderChecksum(pHdr, pHdr->aCksum);

  /* hdr2 is written first, then hdr1, with a barrier in between. */
  memcpy(&pShm->hdr2, pHdr, sizeof(TreeHeader));
  lsmShmBarrier(pDb);
  memcpy(&pShm->hdr1, pHdr, sizeof(TreeHeader));

  pShm->bWriter = 0;
  intArrayFree(pDb->pEnv, &pDb->rollback);
  return LSM_OK;
}
#ifndef NDEBUG
/*
** Debug-only consistency check. Walk every entry of the current tree in
** order and assert that an entry carries the LSM_END_DELETE flag if and
** only if the entry before it carries LSM_START_DELETE (the first entry
** must not carry LSM_END_DELETE).
**
** Always returns 1 so that the call may itself be wrapped in assert().
** The walk stops silently if a cursor operation reports an error.
*/
static int assert_delete_ranges_match(lsm_db *db){
  TreeCursor csr;
  TreeBlob blob = {0, 0};
  int flagsPrev = 0;
  int rc;

  treeCursorInit(db, 0, &csr);
  rc = lsmTreeCursorEnd(&csr, 0);
  while( rc==LSM_OK && lsmTreeCursorValid(&csr) ){
    TreeKey *pKey;
    int bPrevStart;
    int bThisEnd;

    pKey = csrGetKey(&csr, &blob, &rc);
    if( rc!=LSM_OK ) break;

    bPrevStart = ((flagsPrev & LSM_START_DELETE)!=0);
    bThisEnd = ((pKey->flags & LSM_END_DELETE)!=0);
    assert( bPrevStart==bThisEnd );

    flagsPrev = pKey->flags;
    rc = lsmTreeCursorNext(&csr);
  }

  tblobFree(csr.pDb, &csr.blob);
  tblobFree(csr.pDb, &blob);
  return 1;
}
/*
** Debug-only helper. Return the number of entries visited when iterating
** over the current tree from the first entry with lsmTreeCursorNext().
** Iteration stops early if a cursor operation reports an error, in which
** case the count so far is returned.
*/
static int treeCountEntries(lsm_db *db){
  TreeCursor csr;
  int nSeen = 0;
  int rc;

  treeCursorInit(db, 0, &csr);
  rc = lsmTreeCursorEnd(&csr, 0);
  while( rc==LSM_OK && lsmTreeCursorValid(&csr) ){
    nSeen++;
    rc = lsmTreeCursorNext(&csr);
  }

  tblobFree(csr.pDb, &csr.blob);
  return nSeen;
}
#endif