httpfetch.cpp 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826
  1. /*
  2. Minetest
  3. Copyright (C) 2013 celeron55, Perttu Ahola <celeron55@gmail.com>
  4. This program is free software; you can redistribute it and/or modify
  5. it under the terms of the GNU Lesser General Public License as published by
  6. the Free Software Foundation; either version 2.1 of the License, or
  7. (at your option) any later version.
  8. This program is distributed in the hope that it will be useful,
  9. but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  11. GNU Lesser General Public License for more details.
  12. You should have received a copy of the GNU Lesser General Public License along
  13. with this program; if not, write to the Free Software Foundation, Inc.,
  14. 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  15. */
  16. #include "httpfetch.h"
  17. #include "porting.h" // for sleep_ms(), get_sysinfo(), secure_rand_fill_buf()
  18. #include <iostream>
  19. #include <sstream>
  20. #include <list>
  21. #include <map>
  22. #include <cerrno>
  23. #include <mutex>
  24. #include "network/socket.h" // for select()
  25. #include "threading/event.h"
  26. #include "config.h"
  27. #include "exceptions.h"
  28. #include "debug.h"
  29. #include "log.h"
  30. #include "util/container.h"
  31. #include "util/thread.h"
  32. #include "version.h"
  33. #include "settings.h"
  34. #include "noise.h"
  35. std::mutex g_httpfetch_mutex;
  36. std::map<unsigned long, std::queue<HTTPFetchResult> > g_httpfetch_results;
  37. PcgRandom g_callerid_randomness;
  38. HTTPFetchRequest::HTTPFetchRequest() :
  39. timeout(g_settings->getS32("curl_timeout")),
  40. connect_timeout(timeout),
  41. useragent(std::string(PROJECT_NAME_C "/") + g_version_hash + " (" + porting::get_sysinfo() + ")")
  42. {
  43. }
  44. static void httpfetch_deliver_result(const HTTPFetchResult &fetch_result)
  45. {
  46. unsigned long caller = fetch_result.caller;
  47. if (caller != HTTPFETCH_DISCARD) {
  48. MutexAutoLock lock(g_httpfetch_mutex);
  49. g_httpfetch_results[caller].push(fetch_result);
  50. }
  51. }
  52. static void httpfetch_request_clear(unsigned long caller);
  53. unsigned long httpfetch_caller_alloc()
  54. {
  55. MutexAutoLock lock(g_httpfetch_mutex);
  56. // Check each caller ID except HTTPFETCH_DISCARD
  57. const unsigned long discard = HTTPFETCH_DISCARD;
  58. for (unsigned long caller = discard + 1; caller != discard; ++caller) {
  59. std::map<unsigned long, std::queue<HTTPFetchResult> >::iterator
  60. it = g_httpfetch_results.find(caller);
  61. if (it == g_httpfetch_results.end()) {
  62. verbosestream << "httpfetch_caller_alloc: allocating "
  63. << caller << std::endl;
  64. // Access element to create it
  65. g_httpfetch_results[caller];
  66. return caller;
  67. }
  68. }
  69. FATAL_ERROR("httpfetch_caller_alloc: ran out of caller IDs");
  70. return discard;
  71. }
  72. unsigned long httpfetch_caller_alloc_secure()
  73. {
  74. MutexAutoLock lock(g_httpfetch_mutex);
  75. // Generate random caller IDs and make sure they're not
  76. // already used or equal to HTTPFETCH_DISCARD
  77. // Give up after 100 tries to prevent infinite loop
  78. u8 tries = 100;
  79. unsigned long caller;
  80. do {
  81. caller = (((u64) g_callerid_randomness.next()) << 32) |
  82. g_callerid_randomness.next();
  83. if (--tries < 1) {
  84. FATAL_ERROR("httpfetch_caller_alloc_secure: ran out of caller IDs");
  85. return HTTPFETCH_DISCARD;
  86. }
  87. } while (g_httpfetch_results.find(caller) != g_httpfetch_results.end());
  88. verbosestream << "httpfetch_caller_alloc_secure: allocating "
  89. << caller << std::endl;
  90. // Access element to create it
  91. g_httpfetch_results[caller];
  92. return caller;
  93. }
  94. void httpfetch_caller_free(unsigned long caller)
  95. {
  96. verbosestream<<"httpfetch_caller_free: freeing "
  97. <<caller<<std::endl;
  98. httpfetch_request_clear(caller);
  99. if (caller != HTTPFETCH_DISCARD) {
  100. MutexAutoLock lock(g_httpfetch_mutex);
  101. g_httpfetch_results.erase(caller);
  102. }
  103. }
  104. bool httpfetch_async_get(unsigned long caller, HTTPFetchResult &fetch_result)
  105. {
  106. MutexAutoLock lock(g_httpfetch_mutex);
  107. // Check that caller exists
  108. std::map<unsigned long, std::queue<HTTPFetchResult> >::iterator
  109. it = g_httpfetch_results.find(caller);
  110. if (it == g_httpfetch_results.end())
  111. return false;
  112. // Check that result queue is nonempty
  113. std::queue<HTTPFetchResult> &caller_results = it->second;
  114. if (caller_results.empty())
  115. return false;
  116. // Pop first result
  117. fetch_result = caller_results.front();
  118. caller_results.pop();
  119. return true;
  120. }
  121. #if USE_CURL
  122. #include <curl/curl.h>
  123. /*
  124. USE_CURL is on: use cURL based httpfetch implementation
  125. */
  126. static size_t httpfetch_writefunction(
  127. char *ptr, size_t size, size_t nmemb, void *userdata)
  128. {
  129. std::ostringstream *stream = (std::ostringstream*)userdata;
  130. size_t count = size * nmemb;
  131. stream->write(ptr, count);
  132. return count;
  133. }
  134. static size_t httpfetch_discardfunction(
  135. char *ptr, size_t size, size_t nmemb, void *userdata)
  136. {
  137. return size * nmemb;
  138. }
  139. class CurlHandlePool
  140. {
  141. std::list<CURL*> handles;
  142. public:
  143. CurlHandlePool() = default;
  144. ~CurlHandlePool()
  145. {
  146. for (std::list<CURL*>::iterator it = handles.begin();
  147. it != handles.end(); ++it) {
  148. curl_easy_cleanup(*it);
  149. }
  150. }
  151. CURL * alloc()
  152. {
  153. CURL *curl;
  154. if (handles.empty()) {
  155. curl = curl_easy_init();
  156. if (curl == NULL) {
  157. errorstream<<"curl_easy_init returned NULL"<<std::endl;
  158. }
  159. }
  160. else {
  161. curl = handles.front();
  162. handles.pop_front();
  163. }
  164. return curl;
  165. }
  166. void free(CURL *handle)
  167. {
  168. if (handle)
  169. handles.push_back(handle);
  170. }
  171. };
  172. class HTTPFetchOngoing
  173. {
  174. public:
  175. HTTPFetchOngoing(const HTTPFetchRequest &request, CurlHandlePool *pool);
  176. ~HTTPFetchOngoing();
  177. CURLcode start(CURLM *multi);
  178. const HTTPFetchResult * complete(CURLcode res);
  179. const HTTPFetchRequest &getRequest() const { return request; };
  180. const CURL *getEasyHandle() const { return curl; };
  181. private:
  182. CurlHandlePool *pool;
  183. CURL *curl;
  184. CURLM *multi;
  185. HTTPFetchRequest request;
  186. HTTPFetchResult result;
  187. std::ostringstream oss;
  188. struct curl_slist *http_header;
  189. curl_httppost *post;
  190. };
  191. HTTPFetchOngoing::HTTPFetchOngoing(const HTTPFetchRequest &request_,
  192. CurlHandlePool *pool_):
  193. pool(pool_),
  194. curl(NULL),
  195. multi(NULL),
  196. request(request_),
  197. result(request_),
  198. oss(std::ios::binary),
  199. http_header(NULL),
  200. post(NULL)
  201. {
  202. curl = pool->alloc();
  203. if (curl == NULL) {
  204. return;
  205. }
  206. // Set static cURL options
  207. curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1);
  208. curl_easy_setopt(curl, CURLOPT_FAILONERROR, 1);
  209. curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1);
  210. curl_easy_setopt(curl, CURLOPT_MAXREDIRS, 3);
  211. curl_easy_setopt(curl, CURLOPT_ENCODING, "gzip");
  212. std::string bind_address = g_settings->get("bind_address");
  213. if (!bind_address.empty()) {
  214. curl_easy_setopt(curl, CURLOPT_INTERFACE, bind_address.c_str());
  215. }
  216. #if LIBCURL_VERSION_NUM >= 0x071304
  217. // Restrict protocols so that curl vulnerabilities in
  218. // other protocols don't affect us.
  219. // These settings were introduced in curl 7.19.4.
  220. long protocols =
  221. CURLPROTO_HTTP |
  222. CURLPROTO_HTTPS |
  223. CURLPROTO_FTP |
  224. CURLPROTO_FTPS;
  225. curl_easy_setopt(curl, CURLOPT_PROTOCOLS, protocols);
  226. curl_easy_setopt(curl, CURLOPT_REDIR_PROTOCOLS, protocols);
  227. #endif
  228. // Set cURL options based on HTTPFetchRequest
  229. curl_easy_setopt(curl, CURLOPT_URL,
  230. request.url.c_str());
  231. curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS,
  232. request.timeout);
  233. curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT_MS,
  234. request.connect_timeout);
  235. if (!request.useragent.empty())
  236. curl_easy_setopt(curl, CURLOPT_USERAGENT, request.useragent.c_str());
  237. // Set up a write callback that writes to the
  238. // ostringstream ongoing->oss, unless the data
  239. // is to be discarded
  240. if (request.caller == HTTPFETCH_DISCARD) {
  241. curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION,
  242. httpfetch_discardfunction);
  243. curl_easy_setopt(curl, CURLOPT_WRITEDATA, NULL);
  244. } else {
  245. curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION,
  246. httpfetch_writefunction);
  247. curl_easy_setopt(curl, CURLOPT_WRITEDATA, &oss);
  248. }
  249. // Set POST (or GET) data
  250. if (request.post_fields.empty() && request.post_data.empty()) {
  251. curl_easy_setopt(curl, CURLOPT_HTTPGET, 1);
  252. } else if (request.multipart) {
  253. curl_httppost *last = NULL;
  254. for (StringMap::iterator it = request.post_fields.begin();
  255. it != request.post_fields.end(); ++it) {
  256. curl_formadd(&post, &last,
  257. CURLFORM_NAMELENGTH, it->first.size(),
  258. CURLFORM_PTRNAME, it->first.c_str(),
  259. CURLFORM_CONTENTSLENGTH, it->second.size(),
  260. CURLFORM_PTRCONTENTS, it->second.c_str(),
  261. CURLFORM_END);
  262. }
  263. curl_easy_setopt(curl, CURLOPT_HTTPPOST, post);
  264. // request.post_fields must now *never* be
  265. // modified until CURLOPT_HTTPPOST is cleared
  266. } else if (request.post_data.empty()) {
  267. curl_easy_setopt(curl, CURLOPT_POST, 1);
  268. std::string str;
  269. for (auto &post_field : request.post_fields) {
  270. if (!str.empty())
  271. str += "&";
  272. str += urlencode(post_field.first);
  273. str += "=";
  274. str += urlencode(post_field.second);
  275. }
  276. curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE,
  277. str.size());
  278. curl_easy_setopt(curl, CURLOPT_COPYPOSTFIELDS,
  279. str.c_str());
  280. } else {
  281. curl_easy_setopt(curl, CURLOPT_POST, 1);
  282. curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE,
  283. request.post_data.size());
  284. curl_easy_setopt(curl, CURLOPT_POSTFIELDS,
  285. request.post_data.c_str());
  286. // request.post_data must now *never* be
  287. // modified until CURLOPT_POSTFIELDS is cleared
  288. }
  289. // Set additional HTTP headers
  290. for (const std::string &extra_header : request.extra_headers) {
  291. http_header = curl_slist_append(http_header, extra_header.c_str());
  292. }
  293. curl_easy_setopt(curl, CURLOPT_HTTPHEADER, http_header);
  294. if (!g_settings->getBool("curl_verify_cert")) {
  295. curl_easy_setopt(curl, CURLOPT_SSL_VERIFYPEER, false);
  296. }
  297. }
  298. CURLcode HTTPFetchOngoing::start(CURLM *multi_)
  299. {
  300. if (!curl)
  301. return CURLE_FAILED_INIT;
  302. if (!multi_) {
  303. // Easy interface (sync)
  304. return curl_easy_perform(curl);
  305. }
  306. // Multi interface (async)
  307. CURLMcode mres = curl_multi_add_handle(multi_, curl);
  308. if (mres != CURLM_OK) {
  309. errorstream << "curl_multi_add_handle"
  310. << " returned error code " << mres
  311. << std::endl;
  312. return CURLE_FAILED_INIT;
  313. }
  314. multi = multi_; // store for curl_multi_remove_handle
  315. return CURLE_OK;
  316. }
  317. const HTTPFetchResult * HTTPFetchOngoing::complete(CURLcode res)
  318. {
  319. result.succeeded = (res == CURLE_OK);
  320. result.timeout = (res == CURLE_OPERATION_TIMEDOUT);
  321. result.data = oss.str();
  322. // Get HTTP/FTP response code
  323. result.response_code = 0;
  324. if (curl && (curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE,
  325. &result.response_code) != CURLE_OK)) {
  326. // We failed to get a return code, make sure it is still 0
  327. result.response_code = 0;
  328. }
  329. if (res != CURLE_OK) {
  330. errorstream << request.url << " not found ("
  331. << curl_easy_strerror(res) << ")"
  332. << " (response code " << result.response_code << ")"
  333. << std::endl;
  334. }
  335. return &result;
  336. }
  337. HTTPFetchOngoing::~HTTPFetchOngoing()
  338. {
  339. if (multi) {
  340. CURLMcode mres = curl_multi_remove_handle(multi, curl);
  341. if (mres != CURLM_OK) {
  342. errorstream << "curl_multi_remove_handle"
  343. << " returned error code " << mres
  344. << std::endl;
  345. }
  346. }
  347. // Set safe options for the reusable cURL handle
  348. curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION,
  349. httpfetch_discardfunction);
  350. curl_easy_setopt(curl, CURLOPT_WRITEDATA, NULL);
  351. curl_easy_setopt(curl, CURLOPT_POSTFIELDS, NULL);
  352. if (http_header) {
  353. curl_easy_setopt(curl, CURLOPT_HTTPHEADER, NULL);
  354. curl_slist_free_all(http_header);
  355. }
  356. if (post) {
  357. curl_easy_setopt(curl, CURLOPT_HTTPPOST, NULL);
  358. curl_formfree(post);
  359. }
  360. // Store the cURL handle for reuse
  361. pool->free(curl);
  362. }
  363. class CurlFetchThread : public Thread
  364. {
  365. protected:
  366. enum RequestType {
  367. RT_FETCH,
  368. RT_CLEAR,
  369. RT_WAKEUP,
  370. };
  371. struct Request {
  372. RequestType type;
  373. HTTPFetchRequest fetch_request;
  374. Event *event;
  375. };
  376. CURLM *m_multi;
  377. MutexedQueue<Request> m_requests;
  378. size_t m_parallel_limit;
  379. // Variables exclusively used within thread
  380. std::vector<HTTPFetchOngoing*> m_all_ongoing;
  381. std::list<HTTPFetchRequest> m_queued_fetches;
  382. public:
  383. CurlFetchThread(int parallel_limit) :
  384. Thread("CurlFetch")
  385. {
  386. if (parallel_limit >= 1)
  387. m_parallel_limit = parallel_limit;
  388. else
  389. m_parallel_limit = 1;
  390. }
  391. void requestFetch(const HTTPFetchRequest &fetch_request)
  392. {
  393. Request req;
  394. req.type = RT_FETCH;
  395. req.fetch_request = fetch_request;
  396. req.event = NULL;
  397. m_requests.push_back(req);
  398. }
  399. void requestClear(unsigned long caller, Event *event)
  400. {
  401. Request req;
  402. req.type = RT_CLEAR;
  403. req.fetch_request.caller = caller;
  404. req.event = event;
  405. m_requests.push_back(req);
  406. }
  407. void requestWakeUp()
  408. {
  409. Request req;
  410. req.type = RT_WAKEUP;
  411. req.event = NULL;
  412. m_requests.push_back(req);
  413. }
  414. protected:
  415. // Handle a request from some other thread
  416. // E.g. new fetch; clear fetches for one caller; wake up
  417. void processRequest(const Request &req)
  418. {
  419. if (req.type == RT_FETCH) {
  420. // New fetch, queue until there are less
  421. // than m_parallel_limit ongoing fetches
  422. m_queued_fetches.push_back(req.fetch_request);
  423. // see processQueued() for what happens next
  424. }
  425. else if (req.type == RT_CLEAR) {
  426. unsigned long caller = req.fetch_request.caller;
  427. // Abort all ongoing fetches for the caller
  428. for (std::vector<HTTPFetchOngoing*>::iterator
  429. it = m_all_ongoing.begin();
  430. it != m_all_ongoing.end();) {
  431. if ((*it)->getRequest().caller == caller) {
  432. delete (*it);
  433. it = m_all_ongoing.erase(it);
  434. } else {
  435. ++it;
  436. }
  437. }
  438. // Also abort all queued fetches for the caller
  439. for (std::list<HTTPFetchRequest>::iterator
  440. it = m_queued_fetches.begin();
  441. it != m_queued_fetches.end();) {
  442. if ((*it).caller == caller)
  443. it = m_queued_fetches.erase(it);
  444. else
  445. ++it;
  446. }
  447. }
  448. else if (req.type == RT_WAKEUP) {
  449. // Wakeup: Nothing to do, thread is awake at this point
  450. }
  451. if (req.event != NULL)
  452. req.event->signal();
  453. }
  454. // Start new ongoing fetches if m_parallel_limit allows
  455. void processQueued(CurlHandlePool *pool)
  456. {
  457. while (m_all_ongoing.size() < m_parallel_limit &&
  458. !m_queued_fetches.empty()) {
  459. HTTPFetchRequest request = m_queued_fetches.front();
  460. m_queued_fetches.pop_front();
  461. // Create ongoing fetch data and make a cURL handle
  462. // Set cURL options based on HTTPFetchRequest
  463. HTTPFetchOngoing *ongoing =
  464. new HTTPFetchOngoing(request, pool);
  465. // Initiate the connection (curl_multi_add_handle)
  466. CURLcode res = ongoing->start(m_multi);
  467. if (res == CURLE_OK) {
  468. m_all_ongoing.push_back(ongoing);
  469. }
  470. else {
  471. httpfetch_deliver_result(*ongoing->complete(res));
  472. delete ongoing;
  473. }
  474. }
  475. }
  476. // Process CURLMsg (indicates completion of a fetch)
  477. void processCurlMessage(CURLMsg *msg)
  478. {
  479. // Determine which ongoing fetch the message pertains to
  480. size_t i = 0;
  481. bool found = false;
  482. for (i = 0; i < m_all_ongoing.size(); ++i) {
  483. if (m_all_ongoing[i]->getEasyHandle() == msg->easy_handle) {
  484. found = true;
  485. break;
  486. }
  487. }
  488. if (msg->msg == CURLMSG_DONE && found) {
  489. // m_all_ongoing[i] succeeded or failed.
  490. HTTPFetchOngoing *ongoing = m_all_ongoing[i];
  491. httpfetch_deliver_result(*ongoing->complete(msg->data.result));
  492. delete ongoing;
  493. m_all_ongoing.erase(m_all_ongoing.begin() + i);
  494. }
  495. }
  496. // Wait for a request from another thread, or timeout elapses
  497. void waitForRequest(long timeout)
  498. {
  499. if (m_queued_fetches.empty()) {
  500. try {
  501. Request req = m_requests.pop_front(timeout);
  502. processRequest(req);
  503. }
  504. catch (ItemNotFoundException &e) {}
  505. }
  506. }
  507. // Wait until some IO happens, or timeout elapses
  508. void waitForIO(long timeout)
  509. {
  510. fd_set read_fd_set;
  511. fd_set write_fd_set;
  512. fd_set exc_fd_set;
  513. int max_fd;
  514. long select_timeout = -1;
  515. struct timeval select_tv;
  516. CURLMcode mres;
  517. FD_ZERO(&read_fd_set);
  518. FD_ZERO(&write_fd_set);
  519. FD_ZERO(&exc_fd_set);
  520. mres = curl_multi_fdset(m_multi, &read_fd_set,
  521. &write_fd_set, &exc_fd_set, &max_fd);
  522. if (mres != CURLM_OK) {
  523. errorstream<<"curl_multi_fdset"
  524. <<" returned error code "<<mres
  525. <<std::endl;
  526. select_timeout = 0;
  527. }
  528. mres = curl_multi_timeout(m_multi, &select_timeout);
  529. if (mres != CURLM_OK) {
  530. errorstream<<"curl_multi_timeout"
  531. <<" returned error code "<<mres
  532. <<std::endl;
  533. select_timeout = 0;
  534. }
  535. // Limit timeout so new requests get through
  536. if (select_timeout < 0 || select_timeout > timeout)
  537. select_timeout = timeout;
  538. if (select_timeout > 0) {
  539. // in Winsock it is forbidden to pass three empty
  540. // fd_sets to select(), so in that case use sleep_ms
  541. if (max_fd != -1) {
  542. select_tv.tv_sec = select_timeout / 1000;
  543. select_tv.tv_usec = (select_timeout % 1000) * 1000;
  544. int retval = select(max_fd + 1, &read_fd_set,
  545. &write_fd_set, &exc_fd_set,
  546. &select_tv);
  547. if (retval == -1) {
  548. #ifdef _WIN32
  549. errorstream<<"select returned error code "
  550. <<WSAGetLastError()<<std::endl;
  551. #else
  552. errorstream<<"select returned error code "
  553. <<errno<<std::endl;
  554. #endif
  555. }
  556. }
  557. else {
  558. sleep_ms(select_timeout);
  559. }
  560. }
  561. }
  562. void *run()
  563. {
  564. CurlHandlePool pool;
  565. m_multi = curl_multi_init();
  566. if (m_multi == NULL) {
  567. errorstream<<"curl_multi_init returned NULL\n";
  568. return NULL;
  569. }
  570. FATAL_ERROR_IF(!m_all_ongoing.empty(), "Expected empty");
  571. while (!stopRequested()) {
  572. BEGIN_DEBUG_EXCEPTION_HANDLER
  573. /*
  574. Handle new async requests
  575. */
  576. while (!m_requests.empty()) {
  577. Request req = m_requests.pop_frontNoEx();
  578. processRequest(req);
  579. }
  580. processQueued(&pool);
  581. /*
  582. Handle ongoing async requests
  583. */
  584. int still_ongoing = 0;
  585. while (curl_multi_perform(m_multi, &still_ongoing) ==
  586. CURLM_CALL_MULTI_PERFORM)
  587. /* noop */;
  588. /*
  589. Handle completed async requests
  590. */
  591. if (still_ongoing < (int) m_all_ongoing.size()) {
  592. CURLMsg *msg;
  593. int msgs_in_queue;
  594. msg = curl_multi_info_read(m_multi, &msgs_in_queue);
  595. while (msg != NULL) {
  596. processCurlMessage(msg);
  597. msg = curl_multi_info_read(m_multi, &msgs_in_queue);
  598. }
  599. }
  600. /*
  601. If there are ongoing requests, wait for data
  602. (with a timeout of 100ms so that new requests
  603. can be processed).
  604. If no ongoing requests, wait for a new request.
  605. (Possibly an empty request that signals
  606. that the thread should be stopped.)
  607. */
  608. if (m_all_ongoing.empty())
  609. waitForRequest(100000000);
  610. else
  611. waitForIO(100);
  612. END_DEBUG_EXCEPTION_HANDLER
  613. }
  614. // Call curl_multi_remove_handle and cleanup easy handles
  615. for (HTTPFetchOngoing *i : m_all_ongoing) {
  616. delete i;
  617. }
  618. m_all_ongoing.clear();
  619. m_queued_fetches.clear();
  620. CURLMcode mres = curl_multi_cleanup(m_multi);
  621. if (mres != CURLM_OK) {
  622. errorstream<<"curl_multi_cleanup"
  623. <<" returned error code "<<mres
  624. <<std::endl;
  625. }
  626. return NULL;
  627. }
  628. };
  629. CurlFetchThread *g_httpfetch_thread = NULL;
  630. void httpfetch_init(int parallel_limit)
  631. {
  632. verbosestream<<"httpfetch_init: parallel_limit="<<parallel_limit
  633. <<std::endl;
  634. CURLcode res = curl_global_init(CURL_GLOBAL_DEFAULT);
  635. FATAL_ERROR_IF(res != CURLE_OK, "CURL init failed");
  636. g_httpfetch_thread = new CurlFetchThread(parallel_limit);
  637. // Initialize g_callerid_randomness for httpfetch_caller_alloc_secure
  638. u64 randbuf[2];
  639. porting::secure_rand_fill_buf(randbuf, sizeof(u64) * 2);
  640. g_callerid_randomness = PcgRandom(randbuf[0], randbuf[1]);
  641. }
  642. void httpfetch_cleanup()
  643. {
  644. verbosestream<<"httpfetch_cleanup: cleaning up"<<std::endl;
  645. g_httpfetch_thread->stop();
  646. g_httpfetch_thread->requestWakeUp();
  647. g_httpfetch_thread->wait();
  648. delete g_httpfetch_thread;
  649. curl_global_cleanup();
  650. }
  651. void httpfetch_async(const HTTPFetchRequest &fetch_request)
  652. {
  653. g_httpfetch_thread->requestFetch(fetch_request);
  654. if (!g_httpfetch_thread->isRunning())
  655. g_httpfetch_thread->start();
  656. }
  657. static void httpfetch_request_clear(unsigned long caller)
  658. {
  659. if (g_httpfetch_thread->isRunning()) {
  660. Event event;
  661. g_httpfetch_thread->requestClear(caller, &event);
  662. event.wait();
  663. } else {
  664. g_httpfetch_thread->requestClear(caller, NULL);
  665. }
  666. }
  667. void httpfetch_sync(const HTTPFetchRequest &fetch_request,
  668. HTTPFetchResult &fetch_result)
  669. {
  670. // Create ongoing fetch data and make a cURL handle
  671. // Set cURL options based on HTTPFetchRequest
  672. CurlHandlePool pool;
  673. HTTPFetchOngoing ongoing(fetch_request, &pool);
  674. // Do the fetch (curl_easy_perform)
  675. CURLcode res = ongoing.start(NULL);
  676. // Update fetch result
  677. fetch_result = *ongoing.complete(res);
  678. }
  679. #else // USE_CURL
  680. /*
  681. USE_CURL is off:
  682. Dummy httpfetch implementation that always returns an error.
  683. */
  684. void httpfetch_init(int parallel_limit)
  685. {
  686. }
  687. void httpfetch_cleanup()
  688. {
  689. }
  690. void httpfetch_async(const HTTPFetchRequest &fetch_request)
  691. {
  692. errorstream << "httpfetch_async: unable to fetch " << fetch_request.url
  693. << " because USE_CURL=0" << std::endl;
  694. HTTPFetchResult fetch_result(fetch_request); // sets succeeded = false etc.
  695. httpfetch_deliver_result(fetch_result);
  696. }
  697. static void httpfetch_request_clear(unsigned long caller)
  698. {
  699. }
  700. void httpfetch_sync(const HTTPFetchRequest &fetch_request,
  701. HTTPFetchResult &fetch_result)
  702. {
  703. errorstream << "httpfetch_sync: unable to fetch " << fetch_request.url
  704. << " because USE_CURL=0" << std::endl;
  705. fetch_result = HTTPFetchResult(fetch_request); // sets succeeded = false etc.
  706. }
  707. #endif // USE_CURL