Use PostgreSQL notifications for queue events

[?]
Jun 11, 2015, 3:38 PM
ATJ54SPXPE2IIFRERUOBFF42HBSEADP4QOI743ZBUNBQX3PYKRXQC

Dependencies

  • [2] W5OAZWPD Drop the errorMsg column in the Jobs table
  • [3] RYTQLATY Keep track of failed paths in the Hydra database
  • [4] C6HOMHZW Don't try to handle SIGINT
  • [5] T2EIYJNG On SIGINT, shut down the builder threads
  • [6] YZAI5GQU Implement a database connection pool
  • [7] NJJ7H64S Very basic multi-threaded queue runner
  • [8] 5AIYUMTB Basic remote building
  • [9] 24BMQDZA Start of single-process hydra-queue-runner
  • [10] ENXUSMSV Make concurrency more robust
  • [*] 2GK5DOU7 * Downloading closures.
  • [*] UOINKJ2J Add an action to cancel all builds in a jobset eval
  • [*] N22GPKYT * Put info about logs / build products in the DB.
  • [*] S6OISBQ3 * Mark the "current" builds in a jobset, i.e. those corresponding to

Change contents

  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 158
    [5.6816][5.20:59](),[5.947][5.20:59](),[5.59][5.75:123](),[5.75][5.75:123](),[5.123][5.60:94]()
    /* CV for waking up the queue. */
    std::condition_variable queueMonitorWakeup;
    std::mutex queueMonitorMutex;
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 213
    [5.1058][5.222:281]()
    void getQueuedBuilds(std::shared_ptr<StoreAPI> store);
    [5.1058]
    [5.7042]
    void getQueuedBuilds(std::shared_ptr<StoreAPI> store, unsigned int & lastBuildId);
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 365
    [5.1435]
    [5.1435]
    auto conn(dbPool.get());
    /* Listener for a single notification channel: remembers that a
       notification was delivered so the caller can poll for it later
       with get(). */
    struct receiver : public pqxx::notification_receiver
    {
    /* True if operator() has run since the last call to get(). */
    bool status = false;
    /* Forwards the connection `c` and the channel name to the
       pqxx::notification_receiver base class. */
    receiver(pqxx::connection_base & c, const std::string & channel)
    : pqxx::notification_receiver(c, channel) { }
    /* Notification callback (presumably invoked by libpqxx when a
       notification arrives on the channel -- confirm against the pqxx
       documentation). The payload and pid arguments are ignored; only
       the fact that the callback ran is recorded. */
    void operator() (const string & payload, int pid) override
    {
    status = true;
    };
    /* Returns whether the callback has run since the previous call,
       and resets the flag to false. */
    bool get() {
    bool b = status;
    status = false;
    return b;
    }
    };
    receiver buildsAdded(*conn, "builds_added");
    receiver buildsRestarted(*conn, "builds_restarted");
    receiver buildsCancelled(*conn, "builds_cancelled");
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 388
    [5.1480]
    [5.1502]
    unsigned int lastBuildId = 0;
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 392
    [4.19][5.448:480](),[5.256][5.448:480]()
    getQueuedBuilds(store);
    [4.19]
    [5.1560]
    getQueuedBuilds(store, lastBuildId);
    /* Sleep until we get notification from the database about an
    event. */
    conn->await_notification();
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 398
    [5.1561][5.1561:1571](),[5.1571][5.257:395]()
    {
    std::unique_lock<std::mutex> lock(queueMonitorMutex);
    queueMonitorWakeup.wait_for(lock, std::chrono::seconds(5));
    [5.1561]
    [5.1739]
    if (buildsAdded.get())
    printMsg(lvlError, "got notification: new builds added to the queue");
    if (buildsRestarted.get()) {
    printMsg(lvlError, "got notification: builds restarted");
    lastBuildId = 0; // check all builds
    }
    if (buildsCancelled.get()) {
    printMsg(lvlError, "got notification: builds cancelled");
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 413
    [5.1807][5.481:542]()
    void State::getQueuedBuilds(std::shared_ptr<StoreAPI> store)
    [5.1807]
    [5.9878]
    void State::getQueuedBuilds(std::shared_ptr<StoreAPI> store, unsigned int & lastBuildId)
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 415
    [5.9880][5.1894:1943]()
    printMsg(lvlError, "checking the queue...");
    [5.9880]
    [5.543]
    printMsg(lvlError, format("checking the queue for builds > %1%...") % lastBuildId);
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 428
    [5.1925][5.1925:2009]()
    auto res = txn.exec("select * from Builds where finished = 0 order by id");
    [5.1925]
    [5.9906]
    auto res = txn.parameterized("select * from Builds where id > $1 and finished = 0 order by id")(lastBuildId).exec();
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 433
    [5.2137]
    [5.2137]
    if (id > lastBuildId) lastBuildId = id;
  • replacement in src/hydra-queue-runner/pool.hh at line 60
    [5.1998][5.1998:2039]()
    R * operator -> () { return r; }
    [5.1998]
    [5.2039]
    R * operator -> () { return r.get(); }
  • edit in src/lib/Hydra/Helper/Nix.pm at line 472
    [3.3072]
    [13.1608]
    $db->storage->dbh->do("notify builds_restarted");
  • edit in src/script/hydra-evaluator at line 249
    [2.799]
    [15.5850]
    $db->storage->dbh->do("notify builds_added");