12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532 |
- #include "u.h"
- #include "../port/lib.h"
- #include "mem.h"
- #include "dat.h"
- #include "fns.h"
- #include "../port/error.h"
- static ulong padblockcnt;
- static ulong concatblockcnt;
- static ulong pullupblockcnt;
- static ulong copyblockcnt;
- static ulong consumecnt;
- static ulong producecnt;
- static ulong qcopycnt;
- static int debugging;
- #define QDEBUG if(0)
- /*
- * IO queues
- */
- typedef struct Queue Queue;
- struct Queue
- {
- Lock;
- Block* bfirst; /* buffer */
- Block* blast;
- int len; /* bytes allocated to queue */
- int dlen; /* data bytes in queue */
- int limit; /* max bytes in queue */
- int inilim; /* initial limit */
- int state;
- int noblock; /* true if writes return immediately when q full */
- int eof; /* number of eofs read by user */
- void (*kick)(void*); /* restart output */
- void (*bypass)(void*, Block*); /* bypass queue altogether */
- void* arg; /* argument to kick */
- QLock rlock; /* mutex for reading processes */
- Rendez rr; /* process waiting to read */
- QLock wlock; /* mutex for writing processes */
- Rendez wr; /* process waiting to write */
- char err[ERRMAX];
- };
- enum
- {
- Maxatomic = 64*1024,
- };
- uint qiomaxatomic = Maxatomic;
- void
- ixsummary(void)
- {
- debugging ^= 1;
- iallocsummary();
- print("pad %lud, concat %lud, pullup %lud, copy %lud\n",
- padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
- print("consume %lud, produce %lud, qcopy %lud\n",
- consumecnt, producecnt, qcopycnt);
- }
- /*
- * free a list of blocks
- */
- void
- freeblist(Block *b)
- {
- Block *next;
- for(; b != 0; b = next){
- next = b->next;
- b->next = 0;
- freeb(b);
- }
- }
- /*
- * pad a block to the front (or the back if size is negative)
- */
- Block*
- padblock(Block *bp, int size)
- {
- int n;
- Block *nbp;
- QDEBUG checkb(bp, "padblock 1");
- if(size >= 0){
- if(bp->rp - bp->base >= size){
- bp->rp -= size;
- return bp;
- }
- if(bp->next)
- panic("padblock 0x%luX", getcallerpc(&bp));
- n = BLEN(bp);
- padblockcnt++;
- nbp = allocb(size+n);
- nbp->rp += size;
- nbp->wp = nbp->rp;
- memmove(nbp->wp, bp->rp, n);
- nbp->wp += n;
- freeb(bp);
- nbp->rp -= size;
- } else {
- size = -size;
- if(bp->next)
- panic("padblock 0x%luX", getcallerpc(&bp));
- if(bp->lim - bp->wp >= size)
- return bp;
- n = BLEN(bp);
- padblockcnt++;
- nbp = allocb(size+n);
- memmove(nbp->wp, bp->rp, n);
- nbp->wp += n;
- freeb(bp);
- }
- QDEBUG checkb(nbp, "padblock 1");
- return nbp;
- }
- /*
- * return count of bytes in a string of blocks
- */
- int
- blocklen(Block *bp)
- {
- int len;
- len = 0;
- while(bp) {
- len += BLEN(bp);
- bp = bp->next;
- }
- return len;
- }
- /*
- * return count of space in blocks
- */
- int
- blockalloclen(Block *bp)
- {
- int len;
- len = 0;
- while(bp) {
- len += BALLOC(bp);
- bp = bp->next;
- }
- return len;
- }
- /*
- * copy the string of blocks into
- * a single block and free the string
- */
- Block*
- concatblock(Block *bp)
- {
- int len;
- Block *nb, *f;
- if(bp->next == 0)
- return bp;
- nb = allocb(blocklen(bp));
- for(f = bp; f; f = f->next) {
- len = BLEN(f);
- memmove(nb->wp, f->rp, len);
- nb->wp += len;
- }
- concatblockcnt += BLEN(nb);
- freeblist(bp);
- QDEBUG checkb(nb, "concatblock 1");
- return nb;
- }
- /*
- * make sure the first block has at least n bytes
- */
- Block*
- pullupblock(Block *bp, int n)
- {
- int i;
- Block *nbp;
- /*
- * this should almost always be true, it's
- * just to avoid every caller checking.
- */
- if(BLEN(bp) >= n)
- return bp;
- /*
- * if not enough room in the first block,
- * add another to the front of the list.
- */
- if(bp->lim - bp->rp < n){
- nbp = allocb(n);
- nbp->next = bp;
- bp = nbp;
- }
- /*
- * copy bytes from the trailing blocks into the first
- */
- n -= BLEN(bp);
- while(nbp = bp->next){
- i = BLEN(nbp);
- if(i > n) {
- memmove(bp->wp, nbp->rp, n);
- pullupblockcnt++;
- bp->wp += n;
- nbp->rp += n;
- QDEBUG checkb(bp, "pullupblock 1");
- return bp;
- } else {
- /* shouldn't happen but why crash if it does */
- if(i < 0){
- print("pullup negative length packet\n");
- i = 0;
- }
- memmove(bp->wp, nbp->rp, i);
- pullupblockcnt++;
- bp->wp += i;
- bp->next = nbp->next;
- nbp->next = 0;
- freeb(nbp);
- n -= i;
- if(n == 0){
- QDEBUG checkb(bp, "pullupblock 2");
- return bp;
- }
- }
- }
- freeb(bp);
- return 0;
- }
- /*
- * make sure the first block has at least n bytes
- */
- Block*
- pullupqueue(Queue *q, int n)
- {
- Block *b;
- if(BLEN(q->bfirst) >= n)
- return q->bfirst;
- q->bfirst = pullupblock(q->bfirst, n);
- for(b = q->bfirst; b != nil && b->next != nil; b = b->next)
- ;
- q->blast = b;
- return q->bfirst;
- }
- /*
- * trim to len bytes starting at offset
- */
- Block *
- trimblock(Block *bp, int offset, int len)
- {
- ulong l;
- Block *nb, *startb;
- QDEBUG checkb(bp, "trimblock 1");
- if(blocklen(bp) < offset+len) {
- freeblist(bp);
- return nil;
- }
- while((l = BLEN(bp)) < offset) {
- offset -= l;
- nb = bp->next;
- bp->next = nil;
- freeb(bp);
- bp = nb;
- }
- startb = bp;
- bp->rp += offset;
- while((l = BLEN(bp)) < len) {
- len -= l;
- bp = bp->next;
- }
- bp->wp -= (BLEN(bp) - len);
- if(bp->next) {
- freeblist(bp->next);
- bp->next = nil;
- }
- return startb;
- }
- /*
- * copy 'count' bytes into a new block
- */
- Block*
- copyblock(Block *bp, int count)
- {
- int l;
- Block *nbp;
- QDEBUG checkb(bp, "copyblock 0");
- nbp = allocb(count);
- for(; count > 0 && bp != 0; bp = bp->next){
- l = BLEN(bp);
- if(l > count)
- l = count;
- memmove(nbp->wp, bp->rp, l);
- nbp->wp += l;
- count -= l;
- }
- if(count > 0){
- memset(nbp->wp, 0, count);
- nbp->wp += count;
- }
- copyblockcnt++;
- QDEBUG checkb(nbp, "copyblock 1");
- return nbp;
- }
- Block*
- adjustblock(Block* bp, int len)
- {
- int n;
- Block *nbp;
- if(len < 0){
- freeb(bp);
- return nil;
- }
- if(bp->rp+len > bp->lim){
- nbp = copyblock(bp, len);
- freeblist(bp);
- QDEBUG checkb(nbp, "adjustblock 1");
- return nbp;
- }
- n = BLEN(bp);
- if(len > n)
- memset(bp->wp, 0, len-n);
- bp->wp = bp->rp+len;
- QDEBUG checkb(bp, "adjustblock 2");
- return bp;
- }
- /*
- * throw away up to count bytes from a
- * list of blocks. Return count of bytes
- * thrown away.
- */
- int
- pullblock(Block **bph, int count)
- {
- Block *bp;
- int n, bytes;
- bytes = 0;
- if(bph == nil)
- return 0;
- while(*bph != nil && count != 0) {
- bp = *bph;
- n = BLEN(bp);
- if(count < n)
- n = count;
- bytes += n;
- count -= n;
- bp->rp += n;
- QDEBUG checkb(bp, "pullblock ");
- if(BLEN(bp) == 0) {
- *bph = bp->next;
- bp->next = nil;
- freeb(bp);
- }
- }
- return bytes;
- }
- /*
- * get next block from a queue, return null if nothing there
- */
- Block*
- qget(Queue *q)
- {
- int dowakeup;
- Block *b;
- /* sync with qwrite */
- ilock(q);
- b = q->bfirst;
- if(b == nil){
- q->state |= Qstarve;
- iunlock(q);
- return nil;
- }
- q->bfirst = b->next;
- b->next = 0;
- q->len -= BALLOC(b);
- q->dlen -= BLEN(b);
- QDEBUG checkb(b, "qget");
- /* if writer flow controlled, restart */
- if((q->state & Qflow) && q->len < q->limit/2){
- q->state &= ~Qflow;
- dowakeup = 1;
- } else
- dowakeup = 0;
- iunlock(q);
- if(dowakeup)
- wakeup(&q->wr);
- return b;
- }
- /*
- * throw away the next 'len' bytes in the queue
- */
- int
- qdiscard(Queue *q, int len)
- {
- Block *b;
- int dowakeup, n, sofar;
- ilock(q);
- for(sofar = 0; sofar < len; sofar += n){
- b = q->bfirst;
- if(b == nil)
- break;
- QDEBUG checkb(b, "qdiscard");
- n = BLEN(b);
- if(n <= len - sofar){
- q->bfirst = b->next;
- b->next = 0;
- q->len -= BALLOC(b);
- q->dlen -= BLEN(b);
- freeb(b);
- } else {
- n = len - sofar;
- b->rp += n;
- q->dlen -= n;
- }
- }
- /*
- * if writer flow controlled, restart
- *
- * This used to be
- * q->len < q->limit/2
- * but it slows down tcp too much for certain write sizes.
- * I really don't understand it completely. It may be
- * due to the queue draining so fast that the transmission
- * stalls waiting for the app to produce more data. - presotto
- */
- if((q->state & Qflow) && q->len < q->limit){
- q->state &= ~Qflow;
- dowakeup = 1;
- } else
- dowakeup = 0;
- iunlock(q);
- if(dowakeup)
- wakeup(&q->wr);
- return sofar;
- }
- /*
- * Interrupt level copy out of a queue, return # bytes copied.
- */
- int
- qconsume(Queue *q, void *vp, int len)
- {
- Block *b;
- int n, dowakeup;
- uchar *p = vp;
- Block *tofree = nil;
- /* sync with qwrite */
- ilock(q);
- for(;;) {
- b = q->bfirst;
- if(b == 0){
- q->state |= Qstarve;
- iunlock(q);
- return -1;
- }
- QDEBUG checkb(b, "qconsume 1");
- n = BLEN(b);
- if(n > 0)
- break;
- q->bfirst = b->next;
- q->len -= BALLOC(b);
- /* remember to free this */
- b->next = tofree;
- tofree = b;
- };
- if(n < len)
- len = n;
- memmove(p, b->rp, len);
- consumecnt += n;
- b->rp += len;
- q->dlen -= len;
- /* discard the block if we're done with it */
- if((q->state & Qmsg) || len == n){
- q->bfirst = b->next;
- b->next = 0;
- q->len -= BALLOC(b);
- q->dlen -= BLEN(b);
- /* remember to free this */
- b->next = tofree;
- tofree = b;
- }
- /* if writer flow controlled, restart */
- if((q->state & Qflow) && q->len < q->limit/2){
- q->state &= ~Qflow;
- dowakeup = 1;
- } else
- dowakeup = 0;
- iunlock(q);
- if(dowakeup)
- wakeup(&q->wr);
- if(tofree != nil)
- freeblist(tofree);
- return len;
- }
- int
- qpass(Queue *q, Block *b)
- {
- int dlen, len, dowakeup;
- /* sync with qread */
- dowakeup = 0;
- ilock(q);
- if(q->len >= q->limit){
- freeblist(b);
- iunlock(q);
- return -1;
- }
- if(q->state & Qclosed){
- freeblist(b);
- iunlock(q);
- return BALLOC(b);
- }
- /* add buffer to queue */
- if(q->bfirst)
- q->blast->next = b;
- else
- q->bfirst = b;
- len = BALLOC(b);
- dlen = BLEN(b);
- QDEBUG checkb(b, "qpass");
- while(b->next){
- b = b->next;
- QDEBUG checkb(b, "qpass");
- len += BALLOC(b);
- dlen += BLEN(b);
- }
- q->blast = b;
- q->len += len;
- q->dlen += dlen;
- if(q->len >= q->limit/2)
- q->state |= Qflow;
- if(q->state & Qstarve){
- q->state &= ~Qstarve;
- dowakeup = 1;
- }
- iunlock(q);
- if(dowakeup)
- wakeup(&q->rr);
- return len;
- }
- int
- qpassnolim(Queue *q, Block *b)
- {
- int dlen, len, dowakeup;
- /* sync with qread */
- dowakeup = 0;
- ilock(q);
- if(q->state & Qclosed){
- freeblist(b);
- iunlock(q);
- return BALLOC(b);
- }
- /* add buffer to queue */
- if(q->bfirst)
- q->blast->next = b;
- else
- q->bfirst = b;
- len = BALLOC(b);
- dlen = BLEN(b);
- QDEBUG checkb(b, "qpass");
- while(b->next){
- b = b->next;
- QDEBUG checkb(b, "qpass");
- len += BALLOC(b);
- dlen += BLEN(b);
- }
- q->blast = b;
- q->len += len;
- q->dlen += dlen;
- if(q->len >= q->limit/2)
- q->state |= Qflow;
- if(q->state & Qstarve){
- q->state &= ~Qstarve;
- dowakeup = 1;
- }
- iunlock(q);
- if(dowakeup)
- wakeup(&q->rr);
- return len;
- }
- /*
- * if the allocated space is way out of line with the used
- * space, reallocate to a smaller block
- */
- Block*
- packblock(Block *bp)
- {
- Block **l, *nbp;
- int n;
- for(l = &bp; *l; l = &(*l)->next){
- nbp = *l;
- n = BLEN(nbp);
- if((n<<2) < BALLOC(nbp)){
- *l = allocb(n);
- memmove((*l)->wp, nbp->rp, n);
- (*l)->wp += n;
- (*l)->next = nbp->next;
- freeb(nbp);
- }
- }
- return bp;
- }
- int
- qproduce(Queue *q, void *vp, int len)
- {
- Block *b;
- int dowakeup;
- uchar *p = vp;
- /* sync with qread */
- dowakeup = 0;
- ilock(q);
- /* no waiting receivers, room in buffer? */
- if(q->len >= q->limit){
- q->state |= Qflow;
- iunlock(q);
- return -1;
- }
- /* save in buffer */
- b = iallocb(len);
- if(b == 0){
- iunlock(q);
- return 0;
- }
- memmove(b->wp, p, len);
- producecnt += len;
- b->wp += len;
- if(q->bfirst)
- q->blast->next = b;
- else
- q->bfirst = b;
- q->blast = b;
- /* b->next = 0; done by iallocb() */
- q->len += BALLOC(b);
- q->dlen += BLEN(b);
- QDEBUG checkb(b, "qproduce");
- if(q->state & Qstarve){
- q->state &= ~Qstarve;
- dowakeup = 1;
- }
- if(q->len >= q->limit)
- q->state |= Qflow;
- iunlock(q);
- if(dowakeup)
- wakeup(&q->rr);
- return len;
- }
- /*
- * copy from offset in the queue
- */
- Block*
- qcopy(Queue *q, int len, ulong offset)
- {
- int sofar;
- int n;
- Block *b, *nb;
- uchar *p;
- nb = allocb(len);
- ilock(q);
- /* go to offset */
- b = q->bfirst;
- for(sofar = 0; ; sofar += n){
- if(b == nil){
- iunlock(q);
- return nb;
- }
- n = BLEN(b);
- if(sofar + n > offset){
- p = b->rp + offset - sofar;
- n -= offset - sofar;
- break;
- }
- QDEBUG checkb(b, "qcopy");
- b = b->next;
- }
- /* copy bytes from there */
- for(sofar = 0; sofar < len;){
- if(n > len - sofar)
- n = len - sofar;
- memmove(nb->wp, p, n);
- qcopycnt += n;
- sofar += n;
- nb->wp += n;
- b = b->next;
- if(b == nil)
- break;
- n = BLEN(b);
- p = b->rp;
- }
- iunlock(q);
- return nb;
- }
- /*
- * called by non-interrupt code
- */
- Queue*
- qopen(int limit, int msg, void (*kick)(void*), void *arg)
- {
- Queue *q;
- q = malloc(sizeof(Queue));
- if(q == 0)
- return 0;
- q->limit = q->inilim = limit;
- q->kick = kick;
- q->arg = arg;
- q->state = msg;
-
- q->state |= Qstarve;
- q->eof = 0;
- q->noblock = 0;
- return q;
- }
- /* open a queue to be bypassed */
- Queue*
- qbypass(void (*bypass)(void*, Block*), void *arg)
- {
- Queue *q;
- q = malloc(sizeof(Queue));
- if(q == 0)
- return 0;
- q->limit = 0;
- q->arg = arg;
- q->bypass = bypass;
- q->state = 0;
- return q;
- }
- static int
- notempty(void *a)
- {
- Queue *q = a;
- return (q->state & Qclosed) || q->bfirst != 0;
- }
- /*
- * wait for the queue to be non-empty or closed.
- * called with q ilocked.
- */
- static int
- qwait(Queue *q)
- {
- /* wait for data */
- for(;;){
- if(q->bfirst != nil)
- break;
- if(q->state & Qclosed){
- if(++q->eof > 3)
- return -1;
- if(*q->err && strcmp(q->err, Ehungup) != 0)
- return -1;
- return 0;
- }
- q->state |= Qstarve; /* flag requesting producer to wake me */
- iunlock(q);
- sleep(&q->rr, notempty, q);
- ilock(q);
- }
- return 1;
- }
- /*
- * add a block list to a queue
- */
- void
- qaddlist(Queue *q, Block *b)
- {
- /* queue the block */
- if(q->bfirst)
- q->blast->next = b;
- else
- q->bfirst = b;
- q->len += blockalloclen(b);
- q->dlen += blocklen(b);
- while(b->next)
- b = b->next;
- q->blast = b;
- }
- /*
- * called with q ilocked
- */
- Block*
- qremove(Queue *q)
- {
- Block *b;
- b = q->bfirst;
- if(b == nil)
- return nil;
- q->bfirst = b->next;
- b->next = nil;
- q->dlen -= BLEN(b);
- q->len -= BALLOC(b);
- QDEBUG checkb(b, "qremove");
- return b;
- }
- /*
- * copy the contents of a string of blocks into
- * memory. emptied blocks are freed. return
- * pointer to first unconsumed block.
- */
- Block*
- bl2mem(uchar *p, Block *b, int n)
- {
- int i;
- Block *next;
- for(; b != nil; b = next){
- i = BLEN(b);
- if(i > n){
- memmove(p, b->rp, n);
- b->rp += n;
- return b;
- }
- memmove(p, b->rp, i);
- n -= i;
- p += i;
- b->rp += i;
- next = b->next;
- freeb(b);
- }
- return nil;
- }
- /*
- * copy the contents of memory into a string of blocks.
- * return nil on error.
- */
- Block*
- mem2bl(uchar *p, int len)
- {
- int n;
- Block *b, *first, **l;
- first = nil;
- l = &first;
- if(waserror()){
- freeblist(first);
- nexterror();
- }
- do {
- n = len;
- if(n > Maxatomic)
- n = Maxatomic;
- *l = b = allocb(n);
- setmalloctag(b, (up->text[0]<<24)|(up->text[1]<<16)|(up->text[2]<<8)|up->text[3]);
- memmove(b->wp, p, n);
- b->wp += n;
- p += n;
- len -= n;
- l = &b->next;
- } while(len > 0);
- poperror();
- return first;
- }
- /*
- * put a block back to the front of the queue
- * called with q ilocked
- */
- void
- qputback(Queue *q, Block *b)
- {
- b->next = q->bfirst;
- if(q->bfirst == nil)
- q->blast = b;
- q->bfirst = b;
- q->len += BALLOC(b);
- q->dlen += BLEN(b);
- }
- /*
- * flow control, get producer going again
- * called with q ilocked
- */
- static void
- qwakeup_iunlock(Queue *q)
- {
- int dowakeup = 0;
- /* if writer flow controlled, restart */
- if((q->state & Qflow) && q->len < q->limit/2){
- q->state &= ~Qflow;
- dowakeup = 1;
- }
- iunlock(q);
- /* wakeup flow controlled writers */
- if(dowakeup){
- if(q->kick)
- q->kick(q->arg);
- wakeup(&q->wr);
- }
- }
- /*
- * get next block from a queue (up to a limit)
- */
- Block*
- qbread(Queue *q, int len)
- {
- Block *b, *nb;
- int n;
- qlock(&q->rlock);
- if(waserror()){
- qunlock(&q->rlock);
- nexterror();
- }
- ilock(q);
- switch(qwait(q)){
- case 0:
- /* queue closed */
- iunlock(q);
- qunlock(&q->rlock);
- poperror();
- return nil;
- case -1:
- /* multiple reads on a closed queue */
- iunlock(q);
- error(q->err);
- }
- /* if we get here, there's at least one block in the queue */
- b = qremove(q);
- n = BLEN(b);
- /* split block if it's too big and this is not a message queue */
- nb = b;
- if(n > len){
- if((q->state&Qmsg) == 0){
- n -= len;
- b = allocb(n);
- memmove(b->wp, nb->rp+len, n);
- b->wp += n;
- qputback(q, b);
- }
- nb->wp = nb->rp + len;
- }
- /* restart producer */
- qwakeup_iunlock(q);
- poperror();
- qunlock(&q->rlock);
- return nb;
- }
- /*
- * read a queue. if no data is queued, post a Block
- * and wait on its Rendez.
- */
- long
- qread(Queue *q, void *vp, int len)
- {
- Block *b, *first, **l;
- int m, n;
- qlock(&q->rlock);
- if(waserror()){
- qunlock(&q->rlock);
- nexterror();
- }
- ilock(q);
- again:
- switch(qwait(q)){
- case 0:
- /* queue closed */
- iunlock(q);
- qunlock(&q->rlock);
- poperror();
- return 0;
- case -1:
- /* multiple reads on a closed queue */
- iunlock(q);
- error(q->err);
- }
- /* if we get here, there's at least one block in the queue */
- if(q->state & Qcoalesce){
- /* when coalescing, 0 length blocks just go away */
- b = q->bfirst;
- if(BLEN(b) <= 0){
- freeb(qremove(q));
- goto again;
- }
- /* grab the first block plus as many
- * following blocks as will completely
- * fit in the read.
- */
- n = 0;
- l = &first;
- m = BLEN(b);
- for(;;) {
- *l = qremove(q);
- l = &b->next;
- n += m;
- b = q->bfirst;
- if(b == nil)
- break;
- m = BLEN(b);
- if(n+m > len)
- break;
- }
- } else {
- first = qremove(q);
- n = BLEN(first);
- }
- /* copy to user space outside of the ilock */
- iunlock(q);
- b = bl2mem(vp, first, len);
- ilock(q);
- /* take care of any left over partial block */
- if(b != nil){
- n -= BLEN(b);
- if(q->state & Qmsg)
- freeb(b);
- else
- qputback(q, b);
- }
- /* restart producer */
- qwakeup_iunlock(q);
- poperror();
- qunlock(&q->rlock);
- return n;
- }
- static int
- qnotfull(void *a)
- {
- Queue *q = a;
- return q->len < q->limit || (q->state & Qclosed);
- }
- ulong noblockcnt;
- /*
- * add a block to a queue obeying flow control
- */
- long
- qbwrite(Queue *q, Block *b)
- {
- int n, dowakeup;
- Proc *p;
- n = BLEN(b);
- if(q->bypass){
- (*q->bypass)(q->arg, b);
- return n;
- }
- dowakeup = 0;
- qlock(&q->wlock);
- if(waserror()){
- if(b != nil)
- freeb(b);
- qunlock(&q->wlock);
- nexterror();
- }
- ilock(q);
- /* give up if the queue is closed */
- if(q->state & Qclosed){
- iunlock(q);
- error(q->err);
- }
- /* if nonblocking, don't queue over the limit */
- if(q->len >= q->limit){
- if(q->noblock){
- iunlock(q);
- freeb(b);
- noblockcnt += n;
- qunlock(&q->wlock);
- poperror();
- return n;
- }
- }
- /* queue the block */
- if(q->bfirst)
- q->blast->next = b;
- else
- q->bfirst = b;
- q->blast = b;
- b->next = 0;
- q->len += BALLOC(b);
- q->dlen += n;
- QDEBUG checkb(b, "qbwrite");
- b = nil;
- /* make sure other end gets awakened */
- if(q->state & Qstarve){
- q->state &= ~Qstarve;
- dowakeup = 1;
- }
- iunlock(q);
- /* get output going again */
- if(q->kick && (dowakeup || (q->state&Qkick)))
- q->kick(q->arg);
- /* wakeup anyone consuming at the other end */
- if(dowakeup){
- p = wakeup(&q->rr);
- /* if we just wokeup a higher priority process, let it run */
- if(p != nil && p->priority > up->priority)
- sched();
- }
- /*
- * flow control, wait for queue to get below the limit
- * before allowing the process to continue and queue
- * more. We do this here so that postnote can only
- * interrupt us after the data has been queued. This
- * means that things like 9p flushes and ssl messages
- * will not be disrupted by software interrupts.
- *
- * Note - this is moderately dangerous since a process
- * that keeps getting interrupted and rewriting will
- * queue infinite crud.
- */
- for(;;){
- if(q->noblock || qnotfull(q))
- break;
- ilock(q);
- q->state |= Qflow;
- iunlock(q);
- sleep(&q->wr, qnotfull, q);
- }
- USED(b);
- qunlock(&q->wlock);
- poperror();
- return n;
- }
- /*
- * write to a queue. only Maxatomic bytes at a time is atomic.
- */
- int
- qwrite(Queue *q, void *vp, int len)
- {
- int n, sofar;
- Block *b;
- uchar *p = vp;
- QDEBUG if(!islo())
- print("qwrite hi %lux\n", getcallerpc(&q));
- sofar = 0;
- do {
- n = len-sofar;
- if(n > Maxatomic)
- n = Maxatomic;
- b = allocb(n);
- setmalloctag(b, (up->text[0]<<24)|(up->text[1]<<16)|(up->text[2]<<8)|up->text[3]);
- if(waserror()){
- freeb(b);
- nexterror();
- }
- memmove(b->wp, p+sofar, n);
- poperror();
- b->wp += n;
- qbwrite(q, b);
- sofar += n;
- } while(sofar < len && (q->state & Qmsg) == 0);
- return len;
- }
- /*
- * used by print() to write to a queue. Since we may be splhi or not in
- * a process, don't qlock.
- */
- int
- qiwrite(Queue *q, void *vp, int len)
- {
- int n, sofar, dowakeup;
- Block *b;
- uchar *p = vp;
- dowakeup = 0;
- sofar = 0;
- do {
- n = len-sofar;
- if(n > Maxatomic)
- n = Maxatomic;
- b = iallocb(n);
- if(b == nil)
- break;
- memmove(b->wp, p+sofar, n);
- b->wp += n;
- ilock(q);
- /* don't queue over the limit, just lose the bytes */
- if(q->len >= q->limit){
- iunlock(q);
- freeb(b);
- break;
- }
- QDEBUG checkb(b, "qiwrite");
- if(q->bfirst)
- q->blast->next = b;
- else
- q->bfirst = b;
- q->blast = b;
- q->len += BALLOC(b);
- q->dlen += n;
- if(q->state & Qstarve){
- q->state &= ~Qstarve;
- dowakeup = 1;
- }
- iunlock(q);
- if(dowakeup){
- if(q->kick)
- q->kick(q->arg);
- wakeup(&q->rr);
- }
- sofar += n;
- } while(sofar < len && (q->state & Qmsg) == 0);
- return sofar;
- }
- /*
- * be extremely careful when calling this,
- * as there is no reference accounting
- */
- void
- qfree(Queue *q)
- {
- qclose(q);
- free(q);
- }
- /*
- * Mark a queue as closed. No further IO is permitted.
- * All blocks are released.
- */
- void
- qclose(Queue *q)
- {
- Block *bfirst;
- if(q == nil)
- return;
- /* mark it */
- ilock(q);
- q->state |= Qclosed;
- q->state &= ~(Qflow|Qstarve);
- strcpy(q->err, Ehungup);
- bfirst = q->bfirst;
- q->bfirst = 0;
- q->len = 0;
- q->dlen = 0;
- q->noblock = 0;
- iunlock(q);
- /* free queued blocks */
- freeblist(bfirst);
- /* wake up readers/writers */
- wakeup(&q->rr);
- wakeup(&q->wr);
- }
- /*
- * Mark a queue as closed. Wakeup any readers. Don't remove queued
- * blocks.
- */
- void
- qhangup(Queue *q, char *msg)
- {
- /* mark it */
- ilock(q);
- q->state |= Qclosed;
- if(msg == 0 || *msg == 0)
- strcpy(q->err, Ehungup);
- else
- strncpy(q->err, msg, ERRMAX-1);
- iunlock(q);
- /* wake up readers/writers */
- wakeup(&q->rr);
- wakeup(&q->wr);
- }
- /*
- * return non-zero if the q is hungup
- */
- int
- qisclosed(Queue *q)
- {
- return q->state & Qclosed;
- }
- /*
- * mark a queue as no longer hung up
- */
- void
- qreopen(Queue *q)
- {
- ilock(q);
- q->state &= ~Qclosed;
- q->state |= Qstarve;
- q->eof = 0;
- q->limit = q->inilim;
- iunlock(q);
- }
- /*
- * return bytes queued
- */
- int
- qlen(Queue *q)
- {
- return q->dlen;
- }
- /*
- * return space remaining before flow control
- */
- int
- qwindow(Queue *q)
- {
- int l;
- l = q->limit - q->len;
- if(l < 0)
- l = 0;
- return l;
- }
- /*
- * return true if we can read without blocking
- */
- int
- qcanread(Queue *q)
- {
- return q->bfirst!=0;
- }
- /*
- * change queue limit
- */
- void
- qsetlimit(Queue *q, int limit)
- {
- q->limit = limit;
- }
- /*
- * set blocking/nonblocking
- */
- void
- qnoblock(Queue *q, int onoff)
- {
- q->noblock = onoff;
- }
- /*
- * flush the output queue
- */
- void
- qflush(Queue *q)
- {
- Block *bfirst;
- /* mark it */
- ilock(q);
- bfirst = q->bfirst;
- q->bfirst = 0;
- q->len = 0;
- q->dlen = 0;
- iunlock(q);
- /* free queued blocks */
- freeblist(bfirst);
- /* wake up readers/writers */
- wakeup(&q->wr);
- }
- int
- qfull(Queue *q)
- {
- return q->state & Qflow;
- }
- int
- qstate(Queue *q)
- {
- return q->state;
- }
|