/* vim: set expandtab ts=4 sw=4: */
/*
 * You may redistribute this program and/or modify it under the terms of
 * the GNU General Public License as published by the Free Software Foundation,
 * either version 3 of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */
#include "util/events/libuv/UvWrapper.h"
#include "exception/Er.h"
#include "memory/Allocator.h"
#include "util/events/Pipe.h"
#include "util/events/libuv/Pipe_pvt.h"
#include "util/events/libuv/EventBase_pvt.h"
#include "util/log/Log.h"
#include "util/Identity.h"
#include "util/CString.h"
#include "wire/Message.h"
#include "wire/Error.h"
#include "benc/String.h"

// NOTE(review): the operands of these five system includes were lost; they are
// reconstructed from the identifiers this file uses (errno, snprintf, strerror,
// stat/S_IFMT, ssize_t). Confirm against the canonical copy of this file.
#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>

struct Pipe_WriteRequest_pvt;

/** Private state behind a struct Pipe; the public part must stay first so the two can be cast. */
struct Pipe_pvt
{
    struct Pipe pub;

    /** The libuv handle; peer.data points back at this struct until onClose() clears it. */
    uv_pipe_t peer;

    // true if we can pass file descriptors through this pipe
    bool ipc;

    /** 1 when the pipe becomes active. */
    int isActive;

    /** Bytes handed to libuv by sendMessage2() whose write callback has not fired yet. */
    int queueLen;

    /**
     * Used by blockFreeInsideCallback.
     * Set to 1 while incoming() is running; onFree() checks it and, if set, leaves
     * closing the handle to incoming().
     */
    int isInCallback;

    /** only non-null before the connection is setup. */
    struct Pipe_WriteRequest_pvt* bufferedRequest;

    /** Allocator owning this struct; freed in onClose(). */
    struct Allocator* alloc;

    /** The caller's allocator; set to NULL by onFree() once the caller has freed it. */
    struct Allocator* userAlloc;

    struct Log* log;
    struct EventBase* base;

    Identity
};

/** One outstanding write. uvReq must stay first: the write callback casts it back to this struct. */
struct Pipe_WriteRequest_pvt
{
    uv_write_t uvReq;
    struct Pipe_pvt* pipe;
    struct Message* msg;

    /** Allocator holding this request (and, by adoption, the message); freed when the write ends. */
    struct Allocator* alloc;

    Identity
};

/**
 * libuv write-completion callback.
 * Logs a failed write, subtracts the message length from the pipe's queueLen
 * and frees the request's allocator.
 */
static void sendMessageCallback(uv_write_t* uvReq, int error)
{
    struct Pipe_WriteRequest_pvt* req = Identity_check((struct Pipe_WriteRequest_pvt*) uvReq);
    if (error) {
        Log_info(req->pipe->log, "Failed to write to pipe [%s] [%s]",
                 req->pipe->pub.fullName, uv_strerror(error) );
    }
    req->pipe->queueLen -= Message_getLength(req->msg);
    Assert_ifParanoid(req->pipe->queueLen >= 0);
    Allocator_free(req->alloc);
}

/**
 * Hand a prepared write request to libuv.
 * If the pipe is an ipc pipe and the message carries a file descriptor (non-windows only),
 * the descriptor is sent along with the data using uv_write2(); otherwise uv_write() is used.
 * On synchronous failure the request is freed, on success queueLen grows by the message length.
 */
static void sendMessage2(struct Pipe_WriteRequest_pvt* req)
{
    struct Pipe_pvt* pipe = req->pipe;
    struct Message* m = req->msg;

    uv_buf_t buffers[] = {
        { .base = (char*)m->msgbytes, .len = Message_getLength(m) }
    };

    int ret = -1;
    int fd = Message_getAssociatedFd(m);
    if (pipe->ipc && fd > -1 && !Defined(win32)) {
        // uv_write2() wants a stream handle to send, so a zeroed stand-in carrying only
        // the descriptor is allocated alongside the request.
        uv_stream_t* fake_handle = Allocator_calloc(req->alloc, sizeof(uv_stream_t), 1);
        #ifndef win32
            fake_handle->io_watcher.fd = fd;
        #endif
        fake_handle->type = UV_TCP;
        ret = uv_write2(
            &req->uvReq,
            (uv_stream_t*) &pipe->peer,
            buffers,
            1,
            fake_handle,
            sendMessageCallback);
        Log_debug(pipe->log, "Sending message with fd [%d]", fd);
    } else {
        Log_debug(pipe->log, "Sending message of length [%d]", Message_getLength(m));
        ret = uv_write(&req->uvReq, (uv_stream_t*) &pipe->peer, buffers, 1, sendMessageCallback);
    }

    if (ret) {
        Log_info(pipe->log, "Failed writing to pipe [%s] [%s]",
                 pipe->pub.fullName, uv_strerror(ret) );
        Allocator_free(req->alloc);
        return;
    }

    pipe->queueLen += Message_getLength(m);

    return;
}

/**
 * Iface send handler: queue a message for writing to the pipe.
 * Messages are silently dropped once the caller's allocator has been freed and rejected
 * with "OVERFLOW" while more than 50000 bytes are still queued. Before the pipe is active,
 * messages are merged into a single buffered request which connected() sends later.
 */
static Iface_DEFUN sendMessage(struct Message* m, struct Iface* iface)
{
    struct Pipe_pvt* pipe = Identity_check((struct Pipe_pvt*) iface);

    if (pipe->userAlloc == NULL) {
        return NULL;
    }

    if (pipe->queueLen > 50000) {
        return Error(m, "OVERFLOW");
    }

    // This allocator will hold the message allocator in existence after it is freed.
    struct Allocator* reqAlloc = Allocator_child(pipe->alloc);
    Allocator_adopt(reqAlloc, Message_getAlloc(m));

    struct Pipe_WriteRequest_pvt* req =
        Allocator_clone(reqAlloc, (&(struct Pipe_WriteRequest_pvt) {
            .pipe = pipe,
            .msg = m,
            .alloc = reqAlloc
        }));
    Identity_set(req);

    if (pipe->isActive) {
        sendMessage2(req);
    } else {
        if (!pipe->bufferedRequest) {
            Log_debug(pipe->log, "Buffering a message");
            pipe->bufferedRequest = req;
        } else {
            Log_debug(pipe->log, "Appending to the buffered message");
            // Build one message holding both payloads, then replace the old buffered request.
            // NOTE(review): presumably Message_epush() prepends, so pushing the new message
            // first leaves the older bytes in front - confirm.
            struct Message* m2 = Message_new(0,
                Message_getLength(m) + Message_getLength(pipe->bufferedRequest->msg),
                reqAlloc);
            Er_assert(Message_epush(m2, m->msgbytes, Message_getLength(m)));
            Er_assert(Message_epush(m2,
                pipe->bufferedRequest->msg->msgbytes,
                Message_getLength(pipe->bufferedRequest->msg)));
            req->msg = m2;
            Allocator_free(pipe->bufferedRequest->alloc);
            pipe->bufferedRequest = req;
        }
    }
    return NULL;
}

/** Asynchronous allocator freeing. */
static void onClose(uv_handle_t* handle)
{
    struct Pipe_pvt* pipe = Identity_check((struct Pipe_pvt*)handle->data);
    handle->data = NULL;
    Log_debug(pipe->log, "Pipe closed");
    Assert_true(!pipe->peer.data);
    Allocator_free(pipe->alloc);
}

// ALLOC() reaches back 8 + (address % 8) bytes, i.e. up to 15 bytes, into the padding
// in front of the buffer, so the padding must be at least 16 bytes.
#if Pipe_PADDING_AMOUNT < 16
    #error "Pipe_PADDING_AMOUNT must be at least 16 to hold the pointer stored by ALLOC()"
#endif

/**
 * Lvalue for the struct Message* slot kept in the padding just before a read buffer:
 * allocate() stores the message there and incoming() reads it back.
 */
#define ALLOC(buff) \
    (((struct Message**) ((buff) - (8 + (((uintptr_t)(buff)) % 8))))[0])

/**
 * libuv read callback.
 * A negative nread invokes the user's onClose handler (if any), zero is ignored, and a
 * positive nread truncates the message to that length and sends it up the interface,
 * with the accepted descriptor attached on ipc pipes (non-windows only).
 * The message's allocator is always freed afterwards, and if the caller freed their
 * allocator while this callback was running the handle is closed here.
 */
static void incoming(uv_stream_t* stream,
                     ssize_t nread,
                     const uv_buf_t* buf)
{
    struct Pipe_pvt* pipe = Identity_check((struct Pipe_pvt*) stream->data);

    // Grab out the msg which was placed there by allocate()
    struct Message* msg = buf->base ? ALLOC(buf->base) : NULL;
    pipe->isInCallback = 1;

    if (nread < 0) {
        if (pipe->pub.onClose) {
            pipe->pub.onClose(&pipe->pub, 0);
        }

    } else if (nread == 0) {
        // This is common.
        //Log_debug(pipe->log, "Pipe 0 length read [%s]", pipe->pub.fullName);

    } else {
        Assert_true(msg);
        Er_assert(Message_truncate(msg, nread));
        if (pipe->ipc) {
            #ifndef win32
                Message_setAssociatedFd(msg, stream->accepted_fd);
            #endif
        }
        Log_debug(pipe->log, "Pipe incoming message len [%d]", Message_getLength(msg));
        Iface_send(&pipe->pub.iface, msg);
    }

    if (msg) {
        Allocator_free(Message_getAlloc(msg));
    }

    pipe->isInCallback = 0;
    if (pipe->userAlloc == NULL) {
        // onFree() ran during this callback and left the close to us.
        uv_close((uv_handle_t*) &pipe->peer, onClose);
    }
}

/** Read callback for ipc pipes; the pending handle type is ignored and incoming() does the work. */
static void incoming2(uv_pipe_t* stream,
                      ssize_t nread,
                      const uv_buf_t* buf,
                      uv_handle_type _)
{
    incoming((uv_stream_t*)stream, nread, buf);
}

/**
 * libuv buffer-allocation callback.
 * Ignores the suggested size and always provides a Pipe_BUFFER_CAP byte message with
 * Pipe_PADDING_AMOUNT bytes of padding, stashing the message pointer in that padding
 * so that incoming() can recover it from the buffer address.
 */
static void allocate(uv_handle_t* handle, size_t size, uv_buf_t* buf)
{
    struct Pipe_pvt* pipe = Identity_check((struct Pipe_pvt*) handle->data);
    size = Pipe_BUFFER_CAP;

    struct Allocator* child = Allocator_child(pipe->alloc);
    struct Message* msg = Message_new(size, Pipe_PADDING_AMOUNT, child);

    ALLOC(msg->msgbytes) = msg;

    buf->base = msg->msgbytes;
    buf->len = size;
}

/** Start reading from the pipe; ipc pipes use the read2 variant. Returns the libuv status. */
static int startPipe(struct Pipe_pvt* pipe)
{
    if (pipe->ipc) {
        return uv_read2_start((uv_stream_t*)&pipe->peer, allocate, incoming2);
    } else {
        return uv_read_start((uv_stream_t*)&pipe->peer, allocate, incoming);
    }
}

/**
 * Connection-established handler, used both as the uv_pipe_connect() callback and called
 * directly for pipes which are already connected (Pipe_forFd, Pipe_serverAccept).
 * Does nothing if the caller already freed their allocator. On success the pipe is marked
 * active, the user's onConnection handler is invoked and any buffered message is sent.
 */
static void connected(uv_connect_t* req, int status)
{
    uv_stream_t* link = req->handle;
    struct Pipe_pvt* pipe = Identity_check((struct Pipe_pvt*) link->data);
    if (pipe->userAlloc == NULL) {
        return;
    }
    Log_debug(pipe->log, "Pipe [%s] established connection", pipe->pub.fullName);

    // NOTE(review): both failure branches close the handle with no close callback, yet
    // onFree() will call uv_close() on the same handle when the caller's allocator is
    // freed. Confirm that libuv tolerates this second close.
    int ret;
    if (status) {
        Log_info(pipe->log, "uv_pipe_connect() failed for pipe [%s] [%s]",
                 pipe->pub.fullName, uv_strerror(status) );
        uv_close((uv_handle_t*) &pipe->peer, NULL);

    } else if ((ret = startPipe(pipe))) {
        Log_info(pipe->log, "uv_read_start() failed for pipe [%s] [%s]",
                 pipe->pub.fullName, uv_strerror(ret));
        uv_close((uv_handle_t*) &pipe->peer, NULL);

    } else {
        pipe->isActive = 1;

        if (pipe->pub.onConnection) {
            pipe->pub.onConnection(&pipe->pub, status);
        }

        // If there's anything buffered then send it.
        if (pipe->bufferedRequest) {
            Log_debug(pipe->log, "Sending buffered message");
            sendMessage2(pipe->bufferedRequest);
            pipe->bufferedRequest = NULL;
        }
    }
}

/**
 * Runs when the caller's allocator is freed.
 * Marks the pipe as released and, unless incoming() is currently running (in which case
 * incoming() closes the handle itself), starts closing the handle and wakes the event base.
 */
static int onFree(struct Allocator_OnFreeJob* job)
{
    struct Pipe_pvt* pipe = Identity_check((struct Pipe_pvt*)job->userData);
    pipe->userAlloc = NULL;
    if (!pipe->isInCallback) {
        Assert_true(pipe->peer.data);
        uv_close((uv_handle_t*) &pipe->peer, onClose);
        EventBase_wakeup(pipe->base);
    }
    return 0;
}

/**
 * Shared constructor: allocate the private struct under the event base's allocator,
 * initialize the libuv pipe handle and register onFree() on the caller's allocator.
 *
 * @param eb the event base whose loop and allocator are used.
 * @param fullPath name recorded as pub.fullName (copied), or NULL.
 * @param ipc passed to uv_pipe_init() and stored in the pipe.
 * @param log the logger.
 * @param userAlloc freeing this allocator triggers the closing of the pipe.
 * @return the new pipe; raises if uv_pipe_init() fails.
 */
static Er_DEFUN(struct Pipe_pvt* newPipeAny(struct EventBase* eb,
                                   const char* fullPath,
                                   bool ipc,
                                   struct Log* log,
                                   struct Allocator* userAlloc))
{
    struct EventBase_pvt* ctx = EventBase_privatize(eb);
    struct Allocator* alloc = Allocator_child(ctx->alloc);

    struct Pipe_pvt* out = Allocator_clone(alloc, (&(struct Pipe_pvt) {
        .pub = {
            .iface = {
                .send = sendMessage
            },
            .fullName = (fullPath) ? CString_strdup(fullPath, alloc) : NULL,
        },
        .alloc = alloc,
        .userAlloc = userAlloc,
        .log = log,
        .ipc = ipc,
        .base = eb,
    }));

    int ret = uv_pipe_init(ctx->loop, &out->peer, ipc);
    if (ret) {
        Er_raise(alloc, "uv_pipe_init() failed [%s]", uv_strerror(ret));
    }

    Allocator_onFree(userAlloc, onFree, out);

    out->peer.data = out;
    Identity_set(out);

    Er_ret(out);
}

/**
 * Wrap an already-open file descriptor in a pipe named "forFd(<fd>)".
 * The pipe is treated as connected immediately. Raises if uv_pipe_open() fails.
 */
Er_DEFUN(struct Pipe* Pipe_forFd(int fd,
                                 bool ipc,
                                 struct EventBase* eb,
                                 struct Log* log,
                                 struct Allocator* userAlloc))
{
    char buff[32] = {0};
    snprintf(buff, 31, "forFd(%d)", fd);

    struct Pipe_pvt* out = Er(newPipeAny(eb, buff, ipc, log, userAlloc));

    int ret = uv_pipe_open(&out->peer, fd);
    if (ret) {
        Er_raise(out->alloc, "uv_pipe_open(inFd) failed [%s]", uv_strerror(ret));
    }

    // Only the handle field is read by connected().
    uv_connect_t req = { .handle = (uv_stream_t*) &out->peer };
    connected(&req, 0);

    Er_ret(&out->pub);
}

/**
 * Connect to the named pipe at fullPath as an ipc pipe.
 * connected() is invoked by libuv once the connect attempt finishes; on non-windows
 * platforms an immediately known failure is raised from here.
 */
Er_DEFUN(struct Pipe* Pipe_named(const char* fullPath,
                                 struct EventBase* eb,
                                 struct Log* log,
                                 struct Allocator* userAlloc))
{
    struct Pipe_pvt* out = Er(newPipeAny(eb, fullPath, true, log, userAlloc));

    uv_connect_t* req = Allocator_malloc(out->alloc, sizeof(uv_connect_t));
    req->data = out;
    uv_pipe_connect(req, &out->peer, out->pub.fullName, connected);

    int err = 0;
    // We get the error back synchronously but windows doesn't support that
    // TODO(cjd): Find a better way
    #ifndef win32
        err = (&out->peer)->delayed_error;
    #endif

    if (err != 0) {
        Er_raise(out->alloc, "uv_pipe_connect() failed [%s] for pipe [%s]",
                 uv_strerror(err), out->pub.fullName);
    }

    Er_ret(&out->pub);
}

/**
 * Accept a pending connection on a listening pipe server into a new ipc pipe.
 * pipeName is used only in the error message. Raises if uv_accept() fails.
 */
Er_DEFUN(struct Pipe* Pipe_serverAccept(uv_pipe_t* server,
                                        const char* pipeName,
                                        struct EventBase* eb,
                                        struct Log* log,
                                        struct Allocator* userAlloc))
{
    struct Pipe_pvt* out = Er(newPipeAny(eb, NULL, true, log, userAlloc));

    int ret = uv_accept((uv_stream_t*) server, (uv_stream_t*) &out->peer);
    if (ret) {
        uv_close((uv_handle_t*) &out->peer, NULL);
        Er_raise(out->alloc, "uv_accept() failed: pipe [%s] [%s]",
                 pipeName, uv_strerror(ret));
    } else {
        // Only the handle field is read by connected().
        uv_connect_t req = { .handle = (uv_stream_t*) &out->peer };
        connected(&req, 0);
    }

    Er_ret(&out->pub);
}

/**
 * Check whether path exists and has the file type expected of a pipe
 * (S_IFSOCK, or S_IFIFO on windows).
 *
 * @return false if the path does not exist or has another type; raises on any other
 *         stat() failure.
 */
Er_DEFUN(bool Pipe_exists(const char* path, struct Allocator* errAlloc))
{
    struct stat st;
    if (stat(path, &st)) {
        if (errno == ENOENT) {
            Er_ret(false);
        }
        Er_raise(errAlloc, "Failed stat(%s) [%s]", path, strerror(errno));
    } else {
        int flag = 0;
        #ifdef win32
            flag = S_IFIFO;
        #elif defined(S_IFSOCK)
            flag = S_IFSOCK;
        #else
            #error "missing S_IFSOCK"
        #endif
        Er_ret(((int)(st.st_mode & S_IFMT)) == flag);
    }
}