2
0

mqttd.c 27 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089
  1. /***************************************************************************
  2. * _ _ ____ _
  3. * Project ___| | | | _ \| |
  4. * / __| | | | |_) | |
  5. * | (__| |_| | _ <| |___
  6. * \___|\___/|_| \_\_____|
  7. *
  8. * Copyright (C) 1998 - 2022, Daniel Stenberg, <daniel@haxx.se>, et al.
  9. *
  10. * This software is licensed as described in the file COPYING, which
  11. * you should have received as part of this distribution. The terms
  12. * are also available at https://curl.se/docs/copyright.html.
  13. *
  14. * You may opt to use, copy, modify, merge, publish, distribute and/or sell
  15. * copies of the Software, and permit persons to whom the Software is
  16. * furnished to do so, under the terms of the COPYING file.
  17. *
  18. * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
  19. * KIND, either express or implied.
  20. *
  21. * SPDX-License-Identifier: curl
  22. *
  23. ***************************************************************************/
  24. #include "server_setup.h"
  25. #include <stdlib.h>
  26. #include <string.h>
  27. #include "util.h"
  28. /* Function
  29. *
  30. * Accepts a TCP connection on a custom port (IPv4 or IPv6). Speaks MQTT.
  31. *
  32. * Read commands from FILE (set with --config). The commands control how to
  33. * act and is reset to defaults each client TCP connect.
  34. *
  35. * Config file keywords:
  36. *
  37. * TODO
  38. */
  39. /* based on sockfilt.c */
  40. #ifdef HAVE_SIGNAL_H
  41. #include <signal.h>
  42. #endif
  43. #ifdef HAVE_NETINET_IN_H
  44. #include <netinet/in.h>
  45. #endif
  46. #ifdef HAVE_NETINET_IN6_H
  47. #include <netinet/in6.h>
  48. #endif
  49. #ifdef HAVE_ARPA_INET_H
  50. #include <arpa/inet.h>
  51. #endif
  52. #ifdef HAVE_NETDB_H
  53. #include <netdb.h>
  54. #endif
  55. #define ENABLE_CURLX_PRINTF
  56. /* make the curlx header define all printf() functions to use the curlx_*
  57. versions instead */
  58. #include "curlx.h" /* from the private lib dir */
  59. #include "getpart.h"
  60. #include "inet_pton.h"
  61. #include "server_sockaddr.h"
  62. #include "warnless.h"
  63. /* include memdebug.h last */
  64. #include "memdebug.h"
  65. #ifdef USE_WINSOCK
  66. #undef EINTR
  67. #define EINTR 4 /* errno.h value */
  68. #undef EAGAIN
  69. #define EAGAIN 11 /* errno.h value */
  70. #undef ENOMEM
  71. #define ENOMEM 12 /* errno.h value */
  72. #undef EINVAL
  73. #define EINVAL 22 /* errno.h value */
  74. #endif
  75. #define DEFAULT_PORT 1883 /* MQTT default port */
  76. #ifndef DEFAULT_LOGFILE
  77. #define DEFAULT_LOGFILE "log/mqttd.log"
  78. #endif
  79. #ifndef DEFAULT_CONFIG
  80. #define DEFAULT_CONFIG "mqttd.config"
  81. #endif
  82. #define MQTT_MSG_CONNECT 0x10
  83. #define MQTT_MSG_CONNACK 0x20
  84. #define MQTT_MSG_PUBLISH 0x30
  85. #define MQTT_MSG_PUBACK 0x40
  86. #define MQTT_MSG_SUBSCRIBE 0x82
  87. #define MQTT_MSG_SUBACK 0x90
  88. #define MQTT_MSG_DISCONNECT 0xe0
  89. #define MQTT_CONNACK_LEN 4
  90. #define MQTT_SUBACK_LEN 5
  91. #define MQTT_CLIENTID_LEN 12 /* "curl0123abcd" */
  92. #define MQTT_HEADER_LEN 5 /* max 5 bytes */
  /* Behavior switches for one client connection, as read from the config
     file */
  struct configurable {
    /* initial version byte in the request must match this */
    unsigned char version;

    /* when set, the PUBLISH is sent ahead of the SUBACK */
    bool publish_before_suback;

    /* when set, two bytes fewer than the complete PUBLISH packet are sent */
    bool short_publish;

    /* when set, the PUBLISH carries an illegal remaining length field */
    bool excessive_remaining;

    /* value stored in the last byte of the CONNACK packet */
    unsigned char error_connack;

    /* test number, used for finding the test file with the reply data */
    int testnum;
  };
  102. #define REQUEST_DUMP "log/server.input"
  103. #define CONFIG_VERSION 5
  104. static struct configurable config;
  105. const char *serverlogfile = DEFAULT_LOGFILE;
  106. static const char *configfile = DEFAULT_CONFIG;
  107. #ifdef ENABLE_IPV6
  108. static bool use_ipv6 = FALSE;
  109. #endif
  110. static const char *ipv_inuse = "IPv4";
  111. static unsigned short port = DEFAULT_PORT;
  112. static void resetdefaults(void)
  113. {
  114. logmsg("Reset to defaults");
  115. config.version = CONFIG_VERSION;
  116. config.publish_before_suback = FALSE;
  117. config.short_publish = FALSE;
  118. config.excessive_remaining = FALSE;
  119. config.error_connack = 0;
  120. config.testnum = 0;
  121. }
  /*
   * Parse 'value' as a base-10 unsigned number and return the lowest eight
   * bits of it. Text that does not start with a number gives zero.
   */
  static unsigned char byteval(char *value)
  {
    return (unsigned char)(strtoul(value, NULL, 10) & 0xffUL);
  }
  127. static void getconfig(void)
  128. {
  129. FILE *fp = fopen(configfile, FOPEN_READTEXT);
  130. resetdefaults();
  131. if(fp) {
  132. char buffer[512];
  133. logmsg("parse config file");
  134. while(fgets(buffer, sizeof(buffer), fp)) {
  135. char key[32];
  136. char value[32];
  137. if(2 == sscanf(buffer, "%31s %31s", key, value)) {
  138. if(!strcmp(key, "version")) {
  139. config.version = byteval(value);
  140. logmsg("version [%d] set", config.version);
  141. }
  142. else if(!strcmp(key, "PUBLISH-before-SUBACK")) {
  143. logmsg("PUBLISH-before-SUBACK set");
  144. config.publish_before_suback = TRUE;
  145. }
  146. else if(!strcmp(key, "short-PUBLISH")) {
  147. logmsg("short-PUBLISH set");
  148. config.short_publish = TRUE;
  149. }
  150. else if(!strcmp(key, "error-CONNACK")) {
  151. config.error_connack = byteval(value);
  152. logmsg("error-CONNACK = %d", config.error_connack);
  153. }
  154. else if(!strcmp(key, "Testnum")) {
  155. config.testnum = atoi(value);
  156. logmsg("testnum = %d", config.testnum);
  157. }
  158. else if(!strcmp(key, "excessive-remaining")) {
  159. logmsg("excessive-remaining set");
  160. config.excessive_remaining = TRUE;
  161. }
  162. }
  163. }
  164. fclose(fp);
  165. }
  166. else {
  167. logmsg("No config file '%s' to read", configfile);
  168. }
  169. }
  /*
   * Log 'len' bytes from 'buffer' as a single line of lowercase hex digits.
   *
   * Nothing is logged when 'len' is zero or negative. Input that does not fit
   * in the local text buffer is cut off, always after a complete byte so that
   * the logged line never ends with half a hex pair.
   */
  static void loghex(unsigned char *buffer, ssize_t len)
  {
    static const char hexdigits[] = "0123456789abcdef";
    char data[12000];
    /* each byte needs two characters, the terminating zero needs one */
    const ssize_t maxbytes = (ssize_t)((sizeof(data) - 1) / 2);
    ssize_t count = (len < maxbytes) ? len : maxbytes;
    char *optr = data;
    ssize_t i;

    if(count <= 0)
      return;

    for(i = 0; i < count; i++) {
      *optr++ = hexdigits[buffer[i] >> 4];
      *optr++ = hexdigits[buffer[i] & 0x0f];
    }
    *optr = 0;

    logmsg("'%s'", data);
  }
  /* Tells which peer sent a logged packet */
  typedef enum {
    FROM_CLIENT,
    FROM_SERVER
  } mqttdir;

  /*
   * Write one line about a packet to 'output', in this form:
   *
   *   <"client" or "server"> <prefix> <remlen in hex> <packet bytes in hex>
   *
   * dir:     who sent the packet
   * prefix:  text put after the sender, the callers pass the packet name
   * remlen:  number printed in hex as the third field
   * buffer:  the bytes to show
   * len:     number of bytes in 'buffer'; zero or negative gives an empty
   *          hex field
   *
   * Bytes that do not fit in the local text buffer are left out, always on a
   * whole byte boundary so that the hex field has an even number of digits.
   */
  static void logprotocol(mqttdir dir,
                          const char *prefix, size_t remlen,
                          FILE *output,
                          unsigned char *buffer, ssize_t len)
  {
    static const char hexdigits[] = "0123456789abcdef";
    char data[12000];
    /* each byte needs two characters, the terminating zero needs one */
    const ssize_t maxbytes = (ssize_t)((sizeof(data) - 1) / 2);
    ssize_t count = (len < maxbytes) ? len : maxbytes;
    char *optr = data;
    ssize_t i;

    for(i = 0; i < count; i++) {
      *optr++ = hexdigits[buffer[i] >> 4];
      *optr++ = hexdigits[buffer[i] & 0x0f];
    }
    *optr = 0;

    fprintf(output, "%s %s %zx %s\n",
            dir == FROM_CLIENT? "client": "server",
            prefix, remlen,
            data);
  }
  211. /* return 0 on success */
  212. static int connack(FILE *dump, curl_socket_t fd)
  213. {
  214. unsigned char packet[]={
  215. MQTT_MSG_CONNACK, 0x02,
  216. 0x00, 0x00
  217. };
  218. ssize_t rc;
  219. packet[3] = config.error_connack;
  220. rc = swrite(fd, (char *)packet, sizeof(packet));
  221. if(rc > 0) {
  222. logmsg("WROTE %d bytes [CONNACK]", rc);
  223. loghex(packet, rc);
  224. logprotocol(FROM_SERVER, "CONNACK", 2, dump, packet, sizeof(packet));
  225. }
  226. if(rc == sizeof(packet)) {
  227. return 0;
  228. }
  229. return 1;
  230. }
  231. /* return 0 on success */
  232. static int suback(FILE *dump, curl_socket_t fd, unsigned short packetid)
  233. {
  234. unsigned char packet[]={
  235. MQTT_MSG_SUBACK, 0x03,
  236. 0, 0, /* filled in below */
  237. 0x00
  238. };
  239. ssize_t rc;
  240. packet[2] = (unsigned char)(packetid >> 8);
  241. packet[3] = (unsigned char)(packetid & 0xff);
  242. rc = swrite(fd, (char *)packet, sizeof(packet));
  243. if(rc == sizeof(packet)) {
  244. logmsg("WROTE %d bytes [SUBACK]", rc);
  245. loghex(packet, rc);
  246. logprotocol(FROM_SERVER, "SUBACK", 3, dump, packet, rc);
  247. return 0;
  248. }
  249. return 1;
  250. }
  #ifdef QOS
  /*
   * Send a PUBACK packet for 'packetid' to the client on 'fd'. Only built
   * when QOS is defined. A complete write is logged and recorded in 'dump'.
   *
   * Returns 0 when the complete packet was written, 1 otherwise.
   */
  static int puback(FILE *dump, curl_socket_t fd, unsigned short packetid)
  {
    unsigned char packet[]={
      MQTT_MSG_PUBACK, 0x02, /* two bytes of packet id follow */
      0, 0 /* filled in below */
    };
    ssize_t rc;

    packet[2] = (unsigned char)(packetid >> 8);
    packet[3] = (unsigned char)(packetid & 0xff);

    rc = swrite(fd, (char *)packet, sizeof(packet));
    if(rc == (ssize_t)sizeof(packet)) {
      /* rc is an ssize_t and needs the 'z' length modifier */
      logmsg("WROTE %zd bytes [PUBACK]", rc);
      loghex(packet, rc);
      /* logprotocol() wants the packet name and the remaining length too */
      logprotocol(FROM_SERVER, "PUBACK", 2, dump, packet, rc);
      return 0;
    }
    logmsg("Failed sending [PUBACK]");
    return 1;
  }
  #endif
  273. /* return 0 on success */
  274. static int disconnect(FILE *dump, curl_socket_t fd)
  275. {
  276. unsigned char packet[]={
  277. MQTT_MSG_DISCONNECT, 0x00,
  278. };
  279. ssize_t rc = swrite(fd, (char *)packet, sizeof(packet));
  280. if(rc == sizeof(packet)) {
  281. logmsg("WROTE %d bytes [DISCONNECT]", rc);
  282. loghex(packet, rc);
  283. logprotocol(FROM_SERVER, "DISCONNECT", 0, dump, packet, rc);
  284. return 0;
  285. }
  286. logmsg("Failed sending [DISCONNECT]");
  287. return 1;
  288. }
  289. /*
  290. do
  291. encodedByte = X MOD 128
  292. X = X DIV 128
  293. // if there are more data to encode, set the top bit of this byte
  294. if ( X > 0 )
  295. encodedByte = encodedByte OR 128
  296. endif
  297. 'output' encodedByte
  298. while ( X > 0 )
  299. */
  /*
   * Encode 'packetlen' as an MQTT remaining length field: seven bits of the
   * value per byte, least significant group first, with the top bit set on
   * every byte that is followed by another one.
   *
   * remlength: output buffer with room for four bytes, which is the longest
   *            encoding there is
   *
   * Returns the number of bytes stored (1 to 4), or 0 when the value needs
   * more than four bytes. In that case "too large packet!" is logged and the
   * first four bytes of the buffer have been written.
   */
  static int encode_length(size_t packetlen,
                           unsigned char *remlength) /* 4 bytes */
  {
    int bytes = 0;
    unsigned char encode;

    do {
      if(bytes >= 4) {
        /* all four bytes are used and there is still value left */
        logmsg("too large packet!");
        return 0;
      }
      encode = (unsigned char)(packetlen % 0x80);
      packetlen /= 0x80;
      if(packetlen)
        encode |= 0x80;
      remlength[bytes++] = encode;
    } while(packetlen);

    return bytes;
  }
  /*
   * Decode a remaining length field: every byte contributes its low seven
   * bits, least significant group first, and a clear top bit marks the last
   * byte.
   *
   * buf:      the first byte of the field
   * buflen:   the most bytes that may be read from 'buf'
   * lenbytes: if not NULL, receives the number of bytes consumed
   *
   * Returns the decoded value, which is 0 when 'buflen' is 0.
   */
  static size_t decode_length(unsigned char *buf,
                              size_t buflen, size_t *lenbytes)
  {
    size_t value = 0;
    size_t factor = 1;
    size_t used = 0;

    while(used < buflen) {
      const unsigned char octet = buf[used++];

      value += (octet & 0x7f) * factor;
      factor *= 0x80;

      if(!(octet & 0x80))
        /* no continuation bit, this was the final byte */
        break;
    }

    if(lenbytes)
      *lenbytes = used;
    return value;
  }
  335. /* return 0 on success */
  336. static int publish(FILE *dump,
  337. curl_socket_t fd, unsigned short packetid,
  338. char *topic, char *payload, size_t payloadlen)
  339. {
  340. size_t topiclen = strlen(topic);
  341. unsigned char *packet;
  342. size_t payloadindex;
  343. ssize_t remaininglength = topiclen + 2 + payloadlen;
  344. ssize_t packetlen;
  345. ssize_t sendamount;
  346. ssize_t rc;
  347. unsigned char rembuffer[4];
  348. int encodedlen;
  349. if(config.excessive_remaining) {
  350. /* manually set illegal remaining length */
  351. rembuffer[0] = 0xff;
  352. rembuffer[1] = 0xff;
  353. rembuffer[2] = 0xff;
  354. rembuffer[3] = 0x80; /* maximum allowed here by spec is 0x7f */
  355. encodedlen = 4;
  356. }
  357. else
  358. encodedlen = encode_length(remaininglength, rembuffer);
  359. /* one packet type byte (possibly two more for packetid) */
  360. packetlen = remaininglength + encodedlen + 1;
  361. packet = malloc(packetlen);
  362. if(!packet)
  363. return 1;
  364. packet[0] = MQTT_MSG_PUBLISH; /* TODO: set QoS? */
  365. memcpy(&packet[1], rembuffer, encodedlen);
  366. (void)packetid;
  367. /* packet_id if QoS is set */
  368. packet[1 + encodedlen] = (unsigned char)(topiclen >> 8);
  369. packet[2 + encodedlen] = (unsigned char)(topiclen & 0xff);
  370. memcpy(&packet[3 + encodedlen], topic, topiclen);
  371. payloadindex = 3 + topiclen + encodedlen;
  372. memcpy(&packet[payloadindex], payload, payloadlen);
  373. sendamount = packetlen;
  374. if(config.short_publish)
  375. sendamount -= 2;
  376. rc = swrite(fd, (char *)packet, sendamount);
  377. if(rc > 0) {
  378. logmsg("WROTE %d bytes [PUBLISH]", rc);
  379. loghex(packet, rc);
  380. logprotocol(FROM_SERVER, "PUBLISH", remaininglength, dump, packet, rc);
  381. }
  382. if(rc == packetlen)
  383. return 0;
  384. return 1;
  385. }
  386. #define MAX_TOPIC_LENGTH 65535
  387. #define MAX_CLIENT_ID_LENGTH 32
  388. static char topic[MAX_TOPIC_LENGTH + 1];
  389. static int fixedheader(curl_socket_t fd,
  390. unsigned char *bytep,
  391. size_t *remaining_lengthp,
  392. size_t *remaining_length_bytesp)
  393. {
  394. /* get the fixed header */
  395. unsigned char buffer[10];
  396. /* get the first two bytes */
  397. ssize_t rc = sread(fd, (char *)buffer, 2);
  398. int i;
  399. if(rc < 2) {
  400. logmsg("READ %d bytes [SHORT!]", rc);
  401. return 1; /* fail */
  402. }
  403. logmsg("READ %d bytes", rc);
  404. loghex(buffer, rc);
  405. *bytep = buffer[0];
  406. /* if the length byte has the top bit set, get the next one too */
  407. i = 1;
  408. while(buffer[i] & 0x80) {
  409. i++;
  410. rc = sread(fd, (char *)&buffer[i], 1);
  411. if(rc != 1) {
  412. logmsg("Remaining Length broken");
  413. return 1;
  414. }
  415. }
  416. *remaining_lengthp = decode_length(&buffer[1], i, remaining_length_bytesp);
  417. logmsg("Remaining Length: %ld [%d bytes]", (long) *remaining_lengthp,
  418. *remaining_length_bytesp);
  419. return 0;
  420. }
  421. static curl_socket_t mqttit(curl_socket_t fd)
  422. {
  423. size_t buff_size = 10*1024;
  424. unsigned char *buffer = NULL;
  425. ssize_t rc;
  426. unsigned char byte;
  427. unsigned short packet_id;
  428. size_t payload_len;
  429. size_t client_id_length;
  430. unsigned int topic_len;
  431. size_t remaining_length = 0;
  432. size_t bytes = 0; /* remaining length field size in bytes */
  433. char client_id[MAX_CLIENT_ID_LENGTH];
  434. long testno;
  435. FILE *stream = NULL;
  436. static const char protocol[7] = {
  437. 0x00, 0x04, /* protocol length */
  438. 'M','Q','T','T', /* protocol name */
  439. 0x04 /* protocol level */
  440. };
  441. FILE *dump = fopen(REQUEST_DUMP, "ab");
  442. if(!dump)
  443. goto end;
  444. getconfig();
  445. testno = config.testnum;
  446. if(testno)
  447. logmsg("Found test number %ld", testno);
  448. buffer = malloc(buff_size);
  449. if(!buffer) {
  450. logmsg("Out of memory, unable to allocate buffer");
  451. goto end;
  452. }
  453. do {
  454. unsigned char usr_flag = 0x80;
  455. unsigned char passwd_flag = 0x40;
  456. unsigned char conn_flags;
  457. const size_t client_id_offset = 12;
  458. size_t start_usr;
  459. size_t start_passwd;
  460. /* get the fixed header */
  461. rc = fixedheader(fd, &byte, &remaining_length, &bytes);
  462. if(rc)
  463. break;
  464. if(remaining_length >= buff_size) {
  465. buff_size = remaining_length;
  466. buffer = realloc(buffer, buff_size);
  467. if(!buffer) {
  468. logmsg("Failed realloc of size %lu", buff_size);
  469. goto end;
  470. }
  471. }
  472. if(remaining_length) {
  473. /* reading variable header and payload into buffer */
  474. rc = sread(fd, (char *)buffer, remaining_length);
  475. if(rc > 0) {
  476. logmsg("READ %d bytes", rc);
  477. loghex(buffer, rc);
  478. }
  479. }
  480. if(byte == MQTT_MSG_CONNECT) {
  481. logprotocol(FROM_CLIENT, "CONNECT", remaining_length,
  482. dump, buffer, rc);
  483. if(memcmp(protocol, buffer, sizeof(protocol))) {
  484. logmsg("Protocol preamble mismatch");
  485. goto end;
  486. }
  487. /* ignore the connect flag byte and two keepalive bytes */
  488. payload_len = (buffer[10] << 8) | buffer[11];
  489. /* first part of the payload is the client ID */
  490. client_id_length = payload_len;
  491. /* checking if user and password flags were set */
  492. conn_flags = buffer[7];
  493. start_usr = client_id_offset + payload_len;
  494. if(usr_flag == (unsigned char)(conn_flags & usr_flag)) {
  495. logmsg("User flag is present in CONN flag");
  496. payload_len += (buffer[start_usr] << 8) | buffer[start_usr + 1];
  497. payload_len += 2; /* MSB and LSB for user length */
  498. }
  499. start_passwd = client_id_offset + payload_len;
  500. if(passwd_flag == (char)(conn_flags & passwd_flag)) {
  501. logmsg("Password flag is present in CONN flags");
  502. payload_len += (buffer[start_passwd] << 8) | buffer[start_passwd + 1];
  503. payload_len += 2; /* MSB and LSB for password length */
  504. }
  505. /* check the length of the payload */
  506. if((ssize_t)payload_len != (rc - 12)) {
  507. logmsg("Payload length mismatch, expected %x got %x",
  508. rc - 12, payload_len);
  509. goto end;
  510. }
  511. /* check the length of the client ID */
  512. else if((client_id_length + 1) > MAX_CLIENT_ID_LENGTH) {
  513. logmsg("Too large client id");
  514. goto end;
  515. }
  516. memcpy(client_id, &buffer[12], client_id_length);
  517. client_id[client_id_length] = 0;
  518. logmsg("MQTT client connect accepted: %s", client_id);
  519. /* The first packet sent from the Server to the Client MUST be a
  520. CONNACK Packet */
  521. if(connack(dump, fd)) {
  522. logmsg("failed sending CONNACK");
  523. goto end;
  524. }
  525. }
  526. else if(byte == MQTT_MSG_SUBSCRIBE) {
  527. int error;
  528. char *data;
  529. size_t datalen;
  530. logprotocol(FROM_CLIENT, "SUBSCRIBE", remaining_length,
  531. dump, buffer, rc);
  532. logmsg("Incoming SUBSCRIBE");
  533. if(rc < 6) {
  534. logmsg("Too small SUBSCRIBE");
  535. goto end;
  536. }
  537. /* two bytes packet id */
  538. packet_id = (unsigned short)((buffer[0] << 8) | buffer[1]);
  539. /* two bytes topic length */
  540. topic_len = (buffer[2] << 8) | buffer[3];
  541. if(topic_len != (remaining_length - 5)) {
  542. logmsg("Wrong topic length, got %d expected %d",
  543. topic_len, remaining_length - 5);
  544. goto end;
  545. }
  546. memcpy(topic, &buffer[4], topic_len);
  547. topic[topic_len] = 0;
  548. /* there's a QoS byte (two bits) after the topic */
  549. logmsg("SUBSCRIBE to '%s' [%d]", topic, packet_id);
  550. stream = test2fopen(testno);
  551. error = getpart(&data, &datalen, "reply", "data", stream);
  552. if(!error) {
  553. if(!config.publish_before_suback) {
  554. if(suback(dump, fd, packet_id)) {
  555. logmsg("failed sending SUBACK");
  556. goto end;
  557. }
  558. }
  559. if(publish(dump, fd, packet_id, topic, data, datalen)) {
  560. logmsg("PUBLISH failed");
  561. goto end;
  562. }
  563. if(config.publish_before_suback) {
  564. if(suback(dump, fd, packet_id)) {
  565. logmsg("failed sending SUBACK");
  566. goto end;
  567. }
  568. }
  569. }
  570. else {
  571. char *def = (char *)"this is random payload yes yes it is";
  572. publish(dump, fd, packet_id, topic, def, strlen(def));
  573. }
  574. disconnect(dump, fd);
  575. }
  576. else if((byte & 0xf0) == (MQTT_MSG_PUBLISH & 0xf0)) {
  577. size_t topiclen;
  578. logmsg("Incoming PUBLISH");
  579. logprotocol(FROM_CLIENT, "PUBLISH", remaining_length,
  580. dump, buffer, rc);
  581. topiclen = (buffer[1 + bytes] << 8) | buffer[2 + bytes];
  582. logmsg("Got %d bytes topic", topiclen);
  583. /* TODO: verify topiclen */
  584. #ifdef QOS
  585. /* TODO: handle packetid if there is one. Send puback if QoS > 0 */
  586. puback(dump, fd, 0);
  587. #endif
  588. /* expect a disconnect here */
  589. /* get the request */
  590. rc = sread(fd, (char *)&buffer[0], 2);
  591. logmsg("READ %d bytes [DISCONNECT]", rc);
  592. loghex(buffer, rc);
  593. logprotocol(FROM_CLIENT, "DISCONNECT", 0, dump, buffer, rc);
  594. goto end;
  595. }
  596. else {
  597. /* not supported (yet) */
  598. goto end;
  599. }
  600. } while(1);
  601. end:
  602. if(buffer)
  603. free(buffer);
  604. if(dump)
  605. fclose(dump);
  606. if(stream)
  607. fclose(stream);
  608. return CURL_SOCKET_BAD;
  609. }
  610. /*
  611. sockfdp is a pointer to an established stream or CURL_SOCKET_BAD
  612. if sockfd is CURL_SOCKET_BAD, listendfd is a listening socket we must
  613. accept()
  614. */
  615. static bool incoming(curl_socket_t listenfd)
  616. {
  617. fd_set fds_read;
  618. fd_set fds_write;
  619. fd_set fds_err;
  620. int clients = 0; /* connected clients */
  621. if(got_exit_signal) {
  622. logmsg("signalled to die, exiting...");
  623. return FALSE;
  624. }
  625. #ifdef HAVE_GETPPID
  626. /* As a last resort, quit if socks5 process becomes orphan. */
  627. if(getppid() <= 1) {
  628. logmsg("process becomes orphan, exiting");
  629. return FALSE;
  630. }
  631. #endif
  632. do {
  633. ssize_t rc;
  634. int error = 0;
  635. curl_socket_t sockfd = listenfd;
  636. int maxfd = (int)sockfd;
  637. FD_ZERO(&fds_read);
  638. FD_ZERO(&fds_write);
  639. FD_ZERO(&fds_err);
  640. /* there's always a socket to wait for */
  641. FD_SET(sockfd, &fds_read);
  642. do {
  643. /* select() blocking behavior call on blocking descriptors please */
  644. rc = select(maxfd + 1, &fds_read, &fds_write, &fds_err, NULL);
  645. if(got_exit_signal) {
  646. logmsg("signalled to die, exiting...");
  647. return FALSE;
  648. }
  649. } while((rc == -1) && ((error = SOCKERRNO) == EINTR));
  650. if(rc < 0) {
  651. logmsg("select() failed with error: (%d) %s",
  652. error, strerror(error));
  653. return FALSE;
  654. }
  655. if(FD_ISSET(sockfd, &fds_read)) {
  656. curl_socket_t newfd = accept(sockfd, NULL, NULL);
  657. if(CURL_SOCKET_BAD == newfd) {
  658. error = SOCKERRNO;
  659. logmsg("accept(%d, NULL, NULL) failed with error: (%d) %s",
  660. sockfd, error, strerror(error));
  661. }
  662. else {
  663. logmsg("====> Client connect, fd %d. Read config from %s",
  664. newfd, configfile);
  665. set_advisor_read_lock(SERVERLOGS_LOCK);
  666. (void)mqttit(newfd); /* until done */
  667. clear_advisor_read_lock(SERVERLOGS_LOCK);
  668. logmsg("====> Client disconnect");
  669. sclose(newfd);
  670. }
  671. }
  672. } while(clients);
  673. return TRUE;
  674. }
  675. static curl_socket_t sockdaemon(curl_socket_t sock,
  676. unsigned short *listenport)
  677. {
  678. /* passive daemon style */
  679. srvr_sockaddr_union_t listener;
  680. int flag;
  681. int rc;
  682. int totdelay = 0;
  683. int maxretr = 10;
  684. int delay = 20;
  685. int attempt = 0;
  686. int error = 0;
  687. do {
  688. attempt++;
  689. flag = 1;
  690. rc = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
  691. (void *)&flag, sizeof(flag));
  692. if(rc) {
  693. error = SOCKERRNO;
  694. logmsg("setsockopt(SO_REUSEADDR) failed with error: (%d) %s",
  695. error, strerror(error));
  696. if(maxretr) {
  697. rc = wait_ms(delay);
  698. if(rc) {
  699. /* should not happen */
  700. logmsg("wait_ms() failed with error: %d", rc);
  701. sclose(sock);
  702. return CURL_SOCKET_BAD;
  703. }
  704. if(got_exit_signal) {
  705. logmsg("signalled to die, exiting...");
  706. sclose(sock);
  707. return CURL_SOCKET_BAD;
  708. }
  709. totdelay += delay;
  710. delay *= 2; /* double the sleep for next attempt */
  711. }
  712. }
  713. } while(rc && maxretr--);
  714. if(rc) {
  715. logmsg("setsockopt(SO_REUSEADDR) failed %d times in %d ms. Error: (%d) %s",
  716. attempt, totdelay, error, strerror(error));
  717. logmsg("Continuing anyway...");
  718. }
  719. /* When the specified listener port is zero, it is actually a
  720. request to let the system choose a non-zero available port. */
  721. #ifdef ENABLE_IPV6
  722. if(!use_ipv6) {
  723. #endif
  724. memset(&listener.sa4, 0, sizeof(listener.sa4));
  725. listener.sa4.sin_family = AF_INET;
  726. listener.sa4.sin_addr.s_addr = INADDR_ANY;
  727. listener.sa4.sin_port = htons(*listenport);
  728. rc = bind(sock, &listener.sa, sizeof(listener.sa4));
  729. #ifdef ENABLE_IPV6
  730. }
  731. else {
  732. memset(&listener.sa6, 0, sizeof(listener.sa6));
  733. listener.sa6.sin6_family = AF_INET6;
  734. listener.sa6.sin6_addr = in6addr_any;
  735. listener.sa6.sin6_port = htons(*listenport);
  736. rc = bind(sock, &listener.sa, sizeof(listener.sa6));
  737. }
  738. #endif /* ENABLE_IPV6 */
  739. if(rc) {
  740. error = SOCKERRNO;
  741. logmsg("Error binding socket on port %hu: (%d) %s",
  742. *listenport, error, strerror(error));
  743. sclose(sock);
  744. return CURL_SOCKET_BAD;
  745. }
  746. if(!*listenport) {
  747. /* The system was supposed to choose a port number, figure out which
  748. port we actually got and update the listener port value with it. */
  749. curl_socklen_t la_size;
  750. srvr_sockaddr_union_t localaddr;
  751. #ifdef ENABLE_IPV6
  752. if(!use_ipv6)
  753. #endif
  754. la_size = sizeof(localaddr.sa4);
  755. #ifdef ENABLE_IPV6
  756. else
  757. la_size = sizeof(localaddr.sa6);
  758. #endif
  759. memset(&localaddr.sa, 0, (size_t)la_size);
  760. if(getsockname(sock, &localaddr.sa, &la_size) < 0) {
  761. error = SOCKERRNO;
  762. logmsg("getsockname() failed with error: (%d) %s",
  763. error, strerror(error));
  764. sclose(sock);
  765. return CURL_SOCKET_BAD;
  766. }
  767. switch(localaddr.sa.sa_family) {
  768. case AF_INET:
  769. *listenport = ntohs(localaddr.sa4.sin_port);
  770. break;
  771. #ifdef ENABLE_IPV6
  772. case AF_INET6:
  773. *listenport = ntohs(localaddr.sa6.sin6_port);
  774. break;
  775. #endif
  776. default:
  777. break;
  778. }
  779. if(!*listenport) {
  780. /* Real failure, listener port shall not be zero beyond this point. */
  781. logmsg("Apparently getsockname() succeeded, with listener port zero.");
  782. logmsg("A valid reason for this failure is a binary built without");
  783. logmsg("proper network library linkage. This might not be the only");
  784. logmsg("reason, but double check it before anything else.");
  785. sclose(sock);
  786. return CURL_SOCKET_BAD;
  787. }
  788. }
  789. /* start accepting connections */
  790. rc = listen(sock, 5);
  791. if(0 != rc) {
  792. error = SOCKERRNO;
  793. logmsg("listen(%d, 5) failed with error: (%d) %s",
  794. sock, error, strerror(error));
  795. sclose(sock);
  796. return CURL_SOCKET_BAD;
  797. }
  798. return sock;
  799. }
  800. int main(int argc, char *argv[])
  801. {
  802. curl_socket_t sock = CURL_SOCKET_BAD;
  803. curl_socket_t msgsock = CURL_SOCKET_BAD;
  804. int wrotepidfile = 0;
  805. int wroteportfile = 0;
  806. const char *pidname = ".mqttd.pid";
  807. const char *portname = ".mqttd.port";
  808. bool juggle_again;
  809. int error;
  810. int arg = 1;
  811. while(argc>arg) {
  812. if(!strcmp("--version", argv[arg])) {
  813. printf("mqttd IPv4%s\n",
  814. #ifdef ENABLE_IPV6
  815. "/IPv6"
  816. #else
  817. ""
  818. #endif
  819. );
  820. return 0;
  821. }
  822. else if(!strcmp("--pidfile", argv[arg])) {
  823. arg++;
  824. if(argc>arg)
  825. pidname = argv[arg++];
  826. }
  827. else if(!strcmp("--portfile", argv[arg])) {
  828. arg++;
  829. if(argc>arg)
  830. portname = argv[arg++];
  831. }
  832. else if(!strcmp("--config", argv[arg])) {
  833. arg++;
  834. if(argc>arg)
  835. configfile = argv[arg++];
  836. }
  837. else if(!strcmp("--logfile", argv[arg])) {
  838. arg++;
  839. if(argc>arg)
  840. serverlogfile = argv[arg++];
  841. }
  842. else if(!strcmp("--ipv6", argv[arg])) {
  843. #ifdef ENABLE_IPV6
  844. ipv_inuse = "IPv6";
  845. use_ipv6 = TRUE;
  846. #endif
  847. arg++;
  848. }
  849. else if(!strcmp("--ipv4", argv[arg])) {
  850. /* for completeness, we support this option as well */
  851. #ifdef ENABLE_IPV6
  852. ipv_inuse = "IPv4";
  853. use_ipv6 = FALSE;
  854. #endif
  855. arg++;
  856. }
  857. else if(!strcmp("--port", argv[arg])) {
  858. arg++;
  859. if(argc>arg) {
  860. char *endptr;
  861. unsigned long ulnum = strtoul(argv[arg], &endptr, 10);
  862. if((endptr != argv[arg] + strlen(argv[arg])) ||
  863. ((ulnum != 0UL) && ((ulnum < 1025UL) || (ulnum > 65535UL)))) {
  864. fprintf(stderr, "mqttd: invalid --port argument (%s)\n",
  865. argv[arg]);
  866. return 0;
  867. }
  868. port = curlx_ultous(ulnum);
  869. arg++;
  870. }
  871. }
  872. else {
  873. puts("Usage: mqttd [option]\n"
  874. " --config [file]\n"
  875. " --version\n"
  876. " --logfile [file]\n"
  877. " --pidfile [file]\n"
  878. " --portfile [file]\n"
  879. " --ipv4\n"
  880. " --ipv6\n"
  881. " --port [port]\n");
  882. return 0;
  883. }
  884. }
  885. #ifdef WIN32
  886. win32_init();
  887. atexit(win32_cleanup);
  888. setmode(fileno(stdin), O_BINARY);
  889. setmode(fileno(stdout), O_BINARY);
  890. setmode(fileno(stderr), O_BINARY);
  891. #endif
  892. install_signal_handlers(FALSE);
  893. #ifdef ENABLE_IPV6
  894. if(!use_ipv6)
  895. #endif
  896. sock = socket(AF_INET, SOCK_STREAM, 0);
  897. #ifdef ENABLE_IPV6
  898. else
  899. sock = socket(AF_INET6, SOCK_STREAM, 0);
  900. #endif
  901. if(CURL_SOCKET_BAD == sock) {
  902. error = SOCKERRNO;
  903. logmsg("Error creating socket: (%d) %s",
  904. error, strerror(error));
  905. goto mqttd_cleanup;
  906. }
  907. {
  908. /* passive daemon style */
  909. sock = sockdaemon(sock, &port);
  910. if(CURL_SOCKET_BAD == sock) {
  911. goto mqttd_cleanup;
  912. }
  913. msgsock = CURL_SOCKET_BAD; /* no stream socket yet */
  914. }
  915. logmsg("Running %s version", ipv_inuse);
  916. logmsg("Listening on port %hu", port);
  917. wrotepidfile = write_pidfile(pidname);
  918. if(!wrotepidfile) {
  919. goto mqttd_cleanup;
  920. }
  921. wroteportfile = write_portfile(portname, port);
  922. if(!wroteportfile) {
  923. goto mqttd_cleanup;
  924. }
  925. do {
  926. juggle_again = incoming(sock);
  927. } while(juggle_again);
  928. mqttd_cleanup:
  929. if((msgsock != sock) && (msgsock != CURL_SOCKET_BAD))
  930. sclose(msgsock);
  931. if(sock != CURL_SOCKET_BAD)
  932. sclose(sock);
  933. if(wrotepidfile)
  934. unlink(pidname);
  935. if(wroteportfile)
  936. unlink(portname);
  937. restore_signal_handlers(FALSE);
  938. if(got_exit_signal) {
  939. logmsg("============> mqttd exits with signal (%d)", exit_signal);
  940. /*
  941. * To properly set the return status of the process we
  942. * must raise the same signal SIGINT or SIGTERM that we
  943. * caught and let the old handler take care of it.
  944. */
  945. raise(exit_signal);
  946. }
  947. logmsg("============> mqttd quits");
  948. return 0;
  949. }