s_async.cpp 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289
  1. /*
  2. Minetest
  3. Copyright (C) 2013 sapier, <sapier AT gmx DOT net>
  4. This program is free software; you can redistribute it and/or modify
  5. it under the terms of the GNU Lesser General Public License as published by
  6. the Free Software Foundation; either version 2.1 of the License, or
  7. (at your option) any later version.
  8. This program is distributed in the hope that it will be useful,
  9. but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  11. GNU Lesser General Public License for more details.
  12. You should have received a copy of the GNU Lesser General Public License along
  13. with this program; if not, write to the Free Software Foundation, Inc.,
  14. 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  15. */
  16. #include <cstdio>
  17. #include <cstdlib>
  18. extern "C" {
  19. #include "lua.h"
  20. #include "lauxlib.h"
  21. #include "lualib.h"
  22. }
  23. #include "server.h"
  24. #include "s_async.h"
  25. #include "log.h"
  26. #include "filesys.h"
  27. #include "porting.h"
  28. #include "common/c_internal.h"
  29. /******************************************************************************/
  30. AsyncEngine::~AsyncEngine()
  31. {
  32. // Request all threads to stop
  33. for (AsyncWorkerThread *workerThread : workerThreads) {
  34. workerThread->stop();
  35. }
  36. // Wake up all threads
  37. for (std::vector<AsyncWorkerThread *>::iterator it = workerThreads.begin();
  38. it != workerThreads.end(); ++it) {
  39. jobQueueCounter.post();
  40. }
  41. // Wait for threads to finish
  42. for (AsyncWorkerThread *workerThread : workerThreads) {
  43. workerThread->wait();
  44. }
  45. // Force kill all threads
  46. for (AsyncWorkerThread *workerThread : workerThreads) {
  47. delete workerThread;
  48. }
  49. jobQueueMutex.lock();
  50. jobQueue.clear();
  51. jobQueueMutex.unlock();
  52. workerThreads.clear();
  53. }
  54. /******************************************************************************/
  55. void AsyncEngine::registerStateInitializer(StateInitializer func)
  56. {
  57. stateInitializers.push_back(func);
  58. }
  59. /******************************************************************************/
  60. void AsyncEngine::initialize(unsigned int numEngines)
  61. {
  62. initDone = true;
  63. for (unsigned int i = 0; i < numEngines; i++) {
  64. AsyncWorkerThread *toAdd = new AsyncWorkerThread(this,
  65. std::string("AsyncWorker-") + itos(i));
  66. workerThreads.push_back(toAdd);
  67. toAdd->start();
  68. }
  69. }
  70. /******************************************************************************/
  71. unsigned int AsyncEngine::queueAsyncJob(const std::string &func,
  72. const std::string &params)
  73. {
  74. jobQueueMutex.lock();
  75. LuaJobInfo toAdd;
  76. toAdd.id = jobIdCounter++;
  77. toAdd.serializedFunction = func;
  78. toAdd.serializedParams = params;
  79. jobQueue.push_back(toAdd);
  80. jobQueueCounter.post();
  81. jobQueueMutex.unlock();
  82. return toAdd.id;
  83. }
  84. /******************************************************************************/
  85. LuaJobInfo AsyncEngine::getJob()
  86. {
  87. jobQueueCounter.wait();
  88. jobQueueMutex.lock();
  89. LuaJobInfo retval;
  90. if (!jobQueue.empty()) {
  91. retval = jobQueue.front();
  92. jobQueue.pop_front();
  93. retval.valid = true;
  94. }
  95. jobQueueMutex.unlock();
  96. return retval;
  97. }
  98. /******************************************************************************/
  99. void AsyncEngine::putJobResult(const LuaJobInfo &result)
  100. {
  101. resultQueueMutex.lock();
  102. resultQueue.push_back(result);
  103. resultQueueMutex.unlock();
  104. }
  105. /******************************************************************************/
  106. void AsyncEngine::step(lua_State *L)
  107. {
  108. int error_handler = PUSH_ERROR_HANDLER(L);
  109. lua_getglobal(L, "core");
  110. resultQueueMutex.lock();
  111. while (!resultQueue.empty()) {
  112. LuaJobInfo jobDone = resultQueue.front();
  113. resultQueue.pop_front();
  114. lua_getfield(L, -1, "async_event_handler");
  115. if (lua_isnil(L, -1)) {
  116. FATAL_ERROR("Async event handler does not exist!");
  117. }
  118. luaL_checktype(L, -1, LUA_TFUNCTION);
  119. lua_pushinteger(L, jobDone.id);
  120. lua_pushlstring(L, jobDone.serializedResult.data(),
  121. jobDone.serializedResult.size());
  122. PCALL_RESL(L, lua_pcall(L, 2, 0, error_handler));
  123. }
  124. resultQueueMutex.unlock();
  125. lua_pop(L, 2); // Pop core and error handler
  126. }
  127. /******************************************************************************/
  128. void AsyncEngine::pushFinishedJobs(lua_State* L) {
  129. // Result Table
  130. MutexAutoLock l(resultQueueMutex);
  131. unsigned int index = 1;
  132. lua_createtable(L, resultQueue.size(), 0);
  133. int top = lua_gettop(L);
  134. while (!resultQueue.empty()) {
  135. LuaJobInfo jobDone = resultQueue.front();
  136. resultQueue.pop_front();
  137. lua_createtable(L, 0, 2); // Pre-allocate space for two map fields
  138. int top_lvl2 = lua_gettop(L);
  139. lua_pushstring(L, "jobid");
  140. lua_pushnumber(L, jobDone.id);
  141. lua_settable(L, top_lvl2);
  142. lua_pushstring(L, "retval");
  143. lua_pushlstring(L, jobDone.serializedResult.data(),
  144. jobDone.serializedResult.size());
  145. lua_settable(L, top_lvl2);
  146. lua_rawseti(L, top, index++);
  147. }
  148. }
  149. /******************************************************************************/
  150. void AsyncEngine::prepareEnvironment(lua_State* L, int top)
  151. {
  152. for (StateInitializer &stateInitializer : stateInitializers) {
  153. stateInitializer(L, top);
  154. }
  155. }
  156. /******************************************************************************/
  157. AsyncWorkerThread::AsyncWorkerThread(AsyncEngine* jobDispatcher,
  158. const std::string &name) :
  159. Thread(name),
  160. ScriptApiBase(ScriptingType::Async),
  161. jobDispatcher(jobDispatcher)
  162. {
  163. lua_State *L = getStack();
  164. // Prepare job lua environment
  165. lua_getglobal(L, "core");
  166. int top = lua_gettop(L);
  167. // Push builtin initialization type
  168. lua_pushstring(L, "async");
  169. lua_setglobal(L, "INIT");
  170. jobDispatcher->prepareEnvironment(L, top);
  171. }
  172. /******************************************************************************/
  173. AsyncWorkerThread::~AsyncWorkerThread()
  174. {
  175. sanity_check(!isRunning());
  176. }
  177. /******************************************************************************/
  178. void* AsyncWorkerThread::run()
  179. {
  180. lua_State *L = getStack();
  181. std::string script = getServer()->getBuiltinLuaPath() + DIR_DELIM + "init.lua";
  182. try {
  183. loadScript(script);
  184. } catch (const ModError &e) {
  185. errorstream << "Execution of async base environment failed: "
  186. << e.what() << std::endl;
  187. FATAL_ERROR("Execution of async base environment failed");
  188. }
  189. int error_handler = PUSH_ERROR_HANDLER(L);
  190. lua_getglobal(L, "core");
  191. if (lua_isnil(L, -1)) {
  192. FATAL_ERROR("Unable to find core within async environment!");
  193. }
  194. // Main loop
  195. while (!stopRequested()) {
  196. // Wait for job
  197. LuaJobInfo toProcess = jobDispatcher->getJob();
  198. if (!toProcess.valid || stopRequested()) {
  199. continue;
  200. }
  201. lua_getfield(L, -1, "job_processor");
  202. if (lua_isnil(L, -1)) {
  203. FATAL_ERROR("Unable to get async job processor!");
  204. }
  205. luaL_checktype(L, -1, LUA_TFUNCTION);
  206. // Call it
  207. lua_pushlstring(L,
  208. toProcess.serializedFunction.data(),
  209. toProcess.serializedFunction.size());
  210. lua_pushlstring(L,
  211. toProcess.serializedParams.data(),
  212. toProcess.serializedParams.size());
  213. int result = lua_pcall(L, 2, 1, error_handler);
  214. if (result) {
  215. PCALL_RES(result);
  216. toProcess.serializedResult = "";
  217. } else {
  218. // Fetch result
  219. size_t length;
  220. const char *retval = lua_tolstring(L, -1, &length);
  221. toProcess.serializedResult = std::string(retval, length);
  222. }
  223. lua_pop(L, 1); // Pop retval
  224. // Put job result
  225. jobDispatcher->putJobResult(toProcess);
  226. }
  227. lua_pop(L, 2); // Pop core and error handler
  228. return 0;
  229. }