rpc.c 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. /*
  2. * Multiplexed Venti client. It would be nice if we
  3. * could turn this into a generic library routine rather
  4. * than keep it Venti specific. A user-level 9P client
  5. * could use something like this too.
  6. *
  7. * (Actually it does - this should be replaced with libmux,
  8. * which should be renamed librpcmux.)
  9. *
  10. * This is a little more complicated than it might be
  11. * because we want it to work well within and without libthread.
  12. *
  13. * The mux code is inspired by tra's, which is inspired by the Plan 9 kernel.
  14. */
  15. #include <u.h>
  16. #include <libc.h>
  17. #include <venti.h>
  18. typedef struct Rwait Rwait;
  19. struct Rwait
  20. {
  21. Rendez r;
  22. Packet *p;
  23. int done;
  24. int sleeping;
  25. };
  26. static int gettag(VtConn*, Rwait*);
  27. static void puttag(VtConn*, Rwait*, int);
  28. static void muxrpc(VtConn*, Packet*);
  29. Packet*
  30. _vtrpc(VtConn *z, Packet *p, VtFcall *tx)
  31. {
  32. int i;
  33. uchar tag, buf[2], *top;
  34. Rwait *r, *rr;
  35. if(z == nil){
  36. werrstr("not connected");
  37. packetfree(p);
  38. return nil;
  39. }
  40. /* must malloc because stack could be private */
  41. r = vtmallocz(sizeof(Rwait));
  42. qlock(&z->lk);
  43. r->r.l = &z->lk;
  44. tag = gettag(z, r);
  45. if(tx){
  46. /* vtfcallrpc can't print packet because it doesn't have tag */
  47. tx->tag = tag;
  48. if(chattyventi)
  49. fprint(2, "%s -> %F\n", argv0, tx);
  50. }
  51. /* slam tag into packet */
  52. top = packetpeek(p, buf, 0, 2);
  53. if(top == nil){
  54. packetfree(p);
  55. return nil;
  56. }
  57. if(top == buf){
  58. werrstr("first two bytes must be in same packet fragment");
  59. packetfree(p);
  60. vtfree(r);
  61. return nil;
  62. }
  63. top[1] = tag;
  64. qunlock(&z->lk);
  65. if(vtsend(z, p) < 0){
  66. vtfree(r);
  67. return nil;
  68. }
  69. qlock(&z->lk);
  70. /* wait for the muxer to give us our packet */
  71. r->sleeping = 1;
  72. z->nsleep++;
  73. while(z->muxer && !r->done)
  74. rsleep(&r->r);
  75. z->nsleep--;
  76. r->sleeping = 0;
  77. /* if not done, there's no muxer: start muxing */
  78. if(!r->done){
  79. if(z->muxer)
  80. abort();
  81. z->muxer = 1;
  82. while(!r->done){
  83. qunlock(&z->lk);
  84. if((p = vtrecv(z)) == nil){
  85. werrstr("unexpected eof on venti connection");
  86. z->muxer = 0;
  87. vtfree(r);
  88. return nil;
  89. }
  90. qlock(&z->lk);
  91. muxrpc(z, p);
  92. }
  93. z->muxer = 0;
  94. /* if there is anyone else sleeping, wake first unfinished to mux */
  95. if(z->nsleep)
  96. for(i=0; i<256; i++){
  97. rr = z->wait[i];
  98. if(rr && rr->sleeping && !rr->done){
  99. rwakeup(&rr->r);
  100. break;
  101. }
  102. }
  103. }
  104. p = r->p;
  105. puttag(z, r, tag);
  106. vtfree(r);
  107. qunlock(&z->lk);
  108. return p;
  109. }
  110. Packet*
  111. vtrpc(VtConn *z, Packet *p)
  112. {
  113. return _vtrpc(z, p, nil);
  114. }
  115. static int
  116. gettag(VtConn *z, Rwait *r)
  117. {
  118. int i;
  119. Again:
  120. while(z->ntag == 256)
  121. rsleep(&z->tagrend);
  122. for(i=0; i<256; i++)
  123. if(z->wait[i] == 0){
  124. z->ntag++;
  125. z->wait[i] = r;
  126. return i;
  127. }
  128. fprint(2, "libventi: ntag botch\n");
  129. goto Again;
  130. }
  131. static void
  132. puttag(VtConn *z, Rwait *r, int tag)
  133. {
  134. assert(z->wait[tag] == r);
  135. z->wait[tag] = nil;
  136. z->ntag--;
  137. rwakeup(&z->tagrend);
  138. }
  139. static void
  140. muxrpc(VtConn *z, Packet *p)
  141. {
  142. uchar tag, buf[2], *top;
  143. Rwait *r;
  144. if((top = packetpeek(p, buf, 0, 2)) == nil){
  145. fprint(2, "libventi: short packet in vtrpc\n");
  146. packetfree(p);
  147. return;
  148. }
  149. tag = top[1];
  150. if((r = z->wait[tag]) == nil){
  151. fprint(2, "libventi: unexpected packet tag %d in vtrpc\n", tag);
  152. abort();
  153. packetfree(p);
  154. return;
  155. }
  156. r->p = p;
  157. r->done = 1;
  158. rwakeup(&r->r);
  159. }