rpc.c 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. /*
  2. * This file is part of the UCB release of Plan 9. It is subject to the license
  3. * terms in the LICENSE file found in the top-level directory of this
  4. * distribution and at http://akaros.cs.berkeley.edu/files/Plan9License. No
  5. * part of the UCB release of Plan 9, including this file, may be copied,
  6. * modified, propagated, or distributed except according to the terms contained
  7. * in the LICENSE file.
  8. */
  9. /*
  10. * Multiplexed Venti client. It would be nice if we
  11. * could turn this into a generic library routine rather
  12. * than keep it Venti specific. A user-level 9P client
  13. * could use something like this too.
  14. *
  15. * (Actually it does - this should be replaced with libmux,
  16. * which should be renamed librpcmux.)
  17. *
  18. * This is a little more complicated than it might be
  19. * because we want it to work well within and without libthread.
  20. *
  21. * The mux code is inspired by tra's, which is inspired by the Plan 9 kernel.
  22. */
  23. #include <u.h>
  24. #include <libc.h>
  25. #include <venti.h>
  26. typedef struct Rwait Rwait;
  27. struct Rwait
  28. {
  29. Rendez r;
  30. Packet *p;
  31. int done;
  32. int sleeping;
  33. };
  34. static int gettag(VtConn*, Rwait*);
  35. static void puttag(VtConn*, Rwait*, int);
  36. static void muxrpc(VtConn*, Packet*);
  37. Packet*
  38. _vtrpc(VtConn *z, Packet *p, VtFcall *tx)
  39. {
  40. int i;
  41. uint8_t tag, buf[2], *top;
  42. Rwait *r, *rr;
  43. if(z == nil){
  44. werrstr("not connected");
  45. packetfree(p);
  46. return nil;
  47. }
  48. /* must malloc because stack could be private */
  49. r = vtmallocz(sizeof(Rwait));
  50. qlock(&z->lk);
  51. r->r.l = &z->lk;
  52. tag = gettag(z, r);
  53. if(tx){
  54. /* vtfcallrpc can't print packet because it doesn't have tag */
  55. tx->tag = tag;
  56. if(chattyventi)
  57. fprint(2, "%s -> %F\n", argv0, tx);
  58. }
  59. /* slam tag into packet */
  60. top = packetpeek(p, buf, 0, 2);
  61. if(top == nil){
  62. packetfree(p);
  63. return nil;
  64. }
  65. if(top == buf){
  66. werrstr("first two bytes must be in same packet fragment");
  67. packetfree(p);
  68. vtfree(r);
  69. return nil;
  70. }
  71. top[1] = tag;
  72. qunlock(&z->lk);
  73. if(vtsend(z, p) < 0){
  74. vtfree(r);
  75. return nil;
  76. }
  77. qlock(&z->lk);
  78. /* wait for the muxer to give us our packet */
  79. r->sleeping = 1;
  80. z->nsleep++;
  81. while(z->muxer && !r->done)
  82. rsleep(&r->r);
  83. z->nsleep--;
  84. r->sleeping = 0;
  85. /* if not done, there's no muxer: start muxing */
  86. if(!r->done){
  87. if(z->muxer)
  88. abort();
  89. z->muxer = 1;
  90. while(!r->done){
  91. qunlock(&z->lk);
  92. if((p = vtrecv(z)) == nil){
  93. werrstr("unexpected eof on venti connection");
  94. z->muxer = 0;
  95. vtfree(r);
  96. return nil;
  97. }
  98. qlock(&z->lk);
  99. muxrpc(z, p);
  100. }
  101. z->muxer = 0;
  102. /* if there is anyone else sleeping, wake first unfinished to mux */
  103. if(z->nsleep)
  104. for(i=0; i<256; i++){
  105. rr = z->wait[i];
  106. if(rr && rr->sleeping && !rr->done){
  107. rwakeup(&rr->r);
  108. break;
  109. }
  110. }
  111. }
  112. p = r->p;
  113. puttag(z, r, tag);
  114. vtfree(r);
  115. qunlock(&z->lk);
  116. return p;
  117. }
  118. Packet*
  119. vtrpc(VtConn *z, Packet *p)
  120. {
  121. return _vtrpc(z, p, nil);
  122. }
  123. static int
  124. gettag(VtConn *z, Rwait *r)
  125. {
  126. int i;
  127. Again:
  128. while(z->ntag == 256)
  129. rsleep(&z->tagrend);
  130. for(i=0; i<256; i++)
  131. if(z->wait[i] == 0){
  132. z->ntag++;
  133. z->wait[i] = r;
  134. return i;
  135. }
  136. fprint(2, "libventi: ntag botch\n");
  137. goto Again;
  138. }
  139. static void
  140. puttag(VtConn *z, Rwait *r, int tag)
  141. {
  142. assert(z->wait[tag] == r);
  143. z->wait[tag] = nil;
  144. z->ntag--;
  145. rwakeup(&z->tagrend);
  146. }
  147. static void
  148. muxrpc(VtConn *z, Packet *p)
  149. {
  150. uint8_t tag, buf[2], *top;
  151. Rwait *r;
  152. if((top = packetpeek(p, buf, 0, 2)) == nil){
  153. fprint(2, "libventi: short packet in vtrpc\n");
  154. packetfree(p);
  155. return;
  156. }
  157. tag = top[1];
  158. if((r = z->wait[tag]) == nil){
  159. fprint(2, "libventi: unexpected packet tag %d in vtrpc\n", tag);
  160. abort();
  161. packetfree(p);
  162. return;
  163. }
  164. r->p = p;
  165. r->done = 1;
  166. rwakeup(&r->r);
  167. }