libubus-io.c 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453
  1. /*
  2. * Copyright (C) 2011-2014 Felix Fietkau <nbd@openwrt.org>
  3. *
  4. * This program is free software; you can redistribute it and/or modify
  5. * it under the terms of the GNU Lesser General Public License version 2.1
  6. * as published by the Free Software Foundation
  7. *
  8. * This program is distributed in the hope that it will be useful,
  9. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  11. * GNU General Public License for more details.
  12. */
  13. #define _GNU_SOURCE
  14. #include <sys/types.h>
  15. #include <sys/uio.h>
  16. #include <sys/socket.h>
  17. #include <unistd.h>
  18. #include <fcntl.h>
  19. #include <poll.h>
  20. #include <libubox/usock.h>
  21. #include <libubox/blob.h>
  22. #include <libubox/blobmsg.h>
  23. #include "libubus.h"
  24. #include "libubus-internal.h"
  25. #define STATIC_IOV(_var) { .iov_base = (char *) &(_var), .iov_len = sizeof(_var) }
  26. #define UBUS_MSGBUF_REDUCTION_INTERVAL 16
  27. static const struct blob_attr_info ubus_policy[UBUS_ATTR_MAX] = {
  28. [UBUS_ATTR_STATUS] = { .type = BLOB_ATTR_INT32 },
  29. [UBUS_ATTR_OBJID] = { .type = BLOB_ATTR_INT32 },
  30. [UBUS_ATTR_OBJPATH] = { .type = BLOB_ATTR_STRING },
  31. [UBUS_ATTR_METHOD] = { .type = BLOB_ATTR_STRING },
  32. [UBUS_ATTR_ACTIVE] = { .type = BLOB_ATTR_INT8 },
  33. [UBUS_ATTR_NO_REPLY] = { .type = BLOB_ATTR_INT8 },
  34. [UBUS_ATTR_SUBSCRIBERS] = { .type = BLOB_ATTR_NESTED },
  35. };
  36. static struct blob_attr *attrbuf[UBUS_ATTR_MAX];
  37. __hidden struct blob_attr **ubus_parse_msg(struct blob_attr *msg, size_t len)
  38. {
  39. blob_parse_untrusted(msg, len, attrbuf, ubus_policy, UBUS_ATTR_MAX);
  40. return attrbuf;
  41. }
  42. static void wait_data(int fd, bool write)
  43. {
  44. struct pollfd pfd = { .fd = fd };
  45. pfd.events = write ? POLLOUT : POLLIN;
  46. poll(&pfd, 1, -1);
  47. }
  48. static int writev_retry(int fd, struct iovec *iov, int iov_len, int sock_fd)
  49. {
  50. uint8_t fd_buf[CMSG_SPACE(sizeof(int))] = { 0 };
  51. struct msghdr msghdr = { 0 };
  52. struct cmsghdr *cmsg;
  53. int len = 0;
  54. int *pfd;
  55. msghdr.msg_iov = iov,
  56. msghdr.msg_iovlen = iov_len,
  57. msghdr.msg_control = fd_buf;
  58. msghdr.msg_controllen = sizeof(fd_buf);
  59. cmsg = CMSG_FIRSTHDR(&msghdr);
  60. cmsg->cmsg_type = SCM_RIGHTS;
  61. cmsg->cmsg_level = SOL_SOCKET;
  62. cmsg->cmsg_len = CMSG_LEN(sizeof(int));
  63. pfd = (int *) CMSG_DATA(cmsg);
  64. msghdr.msg_controllen = cmsg->cmsg_len;
  65. do {
  66. ssize_t cur_len;
  67. if (sock_fd < 0) {
  68. msghdr.msg_control = NULL;
  69. msghdr.msg_controllen = 0;
  70. } else {
  71. *pfd = sock_fd;
  72. }
  73. cur_len = sendmsg(fd, &msghdr, 0);
  74. if (cur_len < 0) {
  75. switch(errno) {
  76. case EAGAIN:
  77. wait_data(fd, true);
  78. break;
  79. case EINTR:
  80. break;
  81. default:
  82. return -1;
  83. }
  84. continue;
  85. }
  86. if (len > 0)
  87. sock_fd = -1;
  88. len += cur_len;
  89. while (cur_len >= (ssize_t) iov->iov_len) {
  90. cur_len -= iov->iov_len;
  91. iov_len--;
  92. iov++;
  93. if (!iov_len)
  94. return len;
  95. }
  96. iov->iov_base += cur_len;
  97. iov->iov_len -= cur_len;
  98. msghdr.msg_iov = iov;
  99. msghdr.msg_iovlen = iov_len;
  100. } while (1);
  101. /* Should never reach here */
  102. return -1;
  103. }
  104. int __hidden ubus_send_msg(struct ubus_context *ctx, uint32_t seq,
  105. struct blob_attr *msg, int cmd, uint32_t peer, int fd)
  106. {
  107. struct ubus_msghdr hdr;
  108. struct iovec iov[2] = {
  109. STATIC_IOV(hdr)
  110. };
  111. int ret;
  112. hdr.version = 0;
  113. hdr.type = cmd;
  114. hdr.seq = cpu_to_be16(seq);
  115. hdr.peer = cpu_to_be32(peer);
  116. if (!msg) {
  117. blob_buf_init(&b, 0);
  118. msg = b.head;
  119. }
  120. iov[1].iov_base = (char *) msg;
  121. iov[1].iov_len = blob_raw_len(msg);
  122. ret = writev_retry(ctx->sock.fd, iov, ARRAY_SIZE(iov), fd);
  123. if (ret < 0)
  124. ctx->sock.eof = true;
  125. if (fd >= 0)
  126. close(fd);
  127. return ret;
  128. }
  129. static int recv_retry(struct ubus_context *ctx, struct iovec *iov, bool wait, int *recv_fd)
  130. {
  131. uint8_t fd_buf[CMSG_SPACE(sizeof(int))] = { 0 };
  132. struct msghdr msghdr = { 0 };
  133. struct cmsghdr *cmsg;
  134. int total = 0;
  135. int bytes;
  136. int *pfd;
  137. int fd;
  138. fd = ctx->sock.fd;
  139. msghdr.msg_iov = iov,
  140. msghdr.msg_iovlen = 1,
  141. msghdr.msg_control = fd_buf;
  142. msghdr.msg_controllen = sizeof(fd_buf);
  143. cmsg = CMSG_FIRSTHDR(&msghdr);
  144. cmsg->cmsg_type = SCM_RIGHTS;
  145. cmsg->cmsg_level = SOL_SOCKET;
  146. cmsg->cmsg_len = CMSG_LEN(sizeof(int));
  147. pfd = (int *) CMSG_DATA(cmsg);
  148. while (iov->iov_len > 0) {
  149. if (recv_fd) {
  150. msghdr.msg_control = fd_buf;
  151. msghdr.msg_controllen = cmsg->cmsg_len;
  152. } else {
  153. msghdr.msg_control = NULL;
  154. msghdr.msg_controllen = 0;
  155. }
  156. *pfd = -1;
  157. bytes = recvmsg(fd, &msghdr, 0);
  158. if (!bytes)
  159. return -1;
  160. if (bytes < 0) {
  161. bytes = 0;
  162. if (errno == EINTR)
  163. continue;
  164. if (errno != EAGAIN)
  165. return -1;
  166. }
  167. if (!wait && !bytes)
  168. return 0;
  169. if (recv_fd)
  170. *recv_fd = *pfd;
  171. recv_fd = NULL;
  172. wait = true;
  173. iov->iov_len -= bytes;
  174. iov->iov_base += bytes;
  175. total += bytes;
  176. if (iov->iov_len > 0)
  177. wait_data(fd, false);
  178. }
  179. return total;
  180. }
  181. bool ubus_validate_hdr(struct ubus_msghdr *hdr)
  182. {
  183. struct blob_attr *data = (struct blob_attr *) (hdr + 1);
  184. if (hdr->version != 0)
  185. return false;
  186. if (blob_raw_len(data) < sizeof(*data))
  187. return false;
  188. if (blob_pad_len(data) > UBUS_MAX_MSGLEN)
  189. return false;
  190. return true;
  191. }
  192. static bool alloc_msg_buf(struct ubus_context *ctx, int len)
  193. {
  194. void *ptr;
  195. int buf_len = ctx->msgbuf_data_len;
  196. int rem;
  197. if (!ctx->msgbuf.data)
  198. buf_len = 0;
  199. rem = (len % UBUS_MSG_CHUNK_SIZE);
  200. if (rem > 0)
  201. len += UBUS_MSG_CHUNK_SIZE - rem;
  202. if (len < buf_len &&
  203. ++ctx->msgbuf_reduction_counter > UBUS_MSGBUF_REDUCTION_INTERVAL) {
  204. ctx->msgbuf_reduction_counter = 0;
  205. buf_len = 0;
  206. }
  207. if (len <= buf_len)
  208. return true;
  209. ptr = realloc(ctx->msgbuf.data, len);
  210. if (!ptr)
  211. return false;
  212. ctx->msgbuf.data = ptr;
  213. ctx->msgbuf_data_len = len;
  214. return true;
  215. }
  216. static bool get_next_msg(struct ubus_context *ctx, int *recv_fd)
  217. {
  218. struct {
  219. struct ubus_msghdr hdr;
  220. struct blob_attr data;
  221. } hdrbuf;
  222. struct iovec iov = STATIC_IOV(hdrbuf);
  223. int len;
  224. int r;
  225. /* receive header + start attribute */
  226. r = recv_retry(ctx, &iov, false, recv_fd);
  227. if (r <= 0) {
  228. if (r < 0)
  229. ctx->sock.eof = true;
  230. return false;
  231. }
  232. hdrbuf.hdr.seq = be16_to_cpu(hdrbuf.hdr.seq);
  233. hdrbuf.hdr.peer = be32_to_cpu(hdrbuf.hdr.peer);
  234. if (!ubus_validate_hdr(&hdrbuf.hdr))
  235. return false;
  236. len = blob_raw_len(&hdrbuf.data);
  237. if (!alloc_msg_buf(ctx, len))
  238. return false;
  239. memcpy(&ctx->msgbuf.hdr, &hdrbuf.hdr, sizeof(hdrbuf.hdr));
  240. memcpy(ctx->msgbuf.data, &hdrbuf.data, sizeof(hdrbuf.data));
  241. iov.iov_base = (char *)ctx->msgbuf.data + sizeof(hdrbuf.data);
  242. iov.iov_len = blob_len(ctx->msgbuf.data);
  243. if (iov.iov_len > 0 &&
  244. recv_retry(ctx, &iov, true, NULL) <= 0)
  245. return false;
  246. return true;
  247. }
  248. void __hidden ubus_handle_data(struct uloop_fd *u, unsigned int events)
  249. {
  250. struct ubus_context *ctx = container_of(u, struct ubus_context, sock);
  251. int recv_fd = -1;
  252. while (1) {
  253. if (!ctx->stack_depth)
  254. ctx->pending_timer.cb(&ctx->pending_timer);
  255. if (!get_next_msg(ctx, &recv_fd))
  256. break;
  257. ubus_process_msg(ctx, &ctx->msgbuf, recv_fd);
  258. if (uloop_cancelling() || ctx->cancel_poll)
  259. break;
  260. }
  261. if (!ctx->stack_depth)
  262. ctx->pending_timer.cb(&ctx->pending_timer);
  263. if (u->eof)
  264. ctx->connection_lost(ctx);
  265. }
  266. void __hidden ubus_poll_data(struct ubus_context *ctx, int timeout)
  267. {
  268. struct pollfd pfd = {
  269. .fd = ctx->sock.fd,
  270. .events = POLLIN | POLLERR,
  271. };
  272. ctx->cancel_poll = false;
  273. poll(&pfd, 1, timeout ? timeout : -1);
  274. ubus_handle_data(&ctx->sock, ULOOP_READ);
  275. }
  276. static void
  277. ubus_auto_sub_lookup(struct ubus_context *ctx, struct ubus_object_data *obj,
  278. void *priv)
  279. {
  280. struct ubus_subscriber *s;
  281. list_for_each_entry(s, &ctx->auto_subscribers, list)
  282. if (s->new_obj_cb(ctx, s, obj->path))
  283. ubus_subscribe(ctx, s, obj->id);
  284. }
  285. static void
  286. ubus_refresh_auto_subscribe(struct ubus_context *ctx)
  287. {
  288. struct ubus_event_handler *ev = &ctx->auto_subscribe_event_handler;
  289. if (list_empty(&ctx->auto_subscribers))
  290. return;
  291. ubus_register_event_handler(ctx, ev, "ubus.object.add");
  292. ubus_lookup(ctx, NULL, ubus_auto_sub_lookup, NULL);
  293. }
  294. static void
  295. ubus_refresh_state(struct ubus_context *ctx)
  296. {
  297. struct ubus_object *obj, *tmp;
  298. struct ubus_object **objs;
  299. int n, i = 0;
  300. /* clear all type IDs, they need to be registered again */
  301. avl_for_each_element(&ctx->objects, obj, avl)
  302. if (obj->type)
  303. obj->type->id = 0;
  304. /* push out all objects again */
  305. objs = alloca(ctx->objects.count * sizeof(*objs));
  306. avl_remove_all_elements(&ctx->objects, obj, avl, tmp) {
  307. objs[i++] = obj;
  308. obj->id = 0;
  309. }
  310. for (n = i, i = 0; i < n; i++)
  311. ubus_add_object(ctx, objs[i]);
  312. ubus_refresh_auto_subscribe(ctx);
  313. }
  314. int ubus_reconnect(struct ubus_context *ctx, const char *path)
  315. {
  316. struct {
  317. struct ubus_msghdr hdr;
  318. struct blob_attr data;
  319. } hdr;
  320. struct blob_attr *buf;
  321. int ret = UBUS_STATUS_UNKNOWN_ERROR;
  322. if (!path)
  323. path = UBUS_UNIX_SOCKET;
  324. if (ctx->sock.fd >= 0) {
  325. if (ctx->sock.registered)
  326. uloop_fd_delete(&ctx->sock);
  327. close(ctx->sock.fd);
  328. }
  329. ctx->sock.eof = false;
  330. ctx->sock.error = false;
  331. ctx->sock.fd = usock(USOCK_UNIX, path, NULL);
  332. if (ctx->sock.fd < 0)
  333. return UBUS_STATUS_CONNECTION_FAILED;
  334. if (read(ctx->sock.fd, &hdr, sizeof(hdr)) != sizeof(hdr))
  335. goto out_close;
  336. if (!ubus_validate_hdr(&hdr.hdr))
  337. goto out_close;
  338. if (hdr.hdr.type != UBUS_MSG_HELLO)
  339. goto out_close;
  340. buf = calloc(1, blob_raw_len(&hdr.data));
  341. if (!buf)
  342. goto out_close;
  343. memcpy(buf, &hdr.data, sizeof(hdr.data));
  344. if (read(ctx->sock.fd, blob_data(buf), blob_len(buf)) != (ssize_t) blob_len(buf))
  345. goto out_free;
  346. ctx->local_id = hdr.hdr.peer;
  347. if (!ctx->local_id)
  348. goto out_free;
  349. ret = UBUS_STATUS_OK;
  350. fcntl(ctx->sock.fd, F_SETFL, fcntl(ctx->sock.fd, F_GETFL) | O_NONBLOCK | O_CLOEXEC);
  351. ubus_refresh_state(ctx);
  352. out_free:
  353. free(buf);
  354. out_close:
  355. if (ret)
  356. close(ctx->sock.fd);
  357. return ret;
  358. }