#include "lsmtest_tdb.h"
#include "lsm.h"
#include "lsmtest.h"
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#ifndef _WIN32
# include <unistd.h>
#endif
#include <stdio.h>
#ifndef _WIN32
# include <sys/time.h>
#endif
/* Short names for the structures declared in this file. */
typedef struct LsmDb LsmDb;
typedef struct LsmWorker LsmWorker;
typedef struct LsmFile LsmFile;
/*
** Defaults that testLsmOpen() stores in LsmDb.nMtMaxCkpt and
** LsmDb.nMtMinCkpt. Those fields are compared against the value reported
** by lsm_info(LSM_INFO_CHECKPOINT_SIZE), which this file keeps in variables
** named nKB - presumably kilobytes; confirm against lsm.h.
*/
#define LSMTEST_DFLT_MT_MAX_CKPT (8*1024)
#define LSMTEST_DFLT_MT_MIN_CKPT (2*1024)
#ifdef LSM_MUTEX_PTHREADS
#include <pthread.h>
/*
** Values for LsmWorker.eType. In worker_main() a LSMTEST_THREAD_CKPT thread
** calls lsm_checkpoint(); the other two call lsm_work(). Only
** LSMTEST_THREAD_WORKER additionally waits on, and signals, worker 1.
*/
#define LSMTEST_THREAD_CKPT 1
#define LSMTEST_THREAD_WORKER 2
#define LSMTEST_THREAD_WORKER_AC 3
/*
** State of one background thread (pthreads builds only).
*/
struct LsmWorker {
/* Test database handle this worker belongs to */
LsmDb *pDb;
/* Worker's own connection; set to 0 by mt_stop_worker() to request exit */
lsm_db *pWorker;
/* Thread running worker_main() */
pthread_t worker_thread;
/* Signalled by mt_signal_worker() and mt_stop_worker() */
pthread_cond_t worker_cond;
/* Protects pWorker, bDoWork and worker_rc */
pthread_mutex_t worker_mutex;
/* Set by mt_signal_worker(); cleared by worker_main() after each pass */
int bDoWork;
/* First fatal error hit by the thread; reported by test_lsm_close() */
int worker_rc;
/* One of the LSMTEST_THREAD_* values above */
int eType;
/* Not referenced anywhere in this file */
int bBlock;
};
#else
/* Placeholder used when threads are unavailable. */
struct LsmWorker { int worker_rc; int bBlock; };
#endif
/* Stops all background threads; defined near the end of this file. */
static void mt_shutdown(LsmDb *);
/*
** Return the process-wide "real" environment used by the testEnvXXX()
** wrappers. The structure is a private copy of lsm_default_env() that is
** populated the first time this function is called.
*/
lsm_env *tdb_lsm_env(void){
  static lsm_env sharedEnv;
  static int isReady = 0;

  if( !isReady ){
    memcpy(&sharedEnv, lsm_default_env(), sizeof(sharedEnv));
    isReady = 1;
  }
  return &sharedEnv;
}
typedef struct FileSector FileSector;
typedef struct FileData FileData;
/*
** Saved pre-image of a single sector. aOld is 0 when nothing has been saved
** for the sector since the last sync (see testEnvWrite()/testEnvSync()).
*/
struct FileSector {
/* szSector bytes read from the file before it was overwritten, or 0 */
u8 *aOld;
};
/*
** Per-file table of saved sectors used to simulate a system crash.
*/
struct FileData {
/* Number of entries allocated in aSector[] */
int nSector;
/* Array indexed by sector number */
FileSector *aSector;
};
/*
** Test database handle for the LSM back-end. Pointers to this structure are
** cast to and from (TestDb *), so "base" has to remain the first member.
*/
struct LsmDb {
/* Generic handle header (method table) */
TestDb base;
/* Copy of tdb_lsm_env() with the I/O methods replaced by testEnvXXX() */
lsm_env env;
/* Database file name; stored in the same allocation, after the struct */
char *zName;
/* Client connection */
lsm_db *db;
/* Cursor opened by test_lsm_begin() and closed by commit/rollback to 0 */
lsm_cursor *pCsr;
/* Buffer holding the value last returned by test_lsm_fetch() */
void *pBuf;
/* Allocated size of pBuf in bytes */
int nBuf;
/* True after a simulated crash: read/write/sync/truncate return LSM_IOERR */
int bCrashed;
/* If non-zero, number of xSync calls left before doSystemCrash() runs */
int nAutoCrash;
/* True to make testEnvWrite() save sector pre-images in aFile[] */
int bPrepareCrash;
/* Sector size used by the crash simulation (testLsmOpen() sets 256) */
int szSector;
/* Saved sectors: [0] for the database file, [1] for the "-log" file */
FileData aFile[2];
/* True to make the lock wrappers return LSM_BUSY for EXCL on lock 2 */
int bNoRecovery;
/* Optional user work hook and its context (tdb_lsm_config_work_hook()) */
void (*xWork)(lsm_db *, void *);
void *pWorkCtx;
/* Optional write/sync timing hook and its context (tdb_lsm_write_hook()) */
void (*xWriteHook)(void *, int, lsm_i64, int, int);
void *pWriteCtx;
/* Checkpointer thread runs lsm_checkpoint() once this size is reached */
int nMtMinCkpt;
/* waitOnCheckpointer() blocks while the checkpoint size is >= this */
int nMtMaxCkpt;
/* One of the LSMTEST_MODE_* values */
int eMode;
/* Number of entries of aWorker[] in use */
int nWorker;
/* Background thread state, allocated by testLsmStartWorkers() */
LsmWorker *aWorker;
};
/*
** Values for LsmDb.eMode (the "mt_mode=N" configuration parameter).
** testLsmStartWorkers() starts one checkpointer thread for BACKGROUND_CKPT,
** one worker thread for BACKGROUND_WORK and one of each for BACKGROUND_BOTH.
*/
#define LSMTEST_MODE_SINGLETHREAD 1
#define LSMTEST_MODE_BACKGROUND_CKPT 2
#define LSMTEST_MODE_BACKGROUND_WORK 3
#define LSMTEST_MODE_BACKGROUND_BOTH 4
/*
** File handle handed to LSM by testEnvOpen(). It wraps a handle opened
** through the real environment.
*/
struct LsmFile {
/* Handle returned by the real environment's xOpen() */
lsm_file *pReal;
/* True if the file name ends in "-log"; used as an index into aFile[] */
int bLog;
/* Test database handle that opened this file */
LsmDb *pDb;
};
/*
** xFullpath wrapper: forwards the request unchanged to the real
** environment. The pEnv argument is not used.
*/
static int testEnvFullpath(
  lsm_env *pEnv,
  const char *zFile,
  char *zOut,
  int *pnOut
){
  lsm_env *pBase = tdb_lsm_env();
  int rc;

  (void)pEnv;
  rc = pBase->xFullpath(pBase, zFile, zOut, pnOut);
  return rc;
}
/*
** xOpen wrapper. Allocates an LsmFile, records whether zFile names the log
** file (a name longer than four bytes ending in "-log") and opens the
** underlying file through the real environment.
**
** On success *ppFile is the new wrapper. On failure the wrapper is freed,
** *ppFile is set to 0 and the real environment's error code is returned.
*/
static int testEnvOpen(
  lsm_env *pEnv,
  const char *zFile,
  int flags,
  lsm_file **ppFile
){
  lsm_env *pBase = tdb_lsm_env();
  int nName = strlen(zFile);
  LsmFile *pNew = (LsmFile *)testMalloc(sizeof(LsmFile));
  int rc;

  pNew->pDb = (LsmDb *)pEnv->pVfsCtx;
  pNew->bLog = 0;
  if( nName>4 && memcmp("-log", &zFile[nName-4], 4)==0 ){
    pNew->bLog = 1;
  }

  rc = pBase->xOpen(pBase, zFile, flags, &pNew->pReal);
  if( rc==LSM_OK ){
    *ppFile = (lsm_file *)pNew;
  }else{
    testFree(pNew);
    *ppFile = 0;
  }
  return rc;
}
/*
** xRead wrapper. Fails with LSM_IOERR once a crash has been simulated on
** the owning database; otherwise reads through the real environment.
*/
static int testEnvRead(lsm_file *pFile, lsm_i64 iOff, void *pData, int nData){
  lsm_env *pBase = tdb_lsm_env();
  LsmFile *pWrap = (LsmFile *)pFile;
  int rc = LSM_IOERR;

  if( !pWrap->pDb->bCrashed ){
    rc = pBase->xRead(pWrap->pReal, iOff, pData, nData);
  }
  return rc;
}
/*
** xWrite wrapper.
**
**   * Returns LSM_IOERR without writing if a crash has been simulated.
**   * If crash preparation is enabled, the current contents of every sector
**     touched by the write are saved in LsmDb.aFile[] first (only the first
**     time a sector is written after a sync), so that doSystemCrash() can
**     later roll the sector back or corrupt it. The result of the xRead()
**     used to capture the old contents is not checked.
**   * If a write-hook is registered, the write is timed with gettimeofday()
**     and reported to the hook in microseconds.
*/
static int testEnvWrite(lsm_file *pFile, lsm_i64 iOff, void *pData, int nData){
  lsm_env *pBase = tdb_lsm_env();
  LsmFile *pWrap = (LsmFile *)pFile;
  LsmDb *pDb = pWrap->pDb;
  struct timeval tmStart;
  struct timeval tmEnd;
  int nUs;
  int rc;

  if( pDb->bCrashed ) return LSM_IOERR;

  if( pDb->bPrepareCrash ){
    FileData *pShadow = &pDb->aFile[pWrap->bLog];
    int iFirst = (int)(iOff / pDb->szSector);
    int iLast = (int)((iOff + nData - 1) / pDb->szSector);
    int iSector;

    /* Grow the sector table, in multiples of 64 entries, so that it
    ** covers sector iLast. New entries start out empty. */
    if( (iLast+1)>pShadow->nSector ){
      int nNew = ( ((iLast + 1) + 63) / 64 ) * 64;
      int nAdded;
      assert( nNew>iLast );
      pShadow->aSector = (FileSector *)testRealloc(
          pShadow->aSector, nNew*sizeof(FileSector)
      );
      nAdded = nNew - pShadow->nSector;
      memset(&pShadow->aSector[pShadow->nSector], 0, nAdded*sizeof(FileSector));
      pShadow->nSector = nNew;
    }

    /* Capture the pre-image of each sector not already saved. */
    for(iSector=iFirst; iSector<=iLast; iSector++){
      FileSector *pSector = &pShadow->aSector[iSector];
      if( pSector->aOld ) continue;
      pSector->aOld = (u8 *)testMalloc(pDb->szSector);
      pBase->xRead(
          pWrap->pReal, (lsm_i64)iSector*pDb->szSector,
          pSector->aOld, pDb->szSector
      );
    }
  }

  if( pDb->xWriteHook==0 ){
    return pBase->xWrite(pWrap->pReal, iOff, pData, nData);
  }

  gettimeofday(&tmStart, 0);
  assert( nData>0 );
  rc = pBase->xWrite(pWrap->pReal, iOff, pData, nData);
  gettimeofday(&tmEnd, 0);

  nUs = (tmEnd.tv_sec - tmStart.tv_sec) * 1000000
      + (tmEnd.tv_usec - tmStart.tv_usec);
  pDb->xWriteHook(pDb->pWriteCtx, pWrap->bLog, iOff, nData, nUs);
  return rc;
}
static void doSystemCrash(LsmDb *pDb);

/*
** xSync wrapper.
**
**   * Returns LSM_IOERR if a crash has already been simulated.
**   * If LsmDb.nAutoCrash is non-zero it is decremented; when it reaches
**     zero a system crash is simulated and LSM_IOERR is returned instead of
**     syncing.
**   * If crash preparation is enabled, the saved pre-images for this file
**     are discarded, since the data they shadow is about to be synced.
**   * If a write-hook is registered, the sync is timed and reported to it
**     with an offset and size of zero.
*/
static int testEnvSync(lsm_file *pFile){
  lsm_env *pBase = tdb_lsm_env();
  LsmFile *pWrap = (LsmFile *)pFile;
  LsmDb *pDb = pWrap->pDb;
  int rc;

  if( pDb->bCrashed ) return LSM_IOERR;

  if( pDb->nAutoCrash && (--pDb->nAutoCrash)==0 ){
    doSystemCrash(pDb);
    pDb->bCrashed = 1;
    return LSM_IOERR;
  }

  if( pDb->bPrepareCrash ){
    FileData *pShadow = &pDb->aFile[pWrap->bLog];
    int iSector = 0;
    while( iSector<pShadow->nSector ){
      testFree(pShadow->aSector[iSector].aOld);
      pShadow->aSector[iSector].aOld = 0;
      iSector++;
    }
  }

  if( pDb->xWriteHook ){
    struct timeval tmStart;
    struct timeval tmEnd;
    int nUs;

    gettimeofday(&tmStart, 0);
    rc = pBase->xSync(pWrap->pReal);
    gettimeofday(&tmEnd, 0);

    nUs = (tmEnd.tv_sec - tmStart.tv_sec) * 1000000
        + (tmEnd.tv_usec - tmStart.tv_usec);
    pDb->xWriteHook(pDb->pWriteCtx, pWrap->bLog, 0, 0, nUs);
  }else{
    rc = pBase->xSync(pWrap->pReal);
  }
  return rc;
}
/*
** xTruncate wrapper. Fails with LSM_IOERR after a simulated crash;
** otherwise truncates through the real environment.
*/
static int testEnvTruncate(lsm_file *pFile, lsm_i64 iOff){
  lsm_env *pBase = tdb_lsm_env();
  LsmFile *pWrap = (LsmFile *)pFile;
  int rc = LSM_IOERR;

  if( !pWrap->pDb->bCrashed ){
    rc = pBase->xTruncate(pWrap->pReal, iOff);
  }
  return rc;
}
/*
** xSectorSize wrapper: reports whatever the real environment reports for
** the underlying file.
*/
static int testEnvSectorSize(lsm_file *pFile){
  lsm_env *pBase = tdb_lsm_env();
  lsm_file *pInner = ((LsmFile *)pFile)->pReal;
  int szSector = pBase->xSectorSize(pInner);
  return szSector;
}
/*
** xRemap wrapper: forwards the request to the real environment using the
** underlying file handle.
*/
static int testEnvRemap(
  lsm_file *pFile,
  lsm_i64 iMin,
  void **ppOut,
  lsm_i64 *pnOut
){
  lsm_env *pBase = tdb_lsm_env();
  lsm_file *pInner = ((LsmFile *)pFile)->pReal;
  int rc = pBase->xRemap(pInner, iMin, ppOut, pnOut);
  return rc;
}
/*
** xFileid wrapper: forwards the request to the real environment using the
** underlying file handle.
*/
static int testEnvFileid(
  lsm_file *pFile,
  void *ppOut,
  int *pnOut
){
  lsm_env *pBase = tdb_lsm_env();
  lsm_file *pInner = ((LsmFile *)pFile)->pReal;
  int rc = pBase->xFileid(pInner, ppOut, pnOut);
  return rc;
}
/*
** xClose wrapper. Closes the underlying file, frees the wrapper and always
** returns LSM_OK (the result of the real xClose() is not propagated).
*/
static int testEnvClose(lsm_file *pFile){
  lsm_env *pBase = tdb_lsm_env();
  LsmFile *pWrap = (LsmFile *)pFile;

  pBase->xClose(pWrap->pReal);
  testFree(pWrap);
  return LSM_OK;
}
/*
** xUnlink wrapper: deletes zFile through the real environment. The pEnv
** argument is not used.
*/
static int testEnvUnlink(lsm_env *pEnv, const char *zFile){
  lsm_env *pBase = tdb_lsm_env();
  int rc;

  unused_parameter(pEnv);
  rc = pBase->xUnlink(pBase, zFile);
  return rc;
}
/*
** xLock wrapper. While LsmDb.bNoRecovery is set, a request for an exclusive
** lock on lock number 2 is refused with LSM_BUSY without consulting the
** real environment. Everything else is forwarded.
*/
static int testEnvLock(lsm_file *pFile, int iLock, int eType){
  LsmFile *pWrap = (LsmFile *)pFile;
  lsm_env *pBase = tdb_lsm_env();
  int bRefuse = (iLock==2 && eType==LSM_LOCK_EXCL && pWrap->pDb->bNoRecovery);

  if( !bRefuse ){
    return pBase->xLock(pWrap->pReal, iLock, eType);
  }
  return LSM_BUSY;
}
/*
** xTestLock wrapper. Applies the same rule as testEnvLock(): while
** LsmDb.bNoRecovery is set, a query about an exclusive lock starting at
** lock number 2 reports LSM_BUSY. Everything else is forwarded.
*/
static int testEnvTestLock(lsm_file *pFile, int iLock, int nLock, int eType){
  LsmFile *pWrap = (LsmFile *)pFile;
  lsm_env *pBase = tdb_lsm_env();
  int bRefuse = (iLock==2 && eType==LSM_LOCK_EXCL && pWrap->pDb->bNoRecovery);

  if( !bRefuse ){
    return pBase->xTestLock(pWrap->pReal, iLock, nLock, eType);
  }
  return LSM_BUSY;
}
/*
** xShmMap wrapper: forwards the request to the real environment using the
** underlying file handle.
*/
static int testEnvShmMap(lsm_file *pFile, int iRegion, int sz, void **pp){
  lsm_file *pInner = ((LsmFile *)pFile)->pReal;
  lsm_env *pBase = tdb_lsm_env();
  int rc = pBase->xShmMap(pInner, iRegion, sz, pp);
  return rc;
}
/*
** xShmBarrier replacement installed by testLsmOpen(). It does nothing; in
** particular it does not call the real environment's xShmBarrier().
*/
static void testEnvShmBarrier(void){
}
/*
** xShmUnmap wrapper: forwards the request to the real environment using the
** underlying file handle.
*/
static int testEnvShmUnmap(lsm_file *pFile, int bDel){
  lsm_file *pInner = ((LsmFile *)pFile)->pReal;
  lsm_env *pBase = tdb_lsm_env();
  int rc = pBase->xShmUnmap(pInner, bDel);
  return rc;
}
/*
** xSleep wrapper: sleeps through the real environment. The pEnv argument is
** not used.
*/
static int testEnvSleep(lsm_env *pEnv, int us){
  lsm_env *pBase = tdb_lsm_env();
  int rc;

  (void)pEnv;
  rc = pBase->xSleep(pBase, us);
  return rc;
}
/*
** Simulate a power failure. For the database file and then the "-log" file,
** every sector that has a saved pre-image (written since the last sync) is
** given one of three pseudo-random fates:
**
**   0: left as it is (the new data survived),
**   1: overwritten with pseudo-random garbage,
**   2: restored to the saved pre-image (the write was lost).
**
** All saved pre-images are released. The files are opened through the real
** environment, bypassing the testEnvXXX() wrappers. If a file cannot be
** opened its pre-images are simply discarded.
*/
static void doSystemCrash(LsmDb *pDb){
  lsm_env *pEnv = tdb_lsm_env();
  int iSeed = pDb->aFile[0].nSector + pDb->aFile[1].nSector;
  /* Name of the log file. Allocated once and freed below; previously a
  ** second, unused copy was allocated and the first one leaked. */
  char *zLog = sqlite3_mprintf("%s-log", pDb->zName);
  int iFile;

  for(iFile=0; iFile<2; iFile++){
    const char *zFile = (iFile==0 ? pDb->zName : zLog);
    FileData *pData = &pDb->aFile[iFile];
    lsm_file *pFile = 0;
    int bOpen = 0;
    int i;

    /* Do not touch the file unless it was really opened. */
    if( zFile && pEnv->xOpen(pEnv, zFile, 0, &pFile)==LSM_OK && pFile ){
      bOpen = 1;
    }

    for(i=0; i<pData->nSector; i++){
      u8 *aOld = pData->aSector[i].aOld;
      if( aOld==0 ) continue;

      if( bOpen ){
        int iOpt = testPrngValue(iSeed++) % 3;
        switch( iOpt ){
          case 0:
            break;

          case 1:
            testPrngArray(iSeed++, (u32 *)aOld, pDb->szSector/4);
            /* fall through: write the garbage to the file */

          case 2:
            pEnv->xWrite(
                pFile, (lsm_i64)i * pDb->szSector, aOld, pDb->szSector
            );
            break;
        }
      }
      testFree(aOld);
      pData->aSector[i].aOld = 0;
    }

    if( bOpen ) pEnv->xClose(pFile);
  }

  sqlite3_free(zLog);
}
#ifdef HAVE_ZLIB
#include <zlib.h>
/*
** lsm_compress.xBound implementation: returns zlib's compressBound() for an
** input of nSrc bytes. pCtx is not used.
*/
static int testZipBound(void *pCtx, int nSrc){
  int nMax;
  (void)pCtx;
  nMax = compressBound(nSrc);
  return nMax;
}
/*
** lsm_compress.xCompress implementation built on zlib compress().
**
** On entry *pnOut is the size of buffer aOut; on return it is set to the
** length value reported back by compress(). Returns 0 if compress()
** returned Z_OK, LSM_ERROR otherwise. pCtx is not used.
*/
static int testZipCompress(
  void *pCtx,
  char *aOut, int *pnOut,
  const char *aIn, int nIn
){
  uLongf nOut = *pnOut;
  int rcZip;

  (void)pCtx;
  rcZip = compress((Bytef*)aOut, &nOut, (Bytef*)aIn, nIn);
  *pnOut = nOut;
  if( rcZip==Z_OK ) return 0;
  return LSM_ERROR;
}
/*
** lsm_compress.xUncompress implementation built on zlib uncompress().
**
** On entry *pnOut is the size of buffer aOut; on return it is set to the
** length value reported back by uncompress(). Returns 0 if uncompress()
** returned Z_OK, LSM_ERROR otherwise. pCtx is not used.
*/
static int testZipUncompress(
  void *pCtx,
  char *aOut, int *pnOut,
  const char *aIn, int nIn
){
  uLongf nOut = *pnOut;
  int rcZip;

  (void)pCtx;
  rcZip = uncompress((Bytef*)aOut, &nOut, (Bytef*)aIn, nIn);
  *pnOut = nOut;
  if( rcZip==Z_OK ) return 0;
  return LSM_ERROR;
}
/*
** Install the zlib-based compression hooks on connection pDb. Returns the
** result of lsm_config(LSM_CONFIG_SET_COMPRESSION).
*/
static int testConfigureCompression(lsm_db *pDb){
  /* Initialised positionally: first two members are 0 and 1, followed by
  ** the bound, compress and uncompress callbacks. */
  static lsm_compress zlibHooks = {
    0,
    1,
    testZipBound,
    testZipCompress,
    testZipUncompress
  };
  int rc = lsm_config(pDb, LSM_CONFIG_SET_COMPRESSION, &zlibHooks);
  return rc;
}
#endif
/*
** DatabaseMethods.xClose for LSM handles.
**
** Closes the open cursor (if any) and the client connection, stops all
** background threads, then releases every allocation owned by the handle:
** the saved-sector tables, the fetch buffer, the worker array and finally
** the handle itself.
**
** Returns LSM_OK, or the first non-OK worker_rc recorded by a background
** thread.
*/
static int test_lsm_close(TestDb *pTestDb){
  LsmDb *pDb = (LsmDb *)pTestDb;
  int rc = LSM_OK;
  int iFile;
  int i;

  lsm_csr_close(pDb->pCsr);
  lsm_close(pDb->db);

  /* If this is a multi-threaded database, wait on the worker threads and
  ** collect the first error any of them reported. */
  mt_shutdown(pDb);
  for(i=0; i<pDb->nWorker && rc==LSM_OK; i++){
    rc = pDb->aWorker[i].worker_rc;
  }

  for(iFile=0; iFile<2; iFile++){
    FileData *pData = &pDb->aFile[iFile];
    for(i=0; i<pData->nSector; i++){
      testFree(pData->aSector[i].aOld);
    }
    testFree(pData->aSector);
  }

  /* Release the remaining members BEFORE poisoning the structure; once the
  ** memset() below has run the pointers in *pDb are no longer valid. */
  testFree(pDb->pBuf);
  testFree(pDb->aWorker);

  /* Overwrite the handle with 0x11 bytes so that any use after close stands
  ** out. The original call had the fill value and the length swapped. */
  memset(pDb, 0x11, sizeof(LsmDb));
  testFree(pDb);
  return rc;
}
static void mt_signal_worker(LsmDb*, int);

/*
** Block until the value reported by lsm_info(LSM_INFO_CHECKPOINT_SIZE) on
** connection db drops below pDb->nMtMaxCkpt, or until lsm_info() fails.
**
** While waiting, the thread responsible for checkpoints is signalled every
** 5ms: worker 0 in LSMTEST_MODE_BACKGROUND_CKPT mode, worker 1 otherwise.
** (No signalling is done in builds without LSM_MUTEX_PTHREADS.)
**
** Returns the result of the last lsm_info() call.
*/
static int waitOnCheckpointer(LsmDb *pDb, lsm_db *db){
  int rc;

  for(;;){
    int nKB = 0;
    rc = lsm_info(db, LSM_INFO_CHECKPOINT_SIZE, &nKB);
    if( rc!=LSM_OK || nKB<pDb->nMtMaxCkpt ) break;
#ifdef LSM_MUTEX_PTHREADS
    mt_signal_worker(pDb,
        (pDb->eMode==LSMTEST_MODE_BACKGROUND_CKPT ? 0 : 1)
    );
#endif
    usleep(5000);
  }

  return rc;
}
/*
** Block until the background worker has caught up with the in-memory tree.
**
** The current LSM_CONFIG_AUTOFLUSH setting is read into nLimit (it is
** passed in as -1, presumably a "query only" value - confirm against
** lsm.h). The function then polls lsm_info(LSM_INFO_TREE_SIZE) every 5ms,
** signalling worker 0 each time, until either the "old" tree size is zero
** or the "new" tree size is below half of nLimit.
**
** Returns the lsm_info() error code if polling fails, otherwise the result
** of the initial lsm_config() call.
*/
static int waitOnWorker(LsmDb *pDb){
  int nLimit = -1;
  int rc;

  rc = lsm_config(pDb->db, LSM_CONFIG_AUTOFLUSH, &nLimit);
  for(;;){
    int nOld, nNew, rc2;
    rc2 = lsm_info(pDb->db, LSM_INFO_TREE_SIZE, &nOld, &nNew);
    if( rc2!=LSM_OK ) return rc2;
    if( nOld==0 || nNew<(nLimit/2) ) break;
#ifdef LSM_MUTEX_PTHREADS
    mt_signal_worker(pDb, 0);
#endif
    usleep(5000);
  }

  return rc;
}
/*
** DatabaseMethods.xWrite for LSM handles. In the multi-threaded modes the
** call first waits for the background checkpointer (BACKGROUND_CKPT) or
** worker (BACKGROUND_WORK / BACKGROUND_BOTH) to catch up. If that wait
** succeeds, or no wait is needed, the key/value pair is inserted.
*/
static int test_lsm_write(
  TestDb *pTestDb,
  void *pKey,
  int nKey,
  void *pVal,
  int nVal
){
  LsmDb *pDb = (LsmDb *)pTestDb;
  int rc;

  switch( pDb->eMode ){
    case LSMTEST_MODE_BACKGROUND_CKPT:
      rc = waitOnCheckpointer(pDb, pDb->db);
      break;

    case LSMTEST_MODE_BACKGROUND_WORK:
    case LSMTEST_MODE_BACKGROUND_BOTH:
      rc = waitOnWorker(pDb);
      break;

    default:
      rc = LSM_OK;
      break;
  }
  if( rc!=LSM_OK ) return rc;

  return lsm_insert(pDb->db, pKey, nKey, pVal, nVal);
}
/*
** DatabaseMethods.xDelete for LSM handles: deletes a single key.
*/
static int test_lsm_delete(TestDb *pTestDb, void *pKey, int nKey){
  lsm_db *db = ((LsmDb *)pTestDb)->db;
  int rc = lsm_delete(db, pKey, nKey);
  return rc;
}
/*
** DatabaseMethods.xDeleteRange for LSM handles: passes both keys straight
** to lsm_delete_range().
*/
static int test_lsm_delete_range(
  TestDb *pTestDb,
  void *pKey1, int nKey1,
  void *pKey2, int nKey2
){
  lsm_db *db = ((LsmDb *)pTestDb)->db;
  int rc = lsm_delete_range(db, pKey1, nKey1, pKey2, nKey2);
  return rc;
}
/*
** DatabaseMethods.xFetch for LSM handles.
**
** Looks up pKey/nKey with an exact-match seek. If the key exists its value
** is copied into a buffer owned by the handle (valid until the next fetch
** or close) and returned through *ppVal / *pnVal. If it does not exist,
** *ppVal is set to 0 and *pnVal to -1. A NULL pKey is a no-op that returns
** LSM_OK without touching the output arguments.
**
** The cursor held open by test_lsm_begin() is used when there is one;
** otherwise a temporary cursor is opened and closed again. On error the
** output arguments are left unmodified.
*/
static int test_lsm_fetch(
  TestDb *pTestDb,
  void *pKey,
  int nKey,
  void **ppVal,
  int *pnVal
){
  int rc;
  LsmDb *pDb = (LsmDb *)pTestDb;
  lsm_cursor *csr;

  if( pKey==0 ) return LSM_OK;

  if( pDb->pCsr==0 ){
    rc = lsm_csr_open(pDb->db, &csr);
    if( rc!=LSM_OK ) return rc;
  }else{
    csr = pDb->pCsr;
  }

  rc = lsm_csr_seek(csr, pKey, nKey, LSM_SEEK_EQ);
  if( rc==LSM_OK ){
    if( lsm_csr_valid(csr) ){
      const void *pVal = 0;
      int nVal = 0;
      rc = lsm_csr_value(csr, &pVal, &nVal);
      /* Only trust pVal/nVal if the value was really retrieved. */
      if( rc==LSM_OK ){
        if( nVal>pDb->nBuf ){
          testFree(pDb->pBuf);
          pDb->pBuf = testMalloc(nVal*2);
          pDb->nBuf = nVal*2;
        }
        /* pBuf may still be NULL for a zero-length value. */
        if( nVal>0 ) memcpy(pDb->pBuf, pVal, nVal);
        *ppVal = pDb->pBuf;
        *pnVal = nVal;
      }
    }else{
      *ppVal = 0;
      *pnVal = -1;
    }
  }

  if( pDb->pCsr==0 ){
    lsm_csr_close(csr);
  }
  return rc;
}
/*
** DatabaseMethods.xScan for LSM handles.
**
** Invokes xCallback(pCtx, key, nKey, value, nValue) for each entry with a
** key between pFirst and pLast inclusive, in ascending order, or descending
** order if bReverse is true. A NULL pFirst or pLast leaves that end of the
** range open. Keys are compared with memcmp(); on a common prefix the
** shorter key sorts first.
**
** When both bounds are supplied the cursor is first moved to the far end of
** the range and then repositioned at the starting end, so the cursor is
** sought twice before iteration begins.
**
** The cursor held open by test_lsm_begin() is used when there is one;
** otherwise a temporary cursor is opened and closed again.
*/
static int test_lsm_scan(
  TestDb *pTestDb,
  void *pCtx,
  int bReverse,
  void *pFirst, int nFirst,
  void *pLast, int nLast,
  void (*xCallback)(void *, void *, int , void *, int)
){
  LsmDb *pDb = (LsmDb *)pTestDb;
  lsm_cursor *pScan;
  int rc = LSM_OK;

  if( pDb->pCsr ){
    pScan = pDb->pCsr;
  }else{
    rc = lsm_csr_open(pDb->db, &pScan);
    if( rc!=LSM_OK ) return rc;
  }

  /* Preliminary seek to the far boundary. Its result is overwritten by the
  ** positioning seek that follows. */
  if( pFirst && pLast ){
    rc = bReverse
       ? lsm_csr_seek(pScan, pFirst, nFirst, LSM_SEEK_LE)
       : lsm_csr_seek(pScan, pLast, nLast, LSM_SEEK_GE);
  }

  /* Position the cursor on the first entry to visit. */
  if( bReverse ){
    rc = pLast
       ? lsm_csr_seek(pScan, pLast, nLast, LSM_SEEK_LE)
       : lsm_csr_last(pScan);
  }else{
    rc = pFirst
       ? lsm_csr_seek(pScan, pFirst, nFirst, LSM_SEEK_GE)
       : lsm_csr_first(pScan);
  }

  while( rc==LSM_OK && lsm_csr_valid(pScan) ){
    const void *pKey; int nKey;
    const void *pVal; int nVal;

    lsm_csr_key(pScan, &pKey, &nKey);
    lsm_csr_value(pScan, &pVal, &nVal);

    /* Stop once the current key lies beyond the end boundary. */
    if( bReverse ){
      if( pFirst ){
        int c = memcmp(pFirst, pKey, MIN(nKey, nFirst));
        if( c>0 || (c==0 && nFirst>nKey) ) break;
      }
    }else if( pLast ){
      int c = memcmp(pLast, pKey, MIN(nKey, nLast));
      if( c<0 || (c==0 && nLast<nKey) ) break;
    }

    xCallback(pCtx, (void *)pKey, nKey, (void *)pVal, nVal);
    rc = bReverse ? lsm_csr_prev(pScan) : lsm_csr_next(pScan);
  }

  if( pDb->pCsr==0 ){
    lsm_csr_close(pScan);
  }
  return rc;
}
/*
** DatabaseMethods.xBegin for LSM handles.
**
** Level 0 is a no-op. Level 1 only makes sure a cursor is open on the
** handle. Levels above 1 additionally call lsm_begin() with (iLevel-1).
*/
static int test_lsm_begin(TestDb *pTestDb, int iLevel){
  LsmDb *pDb = (LsmDb *)pTestDb;
  int rc = LSM_OK;

  if( iLevel==0 ) return 0;

  if( pDb->pCsr==0 ){
    rc = lsm_csr_open(pDb->db, &pDb->pCsr);
  }
  if( iLevel>1 && rc==LSM_OK ){
    rc = lsm_begin(pDb->db, iLevel-1);
  }
  return rc;
}
/*
** DatabaseMethods.xCommit for LSM handles. Committing to level 0 also
** closes the cursor opened by test_lsm_begin(). lsm_commit() is called with
** (iLevel-1), clamped at zero.
*/
static int test_lsm_commit(TestDb *pTestDb, int iLevel){
  LsmDb *pDb = (LsmDb *)pTestDb;
  int iLsmLevel = iLevel-1;

  if( iLsmLevel<0 ) iLsmLevel = 0;
  if( iLevel==0 && pDb->pCsr ){
    lsm_csr_close(pDb->pCsr);
    pDb->pCsr = 0;
  }
  return lsm_commit(pDb->db, iLsmLevel);
}
/*
** DatabaseMethods.xRollback for LSM handles. Rolling back to level 0 also
** closes the cursor opened by test_lsm_begin(). lsm_rollback() is called
** with (iLevel-1), clamped at zero.
*/
static int test_lsm_rollback(TestDb *pTestDb, int iLevel){
  LsmDb *pDb = (LsmDb *)pTestDb;
  int iLsmLevel = iLevel-1;

  if( iLsmLevel<0 ) iLsmLevel = 0;
  if( iLevel==0 && pDb->pCsr ){
    lsm_csr_close(pDb->pCsr);
    pDb->pCsr = 0;
  }
  return lsm_rollback(pDb->db, iLsmLevel);
}
/*
** Log callback registered with lsm_config_log(). Writes message z to
** stderr on a line of its own, prefixed with "<pCtx>: " when pCtx is a
** non-NULL string, then flushes stderr. The rc argument is ignored.
*/
static void xLog(void *pCtx, int rc, const char *z){
  const char *zPrefix = (const char *)pCtx;

  unused_parameter(rc);
  if( zPrefix ){
    fprintf(stderr, "%s: ", zPrefix);
  }
  fprintf(stderr, "%s\n", z);
  fflush(stderr);
}
/*
** Work hook installed on the client connection by testLsmOpen(). Relays
** the notification to the user hook configured with
** tdb_lsm_config_work_hook(), if there is one.
*/
static void xWorkHook(lsm_db *db, void *pArg){
  LsmDb *pDb = (LsmDb *)pArg;
  if( pDb->xWork==0 ) return;
  pDb->xWork(db, pDb->pWorkCtx);
}
/* Pseudo parameter ids (all negative) handled by test_lsm_config_str()
** itself instead of being passed to lsm_config(). */
#define TEST_NO_RECOVERY -1
#define TEST_COMPRESSION -3
#define TEST_MT_MODE -2
#define TEST_MT_MIN_CKPT -4
#define TEST_MT_MAX_CKPT -5

/*
** Apply a configuration string of the form "name=value name=value ..." to
** connection db.
**
**   pLsm      Test handle that receives the test-only settings
**             (test_no_recovery, mt_mode, mt_min_ckpt, mt_max_ckpt), or 0
**             to ignore those settings.
**   db        Connection to configure. Must not be NULL if zStr is not.
**   bWorker   True if db is a worker connection. Parameters flagged as
**             worker-only ("worker_automerge") are applied only then.
**   zStr      Configuration string. NULL is accepted and does nothing.
**   pnThread  If not NULL, receives the mt_mode value (default 1).
**
** Values are decimal integers with an optional suffix: 'k' or 'K' leaves
** the value unchanged, 'M' or 'm' multiplies it by 1024. A lowercase 'm' is
** only treated as a suffix when it ends the token (followed by a space or
** the end of the string). Existing configuration strings in this file run
** a value straight into the next parameter name ("...=32mmap=0") and must
** keep parsing the way they always have.
**
** The result of lsm_config() is not checked. Returns 0 on success, 1 after
** reporting a syntax error, or the non-zero result of testArgSelect() for
** an unrecognised parameter name.
*/
int test_lsm_config_str(
  LsmDb *pLsm,
  lsm_db *db,
  int bWorker,
  const char *zStr,
  int *pnThread
){
  struct CfgParam {
    const char *zParam;
    int bWorker;
    int eParam;
  } aParam[] = {
    { "autoflush", 0, LSM_CONFIG_AUTOFLUSH },
    { "page_size", 0, LSM_CONFIG_PAGE_SIZE },
    { "block_size", 0, LSM_CONFIG_BLOCK_SIZE },
    { "safety", 0, LSM_CONFIG_SAFETY },
    { "autowork", 0, LSM_CONFIG_AUTOWORK },
    { "autocheckpoint", 0, LSM_CONFIG_AUTOCHECKPOINT },
    { "mmap", 0, LSM_CONFIG_MMAP },
    { "use_log", 0, LSM_CONFIG_USE_LOG },
    { "automerge", 0, LSM_CONFIG_AUTOMERGE },
    { "max_freelist", 0, LSM_CONFIG_MAX_FREELIST },
    { "multi_proc", 0, LSM_CONFIG_MULTIPLE_PROCESSES },
    { "worker_automerge", 1, LSM_CONFIG_AUTOMERGE },
    { "test_no_recovery", 0, TEST_NO_RECOVERY },
    /* NOTE(review): "bg_min_ckpt" maps to TEST_NO_RECOVERY, which looks
    ** like a copy/paste slip - confirm the intended id before changing. */
    { "bg_min_ckpt", 0, TEST_NO_RECOVERY },
    { "mt_mode", 0, TEST_MT_MODE },
    { "mt_min_ckpt", 0, TEST_MT_MIN_CKPT },
    { "mt_max_ckpt", 0, TEST_MT_MAX_CKPT },
#ifdef HAVE_ZLIB
    { "compression", 0, TEST_COMPRESSION },
#endif
    { 0, 0 }
  };
  const char *z = zStr;
  int nThread = 1;

  if( zStr==0 ) return 0;

  assert( db );
  while( z[0] ){
    const char *zStart;

    /* Skip whitespace, then scan the parameter name up to the '='. */
    while( *z==' ' ) z++;
    zStart = z;

    while( *z && *z!='=' ) z++;
    if( *z ){
      int eParam;
      int i;
      int iVal;
      int iMul = 1;
      int rc;
      char zParam[32];
      int nParam = z-zStart;
      if( nParam==0 || nParam>(int)sizeof(zParam)-1 ) goto syntax_error;

      memcpy(zParam, zStart, nParam);
      zParam[nParam] = '\0';
      rc = testArgSelect(aParam, "param", zParam, &i);
      if( rc!=0 ) return rc;
      eParam = aParam[i].eParam;

      /* Parse the value: digits followed by an optional size suffix. */
      z++;
      zStart = z;
      while( *z>='0' && *z<='9' ) z++;
      if( *z=='k' || *z=='K' ){
        iMul = 1;
        z++;
      }else if( *z=='M' || (*z=='m' && (z[1]==' ' || z[1]=='\0')) ){
        /* See the header comment for why 'm' must terminate the token. */
        iMul = 1024;
        z++;
      }
      nParam = z-zStart;
      if( nParam==0 || nParam>(int)sizeof(zParam)-1 ) goto syntax_error;
      memcpy(zParam, zStart, nParam);
      zParam[nParam] = '\0';
      iVal = atoi(zParam) * iMul;

      if( eParam>0 ){
        /* A real LSM parameter. Worker-only parameters are skipped for
        ** client connections. */
        if( bWorker || aParam[i].bWorker==0 ){
          lsm_config(db, eParam, &iVal);
        }
      }else{
        switch( eParam ){
          case TEST_NO_RECOVERY:
            if( pLsm ) pLsm->bNoRecovery = iVal;
            break;
          case TEST_MT_MODE:
            if( pLsm ) nThread = iVal;
            break;
          case TEST_MT_MIN_CKPT:
            if( pLsm && iVal>0 ) pLsm->nMtMinCkpt = iVal*1024;
            break;
          case TEST_MT_MAX_CKPT:
            if( pLsm && iVal>0 ) pLsm->nMtMaxCkpt = iVal*1024;
            break;
#ifdef HAVE_ZLIB
          case TEST_COMPRESSION:
            testConfigureCompression(db);
            break;
#endif
        }
      }
    }else if( z!=zStart ){
      /* Trailing text without an '='. */
      goto syntax_error;
    }
  }

  if( pnThread ) *pnThread = nThread;
  /* Keep the checkpoint thresholds ordered: min may not exceed max. */
  if( pLsm && pLsm->nMtMaxCkpt < pLsm->nMtMinCkpt ){
    pLsm->nMtMinCkpt = pLsm->nMtMaxCkpt;
  }

  return 0;
 syntax_error:
  testPrintError("syntax error at: \"%s\"\n", z);
  return 1;
}
/*
** Apply configuration string zStr to an open test database. Does nothing
** and returns 0 if pDb is not an LSM handle. In pthreads builds the string
** is also applied to each worker connection, stopping at the first error.
*/
int tdb_lsm_config_str(TestDb *pDb, const char *zStr){
  LsmDb *pLsm;
  int rc;

  if( tdb_lsm(pDb)==0 ) return 0;

  pLsm = (LsmDb *)pDb;
  rc = test_lsm_config_str(pLsm, pLsm->db, 0, zStr, 0);
#ifdef LSM_MUTEX_PTHREADS
  {
    int iWorker = 0;
    while( rc==0 && iWorker<pLsm->nWorker ){
      rc = test_lsm_config_str(0, pLsm->aWorker[iWorker].pWorker, 1, zStr, 0);
      iWorker++;
    }
  }
#endif
  return rc;
}
/*
** Apply configuration string zConfig to a bare LSM connection. Test-only
** parameters that need an LsmDb handle are parsed but ignored.
*/
int tdb_lsm_configure(lsm_db *db, const char *zConfig){
  int rc = test_lsm_config_str(0, db, 0, zConfig, 0);
  return rc;
}
static int testLsmStartWorkers(LsmDb *, int, const char *, const char *);

/*
** Open an LSM test database.
**
**   zCfg       Configuration string for test_lsm_config_str(), or NULL.
**   zFilename  Database file name. Must not be NULL.
**   bClear     True to delete any existing database first.
**   ppDb       OUT: new handle, or 0 if an error occurs.
**
** The handle gets its own copy of the real environment in which the file
** and shared-memory methods are replaced by the testEnvXXX() wrappers. If
** the configuration selects a multi-threaded mode (mt_mode>1) the
** background threads are started in pthreads builds.
**
** Returns LSM_OK on success or an error code. On failure nothing is left
** allocated and *ppDb is 0.
*/
static int testLsmOpen(
  const char *zCfg,
  const char *zFilename,
  int bClear,
  TestDb **ppDb
){
  static const DatabaseMethods LsmMethods = {
    test_lsm_close,
    test_lsm_write,
    test_lsm_delete,
    test_lsm_delete_range,
    test_lsm_fetch,
    test_lsm_scan,
    test_lsm_begin,
    test_lsm_commit,
    test_lsm_rollback
  };

  int rc;
  int nFilename;
  LsmDb *pDb;

  assert( zFilename);
  if( bClear ) testDeleteLsmdb(zFilename);
  nFilename = strlen(zFilename);

  /* The file name is stored in the same allocation, after the struct. */
  pDb = (LsmDb *)testMalloc(sizeof(LsmDb) + nFilename + 1);
  memset(pDb, 0, sizeof(LsmDb));
  pDb->base.pMethods = &LsmMethods;
  pDb->zName = (char *)&pDb[1];
  memcpy(pDb->zName, zFilename, nFilename + 1);

  /* Default values */
  pDb->szSector = 256;
  pDb->nMtMinCkpt = LSMTEST_DFLT_MT_MIN_CKPT;
  pDb->nMtMaxCkpt = LSMTEST_DFLT_MT_MAX_CKPT;

  /* Build the wrapper environment. */
  memcpy(&pDb->env, tdb_lsm_env(), sizeof(lsm_env));
  pDb->env.pVfsCtx = (void *)pDb;
  pDb->env.xFullpath = testEnvFullpath;
  pDb->env.xOpen = testEnvOpen;
  pDb->env.xRead = testEnvRead;
  pDb->env.xWrite = testEnvWrite;
  pDb->env.xTruncate = testEnvTruncate;
  pDb->env.xSync = testEnvSync;
  pDb->env.xSectorSize = testEnvSectorSize;
  pDb->env.xRemap = testEnvRemap;
  pDb->env.xFileid = testEnvFileid;
  pDb->env.xClose = testEnvClose;
  pDb->env.xUnlink = testEnvUnlink;
  pDb->env.xLock = testEnvLock;
  pDb->env.xTestLock = testEnvTestLock;
  pDb->env.xShmBarrier = testEnvShmBarrier;
  pDb->env.xShmMap = testEnvShmMap;
  pDb->env.xShmUnmap = testEnvShmUnmap;
  pDb->env.xSleep = testEnvSleep;

  rc = lsm_new(&pDb->env, &pDb->db);
  if( rc!=LSM_OK ){
    /* No connection was created, so the handle allocation is the only
    ** thing to release. Previously it was leaked and handed back to the
    ** caller alongside the error code. */
    testFree(pDb);
    pDb = 0;
  }else{
    int nThread = 1;
    lsm_config_log(pDb->db, xLog, 0);
    lsm_config_work_hook(pDb->db, xWorkHook, (void *)pDb);

    rc = test_lsm_config_str(pDb, pDb->db, 0, zCfg, &nThread);
    if( rc==LSM_OK ) rc = lsm_open(pDb->db, zFilename);

    pDb->eMode = nThread;
#ifdef LSM_MUTEX_PTHREADS
    if( rc==LSM_OK && nThread>1 ){
      /* NOTE(review): a failure to start the workers is not reported to
      ** the caller; the handle is returned as if it were usable. */
      testLsmStartWorkers(pDb, nThread, zFilename, zCfg);
    }
#endif

    if( rc!=LSM_OK ){
      test_lsm_close((TestDb *)pDb);
      pDb = 0;
    }
  }

  *ppDb = (TestDb *)pDb;
  return rc;
}
/*
** Open an LSM test database using zSpec as the configuration string.
*/
int test_lsm_open(
  const char *zSpec,
  const char *zFilename,
  int bClear,
  TestDb **ppDb
){
  int rc = testLsmOpen(zSpec, zFilename, bClear, ppDb);
  return rc;
}
/*
** Open an LSM test database with a fixed small-page configuration. The
** zSpec argument is ignored.
*/
int test_lsm_small_open(
  const char *zSpec,
  const char *zFile,
  int bClear,
  TestDb **ppDb
){
  static const char zSmallCfg[] = "page_size=256 block_size=64 mmap=1024";
  (void)zSpec;
  return testLsmOpen(zSmallCfg, zFile, bClear, ppDb);
}
/*
** Open an LSM test database with a fixed low-memory configuration. The
** zSpec argument is ignored.
*/
int test_lsm_lomem_open(
  const char *zSpec,
  const char *zFilename,
  int bClear,
  TestDb **ppDb
){
  /* Each fragment ends in a space so that adjacent string literals do not
  ** fuse a value with the following parameter name. The original produced
  ** "autocheckpoint=32mmap=0", which only parsed by accident. */
  const char *zCfg =
    "page_size=256 block_size=64 autoflush=16 "
    "autocheckpoint=32 "
    "mmap=0 "
  ;
  (void)zSpec;
  return testLsmOpen(zCfg, zFilename, bClear, ppDb);
}
/*
** Open an LSM test database with a second fixed low-memory configuration
** (autoflush disabled). The zSpec argument is ignored.
*/
int test_lsm_lomem2_open(
  const char *zSpec,
  const char *zFilename,
  int bClear,
  TestDb **ppDb
){
  static const char zLomem2Cfg[] =
    "page_size=512 block_size=64 autoflush=0 mmap=0 ";
  (void)zSpec;
  return testLsmOpen(zLomem2Cfg, zFilename, bClear, ppDb);
}
/*
** Open an LSM test database with a fixed configuration that requests
** compression ("compression=1" is only recognised in HAVE_ZLIB builds).
** The zSpec argument is ignored.
*/
int test_lsm_zip_open(
  const char *zSpec,
  const char *zFilename,
  int bClear,
  TestDb **ppDb
){
  static const char zZipCfg[] =
    "page_size=256 block_size=64 autoflush=16 "
    "autocheckpoint=32 compression=1 mmap=0 ";
  (void)zSpec;
  return testLsmOpen(zZipCfg, zFilename, bClear, ppDb);
}
/*
** If pDb is an LSM test handle (recognised by its xClose method) return the
** underlying client connection. Otherwise return 0.
*/
lsm_db *tdb_lsm(TestDb *pDb){
  lsm_db *pConn = 0;
  if( pDb->pMethods->xClose==test_lsm_close ){
    pConn = ((LsmDb *)pDb)->db;
  }
  return pConn;
}
/*
** Return true if pDb is an LSM handle whose mode is anything other than
** LSMTEST_MODE_SINGLETHREAD. Return 0 for non-LSM handles.
*/
int tdb_lsm_multithread(TestDb *pDb){
  if( tdb_lsm(pDb)==0 ) return 0;
  return ((LsmDb*)pDb)->eMode!=LSMTEST_MODE_SINGLETHREAD;
}
/*
** Enable or disable logging to stderr on the client connection of an LSM
** handle. Messages are prefixed with "client". No-op for other handles.
*/
void tdb_lsm_enable_log(TestDb *pDb, int bEnable){
  lsm_db *db = tdb_lsm(pDb);
  if( db==0 ) return;

  if( bEnable ){
    lsm_config_log(db, xLog, (void *)"client");
  }else{
    lsm_config_log(db, 0, (void *)"client");
  }
}
/*
** Simulate an application crash: from now on the read, write, sync and
** truncate wrappers fail with LSM_IOERR. No-op for non-LSM handles.
*/
void tdb_lsm_application_crash(TestDb *pDb){
  if( tdb_lsm(pDb)==0 ) return;
  ((LsmDb *)pDb)->bCrashed = 1;
}
/*
** Start recording sector pre-images on every write so that a later call to
** tdb_lsm_system_crash() can simulate a power failure. No-op for non-LSM
** handles.
*/
void tdb_lsm_prepare_system_crash(TestDb *pDb){
  if( tdb_lsm(pDb)==0 ) return;
  ((LsmDb *)pDb)->bPrepareCrash = 1;
}
/*
** Simulate a system crash now: mark the handle as crashed and apply
** doSystemCrash() to the sectors written since the last sync. No-op for
** non-LSM handles.
*/
void tdb_lsm_system_crash(TestDb *pDb){
  LsmDb *pLsm;
  if( tdb_lsm(pDb)==0 ) return;

  pLsm = (LsmDb *)pDb;
  pLsm->bCrashed = 1;
  doSystemCrash(pLsm);
}
/*
** Set LSM_CONFIG_SAFETY on the client connection of an LSM handle. eMode
** must be one of LSM_SAFETY_OFF, LSM_SAFETY_NORMAL or LSM_SAFETY_FULL.
** No-op for non-LSM handles.
*/
void tdb_lsm_safety(TestDb *pDb, int eMode){
  int iSafety = eMode;

  assert( eMode==LSM_SAFETY_OFF
       || eMode==LSM_SAFETY_NORMAL
       || eMode==LSM_SAFETY_FULL
  );
  if( tdb_lsm(pDb)==0 ) return;
  lsm_config(((LsmDb *)pDb)->db, LSM_CONFIG_SAFETY, &iSafety);
}
/*
** Arrange for a simulated system crash to happen on the iSync'th call to
** xSync from now (iSync must be positive). Sector pre-image recording is
** switched on as well. No-op for non-LSM handles.
*/
void tdb_lsm_prepare_sync_crash(TestDb *pDb, int iSync){
  LsmDb *pLsm;

  assert( iSync>0 );
  if( tdb_lsm(pDb)==0 ) return;

  pLsm = (LsmDb *)pDb;
  pLsm->nAutoCrash = iSync;
  pLsm->bPrepareCrash = 1;
}
/*
** Register (or clear, with xWork==0) the user work hook that the internal
** work hooks of this file relay to. No-op for non-LSM handles.
*/
void tdb_lsm_config_work_hook(
  TestDb *pDb,
  void (*xWork)(lsm_db *, void *),
  void *pWorkCtx
){
  LsmDb *pLsm;
  if( tdb_lsm(pDb)==0 ) return;

  pLsm = (LsmDb *)pDb;
  pLsm->xWork = xWork;
  pLsm->pWorkCtx = pWorkCtx;
}
/*
** Register (or clear, with xWrite==0) the hook that testEnvWrite() and
** testEnvSync() call with timing information. The hook receives: context,
** log-file flag, offset, size and elapsed microseconds. No-op for non-LSM
** handles.
*/
void tdb_lsm_write_hook(
  TestDb *pDb,
  void (*xWrite)(void *, int, lsm_i64, int, int),
  void *pWriteCtx
){
  LsmDb *pLsm;
  if( tdb_lsm(pDb)==0 ) return;

  pLsm = (LsmDb *)pDb;
  pLsm->xWriteHook = xWrite;
  pLsm->pWriteCtx = pWriteCtx;
}
/*
** Public entry point for opening an LSM test database with configuration
** string zCfg. See testLsmOpen().
*/
int tdb_lsm_open(const char *zCfg, const char *zDb, int bClear, TestDb **ppDb){
  int rc = testLsmOpen(zCfg, zDb, bClear, ppDb);
  return rc;
}
#ifdef LSM_MUTEX_PTHREADS
/*
** Tell background thread iWorker that there is work to do: set its
** bDoWork flag and signal its condition variable, both while holding the
** worker's mutex.
*/
static void mt_signal_worker(LsmDb *pDb, int iWorker){
  LsmWorker *pTarget = pDb->aWorker + iWorker;

  pthread_mutex_lock(&pTarget->worker_mutex);
  pTarget->bDoWork = 1;
  pthread_cond_signal(&pTarget->worker_cond);
  pthread_mutex_unlock(&pTarget->worker_mutex);
}
/*
** Entry point of every background thread. pArg is the thread's LsmWorker.
**
** The thread loops until LsmWorker.pWorker is cleared (by mt_stop_worker())
** or a pass fails with an error other than LSM_BUSY, in which case the
** error is stored in LsmWorker.worker_rc. Each pass either checkpoints
** (LSMTEST_THREAD_CKPT) or does merge work (the other two types), then the
** thread sleeps on its condition variable unless it was signalled while it
** was busy. Always returns 0.
*/
static void *worker_main(void *pArg){
LsmWorker *p = (LsmWorker *)pArg;
lsm_db *pWorker;
pthread_mutex_lock(&p->worker_mutex);
/* The mutex is held whenever the loop condition is evaluated. */
while( (pWorker = p->pWorker) ){
int rc = LSM_OK;
/* Do the work itself without holding the mutex. */
pthread_mutex_unlock(&p->worker_mutex);
if( p->eType==LSMTEST_THREAD_CKPT ){
int nKB = 0;
/* Checkpoint only once enough data has accumulated. */
rc = lsm_info(pWorker, LSM_INFO_CHECKPOINT_SIZE, &nKB);
if( rc==LSM_OK && nKB>=p->pDb->nMtMinCkpt ){
rc = lsm_checkpoint(pWorker, 0);
}
}else{
int nWrite;
do {
/* A plain WORKER thread is paired with a checkpointer thread: wait for
** it if the un-checkpointed data has grown too large. */
if( p->eType==LSMTEST_THREAD_WORKER ){
waitOnCheckpointer(p->pDb, pWorker);
}
nWrite = 0;
rc = lsm_work(pWorker, 0, 256, &nWrite);
/* Anything written may need checkpointing: wake worker 1. */
if( p->eType==LSMTEST_THREAD_WORKER && nWrite ){
mt_signal_worker(p->pDb, 1);
}
/* NOTE(review): p->pWorker is read here without holding the mutex. */
}while( nWrite && p->pWorker );
}
pthread_mutex_lock(&p->worker_mutex);
/* LSM_BUSY is not treated as an error; anything else ends the thread. */
if( rc!=LSM_OK && rc!=LSM_BUSY ){
p->worker_rc = rc;
break;
}
/* Sleep until signalled, unless a signal arrived during the pass above or
** shutdown has been requested. */
if( p->pWorker && p->bDoWork==0 ){
pthread_cond_wait(&p->worker_cond, &p->worker_mutex);
}
p->bDoWork = 0;
}
pthread_mutex_unlock(&p->worker_mutex);
return 0;
}
/*
** Stop background thread iWorker, if it has a connection. The connection
** pointer is cleared under the worker's mutex to request exit, the thread
** is woken and joined, and then its condition variable, mutex and
** connection are destroyed.
*/
static void mt_stop_worker(LsmDb *pDb, int iWorker){
  LsmWorker *pW = &pDb->aWorker[iWorker];
  lsm_db *pConn;
  void *pExit;

  if( pW->pWorker==0 ) return;

  pthread_mutex_lock(&pW->worker_mutex);
  pConn = pW->pWorker;
  pW->pWorker = 0;
  pthread_cond_signal(&pW->worker_cond);
  pthread_mutex_unlock(&pW->worker_mutex);

  pthread_join(pW->worker_thread, &pExit);

  pthread_cond_destroy(&pW->worker_cond);
  pthread_mutex_destroy(&pW->worker_mutex);
  lsm_close(pConn);
}
/*
** Stop every background thread belonging to pDb, in index order.
*/
static void mt_shutdown(LsmDb *pDb){
  int iWorker = 0;
  while( iWorker<pDb->nWorker ){
    mt_stop_worker(pDb, iWorker);
    iWorker++;
  }
}
/*
** Work hook installed on the client connection in multi-threaded modes.
** Relays to the user hook, if any, and then signals worker 0.
*/
static void mt_client_work_hook(lsm_db *db, void *pArg){
  LsmDb *pLsm = (LsmDb *)pArg;

  if( pLsm->xWork ){
    pLsm->xWork(db, pLsm->pWorkCtx);
  }
  mt_signal_worker(pLsm, 0);
}
/*
** Work hook installed on worker connections. Relays to the user hook, if
** there is one; otherwise does nothing.
*/
static void mt_worker_work_hook(lsm_db *db, void *pArg){
  LsmDb *pLsm = (LsmDb *)pArg;
  if( pLsm->xWork==0 ) return;
  pLsm->xWork(db, pLsm->pWorkCtx);
}
/*
** Open a worker connection on zFilename and start background thread
** iWorker of type eType (one of the LSMTEST_THREAD_* values).
**
** zCfg, if not NULL, is applied to the new connection before it is opened.
** LSMTEST_THREAD_WORKER connections additionally get "autocheckpoint=0".
**
** Returns 0 on success, or the error code of the step that failed (an LSM
** code from lsm_new()/lsm_open(), or an errno-style value from the pthread
** calls). On failure everything created here is released again and
** LsmWorker.pWorker is left at 0, so that mt_stop_worker() skips the slot
** instead of joining a thread that was never started. The slot must not be
** signalled after a failure.
*/
static int mt_start_worker(
  LsmDb *pDb,
  int iWorker,
  const char *zFilename,
  const char *zCfg,
  int eType
){
  LsmWorker *p;
  int bCondInit = 0;
  int bMutexInit = 0;
  int rc;

  assert( iWorker<pDb->nWorker );
  assert( eType==LSMTEST_THREAD_CKPT
       || eType==LSMTEST_THREAD_WORKER
       || eType==LSMTEST_THREAD_WORKER_AC
  );

  p = &pDb->aWorker[iWorker];
  p->eType = eType;
  p->pDb = pDb;

  /* Open the worker connection. Without a connection there is nothing to
  ** configure, so bail out straight away. */
  rc = lsm_new(&pDb->env, &p->pWorker);
  if( rc!=0 ){
    p->pWorker = 0;
    return rc;
  }
  if( zCfg ){
    test_lsm_config_str(pDb, p->pWorker, 1, zCfg, 0);
  }
  rc = lsm_open(p->pWorker, zFilename);
  lsm_config_log(p->pWorker, xLog, (void *)"worker");

  /* Configure the work-hook */
  if( rc==0 ){
    lsm_config_work_hook(p->pWorker, mt_worker_work_hook, (void *)pDb);
  }

  if( eType==LSMTEST_THREAD_WORKER ){
    test_lsm_config_str(0, p->pWorker, 1, "autocheckpoint=0", 0);
  }

  /* Kick off the worker thread. */
  if( rc==0 ){
    rc = pthread_cond_init(&p->worker_cond, 0);
    bCondInit = (rc==0);
  }
  if( rc==0 ){
    rc = pthread_mutex_init(&p->worker_mutex, 0);
    bMutexInit = (rc==0);
  }
  if( rc==0 ){
    rc = pthread_create(&p->worker_thread, 0, worker_main, (void *)p);
  }

  if( rc!=0 ){
    /* No thread is running: undo the partial set-up. */
    if( bMutexInit ) pthread_mutex_destroy(&p->worker_mutex);
    if( bCondInit ) pthread_cond_destroy(&p->worker_cond);
    lsm_close(p->pWorker);
    p->pWorker = 0;
  }
  return rc;
}
/*
** Start the background threads required by threading model eModel (one of
** the LSMTEST_MODE_* values).
**
** Returns 1 if eModel is out of range and 0 immediately for the
** single-threaded model. Otherwise the client work-hook is replaced by
** mt_client_work_hook(), a two-entry worker array is allocated, the
** matching automatic behaviour is disabled on the client connection and the
** thread(s) are started:
**
**   BACKGROUND_CKPT:  autocheckpoint=0, worker 0 = checkpointer
**   BACKGROUND_WORK:  autowork=0,       worker 0 = worker (auto-checkpoint)
**   BACKGROUND_BOTH:  autowork=0,       worker 0 = worker, 1 = checkpointer
**
** Returns the result of the last mt_start_worker() call made.
*/
static int testLsmStartWorkers(
  LsmDb *pDb, int eModel, const char *zFilename, const char *zCfg
){
  const char *zClientCfg;
  int eFirstThread;
  int rc;

  if( eModel<1 || eModel>4 ) return 1;
  if( eModel==1 ) return 0;

  /* Worker 0 is signalled every time the client connection does work. */
  lsm_config_work_hook(pDb->db, mt_client_work_hook, (void *)pDb);

  /* Space for two workers is always allocated, even if only one is used. */
  pDb->aWorker = (LsmWorker *)testMalloc(sizeof(LsmWorker) * 2);
  memset(pDb->aWorker, 0, sizeof(LsmWorker) * 2);

  /* eModel is 2, 3 or 4 here, so the final branch is BACKGROUND_BOTH. */
  if( eModel==LSMTEST_MODE_BACKGROUND_CKPT ){
    pDb->nWorker = 1;
    zClientCfg = "autocheckpoint=0";
    eFirstThread = LSMTEST_THREAD_CKPT;
  }else if( eModel==LSMTEST_MODE_BACKGROUND_WORK ){
    pDb->nWorker = 1;
    zClientCfg = "autowork=0";
    eFirstThread = LSMTEST_THREAD_WORKER_AC;
  }else{
    pDb->nWorker = 2;
    zClientCfg = "autowork=0";
    eFirstThread = LSMTEST_THREAD_WORKER;
  }

  test_lsm_config_str(0, pDb->db, 0, zClientCfg, 0);
  rc = mt_start_worker(pDb, 0, zFilename, zCfg, eFirstThread);
  if( rc==0 && pDb->nWorker==2 ){
    rc = mt_start_worker(pDb, 1, zFilename, zCfg, LSMTEST_THREAD_CKPT);
  }
  return rc;
}
/*
** Open an LSM test database in threading mode 2 (background checkpointer).
** The zSpec argument is ignored.
*/
int test_lsm_mt2(
  const char *zSpec,
  const char *zFilename,
  int bClear,
  TestDb **ppDb
){
  static const char zMt2Cfg[] = "mt_mode=2";
  (void)zSpec;
  return testLsmOpen(zMt2Cfg, zFilename, bClear, ppDb);
}
/*
** Open an LSM test database in threading mode 4 (background worker plus
** background checkpointer). The zSpec argument is ignored.
*/
int test_lsm_mt3(
  const char *zSpec,
  const char *zFilename,
  int bClear,
  TestDb **ppDb
){
  static const char zMt3Cfg[] = "mt_mode=4";
  (void)zSpec;
  return testLsmOpen(zMt3Cfg, zFilename, bClear, ppDb);
}
#else
/*
** Builds without LSM_MUTEX_PTHREADS have no background threads, so there
** is nothing to shut down.
*/
static void mt_shutdown(LsmDb *pDb){
  unused_parameter(pDb);
}
/*
** Stand-in used when the test program is built without
** LSM_MUTEX_PTHREADS: reports that threads are unavailable and returns 1
** without touching any of its arguments.
*/
int test_lsm_mt(const char *zFilename, int bClear, TestDb **ppDb){
  int rc = 1;

  unused_parameter(zFilename);
  unused_parameter(bClear);
  unused_parameter(ppDb);
  testPrintError("threads unavailable - recompile with LSM_MUTEX_PTHREADS\n");
  return rc;
}
#endif