1
0

Pipe.c 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454
  1. /* vim: set expandtab ts=4 sw=4: */
  2. /*
  3. * You may redistribute this program and/or modify it under the terms of
  4. * the GNU General Public License as published by the Free Software Foundation,
  5. * either version 3 of the License, or (at your option) any later version.
  6. *
  7. * This program is distributed in the hope that it will be useful,
  8. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. * GNU General Public License for more details.
  11. *
  12. * You should have received a copy of the GNU General Public License
  13. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  14. */
  15. #include "util/events/libuv/UvWrapper.h"
  16. #include "memory/Allocator.h"
  17. #include "interface/Interface.h"
  18. #include "util/events/Pipe.h"
  19. #include "util/events/libuv/EventBase_pvt.h"
  20. #include "util/log/Log.h"
  21. #include "util/Identity.h"
  22. #include "util/CString.h"
  23. #include "wire/Message.h"
  24. #include "wire/Error.h"
  25. #include <inttypes.h>
  26. #include <stdio.h>
  27. struct Pipe_WriteRequest_pvt;
  28. struct Pipe_pvt
  29. {
  30. struct Pipe pub;
  31. uv_pipe_t server;
  32. uv_pipe_t peer;
  33. /** Job to close the handles when the allocator is freed */
  34. struct Allocator_OnFreeJob* closeHandlesOnFree;
  35. /** Job which blocks the freeing until the callback completes */
  36. struct Allocator_OnFreeJob* blockFreeInsideCallback;
  37. /**
  38. * If the output pipe is same as the input, this points to peer.
  39. * Otherwise it points to server which is reused.
  40. */
  41. uv_pipe_t* out;
  42. /** 1 when the pipe becomes active. */
  43. int isActive;
  44. int queueLen;
  45. /** Used by blockFreeInsideCallback */
  46. int isInCallback;
  47. /** only non-null before the connection is setup. */
  48. struct Pipe_WriteRequest_pvt* bufferedRequest;
  49. struct Allocator* alloc;
  50. Identity
  51. };
  52. struct Pipe_WriteRequest_pvt {
  53. uv_write_t uvReq;
  54. struct Pipe_pvt* pipe;
  55. struct Message* msg;
  56. struct Allocator* alloc;
  57. Identity
  58. };
  59. static void sendMessageCallback(uv_write_t* uvReq, int error)
  60. {
  61. struct Pipe_WriteRequest_pvt* req = Identity_check((struct Pipe_WriteRequest_pvt*) uvReq);
  62. if (error) {
  63. Log_info(req->pipe->pub.logger, "Failed to write to pipe [%s] [%s]",
  64. req->pipe->pub.fullName, uv_strerror(error) );
  65. }
  66. req->pipe->queueLen -= req->msg->length;
  67. Assert_ifParanoid(req->pipe->queueLen >= 0);
  68. Allocator_free(req->alloc);
  69. }
  70. static uint8_t sendMessage2(struct Pipe_WriteRequest_pvt* req)
  71. {
  72. struct Pipe_pvt* pipe = req->pipe;
  73. struct Message* m = req->msg;
  74. uv_buf_t buffers[] = {
  75. { .base = (char*)m->bytes, .len = m->length }
  76. };
  77. int ret = uv_write(&req->uvReq, (uv_stream_t*) pipe->out, buffers, 1, sendMessageCallback);
  78. if (ret) {
  79. Log_info(pipe->pub.logger, "Failed writing to pipe [%s] [%s]",
  80. pipe->pub.fullName, uv_strerror(ret) );
  81. Allocator_free(req->alloc);
  82. return Error_UNDELIVERABLE;
  83. }
  84. pipe->queueLen += m->length;
  85. return Error_NONE;
  86. }
  87. static uint8_t sendMessage(struct Message* m, struct Interface* iface)
  88. {
  89. struct Pipe_pvt* pipe = Identity_check((struct Pipe_pvt*) iface);
  90. if (pipe->queueLen > 50000) {
  91. return Error_LINK_LIMIT_EXCEEDED;
  92. }
  93. // This allocator will hold the message allocator in existance after it is freed.
  94. struct Allocator* reqAlloc = Allocator_child(pipe->alloc);
  95. if (m->alloc) {
  96. Allocator_adopt(reqAlloc, m->alloc);
  97. } else {
  98. m = Message_clone(m, reqAlloc);
  99. }
  100. struct Pipe_WriteRequest_pvt* req =
  101. Allocator_clone(reqAlloc, (&(struct Pipe_WriteRequest_pvt) {
  102. .pipe = pipe,
  103. .msg = m,
  104. .alloc = reqAlloc
  105. }));
  106. Identity_set(req);
  107. if (pipe->isActive) {
  108. sendMessage2(req);
  109. } else {
  110. if (!pipe->bufferedRequest) {
  111. Log_debug(pipe->pub.logger, "Buffering a message");
  112. pipe->bufferedRequest = req;
  113. } else {
  114. Log_debug(pipe->pub.logger, "Appending to the buffered message");
  115. uint8_t* buff =
  116. Allocator_malloc(reqAlloc, m->length + pipe->bufferedRequest->msg->length);
  117. Bits_memcpy(buff,
  118. pipe->bufferedRequest->msg->bytes,
  119. pipe->bufferedRequest->msg->length);
  120. Bits_memcpy(buff+pipe->bufferedRequest->msg->length, m->bytes, m->length);
  121. m->length += pipe->bufferedRequest->msg->length;
  122. m->capacity = m->length;
  123. m->bytes = buff;
  124. Allocator_free(pipe->bufferedRequest->alloc);
  125. pipe->bufferedRequest = req;
  126. }
  127. }
  128. return Error_NONE;
  129. }
  130. /** Asynchronous allocator freeing. */
  131. static void onClose(uv_handle_t* handle)
  132. {
  133. struct Pipe_pvt* pipe = Identity_check((struct Pipe_pvt*)handle->data);
  134. handle->data = NULL;
  135. if (pipe->closeHandlesOnFree && !pipe->server.data && !pipe->peer.data) {
  136. Allocator_onFreeComplete((struct Allocator_OnFreeJob*) pipe->closeHandlesOnFree);
  137. }
  138. }
  139. #if Pipe_PADDING_AMOUNT < 8
  140. #error
  141. #endif
  142. #define ALLOC(buff) (((struct Allocator**) &(buff[-(8 + (((uintptr_t)buff) % 8))]))[0])
  143. static void incoming(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf)
  144. {
  145. struct Pipe_pvt* pipe = Identity_check((struct Pipe_pvt*) stream->data);
  146. // Grab out the allocator which was placed there by allocate()
  147. struct Allocator* alloc = buf->base ? ALLOC(buf->base) : NULL;
  148. pipe->isInCallback = 1;
  149. Assert_true(!alloc || alloc->fileName == pipe->alloc->fileName);
  150. if (nread < 0) {
  151. if (pipe->pub.onClose) {
  152. pipe->pub.onClose(&pipe->pub, 0);
  153. }
  154. uv_close((uv_handle_t*) stream, onClose);
  155. } else if (nread == 0) {
  156. // This is common.
  157. //Log_debug(pipe->pub.logger, "Pipe 0 length read [%s]", pipe->pub.fullName);
  158. } else if (pipe->pub.iface.receiveMessage) {
  159. Assert_true(alloc);
  160. struct Message* m = Allocator_malloc(alloc, sizeof(struct Message));
  161. m->length = nread;
  162. m->padding = Pipe_PADDING_AMOUNT;
  163. m->capacity = buf->len;
  164. m->bytes = (uint8_t*)buf->base;
  165. m->alloc = alloc;
  166. Interface_receiveMessage(&pipe->pub.iface, m);
  167. }
  168. if (alloc) {
  169. Allocator_free(alloc);
  170. }
  171. pipe->isInCallback = 0;
  172. if (pipe->blockFreeInsideCallback) {
  173. Allocator_onFreeComplete((struct Allocator_OnFreeJob*) pipe->blockFreeInsideCallback);
  174. }
  175. }
  176. static void allocate(uv_handle_t* handle, size_t size, uv_buf_t* buf)
  177. {
  178. struct Pipe_pvt* pipe = Identity_check((struct Pipe_pvt*) handle->data);
  179. size = Pipe_BUFFER_CAP;
  180. size_t fullSize = size + Pipe_PADDING_AMOUNT;
  181. struct Allocator* child = Allocator_child(pipe->alloc);
  182. char* buff = Allocator_malloc(child, fullSize);
  183. buff += Pipe_PADDING_AMOUNT;
  184. ALLOC(buff) = child;
  185. buf->base = buff;
  186. buf->len = size;
  187. }
  188. static void connected(uv_connect_t* req, int status)
  189. {
  190. uv_stream_t* link = req->handle;
  191. struct Pipe_pvt* pipe = Identity_check((struct Pipe_pvt*) link->data);
  192. Log_debug(pipe->pub.logger, "Pipe [%s] established connection", pipe->pub.fullName);
  193. int ret;
  194. if (status) {
  195. Log_info(pipe->pub.logger, "uv_pipe_connect() failed for pipe [%s] [%s]",
  196. pipe->pub.fullName, uv_strerror(status) );
  197. uv_close((uv_handle_t*) &pipe->peer, onClose);
  198. } else if ((ret = uv_read_start((uv_stream_t*)&pipe->peer, allocate, incoming))) {
  199. Log_info(pipe->pub.logger, "uv_read_start() failed for pipe [%s] [%s]",
  200. pipe->pub.fullName, uv_strerror(ret));
  201. uv_close((uv_handle_t*) &pipe->peer, onClose);
  202. } else {
  203. pipe->isActive = 1;
  204. if (pipe->pub.onConnection) {
  205. pipe->pub.onConnection(&pipe->pub, status);
  206. }
  207. // If there's anything buffered then send it.
  208. if (pipe->bufferedRequest) {
  209. Log_debug(pipe->pub.logger, "Sending buffered message");
  210. sendMessage2(pipe->bufferedRequest);
  211. pipe->bufferedRequest = NULL;
  212. }
  213. }
  214. }
  215. static void listenCallback(uv_stream_t* server, int status)
  216. {
  217. struct Pipe_pvt* pipe = Identity_check((struct Pipe_pvt*) server->data);
  218. if (pipe->isActive) {
  219. // first connection wins.
  220. return;
  221. }
  222. if (status == -1) {
  223. Log_info(pipe->pub.logger, "failed to accept pipe connection [%s] [%s]",
  224. pipe->pub.fullName, uv_strerror(status) );
  225. if (pipe->pub.onConnection) {
  226. pipe->pub.onConnection(&pipe->pub, status);
  227. }
  228. return;
  229. }
  230. int ret = uv_accept(server, (uv_stream_t*) &pipe->peer);
  231. if (ret) {
  232. Log_warn(pipe->pub.logger, "uv_accept() failed: pipe [%s] [%s]",
  233. pipe->pub.fullName, uv_strerror(ret) );
  234. if (pipe->pub.onConnection) {
  235. pipe->pub.onConnection(&pipe->pub, -1);
  236. }
  237. uv_close((uv_handle_t*) &pipe->peer, onClose);
  238. } else {
  239. uv_connect_t req = { .handle = (uv_stream_t*) &pipe->peer };
  240. connected(&req, 0);
  241. }
  242. uv_close((uv_handle_t*) &pipe->server, onClose);
  243. #ifndef win32
  244. // get rid of the pipe after it has been connected.
  245. uv_fs_t req;
  246. uv_fs_unlink(pipe->peer.loop, &req, pipe->pub.fullName, NULL);
  247. #endif
  248. }
  249. static int blockFreeInsideCallback(struct Allocator_OnFreeJob* job)
  250. {
  251. struct Pipe_pvt* pipe = Identity_check((struct Pipe_pvt*)job->userData);
  252. if (!pipe->isInCallback) {
  253. return 0;
  254. }
  255. pipe->blockFreeInsideCallback = job;
  256. return Allocator_ONFREE_ASYNC;
  257. }
  258. static int closeHandlesOnFree(struct Allocator_OnFreeJob* job)
  259. {
  260. struct Pipe_pvt* pipe = Identity_check((struct Pipe_pvt*)job->userData);
  261. pipe->closeHandlesOnFree = job;
  262. int skip = 2;
  263. if (pipe->server.data) {
  264. uv_close((uv_handle_t*) &pipe->server, onClose);
  265. skip--;
  266. }
  267. if (pipe->peer.data) {
  268. uv_close((uv_handle_t*) &pipe->peer, onClose);
  269. skip--;
  270. }
  271. if (skip == 2) {
  272. return 0;
  273. }
  274. return Allocator_ONFREE_ASYNC;
  275. }
  276. static struct Pipe_pvt* newPipe(struct EventBase* eb,
  277. const char* name,
  278. struct Except* eh,
  279. struct Allocator* userAlloc)
  280. {
  281. struct EventBase_pvt* ctx = EventBase_privatize(eb);
  282. struct Allocator* alloc = Allocator_child(userAlloc);
  283. #ifdef win32
  284. #define PREFIX "\\\\.\\pipe\\cjdns_pipe_"
  285. #elif defined(android)
  286. #define PREFIX "/data/local/tmp/cjdns_pipe_"
  287. #else
  288. #define PREFIX "/tmp/cjdns_pipe_"
  289. #endif
  290. char* cname = Allocator_malloc(alloc, CString_strlen(PREFIX)+CString_strlen(name)+1);
  291. Bits_memcpy(cname, PREFIX, CString_strlen(PREFIX));
  292. Bits_memcpy(cname+CString_strlen(PREFIX), name, CString_strlen(name)+1);
  293. struct Pipe_pvt* out = Allocator_clone(alloc, (&(struct Pipe_pvt) {
  294. .pub = {
  295. .iface = {
  296. .sendMessage = sendMessage,
  297. .allocator = userAlloc
  298. },
  299. .fullName = cname,
  300. .name = &cname[sizeof(PREFIX) - 1],
  301. .base = eb
  302. },
  303. .alloc = alloc
  304. }));
  305. int ret;
  306. ret = uv_pipe_init(ctx->loop, &out->peer, 0);
  307. if (ret) {
  308. Except_throw(eh, "uv_pipe_init() failed [%s]", uv_strerror(ret));
  309. }
  310. ret = uv_pipe_init(ctx->loop, &out->server, 0);
  311. if (ret) {
  312. Except_throw(eh, "uv_pipe_init() failed [%s]", uv_strerror(ret));
  313. }
  314. #ifdef win32
  315. out->pub.fd = &out->peer.handle;
  316. #else
  317. out->pub.fd = &out->peer.io_watcher.fd;
  318. #endif
  319. Allocator_onFree(alloc, closeHandlesOnFree, out);
  320. Allocator_onFree(alloc, blockFreeInsideCallback, out);
  321. out->peer.data = out;
  322. out->server.data = out;
  323. out->out = &out->peer;
  324. Identity_set(out);
  325. return out;
  326. }
  327. struct Pipe* Pipe_forFiles(int inFd,
  328. int outFd,
  329. struct EventBase* eb,
  330. struct Except* eh,
  331. struct Allocator* userAlloc)
  332. {
  333. char buff[32] = {0};
  334. snprintf(buff, 31, "forFiles(%d,%d)", inFd, outFd);
  335. struct Pipe_pvt* out = newPipe(eb, buff, eh, userAlloc);
  336. int ret = uv_pipe_open(&out->peer, inFd);
  337. if (ret) {
  338. Except_throw(eh, "uv_pipe_open(inFd) failed [%s]",
  339. uv_strerror(ret));
  340. }
  341. if (inFd != outFd) {
  342. out->out = &out->server;
  343. ret = uv_pipe_open(out->out, outFd);
  344. if (ret) {
  345. Except_throw(eh, "uv_pipe_open(outFd) failed [%s]",
  346. uv_strerror(ret));
  347. }
  348. }
  349. uv_connect_t req = { .handle = (uv_stream_t*) &out->peer };
  350. connected(&req, 0);
  351. return &out->pub;
  352. }
  353. struct Pipe* Pipe_named(const char* name,
  354. struct EventBase* eb,
  355. struct Except* eh,
  356. struct Allocator* userAlloc)
  357. {
  358. struct Pipe_pvt* out = newPipe(eb, name, eh, userAlloc);
  359. int ret;
  360. // Attempt to create pipe.
  361. ret = uv_pipe_bind(&out->server, out->pub.fullName);
  362. if (!ret) {
  363. ret = uv_listen((uv_stream_t*) &out->server, 1, listenCallback);
  364. if (ret) {
  365. Except_throw(eh, "uv_listen() failed [%s] for pipe [%s]",
  366. uv_strerror(ret), out->pub.fullName);
  367. }
  368. return &out->pub;
  369. }
  370. if (ret == UV_EADDRINUSE) {
  371. // Pipe exists, connect to it.
  372. uv_connect_t* req = Allocator_malloc(out->alloc, sizeof(uv_connect_t));
  373. req->data = out;
  374. uv_pipe_connect(req, &out->peer, out->pub.fullName, connected);
  375. return &out->pub;
  376. }
  377. Except_throw(eh, "uv_pipe_bind() failed [%s] for pipe [%s]",
  378. uv_strerror(ret), out->pub.fullName);
  379. return &out->pub;
  380. }