123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255 |
- /* vim: set expandtab ts=4 sw=4: */
- /*
- * You may redistribute this program and/or modify it under the terms of
- * the GNU General Public License as published by the Free Software Foundation,
- * either version 3 of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <https://www.gnu.org/licenses/>.
- */
- #include "util/events/libuv/UvWrapper.h"
- #include "benc/String.h"
- #include "exception/Er.h"
- #include "memory/Allocator.h"
- #include "util/events/Pipe.h"
- #include "util/events/PipeServer.h"
- #include "util/events/libuv/Pipe_pvt.h"
- #include "util/events/libuv/EventBase_pvt.h"
- #include "util/log/Log.h"
- #include "util/Identity.h"
- #include "util/CString.h"
- #include "wire/Message.h"
- #include "wire/Error.h"
- #include <inttypes.h>
- #include <libgen.h>
- #include <stdio.h>
- #include <unistd.h>
- #include <string.h>
- struct PipeServer_pvt;
- struct Client
- {
- struct Iface iface;
- struct Pipe* pipe;
- struct PipeServer_pvt* psp;
- struct Allocator* alloc;
- struct Sockaddr addr;
- Identity
- };
- #define Map_NAME Clients
- #define Map_ENABLE_HANDLES
- #define Map_VALUE_TYPE struct Client*
- #include "util/Map.h"
- struct PipeServer_pvt
- {
- struct PipeServer pub;
- uv_pipe_t server;
- Iface_t iface;
- struct Map_Clients clients;
- struct Allocator_OnFreeJob* closeHandlesOnFree;
- struct Allocator* alloc;
- struct Allocator* userAlloc;
- struct EventBase_pvt* base;
- struct Log* log;
- uint32_t nextHandle;
- Identity
- };
- static Iface_DEFUN sendMessage(struct Message* m, struct Iface* iface)
- {
- struct PipeServer_pvt* psp = Identity_containerOf(iface, struct PipeServer_pvt, iface);
- struct Sockaddr* addr = Er_assert(AddrIface_popAddr(m));
- uint32_t handle = Sockaddr_addrHandle(addr);
- int idx = Map_Clients_indexForHandle(handle, &psp->clients);
- if (idx < 0) {
- Log_warn(psp->log, "Attempted to send a message to client [0x%x] which is gone", handle);
- return Error(m, "UNHANDLED");
- }
- struct Client* cli = psp->clients.values[idx];
- return Iface_next(&cli->iface, m);
- }
- static Iface_DEFUN incomingFromClient(struct Message* msg, struct Iface* iface)
- {
- struct Client* cli = Identity_containerOf(iface, struct Client, iface);
- if (!cli->psp) { return NULL; }
- struct PipeServer_pvt* psp = Identity_check(cli->psp);
- Er_assert(AddrIface_pushAddr(msg, &cli->addr));
- return Iface_next(psp->pub.iface.iface, msg);
- }
- /** Asynchronous allocator freeing. */
- static void onClose(uv_handle_t* handle)
- {
- struct PipeServer_pvt* psp = Identity_check((struct PipeServer_pvt*)handle->data);
- Allocator_free(psp->alloc);
- }
- static struct Pipe* getPipe(struct PipeServer_pvt* psp, struct Allocator* alloc)
- {
- struct Er_Ret* er = NULL;
- struct Pipe* out = Er_check(&er, Pipe_serverAccept(
- &psp->server, psp->pub.fullName, &psp->base->pub, psp->log, alloc));
- if (er) {
- Log_warn(psp->log, "failed to connect to client on pipe [%s] [%s]",
- psp->pub.fullName, er->message);
- }
- return out;
- }
- static int removeClientOnFree(struct Allocator_OnFreeJob* job)
- {
- struct Client* cli = Identity_check((struct Client*)job->userData);
- if (cli->psp != NULL) {
- struct PipeServer_pvt* psp = Identity_check(cli->psp);
- uint32_t handle = Sockaddr_addrHandle(&cli->addr);
- int idx = Map_Clients_indexForHandle(handle, &psp->clients);
- if (idx > -1) {
- Map_Clients_remove(idx, &psp->clients);
- }
- }
- return 0;
- }
- static void pipeOnClose(struct Pipe* p, int status)
- {
- struct Client* cli = Identity_check((struct Client*) p->userData);
- struct PipeServer_pvt* psp = Identity_check(cli->psp);
- if (psp->pub.onDisconnection) {
- psp->pub.onDisconnection(&psp->pub, &cli->addr);
- }
- Allocator_free(cli->alloc);
- }
- static void listenCallback(uv_stream_t* server, int status)
- {
- uv_pipe_t* pServer = (uv_pipe_t*) server;
- struct PipeServer_pvt* psp = Identity_containerOf(pServer, struct PipeServer_pvt, server);
- if (status == -1) {
- Log_info(psp->log, "failed to accept pipe connection [%s] [%s]",
- psp->pub.fullName, uv_strerror(status));
- return;
- }
- struct Allocator* pipeAlloc = Allocator_child(psp->userAlloc);
- struct Pipe* p = getPipe(psp, pipeAlloc);
- if (p == NULL) {
- Allocator_free(pipeAlloc);
- return;
- }
- struct Client* cli = Allocator_calloc(pipeAlloc, sizeof(struct Client), 1);
- cli->iface.send = incomingFromClient;
- Iface_plumb(&cli->iface, &p->iface);
- cli->alloc = pipeAlloc;
- cli->pipe = p;
- cli->psp = psp;
- p->userData = cli;
- Identity_set(cli);
- int idx = Map_Clients_put(&cli, &psp->clients);
- uint32_t handle = psp->clients.handles[idx];
- Sockaddr_addrFromHandle(&cli->addr, handle);
- {
- // assertions
- Assert_true(handle == Sockaddr_addrHandle(&cli->addr));
- //printf("Handle is %x index is %d\n", handle, idx);
- int idx2 = Map_Clients_indexForHandle(handle, &psp->clients);
- Assert_true(idx2 == idx);
- }
- Allocator_onFree(pipeAlloc, removeClientOnFree, cli);
- if (psp->pub.onConnection) {
- psp->pub.onConnection(&psp->pub, &cli->addr);
- }
- cli->pipe->onClose = pipeOnClose;
- }
- static int onFree(struct Allocator_OnFreeJob* job)
- {
- struct PipeServer_pvt* psp = Identity_check((struct PipeServer_pvt*)job->userData);
- for (uint32_t i = 0; i < psp->clients.count; i++) {
- // The clients will expire in their own time, just cut them loose so they don't
- // try to reference the mothership after it's gone.
- psp->clients.values[i]->pipe->onClose = NULL;
- psp->clients.values[i]->psp = NULL;
- }
- if (psp->server.data) {
- uv_close((uv_handle_t*) &psp->server, onClose);
- EventBase_wakeup(psp->base);
- }
- return 0;
- }
- static struct PipeServer_pvt* newPipeAny(struct EventBase* eb,
- const char* fullPath,
- struct Except* eh,
- struct Log* log,
- struct Allocator* userAlloc)
- {
- struct EventBase_pvt* ctx = EventBase_privatize(eb);
- struct Allocator* alloc = Allocator_child(ctx->alloc);
- struct PipeServer_pvt* psp = Allocator_clone(alloc, (&(struct PipeServer_pvt) {
- .pub = {
- .iface = { .alloc = alloc },
- .fullName = CString_strdup(fullPath, alloc)
- },
- .iface = { .send = sendMessage },
- .clients = { .allocator = alloc },
- .base = ctx,
- .alloc = alloc,
- .userAlloc = userAlloc,
- .log = log,
- }));
- psp->pub.iface.iface = &psp->iface;
- int ret = uv_pipe_init(ctx->loop, &psp->server, 0);
- if (ret) {
- Except_throw(eh, "uv_pipe_init() failed [%s]", uv_strerror(ret));
- }
- Allocator_onFree(userAlloc, onFree, psp);
- psp->server.data = psp;
- //out->out = &out->peer;
- Identity_set(psp);
- return psp;
- }
- struct PipeServer* PipeServer_named(const char* fullPath,
- struct EventBase* eb,
- struct Except* eh,
- struct Log* log,
- struct Allocator* userAlloc)
- {
- struct PipeServer_pvt* out = newPipeAny(eb, fullPath, eh, log, userAlloc);
- int ret = uv_pipe_bind(&out->server, out->pub.fullName);
- if (ret) {
- Except_throw(eh, "uv_pipe_bind() failed [%s] for pipe [%s]",
- uv_strerror(ret), out->pub.fullName);
- }
- ret = uv_listen((uv_stream_t*) &out->server, 1, listenCallback);
- if (ret) {
- Except_throw(eh, "uv_listen() failed [%s] for pipe [%s]",
- uv_strerror(ret), out->pub.fullName);
- }
- return &out->pub;
- }
|