Pipe.c 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421
  1. /* vim: set expandtab ts=4 sw=4: */
  2. /*
  3. * You may redistribute this program and/or modify it under the terms of
  4. * the GNU General Public License as published by the Free Software Foundation,
  5. * either version 3 of the License, or (at your option) any later version.
  6. *
  7. * This program is distributed in the hope that it will be useful,
  8. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. * GNU General Public License for more details.
  11. *
  12. * You should have received a copy of the GNU General Public License
  13. * along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. */
  15. #include "util/events/libuv/UvWrapper.h"
  16. #include "exception/Er.h"
  17. #include "memory/Allocator.h"
  18. #include "util/events/Pipe.h"
  19. #include "util/events/libuv/Pipe_pvt.h"
  20. #include "util/events/libuv/EventBase_pvt.h"
  21. #include "util/log/Log.h"
  22. #include "util/Identity.h"
  23. #include "util/CString.h"
  24. #include "wire/Message.h"
  25. #include "wire/Error.h"
  26. #include "benc/String.h"
  27. #include <inttypes.h>
  28. #include <libgen.h>
  29. #include <stdio.h>
  30. #include <sys/stat.h>
  31. #include <string.h>
  32. struct Pipe_WriteRequest_pvt;
  33. struct Pipe_pvt
  34. {
  35. struct Pipe pub;
  36. uv_pipe_t peer;
  37. // true if we can pass file descriptors through this pipe
  38. bool ipc;
  39. /** 1 when the pipe becomes active. */
  40. int isActive;
  41. int queueLen;
  42. /** Used by blockFreeInsideCallback */
  43. int isInCallback;
  44. /** only non-null before the connection is setup. */
  45. struct Pipe_WriteRequest_pvt* bufferedRequest;
  46. struct Allocator* alloc;
  47. struct Allocator* userAlloc;
  48. struct Log* log;
  49. struct EventBase* base;
  50. Identity
  51. };
  52. struct Pipe_WriteRequest_pvt {
  53. uv_write_t uvReq;
  54. struct Pipe_pvt* pipe;
  55. struct Message* msg;
  56. struct Allocator* alloc;
  57. Identity
  58. };
  59. static void sendMessageCallback(uv_write_t* uvReq, int error)
  60. {
  61. struct Pipe_WriteRequest_pvt* req = Identity_check((struct Pipe_WriteRequest_pvt*) uvReq);
  62. if (error) {
  63. Log_info(req->pipe->log, "Failed to write to pipe [%s] [%s]",
  64. req->pipe->pub.fullName, uv_strerror(error) );
  65. }
  66. req->pipe->queueLen -= Message_getLength(req->msg);
  67. Assert_ifParanoid(req->pipe->queueLen >= 0);
  68. Allocator_free(req->alloc);
  69. }
  70. static void sendMessage2(struct Pipe_WriteRequest_pvt* req)
  71. {
  72. struct Pipe_pvt* pipe = req->pipe;
  73. struct Message* m = req->msg;
  74. uv_buf_t buffers[] = {
  75. { .base = (char*)m->msgbytes, .len = Message_getLength(m) }
  76. };
  77. int ret = -1;
  78. int fd = Message_getAssociatedFd(m);
  79. if (pipe->ipc && fd > -1 && !Defined(win32)) {
  80. uv_stream_t* fake_handle = Allocator_calloc(req->alloc, sizeof(uv_stream_t), 1);
  81. #ifndef win32
  82. fake_handle->io_watcher.fd = fd;
  83. #endif
  84. fake_handle->type = UV_TCP;
  85. ret = uv_write2(
  86. &req->uvReq,
  87. (uv_stream_t*) &pipe->peer,
  88. buffers,
  89. 1,
  90. fake_handle,
  91. sendMessageCallback);
  92. Log_debug(pipe->log, "Sending message with fd [%d]", fd);
  93. } else {
  94. Log_debug(pipe->log, "Sending message of length [%d]", Message_getLength(m));
  95. ret = uv_write(&req->uvReq, (uv_stream_t*) &pipe->peer, buffers, 1, sendMessageCallback);
  96. }
  97. if (ret) {
  98. Log_info(pipe->log, "Failed writing to pipe [%s] [%s]",
  99. pipe->pub.fullName, uv_strerror(ret) );
  100. Allocator_free(req->alloc);
  101. return;
  102. }
  103. pipe->queueLen += Message_getLength(m);
  104. return;
  105. }
  106. static Iface_DEFUN sendMessage(struct Message* m, struct Iface* iface)
  107. {
  108. struct Pipe_pvt* pipe = Identity_check((struct Pipe_pvt*) iface);
  109. if (pipe->userAlloc == NULL) {
  110. return NULL;
  111. }
  112. if (pipe->queueLen > 50000) {
  113. return Error(m, "OVERFLOW");
  114. }
  115. // This allocator will hold the message allocator in existance after it is freed.
  116. struct Allocator* reqAlloc = Allocator_child(pipe->alloc);
  117. Allocator_adopt(reqAlloc, Message_getAlloc(m));
  118. struct Pipe_WriteRequest_pvt* req =
  119. Allocator_clone(reqAlloc, (&(struct Pipe_WriteRequest_pvt) {
  120. .pipe = pipe,
  121. .msg = m,
  122. .alloc = reqAlloc
  123. }));
  124. Identity_set(req);
  125. if (pipe->isActive) {
  126. sendMessage2(req);
  127. } else {
  128. if (!pipe->bufferedRequest) {
  129. Log_debug(pipe->log, "Buffering a message");
  130. pipe->bufferedRequest = req;
  131. } else {
  132. Log_debug(pipe->log, "Appending to the buffered message");
  133. struct Message* m2 = Message_new(0,
  134. Message_getLength(m) + Message_getLength(pipe->bufferedRequest->msg), reqAlloc);
  135. Er_assert(Message_epush(m2, m->msgbytes, Message_getLength(m)));
  136. Er_assert(Message_epush(m2,
  137. pipe->bufferedRequest->msg->msgbytes,
  138. Message_getLength(pipe->bufferedRequest->msg)));
  139. req->msg = m2;
  140. Allocator_free(pipe->bufferedRequest->alloc);
  141. pipe->bufferedRequest = req;
  142. }
  143. }
  144. return NULL;
  145. }
  146. /** Asynchronous allocator freeing. */
  147. static void onClose(uv_handle_t* handle)
  148. {
  149. struct Pipe_pvt* pipe = Identity_check((struct Pipe_pvt*)handle->data);
  150. handle->data = NULL;
  151. Log_debug(pipe->log, "Pipe closed");
  152. Assert_true(!pipe->peer.data);
  153. Allocator_free(pipe->alloc);
  154. }
  155. #if Pipe_PADDING_AMOUNT < 8
  156. #error
  157. #endif
  158. #define ALLOC(buff) (((struct Message**) &(buff[-(8 + (((uintptr_t)buff) % 8))]))[0])
  159. static void incoming(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf)
  160. {
  161. struct Pipe_pvt* pipe = Identity_check((struct Pipe_pvt*) stream->data);
  162. // Grab out the msg which was placed there by allocate()
  163. struct Message* msg = buf->base ? ALLOC(buf->base) : NULL;
  164. pipe->isInCallback = 1;
  165. if (nread < 0) {
  166. if (pipe->pub.onClose) {
  167. pipe->pub.onClose(&pipe->pub, 0);
  168. }
  169. } else if (nread == 0) {
  170. // This is common.
  171. //Log_debug(pipe->log, "Pipe 0 length read [%s]", pipe->pub.fullName);
  172. } else {
  173. Assert_true(msg);
  174. Er_assert(Message_truncate(msg, nread));
  175. if (pipe->ipc) {
  176. #ifndef win32
  177. Message_setAssociatedFd(msg, stream->accepted_fd);
  178. #endif
  179. }
  180. Log_debug(pipe->log, "Pipe incoming message len [%d]", Message_getLength(msg));
  181. Iface_send(&pipe->pub.iface, msg);
  182. }
  183. if (msg) {
  184. Allocator_free(Message_getAlloc(msg));
  185. }
  186. pipe->isInCallback = 0;
  187. if (pipe->userAlloc == NULL) {
  188. uv_close((uv_handle_t*) &pipe->peer, onClose);
  189. }
  190. }
  191. static void incoming2(uv_pipe_t* stream, ssize_t nread, const uv_buf_t* buf, uv_handle_type _)
  192. {
  193. incoming((uv_stream_t*)stream, nread, buf);
  194. }
  195. static void allocate(uv_handle_t* handle, size_t size, uv_buf_t* buf)
  196. {
  197. struct Pipe_pvt* pipe = Identity_check((struct Pipe_pvt*) handle->data);
  198. size = Pipe_BUFFER_CAP;
  199. struct Allocator* child = Allocator_child(pipe->alloc);
  200. struct Message* msg = Message_new(size, Pipe_PADDING_AMOUNT, child);
  201. ALLOC(msg->msgbytes) = msg;
  202. buf->base = msg->msgbytes;
  203. buf->len = size;
  204. }
  205. static int startPipe(struct Pipe_pvt* pipe)
  206. {
  207. if (pipe->ipc) {
  208. return uv_read2_start((uv_stream_t*)&pipe->peer, allocate, incoming2);
  209. } else {
  210. return uv_read_start((uv_stream_t*)&pipe->peer, allocate, incoming);
  211. }
  212. }
  213. static void connected(uv_connect_t* req, int status)
  214. {
  215. uv_stream_t* link = req->handle;
  216. struct Pipe_pvt* pipe = Identity_check((struct Pipe_pvt*) link->data);
  217. if (pipe->userAlloc == NULL) { return; }
  218. Log_debug(pipe->log, "Pipe [%s] established connection", pipe->pub.fullName);
  219. int ret;
  220. if (status) {
  221. Log_info(pipe->log, "uv_pipe_connect() failed for pipe [%s] [%s]",
  222. pipe->pub.fullName, uv_strerror(status) );
  223. uv_close((uv_handle_t*) &pipe->peer, NULL);
  224. } else if ((ret = startPipe(pipe))) {
  225. Log_info(pipe->log, "uv_read_start() failed for pipe [%s] [%s]",
  226. pipe->pub.fullName, uv_strerror(ret));
  227. uv_close((uv_handle_t*) &pipe->peer, NULL);
  228. } else {
  229. pipe->isActive = 1;
  230. if (pipe->pub.onConnection) {
  231. pipe->pub.onConnection(&pipe->pub, status);
  232. }
  233. // If there's anything buffered then send it.
  234. if (pipe->bufferedRequest) {
  235. Log_debug(pipe->log, "Sending buffered message");
  236. sendMessage2(pipe->bufferedRequest);
  237. pipe->bufferedRequest = NULL;
  238. }
  239. }
  240. }
  241. static int onFree(struct Allocator_OnFreeJob* job)
  242. {
  243. struct Pipe_pvt* pipe = Identity_check((struct Pipe_pvt*)job->userData);
  244. pipe->userAlloc = NULL;
  245. if (!pipe->isInCallback) {
  246. Assert_true(pipe->peer.data);
  247. uv_close((uv_handle_t*) &pipe->peer, onClose);
  248. EventBase_wakeup(pipe->base);
  249. }
  250. return 0;
  251. }
  252. static Er_DEFUN(struct Pipe_pvt* newPipeAny(struct EventBase* eb,
  253. const char* fullPath,
  254. bool ipc,
  255. struct Log* log,
  256. struct Allocator* userAlloc))
  257. {
  258. struct EventBase_pvt* ctx = EventBase_privatize(eb);
  259. struct Allocator* alloc = Allocator_child(ctx->alloc);
  260. struct Pipe_pvt* out = Allocator_clone(alloc, (&(struct Pipe_pvt) {
  261. .pub = {
  262. .iface = {
  263. .send = sendMessage
  264. },
  265. .fullName = (fullPath) ? CString_strdup(fullPath, alloc) : NULL,
  266. },
  267. .alloc = alloc,
  268. .userAlloc = userAlloc,
  269. .log = log,
  270. .ipc = ipc,
  271. .base = eb,
  272. }));
  273. int ret = uv_pipe_init(ctx->loop, &out->peer, ipc);
  274. if (ret) {
  275. Er_raise(alloc, "uv_pipe_init() failed [%s]", uv_strerror(ret));
  276. }
  277. Allocator_onFree(userAlloc, onFree, out);
  278. out->peer.data = out;
  279. Identity_set(out);
  280. Er_ret(out);
  281. }
  282. Er_DEFUN(struct Pipe* Pipe_forFd(int fd,
  283. bool ipc,
  284. struct EventBase* eb,
  285. struct Log* log,
  286. struct Allocator* userAlloc))
  287. {
  288. char buff[32] = {0};
  289. snprintf(buff, 31, "forFd(%d)", fd);
  290. struct Pipe_pvt* out = Er(newPipeAny(eb, buff, ipc, log, userAlloc));
  291. int ret = uv_pipe_open(&out->peer, fd);
  292. if (ret) {
  293. Er_raise(out->alloc, "uv_pipe_open(inFd) failed [%s]", uv_strerror(ret));
  294. }
  295. uv_connect_t req = { .handle = (uv_stream_t*) &out->peer };
  296. connected(&req, 0);
  297. Er_ret(&out->pub);
  298. }
  299. Er_DEFUN(struct Pipe* Pipe_named(const char* fullPath,
  300. struct EventBase* eb,
  301. struct Log* log,
  302. struct Allocator* userAlloc))
  303. {
  304. struct Pipe_pvt* out = Er(newPipeAny(eb, fullPath, true, log, userAlloc));
  305. uv_connect_t* req = Allocator_malloc(out->alloc, sizeof(uv_connect_t));
  306. req->data = out;
  307. uv_pipe_connect(req, &out->peer, out->pub.fullName, connected);
  308. int err = 0;
  309. // We get the error back synchronously but windows doesn't support that
  310. // TODO(cjd): Find a better way
  311. #ifndef win32
  312. err = (&out->peer)->delayed_error;
  313. #endif
  314. if (err != 0) {
  315. Er_raise(out->alloc, "uv_pipe_connect() failed [%s] for pipe [%s]",
  316. uv_strerror(err), out->pub.fullName);
  317. }
  318. Er_ret(&out->pub);
  319. }
  320. Er_DEFUN(struct Pipe* Pipe_serverAccept(uv_pipe_t* server,
  321. const char* pipeName,
  322. struct EventBase* eb,
  323. struct Log* log,
  324. struct Allocator* userAlloc))
  325. {
  326. struct Pipe_pvt* out = Er(newPipeAny(eb, NULL, true, log, userAlloc));
  327. int ret = uv_accept((uv_stream_t*) server, (uv_stream_t*) &out->peer);
  328. if (ret) {
  329. uv_close((uv_handle_t*) &out->peer, NULL);
  330. Er_raise(out->alloc, "uv_accept() failed: pipe [%s] [%s]",
  331. pipeName, uv_strerror(ret));
  332. } else {
  333. uv_connect_t req = { .handle = (uv_stream_t*) &out->peer };
  334. connected(&req, 0);
  335. }
  336. Er_ret(&out->pub);
  337. }
  338. Er_DEFUN(bool Pipe_exists(const char* path, struct Allocator* errAlloc))
  339. {
  340. struct stat st;
  341. if (stat(path, &st)) {
  342. if (errno == ENOENT) { Er_ret(false); }
  343. Er_raise(errAlloc, "Failed stat(%s) [%s]", path, strerror(errno));
  344. } else {
  345. int flag = 0;
  346. #ifdef win32
  347. flag = S_IFIFO;
  348. #elif defined(S_IFSOCK)
  349. flag = S_IFSOCK;
  350. #else
  351. #error "missing S_IFSOCK"
  352. #endif
  353. Er_ret(((int)(st.st_mode & S_IFMT)) == flag);
  354. }
  355. }