|
@@ -15,6 +15,7 @@
|
|
|
#include "util/events/libuv/UvWrapper.h"
|
|
|
#include "memory/Allocator.h"
|
|
|
#include "util/events/Pipe.h"
|
|
|
+#include "util/events/libuv/Pipe_pvt.h"
|
|
|
#include "util/events/libuv/EventBase_pvt.h"
|
|
|
#include "util/log/Log.h"
|
|
|
#include "util/Identity.h"
|
|
@@ -26,6 +27,8 @@
|
|
|
#include <inttypes.h>
|
|
|
#include <libgen.h>
|
|
|
#include <stdio.h>
|
|
|
+#include <sys/stat.h>
|
|
|
+#include <string.h>
|
|
|
|
|
|
struct Pipe_WriteRequest_pvt;
|
|
|
|
|
@@ -33,7 +36,6 @@ struct Pipe_pvt
|
|
|
{
|
|
|
struct Pipe pub;
|
|
|
|
|
|
- uv_pipe_t server;
|
|
|
uv_pipe_t peer;
|
|
|
|
|
|
/** Job to close the handles when the allocator is freed */
|
|
@@ -42,11 +44,8 @@ struct Pipe_pvt
|
|
|
/** Job which blocks the freeing until the callback completes */
|
|
|
struct Allocator_OnFreeJob* blockFreeInsideCallback;
|
|
|
|
|
|
- /**
|
|
|
- * If the output pipe is same as the input, this points to peer.
|
|
|
- * Otherwise it points to server which is reused.
|
|
|
- */
|
|
|
- uv_pipe_t* out;
|
|
|
+ // true if we can pass file descriptors through this pipe
|
|
|
+ bool ipc;
|
|
|
|
|
|
/** 1 when the pipe becomes active. */
|
|
|
int isActive;
|
|
@@ -61,6 +60,8 @@ struct Pipe_pvt
|
|
|
|
|
|
struct Allocator* alloc;
|
|
|
|
|
|
+ struct Log* log;
|
|
|
+
|
|
|
Identity
|
|
|
};
|
|
|
|
|
@@ -76,7 +77,7 @@ static void sendMessageCallback(uv_write_t* uvReq, int error)
|
|
|
{
|
|
|
struct Pipe_WriteRequest_pvt* req = Identity_check((struct Pipe_WriteRequest_pvt*) uvReq);
|
|
|
if (error) {
|
|
|
- Log_info(req->pipe->pub.logger, "Failed to write to pipe [%s] [%s]",
|
|
|
+ Log_info(req->pipe->log, "Failed to write to pipe [%s] [%s]",
|
|
|
req->pipe->pub.fullName, uv_strerror(error) );
|
|
|
}
|
|
|
req->pipe->queueLen -= req->msg->length;
|
|
@@ -84,7 +85,7 @@ static void sendMessageCallback(uv_write_t* uvReq, int error)
|
|
|
Allocator_free(req->alloc);
|
|
|
}
|
|
|
|
|
|
-static uint8_t sendMessage2(struct Pipe_WriteRequest_pvt* req)
|
|
|
+static void sendMessage2(struct Pipe_WriteRequest_pvt* req)
|
|
|
{
|
|
|
struct Pipe_pvt* pipe = req->pipe;
|
|
|
struct Message* m = req->msg;
|
|
@@ -93,16 +94,32 @@ static uint8_t sendMessage2(struct Pipe_WriteRequest_pvt* req)
|
|
|
{ .base = (char*)m->bytes, .len = m->length }
|
|
|
};
|
|
|
|
|
|
- int ret = uv_write(&req->uvReq, (uv_stream_t*) pipe->out, buffers, 1, sendMessageCallback);
|
|
|
+ int ret = -1;
|
|
|
+ if (pipe->ipc && m->associatedFd) {
|
|
|
+ int fd = Message_getAssociatedFd(m);
|
|
|
+ uv_stream_t* fake_handle = Allocator_calloc(req->alloc, sizeof(uv_stream_t), 1);
|
|
|
+ fake_handle->io_watcher.fd = fd;
|
|
|
+ fake_handle->type = UV_TCP;
|
|
|
+ ret = uv_write2(
|
|
|
+ &req->uvReq,
|
|
|
+ (uv_stream_t*) &pipe->peer,
|
|
|
+ buffers,
|
|
|
+ 1,
|
|
|
+ fake_handle,
|
|
|
+ sendMessageCallback);
|
|
|
+ Log_debug(pipe->log, "Sending message with fd [%d]", fd);
|
|
|
+ } else {
|
|
|
+ ret = uv_write(&req->uvReq, (uv_stream_t*) &pipe->peer, buffers, 1, sendMessageCallback);
|
|
|
+ }
|
|
|
if (ret) {
|
|
|
- Log_info(pipe->pub.logger, "Failed writing to pipe [%s] [%s]",
|
|
|
+ Log_info(pipe->log, "Failed writing to pipe [%s] [%s]",
|
|
|
pipe->pub.fullName, uv_strerror(ret) );
|
|
|
Allocator_free(req->alloc);
|
|
|
- return Error_UNDELIVERABLE;
|
|
|
+ return;
|
|
|
}
|
|
|
pipe->queueLen += m->length;
|
|
|
|
|
|
- return Error_NONE;
|
|
|
+ return;
|
|
|
}
|
|
|
|
|
|
static Iface_DEFUN sendMessage(struct Message* m, struct Iface* iface)
|
|
@@ -130,17 +147,13 @@ static Iface_DEFUN sendMessage(struct Message* m, struct Iface* iface)
|
|
|
Identity_set(req);
|
|
|
|
|
|
if (pipe->isActive) {
|
|
|
- Log_debug(pipe->pub.logger, "Pipe [%s] sending a message len [%d]",
|
|
|
- pipe->pub.name, (int)m->length);
|
|
|
sendMessage2(req);
|
|
|
} else {
|
|
|
if (!pipe->bufferedRequest) {
|
|
|
- Log_debug(pipe->pub.logger, "Pipe [%s] buffering a message",
|
|
|
- pipe->pub.name);
|
|
|
+ Log_debug(pipe->log, "Buffering a message");
|
|
|
pipe->bufferedRequest = req;
|
|
|
} else {
|
|
|
- Log_debug(pipe->pub.logger, "Pipe [%s] appending to the buffered message",
|
|
|
- pipe->pub.name);
|
|
|
+ Log_debug(pipe->log, "Appending to the buffered message");
|
|
|
uint8_t* buff =
|
|
|
Allocator_malloc(reqAlloc, m->length + pipe->bufferedRequest->msg->length);
|
|
|
Bits_memcpy(buff,
|
|
@@ -155,7 +168,7 @@ static Iface_DEFUN sendMessage(struct Message* m, struct Iface* iface)
|
|
|
pipe->bufferedRequest = req;
|
|
|
}
|
|
|
}
|
|
|
- return Error_NONE;
|
|
|
+ return NULL;
|
|
|
}
|
|
|
|
|
|
/** Asynchronous allocator freeing. */
|
|
@@ -163,7 +176,7 @@ static void onClose(uv_handle_t* handle)
|
|
|
{
|
|
|
struct Pipe_pvt* pipe = Identity_check((struct Pipe_pvt*)handle->data);
|
|
|
handle->data = NULL;
|
|
|
- if (pipe->closeHandlesOnFree && !pipe->server.data && !pipe->peer.data) {
|
|
|
+ if (pipe->closeHandlesOnFree && !pipe->peer.data) {
|
|
|
Allocator_onFreeComplete((struct Allocator_OnFreeJob*) pipe->closeHandlesOnFree);
|
|
|
}
|
|
|
}
|
|
@@ -184,18 +197,15 @@ static void incoming(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf)
|
|
|
Assert_true(!alloc || alloc->fileName == pipe->alloc->fileName);
|
|
|
|
|
|
if (nread < 0) {
|
|
|
- Log_debug(pipe->pub.logger, "Pipe closed [%s]", pipe->pub.fullName);
|
|
|
if (pipe->pub.onClose) {
|
|
|
pipe->pub.onClose(&pipe->pub, 0);
|
|
|
}
|
|
|
- uv_close((uv_handle_t*) stream, onClose);
|
|
|
|
|
|
} else if (nread == 0) {
|
|
|
// This is common.
|
|
|
- //Log_debug(pipe->pub.logger, "Pipe 0 length read [%s]", pipe->pub.fullName);
|
|
|
+ //Log_debug(pipe->log, "Pipe 0 length read [%s]", pipe->pub.fullName);
|
|
|
|
|
|
} else {
|
|
|
- Log_debug(pipe->pub.logger, "Pipe read [%s] [%d]", pipe->pub.fullName, (int) nread);
|
|
|
Assert_true(alloc);
|
|
|
struct Message* m = Allocator_calloc(alloc, sizeof(struct Message), 1);
|
|
|
m->length = nread;
|
|
@@ -203,6 +213,11 @@ static void incoming(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf)
|
|
|
m->capacity = buf->len;
|
|
|
m->bytes = (uint8_t*)buf->base;
|
|
|
m->alloc = alloc;
|
|
|
+ if (pipe->ipc) {
|
|
|
+ #ifndef win32
|
|
|
+ Message_setAssociatedFd(m, stream->accepted_fd);
|
|
|
+ #endif
|
|
|
+ }
|
|
|
Iface_send(&pipe->pub.iface, m);
|
|
|
}
|
|
|
|
|
@@ -216,6 +231,11 @@ static void incoming(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+static void incoming2(uv_pipe_t* stream, ssize_t nread, const uv_buf_t* buf, uv_handle_type _)
|
|
|
+{
|
|
|
+ incoming((uv_stream_t*)stream, nread, buf);
|
|
|
+}
|
|
|
+
|
|
|
static void allocate(uv_handle_t* handle, size_t size, uv_buf_t* buf)
|
|
|
{
|
|
|
struct Pipe_pvt* pipe = Identity_check((struct Pipe_pvt*) handle->data);
|
|
@@ -232,20 +252,29 @@ static void allocate(uv_handle_t* handle, size_t size, uv_buf_t* buf)
|
|
|
buf->len = size;
|
|
|
}
|
|
|
|
|
|
+static int startPipe(struct Pipe_pvt* pipe)
|
|
|
+{
|
|
|
+ if (pipe->ipc) {
|
|
|
+ return uv_read2_start((uv_stream_t*)&pipe->peer, allocate, incoming2);
|
|
|
+ } else {
|
|
|
+ return uv_read_start((uv_stream_t*)&pipe->peer, allocate, incoming);
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
static void connected(uv_connect_t* req, int status)
|
|
|
{
|
|
|
uv_stream_t* link = req->handle;
|
|
|
struct Pipe_pvt* pipe = Identity_check((struct Pipe_pvt*) link->data);
|
|
|
- Log_debug(pipe->pub.logger, "Pipe [%s] established connection", pipe->pub.fullName);
|
|
|
+ Log_debug(pipe->log, "Pipe [%s] established connection", pipe->pub.fullName);
|
|
|
|
|
|
int ret;
|
|
|
if (status) {
|
|
|
- Log_info(pipe->pub.logger, "uv_pipe_connect() failed for pipe [%s] [%s]",
|
|
|
+ Log_info(pipe->log, "uv_pipe_connect() failed for pipe [%s] [%s]",
|
|
|
pipe->pub.fullName, uv_strerror(status) );
|
|
|
uv_close((uv_handle_t*) &pipe->peer, onClose);
|
|
|
|
|
|
- } else if ((ret = uv_read_start((uv_stream_t*)&pipe->peer, allocate, incoming))) {
|
|
|
- Log_info(pipe->pub.logger, "uv_read_start() failed for pipe [%s] [%s]",
|
|
|
+ } else if ((ret = startPipe(pipe))) {
|
|
|
+ Log_info(pipe->log, "uv_read_start() failed for pipe [%s] [%s]",
|
|
|
pipe->pub.fullName, uv_strerror(ret));
|
|
|
uv_close((uv_handle_t*) &pipe->peer, onClose);
|
|
|
|
|
@@ -258,52 +287,13 @@ static void connected(uv_connect_t* req, int status)
|
|
|
|
|
|
// If there's anything buffered then send it.
|
|
|
if (pipe->bufferedRequest) {
|
|
|
- Log_debug(pipe->pub.logger, "Sending buffered message");
|
|
|
+ Log_debug(pipe->log, "Sending buffered message");
|
|
|
sendMessage2(pipe->bufferedRequest);
|
|
|
pipe->bufferedRequest = NULL;
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-static void listenCallback(uv_stream_t* server, int status)
|
|
|
-{
|
|
|
- struct Pipe_pvt* pipe = Identity_check((struct Pipe_pvt*) server->data);
|
|
|
- if (pipe->isActive) {
|
|
|
- // first connection wins.
|
|
|
- return;
|
|
|
- }
|
|
|
- if (status == -1) {
|
|
|
- Log_info(pipe->pub.logger, "failed to accept pipe connection [%s] [%s]",
|
|
|
- pipe->pub.fullName, uv_strerror(status) );
|
|
|
-
|
|
|
- if (pipe->pub.onConnection) {
|
|
|
- pipe->pub.onConnection(&pipe->pub, status);
|
|
|
- }
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
- int ret = uv_accept(server, (uv_stream_t*) &pipe->peer);
|
|
|
- if (ret) {
|
|
|
- Log_warn(pipe->pub.logger, "uv_accept() failed: pipe [%s] [%s]",
|
|
|
- pipe->pub.fullName, uv_strerror(ret) );
|
|
|
- if (pipe->pub.onConnection) {
|
|
|
- pipe->pub.onConnection(&pipe->pub, -1);
|
|
|
- }
|
|
|
- uv_close((uv_handle_t*) &pipe->peer, onClose);
|
|
|
- } else {
|
|
|
- uv_connect_t req = { .handle = (uv_stream_t*) &pipe->peer };
|
|
|
- connected(&req, 0);
|
|
|
- }
|
|
|
-
|
|
|
- uv_close((uv_handle_t*) &pipe->server, onClose);
|
|
|
-
|
|
|
- #ifndef win32
|
|
|
- // get rid of the pipe after it has been connected.
|
|
|
- uv_fs_t req;
|
|
|
- uv_fs_unlink(pipe->peer.loop, &req, pipe->pub.fullName, NULL);
|
|
|
- #endif
|
|
|
-}
|
|
|
-
|
|
|
static int blockFreeInsideCallback(struct Allocator_OnFreeJob* job)
|
|
|
{
|
|
|
struct Pipe_pvt* pipe = Identity_check((struct Pipe_pvt*)job->userData);
|
|
@@ -318,240 +308,121 @@ static int closeHandlesOnFree(struct Allocator_OnFreeJob* job)
|
|
|
{
|
|
|
struct Pipe_pvt* pipe = Identity_check((struct Pipe_pvt*)job->userData);
|
|
|
pipe->closeHandlesOnFree = job;
|
|
|
- int skip = 2;
|
|
|
- if (pipe->server.data) {
|
|
|
- uv_close((uv_handle_t*) &pipe->server, onClose);
|
|
|
- skip--;
|
|
|
- }
|
|
|
if (pipe->peer.data) {
|
|
|
uv_close((uv_handle_t*) &pipe->peer, onClose);
|
|
|
- skip--;
|
|
|
- }
|
|
|
- if (skip == 2) {
|
|
|
- return 0;
|
|
|
- }
|
|
|
- return Allocator_ONFREE_ASYNC;
|
|
|
-}
|
|
|
-
|
|
|
-static struct Pipe_pvt* newPipe(struct EventBase* eb,
|
|
|
- const char* path,
|
|
|
- const char* name,
|
|
|
- struct Except* eh,
|
|
|
- struct Allocator* userAlloc)
|
|
|
-{
|
|
|
- struct EventBase_pvt* ctx = EventBase_privatize(eb);
|
|
|
- struct Allocator* alloc = Allocator_child(userAlloc);
|
|
|
-
|
|
|
- char prefix[32] = {0};
|
|
|
- if (Defined(win32)) {
|
|
|
- Bits_memcpy(prefix, "\\cjdns_pipe_", CString_strlen("\\cjdns_pipe_"));
|
|
|
- } else {
|
|
|
- Bits_memcpy(prefix, "/cjdns_pipe_", CString_strlen("/cjdns_pipe_"));
|
|
|
- }
|
|
|
- char* cname = Allocator_malloc(alloc, (path ? CString_strlen(path) : 0) +
|
|
|
- CString_strlen(prefix) + CString_strlen(name) + 1);
|
|
|
- int pos = 0;
|
|
|
- if (path) {
|
|
|
- Bits_memcpy(cname, path, CString_strlen(path));
|
|
|
- pos += CString_strlen(path);
|
|
|
- }
|
|
|
- Bits_memcpy(cname + pos, prefix, CString_strlen(prefix));
|
|
|
- pos += CString_strlen(prefix);
|
|
|
- Bits_memcpy(cname + pos, name, CString_strlen(name) + 1);
|
|
|
-
|
|
|
- struct Pipe_pvt* out = Allocator_clone(alloc, (&(struct Pipe_pvt) {
|
|
|
- .pub = {
|
|
|
- .iface = {
|
|
|
- .send = sendMessage
|
|
|
- },
|
|
|
- .fullName = cname,
|
|
|
- .name = &cname[pos],
|
|
|
- .base = eb
|
|
|
- },
|
|
|
- .alloc = alloc
|
|
|
- }));
|
|
|
-
|
|
|
- int ret;
|
|
|
-
|
|
|
- ret = uv_pipe_init(ctx->loop, &out->peer, 0);
|
|
|
- if (ret) {
|
|
|
- Except_throw(eh, "uv_pipe_init() failed [%s]", uv_strerror(ret));
|
|
|
- }
|
|
|
-
|
|
|
- ret = uv_pipe_init(ctx->loop, &out->server, 0);
|
|
|
- if (ret) {
|
|
|
- Except_throw(eh, "uv_pipe_init() failed [%s]", uv_strerror(ret));
|
|
|
+ return Allocator_ONFREE_ASYNC;
|
|
|
}
|
|
|
-
|
|
|
- #ifdef win32
|
|
|
- out->pub.fd = &out->peer.handle;
|
|
|
- #else
|
|
|
- out->pub.fd = &out->peer.io_watcher.fd;
|
|
|
- #endif
|
|
|
-
|
|
|
- Allocator_onFree(alloc, closeHandlesOnFree, out);
|
|
|
- Allocator_onFree(alloc, blockFreeInsideCallback, out);
|
|
|
-
|
|
|
- out->peer.data = out;
|
|
|
- out->server.data = out;
|
|
|
- out->out = &out->peer;
|
|
|
- Identity_set(out);
|
|
|
-
|
|
|
- return out;
|
|
|
+ return 0;
|
|
|
}
|
|
|
|
|
|
static struct Pipe_pvt* newPipeAny(struct EventBase* eb,
|
|
|
const char* fullPath,
|
|
|
+ bool ipc,
|
|
|
+ struct Log* log,
|
|
|
struct Except* eh,
|
|
|
struct Allocator* userAlloc)
|
|
|
{
|
|
|
struct EventBase_pvt* ctx = EventBase_privatize(eb);
|
|
|
struct Allocator* alloc = Allocator_child(userAlloc);
|
|
|
|
|
|
- char* name = NULL;
|
|
|
- if (fullPath) {
|
|
|
- name = basename(String_new(fullPath, alloc)->bytes);
|
|
|
- }
|
|
|
-
|
|
|
struct Pipe_pvt* out = Allocator_clone(alloc, (&(struct Pipe_pvt) {
|
|
|
.pub = {
|
|
|
.iface = {
|
|
|
.send = sendMessage
|
|
|
},
|
|
|
- .fullName = fullPath,
|
|
|
- .name = name,
|
|
|
- .base = eb
|
|
|
+ .fullName = (fullPath) ? CString_strdup(fullPath, alloc) : NULL,
|
|
|
},
|
|
|
- .alloc = alloc
|
|
|
+ .alloc = alloc,
|
|
|
+ .log = log,
|
|
|
+ .ipc = ipc,
|
|
|
}));
|
|
|
|
|
|
- int ret;
|
|
|
-
|
|
|
- ret = uv_pipe_init(ctx->loop, &out->peer, 0);
|
|
|
- if (ret) {
|
|
|
- Except_throw(eh, "uv_pipe_init() failed [%s]", uv_strerror(ret));
|
|
|
- }
|
|
|
-
|
|
|
- ret = uv_pipe_init(ctx->loop, &out->server, 0);
|
|
|
+ int ret = uv_pipe_init(ctx->loop, &out->peer, ipc);
|
|
|
if (ret) {
|
|
|
Except_throw(eh, "uv_pipe_init() failed [%s]", uv_strerror(ret));
|
|
|
}
|
|
|
|
|
|
- #ifdef win32
|
|
|
- out->pub.fd = &out->peer.handle;
|
|
|
- #else
|
|
|
- out->pub.fd = &out->peer.io_watcher.fd;
|
|
|
- #endif
|
|
|
-
|
|
|
Allocator_onFree(alloc, closeHandlesOnFree, out);
|
|
|
Allocator_onFree(alloc, blockFreeInsideCallback, out);
|
|
|
|
|
|
out->peer.data = out;
|
|
|
- out->server.data = out;
|
|
|
- out->out = &out->peer;
|
|
|
Identity_set(out);
|
|
|
|
|
|
return out;
|
|
|
}
|
|
|
|
|
|
-struct Pipe* Pipe_forFiles(int inFd,
|
|
|
- int outFd,
|
|
|
- struct EventBase* eb,
|
|
|
- struct Except* eh,
|
|
|
- struct Log* logger,
|
|
|
- struct Allocator* userAlloc)
|
|
|
+struct Pipe* Pipe_forFd(int fd,
|
|
|
+ bool ipc,
|
|
|
+ struct EventBase* eb,
|
|
|
+ struct Except* eh,
|
|
|
+ struct Log* log,
|
|
|
+ struct Allocator* userAlloc)
|
|
|
{
|
|
|
char buff[32] = {0};
|
|
|
- snprintf(buff, 31, "forFiles(%d,%d)", inFd, outFd);
|
|
|
+ snprintf(buff, 31, "forFd(%d)", fd);
|
|
|
|
|
|
- struct Pipe_pvt* out = newPipe(eb, NULL, buff, eh, userAlloc);
|
|
|
- out->pub.logger = logger;
|
|
|
+ struct Pipe_pvt* out = newPipeAny(eb, buff, ipc, log, eh, userAlloc);
|
|
|
|
|
|
- int ret = uv_pipe_open(&out->peer, inFd);
|
|
|
+ int ret = uv_pipe_open(&out->peer, fd);
|
|
|
if (ret) {
|
|
|
Except_throw(eh, "uv_pipe_open(inFd) failed [%s]",
|
|
|
uv_strerror(ret));
|
|
|
}
|
|
|
|
|
|
- if (inFd != outFd) {
|
|
|
- out->out = &out->server;
|
|
|
- ret = uv_pipe_open(out->out, outFd);
|
|
|
- if (ret) {
|
|
|
- Except_throw(eh, "uv_pipe_open(outFd) failed [%s]",
|
|
|
- uv_strerror(ret));
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
uv_connect_t req = { .handle = (uv_stream_t*) &out->peer };
|
|
|
connected(&req, 0);
|
|
|
|
|
|
return &out->pub;
|
|
|
}
|
|
|
|
|
|
-struct Pipe* Pipe_named(const char* path,
|
|
|
- const char* name,
|
|
|
+struct Pipe* Pipe_named(const char* fullPath,
|
|
|
struct EventBase* eb,
|
|
|
struct Except* eh,
|
|
|
+ struct Log* log,
|
|
|
struct Allocator* userAlloc)
|
|
|
{
|
|
|
- struct Pipe_pvt* out = newPipe(eb, path, name, eh, userAlloc);
|
|
|
- int ret;
|
|
|
+ struct Pipe_pvt* out = newPipeAny(eb, fullPath, true, log, eh, userAlloc);
|
|
|
|
|
|
- // Attempt to create pipe.
|
|
|
- ret = uv_pipe_bind(&out->server, out->pub.fullName);
|
|
|
- if (!ret) {
|
|
|
- ret = uv_listen((uv_stream_t*) &out->server, 1, listenCallback);
|
|
|
- if (ret) {
|
|
|
- Except_throw(eh, "uv_listen() failed [%s] for pipe [%s]",
|
|
|
- uv_strerror(ret), out->pub.fullName);
|
|
|
- }
|
|
|
- return &out->pub;
|
|
|
- }
|
|
|
+ uv_connect_t* req = Allocator_malloc(out->alloc, sizeof(uv_connect_t));
|
|
|
+ req->data = out;
|
|
|
+ uv_pipe_connect(req, &out->peer, out->pub.fullName, connected);
|
|
|
|
|
|
- if (ret == UV_EADDRINUSE) {
|
|
|
- // Pipe exists, connect to it.
|
|
|
- uv_connect_t* req = Allocator_malloc(out->alloc, sizeof(uv_connect_t));
|
|
|
- req->data = out;
|
|
|
- uv_pipe_connect(req, &out->peer, out->pub.fullName, connected);
|
|
|
- return &out->pub;
|
|
|
+ int err = (&out->peer)->delayed_error;
|
|
|
+ if (err != 0) {
|
|
|
+ Except_throw(eh, "uv_pipe_connect() failed [%s] for pipe [%s]",
|
|
|
+ uv_strerror(err), out->pub.fullName);
|
|
|
}
|
|
|
|
|
|
- Except_throw(eh, "uv_pipe_bind() failed [%s] for pipe [%s]",
|
|
|
- uv_strerror(ret), out->pub.fullName);
|
|
|
-
|
|
|
return &out->pub;
|
|
|
}
|
|
|
|
|
|
-struct Pipe* Pipe_namedConnect(const char* fullPath,
|
|
|
- bool attemptToCreate,
|
|
|
+struct Pipe* Pipe_serverAccept(uv_pipe_t* server,
|
|
|
+ const char* pipeName,
|
|
|
struct EventBase* eb,
|
|
|
struct Except* eh,
|
|
|
+ struct Log* log,
|
|
|
struct Allocator* userAlloc)
|
|
|
{
|
|
|
- struct Pipe_pvt* out = newPipeAny(eb, fullPath, eh, userAlloc);
|
|
|
-
|
|
|
- if (attemptToCreate) {
|
|
|
- // Attempt to create pipe.
|
|
|
- int ret = uv_pipe_bind(&out->server, out->pub.fullName);
|
|
|
- if (!ret) {
|
|
|
- ret = uv_listen((uv_stream_t*) &out->server, 1, listenCallback);
|
|
|
- if (ret) {
|
|
|
- Except_throw(eh, "uv_listen() failed [%s] for pipe [%s]",
|
|
|
- uv_strerror(ret), out->pub.fullName);
|
|
|
- }
|
|
|
- return &out->pub;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- uv_connect_t* req = Allocator_malloc(out->alloc, sizeof(uv_connect_t));
|
|
|
- req->data = out;
|
|
|
- uv_pipe_connect(req, &out->peer, out->pub.fullName, connected);
|
|
|
-
|
|
|
- int err = (&out->peer)->delayed_error;
|
|
|
- if (err != 0) {
|
|
|
- Except_throw(eh, "uv_pipe_connect() failed [%s] for pipe [%s]",
|
|
|
- uv_strerror(err), out->pub.fullName);
|
|
|
+ struct Pipe_pvt* out = newPipeAny(eb, NULL, true, log, eh, userAlloc);
|
|
|
+ int ret = uv_accept((uv_stream_t*) server, (uv_stream_t*) &out->peer);
|
|
|
+ if (ret) {
|
|
|
+ uv_close((uv_handle_t*) &out->peer, onClose);
|
|
|
+ Except_throw(eh, "uv_accept() failed: pipe [%s] [%s]",
|
|
|
+ pipeName, uv_strerror(ret) );
|
|
|
+ } else {
|
|
|
+ uv_connect_t req = { .handle = (uv_stream_t*) &out->peer };
|
|
|
+ connected(&req, 0);
|
|
|
}
|
|
|
-
|
|
|
return &out->pub;
|
|
|
}
|
|
|
+
|
|
|
+bool Pipe_exists(const char* path, struct Except* eh)
|
|
|
+{
|
|
|
+ struct stat st;
|
|
|
+ if (stat(path, &st)) {
|
|
|
+ if (errno == ENOENT) { return false; }
|
|
|
+ Except_throw(eh, "Failed stat(%s) [%s]", path, strerror(errno));
|
|
|
+ } else {
|
|
|
+ return (st.st_mode & S_IFMT) == S_IFSOCK;
|
|
|
+ }
|
|
|
+}
|