devzp.c 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606
  1. /*
  2. * This file is part of the UCB release of Plan 9. It is subject to the license
  3. * terms in the LICENSE file found in the top-level directory of this
  4. * distribution and at http://akaros.cs.berkeley.edu/files/Plan9License. No
  5. * part of the UCB release of Plan 9, including this file, may be copied,
  6. * modified, propagated, or distributed except according to the terms contained
  7. * in the LICENSE file.
  8. */
  9. #include "u.h"
  10. #include "../port/lib.h"
  11. #include "mem.h"
  12. #include "dat.h"
  13. #include "fns.h"
  14. #include "../port/error.h"
  15. enum
  16. {
  17. Incr = 16,
  18. Maxatomic = 64*KiB,
  19. };
  20. typedef struct ZPipe ZPipe;
  21. typedef struct Zq Zq;
  22. struct Zq
  23. {
  24. Lock; /* to protect Zq */
  25. QLock rlck; /* one reader at a time */
  26. Kzio* io; /* io[] */
  27. Kzio* ep; /* end pointer */
  28. int closed; /* queue is closed */
  29. int waiting; /* reader is waiting */
  30. Kzio* rp; /* read pointer */
  31. Kzio* wp; /* write pointer */
  32. Rendez rr; /* reader rendez */
  33. };
  34. struct ZPipe
  35. {
  36. QLock;
  37. ZPipe *next;
  38. int ref;
  39. uint32_t path;
  40. Zq q[2];
  41. int qref[2];
  42. };
  43. struct
  44. {
  45. Lock;
  46. uint32_t path;
  47. } zpalloc;
  48. enum
  49. {
  50. Qdir,
  51. Qdata0,
  52. Qdata1,
  53. };
  54. Dirtab zpdir[] =
  55. {
  56. {".", {Qdir,0,QTDIR}, 0, DMDIR|0500},
  57. {"data", {Qdata0}, 0, 0600},
  58. {"data1", {Qdata1}, 0, 0600},
  59. };
  60. #define NZPDIR 3
  61. #define ZPTYPE(x) (((unsigned)x)&0x1f)
  62. #define ZPID(x) ((((unsigned)x))>>5)
  63. #define ZPQID(i, t) ((((unsigned)i)<<5)|(t))
  64. #define ZQLEN(q) ((q)->wp - (q)->rp)
  65. static int
  66. zqnotempty(void *x)
  67. {
  68. Zq *q;
  69. q = x;
  70. return ZQLEN(q) != 0 || q->closed != 0;
  71. }
  72. static void
  73. zqdump(Zq *q)
  74. {
  75. Kzio *io;
  76. if(DBGFLG == 0)
  77. return;
  78. print("zq %#p: io %#p rp %ld wp %ld ep %ld\n",
  79. q, q->io, q->rp - q->io, q->wp - q->io, q->ep - q->io);
  80. for(io = q->rp; io != nil && io < q->wp; io++)
  81. print("\tio[%ld] = %Z\n", io - q->io, io);
  82. print("\n");
  83. }
  84. /*
  85. * BUG: alloczio in here could be allocating data
  86. * in the kernel that is not needed. In fact, such data
  87. * might be in the kernel already. It's only that we don't
  88. * have a way to reference more than once to the same source
  89. * data (no reference counters).
  90. */
  91. static int
  92. zqread(Zq *q, Kzio io[], int nio, usize count)
  93. {
  94. Proc *up = externup();
  95. int i;
  96. int32_t tot, nr;
  97. Kzio *qio;
  98. Segment *s;
  99. char *p;
  100. DBG("zqread %ld\n", count);
  101. qlock(&q->rlck);
  102. lock(q);
  103. if(waserror()){
  104. unlock(q);
  105. qunlock(&q->rlck);
  106. nexterror();
  107. }
  108. while(q->closed == 0 && ZQLEN(q) == 0){
  109. q->waiting++;
  110. unlock(q);
  111. sleep(&q->rr, zqnotempty, q);
  112. lock(q);
  113. }
  114. i = 0;
  115. for(tot = 0; ZQLEN(q) > 0 && i < nio && tot < count; tot += nr){
  116. qio = q->rp;
  117. nr = qio->Zio.size;
  118. if(tot + nr > count){
  119. if(i > 0)
  120. break;
  121. io[i] = *qio;
  122. nr = count - tot;
  123. io[i].Zio.size = nr;
  124. s = getzkseg();
  125. if(s == nil){
  126. DBG("zqread: bytes thrown away\n");
  127. goto Consume; /* we drop bytes! */
  128. }
  129. qio->Zio.size -= nr;
  130. qio->Zio.data = alloczio(s, qio->Zio.size);
  131. p = io[i].Zio.data;
  132. memmove(qio->Zio.data, p + io[i].Zio.size, qio->Zio.size);
  133. DBG("zqread: copy %#Z %#Z\n", qio, io);
  134. qio->seg = s;
  135. }else
  136. io[i] = *qio;
  137. Consume:
  138. i++;
  139. q->rp++;
  140. }
  141. if(q->rp == q->wp)
  142. q->rp = q->wp = q->io;
  143. zqdump(q);
  144. poperror();
  145. unlock(q);
  146. qunlock(&q->rlck);
  147. return i;
  148. }
  149. /*
  150. * BUG: no flow control here.
  151. * We queue as many io[]s as we want.
  152. * Perhaps it would be better to do flow control,
  153. * but the process feeding the queue would run out
  154. * of buffering at some point, which also provides
  155. * flow control somehow.
  156. */
  157. static int32_t
  158. zqwrite(Zq *q, Kzio io[], int nio)
  159. {
  160. Proc *up = externup();
  161. int i, ei, ri, wi, awake;
  162. lock(q);
  163. if(waserror()){
  164. unlock(q);
  165. nexterror();
  166. }
  167. DBG("zqwrite io%#p[%d]\n", io, nio);
  168. if(DBGFLG)
  169. for(i = 0; i < nio; i++)
  170. print("\tio%#p[%d] = %Z\n", io, i, &io[i]);
  171. if(q->closed)
  172. error("queue is closed");
  173. if(q->wp + nio > q->ep){
  174. if(q->rp > q->io){
  175. memmove(q->io, q->rp, ZQLEN(q)*sizeof q->io[0]);
  176. q->wp = q->io + ZQLEN(q);
  177. q->rp = q->io;
  178. }
  179. if(q->wp + nio > q->ep){
  180. ei = q->ep - q->io;
  181. ei += Incr;
  182. ri = q->rp - q->io;
  183. wi = q->wp - q->io;
  184. q->io = realloc(q->io, ei*sizeof q->io[0]);
  185. if(q->io == nil)
  186. panic("zqwrite: no memory");
  187. q->ep = q->io + ei;
  188. q->rp = q->io + ri;
  189. q->wp = q->io + wi;
  190. DBG("zqwrite: io %#p rp %#p wp %#p ep %#p\n",
  191. q->io, q->rp, q->wp, q->ep);
  192. }
  193. assert(q->wp + nio <= q->ep);
  194. }
  195. memmove(q->wp, io, nio*sizeof io[0]);
  196. q->wp += nio;
  197. awake = q->waiting;
  198. if(awake)
  199. q->waiting--;
  200. zqdump(q);
  201. poperror();
  202. unlock(q);
  203. if(awake)
  204. wakeup(&q->rr);
  205. return nio;
  206. }
  207. static void
  208. zqflush(Zq *q)
  209. {
  210. lock(q);
  211. for(;q->rp < q->wp; q->rp++){
  212. qlock(&q->rp->seg->lk);
  213. zputaddr(q->rp->seg, PTR2UINT(q->rp->Zio.data));
  214. qunlock(&q->rp->seg->lk);
  215. putseg(q->rp->seg);
  216. }
  217. q->rp = q->wp = q->io;
  218. unlock(q);
  219. }
  220. static void
  221. zqclose(Zq *q)
  222. {
  223. q->closed = 1;
  224. zqflush(q);
  225. wakeup(&q->rr);
  226. }
  227. static void
  228. zqhangup(Zq *q)
  229. {
  230. q->closed = 1;
  231. wakeup(&q->rr);
  232. }
  233. static void
  234. zqreopen(Zq *q)
  235. {
  236. q->closed = 0;
  237. }
  238. /*
  239. * create a zp, no streams are created until an open
  240. */
  241. static Chan*
  242. zpattach(char *spec)
  243. {
  244. ZPipe *p;
  245. Chan *c;
  246. c = devattach(L'∏', spec);
  247. p = malloc(sizeof(ZPipe));
  248. if(p == 0)
  249. exhausted("memory");
  250. p->ref = 1;
  251. lock(&zpalloc);
  252. p->path = ++zpalloc.path;
  253. unlock(&zpalloc);
  254. mkqid(&c->qid, ZPQID(2*p->path, Qdir), 0, QTDIR);
  255. c->aux = p;
  256. c->devno = 0;
  257. return c;
  258. }
  259. static int
  260. zpgen(Chan *c, char* d, Dirtab *tab, int ntab, int i, Dir *dp)
  261. {
  262. Qid q;
  263. int len;
  264. ZPipe *p;
  265. if(i == DEVDOTDOT){
  266. devdir(c, c->qid, "#∏", 0, eve, DMDIR|0555, dp);
  267. return 1;
  268. }
  269. i++; /* skip . */
  270. if(tab==0 || i>=ntab)
  271. return -1;
  272. tab += i;
  273. p = c->aux;
  274. switch((uint32_t)tab->qid.path){
  275. case Qdata0:
  276. len = ZQLEN(&p->q[0]);
  277. break;
  278. case Qdata1:
  279. len = ZQLEN(&p->q[1]);
  280. break;
  281. default:
  282. len = tab->length;
  283. break;
  284. }
  285. mkqid(&q, ZPQID(ZPID(c->qid.path), tab->qid.path), 0, QTFILE);
  286. devdir(c, q, tab->name, len, eve, tab->perm, dp);
  287. return 1;
  288. }
  289. static Walkqid*
  290. zpwalk(Chan *c, Chan *nc, char **name, int nname)
  291. {
  292. Walkqid *wq;
  293. ZPipe *p;
  294. wq = devwalk(c, nc, name, nname, zpdir, NZPDIR, zpgen);
  295. if(wq != nil && wq->clone != nil && wq->clone != c){
  296. p = c->aux;
  297. qlock(p);
  298. p->ref++;
  299. if(c->flag & COPEN){
  300. print("channel open in zpwalk\n");
  301. switch(ZPTYPE(c->qid.path)){
  302. case Qdata0:
  303. p->qref[0]++;
  304. break;
  305. case Qdata1:
  306. p->qref[1]++;
  307. break;
  308. }
  309. }
  310. qunlock(p);
  311. }
  312. return wq;
  313. }
  314. static int32_t
  315. zpstat(Chan *c, uint8_t *db, int32_t n)
  316. {
  317. ZPipe *p;
  318. Dir dir;
  319. p = c->aux;
  320. switch(ZPTYPE(c->qid.path)){
  321. case Qdir:
  322. devdir(c, c->qid, ".", 0, eve, DMDIR|0555, &dir);
  323. break;
  324. case Qdata0:
  325. devdir(c, c->qid, "data", ZQLEN(&p->q[0]), eve, 0600, &dir);
  326. break;
  327. case Qdata1:
  328. devdir(c, c->qid, "data1", ZQLEN(&p->q[1]), eve, 0600, &dir);
  329. break;
  330. default:
  331. panic("zpstat");
  332. }
  333. n = convD2M(&dir, db, n);
  334. if(n < BIT16SZ)
  335. error(Eshortstat);
  336. return n;
  337. }
  338. /*
  339. * if the stream doesn't exist, create it
  340. */
  341. static Chan*
  342. zpopen(Chan *c, int omode)
  343. {
  344. ZPipe *p;
  345. if(c->qid.type & QTDIR){
  346. if(omode != OREAD)
  347. error(Ebadarg);
  348. c->mode = omode;
  349. c->flag |= COPEN;
  350. c->offset = 0;
  351. return c;
  352. }
  353. p = c->aux;
  354. qlock(p);
  355. switch(ZPTYPE(c->qid.path)){
  356. case Qdata0:
  357. p->qref[0]++;
  358. break;
  359. case Qdata1:
  360. p->qref[1]++;
  361. break;
  362. }
  363. qunlock(p);
  364. c->mode = openmode(omode);
  365. c->flag |= COPEN;
  366. c->offset = 0;
  367. c->iounit = Maxatomic; /* should we care? */
  368. return c;
  369. }
  370. static void
  371. zpclose(Chan *c)
  372. {
  373. ZPipe *p;
  374. p = c->aux;
  375. qlock(p);
  376. if(c->flag & COPEN){
  377. /*
  378. * closing either side hangs up the stream
  379. */
  380. switch(ZPTYPE(c->qid.path)){
  381. case Qdata0:
  382. p->qref[0]--;
  383. if(p->qref[0] == 0){
  384. zqhangup(&p->q[1]);
  385. zqclose(&p->q[0]);
  386. }
  387. break;
  388. case Qdata1:
  389. p->qref[1]--;
  390. if(p->qref[1] == 0){
  391. zqhangup(&p->q[0]);
  392. zqclose(&p->q[1]);
  393. }
  394. break;
  395. }
  396. }
  397. /*
  398. * if both sides are closed, they are reusable
  399. */
  400. if(p->qref[0] == 0 && p->qref[1] == 0){
  401. zqreopen(&p->q[0]);
  402. zqreopen(&p->q[1]);
  403. }
  404. /*
  405. * free the structure on last close
  406. */
  407. p->ref--;
  408. if(p->ref == 0){
  409. qunlock(p);
  410. free(p);
  411. } else
  412. qunlock(p);
  413. }
  414. static int32_t
  415. zpread(Chan *c, void *va, int32_t n, int64_t mm)
  416. {
  417. ZPipe *p;
  418. Kzio io[32]; /* might read less than we could */
  419. int nio;
  420. p = c->aux;
  421. switch(ZPTYPE(c->qid.path)){
  422. case Qdir:
  423. return devdirread(c, va, n, zpdir, NZPDIR, zpgen);
  424. case Qdata0:
  425. nio = zqread(&p->q[0], io, nelem(io), n);
  426. return readzio(io, nio, va, n);
  427. case Qdata1:
  428. nio = zqread(&p->q[0], io, nelem(io), n);
  429. return readzio(io, nio, va, n);
  430. default:
  431. panic("zpread");
  432. }
  433. return -1; /* not reached */
  434. }
  435. static int
  436. zpzread(Chan *c, Kzio io[], int nio, usize n, int64_t offset)
  437. {
  438. ZPipe *p;
  439. p = c->aux;
  440. switch(ZPTYPE(c->qid.path)){
  441. case Qdir:
  442. return devzread(c, io, nio, n, offset);
  443. case Qdata0:
  444. return zqread(&p->q[0], io, nio, n);
  445. case Qdata1:
  446. return zqread(&p->q[0], io, nio, n);
  447. default:
  448. panic("zpread");
  449. }
  450. return -1; /* not reached */
  451. }
  452. /*
  453. * a write to a closed zp should cause a note to be sent to
  454. * the process.
  455. * If the data is already in a SG_ZIO segment, we shouldn't
  456. * be copying it again, probably.
  457. */
  458. static int32_t
  459. zpwrite(Chan *c, void *va, int32_t n, int64_t mm)
  460. {
  461. Proc *up = externup();
  462. ZPipe *p;
  463. Kzio io; /* might write less than we could */
  464. int32_t tot, nw;
  465. Segment *s;
  466. Zq *q;
  467. char *cp;
  468. if(n <= 0)
  469. return n;
  470. p = c->aux;
  471. switch(ZPTYPE(c->qid.path)){
  472. case Qdata0:
  473. q = &p->q[1];
  474. break;
  475. case Qdata1:
  476. q = &p->q[0];
  477. break;
  478. default:
  479. q = nil;
  480. panic("zpwrite");
  481. }
  482. s = getzkseg();
  483. if(waserror()){
  484. putseg(s);
  485. nexterror();
  486. }
  487. cp = va;
  488. for(tot = 0; tot < n; tot += nw){
  489. nw = n;
  490. if(nw > Maxatomic)
  491. nw = Maxatomic;
  492. io.Zio.data = alloczio(s, nw);
  493. memmove(io.Zio.data, cp + tot, nw);
  494. io.seg = s;
  495. incref(&s->r);
  496. io.Zio.size = nw;
  497. DBG("zpwrite: copy %Z %#p\n", &io, cp+tot);
  498. zqwrite(q, &io, 1);
  499. }
  500. poperror();
  501. putseg(s);
  502. return n;
  503. }
  504. static int
  505. zpzwrite(Chan *c, Kzio io[], int nio, int64_t mm)
  506. {
  507. ZPipe *p;
  508. p = c->aux;
  509. switch(ZPTYPE(c->qid.path)){
  510. case Qdata0:
  511. zqwrite(&p->q[1], io, nio);
  512. break;
  513. case Qdata1:
  514. zqwrite(&p->q[0], io, nio);
  515. break;
  516. default:
  517. panic("zpwrite");
  518. }
  519. return nio;
  520. }
  521. Dev zpdevtab = {
  522. .dc = L'∏',
  523. .name = "zp",
  524. .reset = devreset,
  525. .init = devinit,
  526. .shutdown = devshutdown,
  527. .attach = zpattach,
  528. .walk = zpwalk,
  529. .stat = zpstat,
  530. .open = zpopen,
  531. .create = devcreate,
  532. .close = zpclose,
  533. .read = zpread,
  534. .bread = devbread,
  535. .write = zpwrite,
  536. .bwrite = devbwrite,
  537. .remove = devremove,
  538. .wstat = devwstat,
  539. .power = nil, /* power */
  540. .config = nil, /* config */
  541. .zread = zpzread,
  542. .zwrite = zpzwrite,
  543. };