12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540 |
- #include "u.h"
- #include "../port/lib.h"
- #include "mem.h"
- #include "dat.h"
- #include "fns.h"
- #include "../port/error.h"
- static ulong padblockcnt; /* times padblock had to allocate a new block */
- static ulong concatblockcnt; /* bytes gathered into single blocks by concatblock */
- static ulong pullupblockcnt; /* copies performed by pullupblock */
- static ulong copyblockcnt; /* calls to copyblock */
- static ulong consumecnt; /* sum of the sizes of the blocks qconsume copied from */
- static ulong producecnt; /* bytes queued by qproduce */
- static ulong qcopycnt; /* bytes copied by qcopy */
- static int debugging; /* flipped by every call to ixsummary */
- #define QDEBUG if(0) /* prefix for consistency checks; if(0) leaves them disabled */
- /*
- * IO queues
- *
- * A Queue is a singly linked list of Blocks (bfirst..blast) plus
- * accounting and synchronisation state. The embedded Lock is the
- * one taken by ilock(q)/iunlock(q) throughout this file.
- */
- typedef struct Queue Queue;
- struct Queue
- {
- Lock;
- Block* bfirst; /* first (oldest) block in the queue, nil when empty */
- Block* blast; /* last block; only meaningful while bfirst != nil */
- int len; /* bytes allocated to queue (sum of BALLOC of the blocks) */
- int dlen; /* data bytes in queue (sum of BLEN of the blocks) */
- int limit; /* max bytes in queue; compared against len for flow control */
- int inilim; /* initial limit, restored by qreopen */
- int state; /* flag bits: Qstarve, Qflow, Qclosed, Qmsg, Qcoalesce, Qkick */
- int noblock; /* true if writes return immediately when q full */
- int eof; /* number of eofs read by user; counted in qwait */
- void (*kick)(void*); /* restart output */
- void (*bypass)(void*, Block*); /* bypass queue altogether; qbwrite hands blocks to it */
- void* arg; /* argument to kick and to bypass */
- QLock rlock; /* mutex for reading processes */
- Rendez rr; /* process waiting to read */
- QLock wlock; /* mutex for writing processes */
- Rendez wr; /* process waiting to write */
- char err[ERRMAX]; /* error string set by qclose/qhangup, raised to later users */
- };
- enum
- {
- Maxatomic = 64*1024, /* largest block built by qwrite, qiwrite and mem2bl */
- };
- uint qiomaxatomic = Maxatomic; /* non-static copy of Maxatomic; not used in the visible part of this file */
- /*
-  * flip the debugging flag, call iallocsummary() and print
-  * the statistics counters kept by this file
-  */
- void
- ixsummary(void)
- {
- 	debugging = debugging ^ 1;
- 	iallocsummary();
- 	print("pad %lud, concat %lud, pullup %lud, copy %lud\n", padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
- 	print("consume %lud, produce %lud, qcopy %lud\n", consumecnt, producecnt, qcopycnt);
- }
- /*
-  * release every block on the list starting at b.
-  * a block's link is cleared before freeb only when its
-  * ref count is 1 (presumably the last reference -- confirm
-  * against freeb).
-  */
- void
- freeblist(Block *b)
- {
- 	Block *rest;
- 	while(b != nil){
- 		/* remember the successor before b goes away */
- 		rest = b->next;
- 		if(b->ref == 1)
- 			b->next = nil;
- 		freeb(b);
- 		b = rest;
- 	}
- }
- /*
-  * make room for size bytes in front of the data in bp or,
-  * when size is negative, for -size bytes after it.
-  *
-  * if bp already has the room it is returned (for front padding
-  * with rp moved back by size).  otherwise the data is copied
-  * into a new block and bp is freed.  a block that would need
-  * reallocating must not have a next block: that panics (for
-  * back padding the check is made even when there is room).
-  */
- Block*
- padblock(Block *bp, int size)
- {
- 	int have;
- 	Block *fresh;
- 	QDEBUG checkb(bp, "padblock 1");
- 	if(size < 0){
- 		/* room wanted behind the data */
- 		size = -size;
- 		if(bp->next)
- 			panic("padblock %#p", getcallerpc(&bp));
- 		if(size <= bp->lim - bp->wp)
- 			return bp;
- 		have = BLEN(bp);
- 		padblockcnt++;
- 		fresh = allocb(size+have);
- 		memmove(fresh->wp, bp->rp, have);
- 		fresh->wp += have;
- 		freeb(bp);
- 	} else {
- 		/* room wanted in front of the data */
- 		if(size <= bp->rp - bp->base){
- 			bp->rp -= size;
- 			return bp;
- 		}
- 		if(bp->next)
- 			panic("padblock %#p", getcallerpc(&bp));
- 		have = BLEN(bp);
- 		padblockcnt++;
- 		fresh = allocb(size+have);
- 		/* rp stays at the start of the pad; the data goes size bytes later */
- 		fresh->wp = fresh->rp + size;
- 		memmove(fresh->wp, bp->rp, have);
- 		fresh->wp += have;
- 		freeb(bp);
- 	}
- 	QDEBUG checkb(fresh, "padblock 1");
- 	return fresh;
- }
- /*
-  * total number of data bytes (BLEN) held by the list of
-  * blocks starting at bp; 0 for an empty list
-  */
- int
- blocklen(Block *bp)
- {
- 	int total;
- 	for(total = 0; bp != nil; bp = bp->next)
- 		total += BLEN(bp);
- 	return total;
- }
- /*
-  * total allocated space (BALLOC) of the list of blocks
-  * starting at bp; 0 for an empty list
-  */
- int
- blockalloclen(Block *bp)
- {
- 	int total;
- 	for(total = 0; bp != nil; bp = bp->next)
- 		total += BALLOC(bp);
- 	return total;
- }
- /*
-  * gather the data of a list of blocks into one newly
-  * allocated block and free the list.  a list of exactly
-  * one block is returned as is.
-  */
- Block*
- concatblock(Block *bp)
- {
- 	int n;
- 	Block *whole, *piece;
- 	if(bp->next == nil)
- 		return bp;
- 	whole = allocb(blocklen(bp));
- 	piece = bp;
- 	while(piece != nil){
- 		n = BLEN(piece);
- 		memmove(whole->wp, piece->rp, n);
- 		whole->wp += n;
- 		piece = piece->next;
- 	}
- 	concatblockcnt += BLEN(whole);
- 	freeblist(bp);
- 	QDEBUG checkb(whole, "concatblock 1");
- 	return whole;
- }
- /*
- * make sure the first block has at least n bytes
- *
- * bp is the head of a list of blocks. If the head already holds
- * n bytes it is returned untouched. Otherwise bytes are moved
- * from the following blocks into the head (a new head is
- * allocated first if the old one cannot hold n bytes from rp);
- * blocks emptied on the way are unlinked and freed.
- * Returns the head, or 0 after freeing what is left when the
- * whole list holds fewer than n bytes.
- */
- Block*
- pullupblock(Block *bp, int n)
- {
- int i;
- Block *nbp;
- /*
- * this should almost always be true, it's
- * just to avoid every caller checking.
- */
- if(BLEN(bp) >= n)
- return bp;
- /*
- * if not enough room in the first block,
- * add another to the front of the list.
- */
- if(bp->lim - bp->rp < n){
- nbp = allocb(n);
- nbp->next = bp;
- bp = nbp;
- }
- /*
- * copy bytes from the trailing blocks into the first.
- * from here on n is the number of bytes still missing.
- */
- n -= BLEN(bp);
- while(nbp = bp->next){
- i = BLEN(nbp);
- if(i > n) {
- /* next block has more than we need: take n bytes and leave the rest in it */
- memmove(bp->wp, nbp->rp, n);
- pullupblockcnt++;
- bp->wp += n;
- nbp->rp += n;
- QDEBUG checkb(bp, "pullupblock 1");
- return bp;
- } else {
- /* shouldn't happen but why crash if it does */
- if(i < 0){
- print("pullup negative length packet, called from %#p\n",
- getcallerpc(&bp));
- i = 0;
- }
- /* take the whole of the next block, then unlink and free it */
- memmove(bp->wp, nbp->rp, i);
- pullupblockcnt++;
- bp->wp += i;
- bp->next = nbp->next;
- nbp->next = 0;
- freeb(nbp);
- n -= i;
- if(n == 0){
- QDEBUG checkb(bp, "pullupblock 2");
- return bp;
- }
- }
- }
- freeb(bp); /* list exhausted (bp->next is nil here) before n bytes were found */
- return 0;
- }
- /*
-  * make sure the first block of q holds at least n bytes.
-  * if it does not, pullupblock rebuilds the head of the list
-  * and q->blast is recomputed by walking to the end.
-  * returns q->bfirst, which is nil if pullupblock failed.
-  */
- Block*
- pullupqueue(Queue *q, int n)
- {
- 	Block *tail;
- 	if(BLEN(q->bfirst) < n){
- 		q->bfirst = pullupblock(q->bfirst, n);
- 		tail = q->bfirst;
- 		if(tail != nil)
- 			while(tail->next != nil)
- 				tail = tail->next;
- 		q->blast = tail;
- 	}
- 	return q->bfirst;
- }
- /*
-  * cut a list of blocks down to the len bytes that start
-  * offset bytes into its data.  blocks wholly before or after
-  * that range are freed.  if the list holds fewer than
-  * offset+len bytes the whole list is freed and nil returned.
-  */
- Block *
- trimblock(Block *bp, int offset, int len)
- {
- 	ulong l;
- 	Block *rest, *head;
- 	QDEBUG checkb(bp, "trimblock 1");
- 	if(blocklen(bp) < offset+len){
- 		freeblist(bp);
- 		return nil;
- 	}
- 	/* discard whole blocks that end before offset */
- 	for(;;){
- 		l = BLEN(bp);
- 		if(l >= offset)
- 			break;
- 		offset -= l;
- 		rest = bp->next;
- 		bp->next = nil;
- 		freeb(bp);
- 		bp = rest;
- 	}
- 	head = bp;
- 	bp->rp += offset;
- 	/* walk to the block in which the wanted range ends */
- 	for(;;){
- 		l = BLEN(bp);
- 		if(l >= len)
- 			break;
- 		len -= l;
- 		bp = bp->next;
- 	}
- 	/* chop the tail of that block and everything after it */
- 	bp->wp -= (BLEN(bp) - len);
- 	if(bp->next != nil){
- 		freeblist(bp->next);
- 		bp->next = nil;
- 	}
- 	return head;
- }
- /*
-  * return a new block holding the first count bytes of the
-  * list bp.  the list itself is left alone.  if the list is
-  * shorter than count the remainder of the new block is
-  * zero filled.
-  */
- Block*
- copyblock(Block *bp, int count)
- {
- 	int chunk;
- 	Block *copy;
- 	QDEBUG checkb(bp, "copyblock 0");
- 	copy = allocb(count);
- 	while(count > 0 && bp != nil){
- 		chunk = BLEN(bp);
- 		if(chunk > count)
- 			chunk = count;
- 		memmove(copy->wp, bp->rp, chunk);
- 		copy->wp += chunk;
- 		count -= chunk;
- 		bp = bp->next;
- 	}
- 	if(count > 0){
- 		/* source ran out: pad with zeros */
- 		memset(copy->wp, 0, count);
- 		copy->wp += count;
- 	}
- 	copyblockcnt++;
- 	QDEBUG checkb(copy, "copyblock 1");
- 	return copy;
- }
- /*
-  * make bp hold exactly len bytes of data.
-  * negative len: bp is freed and nil returned.
-  * len beyond the block's limit: the data is copied (zero
-  * padded) into a new block by copyblock and bp's list freed.
-  * otherwise wp is moved, zero filling any bytes gained.
-  */
- Block*
- adjustblock(Block* bp, int len)
- {
- 	int have;
- 	Block *bigger;
- 	if(len < 0){
- 		freeb(bp);
- 		return nil;
- 	}
- 	if(bp->rp+len <= bp->lim){
- 		/* fits in place */
- 		have = BLEN(bp);
- 		if(len > have)
- 			memset(bp->wp, 0, len-have);
- 		bp->wp = bp->rp+len;
- 		QDEBUG checkb(bp, "adjustblock 2");
- 		return bp;
- 	}
- 	bigger = copyblock(bp, len);
- 	freeblist(bp);
- 	QDEBUG checkb(bigger, "adjustblock 1");
- 	return bigger;
- }
- /*
-  * drop up to count bytes from the front of the list *bph.
-  * blocks that become empty are unlinked and freed, and
-  * *bph is advanced past them.  returns the number of bytes
-  * dropped; 0 if bph itself is nil.
-  */
- int
- pullblock(Block **bph, int count)
- {
- 	Block *head;
- 	int n, dropped;
- 	if(bph == nil)
- 		return 0;
- 	dropped = 0;
- 	for(head = *bph; head != nil && count != 0; head = *bph){
- 		n = BLEN(head);
- 		if(n > count)
- 			n = count;
- 		dropped += n;
- 		count -= n;
- 		head->rp += n;
- 		QDEBUG checkb(head, "pullblock ");
- 		if(BLEN(head) == 0){
- 			*bph = head->next;
- 			head->next = nil;
- 			freeb(head);
- 		}
- 	}
- 	return dropped;
- }
- /*
-  * take the first block off the queue and return it.
-  * an empty queue is marked Qstarve and nil is returned.
-  * a flow controlled writer is woken once the queue has
-  * drained below half its limit.
-  */
- Block*
- qget(Queue *q)
- {
- 	int wake;
- 	Block *b;
- 	/* sync with qwrite */
- 	ilock(q);
- 	b = q->bfirst;
- 	if(b == nil){
- 		q->state |= Qstarve;
- 		iunlock(q);
- 		return nil;
- 	}
- 	q->bfirst = b->next;
- 	b->next = nil;
- 	q->len -= BALLOC(b);
- 	q->dlen -= BLEN(b);
- 	QDEBUG checkb(b, "qget");
- 	wake = 0;
- 	if((q->state & Qflow) && q->len < q->limit/2){
- 		q->state &= ~Qflow;
- 		wake = 1;
- 	}
- 	iunlock(q);
- 	/* the wakeup is done outside the ilock */
- 	if(wake)
- 		wakeup(&q->wr);
- 	return b;
- }
- /*
-  * throw away up to len bytes from the front of the queue.
-  * returns the number of bytes actually discarded, which is
-  * less than len if the queue ran out.
-  */
- int
- qdiscard(Queue *q, int len)
- {
- 	Block *b;
- 	int wake, n, sofar, want;
- 	ilock(q);
- 	sofar = 0;
- 	while(sofar < len && (b = q->bfirst) != nil){
- 		QDEBUG checkb(b, "qdiscard");
- 		n = BLEN(b);
- 		want = len - sofar;
- 		if(n > want){
- 			/* only part of this block goes: advance its read pointer */
- 			n = want;
- 			b->rp += n;
- 			q->dlen -= n;
- 		} else {
- 			/* the whole block goes */
- 			q->bfirst = b->next;
- 			b->next = 0;
- 			q->len -= BALLOC(b);
- 			q->dlen -= BLEN(b);
- 			freeb(b);
- 		}
- 		sofar += n;
- 	}
- 	/*
- 	 * restart a flow controlled writer.
- 	 *
- 	 * the test used to be q->len < q->limit/2, but that slowed
- 	 * tcp down too much for certain write sizes.  presotto's
- 	 * note says it is not completely understood; possibly the
- 	 * queue drains so fast that transmission stalls waiting
- 	 * for the application to produce more data.
- 	 */
- 	wake = (q->state & Qflow) && q->len < q->limit;
- 	if(wake)
- 		q->state &= ~Qflow;
- 	iunlock(q);
- 	if(wake)
- 		wakeup(&q->wr);
- 	return sofar;
- }
- /*
- * Interrupt level copy out of a queue, return # bytes copied.
- *
- * Copies at most len bytes from the first non-empty block into vp.
- * Returns -1, after setting Qstarve, if the queue holds no block
- * with data. Blocks to be released are collected on a private
- * list and freed only after the queue lock has been dropped.
- */
- int
- qconsume(Queue *q, void *vp, int len)
- {
- Block *b;
- int n, dowakeup;
- uchar *p = vp;
- Block *tofree = nil;
- /* sync with qwrite */
- ilock(q);
- for(;;) {
- b = q->bfirst;
- if(b == 0){
- q->state |= Qstarve;
- iunlock(q);
- return -1;
- }
- QDEBUG checkb(b, "qconsume 1");
- n = BLEN(b);
- if(n > 0)
- break;
- /* zero length block: unlink it and keep looking */
- q->bfirst = b->next;
- q->len -= BALLOC(b);
- /* remember to free this */
- b->next = tofree;
- tofree = b;
- };
- if(n < len)
- len = n; /* never copy more than the block holds */
- memmove(p, b->rp, len);
- consumecnt += n; /* note: adds the block's size, not the bytes copied */
- b->rp += len;
- q->dlen -= len;
- /* discard the block if we're done with it; on a Qmsg queue any unread remainder is dropped with it */
- if((q->state & Qmsg) || len == n){
- q->bfirst = b->next;
- b->next = 0;
- q->len -= BALLOC(b);
- q->dlen -= BLEN(b);
- /* remember to free this */
- b->next = tofree;
- tofree = b;
- }
- /* if writer flow controlled, restart */
- if((q->state & Qflow) && q->len < q->limit/2){
- q->state &= ~Qflow;
- dowakeup = 1;
- } else
- dowakeup = 0;
- iunlock(q);
- if(dowakeup)
- wakeup(&q->wr);
- if(tofree != nil)
- freeblist(tofree);
- return len;
- }
- /*
-  * append the list of blocks b to the queue, honouring the limit.
-  *
-  * a queue already at its limit rejects the list: it is freed
-  * and -1 returned.  a closed queue frees the list and returns
-  * BALLOC of its first block.  otherwise the return value is
-  * the allocated size of everything appended.  Qflow is set once
-  * the queue is half full and a starved reader is woken.
-  */
- int
- qpass(Queue *q, Block *b)
- {
- 	int data, alloc, wake;
- 	/* sync with qread */
- 	wake = 0;
- 	ilock(q);
- 	if(q->len >= q->limit){
- 		freeblist(b);
- 		iunlock(q);
- 		return -1;
- 	}
- 	if(q->state & Qclosed){
- 		alloc = BALLOC(b);
- 		freeblist(b);
- 		iunlock(q);
- 		return alloc;
- 	}
- 	/* link the list behind the current tail */
- 	if(q->bfirst != nil)
- 		q->blast->next = b;
- 	else
- 		q->bfirst = b;
- 	/* add up the sizes while walking to the new tail */
- 	alloc = 0;
- 	data = 0;
- 	for(;;){
- 		QDEBUG checkb(b, "qpass");
- 		alloc += BALLOC(b);
- 		data += BLEN(b);
- 		if(b->next == nil)
- 			break;
- 		b = b->next;
- 	}
- 	q->blast = b;
- 	q->len += alloc;
- 	q->dlen += data;
- 	if(q->len >= q->limit/2)
- 		q->state |= Qflow;
- 	if(q->state & Qstarve){
- 		q->state &= ~Qstarve;
- 		wake = 1;
- 	}
- 	iunlock(q);
- 	if(wake)
- 		wakeup(&q->rr);
- 	return alloc;
- }
- /*
-  * append the list of blocks b to the queue without checking
-  * the queue limit (compare qpass, which rejects the list when
-  * the queue is full).
-  *
-  * a closed queue frees the list and returns BALLOC of its
-  * first block.  otherwise the return value is the allocated
-  * size of everything appended.  Qflow is set once the queue is
-  * half full and a starved reader is woken.
-  */
- int
- qpassnolim(Queue *q, Block *b)
- {
- 	int dlen, len, dowakeup;
- 	/* sync with qread */
- 	dowakeup = 0;
- 	ilock(q);
- 	if(q->state & Qclosed){
- 		/* read the size before the block is released, as qpass does */
- 		len = BALLOC(b);
- 		freeblist(b);
- 		iunlock(q);
- 		return len;
- 	}
- 	/* add buffer to queue */
- 	if(q->bfirst)
- 		q->blast->next = b;
- 	else
- 		q->bfirst = b;
- 	len = BALLOC(b);
- 	dlen = BLEN(b);
- 	QDEBUG checkb(b, "qpass");
- 	/* walk to the end of the list, adding up sizes */
- 	while(b->next){
- 		b = b->next;
- 		QDEBUG checkb(b, "qpass");
- 		len += BALLOC(b);
- 		dlen += BLEN(b);
- 	}
- 	q->blast = b;
- 	q->len += len;
- 	q->dlen += dlen;
- 	if(q->len >= q->limit/2)
- 		q->state |= Qflow;
- 	if(q->state & Qstarve){
- 		q->state &= ~Qstarve;
- 		dowakeup = 1;
- 	}
- 	iunlock(q);
- 	/* the wakeup is done outside the ilock */
- 	if(dowakeup)
- 		wakeup(&q->rr);
- 	return len;
- }
- /*
-  * walk a list of blocks and replace every block whose data
-  * occupies less than a quarter of its allocation by a newly
-  * allocated block of just the data size.  returns the head
-  * of the (possibly changed) list.
-  */
- Block*
- packblock(Block *bp)
- {
- 	Block **link, *old, *small;
- 	int used;
- 	for(link = &bp; (old = *link) != nil; link = &(*link)->next){
- 		used = BLEN(old);
- 		if((used<<2) >= BALLOC(old))
- 			continue;
- 		small = allocb(used);
- 		memmove(small->wp, old->rp, used);
- 		small->wp += used;
- 		small->next = old->next;
- 		/* splice the replacement in where the old block was */
- 		*link = small;
- 		freeb(old);
- 	}
- 	return bp;
- }
- /*
-  * copy len bytes from vp into a block obtained from iallocb
-  * and append it to the queue.
-  *
-  * returns len on success, -1 (after setting Qflow) if the queue
-  * is already at its limit, and 0 if no block could be allocated.
-  */
- int
- qproduce(Queue *q, void *vp, int len)
- {
- 	Block *nb;
- 	int wake;
- 	/* sync with qread */
- 	wake = 0;
- 	ilock(q);
- 	/* is there room in the buffer? */
- 	if(q->len >= q->limit){
- 		q->state |= Qflow;
- 		iunlock(q);
- 		return -1;
- 	}
- 	nb = iallocb(len);
- 	if(nb == nil){
- 		iunlock(q);
- 		return 0;
- 	}
- 	memmove(nb->wp, vp, len);
- 	producecnt += len;
- 	nb->wp += len;
- 	/* append; nb->next was already cleared by iallocb() */
- 	if(q->bfirst != nil)
- 		q->blast->next = nb;
- 	else
- 		q->bfirst = nb;
- 	q->blast = nb;
- 	q->len += BALLOC(nb);
- 	q->dlen += BLEN(nb);
- 	QDEBUG checkb(nb, "qproduce");
- 	if(q->state & Qstarve){
- 		q->state &= ~Qstarve;
- 		wake = 1;
- 	}
- 	if(q->len >= q->limit)
- 		q->state |= Qflow;
- 	iunlock(q);
- 	if(wake)
- 		wakeup(&q->rr);
- 	return len;
- }
- /*
- * copy from offset in the queue
- *
- * Returns a newly allocated block holding up to len bytes of the
- * queued data starting offset bytes in. Nothing is removed from
- * the queue. The block holds fewer than len bytes (possibly
- * none) if the queue does not reach that far. The block is
- * allocated before the queue lock is taken.
- */
- Block*
- qcopy(Queue *q, int len, ulong offset)
- {
- int sofar;
- int n;
- Block *b, *nb;
- uchar *p;
- nb = allocb(len);
- ilock(q);
- /* go to offset: find the block containing it and point p at that byte */
- b = q->bfirst;
- for(sofar = 0; ; sofar += n){
- if(b == nil){
- iunlock(q); /* offset is beyond the queued data: return the empty block */
- return nb;
- }
- n = BLEN(b);
- if(sofar + n > offset){
- p = b->rp + offset - sofar;
- n -= offset - sofar; /* bytes of this block from offset onwards */
- break;
- }
- QDEBUG checkb(b, "qcopy");
- b = b->next;
- }
- /* copy bytes from there; sofar now counts bytes copied */
- for(sofar = 0; sofar < len;){
- if(n > len - sofar)
- n = len - sofar;
- memmove(nb->wp, p, n);
- qcopycnt += n;
- sofar += n;
- nb->wp += n;
- b = b->next;
- if(b == nil)
- break;
- n = BLEN(b);
- p = b->rp;
- }
- iunlock(q);
- return nb;
- }
- /*
-  * allocate and set up a queue; called by non-interrupt code.
-  *
-  * limit is both the current and the initial byte limit, msg
-  * supplies the initial state bits, and kick(arg) is the
-  * routine used to restart output.  the queue starts out
-  * marked Qstarve.  returns 0 if malloc fails.  fields not
-  * assigned here are left as malloc returned them.
-  */
- Queue*
- qopen(int limit, int msg, void (*kick)(void*), void *arg)
- {
- 	Queue *q;
- 	q = malloc(sizeof(Queue));
- 	if(q == nil)
- 		return nil;
- 	q->inilim = limit;
- 	q->limit = limit;
- 	q->kick = kick;
- 	q->arg = arg;
- 	q->state = msg | Qstarve;
- 	q->eof = 0;
- 	q->noblock = 0;
- 	return q;
- }
- /*
-  * open a queue to be bypassed: qbwrite hands every block
-  * straight to bypass(arg, block) instead of queueing it.
-  * returns 0 if malloc fails.
-  */
- Queue*
- qbypass(void (*bypass)(void*, Block*), void *arg)
- {
- 	Queue *q;
- 	q = malloc(sizeof(Queue));
- 	if(q == nil)
- 		return nil;
- 	q->bypass = bypass;
- 	q->arg = arg;
- 	q->limit = 0;
- 	q->state = 0;
- 	return q;
- }
- /*
-  * sleep condition for readers: true once the queue
-  * holds a block or has been closed
-  */
- static int
- notempty(void *a)
- {
- 	Queue *q;
- 	q = a;
- 	return q->bfirst != nil || (q->state & Qclosed);
- }
- /*
- * wait for the queue to be non-empty or closed.
- * called with q ilocked.
- *
- * Returns 1 when there is at least one block in the queue,
- * 0 when the queue is empty and closed and the caller should
- * report end of file, and -1 when the caller should raise
- * q->err instead. q is ilocked again on return; the lock is
- * dropped only around the sleep.
- */
- static int
- qwait(Queue *q)
- {
- /* wait for data */
- for(;;){
- if(q->bfirst != nil)
- break;
- if(q->state & Qclosed){
- if(++q->eof > 3) /* more than three eofs already read: make it an error */
- return -1;
- if(*q->err && strcmp(q->err, Ehungup) != 0) /* closed with a message other than Ehungup */
- return -1;
- return 0;
- }
- q->state |= Qstarve; /* flag requesting producer to wake me */
- iunlock(q);
- sleep(&q->rr, notempty, q);
- ilock(q);
- }
- return 1;
- }
- /*
-  * append a list of blocks to the queue and account for its
-  * allocated and data sizes.  no locking, flow control or
-  * wakeups are done here.  b must not be nil.
-  */
- void
- qaddlist(Queue *q, Block *b)
- {
- 	Block *tail;
- 	if(q->bfirst == nil)
- 		q->bfirst = b;
- 	else
- 		q->blast->next = b;
- 	q->len += blockalloclen(b);
- 	q->dlen += blocklen(b);
- 	/* the end of the added list becomes the end of the queue */
- 	for(tail = b; tail->next != nil; tail = tail->next)
- 		;
- 	q->blast = tail;
- }
- /*
-  * unlink and return the first block of the queue, or nil if
-  * the queue is empty.  called with q ilocked.
-  */
- Block*
- qremove(Queue *q)
- {
- 	Block *head;
- 	head = q->bfirst;
- 	if(head != nil){
- 		q->bfirst = head->next;
- 		head->next = nil;
- 		q->dlen -= BLEN(head);
- 		q->len -= BALLOC(head);
- 		QDEBUG checkb(head, "qremove");
- 	}
- 	return head;
- }
- /*
-  * copy up to n bytes from the list of blocks b to p.
-  * blocks that have been copied completely are freed.
-  * returns the block that still holds uncopied data, with
-  * its rp advanced, or nil if the whole list was consumed.
-  */
- Block*
- bl2mem(uchar *p, Block *b, int n)
- {
- 	int i;
- 	Block *next;
- 	while(b != nil){
- 		i = BLEN(b);
- 		if(i > n){
- 			/* destination is full part way through this block */
- 			memmove(p, b->rp, n);
- 			b->rp += n;
- 			return b;
- 		}
- 		memmove(p, b->rp, i);
- 		p += i;
- 		n -= i;
- 		b->rp += i;
- 		next = b->next;
- 		freeb(b);
- 		b = next;
- 	}
- 	return nil;
- }
- /*
- * copy the contents of memory into a string of blocks.
- *
- * The len bytes at p are split into blocks of at most Maxatomic
- * bytes each; at least one block is always allocated, even for
- * len <= 0 on entry to the loop. If an error is raised while
- * building the list, the blocks linked so far are freed and the
- * error is passed on with nexterror(); nil is not returned on
- * that path.
- */
- Block*
- mem2bl(uchar *p, int len)
- {
- int n;
- Block *b, *first, **l;
- first = nil;
- l = &first;
- if(waserror()){
- freeblist(first);
- nexterror();
- }
- do {
- n = len;
- if(n > Maxatomic)
- n = Maxatomic;
- *l = b = allocb(n);
- setmalloctag(b, (up->text[0]<<24)|(up->text[1]<<16)|(up->text[2]<<8)|up->text[3]); /* tag built from the first four bytes of up->text */
- memmove(b->wp, p, n);
- b->wp += n;
- p += n;
- len -= n;
- l = &b->next;
- } while(len > 0);
- poperror();
- return first;
- }
- /*
-  * push block b back onto the front of the queue and add its
-  * sizes to the queue's accounting.  called with q ilocked.
-  */
- void
- qputback(Queue *q, Block *b)
- {
- 	/* on an empty queue b is the tail as well as the head */
- 	if(q->bfirst == nil)
- 		q->blast = b;
- 	b->next = q->bfirst;
- 	q->bfirst = b;
- 	q->dlen += BLEN(b);
- 	q->len += BALLOC(b);
- }
- /*
-  * release the queue lock and, if a writer was flow controlled
-  * and the queue has drained below half its limit, get the
-  * producer going again: call kick, if any, and wake the writer.
-  * called with q ilocked; returns with it unlocked.
-  */
- static void
- qwakeup_iunlock(Queue *q)
- {
- 	if(!(q->state & Qflow) || q->len >= q->limit/2){
- 		/* nobody to restart */
- 		iunlock(q);
- 		return;
- 	}
- 	q->state &= ~Qflow;
- 	iunlock(q);
- 	/* kick and wakeup are done outside the ilock */
- 	if(q->kick)
- 		q->kick(q->arg);
- 	wakeup(&q->wr);
- }
- /*
- * get next block from a queue (up to a limit)
- *
- * Waits in qwait until a block is queued or the queue is closed.
- * Returns the first block, cut down to at most len bytes, or nil
- * at end of file on a closed queue; raises q->err when qwait
- * reports an error. On a non-message queue the bytes beyond len
- * are copied into a new block that is put back at the head of
- * the queue; on a Qmsg queue they are discarded.
- * Readers are serialised by q->rlock.
- */
- Block*
- qbread(Queue *q, int len)
- {
- Block *b, *nb;
- int n;
- qlock(&q->rlock);
- if(waserror()){
- qunlock(&q->rlock);
- nexterror();
- }
- ilock(q);
- switch(qwait(q)){
- case 0:
- /* queue closed */
- iunlock(q);
- qunlock(&q->rlock);
- poperror();
- return nil;
- case -1:
- /* multiple reads on a closed queue */
- iunlock(q);
- error(q->err); /* the waserror handler above releases rlock */
- }
- /* if we get here, there's at least one block in the queue */
- b = qremove(q);
- n = BLEN(b);
- /* split block if it's too big and this is not a message queue */
- nb = b;
- if(n > len){
- if((q->state&Qmsg) == 0){
- n -= len;
- b = allocb(n);
- memmove(b->wp, nb->rp+len, n);
- b->wp += n;
- qputback(q, b);
- }
- nb->wp = nb->rp + len; /* truncate what is returned to len bytes */
- }
- /* restart producer */
- qwakeup_iunlock(q);
- poperror();
- qunlock(&q->rlock);
- return nb;
- }
- /*
- * read a queue. if no data is queued, wait in qwait until
- * some arrives or the queue is closed.
- *
- * Copies up to len bytes into vp and returns the number copied;
- * returns 0 at end of file on a closed queue and raises q->err
- * when qwait reports an error. On a Qcoalesce queue the first
- * block plus as many following blocks as fit completely in len
- * are taken; otherwise exactly one block is taken. A left over
- * partial block is freed on a Qmsg queue and put back otherwise.
- * Readers are serialised by q->rlock.
- */
- long
- qread(Queue *q, void *vp, int len)
- {
- Block *b, *first, **l;
- int m, n;
- qlock(&q->rlock);
- if(waserror()){
- qunlock(&q->rlock);
- nexterror();
- }
- ilock(q);
- again:
- switch(qwait(q)){
- case 0:
- /* queue closed */
- iunlock(q);
- qunlock(&q->rlock);
- poperror();
- return 0;
- case -1:
- /* multiple reads on a closed queue */
- iunlock(q);
- error(q->err); /* the waserror handler above releases rlock */
- }
- /* if we get here, there's at least one block in the queue */
- if(q->state & Qcoalesce){
- /* when coalescing, 0 length blocks just go away */
- b = q->bfirst;
- if(BLEN(b) <= 0){
- freeb(qremove(q));
- goto again;
- }
- /* grab the first block plus as many
- * following blocks as will completely
- * fit in the read.
- * n counts the bytes on the list being built at first.
- */
- n = 0;
- l = &first;
- m = BLEN(b);
- for(;;) {
- *l = qremove(q); /* removes b, the current head */
- l = &b->next;
- n += m;
- b = q->bfirst;
- if(b == nil)
- break;
- m = BLEN(b);
- if(n+m > len)
- break;
- }
- } else {
- first = qremove(q);
- n = BLEN(first);
- }
- /* copy to user space outside of the ilock */
- iunlock(q);
- b = bl2mem(vp, first, len);
- ilock(q);
- /* take care of any left over partial block */
- if(b != nil){
- n -= BLEN(b); /* bytes left in b were not copied */
- if(q->state & Qmsg)
- freeb(b);
- else
- qputback(q, b);
- }
- /* restart producer */
- qwakeup_iunlock(q);
- poperror();
- qunlock(&q->rlock);
- return n;
- }
- /*
-  * sleep condition for writers: true once the queue is
-  * below its limit or has been closed
-  */
- static int
- qnotfull(void *a)
- {
- 	Queue *q;
- 	q = a;
- 	return (q->state & Qclosed) || q->len < q->limit;
- }
- ulong noblockcnt; /* bytes dropped by qbwrite because a non-blocking queue was full */
- /*
- * add a block to a queue obeying flow control
- *
- * Returns BLEN(b) as measured on entry. A bypass queue hands the
- * block to q->bypass and returns at once. On a closed queue
- * q->err is raised and the error handler frees b. On a full
- * queue with noblock set the block is freed, its size added to
- * noblockcnt and the call still returns n. Otherwise b alone
- * (b->next is cleared) is appended, the reader is woken if it
- * was starved, and the caller sleeps until the queue is below
- * its limit. Writers are serialised by q->wlock.
- */
- long
- qbwrite(Queue *q, Block *b)
- {
- int n, dowakeup;
- Proc *p;
- n = BLEN(b);
- if(q->bypass){
- (*q->bypass)(q->arg, b);
- return n;
- }
- dowakeup = 0;
- qlock(&q->wlock);
- if(waserror()){
- if(b != nil) /* b is set to nil once it has been queued */
- freeb(b);
- qunlock(&q->wlock);
- nexterror();
- }
- ilock(q);
- /* give up if the queue is closed */
- if(q->state & Qclosed){
- iunlock(q);
- error(q->err);
- }
- /* if nonblocking, don't queue over the limit */
- if(q->len >= q->limit){
- if(q->noblock){
- iunlock(q);
- freeb(b);
- noblockcnt += n;
- qunlock(&q->wlock);
- poperror();
- return n;
- }
- }
- /* queue the block */
- if(q->bfirst)
- q->blast->next = b;
- else
- q->bfirst = b;
- q->blast = b;
- b->next = 0;
- q->len += BALLOC(b);
- q->dlen += n;
- QDEBUG checkb(b, "qbwrite");
- b = nil; /* the queue owns the block now; keeps the error handler from freeing it */
- /* make sure other end gets awakened */
- if(q->state & Qstarve){
- q->state &= ~Qstarve;
- dowakeup = 1;
- }
- iunlock(q);
- /* get output going again */
- if(q->kick && (dowakeup || (q->state&Qkick)))
- q->kick(q->arg);
- /* wakeup anyone consuming at the other end */
- if(dowakeup){
- p = wakeup(&q->rr);
- /* if we just wokeup a higher priority process, let it run */
- if(p != nil && p->priority > up->priority)
- sched();
- }
- /*
- * flow control, wait for queue to get below the limit
- * before allowing the process to continue and queue
- * more. We do this here so that postnote can only
- * interrupt us after the data has been queued. This
- * means that things like 9p flushes and ssl messages
- * will not be disrupted by software interrupts.
- *
- * Note - this is moderately dangerous since a process
- * that keeps getting interrupted and rewriting will
- * queue infinite crud.
- */
- for(;;){
- if(q->noblock || qnotfull(q))
- break;
- ilock(q);
- q->state |= Qflow; /* ask the readers to wake us when the queue drains */
- iunlock(q);
- sleep(&q->wr, qnotfull, q);
- }
- USED(b);
- qunlock(&q->wlock);
- poperror();
- return n;
- }
- /*
- * write to a queue. only Maxatomic bytes at a time is atomic.
- *
- * The len bytes at vp are copied into blocks of at most
- * Maxatomic bytes, each passed to qbwrite. On a Qmsg queue the
- * loop stops after the first block. The return value is always
- * len. NOTE(review): that includes the Qmsg case with
- * len > Maxatomic, where fewer than len bytes were queued.
- */
- int
- qwrite(Queue *q, void *vp, int len)
- {
- int n, sofar;
- Block *b;
- uchar *p = vp;
- QDEBUG if(!islo())
- print("qwrite hi %#p\n", getcallerpc(&q));
- sofar = 0;
- do {
- n = len-sofar;
- if(n > Maxatomic)
- n = Maxatomic;
- b = allocb(n);
- setmalloctag(b, (up->text[0]<<24)|(up->text[1]<<16)|(up->text[2]<<8)|up->text[3]); /* tag built from the first four bytes of up->text */
- if(waserror()){
- freeb(b); /* an error raised during the copy must not leak the block */
- nexterror();
- }
- memmove(b->wp, p+sofar, n);
- poperror();
- b->wp += n;
- qbwrite(q, b);
- sofar += n;
- } while(sofar < len && (q->state & Qmsg) == 0);
- return len;
- }
- /*
- * used by print() to write to a queue. Since we may be splhi or not in
- * a process, don't qlock.
- *
- * The data is copied into blocks of at most Maxatomic bytes
- * obtained from iallocb and appended under the ilock only.
- * Writing stops early when iallocb fails or when the queue
- * already holds 16*1024 or more data bytes. On a Qmsg queue at
- * most one block is written. Returns the number of bytes queued.
- *
- * NOTE(review): an earlier version of this comment said the
- * routine merges adjacent blocks if block n+1 will fit into
- * the free space of block n; the code below does no merging.
- */
- int
- qiwrite(Queue *q, void *vp, int len)
- {
- int n, sofar, dowakeup;
- Block *b;
- uchar *p = vp;
- dowakeup = 0;
- sofar = 0;
- do {
- n = len-sofar;
- if(n > Maxatomic)
- n = Maxatomic;
- b = iallocb(n);
- if(b == nil)
- break;
- memmove(b->wp, p+sofar, n);
- b->wp += n;
- ilock(q);
- /* we use an artificially high limit for kernel prints since anything
- * over the limit gets dropped
- */
- if(q->dlen >= 16*1024){
- iunlock(q);
- freeb(b);
- break;
- }
- QDEBUG checkb(b, "qiwrite");
- if(q->bfirst)
- q->blast->next = b;
- else
- q->bfirst = b;
- q->blast = b;
- q->len += BALLOC(b);
- q->dlen += n;
- if(q->state & Qstarve){
- q->state &= ~Qstarve;
- dowakeup = 1;
- }
- iunlock(q);
- if(dowakeup){ /* dowakeup is never reset, so once set every later block kicks and wakes too */
- if(q->kick)
- q->kick(q->arg);
- wakeup(&q->rr);
- }
- sofar += n;
- } while(sofar < len && (q->state & Qmsg) == 0);
- return sofar;
- }
- /*
-  * close a queue and release its memory.
-  * there is no reference accounting, so the caller must be
-  * certain nothing else still uses q: be extremely careful.
-  */
- void
- qfree(Queue *q)
- {
- 	/* qclose releases the queued blocks and wakes any sleepers first */
- 	qclose(q);
- 	free(q);
- }
- /*
-  * mark a queue as closed and release all queued blocks.
-  * the error string becomes Ehungup, the flow control and
-  * starve flags and the noblock setting are cleared, and
-  * sleeping readers and writers are woken.  a nil q is ignored.
-  */
- void
- qclose(Queue *q)
- {
- 	Block *list;
- 	if(q == nil)
- 		return;
- 	ilock(q);
- 	q->state = (q->state | Qclosed) & ~(Qflow|Qstarve);
- 	strcpy(q->err, Ehungup);
- 	/* detach the blocks so they can be freed outside the ilock */
- 	list = q->bfirst;
- 	q->bfirst = nil;
- 	q->len = 0;
- 	q->dlen = 0;
- 	q->noblock = 0;
- 	iunlock(q);
- 	freeblist(list);
- 	wakeup(&q->rr);
- 	wakeup(&q->wr);
- }
- /*
-  * Mark a queue as closed.  Wakeup any readers and writers.
-  * Don't remove queued blocks.
-  *
-  * msg becomes the queue's error string; a nil or empty msg
-  * selects Ehungup.  A msg too long for q->err is truncated
-  * to ERRMAX-1 bytes.
-  */
- void
- qhangup(Queue *q, char *msg)
- {
- 	/* mark it */
- 	ilock(q);
- 	q->state |= Qclosed;
- 	if(msg == 0 || *msg == 0)
- 		strcpy(q->err, Ehungup);
- 	else {
- 		strncpy(q->err, msg, ERRMAX-1);
- 		/* strncpy does not terminate a truncated copy */
- 		q->err[ERRMAX-1] = 0;
- 	}
- 	iunlock(q);
- 	/* wake up readers/writers */
- 	wakeup(&q->rr);
- 	wakeup(&q->wr);
- }
- /*
-  * non-zero if the queue has been closed or hung up
-  * (the Qclosed bit of the state, read without locking)
-  */
- int
- qisclosed(Queue *q)
- {
- 	return q->state & Qclosed;
- }
- /*
-  * make a closed or hung up queue usable again: clear Qclosed,
-  * mark it Qstarve, reset the eof count and restore the
-  * initial limit
-  */
- void
- qreopen(Queue *q)
- {
- 	ilock(q);
- 	q->state = (q->state & ~Qclosed) | Qstarve;
- 	q->limit = q->inilim;
- 	q->eof = 0;
- 	iunlock(q);
- }
- /*
-  * number of data bytes currently queued
-  * (read without locking)
-  */
- int
- qlen(Queue *q)
- {
- 	return q->dlen;
- }
- /*
-  * space remaining before flow control sets in: the limit
-  * minus the allocated bytes, never less than zero
-  */
- int
- qwindow(Queue *q)
- {
- 	int room;
- 	room = q->limit - q->len;
- 	return room < 0 ? 0 : room;
- }
- /*
-  * true if a read would not block, i.e. at least
-  * one block is queued
-  */
- int
- qcanread(Queue *q)
- {
- 	return q->bfirst != nil;
- }
- /*
-  * set the queue's byte limit (the initial limit that
-  * qreopen restores is not changed)
-  */
- void
- qsetlimit(Queue *q, int limit)
- {
- 	q->limit = limit;
- }
- /*
-  * choose blocking (onoff zero) or non-blocking (onoff
-  * non-zero) behaviour for writes to a full queue
-  */
- void
- qnoblock(Queue *q, int onoff)
- {
- 	q->noblock = onoff;
- }
- /*
-  * flush the output queue: drop every queued block, zero
-  * the byte counts and wake any sleeping writer.  the state
-  * flags are left as they are and readers are not woken.
-  */
- void
- qflush(Queue *q)
- {
- 	Block *list;
- 	ilock(q);
- 	/* detach the blocks so they can be freed outside the ilock */
- 	list = q->bfirst;
- 	q->bfirst = nil;
- 	q->len = 0;
- 	q->dlen = 0;
- 	iunlock(q);
- 	freeblist(list);
- 	wakeup(&q->wr);
- }
- /*
-  * non-zero if the queue is flow controlled
-  * (the Qflow bit of the state, read without locking)
-  */
- int
- qfull(Queue *q)
- {
- 	return q->state & Qflow;
- }
- /*
-  * the queue's raw state bits (read without locking)
-  */
- int
- qstate(Queue *q)
- {
- 	return q->state;
- }
|