#include "lsmInt.h"
#ifdef LSM_DEBUG
int lsmErrorBkpt(int rc){
return rc;
}
static void assert_db_state(lsm_db *pDb){
assert( (pDb->pCsr!=0||pDb->nTransOpen>0)==(pDb->iReader>=0||pDb->bRoTrans) );
assert( (pDb->iReader<0 && pDb->bRoTrans==0) || pDb->pClient!=0 );
assert( pDb->nTransOpen>=0 );
}
#else
# define assert_db_state(x)
#endif
static int xCmp(void *p1, int n1, void *p2, int n2){
int res;
res = memcmp(p1, p2, LSM_MIN(n1, n2));
if( res==0 ) res = (n1-n2);
return res;
}
static void xLog(void *pCtx, int rc, const char *z){
(void)(rc);
(void)(pCtx);
fprintf(stderr, "%s\n", z);
fflush(stderr);
}
int lsm_new(lsm_env *pEnv, lsm_db **ppDb){
lsm_db *pDb;
if( pEnv==0 ) pEnv = lsm_default_env();
assert( pEnv );
*ppDb = pDb = (lsm_db *)lsmMallocZero(pEnv, sizeof(lsm_db));
if( pDb==0 ) return LSM_NOMEM_BKPT;
pDb->pEnv = pEnv;
pDb->nTreeLimit = LSM_DFLT_AUTOFLUSH;
pDb->nAutockpt = LSM_DFLT_AUTOCHECKPOINT;
pDb->bAutowork = LSM_DFLT_AUTOWORK;
pDb->eSafety = LSM_DFLT_SAFETY;
pDb->xCmp = xCmp;
pDb->nDfltPgsz = LSM_DFLT_PAGE_SIZE;
pDb->nDfltBlksz = LSM_DFLT_BLOCK_SIZE;
pDb->nMerge = LSM_DFLT_AUTOMERGE;
pDb->nMaxFreelist = LSM_MAX_FREELIST_ENTRIES;
pDb->bUseLog = LSM_DFLT_USE_LOG;
pDb->iReader = -1;
pDb->iRwclient = -1;
pDb->bMultiProc = LSM_DFLT_MULTIPLE_PROCESSES;
pDb->iMmap = LSM_DFLT_MMAP;
pDb->xLog = xLog;
pDb->compress.iId = LSM_COMPRESSION_NONE;
return LSM_OK;
}
lsm_env *lsm_get_env(lsm_db *pDb){
assert( pDb->pEnv );
return pDb->pEnv;
}
static void dbReleaseClientSnapshot(lsm_db *pDb){
if( pDb->nTransOpen==0 && pDb->pCsr==0 ){
lsmFinishReadTrans(pDb);
}
}
static int getFullpathname(
lsm_env *pEnv,
const char *zRel,
char **pzAbs
){
int nAlloc = 0;
char *zAlloc = 0;
int nReq = 0;
int rc;
do{
nAlloc = nReq;
rc = pEnv->xFullpath(pEnv, zRel, zAlloc, &nReq);
if( nReq>nAlloc ){
zAlloc = lsmReallocOrFreeRc(pEnv, zAlloc, nReq, &rc);
}
}while( nReq>nAlloc && rc==LSM_OK );
if( rc!=LSM_OK ){
lsmFree(pEnv, zAlloc);
zAlloc = 0;
}
*pzAbs = zAlloc;
return rc;
}
static void assertRwclientLockValue(lsm_db *db){
#ifndef NDEBUG
u64 msk;
u64 rwclient = 0;
if( db->iRwclient>=0 ){
rwclient = ((u64)1 << (LSM_LOCK_RWCLIENT(db->iRwclient)-1));
}
msk = ((u64)1 << (LSM_LOCK_RWCLIENT(LSM_LOCK_NRWCLIENT)-1)) - 1;
msk -= (((u64)1 << (LSM_LOCK_RWCLIENT(0)-1)) - 1);
assert( (db->mLock & msk)==rwclient );
#endif
}
int lsm_open(lsm_db *pDb, const char *zFilename){
int rc;
if( pDb->pDatabase ){
rc = LSM_MISUSE;
}else{
char *zFull;
rc = getFullpathname(pDb->pEnv, zFilename, &zFull);
assert( rc==LSM_OK || zFull==0 );
if( rc==LSM_OK ){
rc = lsmDbDatabaseConnect(pDb, zFull);
}
if( pDb->bReadonly==0 ){
if( rc==LSM_OK && LSM_OK==(rc = lsmCheckpointLoad(pDb, 0)) ){
lsmFsSetPageSize(pDb->pFS, lsmCheckpointPgsz(pDb->aSnapshot));
lsmFsSetBlockSize(pDb->pFS, lsmCheckpointBlksz(pDb->aSnapshot));
}
}
lsmFree(pDb->pEnv, zFull);
assertRwclientLockValue(pDb);
}
assert( pDb->bReadonly==0 || pDb->bReadonly==1 );
assert( rc!=LSM_OK || (pDb->pShmhdr==0)==(pDb->bReadonly==1) );
return rc;
}
int lsm_close(lsm_db *pDb){
int rc = LSM_OK;
if( pDb ){
assert_db_state(pDb);
if( pDb->pCsr || pDb->nTransOpen ){
rc = LSM_MISUSE_BKPT;
}else{
lsmMCursorFreeCache(pDb);
lsmFreeSnapshot(pDb->pEnv, pDb->pClient);
pDb->pClient = 0;
assertRwclientLockValue(pDb);
lsmDbDatabaseRelease(pDb);
lsmLogClose(pDb);
lsmFsClose(pDb->pFS);
if( pDb->factory.xFree ) pDb->factory.xFree(pDb->factory.pCtx);
if( pDb->compress.xFree ) pDb->compress.xFree(pDb->compress.pCtx);
lsmFree(pDb->pEnv, pDb->rollback.aArray);
lsmFree(pDb->pEnv, pDb->aTrans);
lsmFree(pDb->pEnv, pDb->apShm);
lsmFree(pDb->pEnv, pDb);
}
}
return rc;
}
int lsm_config(lsm_db *pDb, int eParam, ...){
int rc = LSM_OK;
va_list ap;
va_start(ap, eParam);
switch( eParam ){
case LSM_CONFIG_AUTOFLUSH: {
int *piVal = va_arg(ap, int *);
int iVal = *piVal;
if( iVal>=0 && iVal<=(1024*1024) ){
pDb->nTreeLimit = iVal*1024;
}
*piVal = (pDb->nTreeLimit / 1024);
break;
}
case LSM_CONFIG_AUTOWORK: {
int *piVal = va_arg(ap, int *);
if( *piVal>=0 ){
pDb->bAutowork = *piVal;
}
*piVal = pDb->bAutowork;
break;
}
case LSM_CONFIG_AUTOCHECKPOINT: {
int *piVal = va_arg(ap, int *);
if( *piVal>=0 ){
int iVal = *piVal;
pDb->nAutockpt = (i64)iVal * 1024;
}
*piVal = (int)(pDb->nAutockpt / 1024);
break;
}
case LSM_CONFIG_PAGE_SIZE: {
int *piVal = va_arg(ap, int *);
if( pDb->pDatabase ){
*piVal = lsmFsPageSize(pDb->pFS);
}else{
if( *piVal>=256 && *piVal<=65536 && ((*piVal-1) & *piVal)==0 ){
pDb->nDfltPgsz = *piVal;
}else{
*piVal = pDb->nDfltPgsz;
}
}
break;
}
case LSM_CONFIG_BLOCK_SIZE: {
int *piVal = va_arg(ap, int *);
if( pDb->pDatabase ){
*piVal = lsmFsBlockSize(pDb->pFS) / 1024;
}else{
int iVal = *piVal;
if( iVal>=64 && iVal<=65536 && ((iVal-1) & iVal)==0 ){
pDb->nDfltBlksz = iVal * 1024;
}else{
*piVal = pDb->nDfltBlksz / 1024;
}
}
break;
}
case LSM_CONFIG_SAFETY: {
int *piVal = va_arg(ap, int *);
if( *piVal>=0 && *piVal<=2 ){
pDb->eSafety = *piVal;
}
*piVal = pDb->eSafety;
break;
}
case LSM_CONFIG_MMAP: {
int *piVal = va_arg(ap, int *);
if( pDb->iReader<0 && *piVal>=0 ){
pDb->iMmap = *piVal;
rc = lsmFsConfigure(pDb);
}
*piVal = pDb->iMmap;
break;
}
case LSM_CONFIG_USE_LOG: {
int *piVal = va_arg(ap, int *);
if( pDb->nTransOpen==0 && (*piVal==0 || *piVal==1) ){
pDb->bUseLog = *piVal;
}
*piVal = pDb->bUseLog;
break;
}
case LSM_CONFIG_AUTOMERGE: {
int *piVal = va_arg(ap, int *);
if( *piVal>1 ) pDb->nMerge = *piVal;
*piVal = pDb->nMerge;
break;
}
case LSM_CONFIG_MAX_FREELIST: {
int *piVal = va_arg(ap, int *);
if( *piVal>=2 && *piVal<=LSM_MAX_FREELIST_ENTRIES ){
pDb->nMaxFreelist = *piVal;
}
*piVal = pDb->nMaxFreelist;
break;
}
case LSM_CONFIG_MULTIPLE_PROCESSES: {
int *piVal = va_arg(ap, int *);
if( pDb->pDatabase ){
*piVal = lsmDbMultiProc(pDb);
}else{
pDb->bMultiProc = *piVal = (*piVal!=0);
}
break;
}
case LSM_CONFIG_READONLY: {
int *piVal = va_arg(ap, int *);
if( pDb->pDatabase==0 && *piVal>=0 ){
pDb->bReadonly = *piVal = (*piVal!=0);
}
*piVal = pDb->bReadonly;
break;
}
case LSM_CONFIG_SET_COMPRESSION: {
lsm_compress *p = va_arg(ap, lsm_compress *);
if( pDb->iReader>=0 && pDb->bInFactory==0 ){
rc = LSM_MISUSE_BKPT;
}else{
if( pDb->compress.xFree ){
pDb->compress.xFree(pDb->compress.pCtx);
}
if( p->xBound==0 ){
memset(&pDb->compress, 0, sizeof(lsm_compress));
pDb->compress.iId = LSM_COMPRESSION_NONE;
}else{
memcpy(&pDb->compress, p, sizeof(lsm_compress));
}
rc = lsmFsConfigure(pDb);
}
break;
}
case LSM_CONFIG_SET_COMPRESSION_FACTORY: {
lsm_compress_factory *p = va_arg(ap, lsm_compress_factory *);
if( pDb->factory.xFree ){
pDb->factory.xFree(pDb->factory.pCtx);
}
memcpy(&pDb->factory, p, sizeof(lsm_compress_factory));
break;
}
case LSM_CONFIG_GET_COMPRESSION: {
lsm_compress *p = va_arg(ap, lsm_compress *);
memcpy(p, &pDb->compress, sizeof(lsm_compress));
break;
}
default:
rc = LSM_MISUSE;
break;
}
va_end(ap);
return rc;
}
void lsmAppendSegmentList(LsmString *pStr, char *zPre, Segment *pSeg){
lsmStringAppendf(pStr, "%s{%lld %lld %lld %lld}", zPre,
pSeg->iFirst, pSeg->iLastPg, pSeg->iRoot, pSeg->nSize
);
}
static int infoGetWorker(lsm_db *pDb, Snapshot **pp, int *pbUnlock){
int rc = LSM_OK;
assert( *pbUnlock==0 );
if( !pDb->pWorker ){
rc = lsmBeginWork(pDb);
if( rc!=LSM_OK ) return rc;
*pbUnlock = 1;
}
if( pp ) *pp = pDb->pWorker;
return rc;
}
static void infoFreeWorker(lsm_db *pDb, int bUnlock){
if( bUnlock ){
int rcdummy = LSM_BUSY;
lsmFinishWork(pDb, 0, &rcdummy);
}
}
int lsmStructList(
lsm_db *pDb,
char **pzOut
){
Level *pTopLevel = 0;
int rc = LSM_OK;
Level *p;
LsmString s;
Snapshot *pWorker;
int bUnlock = 0;
rc = infoGetWorker(pDb, &pWorker, &bUnlock);
if( rc!=LSM_OK ) return rc;
pTopLevel = lsmDbSnapshotLevel(pWorker);
lsmStringInit(&s, pDb->pEnv);
for(p=pTopLevel; rc==LSM_OK && p; p=p->pNext){
int i;
lsmStringAppendf(&s, "%s{%d", (s.n ? " " : ""), (int)p->iAge);
lsmAppendSegmentList(&s, " ", &p->lhs);
for(i=0; rc==LSM_OK && i<p->nRight; i++){
lsmAppendSegmentList(&s, " ", &p->aRhs[i]);
}
lsmStringAppend(&s, "}", 1);
}
rc = s.n>=0 ? LSM_OK : LSM_NOMEM;
infoFreeWorker(pDb, bUnlock);
*pzOut = s.z;
return rc;
}
static int infoFreelistCb(void *pCtx, int iBlk, i64 iSnapshot){
LsmString *pStr = (LsmString *)pCtx;
lsmStringAppendf(pStr, "%s{%d %lld}", (pStr->n?" ":""), iBlk, iSnapshot);
return 0;
}
int lsmInfoFreelist(lsm_db *pDb, char **pzOut){
Snapshot *pWorker;
int bUnlock = 0;
LsmString s;
int rc;
rc = infoGetWorker(pDb, &pWorker, &bUnlock);
if( rc!=LSM_OK ) return rc;
lsmStringInit(&s, pDb->pEnv);
rc = lsmWalkFreelist(pDb, 0, infoFreelistCb, &s);
if( rc!=LSM_OK ){
lsmFree(pDb->pEnv, s.z);
}else{
*pzOut = s.z;
}
infoFreeWorker(pDb, bUnlock);
return rc;
}
static int infoTreeSize(lsm_db *db, int *pnOldKB, int *pnNewKB){
ShmHeader *pShm = db->pShmhdr;
TreeHeader *p = &pShm->hdr1;
*pnNewKB = ((int)p->root.nByte + 1023) / 1024;
if( p->iOldShmid ){
if( p->iOldLog==lsmCheckpointLogOffset(pShm->aSnap1) ){
*pnOldKB = 0;
}else{
*pnOldKB = ((int)p->oldroot.nByte + 1023) / 1024;
}
}else{
*pnOldKB = 0;
}
return LSM_OK;
}
int lsm_info(lsm_db *pDb, int eParam, ...){
int rc = LSM_OK;
va_list ap;
va_start(ap, eParam);
switch( eParam ){
case LSM_INFO_NWRITE: {
int *piVal = va_arg(ap, int *);
*piVal = lsmFsNWrite(pDb->pFS);
break;
}
case LSM_INFO_NREAD: {
int *piVal = va_arg(ap, int *);
*piVal = lsmFsNRead(pDb->pFS);
break;
}
case LSM_INFO_DB_STRUCTURE: {
char **pzVal = va_arg(ap, char **);
rc = lsmStructList(pDb, pzVal);
break;
}
case LSM_INFO_ARRAY_STRUCTURE: {
LsmPgno pgno = va_arg(ap, LsmPgno);
char **pzVal = va_arg(ap, char **);
rc = lsmInfoArrayStructure(pDb, 0, pgno, pzVal);
break;
}
case LSM_INFO_ARRAY_PAGES: {
LsmPgno pgno = va_arg(ap, LsmPgno);
char **pzVal = va_arg(ap, char **);
rc = lsmInfoArrayPages(pDb, pgno, pzVal);
break;
}
case LSM_INFO_PAGE_HEX_DUMP:
case LSM_INFO_PAGE_ASCII_DUMP: {
LsmPgno pgno = va_arg(ap, LsmPgno);
char **pzVal = va_arg(ap, char **);
int bUnlock = 0;
rc = infoGetWorker(pDb, 0, &bUnlock);
if( rc==LSM_OK ){
int bHex = (eParam==LSM_INFO_PAGE_HEX_DUMP);
rc = lsmInfoPageDump(pDb, pgno, bHex, pzVal);
}
infoFreeWorker(pDb, bUnlock);
break;
}
case LSM_INFO_LOG_STRUCTURE: {
char **pzVal = va_arg(ap, char **);
rc = lsmInfoLogStructure(pDb, pzVal);
break;
}
case LSM_INFO_FREELIST: {
char **pzVal = va_arg(ap, char **);
rc = lsmInfoFreelist(pDb, pzVal);
break;
}
case LSM_INFO_CHECKPOINT_SIZE: {
int *pnKB = va_arg(ap, int *);
rc = lsmCheckpointSize(pDb, pnKB);
break;
}
case LSM_INFO_TREE_SIZE: {
int *pnOld = va_arg(ap, int *);
int *pnNew = va_arg(ap, int *);
rc = infoTreeSize(pDb, pnOld, pnNew);
break;
}
case LSM_INFO_COMPRESSION_ID: {
unsigned int *piOut = va_arg(ap, unsigned int *);
if( pDb->pClient ){
*piOut = pDb->pClient->iCmpId;
}else{
rc = lsmInfoCompressionId(pDb, piOut);
}
break;
}
default:
rc = LSM_MISUSE;
break;
}
va_end(ap);
return rc;
}
static int doWriteOp(
lsm_db *pDb,
int bDeleteRange,
const void *pKey, int nKey,
const void *pVal, int nVal
){
int rc = LSM_OK;
int bCommit = 0;
if( pDb->nTransOpen==0 ){
bCommit = 1;
rc = lsm_begin(pDb, 1);
}
if( rc==LSM_OK ){
int eType = (bDeleteRange ? LSM_DRANGE : (nVal>=0?LSM_WRITE:LSM_DELETE));
rc = lsmLogWrite(pDb, eType, (void *)pKey, nKey, (void *)pVal, nVal);
}
lsmSortedSaveTreeCursors(pDb);
if( rc==LSM_OK ){
int pgsz = lsmFsPageSize(pDb->pFS);
int nQuant = LSM_AUTOWORK_QUANT * pgsz;
int nBefore;
int nAfter;
int nDiff;
if( nQuant>pDb->nTreeLimit ){
nQuant = LSM_MAX(pDb->nTreeLimit, pgsz);
}
nBefore = lsmTreeSize(pDb);
if( bDeleteRange ){
rc = lsmTreeDelete(pDb, (void *)pKey, nKey, (void *)pVal, nVal);
}else{
rc = lsmTreeInsert(pDb, (void *)pKey, nKey, (void *)pVal, nVal);
}
nAfter = lsmTreeSize(pDb);
nDiff = (nAfter/nQuant) - (nBefore/nQuant);
if( rc==LSM_OK && pDb->bAutowork && nDiff!=0 ){
rc = lsmSortedAutoWork(pDb, nDiff * LSM_AUTOWORK_QUANT);
}
}
if( bCommit ){
if( rc==LSM_OK ){
rc = lsm_commit(pDb, 0);
}else{
lsm_rollback(pDb, 0);
}
}
return rc;
}
int lsm_insert(
lsm_db *db,
const void *pKey, int nKey,
const void *pVal, int nVal
){
return doWriteOp(db, 0, pKey, nKey, pVal, nVal);
}
int lsm_delete(lsm_db *db, const void *pKey, int nKey){
return doWriteOp(db, 0, pKey, nKey, 0, -1);
}
int lsm_delete_range(
lsm_db *db,
const void *pKey1, int nKey1,
const void *pKey2, int nKey2
){
int rc = LSM_OK;
if( db->xCmp((void *)pKey1, nKey1, (void *)pKey2, nKey2)<0 ){
rc = doWriteOp(db, 1, pKey1, nKey1, pKey2, nKey2);
}
return rc;
}
int lsm_csr_open(lsm_db *pDb, lsm_cursor **ppCsr){
int rc = LSM_OK;
MultiCursor *pCsr = 0;
assert_db_state(pDb);
if( pDb->pShmhdr==0 ){
assert( pDb->bReadonly );
rc = lsmBeginRoTrans(pDb);
}else if( pDb->iReader<0 ){
rc = lsmBeginReadTrans(pDb);
}
if( rc==LSM_OK ){
rc = lsmMCursorNew(pDb, &pCsr);
}
if( rc!=LSM_OK ){
lsmMCursorClose(pCsr, 0);
dbReleaseClientSnapshot(pDb);
}
assert_db_state(pDb);
*ppCsr = (lsm_cursor *)pCsr;
return rc;
}
int lsm_csr_close(lsm_cursor *p){
if( p ){
lsm_db *pDb = lsmMCursorDb((MultiCursor *)p);
assert_db_state(pDb);
lsmMCursorClose((MultiCursor *)p, 1);
dbReleaseClientSnapshot(pDb);
assert_db_state(pDb);
}
return LSM_OK;
}
int lsm_csr_seek(lsm_cursor *pCsr, const void *pKey, int nKey, int eSeek){
return lsmMCursorSeek((MultiCursor *)pCsr, 0, (void *)pKey, nKey, eSeek);
}
int lsm_csr_next(lsm_cursor *pCsr){
return lsmMCursorNext((MultiCursor *)pCsr);
}
int lsm_csr_prev(lsm_cursor *pCsr){
return lsmMCursorPrev((MultiCursor *)pCsr);
}
int lsm_csr_first(lsm_cursor *pCsr){
return lsmMCursorFirst((MultiCursor *)pCsr);
}
int lsm_csr_last(lsm_cursor *pCsr){
return lsmMCursorLast((MultiCursor *)pCsr);
}
int lsm_csr_valid(lsm_cursor *pCsr){
return lsmMCursorValid((MultiCursor *)pCsr);
}
int lsm_csr_key(lsm_cursor *pCsr, const void **ppKey, int *pnKey){
return lsmMCursorKey((MultiCursor *)pCsr, (void **)ppKey, pnKey);
}
int lsm_csr_value(lsm_cursor *pCsr, const void **ppVal, int *pnVal){
return lsmMCursorValue((MultiCursor *)pCsr, (void **)ppVal, pnVal);
}
void lsm_config_log(
lsm_db *pDb,
void (*xLog)(void *, int, const char *),
void *pCtx
){
pDb->xLog = xLog;
pDb->pLogCtx = pCtx;
}
void lsm_config_work_hook(
lsm_db *pDb,
void (*xWork)(lsm_db *, void *),
void *pCtx
){
pDb->xWork = xWork;
pDb->pWorkCtx = pCtx;
}
void lsmLogMessage(lsm_db *pDb, int rc, const char *zFormat, ...){
if( pDb->xLog ){
LsmString s;
va_list ap, ap2;
lsmStringInit(&s, pDb->pEnv);
va_start(ap, zFormat);
va_start(ap2, zFormat);
lsmStringVAppendf(&s, zFormat, ap, ap2);
va_end(ap);
va_end(ap2);
pDb->xLog(pDb->pLogCtx, rc, s.z);
lsmStringClear(&s);
}
}
int lsm_begin(lsm_db *pDb, int iLevel){
int rc;
assert_db_state( pDb );
rc = (pDb->bReadonly ? LSM_READONLY : LSM_OK);
if( iLevel<0 ) iLevel = pDb->nTransOpen + 1;
if( iLevel>pDb->nTransOpen ){
int i;
if( rc==LSM_OK && pDb->nTransAlloc<iLevel ){
TransMark *aNew;
int nByte = sizeof(TransMark) * (iLevel+1);
aNew = (TransMark *)lsmRealloc(pDb->pEnv, pDb->aTrans, nByte);
if( !aNew ){
rc = LSM_NOMEM;
}else{
nByte = sizeof(TransMark) * (iLevel+1 - pDb->nTransAlloc);
memset(&aNew[pDb->nTransAlloc], 0, nByte);
pDb->nTransAlloc = iLevel+1;
pDb->aTrans = aNew;
}
}
if( rc==LSM_OK && pDb->nTransOpen==0 ){
rc = lsmBeginWriteTrans(pDb);
}
if( rc==LSM_OK ){
for(i=pDb->nTransOpen; i<iLevel; i++){
lsmTreeMark(pDb, &pDb->aTrans[i].tree);
lsmLogTell(pDb, &pDb->aTrans[i].log);
}
pDb->nTransOpen = iLevel;
}
}
return rc;
}
int lsm_commit(lsm_db *pDb, int iLevel){
int rc = LSM_OK;
assert_db_state( pDb );
if( iLevel<0 ) iLevel = LSM_MAX(0, pDb->nTransOpen - 1);
if( iLevel<pDb->nTransOpen ){
if( iLevel==0 ){
int rc2;
if( rc==LSM_OK ) rc = lsmLogCommit(pDb);
if( rc==LSM_OK && pDb->eSafety==LSM_SAFETY_FULL ){
rc = lsmFsSyncLog(pDb->pFS);
}
rc2 = lsmFinishWriteTrans(pDb, (rc==LSM_OK));
if( rc==LSM_OK ) rc = rc2;
}
pDb->nTransOpen = iLevel;
}
dbReleaseClientSnapshot(pDb);
return rc;
}
int lsm_rollback(lsm_db *pDb, int iLevel){
int rc = LSM_OK;
assert_db_state( pDb );
if( pDb->nTransOpen ){
if( iLevel<0 ) iLevel = LSM_MAX(0, pDb->nTransOpen - 1);
if( iLevel<=pDb->nTransOpen ){
TransMark *pMark = &pDb->aTrans[(iLevel==0 ? 0 : iLevel-1)];
lsmTreeRollback(pDb, &pMark->tree);
if( iLevel ) lsmLogSeek(pDb, &pMark->log);
pDb->nTransOpen = iLevel;
}
if( pDb->nTransOpen==0 ){
lsmFinishWriteTrans(pDb, 0);
}
dbReleaseClientSnapshot(pDb);
}
return rc;
}
int lsm_get_user_version(lsm_db *pDb, unsigned int *piUsr){
int rc = LSM_OK;
assert_db_state(pDb);
if( pDb->pShmhdr==0 ){
assert( pDb->bReadonly );
rc = lsmBeginRoTrans(pDb);
}else if( pDb->iReader<0 ){
rc = lsmBeginReadTrans(pDb);
}
if( rc==LSM_OK ){
*piUsr = pDb->treehdr.iUsrVersion;
}
dbReleaseClientSnapshot(pDb);
assert_db_state(pDb);
return rc;
}
int lsm_set_user_version(lsm_db *pDb, unsigned int iUsr){
int rc = LSM_OK;
int bCommit = 0;
if( pDb->nTransOpen==0 ){
bCommit = 1;
rc = lsm_begin(pDb, 1);
}
if( rc==LSM_OK ){
pDb->treehdr.iUsrVersion = iUsr;
}
if( bCommit ){
if( rc==LSM_OK ){
rc = lsm_commit(pDb, 0);
}else{
lsm_rollback(pDb, 0);
}
}
return rc;
}