/* vim: set expandtab ts=4 sw=4: */
/*
* You may redistribute this program and/or modify it under the terms of
* the GNU General Public License as published by the Free Software Foundation,
* either version 3 of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see .
*/
#include "util/events/libuv/UvWrapper.h"
#include "benc/String.h"
#include "memory/Allocator.h"
#include "util/events/FileNo.h"
#include "util/events/libuv/EventBase_pvt.h"
#include "util/log/Log.h"
#include "util/Identity.h"
#include
#include
// This structure used to be public and accessable.
struct FileNo
{
/** The name of the file eg: "/tmp/cjdns_fileno_foo" */
const char* const pipePath;
void* userData;
enum FileNo_Type type;
struct EventBase* const base;
struct Allocator* alloc;
struct Log* logger;
};
struct FileNoContext
{
struct FileNo_Promise pub;
struct FileNo* fileno;
Identity
};
struct FileNo_pvt
{
struct FileNo pub;
uv_pipe_t server;
uv_pipe_t peer;
/** Job to close the handles when the allocator is freed */
struct Allocator_OnFreeJob* closeHandlesOnFree;
int isActive;
Identity
};
static void onClose(uv_handle_t* handle)
{
struct FileNo_pvt* fileno = Identity_check((struct FileNo_pvt*)handle->data);
handle->data = NULL;
if (fileno->closeHandlesOnFree && !fileno->server.data && !fileno->peer.data) {
Allocator_onFreeComplete((struct Allocator_OnFreeJob*) fileno->closeHandlesOnFree);
}
}
#if FileNo_PADDING_AMOUNT < 8
#error
#endif
#define ALLOC(buff) (((struct Allocator**) &(buff[-(8 + (((uintptr_t)buff) % 8))]))[0])
static void incoming(uv_pipe_t* stream,
ssize_t nread,
const uv_buf_t* buf,
uv_handle_type pending)
{
// Grab out the allocator which was placed there by allocate()
struct Allocator* alloc = buf->base ? ALLOC(buf->base) : NULL;
Assert_true(pending == UV_UNKNOWN_HANDLE);
if (nread < 0) {
uv_close((uv_handle_t*) stream, onClose);
#ifndef win32
} else {
struct FileNo_pvt* fileno = Identity_check((struct FileNo_pvt*) stream->data);
if (fileno->peer.accepted_fd != -1) {
struct FileNoContext* fctx = (struct FileNoContext*) fileno->pub.userData;
if (fctx->pub.callback) {
fctx->pub.callback(&fctx->pub, fileno->peer.accepted_fd);
}
}
#endif
}
if (alloc) {
Allocator_free(alloc);
}
}
static void allocate(uv_handle_t* handle, size_t size, uv_buf_t* buf)
{
struct FileNo_pvt* pipe = Identity_check((struct FileNo_pvt*) handle->data);
size = FileNo_BUFFER_CAP;
size_t fullSize = size + FileNo_PADDING_AMOUNT;
struct Allocator* child = Allocator_child(pipe->pub.alloc);
char* buff = Allocator_malloc(child, fullSize);
buff += FileNo_PADDING_AMOUNT;
ALLOC(buff) = child;
buf->base = buff;
buf->len = size;
}
static void connected(uv_connect_t* req, int status)
{
uv_stream_t* link = req->handle;
struct FileNo_pvt* fileno = Identity_check((struct FileNo_pvt*) link->data);
Log_debug(fileno->pub.logger, "FileNo [%s] established connection", fileno->pub.pipePath);
int ret;
if (status) {
Log_info(fileno->pub.logger, "uv_pipe_connect() failed for pipe [%s] [%s]",
fileno->pub.pipePath, uv_strerror(status) );
uv_close((uv_handle_t*) &fileno->peer, onClose);
} else if ((ret = uv_read2_start((uv_stream_t*)&fileno->peer, allocate, incoming))) {
Log_info(fileno->pub.logger, "uv_read2_start() failed for pipe [%s] [%s]",
fileno->pub.pipePath, uv_strerror(ret));
uv_close((uv_handle_t*) &fileno->peer, onClose);
} else {
fileno->isActive = 1;
}
}
static void listenCallback(uv_stream_t* server, int status)
{
struct FileNo_pvt* fileno = Identity_check((struct FileNo_pvt*) server->data);
if (fileno->isActive) {
// first connection wins.
return;
}
if (status == -1) {
Log_info(fileno->pub.logger, "failed to accept pipe connection [%s] [%s]",
fileno->pub.pipePath, uv_strerror(status) );
return;
}
Log_info(fileno->pub.logger, "Try accept pipe connection [%s]",
fileno->pub.pipePath);
int ret = uv_accept(server, (uv_stream_t*) &fileno->peer);
if (ret) {
Log_warn(fileno->pub.logger, "uv_accept() failed: pipe [%s] [%s]",
fileno->pub.pipePath, uv_strerror(ret) );
uv_close((uv_handle_t*) &fileno->peer, onClose);
} else {
uv_connect_t req = { .handle = (uv_stream_t*) &fileno->peer };
connected(&req, 0);
}
}
static int closeHandlesOnFree(struct Allocator_OnFreeJob* job)
{
struct FileNo_pvt* fileno = Identity_check((struct FileNo_pvt*)job->userData);
fileno->closeHandlesOnFree = job;
int skip = 2;
if (fileno->server.data) {
uv_close((uv_handle_t*) &fileno->server, NULL);
skip--;
}
if (fileno->peer.data) {
uv_close((uv_handle_t*) &fileno->peer, NULL);
skip--;
}
if (skip == 2) {
return 0;
}
return Allocator_ONFREE_ASYNC;
}
static struct FileNo* FileNo_new(const char* path,
struct EventBase* eb,
struct Except* eh,
struct Log* logger,
struct Allocator* userAlloc)
{
struct EventBase_pvt* ctx = EventBase_privatize(eb);
struct Allocator* alloc = Allocator_child(userAlloc);
String* pathStr = String_new(path, alloc);
struct FileNo_pvt* out = Allocator_clone(alloc, (&(struct FileNo_pvt) {
.pub = {
.pipePath = pathStr->bytes,
.base = eb,
.logger = logger,
.alloc = alloc
}
}));
int ret = uv_pipe_init(ctx->loop, &out->peer, 1);
if (ret) {
Except_throw(eh, "uv_pipe_init() failed [%s]", uv_strerror(ret));
}
ret = uv_pipe_init(ctx->loop, &out->server, 1);
if (ret) {
Except_throw(eh, "uv_pipe_init() failed [%s]", uv_strerror(ret));
}
Allocator_onFree(alloc, closeHandlesOnFree, out);
out->peer.data = out;
out->server.data = out;
Identity_set(out);
uv_fs_t req;
uv_fs_unlink(out->peer.loop, &req, out->pub.pipePath, NULL);
ret = uv_pipe_bind(&out->server, out->pub.pipePath);
if (!ret) {
ret = uv_listen((uv_stream_t*) &out->server, 1, listenCallback);
if (ret) {
Except_throw(eh, "uv_listen() failed [%s] for pipe [%s]",
uv_strerror(ret), out->pub.pipePath);
}
return &out->pub;
}
Except_throw(eh, "uv_pipe_bind() failed [%s] for pipe [%s]",
uv_strerror(ret), out->pub.pipePath);
return &out->pub;
}
static struct FileNo_Promise* FileNo_newFile(const char* path,
struct EventBase* eb,
struct Except* eh,
struct Log* logger,
struct Allocator* alloc)
{
struct FileNo* fn = FileNo_new(path, eb, eh, logger, alloc);
struct FileNoContext* fctx = Allocator_clone(fn->alloc, (&(struct FileNoContext) {
.pub = {
.alloc = fn->alloc
},
.fileno = fn
}));
Identity_set(fctx);
fn->userData = fctx;
return &fctx->pub;
}
struct FileNo_Promise* FileNo_import(const char* path,
struct EventBase* eb,
struct Except* eh,
struct Log* logger,
struct Allocator* alloc)
{
return FileNo_newFile(path, eb, eh, logger, alloc);
}