4I2HF4L3JOC6KPYLI2YTEVTHBRRYO5XKXOZ6VQ2SJKSQKAPNQXCQC
bool keepGoing;
/* Sleep until we're woken up (either because a runnable build
is added, or because a build finishes). */
{
std::unique_lock<std::mutex> lock(dispatcherMutex);
printMsg(lvlDebug, format("dispatcher sleeping for %1%s") %
std::chrono::duration_cast<std::chrono::seconds>(sleepUntil - std::chrono::system_clock::now()).count());
dispatcherWakeup.wait_until(lock, sleepUntil);
nrDispatcherWakeups++;
}
}
printMsg(lvlError, "dispatcher exits");
}
/* Copy the currentJobs field of each machine. This is
necessary to ensure that the sort comparator below is
an ordering. std::sort() can segfault if it isn't. Also
filter out temporarily disabled machines. */
/* One sortable entry per usable machine: the machine itself plus a
   private copy of its job count. */
struct MachineInfo
{
/* The machine this entry refers to. */
Machine::ptr machine;
/* Value of machine->state->currentJobs copied when the entry was
   built. The sort comparator reads this copy rather than the live
   counter, so the key it compares cannot change during the sort. */
unsigned int currentJobs;
};
std::vector<MachineInfo> machinesSorted;
{
auto machines_(machines.lock());
for (auto & m : *machines_) {
auto info(m.second->state->connectInfo.lock());
if (info->consecutiveFailures && info->disabledUntil > now) {
if (info->disabledUntil < sleepUntil)
sleepUntil = info->disabledUntil;
continue;
}
machinesSorted.push_back({m.second, m.second->state->currentJobs});
/* Copy the currentJobs field of each machine. This is
necessary to ensure that the sort comparator below is
an ordering. std::sort() can segfault if it isn't. Also
filter out temporarily disabled machines. */
/* One sortable entry per usable machine: the machine itself plus a
   private copy of its job count. */
struct MachineInfo
{
/* The machine this entry refers to. */
Machine::ptr machine;
/* Value of machine->state->currentJobs copied when the entry was
   built. The sort comparator reads this copy rather than the live
   counter, so the key it compares cannot change during the sort. */
unsigned int currentJobs;
};
std::vector<MachineInfo> machinesSorted;
{
auto machines_(machines.lock());
for (auto & m : *machines_) {
auto info(m.second->state->connectInfo.lock());
if (info->consecutiveFailures && info->disabledUntil > now) {
if (info->disabledUntil < sleepUntil)
sleepUntil = info->disabledUntil;
continue;
/* Sort the machines, prioritising them as follows:
- First by load divided by speed factor, rounded to the
nearest integer. This causes fast machines to be
preferred over slow machines with similar loads.
- Then by speed factor.
- Finally by load. */
sort(machinesSorted.begin(), machinesSorted.end(),
    [](const MachineInfo & lhs, const MachineInfo & rhs) -> bool
    {
        /* Primary key: snapshot job count divided by speed factor,
           rounded to the nearest integer; smaller sorts first. */
        const float lhsScore = roundf(lhs.currentJobs / lhs.machine->speedFactor);
        const float rhsScore = roundf(rhs.currentJobs / rhs.machine->speedFactor);
        if (lhsScore != rhsScore)
            return lhsScore < rhsScore;

        /* Secondary key: larger speed factor sorts first. */
        if (lhs.machine->speedFactor != rhs.machine->speedFactor)
            return lhs.machine->speedFactor > rhs.machine->speedFactor;

        /* Final tie-break: larger snapshot job count sorts first. */
        return lhs.currentJobs > rhs.currentJobs;
    });
- Finally by load. */
sort(machinesSorted.begin(), machinesSorted.end(),
    [](const MachineInfo & lhs, const MachineInfo & rhs) -> bool
    {
        /* Primary key: snapshot job count divided by speed factor,
           rounded to the nearest integer; smaller sorts first. */
        const float lhsScore = roundf(lhs.currentJobs / lhs.machine->speedFactor);
        const float rhsScore = roundf(rhs.currentJobs / rhs.machine->speedFactor);
        if (lhsScore != rhsScore)
            return lhsScore < rhsScore;

        /* Secondary key: larger speed factor sorts first. */
        if (lhs.machine->speedFactor != rhs.machine->speedFactor)
            return lhs.machine->speedFactor > rhs.machine->speedFactor;

        /* Final tie-break: larger snapshot job count sorts first. */
        return lhs.currentJobs > rhs.currentJobs;
    });
/* Find a machine with a free slot and find a step to run
on it. Once we find such a pair, we restart the outer
loop because the machine sorting will have changed. */
keepGoing = false;
/* Find a machine with a free slot and find a step to run
on it. Once we find such a pair, we restart the outer
loop because the machine sorting will have changed. */
keepGoing = false;
for (auto & mi : machinesSorted) {
// FIXME: can we lose a wakeup if a builder exits concurrently?
if (mi.machine->state->currentJobs >= mi.machine->maxJobs) continue;
for (auto & mi : machinesSorted) {
// FIXME: can we lose a wakeup if a builder exits concurrently?
if (mi.machine->state->currentJobs >= mi.machine->maxJobs) continue;
auto runnable_(runnable.lock());
//printMsg(lvlDebug, format("%1% runnable builds") % runnable_->size());
auto runnable_(runnable.lock());
//printMsg(lvlDebug, format("%1% runnable builds") % runnable_->size());
/* FIXME: we're holding the runnable lock too long
here. This could be more efficient. */
/* Can this machine do this step? */
if (!mi.machine->supportsStep(step)) {
/* Skip previously failed steps that aren't ready
to be retried. */
{
auto step_(step->state.lock());
if (step_->tries > 0 && step_->after > now) {
if (step_->after < sleepUntil)
sleepUntil = step_->after;
/* Skip previously failed steps that aren't ready
to be retried. */
{
auto step_(step->state.lock());
if (step_->tries > 0 && step_->after > now) {
if (step_->after < sleepUntil)
sleepUntil = step_->after;
++i;
continue;
}
}
/* Make a slot reservation and start a thread to
do the build. */
auto reservation = std::make_shared<MaintainCount>(mi.machine->state->currentJobs);
i = runnable_->erase(i);
auto builderThread = std::thread(&State::builder, this, step, mi.machine, reservation);
builderThread.detach(); // FIXME?
/* Make a slot reservation and start a thread to
do the build. */
auto reservation = std::make_shared<MaintainCount>(mi.machine->state->currentJobs);
i = runnable_->erase(i);
/* Sleep until we're woken up (either because a runnable build
is added, or because a build finishes). */
{
std::unique_lock<std::mutex> lock(dispatcherMutex);
printMsg(lvlDebug, format("dispatcher sleeping for %1%s") %
std::chrono::duration_cast<std::chrono::seconds>(sleepUntil - std::chrono::system_clock::now()).count());
dispatcherWakeup.wait_until(lock, sleepUntil);
nrDispatcherWakeups++;
}
}
} while (keepGoing);