123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390 |
- /*
- Minetest
- Copyright (C) 2013 sapier, <sapier AT gmx DOT net>
- This program is free software; you can redistribute it and/or modify
- it under the terms of the GNU Lesser General Public License as published by
- the Free Software Foundation; either version 2.1 of the License, or
- (at your option) any later version.
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU Lesser General Public License for more details.
- You should have received a copy of the GNU Lesser General Public License along
- with this program; if not, write to the Free Software Foundation, Inc.,
- 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
- */
- #include <cstdio>
- #include <cstdlib>
- extern "C" {
- #include <lua.h>
- #include <lauxlib.h>
- #include <lualib.h>
- }
- #include "server.h"
- #include "s_async.h"
- #include "log.h"
- #include "filesys.h"
- #include "porting.h"
- #include "common/c_internal.h"
- #include "common/c_packer.h"
- #include "lua_api/l_base.h"
- /******************************************************************************/
- AsyncEngine::~AsyncEngine()
- {
- // Request all threads to stop
- for (AsyncWorkerThread *workerThread : workerThreads) {
- workerThread->stop();
- }
- // Wake up all threads
- for (auto it : workerThreads) {
- (void)it;
- jobQueueCounter.post();
- }
- // Wait for threads to finish
- for (AsyncWorkerThread *workerThread : workerThreads) {
- workerThread->wait();
- }
- // Force kill all threads
- for (AsyncWorkerThread *workerThread : workerThreads) {
- delete workerThread;
- }
- jobQueueMutex.lock();
- jobQueue.clear();
- jobQueueMutex.unlock();
- workerThreads.clear();
- }
- /******************************************************************************/
- void AsyncEngine::registerStateInitializer(StateInitializer func)
- {
- FATAL_ERROR_IF(initDone, "Initializer may not be registered after init");
- stateInitializers.push_back(func);
- }
- /******************************************************************************/
- void AsyncEngine::initialize(unsigned int numEngines)
- {
- initDone = true;
- if (numEngines == 0) {
- // Leave one core for the main thread and one for whatever else
- autoscaleMaxWorkers = Thread::getNumberOfProcessors();
- if (autoscaleMaxWorkers >= 2)
- autoscaleMaxWorkers -= 2;
- infostream << "AsyncEngine: using at most " << autoscaleMaxWorkers
- << " threads with automatic scaling" << std::endl;
- addWorkerThread();
- } else {
- for (unsigned int i = 0; i < numEngines; i++)
- addWorkerThread();
- }
- }
- void AsyncEngine::addWorkerThread()
- {
- AsyncWorkerThread *toAdd = new AsyncWorkerThread(this,
- std::string("AsyncWorker-") + itos(workerThreads.size()));
- workerThreads.push_back(toAdd);
- toAdd->start();
- }
- /******************************************************************************/
- u32 AsyncEngine::queueAsyncJob(std::string &&func, std::string &¶ms,
- const std::string &mod_origin)
- {
- MutexAutoLock autolock(jobQueueMutex);
- u32 jobId = jobIdCounter++;
- jobQueue.emplace_back();
- auto &to_add = jobQueue.back();
- to_add.id = jobId;
- to_add.function = std::move(func);
- to_add.params = std::move(params);
- to_add.mod_origin = mod_origin;
- jobQueueCounter.post();
- return jobId;
- }
- u32 AsyncEngine::queueAsyncJob(std::string &&func, PackedValue *params,
- const std::string &mod_origin)
- {
- MutexAutoLock autolock(jobQueueMutex);
- u32 jobId = jobIdCounter++;
- jobQueue.emplace_back();
- auto &to_add = jobQueue.back();
- to_add.id = jobId;
- to_add.function = std::move(func);
- to_add.params_ext.reset(params);
- to_add.mod_origin = mod_origin;
- jobQueueCounter.post();
- return jobId;
- }
- /******************************************************************************/
- bool AsyncEngine::getJob(LuaJobInfo *job)
- {
- jobQueueCounter.wait();
- jobQueueMutex.lock();
- bool retval = false;
- if (!jobQueue.empty()) {
- *job = std::move(jobQueue.front());
- jobQueue.pop_front();
- retval = true;
- }
- jobQueueMutex.unlock();
- return retval;
- }
- /******************************************************************************/
- void AsyncEngine::putJobResult(LuaJobInfo &&result)
- {
- resultQueueMutex.lock();
- resultQueue.emplace_back(std::move(result));
- resultQueueMutex.unlock();
- }
- /******************************************************************************/
- void AsyncEngine::step(lua_State *L)
- {
- stepJobResults(L);
- stepAutoscale();
- }
- void AsyncEngine::stepJobResults(lua_State *L)
- {
- int error_handler = PUSH_ERROR_HANDLER(L);
- lua_getglobal(L, "core");
- ScriptApiBase *script = ModApiBase::getScriptApiBase(L);
- MutexAutoLock autolock(resultQueueMutex);
- while (!resultQueue.empty()) {
- LuaJobInfo j = std::move(resultQueue.front());
- resultQueue.pop_front();
- lua_getfield(L, -1, "async_event_handler");
- if (lua_isnil(L, -1))
- FATAL_ERROR("Async event handler does not exist!");
- luaL_checktype(L, -1, LUA_TFUNCTION);
- lua_pushinteger(L, j.id);
- if (j.result_ext)
- script_unpack(L, j.result_ext.get());
- else
- lua_pushlstring(L, j.result.data(), j.result.size());
- // Call handler
- const char *origin = j.mod_origin.empty() ? nullptr : j.mod_origin.c_str();
- script->setOriginDirect(origin);
- int result = lua_pcall(L, 2, 0, error_handler);
- if (result)
- script_error(L, result, origin, "<async>");
- }
- lua_pop(L, 2); // Pop core and error handler
- }
- void AsyncEngine::stepAutoscale()
- {
- if (workerThreads.size() >= autoscaleMaxWorkers)
- return;
- MutexAutoLock autolock(jobQueueMutex);
- // 2) If the timer elapsed, check again
- if (autoscaleTimer && porting::getTimeMs() >= autoscaleTimer) {
- autoscaleTimer = 0;
- // Determine overlap with previous snapshot
- unsigned int n = 0;
- for (const auto &it : jobQueue)
- n += autoscaleSeenJobs.count(it.id);
- autoscaleSeenJobs.clear();
- infostream << "AsyncEngine: " << n << " jobs were still waiting after 1s" << std::endl;
- // Start this many new threads
- while (workerThreads.size() < autoscaleMaxWorkers && n > 0) {
- addWorkerThread();
- n--;
- }
- return;
- }
- // 1) Check if there's anything in the queue
- if (!autoscaleTimer && !jobQueue.empty()) {
- // Take a snapshot of all jobs we have seen
- for (const auto &it : jobQueue)
- autoscaleSeenJobs.emplace(it.id);
- // and set a timer for 1 second
- autoscaleTimer = porting::getTimeMs() + 1000;
- }
- }
- /******************************************************************************/
- bool AsyncEngine::prepareEnvironment(lua_State* L, int top)
- {
- for (StateInitializer &stateInitializer : stateInitializers) {
- stateInitializer(L, top);
- }
- auto *script = ModApiBase::getScriptApiBase(L);
- try {
- script->loadMod(Server::getBuiltinLuaPath() + DIR_DELIM + "init.lua",
- BUILTIN_MOD_NAME);
- script->checkSetByBuiltin();
- } catch (const ModError &e) {
- errorstream << "Execution of async base environment failed: "
- << e.what() << std::endl;
- FATAL_ERROR("Execution of async base environment failed");
- }
- // Load per mod stuff
- if (server) {
- const auto &list = server->m_async_init_files;
- try {
- for (auto &it : list)
- script->loadMod(it.second, it.first);
- } catch (const ModError &e) {
- errorstream << "Failed to load mod script inside async environment." << std::endl;
- server->setAsyncFatalError(e.what());
- return false;
- }
- }
- return true;
- }
- /******************************************************************************/
- AsyncWorkerThread::AsyncWorkerThread(AsyncEngine* jobDispatcher,
- const std::string &name) :
- ScriptApiBase(ScriptingType::Async),
- Thread(name),
- jobDispatcher(jobDispatcher)
- {
- lua_State *L = getStack();
- if (jobDispatcher->server) {
- setGameDef(jobDispatcher->server);
- if (g_settings->getBool("secure.enable_security"))
- initializeSecurity();
- }
- // Prepare job lua environment
- lua_getglobal(L, "core");
- int top = lua_gettop(L);
- // Push builtin initialization type
- lua_pushstring(L, jobDispatcher->server ? "async_game" : "async");
- lua_setglobal(L, "INIT");
- if (!jobDispatcher->prepareEnvironment(L, top)) {
- // can't throw from here so we're stuck with this
- isErrored = true;
- }
- }
- /******************************************************************************/
- AsyncWorkerThread::~AsyncWorkerThread()
- {
- sanity_check(!isRunning());
- }
- /******************************************************************************/
- void* AsyncWorkerThread::run()
- {
- if (isErrored)
- return nullptr;
- lua_State *L = getStack();
- int error_handler = PUSH_ERROR_HANDLER(L);
- auto report_error = [this] (const ModError &e) {
- if (jobDispatcher->server)
- jobDispatcher->server->setAsyncFatalError(e.what());
- else
- errorstream << e.what() << std::endl;
- };
- lua_getglobal(L, "core");
- if (lua_isnil(L, -1)) {
- FATAL_ERROR("Unable to find core within async environment!");
- }
- // Main loop
- LuaJobInfo j;
- while (!stopRequested()) {
- // Wait for job
- if (!jobDispatcher->getJob(&j) || stopRequested())
- continue;
- const bool use_ext = !!j.params_ext;
- lua_getfield(L, -1, "job_processor");
- if (lua_isnil(L, -1))
- FATAL_ERROR("Unable to get async job processor!");
- luaL_checktype(L, -1, LUA_TFUNCTION);
- if (luaL_loadbuffer(L, j.function.data(), j.function.size(), "=(async)")) {
- errorstream << "ASYNC WORKER: Unable to deserialize function" << std::endl;
- lua_pushnil(L);
- }
- if (use_ext)
- script_unpack(L, j.params_ext.get());
- else
- lua_pushlstring(L, j.params.data(), j.params.size());
- // Call it
- setOriginDirect(j.mod_origin.empty() ? nullptr : j.mod_origin.c_str());
- int result = lua_pcall(L, 2, 1, error_handler);
- if (result) {
- try {
- scriptError(result, "<async>");
- } catch (const ModError &e) {
- report_error(e);
- }
- } else {
- // Fetch result
- if (use_ext) {
- try {
- j.result_ext.reset(script_pack(L, -1));
- } catch (const ModError &e) {
- report_error(e);
- result = LUA_ERRERR;
- }
- } else {
- size_t length;
- const char *retval = lua_tolstring(L, -1, &length);
- j.result.assign(retval, length);
- }
- }
- lua_pop(L, 1); // Pop retval
- // Put job result
- if (result == 0)
- jobDispatcher->putJobResult(std::move(j));
- }
- lua_pop(L, 2); // Pop core and error handler
- return 0;
- }
|