pragma once#include <atomic>#include "sync.hh"#include "types.hh"namespace nix {MakeError(NoTokens, Error);/* This class hands out tokens. There are only ‘maxTokens’ tokensavailable. Calling get(N) will return a Token object, representingownership of N tokens. If the requested number of tokens isunavailable, get() will sleep until another thread returns atoken. */class TokenServer{const size_t maxTokens;Sync<size_t> inUse{0};std::condition_variable wakeup;public:TokenServer(size_t maxTokens) : maxTokens(maxTokens) { }class Token{friend TokenServer;TokenServer * ts;size_t tokens;bool acquired = false;Token(TokenServer * ts, size_t tokens, unsigned int timeout): ts(ts), tokens(tokens){if (tokens >= ts->maxTokens)throw NoTokens("requesting more tokens (%d) than exist (%d)", tokens, ts->maxTokens);debug("acquiring %d tokens", tokens);auto inUse(ts->inUse.lock());while (*inUse + tokens > ts->maxTokens)if (timeout) {if (!inUse.wait_for(ts->wakeup, std::chrono::seconds(timeout),[&]() { return *inUse + tokens <= ts->maxTokens; }))return;} elseinUse.wait(ts->wakeup);*inUse += tokens;acquired = true;}public:Token(Token && t) : ts(t.ts), tokens(t.tokens), acquired(t.acquired){t.ts = 0;t.acquired = false;}Token(const Token & l) = delete;~Token(){if (!ts || !acquired) return;{auto inUse(ts->inUse.lock());assert(*inUse >= t);*inUse -= t;tokens -= t;}// FIXME: inefficient. Should wake up waiters that can// proceed now.ts->wakeup.notify_all();}};Token get(size_t tokens = 1, unsigned int timeout = 0){return Token(this, tokens, timeout);}size_t currentUse(){auto inUse_(inUse.lock());return *inUse_;}};}size_t capacity(){return maxTokens;}give_back(tokens);}bool operator ()() { return acquired; }void give_back(size_t t){debug("returning %d tokens", t);if (!t) return;assert(acquired);assert(t <= tokens);
/* Block until we have the required amount of memory
   available, which is twice the NAR size (namely the
   uncompressed and worst-case compressed NAR), plus 150 MiB
   for xz compression overhead. (The xz manpage claims
   ~94 MiB, but that's not what I'm seeing.) */
auto resStart = std::chrono::steady_clock::now();
size_t compressionCost = totalNarSize + 150 * 1024 * 1024;
/* Total amount actually requested; named so that the warning
   below reports the same figure that was asked for. */
const size_t memoryTokensNeeded = totalNarSize + compressionCost;
result.tokens = std::make_unique<nix::TokenServer::Token>(memoryTokens.get(memoryTokensNeeded));
auto resStop = std::chrono::steady_clock::now();

/* Warn when acquiring the tokens took a second or longer. */
auto resMs = std::chrono::duration_cast<std::chrono::milliseconds>(resStop - resStart).count();
if (resMs >= 1000)
    printMsg(lvlError, "warning: had to wait %d ms for %d memory tokens for %s",
        resMs, memoryTokensNeeded, localStore->printStorePath(step->drvPath));
/* Token server to prevent threads from allocating too many big
   strings concurrently while importing NARs from the build
   machines. Before importing a NAR, a thread first acquires a
   number of memory tokens derived from the NAR's size (the
   acquisition code requests twice the NAR size plus a fixed
   allowance for compression overhead), causing it to block until
   that many tokens are available. */
nix::TokenServer memoryTokens;