#ifndef _LSM_INT_H
# include "lsmInt.h"
#endif
#define LSM_LOG_EOF 0x00
#define LSM_LOG_PAD1 0x01
#define LSM_LOG_PAD2 0x02
#define LSM_LOG_COMMIT 0x03
#define LSM_LOG_JUMP 0x04
#define LSM_LOG_WRITE 0x06
#define LSM_LOG_WRITE_CKSUM 0x07
#define LSM_LOG_DELETE 0x08
#define LSM_LOG_DELETE_CKSUM 0x09
#define LSM_LOG_DRANGE 0x0A
#define LSM_LOG_DRANGE_CKSUM 0x0B
#define LSM_CKSUM_MAXDATA (32*1024)
#define LSM_MIN_LOGWRAP (128*1024)
struct LogWriter {
u32 cksum0;
u32 cksum1;
int iCksumBuf;
i64 iOff;
int szSector;
LogRegion jump;
i64 iRegion1End;
i64 iRegion2Start;
LsmString buf;
};
static u32 getU32le(u8 *aIn){
return ((u32)aIn[3] << 24)
+ ((u32)aIn[2] << 16)
+ ((u32)aIn[1] << 8)
+ ((u32)aIn[0]);
}
static void logCksumUnaligned(
char *z,
int n,
u32 *pCksum0,
u32 *pCksum1
){
u8 *a = (u8 *)z;
u32 cksum0 = *pCksum0;
u32 cksum1 = *pCksum1;
int nIn = (n/8) * 8;
int i;
assert( n>0 );
for(i=0; i<nIn; i+=8){
cksum0 += getU32le(&a[i]) + cksum1;
cksum1 += getU32le(&a[i+4]) + cksum0;
}
if( nIn!=n ){
u8 aBuf[8] = {0, 0, 0, 0, 0, 0, 0, 0};
assert( (n-nIn)<8 && n>nIn );
memcpy(aBuf, &a[nIn], n-nIn);
cksum0 += getU32le(aBuf) + cksum1;
cksum1 += getU32le(&aBuf[4]) + cksum0;
}
*pCksum0 = cksum0;
*pCksum1 = cksum1;
}
static void logUpdateCksum(LogWriter *pLog, int nBuf){
assert( (pLog->iCksumBuf % 8)==0 );
assert( pLog->iCksumBuf<=nBuf );
assert( (nBuf % 8)==0 || nBuf==pLog->buf.n );
if( nBuf>pLog->iCksumBuf ){
logCksumUnaligned(
&pLog->buf.z[pLog->iCksumBuf], nBuf-pLog->iCksumBuf,
&pLog->cksum0, &pLog->cksum1
);
}
pLog->iCksumBuf = nBuf;
}
static i64 firstByteOnSector(LogWriter *pLog, i64 iOff){
return (iOff / pLog->szSector) * pLog->szSector;
}
static i64 lastByteOnSector(LogWriter *pLog, i64 iOff){
return firstByteOnSector(pLog, iOff) + pLog->szSector - 1;
}
static int logReclaimSpace(lsm_db *pDb){
int rc;
int iMeta;
int bRotrans;
rc = lsmDetectRoTrans(pDb, &bRotrans);
if( rc!=LSM_OK || bRotrans ) return rc;
iMeta = (int)pDb->pShmhdr->iMetaPage;
if( iMeta==1 || iMeta==2 ){
DbLog *pLog = &pDb->treehdr.log;
i64 iSyncedId;
rc = lsmFsReadSyncedId(pDb, iMeta, &iSyncedId);
if( rc==LSM_OK && pLog->iSnapshotId!=iSyncedId ){
i64 iSnapshotId = 0;
i64 iOff = 0;
rc = lsmCheckpointSynced(pDb, &iSnapshotId, &iOff, 0);
if( rc==LSM_OK && pLog->iSnapshotId<iSnapshotId ){
int iRegion;
for(iRegion=0; iRegion<3; iRegion++){
LogRegion *p = &pLog->aRegion[iRegion];
if( iOff>=p->iStart && iOff<=p->iEnd ) break;
p->iStart = 0;
p->iEnd = 0;
}
assert( iRegion<3 );
pLog->aRegion[iRegion].iStart = iOff;
pLog->iSnapshotId = iSnapshotId;
}
}
}
return rc;
}
int lsmLogBegin(lsm_db *pDb){
int rc = LSM_OK;
LogWriter *pNew;
LogRegion *aReg;
if( pDb->bUseLog==0 ) return LSM_OK;
rc = lsmFsOpenLog(pDb, 0);
if( pDb->pLogWriter==0 ){
pNew = lsmMallocZeroRc(pDb->pEnv, sizeof(LogWriter), &rc);
if( pNew ){
lsmStringInit(&pNew->buf, pDb->pEnv);
rc = lsmStringExtend(&pNew->buf, 2);
}
pDb->pLogWriter = pNew;
}else{
pNew = pDb->pLogWriter;
assert( (u8 *)(&pNew[1])==(u8 *)(&((&pNew->buf)[1])) );
memset(pNew, 0, ((u8 *)&pNew->buf) - (u8 *)pNew);
pNew->buf.n = 0;
}
if( rc==LSM_OK ){
rc = logReclaimSpace(pDb);
}
if( rc!=LSM_OK ){
lsmLogClose(pDb);
return rc;
}
if( pDb->eSafety==LSM_SAFETY_FULL ){
pNew->szSector = lsmFsSectorSize(pDb->pFS);
assert( pNew->szSector>0 );
}else{
pNew->szSector = 1;
}
aReg = &pDb->treehdr.log.aRegion[0];
assert( aReg[0].iEnd==0 || aReg[0].iEnd>aReg[0].iStart );
assert( aReg[1].iEnd==0 || aReg[1].iEnd>aReg[1].iStart );
pNew->cksum0 = pDb->treehdr.log.cksum0;
pNew->cksum1 = pDb->treehdr.log.cksum1;
if( aReg[0].iEnd==0 && aReg[1].iEnd==0 && aReg[2].iStart>=LSM_MIN_LOGWRAP ){
u8 aJump[] = {
LSM_LOG_PAD2, 0x04, 0x00, 0x00, 0x00, 0x00, LSM_LOG_JUMP, 0x00
};
lsmStringBinAppend(&pNew->buf, aJump, sizeof(aJump));
logUpdateCksum(pNew, pNew->buf.n);
rc = lsmFsWriteLog(pDb->pFS, aReg[2].iEnd, &pNew->buf);
pNew->iCksumBuf = pNew->buf.n = 0;
aReg[2].iEnd += 8;
pNew->jump = aReg[0] = aReg[2];
aReg[2].iStart = aReg[2].iEnd = 0;
}else if( aReg[1].iEnd==0 && aReg[2].iEnd<aReg[0].iEnd ){
pNew->iOff = aReg[2].iEnd;
pNew->jump = aReg[0];
}else{
assert( aReg[2].iStart>=aReg[0].iEnd && aReg[2].iStart>=aReg[1].iEnd );
pNew->iOff = aReg[2].iEnd;
}
if( pNew->jump.iStart ){
i64 iRound;
assert( pNew->jump.iStart>pNew->iOff );
iRound = firstByteOnSector(pNew, pNew->jump.iStart);
if( iRound>pNew->iOff ) pNew->jump.iStart = iRound;
pNew->jump.iEnd = lastByteOnSector(pNew, pNew->jump.iEnd);
}
assert( pDb->pLogWriter==pNew );
return rc;
}
void lsmLogEnd(lsm_db *pDb, int bCommit){
DbLog *pLog;
LogWriter *p;
p = pDb->pLogWriter;
if( p==0 ) return;
pLog = &pDb->treehdr.log;
if( bCommit ){
pLog->aRegion[2].iEnd = p->iOff;
pLog->cksum0 = p->cksum0;
pLog->cksum1 = p->cksum1;
if( p->iRegion1End ){
assert( pLog->aRegion[1].iEnd==0 );
assert( pLog->aRegion[2].iStart<p->iRegion1End );
pLog->aRegion[1].iStart = pLog->aRegion[2].iStart;
pLog->aRegion[1].iEnd = p->iRegion1End;
pLog->aRegion[2].iStart = p->iRegion2Start;
}
}
}
static int jumpIfRequired(
lsm_db *pDb,
LogWriter *pLog,
int nReq,
int *pbJump
){
if( (pLog->jump.iStart > (pLog->iOff + pLog->buf.n))
&& (pLog->jump.iStart < (pLog->iOff + pLog->buf.n + (nReq + 17)))
){
int rc;
i64 iJump;
u8 aJump[10];
int nJump;
int nPad;
iJump = pLog->jump.iEnd+1;
aJump[0] = LSM_LOG_JUMP;
nJump = 1 + lsmVarintPut64(&aJump[1], iJump);
nPad = (pLog->buf.n + nJump) % 8;
if( nPad ){
u8 aPad[7] = {0,0,0,0,0,0,0};
nPad = 8-nPad;
if( nPad==1 ){
aPad[0] = LSM_LOG_PAD1;
}else{
aPad[0] = LSM_LOG_PAD2;
aPad[1] = (u8)(nPad-2);
}
rc = lsmStringBinAppend(&pLog->buf, aPad, nPad);
if( rc!=LSM_OK ) return rc;
}
rc = lsmStringBinAppend(&pLog->buf, aJump, nJump);
if( rc!=LSM_OK ) return rc;
assert( (pLog->buf.n % 8)==0 );
rc = lsmFsWriteLog(pDb->pFS, pLog->iOff, &pLog->buf);
if( rc!=LSM_OK ) return rc;
logUpdateCksum(pLog, pLog->buf.n);
pLog->iRegion1End = (pLog->iOff + pLog->buf.n);
pLog->iRegion2Start = iJump;
pLog->iOff = iJump;
pLog->iCksumBuf = pLog->buf.n = 0;
if( pbJump ) *pbJump = 1;
}
return LSM_OK;
}
static int logCksumAndFlush(lsm_db *pDb){
int rc;
LogWriter *pLog = pDb->pLogWriter;
logUpdateCksum(pLog, pLog->buf.n);
lsmPutU32((u8 *)&pLog->buf.z[pLog->buf.n], pLog->cksum0);
pLog->buf.n += 4;
lsmPutU32((u8 *)&pLog->buf.z[pLog->buf.n], pLog->cksum1);
pLog->buf.n += 4;
rc = lsmFsWriteLog(pDb->pFS, pLog->iOff, &pLog->buf);
pLog->iOff += pLog->buf.n;
pLog->iCksumBuf = pLog->buf.n = 0;
return rc;
}
static int logFlush(lsm_db *pDb, int eType){
int rc;
int nReq;
LogWriter *pLog = pDb->pLogWriter;
assert( eType==LSM_LOG_COMMIT );
assert( pLog );
nReq = 9;
if( eType==LSM_LOG_COMMIT && pLog->szSector>1 ) nReq += pLog->szSector + 17;
rc = jumpIfRequired(pDb, pLog, nReq, 0);
if( eType==LSM_LOG_COMMIT && pLog->szSector>1 ){
int nPad;
nPad = ((pLog->iOff + pLog->buf.n + 9) % pLog->szSector);
if( nPad ) nPad = pLog->szSector - nPad;
rc = lsmStringExtend(&pLog->buf, nPad);
if( rc!=LSM_OK ) return rc;
while( nPad ){
if( nPad==1 ){
pLog->buf.z[pLog->buf.n++] = LSM_LOG_PAD1;
nPad = 0;
}else{
int n = LSM_MIN(200, nPad-2);
pLog->buf.z[pLog->buf.n++] = LSM_LOG_PAD2;
pLog->buf.z[pLog->buf.n++] = (char)n;
nPad -= 2;
memset(&pLog->buf.z[pLog->buf.n], 0x2B, n);
pLog->buf.n += n;
nPad -= n;
}
}
}
rc = lsmStringExtend(&pLog->buf, 9);
if( rc!=LSM_OK ) return rc;
pLog->buf.z[pLog->buf.n++] = (char)eType;
memset(&pLog->buf.z[pLog->buf.n], 0, 8);
rc = logCksumAndFlush(pDb);
if( rc==LSM_OK && eType==LSM_LOG_COMMIT && pDb->eSafety==LSM_SAFETY_FULL ){
rc = lsmFsSyncLog(pDb->pFS);
}
return rc;
}
int lsmLogWrite(
lsm_db *pDb,
int eType,
void *pKey, int nKey,
void *pVal, int nVal
){
int rc = LSM_OK;
LogWriter *pLog;
int nReq;
int bCksum = 0;
assert( eType==LSM_WRITE || eType==LSM_DELETE || eType==LSM_DRANGE );
assert( LSM_LOG_WRITE==LSM_WRITE );
assert( LSM_LOG_DELETE==LSM_DELETE );
assert( LSM_LOG_DRANGE==LSM_DRANGE );
assert( (eType==LSM_LOG_DELETE)==(nVal<0) );
if( pDb->bUseLog==0 ) return LSM_OK;
pLog = pDb->pLogWriter;
nReq = 1 + lsmVarintLen32(nKey) + 8 + nKey;
if( eType!=LSM_LOG_DELETE ) nReq += lsmVarintLen32(nVal) + nVal;
rc = jumpIfRequired(pDb, pLog, nReq, &bCksum);
if( (pLog->buf.n+nReq) > LSM_CKSUM_MAXDATA ) bCksum = 1;
if( rc==LSM_OK ){
rc = lsmStringExtend(&pLog->buf, nReq);
}
if( rc==LSM_OK ){
u8 *a = (u8 *)&pLog->buf.z[pLog->buf.n];
assert( LSM_LOG_WRITE_CKSUM == (LSM_LOG_WRITE | 0x0001) );
assert( LSM_LOG_DELETE_CKSUM == (LSM_LOG_DELETE | 0x0001) );
assert( LSM_LOG_DRANGE_CKSUM == (LSM_LOG_DRANGE | 0x0001) );
*(a++) = (u8)eType | (u8)bCksum;
a += lsmVarintPut32(a, nKey);
if( eType!=LSM_LOG_DELETE ) a += lsmVarintPut32(a, nVal);
if( bCksum ){
pLog->buf.n = (a - (u8 *)pLog->buf.z);
rc = logCksumAndFlush(pDb);
a = (u8 *)&pLog->buf.z[pLog->buf.n];
}
memcpy(a, pKey, nKey);
a += nKey;
if( eType!=LSM_LOG_DELETE ){
memcpy(a, pVal, nVal);
a += nVal;
}
pLog->buf.n = a - (u8 *)pLog->buf.z;
assert( pLog->buf.n<=pLog->buf.nAlloc );
}
return rc;
}
int lsmLogCommit(lsm_db *pDb){
if( pDb->bUseLog==0 ) return LSM_OK;
return logFlush(pDb, LSM_LOG_COMMIT);
}
void lsmLogTell(
lsm_db *pDb,
LogMark *pMark
){
LogWriter *pLog;
int nCksum;
if( pDb->bUseLog==0 ) return;
pLog = pDb->pLogWriter;
nCksum = pLog->buf.n & 0xFFFFFFF8;
logUpdateCksum(pLog, nCksum);
assert( pLog->iCksumBuf==nCksum );
pMark->nBuf = pLog->buf.n - nCksum;
memcpy(pMark->aBuf, &pLog->buf.z[nCksum], pMark->nBuf);
pMark->iOff = pLog->iOff + pLog->buf.n;
pMark->cksum0 = pLog->cksum0;
pMark->cksum1 = pLog->cksum1;
}
void lsmLogSeek(
lsm_db *pDb,
LogMark *pMark
){
LogWriter *pLog;
if( pDb->bUseLog==0 ) return;
pLog = pDb->pLogWriter;
assert( pMark->iOff<=pLog->iOff+pLog->buf.n );
if( (pMark->iOff & 0xFFFFFFF8)>=pLog->iOff ){
pLog->buf.n = (int)(pMark->iOff - pLog->iOff);
pLog->iCksumBuf = (pLog->buf.n & 0xFFFFFFF8);
}else{
pLog->buf.n = pMark->nBuf;
memcpy(pLog->buf.z, pMark->aBuf, pMark->nBuf);
pLog->iCksumBuf = 0;
pLog->iOff = pMark->iOff - pMark->nBuf;
}
pLog->cksum0 = pMark->cksum0;
pLog->cksum1 = pMark->cksum1;
if( pMark->iOff > pLog->iRegion1End ) pLog->iRegion1End = 0;
if( pMark->iOff > pLog->iRegion2Start ) pLog->iRegion2Start = 0;
}
int lsmInfoLogStructure(lsm_db *pDb, char **pzVal){
int rc = LSM_OK;
char *zVal = 0;
if( pDb->pCsr==0 && pDb->nTransOpen==0 ){
rc = lsmTreeLoadHeader(pDb, 0);
if( rc==LSM_OK ) rc = logReclaimSpace(pDb);
}
if( rc==LSM_OK ){
DbLog *pLog = &pDb->treehdr.log;
zVal = lsmMallocPrintf(pDb->pEnv,
"%d %d %d %d %d %d",
(int)pLog->aRegion[0].iStart, (int)pLog->aRegion[0].iEnd,
(int)pLog->aRegion[1].iStart, (int)pLog->aRegion[1].iEnd,
(int)pLog->aRegion[2].iStart, (int)pLog->aRegion[2].iEnd
);
if( !zVal ) rc = LSM_NOMEM_BKPT;
}
*pzVal = zVal;
return rc;
}
typedef struct LogReader LogReader;
struct LogReader {
FileSystem *pFS;
i64 iOff;
int iBuf;
LsmString buf;
int iCksumBuf;
u32 cksum0;
u32 cksum1;
};
static void logReaderBlob(
LogReader *p,
LsmString *pBuf,
int nBlob,
u8 **ppBlob,
int *pRc
){
static const int LOG_READ_SIZE = 512;
int rc = *pRc;
int nReq = nBlob;
while( rc==LSM_OK && nReq>0 ){
int nAvail;
if( p->buf.n==p->iBuf ){
int nCksum;
int nCarry = 0;
nCksum = p->iBuf - p->iCksumBuf;
if( nCksum>0 ){
nCarry = nCksum % 8;
nCksum = ((nCksum / 8) * 8);
if( nCksum>0 ){
logCksumUnaligned(
&p->buf.z[p->iCksumBuf], nCksum, &p->cksum0, &p->cksum1
);
}
}
if( nCarry>0 ) memcpy(p->buf.z, &p->buf.z[p->iBuf-nCarry], nCarry);
p->buf.n = nCarry;
p->iBuf = nCarry;
rc = lsmFsReadLog(p->pFS, p->iOff, LOG_READ_SIZE, &p->buf);
if( rc!=LSM_OK ) break;
p->iCksumBuf = 0;
p->iOff += LOG_READ_SIZE;
}
nAvail = p->buf.n - p->iBuf;
if( ppBlob && nReq==nBlob && nBlob<=nAvail ){
*ppBlob = (u8 *)&p->buf.z[p->iBuf];
p->iBuf += nBlob;
nReq = 0;
}else{
int nCopy = LSM_MIN(nAvail, nReq);
if( nBlob==nReq ){
pBuf->n = 0;
}
rc = lsmStringBinAppend(pBuf, (u8 *)&p->buf.z[p->iBuf], nCopy);
nReq -= nCopy;
p->iBuf += nCopy;
if( nReq==0 && ppBlob ){
*ppBlob = (u8*)pBuf->z;
}
}
}
*pRc = rc;
}
static void logReaderVarint(
LogReader *p,
LsmString *pBuf,
int *piVal,
int *pRc
){
if( *pRc==LSM_OK ){
u8 *aVarint;
if( p->buf.n==p->iBuf ){
logReaderBlob(p, 0, 10, &aVarint, pRc);
if( LSM_OK==*pRc ) p->iBuf -= (10 - lsmVarintGet32(aVarint, piVal));
}else{
logReaderBlob(p, pBuf, lsmVarintSize(p->buf.z[p->iBuf]), &aVarint, pRc);
if( LSM_OK==*pRc ) lsmVarintGet32(aVarint, piVal);
}
}
}
static void logReaderByte(LogReader *p, u8 *pByte, int *pRc){
u8 *pPtr = 0;
logReaderBlob(p, 0, 1, &pPtr, pRc);
if( pPtr ) *pByte = *pPtr;
}
static void logReaderCksum(LogReader *p, LsmString *pBuf, int *pbEof, int *pRc){
if( *pRc==LSM_OK ){
u8 *pPtr = 0;
u32 cksum0, cksum1;
int nCksum = p->iBuf - p->iCksumBuf;
assert( nCksum>=0 );
logCksumUnaligned(&p->buf.z[p->iCksumBuf], nCksum, &p->cksum0, &p->cksum1);
p->iCksumBuf = p->iBuf + 8;
logReaderBlob(p, pBuf, 8, &pPtr, pRc);
assert( pPtr || *pRc );
if( pPtr ){
cksum0 = lsmGetU32(pPtr);
cksum1 = lsmGetU32(&pPtr[4]);
*pbEof = (cksum0!=p->cksum0 || cksum1!=p->cksum1);
p->iCksumBuf = p->iBuf;
}
}
}
static void logReaderInit(
lsm_db *pDb,
DbLog *pLog,
int bInitBuf,
LogReader *p
){
p->pFS = pDb->pFS;
p->iOff = pLog->aRegion[2].iStart;
p->cksum0 = pLog->cksum0;
p->cksum1 = pLog->cksum1;
if( bInitBuf ){ lsmStringInit(&p->buf, pDb->pEnv); }
p->buf.n = 0;
p->iCksumBuf = 0;
p->iBuf = 0;
}
static int logRequireCksum(LogReader *p, int nByte){
return ((p->iBuf + nByte - p->iCksumBuf) > LSM_CKSUM_MAXDATA);
}
int lsmLogRecover(lsm_db *pDb){
LsmString buf1;
LsmString buf2;
LogReader reader;
int rc = LSM_OK;
int nCommit = 0;
int iPass;
int nJump = 0;
DbLog *pLog;
int bOpen;
rc = lsmFsOpenLog(pDb, &bOpen);
if( rc!=LSM_OK ) return rc;
rc = lsmTreeInit(pDb);
if( rc!=LSM_OK ) return rc;
pLog = &pDb->treehdr.log;
lsmCheckpointLogoffset(pDb->pShmhdr->aSnap2, pLog);
logReaderInit(pDb, pLog, 1, &reader);
lsmStringInit(&buf1, pDb->pEnv);
lsmStringInit(&buf2, pDb->pEnv);
if( bOpen ){
for(iPass=0; iPass<2 && rc==LSM_OK; iPass++){
int bEof = 0;
while( rc==LSM_OK && !bEof ){
u8 eType = 0;
logReaderByte(&reader, &eType, &rc);
switch( eType ){
case LSM_LOG_PAD1:
break;
case LSM_LOG_PAD2: {
int nPad;
logReaderVarint(&reader, &buf1, &nPad, &rc);
logReaderBlob(&reader, &buf1, nPad, 0, &rc);
break;
}
case LSM_LOG_DRANGE:
case LSM_LOG_DRANGE_CKSUM:
case LSM_LOG_WRITE:
case LSM_LOG_WRITE_CKSUM: {
int nKey;
int nVal;
u8 *aVal;
logReaderVarint(&reader, &buf1, &nKey, &rc);
logReaderVarint(&reader, &buf2, &nVal, &rc);
if( eType==LSM_LOG_WRITE_CKSUM || eType==LSM_LOG_DRANGE_CKSUM ){
logReaderCksum(&reader, &buf1, &bEof, &rc);
}else{
bEof = logRequireCksum(&reader, nKey+nVal);
}
if( bEof ) break;
logReaderBlob(&reader, &buf1, nKey, 0, &rc);
logReaderBlob(&reader, &buf2, nVal, &aVal, &rc);
if( iPass==1 && rc==LSM_OK ){
if( eType==LSM_LOG_WRITE || eType==LSM_LOG_WRITE_CKSUM ){
rc = lsmTreeInsert(pDb, (u8 *)buf1.z, nKey, aVal, nVal);
}else{
rc = lsmTreeDelete(pDb, (u8 *)buf1.z, nKey, aVal, nVal);
}
}
break;
}
case LSM_LOG_DELETE:
case LSM_LOG_DELETE_CKSUM: {
int nKey; u8 *aKey;
logReaderVarint(&reader, &buf1, &nKey, &rc);
if( eType==LSM_LOG_DELETE_CKSUM ){
logReaderCksum(&reader, &buf1, &bEof, &rc);
}else{
bEof = logRequireCksum(&reader, nKey);
}
if( bEof ) break;
logReaderBlob(&reader, &buf1, nKey, &aKey, &rc);
if( iPass==1 && rc==LSM_OK ){
rc = lsmTreeInsert(pDb, aKey, nKey, NULL, -1);
}
break;
}
case LSM_LOG_COMMIT:
logReaderCksum(&reader, &buf1, &bEof, &rc);
if( bEof==0 ){
nCommit++;
assert( nCommit>0 || iPass==1 );
if( nCommit==0 ) bEof = 1;
}
break;
case LSM_LOG_JUMP: {
int iOff = 0;
logReaderVarint(&reader, &buf1, &iOff, &rc);
if( rc==LSM_OK ){
if( iPass==1 ){
if( pLog->aRegion[2].iStart==0 ){
assert( pLog->aRegion[1].iStart==0 );
pLog->aRegion[1].iEnd = reader.iOff;
}else{
assert( pLog->aRegion[0].iStart==0 );
pLog->aRegion[0].iStart = pLog->aRegion[2].iStart;
pLog->aRegion[0].iEnd = reader.iOff-reader.buf.n+reader.iBuf;
}
pLog->aRegion[2].iStart = iOff;
}else{
if( (nJump++)==2 ){
bEof = 1;
}
}
reader.iOff = iOff;
reader.buf.n = reader.iBuf;
}
break;
}
default:
bEof = 1;
break;
}
}
if( rc==LSM_OK && iPass==0 ){
if( nCommit==0 ){
if( pLog->aRegion[2].iStart==0 ){
iPass = 1;
}else{
pLog->aRegion[2].iStart = 0;
iPass = -1;
lsmCheckpointZeroLogoffset(pDb);
}
}
logReaderInit(pDb, pLog, 0, &reader);
nCommit = nCommit * -1;
}
}
}
if( rc==LSM_OK ){
pLog->aRegion[2].iEnd = reader.iOff - reader.buf.n + reader.iBuf;
pLog->cksum0 = reader.cksum0;
pLog->cksum1 = reader.cksum1;
}
if( rc==LSM_OK ){
rc = lsmFinishRecovery(pDb);
}else{
lsmFinishRecovery(pDb);
}
if( pDb->bRoTrans ){
lsmFsCloseLog(pDb);
}
lsmStringClear(&buf1);
lsmStringClear(&buf2);
lsmStringClear(&reader.buf);
return rc;
}
void lsmLogClose(lsm_db *db){
if( db->pLogWriter ){
lsmFree(db->pEnv, db->pLogWriter->buf.z);
lsmFree(db->pEnv, db->pLogWriter);
db->pLogWriter = 0;
}
}