#include "lsmInt.h"
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
/*
** File-system layer state for one database connection. Created by
** lsmFsOpen() and destroyed by lsmFsClose().
*/
struct FileSystem {
/* Connection handle; copied from the pDb argument of lsmFsOpen(). */
lsm_db *pDb;
/* Run-time environment (pDb->pEnv) used for all I/O and allocations. */
lsm_env *pEnv;
/* Database file path. Stored in the same allocation, directly after
** this structure. */
char *zDb;
/* Log file path: zDb with "-log" appended (same allocation as zDb). */
char *zLog;
/* Byte stride between the two meta pages (LSM_META_PAGE_SIZE). */
int nMetasize;
/* Bytes actually read/written per meta page (LSM_META_RW_PAGE_SIZE). */
int nMetaRwSize;
/* Database page size in bytes (defaults to LSM_DFLT_PAGE_SIZE). */
i64 nPagesize;
/* Database block size in bytes (defaults to LSM_DFLT_BLOCK_SIZE). */
i64 nBlocksize;
/* LsmFile wrapper, either recycled via lsmDbRecycleFd() or allocated
** by lsmFsOpen(). Handed to the caller by lsmFsDeferClose(). */
LsmFile *pLsmFile;
/* Open handle on the database file. */
lsm_file *fdDb;
/* Open handle on the log file, or 0 if it is not open. */
lsm_file *fdLog;
/* Value returned by xSectorSize() for fdDb. */
int szSector;
/* Compression hooks (&db->compress), or 0 when no xCompress is set. */
lsm_compress *pCompress;
/* Scratch buffer holding compressed input passed to xUncompress(). */
u8 *aIBuffer;
/* Scratch buffer selected by fsAllocateBuffer() when bWrite is set. */
u8 *aOBuffer;
/* Size computed for the scratch buffers: xBound(nPagesize), but never
** less than szSector+6. Zero until first computed. */
int nBuffer;
/* Pages whose end offset is <= this many bytes are accessed through
** the memory mapping. Zero disables mapping. */
i64 nMapLimit;
/* Current mapping of the database file, as returned by xRemap(). */
void *pMap;
/* Size of the current mapping, as returned by xRemap(). */
i64 nMap;
/* List of unused Page structures for mapped pages (via pFreeNext). */
Page *pFree;
/* List of Page structures whose aData points into pMap (via
** pMappedNext). Fixed up by fsGrowMapping() when the mapping moves. */
Page *pMapped;
/* Upper bound on nCacheAlloc before fsPageBuffer() starts recycling
** LRU pages: 2048*1024 / nPagesize. */
int nCacheMax;
/* Number of heap page buffers currently allocated. */
int nCacheAlloc;
/* Head of the LRU list; fsPageBuffer() recycles from this end. */
Page *pLruFirst;
/* Tail of the LRU list; fsPageAddToLru() appends here. */
Page *pLruLast;
/* Number of buckets in apHash (set to 4096 by lsmFsOpen()). */
int nHash;
/* Hash table of cached pages, keyed by page number modulo nHash. */
Page **apHash;
/* NOTE(review): only asserted to be 0 in lsmFsConfigure() within the
** visible code; presumably a list linked through Page.pWaitingNext. */
Page *pWaiting;
/* Number of pages with a non-zero reference count. */
int nOut;
/* NOTE(review): not referenced in the visible code; presumably a write
** counter mirroring nRead - confirm. */
int nWrite;
/* Number of page reads attempted by fsPageGet(). */
int nRead;
};
/*
** A handle on a single database page, either backed by a heap buffer
** (PAGE_FREE set) or by a pointer into the memory mapping.
*/
struct Page {
/* Page content. Advanced by 4 bytes when PAGE_HASPREV is set. */
u8 *aData;
/* Number of usable bytes at aData (nPagesize, or nPagesize-4 for the
** first/last page of a block in an uncompressed database). */
int nData;
/* Page number after redirection. In compressed databases this value is
** used directly as a byte offset (see fsReadPagedata()). */
LsmPgno iPg;
/* Number of outstanding references. */
int nRef;
/* Combination of PAGE_DIRTY, PAGE_FREE and PAGE_HASPREV. */
int flags;
/* Next page in the same FileSystem.apHash bucket. */
Page *pHashNext;
/* Next entry in the LRU list. */
Page *pLruNext;
/* Previous entry in the LRU list. */
Page *pLruPrev;
/* Owning file-system object. */
FileSystem *pFS;
/* Size of the compressed record, as decoded by fsReadPagedata(). */
int nCompress;
/* NOTE(review): not referenced in the visible code - confirm use. */
int nCompressPrev;
/* Segment the page is being appended to (set by lsmFsSortedAppend()
** for pages whose page number is assigned later). */
Segment *pSeg;
/* NOTE(review): presumably links FileSystem.pWaiting - confirm. */
Page *pWaitingNext;
/* Next entry in the FileSystem.pFree list. */
Page *pFreeNext;
/* Next entry in the FileSystem.pMapped list. */
Page *pMappedNext;
};
/*
** Handle on one of the two database meta pages, as returned by
** lsmFsMetaPageGet().
*/
struct MetaPage {
/* Meta page number: 1 or 2. */
int iPg;
/* True if the buffer is written back by lsmFsMetaPageRelease(). */
int bWrite;
/* Meta page content: a heap buffer, or a pointer into the mapping
** when FileSystem.nMapLimit>0. */
u8 *aData;
/* Owning file-system object. */
FileSystem *pFS;
};
/*
** Values for Page.flags.
**
** PAGE_DIRTY:   Set on pages handed out for writing by
**               lsmFsSortedAppend(); reported by lsmFsPageWritable().
** PAGE_FREE:    Set by fsPageBuffer() on pages whose aData is a heap
**               buffer; lsmFsClose() frees aData only when it is set.
** PAGE_HASPREV: Page.aData has been advanced past a 4-byte prefix at
**               the start of the page (see fsPageGet()).
*/
#define PAGE_DIRTY 0x00000001
#define PAGE_FREE 0x00000002
#define PAGE_HASPREV 0x00000004
/* Evaluates to the larger of 1 and 8192/pgsz. NOTE(review): not used
** in the visible part of this file. */
#define BLOCK1_HDR_SIZE(pgsz) LSM_MAX(1, 8192/(pgsz))
#ifdef NDEBUG
/* Release builds: the wrapper is the identity. */
# define IOERR_WRAPPER(rc) (rc)
#else
/*
** Debug builds only. This function exists so that a debugger breakpoint
** can be placed on it; it merely counts invocations.
*/
static void lsmIoerrBkpt(void){
  static int nErr = 0;
  ++nErr;
}

/*
** Pass rc through unchanged, calling lsmIoerrBkpt() first whenever it is
** anything other than LSM_OK.
*/
static int IOERR_WRAPPER(int rc){
  if( rc==LSM_OK ) return rc;
  lsmIoerrBkpt();
  return rc;
}
#endif
#ifdef NDEBUG
# define assert_lists_are_ok(x)
#else
static Page *fsPageFindInHash(FileSystem *pFS, LsmPgno iPg, int *piHash);

/*
** Debug-build hook for page-list consistency checks. The checks are
** currently disabled, so this is a no-op. The previous body consisted
** of "#if 0" and "#endif" fused onto a single line, which is not a valid
** preprocessor directive and left the conditional unterminated.
*/
static void assert_lists_are_ok(FileSystem *pFS){
  (void)pFS;
}
#endif
/*
** Open file zFile through the xOpen method of pEnv. The new handle is
** written to *ppNew; the method's return code is returned as is.
*/
int lsmEnvOpen(lsm_env *pEnv, const char *zFile, int flags, lsm_file **ppNew){
  int rc = pEnv->xOpen(pEnv, zFile, flags, ppNew);
  return rc;
}
/*
** Read nRead bytes at offset iOff of pFile into pRead using the xRead
** method of pEnv. The result is routed through IOERR_WRAPPER().
*/
static int lsmEnvRead(
  lsm_env *pEnv,
  lsm_file *pFile,
  lsm_i64 iOff,
  void *pRead,
  int nRead
){
  int rc = pEnv->xRead(pFile, iOff, pRead, nRead);
  return IOERR_WRAPPER(rc);
}
/*
** Write nWrite bytes from pWrite at offset iOff of pFile using the
** xWrite method of pEnv. The result is routed through IOERR_WRAPPER().
*/
static int lsmEnvWrite(
  lsm_env *pEnv,
  lsm_file *pFile,
  lsm_i64 iOff,
  const void *pWrite,
  int nWrite
){
  /* xWrite takes a non-const pointer, hence the cast. */
  int rc = pEnv->xWrite(pFile, iOff, (void *)pWrite, nWrite);
  return IOERR_WRAPPER(rc);
}
/*
** Invoke the xSync method of pEnv on pFile. The result is routed
** through IOERR_WRAPPER().
*/
static int lsmEnvSync(lsm_env *pEnv, lsm_file *pFile){
  int rc = pEnv->xSync(pFile);
  return IOERR_WRAPPER(rc);
}
/*
** Return the value reported by the xSectorSize method of pEnv for pFile.
*/
static int lsmEnvSectorSize(lsm_env *pEnv, lsm_file *pFile){
  int szSector = pEnv->xSectorSize(pFile);
  return szSector;
}
/*
** Close pFile using the xClose method of pEnv. The result is routed
** through IOERR_WRAPPER().
*/
int lsmEnvClose(lsm_env *pEnv, lsm_file *pFile){
  int rc = pEnv->xClose(pFile);
  return IOERR_WRAPPER(rc);
}
/*
** Invoke the xTruncate method of pEnv on pFile with size nByte. The
** result is routed through IOERR_WRAPPER().
*/
static int lsmEnvTruncate(lsm_env *pEnv, lsm_file *pFile, lsm_i64 nByte){
  int rc = pEnv->xTruncate(pFile, nByte);
  return IOERR_WRAPPER(rc);
}
/*
** Invoke the xUnlink method of pEnv on path zDel. The result is routed
** through IOERR_WRAPPER().
*/
static int lsmEnvUnlink(lsm_env *pEnv, const char *zDel){
  int rc = pEnv->xUnlink(pEnv, zDel);
  return IOERR_WRAPPER(rc);
}
/*
** Forward to the xRemap method of pEnv. szMin is the requested minimum
** size; *ppMap and *pszMap receive the mapping and its size. Elsewhere
** in this file szMin==-1 is passed to drop the mapping.
*/
static int lsmEnvRemap(
  lsm_env *pEnv,
  lsm_file *pFile,
  i64 szMin,
  void **ppMap,
  i64 *pszMap
){
  int rc = pEnv->xRemap(pFile, szMin, ppMap, pszMap);
  return rc;
}
/*
** Forward to the xLock method of pEnv. If pFile is NULL no call is made
** and LSM_OK is returned.
*/
int lsmEnvLock(lsm_env *pEnv, lsm_file *pFile, int iLock, int eLock){
  return pFile ? pEnv->xLock(pFile, iLock, eLock) : LSM_OK;
}
/*
** Forward to the xTestLock method of pEnv and return its result.
*/
int lsmEnvTestLock(
  lsm_env *pEnv,
  lsm_file *pFile,
  int iLock,
  int nLock,
  int eLock
){
  int rc = pEnv->xTestLock(pFile, iLock, nLock, eLock);
  return rc;
}
/*
** Forward to the xShmMap method of pEnv. *ppOut receives the pointer
** produced by the method.
*/
int lsmEnvShmMap(
  lsm_env *pEnv,
  lsm_file *pFile,
  int iChunk,
  int sz,
  void **ppOut
){
  int rc = pEnv->xShmMap(pFile, iChunk, sz, ppOut);
  return rc;
}
/*
** Invoke the xShmBarrier method of pEnv.
*/
void lsmEnvShmBarrier(lsm_env *pEnv){
  (*pEnv->xShmBarrier)();
}
/*
** Invoke the xShmUnmap method of pEnv on pFile, passing bDel through.
*/
void lsmEnvShmUnmap(lsm_env *pEnv, lsm_file *pFile, int bDel){
  (*pEnv->xShmUnmap)(pFile, bDel);
}
/*
** Invoke the xSleep method of pEnv with argument nUs.
*/
void lsmEnvSleep(lsm_env *pEnv, int nUs){
  (*pEnv->xSleep)(pEnv, nUs);
}
/*
** Write the contents of pStr (pStr->n bytes at pStr->z) to the log file
** at offset iOff. The log file must already be open.
*/
int lsmFsWriteLog(FileSystem *pFS, i64 iOff, LsmString *pStr){
  lsm_file *fdLog = pFS->fdLog;
  assert( pFS->fdLog );
  return lsmEnvWrite(pFS->pEnv, fdLog, iOff, pStr->z, pStr->n);
}
/*
** Sync the log file. The log file must already be open.
*/
int lsmFsSyncLog(FileSystem *pFS){
  lsm_file *fdLog = pFS->fdLog;
  assert( pFS->fdLog );
  return lsmEnvSync(pFS->pEnv, fdLog);
}
/*
** Read nRead bytes from offset iOff of the log file and append them to
** pStr. If extending pStr fails, the error is returned and nothing is
** read. Otherwise pStr->n is advanced by nRead whatever the outcome of
** the read itself, and the read's return code is returned.
*/
int lsmFsReadLog(FileSystem *pFS, i64 iOff, int nRead, LsmString *pStr){
  int rc;
  assert( pFS->fdLog );

  rc = lsmStringExtend(pStr, nRead);
  if( rc!=LSM_OK ) return rc;

  rc = lsmEnvRead(pFS->pEnv, pFS->fdLog, iOff, &pStr->z[pStr->n], nRead);
  pStr->n += nRead;
  return rc;
}
/*
** Truncate the log file to nByte bytes. A no-op returning LSM_OK if the
** log file is not open.
*/
int lsmFsTruncateLog(FileSystem *pFS, i64 nByte){
  return pFS->fdLog ? lsmEnvTruncate(pFS->pEnv, pFS->fdLog, nByte) : LSM_OK;
}
/*
** Truncate the database file to nByte bytes. A no-op returning LSM_OK if
** the database file is not open.
*/
int lsmFsTruncateDb(FileSystem *pFS, i64 nByte){
  return pFS->fdDb ? lsmEnvTruncate(pFS->pEnv, pFS->fdDb, nByte) : LSM_OK;
}
/*
** Close the log file (if it is open) and then unlink it. Always returns
** LSM_OK; both the close and the unlink are best-effort.
**
** The path unlinked is pFS->zLog, which lsmFsOpen() populates with the
** database path followed by "-log". Using it directly avoids a temporary
** allocation whose failure previously caused the unlink to be silently
** skipped.
*/
int lsmFsCloseAndDeleteLog(FileSystem *pFS){
  if( pFS->fdLog ){
    lsmEnvClose(pFS->pEnv, pFS->fdLog );
    pFS->fdLog = 0;
  }
  lsmEnvUnlink(pFS->pEnv, pFS->zLog);
  return LSM_OK;
}
/*
** Return true if page iReal ends at or before byte offset nMapLimit of
** the file, i.e. if it should be accessed through the memory mapping.
*/
static int fsMmapPage(FileSystem *pFS, LsmPgno iReal){
  i64 iEnd = (i64)iReal*pFS->nPagesize;
  return (iEnd <= pFS->nMapLimit);
}
/*
** Return the hash-table bucket for page iPg in a table of nHash buckets.
*/
static int fsHashKey(int nHash, LsmPgno iPg){
  int iKey = (int)(iPg % nHash);
  return iKey;
}
/*
** Open the log file (bLog!=0) or the database file (bLog==0) and return
** the new handle. LSM_OPEN_READONLY is passed if bReadonly is true.
**
** This is a no-op returning NULL if *pRc is not LSM_OK on entry.
** Otherwise *pRc is set to the return code of lsmEnvOpen().
*/
static lsm_file *fsOpenFile(
  FileSystem *pFS,
  int bReadonly,
  int bLog,
  int *pRc
){
  lsm_file *pFile = 0;
  const char *zPath;
  int flags = 0;

  if( *pRc!=LSM_OK ) return pFile;

  if( bReadonly ) flags = LSM_OPEN_READONLY;
  if( bLog ){
    zPath = pFS->zLog;
  }else{
    zPath = pFS->zDb;
  }
  *pRc = lsmEnvOpen(pFS->pEnv, zPath, flags, &pFile);
  return pFile;
}
/*
** Open the log file if it is not already open. For a read-only
** connection, a missing log file (LSM_IOERR_NOENT) is not an error.
**
** If pbOpen is not NULL, *pbOpen is set to 1 if the log file is open
** when this function returns, or 0 otherwise.
*/
int lsmFsOpenLog(lsm_db *db, int *pbOpen){
  FileSystem *pFS = db->pFS;
  int rc = LSM_OK;

  if( pFS->fdLog==0 ){
    pFS->fdLog = fsOpenFile(pFS, db->bReadonly, 1, &rc);
    if( db->bReadonly && rc==LSM_IOERR_NOENT ){
      rc = LSM_OK;
    }
  }

  if( pbOpen ){
    *pbOpen = (pFS->fdLog ? 1 : 0);
  }
  return rc;
}
/*
** Close the log file, if it is open, and clear the handle.
*/
void lsmFsCloseLog(lsm_db *db){
  FileSystem *pFS = db->pFS;
  if( pFS->fdLog==0 ) return;
  lsmEnvClose(pFS->pEnv, pFS->fdLog);
  pFS->fdLog = 0;
}
/*
** Create a FileSystem object for connection pDb and open the database
** file zDb (read-only if bReadonly is true). On return pDb->pFS points
** to the new object, or is NULL if an error occurred. Returns LSM_OK or
** the error code of the failing allocation or open call.
**
** The connection must not already have a FileSystem, worker snapshot or
** client snapshot.
*/
int lsmFsOpen(
lsm_db *pDb,
const char *zDb,
int bReadonly
){
FileSystem *pFS;
int rc = LSM_OK;
int nDb = strlen(zDb);
int nByte;
assert( pDb->pFS==0 );
assert( pDb->pWorker==0 && pDb->pClient==0 );
/* One allocation holds the structure, a copy of zDb (nDb+1 bytes) and
** the log file name, which is zDb plus "-log" plus a nul (nDb+4+1). */
nByte = sizeof(FileSystem) + nDb+1 + nDb+4+1;
pFS = (FileSystem *)lsmMallocZeroRc(pDb->pEnv, nByte, &rc);
if( pFS ){
LsmFile *pLsmFile;
pFS->zDb = (char *)&pFS[1];
pFS->zLog = &pFS->zDb[nDb+1];
pFS->nPagesize = LSM_DFLT_PAGE_SIZE;
pFS->nBlocksize = LSM_DFLT_BLOCK_SIZE;
pFS->nMetasize = LSM_META_PAGE_SIZE;
pFS->nMetaRwSize = LSM_META_RW_PAGE_SIZE;
pFS->pDb = pDb;
pFS->pEnv = pDb->pEnv;
/* Copy the database name and build the log file name. The final
** memcpy() copies 5 bytes so that the nul-terminator is included. */
memcpy(pFS->zDb, zDb, nDb+1);
memcpy(pFS->zLog, zDb, nDb);
memcpy(&pFS->zLog[nDb], "-log", 5);
/* Size the page cache and allocate a fixed-size hash table. */
pFS->nCacheMax = 2048*1024 / pFS->nPagesize;
pFS->nHash = 4096;
pFS->apHash = lsmMallocZeroRc(pDb->pEnv, sizeof(Page *) * pFS->nHash, &rc);
/* Reuse a file handle recycled by lsmDbRecycleFd() if there is one.
** Otherwise allocate a fresh LsmFile and open the database file. */
pLsmFile = lsmDbRecycleFd(pDb);
if( pLsmFile ){
pFS->pLsmFile = pLsmFile;
pFS->fdDb = pLsmFile->pFile;
memset(pLsmFile, 0, sizeof(LsmFile));
}else{
pFS->pLsmFile = lsmMallocZeroRc(pDb->pEnv, sizeof(LsmFile), &rc);
if( rc==LSM_OK ){
pFS->fdDb = fsOpenFile(pFS, bReadonly, 0, &rc);
}
}
/* On any error above, release everything acquired so far. */
if( rc!=LSM_OK ){
lsmFsClose(pFS);
pFS = 0;
}else{
pFS->szSector = lsmEnvSectorSize(pFS->pEnv, pFS->fdDb);
}
}
pDb->pFS = pFS;
return rc;
}
/*
** Bring the FileSystem object of connection db (if any) into line with
** the connection's current compression and mmap configuration. All
** cached state is discarded: compression buffers, the file mapping, the
** LRU page cache and the list of free mapped-page structures.
**
** There must be no outstanding page references when this is called.
** Always returns LSM_OK.
*/
int lsmFsConfigure(lsm_db *db){
  FileSystem *pFS = db->pFS;
  if( pFS ){
    lsm_env *pEnv = pFS->pEnv;
    Page *pPg;

    assert( pFS->nOut==0 );
    assert( pFS->pWaiting==0 );
    assert( pFS->pMapped==0 );

    /* Release the compression buffers. The pointers must be cleared as
    ** well as nBuffer: fsAllocateBuffer() only allocates when the pointer
    ** is NULL, and lsmFsClose() frees both buffers unconditionally, so a
    ** stale pointer would lead to use-after-free or a double free. */
    lsmFree(pEnv, pFS->aIBuffer);
    lsmFree(pEnv, pFS->aOBuffer);
    pFS->aIBuffer = 0;
    pFS->aOBuffer = 0;
    pFS->nBuffer = 0;

    /* Drop any existing mapping of the database file. */
    if( pFS->pMap ){
      lsmEnvRemap(pEnv, pFS->fdDb, -1, &pFS->pMap, &pFS->nMap);
      pFS->nMapLimit = 0;
    }

    /* Free every page on the LRU list, along with its heap buffer. */
    pPg = pFS->pLruFirst;
    while( pPg ){
      Page *pNext = pPg->pLruNext;
      assert( pPg->flags & PAGE_FREE );
      lsmFree(pEnv, pPg->aData);
      lsmFree(pEnv, pPg);
      pPg = pNext;
    }

    /* Free the unused mapped-page structures. These own no buffer. */
    pPg = pFS->pFree;
    while( pPg ){
      Page *pNext = pPg->pFreeNext;
      lsmFree(pEnv, pPg);
      pPg = pNext;
    }

    pFS->nCacheAlloc = 0;
    pFS->pLruFirst = 0;
    pFS->pLruLast = 0;
    pFS->pFree = 0;
    if( pFS->apHash ){
      memset(pFS->apHash, 0, pFS->nHash*sizeof(pFS->apHash[0]));
    }

    /* Compression and mmap are mutually exclusive. With compression
    ** hooks configured the mapping is disabled. Otherwise iMmap==1 is
    ** treated as "no limit" and any other value as a limit in KB. */
    if( db->compress.xCompress ){
      pFS->pCompress = &db->compress;
      pFS->nMapLimit = 0;
    }else{
      pFS->pCompress = 0;
      if( db->iMmap==1 ){
        pFS->nMapLimit = (i64)1 << 60;
      }else{
        pFS->nMapLimit = (i64)db->iMmap * 1024;
      }
    }
  }
  return LSM_OK;
}
/*
** Close and free a FileSystem object: every page on the LRU and free
** lists (plus the data buffer of those flagged PAGE_FREE), both file
** handles if open, and all auxiliary allocations. A NULL argument is a
** harmless no-op. There must be no outstanding page references.
*/
void lsmFsClose(FileSystem *pFS){
  lsm_env *pEnv;
  Page *pPg;
  Page *pNext;

  if( pFS==0 ) return;
  pEnv = pFS->pEnv;
  assert( pFS->nOut==0 );

  for(pPg=pFS->pLruFirst; pPg; pPg=pNext){
    pNext = pPg->pLruNext;
    if( pPg->flags & PAGE_FREE ) lsmFree(pEnv, pPg->aData);
    lsmFree(pEnv, pPg);
  }

  for(pPg=pFS->pFree; pPg; pPg=pNext){
    pNext = pPg->pFreeNext;
    if( pPg->flags & PAGE_FREE ) lsmFree(pEnv, pPg->aData);
    lsmFree(pEnv, pPg);
  }

  if( pFS->fdDb ) lsmEnvClose(pFS->pEnv, pFS->fdDb );
  if( pFS->fdLog ) lsmEnvClose(pFS->pEnv, pFS->fdLog );

  lsmFree(pEnv, pFS->pLsmFile);
  lsmFree(pEnv, pFS->apHash);
  lsmFree(pEnv, pFS->aIBuffer);
  lsmFree(pEnv, pFS->aOBuffer);
  lsmFree(pEnv, pFS);
}
/*
** Detach the database file handle from pFS without closing it. The
** handle is stored in the LsmFile object owned by pFS, and that object
** is returned to the caller. Afterwards pFS holds neither the handle nor
** the LsmFile.
*/
LsmFile *lsmFsDeferClose(FileSystem *pFS){
  LsmFile *pDeferred = pFS->pLsmFile;
  assert( pDeferred->pNext==0 );

  pDeferred->pFile = pFS->fdDb;
  pFS->pLsmFile = 0;
  pFS->fdDb = 0;
  return pDeferred;
}
/*
** Obtain the file-id blob for the database file via the xFileid method.
** The method is first called with a NULL buffer to learn the required
** size, then again with a buffer of that size.
**
** On success *ppId points to a buffer obtained from lsmMallocZeroRc()
** and *pnId is its size. On error *ppId is NULL and *pnId is 0.
*/
int lsmFsFileid(lsm_db *pDb, void **ppId, int *pnId){
  FileSystem *pFS = pDb->pFS;
  lsm_env *pEnv = pDb->pEnv;
  void *pBuf;
  int nBuf = 0;
  int rc;

  rc = pEnv->xFileid(pFS->fdDb, 0, &nBuf);
  pBuf = lsmMallocZeroRc(pEnv, nBuf, &rc);
  if( rc==LSM_OK ){
    rc = pEnv->xFileid(pFS->fdDb, pBuf, &nBuf);
  }

  if( rc!=LSM_OK ){
    lsmFree(pEnv, pBuf);
    pBuf = 0;
    nBuf = 0;
  }

  *ppId = pBuf;
  *pnId = nBuf;
  return rc;
}
/*
** Return the configured page size in bytes.
*/
int lsmFsPageSize(FileSystem *pFS){
  return (int)pFS->nPagesize;
}
/*
** Return the configured block size in bytes.
*/
int lsmFsBlockSize(FileSystem *pFS){
  return (int)pFS->nBlocksize;
}
/*
** Set the page size to nPgsz bytes and resize the page-cache limit so
** that it again corresponds to 2048*1024 bytes of page data.
*/
void lsmFsSetPageSize(FileSystem *pFS, int nPgsz){
  i64 szPage = nPgsz;
  pFS->nPagesize = szPage;
  pFS->nCacheMax = 2048*1024 / szPage;
}
/*
** Set the block size to nBlocksize bytes.
*/
void lsmFsSetBlockSize(FileSystem *pFS, int nBlocksize){
  i64 szBlock = nBlocksize;
  pFS->nBlocksize = szBlock;
}
/*
** Return the page number of the first page on block iBlock. Blocks are
** numbered from 1.
**
** Compressed databases: the value is a byte offset, 4 bytes past the
** start of the block. For block 1 it is 4 bytes past the two meta pages.
**
** Uncompressed databases: pages are numbered from 1. On block 1 the
** first page is the first whole page following the two meta pages.
*/
static LsmPgno fsFirstPageOnBlock(FileSystem *pFS, int iBlock){
  if( pFS->pCompress ){
    if( iBlock==1 ) return pFS->nMetasize * 2 + 4;
    return pFS->nBlocksize * (LsmPgno)(iBlock-1) + 4;
  }

  if( iBlock==1 ){
    return 1 + ((pFS->nMetasize*2 + pFS->nPagesize - 1) / pFS->nPagesize);
  }
  return 1 + (iBlock-1) * (pFS->nBlocksize / pFS->nPagesize);
}
/*
** Return the page number of the last page on block iBlock. Blocks are
** numbered from 1.
**
** Compressed databases: the value is the byte offset of the last byte of
** the block, excluding the final 4 bytes.
**
** Uncompressed databases: iBlock times the number of pages per block.
** The product is formed in 64-bit arithmetic, as in fsFirstPageOnBlock(),
** so that it cannot overflow an int for large block numbers.
*/
static LsmPgno fsLastPageOnBlock(FileSystem *pFS, int iBlock){
  if( pFS->pCompress ){
    return pFS->nBlocksize * (LsmPgno)iBlock - 1 - 4;
  }else{
    const i64 nPagePerBlock = (pFS->nBlocksize / pFS->nPagesize);
    return (LsmPgno)(iBlock * nPagePerBlock);
  }
}
/*
** Return the number of the block that page iPg belongs to. Blocks are
** numbered from 1. In a compressed database iPg is a byte offset.
*/
static int fsPageToBlock(FileSystem *pFS, LsmPgno iPg){
  if( pFS->pCompress==0 ){
    return (int)(1 + ((iPg-1) / (pFS->nBlocksize / pFS->nPagesize)));
  }
  return (int)((iPg / pFS->nBlocksize) + 1);
}
/*
** Return true if iPg is non-zero and is the last page on its block. For
** uncompressed databases only.
*/
static int fsIsLast(FileSystem *pFS, LsmPgno iPg){
  const int nPagePerBlock = (pFS->nBlocksize / pFS->nPagesize);
  assert( !pFS->pCompress );
  if( iPg==0 ) return 0;
  return (iPg % nPagePerBlock)==0;
}
/*
** Return true if iPg is the first page on its block. For uncompressed
** databases only. Block 1 is special: its first page follows the meta
** pages, so it is identified via fsFirstPageOnBlock().
*/
static int fsIsFirst(FileSystem *pFS, LsmPgno iPg){
  const int nPagePerBlock = (pFS->nBlocksize / pFS->nPagesize);
  assert( !pFS->pCompress );
  if( (iPg % nPagePerBlock)==1 ) return 1;
  return (iPg<nPagePerBlock && iPg==fsFirstPageOnBlock(pFS, 1));
}
/*
** Return a pointer to the data of page pPage. If pnData is not NULL,
** *pnData is set to the number of usable bytes.
*/
u8 *lsmFsPageData(Page *pPage, int *pnData){
  if( pnData!=0 ) *pnData = pPage->nData;
  return pPage->aData;
}
/*
** Return the page number of pPage, or 0 if pPage is NULL.
*/
LsmPgno lsmFsPageNumber(Page *pPage){
  if( pPage==0 ) return 0;
  return pPage->iPg;
}
/*
** Unlink page pPg from the LRU list of pFS. The page must currently be
** on the list.
*/
static void fsPageRemoveFromLru(FileSystem *pFS, Page *pPg){
  Page *pNext = pPg->pLruNext;
  Page *pPrev = pPg->pLruPrev;

  assert( pNext || pPg==pFS->pLruLast );
  assert( pPrev || pPg==pFS->pLruFirst );

  if( pNext==0 ){
    pFS->pLruLast = pPrev;
  }else{
    pNext->pLruPrev = pPrev;
  }

  if( pPrev==0 ){
    pFS->pLruFirst = pNext;
  }else{
    pPrev->pLruNext = pNext;
  }

  pPg->pLruNext = 0;
  pPg->pLruPrev = 0;
}
/*
** Append page pPg to the tail of the LRU list of pFS. The page must not
** currently be linked into the list.
*/
static void fsPageAddToLru(FileSystem *pFS, Page *pPg){
  Page *pTail = pFS->pLruLast;
  assert( pPg->pLruNext==0 && pPg->pLruPrev==0 );

  pPg->pLruPrev = pTail;
  if( pTail==0 ){
    pFS->pLruFirst = pPg;
  }else{
    pTail->pLruNext = pPg;
  }
  pFS->pLruLast = pPg;
}
/*
** Unlink page pPg from the page hash table. The page must be present in
** the bucket selected by its page number.
*/
static void fsPageRemoveFromHash(FileSystem *pFS, Page *pPg){
  Page **ppLink = &pFS->apHash[fsHashKey(pFS->nHash, pPg->iPg)];
  while( *ppLink!=pPg ){
    ppLink = &(*ppLink)->pHashNext;
  }
  *ppLink = pPg->pHashNext;
  pPg->pHashNext = 0;
}
/*
** Free a heap-backed page (structure and data buffer) and decrement the
** count of allocated page buffers.
*/
static void fsPageBufferFree(Page *pPg){
  FileSystem *pFS = pPg->pFS;
  lsm_env *pEnv = pFS->pEnv;

  pFS->nCacheAlloc--;
  lsmFree(pEnv, pPg->aData);
  lsmFree(pEnv, pPg);
}
/*
** Discard every page on the LRU list: each is removed from the hash
** table and freed, and the list is left empty.
*/
void lsmFsPurgeCache(FileSystem *pFS){
  Page *pPg;
  Page *pNext;

  for(pPg=pFS->pLruFirst; pPg; pPg=pNext){
    pNext = pPg->pLruNext;
    assert( pPg->flags & PAGE_FREE );
    fsPageRemoveFromHash(pFS, pPg);
    fsPageBufferFree(pPg);
  }
  pFS->pLruLast = 0;
  pFS->pLruFirst = 0;

  assert( pFS->nCacheAlloc<=pFS->nOut && pFS->nCacheAlloc>=0 );
}
/*
** Search the hash table for page iPg. Return the page, or NULL if it is
** not cached. If piHash is not NULL, *piHash is set to the bucket index
** for iPg whether or not the page was found.
*/
static Page *fsPageFindInHash(FileSystem *pFS, LsmPgno iPg, int *piHash){
  int iBucket = fsHashKey(pFS->nHash, iPg);
  Page *pCand;

  if( piHash ) *piHash = iBucket;

  pCand = pFS->apHash[iBucket];
  while( pCand && pCand->iPg!=iPg ){
    pCand = pCand->pHashNext;
  }
  return pCand;
}
/*
** Obtain a Page object with an attached heap buffer of nPagesize bytes.
**
** If the LRU list is empty or fewer than nCacheMax buffers have been
** allocated, a new page and buffer are allocated. Otherwise the page at
** the head of the LRU list is evicted from the LRU list and hash table
** and reused; its structure is zeroed but its buffer is retained.
**
** On success *ppOut is set to the page, with flags==PAGE_FREE, and
** LSM_OK is returned. On allocation failure *ppOut is NULL and
** LSM_NOMEM_BKPT is returned.
*/
static int fsPageBuffer(
  FileSystem *pFS,
  Page **ppOut
){
  int rc = LSM_OK;
  Page *pPage = 0;
  int bRecycle = (pFS->pLruFirst!=0 && pFS->nCacheAlloc>=pFS->nCacheMax);

  if( bRecycle ){
    u8 *aKeep;
    pPage = pFS->pLruFirst;
    aKeep = pPage->aData;
    fsPageRemoveFromLru(pFS, pPage);
    fsPageRemoveFromHash(pFS, pPage);
    memset(pPage, 0, sizeof(Page));
    pPage->aData = aKeep;
  }else{
    pPage = lsmMallocZero(pFS->pEnv, sizeof(Page));
    if( pPage==0 ){
      rc = LSM_NOMEM_BKPT;
    }else{
      pPage->aData = (u8 *)lsmMalloc(pFS->pEnv, pFS->nPagesize);
      if( pPage->aData ){
        pFS->nCacheAlloc++;
      }else{
        lsmFree(pFS->pEnv, pPage);
        rc = LSM_NOMEM_BKPT;
        pPage = 0;
      }
    }
  }

  if( pPage ) pPage->flags = PAGE_FREE;
  *ppOut = pPage;
  return rc;
}
/*
** Ensure the file mapping is at least iSz bytes in size. This is a no-op
** if *pRc is not LSM_OK on entry or if the mapping is already large
** enough. Otherwise *pRc is set to the return code of the remap call.
**
** If the remap succeeds and the mapping has moved, the aData pointer of
** every page on the pMapped list is shifted by the same distance and
** lsmSortedRemap() is invoked.
*/
static void fsGrowMapping(
  FileSystem *pFS,
  i64 iSz,
  int *pRc
){
  u8 *aOld;
  int rc;

  assert( PAGE_HASPREV==4 );
  if( *pRc!=LSM_OK || iSz<=pFS->nMap ) return;

  aOld = pFS->pMap;
  rc = lsmEnvRemap(pFS->pEnv, pFS->fdDb, iSz, &pFS->pMap, &pFS->nMap);
  if( rc==LSM_OK && pFS->pMap!=aOld ){
    i64 iOff = (u8 *)pFS->pMap - aOld;
    Page *pFix = pFS->pMapped;
    while( pFix ){
      pFix->aData += iOff;
      pFix = pFix->pMappedNext;
    }
    lsmSortedRemap(pFS->pDb);
  }
  *pRc = rc;
}
/*
** Drop the file mapping by issuing a remap request with size -1. A NULL
** pFS is a no-op returning LSM_OK.
*/
int lsmFsUnmap(FileSystem *pFS){
  if( pFS==0 ) return LSM_OK;
  return lsmEnvRemap(pFS->pEnv, pFS->fdDb, -1, &pFS->pMap, &pFS->nMap);
}
/*
** Sync the database file. Parameter nBlock is not used.
*/
int lsmFsSyncDb(FileSystem *pFS, int nBlock){
  lsm_file *fdDb = pFS->fdDb;
  return lsmEnvSync(pFS->pEnv, fdDb);
}
/*
** If redirect object p (which may be NULL) contains an entry mapping
** from block iBlk, return the block it maps to. Otherwise return iBlk
** itself, which in that case must be non-zero.
*/
static int fsRedirectBlock(Redirect *p, int iBlk){
  if( p ){
    int i = 0;
    while( i<p->n ){
      if( p->a[i].iFrom==iBlk ) return p->a[i].iTo;
      i++;
    }
  }
  assert( iBlk!=0 );
  return iBlk;
}
/*
** Translate page number iPg according to redirect object pRedir (which
** may be NULL) and return the result.
**
** The redirect array is scanned while its iFrom values do not exceed
** the block containing iPg. If an entry for that block is found, the
** page number is shifted by the distance between the two blocks. When
** the destination is block 1, the result is additionally moved past the
** header area at the start of that block.
*/
LsmPgno lsmFsRedirectPage(FileSystem *pFS, Redirect *pRedir, LsmPgno iPg){
  LsmPgno iReal = iPg;

  if( pRedir ){
    int iBlk = fsPageToBlock(pFS, iPg);
    int nPagePerBlock;
    int i;

    /* In compressed databases "pages" are byte offsets. */
    if( pFS->pCompress ){
      nPagePerBlock = pFS->nBlocksize;
    }else{
      nPagePerBlock = (pFS->nBlocksize / pFS->nPagesize);
    }

    for(i=0; i<pRedir->n && pRedir->a[i].iFrom<=iBlk; i++){
      if( pRedir->a[i].iFrom==iBlk ){
        int iFrom = pRedir->a[i].iFrom;
        int iTo = pRedir->a[i].iTo;
        iReal = iPg - (LsmPgno)(iFrom - iTo) * nPagePerBlock;
        if( iTo==1 ){
          iReal += (fsFirstPageOnBlock(pFS, 1)-1);
        }
        break;
      }
    }
  }

  assert( iReal!=0 );
  return iReal;
}
static int fsPageGet(FileSystem *, Segment *, LsmPgno, int, Page **, int *);
/*
** Read the "next block" pointer stored in the final 4 bytes of block
** iBlock and write it to *piNext.
**
** If pSeg is not NULL, iBlock is first translated through the segment's
** redirect object to find the block to read, and the value read is
** translated the same way before being returned.
**
** Returns LSM_OK on success. On error *piNext is left untouched. In
** particular it is not passed through the redirect lookup, since the
** caller's variable may be uninitialised or zero in that case.
*/
static int fsBlockNext(
  FileSystem *pFS,
  Segment *pSeg,
  int iBlock,
  int *piNext
){
  int rc;
  int iRead;                      /* Block to read the pointer from */

  if( pSeg ){
    iRead = fsRedirectBlock(pSeg->pRedirect, iBlock);
  }else{
    iRead = iBlock;
  }

  assert( pFS->nMapLimit==0 || pFS->pCompress==0 );
  if( pFS->pCompress ){
    i64 iOff;
    u8 aNext[4];

    /* The pointer occupies the last 4 bytes of the block. */
    iOff = (i64)iRead * pFS->nBlocksize - sizeof(aNext);
    rc = lsmEnvRead(pFS->pEnv, pFS->fdDb, iOff, aNext, sizeof(aNext));
    if( rc==LSM_OK ){
      *piNext = (int)lsmGetU32(aNext);
    }
  }else{
    const int nPagePerBlock = (pFS->nBlocksize / pFS->nPagesize);
    Page *pLast;

    /* Load the last page of the block. The page number is computed in
    ** the page-number type so that the product cannot overflow an int. */
    rc = fsPageGet(pFS, 0, (LsmPgno)iRead*nPagePerBlock, 0, &pLast, 0);
    if( rc==LSM_OK ){
      *piNext = lsmGetU32(&pLast->aData[pFS->nPagesize-4]);
      lsmFsPageRelease(pLast);
    }
  }

  if( rc==LSM_OK && pSeg ){
    *piNext = fsRedirectBlock(pSeg->pRedirect, *piNext);
  }
  return rc;
}
/*
** Return the last page number of the block that contains page iPg.
*/
LsmPgno fsLastPageOnPagesBlock(FileSystem *pFS, LsmPgno iPg){
  int iBlk = fsPageToBlock(pFS, iPg);
  return fsLastPageOnBlock(pFS, iBlk);
}
/*
** Read nData bytes of data starting at byte offset iOff of a compressed
** database into aData. If the requested range runs past the end of the
** usable part of the current block, the remainder is read from the
** start of the next block in the chain, as located by fsBlockNext().
*/
static int fsReadData(
  FileSystem *pFS,
  Segment *pSeg,
  i64 iOff,
  u8 *aData,
  int nData
){
  i64 iEob;                       /* Offset one past usable block data */
  int nFirst;                     /* Bytes available in this block */
  int rc;

  assert( pFS->pCompress );

  iEob = fsLastPageOnPagesBlock(pFS, iOff) + 1;
  nFirst = (int)LSM_MIN(iEob - iOff, nData);

  rc = lsmEnvRead(pFS->pEnv, pFS->fdDb, iOff, aData, nFirst);
  if( rc!=LSM_OK || nFirst==nData ) return rc;

  {
    int iBlk;
    rc = fsBlockNext(pFS, pSeg, fsPageToBlock(pFS, iOff), &iBlk);
    if( rc==LSM_OK ){
      i64 iOff2 = fsFirstPageOnBlock(pFS, iBlk);
      rc = lsmEnvRead(
          pFS->pEnv, pFS->fdDb, iOff2, &aData[nFirst], nData-nFirst
      );
    }
  }
  return rc;
}
/*
** Read the "previous block" pointer stored in the 4 bytes immediately
** before the first data byte of block iBlock, translate it through the
** redirect object of pSeg (if any) and write it to *piPrev.
**
** Only implemented for compressed databases; asserts otherwise.
*/
static int fsBlockPrev(
  FileSystem *pFS,
  Segment *pSeg,
  int iBlock,
  int *piPrev
){
  int rc = LSM_OK;

  assert( pFS->nMapLimit==0 || pFS->pCompress==0 );
  assert( iBlock>0 );

  if( pFS->pCompress==0 ){
    assert( 0 );
  }else{
    u8 aPrev[4];
    i64 iOff = fsFirstPageOnBlock(pFS, iBlock) - 4;
    rc = lsmEnvRead(pFS->pEnv, pFS->fdDb, iOff, aPrev, sizeof(aPrev));
    if( rc==LSM_OK ){
      Redirect *pRedir = 0;
      if( pSeg ) pRedir = pSeg->pRedirect;
      *piPrev = fsRedirectBlock(pRedir, (int)lsmGetU32(aPrev));
    }
  }
  return rc;
}
/*
** Encode nByte as a 3-byte record-size field in aBuf: seven bits per
** byte, most significant first. The top bit of the first and last bytes
** is always set. The top bit of the middle byte is clear if bFree is
** true and set otherwise.
*/
static void putRecordSize(u8 *aBuf, int nByte, int bFree){
  u8 flagMid = (bFree ? 0x00 : 0x80);
  u8 hi = (u8)(nByte >> 14);
  u8 mid = (u8)(nByte >> 7);
  u8 lo = (u8)nByte;

  aBuf[0] = hi | 0x80;
  aBuf[1] = (mid & 0x7F) | flagMid;
  aBuf[2] = lo | 0x80;
}
/*
** Decode a 3-byte record-size field written by putRecordSize(). Returns
** the 21-bit size and sets *pbFree to true if the top bit of the middle
** byte is clear.
*/
static int getRecordSize(u8 *aBuf, int *pbFree){
  int nByte = ((aBuf[0] & 0x7F) << 14)
            + ((aBuf[1] & 0x7F) << 7)
            + (aBuf[2] & 0x7F);
  *pbFree = ((aBuf[1] & 0x80)==0);
  return nByte;
}
/*
** Compute the byte offset that lies iSub bytes before offset iOff in a
** compressed database and write it to *piRes. If the result would fall
** before the first data byte of the current block, the remainder is
** counted back from the end of the previous block in the chain.
**
** *piRes is written even if locating the previous block fails; the
** return code must be checked.
*/
static int fsSubtractOffset(
  FileSystem *pFS,
  Segment *pSeg,
  i64 iOff,
  int iSub,
  i64 *piRes
){
  int iCur;                       /* Block containing iOff */
  int iBlk = 0;                   /* Previous block in the chain */
  i64 iStart;                     /* First data offset on block iCur */
  int rc;

  assert( pFS->pCompress );

  iCur = fsPageToBlock(pFS, iOff);
  iStart = fsFirstPageOnBlock(pFS, iCur);
  if( (iOff-iSub)>=iStart ){
    *piRes = (iOff-iSub);
    return LSM_OK;
  }

  rc = fsBlockPrev(pFS, pSeg, iCur, &iBlk);
  *piRes = fsLastPageOnBlock(pFS, iBlk) - iSub + (iOff - iStart + 1);
  return rc;
}
/*
** Compute the byte offset that lies iAdd bytes after offset iOff in a
** compressed database and write it to *piRes. If the result would fall
** past the last data byte of the current block, the remainder is counted
** forward from the start of the next block in the chain.
**
** *piRes is written even if locating the next block fails; the return
** code must be checked. In that case the value is derived from block
** number 0 rather than from an uninitialised variable.
*/
static int fsAddOffset(
  FileSystem *pFS,
  Segment *pSeg,
  i64 iOff,
  int iAdd,
  i64 *piRes
){
  i64 iEob;                       /* Last data offset on current block */
  int iBlk = 0;                   /* Next block; stays 0 on failure */
  int rc;

  assert( pFS->pCompress );

  iEob = fsLastPageOnPagesBlock(pFS, iOff);
  if( (iOff+iAdd)<=iEob ){
    *piRes = (iOff+iAdd);
    return LSM_OK;
  }

  rc = fsBlockNext(pFS, pSeg, fsPageToBlock(pFS, iOff), &iBlk);
  *piRes = fsFirstPageOnBlock(pFS, iBlk) + iAdd - (iEob - iOff + 1);
  return rc;
}
/*
** Make sure the compression scratch buffer is allocated: aOBuffer if
** bWrite is true, aIBuffer otherwise. For compressed databases only.
**
** On first use nBuffer is set to xBound(nPagesize), raised to szSector+6
** if that is larger. The buffer itself is the larger of nBuffer and
** nPagesize bytes. Returns LSM_OK or LSM_NOMEM_BKPT.
*/
static int fsAllocateBuffer(FileSystem *pFS, int bWrite){
  u8 **ppBuf;

  assert( pFS->pCompress );
  if( pFS->nBuffer==0 ){
    assert( pFS->aIBuffer==0 && pFS->aOBuffer==0 );
    pFS->nBuffer = pFS->pCompress->xBound(pFS->pCompress->pCtx, pFS->nPagesize);
    if( pFS->nBuffer<(pFS->szSector+6) ){
      pFS->nBuffer = pFS->szSector+6;
    }
  }

  if( bWrite ){
    ppBuf = &pFS->aOBuffer;
  }else{
    ppBuf = &pFS->aIBuffer;
  }
  if( *ppBuf ) return LSM_OK;

  *ppBuf = lsmMalloc(pFS->pEnv, LSM_MAX(pFS->nBuffer, pFS->nPagesize));
  if( *ppBuf==0 ) return LSM_NOMEM_BKPT;
  return LSM_OK;
}
/*
** Read and decompress the record stored at byte offset pPg->iPg of a
** compressed database into pPg->aData. pPg->nCompress must be zero on
** entry and is set to the size decoded from the record header.
**
** If the record turns out to be free space rather than a page, then if
** pnSpace is not NULL, *pnSpace is set to the total size of the free
** region and LSM_OK is returned without touching pPg->aData. If pnSpace
** is NULL, free space is reported as LSM_CORRUPT_BKPT.
*/
static int fsReadPagedata(
FileSystem *pFS,
Segment *pSeg,
Page *pPg,
int *pnSpace
){
lsm_compress *p = pFS->pCompress;
i64 iOff = pPg->iPg;
u8 aSz[3];
int rc;
assert( p && pPg->nCompress==0 );
if( fsAllocateBuffer(pFS, 0) ) return LSM_NOMEM;
/* Read the 3-byte size field at the start of the record. */
rc = fsReadData(pFS, pSeg, iOff, aSz, sizeof(aSz));
if( rc==LSM_OK ){
int bFree;
if( aSz[0] & 0x80 ){
pPg->nCompress = (int)getRecordSize(aSz, &bFree);
}else{
/* Top bit of the first byte clear: the byte itself is the total size
** of a free region, including the space of two 3-byte size fields. */
pPg->nCompress = (int)aSz[0] - sizeof(aSz)*2;
bFree = 1;
}
if( bFree ){
if( pnSpace ){
*pnSpace = pPg->nCompress + sizeof(aSz)*2;
}else{
rc = LSM_CORRUPT_BKPT;
}
}else{
/* Step over the size field to the compressed payload. */
rc = fsAddOffset(pFS, pSeg, iOff, 3, &iOff);
if( rc==LSM_OK ){
/* A payload larger than the scratch buffer cannot be valid. */
if( pPg->nCompress>pFS->nBuffer ){
rc = LSM_CORRUPT_BKPT;
}else{
rc = fsReadData(pFS, pSeg, iOff, pFS->aIBuffer, pPg->nCompress);
}
if( rc==LSM_OK ){
int n = pFS->nPagesize;
rc = p->xUncompress(p->pCtx,
(char *)pPg->aData, &n,
(const char *)pFS->aIBuffer, pPg->nCompress
);
/* The uncompressed data must be exactly one page in size. */
if( rc==LSM_OK && n!=pPg->pFS->nPagesize ){
rc = LSM_CORRUPT_BKPT;
}
}
}
}
}
return rc;
}
/*
** Obtain a reference to page iPg, translated through the redirect object
** of pSeg (if pSeg is not NULL). On success *ppPg is set to the page and
** its reference count is incremented.
**
** If noContent is true the page content is not read from the file when a
** heap buffer is used. For compressed databases, if the record at iPg is
** free space, *ppPg is set to NULL, LSM_OK is returned and, if pnSpace
** is not NULL, *pnSpace receives the size of the free region.
**
** On error *ppPg is NULL and an LSM error code is returned.
*/
static int fsPageGet(
FileSystem *pFS,
Segment *pSeg,
LsmPgno iPg,
int noContent,
Page **ppPg,
int *pnSpace
){
Page *p;
int iHash;
int rc = LSM_OK;
LsmPgno iReal = lsmFsRedirectPage(pFS, (pSeg ? pSeg->pRedirect : 0), iPg);
assert_lists_are_ok(pFS);
assert( iPg>=fsFirstPageOnBlock(pFS, 1) );
assert( iReal>=fsFirstPageOnBlock(pFS, 1) );
*ppPg = 0;
/* Look for the page in the cache first. */
p = fsPageFindInHash(pFS, iReal, &iHash);
if( p ){
assert( p->flags & PAGE_FREE );
/* An unreferenced cached page sits on the LRU list; take it off. */
if( p->nRef==0 ) fsPageRemoveFromLru(pFS, p);
}else{
if( fsMmapPage(pFS, iReal) ){
/* The page lies within the mmap limit. Make sure the mapping covers
** it, then point a Page structure directly into the mapping. */
i64 iEnd = (i64)iReal * pFS->nPagesize;
fsGrowMapping(pFS, iEnd, &rc);
if( rc!=LSM_OK ) return rc;
if( pFS->pFree ){
p = pFS->pFree;
pFS->pFree = p->pFreeNext;
assert( p->nRef==0 );
}else{
p = lsmMallocZeroRc(pFS->pEnv, sizeof(Page), &rc);
if( rc ) return rc;
p->pFS = pFS;
}
p->aData = &((u8 *)pFS->pMap)[pFS->nPagesize * (iReal-1)];
p->iPg = iReal;
/* Link the page into the pMapped list so that fsGrowMapping() can fix
** up aData if the mapping moves. */
assert( p->pMappedNext==0 );
p->pMappedNext = pFS->pMapped;
pFS->pMapped = p;
assert( pFS->pCompress==0 );
assert( (p->flags & PAGE_FREE)==0 );
}else{
/* Use a heap buffer, newly allocated or recycled from the LRU list. */
rc = fsPageBuffer(pFS, &p);
if( rc==LSM_OK ){
int nSpace = 0;
p->iPg = iReal;
p->nRef = 0;
p->pFS = pFS;
assert( p->flags==0 || p->flags==PAGE_FREE );
#ifdef LSM_DEBUG
memset(p->aData, 0x56, pFS->nPagesize);
#endif
assert( p->pLruNext==0 && p->pLruPrev==0 );
if( noContent==0 ){
if( pFS->pCompress ){
rc = fsReadPagedata(pFS, pSeg, p, &nSpace);
}else{
int nByte = pFS->nPagesize;
i64 iOff = (i64)(iReal-1) * pFS->nPagesize;
rc = lsmEnvRead(pFS->pEnv, pFS->fdDb, iOff, p->aData, nByte);
}
pFS->nRead++;
}
/* If the read succeeded (or was not attempted) and a real page was
** found, add the page to the hash table. Otherwise free the buffer
** and report any free space found. */
if( rc==LSM_OK && nSpace==0 ){
p->pHashNext = pFS->apHash[iHash];
pFS->apHash[iHash] = p;
}else{
fsPageBufferFree(p);
p = 0;
if( pnSpace ) *pnSpace = nSpace;
}
}
}
assert( (rc==LSM_OK && (p || (pnSpace && *pnSpace)))
|| (rc!=LSM_OK && p==0)
);
}
if( rc==LSM_OK && p ){
/* In an uncompressed database the first and last page of each block
** expose 4 bytes fewer. For a first page that is not yet referenced,
** aData is advanced past the 4-byte prefix and PAGE_HASPREV is set. */
if( pFS->pCompress==0 && (fsIsLast(pFS, iReal) || fsIsFirst(pFS, iReal)) ){
p->nData = pFS->nPagesize - 4;
if( fsIsFirst(pFS, iReal) && p->nRef==0 ){
p->aData += 4;
p->flags |= PAGE_HASPREV;
}
}else{
p->nData = pFS->nPagesize;
}
/* nOut counts pages with at least one reference. */
pFS->nOut += (p->nRef==0);
p->nRef++;
}
*ppPg = p;
return rc;
}
/*
** Read the 64-bit value stored at the very start of meta page iMeta
** (1 or 2) and write it to *piVal. If memory mapping is enabled the
** value is read from the mapping, otherwise via lsmFsMetaPageGet().
*/
int lsmFsReadSyncedId(lsm_db *db, int iMeta, i64 *piVal){
  FileSystem *pFS = db->pFS;
  int rc = LSM_OK;

  assert( iMeta==1 || iMeta==2 );

  if( pFS->nMapLimit<=0 ){
    MetaPage *pMeta = 0;
    rc = lsmFsMetaPageGet(pFS, 0, iMeta, &pMeta);
    if( rc==LSM_OK ){
      *piVal = (i64)lsmGetU64(pMeta->aData);
      lsmFsMetaPageRelease(pMeta);
    }
    return rc;
  }

  fsGrowMapping(pFS, iMeta*LSM_META_PAGE_SIZE, &rc);
  if( rc==LSM_OK ){
    u8 *aMap = (u8 *)pFS->pMap;
    *piVal = (i64)lsmGetU64(&aMap[(iMeta-1)*LSM_META_PAGE_SIZE]);
  }
  return rc;
}
/*
** Return true if segment pRun is not pIgnore and either its first or
** its last page lies in the range iFirst..iLast inclusive.
*/
static int fsRunEndsBetween(
  Segment *pRun,
  Segment *pIgnore,
  LsmPgno iFirst,
  LsmPgno iLast
){
  if( pRun==pIgnore ) return 0;
  if( pRun->iFirst>=iFirst && pRun->iFirst<=iLast ) return 1;
  return (pRun->iLastPg>=iFirst && pRun->iLastPg<=iLast);
}
/*
** Return true if any segment of level pLevel (the left-hand segment or
** one of the nRight right-hand segments), other than pIgnore, starts or
** ends within the page range iFirst..iLast.
*/
static int fsLevelEndsBetween(
  Level *pLevel,
  Segment *pIgnore,
  LsmPgno iFirst,
  LsmPgno iLast
){
  int bHit = fsRunEndsBetween(&pLevel->lhs, pIgnore, iFirst, iLast);
  int i = 0;

  while( !bHit && i<pLevel->nRight ){
    bHit = fsRunEndsBetween(&pLevel->aRhs[i], pIgnore, iFirst, iLast);
    i++;
  }
  return bHit ? 1 : 0;
}
/*
** Release block iBlk back to the free list, unless a segment in
** pSnapshot other than pIgnore starts or ends on it. In that case the
** block is still in use and this is a no-op returning LSM_OK.
**
** Before the block is freed, any entries of the snapshot's append-point
** list that refer to pages on it are removed. The remaining entries are
** compacted towards the start of the list and the tail is zeroed.
*/
static int fsFreeBlock(
  FileSystem *pFS,
  Snapshot *pSnapshot,
  Segment *pIgnore,
  int iBlk
){
  LsmPgno *aApp = pSnapshot->aiAppend;
  LsmPgno iFirst = fsFirstPageOnBlock(pFS, iBlk);
  LsmPgno iLast = fsLastPageOnBlock(pFS, iBlk);
  Level *pLevel;
  int iIn;
  int iOut = 0;

  pLevel = lsmDbSnapshotLevel(pSnapshot);
  while( pLevel ){
    if( fsLevelEndsBetween(pLevel, pIgnore, iFirst, iLast) ){
      return LSM_OK;
    }
    pLevel = pLevel->pNext;
  }

  for(iIn=0; iIn<LSM_APPLIST_SZ; iIn++){
    LsmPgno iApp = aApp[iIn];
    if( iApp<iFirst || iApp>iLast ){
      aApp[iOut++] = iApp;
    }
  }
  for(; iOut<LSM_APPLIST_SZ; iOut++){
    aApp[iOut] = 0;
  }

  return lsmBlockFree(pFS->pDb, iBlk);
}
/*
** Free the blocks occupied by segment pDel. The block chain is followed
** from the block holding the first page to the one holding the last.
** The final block is skipped if bZero is false and the segment does not
** end on the block's last page.
**
** Afterwards, if the segment has a redirect object (which must be the
** snapshot's), the redirect is emptied. If bZero is true the Segment
** structure is zeroed. This function always returns LSM_OK.
**
** NOTE(review): the return code of fsBlockNext() is overwritten by the
** following fsFreeBlock() call, and the loop's final rc is not returned.
*/
int lsmFsSortedDelete(
  FileSystem *pFS,
  Snapshot *pSnapshot,
  int bZero,
  Segment *pDel
){
  int rc = LSM_OK;
  int iBlk;
  int iLastBlk;

  if( pDel->iFirst==0 ) return LSM_OK;

  iBlk = fsPageToBlock(pFS, pDel->iFirst);
  iLastBlk = fsPageToBlock(pFS, pDel->iLastPg);

  while( iBlk && rc==LSM_OK ){
    int iNext = 0;
    if( iBlk==iLastBlk ){
      if( bZero==0 && pDel->iLastPg!=fsLastPageOnBlock(pFS, iLastBlk) ){
        break;
      }
    }else{
      rc = fsBlockNext(pFS, pDel, iBlk, &iNext);
    }
    rc = fsFreeBlock(pFS, pSnapshot, pDel, iBlk);
    iBlk = iNext;
  }

  if( pDel->pRedirect ){
    assert( pDel->pRedirect==&pSnapshot->redirect );
    pSnapshot->redirect.n = 0;
  }

  if( bZero ) memset(pDel, 0, sizeof(Segment));
  return LSM_OK;
}
/*
** Return the smallest page number in array aPgno[0..nPgno-1] that lies
** on block iBlk, or 0 if no entry of the array is on that block.
*/
static LsmPgno firstOnBlock(
  FileSystem *pFS,
  int iBlk,
  LsmPgno *aPgno,
  int nPgno
){
  LsmPgno iBest = 0;
  int i = 0;

  while( i<nPgno ){
    LsmPgno iCand = aPgno[i++];
    if( fsPageToBlock(pFS, iCand)!=iBlk ) continue;
    if( iBest==0 || iCand<iBest ) iBest = iCand;
  }
  return iBest;
}
#ifndef NDEBUG
/*
** Debug helper. Return true if iPg is non-zero and is mapped to a
** different page number by the redirect object of segment p.
*/
static int fsPageRedirects(FileSystem *pFS, Segment *p, LsmPgno iPg){
  if( iPg==0 ) return 0;
  return (iPg!=lsmFsRedirectPage(pFS, p->pRedirect, iPg));
}
/*
** Debug helper. Return true if p is not NULL and its first page, root
** page or last page is affected by the segment's redirect object.
*/
static int fsSegmentRedirects(FileSystem *pFS, Segment *p){
  if( p==0 ) return 0;
  if( fsPageRedirects(pFS, p, p->iFirst) ) return 1;
  if( fsPageRedirects(pFS, p, p->iRoot) ) return 1;
  return fsPageRedirects(pFS, p, p->iLastPg);
}
#endif
/*
** Advance the start of segment pRun. Beginning with the block that holds
** the current first page, each block is examined in chain order. If one
** or more of the pages in aPgno[0..nPgno-1] lie on the block, the
** smallest of them becomes the new pRun->iFirst and the scan stops.
** Otherwise the block is released via fsFreeBlock() and the scan moves
** to the next block. pRun->nSize is adjusted along the way.
**
** The segment must be non-empty and unaffected by redirects. Errors
** from the I/O layer end the scan but are not reported to the caller.
*/
void lsmFsGobble(
lsm_db *pDb,
Segment *pRun,
LsmPgno *aPgno,
int nPgno
){
int rc = LSM_OK;
FileSystem *pFS = pDb->pFS;
Snapshot *pSnapshot = pDb->pWorker;
int iBlk;
assert( pRun->nSize>0 );
assert( 0==fsSegmentRedirects(pFS, pRun) );
assert( nPgno>0 && 0==fsPageRedirects(pFS, pRun, aPgno[0]) );
iBlk = fsPageToBlock(pFS, pRun->iFirst);
/* Temporarily count the segment as starting at the top of its first
** block, so that whole blocks can be subtracted in the loop below. */
pRun->nSize += (pRun->iFirst - fsFirstPageOnBlock(pFS, iBlk));
while( rc==LSM_OK ){
int iNext = 0;
LsmPgno iFirst = firstOnBlock(pFS, iBlk, aPgno, nPgno);
if( iFirst ){
pRun->iFirst = iFirst;
break;
}
/* The next-block pointer must be read before the block is freed. */
rc = fsBlockNext(pFS, pRun, iBlk, &iNext);
if( rc==LSM_OK ) rc = fsFreeBlock(pFS, pSnapshot, pRun, iBlk);
pRun->nSize -= (
1 + fsLastPageOnBlock(pFS, iBlk) - fsFirstPageOnBlock(pFS, iBlk)
);
iBlk = iNext;
}
/* Remove the pages between the top of the final block and the new
** first page from the size. */
pRun->nSize -= (pRun->iFirst - fsFirstPageOnBlock(pFS, iBlk));
assert( pRun->nSize>0 );
}
/*
** In a compressed database, compute the offset of the record that
** follows the nByte-byte record starting at offset iPg and write it to
** *piNext. If pSeg is not NULL and the record ends exactly on the
** segment's last byte (pSeg->iLastPg), *piNext is set to 0 instead.
*/
static int fsNextPageOffset(
  FileSystem *pFS,
  Segment *pSeg,
  LsmPgno iPg,
  int nByte,
  LsmPgno *piNext
){
  LsmPgno iEnd;                   /* Last byte of record, then next offset */
  int rc;

  assert( pFS->pCompress );

  rc = fsAddOffset(pFS, pSeg, iPg, nByte-1, &iEnd);
  if( pSeg && iEnd==pSeg->iLastPg ){
    iEnd = 0;
  }else if( rc==LSM_OK ){
    rc = fsAddOffset(pFS, pSeg, iEnd, 1, &iEnd);
  }

  *piNext = iEnd;
  return rc;
}
/*
** In a compressed database, find the offset of the record immediately
** before the one at offset iPg and write it to *piPrev.
**
** The 3 bytes preceding iPg are read. If the top bit of the last of
** them is set they form a record-size field, and the preceding record
** occupies that many bytes plus two 3-byte size fields. Otherwise the
** low 7 bits of that byte give the total size of the preceding region.
*/
static int fsGetPageBefore(
  FileSystem *pFS,
  Segment *pSeg,
  LsmPgno iPg,
  LsmPgno *piPrev
){
  u8 aSz[3];
  i64 iRead;
  int nSz;
  int bFree;
  int rc;

  assert( pFS->pCompress );

  rc = fsSubtractOffset(pFS, pSeg, iPg, sizeof(aSz), &iRead);
  if( rc==LSM_OK ) rc = fsReadData(pFS, pSeg, iRead, aSz, sizeof(aSz));
  if( rc!=LSM_OK ) return rc;

  if( (aSz[2] & 0x80)==0 ){
    nSz = (int)(aSz[2] & 0x7F);
    bFree = 1;
  }else{
    nSz = getRecordSize(aSz, &bFree) + sizeof(aSz)*2;
  }
  (void)bFree;

  return fsSubtractOffset(pFS, pSeg, iPg, nSz, piPrev);
}
/*
** Load the page that follows (eDir>0) or precedes (eDir<0) page pPg
** within segment pRun and store a reference to it in *ppNext. If pPg is
** already the last (or first) page of the segment, *ppNext is set to
** NULL and LSM_OK is returned.
**
** In the uncompressed case pRun may be NULL, in which case no segment
** boundary checks are made. The compressed case dereferences pRun when
** moving backwards.
*/
int lsmFsDbPageNext(Segment *pRun, Page *pPg, int eDir, Page **ppNext){
int rc = LSM_OK;
FileSystem *pFS = pPg->pFS;
LsmPgno iPg = pPg->iPg;
assert( 0==fsSegmentRedirects(pFS, pRun) );
if( pFS->pCompress ){
/* Space occupied by the current record: the compressed payload plus
** two 3-byte size fields. */
int nSpace = pPg->nCompress + 2*3;
/* Keep stepping for as long as fsPageGet() reports free space rather
** than a page (nSpace>0). */
do {
if( eDir>0 ){
rc = fsNextPageOffset(pFS, pRun, iPg, nSpace, &iPg);
}else{
if( iPg==pRun->iFirst ){
iPg = 0;
}else{
rc = fsGetPageBefore(pFS, pRun, iPg, &iPg);
}
}
nSpace = 0;
if( iPg!=0 ){
rc = fsPageGet(pFS, pRun, iPg, 0, ppNext, &nSpace);
assert( (*ppNext==0)==(rc!=LSM_OK || nSpace>0) );
}else{
*ppNext = 0;
}
}while( nSpace>0 && rc==LSM_OK );
}else{
Redirect *pRedir = pRun ? pRun->pRedirect : 0;
assert( eDir==1 || eDir==-1 );
if( eDir<0 ){
if( pRun && iPg==pRun->iFirst ){
*ppNext = 0;
return LSM_OK;
}else if( fsIsFirst(pFS, iPg) ){
/* First page of a block: the 4 bytes before aData hold the number of
** the previous block. Continue from that block's last page. */
assert( pPg->flags & PAGE_HASPREV );
iPg = fsLastPageOnBlock(pFS, lsmGetU32(&pPg->aData[-4]));
}else{
iPg--;
}
}else{
if( pRun ){
if( iPg==pRun->iLastPg ){
*ppNext = 0;
return LSM_OK;
}
}
if( fsIsLast(pFS, iPg) ){
/* Last page of a block: the final 4 bytes of the page hold the number
** of the next block, which is subject to redirection. */
int iBlk = fsRedirectBlock(
pRedir, lsmGetU32(&pPg->aData[pFS->nPagesize-4])
);
iPg = fsFirstPageOnBlock(pFS, iBlk);
}else{
iPg++;
}
}
rc = fsPageGet(pFS, pRun, iPg, 0, ppNext, 0);
}
return rc;
}
/*
** Search the worker snapshot's append-point list, from the last slot to
** the first, for a usable append point. Zero entries are skipped. If
** pLvl is not NULL, an entry is rejected when it lies on the same block
** as the last page of any right-hand segment of pLvl.
**
** The first acceptable entry is cleared from the list and returned. If
** there is none, 0 is returned.
*/
static LsmPgno findAppendPoint(FileSystem *pFS, Level *pLvl){
  LsmPgno *aiAppend = pFS->pDb->pWorker->aiAppend;
  LsmPgno iRet = 0;
  int i;

  for(i=LSM_APPLIST_SZ-1; iRet==0 && i>=0; i--){
    iRet = aiAppend[i];
    if( iRet==0 ) continue;

    if( pLvl ){
      int iBlk = fsPageToBlock(pFS, iRet);
      int j;
      for(j=0; iRet && j<pLvl->nRight; j++){
        if( fsPageToBlock(pFS, pLvl->aRhs[j].iLastPg)==iBlk ){
          iRet = 0;
        }
      }
    }

    if( iRet ) aiAppend[i] = 0;
  }
  return iRet;
}
/*
** Obtain a new, writable page to append to the left-hand segment of
** level pLvl. On success *ppOut holds a reference to the page, flagged
** PAGE_DIRTY. On error *ppOut is NULL.
**
** For compressed databases, or if bDefer is true, the page is an
** unnumbered heap buffer (iPg==0) and the segment is not modified here.
** Otherwise a page number is assigned immediately, a new block is
** allocated if required, and the segment's size and first/last page
** numbers are updated.
*/
int lsmFsSortedAppend(
FileSystem *pFS,
Snapshot *pSnapshot,
Level *pLvl,
int bDefer,
Page **ppOut
){
int rc = LSM_OK;
Page *pPg = 0;
LsmPgno iApp = 0;
LsmPgno iNext = 0;
Segment *p = &pLvl->lhs;
LsmPgno iPrev = p->iLastPg;
*ppOut = 0;
assert( p->pRedirect==0 );
if( pFS->pCompress || bDefer ){
/* Hand out a buffer only. For uncompressed databases 4 bytes are
** held back from the usable size. */
rc = fsPageBuffer(pFS, &pPg);
if( rc==LSM_OK ){
pPg->pFS = pFS;
pPg->pSeg = p;
pPg->iPg = 0;
pPg->flags |= PAGE_DIRTY;
pPg->nData = pFS->nPagesize;
assert( pPg->aData );
if( pFS->pCompress==0 ) pPg->nData -= 4;
pPg->nRef = 1;
pFS->nOut++;
}
}else{
/* Choose the page number to append at: a recorded append point for
** an empty segment, the first page of the next block in the chain if
** the segment ends on a block boundary, or else the following page. */
if( iPrev==0 ){
iApp = findAppendPoint(pFS, pLvl);
}else if( fsIsLast(pFS, iPrev) ){
int iNext2;
rc = fsBlockNext(pFS, 0, fsPageToBlock(pFS, iPrev), &iNext2);
if( rc!=LSM_OK ) return rc;
iApp = fsFirstPageOnBlock(pFS, iNext2);
}else{
iApp = iPrev + 1;
}
/* A new block is needed either to hold this page (no append point
** was found) or, if this page is the last on its block, to be linked
** to as the next block in the chain. */
if( iApp==0 || fsIsLast(pFS, iApp) ){
int iNew;
rc = lsmBlockAllocate(pFS->pDb, 0, &iNew);
if( rc!=LSM_OK ) return rc;
if( iApp==0 ){
iApp = fsFirstPageOnBlock(pFS, iNew);
}else{
iNext = fsFirstPageOnBlock(pFS, iNew);
}
}
/* Fetch the page without reading its content from the file. */
pPg = 0;
rc = fsPageGet(pFS, 0, iApp, 1, &pPg, 0);
assert( rc==LSM_OK || pPg==0 );
if( rc==LSM_OK ){
p->nSize++;
p->iLastPg = iApp;
if( p->iFirst==0 ) p->iFirst = iApp;
pPg->flags |= PAGE_DIRTY;
/* Write the block pointer: the next block number into the final 4
** bytes of a last page, or the previous block number into the 4 bytes
** preceding aData of a first page. */
if( fsIsLast(pFS, iApp) ){
lsmPutU32(&pPg->aData[pFS->nPagesize-4], fsPageToBlock(pFS, iNext));
}else if( fsIsFirst(pFS, iApp) ){
lsmPutU32(&pPg->aData[-4], fsPageToBlock(pFS, iPrev));
}
}
}
*ppOut = pPg;
return rc;
}
/*
** Called once segment p has been fully written. A no-op if p is NULL or
** empty.
**
** If the segment does not end on the last page of a block, the page
** following its last page is stored in the first free slot of the
** worker snapshot's append-point list (if there is one). Otherwise the
** segment's final page links to a further block; the number of that
** block is read and passed to lsmBlockRefree().
*/
int lsmFsSortedFinish(FileSystem *pFS, Segment *p){
  int rc = LSM_OK;

  if( p==0 || p->iLastPg==0 ) return rc;
  assert( p->pRedirect==0 );

  if( fsLastPageOnPagesBlock(pFS, p->iLastPg)!=p->iLastPg ){
    LsmPgno *aiAppend = pFS->pDb->pWorker->aiAppend;
    int i = 0;
    while( i<LSM_APPLIST_SZ && aiAppend[i]!=0 ) i++;
    if( i<LSM_APPLIST_SZ ){
      aiAppend[i] = p->iLastPg+1;
    }
  }else if( pFS->pCompress ){
    int iBlk = 0;
    rc = fsBlockNext(pFS, p, fsPageToBlock(pFS, p->iLastPg), &iBlk);
    if( rc==LSM_OK ){
      lsmBlockRefree(pFS->pDb, iBlk);
    }
  }else{
    Page *pLast;
    rc = fsPageGet(pFS, 0, p->iLastPg, 0, &pLast, 0);
    if( rc==LSM_OK ){
      int iBlk = (int)lsmGetU32(&pLast->aData[pFS->nPagesize-4]);
      lsmBlockRefree(pFS->pDb, iBlk);
      lsmFsPageRelease(pLast);
    }
  }
  return rc;
}
/*
** Obtain a reference to page iPg of segment pSeg, reading its content
** from the file if necessary. The page is returned via *ppPg.
*/
int lsmFsDbPageGet(FileSystem *pFS, Segment *pSeg, LsmPgno iPg, Page **ppPg){
  int rc = fsPageGet(pFS, pSeg, iPg, 0, ppPg, 0);
  return rc;
}
/*
** Obtain a reference to the last page of segment pSeg and return it via
** *ppPg.
**
** In a compressed database pSeg->iLastPg is the offset of the final byte
** of the segment, so the last page is found by stepping backwards from
** the following byte, skipping over any free-space records.
*/
int lsmFsDbPageLast(FileSystem *pFS, Segment *pSeg, Page **ppPg){
  LsmPgno iPg = pSeg->iLastPg;
  int rc;

  if( pFS->pCompress==0 ){
    return fsPageGet(pFS, pSeg, iPg, 0, ppPg, 0);
  }

  iPg++;
  for(;;){
    int nSpace = 0;
    rc = fsGetPageBefore(pFS, pSeg, iPg, &iPg);
    if( rc==LSM_OK ){
      rc = fsPageGet(pFS, pSeg, iPg, 0, ppPg, &nSpace);
    }
    if( rc!=LSM_OK || nSpace<=0 ) break;
  }
  return rc;
}
/*
** Obtain a handle on meta page iPg (1 or 2) and return it via *ppPg.
**
** If memory mapping is enabled (nMapLimit>0) the handle points directly
** into the mapping. Otherwise a heap buffer of nMetasize bytes is
** allocated. If bWrite is false, nMetaRwSize bytes are read into it from
** the file. If bWrite is true the buffer is left for the caller to fill
** (debug builds fill it with 0x77).
**
** On error *ppPg is set to NULL and an LSM error code is returned. The
** handle must be released with lsmFsMetaPageRelease().
*/
int lsmFsMetaPageGet(
  FileSystem *pFS,
  int bWrite,
  int iPg,
  MetaPage **ppPg
){
  int rc = LSM_OK;
  MetaPage *pPg;
  assert( iPg==1 || iPg==2 );

  /* Allocate a MetaPage. This previously requested sizeof(Page) bytes,
  ** the size of a different and larger structure. */
  pPg = lsmMallocZeroRc(pFS->pEnv, sizeof(*pPg), &rc);

  if( pPg ){
    i64 iOff = (iPg-1) * pFS->nMetasize;
    if( pFS->nMapLimit>0 ){
      fsGrowMapping(pFS, 2*pFS->nMetasize, &rc);
      /* Only form a pointer into the mapping once it is known to exist.
      ** If the remap failed pFS->pMap may be NULL. */
      if( rc==LSM_OK ){
        pPg->aData = (u8 *)(pFS->pMap) + iOff;
      }
    }else{
      pPg->aData = lsmMallocRc(pFS->pEnv, pFS->nMetasize, &rc);
      if( rc==LSM_OK && bWrite==0 ){
        rc = lsmEnvRead(
            pFS->pEnv, pFS->fdDb, iOff, pPg->aData, pFS->nMetaRwSize
        );
      }
#ifndef NDEBUG
      /* pPg->aData causes an uninitialized access via a downstream write.
      ** Fill it with a recognisable pattern in debug builds. */
      else if( rc==LSM_OK ){
        memset( pPg->aData, 0x77, pFS->nMetasize );
      }
#endif
    }

    if( rc!=LSM_OK ){
      if( pFS->nMapLimit==0 ) lsmFree(pFS->pEnv, pPg->aData);
      lsmFree(pFS->pEnv, pPg);
      pPg = 0;
    }else{
      pPg->iPg = iPg;
      pPg->bWrite = bWrite;
      pPg->pFS = pFS;
    }
  }

  *ppPg = pPg;
  return rc;
}
/*
** Release a meta-page handle. A NULL handle is a no-op returning LSM_OK.
**
** When memory mapping is disabled (nMapLimit==0) the handle owns a heap
** buffer. If the handle was opened for writing, nMetaRwSize bytes of the
** buffer are first written to the file at the page's offset. The buffer
** is then freed. The return value is that of the write, if any.
*/
int lsmFsMetaPageRelease(MetaPage *pPg){
  int rc = LSM_OK;
  FileSystem *pFS;

  if( pPg==0 ) return rc;
  pFS = pPg->pFS;

  if( pFS->nMapLimit==0 ){
    if( pPg->bWrite ){
      i64 iOff = 0;
      int nWrite = pFS->nMetaRwSize;
      if( pPg->iPg==2 ) iOff = pFS->nMetasize;
      rc = lsmEnvWrite(pFS->pEnv, pFS->fdDb, iOff, pPg->aData, nWrite);
    }
    lsmFree(pFS->pEnv, pPg->aData);
  }
  lsmFree(pFS->pEnv, pPg);
  return rc;
}
/*
** Return a pointer to the data of meta page pPg. If pnData is not NULL,
** *pnData is set to the number of bytes read and written for a meta
** page (nMetaRwSize).
*/
u8 *lsmFsMetaPageData(MetaPage *pPg, int *pnData){
  if( pnData!=0 ){
    *pnData = pPg->pFS->nMetaRwSize;
  }
  return pPg->aData;
}
#ifndef NDEBUG
/*
** Debug helper. Return 1 if page pPg is flagged PAGE_DIRTY, else 0.
*/
int lsmFsPageWritable(Page *pPg){
  return (pPg->flags & PAGE_DIRTY)!=0;
}
#endif
/*
** If page number *piPg lies on block iFrom, rewrite it to refer to the
** same position on block iTo. Otherwise leave it unchanged.
*/
static void fsMovePage(
  FileSystem *pFS,
  int iTo,
  int iFrom,
  LsmPgno *piPg
){
  LsmPgno iPg = *piPg;
  int nPagePerBlock;

  if( fsPageToBlock(pFS, iPg)!=iFrom ) return;

  /* In compressed databases "pages" are byte offsets. */
  if( pFS->pCompress ){
    nPagePerBlock = pFS->nBlocksize;
  }else{
    nPagePerBlock = (pFS->nBlocksize / pFS->nPagesize);
  }
  *piPg = iPg - (LsmPgno)(iFrom - iTo) * nPagePerBlock;
}
/*
** Copy the content of block iFrom to block iTo, one page at a time, and
** then purge the page cache. iTo must be smaller than iFrom and must not
** be block 1.
**
** Afterwards the worker snapshot's append-point list and the first, last
** and root page numbers of pSeg are rewritten to refer to block iTo
** wherever they referred to block iFrom. These updates are made even if
** the copy failed; the return code reports the failure.
*/
int lsmFsMoveBlock(FileSystem *pFS, Segment *pSeg, int iTo, int iFrom){
Snapshot *p = pFS->pDb->pWorker;
int rc = LSM_OK;
int i;
i64 nMap;
i64 iFromOff = (i64)(iFrom-1) * pFS->nBlocksize;
i64 iToOff = (i64)(iTo-1) * pFS->nBlocksize;
assert( iTo!=1 );
assert( iFrom>iTo );
/* Make the mapping cover block iFrom, as far as nMapLimit allows. */
nMap = LSM_MIN(pFS->nMapLimit, (i64)iFrom * pFS->nBlocksize);
fsGrowMapping(pFS, nMap, &rc);
if( rc==LSM_OK ){
const int nPagePerBlock = (pFS->nBlocksize / pFS->nPagesize);
int nSz = pFS->nPagesize;
u8 *aBuf = 0;
u8 *aData = 0;
for(i=0; rc==LSM_OK && i<nPagePerBlock; i++){
i64 iOff = iFromOff + i*nSz;
/* Source page: use the mapping if the page lies entirely within the
** mmap limit, otherwise read it into a lazily allocated buffer. */
if( (iOff+nSz)<=pFS->nMapLimit ){
u8 *aMap = (u8 *)(pFS->pMap);
aData = &aMap[iOff];
}else{
if( aBuf==0 ){
aBuf = (u8 *)lsmMallocRc(pFS->pEnv, nSz, &rc);
if( aBuf==0 ) break;
}
aData = aBuf;
rc = lsmEnvRead(pFS->pEnv, pFS->fdDb, iOff, aData, nSz);
}
/* Destination page: copy into the mapping if it lies entirely within
** the mmap limit, otherwise write it to the file. */
if( rc==LSM_OK ){
iOff = iToOff + i*nSz;
if( (iOff+nSz)<=pFS->nMapLimit ){
u8 *aMap = (u8 *)(pFS->pMap);
memcpy(&aMap[iOff], aData, nSz);
}else{
rc = lsmEnvWrite(pFS->pEnv, pFS->fdDb, iOff, aData, nSz);
}
}
}
lsmFree(pFS->pEnv, aBuf);
/* Cached pages may now be stale; discard them. */
lsmFsPurgeCache(pFS);
}
for(i=0; i<LSM_APPLIST_SZ; i++){
fsMovePage(pFS, iTo, iFrom, &p->aiAppend[i]);
}
fsMovePage(pFS, iTo, iFrom, &pSeg->iFirst);
fsMovePage(pFS, iTo, iFrom, &pSeg->iLastPg);
fsMovePage(pFS, iTo, iFrom, &pSeg->iRoot);
return rc;
}
/*
** Append nData bytes from aData[] to segment pSeg. This is only used when a
** compression hook is configured (see the assert). In this function page
** numbers are passed directly to lsmEnvWrite() as file offsets.
**
** If *pRc is not LSM_OK on entry this function does nothing and returns 0.
** Otherwise the return value is the page number at which the data starts
** (normally the position following pSeg->iLastPg), pSeg->iLastPg is advanced
** to the last position written, and any error is stored in *pRc.
**
** When the data does not fit on the current block, the remainder is written
** to the following block, which is either newly allocated and linked in with
** 4-byte next/prev block pointers, or looked up with fsBlockNext().
*/
static LsmPgno fsAppendData(
FileSystem *pFS,
Segment *pSeg,
const u8 *aData,
int nData,
int *pRc
){
LsmPgno iRet = 0;
int rc = *pRc;
assert( pFS->pCompress );
if( rc==LSM_OK ){
int nRem = 0;
int nWrite = 0;
LsmPgno iLastOnBlock;
LsmPgno iApp = pSeg->iLastPg+1;
/* iApp==1 means pSeg->iLastPg was 0: nothing has been written to this
** segment yet. Use an existing append-point if findAppendPoint() returns
** one, otherwise allocate a fresh block and start at its first page. */
if( iApp==1 ){
pSeg->iFirst = iApp = findAppendPoint(pFS, 0);
if( iApp==0 ){
int iBlk;
rc = lsmBlockAllocate(pFS->pDb, 0, &iBlk);
/* NOTE(review): iBlk is used below even if the allocation failed;
** presumably lsmBlockAllocate() always sets it - confirm. */
pSeg->iFirst = iApp = fsFirstPageOnBlock(pFS, iBlk);
}
}
iRet = iApp;
/* Write as much as fits between iApp and the last usable position on the
** current block. nRem is whatever is left over. */
iLastOnBlock = fsLastPageOnPagesBlock(pFS, iApp);
if( rc==LSM_OK ){
int nSpace = (int)(iLastOnBlock - iApp + 1);
nWrite = LSM_MIN(nData, nSpace);
nRem = nData - nWrite;
assert( nWrite>=0 );
if( nWrite!=0 ){
rc = lsmEnvWrite(pFS->pEnv, pFS->fdDb, iApp, aData, nWrite);
}
iApp += nWrite;
}
/* If the current block is now exactly full, move on to the next block. */
assert( nRem<=0 || (iApp-1)==iLastOnBlock );
if( rc==LSM_OK && (iApp-1)==iLastOnBlock ){
u8 aPtr[4];
int iBlk;
if( nWrite>0 ){
/* Something was written to the old block by this call: allocate a new
** block, store its number in the 4 bytes at iApp (immediately after the
** data just written), and store the old block number in the 4 bytes
** immediately before the first page of the new block. */
rc = lsmBlockAllocate(pFS->pDb, 0, &iBlk);
if( rc==LSM_OK ){
assert( iApp==(fsPageToBlock(pFS, iApp)*pFS->nBlocksize)-4 );
lsmPutU32(aPtr, iBlk);
rc = lsmEnvWrite(pFS->pEnv, pFS->fdDb, iApp, aPtr, sizeof(aPtr));
}
if( rc==LSM_OK ){
LsmPgno iWrite;
lsmPutU32(aPtr, fsPageToBlock(pFS, iApp));
iWrite = fsFirstPageOnBlock(pFS, iBlk);
rc = lsmEnvWrite(pFS->pEnv, pFS->fdDb, iWrite-4, aPtr, sizeof(aPtr));
if( nRem>0 ) iApp = iWrite;
}
}else{
/* Nothing fitted on the old block, so the whole record starts on the
** next block. That block is located with fsBlockNext() rather than
** allocated, and the return value is reset to its first page. */
assert( nRem>0 );
assert( pSeg->pRedirect==0 );
rc = fsBlockNext(pFS, 0, fsPageToBlock(pFS, iApp), &iBlk);
iRet = iApp = fsFirstPageOnBlock(pFS, iBlk);
}
/* Write whatever did not fit on the old block. */
if( rc==LSM_OK && nRem>0 ){
rc = lsmEnvWrite(pFS->pEnv, pFS->fdDb, iApp, &aData[nWrite], nRem);
iApp += nRem;
}
}
pSeg->iLastPg = iApp-1;
*pRc = rc;
}
return iRet;
}
/*
** Compress the nData bytes of page pPg into pFS->aOBuffer using the
** configured xCompress() hook. On entry to the hook pPg->nCompress holds
** the buffer capacity (pFS->nBuffer) and is passed by address for the hook
** to update.
**
** Returns LSM_NOMEM if fsAllocateBuffer() reports failure, otherwise the
** value returned by xCompress().
*/
static int fsCompressIntoBuffer(FileSystem *pFS, Page *pPg){
  lsm_compress *pHook = pFS->pCompress;
  int rc;

  if( fsAllocateBuffer(pFS, 1) ){
    return LSM_NOMEM;
  }
  assert( pPg->nData==pFS->nPagesize );

  pPg->nCompress = pFS->nBuffer;
  rc = pHook->xCompress(pHook->pCtx,
      (char *)pFS->aOBuffer, &pPg->nCompress,
      (const char *)pPg->aData, pPg->nData
  );
  return rc;
}
/*
** Assign the page number that follows the current last page of segment
** pSeg and store it in *piNew. On success pSeg->nSize is incremented and
** pSeg->iLastPg is set to the new page number.
**
** *piPrev and *piNext are always zeroed first. Then:
**
**   * If the current last page is the last on its block, the new page is
**     the first page of the block returned by fsBlockNext() and *piPrev is
**     set to the number of the block being left.
**
**   * Otherwise the new page is iLastPg+1. If that page is the last on its
**     block, a new block is allocated and its number stored in *piNext.
**
** Returns LSM_OK, or the error code from fsBlockNext()/lsmBlockAllocate().
*/
static int fsAppendPage(
  FileSystem *pFS,
  Segment *pSeg,
  LsmPgno *piNew,
  int *piPrev,
  int *piNext
){
  LsmPgno iCur = pSeg->iLastPg;
  int rc;

  assert( iCur!=0 );
  *piPrev = 0;
  *piNext = 0;

  if( !fsIsLast(pFS, iCur) ){
    *piNew = iCur+1;
    if( fsIsLast(pFS, *piNew) ){
      int iNewBlk;
      rc = lsmBlockAllocate(pFS->pDb, 0, &iNewBlk);
      if( rc!=LSM_OK ) return rc;
      *piNext = iNewBlk;
    }
  }else{
    int iFollowing;
    int iCurBlk = fsPageToBlock(pFS, iCur);
    assert( pSeg->pRedirect==0 );
    rc = fsBlockNext(pFS, 0, iCurBlk, &iFollowing);
    if( rc!=LSM_OK ) return rc;
    *piNew = fsFirstPageOnBlock(pFS, iFollowing);
    *piPrev = iCurBlk;
  }

  pSeg->nSize++;
  pSeg->iLastPg = *piNew;
  return LSM_OK;
}
/*
** Detach the pFS->pWaiting list and process every page on it: while no
** error has occurred the page is persisted with lsmFsPagePersist(), and in
** all cases the reference held by the list is dropped with
** lsmFsPageRelease().
**
** *pRc is an in/out error code. If it is not LSM_OK on entry no page is
** persisted, but every page is still released.
*/
void lsmFsFlushWaiting(FileSystem *pFS, int *pRc){
  int rc = *pRc;
  Page *pCur = pFS->pWaiting;
  Page *pFollowing;

  pFS->pWaiting = 0;
  for( ; pCur; pCur=pFollowing){
    pFollowing = pCur->pWaitingNext;
    if( rc==LSM_OK ){
      rc = lsmFsPagePersist(pCur);
    }
    assert( pCur->nRef==1 );
    lsmFsPageRelease(pCur);
  }

  *pRc = rc;
}
/*
** If the page hash table holds an entry for page number iPg, remove it,
** reset that page's number to 0 and re-insert it at the head of the hash
** bucket for page number 0. Does nothing if no such entry exists.
*/
static void fsRemoveHashEntry(FileSystem *pFS, LsmPgno iPg){
  int iBucket = fsHashKey(pFS->nHash, iPg);
  Page *pHit = pFS->apHash[iBucket];

  while( pHit && pHit->iPg!=iPg ){
    pHit = pHit->pHashNext;
  }
  if( pHit==0 ) return;

  assert( pHit->nRef==0 || (pHit->flags & PAGE_FREE)==0 );
  fsPageRemoveFromHash(pFS, pHit);
  pHit->iPg = 0;
  iBucket = fsHashKey(pFS->nHash, 0);
  pHit->pHashNext = pFS->apHash[iBucket];
  pFS->apHash[iBucket] = pHit;
}
/*
** Write page pPg out if it is dirty. This is a no-op returning LSM_OK when
** pPg is NULL or its PAGE_DIRTY flag is clear.
**
** There are three cases:
**
**   * Compressed database: the page is compressed and appended to its
**     segment as a record framed by a 3-byte size field on each side.
**
**   * Uncompressed, no page number yet (iPg==0): a page number is assigned
**     and the page is queued on pFS->pWaiting. It is not written here and
**     stays dirty.
**
**   * Uncompressed with a page number: the page is written to the file or
**     copied into the memory mapping, then the waiting list is flushed.
**
** Returns LSM_OK or an error code.
*/
int lsmFsPagePersist(Page *pPg){
int rc = LSM_OK;
if( pPg && (pPg->flags & PAGE_DIRTY) ){
FileSystem *pFS = pPg->pFS;
if( pFS->pCompress ){
int iHash;
u8 aSz[3];
assert( pPg->pSeg && pPg->iPg==0 && pPg->nCompress==0 );
/* Compress into pFS->aOBuffer; this also sets pPg->nCompress. */
rc = fsCompressIntoBuffer(pFS, pPg);
putRecordSize(aSz, pPg->nCompress, 0);
/* Append size, payload, size. The first call yields the page number.
** fsAppendData() does nothing once rc is not LSM_OK. */
pPg->iPg = fsAppendData(pFS, pPg->pSeg, aSz, sizeof(aSz), &rc);
fsAppendData(pFS, pPg->pSeg, pFS->aOBuffer, pPg->nCompress, &rc);
fsAppendData(pFS, pPg->pSeg, aSz, sizeof(aSz), &rc);
/* Insert the page into the hash table under its new number. Note that
** this and the accounting below run even if rc is not LSM_OK. */
iHash = fsHashKey(pFS->nHash, pPg->iPg);
pPg->pHashNext = pFS->apHash[iHash];
pFS->apHash[iHash] = pPg;
pPg->pSeg->nSize += (sizeof(aSz) * 2) + pPg->nCompress;
pPg->flags &= ~PAGE_DIRTY;
pFS->nWrite++;
}else{
if( pPg->iPg==0 ){
Page **pp;
int iPrev = 0;
int iNext = 0;
int iHash;
assert( pPg->pSeg->iFirst );
assert( pPg->flags & PAGE_FREE );
assert( (pPg->flags & PAGE_HASPREV)==0 );
assert( pPg->nData==pFS->nPagesize-4 );
/* Assign the next page number in the segment. */
rc = fsAppendPage(pFS, pPg->pSeg, &pPg->iPg, &iPrev, &iNext);
if( rc!=LSM_OK ) return rc;
assert( pPg->flags & PAGE_FREE );
/* Evict any existing hash entry for the new page number, then insert
** this page in its place. */
iHash = fsHashKey(pFS->nHash, pPg->iPg);
fsRemoveHashEntry(pFS, pPg->iPg);
pPg->pHashNext = pFS->apHash[iHash];
pFS->apHash[iHash] = pPg;
assert( pPg->pHashNext==0 || pPg->pHashNext->iPg!=pPg->iPg );
if( iPrev ){
/* First page of a block: shift the content up by 4 bytes, store the
** previous block number in the first 4 bytes and make aData point just
** past it. */
assert( iNext==0 );
memmove(&pPg->aData[4], pPg->aData, pPg->nData);
lsmPutU32(pPg->aData, iPrev);
pPg->flags |= PAGE_HASPREV;
pPg->aData += 4;
}else if( iNext ){
/* Last page of a block: store the next block number in the 4 bytes
** following the page content. */
assert( iPrev==0 );
lsmPutU32(&pPg->aData[pPg->nData], iNext);
}else{
/* Neither pointer is needed, so the page can use the 4 spare bytes. */
int nData = pPg->nData;
pPg->nData += 4;
lsmSortedExpandBtreePage(pPg, nData);
}
/* Take a reference and append the page to the tail of the waiting list. */
pPg->nRef++;
for(pp=&pFS->pWaiting; *pp; pp=&(*pp)->pWaitingNext);
*pp = pPg;
assert( pPg->pWaitingNext==0 );
}else{
i64 iOff;
iOff = (i64)pFS->nPagesize * (i64)(pPg->iPg-1);
if( fsMmapPage(pFS, pPg->iPg)==0 ){
/* Not memory-mapped: write the whole page, stepping back over the 4-byte
** prefix when PAGE_HASPREV (0x04) is set. */
u8 *aData = pPg->aData - (pPg->flags & PAGE_HASPREV);
rc = lsmEnvWrite(pFS->pEnv, pFS->fdDb, iOff, aData, pFS->nPagesize);
}else if( pPg->flags & PAGE_FREE ){
/* The page belongs in the mapping but currently has PAGE_FREE set and a
** buffer that is released with lsmFree() below. Copy the buffer into the
** mapping, free it, and move the page from the hash table onto the
** pMapped list. */
fsGrowMapping(pFS, iOff + pFS->nPagesize, &rc);
if( rc==LSM_OK ){
u8 *aTo = &((u8 *)(pFS->pMap))[iOff];
u8 *aFrom = pPg->aData - (pPg->flags & PAGE_HASPREV);
memcpy(aTo, aFrom, pFS->nPagesize);
lsmFree(pFS->pEnv, aFrom);
pFS->nCacheAlloc--;
pPg->aData = aTo + (pPg->flags & PAGE_HASPREV);
pPg->flags &= ~PAGE_FREE;
fsPageRemoveFromHash(pFS, pPg);
pPg->pMappedNext = pFS->pMapped;
pFS->pMapped = pPg;
}
}
/* Now write out any pages that were queued while waiting for a number. */
lsmFsFlushWaiting(pFS, &rc);
pPg->flags &= ~PAGE_DIRTY;
pFS->nWrite++;
}
}
}
return rc;
}
/*
** For compressed databases only: pad segment pSeg with filler so that its
** last byte lands on the final position of a szSector-sized sector (or on
** the last usable position of its block). Does nothing if no compression
** hook is configured or the segment is empty (iFirst==0).
**
** Padding of 6 bytes or more is written as a record framed by two 3-byte
** size fields around (nPad-6) zero bytes. Shorter padding (1..5 bytes) is
** written as a run whose first and last bytes hold the pad length and
** whose other bytes are zero.
**
** The pSnapshot argument is not used. Returns LSM_OK or an error code.
*/
int lsmFsSortedPadding(
  FileSystem *pFS,
  Snapshot *pSnapshot,
  Segment *pSeg
){
  int rc = LSM_OK;
  LsmPgno iEnd;                   /* Current last position of the segment */
  LsmPgno iSectorEnd;             /* Last position of the sector holding iEnd */
  int nPad;                       /* Number of padding bytes to write */

  if( !pFS->pCompress || !pSeg->iFirst ) return rc;

  iEnd = pSeg->iLastPg;
  iSectorEnd = (1 + iEnd/pFS->szSector) * pFS->szSector - 1;
  assert( fsPageToBlock(pFS, iEnd)==fsPageToBlock(pFS, iSectorEnd) );
  nPad = (int)(iSectorEnd - iEnd);

  /* Leave room for the 4 trailing bytes when the sector reaches past the
  ** last usable position on the block. */
  if( iSectorEnd>fsLastPageOnPagesBlock(pFS, iEnd) ){
    nPad -= 4;
  }
  assert( nPad>=0 );

  if( nPad>=6 ){
    u8 aSz[3];
    int nFill = nPad - 6;
    pSeg->nSize += nPad;
    putRecordSize(aSz, nFill, 1);
    fsAppendData(pFS, pSeg, aSz, sizeof(aSz), &rc);
    memset(pFS->aOBuffer, 0, nFill);
    fsAppendData(pFS, pSeg, pFS->aOBuffer, nFill, &rc);
    fsAppendData(pFS, pSeg, aSz, sizeof(aSz), &rc);
  }else if( nPad>0 ){
    u8 aBuf[5] = {0,0,0,0,0};
    aBuf[0] = (u8)nPad;
    aBuf[nPad-1] = (u8)nPad;
    fsAppendData(pFS, pSeg, aBuf, nPad, &rc);
  }

  assert( rc!=LSM_OK
      || pSeg->iLastPg==fsLastPageOnPagesBlock(pFS, pSeg->iLastPg)
      || ((pSeg->iLastPg + 1) % pFS->szSector)==0
  );
  return rc;
}
/*
** Increment the reference count of page pPg. A NULL argument is ignored.
*/
void lsmFsPageRef(Page *pPg){
  if( pPg==0 ) return;
  pPg->nRef++;
}
/*
** Drop one reference to page pPg. A NULL argument is ignored.
**
** When the reference count reaches zero the page is persisted (if dirty),
** the file-system's outstanding-page counter is decremented and any
** PAGE_HASPREV adjustment of aData is undone. The page is then either
** returned to the LRU list (PAGE_FREE set) or unlinked from the pMapped
** list and pushed onto the pFree list (PAGE_FREE clear).
**
** Returns the result of lsmFsPagePersist(), or LSM_OK if the page is still
** referenced.
*/
int lsmFsPageRelease(Page *pPg){
  int rc = LSM_OK;
  FileSystem *pFS;

  if( pPg==0 ) return rc;

  assert( pPg->nRef>0 );
  pPg->nRef--;
  if( pPg->nRef!=0 ) return rc;

  pFS = pPg->pFS;
  rc = lsmFsPagePersist(pPg);
  pFS->nOut--;

  assert( pPg->pFS->pCompress
       || fsIsFirst(pPg->pFS, pPg->iPg)==0
       || (pPg->flags & PAGE_HASPREV)
  );
  pPg->aData -= (pPg->flags & PAGE_HASPREV);
  pPg->flags &= ~PAGE_HASPREV;

  if( pPg->flags & PAGE_FREE ){
    fsPageAddToLru(pFS, pPg);
  }else{
    /* Unlink from the mapped list ... */
    Page **ppLink = &pFS->pMapped;
    while( (*ppLink)!=pPg ){
      ppLink = &(*ppLink)->pMappedNext;
    }
    *ppLink = pPg->pMappedNext;
    pPg->pMappedNext = 0;

    /* ... and push onto the free list. */
    pPg->pFreeNext = pFS->pFree;
    pFS->pFree = pPg;
  }

  return rc;
}
/*
** Return the current value of the file-system's nRead counter.
*/
int lsmFsNRead(FileSystem *pFS){
  return pFS->nRead;
}
/*
** Return the current value of the file-system's nWrite counter, which is
** incremented by lsmFsPagePersist() each time it writes a page.
*/
int lsmFsNWrite(FileSystem *pFS){
  return pFS->nWrite;
}
/*
** Return the run-time environment handle used by file-system pFS.
*/
lsm_env *lsmFsEnv(FileSystem *pFS){
  lsm_env *pEnv = pFS->pEnv;
  return pEnv;
}
/*
** Return the run-time environment handle of the file-system that page pPg
** belongs to.
*/
lsm_env *lsmPageEnv(Page *pPg){
  FileSystem *pOwner = pPg->pFS;
  return pOwner->pEnv;
}
/*
** Return the file-system object that page pPg belongs to.
*/
FileSystem *lsmPageFS(Page *pPg){
  FileSystem *pOwner = pPg->pFS;
  return pOwner;
}
/*
** Return the sector size (szSector) recorded for file-system pFS.
*/
int lsmFsSectorSize(FileSystem *pFS){
  int szSector = pFS->szSector;
  return szSector;
}
/*
** Return pRun if its first page number is iFirst, or NULL otherwise.
*/
static Segment *startsWith(Segment *pRun, LsmPgno iFirst){
  if( iFirst==pRun->iFirst ){
    return pRun;
  }
  return 0;
}
/*
** Search the levels of snapshot pWorker for a segment whose first page is
** iFirst. Within each level the left-hand segment is checked first, then
** the right-hand segments in array order. Returns the first match, or NULL
** if no segment starts on page iFirst.
*/
static Segment *findSegment(Snapshot *pWorker, LsmPgno iFirst){
  Level *pLvl = lsmDbSnapshotLevel(pWorker);

  while( pLvl ){
    int iRhs;
    Segment *pHit = startsWith(&pLvl->lhs, iFirst);
    if( pHit ) return pHit;

    for(iRhs=0; iRhs<pLvl->nRight; iRhs++){
      pHit = startsWith(&pLvl->aRhs[iRhs], iFirst);
      if( pHit ) return pHit;
    }
    pLvl = pLvl->pNext;
  }

  return 0;
}
/*
** Build a text description of the segment ("array") whose first page is
** iFirst and return it in *pzOut.
**
** If bBlock is true the string is the space-separated list of block numbers
** that make up the segment. Otherwise it is a space-separated list of page
** numbers: the first page of the segment, then for every block boundary the
** last page on the old block and the first page on the new one, and finally
** the last page of the segment.
**
** If the connection has no worker snapshot, one is opened with
** lsmBeginWork() for the duration of the call and released afterwards.
**
** Returns LSM_OK on success. Returns LSM_ERROR if iFirst is 0 or no segment
** starts on that page, or another error code if obtaining the snapshot or
** walking the block chain fails. *pzOut is NULL on every error path; on
** success the caller owns the returned string.
**
** Page numbers are of type LsmPgno and are formatted with "%lld" after an
** explicit cast to long long, so the argument always matches the conversion
** regardless of how wide LsmPgno is. Block numbers are plain ints and keep
** "%d".
*/
int lsmInfoArrayStructure(
  lsm_db *pDb,
  int bBlock,
  LsmPgno iFirst,
  char **pzOut
){
  int rc = LSM_OK;
  Snapshot *pWorker;
  Segment *pArray = 0;
  int bUnlock = 0;

  *pzOut = 0;
  if( iFirst==0 ) return LSM_ERROR;

  /* Obtain the worker snapshot, opening one if required. */
  pWorker = pDb->pWorker;
  if( !pWorker ){
    rc = lsmBeginWork(pDb);
    if( rc!=LSM_OK ) return rc;
    pWorker = pDb->pWorker;
    bUnlock = 1;
  }

  /* Locate the segment that starts on page iFirst. */
  pArray = findSegment(pWorker, iFirst);
  if( pArray==0 ){
    rc = LSM_ERROR;
  }else{
    FileSystem *pFS = pDb->pFS;
    LsmString str;
    int iBlk = fsPageToBlock(pFS, pArray->iFirst);
    int iLastBlk = fsPageToBlock(pFS, pArray->iLastPg);

    lsmStringInit(&str, pDb->pEnv);
    if( bBlock ){
      lsmStringAppendf(&str, "%d", iBlk);
      /* Stop on the first fsBlockNext() failure instead of looping on a
      ** block number that may not have been advanced. */
      while( rc==LSM_OK && iBlk!=iLastBlk ){
        rc = fsBlockNext(pFS, pArray, iBlk, &iBlk);
        if( rc==LSM_OK ) lsmStringAppendf(&str, " %d", iBlk);
      }
    }else{
      lsmStringAppendf(&str, "%lld", (long long)pArray->iFirst);
      while( rc==LSM_OK && iBlk!=iLastBlk ){
        lsmStringAppendf(&str, " %lld",
            (long long)fsLastPageOnBlock(pFS, iBlk)
        );
        rc = fsBlockNext(pFS, pArray, iBlk, &iBlk);
        if( rc==LSM_OK ){
          lsmStringAppendf(&str, " %lld",
              (long long)fsFirstPageOnBlock(pFS, iBlk)
          );
        }
      }
      if( rc==LSM_OK ){
        lsmStringAppendf(&str, " %lld", (long long)pArray->iLastPg);
      }
    }

    if( rc==LSM_OK ){
      *pzOut = str.z;
    }else{
      /* Do not hand a partial description back to the caller. */
      lsmFree(pDb->pEnv, str.z);
    }
  }

  if( bUnlock ){
    int rcwork = LSM_BUSY;
    lsmFinishWork(pDb, 0, &rcwork);
  }
  return rc;
}
/*
** Determine whether page iPg lies on one of the blocks that make up segment
** pSeg. *pbRes is set to true if the block containing iPg is encountered
** while walking the segment's block chain from its first block to its last,
** and to false otherwise.
**
** The first and last blocks of the segment are passed through
** fsRedirectBlock() before the walk, and fsBlockNext() is given pSeg when
** advancing.
**
** NOTE(review): the block for iPg is not passed through fsRedirectBlock().
** This assumes callers supply a page number that is already expressed in
** the same block numbering the walk uses - confirm against callers.
**
** Returns LSM_OK, or an error code if fsBlockNext() fails, in which case
** *pbRes reflects the last block reached.
*/
int lsmFsSegmentContainsPg(
  FileSystem *pFS,
  Segment *pSeg,
  LsmPgno iPg,
  int *pbRes
){
  Redirect *pRedir = pSeg->pRedirect;
  int rc = LSM_OK;
  int iBlk;                       /* Block currently being visited */
  int iLastBlk;                   /* Final block of the segment */
  int iPgBlock;                   /* Block containing page iPg */

  /* The block to look for must come from iPg. Deriving it from
  ** pSeg->iFirst would make the answer independent of the page queried. */
  iPgBlock = fsPageToBlock(pFS, iPg);
  iBlk = fsRedirectBlock(pRedir, fsPageToBlock(pFS, pSeg->iFirst));
  iLastBlk = fsRedirectBlock(pRedir, fsPageToBlock(pFS, pSeg->iLastPg));

  while( iBlk!=iLastBlk && iBlk!=iPgBlock && rc==LSM_OK ){
    rc = fsBlockNext(pFS, pSeg, iBlk, &iBlk);
  }

  *pbRes = (iBlk==iPgBlock);
  return rc;
}
/*
** Build a string listing the page number of every page in the segment that
** starts on page iFirst, each preceded by a single space, and return it in
** *pzOut.
**
** If the connection has no worker snapshot, one is opened with
** lsmBeginWork() for the duration of the call and released afterwards.
**
** Returns LSM_OK on success. Returns LSM_ERROR if iFirst is 0 or no segment
** starts on that page, or another error code if a page cannot be loaded. On
** any error *pzOut is left NULL and the partial string is freed.
*/
int lsmInfoArrayPages(lsm_db *pDb, LsmPgno iFirst, char **pzOut){
  int rc = LSM_OK;
  Snapshot *pWorker;
  Segment *pSeg = 0;
  int bOpenedWork = 0;            /* True if lsmBeginWork() was called here */

  *pzOut = 0;
  if( iFirst==0 ) return LSM_ERROR;

  /* Obtain the worker snapshot, opening one if required. */
  pWorker = pDb->pWorker;
  if( pWorker==0 ){
    rc = lsmBeginWork(pDb);
    if( rc!=LSM_OK ) return rc;
    pWorker = pDb->pWorker;
    bOpenedWork = 1;
  }

  pSeg = findSegment(pWorker, iFirst);
  if( pSeg ){
    FileSystem *pFS = pDb->pFS;
    Page *pCur = 0;
    LsmString str;

    lsmStringInit(&str, pDb->pEnv);
    rc = lsmFsDbPageGet(pFS, pSeg, iFirst, &pCur);
    while( rc==LSM_OK && pCur ){
      Page *pFollowing = 0;
      lsmStringAppendf(&str, " %lld", lsmFsPageNumber(pCur));
      rc = lsmFsDbPageNext(pSeg, pCur, 1, &pFollowing);
      lsmFsPageRelease(pCur);
      pCur = pFollowing;
    }

    if( rc==LSM_OK ){
      *pzOut = str.z;
    }else{
      lsmFree(pDb->pEnv, str.z);
    }
  }else{
    rc = LSM_ERROR;
  }

  if( bOpenedWork ){
    int rcwork = LSM_BUSY;
    lsmFinishWork(pDb, 0, &rcwork);
  }
  return rc;
}
/*
** Per-block flags stored in the aUsed[] array built by the integrity check:
**
**   FIRST_PG  set by checkBlocks() when a segment accounts for the first
**             page of the block
**   LAST_PG   set by checkBlocks() when a segment accounts for the last
**             page of the block
**   USED      set by checkBlocks() for every block a segment touches
**   FREE      set by checkFreelistCb() for every block on the free-list
*/
#define INTEGRITY_CHECK_FIRST_PG 0x01
#define INTEGRITY_CHECK_LAST_PG 0x02
#define INTEGRITY_CHECK_USED 0x04
#define INTEGRITY_CHECK_FREE 0x08
/*
** Helper for lsmFsIntegrityCheck(). Walk the chain of blocks occupied by
** segment pSeg and record each one in aUsed[], which has nUsed entries and
** is indexed by (block number - 1).
**
** Every visited block gets INTEGRITY_CHECK_USED. FIRST_PG is added unless
** the block is the segment's first block and the segment starts part-way
** through it. LAST_PG is added unless the block is the segment's last block
** and the segment ends part-way through it. The asserts verify that no
** other segment has already claimed the same end of the same block.
**
** If bExtra is true and the segment ends exactly on the last page of its
** final block, the block that follows in the chain is also marked as fully
** used.
**
** All checking is done with assert(), so this is only meaningful in builds
** where asserts are enabled. Segments that are NULL or empty are ignored.
*/
static void checkBlocks(
  FileSystem *pFS,
  Segment *pSeg,
  int bExtra,
  int nUsed,
  u8 *aUsed
){
  int rc;
  int iBlk;
  int iLastBlk;
  int iFirstBlk;
  int bLastIsLastOnBlock;

  if( pSeg==0 || pSeg->nSize<=0 ) return;

  assert( 0==fsSegmentRedirects(pFS, pSeg) );
  iBlk = iFirstBlk = fsPageToBlock(pFS, pSeg->iFirst);
  iLastBlk = fsPageToBlock(pFS, pSeg->iLastPg);
  bLastIsLastOnBlock = (fsLastPageOnBlock(pFS, iLastBlk)==pSeg->iLastPg);

  do {
    /* Every block number must index inside aUsed[], mirroring the range
    ** checks made for free-list entries in checkFreelistCb(). */
    assert( iBlk>0 );
    assert( iBlk<=nUsed );

    aUsed[iBlk-1] |= INTEGRITY_CHECK_USED;
    if( fsFirstPageOnBlock(pFS, iBlk)==pSeg->iFirst || iBlk!=iFirstBlk ){
      assert( (aUsed[iBlk-1] & INTEGRITY_CHECK_FIRST_PG)==0 );
      aUsed[iBlk-1] |= INTEGRITY_CHECK_FIRST_PG;
    }
    if( iBlk!=iLastBlk || bLastIsLastOnBlock ){
      assert( (aUsed[iBlk-1] & INTEGRITY_CHECK_LAST_PG)==0 );
      aUsed[iBlk-1] |= INTEGRITY_CHECK_LAST_PG;
    }

    /* Account for the block that follows a completely full segment. */
    if( iBlk==iLastBlk && bLastIsLastOnBlock && bExtra ){
      int iExtra = 0;
      rc = fsBlockNext(pFS, pSeg, iBlk, &iExtra);
      assert( rc==LSM_OK );
      assert( iExtra>0 && iExtra<=nUsed );
      assert( aUsed[iExtra-1]==0 );
      aUsed[iExtra-1] |= INTEGRITY_CHECK_USED;
      aUsed[iExtra-1] |= INTEGRITY_CHECK_FIRST_PG;
      aUsed[iExtra-1] |= INTEGRITY_CHECK_LAST_PG;
    }

    if( iBlk==iLastBlk ){
      iBlk = 0;
    }else{
      rc = fsBlockNext(pFS, pSeg, iBlk, &iBlk);
      assert( rc==LSM_OK );
    }
  }while( iBlk );
}
/*
** Context passed to checkFreelistCb() while walking the free-list during
** an integrity check.
*/
typedef struct CheckFreelistCtx CheckFreelistCtx;
struct CheckFreelistCtx {
/* Per-block flag array, indexed by (block number - 1) */
u8 *aUsed;
/* Largest block number the callback accepts (asserted in checkFreelistCb) */
int nBlock;
};
/*
** Free-list walker callback used by lsmFsIntegrityCheck(). pCtx points to a
** CheckFreelistCtx. Asserts that block iBlk is within range and has not
** been recorded before, then marks it INTEGRITY_CHECK_FREE. The iSnapshot
** argument is not used. Always returns 0.
*/
static int checkFreelistCb(void *pCtx, int iBlk, i64 iSnapshot){
  CheckFreelistCtx *pCheck = (CheckFreelistCtx *)pCtx;
  int iSlot = iBlk-1;

  assert( iBlk>=1 );
  assert( iBlk<=pCheck->nBlock );
  assert( pCheck->aUsed[iSlot]==0 );
  pCheck->aUsed[iSlot] = INTEGRITY_CHECK_FREE;
  return 0;
}
/*
** Verify that every block counted by the worker snapshot (pWorker->nBlock)
** is accounted for: each must either be used by a segment of some level or
** appear on the free-list. The connection must already hold a worker
** snapshot.
**
** All verification is done with assert(), so this routine only detects
** problems in builds where asserts are enabled. It always returns 1,
** including when the working array cannot be allocated, in which case the
** check is skipped.
*/
int lsmFsIntegrityCheck(lsm_db *pDb){
  CheckFreelistCtx ctx;
  FileSystem *pFS = pDb->pFS;
  Snapshot *pWorker = pDb->pWorker;
  int nBlock = pWorker->nBlock;
  Level *pLevel;
  u8 *aUsed;                      /* One flag byte per block */
  int rc;
  int i;

  aUsed = lsmMallocZero(pDb->pEnv, nBlock);
  if( aUsed==0 ){
    /* Cannot run the check without the array; report "ok". */
    return 1;
  }

  /* Record every block used by a segment. The left-hand segment of a level
  ** that also has right-hand segments is checked with bExtra set. */
  for(pLevel=pWorker->pLevel; pLevel; pLevel=pLevel->pNext){
    int j;
    checkBlocks(pFS, &pLevel->lhs, (pLevel->nRight!=0), nBlock, aUsed);
    for(j=0; j<pLevel->nRight; j++){
      checkBlocks(pFS, &pLevel->aRhs[j], 0, nBlock, aUsed);
    }
  }

  /* Record every block on the free-list. */
  ctx.aUsed = aUsed;
  ctx.nBlock = nBlock;
  rc = lsmWalkFreelist(pDb, 0, checkFreelistCb, (void *)&ctx);

  /* Each block must now have been marked by one of the two passes. */
  if( rc==LSM_OK ){
    for(i=0; i<nBlock; i++) assert( aUsed[i]!=0 );
  }

  lsmFree(pDb->pEnv, aUsed);
  return 1;
}
#ifndef NDEBUG
/*
** Return true if page pPg is the last page of segment pSeg.
**
** For uncompressed databases this is a direct comparison with
** pSeg->iLastPg. For compressed databases the offset of the following
** record is computed with fsNextPageOffset(); the page is considered last
** if that call fails or reports no following offset (0).
*/
int lsmFsDbPageIsLast(Segment *pSeg, Page *pPg){
  FileSystem *pFS = pPg->pFS;
  LsmPgno iNext = 0;
  int rc;

  if( !pFS->pCompress ){
    return (pPg->iPg==pSeg->iLastPg);
  }

  rc = fsNextPageOffset(pFS, pSeg, pPg->iPg, pPg->nCompress+6, &iNext);
  return (rc!=LSM_OK || iNext==0);
}
#endif