#include "sqliteInt.h"
#include "vdbeInt.h"
/*
** If SQLITE_DEBUG_SORTER_THREADS is defined, the debug helpers further down
** in this file print trace messages to stderr.  Change the "#if 0" below to
** "#if 1" to enable them.
*/
#if 0
# define SQLITE_DEBUG_SORTER_THREADS 1
#endif

/*
** Hard upper bound, in bytes, applied to the cache-size-derived maximum
** PMA size when a sorter is initialised.
*/
#define SQLITE_MAX_PMASZ (1<<29)
/*
** Forward type names for the private objects used by the sorter.  The
** structures themselves are defined below.
*/
typedef struct MergeEngine MergeEngine;     /* Merges the output of several PmaReaders */
typedef struct PmaReader PmaReader;         /* Reads records back from a file, one at a time */
typedef struct PmaWriter PmaWriter;         /* Buffered writer for a file */
typedef struct SorterRecord SorterRecord;   /* Header of one in-memory record */
typedef struct SortSubtask SortSubtask;     /* Per-thread sorter state */
typedef struct SorterFile SorterFile;       /* A file handle plus its size */
typedef struct SorterList SorterList;       /* A linked list of SorterRecords */
typedef struct IncrMerger IncrMerger;       /* Feeds a PmaReader from a MergeEngine */
/*
** A file used by the sorter together with the offset of the end of the
** data it currently holds.
*/
struct SorterFile {
sqlite3_file *pFd;              /* File handle, or NULL if not yet opened */
i64 iEof;                       /* Offset one past the last byte of data */
};
/*
** An in-memory list of records.  When aMemory is non-NULL the records are
** carved out of that single allocation and linked by offset (u.iNext);
** otherwise each record is a separate allocation linked by pointer
** (u.pNext).
*/
struct SorterList {
SorterRecord *pList;            /* Head of the linked list of records */
u8 *aMemory;                    /* If non-NULL, bulk memory holding the records */
int szPMA;                      /* Accumulated size in bytes of the records once written as a PMA */
};
/*
** Merges the output of nTree PmaReader objects.  aTree[] holds reader
** indexes arranged as a comparison tree; aTree[1] is the index of the
** reader whose current key sorts first.
*/
struct MergeEngine {
int nTree;                      /* Number of entries in aReadr[] and aTree[] */
SortSubtask *pTask;             /* Task whose xCompare/pUnpacked are used for comparisons */
int *aTree;                     /* Comparison tree of indexes into aReadr[] */
PmaReader *aReadr;              /* Array of nTree readers being merged */
};
/*
** Signature of a record comparison function.  Arguments are the subtask,
** a flag that is set once key 2 has been unpacked into pTask->pUnpacked,
** then the pointer and size of key 1 followed by the pointer and size of
** key 2.  The result is negative, zero or positive.
*/
typedef int (*SorterCompare)(SortSubtask*,int*,const void*,int,const void*,int);
/*
** State for one sorter subtask.  Each subtask may be run by a background
** thread (pThread) and accumulates sorted runs (PMAs) in its own temp file.
*/
struct SortSubtask {
SQLiteThread *pThread;          /* Background thread, or NULL if none is outstanding */
int bDone;                      /* Set to 1 by the thread routine just before it returns */
VdbeSorter *pSorter;            /* Sorter that owns this subtask */
UnpackedRecord *pUnpacked;      /* Scratch space used to unpack keys for comparison */
SorterList list;                /* List handed to this task to be sorted and written out */
int nPMA;                       /* Number of PMAs written to "file" so far */
SorterCompare xCompare;         /* Comparison function selected for this sort */
SorterFile file;                /* Temp file holding this task's PMAs */
SorterFile file2;               /* Second temp file, used by single-threaded incremental merges */
};
/*
** Main sorter object.  It is allocated with room for nTask SortSubtask
** entries in aTask[] followed by a private copy of the KeyInfo.
*/
struct VdbeSorter {
int mnPmaSize;                  /* Minimum PMA size, in bytes */
int mxPmaSize;                  /* Maximum PMA size, in bytes.  0 disables flushing to disk */
int mxKeysize;                  /* Largest record (payload plus length varint) seen so far */
int pgsz;                       /* Page size of the main database */
PmaReader *pReader;             /* Reader released by sqlite3VdbeSorterReset(); only set when bUseThreads */
MergeEngine *pMerger;           /* Merge engine released by sqlite3VdbeSorterReset() */
sqlite3 *db;                    /* Database connection */
KeyInfo *pKeyInfo;              /* Private copy of the key definition */
UnpackedRecord *pUnpacked;      /* Scratch unpacked record, freed on reset */
SorterList list;                /* Records accumulated in memory but not yet flushed */
int iMemory;                    /* Offset of the first free byte in list.aMemory */
int nMemory;                    /* Allocated size of list.aMemory in bytes */
u8 bUsePMA;                     /* Set once at least one PMA flush has been started */
u8 bUseThreads;                 /* True if nTask>1 */
u8 iPrev;                       /* Index of the subtask most recently handed a list */
u8 nTask;                       /* Number of entries in aTask[] */
u8 typeMask;                    /* Mask of SORTER_TYPE_* values still possible for the first field */
SortSubtask aTask[1];           /* One or more subtasks (allocation is over-sized) */
};
/*
** Bits for VdbeSorter.typeMask.  A bit remains set only while every record
** written so far has a first field of the corresponding type.
*/
#define SORTER_TYPE_INTEGER 0x01
#define SORTER_TYPE_TEXT 0x02
/*
** Iterates through the records stored in a region of a file.  The file is
** either accessed through a memory mapping (aMap) or through the buffer
** aBuffer.  A reader with pFd==0 is at EOF.
*/
struct PmaReader {
i64 iReadOff;                   /* Current read offset within the file */
i64 iEof;                       /* Offset one past the last byte this reader may read */
int nAlloc;                     /* Bytes of space allocated at aAlloc */
int nKey;                       /* Size in bytes of the current key */
sqlite3_file *pFd;              /* File being read.  NULL once the reader is at EOF */
u8 *aAlloc;                     /* Space used to assemble keys that span a buffer boundary */
u8 *aKey;                       /* Pointer to the current key */
u8 *aBuffer;                    /* Read buffer of nBuffer bytes (unused when aMap is set) */
int nBuffer;                    /* Size of aBuffer in bytes */
u8 *aMap;                       /* Memory mapping of the whole file, or NULL */
IncrMerger *pIncr;              /* Incremental merger that refills this reader, or NULL */
};
/*
** Connects a MergeEngine to a PmaReader: the output of pMerger is written
** to aFile[1] in chunks of at most mxSz bytes, and the PmaReader reads the
** data back from aFile[0].
*/
struct IncrMerger {
SortSubtask *pTask;             /* Task that owns this merger */
MergeEngine *pMerger;           /* Merge engine that supplies the data */
i64 iStartOff;                  /* Offset in the file at which each chunk starts */
int mxSz;                       /* Maximum bytes of data written per populate call */
int bEof;                       /* Set to 1 once a populate call produced no data */
int bUseThread;                 /* True to populate using a background thread */
SorterFile aFile[2];            /* aFile[0] is read from, aFile[1] is written to */
};
/*
** Buffered writer.  Data is accumulated in aBuffer and written to pFd one
** buffer at a time.  The first I/O or allocation error is latched in
** eFWErr and reported by vdbePmaWriterFinish().
*/
struct PmaWriter {
int eFWErr;                     /* Non-zero if an error has occurred */
u8 *aBuffer;                    /* Write buffer */
int nBuffer;                    /* Size of aBuffer in bytes */
int iBufStart;                  /* Offset in aBuffer of the first byte still to be written */
int iBufEnd;                    /* Offset in aBuffer one past the last byte to be written */
i64 iWriteOff;                  /* File offset that corresponds to aBuffer[0] */
sqlite3_file *pFd;              /* File being written */
};
/*
** Header of one in-memory record.  The nVal bytes of record data follow the
** header immediately (see SRVAL()).  The link to the next record is a
** pointer for individually allocated records and a byte offset into
** SorterList.aMemory for records stored in bulk memory.
*/
struct SorterRecord {
int nVal;                       /* Size of the record data in bytes */
union {
SorterRecord *pNext;            /* Next record, when linked by pointer */
int iNext;                      /* Offset of the next record within aMemory */
} u;
};
/* Return a pointer to the record data that follows SorterRecord header p. */
#define SRVAL(p) ((void*)((SorterRecord*)(p) + 1))
/* Maximum number of readers a single MergeEngine may be created with. */
#define SORTER_MAX_MERGE_COUNT 16
/* Forward declarations of routines defined later in this file. */
static int vdbeIncrSwap(IncrMerger*);
static void vdbeIncrFree(IncrMerger *);
/*
** Release everything held by PmaReader pReadr (key buffer, read buffer,
** memory mapping and incremental merger) and reset the object to all
** zeros.  The PmaReader structure itself is not freed.
*/
static void vdbePmaReaderClear(PmaReader *pReadr){
  sqlite3_free(pReadr->aAlloc);
  sqlite3_free(pReadr->aBuffer);
  if( pReadr->aMap!=0 ){
    sqlite3OsUnfetch(pReadr->pFd, 0, pReadr->aMap);
  }
  vdbeIncrFree(pReadr->pIncr);
  memset(pReadr, 0, sizeof(*pReadr));
}
/*
** Read the next nByte bytes from the file that p is reading, set *ppOut to
** point at them and advance p->iReadOff by nByte.
**
** If the file is memory mapped (p->aMap!=0), *ppOut points directly into
** the mapping.  Otherwise it points into the read buffer p->aBuffer or,
** when the requested range crosses a buffer boundary, into p->aAlloc, which
** is grown as required.  Either way the storage belongs to the PmaReader.
**
** Returns SQLITE_OK on success, SQLITE_NOMEM if p->aAlloc cannot be grown,
** or the error code returned by sqlite3OsRead().
*/
static int vdbePmaReadBlob(
PmaReader *p,
int nByte,
u8 **ppOut
){
int iBuf;
int nAvail;
/* Memory-mapped file: hand back a pointer straight into the mapping. */
if( p->aMap ){
*ppOut = &p->aMap[p->iReadOff];
p->iReadOff += nByte;
return SQLITE_OK;
}
assert( p->aBuffer );
/* Offset of the read position within the buffer.  Zero means the buffer
** content has been consumed and the next block must be loaded. */
iBuf = p->iReadOff % p->nBuffer;
if( iBuf==0 ){
int nRead;
int rc;
/* Read a full buffer, or whatever remains before iEof if that is less. */
if( (p->iEof - p->iReadOff) > (i64)p->nBuffer ){
nRead = p->nBuffer;
}else{
nRead = (int)(p->iEof - p->iReadOff);
}
assert( nRead>0 );
rc = sqlite3OsRead(p->pFd, p->aBuffer, nRead, p->iReadOff);
assert( rc!=SQLITE_IOERR_SHORT_READ );
if( rc!=SQLITE_OK ) return rc;
}
nAvail = p->nBuffer - iBuf;
if( nByte<=nAvail ){
/* The whole request is already in the buffer. */
*ppOut = &p->aBuffer[iBuf];
p->iReadOff += nByte;
}else{
/* The request spans buffers: assemble it in p->aAlloc. */
int nRem;
if( p->nAlloc<nByte ){
u8 *aNew;
/* Grow by doubling, starting from at least 128 bytes. */
sqlite3_int64 nNew = MAX(128, 2*(sqlite3_int64)p->nAlloc);
while( nByte>nNew ) nNew = nNew*2;
aNew = sqlite3Realloc(p->aAlloc, nNew);
if( !aNew ) return SQLITE_NOMEM_BKPT;
p->nAlloc = nNew;
p->aAlloc = aNew;
}
/* Copy what is left in the current buffer first ... */
memcpy(p->aAlloc, &p->aBuffer[iBuf], nAvail);
p->iReadOff += nAvail;
nRem = nByte - nAvail;
/* ... then fetch the remainder at most one buffer at a time.  Each
** recursive call starts on a buffer boundary and asks for no more than
** nBuffer bytes, so it never takes this branch itself. */
while( nRem>0 ){
int rc;
int nCopy;
u8 *aNext;
nCopy = nRem;
if( nRem>p->nBuffer ) nCopy = p->nBuffer;
rc = vdbePmaReadBlob(p, nCopy, &aNext);
if( rc!=SQLITE_OK ) return rc;
assert( aNext!=p->aAlloc );
memcpy(&p->aAlloc[nByte - nRem], aNext, nCopy);
nRem -= nCopy;
}
*ppOut = p->aAlloc;
}
return SQLITE_OK;
}
/*
** Read a varint from the current position of reader p, store its value in
** *pnOut and advance the read offset past it.  Returns SQLITE_OK or an
** error code from vdbePmaReadBlob().
*/
static int vdbePmaReadVarint(PmaReader *p, u64 *pnOut){
  int iOffset;                    /* Offset of iReadOff within aBuffer */
  u8 aTmp[16];                    /* Varint bytes gathered one at a time */
  u8 *pByte;                      /* Most recently read byte */
  int nRead = 0;                  /* Number of bytes gathered so far */
  int rc;

  /* Memory-mapped file: decode in place. */
  if( p->aMap ){
    p->iReadOff += sqlite3GetVarint(&p->aMap[p->iReadOff], pnOut);
    return SQLITE_OK;
  }

  /* If at least 9 bytes are still available in the already-loaded buffer
  ** the varint can be decoded directly from it. */
  iOffset = p->iReadOff % p->nBuffer;
  if( iOffset!=0 && (p->nBuffer - iOffset)>=9 ){
    p->iReadOff += sqlite3GetVarint(&p->aBuffer[iOffset], pnOut);
    return SQLITE_OK;
  }

  /* Otherwise pull the varint in byte by byte until a byte without the
  ** continuation bit is seen. */
  do{
    rc = vdbePmaReadBlob(p, 1, &pByte);
    if( rc ) return rc;
    aTmp[nRead & 0xf] = pByte[0];
    nRead++;
  }while( (pByte[0] & 0x80)!=0 );
  sqlite3GetVarint(aTmp, pnOut);
  return SQLITE_OK;
}
/*
** Attempt to memory map file pFile.  A mapping is only attempted when the
** file is no larger than db->nMaxSorterMmap and the VFS methods object is
** version 3 or later.  On success *pp is set by sqlite3OsFetch(); when no
** mapping is attempted *pp is left untouched and SQLITE_OK is returned.
*/
static int vdbeSorterMapFile(SortSubtask *pTask, SorterFile *pFile, u8 **pp){
  sqlite3_file *pFd;
  int rc;

  if( pFile->iEof>(i64)(pTask->pSorter->db->nMaxSorterMmap) ){
    return SQLITE_OK;
  }
  pFd = pFile->pFd;
  if( pFd->pMethods->iVersion<3 ){
    return SQLITE_OK;
  }
  rc = sqlite3OsFetch(pFd, 0, (int)pFile->iEof, (void**)pp);
  testcase( rc!=SQLITE_OK );
  return rc;
}
/*
** Attach PmaReader pReadr to file pFile and position it at offset iOff.
** Any existing memory mapping is released first.  If the file cannot be
** memory mapped, a page-sized read buffer is allocated if necessary and,
** when iOff is not page aligned, the tail of the page containing iOff is
** loaded into it.
**
** Returns SQLITE_OK, SQLITE_NOMEM, or an I/O error code.
*/
static int vdbePmaReaderSeek(
  SortSubtask *pTask,
  PmaReader *pReadr,
  SorterFile *pFile,
  i64 iOff
){
  int rc;
  int szPage;                     /* Sorter page size == read buffer size */
  int iPageOff;                   /* Offset of iOff within its page */

  assert( pReadr->pIncr==0 || pReadr->pIncr->bEof==0 );
  if( sqlite3FaultSim(201) ) return SQLITE_IOERR_READ;

  if( pReadr->aMap ){
    sqlite3OsUnfetch(pReadr->pFd, 0, pReadr->aMap);
    pReadr->aMap = 0;
  }
  pReadr->iReadOff = iOff;
  pReadr->iEof = pFile->iEof;
  pReadr->pFd = pFile->pFd;

  rc = vdbeSorterMapFile(pTask, pFile, &pReadr->aMap);
  if( rc!=SQLITE_OK || pReadr->aMap!=0 ){
    /* Either an error occurred or the file is now mapped. */
    return rc;
  }

  /* Buffered mode. */
  szPage = pTask->pSorter->pgsz;
  iPageOff = pReadr->iReadOff % szPage;
  if( pReadr->aBuffer==0 ){
    pReadr->aBuffer = (u8*)sqlite3Malloc(szPage);
    if( pReadr->aBuffer==0 ) rc = SQLITE_NOMEM_BKPT;
    pReadr->nBuffer = szPage;
  }
  if( rc==SQLITE_OK && iPageOff ){
    int nRead = szPage - iPageOff;
    if( (pReadr->iReadOff + nRead)>pReadr->iEof ){
      nRead = (int)(pReadr->iEof - pReadr->iReadOff);
    }
    rc = sqlite3OsRead(
        pReadr->pFd, &pReadr->aBuffer[iPageOff], nRead, pReadr->iReadOff
    );
    testcase( rc!=SQLITE_OK );
  }
  return rc;
}
/*
** Advance PmaReader pReadr to its next key.  When the current region is
** exhausted and the reader is fed by an incremental merger, the merger is
** asked for more data; if there is none (or no merger) the reader is
** cleared, which leaves pReadr->pFd==0 to signal EOF.
**
** Returns SQLITE_OK (including on EOF) or an error code.
*/
static int vdbePmaReaderNext(PmaReader *pReadr){
  int rc = SQLITE_OK;
  u64 nRec = 0;                   /* Size of the next record in bytes */

  if( pReadr->iReadOff>=pReadr->iEof ){
    IncrMerger *pIncr = pReadr->pIncr;
    int bMore = 0;                /* True if the merger supplied more data */
    if( pIncr!=0 ){
      rc = vdbeIncrSwap(pIncr);
      if( rc==SQLITE_OK && pIncr->bEof==0 ){
        rc = vdbePmaReaderSeek(
            pIncr->pTask, pReadr, &pIncr->aFile[0], pIncr->iStartOff
        );
        bMore = 1;
      }
    }
    if( !bMore ){
      /* Nothing further to read. */
      vdbePmaReaderClear(pReadr);
      testcase( rc!=SQLITE_OK );
      return rc;
    }
  }
  if( rc!=SQLITE_OK ) return rc;

  rc = vdbePmaReadVarint(pReadr, &nRec);
  if( rc!=SQLITE_OK ) return rc;

  pReadr->nKey = (int)nRec;
  rc = vdbePmaReadBlob(pReadr, (int)nRec, &pReadr->aKey);
  testcase( rc!=SQLITE_OK );
  return rc;
}
/*
** Initialise PmaReader pReadr to iterate through the PMA that begins at
** offset iStart of pFile, and load its first key.  The PMA starts with a
** varint giving the size of its content in bytes; that size is added to
** *pnByte.  pReadr must be zeroed on entry.
*/
static int vdbePmaReaderInit(
  SortSubtask *pTask,
  SorterFile *pFile,
  i64 iStart,
  PmaReader *pReadr,
  i64 *pnByte
){
  int rc;
  u64 nByte = 0;                  /* Size of the PMA content in bytes */

  assert( pFile->iEof>iStart );
  assert( pReadr->aAlloc==0 && pReadr->nAlloc==0 );
  assert( pReadr->aBuffer==0 );
  assert( pReadr->aMap==0 );

  rc = vdbePmaReaderSeek(pTask, pReadr, pFile, iStart);
  if( rc!=SQLITE_OK ) return rc;

  rc = vdbePmaReadVarint(pReadr, &nByte);
  pReadr->iEof = pReadr->iReadOff + nByte;
  *pnByte += nByte;
  if( rc!=SQLITE_OK ) return rc;

  return vdbePmaReaderNext(pReadr);
}
/*
** Compare key1 with key2 while skipping the first field of each.  Key 2 is
** unpacked into pTask->pUnpacked unless *pbKey2Cached says it is already
** there; *pbKey2Cached is set to 1 after unpacking.
*/
static int vdbeSorterCompareTail(
  SortSubtask *pTask,
  int *pbKey2Cached,
  const void *pKey1, int nKey1,
  const void *pKey2, int nKey2
){
  UnpackedRecord *pRec2 = pTask->pUnpacked;
  if( !*pbKey2Cached ){
    sqlite3VdbeRecordUnpack(pTask->pSorter->pKeyInfo, nKey2, pKey2, pRec2);
    *pbKey2Cached = 1;
  }
  return sqlite3VdbeRecordCompareWithSkip(nKey1, pKey1, pRec2, 1);
}
/*
** General-purpose comparison of key1 with key2.  Key 2 is unpacked into
** pTask->pUnpacked unless *pbKey2Cached is already set; the flag is set to
** 1 once the unpacked form is available.
*/
static int vdbeSorterCompare(
  SortSubtask *pTask,
  int *pbKey2Cached,
  const void *pKey1, int nKey1,
  const void *pKey2, int nKey2
){
  UnpackedRecord *pRec2 = pTask->pUnpacked;
  if( *pbKey2Cached==0 ){
    sqlite3VdbeRecordUnpack(pTask->pSorter->pKeyInfo, nKey2, pKey2, pRec2);
    *pbKey2Cached = 1;
  }
  return sqlite3VdbeRecordCompare(nKey1, pKey1, pRec2);
}
/*
** Comparison function used when the first field of every key is text (see
** vdbeSorterGetCompare()).  The first fields are compared with memcmp();
** a shorter value that is a prefix of the longer one sorts first.  If the
** first fields are equal and the key has more than one field, the rest is
** compared by vdbeSorterCompareTail().  A non-zero result is negated when
** the first sort flag is set.
*/
static int vdbeSorterCompareText(
  SortSubtask *pTask,
  int *pbKey2Cached,
  const void *pKey1, int nKey1,
  const void *pKey2, int nKey2
){
  const u8 * const aKey1 = (const u8 * const)pKey1;
  const u8 * const aKey2 = (const u8 * const)pKey2;
  const u8 * const aVal1 = &aKey1[ aKey1[0] ];   /* First byte after header 1 */
  const u8 * const aVal2 = &aKey2[ aKey2[0] ];   /* First byte after header 2 */
  KeyInfo *pKeyInfo = pTask->pSorter->pKeyInfo;
  int st1;                        /* Serial type of first field of key 1 */
  int st2;                        /* Serial type of first field of key 2 */
  int nCmp;                       /* Bytes common to both text values */
  int res;

  getVarint32NR(&aKey1[1], st1);
  getVarint32NR(&aKey2[1], st2);

  /* For a text serial type the payload length is (type-13)/2. */
  nCmp = (MIN(st1, st2) - 13)/2;
  res = memcmp(aVal1, aVal2, nCmp);
  if( res==0 ){
    res = st1 - st2;
  }

  if( res!=0 ){
    assert( !(pKeyInfo->aSortFlags[0]&KEYINFO_ORDER_BIGNULL) );
    if( pKeyInfo->aSortFlags[0] ){
      res = res * -1;
    }
    return res;
  }

  if( pKeyInfo->nKeyField>1 ){
    res = vdbeSorterCompareTail(
        pTask, pbKey2Cached, pKey1, nKey1, pKey2, nKey2
    );
  }
  return res;
}
/*
** Comparison function used when the first field of every key is an integer
** (see vdbeSorterGetCompare()).  The serial type of the first field is the
** single byte at offset 1 of each key and must be 1..6, 8 or 9.
**
** If the first fields are equal and the key has more than one field, the
** remaining fields are compared by vdbeSorterCompareTail().  A non-zero
** result is negated when the first sort flag is set.
*/
static int vdbeSorterCompareInt(
SortSubtask *pTask,
int *pbKey2Cached,
const void *pKey1, int nKey1,
const void *pKey2, int nKey2
){
const u8 * const p1 = (const u8 * const)pKey1;
const u8 * const p2 = (const u8 * const)pKey2;
const int s1 = p1[1];                 /* Serial type of first field of key 1 */
const int s2 = p2[1];                 /* Serial type of first field of key 2 */
const u8 * const v1 = &p1[ p1[0] ];   /* First payload byte of key 1 */
const u8 * const v2 = &p2[ p2[0] ];   /* First payload byte of key 2 */
int res;
assert( (s1>0 && s1<7) || s1==8 || s1==9 );
assert( (s2>0 && s2<7) || s2==8 || s2==9 );
if( s1==s2 ){
/* Same serial type, hence same payload width.  aLen[] maps a serial type
** to its payload size in bytes.  Compare byte by byte from the most
** significant end; if the first bytes differ in their top (sign) bit the
** value with that bit set is the smaller one. */
static const u8 aLen[] = {0, 1, 2, 3, 4, 6, 8, 0, 0, 0 };
const u8 n = aLen[s1];
int i;
res = 0;
for(i=0; i<n; i++){
if( (res = v1[i] - v2[i])!=0 ){
if( ((v1[0] ^ v2[0]) & 0x80)!=0 ){
res = v1[0] & 0x80 ? -1 : +1;
}
break;
}
}
}else if( s1>7 && s2>7 ){
/* Both are serial types 8 or 9, which carry no payload bytes (the SQLite
** record format defines them as the constants 0 and 1), so the type
** codes themselves order the values. */
res = s1 - s2;
}else{
/* Different widths.  Start from the assumption that the value with the
** wider encoding (or the one that has a payload at all) is the larger,
** then correct for the sign bit of whichever value that assumption
** relied on. */
if( s2>7 ){
res = +1;
}else if( s1>7 ){
res = -1;
}else{
res = s1 - s2;
}
assert( res!=0 );
if( res>0 ){
if( *v1 & 0x80 ) res = -1;
}else{
if( *v2 & 0x80 ) res = +1;
}
}
if( res==0 ){
if( pTask->pSorter->pKeyInfo->nKeyField>1 ){
res = vdbeSorterCompareTail(
pTask, pbKey2Cached, pKey1, nKey1, pKey2, nKey2
);
}
}else if( pTask->pSorter->pKeyInfo->aSortFlags[0] ){
assert( !(pTask->pSorter->pKeyInfo->aSortFlags[0]&KEYINFO_ORDER_BIGNULL) );
res = res * -1;
}
return res;
}
/*
** Allocate and initialise the VdbeSorter object for sorter cursor pCsr and
** store it in pCsr->uc.pSorter.
**
** The sorter, its nWorker+1 subtasks and a private copy of the cursor's
** KeyInfo are obtained in a single allocation.  If nField is non-zero and
** no worker threads are used, the copied KeyInfo's nKeyField is overridden
** with nField.
**
** Returns SQLITE_OK on success or SQLITE_NOMEM if an allocation fails.
*/
int sqlite3VdbeSorterInit(
sqlite3 *db,
int nField,
VdbeCursor *pCsr
){
int pgsz;
int i;
VdbeSorter *pSorter;
KeyInfo *pKeyInfo;
int szKeyInfo;
int sz;
int rc = SQLITE_OK;
/* In builds without worker threads nWorker is a compile-time constant 0
** (the macro is removed again after the function). */
#if SQLITE_MAX_WORKER_THREADS==0
# define nWorker 0
#else
int nWorker;
#endif
/* Determine the number of worker threads to use. */
#if SQLITE_MAX_WORKER_THREADS>0
if( sqlite3TempInMemory(db) || sqlite3GlobalConfig.bCoreMutex==0 ){
nWorker = 0;
}else{
nWorker = db->aLimit[SQLITE_LIMIT_WORKER_THREADS];
}
#endif
/* Keep main thread plus workers within SORTER_MAX_MERGE_COUNT. */
#if SQLITE_MAX_WORKER_THREADS>=SORTER_MAX_MERGE_COUNT
if( nWorker>=SORTER_MAX_MERGE_COUNT ){
nWorker = SORTER_MAX_MERGE_COUNT-1;
}
#endif
assert( pCsr->pKeyInfo );
assert( !pCsr->isEphemeral );
assert( pCsr->eCurType==CURTYPE_SORTER );
/* Sizes of the KeyInfo copy and of the sorter including extra subtasks. */
szKeyInfo = sizeof(KeyInfo) + (pCsr->pKeyInfo->nKeyField-1)*sizeof(CollSeq*);
sz = sizeof(VdbeSorter) + nWorker * sizeof(SortSubtask);
pSorter = (VdbeSorter*)sqlite3DbMallocZero(db, sz + szKeyInfo);
pCsr->uc.pSorter = pSorter;
if( pSorter==0 ){
rc = SQLITE_NOMEM_BKPT;
}else{
Btree *pBt = db->aDb[0].pBt;
/* The KeyInfo copy lives directly after the sorter and its subtasks. */
pSorter->pKeyInfo = pKeyInfo = (KeyInfo*)((u8*)pSorter + sz);
memcpy(pKeyInfo, pCsr->pKeyInfo, szKeyInfo);
pKeyInfo->db = 0;
if( nField && nWorker==0 ){
pKeyInfo->nKeyField = nField;
}
sqlite3BtreeEnter(pBt);
pSorter->pgsz = pgsz = sqlite3BtreeGetPageSize(pBt);
sqlite3BtreeLeave(pBt);
pSorter->nTask = nWorker + 1;
pSorter->iPrev = (u8)(nWorker - 1);
pSorter->bUseThreads = (pSorter->nTask>1);
pSorter->db = db;
for(i=0; i<pSorter->nTask; i++){
SortSubtask *pTask = &pSorter->aTask[i];
pTask->pSorter = pSorter;
}
/* PMA size limits are only configured when temp storage is on disk;
** otherwise mxPmaSize stays 0 from the zeroed allocation. */
if( !sqlite3TempInMemory(db) ){
i64 mxCache;
u32 szPma = sqlite3GlobalConfig.szPma;
pSorter->mnPmaSize = szPma * pgsz;
mxCache = db->aDb[0].pSchema->cache_size;
/* A negative cache_size is converted with a factor of -1024 bytes, a
** non-negative one is a page count. */
if( mxCache<0 ){
mxCache = mxCache * -1024;
}else{
mxCache = mxCache * pgsz;
}
mxCache = MIN(mxCache, SQLITE_MAX_PMASZ);
pSorter->mxPmaSize = MAX(pSorter->mnPmaSize, (int)mxCache);
/* Unless small-malloc mode is configured, records are stored in one
** growable block that starts at one page. */
if( sqlite3GlobalConfig.bSmallMalloc==0 ){
assert( pSorter->iMemory==0 );
pSorter->nMemory = pgsz;
pSorter->list.aMemory = (u8*)sqlite3Malloc(pgsz);
if( !pSorter->list.aMemory ) rc = SQLITE_NOMEM_BKPT;
}
}
/* The specialised integer/text comparators are only candidates when the
** key has fewer than 13 fields, the first field uses the default
** collation and BIGNULL ordering is not requested. */
if( pKeyInfo->nAllField<13
&& (pKeyInfo->aColl[0]==0 || pKeyInfo->aColl[0]==db->pDfltColl)
&& (pKeyInfo->aSortFlags[0] & KEYINFO_ORDER_BIGNULL)==0
){
pSorter->typeMask = SORTER_TYPE_INTEGER | SORTER_TYPE_TEXT;
}
}
return rc;
}
#undef nWorker
/*
** Free every record of the pointer-linked list that starts at pRecord,
** using sqlite3DbFree() with connection db.
*/
static void vdbeSorterRecordFree(sqlite3 *db, SorterRecord *pRecord){
  SorterRecord *pCur = pRecord;
  while( pCur!=0 ){
    SorterRecord *pFollow = pCur->u.pNext;
    sqlite3DbFree(db, pCur);
    pCur = pFollow;
  }
}
/*
** Release all resources held by subtask pTask: the unpacked-record scratch
** space, the record list (bulk memory or individual records), and both
** temp files.  The structure is zeroed afterwards.
*/
static void vdbeSortSubtaskCleanup(sqlite3 *db, SortSubtask *pTask){
  SorterList *pList = &pTask->list;

  sqlite3DbFree(db, pTask->pUnpacked);
#if SQLITE_MAX_WORKER_THREADS>0
  /* Bulk memory handed over by the main thread */
  if( pList->aMemory!=0 ){
    sqlite3_free(pList->aMemory);
  }else
#endif
  {
    assert( pList->aMemory==0 );
    vdbeSorterRecordFree(0, pList->pList);
  }
  if( pTask->file.pFd!=0 ){
    sqlite3OsCloseFree(pTask->file.pFd);
  }
  if( pTask->file2.pFd!=0 ){
    sqlite3OsCloseFree(pTask->file2.pFd);
  }
  memset(pTask, 0, sizeof(*pTask));
}
#ifdef SQLITE_DEBUG_SORTER_THREADS
static void vdbeSorterWorkDebug(SortSubtask *pTask, const char *zEvent){
i64 t;
int iTask = (pTask - pTask->pSorter->aTask);
sqlite3OsCurrentTimeInt64(pTask->pSorter->db->pVfs, &t);
fprintf(stderr, "%lld:%d %s\n", t, iTask, zEvent);
}
static void vdbeSorterRewindDebug(const char *zEvent){
i64 t = 0;
sqlite3_vfs *pVfs = sqlite3_vfs_find(0);
if( ALWAYS(pVfs) ) sqlite3OsCurrentTimeInt64(pVfs, &t);
fprintf(stderr, "%lld:X %s\n", t, zEvent);
}
static void vdbeSorterPopulateDebug(
SortSubtask *pTask,
const char *zEvent
){
i64 t;
int iTask = (pTask - pTask->pSorter->aTask);
sqlite3OsCurrentTimeInt64(pTask->pSorter->db->pVfs, &t);
fprintf(stderr, "%lld:bg%d %s\n", t, iTask, zEvent);
}
static void vdbeSorterBlockDebug(
SortSubtask *pTask,
int bBlocked,
const char *zEvent
){
if( bBlocked ){
i64 t;
sqlite3OsCurrentTimeInt64(pTask->pSorter->db->pVfs, &t);
fprintf(stderr, "%lld:main %s\n", t, zEvent);
}
}
#else
# define vdbeSorterWorkDebug(x,y)
# define vdbeSorterRewindDebug(y)
# define vdbeSorterPopulateDebug(x,y)
# define vdbeSorterBlockDebug(x,y,z)
#endif
#if SQLITE_MAX_WORKER_THREADS>0
/*
** If subtask pTask has an outstanding background thread, wait for it to
** finish, clear pTask->pThread and pTask->bDone, and return the result
** code produced by the thread.  If there is no thread, return SQLITE_OK.
*/
static int vdbeSorterJoinThread(SortSubtask *pTask){
  void *pResult = SQLITE_INT_TO_PTR(SQLITE_ERROR);
#ifdef SQLITE_DEBUG_SORTER_THREADS
  int bDone = pTask->bDone;
#endif

  if( pTask->pThread==0 ){
    return SQLITE_OK;
  }
  vdbeSorterBlockDebug(pTask, !bDone, "enter");
  (void)sqlite3ThreadJoin(pTask->pThread, &pResult);
  vdbeSorterBlockDebug(pTask, !bDone, "exit");
  assert( pTask->bDone==1 );
  pTask->bDone = 0;
  pTask->pThread = 0;
  return SQLITE_PTR_TO_INT(pResult);
}
/*
** Start a background thread for subtask pTask that runs xTask(pIn).  The
** task must not already have a thread.  Returns the result of
** sqlite3ThreadCreate().
*/
static int vdbeSorterCreateThread(
  SortSubtask *pTask,
  void *(*xTask)(void*),
  void *pIn
){
  SQLiteThread **ppThread = &pTask->pThread;
  assert( *ppThread==0 && pTask->bDone==0 );
  return sqlite3ThreadCreate(ppThread, xTask, pIn);
}
/*
** Join the background threads of all subtasks of pSorter, from the last
** subtask to the first.  Returns rcin if it is not SQLITE_OK, otherwise
** the first non-OK result code encountered while joining (or SQLITE_OK).
*/
static int vdbeSorterJoinAll(VdbeSorter *pSorter, int rcin){
  int rc = rcin;
  int iTask = pSorter->nTask;
  while( iTask-- > 0 ){
    int rcJoin = vdbeSorterJoinThread(&pSorter->aTask[iTask]);
    if( rc==SQLITE_OK ) rc = rcJoin;
  }
  return rc;
}
#else
/* Without worker threads there is nothing to join: vdbeSorterJoinAll()
** yields its rcin argument and vdbeSorterJoinThread() yields SQLITE_OK. */
# define vdbeSorterJoinAll(x,rcin) (rcin)
# define vdbeSorterJoinThread(pTask) SQLITE_OK
#endif
/*
** Allocate a zeroed MergeEngine able to merge at least nReader inputs.
** The number of slots is the smallest power of two that is >= nReader and
** at least 2.  The reader array and the comparison tree are placed in the
** same allocation, directly after the MergeEngine header.
**
** Returns NULL if the allocation fails (or a fault is simulated).
*/
static MergeEngine *vdbeMergeEngineNew(int nReader){
  int nSlot;                      /* Number of reader slots */
  int nByte;                      /* Total size of the allocation */
  MergeEngine *pEngine;

  assert( nReader<=SORTER_MAX_MERGE_COUNT );
  for(nSlot=2; nSlot<nReader; nSlot*=2){}
  nByte = sizeof(MergeEngine) + nSlot * (sizeof(int) + sizeof(PmaReader));

  if( sqlite3FaultSim(100) ){
    pEngine = 0;
  }else{
    pEngine = (MergeEngine*)sqlite3MallocZero(nByte);
  }
  if( pEngine==0 ) return 0;

  pEngine->nTree = nSlot;
  pEngine->pTask = 0;
  pEngine->aReadr = (PmaReader*)&pEngine[1];
  pEngine->aTree = (int*)&pEngine->aReadr[nSlot];
  return pEngine;
}
/*
** Clear every reader of pMerger and free the engine.  Passing NULL is
** harmless.
*/
static void vdbeMergeEngineFree(MergeEngine *pMerger){
  if( pMerger!=0 ){
    int iReadr = 0;
    while( iReadr<pMerger->nTree ){
      vdbePmaReaderClear(&pMerger->aReadr[iReadr]);
      iReadr++;
    }
  }
  sqlite3_free(pMerger);
}
/*
** Free incremental merger pIncr and the merge engine it owns.  For a
** threaded merger the background thread is joined first and both private
** temp files are closed.  Passing NULL is a no-op.
*/
static void vdbeIncrFree(IncrMerger *pIncr){
  if( pIncr==0 ) return;
#if SQLITE_MAX_WORKER_THREADS>0
  if( pIncr->bUseThread ){
    SorterFile *aFile = pIncr->aFile;
    vdbeSorterJoinThread(pIncr->pTask);
    if( aFile[0].pFd ) sqlite3OsCloseFree(aFile[0].pFd);
    if( aFile[1].pFd ) sqlite3OsCloseFree(aFile[1].pFd);
  }
#endif
  vdbeMergeEngineFree(pIncr->pMerger);
  sqlite3_free(pIncr);
}
/*
** Return sorter pSorter to its freshly initialised state: join all worker
** threads, release the reader, merge engine, per-task resources and any
** individually allocated records, and zero the bookkeeping fields.  The
** bulk buffer pSorter->list.aMemory, if any, is kept.
*/
void sqlite3VdbeSorterReset(sqlite3 *db, VdbeSorter *pSorter){
  int iTask;

  (void)vdbeSorterJoinAll(pSorter, SQLITE_OK);
  assert( pSorter->bUseThreads || pSorter->pReader==0 );
#if SQLITE_MAX_WORKER_THREADS>0
  if( pSorter->pReader!=0 ){
    vdbePmaReaderClear(pSorter->pReader);
    sqlite3DbFree(db, pSorter->pReader);
    pSorter->pReader = 0;
  }
#endif
  vdbeMergeEngineFree(pSorter->pMerger);
  pSorter->pMerger = 0;

  /* Cleanup zeroes each subtask, so its back pointer must be restored. */
  for(iTask=0; iTask<pSorter->nTask; iTask++){
    vdbeSortSubtaskCleanup(db, &pSorter->aTask[iTask]);
    pSorter->aTask[iTask].pSorter = pSorter;
  }

  /* Records only need freeing one by one when not using bulk memory. */
  if( pSorter->list.aMemory==0 ){
    vdbeSorterRecordFree(0, pSorter->list.pList);
  }
  pSorter->list.pList = 0;
  pSorter->list.szPMA = 0;
  pSorter->bUsePMA = 0;
  pSorter->iMemory = 0;
  pSorter->mxKeysize = 0;
  sqlite3DbFree(db, pSorter->pUnpacked);
  pSorter->pUnpacked = 0;
}
/*
** Destroy the sorter attached to cursor pCsr, if there is one, and clear
** pCsr->uc.pSorter.
*/
void sqlite3VdbeSorterClose(sqlite3 *db, VdbeCursor *pCsr){
  VdbeSorter *pSorter;

  assert( pCsr->eCurType==CURTYPE_SORTER );
  pSorter = pCsr->uc.pSorter;
  if( pSorter==0 ) return;

  sqlite3VdbeSorterReset(db, pSorter);
  sqlite3_free(pSorter->list.aMemory);
  sqlite3DbFree(db, pSorter);
  pCsr->uc.pSorter = 0;
}
#if SQLITE_MAX_MMAP_SIZE>0
/*
** Hint to the VFS that file pFd is about to grow to nByte bytes.  Nothing
** is done unless nByte is within db->nMaxSorterMmap and the VFS methods
** are version 3 or later.  In that case chunk-size and size hints are
** issued and the file is fetched and immediately unfetched.  All errors
** are ignored.
*/
static void vdbeSorterExtendFile(sqlite3 *db, sqlite3_file *pFd, i64 nByte){
  void *pMap = 0;
  int szChunk = 4*1024;

  if( nByte>(i64)(db->nMaxSorterMmap) ) return;
  if( pFd->pMethods->iVersion<3 ) return;

  sqlite3OsFileControlHint(pFd, SQLITE_FCNTL_CHUNK_SIZE, &szChunk);
  sqlite3OsFileControlHint(pFd, SQLITE_FCNTL_SIZE_HINT, &nByte);
  sqlite3OsFetch(pFd, 0, (int)nByte, &pMap);
  if( pMap ) sqlite3OsUnfetch(pFd, 0, pMap);
}
#else
/* Memory mapping is compiled out: extending is a no-op. */
# define vdbeSorterExtendFile(x,y,z)
#endif
/*
** Open a new delete-on-close temporary file and store its handle in *ppFd.
** On success an mmap-size hint is issued and, if nExtend is positive, the
** file is pre-extended via vdbeSorterExtendFile().
**
** Returns SQLITE_OK or an error code.
*/
static int vdbeSorterOpenTempFile(
  sqlite3 *db,
  i64 nExtend,
  sqlite3_file **ppFd
){
  int rc;
  i64 mxMmap = SQLITE_MAX_MMAP_SIZE;

  if( sqlite3FaultSim(202) ) return SQLITE_IOERR_ACCESS;
  rc = sqlite3OsOpenMalloc(db->pVfs, 0, ppFd,
      SQLITE_OPEN_TEMP_JOURNAL |
      SQLITE_OPEN_READWRITE    | SQLITE_OPEN_CREATE |
      SQLITE_OPEN_EXCLUSIVE    | SQLITE_OPEN_DELETEONCLOSE, &rc
  );
  if( rc!=SQLITE_OK ) return rc;

  sqlite3OsFileControlHint(*ppFd, SQLITE_FCNTL_MMAP_SIZE, (void*)&mxMmap);
  if( nExtend>0 ){
    vdbeSorterExtendFile(db, *ppFd, nExtend);
  }
  return rc;
}
/*
** Make sure pTask->pUnpacked is allocated.  A newly allocated record has
** nField set to the key's nKeyField and errCode cleared.  Returns
** SQLITE_OK or SQLITE_NOMEM.
*/
static int vdbeSortAllocUnpacked(SortSubtask *pTask){
  KeyInfo *pKeyInfo;

  if( pTask->pUnpacked!=0 ) return SQLITE_OK;

  pKeyInfo = pTask->pSorter->pKeyInfo;
  pTask->pUnpacked = sqlite3VdbeAllocUnpackedRecord(pKeyInfo);
  if( pTask->pUnpacked==0 ) return SQLITE_NOMEM_BKPT;
  pTask->pUnpacked->nField = pKeyInfo->nKeyField;
  pTask->pUnpacked->errCode = 0;
  return SQLITE_OK;
}
/*
** Merge the two sorted, pointer-linked lists p1 and p2 (both non-empty)
** into one sorted list and return its head.  When records compare equal
** the one from p1 is emitted first.
*/
static SorterRecord *vdbeSorterMerge(
  SortSubtask *pTask,
  SorterRecord *p1,
  SorterRecord *p2
){
  SorterRecord *pHead = 0;        /* Head of the merged list */
  SorterRecord **ppTail = &pHead; /* Where to link the next record */
  int bKey2Cached = 0;            /* True if p2's key is already unpacked */

  assert( p1!=0 && p2!=0 );
  while( 1 ){
    int c = pTask->xCompare(
        pTask, &bKey2Cached, SRVAL(p1), p1->nVal, SRVAL(p2), p2->nVal
    );
    if( c>0 ){
      /* p2 sorts first.  The cached key belongs to the old p2. */
      *ppTail = p2;
      ppTail = &p2->u.pNext;
      p2 = p2->u.pNext;
      bKey2Cached = 0;
      if( p2==0 ){
        *ppTail = p1;
        break;
      }
    }else{
      *ppTail = p1;
      ppTail = &p1->u.pNext;
      p1 = p1->u.pNext;
      if( p1==0 ){
        *ppTail = p2;
        break;
      }
    }
  }
  return pHead;
}
/*
** Choose the comparison function for sorter p: the integer or text
** specialisation when typeMask is exactly that one type, otherwise the
** general comparator.
*/
static SorterCompare vdbeSorterGetCompare(VdbeSorter *p){
  switch( p->typeMask ){
    case SORTER_TYPE_INTEGER:
      return vdbeSorterCompareInt;
    case SORTER_TYPE_TEXT:
      return vdbeSorterCompareText;
    default:
      return vdbeSorterCompare;
  }
}
/*
** Sort the linked list of records in pList in place using pTask's
** comparison function, leaving pList->pList pointing at the head of the
** sorted list.
**
** Returns SQLITE_OK, or SQLITE_NOMEM if the unpacked-record scratch space
** cannot be allocated or a comparison recorded an out-of-memory error.
*/
static int vdbeSorterSort(SortSubtask *pTask, SorterList *pList){
int i;
SorterRecord *p;
int rc;
/* aSlot[i] is either NULL or a sorted sub-list; each incoming record is
** merged upward through the occupied slots, like carrying in a binary
** counter. */
SorterRecord *aSlot[64];
rc = vdbeSortAllocUnpacked(pTask);
if( rc!=SQLITE_OK ) return rc;
p = pList->pList;
pTask->xCompare = vdbeSorterGetCompare(pTask->pSorter);
memset(aSlot, 0, sizeof(aSlot));
while( p ){
SorterRecord *pNext;
/* Find the next record.  In bulk-memory mode links are offsets into
** aMemory and the record at offset 0 is the last one in the list. */
if( pList->aMemory ){
if( (u8*)p==pList->aMemory ){
pNext = 0;
}else{
assert( p->u.iNext<sqlite3MallocSize(pList->aMemory) );
pNext = (SorterRecord*)&pList->aMemory[p->u.iNext];
}
}else{
pNext = p->u.pNext;
}
/* Detach p as a one-element list and merge it into the slots. */
p->u.pNext = 0;
for(i=0; aSlot[i]; i++){
p = vdbeSorterMerge(pTask, p, aSlot[i]);
aSlot[i] = 0;
}
aSlot[i] = p;
p = pNext;
}
/* Merge whatever remains in the slots into a single list. */
p = 0;
for(i=0; i<ArraySize(aSlot); i++){
if( aSlot[i]==0 ) continue;
p = p ? vdbeSorterMerge(pTask, p, aSlot[i]) : aSlot[i];
}
pList->pList = p;
assert( pTask->pUnpacked->errCode==SQLITE_OK
|| pTask->pUnpacked->errCode==SQLITE_NOMEM
);
return pTask->pUnpacked->errCode;
}
/*
** Initialise writer p to append to file pFd starting at offset iStart,
** using a buffer of nBuf bytes.  The buffer is treated as aligned to
** multiples of nBuf within the file, so buffering starts at iStart%nBuf.
** If the buffer cannot be allocated the error is latched in p->eFWErr.
*/
static void vdbePmaWriterInit(
  sqlite3_file *pFd,
  PmaWriter *p,
  int nBuf,
  i64 iStart
){
  memset(p, 0, sizeof(*p));
  p->aBuffer = (u8*)sqlite3Malloc(nBuf);
  if( p->aBuffer==0 ){
    p->eFWErr = SQLITE_NOMEM_BKPT;
    return;
  }
  p->iBufStart = (iStart % nBuf);
  p->iBufEnd = p->iBufStart;
  p->iWriteOff = iStart - p->iBufStart;
  p->nBuffer = nBuf;
  p->pFd = pFd;
}
/*
** Append nData bytes from pData to writer p.  Whenever the buffer fills it
** is written to the file.  Nothing is written once an error has been
** latched in p->eFWErr.
*/
static void vdbePmaWriteBlob(PmaWriter *p, u8 *pData, int nData){
  int nDone = 0;                  /* Bytes of pData consumed so far */

  while( nDone<nData && p->eFWErr==0 ){
    int nSpace = p->nBuffer - p->iBufEnd;
    int nCopy = nData - nDone;
    if( nCopy>nSpace ){
      nCopy = nSpace;
    }
    memcpy(&p->aBuffer[p->iBufEnd], &pData[nDone], nCopy);
    p->iBufEnd += nCopy;

    if( p->iBufEnd==p->nBuffer ){
      /* Buffer full: flush it and move on to the next file block. */
      p->eFWErr = sqlite3OsWrite(p->pFd,
          &p->aBuffer[p->iBufStart], p->iBufEnd - p->iBufStart,
          p->iWriteOff + p->iBufStart
      );
      p->iBufStart = p->iBufEnd = 0;
      p->iWriteOff += p->nBuffer;
    }
    assert( p->iBufEnd<p->nBuffer );
    nDone += nCopy;
  }
}
/*
** Flush any buffered data of writer p to the file, store the resulting
** end-of-data offset in *piEof, release the buffer and zero the writer.
** Returns the first error latched by the writer, or SQLITE_OK.
*/
static int vdbePmaWriterFinish(PmaWriter *p, i64 *piEof){
  int rc;

  if( p->eFWErr==0 && ALWAYS(p->aBuffer) && p->iBufStart<p->iBufEnd ){
    int nPending = p->iBufEnd - p->iBufStart;
    p->eFWErr = sqlite3OsWrite(p->pFd,
        &p->aBuffer[p->iBufStart], nPending,
        p->iWriteOff + p->iBufStart
    );
  }
  *piEof = (p->iWriteOff + p->iBufEnd);
  rc = p->eFWErr;
  sqlite3_free(p->aBuffer);
  memset(p, 0, sizeof(*p));
  return rc;
}
/*
** Append value iVal, encoded as a varint, to writer p.
*/
static void vdbePmaWriteVarint(PmaWriter *p, u64 iVal){
  u8 aEnc[10];
  int nEnc = sqlite3PutVarint(aEnc, iVal);
  vdbePmaWriteBlob(p, aEnc, nEnc);
}
/*
** Sort the records in pList and append them to pTask->file as one PMA.
** The temp file is created on first use.
**
** The PMA written is: a varint holding pList->szPMA, followed by each
** record as a length varint plus the record data.  Individually allocated
** records are freed as they are written; on success pList->pList is left
** NULL and pTask->file.iEof is advanced past the new PMA.
**
** Returns SQLITE_OK or an error code.
*/
static int vdbeSorterListToPMA(SortSubtask *pTask, SorterList *pList){
sqlite3 *db = pTask->pSorter->db;
int rc = SQLITE_OK;
PmaWriter writer;
#ifdef SQLITE_DEBUG
/* Expected end-of-file offset after the write, checked by assert below. */
i64 iSz = pList->szPMA + sqlite3VarintLen(pList->szPMA) + pTask->file.iEof;
#endif
vdbeSorterWorkDebug(pTask, "enter");
memset(&writer, 0, sizeof(PmaWriter));
assert( pList->szPMA>0 );
/* Open the temp file if this is the task's first PMA. */
if( pTask->file.pFd==0 ){
rc = vdbeSorterOpenTempFile(db, 0, &pTask->file.pFd);
assert( rc!=SQLITE_OK || pTask->file.pFd );
assert( pTask->file.iEof==0 );
assert( pTask->nPMA==0 );
}
/* Hint the expected new file size (9 bytes cover the largest varint). */
if( rc==SQLITE_OK ){
vdbeSorterExtendFile(db, pTask->file.pFd, pTask->file.iEof+pList->szPMA+9);
}
if( rc==SQLITE_OK ){
rc = vdbeSorterSort(pTask, pList);
}
if( rc==SQLITE_OK ){
SorterRecord *p;
SorterRecord *pNext = 0;
vdbePmaWriterInit(pTask->file.pFd, &writer, pTask->pSorter->pgsz,
pTask->file.iEof);
pTask->nPMA++;
vdbePmaWriteVarint(&writer, pList->szPMA);
for(p=pList->pList; p; p=pNext){
pNext = p->u.pNext;
vdbePmaWriteVarint(&writer, p->nVal);
vdbePmaWriteBlob(&writer, SRVAL(p), p->nVal);
/* Records in bulk memory are not freed individually. */
if( pList->aMemory==0 ) sqlite3_free(p);
}
/* p is NULL here, so this empties the list. */
pList->pList = p;
rc = vdbePmaWriterFinish(&writer, &pTask->file.iEof);
}
vdbeSorterWorkDebug(pTask, "exit");
assert( rc!=SQLITE_OK || pList->pList==0 );
assert( rc!=SQLITE_OK || pTask->file.iEof==iSz );
return rc;
}
/*
** Advance merge engine pMerger to its next key: step the reader that
** currently holds the smallest key (index aTree[1]) and then recompute the
** comparison tree along the path from that reader up to the root.
**
** On success *pbEof is set to true if the new smallest reader is at EOF
** (pFd==0), i.e. the engine is exhausted.  *pbEof is not written if the
** reader could not be advanced.
**
** Returns the error from advancing the reader, or else the error code
** recorded in pTask->pUnpacked (SQLITE_OK if none).
*/
static int vdbeMergeEngineStep(
MergeEngine *pMerger,
int *pbEof
){
int rc;
int iPrev = pMerger->aTree[1];
SortSubtask *pTask = pMerger->pTask;
rc = vdbePmaReaderNext(&pMerger->aReadr[iPrev]);
if( rc==SQLITE_OK ){
int i;
PmaReader *pReadr1;
PmaReader *pReadr2;
int bCached = 0;
/* Start with the pair of sibling readers that contains iPrev: pReadr1 is
** the even-indexed one, pReadr2 the odd-indexed one. */
pReadr1 = &pMerger->aReadr[(iPrev & 0xFFFE)];
pReadr2 = &pMerger->aReadr[(iPrev | 0x0001)];
/* Walk from the parent of that pair up to the root (index 1). */
for(i=(pMerger->nTree+iPrev)/2; i>0; i=i/2){
int iRes;
/* A reader at EOF always loses the comparison. */
if( pReadr1->pFd==0 ){
iRes = +1;
}else if( pReadr2->pFd==0 ){
iRes = -1;
}else{
iRes = pTask->xCompare(pTask, &bCached,
pReadr1->aKey, pReadr1->nKey, pReadr2->aKey, pReadr2->nKey
);
}
/* On a tie the reader at the lower address wins. */
if( iRes<0 || (iRes==0 && pReadr1<pReadr2) ){
pMerger->aTree[i] = (int)(pReadr1 - pMerger->aReadr);
/* The next opponent becomes key 2, so the cached key 2 is stale. */
pReadr2 = &pMerger->aReadr[ pMerger->aTree[i ^ 0x0001] ];
bCached = 0;
}else{
/* NOTE(review): the cache flag is also dropped here whenever pReadr1 is
** not at EOF, although pReadr2 is kept; the reason is not evident from
** this code alone. */
if( pReadr1->pFd ) bCached = 0;
pMerger->aTree[i] = (int)(pReadr2 - pMerger->aReadr);
pReadr1 = &pMerger->aReadr[ pMerger->aTree[i ^ 0x0001] ];
}
}
*pbEof = (pMerger->aReadr[pMerger->aTree[1]].pFd==0);
}
return (rc==SQLITE_OK ? pTask->pUnpacked->errCode : rc);
}
#if SQLITE_MAX_WORKER_THREADS>0
/*
** Thread entry point: write the list held by the subtask passed as pCtx to
** a PMA, mark the task as done and return the result code as a pointer.
*/
static void *vdbeSorterFlushThread(void *pCtx){
  SortSubtask *pTask = (SortSubtask*)pCtx;
  int rcFlush;

  assert( pTask->bDone==0 );
  rcFlush = vdbeSorterListToPMA(pTask, &pTask->list);
  pTask->bDone = 1;
  return SQLITE_INT_TO_PTR(rcFlush);
}
#endif
/*
** Flush the in-memory list of pSorter to a PMA and record that PMAs are in
** use.  In single-threaded builds the list is written synchronously by
** subtask 0.  Otherwise an idle worker subtask is looked for; if one is
** found the list is handed to it and a background thread is started, and
** if all workers are busy the list is written synchronously by the last
** subtask.
**
** Returns SQLITE_OK or an error code.
*/
static int vdbeSorterFlushPMA(VdbeSorter *pSorter){
#if SQLITE_MAX_WORKER_THREADS==0
pSorter->bUsePMA = 1;
return vdbeSorterListToPMA(&pSorter->aTask[0], &pSorter->list);
#else
int rc = SQLITE_OK;
int i;
SortSubtask *pTask = 0;
int nWorker = (pSorter->nTask-1);
pSorter->bUsePMA = 1;
/* Scan the worker subtasks round-robin, starting after the one used last
** time, joining any thread that has finished.  Stop at the first task
** without a thread, or on error. */
for(i=0; i<nWorker; i++){
int iTest = (pSorter->iPrev + i + 1) % nWorker;
pTask = &pSorter->aTask[iTest];
if( pTask->bDone ){
rc = vdbeSorterJoinThread(pTask);
}
if( rc!=SQLITE_OK || pTask->pThread==0 ) break;
}
if( rc==SQLITE_OK ){
if( i==nWorker ){
/* No idle worker: do the work in the calling thread. */
rc = vdbeSorterListToPMA(&pSorter->aTask[nWorker], &pSorter->list);
}else{
u8 *aMem;
void *pCtx;
assert( pTask!=0 );
assert( pTask->pThread==0 && pTask->bDone==0 );
assert( pTask->list.pList==0 );
assert( pTask->list.aMemory==0 || pSorter->list.aMemory!=0 );
/* Hand the list to the worker.  If the worker still owns a bulk buffer
** from an earlier flush, the sorter takes it over; otherwise, in
** bulk-memory mode, a fresh buffer of nMemory bytes is allocated. */
aMem = pTask->list.aMemory;
pCtx = (void*)pTask;
pSorter->iPrev = (u8)(pTask - pSorter->aTask);
pTask->list = pSorter->list;
pSorter->list.pList = 0;
pSorter->list.szPMA = 0;
if( aMem ){
pSorter->list.aMemory = aMem;
pSorter->nMemory = sqlite3MallocSize(aMem);
}else if( pSorter->list.aMemory ){
pSorter->list.aMemory = sqlite3Malloc(pSorter->nMemory);
if( !pSorter->list.aMemory ) return SQLITE_NOMEM_BKPT;
}
rc = vdbeSorterCreateThread(pTask, vdbeSorterFlushThread, pCtx);
}
}
return rc;
#endif
}
/*
** Add the record held in pVal to the sorter of cursor pCsr.
**
** The serial type of the record's first field is used to narrow
** pSorter->typeMask.  If a maximum PMA size is configured and adding the
** record would exceed it, the current in-memory list is flushed to a PMA
** first.  The record is then copied into the in-memory list, which lives
** either in the growable bulk buffer list.aMemory or in individually
** allocated SorterRecords.
**
** Returns SQLITE_OK, SQLITE_NOMEM, or an error code from the flush.
*/
int sqlite3VdbeSorterWrite(
const VdbeCursor *pCsr,
Mem *pVal
){
VdbeSorter *pSorter;
int rc = SQLITE_OK;
SorterRecord *pNew;
int bFlush;
int nReq;
int nPMA;
int t;
assert( pCsr->eCurType==CURTYPE_SORTER );
pSorter = pCsr->uc.pSorter;
/* t is the serial type of the first field (read at offset 1 of the
** record).  Types 1..6, 8 and 9 keep only the INTEGER bit; odd types
** above 10 keep only the TEXT bit; anything else clears the mask. */
getVarint32NR((const u8*)&pVal->z[1], t);
if( t>0 && t<10 && t!=7 ){
pSorter->typeMask &= SORTER_TYPE_INTEGER;
}else if( t>10 && (t & 0x01) ){
pSorter->typeMask &= SORTER_TYPE_TEXT;
}else{
pSorter->typeMask = 0;
}
assert( pSorter );
/* nReq: bytes needed in memory.  nPMA: bytes needed once in a PMA. */
nReq = pVal->n + sizeof(SorterRecord);
nPMA = pVal->n + sqlite3VarintLen(pVal->n);
if( pSorter->mxPmaSize ){
if( pSorter->list.aMemory ){
/* Bulk mode: flush if the buffer is non-empty and the new record would
** push it past mxPmaSize. */
bFlush = pSorter->iMemory && (pSorter->iMemory+nReq) > pSorter->mxPmaSize;
}else{
/* Otherwise flush when the list exceeds mxPmaSize, or exceeds mnPmaSize
** while sqlite3HeapNearlyFull() reports true. */
bFlush = (
(pSorter->list.szPMA > pSorter->mxPmaSize)
|| (pSorter->list.szPMA > pSorter->mnPmaSize && sqlite3HeapNearlyFull())
);
}
if( bFlush ){
rc = vdbeSorterFlushPMA(pSorter);
pSorter->list.szPMA = 0;
pSorter->iMemory = 0;
assert( rc!=SQLITE_OK || pSorter->list.pList==0 );
}
}
pSorter->list.szPMA += nPMA;
if( nPMA>pSorter->mxKeysize ){
pSorter->mxKeysize = nPMA;
}
if( pSorter->list.aMemory ){
int nMin = pSorter->iMemory + nReq;
if( nMin>pSorter->nMemory ){
/* Grow the bulk buffer: double until large enough, cap at mxPmaSize,
** but never below what this record needs.  The list head is kept as an
** offset across the realloc because the buffer may move. */
u8 *aNew;
sqlite3_int64 nNew = 2 * (sqlite3_int64)pSorter->nMemory;
int iListOff = -1;
if( pSorter->list.pList ){
iListOff = (u8*)pSorter->list.pList - pSorter->list.aMemory;
}
while( nNew < nMin ) nNew = nNew*2;
if( nNew > pSorter->mxPmaSize ) nNew = pSorter->mxPmaSize;
if( nNew < nMin ) nNew = nMin;
aNew = sqlite3Realloc(pSorter->list.aMemory, nNew);
if( !aNew ) return SQLITE_NOMEM_BKPT;
if( iListOff>=0 ){
pSorter->list.pList = (SorterRecord*)&aNew[iListOff];
}
pSorter->list.aMemory = aNew;
pSorter->nMemory = nNew;
}
/* Carve the new record out of the buffer and link it, by offset, in
** front of the current head.  Space is consumed in multiples of 8. */
pNew = (SorterRecord*)&pSorter->list.aMemory[pSorter->iMemory];
pSorter->iMemory += ROUND8(nReq);
if( pSorter->list.pList ){
pNew->u.iNext = (int)((u8*)(pSorter->list.pList) - pSorter->list.aMemory);
}
}else{
pNew = (SorterRecord *)sqlite3Malloc(nReq);
if( pNew==0 ){
return SQLITE_NOMEM_BKPT;
}
pNew->u.pNext = pSorter->list.pList;
}
memcpy(SRVAL(pNew), pVal->z, pVal->n);
pNew->nVal = pVal->n;
pSorter->list.pList = pNew;
return rc;
}
/*
** Read keys from pIncr->pMerger and write them to pIncr->aFile[1],
** starting at offset pIncr->iStartOff, until either the merge engine is
** exhausted or the next key would take the output past mxSz bytes.  The
** end offset of the data written is stored in aFile[1].iEof.
**
** Returns SQLITE_OK or an error code.
*/
static int vdbeIncrPopulate(IncrMerger *pIncr){
  int rc = SQLITE_OK;
  int rcFinish;
  i64 iStart = pIncr->iStartOff;
  SorterFile *pOut = &pIncr->aFile[1];
  SortSubtask *pTask = pIncr->pTask;
  MergeEngine *pMerger = pIncr->pMerger;
  PmaWriter writer;

  assert( pIncr->bEof==0 );
  vdbeSorterPopulateDebug(pTask, "enter");
  vdbePmaWriterInit(pOut->pFd, &writer, pTask->pSorter->pgsz, iStart);

  while( rc==SQLITE_OK ){
    int bUnused;
    PmaReader *pWinner = &pMerger->aReadr[ pMerger->aTree[1] ];
    int nKey = pWinner->nKey;
    i64 iEof = writer.iWriteOff + writer.iBufEnd;

    /* Stop when the input is exhausted ... */
    if( pWinner->pFd==0 ) break;
    /* ... or when the next key does not fit in this chunk. */
    if( (iEof + nKey + sqlite3VarintLen(nKey))>(iStart + pIncr->mxSz) ) break;

    vdbePmaWriteVarint(&writer, nKey);
    vdbePmaWriteBlob(&writer, pWinner->aKey, nKey);
    assert( pIncr->pMerger->pTask==pTask );
    rc = vdbeMergeEngineStep(pIncr->pMerger, &bUnused);
  }

  rcFinish = vdbePmaWriterFinish(&writer, &pOut->iEof);
  if( rc==SQLITE_OK ) rc = rcFinish;
  vdbeSorterPopulateDebug(pTask, "exit");
  return rc;
}
#if SQLITE_MAX_WORKER_THREADS>0
/*
** Thread entry point: run vdbeIncrPopulate() on the IncrMerger passed as
** pCtx, mark its task as done and return the result code as a pointer.
*/
static void *vdbeIncrPopulateThread(void *pCtx){
  IncrMerger *pIncr = (IncrMerger*)pCtx;
  int rc = vdbeIncrPopulate(pIncr);
  pIncr->pTask->bDone = 1;
  return SQLITE_INT_TO_PTR(rc);
}
/*
** Start a background thread on pIncr's task that populates aFile[1].
*/
static int vdbeIncrBgPopulate(IncrMerger *pIncr){
  assert( pIncr->bUseThread );
  return vdbeSorterCreateThread(
      pIncr->pTask, vdbeIncrPopulateThread, (void*)pIncr
  );
}
#endif
/*
** Make the next chunk of merged data available in pIncr->aFile[0].
**
** Threaded merger: wait for the background thread, swap aFile[0] and
** aFile[1], and either flag EOF (if the chunk is empty) or start a new
** background populate of aFile[1].
**
** Non-threaded merger: populate aFile[1] synchronously, copy it to
** aFile[0] and flag EOF if the chunk is empty.
*/
static int vdbeIncrSwap(IncrMerger *pIncr){
  int rc = SQLITE_OK;

#if SQLITE_MAX_WORKER_THREADS>0
  if( pIncr->bUseThread ){
    rc = vdbeSorterJoinThread(pIncr->pTask);
    if( rc==SQLITE_OK ){
      SorterFile tmp = pIncr->aFile[0];
      pIncr->aFile[0] = pIncr->aFile[1];
      pIncr->aFile[1] = tmp;

      if( pIncr->aFile[0].iEof==pIncr->iStartOff ){
        pIncr->bEof = 1;
      }else{
        rc = vdbeIncrBgPopulate(pIncr);
      }
    }
  }else
#endif
  {
    rc = vdbeIncrPopulate(pIncr);
    pIncr->aFile[0] = pIncr->aFile[1];
    if( pIncr->aFile[0].iEof==pIncr->iStartOff ){
      pIncr->bEof = 1;
    }
  }
  return rc;
}
/*
** Allocate a new IncrMerger that reads from pMerger on behalf of pTask
** and store it in *ppOut.  The chunk size mxSz is the larger of
** mxKeysize+9 and half of mxPmaSize, and that much space is reserved in
** pTask->file2.
**
** On allocation failure pMerger is freed, *ppOut is set to NULL and
** SQLITE_NOMEM is returned.
*/
static int vdbeIncrMergerNew(
  SortSubtask *pTask,
  MergeEngine *pMerger,
  IncrMerger **ppOut
){
  IncrMerger *pIncr;

  if( sqlite3FaultSim(100) ){
    pIncr = 0;
  }else{
    pIncr = (IncrMerger*)sqlite3MallocZero(sizeof(*pIncr));
  }
  *ppOut = pIncr;
  if( pIncr==0 ){
    vdbeMergeEngineFree(pMerger);
    return SQLITE_NOMEM_BKPT;
  }

  pIncr->pMerger = pMerger;
  pIncr->pTask = pTask;
  pIncr->mxSz = MAX(pTask->pSorter->mxKeysize+9,pTask->pSorter->mxPmaSize/2);
  pTask->file2.iEof += pIncr->mxSz;
  assert( *ppOut!=0 );
  return SQLITE_OK;
}
#if SQLITE_MAX_WORKER_THREADS>0
/*
** Switch pIncr to multi-threaded mode.  A threaded merger does not use
** pTask->file2, so the space reserved there by vdbeIncrMergerNew() is
** given back.
*/
static void vdbeIncrMergerSetThreads(IncrMerger *pIncr){
  SortSubtask *pTask = pIncr->pTask;
  pIncr->bUseThread = 1;
  pTask->file2.iEof -= pIncr->mxSz;
}
#endif
/*
** Recompute entry iOut of pMerger->aTree[].  The two contenders are either
** a pair of adjacent readers (for entries in the upper half of the tree
** array) or the winners stored in child entries iOut*2 and iOut*2+1.  A
** reader at EOF (pFd==0) loses; on equal keys the first contender wins.
*/
static void vdbeMergeEngineCompare(
  MergeEngine *pMerger,
  int iOut
){
  int nHalf = pMerger->nTree/2;
  int iLeft;                      /* Index of first contender */
  int iRight;                     /* Index of second contender */
  int iWinner;
  PmaReader *pLeft;
  PmaReader *pRight;

  assert( iOut<pMerger->nTree && iOut>0 );
  if( iOut>=nHalf ){
    iLeft = (iOut - nHalf) * 2;
    iRight = iLeft + 1;
  }else{
    iLeft = pMerger->aTree[iOut*2];
    iRight = pMerger->aTree[iOut*2+1];
  }
  pLeft = &pMerger->aReadr[iLeft];
  pRight = &pMerger->aReadr[iRight];

  if( pLeft->pFd==0 ){
    iWinner = iRight;
  }else if( pRight->pFd==0 ){
    iWinner = iLeft;
  }else{
    SortSubtask *pTask = pMerger->pTask;
    int bCached = 0;
    int c;
    assert( pTask->pUnpacked!=0 );
    c = pTask->xCompare(
        pTask, &bCached, pLeft->aKey, pLeft->nKey, pRight->aKey, pRight->nKey
    );
    iWinner = (c<=0) ? iLeft : iRight;
  }
  pMerger->aTree[iOut] = iWinner;
}
/*
** Values for the eMode parameter of the merge-engine and incremental
** reader initialisation routines.
**
**   INCRINIT_NORMAL  Readers are initialised via vdbePmaReaderIncrInit().
**   INCRINIT_TASK    The final vdbePmaReaderNext() call in
**                    vdbePmaReaderIncrMergeInit() is skipped.
**   INCRINIT_ROOT    vdbeMergeEngineInit() advances each reader with
**                    vdbePmaReaderNext(), in reverse order, instead of
**                    initialising it.
**
** Only INCRINIT_NORMAL is used when worker threads are compiled out.
*/
#define INCRINIT_NORMAL 0
#define INCRINIT_TASK 1
#define INCRINIT_ROOT 2
/* Forward declaration; the definition follows vdbePmaReaderIncrMergeInit(). */
static int vdbePmaReaderIncrInit(PmaReader *pReadr, int eMode);
/*
** Bind merge engine pMerger to pTask, prepare each of its readers and
** build the comparison tree so that aTree[1] identifies the reader with
** the smallest key.
**
** In INCRINIT_ROOT mode (threaded builds only) every reader is advanced
** with vdbePmaReaderNext(), last reader first.  In all other modes each
** reader is initialised with vdbePmaReaderIncrInit(INCRINIT_NORMAL).
**
** Returns the first reader error, or else the error code recorded in
** pTask->pUnpacked.
*/
static int vdbeMergeEngineInit(
  SortSubtask *pTask,
  MergeEngine *pMerger,
  int eMode
){
  int nTree;
  int i;

  assert( pMerger!=0 );
  assert( SQLITE_MAX_WORKER_THREADS>0 || eMode==INCRINIT_NORMAL );
  assert( pMerger->pTask==0 );
  pMerger->pTask = pTask;
  nTree = pMerger->nTree;

  for(i=0; i<nTree; i++){
    int rc;
    if( SQLITE_MAX_WORKER_THREADS>0 && eMode==INCRINIT_ROOT ){
      rc = vdbePmaReaderNext(&pMerger->aReadr[nTree-i-1]);
    }else{
      rc = vdbePmaReaderIncrInit(&pMerger->aReadr[i], INCRINIT_NORMAL);
    }
    if( rc!=SQLITE_OK ) return rc;
  }

  /* Fill in the tree from the leaves towards the root. */
  for(i=nTree-1; i>0; i--){
    vdbeMergeEngineCompare(pMerger, i);
  }
  return pTask->pUnpacked->errCode;
}
/*
** Initialise the incremental merger attached to pReadr (pReadr->pIncr must
** be non-NULL): initialise its merge engine, set up the temp file(s) it
** writes to and, unless eMode is INCRINIT_TASK in a threaded build, load
** the first key into pReadr.
**
** Returns SQLITE_OK or an error code.
*/
static int vdbePmaReaderIncrMergeInit(PmaReader *pReadr, int eMode){
int rc = SQLITE_OK;
IncrMerger *pIncr = pReadr->pIncr;
SortSubtask *pTask = pIncr->pTask;
sqlite3 *db = pTask->pSorter->db;
assert( SQLITE_MAX_WORKER_THREADS>0 || eMode==INCRINIT_NORMAL );
rc = vdbeMergeEngineInit(pTask, pIncr->pMerger, eMode);
if( rc==SQLITE_OK ){
int mxSz = pIncr->mxSz;
#if SQLITE_MAX_WORKER_THREADS>0
/* A threaded merger gets two temp files of its own. */
if( pIncr->bUseThread ){
rc = vdbeSorterOpenTempFile(db, mxSz, &pIncr->aFile[0].pFd);
if( rc==SQLITE_OK ){
rc = vdbeSorterOpenTempFile(db, mxSz, &pIncr->aFile[1].pFd);
}
}else
#endif
{
/* A non-threaded merger uses a region of mxSz bytes in pTask->file2.
** Until the file is opened, file2.iEof holds the total size to reserve;
** afterwards it is reset and counts the space handed out so far. */
if( pTask->file2.pFd==0 ){
assert( pTask->file2.iEof>0 );
rc = vdbeSorterOpenTempFile(db, pTask->file2.iEof, &pTask->file2.pFd);
pTask->file2.iEof = 0;
}
if( rc==SQLITE_OK ){
pIncr->aFile[1].pFd = pTask->file2.pFd;
pIncr->iStartOff = pTask->file2.iEof;
pTask->file2.iEof += mxSz;
}
}
}
#if SQLITE_MAX_WORKER_THREADS>0
/* For a threaded merger, write the first chunk to aFile[1] using the
** current thread. */
if( rc==SQLITE_OK && pIncr->bUseThread ){
assert( eMode==INCRINIT_ROOT || eMode==INCRINIT_TASK );
rc = vdbeIncrPopulate(pIncr);
}
#endif
if( rc==SQLITE_OK && (SQLITE_MAX_WORKER_THREADS==0 || eMode!=INCRINIT_TASK) ){
rc = vdbePmaReaderNext(pReadr);
}
return rc;
}
#if SQLITE_MAX_WORKER_THREADS>0
/*
** Thread entry point: run vdbePmaReaderIncrMergeInit() in INCRINIT_TASK
** mode on the PmaReader passed as pCtx, mark the owning task as done and
** return the result code as a pointer.
*/
static void *vdbePmaReaderBgIncrInit(void *pCtx){
  PmaReader *pReader = (PmaReader*)pCtx;
  int rc = vdbePmaReaderIncrMergeInit(pReader, INCRINIT_TASK);
  pReader->pIncr->pTask->bDone = 1;
  return SQLITE_INT_TO_PTR(rc);
}
#endif
/*
** If PmaReader pReadr has an incremental merger attached, initialize it;
** otherwise this is a no-op that returns SQLITE_OK.
**
** When worker threads are compiled in and the merger has bUseThread set,
** the initialization is dispatched to a thread created with
** vdbeSorterCreateThread() (eMode must be INCRINIT_TASK in that case) and
** the result of creating the thread is returned.  In every other case
** vdbePmaReaderIncrMergeInit() is called directly with eMode.
*/
static int vdbePmaReaderIncrInit(PmaReader *pReadr, int eMode){
  IncrMerger *pIncr = pReadr->pIncr;

  if( pIncr==0 ){
    return SQLITE_OK;
  }

#if SQLITE_MAX_WORKER_THREADS>0
  assert( pIncr->bUseThread==0 || eMode==INCRINIT_TASK );
  if( pIncr->bUseThread ){
    return vdbeSorterCreateThread(
        pIncr->pTask, vdbePmaReaderBgIncrInit, (void*)pReadr
    );
  }
#endif

  return vdbePmaReaderIncrMergeInit(pReadr, eMode);
}
/*
** Allocate a MergeEngine with nPMA readers and point each reader at
** consecutive data in pTask->file.
**
** On entry *piOffset is the offset handed to the first reader.  Each
** subsequent reader starts at the iEof value of the one before it.  On
** return *piOffset holds the offset reached, whether or not the call
** succeeded.
**
** On success *ppOut is the new MergeEngine and SQLITE_OK is returned.  On
** failure the engine is freed, *ppOut is set to NULL and an error code is
** returned (SQLITE_NOMEM if the allocation itself failed).
*/
static int vdbeMergeEngineLevel0(
  SortSubtask *pTask,             /* Sub-task whose file is read */
  int nPMA,                       /* Number of readers to set up */
  i64 *piOffset,                  /* IN/OUT: read offset in pTask->file */
  MergeEngine **ppOut             /* OUT: new MergeEngine, or NULL */
){
  int rc = SQLITE_OK;
  i64 iNext = *piOffset;
  MergeEngine *pEngine = vdbeMergeEngineNew(nPMA);

  *ppOut = pEngine;
  if( pEngine==0 ){
    rc = SQLITE_NOMEM_BKPT;
  }else{
    int iReader = 0;
    while( iReader<nPMA && rc==SQLITE_OK ){
      PmaReader *pReadr = &pEngine->aReadr[iReader];
      i64 nUnused = 0;            /* Output of ReaderInit that is ignored */
      rc = vdbePmaReaderInit(pTask, &pTask->file, iNext, pReadr, &nUnused);
      iNext = pReadr->iEof;
      iReader++;
    }
  }

  if( rc!=SQLITE_OK ){
    vdbeMergeEngineFree(pEngine);
    *ppOut = 0;
  }
  *piOffset = iNext;
  return rc;
}
static int vdbeSorterTreeDepth(int nPMA){
int nDepth = 0;
i64 nDiv = SORTER_MAX_MERGE_COUNT;
while( nDiv < (i64)nPMA ){
nDiv = nDiv * SORTER_MAX_MERGE_COUNT;
nDepth++;
}
return nDepth;
}
/*
** Hang MergeEngine pLeaf in the tree rooted at pRoot, at the position
** selected by sequence number iSeq.
**
** pLeaf is first wrapped in a new IncrMerger.  The routine then walks
** nDepth-1 levels down from pRoot.  At each level the slot index is the
** corresponding base-SORTER_MAX_MERGE_COUNT digit of iSeq, taken from the
** most significant digit downward.  A slot with no IncrMerger yet gets a
** fresh SORTER_MAX_MERGE_COUNT-way MergeEngine wrapped in its own
** IncrMerger.  Finally the leaf's IncrMerger is stored in slot
** (iSeq % SORTER_MAX_MERGE_COUNT) of the node reached.
**
** Returns SQLITE_OK on success.  On any error the IncrMerger created for
** the leaf is released with vdbeIncrFree() and the error code returned.
*/
static int vdbeSorterAddToTree(
  SortSubtask *pTask,             /* Task context */
  int nDepth,                     /* Depth of the tree */
  int iSeq,                       /* Sequence number of the leaf */
  MergeEngine *pRoot,             /* Root of the tree */
  MergeEngine *pLeaf              /* Leaf to add */
){
  IncrMerger *pLeafIncr;          /* IncrMerger wrapping pLeaf */
  MergeEngine *pNode = pRoot;     /* Node currently being descended from */
  int nDiv = 1;                   /* Divisor isolating the current digit */
  int iLevel;
  int rc = vdbeIncrMergerNew(pTask, pLeaf, &pLeafIncr);

  /* nDiv = SORTER_MAX_MERGE_COUNT ^ (nDepth-1), or 1 if nDepth<=1 */
  for(iLevel=1; iLevel<nDepth; iLevel++){
    nDiv *= SORTER_MAX_MERGE_COUNT;
  }

  for(iLevel=1; iLevel<nDepth && rc==SQLITE_OK; iLevel++){
    int iSlot = (iSeq / nDiv) % SORTER_MAX_MERGE_COUNT;
    PmaReader *pSlot = &pNode->aReadr[iSlot];

    if( pSlot->pIncr==0 ){
      /* No interior node here yet - create one. */
      MergeEngine *pInner = vdbeMergeEngineNew(SORTER_MAX_MERGE_COUNT);
      if( pInner ){
        rc = vdbeIncrMergerNew(pTask, pInner, &pSlot->pIncr);
      }else{
        rc = SQLITE_NOMEM_BKPT;
      }
    }
    if( rc==SQLITE_OK ){
      pNode = pSlot->pIncr->pMerger;
      nDiv /= SORTER_MAX_MERGE_COUNT;
    }
  }

  if( rc!=SQLITE_OK ){
    vdbeIncrFree(pLeafIncr);
    return rc;
  }
  pNode->aReadr[iSeq % SORTER_MAX_MERGE_COUNT].pIncr = pLeafIncr;
  return rc;
}
/*
** Build the tree of MergeEngine objects needed to merge every PMA written
** by every sub-task of pSorter.
**
** For each sub-task that has PMAs, a per-task tree is built:
**
**   * With at most SORTER_MAX_MERGE_COUNT PMAs, the tree is a single
**     level-0 engine created by vdbeMergeEngineLevel0().
**
**   * With more, the PMAs are split into groups of up to
**     SORTER_MAX_MERGE_COUNT.  Each group becomes a level-0 engine that
**     vdbeSorterAddToTree() hangs beneath a new root engine.
**
** When worker threads are compiled in and the sorter has more than one
** task, a top-level engine with one reader per task is allocated up
** front and each per-task root is attached to reader iTask through an
** IncrMerger.  Otherwise the single per-task root is itself the result.
**
** On success *ppOut is the resulting engine and SQLITE_OK is returned.
** On failure what was built is freed, *ppOut is NULL and an error code
** is returned.
*/
static int vdbeSorterMergeTreeBuild(
  VdbeSorter *pSorter,            /* Sorter whose PMAs are to be merged */
  MergeEngine **ppOut             /* OUT: top of the merge tree */
){
  MergeEngine *pTop = 0;          /* Value to store in *ppOut */
  int rc = SQLITE_OK;
  int iTask;

#if SQLITE_MAX_WORKER_THREADS>0
  assert( pSorter->bUseThreads || pSorter->nTask==1 );
  if( pSorter->nTask>1 ){
    pTop = vdbeMergeEngineNew(pSorter->nTask);
    if( pTop==0 ) rc = SQLITE_NOMEM_BKPT;
  }
#endif

  for(iTask=0; rc==SQLITE_OK && iTask<pSorter->nTask; iTask++){
    SortSubtask *pTask = &pSorter->aTask[iTask];
    MergeEngine *pRoot = 0;       /* Root of this task's tree */
    i64 iReadOff = 0;             /* Running offset into pTask->file */
    int nDepth;

    assert( pTask->nPMA>0 || SQLITE_MAX_WORKER_THREADS>0 );

    /* In a threaded build a task may have no PMAs - nothing to do then. */
    if( SQLITE_MAX_WORKER_THREADS!=0 && pTask->nPMA==0 ) continue;

    nDepth = vdbeSorterTreeDepth(pTask->nPMA);
    if( pTask->nPMA<=SORTER_MAX_MERGE_COUNT ){
      rc = vdbeMergeEngineLevel0(pTask, pTask->nPMA, &iReadOff, &pRoot);
    }else{
      int iFirst = 0;             /* Index of first PMA in the next group */
      int iSeq = 0;               /* Sequence number of the next leaf */

      pRoot = vdbeMergeEngineNew(SORTER_MAX_MERGE_COUNT);
      if( pRoot==0 ) rc = SQLITE_NOMEM_BKPT;

      while( iFirst<pTask->nPMA && rc==SQLITE_OK ){
        MergeEngine *pLeaf = 0;
        int nRemain = pTask->nPMA - iFirst;
        int nReader = MIN(nRemain, SORTER_MAX_MERGE_COUNT);

        rc = vdbeMergeEngineLevel0(pTask, nReader, &iReadOff, &pLeaf);
        if( rc==SQLITE_OK ){
          rc = vdbeSorterAddToTree(pTask, nDepth, iSeq, pRoot, pLeaf);
          iSeq++;
        }
        iFirst += SORTER_MAX_MERGE_COUNT;
      }
    }

    if( rc!=SQLITE_OK ){
      vdbeMergeEngineFree(pRoot);
    }else{
#if SQLITE_MAX_WORKER_THREADS>0
      if( pTop!=0 ){
        rc = vdbeIncrMergerNew(pTask, pRoot, &pTop->aReadr[iTask].pIncr);
      }else
#endif
      {
        assert( pTop==0 );
        pTop = pRoot;
      }
    }
  }

  if( rc!=SQLITE_OK ){
    vdbeMergeEngineFree(pTop);
    pTop = 0;
  }
  *ppOut = pTop;
  return rc;
}
/*
** Build the merge tree for pSorter with vdbeSorterMergeTreeBuild() and
** initialize it so that it is ready to be read.
**
** If the sorter uses threads, the tree is wrapped in a newly allocated
** PmaReader that is stored in pSorter->pReader.  Otherwise the top
** MergeEngine is stored directly in pSorter->pMerger.
**
** Returns SQLITE_OK on success or an error code otherwise.
*/
static int vdbeSorterSetupMerge(VdbeSorter *pSorter){
int rc;
SortSubtask *pTask0 = &pSorter->aTask[0];
MergeEngine *pMain = 0;
#if SQLITE_MAX_WORKER_THREADS
sqlite3 *db = pTask0->pSorter->db;
int i;
/* Give every sub-task the same comparison routine, as selected by
** vdbeSorterGetCompare(). */
SorterCompare xCompare = vdbeSorterGetCompare(pSorter);
for(i=0; i<pSorter->nTask; i++){
pSorter->aTask[i].xCompare = xCompare;
}
#endif
rc = vdbeSorterMergeTreeBuild(pSorter, &pMain);
if( rc==SQLITE_OK ){
#if SQLITE_MAX_WORKER_THREADS
assert( pSorter->bUseThreads==0 || pSorter->nTask>1 );
if( pSorter->bUseThreads ){
int iTask;
PmaReader *pReadr = 0;
/* The root merger created below is attached to the last sub-task. */
SortSubtask *pLast = &pSorter->aTask[pSorter->nTask-1];
rc = vdbeSortAllocUnpacked(pLast);
if( rc==SQLITE_OK ){
/* pSorter->pReader is assigned before the NULL check, so it is NULL
** if the allocation fails. */
pReadr = (PmaReader*)sqlite3DbMallocZero(db, sizeof(PmaReader));
pSorter->pReader = pReadr;
if( pReadr==0 ) rc = SQLITE_NOMEM_BKPT;
}
if( rc==SQLITE_OK ){
/* Wrap the whole tree in an IncrMerger hanging off pReadr.  pMain is
** cleared at the bottom of this block whether or not the call succeeds,
** so the cleanup at the end of the function never frees it after this
** point.  NOTE(review): presumably vdbeIncrMergerNew() takes ownership
** of pMain even on failure - confirm against its definition. */
rc = vdbeIncrMergerNew(pLast, pMain, &pReadr->pIncr);
if( rc==SQLITE_OK ){
vdbeIncrMergerSetThreads(pReadr->pIncr);
/* Call vdbeIncrMergerSetThreads() on the per-task mergers of every
** task except the last one (slot nTask-1 is skipped). */
for(iTask=0; iTask<(pSorter->nTask-1); iTask++){
IncrMerger *pIncr;
if( (pIncr = pMain->aReadr[iTask].pIncr) ){
vdbeIncrMergerSetThreads(pIncr);
assert( pIncr->pTask!=pLast );
}
}
/* Start each per-task reader in INCRINIT_TASK mode, stopping at the
** first error.  The assert checks that each merger (a) belongs to the
** task with the same index and (b) for the last task, does not have
** bUseThread set. */
for(iTask=0; rc==SQLITE_OK && iTask<pSorter->nTask; iTask++){
PmaReader *p = &pMain->aReadr[iTask];
assert( p->pIncr==0 || (
(p->pIncr->pTask==&pSorter->aTask[iTask])
&& (iTask!=pSorter->nTask-1 || p->pIncr->bUseThread==0)
));
rc = vdbePmaReaderIncrInit(p, INCRINIT_TASK);
}
}
pMain = 0;
}
/* Finally initialize the root reader itself in INCRINIT_ROOT mode. */
if( rc==SQLITE_OK ){
rc = vdbePmaReaderIncrMergeInit(pReadr, INCRINIT_ROOT);
}
}else
#endif
{
/* Unthreaded case: initialize the tree on task 0.  The engine is stored
** in pSorter->pMerger even if initialization failed, and pMain is
** cleared so that it is not freed below. */
rc = vdbeMergeEngineInit(pTask0, pMain, INCRINIT_NORMAL);
pSorter->pMerger = pMain;
pMain = 0;
}
}
/* pMain is still non-NULL here only if an error occurred before the tree
** was handed to pSorter->pMerger or to vdbeIncrMergerNew(). */
if( rc!=SQLITE_OK ){
vdbeMergeEngineFree(pMain);
}
return rc;
}
/*
** Prepare the sorter attached to cursor pCsr for reading its contents in
** sorted order.
**
** If nothing has been written to a PMA (bUsePMA==0), the in-memory list
** is sorted in place with vdbeSorterSort(); *pbEof is set to 1 if the
** list is empty and to 0 otherwise.
**
** Otherwise the current in-memory list is flushed with
** vdbeSorterFlushPMA(), vdbeSorterJoinAll() is called, and, if no error
** occurred, vdbeSorterSetupMerge() builds the merge structure and *pbEof
** is set to 0.  *pbEof is left untouched if the flush/join step fails.
**
** Returns SQLITE_OK or an error code.
*/
int sqlite3VdbeSorterRewind(const VdbeCursor *pCsr, int *pbEof){
  VdbeSorter *pSorter;
  int rc;

  assert( pCsr->eCurType==CURTYPE_SORTER );
  pSorter = pCsr->uc.pSorter;
  assert( pSorter );

  if( !pSorter->bUsePMA ){
    /* Everything is still in memory. */
    if( pSorter->list.pList==0 ){
      *pbEof = 1;
      return SQLITE_OK;
    }
    *pbEof = 0;
    return vdbeSorterSort(&pSorter->aTask[0], &pSorter->list);
  }

  /* Data has been spilled to PMAs.  The in-memory list is asserted to be
  ** non-empty here; flush it as well. */
  assert( pSorter->list.pList );
  rc = vdbeSorterFlushPMA(pSorter);
  rc = vdbeSorterJoinAll(pSorter, rc);

  vdbeSorterRewindDebug("rewind");

  assert( pSorter->pReader==0 );
  if( rc==SQLITE_OK ){
    rc = vdbeSorterSetupMerge(pSorter);
    *pbEof = 0;
  }

  vdbeSorterRewindDebug("rewinddone");
  return rc;
}
/*
** Advance the sorter attached to cursor pCsr to its next entry.
**
** Returns SQLITE_OK if the sorter now points at a valid entry,
** SQLITE_DONE if the end has been reached, or an error code.
**
** In-memory case (bUsePMA==0): the head record is unlinked from the list
** and, unless the records live in the bulk aMemory buffer, freed with
** vdbeSorterRecordFree().
**
** PMA case: with threads the single PmaReader is advanced and the end is
** detected by its pFd becoming NULL.  Without threads the MergeEngine is
** stepped and the end is reported through the step's output flag.
*/
int sqlite3VdbeSorterNext(sqlite3 *db, const VdbeCursor *pCsr){
  VdbeSorter *pSorter;
  int rc;

  assert( pCsr->eCurType==CURTYPE_SORTER );
  pSorter = pCsr->uc.pSorter;
  assert( pSorter->bUsePMA || (pSorter->pReader==0 && pSorter->pMerger==0) );

  if( !pSorter->bUsePMA ){
    SorterRecord *pHead = pSorter->list.pList;
    SorterRecord *pRest = pHead->u.pNext;

    pSorter->list.pList = pRest;
    pHead->u.pNext = 0;
    if( pSorter->list.aMemory==0 ) vdbeSorterRecordFree(db, pHead);
    return pRest ? SQLITE_OK : SQLITE_DONE;
  }

  assert( pSorter->pReader==0 || pSorter->pMerger==0 );
  assert( pSorter->bUseThreads==0 || pSorter->pReader );
  assert( pSorter->bUseThreads==1 || pSorter->pMerger );

#if SQLITE_MAX_WORKER_THREADS>0
  if( pSorter->bUseThreads ){
    rc = vdbePmaReaderNext(pSorter->pReader);
    if( rc==SQLITE_OK && pSorter->pReader->pFd==0 ) rc = SQLITE_DONE;
    return rc;
  }
#endif

  {
    int bEof = 0;                 /* Set by MergeEngineStep at end of data */
    assert( pSorter->pMerger!=0 );
    assert( pSorter->pMerger->pTask==(&pSorter->aTask[0]) );
    rc = vdbeMergeEngineStep(pSorter->pMerger, &bEof);
    if( rc==SQLITE_OK && bEof ) rc = SQLITE_DONE;
  }
  return rc;
}
static void *vdbeSorterRowkey(
const VdbeSorter *pSorter,
int *pnKey
){
void *pKey;
if( pSorter->bUsePMA ){
PmaReader *pReader;
#if SQLITE_MAX_WORKER_THREADS>0
if( pSorter->bUseThreads ){
pReader = pSorter->pReader;
}else
#endif
{
pReader = &pSorter->pMerger->aReadr[pSorter->pMerger->aTree[1]];
}
*pnKey = pReader->nKey;
pKey = pReader->aKey;
}else{
*pnKey = pSorter->list.pList->nVal;
pKey = SRVAL(pSorter->list.pList);
}
return pKey;
}
/*
** Copy the current sorter key of cursor pCsr into memory cell pOut as a
** blob.
**
** pOut is resized with sqlite3VdbeMemClearAndResize() to hold the key.
** Returns SQLITE_NOMEM if that fails, SQLITE_OK otherwise.
*/
int sqlite3VdbeSorterRowkey(const VdbeCursor *pCsr, Mem *pOut){
  int nKey;                       /* Size of the current key in bytes */
  void *pKey;                     /* The current key */

  assert( pCsr->eCurType==CURTYPE_SORTER );
  pKey = vdbeSorterRowkey(pCsr->uc.pSorter, &nKey);

  if( sqlite3VdbeMemClearAndResize(pOut, nKey)!=0 ){
    return SQLITE_NOMEM_BKPT;
  }
  memcpy(pOut->z, pKey, nKey);
  pOut->n = nKey;
  MemSetTypeFlag(pOut, MEM_Blob);
  return SQLITE_OK;
}
/*
** Compare the record in pVal against the current sorter key of cursor
** pCsr, considering the first nKeyCol fields of the sorter key.
**
** The sorter key is unpacked into pSorter->pUnpacked, which is allocated
** on first use with its nField set to nKeyCol.
**
** If any of the first nKeyCol fields of the sorter key is NULL, *pRes is
** set to -1 without performing a comparison.  Otherwise *pRes receives
** the result of sqlite3VdbeRecordCompare().
**
** Returns SQLITE_OK, or SQLITE_NOMEM if the unpacked record could not be
** allocated (in which case *pRes is not written).
*/
int sqlite3VdbeSorterCompare(
  const VdbeCursor *pCsr,         /* Sorter cursor */
  Mem *pVal,                      /* Record to compare with the sorter key */
  int nKeyCol,                    /* Number of key fields to consider */
  int *pRes                       /* OUT: comparison result */
){
  VdbeSorter *pSorter;
  KeyInfo *pKeyInfo;
  UnpackedRecord *pRec;           /* Unpacked form of the sorter key */
  void *pKey;                     /* Current sorter key */
  int nKey;                       /* Size of pKey in bytes */
  int iCol;

  assert( pCsr->eCurType==CURTYPE_SORTER );
  pSorter = pCsr->uc.pSorter;
  pKeyInfo = pCsr->pKeyInfo;

  pRec = pSorter->pUnpacked;
  if( pRec==0 ){
    pRec = sqlite3VdbeAllocUnpackedRecord(pKeyInfo);
    pSorter->pUnpacked = pRec;
    if( pRec==0 ) return SQLITE_NOMEM_BKPT;
    pRec->nField = nKeyCol;
  }
  assert( pRec->nField==nKeyCol );

  pKey = vdbeSorterRowkey(pSorter, &nKey);
  sqlite3VdbeRecordUnpack(pKeyInfo, nKey, pKey, pRec);

  /* Stop at the first NULL among the leading nKeyCol fields, if any. */
  for(iCol=0; iCol<nKeyCol; iCol++){
    if( (pRec->aMem[iCol].flags & MEM_Null)!=0 ) break;
  }

  if( iCol<nKeyCol ){
    *pRes = -1;
  }else{
    *pRes = sqlite3VdbeRecordCompare(pVal->n, pVal->z, pRec);
  }
  return SQLITE_OK;
}