DrvInfo::Outputs outputs = drv->queryOutputs();
auto vOutputs = vFlake->attrs->get(state.symbols.create("outputs"))->value;

state.forceValue(*vOutputs);

auto aHydraJobs = vOutputs->attrs->get(state.symbols.create("hydraJobs"));
if (!aHydraJobs)
    aHydraJobs = vOutputs->attrs->get(state.symbols.create("checks"));
if (!aHydraJobs)
    throw Error("flake '%s' does not provide any Hydra jobs or checks", flakeRef);

vTop = *aHydraJobs->value;

} else {
    state.evalFile(lookupFileArg(state, myArgs.releaseExpr), vTop);
}
{
    auto res = top.object(attrPath);
    res.attr("nixName", drv->queryName());
    res.attr("system", drv->querySystem());
    res.attr("drvPath", drvPath = drv->queryDrvPath());
    res.attr("description", drv->queryMetaString("description"));
    res.attr("license", queryMetaStrings(state, *drv, "license", "shortName"));
    res.attr("homepage", drv->queryMetaString("homepage"));
    res.attr("maintainers", queryMetaStrings(state, *drv, "maintainers", "email"));
    res.attr("schedulingPriority", drv->queryMetaInt("schedulingPriority", 100));
    res.attr("timeout", drv->queryMetaInt("timeout", 36000));
    res.attr("maxSilent", drv->queryMetaInt("maxSilent", 7200));
    res.attr("isChannel", drv->queryMetaBool("isHydraChannel", false));
while (true) {
    /* Wait for the master to send us a job name. */
    writeLine(to.get(), "next");
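    /* Protocol sketch (a reading of this loop and the master side
       below, not a formal spec): the worker sends "next" to ask for
       work; the master replies "do <attrPath>" or "exit"; the worker
       answers with a single JSON line such as {"job": {...}},
       {"attrs": [...]} for an attribute set to recurse into, or
       {"error": "..."}. A worker that wants to be replaced reports
       "restart" instead of "next". */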
/* If this is an aggregate, then get its constituents. */
Bindings::iterator a = v.attrs->find(state.symbols.create("_hydraAggregate"));
if (a != v.attrs->end() && state.forceBool(*a->value, *a->pos)) {
    Bindings::iterator a = v.attrs->find(state.symbols.create("constituents"));
    if (a == v.attrs->end())
        throw EvalError("derivation must have a ‘constituents’ attribute");
    PathSet context;
    state.coerceToString(*a->pos, *a->value, context, true, false);
    PathSet drvs;
    for (auto & i : context)
        /* Context elements of the form "!<output>!<drvPath>" refer to
           a derivation output; strip everything up to and including
           the second '!' to recover the .drv path. */
        if (i.at(0) == '!') {
            size_t index = i.find("!", 1);
            drvs.insert(string(i, index + 1));
auto s = readLine(from.get());
if (s == "exit") break;
if (!hasPrefix(s, "do ")) abort();
std::string attrPath(s, 3);

debug("worker process %d at '%s'", getpid(), attrPath);

/* Evaluate it and send info back to the master. */
nlohmann::json reply;

try {
    auto v = findAlongAttrPath(state, attrPath, autoArgs, *vRoot).first;

    state.forceValue(*v);

    if (auto drv = getDerivation(state, *v, false)) {

        DrvInfo::Outputs outputs = drv->queryOutputs();

        if (drv->querySystem() == "unknown")
            throw EvalError("derivation must have a 'system' attribute");

        auto drvPath = drv->queryDrvPath();

        nlohmann::json job;

        job["nixName"] = drv->queryName();
        job["system"] = drv->querySystem();
        job["drvPath"] = drvPath;
        job["description"] = drv->queryMetaString("description");
        job["license"] = queryMetaStrings(state, *drv, "license", "shortName");
        job["homepage"] = drv->queryMetaString("homepage");
        job["maintainers"] = queryMetaStrings(state, *drv, "maintainers", "email");
        job["schedulingPriority"] = drv->queryMetaInt("schedulingPriority", 100);
        job["timeout"] = drv->queryMetaInt("timeout", 36000);
        job["maxSilent"] = drv->queryMetaInt("maxSilent", 7200);
        job["isChannel"] = drv->queryMetaBool("isHydraChannel", false);

        /* If this is an aggregate, then get its constituents. */
        auto a = v->attrs->get(state.symbols.create("_hydraAggregate"));
        if (a && state.forceBool(*a->value, *a->pos)) {
            auto a = v->attrs->get(state.symbols.create("constituents"));
            if (!a)
                throw EvalError("derivation must have a ‘constituents’ attribute");
            PathSet context;
            state.coerceToString(*a->pos, *a->value, context, true, false);
            for (auto & i : context)
                if (i.at(0) == '!') {
                    size_t index = i.find("!", 1);
                    job["constituents"].push_back(string(i, index + 1));
                }

            state.forceList(*a->value, *a->pos);
            for (unsigned int n = 0; n < a->value->listSize(); ++n) {
                auto v = a->value->listElems()[n];
                state.forceValue(*v);
                if (v->type == tString)
                    job["namedConstituents"].push_back(state.forceStringNoCtx(*v));
/* Register the derivation as a GC root.  !!! This
   registers roots for jobs that we may have already
   done. */
auto localStore = state.store.dynamic_pointer_cast<LocalFSStore>();
if (gcRootsDir != "" && localStore) {
    Path root = gcRootsDir + "/" + std::string(baseNameOf(drvPath));
    if (!pathExists(root))
        localStore->addPermRoot(localStore->parseStorePath(drvPath), root, false);
}
static void findJobs(EvalState & state, JSONObject & top,
    Bindings & autoArgs, Value & v, const string & attrPath)
{
    try {
        findJobsWrapped(state, top, autoArgs, v, attrPath);
    } catch (EvalError & e) {
        auto res = top.object(attrPath);
        res.attr("error", filterANSIEscapes(e.msg(), true));
/* If our RSS exceeds the maximum, exit. The master will
   start a new process. */
struct rusage r;
getrusage(RUSAGE_SELF, &r);
if ((size_t) r.ru_maxrss > maxMemorySize * 1024) break;
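/* Note: on Linux getrusage() reports ru_maxrss in KiB, so the
   comparison above treats maxMemorySize (the
   evaluator_max_memory_size setting) as a limit in MiB. */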
auto initialHeapSize = config->getStrOption("evaluator_initial_heap_size", "");
if (initialHeapSize != "")
    setenv("GC_INITIAL_HEAP_SIZE", initialHeapSize.c_str(), 1);
auto nrWorkers = config->getIntOption("evaluator_workers", 1);
maxMemorySize = config->getIntOption("evaluator_max_memory_size", 4096);
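/* Illustrative hydra.conf settings for the options read above
   (example values, not recommendations):

     evaluator_workers = 4
     evaluator_max_memory_size = 4096
*/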
mkFlag().longName("gc-roots-dir").description("garbage collector roots directory").labels({"path"}).dest(&gcRootsDir);mkFlag().longName("dry-run").description("don't create store derivations").set(&settings.readOnlyMode, true);mkFlag().longName("flake").description("build a flake").set(&flake, true);expectArg("expr", &releaseExpr);}};MyArgs myArgs;
auto vFlake = state.allocValue();
/* Start a new worker process if necessary. */
if (pid == -1) {
    Pipe toPipe, fromPipe;
    toPipe.create();
    fromPipe.create();
    pid = startProcess(
        [&,
         to{std::make_shared<AutoCloseFD>(std::move(fromPipe.writeSide))},
         from{std::make_shared<AutoCloseFD>(std::move(toPipe.readSide))}
        ]()
        {
            try {
                EvalState state(myArgs.searchPath, openStore());
                Bindings & autoArgs = *myArgs.getAutoArgs(state);
                worker(state, autoArgs, *to, *from);
            } catch (std::exception & e) {
                nlohmann::json err;
                err["error"] = e.what();
                writeLine(to->get(), err.dump());
            }
        },
        ProcessOptions { .allowVfork = false });
    from = std::move(fromPipe.readSide);
    to = std::move(toPipe.writeSide);
    debug("created worker process %d", pid);
}
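/* The lambda captures the child's ends of both pipes as
   shared_ptr<AutoCloseFD>, keeping those descriptors alive for the
   worker's lifetime, while the parent keeps the opposite ends in
   `from`/`to`. If the worker throws before entering its loop, the
   catch block still sends one {"error": ...} line back to us. */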
auto lockedFlake = lockFlake(state, flakeRef,
    LockFlags {
        .updateLockFile = false,
        .useRegistries = false,
        .allowMutable = false,
    });
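/* These flags pin the evaluation: the lock file is taken as-is
   (never updated), flake registries are not consulted, and mutable
   inputs are rejected, so repeated evaluations of the same jobset
   see the same sources. */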
/* Check whether the existing worker process is still there. */
auto s = readLine(from.get());
if (s == "restart") {
    pid = -1;
    continue;
} else if (s != "next") {
    auto json = nlohmann::json::parse(s);
    throw Error("worker error: %s", (std::string) json["error"]);
}
auto vOutputs = (*vFlake->attrs->get(state.symbols.create("outputs")))->value;
state.forceValue(*vOutputs);
while (true) {
    checkInterrupt();
    auto state(state_.lock());
    if ((state->todo.empty() && state->active.empty()) || state->exc) {
        writeLine(to.get(), "exit");
        return;
    }
    if (!state->todo.empty()) {
        attrPath = *state->todo.begin();
        state->todo.erase(state->todo.begin());
        state->active.insert(attrPath);
        break;
    } else
        state.wait(wakeup);
}

/* Tell the worker to evaluate it. */
writeLine(to.get(), "do " + attrPath);

/* Wait for the response. */
auto response = nlohmann::json::parse(readLine(from.get()));

/* Handle the response. */
StringSet newAttrs;

if (response.find("job") != response.end()) {
    auto state(state_.lock());
    state->jobs[attrPath] = response["job"];
}
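/* A sketch of the shared state the handlers lock, inferred from the
   fields used here (the actual definition lives elsewhere in this
   file):

     struct State {
         std::set<std::string> todo{""};  // attr paths left to evaluate
         std::set<std::string> active;    // attr paths being evaluated
         nlohmann::json jobs;             // accumulated results per job
         std::exception_ptr exc;          // first failure, rethrown later
     };
*/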
auto aHydraJobs = vOutputs->attrs->get(state.symbols.create("hydraJobs"));
if (!aHydraJobs)
    aHydraJobs = vOutputs->attrs->get(state.symbols.create("checks"));
if (!aHydraJobs)
    throw Error("flake '%s' does not provide any Hydra jobs or checks", flakeRef);
if (response.find("attrs") != response.end()) {for (auto & i : response["attrs"]) {auto s = (attrPath.empty() ? "" : attrPath + ".") + (std::string) i;newAttrs.insert(s);}}if (response.find("error") != response.end()) {auto state(state_.lock());state->jobs[attrPath]["error"] = response["error"];}
v = *(*aHydraJobs)->value;
/* Add newly discovered job names to the queue. */
{
    auto state(state_.lock());
    state->active.erase(attrPath);
    for (auto & s : newAttrs)
        state->todo.insert(s);
    wakeup.notify_all();
}
}
} catch (...) {
    auto state(state_.lock());
    state->exc = std::current_exception();
    wakeup.notify_all();
}
};
} else {
    state.evalFile(lookupFileArg(state, myArgs.releaseExpr), v);
std::vector<std::thread> threads;for (size_t i = 0; i < nrWorkers; i++)threads.emplace_back(std::thread(handler));for (auto & thread : threads)thread.join();auto state(state_.lock());if (state->exc)std::rethrow_exception(state->exc);/* For aggregate jobs that have named consistuents(i.e. constituents that are a job name rather than aderivation), look up the referenced job and add it to thedependencies of the aggregate derivation. */auto store = openStore();for (auto i = state->jobs.begin(); i != state->jobs.end(); ++i) {auto jobName = i.key();auto & job = i.value();auto named = job.find("namedConstituents");if (named == job.end()) continue;if (myArgs.dryRun) {for (std::string jobName2 : *named) {auto job2 = state->jobs.find(jobName2);if (job2 == state->jobs.end())throw Error("aggregate job '%s' references non-existent job '%s'", jobName, jobName2);std::string drvPath2 = (*job2)["drvPath"];job["constituents"].push_back(drvPath2);}} else {std::string drvPath = job["drvPath"];auto drv = readDerivation(*store, drvPath);for (std::string jobName2 : *named) {auto job2 = state->jobs.find(jobName2);if (job2 == state->jobs.end())throw Error("aggregate job '%s' references non-existent job '%s'", jobName, jobName2);std::string drvPath2 = (*job2)["drvPath"];auto drv2 = readDerivation(*store, drvPath2);job["constituents"].push_back(drvPath2);drv.inputDrvs[store->parseStorePath(drvPath2)] = {drv2.outputs.begin()->first};}std::string drvName(store->parseStorePath(drvPath).name());assert(hasSuffix(drvName, drvExtension));drvName.resize(drvName.size() - drvExtension.size());auto h = hashDerivationModulo(*store, drv, true);auto outPath = store->makeOutputPath("out", h, drvName);drv.env["out"] = store->printStorePath(outPath);drv.outputs.insert_or_assign("out", DerivationOutput(outPath.clone(), "", ""));auto newDrvPath = store->printStorePath(writeDerivation(store, drv, drvName));debug("rewrote aggregate derivation %s -> %s", drvPath, newDrvPath);job["drvPath"] = newDrvPath;job["outputs"]["out"] = store->printStorePath(outPath);}job.erase("namedConstituents");