#include "lsmInt.h"
/* lsmDatabaseFull() reports the database as full once its per-level segment
** tally reaches this value. */
#define LSM_MAX_RHS_SEGMENTS 40
/* Evaluates to the 32-bit value x with its four bytes in reverse order. The
** argument is expanded four times, so it must have no side effects. */
#define BYTESWAP32(x) ( \
(((x)&0x000000FF)<<24) + (((x)&0x0000FF00)<<8) \
+ (((x)&0x00FF0000)>>8) + (((x)&0xFF000000)>>24) \
)
/* LSM_LITTLE_ENDIAN reads the lowest-addressed byte of an int holding 1, so
** it is non-zero on a little-endian host and zero on a big-endian one. */
static const int one = 1;
#define LSM_LITTLE_ENDIAN (*(u8 *)(&one))
/* Sizes, in 32-bit words, of the leading sections of a checkpoint: the fixed
** header (word indexes 0..8 below), the log pointer (word indexes 9..12
** below) and the append list (LSM_APPLIST_SZ values of two words each). */
#define CKPT_HDR_SIZE 9
#define CKPT_LOGPTR_SIZE 4
#define CKPT_APPENDLIST_SIZE (LSM_APPLIST_SZ * 2)
/* Word indexes of the fixed header fields. ID_MSW/ID_LSW hold the two halves
** of the 64-bit snapshot id; NCKPT is the total number of words in the
** checkpoint, including the two trailing checksum words. */
#define CKPT_HDR_ID_MSW 0
#define CKPT_HDR_ID_LSW 1
#define CKPT_HDR_NCKPT 2
#define CKPT_HDR_CMPID 3
#define CKPT_HDR_NBLOCK 4
#define CKPT_HDR_BLKSZ 5
#define CKPT_HDR_NLEVEL 6
#define CKPT_HDR_PGSZ 7
#define CKPT_HDR_NWRITE 8
/* Word indexes of the log pointer: a 64-bit offset (MSW then LSW) followed
** by two checksum words. */
#define CKPT_HDR_LO_MSW 9
#define CKPT_HDR_LO_LSW 10
#define CKPT_HDR_LO_CKSUM1 11
#define CKPT_HDR_LO_CKSUM2 12
/*
** A growable array of 32-bit words used while assembling a checkpoint.
** ckptSetValue() is the only routine in this file that resizes it.
*/
typedef struct CkptBuffer CkptBuffer;
struct CkptBuffer {
/* Environment handle passed to lsmReallocOrFree() when the array grows */
lsm_env *pEnv;
/* Number of u32 slots currently allocated in aCkpt[] */
int nAlloc;
/* The array of words itself (NULL until the first ckptSetValue() call) */
u32 *aCkpt;
};
/*
** Compute the two checksum words for the checkpoint stored in
** aCkpt[0..nCkpt-1] and write them to *piCksum1 and *piCksum2.
**
** The final two words of the array (the slots that hold the checksum
** itself) are never read. Words are consumed in pairs; when nCkpt is odd
** the single word left over at index nCkpt-3 is folded in up front, its low
** 16 bits into the first sum and its high 16 bits into the second.
*/
static void ckptChecksum(u32 *aCkpt, u32 nCkpt, u32 *piCksum1, u32 *piCksum2){
  u32 sumA = 1;
  u32 sumB = 2;
  u32 iWord = 0;

  if( (nCkpt % 2)!=0 ){
    u32 oddWord = aCkpt[nCkpt-3];
    sumA += oddWord & 0x0000FFFF;
    sumB += oddWord & 0xFFFF0000;
  }

  while( (iWord+3)<nCkpt ){
    sumA += sumB + aCkpt[iWord];
    sumB += sumA + aCkpt[iWord+1];
    iWord += 2;
  }

  *piCksum1 = sumA;
  *piCksum2 = sumB;
}
/*
** Store iVal at index iIdx of the buffer, growing the array first if the
** index lies beyond the current allocation (new size: max(8, iIdx*2) words).
**
** This is a no-op if *pRc is already non-zero. If the reallocation fails,
** *pRc is set to LSM_NOMEM_BKPT and p->aCkpt is left NULL.
*/
static void ckptSetValue(CkptBuffer *p, int iIdx, u32 iVal, int *pRc){
  if( *pRc!=0 ) return;

  if( p->nAlloc<=iIdx ){
    int nSlot = LSM_MAX(8, iIdx*2);
    p->aCkpt = (u32 *)lsmReallocOrFree(p->pEnv, p->aCkpt, nSlot*sizeof(u32));
    if( p->aCkpt==0 ){
      *pRc = LSM_NOMEM_BKPT;
      return;
    }
    p->nAlloc = nSlot;
  }

  p->aCkpt[iIdx] = iVal;
}
/*
** Byte-swap each of the nInt 32-bit words in aInt[] in place, but only when
** running on a little-endian host. On a big-endian host the array is left
** untouched.
*/
static void ckptChangeEndianness(u32 *aInt, int nInt){
  int iWord;

  if( !LSM_LITTLE_ENDIAN ) return;

  for(iWord=0; iWord<nInt; iWord++){
    u32 v = aInt[iWord];
    aInt[iWord] = BYTESWAP32(v);
  }
}
/*
** Buffer p holds nCkpt words of checkpoint data. Compute the checksum over
** what will be an (nCkpt+2)-word checkpoint and store the two checksum
** words at indexes nCkpt and nCkpt+1. Does nothing unless *pRc is LSM_OK.
*/
static void ckptAddChecksum(CkptBuffer *p, int nCkpt, int *pRc){
  u32 cksumA = 0;
  u32 cksumB = 0;

  if( *pRc!=LSM_OK ) return;

  ckptChecksum(p->aCkpt, nCkpt+2, &cksumA, &cksumB);
  ckptSetValue(p, nCkpt, cksumA, pRc);
  ckptSetValue(p, nCkpt+1, cksumB, pRc);
}
/*
** Append 64-bit value iVal to the buffer as two words, most significant
** word first, starting at index *piOut. *piOut is advanced by two even if
** *pRc is already set (in which case nothing is written).
*/
static void ckptAppend64(CkptBuffer *p, int *piOut, i64 iVal, int *pRc){
  int iSlot = *piOut;
  u32 hiWord = (u32)((iVal >> 32) & 0xFFFFFFFF);
  u32 loWord = (u32)(iVal & 0xFFFFFFFF);

  ckptSetValue(p, iSlot, hiWord, pRc);
  ckptSetValue(p, iSlot+1, loWord, pRc);
  *piOut = iSlot + 2;
}
/*
** Assemble and return the 64-bit value stored as two consecutive words at
** a[0] (most significant) and a[1] (least significant).
*/
static i64 ckptRead64(u32 *a){
  i64 hiPart = (i64)a[0];
  i64 loPart = (i64)a[1];
  return (hiPart << 32) + loPart;
}
/*
** Read the 64-bit value stored at a[*piIn] and a[*piIn+1], then advance
** *piIn past the two words consumed.
*/
static i64 ckptGobble64(u32 *a, int *piIn){
  u32 *pPair = &a[*piIn];
  *piIn += 2;
  return ckptRead64(pPair);
}
/*
** Append the description of segment pSeg to the buffer as four 64-bit
** values, in this order: iFirst, iLastPg, iRoot, nSize. *piOut is advanced
** past the eight words written.
*/
static void ckptExportSegment(
  Segment *pSeg,
  CkptBuffer *p,
  int *piOut,
  int *pRc
){
  i64 aField[4];
  int iField;

  aField[0] = pSeg->iFirst;
  aField[1] = pSeg->iLastPg;
  aField[2] = pSeg->iRoot;
  aField[3] = pSeg->nSize;

  for(iField=0; iField<4; iField++){
    ckptAppend64(p, piOut, aField[iField], pRc);
  }
}
/*
** Append the serialized form of level pLevel to the buffer, starting at
** index *piOut, and advance *piOut past it. The layout is:
**
**   1 word   age in the low 16 bits, flags shifted into the upper bits
**   1 word   nRight
**   8 words  the left-hand segment
**
** and, only when the level has a Merge object:
**
**   8 words per right-hand segment
**   1 word   Merge.nInput
**   1 word   Merge.nSkip
**   3 words per merge input (64-bit page number, then cell index)
**   3 words  the split key (64-bit page number, then cell index)
**   2 words  Merge.iCurrentPtr
*/
static void ckptExportLevel(
  Level *pLevel,
  CkptBuffer *p,
  int *piOut,
  int *pRc
){
  int iSlot = *piOut;
  Merge *pMrg = pLevel->pMerge;
  u32 ageAndFlags = (u32)pLevel->iAge + (u32)(pLevel->flags<<16);

  ckptSetValue(p, iSlot++, ageAndFlags, pRc);
  ckptSetValue(p, iSlot++, pLevel->nRight, pRc);
  ckptExportSegment(&pLevel->lhs, p, &iSlot, pRc);

  /* A level has right-hand segments if and only if it has a Merge object */
  assert( (pLevel->nRight>0)==(pMrg!=0) );

  if( pMrg!=0 ){
    int iRhs;
    int iInput;

    for(iRhs=0; iRhs<pLevel->nRight; iRhs++){
      ckptExportSegment(&pLevel->aRhs[iRhs], p, &iSlot, pRc);
    }

    assert( pMrg->nInput==pLevel->nRight
         || pMrg->nInput==pLevel->nRight+1
    );
    ckptSetValue(p, iSlot++, pMrg->nInput, pRc);
    ckptSetValue(p, iSlot++, pMrg->nSkip, pRc);

    for(iInput=0; iInput<pMrg->nInput; iInput++){
      ckptAppend64(p, &iSlot, pMrg->aInput[iInput].iPg, pRc);
      ckptSetValue(p, iSlot++, pMrg->aInput[iInput].iCell, pRc);
    }

    ckptAppend64(p, &iSlot, pMrg->splitkey.iPg, pRc);
    ckptSetValue(p, iSlot++, pMrg->splitkey.iCell, pRc);
    ckptAppend64(p, &iSlot, pMrg->iCurrentPtr, pRc);
  }

  *piOut = iSlot;
}
/*
** Write the four-word log pointer (64-bit offset plus two checksum words)
** at indexes CKPT_HDR_LO_MSW..CKPT_HDR_LO_CKSUM2 of the buffer.
**
** If bFlush is true the values come from the tree header (iOldLog,
** oldcksum0, oldcksum1). Otherwise the same four words are copied verbatim
** from the second snapshot copy in shared memory (aSnap2).
*/
static void ckptExportLog(
  lsm_db *pDb,
  int bFlush,
  CkptBuffer *p,
  int *piOut,
  int *pRc
){
  int iSlot = *piOut;

  assert( iSlot==CKPT_HDR_LO_MSW );

  if( !bFlush ){
    while( iSlot<=CKPT_HDR_LO_CKSUM2 ){
      ckptSetValue(p, iSlot, pDb->pShmhdr->aSnap2[iSlot], pRc);
      iSlot++;
    }
  }else{
    i64 iLogOff = pDb->treehdr.iOldLog;
    ckptAppend64(p, &iSlot, iLogOff, pRc);
    ckptSetValue(p, iSlot++, pDb->treehdr.oldcksum0, pRc);
    ckptSetValue(p, iSlot++, pDb->treehdr.oldcksum1, pRc);
  }

  assert( *pRc || iSlot==CKPT_HDR_LO_CKSUM2+1 );
  *piOut = iSlot;
}
/*
** Append the worker snapshot's append list to the buffer: LSM_APPLIST_SZ
** page numbers, each written as a 64-bit value (two words). *piOut is
** advanced past the words written.
**
** The stray ';' that used to follow the closing brace has been removed: an
** empty declaration at file scope is not valid ISO C.
*/
static void ckptExportAppendlist(
  lsm_db *db,
  CkptBuffer *p,
  int *piOut,
  int *pRc
){
  int i;
  LsmPgno *aiAppend = db->pWorker->aiAppend;

  for(i=0; i<LSM_APPLIST_SZ; i++){
    ckptAppend64(p, piOut, aiAppend[i], pRc);
  }
}
/*
** Serialize the worker snapshot (pDb->pWorker) into a newly allocated array
** of 32-bit words in host byte order.
**
**   bLog    Passed to ckptExportLog() as its bFlush argument: selects
**           whether the log pointer comes from the tree header or is copied
**           from the aSnap2 shared-memory copy.
**   iId     Snapshot id stored in the header (must be non-negative).
**   bCksum  If true the two trailing words hold the real checksum;
**           otherwise they are written as zero.
**   ppCkpt  OUT: the buffer. Its only caller in this file,
**           lsmCheckpointSaveWorker(), releases it with lsmFree().
**   pnCkpt  OUT (optional): size of the buffer contents in bytes.
**
** Returns LSM_OK, or the error recorded by ckptSetValue() if an allocation
** failed.
*/
static int ckptExportSnapshot(
lsm_db *pDb,
int bLog,
i64 iId,
int bCksum,
void **ppCkpt,
int *pnCkpt
){
int rc = LSM_OK;
FileSystem *pFS = pDb->pFS;
Snapshot *pSnap = pDb->pWorker;
int nLevel = 0;
int iLevel;
int iOut = 0;
Level *pLevel;
int i;
CkptBuffer ckpt;
memset(&ckpt, 0, sizeof(CkptBuffer));
ckpt.pEnv = pDb->pEnv;
/* Skip over the fixed header for now. It is filled in further down, once
** the total size of the checkpoint is known. */
iOut = CKPT_HDR_SIZE;
/* Log pointer, then the append list */
ckptExportLog(pDb, bLog, &ckpt, &iOut, &rc);
ckptExportAppendlist(pDb, &ckpt, &iOut, &rc);
/* Count the levels, then serialize each of them in list order */
for(pLevel=lsmDbSnapshotLevel(pSnap); pLevel; pLevel=pLevel->pNext) nLevel++;
iLevel = 0;
for(pLevel=lsmDbSnapshotLevel(pSnap); iLevel<nLevel; pLevel=pLevel->pNext){
ckptExportLevel(pLevel, &ckpt, &iOut, &rc);
iLevel++;
}
/* Block-redirect list: entry count followed by (iFrom, iTo) pairs */
ckptSetValue(&ckpt, iOut++, pSnap->redirect.n, &rc);
for(i=0; i<pSnap->redirect.n; i++){
ckptSetValue(&ckpt, iOut++, pSnap->redirect.a[i].iFrom, &rc);
ckptSetValue(&ckpt, iOut++, pSnap->redirect.a[i].iTo, &rc);
}
/* Free-block list: entry count followed by three words per entry (block
** number, then the 64-bit id as MSW and LSW) */
assert( pSnap->freelist.nEntry<=pDb->nMaxFreelist );
if( rc==LSM_OK ){
int nFree = pSnap->freelist.nEntry;
ckptSetValue(&ckpt, iOut++, nFree, &rc);
for(i=0; i<nFree; i++){
FreelistEntry *p = &pSnap->freelist.aEntry[i];
ckptSetValue(&ckpt, iOut++, p->iBlk, &rc);
ckptSetValue(&ckpt, iOut++, (p->iId >> 32) & 0xFFFFFFFF, &rc);
ckptSetValue(&ckpt, iOut++, p->iId & 0xFFFFFFFF, &rc);
}
}
/* Fill in the fixed header. NCKPT is iOut+2 because the two checksum words
** written below are part of the checkpoint. */
assert( iId>=0 );
assert( pSnap->iCmpId==pDb->compress.iId
|| pSnap->iCmpId==LSM_COMPRESSION_EMPTY
);
ckptSetValue(&ckpt, CKPT_HDR_ID_MSW, (u32)(iId>>32), &rc);
ckptSetValue(&ckpt, CKPT_HDR_ID_LSW, (u32)(iId&0xFFFFFFFF), &rc);
ckptSetValue(&ckpt, CKPT_HDR_NCKPT, iOut+2, &rc);
ckptSetValue(&ckpt, CKPT_HDR_CMPID, pDb->compress.iId, &rc);
ckptSetValue(&ckpt, CKPT_HDR_NBLOCK, pSnap->nBlock, &rc);
ckptSetValue(&ckpt, CKPT_HDR_BLKSZ, lsmFsBlockSize(pFS), &rc);
ckptSetValue(&ckpt, CKPT_HDR_NLEVEL, nLevel, &rc);
ckptSetValue(&ckpt, CKPT_HDR_PGSZ, lsmFsPageSize(pFS), &rc);
ckptSetValue(&ckpt, CKPT_HDR_NWRITE, pSnap->nWrite, &rc);
/* The checksum must be computed last, after every other word is in place */
if( bCksum ){
ckptAddChecksum(&ckpt, iOut, &rc);
}else{
ckptSetValue(&ckpt, iOut, 0, &rc);
ckptSetValue(&ckpt, iOut+1, 0, &rc);
}
iOut += 2;
assert( iOut<=1024 );
#ifdef LSM_LOG_FREELIST
lsmLogMessage(pDb, rc,
"ckptExportSnapshot(): id=%lld freelist: %d", iId, pSnap->freelist.nEntry
);
for(i=0; i<pSnap->freelist.nEntry; i++){
lsmLogMessage(pDb, rc,
"ckptExportSnapshot(): iBlk=%d id=%lld",
pSnap->freelist.aEntry[i].iBlk,
pSnap->freelist.aEntry[i].iId
);
}
#endif
/* Hand the buffer to the caller. The size is reported in bytes, not words. */
*ppCkpt = (void *)ckpt.aCkpt;
if( pnCkpt ) *pnCkpt = sizeof(u32)*iOut;
return rc;
}
/*
** Populate the (all-zero) Segment object pSegment from the four 64-bit
** values stored at aIn[*piIn], in the order written by ckptExportSegment():
** iFirst, iLastPg, iRoot, nSize. *piIn is advanced past the eight words
** consumed.
*/
static void ckptNewSegment(
  u32 *aIn,
  int *piIn,
  Segment *pSegment
){
  i64 iFirst;
  i64 iLast;
  i64 iRoot;
  i64 nSize;

  assert( pSegment->iFirst==0 && pSegment->iLastPg==0 );
  assert( pSegment->nSize==0 && pSegment->iRoot==0 );

  iFirst = ckptGobble64(aIn, piIn);
  iLast = ckptGobble64(aIn, piIn);
  iRoot = ckptGobble64(aIn, piIn);
  nSize = ckptGobble64(aIn, piIn);

  pSegment->iFirst = iFirst;
  pSegment->iLastPg = iLast;
  pSegment->iRoot = iRoot;
  pSegment->nSize = nSize;

  assert( pSegment->iFirst );
}
/*
** Allocate a Merge object for pLevel and fill it in from the serialized
** merge state starting at aInt[*piIn] (the format written by
** ckptExportLevel()). The MergeInput array lives in the same allocation,
** immediately after the Merge structure.
**
** On success *piIn is advanced past the words consumed and LSM_OK is
** returned. If the allocation fails, LSM_NOMEM_BKPT is returned and *piIn
** is left unchanged.
*/
static int ckptSetupMerge(lsm_db *pDb, u32 *aInt, int *piIn, Level *pLevel){
  int iPos = *piIn;
  int nIn = (int)aInt[iPos++];
  int nAlloc = sizeof(Merge) + sizeof(MergeInput) * nIn;
  Merge *pNew = (Merge *)lsmMallocZero(pDb->pEnv, nAlloc);
  int k;

  if( pNew==0 ) return LSM_NOMEM_BKPT;
  pLevel->pMerge = pNew;

  pNew->aInput = (MergeInput *)&pNew[1];
  pNew->nInput = nIn;
  pNew->iOutputOff = -1;
  pNew->nSkip = (int)aInt[iPos++];

  for(k=0; k<nIn; k++){
    MergeInput *pInput = &pNew->aInput[k];
    pInput->iPg = ckptGobble64(aInt, &iPos);
    pInput->iCell = (int)aInt[iPos++];
  }

  pNew->splitkey.iPg = ckptGobble64(aInt, &iPos);
  pNew->splitkey.iCell = (int)aInt[iPos++];
  pNew->iCurrentPtr = ckptGobble64(aInt, &iPos);

  *piIn = iPos;
  return LSM_OK;
}
/*
** Deserialize nLevel levels from the words starting at aIn[*piIn] (the
** format written by ckptExportLevel()) into a newly allocated linked list
** of Level objects.
**
** On success *ppLevel points to the head of the list (NULL if nLevel is
** zero) and LSM_OK is returned. On failure every level built so far is
** released with lsmSortedFreeLevel(), *ppLevel is set to NULL and an LSM
** error code is returned. *piIn is updated in both cases.
*/
static int ckptLoadLevels(
  lsm_db *pDb,
  u32 *aIn,
  int *piIn,
  int nLevel,
  Level **ppLevel
){
  int i;
  int rc = LSM_OK;
  Level *pRet = 0;
  Level **ppNext;
  int iIn = *piIn;

  ppNext = &pRet;
  for(i=0; rc==LSM_OK && i<nLevel; i++){
    int iRight;
    Level *pLevel;

    pLevel = (Level *)lsmMallocZeroRc(pDb->pEnv, sizeof(Level), &rc);
    if( rc==LSM_OK ){
      /* First word: age in the low 16 bits, flags in the high 16 bits */
      pLevel->iAge = (u16)(aIn[iIn] & 0x0000FFFF);
      pLevel->flags = (u16)((aIn[iIn]>>16) & 0x0000FFFF);
      iIn++;
      pLevel->nRight = aIn[iIn++];
      if( pLevel->nRight ){
        int nByte = sizeof(Segment) * pLevel->nRight;
        pLevel->aRhs = (Segment *)lsmMallocZeroRc(pDb->pEnv, nByte, &rc);
      }

      if( rc!=LSM_OK ){
        /* The aRhs[] allocation failed. The level has not been linked into
        ** the list yet, so the cleanup at the end of this function cannot
        ** reach it. Free it here to avoid leaking it. */
        lsmFree(pDb->pEnv, pLevel);
      }else{
        *ppNext = pLevel;
        ppNext = &pLevel->pNext;

        /* The left-hand segment, then each right-hand segment */
        ckptNewSegment(aIn, &iIn, &pLevel->lhs);
        for(iRight=0; iRight<pLevel->nRight; iRight++){
          ckptNewSegment(aIn, &iIn, &pLevel->aRhs[iRight]);
        }

        /* Merge state is present only for levels with right-hand segments */
        if( pLevel->nRight>0 ){
          rc = ckptSetupMerge(pDb, aIn, &iIn, pLevel);
        }
      }
    }
  }

  if( rc!=LSM_OK ){
    lsmSortedFreeLevel(pDb->pEnv, pRet);
    pRet = 0;
  }

  *ppLevel = pRet;
  *piIn = iIn;
  return rc;
}
/*
** Buffer pVal (nVal bytes) holds a blob produced by lsmCheckpointLevels().
** Decode the levels it describes and append them to the end of the worker
** snapshot's level list. The worker snapshot must already contain at least
** one level.
**
** If nVal is zero or negative this is a no-op. Returns LSM_OK on success or
** an LSM error code if an allocation fails.
*/
int lsmCheckpointLoadLevels(lsm_db *pDb, void *pVal, int nVal){
  int rc = LSM_OK;
  u32 *aWord;
  Level *pLoaded = 0;
  Level *pTail;
  int nWord;
  int nLevel;
  int iPos = 1;

  if( nVal<=0 ) return rc;

  aWord = lsmMallocRc(pDb->pEnv, nVal, &rc);
  if( aWord==0 ) return rc;

  /* Work on a private copy converted to host byte order */
  memcpy(aWord, pVal, nVal);
  nWord = nVal / sizeof(u32);
  ckptChangeEndianness(aWord, nWord);

  /* Word 0 is the level count; the serialized levels follow it */
  nLevel = aWord[0];
  rc = ckptLoadLevels(pDb, aWord, &iPos, nLevel, &pLoaded);
  lsmFree(pDb->pEnv, aWord);
  assert( rc==LSM_OK || pLoaded==0 );

  if( rc==LSM_OK ){
    pTail = lsmDbSnapshotLevel(pDb->pWorker);
    assert( pTail );
    while( pTail->pNext ){
      pTail = pTail->pNext;
    }
    pTail->pNext = pLoaded;
  }

  return rc;
}
/*
** Serialize the last nLevel levels of the worker snapshot's level list into
** a newly allocated blob. The blob is a level count followed by the levels
** in ckptExportLevel() format, stored as big-endian 32-bit words.
**
** The worker snapshot must contain more than nLevel levels.
**
** On success *paVal points to the blob and *pnVal holds its size in bytes.
** On failure both are set to zero. Returns LSM_OK or an LSM error code.
*/
int lsmCheckpointLevels(
  lsm_db *pDb,
  int nLevel,
  void **paVal,
  int *pnVal
){
  Level *p;
  int nAll = 0;
  /* Must start out as LSM_OK: ckptSetValue() inspects *pRc before writing.
  ** It used to be passed in uninitialized. */
  int rc = LSM_OK;
  int i;
  int iOut;
  CkptBuffer ckpt;

  assert( nLevel>0 );

  /* Count the levels, then advance p to the first of the final nLevel */
  for(p=lsmDbSnapshotLevel(pDb->pWorker); p; p=p->pNext) nAll++;
  assert( nAll>nLevel );
  nAll -= nLevel;
  for(p=lsmDbSnapshotLevel(pDb->pWorker); p && nAll>0; p=p->pNext) nAll--;

  memset(&ckpt, 0, sizeof(CkptBuffer));
  ckpt.pEnv = pDb->pEnv;

  ckptSetValue(&ckpt, 0, nLevel, &rc);
  iOut = 1;
  for(i=0; rc==LSM_OK && i<nLevel; i++){
    ckptExportLevel(p, &ckpt, &iOut, &rc);
    p = p->pNext;
  }
  assert( rc!=LSM_OK || p==0 );

  if( rc==LSM_OK ){
    ckptChangeEndianness(ckpt.aCkpt, iOut);
    *paVal = (void *)ckpt.aCkpt;
    *pnVal = iOut * sizeof(u32);
  }else{
    *pnVal = 0;
    *paVal = 0;
  }

  return rc;
}
/*
** Return the 64-bit snapshot id stored in the header of meta page pPg, read
** with lsmGetU32() from the raw page data. Returns 0 if pPg is NULL.
*/
static i64 ckptLoadId(MetaPage *pPg){
  int nByte;
  u8 *aPage;
  i64 hiPart;
  i64 loPart;

  if( pPg==0 ) return 0;

  aPage = lsmFsMetaPageData(pPg, &nByte);
  hiPart = (i64)lsmGetU32(&aPage[CKPT_HDR_ID_MSW*4]);
  loPart = (i64)lsmGetU32(&aPage[CKPT_HDR_ID_LSW*4]);
  return (hiPart << 32) + loPart;
}
/*
** Return true if the checkpoint in aCkpt[] (host byte order) has a
** plausible size field and its final two words match the checksum computed
** over the rest. Return false otherwise.
*/
static int ckptChecksumOk(u32 *aCkpt){
  u32 nWord = aCkpt[CKPT_HDR_NCKPT];
  u32 expectA;
  u32 expectB;

  if( nWord<CKPT_HDR_NCKPT ) return 0;
  if( nWord>(LSM_META_RW_PAGE_SIZE)/sizeof(u32) ) return 0;

  ckptChecksum(aCkpt, nWord, &expectA, &expectB);
  if( expectA!=aCkpt[nWord-2] ) return 0;
  return (expectB==aCkpt[nWord-1]);
}
/*
** Try to load the checkpoint stored on meta page pPg. If its size field is
** sane and its checksum is valid, copy it (in host byte order) into both
** shared-memory snapshot slots and into pDb->aSnapshot, record iMeta as
** the current meta page and return 1. Otherwise return 0.
**
** This is a no-op returning 0 if *pRc is not LSM_OK on entry. Otherwise
** *pRc is overwritten with the result of the allocation attempt.
*/
static int ckptTryLoad(lsm_db *pDb, MetaPage *pPg, u32 iMeta, int *pRc){
  int rc = LSM_OK;
  int bOk = 0;
  u32 *aWord = 0;
  u32 nWord;
  int nPage;
  u8 *aPage;

  if( *pRc!=LSM_OK ) return 0;

  aPage = lsmFsMetaPageData(pPg, &nPage);
  nWord = (u32)lsmGetU32(&aPage[CKPT_HDR_NCKPT*sizeof(u32)]);

  /* Only bother copying if the claimed size fits in the page and is large
  ** enough to contain the size field itself. */
  if( nWord>CKPT_HDR_NCKPT && nWord<=nPage/sizeof(u32) ){
    aWord = (u32 *)lsmMallocRc(pDb->pEnv, nWord*sizeof(u32), &rc);
  }

  if( aWord!=0 ){
    memcpy(aWord, aPage, nWord*sizeof(u32));
    ckptChangeEndianness(aWord, nWord);
    if( ckptChecksumOk(aWord) ){
      ShmHeader *pHdr = pDb->pShmhdr;
      memcpy(pHdr->aSnap1, aWord, nWord*sizeof(u32));
      memcpy(pHdr->aSnap2, aWord, nWord*sizeof(u32));
      memcpy(pDb->aSnapshot, aWord, nWord*sizeof(u32));
      pHdr->iMetaPage = iMeta;
      bOk = 1;
    }
  }

  lsmFree(pDb->pEnv, aWord);
  *pRc = rc;
  return bOk;
}
/*
** Install a checkpoint describing an empty database (no levels, no
** redirects, no free blocks) in both shared-memory snapshot slots and in
** pDb->aSnapshot. The block and page sizes are taken from the connection's
** defaults.
*/
static void ckptLoadEmpty(lsm_db *pDb){
  u32 aEmpty[] = {
    0,                       /* CKPT_HDR_ID_MSW */
    10,                      /* CKPT_HDR_ID_LSW */
    0,                       /* CKPT_HDR_NCKPT (set below) */
    LSM_COMPRESSION_EMPTY,   /* CKPT_HDR_CMPID */
    0,                       /* CKPT_HDR_NBLOCK */
    0,                       /* CKPT_HDR_BLKSZ (set below) */
    0,                       /* CKPT_HDR_NLEVEL */
    0,                       /* CKPT_HDR_PGSZ (set below) */
    0,                       /* CKPT_HDR_NWRITE */
    0, 0, 1234, 5678,        /* Log offset and its two checksum words */
    0,0,0,0, 0,0,0,0,        /* Append list (presumably LSM_APPLIST_SZ==4
                             ** - TODO confirm) */
    0,                       /* Number of block-redirect entries */
    0,                       /* Number of free-list entries */
    0, 0                     /* Checksum (computed below) */
  };
  u32 nWord = array_size(aEmpty);
  ShmHeader *pHdr = pDb->pShmhdr;

  aEmpty[CKPT_HDR_NCKPT] = nWord;
  aEmpty[CKPT_HDR_BLKSZ] = pDb->nDfltBlksz;
  aEmpty[CKPT_HDR_PGSZ] = pDb->nDfltPgsz;
  ckptChecksum(aEmpty, nWord, &aEmpty[nWord-2], &aEmpty[nWord-1]);

  memcpy(pHdr->aSnap1, aEmpty, nWord*sizeof(u32));
  memcpy(pHdr->aSnap2, aEmpty, nWord*sizeof(u32));
  memcpy(pDb->aSnapshot, aEmpty, nWord*sizeof(u32));
}
/*
** Initialize the shared-memory snapshot slots from the database file. The
** meta page (1 or 2) carrying the larger snapshot id is tried first; if it
** does not hold a valid checkpoint the other one is tried. If neither
** loads, an empty checkpoint is installed instead.
**
** Returns LSM_OK, or an LSM error code if reading a meta page or allocating
** memory failed (the empty checkpoint is still installed in that case).
*/
int lsmCheckpointRecover(lsm_db *pDb){
  MetaPage *apPage[2] = {0, 0};
  i64 iIdOne;
  i64 iIdTwo;
  int iTry;                       /* Index in apPage[] of the page to try */
  int bFound;
  int rc;

  rc = lsmFsMetaPageGet(pDb->pFS, 0, 1, &apPage[0]);
  if( rc==LSM_OK ){
    rc = lsmFsMetaPageGet(pDb->pFS, 0, 2, &apPage[1]);
  }

  iIdOne = ckptLoadId(apPage[0]);
  iIdTwo = ckptLoadId(apPage[1]);
  iTry = (iIdTwo > iIdOne) ? 1 : 0;

  /* Meta page numbers are 1-based, hence the +1 */
  bFound = ckptTryLoad(pDb, apPage[iTry], (u32)(iTry+1), &rc);
  if( !bFound ){
    iTry = 1 - iTry;
    bFound = ckptTryLoad(pDb, apPage[iTry], (u32)(iTry+1), &rc);
  }
  if( !bFound ){
    ckptLoadEmpty(pDb);
  }

  lsmFsMetaPageRelease(apPage[0]);
  lsmFsMetaPageRelease(apPage[1]);
  return rc;
}
/*
** Write the checkpoint currently held in pDb->aSnapshot to meta page iMeta
** (1 or 2). The words are byte-swapped on little-endian hosts, so the page
** always ends up holding big-endian data. Returns the result of fetching
** or releasing the meta page.
*/
int lsmCheckpointStore(lsm_db *pDb, int iMeta){
  MetaPage *pPage = 0;
  u8 *aDest;
  int nDest;
  int nWord;
  int rc;

  assert( iMeta==1 || iMeta==2 );

  rc = lsmFsMetaPageGet(pDb->pFS, 1, iMeta, &pPage);
  if( rc!=LSM_OK ) return rc;

  nWord = (int)pDb->aSnapshot[CKPT_HDR_NCKPT];
  aDest = lsmFsMetaPageData(pPage, &nDest);
  memcpy(aDest, pDb->aSnapshot, nWord*sizeof(u32));
  ckptChangeEndianness((u32 *)aDest, nWord);

  return lsmFsMetaPageRelease(pPage);
}
/*
** Copy a snapshot from shared memory into pDb->aSnapshot. On each attempt
** slot 1 (aSnap1) is tried first, then slot 2 (aSnap2); a copy is accepted
** only if its size field fits in a meta page and its checksum verifies.
** A memory barrier is issued between attempts.
**
** On success LSM_OK is returned and, if piRead is not NULL, *piRead is set
** to the number (1 or 2) of the slot that was read. After
** LSM_ATTEMPTS_BEFORE_PROTOCOL failed attempts, LSM_PROTOCOL_BKPT is
** returned.
*/
int lsmCheckpointLoad(lsm_db *pDb, int *piRead){
  ShmHeader *pShm = pDb->pShmhdr;
  int nAttempt;

  for(nAttempt=LSM_ATTEMPTS_BEFORE_PROTOCOL; nAttempt>0; nAttempt--){
    int iSlot;

    for(iSlot=1; iSlot<=2; iSlot++){
      u32 *aSrc = (iSlot==1) ? pShm->aSnap1 : pShm->aSnap2;
      int nInt = aSrc[CKPT_HDR_NCKPT];

      if( nInt<=(LSM_META_RW_PAGE_SIZE / sizeof(u32)) ){
        memcpy(pDb->aSnapshot, aSrc, nInt*sizeof(u32));
        if( ckptChecksumOk(pDb->aSnapshot) ){
          if( piRead ) *piRead = iSlot;
          return LSM_OK;
        }
      }
    }

    lsmShmBarrier(pDb);
  }

  return LSM_PROTOCOL_BKPT;
}
/*
** Load the current snapshot from shared memory and report the compression
** id stored in its header via *piCmpId. The connection must hold neither a
** client nor a worker snapshot. *piCmpId is written only if the load
** succeeds; the result of lsmCheckpointLoad() is returned.
*/
int lsmInfoCompressionId(lsm_db *db, u32 *piCmpId){
  int rc;

  assert( db->pClient==0 && db->pWorker==0 );

  rc = lsmCheckpointLoad(db, 0);
  if( rc!=LSM_OK ) return rc;

  *piCmpId = db->aSnapshot[CKPT_HDR_CMPID];
  return rc;
}
/*
** Return true if the snapshot id of the private copy in pDb->aSnapshot is
** the same as that of shared-memory snapshot slot iSnap (1 or 2).
*/
int lsmCheckpointLoadOk(lsm_db *pDb, int iSnap){
  ShmHeader *pHdr = pDb->pShmhdr;
  u32 *aShared;

  assert( iSnap==1 || iSnap==2 );

  if( iSnap==1 ){
    aShared = pHdr->aSnap1;
  }else{
    aShared = pHdr->aSnap2;
  }

  return (lsmCheckpointId(pDb->aSnapshot, 0)==lsmCheckpointId(aShared, 0) );
}
/*
** Return 1 if the connection has a client snapshot whose id matches the id
** of the private copy in pDb->aSnapshot and of both shared-memory snapshot
** slots. Return 0 otherwise.
*/
int lsmCheckpointClientCacheOk(lsm_db *pDb){
  if( pDb->pClient==0 ) return 0;
  if( pDb->pClient->iId!=lsmCheckpointId(pDb->aSnapshot, 0) ) return 0;
  if( pDb->pClient->iId!=lsmCheckpointId(pDb->pShmhdr->aSnap1, 0) ) return 0;
  if( pDb->pClient->iId!=lsmCheckpointId(pDb->pShmhdr->aSnap2, 0) ) return 0;
  return 1;
}
/*
** Build the worker snapshot (pDb->pWorker) from shared memory. The caller
** must hold an exclusive lock on either LSM_LOCK_WORKER or LSM_LOCK_DMS1.
**
** If the two shared-memory snapshot slots differ, the one with a valid
** checksum (slot 1 preferred) is copied over the other. If neither is
** valid, LSM_PROTOCOL_BKPT is returned. Slot 1 is then deserialized,
** free-list included, and the snapshot's compression id is checked with
** lsmCheckCompressionId().
**
** The garbled "#if 0#endif" directive that used to sit just before the
** final return has been removed. It was not a valid preprocessor
** conditional and it guarded no code.
*/
int lsmCheckpointLoadWorker(lsm_db *pDb){
  int rc;
  ShmHeader *pShm = pDb->pShmhdr;
  int nInt1;
  int nInt2;

  assert(
      lsmShmAssertLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_EXCL)
   || lsmShmAssertLock(pDb, LSM_LOCK_DMS1, LSM_LOCK_EXCL)
  );

  /* Check that the two slots match. If they do not, repair them. */
  nInt1 = pShm->aSnap1[CKPT_HDR_NCKPT];
  nInt2 = pShm->aSnap2[CKPT_HDR_NCKPT];
  if( nInt1!=nInt2 || memcmp(pShm->aSnap1, pShm->aSnap2, nInt2*sizeof(u32)) ){
    if( ckptChecksumOk(pShm->aSnap1) ){
      memcpy(pShm->aSnap2, pShm->aSnap1, sizeof(u32)*nInt1);
    }else if( ckptChecksumOk(pShm->aSnap2) ){
      memcpy(pShm->aSnap1, pShm->aSnap2, sizeof(u32)*nInt2);
    }else{
      return LSM_PROTOCOL_BKPT;
    }
  }

  rc = lsmCheckpointDeserialize(pDb, 1, pShm->aSnap1, &pDb->pWorker);
  if( pDb->pWorker ) pDb->pWorker->pDatabase = pDb->pDatabase;

  if( rc==LSM_OK ){
    rc = lsmCheckCompressionId(pDb, pDb->pWorker->iCmpId);
  }

  return rc;
}
/*
** Build a new Snapshot object from the checkpoint in aCkpt[] (host byte
** order).
**
**   bInclFreelist  If true, the free-block list is deserialized as well.
**   ppSnap         OUT: the new snapshot, or NULL if an error occurs.
**
** Returns LSM_OK on success. On failure the partially built snapshot is
** released with lsmFreeSnapshot() and an LSM error code is returned.
**
** Once a step has failed nothing after it is parsed. Previously, after
** ckptLoadLevels() failed, the redirect section was still read and the
** (NULL) level list walked, dereferencing a NULL pointer.
*/
int lsmCheckpointDeserialize(
  lsm_db *pDb,
  int bInclFreelist,
  u32 *aCkpt,
  Snapshot **ppSnap
){
  int rc = LSM_OK;
  Snapshot *pNew;

  pNew = (Snapshot *)lsmMallocZeroRc(pDb->pEnv, sizeof(Snapshot), &rc);
  if( rc==LSM_OK ){
    Level *pLvl;
    int nFree;
    int i;
    int nLevel = (int)aCkpt[CKPT_HDR_NLEVEL];
    /* The level data begins after the header, log pointer and append list */
    int iIn = CKPT_HDR_SIZE + CKPT_APPENDLIST_SIZE + CKPT_LOGPTR_SIZE;

    pNew->iId = lsmCheckpointId(aCkpt, 0);
    pNew->nBlock = aCkpt[CKPT_HDR_NBLOCK];
    pNew->nWrite = aCkpt[CKPT_HDR_NWRITE];
    rc = ckptLoadLevels(pDb, aCkpt, &iIn, nLevel, &pNew->pLevel);
    pNew->iLogOff = lsmCheckpointLogOffset(aCkpt);
    pNew->iCmpId = aCkpt[CKPT_HDR_CMPID];

    /* Make a copy of the append list */
    for(i=0; i<LSM_APPLIST_SZ; i++){
      u32 *a = &aCkpt[CKPT_HDR_SIZE + CKPT_LOGPTR_SIZE + i*2];
      pNew->aiAppend[i] = ckptRead64(a);
    }

    /* Read the block-redirect list. This is skipped if the levels failed to
    ** load, since iIn is then not positioned at the redirect list. */
    if( rc==LSM_OK ){
      pNew->redirect.n = aCkpt[iIn++];
      if( pNew->redirect.n ){
        /* NOTE(review): redirect.n is not checked against
        ** LSM_MAX_BLOCK_REDIRECTS, the size of the array allocated here.
        ** Presumably the checksum is what guards against that - confirm. */
        pNew->redirect.a = lsmMallocZeroRc(pDb->pEnv,
            (sizeof(struct RedirectEntry) * LSM_MAX_BLOCK_REDIRECTS), &rc
        );
        if( rc==LSM_OK ){
          for(i=0; i<pNew->redirect.n; i++){
            pNew->redirect.a[i].iFrom = aCkpt[iIn++];
            pNew->redirect.a[i].iTo = aCkpt[iIn++];
          }

          /* Attach the redirect list to the last segment of the last level:
          ** its final right-hand segment if it has any, else its left-hand
          ** segment. The NULL check covers a checkpoint that has redirects
          ** but no levels, which previously dereferenced a NULL pointer. */
          pLvl = pNew->pLevel;
          if( pLvl ){
            while( pLvl->pNext ) pLvl = pLvl->pNext;
            if( pLvl->nRight ){
              pLvl->aRhs[pLvl->nRight-1].pRedirect = &pNew->redirect;
            }else{
              pLvl->lhs.pRedirect = &pNew->redirect;
            }
          }
        }
      }
    }

    /* Copy the free-list */
    if( rc==LSM_OK && bInclFreelist ){
      nFree = aCkpt[iIn++];
      if( nFree ){
        pNew->freelist.aEntry = (FreelistEntry *)lsmMallocZeroRc(
            pDb->pEnv, sizeof(FreelistEntry)*nFree, &rc
        );
        if( rc==LSM_OK ){
          int j;
          for(j=0; j<nFree; j++){
            FreelistEntry *p = &pNew->freelist.aEntry[j];
            p->iBlk = aCkpt[iIn++];
            p->iId = ((i64)(aCkpt[iIn])<<32) + aCkpt[iIn+1];
            iIn += 2;
          }
          pNew->freelist.nEntry = pNew->freelist.nAlloc = nFree;
        }
      }
    }
  }

  if( rc!=LSM_OK ){
    lsmFreeSnapshot(pDb->pEnv, pNew);
    pNew = 0;
  }

  *ppSnap = pNew;
  return rc;
}
/*
** Return true if the worker snapshot holds LSM_MAX_RHS_SEGMENTS or more
** segments. Each level contributes its nRight count, or 1 if it has no
** right-hand segments. The caller must hold the worker lock exclusively
** and have a worker snapshot loaded.
*/
int lsmDatabaseFull(lsm_db *pDb){
  int nSeg = 0;
  Level *pLvl;

  assert( lsmShmAssertLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_EXCL) );
  assert( pDb->pWorker );

  pLvl = pDb->pWorker->pLevel;
  while( pLvl ){
    if( pLvl->nRight ){
      nSeg += pLvl->nRight;
    }else{
      nSeg += 1;
    }
    pLvl = pLvl->pNext;
  }

  return (nSeg >= LSM_MAX_RHS_SEGMENTS);
}
/*
** Increment the worker snapshot's id, serialize the snapshot (with a
** checksum) and publish it to shared memory. Slot 2 is written first, then
** a memory barrier is issued, then slot 1 is written. bFlush is passed on
** to the serializer to select the source of the log pointer.
**
** Returns LSM_OK, or the error from serialization, in which case shared
** memory is left untouched (the snapshot id has already been incremented).
*/
int lsmCheckpointSaveWorker(lsm_db *pDb, int bFlush){
  Snapshot *pWorker = pDb->pWorker;
  void *pBlob = 0;
  int nByte = 0;
  int rc;

  pWorker->iId++;
  rc = ckptExportSnapshot(pDb, bFlush, pWorker->iId, 1, &pBlob, &nByte);

  if( rc==LSM_OK ){
    ShmHeader *pHdr = pDb->pShmhdr;

    assert( ckptChecksumOk((u32 *)pBlob) );
    assert( nByte<=LSM_META_RW_PAGE_SIZE );

    memcpy(pHdr->aSnap2, pBlob, nByte);
    lsmShmBarrier(pDb);
    memcpy(pHdr->aSnap1, pBlob, nByte);
    lsmFree(pDb->pEnv, pBlob);
  }

  return rc;
}
/*
** Report on the checkpoint stored on the meta page named by
** ShmHeader.iMetaPage.
**
** Each output pointer may be NULL. If the meta page holds a checkpoint with
** a sane size and a valid checksum:
**
**   *piId     receives its snapshot id,
**   *piLog    receives its log offset shifted right by one bit,
**   *pnWrite  receives its NWRITE header field.
**
** In every other case the outputs are set to zero. That covers: no meta
** page selected, a read or allocation error, a bad size or checksum, and
** ShmHeader.iMetaPage changing while the page was being read.
**
** Returns LSM_OK, or an LSM error code if fetching the page or allocating
** memory fails.
**
** Two defects are fixed relative to the earlier version. (1) The outputs
** are zeroed up front; before, a failed size or checksum test left them
** unwritten while LSM_OK was returned, so callers read uninitialized
** values. (2) A size field no larger than CKPT_HDR_NCKPT is rejected, as in
** ckptTryLoad(); before, ckptChecksumOk() could read aCopy[CKPT_HDR_NCKPT]
** beyond the end of a copy only 0..2 words long.
*/
int lsmCheckpointSynced(lsm_db *pDb, i64 *piId, i64 *piLog, u32 *pnWrite){
  int rc = LSM_OK;
  MetaPage *pPg;
  u32 iMeta;

  /* Default result: nothing synced */
  if( piId ) *piId = 0;
  if( piLog ) *piLog = 0;
  if( pnWrite ) *pnWrite = 0;

  iMeta = pDb->pShmhdr->iMetaPage;
  if( iMeta==1 || iMeta==2 ){
    rc = lsmFsMetaPageGet(pDb->pFS, 0, iMeta, &pPg);
    if( rc==LSM_OK ){
      int nCkpt;
      int nData;
      u8 *aData;

      aData = lsmFsMetaPageData(pPg, &nData);
      assert( nData==LSM_META_RW_PAGE_SIZE );
      nCkpt = lsmGetU32(&aData[CKPT_HDR_NCKPT*sizeof(u32)]);
      if( nCkpt>CKPT_HDR_NCKPT
       && nCkpt<(LSM_META_RW_PAGE_SIZE/sizeof(u32))
      ){
        u32 *aCopy = lsmMallocRc(pDb->pEnv, sizeof(u32) * nCkpt, &rc);
        if( aCopy ){
          /* Verify a private copy converted to host byte order */
          memcpy(aCopy, aData, nCkpt*sizeof(u32));
          ckptChangeEndianness(aCopy, nCkpt);
          if( ckptChecksumOk(aCopy) ){
            if( piId ) *piId = lsmCheckpointId(aCopy, 0);
            if( piLog ) *piLog = (lsmCheckpointLogOffset(aCopy) >> 1);
            if( pnWrite ) *pnWrite = aCopy[CKPT_HDR_NWRITE];
          }
          lsmFree(pDb->pEnv, aCopy);
        }
      }
      lsmFsMetaPageRelease(pPg);
    }
  }

  /* Discard anything read if an error occurred or if the current meta page
  ** changed while it was being read. */
  if( (iMeta!=1 && iMeta!=2) || rc!=LSM_OK || pDb->pShmhdr->iMetaPage!=iMeta ){
    if( piId ) *piId = 0;
    if( piLog ) *piLog = 0;
    if( pnWrite ) *pnWrite = 0;
  }
  return rc;
}
/*
** Return the 64-bit snapshot id from the header of checkpoint aCkpt. If
** bDisk is true the two header words are decoded from raw bytes with
** lsmGetU32(); otherwise they are read directly as host-order words.
*/
i64 lsmCheckpointId(u32 *aCkpt, int bDisk){
  i64 hiPart;
  i64 loPart;

  if( !bDisk ){
    hiPart = (i64)aCkpt[CKPT_HDR_ID_MSW];
    loPart = (i64)aCkpt[CKPT_HDR_ID_LSW];
  }else{
    u8 *aByte = (u8 *)aCkpt;
    hiPart = (i64)lsmGetU32(&aByte[CKPT_HDR_ID_MSW*4]);
    loPart = (i64)lsmGetU32(&aByte[CKPT_HDR_ID_LSW*4]);
  }

  return (hiPart << 32) + loPart;
}
/*
** Return the NBLOCK header field of the host-order checkpoint aCkpt.
*/
u32 lsmCheckpointNBlock(u32 *aCkpt){
  u32 nBlock = aCkpt[CKPT_HDR_NBLOCK];
  return nBlock;
}
/*
** Return the NWRITE header field of checkpoint aCkpt. If bDisk is true the
** word is decoded from raw bytes with lsmGetU32(); otherwise it is read
** directly as a host-order word.
*/
u32 lsmCheckpointNWrite(u32 *aCkpt, int bDisk){
  u32 *pField = &aCkpt[CKPT_HDR_NWRITE];

  if( !bDisk ){
    return *pField;
  }
  return lsmGetU32((u8 *)pField);
}
/*
** Return the raw 64-bit log-offset value stored in the log pointer section
** of the host-order checkpoint aCkpt.
*/
i64 lsmCheckpointLogOffset(u32 *aCkpt){
  i64 hiPart = (i64)aCkpt[CKPT_HDR_LO_MSW];
  i64 loPart = (i64)aCkpt[CKPT_HDR_LO_LSW];
  return (hiPart << 32) + loPart;
}
/*
** Return the page size recorded in the header of host-order checkpoint
** aCkpt.
*/
int lsmCheckpointPgsz(u32 *aCkpt){
  u32 nPgsz = aCkpt[CKPT_HDR_PGSZ];
  return (int)nPgsz;
}
/*
** Return the block size recorded in the header of host-order checkpoint
** aCkpt.
*/
int lsmCheckpointBlksz(u32 *aCkpt){
  u32 nBlksz = aCkpt[CKPT_HDR_BLKSZ];
  return (int)nBlksz;
}
/*
** Initialize parts of DbLog object pLog from the host-order checkpoint
** aCkpt: the start of region 2 (the stored log offset shifted right by one
** bit), the two log checksum words and the snapshot id.
*/
void lsmCheckpointLogoffset(
  u32 *aCkpt,
  DbLog *pLog
){
  i64 iStored = lsmCheckpointLogOffset(aCkpt);

  pLog->aRegion[2].iStart = (iStored >> 1);
  pLog->cksum0 = aCkpt[CKPT_HDR_LO_CKSUM1];
  pLog->cksum1 = aCkpt[CKPT_HDR_LO_CKSUM2];
  pLog->iSnapshotId = lsmCheckpointId(aCkpt, 0);
}
/*
** Set the log offset stored in pDb->aSnapshot to zero, recompute the
** checksum and copy the result over both shared-memory snapshot slots.
** On entry the private copy and both shared slots must be identical.
*/
void lsmCheckpointZeroLogoffset(lsm_db *pDb){
  u32 *aLocal = pDb->aSnapshot;
  ShmHeader *pHdr = pDb->pShmhdr;
  u32 nWord = aLocal[CKPT_HDR_NCKPT];

  assert( nWord>CKPT_HDR_NCKPT );
  assert( nWord==pHdr->aSnap1[CKPT_HDR_NCKPT] );
  assert( 0==memcmp(aLocal, pHdr->aSnap1, nWord*sizeof(u32)) );
  assert( 0==memcmp(aLocal, pHdr->aSnap2, nWord*sizeof(u32)) );

  aLocal[CKPT_HDR_LO_MSW] = 0;
  aLocal[CKPT_HDR_LO_LSW] = 0;
  ckptChecksum(aLocal, nWord, &aLocal[nWord-2], &aLocal[nWord-1]);

  memcpy(pHdr->aSnap1, aLocal, nWord*sizeof(u32));
  memcpy(pHdr->aSnap2, aLocal, nWord*sizeof(u32));
}
/*
** Set *pnKB to the amount of data, in KB rounded up, written since the
** last synced checkpoint. It is computed from the difference between the
** NWRITE field of shared-memory slot 1 and the NWRITE value reported by
** lsmCheckpointSynced(), multiplied by the page size. *pnKB is written only
** when LSM_OK is returned.
**
** nSynced now starts at zero, so the calculation is well defined even when
** lsmCheckpointSynced() returns LSM_OK without storing a value (for example
** because the on-disk checkpoint failed its checksum). It was previously
** left uninitialized.
*/
int lsmCheckpointSize(lsm_db *db, int *pnKB){
  int rc;
  u32 nSynced = 0;

  rc = lsmCheckpointSynced(db, 0, 0, &nSynced);
  if( rc==LSM_OK ){
    u32 nPgsz = db->pShmhdr->aSnap1[CKPT_HDR_PGSZ];
    u32 nWrite = db->pShmhdr->aSnap1[CKPT_HDR_NWRITE];
    *pnKB = (int)(( ((i64)(nWrite - nSynced) * nPgsz) + 1023) / 1024);
  }
  return rc;
}