123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825 |
- #include "stdinc.h"
- #include "9.h"
- #include "dat.h"
- #include "fns.h"
- enum {
- NConInit = 128,
- NMsgInit = 384,
- NMsgProcInit = 64,
- NMsizeInit = 8192+IOHDRSZ,
- };
- static struct {
- VtLock* alock; /* alloc */
- Msg* ahead;
- VtRendez* arendez;
- int maxmsg;
- int nmsg;
- int nmsgstarve;
- VtLock* rlock; /* read */
- Msg* rhead;
- Msg* rtail;
- VtRendez* rrendez;
- int maxproc;
- int nproc;
- int nprocstarve;
- u32int msize; /* immutable */
- } mbox;
- static struct {
- VtLock* alock; /* alloc */
- Con* ahead;
- VtRendez* arendez;
- VtLock* clock;
- Con* chead;
- Con* ctail;
- int maxcon;
- int ncon;
- int nconstarve;
- u32int msize;
- } cbox;
- static void
- conFree(Con* con)
- {
- assert(con->version == nil);
- assert(con->mhead == nil);
- assert(con->whead == nil);
- assert(con->nfid == 0);
- assert(con->state == ConMoribund);
- if(con->fd >= 0){
- close(con->fd);
- con->fd = -1;
- }
- con->state = ConDead;
- con->aok = 0;
- con->flags = 0;
- con->isconsole = 0;
- vtLock(cbox.alock);
- if(con->cprev != nil)
- con->cprev->cnext = con->cnext;
- else
- cbox.chead = con->cnext;
- if(con->cnext != nil)
- con->cnext->cprev = con->cprev;
- else
- cbox.ctail = con->cprev;
- con->cprev = con->cnext = nil;
- if(cbox.ncon > cbox.maxcon){
- if(con->name != nil)
- vtMemFree(con->name);
- vtLockFree(con->fidlock);
- vtMemFree(con->data);
- vtRendezFree(con->wrendez);
- vtLockFree(con->wlock);
- vtRendezFree(con->mrendez);
- vtLockFree(con->mlock);
- vtRendezFree(con->rendez);
- vtLockFree(con->lock);
- vtMemFree(con);
- cbox.ncon--;
- vtUnlock(cbox.alock);
- return;
- }
- con->anext = cbox.ahead;
- cbox.ahead = con;
- if(con->anext == nil)
- vtWakeup(cbox.arendez);
- vtUnlock(cbox.alock);
- }
- static void
- msgFree(Msg* m)
- {
- assert(m->rwnext == nil);
- assert(m->flush == nil);
- vtLock(mbox.alock);
- if(mbox.nmsg > mbox.maxmsg){
- vtMemFree(m->data);
- vtMemFree(m);
- mbox.nmsg--;
- vtUnlock(mbox.alock);
- return;
- }
- m->anext = mbox.ahead;
- mbox.ahead = m;
- if(m->anext == nil)
- vtWakeup(mbox.arendez);
- vtUnlock(mbox.alock);
- }
- static Msg*
- msgAlloc(Con* con)
- {
- Msg *m;
- vtLock(mbox.alock);
- while(mbox.ahead == nil){
- if(mbox.nmsg >= mbox.maxmsg){
- mbox.nmsgstarve++;
- vtSleep(mbox.arendez);
- continue;
- }
- m = vtMemAllocZ(sizeof(Msg));
- m->data = vtMemAlloc(mbox.msize);
- m->msize = mbox.msize;
- mbox.nmsg++;
- mbox.ahead = m;
- break;
- }
- m = mbox.ahead;
- mbox.ahead = m->anext;
- m->anext = nil;
- vtUnlock(mbox.alock);
- m->con = con;
- m->state = MsgR;
- m->nowq = 0;
- return m;
- }
- static void
- msgMunlink(Msg* m)
- {
- Con *con;
- con = m->con;
- if(m->mprev != nil)
- m->mprev->mnext = m->mnext;
- else
- con->mhead = m->mnext;
- if(m->mnext != nil)
- m->mnext->mprev = m->mprev;
- else
- con->mtail = m->mprev;
- m->mprev = m->mnext = nil;
- }
- void
- msgFlush(Msg* m)
- {
- Con *con;
- Msg *flush, *old;
- con = m->con;
- if(Dflag)
- fprint(2, "msgFlush %F\n", &m->t);
- /*
- * If this Tflush has been flushed, nothing to do.
- * Look for the message to be flushed in the
- * queue of all messages still on this connection.
- * If it's not found must assume Elvis has already
- * left the building and reply normally.
- */
- vtLock(con->mlock);
- if(m->state == MsgF){
- vtUnlock(con->mlock);
- return;
- }
- for(old = con->mhead; old != nil; old = old->mnext)
- if(old->t.tag == m->t.oldtag)
- break;
- if(old == nil){
- if(Dflag)
- fprint(2, "msgFlush: cannot find %d\n", m->t.oldtag);
- vtUnlock(con->mlock);
- return;
- }
- if(Dflag)
- fprint(2, "\tmsgFlush found %F\n", &old->t);
- /*
- * Found it.
- * There are two cases where the old message can be
- * truly flushed and no reply to the original message given.
- * The first is when the old message is in MsgR state; no
- * processing has been done yet and it is still on the read
- * queue. The second is if old is a Tflush, which doesn't
- * affect the server state. In both cases, put the old
- * message into MsgF state and let MsgWrite toss it after
- * pulling it off the queue.
- */
- if(old->state == MsgR || old->t.type == Tflush){
- old->state = MsgF;
- if(Dflag)
- fprint(2, "msgFlush: change %d from MsgR to MsgF\n",
- m->t.oldtag);
- }
- /*
- * Link this flush message and the old message
- * so multiple flushes can be coalesced (if there are
- * multiple Tflush messages for a particular pending
- * request, it is only necessary to respond to the last
- * one, so any previous can be removed) and to be
- * sure flushes wait for their corresponding old
- * message to go out first.
- * Waiting flush messages do not go on the write queue,
- * they are processed after the old message is dealt
- * with. There's no real need to protect the setting of
- * Msg.nowq, the only code to check it runs in this
- * process after this routine returns.
- */
- if((flush = old->flush) != nil){
- if(Dflag)
- fprint(2, "msgFlush: remove %d from %d list\n",
- old->flush->t.tag, old->t.tag);
- m->flush = flush->flush;
- flush->flush = nil;
- msgMunlink(flush);
- msgFree(flush);
- }
- old->flush = m;
- m->nowq = 1;
- if(Dflag)
- fprint(2, "msgFlush: add %d to %d queue\n",
- m->t.tag, old->t.tag);
- vtUnlock(con->mlock);
- }
- static void
- msgProc(void*)
- {
- Msg *m;
- char *e;
- Con *con;
- vtThreadSetName("msgProc");
- for(;;){
- /*
- * If surplus to requirements, exit.
- * If not, wait for and pull a message off
- * the read queue.
- */
- vtLock(mbox.rlock);
- if(mbox.nproc > mbox.maxproc){
- mbox.nproc--;
- vtUnlock(mbox.rlock);
- break;
- }
- while(mbox.rhead == nil)
- vtSleep(mbox.rrendez);
- m = mbox.rhead;
- mbox.rhead = m->rwnext;
- m->rwnext = nil;
- vtUnlock(mbox.rlock);
- con = m->con;
- e = nil;
- /*
- * If the message has been flushed before
- * any 9P processing has started, mark it so
- * none will be attempted.
- */
- vtLock(con->mlock);
- if(m->state == MsgF)
- e = "flushed";
- else
- m->state = Msg9;
- vtUnlock(con->mlock);
- if(e == nil){
- /*
- * explain this
- */
- vtLock(con->lock);
- if(m->t.type == Tversion){
- con->version = m;
- con->state = ConDown;
- while(con->mhead != m)
- vtSleep(con->rendez);
- assert(con->state == ConDown);
- if(con->version == m){
- con->version = nil;
- con->state = ConInit;
- }
- else
- e = "Tversion aborted";
- }
- else if(con->state != ConUp)
- e = "connection not ready";
- vtUnlock(con->lock);
- }
- /*
- * Dispatch if not error already.
- */
- m->r.tag = m->t.tag;
- if(e == nil && !(*rFcall[m->t.type])(m))
- e = vtGetError();
- if(e != nil){
- m->r.type = Rerror;
- m->r.ename = e;
- }
- else
- m->r.type = m->t.type+1;
- /*
- * Put the message (with reply) on the
- * write queue and wakeup the write process.
- */
- if(!m->nowq){
- vtLock(con->wlock);
- if(con->whead == nil)
- con->whead = m;
- else
- con->wtail->rwnext = m;
- con->wtail = m;
- vtWakeup(con->wrendez);
- vtUnlock(con->wlock);
- }
- }
- }
- static void
- msgRead(void* v)
- {
- Msg *m;
- Con *con;
- int eof, fd, n;
- vtThreadSetName("msgRead");
- con = v;
- fd = con->fd;
- eof = 0;
- while(!eof){
- m = msgAlloc(con);
- while((n = read9pmsg(fd, m->data, con->msize)) == 0)
- ;
- if(n < 0){
- m->t.type = Tversion;
- m->t.fid = NOFID;
- m->t.tag = NOTAG;
- m->t.msize = con->msize;
- m->t.version = "9PEoF";
- eof = 1;
- }
- else if(convM2S(m->data, n, &m->t) != n){
- if(Dflag)
- fprint(2, "msgRead: convM2S error: %s\n",
- con->name);
- msgFree(m);
- continue;
- }
- if(Dflag)
- fprint(2, "msgRead %p: t %F\n", con, &m->t);
- vtLock(con->mlock);
- if(con->mtail != nil){
- m->mprev = con->mtail;
- con->mtail->mnext = m;
- }
- else{
- con->mhead = m;
- m->mprev = nil;
- }
- con->mtail = m;
- vtUnlock(con->mlock);
- vtLock(mbox.rlock);
- if(mbox.rhead == nil){
- mbox.rhead = m;
- if(!vtWakeup(mbox.rrendez)){
- if(mbox.nproc < mbox.maxproc){
- if(vtThread(msgProc, nil) > 0)
- mbox.nproc++;
- }
- else
- mbox.nprocstarve++;
- }
- /*
- * don't need this surely?
- vtWakeup(mbox.rrendez);
- */
- }
- else
- mbox.rtail->rwnext = m;
- mbox.rtail = m;
- vtUnlock(mbox.rlock);
- }
- }
- static void
- msgWrite(void* v)
- {
- Con *con;
- int eof, n;
- Msg *flush, *m;
- vtThreadSetName("msgWrite");
- con = v;
- if(vtThread(msgRead, con) < 0){
- conFree(con);
- return;
- }
- for(;;){
- /*
- * Wait for and pull a message off the write queue.
- */
- vtLock(con->wlock);
- while(con->whead == nil)
- vtSleep(con->wrendez);
- m = con->whead;
- con->whead = m->rwnext;
- m->rwnext = nil;
- assert(!m->nowq);
- vtUnlock(con->wlock);
- eof = 0;
- /*
- * Write each message (if it hasn't been flushed)
- * followed by any messages waiting for it to complete.
- */
- vtLock(con->mlock);
- while(m != nil){
- msgMunlink(m);
- if(Dflag)
- fprint(2, "msgWrite %d: r %F\n",
- m->state, &m->r);
- if(m->state != MsgF){
- m->state = MsgW;
- vtUnlock(con->mlock);
- n = convS2M(&m->r, con->data, con->msize);
- if(write(con->fd, con->data, n) != n)
- eof = 1;
- vtLock(con->mlock);
- }
- if((flush = m->flush) != nil){
- assert(flush->nowq);
- m->flush = nil;
- }
- msgFree(m);
- m = flush;
- }
- vtUnlock(con->mlock);
- vtLock(con->lock);
- if(eof && con->fd >= 0){
- close(con->fd);
- con->fd = -1;
- }
- if(con->state == ConDown)
- vtWakeup(con->rendez);
- if(con->state == ConMoribund && con->mhead == nil){
- vtUnlock(con->lock);
- conFree(con);
- break;
- }
- vtUnlock(con->lock);
- }
- }
- Con*
- conAlloc(int fd, char* name, int flags)
- {
- Con *con;
- char buf[128], *p;
- int rfd, n;
- vtLock(cbox.alock);
- while(cbox.ahead == nil){
- if(cbox.ncon >= cbox.maxcon){
- cbox.nconstarve++;
- vtSleep(cbox.arendez);
- continue;
- }
- con = vtMemAllocZ(sizeof(Con));
- con->lock = vtLockAlloc();
- con->rendez = vtRendezAlloc(con->lock);
- con->data = vtMemAlloc(cbox.msize);
- con->msize = cbox.msize;
- con->alock = vtLockAlloc();
- con->mlock = vtLockAlloc();
- con->mrendez = vtRendezAlloc(con->mlock);
- con->wlock = vtLockAlloc();
- con->wrendez = vtRendezAlloc(con->wlock);
- con->fidlock = vtLockAlloc();
- cbox.ncon++;
- cbox.ahead = con;
- break;
- }
- con = cbox.ahead;
- cbox.ahead = con->anext;
- con->anext = nil;
- if(cbox.ctail != nil){
- con->cprev = cbox.ctail;
- cbox.ctail->cnext = con;
- }
- else{
- cbox.chead = con;
- con->cprev = nil;
- }
- cbox.ctail = con;
- assert(con->mhead == nil);
- assert(con->whead == nil);
- assert(con->fhead == nil);
- assert(con->nfid == 0);
- con->state = ConNew;
- con->fd = fd;
- if(con->name != nil){
- vtMemFree(con->name);
- con->name = nil;
- }
- if(name != nil)
- con->name = vtStrDup(name);
- else
- con->name = vtStrDup("unknown");
- con->remote[0] = 0;
- snprint(buf, sizeof buf, "%s/remote", con->name);
- if((rfd = open(buf, OREAD)) >= 0){
- n = read(rfd, buf, sizeof buf-1);
- close(rfd);
- if(n > 0){
- buf[n] = 0;
- if((p = strchr(buf, '\n')) != nil)
- *p = 0;
- strecpy(con->remote, con->remote+sizeof con->remote, buf);
- }
- }
- con->flags = flags;
- con->isconsole = 0;
- vtUnlock(cbox.alock);
- if(vtThread(msgWrite, con) < 0){
- conFree(con);
- return nil;
- }
- return con;
- }
- static int
- cmdMsg(int argc, char* argv[])
- {
- char *p;
- char *usage = "usage: msg [-m nmsg] [-p nproc]";
- int maxmsg, nmsg, nmsgstarve, maxproc, nproc, nprocstarve;
- maxmsg = maxproc = 0;
- ARGBEGIN{
- default:
- return cliError(usage);
- case 'm':
- p = ARGF();
- if(p == nil)
- return cliError(usage);
- maxmsg = strtol(argv[0], &p, 0);
- if(maxmsg <= 0 || p == argv[0] || *p != '\0')
- return cliError(usage);
- break;
- case 'p':
- p = ARGF();
- if(p == nil)
- return cliError(usage);
- maxproc = strtol(argv[0], &p, 0);
- if(maxproc <= 0 || p == argv[0] || *p != '\0')
- return cliError(usage);
- break;
- }ARGEND
- if(argc)
- return cliError(usage);
- vtLock(mbox.alock);
- if(maxmsg)
- mbox.maxmsg = maxmsg;
- maxmsg = mbox.maxmsg;
- nmsg = mbox.nmsg;
- nmsgstarve = mbox.nmsgstarve;
- vtUnlock(mbox.alock);
- vtLock(mbox.rlock);
- if(maxproc)
- mbox.maxproc = maxproc;
- maxproc = mbox.maxproc;
- nproc = mbox.nproc;
- nprocstarve = mbox.nprocstarve;
- vtUnlock(mbox.rlock);
- consPrint("\tmsg -m %d -p %d\n", maxmsg, maxproc);
- consPrint("\tnmsg %d nmsgstarve %d nproc %d nprocstarve %d\n",
- nmsg, nmsgstarve, nproc, nprocstarve);
- return 1;
- }
- static int
- scmp(Fid *a, Fid *b)
- {
- if(a == 0)
- return 1;
- if(b == 0)
- return -1;
- return strcmp(a->uname, b->uname);
- }
- static Fid*
- fidMerge(Fid *a, Fid *b)
- {
- Fid *s, **l;
- l = &s;
- while(a || b){
- if(scmp(a, b) < 0){
- *l = a;
- l = &a->sort;
- a = a->sort;
- }else{
- *l = b;
- l = &b->sort;
- b = b->sort;
- }
- }
- *l = 0;
- return s;
- }
- static Fid*
- fidMergeSort(Fid *f)
- {
- int delay;
- Fid *a, *b;
- if(f == nil)
- return nil;
- if(f->sort == nil)
- return f;
- a = b = f;
- delay = 1;
- while(a && b){
- if(delay) /* easy way to handle 2-element list */
- delay = 0;
- else
- a = a->sort;
- if(b = b->sort)
- b = b->sort;
- }
- b = a->sort;
- a->sort = nil;
- a = fidMergeSort(f);
- b = fidMergeSort(b);
- return fidMerge(a, b);
- }
- static int
- cmdWho(int argc, char* argv[])
- {
- char *usage = "usage: who";
- int i, l1, l2, l;
- Con *con;
- Fid *fid, *last;
- ARGBEGIN{
- default:
- return cliError(usage);
- }ARGEND
- if(argc > 0)
- return cliError(usage);
- vtRLock(cbox.clock);
- l1 = 0;
- l2 = 0;
- for(con=cbox.chead; con; con=con->cnext){
- if((l = strlen(con->name)) > l1)
- l1 = l;
- if((l = strlen(con->remote)) > l2)
- l2 = l;
- }
- for(con=cbox.chead; con; con=con->cnext){
- consPrint("\t%-*s %-*s", l1, con->name, l2, con->remote);
- vtLock(con->fidlock);
- last = nil;
- for(i=0; i<NFidHash; i++)
- for(fid=con->fidhash[i]; fid; fid=fid->hash)
- if(fid->fidno != NOFID && fid->uname){
- fid->sort = last;
- last = fid;
- }
- fid = fidMergeSort(last);
- last = nil;
- for(; fid; last=fid, fid=fid->sort)
- if(last==nil || strcmp(fid->uname, last->uname) != 0)
- consPrint(" %q", fid->uname);
- vtUnlock(con->fidlock);
- consPrint("\n");
- }
- vtRUnlock(cbox.clock);
- return 1;
- }
- void
- msgInit(void)
- {
- mbox.alock = vtLockAlloc();
- mbox.arendez = vtRendezAlloc(mbox.alock);
- mbox.rlock = vtLockAlloc();
- mbox.rrendez = vtRendezAlloc(mbox.rlock);
- mbox.maxmsg = NMsgInit;
- mbox.maxproc = NMsgProcInit;
- mbox.msize = NMsizeInit;
- cliAddCmd("msg", cmdMsg);
- }
- static int
- cmdCon(int argc, char* argv[])
- {
- char *p;
- Con *con;
- char *usage = "usage: con [-m ncon]";
- int maxcon, ncon, nconstarve;
- maxcon = 0;
- ARGBEGIN{
- default:
- return cliError(usage);
- case 'm':
- p = ARGF();
- if(p == nil)
- return cliError(usage);
- maxcon = strtol(argv[0], &p, 0);
- if(maxcon <= 0 || p == argv[0] || *p != '\0')
- return cliError(usage);
- break;
- }ARGEND
- if(argc)
- return cliError(usage);
- vtLock(cbox.clock);
- if(maxcon)
- cbox.maxcon = maxcon;
- maxcon = cbox.maxcon;
- ncon = cbox.ncon;
- nconstarve = cbox.nconstarve;
- vtUnlock(cbox.clock);
- consPrint("\tcon -m %d\n", maxcon);
- consPrint("\tncon %d nconstarve %d\n", ncon, nconstarve);
- vtRLock(cbox.clock);
- for(con = cbox.chead; con != nil; con = con->cnext){
- consPrint("\t%s\n", con->name);
- }
- vtRUnlock(cbox.clock);
- return 1;
- }
- void
- conInit(void)
- {
- cbox.alock = vtLockAlloc();
- cbox.arendez = vtRendezAlloc(cbox.alock);
- cbox.clock = vtLockAlloc();
- cbox.maxcon = NConInit;
- cbox.msize = NMsizeInit;
- cliAddCmd("con", cmdCon);
- cliAddCmd("who", cmdWho);
- }
|