Pipe.c 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441
  1. /* vim: set expandtab ts=4 sw=4: */
  2. /*
  3. * You may redistribute this program and/or modify it under the terms of
  4. * the GNU General Public License as published by the Free Software Foundation,
  5. * either version 3 of the License, or (at your option) any later version.
  6. *
  7. * This program is distributed in the hope that it will be useful,
  8. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. * GNU General Public License for more details.
  11. *
  12. * You should have received a copy of the GNU General Public License
  13. * along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. */
  15. #include "util/events/libuv/UvWrapper.h"
  16. #include "exception/Er.h"
  17. #include "memory/Allocator.h"
  18. #include "util/events/Pipe.h"
  19. #include "util/events/libuv/Pipe_pvt.h"
  20. #include "util/events/libuv/EventBase_pvt.h"
  21. #include "util/log/Log.h"
  22. #include "util/Identity.h"
  23. #include "util/CString.h"
  24. #include "wire/Message.h"
  25. #include "wire/Error.h"
  26. #include "benc/String.h"
  27. #include <inttypes.h>
  28. #include <libgen.h>
  29. #include <stdio.h>
  30. #include <sys/stat.h>
  31. #include <string.h>
  32. struct Pipe_WriteRequest_pvt;
  33. struct Pipe_pvt
  34. {
  35. struct Pipe pub;
  36. uv_pipe_t peer;
  37. /** Job to close the handles when the allocator is freed */
  38. struct Allocator_OnFreeJob* closeHandlesOnFree;
  39. /** Job which blocks the freeing until the callback completes */
  40. struct Allocator_OnFreeJob* blockFreeInsideCallback;
  41. // true if we can pass file descriptors through this pipe
  42. bool ipc;
  43. /** 1 when the pipe becomes active. */
  44. int isActive;
  45. int queueLen;
  46. /** Used by blockFreeInsideCallback */
  47. int isInCallback;
  48. /** only non-null before the connection is setup. */
  49. struct Pipe_WriteRequest_pvt* bufferedRequest;
  50. struct Allocator* alloc;
  51. struct Log* log;
  52. Identity
  53. };
  54. struct Pipe_WriteRequest_pvt {
  55. uv_write_t uvReq;
  56. struct Pipe_pvt* pipe;
  57. struct Message* msg;
  58. struct Allocator* alloc;
  59. Identity
  60. };
  61. static void sendMessageCallback(uv_write_t* uvReq, int error)
  62. {
  63. struct Pipe_WriteRequest_pvt* req = Identity_check((struct Pipe_WriteRequest_pvt*) uvReq);
  64. if (error) {
  65. Log_info(req->pipe->log, "Failed to write to pipe [%s] [%s]",
  66. req->pipe->pub.fullName, uv_strerror(error) );
  67. }
  68. req->pipe->queueLen -= req->msg->length;
  69. Assert_ifParanoid(req->pipe->queueLen >= 0);
  70. Allocator_free(req->alloc);
  71. }
  72. static void sendMessage2(struct Pipe_WriteRequest_pvt* req)
  73. {
  74. struct Pipe_pvt* pipe = req->pipe;
  75. struct Message* m = req->msg;
  76. uv_buf_t buffers[] = {
  77. { .base = (char*)m->bytes, .len = m->length }
  78. };
  79. int ret = -1;
  80. if (pipe->ipc && m->associatedFd && !Defined(win32)) {
  81. int fd = Message_getAssociatedFd(m);
  82. uv_stream_t* fake_handle = Allocator_calloc(req->alloc, sizeof(uv_stream_t), 1);
  83. #ifndef win32
  84. fake_handle->io_watcher.fd = fd;
  85. #endif
  86. fake_handle->type = UV_TCP;
  87. ret = uv_write2(
  88. &req->uvReq,
  89. (uv_stream_t*) &pipe->peer,
  90. buffers,
  91. 1,
  92. fake_handle,
  93. sendMessageCallback);
  94. Log_debug(pipe->log, "Sending message with fd [%d]", fd);
  95. } else {
  96. ret = uv_write(&req->uvReq, (uv_stream_t*) &pipe->peer, buffers, 1, sendMessageCallback);
  97. }
  98. if (ret) {
  99. Log_info(pipe->log, "Failed writing to pipe [%s] [%s]",
  100. pipe->pub.fullName, uv_strerror(ret) );
  101. Allocator_free(req->alloc);
  102. return;
  103. }
  104. pipe->queueLen += m->length;
  105. return;
  106. }
  107. static Iface_DEFUN sendMessage(struct Message* m, struct Iface* iface)
  108. {
  109. struct Pipe_pvt* pipe = Identity_check((struct Pipe_pvt*) iface);
  110. if (pipe->queueLen > 50000) {
  111. return Error(OVERFLOW);
  112. }
  113. // This allocator will hold the message allocator in existance after it is freed.
  114. struct Allocator* reqAlloc = Allocator_child(pipe->alloc);
  115. if (m->alloc) {
  116. Allocator_adopt(reqAlloc, m->alloc);
  117. } else {
  118. m = Message_clone(m, reqAlloc);
  119. }
  120. struct Pipe_WriteRequest_pvt* req =
  121. Allocator_clone(reqAlloc, (&(struct Pipe_WriteRequest_pvt) {
  122. .pipe = pipe,
  123. .msg = m,
  124. .alloc = reqAlloc
  125. }));
  126. Identity_set(req);
  127. if (pipe->isActive) {
  128. sendMessage2(req);
  129. } else {
  130. if (!pipe->bufferedRequest) {
  131. Log_debug(pipe->log, "Buffering a message");
  132. pipe->bufferedRequest = req;
  133. } else {
  134. Log_debug(pipe->log, "Appending to the buffered message");
  135. uint8_t* buff =
  136. Allocator_malloc(reqAlloc, m->length + pipe->bufferedRequest->msg->length);
  137. Bits_memcpy(buff,
  138. pipe->bufferedRequest->msg->bytes,
  139. pipe->bufferedRequest->msg->length);
  140. Bits_memcpy(buff+pipe->bufferedRequest->msg->length, m->bytes, m->length);
  141. m->length += pipe->bufferedRequest->msg->length;
  142. m->capacity = m->length;
  143. m->bytes = buff;
  144. Allocator_free(pipe->bufferedRequest->alloc);
  145. pipe->bufferedRequest = req;
  146. }
  147. }
  148. return Error(NONE);
  149. }
  150. /** Asynchronous allocator freeing. */
  151. static void onClose(uv_handle_t* handle)
  152. {
  153. struct Pipe_pvt* pipe = Identity_check((struct Pipe_pvt*)handle->data);
  154. handle->data = NULL;
  155. if (pipe->closeHandlesOnFree && !pipe->peer.data) {
  156. Allocator_onFreeComplete((struct Allocator_OnFreeJob*) pipe->closeHandlesOnFree);
  157. }
  158. }
  159. #if Pipe_PADDING_AMOUNT < 8
  160. #error
  161. #endif
  162. #define ALLOC(buff) (((struct Allocator**) &(buff[-(8 + (((uintptr_t)buff) % 8))]))[0])
  163. static void incoming(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf)
  164. {
  165. struct Pipe_pvt* pipe = Identity_check((struct Pipe_pvt*) stream->data);
  166. // Grab out the allocator which was placed there by allocate()
  167. struct Allocator* alloc = buf->base ? ALLOC(buf->base) : NULL;
  168. pipe->isInCallback = 1;
  169. Assert_true(!alloc || alloc->fileName == pipe->alloc->fileName);
  170. if (nread < 0) {
  171. if (pipe->pub.onClose) {
  172. pipe->pub.onClose(&pipe->pub, 0);
  173. }
  174. } else if (nread == 0) {
  175. // This is common.
  176. //Log_debug(pipe->log, "Pipe 0 length read [%s]", pipe->pub.fullName);
  177. } else {
  178. Assert_true(alloc);
  179. struct Message* m = Allocator_calloc(alloc, sizeof(struct Message), 1);
  180. m->length = nread;
  181. m->padding = Pipe_PADDING_AMOUNT;
  182. m->capacity = buf->len;
  183. m->bytes = (uint8_t*)buf->base;
  184. m->alloc = alloc;
  185. if (pipe->ipc) {
  186. #ifndef win32
  187. Message_setAssociatedFd(m, stream->accepted_fd);
  188. #endif
  189. }
  190. Iface_send(&pipe->pub.iface, m);
  191. }
  192. if (alloc) {
  193. Allocator_free(alloc);
  194. }
  195. pipe->isInCallback = 0;
  196. if (pipe->blockFreeInsideCallback) {
  197. Allocator_onFreeComplete((struct Allocator_OnFreeJob*) pipe->blockFreeInsideCallback);
  198. }
  199. }
  200. static void incoming2(uv_pipe_t* stream, ssize_t nread, const uv_buf_t* buf, uv_handle_type _)
  201. {
  202. incoming((uv_stream_t*)stream, nread, buf);
  203. }
  204. static void allocate(uv_handle_t* handle, size_t size, uv_buf_t* buf)
  205. {
  206. struct Pipe_pvt* pipe = Identity_check((struct Pipe_pvt*) handle->data);
  207. size = Pipe_BUFFER_CAP;
  208. size_t fullSize = size + Pipe_PADDING_AMOUNT;
  209. struct Allocator* child = Allocator_child(pipe->alloc);
  210. char* buff = Allocator_malloc(child, fullSize);
  211. buff += Pipe_PADDING_AMOUNT;
  212. ALLOC(buff) = child;
  213. buf->base = buff;
  214. buf->len = size;
  215. }
  216. static int startPipe(struct Pipe_pvt* pipe)
  217. {
  218. if (pipe->ipc) {
  219. return uv_read2_start((uv_stream_t*)&pipe->peer, allocate, incoming2);
  220. } else {
  221. return uv_read_start((uv_stream_t*)&pipe->peer, allocate, incoming);
  222. }
  223. }
  224. static void connected(uv_connect_t* req, int status)
  225. {
  226. uv_stream_t* link = req->handle;
  227. struct Pipe_pvt* pipe = Identity_check((struct Pipe_pvt*) link->data);
  228. Log_debug(pipe->log, "Pipe [%s] established connection", pipe->pub.fullName);
  229. int ret;
  230. if (status) {
  231. Log_info(pipe->log, "uv_pipe_connect() failed for pipe [%s] [%s]",
  232. pipe->pub.fullName, uv_strerror(status) );
  233. uv_close((uv_handle_t*) &pipe->peer, onClose);
  234. } else if ((ret = startPipe(pipe))) {
  235. Log_info(pipe->log, "uv_read_start() failed for pipe [%s] [%s]",
  236. pipe->pub.fullName, uv_strerror(ret));
  237. uv_close((uv_handle_t*) &pipe->peer, onClose);
  238. } else {
  239. pipe->isActive = 1;
  240. if (pipe->pub.onConnection) {
  241. pipe->pub.onConnection(&pipe->pub, status);
  242. }
  243. // If there's anything buffered then send it.
  244. if (pipe->bufferedRequest) {
  245. Log_debug(pipe->log, "Sending buffered message");
  246. sendMessage2(pipe->bufferedRequest);
  247. pipe->bufferedRequest = NULL;
  248. }
  249. }
  250. }
  251. static int blockFreeInsideCallback(struct Allocator_OnFreeJob* job)
  252. {
  253. struct Pipe_pvt* pipe = Identity_check((struct Pipe_pvt*)job->userData);
  254. if (!pipe->isInCallback) {
  255. return 0;
  256. }
  257. pipe->blockFreeInsideCallback = job;
  258. return Allocator_ONFREE_ASYNC;
  259. }
  260. static int closeHandlesOnFree(struct Allocator_OnFreeJob* job)
  261. {
  262. struct Pipe_pvt* pipe = Identity_check((struct Pipe_pvt*)job->userData);
  263. pipe->closeHandlesOnFree = job;
  264. if (pipe->peer.data) {
  265. uv_close((uv_handle_t*) &pipe->peer, onClose);
  266. return Allocator_ONFREE_ASYNC;
  267. }
  268. return 0;
  269. }
  270. static Er_DEFUN(struct Pipe_pvt* newPipeAny(struct EventBase* eb,
  271. const char* fullPath,
  272. bool ipc,
  273. struct Log* log,
  274. struct Allocator* userAlloc))
  275. {
  276. struct EventBase_pvt* ctx = EventBase_privatize(eb);
  277. struct Allocator* alloc = Allocator_child(userAlloc);
  278. struct Pipe_pvt* out = Allocator_clone(alloc, (&(struct Pipe_pvt) {
  279. .pub = {
  280. .iface = {
  281. .send = sendMessage
  282. },
  283. .fullName = (fullPath) ? CString_strdup(fullPath, alloc) : NULL,
  284. },
  285. .alloc = alloc,
  286. .log = log,
  287. .ipc = ipc,
  288. }));
  289. int ret = uv_pipe_init(ctx->loop, &out->peer, ipc);
  290. if (ret) {
  291. Er_raise(alloc, "uv_pipe_init() failed [%s]", uv_strerror(ret));
  292. }
  293. Allocator_onFree(alloc, closeHandlesOnFree, out);
  294. Allocator_onFree(alloc, blockFreeInsideCallback, out);
  295. out->peer.data = out;
  296. Identity_set(out);
  297. Er_ret(out);
  298. }
  299. Er_DEFUN(struct Pipe* Pipe_forFd(int fd,
  300. bool ipc,
  301. struct EventBase* eb,
  302. struct Log* log,
  303. struct Allocator* userAlloc))
  304. {
  305. char buff[32] = {0};
  306. snprintf(buff, 31, "forFd(%d)", fd);
  307. struct Pipe_pvt* out = Er(newPipeAny(eb, buff, ipc, log, userAlloc));
  308. int ret = uv_pipe_open(&out->peer, fd);
  309. if (ret) {
  310. Er_raise(out->alloc, "uv_pipe_open(inFd) failed [%s]", uv_strerror(ret));
  311. }
  312. uv_connect_t req = { .handle = (uv_stream_t*) &out->peer };
  313. connected(&req, 0);
  314. Er_ret(&out->pub);
  315. }
  316. Er_DEFUN(struct Pipe* Pipe_named(const char* fullPath,
  317. struct EventBase* eb,
  318. struct Log* log,
  319. struct Allocator* userAlloc))
  320. {
  321. struct Pipe_pvt* out = Er(newPipeAny(eb, fullPath, true, log, userAlloc));
  322. uv_connect_t* req = Allocator_malloc(out->alloc, sizeof(uv_connect_t));
  323. req->data = out;
  324. uv_pipe_connect(req, &out->peer, out->pub.fullName, connected);
  325. int err = 0;
  326. // We get the error back synchronously but windows doesn't support that
  327. // TODO(cjd): Find a better way
  328. #ifndef win32
  329. err = (&out->peer)->delayed_error;
  330. #endif
  331. if (err != 0) {
  332. Er_raise(out->alloc, "uv_pipe_connect() failed [%s] for pipe [%s]",
  333. uv_strerror(err), out->pub.fullName);
  334. }
  335. Er_ret(&out->pub);
  336. }
  337. Er_DEFUN(struct Pipe* Pipe_serverAccept(uv_pipe_t* server,
  338. const char* pipeName,
  339. struct EventBase* eb,
  340. struct Log* log,
  341. struct Allocator* userAlloc))
  342. {
  343. struct Pipe_pvt* out = Er(newPipeAny(eb, NULL, true, log, userAlloc));
  344. int ret = uv_accept((uv_stream_t*) server, (uv_stream_t*) &out->peer);
  345. if (ret) {
  346. uv_close((uv_handle_t*) &out->peer, onClose);
  347. Er_raise(out->alloc, "uv_accept() failed: pipe [%s] [%s]",
  348. pipeName, uv_strerror(ret));
  349. } else {
  350. uv_connect_t req = { .handle = (uv_stream_t*) &out->peer };
  351. connected(&req, 0);
  352. }
  353. Er_ret(&out->pub);
  354. }
  355. Er_DEFUN(bool Pipe_exists(const char* path, struct Allocator* errAlloc))
  356. {
  357. struct stat st;
  358. if (stat(path, &st)) {
  359. if (errno == ENOENT) { Er_ret(false); }
  360. Er_raise(errAlloc, "Failed stat(%s) [%s]", path, strerror(errno));
  361. } else {
  362. int flag = 0;
  363. #ifdef win32
  364. flag = S_IFIFO;
  365. #elif defined(S_IFSOCK)
  366. flag = S_IFSOCK;
  367. #else
  368. #error "missing S_IFSOCK"
  369. #endif
  370. Er_ret(((int)(st.st_mode & S_IFMT)) == flag);
  371. }
  372. }