PipeServer.c 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252
  1. /* vim: set expandtab ts=4 sw=4: */
  2. /*
  3. * You may redistribute this program and/or modify it under the terms of
  4. * the GNU General Public License as published by the Free Software Foundation,
  5. * either version 3 of the License, or (at your option) any later version.
  6. *
  7. * This program is distributed in the hope that it will be useful,
  8. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. * GNU General Public License for more details.
  11. *
  12. * You should have received a copy of the GNU General Public License
  13. * along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. */
  15. #include "util/events/libuv/UvWrapper.h"
  16. #include "benc/String.h"
  17. #include "exception/Er.h"
  18. #include "memory/Allocator.h"
  19. #include "util/events/Pipe.h"
  20. #include "util/events/PipeServer.h"
  21. #include "util/events/libuv/Pipe_pvt.h"
  22. #include "util/events/libuv/EventBase_pvt.h"
  23. #include "util/log/Log.h"
  24. #include "util/Identity.h"
  25. #include "util/CString.h"
  26. #include "wire/Message.h"
  27. #include "wire/Error.h"
  28. #include <inttypes.h>
  29. #include <libgen.h>
  30. #include <stdio.h>
  31. #include <unistd.h>
  32. #include <string.h>
  33. struct PipeServer_pvt;
  34. struct Client
  35. {
  36. struct Iface iface;
  37. struct Pipe* pipe;
  38. struct PipeServer_pvt* psp;
  39. struct Allocator* alloc;
  40. struct Sockaddr addr;
  41. Identity
  42. };
  43. #define Map_NAME Clients
  44. #define Map_ENABLE_HANDLES
  45. #define Map_VALUE_TYPE struct Client*
  46. #include "util/Map.h"
  47. struct PipeServer_pvt
  48. {
  49. struct PipeServer pub;
  50. uv_pipe_t server;
  51. struct Map_Clients clients;
  52. struct Allocator_OnFreeJob* closeHandlesOnFree;
  53. struct Allocator* alloc;
  54. struct EventBase_pvt* base;
  55. struct Log* log;
  56. uint32_t nextHandle;
  57. Identity
  58. };
  59. static Iface_DEFUN sendMessage(struct Message* m, struct Iface* iface)
  60. {
  61. struct PipeServer_pvt* psp = Identity_check((struct PipeServer_pvt*) iface);
  62. struct Sockaddr* addr = Er_assert(AddrIface_popAddr(m));
  63. uint32_t handle = Sockaddr_addrHandle(addr);
  64. int idx = Map_Clients_indexForHandle(handle, &psp->clients);
  65. if (idx < 0) {
  66. Log_warn(psp->log, "Attempted to send a message to client [0x%x] which is gone", handle);
  67. return Error(UNHANDLED);
  68. }
  69. struct Client* cli = psp->clients.values[idx];
  70. return Iface_next(&cli->iface, m);
  71. }
  72. static Iface_DEFUN incomingFromClient(struct Message* msg, struct Iface* iface)
  73. {
  74. struct Client* cli = Identity_containerOf(iface, struct Client, iface);
  75. struct PipeServer_pvt* psp = Identity_check(cli->psp);
  76. Er_assert(AddrIface_pushAddr(msg, &cli->addr));
  77. return Iface_next(&psp->pub.iface.iface, msg);
  78. }
  79. /** Asynchronous allocator freeing. */
  80. static void onClose(uv_handle_t* handle)
  81. {
  82. struct PipeServer_pvt* psp = Identity_check((struct PipeServer_pvt*)handle->data);
  83. handle->data = NULL;
  84. if (psp->closeHandlesOnFree && !psp->server.data) {
  85. Allocator_onFreeComplete((struct Allocator_OnFreeJob*) psp->closeHandlesOnFree);
  86. }
  87. }
  88. static struct Pipe* getPipe(struct PipeServer_pvt* psp, struct Allocator* alloc)
  89. {
  90. struct Er_Ret* er = NULL;
  91. struct Pipe* out = Er_check(&er, Pipe_serverAccept(
  92. &psp->server, psp->pub.fullName, &psp->base->pub, psp->log, alloc));
  93. if (er) {
  94. Log_warn(psp->log, "failed to connect to client on pipe [%s] [%s]",
  95. psp->pub.fullName, er->message);
  96. }
  97. return out;
  98. }
  99. static int removeClientOnFree(struct Allocator_OnFreeJob* job)
  100. {
  101. struct Client* cli = Identity_check((struct Client*)job->userData);
  102. struct PipeServer_pvt* psp = Identity_check(cli->psp);
  103. uint32_t handle = Sockaddr_addrHandle(&cli->addr);
  104. int idx = Map_Clients_indexForHandle(handle, &psp->clients);
  105. if (idx > -1) {
  106. Map_Clients_remove(idx, &psp->clients);
  107. }
  108. return 0;
  109. }
  110. static void pipeOnClose(struct Pipe* p, int status)
  111. {
  112. struct Client* cli = Identity_check((struct Client*) p->userData);
  113. struct PipeServer_pvt* psp = Identity_check(cli->psp);
  114. if (psp->pub.onDisconnection) {
  115. psp->pub.onDisconnection(&psp->pub, &cli->addr);
  116. }
  117. Allocator_free(cli->alloc);
  118. }
  119. static void listenCallback(uv_stream_t* server, int status)
  120. {
  121. uv_pipe_t* pServer = (uv_pipe_t*) server;
  122. struct PipeServer_pvt* psp = Identity_containerOf(pServer, struct PipeServer_pvt, server);
  123. if (status == -1) {
  124. Log_info(psp->log, "failed to accept pipe connection [%s] [%s]",
  125. psp->pub.fullName, uv_strerror(status));
  126. return;
  127. }
  128. struct Allocator* pipeAlloc = Allocator_child(psp->alloc);
  129. struct Pipe* p = getPipe(psp, pipeAlloc);
  130. if (p == NULL) {
  131. Allocator_free(pipeAlloc);
  132. return;
  133. }
  134. struct Client* cli = Allocator_calloc(pipeAlloc, sizeof(struct Client), 1);
  135. cli->iface.send = incomingFromClient;
  136. Iface_plumb(&cli->iface, &p->iface);
  137. cli->alloc = pipeAlloc;
  138. cli->pipe = p;
  139. cli->psp = psp;
  140. p->userData = cli;
  141. Identity_set(cli);
  142. int idx = Map_Clients_put(&cli, &psp->clients);
  143. uint32_t handle = psp->clients.handles[idx];
  144. Sockaddr_addrFromHandle(&cli->addr, handle);
  145. {
  146. // assertions
  147. Assert_true(handle == Sockaddr_addrHandle(&cli->addr));
  148. //printf("Handle is %x index is %d\n", handle, idx);
  149. int idx2 = Map_Clients_indexForHandle(handle, &psp->clients);
  150. Assert_true(idx2 == idx);
  151. }
  152. Allocator_onFree(pipeAlloc, removeClientOnFree, cli);
  153. if (psp->pub.onConnection) {
  154. psp->pub.onConnection(&psp->pub, &cli->addr);
  155. }
  156. cli->pipe->onClose = pipeOnClose;
  157. }
  158. static int closeHandlesOnFree(struct Allocator_OnFreeJob* job)
  159. {
  160. struct PipeServer_pvt* psp = Identity_check((struct PipeServer_pvt*)job->userData);
  161. psp->closeHandlesOnFree = job;
  162. if (psp->server.data) {
  163. uv_close((uv_handle_t*) &psp->server, onClose);
  164. return Allocator_ONFREE_ASYNC;
  165. }
  166. return 0;
  167. }
  168. static struct PipeServer_pvt* newPipeAny(struct EventBase* eb,
  169. const char* fullPath,
  170. struct Except* eh,
  171. struct Log* log,
  172. struct Allocator* userAlloc)
  173. {
  174. struct EventBase_pvt* ctx = EventBase_privatize(eb);
  175. struct Allocator* alloc = Allocator_child(userAlloc);
  176. struct PipeServer_pvt* psp = Allocator_clone(alloc, (&(struct PipeServer_pvt) {
  177. .pub = {
  178. .iface = {
  179. .iface = {
  180. .send = sendMessage
  181. },
  182. .alloc = alloc,
  183. },
  184. .fullName = CString_strdup(fullPath, alloc),
  185. },
  186. .clients = {
  187. .allocator = alloc
  188. },
  189. .base = ctx,
  190. .alloc = alloc,
  191. .log = log,
  192. }));
  193. int ret = uv_pipe_init(ctx->loop, &psp->server, 0);
  194. if (ret) {
  195. Except_throw(eh, "uv_pipe_init() failed [%s]", uv_strerror(ret));
  196. }
  197. Allocator_onFree(alloc, closeHandlesOnFree, psp);
  198. psp->server.data = psp;
  199. //out->out = &out->peer;
  200. Identity_set(psp);
  201. return psp;
  202. }
  203. struct PipeServer* PipeServer_named(const char* fullPath,
  204. struct EventBase* eb,
  205. struct Except* eh,
  206. struct Log* log,
  207. struct Allocator* userAlloc)
  208. {
  209. struct PipeServer_pvt* out = newPipeAny(eb, fullPath, eh, log, userAlloc);
  210. int ret = uv_pipe_bind(&out->server, out->pub.fullName);
  211. if (ret) {
  212. Except_throw(eh, "uv_pipe_bind() failed [%s] for pipe [%s]",
  213. uv_strerror(ret), out->pub.fullName);
  214. }
  215. ret = uv_listen((uv_stream_t*) &out->server, 1, listenCallback);
  216. if (ret) {
  217. Except_throw(eh, "uv_listen() failed [%s] for pipe [%s]",
  218. uv_strerror(ret), out->pub.fullName);
  219. }
  220. return &out->pub;
  221. }