control.cc 45 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426
  1. #include <algorithm>
  2. #include <unordered_set>
  3. #include <climits>
  4. #include "control-cmds.h"
  5. #include "dinit-env.h"
  6. #include "control.h"
  7. #include "service.h"
  8. #include "proc-service.h"
  9. #include "control-datatypes.h"
  10. // Server-side control protocol implementation. This implements the functionality that allows
  11. // clients (such as dinitctl) to query service state and issue commands to control services.
  12. // Control protocol versions:
  13. // 1 - dinit 0.16 and prior
  14. // 2 - dinit 0.17 (adds SETTRIGGER, CATLOG, SIGNAL)
  15. // 3 - (unreleased) (adds QUERYSERVICEDSCDIR)
  16. // common communication datatypes
  17. using namespace dinit_cptypes;
  18. namespace {
  19. // Control protocol minimum compatible version and current version:
  20. constexpr uint16_t min_compat_version = 1;
  21. constexpr uint16_t cp_version = 3;
  22. // check for value in a set
  23. template <typename T, int N, typename U>
  24. inline bool contains(const T (&v)[N], U i)
  25. {
  26. return std::find_if(std::begin(v), std::end(v),
  27. [=](T p){ return i == static_cast<U>(p); }) != std::end(v);
  28. }
  29. }
  30. bool control_conn_t::process_packet()
  31. {
  32. using std::string;
  33. // Note that where we call queue_packet, we must generally check the return value. If it
  34. // returns false it has either deleted the connection or marked it for deletion; we
  35. // shouldn't touch instance members after that point.
  36. cp_cmd pktType = (cp_cmd)rbuf[0];
  37. if (pktType == cp_cmd::QUERYVERSION) {
  38. // Responds with:
  39. // cp_rply::CVERSION, (2 byte) minimum compatible version, (2 byte) actual version
  40. char replyBuf[] = { (char)cp_rply::CPVERSION, 0, 0, 0, 0 };
  41. memcpy(replyBuf + 1, &min_compat_version, 2);
  42. memcpy(replyBuf + 3, &cp_version, 2);
  43. if (!queue_packet(replyBuf, sizeof(replyBuf))) return false;
  44. rbuf.consume(1);
  45. return true;
  46. }
  47. if (pktType == cp_cmd::FINDSERVICE || pktType == cp_cmd::LOADSERVICE) {
  48. return process_find_load(pktType);
  49. }
  50. if (pktType == cp_cmd::STARTSERVICE || pktType == cp_cmd::STOPSERVICE
  51. || pktType == cp_cmd::WAKESERVICE || pktType == cp_cmd::RELEASESERVICE) {
  52. return process_start_stop(pktType);
  53. }
  54. if (pktType == cp_cmd::UNPINSERVICE) {
  55. return process_unpin_service();
  56. }
  57. if (pktType == cp_cmd::UNLOADSERVICE) {
  58. return process_unload_service();
  59. }
  60. if (pktType == cp_cmd::RELOADSERVICE) {
  61. return process_reload_service();
  62. }
  63. if (pktType == cp_cmd::SHUTDOWN) {
  64. // Shutdown/reboot
  65. if (rbuf.get_length() < 2) {
  66. chklen = 2;
  67. return true;
  68. }
  69. if (contains({shutdown_type_t::REMAIN, shutdown_type_t::HALT,
  70. shutdown_type_t::POWEROFF, shutdown_type_t::REBOOT}, rbuf[1])) {
  71. auto sd_type = static_cast<shutdown_type_t>(rbuf[1]);
  72. services->stop_all_services(sd_type);
  73. char ackBuf[] = { (char)cp_rply::ACK };
  74. if (! queue_packet(ackBuf, 1)) return false;
  75. // Clear the packet from the buffer
  76. rbuf.consume(2);
  77. chklen = 0;
  78. return true;
  79. }
  80. // (otherwise fall through to below).
  81. }
  82. if (pktType == cp_cmd::LISTSERVICES) {
  83. return list_services();
  84. }
  85. if (pktType == cp_cmd::SERVICESTATUS) {
  86. return process_service_status();
  87. }
  88. if (pktType == cp_cmd::ADD_DEP) {
  89. return add_service_dep();
  90. }
  91. if (pktType == cp_cmd::REM_DEP) {
  92. return rm_service_dep();
  93. }
  94. if (pktType == cp_cmd::QUERY_LOAD_MECH) {
  95. return query_load_mech();
  96. }
  97. if (pktType == cp_cmd::ENABLESERVICE) {
  98. return add_service_dep(true);
  99. }
  100. if (pktType == cp_cmd::QUERYSERVICENAME) {
  101. return process_query_name();
  102. }
  103. if (pktType == cp_cmd::SETENV) {
  104. return process_setenv();
  105. }
  106. if (pktType == cp_cmd::SETTRIGGER) {
  107. return process_set_trigger();
  108. }
  109. if (pktType == cp_cmd::CATLOG) {
  110. return process_catlog();
  111. }
  112. if (pktType == cp_cmd::SIGNAL) {
  113. return process_signal();
  114. }
  115. if (pktType == cp_cmd::QUERYSERVICEDSCDIR) {
  116. return process_query_dsc_dir();
  117. }
  118. // Unrecognized: give error response
  119. char outbuf[] = { (char)cp_rply::BADREQ };
  120. if (!queue_packet(outbuf, 1)) return false;
  121. bad_conn_close = true;
  122. return true;
  123. }
  124. bool control_conn_t::process_find_load(cp_cmd pktType)
  125. {
  126. using std::string;
  127. constexpr int pkt_size = 4;
  128. if (rbuf.get_length() < pkt_size) {
  129. chklen = pkt_size;
  130. return true;
  131. }
  132. srvname_len_t srvname_len;
  133. rbuf.extract(&srvname_len, 1, sizeof(srvname_len));
  134. if (srvname_len <= 0 || srvname_len > (1024 - 3)) {
  135. // Queue error response / mark connection bad
  136. char badreqRep[] = { (char)cp_rply::BADREQ };
  137. if (! queue_packet(badreqRep, 1)) return false;
  138. bad_conn_close = true;
  139. return true;
  140. }
  141. chklen = srvname_len + 3; // packet type + (2 byte) length + service name
  142. if (rbuf.get_length() < chklen) {
  143. // packet not complete yet; read more
  144. return true;
  145. }
  146. service_record * record = nullptr;
  147. string service_name = rbuf.extract_string(3, srvname_len);
  148. // Clear the packet from the buffer
  149. rbuf.consume(chklen);
  150. chklen = 0;
  151. cp_rply fail_code = cp_rply::NOSERVICE;
  152. if (pktType == cp_cmd::LOADSERVICE) {
  153. // LOADSERVICE
  154. try {
  155. record = services->load_service(service_name.c_str());
  156. }
  157. catch (service_description_exc &sdexc) {
  158. log_service_load_failure(sdexc);
  159. fail_code = cp_rply::SERVICE_DESC_ERR;
  160. }
  161. catch (service_not_found &snf) {
  162. log(loglevel_t::ERROR, "Could not load service ", snf.service_name, ": ",
  163. snf.exc_description);
  164. // fail_code = cp_rply::NOSERVICE; (already set)
  165. }
  166. catch (service_load_exc &slexc) {
  167. log(loglevel_t::ERROR, "Could not load service ", slexc.service_name, ": ",
  168. slexc.exc_description);
  169. fail_code = cp_rply::SERVICE_LOAD_ERR;
  170. }
  171. }
  172. else {
  173. // FINDSERVICE
  174. record = services->find_service(service_name.c_str());
  175. }
  176. if (record == nullptr) {
  177. std::vector<char> rp_buf = { (char)fail_code };
  178. if (! queue_packet(std::move(rp_buf))) return false;
  179. return true;
  180. }
  181. // Allocate a service handle
  182. handle_t handle = allocate_service_handle(record);
  183. std::vector<char> rp_buf;
  184. rp_buf.reserve(7);
  185. rp_buf.push_back((char)cp_rply::SERVICERECORD);
  186. rp_buf.push_back(static_cast<char>(record->get_state()));
  187. for (int i = 0; i < (int) sizeof(handle); i++) {
  188. rp_buf.push_back(*(((char *) &handle) + i));
  189. }
  190. rp_buf.push_back(static_cast<char>(record->get_target_state()));
  191. if (! queue_packet(std::move(rp_buf))) return false;
  192. return true;
  193. }
  194. bool control_conn_t::check_dependents(service_record *service, bool &had_dependents)
  195. {
  196. std::vector<char> reply_pkt;
  197. size_t num_depts = 0;
  198. for (service_dep *dep : service->get_dependents()) {
  199. if (dep->dep_type == dependency_type::REGULAR && dep->holding_acq) {
  200. num_depts++;
  201. // find or allocate a service handle
  202. handle_t dept_handle = allocate_service_handle(dep->get_from());
  203. if (reply_pkt.empty()) {
  204. // packet type, size
  205. reply_pkt.reserve(1 + sizeof(size_t) + sizeof(handle_t));
  206. reply_pkt.resize(1 + sizeof(size_t));
  207. reply_pkt[0] = (char)cp_rply::DEPENDENTS;
  208. }
  209. auto old_size = reply_pkt.size();
  210. reply_pkt.resize(old_size + sizeof(handle_t));
  211. memcpy(reply_pkt.data() + old_size, &dept_handle, sizeof(dept_handle));
  212. }
  213. }
  214. if (num_depts != 0) {
  215. // There are affected dependents
  216. had_dependents = true;
  217. memcpy(reply_pkt.data() + 1, &num_depts, sizeof(num_depts));
  218. return queue_packet(std::move(reply_pkt));
  219. }
  220. had_dependents = false;
  221. return true;
  222. }
  223. bool control_conn_t::process_start_stop(cp_cmd pktType)
  224. {
  225. using std::string;
  226. constexpr int pkt_size = 2 + sizeof(handle_t);
  227. if (rbuf.get_length() < pkt_size) {
  228. chklen = pkt_size;
  229. return true;
  230. }
  231. // 1 byte: packet type
  232. // 1 byte: flags eg. pin in requested state (0 = no pin, 1 = pin)
  233. // 4 bytes: service handle
  234. bool do_pin = ((rbuf[1] & 1) == 1);
  235. handle_t handle;
  236. rbuf.extract((char *) &handle, 2, sizeof(handle));
  237. service_record *service = find_service_for_key(handle);
  238. if (service == nullptr) {
  239. // Service handle is bad
  240. char badreqRep[] = { (char)cp_rply::BADREQ };
  241. if (! queue_packet(badreqRep, 1)) return false;
  242. bad_conn_close = true;
  243. return true;
  244. }
  245. else {
  246. char ack_buf[1] = { (char)cp_rply::ACK };
  247. switch (pktType) {
  248. case cp_cmd::STARTSERVICE:
  249. // start service, mark as required
  250. if (services->is_shutting_down()) {
  251. ack_buf[0] = (char)cp_rply::SHUTTINGDOWN;
  252. break;
  253. }
  254. if ((service->get_state() == service_state_t::STOPPED
  255. || service->get_state() == service_state_t::STOPPING)
  256. && service->is_stop_pinned()) {
  257. ack_buf[0] = (char)cp_rply::PINNEDSTOPPED;
  258. break;
  259. }
  260. if (do_pin) service->pin_start();
  261. service->start();
  262. services->process_queues();
  263. if (service->get_state() == service_state_t::STARTED) ack_buf[0] = (char)cp_rply::ALREADYSS;
  264. break;
  265. case cp_cmd::STOPSERVICE:
  266. {
  267. // force service to stop
  268. bool do_restart = ((rbuf[1] & 4) == 4);
  269. bool gentle = ((rbuf[1] & 2) == 2);
  270. if (do_restart && services->is_shutting_down()) {
  271. ack_buf[0] = (char)cp_rply::SHUTTINGDOWN;
  272. break;
  273. }
  274. if ((service->get_state() == service_state_t::STARTED
  275. || service->get_state() == service_state_t::STARTING)
  276. && service->is_start_pinned()) {
  277. ack_buf[0] = (char)cp_rply::PINNEDSTARTED;
  278. break;
  279. }
  280. if (gentle) {
  281. // Check dependents; return appropriate response if any will be affected
  282. bool has_dependents;
  283. if (!check_dependents(service, has_dependents)) {
  284. return false;
  285. }
  286. if (has_dependents) {
  287. // Reply packet has already been sent
  288. goto clear_out;
  289. }
  290. }
  291. service_state_t wanted_state;
  292. if (do_restart) {
  293. if (! service->restart()) {
  294. ack_buf[0] = (char)cp_rply::NAK;
  295. break;
  296. }
  297. wanted_state = service_state_t::STARTED;
  298. }
  299. else {
  300. if (do_pin) service->pin_stop();
  301. service->stop(true);
  302. service->forced_stop();
  303. wanted_state = service_state_t::STOPPED;
  304. }
  305. services->process_queues();
  306. if (service->get_state() == wanted_state && !do_restart) ack_buf[0] = (char)cp_rply::ALREADYSS;
  307. break;
  308. }
  309. case cp_cmd::WAKESERVICE:
  310. {
  311. // re-attach a service to its (started) dependents, causing it to start.
  312. if (services->is_shutting_down()) {
  313. ack_buf[0] = (char)cp_rply::SHUTTINGDOWN;
  314. break;
  315. }
  316. if ((service->get_state() == service_state_t::STOPPED
  317. || service->get_state() == service_state_t::STOPPING)
  318. && service->is_stop_pinned()) {
  319. ack_buf[0] = (char)cp_rply::PINNEDSTOPPED;
  320. break;
  321. }
  322. bool found_dpt = false;
  323. for (auto dpt : service->get_dependents()) {
  324. if (dpt->is_only_ordering()) continue;
  325. auto from = dpt->get_from();
  326. auto from_state = from->get_state();
  327. if (from_state == service_state_t::STARTED || from_state == service_state_t::STARTING) {
  328. found_dpt = true;
  329. if (!dpt->holding_acq) {
  330. dpt->get_from()->start_dep(*dpt);
  331. }
  332. }
  333. }
  334. if (!found_dpt) {
  335. ack_buf[0] = (char)cp_rply::NAK;
  336. }
  337. if (do_pin) service->pin_start();
  338. services->process_queues();
  339. if (service->get_state() == service_state_t::STARTED) ack_buf[0] = (char)cp_rply::ALREADYSS;
  340. break;
  341. }
  342. case cp_cmd::RELEASESERVICE:
  343. // remove required mark, stop if not required by dependents
  344. if (do_pin) service->pin_stop();
  345. service->stop(false);
  346. services->process_queues();
  347. if (service->get_state() == service_state_t::STOPPED) ack_buf[0] = (char)cp_rply::ALREADYSS;
  348. break;
  349. default:
  350. // avoid warning for unhandled switch/case values
  351. return false;
  352. }
  353. if (! queue_packet(ack_buf, 1)) return false;
  354. }
  355. clear_out:
  356. // Clear the packet from the buffer
  357. rbuf.consume(pkt_size);
  358. chklen = 0;
  359. return true;
  360. }
  361. bool control_conn_t::process_unpin_service()
  362. {
  363. using std::string;
  364. constexpr int pkt_size = 1 + sizeof(handle_t);
  365. if (rbuf.get_length() < pkt_size) {
  366. chklen = pkt_size;
  367. return true;
  368. }
  369. // 1 byte: packet type
  370. // 4 bytes: service handle
  371. handle_t handle;
  372. rbuf.extract((char *) &handle, 1, sizeof(handle));
  373. service_record *service = find_service_for_key(handle);
  374. if (service == nullptr) {
  375. // Service handle is bad
  376. char badreqRep[] = { (char)cp_rply::BADREQ };
  377. if (! queue_packet(badreqRep, 1)) return false;
  378. bad_conn_close = true;
  379. return true;
  380. }
  381. service->unpin();
  382. services->process_queues();
  383. char ack_buf[] = { (char) cp_rply::ACK };
  384. if (! queue_packet(ack_buf, 1)) return false;
  385. // Clear the packet from the buffer
  386. rbuf.consume(pkt_size);
  387. chklen = 0;
  388. return true;
  389. }
  390. bool control_conn_t::process_unload_service()
  391. {
  392. using std::string;
  393. constexpr int pkt_size = 1 + sizeof(handle_t);
  394. if (rbuf.get_length() < pkt_size) {
  395. chklen = pkt_size;
  396. return true;
  397. }
  398. // 1 byte: packet type
  399. // 4 bytes: service handle
  400. handle_t handle;
  401. rbuf.extract((char *) &handle, 1, sizeof(handle));
  402. service_record *service = find_service_for_key(handle);
  403. if (service == nullptr) {
  404. // Service handle is bad
  405. char badreq_rep[] = { (char)cp_rply::BADREQ };
  406. if (! queue_packet(badreq_rep, 1)) return false;
  407. bad_conn_close = true;
  408. return true;
  409. }
  410. if (!service->has_lone_ref() || service->get_state() != service_state_t::STOPPED) {
  411. // Cannot unload: has other references
  412. char nak_rep[] = { (char)cp_rply::NAK };
  413. if (!queue_packet(nak_rep, 1)) return false;
  414. }
  415. else {
  416. // unload (this may fail with bad_alloc)
  417. services->unload_service(service);
  418. // drop handle
  419. service_key_map.erase(service);
  420. key_service_map.erase(handle);
  421. // send ack
  422. char ack_buf[] = { (char) cp_rply::ACK };
  423. if (!queue_packet(ack_buf, 1)) return false;
  424. }
  425. // Clear the packet from the buffer
  426. rbuf.consume(pkt_size);
  427. chklen = 0;
  428. return true;
  429. }
  430. bool control_conn_t::process_reload_service()
  431. {
  432. using std::string;
  433. constexpr int pkt_size = 1 + sizeof(handle_t);
  434. if (rbuf.get_length() < pkt_size) {
  435. chklen = pkt_size;
  436. return true;
  437. }
  438. // 1 byte: packet type
  439. // 4 bytes: service handle
  440. handle_t handle;
  441. rbuf.extract((char *) &handle, 1, sizeof(handle));
  442. service_record *service = find_service_for_key(handle);
  443. if (service == nullptr) {
  444. // Service handle is bad
  445. char badreq_rep[] = { (char)cp_rply::BADREQ };
  446. if (! queue_packet(badreq_rep, 1)) return false;
  447. bad_conn_close = true;
  448. return true;
  449. }
  450. if (!service->has_lone_ref(false)) {
  451. // Cannot unload: has other references
  452. char nak_rep[] = { (char)cp_rply::NAK };
  453. if (! queue_packet(nak_rep, 1)) return false;
  454. }
  455. else {
  456. try {
  457. // drop handle
  458. key_service_map.erase(handle);
  459. service_key_map.erase(service);
  460. // reload
  461. service->remove_listener(this);
  462. services->reload_service(service);
  463. services->process_queues();
  464. // send ack
  465. char ack_buf[] = { (char) cp_rply::ACK };
  466. if (! queue_packet(ack_buf, 1)) return false;
  467. }
  468. catch (service_load_exc &slexc) {
  469. log(loglevel_t::ERROR, "Could not reload service ", slexc.service_name, ": ",
  470. slexc.exc_description);
  471. char nak_rep[] = { (char)cp_rply::NAK };
  472. if (! queue_packet(nak_rep, 1)) return false;
  473. }
  474. }
  475. // Clear the packet from the buffer
  476. rbuf.consume(pkt_size);
  477. chklen = 0;
  478. return true;
  479. }
  480. constexpr static unsigned SIZEOF_INT_PIDT_UNION = ((sizeof(pid_t) > sizeof(int)) ? sizeof(pid_t) : sizeof(int));
  481. constexpr static unsigned STATUS_BUFFER_SIZE = 6 + SIZEOF_INT_PIDT_UNION;
  482. static void fill_status_buffer(char *buffer, service_record *service)
  483. {
  484. buffer[0] = static_cast<char>(service->get_state());
  485. buffer[1] = static_cast<char>(service->get_target_state());
  486. pid_t proc_pid = service->get_pid();
  487. char b0 = service->is_waiting_for_console() ? 1 : 0;
  488. b0 |= service->has_console() ? 2 : 0;
  489. b0 |= service->was_start_skipped() ? 4 : 0;
  490. b0 |= service->is_marked_active() ? 8 : 0;
  491. b0 |= (proc_pid != -1) ? 16 : 0;
  492. buffer[2] = b0;
  493. buffer[3] = static_cast<char>(service->get_stop_reason());
  494. buffer[4] = 0; // (if exec failed, these are replaced with stage)
  495. buffer[5] = 0;
  496. if (proc_pid != -1) {
  497. memcpy(buffer + 6, &proc_pid, sizeof(proc_pid));
  498. }
  499. else {
  500. // These values only make sense in STOPPING/STOPPED, but we'll fill them in regardless:
  501. if (buffer[3] == (char)stopped_reason_t::EXECFAILED) {
  502. base_process_service *bsp = (base_process_service *)service;
  503. run_proc_err exec_err = bsp->get_exec_err_info();
  504. uint16_t stage = (uint16_t)exec_err.stage;
  505. memcpy(buffer + 4, &stage, 2);
  506. memcpy(buffer + 6, &exec_err.st_errno, sizeof(int));
  507. }
  508. else {
  509. int exit_status = service->get_exit_status();
  510. memcpy(buffer + 6, &exit_status, sizeof(exit_status));
  511. }
  512. }
  513. }
  514. bool control_conn_t::list_services()
  515. {
  516. rbuf.consume(1); // clear request packet
  517. chklen = 0;
  518. try {
  519. auto slist = services->list_services();
  520. for (auto sptr : slist) {
  521. if (sptr->get_type() == service_type_t::PLACEHOLDER) continue;
  522. std::vector<char> pkt_buf;
  523. int hdrsize = 2 + STATUS_BUFFER_SIZE;
  524. const std::string &name = sptr->get_name();
  525. int nameLen = std::min((size_t)256, name.length());
  526. pkt_buf.resize(hdrsize + nameLen);
  527. pkt_buf[0] = (char)cp_rply::SVCINFO;
  528. pkt_buf[1] = nameLen;
  529. fill_status_buffer(&pkt_buf[2], sptr);
  530. for (int i = 0; i < nameLen; i++) {
  531. pkt_buf[hdrsize+i] = name[i];
  532. }
  533. if (!queue_packet(std::move(pkt_buf))) return false;
  534. }
  535. char ack_buf[] = { (char) cp_rply::LISTDONE };
  536. if (! queue_packet(ack_buf, 1)) return false;
  537. return true;
  538. }
  539. catch (std::bad_alloc &exc)
  540. {
  541. do_oom_close();
  542. return true;
  543. }
  544. }
  545. bool control_conn_t::process_service_status()
  546. {
  547. constexpr int pkt_size = 1 + sizeof(handle_t);
  548. if (rbuf.get_length() < pkt_size) {
  549. chklen = pkt_size;
  550. return true;
  551. }
  552. handle_t handle;
  553. rbuf.extract(&handle, 1, sizeof(handle));
  554. rbuf.consume(pkt_size);
  555. chklen = 0;
  556. service_record *service = find_service_for_key(handle);
  557. if (service == nullptr || service->get_name().length() > std::numeric_limits<uint16_t>::max()) {
  558. char nak_rep[] = { (char)cp_rply::NAK };
  559. return queue_packet(nak_rep, 1);
  560. }
  561. // Reply:
  562. // 1 byte packet type = cp_rply::SERVICESTATUS
  563. // 1 byte reserved ( = 0)
  564. // STATUS_BUFFER_SIZE bytes status
  565. std::vector<char> pkt_buf(2 + STATUS_BUFFER_SIZE);
  566. pkt_buf[0] = (char)cp_rply::SERVICESTATUS;
  567. pkt_buf[1] = 0;
  568. fill_status_buffer(pkt_buf.data() + 2, service);
  569. return queue_packet(std::move(pkt_buf));
  570. }
  571. bool control_conn_t::add_service_dep(bool do_enable)
  572. {
  573. // 1 byte packet type
  574. // 1 byte dependency type
  575. // handle: "from"
  576. // handle: "to"
  577. constexpr int pkt_size = 2 + sizeof(handle_t) * 2;
  578. if (rbuf.get_length() < pkt_size) {
  579. chklen = pkt_size;
  580. return true;
  581. }
  582. handle_t from_handle;
  583. handle_t to_handle;
  584. rbuf.extract((char *) &from_handle, 2, sizeof(from_handle));
  585. rbuf.extract((char *) &to_handle, 2 + sizeof(from_handle), sizeof(to_handle));
  586. service_record *from_service = find_service_for_key(from_handle);
  587. service_record *to_service = find_service_for_key(to_handle);
  588. if (from_service == nullptr || to_service == nullptr || from_service == to_service) {
  589. // Service handle is bad
  590. char badreq_rep[] = { (char)cp_rply::BADREQ };
  591. if (!queue_packet(badreq_rep, 1)) return false;
  592. bad_conn_close = true;
  593. return true;
  594. }
  595. // Check dependency type is valid:
  596. int dep_type_int = rbuf[1];
  597. if (!contains({dependency_type::MILESTONE, dependency_type::REGULAR,
  598. dependency_type::WAITS_FOR}, dep_type_int)) {
  599. char badreqRep[] = { (char)cp_rply::BADREQ };
  600. if (!queue_packet(badreqRep, 1)) return false;
  601. bad_conn_close = true;
  602. }
  603. dependency_type dep_type = static_cast<dependency_type>(dep_type_int);
  604. // Check current service states are valid for given dep type
  605. if (dep_type == dependency_type::REGULAR) {
  606. if (from_service->get_state() != service_state_t::STOPPED &&
  607. to_service->get_state() != service_state_t::STARTED) {
  608. // Cannot create dependency now since it would be contradicted:
  609. char nak_rep[] = { (char)cp_rply::NAK };
  610. if (! queue_packet(nak_rep, 1)) return false;
  611. rbuf.consume(pkt_size);
  612. chklen = 0;
  613. return true;
  614. }
  615. }
  616. // Check for creation of circular dependency chain
  617. std::unordered_set<service_record *> dep_marks;
  618. std::vector<service_record *> dep_queue;
  619. dep_queue.push_back(to_service);
  620. while (! dep_queue.empty()) {
  621. service_record * sr = dep_queue.back();
  622. dep_queue.pop_back();
  623. // iterate deps; if dep == from, abort; otherwise add to set/queue
  624. // (only add to queue if not already in set)
  625. for (auto &dep : sr->get_dependencies()) {
  626. service_record * dep_to = dep.get_to();
  627. if (dep_to == from_service) {
  628. // fail, circular dependency!
  629. char nak_rep[] = { (char)cp_rply::NAK };
  630. if (! queue_packet(nak_rep, 1)) return false;
  631. rbuf.consume(pkt_size);
  632. chklen = 0;
  633. return true;
  634. }
  635. if (dep_marks.insert(dep_to).second) {
  636. dep_queue.push_back(dep_to);
  637. }
  638. }
  639. }
  640. dep_marks.clear();
  641. dep_queue.clear();
  642. bool dep_exists = false;
  643. service_dep * dep_record = nullptr;
  644. // Prevent creation of duplicate dependency:
  645. for (auto &dep : from_service->get_dependencies()) {
  646. service_record * dep_to = dep.get_to();
  647. if (dep_to == to_service && dep.dep_type == dep_type) {
  648. // Dependency already exists
  649. dep_exists = true;
  650. dep_record = &dep;
  651. break;
  652. }
  653. }
  654. if (! dep_exists) {
  655. // Create dependency:
  656. dep_record = &(from_service->add_dep(to_service, dep_type));
  657. services->process_queues();
  658. }
  659. if (do_enable && contains({service_state_t::STARTED, service_state_t::STARTING},
  660. from_service->get_state())) {
  661. // The dependency record is activated: mark it as holding acquisition of the dependency, and start
  662. // the dependency.
  663. if (!services->is_shutting_down()) {
  664. dep_record->get_from()->start_dep(*dep_record);
  665. services->process_queues();
  666. }
  667. }
  668. char ack_rep[] = { (char)cp_rply::ACK };
  669. if (! queue_packet(ack_rep, 1)) return false;
  670. rbuf.consume(pkt_size);
  671. chklen = 0;
  672. return true;
  673. }
  674. bool control_conn_t::rm_service_dep()
  675. {
  676. // 1 byte packet type
  677. // 1 byte dependency type
  678. // handle: "from"
  679. // handle: "to"
  680. constexpr int pkt_size = 2 + sizeof(handle_t) * 2;
  681. if (rbuf.get_length() < pkt_size) {
  682. chklen = pkt_size;
  683. return true;
  684. }
  685. handle_t from_handle;
  686. handle_t to_handle;
  687. rbuf.extract((char *) &from_handle, 2, sizeof(from_handle));
  688. rbuf.extract((char *) &to_handle, 2 + sizeof(from_handle), sizeof(to_handle));
  689. service_record *from_service = find_service_for_key(from_handle);
  690. service_record *to_service = find_service_for_key(to_handle);
  691. if (from_service == nullptr || to_service == nullptr || from_service == to_service) {
  692. // Service handle is bad
  693. char badreq_rep[] = { (char)cp_rply::BADREQ };
  694. if (! queue_packet(badreq_rep, 1)) return false;
  695. bad_conn_close = true;
  696. return true;
  697. }
  698. // Check dependency type is valid:
  699. int dep_type_int = rbuf[1];
  700. if (! contains({dependency_type::MILESTONE, dependency_type::REGULAR,
  701. dependency_type::WAITS_FOR}, dep_type_int)) {
  702. char badreqRep[] = { (char)cp_rply::BADREQ };
  703. if (! queue_packet(badreqRep, 1)) return false;
  704. bad_conn_close = true;
  705. }
  706. dependency_type dep_type = static_cast<dependency_type>(dep_type_int);
  707. // Remove dependency:
  708. bool did_remove = from_service->rm_dep(to_service, dep_type);
  709. services->process_queues();
  710. char ack_rep[] = { did_remove ? (char)cp_rply::ACK : (char)cp_rply::NAK };
  711. if (! queue_packet(ack_rep, 1)) return false;
  712. rbuf.consume(pkt_size);
  713. chklen = 0;
  714. return true;
  715. }
  716. bool control_conn_t::process_query_name()
  717. {
  718. // 1 byte packet type
  719. // 1 byte reserved
  720. // handle: service
  721. constexpr int pkt_size = 2 + sizeof(handle_t);
  722. if (rbuf.get_length() < pkt_size) {
  723. chklen = pkt_size;
  724. return true;
  725. }
  726. handle_t handle;
  727. rbuf.extract(&handle, 2, sizeof(handle));
  728. rbuf.consume(pkt_size);
  729. chklen = 0;
  730. service_record *service = find_service_for_key(handle);
  731. if (service == nullptr || service->get_name().length() > std::numeric_limits<uint16_t>::max()) {
  732. char nak_rep[] = { (char)cp_rply::NAK };
  733. return queue_packet(nak_rep, 1);
  734. }
  735. // Reply:
  736. // 1 byte packet type = cp_rply::SERVICENAME
  737. // 1 byte reserved
  738. // uint16_t length
  739. // N bytes name
  740. std::vector<char> reply;
  741. const std::string &name = service->get_name();
  742. uint16_t name_length = name.length();
  743. reply.resize(2 + sizeof(uint16_t) + name_length);
  744. reply[0] = (char)cp_rply::SERVICENAME;
  745. memcpy(reply.data() + 2, &name_length, sizeof(name_length));
  746. memcpy(reply.data() + 2 + sizeof(uint16_t), name.c_str(), name_length);
  747. return queue_packet(std::move(reply));
  748. }
  749. bool control_conn_t::process_setenv()
  750. {
  751. using std::string;
  752. string envVar;
  753. typename string::size_type eq;
  754. constexpr int pkt_size = 4;
  755. char badreqRep[] = { (char)cp_rply::BADREQ };
  756. char okRep[] = { (char)cp_rply::ACK };
  757. if (rbuf.get_length() < pkt_size) {
  758. chklen = pkt_size;
  759. return true;
  760. }
  761. envvar_len_t envvar_len;
  762. rbuf.extract(&envvar_len, 1, sizeof(envvar_len));
  763. if (envvar_len <= 0 || envvar_len > (1024 - 3)) {
  764. goto badreq;
  765. }
  766. chklen = envvar_len + 1 + sizeof(envvar_len); // packet type + (2 byte) length + envvar
  767. if (rbuf.get_length() < chklen) {
  768. // packet not complete yet; read more
  769. return true;
  770. }
  771. envVar = rbuf.extract_string(3, envvar_len);
  772. eq = envVar.find('=');
  773. if (!eq || eq == envVar.npos) {
  774. // Not found or at the beginning of the string
  775. goto badreq;
  776. }
  777. main_env.set_var(std::move(envVar));
  778. // Success response
  779. if (!queue_packet(okRep, 1)) return false;
  780. // Clear the packet from the buffer
  781. rbuf.consume(chklen);
  782. chklen = 0;
  783. return true;
  784. badreq:
  785. // Queue error response / mark connection bad
  786. if (! queue_packet(badreqRep, 1)) return false;
  787. bad_conn_close = true;
  788. return true;
  789. }
  790. bool control_conn_t::process_set_trigger()
  791. {
  792. // 1 byte packet type
  793. // handle: service
  794. // 1 byte trigger value
  795. constexpr int pkt_size = 2 + sizeof(handle_t);
  796. if (rbuf.get_length() < pkt_size) {
  797. chklen = pkt_size;
  798. return true;
  799. }
  800. handle_t handle;
  801. trigger_val_t trigger_val;
  802. rbuf.extract(&handle, 1, sizeof(handle));
  803. rbuf.extract(&trigger_val, 1 + sizeof(handle), sizeof(trigger_val));
  804. rbuf.consume(pkt_size);
  805. chklen = 0;
  806. service_record *service = find_service_for_key(handle);
  807. if (service == nullptr || service->get_type() != service_type_t::TRIGGERED) {
  808. char nak_rep[] = { (char)cp_rply::NAK };
  809. return queue_packet(nak_rep, 1);
  810. }
  811. triggered_service *tservice = static_cast<triggered_service *>(service);
  812. tservice->set_trigger(trigger_val != 0);
  813. services->process_queues();
  814. char ack_rep[] = { (char)cp_rply::ACK };
  815. return queue_packet(ack_rep, 1);
  816. }
  817. bool control_conn_t::process_catlog()
  818. {
  819. // 1 byte packet type
  820. // 1 byte reserved for future use
  821. // handle
  822. constexpr int pkt_size = 2 + sizeof(handle_t);
  823. if (rbuf.get_length() < pkt_size) {
  824. chklen = pkt_size;
  825. return true;
  826. }
  827. handle_t handle;
  828. char flags = rbuf[1];
  829. rbuf.extract(&handle, 2, sizeof(handle));
  830. rbuf.consume(pkt_size);
  831. chklen = 0;
  832. service_record *service = find_service_for_key(handle);
  833. if (service == nullptr || (service->get_type() != service_type_t::PROCESS
  834. && service->get_type() != service_type_t::BGPROCESS
  835. && service->get_type() != service_type_t::SCRIPTED)) {
  836. char nak_rep[] = { (char)cp_rply::NAK };
  837. return queue_packet(nak_rep, 1);
  838. }
  839. base_process_service *bps = static_cast<base_process_service *>(service);
  840. if (bps->get_log_mode() != log_type_id::BUFFER) {
  841. char nak_rep[] = { (char)cp_rply::NAK };
  842. return queue_packet(nak_rep, 1);
  843. }
  844. auto buffer_details = bps->get_log_buffer();
  845. const char *bufaddr = buffer_details.first;
  846. unsigned buflen = buffer_details.second;
  847. std::vector<char> pkt = { (char)cp_rply::SERVICE_LOG, 0 /* flags; reserved for future */ };
  848. pkt.insert(pkt.end(), (char *)(&buflen), (char *)(&buflen + 1));
  849. pkt.insert(pkt.end(), bufaddr, bufaddr + buflen);
  850. if ((flags & 1) != 0) {
  851. bps->clear_log_buffer();
  852. }
  853. return queue_packet(std::move(pkt));
  854. }
  855. bool control_conn_t::process_signal()
  856. {
  857. // packet contains signal number and process handle
  858. constexpr int pkt_size = 1 + sizeof(int) + sizeof(handle_t);
  859. if (rbuf.get_length() < pkt_size) {
  860. chklen = pkt_size;
  861. return true;
  862. }
  863. sig_num_t sig_num;
  864. rbuf.extract(&sig_num, 1, sizeof(sig_num));
  865. handle_t handle;
  866. rbuf.extract(&handle, 1 + sizeof(sig_num), sizeof(handle));
  867. rbuf.consume(pkt_size);
  868. chklen = 0;
  869. service_record *service = find_service_for_key(handle);
  870. if (service == nullptr) {
  871. char nak_rep[] = { (char)cp_rply::NAK };
  872. return queue_packet(nak_rep, 1);
  873. }
  874. // Reply:
  875. // 1 byte packet type = cp_rply::*
  876. pid_t spid = service->get_pid();
  877. // we probably don't want to kill/signal every process (in the current group),
  878. // but get_pid() sometimes returns -1 if e.g. service is not 'started'
  879. if (spid == -1 || spid == 0) {
  880. char nak_rep[] = { (char)cp_rply::SIGNAL_NOPID };
  881. return queue_packet(nak_rep, 1);
  882. }
  883. else {
  884. if (bp_sys::kill(spid, sig_num) != 0) {
  885. if (errno == EINVAL) {
  886. log(loglevel_t::ERROR, "Requested signal not in valid signal range.");
  887. char nak_rep[] = { (char)cp_rply::SIGNAL_BADSIG };
  888. return queue_packet(nak_rep, 1);
  889. }
  890. log(loglevel_t::ERROR, "Error sending signal to process: ", strerror(errno));
  891. char nak_rep[] = { (char)cp_rply::SIGNAL_KILLERR };
  892. return queue_packet(nak_rep, 1);
  893. }
  894. }
  895. char ack_rep[] = { (char)cp_rply::ACK };
  896. return queue_packet(ack_rep, 1);
  897. }
  898. bool control_conn_t::process_query_dsc_dir()
  899. {
  900. // packet contains command byte, spare byte, and service handle
  901. constexpr int pkt_size = 2 + sizeof(handle_t);
  902. if (rbuf.get_length() < pkt_size) {
  903. chklen = pkt_size;
  904. return true;
  905. }
  906. bool spare_ok = (rbuf[1] == 0);
  907. handle_t handle;
  908. rbuf.extract(&handle, 2, sizeof(handle));
  909. rbuf.consume(pkt_size);
  910. chklen = 0;
  911. service_record *service = find_service_for_key(handle);
  912. if (service == nullptr || !spare_ok) {
  913. char nak_rep[] = { (char)cp_rply::NAK };
  914. return queue_packet(nak_rep, 1);
  915. }
  916. // Reply:
  917. // 1 byte packet type = cp_rply::SVCDSCDIR
  918. // 4 bytes (uint32_t) = directory length (no nul terminator)
  919. // N bytes = directory (no nul)
  920. std::vector<char> reppkt;
  921. size_t sdir_len = strlen(service->get_service_dsc_dir());
  922. reppkt.resize(1 + sizeof(uint32_t) + sdir_len); // packet type, dir length, dir
  923. reppkt[0] = (char)cp_rply::SVCDSCDIR;
  924. std::memcpy(&reppkt[1], &sdir_len, sizeof(sdir_len));
  925. std::memcpy(&reppkt[1 + sizeof(uint32_t)], service->get_service_dsc_dir(), sdir_len);
  926. if (! queue_packet(std::move(reppkt))) return false;
  927. return true;
  928. }
  929. bool control_conn_t::query_load_mech()
  930. {
  931. rbuf.consume(1);
  932. chklen = 0;
  933. if (services->get_set_type_id() == SSET_TYPE_DIRLOAD) {
  934. dirload_service_set *dss = static_cast<dirload_service_set *>(services);
  935. std::vector<char> reppkt;
  936. reppkt.resize(2 + sizeof(uint32_t) * 2); // packet type, loader type, packet size, # dirs
  937. reppkt[0] = (char)cp_rply::LOADER_MECH;
  938. reppkt[1] = SSET_TYPE_DIRLOAD;
  939. // Number of directories in load path:
  940. uint32_t sdirs = dss->get_service_dir_count();
  941. std::memcpy(reppkt.data() + 2 + sizeof(uint32_t), &sdirs, sizeof(sdirs));
  942. // Our current working directory, which above are relative to:
  943. // leave sizeof(uint32_t) for size, which we'll fill in afterwards:
  944. std::size_t curpos = reppkt.size() + sizeof(uint32_t);
  945. #ifdef PATH_MAX
  946. uint32_t try_path_size = PATH_MAX;
  947. #else
  948. uint32_t try_path_size = 2048;
  949. #endif
  950. char *wd;
  951. while (true) {
  952. std::size_t total_size = curpos + std::size_t(try_path_size);
  953. if (total_size < curpos) {
  954. // Overflow. In theory we could now limit to size_t max, but the size must already
  955. // be crazy long; let's abort.
  956. char ack_rep[] = { (char)cp_rply::NAK };
  957. if (! queue_packet(ack_rep, 1)) return false;
  958. return true;
  959. }
  960. reppkt.resize(total_size);
  961. wd = getcwd(reppkt.data() + curpos, try_path_size);
  962. if (wd != nullptr) break;
  963. // Keep doubling the path size we try until it's big enough, or we get numeric overflow
  964. uint32_t new_try_path_size = try_path_size * uint32_t(2u);
  965. if (new_try_path_size < try_path_size) {
  966. // Overflow.
  967. char ack_rep[] = { (char)cp_rply::NAK };
  968. return queue_packet(ack_rep, 1);
  969. }
  970. try_path_size = new_try_path_size;
  971. }
  972. uint32_t wd_len = std::strlen(reppkt.data() + curpos);
  973. reppkt.resize(curpos + std::size_t(wd_len));
  974. std::memcpy(reppkt.data() + curpos - sizeof(uint32_t), &wd_len, sizeof(wd_len));
  975. // Each directory in the load path:
  976. for (int i = 0; uint32_t(i) < sdirs; i++) {
  977. const char *sdir = dss->get_service_dir(i);
  978. uint32_t dlen = std::strlen(sdir);
  979. auto cursize = reppkt.size();
  980. reppkt.resize(cursize + sizeof(dlen) + dlen);
  981. std::memcpy(reppkt.data() + cursize, &dlen, sizeof(dlen));
  982. std::memcpy(reppkt.data() + cursize + sizeof(dlen), sdir, dlen);
  983. }
  984. // Total packet size:
  985. uint32_t fsize = reppkt.size();
  986. std::memcpy(reppkt.data() + 2, &fsize, sizeof(fsize));
  987. if (! queue_packet(std::move(reppkt))) return false;
  988. return true;
  989. }
  990. else {
  991. // If we don't know how to deal with the service set type, send a NAK reply:
  992. char ack_rep[] = { (char)cp_rply::NAK };
  993. return queue_packet(ack_rep, 1);
  994. }
  995. }
  996. handle_t control_conn_t::allocate_service_handle(service_record *record)
  997. {
  998. // Try to find a unique handle (integer) in a single pass. Since the map is ordered, we can search until
  999. // we find a gap in the handle values.
  1000. handle_t candidate = 0;
  1001. for (auto p : key_service_map) {
  1002. if (p.first == candidate) ++candidate;
  1003. else break;
  1004. }
  1005. bool is_unique = (service_key_map.find(record) == service_key_map.end());
  1006. // The following operations perform allocation (can throw std::bad_alloc). If an exception occurs we
  1007. // must undo any previous actions:
  1008. if (is_unique) {
  1009. record->add_listener(this);
  1010. }
  1011. try {
  1012. key_service_map[candidate] = record;
  1013. service_key_map.insert(std::make_pair(record, candidate));
  1014. }
  1015. catch (...) {
  1016. if (is_unique) {
  1017. record->remove_listener(this);
  1018. }
  1019. key_service_map.erase(candidate);
  1020. }
  1021. return candidate;
  1022. }
  1023. void control_conn_t::service_event(service_record *service, service_event_t event) noexcept
  1024. {
  1025. // For each service handle corresponding to the event, send an information packet.
  1026. auto range = service_key_map.equal_range(service);
  1027. auto &i = range.first;
  1028. auto &end = range.second;
  1029. try {
  1030. while (i != end) {
  1031. uint32_t key = i->second;
  1032. std::vector<char> pkt;
  1033. constexpr int pktsize = 3 + sizeof(key) + STATUS_BUFFER_SIZE;
  1034. pkt.reserve(pktsize);
  1035. pkt.push_back((char)cp_info::SERVICEEVENT);
  1036. pkt.push_back(pktsize);
  1037. char *p = (char *)&key;
  1038. for (unsigned j = 0; j < sizeof(key); j++) {
  1039. pkt.push_back(*p++);
  1040. }
  1041. pkt.push_back(static_cast<char>(event));
  1042. pkt.resize(pktsize);
  1043. fill_status_buffer(pkt.data() + 3 + sizeof(key), service);
  1044. queue_packet(std::move(pkt));
  1045. ++i;
  1046. }
  1047. }
  1048. catch (std::bad_alloc &exc) {
  1049. do_oom_close();
  1050. }
  1051. }
  1052. bool control_conn_t::queue_packet(const char *pkt, unsigned size) noexcept
  1053. {
  1054. bool was_empty = outbuf.empty();
  1055. // If the queue is empty, we can try to write the packet out now rather than queueing it.
  1056. // If the write is unsuccessful or partial, we queue the remainder.
  1057. if (was_empty) {
  1058. int wr = bp_sys::write(iob.get_watched_fd(), pkt, size);
  1059. if (wr == -1) {
  1060. if (errno == EPIPE) {
  1061. return false;
  1062. }
  1063. if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
  1064. log(loglevel_t::WARN, "Error writing to control connection: ", strerror(errno));
  1065. return false;
  1066. }
  1067. // EAGAIN etc: fall through to below
  1068. }
  1069. else {
  1070. if ((unsigned)wr == size) {
  1071. // Ok, all written.
  1072. return true;
  1073. }
  1074. pkt += wr;
  1075. size -= wr;
  1076. }
  1077. }
  1078. // Create a vector out of the (remaining part of the) packet:
  1079. try {
  1080. outbuf.emplace_back(pkt, pkt + size);
  1081. outbuf_size += size;
  1082. return true;
  1083. }
  1084. catch (std::bad_alloc &baexc) {
  1085. // Mark the connection bad, and stop reading further requests
  1086. bad_conn_close = true;
  1087. oom_close = true;
  1088. if (was_empty) {
  1089. // We can't send out-of-memory response as we already wrote as much as we
  1090. // could above. Neither can we later send the response since we have currently
  1091. // sent an incomplete packet. All we can do is close the connection.
  1092. return false;
  1093. }
  1094. else {
  1095. return true;
  1096. }
  1097. }
  1098. }
  1099. // This queue_packet method is frustratingly similar to the one above, but the subtle differences
  1100. // make them extraordinary difficult to combine into a single method.
  1101. bool control_conn_t::queue_packet(std::vector<char> &&pkt) noexcept
  1102. {
  1103. bool was_empty = outbuf.empty();
  1104. if (was_empty) {
  1105. outpkt_index = 0;
  1106. // We can try sending the packet immediately:
  1107. int wr = bp_sys::write(iob.get_watched_fd(), pkt.data(), pkt.size());
  1108. if (wr == -1) {
  1109. if (errno == EPIPE) {
  1110. return false;
  1111. }
  1112. if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
  1113. log(loglevel_t::WARN, "Error writing to control connection: ", strerror(errno));
  1114. return false;
  1115. }
  1116. // EAGAIN etc: fall through to below
  1117. }
  1118. else {
  1119. if ((unsigned)wr == pkt.size()) {
  1120. // Ok, all written.
  1121. return true;
  1122. }
  1123. outpkt_index = wr;
  1124. }
  1125. }
  1126. try {
  1127. outbuf.emplace_back(std::move(pkt));
  1128. outbuf_size += pkt.size();
  1129. return true;
  1130. }
  1131. catch (std::bad_alloc &baexc) {
  1132. // Mark the connection bad, and stop reading further requests
  1133. bad_conn_close = true;
  1134. oom_close = true;
  1135. if (was_empty) {
  1136. // We can't send out-of-memory response as we already wrote as much as we
  1137. // could above. Neither can we later send the response since we have currently
  1138. // sent an incomplete packet. All we can do is close the connection.
  1139. return false;
  1140. }
  1141. else {
  1142. return true;
  1143. }
  1144. }
  1145. }
  1146. bool control_conn_t::data_ready() noexcept
  1147. {
  1148. int fd = iob.get_watched_fd();
  1149. int r = rbuf.fill(fd);
  1150. // Note file descriptor is non-blocking
  1151. if (r == -1) {
  1152. if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
  1153. if (errno != ECONNRESET) {
  1154. log(loglevel_t::WARN, "Error reading from control connection: ", strerror(errno));
  1155. }
  1156. return true;
  1157. }
  1158. return false;
  1159. }
  1160. if (r == 0) {
  1161. return true;
  1162. }
  1163. // complete packet?
  1164. if (rbuf.get_length() >= chklen) {
  1165. try {
  1166. return !process_packet();
  1167. }
  1168. catch (std::bad_alloc &baexc) {
  1169. do_oom_close();
  1170. return false;
  1171. }
  1172. }
  1173. else if (rbuf.get_length() == rbuf.get_size()) {
  1174. // Too big packet
  1175. log(loglevel_t::WARN, "Received too-large control packet; dropping connection");
  1176. bad_conn_close = true;
  1177. }
  1178. return false;
  1179. }
  1180. bool control_conn_t::send_data() noexcept
  1181. {
  1182. if (outbuf.empty() && bad_conn_close) {
  1183. if (oom_close) {
  1184. // Send oom response
  1185. char oomBuf[] = { (char)cp_rply::OOM };
  1186. bp_sys::write(iob.get_watched_fd(), oomBuf, 1);
  1187. }
  1188. return true;
  1189. }
  1190. vector<char> & pkt = outbuf.front();
  1191. char *data = pkt.data();
  1192. int written = bp_sys::write(iob.get_watched_fd(), data + outpkt_index, pkt.size() - outpkt_index);
  1193. if (written == -1) {
  1194. if (errno == EPIPE) {
  1195. // read end closed
  1196. return true;
  1197. }
  1198. else if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
  1199. // spurious readiness notification?
  1200. return false;
  1201. }
  1202. else {
  1203. log(loglevel_t::ERROR, "Error writing to control connection: ", strerror(errno));
  1204. return true;
  1205. }
  1206. }
  1207. outpkt_index += written;
  1208. if (outpkt_index == pkt.size()) {
  1209. // We've finished this packet, move on to the next:
  1210. outbuf_size -= pkt.size();
  1211. outbuf.pop_front();
  1212. outpkt_index = 0;
  1213. if (oom_close) {
  1214. // remain active, try to send cp_rply::OOM shortly
  1215. return false;
  1216. }
  1217. if (outbuf.empty() && bad_conn_close) {
  1218. return true;
  1219. }
  1220. }
  1221. // more to send
  1222. return false;
  1223. }
  1224. control_conn_t::~control_conn_t() noexcept
  1225. {
  1226. int fd = iob.get_watched_fd();
  1227. iob.deregister(loop);
  1228. bp_sys::close(fd);
  1229. // Clear service listeners
  1230. for (auto p : service_key_map) {
  1231. p.first->remove_listener(this);
  1232. }
  1233. active_control_conns--;
  1234. }