qio.c 24 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591
  1. /*
  2. * This file is part of the UCB release of Plan 9. It is subject to the license
  3. * terms in the LICENSE file found in the top-level directory of this
  4. * distribution and at http://akaros.cs.berkeley.edu/files/Plan9License. No
  5. * part of the UCB release of Plan 9, including this file, may be copied,
  6. * modified, propagated, or distributed except according to the terms contained
  7. * in the LICENSE file.
  8. */
  9. #include "u.h"
  10. #include "../port/lib.h"
  11. #include "mem.h"
  12. #include "dat.h"
  13. #include "fns.h"
  14. #include "../port/error.h"
  /* Event/byte counters for the block and queue routines; printed by ixsummary(). */
  static uint32_t padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt;
  static uint32_t consumecnt, producecnt, qcopycnt;

  /* XORed with 1 on every ixsummary() call. */
  static int debugging;

  /*
   * Prefix for debug-only statements.  It expands to if(0), so the
   * statement that follows is compiled but never executed.
   */
  #define QDEBUG if(0)
  24. /*
  25. * IO queues
  26. */
  27. typedef struct Queue Queue;
  28. struct Queue
  29. {
  30. Lock Lock;
  31. Block* bfirst; /* buffer */
  32. Block* blast;
  33. int len; /* bytes allocated to queue */
  34. int dlen; /* data bytes in queue */
  35. int limit; /* max bytes in queue */
  36. int inilim; /* initial limit */
  37. int state;
  38. int noblock; /* true if writes return immediately when q full */
  39. int eof; /* number of eofs read by user */
  40. void (*kick)(void*); /* restart output */
  41. void (*bypass)(void*, Block*); /* bypass queue altogether */
  42. void* arg; /* argument to kick */
  43. QLock rlock; /* mutex for reading processes */
  44. Rendez rr; /* process waiting to read */
  45. QLock wlock; /* mutex for writing processes */
  46. Rendez wr; /* process waiting to write */
  47. char err[ERRMAX];
  48. };
  49. enum
  50. {
  51. Maxatomic = 64*1024,
  52. };
  53. uint qiomaxatomic = Maxatomic;
  54. void
  55. ixsummary(void)
  56. {
  57. debugging ^= 1;
  58. iallocsummary();
  59. print("pad %lu, concat %lu, pullup %lu, copy %lu\n",
  60. padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
  61. print("consume %lu, produce %lu, qcopy %lu\n",
  62. consumecnt, producecnt, qcopycnt);
  63. }
  64. /*
  65. * free a list of blocks
  66. */
  67. void
  68. freeblist(Block *b)
  69. {
  70. Block *next;
  71. for(; b != 0; b = next){
  72. next = b->next;
  73. b->next = 0;
  74. freeb(b);
  75. }
  76. }
  77. /*
  78. * pad a block to the front (or the back if size is negative)
  79. */
  80. Block*
  81. padblock(Block *bp, int size)
  82. {
  83. int n;
  84. Block *nbp;
  85. QDEBUG checkb(bp, "padblock 1");
  86. if(size >= 0){
  87. if(bp->rp - bp->base >= size){
  88. bp->rp -= size;
  89. return bp;
  90. }
  91. if(bp->next)
  92. panic("padblock %#p", getcallerpc());
  93. n = BLEN(bp);
  94. padblockcnt++;
  95. nbp = allocb(size+n);
  96. nbp->rp += size;
  97. nbp->wp = nbp->rp;
  98. memmove(nbp->wp, bp->rp, n);
  99. nbp->wp += n;
  100. freeb(bp);
  101. nbp->rp -= size;
  102. } else {
  103. size = -size;
  104. if(bp->next)
  105. panic("padblock %#p", getcallerpc());
  106. if(bp->lim - bp->wp >= size)
  107. return bp;
  108. n = BLEN(bp);
  109. padblockcnt++;
  110. nbp = allocb(size+n);
  111. memmove(nbp->wp, bp->rp, n);
  112. nbp->wp += n;
  113. freeb(bp);
  114. }
  115. QDEBUG checkb(nbp, "padblock 1");
  116. return nbp;
  117. }
  118. /*
  119. * return count of bytes in a string of blocks
  120. */
  121. int
  122. blocklen(Block *bp)
  123. {
  124. int len;
  125. len = 0;
  126. while(bp) {
  127. len += BLEN(bp);
  128. bp = bp->next;
  129. }
  130. return len;
  131. }
  132. /*
  133. * return count of space in blocks
  134. */
  135. int
  136. blockalloclen(Block *bp)
  137. {
  138. int len;
  139. len = 0;
  140. while(bp) {
  141. len += BALLOC(bp);
  142. bp = bp->next;
  143. }
  144. return len;
  145. }
  146. /*
  147. * copy the string of blocks into
  148. * a single block and free the string
  149. */
  150. Block*
  151. concatblock(Block *bp)
  152. {
  153. int len;
  154. Block *nb, *f;
  155. if(bp->next == 0)
  156. return bp;
  157. nb = allocb(blocklen(bp));
  158. for(f = bp; f; f = f->next) {
  159. len = BLEN(f);
  160. memmove(nb->wp, f->rp, len);
  161. nb->wp += len;
  162. }
  163. concatblockcnt += BLEN(nb);
  164. freeblist(bp);
  165. QDEBUG checkb(nb, "concatblock 1");
  166. return nb;
  167. }
  168. /*
  169. * make sure the first block has at least n bytes
  170. */
  171. Block*
  172. pullupblock(Block *bp, int n)
  173. {
  174. int i;
  175. Block *nbp;
  176. /*
  177. * this should almost always be true, it's
  178. * just to avoid every caller checking.
  179. */
  180. if(BLEN(bp) >= n)
  181. return bp;
  182. /*
  183. * if not enough room in the first block,
  184. * add another to the front of the list.
  185. */
  186. if(bp->lim - bp->rp < n){
  187. nbp = allocb(n);
  188. nbp->next = bp;
  189. bp = nbp;
  190. }
  191. /*
  192. * copy bytes from the trailing blocks into the first
  193. */
  194. n -= BLEN(bp);
  195. while((nbp = bp->next) != nil){
  196. i = BLEN(nbp);
  197. if(i > n) {
  198. memmove(bp->wp, nbp->rp, n);
  199. pullupblockcnt++;
  200. bp->wp += n;
  201. nbp->rp += n;
  202. QDEBUG checkb(bp, "pullupblock 1");
  203. return bp;
  204. } else {
  205. /* shouldn't happen but why crash if it does */
  206. if(i < 0){
  207. print("pullupblock -ve length, from %#p\n",
  208. getcallerpc());
  209. i = 0;
  210. }
  211. memmove(bp->wp, nbp->rp, i);
  212. pullupblockcnt++;
  213. bp->wp += i;
  214. bp->next = nbp->next;
  215. nbp->next = 0;
  216. freeb(nbp);
  217. n -= i;
  218. if(n == 0){
  219. QDEBUG checkb(bp, "pullupblock 2");
  220. return bp;
  221. }
  222. }
  223. }
  224. freeb(bp);
  225. return 0;
  226. }
  227. /*
  228. * make sure the first block has at least n bytes
  229. */
  230. Block*
  231. pullupqueue(Queue *q, int n)
  232. {
  233. Block *b;
  234. if(BLEN(q->bfirst) >= n)
  235. return q->bfirst;
  236. q->bfirst = pullupblock(q->bfirst, n);
  237. for(b = q->bfirst; b != nil && b->next != nil; b = b->next)
  238. ;
  239. q->blast = b;
  240. return q->bfirst;
  241. }
  242. /*
  243. * trim to len bytes starting at offset
  244. */
  245. Block *
  246. trimblock(Block *bp, int offset, int len)
  247. {
  248. int32_t l;
  249. Block *nb, *startb;
  250. QDEBUG checkb(bp, "trimblock 1");
  251. if(blocklen(bp) < offset+len) {
  252. freeblist(bp);
  253. return nil;
  254. }
  255. while((l = BLEN(bp)) < offset) {
  256. offset -= l;
  257. nb = bp->next;
  258. bp->next = nil;
  259. freeb(bp);
  260. bp = nb;
  261. }
  262. startb = bp;
  263. bp->rp += offset;
  264. while((l = BLEN(bp)) < len) {
  265. len -= l;
  266. bp = bp->next;
  267. }
  268. bp->wp -= (BLEN(bp) - len);
  269. if(bp->next) {
  270. freeblist(bp->next);
  271. bp->next = nil;
  272. }
  273. return startb;
  274. }
  275. /*
  276. * copy 'count' bytes into a new block
  277. */
  278. Block*
  279. copyblock(Block *bp, int count)
  280. {
  281. int l;
  282. Block *nbp;
  283. QDEBUG checkb(bp, "copyblock 0");
  284. if(bp->flag & BINTR){
  285. nbp = iallocb(count);
  286. if(nbp == nil)
  287. return nil;
  288. }else
  289. nbp = allocb(count);
  290. for(; count > 0 && bp != 0; bp = bp->next){
  291. l = BLEN(bp);
  292. if(l > count)
  293. l = count;
  294. memmove(nbp->wp, bp->rp, l);
  295. nbp->wp += l;
  296. count -= l;
  297. }
  298. if(count > 0){
  299. memset(nbp->wp, 0, count);
  300. nbp->wp += count;
  301. }
  302. copyblockcnt++;
  303. QDEBUG checkb(nbp, "copyblock 1");
  304. return nbp;
  305. }
  306. Block*
  307. adjustblock(Block* bp, int len)
  308. {
  309. int n;
  310. Block *nbp;
  311. if(len < 0){
  312. freeb(bp);
  313. return nil;
  314. }
  315. if(bp->rp+len > bp->lim){
  316. nbp = copyblock(bp, len);
  317. freeblist(bp);
  318. QDEBUG checkb(nbp, "adjustblock 1");
  319. return nbp;
  320. }
  321. n = BLEN(bp);
  322. if(len > n)
  323. memset(bp->wp, 0, len-n);
  324. bp->wp = bp->rp+len;
  325. QDEBUG checkb(bp, "adjustblock 2");
  326. return bp;
  327. }
  328. /*
  329. * throw away up to count bytes from a
  330. * list of blocks. Return count of bytes
  331. * thrown away.
  332. */
  333. int
  334. pullblock(Block **bph, int count)
  335. {
  336. Block *bp;
  337. int n, bytes;
  338. bytes = 0;
  339. if(bph == nil)
  340. return 0;
  341. while(*bph != nil && count != 0) {
  342. bp = *bph;
  343. n = BLEN(bp);
  344. if(count < n)
  345. n = count;
  346. bytes += n;
  347. count -= n;
  348. bp->rp += n;
  349. QDEBUG checkb(bp, "pullblock ");
  350. if(BLEN(bp) == 0) {
  351. *bph = bp->next;
  352. bp->next = nil;
  353. freeb(bp);
  354. }
  355. }
  356. return bytes;
  357. }
  358. /*
  359. * get next block from a queue, return null if nothing there
  360. */
  361. Block*
  362. qget(Queue *q)
  363. {
  364. int dowakeup;
  365. Block *b;
  366. /* sync with qwrite */
  367. ilock(&q->Lock);
  368. b = q->bfirst;
  369. if(b == nil){
  370. q->state |= Qstarve;
  371. iunlock(&q->Lock);
  372. return nil;
  373. }
  374. q->bfirst = b->next;
  375. b->next = 0;
  376. q->len -= BALLOC(b);
  377. q->dlen -= BLEN(b);
  378. QDEBUG checkb(b, "qget");
  379. /* if writer flow controlled, restart */
  380. if((q->state & Qflow) && q->len < q->limit/2){
  381. q->state &= ~Qflow;
  382. dowakeup = 1;
  383. } else
  384. dowakeup = 0;
  385. iunlock(&q->Lock);
  386. if(dowakeup)
  387. wakeup(&q->wr);
  388. return b;
  389. }
  390. /*
  391. * throw away the next 'len' bytes in the queue
  392. */
  393. int
  394. qdiscard(Queue *q, int len)
  395. {
  396. Block *b;
  397. int dowakeup, n, sofar;
  398. ilock(&q->Lock);
  399. for(sofar = 0; sofar < len; sofar += n){
  400. b = q->bfirst;
  401. if(b == nil)
  402. break;
  403. QDEBUG checkb(b, "qdiscard");
  404. n = BLEN(b);
  405. if(n <= len - sofar){
  406. q->bfirst = b->next;
  407. b->next = 0;
  408. q->len -= BALLOC(b);
  409. q->dlen -= BLEN(b);
  410. freeb(b);
  411. } else {
  412. n = len - sofar;
  413. b->rp += n;
  414. q->dlen -= n;
  415. }
  416. }
  417. /*
  418. * if writer flow controlled, restart
  419. *
  420. * This used to be
  421. * q->len < q->limit/2
  422. * but it slows down tcp too much for certain write sizes.
  423. * I really don't understand it completely. It may be
  424. * due to the queue draining so fast that the transmission
  425. * stalls waiting for the app to produce more data. - presotto
  426. *
  427. * changed back from q->len < q->limit for reno tcp. - jmk
  428. */
  429. if((q->state & Qflow) && q->len < q->limit/2){
  430. q->state &= ~Qflow;
  431. dowakeup = 1;
  432. } else
  433. dowakeup = 0;
  434. iunlock(&q->Lock);
  435. if(dowakeup)
  436. wakeup(&q->wr);
  437. return sofar;
  438. }
  439. /*
  440. * Interrupt level copy out of a queue, return # bytes copied.
  441. */
  442. int
  443. qconsume(Queue *q, void *vp, int len)
  444. {
  445. Block *b;
  446. int n, dowakeup;
  447. uint8_t *p = vp;
  448. Block *tofree = nil;
  449. /* sync with qwrite */
  450. ilock(&q->Lock);
  451. for(;;) {
  452. b = q->bfirst;
  453. if(b == 0){
  454. q->state |= Qstarve;
  455. iunlock(&q->Lock);
  456. return -1;
  457. }
  458. QDEBUG checkb(b, "qconsume 1");
  459. n = BLEN(b);
  460. if(n > 0)
  461. break;
  462. q->bfirst = b->next;
  463. q->len -= BALLOC(b);
  464. /* remember to free this */
  465. b->next = tofree;
  466. tofree = b;
  467. };
  468. if(n < len)
  469. len = n;
  470. memmove(p, b->rp, len);
  471. consumecnt += n;
  472. b->rp += len;
  473. q->dlen -= len;
  474. /* discard the block if we're done with it */
  475. if((q->state & Qmsg) || len == n){
  476. q->bfirst = b->next;
  477. b->next = 0;
  478. q->len -= BALLOC(b);
  479. q->dlen -= BLEN(b);
  480. /* remember to free this */
  481. b->next = tofree;
  482. tofree = b;
  483. }
  484. /* if writer flow controlled, restart */
  485. if((q->state & Qflow) && q->len < q->limit/2){
  486. q->state &= ~Qflow;
  487. dowakeup = 1;
  488. } else
  489. dowakeup = 0;
  490. iunlock(&q->Lock);
  491. if(dowakeup)
  492. wakeup(&q->wr);
  493. if(tofree != nil)
  494. freeblist(tofree);
  495. return len;
  496. }
  497. int
  498. qpass(Queue *q, Block *b)
  499. {
  500. int dlen, len, dowakeup;
  501. /* sync with qread */
  502. dowakeup = 0;
  503. ilock(&q->Lock);
  504. if(q->len >= q->limit){
  505. freeblist(b);
  506. iunlock(&q->Lock);
  507. return -1;
  508. }
  509. if(q->state & Qclosed){
  510. len = BALLOC(b);
  511. freeblist(b);
  512. iunlock(&q->Lock);
  513. return len;
  514. }
  515. /* add buffer to queue */
  516. if(q->bfirst)
  517. q->blast->next = b;
  518. else
  519. q->bfirst = b;
  520. len = BALLOC(b);
  521. dlen = BLEN(b);
  522. QDEBUG checkb(b, "qpass");
  523. while(b->next){
  524. b = b->next;
  525. QDEBUG checkb(b, "qpass");
  526. len += BALLOC(b);
  527. dlen += BLEN(b);
  528. }
  529. q->blast = b;
  530. q->len += len;
  531. q->dlen += dlen;
  532. if(q->len >= q->limit/2)
  533. q->state |= Qflow;
  534. if(q->state & Qstarve){
  535. q->state &= ~Qstarve;
  536. dowakeup = 1;
  537. }
  538. iunlock(&q->Lock);
  539. if(dowakeup)
  540. wakeup(&q->rr);
  541. return len;
  542. }
  543. int
  544. qpassnolim(Queue *q, Block *b)
  545. {
  546. int dlen, len, dowakeup;
  547. /* sync with qread */
  548. dowakeup = 0;
  549. ilock(&q->Lock);
  550. if(q->state & Qclosed){
  551. len = BALLOC(b);
  552. freeblist(b);
  553. iunlock(&q->Lock);
  554. return len;
  555. }
  556. /* add buffer to queue */
  557. if(q->bfirst)
  558. q->blast->next = b;
  559. else
  560. q->bfirst = b;
  561. len = BALLOC(b);
  562. dlen = BLEN(b);
  563. QDEBUG checkb(b, "qpass");
  564. while(b->next){
  565. b = b->next;
  566. QDEBUG checkb(b, "qpass");
  567. len += BALLOC(b);
  568. dlen += BLEN(b);
  569. }
  570. q->blast = b;
  571. q->len += len;
  572. q->dlen += dlen;
  573. if(q->len >= q->limit/2)
  574. q->state |= Qflow;
  575. if(q->state & Qstarve){
  576. q->state &= ~Qstarve;
  577. dowakeup = 1;
  578. }
  579. iunlock(&q->Lock);
  580. if(dowakeup)
  581. wakeup(&q->rr);
  582. return len;
  583. }
  584. /*
  585. * if the allocated space is way out of line with the used
  586. * space, reallocate to a smaller block
  587. */
  588. Block*
  589. packblock(Block *bp)
  590. {
  591. Block **l, *nbp;
  592. int n;
  593. for(l = &bp; *l; l = &(*l)->next){
  594. nbp = *l;
  595. n = BLEN(nbp);
  596. if((n<<2) < BALLOC(nbp)){
  597. *l = allocb(n);
  598. memmove((*l)->wp, nbp->rp, n);
  599. (*l)->wp += n;
  600. (*l)->next = nbp->next;
  601. freeb(nbp);
  602. }
  603. }
  604. return bp;
  605. }
  606. int
  607. qproduce(Queue *q, void *vp, int len)
  608. {
  609. Block *b;
  610. int dowakeup;
  611. uint8_t *p = vp;
  612. /* sync with qread */
  613. dowakeup = 0;
  614. ilock(&q->Lock);
  615. /* no waiting receivers, room in buffer? */
  616. if(q->len >= q->limit){
  617. q->state |= Qflow;
  618. iunlock(&q->Lock);
  619. return -1;
  620. }
  621. /* save in buffer */
  622. b = iallocb(len);
  623. if(b == 0){
  624. iunlock(&q->Lock);
  625. return 0;
  626. }
  627. memmove(b->wp, p, len);
  628. producecnt += len;
  629. b->wp += len;
  630. if(q->bfirst)
  631. q->blast->next = b;
  632. else
  633. q->bfirst = b;
  634. q->blast = b;
  635. /* b->next = 0; done by iallocb() */
  636. q->len += BALLOC(b);
  637. q->dlen += BLEN(b);
  638. QDEBUG checkb(b, "qproduce");
  639. if(q->state & Qstarve){
  640. q->state &= ~Qstarve;
  641. dowakeup = 1;
  642. }
  643. if(q->len >= q->limit)
  644. q->state |= Qflow;
  645. iunlock(&q->Lock);
  646. if(dowakeup)
  647. wakeup(&q->rr);
  648. return len;
  649. }
  650. /*
  651. * copy from offset in the queue
  652. */
  653. Block*
  654. qcopy(Queue *q, int len, uint32_t offset)
  655. {
  656. int sofar;
  657. int n;
  658. Block *b, *nb;
  659. uint8_t *p;
  660. nb = allocb(len);
  661. ilock(&q->Lock);
  662. /* go to offset */
  663. b = q->bfirst;
  664. for(sofar = 0; ; sofar += n){
  665. if(b == nil){
  666. iunlock(&q->Lock);
  667. return nb;
  668. }
  669. n = BLEN(b);
  670. if(sofar + n > offset){
  671. p = b->rp + offset - sofar;
  672. n -= offset - sofar;
  673. break;
  674. }
  675. QDEBUG checkb(b, "qcopy");
  676. b = b->next;
  677. }
  678. /* copy bytes from there */
  679. for(sofar = 0; sofar < len;){
  680. if(n > len - sofar)
  681. n = len - sofar;
  682. memmove(nb->wp, p, n);
  683. qcopycnt += n;
  684. sofar += n;
  685. nb->wp += n;
  686. b = b->next;
  687. if(b == nil)
  688. break;
  689. n = BLEN(b);
  690. p = b->rp;
  691. }
  692. iunlock(&q->Lock);
  693. return nb;
  694. }
  695. /*
  696. * called by non-interrupt code
  697. */
  698. Queue*
  699. qopen(int limit, int msg, void (*kick)(void*), void *arg)
  700. {
  701. Queue *q;
  702. q = malloc(sizeof(Queue));
  703. if(q == 0)
  704. return 0;
  705. q->limit = q->inilim = limit;
  706. q->kick = kick;
  707. q->arg = arg;
  708. q->state = msg;
  709. q->state |= Qstarve;
  710. q->eof = 0;
  711. q->noblock = 0;
  712. return q;
  713. }
  714. /* open a queue to be bypassed */
  715. Queue*
  716. qbypass(void (*bypass)(void*, Block*), void *arg)
  717. {
  718. Queue *q;
  719. q = malloc(sizeof(Queue));
  720. if(q == 0)
  721. return 0;
  722. q->limit = 0;
  723. q->arg = arg;
  724. q->bypass = bypass;
  725. q->state = 0;
  726. return q;
  727. }
  728. static int
  729. notempty(void *a)
  730. {
  731. Queue *q = a;
  732. return (q->state & Qclosed) || q->bfirst != 0;
  733. }
  734. /*
  735. * wait for the queue to be non-empty or closed.
  736. * called with q ilocked.
  737. */
  738. static int
  739. qwait(Queue *q)
  740. {
  741. /* wait for data */
  742. for(;;){
  743. if(q->bfirst != nil)
  744. break;
  745. if(q->state & Qclosed){
  746. if(++q->eof > 3)
  747. return -1;
  748. if(*q->err && strcmp(q->err, Ehungup) != 0)
  749. return -1;
  750. return 0;
  751. }
  752. q->state |= Qstarve; /* flag requesting producer to wake me */
  753. iunlock(&q->Lock);
  754. sleep(&q->rr, notempty, q);
  755. ilock(&q->Lock);
  756. }
  757. return 1;
  758. }
  759. /*
  760. * add a block list to a queue
  761. */
  762. void
  763. qaddlist(Queue *q, Block *b)
  764. {
  765. /* queue the block */
  766. if(q->bfirst)
  767. q->blast->next = b;
  768. else
  769. q->bfirst = b;
  770. q->len += blockalloclen(b);
  771. q->dlen += blocklen(b);
  772. while(b->next)
  773. b = b->next;
  774. q->blast = b;
  775. }
  776. /*
  777. * called with q ilocked
  778. */
  779. Block*
  780. qremove(Queue *q)
  781. {
  782. Block *b;
  783. b = q->bfirst;
  784. if(b == nil)
  785. return nil;
  786. q->bfirst = b->next;
  787. b->next = nil;
  788. q->dlen -= BLEN(b);
  789. q->len -= BALLOC(b);
  790. QDEBUG checkb(b, "qremove");
  791. return b;
  792. }
  793. /*
  794. * copy the contents of a string of blocks into
  795. * memory. emptied blocks are freed. return
  796. * pointer to first unconsumed block.
  797. */
  798. Block*
  799. bl2mem(uint8_t *p, Block *b, int n)
  800. {
  801. int i;
  802. Block *next;
  803. for(; b != nil; b = next){
  804. i = BLEN(b);
  805. if(i > n){
  806. memmove(p, b->rp, n);
  807. b->rp += n;
  808. return b;
  809. }
  810. memmove(p, b->rp, i);
  811. n -= i;
  812. p += i;
  813. b->rp += i;
  814. next = b->next;
  815. freeb(b);
  816. }
  817. return nil;
  818. }
  819. /*
  820. * copy the contents of memory into a string of blocks.
  821. * return nil on error.
  822. */
  823. Block*
  824. mem2bl(uint8_t *p, int len)
  825. {
  826. Proc *up = externup();
  827. int n;
  828. Block *b, *first, **l;
  829. first = nil;
  830. l = &first;
  831. if(waserror()){
  832. freeblist(first);
  833. nexterror();
  834. }
  835. do {
  836. n = len;
  837. if(n > Maxatomic)
  838. n = Maxatomic;
  839. *l = b = allocb(n);
  840. setmalloctag(b, (up->text[0]<<24)|(up->text[1]<<16)|(up->text[2]<<8)|up->text[3]);
  841. memmove(b->wp, p, n);
  842. b->wp += n;
  843. p += n;
  844. len -= n;
  845. l = &b->next;
  846. } while(len > 0);
  847. poperror();
  848. return first;
  849. }
  850. /*
  851. * put a block back to the front of the queue
  852. * called with q ilocked
  853. */
  854. void
  855. qputback(Queue *q, Block *b)
  856. {
  857. b->next = q->bfirst;
  858. if(q->bfirst == nil)
  859. q->blast = b;
  860. q->bfirst = b;
  861. q->len += BALLOC(b);
  862. q->dlen += BLEN(b);
  863. }
  864. /*
  865. * flow control, get producer going again
  866. * called with q ilocked
  867. */
  868. static void
  869. qwakeup_iunlock(Queue *q)
  870. {
  871. int dowakeup;
  872. /* if writer flow controlled, restart */
  873. if((q->state & Qflow) && q->len < q->limit/2){
  874. q->state &= ~Qflow;
  875. dowakeup = 1;
  876. }
  877. else
  878. dowakeup = 0;
  879. iunlock(&q->Lock);
  880. /* wakeup flow controlled writers */
  881. if(dowakeup){
  882. if(q->kick)
  883. q->kick(q->arg);
  884. wakeup(&q->wr);
  885. }
  886. }
  887. /*
  888. * get next block from a queue (up to a limit)
  889. */
  890. Block*
  891. qbread(Queue *q, int len)
  892. {
  893. Proc *up = externup();
  894. Block *b, *nb;
  895. int n;
  896. qlock(&q->rlock);
  897. if(waserror()){
  898. qunlock(&q->rlock);
  899. nexterror();
  900. }
  901. ilock(&q->Lock);
  902. switch(qwait(q)){
  903. case 0:
  904. /* queue closed */
  905. iunlock(&q->Lock);
  906. qunlock(&q->rlock);
  907. poperror();
  908. return nil;
  909. case -1:
  910. /* multiple reads on a closed queue */
  911. iunlock(&q->Lock);
  912. error(q->err);
  913. }
  914. /* if we get here, there's at least one block in the queue */
  915. b = qremove(q);
  916. n = BLEN(b);
  917. /* split block if it's too big and this is not a message queue */
  918. nb = b;
  919. if(n > len){
  920. if((q->state&Qmsg) == 0){
  921. n -= len;
  922. b = allocb(n);
  923. memmove(b->wp, nb->rp+len, n);
  924. b->wp += n;
  925. qputback(q, b);
  926. }
  927. nb->wp = nb->rp + len;
  928. }
  929. /* restart producer */
  930. qwakeup_iunlock(q);
  931. poperror();
  932. qunlock(&q->rlock);
  933. return nb;
  934. }
  935. /*
  936. * read a queue. if no data is queued, post a Block
  937. * and wait on its Rendez.
  938. */
  939. int32_t
  940. qread(Queue *q, void *vp, int len)
  941. {
  942. Proc *up = externup();
  943. Block *b, *first, **l;
  944. int blen, n;
  945. qlock(&q->rlock);
  946. if(waserror()){
  947. qunlock(&q->rlock);
  948. nexterror();
  949. }
  950. ilock(&q->Lock);
  951. again:
  952. switch(qwait(q)){
  953. case 0:
  954. /* queue closed */
  955. iunlock(&q->Lock);
  956. qunlock(&q->rlock);
  957. poperror();
  958. return 0;
  959. case -1:
  960. /* multiple reads on a closed queue */
  961. iunlock(&q->Lock);
  962. error(q->err);
  963. }
  964. /* if we get here, there's at least one block in the queue */
  965. if(q->state & Qcoalesce){
  966. /* when coalescing, 0 length blocks just go away */
  967. b = q->bfirst;
  968. if(BLEN(b) <= 0){
  969. freeb(qremove(q));
  970. goto again;
  971. }
  972. /* grab the first block plus as many
  973. * following blocks as will completely
  974. * fit in the read.
  975. */
  976. n = 0;
  977. l = &first;
  978. blen = BLEN(b);
  979. for(;;) {
  980. *l = qremove(q);
  981. l = &b->next;
  982. n += blen;
  983. b = q->bfirst;
  984. if(b == nil)
  985. break;
  986. blen = BLEN(b);
  987. if(n+blen > len)
  988. break;
  989. }
  990. } else {
  991. first = qremove(q);
  992. n = BLEN(first);
  993. }
  994. /* copy to user space outside of the ilock */
  995. iunlock(&q->Lock);
  996. b = bl2mem(vp, first, len);
  997. ilock(&q->Lock);
  998. /* take care of any left over partial block */
  999. if(b != nil){
  1000. n -= BLEN(b);
  1001. if(q->state & Qmsg)
  1002. freeb(b);
  1003. else
  1004. qputback(q, b);
  1005. }
  1006. /* restart producer */
  1007. qwakeup_iunlock(q);
  1008. poperror();
  1009. qunlock(&q->rlock);
  1010. return n;
  1011. }
  1012. static int
  1013. qnotfull(void *a)
  1014. {
  1015. Queue *q = a;
  1016. return q->len < q->limit || (q->state & Qclosed);
  1017. }
  1018. uint32_t noblockcnt;
  1019. /*
  1020. * add a block to a queue obeying flow control
  1021. */
  1022. int32_t
  1023. qbwrite(Queue *q, Block *b)
  1024. {
  1025. Proc *up = externup();
  1026. int n, dowakeup;
  1027. n = BLEN(b);
  1028. if(q->bypass){
  1029. (*q->bypass)(q->arg, b);
  1030. return n;
  1031. }
  1032. dowakeup = 0;
  1033. qlock(&q->wlock);
  1034. if(waserror()){
  1035. if(b != nil)
  1036. freeb(b);
  1037. qunlock(&q->wlock);
  1038. nexterror();
  1039. }
  1040. ilock(&q->Lock);
  1041. /* give up if the queue is closed */
  1042. if(q->state & Qclosed){
  1043. iunlock(&q->Lock);
  1044. error(q->err);
  1045. }
  1046. /* if nonblocking, don't queue over the limit */
  1047. if(q->len >= q->limit){
  1048. if(q->noblock){
  1049. iunlock(&q->Lock);
  1050. freeb(b);
  1051. noblockcnt += n;
  1052. qunlock(&q->wlock);
  1053. poperror();
  1054. return n;
  1055. }
  1056. }
  1057. /* queue the block */
  1058. if(q->bfirst)
  1059. q->blast->next = b;
  1060. else
  1061. q->bfirst = b;
  1062. q->blast = b;
  1063. b->next = 0;
  1064. q->len += BALLOC(b);
  1065. q->dlen += n;
  1066. QDEBUG checkb(b, "qbwrite");
  1067. b = nil;
  1068. /* make sure other end gets awakened */
  1069. if(q->state & Qstarve){
  1070. q->state &= ~Qstarve;
  1071. dowakeup = 1;
  1072. }
  1073. iunlock(&q->Lock);
  1074. /* get output going again */
  1075. if(q->kick && (dowakeup || (q->state&Qkick)))
  1076. q->kick(q->arg);
  1077. /* wakeup anyone consuming at the other end */
  1078. if(dowakeup)
  1079. wakeup(&q->rr);
  1080. /*
  1081. * flow control, wait for queue to get below the limit
  1082. * before allowing the process to continue and queue
  1083. * more. We do this here so that postnote can only
  1084. * interrupt us after the data has been queued. This
  1085. * means that things like 9p flushes and ssl messages
  1086. * will not be disrupted by software interrupts.
  1087. *
  1088. * Note - this is moderately dangerous since a process
  1089. * that keeps getting interrupted and rewriting will
  1090. * queue infinite crud.
  1091. */
  1092. for(;;){
  1093. if(q->noblock || qnotfull(q))
  1094. break;
  1095. ilock(&q->Lock);
  1096. q->state |= Qflow;
  1097. iunlock(&q->Lock);
  1098. sleep(&q->wr, qnotfull, q);
  1099. }
  1100. USED(b);
  1101. qunlock(&q->wlock);
  1102. poperror();
  1103. return n;
  1104. }
  1105. int32_t qibwrite(Queue *q, Block *b)
  1106. {
  1107. int n, dowakeup;
  1108. dowakeup = 0;
  1109. n = BLEN(b);
  1110. lock(&q->Lock);
  1111. if (q->bfirst)
  1112. q->blast->next = b;
  1113. else
  1114. q->bfirst = b;
  1115. q->blast = b;
  1116. q->len += BALLOC(b);
  1117. q->dlen += n;
  1118. if (q->state & Qstarve) {
  1119. q->state &= ~Qstarve;
  1120. dowakeup = 1;
  1121. }
  1122. unlock(&q->Lock);
  1123. if (dowakeup) {
  1124. if (q->kick)
  1125. q->kick(q->arg);
  1126. wakeup(&q->rr);
  1127. }
  1128. return n;
  1129. }
  1130. /*
  1131. * write to a queue. only Maxatomic bytes at a time is atomic.
  1132. */
  1133. int
  1134. qwrite(Queue *q, void *vp, int len)
  1135. {
  1136. Proc *up = externup();
  1137. int n, sofar;
  1138. Block *b;
  1139. uint8_t *p = vp;
  1140. QDEBUG if(!islo())
  1141. print("qwrite hi %#p\n", getcallerpc());
  1142. sofar = 0;
  1143. do {
  1144. n = len-sofar;
  1145. if(n > Maxatomic)
  1146. n = Maxatomic;
  1147. b = allocb(n);
  1148. setmalloctag(b, (up->text[0]<<24)|(up->text[1]<<16)|(up->text[2]<<8)|up->text[3]);
  1149. if(waserror()){
  1150. freeb(b);
  1151. nexterror();
  1152. }
  1153. memmove(b->wp, p+sofar, n);
  1154. poperror();
  1155. b->wp += n;
  1156. qbwrite(q, b);
  1157. sofar += n;
  1158. } while(sofar < len && (q->state & Qmsg) == 0);
  1159. return len;
  1160. }
  1161. /*
  1162. * used by print() to write to a queue. Since we may be splhi or not in
  1163. * a process, don't qlock.
  1164. *
  1165. * this routine merges adjacent blocks if block n+1 will fit into
  1166. * the free space of block n.
  1167. */
  1168. int
  1169. qiwrite(Queue *q, void *vp, int len)
  1170. {
  1171. int n, sofar, dowakeup;
  1172. Block *b;
  1173. uint8_t *p = vp;
  1174. dowakeup = 0;
  1175. sofar = 0;
  1176. do {
  1177. n = len-sofar;
  1178. if(n > Maxatomic)
  1179. n = Maxatomic;
  1180. b = iallocb(n);
  1181. if(b == nil)
  1182. break;
  1183. memmove(b->wp, p+sofar, n);
  1184. b->wp += n;
  1185. ilock(&q->Lock);
  1186. /* we use an artificially high limit for kernel prints since anything
  1187. * over the limit gets dropped
  1188. */
  1189. if(q->dlen >= 16*1024){
  1190. iunlock(&q->Lock);
  1191. freeb(b);
  1192. break;
  1193. }
  1194. QDEBUG checkb(b, "qiwrite");
  1195. if(q->bfirst)
  1196. q->blast->next = b;
  1197. else
  1198. q->bfirst = b;
  1199. q->blast = b;
  1200. q->len += BALLOC(b);
  1201. q->dlen += n;
  1202. if(q->state & Qstarve){
  1203. q->state &= ~Qstarve;
  1204. dowakeup = 1;
  1205. }
  1206. iunlock(&q->Lock);
  1207. if(dowakeup){
  1208. if(q->kick)
  1209. q->kick(q->arg);
  1210. wakeup(&q->rr);
  1211. }
  1212. sofar += n;
  1213. } while(sofar < len && (q->state & Qmsg) == 0);
  1214. return sofar;
  1215. }
  1216. /*
  1217. * be extremely careful when calling this,
  1218. * as there is no reference accounting
  1219. */
  1220. void
  1221. qfree(Queue *q)
  1222. {
  1223. qclose(q);
  1224. free(q);
  1225. }
  1226. /*
  1227. * Mark a queue as closed. No further IO is permitted.
  1228. * All blocks are released.
  1229. */
  1230. void
  1231. qclose(Queue *q)
  1232. {
  1233. Block *bfirst;
  1234. if(q == nil)
  1235. return;
  1236. /* mark it */
  1237. ilock(&q->Lock);
  1238. q->state |= Qclosed;
  1239. q->state &= ~(Qflow|Qstarve);
  1240. strcpy(q->err, Ehungup);
  1241. bfirst = q->bfirst;
  1242. q->bfirst = 0;
  1243. q->len = 0;
  1244. q->dlen = 0;
  1245. q->noblock = 0;
  1246. iunlock(&q->Lock);
  1247. /* free queued blocks */
  1248. freeblist(bfirst);
  1249. /* wake up readers/writers */
  1250. wakeup(&q->rr);
  1251. wakeup(&q->wr);
  1252. }
  1253. /*
  1254. * Mark a queue as closed. Wakeup any readers. Don't remove queued
  1255. * blocks.
  1256. */
  1257. void
  1258. qhangup(Queue *q, char *msg)
  1259. {
  1260. /* mark it */
  1261. ilock(&q->Lock);
  1262. q->state |= Qclosed;
  1263. if(msg == 0 || *msg == 0)
  1264. strcpy(q->err, Ehungup);
  1265. else
  1266. strncpy(q->err, msg, ERRMAX-1);
  1267. iunlock(&q->Lock);
  1268. /* wake up readers/writers */
  1269. wakeup(&q->rr);
  1270. wakeup(&q->wr);
  1271. }
  1272. /*
  1273. * return non-zero if the q is hungup
  1274. */
  1275. int
  1276. qisclosed(Queue *q)
  1277. {
  1278. return q->state & Qclosed;
  1279. }
  1280. /*
  1281. * mark a queue as no longer hung up
  1282. */
  1283. void
  1284. qreopen(Queue *q)
  1285. {
  1286. ilock(&q->Lock);
  1287. q->state &= ~Qclosed;
  1288. q->state |= Qstarve;
  1289. q->eof = 0;
  1290. q->limit = q->inilim;
  1291. iunlock(&q->Lock);
  1292. }
  1293. /*
  1294. * return bytes queued
  1295. */
  1296. int
  1297. qlen(Queue *q)
  1298. {
  1299. return q->dlen;
  1300. }
  1301. /*
  1302. * return space remaining before flow control
  1303. */
  1304. int
  1305. qwindow(Queue *q)
  1306. {
  1307. int l;
  1308. l = q->limit - q->len;
  1309. if(l < 0)
  1310. l = 0;
  1311. return l;
  1312. }
  1313. /*
  1314. * return true if we can read without blocking
  1315. */
  1316. int
  1317. qcanread(Queue *q)
  1318. {
  1319. return q->bfirst!=0;
  1320. }
  1321. /*
  1322. * change queue limit
  1323. */
  1324. void
  1325. qsetlimit(Queue *q, int limit)
  1326. {
  1327. q->limit = limit;
  1328. }
  1329. /*
  1330. * set blocking/nonblocking
  1331. */
  1332. void
  1333. qnoblock(Queue *q, int onoff)
  1334. {
  1335. q->noblock = onoff;
  1336. }
  1337. /*
  1338. * flush the output queue
  1339. */
  1340. void
  1341. qflush(Queue *q)
  1342. {
  1343. Block *bfirst;
  1344. /* mark it */
  1345. ilock(&q->Lock);
  1346. bfirst = q->bfirst;
  1347. q->bfirst = 0;
  1348. q->len = 0;
  1349. q->dlen = 0;
  1350. iunlock(&q->Lock);
  1351. /* free queued blocks */
  1352. freeblist(bfirst);
  1353. /* wake up readers/writers */
  1354. wakeup(&q->wr);
  1355. }
  1356. int
  1357. qfull(Queue *q)
  1358. {
  1359. return q->state & Qflow;
  1360. }
  1361. int
  1362. qstate(Queue *q)
  1363. {
  1364. return q->state;
  1365. }