devpipe.c 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406
  1. /*
  2. * This file is part of the UCB release of Plan 9. It is subject to the license
  3. * terms in the LICENSE file found in the top-level directory of this
  4. * distribution and at http://akaros.cs.berkeley.edu/files/Plan9License. No
  5. * part of the UCB release of Plan 9, including this file, may be copied,
  6. * modified, propagated, or distributed except according to the terms contained
  7. * in the LICENSE file.
  8. */
  9. #include "u.h"
  10. #include "../port/lib.h"
  11. #include "mem.h"
  12. #include "dat.h"
  13. #include "fns.h"
  14. #include "../port/error.h"
  15. typedef struct Pipe Pipe;
  16. struct Pipe
  17. {
  18. QLock QLock;
  19. Pipe *next;
  20. int ref;
  21. uint32_t path;
  22. Queue *q[2];
  23. int qref[2];
  24. };
  25. struct
  26. {
  27. Lock Lock;
  28. uint32_t path;
  29. } pipealloc;
  30. enum
  31. {
  32. Qdir,
  33. Qdata0,
  34. Qdata1,
  35. };
  36. Dirtab pipedir[] =
  37. {
  38. {".", {Qdir,0,QTDIR}, 0, DMDIR|0500},
  39. {"data", {Qdata0}, 0, 0600},
  40. {"data1", {Qdata1}, 0, 0600},
  41. };
  42. #define NPIPEDIR 3
  43. #define PIPETYPE(x) (((unsigned)x)&0x1f)
  44. #define PIPEID(x) ((((unsigned)x))>>5)
  45. #define PIPEQID(i, t) ((((unsigned)i)<<5)|(t))
  46. enum
  47. {
  48. /* Plan 9 default for nmach > 1 */
  49. Pipeqsize = 256*1024
  50. };
  51. static void
  52. pipeinit(void)
  53. {
  54. }
  55. /*
  56. * create a pipe, no streams are created until an open
  57. */
  58. static Chan*
  59. pipeattach(char *spec)
  60. {
  61. Pipe *p;
  62. Chan *c;
  63. c = devattach('|', spec);
  64. p = malloc(sizeof(Pipe));
  65. if(p == 0)
  66. exhausted("memory");
  67. p->ref = 1;
  68. p->q[0] = qopen(Pipeqsize, 0, 0, 0);
  69. if(p->q[0] == 0){
  70. free(p);
  71. exhausted("memory");
  72. }
  73. p->q[1] = qopen(Pipeqsize, 0, 0, 0);
  74. if(p->q[1] == 0){
  75. free(p->q[0]);
  76. free(p);
  77. exhausted("memory");
  78. }
  79. lock(&pipealloc.Lock);
  80. p->path = ++pipealloc.path;
  81. unlock(&pipealloc.Lock);
  82. mkqid(&c->qid, PIPEQID(2*p->path, Qdir), 0, QTDIR);
  83. c->aux = p;
  84. c->devno = 0;
  85. return c;
  86. }
  87. static int
  88. pipegen(Chan *c, char* d, Dirtab *tab, int ntab, int i, Dir *dp)
  89. {
  90. Qid q;
  91. int len;
  92. Pipe *p;
  93. if(i == DEVDOTDOT){
  94. devdir(c, c->qid, "#|", 0, eve, DMDIR|0555, dp);
  95. return 1;
  96. }
  97. i++; /* skip . */
  98. if(tab==0 || i>=ntab)
  99. return -1;
  100. tab += i;
  101. p = c->aux;
  102. switch((uint32_t)tab->qid.path){
  103. case Qdata0:
  104. len = qlen(p->q[0]);
  105. break;
  106. case Qdata1:
  107. len = qlen(p->q[1]);
  108. break;
  109. default:
  110. len = tab->length;
  111. break;
  112. }
  113. mkqid(&q, PIPEQID(PIPEID(c->qid.path), tab->qid.path), 0, QTFILE);
  114. devdir(c, q, tab->name, len, eve, tab->perm, dp);
  115. return 1;
  116. }
  117. static Walkqid*
  118. pipewalk(Chan *c, Chan *nc, char **name, int nname)
  119. {
  120. Walkqid *wq;
  121. Pipe *p;
  122. wq = devwalk(c, nc, name, nname, pipedir, NPIPEDIR, pipegen);
  123. if(wq != nil && wq->clone != nil && wq->clone != c){
  124. p = c->aux;
  125. qlock(&p->QLock);
  126. p->ref++;
  127. if(c->flag & COPEN){
  128. print("channel open in pipewalk\n");
  129. switch(PIPETYPE(c->qid.path)){
  130. case Qdata0:
  131. p->qref[0]++;
  132. break;
  133. case Qdata1:
  134. p->qref[1]++;
  135. break;
  136. }
  137. }
  138. qunlock(&p->QLock);
  139. }
  140. return wq;
  141. }
  142. static int32_t
  143. pipestat(Chan *c, uint8_t *db, int32_t n)
  144. {
  145. Pipe *p;
  146. Dir dir;
  147. p = c->aux;
  148. switch(PIPETYPE(c->qid.path)){
  149. case Qdir:
  150. devdir(c, c->qid, ".", 0, eve, DMDIR|0555, &dir);
  151. break;
  152. case Qdata0:
  153. devdir(c, c->qid, "data", qlen(p->q[0]), eve, 0600, &dir);
  154. break;
  155. case Qdata1:
  156. devdir(c, c->qid, "data1", qlen(p->q[1]), eve, 0600, &dir);
  157. break;
  158. default:
  159. panic("pipestat");
  160. }
  161. n = convD2M(&dir, db, n);
  162. if(n < BIT16SZ)
  163. error(Eshortstat);
  164. return n;
  165. }
  166. /*
  167. * if the stream doesn't exist, create it
  168. */
  169. static Chan*
  170. pipeopen(Chan *c, int omode)
  171. {
  172. Pipe *p;
  173. if(c->qid.type & QTDIR){
  174. if(omode != OREAD)
  175. error(Ebadarg);
  176. c->mode = omode;
  177. c->flag |= COPEN;
  178. c->offset = 0;
  179. return c;
  180. }
  181. p = c->aux;
  182. qlock(&p->QLock);
  183. switch(PIPETYPE(c->qid.path)){
  184. case Qdata0:
  185. p->qref[0]++;
  186. break;
  187. case Qdata1:
  188. p->qref[1]++;
  189. break;
  190. }
  191. qunlock(&p->QLock);
  192. c->mode = openmode(omode);
  193. c->flag |= COPEN;
  194. c->offset = 0;
  195. c->iounit = qiomaxatomic;
  196. return c;
  197. }
  198. static void
  199. pipeclose(Chan *c)
  200. {
  201. Pipe *p;
  202. p = c->aux;
  203. qlock(&p->QLock);
  204. if(c->flag & COPEN){
  205. /*
  206. * closing either side hangs up the stream
  207. */
  208. switch(PIPETYPE(c->qid.path)){
  209. case Qdata0:
  210. p->qref[0]--;
  211. if(p->qref[0] == 0){
  212. qhangup(p->q[1], 0);
  213. qclose(p->q[0]);
  214. }
  215. break;
  216. case Qdata1:
  217. p->qref[1]--;
  218. if(p->qref[1] == 0){
  219. qhangup(p->q[0], 0);
  220. qclose(p->q[1]);
  221. }
  222. break;
  223. }
  224. }
  225. /*
  226. * if both sides are closed, they are reusable
  227. */
  228. if(p->qref[0] == 0 && p->qref[1] == 0){
  229. qreopen(p->q[0]);
  230. qreopen(p->q[1]);
  231. }
  232. /*
  233. * free the structure on last close
  234. */
  235. p->ref--;
  236. if(p->ref == 0){
  237. qunlock(&p->QLock);
  238. free(p->q[0]);
  239. free(p->q[1]);
  240. free(p);
  241. } else
  242. qunlock(&p->QLock);
  243. }
  244. static int32_t
  245. piperead(Chan *c, void *va, int32_t n, int64_t m)
  246. {
  247. Pipe *p;
  248. p = c->aux;
  249. switch(PIPETYPE(c->qid.path)){
  250. case Qdir:
  251. return devdirread(c, va, n, pipedir, NPIPEDIR, pipegen);
  252. case Qdata0:
  253. return qread(p->q[0], va, n);
  254. case Qdata1:
  255. return qread(p->q[1], va, n);
  256. default:
  257. panic("piperead");
  258. }
  259. return -1; /* not reached */
  260. }
  261. static Block*
  262. pipebread(Chan *c, int32_t n, int64_t offset)
  263. {
  264. Pipe *p;
  265. p = c->aux;
  266. switch(PIPETYPE(c->qid.path)){
  267. case Qdata0:
  268. return qbread(p->q[0], n);
  269. case Qdata1:
  270. return qbread(p->q[1], n);
  271. }
  272. return devbread(c, n, offset);
  273. }
  274. /*
  275. * a write to a closed pipe causes a note to be sent to
  276. * the process.
  277. */
  278. static int32_t
  279. pipewrite(Chan *c, void *va, int32_t n, int64_t mm)
  280. {
  281. Proc *up = externup();
  282. Pipe *p;
  283. if(0)if(!islo())
  284. print("pipewrite hi %#p\n", getcallerpc()); // devmnt?
  285. if(waserror()) {
  286. /* avoid notes when pipe is a mounted queue */
  287. if((c->flag & CMSG) == 0)
  288. postnote(up, 1, "sys: write on closed pipe", NUser);
  289. nexterror();
  290. }
  291. p = c->aux;
  292. switch(PIPETYPE(c->qid.path)){
  293. case Qdata0:
  294. n = qwrite(p->q[1], va, n);
  295. break;
  296. case Qdata1:
  297. n = qwrite(p->q[0], va, n);
  298. break;
  299. default:
  300. panic("pipewrite");
  301. }
  302. poperror();
  303. return n;
  304. }
  305. static int32_t
  306. pipebwrite(Chan *c, Block *bp, int64_t mm)
  307. {
  308. Proc *up = externup();
  309. int32_t n;
  310. Pipe *p;
  311. if(waserror()) {
  312. /* avoid notes when pipe is a mounted queue */
  313. if((c->flag & CMSG) == 0)
  314. postnote(up, 1, "sys: write on closed pipe", NUser);
  315. nexterror();
  316. }
  317. p = c->aux;
  318. switch(PIPETYPE(c->qid.path)){
  319. case Qdata0:
  320. n = qbwrite(p->q[1], bp);
  321. break;
  322. case Qdata1:
  323. n = qbwrite(p->q[0], bp);
  324. break;
  325. default:
  326. n = 0;
  327. panic("pipebwrite");
  328. }
  329. poperror();
  330. return n;
  331. }
  332. Dev pipedevtab = {
  333. .dc = '|',
  334. .name = "pipe",
  335. .reset = devreset,
  336. .init = pipeinit,
  337. .shutdown = devshutdown,
  338. .attach = pipeattach,
  339. .walk = pipewalk,
  340. .stat = pipestat,
  341. .open = pipeopen,
  342. .create = devcreate,
  343. .close = pipeclose,
  344. .read = piperead,
  345. .bread = pipebread,
  346. .write = pipewrite,
  347. .bwrite = pipebwrite,
  348. .remove = devremove,
  349. .wstat = devwstat,
  350. };