/* qio.c: IO queues and Block list utilities */
  1. #include "u.h"
  2. #include "lib.h"
  3. #include "dat.h"
  4. #include "fns.h"
  5. #include "error.h"
  6. static ulong padblockcnt;
  7. static ulong concatblockcnt;
  8. static ulong pullupblockcnt;
  9. static ulong copyblockcnt;
  10. static ulong consumecnt;
  11. static ulong producecnt;
  12. static ulong qcopycnt;
  13. static int debugging;
  14. #define QDEBUG if(0)
  15. /*
  16. * IO queues
  17. */
  18. struct Queue
  19. {
  20. Lock lk;
  21. Block* bfirst; /* buffer */
  22. Block* blast;
  23. int len; /* bytes allocated to queue */
  24. int dlen; /* data bytes in queue */
  25. int limit; /* max bytes in queue */
  26. int inilim; /* initial limit */
  27. int state;
  28. int noblock; /* true if writes return immediately when q full */
  29. int eof; /* number of eofs read by user */
  30. void (*kick)(void*); /* restart output */
  31. void (*bypass)(void*, Block*); /* bypass queue altogether */
  32. void* arg; /* argument to kick */
  33. QLock rlock; /* mutex for reading processes */
  34. Rendez rr; /* process waiting to read */
  35. QLock wlock; /* mutex for writing processes */
  36. Rendez wr; /* process waiting to write */
  37. char err[ERRMAX];
  38. };
  39. enum
  40. {
  41. Maxatomic = 64*1024,
  42. };
  43. uint qiomaxatomic = Maxatomic;
  44. void
  45. ixsummary(void)
  46. {
  47. debugging ^= 1;
  48. iallocsummary();
  49. print("pad %lud, concat %lud, pullup %lud, copy %lud\n",
  50. padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
  51. print("consume %lud, produce %lud, qcopy %lud\n",
  52. consumecnt, producecnt, qcopycnt);
  53. }
  54. /*
  55. * free a list of blocks
  56. */
  57. void
  58. freeblist(Block *b)
  59. {
  60. Block *next;
  61. for(; b != 0; b = next){
  62. next = b->next;
  63. b->next = 0;
  64. freeb(b);
  65. }
  66. }
  67. /*
  68. * pad a block to the front (or the back if size is negative)
  69. */
  70. Block*
  71. padblock(Block *bp, int size)
  72. {
  73. int n;
  74. Block *nbp;
  75. QDEBUG checkb(bp, "padblock 1");
  76. if(size >= 0){
  77. if(bp->rp - bp->base >= size){
  78. bp->rp -= size;
  79. return bp;
  80. }
  81. if(bp->next)
  82. panic("padblock 0x%p", getcallerpc(&bp));
  83. n = BLEN(bp);
  84. padblockcnt++;
  85. nbp = allocb(size+n);
  86. nbp->rp += size;
  87. nbp->wp = nbp->rp;
  88. memmove(nbp->wp, bp->rp, n);
  89. nbp->wp += n;
  90. freeb(bp);
  91. nbp->rp -= size;
  92. } else {
  93. size = -size;
  94. if(bp->next)
  95. panic("padblock 0x%p", getcallerpc(&bp));
  96. if(bp->lim - bp->wp >= size)
  97. return bp;
  98. n = BLEN(bp);
  99. padblockcnt++;
  100. nbp = allocb(size+n);
  101. memmove(nbp->wp, bp->rp, n);
  102. nbp->wp += n;
  103. freeb(bp);
  104. }
  105. QDEBUG checkb(nbp, "padblock 1");
  106. return nbp;
  107. }
  108. /*
  109. * return count of bytes in a string of blocks
  110. */
  111. int
  112. blocklen(Block *bp)
  113. {
  114. int len;
  115. len = 0;
  116. while(bp) {
  117. len += BLEN(bp);
  118. bp = bp->next;
  119. }
  120. return len;
  121. }
  122. /*
  123. * return count of space in blocks
  124. */
  125. int
  126. blockalloclen(Block *bp)
  127. {
  128. int len;
  129. len = 0;
  130. while(bp) {
  131. len += BALLOC(bp);
  132. bp = bp->next;
  133. }
  134. return len;
  135. }
  136. /*
  137. * copy the string of blocks into
  138. * a single block and free the string
  139. */
  140. Block*
  141. concatblock(Block *bp)
  142. {
  143. int len;
  144. Block *nb, *f;
  145. if(bp->next == 0)
  146. return bp;
  147. nb = allocb(blocklen(bp));
  148. for(f = bp; f; f = f->next) {
  149. len = BLEN(f);
  150. memmove(nb->wp, f->rp, len);
  151. nb->wp += len;
  152. }
  153. concatblockcnt += BLEN(nb);
  154. freeblist(bp);
  155. QDEBUG checkb(nb, "concatblock 1");
  156. return nb;
  157. }
  158. /*
  159. * make sure the first block has at least n bytes
  160. */
  161. Block*
  162. pullupblock(Block *bp, int n)
  163. {
  164. int i;
  165. Block *nbp;
  166. /*
  167. * this should almost always be true, it's
  168. * just to avoid every caller checking.
  169. */
  170. if(BLEN(bp) >= n)
  171. return bp;
  172. /*
  173. * if not enough room in the first block,
  174. * add another to the front of the list.
  175. */
  176. if(bp->lim - bp->rp < n){
  177. nbp = allocb(n);
  178. nbp->next = bp;
  179. bp = nbp;
  180. }
  181. /*
  182. * copy bytes from the trailing blocks into the first
  183. */
  184. n -= BLEN(bp);
  185. while((nbp = bp->next)){
  186. i = BLEN(nbp);
  187. if(i > n) {
  188. memmove(bp->wp, nbp->rp, n);
  189. pullupblockcnt++;
  190. bp->wp += n;
  191. nbp->rp += n;
  192. QDEBUG checkb(bp, "pullupblock 1");
  193. return bp;
  194. } else {
  195. /* shouldn't happen but why crash if it does */
  196. if(i < 0){
  197. print("pullup negative length packet\n");
  198. i = 0;
  199. }
  200. memmove(bp->wp, nbp->rp, i);
  201. pullupblockcnt++;
  202. bp->wp += i;
  203. bp->next = nbp->next;
  204. nbp->next = 0;
  205. freeb(nbp);
  206. n -= i;
  207. if(n == 0){
  208. QDEBUG checkb(bp, "pullupblock 2");
  209. return bp;
  210. }
  211. }
  212. }
  213. freeb(bp);
  214. return 0;
  215. }
  216. /*
  217. * make sure the first block has at least n bytes
  218. */
  219. Block*
  220. pullupqueue(Queue *q, int n)
  221. {
  222. Block *b;
  223. if(BLEN(q->bfirst) >= n)
  224. return q->bfirst;
  225. q->bfirst = pullupblock(q->bfirst, n);
  226. for(b = q->bfirst; b != nil && b->next != nil; b = b->next)
  227. ;
  228. q->blast = b;
  229. return q->bfirst;
  230. }
  231. /*
  232. * trim to len bytes starting at offset
  233. */
  234. Block *
  235. trimblock(Block *bp, int offset, int len)
  236. {
  237. ulong l;
  238. Block *nb, *startb;
  239. QDEBUG checkb(bp, "trimblock 1");
  240. if(blocklen(bp) < offset+len) {
  241. freeblist(bp);
  242. return nil;
  243. }
  244. while((l = BLEN(bp)) < offset) {
  245. offset -= l;
  246. nb = bp->next;
  247. bp->next = nil;
  248. freeb(bp);
  249. bp = nb;
  250. }
  251. startb = bp;
  252. bp->rp += offset;
  253. while((l = BLEN(bp)) < len) {
  254. len -= l;
  255. bp = bp->next;
  256. }
  257. bp->wp -= (BLEN(bp) - len);
  258. if(bp->next) {
  259. freeblist(bp->next);
  260. bp->next = nil;
  261. }
  262. return startb;
  263. }
  264. /*
  265. * copy 'count' bytes into a new block
  266. */
  267. Block*
  268. copyblock(Block *bp, int count)
  269. {
  270. int l;
  271. Block *nbp;
  272. QDEBUG checkb(bp, "copyblock 0");
  273. nbp = allocb(count);
  274. for(; count > 0 && bp != 0; bp = bp->next){
  275. l = BLEN(bp);
  276. if(l > count)
  277. l = count;
  278. memmove(nbp->wp, bp->rp, l);
  279. nbp->wp += l;
  280. count -= l;
  281. }
  282. if(count > 0){
  283. memset(nbp->wp, 0, count);
  284. nbp->wp += count;
  285. }
  286. copyblockcnt++;
  287. QDEBUG checkb(nbp, "copyblock 1");
  288. return nbp;
  289. }
  290. Block*
  291. adjustblock(Block* bp, int len)
  292. {
  293. int n;
  294. Block *nbp;
  295. if(len < 0){
  296. freeb(bp);
  297. return nil;
  298. }
  299. if(bp->rp+len > bp->lim){
  300. nbp = copyblock(bp, len);
  301. freeblist(bp);
  302. QDEBUG checkb(nbp, "adjustblock 1");
  303. return nbp;
  304. }
  305. n = BLEN(bp);
  306. if(len > n)
  307. memset(bp->wp, 0, len-n);
  308. bp->wp = bp->rp+len;
  309. QDEBUG checkb(bp, "adjustblock 2");
  310. return bp;
  311. }
  312. /*
  313. * throw away up to count bytes from a
  314. * list of blocks. Return count of bytes
  315. * thrown away.
  316. */
  317. int
  318. pullblock(Block **bph, int count)
  319. {
  320. Block *bp;
  321. int n, bytes;
  322. bytes = 0;
  323. if(bph == nil)
  324. return 0;
  325. while(*bph != nil && count != 0) {
  326. bp = *bph;
  327. n = BLEN(bp);
  328. if(count < n)
  329. n = count;
  330. bytes += n;
  331. count -= n;
  332. bp->rp += n;
  333. QDEBUG checkb(bp, "pullblock ");
  334. if(BLEN(bp) == 0) {
  335. *bph = bp->next;
  336. bp->next = nil;
  337. freeb(bp);
  338. }
  339. }
  340. return bytes;
  341. }
  342. /*
  343. * get next block from a queue, return null if nothing there
  344. */
  345. Block*
  346. qget(Queue *q)
  347. {
  348. int dowakeup;
  349. Block *b;
  350. /* sync with qwrite */
  351. ilock(&q->lk);
  352. b = q->bfirst;
  353. if(b == nil){
  354. q->state |= Qstarve;
  355. iunlock(&q->lk);
  356. return nil;
  357. }
  358. q->bfirst = b->next;
  359. b->next = 0;
  360. q->len -= BALLOC(b);
  361. q->dlen -= BLEN(b);
  362. QDEBUG checkb(b, "qget");
  363. /* if writer flow controlled, restart */
  364. if((q->state & Qflow) && q->len < q->limit/2){
  365. q->state &= ~Qflow;
  366. dowakeup = 1;
  367. } else
  368. dowakeup = 0;
  369. iunlock(&q->lk);
  370. if(dowakeup)
  371. wakeup(&q->wr);
  372. return b;
  373. }
  374. /*
  375. * throw away the next 'len' bytes in the queue
  376. */
  377. int
  378. qdiscard(Queue *q, int len)
  379. {
  380. Block *b;
  381. int dowakeup, n, sofar;
  382. ilock(&q->lk);
  383. for(sofar = 0; sofar < len; sofar += n){
  384. b = q->bfirst;
  385. if(b == nil)
  386. break;
  387. QDEBUG checkb(b, "qdiscard");
  388. n = BLEN(b);
  389. if(n <= len - sofar){
  390. q->bfirst = b->next;
  391. b->next = 0;
  392. q->len -= BALLOC(b);
  393. q->dlen -= BLEN(b);
  394. freeb(b);
  395. } else {
  396. n = len - sofar;
  397. b->rp += n;
  398. q->dlen -= n;
  399. }
  400. }
  401. /*
  402. * if writer flow controlled, restart
  403. *
  404. * This used to be
  405. * q->len < q->limit/2
  406. * but it slows down tcp too much for certain write sizes.
  407. * I really don't understand it completely. It may be
  408. * due to the queue draining so fast that the transmission
  409. * stalls waiting for the app to produce more data. - presotto
  410. */
  411. if((q->state & Qflow) && q->len < q->limit){
  412. q->state &= ~Qflow;
  413. dowakeup = 1;
  414. } else
  415. dowakeup = 0;
  416. iunlock(&q->lk);
  417. if(dowakeup)
  418. wakeup(&q->wr);
  419. return sofar;
  420. }
  421. /*
  422. * Interrupt level copy out of a queue, return # bytes copied.
  423. */
  424. int
  425. qconsume(Queue *q, void *vp, int len)
  426. {
  427. Block *b;
  428. int n, dowakeup;
  429. uchar *p = vp;
  430. Block *tofree = nil;
  431. /* sync with qwrite */
  432. ilock(&q->lk);
  433. for(;;) {
  434. b = q->bfirst;
  435. if(b == 0){
  436. q->state |= Qstarve;
  437. iunlock(&q->lk);
  438. return -1;
  439. }
  440. QDEBUG checkb(b, "qconsume 1");
  441. n = BLEN(b);
  442. if(n > 0)
  443. break;
  444. q->bfirst = b->next;
  445. q->len -= BALLOC(b);
  446. /* remember to free this */
  447. b->next = tofree;
  448. tofree = b;
  449. };
  450. if(n < len)
  451. len = n;
  452. memmove(p, b->rp, len);
  453. consumecnt += n;
  454. b->rp += len;
  455. q->dlen -= len;
  456. /* discard the block if we're done with it */
  457. if((q->state & Qmsg) || len == n){
  458. q->bfirst = b->next;
  459. b->next = 0;
  460. q->len -= BALLOC(b);
  461. q->dlen -= BLEN(b);
  462. /* remember to free this */
  463. b->next = tofree;
  464. tofree = b;
  465. }
  466. /* if writer flow controlled, restart */
  467. if((q->state & Qflow) && q->len < q->limit/2){
  468. q->state &= ~Qflow;
  469. dowakeup = 1;
  470. } else
  471. dowakeup = 0;
  472. iunlock(&q->lk);
  473. if(dowakeup)
  474. wakeup(&q->wr);
  475. if(tofree != nil)
  476. freeblist(tofree);
  477. return len;
  478. }
  479. int
  480. qpass(Queue *q, Block *b)
  481. {
  482. int dlen, len, dowakeup;
  483. /* sync with qread */
  484. dowakeup = 0;
  485. ilock(&q->lk);
  486. if(q->len >= q->limit){
  487. freeblist(b);
  488. iunlock(&q->lk);
  489. return -1;
  490. }
  491. if(q->state & Qclosed){
  492. freeblist(b);
  493. iunlock(&q->lk);
  494. return BALLOC(b);
  495. }
  496. /* add buffer to queue */
  497. if(q->bfirst)
  498. q->blast->next = b;
  499. else
  500. q->bfirst = b;
  501. len = BALLOC(b);
  502. dlen = BLEN(b);
  503. QDEBUG checkb(b, "qpass");
  504. while(b->next){
  505. b = b->next;
  506. QDEBUG checkb(b, "qpass");
  507. len += BALLOC(b);
  508. dlen += BLEN(b);
  509. }
  510. q->blast = b;
  511. q->len += len;
  512. q->dlen += dlen;
  513. if(q->len >= q->limit/2)
  514. q->state |= Qflow;
  515. if(q->state & Qstarve){
  516. q->state &= ~Qstarve;
  517. dowakeup = 1;
  518. }
  519. iunlock(&q->lk);
  520. if(dowakeup)
  521. wakeup(&q->rr);
  522. return len;
  523. }
  524. int
  525. qpassnolim(Queue *q, Block *b)
  526. {
  527. int dlen, len, dowakeup;
  528. /* sync with qread */
  529. dowakeup = 0;
  530. ilock(&q->lk);
  531. if(q->state & Qclosed){
  532. freeblist(b);
  533. iunlock(&q->lk);
  534. return BALLOC(b);
  535. }
  536. /* add buffer to queue */
  537. if(q->bfirst)
  538. q->blast->next = b;
  539. else
  540. q->bfirst = b;
  541. len = BALLOC(b);
  542. dlen = BLEN(b);
  543. QDEBUG checkb(b, "qpass");
  544. while(b->next){
  545. b = b->next;
  546. QDEBUG checkb(b, "qpass");
  547. len += BALLOC(b);
  548. dlen += BLEN(b);
  549. }
  550. q->blast = b;
  551. q->len += len;
  552. q->dlen += dlen;
  553. if(q->len >= q->limit/2)
  554. q->state |= Qflow;
  555. if(q->state & Qstarve){
  556. q->state &= ~Qstarve;
  557. dowakeup = 1;
  558. }
  559. iunlock(&q->lk);
  560. if(dowakeup)
  561. wakeup(&q->rr);
  562. return len;
  563. }
  564. /*
  565. * if the allocated space is way out of line with the used
  566. * space, reallocate to a smaller block
  567. */
  568. Block*
  569. packblock(Block *bp)
  570. {
  571. Block **l, *nbp;
  572. int n;
  573. for(l = &bp; *l; l = &(*l)->next){
  574. nbp = *l;
  575. n = BLEN(nbp);
  576. if((n<<2) < BALLOC(nbp)){
  577. *l = allocb(n);
  578. memmove((*l)->wp, nbp->rp, n);
  579. (*l)->wp += n;
  580. (*l)->next = nbp->next;
  581. freeb(nbp);
  582. }
  583. }
  584. return bp;
  585. }
  586. int
  587. qproduce(Queue *q, void *vp, int len)
  588. {
  589. Block *b;
  590. int dowakeup;
  591. uchar *p = vp;
  592. /* sync with qread */
  593. dowakeup = 0;
  594. ilock(&q->lk);
  595. /* no waiting receivers, room in buffer? */
  596. if(q->len >= q->limit){
  597. q->state |= Qflow;
  598. iunlock(&q->lk);
  599. return -1;
  600. }
  601. /* save in buffer */
  602. b = iallocb(len);
  603. if(b == 0){
  604. iunlock(&q->lk);
  605. return 0;
  606. }
  607. memmove(b->wp, p, len);
  608. producecnt += len;
  609. b->wp += len;
  610. if(q->bfirst)
  611. q->blast->next = b;
  612. else
  613. q->bfirst = b;
  614. q->blast = b;
  615. /* b->next = 0; done by iallocb() */
  616. q->len += BALLOC(b);
  617. q->dlen += BLEN(b);
  618. QDEBUG checkb(b, "qproduce");
  619. if(q->state & Qstarve){
  620. q->state &= ~Qstarve;
  621. dowakeup = 1;
  622. }
  623. if(q->len >= q->limit)
  624. q->state |= Qflow;
  625. iunlock(&q->lk);
  626. if(dowakeup)
  627. wakeup(&q->rr);
  628. return len;
  629. }
  630. /*
  631. * copy from offset in the queue
  632. */
  633. Block*
  634. qcopy(Queue *q, int len, ulong offset)
  635. {
  636. int sofar;
  637. int n;
  638. Block *b, *nb;
  639. uchar *p;
  640. nb = allocb(len);
  641. ilock(&q->lk);
  642. /* go to offset */
  643. b = q->bfirst;
  644. for(sofar = 0; ; sofar += n){
  645. if(b == nil){
  646. iunlock(&q->lk);
  647. return nb;
  648. }
  649. n = BLEN(b);
  650. if(sofar + n > offset){
  651. p = b->rp + offset - sofar;
  652. n -= offset - sofar;
  653. break;
  654. }
  655. QDEBUG checkb(b, "qcopy");
  656. b = b->next;
  657. }
  658. /* copy bytes from there */
  659. for(sofar = 0; sofar < len;){
  660. if(n > len - sofar)
  661. n = len - sofar;
  662. memmove(nb->wp, p, n);
  663. qcopycnt += n;
  664. sofar += n;
  665. nb->wp += n;
  666. b = b->next;
  667. if(b == nil)
  668. break;
  669. n = BLEN(b);
  670. p = b->rp;
  671. }
  672. iunlock(&q->lk);
  673. return nb;
  674. }
  675. /*
  676. * called by non-interrupt code
  677. */
  678. Queue*
  679. qopen(int limit, int msg, void (*kick)(void*), void *arg)
  680. {
  681. Queue *q;
  682. q = malloc(sizeof(Queue));
  683. if(q == 0)
  684. return 0;
  685. q->limit = q->inilim = limit;
  686. q->kick = kick;
  687. q->arg = arg;
  688. q->state = msg;
  689. q->state |= Qstarve;
  690. q->eof = 0;
  691. q->noblock = 0;
  692. return q;
  693. }
  694. /* open a queue to be bypassed */
  695. Queue*
  696. qbypass(void (*bypass)(void*, Block*), void *arg)
  697. {
  698. Queue *q;
  699. q = malloc(sizeof(Queue));
  700. if(q == 0)
  701. return 0;
  702. q->limit = 0;
  703. q->arg = arg;
  704. q->bypass = bypass;
  705. q->state = 0;
  706. return q;
  707. }
  708. static int
  709. notempty(void *a)
  710. {
  711. Queue *q = a;
  712. return (q->state & Qclosed) || q->bfirst != 0;
  713. }
  714. /*
  715. * wait for the queue to be non-empty or closed.
  716. * called with q ilocked.
  717. */
  718. static int
  719. qwait(Queue *q)
  720. {
  721. /* wait for data */
  722. for(;;){
  723. if(q->bfirst != nil)
  724. break;
  725. if(q->state & Qclosed){
  726. if(++q->eof > 3)
  727. return -1;
  728. if(*q->err && strcmp(q->err, Ehungup) != 0)
  729. return -1;
  730. return 0;
  731. }
  732. q->state |= Qstarve; /* flag requesting producer to wake me */
  733. iunlock(&q->lk);
  734. sleep(&q->rr, notempty, q);
  735. ilock(&q->lk);
  736. }
  737. return 1;
  738. }
  739. /*
  740. * add a block list to a queue
  741. */
  742. void
  743. qaddlist(Queue *q, Block *b)
  744. {
  745. /* queue the block */
  746. if(q->bfirst)
  747. q->blast->next = b;
  748. else
  749. q->bfirst = b;
  750. q->len += blockalloclen(b);
  751. q->dlen += blocklen(b);
  752. while(b->next)
  753. b = b->next;
  754. q->blast = b;
  755. }
  756. /*
  757. * called with q ilocked
  758. */
  759. Block*
  760. qremove(Queue *q)
  761. {
  762. Block *b;
  763. b = q->bfirst;
  764. if(b == nil)
  765. return nil;
  766. q->bfirst = b->next;
  767. b->next = nil;
  768. q->dlen -= BLEN(b);
  769. q->len -= BALLOC(b);
  770. QDEBUG checkb(b, "qremove");
  771. return b;
  772. }
  773. /*
  774. * copy the contents of a string of blocks into
  775. * memory. emptied blocks are freed. return
  776. * pointer to first unconsumed block.
  777. */
  778. Block*
  779. bl2mem(uchar *p, Block *b, int n)
  780. {
  781. int i;
  782. Block *next;
  783. for(; b != nil; b = next){
  784. i = BLEN(b);
  785. if(i > n){
  786. memmove(p, b->rp, n);
  787. b->rp += n;
  788. return b;
  789. }
  790. memmove(p, b->rp, i);
  791. n -= i;
  792. p += i;
  793. b->rp += i;
  794. next = b->next;
  795. freeb(b);
  796. }
  797. return nil;
  798. }
  799. /*
  800. * copy the contents of memory into a string of blocks.
  801. * return nil on error.
  802. */
  803. Block*
  804. mem2bl(uchar *p, int len)
  805. {
  806. int n;
  807. Block *b, *first, **l;
  808. first = nil;
  809. l = &first;
  810. if(waserror()){
  811. freeblist(first);
  812. nexterror();
  813. }
  814. do {
  815. n = len;
  816. if(n > Maxatomic)
  817. n = Maxatomic;
  818. *l = b = allocb(n);
  819. /* setmalloctag(b, (up->text[0]<<24)|(up->text[1]<<16)|(up->text[2]<<8)|up->text[3]); */
  820. memmove(b->wp, p, n);
  821. b->wp += n;
  822. p += n;
  823. len -= n;
  824. l = &b->next;
  825. } while(len > 0);
  826. poperror();
  827. return first;
  828. }
  829. /*
  830. * put a block back to the front of the queue
  831. * called with q ilocked
  832. */
  833. void
  834. qputback(Queue *q, Block *b)
  835. {
  836. b->next = q->bfirst;
  837. if(q->bfirst == nil)
  838. q->blast = b;
  839. q->bfirst = b;
  840. q->len += BALLOC(b);
  841. q->dlen += BLEN(b);
  842. }
  843. /*
  844. * flow control, get producer going again
  845. * called with q ilocked
  846. */
  847. static void
  848. qwakeup_iunlock(Queue *q)
  849. {
  850. int dowakeup = 0;
  851. /* if writer flow controlled, restart */
  852. if((q->state & Qflow) && q->len < q->limit/2){
  853. q->state &= ~Qflow;
  854. dowakeup = 1;
  855. }
  856. iunlock(&q->lk);
  857. /* wakeup flow controlled writers */
  858. if(dowakeup){
  859. if(q->kick)
  860. q->kick(q->arg);
  861. wakeup(&q->wr);
  862. }
  863. }
  864. /*
  865. * get next block from a queue (up to a limit)
  866. */
  867. Block*
  868. qbread(Queue *q, int len)
  869. {
  870. Block *b, *nb;
  871. int n;
  872. qlock(&q->rlock);
  873. if(waserror()){
  874. qunlock(&q->rlock);
  875. nexterror();
  876. }
  877. ilock(&q->lk);
  878. switch(qwait(q)){
  879. case 0:
  880. /* queue closed */
  881. iunlock(&q->lk);
  882. qunlock(&q->rlock);
  883. poperror();
  884. return nil;
  885. case -1:
  886. /* multiple reads on a closed queue */
  887. iunlock(&q->lk);
  888. error(q->err);
  889. }
  890. /* if we get here, there's at least one block in the queue */
  891. b = qremove(q);
  892. n = BLEN(b);
  893. /* split block if it's too big and this is not a message queue */
  894. nb = b;
  895. if(n > len){
  896. if((q->state&Qmsg) == 0){
  897. n -= len;
  898. b = allocb(n);
  899. memmove(b->wp, nb->rp+len, n);
  900. b->wp += n;
  901. qputback(q, b);
  902. }
  903. nb->wp = nb->rp + len;
  904. }
  905. /* restart producer */
  906. qwakeup_iunlock(q);
  907. poperror();
  908. qunlock(&q->rlock);
  909. return nb;
  910. }
  911. /*
  912. * read a queue. if no data is queued, post a Block
  913. * and wait on its Rendez.
  914. */
  915. long
  916. qread(Queue *q, void *vp, int len)
  917. {
  918. Block *b, *first, **l;
  919. int m, n;
  920. qlock(&q->rlock);
  921. if(waserror()){
  922. qunlock(&q->rlock);
  923. nexterror();
  924. }
  925. ilock(&q->lk);
  926. again:
  927. switch(qwait(q)){
  928. case 0:
  929. /* queue closed */
  930. iunlock(&q->lk);
  931. qunlock(&q->rlock);
  932. poperror();
  933. return 0;
  934. case -1:
  935. /* multiple reads on a closed queue */
  936. iunlock(&q->lk);
  937. error(q->err);
  938. }
  939. /* if we get here, there's at least one block in the queue */
  940. if(q->state & Qcoalesce){
  941. /* when coalescing, 0 length blocks just go away */
  942. b = q->bfirst;
  943. if(BLEN(b) <= 0){
  944. freeb(qremove(q));
  945. goto again;
  946. }
  947. /* grab the first block plus as many
  948. * following blocks as will completely
  949. * fit in the read.
  950. */
  951. n = 0;
  952. l = &first;
  953. m = BLEN(b);
  954. for(;;) {
  955. *l = qremove(q);
  956. l = &b->next;
  957. n += m;
  958. b = q->bfirst;
  959. if(b == nil)
  960. break;
  961. m = BLEN(b);
  962. if(n+m > len)
  963. break;
  964. }
  965. } else {
  966. first = qremove(q);
  967. n = BLEN(first);
  968. }
  969. /* copy to user space outside of the ilock */
  970. iunlock(&q->lk);
  971. b = bl2mem(vp, first, len);
  972. ilock(&q->lk);
  973. /* take care of any left over partial block */
  974. if(b != nil){
  975. n -= BLEN(b);
  976. if(q->state & Qmsg)
  977. freeb(b);
  978. else
  979. qputback(q, b);
  980. }
  981. /* restart producer */
  982. qwakeup_iunlock(q);
  983. poperror();
  984. qunlock(&q->rlock);
  985. return n;
  986. }
  987. static int
  988. qnotfull(void *a)
  989. {
  990. Queue *q = a;
  991. return q->len < q->limit || (q->state & Qclosed);
  992. }
  993. ulong noblockcnt;
  994. /*
  995. * add a block to a queue obeying flow control
  996. */
  997. long
  998. qbwrite(Queue *q, Block *b)
  999. {
  1000. int n, dowakeup;
  1001. n = BLEN(b);
  1002. if(q->bypass){
  1003. (*q->bypass)(q->arg, b);
  1004. return n;
  1005. }
  1006. dowakeup = 0;
  1007. qlock(&q->wlock);
  1008. if(waserror()){
  1009. if(b != nil)
  1010. freeb(b);
  1011. qunlock(&q->wlock);
  1012. nexterror();
  1013. }
  1014. ilock(&q->lk);
  1015. /* give up if the queue is closed */
  1016. if(q->state & Qclosed){
  1017. iunlock(&q->lk);
  1018. error(q->err);
  1019. }
  1020. /* if nonblocking, don't queue over the limit */
  1021. if(q->len >= q->limit){
  1022. if(q->noblock){
  1023. iunlock(&q->lk);
  1024. freeb(b);
  1025. noblockcnt += n;
  1026. qunlock(&q->wlock);
  1027. poperror();
  1028. return n;
  1029. }
  1030. }
  1031. /* queue the block */
  1032. if(q->bfirst)
  1033. q->blast->next = b;
  1034. else
  1035. q->bfirst = b;
  1036. q->blast = b;
  1037. b->next = 0;
  1038. q->len += BALLOC(b);
  1039. q->dlen += n;
  1040. QDEBUG checkb(b, "qbwrite");
  1041. b = nil;
  1042. /* make sure other end gets awakened */
  1043. if(q->state & Qstarve){
  1044. q->state &= ~Qstarve;
  1045. dowakeup = 1;
  1046. }
  1047. iunlock(&q->lk);
  1048. /* get output going again */
  1049. if(q->kick && (dowakeup || (q->state&Qkick)))
  1050. q->kick(q->arg);
  1051. /* wakeup anyone consuming at the other end */
  1052. if(dowakeup){
  1053. wakeup(&q->rr);
  1054. /* if we just wokeup a higher priority process, let it run */
  1055. /*
  1056. p = wakeup(&q->rr);
  1057. if(p != nil && p->priority > up->priority)
  1058. sched();
  1059. */
  1060. }
  1061. /*
  1062. * flow control, wait for queue to get below the limit
  1063. * before allowing the process to continue and queue
  1064. * more. We do this here so that postnote can only
  1065. * interrupt us after the data has been queued. This
  1066. * means that things like 9p flushes and ssl messages
  1067. * will not be disrupted by software interrupts.
  1068. *
  1069. * Note - this is moderately dangerous since a process
  1070. * that keeps getting interrupted and rewriting will
  1071. * queue infinite crud.
  1072. */
  1073. for(;;){
  1074. if(q->noblock || qnotfull(q))
  1075. break;
  1076. ilock(&q->lk);
  1077. q->state |= Qflow;
  1078. iunlock(&q->lk);
  1079. sleep(&q->wr, qnotfull, q);
  1080. }
  1081. USED(b);
  1082. qunlock(&q->wlock);
  1083. poperror();
  1084. return n;
  1085. }
  1086. /*
  1087. * write to a queue. only Maxatomic bytes at a time is atomic.
  1088. */
  1089. int
  1090. qwrite(Queue *q, void *vp, int len)
  1091. {
  1092. int n, sofar;
  1093. Block *b;
  1094. uchar *p = vp;
  1095. QDEBUG if(!islo())
  1096. print("qwrite hi %p\n", getcallerpc(&q));
  1097. sofar = 0;
  1098. do {
  1099. n = len-sofar;
  1100. if(n > Maxatomic)
  1101. n = Maxatomic;
  1102. b = allocb(n);
  1103. /* setmalloctag(b, (up->text[0]<<24)|(up->text[1]<<16)|(up->text[2]<<8)|up->text[3]); */
  1104. if(waserror()){
  1105. freeb(b);
  1106. nexterror();
  1107. }
  1108. memmove(b->wp, p+sofar, n);
  1109. poperror();
  1110. b->wp += n;
  1111. qbwrite(q, b);
  1112. sofar += n;
  1113. } while(sofar < len && (q->state & Qmsg) == 0);
  1114. return len;
  1115. }
  1116. /*
  1117. * used by print() to write to a queue. Since we may be splhi or not in
  1118. * a process, don't qlock.
  1119. */
  1120. int
  1121. qiwrite(Queue *q, void *vp, int len)
  1122. {
  1123. int n, sofar, dowakeup;
  1124. Block *b;
  1125. uchar *p = vp;
  1126. dowakeup = 0;
  1127. sofar = 0;
  1128. do {
  1129. n = len-sofar;
  1130. if(n > Maxatomic)
  1131. n = Maxatomic;
  1132. b = iallocb(n);
  1133. if(b == nil)
  1134. break;
  1135. memmove(b->wp, p+sofar, n);
  1136. b->wp += n;
  1137. ilock(&q->lk);
  1138. QDEBUG checkb(b, "qiwrite");
  1139. if(q->bfirst)
  1140. q->blast->next = b;
  1141. else
  1142. q->bfirst = b;
  1143. q->blast = b;
  1144. q->len += BALLOC(b);
  1145. q->dlen += n;
  1146. if(q->state & Qstarve){
  1147. q->state &= ~Qstarve;
  1148. dowakeup = 1;
  1149. }
  1150. iunlock(&q->lk);
  1151. if(dowakeup){
  1152. if(q->kick)
  1153. q->kick(q->arg);
  1154. wakeup(&q->rr);
  1155. }
  1156. sofar += n;
  1157. } while(sofar < len && (q->state & Qmsg) == 0);
  1158. return sofar;
  1159. }
  1160. /*
  1161. * be extremely careful when calling this,
  1162. * as there is no reference accounting
  1163. */
  1164. void
  1165. qfree(Queue *q)
  1166. {
  1167. qclose(q);
  1168. free(q);
  1169. }
  1170. /*
  1171. * Mark a queue as closed. No further IO is permitted.
  1172. * All blocks are released.
  1173. */
  1174. void
  1175. qclose(Queue *q)
  1176. {
  1177. Block *bfirst;
  1178. if(q == nil)
  1179. return;
  1180. /* mark it */
  1181. ilock(&q->lk);
  1182. q->state |= Qclosed;
  1183. q->state &= ~(Qflow|Qstarve);
  1184. strcpy(q->err, Ehungup);
  1185. bfirst = q->bfirst;
  1186. q->bfirst = 0;
  1187. q->len = 0;
  1188. q->dlen = 0;
  1189. q->noblock = 0;
  1190. iunlock(&q->lk);
  1191. /* free queued blocks */
  1192. freeblist(bfirst);
  1193. /* wake up readers/writers */
  1194. wakeup(&q->rr);
  1195. wakeup(&q->wr);
  1196. }
  1197. /*
  1198. * Mark a queue as closed. Wakeup any readers. Don't remove queued
  1199. * blocks.
  1200. */
  1201. void
  1202. qhangup(Queue *q, char *msg)
  1203. {
  1204. /* mark it */
  1205. ilock(&q->lk);
  1206. q->state |= Qclosed;
  1207. if(msg == 0 || *msg == 0)
  1208. strcpy(q->err, Ehungup);
  1209. else
  1210. strncpy(q->err, msg, ERRMAX-1);
  1211. iunlock(&q->lk);
  1212. /* wake up readers/writers */
  1213. wakeup(&q->rr);
  1214. wakeup(&q->wr);
  1215. }
  1216. /*
  1217. * return non-zero if the q is hungup
  1218. */
  1219. int
  1220. qisclosed(Queue *q)
  1221. {
  1222. return q->state & Qclosed;
  1223. }
  1224. /*
  1225. * mark a queue as no longer hung up
  1226. */
  1227. void
  1228. qreopen(Queue *q)
  1229. {
  1230. ilock(&q->lk);
  1231. q->state &= ~Qclosed;
  1232. q->state |= Qstarve;
  1233. q->eof = 0;
  1234. q->limit = q->inilim;
  1235. iunlock(&q->lk);
  1236. }
  1237. /*
  1238. * return bytes queued
  1239. */
  1240. int
  1241. qlen(Queue *q)
  1242. {
  1243. return q->dlen;
  1244. }
  1245. /*
  1246. * return space remaining before flow control
  1247. */
  1248. int
  1249. qwindow(Queue *q)
  1250. {
  1251. int l;
  1252. l = q->limit - q->len;
  1253. if(l < 0)
  1254. l = 0;
  1255. return l;
  1256. }
  1257. /*
  1258. * return true if we can read without blocking
  1259. */
  1260. int
  1261. qcanread(Queue *q)
  1262. {
  1263. return q->bfirst!=0;
  1264. }
  1265. /*
  1266. * change queue limit
  1267. */
  1268. void
  1269. qsetlimit(Queue *q, int limit)
  1270. {
  1271. q->limit = limit;
  1272. }
  1273. /*
  1274. * set blocking/nonblocking
  1275. */
  1276. void
  1277. qnoblock(Queue *q, int onoff)
  1278. {
  1279. q->noblock = onoff;
  1280. }
  1281. /*
  1282. * flush the output queue
  1283. */
  1284. void
  1285. qflush(Queue *q)
  1286. {
  1287. Block *bfirst;
  1288. /* mark it */
  1289. ilock(&q->lk);
  1290. bfirst = q->bfirst;
  1291. q->bfirst = 0;
  1292. q->len = 0;
  1293. q->dlen = 0;
  1294. iunlock(&q->lk);
  1295. /* free queued blocks */
  1296. freeblist(bfirst);
  1297. /* wake up readers/writers */
  1298. wakeup(&q->wr);
  1299. }
  1300. int
  1301. qfull(Queue *q)
  1302. {
  1303. return q->state & Qflow;
  1304. }
  1305. int
  1306. qstate(Queue *q)
  1307. {
  1308. return q->state;
  1309. }