123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432 |
- #include <u.h>
- #include <libc.h>
- #include <fcall.h>
- #include <thread.h>
- #include <9p.h>
- #include "dat.h"
- int nclient;
- Client **client;
- #define Zmsg ((Msg*)~0)
- char nocmd[] = "";
- static void readthread(void*);
- static void writethread(void*);
- static void kickwriter(Client*);
- int
- newclient(void)
- {
- int i;
- Client *c;
- for(i=0; i<nclient; i++)
- if(client[i]->ref==0 && !client[i]->moribund)
- return i;
- c = emalloc(sizeof(Client));
- c->writerkick = chancreate(sizeof(void*), 1);
- c->execpid = chancreate(sizeof(ulong), 0);
- c->cmd = nocmd;
- c->readerproc = ioproc();
- c->writerproc = ioproc();
- c->num = nclient;
- if(nclient%16 == 0)
- client = erealloc(client, (nclient+16)*sizeof(client[0]));
- client[nclient++] = c;
- return nclient-1;
- }
- void
- die(Client *c)
- {
- Msg *m, *next;
- Req *r, *rnext;
- c->moribund = 1;
- kickwriter(c);
- iointerrupt(c->readerproc);
- iointerrupt(c->writerproc);
- if(--c->activethread == 0){
- if(c->cmd != nocmd){
- free(c->cmd);
- c->cmd = nocmd;
- }
- c->pid = 0;
- c->moribund = 0;
- c->status = Closed;
- for(m=c->mq; m && m != Zmsg; m=next){
- next = m->link;
- free(m);
- }
- c->mq = nil;
- if(c->rq != nil){
- for(r=c->rq; r; r=rnext){
- rnext = r->aux;
- respond(r, "hangup");
- }
- c->rq = nil;
- }
- if(c->wq != nil){
- for(r=c->wq; r; r=rnext){
- rnext = r->aux;
- respond(r, "hangup");
- }
- c->wq = nil;
- }
- c->rq = nil;
- c->wq = nil;
- c->emq = nil;
- c->erq = nil;
- c->ewq = nil;
- }
- }
- void
- closeclient(Client *c)
- {
- if(--c->ref == 0){
- if(c->pid > 0)
- postnote(PNPROC, c->pid, "kill");
- c->status = Hangup;
- close(c->fd[0]);
- c->fd[0] = c->fd[1] = -1;
- c->moribund = 1;
- kickwriter(c);
- iointerrupt(c->readerproc);
- iointerrupt(c->writerproc);
- c->activethread++;
- die(c);
- }
- }
- void
- queuerdreq(Client *c, Req *r)
- {
- if(c->rq==nil)
- c->erq = &c->rq;
- *c->erq = r;
- r->aux = nil;
- c->erq = (Req**)&r->aux;
- }
- void
- queuewrreq(Client *c, Req *r)
- {
- if(c->wq==nil)
- c->ewq = &c->wq;
- *c->ewq = r;
- r->aux = nil;
- c->ewq = (Req**)&r->aux;
- }
- void
- queuemsg(Client *c, Msg *m)
- {
- if(c->mq==nil)
- c->emq = &c->mq;
- *c->emq = m;
- if(m != Zmsg){
- m->link = nil;
- c->emq = (Msg**)&m->link;
- }else
- c->emq = nil;
- }
- void
- matchmsgs(Client *c)
- {
- Req *r;
- Msg *m;
- int n, rm;
- while(c->rq && c->mq){
- r = c->rq;
- c->rq = r->aux;
- rm = 0;
- m = c->mq;
- if(m == Zmsg){
- respond(r, "execnet: no more data");
- break;
- }
- n = r->ifcall.count;
- if(n >= m->ep - m->rp){
- n = m->ep - m->rp;
- c->mq = m->link;
- rm = 1;
- }
- if(n)
- memmove(r->ofcall.data, m->rp, n);
- if(rm)
- free(m);
- else
- m->rp += n;
- r->ofcall.count = n;
- respond(r, nil);
- }
- }
- void
- findrdreq(Client *c, Req *r)
- {
- Req **l;
- for(l=&c->rq; *l; l=(Req**)&(*l)->aux){
- if(*l == r){
- *l = r->aux;
- if(*l == nil)
- c->erq = l;
- respond(r, "flushed");
- break;
- }
- }
- }
- void
- findwrreq(Client *c, Req *r)
- {
- Req **l;
- for(l=&c->wq; *l; l=(Req**)&(*l)->aux){
- if(*l == r){
- *l = r->aux;
- if(*l == nil)
- c->ewq = l;
- respond(r, "flushed");
- return;
- }
- }
- }
- void
- dataread(Req *r, Client *c)
- {
- queuerdreq(c, r);
- matchmsgs(c);
- }
- static void
- readthread(void *a)
- {
- uchar *buf;
- int n;
- Client *c;
- Ioproc *io;
- Msg *m;
- char tmp[32];
- c = a;
- snprint(tmp, sizeof tmp, "read%d", c->num);
- threadsetname(tmp);
- buf = emalloc(8192);
- io = c->readerproc;
- while((n = ioread(io, c->fd[0], buf, 8192)) >= 0){
- m = emalloc(sizeof(Msg)+n);
- m->rp = (uchar*)&m[1];
- m->ep = m->rp + n;
- if(n)
- memmove(m->rp, buf, n);
- queuemsg(c, m);
- matchmsgs(c);
- }
- queuemsg(c, Zmsg);
- free(buf);
- die(c);
- }
- static void
- kickwriter(Client *c)
- {
- nbsendp(c->writerkick, nil);
- }
- void
- clientflush(Req *or, Client *c)
- {
- if(or->ifcall.type == Tread)
- findrdreq(c, or);
- else{
- if(c->execreq == or){
- c->execreq = nil;
- iointerrupt(c->writerproc);
- }
- findwrreq(c, or);
- if(c->curw == or){
- c->curw = nil;
- iointerrupt(c->writerproc);
- kickwriter(c);
- }
- }
- }
- void
- datawrite(Req *r, Client *c)
- {
- queuewrreq(c, r);
- kickwriter(c);
- }
- static void
- writethread(void *a)
- {
- char e[ERRMAX];
- uchar *buf;
- int n;
- Ioproc *io;
- Req *r;
- Client *c;
- char tmp[32];
- c = a;
- snprint(tmp, sizeof tmp, "write%d", c->num);
- threadsetname(tmp);
- buf = emalloc(8192);
- io = c->writerproc;
- for(;;){
- while(c->wq == nil){
- if(c->moribund)
- goto Out;
- recvp(c->writerkick);
- if(c->moribund)
- goto Out;
- }
- r = c->wq;
- c->wq = r->aux;
- c->curw = r;
- n = iowrite(io, c->fd[1], r->ifcall.data, r->ifcall.count);
- if(chatty9p)
- fprint(2, "io->write returns %d\n", n);
- if(n >= 0){
- r->ofcall.count = n;
- respond(r, nil);
- }else{
- rerrstr(e, sizeof e);
- respond(r, e);
- }
- }
- Out:
- free(buf);
- die(c);
- }
- static void
- execproc(void *a)
- {
- int i, fd;
- Client *c;
- char tmp[32];
- c = a;
- snprint(tmp, sizeof tmp, "execproc%d", c->num);
- threadsetname(tmp);
- if(pipe(c->fd) < 0){
- rerrstr(c->err, sizeof c->err);
- sendul(c->execpid, -1);
- return;
- }
- rfork(RFFDG);
- fd = c->fd[1];
- close(c->fd[0]);
- dup(fd, 0);
- dup(fd, 1);
- for(i=3; i<100; i++) /* should do better */
- close(i);
- strcpy(c->err, "exec failed");
- procexecl(c->execpid, "/bin/rc", "rc", "-c", c->cmd, nil);
- }
- static void
- execthread(void *a)
- {
- Client *c;
- int p;
- char tmp[32];
- c = a;
- snprint(tmp, sizeof tmp, "exec%d", c->num);
- threadsetname(tmp);
- c->execpid = chancreate(sizeof(ulong), 0);
- proccreate(execproc, c, STACK);
- p = recvul(c->execpid);
- chanfree(c->execpid);
- c->execpid = nil;
- close(c->fd[1]);
- c->fd[1] = c->fd[0];
- if(p != -1){
- c->pid = p;
- c->activethread = 2;
- threadcreate(readthread, c, STACK);
- threadcreate(writethread, c, STACK);
- if(c->execreq)
- respond(c->execreq, nil);
- }else{
- if(c->execreq)
- respond(c->execreq, c->err);
- }
- }
- void
- ctlwrite(Req *r, Client *c)
- {
- char *f[3], *s, *p;
- int nf;
- s = emalloc(r->ifcall.count+1);
- memmove(s, r->ifcall.data, r->ifcall.count);
- s[r->ifcall.count] = '\0';
- f[0] = s;
- p = strchr(s, ' ');
- if(p == nil)
- nf = 1;
- else{
- *p++ = '\0';
- f[1] = p;
- nf = 2;
- }
- if(f[0][0] == '\0'){
- free(s);
- respond(r, nil);
- return;
- }
- r->ofcall.count = r->ifcall.count;
- if(strcmp(f[0], "hangup") == 0){
- if(c->pid == 0){
- respond(r, "connection already hung up");
- goto Out;
- }
- postnote(PNPROC, c->pid, "kill");
- respond(r, nil);
- goto Out;
- }
- if(strcmp(f[0], "connect") == 0){
- if(c->cmd != nocmd){
- respond(r, "already have connection");
- goto Out;
- }
- if(nf == 1){
- respond(r, "need argument to connect");
- goto Out;
- }
- c->status = Exec;
- if(p = strrchr(f[1], '!'))
- *p = '\0';
- c->cmd = emalloc(4+1+strlen(f[1])+1);
- strcpy(c->cmd, "exec ");
- strcat(c->cmd, f[1]);
- c->execreq = r;
- threadcreate(execthread, c, STACK);
- goto Out;
- }
- respond(r, "bad or inappropriate control message");
- Out:
- free(s);
- }
|