qio.c 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554
  1. #include "dat.h"
  2. #include "fns.h"
  3. #include "error.h"
  4. #define QDEBUG if(0)
  5. /*
  6. * IO queues
  7. */
  8. struct Queue
  9. {
  10. Lock l;
  11. Block* bfirst; /* buffer */
  12. Block* blast;
  13. int len; /* bytes allocated to queue */
  14. int dlen; /* data bytes in queue */
  15. int limit; /* max bytes in queue */
  16. int inilim; /* initial limit */
  17. int state;
  18. int noblock; /* true if writes return immediately when q full */
  19. int eof; /* number of eofs read by user */
  20. void (*kick)(void*); /* restart output */
  21. void (*bypass)(void*, Block*); /* bypass queue altogether */
  22. void* arg; /* argument to kick */
  23. QLock rlock; /* mutex for reading processes */
  24. Rendez rr; /* process waiting to read */
  25. QLock wlock; /* mutex for writing processes */
  26. Rendez wr; /* process waiting to write */
  27. char err[ERRMAX];
  28. int want;
  29. };
  30. enum
  31. {
  32. Maxatomic = 32*1024
  33. };
  34. uint qiomaxatomic = Maxatomic;
  35. void
  36. checkb(Block *b, char *msg)
  37. {
  38. if(b->base > b->lim)
  39. panic("checkb 0 %s %lux %lux", msg, b->base, b->lim);
  40. if(b->rp < b->base)
  41. panic("checkb 1 %s %lux %lux", msg, b->base, b->rp);
  42. if(b->wp < b->base)
  43. panic("checkb 2 %s %lux %lux", msg, b->base, b->wp);
  44. if(b->rp > b->lim)
  45. panic("checkb 3 %s %lux %lux", msg, b->rp, b->lim);
  46. if(b->wp > b->lim)
  47. panic("checkb 4 %s %lux %lux", msg, b->wp, b->lim);
  48. }
  49. void
  50. freeb(Block *b)
  51. {
  52. if(b == nil)
  53. return;
  54. /*
  55. * drivers which perform non cache coherent DMA manage their own buffer
  56. * pool of uncached buffers and provide their own free routine.
  57. */
  58. if(b->free) {
  59. b->free(b);
  60. return;
  61. }
  62. /* poison the block in case someone is still holding onto it */
  63. b->next = (void*)0xdeadbabe;
  64. b->rp = (void*)0xdeadbabe;
  65. b->wp = (void*)0xdeadbabe;
  66. b->lim = (void*)0xdeadbabe;
  67. b->base = (void*)0xdeadbabe;
  68. free(b);
  69. }
  70. /*
  71. * free a list of blocks
  72. */
  73. void
  74. freeblist(Block *b)
  75. {
  76. Block *next;
  77. for(; b != 0; b = next){
  78. next = b->next;
  79. b->next = 0;
  80. freeb(b);
  81. }
  82. }
  83. ulong padblockoverhead;
  84. /*
  85. * pad a block to the front (or the back if size is negative)
  86. */
  87. Block*
  88. padblock(Block *bp, int size)
  89. {
  90. int n;
  91. Block *nbp;
  92. if(size >= 0){
  93. if(bp->rp - bp->base >= size){
  94. bp->rp -= size;
  95. return bp;
  96. }
  97. if(bp->next)
  98. panic("padblock 0x%luX", getcallerpc(&bp));
  99. n = BLEN(bp);
  100. padblockoverhead += n;
  101. nbp = allocb(size+n);
  102. nbp->rp += size;
  103. nbp->wp = nbp->rp;
  104. memmove(nbp->wp, bp->rp, n);
  105. nbp->wp += n;
  106. freeb(bp);
  107. nbp->rp -= size;
  108. } else {
  109. size = -size;
  110. if(bp->next)
  111. panic("padblock 0x%luX", getcallerpc(&bp));
  112. if(bp->lim - bp->wp >= size)
  113. return bp;
  114. n = BLEN(bp);
  115. padblockoverhead += n;
  116. nbp = allocb(size+n);
  117. memmove(nbp->wp, bp->rp, n);
  118. nbp->wp += n;
  119. freeb(bp);
  120. }
  121. return nbp;
  122. }
  123. /*
  124. * return count of bytes in a string of blocks
  125. */
  126. int
  127. blocklen(Block *bp)
  128. {
  129. int len;
  130. len = 0;
  131. while(bp) {
  132. len += BLEN(bp);
  133. bp = bp->next;
  134. }
  135. return len;
  136. }
  137. /*
  138. * return count of space in blocks
  139. */
  140. int
  141. blockalloclen(Block *bp)
  142. {
  143. int len;
  144. len = 0;
  145. while(bp) {
  146. len += BALLOC(bp);
  147. bp = bp->next;
  148. }
  149. return len;
  150. }
  151. /*
  152. * copy the string of blocks into
  153. * a single block and free the string
  154. */
  155. Block*
  156. concatblock(Block *bp)
  157. {
  158. int len;
  159. Block *nb, *f;
  160. if(bp->next == 0)
  161. return bp;
  162. nb = allocb(blocklen(bp));
  163. for(f = bp; f; f = f->next) {
  164. len = BLEN(f);
  165. memmove(nb->wp, f->rp, len);
  166. nb->wp += len;
  167. }
  168. freeblist(bp);
  169. return nb;
  170. }
  171. /*
  172. * make sure the first block has at least n bytes. If we started with
  173. * less than n bytes, make sure we have exactly n bytes. devssl.c depends
  174. * on this.
  175. */
  176. Block*
  177. pullupblock(Block *bp, int n)
  178. {
  179. int i;
  180. Block *nbp;
  181. /*
  182. * this should almost always be true, the rest it
  183. * just to avoid every caller checking.
  184. */
  185. if(BLEN(bp) >= n)
  186. return bp;
  187. /*
  188. * if not enough room in the first block,
  189. * add another to the front of the list.
  190. */
  191. if(bp->lim - bp->rp < n){
  192. nbp = allocb(n);
  193. nbp->next = bp;
  194. bp = nbp;
  195. }
  196. /*
  197. * copy bytes from the trailing blocks into the first
  198. */
  199. n -= BLEN(bp);
  200. while(nbp = bp->next){
  201. i = BLEN(nbp);
  202. if(i > n) {
  203. memmove(bp->wp, nbp->rp, n);
  204. bp->wp += n;
  205. nbp->rp += n;
  206. return bp;
  207. }
  208. else {
  209. memmove(bp->wp, nbp->rp, i);
  210. bp->wp += i;
  211. bp->next = nbp->next;
  212. nbp->next = 0;
  213. freeb(nbp);
  214. n -= i;
  215. if(n == 0)
  216. return bp;
  217. }
  218. }
  219. freeblist(bp);
  220. return 0;
  221. }
  222. /*
  223. * make sure the first block has at least n bytes
  224. */
  225. Block*
  226. pullupqueue(Queue *q, int n)
  227. {
  228. Block *b;
  229. if(BLEN(q->bfirst) >= n)
  230. return q->bfirst;
  231. q->bfirst = pullupblock(q->bfirst, n);
  232. for(b = q->bfirst; b != nil && b->next != nil; b = b->next)
  233. ;
  234. q->blast = b;
  235. return q->bfirst;
  236. }
  237. /*
  238. * trim to len bytes starting at offset
  239. */
  240. Block *
  241. trimblock(Block *bp, int offset, int len)
  242. {
  243. ulong l;
  244. Block *nb, *startb;
  245. if(blocklen(bp) < offset+len) {
  246. freeblist(bp);
  247. return nil;
  248. }
  249. while((l = BLEN(bp)) < offset) {
  250. offset -= l;
  251. nb = bp->next;
  252. bp->next = nil;
  253. freeb(bp);
  254. bp = nb;
  255. }
  256. startb = bp;
  257. bp->rp += offset;
  258. while((l = BLEN(bp)) < len) {
  259. len -= l;
  260. bp = bp->next;
  261. }
  262. bp->wp -= (BLEN(bp) - len);
  263. if(bp->next) {
  264. freeblist(bp->next);
  265. bp->next = nil;
  266. }
  267. return startb;
  268. }
  269. /*
  270. * copy 'count' bytes into a new block
  271. */
  272. Block*
  273. copyblock(Block *bp, int count)
  274. {
  275. int l;
  276. Block *nbp;
  277. nbp = allocb(count);
  278. for(; count > 0 && bp != 0; bp = bp->next){
  279. l = BLEN(bp);
  280. if(l > count)
  281. l = count;
  282. memmove(nbp->wp, bp->rp, l);
  283. nbp->wp += l;
  284. count -= l;
  285. }
  286. if(count > 0){
  287. memset(nbp->wp, 0, count);
  288. nbp->wp += count;
  289. }
  290. return nbp;
  291. }
  292. Block*
  293. adjustblock(Block* bp, int len)
  294. {
  295. int n;
  296. Block *nbp;
  297. if(len < 0){
  298. freeb(bp);
  299. return nil;
  300. }
  301. if(bp->rp+len > bp->lim){
  302. nbp = copyblock(bp, len);
  303. freeblist(bp);
  304. QDEBUG checkb(nbp, "adjustblock 1");
  305. return nbp;
  306. }
  307. n = BLEN(bp);
  308. if(len > n)
  309. memset(bp->wp, 0, len-n);
  310. bp->wp = bp->rp+len;
  311. QDEBUG checkb(bp, "adjustblock 2");
  312. return bp;
  313. }
  314. /*
  315. * throw away up to count bytes from a
  316. * list of blocks. Return count of bytes
  317. * thrown away.
  318. */
  319. int
  320. pullblock(Block **bph, int count)
  321. {
  322. Block *bp;
  323. int n, bytes;
  324. bytes = 0;
  325. if(bph == nil)
  326. return 0;
  327. while(*bph != nil && count != 0) {
  328. bp = *bph;
  329. n = BLEN(bp);
  330. if(count < n)
  331. n = count;
  332. bytes += n;
  333. count -= n;
  334. bp->rp += n;
  335. if(BLEN(bp) == 0) {
  336. *bph = bp->next;
  337. bp->next = nil;
  338. freeb(bp);
  339. }
  340. }
  341. return bytes;
  342. }
  343. /*
  344. * allocate queues and blocks (round data base address to 64 bit boundary)
  345. */
  346. Block*
  347. iallocb(int size)
  348. {
  349. Block *b;
  350. ulong addr;
  351. b = kmalloc(sizeof(Block)+size);
  352. if(b == 0)
  353. return 0;
  354. memset(b, 0, sizeof(Block));
  355. addr = (ulong)b + sizeof(Block);
  356. b->base = (uchar*)addr;
  357. b->lim = b->base + size;
  358. b->rp = b->base;
  359. b->wp = b->rp;
  360. return b;
  361. }
  362. /*
  363. * call error if iallocb fails
  364. */
  365. Block*
  366. allocb(int size)
  367. {
  368. Block *b;
  369. b = iallocb(size);
  370. if(b == 0)
  371. exhausted("allocb");
  372. return b;
  373. }
  374. /*
  375. * get next block from a queue, return null if nothing there
  376. */
  377. Block*
  378. qget(Queue *q)
  379. {
  380. int dowakeup;
  381. Block *b;
  382. /* sync with qwrite */
  383. lock(&q->l);
  384. b = q->bfirst;
  385. if(b == 0){
  386. q->state |= Qstarve;
  387. unlock(&q->l);
  388. return 0;
  389. }
  390. q->bfirst = b->next;
  391. b->next = 0;
  392. q->len -= BALLOC(b);
  393. q->dlen -= BLEN(b);
  394. QDEBUG checkb(b, "qget");
  395. /* if writer flow controlled, restart */
  396. if((q->state & Qflow) && q->len < q->limit/2){
  397. q->state &= ~Qflow;
  398. dowakeup = 1;
  399. } else
  400. dowakeup = 0;
  401. unlock(&q->l);
  402. if(dowakeup)
  403. Wakeup(&q->wr);
  404. return b;
  405. }
  406. /*
  407. * throw away the next 'len' bytes in the queue
  408. */
  409. int
  410. qdiscard(Queue *q, int len)
  411. {
  412. Block *b;
  413. int dowakeup, n, sofar;
  414. lock(&q->l);
  415. for(sofar = 0; sofar < len; sofar += n){
  416. b = q->bfirst;
  417. if(b == nil)
  418. break;
  419. QDEBUG checkb(b, "qdiscard");
  420. n = BLEN(b);
  421. if(n <= len - sofar){
  422. q->bfirst = b->next;
  423. b->next = 0;
  424. q->len -= BALLOC(b);
  425. q->dlen -= BLEN(b);
  426. freeb(b);
  427. } else {
  428. n = len - sofar;
  429. b->rp += n;
  430. q->dlen -= n;
  431. }
  432. }
  433. /* if writer flow controlled, restart */
  434. if((q->state & Qflow) && q->len < q->limit){
  435. q->state &= ~Qflow;
  436. dowakeup = 1;
  437. } else
  438. dowakeup = 0;
  439. unlock(&q->l);
  440. if(dowakeup)
  441. Wakeup(&q->wr);
  442. return sofar;
  443. }
  444. /*
  445. * Interrupt level copy out of a queue, return # bytes copied.
  446. */
  447. int
  448. qconsume(Queue *q, void *vp, int len)
  449. {
  450. Block *b;
  451. int n, dowakeup;
  452. uchar *p = vp;
  453. Block *tofree = nil;
  454. /* sync with qwrite */
  455. lock(&q->l);
  456. for(;;) {
  457. b = q->bfirst;
  458. if(b == 0){
  459. q->state |= Qstarve;
  460. unlock(&q->l);
  461. return -1;
  462. }
  463. QDEBUG checkb(b, "qconsume 1");
  464. n = BLEN(b);
  465. if(n > 0)
  466. break;
  467. q->bfirst = b->next;
  468. q->len -= BALLOC(b);
  469. /* remember to free this */
  470. b->next = tofree;
  471. tofree = b;
  472. }
  473. if(n < len)
  474. len = n;
  475. memmove(p, b->rp, len);
  476. if((q->state & Qmsg) || len == n)
  477. q->bfirst = b->next;
  478. b->rp += len;
  479. q->dlen -= len;
  480. /* discard the block if we're done with it */
  481. if((q->state & Qmsg) || len == n){
  482. b->next = 0;
  483. q->len -= BALLOC(b);
  484. q->dlen -= BLEN(b);
  485. /* remember to free this */
  486. b->next = tofree;
  487. tofree = b;
  488. }
  489. /* if writer flow controlled, restart */
  490. if((q->state & Qflow) && q->len < q->limit/2){
  491. q->state &= ~Qflow;
  492. dowakeup = 1;
  493. } else
  494. dowakeup = 0;
  495. unlock(&q->l);
  496. if(dowakeup)
  497. Wakeup(&q->wr);
  498. if(tofree != nil)
  499. freeblist(tofree);
  500. return len;
  501. }
  502. int
  503. qpass(Queue *q, Block *b)
  504. {
  505. int dlen, len, dowakeup;
  506. /* sync with qread */
  507. dowakeup = 0;
  508. lock(&q->l);
  509. if(q->len >= q->limit){
  510. unlock(&q->l);
  511. freeblist(b);
  512. return -1;
  513. }
  514. if(q->state & Qclosed){
  515. unlock(&q->l);
  516. len = blocklen(b);
  517. freeblist(b);
  518. return len;
  519. }
  520. /* add buffer to queue */
  521. if(q->bfirst)
  522. q->blast->next = b;
  523. else
  524. q->bfirst = b;
  525. len = BALLOC(b);
  526. dlen = BLEN(b);
  527. QDEBUG checkb(b, "qpass");
  528. while(b->next){
  529. b = b->next;
  530. QDEBUG checkb(b, "qpass");
  531. len += BALLOC(b);
  532. dlen += BLEN(b);
  533. }
  534. q->blast = b;
  535. q->len += len;
  536. q->dlen += dlen;
  537. if(q->len >= q->limit/2)
  538. q->state |= Qflow;
  539. if(q->state & Qstarve){
  540. q->state &= ~Qstarve;
  541. dowakeup = 1;
  542. }
  543. unlock(&q->l);
  544. if(dowakeup)
  545. Wakeup(&q->rr);
  546. return len;
  547. }
  548. int
  549. qpassnolim(Queue *q, Block *b)
  550. {
  551. int dlen, len, dowakeup;
  552. /* sync with qread */
  553. dowakeup = 0;
  554. lock(&q->l);
  555. len = BALLOC(b);
  556. if(q->state & Qclosed){
  557. unlock(&q->l);
  558. freeblist(b);
  559. return len;
  560. }
  561. /* add buffer to queue */
  562. if(q->bfirst)
  563. q->blast->next = b;
  564. else
  565. q->bfirst = b;
  566. dlen = BLEN(b);
  567. QDEBUG checkb(b, "qpass");
  568. while(b->next){
  569. b = b->next;
  570. QDEBUG checkb(b, "qpass");
  571. len += BALLOC(b);
  572. dlen += BLEN(b);
  573. }
  574. q->blast = b;
  575. q->len += len;
  576. q->dlen += dlen;
  577. if(q->len >= q->limit/2)
  578. q->state |= Qflow;
  579. if(q->state & Qstarve){
  580. q->state &= ~Qstarve;
  581. dowakeup = 1;
  582. }
  583. unlock(&q->l);
  584. if(dowakeup)
  585. Wakeup(&q->rr);
  586. return len;
  587. }
  588. /*
  589. * if the allocated space is way out of line with the used
  590. * space, reallocate to a smaller block
  591. */
  592. Block*
  593. packblock(Block *bp)
  594. {
  595. Block **l, *nbp;
  596. int n;
  597. for(l = &bp; *l; l = &(*l)->next){
  598. nbp = *l;
  599. n = BLEN(nbp);
  600. if((n<<2) < BALLOC(nbp)){
  601. *l = allocb(n);
  602. memmove((*l)->wp, nbp->rp, n);
  603. (*l)->wp += n;
  604. (*l)->next = nbp->next;
  605. freeb(nbp);
  606. }
  607. }
  608. return bp;
  609. }
  610. int
  611. qproduce(Queue *q, void *vp, int len)
  612. {
  613. Block *b;
  614. int dowakeup;
  615. uchar *p = vp;
  616. /* sync with qread */
  617. dowakeup = 0;
  618. lock(&q->l);
  619. if(q->state & Qclosed){
  620. unlock(&q->l);
  621. return -1;
  622. }
  623. /* no waiting receivers, room in buffer? */
  624. if(q->len >= q->limit){
  625. q->state |= Qflow;
  626. unlock(&q->l);
  627. return -1;
  628. }
  629. /* save in buffer */
  630. b = iallocb(len);
  631. if(b == 0){
  632. unlock(&q->l);
  633. print("qproduce: iallocb failed\n");
  634. return -1;
  635. }
  636. memmove(b->wp, p, len);
  637. b->wp += len;
  638. if(q->bfirst)
  639. q->blast->next = b;
  640. else
  641. q->bfirst = b;
  642. q->blast = b;
  643. /* b->next = 0; done by allocb() */
  644. q->len += BALLOC(b);
  645. q->dlen += BLEN(b);
  646. QDEBUG checkb(b, "qproduce");
  647. if(q->state & Qstarve){
  648. q->state &= ~Qstarve;
  649. dowakeup = 1;
  650. }
  651. if(q->len >= q->limit)
  652. q->state |= Qflow;
  653. unlock(&q->l);
  654. if(dowakeup)
  655. Wakeup(&q->rr);
  656. return len;
  657. }
  658. /*
  659. * copy from offset in the queue
  660. */
  661. Block*
  662. qcopy(Queue *q, int len, ulong offset)
  663. {
  664. int sofar;
  665. int n;
  666. Block *b, *nb;
  667. uchar *p;
  668. nb = allocb(len);
  669. lock(&q->l);
  670. /* go to offset */
  671. b = q->bfirst;
  672. for(sofar = 0; ; sofar += n){
  673. if(b == nil){
  674. unlock(&q->l);
  675. return nb;
  676. }
  677. n = BLEN(b);
  678. if(sofar + n > offset){
  679. p = b->rp + offset - sofar;
  680. n -= offset - sofar;
  681. break;
  682. }
  683. b = b->next;
  684. }
  685. /* copy bytes from there */
  686. for(sofar = 0; sofar < len;){
  687. if(n > len - sofar)
  688. n = len - sofar;
  689. memmove(nb->wp, p, n);
  690. sofar += n;
  691. nb->wp += n;
  692. b = b->next;
  693. if(b == nil)
  694. break;
  695. n = BLEN(b);
  696. p = b->rp;
  697. }
  698. unlock(&q->l);
  699. return nb;
  700. }
  701. /*
  702. * called by non-interrupt code
  703. */
  704. Queue*
  705. qopen(int limit, int msg, void (*kick)(void*), void *arg)
  706. {
  707. Queue *q;
  708. q = kmalloc(sizeof(Queue));
  709. if(q == 0)
  710. return 0;
  711. q->limit = q->inilim = limit;
  712. q->kick = kick;
  713. q->arg = arg;
  714. q->state = msg;
  715. q->state |= Qstarve;
  716. q->eof = 0;
  717. q->noblock = 0;
  718. return q;
  719. }
  720. /* open a queue to be bypassed */
  721. Queue*
  722. qbypass(void (*bypass)(void*, Block*), void *arg)
  723. {
  724. Queue *q;
  725. q = malloc(sizeof(Queue));
  726. if(q == 0)
  727. return 0;
  728. q->limit = 0;
  729. q->arg = arg;
  730. q->bypass = bypass;
  731. q->state = 0;
  732. return q;
  733. }
  734. static int
  735. notempty(void *a)
  736. {
  737. Queue *q = a;
  738. return (q->state & Qclosed) || q->bfirst != 0;
  739. }
  740. /*
  741. * wait for the queue to be non-empty or closed.
  742. * called with q ilocked.
  743. */
  744. static int
  745. qwait(Queue *q)
  746. {
  747. /* wait for data */
  748. for(;;){
  749. if(q->bfirst != nil)
  750. break;
  751. if(q->state & Qclosed){
  752. if(++q->eof > 3)
  753. return -1;
  754. if(*q->err && strcmp(q->err, Ehungup) != 0)
  755. return -1;
  756. return 0;
  757. }
  758. q->state |= Qstarve; /* flag requesting producer to wake me */
  759. unlock(&q->l);
  760. Sleep(&q->rr, notempty, q);
  761. lock(&q->l);
  762. }
  763. return 1;
  764. }
  765. /*
  766. * add a block list to a queue
  767. */
  768. void
  769. qaddlist(Queue *q, Block *b)
  770. {
  771. /* queue the block */
  772. if(q->bfirst)
  773. q->blast->next = b;
  774. else
  775. q->bfirst = b;
  776. q->len += blockalloclen(b);
  777. q->dlen += blocklen(b);
  778. while(b->next)
  779. b = b->next;
  780. q->blast = b;
  781. }
  782. /*
  783. * called with q locked
  784. */
  785. Block*
  786. qremove(Queue *q)
  787. {
  788. Block *b;
  789. b = q->bfirst;
  790. if(b == nil)
  791. return nil;
  792. q->bfirst = b->next;
  793. b->next = nil;
  794. q->dlen -= BLEN(b);
  795. q->len -= BALLOC(b);
  796. QDEBUG checkb(b, "qremove");
  797. return b;
  798. }
  799. /*
  800. * copy the contents of a string of blocks into
  801. * memory. emptied blocks are freed. return
  802. * pointer to first unconsumed block.
  803. */
  804. Block*
  805. bl2mem(uchar *p, Block *b, int n)
  806. {
  807. int i;
  808. Block *next;
  809. for(; b != nil; b = next){
  810. i = BLEN(b);
  811. if(i > n){
  812. memmove(p, b->rp, n);
  813. b->rp += n;
  814. return b;
  815. }
  816. memmove(p, b->rp, i);
  817. n -= i;
  818. p += i;
  819. b->rp += i;
  820. next = b->next;
  821. freeb(b);
  822. }
  823. return nil;
  824. }
  825. /*
  826. * copy the contents of memory into a string of blocks.
  827. * return nil on error.
  828. */
  829. Block*
  830. mem2bl(uchar *p, int len)
  831. {
  832. int n;
  833. Block *b, *first, **l;
  834. first = nil;
  835. l = &first;
  836. if(waserror()){
  837. freeblist(first);
  838. nexterror();
  839. }
  840. do {
  841. n = len;
  842. if(n > Maxatomic)
  843. n = Maxatomic;
  844. *l = b = allocb(n);
  845. setmalloctag(b, (up->text[0]<<24)|(up->text[1]<<16)|(up->text[2]<<8)|up->text[3]);
  846. memmove(b->wp, p, n);
  847. b->wp += n;
  848. p += n;
  849. len -= n;
  850. l = &b->next;
  851. } while(len > 0);
  852. poperror();
  853. return first;
  854. }
  855. /*
  856. * put a block back to the front of the queue
  857. * called with q ilocked
  858. */
  859. void
  860. qputback(Queue *q, Block *b)
  861. {
  862. b->next = q->bfirst;
  863. if(q->bfirst == nil)
  864. q->blast = b;
  865. q->bfirst = b;
  866. q->len += BALLOC(b);
  867. q->dlen += BLEN(b);
  868. }
  869. /*
  870. * flow control, get producer going again
  871. * called with q locked
  872. */
  873. static void
  874. qwakeup_unlock(Queue *q)
  875. {
  876. int dowakeup = 0;
  877. /* if writer flow controlled, restart */
  878. if((q->state & Qflow) && q->len < q->limit/2){
  879. q->state &= ~Qflow;
  880. dowakeup = 1;
  881. }
  882. unlock(&q->l);
  883. /* wakeup flow controlled writers */
  884. if(dowakeup){
  885. if(q->kick)
  886. q->kick(q->arg);
  887. Wakeup(&q->wr);
  888. }
  889. }
  890. /*
  891. * get next block from a queue (up to a limit)
  892. */
  893. Block*
  894. qbread(Queue *q, int len)
  895. {
  896. Block *b, *nb;
  897. int n;
  898. qlock(&q->rlock);
  899. if(waserror()){
  900. qunlock(&q->rlock);
  901. nexterror();
  902. }
  903. lock(&q->l);
  904. switch(qwait(q)){
  905. case 0:
  906. /* queue closed */
  907. unlock(&q->l);
  908. poperror();
  909. qunlock(&q->rlock);
  910. return nil;
  911. case -1:
  912. /* multiple reads on a closed queue */
  913. unlock(&q->l);
  914. error(q->err);
  915. }
  916. /* if we get here, there's at least one block in the queue */
  917. b = qremove(q);
  918. n = BLEN(b);
  919. /* split block if it's too big and this is not a message oriented queue */
  920. nb = b;
  921. if(n > len){
  922. if((q->state&Qmsg) == 0){
  923. n -= len;
  924. b = allocb(n);
  925. memmove(b->wp, nb->rp+len, n);
  926. b->wp += n;
  927. qputback(q, b);
  928. }
  929. nb->wp = nb->rp + len;
  930. }
  931. /* restart producer */
  932. qwakeup_unlock(q);
  933. poperror();
  934. qunlock(&q->rlock);
  935. return nb;
  936. }
  937. /*
  938. * read a queue. if no data is queued, wait on its Rendez
  939. */
  940. long
  941. qread(Queue *q, void *vp, int len)
  942. {
  943. Block *b, *first, **l;
  944. int m, n;
  945. qlock(&q->rlock);
  946. if(waserror()){
  947. qunlock(&q->rlock);
  948. nexterror();
  949. }
  950. lock(&q->l);
  951. again:
  952. switch(qwait(q)){
  953. case 0:
  954. /* queue closed */
  955. unlock(&q->l);
  956. poperror();
  957. qunlock(&q->rlock);
  958. return 0;
  959. case -1:
  960. /* multiple reads on a closed queue */
  961. unlock(&q->l);
  962. error(q->err);
  963. }
  964. /* if we get here, there's at least one block in the queue */
  965. if(q->state & Qcoalesce){
  966. /* when coalescing, 0 length blocks just go away */
  967. b = q->bfirst;
  968. if(BLEN(b) <= 0){
  969. freeb(qremove(q));
  970. goto again;
  971. }
  972. /* grab the first block plus as many
  973. * following blocks as will completely
  974. * fit in the read.
  975. */
  976. n = 0;
  977. l = &first;
  978. m = BLEN(b);
  979. for(;;) {
  980. *l = qremove(q);
  981. l = &b->next;
  982. n += m;
  983. b = q->bfirst;
  984. if(b == nil)
  985. break;
  986. m = BLEN(b);
  987. if(n+m > len)
  988. break;
  989. }
  990. } else {
  991. first = qremove(q);
  992. n = BLEN(first);
  993. }
  994. /* copy to user space outside of the ilock */
  995. unlock(&q->l);
  996. b = bl2mem(vp, first, len);
  997. lock(&q->l);
  998. /* take care of any left over partial block */
  999. if(b != nil){
  1000. n -= BLEN(b);
  1001. if(q->state & Qmsg)
  1002. freeb(b);
  1003. else
  1004. qputback(q, b);
  1005. }
  1006. /* restart producer */
  1007. qwakeup_unlock(q);
  1008. poperror();
  1009. qunlock(&q->rlock);
  1010. return n;
  1011. }
  1012. static int
  1013. qnotfull(void *a)
  1014. {
  1015. Queue *q = a;
  1016. return q->len < q->limit || (q->state & Qclosed);
  1017. }
  1018. /*
  1019. * add a block to a queue obeying flow control
  1020. */
  1021. long
  1022. qbwrite(Queue *q, Block *b)
  1023. {
  1024. int n, dowakeup;
  1025. volatile struct {Block *b;} cb;
  1026. dowakeup = 0;
  1027. n = BLEN(b);
  1028. if(q->bypass){
  1029. (*q->bypass)(q->arg, b);
  1030. return n;
  1031. }
  1032. cb.b = b;
  1033. qlock(&q->wlock);
  1034. if(waserror()){
  1035. if(cb.b != nil)
  1036. freeb(cb.b);
  1037. qunlock(&q->wlock);
  1038. nexterror();
  1039. }
  1040. lock(&q->l);
  1041. /* give up if the queue is closed */
  1042. if(q->state & Qclosed){
  1043. unlock(&q->l);
  1044. error(q->err);
  1045. }
  1046. /* if nonblocking, don't queue over the limit */
  1047. if(q->len >= q->limit){
  1048. if(q->noblock){
  1049. unlock(&q->l);
  1050. freeb(b);
  1051. poperror();
  1052. qunlock(&q->wlock);
  1053. return n;
  1054. }
  1055. }
  1056. /* queue the block */
  1057. if(q->bfirst)
  1058. q->blast->next = b;
  1059. else
  1060. q->bfirst = b;
  1061. q->blast = b;
  1062. b->next = 0;
  1063. q->len += BALLOC(b);
  1064. q->dlen += n;
  1065. QDEBUG checkb(b, "qbwrite");
  1066. cb.b = nil;
  1067. if(q->state & Qstarve){
  1068. q->state &= ~Qstarve;
  1069. dowakeup = 1;
  1070. }
  1071. unlock(&q->l);
  1072. /* get output going again */
  1073. if(q->kick && (dowakeup || (q->state&Qkick)))
  1074. q->kick(q->arg);
  1075. if(dowakeup)
  1076. Wakeup(&q->rr);
  1077. /*
  1078. * flow control, wait for queue to get below the limit
  1079. * before allowing the process to continue and queue
  1080. * more. We do this here so that postnote can only
  1081. * interrupt us after the data has been queued. This
  1082. * means that things like 9p flushes and ssl messages
  1083. * will not be disrupted by software interrupts.
  1084. *
  1085. * Note - this is moderately dangerous since a process
  1086. * that keeps getting interrupted and rewriting will
  1087. * queue infinite crud.
  1088. */
  1089. for(;;){
  1090. if(q->noblock || qnotfull(q))
  1091. break;
  1092. lock(&q->l);
  1093. q->state |= Qflow;
  1094. unlock(&q->l);
  1095. Sleep(&q->wr, qnotfull, q);
  1096. }
  1097. qunlock(&q->wlock);
  1098. poperror();
  1099. return n;
  1100. }
  1101. /*
  1102. * write to a queue. only Maxatomic bytes at a time is atomic.
  1103. */
  1104. int
  1105. qwrite(Queue *q, void *vp, int len)
  1106. {
  1107. int n, sofar;
  1108. Block *b;
  1109. uchar *p = vp;
  1110. sofar = 0;
  1111. do {
  1112. n = len-sofar;
  1113. if(n > Maxatomic)
  1114. n = Maxatomic;
  1115. b = allocb(n);
  1116. setmalloctag(b, getcallerpc(&q));
  1117. if(waserror()){
  1118. freeb(b);
  1119. nexterror();
  1120. }
  1121. memmove(b->wp, p+sofar, n);
  1122. poperror();
  1123. b->wp += n;
  1124. qbwrite(q, b);
  1125. sofar += n;
  1126. } while(sofar < len && (q->state & Qmsg) == 0);
  1127. return len;
  1128. }
  1129. /*
  1130. * used by print() to write to a queue. Since we may be splhi or not in
  1131. * a process, don't qlock.
  1132. */
  1133. int
  1134. qiwrite(Queue *q, void *vp, int len)
  1135. {
  1136. int n, sofar, dowakeup;
  1137. Block *b;
  1138. uchar *p = vp;
  1139. dowakeup = 0;
  1140. sofar = 0;
  1141. do {
  1142. n = len-sofar;
  1143. if(n > Maxatomic)
  1144. n = Maxatomic;
  1145. b = iallocb(n);
  1146. if (b == 0) {
  1147. print("qiwrite: iallocb failed\n");
  1148. break;
  1149. }
  1150. memmove(b->wp, p+sofar, n);
  1151. b->wp += n;
  1152. lock(&q->l);
  1153. QDEBUG checkb(b, "qiwrite");
  1154. if(q->bfirst)
  1155. q->blast->next = b;
  1156. else
  1157. q->bfirst = b;
  1158. q->blast = b;
  1159. q->len += BALLOC(b);
  1160. q->dlen += n;
  1161. if(q->state & Qstarve){
  1162. q->state &= ~Qstarve;
  1163. dowakeup = 1;
  1164. }
  1165. unlock(&q->l);
  1166. if(dowakeup){
  1167. if(q->kick)
  1168. q->kick(q->arg);
  1169. Wakeup(&q->rr);
  1170. }
  1171. sofar += n;
  1172. } while(sofar < len && (q->state & Qmsg) == 0);
  1173. return sofar;
  1174. }
  1175. /*
  1176. * be extremely careful when calling this,
  1177. * as there is no reference accounting
  1178. */
  1179. void
  1180. qfree(Queue *q)
  1181. {
  1182. qclose(q);
  1183. free(q);
  1184. }
  1185. /*
  1186. * Mark a queue as closed. No further IO is permitted.
  1187. * All blocks are released.
  1188. */
  1189. void
  1190. qclose(Queue *q)
  1191. {
  1192. Block *bfirst;
  1193. if(q == nil)
  1194. return;
  1195. /* mark it */
  1196. lock(&q->l);
  1197. q->state |= Qclosed;
  1198. q->state &= ~(Qflow|Qstarve);
  1199. strcpy(q->err, Ehungup);
  1200. bfirst = q->bfirst;
  1201. q->bfirst = 0;
  1202. q->len = 0;
  1203. q->dlen = 0;
  1204. q->noblock = 0;
  1205. unlock(&q->l);
  1206. /* free queued blocks */
  1207. freeblist(bfirst);
  1208. /* wake up readers/writers */
  1209. Wakeup(&q->rr);
  1210. Wakeup(&q->wr);
  1211. }
  1212. /*
  1213. * Mark a queue as closed. Wakeup any readers. Don't remove queued
  1214. * blocks.
  1215. */
  1216. void
  1217. qhangup(Queue *q, char *msg)
  1218. {
  1219. /* mark it */
  1220. lock(&q->l);
  1221. q->state |= Qclosed;
  1222. if(msg == 0 || *msg == 0)
  1223. strcpy(q->err, Ehungup);
  1224. else
  1225. kstrcpy(q->err, msg, sizeof q->err);
  1226. unlock(&q->l);
  1227. /* wake up readers/writers */
  1228. Wakeup(&q->rr);
  1229. Wakeup(&q->wr);
  1230. }
  1231. /*
  1232. * return non-zero if the q is hungup
  1233. */
  1234. int
  1235. qisclosed(Queue *q)
  1236. {
  1237. return q->state & Qclosed;
  1238. }
  1239. /*
  1240. * mark a queue as no longer hung up
  1241. */
  1242. void
  1243. qreopen(Queue *q)
  1244. {
  1245. lock(&q->l);
  1246. q->state &= ~Qclosed;
  1247. q->state |= Qstarve;
  1248. q->eof = 0;
  1249. q->limit = q->inilim;
  1250. unlock(&q->l);
  1251. }
  1252. /*
  1253. * return bytes queued
  1254. */
  1255. int
  1256. qlen(Queue *q)
  1257. {
  1258. return q->dlen;
  1259. }
  1260. /*
  1261. * return space remaining before flow control
  1262. */
  1263. int
  1264. qwindow(Queue *q)
  1265. {
  1266. int l;
  1267. l = q->limit - q->len;
  1268. if(l < 0)
  1269. l = 0;
  1270. return l;
  1271. }
  1272. /*
  1273. * return true if we can read without blocking
  1274. */
  1275. int
  1276. qcanread(Queue *q)
  1277. {
  1278. return q->bfirst!=0;
  1279. }
  1280. /*
  1281. * change queue limit
  1282. */
  1283. void
  1284. qsetlimit(Queue *q, int limit)
  1285. {
  1286. q->limit = limit;
  1287. }
  1288. /*
  1289. * set blocking/nonblocking
  1290. */
  1291. void
  1292. qnoblock(Queue *q, int onoff)
  1293. {
  1294. q->noblock = onoff;
  1295. }
  1296. /*
  1297. * flush the output queue
  1298. */
  1299. void
  1300. qflush(Queue *q)
  1301. {
  1302. Block *bfirst;
  1303. /* mark it */
  1304. lock(&q->l);
  1305. bfirst = q->bfirst;
  1306. q->bfirst = 0;
  1307. q->len = 0;
  1308. q->dlen = 0;
  1309. unlock(&q->l);
  1310. /* free queued blocks */
  1311. freeblist(bfirst);
  1312. /* wake up readers/writers */
  1313. Wakeup(&q->wr);
  1314. }
  1315. int
  1316. qfull(Queue *q)
  1317. {
  1318. return q->state & Qflow;
  1319. }
  1320. int
  1321. qstate(Queue *q)
  1322. {
  1323. return q->state;
  1324. }
  1325. int
  1326. qclosed(Queue *q)
  1327. {
  1328. return q->state & Qclosed;
  1329. }