hydra-queue-runner: Improve dispatcher

[?]
Jun 17, 2015, 11:52 PM
NNOCZ4ROWC64ZKSAE2MPHZ3LGLI34C5TJJFW4MHXN6OELK6VMWOQC

Dependencies

  • [2] OCZ4LSGG Automatically retry aborted builds
  • [3] 5AIYUMTB Basic remote building
  • [4] ENXUSMSV Make concurrency more robust
  • [5] N5O7VEEO Immediately abort builds that require an unsupported system type
  • [6] HLSHCK3C Support requiredSystemFeatures
  • [7] NJJ7H64S Very basic multi-threaded queue runner
  • [8] GKZN4UV7 Make the queue monitor more robust, and better debug output
  • [9] 24BMQDZA Start of single-process hydra-queue-runner
  • [10] T2EIYJNG On SIGINT, shut down the builder threads
  • [11] WHULPA6S Handle failure with output
  • [12] RQUAATWB Add status dump facility

Change contents

  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 9
    [2.1077]
    [3.286]
    #include <algorithm>
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 163
    [3.6162][3.6162:6293](),[3.6293][3.0:6]()
    Sync<unsigned int> currentJobs;
    Machine()
    {
    auto currentJobs_(currentJobs.lock());
    *currentJobs_ = 0;
    }
    [3.6162]
    [3.6]
    std::atomic<unsigned int> currentJobs{0};
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 185
    [3.6549][3.6549:6632]()
    auto currentJobs_(machine->currentJobs.lock());
    (*currentJobs_)++;
    [3.6549]
    [3.6632]
    machine->currentJobs++;
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 189
    [3.6670][3.6670:6776]()
    auto currentJobs_(machine->currentJobs.lock());
    if (*currentJobs_ > 0) (*currentJobs_)--;
    [3.6670]
    [3.6776]
    machine->currentJobs--;
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 280
    [3.7328][3.7504:7562]()
    MachineReservation::ptr findMachine(Step::ptr step);
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 872
    [2.1915][3.2622:2632](),[3.2622][3.2622:2632](),[3.2632][3.6794:6839](),[3.6839][3.1651:1734]()
    {
    auto runnable_(runnable.lock());
    printMsg(lvlDebug, format("%1% runnable builds") % runnable_->size());
    [2.1915]
    [3.10471]
    bool keepGoing;
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 874
    [3.10472][3.10472:10589]()
    /* FIXME: we're holding the runnable lock too long
    here. This could be more efficient. */
    [3.10472]
    [3.10589]
    do {
    /* Bail out when there are no slots left. */
    std::vector<Machine::ptr> machinesSorted;
    {
    auto machines_(machines.lock());
    machinesSorted.insert(machinesSorted.end(),
    machines_->begin(), machines_->end());
    }
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 883
    [3.10590][2.1916:1980]()
    system_time now = std::chrono::system_clock::now();
    [3.10590]
    [2.1980]
    /* Sort the machines by a combination of speed factor and
    available slots. Prioritise the available machines as
    follows:
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 887
    [2.1981][3.10590:10702](),[3.10590][3.10590:10702]()
    for (auto i = runnable_->begin(); i != runnable_->end(); ) {
    auto step = i->lock();
    [2.1981]
    [3.10702]
    - First by load divided by speed factor, rounded to the
    nearest integer. This causes fast machines to be
    preferred over slow machines with similar loads.
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 891
    [3.10703][3.10703:10848](),[3.10848][2.1982:2000]()
    /* Delete dead steps. */
    if (!step) {
    i = runnable_->erase(i);
    continue;
    }
    [3.10703]
    [2.2000]
    - Then by speed factor.
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 893
    [2.2001][2.2001:2104]()
    /* Skip previously failed steps that aren't ready to
    be retried. */
    [2.2001]
    [2.2104]
    - Finally by load. */
    sort(machinesSorted.begin(), machinesSorted.end(),
    [](const Machine::ptr & a, const Machine::ptr & b) -> bool
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 897
    [2.2122][2.2122:2350]()
    auto step_(step->state.lock());
    if (step_->tries > 0 && step_->after > now) {
    if (step_->after < sleepUntil)
    sleepUntil = step_->after;
    [2.2122]
    [2.2350]
    float ta = roundf(a->currentJobs / a->speedFactor);
    float tb = roundf(b->currentJobs / b->speedFactor);
    return
    ta != tb ? ta > tb :
    a->speedFactor != b->speedFactor ? a->speedFactor > b->speedFactor :
    a->maxJobs > b->maxJobs;
    });
    /* Find a machine with a free slot and find a step to run
    on it. Once we find such a pair, we restart the outer
    loop because the machine sorting will have changed. */
    keepGoing = false;
    system_time now = std::chrono::system_clock::now();
    for (auto & machine : machinesSorted) {
    // FIXME: can we lose a wakeup if a builder exits concurrently?
    if (machine->currentJobs >= machine->maxJobs) continue;
    auto runnable_(runnable.lock());
    printMsg(lvlDebug, format("%1% runnable builds") % runnable_->size());
    /* FIXME: we're holding the runnable lock too long
    here. This could be more efficient. */
    for (auto i = runnable_->begin(); i != runnable_->end(); ) {
    auto step = i->lock();
    /* Delete dead steps. */
    if (!step) {
    i = runnable_->erase(i);
    continue;
    }
    /* Can this machine do this step? */
    if (!machine->supportsStep(step)) {
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 935
    [2.2435][3.10848:10866](),[3.10848][3.10848:10866]()
    }
    [2.2435]
    [3.10866]
    /* Skip previously failed steps that aren't ready
    to be retried. */
    {
    auto step_(step->state.lock());
    if (step_->tries > 0 && step_->after > now) {
    if (step_->after < sleepUntil)
    sleepUntil = step_->after;
    ++i;
    continue;
    }
    }
    /* Make a slot reservation and start a thread to
    do the build. */
    auto reservation = std::make_shared<MachineReservation>(machine);
    i = runnable_->erase(i);
    auto builderThread = std::thread(&State::builder, this, step, reservation);
    builderThread.detach(); // FIXME?
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 956
    [3.10867][3.10867:10957](),[3.10957][3.1735:1842](),[3.1842][3.11064:11119](),[3.11064][3.11064:11119]()
    auto reservation = findMachine(step);
    if (!reservation) {
    printMsg(lvlDebug, format("cannot execute step ‘%1%’ right now") % step->drvPath);
    ++i;
    continue;
    [3.10867]
    [3.11119]
    keepGoing = true;
    break;
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 959
    [3.11137][3.11137:11138](),[3.11305][3.11305:11346]()
    i = runnable_->erase(i);
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 960
    [3.11347][3.11347:11489]()
    auto builderThread = std::thread(&State::builder, this, step, reservation);
    builderThread.detach(); // FIXME?
    [3.11347]
    [3.11489]
    if (keepGoing) break;
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 962
    [3.7068][3.2842:2852](),[3.11503][3.2842:2852](),[3.2842][3.2842:2852]()
    }
    [3.11503]
    [3.2852]
    } while (keepGoing);
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 984
    [3.11939][3.3040:3044](),[3.3040][3.3040:3044](),[3.3044][3.11940:12079](),[3.12079][3.1283:1335](),[3.1335][3.12182:12391](),[3.12182][3.12182:12391](),[3.12559][3.12559:12574]()
    }
    MachineReservation::ptr State::findMachine(Step::ptr step)
    {
    auto machines_(machines.lock());
    for (auto & machine : *machines_) {
    if (!machine->supportsStep(step)) continue;
    {
    auto currentJobs_(machine->currentJobs.lock());
    if (*currentJobs_ >= machine->maxJobs) continue;
    }
    return std::make_shared<MachineReservation>(machine);
    }
    return 0;
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 1300
    [3.1109][3.1109:1163]()
    auto currentJobs_(m->currentJobs.lock());
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 1301
    [3.1232][3.1232:1292]()
    % m->sshName % *currentJobs_ % m->maxJobs);
    [3.1232]
    [3.1292]
    % m->sshName % m->currentJobs % m->maxJobs);