#include "lsmtest.h"
/*
** Parameters describing the shape of the test database used by the
** db*() helpers in this file.
*/
typedef struct DbParameters DbParameters;
struct DbParameters {
/* Multiplier between checksum levels: dbFormatKey() and dbMaxLevel()
** multiply by this value once per level. */
int nFanout;
/* Number of level-0 keys. dbWriteOperation() asserts that key indexes
** lie in the range 0..nKey-1. */
int nKey;
};
/*
** Size in bytes of the buffers that hold formatted keys and formatted
** checksum values. It is also the limit passed to snprintf() by
** dbFormatKey() and dbFormatCksumValue().
** NOTE(review): the individual terms presumably account for the key prefix,
** the level digits, a 10-digit number and the nul-terminator - confirm.
*/
#define DB_KEY_BYTES (2+5+10+1)
/*
** Format a database key into aBuf, which must have room for at least
** DB_KEY_BYTES bytes.
**
** For iLevel==0 the key is "k." followed by iKey as a zero-padded 10 digit
** number. For any other level the key is "c.<iLevel>." followed by iKey
** rounded down to a multiple of (nFanout ^ iLevel), again zero-padded to
** 10 digits.
*/
static void dbFormatKey(
  DbParameters *pParam,
  int iLevel,
  int iKey,
  char *aBuf
){
  int nSpan = 1;                  /* nFanout raised to the power iLevel */
  int nStep;

  if( iLevel==0 ){
    snprintf(aBuf, DB_KEY_BYTES, "k.%.10d", iKey);
    return;
  }

  for(nStep=iLevel; nStep>0; nStep--){
    nSpan *= pParam->nFanout;
  }
  snprintf(aBuf, DB_KEY_BYTES, "c.%d.%.10d", iLevel, nSpan*(iKey/nSpan));
}
/*
** Render checksum iVal as a zero-padded, 10 digit unsigned decimal string
** in aBuf. At most DB_KEY_BYTES bytes are written.
*/
static void dbFormatCksumValue(u32 iVal, char *aBuf){
  char *zOut = aBuf;
  snprintf(zOut, DB_KEY_BYTES, "%.10u", iVal);
}
/*
** Return the number of times 1 must be multiplied by pParam->nFanout
** before the product is no longer smaller than pParam->nKey. Callers use
** the result as the highest checksum level.
*/
static int dbMaxLevel(DbParameters *pParam){
  int nLevel = 0;
  int nCover = 1;                 /* nFanout raised to the power nLevel */

  while( nCover<pParam->nKey ){
    nCover *= pParam->nFanout;
    nLevel++;
  }
  return nLevel;
}
static void dbCksum(
void *pCtx,
void *pKey, int nKey,
void *pVal, int nVal
){
u8 *aVal = (u8 *)pVal;
u32 *pCksum = (u32 *)pCtx;
u32 cksum = *pCksum;
int i;
unused_parameter(pKey);
unused_parameter(nKey);
for(i=0; i<nVal; i++){
cksum += (cksum<<3) + (int)aVal[i];
}
*pCksum = cksum;
}
/*
** Compute the checksum that belongs in the level-iLevel entry covering
** key iKey. This is done by scanning the range of level-(iLevel-1) keys
** that the entry covers and feeding every value to dbCksum().
**
** If *pRc is non-zero on entry this is a no-op that returns 0. Otherwise
** *pRc is set to the value returned by tdb_scan().
*/
static u32 dbComputeCksum(
  DbParameters *pParam,
  TestDb *pDb,
  int iLevel,
  int iKey,
  int *pRc
){
  u32 cksum = 0;
  char zLo[DB_KEY_BYTES];         /* First key of the scanned range */
  char zHi[DB_KEY_BYTES];         /* Last key of the scanned range */
  int nSpan = 1;                  /* nFanout raised to the power iLevel */
  int iLo;
  int iHi;
  int nLo;
  int nHi;
  int nStep;

  if( *pRc!=0 ) return cksum;

  assert( iLevel>=1 );
  for(nStep=0; nStep<iLevel; nStep++){
    nSpan *= pParam->nFanout;
  }

  /* The covered range is the nSpan-aligned group of keys containing iKey. */
  iLo = nSpan * (iKey / nSpan);
  iHi = iLo + nSpan - 1;

  dbFormatKey(pParam, iLevel-1, iLo, zLo);
  dbFormatKey(pParam, iLevel-1, iHi, zHi);
  nLo = strlen(zLo);
  nHi = strlen(zHi);

  *pRc = tdb_scan(pDb, &cksum, 0, zLo, nLo, zHi, nHi, dbCksum);
  return cksum;
}
/*
** Perform one read operation on the test database. For each level from 1
** up to dbMaxLevel(), the checksum covering key iKey is recomputed with
** dbComputeCksum(). If it is non-zero, it is formatted and handed to
** testFetchStr() together with the key of the stored checksum entry
** (presumably testFetchStr() verifies that the stored value matches -
** confirm against its definition).
**
** If xDelay is not NULL it is invoked with pDelayCtx after the level-1
** checksum has been computed and before the stored entry is fetched.
**
** When the database supports transactions the whole operation is wrapped
** in testBegin(pDb, 1, ...) / testCommit(pDb, 0, ...).
*/
static void dbReadOperation(
  DbParameters *pParam,
  TestDb *pDb,
  void (*xDelay)(void *),
  void *pDelayCtx,
  int iKey,
  int *pRc
){
  const int nLevel = dbMaxLevel(pParam);
  int iLevel = 1;

  if( tdb_transaction_support(pDb) ){
    testBegin(pDb, 1, pRc);
  }

  while( *pRc==0 && iLevel<=nLevel ){
    char zCksum[DB_KEY_BYTES];
    char zKey[DB_KEY_BYTES];
    u32 iCksum = dbComputeCksum(pParam, pDb, iLevel, iKey, pRc);

    if( iCksum!=0 ){
      if( iLevel==1 && xDelay ){
        xDelay(pDelayCtx);
      }
      dbFormatCksumValue(iCksum, zCksum);
      dbFormatKey(pParam, iLevel, iKey, zKey);
      testFetchStr(pDb, zKey, zCksum, pRc);
    }
    iLevel++;
  }

  if( tdb_transaction_support(pDb) ){
    testCommit(pDb, 0, pRc);
  }
}
/*
** Perform one write operation: store zValue under level-0 key iKey, then
** recompute and rewrite the checksum entry covering iKey at every level
** from 1 up to dbMaxLevel().
**
** When the database supports transactions and *pRc is zero on entry, a
** transaction is opened with tdb_begin(pDb, 2). If that call returns 5
** the function returns 0 without writing anything and without modifying
** *pRc (NOTE(review): 5 is presumably the "busy" error code - confirm).
** In all other cases the function returns 1.
*/
static int dbWriteOperation(
  DbParameters *pParam,
  TestDb *pDb,
  int iKey,
  const char *zValue,
  int *pRc
){
  const int nLevel = dbMaxLevel(pParam);
  char zKey[DB_KEY_BYTES];
  int iLevel;

  assert( iKey>=0 && iKey<pParam->nKey );
  dbFormatKey(pParam, 0, iKey, zKey);

  if( *pRc==0 && tdb_transaction_support(pDb) ){
    int rcBegin = tdb_begin(pDb, 2);
    if( rcBegin==5 ){
      return 0;
    }
    *pRc = rcBegin;
  }

  /* Write the user key first, then refresh each covering checksum. */
  testWriteStr(pDb, zKey, zValue, pRc);
  for(iLevel=1; iLevel<=nLevel; iLevel++){
    char zCksum[DB_KEY_BYTES];
    u32 iCksum = dbComputeCksum(pParam, pDb, iLevel, iKey, pRc);

    dbFormatCksumValue(iCksum, zCksum);
    dbFormatKey(pParam, iLevel, iKey, zKey);
    testWriteStr(pDb, zKey, zCksum, pRc);
  }

  if( tdb_transaction_support(pDb) ){
    testCommit(pDb, 0, pRc);
  }
  return 1;
}
typedef struct ThreadSet ThreadSet;
#ifdef LSM_MUTEX_PTHREADS
#include <pthread.h>
#include <unistd.h>
/*
** State for a single thread belonging to a ThreadSet.
*/
typedef struct Thread Thread;
struct Thread {
/* Result code stored by testThreadSetResult() */
int rc;
/* Result message allocated by testThreadSetResult(), or NULL. Freed by
** testThreadShutdown(). */
char *zMsg;
/* Thread id filled in by pthread_create() */
pthread_t id;
/* Thread main routine. testThreadWait() only joins threads for which
** this is not NULL. */
void (*xMain)(ThreadSet *, int, void *);
/* Context pointer passed as the third argument to xMain() */
void *pCtx;
/* Owning thread set. testThreadLaunch() asserts this is NULL before
** launching. */
ThreadSet *pThreadSet;
};
/*
** A set of threads managed through the testThreadXXX() functions.
*/
struct ThreadSet {
/* Set to 1 by testThreadSetHalt(); read by testThreadGetHalt() */
int bHalt;
/* Number of entries in aThread[] */
int nThread;
/* Array of nThread Thread objects. testThreadInit() places it directly
** after this structure in the same allocation. */
Thread *aThread;
/* Initialised by testThreadInit() and destroyed by testThreadShutdown() */
pthread_mutex_t mutex;
};
/*
** Return 1 to indicate that this build (LSM_MUTEX_PTHREADS defined)
** supports threads.
*/
static int testThreadSupport(void){ return 1; }
/*
** Allocate and return a new ThreadSet with room for nMax threads. The
** ThreadSet header and its array of Thread objects share one allocation,
** which is released by testThreadShutdown().
**
** NOTE(review): only nThread, aThread and the mutex are set explicitly;
** every other field relies on testMalloc() returning zeroed memory -
** confirm against its definition.
*/
static ThreadSet *testThreadInit(int nMax){
  ThreadSet *pSet;
  int nAlloc = sizeof(ThreadSet) + sizeof(struct Thread) * nMax;

  pSet = (ThreadSet *)testMalloc(nAlloc);
  pSet->aThread = (Thread *)(pSet + 1);
  pSet->nThread = nMax;
  pthread_mutex_init(&pSet->mutex, 0);
  return pSet;
}
/*
** Release all resources held by thread set p: the result message of each
** thread, the mutex, and the ThreadSet allocation itself.
*/
static void testThreadShutdown(ThreadSet *p){
  int iThread = 0;

  while( iThread<p->nThread ){
    Thread *pThread = &p->aThread[iThread];
    testFree(pThread->zMsg);
    iThread++;
  }
  pthread_mutex_destroy(&p->mutex);
  testFree(p);
}
static void *ttMain(void *pArg){
Thread *pThread = (Thread *)pArg;
int iThread;
iThread = (pThread - pThread->pThreadSet->aThread);
pThread->xMain(pThread->pThreadSet, iThread, pThread->pCtx);
return 0;
}
/*
** Launch thread iThread of thread set p. The new thread runs
** xMain(p, iThread, pCtx).
**
** Each slot may be launched at most once (asserted). The value returned is
** the return code of pthread_create(): 0 on success, otherwise an error
** number.
**
** If pthread_create() fails, the slot is reset to its unlaunched state.
** This matters because testThreadWait() joins every slot with a non-NULL
** xMain, and joining a thread id that was never created is undefined.
*/
static int testThreadLaunch(
  ThreadSet *p,
  int iThread,
  void (*xMain)(ThreadSet *, int, void *),
  void *pCtx
){
  int rc;
  Thread *pThread;

  assert( iThread>=0 && iThread<p->nThread );
  pThread = &p->aThread[iThread];
  assert( pThread->pThreadSet==0 );

  /* These must be populated before the thread starts, as ttMain() reads
  ** them immediately. */
  pThread->xMain = xMain;
  pThread->pCtx = pCtx;
  pThread->pThreadSet = p;

  rc = pthread_create(&pThread->id, 0, ttMain, (void *)pThread);
  if( rc!=0 ){
    /* No thread was started. Clear the slot so that it is not joined. */
    pThread->xMain = 0;
    pThread->pCtx = 0;
    pThread->pThreadSet = 0;
  }
  return rc;
}
/*
** Raise the halt flag of the thread set. The flag is a plain int that is
** written here without taking pThreadSet->mutex.
*/
static void testThreadSetHalt(ThreadSet *pThreadSet){
  int *pFlag = &pThreadSet->bHalt;
  *pFlag = 1;
}
/*
** Return the current value of the thread set's halt flag (non-zero once
** testThreadSetHalt() has been called). The flag is read without taking
** pThreadSet->mutex.
*/
static int testThreadGetHalt(ThreadSet *pThreadSet){
  const int bHalt = pThreadSet->bHalt;
  return bHalt;
}
/*
** Sleep for approximately nMs milliseconds, in 50ms slices. Return early
** if the halt flag of pThreadSet is found set before any slice begins.
*/
static void testThreadSleep(ThreadSet *pThreadSet, int nMs){
  int nLeft;

  for(nLeft=nMs; nLeft>0; nLeft-=50){
    if( testThreadGetHalt(pThreadSet)!=0 ) break;
    usleep(50000);
  }
}
/*
** Wait up to nMs milliseconds (less if some thread sets the halt flag
** first), then set the halt flag and join every thread in the set that
** has an xMain routine recorded.
*/
static void testThreadWait(ThreadSet *pThreadSet, int nMs){
  int iThread;

  testThreadSleep(pThreadSet, nMs);
  testThreadSetHalt(pThreadSet);

  for(iThread=0; iThread<pThreadSet->nThread; iThread++){
    Thread *pThread = &pThreadSet->aThread[iThread];
    if( pThread->xMain==0 ) continue;
    pthread_join(pThread->id, 0);
  }
}
/*
** Record the result of thread iThread: result code rc and, if zFmt is not
** NULL, a message built from the printf-style format zFmt and the trailing
** arguments. Any previously stored message is freed first. With a NULL
** zFmt the stored message becomes NULL.
*/
static void testThreadSetResult(
  ThreadSet *pThreadSet,
  int iThread,
  int rc,
  char *zFmt,
  ...
){
  Thread *pThread = &pThreadSet->aThread[iThread];

  testFree(pThread->zMsg);
  pThread->rc = rc;
  pThread->zMsg = 0;

  if( zFmt!=0 ){
    va_list ap;
    va_start(ap, zFmt);
    pThread->zMsg = testMallocVPrintf(zFmt, ap);
    va_end(ap);
  }
}
/*
** Return the result code stored for thread iThread. If pzRes is not NULL,
** *pzRes is also set to the stored message, which may be NULL. The message
** remains owned by the thread set.
*/
static int testThreadGetResult(
  ThreadSet *pThreadSet,
  int iThread,
  const char **pzRes
){
  Thread *pThread = &pThreadSet->aThread[iThread];

  if( pzRes!=0 ){
    *pzRes = pThread->zMsg;
  }
  return pThread->rc;
}
#if 0
#endif
#endif
#if !defined(LSM_MUTEX_PTHREADS)
/*
** Return 0 to indicate that this build (LSM_MUTEX_PTHREADS not defined)
** does not support threads.
*/
static int testThreadSupport(void){ return 0; }
/*
** Stand-ins for the thread API, used when LSM_MUTEX_PTHREADS is not
** defined. Each macro discards its arguments and expands either to the
** constant 0 or to nothing at all.
*/
#define testThreadInit(a) 0
#define testThreadShutdown(a)
#define testThreadLaunch(a,b,c,d) 0
#define testThreadWait(a,b)
#define testThreadSetHalt(a)
#define testThreadGetHalt(a) 0
#define testThreadGetResult(a,b,c) 0
#define testThreadSleep(a,b) 0
/*
** Stub used when threads are not available. It accepts the same arguments
** as the pthreads implementation and ignores all of them.
*/
static void testThreadSetResult(ThreadSet *a, int b, int c, char *d, ...){
  unused_parameter(d);
  unused_parameter(c);
  unused_parameter(b);
  unused_parameter(a);
}
#endif
/*
** Configuration for one run of the "mt1" multi-threaded test.
*/
typedef struct Mt1Test Mt1Test;
struct Mt1Test {
/* Shape of the database (fanout and number of keys) */
DbParameters param;
/* Number of threads that both read and write (lowest thread indexes) */
int nReadwrite;
/* Number of read-only threads that run without a delay */
int nFastReader;
/* Number of read-only threads that use a delay of nMs/20 in each read */
int nSlowReader;
/* Time passed to testThreadWait() as the duration of the test, in ms */
int nMs;
/* Database system name passed to testOpen(); set by do_test_mt1() */
const char *zSystem;
};
/*
** Context object passed to xMt1Delay().
*/
typedef struct Mt1DelayCtx Mt1DelayCtx;
struct Mt1DelayCtx {
/* Thread set passed through to testThreadSleep() */
ThreadSet *pSet;
/* Number of milliseconds to sleep for */
int nMs;
};
/*
** Delay callback passed to dbReadOperation() by mt1Main(). pCtx points to
** an Mt1DelayCtx; its pSet and nMs members are passed to testThreadSleep().
*/
static void xMt1Delay(void *pCtx){
  Mt1DelayCtx *pDelay = (Mt1DelayCtx *)pCtx;
  testThreadSleep(pDelay->pSet, pDelay->nMs);
}
/*
** Thread roles used by mt1Main(). The role of a thread is derived from its
** index: read/write threads come first, then fast readers, then slow
** readers.
*/
#define MT1_THREAD_RDWR 0
#define MT1_THREAD_SLOW 1
#define MT1_THREAD_FAST 2
/*
** Work hook registered by mt1Main() using tdb_lsm_config_work_hook().
** It deliberately does nothing; both arguments are ignored.
*/
static void xMt1Work(lsm_db *pDb, void *pCtx){
  (void)pDb;
  (void)pCtx;
}
/*
** Main routine for each thread of the mt1 test. pCtx points to the Mt1Test
** object describing the test.
**
** The thread opens its own database connection and then loops until the
** halt flag is set or an error occurs. Each iteration performs one read
** operation on a pseudo-randomly chosen key and, for read/write threads
** only, one write operation of a pseudo-random value.
**
** On success the thread result is set to 0 with a "r/w: <reads>/<writes>"
** message. On error the error code is stored with no message and the halt
** flag is set so that the other threads stop as well.
*/
static void mt1Main(ThreadSet *pThreadSet, int iThread, void *pCtx){
  Mt1Test *pTest = (Mt1Test *)pCtx;
  const int nRdwr = pTest->nReadwrite;
  const int nRdwrFast = pTest->nReadwrite + pTest->nFastReader;
  Mt1DelayCtx delay;
  TestDb *pDb;
  int eType;                      /* One of the MT1_THREAD_XXX values */
  int iPrng;                      /* Next PRNG input to use */
  int rc = 0;
  int nRead = 0;                  /* Completed read operations */
  int nWrite = 0;                 /* Completed write operations */

  /* Determine the role of this thread from its index. */
  if( iThread<nRdwr ){
    eType = MT1_THREAD_RDWR;
  }else if( iThread<nRdwrFast ){
    eType = MT1_THREAD_FAST;
  }else{
    eType = MT1_THREAD_SLOW;
  }

  /* Only slow readers pause during a read operation. */
  delay.pSet = pThreadSet;
  delay.nMs = (eType==MT1_THREAD_SLOW) ? (pTest->nMs / 20) : 0;

  iPrng = testPrngValue(iThread);
  pDb = testOpen(pTest->zSystem, 0, &rc);
  if( rc==0 ){
    tdb_lsm_config_work_hook(pDb, xMt1Work, 0);
  }

  while( rc==0 && testThreadGetHalt(pThreadSet)==0 ){
    int iKey = (testPrngValue(iPrng++) % pTest->param.nKey);

    dbReadOperation(&pTest->param, pDb, xMt1Delay, (void *)&delay, iKey, &rc);
    if( rc!=0 ) break;
    nRead++;

    if( eType==MT1_THREAD_RDWR ){
      char aRnd[25];
      char aValue[50];

      iKey = (testPrngValue(iPrng++) % pTest->param.nKey);
      testPrngString(iPrng, aRnd, sizeof(aRnd));
      iPrng += sizeof(aRnd);
      snprintf(aValue, sizeof(aValue), "%d.%s", iThread, aRnd);
      nWrite += dbWriteOperation(&pTest->param, pDb, iKey, aValue, &rc);
    }
  }
  testClose(&pDb);

  if( rc==0 ){
    testThreadSetResult(pThreadSet, iThread, 0, "r/w: %d/%d", nRead, nWrite);
  }else{
    testThreadSetResult(pThreadSet, iThread, rc, 0);
    testThreadSetHalt(pThreadSet);
  }
}
/*
** Run each configuration of the mt1 multi-threaded test against database
** system zSystem. Only test cases whose names are accepted by
** testCaseBegin() for zPattern are run. *pRc is the usual in/out error
** code: nothing is run if it is non-zero on entry, and it is set to a
** non-zero value if a test fails.
**
** For each test case this function opens the database, launches the
** configured number of threads running mt1Main(), waits for them, and
** collects their results. If all threads succeed, one informational line
** per thread is printed.
*/
static void do_test_mt1(
  const char *zSystem,
  const char *zPattern,
  int *pRc
){
  /* Fields: {nFanout, nKey}, nReadwrite, nFastReader, nSlowReader, nMs,
  ** zSystem (filled in below). */
  Mt1Test aTest[] = {
    { {10, 1000},   4, 0, 0,  10000, 0 },
    { {10, 1000},   4, 4, 2, 100000, 0 },
    { {10, 100000}, 4, 0, 0,  10000, 0 },
    { {10, 100000}, 4, 4, 2, 100000, 0 },
  };
  int i;

  for(i=0; *pRc==0 && i<ArraySize(aTest); i++){
    Mt1Test *p = &aTest[i];
    int bRun = testCaseBegin(pRc, zPattern,
        "mt1.%s.db=%d,%d.ms=%d.rdwr=%d.fast=%d.slow=%d",
        zSystem, p->param.nFanout, p->param.nKey,
        p->nMs, p->nReadwrite, p->nFastReader, p->nSlowReader
    );
    if( bRun ){
      TestDb *pDb;
      ThreadSet *pSet;
      int iThread;
      int nThread;

      p->zSystem = zSystem;
      pDb = testOpen(zSystem, 1, pRc);

      nThread = p->nReadwrite + p->nFastReader + p->nSlowReader;
      pSet = testThreadInit(nThread);
      for(iThread=0; *pRc==0 && iThread<nThread; iThread++){
        /* A thread that cannot be started is a test failure. Without this
        ** check the test would run silently with fewer threads. */
        int rcLaunch = testThreadLaunch(pSet, iThread, mt1Main, (void *)p);
        if( rcLaunch!=0 ) *pRc = rcLaunch;
      }

      testThreadWait(pSet, p->nMs);
      for(iThread=0; *pRc==0 && iThread<nThread; iThread++){
        *pRc = testThreadGetResult(pSet, iThread, 0);
      }
      testCaseFinish(*pRc);

      for(iThread=0; *pRc==0 && iThread<nThread; iThread++){
        const char *zMsg = 0;
        *pRc = testThreadGetResult(pSet, iThread, &zMsg);
        /* zMsg is NULL if the thread stored no message; passing NULL to
        ** "%s" is undefined, so print an empty string instead. */
        printf("  Info: thread %d (%d): %s\n", iThread, *pRc, zMsg ? zMsg : "");
      }

      testThreadShutdown(pSet);
      testClose(&pDb);
    }
  }
}
/*
** Entry point for the multi-threaded tests. Runs do_test_mt1() for database
** system zSystem and test name pattern zPattern, or does nothing at all if
** this build has no thread support.
*/
void test_mt(
  const char *zSystem,
  const char *zPattern,
  int *pRc
){
  if( testThreadSupport()!=0 ){
    do_test_mt1(zSystem, zPattern, pRc);
  }
}