qio.c 24 KB


  1. /*
  2. * This file is part of the UCB release of Plan 9. It is subject to the license
  3. * terms in the LICENSE file found in the top-level directory of this
  4. * distribution and at http://akaros.cs.berkeley.edu/files/Plan9License. No
  5. * part of the UCB release of Plan 9, including this file, may be copied,
  6. * modified, propagated, or distributed except according to the terms contained
  7. * in the LICENSE file.
  8. */
  9. #include "u.h"
  10. #include "../port/lib.h"
  11. #include "mem.h"
  12. #include "dat.h"
  13. #include "fns.h"
  14. #include "../port/error.h"
  15. static u32 padblockcnt;
  16. static u32 concatblockcnt;
  17. static u32 pullupblockcnt;
  18. static u32 copyblockcnt;
  19. static u32 consumecnt;
  20. static u32 producecnt;
  21. static u32 qcopycnt;
  22. static int debugging;
  23. #define QDEBUG if(0)
  24. /*
  25. * IO queues
  26. */
  27. typedef struct Queue Queue;
  28. struct Queue {
  29. Lock Lock;
  30. Block *bfirst; /* buffer */
  31. Block *blast;
  32. int len; /* bytes allocated to queue */
  33. int dlen; /* data bytes in queue */
  34. int limit; /* max bytes in queue */
  35. int inilim; /* initial limit */
  36. int state;
  37. int noblock; /* true if writes return immediately when q full */
  38. int eof; /* number of eofs read by user */
  39. void (*kick)(void *); /* restart output */
  40. void (*bypass)(void *, Block *); /* bypass queue altogether */
  41. void *arg; /* argument to kick */
  42. QLock rlock; /* mutex for reading processes */
  43. Rendez rr; /* process waiting to read */
  44. QLock wlock; /* mutex for writing processes */
  45. Rendez wr; /* process waiting to write */
  46. char err[ERRMAX];
  47. };
  48. enum {
  49. Maxatomic = 64 * 1024,
  50. };
  51. u32 qiomaxatomic = Maxatomic;
  52. void
  53. ixsummary(void)
  54. {
  55. debugging ^= 1;
  56. iallocsummary();
  57. print("pad %lu, concat %lu, pullup %lu, copy %lu\n",
  58. padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
  59. print("consume %lu, produce %lu, qcopy %lu\n",
  60. consumecnt, producecnt, qcopycnt);
  61. }
  62. /*
  63. * free a list of blocks
  64. */
  65. void
  66. freeblist(Block *b)
  67. {
  68. Block *next;
  69. for(; b != 0; b = next){
  70. next = b->next;
  71. b->next = 0;
  72. freeb(b);
  73. }
  74. }
  75. /*
  76. * pad a block to the front (or the back if size is negative)
  77. */
  78. Block *
  79. padblock(Block *bp, int size)
  80. {
  81. int n;
  82. Block *nbp;
  83. QDEBUG checkb(bp, "padblock 1");
  84. if(size >= 0){
  85. if(bp->rp - bp->base >= size){
  86. bp->rp -= size;
  87. return bp;
  88. }
  89. if(bp->next)
  90. panic("padblock %#p", getcallerpc());
  91. n = BLEN(bp);
  92. padblockcnt++;
  93. nbp = allocb(size + n);
  94. nbp->rp += size;
  95. nbp->wp = nbp->rp;
  96. memmove(nbp->wp, bp->rp, n);
  97. nbp->wp += n;
  98. freeb(bp);
  99. nbp->rp -= size;
  100. } else {
  101. size = -size;
  102. if(bp->next)
  103. panic("padblock %#p", getcallerpc());
  104. if(bp->lim - bp->wp >= size)
  105. return bp;
  106. n = BLEN(bp);
  107. padblockcnt++;
  108. nbp = allocb(size + n);
  109. memmove(nbp->wp, bp->rp, n);
  110. nbp->wp += n;
  111. freeb(bp);
  112. }
  113. QDEBUG checkb(nbp, "padblock 1");
  114. return nbp;
  115. }
  116. /*
  117. * return count of bytes in a string of blocks
  118. */
  119. int
  120. blocklen(Block *bp)
  121. {
  122. int len;
  123. len = 0;
  124. while(bp){
  125. len += BLEN(bp);
  126. bp = bp->next;
  127. }
  128. return len;
  129. }
  130. /*
  131. * return count of space in blocks
  132. */
  133. int
  134. blockalloclen(Block *bp)
  135. {
  136. int len;
  137. len = 0;
  138. while(bp){
  139. len += BALLOC(bp);
  140. bp = bp->next;
  141. }
  142. return len;
  143. }
  144. /*
  145. * copy the string of blocks into
  146. * a single block and free the string
  147. */
  148. Block *
  149. concatblock(Block *bp)
  150. {
  151. int len;
  152. Block *nb, *f;
  153. if(bp->next == 0)
  154. return bp;
  155. nb = allocb(blocklen(bp));
  156. for(f = bp; f; f = f->next){
  157. len = BLEN(f);
  158. memmove(nb->wp, f->rp, len);
  159. nb->wp += len;
  160. }
  161. concatblockcnt += BLEN(nb);
  162. freeblist(bp);
  163. QDEBUG checkb(nb, "concatblock 1");
  164. return nb;
  165. }
  166. /*
  167. * make sure the first block has at least n bytes
  168. */
  169. Block *
  170. pullupblock(Block *bp, int n)
  171. {
  172. int i;
  173. Block *nbp;
  174. /*
  175. * this should almost always be true, it's
  176. * just to avoid every caller checking.
  177. */
  178. if(BLEN(bp) >= n)
  179. return bp;
  180. /*
  181. * if not enough room in the first block,
  182. * add another to the front of the list.
  183. */
  184. if(bp->lim - bp->rp < n){
  185. nbp = allocb(n);
  186. nbp->next = bp;
  187. bp = nbp;
  188. }
  189. /*
  190. * copy bytes from the trailing blocks into the first
  191. */
  192. n -= BLEN(bp);
  193. while((nbp = bp->next) != nil){
  194. i = BLEN(nbp);
  195. if(i > n){
  196. memmove(bp->wp, nbp->rp, n);
  197. pullupblockcnt++;
  198. bp->wp += n;
  199. nbp->rp += n;
  200. QDEBUG checkb(bp, "pullupblock 1");
  201. return bp;
  202. } else {
  203. /* shouldn't happen but why crash if it does */
  204. if(i < 0){
  205. print("pullupblock -ve length, from %#p\n",
  206. getcallerpc());
  207. i = 0;
  208. }
  209. memmove(bp->wp, nbp->rp, i);
  210. pullupblockcnt++;
  211. bp->wp += i;
  212. bp->next = nbp->next;
  213. nbp->next = 0;
  214. freeb(nbp);
  215. n -= i;
  216. if(n == 0){
  217. QDEBUG checkb(bp, "pullupblock 2");
  218. return bp;
  219. }
  220. }
  221. }
  222. freeb(bp);
  223. return 0;
  224. }
  225. /*
  226. * make sure the first block has at least n bytes
  227. */
  228. Block *
  229. pullupqueue(Queue *q, int n)
  230. {
  231. Block *b;
  232. if(BLEN(q->bfirst) >= n)
  233. return q->bfirst;
  234. q->bfirst = pullupblock(q->bfirst, n);
  235. for(b = q->bfirst; b != nil && b->next != nil; b = b->next)
  236. ;
  237. q->blast = b;
  238. return q->bfirst;
  239. }
  240. /*
  241. * trim to len bytes starting at offset
  242. */
  243. Block *
  244. trimblock(Block *bp, int offset, int len)
  245. {
  246. i32 l;
  247. Block *nb, *startb;
  248. QDEBUG checkb(bp, "trimblock 1");
  249. if(blocklen(bp) < offset + len){
  250. freeblist(bp);
  251. return nil;
  252. }
  253. while((l = BLEN(bp)) < offset){
  254. offset -= l;
  255. nb = bp->next;
  256. bp->next = nil;
  257. freeb(bp);
  258. bp = nb;
  259. }
  260. startb = bp;
  261. bp->rp += offset;
  262. while((l = BLEN(bp)) < len){
  263. len -= l;
  264. bp = bp->next;
  265. }
  266. bp->wp -= (BLEN(bp) - len);
  267. if(bp->next){
  268. freeblist(bp->next);
  269. bp->next = nil;
  270. }
  271. return startb;
  272. }
  273. /*
  274. * copy 'count' bytes into a new block
  275. */
  276. Block *
  277. copyblock(Block *bp, int count)
  278. {
  279. int l;
  280. Block *nbp;
  281. QDEBUG checkb(bp, "copyblock 0");
  282. if(bp->flag & BINTR){
  283. nbp = iallocb(count);
  284. if(nbp == nil)
  285. return nil;
  286. } else
  287. nbp = allocb(count);
  288. for(; count > 0 && bp != 0; bp = bp->next){
  289. l = BLEN(bp);
  290. if(l > count)
  291. l = count;
  292. memmove(nbp->wp, bp->rp, l);
  293. nbp->wp += l;
  294. count -= l;
  295. }
  296. if(count > 0){
  297. memset(nbp->wp, 0, count);
  298. nbp->wp += count;
  299. }
  300. copyblockcnt++;
  301. QDEBUG checkb(nbp, "copyblock 1");
  302. return nbp;
  303. }
  304. Block *
  305. adjustblock(Block *bp, int len)
  306. {
  307. int n;
  308. Block *nbp;
  309. if(len < 0){
  310. freeb(bp);
  311. return nil;
  312. }
  313. if(bp->rp + len > bp->lim){
  314. nbp = copyblock(bp, len);
  315. freeblist(bp);
  316. QDEBUG checkb(nbp, "adjustblock 1");
  317. return nbp;
  318. }
  319. n = BLEN(bp);
  320. if(len > n)
  321. memset(bp->wp, 0, len - n);
  322. bp->wp = bp->rp + len;
  323. QDEBUG checkb(bp, "adjustblock 2");
  324. return bp;
  325. }
  326. /*
  327. * throw away up to count bytes from a
  328. * list of blocks. Return count of bytes
  329. * thrown away.
  330. */
  331. int
  332. pullblock(Block **bph, int count)
  333. {
  334. Block *bp;
  335. int n, bytes;
  336. bytes = 0;
  337. if(bph == nil)
  338. return 0;
  339. while(*bph != nil && count != 0){
  340. bp = *bph;
  341. n = BLEN(bp);
  342. if(count < n)
  343. n = count;
  344. bytes += n;
  345. count -= n;
  346. bp->rp += n;
  347. QDEBUG checkb(bp, "pullblock ");
  348. if(BLEN(bp) == 0){
  349. *bph = bp->next;
  350. bp->next = nil;
  351. freeb(bp);
  352. }
  353. }
  354. return bytes;
  355. }
  356. /*
  357. * get next block from a queue, return null if nothing there
  358. */
  359. Block *
  360. qget(Queue *q)
  361. {
  362. int dowakeup;
  363. Block *b;
  364. /* sync with qwrite */
  365. ilock(&q->Lock);
  366. b = q->bfirst;
  367. if(b == nil){
  368. q->state |= Qstarve;
  369. iunlock(&q->Lock);
  370. return nil;
  371. }
  372. q->bfirst = b->next;
  373. b->next = 0;
  374. q->len -= BALLOC(b);
  375. q->dlen -= BLEN(b);
  376. QDEBUG checkb(b, "qget");
  377. /* if writer flow controlled, restart */
  378. if((q->state & Qflow) && q->len < q->limit / 2){
  379. q->state &= ~Qflow;
  380. dowakeup = 1;
  381. } else
  382. dowakeup = 0;
  383. iunlock(&q->Lock);
  384. if(dowakeup)
  385. wakeup(&q->wr);
  386. return b;
  387. }
  388. /*
  389. * throw away the next 'len' bytes in the queue
  390. */
  391. int
  392. qdiscard(Queue *q, int len)
  393. {
  394. Block *b;
  395. int dowakeup, n, sofar;
  396. ilock(&q->Lock);
  397. for(sofar = 0; sofar < len; sofar += n){
  398. b = q->bfirst;
  399. if(b == nil)
  400. break;
  401. QDEBUG checkb(b, "qdiscard");
  402. n = BLEN(b);
  403. if(n <= len - sofar){
  404. q->bfirst = b->next;
  405. b->next = 0;
  406. q->len -= BALLOC(b);
  407. q->dlen -= BLEN(b);
  408. freeb(b);
  409. } else {
  410. n = len - sofar;
  411. b->rp += n;
  412. q->dlen -= n;
  413. }
  414. }
  415. /*
  416. * if writer flow controlled, restart
  417. *
  418. * This used to be
  419. * q->len < q->limit/2
  420. * but it slows down tcp too much for certain write sizes.
  421. * I really don't understand it completely. It may be
  422. * due to the queue draining so fast that the transmission
  423. * stalls waiting for the app to produce more data. - presotto
  424. *
  425. * changed back from q->len < q->limit for reno tcp. - jmk
  426. */
  427. if((q->state & Qflow) && q->len < q->limit / 2){
  428. q->state &= ~Qflow;
  429. dowakeup = 1;
  430. } else
  431. dowakeup = 0;
  432. iunlock(&q->Lock);
  433. if(dowakeup)
  434. wakeup(&q->wr);
  435. return sofar;
  436. }
  437. /*
  438. * Interrupt level copy out of a queue, return # bytes copied.
  439. */
  440. int
  441. qconsume(Queue *q, void *vp, int len)
  442. {
  443. Block *b;
  444. int n, dowakeup;
  445. u8 *p = vp;
  446. Block *tofree = nil;
  447. /* sync with qwrite */
  448. ilock(&q->Lock);
  449. for(;;){
  450. b = q->bfirst;
  451. if(b == 0){
  452. q->state |= Qstarve;
  453. iunlock(&q->Lock);
  454. return -1;
  455. }
  456. QDEBUG checkb(b, "qconsume 1");
  457. n = BLEN(b);
  458. if(n > 0)
  459. break;
  460. q->bfirst = b->next;
  461. q->len -= BALLOC(b);
  462. /* remember to free this */
  463. b->next = tofree;
  464. tofree = b;
  465. };
  466. if(n < len)
  467. len = n;
  468. memmove(p, b->rp, len);
  469. consumecnt += n;
  470. b->rp += len;
  471. q->dlen -= len;
  472. /* discard the block if we're done with it */
  473. if((q->state & Qmsg) || len == n){
  474. q->bfirst = b->next;
  475. b->next = 0;
  476. q->len -= BALLOC(b);
  477. q->dlen -= BLEN(b);
  478. /* remember to free this */
  479. b->next = tofree;
  480. tofree = b;
  481. }
  482. /* if writer flow controlled, restart */
  483. if((q->state & Qflow) && q->len < q->limit / 2){
  484. q->state &= ~Qflow;
  485. dowakeup = 1;
  486. } else
  487. dowakeup = 0;
  488. iunlock(&q->Lock);
  489. if(dowakeup)
  490. wakeup(&q->wr);
  491. if(tofree != nil)
  492. freeblist(tofree);
  493. return len;
  494. }
  495. int
  496. qpass(Queue *q, Block *b)
  497. {
  498. int dlen, len, dowakeup;
  499. /* sync with qread */
  500. dowakeup = 0;
  501. ilock(&q->Lock);
  502. if(q->len >= q->limit){
  503. freeblist(b);
  504. iunlock(&q->Lock);
  505. return -1;
  506. }
  507. if(q->state & Qclosed){
  508. len = BALLOC(b);
  509. freeblist(b);
  510. iunlock(&q->Lock);
  511. return len;
  512. }
  513. /* add buffer to queue */
  514. if(q->bfirst)
  515. q->blast->next = b;
  516. else
  517. q->bfirst = b;
  518. len = BALLOC(b);
  519. dlen = BLEN(b);
  520. QDEBUG checkb(b, "qpass");
  521. while(b->next){
  522. b = b->next;
  523. QDEBUG checkb(b, "qpass");
  524. len += BALLOC(b);
  525. dlen += BLEN(b);
  526. }
  527. q->blast = b;
  528. q->len += len;
  529. q->dlen += dlen;
  530. if(q->len >= q->limit / 2)
  531. q->state |= Qflow;
  532. if(q->state & Qstarve){
  533. q->state &= ~Qstarve;
  534. dowakeup = 1;
  535. }
  536. iunlock(&q->Lock);
  537. if(dowakeup)
  538. wakeup(&q->rr);
  539. return len;
  540. }
  541. int
  542. qpassnolim(Queue *q, Block *b)
  543. {
  544. int dlen, len, dowakeup;
  545. /* sync with qread */
  546. dowakeup = 0;
  547. ilock(&q->Lock);
  548. if(q->state & Qclosed){
  549. len = BALLOC(b);
  550. freeblist(b);
  551. iunlock(&q->Lock);
  552. return len;
  553. }
  554. /* add buffer to queue */
  555. if(q->bfirst)
  556. q->blast->next = b;
  557. else
  558. q->bfirst = b;
  559. len = BALLOC(b);
  560. dlen = BLEN(b);
  561. QDEBUG checkb(b, "qpass");
  562. while(b->next){
  563. b = b->next;
  564. QDEBUG checkb(b, "qpass");
  565. len += BALLOC(b);
  566. dlen += BLEN(b);
  567. }
  568. q->blast = b;
  569. q->len += len;
  570. q->dlen += dlen;
  571. if(q->len >= q->limit / 2)
  572. q->state |= Qflow;
  573. if(q->state & Qstarve){
  574. q->state &= ~Qstarve;
  575. dowakeup = 1;
  576. }
  577. iunlock(&q->Lock);
  578. if(dowakeup)
  579. wakeup(&q->rr);
  580. return len;
  581. }
  582. /*
  583. * if the allocated space is way out of line with the used
  584. * space, reallocate to a smaller block
  585. */
  586. Block *
  587. packblock(Block *bp)
  588. {
  589. Block **l, *nbp;
  590. int n;
  591. for(l = &bp; *l; l = &(*l)->next){
  592. nbp = *l;
  593. n = BLEN(nbp);
  594. if((n << 2) < BALLOC(nbp)){
  595. *l = allocb(n);
  596. memmove((*l)->wp, nbp->rp, n);
  597. (*l)->wp += n;
  598. (*l)->next = nbp->next;
  599. freeb(nbp);
  600. }
  601. }
  602. return bp;
  603. }
  604. int
  605. qproduce(Queue *q, void *vp, int len)
  606. {
  607. Block *b;
  608. int dowakeup;
  609. u8 *p = vp;
  610. /* sync with qread */
  611. dowakeup = 0;
  612. ilock(&q->Lock);
  613. /* no waiting receivers, room in buffer? */
  614. if(q->len >= q->limit){
  615. q->state |= Qflow;
  616. iunlock(&q->Lock);
  617. return -1;
  618. }
  619. /* save in buffer */
  620. b = iallocb(len);
  621. if(b == 0){
  622. iunlock(&q->Lock);
  623. return 0;
  624. }
  625. memmove(b->wp, p, len);
  626. producecnt += len;
  627. b->wp += len;
  628. if(q->bfirst)
  629. q->blast->next = b;
  630. else
  631. q->bfirst = b;
  632. q->blast = b;
  633. /* b->next = 0; done by iallocb() */
  634. q->len += BALLOC(b);
  635. q->dlen += BLEN(b);
  636. QDEBUG checkb(b, "qproduce");
  637. if(q->state & Qstarve){
  638. q->state &= ~Qstarve;
  639. dowakeup = 1;
  640. }
  641. if(q->len >= q->limit)
  642. q->state |= Qflow;
  643. iunlock(&q->Lock);
  644. if(dowakeup)
  645. wakeup(&q->rr);
  646. return len;
  647. }
  648. /*
  649. * copy from offset in the queue
  650. */
  651. Block *
  652. qcopy(Queue *q, int len, u32 offset)
  653. {
  654. int sofar;
  655. int n;
  656. Block *b, *nb;
  657. u8 *p;
  658. nb = allocb(len);
  659. ilock(&q->Lock);
  660. /* go to offset */
  661. b = q->bfirst;
  662. for(sofar = 0;; sofar += n){
  663. if(b == nil){
  664. iunlock(&q->Lock);
  665. return nb;
  666. }
  667. n = BLEN(b);
  668. if(sofar + n > offset){
  669. p = b->rp + offset - sofar;
  670. n -= offset - sofar;
  671. break;
  672. }
  673. QDEBUG checkb(b, "qcopy");
  674. b = b->next;
  675. }
  676. /* copy bytes from there */
  677. for(sofar = 0; sofar < len;){
  678. if(n > len - sofar)
  679. n = len - sofar;
  680. memmove(nb->wp, p, n);
  681. qcopycnt += n;
  682. sofar += n;
  683. nb->wp += n;
  684. b = b->next;
  685. if(b == nil)
  686. break;
  687. n = BLEN(b);
  688. p = b->rp;
  689. }
  690. iunlock(&q->Lock);
  691. return nb;
  692. }
  693. /*
  694. * called by non-interrupt code
  695. */
  696. Queue *
  697. qopen(int limit, int msg, void (*kick)(void *), void *arg)
  698. {
  699. Queue *q;
  700. q = malloc(sizeof(Queue));
  701. if(q == 0)
  702. return 0;
  703. q->limit = q->inilim = limit;
  704. q->kick = kick;
  705. q->arg = arg;
  706. q->state = msg;
  707. q->state |= Qstarve;
  708. q->eof = 0;
  709. q->noblock = 0;
  710. return q;
  711. }
  712. /* open a queue to be bypassed */
  713. Queue *
  714. qbypass(void (*bypass)(void *, Block *), void *arg)
  715. {
  716. Queue *q;
  717. q = malloc(sizeof(Queue));
  718. if(q == 0)
  719. return 0;
  720. q->limit = 0;
  721. q->arg = arg;
  722. q->bypass = bypass;
  723. q->state = 0;
  724. return q;
  725. }
  726. static int
  727. notempty(void *a)
  728. {
  729. Queue *q = a;
  730. return (q->state & Qclosed) || q->bfirst != 0;
  731. }
  732. /*
  733. * wait for the queue to be non-empty or closed.
  734. * called with q ilocked.
  735. */
  736. static int
  737. qwait(Queue *q)
  738. {
  739. /* wait for data */
  740. for(;;){
  741. if(q->bfirst != nil)
  742. break;
  743. if(q->state & Qclosed){
  744. if(++q->eof > 3)
  745. return -1;
  746. if(*q->err && strcmp(q->err, Ehungup) != 0)
  747. return -1;
  748. return 0;
  749. }
  750. q->state |= Qstarve; /* flag requesting producer to wake me */
  751. iunlock(&q->Lock);
  752. sleep(&q->rr, notempty, q);
  753. ilock(&q->Lock);
  754. }
  755. return 1;
  756. }
  757. /*
  758. * add a block list to a queue
  759. */
  760. void
  761. qaddlist(Queue *q, Block *b)
  762. {
  763. /* queue the block */
  764. if(q->bfirst)
  765. q->blast->next = b;
  766. else
  767. q->bfirst = b;
  768. q->len += blockalloclen(b);
  769. q->dlen += blocklen(b);
  770. while(b->next)
  771. b = b->next;
  772. q->blast = b;
  773. }
  774. /*
  775. * called with q ilocked
  776. */
  777. Block *
  778. qremove(Queue *q)
  779. {
  780. Block *b;
  781. b = q->bfirst;
  782. if(b == nil)
  783. return nil;
  784. q->bfirst = b->next;
  785. b->next = nil;
  786. q->dlen -= BLEN(b);
  787. q->len -= BALLOC(b);
  788. QDEBUG checkb(b, "qremove");
  789. return b;
  790. }
  791. /*
  792. * copy the contents of a string of blocks into
  793. * memory. emptied blocks are freed. return
  794. * pointer to first unconsumed block.
  795. */
  796. Block *
  797. bl2mem(u8 *p, Block *b, int n)
  798. {
  799. int i;
  800. Block *next;
  801. for(; b != nil; b = next){
  802. i = BLEN(b);
  803. if(i > n){
  804. memmove(p, b->rp, n);
  805. b->rp += n;
  806. return b;
  807. }
  808. memmove(p, b->rp, i);
  809. n -= i;
  810. p += i;
  811. b->rp += i;
  812. next = b->next;
  813. freeb(b);
  814. }
  815. return nil;
  816. }
  817. /*
  818. * copy the contents of memory into a string of blocks.
  819. * return nil on error.
  820. */
  821. Block *
  822. mem2bl(u8 *p, int len)
  823. {
  824. Proc *up = externup();
  825. int n;
  826. Block *b, *first, **l;
  827. first = nil;
  828. l = &first;
  829. if(waserror()){
  830. freeblist(first);
  831. nexterror();
  832. }
  833. do {
  834. n = len;
  835. if(n > Maxatomic)
  836. n = Maxatomic;
  837. *l = b = allocb(n);
  838. setmalloctag(b, (up->text[0] << 24) | (up->text[1] << 16) | (up->text[2] << 8) | up->text[3]);
  839. memmove(b->wp, p, n);
  840. b->wp += n;
  841. p += n;
  842. len -= n;
  843. l = &b->next;
  844. } while(len > 0);
  845. poperror();
  846. return first;
  847. }
  848. /*
  849. * put a block back to the front of the queue
  850. * called with q ilocked
  851. */
  852. void
  853. qputback(Queue *q, Block *b)
  854. {
  855. b->next = q->bfirst;
  856. if(q->bfirst == nil)
  857. q->blast = b;
  858. q->bfirst = b;
  859. q->len += BALLOC(b);
  860. q->dlen += BLEN(b);
  861. }
  862. /*
  863. * flow control, get producer going again
  864. * called with q ilocked
  865. */
  866. static void
  867. qwakeup_iunlock(Queue *q)
  868. {
  869. int dowakeup;
  870. /* if writer flow controlled, restart */
  871. if((q->state & Qflow) && q->len < q->limit / 2){
  872. q->state &= ~Qflow;
  873. dowakeup = 1;
  874. } else
  875. dowakeup = 0;
  876. iunlock(&q->Lock);
  877. /* wakeup flow controlled writers */
  878. if(dowakeup){
  879. if(q->kick)
  880. q->kick(q->arg);
  881. wakeup(&q->wr);
  882. }
  883. }
  884. /*
  885. * get next block from a queue (up to a limit)
  886. */
  887. Block *
  888. qbread(Queue *q, int len)
  889. {
  890. Proc *up = externup();
  891. Block *b, *nb;
  892. int n;
  893. qlock(&q->rlock);
  894. if(waserror()){
  895. qunlock(&q->rlock);
  896. nexterror();
  897. }
  898. ilock(&q->Lock);
  899. switch(qwait(q)){
  900. case 0:
  901. /* queue closed */
  902. iunlock(&q->Lock);
  903. qunlock(&q->rlock);
  904. poperror();
  905. return nil;
  906. case -1:
  907. /* multiple reads on a closed queue */
  908. iunlock(&q->Lock);
  909. error(q->err);
  910. }
  911. /* if we get here, there's at least one block in the queue */
  912. b = qremove(q);
  913. n = BLEN(b);
  914. /* split block if it's too big and this is not a message queue */
  915. nb = b;
  916. if(n > len){
  917. if((q->state & Qmsg) == 0){
  918. n -= len;
  919. b = allocb(n);
  920. memmove(b->wp, nb->rp + len, n);
  921. b->wp += n;
  922. qputback(q, b);
  923. }
  924. nb->wp = nb->rp + len;
  925. }
  926. /* restart producer */
  927. qwakeup_iunlock(q);
  928. poperror();
  929. qunlock(&q->rlock);
  930. return nb;
  931. }
  932. /*
  933. * read a queue. if no data is queued, post a Block
  934. * and wait on its Rendez.
  935. */
  936. i32
  937. qread(Queue *q, void *vp, int len)
  938. {
  939. Proc *up = externup();
  940. Block *b, *first, **l;
  941. int blen, n;
  942. qlock(&q->rlock);
  943. if(waserror()){
  944. qunlock(&q->rlock);
  945. nexterror();
  946. }
  947. ilock(&q->Lock);
  948. again:
  949. switch(qwait(q)){
  950. case 0:
  951. /* queue closed */
  952. iunlock(&q->Lock);
  953. qunlock(&q->rlock);
  954. poperror();
  955. return 0;
  956. case -1:
  957. /* multiple reads on a closed queue */
  958. iunlock(&q->Lock);
  959. error(q->err);
  960. }
  961. /* if we get here, there's at least one block in the queue */
  962. if(q->state & Qcoalesce){
  963. /* when coalescing, 0 length blocks just go away */
  964. b = q->bfirst;
  965. if(BLEN(b) <= 0){
  966. freeb(qremove(q));
  967. goto again;
  968. }
  969. /* grab the first block plus as many
  970. * following blocks as will completely
  971. * fit in the read.
  972. */
  973. n = 0;
  974. l = &first;
  975. blen = BLEN(b);
  976. for(;;){
  977. *l = qremove(q);
  978. l = &b->next;
  979. n += blen;
  980. b = q->bfirst;
  981. if(b == nil)
  982. break;
  983. blen = BLEN(b);
  984. if(n + blen > len)
  985. break;
  986. }
  987. } else {
  988. first = qremove(q);
  989. n = BLEN(first);
  990. }
  991. /* copy to user space outside of the ilock */
  992. iunlock(&q->Lock);
  993. b = bl2mem(vp, first, len);
  994. ilock(&q->Lock);
  995. /* take care of any left over partial block */
  996. if(b != nil){
  997. n -= BLEN(b);
  998. if(q->state & Qmsg)
  999. freeb(b);
  1000. else
  1001. qputback(q, b);
  1002. }
  1003. /* restart producer */
  1004. qwakeup_iunlock(q);
  1005. poperror();
  1006. qunlock(&q->rlock);
  1007. return n;
  1008. }
  1009. static int
  1010. qnotfull(void *a)
  1011. {
  1012. Queue *q = a;
  1013. return q->len < q->limit || (q->state & Qclosed);
  1014. }
  1015. u32 noblockcnt;
  1016. /*
  1017. * add a block to a queue obeying flow control
  1018. */
  1019. i32
  1020. qbwrite(Queue *q, Block *b)
  1021. {
  1022. Proc *up = externup();
  1023. int n, dowakeup;
  1024. n = BLEN(b);
  1025. if(q->bypass){
  1026. (*q->bypass)(q->arg, b);
  1027. return n;
  1028. }
  1029. dowakeup = 0;
  1030. qlock(&q->wlock);
  1031. if(waserror()){
  1032. if(b != nil)
  1033. freeb(b);
  1034. qunlock(&q->wlock);
  1035. nexterror();
  1036. }
  1037. ilock(&q->Lock);
  1038. /* give up if the queue is closed */
  1039. if(q->state & Qclosed){
  1040. iunlock(&q->Lock);
  1041. error(q->err);
  1042. }
  1043. /* if nonblocking, don't queue over the limit */
  1044. if(q->len >= q->limit){
  1045. if(q->noblock){
  1046. iunlock(&q->Lock);
  1047. freeb(b);
  1048. noblockcnt += n;
  1049. qunlock(&q->wlock);
  1050. poperror();
  1051. return n;
  1052. }
  1053. }
  1054. /* queue the block */
  1055. if(q->bfirst)
  1056. q->blast->next = b;
  1057. else
  1058. q->bfirst = b;
  1059. q->blast = b;
  1060. b->next = 0;
  1061. q->len += BALLOC(b);
  1062. q->dlen += n;
  1063. QDEBUG checkb(b, "qbwrite");
  1064. b = nil;
  1065. /* make sure other end gets awakened */
  1066. if(q->state & Qstarve){
  1067. q->state &= ~Qstarve;
  1068. dowakeup = 1;
  1069. }
  1070. iunlock(&q->Lock);
  1071. /* get output going again */
  1072. if(q->kick && (dowakeup || (q->state & Qkick)))
  1073. q->kick(q->arg);
  1074. /* wakeup anyone consuming at the other end */
  1075. if(dowakeup)
  1076. wakeup(&q->rr);
  1077. /*
  1078. * flow control, wait for queue to get below the limit
  1079. * before allowing the process to continue and queue
  1080. * more. We do this here so that postnote can only
  1081. * interrupt us after the data has been queued. This
  1082. * means that things like 9p flushes and ssl messages
  1083. * will not be disrupted by software interrupts.
  1084. *
  1085. * Note - this is moderately dangerous since a process
  1086. * that keeps getting interrupted and rewriting will
  1087. * queue infinite crud.
  1088. */
  1089. for(;;){
  1090. if(q->noblock || qnotfull(q))
  1091. break;
  1092. ilock(&q->Lock);
  1093. q->state |= Qflow;
  1094. iunlock(&q->Lock);
  1095. sleep(&q->wr, qnotfull, q);
  1096. }
  1097. USED(b);
  1098. qunlock(&q->wlock);
  1099. poperror();
  1100. return n;
  1101. }
  1102. i32
  1103. qibwrite(Queue *q, Block *b)
  1104. {
  1105. int n, dowakeup;
  1106. dowakeup = 0;
  1107. n = BLEN(b);
  1108. lock(&q->Lock);
  1109. if(q->bfirst)
  1110. q->blast->next = b;
  1111. else
  1112. q->bfirst = b;
  1113. q->blast = b;
  1114. q->len += BALLOC(b);
  1115. q->dlen += n;
  1116. if(q->state & Qstarve){
  1117. q->state &= ~Qstarve;
  1118. dowakeup = 1;
  1119. }
  1120. unlock(&q->Lock);
  1121. if(dowakeup){
  1122. if(q->kick)
  1123. q->kick(q->arg);
  1124. wakeup(&q->rr);
  1125. }
  1126. return n;
  1127. }
  1128. /*
  1129. * write to a queue. only Maxatomic bytes at a time is atomic.
  1130. */
  1131. int
  1132. qwrite(Queue *q, void *vp, int len)
  1133. {
  1134. Proc *up = externup();
  1135. int n, sofar;
  1136. Block *b;
  1137. u8 *p = vp;
  1138. QDEBUG if(!islo())
  1139. print("qwrite hi %#p\n", getcallerpc());
  1140. sofar = 0;
  1141. do {
  1142. n = len - sofar;
  1143. if(n > Maxatomic)
  1144. n = Maxatomic;
  1145. b = allocb(n);
  1146. setmalloctag(b, (up->text[0] << 24) | (up->text[1] << 16) | (up->text[2] << 8) | up->text[3]);
  1147. if(waserror()){
  1148. freeb(b);
  1149. nexterror();
  1150. }
  1151. memmove(b->wp, p + sofar, n);
  1152. poperror();
  1153. b->wp += n;
  1154. qbwrite(q, b);
  1155. sofar += n;
  1156. } while(sofar < len && (q->state & Qmsg) == 0);
  1157. return len;
  1158. }
  1159. /*
  1160. * used by print() to write to a queue. Since we may be splhi or not in
  1161. * a process, don't qlock.
  1162. *
  1163. * this routine merges adjacent blocks if block n+1 will fit into
  1164. * the free space of block n.
  1165. */
  1166. int
  1167. qiwrite(Queue *q, void *vp, int len)
  1168. {
  1169. int n, sofar, dowakeup;
  1170. Block *b;
  1171. u8 *p = vp;
  1172. dowakeup = 0;
  1173. sofar = 0;
  1174. do {
  1175. n = len - sofar;
  1176. if(n > Maxatomic)
  1177. n = Maxatomic;
  1178. b = iallocb(n);
  1179. if(b == nil)
  1180. break;
  1181. memmove(b->wp, p + sofar, n);
  1182. b->wp += n;
  1183. ilock(&q->Lock);
  1184. /* we use an artificially high limit for kernel prints since anything
  1185. * over the limit gets dropped
  1186. */
  1187. if(q->dlen >= 16 * 1024){
  1188. iunlock(&q->Lock);
  1189. freeb(b);
  1190. break;
  1191. }
  1192. QDEBUG checkb(b, "qiwrite");
  1193. if(q->bfirst)
  1194. q->blast->next = b;
  1195. else
  1196. q->bfirst = b;
  1197. q->blast = b;
  1198. q->len += BALLOC(b);
  1199. q->dlen += n;
  1200. if(q->state & Qstarve){
  1201. q->state &= ~Qstarve;
  1202. dowakeup = 1;
  1203. }
  1204. iunlock(&q->Lock);
  1205. if(dowakeup){
  1206. if(q->kick)
  1207. q->kick(q->arg);
  1208. wakeup(&q->rr);
  1209. }
  1210. sofar += n;
  1211. } while(sofar < len && (q->state & Qmsg) == 0);
  1212. return sofar;
  1213. }
  1214. /*
  1215. * be extremely careful when calling this,
  1216. * as there is no reference accounting
  1217. */
  1218. void
  1219. qfree(Queue *q)
  1220. {
  1221. qclose(q);
  1222. free(q);
  1223. }
  1224. /*
  1225. * Mark a queue as closed. No further IO is permitted.
  1226. * All blocks are released.
  1227. */
  1228. void
  1229. qclose(Queue *q)
  1230. {
  1231. Block *bfirst;
  1232. if(q == nil)
  1233. return;
  1234. /* mark it */
  1235. ilock(&q->Lock);
  1236. q->state |= Qclosed;
  1237. q->state &= ~(Qflow | Qstarve);
  1238. strcpy(q->err, Ehungup);
  1239. bfirst = q->bfirst;
  1240. q->bfirst = 0;
  1241. q->len = 0;
  1242. q->dlen = 0;
  1243. q->noblock = 0;
  1244. iunlock(&q->Lock);
  1245. /* free queued blocks */
  1246. freeblist(bfirst);
  1247. /* wake up readers/writers */
  1248. wakeup(&q->rr);
  1249. wakeup(&q->wr);
  1250. }
  1251. /*
  1252. * Mark a queue as closed. Wakeup any readers. Don't remove queued
  1253. * blocks.
  1254. */
  1255. void
  1256. qhangup(Queue *q, char *msg)
  1257. {
  1258. /* mark it */
  1259. ilock(&q->Lock);
  1260. q->state |= Qclosed;
  1261. if(msg == 0 || *msg == 0)
  1262. strcpy(q->err, Ehungup);
  1263. else
  1264. strncpy(q->err, msg, ERRMAX - 1);
  1265. iunlock(&q->Lock);
  1266. /* wake up readers/writers */
  1267. wakeup(&q->rr);
  1268. wakeup(&q->wr);
  1269. }
  1270. /*
  1271. * return non-zero if the q is hungup
  1272. */
  1273. int
  1274. qisclosed(Queue *q)
  1275. {
  1276. return q->state & Qclosed;
  1277. }
  1278. /*
  1279. * mark a queue as no longer hung up
  1280. */
  1281. void
  1282. qreopen(Queue *q)
  1283. {
  1284. ilock(&q->Lock);
  1285. q->state &= ~Qclosed;
  1286. q->state |= Qstarve;
  1287. q->eof = 0;
  1288. q->limit = q->inilim;
  1289. iunlock(&q->Lock);
  1290. }
  1291. /*
  1292. * return bytes queued
  1293. */
  1294. int
  1295. qlen(Queue *q)
  1296. {
  1297. return q->dlen;
  1298. }
  1299. /*
  1300. * return space remaining before flow control
  1301. */
  1302. int
  1303. qwindow(Queue *q)
  1304. {
  1305. int l;
  1306. l = q->limit - q->len;
  1307. if(l < 0)
  1308. l = 0;
  1309. return l;
  1310. }
  1311. /*
  1312. * return true if we can read without blocking
  1313. */
  1314. int
  1315. qcanread(Queue *q)
  1316. {
  1317. return q->bfirst != 0;
  1318. }
  1319. /*
  1320. * change queue limit
  1321. */
  1322. void
  1323. qsetlimit(Queue *q, int limit)
  1324. {
  1325. q->limit = limit;
  1326. }
  1327. /*
  1328. * set blocking/nonblocking
  1329. */
  1330. void
  1331. qnoblock(Queue *q, int onoff)
  1332. {
  1333. q->noblock = onoff;
  1334. }
  1335. /*
  1336. * flush the output queue
  1337. */
  1338. void
  1339. qflush(Queue *q)
  1340. {
  1341. Block *bfirst;
  1342. /* mark it */
  1343. ilock(&q->Lock);
  1344. bfirst = q->bfirst;
  1345. q->bfirst = 0;
  1346. q->len = 0;
  1347. q->dlen = 0;
  1348. iunlock(&q->Lock);
  1349. /* free queued blocks */
  1350. freeblist(bfirst);
  1351. /* wake up readers/writers */
  1352. wakeup(&q->wr);
  1353. }
  1354. int
  1355. qfull(Queue *q)
  1356. {
  1357. return q->state & Qflow;
  1358. }
  1359. int
  1360. qstate(Queue *q)
  1361. {
  1362. return q->state;
  1363. }