rpc.c 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. /*
  2. * Multiplexed Venti client. It would be nice if we
  3. * could turn this into a generic library routine rather
  4. * than keep it Venti specific. A user-level 9P client
  5. * could use something like this too.
  6. *
  7. * (Actually it does - this should be replaced with libmux,
  8. * which should be renamed librpcmux.)
  9. *
  10. * This is a little more complicated than it might be
  11. * because we want it to work well within and without libthread.
  12. *
  13. * The mux code is inspired by tra's, which is inspired by the Plan 9 kernel.
  14. */
  15. #include <u.h>
  16. #include <libc.h>
  17. #include <venti.h>
  18. typedef struct Rwait Rwait;
  19. struct Rwait
  20. {
  21. Rendez r;
  22. Packet *p;
  23. int done;
  24. int sleeping;
  25. };
  26. static int gettag(VtConn*, Rwait*);
  27. static void puttag(VtConn*, Rwait*, int);
  28. static void muxrpc(VtConn*, Packet*);
  29. Packet*
  30. _vtrpc(VtConn *z, Packet *p, VtFcall *tx)
  31. {
  32. int i;
  33. uchar tag, buf[2], *top;
  34. Rwait *r, *rr;
  35. /* must malloc because stack could be private */
  36. r = vtmallocz(sizeof(Rwait));
  37. qlock(&z->lk);
  38. r->r.l = &z->lk;
  39. tag = gettag(z, r);
  40. if(tx){
  41. /* vtfcallrpc can't print packet because it doesn't have tag */
  42. tx->tag = tag;
  43. if(chattyventi)
  44. fprint(2, "%s -> %F\n", argv0, tx);
  45. }
  46. /* slam tag into packet */
  47. top = packetpeek(p, buf, 0, 2);
  48. if(top == nil){
  49. packetfree(p);
  50. return nil;
  51. }
  52. if(top == buf){
  53. werrstr("first two bytes must be in same packet fragment");
  54. packetfree(p);
  55. vtfree(r);
  56. return nil;
  57. }
  58. top[1] = tag;
  59. qunlock(&z->lk);
  60. if(vtsend(z, p) < 0){
  61. vtfree(r);
  62. return nil;
  63. }
  64. qlock(&z->lk);
  65. /* wait for the muxer to give us our packet */
  66. r->sleeping = 1;
  67. z->nsleep++;
  68. while(z->muxer && !r->done)
  69. rsleep(&r->r);
  70. z->nsleep--;
  71. r->sleeping = 0;
  72. /* if not done, there's no muxer: start muxing */
  73. if(!r->done){
  74. if(z->muxer)
  75. abort();
  76. z->muxer = 1;
  77. while(!r->done){
  78. qunlock(&z->lk);
  79. if((p = vtrecv(z)) == nil){
  80. werrstr("unexpected eof on venti connection");
  81. z->muxer = 0;
  82. vtfree(r);
  83. return nil;
  84. }
  85. qlock(&z->lk);
  86. muxrpc(z, p);
  87. }
  88. z->muxer = 0;
  89. /* if there is anyone else sleeping, wake first unfinished to mux */
  90. if(z->nsleep)
  91. for(i=0; i<256; i++){
  92. rr = z->wait[i];
  93. if(rr && rr->sleeping && !rr->done){
  94. rwakeup(&rr->r);
  95. break;
  96. }
  97. }
  98. }
  99. p = r->p;
  100. puttag(z, r, tag);
  101. vtfree(r);
  102. qunlock(&z->lk);
  103. return p;
  104. }
  105. Packet*
  106. vtrpc(VtConn *z, Packet *p)
  107. {
  108. return _vtrpc(z, p, nil);
  109. }
  110. static int
  111. gettag(VtConn *z, Rwait *r)
  112. {
  113. int i;
  114. Again:
  115. while(z->ntag == 256)
  116. rsleep(&z->tagrend);
  117. for(i=0; i<256; i++)
  118. if(z->wait[i] == 0){
  119. z->ntag++;
  120. z->wait[i] = r;
  121. return i;
  122. }
  123. fprint(2, "libventi: ntag botch\n");
  124. goto Again;
  125. }
  126. static void
  127. puttag(VtConn *z, Rwait *r, int tag)
  128. {
  129. assert(z->wait[tag] == r);
  130. z->wait[tag] = nil;
  131. z->ntag--;
  132. rwakeup(&z->tagrend);
  133. }
  134. static void
  135. muxrpc(VtConn *z, Packet *p)
  136. {
  137. uchar tag, buf[2], *top;
  138. Rwait *r;
  139. if((top = packetpeek(p, buf, 0, 2)) == nil){
  140. fprint(2, "libventi: short packet in vtrpc\n");
  141. packetfree(p);
  142. return;
  143. }
  144. tag = top[1];
  145. if((r = z->wait[tag]) == nil){
  146. fprint(2, "libventi: unexpected packet tag %d in vtrpc\n", tag);
  147. abort();
  148. packetfree(p);
  149. return;
  150. }
  151. r->p = p;
  152. r->done = 1;
  153. rwakeup(&r->r);
  154. }