/*
** 2011-08-13
**
** The author disclaims copyright to this source code.  In place of
** a legal notice, here is a blessing:
**
**    May you do good and not evil.
**    May you find forgiveness for yourself and forgive others.
**    May you share freely, never taking more than you give.
**
*************************************************************************
**
** This file contains the implementation of LSM database logging. Logging
** has one purpose in LSM - to make transactions durable.
**
** When data is written to an LSM database, it is initially stored in an
** in-memory tree structure. Since this structure is in volatile memory,
** if a power failure or application crash occurs it may be lost. To
** prevent loss of data in this case, each time a record is written to the
** in-memory tree an equivalent record is appended to the log on disk.
** If a power failure or application crash does occur, data can be recovered
** by reading the log.
**
** A log file consists of the following types of records representing data
** written into the database:
**
**   LOG_WRITE:  A key-value pair written to the database.
**   LOG_DELETE: A delete key issued to the database.
**   LOG_COMMIT: A transaction commit.
**
** And the following types of records for ancillary purposes..
**
**   LOG_EOF:    A record indicating the end of a log file.
**   LOG_PAD1:   A single byte padding record.
**   LOG_PAD2:   An N byte padding record (N>1).
**   LOG_JUMP:   A pointer to another offset within the log file.
**
** Each transaction written to the log contains one or more LOG_WRITE and/or
** LOG_DELETE records, followed by a LOG_COMMIT record. The LOG_COMMIT record
** contains an 8-byte checksum based on all previous data written to the
** log file.
**
** LOG CHECKSUMS & RECOVERY
**
**   Checksums are found in two types of log records: LOG_COMMIT and
**   LOG_CKSUM records. In order to recover content from a log, a client
**   reads each record from the start of the log, calculating a checksum as
**   it does. Each time a LOG_COMMIT or LOG_CKSUM is encountered, the 
**   recovery process verifies that the checksum stored in the log 
**   matches the calculated checksum. If it does not, the recovery process
**   can stop reading the log.
**
**   If a recovery process reads records (other than COMMIT or CKSUM) 
**   consisting of at least LSM_CKSUM_MAXDATA bytes, then the next record in
**   the log must be either a LOG_CKSUM or LOG_COMMIT record. If it is
**   not, the recovery process also stops reading the log.
**
**   To recover the log file, it must be read twice. The first time to 
**   determine the location of the last valid commit record. And the second
**   time to load data into the in-memory tree.
**
**   Todo: Surely there is a better way...
**
** LOG WRAPPING
**
**   If the log file were never deleted or wrapped, it would be possible to
**   read it from start to end each time is required recovery (i.e each time
**   the number of database clients changes from 0 to 1). Effectively reading
**   the entire history of the database each time. This would quickly become 
**   inefficient. Additionally, since the log file would grow without bound,
**   it wastes storage space.
**
**   Instead, part of each checkpoint written into the database file contains 
**   a log offset (and other information required to read the log starting at
**   at this offset) at which to begin recovery. Offset $O.
**
**   Once a checkpoint has been written and synced into the database file, it
**   is guaranteed that no recovery process will need to read any data before
**   offset $O of the log file. It is therefore safe to begin overwriting
**   any data that occurs before offset $O.
**
**   This implementation separates the log into three regions mapped into
**   the log file - regions 0, 1 and 2. During recovery, regions are read
**   in ascending order (i.e. 0, then 1, then 2). Each region is zero or
**   more bytes in size.
**
**     |---1---|..|--0--|.|--2--|....
**
**   New records are always appended to the end of region 2.
**
**   Initially (when it is empty), all three regions are zero bytes in size.
**   Each of them are located at the beginning of the file. As records are
**   added to the log, region 2 grows, so that the log consists of a zero
**   byte region 1, followed by a zero byte region 0, followed by an N byte
**   region 2. After one or more checkpoints have been written to disk, 
**   the start point of region 2 is moved to $O. For example:
**
**     A) ||.........|--2--|....
**   
**   (both regions 0 and 1 are 0 bytes in size at offset 0).
**
**   Eventually, the log wraps around to write new records into the start.
**   At this point, region 2 is renamed to region 0. Region 0 is renamed
**   to region 2. After appending a few records to the new region 2, the
**   log file looks like this:
**
**     B) ||--2--|...|--0--|....
**
**   (region 1 is still 0 bytes in size, located at offset 0).
**
**   Any checkpoints made at this point may reduce the size of region 0.
**   However, if they do not, and region 2 expands so that it is about to
**   overwrite the start of region 0, then region 2 is renamed to region 1,
**   and a new region 2 created at the end of the file following the existing
**   region 0.
**
**     C) |---1---|..|--0--|.|-2-|
**
**   In this state records are appended to region 2 until checkpoints have
**   contracted regions 0 AND 1 UNTil they are both zero bytes in size. They 
**   are then shifted to the start of the log file, leaving the system in 
**   the equivalent of state A above.
**
**   Alternatively, state B may transition directly to state A if the size
**   of region 0 is reduced to zero bytes before region 2 threatens to 
**   encroach upon it.
**
** LOG_PAD1 & LOG_PAD2 RECORDS
**
**   PAD1 and PAD2 records may appear in a log file at any point. They allow
**   a process writing the log file align the beginning of transactions with 
**   the beginning of disk sectors, which increases robustness.
**
** RECORD FORMATS:
**
**   LOG_EOF:    * A single 0x00 byte.
**
**   LOG_PAD1:   * A single 0x01 byte.
**
**   LOG_PAD2:   * A single 0x02 byte, followed by
**               * The number of unused bytes (N) as a varint,
**               * An N byte block of unused space.
**
**   LOG_COMMIT: * A single 0x03 byte.
**               * An 8-byte checksum.
**
**   LOG_JUMP:   * A single 0x04 byte.
**               * Absolute file offset to jump to, encoded as a varint.
**
**   LOG_WRITE:  * A single 0x06 or 0x07 byte, 
**               * The number of bytes in the key, encoded as a varint, 
**               * The number of bytes in the value, encoded as a varint, 
**               * If the first byte was 0x07, an 8 byte checksum.
**               * The key data,
**               * The value data.
**
**   LOG_DELETE: * A single 0x08 or 0x09 byte, 
**               * The number of bytes in the key, encoded as a varint, 
**               * If the first byte was 0x09, an 8 byte checksum.
**               * The key data.
**
**   Varints are as described in lsm_varint.c (SQLite 4 format).
**
** CHECKSUMS:
**
**   The checksum is calculated using two 32-bit unsigned integers, s0 and
**   s1. The initial value for both is 42. It is updated each time a record
**   is written into the log file by treating the encoded (binary) record as 
**   an array of 32-bit little-endian integers. Then, if x[] is the integer
**   array, updating the checksum accumulators as follows:
**
**     for i from 0 to n-1 step 2:
**       s0 += x[i] + s1;
**       s1 += x[i+1] + s0;
**     endfor
**
**   If the record is not an even multiple of 8-bytes in size it is padded
**   with zeroes to make it so before the checksum is updated.
**
**   The checksum stored in a COMMIT, WRITE or DELETE is based on all bytes
**   up to the start of the 8-byte checksum itself, including the COMMIT,
**   WRITE or DELETE fields that appear before the checksum in the record.
**
** VARINT FORMAT
**
** See lsm_varint.c.
*/

#ifndef _LSM_INT_H
# include "lsmInt.h"
#endif

/* Log record types */
#define LSM_LOG_EOF          0x00
#define LSM_LOG_PAD1         0x01
#define LSM_LOG_PAD2         0x02
#define LSM_LOG_COMMIT       0x03
#define LSM_LOG_JUMP         0x04

#define LSM_LOG_WRITE        0x06
#define LSM_LOG_WRITE_CKSUM  0x07

#define LSM_LOG_DELETE       0x08
#define LSM_LOG_DELETE_CKSUM 0x09

#define LSM_LOG_DRANGE       0x0A
#define LSM_LOG_DRANGE_CKSUM 0x0B

/* Require a checksum every 32KB. */
#define LSM_CKSUM_MAXDATA (32*1024)

/* Do not wrap a log file smaller than this in bytes. */
#define LSM_MIN_LOGWRAP      (128*1024)

/*
** szSector:
**   Commit records must be aligned to end on szSector boundaries. If
**   the safety-mode is set to NORMAL or OFF, this value is 1. Otherwise,
**   if the safety-mode is set to FULL, it is the size of the file-system
**   sectors as reported by lsmFsSectorSize().
*/
struct LogWriter {
  u32 cksum0;                     /* Checksum 0 at offset iOff */
  u32 cksum1;                     /* Checksum 1 at offset iOff */
  int iCksumBuf;                  /* Bytes of buf that have been checksummed */
  i64 iOff;                       /* Offset at start of buffer buf */
  int szSector;                   /* Sector size for this transaction */
  LogRegion jump;                 /* Avoid writing to this region */
  i64 iRegion1End;                /* End of first region written by trans */
  i64 iRegion2Start;              /* Start of second regions written by trans */
  LsmString buf;                  /* Buffer containing data not yet written */
};

/*
** Return the result of interpreting the first 4 bytes in buffer aIn as 
** a 32-bit unsigned little-endian integer.
*/
static u32 getU32le(u8 *aIn){
  return ((u32)aIn[3] << 24) 
       + ((u32)aIn[2] << 16) 
       + ((u32)aIn[1] << 8) 
       + ((u32)aIn[0]);
}


/*
** This function is the same as logCksum(), except that pointer "a" need
** not be aligned to an 8-byte boundary or padded with zero bytes. This
** version is slower, but sometimes more convenient to use.
*/
static void logCksumUnaligned(
  char *z,                        /* Input buffer */
  int n,                          /* Size of input buffer in bytes */
  u32 *pCksum0,                   /* IN/OUT: Checksum value 1 */
  u32 *pCksum1                    /* IN/OUT: Checksum value 2 */
){
  u8 *a = (u8 *)z;
  u32 cksum0 = *pCksum0;
  u32 cksum1 = *pCksum1;
  int nIn = (n/8) * 8;
  int i;

  assert( n>0 );
  for(i=0; i<nIn; i+=8){
    cksum0 += getU32le(&a[i]) + cksum1;
    cksum1 += getU32le(&a[i+4]) + cksum0;
  }

  if( nIn!=n ){
    u8 aBuf[8] = {0, 0, 0, 0, 0, 0, 0, 0};
    assert( (n-nIn)<8 && n>nIn );
    memcpy(aBuf, &a[nIn], n-nIn);
    cksum0 += getU32le(aBuf) + cksum1;
    cksum1 += getU32le(&aBuf[4]) + cksum0;
  }

  *pCksum0 = cksum0;
  *pCksum1 = cksum1;
}

/*
** Update pLog->cksum0 and pLog->cksum1 so that the first nBuf bytes in the 
** write buffer (pLog->buf) are included in the checksum.
*/
static void logUpdateCksum(LogWriter *pLog, int nBuf){
  assert( (pLog->iCksumBuf % 8)==0 );
  assert( pLog->iCksumBuf<=nBuf );
  assert( (nBuf % 8)==0 || nBuf==pLog->buf.n );
  if( nBuf>pLog->iCksumBuf ){
    logCksumUnaligned(
        &pLog->buf.z[pLog->iCksumBuf], nBuf-pLog->iCksumBuf, 
        &pLog->cksum0, &pLog->cksum1
    );
  }
  pLog->iCksumBuf = nBuf;
}

static i64 firstByteOnSector(LogWriter *pLog, i64 iOff){
  return (iOff / pLog->szSector) * pLog->szSector;
}
static i64 lastByteOnSector(LogWriter *pLog, i64 iOff){
  return firstByteOnSector(pLog, iOff) + pLog->szSector - 1;
}

/*
** If possible, reclaim log file space. Log file space is reclaimed after
** a snapshot that points to the same data in the database file is synced
** into the db header.
*/
static int logReclaimSpace(lsm_db *pDb){
  int rc;
  int iMeta;
  int bRotrans;                   /* True if there exists some ro-trans */

  /* Test if there exists some other connection with a read-only transaction
  ** open. If there does, then log file space may not be reclaimed.  */
  rc = lsmDetectRoTrans(pDb, &bRotrans);
  if( rc!=LSM_OK || bRotrans ) return rc;

  iMeta = (int)pDb->pShmhdr->iMetaPage;
  if( iMeta==1 || iMeta==2 ){
    DbLog *pLog = &pDb->treehdr.log;
    i64 iSyncedId;

    /* Read the snapshot-id of the snapshot stored on meta-page iMeta. Note
    ** that in theory, the value read is untrustworthy (due to a race 
    ** condition - see comments above lsmFsReadSyncedId()). So it is only 
    ** ever used to conclude that no log space can be reclaimed. If it seems
    ** to indicate that it may be possible to reclaim log space, a
    ** second call to lsmCheckpointSynced() (which does return trustworthy
    ** values) is made below to confirm.  */
    rc = lsmFsReadSyncedId(pDb, iMeta, &iSyncedId);

    if( rc==LSM_OK && pLog->iSnapshotId!=iSyncedId ){
      i64 iSnapshotId = 0;
      i64 iOff = 0;
      rc = lsmCheckpointSynced(pDb, &iSnapshotId, &iOff, 0);
      if( rc==LSM_OK && pLog->iSnapshotId<iSnapshotId ){
        int iRegion;
        for(iRegion=0; iRegion<3; iRegion++){
          LogRegion *p = &pLog->aRegion[iRegion];
          if( iOff>=p->iStart && iOff<=p->iEnd ) break;
          p->iStart = 0;
          p->iEnd = 0;
        }
        assert( iRegion<3 );
        pLog->aRegion[iRegion].iStart = iOff;
        pLog->iSnapshotId = iSnapshotId;
      }
    }
  }
  return rc;
}

/*
** This function is called when a write-transaction is first opened. It
** is assumed that the caller is holding the client-mutex when it is 
** called.
**
** Before returning, this function allocates the LogWriter object that
** will be used to write to the log file during the write transaction.
** LSM_OK is returned if no error occurs, otherwise an LSM error code.
*/
int lsmLogBegin(lsm_db *pDb){
  int rc = LSM_OK;
  LogWriter *pNew;
  LogRegion *aReg;

  if( pDb->bUseLog==0 ) return LSM_OK;

  /* If the log file has not yet been opened, open it now. Also allocate
  ** the LogWriter structure, if it has not already been allocated.  */
  rc = lsmFsOpenLog(pDb, 0);
  if( pDb->pLogWriter==0 ){
    pNew = lsmMallocZeroRc(pDb->pEnv, sizeof(LogWriter), &rc);
    if( pNew ){
      lsmStringInit(&pNew->buf, pDb->pEnv);
      rc = lsmStringExtend(&pNew->buf, 2);
    }
    pDb->pLogWriter = pNew;
  }else{
    pNew = pDb->pLogWriter;
    assert( (u8 *)(&pNew[1])==(u8 *)(&((&pNew->buf)[1])) );
    memset(pNew, 0, ((u8 *)&pNew->buf) - (u8 *)pNew);
    pNew->buf.n = 0;
  }

  if( rc==LSM_OK ){
    /* The following call detects whether or not a new snapshot has been 
    ** synced into the database file. If so, it updates the contents of
    ** the pDb->treehdr.log structure to reclaim any space in the log
    ** file that is no longer required. 
    **
    ** TODO: Calling this every transaction is overkill. And since the 
    ** call has to read and checksum a snapshot from the database file,
    ** it is expensive. It would be better to figure out a way so that
    ** this is only called occasionally - say for every 32KB written to 
    ** the log file.
    */
    rc = logReclaimSpace(pDb);
  }
  if( rc!=LSM_OK ){
    lsmLogClose(pDb);
    return rc;
  }

  /* Set the effective sector-size for this transaction. Sectors are assumed
  ** to be one byte in size if the safety-mode is OFF or NORMAL, or as
  ** reported by lsmFsSectorSize if it is FULL.  */
  if( pDb->eSafety==LSM_SAFETY_FULL ){
    pNew->szSector = lsmFsSectorSize(pDb->pFS);
    assert( pNew->szSector>0 );
  }else{
    pNew->szSector = 1;
  }

  /* There are now three scenarios:
  **
  **   1) Regions 0 and 1 are both zero bytes in size and region 2 begins
  **      at a file offset greater than LSM_MIN_LOGWRAP. In this case, wrap
  **      around to the start and write data into the start of the log file. 
  **
  **   2) Region 1 is zero bytes in size and region 2 occurs earlier in the 
  **      file than region 0. In this case, append data to region 2, but
  **      remember to jump over region 1 if required.
  **
  **   3) Region 2 is the last in the file. Append to it.
  */
  aReg = &pDb->treehdr.log.aRegion[0];

  assert( aReg[0].iEnd==0 || aReg[0].iEnd>aReg[0].iStart );
  assert( aReg[1].iEnd==0 || aReg[1].iEnd>aReg[1].iStart );

  pNew->cksum0 = pDb->treehdr.log.cksum0;
  pNew->cksum1 = pDb->treehdr.log.cksum1;

  if( aReg[0].iEnd==0 && aReg[1].iEnd==0 && aReg[2].iStart>=LSM_MIN_LOGWRAP ){
    /* Case 1. Wrap around to the start of the file. Write an LSM_LOG_JUMP 
    ** into the log file in this case. Pad it out to 8 bytes using a PAD2
    ** record so that the checksums can be updated immediately.  */
    u8 aJump[] = { 
      LSM_LOG_PAD2, 0x04, 0x00, 0x00, 0x00, 0x00, LSM_LOG_JUMP, 0x00 
    };

    lsmStringBinAppend(&pNew->buf, aJump, sizeof(aJump));
    logUpdateCksum(pNew, pNew->buf.n);
    rc = lsmFsWriteLog(pDb->pFS, aReg[2].iEnd, &pNew->buf);
    pNew->iCksumBuf = pNew->buf.n = 0;

    aReg[2].iEnd += 8;
    pNew->jump = aReg[0] = aReg[2];
    aReg[2].iStart = aReg[2].iEnd = 0;
  }else if( aReg[1].iEnd==0 && aReg[2].iEnd<aReg[0].iEnd ){
    /* Case 2. */
    pNew->iOff = aReg[2].iEnd;
    pNew->jump = aReg[0];
  }else{
    /* Case 3. */
    assert( aReg[2].iStart>=aReg[0].iEnd && aReg[2].iStart>=aReg[1].iEnd );
    pNew->iOff = aReg[2].iEnd;
  }

  if( pNew->jump.iStart ){
    i64 iRound;
    assert( pNew->jump.iStart>pNew->iOff );

    iRound = firstByteOnSector(pNew, pNew->jump.iStart);
    if( iRound>pNew->iOff ) pNew->jump.iStart = iRound;
    pNew->jump.iEnd = lastByteOnSector(pNew, pNew->jump.iEnd);
  }

  assert( pDb->pLogWriter==pNew );
  return rc;
}

/*
** This function is called when a write-transaction is being closed.
** Parameter bCommit is true if the transaction is being committed,
** or false otherwise. The caller must hold the client-mutex to call
** this function.
**
** A call to this function deletes the LogWriter object allocated by
** lsmLogBegin(). If the transaction is being committed, the shared state
** in *pLog is updated before returning.
*/
void lsmLogEnd(lsm_db *pDb, int bCommit){
  DbLog *pLog;
  LogWriter *p;
  p = pDb->pLogWriter;

  if( p==0 ) return;
  pLog = &pDb->treehdr.log;

  if( bCommit ){
    pLog->aRegion[2].iEnd = p->iOff;
    pLog->cksum0 = p->cksum0;
    pLog->cksum1 = p->cksum1;
    if( p->iRegion1End ){
      /* This happens when the transaction had to jump over some other
      ** part of the log.  */
      assert( pLog->aRegion[1].iEnd==0 );
      assert( pLog->aRegion[2].iStart<p->iRegion1End );
      pLog->aRegion[1].iStart = pLog->aRegion[2].iStart;
      pLog->aRegion[1].iEnd = p->iRegion1End;
      pLog->aRegion[2].iStart = p->iRegion2Start;
    }
  }
}

static int jumpIfRequired(
  lsm_db *pDb,
  LogWriter *pLog,
  int nReq,
  int *pbJump
){
  /* Determine if it is necessary to add an LSM_LOG_JUMP to jump over the
  ** jump region before writing the LSM_LOG_WRITE or DELETE record. This
  ** is necessary if there is insufficient room between the current offset
  ** and the jump region to fit the new WRITE/DELETE record and the largest
  ** possible JUMP record with up to 7 bytes of padding (a total of 17 
  ** bytes).  */
  if( (pLog->jump.iStart > (pLog->iOff + pLog->buf.n))
   && (pLog->jump.iStart < (pLog->iOff + pLog->buf.n + (nReq + 17))) 
  ){
    int rc;                       /* Return code */
    i64 iJump;                    /* Offset to jump to */
    u8 aJump[10];                 /* Encoded jump record */
    int nJump;                    /* Valid bytes in aJump[] */
    int nPad;                     /* Bytes of padding required */

    /* Serialize the JUMP record */
    iJump = pLog->jump.iEnd+1;
    aJump[0] = LSM_LOG_JUMP;
    nJump = 1 + lsmVarintPut64(&aJump[1], iJump);

    /* Adding padding to the contents of the buffer so that it will be a 
    ** multiple of 8 bytes in size after the JUMP record is appended. This
    ** is not strictly required, it just makes the keeping the running 
    ** checksum up to date in this file a little simpler.  */
    nPad = (pLog->buf.n + nJump) % 8;
    if( nPad ){
      u8 aPad[7] = {0,0,0,0,0,0,0};
      nPad = 8-nPad;
      if( nPad==1 ){
        aPad[0] = LSM_LOG_PAD1;
      }else{
        aPad[0] = LSM_LOG_PAD2;
        aPad[1] = (u8)(nPad-2);
      }
      rc = lsmStringBinAppend(&pLog->buf, aPad, nPad);
      if( rc!=LSM_OK ) return rc;
    }

    /* Append the JUMP record to the buffer. Then flush the buffer to disk
    ** and update the checksums. The next write to the log file (assuming
    ** there is no transaction rollback) will be to offset iJump (just past
    ** the jump region).  */
    rc = lsmStringBinAppend(&pLog->buf, aJump, nJump);
    if( rc!=LSM_OK ) return rc;
    assert( (pLog->buf.n % 8)==0 );
    rc = lsmFsWriteLog(pDb->pFS, pLog->iOff, &pLog->buf);
    if( rc!=LSM_OK ) return rc;
    logUpdateCksum(pLog, pLog->buf.n);
    pLog->iRegion1End = (pLog->iOff + pLog->buf.n);
    pLog->iRegion2Start = iJump;
    pLog->iOff = iJump;
    pLog->iCksumBuf = pLog->buf.n = 0;
    if( pbJump ) *pbJump = 1;
  }

  return LSM_OK;
}

static int logCksumAndFlush(lsm_db *pDb){
  int rc;                         /* Return code */
  LogWriter *pLog = pDb->pLogWriter;

  /* Calculate the checksum value. Append it to the buffer. */
  logUpdateCksum(pLog, pLog->buf.n);
  lsmPutU32((u8 *)&pLog->buf.z[pLog->buf.n], pLog->cksum0);
  pLog->buf.n += 4;
  lsmPutU32((u8 *)&pLog->buf.z[pLog->buf.n], pLog->cksum1);
  pLog->buf.n += 4;

  /* Write the contents of the buffer to disk. */
  rc = lsmFsWriteLog(pDb->pFS, pLog->iOff, &pLog->buf);
  pLog->iOff += pLog->buf.n;
  pLog->iCksumBuf = pLog->buf.n = 0;

  return rc;
}

/*
** Write the contents of the log-buffer to disk. Then write either a CKSUM
** or COMMIT record, depending on the value of parameter eType.
*/
static int logFlush(lsm_db *pDb, int eType){
  int rc;
  int nReq;
  LogWriter *pLog = pDb->pLogWriter;
  
  assert( eType==LSM_LOG_COMMIT );
  assert( pLog );

  /* Commit record is always 9 bytes in size. */
  nReq = 9;
  if( eType==LSM_LOG_COMMIT && pLog->szSector>1 ) nReq += pLog->szSector + 17;
  rc = jumpIfRequired(pDb, pLog, nReq, 0);

  /* If this is a COMMIT, add padding to the log so that the COMMIT record
  ** is aligned against the end of a disk sector. In other words, add padding
  ** so that the first byte following the COMMIT record lies on a different
  ** sector.  */
  if( eType==LSM_LOG_COMMIT && pLog->szSector>1 ){
    int nPad;                     /* Bytes of padding to add */

    /* Determine the value of nPad. */
    nPad = ((pLog->iOff + pLog->buf.n + 9) % pLog->szSector);
    if( nPad ) nPad = pLog->szSector - nPad;
    rc = lsmStringExtend(&pLog->buf, nPad);
    if( rc!=LSM_OK ) return rc;

    while( nPad ){
      if( nPad==1 ){
        pLog->buf.z[pLog->buf.n++] = LSM_LOG_PAD1;
        nPad = 0;
      }else{
        int n = LSM_MIN(200, nPad-2);
        pLog->buf.z[pLog->buf.n++] = LSM_LOG_PAD2;
        pLog->buf.z[pLog->buf.n++] = (char)n;
        nPad -= 2;
        memset(&pLog->buf.z[pLog->buf.n], 0x2B, n);
        pLog->buf.n += n;
        nPad -= n;
      }
    }
  }

  /* Make sure there is room in the log-buffer to add the CKSUM or COMMIT
  ** record. Then add the first byte of it.  */
  rc = lsmStringExtend(&pLog->buf, 9);
  if( rc!=LSM_OK ) return rc;
  pLog->buf.z[pLog->buf.n++] = (char)eType;
  memset(&pLog->buf.z[pLog->buf.n], 0, 8);

  rc = logCksumAndFlush(pDb);

  /* If this is a commit and synchronous=full, sync the log to disk. */
  if( rc==LSM_OK && eType==LSM_LOG_COMMIT && pDb->eSafety==LSM_SAFETY_FULL ){
    rc = lsmFsSyncLog(pDb->pFS);
  }
  return rc;
}

/*
** Append an LSM_LOG_WRITE (if nVal>=0) or LSM_LOG_DELETE (if nVal<0) 
** record to the database log.
*/
int lsmLogWrite(
  lsm_db *pDb,                    /* Database handle */
  int eType,
  void *pKey, int nKey,           /* Database key to write to log */
  void *pVal, int nVal            /* Database value (or nVal<0) to write */
){
  int rc = LSM_OK;
  LogWriter *pLog;                /* Log object to write to */
  int nReq;                       /* Bytes of space required in log */
  int bCksum = 0;                 /* True to embed a checksum in this record */

  assert( eType==LSM_WRITE || eType==LSM_DELETE || eType==LSM_DRANGE );
  assert( LSM_LOG_WRITE==LSM_WRITE );
  assert( LSM_LOG_DELETE==LSM_DELETE );
  assert( LSM_LOG_DRANGE==LSM_DRANGE );
  assert( (eType==LSM_LOG_DELETE)==(nVal<0) );

  if( pDb->bUseLog==0 ) return LSM_OK;
  pLog = pDb->pLogWriter;

  /* Determine how many bytes of space are required, assuming that a checksum
  ** will be embedded in this record (even though it may not be).  */
  nReq = 1 + lsmVarintLen32(nKey) + 8 + nKey;
  if( eType!=LSM_LOG_DELETE ) nReq += lsmVarintLen32(nVal) + nVal;

  /* Jump over the jump region if required. Set bCksum to true to tell the
  ** code below to include a checksum in the record if either (a) writing
  ** this record would mean that more than LSM_CKSUM_MAXDATA bytes of data
  ** have been written to the log since the last checksum, or (b) the jump
  ** is taken.  */
  rc = jumpIfRequired(pDb, pLog, nReq, &bCksum);
  if( (pLog->buf.n+nReq) > LSM_CKSUM_MAXDATA ) bCksum = 1;

  if( rc==LSM_OK ){
    rc = lsmStringExtend(&pLog->buf, nReq);
  }
  if( rc==LSM_OK ){
    u8 *a = (u8 *)&pLog->buf.z[pLog->buf.n];
    
    /* Write the record header - the type byte followed by either 1 (for
    ** DELETE) or 2 (for WRITE) varints.  */
    assert( LSM_LOG_WRITE_CKSUM == (LSM_LOG_WRITE | 0x0001) );
    assert( LSM_LOG_DELETE_CKSUM == (LSM_LOG_DELETE | 0x0001) );
    assert( LSM_LOG_DRANGE_CKSUM == (LSM_LOG_DRANGE | 0x0001) );
    *(a++) = (u8)eType | (u8)bCksum;
    a += lsmVarintPut32(a, nKey);
    if( eType!=LSM_LOG_DELETE ) a += lsmVarintPut32(a, nVal);

    if( bCksum ){
      pLog->buf.n = (a - (u8 *)pLog->buf.z);
      rc = logCksumAndFlush(pDb);
      a = (u8 *)&pLog->buf.z[pLog->buf.n];
    }

    memcpy(a, pKey, nKey);
    a += nKey;
    if( eType!=LSM_LOG_DELETE ){
      memcpy(a, pVal, nVal);
      a += nVal;
    }
    pLog->buf.n = a - (u8 *)pLog->buf.z;
    assert( pLog->buf.n<=pLog->buf.nAlloc );
  }

  return rc;
}

/*
** Append an LSM_LOG_COMMIT record to the database log.
*/
int lsmLogCommit(lsm_db *pDb){
  if( pDb->bUseLog==0 ) return LSM_OK;
  return logFlush(pDb, LSM_LOG_COMMIT);
}

/*
** Store the current offset and other checksum related information in the
** structure *pMark. Later, *pMark can be passed to lsmLogSeek() to "rewind"
** the LogWriter object to the current log file offset. This is used when
** rolling back savepoint transactions.
*/
void lsmLogTell(
  lsm_db *pDb,                    /* Database handle */
  LogMark *pMark                  /* Populate this object with current offset */
){
  LogWriter *pLog;
  int nCksum;

  if( pDb->bUseLog==0 ) return;
  pLog = pDb->pLogWriter;
  nCksum = pLog->buf.n & 0xFFFFFFF8;
  logUpdateCksum(pLog, nCksum);
  assert( pLog->iCksumBuf==nCksum );
  pMark->nBuf = pLog->buf.n - nCksum;
  memcpy(pMark->aBuf, &pLog->buf.z[nCksum], pMark->nBuf);

  pMark->iOff = pLog->iOff + pLog->buf.n;
  pMark->cksum0 = pLog->cksum0;
  pMark->cksum1 = pLog->cksum1;
}

/*
** Seek (rewind) back to the log file offset stored by an ealier call to
** lsmLogTell() in *pMark.
*/
void lsmLogSeek(
  lsm_db *pDb,                    /* Database handle */
  LogMark *pMark                  /* Object containing log offset to seek to */
){
  LogWriter *pLog;

  if( pDb->bUseLog==0 ) return;
  pLog = pDb->pLogWriter;

  assert( pMark->iOff<=pLog->iOff+pLog->buf.n );
  if( (pMark->iOff & 0xFFFFFFF8)>=pLog->iOff ){
    pLog->buf.n = (int)(pMark->iOff - pLog->iOff);
    pLog->iCksumBuf = (pLog->buf.n & 0xFFFFFFF8);
  }else{
    pLog->buf.n = pMark->nBuf;
    memcpy(pLog->buf.z, pMark->aBuf, pMark->nBuf);
    pLog->iCksumBuf = 0;
    pLog->iOff = pMark->iOff - pMark->nBuf;
  }
  pLog->cksum0 = pMark->cksum0;
  pLog->cksum1 = pMark->cksum1;

  if( pMark->iOff > pLog->iRegion1End ) pLog->iRegion1End = 0;
  if( pMark->iOff > pLog->iRegion2Start ) pLog->iRegion2Start = 0;
}

/*
** This function does the work for an lsm_info(LOG_STRUCTURE) request.
*/
int lsmInfoLogStructure(lsm_db *pDb, char **pzVal){
  int rc = LSM_OK;
  char *zVal = 0;

  /* If there is no read or write transaction open, read the latest 
  ** tree-header from shared-memory to report on. If necessary, update
  ** it based on the contents of the database header.  
  **
  ** No locks are taken here - these are passive read operations only.
  */
  if( pDb->pCsr==0 && pDb->nTransOpen==0 ){
    rc = lsmTreeLoadHeader(pDb, 0);
    if( rc==LSM_OK ) rc = logReclaimSpace(pDb);
  }

  if( rc==LSM_OK ){
    DbLog *pLog = &pDb->treehdr.log;
    zVal = lsmMallocPrintf(pDb->pEnv, 
        "%d %d %d %d %d %d", 
        (int)pLog->aRegion[0].iStart, (int)pLog->aRegion[0].iEnd,
        (int)pLog->aRegion[1].iStart, (int)pLog->aRegion[1].iEnd,
        (int)pLog->aRegion[2].iStart, (int)pLog->aRegion[2].iEnd
    );
    if( !zVal ) rc = LSM_NOMEM_BKPT;
  }

  *pzVal = zVal;
  return rc;
}

/*************************************************************************
** Begin code for log recovery.
*/

typedef struct LogReader LogReader;
struct LogReader {
  FileSystem *pFS;                /* File system to read from */
  i64 iOff;                       /* File offset at end of buf content */
  int iBuf;                       /* Current read offset in buf */
  LsmString buf;                  /* Buffer containing file content */

  int iCksumBuf;                  /* Offset in buf corresponding to cksum[01] */
  u32 cksum0;                     /* Checksum 0 at offset iCksumBuf */
  u32 cksum1;                     /* Checksum 1 at offset iCksumBuf */
};

static void logReaderBlob(
  LogReader *p,                   /* Log reader object */
  LsmString *pBuf,                /* Dynamic storage, if required */
  int nBlob,                      /* Number of bytes to read */
  u8 **ppBlob,                    /* OUT: Pointer to blob read */
  int *pRc                        /* IN/OUT: Error code */
){
  static const int LOG_READ_SIZE = 512;
  int rc = *pRc;                  /* Return code */
  int nReq = nBlob;               /* Bytes required */

  while( rc==LSM_OK && nReq>0 ){
    int nAvail;                   /* Bytes of data available in p->buf */
    if( p->buf.n==p->iBuf ){
      int nCksum;                 /* Total bytes requiring checksum */
      int nCarry = 0;             /* Total bytes requiring checksum */

      nCksum = p->iBuf - p->iCksumBuf;
      if( nCksum>0 ){
        nCarry = nCksum % 8;
        nCksum = ((nCksum / 8) * 8);
        if( nCksum>0 ){
          logCksumUnaligned(
              &p->buf.z[p->iCksumBuf], nCksum, &p->cksum0, &p->cksum1
          );
        }
      }
      if( nCarry>0 ) memcpy(p->buf.z, &p->buf.z[p->iBuf-nCarry], nCarry);
      p->buf.n = nCarry;
      p->iBuf = nCarry;

      rc = lsmFsReadLog(p->pFS, p->iOff, LOG_READ_SIZE, &p->buf);
      if( rc!=LSM_OK ) break;
      p->iCksumBuf = 0;
      p->iOff += LOG_READ_SIZE;
    }

    nAvail = p->buf.n - p->iBuf;
    if( ppBlob && nReq==nBlob && nBlob<=nAvail ){
      *ppBlob = (u8 *)&p->buf.z[p->iBuf];
      p->iBuf += nBlob;
      nReq = 0;
    }else{
      int nCopy = LSM_MIN(nAvail, nReq);
      if( nBlob==nReq ){
        pBuf->n = 0;
      }
      rc = lsmStringBinAppend(pBuf, (u8 *)&p->buf.z[p->iBuf], nCopy);
      nReq -= nCopy;
      p->iBuf += nCopy;
      if( nReq==0 && ppBlob ){
        *ppBlob = (u8*)pBuf->z;
      }
    }
  }

  *pRc = rc;
}

static void logReaderVarint(
  LogReader *p, 
  LsmString *pBuf,
  int *piVal,                     /* OUT: Value read from log */
  int *pRc                        /* IN/OUT: Error code */
){
  if( *pRc==LSM_OK ){
    u8 *aVarint;
    if( p->buf.n==p->iBuf ){
      logReaderBlob(p, 0, 10, &aVarint, pRc);
      if( LSM_OK==*pRc ) p->iBuf -= (10 - lsmVarintGet32(aVarint, piVal));
    }else{
      logReaderBlob(p, pBuf, lsmVarintSize(p->buf.z[p->iBuf]), &aVarint, pRc);
      if( LSM_OK==*pRc ) lsmVarintGet32(aVarint, piVal);
    }
  }
}

static void logReaderByte(LogReader *p, u8 *pByte, int *pRc){
  u8 *pPtr = 0;
  logReaderBlob(p, 0, 1, &pPtr, pRc);
  if( pPtr ) *pByte = *pPtr;
}

static void logReaderCksum(LogReader *p, LsmString *pBuf, int *pbEof, int *pRc){
  if( *pRc==LSM_OK ){
    u8 *pPtr = 0;
    u32 cksum0, cksum1;
    int nCksum = p->iBuf - p->iCksumBuf;

    /* Update in-memory (expected) checksums */
    assert( nCksum>=0 );
    logCksumUnaligned(&p->buf.z[p->iCksumBuf], nCksum, &p->cksum0, &p->cksum1);
    p->iCksumBuf = p->iBuf + 8;
    logReaderBlob(p, pBuf, 8, &pPtr, pRc);
    assert( pPtr || *pRc );

    /* Read the checksums from the log file. Set *pbEof if they do not match. */
    if( pPtr ){
      cksum0 = lsmGetU32(pPtr);
      cksum1 = lsmGetU32(&pPtr[4]);
      *pbEof = (cksum0!=p->cksum0 || cksum1!=p->cksum1);
      p->iCksumBuf = p->iBuf;
    }
  }
}

static void logReaderInit(
  lsm_db *pDb,                    /* Database handle */
  DbLog *pLog,                    /* Log object associated with pDb */
  int bInitBuf,                   /* True if p->buf is uninitialized */
  LogReader *p                    /* Initialize this LogReader object */
){
  p->pFS = pDb->pFS;
  p->iOff = pLog->aRegion[2].iStart;
  p->cksum0 = pLog->cksum0;
  p->cksum1 = pLog->cksum1;
  if( bInitBuf ){ lsmStringInit(&p->buf, pDb->pEnv); }
  p->buf.n = 0;
  p->iCksumBuf = 0;
  p->iBuf = 0;
}

/*
** This function is called after reading the header of a LOG_DELETE or
** LOG_WRITE record. Parameter nByte is the total size of the key and
** value that follow the header just read. Return true if the size and
** position of the record indicate that it should contain a checksum.
*/
static int logRequireCksum(LogReader *p, int nByte){
  return ((p->iBuf + nByte - p->iCksumBuf) > LSM_CKSUM_MAXDATA);
}

/*
** Recover the contents of the log file.
*/
int lsmLogRecover(lsm_db *pDb){
  LsmString buf1;                 /* Key buffer */
  LsmString buf2;                 /* Value buffer */
  LogReader reader;               /* Log reader object */
  int rc = LSM_OK;                /* Return code */
  int nCommit = 0;                /* Number of transactions to recover */
  int iPass;
  int nJump = 0;                  /* Number of LSM_LOG_JUMP records in pass 0 */
  DbLog *pLog;
  int bOpen;

  rc = lsmFsOpenLog(pDb, &bOpen);
  if( rc!=LSM_OK ) return rc;

  rc = lsmTreeInit(pDb);
  if( rc!=LSM_OK ) return rc;

  pLog = &pDb->treehdr.log;
  lsmCheckpointLogoffset(pDb->pShmhdr->aSnap2, pLog);

  logReaderInit(pDb, pLog, 1, &reader);
  lsmStringInit(&buf1, pDb->pEnv);
  lsmStringInit(&buf2, pDb->pEnv);

  /* The outer for() loop runs at most twice. The first iteration is to 
  ** count the number of committed transactions in the log. The second 
  ** iterates through those transactions and updates the in-memory tree 
  ** structure with their contents.  */
  if( bOpen ){
    for(iPass=0; iPass<2 && rc==LSM_OK; iPass++){
      int bEof = 0;

      while( rc==LSM_OK && !bEof ){
        u8 eType = 0;
        logReaderByte(&reader, &eType, &rc);

        switch( eType ){
          case LSM_LOG_PAD1:
            break;

          case LSM_LOG_PAD2: {
            int nPad;
            logReaderVarint(&reader, &buf1, &nPad, &rc);
            logReaderBlob(&reader, &buf1, nPad, 0, &rc);
            break;
          }

          case LSM_LOG_DRANGE:
          case LSM_LOG_DRANGE_CKSUM:
          case LSM_LOG_WRITE:
          case LSM_LOG_WRITE_CKSUM: {
            int nKey;
            int nVal;
            u8 *aVal;
            logReaderVarint(&reader, &buf1, &nKey, &rc);
            logReaderVarint(&reader, &buf2, &nVal, &rc);

            if( eType==LSM_LOG_WRITE_CKSUM || eType==LSM_LOG_DRANGE_CKSUM ){
              logReaderCksum(&reader, &buf1, &bEof, &rc);
            }else{
              bEof = logRequireCksum(&reader, nKey+nVal);
            }
            if( bEof ) break;

            logReaderBlob(&reader, &buf1, nKey, 0, &rc);
            logReaderBlob(&reader, &buf2, nVal, &aVal, &rc);
            if( iPass==1 && rc==LSM_OK ){ 
              if( eType==LSM_LOG_WRITE || eType==LSM_LOG_WRITE_CKSUM ){
                rc = lsmTreeInsert(pDb, (u8 *)buf1.z, nKey, aVal, nVal);
              }else{
                rc = lsmTreeDelete(pDb, (u8 *)buf1.z, nKey, aVal, nVal);
              }
            }
            break;
          }

          case LSM_LOG_DELETE:
          case LSM_LOG_DELETE_CKSUM: {
            int nKey; u8 *aKey;
            logReaderVarint(&reader, &buf1, &nKey, &rc);

            if( eType==LSM_LOG_DELETE_CKSUM ){
              logReaderCksum(&reader, &buf1, &bEof, &rc);
            }else{
              bEof = logRequireCksum(&reader, nKey);
            }
            if( bEof ) break;

            logReaderBlob(&reader, &buf1, nKey, &aKey, &rc);
            if( iPass==1 && rc==LSM_OK ){ 
              rc = lsmTreeInsert(pDb, aKey, nKey, NULL, -1);
            }
            break;
          }

          case LSM_LOG_COMMIT:
            logReaderCksum(&reader, &buf1, &bEof, &rc);
            if( bEof==0 ){
              nCommit++;
              assert( nCommit>0 || iPass==1 );
              if( nCommit==0 ) bEof = 1;
            }
            break;

          case LSM_LOG_JUMP: {
            int iOff = 0;
            logReaderVarint(&reader, &buf1, &iOff, &rc);
            if( rc==LSM_OK ){
              if( iPass==1 ){
                if( pLog->aRegion[2].iStart==0 ){
                  assert( pLog->aRegion[1].iStart==0 );
                  pLog->aRegion[1].iEnd = reader.iOff;
                }else{
                  assert( pLog->aRegion[0].iStart==0 );
                  pLog->aRegion[0].iStart = pLog->aRegion[2].iStart;
                  pLog->aRegion[0].iEnd = reader.iOff-reader.buf.n+reader.iBuf;
                }
                pLog->aRegion[2].iStart = iOff;
              }else{
                if( (nJump++)==2 ){
                  bEof = 1;
                }
              }

              reader.iOff = iOff;
              reader.buf.n = reader.iBuf;
            }
            break;
          }

          default:
            /* Including LSM_LOG_EOF */
            bEof = 1;
            break;
        }
      }

      if( rc==LSM_OK && iPass==0 ){
        if( nCommit==0 ){
          if( pLog->aRegion[2].iStart==0 ){
            iPass = 1;
          }else{
            pLog->aRegion[2].iStart = 0;
            iPass = -1;
            lsmCheckpointZeroLogoffset(pDb);
          }
        }
        logReaderInit(pDb, pLog, 0, &reader);
        nCommit = nCommit * -1;
      }
    }
  }

  /* Initialize DbLog object */
  if( rc==LSM_OK ){
    pLog->aRegion[2].iEnd = reader.iOff - reader.buf.n + reader.iBuf;
    pLog->cksum0 = reader.cksum0;
    pLog->cksum1 = reader.cksum1;
  }

  if( rc==LSM_OK ){
    rc = lsmFinishRecovery(pDb);
  }else{
    lsmFinishRecovery(pDb);
  }

  if( pDb->bRoTrans ){
    lsmFsCloseLog(pDb);
  }

  lsmStringClear(&buf1);
  lsmStringClear(&buf2);
  lsmStringClear(&reader.buf);
  return rc;
}

void lsmLogClose(lsm_db *db){
  if( db->pLogWriter ){
    lsmFree(db->pEnv, db->pLogWriter->buf.z);
    lsmFree(db->pEnv, db->pLogWriter);
    db->pLogWriter = 0;
  }
}