123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188 |
- /*
- * This file is part of the UCB release of Plan 9. It is subject to the license
- * terms in the LICENSE file found in the top-level directory of this
- * distribution and at http://akaros.cs.berkeley.edu/files/Plan9License. No
- * part of the UCB release of Plan 9, including this file, may be copied,
- * modified, propagated, or distributed except according to the terms contained
- * in the LICENSE file.
- */
- /*
- * Multiplexed Venti client. It would be nice if we
- * could turn this into a generic library routine rather
- * than keep it Venti specific. A user-level 9P client
- * could use something like this too.
- *
- * (Actually it does - this should be replaced with libmux,
- * which should be renamed librpcmux.)
- *
- * This is a little more complicated than it might be
- * because we want it to work well within and without libthread.
- *
- * The mux code is inspired by tra's, which is inspired by the Plan 9 kernel.
- */
- #include <u.h>
- #include <libc.h>
- #include <venti.h>
- typedef struct Rwait Rwait;
- struct Rwait
- {
- Rendez r;
- Packet *p;
- int done;
- int sleeping;
- };
- static int gettag(VtConn*, Rwait*);
- static void puttag(VtConn*, Rwait*, int);
- static void muxrpc(VtConn*, Packet*);
- Packet*
- _vtrpc(VtConn *z, Packet *p, VtFcall *tx)
- {
- int i;
- uint8_t tag, buf[2], *top;
- Rwait *r, *rr;
- if(z == nil){
- werrstr("not connected");
- packetfree(p);
- return nil;
- }
- /* must malloc because stack could be private */
- r = vtmallocz(sizeof(Rwait));
- qlock(&z->lk);
- r->r.l = &z->lk;
- tag = gettag(z, r);
- if(tx){
- /* vtfcallrpc can't print packet because it doesn't have tag */
- tx->tag = tag;
- if(chattyventi)
- fprint(2, "%s -> %F\n", argv0, tx);
- }
- /* slam tag into packet */
- top = packetpeek(p, buf, 0, 2);
- if(top == nil){
- packetfree(p);
- return nil;
- }
- if(top == buf){
- werrstr("first two bytes must be in same packet fragment");
- packetfree(p);
- vtfree(r);
- return nil;
- }
- top[1] = tag;
- qunlock(&z->lk);
- if(vtsend(z, p) < 0){
- vtfree(r);
- return nil;
- }
- qlock(&z->lk);
- /* wait for the muxer to give us our packet */
- r->sleeping = 1;
- z->nsleep++;
- while(z->muxer && !r->done)
- rsleep(&r->r);
- z->nsleep--;
- r->sleeping = 0;
- /* if not done, there's no muxer: start muxing */
- if(!r->done){
- if(z->muxer)
- abort();
- z->muxer = 1;
- while(!r->done){
- qunlock(&z->lk);
- if((p = vtrecv(z)) == nil){
- werrstr("unexpected eof on venti connection");
- z->muxer = 0;
- vtfree(r);
- return nil;
- }
- qlock(&z->lk);
- muxrpc(z, p);
- }
- z->muxer = 0;
- /* if there is anyone else sleeping, wake first unfinished to mux */
- if(z->nsleep)
- for(i=0; i<256; i++){
- rr = z->wait[i];
- if(rr && rr->sleeping && !rr->done){
- rwakeup(&rr->r);
- break;
- }
- }
- }
- p = r->p;
- puttag(z, r, tag);
- vtfree(r);
- qunlock(&z->lk);
- return p;
- }
- Packet*
- vtrpc(VtConn *z, Packet *p)
- {
- return _vtrpc(z, p, nil);
- }
- static int
- gettag(VtConn *z, Rwait *r)
- {
- int i;
- Again:
- while(z->ntag == 256)
- rsleep(&z->tagrend);
- for(i=0; i<256; i++)
- if(z->wait[i] == 0){
- z->ntag++;
- z->wait[i] = r;
- return i;
- }
- fprint(2, "libventi: ntag botch\n");
- goto Again;
- }
- static void
- puttag(VtConn *z, Rwait *r, int tag)
- {
- assert(z->wait[tag] == r);
- z->wait[tag] = nil;
- z->ntag--;
- rwakeup(&z->tagrend);
- }
- static void
- muxrpc(VtConn *z, Packet *p)
- {
- uint8_t tag, buf[2], *top;
- Rwait *r;
- if((top = packetpeek(p, buf, 0, 2)) == nil){
- fprint(2, "libventi: short packet in vtrpc\n");
- packetfree(p);
- return;
- }
- tag = top[1];
- if((r = z->wait[tag]) == nil){
- fprint(2, "libventi: unexpected packet tag %d in vtrpc\n", tag);
- abort();
- packetfree(p);
- return;
- }
- r->p = p;
- r->done = 1;
- rwakeup(&r->r);
- }
|