/* qio.c - kernel I/O queues and Block list utilities */
  1. #include "u.h"
  2. #include "../port/lib.h"
  3. #include "mem.h"
  4. #include "dat.h"
  5. #include "fns.h"
  6. #include "../port/error.h"
  7. static ulong padblockcnt;
  8. static ulong concatblockcnt;
  9. static ulong pullupblockcnt;
  10. static ulong copyblockcnt;
  11. static ulong consumecnt;
  12. static ulong producecnt;
  13. static ulong qcopycnt;
  14. static int debugging;
  15. #define QDEBUG if(0)
  16. /*
  17. * IO queues
  18. */
  19. typedef struct Queue Queue;
  20. struct Queue
  21. {
  22. Lock;
  23. Block* bfirst; /* buffer */
  24. Block* blast;
  25. int len; /* bytes allocated to queue */
  26. int dlen; /* data bytes in queue */
  27. int limit; /* max bytes in queue */
  28. int inilim; /* initial limit */
  29. int state;
  30. int noblock; /* true if writes return immediately when q full */
  31. int eof; /* number of eofs read by user */
  32. void (*kick)(void*); /* restart output */
  33. void (*bypass)(void*, Block*); /* bypass queue altogether */
  34. void* arg; /* argument to kick */
  35. QLock rlock; /* mutex for reading processes */
  36. Rendez rr; /* process waiting to read */
  37. QLock wlock; /* mutex for writing processes */
  38. Rendez wr; /* process waiting to write */
  39. char err[ERRMAX];
  40. };
  41. enum
  42. {
  43. Maxatomic = 64*1024,
  44. };
  45. uint qiomaxatomic = Maxatomic;
  46. void
  47. ixsummary(void)
  48. {
  49. debugging ^= 1;
  50. iallocsummary();
  51. print("pad %lud, concat %lud, pullup %lud, copy %lud\n",
  52. padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
  53. print("consume %lud, produce %lud, qcopy %lud\n",
  54. consumecnt, producecnt, qcopycnt);
  55. }
  56. /*
  57. * free a list of blocks
  58. */
  59. void
  60. freeblist(Block *b)
  61. {
  62. Block *next;
  63. for(; b != 0; b = next){
  64. next = b->next;
  65. b->next = 0;
  66. freeb(b);
  67. }
  68. }
  69. /*
  70. * pad a block to the front (or the back if size is negative)
  71. */
  72. Block*
  73. padblock(Block *bp, int size)
  74. {
  75. int n;
  76. Block *nbp;
  77. QDEBUG checkb(bp, "padblock 1");
  78. if(size >= 0){
  79. if(bp->rp - bp->base >= size){
  80. bp->rp -= size;
  81. return bp;
  82. }
  83. if(bp->next)
  84. panic("padblock 0x%luX", getcallerpc(&bp));
  85. n = BLEN(bp);
  86. padblockcnt++;
  87. nbp = allocb(size+n);
  88. nbp->rp += size;
  89. nbp->wp = nbp->rp;
  90. memmove(nbp->wp, bp->rp, n);
  91. nbp->wp += n;
  92. freeb(bp);
  93. nbp->rp -= size;
  94. } else {
  95. size = -size;
  96. if(bp->next)
  97. panic("padblock 0x%luX", getcallerpc(&bp));
  98. if(bp->lim - bp->wp >= size)
  99. return bp;
  100. n = BLEN(bp);
  101. padblockcnt++;
  102. nbp = allocb(size+n);
  103. memmove(nbp->wp, bp->rp, n);
  104. nbp->wp += n;
  105. freeb(bp);
  106. }
  107. QDEBUG checkb(nbp, "padblock 1");
  108. return nbp;
  109. }
  110. /*
  111. * return count of bytes in a string of blocks
  112. */
  113. int
  114. blocklen(Block *bp)
  115. {
  116. int len;
  117. len = 0;
  118. while(bp) {
  119. len += BLEN(bp);
  120. bp = bp->next;
  121. }
  122. return len;
  123. }
  124. /*
  125. * return count of space in blocks
  126. */
  127. int
  128. blockalloclen(Block *bp)
  129. {
  130. int len;
  131. len = 0;
  132. while(bp) {
  133. len += BALLOC(bp);
  134. bp = bp->next;
  135. }
  136. return len;
  137. }
  138. /*
  139. * copy the string of blocks into
  140. * a single block and free the string
  141. */
  142. Block*
  143. concatblock(Block *bp)
  144. {
  145. int len;
  146. Block *nb, *f;
  147. if(bp->next == 0)
  148. return bp;
  149. nb = allocb(blocklen(bp));
  150. for(f = bp; f; f = f->next) {
  151. len = BLEN(f);
  152. memmove(nb->wp, f->rp, len);
  153. nb->wp += len;
  154. }
  155. concatblockcnt += BLEN(nb);
  156. freeblist(bp);
  157. QDEBUG checkb(nb, "concatblock 1");
  158. return nb;
  159. }
  160. /*
  161. * make sure the first block has at least n bytes
  162. */
  163. Block*
  164. pullupblock(Block *bp, int n)
  165. {
  166. int i;
  167. Block *nbp;
  168. /*
  169. * this should almost always be true, it's
  170. * just to avoid every caller checking.
  171. */
  172. if(BLEN(bp) >= n)
  173. return bp;
  174. /*
  175. * if not enough room in the first block,
  176. * add another to the front of the list.
  177. */
  178. if(bp->lim - bp->rp < n){
  179. nbp = allocb(n);
  180. nbp->next = bp;
  181. bp = nbp;
  182. }
  183. /*
  184. * copy bytes from the trailing blocks into the first
  185. */
  186. n -= BLEN(bp);
  187. while(nbp = bp->next){
  188. i = BLEN(nbp);
  189. if(i > n) {
  190. memmove(bp->wp, nbp->rp, n);
  191. pullupblockcnt++;
  192. bp->wp += n;
  193. nbp->rp += n;
  194. QDEBUG checkb(bp, "pullupblock 1");
  195. return bp;
  196. }
  197. else {
  198. memmove(bp->wp, nbp->rp, i);
  199. pullupblockcnt++;
  200. bp->wp += i;
  201. bp->next = nbp->next;
  202. nbp->next = 0;
  203. freeb(nbp);
  204. n -= i;
  205. if(n == 0){
  206. QDEBUG checkb(bp, "pullupblock 2");
  207. return bp;
  208. }
  209. }
  210. }
  211. freeb(bp);
  212. return 0;
  213. }
  214. /*
  215. * make sure the first block has at least n bytes
  216. */
  217. Block*
  218. pullupqueue(Queue *q, int n)
  219. {
  220. Block *b;
  221. if(BLEN(q->bfirst) >= n)
  222. return q->bfirst;
  223. q->bfirst = pullupblock(q->bfirst, n);
  224. for(b = q->bfirst; b != nil && b->next != nil; b = b->next)
  225. ;
  226. q->blast = b;
  227. return q->bfirst;
  228. }
  229. /*
  230. * trim to len bytes starting at offset
  231. */
  232. Block *
  233. trimblock(Block *bp, int offset, int len)
  234. {
  235. ulong l;
  236. Block *nb, *startb;
  237. QDEBUG checkb(bp, "trimblock 1");
  238. if(blocklen(bp) < offset+len) {
  239. freeblist(bp);
  240. return nil;
  241. }
  242. while((l = BLEN(bp)) < offset) {
  243. offset -= l;
  244. nb = bp->next;
  245. bp->next = nil;
  246. freeb(bp);
  247. bp = nb;
  248. }
  249. startb = bp;
  250. bp->rp += offset;
  251. while((l = BLEN(bp)) < len) {
  252. len -= l;
  253. bp = bp->next;
  254. }
  255. bp->wp -= (BLEN(bp) - len);
  256. if(bp->next) {
  257. freeblist(bp->next);
  258. bp->next = nil;
  259. }
  260. return startb;
  261. }
  262. /*
  263. * copy 'count' bytes into a new block
  264. */
  265. Block*
  266. copyblock(Block *bp, int count)
  267. {
  268. int l;
  269. Block *nbp;
  270. QDEBUG checkb(bp, "copyblock 0");
  271. nbp = allocb(count);
  272. for(; count > 0 && bp != 0; bp = bp->next){
  273. l = BLEN(bp);
  274. if(l > count)
  275. l = count;
  276. memmove(nbp->wp, bp->rp, l);
  277. nbp->wp += l;
  278. count -= l;
  279. }
  280. if(count > 0){
  281. memset(nbp->wp, 0, count);
  282. nbp->wp += count;
  283. }
  284. copyblockcnt++;
  285. QDEBUG checkb(nbp, "copyblock 1");
  286. return nbp;
  287. }
  288. Block*
  289. adjustblock(Block* bp, int len)
  290. {
  291. int n;
  292. Block *nbp;
  293. if(len < 0){
  294. freeb(bp);
  295. return nil;
  296. }
  297. if(bp->rp+len > bp->lim){
  298. nbp = copyblock(bp, len);
  299. freeblist(bp);
  300. QDEBUG checkb(nbp, "adjustblock 1");
  301. return nbp;
  302. }
  303. n = BLEN(bp);
  304. if(len > n)
  305. memset(bp->wp, 0, len-n);
  306. bp->wp = bp->rp+len;
  307. QDEBUG checkb(bp, "adjustblock 2");
  308. return bp;
  309. }
  310. /*
  311. * throw away up to count bytes from a
  312. * list of blocks. Return count of bytes
  313. * thrown away.
  314. */
  315. int
  316. pullblock(Block **bph, int count)
  317. {
  318. Block *bp;
  319. int n, bytes;
  320. bytes = 0;
  321. if(bph == nil)
  322. return 0;
  323. while(*bph != nil && count != 0) {
  324. bp = *bph;
  325. n = BLEN(bp);
  326. if(count < n)
  327. n = count;
  328. bytes += n;
  329. count -= n;
  330. bp->rp += n;
  331. QDEBUG checkb(bp, "pullblock ");
  332. if(BLEN(bp) == 0) {
  333. *bph = bp->next;
  334. bp->next = nil;
  335. freeb(bp);
  336. }
  337. }
  338. return bytes;
  339. }
  340. /*
  341. * get next block from a queue, return null if nothing there
  342. */
  343. Block*
  344. qget(Queue *q)
  345. {
  346. int dowakeup;
  347. Block *b;
  348. /* sync with qwrite */
  349. ilock(q);
  350. b = q->bfirst;
  351. if(b == nil){
  352. q->state |= Qstarve;
  353. iunlock(q);
  354. return nil;
  355. }
  356. q->bfirst = b->next;
  357. b->next = 0;
  358. q->len -= BALLOC(b);
  359. q->dlen -= BLEN(b);
  360. QDEBUG checkb(b, "qget");
  361. /* if writer flow controlled, restart */
  362. if((q->state & Qflow) && q->len < q->limit/2){
  363. q->state &= ~Qflow;
  364. dowakeup = 1;
  365. } else
  366. dowakeup = 0;
  367. iunlock(q);
  368. if(dowakeup)
  369. wakeup(&q->wr);
  370. return b;
  371. }
  372. /*
  373. * throw away the next 'len' bytes in the queue
  374. */
  375. int
  376. qdiscard(Queue *q, int len)
  377. {
  378. Block *b;
  379. int dowakeup, n, sofar;
  380. ilock(q);
  381. for(sofar = 0; sofar < len; sofar += n){
  382. b = q->bfirst;
  383. if(b == nil)
  384. break;
  385. QDEBUG checkb(b, "qdiscard");
  386. n = BLEN(b);
  387. if(n <= len - sofar){
  388. q->bfirst = b->next;
  389. b->next = 0;
  390. q->len -= BALLOC(b);
  391. q->dlen -= BLEN(b);
  392. freeb(b);
  393. } else {
  394. n = len - sofar;
  395. b->rp += n;
  396. q->dlen -= n;
  397. }
  398. }
  399. /*
  400. * if writer flow controlled, restart
  401. *
  402. * This used to be
  403. * q->len < q->limit/2
  404. * but it slows down tcp too much for certain write sizes.
  405. * I really don't understand it completely. It may be
  406. * due to the queue draining so fast that the transmission
  407. * stalls waiting for the app to produce more data. - presotto
  408. */
  409. if((q->state & Qflow) && q->len < q->limit){
  410. q->state &= ~Qflow;
  411. dowakeup = 1;
  412. } else
  413. dowakeup = 0;
  414. iunlock(q);
  415. if(dowakeup)
  416. wakeup(&q->wr);
  417. return sofar;
  418. }
  419. /*
  420. * Interrupt level copy out of a queue, return # bytes copied.
  421. */
  422. int
  423. qconsume(Queue *q, void *vp, int len)
  424. {
  425. Block *b;
  426. int n, dowakeup;
  427. uchar *p = vp;
  428. Block *tofree = nil;
  429. /* sync with qwrite */
  430. ilock(q);
  431. for(;;) {
  432. b = q->bfirst;
  433. if(b == 0){
  434. q->state |= Qstarve;
  435. iunlock(q);
  436. return -1;
  437. }
  438. QDEBUG checkb(b, "qconsume 1");
  439. n = BLEN(b);
  440. if(n > 0)
  441. break;
  442. q->bfirst = b->next;
  443. q->len -= BALLOC(b);
  444. /* remember to free this */
  445. b->next = tofree;
  446. tofree = b;
  447. };
  448. if(n < len)
  449. len = n;
  450. memmove(p, b->rp, len);
  451. consumecnt += n;
  452. b->rp += len;
  453. q->dlen -= len;
  454. /* discard the block if we're done with it */
  455. if((q->state & Qmsg) || len == n){
  456. q->bfirst = b->next;
  457. b->next = 0;
  458. q->len -= BALLOC(b);
  459. q->dlen -= BLEN(b);
  460. /* remember to free this */
  461. b->next = tofree;
  462. tofree = b;
  463. }
  464. /* if writer flow controlled, restart */
  465. if((q->state & Qflow) && q->len < q->limit/2){
  466. q->state &= ~Qflow;
  467. dowakeup = 1;
  468. } else
  469. dowakeup = 0;
  470. iunlock(q);
  471. if(dowakeup)
  472. wakeup(&q->wr);
  473. if(tofree != nil)
  474. freeblist(tofree);
  475. return len;
  476. }
  477. int
  478. qpass(Queue *q, Block *b)
  479. {
  480. int dlen, len, dowakeup;
  481. /* sync with qread */
  482. dowakeup = 0;
  483. ilock(q);
  484. if(q->len >= q->limit){
  485. freeblist(b);
  486. iunlock(q);
  487. return -1;
  488. }
  489. if(q->state & Qclosed){
  490. freeblist(b);
  491. iunlock(q);
  492. return BALLOC(b);
  493. }
  494. /* add buffer to queue */
  495. if(q->bfirst)
  496. q->blast->next = b;
  497. else
  498. q->bfirst = b;
  499. len = BALLOC(b);
  500. dlen = BLEN(b);
  501. QDEBUG checkb(b, "qpass");
  502. while(b->next){
  503. b = b->next;
  504. QDEBUG checkb(b, "qpass");
  505. len += BALLOC(b);
  506. dlen += BLEN(b);
  507. }
  508. q->blast = b;
  509. q->len += len;
  510. q->dlen += dlen;
  511. if(q->len >= q->limit/2)
  512. q->state |= Qflow;
  513. if(q->state & Qstarve){
  514. q->state &= ~Qstarve;
  515. dowakeup = 1;
  516. }
  517. iunlock(q);
  518. if(dowakeup)
  519. wakeup(&q->rr);
  520. return len;
  521. }
  522. int
  523. qpassnolim(Queue *q, Block *b)
  524. {
  525. int dlen, len, dowakeup;
  526. /* sync with qread */
  527. dowakeup = 0;
  528. ilock(q);
  529. if(q->state & Qclosed){
  530. freeblist(b);
  531. iunlock(q);
  532. return BALLOC(b);
  533. }
  534. /* add buffer to queue */
  535. if(q->bfirst)
  536. q->blast->next = b;
  537. else
  538. q->bfirst = b;
  539. len = BALLOC(b);
  540. dlen = BLEN(b);
  541. QDEBUG checkb(b, "qpass");
  542. while(b->next){
  543. b = b->next;
  544. QDEBUG checkb(b, "qpass");
  545. len += BALLOC(b);
  546. dlen += BLEN(b);
  547. }
  548. q->blast = b;
  549. q->len += len;
  550. q->dlen += dlen;
  551. if(q->len >= q->limit/2)
  552. q->state |= Qflow;
  553. if(q->state & Qstarve){
  554. q->state &= ~Qstarve;
  555. dowakeup = 1;
  556. }
  557. iunlock(q);
  558. if(dowakeup)
  559. wakeup(&q->rr);
  560. return len;
  561. }
  562. /*
  563. * if the allocated space is way out of line with the used
  564. * space, reallocate to a smaller block
  565. */
  566. Block*
  567. packblock(Block *bp)
  568. {
  569. Block **l, *nbp;
  570. int n;
  571. for(l = &bp; *l; l = &(*l)->next){
  572. nbp = *l;
  573. n = BLEN(nbp);
  574. if((n<<2) < BALLOC(nbp)){
  575. *l = allocb(n);
  576. memmove((*l)->wp, nbp->rp, n);
  577. (*l)->wp += n;
  578. (*l)->next = nbp->next;
  579. freeb(nbp);
  580. }
  581. }
  582. return bp;
  583. }
  584. int
  585. qproduce(Queue *q, void *vp, int len)
  586. {
  587. Block *b;
  588. int dowakeup;
  589. uchar *p = vp;
  590. /* sync with qread */
  591. dowakeup = 0;
  592. ilock(q);
  593. /* no waiting receivers, room in buffer? */
  594. if(q->len >= q->limit){
  595. q->state |= Qflow;
  596. iunlock(q);
  597. return -1;
  598. }
  599. /* save in buffer */
  600. b = iallocb(len);
  601. if(b == 0){
  602. iunlock(q);
  603. return 0;
  604. }
  605. memmove(b->wp, p, len);
  606. producecnt += len;
  607. b->wp += len;
  608. if(q->bfirst)
  609. q->blast->next = b;
  610. else
  611. q->bfirst = b;
  612. q->blast = b;
  613. /* b->next = 0; done by iallocb() */
  614. q->len += BALLOC(b);
  615. q->dlen += BLEN(b);
  616. QDEBUG checkb(b, "qproduce");
  617. if(q->state & Qstarve){
  618. q->state &= ~Qstarve;
  619. dowakeup = 1;
  620. }
  621. if(q->len >= q->limit)
  622. q->state |= Qflow;
  623. iunlock(q);
  624. if(dowakeup)
  625. wakeup(&q->rr);
  626. return len;
  627. }
  628. /*
  629. * copy from offset in the queue
  630. */
  631. Block*
  632. qcopy(Queue *q, int len, ulong offset)
  633. {
  634. int sofar;
  635. int n;
  636. Block *b, *nb;
  637. uchar *p;
  638. nb = allocb(len);
  639. ilock(q);
  640. /* go to offset */
  641. b = q->bfirst;
  642. for(sofar = 0; ; sofar += n){
  643. if(b == nil){
  644. iunlock(q);
  645. return nb;
  646. }
  647. n = BLEN(b);
  648. if(sofar + n > offset){
  649. p = b->rp + offset - sofar;
  650. n -= offset - sofar;
  651. break;
  652. }
  653. QDEBUG checkb(b, "qcopy");
  654. b = b->next;
  655. }
  656. /* copy bytes from there */
  657. for(sofar = 0; sofar < len;){
  658. if(n > len - sofar)
  659. n = len - sofar;
  660. memmove(nb->wp, p, n);
  661. qcopycnt += n;
  662. sofar += n;
  663. nb->wp += n;
  664. b = b->next;
  665. if(b == nil)
  666. break;
  667. n = BLEN(b);
  668. p = b->rp;
  669. }
  670. iunlock(q);
  671. return nb;
  672. }
  673. /*
  674. * called by non-interrupt code
  675. */
  676. Queue*
  677. qopen(int limit, int msg, void (*kick)(void*), void *arg)
  678. {
  679. Queue *q;
  680. q = malloc(sizeof(Queue));
  681. if(q == 0)
  682. return 0;
  683. q->limit = q->inilim = limit;
  684. q->kick = kick;
  685. q->arg = arg;
  686. q->state = msg;
  687. q->state |= Qstarve;
  688. q->eof = 0;
  689. q->noblock = 0;
  690. return q;
  691. }
  692. /* open a queue to be bypassed */
  693. Queue*
  694. qbypass(void (*bypass)(void*, Block*), void *arg)
  695. {
  696. Queue *q;
  697. q = malloc(sizeof(Queue));
  698. if(q == 0)
  699. return 0;
  700. q->limit = 0;
  701. q->arg = arg;
  702. q->bypass = bypass;
  703. q->state = 0;
  704. return q;
  705. }
  706. static int
  707. notempty(void *a)
  708. {
  709. Queue *q = a;
  710. return (q->state & Qclosed) || q->bfirst != 0;
  711. }
  712. /*
  713. * wait for the queue to be non-empty or closed.
  714. * called with q ilocked.
  715. */
  716. static int
  717. qwait(Queue *q)
  718. {
  719. /* wait for data */
  720. for(;;){
  721. if(q->bfirst != nil)
  722. break;
  723. if(q->state & Qclosed){
  724. if(++q->eof > 3)
  725. return -1;
  726. if(*q->err && strcmp(q->err, Ehungup) != 0)
  727. return -1;
  728. return 0;
  729. }
  730. q->state |= Qstarve; /* flag requesting producer to wake me */
  731. iunlock(q);
  732. sleep(&q->rr, notempty, q);
  733. ilock(q);
  734. }
  735. return 1;
  736. }
  737. /*
  738. * add a block list to a queue
  739. */
  740. void
  741. qaddlist(Queue *q, Block *b)
  742. {
  743. /* queue the block */
  744. if(q->bfirst)
  745. q->blast->next = b;
  746. else
  747. q->bfirst = b;
  748. q->len += blockalloclen(b);
  749. q->dlen += blocklen(b);
  750. while(b->next)
  751. b = b->next;
  752. q->blast = b;
  753. }
  754. /*
  755. * called with q ilocked
  756. */
  757. Block*
  758. qremove(Queue *q)
  759. {
  760. Block *b;
  761. b = q->bfirst;
  762. if(b == nil)
  763. return nil;
  764. q->bfirst = b->next;
  765. b->next = nil;
  766. q->dlen -= BLEN(b);
  767. q->len -= BALLOC(b);
  768. QDEBUG checkb(b, "qremove");
  769. return b;
  770. }
  771. /*
  772. * copy the contents of a string of blocks into
  773. * memory. emptied blocks are freed. return
  774. * pointer to first unconsumed block.
  775. */
  776. Block*
  777. bl2mem(uchar *p, Block *b, int n)
  778. {
  779. int i;
  780. Block *next;
  781. for(; b != nil; b = next){
  782. i = BLEN(b);
  783. if(i > n){
  784. memmove(p, b->rp, n);
  785. b->rp += n;
  786. return b;
  787. }
  788. memmove(p, b->rp, i);
  789. n -= i;
  790. p += i;
  791. b->rp += i;
  792. next = b->next;
  793. freeb(b);
  794. }
  795. return nil;
  796. }
  797. /*
  798. * copy the contents of memory into a string of blocks.
  799. * return nil on error.
  800. */
  801. Block*
  802. mem2bl(uchar *p, int len)
  803. {
  804. int n;
  805. Block *b, *first, **l;
  806. first = nil;
  807. l = &first;
  808. if(waserror()){
  809. freeblist(first);
  810. nexterror();
  811. }
  812. do {
  813. n = len;
  814. if(n > Maxatomic)
  815. n = Maxatomic;
  816. *l = b = allocb(n);
  817. setmalloctag(b, (up->text[0]<<24)|(up->text[1]<<16)|(up->text[2]<<8)|up->text[3]);
  818. memmove(b->wp, p, n);
  819. b->wp += n;
  820. p += n;
  821. len -= n;
  822. l = &b->next;
  823. } while(len > 0);
  824. poperror();
  825. return first;
  826. }
  827. /*
  828. * put a block back to the front of the queue
  829. * called with q ilocked
  830. */
  831. void
  832. qputback(Queue *q, Block *b)
  833. {
  834. b->next = q->bfirst;
  835. if(q->bfirst == nil)
  836. q->blast = b;
  837. q->bfirst = b;
  838. q->len += BALLOC(b);
  839. q->dlen += BLEN(b);
  840. }
  841. /*
  842. * flow control, get producer going again
  843. * called with q ilocked
  844. */
  845. static void
  846. qwakeup_iunlock(Queue *q)
  847. {
  848. int dowakeup = 0;
  849. /* if writer flow controlled, restart */
  850. if((q->state & Qflow) && q->len < q->limit/2){
  851. q->state &= ~Qflow;
  852. dowakeup = 1;
  853. }
  854. iunlock(q);
  855. /* wakeup flow controlled writers */
  856. if(dowakeup){
  857. if(q->kick)
  858. q->kick(q->arg);
  859. wakeup(&q->wr);
  860. }
  861. }
  862. /*
  863. * get next block from a queue (up to a limit)
  864. */
  865. Block*
  866. qbread(Queue *q, int len)
  867. {
  868. Block *b, *nb;
  869. int n;
  870. qlock(&q->rlock);
  871. if(waserror()){
  872. qunlock(&q->rlock);
  873. nexterror();
  874. }
  875. ilock(q);
  876. switch(qwait(q)){
  877. case 0:
  878. /* queue closed */
  879. iunlock(q);
  880. qunlock(&q->rlock);
  881. poperror();
  882. return nil;
  883. case -1:
  884. /* multiple reads on a closed queue */
  885. iunlock(q);
  886. error(q->err);
  887. }
  888. /* if we get here, there's at least one block in the queue */
  889. b = qremove(q);
  890. n = BLEN(b);
  891. /* split block if it's too big and this is not a message queue */
  892. nb = b;
  893. if(n > len){
  894. if((q->state&Qmsg) == 0){
  895. n -= len;
  896. b = allocb(n);
  897. memmove(b->wp, nb->rp+len, n);
  898. b->wp += n;
  899. qputback(q, b);
  900. }
  901. nb->wp = nb->rp + len;
  902. }
  903. /* restart producer */
  904. qwakeup_iunlock(q);
  905. poperror();
  906. qunlock(&q->rlock);
  907. return nb;
  908. }
  909. /*
  910. * read a queue. if no data is queued, post a Block
  911. * and wait on its Rendez.
  912. */
  913. long
  914. qread(Queue *q, void *vp, int len)
  915. {
  916. Block *b, *first, **l;
  917. int m, n;
  918. qlock(&q->rlock);
  919. if(waserror()){
  920. qunlock(&q->rlock);
  921. nexterror();
  922. }
  923. ilock(q);
  924. again:
  925. switch(qwait(q)){
  926. case 0:
  927. /* queue closed */
  928. iunlock(q);
  929. qunlock(&q->rlock);
  930. poperror();
  931. return 0;
  932. case -1:
  933. /* multiple reads on a closed queue */
  934. iunlock(q);
  935. error(q->err);
  936. }
  937. /* if we get here, there's at least one block in the queue */
  938. if(q->state & Qcoalesce){
  939. /* when coalescing, 0 length blocks just go away */
  940. b = q->bfirst;
  941. if(BLEN(b) <= 0){
  942. freeb(qremove(q));
  943. goto again;
  944. }
  945. /* grab the first block plus as many
  946. * following blocks as will completely
  947. * fit in the read.
  948. */
  949. n = 0;
  950. l = &first;
  951. m = BLEN(b);
  952. for(;;) {
  953. *l = qremove(q);
  954. l = &b->next;
  955. n += m;
  956. b = q->bfirst;
  957. if(b == nil)
  958. break;
  959. m = BLEN(b);
  960. if(n+m > len)
  961. break;
  962. }
  963. } else {
  964. first = qremove(q);
  965. n = BLEN(first);
  966. }
  967. /* copy to user space outside of the ilock */
  968. iunlock(q);
  969. b = bl2mem(vp, first, len);
  970. ilock(q);
  971. /* take care of any left over partial block */
  972. if(b != nil){
  973. n -= BLEN(b);
  974. if(q->state & Qmsg)
  975. freeb(b);
  976. else
  977. qputback(q, b);
  978. }
  979. /* restart producer */
  980. qwakeup_iunlock(q);
  981. poperror();
  982. qunlock(&q->rlock);
  983. return n;
  984. }
  985. static int
  986. qnotfull(void *a)
  987. {
  988. Queue *q = a;
  989. return q->len < q->limit || (q->state & Qclosed);
  990. }
  991. ulong noblockcnt;
  992. /*
  993. * add a block to a queue obeying flow control
  994. */
  995. long
  996. qbwrite(Queue *q, Block *b)
  997. {
  998. int n, dowakeup;
  999. Proc *p;
  1000. n = BLEN(b);
  1001. if(q->bypass){
  1002. (*q->bypass)(q->arg, b);
  1003. return n;
  1004. }
  1005. dowakeup = 0;
  1006. qlock(&q->wlock);
  1007. if(waserror()){
  1008. if(b != nil)
  1009. freeb(b);
  1010. qunlock(&q->wlock);
  1011. nexterror();
  1012. }
  1013. ilock(q);
  1014. /* give up if the queue is closed */
  1015. if(q->state & Qclosed){
  1016. iunlock(q);
  1017. error(q->err);
  1018. }
  1019. /* if nonblocking, don't queue over the limit */
  1020. if(q->len >= q->limit){
  1021. if(q->noblock){
  1022. iunlock(q);
  1023. freeb(b);
  1024. noblockcnt += n;
  1025. qunlock(&q->wlock);
  1026. poperror();
  1027. return n;
  1028. }
  1029. }
  1030. /* queue the block */
  1031. if(q->bfirst)
  1032. q->blast->next = b;
  1033. else
  1034. q->bfirst = b;
  1035. q->blast = b;
  1036. b->next = 0;
  1037. q->len += BALLOC(b);
  1038. q->dlen += n;
  1039. QDEBUG checkb(b, "qbwrite");
  1040. b = nil;
  1041. /* make sure other end gets awakened */
  1042. if(q->state & Qstarve){
  1043. q->state &= ~Qstarve;
  1044. dowakeup = 1;
  1045. }
  1046. iunlock(q);
  1047. /* get output going again */
  1048. if(q->kick && (dowakeup || (q->state&Qkick)))
  1049. q->kick(q->arg);
  1050. /* wakeup anyone consuming at the other end */
  1051. if(dowakeup){
  1052. p = wakeup(&q->rr);
  1053. /* if we just wokeup a higher priority process, let it run */
  1054. if(p != nil && p->priority > up->priority)
  1055. sched();
  1056. }
  1057. /*
  1058. * flow control, wait for queue to get below the limit
  1059. * before allowing the process to continue and queue
  1060. * more. We do this here so that postnote can only
  1061. * interrupt us after the data has been queued. This
  1062. * means that things like 9p flushes and ssl messages
  1063. * will not be disrupted by software interrupts.
  1064. *
  1065. * Note - this is moderately dangerous since a process
  1066. * that keeps getting interrupted and rewriting will
  1067. * queue infinite crud.
  1068. */
  1069. for(;;){
  1070. if(q->noblock || qnotfull(q))
  1071. break;
  1072. ilock(q);
  1073. q->state |= Qflow;
  1074. iunlock(q);
  1075. sleep(&q->wr, qnotfull, q);
  1076. }
  1077. USED(b);
  1078. qunlock(&q->wlock);
  1079. poperror();
  1080. return n;
  1081. }
  1082. /*
  1083. * write to a queue. only Maxatomic bytes at a time is atomic.
  1084. */
  1085. int
  1086. qwrite(Queue *q, void *vp, int len)
  1087. {
  1088. int n, sofar;
  1089. Block *b;
  1090. uchar *p = vp;
  1091. QDEBUG if(!islo())
  1092. print("qwrite hi %lux\n", getcallerpc(&q));
  1093. sofar = 0;
  1094. do {
  1095. n = len-sofar;
  1096. if(n > Maxatomic)
  1097. n = Maxatomic;
  1098. b = allocb(n);
  1099. setmalloctag(b, (up->text[0]<<24)|(up->text[1]<<16)|(up->text[2]<<8)|up->text[3]);
  1100. if(waserror()){
  1101. freeb(b);
  1102. nexterror();
  1103. }
  1104. memmove(b->wp, p+sofar, n);
  1105. poperror();
  1106. b->wp += n;
  1107. qbwrite(q, b);
  1108. sofar += n;
  1109. } while(sofar < len && (q->state & Qmsg) == 0);
  1110. return len;
  1111. }
  1112. /*
  1113. * used by print() to write to a queue. Since we may be splhi or not in
  1114. * a process, don't qlock.
  1115. */
  1116. int
  1117. qiwrite(Queue *q, void *vp, int len)
  1118. {
  1119. int n, sofar, dowakeup;
  1120. Block *b;
  1121. uchar *p = vp;
  1122. dowakeup = 0;
  1123. sofar = 0;
  1124. do {
  1125. n = len-sofar;
  1126. if(n > Maxatomic)
  1127. n = Maxatomic;
  1128. b = iallocb(n);
  1129. if(b == nil)
  1130. break;
  1131. memmove(b->wp, p+sofar, n);
  1132. b->wp += n;
  1133. ilock(q);
  1134. QDEBUG checkb(b, "qiwrite");
  1135. if(q->bfirst)
  1136. q->blast->next = b;
  1137. else
  1138. q->bfirst = b;
  1139. q->blast = b;
  1140. q->len += BALLOC(b);
  1141. q->dlen += n;
  1142. if(q->state & Qstarve){
  1143. q->state &= ~Qstarve;
  1144. dowakeup = 1;
  1145. }
  1146. iunlock(q);
  1147. if(dowakeup){
  1148. if(q->kick)
  1149. q->kick(q->arg);
  1150. wakeup(&q->rr);
  1151. }
  1152. sofar += n;
  1153. } while(sofar < len && (q->state & Qmsg) == 0);
  1154. return sofar;
  1155. }
  1156. /*
  1157. * be extremely careful when calling this,
  1158. * as there is no reference accounting
  1159. */
  1160. void
  1161. qfree(Queue *q)
  1162. {
  1163. qclose(q);
  1164. free(q);
  1165. }
  1166. /*
  1167. * Mark a queue as closed. No further IO is permitted.
  1168. * All blocks are released.
  1169. */
  1170. void
  1171. qclose(Queue *q)
  1172. {
  1173. Block *bfirst;
  1174. if(q == nil)
  1175. return;
  1176. /* mark it */
  1177. ilock(q);
  1178. q->state |= Qclosed;
  1179. q->state &= ~(Qflow|Qstarve);
  1180. strcpy(q->err, Ehungup);
  1181. bfirst = q->bfirst;
  1182. q->bfirst = 0;
  1183. q->len = 0;
  1184. q->dlen = 0;
  1185. q->noblock = 0;
  1186. iunlock(q);
  1187. /* free queued blocks */
  1188. freeblist(bfirst);
  1189. /* wake up readers/writers */
  1190. wakeup(&q->rr);
  1191. wakeup(&q->wr);
  1192. }
  1193. /*
  1194. * Mark a queue as closed. Wakeup any readers. Don't remove queued
  1195. * blocks.
  1196. */
  1197. void
  1198. qhangup(Queue *q, char *msg)
  1199. {
  1200. /* mark it */
  1201. ilock(q);
  1202. q->state |= Qclosed;
  1203. if(msg == 0 || *msg == 0)
  1204. strcpy(q->err, Ehungup);
  1205. else
  1206. strncpy(q->err, msg, ERRMAX-1);
  1207. iunlock(q);
  1208. /* wake up readers/writers */
  1209. wakeup(&q->rr);
  1210. wakeup(&q->wr);
  1211. }
  1212. /*
  1213. * return non-zero if the q is hungup
  1214. */
  1215. int
  1216. qisclosed(Queue *q)
  1217. {
  1218. return q->state & Qclosed;
  1219. }
  1220. /*
  1221. * mark a queue as no longer hung up
  1222. */
  1223. void
  1224. qreopen(Queue *q)
  1225. {
  1226. ilock(q);
  1227. q->state &= ~Qclosed;
  1228. q->state |= Qstarve;
  1229. q->eof = 0;
  1230. q->limit = q->inilim;
  1231. iunlock(q);
  1232. }
  1233. /*
  1234. * return bytes queued
  1235. */
  1236. int
  1237. qlen(Queue *q)
  1238. {
  1239. return q->dlen;
  1240. }
  1241. /*
  1242. * return space remaining before flow control
  1243. */
  1244. int
  1245. qwindow(Queue *q)
  1246. {
  1247. int l;
  1248. l = q->limit - q->len;
  1249. if(l < 0)
  1250. l = 0;
  1251. return l;
  1252. }
  1253. /*
  1254. * return true if we can read without blocking
  1255. */
  1256. int
  1257. qcanread(Queue *q)
  1258. {
  1259. return q->bfirst!=0;
  1260. }
  1261. /*
  1262. * change queue limit
  1263. */
  1264. void
  1265. qsetlimit(Queue *q, int limit)
  1266. {
  1267. q->limit = limit;
  1268. }
  1269. /*
  1270. * set blocking/nonblocking
  1271. */
  1272. void
  1273. qnoblock(Queue *q, int onoff)
  1274. {
  1275. q->noblock = onoff;
  1276. }
  1277. /*
  1278. * flush the output queue
  1279. */
  1280. void
  1281. qflush(Queue *q)
  1282. {
  1283. Block *bfirst;
  1284. /* mark it */
  1285. ilock(q);
  1286. bfirst = q->bfirst;
  1287. q->bfirst = 0;
  1288. q->len = 0;
  1289. q->dlen = 0;
  1290. iunlock(q);
  1291. /* free queued blocks */
  1292. freeblist(bfirst);
  1293. /* wake up readers/writers */
  1294. wakeup(&q->wr);
  1295. }
  1296. int
  1297. qfull(Queue *q)
  1298. {
  1299. return q->state & Qflow;
  1300. }
  1301. int
  1302. qstate(Queue *q)
  1303. {
  1304. return q->state;
  1305. }