#include "lsmtest_tdb.h"
#include "lsm.h"
#include "lsmtest.h"

#include <stdlib.h>
#include <string.h>
#include <assert.h>
#ifndef _WIN32
# include <unistd.h>
#endif
#include <stdio.h>

#ifndef _WIN32
# include <sys/time.h>
#endif

typedef struct LsmDb LsmDb;
typedef struct LsmWorker LsmWorker;
typedef struct LsmFile LsmFile;

#define LSMTEST_DFLT_MT_MAX_CKPT (8*1024)
#define LSMTEST_DFLT_MT_MIN_CKPT (2*1024)

#ifdef LSM_MUTEX_PTHREADS
#include <pthread.h>

#define LSMTEST_THREAD_CKPT      1
#define LSMTEST_THREAD_WORKER    2
#define LSMTEST_THREAD_WORKER_AC 3

/*
** There are several different types of worker threads that run in different
** test configurations, depending on the value of LsmWorker.eType.
**
**   1. Checkpointer.
**   2. Worker with auto-checkpoint.
**   3. Worker without auto-checkpoint.
*/
struct LsmWorker {
  LsmDb *pDb;                     /* Main database structure */
  lsm_db *pWorker;                /* Worker database handle */
  pthread_t worker_thread;        /* Worker thread */
  pthread_cond_t worker_cond;     /* Condition var the worker waits on */
  pthread_mutex_t worker_mutex;   /* Mutex used with worker_cond */
  int bDoWork;                    /* Set to true by client when there is work */
  int worker_rc;                  /* Store error code here */
  int eType;                      /* LSMTEST_THREAD_XXX constant */
  int bBlock;
};
#else
struct LsmWorker { int worker_rc; int bBlock; };
#endif

static void mt_shutdown(LsmDb *);

lsm_env *tdb_lsm_env(void){
  static int bInit = 0;
  static lsm_env env;
  if( bInit==0 ){
    memcpy(&env, lsm_default_env(), sizeof(env));
    bInit = 1;
  }
  return &env;
}

typedef struct FileSector FileSector;
typedef struct FileData FileData;

struct FileSector {
  u8 *aOld;                       /* Old data for this sector */
};

struct FileData {
  int nSector;                    /* Allocated size of apSector[] array */
  FileSector *aSector;            /* Array of file sectors */
};

/*
** bPrepareCrash:
**   If non-zero, the file wrappers maintain enough in-memory data to
**   simulate the effect of a power-failure on the file-system (i.e. that
**   unsynced sectors may be written, not written, or overwritten with
**   arbitrary data when the crash occurs).
**
** bCrashed:
**   Set to true after a crash is simulated. Once this variable is true, all
**   VFS methods other than xClose() return LSM_IOERR as soon as they are
**   called (without affecting the contents of the file-system).
**
** env:
**   The environment object used by all lsm_db* handles opened by this
**   object (i.e. LsmDb.db plus any worker connections). Variable env.pVfsCtx
**   always points to the containing LsmDb structure.
*/
struct LsmDb {
  TestDb base;                    /* Base class - methods table */
  lsm_env env;                    /* Environment used by connection db */
  char *zName;                    /* Database file name */
  lsm_db *db;                     /* LSM database handle */

  lsm_cursor *pCsr;               /* Cursor held open during read transaction */
  void *pBuf;                     /* Buffer for tdb_fetch() output */
  int nBuf;                       /* Allocated (not used) size of pBuf */

  /* Crash testing related state */
  int bCrashed;                   /* True once a crash has occurred */
  int nAutoCrash;                 /* Number of syncs until a crash */
  int bPrepareCrash;              /* True to store writes in memory */

  /* Unsynced data (while crash testing) */
  int szSector;                   /* Assumed size of disk sectors (512B) */
  FileData aFile[2];              /* Database and log file data */

  /* Other test instrumentation */
  int bNoRecovery;                /* If true, assume DMS2 is locked */

  /* Work hook redirection */
  void (*xWork)(lsm_db *, void *);
  void *pWorkCtx;

  /* IO logging hook */
  void (*xWriteHook)(void *, int, lsm_i64, int, int);
  void *pWriteCtx;
  
  /* Worker threads (for lsm_mt) */
  int nMtMinCkpt;
  int nMtMaxCkpt;
  int eMode;
  int nWorker;
  LsmWorker *aWorker;
};

#define LSMTEST_MODE_SINGLETHREAD    1
#define LSMTEST_MODE_BACKGROUND_CKPT 2
#define LSMTEST_MODE_BACKGROUND_WORK 3
#define LSMTEST_MODE_BACKGROUND_BOTH 4

/*************************************************************************
**************************************************************************
** Begin test VFS code.
*/

struct LsmFile {
  lsm_file *pReal;                /* Real underlying file */
  int bLog;                       /* True for log file. False for db file */
  LsmDb *pDb;                     /* Database handle that uses this file */
};

static int testEnvFullpath(
  lsm_env *pEnv,                  /* Environment for current LsmDb */
  const char *zFile,              /* Relative path name */
  char *zOut,                     /* Output buffer */
  int *pnOut                      /* IN/OUT: Size of output buffer */
){
  lsm_env *pRealEnv = tdb_lsm_env();
  return pRealEnv->xFullpath(pRealEnv, zFile, zOut, pnOut);
}

static int testEnvOpen(
  lsm_env *pEnv,                  /* Environment for current LsmDb */
  const char *zFile,              /* Name of file to open */
  int flags,
  lsm_file **ppFile               /* OUT: New file handle object */
){
  lsm_env *pRealEnv = tdb_lsm_env();
  LsmDb *pDb = (LsmDb *)pEnv->pVfsCtx;
  int rc;                         /* Return Code */
  LsmFile *pRet;                  /* The new file handle */
  int nFile;                      /* Length of string zFile in bytes */

  nFile = strlen(zFile);
  pRet = (LsmFile *)testMalloc(sizeof(LsmFile));
  pRet->pDb = pDb;
  pRet->bLog = (nFile > 4 && 0==memcmp("-log", &zFile[nFile-4], 4));

  rc = pRealEnv->xOpen(pRealEnv, zFile, flags, &pRet->pReal);
  if( rc!=LSM_OK ){
    testFree(pRet);
    pRet = 0;
  }

  *ppFile = (lsm_file *)pRet;
  return rc;
}

static int testEnvRead(lsm_file *pFile, lsm_i64 iOff, void *pData, int nData){
  lsm_env *pRealEnv = tdb_lsm_env();
  LsmFile *p = (LsmFile *)pFile;
  if( p->pDb->bCrashed ) return LSM_IOERR;
  return pRealEnv->xRead(p->pReal, iOff, pData, nData);
}

static int testEnvWrite(lsm_file *pFile, lsm_i64 iOff, void *pData, int nData){
  lsm_env *pRealEnv = tdb_lsm_env();
  LsmFile *p = (LsmFile *)pFile;
  LsmDb *pDb = p->pDb;

  if( pDb->bCrashed ) return LSM_IOERR;

  if( pDb->bPrepareCrash ){
    FileData *pData2 = &pDb->aFile[p->bLog];
    int iFirst;                 
    int iLast;
    int iSector;

    iFirst = (int)(iOff / pDb->szSector);
    iLast =  (int)((iOff + nData - 1) / pDb->szSector);

    if( pData2->nSector<(iLast+1) ){
      int nNew = ( ((iLast + 1) + 63) / 64 ) * 64;
      assert( nNew>iLast );
      pData2->aSector = (FileSector *)testRealloc(
          pData2->aSector, nNew*sizeof(FileSector)
      );
      memset(&pData2->aSector[pData2->nSector], 
          0, (nNew - pData2->nSector) * sizeof(FileSector)
      );
      pData2->nSector = nNew;
    }

    for(iSector=iFirst; iSector<=iLast; iSector++){
      if( pData2->aSector[iSector].aOld==0 ){
        u8 *aOld = (u8 *)testMalloc(pDb->szSector);
        pRealEnv->xRead(
            p->pReal, (lsm_i64)iSector*pDb->szSector, aOld, pDb->szSector
        );
        pData2->aSector[iSector].aOld = aOld;
      }
    }
  }

  if( pDb->xWriteHook ){
    int rc;
    int nUs;
    struct timeval t1;
    struct timeval t2;

    gettimeofday(&t1, 0);
    assert( nData>0 );
    rc = pRealEnv->xWrite(p->pReal, iOff, pData, nData);
    gettimeofday(&t2, 0);

    nUs = (t2.tv_sec - t1.tv_sec) * 1000000 + (t2.tv_usec - t1.tv_usec);
    pDb->xWriteHook(pDb->pWriteCtx, p->bLog, iOff, nData, nUs);
    return rc;
  }

  return pRealEnv->xWrite(p->pReal, iOff, pData, nData);
}

static void doSystemCrash(LsmDb *pDb);

static int testEnvSync(lsm_file *pFile){
  lsm_env *pRealEnv = tdb_lsm_env();
  LsmFile *p = (LsmFile *)pFile;
  LsmDb *pDb = p->pDb;
  FileData *pData = &pDb->aFile[p->bLog];
  int i;

  if( pDb->bCrashed ) return LSM_IOERR;

  if( pDb->nAutoCrash ){
    pDb->nAutoCrash--;
    if( pDb->nAutoCrash==0 ){
      doSystemCrash(pDb);
      pDb->bCrashed = 1;
      return LSM_IOERR;
    }
  }

  if( pDb->bPrepareCrash ){
    for(i=0; i<pData->nSector; i++){
      testFree(pData->aSector[i].aOld);
      pData->aSector[i].aOld = 0;
    }
  }

  if( pDb->xWriteHook ){
    int rc;
    int nUs;
    struct timeval t1;
    struct timeval t2;

    gettimeofday(&t1, 0);
    rc = pRealEnv->xSync(p->pReal);
    gettimeofday(&t2, 0);

    nUs = (t2.tv_sec - t1.tv_sec) * 1000000 + (t2.tv_usec - t1.tv_usec);
    pDb->xWriteHook(pDb->pWriteCtx, p->bLog, 0, 0, nUs);
    return rc;
  }

  return pRealEnv->xSync(p->pReal);
}

static int testEnvTruncate(lsm_file *pFile, lsm_i64 iOff){
  lsm_env *pRealEnv = tdb_lsm_env();
  LsmFile *p = (LsmFile *)pFile;
  if( p->pDb->bCrashed ) return LSM_IOERR;
  return pRealEnv->xTruncate(p->pReal, iOff);
}

static int testEnvSectorSize(lsm_file *pFile){
  lsm_env *pRealEnv = tdb_lsm_env();
  LsmFile *p = (LsmFile *)pFile;
  return pRealEnv->xSectorSize(p->pReal);
}

static int testEnvRemap(
  lsm_file *pFile, 
  lsm_i64 iMin, 
  void **ppOut,
  lsm_i64 *pnOut
){
  lsm_env *pRealEnv = tdb_lsm_env();
  LsmFile *p = (LsmFile *)pFile;
  return pRealEnv->xRemap(p->pReal, iMin, ppOut, pnOut);
}

static int testEnvFileid(
  lsm_file *pFile, 
  void *ppOut,
  int *pnOut
){
  lsm_env *pRealEnv = tdb_lsm_env();
  LsmFile *p = (LsmFile *)pFile;
  return pRealEnv->xFileid(p->pReal, ppOut, pnOut);
}

static int testEnvClose(lsm_file *pFile){
  lsm_env *pRealEnv = tdb_lsm_env();
  LsmFile *p = (LsmFile *)pFile;

  pRealEnv->xClose(p->pReal);
  testFree(p);
  return LSM_OK;
}

static int testEnvUnlink(lsm_env *pEnv, const char *zFile){
  lsm_env *pRealEnv = tdb_lsm_env();
  unused_parameter(pEnv);
  return pRealEnv->xUnlink(pRealEnv, zFile);
}

static int testEnvLock(lsm_file *pFile, int iLock, int eType){
  LsmFile *p = (LsmFile *)pFile;
  lsm_env *pRealEnv = tdb_lsm_env();

  if( iLock==2 && eType==LSM_LOCK_EXCL && p->pDb->bNoRecovery ){
    return LSM_BUSY;
  }
  return pRealEnv->xLock(p->pReal, iLock, eType);
}

static int testEnvTestLock(lsm_file *pFile, int iLock, int nLock, int eType){
  LsmFile *p = (LsmFile *)pFile;
  lsm_env *pRealEnv = tdb_lsm_env();

  if( iLock==2 && eType==LSM_LOCK_EXCL && p->pDb->bNoRecovery ){
    return LSM_BUSY;
  }
  return pRealEnv->xTestLock(p->pReal, iLock, nLock, eType);
}

static int testEnvShmMap(lsm_file *pFile, int iRegion, int sz, void **pp){
  LsmFile *p = (LsmFile *)pFile;
  lsm_env *pRealEnv = tdb_lsm_env();
  return pRealEnv->xShmMap(p->pReal, iRegion, sz, pp);
}

static void testEnvShmBarrier(void){
}

static int testEnvShmUnmap(lsm_file *pFile, int bDel){
  LsmFile *p = (LsmFile *)pFile;
  lsm_env *pRealEnv = tdb_lsm_env();
  return pRealEnv->xShmUnmap(p->pReal, bDel);
}

static int testEnvSleep(lsm_env *pEnv, int us){
  lsm_env *pRealEnv = tdb_lsm_env();
  return pRealEnv->xSleep(pRealEnv, us);
}

static void doSystemCrash(LsmDb *pDb){
  lsm_env *pEnv = tdb_lsm_env();
  int iFile;
  int iSeed = pDb->aFile[0].nSector + pDb->aFile[1].nSector;

  char *zFile = pDb->zName;
  char *zFree = 0;

  for(iFile=0; iFile<2; iFile++){
    lsm_file *pFile = 0;
    int i;

    pEnv->xOpen(pEnv, zFile, 0, &pFile);
    for(i=0; i<pDb->aFile[iFile].nSector; i++){
      u8 *aOld = pDb->aFile[iFile].aSector[i].aOld;
      if( aOld ){
        int iOpt = testPrngValue(iSeed++) % 3;
        switch( iOpt ){
          case 0:
            break;

          case 1:
            testPrngArray(iSeed++, (u32 *)aOld, pDb->szSector/4);
            /* Fall-through */

          case 2:
            pEnv->xWrite(
                pFile, (lsm_i64)i * pDb->szSector, aOld, pDb->szSector
            );
            break;
        }
        testFree(aOld);
        pDb->aFile[iFile].aSector[i].aOld = 0;
      }
    }
    pEnv->xClose(pFile);
    zFree = zFile = sqlite3_mprintf("%s-log", pDb->zName);
  }

  sqlite3_free(zFree);
}
/*
** End test VFS code.
**************************************************************************
*************************************************************************/

/*************************************************************************
**************************************************************************
** Begin test compression hooks.
*/

#ifdef HAVE_ZLIB
#include <zlib.h>

static int testZipBound(void *pCtx, int nSrc){
  return compressBound(nSrc);
}

static int testZipCompress(
  void *pCtx,                     /* Context pointer */
  char *aOut, int *pnOut,         /* OUT: Buffer containing compressed data */
  const char *aIn, int nIn        /* Buffer containing input data */
){
  uLongf n = *pnOut;              /* In/out buffer size for compress() */
  int rc;                         /* compress() return code */
 
  rc = compress((Bytef*)aOut, &n, (Bytef*)aIn, nIn);
  *pnOut = n;
  return (rc==Z_OK ? 0 : LSM_ERROR);
}

static int testZipUncompress(
  void *pCtx,                     /* Context pointer */
  char *aOut, int *pnOut,         /* OUT: Buffer containing uncompressed data */
  const char *aIn, int nIn        /* Buffer containing input data */
){
  uLongf n = *pnOut;              /* In/out buffer size for uncompress() */
  int rc;                         /* uncompress() return code */

  rc = uncompress((Bytef*)aOut, &n, (Bytef*)aIn, nIn);
  *pnOut = n;
  return (rc==Z_OK ? 0 : LSM_ERROR);
}

static int testConfigureCompression(lsm_db *pDb){
  static lsm_compress zip = {
    0,                            /* Context pointer (unused) */
    1,                            /* Id value */
    testZipBound,                 /* xBound method */
    testZipCompress,              /* xCompress method */
    testZipUncompress             /* xUncompress method */
  };
  return lsm_config(pDb, LSM_CONFIG_SET_COMPRESSION, &zip);
}
#endif /* ifdef HAVE_ZLIB */

/*
** End test compression hooks.
**************************************************************************
*************************************************************************/

static int test_lsm_close(TestDb *pTestDb){
  int i;
  int rc = LSM_OK;
  LsmDb *pDb = (LsmDb *)pTestDb;

  lsm_csr_close(pDb->pCsr);
  lsm_close(pDb->db);

  /* If this is a multi-threaded database, wait on the worker threads. */
  mt_shutdown(pDb);
  for(i=0; i<pDb->nWorker && rc==LSM_OK; i++){
    rc = pDb->aWorker[i].worker_rc;
  }

  for(i=0; i<pDb->aFile[0].nSector; i++){
    testFree(pDb->aFile[0].aSector[i].aOld);
  }
  testFree(pDb->aFile[0].aSector);
  for(i=0; i<pDb->aFile[1].nSector; i++){
    testFree(pDb->aFile[1].aSector[i].aOld);
  }
  testFree(pDb->aFile[1].aSector);

  memset(pDb, sizeof(LsmDb), 0x11);
  testFree((char *)pDb->pBuf);
  testFree((char *)pDb);
  return rc;
}

static void mt_signal_worker(LsmDb*, int);

static int waitOnCheckpointer(LsmDb *pDb, lsm_db *db){
  int nSleep = 0;
  int nKB;
  int rc;

  do {
    nKB = 0;
    rc = lsm_info(db, LSM_INFO_CHECKPOINT_SIZE, &nKB);
    if( rc!=LSM_OK || nKB<pDb->nMtMaxCkpt ) break;
#ifdef LSM_MUTEX_PTHREADS
    mt_signal_worker(pDb, 
        (pDb->eMode==LSMTEST_MODE_BACKGROUND_CKPT ? 0 : 1)
    );
#endif
    usleep(5000);
    nSleep += 5;
  }while( 1 );

#if 0
    if( nSleep ) printf("# waitOnCheckpointer(): nSleep=%d\n", nSleep);
#endif

  return rc;
}

static int waitOnWorker(LsmDb *pDb){
  int rc;
  int nLimit = -1;
  int nSleep = 0;

  rc = lsm_config(pDb->db, LSM_CONFIG_AUTOFLUSH, &nLimit);
  do {
    int nOld, nNew, rc2;
    rc2 = lsm_info(pDb->db, LSM_INFO_TREE_SIZE, &nOld, &nNew);
    if( rc2!=LSM_OK ) return rc2;
    if( nOld==0 || nNew<(nLimit/2) ) break;
#ifdef LSM_MUTEX_PTHREADS
    mt_signal_worker(pDb, 0);
#endif
    usleep(5000);
    nSleep += 5;
  }while( 1 );

#if 0
  if( nSleep ) printf("# waitOnWorker(): nSleep=%d\n", nSleep);
#endif

  return rc;
}

static int test_lsm_write(
  TestDb *pTestDb, 
  void *pKey, 
  int nKey, 
  void *pVal,
  int nVal
){
  LsmDb *pDb = (LsmDb *)pTestDb;
  int rc = LSM_OK;

  if( pDb->eMode==LSMTEST_MODE_BACKGROUND_CKPT ){
    rc = waitOnCheckpointer(pDb, pDb->db);
  }else if( 
      pDb->eMode==LSMTEST_MODE_BACKGROUND_WORK
   || pDb->eMode==LSMTEST_MODE_BACKGROUND_BOTH 
  ){
    rc = waitOnWorker(pDb);
  }

  if( rc==LSM_OK ){
    rc = lsm_insert(pDb->db, pKey, nKey, pVal, nVal);
  }
  return rc;
}

static int test_lsm_delete(TestDb *pTestDb, void *pKey, int nKey){
  LsmDb *pDb = (LsmDb *)pTestDb;
  return lsm_delete(pDb->db, pKey, nKey);
}

static int test_lsm_delete_range(
  TestDb *pTestDb, 
  void *pKey1, int nKey1,
  void *pKey2, int nKey2
){
  LsmDb *pDb = (LsmDb *)pTestDb;
  return lsm_delete_range(pDb->db, pKey1, nKey1, pKey2, nKey2);
}

static int test_lsm_fetch(
  TestDb *pTestDb, 
  void *pKey, 
  int nKey, 
  void **ppVal, 
  int *pnVal
){
  int rc;
  LsmDb *pDb = (LsmDb *)pTestDb;
  lsm_cursor *csr;

  if( pKey==0 ) return LSM_OK;

  if( pDb->pCsr==0 ){
    rc = lsm_csr_open(pDb->db, &csr);
    if( rc!=LSM_OK ) return rc;
  }else{
    csr = pDb->pCsr;
  }

  rc = lsm_csr_seek(csr, pKey, nKey, LSM_SEEK_EQ);
  if( rc==LSM_OK ){
    if( lsm_csr_valid(csr) ){
      const void *pVal; int nVal;
      rc = lsm_csr_value(csr, &pVal, &nVal);
      if( nVal>pDb->nBuf ){
        testFree(pDb->pBuf);
        pDb->pBuf = testMalloc(nVal*2);
        pDb->nBuf = nVal*2;
      }
      memcpy(pDb->pBuf, pVal, nVal);
      *ppVal = pDb->pBuf;
      *pnVal = nVal;
    }else{
      *ppVal = 0;
      *pnVal = -1;
    }
  }
  if( pDb->pCsr==0 ){
    lsm_csr_close(csr);
  }
  return rc;
}

static int test_lsm_scan(
  TestDb *pTestDb,
  void *pCtx,
  int bReverse,
  void *pFirst, int nFirst,
  void *pLast, int nLast,
  void (*xCallback)(void *, void *, int , void *, int)
){
  LsmDb *pDb = (LsmDb *)pTestDb;
  lsm_cursor *csr;
  lsm_cursor *csr2 = 0;
  int rc;

  if( pDb->pCsr==0 ){
    rc = lsm_csr_open(pDb->db, &csr);
    if( rc!=LSM_OK ) return rc;
  }else{
    rc = LSM_OK;
    csr = pDb->pCsr;
  }

  /* To enhance testing, if both pLast and pFirst are defined, seek the
  ** cursor to the "end" boundary here. Then the next block seeks it to
  ** the "start" ready for the scan. The point is to test that cursors
  ** can be reused.  */
  if( pLast && pFirst ){
    if( bReverse ){
      rc = lsm_csr_seek(csr, pFirst, nFirst, LSM_SEEK_LE);
    }else{
      rc = lsm_csr_seek(csr, pLast, nLast, LSM_SEEK_GE);
    }
  }

  if( bReverse ){
    if( pLast ){
      rc = lsm_csr_seek(csr, pLast, nLast, LSM_SEEK_LE);
    }else{
      rc = lsm_csr_last(csr);
    }
  }else{
    if( pFirst ){
      rc = lsm_csr_seek(csr, pFirst, nFirst, LSM_SEEK_GE);
    }else{
      rc = lsm_csr_first(csr);
    }
  }

  while( rc==LSM_OK && lsm_csr_valid(csr) ){
    const void *pKey; int nKey;
    const void *pVal; int nVal;
    int cmp;

    lsm_csr_key(csr, &pKey, &nKey);
    lsm_csr_value(csr, &pVal, &nVal);

    if( bReverse && pFirst ){
      cmp = memcmp(pFirst, pKey, MIN(nKey, nFirst));
      if( cmp>0 || (cmp==0 && nFirst>nKey) ) break;
    }else if( bReverse==0 && pLast ){
      cmp = memcmp(pLast, pKey, MIN(nKey, nLast));
      if( cmp<0 || (cmp==0 && nLast<nKey) ) break;
    }

    xCallback(pCtx, (void *)pKey, nKey, (void *)pVal, nVal);

    if( bReverse ){
      rc = lsm_csr_prev(csr);
    }else{
      rc = lsm_csr_next(csr);
    }
  }

  if( pDb->pCsr==0 ){
    lsm_csr_close(csr);
  }
  return rc;
}

static int test_lsm_begin(TestDb *pTestDb, int iLevel){
  int rc = LSM_OK;
  LsmDb *pDb = (LsmDb *)pTestDb;

  /* iLevel==0 is a no-op. */
  if( iLevel==0 ) return 0;

  if( pDb->pCsr==0 ) rc = lsm_csr_open(pDb->db, &pDb->pCsr);
  if( rc==LSM_OK && iLevel>1 ){
    rc = lsm_begin(pDb->db, iLevel-1);
  }

  return rc;
}
static int test_lsm_commit(TestDb *pTestDb, int iLevel){
  LsmDb *pDb = (LsmDb *)pTestDb;

  /* If iLevel==0, close any open read transaction */
  if( iLevel==0 && pDb->pCsr ){
    lsm_csr_close(pDb->pCsr);
    pDb->pCsr = 0;
  }

  /* If iLevel==0, close any open read transaction */
  return lsm_commit(pDb->db, MAX(0, iLevel-1));
}
static int test_lsm_rollback(TestDb *pTestDb, int iLevel){
  LsmDb *pDb = (LsmDb *)pTestDb;

  /* If iLevel==0, close any open read transaction */
  if( iLevel==0 && pDb->pCsr ){
    lsm_csr_close(pDb->pCsr);
    pDb->pCsr = 0;
  }

  return lsm_rollback(pDb->db, MAX(0, iLevel-1));
}

/*
** A log message callback registered with lsm connections. Prints all 
** messages to stderr.
*/
static void xLog(void *pCtx, int rc, const char *z){
  unused_parameter(rc);
  /* fprintf(stderr, "lsm: rc=%d \"%s\"\n", rc, z); */
  if( pCtx ) fprintf(stderr, "%s: ", (char *)pCtx);
  fprintf(stderr, "%s\n", z);
  fflush(stderr);
}

static void xWorkHook(lsm_db *db, void *pArg){
  LsmDb *p = (LsmDb *)pArg;
  if( p->xWork ) p->xWork(db, p->pWorkCtx);
}

#define TEST_NO_RECOVERY -1
#define TEST_COMPRESSION -3

#define TEST_MT_MODE     -2
#define TEST_MT_MIN_CKPT -4
#define TEST_MT_MAX_CKPT -5


int test_lsm_config_str(
  LsmDb *pLsm,
  lsm_db *db, 
  int bWorker,
  const char *zStr,
  int *pnThread
){
  struct CfgParam {
    const char *zParam;
    int bWorker;
    int eParam;
  } aParam[] = {
    { "autoflush",        0, LSM_CONFIG_AUTOFLUSH },
    { "page_size",        0, LSM_CONFIG_PAGE_SIZE },
    { "block_size",       0, LSM_CONFIG_BLOCK_SIZE },
    { "safety",           0, LSM_CONFIG_SAFETY },
    { "autowork",         0, LSM_CONFIG_AUTOWORK },
    { "autocheckpoint",   0, LSM_CONFIG_AUTOCHECKPOINT },
    { "mmap",             0, LSM_CONFIG_MMAP },
    { "use_log",          0, LSM_CONFIG_USE_LOG },
    { "automerge",        0, LSM_CONFIG_AUTOMERGE },
    { "max_freelist",     0, LSM_CONFIG_MAX_FREELIST },
    { "multi_proc",       0, LSM_CONFIG_MULTIPLE_PROCESSES },
    { "worker_automerge", 1, LSM_CONFIG_AUTOMERGE },
    { "test_no_recovery", 0, TEST_NO_RECOVERY },
    { "bg_min_ckpt",      0, TEST_NO_RECOVERY },

    { "mt_mode",          0, TEST_MT_MODE },
    { "mt_min_ckpt",      0, TEST_MT_MIN_CKPT },
    { "mt_max_ckpt",      0, TEST_MT_MAX_CKPT },

#ifdef HAVE_ZLIB
    { "compression",      0, TEST_COMPRESSION },
#endif
    { 0, 0 }
  };
  const char *z = zStr;
  int nThread = 1;

  if( zStr==0 ) return 0;

  assert( db );
  while( z[0] ){
    const char *zStart;

    /* Skip whitespace */
    while( *z==' ' ) z++;
    zStart = z;

    while( *z && *z!='=' ) z++;
    if( *z ){
      int eParam;
      int i;
      int iVal;
      int iMul = 1;
      int rc;
      char zParam[32];
      int nParam = z-zStart;
      if( nParam==0 || nParam>sizeof(zParam)-1 ) goto syntax_error;

      memcpy(zParam, zStart, nParam);
      zParam[nParam] = '\0';
      rc = testArgSelect(aParam, "param", zParam, &i);
      if( rc!=0 ) return rc;
      eParam = aParam[i].eParam;

      z++;
      zStart = z;
      while( *z>='0' && *z<='9' ) z++;
      if( *z=='k' || *z=='K' ){
        iMul = 1;
        z++;
      }else if( *z=='M' || *z=='M' ){
        iMul = 1024;
        z++;
      }
      nParam = z-zStart;
      if( nParam==0 || nParam>sizeof(zParam)-1 ) goto syntax_error;
      memcpy(zParam, zStart, nParam);
      zParam[nParam] = '\0';
      iVal = atoi(zParam) * iMul;

      if( eParam>0 ){
        if( bWorker || aParam[i].bWorker==0 ){
          lsm_config(db, eParam, &iVal);
        }
      }else{
        switch( eParam ){
          case TEST_NO_RECOVERY:
            if( pLsm ) pLsm->bNoRecovery = iVal;
            break;
          case TEST_MT_MODE:
            if( pLsm ) nThread = iVal;
            break;
          case TEST_MT_MIN_CKPT:
            if( pLsm && iVal>0 ) pLsm->nMtMinCkpt = iVal*1024;
            break;
          case TEST_MT_MAX_CKPT:
            if( pLsm && iVal>0 ) pLsm->nMtMaxCkpt = iVal*1024;
            break;
#ifdef HAVE_ZLIB
          case TEST_COMPRESSION:
            testConfigureCompression(db);
            break;
#endif
        }
      }
    }else if( z!=zStart ){
      goto syntax_error;
    }
  }

  if( pnThread ) *pnThread = nThread;
  if( pLsm && pLsm->nMtMaxCkpt < pLsm->nMtMinCkpt ){
    pLsm->nMtMinCkpt = pLsm->nMtMaxCkpt;
  }

  return 0;
 syntax_error:
  testPrintError("syntax error at: \"%s\"\n", z);
  return 1;
}

int tdb_lsm_config_str(TestDb *pDb, const char *zStr){
  int rc = 0;
  if( tdb_lsm(pDb) ){
#ifdef LSM_MUTEX_PTHREADS
    int i;
#endif
    LsmDb *pLsm = (LsmDb *)pDb;

    rc = test_lsm_config_str(pLsm, pLsm->db, 0, zStr, 0);
#ifdef LSM_MUTEX_PTHREADS
    for(i=0; rc==0 && i<pLsm->nWorker; i++){
      rc = test_lsm_config_str(0, pLsm->aWorker[i].pWorker, 1, zStr, 0);
    }
#endif
  }
  return rc;
}

int tdb_lsm_configure(lsm_db *db, const char *zConfig){
  return test_lsm_config_str(0, db, 0, zConfig, 0);
}

static int testLsmStartWorkers(LsmDb *, int, const char *, const char *);

static int testLsmOpen(
  const char *zCfg,
  const char *zFilename, 
  int bClear, 
  TestDb **ppDb
){
  static const DatabaseMethods LsmMethods = {
    test_lsm_close,
    test_lsm_write,
    test_lsm_delete,
    test_lsm_delete_range,
    test_lsm_fetch,
    test_lsm_scan,
    test_lsm_begin,
    test_lsm_commit,
    test_lsm_rollback
  };

  int rc;
  int nFilename;
  LsmDb *pDb;

  /* If the bClear flag is set, delete any existing database. */
  assert( zFilename);
  if( bClear ) testDeleteLsmdb(zFilename);
  nFilename = strlen(zFilename);

  pDb = (LsmDb *)testMalloc(sizeof(LsmDb) + nFilename + 1);
  memset(pDb, 0, sizeof(LsmDb));
  pDb->base.pMethods = &LsmMethods;
  pDb->zName = (char *)&pDb[1];
  memcpy(pDb->zName, zFilename, nFilename + 1);

  /* Default the sector size used for crash simulation to 512 bytes. 
  ** Todo: There should be an OS method to obtain this value - just as
  ** there is in SQLite. For now, LSM assumes that it is smaller than
  ** the page size (default 4KB).
  */
  pDb->szSector = 256;

  /* Default values for the mt_min_ckpt and mt_max_ckpt parameters. */
  pDb->nMtMinCkpt = LSMTEST_DFLT_MT_MIN_CKPT;
  pDb->nMtMaxCkpt = LSMTEST_DFLT_MT_MAX_CKPT;

  memcpy(&pDb->env, tdb_lsm_env(), sizeof(lsm_env));
  pDb->env.pVfsCtx = (void *)pDb;
  pDb->env.xFullpath = testEnvFullpath;
  pDb->env.xOpen = testEnvOpen;
  pDb->env.xRead = testEnvRead;
  pDb->env.xWrite = testEnvWrite;
  pDb->env.xTruncate = testEnvTruncate;
  pDb->env.xSync = testEnvSync;
  pDb->env.xSectorSize = testEnvSectorSize;
  pDb->env.xRemap = testEnvRemap;
  pDb->env.xFileid = testEnvFileid;
  pDb->env.xClose = testEnvClose;
  pDb->env.xUnlink = testEnvUnlink;
  pDb->env.xLock = testEnvLock;
  pDb->env.xTestLock = testEnvTestLock;
  pDb->env.xShmBarrier = testEnvShmBarrier;
  pDb->env.xShmMap = testEnvShmMap;
  pDb->env.xShmUnmap = testEnvShmUnmap;
  pDb->env.xSleep = testEnvSleep;

  rc = lsm_new(&pDb->env, &pDb->db);
  if( rc==LSM_OK ){
    int nThread = 1;
    lsm_config_log(pDb->db, xLog, 0);
    lsm_config_work_hook(pDb->db, xWorkHook, (void *)pDb);

    rc = test_lsm_config_str(pDb, pDb->db, 0, zCfg, &nThread);
    if( rc==LSM_OK ) rc = lsm_open(pDb->db, zFilename);

    pDb->eMode = nThread;
#ifdef LSM_MUTEX_PTHREADS
    if( rc==LSM_OK && nThread>1 ){
      testLsmStartWorkers(pDb, nThread, zFilename, zCfg);
    }
#endif

    if( rc!=LSM_OK ){
      test_lsm_close((TestDb *)pDb);
      pDb = 0;
    }
  }

  *ppDb = (TestDb *)pDb;
  return rc;
}

int test_lsm_open(
  const char *zSpec, 
  const char *zFilename, 
  int bClear, 
  TestDb **ppDb
){
  return testLsmOpen(zSpec, zFilename, bClear, ppDb);
}

int test_lsm_small_open(
  const char *zSpec, 
  const char *zFile, 
  int bClear, 
  TestDb **ppDb
){
  const char *zCfg = "page_size=256 block_size=64 mmap=1024";
  return testLsmOpen(zCfg, zFile, bClear, ppDb);
}

int test_lsm_lomem_open(
  const char *zSpec, 
  const char *zFilename, 
  int bClear, 
  TestDb **ppDb
){
    /* "max_freelist=4 autocheckpoint=32" */
  const char *zCfg = 
    "page_size=256 block_size=64 autoflush=16 "
    "autocheckpoint=32"
    "mmap=0 "
  ;
  return testLsmOpen(zCfg, zFilename, bClear, ppDb);
}

int test_lsm_lomem2_open(
  const char *zSpec, 
  const char *zFilename, 
  int bClear, 
  TestDb **ppDb
){
    /* "max_freelist=4 autocheckpoint=32" */
  const char *zCfg = 
    "page_size=512 block_size=64 autoflush=0 mmap=0 "
  ;
  return testLsmOpen(zCfg, zFilename, bClear, ppDb);
}

int test_lsm_zip_open(
  const char *zSpec, 
  const char *zFilename, 
  int bClear, 
  TestDb **ppDb
){
  const char *zCfg = 
    "page_size=256 block_size=64 autoflush=16 "
    "autocheckpoint=32 compression=1 mmap=0 "
  ;
  return testLsmOpen(zCfg, zFilename, bClear, ppDb);
}

lsm_db *tdb_lsm(TestDb *pDb){
  if( pDb->pMethods->xClose==test_lsm_close ){
    return ((LsmDb *)pDb)->db;
  }
  return 0;
}

int tdb_lsm_multithread(TestDb *pDb){
  int ret = 0;
  if( tdb_lsm(pDb) ){
    ret = ((LsmDb*)pDb)->eMode!=LSMTEST_MODE_SINGLETHREAD;
  }
  return ret;
}

void tdb_lsm_enable_log(TestDb *pDb, int bEnable){
  lsm_db *db = tdb_lsm(pDb);
  if( db ){
    lsm_config_log(db, (bEnable ? xLog : 0), (void *)"client");
  }
}

void tdb_lsm_application_crash(TestDb *pDb){
  if( tdb_lsm(pDb) ){
    LsmDb *p = (LsmDb *)pDb;
    p->bCrashed = 1;
  }
}

void tdb_lsm_prepare_system_crash(TestDb *pDb){
  if( tdb_lsm(pDb) ){
    LsmDb *p = (LsmDb *)pDb;
    p->bPrepareCrash = 1;
  }
}

void tdb_lsm_system_crash(TestDb *pDb){
  if( tdb_lsm(pDb) ){
    LsmDb *p = (LsmDb *)pDb;
    p->bCrashed = 1;
    doSystemCrash(p);
  }
}

void tdb_lsm_safety(TestDb *pDb, int eMode){
  assert( eMode==LSM_SAFETY_OFF 
       || eMode==LSM_SAFETY_NORMAL 
       || eMode==LSM_SAFETY_FULL 
  );
  if( tdb_lsm(pDb) ){
    int iParam = eMode;
    LsmDb *p = (LsmDb *)pDb;
    lsm_config(p->db, LSM_CONFIG_SAFETY, &iParam);
  }
}

void tdb_lsm_prepare_sync_crash(TestDb *pDb, int iSync){
  assert( iSync>0 );
  if( tdb_lsm(pDb) ){
    LsmDb *p = (LsmDb *)pDb;
    p->nAutoCrash = iSync;
    p->bPrepareCrash = 1;
  }
}

void tdb_lsm_config_work_hook(
  TestDb *pDb, 
  void (*xWork)(lsm_db *, void *), 
  void *pWorkCtx
){
  if( tdb_lsm(pDb) ){
    LsmDb *p = (LsmDb *)pDb;
    p->xWork = xWork;
    p->pWorkCtx = pWorkCtx;
  }
}

void tdb_lsm_write_hook(
  TestDb *pDb, 
  void (*xWrite)(void *, int, lsm_i64, int, int),
  void *pWriteCtx
){
  if( tdb_lsm(pDb) ){
    LsmDb *p = (LsmDb *)pDb;
    p->xWriteHook = xWrite;
    p->pWriteCtx = pWriteCtx;
  }
}

int tdb_lsm_open(const char *zCfg, const char *zDb, int bClear, TestDb **ppDb){
  return testLsmOpen(zCfg, zDb, bClear, ppDb);
}

#ifdef LSM_MUTEX_PTHREADS

/*
** Signal worker thread iWorker that there may be work to do.
*/
static void mt_signal_worker(LsmDb *pDb, int iWorker){
  LsmWorker *p = &pDb->aWorker[iWorker];
  pthread_mutex_lock(&p->worker_mutex);
  p->bDoWork = 1;
  pthread_cond_signal(&p->worker_cond);
  pthread_mutex_unlock(&p->worker_mutex);
}

/*
** This routine is used as the main() for all worker threads.
*/
static void *worker_main(void *pArg){
  LsmWorker *p = (LsmWorker *)pArg;
  lsm_db *pWorker;                /* Connection to access db through */

  pthread_mutex_lock(&p->worker_mutex);
  while( (pWorker = p->pWorker) ){
    int rc = LSM_OK;

    /* Do some work. If an error occurs, exit. */

    pthread_mutex_unlock(&p->worker_mutex);
    if( p->eType==LSMTEST_THREAD_CKPT ){
      int nKB = 0;
      rc = lsm_info(pWorker, LSM_INFO_CHECKPOINT_SIZE, &nKB);
      if( rc==LSM_OK && nKB>=p->pDb->nMtMinCkpt ){
        rc = lsm_checkpoint(pWorker, 0);
      }
    }else{
      int nWrite;
      do {

        if( p->eType==LSMTEST_THREAD_WORKER ){
          waitOnCheckpointer(p->pDb, pWorker);
        }

        nWrite = 0;
        rc = lsm_work(pWorker, 0, 256, &nWrite);

        if( p->eType==LSMTEST_THREAD_WORKER && nWrite ){
          mt_signal_worker(p->pDb, 1);
        }
      }while( nWrite && p->pWorker );
    }
    pthread_mutex_lock(&p->worker_mutex);

    if( rc!=LSM_OK && rc!=LSM_BUSY ){
      p->worker_rc = rc;
      break;
    }

    /* The thread will wake up when it is signaled either because another
    ** thread has created some work for this one or because the connection
    ** is being closed.  */
    if( p->pWorker && p->bDoWork==0 ){
      pthread_cond_wait(&p->worker_cond, &p->worker_mutex);
    }
    p->bDoWork = 0;
  }
  pthread_mutex_unlock(&p->worker_mutex);
  
  return 0;
}


static void mt_stop_worker(LsmDb *pDb, int iWorker){
  LsmWorker *p = &pDb->aWorker[iWorker];
  if( p->pWorker ){
    void *pDummy;
    lsm_db *pWorker;

    /* Signal the worker to stop */
    pthread_mutex_lock(&p->worker_mutex);
    pWorker = p->pWorker;
    p->pWorker = 0;
    pthread_cond_signal(&p->worker_cond);
    pthread_mutex_unlock(&p->worker_mutex);

    /* Join the worker thread. */
    pthread_join(p->worker_thread, &pDummy);

    /* Free resources allocated in mt_start_worker() */
    pthread_cond_destroy(&p->worker_cond);
    pthread_mutex_destroy(&p->worker_mutex);
    lsm_close(pWorker);
  }
}

static void mt_shutdown(LsmDb *pDb){
  int i;
  for(i=0; i<pDb->nWorker; i++){
    mt_stop_worker(pDb, i);
  }
}

/*
** This callback is invoked by LSM when the client database writes to
** the database file (i.e. to flush the contents of the in-memory tree).
** This implies there may be work to do on the database, so signal
** the worker threads.
*/
static void mt_client_work_hook(lsm_db *db, void *pArg){
  LsmDb *pDb = (LsmDb *)pArg;     /* LsmDb database handle */

  /* Invoke the user level work-hook, if any. */
  if( pDb->xWork ) pDb->xWork(db, pDb->pWorkCtx);

  /* Wake up worker thread 0. */
  mt_signal_worker(pDb, 0);
}

static void mt_worker_work_hook(lsm_db *db, void *pArg){
  LsmDb *pDb = (LsmDb *)pArg;     /* LsmDb database handle */

  /* Invoke the user level work-hook, if any. */
  if( pDb->xWork ) pDb->xWork(db, pDb->pWorkCtx);
}

/*
** Launch worker thread iWorker for database connection pDb.
*/
static int mt_start_worker(
  LsmDb *pDb,                     /* Main database structure */
  int iWorker,                    /* Worker number to start */
  const char *zFilename,          /* File name of database to open */
  const char *zCfg,               /* Connection configuration string */
  int eType                       /* Type of worker thread */
){
  int rc = 0;                     /* Return code */
  LsmWorker *p;                   /* Object to initialize */

  assert( iWorker<pDb->nWorker );
  assert( eType==LSMTEST_THREAD_CKPT 
       || eType==LSMTEST_THREAD_WORKER 
       || eType==LSMTEST_THREAD_WORKER_AC 
  );

  p = &pDb->aWorker[iWorker];
  p->eType = eType;
  p->pDb = pDb;

  /* Open the worker connection */
  if( rc==0 ) rc = lsm_new(&pDb->env, &p->pWorker);
  if( zCfg ){
    test_lsm_config_str(pDb, p->pWorker, 1, zCfg, 0);
  }
  if( rc==0 ) rc = lsm_open(p->pWorker, zFilename);
  lsm_config_log(p->pWorker, xLog, (void *)"worker");

  /* Configure the work-hook */
  if( rc==0 ){
    lsm_config_work_hook(p->pWorker, mt_worker_work_hook, (void *)pDb);
  }

  if( eType==LSMTEST_THREAD_WORKER ){
    test_lsm_config_str(0, p->pWorker, 1, "autocheckpoint=0", 0);
  }

  /* Kick off the worker thread. */
  if( rc==0 ) rc = pthread_cond_init(&p->worker_cond, 0);
  if( rc==0 ) rc = pthread_mutex_init(&p->worker_mutex, 0);
  if( rc==0 ) rc = pthread_create(&p->worker_thread, 0, worker_main, (void *)p);

  return rc;
}


static int testLsmStartWorkers(
  LsmDb *pDb, int eModel, const char *zFilename, const char *zCfg
){
  int rc;

  if( eModel<1 || eModel>4 ) return 1;
  if( eModel==1 ) return 0;

  /* Configure a work-hook for the client connection. Worker 0 is signalled
  ** every time the users connection writes to the database.  */
  lsm_config_work_hook(pDb->db, mt_client_work_hook, (void *)pDb);

  /* Allocate space for two worker connections. They may not both be
  ** used, but both are allocated.  */
  pDb->aWorker = (LsmWorker *)testMalloc(sizeof(LsmWorker) * 2);
  memset(pDb->aWorker, 0, sizeof(LsmWorker) * 2);

  switch( eModel ){
    case LSMTEST_MODE_BACKGROUND_CKPT:
      pDb->nWorker = 1;
      test_lsm_config_str(0, pDb->db, 0, "autocheckpoint=0", 0);
      rc = mt_start_worker(pDb, 0, zFilename, zCfg, LSMTEST_THREAD_CKPT);
      break;

    case LSMTEST_MODE_BACKGROUND_WORK:
      pDb->nWorker = 1;
      test_lsm_config_str(0, pDb->db, 0, "autowork=0", 0);
      rc = mt_start_worker(pDb, 0, zFilename, zCfg, LSMTEST_THREAD_WORKER_AC);
      break;

    case LSMTEST_MODE_BACKGROUND_BOTH:
      pDb->nWorker = 2;
      test_lsm_config_str(0, pDb->db, 0, "autowork=0", 0);
      rc = mt_start_worker(pDb, 0, zFilename, zCfg, LSMTEST_THREAD_WORKER);
      if( rc==0 ){
        rc = mt_start_worker(pDb, 1, zFilename, zCfg, LSMTEST_THREAD_CKPT);
      }
      break;
  }

  return rc;
}


int test_lsm_mt2(
  const char *zSpec, 
  const char *zFilename, 
  int bClear, 
  TestDb **ppDb
){
  const char *zCfg = "mt_mode=2";
  return testLsmOpen(zCfg, zFilename, bClear, ppDb);
}

int test_lsm_mt3(
  const char *zSpec, 
  const char *zFilename, 
  int bClear, 
  TestDb **ppDb
){
  const char *zCfg = "mt_mode=4";
  return testLsmOpen(zCfg, zFilename, bClear, ppDb);
}

#else
static void mt_shutdown(LsmDb *pDb) { 
  unused_parameter(pDb); 
}
int test_lsm_mt(const char *zFilename, int bClear, TestDb **ppDb){
  unused_parameter(zFilename);
  unused_parameter(bClear);
  unused_parameter(ppDb);
  testPrintError("threads unavailable - recompile with LSM_MUTEX_PTHREADS\n");
  return 1;
}
#endif