udp.c 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. #include <u.h>
  2. #include <libc.h>
  3. #include <ip.h>
  4. #include <thread.h>
  5. #include <sunrpc.h>
  6. typedef struct SunMsgUdp SunMsgUdp;
  7. struct SunMsgUdp
  8. {
  9. SunMsg msg;
  10. Udphdr udp;
  11. };
  12. typedef struct Arg Arg;
  13. struct Arg
  14. {
  15. SunSrv *srv;
  16. Channel *creply;
  17. Channel *csync;
  18. int fd;
  19. };
  20. enum
  21. {
  22. UdpMaxRead = 65536+Udphdrsize
  23. };
  24. static void
  25. sunUdpRead(void *v)
  26. {
  27. int n;
  28. uchar *buf;
  29. Arg arg = *(Arg*)v;
  30. SunMsgUdp *msg;
  31. sendp(arg.csync, 0);
  32. buf = emalloc(UdpMaxRead);
  33. while((n = read(arg.fd, buf, UdpMaxRead)) > 0){
  34. if(arg.srv->chatty)
  35. fprint(2, "udp got %d (%d)\n", n, Udphdrsize);
  36. msg = emalloc(sizeof(SunMsgUdp));
  37. memmove(&msg->udp, buf, Udphdrsize);
  38. msg->msg.data = emalloc(n);
  39. msg->msg.count = n-Udphdrsize;
  40. memmove(msg->msg.data, buf+Udphdrsize, n-Udphdrsize);
  41. memmove(&msg->udp, buf, Udphdrsize);
  42. msg->msg.creply = arg.creply;
  43. if(arg.srv->chatty)
  44. fprint(2, "message %p count %d\n", msg, msg->msg.count);
  45. sendp(arg.srv->crequest, msg);
  46. }
  47. }
  48. static void
  49. sunUdpWrite(void *v)
  50. {
  51. uchar *buf;
  52. Arg arg = *(Arg*)v;
  53. SunMsgUdp *msg;
  54. sendp(arg.csync, 0);
  55. buf = emalloc(UdpMaxRead);
  56. while((msg = recvp(arg.creply)) != nil){
  57. memmove(buf+Udphdrsize, msg->msg.data, msg->msg.count);
  58. memmove(buf, &msg->udp, Udphdrsize);
  59. msg->msg.count += Udphdrsize;
  60. if(write(arg.fd, buf, msg->msg.count) != msg->msg.count)
  61. fprint(2, "udpWrite: %r\n");
  62. free(msg->msg.data);
  63. free(msg);
  64. }
  65. }
  66. int
  67. sunSrvUdp(SunSrv *srv, char *address)
  68. {
  69. int acfd, fd;
  70. char adir[40], data[60];
  71. Arg *arg;
  72. acfd = announce(address, adir);
  73. if(acfd < 0)
  74. return -1;
  75. if(write(acfd, "headers", 7) < 0){
  76. werrstr("setting headers: %r");
  77. close(acfd);
  78. return -1;
  79. }
  80. write(acfd, "oldheaders", 10);
  81. snprint(data, sizeof data, "%s/data", adir);
  82. if((fd = open(data, ORDWR)) < 0){
  83. werrstr("open %s: %r", data);
  84. close(acfd);
  85. return -1;
  86. }
  87. close(acfd);
  88. arg = emalloc(sizeof(Arg));
  89. arg->fd = fd;
  90. arg->srv = srv;
  91. arg->creply = chancreate(sizeof(SunMsg*), 10);
  92. arg->csync = chancreate(sizeof(void*), 10);
  93. proccreate(sunUdpRead, arg, SunStackSize);
  94. proccreate(sunUdpWrite, arg, SunStackSize);
  95. recvp(arg->csync);
  96. recvp(arg->csync);
  97. chanfree(arg->csync);
  98. free(arg);
  99. return 0;
  100. }