#include <algorithm>
#include <cmath>
#include <thread>
#include <unordered_map>
#include "state.hh"
using namespace nix;
/* Move `step` into the runnable queue.

   The step must already be created, not yet finished, and have no
   remaining dependencies (all three are asserted). Its runnableSince
   timestamp is set to the current time, it is appended to the runnable
   list, and the dispatcher is woken up so it can schedule it. */
void State::makeRunnable(Step::ptr step)
{
    printMsg(lvlChatty, "step ‘%s’ is now runnable", localStore->printStorePath(step->drvPath));

    /* Sanity-check the step and stamp the moment it became runnable. */
    {
        auto stepState(step->state.lock());
        assert(stepState->created);
        assert(!step->finished);
        assert(stepState->deps.empty());
        stepState->runnableSince = std::chrono::system_clock::now();
    }

    /* The temporary lock guard is released at the end of this statement,
       i.e. before the dispatcher is notified. */
    runnable.lock()->push_back(step);

    wakeDispatcher();
}
void State::dispatcher()
{
while (true) {
try {
printMsg(lvlDebug, "dispatcher woken up");
nrDispatcherWakeups++;
auto now1 = std::chrono::steady_clock::now();
auto sleepUntil = doDispatch();
auto now2 = std::chrono::steady_clock::now();
dispatchTimeMs += std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count();
{
auto dispatcherWakeup_(dispatcherWakeup.lock());
if (!*dispatcherWakeup_) {
printMsg(lvlDebug, format("dispatcher sleeping for %1%s") %
std::chrono::duration_cast<std::chrono::seconds>(sleepUntil - std::chrono::system_clock::now()).count());
dispatcherWakeup_.wait_until(dispatcherWakeupCV, sleepUntil);
}
*dispatcherWakeup_ = false;
}
} catch (std::exception & e) {
printMsg(lvlError, format("dispatcher: %1%") % e.what());
sleep(1);
}
}
printMsg(lvlError, "dispatcher exits");
}
/* Run one scheduling round.

   First the scheduling windows of all jobsets are pruned. Then the loop
   repeatedly picks the best machine with a free slot and the best runnable
   step it supports, removes that step from the runnable list, and starts a
   detached builder thread holding a MachineReservation for the pair. After
   each assignment the round restarts, because the reservation changed the
   machine's currentJobs and hence the machine ordering. It stops once no
   more pairs can be made. Finally abortUnsupported() is run.

   Returns the earliest time at which the dispatcher should wake up on its
   own: the earliest re-enable time of a temporarily disabled machine or
   the earliest retry time of a backed-off step. Returns system_time::max()
   if there is none. */
system_time State::doDispatch()
{
    /* Prune each jobset's scheduling window; log when that changes the
       jobset's share usage. */
    {
        auto jobsets_(jobsets.lock());
        for (auto & jobset : *jobsets_) {
            auto s1 = jobset.second->shareUsed();
            jobset.second->pruneSteps();
            auto s2 = jobset.second->shareUsed();
            if (s1 != s2)
                printMsg(lvlDebug, format("pruned scheduling window of ‘%1%:%2%’ from %3% to %4%")
                    % jobset.first.first % jobset.first.second % s1 % s2);
        }
    }

    /* Accumulated across all iterations of the loop below. */
    auto sleepUntil = system_time::max();

    bool keepGoing;

    do {
        system_time now = std::chrono::system_clock::now();

        /* Snapshot the usable machines together with their current job
           count. Disabled machines are skipped. Machines that had failures
           and are disabled until a future time are skipped as well, but
           their re-enable time may lower sleepUntil. */
        struct MachineInfo
        {
            Machine::ptr machine;
            unsigned long currentJobs;
        };
        std::vector<MachineInfo> machinesSorted;
        {
            auto machines_(machines.lock());
            for (auto & m : *machines_) {
                // NOTE(review): connectInfo is locked even for machines that the
                // `enabled` check below skips; the lock could be taken after it.
                auto info(m.second->state->connectInfo.lock());
                if (!m.second->enabled) continue;
                if (info->consecutiveFailures && info->disabledUntil > now) {
                    if (info->disabledUntil < sleepUntil)
                        sleepUntil = info->disabledUntil;
                    continue;
                }
                machinesSorted.push_back({m.second, m.second->state->currentJobs});
            }
        }

        /* Order machines by load relative to speed (rounded
           currentJobs / speedFactor, ascending), then by speedFactor
           (descending), then by currentJobs (descending). */
        sort(machinesSorted.begin(), machinesSorted.end(),
            [](const MachineInfo & a, const MachineInfo & b) -> bool
            {
                float ta = std::round(a.currentJobs / a.machine->speedFactor);
                float tb = std::round(b.currentJobs / b.machine->speedFactor);
                return
                    ta != tb ? ta < tb :
                    a.machine->speedFactor != b.machine->speedFactor ? a.machine->speedFactor > b.machine->speedFactor :
                    a.currentJobs > b.currentJobs;
            });

        /* Scheduling keys of a runnable step, copied out of its state while
           the step lock is held. lowestShareUsed is the minimum shareUsed()
           over the step's jobsets (1e9 if it has none). */
        struct StepInfo
        {
            Step::ptr step;
            double lowestShareUsed = 1e9;
            int highestGlobalPriority;
            int highestLocalPriority;
            BuildID lowestBuildID;
            StepInfo(Step::ptr step, Step::State & step_) : step(step)
            {
                for (auto & jobset : step_.jobsets)
                    lowestShareUsed = std::min(lowestShareUsed, jobset->shareUsed());
                highestGlobalPriority = step_.highestGlobalPriority;
                highestLocalPriority = step_.highestLocalPriority;
                lowestBuildID = step_.lowestBuildID;
            }
        };

        std::vector<StepInfo> runnableSorted;

        /* Per-system-type statistics, published to machineTypes at the
           end of each iteration. */
        struct RunnablePerType
        {
            unsigned int count{0};
            std::chrono::seconds waitTime{0};
        };
        std::unordered_map<std::string, RunnablePerType> runnablePerType;

        /* Collect the runnable steps. Expired weak pointers are erased from
           the runnable list. Every live step is counted and its wait time
           accumulated per system type. Steps whose retry back-off has not
           yet expired (tries > 0 and after > now) are not schedulable now,
           but their retry time may lower sleepUntil. */
        {
            auto runnable_(runnable.lock());
            runnableSorted.reserve(runnable_->size());
            for (auto i = runnable_->begin(); i != runnable_->end(); ) {
                auto step = i->lock();

                if (!step) {
                    i = runnable_->erase(i);
                    continue;
                }

                ++i;

                auto & r = runnablePerType[step->systemType];
                r.count++;

                auto step_(step->state.lock());

                r.waitTime += std::chrono::duration_cast<std::chrono::seconds>(now - step_->runnableSince);

                if (step_->tries > 0 && step_->after > now) {
                    if (step_->after < sleepUntil)
                        sleepUntil = step_->after;
                    continue;
                }

                runnableSorted.emplace_back(step, *step_);
            }
        }

        /* Order steps by highestGlobalPriority (descending), then
           lowestShareUsed (ascending), then highestLocalPriority
           (descending), then lowestBuildID (ascending). */
        sort(runnableSorted.begin(), runnableSorted.end(),
            [](const StepInfo & a, const StepInfo & b)
            {
                return
                    a.highestGlobalPriority != b.highestGlobalPriority ? a.highestGlobalPriority > b.highestGlobalPriority :
                    a.lowestShareUsed != b.lowestShareUsed ? a.lowestShareUsed < b.lowestShareUsed :
                    a.highestLocalPriority != b.highestLocalPriority ? a.highestLocalPriority > b.highestLocalPriority :
                    a.lowestBuildID < b.lowestBuildID;
            });

        /* Find the first machine (in sorted order) with a free slot and the
           first step (in sorted order) that it supports. Make at most one
           assignment per iteration, then restart: the reservation bumps the
           machine's currentJobs, which changes the machine ordering. */
        keepGoing = false;

        for (auto & mi : machinesSorted) {
            if (mi.machine->state->currentJobs >= mi.machine->maxJobs) continue;

            for (auto & stepInfo : runnableSorted) {
                auto & step(stepInfo.step);

                if (!mi.machine->supportsStep(step)) {
                    debug("machine '%s' does not support step '%s' (system type '%s')",
                        mi.machine->sshName, localStore->printStorePath(step->drvPath), step->drv->platform);
                    continue;
                }

                /* Remove the chosen step from the runnable list (linear
                   scan) and from this iteration's per-type count. */
                {
                    auto runnable_(runnable.lock());
                    bool removed = false;
                    for (auto i = runnable_->begin(); i != runnable_->end(); )
                        if (i->lock() == step) {
                            i = runnable_->erase(i);
                            removed = true;
                            break;
                        } else ++i;
                    assert(removed);
                    auto & r = runnablePerType[step->systemType];
                    assert(r.count);
                    r.count--;
                }

                /* The MachineReservation holds the slot for as long as the
                   builder thread keeps the shared_ptr alive. */
                auto builderThread = std::thread(&State::builder, this,
                    std::make_shared<MachineReservation>(*this, step, mi.machine));
                builderThread.detach();

                keepGoing = true;
                break;
            }

            if (keepGoing) break;
        }

        /* Publish the per-type statistics. Types with no runnable steps in
           this iteration get runnable = 0. */
        {
            auto machineTypes_(machineTypes.lock());
            for (auto & i : *machineTypes_)
                i.second.runnable = 0;
            for (auto & i : runnablePerType) {
                auto & j = (*machineTypes_)[i.first];
                j.runnable = i.second.count;
                j.waitTime = i.second.waitTime;
            }
        }

        lastDispatcherCheck = std::chrono::system_clock::to_time_t(now);

    } while (keepGoing);

    abortUnsupported();

    return sleepUntil;
}
void State::wakeDispatcher()
{
{
auto dispatcherWakeup_(dispatcherWakeup.lock());
*dispatcherWakeup_ = true;
}
dispatcherWakeupCV.notify_one();
}
void State::abortUnsupported()
{
auto runnable2 = *runnable.lock();
auto machines2 = *machines.lock();
system_time now = std::chrono::system_clock::now();
auto now2 = time(0);
std::unordered_set<Step::ptr> aborted;
size_t count = 0;
for (auto & wstep : runnable2) {
auto step(wstep.lock());
if (!step) continue;
bool supported = false;
for (auto & machine : machines2) {
if (machine.second->supportsStep(step)) {
step->state.lock()->lastSupported = now;
supported = true;
break;
}
}
if (!supported)
count++;
if (!supported
&& std::chrono::duration_cast<std::chrono::seconds>(now - step->state.lock()->lastSupported).count() >= maxUnsupportedTime)
{
printError("aborting unsupported build step '%s' (type '%s')",
localStore->printStorePath(step->drvPath),
step->systemType);
aborted.insert(step);
auto conn(dbPool.get());
std::set<Build::ptr> dependents;
std::set<Step::ptr> steps;
getDependents(step, dependents, steps);
if (dependents.empty()) continue;
Build::ptr build;
for (auto build2 : dependents) {
if (build2->drvPath == step->drvPath)
build = build2;
}
if (!build) build = *dependents.begin();
bool stepFinished = false;
failStep(
*conn, step, build->id,
RemoteResult {
.stepStatus = bsUnsupported,
.errorMsg = fmt("unsupported system type '%s'",
step->systemType),
.startTime = now2,
.stopTime = now2,
},
nullptr, stepFinished);
if (buildOneDone) exit(1);
}
}
{
auto runnable_(runnable.lock());
for (auto i = runnable_->begin(); i != runnable_->end(); ) {
if (aborted.count(i->lock()))
i = runnable_->erase(i);
else
++i;
}
}
nrUnsupportedSteps = count;
}
/* Record a finished build step of this jobset in its scheduling window.

   `startTime` is the step's start time as a time_t. pruneSteps() compares
   it against time(0), so it has one-second resolution. `duration` is
   presumably in seconds; it is added to the jobset's running total
   `seconds`.

   Several steps of one jobset can start in the same second and therefore
   share a key. Their durations must be accumulated in that bucket rather
   than overwritten. Otherwise `seconds` would exceed the sum of the
   recorded durations, and because pruneSteps() subtracts only the stored
   bucket value, the excess would never be removed and shareUsed() would
   drift upward. Accumulating keeps `seconds` equal to the sum of the
   values in `steps`.

   NOTE(review): assumes `steps` is a std::map-like container whose
   operator[] value-initializes (zeroes) new entries. */
void Jobset::addStep(time_t startTime, time_t duration)
{
    auto steps_(steps.lock());
    (*steps_)[startTime] += duration;
    seconds += duration;
}
/* Drop steps that have fallen out of the scheduling window.

   Entries are visited from the smallest start time upwards. Every entry
   whose start time is at or before `time(0) - schedulingWindow` is erased,
   and its duration is subtracted from `seconds`. Scanning stops at the
   first entry that is still inside the window. */
void Jobset::pruneSteps()
{
    const time_t cutoff = time(0) - schedulingWindow;
    auto steps_(steps.lock());
    for (auto it = steps_->begin(); it != steps_->end() && it->first <= cutoff; it = steps_->erase(it))
        seconds -= it->second;
}
/* Claim a job slot on `machine` for `step`.

   Increments the machine's currentJobs counter and the `running` count of
   the step's system type in machineTypes. The destructor reverses both. */
State::MachineReservation::MachineReservation(State & state, Step::ptr step, Machine::ptr machine)
    : state(state), step(step), machine(machine)
{
    ++machine->state->currentJobs;

    /* The temporary lock guard lives until the end of this statement. */
    (*state.machineTypes.lock())[step->systemType].running++;
}
/* Release the job slot claimed by the constructor.

   Decrements the machine's currentJobs, recording idleSince when this was
   its last job. Then decrements the `running` count of the step's system
   type, recording lastActive when it drops to zero. Both counters are
   asserted to be non-zero before being decremented. */
State::MachineReservation::~MachineReservation()
{
    auto jobsBefore = machine->state->currentJobs--;
    assert(jobsBefore);
    if (jobsBefore == 1)
        machine->state->idleSince = time(0);

    auto machineTypes_(state.machineTypes.lock());
    auto & typeInfo = (*machineTypes_)[step->systemType];
    assert(typeInfo.running);
    if (--typeInfo.running == 0)
        typeInfo.lastActive = std::chrono::system_clock::now();
}