/* PipeServer.c */
/* vim: set expandtab ts=4 sw=4: */
/*
 * You may redistribute this program and/or modify it under the terms of
 * the GNU General Public License as published by the Free Software Foundation,
 * either version 3 of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */
  15. #include "util/events/libuv/UvWrapper.h"
  16. #include "benc/String.h"
  17. #include "exception/Er.h"
  18. #include "memory/Allocator.h"
  19. #include "util/events/Pipe.h"
  20. #include "util/events/PipeServer.h"
  21. #include "util/events/libuv/Pipe_pvt.h"
  22. #include "util/events/libuv/EventBase_pvt.h"
  23. #include "util/log/Log.h"
  24. #include "util/Identity.h"
  25. #include "util/CString.h"
  26. #include "wire/Message.h"
  27. #include "wire/Error.h"
  28. #include <inttypes.h>
  29. #include <libgen.h>
  30. #include <stdio.h>
  31. #include <unistd.h>
  32. #include <string.h>
  33. struct PipeServer_pvt;
  34. struct Client
  35. {
  36. struct Iface iface;
  37. struct Pipe* pipe;
  38. struct PipeServer_pvt* psp;
  39. struct Allocator* alloc;
  40. struct Sockaddr addr;
  41. Identity
  42. };
  43. #define Map_NAME Clients
  44. #define Map_ENABLE_HANDLES
  45. #define Map_VALUE_TYPE struct Client*
  46. #include "util/Map.h"
  47. struct PipeServer_pvt
  48. {
  49. struct PipeServer pub;
  50. uv_pipe_t server;
  51. Iface_t iface;
  52. struct Map_Clients clients;
  53. struct Allocator_OnFreeJob* closeHandlesOnFree;
  54. struct Allocator* alloc;
  55. struct Allocator* userAlloc;
  56. struct EventBase_pvt* base;
  57. struct Log* log;
  58. uint32_t nextHandle;
  59. Identity
  60. };
  61. static Iface_DEFUN sendMessage(struct Message* m, struct Iface* iface)
  62. {
  63. struct PipeServer_pvt* psp = Identity_containerOf(iface, struct PipeServer_pvt, iface);
  64. struct Sockaddr* addr = Er_assert(AddrIface_popAddr(m));
  65. uint32_t handle = Sockaddr_addrHandle(addr);
  66. int idx = Map_Clients_indexForHandle(handle, &psp->clients);
  67. if (idx < 0) {
  68. Log_warn(psp->log, "Attempted to send a message to client [0x%x] which is gone", handle);
  69. return Error(m, "UNHANDLED");
  70. }
  71. struct Client* cli = psp->clients.values[idx];
  72. return Iface_next(&cli->iface, m);
  73. }
  74. static Iface_DEFUN incomingFromClient(struct Message* msg, struct Iface* iface)
  75. {
  76. struct Client* cli = Identity_containerOf(iface, struct Client, iface);
  77. if (!cli->psp) { return NULL; }
  78. struct PipeServer_pvt* psp = Identity_check(cli->psp);
  79. Er_assert(AddrIface_pushAddr(msg, &cli->addr));
  80. return Iface_next(psp->pub.iface.iface, msg);
  81. }
  82. /** Asynchronous allocator freeing. */
  83. static void onClose(uv_handle_t* handle)
  84. {
  85. struct PipeServer_pvt* psp = Identity_check((struct PipeServer_pvt*)handle->data);
  86. Allocator_free(psp->alloc);
  87. }
  88. static struct Pipe* getPipe(struct PipeServer_pvt* psp, struct Allocator* alloc)
  89. {
  90. struct Er_Ret* er = NULL;
  91. struct Pipe* out = Er_check(&er, Pipe_serverAccept(
  92. &psp->server, psp->pub.fullName, &psp->base->pub, psp->log, alloc));
  93. if (er) {
  94. Log_warn(psp->log, "failed to connect to client on pipe [%s] [%s]",
  95. psp->pub.fullName, er->message);
  96. }
  97. return out;
  98. }
  99. static int removeClientOnFree(struct Allocator_OnFreeJob* job)
  100. {
  101. struct Client* cli = Identity_check((struct Client*)job->userData);
  102. if (cli->psp != NULL) {
  103. struct PipeServer_pvt* psp = Identity_check(cli->psp);
  104. uint32_t handle = Sockaddr_addrHandle(&cli->addr);
  105. int idx = Map_Clients_indexForHandle(handle, &psp->clients);
  106. if (idx > -1) {
  107. Map_Clients_remove(idx, &psp->clients);
  108. }
  109. }
  110. return 0;
  111. }
  112. static void pipeOnClose(struct Pipe* p, int status)
  113. {
  114. struct Client* cli = Identity_check((struct Client*) p->userData);
  115. struct PipeServer_pvt* psp = Identity_check(cli->psp);
  116. if (psp->pub.onDisconnection) {
  117. psp->pub.onDisconnection(&psp->pub, &cli->addr);
  118. }
  119. Allocator_free(cli->alloc);
  120. }
  121. static void listenCallback(uv_stream_t* server, int status)
  122. {
  123. uv_pipe_t* pServer = (uv_pipe_t*) server;
  124. struct PipeServer_pvt* psp = Identity_containerOf(pServer, struct PipeServer_pvt, server);
  125. if (status == -1) {
  126. Log_info(psp->log, "failed to accept pipe connection [%s] [%s]",
  127. psp->pub.fullName, uv_strerror(status));
  128. return;
  129. }
  130. struct Allocator* pipeAlloc = Allocator_child(psp->userAlloc);
  131. struct Pipe* p = getPipe(psp, pipeAlloc);
  132. if (p == NULL) {
  133. Allocator_free(pipeAlloc);
  134. return;
  135. }
  136. struct Client* cli = Allocator_calloc(pipeAlloc, sizeof(struct Client), 1);
  137. cli->iface.send = incomingFromClient;
  138. Iface_plumb(&cli->iface, &p->iface);
  139. cli->alloc = pipeAlloc;
  140. cli->pipe = p;
  141. cli->psp = psp;
  142. p->userData = cli;
  143. Identity_set(cli);
  144. int idx = Map_Clients_put(&cli, &psp->clients);
  145. uint32_t handle = psp->clients.handles[idx];
  146. Sockaddr_addrFromHandle(&cli->addr, handle);
  147. {
  148. // assertions
  149. Assert_true(handle == Sockaddr_addrHandle(&cli->addr));
  150. //printf("Handle is %x index is %d\n", handle, idx);
  151. int idx2 = Map_Clients_indexForHandle(handle, &psp->clients);
  152. Assert_true(idx2 == idx);
  153. }
  154. Allocator_onFree(pipeAlloc, removeClientOnFree, cli);
  155. if (psp->pub.onConnection) {
  156. psp->pub.onConnection(&psp->pub, &cli->addr);
  157. }
  158. cli->pipe->onClose = pipeOnClose;
  159. }
  160. static int onFree(struct Allocator_OnFreeJob* job)
  161. {
  162. struct PipeServer_pvt* psp = Identity_check((struct PipeServer_pvt*)job->userData);
  163. for (uint32_t i = 0; i < psp->clients.count; i++) {
  164. // The clients will expire in their own time, just cut them loose so they don't
  165. // try to reference the mothership after it's gone.
  166. psp->clients.values[i]->pipe->onClose = NULL;
  167. psp->clients.values[i]->psp = NULL;
  168. }
  169. if (psp->server.data) {
  170. uv_close((uv_handle_t*) &psp->server, onClose);
  171. EventBase_wakeup(psp->base);
  172. }
  173. return 0;
  174. }
  175. static struct PipeServer_pvt* newPipeAny(struct EventBase* eb,
  176. const char* fullPath,
  177. struct Except* eh,
  178. struct Log* log,
  179. struct Allocator* userAlloc)
  180. {
  181. struct EventBase_pvt* ctx = EventBase_privatize(eb);
  182. struct Allocator* alloc = Allocator_child(ctx->alloc);
  183. struct PipeServer_pvt* psp = Allocator_clone(alloc, (&(struct PipeServer_pvt) {
  184. .pub = {
  185. .iface = { .alloc = alloc },
  186. .fullName = CString_strdup(fullPath, alloc)
  187. },
  188. .iface = { .send = sendMessage },
  189. .clients = { .allocator = alloc },
  190. .base = ctx,
  191. .alloc = alloc,
  192. .userAlloc = userAlloc,
  193. .log = log,
  194. }));
  195. psp->pub.iface.iface = &psp->iface;
  196. int ret = uv_pipe_init(ctx->loop, &psp->server, 0);
  197. if (ret) {
  198. Except_throw(eh, "uv_pipe_init() failed [%s]", uv_strerror(ret));
  199. }
  200. Allocator_onFree(userAlloc, onFree, psp);
  201. psp->server.data = psp;
  202. //out->out = &out->peer;
  203. Identity_set(psp);
  204. return psp;
  205. }
  206. struct PipeServer* PipeServer_named(const char* fullPath,
  207. struct EventBase* eb,
  208. struct Except* eh,
  209. struct Log* log,
  210. struct Allocator* userAlloc)
  211. {
  212. struct PipeServer_pvt* out = newPipeAny(eb, fullPath, eh, log, userAlloc);
  213. int ret = uv_pipe_bind(&out->server, out->pub.fullName);
  214. if (ret) {
  215. Except_throw(eh, "uv_pipe_bind() failed [%s] for pipe [%s]",
  216. uv_strerror(ret), out->pub.fullName);
  217. }
  218. ret = uv_listen((uv_stream_t*) &out->server, 1, listenCallback);
  219. if (ret) {
  220. Except_throw(eh, "uv_listen() failed [%s] for pipe [%s]",
  221. uv_strerror(ret), out->pub.fullName);
  222. }
  223. return &out->pub;
  224. }