#include "lsmInt.h"
/*
** Process-wide state. pDatabase heads a singly-linked list (through
** Database.pDbNext) of every Database object currently open in this
** process. The list is searched and modified while holding the global
** mutex (see lsmDbDatabaseConnect() and lsmDbDatabaseRelease()).
*/
static struct SharedData {
Database *pDatabase;     /* Head of list of open Database objects */
} gShared;
/*
** One Database object exists per distinct database file name opened in
** this process. It is shared by all lsm_db connections to that file.
*/
struct Database {
char *zName;             /* Database file name (stored just past this struct) */
int nName;               /* strlen(zName) */
int nDbRef;              /* Number of lsm_db connections using this object */
Database *pDbNext;       /* Next Database in gShared.pDatabase list */
int bReadonly;           /* Set if pFile could only be opened read-only */
int bMultiProc;          /* Copied from first connection; 0 = heap-only shm */
lsm_file *pFile;         /* Shared file handle used for locks and shm */
LsmFile *pLsmFile;       /* List of deferred-close handles (see dbDeferClose) */
lsm_mutex *pClientMutex; /* Guards pConn, pLsmFile and apShmChunk/nShmChunk */
int nShmChunk;           /* Number of entries populated in apShmChunk[] */
void **apShmChunk;       /* Shared-memory chunks (heap or mapped) */
lsm_db *pConn;           /* Connections to this db, linked via lsm_db.pNext */
};
/*
** Look up the static LSM_MUTEX_GLOBAL mutex and enter it.
** Returns LSM_OK once the mutex is held, or the error code reported by
** lsmMutexStatic(). In the error case the mutex is not entered.
*/
static int enterGlobalMutex(lsm_env *pEnv){
  lsm_mutex *pGlobal;
  int rc;

  rc = lsmMutexStatic(pEnv, LSM_MUTEX_GLOBAL, &pGlobal);
  if( rc!=LSM_OK ) return rc;
  lsmMutexEnter(pEnv, pGlobal);
  return LSM_OK;
}

/*
** Leave the static LSM_MUTEX_GLOBAL mutex previously entered by
** enterGlobalMutex().
*/
static void leaveGlobalMutex(lsm_env *pEnv){
  lsm_mutex *pGlobal;

  lsmMutexStatic(pEnv, LSM_MUTEX_GLOBAL, &pGlobal);
  lsmMutexLeave(pEnv, pGlobal);
}

#ifdef LSM_DEBUG
/*
** Debug builds only: return lsmMutexHeld() for the static global mutex
** (used in assert() statements).
*/
static int holdingGlobalMutex(lsm_env *pEnv){
  lsm_mutex *pGlobal;

  lsmMutexStatic(pEnv, LSM_MUTEX_GLOBAL, &pGlobal);
  return lsmMutexHeld(pEnv, pGlobal);
}
#endif
/*
** assertNotInFreelist(x,y) is a debugging hook that expands to nothing.
** The previous "#if 0#else" line fused the "#else" onto the "#if" line.
** That is not a valid preprocessor directive and breaks preprocessing, so
** only the (empty) #else branch is kept.
*/
# define assertNotInFreelist(x,y)
/*
** Record block iBlk in a freelist with id iId (which must be -1 or >= 0).
**
** The list updated is db->pFreelist if db->bUseFreelist is set, otherwise
** the freelist of the worker snapshot. Entries are kept sorted by strictly
** ascending iBlk:
**   - if iBlk is already present, its iId is overwritten;
**   - otherwise a new entry is inserted at its sorted position.
**
** The entry array grows geometrically (4, 8, 16, ...).
** Returns LSM_OK, or LSM_NOMEM_BKPT if the array cannot be grown.
*/
int freelistAppend(lsm_db *db, u32 iBlk, i64 iId){
  lsm_env *pEnv = db->pEnv;
  Freelist *pList;
  FreelistEntry *pSlot;
  int iPos;

  assert( iId==-1 || iId>=0 );
  if( db->bUseFreelist ){
    pList = db->pFreelist;
  }else{
    pList = &db->pWorker->freelist;
  }

  /* Guarantee room for at least one more entry. */
  assert( pList->nAlloc>=pList->nEntry );
  if( pList->nEntry==pList->nAlloc ){
    int nGrow = (pList->nAlloc ? pList->nAlloc*2 : 4);
    int nByte = sizeof(FreelistEntry) * nGrow;
    FreelistEntry *aGrow;
    aGrow = (FreelistEntry *)lsmRealloc(pEnv, pList->aEntry, nByte);
    if( aGrow==0 ) return LSM_NOMEM_BKPT;
    pList->aEntry = aGrow;
    pList->nAlloc = nGrow;
  }

  /* Find the first entry whose block number is not less than iBlk. */
  for(iPos=0; iPos<pList->nEntry; iPos++){
    assert( iPos==0 || pList->aEntry[iPos].iBlk > pList->aEntry[iPos-1].iBlk );
    if( pList->aEntry[iPos].iBlk>=iBlk ) break;
  }

  pSlot = &pList->aEntry[iPos];
  if( iPos>=pList->nEntry || pSlot->iBlk!=iBlk ){
    /* Not present yet: shift the tail up by one and insert here. */
    int nTail = sizeof(FreelistEntry)*(pList->nEntry-iPos);
    memmove(&pSlot[1], pSlot, nTail);
    pSlot->iBlk = iBlk;
    pList->nEntry++;
  }
  pSlot->iId = iId;
  return LSM_OK;
}
/*
** Free Database object p and what it directly owns:
**   - the client mutex;
**   - the shared file handle, if open;
**   - the apShmChunk[] pointer array.
** The chunks themselves are not freed here. Passing NULL is a no-op.
** The global mutex must be held.
*/
static void freeDatabase(lsm_env *pEnv, Database *p){
  assert( holdingGlobalMutex(pEnv) );
  if( p==0 ) return;

  lsmMutexDel(pEnv, p->pClientMutex);
  if( p->pFile ) lsmEnvClose(pEnv, p->pFile);
  lsmFree(pEnv, p->apShmChunk);
  lsmFree(pEnv, p);
}
/*
** Context for dbTruncateCb(). dbTruncateFile() uses it while walking the
** freelist in reverse to count how many trailing blocks may be dropped.
*/
typedef struct DbTruncateCtx DbTruncateCtx;
struct DbTruncateCtx {
  int nBlock;        /* Candidate file size in blocks */
  i64 iInUse;        /* If >= 0, blocks with iSnapshot >= iInUse are kept */
};

/*
** Freelist-walk callback. If iBlk is the current last block (nBlock) and
** it is not excluded by iInUse, shrink nBlock by one and return 0 to
** continue the walk. Otherwise return 1 to stop.
*/
static int dbTruncateCb(void *pCtx, int iBlk, i64 iSnapshot){
  DbTruncateCtx *pTrunc = (DbTruncateCtx *)pCtx;
  int bKeep = (pTrunc->iInUse>=0 && iSnapshot>=pTrunc->iInUse);

  if( iBlk==pTrunc->nBlock && !bKeep ){
    pTrunc->nBlock--;
    return 0;
  }
  return 1;
}
/*
** Hook called by lsmBlockAllocate() after a free block has been reused.
**
** Whatever logic this function once contained was compiled out (#if 0),
** so it does nothing and always returns LSM_OK. The previous body held a
** malformed "#if 0#endif" line (the "#endif" fused onto the "#if"), which
** the preprocessor rejects. It is removed here.
*/
static int dbTruncate(lsm_db *pDb, i64 iInUse){
  (void)pDb;
  (void)iInUse;
  return LSM_OK;
}
/*
** Shrink the database file by dropping trailing blocks that are on the
** freelist.
**
** A worker snapshot is loaded and the freelist is walked in reverse with
** iInUse=-1, so every free tail block qualifies. The file is then
** truncated to the resulting number of blocks. The worker snapshot is
** always freed before returning.
**
** The caller must hold DMS1 exclusively and must not have a worker
** snapshot.
*/
static int dbTruncateFile(lsm_db *pDb){
  DbTruncateCtx ctx;
  int rc;

  assert( pDb->pWorker==0 );
  assert( lsmShmAssertLock(pDb, LSM_LOCK_DMS1, LSM_LOCK_EXCL) );

  rc = lsmCheckpointLoadWorker(pDb);
  if( rc==LSM_OK ){
    ctx.nBlock = pDb->pWorker->nBlock;
    ctx.iInUse = -1;
    rc = lsmWalkFreelist(pDb, 1, dbTruncateCb, (void *)&ctx);
  }
  if( rc==LSM_OK ){
    rc = lsmFsTruncateDb(
        pDb->pFS, (i64)ctx.nBlock*lsmFsBlockSize(pDb->pFS)
    );
  }

  lsmFreeSnapshot(pDb->pEnv, pDb->pWorker);
  pDb->pWorker = 0;
  return rc;
}
/*
** Detach connection pDb from the shared-memory region. Called from
** lsmDbDatabaseRelease() when pDb->pShmhdr is set. Always clears
** pDb->pShmhdr. Errors are not reported: this is best-effort teardown.
**
** A read-only connection only drops its DMS3 lock.
**
** A read-write connection first takes DMS1 exclusively (blocking request)
** and drops its own DMS2 lock. If no other connection holds DMS2 or
** CHECKPOINTER, this connection is treated as the last writer. In that
** case it:
**   - flushes the in-memory tree;
**   - writes a checkpoint;
**   - closes and deletes the log, if no read-only transaction exists;
**   - truncates the file and unmaps shm, if no read-only connection or
**     transaction exists.
** Finally it releases its RWCLIENT slot and DMS1.
*/
static void doDbDisconnect(lsm_db *pDb){
int rc;
if( pDb->bReadonly ){
lsmShmLock(pDb, LSM_LOCK_DMS3, LSM_LOCK_UNLOCK, 0);
}else{
rc = lsmShmLock(pDb, LSM_LOCK_DMS1, LSM_LOCK_EXCL, 1);
if( rc==LSM_OK ){
/* Drop our own DMS2 lock, then check that no other connection holds
** DMS2 or CHECKPOINTER. LSM_OK from both tests means no other
** read-write connection is visible. */
lsmShmLock(pDb, LSM_LOCK_DMS2, LSM_LOCK_UNLOCK, 0);
rc = lsmShmTestLock(pDb, LSM_LOCK_DMS2, 1, LSM_LOCK_EXCL);
if( rc==LSM_OK ){
rc = lsmShmTestLock(pDb, LSM_LOCK_CHECKPOINTER, 1, LSM_LOCK_EXCL);
}
if( rc==LSM_OK ){
int bReadonly = 0;
/* Flush the in-memory tree (current or old) to disk if non-empty. */
rc = lsmTreeLoadHeader(pDb, 0);
if( rc==LSM_OK && (lsmTreeHasOld(pDb) || lsmTreeSize(pDb)>0) ){
rc = lsmFlushTreeToDisk(pDb);
}
/* DMS3 is held by read-only connections; record whether any exist. */
if( rc==LSM_OK ){
rc = lsmShmTestLock(pDb, LSM_LOCK_DMS3, 1, LSM_LOCK_EXCL);
if( rc==LSM_BUSY ){
bReadonly = 1;
rc = LSM_OK;
}
}
if( rc==LSM_OK ){
rc = lsmCheckpointWrite(pDb, 0);
}
if( rc==LSM_OK ){
int bRotrans = 0;
Database *p = pDb->pDatabase;
rc = lsmDetectRoTrans(pDb, &bRotrans);
/* The log is closed and deleted only if no read-only transaction
** was detected. */
if( rc==LSM_OK && bRotrans==0 ){
lsmFsCloseAndDeleteLog(pDb->pFS);
}
/* With no read-only connections or transactions, unmap the file,
** truncate it (result ignored) and unmap multi-process shm.
** NOTE(review): this branch does not test rc. lsmDetectRoTrans()
** leaves bRotrans==0 when it fails, so truncation proceeds even if
** the read-only-transaction check errored. Confirm this is intended. */
if( bReadonly==0 && bRotrans==0 ){
lsmFsUnmap(pDb->pFS);
dbTruncateFile(pDb);
if( p->pFile && p->bMultiProc ){
lsmEnvShmUnmap(pDb->pEnv, p->pFile, 1);
}
}
}
}
}
/* Release this connection's RWCLIENT slot, if it claimed one. */
if( pDb->iRwclient>=0 ){
lsmShmLock(pDb, LSM_LOCK_RWCLIENT(pDb->iRwclient), LSM_LOCK_UNLOCK, 0);
pDb->iRwclient = -1;
}
lsmShmLock(pDb, LSM_LOCK_DMS1, LSM_LOCK_UNLOCK, 0);
}
pDb->pShmhdr = 0;
}
/*
** Connect read-write connection pDb to the shared-memory region.
**
** Steps:
**   1. Take DMS1 exclusively, sleeping between attempts with exponential
**      back-off (1000, 2000, ... capped at 100000) while it is busy.
**   2. Map shm chunk 0 and set pDb->pShmhdr.
**   3. If no other connection holds DMS2 or DMS3, initialise shared
**      memory: zero the header, recover the checkpoint and log, and seed
**      reader slot 0.
**   4. Take DMS2 shared, then claim the first free RWCLIENT slot. Running
**      out of slots is not an error.
**   5. Release DMS1.
**
** Returns LSM_OK or an error code. On error pDb->pShmhdr is left 0.
*/
static int doDbConnect(lsm_db *pDb){
  const int nUsMax = 100000;      /* Upper bound on the back-off sleep */
  int nUs = 1000;                 /* Current back-off sleep */
  int rc;

  assert( pDb->pShmhdr==0 );
  assert( pDb->bReadonly==0 );

  while( 1 ){
    rc = lsmShmLock(pDb, LSM_LOCK_DMS1, LSM_LOCK_EXCL, 1);
    if( rc!=LSM_BUSY ) break;
    lsmEnvSleep(pDb->pEnv, nUs);
    nUs = nUs * 2;
    if( nUs>nUsMax ) nUs = nUsMax;
  }
  if( rc!=LSM_OK ) return rc;

  /* DMS1 is held from here on. If mapping chunk 0 fails, release DMS1
  ** before returning. Previously this path returned with DMS1 still held
  ** exclusively, which could leave other connections retrying forever in
  ** the loop above. */
  rc = lsmShmCacheChunks(pDb, 1);
  if( rc!=LSM_OK ){
    lsmShmLock(pDb, LSM_LOCK_DMS1, LSM_LOCK_UNLOCK, 0);
    return rc;
  }
  pDb->pShmhdr = (ShmHeader *)pDb->apShm[0];

  /* LSM_OK here means no other connection holds DMS2 or DMS3, so this
  ** connection initialises the shared-memory contents. */
  assert( LSM_LOCK_DMS3==1+LSM_LOCK_DMS2 );
  rc = lsmShmTestLock(pDb, LSM_LOCK_DMS2, 2, LSM_LOCK_EXCL);
  if( rc==LSM_OK ){
    memset(pDb->pShmhdr, 0, sizeof(ShmHeader));
    rc = lsmCheckpointRecover(pDb);
    if( rc==LSM_OK ){
      rc = lsmLogRecover(pDb);
    }
    if( rc==LSM_OK ){
      ShmHeader *pShm = pDb->pShmhdr;
      pShm->aReader[0].iLsmId = lsmCheckpointId(pShm->aSnap1, 0);
      pShm->aReader[0].iTreeId = pDb->treehdr.iUsedShmid;
    }
  }else if( rc==LSM_BUSY ){
    /* Shared memory is already initialised by another connection. */
    rc = LSM_OK;
  }

  if( rc==LSM_OK ){
    rc = lsmShmLock(pDb, LSM_LOCK_DMS2, LSM_LOCK_SHARED, 0);
  }

  if( rc!=LSM_OK ){
    pDb->pShmhdr = 0;
  }else{
    int i;
    /* Claim the first RWCLIENT slot that can be locked exclusively. */
    for(i=0; i<LSM_LOCK_NRWCLIENT; i++){
      int rc2 = lsmShmLock(pDb, LSM_LOCK_RWCLIENT(i), LSM_LOCK_EXCL, 0);
      if( rc2==LSM_OK ) pDb->iRwclient = i;
      if( rc2!=LSM_BUSY ){
        rc = rc2;
        break;
      }
    }
  }

  lsmShmLock(pDb, LSM_LOCK_DMS1, LSM_LOCK_UNLOCK, 0);
  return rc;
}
/*
** Open the shared file handle p->pFile on file p->zName.
**
** If the read-write open fails with LSM_IOERR and bRoOk is true, the file
** is reopened with LSM_OPEN_READONLY and p->bReadonly is set. The return
** value is the result of the last lsmEnvOpen() call.
*/
static int dbOpenSharedFd(lsm_env *pEnv, Database *p, int bRoOk){
  int rc = lsmEnvOpen(pEnv, p->zName, 0, &p->pFile);

  if( rc!=LSM_IOERR || !bRoOk ) return rc;

  rc = lsmEnvOpen(pEnv, p->zName, LSM_OPEN_READONLY, &p->pFile);
  p->bReadonly = 1;
  return rc;
}
/*
** Attach connection pDb to the shared Database object for file zName,
** creating that object if this process has not opened the file yet.
**
** With the global mutex held, gShared.pDatabase is searched for a matching
** name. If none is found, a new Database is allocated, its client mutex
** created and its shared file handle opened. In single-process mode the
** new object also takes DMS1 and then DMS2 on the file; DMS2 is left held
** (presumably to keep other processes out -- confirm). The object is then
** linked into the list and its reference count bumped.
**
** Outside the global mutex, pDb is pushed onto the Database's connection
** list under the client mutex. Finally the file system is opened and
** configured, and read-write connections connect to shared memory via
** doDbConnect().
**
** pDb->pDatabase is set (possibly to NULL) in all cases.
*/
int lsmDbDatabaseConnect(
lsm_db *pDb,
const char *zName
){
lsm_env *pEnv = pDb->pEnv;
int rc;
Database *p = 0;
int nName = lsmStrlen(zName);
assert( pDb->pDatabase==0 );
rc = enterGlobalMutex(pEnv);
if( rc==LSM_OK ){
/* Look for an existing Database object with the same file name. */
for(p=gShared.pDatabase; p; p=p->pDbNext){
if( nName==p->nName && 0==memcmp(zName, p->zName, nName) ) break;
}
if( p==0 ){
/* Allocate the object with room for the name (and nul) after it. */
p = (Database *)lsmMallocZeroRc(pEnv, sizeof(Database)+nName+1, &rc);
if( rc==LSM_OK ){
p->bMultiProc = pDb->bMultiProc;
p->zName = (char *)&p[1];
p->nName = nName;
memcpy((void *)p->zName, zName, nName+1);
rc = lsmMutexNew(pEnv, &p->pClientMutex);
}
if( rc==LSM_OK ){
/* Fall back to a read-only open only for read-only, multi-process
** connections. */
int bReadonly = (pDb->bReadonly && pDb->bMultiProc);
rc = dbOpenSharedFd(pDb->pEnv, p, bReadonly);
}
if( rc==LSM_OK && p->bMultiProc==0 ){
/* Single-process mode: take DMS1, then DMS2 exclusively on the file.
** DMS1 is released again; DMS2 is left held. */
assert( p->bReadonly==0 );
rc = lsmEnvLock(pDb->pEnv, p->pFile, LSM_LOCK_DMS1, LSM_LOCK_EXCL);
if( rc==LSM_OK ){
rc = lsmEnvLock(pDb->pEnv, p->pFile, LSM_LOCK_DMS2, LSM_LOCK_EXCL);
lsmEnvLock(pDb->pEnv, p->pFile, LSM_LOCK_DMS1, LSM_LOCK_UNLOCK);
}
}
if( rc==LSM_OK ){
p->pDbNext = gShared.pDatabase;
gShared.pDatabase = p;
}else{
freeDatabase(pEnv, p);
p = 0;
}
}
if( p ){
p->nDbRef++;
}
leaveGlobalMutex(pEnv);
/* Add pDb to the Database's connection list under the client mutex. */
if( p ){
lsmMutexEnter(pDb->pEnv, p->pClientMutex);
pDb->pNext = p->pConn;
p->pConn = pDb;
lsmMutexLeave(pDb->pEnv, p->pClientMutex);
}
}
pDb->pDatabase = p;
if( rc==LSM_OK ){
assert( p );
rc = lsmFsOpen(pDb, zName, p->bReadonly);
}
if( rc==LSM_OK ){
rc = lsmFsConfigure(pDb);
}
/* Read-only connections attach to shm lazily (see lsmBeginRoTrans()). */
if( rc==LSM_OK && pDb->bReadonly==0 ){
rc = doDbConnect(pDb);
}
return rc;
}
/*
** Hand pDb's file handle to lsmFsDeferClose() and push the returned
** LsmFile onto the Database's pLsmFile list, where lsmDbRecycleFd() or
** lsmDbDatabaseRelease() will find it later. Does nothing if pDb has no
** file-system object. The caller holds the client mutex.
*/
static void dbDeferClose(lsm_db *pDb){
  Database *pShared;
  LsmFile *pDeferred;

  if( pDb->pFS==0 ) return;

  pShared = pDb->pDatabase;
  pDeferred = lsmFsDeferClose(pDb->pFS);
  pDeferred->pNext = pShared->pLsmFile;
  pShared->pLsmFile = pDeferred;
}
/*
** Pop and return the head of the Database's deferred-close file list,
** or return NULL if the list is empty. Takes the client mutex.
*/
LsmFile *lsmDbRecycleFd(lsm_db *db){
  Database *pShared = db->pDatabase;
  LsmFile *pHead;

  lsmMutexEnter(db->pEnv, pShared->pClientMutex);
  pHead = pShared->pLsmFile;
  if( pHead!=0 ) pShared->pLsmFile = pHead->pNext;
  lsmMutexLeave(db->pEnv, pShared->pClientMutex);

  return pHead;
}
/*
** Detach connection pDb from its shared Database object. The reverse of
** lsmDbDatabaseConnect(). Does nothing if pDb->pDatabase is NULL.
**
** Steps:
**   1. Disconnect from shared memory if attached, and unmap the file.
**   2. Under the client mutex, unlink pDb from the connection list and
**      defer-close its file handle.
**   3. Under the global mutex, drop the reference count. When it reaches
**      zero, unlink the Database from gShared, free heap shm chunks (in
**      single-process mode), close every deferred file handle and free
**      the object.
*/
void lsmDbDatabaseRelease(lsm_db *pDb){
Database *p = pDb->pDatabase;
if( p ){
lsm_db **ppDb;
if( pDb->pShmhdr ){
doDbDisconnect(pDb);
}
lsmFsUnmap(pDb->pFS);
lsmMutexEnter(pDb->pEnv, p->pClientMutex);
/* Unlink pDb from p->pConn. pDb is assumed to be on the list. */
for(ppDb=&p->pConn; *ppDb!=pDb; ppDb=&((*ppDb)->pNext));
*ppDb = pDb->pNext;
dbDeferClose(pDb);
lsmMutexLeave(pDb->pEnv, p->pClientMutex);
/* NOTE(review): the return code of enterGlobalMutex() is ignored. */
enterGlobalMutex(pDb->pEnv);
p->nDbRef--;
if( p->nDbRef==0 ){
LsmFile *pIter;
LsmFile *pNext;
Database **pp;
/* Last reference: unlink from the process-wide list. */
for(pp=&gShared.pDatabase; *pp!=p; pp=&((*pp)->pDbNext));
*pp = p->pDbNext;
/* In single-process mode shm chunks are heap memory owned here. */
if( p->bMultiProc==0 ){
int i;
for(i=0; i<p->nShmChunk; i++){
lsmFree(pDb->pEnv, p->apShmChunk[i]);
}
}
/* Close and free every deferred file handle. */
for(pIter=p->pLsmFile; pIter; pIter=pNext){
pNext = pIter->pNext;
lsmEnvClose(pDb->pEnv, pIter->pFile);
lsmFree(pDb->pEnv, pIter);
}
freeDatabase(pDb->pEnv, p);
}
leaveGlobalMutex(pDb->pEnv);
}
}
/* Return the level list (pLevel) stored in snapshot pSnapshot. */
Level *lsmDbSnapshotLevel(Snapshot *pSnapshot){
  Level *pLevel = pSnapshot->pLevel;
  return pLevel;
}

/* Replace snapshot pSnap's level list with pLevel. The old list is not
** freed here. */
void lsmDbSnapshotSetLevel(Snapshot *pSnap, Level *pLevel){
  pSnap->pLevel = pLevel;
}

/* Defined later in this file; used by lsmBlockAllocate(). */
static int firstSnapshotInUse(lsm_db *db, i64 *piInUse);
/*
** Context for walkFreelistCb(). lsmWalkFreelist() chains two of these so
** that entries from in-memory freelists are merged, in block order, into
** the walk produced by lsmSortedWalkFreelist().
*/
typedef struct WalkFreelistCtx WalkFreelistCtx;
struct WalkFreelistCtx {
lsm_db *pDb;                    /* Connection (not read by walkFreelistCb) */
int bReverse;                   /* True to visit blocks in descending order */
Freelist *pFreelist;            /* In-memory freelist merged into the walk */
int iFree;                      /* Index of next pFreelist entry to visit */
int (*xUsr)(void *, int, i64);  /* Next callback in the chain */
void *pUsrctx;                  /* First argument passed to xUsr */
int bDone;                      /* Set once xUsr has returned non-zero */
};
/*
** Callback used to merge an in-memory freelist (p->pFreelist) into a
** freelist walk that delivers (iBlk, iSnapshot) pairs in block order.
**
** Before forwarding iBlk, every in-memory entry that precedes it in walk
** order (or equals it) is consumed:
**   - entries with iId>=0 are forwarded to p->xUsr;
**   - entries with negative iId are skipped silently.
** If an in-memory entry has the same block number as iBlk, it replaces
** the walked pair and iBlk itself is not forwarded.
**
** Returns 1, and sets p->bDone, as soon as p->xUsr returns non-zero.
** Otherwise returns 0.
*/
static int walkFreelistCb(void *pCtx, int iBlk, i64 iSnapshot){
WalkFreelistCtx *p = (WalkFreelistCtx *)pCtx;
const int iDir = (p->bReverse ? -1 : 1);
Freelist *pFree = p->pFreelist;
assert( p->bDone==0 );
assert( iBlk>=0 );
if( pFree ){
while( (p->iFree < pFree->nEntry) && p->iFree>=0 ){
FreelistEntry *pEntry = &pFree->aEntry[p->iFree];
/* Stop once the next in-memory entry lies beyond iBlk. */
if( (p->bReverse==0 && pEntry->iBlk>(u32)iBlk)
|| (p->bReverse!=0 && pEntry->iBlk<(u32)iBlk)
){
break;
}else{
p->iFree += iDir;
if( pEntry->iId>=0
&& p->xUsr(p->pUsrctx, pEntry->iBlk, pEntry->iId)
){
p->bDone = 1;
return 1;
}
/* The in-memory entry replaces the walked one for this block. */
if( pEntry->iBlk==(u32)iBlk ) return 0;
}
}
}
if( p->xUsr(p->pUsrctx, iBlk, iSnapshot) ){
p->bDone = 1;
return 1;
}
return 0;
}
/*
** Visit every free block, in ascending order (or descending order if
** bReverse is true), invoking x(pCtx, iBlk, iSnapshot) for each one. The
** walk stops early if x returns non-zero.
**
** The free blocks are the union of three sources:
**   - the on-disk freelist walked by lsmSortedWalkFreelist();
**   - the worker snapshot's in-memory freelist;
**   - pDb->pFreelist.
** Later sources override earlier ones for the same block: ctx[0] merges
** the worker list, then ctx[1] merges pDb->pFreelist before calling x.
** Any in-memory entries left after the on-disk walk are then flushed
** through the same chain.
**
** Returns the lsmSortedWalkFreelist() result, or LSM_OK if x stopped the
** walk during the flush phase.
*/
int lsmWalkFreelist(
lsm_db *pDb,
int bReverse,
int (*x)(void *, int, i64),
void *pCtx
){
const int iDir = (bReverse ? -1 : 1);
int rc;
int iCtx;
WalkFreelistCtx ctx[2];
/* Stage 1: merge the worker snapshot's freelist, then pass to ctx[1]. */
ctx[0].pDb = pDb;
ctx[0].bReverse = bReverse;
ctx[0].pFreelist = &pDb->pWorker->freelist;
if( ctx[0].pFreelist && bReverse ){
ctx[0].iFree = ctx[0].pFreelist->nEntry-1;
}else{
ctx[0].iFree = 0;
}
ctx[0].xUsr = walkFreelistCb;
ctx[0].pUsrctx = (void *)&ctx[1];
ctx[0].bDone = 0;
/* Stage 2: merge pDb->pFreelist (may be NULL), then pass to x. */
ctx[1].pDb = pDb;
ctx[1].bReverse = bReverse;
ctx[1].pFreelist = pDb->pFreelist;
if( ctx[1].pFreelist && bReverse ){
ctx[1].iFree = ctx[1].pFreelist->nEntry-1;
}else{
ctx[1].iFree = 0;
}
ctx[1].xUsr = x;
ctx[1].pUsrctx = pCtx;
ctx[1].bDone = 0;
rc = lsmSortedWalkFreelist(pDb, bReverse, walkFreelistCb, (void *)&ctx[0]);
/* Flush in-memory entries beyond the last on-disk block, unless the user
** callback already stopped the walk. */
if( ctx[0].bDone==0 ){
for(iCtx=0; iCtx<2; iCtx++){
int i;
WalkFreelistCtx *p = &ctx[iCtx];
for(i=p->iFree;
p->pFreelist && rc==LSM_OK && i<p->pFreelist->nEntry && i>=0;
i += iDir
){
FreelistEntry *pEntry = &p->pFreelist->aEntry[i];
if( pEntry->iId>=0 && p->xUsr(p->pUsrctx, pEntry->iBlk, pEntry->iId) ){
return LSM_OK;
}
}
}
}
return rc;
}
/* Context for findFreeblockCb(). */
typedef struct FindFreeblockCtx FindFreeblockCtx;
struct FindFreeblockCtx {
  i64 iInUse;     /* Only blocks with iSnapshot < iInUse qualify */
  int iRet;       /* OUT: first qualifying block, or 0 */
  int bNotOne;    /* If true, block 1 never qualifies */
};

/*
** Freelist-walk callback. Stop the walk (return 1) at the first block
** that qualifies under the rules in FindFreeblockCtx, saving it in iRet.
*/
static int findFreeblockCb(void *pCtx, int iBlk, i64 iSnapshot){
  FindFreeblockCtx *pFind = (FindFreeblockCtx *)pCtx;

  if( iSnapshot>=pFind->iInUse ) return 0;
  if( iBlk==1 && pFind->bNotOne ) return 0;

  pFind->iRet = iBlk;
  return 1;
}

/*
** Search the freelist in ascending block order for the first block freed
** by a snapshot older than iInUse (skipping block 1 if bNotOne is set).
** *piRet receives that block, or 0 if none is found. Returns the
** lsmWalkFreelist() result code.
*/
static int findFreeblock(lsm_db *pDb, i64 iInUse, int bNotOne, int *piRet){
  FindFreeblockCtx find;
  int rc;

  find.iInUse = iInUse;
  find.iRet = 0;
  find.bNotOne = bNotOne;

  rc = lsmWalkFreelist(pDb, 0, findFreeblockCb, (void *)&find);
  *piRet = find.iRet;
  return rc;
}
/*
** Allocate a database block for the worker snapshot and store its number
** in *piBlk.
**
** iInUse is computed as the oldest snapshot id that may still be in use.
** It is the minimum of:
**   - the last synced checkpoint id (or the worker's id if none);
**   - this connection's client snapshot id, if it holds a reader slot;
**   - every snapshot id advertised in a busy reader slot
**     (firstSnapshotInUse()).
** Unless a read-only transaction is active, the freelist is searched for
** a block freed before iInUse.
**
** If iBefore>0 the caller wants a block numbered below iBefore, and block
** 1 is excluded. When no such block exists, *piBlk is set to 0 rather
** than extending the file. Otherwise a reused block is re-added to the
** freelist with iId -1; if none was found, the file grows by one block.
*/
int lsmBlockAllocate(lsm_db *pDb, int iBefore, int *piBlk){
Snapshot *p = pDb->pWorker;
int iRet = 0;
int rc = LSM_OK;
i64 iInUse = 0;
i64 iSynced = 0;
assert( p );
#ifdef LSM_LOG_FREELIST
{
static int nCall = 0;
char *zFree = 0;
nCall++;
rc = lsmInfoFreelist(pDb, &zFree);
if( rc!=LSM_OK ) return rc;
lsmLogMessage(pDb, 0, "lsmBlockAllocate(): %d freelist: %s", nCall, zFree);
lsmFree(pDb->pEnv, zFree);
}
#endif
/* Compute iInUse, the oldest snapshot id that may still be in use. */
rc = lsmCheckpointSynced(pDb, &iSynced, 0, 0);
if( rc==LSM_OK && iSynced==0 ) iSynced = p->iId;
iInUse = iSynced;
if( rc==LSM_OK && pDb->iReader>=0 ){
assert( pDb->pClient );
iInUse = LSM_MIN(iInUse, pDb->pClient->iId);
}
if( rc==LSM_OK ) rc = firstSnapshotInUse(pDb, &iInUse);
#ifdef LSM_LOG_FREELIST
{
lsmLogMessage(pDb, 0, "lsmBlockAllocate(): "
"snapshot-in-use: %lld (iSynced=%lld) (client-id=%lld)",
iInUse, iSynced, (pDb->iReader>=0 ? pDb->pClient->iId : 0)
);
}
#endif
/* Free blocks are not reused while a read-only transaction exists. */
if( rc==LSM_OK ){
int bRotrans;
rc = lsmDetectRoTrans(pDb, &bRotrans);
if( rc==LSM_OK && bRotrans==0 ){
rc = findFreeblock(pDb, iInUse, (iBefore>0), &iRet);
}
}
if( iBefore>0 && (iRet<=0 || iRet>=iBefore) ){
/* No suitable block below iBefore: report 0, do not extend the file. */
iRet = 0;
}else if( rc==LSM_OK ){
if( iRet>0 ){
#ifdef LSM_LOG_FREELIST
lsmLogMessage(pDb, 0,
"reusing block %d (snapshot-in-use=%lld)", iRet, iInUse);
#endif
rc = freelistAppend(pDb, iRet, -1);
if( rc==LSM_OK ){
rc = dbTruncate(pDb, iInUse);
}
}else{
iRet = ++(p->nBlock);
#ifdef LSM_LOG_FREELIST
lsmLogMessage(pDb, 0, "extending file to %d blocks", iRet);
#endif
}
}
assert( iBefore>0 || iRet>0 || rc!=LSM_OK );
*piBlk = iRet;
return rc;
}
/*
** Add block iBlk to the freelist, tagged with the worker snapshot's id.
** Requires the WORKER lock and a worker snapshot.
*/
int lsmBlockFree(lsm_db *pDb, int iBlk){
  Snapshot *pWorker = pDb->pWorker;

  assert( lsmShmAssertWorker(pDb) );
#ifdef LSM_LOG_FREELIST
  lsmLogMessage(pDb, LSM_OK, "lsmBlockFree(): Free block %d", iBlk);
#endif
  return freelistAppend(pDb, iBlk, pWorker->iId);
}

/*
** Add block iBlk back to the freelist with snapshot id 0.
*/
int lsmBlockRefree(lsm_db *pDb, int iBlk){
#ifdef LSM_LOG_FREELIST
  lsmLogMessage(pDb, LSM_OK, "lsmBlockRefree(): Refree block %d", iBlk);
#endif
  return freelistAppend(pDb, iBlk, 0);
}
/*
** Write the most recent checkpoint to a database meta page, unless the
** current meta page already holds it or a newer one.
**
** Holds CHECKPOINTER exclusively (non-blocking) for the duration. Writing
** means:
**   1. sync the database file (unless eSafety is LSM_SAFETY_OFF);
**   2. store the checkpoint in the other meta page ((iMetaPage%2)+1,
**      i.e. alternating 1 and 2);
**   3. sync again (unless eSafety is LSM_SAFETY_OFF);
**   4. record the new page in pShm->iMetaPage.
**
** If pnWrite is non-NULL and the call succeeds, *pnWrite is set to nWrite.
** After a write, nWrite is the difference between the NWrite counters of
** the new and the previously stored checkpoint.
** NOTE(review): when the checkpoint was already stored (bDone), nWrite is
** the on-disk NWrite value itself, not 0. Confirm this is intended.
*/
int lsmCheckpointWrite(lsm_db *pDb, u32 *pnWrite){
int rc;
u32 nWrite = 0;
assert( pDb->pWorker==0 );
assert( 1 || pDb->pClient==0 );
assert( lsmShmAssertLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_UNLOCK) );
rc = lsmShmLock(pDb, LSM_LOCK_CHECKPOINTER, LSM_LOCK_EXCL, 0);
if( rc!=LSM_OK ) return rc;
rc = lsmCheckpointLoad(pDb, 0);
if( rc==LSM_OK ){
int nBlock = lsmCheckpointNBlock(pDb->aSnapshot);
ShmHeader *pShm = pDb->pShmhdr;
int bDone = 0;
/* Is the loaded checkpoint already on disk (same or newer id)? */
if( pShm->iMetaPage ){
MetaPage *pPg;
u8 *aData;
int nData;
i64 iCkpt;
i64 iDisk = 0;
iCkpt = lsmCheckpointId(pDb->aSnapshot, 0);
rc = lsmFsMetaPageGet(pDb->pFS, 0, pShm->iMetaPage, &pPg);
if( rc==LSM_OK ){
aData = lsmFsMetaPageData(pPg, &nData);
iDisk = lsmCheckpointId((u32 *)aData, 1);
nWrite = lsmCheckpointNWrite((u32 *)aData, 1);
lsmFsMetaPageRelease(pPg);
}
bDone = (iDisk>=iCkpt);
}
if( rc==LSM_OK && bDone==0 ){
/* Write to the meta page not currently in use. */
int iMeta = (pShm->iMetaPage % 2) + 1;
if( pDb->eSafety!=LSM_SAFETY_OFF ){
rc = lsmFsSyncDb(pDb->pFS, nBlock);
}
if( rc==LSM_OK ) rc = lsmCheckpointStore(pDb, iMeta);
if( rc==LSM_OK && pDb->eSafety!=LSM_SAFETY_OFF){
rc = lsmFsSyncDb(pDb->pFS, 0);
}
if( rc==LSM_OK ){
pShm->iMetaPage = iMeta;
nWrite = lsmCheckpointNWrite(pDb->aSnapshot, 0) - nWrite;
}
#ifdef LSM_LOG_WORK
lsmLogMessage(pDb, 0, "finish checkpoint %d",
(int)lsmCheckpointId(pDb->aSnapshot, 0)
);
#endif
}
}
lsmShmLock(pDb, LSM_LOCK_CHECKPOINTER, LSM_LOCK_UNLOCK, 0);
if( pnWrite && rc==LSM_OK ) *pnWrite = nWrite;
return rc;
}
/*
** Begin a unit of work: take the WORKER lock exclusively (non-blocking),
** then load the worker snapshot. Returns the first failing result code,
** or LSM_OK.
*/
int lsmBeginWork(lsm_db *pDb){
  int rc = lsmShmLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_EXCL, 0);
  if( rc!=LSM_OK ) return rc;
  return lsmCheckpointLoadWorker(pDb);
}

/*
** Free snapshot p together with its level list, freelist entries and
** redirect array. A NULL p is a no-op.
*/
void lsmFreeSnapshot(lsm_env *pEnv, Snapshot *p){
  if( p==0 ) return;
  lsmSortedFreeLevel(pEnv, p->pLevel);
  lsmFree(pEnv, p->freelist.aEntry);
  lsmFree(pEnv, p->redirect.a);
  lsmFree(pEnv, p);
}
/*
** Make sure some reader slot in shared memory advertises the pair
** (iLsm, iShm).
**
** If a slot already holds exactly that pair, nothing is done. Otherwise
** the pair is written into the first slot that can be locked exclusively;
** slots busy in other connections are skipped. The exclusive lock is
** released immediately after writing.
**
** Returns LSM_OK (including when every slot is busy) or an error code.
**
** Fix: an error other than LSM_BUSY from the exclusive-lock attempt
** previously still overwrote the slot without holding its lock. The slot
** is now written only when the lock was actually obtained.
*/
static int dbSetReadLock(lsm_db *db, i64 iLsm, u32 iShm){
  int rc = LSM_OK;
  ShmHeader *pShm = db->pShmhdr;
  int i;

  for(i=0; i<LSM_LOCK_NREADER; i++){
    ShmReader *p = &pShm->aReader[i];
    if( p->iLsmId==iLsm && p->iTreeId==iShm ) return LSM_OK;
  }

  for(i=0; rc==LSM_OK && i<LSM_LOCK_NREADER; i++){
    rc = lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_EXCL, 0);
    if( rc==LSM_BUSY ){
      rc = LSM_OK;
    }else if( rc==LSM_OK ){
      ShmReader *p = &pShm->aReader[i];
      p->iLsmId = iLsm;
      p->iTreeId = iShm;
      lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_UNLOCK, 0);
      break;
    }
  }
  return rc;
}
/*
** Release the reader-slot lock held by db, if any, and clear bRoTrans.
** Returns the result of the unlock, or LSM_OK if no slot was held.
*/
int dbReleaseReadlock(lsm_db *db){
  int rc = LSM_OK;
  int iSlot = db->iReader;

  if( iSlot>=0 ){
    rc = lsmShmLock(db, LSM_LOCK_READER(iSlot), LSM_LOCK_UNLOCK, 0);
    db->iReader = -1;
  }
  db->bRoTrans = 0;
  return rc;
}
/*
** Finish a unit of work started by lsmBeginWork().
**
** If *pRc is LSM_OK, the worker snapshot is saved. A reader slot is then
** set to advertise the new snapshot id and tree id; the tree header is
** loaded first if no read lock is held. The worker snapshot is freed and
** the WORKER lock released in every case. *pRc receives the final code.
*/
void lsmFinishWork(lsm_db *pDb, int bFlush, int *pRc){
  int rc = *pRc;

  assert( rc!=0 || pDb->pWorker );
  if( pDb->pWorker ){
    if( rc==LSM_OK ) rc = lsmSaveWorker(pDb, bFlush);
    if( rc==LSM_OK && pDb->iReader<0 ) rc = lsmTreeLoadHeader(pDb, 0);
    if( rc==LSM_OK ){
      rc = dbSetReadLock(pDb, pDb->pWorker->iId, pDb->treehdr.iUsedShmid);
    }
    lsmFreeSnapshot(pDb->pEnv, pDb->pWorker);
    pDb->pWorker = 0;
  }

  lsmShmLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_UNLOCK, 0);
  *pRc = rc;
}

/* End recovery by committing the tree transaction. Always LSM_OK. */
int lsmFinishRecovery(lsm_db *pDb){
  lsmTreeEndTransaction(pDb, 1);
  return LSM_OK;
}
/*
** Check that compression id iReq can be handled by pDb.
**
** LSM_COMPRESSION_EMPTY and the currently configured id are accepted
** directly. Otherwise the factory callback, if any, is invoked (with
** bInFactory set) so it can install a matching compression. LSM_MISMATCH
** is returned if the configured id still differs afterwards.
*/
int lsmCheckCompressionId(lsm_db *pDb, u32 iReq){
  if( iReq==LSM_COMPRESSION_EMPTY || pDb->compress.iId==iReq ){
    return LSM_OK;
  }

  if( pDb->factory.xFactory ){
    pDb->bInFactory = 1;
    pDb->factory.xFactory(pDb->factory.pCtx, pDb, iReq);
    pDb->bInFactory = 0;
  }

  return (pDb->compress.iId==iReq) ? LSM_OK : LSM_MISMATCH;
}
/*
** Open a read transaction on pDb.
**
** Each attempt of the retry loop (at most 1 attempt for a read-only
** transaction, otherwise MAX_READLOCK_ATTEMPTS):
**   1. Load the tree header. If the cached client snapshot is stale, drop
**      it together with the cursor and page caches and reload the
**      checkpoint.
**   2. Obtain a reader slot covering the checkpoint id and the usable
**      shm-id range.
**   3. Re-validate the header and checkpoint (lsmTreeLoadHeaderOk() and
**      lsmCheckpointLoadOk()), presumably to detect concurrent changes.
**      If they are still valid, deserialize the client snapshot and check
**      its compression id. If not, release the slot and retry.
** LSM_BUSY inside an attempt just triggers another attempt.
**
** Afterwards enough shm chunks are cached for the tree. On error the read
** lock is dropped. LSM_BUSY is returned if no client snapshot was obtained.
**
** Fix: a malformed "#if 0#endif" line (the "#endif" fused onto the "#if")
** at the end of the loop body made the file fail to preprocess. It is
** removed; the code it bracketed was disabled anyway.
*/
int lsmBeginReadTrans(lsm_db *pDb){
  const int MAX_READLOCK_ATTEMPTS = 10;
  const int nMaxAttempt = (pDb->bRoTrans ? 1 : MAX_READLOCK_ATTEMPTS);
  int rc = LSM_OK;
  int iAttempt = 0;

  assert( pDb->pWorker==0 );

  while( rc==LSM_OK && pDb->iReader<0 && (iAttempt++)<nMaxAttempt ){
    int iTreehdr = 0;
    int iSnap = 0;
    assert( pDb->pCsr==0 && pDb->nTransOpen==0 );

    rc = lsmTreeLoadHeader(pDb, &iTreehdr);
    if( rc==LSM_OK ){
      if( lsmCheckpointClientCacheOk(pDb)==0 ){
        lsmFreeSnapshot(pDb->pEnv, pDb->pClient);
        pDb->pClient = 0;
        lsmMCursorFreeCache(pDb);
        lsmFsPurgeCache(pDb->pFS);
        rc = lsmCheckpointLoad(pDb, &iSnap);
      }else{
        iSnap = 1;
      }
    }

    if( rc==LSM_OK ){
      u32 iShmMax = pDb->treehdr.iUsedShmid;
      u32 iShmMin = pDb->treehdr.iNextShmid+1-LSM_MAX_SHMCHUNKS;
      rc = lsmReadlock(
          pDb, lsmCheckpointId(pDb->aSnapshot, 0), iShmMin, iShmMax
      );
      if( rc==LSM_OK ){
        if( lsmTreeLoadHeaderOk(pDb, iTreehdr)
         && lsmCheckpointLoadOk(pDb, iSnap)
        ){
          if( pDb->pClient==0 ){
            rc = lsmCheckpointDeserialize(pDb, 0, pDb->aSnapshot,&pDb->pClient);
          }
          assert( (rc==LSM_OK)==(pDb->pClient!=0) );
          assert( pDb->iReader>=0 );
          if( rc==LSM_OK ){
            rc = lsmCheckCompressionId(pDb, pDb->pClient->iCmpId);
          }
        }else{
          rc = dbReleaseReadlock(pDb);
        }
      }
      if( rc==LSM_BUSY ){
        rc = LSM_OK;
      }
    }
  }

  if( rc==LSM_OK ){
    rc = lsmShmCacheChunks(pDb, pDb->treehdr.nChunk);
  }
  if( rc!=LSM_OK ){
    dbReleaseReadlock(pDb);
  }
  if( pDb->pClient==0 && rc==LSM_OK ) rc = LSM_BUSY;
  return rc;
}
/*
** Set *pbExist to 1 if some connection holds the ROTRANS lock (i.e. a
** read-only transaction exists), otherwise 0. Returns LSM_OK in both
** cases, or the error code from lsmShmTestLock(); on error *pbExist is 0.
*/
int lsmDetectRoTrans(lsm_db *db, int *pbExist){
  int rc;
  int bBusy;

  assert( db->bReadonly==0 );
  rc = lsmShmTestLock(db, LSM_LOCK_ROTRANS, 1, LSM_LOCK_EXCL);
  bBusy = (rc==LSM_BUSY);
  *pbExist = bBusy;
  return bBusy ? LSM_OK : rc;
}
/*
** Open a read transaction on a read-only connection, which has no
** shared-memory header yet.
**
** With DMS1 held shared, the RWCLIENT slots are tested:
**   - If no read-write connection exists, ROTRANS is taken shared and a
**     private, heap-allocated shm region is built by recovering the
**     checkpoint and log (bRoTrans is set).
**   - Otherwise DMS3 is taken shared and the live shared memory is mapped.
** The checkpoint is reloaded if needed and its page and block sizes are
** applied. Finally lsmBeginReadTrans() is called.
**
** Fixes:
**   - If lsmShmTestLock() failed with an error other than LSM_BUSY, the
**     DMS1 lock was never released. It now is.
**   - The RWCLIENT range was tested with LSM_LOCK_NREADER locks, but
**     doDbConnect() may claim any of the LSM_LOCK_NRWCLIENT slots, so
**     read-write connections in higher slots went unnoticed. The whole
**     range is now tested.
*/
int lsmBeginRoTrans(lsm_db *db){
  int rc = LSM_OK;

  assert( db->bReadonly && db->pShmhdr==0 );
  assert( db->iReader<0 );

  if( db->bRoTrans==0 ){

    rc = lsmShmLock(db, LSM_LOCK_DMS1, LSM_LOCK_SHARED, 0);
    if( rc!=LSM_OK ) return rc;

    rc = lsmShmTestLock(
        db, LSM_LOCK_RWCLIENT(0), LSM_LOCK_NRWCLIENT, LSM_LOCK_SHARED
    );
    if( rc==LSM_OK ){
      /* No read-write connections: recover a private copy of shm. */
      rc = lsmShmLock(db, LSM_LOCK_ROTRANS, LSM_LOCK_SHARED, 0);
      lsmShmLock(db, LSM_LOCK_DMS1, LSM_LOCK_UNLOCK, 0);
      if( rc==LSM_OK ){
        db->bRoTrans = 1;
        rc = lsmShmCacheChunks(db, 1);
        if( rc==LSM_OK ){
          db->pShmhdr = (ShmHeader *)db->apShm[0];
          memset(db->pShmhdr, 0, sizeof(ShmHeader));
          rc = lsmCheckpointRecover(db);
          if( rc==LSM_OK ){
            rc = lsmLogRecover(db);
          }
        }
      }
    }else if( rc==LSM_BUSY ){
      /* A read-write connection exists: attach to its shared memory. */
      rc = lsmShmLock(db, LSM_LOCK_DMS3, LSM_LOCK_SHARED, 0);
      lsmShmLock(db, LSM_LOCK_DMS1, LSM_LOCK_UNLOCK, 0);
      if( rc==LSM_OK ){
        rc = lsmShmCacheChunks(db, 1);
        if( rc==LSM_OK ){
          db->pShmhdr = (ShmHeader *)db->apShm[0];
        }
      }
    }else{
      /* The lock test itself failed: do not leak DMS1. */
      lsmShmLock(db, LSM_LOCK_DMS1, LSM_LOCK_UNLOCK, 0);
    }

    if( LSM_OK==rc
     && 0==lsmCheckpointClientCacheOk(db)
     && LSM_OK==(rc=lsmCheckpointLoad(db, 0))
    ){
      lsmFsSetPageSize(db->pFS, lsmCheckpointPgsz(db->aSnapshot));
      lsmFsSetBlockSize(db->pFS, lsmCheckpointBlksz(db->aSnapshot));
    }

    if( rc==LSM_OK ){
      rc = lsmBeginReadTrans(db);
    }
  }

  return rc;
}
/*
** End the current read transaction.
**
** For a read-only transaction (bRoTrans), the connection's private
** shared-memory chunks and their pointer array are freed, the shm fields
** are reset and ROTRANS is unlocked. In all cases the reader-slot lock is
** then released via dbReleaseReadlock(), which also clears bRoTrans.
*/
void lsmFinishReadTrans(lsm_db *pDb){
  assert( pDb->pWorker==0 );
  assert( pDb->pCsr==0 && pDb->nTransOpen==0 );

  if( pDb->bRoTrans ){
    lsm_env *pEnv = pDb->pEnv;
    int iChunk;

    for(iChunk=0; iChunk<pDb->nShm; iChunk++){
      lsmFree(pEnv, pDb->apShm[iChunk]);
    }
    lsmFree(pEnv, pDb->apShm);
    pDb->apShm = 0;
    pDb->nShm = 0;
    pDb->pShmhdr = 0;
    lsmShmLock(pDb, LSM_LOCK_ROTRANS, LSM_LOCK_UNLOCK, 0);
  }

  dbReleaseReadlock(pDb);
}
/*
** Open a write transaction on read-write connection pDb.
**
** Steps:
**   1. Open a read transaction if none is open.
**   2. Take WRITER exclusively.
**   3. Repair the tree if shm says a writer was active (bWriter set; the
**      previous writer presumably did not finish cleanly -- confirm).
**   4. Return LSM_BUSY if the shared tree header (hdr1) differs from this
**      connection's copy.
**   5. Begin the log, mark a writer active, bump the transaction id, and
**      discard the old tree if its log offset matches the client snapshot.
**
** On failure WRITER is released and, if no cursor is open, the read
** transaction is closed as well.
*/
int lsmBeginWriteTrans(lsm_db *pDb){
int rc = LSM_OK;
ShmHeader *pShm = pDb->pShmhdr;
assert( pDb->nTransOpen==0 );
assert( pDb->bDiscardOld==0 );
assert( pDb->bReadonly==0 );
if( pDb->iReader<0 ){
rc = lsmBeginReadTrans(pDb);
}
if( rc==LSM_OK ){
rc = lsmShmLock(pDb, LSM_LOCK_WRITER, LSM_LOCK_EXCL, 0);
}
if( rc==LSM_OK && pShm->bWriter ){
rc = lsmTreeRepair(pDb);
if( rc==LSM_OK ) pShm->bWriter = 0;
}
/* The read snapshot must match the current shared tree header. */
if( rc==LSM_OK && memcmp(&pShm->hdr1, &pDb->treehdr, sizeof(TreeHeader)) ){
rc = LSM_BUSY;
}
if( rc==LSM_OK ){
rc = lsmLogBegin(pDb);
}
if( rc==LSM_OK ){
TreeHeader *p = &pDb->treehdr;
pShm->bWriter = 1;
p->root.iTransId++;
if( lsmTreeHasOld(pDb) && p->iOldLog==pDb->pClient->iLogOff ){
lsmTreeDiscardOld(pDb);
pDb->bDiscardOld = 1;
}
}else{
lsmShmLock(pDb, LSM_LOCK_WRITER, LSM_LOCK_UNLOCK, 0);
if( pDb->pCsr==0 ) lsmFinishReadTrans(pDb);
}
return rc;
}
/*
** Commit (bCommit!=0) or roll back the current write transaction.
**
** The log is ended; its result is not checked. On commit, a tree larger
** than nTreeLimit is made "old" and a flush is requested. A flush is then
** either run via lsmSortedAutoWork() (when bAutowork is set) or signalled
** to the xWork hook after WRITER is released. If no flush is needed and
** the old tree was discarded in this transaction, the reader slot is
** refreshed.
*/
int lsmFinishWriteTrans(lsm_db *pDb, int bCommit){
int rc = LSM_OK;
int bFlush = 0;
lsmLogEnd(pDb, bCommit);
/* rc is always LSM_OK at this point. */
if( rc==LSM_OK && bCommit && lsmTreeSize(pDb)>pDb->nTreeLimit ){
bFlush = 1;
lsmTreeMakeOld(pDb);
}
lsmTreeEndTransaction(pDb, bCommit);
if( rc==LSM_OK ){
if( bFlush && pDb->bAutowork ){
rc = lsmSortedAutoWork(pDb, 1);
}else if( bCommit && pDb->bDiscardOld ){
rc = dbSetReadLock(pDb, pDb->pClient->iId, pDb->treehdr.iUsedShmid);
}
}
pDb->bDiscardOld = 0;
lsmShmLock(pDb, LSM_LOCK_WRITER, LSM_LOCK_UNLOCK, 0);
/* Without autowork, notify the application's work hook instead. */
if( bFlush && pDb->bAutowork==0 && pDb->xWork ){
pDb->xWork(pDb, pDb->pWorkCtx);
}
return rc;
}
#ifdef LSM_DEBUG
/*
** Debug builds only: return lsmMutexHeld() for the client mutex of the
** Database that pDb is attached to.
*/
int lsmHoldingClientMutex(lsm_db *pDb){
  Database *pShared = pDb->pDatabase;
  return lsmMutexHeld(pDb->pEnv, pShared->pClientMutex);
}
#endif

/*
** Return 1 if reader slot p may be used by a reader of snapshot iLsm whose
** usable shm ids span [iShmMin, iShmMax], else 0.
**
** The slot must hold a non-zero snapshot id no newer than iLsm, and its
** tree id must lie within the (wrap-aware) shm-id range.
*/
static int slotIsUsable(ShmReader *p, i64 iLsm, u32 iShmMin, u32 iShmMax){
  if( p->iLsmId==0 || p->iLsmId>iLsm ) return 0;
  if( shm_sequence_ge(iShmMax, p->iTreeId)
   && shm_sequence_ge(p->iTreeId, iShmMin)
  ){
    return 1;
  }
  return 0;
}
/*
** Obtain a shared lock on a reader slot that protects snapshot iLsm and
** shm ids up to iShmMax. On success db->iReader is set.
**
** A read-only transaction takes no lock; it simply uses slot 0.
** Otherwise three passes are tried in order:
**   1. a slot that already advertises exactly (iLsm, iShmMax);
**   2. a slot that can be locked exclusively: it is rewritten with
**      (iLsm, iShmMax) and downgraded to shared;
**   3. any slot judged usable by slotIsUsable().
**
** Returns LSM_OK, LSM_BUSY if no slot could be obtained, or an error.
**
** Fixes:
**   - In passes 1 and 3, the shared lock was kept (and never released) if
**     the slot changed between the unlocked check and the lock. It is now
**     released.
**   - In pass 2, an error other than LSM_BUSY from the exclusive lock
**     still caused the slot to be written without the lock, and the error
**     was then overwritten by the downgrade call. The slot is now written
**     only when the lock was obtained, and the error is returned.
*/
int lsmReadlock(lsm_db *db, i64 iLsm, u32 iShmMin, u32 iShmMax){
  int rc = LSM_OK;
  ShmHeader *pShm = db->pShmhdr;
  int i;

  assert( db->iReader<0 );
  assert( shm_sequence_ge(iShmMax, iShmMin) );

  if( db->bRoTrans ){
    db->iReader = 0;
    return LSM_OK;
  }

  /* Pass 1: exact match. */
  for(i=0; db->iReader<0 && rc==LSM_OK && i<LSM_LOCK_NREADER; i++){
    ShmReader *p = &pShm->aReader[i];
    if( p->iLsmId==iLsm && p->iTreeId==iShmMax ){
      rc = lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_SHARED, 0);
      if( rc==LSM_OK ){
        if( p->iLsmId==iLsm && p->iTreeId==iShmMax ){
          db->iReader = i;
        }else{
          /* Slot changed before the lock was obtained. */
          lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_UNLOCK, 0);
        }
      }else if( rc==LSM_BUSY ){
        rc = LSM_OK;
      }
    }
  }

  /* Pass 2: claim an idle slot and rewrite it. */
  for(i=0; db->iReader<0 && rc==LSM_OK && i<LSM_LOCK_NREADER; i++){
    rc = lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_EXCL, 0);
    if( rc==LSM_BUSY ){
      rc = LSM_OK;
    }else if( rc==LSM_OK ){
      ShmReader *p = &pShm->aReader[i];
      p->iLsmId = iLsm;
      p->iTreeId = iShmMax;
      rc = lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_SHARED, 0);
      assert( rc!=LSM_BUSY );
      if( rc==LSM_OK ) db->iReader = i;
    }
  }

  /* Pass 3: any slot whose advertised values are usable. */
  for(i=0; db->iReader<0 && rc==LSM_OK && i<LSM_LOCK_NREADER; i++){
    ShmReader *p = &pShm->aReader[i];
    if( slotIsUsable(p, iLsm, iShmMin, iShmMax) ){
      rc = lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_SHARED, 0);
      if( rc==LSM_OK ){
        if( slotIsUsable(p, iLsm, iShmMin, iShmMax) ){
          db->iReader = i;
        }else{
          /* Slot changed before the lock was obtained. */
          lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_UNLOCK, 0);
        }
      }else if( rc==LSM_BUSY ){
        rc = LSM_OK;
      }
    }
  }

  if( rc==LSM_OK && db->iReader<0 ){
    rc = LSM_BUSY;
  }
  return rc;
}
/*
** Determine whether any reader may still be using snapshot iLsmId (when
** iLsmId!=0) or shm id iShmid (when iLsmId==0).
**
** Each non-empty slot advertising an old-enough snapshot (or new-enough
** tree id) is probed with an exclusive lock:
**   - if the lock is obtained, the slot is stale, so it is cleared and
**     unlocked;
**   - if the lock is busy, a reader is active: *pbInUse is set to 1 and
**     LSM_OK is returned.
** Otherwise *pbInUse is 0 and the last lock result (LSM_OK or an error)
** is returned.
*/
static int isInUse(lsm_db *db, i64 iLsmId, u32 iShmid, int *pbInUse){
  ShmHeader *pShm = db->pShmhdr;
  int rc = LSM_OK;
  int iSlot;

  for(iSlot=0; rc==LSM_OK && iSlot<LSM_LOCK_NREADER; iSlot++){
    ShmReader *pSlot = &pShm->aReader[iSlot];
    int bCandidate;

    if( pSlot->iLsmId==0 ) continue;
    if( iLsmId!=0 ){
      bCandidate = (iLsmId>=pSlot->iLsmId);
    }else{
      bCandidate = shm_sequence_ge(pSlot->iTreeId, iShmid);
    }
    if( !bCandidate ) continue;

    rc = lsmShmLock(db, LSM_LOCK_READER(iSlot), LSM_LOCK_EXCL, 0);
    if( rc==LSM_OK ){
      pSlot->iLsmId = 0;
      lsmShmLock(db, LSM_LOCK_READER(iSlot), LSM_LOCK_UNLOCK, 0);
    }
  }

  *pbInUse = (rc==LSM_BUSY);
  return (rc==LSM_BUSY) ? LSM_OK : rc;
}
/*
** Lower *piInUse to the oldest snapshot id still held by a reader.
**
** On entry *piInUse is an upper bound (> 0). Every reader slot
** advertising an older snapshot is probed with an exclusive lock:
**   - if the lock is obtained, the slot is stale and is cleared;
**   - if the lock is busy, that snapshot becomes the new bound.
** Any other lock error is returned immediately, leaving *piInUse
** unchanged.
*/
static int firstSnapshotInUse(
  lsm_db *db,               /* Database connection */
  i64 *piInUse              /* IN/OUT: oldest snapshot id in use */
){
  ShmHeader *pShm = db->pShmhdr;
  i64 iMin = *piInUse;
  int iSlot;

  assert( iMin>0 );
  for(iSlot=0; iSlot<LSM_LOCK_NREADER; iSlot++){
    ShmReader *pSlot = &pShm->aReader[iSlot];
    i64 iThis = pSlot->iLsmId;
    int rc;

    if( iThis==0 || iThis>=iMin ) continue;

    rc = lsmShmLock(db, LSM_LOCK_READER(iSlot), LSM_LOCK_EXCL, 0);
    if( rc==LSM_BUSY ){
      iMin = iThis;
    }else if( rc==LSM_OK ){
      pSlot->iLsmId = 0;
      lsmShmLock(db, LSM_LOCK_READER(iSlot), LSM_LOCK_UNLOCK, 0);
    }else{
      return rc;
    }
  }

  *piInUse = iMin;
  return LSM_OK;
}
/*
** Set *pbInUse to indicate whether shm id iShmid may still be in use. It
** is in use if it is this connection's current tree id; otherwise the
** reader slots are consulted via isInUse().
*/
int lsmTreeInUse(lsm_db *db, u32 iShmid, int *pbInUse){
  if( db->treehdr.iUsedShmid!=iShmid ){
    return isInUse(db, 0, iShmid, pbInUse);
  }
  *pbInUse = 1;
  return LSM_OK;
}

/*
** Set *pbInUse to indicate whether snapshot iLsmId may still be in use.
** It is in use if this connection's client snapshot is not newer;
** otherwise the reader slots are consulted via isInUse().
*/
int lsmLsmInUse(lsm_db *db, i64 iLsmId, int *pbInUse){
  int bOwn = (db->pClient!=0 && db->pClient->iId<=iLsmId);
  if( !bOwn ){
    return isInUse(db, iLsmId, 0, pbInUse);
  }
  *pbInUse = 1;
  return LSM_OK;
}

/* Return 1 if pDb is attached to a multi-process Database, else 0. */
int lsmDbMultiProc(lsm_db *pDb){
  Database *pShared = pDb->pDatabase;
  return (pShared!=0 && pShared->bMultiProc);
}
/*
** Make sure db->apShm[0..nChunk-1] point at shared-memory chunks.
**
** Both pointer arrays (db->apShm here, Database.apShmChunk below) grow in
** multiples of NINCR entries.
**
** During a read-only transaction (bRoTrans) the chunks are private,
** zeroed heap allocations owned by this connection. Otherwise they are
** shared through the Database object under the client mutex: missing
** chunks are heap-allocated (single-process) or mapped with
** lsmEnvShmMap() (multi-process), then cached in db->apShm.
**
** Returns LSM_OK, LSM_NOMEM_BKPT, or an error from allocation or mapping.
*/
int lsmShmCacheChunks(lsm_db *db, int nChunk){
int rc = LSM_OK;
if( nChunk>db->nShm ){
static const int NINCR = 16;
Database *p = db->pDatabase;
lsm_env *pEnv = db->pEnv;
int nAlloc;
int i;
/* Grow this connection's pointer array to more than nChunk entries. */
nAlloc = ((db->nShm + NINCR - 1) / NINCR) * NINCR;
while( nChunk>=nAlloc ){
void **apShm;
nAlloc += NINCR;
apShm = lsmRealloc(pEnv, db->apShm, sizeof(void*)*nAlloc);
if( !apShm ) return LSM_NOMEM_BKPT;
db->apShm = apShm;
}
if( db->bRoTrans ){
/* Read-only transaction: private zeroed heap chunks.
** NOTE(review): nShm is incremented even when the allocation fails,
** leaving a NULL entry; callers are expected to abandon the
** transaction on error. */
for(i=db->nShm; rc==LSM_OK && i<nChunk; i++){
db->apShm[i] = lsmMallocZeroRc(pEnv, LSM_SHM_CHUNK_SIZE, &rc);
db->nShm++;
}
}else{
lsmMutexEnter(pEnv, p->pClientMutex);
/* Grow the shared pointer array as well. */
nAlloc = ((p->nShmChunk + NINCR - 1) / NINCR) * NINCR;
while( nChunk>=nAlloc ){
void **apShm;
nAlloc += NINCR;
apShm = lsmRealloc(pEnv, p->apShmChunk, sizeof(void*)*nAlloc);
if( !apShm ){
rc = LSM_NOMEM_BKPT;
break;
}
p->apShmChunk = apShm;
}
for(i=db->nShm; rc==LSM_OK && i<nChunk; i++){
/* Create the shared chunk if no connection has done so yet. */
if( i>=p->nShmChunk ){
void *pChunk = 0;
if( p->bMultiProc==0 ){
pChunk = lsmMallocZeroRc(pEnv, LSM_SHM_CHUNK_SIZE, &rc);
}else{
rc = lsmEnvShmMap(pEnv, p->pFile, i, LSM_SHM_CHUNK_SIZE, &pChunk);
}
if( rc==LSM_OK ){
p->apShmChunk[i] = pChunk;
p->nShmChunk++;
}
}
if( rc==LSM_OK ){
db->apShm[i] = p->apShmChunk[i];
db->nShm++;
}
}
lsmMutexLeave(pEnv, p->pClientMutex);
}
}
return rc;
}
/*
** Apply lock operation eOp to lock iLock on the shared file. This only
** has an effect for multi-process databases; otherwise it is a no-op
** returning LSM_OK.
*/
static int lockSharedFile(lsm_env *pEnv, Database *p, int iLock, int eOp){
  if( !p->bMultiProc ) return LSM_OK;
  return lsmEnvLock(pEnv, p->pFile, iLock, eOp);
}
/*
** Test whether locks iLock..iLock+nLock-1 could be obtained in mode eOp
** (LSM_LOCK_SHARED or LSM_LOCK_EXCL) without actually taking them.
**
** Other connections in this process are checked through their mLock
** bitmasks, as maintained by lsmShmLock():
**   - bit (iLock-1) is set while a lock is held exclusively;
**   - bit (iLock+31) is set while it is held in either mode.
** A shared request conflicts with exclusive holders only. An exclusive
** request conflicts with any holder. If no in-process conflict is found
** and the Database is multi-process, the file locks are tested as well.
**
** Returns LSM_OK if all locks are available, LSM_BUSY if not, or an error.
**
** Fix: the mask loop shifted by iLock instead of the loop variable i, so
** only the first lock in the range was checked against in-process
** connections.
*/
int lsmShmTestLock(
  lsm_db *db,
  int iLock,
  int nLock,
  int eOp
){
  int rc = LSM_OK;
  lsm_db *pIter;
  Database *p = db->pDatabase;
  int i;
  u64 mask = 0;

  for(i=iLock; i<(iLock+nLock); i++){
    mask |= ((u64)1 << (i-1));
    if( eOp==LSM_LOCK_EXCL ) mask |= ((u64)1 << (i+32-1));
  }

  lsmMutexEnter(db->pEnv, p->pClientMutex);
  for(pIter=p->pConn; pIter; pIter=pIter->pNext){
    if( pIter!=db && (pIter->mLock & mask) ){
      assert( pIter!=db );
      break;
    }
  }

  if( pIter ){
    rc = LSM_BUSY;
  }else if( p->bMultiProc ){
    rc = lsmEnvTestLock(db->pEnv, p->pFile, iLock, nLock, eOp);
  }

  lsmMutexLeave(db->pEnv, p->pClientMutex);
  return rc;
}
/*
** Set the state of shared lock iLock for connection db to eOp
** (LSM_LOCK_UNLOCK, LSM_LOCK_SHARED or LSM_LOCK_EXCL).
**
** Lock state is recorded per connection in db->mLock:
**   - bit me = (iLock-1) is set while the lock is held exclusively;
**   - bit ms = (iLock+31) is set while it is held in either mode.
** The call returns LSM_OK at once if db is already in the requested
** state. Otherwise, under the client mutex, the holds of other in-process
** connections are counted:
**   - LSM_BUSY is returned on a conflict;
**   - the file lock (via lockSharedFile()) is taken, changed or released
**     only when no other in-process connection shares it.
** bBlock is not referenced by this implementation.
*/
int lsmShmLock(
lsm_db *db,
int iLock,
int eOp,
int bBlock
){
lsm_db *pIter;
const u64 me = ((u64)1 << (iLock-1));
const u64 ms = ((u64)1 << (iLock+32-1));
int rc = LSM_OK;
Database *p = db->pDatabase;
assert( eOp!=LSM_LOCK_EXCL || p->bReadonly==0 );
assert( iLock>=1 && iLock<=LSM_LOCK_RWCLIENT(LSM_LOCK_NRWCLIENT-1) );
assert( LSM_LOCK_RWCLIENT(LSM_LOCK_NRWCLIENT-1)<=32 );
assert( eOp==LSM_LOCK_UNLOCK || eOp==LSM_LOCK_SHARED || eOp==LSM_LOCK_EXCL );
/* Skip all work if db already holds the lock in the requested mode. */
if( (eOp==LSM_LOCK_UNLOCK && (db->mLock & (me|ms))!=0)
|| (eOp==LSM_LOCK_SHARED && (db->mLock & (me|ms))!=ms)
|| (eOp==LSM_LOCK_EXCL && (db->mLock & me)==0)
){
int nExcl = 0;
int nShared = 0;
lsmMutexEnter(db->pEnv, p->pClientMutex);
/* Count other in-process connections holding this lock. */
for(pIter=p->pConn; pIter; pIter=pIter->pNext){
assert( (pIter->mLock & me)==0 || (pIter->mLock & ms)!=0 );
if( pIter!=db ){
if( pIter->mLock & me ){
nExcl++;
}else if( pIter->mLock & ms ){
nShared++;
}
}
}
assert( nExcl==0 || nExcl==1 );
assert( nExcl==0 || nShared==0 );
assert( nExcl==0 || (db->mLock & (me|ms))==0 );
switch( eOp ){
case LSM_LOCK_UNLOCK:
/* Drop the file lock only if no other connection still shares it. */
if( nShared==0 ){
lockSharedFile(db->pEnv, p, iLock, LSM_LOCK_UNLOCK);
}
db->mLock &= ~(me|ms);
break;
case LSM_LOCK_SHARED:
/* Also handles downgrading from EXCL to SHARED. */
if( nExcl ){
rc = LSM_BUSY;
}else{
if( nShared==0 ){
rc = lockSharedFile(db->pEnv, p, iLock, LSM_LOCK_SHARED);
}
if( rc==LSM_OK ){
db->mLock |= ms;
db->mLock &= ~me;
}
}
break;
default:
assert( eOp==LSM_LOCK_EXCL );
if( nExcl || nShared ){
rc = LSM_BUSY;
}else{
rc = lockSharedFile(db->pEnv, p, iLock, LSM_LOCK_EXCL);
if( rc==LSM_OK ){
db->mLock |= (me|ms);
}
}
break;
}
lsmMutexLeave(db->pEnv, p->pClientMutex);
}
return rc;
}
#ifdef LSM_DEBUG
/*
** Return the mode (LSM_LOCK_UNLOCK, _SHARED or _EXCL) in which db holds
** lock iLock, according to its mLock bitmask. iLock must be in 1..32,
** otherwise the shifts below are undefined.
*/
int shmLockType(lsm_db *db, int iLock){
  const u64 me = ((u64)1 << (iLock-1));
  const u64 ms = ((u64)1 << (iLock+32-1));
  assert( iLock>=1 && iLock<=32 );
  if( db->mLock & me ) return LSM_LOCK_EXCL;
  if( db->mLock & ms ) return LSM_LOCK_SHARED;
  return LSM_LOCK_UNLOCK;
}

/*
** Debug assertion helper. Return true if db holds lock iLock in a way
** that satisfies eOp: UNLOCK means not held, SHARED means held in any
** mode, EXCL means held exclusively.
*/
int lsmShmAssertLock(lsm_db *db, int iLock, int eOp){
  int ret = 0;
  int eHave;

  assert( iLock>=1 && iLock<=LSM_LOCK_READER(LSM_LOCK_NREADER-1) );
  assert( iLock<=16 );
  assert( eOp==LSM_LOCK_UNLOCK || eOp==LSM_LOCK_SHARED || eOp==LSM_LOCK_EXCL );

  eHave = shmLockType(db, iLock);
  switch( eOp ){
    case LSM_LOCK_UNLOCK:
      ret = (eHave==LSM_LOCK_UNLOCK);
      break;
    case LSM_LOCK_SHARED:
      ret = (eHave!=LSM_LOCK_UNLOCK);
      break;
    case LSM_LOCK_EXCL:
      ret = (eHave==LSM_LOCK_EXCL);
      break;
    default:
      assert( !"bad eOp value passed to lsmShmAssertLock()" );
      break;
  }
  return ret;
}

/* True if db holds WORKER exclusively and has a worker snapshot. */
int lsmShmAssertWorker(lsm_db *db){
  return lsmShmAssertLock(db, LSM_LOCK_WORKER, LSM_LOCK_EXCL) && db->pWorker;
}

/*
** Debugging aid: print the locks held by db on one line.
**
** Fixes:
**   - The loop started at iLock=0, making shmLockType() shift by -1
**     (undefined behaviour). It now starts at 1.
**   - Lock numbers beyond the end of azName[] were read out of bounds.
**     Such locks are now printed as "lock<N>".
**   - bOne was reset on every iteration, so entries were never separated
**     by spaces.
**
** NOTE(review): azName[] may not match the current LSM_LOCK_* numbering
** (e.g. DMS3 is not listed). Confirm against lsmInt.h.
*/
void print_db_locks(lsm_db *db){
  static const char *azLock[] = {0, "shared", "exclusive"};
  static const char *azName[] = {
    0, "dms1", "dms2", "writer", "worker", "checkpointer",
    "reader0", "reader1", "reader2", "reader3", "reader4", "reader5"
  };
  const int nName = (int)(sizeof(azName)/sizeof(azName[0]));
  int bOne = 0;
  int iLock;

  for(iLock=1; iLock<16; iLock++){
    int eHave = shmLockType(db, iLock);
    if( azLock[eHave] ){
      if( iLock<nName ){
        printf("%s(%s on %s)", (bOne?" ":""), azLock[eHave], azName[iLock]);
      }else{
        printf("%s(%s on lock%d)", (bOne?" ":""), azLock[eHave], iLock);
      }
      bOne = 1;
    }
  }
  printf("\n");
}

/*
** Debugging aid: print the locks of every connection sharing db's
** Database. db itself is marked with '*'.
*/
void print_all_db_locks(lsm_db *db){
  lsm_db *p;
  for(p=db->pDatabase->pConn; p; p=p->pNext){
    printf("%s connection %p ", ((p==db)?"*":""), (void *)p);
    print_db_locks(p);
  }
}
#endif
/* Issue a shared-memory barrier through the connection's environment. */
void lsmShmBarrier(lsm_db *db){
  lsm_env *pEnv = db->pEnv;
  lsmEnvShmBarrier(pEnv);
}

/*
** Public API: write a checkpoint (see lsmCheckpointWrite()).
**
** If pnKB is not NULL, it receives the number of pages reported by
** lsmCheckpointWrite() times the page size, rounded up to whole KB. It is
** 0 if the call failed or no pages were reported.
*/
int lsm_checkpoint(lsm_db *pDb, int *pnKB){
  u32 nWrite = 0;
  int rc = lsmCheckpointWrite(pDb, &nWrite);

  if( pnKB ){
    i64 nByte = 0;
    if( rc==LSM_OK && nWrite ){
      nByte = (i64)nWrite * lsmFsPageSize(pDb->pFS);
      *pnKB = (int)((nByte + 1023) / 1024);
    }else{
      *pnKB = 0;
    }
  }
  return rc;
}