s_async.cpp 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390
  1. /*
  2. Minetest
  3. Copyright (C) 2013 sapier, <sapier AT gmx DOT net>
  4. This program is free software; you can redistribute it and/or modify
  5. it under the terms of the GNU Lesser General Public License as published by
  6. the Free Software Foundation; either version 2.1 of the License, or
  7. (at your option) any later version.
  8. This program is distributed in the hope that it will be useful,
  9. but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  11. GNU Lesser General Public License for more details.
  12. You should have received a copy of the GNU Lesser General Public License along
  13. with this program; if not, write to the Free Software Foundation, Inc.,
  14. 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  15. */
  16. #include <cstdio>
  17. #include <cstdlib>
  18. extern "C" {
  19. #include <lua.h>
  20. #include <lauxlib.h>
  21. #include <lualib.h>
  22. }
  23. #include "server.h"
  24. #include "s_async.h"
  25. #include "log.h"
  26. #include "filesys.h"
  27. #include "porting.h"
  28. #include "common/c_internal.h"
  29. #include "common/c_packer.h"
  30. #include "lua_api/l_base.h"
  31. /******************************************************************************/
  32. AsyncEngine::~AsyncEngine()
  33. {
  34. // Request all threads to stop
  35. for (AsyncWorkerThread *workerThread : workerThreads) {
  36. workerThread->stop();
  37. }
  38. // Wake up all threads
  39. for (auto it : workerThreads) {
  40. (void)it;
  41. jobQueueCounter.post();
  42. }
  43. // Wait for threads to finish
  44. for (AsyncWorkerThread *workerThread : workerThreads) {
  45. workerThread->wait();
  46. }
  47. // Force kill all threads
  48. for (AsyncWorkerThread *workerThread : workerThreads) {
  49. delete workerThread;
  50. }
  51. jobQueueMutex.lock();
  52. jobQueue.clear();
  53. jobQueueMutex.unlock();
  54. workerThreads.clear();
  55. }
  56. /******************************************************************************/
  57. void AsyncEngine::registerStateInitializer(StateInitializer func)
  58. {
  59. FATAL_ERROR_IF(initDone, "Initializer may not be registered after init");
  60. stateInitializers.push_back(func);
  61. }
  62. /******************************************************************************/
  63. void AsyncEngine::initialize(unsigned int numEngines)
  64. {
  65. initDone = true;
  66. if (numEngines == 0) {
  67. // Leave one core for the main thread and one for whatever else
  68. autoscaleMaxWorkers = Thread::getNumberOfProcessors();
  69. if (autoscaleMaxWorkers >= 2)
  70. autoscaleMaxWorkers -= 2;
  71. infostream << "AsyncEngine: using at most " << autoscaleMaxWorkers
  72. << " threads with automatic scaling" << std::endl;
  73. addWorkerThread();
  74. } else {
  75. for (unsigned int i = 0; i < numEngines; i++)
  76. addWorkerThread();
  77. }
  78. }
  79. void AsyncEngine::addWorkerThread()
  80. {
  81. AsyncWorkerThread *toAdd = new AsyncWorkerThread(this,
  82. std::string("AsyncWorker-") + itos(workerThreads.size()));
  83. workerThreads.push_back(toAdd);
  84. toAdd->start();
  85. }
  86. /******************************************************************************/
  87. u32 AsyncEngine::queueAsyncJob(std::string &&func, std::string &&params,
  88. const std::string &mod_origin)
  89. {
  90. MutexAutoLock autolock(jobQueueMutex);
  91. u32 jobId = jobIdCounter++;
  92. jobQueue.emplace_back();
  93. auto &to_add = jobQueue.back();
  94. to_add.id = jobId;
  95. to_add.function = std::move(func);
  96. to_add.params = std::move(params);
  97. to_add.mod_origin = mod_origin;
  98. jobQueueCounter.post();
  99. return jobId;
  100. }
  101. u32 AsyncEngine::queueAsyncJob(std::string &&func, PackedValue *params,
  102. const std::string &mod_origin)
  103. {
  104. MutexAutoLock autolock(jobQueueMutex);
  105. u32 jobId = jobIdCounter++;
  106. jobQueue.emplace_back();
  107. auto &to_add = jobQueue.back();
  108. to_add.id = jobId;
  109. to_add.function = std::move(func);
  110. to_add.params_ext.reset(params);
  111. to_add.mod_origin = mod_origin;
  112. jobQueueCounter.post();
  113. return jobId;
  114. }
  115. /******************************************************************************/
  116. bool AsyncEngine::getJob(LuaJobInfo *job)
  117. {
  118. jobQueueCounter.wait();
  119. jobQueueMutex.lock();
  120. bool retval = false;
  121. if (!jobQueue.empty()) {
  122. *job = std::move(jobQueue.front());
  123. jobQueue.pop_front();
  124. retval = true;
  125. }
  126. jobQueueMutex.unlock();
  127. return retval;
  128. }
  129. /******************************************************************************/
  130. void AsyncEngine::putJobResult(LuaJobInfo &&result)
  131. {
  132. resultQueueMutex.lock();
  133. resultQueue.emplace_back(std::move(result));
  134. resultQueueMutex.unlock();
  135. }
  136. /******************************************************************************/
  137. void AsyncEngine::step(lua_State *L)
  138. {
  139. stepJobResults(L);
  140. stepAutoscale();
  141. }
  142. void AsyncEngine::stepJobResults(lua_State *L)
  143. {
  144. int error_handler = PUSH_ERROR_HANDLER(L);
  145. lua_getglobal(L, "core");
  146. ScriptApiBase *script = ModApiBase::getScriptApiBase(L);
  147. MutexAutoLock autolock(resultQueueMutex);
  148. while (!resultQueue.empty()) {
  149. LuaJobInfo j = std::move(resultQueue.front());
  150. resultQueue.pop_front();
  151. lua_getfield(L, -1, "async_event_handler");
  152. if (lua_isnil(L, -1))
  153. FATAL_ERROR("Async event handler does not exist!");
  154. luaL_checktype(L, -1, LUA_TFUNCTION);
  155. lua_pushinteger(L, j.id);
  156. if (j.result_ext)
  157. script_unpack(L, j.result_ext.get());
  158. else
  159. lua_pushlstring(L, j.result.data(), j.result.size());
  160. // Call handler
  161. const char *origin = j.mod_origin.empty() ? nullptr : j.mod_origin.c_str();
  162. script->setOriginDirect(origin);
  163. int result = lua_pcall(L, 2, 0, error_handler);
  164. if (result)
  165. script_error(L, result, origin, "<async>");
  166. }
  167. lua_pop(L, 2); // Pop core and error handler
  168. }
  169. void AsyncEngine::stepAutoscale()
  170. {
  171. if (workerThreads.size() >= autoscaleMaxWorkers)
  172. return;
  173. MutexAutoLock autolock(jobQueueMutex);
  174. // 2) If the timer elapsed, check again
  175. if (autoscaleTimer && porting::getTimeMs() >= autoscaleTimer) {
  176. autoscaleTimer = 0;
  177. // Determine overlap with previous snapshot
  178. unsigned int n = 0;
  179. for (const auto &it : jobQueue)
  180. n += autoscaleSeenJobs.count(it.id);
  181. autoscaleSeenJobs.clear();
  182. infostream << "AsyncEngine: " << n << " jobs were still waiting after 1s" << std::endl;
  183. // Start this many new threads
  184. while (workerThreads.size() < autoscaleMaxWorkers && n > 0) {
  185. addWorkerThread();
  186. n--;
  187. }
  188. return;
  189. }
  190. // 1) Check if there's anything in the queue
  191. if (!autoscaleTimer && !jobQueue.empty()) {
  192. // Take a snapshot of all jobs we have seen
  193. for (const auto &it : jobQueue)
  194. autoscaleSeenJobs.emplace(it.id);
  195. // and set a timer for 1 second
  196. autoscaleTimer = porting::getTimeMs() + 1000;
  197. }
  198. }
  199. /******************************************************************************/
  200. bool AsyncEngine::prepareEnvironment(lua_State* L, int top)
  201. {
  202. for (StateInitializer &stateInitializer : stateInitializers) {
  203. stateInitializer(L, top);
  204. }
  205. auto *script = ModApiBase::getScriptApiBase(L);
  206. try {
  207. script->loadMod(Server::getBuiltinLuaPath() + DIR_DELIM + "init.lua",
  208. BUILTIN_MOD_NAME);
  209. script->checkSetByBuiltin();
  210. } catch (const ModError &e) {
  211. errorstream << "Execution of async base environment failed: "
  212. << e.what() << std::endl;
  213. FATAL_ERROR("Execution of async base environment failed");
  214. }
  215. // Load per mod stuff
  216. if (server) {
  217. const auto &list = server->m_async_init_files;
  218. try {
  219. for (auto &it : list)
  220. script->loadMod(it.second, it.first);
  221. } catch (const ModError &e) {
  222. errorstream << "Failed to load mod script inside async environment." << std::endl;
  223. server->setAsyncFatalError(e.what());
  224. return false;
  225. }
  226. }
  227. return true;
  228. }
  229. /******************************************************************************/
  230. AsyncWorkerThread::AsyncWorkerThread(AsyncEngine* jobDispatcher,
  231. const std::string &name) :
  232. ScriptApiBase(ScriptingType::Async),
  233. Thread(name),
  234. jobDispatcher(jobDispatcher)
  235. {
  236. lua_State *L = getStack();
  237. if (jobDispatcher->server) {
  238. setGameDef(jobDispatcher->server);
  239. if (g_settings->getBool("secure.enable_security"))
  240. initializeSecurity();
  241. }
  242. // Prepare job lua environment
  243. lua_getglobal(L, "core");
  244. int top = lua_gettop(L);
  245. // Push builtin initialization type
  246. lua_pushstring(L, jobDispatcher->server ? "async_game" : "async");
  247. lua_setglobal(L, "INIT");
  248. if (!jobDispatcher->prepareEnvironment(L, top)) {
  249. // can't throw from here so we're stuck with this
  250. isErrored = true;
  251. }
  252. }
  253. /******************************************************************************/
  254. AsyncWorkerThread::~AsyncWorkerThread()
  255. {
  256. sanity_check(!isRunning());
  257. }
  258. /******************************************************************************/
  259. void* AsyncWorkerThread::run()
  260. {
  261. if (isErrored)
  262. return nullptr;
  263. lua_State *L = getStack();
  264. int error_handler = PUSH_ERROR_HANDLER(L);
  265. auto report_error = [this] (const ModError &e) {
  266. if (jobDispatcher->server)
  267. jobDispatcher->server->setAsyncFatalError(e.what());
  268. else
  269. errorstream << e.what() << std::endl;
  270. };
  271. lua_getglobal(L, "core");
  272. if (lua_isnil(L, -1)) {
  273. FATAL_ERROR("Unable to find core within async environment!");
  274. }
  275. // Main loop
  276. LuaJobInfo j;
  277. while (!stopRequested()) {
  278. // Wait for job
  279. if (!jobDispatcher->getJob(&j) || stopRequested())
  280. continue;
  281. const bool use_ext = !!j.params_ext;
  282. lua_getfield(L, -1, "job_processor");
  283. if (lua_isnil(L, -1))
  284. FATAL_ERROR("Unable to get async job processor!");
  285. luaL_checktype(L, -1, LUA_TFUNCTION);
  286. if (luaL_loadbuffer(L, j.function.data(), j.function.size(), "=(async)")) {
  287. errorstream << "ASYNC WORKER: Unable to deserialize function" << std::endl;
  288. lua_pushnil(L);
  289. }
  290. if (use_ext)
  291. script_unpack(L, j.params_ext.get());
  292. else
  293. lua_pushlstring(L, j.params.data(), j.params.size());
  294. // Call it
  295. setOriginDirect(j.mod_origin.empty() ? nullptr : j.mod_origin.c_str());
  296. int result = lua_pcall(L, 2, 1, error_handler);
  297. if (result) {
  298. try {
  299. scriptError(result, "<async>");
  300. } catch (const ModError &e) {
  301. report_error(e);
  302. }
  303. } else {
  304. // Fetch result
  305. if (use_ext) {
  306. try {
  307. j.result_ext.reset(script_pack(L, -1));
  308. } catch (const ModError &e) {
  309. report_error(e);
  310. result = LUA_ERRERR;
  311. }
  312. } else {
  313. size_t length;
  314. const char *retval = lua_tolstring(L, -1, &length);
  315. j.result.assign(retval, length);
  316. }
  317. }
  318. lua_pop(L, 1); // Pop retval
  319. // Put job result
  320. if (result == 0)
  321. jobDispatcher->putJobResult(std::move(j));
  322. }
  323. lua_pop(L, 2); // Pop core and error handler
  324. return 0;
  325. }