Implement a database connection pool

[?]
May 29, 2015, 6:55 PM
YZAI5GQU3HNMK5MEGF2Y7WS445AN4YKD3HNJQVQP545ODN3F5DLAC

Dependencies

  • [2] 62MQPRXC Pass null values to libpqxx properly
  • [3] T2EIYJNG On SIGINT, shut down the builder threads
  • [4] ENXUSMSV Make concurrency more robust
  • [5] NJJ7H64S Very basic multi-threaded queue runner
  • [6] 24BMQDZA Start of single-process hydra-queue-runner

Change contents

  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 12
    [4.19]
    [4.19]
    #include "pool.hh"
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 149
    [4.947][3.41:75]()
    std::mutex queueMonitorMutex;
    [4.947]
    [3.75]
    /* CV for waking up the queue. */
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 152
    [3.123]
    [4.947]
    std::mutex queueMonitorMutex;
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 170
    [3.172]
    [4.1019]
    /* PostgreSQL connection pool. */
    Pool<Connection> dbPool;
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 179
    [4.6457][4.6457:6539]()
    void markActiveBuildStepsAsAborted(pqxx::connection & conn, time_t stopTime);
    [4.6457]
    [4.6539]
    void markActiveBuildStepsAsAborted(time_t stopTime);
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 191
    [4.1058][4.1058:1142]()
    void getQueuedBuilds(std::shared_ptr<StoreAPI> store, pqxx::connection & conn);
    [4.1058]
    [4.7042]
    void getQueuedBuilds(std::shared_ptr<StoreAPI> store);
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 222
    [4.7535][4.7535:7560]()
    Connection conn;
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 223
    [4.7622][4.7622:7676]()
    markActiveBuildStepsAsAborted(conn, time(0));
    [4.7622]
    [4.7676]
    markActiveBuildStepsAsAborted(time(0));
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 230
    [4.7733][4.7733:7817]()
    void State::markActiveBuildStepsAsAborted(pqxx::connection & conn, time_t stopTime)
    [4.7733]
    [4.7817]
    void State::markActiveBuildStepsAsAborted(time_t stopTime)
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 232
    [4.7819][4.7819:7845]()
    pqxx::work txn(conn);
    [4.7819]
    [2.0]
    auto conn(dbPool.get());
    pqxx::work txn(*conn);
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 280
    [4.1480][4.1480:1502]()
    Connection conn;
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 282
    [3.256][4.1522:1560](),[4.1522][4.1522:1560]()
    getQueuedBuilds(store, conn);
    [3.256]
    [4.1560]
    getQueuedBuilds(store);
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 294
    [4.1807][4.1807:1893]()
    void State::getQueuedBuilds(std::shared_ptr<StoreAPI> store, pqxx::connection & conn)
    [4.1807]
    [4.9878]
    void State::getQueuedBuilds(std::shared_ptr<StoreAPI> store)
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 297
    [4.1943]
    [4.1943]
    auto conn(dbPool.get());
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 317
    [4.1805][4.1805:1835]()
    pqxx::work txn(conn);
    [4.1805]
    [4.1835]
    pqxx::work txn(*conn);
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 349
    [4.10611][4.10611:10645]()
    pqxx::work txn(conn);
    [4.10582]
    [4.10645]
    pqxx::work txn(*conn);
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 369
    [4.11243][4.11243:11277]()
    pqxx::work txn(conn);
    [4.11214]
    [4.11277]
    pqxx::work txn(*conn);
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 627
    [4.15497][4.15497:15518]()
    Connection conn;
    [4.15497]
    [4.15518]
    auto conn(dbPool.get());
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 631
    [4.15572][4.15572:15602]()
    pqxx::work txn(conn);
    [4.15572]
    [4.15602]
    pqxx::work txn(*conn);
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 677
    [4.9007][4.16122:16152](),[4.16122][4.16122:16152]()
    pqxx::work txn(conn);
    [4.9007]
    [4.16152]
    pqxx::work txn(*conn);
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 766
    [4.3200][4.3200:3285]()
    {
    Connection conn;
    markActiveBuildStepsAsAborted(conn, 0);
    }
    [4.3200]
    [4.3285]
    markActiveBuildStepsAsAborted(0);
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 791
    [3.1272]
    [4.19072]
    printMsg(lvlError, format("psql connections = %1%") % dbPool.count());
  • file addition: pool.hh (----------)
    [4.187]
    #pragma once
    #include <memory>
    #include <list>
    #include "sync.hh"
    /* This template class implements a simple pool manager of resources
    of some type R, such as database connections. It is used as
    follows:
    class Connection { ... };
    Pool<Connection> pool;
    {
    auto conn(pool.get());
    conn->exec("select ...");
    }
    Here, the Connection object referenced by ‘conn’ is automatically
    returned to the pool when ‘conn’ goes out of scope.
    */
    template <class R>
    class Pool
    {
    private:
    struct State
    {
    unsigned int count = 0;
    std::list<std::shared_ptr<R>> idle;
    };
    Sync<State> state;
    public:
    class Handle
    {
    private:
    Pool & pool;
    std::shared_ptr<R> r;
    friend Pool;
    Handle(Pool & pool, std::shared_ptr<R> r) : pool(pool), r(r) { }
    public:
    Handle(Handle && h) : pool(h.pool), r(h.r) { h.r.reset(); }
    Handle(const Handle & l) = delete;
    ~Handle()
    {
    auto state_(pool.state.lock());
    if (r) state_->idle.push_back(r);
    }
    R * operator -> () { return r; }
    R & operator * () { return *r; }
    };
    Handle get()
    {
    {
    auto state_(state.lock());
    if (!state_->idle.empty()) {
    auto p = state_->idle.back();
    state_->idle.pop_back();
    return Handle(*this, p);
    }
    state_->count++;
    }
    /* Note: we don't hold the lock while creating a new instance,
    because creation might take a long time. */
    return Handle(*this, std::make_shared<R>());
    }
    unsigned int count()
    {
    auto state_(state.lock());
    return state_->count;
    }
    };