#ifndef _LSM_INT_H
# include "lsmInt.h"
#endif
/*
** Logging switches. Both are defined as 0 in this file.
*/
#define LSM_LOG_STRUCTURE 0
#define LSM_LOG_DATA 0
/*
** Tests applied to the flags byte (eType) stored at the start of each
** record:
**
**   rtTopic()        the LSM_SYSTEMKEY bit of eType (zero for other keys)
**   rtIsDelete()     true if the low four bits equal LSM_POINT_DELETE
**   rtIsSeparator()  true if the LSM_SEPARATOR bit is set
**   rtIsWrite()      true if the LSM_INSERT bit is set
**   rtIsSystem()     true if the LSM_SYSTEMKEY bit is set
*/
#define rtTopic(eType) ((eType) & LSM_SYSTEMKEY)
#define rtIsDelete(eType) (((eType) & 0x0F)==LSM_POINT_DELETE)
#define rtIsSeparator(eType) (((eType) & LSM_SEPARATOR)!=0)
#define rtIsWrite(eType) (((eType) & LSM_INSERT)!=0)
#define rtIsSystem(eType) (((eType) & LSM_SYSTEMKEY)!=0)
/*
** Byte offsets of the fields found at the end of a page of size pgsz.
** Reading backwards from the end of the page:
**
**   2 bytes   number of records on the page (read with lsmGetU16)
**   2 bytes   page flags (read with lsmGetU16)
**   8 bytes   page pointer (read with lsmGetU64)
**   2 bytes   per record: offset of cell iCell within the page
**
** SEGMENT_EOF() is the offset of the cell-pointer belonging to the last
** of nEntry records. sortedReadData() treats it as the end of the part
** of the page that may hold record data.
*/
#define SEGMENT_NRECORD_OFFSET(pgsz) ((pgsz) - 2)
#define SEGMENT_FLAGS_OFFSET(pgsz) ((pgsz) - 2 - 2)
#define SEGMENT_POINTER_OFFSET(pgsz) ((pgsz) - 2 - 2 - 8)
#define SEGMENT_CELLPTR_OFFSET(pgsz, iCell) ((pgsz) - 2 - 2 - 8 - 2 - (iCell)*2)
#define SEGMENT_EOF(pgsz, nEntry) SEGMENT_CELLPTR_OFFSET(pgsz, nEntry-1)
/*
** Bits of the page flags field. SEGMENT_BTREE_FLAG marks a b-tree page;
** the readers in this file skip such pages when walking a segment.
** PGFTR_SKIP_NEXT_FLAG and PGFTR_SKIP_THIS_FLAG are consulted by
** segmentPtrSearchOversized().
*/
#define SEGMENT_BTREE_FLAG 0x0001
#define PGFTR_SKIP_NEXT_FLAG 0x0002
#define PGFTR_SKIP_THIS_FLAG 0x0004
/*
** Threshold passed to segmentPtrReset(): a SegmentPtr blob whose
** allocation is at least this many bytes is freed on reset. May be
** overridden at compile time.
*/
#ifndef LSM_SEGMENTPTR_FREE_THRESHOLD
# define LSM_SEGMENTPTR_FREE_THRESHOLD 1024
#endif
typedef struct SegmentPtr SegmentPtr;
typedef struct LsmBlob LsmBlob;
/*
** A growable byte buffer. Managed by sortedBlobGrow(), sortedBlobSet()
** and sortedBlobFree().
*/
struct LsmBlob {
lsm_env *pEnv;   /* Environment used to allocate and free pData */
void *pData;     /* Buffer, or 0 if nothing is allocated */
int nData;       /* Number of bytes of pData currently in use */
int nAlloc;      /* Allocated size of pData in bytes */
};
/*
** A cursor over the records of a single segment.
**
** The page-level fields (flags, nCell, iPtr) are refreshed by
** segmentPtrSetPage(); the cell-level fields (iCell, eType, iPgPtr,
** pKey/nKey, pVal/nVal) are refreshed by segmentPtrLoadCell().
*/
struct SegmentPtr {
Level *pLevel;        /* Level that pSeg belongs to */
Segment *pSeg;        /* Segment being read */
Page *pPg;            /* Current page; 0 when no page is loaded */
u16 flags;            /* Flags field copied from the footer of pPg */
int nCell;            /* Number of records on pPg */
LsmPgno iPtr;         /* Pointer field copied from the footer of pPg */
int iCell;            /* Index of the current cell within pPg */
int eType;            /* Flags byte of the current cell */
LsmPgno iPgPtr;       /* First varint of the current cell */
void *pKey; int nKey; /* Key of the current cell */
void *pVal; int nVal; /* Value of the current cell (0/0 if not a write) */
LsmBlob blob1;        /* Buffer for keys that do not fit on one page */
LsmBlob blob2;        /* Buffer for values that do not fit on one page */
};
typedef struct BtreePg BtreePg;
typedef struct BtreeCursor BtreeCursor;
/*
** One level of a BtreeCursor: a b-tree page and the cursor's position
** within it.
*/
struct BtreePg {
Page *pPage;   /* Page reference held by the cursor */
int iCell;     /* Cell index within pPage (may be -1, see btreeCursorNext) */
};
/*
** Cursor used to iterate through the separator keys stored in the b-tree
** pages of a segment.
*/
struct BtreeCursor {
Segment *pSeg;     /* Segment whose b-tree is being read */
FileSystem *pFS;   /* File-system handle used to load pages */
int nDepth;        /* Number of b-tree levels found by btreeCursorFirst() */
int iPg;           /* Index of the current entry in aPg[]; negative at EOF */
BtreePg *aPg;      /* Stack of pages from the root down to the current page */
void *pKey;        /* Current key */
int nKey;          /* Size of pKey in bytes */
int eType;         /* Flags of current key (LSM_SEPARATOR is always OR'd in) */
LsmPgno iPtr;      /* Page pointer associated with the current position */
LsmBlob blob;      /* Buffer holding the key when it is not stored inline */
};
/*
** A cursor that reads from several sources at once: up to two in-memory
** tree cursors, the free-list "system" data, an array of segment
** pointers and an optional b-tree cursor. The CURSOR_DATA_* constants
** identify the individual sources (see multiCursorGetKey()).
*/
struct MultiCursor {
lsm_db *pDb;               /* Database handle (supplies xCmp, pEnv, pFS) */
MultiCursor *pNext;        /* Next cursor in a list (not used in this part of the file) */
int flags;                 /* Mask of CURSOR_* flag bits */
int eType;                 /* Flags of the entry cached in key/val */
LsmBlob key;               /* Key cached by an LSM_SEEK_EQ hit */
LsmBlob val;               /* Value cached by an LSM_SEEK_EQ hit */
TreeCursor *apTreeCsr[2];  /* Sources CURSOR_DATA_TREE0 and CURSOR_DATA_TREE1 */
int iFree;                 /* Position within the free-list (two steps per entry) */
SegmentPtr *aPtr;          /* Array of segment pointers */
int nPtr;                  /* Number of entries in aPtr[] */
BtreeCursor *pBtCsr;       /* Optional b-tree cursor; its key index follows aPtr[] */
int nTree;                 /* Size of aTree[] -- NOTE(review): not used in this part of the file */
int *aTree;                /* NOTE(review): presumably a merge tree over the sources; confirm */
void *pSystemVal;          /* Buffer that receives the 4-byte free-list key */
LsmPgno *pPrevMergePtr;    /* NOTE(review): not used in this part of the file */
};
/*
** Identifiers for the data sources of a MultiCursor, as accepted by
** multiCursorGetKey(). Values of CURSOR_DATA_SEGMENT and above select
** MultiCursor.aPtr[iKey-CURSOR_DATA_SEGMENT]; the index one past the end
** of aPtr[] selects the b-tree cursor.
*/
#define CURSOR_DATA_TREE0 0
#define CURSOR_DATA_TREE1 1
#define CURSOR_DATA_SYSTEM 2
#define CURSOR_DATA_SEGMENT 3
/*
** Bits of MultiCursor.flags. In the visible part of this file:
**
**   CURSOR_IGNORE_DELETE    consulted by sortedDbKeyCompare()
**   CURSOR_FLUSH_FREELIST   enables the CURSOR_DATA_SYSTEM source
**   CURSOR_READ_SEPARATORS  the last segment pointer returns separators
**   CURSOR_SEEK_EQ          set when an LSM_SEEK_EQ seek cached a hit
**
** The remaining flags are not referenced in this part of the file.
*/
#define CURSOR_IGNORE_DELETE 0x00000001
#define CURSOR_FLUSH_FREELIST 0x00000002
#define CURSOR_IGNORE_SYSTEM 0x00000010
#define CURSOR_NEXT_OK 0x00000020
#define CURSOR_PREV_OK 0x00000040
#define CURSOR_READ_SEPARATORS 0x00000080
#define CURSOR_SEEK_EQ 0x00000100
typedef struct MergeWorker MergeWorker;
typedef struct Hierarchy Hierarchy;
/*
** An array of page references.
** NOTE(review): not used in the visible part of this file; presumably
** the b-tree pages being built by a merge -- confirm against the writer.
*/
struct Hierarchy {
Page **apHier;   /* Array of page pointers */
int nHier;       /* Number of entries in apHier[] */
};
/*
** State of a merge operation.
** NOTE(review): this structure is not used in the visible part of the
** file; the field notes below are inferred from names and types only
** and should be confirmed against the merge code.
*/
struct MergeWorker {
lsm_db *pDb;           /* Database handle */
Level *pLevel;         /* Level being worked on */
MultiCursor *pCsr;     /* Cursor supplying the input */
int bFlush;            /* Boolean flag -- TODO confirm meaning */
Hierarchy hier;        /* Page hierarchy */
Page *pPage;           /* A page reference -- presumably the current output page */
int nWork;             /* Work counter -- TODO confirm units */
LsmPgno *aGobble;      /* Array of page numbers */
LsmPgno iIndirect;     /* A page number */
struct SavedPgno {
LsmPgno iPgno;         /* Saved page number */
int bStore;            /* Boolean flag paired with iPgno */
} aSave[2];
};
/*
** Expensive consistency checks. They are declared as real functions only
** when LSM_DEBUG_EXPENSIVE is defined. Otherwise assertRunInOrder() and
** assertBtreeOk() expand to nothing (no replacement macro is provided
** for assertPointersOk()).
*/
#ifdef LSM_DEBUG_EXPENSIVE
static int assertPointersOk(lsm_db *, Segment *, Segment *, int);
static int assertBtreeOk(lsm_db *, Segment *);
static void assertRunInOrder(lsm_db *pDb, Segment *pSeg);
#else
#define assertRunInOrder(x,y)
#define assertBtreeOk(x,y)
#endif
/*
** View of the leading fields of a Page object as used by fsPageData():
** the page buffer followed by its size in bytes.
** NOTE(review): this must match the start of the real Page structure
** declared elsewhere -- confirm when that structure changes.
*/
struct FilePage { u8 *aData; int nData; };
/*
** Return a pointer to the data buffer of page pPg and write the size of
** that buffer, in bytes, to *pnData. The Page handle is accessed through
** the struct FilePage overlay.
*/
static u8 *fsPageData(Page *pPg, int *pnData){
  struct FilePage *pFile = (struct FilePage *)pPg;
  *pnData = pFile->nData;
  return pFile->aData;
}
/*
** Write the 16-bit value nVal to aOut[0..1], most significant byte first.
*/
void lsmPutU16(u8 *aOut, u16 nVal){
  u8 hi = (u8)((nVal >> 8) & 0xFF);
  u8 lo = (u8)(nVal & 0xFF);
  aOut[0] = hi;
  aOut[1] = lo;
}
/*
** Write the 32-bit value nVal to aOut[0..3], most significant byte first.
*/
void lsmPutU32(u8 *aOut, u32 nVal){
  int i;
  /* Fill from the last byte backwards, consuming 8 bits per step. */
  for(i=3; i>=0; i--){
    aOut[i] = (u8)(nVal & 0xFF);
    nVal >>= 8;
  }
}
/*
** Read a 16-bit big-endian value from aOut[0..1] and return it as an int.
*/
int lsmGetU16(u8 *aOut){
  int hi = aOut[0];
  int lo = aOut[1];
  return (hi << 8) + lo;
}
/*
** Read a 32-bit big-endian value from aOut[0..3].
*/
u32 lsmGetU32(u8 *aOut){
  u32 v = 0;
  int i;
  for(i=0; i<4; i++){
    v = (v << 8) | (u32)aOut[i];
  }
  return v;
}
/*
** Read a 64-bit big-endian value from aOut[0..7].
*/
u64 lsmGetU64(u8 *aOut){
  u64 v = 0;
  int i;
  for(i=0; i<8; i++){
    v = (v << 8) | (u64)aOut[i];
  }
  return v;
}
/*
** Write the 64-bit value nVal to aOut[0..7], most significant byte first.
*/
void lsmPutU64(u8 *aOut, u64 nVal){
  int i;
  /* Fill from the last byte backwards, consuming 8 bits per step. */
  for(i=7; i>=0; i--){
    aOut[i] = (u8)(nVal & 0xFF);
    nVal >>= 8;
  }
}
/*
** Ensure that the buffer of pBlob can hold at least nData bytes.
**
** The buffer is only ever grown. pBlob->nData is not modified on
** success; the caller is responsible for setting it.
**
** Returns LSM_OK on success or LSM_NOMEM_BKPT if the allocation fails.
** After a failure pBlob->pData is NULL, so the recorded sizes are reset
** to zero as well. Otherwise a later request for a smaller size would
** pass the nAlloc test and report success with no buffer behind it.
*/
static int sortedBlobGrow(lsm_env *pEnv, LsmBlob *pBlob, int nData){
  assert( pBlob->pEnv==pEnv || (pBlob->pEnv==0 && pBlob->pData==0) );
  if( pBlob->nAlloc<nData ){
    pBlob->pData = lsmReallocOrFree(pEnv, pBlob->pData, nData);
    if( !pBlob->pData ){
      /* Keep the blob self-consistent: no buffer, no capacity, no data. */
      pBlob->nAlloc = 0;
      pBlob->nData = 0;
      return LSM_NOMEM_BKPT;
    }
    pBlob->nAlloc = nData;
    pBlob->pEnv = pEnv;
  }
  return LSM_OK;
}
/*
** Replace the contents of pBlob with a copy of the nData bytes at pData.
**
** Returns LSM_OK on success or LSM_NOMEM if the buffer cannot be grown.
**
** When nData is zero the blob may have no buffer at all (pBlob->pData is
** NULL). Passing a NULL pointer to memcpy() is undefined behaviour even
** for a zero-length copy, so the copy is skipped in that case.
*/
static int sortedBlobSet(lsm_env *pEnv, LsmBlob *pBlob, void *pData, int nData){
  if( sortedBlobGrow(pEnv, pBlob, nData) ) return LSM_NOMEM;
  if( nData>0 ){
    memcpy(pBlob->pData, pData, nData);
  }
  pBlob->nData = nData;
  return LSM_OK;
}
#if 0#endif
static void sortedBlobFree(LsmBlob *pBlob){
assert( pBlob->pEnv || pBlob->pData==0 );
if( pBlob->pData ) lsmFree(pBlob->pEnv, pBlob->pData);
memset(pBlob, 0, sizeof(LsmBlob));
}
/*
** Obtain a pointer to nByte bytes of record data that begin at byte
** offset iOff of page pPg.
**
** If the requested range lies entirely within the data area of pPg (it
** ends at or before SEGMENT_EOF for the page), *ppData is set to point
** directly into the page buffer and pBlob is not touched.
**
** Otherwise the data continues on following pages of segment pSeg. In
** that case pBlob is grown to nByte bytes, the data is assembled in it
** by copying from each page in turn (pages with SEGMENT_BTREE_FLAG set
** are skipped), and *ppData is set to pBlob->pData.
**
** Returns LSM_OK, an error from sortedBlobGrow()/lsmFsDbPageNext(), or
** LSM_CORRUPT_BKPT if the segment ends before nByte bytes were read.
*/
static int sortedReadData(
Segment *pSeg,
Page *pPg,
int iOff,
int nByte,
void **ppData,
LsmBlob *pBlob
){
int rc = LSM_OK;
int iEnd;
int nData;
int nCell;
u8 *aData;
aData = fsPageData(pPg, &nData);
nCell = lsmGetU16(&aData[SEGMENT_NRECORD_OFFSET(nData)]);
iEnd = SEGMENT_EOF(nData, nCell);
assert( iEnd>0 && iEnd<nData );
if( iOff+nByte<=iEnd ){
/* Fast path: everything requested is on this page. */
*ppData = (void *)&aData[iOff];
}else{
int nRem = nByte;
int i = iOff;
u8 *aDest;
rc = sortedBlobGrow(lsmPageEnv(pPg), pBlob, nByte);
if( rc!=LSM_OK ) return rc;
pBlob->nData = nByte;
aDest = (u8 *)pBlob->pData;
*ppData = pBlob->pData;
/* Take a reference on pPg. From here on the loop owns one reference to
** whichever page pPg points at; it is dropped after the loop. */
lsmFsPageRef(pPg);
while( rc==LSM_OK ){
Page *pNext;
int flags;
/* Copy whatever part of the remaining data lives on the current page. */
int nCopy = LSM_MIN(nRem, iEnd-i);
if( nCopy>0 ){
memcpy(&aDest[nByte-nRem], &aData[i], nCopy);
nRem -= nCopy;
i += nCopy;
assert( nRem==0 || i==iEnd );
}
assert( nRem>=0 );
if( nRem==0 ) break;
/* Convert i into an offset relative to the start of the next page. */
i -= iEnd;
/* Move to the next page of the segment that is not a b-tree page. */
do {
rc = lsmFsDbPageNext(pSeg, pPg, 1, &pNext);
if( rc==LSM_OK && pNext==0 ){
rc = LSM_CORRUPT_BKPT;
}
if( rc ) break;
lsmFsPageRelease(pPg);
pPg = pNext;
aData = fsPageData(pPg, &nData);
flags = lsmGetU16(&aData[SEGMENT_FLAGS_OFFSET(nData)]);
}while( flags&SEGMENT_BTREE_FLAG );
iEnd = SEGMENT_EOF(nData, lsmGetU16(&aData[nData-2]));
assert( iEnd>0 && iEnd<nData );
}
lsmFsPageRelease(pPg);
}
return rc;
}
/*
** Return the number of records stored on the page whose buffer is
** aData[0..nData-1], as recorded in the page footer.
*/
static int pageGetNRec(u8 *aData, int nData){
  u8 *aField = &aData[SEGMENT_NRECORD_OFFSET(nData)];
  return (int)lsmGetU16(aField);
}
/*
** Return the 64-bit pointer field stored in the footer of the page whose
** buffer is aData[0..nData-1].
*/
static LsmPgno pageGetPtr(u8 *aData, int nData){
  u8 *aField = &aData[SEGMENT_POINTER_OFFSET(nData)];
  return (LsmPgno)lsmGetU64(aField);
}
/*
** Return the flags field stored in the footer of the page whose buffer
** is aData[0..nData-1].
*/
static int pageGetFlags(u8 *aData, int nData){
  u8 *aField = &aData[SEGMENT_FLAGS_OFFSET(nData)];
  return (int)lsmGetU16(aField);
}
/*
** Return a pointer to the first byte of cell iCell on the page whose
** buffer is aData[0..nData-1]. The cell's offset is looked up in the
** cell-pointer array at the end of the page.
*/
static u8 *pageGetCell(u8 *aData, int nData, int iCell){
  int iSlot = SEGMENT_CELLPTR_OFFSET(nData, iCell);
  int iCellOff = lsmGetU16(&aData[iSlot]);
  return &aData[iCellOff];
}
/*
** Return the number of records stored on page pPg.
*/
static int pageObjGetNRec(Page *pPg){
  int nByte;
  u8 *aBuf;
  aBuf = lsmFsPageData(pPg, &nByte);
  return pageGetNRec(aBuf, nByte);
}
/*
** Return the page pointer stored in cell iCell of the page whose buffer
** is aData[0..nData-1]. The pointer is the varint that immediately
** follows the one-byte flags field at the start of the cell.
*/
static LsmPgno pageGetRecordPtr(u8 *aData, int nData, int iCell){
  LsmPgno iPgno;
  u8 *aRec;
  assert( iCell>=0 && iCell<pageGetNRec(aData, nData) );
  aRec = pageGetCell(aData, nData, iCell);
  lsmVarintGet64(aRec + 1, &iPgno);
  return iPgno;
}
/*
** Return a pointer to the key of cell iCell on page pPg, which must not
** be a b-tree page.
**
** *piTopic is set to the topic of the key (see rtTopic()) and *pnKey to
** its size in bytes. The returned pointer refers either to the page
** buffer itself or, if the key continues onto following pages, to
** pBlob->pData (see sortedReadData()).
**
** NOTE(review): the return code of sortedReadData() is discarded here,
** so an allocation or I/O failure while assembling a multi-page key is
** not reported to the caller.
*/
static u8 *pageGetKey(
Segment *pSeg,
Page *pPg,
int iCell,
int *piTopic,
int *pnKey,
LsmBlob *pBlob
){
u8 *pKey;
i64 nDummy;
int eType;
u8 *aData;
int nData;
aData = fsPageData(pPg, &nData);
assert( !(pageGetFlags(aData, nData) & SEGMENT_BTREE_FLAG) );
assert( iCell<pageGetNRec(aData, nData) );
pKey = pageGetCell(aData, nData, iCell);
/* Cell header: flags byte, a varint that is skipped, the key size and,
** for write records only, a further varint that is skipped. */
eType = *pKey++;
pKey += lsmVarintGet64(pKey, &nDummy);
pKey += lsmVarintGet32(pKey, pnKey);
if( rtIsWrite(eType) ){
pKey += lsmVarintGet64(pKey, &nDummy);
}
*piTopic = rtTopic(eType);
sortedReadData(pSeg, pPg, pKey-aData, *pnKey, (void **)&pKey, pBlob);
return pKey;
}
/*
** Load the key of cell iCell on page pPg into pBlob, so that the key
** remains available after the page is released. *piTopic is set to the
** topic of the key.
**
** pageGetKey() may already have assembled the key inside pBlob; only if
** it returned a pointer elsewhere is the key copied into the blob.
** Returns LSM_OK or the error code from sortedBlobSet().
*/
static int pageGetKeyCopy(
  lsm_env *pEnv,
  Segment *pSeg,
  Page *pPg,
  int iCell,
  int *piTopic,
  LsmBlob *pBlob
){
  int nKey;
  u8 *aKey = pageGetKey(pSeg, pPg, iCell, piTopic, &nKey, pBlob);

  assert( (void *)aKey!=pBlob->pData || nKey==pBlob->nData );
  if( (void *)aKey==pBlob->pData ){
    /* Key is already held by the blob. */
    return LSM_OK;
  }
  return sortedBlobSet(pEnv, pBlob, aKey, nKey);
}
/*
** Cell iKey of b-tree page pPg must be an indirect cell (flags byte 0).
** Return the second of the two varints that follow the flags byte. This
** is the value pageGetBtreeKey() uses as the number of the page the key
** is read from.
*/
static LsmPgno pageGetBtreeRef(Page *pPg, int iKey){
  LsmPgno iRef;
  int nData;
  int iOff = 1;                   /* Skip the flags byte */
  u8 *aData = fsPageData(pPg, &nData);
  u8 *aCell = pageGetCell(aData, nData, iKey);

  assert( aCell[0]==0 );
  /* First varint is skipped; its value is overwritten by the second. */
  iOff += lsmVarintGet64(&aCell[iOff], &iRef);
  lsmVarintGet64(&aCell[iOff], &iRef);
  assert( iRef>0 );
  return iRef;
}
/*
** Inline fast paths for reading a varint from buffer (a) into lvalue (i).
** If the first byte is 240 or less it is the value itself and the macro
** evaluates to 1 (the number of bytes consumed). Otherwise the work is
** handed to lsmVarintGet64()/lsmVarintGet32() and the macro evaluates to
** that function's return value.
**
** Both arguments are evaluated more than once, so neither may have side
** effects.
*/
#define GETVARINT64(a, i) (((i)=((u8*)(a))[0])<=240?1:lsmVarintGet64((a), &(i)))
#define GETVARINT32(a, i) (((i)=((u8*)(a))[0])<=240?1:lsmVarintGet32((a), &(i)))
/*
** Read the key and child pointer of cell iKey on b-tree page pPg.
**
** Outputs:
**   *piPtr    the page pointer stored in the cell
**   *piTopic  topic of the key (only written if piTopic is not NULL)
**   *ppKey    pointer to the key
**   *pnKey    size of the key in bytes
**
** A cell whose flags byte is zero does not contain the key itself but
** the number of another page; the key is then the first key on that
** page. It is copied into pBlob and *ppKey points at pBlob->pData.
** Otherwise the key is stored inline and *ppKey points into the page
** buffer of pPg.
**
** Returns LSM_OK on success. If loading the referenced page or copying
** its key fails, the error code is returned and the outputs other than
** *piPtr are left unset.
*/
static int pageGetBtreeKey(
  Segment *pSeg,
  Page *pPg,
  int iKey,
  LsmPgno *piPtr,
  int *piTopic,
  void **ppKey,
  int *pnKey,
  LsmBlob *pBlob
){
  u8 *aData;
  int nData;
  u8 *aCell;
  int eType;

  aData = fsPageData(pPg, &nData);
  assert( SEGMENT_BTREE_FLAG & pageGetFlags(aData, nData) );
  assert( iKey>=0 && iKey<pageGetNRec(aData, nData) );

  aCell = pageGetCell(aData, nData, iKey);
  eType = *aCell++;
  aCell += GETVARINT64(aCell, *piPtr);

  if( eType==0 ){
    int rc;
    LsmPgno iRef;               /* Page holding the actual key */
    Page *pRef;
    aCell += GETVARINT64(aCell, iRef);
    rc = lsmFsDbPageGet(lsmPageFS(pPg), pSeg, iRef, &pRef);
    if( rc!=LSM_OK ) return rc;
    /* The copy can fail (e.g. out of memory). Report that rather than
    ** handing the caller whatever happens to be in the blob. */
    rc = pageGetKeyCopy(lsmPageEnv(pPg), pSeg, pRef, 0, &eType, pBlob);
    lsmFsPageRelease(pRef);
    if( rc!=LSM_OK ) return rc;
    *ppKey = pBlob->pData;
    *pnKey = pBlob->nData;
  }else{
    aCell += GETVARINT32(aCell, *pnKey);
    *ppKey = aCell;
  }
  if( piTopic ) *piTopic = rtTopic(eType);

  return LSM_OK;
}
/*
** Load the key for the current position of pCsr into pCsr->pKey, nKey
** and eType.
**
** At EOF (iPg<0) the key fields are cleared. Otherwise, if the cell
** index at the current level is negative, the key is taken from the
** nearest ancestor level: the cell just before that ancestor's current
** cell. LSM_CORRUPT_BKPT is returned if no level supplies a usable cell.
** LSM_SEPARATOR is always OR'd into pCsr->eType.
*/
static int btreeCursorLoadKey(BtreeCursor *pCsr){
  int rc;
  int iLevel;
  int iKeyCell;
  LsmPgno iUnused;

  if( pCsr->iPg<0 ){
    pCsr->pKey = 0;
    pCsr->nKey = 0;
    pCsr->eType = 0;
    return LSM_OK;
  }

  iLevel = pCsr->iPg;
  iKeyCell = pCsr->aPg[iLevel].iCell;
  while( iKeyCell<0 ){
    iLevel--;
    if( iLevel<0 ) break;
    iKeyCell = pCsr->aPg[iLevel].iCell - 1;
  }
  if( iLevel<0 || iKeyCell<0 ) return LSM_CORRUPT_BKPT;

  rc = pageGetBtreeKey(
      pCsr->pSeg,
      pCsr->aPg[iLevel].pPage, iKeyCell,
      &iUnused, &pCsr->eType, &pCsr->pKey, &pCsr->nKey, &pCsr->blob
  );
  pCsr->eType |= LSM_SEPARATOR;
  return rc;
}
/*
** Return the child pointer associated with cell iCell of the b-tree page
** whose buffer is aData[0..nData-1]. An index at or beyond the number of
** cells on the page selects the pointer stored in the page footer.
*/
static LsmPgno btreeCursorPtr(u8 *aData, int nData, int iCell){
  if( iCell<pageGetNRec(aData, nData) ){
    return pageGetRecordPtr(aData, nData, iCell);
  }
  return pageGetPtr(aData, nData);
}
/*
** Advance b-tree cursor pCsr to the next key.
**
** The cursor must currently be positioned on the deepest level
** (iPg==nDepth-1). If another cell remains on the current page the
** cursor simply steps to it. Otherwise the cursor climbs towards the
** root until it finds a page with an unvisited cell, loads the key from
** that ancestor, and then descends along the left-most path below the
** next child, finishing at cell -1 of the new deepest page. If the climb
** runs off the top of the tree, iPg becomes negative (EOF) and the key
** fields are cleared by btreeCursorLoadKey().
**
** On success with the cursor not at EOF, pCsr->iPtr is set to the child
** pointer that follows the current cell.
*/
static int btreeCursorNext(BtreeCursor *pCsr){
int rc = LSM_OK;
BtreePg *pPg = &pCsr->aPg[pCsr->iPg];
int nCell;
u8 *aData;
int nData;
assert( pCsr->iPg>=0 );
assert( pCsr->iPg==pCsr->nDepth-1 );
aData = fsPageData(pPg->pPage, &nData);
nCell = pageGetNRec(aData, nData);
assert( pPg->iCell<=nCell );
pPg->iCell++;
if( pPg->iCell==nCell ){
LsmPgno iLoad;
/* Current page is exhausted: release it and climb to the parent. */
lsmFsPageRelease(pPg->pPage);
pPg->pPage = 0;
pCsr->iPg--;
while( pCsr->iPg>=0 ){
pPg = &pCsr->aPg[pCsr->iPg];
aData = fsPageData(pPg->pPage, &nData);
if( pPg->iCell<pageGetNRec(aData, nData) ) break;
lsmFsPageRelease(pPg->pPage);
pCsr->iPg--;
}
/* Load the key before the ancestor's cell index is stepped below. */
rc = btreeCursorLoadKey(pCsr);
if( pCsr->iPg>=0 ){
pCsr->aPg[pCsr->iPg].iCell++;
/* Descend through the left-most child pointers down to depth
** nDepth-1, then park the cursor on cell -1 of that page. */
iLoad = btreeCursorPtr(aData, nData, pPg->iCell);
do {
Page *pLoad;
pCsr->iPg++;
rc = lsmFsDbPageGet(pCsr->pFS, pCsr->pSeg, iLoad, &pLoad);
pCsr->aPg[pCsr->iPg].pPage = pLoad;
pCsr->aPg[pCsr->iPg].iCell = 0;
if( rc==LSM_OK ){
if( pCsr->iPg==(pCsr->nDepth-1) ) break;
aData = fsPageData(pLoad, &nData);
iLoad = btreeCursorPtr(aData, nData, 0);
}
}while( rc==LSM_OK && pCsr->iPg<(pCsr->nDepth-1) );
pCsr->aPg[pCsr->iPg].iCell = -1;
}
}else{
rc = btreeCursorLoadKey(pCsr);
}
if( rc==LSM_OK && pCsr->iPg>=0 ){
aData = fsPageData(pCsr->aPg[pCsr->iPg].pPage, &nData);
pCsr->iPtr = btreeCursorPtr(aData, nData, pCsr->aPg[pCsr->iPg].iCell+1);
}
return rc;
}
/*
** Free b-tree cursor pCsr: release the page references held at levels
** 0..iPg, free the key blob, the page array and the cursor itself.
** A NULL argument is a no-op.
*/
static void btreeCursorFree(BtreeCursor *pCsr){
  lsm_env *pEnv;
  int iLevel;

  if( pCsr==0 ) return;

  pEnv = lsmFsEnv(pCsr->pFS);
  iLevel = 0;
  while( iLevel<=pCsr->iPg ){
    lsmFsPageRelease(pCsr->aPg[iLevel].pPage);
    iLevel++;
  }
  sortedBlobFree(&pCsr->blob);
  lsmFree(pEnv, pCsr->aPg);
  lsmFree(pEnv, pCsr);
}
/*
** Position b-tree cursor pCsr on the first key of the b-tree.
**
** Starting at the segment's root page, follow the pointer of cell 0 at
** each level, pushing every b-tree page onto pCsr->aPg[] (which grows in
** steps of 8 entries) until a page without SEGMENT_BTREE_FLAG is
** reached. That last page is not kept. The cursor is then parked at cell
** -1 of the deepest b-tree page and btreeCursorNext() is called to load
** the first key.
**
** NOTE(review): if the array reallocation fails, pCsr->aPg is replaced
** by the (failed) result while nDepth keeps its old value -- callers
** presumably discard the cursor on error; confirm.
*/
static int btreeCursorFirst(BtreeCursor *pCsr){
int rc;
Page *pPg = 0;
FileSystem *pFS = pCsr->pFS;
LsmPgno iPg = pCsr->pSeg->iRoot;
do {
rc = lsmFsDbPageGet(pFS, pCsr->pSeg, iPg, &pPg);
assert( (rc==LSM_OK)==(pPg!=0) );
if( rc==LSM_OK ){
u8 *aData;
int nData;
int flags;
aData = fsPageData(pPg, &nData);
flags = pageGetFlags(aData, nData);
/* Reached a page below the b-tree: stop descending. */
if( (flags & SEGMENT_BTREE_FLAG)==0 ) break;
if( (pCsr->nDepth % 8)==0 ){
int nNew = pCsr->nDepth + 8;
pCsr->aPg = (BtreePg *)lsmReallocOrFreeRc(
lsmFsEnv(pFS), pCsr->aPg, sizeof(BtreePg) * nNew, &rc
);
if( rc==LSM_OK ){
memset(&pCsr->aPg[pCsr->nDepth], 0, sizeof(BtreePg) * 8);
}
}
if( rc==LSM_OK ){
assert( pCsr->aPg[pCsr->nDepth].iCell==0 );
pCsr->aPg[pCsr->nDepth].pPage = pPg;
pCsr->nDepth++;
iPg = pageGetRecordPtr(aData, nData, 0);
}
}
}while( rc==LSM_OK );
/* Drop the reference to the last page loaded; it is not part of the
** cursor's page stack when the loop ended via the break above. */
lsmFsPageRelease(pPg);
pCsr->iPg = pCsr->nDepth-1;
if( rc==LSM_OK && pCsr->nDepth ){
pCsr->aPg[pCsr->iPg].iCell = -1;
rc = btreeCursorNext(pCsr);
}
return rc;
}
/*
** Save the current position of pCsr into *p in the form understood by
** btreeCursorRestore():
**
**   p->iPg    page number of the cursor's current page
**   p->iCell  ((current cell index + 1) << 8) + depth of the cursor
**
** Both fields are set to zero if the cursor is at EOF.
*/
static void btreeCursorPosition(BtreeCursor *pCsr, MergeInput *p){
  BtreePg *pCur;

  if( pCsr->iPg<0 ){
    p->iPg = 0;
    p->iCell = 0;
    return;
  }

  pCur = &pCsr->aPg[pCsr->iPg];
  p->iPg = lsmFsPageNumber(pCur->pPage);
  p->iCell = ((pCur->iCell + 1) << 8) + pCsr->nDepth;
}
/*
** Store in *p the page number and cell index of the cell that holds the
** cursor's current key.
**
** If the cursor's cell index on its current page is non-negative, that
** cell is used. Otherwise the key lives on an ancestor page: the nearest
** ancestor with a cell index greater than zero is located and the cell
** just before that index is used.
*/
static void btreeCursorSplitkey(BtreeCursor *pCsr, MergeInput *p){
  BtreePg *pSrc = &pCsr->aPg[pCsr->iPg];
  int iSplit = pSrc->iCell;

  if( iSplit<0 ){
    int i = pCsr->iPg - 1;
    while( i>=0 && pCsr->aPg[i].iCell<=0 ){
      i--;
    }
    assert( i>=0 );
    pSrc = &pCsr->aPg[i];
    iSplit = pSrc->iCell - 1;
  }

  p->iCell = iSplit;
  p->iPg = lsmFsPageNumber(pSrc->pPage);
}
/*
** Compare two (topic, key) pairs. Topics are compared first; only if
** they are equal is the key comparison function xCmp invoked.
**
** Returns a negative value, zero or a positive value if the left-hand
** pair is respectively less than, equal to or greater than the
** right-hand pair.
*/
static int sortedKeyCompare(
  int (*xCmp)(void *, int, void *, int),
  int iLhsTopic, void *pLhsKey, int nLhsKey,
  int iRhsTopic, void *pRhsKey, int nRhsKey
){
  if( iLhsTopic!=iRhsTopic ){
    return iLhsTopic - iRhsTopic;
  }
  return xCmp(pLhsKey, nLhsKey, pRhsKey, nRhsKey);
}
/*
** Restore b-tree cursor pCsr to the position previously saved in *p by
** btreeCursorPosition(). If p->iPg is zero there is nothing to restore
** and the cursor is left untouched.
**
** The saved state identifies only the cursor's current page and cell.
** The stack of ancestor pages is rebuilt by seeking from the root for
** the first key on the current page, using xCmp to compare keys. If the
** current page has no cells, a seek key larger than any topic produced
** by rtTopic() is used instead so that the right-most path is followed.
**
** Finally pCsr->iPtr and the current key are reloaded.
**
** Returns LSM_OK on success or an error code. The descent is skipped if
** loading the seek key fails, so that error is reported rather than
** being overwritten by the first page load.
*/
static int btreeCursorRestore(
  BtreeCursor *pCsr,
  int (*xCmp)(void *, int, void *, int),
  MergeInput *p
){
  int rc = LSM_OK;

  if( p->iPg ){
    lsm_env *pEnv = lsmFsEnv(pCsr->pFS);
    int iCell;                    /* Current cell number on leaf page */
    LsmPgno iLeaf;                /* Page number of current leaf page */
    int nDepth;                   /* Depth of b-tree structure */
    Segment *pSeg = pCsr->pSeg;

    /* Decode the state written by btreeCursorPosition(). */
    iLeaf = p->iPg;
    nDepth = (p->iCell & 0x00FF);
    iCell = (p->iCell >> 8) - 1;

    /* Allocate the page stack and load the cursor's current page. */
    assert( pCsr->aPg==0 );
    pCsr->aPg = (BtreePg *)lsmMallocZeroRc(pEnv, sizeof(BtreePg) * nDepth, &rc);

    if( rc==LSM_OK ){
      Page **pp = &pCsr->aPg[nDepth-1].pPage;
      pCsr->iPg = nDepth-1;
      pCsr->nDepth = nDepth;
      pCsr->aPg[pCsr->iPg].iCell = iCell;
      rc = lsmFsDbPageGet(pCsr->pFS, pSeg, iLeaf, pp);
    }

    /* Rebuild the ancestors of the current page, if there are any. */
    if( rc==LSM_OK && nDepth>1 ){
      LsmBlob blob = {0,0,0};
      void *pSeek;
      int nSeek;
      int iTopicSeek;
      int iPg = 0;
      LsmPgno iLoad = pSeg->iRoot;
      Page *pPg = pCsr->aPg[nDepth-1].pPage;

      if( pageObjGetNRec(pPg)==0 ){
        /* No key to seek for: use a topic that compares greater than any
        ** stored key so that every comparison below goes right. */
        assert( iCell==-1 );
        iTopicSeek = 1000;
        pSeek = 0;
        nSeek = 0;
      }else{
        LsmPgno dummy;
        rc = pageGetBtreeKey(pSeg, pPg,
            0, &dummy, &iTopicSeek, &pSeek, &nSeek, &pCsr->blob
        );
      }

      /* nDepth>1 and iPg==0 here, so the loop body runs at least once
      ** unless loading the seek key failed. */
      while( rc==LSM_OK && iPg<(nDepth-1) ){
        Page *pPg2;
        rc = lsmFsDbPageGet(pCsr->pFS, pSeg, iLoad, &pPg2);
        assert( rc==LSM_OK || pPg2==0 );
        if( rc==LSM_OK ){
          u8 *aData;                  /* Buffer containing page data */
          int nData;                  /* Size of aData[] in bytes */
          int iMin;
          int iMax;
          int iCell2;

          aData = fsPageData(pPg2, &nData);
          assert( (pageGetFlags(aData, nData) & SEGMENT_BTREE_FLAG) );

          /* Defaults apply if the seek key is greater than every key on
          ** the page: follow the footer pointer, position past the end. */
          iLoad = pageGetPtr(aData, nData);
          iCell2 = pageGetNRec(aData, nData);
          iMax = iCell2-1;
          iMin = 0;

          /* Binary search for the first key greater than the seek key. */
          while( iMax>=iMin ){
            int iTry = (iMin+iMax)/2;
            void *pKey; int nKey;         /* Key for cell iTry */
            int iTopic;                   /* Topic for key pKey/nKey */
            LsmPgno iPtr;                 /* Pointer for cell iTry */
            int res;                      /* (pSeek - pKey) */

            rc = pageGetBtreeKey(
                pSeg, pPg2, iTry, &iPtr, &iTopic, &pKey, &nKey, &blob
            );
            if( rc!=LSM_OK ) break;

            res = sortedKeyCompare(
                xCmp, iTopicSeek, pSeek, nSeek, iTopic, pKey, nKey
            );
            assert( res!=0 );

            if( res<0 ){
              iLoad = iPtr;
              iCell2 = iTry;
              iMax = iTry-1;
            }else{
              iMin = iTry+1;
            }
          }

          pCsr->aPg[iPg].pPage = pPg2;
          pCsr->aPg[iPg].iCell = iCell2;
          iPg++;
          assert( iPg!=nDepth-1
               || lsmFsRedirectPage(pCsr->pFS, pSeg->pRedirect, iLoad)==iLeaf
          );
        }
      }
      sortedBlobFree(&blob);
    }

    /* Reload the pointer and key for the restored position. */
    if( rc==LSM_OK ){
      BtreePg *pBtreePg;
      u8 *aData;
      int nData;

      pBtreePg = &pCsr->aPg[pCsr->iPg];
      aData = fsPageData(pBtreePg->pPage, &nData);
      pCsr->iPtr = btreeCursorPtr(aData, nData, pBtreePg->iCell+1);
      if( pBtreePg->iCell<0 ){
        /* The key lives on an ancestor page: the cell before the current
        ** cell of the nearest ancestor whose cell index is positive. */
        LsmPgno dummy;
        int i;
        for(i=pCsr->iPg-1; i>=0; i--){
          if( pCsr->aPg[i].iCell>0 ) break;
        }
        assert( i>=0 );
        rc = pageGetBtreeKey(pSeg,
            pCsr->aPg[i].pPage, pCsr->aPg[i].iCell-1,
            &dummy, &pCsr->eType, &pCsr->pKey, &pCsr->nKey, &pCsr->blob
        );
        pCsr->eType |= LSM_SEPARATOR;
      }else{
        rc = btreeCursorLoadKey(pCsr);
      }
    }
  }
  return rc;
}
/*
** Allocate a new b-tree cursor for segment pSeg, which must have a
** b-tree root page. The cursor starts out at EOF (iPg==-1).
**
** *ppCsr is set to the new cursor, or to NULL if the allocation failed.
** The return value is the code produced by lsmMallocZeroRc().
*/
static int btreeCursorNew(
  lsm_db *pDb,
  Segment *pSeg,
  BtreeCursor **ppCsr
){
  int rc = LSM_OK;
  BtreeCursor *pNew;

  assert( pSeg->iRoot );
  pNew = lsmMallocZeroRc(pDb->pEnv, sizeof(BtreeCursor), &rc);
  if( pNew!=0 ){
    pNew->iPg = -1;
    pNew->pSeg = pSeg;
    pNew->pFS = pDb->pFS;
  }

  *ppCsr = pNew;
  return rc;
}
/*
** Make pNext the current page of pPtr, releasing the reference to the
** page previously held. If pNext is not NULL the cached page fields
** (nCell, flags, iPtr) are refreshed from its footer; if it is NULL they
** are left unchanged.
*/
static void segmentPtrSetPage(SegmentPtr *pPtr, Page *pNext){
  int nData;
  u8 *aData;

  lsmFsPageRelease(pPtr->pPg);
  pPtr->pPg = pNext;
  if( pNext==0 ) return;

  aData = fsPageData(pNext, &nData);
  pPtr->nCell = pageGetNRec(aData, nData);
  pPtr->flags = (u16)pageGetFlags(aData, nData);
  pPtr->iPtr = pageGetPtr(aData, nData);
}
/*
** Load page iNew of pPtr's segment and make it the current page of pPtr.
** If the load fails the segment pointer is left with no current page and
** the error code is returned.
*/
static int segmentPtrLoadPage(
  FileSystem *pFS,
  SegmentPtr *pPtr,
  LsmPgno iNew
){
  Page *pLoaded = 0;
  int rc = lsmFsDbPageGet(pFS, pPtr->pSeg, iNew, &pLoaded);
  assert( rc==LSM_OK || pLoaded==0 );
  segmentPtrSetPage(pPtr, pLoaded);
  return rc;
}
/*
** Read nByte bytes of record data starting at offset iOff of the current
** page of pPtr. This is sortedReadData() applied to the segment and page
** held by the segment pointer; see that function for the meaning of
** *ppData and pBlob.
*/
static int segmentPtrReadData(
  SegmentPtr *pPtr,
  int iOff,
  int nByte,
  void **ppData,
  LsmBlob *pBlob
){
  Segment *pSeg = pPtr->pSeg;
  Page *pPg = pPtr->pPg;
  return sortedReadData(pSeg, pPg, iOff, nByte, ppData, pBlob);
}
/*
** Move pPtr to the page that follows (eDir==1) or precedes (eDir==-1)
** its current page within the segment. The segment pointer must have a
** current page. If there is no such page, or an error occurs, pPtr is
** left with no current page.
*/
static int segmentPtrNextPage(
  SegmentPtr *pPtr,
  int eDir
){
  Page *pAdjacent;
  int rc;

  assert( eDir==1 || eDir==-1 );
  assert( pPtr->pPg );
  assert( pPtr->pSeg || eDir>0 );

  rc = lsmFsDbPageNext(pPtr->pSeg, pPtr->pPg, eDir, &pAdjacent);
  assert( rc==LSM_OK || pAdjacent==0 );
  segmentPtrSetPage(pPtr, pAdjacent);
  return rc;
}
/*
** Load cell iNew of the current page of pPtr, populating iCell, eType,
** iPgPtr, pKey/nKey and pVal/nVal. Does nothing (and returns LSM_OK) if
** the segment pointer has no current page.
**
** Cell layout as decoded here: a flags byte, a varint page pointer, a
** varint key size and -- for write records only -- a varint value size,
** followed by the key bytes and then the value bytes. Keys and values
** that continue past the end of the page are assembled in blob1 and
** blob2 respectively.
*/
static int segmentPtrLoadCell(
SegmentPtr *pPtr,
int iNew
){
int rc = LSM_OK;
if( pPtr->pPg ){
u8 *aData;
int iOff;
int nPgsz;
assert( iNew<pPtr->nCell );
pPtr->iCell = iNew;
aData = fsPageData(pPtr->pPg, &nPgsz);
iOff = lsmGetU16(&aData[SEGMENT_CELLPTR_OFFSET(nPgsz, pPtr->iCell)]);
pPtr->eType = aData[iOff];
iOff++;
iOff += GETVARINT64(&aData[iOff], pPtr->iPgPtr);
iOff += GETVARINT32(&aData[iOff], pPtr->nKey);
if( rtIsWrite(pPtr->eType) ){
iOff += GETVARINT32(&aData[iOff], pPtr->nVal);
}
assert( pPtr->nKey>=0 );
rc = segmentPtrReadData(
pPtr, iOff, pPtr->nKey, &pPtr->pKey, &pPtr->blob1
);
if( rc==LSM_OK && rtIsWrite(pPtr->eType) ){
rc = segmentPtrReadData(
pPtr, iOff+pPtr->nKey, pPtr->nVal, &pPtr->pVal, &pPtr->blob2
);
}else{
/* Not a write record (or the key read failed): no value. */
pPtr->nVal = 0;
pPtr->pVal = 0;
}
}
return rc;
}
/*
** Return the segment that contains the page identified by the split-key
** of the merge under way on pLevel.
**
** The merge inputs are searched for the one whose page number matches
** the split-key page. If the merge has one more input than pLevel has
** right-hand segments and the match is the last input (or there is no
** match), the segment is the left-hand side of the next level.
** Otherwise it is the right-hand segment with the matching index.
*/
static Segment *sortedSplitkeySegment(Level *pLevel){
  Merge *pMerge = pLevel->pMerge;
  MergeInput *pSplit = &pMerge->splitkey;
  int i = 0;

  while( i<pMerge->nInput && pSplit->iPg!=pMerge->aInput[i].iPg ){
    i++;
  }

  if( pMerge->nInput==(pLevel->nRight+1) && i>=(pMerge->nInput-1) ){
    return &pLevel->pNext->lhs;
  }
  return &pLevel->aRhs[i];
}
/*
** Load the split-key of the merge under way on pLevel and store it in
** pLevel->iSplitTopic, pSplitKey and nSplitKey.
**
** The key is read from the page and cell recorded in the merge's
** splitkey field, within the segment chosen by sortedSplitkeySegment().
** The cell may be on either a b-tree page or an ordinary page. The key
** is copied into freshly allocated memory whose ownership passes to
** pLevel.
**
** *pRc is both input and output: nothing is loaded if it is not LSM_OK
** on entry, and it receives any error that occurs.
**
** NOTE(review): if reading the key fails after the page was loaded, the
** pLevel fields are still assigned (iTopic may be unset) -- callers
** presumably check *pRc before using them; confirm.
*/
static void sortedSplitkey(lsm_db *pDb, Level *pLevel, int *pRc){
Segment *pSeg;
Page *pPg = 0;
lsm_env *pEnv = pDb->pEnv;
int rc = *pRc;
Merge *pMerge = pLevel->pMerge;
pSeg = sortedSplitkeySegment(pLevel);
if( rc==LSM_OK ){
rc = lsmFsDbPageGet(pDb->pFS, pSeg, pMerge->splitkey.iPg, &pPg);
}
if( rc==LSM_OK ){
int iTopic;
LsmBlob blob = {0, 0, 0, 0};
u8 *aData;
int nData;
aData = lsmFsPageData(pPg, &nData);
if( pageGetFlags(aData, nData) & SEGMENT_BTREE_FLAG ){
void *pKey;
int nKey;
LsmPgno dummy;
rc = pageGetBtreeKey(pSeg,
pPg, pMerge->splitkey.iCell, &dummy, &iTopic, &pKey, &nKey, &blob
);
/* An inline key points into the page buffer; copy it so that it
** survives the page release below. */
if( rc==LSM_OK && blob.pData!=pKey ){
rc = sortedBlobSet(pEnv, &blob, pKey, nKey);
}
}else{
rc = pageGetKeyCopy(
pEnv, pSeg, pPg, pMerge->splitkey.iCell, &iTopic, &blob
);
}
pLevel->iSplitTopic = iTopic;
pLevel->pSplitKey = blob.pData;
pLevel->nSplitKey = blob.nData;
lsmFsPageRelease(pPg);
}
*pRc = rc;
}
/*
** Reset pPtr so that it points at nothing: the current page is released
** and all page- and cell-level position fields are cleared (pLevel and
** pSeg are retained).
**
** The two data blobs are kept for reuse unless their allocation is at
** least nThreshold bytes, in which case they are freed. Passing 0 frees
** both unconditionally.
*/
static void segmentPtrReset(SegmentPtr *pPtr, int nThreshold){
  lsmFsPageRelease(pPtr->pPg);
  pPtr->pPg = 0;

  pPtr->iCell = 0;
  pPtr->nCell = 0;
  pPtr->eType = 0;
  pPtr->pKey = 0;
  pPtr->nKey = 0;
  pPtr->pVal = 0;
  pPtr->nVal = 0;

  if( nThreshold<=pPtr->blob1.nAlloc ){
    sortedBlobFree(&pPtr->blob1);
  }
  if( nThreshold<=pPtr->blob2.nAlloc ){
    sortedBlobFree(&pPtr->blob2);
  }
}
/*
** Return true if separator keys encountered by pPtr should be skipped.
** Separators are returned only when cursor pCsr has the
** CURSOR_READ_SEPARATORS flag set and pPtr is the last entry of
** pCsr->aPtr[].
*/
static int segmentPtrIgnoreSeparators(MultiCursor *pCsr, SegmentPtr *pPtr){
  SegmentPtr *pLast;
  if( (pCsr->flags & CURSOR_READ_SEPARATORS)==0 ){
    return 1;
  }
  pLast = &pCsr->aPtr[pCsr->nPtr-1];
  return (pPtr!=pLast);
}
/*
** Move pPtr to the next (bReverse==0) or previous (bReverse!=0) record
** of its segment, crossing page boundaries as required. Empty pages and
** b-tree pages are skipped. If the end of the segment is reached, pPtr
** is left with no current page.
**
** When iterating backwards through a segment other than the level's
** left-hand side, records smaller than the level's split-key are out of
** bounds: on reaching one the pointer is reset. If the record that was
** current before the step carried LSM_END_DELETE, the pointer is then
** set to a synthetic START_DELETE|POINT_DELETE entry at the split-key
** (backed by the segment's first page).
**
** The step is repeated while the record landed on is a separator that
** should be ignored (see segmentPtrIgnoreSeparators()). pCsr may be
** NULL, in which case exactly one step is taken -- though the
** split-key handling above dereferences pCsr.
*/
static int segmentPtrAdvance(
MultiCursor *pCsr,
SegmentPtr *pPtr,
int bReverse
){
int eDir = (bReverse ? -1 : 1);
Level *pLvl = pPtr->pLevel;
do {
int rc;
int iCell;
int svFlags = 0;
iCell = pPtr->iCell + eDir;
assert( pPtr->pPg );
assert( iCell<=pPtr->nCell && iCell>=-1 );
/* Remember the flags of the record being left; needed below to decide
** whether a synthetic delete marker is required. */
if( bReverse && pPtr->pSeg!=&pPtr->pLevel->lhs ){
svFlags = pPtr->eType;
assert( svFlags );
}
if( iCell>=pPtr->nCell || iCell<0 ){
/* Stepped off the page: find the adjacent page that holds records. */
do {
rc = segmentPtrNextPage(pPtr, eDir);
}while( rc==LSM_OK
&& pPtr->pPg
&& (pPtr->nCell==0 || (pPtr->flags & SEGMENT_BTREE_FLAG) )
);
if( rc!=LSM_OK ) return rc;
iCell = bReverse ? (pPtr->nCell-1) : 0;
}
rc = segmentPtrLoadCell(pPtr, iCell);
if( rc!=LSM_OK ) return rc;
if( svFlags && pPtr->pPg ){
int res = sortedKeyCompare(pCsr->pDb->xCmp,
rtTopic(pPtr->eType), pPtr->pKey, pPtr->nKey,
pLvl->iSplitTopic, pLvl->pSplitKey, pLvl->nSplitKey
);
if( res<0 ) segmentPtrReset(pPtr, LSM_SEGMENTPTR_FREE_THRESHOLD);
}
if( pPtr->pPg==0 && (svFlags & LSM_END_DELETE) ){
Segment *pSeg = pPtr->pSeg;
rc = lsmFsDbPageGet(pCsr->pDb->pFS, pSeg, pSeg->iFirst, &pPtr->pPg);
if( rc!=LSM_OK ) return rc;
pPtr->eType = LSM_START_DELETE | LSM_POINT_DELETE;
pPtr->eType |= (pLvl->iSplitTopic ? LSM_SYSTEMKEY : 0);
pPtr->pKey = pLvl->pSplitKey;
pPtr->nKey = pLvl->nSplitKey;
}
}while( pCsr
&& pPtr->pPg
&& segmentPtrIgnoreSeparators(pCsr, pPtr)
&& rtIsSeparator(pPtr->eType)
);
return LSM_OK;
}
/*
** Load the last (bLast!=0) or first (bLast==0) page of pPtr's segment
** and make it the current page of pPtr.
**
** This is a no-op if *pRc is not LSM_OK on entry. Otherwise *pRc is set
** to the result of the page load; on failure pPtr is left with no
** current page.
*/
static void segmentPtrEndPage(
  FileSystem *pFS,
  SegmentPtr *pPtr,
  int bLast,
  int *pRc
){
  Segment *pSeg;
  Page *pEnd = 0;

  if( *pRc!=LSM_OK ) return;

  pSeg = pPtr->pSeg;
  *pRc = bLast
       ? lsmFsDbPageLast(pFS, pSeg, &pEnd)
       : lsmFsDbPageGet(pFS, pSeg, pSeg->iFirst, &pEnd);
  segmentPtrSetPage(pPtr, pEnd);
}
/*
** Position pPtr on the last (bLast!=0) or first (bLast==0) record of its
** segment.
**
** Starting from the end page, empty pages and b-tree pages are skipped
** in the direction of iteration. When seeking to the last record of a
** segment other than the level's left-hand side, a record smaller than
** the level's split-key is out of bounds and causes the pointer to be
** reset. If the record landed on is a separator that should be ignored,
** the pointer is advanced past it.
**
** Returns LSM_OK on success, with pPtr left without a current page if
** the segment has no eligible record.
*/
static int segmentPtrEnd(MultiCursor *pCsr, SegmentPtr *pPtr, int bLast){
  Level *pLvl = pPtr->pLevel;
  int rc = LSM_OK;
  FileSystem *pFS = pCsr->pDb->pFS;
  int bIgnore;

  segmentPtrEndPage(pFS, pPtr, bLast, &rc);
  while( rc==LSM_OK && pPtr->pPg
      && (pPtr->nCell==0 || (pPtr->flags & SEGMENT_BTREE_FLAG))
  ){
    rc = segmentPtrNextPage(pPtr, (bLast ? -1 : 1));
  }

  if( rc==LSM_OK && pPtr->pPg ){
    rc = segmentPtrLoadCell(pPtr, bLast ? (pPtr->nCell-1) : 0);
    if( rc==LSM_OK && bLast && pPtr->pSeg!=&pLvl->lhs ){
      int res = sortedKeyCompare(pCsr->pDb->xCmp,
          rtTopic(pPtr->eType), pPtr->pKey, pPtr->nKey,
          pLvl->iSplitTopic, pLvl->pSplitKey, pLvl->nSplitKey
      );
      if( res<0 ) segmentPtrReset(pPtr, LSM_SEGMENTPTR_FREE_THRESHOLD);
    }
  }

  bIgnore = segmentPtrIgnoreSeparators(pCsr, pPtr);
  if( rc==LSM_OK && pPtr->pPg && bIgnore && rtIsSeparator(pPtr->eType) ){
    rc = segmentPtrAdvance(pCsr, pPtr, bLast);
  }

  return rc;
}
/*
** Return, via *ppKey and *pnKey, the key of the record that pPtr is
** currently positioned on. The segment pointer must have a current page.
*/
static void segmentPtrKey(SegmentPtr *pPtr, void **ppKey, int *pnKey){
  assert( pPtr->pPg );
  *pnKey = pPtr->nKey;
  *ppKey = pPtr->pKey;
}
#if 0#endif
#if 0#endif
#ifndef NDEBUG
static int assertSeekResult(
MultiCursor *pCsr,
SegmentPtr *pPtr,
int iTopic,
void *pKey,
int nKey,
int eSeek
){
if( pPtr->pPg ){
int res;
res = sortedKeyCompare(pCsr->pDb->xCmp, iTopic, pKey, nKey,
rtTopic(pPtr->eType), pPtr->pKey, pPtr->nKey
);
if( eSeek==LSM_SEEK_EQ ) return (res==0);
if( eSeek==LSM_SEEK_LE ) return (res>=0);
if( eSeek==LSM_SEEK_GE ) return (res<=0);
}
return 1;
}
#endif
/*
** While the current page of pPtr has PGFTR_SKIP_NEXT_FLAG set and its
** last key is smaller than the key being sought (iTopic, pKey, nKey),
** move pPtr forward to the next page of the segment that is not a b-tree
** page and holds at least one record.
**
** The loop stops when the flag is clear, when the last key on the
** current page is greater than or equal to the sought key, or when no
** further eligible page exists (pPtr then stays on its current page).
**
** Returns LSM_OK or the error code from lsmFsDbPageNext().
*/
static int segmentPtrSearchOversized(
MultiCursor *pCsr,
SegmentPtr *pPtr,
int iTopic,
void *pKey, int nKey
){
int (*xCmp)(void *, int, void *, int) = pCsr->pDb->xCmp;
int rc = LSM_OK;
while( rc==LSM_OK && (pPtr->flags & PGFTR_SKIP_NEXT_FLAG) ){
u8 *pLastKey;
int nLastKey;
int iLastTopic;
int res;
Page *pNext;
pLastKey = pageGetKey(pPtr->pSeg,
pPtr->pPg, pPtr->nCell-1, &iLastTopic, &nLastKey, &pPtr->blob1
);
res = sortedKeyCompare(
xCmp, iLastTopic, pLastKey, nLastKey, iTopic, pKey, nKey
);
if( res>=0 ) break;
/* Walk forward using a private reference, so that pPtr keeps its
** current page if no suitable page is found. */
pNext = pPtr->pPg;
lsmFsPageRef(pNext);
while( 1 ){
Page *pLoad;
u8 *aData; int nData;
rc = lsmFsDbPageNext(pPtr->pSeg, pNext, 1, &pLoad);
lsmFsPageRelease(pNext);
pNext = pLoad;
if( pNext==0 ) break;
assert( rc==LSM_OK );
aData = lsmFsPageData(pNext, &nData);
if( (pageGetFlags(aData, nData) & SEGMENT_BTREE_FLAG)==0
&& pageGetNRec(aData, nData)>0
){
break;
}
}
if( pNext==0 ) break;
segmentPtrSetPage(pPtr, pNext);
assert( rc!=LSM_OK || (pPtr->flags & PGFTR_SKIP_THIS_FLAG) );
}
return rc;
}
/*
** Starting at cell iCell of page pPage and scanning forward through
** segment pSeg, find the first record that does not have the
** LSM_START_DELETE flag set. B-tree pages are skipped.
**
** If such a record is found, *pbFound is set to 1 and *piPtr to the sum
** of the record's page pointer and the footer pointer of the page that
** holds it. Otherwise *pbFound is set to 0 and *piPtr is not written.
**
** This function consumes one reference to pPage: the page (or whichever
** later page the scan reached) is released before returning.
*/
static int ptrFwdPointer(
Page *pPage,
int iCell,
Segment *pSeg,
LsmPgno *piPtr,
int *pbFound
){
Page *pPg = pPage;
int iFirst = iCell;
int rc = LSM_OK;
do {
Page *pNext = 0;
u8 *aData;
int nData;
aData = lsmFsPageData(pPg, &nData);
if( (pageGetFlags(aData, nData) & SEGMENT_BTREE_FLAG)==0 ){
int i;
int nCell = pageGetNRec(aData, nData);
for(i=iFirst; i<nCell; i++){
u8 eType = *pageGetCell(aData, nData, i);
if( (eType & LSM_START_DELETE)==0 ){
*pbFound = 1;
*piPtr = pageGetRecordPtr(aData, nData, i) + pageGetPtr(aData, nData);
lsmFsPageRelease(pPg);
return LSM_OK;
}
}
}
rc = lsmFsDbPageNext(pSeg, pPg, 1, &pNext);
lsmFsPageRelease(pPg);
pPg = pNext;
/* Every page after the first is scanned from its first cell. */
iFirst = 0;
}while( pPg && rc==LSM_OK );
lsmFsPageRelease(pPg);
*pbFound = 0;
return rc;
}
/*
** Position pPtr on the first record of its segment that is greater than
** or equal to the split-key of level pLvl. If there is no such record,
** pPtr is left with no current page.
*/
static int sortedRhsFirst(MultiCursor *pCsr, Level *pLvl, SegmentPtr *pPtr){
  int rc = segmentPtrEnd(pCsr, pPtr, 0);

  for(;;){
    int res;
    if( pPtr->pPg==0 || rc!=LSM_OK ) break;
    /* res is (split-key - current key). */
    res = sortedKeyCompare(pCsr->pDb->xCmp,
        pLvl->iSplitTopic, pLvl->pSplitKey, pLvl->nSplitKey,
        rtTopic(pPtr->eType), pPtr->pKey, pPtr->nKey
    );
    if( res<=0 ) break;
    rc = segmentPtrAdvance(pCsr, pPtr, 0);
  }
  return rc;
}
/*
** Compute the forward pointer to use when pPtr is positioned inside a
** range of START_DELETE records: the pointer of the first following
** record that is not flagged LSM_START_DELETE (see ptrFwdPointer()).
**
** Nothing is done, and *piPtr is not written, when the segment that
** would be searched next has a b-tree root (iRoot!=0) or does not
** exist.
**
** If no suitable record follows in pPtr's own segment, the search
** continues in the last right-hand segment of the same level, starting
** at its first key not smaller than the split-key. That is only valid
** when pPtr is reading the left-hand side of a level that has right-hand
** segments; anything else is reported as LSM_CORRUPT_BKPT.
*/
static int segmentPtrFwdPointer(
MultiCursor *pCsr,
SegmentPtr *pPtr,
LsmPgno *piPtr
){
Level *pLvl = pPtr->pLevel;
Level *pNext = pLvl->pNext;
Page *pPg = pPtr->pPg;
int rc;
int bFound;
LsmPgno iOut = 0;
if( pPtr->pSeg==&pLvl->lhs || pPtr->pSeg==&pLvl->aRhs[pLvl->nRight-1] ){
/* pPtr is on the last segment of its level: the next segment
** searched belongs to the next level. */
if( pNext==0
|| (pNext->nRight==0 && pNext->lhs.iRoot)
|| (pNext->nRight!=0 && pNext->aRhs[0].iRoot)
){
return LSM_OK;
}
}else{
if( pPtr[1].pSeg->iRoot ){
return LSM_OK;
}
}
/* ptrFwdPointer() consumes a page reference, so take one for it. */
lsmFsPageRef(pPg);
rc = ptrFwdPointer(pPg, pPtr->iCell, pPtr->pSeg, &iOut, &bFound);
if( rc==LSM_OK && bFound==0 ){
SegmentPtr ptr;
if( pPtr->pLevel->nRight==0 || pPtr->pSeg!=&pPtr->pLevel->lhs ){
return LSM_CORRUPT_BKPT;
}
memset(&ptr, 0, sizeof(SegmentPtr));
ptr.pLevel = pPtr->pLevel;
ptr.pSeg = &ptr.pLevel->aRhs[ptr.pLevel->nRight-1];
rc = sortedRhsFirst(pCsr, ptr.pLevel, &ptr);
if( rc==LSM_OK ){
rc = ptrFwdPointer(ptr.pPg, ptr.iCell, ptr.pSeg, &iOut, &bFound);
/* The page reference was consumed by ptrFwdPointer(). */
ptr.pPg = 0;
}
segmentPtrReset(&ptr, 0);
}
*piPtr = iOut;
return rc;
}
/*
** Seek pPtr within its current page (and, for oversized keys, the pages
** that follow it) for key (iTopic, pKey, nKey).
**
** Parameters:
**   pCsr     cursor that owns pPtr
**   pPtr     segment pointer; its current page must already be loaded
**   eSeek    LSM_SEEK_EQ, LSM_SEEK_LE or LSM_SEEK_GE
**   piPtr    OUT: page pointer to use when searching the next segment
**   pbStop   OUT: set to 1 by an LSM_SEEK_EQ seek that has determined
**            the answer (a matching insert or a covering delete);
**            otherwise not written
**
** On return:
**   LSM_SEEK_EQ  pPtr is always reset. If a matching insert was found
**                its key and value are copied into pCsr->key/val and
**                CURSOR_SEEK_EQ is set in pCsr->flags.
**   LSM_SEEK_LE  pPtr is on the largest key <= the sought key, if any.
**   LSM_SEEK_GE  pPtr is on the smallest key >= the sought key, if any.
**
** Separator keys are skipped where segmentPtrIgnoreSeparators() says so.
**
** Returns LSM_OK or an error code. An error from the oversized-key
** search is returned as is and the in-page search is not attempted.
*/
static int segmentPtrSeek(
  MultiCursor *pCsr,
  SegmentPtr *pPtr,
  int iTopic,
  void *pKey, int nKey,
  int eSeek,
  LsmPgno *piPtr,
  int *pbStop
){
  int (*xCmp)(void *, int, void *, int) = pCsr->pDb->xCmp;
  int res = 0;                    /* Result of comparison operation */
  int rc = LSM_OK;
  int iMin;
  int iMax;
  LsmPgno iPtrOut = 0;

  /* If the current page contains an oversized entry, then there are no
  ** pointers to one or more of the subsequent pages in the sorted run.
  ** The following call ensures that the segment-ptr points to the correct
  ** page in this case. */
  rc = segmentPtrSearchOversized(pCsr, pPtr, iTopic, pKey, nKey);
  iPtrOut = pPtr->iPtr;

  assert( pPtr->nCell>0
       || pPtr->pSeg->nSize==1
       || lsmFsDbPageIsLast(pPtr->pSeg, pPtr->pPg)
  );
  if( pPtr->nCell==0 ){
    segmentPtrReset(pPtr, LSM_SEGMENTPTR_FREE_THRESHOLD);
  }else if( rc==LSM_OK ){
    /* Binary search of the current page. On exit res is the comparison
    ** (cell key - sought key) for the last cell examined. */
    iMin = 0;
    iMax = pPtr->nCell-1;

    while( 1 ){
      int iTry = (iMin+iMax)/2;
      void *pKeyT; int nKeyT;       /* Key for cell iTry */
      int iTopicT;

      assert( iTry<iMax || iMin==iMax );

      rc = segmentPtrLoadCell(pPtr, iTry);
      if( rc!=LSM_OK ) break;

      segmentPtrKey(pPtr, &pKeyT, &nKeyT);
      iTopicT = rtTopic(pPtr->eType);

      res = sortedKeyCompare(xCmp, iTopicT, pKeyT, nKeyT, iTopic, pKey, nKey);
      if( res<=0 ){
        iPtrOut = pPtr->iPtr + pPtr->iPgPtr;
      }

      if( res==0 || iMin==iMax ){
        break;
      }else if( res>0 ){
        iMax = LSM_MAX(iTry-1, iMin);
      }else{
        iMin = iTry+1;
      }
    }

    if( rc==LSM_OK ){
      assert( res==0 || (iMin==iMax && iMin>=0 && iMin<pPtr->nCell) );
      if( res ){
        rc = segmentPtrLoadCell(pPtr, iMin);
      }
      assert( rc!=LSM_OK || res>0 || iPtrOut==(pPtr->iPtr + pPtr->iPgPtr) );

      if( rc==LSM_OK ){
        switch( eSeek ){
          case LSM_SEEK_EQ: {
            int eType = pPtr->eType;
            if( (res<0 && (eType & LSM_START_DELETE))
             || (res>0 && (eType & LSM_END_DELETE))
             || (res==0 && (eType & LSM_POINT_DELETE))
            ){
              /* The sought key is covered by a delete. */
              *pbStop = 1;
            }else if( res==0 && (eType & LSM_INSERT) ){
              lsm_env *pEnv = pCsr->pDb->pEnv;
              *pbStop = 1;
              pCsr->eType = pPtr->eType;
              rc = sortedBlobSet(pEnv, &pCsr->key, pPtr->pKey, pPtr->nKey);
              if( rc==LSM_OK ){
                rc = sortedBlobSet(pEnv, &pCsr->val, pPtr->pVal, pPtr->nVal);
              }
              pCsr->flags |= CURSOR_SEEK_EQ;
            }
            segmentPtrReset(pPtr, LSM_SEGMENTPTR_FREE_THRESHOLD);
            break;
          }
          case LSM_SEEK_LE:
            if( res>0 ) rc = segmentPtrAdvance(pCsr, pPtr, 1);
            break;
          case LSM_SEEK_GE: {
            /* Figure out if the pointer must be skipped forward. */
            if( (res<=0 && (pPtr->eType & LSM_START_DELETE))
             || (res>0  && (pPtr->eType & LSM_END_DELETE))
            ){
              rc = segmentPtrFwdPointer(pCsr, pPtr, &iPtrOut);
            }
            if( res<0 && rc==LSM_OK ){
              rc = segmentPtrAdvance(pCsr, pPtr, 0);
            }
            break;
          }
        }
      }
    }

    /* If the cursor landed on a separator key that should not be
    ** returned, step past it in the direction of the seek. */
    if( rc==LSM_OK && pPtr->pPg
     && segmentPtrIgnoreSeparators(pCsr, pPtr)
     && rtIsSeparator(pPtr->eType)
    ){
      assert( eSeek!=LSM_SEEK_EQ );
      rc = segmentPtrAdvance(pCsr, pPtr, eSeek==LSM_SEEK_LE);
    }
  }

  assert( rc!=LSM_OK || assertSeekResult(pCsr,pPtr,iTopic,pKey,nKey,eSeek) );
  *piPtr = iPtrOut;
  return rc;
}
/*
** Descend the b-tree of segment pSeg, starting at its root page, looking
** for key (iTopic, pKey, nKey). The descent ends at the first page
** reached that does not have SEGMENT_BTREE_FLAG set.
**
** If ppPg is not NULL, *ppPg receives a reference to that page (or NULL
** on error); otherwise the page is released before returning.
**
** If aPg is not NULL, the number of every page visited is appended to
** it. In addition, for each b-tree page, the first time a key is loaded
** indirectly (into the local blob) the number of the page it was read
** from is appended as well.
** NOTE(review): the required size of aPg[] is decided by the caller and
** cannot be verified from this function.
*/
static int seekInBtree(
MultiCursor *pCsr,
Segment *pSeg,
int iTopic,
void *pKey, int nKey,
LsmPgno *aPg,
Page **ppPg
){
int i = 0;
int rc;
LsmPgno iPg;
Page *pPg = 0;
LsmBlob blob = {0, 0, 0};
iPg = pSeg->iRoot;
do {
LsmPgno *piFirst = 0;
if( aPg ){
aPg[i++] = iPg;
piFirst = &aPg[i];
}
rc = lsmFsDbPageGet(pCsr->pDb->pFS, pSeg, iPg, &pPg);
assert( rc==LSM_OK || pPg==0 );
if( rc==LSM_OK ){
u8 *aData;
int nData;
int iMin;
int iMax;
int nRec;
int flags;
aData = fsPageData(pPg, &nData);
flags = pageGetFlags(aData, nData);
/* Not a b-tree page: the descent is complete and pPg is the result. */
if( (flags & SEGMENT_BTREE_FLAG)==0 ) break;
/* Default child is the footer pointer, used when the sought key is
** not smaller than any key on the page. */
iPg = pageGetPtr(aData, nData);
nRec = pageGetNRec(aData, nData);
iMin = 0;
iMax = nRec-1;
/* Binary search for the first key greater than the sought key. */
while( iMax>=iMin ){
int iTry = (iMin+iMax)/2;
void *pKeyT; int nKeyT;
int iTopicT;
LsmPgno iPtr;
int res;
rc = pageGetBtreeKey(
pSeg, pPg, iTry, &iPtr, &iTopicT, &pKeyT, &nKeyT, &blob
);
if( rc!=LSM_OK ) break;
if( piFirst && pKeyT==blob.pData ){
*piFirst = pageGetBtreeRef(pPg, iTry);
piFirst = 0;
i++;
}
res = sortedKeyCompare(
pCsr->pDb->xCmp, iTopic, pKey, nKey, iTopicT, pKeyT, nKeyT
);
if( res<0 ){
iPg = iPtr;
iMax = iTry-1;
}else{
iMin = iTry+1;
}
}
lsmFsPageRelease(pPg);
pPg = 0;
}
}while( rc==LSM_OK );
sortedBlobFree(&blob);
assert( (rc==LSM_OK)==(pPg!=0) );
if( ppPg ){
*ppPg = pPg;
}else{
lsmFsPageRelease(pPg);
}
return rc;
}
/*
** Seek pPtr to key (iTopic, pKey, nKey) within its segment.
**
** The starting page is found in one of two ways. If the segment has a
** b-tree (iRoot!=0) the tree is searched with seekInBtree(). Otherwise
** page iPg is loaded, or the first page of the segment if iPg is zero.
** segmentPtrSeek() then completes the seek; see that function for the
** meaning of eSeek, *piPtr and *pbStop.
*/
static int seekInSegment(
  MultiCursor *pCsr,
  SegmentPtr *pPtr,
  int iTopic,
  void *pKey, int nKey,
  LsmPgno iPg,
  int eSeek,
  LsmPgno *piPtr,
  int *pbStop
){
  int rc;
  Segment *pSeg = pPtr->pSeg;

  if( pSeg->iRoot ){
    Page *pLeaf;
    assert( pSeg->iRoot!=0 );
    rc = seekInBtree(pCsr, pSeg, iTopic, pKey, nKey, 0, &pLeaf);
    if( rc==LSM_OK ) segmentPtrSetPage(pPtr, pLeaf);
  }else{
    LsmPgno iStart = (iPg==0) ? pSeg->iFirst : iPg;
    rc = segmentPtrLoadPage(pCsr->pDb->pFS, pPtr, iStart);
  }

  if( rc!=LSM_OK ) return rc;
  return segmentPtrSeek(pCsr, pPtr, iTopic, pKey, nKey, eSeek, piPtr, pbStop);
}
/*
** Seek all segment pointers of one level to key (iTopic, pKey, nKey).
**
** aPtr[0] is the pointer for the level's left-hand segment and
** aPtr[1..nRight] are those for its right-hand segments.
**
** If the level has no right-hand segments, or the key is smaller than
** the level's split-key, only the left-hand segment is searched and the
** right-hand pointers are reset. Otherwise (and also when an
** LSM_SEEK_GE search of the left-hand side finds nothing) the left-hand
** pointer is reset and the right-hand segments are searched in order,
** each one starting from the page pointer produced by the previous one.
** A right-hand pointer that settles on a key smaller than the split-key
** is invalidated, unless it is on a START_DELETE record, in which case
** it is rewritten as a non-insert entry at the split-key. For
** LSM_SEEK_LE, if no right-hand pointer ends up on a key, the left-hand
** pointer is moved to its last record.
**
** *piPgno is in/out: on entry the page pointer to start from, on exit
** the pointer to pass to the next level. *pbStop is set if the search
** need go no further (LSM_SEEK_EQ only).
*/
static int seekInLevel(
MultiCursor *pCsr,
SegmentPtr *aPtr,
int eSeek,
int iTopic,
void *pKey, int nKey,
LsmPgno *piPgno,
int *pbStop
){
Level *pLvl = aPtr[0].pLevel;
int rc = LSM_OK;
LsmPgno iOut = 0;
int res = -1;
int nRhs = pLvl->nRight;
int bStop = 0;
/* res stays -1 (search the left-hand side) when there is no split-key. */
if( nRhs ){
res = sortedKeyCompare(pCsr->pDb->xCmp, iTopic, pKey, nKey,
pLvl->iSplitTopic, pLvl->pSplitKey, pLvl->nSplitKey
);
}
if( res<0 ){
int i;
LsmPgno iPtr = 0;
if( nRhs==0 ) iPtr = *piPgno;
rc = seekInSegment(
pCsr, &aPtr[0], iTopic, pKey, nKey, iPtr, eSeek, &iOut, &bStop
);
/* Nothing >= the key on the left: fall through to the right-hand side. */
if( rc==LSM_OK && nRhs>0 && eSeek==LSM_SEEK_GE && aPtr[0].pPg==0 ){
res = 0;
}
for(i=1; i<=nRhs; i++){
segmentPtrReset(&aPtr[i], LSM_SEGMENTPTR_FREE_THRESHOLD);
}
}
if( res>=0 ){
int bHit = 0;
LsmPgno iPtr = *piPgno;
int i;
segmentPtrReset(&aPtr[0], LSM_SEGMENTPTR_FREE_THRESHOLD);
for(i=1; rc==LSM_OK && i<=nRhs && bStop==0; i++){
SegmentPtr *pPtr = &aPtr[i];
iOut = 0;
rc = seekInSegment(
pCsr, pPtr, iTopic, pKey, nKey, iPtr, eSeek, &iOut, &bStop
);
iPtr = iOut;
if( pPtr->pPg ){
res = sortedKeyCompare(pCsr->pDb->xCmp,
rtTopic(pPtr->eType), pPtr->pKey, pPtr->nKey,
pLvl->iSplitTopic, pLvl->pSplitKey, pLvl->nSplitKey
);
if( res<0 ){
if( pPtr->eType & LSM_START_DELETE ){
pPtr->eType &= ~LSM_INSERT;
pPtr->pKey = pLvl->pSplitKey;
pPtr->nKey = pLvl->nSplitKey;
pPtr->pVal = 0;
pPtr->nVal = 0;
}else{
segmentPtrReset(pPtr, LSM_SEGMENTPTR_FREE_THRESHOLD);
}
}
}
if( aPtr[i].pKey ) bHit = 1;
}
if( rc==LSM_OK && eSeek==LSM_SEEK_LE && bHit==0 ){
rc = segmentPtrEnd(pCsr, &aPtr[0], 1);
}
}
assert( eSeek==LSM_SEEK_EQ || bStop==0 );
*piPgno = iOut;
*pbStop = bStop;
return rc;
}
/*
** Fetch the current key of one of the data sources of cursor pCsr.
**
** iKey selects the source: CURSOR_DATA_TREE0/TREE1 for the in-memory
** tree cursors, CURSOR_DATA_SYSTEM for the free-list, and
** CURSOR_DATA_SEGMENT+i for segment pointer pCsr->aPtr[i]. The index
** one past the last segment pointer selects the b-tree cursor.
**
** The flags, key pointer and key size are written to *peType, *ppKey
** and *pnKey, each of which may be NULL. If the source is at EOF (or is
** not available) the outputs are 0, NULL and 0.
**
** For CURSOR_DATA_SYSTEM, each free-list entry yields two keys (even and
** odd values of pCsr->iFree), visited from the last entry towards the
** first. The 4-byte key is the bitwise complement of a block number and
** is written into pCsr->pSystemVal.
*/
static void multiCursorGetKey(
MultiCursor *pCsr,
int iKey,
int *peType,
void **ppKey,
int *pnKey
){
int nKey = 0;
void *pKey = 0;
int eType = 0;
switch( iKey ){
case CURSOR_DATA_TREE0:
case CURSOR_DATA_TREE1: {
TreeCursor *pTreeCsr = pCsr->apTreeCsr[iKey-CURSOR_DATA_TREE0];
if( lsmTreeCursorValid(pTreeCsr) ){
lsmTreeCursorKey(pTreeCsr, &eType, &pKey, &nKey);
}
break;
}
case CURSOR_DATA_SYSTEM: {
Snapshot *pWorker = pCsr->pDb->pWorker;
if( pWorker && (pCsr->flags & CURSOR_FLUSH_FREELIST) ){
int nEntry = pWorker->freelist.nEntry;
if( pCsr->iFree < (nEntry*2) ){
FreelistEntry *aEntry = pWorker->freelist.aEntry;
int i = nEntry - 1 - (pCsr->iFree / 2);
u32 iKey2 = 0;
if( (pCsr->iFree % 2) ){
/* Odd step: an END_DELETE key for the block before entry i. */
eType = LSM_END_DELETE|LSM_SYSTEMKEY;
iKey2 = aEntry[i].iBlk-1;
}else if( aEntry[i].iId>=0 ){
/* Even step, entry with a non-negative id: an INSERT key. */
eType = LSM_INSERT|LSM_SYSTEMKEY;
iKey2 = aEntry[i].iBlk;
/* Also mark as END_DELETE if the following entry is a delete
** (negative id) for the very next block. */
if( i<(nEntry-1) && aEntry[i+1].iBlk==iKey2+1 && aEntry[i+1].iId<0 ){
eType |= LSM_END_DELETE;
}
}else{
/* Even step, entry with a negative id: a START_DELETE key for the
** block after entry i. */
eType = LSM_START_DELETE|LSM_SYSTEMKEY;
iKey2 = aEntry[i].iBlk + 1;
}
/* Also mark as START_DELETE if the preceding entry is a delete
** (negative id) for the block just before this key. */
if( i>0 && aEntry[i-1].iBlk==iKey2-1 && aEntry[i-1].iId<0 ){
eType |= LSM_START_DELETE;
}
pKey = pCsr->pSystemVal;
nKey = 4;
lsmPutU32(pKey, ~iKey2);
}
}
break;
}
default: {
int iPtr = iKey - CURSOR_DATA_SEGMENT;
assert( iPtr>=0 );
if( iPtr==pCsr->nPtr ){
if( pCsr->pBtCsr ){
pKey = pCsr->pBtCsr->pKey;
nKey = pCsr->pBtCsr->nKey;
eType = pCsr->pBtCsr->eType;
}
}else if( iPtr<pCsr->nPtr ){
SegmentPtr *pPtr = &pCsr->aPtr[iPtr];
if( pPtr->pPg ){
pKey = pPtr->pKey;
nKey = pPtr->nKey;
eType = pPtr->eType;
}
}
break;
}
}
if( peType ) *peType = eType;
if( pnKey ) *pnKey = nKey;
if( ppKey ) *ppKey = pKey;
}
/*
** Compare two database keys, each described by its type flags, key blob
** and key size. The keys are first ordered by (topic, key) using the
** database comparison function.
**
** If they compare equal and the cursor has CURSOR_IGNORE_DELETE set, ties
** are broken on the delete markers: an entry whose type (restricted to the
** insert/delete bits) is exactly LSM_START_DELETE ranks as +1, one that is
** exactly LSM_END_DELETE ranks as -1, anything else as 0. The result is
** then the lhs rank minus the rhs rank.
*/
static int sortedDbKeyCompare(
  MultiCursor *pCsr,
  int iLhsFlags, void *pLhsKey, int nLhsKey,
  int iRhsFlags, void *pRhsKey, int nRhsKey
){
  const int mask = LSM_POINT_DELETE|LSM_INSERT|LSM_END_DELETE |LSM_START_DELETE;
  int eLhs;
  int eRhs;
  int rankLhs;
  int rankRhs;
  int cmp;

  cmp = sortedKeyCompare(pCsr->pDb->xCmp,
      rtTopic(iLhsFlags), pLhsKey, nLhsKey,
      rtTopic(iRhsFlags), pRhsKey, nRhsKey
  );
  if( cmp!=0 || (pCsr->flags & CURSOR_IGNORE_DELETE)==0 ){
    return cmp;
  }

  eLhs = (iLhsFlags & mask);
  eRhs = (iRhsFlags & mask);
  rankLhs = (eLhs==LSM_END_DELETE) ? -1 : ((eLhs==LSM_START_DELETE) ? +1 : 0);
  rankRhs = (eRhs==LSM_END_DELETE) ? -1 : ((eRhs==LSM_START_DELETE) ? +1 : 0);
  return rankLhs - rankRhs;
}
/*
** Recompute entry iOut of the cursor's aTree[] array. Entries in the upper
** half of aTree[] compare a fixed pair of sub-cursors; entries in the lower
** half compare the winners stored in their two child entries.
**
** A sub-cursor with no current key always loses. Otherwise the smaller key
** wins (the larger one when bReverse is true). If the keys compare equal,
** the second candidate wins only when the first has neither LSM_INSERT nor
** LSM_POINT_DELETE set while the second has at least one of them.
*/
static void multiCursorDoCompare(MultiCursor *pCsr, int iOut, int bReverse){
  int iFirst;
  int iSecond;
  int iWinner;
  int nHalf;
  void *pFirstKey; int nFirstKey; int eFirst;
  void *pSecondKey; int nSecondKey; int eSecond;

  assert( pCsr->aTree && iOut<pCsr->nTree );

  nHalf = pCsr->nTree/2;
  if( iOut<nHalf ){
    iFirst = pCsr->aTree[iOut*2];
    iSecond = pCsr->aTree[iOut*2+1];
  }else{
    iFirst = (iOut - nHalf) * 2;
    iSecond = iFirst + 1;
  }

  multiCursorGetKey(pCsr, iFirst, &eFirst, &pFirstKey, &nFirstKey);
  multiCursorGetKey(pCsr, iSecond, &eSecond, &pSecondKey, &nSecondKey);

  if( pFirstKey==0 ){
    iWinner = iSecond;
  }else if( pSecondKey==0 ){
    iWinner = iFirst;
  }else{
    int cmp = sortedDbKeyCompare(pCsr,
        eFirst, pFirstKey, nFirstKey, eSecond, pSecondKey, nSecondKey
    );
    if( bReverse ) cmp = -cmp;

    if( cmp<0 ){
      iWinner = iFirst;
    }else if( cmp>0 ){
      iWinner = iSecond;
    }else{
      int bFirstEmpty = (eFirst & (LSM_INSERT|LSM_POINT_DELETE))==0;
      int bSecondEmpty = (eSecond & (LSM_INSERT|LSM_POINT_DELETE))==0;
      iWinner = (bFirstEmpty && !bSecondEmpty) ? iSecond : iFirst;
    }
  }

  pCsr->aTree[iOut] = iWinner;
}
/*
** Advance segment pointer pCsr->aPtr[iPtr] to its next entry (or to the
** previous entry if bReverse is true).
**
** If the pointer belongs to a level that has right-hand segments (and the
** cursor holds more pointers than that level has right-hand segments) and
** the advance left the pointer without a page, the sibling pointers of the
** same level may need to be repositioned:
**
**   * Moving forwards off the end of the left-hand segment positions every
**     right-hand pointer with sortedRhsFirst().
**
**   * Moving backwards off the start of a right-hand segment, when no
**     right-hand pointer of the level holds a page any more, positions the
**     left-hand pointer with segmentPtrEnd(..., 1).
**
** After such a fix-up the whole aTree[] array is rebuilt.
**
** Returns LSM_OK or the error code of the first failing helper.
*/
static int segmentCursorAdvance(
  MultiCursor *pCsr,
  int iPtr,
  int bReverse
){
  int rc;
  SegmentPtr *pPtr = &pCsr->aPtr[iPtr];
  Level *pLvl = pPtr->pLevel;
  int bComposite;

  rc = segmentPtrAdvance(pCsr, pPtr, bReverse);
  if( rc!=LSM_OK ) return rc;

  bComposite = (pLvl->nRight>0 && pCsr->nPtr>pLvl->nRight);
  if( bComposite && pPtr->pPg==0 ){
    int bFix = 0;

    /* True when advancing forwards on the lhs pointer or backwards on a
    ** rhs pointer. */
    if( (bReverse==0)==(pPtr->pSeg==&pLvl->lhs) ){
      int i;
      if( bReverse ){
        /* Locate the lhs pointer of this level from the rhs index. */
        SegmentPtr *pLhs = &pCsr->aPtr[iPtr - 1 - (pPtr->pSeg - pLvl->aRhs)];
        for(i=0; i<pLvl->nRight; i++){
          if( pLhs[i+1].pPg ) break;
        }
        if( i==pLvl->nRight ){
          bFix = 1;
          rc = segmentPtrEnd(pCsr, pLhs, 1);
        }
      }else{
        bFix = 1;
        for(i=0; rc==LSM_OK && i<pLvl->nRight; i++){
          rc = sortedRhsFirst(pCsr, pLvl, &pCsr->aPtr[iPtr+1+i]);
        }
      }
    }

    if( bFix ){
      int i;
      for(i=pCsr->nTree-1; i>0; i--){
        multiCursorDoCompare(pCsr, i, bReverse);
      }
    }
  }

  return rc;
}
/*
** Release everything the multi-cursor owns apart from the MultiCursor
** structure itself and its key/val blobs: both tree cursors, every segment
** pointer, the b-tree cursor, and the aPtr[], aTree[] and pSystemVal
** allocations. All the corresponding fields are left zeroed.
*/
static void mcursorFreeComponents(MultiCursor *pCsr){
  lsm_env *pEnv = pCsr->pDb->pEnv;
  int iTree;
  int iSeg;

  for(iTree=0; iTree<2; iTree++){
    lsmTreeCursorDestroy(pCsr->apTreeCsr[iTree]);
  }

  for(iSeg=0; iSeg<pCsr->nPtr; iSeg++){
    segmentPtrReset(&pCsr->aPtr[iSeg], 0);
  }
  btreeCursorFree(pCsr->pBtCsr);

  lsmFree(pEnv, pCsr->aPtr);
  lsmFree(pEnv, pCsr->aTree);
  lsmFree(pEnv, pCsr->pSystemVal);

  pCsr->aPtr = 0;
  pCsr->nPtr = 0;
  pCsr->aTree = 0;
  pCsr->nTree = 0;
  pCsr->pSystemVal = 0;
  pCsr->pBtCsr = 0;
  for(iTree=0; iTree<2; iTree++){
    pCsr->apTreeCsr[iTree] = 0;
  }
}
/*
** Close (without caching) every cursor on the database handle's list of
** cached cursors, then mark that list as empty.
*/
void lsmMCursorFreeCache(lsm_db *pDb){
  MultiCursor *pCsr = pDb->pCsrCache;
  while( pCsr ){
    /* Read the link before the cursor is closed. */
    MultiCursor *pFollowing = pCsr->pNext;
    lsmMCursorClose(pCsr, 0);
    pCsr = pFollowing;
  }
  pDb->pCsrCache = 0;
}
/*
** Close cursor pCsr. Passing NULL is a no-op.
**
** The cursor is first unlinked from the database handle's list of open
** cursors. If bCache is non-zero the cursor is then stripped of its page
** references, its tree cursors are reset, and it is pushed onto the
** handle's cursor cache. Otherwise all of its resources, and the cursor
** itself, are freed.
*/
void lsmMCursorClose(MultiCursor *pCsr, int bCache){
  lsm_db *pDb;
  MultiCursor **ppLink;
  int iSeg;

  if( pCsr==0 ) return;
  pDb = pCsr->pDb;

  /* Unlink from pDb->pCsr, if present. */
  ppLink = &pDb->pCsr;
  while( *ppLink && *ppLink!=pCsr ){
    ppLink = &((*ppLink)->pNext);
  }
  if( *ppLink ){
    *ppLink = pCsr->pNext;
  }

  if( bCache==0 ){
    sortedBlobFree(&pCsr->key);
    sortedBlobFree(&pCsr->val);
    mcursorFreeComponents(pCsr);
    lsmFree(pDb->pEnv, pCsr);
    return;
  }

  assert( !pCsr->pBtCsr );
  for(iSeg=0; iSeg<pCsr->nPtr; iSeg++){
    SegmentPtr *pSegPtr = &pCsr->aPtr[iSeg];
    lsmFsPageRelease(pSegPtr->pPg);
    pSegPtr->pPg = 0;
  }
  lsmTreeCursorReset(pCsr->apTreeCsr[0]);
  lsmTreeCursorReset(pCsr->apTreeCsr[1]);

  /* Push onto the head of the cache list. */
  pCsr->pNext = pDb->pCsrCache;
  pDb->pCsrCache = pCsr;
}
#define TREE_NONE 0
#define TREE_OLD 1
#define TREE_BOTH 2
/*
** Attach in-memory tree cursors to pCsr according to eTree:
**
**   TREE_NONE  - attach nothing.
**   TREE_OLD   - attach a cursor on the "old" tree (apTreeCsr[1]), but only
**                if an old tree exists and its log offset differs from
**                that of snapshot pSnap.
**   TREE_BOTH  - as TREE_OLD, and also attach a cursor on the current tree
**                (apTreeCsr[0]).
**
** Returns LSM_OK or the error code from lsmTreeCursorNew().
*/
static int multiCursorAddTree(MultiCursor *pCsr, Snapshot *pSnap, int eTree){
  lsm_db *db = pCsr->pDb;
  int rc = LSM_OK;

  if( eTree==TREE_NONE ) return LSM_OK;

  if( lsmTreeHasOld(db) && db->treehdr.iOldLog!=pSnap->iLogOff ){
    rc = lsmTreeCursorNew(db, 1, &pCsr->apTreeCsr[1]);
  }
  if( eTree==TREE_BOTH && rc==LSM_OK ){
    rc = lsmTreeCursorNew(db, 0, &pCsr->apTreeCsr[0]);
  }
  return rc;
}
/*
** Configure pCsr, which must not yet have an aPtr[] array, with one segment
** pointer for each right-hand segment of level pLvl. The level must have at
** least one right-hand segment.
**
** Returns LSM_OK, or LSM_NOMEM_BKPT if the array cannot be allocated.
*/
static int multiCursorAddRhs(MultiCursor *pCsr, Level *pLvl){
  int nSeg = pLvl->nRight;
  int iSeg;

  assert( pLvl->nRight>0 );
  assert( pCsr->aPtr==0 );

  pCsr->aPtr = lsmMallocZero(pCsr->pDb->pEnv, sizeof(SegmentPtr) * nSeg);
  if( pCsr->aPtr==0 ){
    return LSM_NOMEM_BKPT;
  }
  pCsr->nPtr = nSeg;

  for(iSeg=0; iSeg<nSeg; iSeg++){
    SegmentPtr *pSegPtr = &pCsr->aPtr[iSeg];
    pSegPtr->pSeg = &pLvl->aRhs[iSeg];
    pSegPtr->pLevel = pLvl;
  }
  return LSM_OK;
}
/*
** Append the segments of level pLvl to pCsr->aPtr[]: first the left-hand
** segment, then each right-hand segment in order. The array must already
** be large enough. If the level has right-hand segments but no split key
** loaded yet, sortedSplitkey() is called to load it.
**
** This is a no-op if *pRc is not LSM_OK on entry. Errors from
** sortedSplitkey() are reported through *pRc.
*/
static void multiCursorAddOne(MultiCursor *pCsr, Level *pLvl, int *pRc){
  SegmentPtr *pSlot;
  int iRhs;

  if( *pRc!=LSM_OK ) return;

  pSlot = &pCsr->aPtr[pCsr->nPtr];
  pSlot->pLevel = pLvl;
  pSlot->pSeg = &pLvl->lhs;

  for(iRhs=0; iRhs<pLvl->nRight; iRhs++){
    pSlot++;
    pSlot->pLevel = pLvl;
    pSlot->pSeg = &pLvl->aRhs[iRhs];
  }

  if( pLvl->nRight && pLvl->pSplitKey==0 ){
    sortedSplitkey(pCsr->pDb, pLvl, pRc);
  }
  pCsr->nPtr += 1 + pLvl->nRight;
}
/*
** Configure pCsr, which must not yet have an aPtr[] array, with a segment
** pointer for every segment of every level in snapshot pSnap that does not
** have the LEVEL_INCOMPLETE flag set.
**
** Returns LSM_OK or an error code from allocation or split-key loading.
*/
static int multiCursorAddAll(MultiCursor *pCsr, Snapshot *pSnap){
  int rc = LSM_OK;
  int nSeg = 0;
  Level *pLvl;

  /* Pass 1: count the segment pointers required. */
  for(pLvl=pSnap->pLevel; pLvl; pLvl=pLvl->pNext){
    if( (pLvl->flags & LEVEL_INCOMPLETE)==0 ){
      nSeg += (1 + pLvl->nRight);
    }
  }

  assert( pCsr->aPtr==0 );
  pCsr->aPtr = lsmMallocZeroRc(pCsr->pDb->pEnv, sizeof(SegmentPtr) * nSeg, &rc);

  /* Pass 2: populate the array (a no-op once rc is set). */
  for(pLvl=pSnap->pLevel; pLvl; pLvl=pLvl->pNext){
    if( pLvl->flags & LEVEL_INCOMPLETE ) continue;
    multiCursorAddOne(pCsr, pLvl, &rc);
  }

  return rc;
}
/*
** Initialize pCsr as a cursor over snapshot pSnap: add all of the
** snapshot's complete levels, then both in-memory trees. The
** CURSOR_IGNORE_SYSTEM and CURSOR_IGNORE_DELETE flags are set whether or
** not an error occurs.
*/
static int multiCursorInit(MultiCursor *pCsr, Snapshot *pSnap){
  int rc = multiCursorAddAll(pCsr, pSnap);
  if( rc==LSM_OK ){
    rc = multiCursorAddTree(pCsr, pSnap, TREE_BOTH);
  }
  pCsr->flags |= CURSOR_IGNORE_SYSTEM;
  pCsr->flags |= CURSOR_IGNORE_DELETE;
  return rc;
}
/*
** Allocate a zeroed MultiCursor for database handle db and link it at the
** head of the handle's list of open cursors. Returns the new cursor, or
** NULL if the allocation fails (the allocator reports the error via *pRc).
*/
static MultiCursor *multiCursorNew(lsm_db *db, int *pRc){
  MultiCursor *pNew;

  pNew = (MultiCursor *)lsmMallocZeroRc(db->pEnv, sizeof(MultiCursor), pRc);
  if( pNew==0 ) return 0;

  pNew->pDb = db;
  pNew->pNext = db->pCsr;
  db->pCsr = pNew;
  return pNew;
}
/*
** For every open cursor on database handle pDb, reload the current key of
** its b-tree cursor (if it has one) and the current cell of each of its
** segment pointers. Return codes from the reload helpers are not checked.
*/
void lsmSortedRemap(lsm_db *pDb){
  MultiCursor *pCsr = pDb->pCsr;
  while( pCsr ){
    int iSeg;
    if( pCsr->pBtCsr ){
      btreeCursorLoadKey(pCsr->pBtCsr);
    }
    for(iSeg=0; iSeg<pCsr->nPtr; iSeg++){
      SegmentPtr *pSegPtr = &pCsr->aPtr[iSeg];
      segmentPtrLoadCell(pSegPtr, pSegPtr->iCell);
    }
    pCsr = pCsr->pNext;
  }
}
/*
** Set the CURSOR_READ_SEPARATORS flag on pCsr, provided the cursor has at
** least one segment pointer.
*/
static void multiCursorReadSeparators(MultiCursor *pCsr){
  if( pCsr->nPtr<=0 ) return;
  pCsr->flags = pCsr->flags | CURSOR_READ_SEPARATORS;
}
/*
** Set the CURSOR_IGNORE_DELETE flag on pCsr. A NULL cursor is ignored.
*/
static void multiCursorIgnoreDelete(MultiCursor *pCsr){
  if( pCsr==0 ) return;
  pCsr->flags = pCsr->flags | CURSOR_IGNORE_DELETE;
}
/*
** Configure pCsr to visit free-list entries: set CURSOR_FLUSH_FREELIST and
** allocate the 12-byte (4 + 8) pSystemVal buffer used for synthesized
** free-list keys and values. Returns LSM_OK or an allocation error.
*/
static int multiCursorVisitFreelist(MultiCursor *pCsr){
  int rc = LSM_OK;
  void *pBuf;

  pCsr->flags |= CURSOR_FLUSH_FREELIST;
  pBuf = lsmMallocRc(pCsr->pDb->pEnv, 4 + 8, &rc);
  pCsr->pSystemVal = pBuf;
  return rc;
}
/*
** Open a cursor on the client snapshot of database handle pDb and return
** it in *ppCsr.
**
** If the handle has a cached cursor it is reused: it is moved from the
** cache list to the open-cursor list, its "old tree" cursor is created or
** destroyed to match whether an old in-memory tree currently needs to be
** read, and its flags are reset. Otherwise a new cursor is allocated and
** initialized.
**
** On error the cursor is closed and *ppCsr is set to NULL.
*/
int lsmMCursorNew(
  lsm_db *pDb,
  MultiCursor **ppCsr
){
  int rc = LSM_OK;
  MultiCursor *pNew = pDb->pCsrCache;

  if( pNew==0 ){
    pNew = multiCursorNew(pDb, &rc);
    if( rc==LSM_OK ) rc = multiCursorInit(pNew, pDb->pClient);
  }else{
    int bNeedOld;

    /* Move the cursor from the cache to the list of open cursors. */
    pDb->pCsrCache = pNew->pNext;
    pNew->pNext = pDb->pCsr;
    pDb->pCsr = pNew;

    bNeedOld = (lsmTreeHasOld(pDb) && pDb->treehdr.iOldLog!=pDb->pClient->iLogOff);
    if( bNeedOld ){
      if( pNew->apTreeCsr[1]==0 ){
        rc = lsmTreeCursorNew(pDb, 1, &pNew->apTreeCsr[1]);
      }
    }else if( pNew->apTreeCsr[1] ){
      lsmTreeCursorDestroy(pNew->apTreeCsr[1]);
      pNew->apTreeCsr[1] = 0;
    }

    pNew->flags = (CURSOR_IGNORE_SYSTEM | CURSOR_IGNORE_DELETE);
  }

  if( rc!=LSM_OK ){
    lsmMCursorClose(pNew, 0);
    pNew = 0;
  }
  assert( (rc==LSM_OK)==(pNew!=0) );
  *ppCsr = pNew;
  return rc;
}
/*
** Fetch the current value of sub-cursor iVal of multi-cursor pCsr (see
** multiCursorGetKey() for the meaning of the index). *ppVal and *pnVal are
** set to the value blob, or to NULL and 0 if the sub-cursor has no current
** entry or value.
**
** For CURSOR_DATA_SYSTEM the value is the free-list entry's 64-bit id,
** written into bytes 4..11 of the pSystemVal buffer; it is only produced
** for even values of pCsr->iFree.
**
** Always returns LSM_OK in the current implementation.
*/
static int multiCursorGetVal(
  MultiCursor *pCsr,
  int iVal,
  void **ppVal,
  int *pnVal
){
  int rc = LSM_OK;

  *ppVal = 0;
  *pnVal = 0;

  if( iVal==CURSOR_DATA_TREE0 || iVal==CURSOR_DATA_TREE1 ){
    TreeCursor *pTreeCsr = pCsr->apTreeCsr[iVal-CURSOR_DATA_TREE0];
    if( lsmTreeCursorValid(pTreeCsr) ){
      lsmTreeCursorValue(pTreeCsr, ppVal, pnVal);
    }
  }else if( iVal==CURSOR_DATA_SYSTEM ){
    Snapshot *pWorker = pCsr->pDb->pWorker;
    if( pWorker && (pCsr->iFree % 2)==0 ){
      int nEntry = pWorker->freelist.nEntry;
      if( pCsr->iFree < (nEntry*2) ){
        int iEntry = nEntry - 1 - (pCsr->iFree / 2);
        u8 *aVal = &((u8 *)(pCsr->pSystemVal))[4];
        lsmPutU64(aVal, pWorker->freelist.aEntry[iEntry].iId);
        *ppVal = aVal;
        *pnVal = 8;
      }
    }
  }else{
    int iPtr = iVal-CURSOR_DATA_SEGMENT;
    if( iPtr<pCsr->nPtr ){
      SegmentPtr *pSegPtr = &pCsr->aPtr[iPtr];
      if( pSegPtr->pPg ){
        *ppVal = pSegPtr->pVal;
        *pnVal = pSegPtr->nVal;
      }
    }
  }

  assert( rc==LSM_OK || (*ppVal==0 && *pnVal==0) );
  return rc;
}
static int multiCursorAdvance(MultiCursor *pCsr, int bReverse);
/*
** Iterate through the system-topic entries stored in the database levels
** and invoke callback x(pCtx, iBlk, iSnap) for each one. Iteration stops
** early if the callback returns non-zero.
**
** Each visited entry must have a 4-byte key and an 8-byte value, otherwise
** LSM_CORRUPT_BKPT is returned. iBlk is the bitwise complement of the
** 32-bit key and iSnap is the 64-bit value.
**
** The caller must hold the worker snapshot (pDb->pWorker). If
** pDb->bIncrMerge is set, a snapshot deserialized from pShmhdr->aSnap1 is
** walked instead of the worker snapshot and freed before returning.
**
** If bReverse is zero the walk starts at the last cursor entry and steps
** backwards; otherwise it starts at the first system-topic entry and steps
** forwards.
*/
int lsmSortedWalkFreelist(
lsm_db *pDb,
int bReverse,
int (*x)(void *, int, i64),
void *pCtx
){
MultiCursor *pCsr;
int rc = LSM_OK;
Snapshot *pSnap = 0;
assert( pDb->pWorker );
if( pDb->bIncrMerge ){
rc = lsmCheckpointDeserialize(pDb, 0, pDb->pShmhdr->aSnap1, &pSnap);
if( rc!=LSM_OK ) return rc;
}else{
pSnap = pDb->pWorker;
}
/* Build a cursor over the levels only (no in-memory trees). */
pCsr = multiCursorNew(pDb, &rc);
if( pCsr ){
rc = multiCursorAddAll(pCsr, pSnap);
pCsr->flags |= CURSOR_IGNORE_DELETE;
}
if( rc==LSM_OK ){
if( bReverse==0 ){
rc = lsmMCursorLast(pCsr);
}else{
/* Seek to the smallest key with topic 1. */
rc = lsmMCursorSeek(pCsr, 1, "", 0, LSM_SEEK_GE);
}
while( rc==LSM_OK && lsmMCursorValid(pCsr) && rtIsSystem(pCsr->eType) ){
void *pKey; int nKey;
void *pVal = 0; int nVal = 0;
rc = lsmMCursorKey(pCsr, &pKey, &nKey);
if( rc==LSM_OK ) rc = lsmMCursorValue(pCsr, &pVal, &nVal);
if( rc==LSM_OK && (nKey!=4 || nVal!=8) ) rc = LSM_CORRUPT_BKPT;
if( rc==LSM_OK ){
int iBlk;
i64 iSnap;
iBlk = (int)(~(lsmGetU32((u8 *)pKey)));
iSnap = (i64)lsmGetU64((u8 *)pVal);
if( x(pCtx, iBlk, iSnap) ) break;
/* The cursor moves in the opposite sense to bReverse. */
rc = multiCursorAdvance(pCsr, !bReverse);
}
}
}
lsmMCursorClose(pCsr, 0);
/* Only free the snapshot if it was deserialized above. */
if( pSnap!=pDb->pWorker ){
lsmFreeSnapshot(pDb->pEnv, pSnap);
}
return rc;
}
/*
** Load the serialized free-list stored in the database, if there is one.
**
** A cursor over the levels of the worker snapshot is moved to its last
** entry. If that entry is a system-topic insert whose key is the 8 bytes
** "FREELIST", a copy of its value is returned in *ppVal (allocated with
** lsmMallocRc(); the caller owns it) and its size in *pnVal. Otherwise
** *ppVal and *pnVal are left untouched (both must be zero on entry).
**
** The caller must hold the worker snapshot. Returns LSM_OK or an error
** code. The temporary cursor is always closed before returning, including
** when setting it up fails.
*/
int lsmSortedLoadFreelist(
  lsm_db *pDb,
  void **ppVal,
  int *pnVal
){
  MultiCursor *pCsr;
  int rc = LSM_OK;

  assert( pDb->pWorker );
  assert( *ppVal==0 && *pnVal==0 );

  pCsr = multiCursorNew(pDb, &rc);
  if( pCsr ){
    rc = multiCursorAddAll(pCsr, pDb->pWorker);
    pCsr->flags |= CURSOR_IGNORE_DELETE;
  }

  if( rc==LSM_OK ){
    rc = lsmMCursorLast(pCsr);
    if( rc==LSM_OK
     && rtIsWrite(pCsr->eType) && rtIsSystem(pCsr->eType)
     && pCsr->key.nData==8
     && 0==memcmp(pCsr->key.pData, "FREELIST", 8)
    ){
      void *pVal; int nVal;
      rc = lsmMCursorValue(pCsr, &pVal, &nVal);
      if( rc==LSM_OK ){
        *ppVal = lsmMallocRc(pDb->pEnv, nVal, &rc);
        if( *ppVal ){
          memcpy(*ppVal, pVal, nVal);
          *pnVal = nVal;
        }
      }
    }
  }

  /* Close unconditionally (a NULL cursor is a no-op) so that the cursor is
  ** neither leaked nor left on pDb->pCsr if multiCursorAddAll() failed. */
  lsmMCursorClose(pCsr, 0);

  return rc;
}
/*
** Allocate the aTree[] array of pCsr if it has not been allocated already.
** nTree is set to the smallest power of two (minimum 2) that is at least
** the number of sub-cursors: the CURSOR_DATA_SEGMENT fixed slots, one per
** segment pointer, plus one if there is a b-tree cursor. The array itself
** holds nTree*2 zeroed integers.
*/
static int multiCursorAllocTree(MultiCursor *pCsr){
  int rc = LSM_OK;
  int nNeeded;
  int nLeaf;
  int nByte;

  if( pCsr->aTree ) return rc;

  nNeeded = CURSOR_DATA_SEGMENT + pCsr->nPtr + (pCsr->pBtCsr!=0);
  for(nLeaf=2; nLeaf<nNeeded; nLeaf=nLeaf*2){
    /* keep doubling */
  }
  pCsr->nTree = nLeaf;

  nByte = sizeof(int)*pCsr->nTree*2;
  pCsr->aTree = (int *)lsmMallocZeroRc(pCsr->pDb->pEnv, nByte, &rc);
  return rc;
}
/*
** Copy the key of the sub-cursor currently at the root of aTree[] into
** pCsr->key and record its type in pCsr->eType. This is a no-op if *pRc is
** not LSM_OK on entry; otherwise *pRc receives the result of the copy.
*/
static void multiCursorCacheKey(MultiCursor *pCsr, int *pRc){
  void *pCurKey;
  int nCurKey;

  if( *pRc!=LSM_OK ) return;

  multiCursorGetKey(pCsr, pCsr->aTree[1], &pCsr->eType, &pCurKey, &nCurKey);
  *pRc = sortedBlobSet(pCsr->pDb->pEnv, &pCsr->key, pCurKey, nCurKey);
}
#ifdef LSM_DEBUG_EXPENSIVE
/*
** Debugging aid, compiled only when LSM_DEBUG_EXPENSIVE is defined.
** Rebuild the cursor's aTree[] array from scratch and assert that the
** first nTree entries match the array the cursor currently holds. The
** cursor's own aTree/nTree are restored before returning. If the scratch
** array cannot be allocated the check is silently skipped.
*/
static void assertCursorTree(MultiCursor *pCsr){
int bRev = !!(pCsr->flags & CURSOR_PREV_OK);
int *aSave = pCsr->aTree;
int nSave = pCsr->nTree;
int rc;
/* Clear aTree so that multiCursorAllocTree() allocates a fresh array. */
pCsr->aTree = 0;
pCsr->nTree = 0;
rc = multiCursorAllocTree(pCsr);
if( rc==LSM_OK ){
int i;
for(i=pCsr->nTree-1; i>0; i--){
multiCursorDoCompare(pCsr, i, bRev);
}
assert( nSave==pCsr->nTree
&& 0==memcmp(aSave, pCsr->aTree, sizeof(int)*nSave)
);
lsmFree(pCsr->pDb->pEnv, pCsr->aTree);
}
pCsr->aTree = aSave;
pCsr->nTree = nSave;
}
#else
/* Without LSM_DEBUG_EXPENSIVE the check expands to nothing. */
# define assertCursorTree(x)
#endif
/*
** Return non-zero if the entry the cursor currently points at (the cached
** pCsr->key / pCsr->eType) is one that should be visible to the caller, or
** zero if the cursor should move on.
**
** The entry is rejected if:
**
**   * CURSOR_IGNORE_DELETE is set, bDeleteOk is zero and the entry is not
**     an insert, or
**   * CURSOR_IGNORE_SYSTEM is set and the entry has a non-zero topic, or
**   * a sub-cursor with a lower index than the current one is positioned
**     on a range-delete marker that covers the entry (see the loop at the
**     end of this function).
**
** The cursor must have CURSOR_NEXT_OK or CURSOR_PREV_OK set.
*/
static int mcursorLocationOk(MultiCursor *pCsr, int bDeleteOk){
int eType = pCsr->eType;
int iKey;
int i;
int rdmask;
assert( pCsr->flags & (CURSOR_NEXT_OK|CURSOR_PREV_OK) );
assertCursorTree(pCsr);
/* The delete-marker flag of interest depends on the scan direction. */
rdmask = (pCsr->flags & CURSOR_NEXT_OK) ? LSM_END_DELETE : LSM_START_DELETE;
if( (pCsr->flags & CURSOR_IGNORE_DELETE) && bDeleteOk==0 ){
if( (eType & LSM_INSERT)==0 ) return 0;
}
if( (pCsr->flags & CURSOR_IGNORE_SYSTEM) && rtTopic(eType)!=0 ){
return 0;
}
#ifndef NDEBUG
/* Sanity checks on levels with right-hand segments: while the lhs pointer
** holds a page no rhs pointer of the level may, and any rhs pointer that
** holds a page must be at or after the level's split key. */
for(i=0; i<pCsr->nPtr; i++){
SegmentPtr *pPtr = &pCsr->aPtr[i];
Level *pLvl = pPtr->pLevel;
if( pLvl->nRight && pPtr->pPg ){
if( pPtr->pSeg==&pLvl->lhs ){
int j;
for(j=0; j<pLvl->nRight; j++) assert( pPtr[j+1].pPg==0 );
}else{
int res = sortedKeyCompare(pCsr->pDb->xCmp,
rtTopic(pPtr->eType), pPtr->pKey, pPtr->nKey,
pLvl->iSplitTopic, pLvl->pSplitKey, pLvl->nSplitKey
);
assert( res>=0 );
}
}
}
#endif
/* Examine each sub-cursor with a lower index than the current one. If it
** is positioned on an entry carrying the rdmask flag, the current entry is
** rejected - unless that entry has the same key as the current entry and
** either carries both START and END delete flags or CURSOR_IGNORE_DELETE
** is clear. */
iKey = pCsr->aTree[1];
for(i=0; i<iKey; i++){
int csrflags;
multiCursorGetKey(pCsr, i, &csrflags, 0, 0);
if( (rdmask & csrflags) ){
const int SD_ED = (LSM_START_DELETE|LSM_END_DELETE);
if( (csrflags & SD_ED)==SD_ED
|| (pCsr->flags & CURSOR_IGNORE_DELETE)==0
){
void *pKey; int nKey;
multiCursorGetKey(pCsr, i, 0, &pKey, &nKey);
if( 0==sortedKeyCompare(pCsr->pDb->xCmp,
rtTopic(eType), pCsr->key.pData, pCsr->key.nData,
rtTopic(csrflags), pKey, nKey
)){
continue;
}
}
return 0;
}
}
return 1;
}
/*
** Build the aTree[] array of pCsr (allocating it if required) from the
** current positions of all sub-cursors, cache the winning key, and - if
** the resulting position is not one the caller should see - advance the
** cursor in direction bRev until it is.
*/
static int multiCursorSetupTree(MultiCursor *pCsr, int bRev){
  int rc = multiCursorAllocTree(pCsr);

  if( rc==LSM_OK ){
    int iNode = pCsr->nTree;
    while( --iNode>0 ){
      multiCursorDoCompare(pCsr, iNode, bRev);
    }
  }

  assertCursorTree(pCsr);
  multiCursorCacheKey(pCsr, &rc);

  if( rc!=LSM_OK ) return rc;
  if( mcursorLocationOk(pCsr, 0) ) return rc;
  return multiCursorAdvance(pCsr, bRev);
}
/*
** Move the cursor to the last entry it can see (bLast!=0) or to the first
** (bLast==0). On success the cursor has CURSOR_PREV_OK (bLast) or
** CURSOR_NEXT_OK (!bLast) set and CURSOR_SEEK_EQ cleared.
**
** Returns LSM_OK or the error code of the first failing helper.
*/
static int multiCursorEnd(MultiCursor *pCsr, int bLast){
int rc = LSM_OK;
int i;
pCsr->flags &= ~(CURSOR_NEXT_OK | CURSOR_PREV_OK | CURSOR_SEEK_EQ);
pCsr->flags |= (bLast ? CURSOR_PREV_OK : CURSOR_NEXT_OK);
/* Restart the free-list iteration. */
pCsr->iFree = 0;
/* Position the in-memory tree cursors. */
for(i=0; rc==LSM_OK && i<2; i++){
if( pCsr->apTreeCsr[i] ){
rc = lsmTreeCursorEnd(pCsr->apTreeCsr[i], bLast);
}
}
/* Position the segment pointers, one level per iteration. pPtr is the
** first pointer of the level; the level's remaining pointers follow it in
** aPtr[] and are skipped by the "i += pLvl->nRight" at the loop end. */
for(i=0; rc==LSM_OK && i<pCsr->nPtr; i++){
SegmentPtr *pPtr = &pCsr->aPtr[i];
Level *pLvl = pPtr->pLevel;
int iRhs;
int bHit = 0;
if( bLast ){
/* Move every rhs pointer to its last entry. Only if none of them ends
** up holding a page is the lhs pointer moved to its last entry;
** otherwise the lhs pointer is reset. */
for(iRhs=0; iRhs<pLvl->nRight && rc==LSM_OK; iRhs++){
rc = segmentPtrEnd(pCsr, &pPtr[iRhs+1], 1);
if( pPtr[iRhs+1].pPg ) bHit = 1;
}
if( bHit==0 && rc==LSM_OK ){
rc = segmentPtrEnd(pCsr, pPtr, 1);
}else{
segmentPtrReset(pPtr, LSM_SEGMENTPTR_FREE_THRESHOLD);
}
}else{
/* pPtr is either the lhs pointer of the level or - for a cursor that
** holds only the rhs pointers - the pointer for aRhs[0]. */
int bLhs = (pPtr->pSeg==&pLvl->lhs);
assert( pPtr->pSeg==&pLvl->lhs || pPtr->pSeg==&pLvl->aRhs[0] );
if( bLhs ){
rc = segmentPtrEnd(pCsr, pPtr, 0);
if( pPtr->pKey ) bHit = 1;
}
/* If the lhs produced a key the rhs pointers are reset; otherwise each
** is positioned with sortedRhsFirst(). */
for(iRhs=0; iRhs<pLvl->nRight && rc==LSM_OK; iRhs++){
if( bHit ){
segmentPtrReset(&pPtr[iRhs+1], LSM_SEGMENTPTR_FREE_THRESHOLD);
}else{
rc = sortedRhsFirst(pCsr, pLvl, &pPtr[iRhs+bLhs]);
}
}
}
i += pLvl->nRight;
}
/* The b-tree cursor can only be moved to its first entry. */
if( rc==LSM_OK && pCsr->pBtCsr ){
assert( bLast==0 );
rc = btreeCursorFirst(pCsr->pBtCsr);
}
if( rc==LSM_OK ){
rc = multiCursorSetupTree(pCsr, bLast);
}
return rc;
}
/*
** Save the position of pCsr and release its components. If the cursor
** currently points at an entry supplied by one of the in-memory tree
** cursors, the key is first copied into pCsr->key. The components are
** freed whether or not that copy succeeds.
*/
int mcursorSave(MultiCursor *pCsr){
  int rc = LSM_OK;

  if( pCsr->aTree ){
    switch( pCsr->aTree[1] ){
      case CURSOR_DATA_TREE0:
      case CURSOR_DATA_TREE1:
        multiCursorCacheKey(pCsr, &rc);
        break;
      default:
        break;
    }
  }

  mcursorFreeComponents(pCsr);
  return rc;
}
/*
** Rebuild the components of pCsr against the client snapshot of pDb and,
** if the cursor has a saved key, seek back to it (passing +1 as the seek
** mode to lsmMCursorSeek()).
*/
int mcursorRestore(lsm_db *pDb, MultiCursor *pCsr){
  int rc = multiCursorInit(pCsr, pDb->pClient);

  if( rc!=LSM_OK || pCsr->key.pData==0 ){
    return rc;
  }
  return lsmMCursorSeek(pCsr,
      rtTopic(pCsr->eType), pCsr->key.pData, pCsr->key.nData, +1
  );
}
/*
** Save the position of every open cursor on database handle pDb, stopping
** at the first error. Returns LSM_OK or that error code.
*/
int lsmSaveCursors(lsm_db *pDb){
  int rc = LSM_OK;
  MultiCursor *pCsr = pDb->pCsr;

  while( rc==LSM_OK && pCsr ){
    rc = mcursorSave(pCsr);
    pCsr = pCsr->pNext;
  }
  return rc;
}
/*
** Restore every open cursor on database handle pDb, stopping at the first
** error. Returns LSM_OK or that error code.
*/
int lsmRestoreCursors(lsm_db *pDb){
  int rc = LSM_OK;
  MultiCursor *pCsr = pDb->pCsr;

  while( rc==LSM_OK && pCsr ){
    rc = mcursorRestore(pDb, pCsr);
    pCsr = pCsr->pNext;
  }
  return rc;
}
/*
** Move the cursor to the first entry it can see.
*/
int lsmMCursorFirst(MultiCursor *pCsr){
  const int bLast = 0;
  return multiCursorEnd(pCsr, bLast);
}
/*
** Move the cursor to the last entry it can see.
*/
int lsmMCursorLast(MultiCursor *pCsr){
  const int bLast = 1;
  return multiCursorEnd(pCsr, bLast);
}
/*
** Return the database handle that cursor pCsr belongs to.
*/
lsm_db *lsmMCursorDb(MultiCursor *pCsr){
  lsm_db *pDb = pCsr->pDb;
  return pDb;
}
/*
** Reset the cursor so that it no longer points at any entry: both tree
** cursors and every segment pointer are reset and the cached key is
** truncated to zero bytes.
*/
void lsmMCursorReset(MultiCursor *pCsr){
  int iTree;
  int iSeg;

  for(iTree=0; iTree<2; iTree++){
    lsmTreeCursorReset(pCsr->apTreeCsr[iTree]);
  }
  for(iSeg=0; iSeg<pCsr->nPtr; iSeg++){
    segmentPtrReset(&pCsr->aPtr[iSeg], LSM_SEGMENTPTR_FREE_THRESHOLD);
  }
  pCsr->key.nData = 0;
}
/*
** Seek in-memory tree cursor pTreeCsr (which may be NULL, in which case
** this is a no-op) to key (pKey, nKey) on behalf of multi-cursor pCsr.
** eSeek is LSM_SEEK_EQ, LSM_SEEK_GE or (by default) an LE-style seek.
**
** For LSM_SEEK_EQ, *pbStop is set to 1 if the tree alone decides the
** result: either the key is covered by a delete marker in the tree, or an
** insert with exactly this key was found. In the latter case the key and
** value are copied into pCsr->key / pCsr->val and CURSOR_SEEK_EQ is set.
** The tree cursor is always reset after an EQ seek.
**
** Returns LSM_OK or an error code from the helpers used for the EQ copy.
*/
static int treeCursorSeek(
MultiCursor *pCsr,
TreeCursor *pTreeCsr,
void *pKey, int nKey,
int eSeek,
int *pbStop
){
int rc = LSM_OK;
if( pTreeCsr ){
int res = 0;
/* res reports how the entry the cursor landed on compares with the
** sought key; it is interpreted by the branches below. */
lsmTreeCursorSeek(pTreeCsr, pKey, nKey, &res);
switch( eSeek ){
case LSM_SEEK_EQ: {
int eType = lsmTreeCursorFlags(pTreeCsr);
/* Stop without a result if the landing entry opens or closes a delete
** range on the appropriate side of the key, or is a point-delete of
** the key itself. */
if( (res<0 && (eType & LSM_START_DELETE))
|| (res>0 && (eType & LSM_END_DELETE))
|| (res==0 && (eType & LSM_POINT_DELETE))
){
*pbStop = 1;
}else if( res==0 && (eType & LSM_INSERT) ){
/* Exact match on an insert: copy key and value out of the tree. */
lsm_env *pEnv = pCsr->pDb->pEnv;
void *p; int n;
*pbStop = 1;
pCsr->flags |= CURSOR_SEEK_EQ;
rc = lsmTreeCursorKey(pTreeCsr, &pCsr->eType, &p, &n);
if( rc==LSM_OK ) rc = sortedBlobSet(pEnv, &pCsr->key, p, n);
if( rc==LSM_OK ) rc = lsmTreeCursorValue(pTreeCsr, &p, &n);
if( rc==LSM_OK ) rc = sortedBlobSet(pEnv, &pCsr->val, p, n);
}
lsmTreeCursorReset(pTreeCsr);
break;
}
case LSM_SEEK_GE:
/* If res<0 step forwards once. */
if( res<0 && lsmTreeCursorValid(pTreeCsr) ){
lsmTreeCursorNext(pTreeCsr);
}
break;
default:
/* If res>0 step backwards once. */
if( res>0 ){
assert( lsmTreeCursorValid(pTreeCsr) );
lsmTreeCursorPrev(pTreeCsr);
}
break;
}
}
return rc;
}
/*
** Seek the multi-cursor to key (iTopic, pKey, nKey). eSeek is one of
** LSM_SEEK_EQ, LSM_SEEK_GE, LSM_SEEK_LE or LSM_SEEK_LEFAST. LEFAST is
** treated as LE, except that the final "is this location visible" check
** (and any resulting step to the previous entry) is skipped.
**
** The in-memory trees are searched first, then each level in aPtr[] order.
** The search stops early once a helper sets the stop flag.
**
** Cursors that include in-memory trees may only be used with topic 0, and
** cursors configured with CURSOR_FLUSH_FREELIST may not be used here.
**
** Returns LSM_OK or an error code.
*/
int lsmMCursorSeek(
MultiCursor *pCsr,
int iTopic,
void *pKey, int nKey,
int eSeek
){
int eESeek = eSeek;
int bStop = 0;
int rc = LSM_OK;
int iPtr = 0;
LsmPgno iPgno = 0;
assert( pCsr->apTreeCsr[0]==0 || iTopic==0 );
assert( pCsr->apTreeCsr[1]==0 || iTopic==0 );
/* eESeek is the effective seek mode with LEFAST mapped to LE. */
if( eESeek==LSM_SEEK_LEFAST ) eESeek = LSM_SEEK_LE;
assert( eESeek==LSM_SEEK_EQ || eESeek==LSM_SEEK_LE || eESeek==LSM_SEEK_GE );
assert( (pCsr->flags & CURSOR_FLUSH_FREELIST)==0 );
assert( pCsr->nPtr==0 || pCsr->aPtr[0].pLevel );
pCsr->flags &= ~(CURSOR_NEXT_OK | CURSOR_PREV_OK | CURSOR_SEEK_EQ);
rc = treeCursorSeek(pCsr, pCsr->apTreeCsr[0], pKey, nKey, eESeek, &bStop);
if( rc==LSM_OK && bStop==0 ){
rc = treeCursorSeek(pCsr, pCsr->apTreeCsr[1], pKey, nKey, eESeek, &bStop);
}
/* Seek within each level. iPgno carries the page pointer produced by one
** level into the seek of the next. Each iteration consumes all of the
** level's pointers (lhs plus nRight rhs). */
for(iPtr=0; iPtr<pCsr->nPtr && rc==LSM_OK && bStop==0; iPtr++){
SegmentPtr *pPtr = &pCsr->aPtr[iPtr];
assert( pPtr->pSeg==&pPtr->pLevel->lhs );
rc = seekInLevel(pCsr, pPtr, eESeek, iTopic, pKey, nKey, &iPgno, &bStop);
iPtr += pPtr->pLevel->nRight;
}
/* For range seeks, build the aTree[] array and cache the winning key. */
if( eSeek!=LSM_SEEK_EQ ){
if( rc==LSM_OK ){
rc = multiCursorAllocTree(pCsr);
}
if( rc==LSM_OK ){
int i;
for(i=pCsr->nTree-1; i>0; i--){
multiCursorDoCompare(pCsr, i, eESeek==LSM_SEEK_LE);
}
/* Note: LSM_SEEK_LEFAST sets neither direction flag here. */
if( eSeek==LSM_SEEK_GE ) pCsr->flags |= CURSOR_NEXT_OK;
if( eSeek==LSM_SEEK_LE ) pCsr->flags |= CURSOR_PREV_OK;
}
multiCursorCacheKey(pCsr, &rc);
/* If the location is not one the caller should see, step in the
** direction of the seek. The LSM_SEEK_EQ case cannot be reached because
** of the enclosing eSeek!=LSM_SEEK_EQ test. */
if( rc==LSM_OK && eSeek!=LSM_SEEK_LEFAST && 0==mcursorLocationOk(pCsr, 0) ){
switch( eESeek ){
case LSM_SEEK_EQ:
lsmMCursorReset(pCsr);
break;
case LSM_SEEK_GE:
rc = lsmMCursorNext(pCsr);
break;
default:
rc = lsmMCursorPrev(pCsr);
break;
}
}
}
return rc;
}
/*
** Return non-zero if the cursor currently points at an entry.
**
** A cursor positioned by a successful LSM_SEEK_EQ (CURSOR_SEEK_EQ set) is
** always valid. Otherwise the cursor is valid if it has an aTree[] array
** and the winning sub-cursor has a current key.
*/
int lsmMCursorValid(MultiCursor *pCsr){
  int iKey;
  void *pKey;

  if( pCsr->flags & CURSOR_SEEK_EQ ) return 1;
  if( pCsr->aTree==0 ) return 0;

  iKey = pCsr->aTree[1];
  if( iKey==CURSOR_DATA_TREE0 || iKey==CURSOR_DATA_TREE1 ){
    return lsmTreeCursorValid(pCsr->apTreeCsr[iKey-CURSOR_DATA_TREE0]);
  }

  multiCursorGetKey(pCsr, iKey, 0, &pKey, 0);
  return pKey!=0;
}
/*
** Called by multiCursorAdvance() after the winning sub-cursor has been
** stepped and aTree[] updated. Returns non-zero if the advance is complete
** (the cursor now rests on an acceptable entry, has run off the end, or an
** error has occurred) and zero if multiCursorAdvance() should step again.
**
** The advance is not complete if the new winning key does not sort
** strictly after (bReverse==0) or strictly before (bReverse!=0) the
** previously cached key, or if the new location fails mcursorLocationOk().
** When the new key does sort past the old one it becomes the cached key.
*/
static int mcursorAdvanceOk(
MultiCursor *pCsr,
int bReverse,
int *pRc
){
void *pNew;
int nNew;
int eNewType;
/* An error terminates the advance loop. */
if( *pRc ) return 1;
multiCursorGetKey(pCsr, pCsr->aTree[1], &eNewType, &pNew, &nNew);
if( pNew ){
/* With CURSOR_IGNORE_DELETE all type bits take part in the comparison;
** otherwise only the topic bit does. */
int typemask = (pCsr->flags & CURSOR_IGNORE_DELETE) ? ~(0) : LSM_SYSTEMKEY;
int res = sortedDbKeyCompare(pCsr,
eNewType & typemask, pNew, nNew,
pCsr->eType & typemask, pCsr->key.pData, pCsr->key.nData
);
if( (bReverse==0 && res<=0) || (bReverse!=0 && res>=0) ){
return 0;
}
multiCursorCacheKey(pCsr, pRc);
assert( pCsr->eType==eNewType );
if( *pRc==LSM_OK && 0==mcursorLocationOk(pCsr, 0) ) return 0;
}
return 1;
}
/*
** Advance the free-list position pCsr->iFree of a cursor configured with
** CURSOR_FLUSH_FREELIST. iFree counts two positions per free-list entry
** (see multiCursorGetKey()); entries are visited from the end of the
** worker snapshot's free-list array towards the start.
**
** From an odd position the cursor simply moves to the next position. From
** an even position it moves on by two, additionally skipping over a run of
** consecutive-block entries with negative ids (see the loop below).
*/
static void flCsrAdvance(MultiCursor *pCsr){
assert( pCsr->flags & CURSOR_FLUSH_FREELIST );
if( pCsr->iFree % 2 ){
pCsr->iFree++;
}else{
int nEntry = pCsr->pDb->pWorker->freelist.nEntry;
FreelistEntry *aEntry = pCsr->pDb->pWorker->freelist.aEntry;
int i = nEntry - 1 - (pCsr->iFree / 2);
if( aEntry[i].iId<0 ){
/* Walk towards the start of the array while the previous entry is the
** adjacent lower block and also has a negative id. */
while( 1 ){
if( i==0 || aEntry[i-1].iBlk!=aEntry[i].iBlk-1 ){
/* End of the run: the decrement here combined with the "+= 2" below
** leaves iFree on the odd position of entry i. */
pCsr->iFree--;
break;
}
if( aEntry[i-1].iId>=0 ) break;
pCsr->iFree += 2;
i--;
}
}
pCsr->iFree += 2;
}
}
/*
** Advance the multi-cursor to its next entry, or to the previous one if
** bReverse is true. This is a no-op if the cursor is not currently valid.
**
** Each iteration steps the sub-cursor that currently wins aTree[] and then
** recomputes the aTree[] entries on the path from that sub-cursor's leaf to
** the root. Iteration continues until mcursorAdvanceOk() accepts the new
** position or an error occurs.
**
** Returns LSM_OK or the error code of the failing helper.
*/
static int multiCursorAdvance(MultiCursor *pCsr, int bReverse){
int rc = LSM_OK;
if( lsmMCursorValid(pCsr) ){
do {
int iKey = pCsr->aTree[1];
assertCursorTree(pCsr);
/* If requested, record the page pointer associated with the entry being
** left behind. Only entries from the b-tree cursor or - when there is
** no b-tree cursor - from the last segment pointer are recorded. */
if( pCsr->pPrevMergePtr ){
if( iKey==(CURSOR_DATA_SEGMENT+pCsr->nPtr) ){
assert( pCsr->pBtCsr );
*pCsr->pPrevMergePtr = pCsr->pBtCsr->iPtr;
}else if( pCsr->pBtCsr==0 && pCsr->nPtr>0
&& iKey==(CURSOR_DATA_SEGMENT+pCsr->nPtr-1)
){
SegmentPtr *pPtr = &pCsr->aPtr[iKey-CURSOR_DATA_SEGMENT];
*pCsr->pPrevMergePtr = pPtr->iPtr+pPtr->iPgPtr;
}
}
/* Step whichever sub-cursor currently supplies the winning key. The
** free-list and b-tree sub-cursors only support forward iteration. */
if( iKey==CURSOR_DATA_TREE0 || iKey==CURSOR_DATA_TREE1 ){
TreeCursor *pTreeCsr = pCsr->apTreeCsr[iKey-CURSOR_DATA_TREE0];
if( bReverse ){
rc = lsmTreeCursorPrev(pTreeCsr);
}else{
rc = lsmTreeCursorNext(pTreeCsr);
}
}else if( iKey==CURSOR_DATA_SYSTEM ){
assert( pCsr->flags & CURSOR_FLUSH_FREELIST );
assert( bReverse==0 );
flCsrAdvance(pCsr);
}else if( iKey==(CURSOR_DATA_SEGMENT+pCsr->nPtr) ){
assert( bReverse==0 && pCsr->pBtCsr );
rc = btreeCursorNext(pCsr->pBtCsr);
}else{
rc = segmentCursorAdvance(pCsr, iKey-CURSOR_DATA_SEGMENT, bReverse);
}
if( rc==LSM_OK ){
int i;
/* Recompute aTree[] from the parent of leaf iKey up to the root. */
for(i=(iKey+pCsr->nTree)/2; i>0; i=i/2){
multiCursorDoCompare(pCsr, i, bReverse);
}
assertCursorTree(pCsr);
}
}while( mcursorAdvanceOk(pCsr, bReverse, &rc)==0 );
}
return rc;
}
/*
** Move the cursor to the next entry. Returns LSM_MISUSE_BKPT if the cursor
** was not positioned by an operation that permits forward iteration
** (CURSOR_NEXT_OK is not set).
*/
int lsmMCursorNext(MultiCursor *pCsr){
  if( pCsr->flags & CURSOR_NEXT_OK ){
    return multiCursorAdvance(pCsr, 0);
  }
  return LSM_MISUSE_BKPT;
}
/*
** Move the cursor to the previous entry. Returns LSM_MISUSE_BKPT if the
** cursor was not positioned by an operation that permits reverse iteration
** (CURSOR_PREV_OK is not set).
*/
int lsmMCursorPrev(MultiCursor *pCsr){
  if( pCsr->flags & CURSOR_PREV_OK ){
    return multiCursorAdvance(pCsr, 1);
  }
  return LSM_MISUSE_BKPT;
}
/*
** Return the key of the entry the cursor currently points at in *ppKey and
** *pnKey. Always returns LSM_OK.
**
** If the cursor was positioned by LSM_SEEK_EQ, or has no aTree[] array, the
** cached key (pCsr->key) is returned. If the current entry comes from an
** in-memory tree cursor the key is read directly from that cursor.
** Otherwise the cached key is returned, with *ppKey set to NULL when the
** key is zero bytes in size.
*/
int lsmMCursorKey(MultiCursor *pCsr, void **ppKey, int *pnKey){
if( (pCsr->flags & CURSOR_SEEK_EQ) || pCsr->aTree==0 ){
*pnKey = pCsr->key.nData;
*ppKey = pCsr->key.pData;
}else{
int iKey = pCsr->aTree[1];
if( iKey==CURSOR_DATA_TREE0 || iKey==CURSOR_DATA_TREE1 ){
TreeCursor *pTreeCsr = pCsr->apTreeCsr[iKey-CURSOR_DATA_TREE0];
lsmTreeCursorKey(pTreeCsr, 0, ppKey, pnKey);
}else{
int nKey;
#ifndef NDEBUG
/* Debug builds verify that the cached key matches the sub-cursor. */
void *pKey;
int eType;
multiCursorGetKey(pCsr, iKey, &eType, &pKey, &nKey);
assert( eType==pCsr->eType );
assert( nKey==pCsr->key.nData );
assert( memcmp(pKey, pCsr->key.pData, nKey)==0 );
#endif
nKey = pCsr->key.nData;
if( nKey==0 ){
*ppKey = 0;
}else{
*ppKey = pCsr->key.pData;
}
*pnKey = nKey;
}
}
return LSM_OK;
}
/*
** Compare the cursor's current key with (pKey, nKey) using the database
** comparison function, treating both as topic-0 keys. On success *piRes is
** set to the comparison result (cursor key relative to the supplied key)
** and LSM_OK is returned. On error *piRes is left untouched.
*/
int lsm_csr_cmp(lsm_cursor *csr, const void *pKey, int nKey, int *piRes){
  MultiCursor *pCsr = (MultiCursor *)csr;
  void *pCurKey;
  int nCurKey;
  int rc;

  rc = lsmMCursorKey(pCsr, &pCurKey, &nCurKey);
  if( rc!=LSM_OK ) return rc;

  *piRes = sortedKeyCompare(
      pCsr->pDb->xCmp, 0, pCurKey, nCurKey, 0, (void *)pKey, nKey
  );
  return rc;
}
/*
** Return the value of the entry the cursor currently points at in *ppVal
** and *pnVal.
**
** If the cursor was positioned by LSM_SEEK_EQ, or has no aTree[] array, the
** cached value (pCsr->val) is returned. Otherwise the value is fetched from
** the winning sub-cursor and, if it is not NULL, copied into pCsr->val so
** that the returned pointer refers to the cursor's own buffer.
**
** On error *ppVal and *pnVal are set to NULL and 0.
*/
int lsmMCursorValue(MultiCursor *pCsr, void **ppVal, int *pnVal){
  int rc = LSM_OK;
  void *pVal;
  int nVal;

  if( (pCsr->flags & CURSOR_SEEK_EQ)==0 && pCsr->aTree!=0 ){
    assert( pCsr->aTree );
    assert( mcursorLocationOk(pCsr, (pCsr->flags & CURSOR_IGNORE_DELETE)) );

    rc = multiCursorGetVal(pCsr, pCsr->aTree[1], &pVal, &nVal);
    if( rc==LSM_OK && pVal ){
      rc = sortedBlobSet(pCsr->pDb->pEnv, &pCsr->val, pVal, nVal);
      pVal = pCsr->val.pData;
    }
    if( rc!=LSM_OK ){
      pVal = 0;
      nVal = 0;
    }
  }else{
    pVal = pCsr->val.pData;
    nVal = pCsr->val.nData;
  }

  *ppVal = pVal;
  *pnVal = nVal;
  return rc;
}
/*
** Store the type flags of the entry supplied by the winning sub-cursor in
** *peType. The cursor must have an aTree[] array. Always returns LSM_OK.
*/
int lsmMCursorType(MultiCursor *pCsr, int *peType){
  int iWinner;

  assert( pCsr->aTree );
  iWinner = pCsr->aTree[1];
  multiCursorGetKey(pCsr, iWinner, peType, 0, 0);
  return LSM_OK;
}
/*
** aData/nData is the image of a b-tree hierarchy page. Return the offset of
** the first byte following the last cell on the page, i.e. the offset at
** which the next cell would be written.
**
** The last cell is located through the final entry of the cell-pointer
** array. It consists of a type byte, a 64-bit varint, a 32-bit varint
** (nKey) and - when the type byte is non-zero - nKey bytes of key data.
*/
static int mergeWorkerPageOffset(u8 *aData, int nData){
  int nCell;
  int iEnd;
  int eCellType;
  int nKey;
  i64 iIgnored;

  nCell = lsmGetU16(&aData[SEGMENT_NRECORD_OFFSET(nData)]);
  iEnd = lsmGetU16(&aData[SEGMENT_CELLPTR_OFFSET(nData, nCell-1)]);

  eCellType = aData[iEnd];
  iEnd++;
  assert( eCellType==0
       || eCellType==(LSM_SYSTEMKEY|LSM_SEPARATOR)
       || eCellType==(LSM_SEPARATOR)
  );

  iEnd += lsmVarintGet64(&aData[iEnd], &iIgnored);
  iEnd += lsmVarintGet32(&aData[iEnd], &nKey);
  if( eCellType ){
    iEnd += nKey;
  }
  return iEnd;
}
/*
** Replace every page in the merge-worker's b-tree hierarchy
** (pMW->hier.apHier[]) with a freshly appended page that has the same
** content. The old page reference is released once its content has been
** copied.
**
** The new page is either the same size as the old one, or 4 bytes smaller.
** In the latter case the page body (everything before the cell-pointer
** array, less 4 bytes) and the footer (cell-pointer array onwards) are
** copied separately so that the footer stays aligned with the page end.
**
** Parameter bSep is unused. Returns LSM_OK or the error code from
** lsmFsSortedAppend().
*/
static int mergeWorkerMoveHierarchy(
  MergeWorker *pMW,
  int bSep
){
  lsm_db *pDb = pMW->pDb;
  int rc = LSM_OK;
  int i;
  Page **apHier = pMW->hier.apHier;
  int nHier = pMW->hier.nHier;

  for(i=0; rc==LSM_OK && i<nHier; i++){
    Page *pNew = 0;
    rc = lsmFsSortedAppend(pDb->pFS, pDb->pWorker, pMW->pLevel, 1, &pNew);
    assert( rc==LSM_OK );

    if( rc==LSM_OK ){
      u8 *a1; int n1;
      u8 *a2; int n2;

      a1 = fsPageData(pNew, &n1);
      a2 = fsPageData(apHier[i], &n2);

      assert( n1==n2 || n1+4==n2 );

      if( n1==n2 ){
        memcpy(a1, a2, n2);
      }else{
        int nEntry = pageGetNRec(a2, n2);
        int iEof1 = SEGMENT_EOF(n1, nEntry);
        int iEof2 = SEGMENT_EOF(n2, nEntry);

        memcpy(a1, a2, iEof2 - 4);
        memcpy(&a1[iEof1], &a2[iEof2], n2 - iEof2);
      }

      lsmFsPageRelease(apHier[i]);
      apHier[i] = pNew;
    }
  }

#ifdef LSM_DEBUG
  if( rc==LSM_OK ){
    for(i=0; i<nHier; i++) assert( lsmFsPageWritable(apHier[i]) );
  }
#endif

  return rc;
}
/*
** Load the right-most path of the b-tree hierarchy of the output segment
** (pMW->pLevel->lhs) into pMW->hier, unless it is already loaded or the
** segment has no b-tree root.
**
** Starting at the root, each page with SEGMENT_BTREE_FLAG set is inserted
** at the front of the apHier[] array and the page's footer pointer is
** followed, so that apHier[0] ends up being the deepest b-tree page. The
** walk stops at the first page without the flag. The footer pointer of
** apHier[0] is then stored in pMW->aSave[0].iPgno and the hierarchy pages
** are replaced by writable copies via mergeWorkerMoveHierarchy().
**
** Returns LSM_OK, LSM_NOMEM_BKPT, LSM_CORRUPT_BKPT (the root page is not a
** b-tree page) or an error code from the file-system layer. On error no
** page references are retained.
*/
static int mergeWorkerLoadHierarchy(MergeWorker *pMW){
  int rc = LSM_OK;
  Segment *pSeg;
  Hierarchy *p;

  pSeg = &pMW->pLevel->lhs;
  p = &pMW->hier;

  if( p->apHier==0 && pSeg->iRoot!=0 ){
    FileSystem *pFS = pMW->pDb->pFS;
    lsm_env *pEnv = pMW->pDb->pEnv;
    Page **apHier = 0;
    int nHier = 0;
    LsmPgno iPg = pSeg->iRoot;

    do {
      Page *pPg = 0;
      u8 *aData;
      int nData;
      int flags;

      rc = lsmFsDbPageGet(pFS, pSeg, iPg, &pPg);
      if( rc!=LSM_OK ) break;

      aData = fsPageData(pPg, &nData);
      flags = pageGetFlags(aData, nData);
      if( flags&SEGMENT_BTREE_FLAG ){
        Page **apNew = (Page **)lsmRealloc(
            pEnv, apHier, sizeof(Page *)*(nHier+1)
        );
        if( apNew==0 ){
          /* pPg has not been stored in apHier[] yet, so it must be
          ** released here or the reference is lost. */
          lsmFsPageRelease(pPg);
          rc = LSM_NOMEM_BKPT;
          break;
        }
        apHier = apNew;
        memmove(&apHier[1], &apHier[0], sizeof(Page *) * nHier);
        nHier++;

        apHier[0] = pPg;
        iPg = pageGetPtr(aData, nData);
      }else{
        lsmFsPageRelease(pPg);
        break;
      }
    }while( 1 );

    /* A non-zero root that is not a b-tree page leaves apHier[] empty.
    ** Report corruption rather than dereferencing apHier[0] below. */
    if( rc==LSM_OK && nHier==0 ){
      rc = LSM_CORRUPT_BKPT;
    }

    if( rc==LSM_OK ){
      u8 *aData;
      int nData;
      aData = fsPageData(apHier[0], &nData);
      pMW->aSave[0].iPgno = pageGetPtr(aData, nData);

      p->nHier = nHier;
      p->apHier = apHier;
      rc = mergeWorkerMoveHierarchy(pMW, 0);
    }else{
      int i;
      for(i=0; i<nHier; i++){
        lsmFsPageRelease(apHier[i]);
      }
      lsmFree(pEnv, apHier);
    }
  }

  return rc;
}
/*
** Append a cell to the b-tree hierarchy being built by merge-worker pMW.
**
** Two cell formats are written:
**
**   * eType==0: an indirect cell holding a 0x00 type byte followed by the
**     varints iPtr and iKeyPg. iKeyPg must be non-zero in this case.
**   * eType!=0: a cell holding the type byte, the varint iPtr, the varint
**     nKey and then the nKey bytes of pKey. iKeyPg must be zero.
**
** The cell goes on the lowest hierarchy level whose page has room for it.
** Each full page encountered on the way up has iPtr stored in its footer
** pointer field, is persisted and replaced by a new empty b-tree page; the
** page number of the persisted page becomes the iPtr value for the cell
** written at the next level up. If every existing level is full, a new
** level is added to the hierarchy.
**
** Returns LSM_OK, LSM_NOMEM_BKPT or an error from the file-system layer.
*/
static int mergeWorkerBtreeWrite(
MergeWorker *pMW,
u8 eType,
LsmPgno iPtr,
LsmPgno iKeyPg,
void *pKey,
int nKey
){
Hierarchy *p = &pMW->hier;
lsm_db *pDb = pMW->pDb;
int rc = LSM_OK;
int iLevel;
int nData;
u8 *aData;
int iOff;
int nRec;
assert( (eType==0)==(iKeyPg!=0) );
/* Find the level to write to, starting from the lowest. The loop exits
** (via break) with iLevel set to the level that has room for the cell. */
for(iLevel=0; iLevel<=p->nHier; iLevel++){
int nByte;
if( iLevel==p->nHier ){
/* All existing levels are full: extend the apHier[] array by one. */
Page **aNew;
aNew = (Page **)lsmRealloc(
pMW->pDb->pEnv, p->apHier, sizeof(Page *)*(p->nHier+1)
);
if( !aNew ){
return LSM_NOMEM_BKPT;
}
p->apHier = aNew;
}else{
Page *pOld;
int nFree;
pOld = p->apHier[iLevel];
assert( lsmFsPageWritable(pOld) );
aData = fsPageData(pOld, &nData);
/* Space required: 2 bytes for the cell-pointer entry, 1 for the type
** byte, plus the varints and (for key cells) the key itself. */
if( eType==0 ){
nByte = 2 + 1 + lsmVarintLen64(iPtr) + lsmVarintLen64(iKeyPg);
}else{
nByte = 2 + 1 + lsmVarintLen64(iPtr) + lsmVarintLen32(nKey) + nKey;
}
nRec = pageGetNRec(aData, nData);
nFree = SEGMENT_EOF(nData, nRec) - mergeWorkerPageOffset(aData, nData);
if( nByte<=nFree ) break;
/* Page is full: store iPtr in its footer and persist it. Its page
** number is the pointer written at the next level up. */
lsmPutU64(&aData[SEGMENT_POINTER_OFFSET(nData)], iPtr);
assert( lsmFsPageNumber(pOld)==0 );
rc = lsmFsPagePersist(pOld);
if( rc==LSM_OK ){
iPtr = lsmFsPageNumber(pOld);
lsmFsPageRelease(pOld);
}
}
/* Install a new, empty b-tree page at this level. */
p->apHier[iLevel] = 0;
if( rc==LSM_OK ){
rc = lsmFsSortedAppend(
pDb->pFS, pDb->pWorker, pMW->pLevel, 1, &p->apHier[iLevel]
);
}
if( rc!=LSM_OK ) return rc;
aData = fsPageData(p->apHier[iLevel], &nData);
memset(aData, 0, nData);
lsmPutU16(&aData[SEGMENT_FLAGS_OFFSET(nData)], SEGMENT_BTREE_FLAG);
lsmPutU16(&aData[SEGMENT_NRECORD_OFFSET(nData)], 0);
if( iLevel==p->nHier ){
p->nHier++;
break;
}
}
/* Write the cell: add a cell-pointer entry, bump the record count, then
** serialize the cell body at the end of the existing cells. */
aData = fsPageData(p->apHier[iLevel], &nData);
iOff = mergeWorkerPageOffset(aData, nData);
nRec = pageGetNRec(aData, nData);
lsmPutU16(&aData[SEGMENT_CELLPTR_OFFSET(nData, nRec)], (u16)iOff);
lsmPutU16(&aData[SEGMENT_NRECORD_OFFSET(nData)], (u16)(nRec+1));
if( eType==0 ){
aData[iOff++] = 0x00;
iOff += lsmVarintPut64(&aData[iOff], iPtr);
iOff += lsmVarintPut64(&aData[iOff], iKeyPg);
}else{
aData[iOff++] = eType;
iOff += lsmVarintPut64(&aData[iOff], iPtr);
iOff += lsmVarintPut32(&aData[iOff], nKey);
memcpy(&aData[iOff], pKey, nKey);
}
return rc;
}
/*
** If the merge-worker has a pending indirect b-tree entry (pMW->iIndirect
** is non-zero), write it to the hierarchy as an indirect cell that refers
** to the key page recorded in pMW->aSave[1].iPgno, then clear the pending
** state. Returns LSM_OK or the error code from mergeWorkerBtreeWrite().
*/
static int mergeWorkerBtreeIndirect(MergeWorker *pMW){
  int rc;
  LsmPgno iKeyPg;

  if( pMW->iIndirect==0 ) return LSM_OK;

  iKeyPg = pMW->aSave[1].iPgno;
  rc = mergeWorkerBtreeWrite(pMW, 0, pMW->iIndirect, iKeyPg, 0, 0);
  pMW->iIndirect = 0;
  return rc;
}
/*
** Add a separator for key (iTopic, pKey, nKey) to the b-tree hierarchy of
** the segment being written by merge-worker pMW. The pointer stored with
** the separator is pMW->aSave[0].iPgno, which must be non-zero.
**
** Any pending indirect entry is flushed first. Then, if four times the key
** size exceeds the page size, the key is not copied into the hierarchy:
** the entry is recorded as pending-indirect (pMW->iIndirect) and
** aSave[1].bStore is set so that the number of the page containing the key
** is captured later. Otherwise a separator cell containing the key is
** written immediately.
**
** Returns LSM_OK or the first error encountered. An error from flushing
** the pending indirect entry is reported to the caller and suppresses the
** separator write.
*/
static int mergeWorkerPushHierarchy(
  MergeWorker *pMW,
  int iTopic,
  void *pKey,
  int nKey
){
  int rc = LSM_OK;
  LsmPgno iPtr;

  assert( pMW->aSave[0].bStore==0 );
  assert( pMW->aSave[1].bStore==0 );
  rc = mergeWorkerBtreeIndirect(pMW);

  iPtr = pMW->aSave[0].iPgno;
  assert( iPtr!=0 );

  if( (nKey*4 > lsmFsPageSize(pMW->pDb->pFS)) ){
    pMW->iIndirect = iPtr;
    pMW->aSave[1].bStore = 1;
  }else if( rc==LSM_OK ){
    /* Only write if the flush above succeeded, so that its error code is
    ** not overwritten by the result of this call. */
    rc = mergeWorkerBtreeWrite(
        pMW, (u8)(iTopic | LSM_SEPARATOR), iPtr, 0, pKey, nKey
    );
  }

  return rc;
}
/*
** Finish the b-tree hierarchy built by merge-worker pMW. Working from the
** lowest level upwards, each hierarchy page has its footer pointer set
** (to aSave[0].iPgno for the lowest level, and to the page number of the
** level below for the others), and is then persisted and released.
**
** If the hierarchy is non-empty, the page number of the top-most page
** becomes the root of the output segment and the apHier[] array is freed.
**
** Returns LSM_OK or the error code from lsmFsPagePersist(); processing of
** further levels stops at the first error.
*/
static int mergeWorkerFinishHierarchy(
  MergeWorker *pMW
){
  Hierarchy *pHier = &pMW->hier;
  LsmPgno iChild = pMW->aSave[0].iPgno;
  int rc = LSM_OK;
  int iLvl = 0;

  while( iLvl<pHier->nHier && rc==LSM_OK ){
    Page *pPg = pHier->apHier[iLvl];
    int nData;
    u8 *aData = fsPageData(pPg, &nData);

    lsmPutU64(&aData[SEGMENT_POINTER_OFFSET(nData)], iChild);
    rc = lsmFsPagePersist(pPg);
    iChild = lsmFsPageNumber(pPg);
    lsmFsPageRelease(pPg);
    iLvl++;
  }

  if( pHier->nHier ){
    pMW->pLevel->lhs.iRoot = iChild;
    lsmFree(pMW->pDb->pEnv, pHier->apHier);
    pHier->apHier = 0;
    pHier->nHier = 0;
  }

  return rc;
}
/*
** Ask the file-system layer to add any padding it requires to the output
** segment of merge-worker pMW. Returns the result of lsmFsSortedPadding().
*/
static int mergeWorkerAddPadding(
  MergeWorker *pMW
){
  lsm_db *pDb = pMW->pDb;
  Segment *pOut = &pMW->pLevel->lhs;
  return lsmFsSortedPadding(pDb->pFS, pDb->pWorker, pOut);
}
/*
** Release every page reference held by merge-worker pMW - the current
** output page and all hierarchy pages - and free the hierarchy array. No
** page is persisted by this function.
*/
static void mergeWorkerReleaseAll(MergeWorker *pMW){
  Hierarchy *pHier = &pMW->hier;
  int iLvl;

  lsmFsPageRelease(pMW->pPage);
  pMW->pPage = 0;

  for(iLvl=0; iLvl<pHier->nHier; iLvl++){
    lsmFsPageRelease(pHier->apHier[iLvl]);
    pHier->apHier[iLvl] = 0;
  }

  lsmFree(pMW->pDb->pEnv, pHier->apHier);
  pHier->apHier = 0;
  pHier->nHier = 0;
}
/*
** Return (nKey * 4) divided by the page size of pFS, capped at 3.
*/
static int keyszToSkip(FileSystem *pFS, int nKey){
  int nPgsz = lsmFsPageSize(pFS);
  int nSkip = (nKey * 4) / nPgsz;
  return LSM_MIN(nSkip, 3);
}
/*
** Persist and release the current output page of merge-worker pMW. For each
** aSave[] slot with bStore set, the page number of that page is recorded in
** the slot and the flag cleared. pMW->pPage is NULL on return.
**
** Returns the result of lsmFsPagePersist(); the bookkeeping and release are
** carried out regardless of that result.
*/
static int mergeWorkerPersistAndRelease(MergeWorker *pMW){
  Page *pOut = pMW->pPage;
  int rc;
  int iSave;

  assert( pMW->pPage || (pMW->aSave[0].bStore==0 && pMW->aSave[1].bStore==0) );

  rc = lsmFsPagePersist(pOut);

  for(iSave=0; iSave<2; iSave++){
    if( pMW->aSave[iSave].bStore==0 ) continue;
    pMW->aSave[iSave].iPgno = lsmFsPageNumber(pOut);
    pMW->aSave[iSave].bStore = 0;
  }

  lsmFsPageRelease(pOut);
  pMW->pPage = 0;
  return rc;
}
/*
** Move merge-worker pMW on to a new output page. A new page is appended to
** the output segment, the current output page (if any) is persisted and
** released, and the new page becomes the current one with an empty footer:
** zero records, zero flags and footer pointer iFPtr. The output offset is
** reset to zero and the worker's work counter is incremented.
**
** Returns the error from lsmFsSortedAppend() if the append fails (in which
** case nothing else is changed), otherwise the result of persisting the
** previous page.
*/
static int mergeWorkerNextPage(
  MergeWorker *pMW,
  LsmPgno iFPtr
){
  lsm_db *pDb = pMW->pDb;
  Page *pFresh = 0;
  u8 *aData;
  int nData;
  int rc;

  rc = lsmFsSortedAppend(pDb->pFS, pDb->pWorker, pMW->pLevel, 0, &pFresh);
  assert( rc || pMW->pLevel->lhs.iFirst>0 || pMW->pDb->compress.xCompress );
  if( rc!=LSM_OK ) return rc;

  rc = mergeWorkerPersistAndRelease(pMW);

  pMW->pPage = pFresh;
  pMW->pLevel->pMerge->iOutputOff = 0;

  aData = fsPageData(pFresh, &nData);
  lsmPutU16(&aData[SEGMENT_NRECORD_OFFSET(nData)], 0);
  lsmPutU16(&aData[SEGMENT_FLAGS_OFFSET(nData)], 0);
  lsmPutU64(&aData[SEGMENT_POINTER_OFFSET(nData)], iFPtr);
  pMW->nWork++;

  return rc;
}
/*
** Copy the nWrite bytes in aWrite[] into the output run, starting at the
** current output offset of the current output page.
**
** As much data as fits before the cell-pointer array of the page is copied
** to the current page. If data remains, mergeWorkerNextPage() is called
** (passing iFPtr as the footer pointer for the new page) and copying
** continues at offset zero of the new page. When the last byte has been
** copied, Merge.iOutputOff is updated to point just past it.
**
** Parameter bSep is not used by this function.
*/
static int mergeWorkerData(
  MergeWorker *pMW,               /* Merge worker object */
  int bSep,                       /* Unused */
  LsmPgno iFPtr,                  /* Footer ptr for new pages */
  u8 *aWrite,                     /* Write data from this buffer */
  int nWrite                      /* Size of aWrite[] in bytes */
){
  int rc = LSM_OK;
  int nDone = 0;                  /* Bytes of aWrite[] already copied */

  while( rc==LSM_OK && nDone<nWrite ){
    Merge *pMerge = pMW->pLevel->pMerge;
    u8 *aData;                    /* Buffer of the current output page */
    int nData;                    /* Size of aData[] in bytes */
    int nRec;                     /* Records on the current output page */
    int iOff;                     /* Offset in aData[] to write to */
    int nAvail;                   /* Bytes available on this page */
    int nLeft;                    /* Bytes of aWrite[] still to copy */
    int nCopy;                    /* Bytes to copy on this iteration */

    assert( lsmFsPageWritable(pMW->pPage) );

    aData = fsPageData(pMW->pPage, &nData);
    nRec = pageGetNRec(aData, nData);
    iOff = pMerge->iOutputOff;
    nAvail = SEGMENT_EOF(nData, nRec) - iOff;
    nLeft = nWrite - nDone;
    nCopy = LSM_MIN(nLeft, nAvail);

    memcpy(&aData[iOff], &aWrite[nDone], nCopy);
    nDone += nCopy;

    if( nDone<nWrite ){
      rc = mergeWorkerNextPage(pMW, iFPtr);
    }else{
      pMerge->iOutputOff = iOff + nCopy;
    }
  }

  return rc;
}
/*
** Allocate the first page of the output run.
**
** The footer pointer for the new page is chosen as follows:
**
**   * If the cursor has a b-tree cursor, the first page number of the
**     next level's segment (pLevel->pNext->lhs.iFirst).
**   * Otherwise, if the cursor has at least one segment pointer, the
**     footer pointer read from the first page of the last segment in
**     pCsr->aPtr[].
**   * Otherwise zero.
**
** On success the new page is created with mergeWorkerNextPage(), the
** value is also written to *pCsr->pPrevMergePtr (if that pointer is set)
** and aSave[0].bStore is set.
*/
static int mergeWorkerFirstPage(MergeWorker *pMW){
  MultiCursor *pCsr = pMW->pCsr;
  LsmPgno iFPtr = 0;              /* Footer pointer for the new page */
  int rc = LSM_OK;

  assert( pMW->pPage==0 );

  if( pCsr->pBtCsr ){
    iFPtr = pMW->pLevel->pNext->lhs.iFirst;
  }else if( pCsr->nPtr>0 ){
    Segment *pSeg = pCsr->aPtr[pCsr->nPtr-1].pSeg;
    Page *pFirst = 0;             /* First page of segment pSeg */

    rc = lsmFsDbPageGet(pMW->pDb->pFS, pSeg, pSeg->iFirst, &pFirst);
    if( rc!=LSM_OK ) return rc;
    {
      int nData;
      u8 *aData = fsPageData(pFirst, &nData);
      iFPtr = pageGetPtr(aData, nData);
    }
    lsmFsPageRelease(pFirst);
  }

  rc = mergeWorkerNextPage(pMW, iFPtr);
  if( pCsr->pPrevMergePtr ) *pCsr->pPrevMergePtr = iFPtr;
  pMW->aSave[0].bStore = 1;

  return rc;
}
/*
** Append a single record to the output segment (pLevel->lhs) of merge
** worker pMW.
**
**   eType       Record type flags; written as the first byte of the record.
**   pKey/nKey   Key blob.
**   pVal/nVal   Value blob. Only written if rtIsWrite(eType) is true.
**   iPtr        Absolute page pointer for the record, or 0.
**
** The record header (type byte followed by varint fields) is always
** written to a single page. The key and value bodies are written by
** mergeWorkerData(), which may continue onto following pages.
**
** Returns LSM_OK, or an error code from one of the helpers called.
*/
static int mergeWorkerWrite(
MergeWorker *pMW,
int eType,
void *pKey, int nKey,
void *pVal, int nVal,
LsmPgno iPtr
){
int rc = LSM_OK;
Merge *pMerge;
int nHdr;
Page *pPg;
u8 *aData;
int nData = 0;
int nRec = 0;
LsmPgno iFPtr = 0;
LsmPgno iRPtr = 0;
int iOff = 0;
Segment *pSeg;
int flags = 0;
int bFirst = 0;
pMerge = pMW->pLevel->pMerge;
pSeg = &pMW->pLevel->lhs;
/* If the output segment has no first page yet and there is no current
** output page, allocate the first page. bFirst records that this is the
** first key written to the output run. */
if( pSeg->iFirst==0 && pMW->pPage==0 ){
rc = mergeWorkerFirstPage(pMW);
bFirst = 1;
}
pPg = pMW->pPage;
if( pPg ){
aData = fsPageData(pPg, &nData);
nRec = pageGetNRec(aData, nData);
iFPtr = pageGetPtr(aData, nData);
/* The pointer stored in the record is relative to the footer pointer of
** the page, or zero if iPtr is zero. */
iRPtr = iPtr ? (iPtr - iFPtr) : 0;
}
if( rc==LSM_OK ){
/* Header size: 1 byte of type, a varint for the relative pointer, a
** varint for the key size and, for write records only, a varint for the
** value size. */
nHdr = 1 + lsmVarintLen64(iRPtr) + lsmVarintLen32(nKey);
if( rtIsWrite(eType) ) nHdr += lsmVarintLen32(nVal);
iOff = pMerge->iOutputOff;
/* Move to a new page if the output offset is negative, there is no
** current page, or the header would not fit once one more cell pointer
** is accounted for. */
if( iOff<0 || pPg==0 || iOff+nHdr > SEGMENT_EOF(nData, nRec+1) ){
if( iOff>=0 && pPg ){
/* Zero the unused space at the end of the page being left. */
assert( aData );
memset(&aData[iOff], 0, SEGMENT_EOF(nData, nRec)-iOff);
}
/* The new page uses *pPrevMergePtr as its footer pointer, so the
** relative pointer must be recomputed against that value. */
iFPtr = *pMW->pCsr->pPrevMergePtr;
iRPtr = iPtr ? (iPtr - iFPtr) : 0;
iOff = 0;
nRec = 0;
rc = mergeWorkerNextPage(pMW, iFPtr);
pPg = pMW->pPage;
}
}
/* This record will be the first on its page, and the page is not the one
** just created by mergeWorkerFirstPage(). Either push the key into the
** b-tree hierarchy (when Merge.nSkip is zero), or count down nSkip and
** flag the page with PGFTR_SKIP_THIS_FLAG. PGFTR_SKIP_NEXT_FLAG is added
** while nSkip is still non-zero afterwards. */
if( rc==LSM_OK && nRec==0 && bFirst==0 ){
assert( pMerge->nSkip>=0 );
if( pMerge->nSkip==0 ){
rc = mergeWorkerPushHierarchy(pMW, rtTopic(eType), pKey, nKey);
assert( pMW->aSave[0].bStore==0 );
pMW->aSave[0].bStore = 1;
pMerge->nSkip = keyszToSkip(pMW->pDb->pFS, nKey);
}else{
pMerge->nSkip--;
flags = PGFTR_SKIP_THIS_FLAG;
}
if( pMerge->nSkip ) flags |= PGFTR_SKIP_NEXT_FLAG;
}
if( rc==LSM_OK ){
aData = fsPageData(pPg, &nData);
/* Update the page footer: record count, the cell pointer for the new
** record, and (only if non-zero) the page flags. */
lsmPutU16(&aData[SEGMENT_NRECORD_OFFSET(nData)], (u16)(nRec+1));
lsmPutU16(&aData[SEGMENT_CELLPTR_OFFSET(nData, nRec)], (u16)iOff);
if( flags ) lsmPutU16(&aData[SEGMENT_FLAGS_OFFSET(nData)], (u16)flags);
/* Write the record header in the same field order used to compute nHdr. */
aData[iOff++] = (u8)eType;
iOff += lsmVarintPut64(&aData[iOff], iRPtr);
iOff += lsmVarintPut32(&aData[iOff], nKey);
if( rtIsWrite(eType) ) iOff += lsmVarintPut32(&aData[iOff], nVal);
pMerge->iOutputOff = iOff;
assert( iFPtr==pageGetPtr(aData, nData) );
/* Write the key and, for write records, the value. Any overflow pages
** created get the absolute pointer (iFPtr+iRPtr) as footer pointer. */
rc = mergeWorkerData(pMW, 0, iFPtr+iRPtr, pKey, nKey);
if( rc==LSM_OK && rtIsWrite(eType) ){
if( rc==LSM_OK ){
rc = mergeWorkerData(pMW, 0, iFPtr+iRPtr, pVal, nVal);
}
}
}
return rc;
}
/*
** Shut down merge worker pMW and release the resources it holds.
**
** *pRc is both an input and an output. The cursor-position save and the
** persist/finish steps below only run while the code is LSM_OK; the
** cleanup (closing the cursor, releasing pages, freeing aGobble) always
** runs. On return *pRc holds the resulting code.
*/
static void mergeWorkerShutdown(MergeWorker *pMW, int *pRc){
int i;
int rc = *pRc;
MultiCursor *pCsr = pMW->pCsr;
if( rc==LSM_OK && pCsr ){
Merge *pMerge = pMW->pLevel->pMerge;
/* If the cursor still points at a valid entry, record the position of
** each input in Merge.aInput[]. mergeWorkerInit() reads aInput[] back
** when it repositions a cursor. */
if( lsmMCursorValid(pCsr) ){
int bBtree = (pCsr->pBtCsr!=0);
int iPtr;
assert( pMerge->nInput==0 || pMW->pLevel->nRight>0 );
assert( pMerge->nInput==0 || pMerge->nInput==(pCsr->nPtr+bBtree) );
/* One aInput[] entry per segment pointer: page number and cell index,
** or zeroes if the segment pointer has no current page. */
for(i=0; i<(pMerge->nInput-bBtree); i++){
SegmentPtr *pPtr = &pCsr->aPtr[i];
if( pPtr->pPg ){
pMerge->aInput[i].iPg = lsmFsPageNumber(pPtr->pPg);
pMerge->aInput[i].iCell = pPtr->iCell;
}else{
pMerge->aInput[i].iPg = 0;
pMerge->aInput[i].iCell = 0;
}
}
/* The b-tree cursor, if any, occupies the entry after the segments. */
if( bBtree && pMerge->nInput ){
assert( i==pCsr->nPtr );
btreeCursorPosition(pCsr->pBtCsr, &pMerge->aInput[i]);
}
/* Record the split-key: the position of the input that supplies the
** current cursor entry (aTree[1]), or the b-tree cursor's split-key if
** that index is not a segment pointer. */
iPtr = pCsr->aTree[1] - CURSOR_DATA_SEGMENT;
if( iPtr<pCsr->nPtr ){
pMerge->splitkey = pMerge->aInput[iPtr];
}else{
btreeCursorSplitkey(pCsr->pBtCsr, &pMerge->splitkey);
}
}
/* Zero the unused space between the output offset and the start of the
** cell-pointer array on the final output page. */
if( pMW->pPage ){
int nData;
u8 *aData = fsPageData(pMW->pPage, &nData);
int iOff = pMerge->iOutputOff;
int iEof = SEGMENT_EOF(nData, pageGetNRec(aData, nData));
memset(&aData[iOff], 0, iEof - iOff);
}
/* A negative offset makes mergeWorkerWrite() start a new page. */
pMerge->iOutputOff = -1;
}
lsmMCursorClose(pCsr, 0);
/* Each step below runs only if every previous step succeeded. */
if( rc==LSM_OK ) rc = mergeWorkerPersistAndRelease(pMW);
if( rc==LSM_OK ) rc = mergeWorkerBtreeIndirect(pMW);
if( rc==LSM_OK ) rc = mergeWorkerFinishHierarchy(pMW);
if( rc==LSM_OK ) rc = mergeWorkerAddPadding(pMW);
lsmFsFlushWaiting(pMW->pDb->pFS, &rc);
mergeWorkerReleaseAll(pMW);
lsmFree(pMW->pDb->pEnv, pMW->aGobble);
pMW->aGobble = 0;
/* The cursor was closed above; clearing pCsr makes mergeWorkerDone()
** report the worker as finished. */
pMW->pCsr = 0;
*pRc = rc;
}
/*
** Adjust the type flags of the current cursor entry to take the other
** cursor components into account.
**
** On entry *piFlags holds the flags of the current entry and *piVal the
** index of the component that supplies its value. On return *piFlags holds
** the adjusted flags (possibly zero) and *piVal may have been changed to
** the index of another component that has an LSM_INSERT entry for the
** same key.
*/
static void mergeRangeDeletes(MultiCursor *pCsr, int *piVal, int *piFlags){
int f = *piFlags;
/* Index of the cursor component that supplies the current key. */
int iKey = pCsr->aTree[1];
int i;
assert( pCsr->flags & CURSOR_NEXT_OK );
if( pCsr->flags & CURSOR_IGNORE_DELETE ){
/* With CURSOR_IGNORE_DELETE set, range-delete markers are dropped. */
assert( (f & LSM_POINT_DELETE)==0 );
f &= ~(LSM_START_DELETE|LSM_END_DELETE);
}else{
/* Visit every component other than iKey that currently has a key. */
for(i=0; i<(CURSOR_DATA_SEGMENT + pCsr->nPtr); i++){
if( i!=iKey ){
int eType;
void *pKey;
int nKey;
int res;
multiCursorGetKey(pCsr, i, &eType, &pKey, &nKey);
if( pKey ){
/* Compare the cursor key against the key of component i. The assert
** requires that the cursor key is never the larger of the two. */
res = sortedKeyCompare(pCsr->pDb->xCmp,
rtTopic(pCsr->eType), pCsr->key.pData, pCsr->key.nData,
rtTopic(eType), pKey, nKey
);
assert( res<=0 );
if( res==0 ){
/* Same key. If no insert or point-delete has been seen so far, take
** one from this component. Range-delete flags are always merged. */
if( (f & (LSM_INSERT|LSM_POINT_DELETE))==0 ){
if( eType & LSM_INSERT ){
f |= LSM_INSERT;
*piVal = i;
}
else if( eType & LSM_POINT_DELETE ){
f |= LSM_POINT_DELETE;
}
}
f |= (eType & (LSM_END_DELETE|LSM_START_DELETE));
}
/* A component after iKey holds a larger key carrying LSM_END_DELETE.
** If an insert or point-delete is present, both range-delete flags are
** set; otherwise the flags are cleared. Either way the scan stops. */
if( i>iKey && (eType & LSM_END_DELETE) && res<0 ){
if( f & (LSM_INSERT|LSM_POINT_DELETE) ){
f |= (LSM_END_DELETE|LSM_START_DELETE);
}else{
f = 0;
}
break;
}
}
}
}
assert( (f & LSM_INSERT)==0 || (f & LSM_POINT_DELETE)==0 );
/* A point-delete that also carries both range-delete flags is cleared
** entirely. */
if( (f & LSM_START_DELETE)
&& (f & LSM_END_DELETE)
&& (f & LSM_POINT_DELETE )
){
f = 0;
}
}
*piFlags = f;
}
/*
** Perform one step of the merge: read the entry the merge cursor currently
** points at, write it to the output run if required, then advance the
** cursor to the next entry.
**
** Returns LSM_OK, or an error code from reading the value, writing the
** record or advancing the cursor.
*/
static int mergeWorkerStep(MergeWorker *pMW){
lsm_db *pDb = pMW->pDb;
MultiCursor *pCsr;
int rc = LSM_OK;
int eType;
void *pKey; int nKey;
LsmPgno iPtr;
int iVal;
pCsr = pMW->pCsr;
/* Fetch the key and type of the current cursor entry. */
lsmMCursorKey(pCsr, &pKey, &nKey);
eType = pCsr->eType;
/* Work out the page pointer to write with the record. It starts as the
** value at *pPrevMergePtr (or 0 if that pointer is not set) and is
** replaced if the current key also appears in the b-tree cursor, or
** otherwise in the last segment pointer of the cursor. */
iPtr = (pCsr->pPrevMergePtr ? *pCsr->pPrevMergePtr : 0);
if( pCsr->pBtCsr ){
BtreeCursor *pBtCsr = pCsr->pBtCsr;
if( pBtCsr->pKey ){
int res = rtTopic(pBtCsr->eType) - rtTopic(eType);
if( res==0 ) res = pDb->xCmp(pBtCsr->pKey, pBtCsr->nKey, pKey, nKey);
if( 0==res ) iPtr = pBtCsr->iPtr;
assert( res>=0 );
}
}else if( pCsr->nPtr ){
SegmentPtr *pPtr = &pCsr->aPtr[pCsr->nPtr-1];
if( pPtr->pPg
&& 0==pDb->xCmp(pPtr->pKey, pPtr->nKey, pKey, nKey)
){
iPtr = pPtr->iPtr+pPtr->iPgPtr;
}
}
/* Let mergeRangeDeletes() adjust the flags. It may also select a
** different component (iVal) to read the value from. */
iVal = pCsr->aTree[1];
mergeRangeDeletes(pCsr, &iVal, &eType);
/* An entry whose flags were reduced to zero is not written. */
if( eType!=0 ){
if( pMW->aGobble ){
/* If the current entry comes from a segment pointer whose page does not
** have PGFTR_SKIP_THIS_FLAG set, remember that page number in
** aGobble[]. sortedWork() passes these to lsmFsGobble(). */
int iGobble = pCsr->aTree[1] - CURSOR_DATA_SEGMENT;
if( iGobble<pCsr->nPtr && iGobble>=0 ){
SegmentPtr *pGobble = &pCsr->aPtr[iGobble];
if( (pGobble->flags & PGFTR_SKIP_THIS_FLAG)==0 ){
pMW->aGobble[iGobble] = lsmFsPageNumber(pGobble->pPg);
}
}
}
/* A separator entry is only written when iPtr is non-zero. */
if( rc==LSM_OK && (rtIsSeparator(eType)==0 || iPtr!=0) ){
void *pVal; int nVal;
rc = multiCursorGetVal(pCsr, iVal, &pVal, &nVal);
if( pVal && rc==LSM_OK ){
/* Copy the value into the cursor's own blob before it is written. */
assert( nVal>=0 );
rc = sortedBlobSet(pDb->pEnv, &pCsr->val, pVal, nVal);
pVal = pCsr->val.pData;
}
if( rc==LSM_OK ){
rc = mergeWorkerWrite(pMW, eType, pKey, nKey, pVal, nVal, iPtr);
}
}
}
/* Advance the cursor to the next input entry. */
assert( lsmMCursorValid(pMW->pCsr) );
if( rc==LSM_OK ) rc = lsmMCursorNext(pMW->pCsr);
return rc;
}
/*
** Return non-zero if the merge worker has no more input to process:
** either it has no cursor, or its cursor is no longer valid.
*/
static int mergeWorkerDone(MergeWorker *pMW){
  if( pMW->pCsr==0 ) return 1;
  return !lsmMCursorValid(pMW->pCsr);
}
/*
** Free Level object p together with the split-key, Merge object and
** right-hand-side segment array it owns. A NULL p is a no-op.
*/
static void sortedFreeLevel(lsm_env *pEnv, Level *p){
  if( p==0 ) return;

  lsmFree(pEnv, p->pSplitKey);
  lsmFree(pEnv, p->pMerge);
  lsmFree(pEnv, p->aRhs);
  lsmFree(pEnv, p);
}
/*
** If a work-hook callback (pDb->xWork) is registered on the database
** handle, invoke it with the handle and the registered context pointer.
*/
static void sortedInvokeWorkHook(lsm_db *pDb){
  if( pDb->xWork==0 ) return;
  pDb->xWork(pDb, pDb->pWorkCtx);
}
/*
** Write a new top-level segment to the database, using as input the
** in-memory tree(s) selected by eTree and the free-list.
**
**   pDb       Database handle. pDb->pWorker must be set.
**   eTree     Passed to multiCursorAddTree(). TREE_NONE creates a level
**             flagged LEVEL_FREELIST_ONLY.
**   pnWrite   If not NULL, set to the number of output pages written
**             (MergeWorker.nWork).
**
** While this function runs, pDb->pFreelist points at a local Freelist and
** pDb->bUseFreelist is set; both are cleared again before returning.
**
** Returns LSM_OK or an error code.
*/
static int sortedNewToplevel(
lsm_db *pDb,
int eTree,
int *pnWrite
){
int rc = LSM_OK;
MultiCursor *pCsr = 0;
Level *pNext = 0;
Level *pNew;
Segment *pLinked = 0;
Level *pDel = 0;
int nWrite = 0;
Freelist freelist;
if( eTree!=TREE_NONE ){
rc = lsmShmCacheChunks(pDb, pDb->treehdr.nChunk);
}
/* Install the temporary free-list for the duration of this call. */
assert( pDb->bUseFreelist==0 );
pDb->pFreelist = &freelist;
pDb->bUseFreelist = 1;
memset(&freelist, 0, sizeof(freelist));
/* Allocate the new level and link it in front of the current top level. */
pNext = lsmDbSnapshotLevel(pDb->pWorker);
pNew = (Level *)lsmMallocZeroRc(pDb->pEnv, sizeof(Level), &rc);
if( pNew ){
pNew->pNext = pNext;
lsmDbSnapshotSetLevel(pDb->pWorker, pNew);
}
/* Build the cursor that supplies the content of the new segment. */
pCsr = multiCursorNew(pDb, &rc);
if( pCsr ){
pCsr->pDb = pDb;
rc = multiCursorVisitFreelist(pCsr);
if( rc==LSM_OK ){
rc = multiCursorAddTree(pCsr, pDb->pWorker, eTree);
}
if( rc==LSM_OK && pNext && pNext->pMerge==0 ){
if( (pNext->flags & LEVEL_FREELIST_ONLY) ){
/* The existing top level holds only free-list data: read it as an extra
** input and delete it (pDel) once the new level has been written. */
pDel = pNext;
pCsr->aPtr = lsmMallocZeroRc(pDb->pEnv, sizeof(SegmentPtr), &rc);
multiCursorAddOne(pCsr, pNext, &rc);
}else if( eTree!=TREE_NONE && pNext->lhs.iRoot ){
/* The next level has a b-tree root: open a b-tree cursor on it. Its
** iRoot is cleared below once the new level has been written. */
pLinked = &pNext->lhs;
rc = btreeCursorNew(pDb, pLinked, &pCsr->pBtCsr);
}
}
/* No older level exists, so delete markers need not be retained. */
if( pNext==0 ){
multiCursorIgnoreDelete(pCsr);
}
}
if( rc!=LSM_OK ){
lsmMCursorClose(pCsr, 0);
}else{
LsmPgno iLeftPtr = 0;
Merge merge;
MergeWorker mergeworker;
/* Run a merge worker to completion. The Merge object lives on the stack
** and is detached from pNew again below. */
memset(&merge, 0, sizeof(Merge));
memset(&mergeworker, 0, sizeof(MergeWorker));
pNew->pMerge = &merge;
pNew->flags |= LEVEL_INCOMPLETE;
mergeworker.pDb = pDb;
mergeworker.pLevel = pNew;
mergeworker.pCsr = pCsr;
pCsr->pPrevMergePtr = &iLeftPtr;
mergeworker.bFlush = 1;
if( rc==LSM_OK ) rc = lsmMCursorFirst(pCsr);
while( rc==LSM_OK && mergeWorkerDone(&mergeworker)==0 ){
rc = mergeWorkerStep(&mergeworker);
}
mergeWorkerShutdown(&mergeworker, &rc);
assert( rc!=LSM_OK || mergeworker.nWork==0 || pNew->lhs.iFirst );
if( rc==LSM_OK && pNew->lhs.iFirst ){
rc = lsmFsSortedFinish(pDb->pFS, &pNew->lhs);
}
nWrite = mergeworker.nWork;
pNew->flags &= ~LEVEL_INCOMPLETE;
if( eTree==TREE_NONE ){
pNew->flags |= LEVEL_FREELIST_ONLY;
}
pNew->pMerge = 0;
}
if( rc!=LSM_OK || pNew->lhs.iFirst==0 ){
/* An error occurred or nothing was written: restore the previous top
** level and discard the new Level object. */
assert( rc!=LSM_OK || pDb->pWorker->freelist.nEntry==0 );
lsmDbSnapshotSetLevel(pDb->pWorker, pNext);
sortedFreeLevel(pDb->pEnv, pNew);
}else{
if( pLinked ){
pLinked->iRoot = 0;
}else if( pDel ){
/* Unlink and delete the old free-list-only level that was merged in. */
assert( pNew->pNext==pDel );
pNew->pNext = pDel->pNext;
lsmFsSortedDelete(pDb->pFS, pDb->pWorker, 1, &pDel->lhs);
sortedFreeLevel(pDb->pEnv, pDel);
}
#if LSM_LOG_STRUCTURE
lsmSortedDumpStructure(pDb, pDb->pWorker, LSM_LOG_DATA, 0, "new-toplevel");
#endif
/* Move the contents of the temporary free-list into the worker snapshot,
** or mark the snapshot free-list empty if nothing was collected. */
if( freelist.nEntry ){
Freelist *p = &pDb->pWorker->freelist;
lsmFree(pDb->pEnv, p->aEntry);
memcpy(p, &freelist, sizeof(freelist));
freelist.aEntry = 0;
}else{
pDb->pWorker->freelist.nEntry = 0;
}
assertBtreeOk(pDb, &pNew->lhs);
sortedInvokeWorkHook(pDb);
}
if( pnWrite ) *pnWrite = nWrite;
pDb->pWorker->nWrite += nWrite;
/* Uninstall the temporary free-list. */
pDb->pFreelist = 0;
pDb->bUseFreelist = 0;
lsmFree(pDb->pEnv, freelist.aEntry);
return rc;
}
/*
** Replace the nMerge consecutive levels starting at pLevel with a single
** new level that has the old levels' segments as its right-hand side,
** ready to be merged.
**
**   pDb      Database handle (pDb->pWorker is the snapshot modified).
**   pLevel   First (newest) of the levels to merge. Each of the nMerge
**            levels must have nRight==0.
**   nMerge   Number of levels to merge.
**   ppNew    OUT: the new level on success, NULL on failure.
**
** The new level gets age pLevel->iAge+1 and is flagged
** LEVEL_FREELIST_ONLY if every merged level had that flag. Its Merge
** object has one input per merged level, plus one more if the level that
** follows the merged ones has a b-tree root, is not itself being merged,
** and is compatible with the free-list-only status of the new level.
**
** All allocations are made before the level list is modified. If any of
** them fails, the snapshot is left untouched, everything allocated here is
** freed and an error code is returned with *ppNew set to NULL.
*/
static int sortedMergeSetup(
  lsm_db *pDb,                    /* Database handle */
  Level *pLevel,                  /* First level to merge */
  int nMerge,                     /* Merge this many levels together */
  Level **ppNew                   /* OUT: New, merged, level */
){
  int rc = LSM_OK;                /* Return code */
  Level *pNew;                    /* New Level object */
  Merge *pMerge;                  /* New Merge object */
  Level *pNext = 0;               /* Level following the merged levels */
  Level *pTopLevel;               /* Top level of the worker snapshot */
  Level **pp;                     /* Link that points at pLevel */
  Level *p;                       /* Iterator over the merged levels */
  int bFreeOnly = 1;              /* True if all merged levels are free-only */
  int bUseNext = 0;               /* True to link in the next level */
  int nByte;                      /* Bytes to allocate for pMerge */
  int i;

#ifdef LSM_DEBUG
  int iLevel;
  Level *pX = pLevel;
  for(iLevel=0; iLevel<nMerge; iLevel++){
    assert( pX->nRight==0 );
    pX = pX->pNext;
  }
#endif

  /* Read-only pass: find the level that follows the merged levels and
  ** work out whether every merged level is free-list-only. */
  p = pLevel;
  for(i=0; i<nMerge; i++){
    assert( p->nRight==0 );
    if( (p->flags & LEVEL_FREELIST_ONLY)==0 ) bFreeOnly = 0;
    pNext = p->pNext;
    p = pNext;
  }

  /* Determine whether or not the next level will be linked in. */
  if( pNext && pNext->pMerge==0 && pNext->lhs.iRoot
   && (bFreeOnly==0 || (pNext->flags & LEVEL_FREELIST_ONLY))
  ){
    bUseNext = 1;
  }

  /* Allocate everything that is required. lsmMallocZeroRc() is passed the
  ** running rc, as elsewhere in this file. */
  pNew = (Level *)lsmMallocZeroRc(pDb->pEnv, sizeof(Level), &rc);
  if( pNew ){
    pNew->aRhs = (Segment *)lsmMallocZeroRc(pDb->pEnv,
        nMerge * sizeof(Segment), &rc);
  }
  nByte = sizeof(Merge) + sizeof(MergeInput) * (nMerge + bUseNext);
  pMerge = (Merge *)lsmMallocZeroRc(pDb->pEnv, nByte, &rc);

  if( rc!=LSM_OK ){
    /* Nothing has been linked into the snapshot yet, so just clean up. */
    lsmFree(pDb->pEnv, pMerge);
    sortedFreeLevel(pDb->pEnv, pNew);
    *ppNew = 0;
    return rc;
  }

  pMerge->aInput = (MergeInput *)&pMerge[1];
  pMerge->nInput = nMerge + bUseNext;
  pNew->pMerge = pMerge;
  pNew->nRight = nMerge;
  pNew->iAge = pLevel->iAge+1;
  if( bFreeOnly ) pNew->flags |= LEVEL_FREELIST_ONLY;

  /* Locate the link pointing at pLevel before any level is freed. */
  pTopLevel = lsmDbSnapshotLevel(pDb->pWorker);
  for(pp=&pTopLevel; *pp!=pLevel; pp=&((*pp)->pNext));

  /* Move the segment of each merged level into aRhs[] and free the level. */
  p = pLevel;
  for(i=0; i<nMerge; i++){
    Level *pFollow = p->pNext;
    pNew->aRhs[i] = p->lhs;
    sortedFreeLevel(pDb->pEnv, p);
    p = pFollow;
  }

  /* Replace the old levels with the new one in the level list. */
  pNew->pNext = p;
  *pp = pNew;
  lsmDbSnapshotSetLevel(pDb->pWorker, pTopLevel);

  *ppNew = pNew;
  return rc;
}
/*
** Initialize MergeWorker object pMW so that it can continue (or begin) the
** merge described by pLevel->pMerge.
**
**   pDb      Database handle. pDb->pWorker must be set.
**   pLevel   Level being merged into. It must have a Merge object and at
**            least one right-hand-side segment.
**   pMW      Object to initialize. It is zeroed first.
**
** Returns LSM_OK or an error code. pMW->pCsr is set even if an error
** occurs part way through.
*/
static int mergeWorkerInit(
lsm_db *pDb,
Level *pLevel,
MergeWorker *pMW
){
int rc = LSM_OK;
Merge *pMerge = pLevel->pMerge;
MultiCursor *pCsr = 0;
Level *pNext = pLevel->pNext;
assert( pDb->pWorker );
assert( pLevel->pMerge );
assert( pLevel->nRight>0 );
memset(pMW, 0, sizeof(MergeWorker));
pMW->pDb = pDb;
pMW->pLevel = pLevel;
/* One aGobble[] slot per right-hand-side segment. */
pMW->aGobble = lsmMallocZeroRc(pDb->pEnv, sizeof(LsmPgno)*pLevel->nRight,&rc);
/* Create the cursor that reads the right-hand-side segments of pLevel. */
pCsr = multiCursorNew(pDb, &rc);
if( pCsr ){
pCsr->flags |= CURSOR_NEXT_OK;
rc = multiCursorAddRhs(pCsr, pLevel);
}
/* The Merge object has more inputs than there are rhs segments only when
** the next level is linked in; in that case open a b-tree cursor on it.
** Otherwise read separators if there is a next level, or ignore delete
** markers if there is none.
** NOTE(review): pCsr may be NULL in the two else-branches if
** multiCursorNew() failed - confirm both helpers tolerate a NULL cursor. */
if( rc==LSM_OK && pMerge->nInput > pLevel->nRight ){
rc = btreeCursorNew(pDb, &pNext->lhs, &pCsr->pBtCsr);
}else if( pNext ){
multiCursorReadSeparators(pCsr);
}else{
multiCursorIgnoreDelete(pCsr);
}
assert( rc!=LSM_OK || pMerge->nInput==(pCsr->nPtr+(pCsr->pBtCsr!=0)) );
pMW->pCsr = pCsr;
/* Load any b-tree hierarchy pages that already exist. If there are none,
** seed aSave[0].iPgno with the first page of the output segment. */
if( rc==LSM_OK ) rc = mergeWorkerLoadHierarchy(pMW);
if( rc==LSM_OK && pMW->hier.nHier==0 ){
pMW->aSave[0].iPgno = pLevel->lhs.iFirst;
}
if( rc==LSM_OK ){
pCsr->pPrevMergePtr = &pMerge->iCurrentPtr;
if( pLevel->lhs.iFirst==0 ){
/* No output has been written yet: position the cursor with
** multiCursorEnd(pCsr, 0). */
rc = multiCursorEnd(pCsr, 0);
}else{
/* Output already exists: restore each input to the page and cell saved
** in Merge.aInput[] by mergeWorkerShutdown(). */
int i;
for(i=0; rc==LSM_OK && i<pCsr->nPtr; i++){
MergeInput *pInput = &pMerge->aInput[i];
if( pInput->iPg ){
SegmentPtr *pPtr;
assert( pCsr->aPtr[i].pPg==0 );
pPtr = &pCsr->aPtr[i];
rc = segmentPtrLoadPage(pDb->pFS, pPtr, pInput->iPg);
if( rc==LSM_OK && pPtr->nCell>0 ){
rc = segmentPtrLoadCell(pPtr, pInput->iCell);
}
}
}
/* The b-tree cursor position is stored in the entry after the segments. */
if( rc==LSM_OK && pCsr->pBtCsr ){
int (*xCmp)(void *, int, void *, int) = pCsr->pDb->xCmp;
assert( i==pCsr->nPtr );
rc = btreeCursorRestore(pCsr->pBtCsr, xCmp, &pMerge->aInput[i]);
}
if( rc==LSM_OK ){
rc = multiCursorSetupTree(pCsr, 0);
}
}
pCsr->flags |= CURSOR_NEXT_OK;
}
return rc;
}
/*
** Gobble segment pCsr->aPtr[iGobble].pSeg up to the position of the key
** that the multi-cursor currently points at.
**
** This is a no-op unless the topic of the current cursor entry is zero.
** Otherwise seekInBtree() is used to seek to the current cursor key,
** recording page numbers in a zeroed 32-entry array. The non-zero entries
** at the start of that array are then passed to lsmFsGobble().
**
** Returns LSM_OK, or an error code from the allocation or the seek.
*/
static int sortedBtreeGobble(
  lsm_db *pDb,                    /* Worker connection */
  MultiCursor *pCsr,              /* Multi-cursor being used for a merge */
  int iGobble                     /* pCsr->aPtr[] entry to operate on */
){
  int rc = LSM_OK;
  Segment *pSeg;
  LsmPgno *aPg;

  if( rtTopic(pCsr->eType)!=0 ) return LSM_OK;

  pSeg = pCsr->aPtr[iGobble].pSeg;
  assert( pSeg->iRoot>0 );

  aPg = lsmMallocZeroRc(pDb->pEnv, sizeof(LsmPgno)*32, &rc);
  if( rc==LSM_OK ){
    rc = seekInBtree(pCsr, pSeg,
        rtTopic(pCsr->eType), pCsr->key.pData, pCsr->key.nData, aPg, 0
    );
  }

  if( rc==LSM_OK ){
    int nPg = 0;                  /* Number of leading non-zero entries */
    while( aPg[nPg] ) nPg++;
    lsmFsGobble(pDb, pSeg, aPg, nPg);
  }

  lsmFree(pDb->pEnv, aPg);
  return rc;
}
/*
** Return the length of the run of consecutive levels, starting at p, that
** all have the same age as p. The result is always at least 1.
*/
static int sortedCountLevels(Level *p){
  const int iAge = p->iAge;
  Level *pIter;
  int nRet = 1;                   /* Level p itself */

  for(pIter=p->pNext; pIter && pIter->iAge==iAge; pIter=pIter->pNext){
    nRet++;
  }
  return nRet;
}
/*
** Select the level that the worker should work on next.
**
**   pDb      Database handle; the level list of pDb->pWorker is examined.
**   nMerge   Minimum number of segments worth merging. Must be >= 1.
**   ppOut    OUT: the selected level. Not written if nothing is selected.
**
** If the selection is a run of levels that are not yet being merged,
** sortedMergeSetup() is called to combine them into one new level, and
** its return code is returned. Otherwise LSM_OK is returned.
*/
static int sortedSelectLevel(lsm_db *pDb, int nMerge, Level **ppOut){
Level *pTopLevel = lsmDbSnapshotLevel(pDb->pWorker);
int rc = LSM_OK;
Level *pLevel = 0;
Level *pBest = 0;
int nBest;
Level *pThis = 0;
int nThis = 0;
assert( nMerge>=1 );
/* A candidate must have more than nBest segments to be selected. */
nBest = LSM_MAX(1, nMerge-1);
/* Scan the level list. pThis/nThis track the current run of consecutive
** levels that have the same age and no merge in progress (nRight==0). */
for(pLevel=pTopLevel; pLevel; pLevel=pLevel->pNext){
if( pLevel->nRight==0 && pThis && pLevel->iAge==pThis->iAge ){
nThis++;
}else{
/* The run has ended at pLevel. Accept it as the best so far only if
** pLevel is not exactly one age older than the run, or if pLevel is
** itself idle and starts a run of at most pDb->nMerge levels. */
if( nThis>nBest ){
if( (pLevel->iAge!=pThis->iAge+1)
|| (pLevel->nRight==0 && sortedCountLevels(pLevel)<=pDb->nMerge)
){
pBest = pThis;
nBest = nThis;
}
}
if( pLevel->nRight ){
/* pLevel is already being merged: it is a candidate in its own right,
** ranked by its number of right-hand-side segments. */
if( pLevel->nRight>nBest ){
nBest = pLevel->nRight;
pBest = pLevel;
}
nThis = 0;
pThis = 0;
}else{
/* pLevel starts a new run. */
pThis = pLevel;
nThis = 1;
}
}
}
/* Consider the run that extends to the end of the list. */
if( nThis>nBest ){
assert( pThis );
pBest = pThis;
nBest = nThis;
}
/* Nothing found and nMerge==1: if more than one level holds user data
** (is not LEVEL_FREELIST_ONLY), select every level in the list. */
if( pBest==0 && nMerge==1 ){
int nFree = 0;
int nUsr = 0;
for(pLevel=pTopLevel; pLevel; pLevel=pLevel->pNext){
assert( !pLevel->nRight );
if( pLevel->flags & LEVEL_FREELIST_ONLY ){
nFree++;
}else{
nUsr++;
}
}
if( nUsr>1 ){
pBest = pTopLevel;
nBest = nFree + nUsr;
}
}
if( pBest ){
if( pBest->nRight==0 ){
/* A run of idle levels: combine them into a single level to merge. */
rc = sortedMergeSetup(pDb, pBest, nBest, ppOut);
}else{
/* A merge already in progress: continue it. */
*ppOut = pBest;
}
}
return rc;
}
/*
** Return 1 if the database is "full", 0 otherwise. The database is full
** if lsmDatabaseFull() says so, or if the top level of the worker
** snapshot has age 0 and either is being merged (nRight!=0) or starts a
** run of at least pDb->nMerge levels of the same age.
*/
static int sortedDbIsFull(lsm_db *pDb){
  Level *pTop = lsmDbSnapshotLevel(pDb->pWorker);

  if( lsmDatabaseFull(pDb) ) return 1;
  if( pTop==0 || pTop->iAge!=0 ) return 0;
  if( pTop->nRight ) return 1;
  return sortedCountLevels(pTop)>=pDb->nMerge;
}
/*
** Context object passed to the moveBlockCb() free-list callback by
** sortedMoveBlock().
*/
typedef struct MoveBlockCtx MoveBlockCtx;
struct MoveBlockCtx {
/* Block number most recently accepted by the callback. Initialized by
** sortedMoveBlock() to one more than the snapshot's block count. */
int iSeen;
/* Zero until the callback is passed a block other than (iSeen-1); it is
** then set to (iSeen-1), the block sortedMoveBlock() will move. */
int iFrom;
};
/*
** Free-list callback used by sortedMoveBlock(). pCtx points to a
** MoveBlockCtx object.
**
** If iBlk is the block immediately before the last one seen, remember it
** and return 0. Otherwise store (iSeen-1) in MoveBlockCtx.iFrom and
** return 1. Parameter iSnapshot is not used.
*/
static int moveBlockCb(void *pCtx, int iBlk, i64 iSnapshot){
  MoveBlockCtx *pMove = (MoveBlockCtx *)pCtx;
  int iExpect;                    /* Block number expected next */

  assert( pMove->iFrom==0 );
  iExpect = pMove->iSeen - 1;

  if( iBlk!=iExpect ){
    pMove->iFrom = iExpect;
    return 1;
  }

  pMove->iSeen = iBlk;
  return 0;
}
/*
** Attempt to move one block of the single segment in the database to a
** block allocated by lsmBlockAllocate(), recording the move in the
** snapshot's redirect array.
**
** The database must consist of exactly one level with no merge in
** progress. *pnWrite is set to the number of pages in one block if a block
** was moved, or to zero otherwise.
**
** Returns LSM_OK (also when there is nothing to move) or an error code.
*/
static int sortedMoveBlock(lsm_db *pDb, int *pnWrite){
Snapshot *p = pDb->pWorker;
Level *pLvl = lsmDbSnapshotLevel(p);
int iFrom;
int iTo;
int rc;
MoveBlockCtx sCtx;
assert( pLvl->pNext==0 && pLvl->nRight==0 );
assert( p->redirect.n<=LSM_MAX_BLOCK_REDIRECTS );
*pnWrite = 0;
/* Do nothing if the redirect array is already full. */
if( p->redirect.n>=LSM_MAX_BLOCK_REDIRECTS ) return LSM_OK;
/* Walk the free-list with moveBlockCb(). The callback leaves in
** sCtx.iFrom the first block number, counting down from the block count,
** that is not reported by the walk; zero means no such block. */
sCtx.iSeen = p->nBlock+1;
sCtx.iFrom = 0;
rc = lsmWalkFreelist(pDb, 1, moveBlockCb, &sCtx);
if( rc!=LSM_OK || sCtx.iFrom==0 ) return rc;
iFrom = sCtx.iFrom;
/* Allocate the destination block. iTo==0 means none is available. */
rc = lsmBlockAllocate(pDb, iFrom, &iTo);
if( rc!=LSM_OK || iTo==0 ) return rc;
assert( iTo!=1 && iTo<iFrom );
rc = lsmFsMoveBlock(pDb->pFS, &pLvl->lhs, iTo, iFrom);
if( rc==LSM_OK ){
/* Allocate the redirect array on first use. */
if( p->redirect.a==0 ){
int nByte = sizeof(struct RedirectEntry) * LSM_MAX_BLOCK_REDIRECTS;
p->redirect.a = lsmMallocZeroRc(pDb->pEnv, nByte, &rc);
}
if( rc==LSM_OK ){
/* Check whether block iFrom is itself the target of an existing
** redirect. */
int i;
for(i=0; i<p->redirect.n; i++){
if( p->redirect.a[i].iTo==iFrom ) break;
}
if( i==p->redirect.n ){
/* Not already redirected: insert a new entry at the front. */
memmove(&p->redirect.a[1], &p->redirect.a[0],
sizeof(struct RedirectEntry) * p->redirect.n
);
p->redirect.a[0].iFrom = iFrom;
p->redirect.a[0].iTo = iTo;
p->redirect.n++;
}else{
/* Already redirected: point the existing entry at the new block. */
p->redirect.a[i].iTo = iTo;
}
rc = lsmBlockFree(pDb, iFrom);
/* Report one block's worth of pages as written. */
*pnWrite = lsmFsBlockSize(pDb->pFS) / lsmFsPageSize(pDb->pFS);
pLvl->lhs.pRedirect = &p->redirect;
}
}
#if LSM_LOG_STRUCTURE
if( rc==LSM_OK ){
char aBuf[64];
sprintf(aBuf, "move-block %d/%d", p->redirect.n-1, LSM_MAX_BLOCK_REDIRECTS);
lsmSortedDumpStructure(pDb, pDb->pWorker, LSM_LOG_DATA, 0, aBuf);
}
#endif
return rc;
}
/*
** Add the first nFree levels of the worker snapshot as extra inputs to
** the merge that pMW is performing.
**
** The segment of each of those levels is moved to the front of
** pMW->pLevel->aRhs[], a matching SegmentPtr is added to the front of the
** cursor's aPtr[] array, and the level itself is unlinked from the
** snapshot and freed. The cursor's aTree[] array is freed and left NULL,
** and pMW->aGobble is freed.
**
** This is a no-op if nFree is zero or negative. Returns LSM_OK or an
** error code.
*/
static int mergeInsertFreelistSegments(
lsm_db *pDb,
int nFree,
MergeWorker *pMW
){
int rc = LSM_OK;
if( nFree>0 ){
MultiCursor *pCsr = pMW->pCsr;
Level *pLvl = pMW->pLevel;
SegmentPtr *aNew1;
Segment *aNew2;
Level *pIter;
Level *pNext;
int i = 0;
/* Grow the SegmentPtr array, leaving nFree zeroed slots at the front. */
aNew1 = (SegmentPtr *)lsmMallocZeroRc(
pDb->pEnv, sizeof(SegmentPtr) * (pCsr->nPtr+nFree), &rc
);
if( rc ) return rc;
memcpy(&aNew1[nFree], pCsr->aPtr, sizeof(SegmentPtr)*pCsr->nPtr);
pCsr->nPtr += nFree;
lsmFree(pDb->pEnv, pCsr->aTree);
lsmFree(pDb->pEnv, pCsr->aPtr);
pCsr->aTree = 0;
pCsr->aPtr = aNew1;
/* Grow the rhs segment array in the same way.
** NOTE(review): if this allocation fails the cursor has already been
** modified above (nPtr increased, aTree freed) - confirm callers cope. */
aNew2 = (Segment *)lsmMallocZeroRc(
pDb->pEnv, sizeof(Segment) * (pLvl->nRight+nFree), &rc
);
if( rc ) return rc;
memcpy(&aNew2[nFree], pLvl->aRhs, sizeof(Segment)*pLvl->nRight);
pLvl->nRight += nFree;
lsmFree(pDb->pEnv, pLvl->aRhs);
pLvl->aRhs = aNew2;
/* Move the segment of each level that precedes pLvl into the new slots,
** position its SegmentPtr with segmentPtrEnd(), then unlink and free the
** level. */
for(pIter=pDb->pWorker->pLevel; rc==LSM_OK && pIter!=pLvl; pIter=pNext){
Segment *pSeg = &pLvl->aRhs[i];
memcpy(pSeg, &pIter->lhs, sizeof(Segment));
pCsr->aPtr[i].pSeg = pSeg;
pCsr->aPtr[i].pLevel = pLvl;
rc = segmentPtrEnd(pCsr, &pCsr->aPtr[i], 0);
pDb->pWorker->pLevel = pNext = pIter->pNext;
sortedFreeLevel(pDb->pEnv, pIter);
i++;
}
assert( i==nFree );
assert( rc!=LSM_OK || pDb->pWorker->pLevel==pLvl );
/* aRhs[] was reallocated, so re-point the pre-existing SegmentPtr
** entries at their segments in the new array. */
for(i=nFree; i<pCsr->nPtr; i++){
pCsr->aPtr[i].pSeg = &pLvl->aRhs[i];
}
lsmFree(pDb->pEnv, pMW->aGobble);
pMW->aGobble = 0;
}
return rc;
}
/*
** Do merge work on the database.
**
**   pDb       Database handle. pDb->pWorker must be set.
**   nWork     Work budget, compared against the number of output pages
**             written (MergeWorker.nWork).
**   nMerge    Passed to sortedSelectLevel() as the minimum merge width.
**   bFlush    If true, stop as soon as sortedDbIsFull() returns false,
**             and never attempt to move blocks.
**   pnWrite   If not NULL, set to (nWork - remaining budget).
**
** Returns LSM_OK or an error code.
*/
static int sortedWork(
lsm_db *pDb,
int nWork,
int nMerge,
int bFlush,
int *pnWrite
){
int rc = LSM_OK;
int nRemaining = nWork;
Snapshot *pWorker = pDb->pWorker;
assert( pWorker );
/* Nothing to do if the database has no levels at all. */
if( lsmDbSnapshotLevel(pWorker)==0 ) return LSM_OK;
while( nRemaining>0 ){
Level *pLevel = 0;
rc = sortedSelectLevel(pDb, nMerge, &pLevel);
assert( rc==LSM_OK || pLevel==0 );
if( pLevel==0 ){
/* No level to merge. If not flushing, nMerge==1 and the database is a
** single level, try to move a block instead. Stop if nothing was done. */
int nDone = 0;
Level *pTopLevel = lsmDbSnapshotLevel(pDb->pWorker);
if( bFlush==0 && nMerge==1 && pTopLevel && pTopLevel->pNext==0 ){
rc = sortedMoveBlock(pDb, &nDone);
}
nRemaining -= nDone;
if( nDone==0 ) break;
}else{
int bSave = 0;
Freelist freelist = {0, 0, 0};
MergeWorker mergeworker;
assert( pDb->bIncrMerge==0 );
assert( pDb->pFreelist==0 && pDb->bUseFreelist==0 );
pDb->bIncrMerge = 1;
rc = mergeWorkerInit(pDb, pLevel, &mergeworker);
assert( mergeworker.nWork==0 );
/* Step the merge until it finishes, an error occurs or the budget is
** used up. Once the temporary free-list is in use (bUseFreelist) the
** budget is ignored. */
while( rc==LSM_OK
&& 0==mergeWorkerDone(&mergeworker)
&& (mergeworker.nWork<nRemaining || pDb->bUseFreelist)
){
int eType = rtTopic(mergeworker.pCsr->eType);
rc = mergeWorkerStep(&mergeworker);
/* With nMerge==1: the entry just processed had topic 0 and either the
** cursor has now moved to an entry with a non-zero topic or the merge
** is done. */
if( rc==LSM_OK && nMerge==1 && eType==0
&& (rtTopic(mergeworker.pCsr->eType) || mergeWorkerDone(&mergeworker))
){
int nFree = 0;
Level *pLvl;
assert( pDb->pFreelist==0 && pDb->bUseFreelist==0 );
/* Count the free-list-only levels that precede the level being
** merged into. */
for(pLvl=pDb->pWorker->pLevel;
pLvl!=mergeworker.pLevel && (pLvl->flags & LEVEL_FREELIST_ONLY);
pLvl=pLvl->pNext
){
assert( pLvl->nRight==0 );
nFree++;
}
/* If every preceding level is free-list-only, fold those levels and the
** in-memory free-list into this merge, and collect new free-list
** entries in the local freelist object from here on. */
if( pLvl==mergeworker.pLevel ){
rc = mergeInsertFreelistSegments(pDb, nFree, &mergeworker);
if( rc==LSM_OK ){
rc = multiCursorVisitFreelist(mergeworker.pCsr);
}
if( rc==LSM_OK ){
rc = multiCursorSetupTree(mergeworker.pCsr, 0);
pDb->pFreelist = &freelist;
pDb->bUseFreelist = 1;
}
}
}
}
/* Charge at least one unit of work per pass so the outer loop always
** makes progress. */
nRemaining -= LSM_MAX(mergeworker.nWork, 1);
if( rc==LSM_OK ){
if( mergeWorkerDone(&mergeworker)==0 ){
/* The merge is incomplete: gobble the parts of each input segment that
** are no longer needed. */
int i;
for(i=0; i<pLevel->nRight; i++){
SegmentPtr *pGobble = &mergeworker.pCsr->aPtr[i];
if( pGobble->pSeg->iRoot ){
rc = sortedBtreeGobble(pDb, mergeworker.pCsr, i);
}else if( mergeworker.aGobble[i] ){
lsmFsGobble(pDb, pGobble->pSeg, &mergeworker.aGobble[i], 1);
}
}
}else{
/* The merge is complete. */
int i;
int bEmpty;
mergeWorkerShutdown(&mergeworker, &rc);
bEmpty = (pLevel->lhs.iFirst==0);
if( bEmpty==0 && rc==LSM_OK ){
rc = lsmFsSortedFinish(pDb->pFS, &pLevel->lhs);
}
/* Install the free-list collected during the merge in the snapshot. */
if( pDb->bUseFreelist ){
Freelist *p = &pDb->pWorker->freelist;
lsmFree(pDb->pEnv, p->aEntry);
memcpy(p, &freelist, sizeof(freelist));
pDb->bUseFreelist = 0;
pDb->pFreelist = 0;
bSave = 1;
}
/* Delete the input segments. */
for(i=0; i<pLevel->nRight; i++){
lsmFsSortedDelete(pDb->pFS, pWorker, 1, &pLevel->aRhs[i]);
}
if( bEmpty ){
/* The output segment is empty: unlink and free the whole level. */
Level *pTop;
Level **pp;
assert( pLevel->pNext==0 );
pTop = lsmDbSnapshotLevel(pWorker);
for(pp=&pTop; *pp!=pLevel; pp=&((*pp)->pNext));
*pp = pLevel->pNext;
lsmDbSnapshotSetLevel(pWorker, pTop);
sortedFreeLevel(pDb->pEnv, pLevel);
}else{
/* If the next level was linked in as an extra input, clear its b-tree
** root. Then drop the merge state from the level. */
if( pLevel->pMerge->nInput > pLevel->nRight ){
assert( pLevel->pNext->lhs.iRoot );
pLevel->pNext->lhs.iRoot = 0;
}
lsmFree(pDb->pEnv, pLevel->aRhs);
pLevel->nRight = 0;
pLevel->aRhs = 0;
lsmFree(pDb->pEnv, pLevel->pMerge);
pLevel->pMerge = 0;
}
if( bSave && rc==LSM_OK ){
pDb->bIncrMerge = 0;
rc = lsmSaveWorker(pDb, 0);
}
}
}
/* Clean up the merge worker. This is also reached after the shutdown in
** the merge-complete branch above, which has already cleared
** mergeworker.pCsr.
** NOTE(review): in the bEmpty case pLevel has been freed by this point,
** yet it is still referenced through mergeworker.pLevel here and by the
** assert helpers below - confirm this is safe. */
mergeWorkerShutdown(&mergeworker, &rc);
pDb->bIncrMerge = 0;
if( rc==LSM_OK ) sortedInvokeWorkHook(pDb);
#if LSM_LOG_STRUCTURE
lsmSortedDumpStructure(pDb, pDb->pWorker, LSM_LOG_DATA, 0, "work");
#endif
assertBtreeOk(pDb, &pLevel->lhs);
assertRunInOrder(pDb, &pLevel->lhs);
if( bFlush && sortedDbIsFull(pDb)==0 ) break;
}
}
if( pnWrite ) *pnWrite = (nWork - nRemaining);
pWorker->nWrite += (nWork - nRemaining);
#ifdef LSM_LOG_WORK
lsmLogMessage(pDb, rc, "sortedWork(): %d pages", (nWork-nRemaining));
#endif
return rc;
}
/*
** Return true if the tree header records an old tree (iOldShmid is
** non-zero) whose log offset (iOldLog) differs from the log offset of the
** worker snapshot.
**
** If *pRc is not LSM_OK when this function is called, it is a no-op that
** returns 0. Otherwise *pRc is left set to LSM_OK.
*/
static int sortedTreeHasOld(lsm_db *pDb, int *pRc){
  int bRet = 0;

  assert( pDb->pWorker );
  if( *pRc==LSM_OK ){
    if( pDb->treehdr.iOldShmid==0 ){
      bRet = 0;
    }else if( pDb->treehdr.iOldLog==pDb->pWorker->iLogOff ){
      bRet = 0;
    }else{
      bRet = 1;
    }
    *pRc = LSM_OK;
  }
  assert( *pRc==LSM_OK || bRet==0 );
  return bRet;
}
/*
** Create a new top-level segment that holds free-list data only, by
** calling sortedNewToplevel() with TREE_NONE. The page count is not
** requested.
*/
static int sortedNewFreelistOnly(lsm_db *pDb){
  int rc = sortedNewToplevel(pDb, TREE_NONE, 0);
  return rc;
}
/*
** Save the worker snapshot with lsmCheckpointSaveWorker().
**
** If the snapshot free-list holds more than pDb->nMaxFreelist entries, it
** is first written out as a free-list-only segment. If that fails, the
** error is returned and the snapshot is not saved.
*/
int lsmSaveWorker(lsm_db *pDb, int bFlush){
  int rc = LSM_OK;

  if( pDb->pWorker->freelist.nEntry>pDb->nMaxFreelist ){
    rc = sortedNewFreelistOnly(pDb);
  }
  if( rc==LSM_OK ){
    rc = lsmCheckpointSaveWorker(pDb, bFlush);
  }
  return rc;
}
/*
** Perform a single unit of work on the database, holding the worker
** snapshot for the duration of the call.
**
**   pDb        Database handle. Must not already hold a worker snapshot.
**   bShutdown  If true, the auto-checkpoint limit is not applied and the
**              general merge step is skipped.
**   nMerge     Passed through to sortedWork().
**   nPage      Maximum number of pages to write. Must be > 0.
**   pnWrite    OUT: number of pages written (0 on error or if no work
**              needed saving).
**   pbCkpt     OUT: true if the caller should run a checkpoint.
**
** Returns LSM_OK or an error code.
*/
static int doLsmSingleWork(
lsm_db *pDb,
int bShutdown,
int nMerge,
int nPage,
int *pnWrite,
int *pbCkpt
){
Snapshot *pWorker;
int rc = LSM_OK;
int bDirty = 0;
int nMax = nPage;
int nRem = nPage;
int bCkpt = 0;
assert( nPage>0 );
assert( pDb->pWorker==0 );
rc = lsmBeginWork(pDb);
if( rc!=LSM_OK ) return rc;
pWorker = pDb->pWorker;
/* If auto-checkpointing is enabled, limit the work so that the amount of
** data written but not yet synced stays within nAutockpt. If the limit is
** what cuts the work short, remember that a checkpoint is wanted. */
if( bShutdown==0 && pDb->nAutockpt ){
u32 nSync;
u32 nUnsync;
int nPgsz;
lsmCheckpointSynced(pDb, 0, 0, &nSync);
nUnsync = lsmCheckpointNWrite(pDb->pShmhdr->aSnap1, 0);
nPgsz = lsmCheckpointPgsz(pDb->pShmhdr->aSnap1);
nMax = (int)LSM_MIN(nMax, (pDb->nAutockpt/nPgsz) - (int)(nUnsync-nSync));
if( nMax<nRem ){
bCkpt = 1;
nRem = LSM_MAX(nMax, 0);
}
}
/* Load the tree header unless a transaction is already open. */
if( pDb->nTransOpen==0 ){
rc = lsmTreeLoadHeader(pDb, 0);
}
/* If there is an old in-memory tree, flush it to a new top-level segment,
** first doing merge work if the database is full. */
if( sortedTreeHasOld(pDb, &rc) ){
if( sortedDbIsFull(pDb) ){
int nPg = 0;
rc = sortedWork(pDb, nRem, nMerge, 1, &nPg);
nRem -= nPg;
assert( rc!=LSM_OK || nRem<=0 || !sortedDbIsFull(pDb) );
bDirty = 1;
}
if( rc==LSM_OK && nRem>0 ){
int nPg = 0;
rc = sortedNewToplevel(pDb, TREE_OLD, &nPg);
nRem -= nPg;
if( rc==LSM_OK ){
if( pDb->nTransOpen>0 ){
lsmTreeDiscardOld(pDb);
}
/* The worker snapshot is saved here, so it is no longer dirty. */
rc = lsmSaveWorker(pDb, 1);
bDirty = 0;
}
}
}
/* Spend whatever budget remains on general merge work. */
if( rc==LSM_OK && nRem>0 && bShutdown==0 ){
int nPg = 0;
rc = sortedWork(pDb, nRem, nMerge, 0, &nPg);
nRem -= nPg;
if( nPg ) bDirty = 1;
}
/* If the in-memory free-list has grown past nMaxFreelist, write it out as
** a free-list-only segment, doing merge work first for as long as
** lsmDatabaseFull() reports the database as full. */
if( rc==LSM_OK && pDb->pWorker->freelist.nEntry > pDb->nMaxFreelist ){
while( rc==LSM_OK && lsmDatabaseFull(pDb) ){
int nPg = 0;
rc = sortedWork(pDb, 16, nMerge, 1, &nPg);
nRem -= nPg;
}
if( rc==LSM_OK ){
rc = sortedNewFreelistOnly(pDb);
}
bDirty = 1;
}
if( rc==LSM_OK ){
*pnWrite = (nMax - nRem);
*pbCkpt = (bCkpt && nRem<=0);
/* Also request a checkpoint when nMerge==1, auto-checkpointing is on,
** something was written and the database is a single idle level. */
if( nMerge==1 && pDb->nAutockpt>0 && *pnWrite>0
&& pWorker->pLevel
&& pWorker->pLevel->nRight==0
&& pWorker->pLevel->pNext==0
){
*pbCkpt = 1;
}
}
/* Finish the work session. If there is nothing to save, or an error has
** occurred, lsmFinishWork() is passed a throwaway LSM_BUSY code instead of
** rc and no pages are reported as written. */
if( rc==LSM_OK && bDirty ){
lsmFinishWork(pDb, 0, &rc);
}else{
int rcdummy = LSM_BUSY;
lsmFinishWork(pDb, 0, &rcdummy);
*pnWrite = 0;
}
assert( pDb->pWorker==0 );
return rc;
}
/*
** Do up to nPage pages of work on the database, or an unlimited amount if
** nPage is negative. Nothing is done if nPage is zero.
**
** doLsmSingleWork() is called repeatedly. Each time it asks for a
** checkpoint, lsm_checkpoint() is run and, if the page budget has not been
** used up, another round of work follows. The loop ends when no
** checkpoint is requested, the budget is exhausted or an error occurs.
**
** If pnWrite is not NULL, *pnWrite is set to the number of pages written
** on success, or to zero on error.
*/
static int doLsmWork(lsm_db *pDb, int nMerge, int nPage, int *pnWrite){
  int rc = LSM_OK;                /* Return code */
  int nWrite = 0;                 /* Number of pages written */

  assert( nMerge>=1 );

  if( nPage!=0 ){
    for(;;){
      int nThis = 0;
      int bCkpt = 0;
      int nReq = (nPage<0) ? ((int)0x7FFFFFFF) : (nPage-nWrite);

      rc = doLsmSingleWork(pDb, 0, nMerge, nReq, &nThis, &bCkpt);
      nWrite += nThis;
      if( rc!=LSM_OK || bCkpt==0 ) break;

      rc = lsm_checkpoint(pDb, 0);
      if( rc!=LSM_OK ) break;
      if( nPage>=0 && nWrite>=nPage ) break;
    }
  }

  if( pnWrite ){
    *pnWrite = (rc==LSM_OK) ? nWrite : 0;
  }
  return rc;
}
/*
** Do work on the database.
**
**   pDb       Database handle. It must not have an open transaction or an
**             open cursor, otherwise LSM_MISUSE is returned.
**   nMerge    Minimum merge width. If zero or negative, pDb->nMerge is
**             used.
**   nKB       Amount of work to do, in KB. A negative value means no
**             limit.
**   pnWrite   If not NULL, set to the amount of data written, in KB.
**
** nKB is converted to a page count (rounding up) using the page size
** reported by lsmFsPageSize(), and the page count written is converted
** back to KB (rounding up) for *pnWrite.
*/
int lsm_work(lsm_db *pDb, int nMerge, int nKB, int *pnWrite){
  int rc;                         /* Return code */
  int nPgsz;                      /* Page size in bytes */
  int nPage;                      /* Equivalent of nKB in pages */
  int nWrite = 0;                 /* Number of pages written */

  if( pDb->nTransOpen || pDb->pCsr ) return LSM_MISUSE_BKPT;
  if( nMerge<=0 ) nMerge = pDb->nMerge;

  lsmFsPurgeCache(pDb->pFS);

  /* Convert from KB to pages: bytes divided by the page size. */
  nPgsz = lsmFsPageSize(pDb->pFS);
  if( nKB>=0 ){
    nPage = (int)(((i64)nKB * 1024 + nPgsz - 1) / nPgsz);
  }else{
    nPage = -1;
  }

  rc = doLsmWork(pDb, nMerge, nPage, &nWrite);

  if( pnWrite ){
    /* Convert back from pages to KB: bytes (pages times the page size)
    ** divided by 1024. This is the inverse of the conversion above. */
    *pnWrite = (int)(((i64)nWrite * nPgsz + 1023) / 1024);
  }
  return rc;
}
/*
** Flush the contents of the in-memory tree to disk.
**
** Returns LSM_MISUSE if the handle has an open transaction or cursor.
** Otherwise a write transaction is opened, the tree is flushed and the
** old tree discarded, and the transaction is committed. If the write
** transaction cannot be opened, its error code is returned.
**
** NOTE(review): the return value of lsmFlushTreeToDisk() is ignored, so a
** failed flush is not reported to the caller - confirm this is intended.
*/
int lsm_flush(lsm_db *db){
  int rc;

  if( db->nTransOpen>0 || db->pCsr ) return LSM_MISUSE_BKPT;

  rc = lsmBeginWriteTrans(db);
  if( rc==LSM_OK ){
    lsmFlushTreeToDisk(db);
    lsmTreeDiscardOld(db);
    lsmTreeMakeOld(db);
    lsmTreeDiscardOld(db);
    rc = lsmFinishWriteTrans(db, 1);
  }else{
    lsmFinishWriteTrans(db, 0);
  }
  lsmFinishReadTrans(db);

  return rc;
}
/*
** Do auto-work on the database. Must be called with a transaction open
** and without a worker snapshot held.
**
** The amount of work requested from doLsmWork() is nUnit pages for each
** level in the client snapshot, plus nUnit more if there is an old
** in-memory tree. If there is nothing to count, nothing is done.
**
** When an old tree exists, open cursors are saved before the work. If
** cursors are open afterwards, the client snapshot is reloaded and the
** cursors are restored. LSM_BUSY from doLsmWork() is treated as success.
*/
int lsmSortedAutoWork(
  lsm_db *pDb,                    /* Database handle */
  int nUnit                       /* Pages of work per level */
){
  int rc = LSM_OK;                /* Return code */
  int nDepth = 0;                 /* Current height of tree (longest path) */
  int bRestore = 0;               /* True if cursors were saved */
  int nRemaining;                 /* Pages of work to request */
  Level *pLevel;                  /* Used to iterate through levels */

  assert( pDb->pWorker==0 );
  assert( pDb->nTransOpen>0 );

  /* Count the levels in the client snapshot. */
  pLevel = lsmDbSnapshotLevel(pDb->pClient);
  while( pLevel ){
    nDepth++;
    pLevel = pLevel->pNext;
  }

  /* An old in-memory tree counts as one more level. */
  if( lsmTreeHasOld(pDb) ){
    nDepth++;
    bRestore = 1;
    rc = lsmSaveCursors(pDb);
    if( rc!=LSM_OK ) return rc;
  }

  if( nDepth<=0 ) return rc;

  nRemaining = nUnit * nDepth;
#ifdef LSM_LOG_WORK
  lsmLogMessage(pDb, rc, "lsmSortedAutoWork(): %d*%d = %d pages",
      nUnit, nDepth, nRemaining);
#endif
  assert( nRemaining>=0 );
  rc = doLsmWork(pDb, pDb->nMerge, nRemaining, 0);
  if( rc==LSM_BUSY ) rc = LSM_OK;

  if( bRestore && pDb->pCsr ){
    /* Reload the client snapshot, then restore the saved cursors. */
    lsmMCursorFreeCache(pDb);
    lsmFreeSnapshot(pDb->pEnv, pDb->pClient);
    pDb->pClient = 0;
    if( rc==LSM_OK ){
      rc = lsmCheckpointLoad(pDb, 0);
    }
    if( rc==LSM_OK ){
      rc = lsmCheckpointDeserialize(pDb, 0, pDb->aSnapshot, &pDb->pClient);
    }
    if( rc==LSM_OK ){
      rc = lsmRestoreCursors(pDb);
    }
  }

  return rc;
}
/*
** Write the in-memory tree (TREE_BOTH) to a new top-level segment.
**
** A work session is started with lsmBeginWork(). While the database is
** full, merge work is done in units of 256 pages; then the new top-level
** segment is created. lsmFinishWork() is always called, and the final
** return code is returned.
*/
int lsmFlushTreeToDisk(lsm_db *pDb){
  int rc = lsmBeginWork(pDb);

  while( rc==LSM_OK ){
    if( !sortedDbIsFull(pDb) ){
      /* There is room for a new level: write the tree out and stop. */
      rc = sortedNewToplevel(pDb, TREE_BOTH, 0);
      break;
    }
    rc = sortedWork(pDb, 256, pDb->nMerge, 1, 0);
  }

  lsmFinishWork(pDb, 1, &rc);
  return rc;
}
/*
** Return a newly allocated string describing segment pSeg, for use in
** debugging output. The caller must free it with lsmFree().
**
** The string has the form "<first>.<last> " followed by "root=<iRoot>"
** (if the segment has a b-tree root) or "size=<nSize>" (if it does not).
** It is wrapped in "/" and "\" in the first case and in "|" characters in
** the second, and padded with spaces between the two parts so that the
** whole string is at least nMin characters wide.
**
** All page numbers are LsmPgno values and are formatted with %lld via an
** explicit cast to long long. Returns NULL if any of the intermediate
** strings could not be allocated.
*/
static char *segToString(lsm_env *pEnv, Segment *pSeg, int nMin){
  LsmPgno nSize = pSeg->nSize;
  LsmPgno iRoot = pSeg->iRoot;
  LsmPgno iFirst = pSeg->iFirst;
  LsmPgno iLast = pSeg->iLastPg;
  char *z = 0;                    /* Result string */
  char *z1;                       /* "<first>.<last>" */
  char *z2;                       /* "root=..." or "size=..." */

  z1 = lsmMallocPrintf(pEnv, "%lld.%lld", (long long)iFirst, (long long)iLast);
  if( iRoot ){
    z2 = lsmMallocPrintf(pEnv, "root=%lld", (long long)iRoot);
  }else{
    z2 = lsmMallocPrintf(pEnv, "size=%lld", (long long)nSize);
  }

  /* Only measure and combine the parts if both were allocated. */
  if( z1 && z2 ){
    /* Compute the padding in int so that an over-long string produces a
    ** negative value, which is then clamped to zero. */
    int nPad = nMin - 2 - (int)strlen(z1) - 1 - (int)strlen(z2);
    nPad = LSM_MAX(0, nPad);

    if( iRoot ){
      z = lsmMallocPrintf(pEnv, "/%s %*s%s\\", z1, nPad, "", z2);
    }else{
      z = lsmMallocPrintf(pEnv, "|%s %*s%s|", z1, nPad, "", z2);
    }
  }

  lsmFree(pEnv, z1);
  lsmFree(pEnv, z2);

  return z;
}
/*
** Write a description of segment pSeg into buffer aBuf[], which is nBuf
** bytes in size. If pSeg is NULL an empty string is written.
**
**   pDb     Database handle (supplies the memory environment).
**   aBuf    Output buffer.
**   nBuf    Size of aBuf[] in bytes, including room for the terminator.
**   nMin    Minimum width passed to segToString().
**   pSeg    Segment to describe, or NULL.
**
** Output that does not fit is truncated; the buffer is always
** nul-terminated within its nBuf bytes. Returns the number of characters
** written, not counting the terminator.
*/
static int fileToString(
  lsm_db *pDb,
  char *aBuf,
  int nBuf,
  int nMin,
  Segment *pSeg
){
  int i = 0;
  if( pSeg ){
    char *zSeg;

    zSeg = segToString(pDb->pEnv, pSeg, nMin);
    snprintf(&aBuf[i], nBuf-i, "%s", zSeg ? zSeg : "");
    i += (int)strlen(&aBuf[i]);
    lsmFree(pDb->pEnv, zSeg);

#ifdef LSM_LOG_FREELIST
    zSeg = 0;
    lsmInfoArrayStructure(pDb, 1, pSeg->iFirst, &zSeg);
    /* The space remaining is nBuf-i, since i bytes are already in use. */
    snprintf(&aBuf[i], nBuf-i, " (%s)", zSeg ? zSeg : "");
    i += (int)strlen(&aBuf[i]);
    lsmFree(pDb->pEnv, zSeg);
#endif

    /* The last valid index of an nBuf byte buffer is nBuf-1. */
    if( nBuf>0 ) aBuf[nBuf-1] = 0;
  }else{
    aBuf[0] = '\0';
  }

  return i;
}
/*
** Debugging helper. Format the contents of page pPg (which belongs to
** segment pRun) as a single line of text and pass it to lsmLogMessage().
**
** The line starts with the record count, footer pointer and flags of the
** page. Then, for each cell: the topic/type byte in hex, the key with
** non-alphanumeric bytes shown as '.', the value in the same form after
** "##" (only if bVals is true and the value is not empty), and the page
** pointer of the cell added to the footer pointer.
*/
void sortedDumpPage(lsm_db *pDb, Segment *pRun, Page *pPg, int bVals){
LsmBlob blob = {0, 0, 0};
LsmString s;
int i;
int nRec;
LsmPgno iPtr;
int flags;
u8 *aData;
int nData;
aData = fsPageData(pPg, &nData);
nRec = pageGetNRec(aData, nData);
iPtr = pageGetPtr(aData, nData);
flags = pageGetFlags(aData, nData);
lsmStringInit(&s, pDb->pEnv);
lsmStringAppendf(&s,"nCell=%d iPtr=%lld flags=%d {", nRec, iPtr, flags);
/* On a b-tree page the cell pointers are printed as stored, without the
** footer pointer added. */
if( flags&SEGMENT_BTREE_FLAG ) iPtr = 0;
for(i=0; i<nRec; i++){
Page *pRef = 0;
int iChar;
u8 *aKey; int nKey = 0;
u8 *aVal = 0; int nVal = 0;
int iTopic;
u8 *aCell;
i64 iPgPtr;
int eType;
/* Decode the cell header: type byte, then the page pointer varint. */
aCell = pageGetCell(aData, nData, i);
eType = *aCell++;
assert( (flags & SEGMENT_BTREE_FLAG) || eType!=0 );
aCell += lsmVarintGet64(aCell, &iPgPtr);
if( eType==0 ){
/* A type of zero means the key lives on another page: read that page
** number and fetch the key of its first cell.
** NOTE(review): the return value of lsmFsDbPageGet() is ignored, so pRef
** may still be NULL when passed to pageGetKey() - confirm. */
LsmPgno iRef;
aCell += lsmVarintGet64(aCell, &iRef);
lsmFsDbPageGet(pDb->pFS, pRun, iRef, &pRef);
aKey = pageGetKey(pRun, pRef, 0, &iTopic, &nKey, &blob);
}else{
/* The key and value are stored in the cell itself: read their sizes and
** then nKey+nVal bytes of data. The value follows the key. */
aCell += lsmVarintGet32(aCell, &nKey);
if( rtIsWrite(eType) ) aCell += lsmVarintGet32(aCell, &nVal);
sortedReadData(0, pPg, (aCell-aData), nKey+nVal, (void **)&aKey, &blob);
aVal = &aKey[nKey];
iTopic = eType;
}
lsmStringAppendf(&s, "%s%2X:", (i==0?"":" "), iTopic);
for(iChar=0; iChar<nKey; iChar++){
lsmStringAppendf(&s, "%c", isalnum(aKey[iChar]) ? aKey[iChar] : '.');
}
if( nVal>0 && bVals ){
lsmStringAppendf(&s, "##");
for(iChar=0; iChar<nVal; iChar++){
lsmStringAppendf(&s, "%c", isalnum(aVal[iChar]) ? aVal[iChar] : '.');
}
}
lsmStringAppendf(&s, " %lld", iPgPtr+iPtr);
lsmFsPageRelease(pRef);
}
lsmStringAppend(&s, "}", 1);
/* NOTE(review): lsmFsPageNumber() results are stored in LsmPgno variables
** elsewhere in this file, yet the value is formatted with %d here -
** confirm the return type and use %lld if it is 64 bits wide. */
lsmLogMessage(pDb, LSM_OK, " Page %d: %s", lsmFsPageNumber(pPg), s.z);
lsmStringClear(&s);
sortedBlobFree(&blob);
}
/*
** Decode cell iCell of page pPg and return its components through the
** output parameters. Any of the output pointers may be NULL.
**
**   pDb         Database handle.
**   pSeg        Segment that the page belongs to.
**   bIndirect   If true, follow indirect key references (type 0 cells)
**               and return the referenced key. If false, return a
**               placeholder key for such cells.
**   pPg         Page to read from.
**   iCell       Index of the cell to decode.
**   peType      OUT: cell type byte.
**   piPgPtr     OUT: page pointer stored in the cell.
**   paKey/pnKey OUT: key and its size in bytes.
**   paVal/pnVal OUT: value and its size in bytes (NULL and 0 if the cell
**               is an indirect reference).
**   pBlob       Buffer used to hold key/value data where required; the
**               returned pointers may point into it.
*/
static void infoCellDump(
lsm_db *pDb,
Segment *pSeg,
int bIndirect,
Page *pPg,
int iCell,
int *peType,
int *piPgPtr,
u8 **paKey, int *pnKey,
u8 **paVal, int *pnVal,
LsmBlob *pBlob
){
u8 *aData; int nData;
u8 *aKey; int nKey = 0;
u8 *aVal = 0; int nVal = 0;
int eType;
int iPgPtr;
Page *pRef = 0;
u8 *aCell;
aData = fsPageData(pPg, &nData);
/* Decode the cell header: type byte, then the page pointer varint.
** NOTE(review): mergeWorkerWrite() writes this field with
** lsmVarintPut64() but it is read into a 32-bit int here - confirm that
** large pointer values cannot occur or are acceptable when truncated. */
aCell = pageGetCell(aData, nData, iCell);
eType = *aCell++;
aCell += lsmVarintGet32(aCell, &iPgPtr);
if( eType==0 ){
/* Indirect cell: it stores the number of the page that holds the key. */
int dummy;
LsmPgno iRef;
aCell += lsmVarintGet64(aCell, &iRef);
if( bIndirect ){
/* Copy the key of the first cell of the referenced page into pBlob.
** NOTE(review): the return values of lsmFsDbPageGet() and
** pageGetKeyCopy() are ignored - confirm. */
lsmFsDbPageGet(pDb->pFS, pSeg, iRef, &pRef);
pageGetKeyCopy(pDb->pEnv, pSeg, pRef, 0, &dummy, pBlob);
aKey = (u8 *)pBlob->pData;
nKey = pBlob->nData;
lsmFsPageRelease(pRef);
}else{
/* NOTE(review): "<indirect>" is 10 characters long, so nKey=11 includes
** the terminating nul byte in the key - confirm this is intended. */
aKey = (u8 *)"<indirect>";
nKey = 11;
}
}else{
/* The key and value are stored in the cell itself: read their sizes and
** then nKey+nVal bytes of data. The value follows the key. */
aCell += lsmVarintGet32(aCell, &nKey);
if( rtIsWrite(eType) ) aCell += lsmVarintGet32(aCell, &nVal);
sortedReadData(pSeg, pPg, (aCell-aData), nKey+nVal, (void **)&aKey, pBlob);
aVal = &aKey[nKey];
}
if( peType ) *peType = eType;
if( piPgPtr ) *piPgPtr = iPgPtr;
if( paKey ) *paKey = aKey;
if( paVal ) *paVal = aVal;
if( pnKey ) *pnKey = nKey;
if( pnVal ) *pnVal = nVal;
}
/*
** Append a printable rendering of the n bytes at z to string pStr.
**
** If bHex is non-zero each byte is written as two upper-case hexadecimal
** digits. Otherwise alphanumeric bytes are written verbatim and every
** other byte is written as '.'. Always returns LSM_OK.
*/
static int infoAppendBlob(LsmString *pStr, int bHex, u8 *z, int n){
  int ii;
  if( bHex ){
    for(ii=0; ii<n; ii++){
      lsmStringAppendf(pStr, "%02X", z[ii]);
    }
  }else{
    for(ii=0; ii<n; ii++){
      u8 c = z[ii];
      lsmStringAppendf(pStr, "%c", isalnum(c) ? c : '.');
    }
  }
  return LSM_OK;
}
/*
** Flags that may be OR'd together and passed to infoPageDump():
**
**   INFO_PAGE_DUMP_DATA      Append a hex+character dump of the raw page.
**   INFO_PAGE_DUMP_VALUES    Include record values, not just keys.
**   INFO_PAGE_DUMP_HEX       Render keys/values as hex instead of text.
**   INFO_PAGE_DUMP_INDIRECT  Resolve indirect keys by loading the page
**                            they refer to (see infoCellDump()).
*/
#define INFO_PAGE_DUMP_DATA 0x01
#define INFO_PAGE_DUMP_VALUES 0x02
#define INFO_PAGE_DUMP_HEX 0x04
#define INFO_PAGE_DUMP_INDIRECT 0x08

/*
** Build a human-readable description of database page iPg and return it
** via *pzOut. The string contains a header (page number, record count,
** footer pointer, flags), one line per cell and, if INFO_PAGE_DUMP_DATA is
** set, a raw dump of the page image.
**
** On success LSM_OK is returned and *pzOut is set to the string buffer,
** which callers in this file release with lsmFree(). *pzOut is always
** zeroed first; LSM_ERROR is returned if iPg is zero, and any error from
** the file-system layer is returned unchanged.
*/
static int infoPageDump(
  lsm_db *pDb,                    /* Database handle */
  LsmPgno iPg,                    /* Page number of page to dump */
  int flags,                      /* Mask of INFO_PAGE_DUMP_XXX flags */
  char **pzOut                    /* OUT: page description */
){
  int rc = LSM_OK;                /* Return code */
  Page *pPg = 0;                  /* Handle for page iPg */
  int i, j;                       /* Loop counters for the raw dump */
  const int perLine = 16;         /* Bytes per line in the raw dump */
  Segment *pSeg = 0;
  Snapshot *pSnap;

  /* Normalise each option to 0 or 1. bHex in particular is used in
  ** arithmetic below ("1+bHex" characters per key byte), so it must not
  ** retain the raw mask value 0x04. */
  int bValues = (flags & INFO_PAGE_DUMP_VALUES)!=0;
  int bHex = (flags & INFO_PAGE_DUMP_HEX)!=0;
  int bData = (flags & INFO_PAGE_DUMP_DATA)!=0;
  int bIndirect = (flags & INFO_PAGE_DUMP_INDIRECT)!=0;

  *pzOut = 0;
  if( iPg==0 ) return LSM_ERROR;

  assert( pDb->pClient || pDb->pWorker );
  pSnap = pDb->pClient;
  if( pSnap==0 ) pSnap = pDb->pWorker;

  /* If the snapshot has a redirect in effect, check whether iPg belongs to
  ** the last segment of the last level. If so, that segment is passed to
  ** the cell readers; otherwise pSeg stays NULL. */
  if( pSnap->redirect.n>0 ){
    Level *pLvl;
    int bUse = 0;
    for(pLvl=pSnap->pLevel; pLvl->pNext; pLvl=pLvl->pNext);
    pSeg = (pLvl->nRight==0 ? &pLvl->lhs : &pLvl->aRhs[pLvl->nRight-1]);
    rc = lsmFsSegmentContainsPg(pDb->pFS, pSeg, iPg, &bUse);
    if( bUse==0 ){
      pSeg = 0;
    }
  }

  /* The page itself is fetched with a NULL segment, i.e. iPg is used as
  ** given (NOTE(review): presumably because iPg is already a physical page
  ** number - confirm against lsmFsDbPageGet()). */
  if( rc==LSM_OK ){
    rc = lsmFsDbPageGet(pDb->pFS, 0, iPg, &pPg);
  }

  if( rc==LSM_OK ){
    LsmBlob blob = {0, 0, 0, 0};  /* Scratch space for key/value data */
    int nKeyWidth = 0;            /* Width of the widest rendered key */
    LsmString str;                /* Output accumulates here */
    int nRec;
    LsmPgno iPtr;
    int flags2;                   /* Flags read from the page footer */
    int iCell;
    u8 *aData; int nData;         /* Page image and its size */

    aData = fsPageData(pPg, &nData);

    nRec = pageGetNRec(aData, nData);
    iPtr = pageGetPtr(aData, nData);
    flags2 = pageGetFlags(aData, nData);

    lsmStringInit(&str, pDb->pEnv);
    lsmStringAppendf(&str, "Page : %lld (%d bytes)\n", iPg, nData);
    lsmStringAppendf(&str, "nRec : %d\n", nRec);
    lsmStringAppendf(&str, "iPtr : %lld\n", iPtr);
    lsmStringAppendf(&str, "flags: %04x\n", flags2);
    lsmStringAppendf(&str, "\n");

    /* First pass: find the longest key so that values can be aligned. */
    for(iCell=0; iCell<nRec; iCell++){
      int nKey;
      infoCellDump(
          pDb, pSeg, bIndirect, pPg, iCell, 0, 0, 0, &nKey, 0, 0, &blob
      );
      if( nKey>nKeyWidth ) nKeyWidth = nKey;
    }
    if( bHex ) nKeyWidth = nKeyWidth * 2;

    /* Second pass: emit one line per cell. */
    for(iCell=0; iCell<nRec; iCell++){
      u8 *aKey; int nKey = 0;
      u8 *aVal; int nVal = 0;
      int iPgPtr;
      int eType;
      LsmPgno iAbsPtr;            /* Cell pointer made absolute */
      char zFlags[8];

      infoCellDump(pDb, pSeg, bIndirect, pPg, iCell, &eType, &iPgPtr,
          &aKey, &nKey, &aVal, &nVal, &blob
      );

      /* On non-b-tree pages the cell pointer is relative to the footer
      ** pointer; on b-tree pages it is used as is. */
      iAbsPtr = iPgPtr + ((flags2 & SEGMENT_BTREE_FLAG) ? 0 : iPtr);

      /* iAbsPtr is a 64-bit page number: it must be formatted with %lld,
      ** matching how iPg and iPtr are printed above. */
      lsmFlagsToString(eType, zFlags);
      lsmStringAppendf(&str, "%s %lld (%s) ",
          zFlags, (long long)iAbsPtr, (rtTopic(eType) ? "sys" : "usr")
      );
      infoAppendBlob(&str, bHex, aKey, nKey);
      if( nVal>0 && bValues ){
        /* Pad the key out to nKeyWidth columns (two columns per byte in
        ** hex mode, one otherwise). */
        lsmStringAppendf(&str, "%*s", nKeyWidth - (nKey*(1+bHex)), "");
        lsmStringAppendf(&str, " ");
        infoAppendBlob(&str, bHex, aVal, nVal);
      }
      if( rtTopic(eType) ){
        /* System-topic key: the first four key bytes are decoded as the
        ** bitwise complement of a block number and, if a value is
        ** present, its first eight bytes as a snapshot id. */
        int iBlk = (int)~lsmGetU32(aKey);
        lsmStringAppendf(&str, " (block=%d", iBlk);
        if( nVal>0 ){
          i64 iSnap = lsmGetU64(aVal);
          lsmStringAppendf(&str, " snapshot=%lld", iSnap);
        }
        lsmStringAppendf(&str, ")");
      }
      lsmStringAppendf(&str, "\n");
    }

    if( bData ){
      lsmStringAppendf(&str, "\n-------------------"
          "-------------------------------------------------------------\n");
      /* Only the page number is printed; it is 64-bit, hence %lld. */
      lsmStringAppendf(&str, "Page %lld\n", (long long)iPg);
      for(i=0; i<nData; i += perLine){
        lsmStringAppendf(&str, "%04x: ", i);
        for(j=0; j<perLine; j++){
          /* Valid offsets are 0..nData-1, so test with >= to avoid reading
          ** one byte past the end of the page on a partial final line. */
          if( i+j>=nData ){
            /* Same width as a "%02x " entry so the columns stay aligned */
            lsmStringAppendf(&str, "   ");
          }else{
            lsmStringAppendf(&str, "%02x ", aData[i+j]);
          }
        }
        lsmStringAppendf(&str, " ");
        for(j=0; j<perLine; j++){
          if( i+j>=nData ){
            lsmStringAppendf(&str, " ");
          }else{
            lsmStringAppendf(&str,"%c", isprint(aData[i+j]) ? aData[i+j] : '.');
          }
        }
        lsmStringAppendf(&str,"\n");
      }
    }

    *pzOut = str.z;
    sortedBlobFree(&blob);
    lsmFsPageRelease(pPg);
  }

  return rc;
}
/*
** Produce a full dump of page iPg: cell listing with values plus the raw
** page data. Keys and values are rendered in hexadecimal if bHex is
** non-zero, as text otherwise. The result and return code are those of
** infoPageDump().
*/
int lsmInfoPageDump(
  lsm_db *pDb,                    /* Database handle */
  LsmPgno iPg,                    /* Page number of page to dump */
  int bHex,                       /* True to output key/value in hex form */
  char **pzOut                    /* OUT: page description */
){
  const int mBase = INFO_PAGE_DUMP_DATA | INFO_PAGE_DUMP_VALUES;
  const int mDump = bHex ? (mBase | INFO_PAGE_DUMP_HEX) : mBase;
  return infoPageDump(pDb, iPg, mDump, pzOut);
}
/*
** Write a description of segment pRun to the database log: first a
** one-line summary of the segment, then a dump of every page reached by
** walking forwards from pRun->iFirst. Record values are included in the
** page dumps only if bVals is non-zero.
**
** Nothing is logged if pRun is NULL or has no first page. Errors from the
** file-system layer simply end the walk or skip the affected page; this is
** a best-effort debugging aid and reports no status to the caller.
*/
void sortedDumpSegment(lsm_db *pDb, Segment *pRun, int bVals){
  assert( pDb->xLog );
  if( pRun && pRun->iFirst ){
    int flags = (bVals ? INFO_PAGE_DUMP_VALUES : 0);
    char *zSeg;
    Page *pPg = 0;                /* Stays NULL if the first fetch fails */

    zSeg = segToString(pDb->pEnv, pRun, 0);
    if( zSeg ) lsmLogMessage(pDb, LSM_OK, "Segment: %s", zSeg);
    lsmFree(pDb->pEnv, zSeg);

    lsmFsDbPageGet(pDb->pFS, pRun, pRun->iFirst, &pPg);
    while( pPg ){
      Page *pNext = 0;            /* Ends the walk if the step fails */
      char *z = 0;

      /* infoPageDump() leaves z==NULL on error. Never pass NULL to "%s". */
      infoPageDump(pDb, lsmFsPageNumber(pPg), flags, &z);
      if( z ) lsmLogMessage(pDb, LSM_OK, "%s", z);
      lsmFree(pDb->pEnv, z);

      lsmFsDbPageNext(pRun, pPg, 1, &pNext);
      lsmFsPageRelease(pPg);
      pPg = pNext;
    }
  }
}
/*
** Log a description of the level/segment structure of snapshot pSnap.
**
** If a log callback is configured and the snapshot has at least one level,
** a header line carrying a running call counter and zWhy is logged,
** followed by one or more lines per level showing the left-hand segment
** and each right-hand segment side by side. If bKeys is non-zero every
** segment is then dumped page by page with sortedDumpSegment(); bVals is
** passed through to control whether values are shown.
**
** Finally the free-list description is logged and, in builds with
** assert() enabled, lsmFsIntegrityCheck() is run.
*/
void lsmSortedDumpStructure(
  lsm_db *pDb,                    /* Database handle (used for xLog callback) */
  Snapshot *pSnap,                /* Snapshot to dump */
  int bKeys,                      /* Output the keys from each segment */
  int bVals,                      /* Output the values from each segment */
  const char *zWhy                /* Caption to print near top of dump */
){
  Level *pTopLevel;
  char *zFree = 0;

  assert( pSnap );
  pTopLevel = lsmDbSnapshotLevel(pSnap);
  if( pDb->xLog && pTopLevel ){
    static int nCall = 0;         /* Number of dumps made so far */
    Level *pLevel;
    int iLevel = 0;

    nCall++;
    lsmLogMessage(pDb, LSM_OK, "Database structure %d (%s)", nCall, zWhy);

    for(pLevel=pTopLevel; pLevel; pLevel=pLevel->pNext){
      char zLeft[1024];
      char zRight[1024];
      int i = 0;

      Segment *aLeft[24];
      Segment *aRight[24];
      const int nMaxSeg = (int)(sizeof(aRight)/sizeof(aRight[0]));

      int nLeft = 0;
      int nRight = 0;

      Segment *pSeg = &pLevel->lhs;
      aLeft[nLeft++] = pSeg;

      /* Never write past the end of aRight[], however many right-hand
      ** segments the level claims to have. Excess segments are simply
      ** omitted from the summary. */
      for(i=0; i<pLevel->nRight && nRight<nMaxSeg; i++){
        aRight[nRight++] = &pLevel->aRhs[i];
      }

#ifdef LSM_LOG_FREELIST
      if( nRight ){
        /* Shifting the array up by one needs a spare slot at the end. */
        if( nRight==nMaxSeg ) nRight--;
        memmove(&aRight[1], aRight, sizeof(aRight[0])*nRight);
        aRight[0] = 0;
        nRight++;
      }
#endif

      for(i=0; i<nLeft || i<nRight; i++){
        int iPad = 0;
        char zLevel[32];
        zLeft[0] = '\0';
        zRight[0] = '\0';

        if( i<nLeft ){
          fileToString(pDb, zLeft, sizeof(zLeft), 24, aLeft[i]);
        }
        if( i<nRight ){
          fileToString(pDb, zRight, sizeof(zRight), 24, aRight[i]);
        }

        /* Only the first line for each level carries the level label. */
        if( i==0 ){
          snprintf(zLevel, sizeof(zLevel), "L%d: (age=%d) (flags=%.4x)",
              iLevel, (int)pLevel->iAge, (int)pLevel->flags
          );
        }else{
          zLevel[0] = '\0';
        }

        if( nRight==0 ){
          iPad = 10;
        }

        lsmLogMessage(pDb, LSM_OK, "% 25s % *s% -35s %s",
            zLevel, iPad, "", zLeft, zRight
        );
      }

      iLevel++;
    }

    if( bKeys ){
      for(pLevel=pTopLevel; pLevel; pLevel=pLevel->pNext){
        int i;
        sortedDumpSegment(pDb, &pLevel->lhs, bVals);
        for(i=0; i<pLevel->nRight; i++){
          sortedDumpSegment(pDb, &pLevel->aRhs[i], bVals);
        }
      }
    }
  }

  /* zFree remains NULL if lsmInfoFreelist() does not produce a string;
  ** do not hand a NULL pointer to the "%s" conversion in that case. */
  lsmInfoFreelist(pDb, &zFree);
  if( zFree ) lsmLogMessage(pDb, LSM_OK, "Freelist: %s", zFree);
  lsmFree(pDb->pEnv, zFree);

  assert( lsmFsIntegrityCheck(pDb) );
}
/*
** Release every Level object in the linked list that starts at pLevel by
** calling sortedFreeLevel() on each in turn. Passing NULL is a no-op.
*/
void lsmSortedFreeLevel(lsm_env *pEnv, Level *pLevel){
  Level *pCur = pLevel;
  while( pCur ){
    /* Fetch the link first: pCur must not be touched once it has been
    ** handed to sortedFreeLevel(). */
    Level *pFollowing = pCur->pNext;
    sortedFreeLevel(pEnv, pCur);
    pCur = pFollowing;
  }
}
/*
** For every multi-cursor in the list hanging off pDb->pCsr, invoke
** lsmTreeCursorSave() on both of its in-memory tree cursors
** (apTreeCsr[0] then apTreeCsr[1]).
*/
void lsmSortedSaveTreeCursors(lsm_db *pDb){
  MultiCursor *p = pDb->pCsr;
  while( p ){
    int iTree;
    for(iTree=0; iTree<2; iTree++){
      lsmTreeCursorSave(p->apTreeCsr[iTree]);
    }
    p = p->pNext;
  }
}
/*
** The buffer of page pPg holds content laid out for a page of nOrig bytes,
** while the buffer itself is nData bytes as reported by lsmFsPageData().
** Relocate the trailing region of the old layout - everything from offset
** SEGMENT_EOF(nOrig, nEntry) up to nOrig, where nEntry is the record count
** read using the old size - so that it ends at the end of the nData-byte
** buffer instead. Bytes before that offset are left where they are.
*/
void lsmSortedExpandBtreePage(Page *pPg, int nOrig){
  int nData;                      /* Current size of the page buffer */
  u8 *aData = lsmFsPageData(pPg, &nData);
  int nEntry = pageGetNRec(aData, nOrig);
  int iHdr = SEGMENT_EOF(nOrig, nEntry);
  int nShift = nData - nOrig;     /* Distance the trailing region moves */

  /* Source and destination may overlap, hence memmove(). */
  memmove(aData + iHdr + nShift, aData + iHdr, nOrig - iHdr);
}
#ifdef LSM_DEBUG_EXPENSIVE
/*
** Debugging check (LSM_DEBUG_EXPENSIVE builds only): walk every page of
** segment pSeg from pSeg->iFirst and assert() that the keys on pages
** without SEGMENT_BTREE_FLAG are in strictly ascending order according to
** sortedKeyCompare(). Adjacent keys within a page are compared, and the
** first key of each page is compared with the key most recently stored in
** blob2 from an earlier page.
*/
static void assertRunInOrder(lsm_db *pDb, Segment *pSeg){
  Page *pPg = 0;
  LsmBlob blob1 = {0, 0, 0, 0};
  LsmBlob blob2 = {0, 0, 0, 0};

  /* Topic of the key held in blob2. It is read when processing the first
  ** cell of a later page, so it has to live as long as blob2 does. When it
  ** was declared inside the per-cell loop its value was indeterminate at
  ** that point. */
  int iTopic2 = 0;

  lsmFsDbPageGet(pDb->pFS, pSeg, pSeg->iFirst, &pPg);
  while( pPg ){
    u8 *aData; int nData;
    Page *pNext;

    aData = lsmFsPageData(pPg, &nData);
    if( 0==(pageGetFlags(aData, nData) & SEGMENT_BTREE_FLAG) ){
      int i;
      int nRec = pageGetNRec(aData, nData);
      for(i=0; i<nRec; i++){
        int iTopic1;
        pageGetKeyCopy(pDb->pEnv, pSeg, pPg, i, &iTopic1, &blob1);

        /* First key on this page must sort after the saved earlier key */
        if( i==0 && blob2.nData ){
          assert( sortedKeyCompare(
                pDb->xCmp, iTopic2, blob2.pData, blob2.nData,
                iTopic1, blob1.pData, blob1.nData
          )<0 );
        }

        /* Each key must sort before its successor on the same page */
        if( i<(nRec-1) ){
          pageGetKeyCopy(pDb->pEnv, pSeg, pPg, i+1, &iTopic2, &blob2);
          assert( sortedKeyCompare(
                pDb->xCmp, iTopic1, blob1.pData, blob1.nData,
                iTopic2, blob2.pData, blob2.nData
          )<0 );
        }
      }
    }

    lsmFsDbPageNext(pSeg, pPg, 1, &pNext);
    lsmFsPageRelease(pPg);
    pPg = pNext;
  }

  sortedBlobFree(&blob1);
  sortedBlobFree(&blob2);
}
#endif
#ifdef LSM_DEBUG_EXPENSIVE
/*
** Debugging check (LSM_DEBUG_EXPENSIVE builds only) on the page pointers
** stored in segment pOne, which are compared against page numbers of
** segment pTwo.
**
** Both segments are scanned from their first page. For each page of pTwo
** that has at least one cell and has neither PGFTR_SKIP_THIS_FLAG nor
** SEGMENT_BTREE_FLAG set, the cursor on pOne is advanced until it reaches
** a key equal to the first key on that pTwo page. The assert()s require:
**
**   * keys of pOne passed over on the way point at the previously matched
**     pTwo page (initially pTwo->iFirst), unless bRhs is set;
**   * the matching pOne key points at the pTwo page itself;
**   * pOne never steps past the pTwo key or runs out of pages.
**
** Setting bRhs relaxes the first requirement and the initial check of the
** footer pointer on pOne's first page. The function always returns LSM_OK;
** problems are reported only through assert().
*/
static int assertPointersOk(
  lsm_db *pDb,                    /* Database handle */
  Segment *pOne,                  /* Segment containing pointers */
  Segment *pTwo,                  /* Segment containing pointer targets */
  int bRhs                        /* Non-zero to relax the checks (see above) */
){
  int rc = LSM_OK;                /* Error code */
  SegmentPtr src;                 /* Iterates through pOne */
  SegmentPtr dst;                 /* Iterates through pTwo */
  LsmPgno iExpect;                /* pTwo page that pOne should point at */

  assert( pOne && pTwo );

  memset(&src, 0, sizeof(src));
  memset(&dst, 0, sizeof(src));
  src.pSeg = pOne;
  dst.pSeg = pTwo;
  segmentPtrEndPage(pDb->pFS, &src, 0, &rc);
  segmentPtrEndPage(pDb->pFS, &dst, 0, &rc);

  /* Check that the footer pointer of the first page of pOne points to
  ** the first page of pTwo (not required when bRhs is set). */
  iExpect = pTwo->iFirst;
  assert( bRhs || src.iPtr==iExpect );

  if( rc==LSM_OK && src.nCell>0 ){
    rc = segmentPtrLoadCell(&src, 0);
  }

  while( rc==LSM_OK && dst.pPg ){
    LsmPgno iDstPg;

    /* Step dst forward to the next page that contains at least one cell */
    do{
      rc = segmentPtrNextPage(&dst, 1);
      assert( rc==LSM_OK );
    }while( rc==LSM_OK && dst.pPg && dst.nCell==0 );
    if( rc!=LSM_OK || dst.pPg==0 ) break;
    iDstPg = lsmFsPageNumber(dst.pPg);

    /* Pages flagged as skip-this or b-tree are not pointer targets */
    if( dst.flags & (PGFTR_SKIP_THIS_FLAG|SEGMENT_BTREE_FLAG) ) continue;

    /* Load the first key on the dst page, then advance src up to it */
    rc = segmentPtrLoadCell(&dst, 0);
    while( rc==LSM_OK ){
      int cmp = rtTopic(src.eType) - rtTopic(dst.eType);
      if( cmp==0 ){
        cmp = pDb->xCmp(src.pKey, src.nKey, dst.pKey, dst.nKey);
      }

      if( cmp==0 ){
        /* Matching key: it must point at this very page */
        assert( src.iPtr+src.iPgPtr==iDstPg );
        iExpect = iDstPg;
        break;
      }else if( cmp>0 ){
        /* src has moved past the dst key without matching it */
        assert( 0 );
      }else{
        /* Smaller key: it should still point at the previous page */
        assert( bRhs || src.iPtr+src.iPgPtr==iExpect );
      }

      rc = segmentPtrAdvance(0, &src, 0);
      if( src.pPg==0 ){
        assert( 0 );
      }
    }
  }

  segmentPtrReset(&src, 0);
  segmentPtrReset(&dst, 0);
  return LSM_OK;
}
/*
** Debugging check (LSM_DEBUG_EXPENSIVE builds only) that the b-tree of
** segment pSeg agrees with the segment's pages. Does nothing and returns
** LSM_OK if pSeg->iRoot is zero.
**
** A b-tree cursor is opened and moved to its first entry. The pages that
** follow pSeg->iFirst are then visited in order; for each page that has
** neither SEGMENT_BTREE_FLAG nor PGFTR_SKIP_THIS_FLAG set and holds at
** least one record, the function assert()s that the cursor's current key
** equals the first key on the page and that the cursor's pointer equals
** the page number, then advances the cursor. Once all pages have been
** visited without error the cursor must be exhausted (pKey==0).
**
** Returns LSM_OK, or the first error code encountered.
*/
static int assertBtreeOk(
  lsm_db *pDb,                    /* Database handle */
  Segment *pSeg                   /* Segment whose b-tree is checked */
){
  int rc = LSM_OK;                /* Return code */
  LsmBlob blob = {0, 0, 0};       /* Buffer used to cache overflow keys */
  FileSystem *pFS;
  Page *pPg = 0;                  /* Page currently being inspected */
  BtreeCursor *pCsr = 0;          /* Cursor over the segment's b-tree */

  if( pSeg->iRoot==0 ) return rc;
  pFS = pDb->pFS;

  rc = btreeCursorNew(pDb, pSeg, &pCsr);
  if( rc==LSM_OK ){
    rc = btreeCursorFirst(pCsr);
  }
  if( rc==LSM_OK ){
    rc = lsmFsDbPageGet(pFS, pSeg, pSeg->iFirst, &pPg);
  }

  while( rc==LSM_OK ){
    Page *pFollowing;
    u8 *aData;
    int nData;
    int flags;

    /* Step to the next page; the page at iFirst itself is never checked */
    rc = lsmFsDbPageNext(pSeg, pPg, 1, &pFollowing);
    lsmFsPageRelease(pPg);
    pPg = pFollowing;
    if( pPg==0 ) break;

    aData = fsPageData(pPg, &nData);
    flags = pageGetFlags(aData, nData);

    /* Skip the comparison on error, on b-tree or skip-this pages, and on
    ** pages with no records. The loop condition handles the error case. */
    if( rc!=LSM_OK ) continue;
    if( flags & (SEGMENT_BTREE_FLAG|PGFTR_SKIP_THIS_FLAG) ) continue;
    if( pageGetNRec(aData, nData)==0 ) continue;

    {
      u8 *pKey;
      int nKey;
      int iTopic;
      pKey = pageGetKey(pSeg, pPg, 0, &iTopic, &nKey, &blob);
      assert( nKey==pCsr->nKey && 0==memcmp(pKey, pCsr->pKey, nKey) );
      assert( lsmFsPageNumber(pPg)==pCsr->iPtr );
      rc = btreeCursorNext(pCsr);
    }
  }
  assert( rc!=LSM_OK || pCsr->pKey==0 );

  if( pPg ) lsmFsPageRelease(pPg);

  btreeCursorFree(pCsr);
  sortedBlobFree(&blob);
  return rc;
}
#endif