/* * Multiplexed Venti client. It would be nice if we * could turn this into a generic library routine rather * than keep it Venti specific. A user-level 9P client * could use something like this too. * * (Actually it does - this should be replaced with libmux, * which should be renamed librpcmux.) * * This is a little more complicated than it might be * because we want it to work well within and without libthread. * * The mux code is inspired by tra's, which is inspired by the Plan 9 kernel. */ #include #include #include typedef struct Rwait Rwait; struct Rwait { Rendez r; Packet *p; int done; int sleeping; }; static int gettag(VtConn*, Rwait*); static void puttag(VtConn*, Rwait*, int); static void muxrpc(VtConn*, Packet*); Packet* _vtrpc(VtConn *z, Packet *p, VtFcall *tx) { int i; uchar tag, buf[2], *top; Rwait *r, *rr; if(z == nil){ werrstr("not connected"); packetfree(p); return nil; } /* must malloc because stack could be private */ r = vtmallocz(sizeof(Rwait)); qlock(&z->lk); r->r.l = &z->lk; tag = gettag(z, r); if(tx){ /* vtfcallrpc can't print packet because it doesn't have tag */ tx->tag = tag; if(chattyventi) fprint(2, "%s -> %F\n", argv0, tx); } /* slam tag into packet */ top = packetpeek(p, buf, 0, 2); if(top == nil){ packetfree(p); return nil; } if(top == buf){ werrstr("first two bytes must be in same packet fragment"); packetfree(p); vtfree(r); return nil; } top[1] = tag; qunlock(&z->lk); if(vtsend(z, p) < 0){ vtfree(r); return nil; } qlock(&z->lk); /* wait for the muxer to give us our packet */ r->sleeping = 1; z->nsleep++; while(z->muxer && !r->done) rsleep(&r->r); z->nsleep--; r->sleeping = 0; /* if not done, there's no muxer: start muxing */ if(!r->done){ if(z->muxer) abort(); z->muxer = 1; while(!r->done){ qunlock(&z->lk); if((p = vtrecv(z)) == nil){ werrstr("unexpected eof on venti connection"); z->muxer = 0; vtfree(r); return nil; } qlock(&z->lk); muxrpc(z, p); } z->muxer = 0; /* if there is anyone else sleeping, wake first unfinished to mux */ if(z->nsleep) for(i=0; i<256; i++){ rr = z->wait[i]; if(rr && rr->sleeping && !rr->done){ rwakeup(&rr->r); break; } } } p = r->p; puttag(z, r, tag); vtfree(r); qunlock(&z->lk); return p; } Packet* vtrpc(VtConn *z, Packet *p) { return _vtrpc(z, p, nil); } static int gettag(VtConn *z, Rwait *r) { int i; Again: while(z->ntag == 256) rsleep(&z->tagrend); for(i=0; i<256; i++) if(z->wait[i] == 0){ z->ntag++; z->wait[i] = r; return i; } fprint(2, "libventi: ntag botch\n"); goto Again; } static void puttag(VtConn *z, Rwait *r, int tag) { assert(z->wait[tag] == r); z->wait[tag] = nil; z->ntag--; rwakeup(&z->tagrend); } static void muxrpc(VtConn *z, Packet *p) { uchar tag, buf[2], *top; Rwait *r; if((top = packetpeek(p, buf, 0, 2)) == nil){ fprint(2, "libventi: short packet in vtrpc\n"); packetfree(p); return; } tag = top[1]; if((r = z->wait[tag]) == nil){ fprint(2, "libventi: unexpected packet tag %d in vtrpc\n", tag); abort(); packetfree(p); return; } r->p = p; r->done = 1; rwakeup(&r->r); }