httpfetch.cpp 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826
  1. /*
  2. Minetest
  3. Copyright (C) 2013 celeron55, Perttu Ahola <celeron55@gmail.com>
  4. This program is free software; you can redistribute it and/or modify
  5. it under the terms of the GNU Lesser General Public License as published by
  6. the Free Software Foundation; either version 2.1 of the License, or
  7. (at your option) any later version.
  8. This program is distributed in the hope that it will be useful,
  9. but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  11. GNU Lesser General Public License for more details.
  12. You should have received a copy of the GNU Lesser General Public License along
  13. with this program; if not, write to the Free Software Foundation, Inc.,
  14. 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  15. */
  16. #include "httpfetch.h"
  17. #include "porting.h" // for sleep_ms(), get_sysinfo(), secure_rand_fill_buf()
  18. #include <list>
  19. #include <unordered_map>
  20. #include <cerrno>
  21. #include <mutex>
  22. #include "threading/event.h"
  23. #include "config.h"
  24. #include "exceptions.h"
  25. #include "debug.h"
  26. #include "log.h"
  27. #include "porting.h"
  28. #include "util/container.h"
  29. #include "util/thread.h"
  30. #include "version.h"
  31. #include "settings.h"
  32. #include "noise.h"
  33. static std::mutex g_httpfetch_mutex;
  34. static std::unordered_map<u64, std::queue<HTTPFetchResult>>
  35. g_httpfetch_results;
  36. static PcgRandom g_callerid_randomness;
  37. HTTPFetchRequest::HTTPFetchRequest() :
  38. timeout(g_settings->getS32("curl_timeout")),
  39. connect_timeout(10 * 1000),
  40. useragent(std::string(PROJECT_NAME_C "/") + g_version_hash + " (" + porting::get_sysinfo() + ")")
  41. {
  42. timeout = std::max(timeout, MIN_HTTPFETCH_TIMEOUT_INTERACTIVE);
  43. }
  44. static void httpfetch_deliver_result(const HTTPFetchResult &fetch_result)
  45. {
  46. u64 caller = fetch_result.caller;
  47. if (caller != HTTPFETCH_DISCARD) {
  48. MutexAutoLock lock(g_httpfetch_mutex);
  49. g_httpfetch_results[caller].emplace(fetch_result);
  50. }
  51. }
  52. static void httpfetch_request_clear(u64 caller);
  53. u64 httpfetch_caller_alloc()
  54. {
  55. MutexAutoLock lock(g_httpfetch_mutex);
  56. // Check each caller ID except reserved ones
  57. for (u64 caller = HTTPFETCH_CID_START; caller != 0; ++caller) {
  58. auto it = g_httpfetch_results.find(caller);
  59. if (it == g_httpfetch_results.end()) {
  60. verbosestream << "httpfetch_caller_alloc: allocating "
  61. << caller << std::endl;
  62. // Access element to create it
  63. g_httpfetch_results[caller];
  64. return caller;
  65. }
  66. }
  67. FATAL_ERROR("httpfetch_caller_alloc: ran out of caller IDs");
  68. }
  69. u64 httpfetch_caller_alloc_secure()
  70. {
  71. MutexAutoLock lock(g_httpfetch_mutex);
  72. // Generate random caller IDs and make sure they're not
  73. // already used or reserved.
  74. // Give up after 100 tries to prevent infinite loop
  75. size_t tries = 100;
  76. u64 caller;
  77. do {
  78. caller = (((u64) g_callerid_randomness.next()) << 32) |
  79. g_callerid_randomness.next();
  80. if (--tries < 1) {
  81. FATAL_ERROR("httpfetch_caller_alloc_secure: ran out of caller IDs");
  82. return HTTPFETCH_DISCARD;
  83. }
  84. } while (caller >= HTTPFETCH_CID_START &&
  85. g_httpfetch_results.find(caller) != g_httpfetch_results.end());
  86. verbosestream << "httpfetch_caller_alloc_secure: allocating "
  87. << caller << std::endl;
  88. // Access element to create it
  89. g_httpfetch_results[caller];
  90. return caller;
  91. }
  92. void httpfetch_caller_free(u64 caller)
  93. {
  94. verbosestream<<"httpfetch_caller_free: freeing "
  95. <<caller<<std::endl;
  96. httpfetch_request_clear(caller);
  97. if (caller != HTTPFETCH_DISCARD) {
  98. MutexAutoLock lock(g_httpfetch_mutex);
  99. g_httpfetch_results.erase(caller);
  100. }
  101. }
  102. bool httpfetch_async_get(u64 caller, HTTPFetchResult &fetch_result)
  103. {
  104. MutexAutoLock lock(g_httpfetch_mutex);
  105. // Check that caller exists
  106. auto it = g_httpfetch_results.find(caller);
  107. if (it == g_httpfetch_results.end())
  108. return false;
  109. // Check that result queue is nonempty
  110. std::queue<HTTPFetchResult> &caller_results = it->second;
  111. if (caller_results.empty())
  112. return false;
  113. // Pop first result
  114. fetch_result = std::move(caller_results.front());
  115. caller_results.pop();
  116. return true;
  117. }
  118. #if USE_CURL
  119. #include <curl/curl.h>
  120. /*
  121. USE_CURL is on: use cURL based httpfetch implementation
  122. */
  123. static size_t httpfetch_writefunction(
  124. char *ptr, size_t size, size_t nmemb, void *userdata)
  125. {
  126. auto *dest = reinterpret_cast<std::string*>(userdata);
  127. size_t count = size * nmemb;
  128. dest->append(ptr, count);
  129. return count;
  130. }
  131. static size_t httpfetch_discardfunction(
  132. char *ptr, size_t size, size_t nmemb, void *userdata)
  133. {
  134. return size * nmemb;
  135. }
  136. class CurlHandlePool
  137. {
  138. std::vector<CURL*> handles;
  139. public:
  140. CurlHandlePool() = default;
  141. ~CurlHandlePool()
  142. {
  143. for (CURL *it : handles) {
  144. curl_easy_cleanup(it);
  145. }
  146. }
  147. CURL * alloc()
  148. {
  149. CURL *curl;
  150. if (handles.empty()) {
  151. curl = curl_easy_init();
  152. if (!curl)
  153. throw std::bad_alloc();
  154. } else {
  155. curl = handles.back();
  156. handles.pop_back();
  157. }
  158. return curl;
  159. }
  160. void free(CURL *handle)
  161. {
  162. if (handle)
  163. handles.push_back(handle);
  164. }
  165. };
  166. class HTTPFetchOngoing
  167. {
  168. public:
  169. HTTPFetchOngoing(const HTTPFetchRequest &request, CurlHandlePool *pool);
  170. ~HTTPFetchOngoing();
  171. CURLcode start(CURLM *multi);
  172. const HTTPFetchResult * complete(CURLcode res);
  173. const HTTPFetchRequest &getRequest() const { return request; };
  174. const CURL *getEasyHandle() const { return curl; };
  175. private:
  176. CurlHandlePool *pool;
  177. CURL *curl = nullptr;
  178. CURLM *multi = nullptr;
  179. HTTPFetchRequest request;
  180. HTTPFetchResult result;
  181. struct curl_slist *http_header = nullptr;
  182. curl_mime *multipart_mime = nullptr;
  183. };
  184. HTTPFetchOngoing::HTTPFetchOngoing(const HTTPFetchRequest &request_,
  185. CurlHandlePool *pool_):
  186. pool(pool_),
  187. request(request_),
  188. result(request_)
  189. {
  190. curl = pool->alloc();
  191. if (!curl)
  192. return;
  193. // Set static cURL options
  194. curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1);
  195. curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1);
  196. curl_easy_setopt(curl, CURLOPT_MAXREDIRS, 3);
  197. curl_easy_setopt(curl, CURLOPT_ACCEPT_ENCODING, ""); // = all supported ones
  198. std::string bind_address = g_settings->get("bind_address");
  199. if (!bind_address.empty()) {
  200. curl_easy_setopt(curl, CURLOPT_INTERFACE, bind_address.c_str());
  201. }
  202. if (!g_settings->getBool("enable_ipv6")) {
  203. curl_easy_setopt(curl, CURLOPT_IPRESOLVE, CURL_IPRESOLVE_V4);
  204. }
  205. // Restrict protocols so that curl vulnerabilities in
  206. // other protocols don't affect us.
  207. #if LIBCURL_VERSION_NUM >= 0x075500
  208. // These settings were introduced in curl 7.85.0.
  209. const char *protocols = "HTTP,HTTPS,FTP,FTPS";
  210. curl_easy_setopt(curl, CURLOPT_PROTOCOLS_STR, protocols);
  211. curl_easy_setopt(curl, CURLOPT_REDIR_PROTOCOLS_STR, protocols);
  212. #elif LIBCURL_VERSION_NUM >= 0x071304
  213. // These settings were introduced in curl 7.19.4, and later deprecated.
  214. long protocols =
  215. CURLPROTO_HTTP |
  216. CURLPROTO_HTTPS |
  217. CURLPROTO_FTP |
  218. CURLPROTO_FTPS;
  219. curl_easy_setopt(curl, CURLOPT_PROTOCOLS, protocols);
  220. curl_easy_setopt(curl, CURLOPT_REDIR_PROTOCOLS, protocols);
  221. #endif
  222. // Set cURL options based on HTTPFetchRequest
  223. curl_easy_setopt(curl, CURLOPT_URL,
  224. request.url.c_str());
  225. curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS,
  226. request.timeout);
  227. curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT_MS,
  228. request.connect_timeout);
  229. if (!request.useragent.empty())
  230. curl_easy_setopt(curl, CURLOPT_USERAGENT, request.useragent.c_str());
  231. // Set up a write callback that writes to the
  232. // result struct, unless the data is to be discarded
  233. if (request.caller == HTTPFETCH_DISCARD) {
  234. curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION,
  235. httpfetch_discardfunction);
  236. curl_easy_setopt(curl, CURLOPT_WRITEDATA, nullptr);
  237. } else {
  238. curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION,
  239. httpfetch_writefunction);
  240. curl_easy_setopt(curl, CURLOPT_WRITEDATA, &result.data);
  241. }
  242. // Set data from fields or raw_data
  243. if (request.multipart) {
  244. multipart_mime = curl_mime_init(curl);
  245. for (auto &it : request.fields) {
  246. curl_mimepart *part = curl_mime_addpart(multipart_mime);
  247. curl_mime_name(part, it.first.c_str());
  248. curl_mime_data(part, it.second.c_str(), it.second.size());
  249. }
  250. curl_easy_setopt(curl, CURLOPT_MIMEPOST, multipart_mime);
  251. } else {
  252. switch (request.method) {
  253. case HTTP_GET:
  254. curl_easy_setopt(curl, CURLOPT_HTTPGET, 1);
  255. break;
  256. case HTTP_POST:
  257. curl_easy_setopt(curl, CURLOPT_POST, 1);
  258. break;
  259. case HTTP_PUT:
  260. curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "PUT");
  261. break;
  262. case HTTP_DELETE:
  263. curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "DELETE");
  264. break;
  265. }
  266. if (request.method != HTTP_GET) {
  267. if (!request.raw_data.empty()) {
  268. curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE,
  269. request.raw_data.size());
  270. curl_easy_setopt(curl, CURLOPT_POSTFIELDS,
  271. request.raw_data.c_str());
  272. } else if (!request.fields.empty()) {
  273. std::string str;
  274. for (auto &field : request.fields) {
  275. if (!str.empty())
  276. str += "&";
  277. str += urlencode(field.first);
  278. str += "=";
  279. str += urlencode(field.second);
  280. }
  281. curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE,
  282. str.size());
  283. curl_easy_setopt(curl, CURLOPT_COPYPOSTFIELDS,
  284. str.c_str());
  285. }
  286. }
  287. }
  288. // Set additional HTTP headers
  289. for (const std::string &extra_header : request.extra_headers) {
  290. http_header = curl_slist_append(http_header, extra_header.c_str());
  291. }
  292. curl_easy_setopt(curl, CURLOPT_HTTPHEADER, http_header);
  293. if (!g_settings->getBool("curl_verify_cert")) {
  294. curl_easy_setopt(curl, CURLOPT_SSL_VERIFYPEER, false);
  295. }
  296. }
  297. CURLcode HTTPFetchOngoing::start(CURLM *multi_)
  298. {
  299. if (!curl)
  300. return CURLE_FAILED_INIT;
  301. if (!multi_) {
  302. // Easy interface (sync)
  303. return curl_easy_perform(curl);
  304. }
  305. // Multi interface (async)
  306. CURLMcode mres = curl_multi_add_handle(multi_, curl);
  307. if (mres != CURLM_OK) {
  308. errorstream << "curl_multi_add_handle"
  309. << " returned error code " << mres
  310. << std::endl;
  311. return CURLE_FAILED_INIT;
  312. }
  313. multi = multi_; // store for curl_multi_remove_handle
  314. return CURLE_OK;
  315. }
  316. const HTTPFetchResult * HTTPFetchOngoing::complete(CURLcode res)
  317. {
  318. result.succeeded = (res == CURLE_OK);
  319. result.timeout = (res == CURLE_OPERATION_TIMEDOUT);
  320. // Get HTTP/FTP response code
  321. result.response_code = 0;
  322. if (curl && (curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE,
  323. &result.response_code) != CURLE_OK)) {
  324. // We failed to get a return code, make sure it is still 0
  325. result.response_code = 0;
  326. }
  327. if (res != CURLE_OK) {
  328. errorstream << "HTTPFetch for " << request.url << " failed: "
  329. << curl_easy_strerror(res);
  330. if (result.timeout)
  331. errorstream << " (timeout = " << request.timeout << "ms)" << std::endl;
  332. errorstream << std::endl;
  333. } else if (result.response_code >= 400) {
  334. errorstream << "HTTPFetch for " << request.url
  335. << " returned response code " << result.response_code
  336. << std::endl;
  337. if (result.caller == HTTPFETCH_PRINT_ERR && !result.data.empty()) {
  338. errorstream << "Response body:" << std::endl;
  339. safe_print_string(errorstream, result.data);
  340. errorstream << std::endl;
  341. }
  342. }
  343. return &result;
  344. }
  345. HTTPFetchOngoing::~HTTPFetchOngoing()
  346. {
  347. if (multi) {
  348. CURLMcode mres = curl_multi_remove_handle(multi, curl);
  349. if (mres != CURLM_OK) {
  350. errorstream << "curl_multi_remove_handle"
  351. << " returned error code " << mres
  352. << std::endl;
  353. }
  354. }
  355. // Set safe options for the reusable cURL handle
  356. curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION,
  357. httpfetch_discardfunction);
  358. curl_easy_setopt(curl, CURLOPT_USERAGENT, nullptr);
  359. curl_easy_setopt(curl, CURLOPT_WRITEDATA, nullptr);
  360. curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, nullptr);
  361. curl_easy_setopt(curl, CURLOPT_POSTFIELDS, nullptr);
  362. if (http_header) {
  363. curl_easy_setopt(curl, CURLOPT_HTTPHEADER, nullptr);
  364. curl_slist_free_all(http_header);
  365. }
  366. if (multipart_mime) {
  367. curl_easy_setopt(curl, CURLOPT_MIMEPOST, nullptr);
  368. curl_mime_free(multipart_mime);
  369. }
  370. // Store the cURL handle for reuse
  371. pool->free(curl);
  372. }
  373. #if LIBCURL_VERSION_NUM >= 0x074200
  374. #define HAVE_CURL_MULTI_POLL
  375. #else
  376. #undef HAVE_CURL_MULTI_POLL
  377. #endif
  378. class CurlFetchThread : public Thread
  379. {
  380. protected:
  381. enum RequestType {
  382. RT_FETCH,
  383. RT_CLEAR,
  384. RT_WAKEUP,
  385. };
  386. struct Request {
  387. RequestType type;
  388. HTTPFetchRequest fetch_request;
  389. Event *event = nullptr;
  390. };
  391. CURLM *m_multi;
  392. MutexedQueue<Request> m_requests;
  393. size_t m_parallel_limit;
  394. // Variables exclusively used within thread
  395. std::vector<std::unique_ptr<HTTPFetchOngoing>> m_all_ongoing;
  396. std::list<HTTPFetchRequest> m_queued_fetches;
  397. public:
  398. CurlFetchThread(int parallel_limit) :
  399. Thread("CurlFetch")
  400. {
  401. if (parallel_limit >= 1)
  402. m_parallel_limit = parallel_limit;
  403. else
  404. m_parallel_limit = 1;
  405. }
  406. void requestFetch(const HTTPFetchRequest &fetch_request)
  407. {
  408. Request req;
  409. req.type = RT_FETCH;
  410. req.fetch_request = fetch_request;
  411. m_requests.push_back(std::move(req));
  412. }
  413. void requestClear(u64 caller, Event *event)
  414. {
  415. Request req;
  416. req.type = RT_CLEAR;
  417. req.fetch_request.caller = caller;
  418. req.event = event;
  419. m_requests.push_back(std::move(req));
  420. }
  421. void requestWakeUp()
  422. {
  423. Request req;
  424. req.type = RT_WAKEUP;
  425. m_requests.push_back(std::move(req));
  426. }
  427. protected:
  428. // Handle a request from some other thread
  429. // E.g. new fetch; clear fetches for one caller; wake up
  430. void processRequest(Request &req)
  431. {
  432. if (req.type == RT_FETCH) {
  433. // New fetch, queue until there are less
  434. // than m_parallel_limit ongoing fetches
  435. m_queued_fetches.push_back(std::move(req.fetch_request));
  436. // see processQueued() for what happens next
  437. } else if (req.type == RT_CLEAR) {
  438. u64 caller = req.fetch_request.caller;
  439. // Abort all ongoing fetches for the caller
  440. for (auto it = m_all_ongoing.begin(); it != m_all_ongoing.end();) {
  441. if ((*it)->getRequest().caller == caller) {
  442. it = m_all_ongoing.erase(it);
  443. } else {
  444. ++it;
  445. }
  446. }
  447. // Also abort all queued fetches for the caller
  448. for (auto it = m_queued_fetches.begin();
  449. it != m_queued_fetches.end();) {
  450. if ((*it).caller == caller)
  451. it = m_queued_fetches.erase(it);
  452. else
  453. ++it;
  454. }
  455. } else if (req.type == RT_WAKEUP) {
  456. // Wakeup: Nothing to do, thread is awake at this point
  457. }
  458. if (req.event)
  459. req.event->signal();
  460. }
  461. // Start new ongoing fetches if m_parallel_limit allows
  462. void processQueued(CurlHandlePool *pool)
  463. {
  464. while (m_all_ongoing.size() < m_parallel_limit &&
  465. !m_queued_fetches.empty()) {
  466. HTTPFetchRequest request = std::move(m_queued_fetches.front());
  467. m_queued_fetches.pop_front();
  468. // Create ongoing fetch data and make a cURL handle
  469. // Set cURL options based on HTTPFetchRequest
  470. auto ongoing = std::make_unique<HTTPFetchOngoing>(request, pool);
  471. // Initiate the connection (curl_multi_add_handle)
  472. CURLcode res = ongoing->start(m_multi);
  473. if (res == CURLE_OK) {
  474. m_all_ongoing.push_back(std::move(ongoing));
  475. } else {
  476. httpfetch_deliver_result(*ongoing->complete(res));
  477. }
  478. }
  479. }
  480. // Process CURLMsg (indicates completion of a fetch)
  481. void processCurlMessage(CURLMsg *msg)
  482. {
  483. if (msg->msg != CURLMSG_DONE)
  484. return;
  485. // Determine which ongoing fetch the message pertains to
  486. for (auto it = m_all_ongoing.begin(); it != m_all_ongoing.end(); ++it) {
  487. auto &ongoing = **it;
  488. if (ongoing.getEasyHandle() != msg->easy_handle)
  489. continue;
  490. httpfetch_deliver_result(*ongoing.complete(msg->data.result));
  491. m_all_ongoing.erase(it);
  492. return;
  493. }
  494. }
  495. // Wait for a request from another thread, or timeout elapses
  496. void waitForRequest(long timeout)
  497. {
  498. if (m_queued_fetches.empty()) {
  499. try {
  500. Request req = m_requests.pop_front(timeout);
  501. processRequest(req);
  502. }
  503. catch (ItemNotFoundException &e) {}
  504. }
  505. }
  506. // Wait until some IO happens, or timeout elapses
  507. void waitForIO(long timeout)
  508. {
  509. CURLMcode mres;
  510. #ifdef HAVE_CURL_MULTI_POLL
  511. mres = curl_multi_poll(m_multi, nullptr, 0, timeout, nullptr);
  512. if (mres != CURLM_OK) {
  513. errorstream << "curl_multi_poll returned error code "
  514. << mres << std::endl;
  515. }
  516. #else
  517. // If there's nothing to do curl_multi_wait() will immediately return
  518. // so we have to emulate the sleeping.
  519. fd_set dummy;
  520. int max_fd;
  521. mres = curl_multi_fdset(m_multi, &dummy, &dummy, &dummy, &max_fd);
  522. if (mres != CURLM_OK) {
  523. errorstream << "curl_multi_fdset returned error code "
  524. << mres << std::endl;
  525. max_fd = -1;
  526. }
  527. if (max_fd == -1) { // curl has nothing to wait for
  528. if (timeout > 0)
  529. sleep_ms(timeout);
  530. } else {
  531. mres = curl_multi_wait(m_multi, nullptr, 0, timeout, nullptr);
  532. if (mres != CURLM_OK) {
  533. errorstream << "curl_multi_wait returned error code "
  534. << mres << std::endl;
  535. }
  536. }
  537. #endif
  538. }
  539. void *run()
  540. {
  541. CurlHandlePool pool;
  542. m_multi = curl_multi_init();
  543. FATAL_ERROR_IF(!m_multi, "curl_multi_init returned NULL");
  544. FATAL_ERROR_IF(!m_all_ongoing.empty(), "Expected empty");
  545. while (!stopRequested()) {
  546. BEGIN_DEBUG_EXCEPTION_HANDLER
  547. /*
  548. Handle new async requests
  549. */
  550. while (!m_requests.empty()) {
  551. Request req = m_requests.pop_frontNoEx();
  552. processRequest(req);
  553. }
  554. processQueued(&pool);
  555. /*
  556. Handle ongoing async requests
  557. */
  558. int still_ongoing = 0;
  559. while (curl_multi_perform(m_multi, &still_ongoing) ==
  560. CURLM_CALL_MULTI_PERFORM)
  561. /* noop */;
  562. /*
  563. Handle completed async requests
  564. */
  565. if (still_ongoing < (int) m_all_ongoing.size()) {
  566. CURLMsg *msg;
  567. int msgs_in_queue;
  568. msg = curl_multi_info_read(m_multi, &msgs_in_queue);
  569. while (msg != NULL) {
  570. processCurlMessage(msg);
  571. msg = curl_multi_info_read(m_multi, &msgs_in_queue);
  572. }
  573. }
  574. /*
  575. If there are ongoing requests, wait for data
  576. (with a timeout of 100ms so that new requests
  577. can be processed).
  578. If no ongoing requests, wait for a new request.
  579. (Possibly an empty request that signals
  580. that the thread should be stopped.)
  581. */
  582. if (m_all_ongoing.empty())
  583. waitForRequest(100000000);
  584. else
  585. waitForIO(100);
  586. END_DEBUG_EXCEPTION_HANDLER
  587. }
  588. // Call curl_multi_remove_handle and cleanup easy handles
  589. m_all_ongoing.clear();
  590. m_queued_fetches.clear();
  591. CURLMcode mres = curl_multi_cleanup(m_multi);
  592. if (mres != CURLM_OK) {
  593. errorstream<<"curl_multi_cleanup"
  594. <<" returned error code "<<mres
  595. <<std::endl;
  596. }
  597. return NULL;
  598. }
  599. };
  600. static std::unique_ptr<CurlFetchThread> g_httpfetch_thread;
  601. void httpfetch_init(int parallel_limit)
  602. {
  603. FATAL_ERROR_IF(g_httpfetch_thread, "httpfetch_init called twice");
  604. verbosestream<<"httpfetch_init: parallel_limit="<<parallel_limit
  605. <<std::endl;
  606. CURLcode res = curl_global_init(CURL_GLOBAL_DEFAULT);
  607. FATAL_ERROR_IF(res != CURLE_OK, "cURL init failed");
  608. g_httpfetch_thread = std::make_unique<CurlFetchThread>(parallel_limit);
  609. // Initialize g_callerid_randomness for httpfetch_caller_alloc_secure
  610. u64 randbuf[2];
  611. porting::secure_rand_fill_buf(randbuf, sizeof(u64) * 2);
  612. g_callerid_randomness = PcgRandom(randbuf[0], randbuf[1]);
  613. }
  614. void httpfetch_cleanup()
  615. {
  616. verbosestream<<"httpfetch_cleanup: cleaning up"<<std::endl;
  617. if (g_httpfetch_thread) {
  618. g_httpfetch_thread->stop();
  619. g_httpfetch_thread->requestWakeUp();
  620. g_httpfetch_thread->wait();
  621. g_httpfetch_thread.reset();
  622. }
  623. curl_global_cleanup();
  624. }
  625. void httpfetch_async(const HTTPFetchRequest &fetch_request)
  626. {
  627. g_httpfetch_thread->requestFetch(fetch_request);
  628. if (!g_httpfetch_thread->isRunning())
  629. g_httpfetch_thread->start();
  630. }
  631. static void httpfetch_request_clear(u64 caller)
  632. {
  633. if (g_httpfetch_thread->isRunning()) {
  634. Event event;
  635. g_httpfetch_thread->requestClear(caller, &event);
  636. event.wait();
  637. } else {
  638. g_httpfetch_thread->requestClear(caller, nullptr);
  639. }
  640. }
  641. static void httpfetch_sync(const HTTPFetchRequest &fetch_request,
  642. HTTPFetchResult &fetch_result)
  643. {
  644. // Create ongoing fetch data and make a cURL handle
  645. // Set cURL options based on HTTPFetchRequest
  646. CurlHandlePool pool;
  647. HTTPFetchOngoing ongoing(fetch_request, &pool);
  648. // Do the fetch (curl_easy_perform)
  649. CURLcode res = ongoing.start(nullptr);
  650. // Update fetch result
  651. fetch_result = *ongoing.complete(res);
  652. }
  653. bool httpfetch_sync_interruptible(const HTTPFetchRequest &fetch_request,
  654. HTTPFetchResult &fetch_result, long interval)
  655. {
  656. if (Thread *thread = Thread::getCurrentThread()) {
  657. HTTPFetchRequest req = fetch_request;
  658. req.caller = httpfetch_caller_alloc_secure();
  659. httpfetch_async(req);
  660. do {
  661. if (thread->stopRequested()) {
  662. httpfetch_caller_free(req.caller);
  663. fetch_result = HTTPFetchResult(fetch_request);
  664. return false;
  665. }
  666. sleep_ms(interval);
  667. } while (!httpfetch_async_get(req.caller, fetch_result));
  668. httpfetch_caller_free(req.caller);
  669. } else {
  670. httpfetch_sync(fetch_request, fetch_result);
  671. }
  672. return true;
  673. }
  674. #else // USE_CURL
  675. /*
  676. USE_CURL is off:
  677. Dummy httpfetch implementation that always returns an error.
  678. */
  679. void httpfetch_init(int parallel_limit)
  680. {
  681. }
  682. void httpfetch_cleanup()
  683. {
  684. }
  685. void httpfetch_async(const HTTPFetchRequest &fetch_request)
  686. {
  687. errorstream << "httpfetch_async: unable to fetch " << fetch_request.url
  688. << " because USE_CURL=0" << std::endl;
  689. HTTPFetchResult fetch_result(fetch_request); // sets succeeded = false etc.
  690. httpfetch_deliver_result(fetch_result);
  691. }
  692. static void httpfetch_request_clear(u64 caller)
  693. {
  694. }
  695. bool httpfetch_sync_interruptible(const HTTPFetchRequest &fetch_request,
  696. HTTPFetchResult &fetch_result, long interval)
  697. {
  698. errorstream << "httpfetch_sync_interruptible: unable to fetch " << fetch_request.url
  699. << " because USE_CURL=0" << std::endl;
  700. fetch_result = HTTPFetchResult(fetch_request); // sets succeeded = false etc.
  701. return false;
  702. }
  703. #endif // USE_CURL