send.c 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. /*
  2. * This file is part of the UCB release of Plan 9. It is subject to the license
  3. * terms in the LICENSE file found in the top-level directory of this
  4. * distribution and at http://akaros.cs.berkeley.edu/files/Plan9License. No
  5. * part of the UCB release of Plan 9, including this file, may be copied,
  6. * modified, propagated, or distributed except according to the terms contained
  7. * in the LICENSE file.
  8. */
  9. #include <u.h>
  10. #include <libc.h>
  11. #include <venti.h>
  12. #include "queue.h"
  13. int32_t ventisendbytes, ventisendpackets;
  14. int32_t ventirecvbytes, ventirecvpackets;
  15. static int
  16. _vtsend(VtConn *z, Packet *p)
  17. {
  18. IOchunk ioc;
  19. int n, tot;
  20. uint8_t buf[2];
  21. if(z->state != VtStateConnected) {
  22. werrstr("session not connected");
  23. return -1;
  24. }
  25. /* add framing */
  26. n = packetsize(p);
  27. if(n >= (1<<16)) {
  28. werrstr("packet too large");
  29. packetfree(p);
  30. return -1;
  31. }
  32. buf[0] = n>>8;
  33. buf[1] = n;
  34. packetprefix(p, buf, 2);
  35. ventisendbytes += n+2;
  36. ventisendpackets++;
  37. tot = 0;
  38. for(;;){
  39. n = packetfragments(p, &ioc, 1, 0);
  40. if(n == 0)
  41. break;
  42. if(write(z->outfd, ioc.addr, ioc.len) < ioc.len){
  43. vtlog(VtServerLog, "<font size=-1>%T %s:</font> sending packet %p: %r<br>\n", z->addr, p);
  44. packetfree(p);
  45. return -1;
  46. }
  47. packetconsume(p, nil, ioc.len);
  48. tot += ioc.len;
  49. }
  50. vtlog(VtServerLog, "<font size=-1>%T %s:</font> sent packet %p (%d bytes)<br>\n", z->addr, p, tot);
  51. packetfree(p);
  52. return 1;
  53. }
  54. static int
  55. interrupted(void)
  56. {
  57. char e[ERRMAX];
  58. rerrstr(e, sizeof e);
  59. return strstr(e, "interrupted") != nil;
  60. }
  61. static Packet*
  62. _vtrecv(VtConn *z)
  63. {
  64. uint8_t buf[10], *b;
  65. int n;
  66. Packet *p;
  67. int size, len;
  68. if(z->state != VtStateConnected) {
  69. werrstr("session not connected");
  70. return nil;
  71. }
  72. p = z->part;
  73. /* get enough for head size */
  74. size = packetsize(p);
  75. while(size < 2) {
  76. b = packettrailer(p, 2);
  77. assert(b != nil);
  78. if(0) fprint(2, "%d read hdr\n", getpid());
  79. n = read(z->infd, b, 2);
  80. if(0) fprint(2, "%d got %d (%r)\n", getpid(), n);
  81. if(n==0 || (n<0 && !interrupted()))
  82. goto Err;
  83. size += n;
  84. packettrim(p, 0, size);
  85. }
  86. if(packetconsume(p, buf, 2) < 0)
  87. goto Err;
  88. len = (buf[0] << 8) | buf[1];
  89. size -= 2;
  90. while(size < len) {
  91. n = len - size;
  92. if(n > MaxFragSize)
  93. n = MaxFragSize;
  94. b = packettrailer(p, n);
  95. if(0) fprint(2, "%d read body %d\n", getpid(), n);
  96. n = read(z->infd, b, n);
  97. if(0) fprint(2, "%d got %d (%r)\n", getpid(), n);
  98. if(n > 0)
  99. size += n;
  100. packettrim(p, 0, size);
  101. if(n==0 || (n<0 && !interrupted()))
  102. goto Err;
  103. }
  104. ventirecvbytes += len;
  105. ventirecvpackets++;
  106. p = packetsplit(p, len);
  107. vtlog(VtServerLog, "<font size=-1>%T %s:</font> read packet %p len %d<br>\n", z->addr, p, len);
  108. return p;
  109. Err:
  110. vtlog(VtServerLog, "<font size=-1>%T %s:</font> error reading packet: %r<br>\n", z->addr);
  111. return nil;
  112. }
/*
 * If you fork off two procs running vtrecvproc and vtsendproc,
 * then vtrecv/vtsend (and thus vtrpc) will never block except on
 * rendezvous, which is nice when it's running in one thread of many.
 */
  118. void
  119. vtrecvproc(void *v)
  120. {
  121. Packet *p;
  122. VtConn *z;
  123. Queue *q;
  124. z = v;
  125. q = _vtqalloc();
  126. qlock(&z->lk);
  127. z->readq = q;
  128. qlock(&z->inlk);
  129. rwakeup(&z->rpcfork);
  130. qunlock(&z->lk);
  131. while((p = _vtrecv(z)) != nil)
  132. if(_vtqsend(q, p) < 0){
  133. packetfree(p);
  134. break;
  135. }
  136. qunlock(&z->inlk);
  137. qlock(&z->lk);
  138. _vtqhangup(q);
  139. while((p = _vtnbqrecv(q)) != nil)
  140. packetfree(p);
  141. _vtqdecref(q);
  142. z->readq = nil;
  143. rwakeup(&z->rpcfork);
  144. qunlock(&z->lk);
  145. vthangup(z);
  146. }
  147. void
  148. vtsendproc(void *v)
  149. {
  150. Queue *q;
  151. Packet *p;
  152. VtConn *z;
  153. z = v;
  154. q = _vtqalloc();
  155. qlock(&z->lk);
  156. z->writeq = q;
  157. qlock(&z->outlk);
  158. rwakeup(&z->rpcfork);
  159. qunlock(&z->lk);
  160. while((p = _vtqrecv(q)) != nil)
  161. if(_vtsend(z, p) < 0)
  162. break;
  163. qunlock(&z->outlk);
  164. qlock(&z->lk);
  165. _vtqhangup(q);
  166. while((p = _vtnbqrecv(q)) != nil)
  167. packetfree(p);
  168. _vtqdecref(q);
  169. z->writeq = nil;
  170. rwakeup(&z->rpcfork);
  171. qunlock(&z->lk);
  172. return;
  173. }
  174. Packet*
  175. vtrecv(VtConn *z)
  176. {
  177. Packet *p;
  178. Queue *q;
  179. qlock(&z->lk);
  180. if(z->state != VtStateConnected){
  181. werrstr("not connected");
  182. qunlock(&z->lk);
  183. return nil;
  184. }
  185. if(z->readq){
  186. q = _vtqincref(z->readq);
  187. qunlock(&z->lk);
  188. p = _vtqrecv(q);
  189. _vtqdecref(q);
  190. return p;
  191. }
  192. qlock(&z->inlk);
  193. qunlock(&z->lk);
  194. p = _vtrecv(z);
  195. qunlock(&z->inlk);
  196. if(!p)
  197. vthangup(z);
  198. return p;
  199. }
  200. int
  201. vtsend(VtConn *z, Packet *p)
  202. {
  203. Queue *q;
  204. qlock(&z->lk);
  205. if(z->state != VtStateConnected){
  206. packetfree(p);
  207. werrstr("not connected");
  208. qunlock(&z->lk);
  209. return -1;
  210. }
  211. if(z->writeq){
  212. q = _vtqincref(z->writeq);
  213. qunlock(&z->lk);
  214. if(_vtqsend(q, p) < 0){
  215. _vtqdecref(q);
  216. packetfree(p);
  217. return -1;
  218. }
  219. _vtqdecref(q);
  220. return 0;
  221. }
  222. qlock(&z->outlk);
  223. qunlock(&z->lk);
  224. if(_vtsend(z, p) < 0){
  225. qunlock(&z->outlk);
  226. vthangup(z);
  227. return -1;
  228. }
  229. qunlock(&z->outlk);
  230. return 0;
  231. }