/* vim: set expandtab ts=4 sw=4: */
/*
* You may redistribute this program and/or modify it under the terms of
* the GNU General Public License as published by the Free Software Foundation,
* either version 3 of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#include "util/events/libuv/UvWrapper.h"
#include "benc/String.h"
#include "exception/Er.h"
#include "memory/Allocator.h"
#include "util/events/Pipe.h"
#include "util/events/PipeServer.h"
#include "util/events/libuv/Pipe_pvt.h"
#include "util/events/libuv/EventBase_pvt.h"
#include "util/log/Log.h"
#include "util/Identity.h"
#include "util/CString.h"
#include "wire/Message.h"
#include "wire/Error.h"
#include
#include
#include
#include
#include
struct PipeServer_pvt;
/**
 * Per-connection state for one client accepted by the pipe server.
 * Allocated from the connection's own allocator in listenCallback().
 */
struct Client
{
/** Interface plumbed to the accepted pipe; its send function is incomingFromClient(). */
struct Iface iface;
/** The accepted pipe which carries this client's traffic. */
struct Pipe* pipe;
/** Owning server; set to NULL by onFree() when the server is being torn down. */
struct PipeServer_pvt* psp;
/** Allocator holding this client and its pipe; freed in pipeOnClose(). */
struct Allocator* alloc;
/** Address built from the client's handle in the server's client map. */
struct Sockaddr addr;
/* Identity marker member (declared by the Identity macro), set with Identity_set(). */
Identity
};
/*
 * Instantiate the generic map from util/Map.h under the name Map_Clients.
 * Values are struct Client* and handle support is enabled; this file uses
 * Map_Clients_put(), Map_Clients_indexForHandle(), Map_Clients_remove() and
 * the handles[] / values[] arrays of the resulting struct Map_Clients.
 */
#define Map_NAME Clients
#define Map_ENABLE_HANDLES
#define Map_VALUE_TYPE struct Client*
#include "util/Map.h"
/**
 * Private state of a pipe server; the public struct PipeServer is embedded
 * as the `pub` member and is what PipeServer_named() returns.
 */
struct PipeServer_pvt
{
/** Public part handed out to callers. */
struct PipeServer pub;
/** The libuv pipe handle which is bound and listened on. */
uv_pipe_t server;
/** Internal interface whose send function is sendMessage(); pub.iface.iface points at it. */
Iface_t iface;
/** Connected clients, looked up by the handle encoded in their address. */
struct Map_Clients clients;
/* NOTE(review): not referenced anywhere in the visible code. */
struct Allocator_OnFreeJob* closeHandlesOnFree;
/** Child of the event base's allocator; holds this struct and is freed in onClose(). */
struct Allocator* alloc;
/** Caller-supplied allocator; parent of every per-client allocator and carrier of onFree(). */
struct Allocator* userAlloc;
/** Private view of the event base this server runs on. */
struct EventBase_pvt* base;
/** Logger used for warnings and informational messages. */
struct Log* log;
/* NOTE(review): not referenced anywhere in the visible code. */
uint32_t nextHandle;
/* Identity marker member (declared by the Identity macro), set with Identity_set(). */
Identity
};
/**
 * Send function of the server's internal interface: deliver a message to the
 * client identified by the address at the front of the message.
 *
 * @param m the message; its leading address is removed with AddrIface_popAddr().
 * @param iface the `iface` member of a struct PipeServer_pvt.
 * @return the result of forwarding through the client's interface, or an
 *         "UNHANDLED" error when no client is registered under that handle.
 */
static Iface_DEFUN sendMessage(struct Message* m, struct Iface* iface)
{
    struct PipeServer_pvt* server = Identity_containerOf(iface, struct PipeServer_pvt, iface);
    struct Sockaddr* dest = Er_assert(AddrIface_popAddr(m));
    uint32_t clientHandle = Sockaddr_addrHandle(dest);
    int slot = Map_Clients_indexForHandle(clientHandle, &server->clients);

    if (slot >= 0) {
        struct Client* target = server->clients.values[slot];
        return Iface_next(&target->iface, m);
    }

    // No such client in the map (any more).
    Log_warn(server->log, "Attempted to send a message to client [0x%x] which is gone", clientHandle);
    return Error(m, "UNHANDLED");
}
/**
 * Send function of a client's interface: tag a message arriving from the
 * client's pipe with the client's address and pass it on through the server's
 * public interface.
 *
 * @param msg the incoming message; the client's address is pushed onto it.
 * @param iface the `iface` member of a struct Client.
 * @return NULL if the client has been detached from its server, otherwise
 *         the result of forwarding the message.
 */
static Iface_DEFUN incomingFromClient(struct Message* msg, struct Iface* iface)
{
    struct Client* client = Identity_containerOf(iface, struct Client, iface);

    if (client->psp) {
        struct PipeServer_pvt* server = Identity_check(client->psp);
        Er_assert(AddrIface_pushAddr(msg, &client->addr));
        return Iface_next(server->pub.iface.iface, msg);
    }

    // The server has already cut this client loose; drop the message.
    return NULL;
}
/**
 * Close callback for the listening handle: once libuv has finished closing
 * it, release the server's own allocator.
 *
 * @param handle the closed handle; its `data` field points at the
 *               struct PipeServer_pvt that owns it.
 */
static void onClose(uv_handle_t* handle)
{
    struct PipeServer_pvt* server = (struct PipeServer_pvt*) handle->data;
    server = Identity_check(server);
    Allocator_free(server->alloc);
}
/**
 * Accept the pending connection on the server's listening handle.
 *
 * @param psp the pipe server.
 * @param alloc allocator passed to Pipe_serverAccept() for the new pipe.
 * @return whatever Pipe_serverAccept() yielded; a failure is logged as a
 *         warning and the caller checks the result for NULL.
 */
static struct Pipe* getPipe(struct PipeServer_pvt* psp, struct Allocator* alloc)
{
    struct Er_Ret* failure = NULL;
    struct Pipe* accepted = Er_check(&failure, Pipe_serverAccept(
        &psp->server, psp->pub.fullName, &psp->base->pub, psp->log, alloc));

    if (!failure) {
        return accepted;
    }

    Log_warn(psp->log, "failed to connect to client on pipe [%s] [%s]",
             psp->pub.fullName, failure->message);
    return accepted;
}
/**
 * On-free job attached to a client's allocator: drop the client from its
 * server's client map, provided the server still exists.
 *
 * @param job the on-free job; `userData` is the struct Client being freed.
 * @return always 0.
 */
static int removeClientOnFree(struct Allocator_OnFreeJob* job)
{
    struct Client* client = Identity_check((struct Client*)job->userData);

    // Detached clients have no map left to be removed from.
    if (client->psp == NULL) {
        return 0;
    }

    struct PipeServer_pvt* server = Identity_check(client->psp);
    uint32_t clientHandle = Sockaddr_addrHandle(&client->addr);
    int slot = Map_Clients_indexForHandle(clientHandle, &server->clients);
    if (slot >= 0) {
        Map_Clients_remove(slot, &server->clients);
    }
    return 0;
}
/**
 * Close callback installed on each client's pipe: notify the server's
 * disconnection listener (if any) and free the client's allocator.
 *
 * @param p the pipe which closed; `userData` is its struct Client.
 * @param status unused.
 */
static void pipeOnClose(struct Pipe* p, int status)
{
    struct Client* client = Identity_check((struct Client*) p->userData);
    struct PipeServer_pvt* server = Identity_check(client->psp);

    if (server->pub.onDisconnection != NULL) {
        server->pub.onDisconnection(&server->pub, &client->addr);
    }

    // The client itself lives in this allocator, so it must not be used afterwards.
    Allocator_free(client->alloc);
}
/**
 * libuv connection callback for the listening pipe: accept the pending
 * connection, wrap it in a struct Client, register the client in the server's
 * map and notify the connection listener.
 *
 * @param server the listening stream; it is the `server` member embedded in a
 *               struct PipeServer_pvt.
 * @param status 0 on success, a negative libuv error code otherwise.
 */
static void listenCallback(uv_stream_t* server, int status)
{
    uv_pipe_t* pServer = (uv_pipe_t*) server;
    struct PipeServer_pvt* psp = Identity_containerOf(pServer, struct PipeServer_pvt, server);

    // libuv signals failure with any negative error code, not only -1, so
    // every negative status must be treated as "nothing to accept".
    if (status < 0) {
        Log_info(psp->log, "failed to accept pipe connection [%s] [%s]",
                 psp->pub.fullName, uv_strerror(status));
        return;
    }

    // Each client gets its own allocator so it can be freed independently.
    struct Allocator* pipeAlloc = Allocator_child(psp->userAlloc);
    struct Pipe* p = getPipe(psp, pipeAlloc);
    if (p == NULL) {
        Allocator_free(pipeAlloc);
        return;
    }

    struct Client* cli = Allocator_calloc(pipeAlloc, sizeof(struct Client), 1);
    cli->iface.send = incomingFromClient;
    Iface_plumb(&cli->iface, &p->iface);
    cli->alloc = pipeAlloc;
    cli->pipe = p;
    cli->psp = psp;
    p->userData = cli;
    Identity_set(cli);

    // The map assigns the handle; the client's address is derived from it so
    // that sendMessage() can find the client again from an address.
    int idx = Map_Clients_put(&cli, &psp->clients);
    uint32_t handle = psp->clients.handles[idx];
    Sockaddr_addrFromHandle(&cli->addr, handle);

    {
        // Sanity checks: the handle must round-trip through the address and
        // resolve back to the index it was stored at.
        Assert_true(handle == Sockaddr_addrHandle(&cli->addr));
        int idx2 = Map_Clients_indexForHandle(handle, &psp->clients);
        Assert_true(idx2 == idx);
    }

    // Make sure the map entry disappears when the client's allocator is freed.
    Allocator_onFree(pipeAlloc, removeClientOnFree, cli);

    if (psp->pub.onConnection) {
        psp->pub.onConnection(&psp->pub, &cli->addr);
    }

    cli->pipe->onClose = pipeOnClose;
}
static int onFree(struct Allocator_OnFreeJob* job)
{
struct PipeServer_pvt* psp = Identity_check((struct PipeServer_pvt*)job->userData);
for (uint32_t i = 0; i < psp->clients.count; i++) {
// The clients will expire in their own time, just cut them loose so they don't
// try to reference the mothership after it's gone.
psp->clients.values[i]->pipe->onClose = NULL;
psp->clients.values[i]->psp = NULL;
}
if (psp->server.data) {
uv_close((uv_handle_t*) &psp->server, onClose);
EventBase_wakeup(psp->base);
}
return 0;
}
static struct PipeServer_pvt* newPipeAny(struct EventBase* eb,
const char* fullPath,
struct Except* eh,
struct Log* log,
struct Allocator* userAlloc)
{
struct EventBase_pvt* ctx = EventBase_privatize(eb);
struct Allocator* alloc = Allocator_child(ctx->alloc);
struct PipeServer_pvt* psp = Allocator_clone(alloc, (&(struct PipeServer_pvt) {
.pub = {
.iface = { .alloc = alloc },
.fullName = CString_strdup(fullPath, alloc)
},
.iface = { .send = sendMessage },
.clients = { .allocator = alloc },
.base = ctx,
.alloc = alloc,
.userAlloc = userAlloc,
.log = log,
}));
psp->pub.iface.iface = &psp->iface;
int ret = uv_pipe_init(ctx->loop, &psp->server, 0);
if (ret) {
Except_throw(eh, "uv_pipe_init() failed [%s]", uv_strerror(ret));
}
Allocator_onFree(userAlloc, onFree, psp);
psp->server.data = psp;
//out->out = &out->peer;
Identity_set(psp);
return psp;
}
/**
 * Create a pipe server bound to the given path and start listening for
 * client connections.
 *
 * @param fullPath the path to bind the pipe to.
 * @param eb the event base to run on.
 * @param eh exception handler used if initialising, binding or listening fails.
 * @param log logger used by the server.
 * @param userAlloc the caller's allocator; the server's teardown job is
 *                  attached to it.
 * @return the public part of the new server.
 */
struct PipeServer* PipeServer_named(const char* fullPath,
                                    struct EventBase* eb,
                                    struct Except* eh,
                                    struct Log* log,
                                    struct Allocator* userAlloc)
{
    struct PipeServer_pvt* srv = newPipeAny(eb, fullPath, eh, log, userAlloc);

    int bindStatus = uv_pipe_bind(&srv->server, srv->pub.fullName);
    if (bindStatus != 0) {
        Except_throw(eh, "uv_pipe_bind() failed [%s] for pipe [%s]",
                     uv_strerror(bindStatus), srv->pub.fullName);
    }

    // Incoming connections are handed to listenCallback(); backlog is 1.
    int listenStatus = uv_listen((uv_stream_t*) &srv->server, 1, listenCallback);
    if (listenStatus != 0) {
        Except_throw(eh, "uv_listen() failed [%s] for pipe [%s]",
                     uv_strerror(listenStatus), srv->pub.fullName);
    }

    return &srv->pub;
}