devzp.c 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603
  1. /*
  2. * This file is part of the UCB release of Plan 9. It is subject to the license
  3. * terms in the LICENSE file found in the top-level directory of this
  4. * distribution and at http://akaros.cs.berkeley.edu/files/Plan9License. No
  5. * part of the UCB release of Plan 9, including this file, may be copied,
  6. * modified, propagated, or distributed except according to the terms contained
  7. * in the LICENSE file.
  8. */
  9. #include "u.h"
  10. #include "../port/lib.h"
  11. #include "mem.h"
  12. #include "dat.h"
  13. #include "fns.h"
  14. #include "../port/error.h"
  15. enum
  16. {
  17. Incr = 16,
  18. Maxatomic = 64*KiB,
  19. };
  20. typedef struct ZPipe ZPipe;
  21. typedef struct Zq Zq;
  22. struct Zq
  23. {
  24. Lock; /* to protect Zq */
  25. QLock rlck; /* one reader at a time */
  26. Kzio* io; /* io[] */
  27. Kzio* ep; /* end pointer */
  28. int closed; /* queue is closed */
  29. int waiting; /* reader is waiting */
  30. Kzio* rp; /* read pointer */
  31. Kzio* wp; /* write pointer */
  32. Rendez rr; /* reader rendez */
  33. };
  34. struct ZPipe
  35. {
  36. QLock;
  37. ZPipe *next;
  38. int ref;
  39. ulong path;
  40. Zq q[2];
  41. int qref[2];
  42. };
  43. struct
  44. {
  45. Lock;
  46. ulong path;
  47. } zpalloc;
  48. enum
  49. {
  50. Qdir,
  51. Qdata0,
  52. Qdata1,
  53. };
  54. Dirtab zpdir[] =
  55. {
  56. ".", {Qdir,0,QTDIR}, 0, DMDIR|0500,
  57. "data", {Qdata0}, 0, 0600,
  58. "data1", {Qdata1}, 0, 0600,
  59. };
  60. #define NZPDIR 3
  61. #define ZPTYPE(x) (((unsigned)x)&0x1f)
  62. #define ZPID(x) ((((unsigned)x))>>5)
  63. #define ZPQID(i, t) ((((unsigned)i)<<5)|(t))
  64. #define ZQLEN(q) ((q)->wp - (q)->rp)
  65. static int
  66. zqnotempty(void *x)
  67. {
  68. Zq *q;
  69. q = x;
  70. return ZQLEN(q) != 0 || q->closed != 0;
  71. }
  72. static void
  73. zqdump(Zq *q)
  74. {
  75. Kzio *io;
  76. if(DBGFLG == 0)
  77. return;
  78. print("zq %#p: io %#p rp %ld wp %ld ep %ld\n",
  79. q, q->io, q->rp - q->io, q->wp - q->io, q->ep - q->io);
  80. for(io = q->rp; io != nil && io < q->wp; io++)
  81. print("\tio[%ld] = %Z\n", io - q->io, io);
  82. print("\n");
  83. }
  84. /*
  85. * BUG: alloczio in here could be allocating data
  86. * in the kernel that is not needed. In fact, such data
  87. * might be in the kernel already. It's only that we don't
  88. * have a way to reference more than once to the same source
  89. * data (no reference counters).
  90. */
  91. static int
  92. zqread(Zq *q, Kzio io[], int nio, usize count)
  93. {
  94. int i;
  95. int32_t tot, nr;
  96. Kzio *qio;
  97. Segment *s;
  98. char *p;
  99. DBG("zqread %ld\n", count);
  100. qlock(&q->rlck);
  101. lock(q);
  102. if(waserror()){
  103. unlock(q);
  104. qunlock(&q->rlck);
  105. nexterror();
  106. }
  107. while(q->closed == 0 && ZQLEN(q) == 0){
  108. q->waiting++;
  109. unlock(q);
  110. sleep(&q->rr, zqnotempty, q);
  111. lock(q);
  112. }
  113. i = 0;
  114. for(tot = 0; ZQLEN(q) > 0 && i < nio && tot < count; tot += nr){
  115. qio = q->rp;
  116. nr = qio->size;
  117. if(tot + nr > count){
  118. if(i > 0)
  119. break;
  120. io[i] = *qio;
  121. nr = count - tot;
  122. io[i].size = nr;
  123. s = getzkseg();
  124. if(s == nil){
  125. DBG("zqread: bytes thrown away\n");
  126. goto Consume; /* we drop bytes! */
  127. }
  128. qio->size -= nr;
  129. qio->data = alloczio(s, qio->size);
  130. p = io[i].data;
  131. memmove(qio->data, p + io[i].size, qio->size);
  132. DBG("zqread: copy %#Z %#Z\n", qio, io);
  133. qio->seg = s;
  134. }else
  135. io[i] = *qio;
  136. Consume:
  137. i++;
  138. q->rp++;
  139. }
  140. if(q->rp == q->wp)
  141. q->rp = q->wp = q->io;
  142. zqdump(q);
  143. poperror();
  144. unlock(q);
  145. qunlock(&q->rlck);
  146. return i;
  147. }
  148. /*
  149. * BUG: no flow control here.
  150. * We queue as many io[]s as we want.
  151. * Perhaps it would be better to do flow control,
  152. * but the process feeding the queue would run out
  153. * of buffering at some point, which also provides
  154. * flow control somehow.
  155. */
  156. static int32_t
  157. zqwrite(Zq *q, Kzio io[], int nio)
  158. {
  159. int i, ei, ri, wi, awake;
  160. lock(q);
  161. if(waserror()){
  162. unlock(q);
  163. nexterror();
  164. }
  165. DBG("zqwrite io%#p[%d]\n", io, nio);
  166. if(DBGFLG)
  167. for(i = 0; i < nio; i++)
  168. print("\tio%#p[%d] = %Z\n", io, i, &io[i]);
  169. if(q->closed)
  170. error("queue is closed");
  171. if(q->wp + nio > q->ep){
  172. if(q->rp > q->io){
  173. memmove(q->io, q->rp, ZQLEN(q)*sizeof q->io[0]);
  174. q->wp = q->io + ZQLEN(q);
  175. q->rp = q->io;
  176. }
  177. if(q->wp + nio > q->ep){
  178. ei = q->ep - q->io;
  179. ei += Incr;
  180. ri = q->rp - q->io;
  181. wi = q->wp - q->io;
  182. q->io = realloc(q->io, ei*sizeof q->io[0]);
  183. if(q->io == nil)
  184. panic("zqwrite: no memory");
  185. q->ep = q->io + ei;
  186. q->rp = q->io + ri;
  187. q->wp = q->io + wi;
  188. DBG("zqwrite: io %#p rp %#p wp %#p ep %#p\n",
  189. q->io, q->rp, q->wp, q->ep);
  190. }
  191. assert(q->wp + nio <= q->ep);
  192. }
  193. memmove(q->wp, io, nio*sizeof io[0]);
  194. q->wp += nio;
  195. awake = q->waiting;
  196. if(awake)
  197. q->waiting--;
  198. zqdump(q);
  199. poperror();
  200. unlock(q);
  201. if(awake)
  202. wakeup(&q->rr);
  203. return nio;
  204. }
  205. static void
  206. zqflush(Zq *q)
  207. {
  208. lock(q);
  209. for(;q->rp < q->wp; q->rp++){
  210. qlock(&q->rp->seg->lk);
  211. zputaddr(q->rp->seg, PTR2UINT(q->rp->data));
  212. qunlock(&q->rp->seg->lk);
  213. putseg(q->rp->seg);
  214. }
  215. q->rp = q->wp = q->io;
  216. unlock(q);
  217. }
  218. static void
  219. zqclose(Zq *q)
  220. {
  221. q->closed = 1;
  222. zqflush(q);
  223. wakeup(&q->rr);
  224. }
  225. static void
  226. zqhangup(Zq *q)
  227. {
  228. q->closed = 1;
  229. wakeup(&q->rr);
  230. }
  231. static void
  232. zqreopen(Zq *q)
  233. {
  234. q->closed = 0;
  235. }
  236. /*
  237. * create a zp, no streams are created until an open
  238. */
  239. static Chan*
  240. zpattach(char *spec)
  241. {
  242. ZPipe *p;
  243. Chan *c;
  244. c = devattach(L'∏', spec);
  245. p = malloc(sizeof(ZPipe));
  246. if(p == 0)
  247. exhausted("memory");
  248. p->ref = 1;
  249. lock(&zpalloc);
  250. p->path = ++zpalloc.path;
  251. unlock(&zpalloc);
  252. mkqid(&c->qid, ZPQID(2*p->path, Qdir), 0, QTDIR);
  253. c->aux = p;
  254. c->devno = 0;
  255. return c;
  256. }
  257. static int
  258. zpgen(Chan *c, char*, Dirtab *tab, int ntab, int i, Dir *dp)
  259. {
  260. Qid q;
  261. int len;
  262. ZPipe *p;
  263. if(i == DEVDOTDOT){
  264. devdir(c, c->qid, "#∏", 0, eve, DMDIR|0555, dp);
  265. return 1;
  266. }
  267. i++; /* skip . */
  268. if(tab==0 || i>=ntab)
  269. return -1;
  270. tab += i;
  271. p = c->aux;
  272. switch((uint32_t)tab->qid.path){
  273. case Qdata0:
  274. len = ZQLEN(&p->q[0]);
  275. break;
  276. case Qdata1:
  277. len = ZQLEN(&p->q[1]);
  278. break;
  279. default:
  280. len = tab->length;
  281. break;
  282. }
  283. mkqid(&q, ZPQID(ZPID(c->qid.path), tab->qid.path), 0, QTFILE);
  284. devdir(c, q, tab->name, len, eve, tab->perm, dp);
  285. return 1;
  286. }
  287. static Walkqid*
  288. zpwalk(Chan *c, Chan *nc, char **name, int nname)
  289. {
  290. Walkqid *wq;
  291. ZPipe *p;
  292. wq = devwalk(c, nc, name, nname, zpdir, NZPDIR, zpgen);
  293. if(wq != nil && wq->clone != nil && wq->clone != c){
  294. p = c->aux;
  295. qlock(p);
  296. p->ref++;
  297. if(c->flag & COPEN){
  298. print("channel open in zpwalk\n");
  299. switch(ZPTYPE(c->qid.path)){
  300. case Qdata0:
  301. p->qref[0]++;
  302. break;
  303. case Qdata1:
  304. p->qref[1]++;
  305. break;
  306. }
  307. }
  308. qunlock(p);
  309. }
  310. return wq;
  311. }
  312. static int32_t
  313. zpstat(Chan *c, uint8_t *db, int32_t n)
  314. {
  315. ZPipe *p;
  316. Dir dir;
  317. p = c->aux;
  318. switch(ZPTYPE(c->qid.path)){
  319. case Qdir:
  320. devdir(c, c->qid, ".", 0, eve, DMDIR|0555, &dir);
  321. break;
  322. case Qdata0:
  323. devdir(c, c->qid, "data", ZQLEN(&p->q[0]), eve, 0600, &dir);
  324. break;
  325. case Qdata1:
  326. devdir(c, c->qid, "data1", ZQLEN(&p->q[1]), eve, 0600, &dir);
  327. break;
  328. default:
  329. panic("zpstat");
  330. }
  331. n = convD2M(&dir, db, n);
  332. if(n < BIT16SZ)
  333. error(Eshortstat);
  334. return n;
  335. }
  336. /*
  337. * if the stream doesn't exist, create it
  338. */
  339. static Chan*
  340. zpopen(Chan *c, int omode)
  341. {
  342. ZPipe *p;
  343. if(c->qid.type & QTDIR){
  344. if(omode != OREAD)
  345. error(Ebadarg);
  346. c->mode = omode;
  347. c->flag |= COPEN;
  348. c->offset = 0;
  349. return c;
  350. }
  351. p = c->aux;
  352. qlock(p);
  353. switch(ZPTYPE(c->qid.path)){
  354. case Qdata0:
  355. p->qref[0]++;
  356. break;
  357. case Qdata1:
  358. p->qref[1]++;
  359. break;
  360. }
  361. qunlock(p);
  362. c->mode = openmode(omode);
  363. c->flag |= COPEN;
  364. c->offset = 0;
  365. c->iounit = Maxatomic; /* should we care? */
  366. return c;
  367. }
  368. static void
  369. zpclose(Chan *c)
  370. {
  371. ZPipe *p;
  372. p = c->aux;
  373. qlock(p);
  374. if(c->flag & COPEN){
  375. /*
  376. * closing either side hangs up the stream
  377. */
  378. switch(ZPTYPE(c->qid.path)){
  379. case Qdata0:
  380. p->qref[0]--;
  381. if(p->qref[0] == 0){
  382. zqhangup(&p->q[1]);
  383. zqclose(&p->q[0]);
  384. }
  385. break;
  386. case Qdata1:
  387. p->qref[1]--;
  388. if(p->qref[1] == 0){
  389. zqhangup(&p->q[0]);
  390. zqclose(&p->q[1]);
  391. }
  392. break;
  393. }
  394. }
  395. /*
  396. * if both sides are closed, they are reusable
  397. */
  398. if(p->qref[0] == 0 && p->qref[1] == 0){
  399. zqreopen(&p->q[0]);
  400. zqreopen(&p->q[1]);
  401. }
  402. /*
  403. * free the structure on last close
  404. */
  405. p->ref--;
  406. if(p->ref == 0){
  407. qunlock(p);
  408. free(p);
  409. } else
  410. qunlock(p);
  411. }
  412. static int32_t
  413. zpread(Chan *c, void *va, int32_t n, int64_t)
  414. {
  415. ZPipe *p;
  416. Kzio io[32]; /* might read less than we could */
  417. int nio;
  418. p = c->aux;
  419. switch(ZPTYPE(c->qid.path)){
  420. case Qdir:
  421. return devdirread(c, va, n, zpdir, NZPDIR, zpgen);
  422. case Qdata0:
  423. nio = zqread(&p->q[0], io, nelem(io), n);
  424. return readzio(io, nio, va, n);
  425. case Qdata1:
  426. nio = zqread(&p->q[0], io, nelem(io), n);
  427. return readzio(io, nio, va, n);
  428. default:
  429. panic("zpread");
  430. }
  431. return -1; /* not reached */
  432. }
  433. static int
  434. zpzread(Chan *c, Kzio io[], int nio, usize n, int64_t offset)
  435. {
  436. ZPipe *p;
  437. p = c->aux;
  438. switch(ZPTYPE(c->qid.path)){
  439. case Qdir:
  440. return devzread(c, io, nio, n, offset);
  441. case Qdata0:
  442. return zqread(&p->q[0], io, nio, n);
  443. case Qdata1:
  444. return zqread(&p->q[0], io, nio, n);
  445. default:
  446. panic("zpread");
  447. }
  448. return -1; /* not reached */
  449. }
  450. /*
  451. * a write to a closed zp should cause a note to be sent to
  452. * the process.
  453. * If the data is already in a SG_ZIO segment, we shouldn't
  454. * be copying it again, probably.
  455. */
  456. static int32_t
  457. zpwrite(Chan *c, void *va, int32_t n, int64_t)
  458. {
  459. ZPipe *p;
  460. Kzio io; /* might write less than we could */
  461. int32_t tot, nw;
  462. Segment *s;
  463. Zq *q;
  464. char *cp;
  465. if(n <= 0)
  466. return n;
  467. p = c->aux;
  468. switch(ZPTYPE(c->qid.path)){
  469. case Qdata0:
  470. q = &p->q[1];
  471. break;
  472. case Qdata1:
  473. q = &p->q[0];
  474. break;
  475. default:
  476. q = nil;
  477. panic("zpwrite");
  478. }
  479. s = getzkseg();
  480. if(waserror()){
  481. putseg(s);
  482. nexterror();
  483. }
  484. cp = va;
  485. for(tot = 0; tot < n; tot += nw){
  486. nw = n;
  487. if(nw > Maxatomic)
  488. nw = Maxatomic;
  489. io.data = alloczio(s, nw);
  490. memmove(io.data, cp + tot, nw);
  491. io.seg = s;
  492. incref(s);
  493. io.size = nw;
  494. DBG("zpwrite: copy %Z %#p\n", &io, cp+tot);
  495. zqwrite(q, &io, 1);
  496. }
  497. poperror();
  498. putseg(s);
  499. return n;
  500. }
  501. static int
  502. zpzwrite(Chan *c, Kzio io[], int nio, int64_t)
  503. {
  504. ZPipe *p;
  505. p = c->aux;
  506. switch(ZPTYPE(c->qid.path)){
  507. case Qdata0:
  508. zqwrite(&p->q[1], io, nio);
  509. break;
  510. case Qdata1:
  511. zqwrite(&p->q[0], io, nio);
  512. break;
  513. default:
  514. panic("zpwrite");
  515. }
  516. return nio;
  517. }
  518. Dev zpdevtab = {
  519. L'∏',
  520. "zp",
  521. devreset,
  522. devinit,
  523. devshutdown,
  524. zpattach,
  525. zpwalk,
  526. zpstat,
  527. zpopen,
  528. devcreate,
  529. zpclose,
  530. zpread,
  531. devbread,
  532. zpwrite,
  533. devbwrite,
  534. devremove,
  535. devwstat,
  536. nil, /* power */
  537. nil, /* config */
  538. zpzread,
  539. zpzwrite,
  540. };