udp.c 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. /*
  2. * This file is part of the UCB release of Plan 9. It is subject to the license
  3. * terms in the LICENSE file found in the top-level directory of this
  4. * distribution and at http://akaros.cs.berkeley.edu/files/Plan9License. No
  5. * part of the UCB release of Plan 9, including this file, may be copied,
  6. * modified, propagated, or distributed except according to the terms contained
  7. * in the LICENSE file.
  8. */
  9. #include <u.h>
  10. #include <libc.h>
  11. #include <ip.h>
  12. #include <thread.h>
  13. #include <sunrpc.h>
  14. typedef struct SunMsgUdp SunMsgUdp;
  15. struct SunMsgUdp
  16. {
  17. SunMsg msg;
  18. Udphdr udp;
  19. };
  20. typedef struct Arg Arg;
  21. struct Arg
  22. {
  23. SunSrv *srv;
  24. Channel *creply;
  25. Channel *csync;
  26. int fd;
  27. };
  28. enum
  29. {
  30. UdpMaxRead = 65536+Udphdrsize
  31. };
  32. static void
  33. sunUdpRead(void *v)
  34. {
  35. int n;
  36. uint8_t *buf;
  37. Arg arg = *(Arg*)v;
  38. SunMsgUdp *msg;
  39. sendp(arg.csync, 0);
  40. buf = emalloc(UdpMaxRead);
  41. while((n = read(arg.fd, buf, UdpMaxRead)) > 0){
  42. if(arg.srv->chatty)
  43. fprint(2, "udp got %d (%d)\n", n, Udphdrsize);
  44. msg = emalloc(sizeof(SunMsgUdp));
  45. memmove(&msg->udp, buf, Udphdrsize);
  46. msg->msg.data = emalloc(n);
  47. msg->msg.count = n-Udphdrsize;
  48. memmove(msg->msg.data, buf+Udphdrsize, n-Udphdrsize);
  49. memmove(&msg->udp, buf, Udphdrsize);
  50. msg->msg.creply = arg.creply;
  51. if(arg.srv->chatty)
  52. fprint(2, "message %p count %d\n", msg, msg->msg.count);
  53. sendp(arg.srv->crequest, msg);
  54. }
  55. }
  56. static void
  57. sunUdpWrite(void *v)
  58. {
  59. uint8_t *buf;
  60. Arg arg = *(Arg*)v;
  61. SunMsgUdp *msg;
  62. sendp(arg.csync, 0);
  63. buf = emalloc(UdpMaxRead);
  64. while((msg = recvp(arg.creply)) != nil){
  65. memmove(buf+Udphdrsize, msg->msg.data, msg->msg.count);
  66. memmove(buf, &msg->udp, Udphdrsize);
  67. msg->msg.count += Udphdrsize;
  68. if(write(arg.fd, buf, msg->msg.count) != msg->msg.count)
  69. fprint(2, "udpWrite: %r\n");
  70. free(msg->msg.data);
  71. free(msg);
  72. }
  73. }
  74. int
  75. sunSrvUdp(SunSrv *srv, char *address)
  76. {
  77. int acfd, fd;
  78. char adir[40], data[60];
  79. Arg *arg;
  80. acfd = announce(address, adir);
  81. if(acfd < 0)
  82. return -1;
  83. if(write(acfd, "headers", 7) < 0){
  84. werrstr("setting headers: %r");
  85. close(acfd);
  86. return -1;
  87. }
  88. snprint(data, sizeof data, "%s/data", adir);
  89. if((fd = open(data, ORDWR)) < 0){
  90. werrstr("open %s: %r", data);
  91. close(acfd);
  92. return -1;
  93. }
  94. close(acfd);
  95. arg = emalloc(sizeof(Arg));
  96. arg->fd = fd;
  97. arg->srv = srv;
  98. arg->creply = chancreate(sizeof(SunMsg*), 10);
  99. arg->csync = chancreate(sizeof(void*), 10);
  100. proccreate(sunUdpRead, arg, SunStackSize);
  101. proccreate(sunUdpWrite, arg, SunStackSize);
  102. recvp(arg->csync);
  103. recvp(arg->csync);
  104. chanfree(arg->csync);
  105. free(arg);
  106. return 0;
  107. }