123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406 |
- /*
- * This file is part of the UCB release of Plan 9. It is subject to the license
- * terms in the LICENSE file found in the top-level directory of this
- * distribution and at http://akaros.cs.berkeley.edu/files/Plan9License. No
- * part of the UCB release of Plan 9, including this file, may be copied,
- * modified, propagated, or distributed except according to the terms contained
- * in the LICENSE file.
- */
- #include "u.h"
- #include "../port/lib.h"
- #include "mem.h"
- #include "dat.h"
- #include "fns.h"
- #include "../port/error.h"
- typedef struct Pipe Pipe;
- struct Pipe
- {
- QLock;
- Pipe *next;
- int ref;
- uint32_t path;
- Queue *q[2];
- int qref[2];
- };
- struct
- {
- Lock;
- uint32_t path;
- } pipealloc;
- enum
- {
- Qdir,
- Qdata0,
- Qdata1,
- };
- Dirtab pipedir[] =
- {
- ".", {Qdir,0,QTDIR}, 0, DMDIR|0500,
- "data", {Qdata0}, 0, 0600,
- "data1", {Qdata1}, 0, 0600,
- };
- #define NPIPEDIR 3
- #define PIPETYPE(x) (((unsigned)x)&0x1f)
- #define PIPEID(x) ((((unsigned)x))>>5)
- #define PIPEQID(i, t) ((((unsigned)i)<<5)|(t))
- enum
- {
- /* Plan 9 default for nmach > 1 */
- Pipeqsize = 256*1024
- };
- static void
- pipeinit(void)
- {
- }
- /*
- * create a pipe, no streams are created until an open
- */
- static Chan*
- pipeattach(char *spec)
- {
- Pipe *p;
- Chan *c;
- c = devattach('|', spec);
- p = malloc(sizeof(Pipe));
- if(p == 0)
- exhausted("memory");
- p->ref = 1;
- p->q[0] = qopen(Pipeqsize, 0, 0, 0);
- if(p->q[0] == 0){
- free(p);
- exhausted("memory");
- }
- p->q[1] = qopen(Pipeqsize, 0, 0, 0);
- if(p->q[1] == 0){
- free(p->q[0]);
- free(p);
- exhausted("memory");
- }
- lock(&pipealloc);
- p->path = ++pipealloc.path;
- unlock(&pipealloc);
- mkqid(&c->qid, PIPEQID(2*p->path, Qdir), 0, QTDIR);
- c->aux = p;
- c->devno = 0;
- return c;
- }
- static int
- pipegen(Chan *c, char* d, Dirtab *tab, int ntab, int i, Dir *dp)
- {
- Qid q;
- int len;
- Pipe *p;
- if(i == DEVDOTDOT){
- devdir(c, c->qid, "#|", 0, eve, DMDIR|0555, dp);
- return 1;
- }
- i++; /* skip . */
- if(tab==0 || i>=ntab)
- return -1;
- tab += i;
- p = c->aux;
- switch((uint32_t)tab->qid.path){
- case Qdata0:
- len = qlen(p->q[0]);
- break;
- case Qdata1:
- len = qlen(p->q[1]);
- break;
- default:
- len = tab->length;
- break;
- }
- mkqid(&q, PIPEQID(PIPEID(c->qid.path), tab->qid.path), 0, QTFILE);
- devdir(c, q, tab->name, len, eve, tab->perm, dp);
- return 1;
- }
- static Walkqid*
- pipewalk(Chan *c, Chan *nc, char **name, int nname)
- {
- Walkqid *wq;
- Pipe *p;
- wq = devwalk(c, nc, name, nname, pipedir, NPIPEDIR, pipegen);
- if(wq != nil && wq->clone != nil && wq->clone != c){
- p = c->aux;
- qlock(p);
- p->ref++;
- if(c->flag & COPEN){
- print("channel open in pipewalk\n");
- switch(PIPETYPE(c->qid.path)){
- case Qdata0:
- p->qref[0]++;
- break;
- case Qdata1:
- p->qref[1]++;
- break;
- }
- }
- qunlock(p);
- }
- return wq;
- }
- static int32_t
- pipestat(Chan *c, uint8_t *db, int32_t n)
- {
- Pipe *p;
- Dir dir;
- p = c->aux;
- switch(PIPETYPE(c->qid.path)){
- case Qdir:
- devdir(c, c->qid, ".", 0, eve, DMDIR|0555, &dir);
- break;
- case Qdata0:
- devdir(c, c->qid, "data", qlen(p->q[0]), eve, 0600, &dir);
- break;
- case Qdata1:
- devdir(c, c->qid, "data1", qlen(p->q[1]), eve, 0600, &dir);
- break;
- default:
- panic("pipestat");
- }
- n = convD2M(&dir, db, n);
- if(n < BIT16SZ)
- error(Eshortstat);
- return n;
- }
- /*
- * if the stream doesn't exist, create it
- */
- static Chan*
- pipeopen(Chan *c, int omode)
- {
- Pipe *p;
- if(c->qid.type & QTDIR){
- if(omode != OREAD)
- error(Ebadarg);
- c->mode = omode;
- c->flag |= COPEN;
- c->offset = 0;
- return c;
- }
- p = c->aux;
- qlock(p);
- switch(PIPETYPE(c->qid.path)){
- case Qdata0:
- p->qref[0]++;
- break;
- case Qdata1:
- p->qref[1]++;
- break;
- }
- qunlock(p);
- c->mode = openmode(omode);
- c->flag |= COPEN;
- c->offset = 0;
- c->iounit = qiomaxatomic;
- return c;
- }
- static void
- pipeclose(Chan *c)
- {
- Pipe *p;
- p = c->aux;
- qlock(p);
- if(c->flag & COPEN){
- /*
- * closing either side hangs up the stream
- */
- switch(PIPETYPE(c->qid.path)){
- case Qdata0:
- p->qref[0]--;
- if(p->qref[0] == 0){
- qhangup(p->q[1], 0);
- qclose(p->q[0]);
- }
- break;
- case Qdata1:
- p->qref[1]--;
- if(p->qref[1] == 0){
- qhangup(p->q[0], 0);
- qclose(p->q[1]);
- }
- break;
- }
- }
- /*
- * if both sides are closed, they are reusable
- */
- if(p->qref[0] == 0 && p->qref[1] == 0){
- qreopen(p->q[0]);
- qreopen(p->q[1]);
- }
- /*
- * free the structure on last close
- */
- p->ref--;
- if(p->ref == 0){
- qunlock(p);
- free(p->q[0]);
- free(p->q[1]);
- free(p);
- } else
- qunlock(p);
- }
- static int32_t
- piperead(Chan *c, void *va, int32_t n, int64_t m)
- {
- Pipe *p;
- p = c->aux;
- switch(PIPETYPE(c->qid.path)){
- case Qdir:
- return devdirread(c, va, n, pipedir, NPIPEDIR, pipegen);
- case Qdata0:
- return qread(p->q[0], va, n);
- case Qdata1:
- return qread(p->q[1], va, n);
- default:
- panic("piperead");
- }
- return -1; /* not reached */
- }
- static Block*
- pipebread(Chan *c, int32_t n, int64_t offset)
- {
- Pipe *p;
- p = c->aux;
- switch(PIPETYPE(c->qid.path)){
- case Qdata0:
- return qbread(p->q[0], n);
- case Qdata1:
- return qbread(p->q[1], n);
- }
- return devbread(c, n, offset);
- }
- /*
- * a write to a closed pipe causes a note to be sent to
- * the process.
- */
- static int32_t
- pipewrite(Chan *c, void *va, int32_t n, int64_t mm)
- {
- Proc *up = externup();
- Pipe *p;
- if(0)if(!islo())
- print("pipewrite hi %#p\n", getcallerpc(&c)); // devmnt?
- if(waserror()) {
- /* avoid notes when pipe is a mounted queue */
- if((c->flag & CMSG) == 0)
- postnote(up, 1, "sys: write on closed pipe", NUser);
- nexterror();
- }
- p = c->aux;
- switch(PIPETYPE(c->qid.path)){
- case Qdata0:
- n = qwrite(p->q[1], va, n);
- break;
- case Qdata1:
- n = qwrite(p->q[0], va, n);
- break;
- default:
- panic("pipewrite");
- }
- poperror();
- return n;
- }
- static int32_t
- pipebwrite(Chan *c, Block *bp, int64_t mm)
- {
- Proc *up = externup();
- int32_t n;
- Pipe *p;
- if(waserror()) {
- /* avoid notes when pipe is a mounted queue */
- if((c->flag & CMSG) == 0)
- postnote(up, 1, "sys: write on closed pipe", NUser);
- nexterror();
- }
- p = c->aux;
- switch(PIPETYPE(c->qid.path)){
- case Qdata0:
- n = qbwrite(p->q[1], bp);
- break;
- case Qdata1:
- n = qbwrite(p->q[0], bp);
- break;
- default:
- n = 0;
- panic("pipebwrite");
- }
- poperror();
- return n;
- }
- Dev pipedevtab = {
- '|',
- "pipe",
- devreset,
- pipeinit,
- devshutdown,
- pipeattach,
- pipewalk,
- pipestat,
- pipeopen,
- devcreate,
- pipeclose,
- piperead,
- pipebread,
- pipewrite,
- pipebwrite,
- devremove,
- devwstat,
- };
|