/*
 * devpipe.c - pipe device ('|')
 */
  1. #include "u.h"
  2. #include "lib.h"
  3. #include "dat.h"
  4. #include "fns.h"
  5. #include "error.h"
  6. #include "netif.h"
  7. typedef struct Pipe Pipe;
  8. struct Pipe
  9. {
  10. QLock lk;
  11. Pipe *next;
  12. int ref;
  13. ulong path;
  14. Queue *q[2];
  15. int qref[2];
  16. };
  17. struct
  18. {
  19. Lock lk;
  20. ulong path;
  21. } pipealloc;
  22. enum
  23. {
  24. Qdir,
  25. Qdata0,
  26. Qdata1,
  27. };
  28. Dirtab pipedir[] =
  29. {
  30. ".", {Qdir,0,QTDIR}, 0, DMDIR|0500,
  31. "data", {Qdata0}, 0, 0600,
  32. "data1", {Qdata1}, 0, 0600,
  33. };
  34. #define NPIPEDIR 3
  35. static void
  36. pipeinit(void)
  37. {
  38. if(conf.pipeqsize == 0){
  39. if(conf.nmach > 1)
  40. conf.pipeqsize = 256*1024;
  41. else
  42. conf.pipeqsize = 32*1024;
  43. }
  44. }
  45. /*
  46. * create a pipe, no streams are created until an open
  47. */
  48. static Chan*
  49. pipeattach(char *spec)
  50. {
  51. Pipe *p;
  52. Chan *c;
  53. c = devattach('|', spec);
  54. p = malloc(sizeof(Pipe));
  55. if(p == 0)
  56. exhausted("memory");
  57. p->ref = 1;
  58. p->q[0] = qopen(conf.pipeqsize, 0, 0, 0);
  59. if(p->q[0] == 0){
  60. free(p);
  61. exhausted("memory");
  62. }
  63. p->q[1] = qopen(conf.pipeqsize, 0, 0, 0);
  64. if(p->q[1] == 0){
  65. free(p->q[0]);
  66. free(p);
  67. exhausted("memory");
  68. }
  69. lock(&pipealloc.lk);
  70. p->path = ++pipealloc.path;
  71. unlock(&pipealloc.lk);
  72. mkqid(&c->qid, NETQID(2*p->path, Qdir), 0, QTDIR);
  73. c->aux = p;
  74. c->dev = 0;
  75. return c;
  76. }
  77. static int
  78. pipegen(Chan *c, char *name, Dirtab *tab, int ntab, int i, Dir *dp)
  79. {
  80. Qid q;
  81. int len;
  82. Pipe *p;
  83. USED(name);
  84. if(i == DEVDOTDOT){
  85. devdir(c, c->qid, "#|", 0, eve, DMDIR|0555, dp);
  86. return 1;
  87. }
  88. i++; /* skip . */
  89. if(tab==0 || i>=ntab)
  90. return -1;
  91. tab += i;
  92. p = c->aux;
  93. switch((ulong)tab->qid.path){
  94. case Qdata0:
  95. len = qlen(p->q[0]);
  96. break;
  97. case Qdata1:
  98. len = qlen(p->q[1]);
  99. break;
  100. default:
  101. len = tab->length;
  102. break;
  103. }
  104. mkqid(&q, NETQID(NETID(c->qid.path), tab->qid.path), 0, QTFILE);
  105. devdir(c, q, tab->name, len, eve, tab->perm, dp);
  106. return 1;
  107. }
  108. static Walkqid*
  109. pipewalk(Chan *c, Chan *nc, char **name, int nname)
  110. {
  111. Walkqid *wq;
  112. Pipe *p;
  113. wq = devwalk(c, nc, name, nname, pipedir, NPIPEDIR, pipegen);
  114. if(wq != nil && wq->clone != nil && wq->clone != c){
  115. p = c->aux;
  116. qlock(&p->lk);
  117. p->ref++;
  118. if(c->flag & COPEN){
  119. print("channel open in pipewalk\n");
  120. switch(NETTYPE(c->qid.path)){
  121. case Qdata0:
  122. p->qref[0]++;
  123. break;
  124. case Qdata1:
  125. p->qref[1]++;
  126. break;
  127. }
  128. }
  129. qunlock(&p->lk);
  130. }
  131. return wq;
  132. }
  133. static int
  134. pipestat(Chan *c, uchar *db, int n)
  135. {
  136. Pipe *p;
  137. Dir dir;
  138. p = c->aux;
  139. switch(NETTYPE(c->qid.path)){
  140. case Qdir:
  141. devdir(c, c->qid, ".", 0, eve, DMDIR|0555, &dir);
  142. break;
  143. case Qdata0:
  144. devdir(c, c->qid, "data", qlen(p->q[0]), eve, 0600, &dir);
  145. break;
  146. case Qdata1:
  147. devdir(c, c->qid, "data1", qlen(p->q[1]), eve, 0600, &dir);
  148. break;
  149. default:
  150. panic("pipestat");
  151. }
  152. n = convD2M(&dir, db, n);
  153. if(n < BIT16SZ)
  154. error(Eshortstat);
  155. return n;
  156. }
  157. /*
  158. * if the stream doesn't exist, create it
  159. */
  160. static Chan*
  161. pipeopen(Chan *c, int omode)
  162. {
  163. Pipe *p;
  164. if(c->qid.type & QTDIR){
  165. if(omode != OREAD)
  166. error(Ebadarg);
  167. c->mode = omode;
  168. c->flag |= COPEN;
  169. c->offset = 0;
  170. return c;
  171. }
  172. p = c->aux;
  173. qlock(&p->lk);
  174. switch(NETTYPE(c->qid.path)){
  175. case Qdata0:
  176. p->qref[0]++;
  177. break;
  178. case Qdata1:
  179. p->qref[1]++;
  180. break;
  181. }
  182. qunlock(&p->lk);
  183. c->mode = openmode(omode);
  184. c->flag |= COPEN;
  185. c->offset = 0;
  186. c->iounit = qiomaxatomic;
  187. return c;
  188. }
  189. static void
  190. pipeclose(Chan *c)
  191. {
  192. Pipe *p;
  193. p = c->aux;
  194. qlock(&p->lk);
  195. if(c->flag & COPEN){
  196. /*
  197. * closing either side hangs up the stream
  198. */
  199. switch(NETTYPE(c->qid.path)){
  200. case Qdata0:
  201. p->qref[0]--;
  202. if(p->qref[0] == 0){
  203. qhangup(p->q[1], 0);
  204. qclose(p->q[0]);
  205. }
  206. break;
  207. case Qdata1:
  208. p->qref[1]--;
  209. if(p->qref[1] == 0){
  210. qhangup(p->q[0], 0);
  211. qclose(p->q[1]);
  212. }
  213. break;
  214. }
  215. }
  216. /*
  217. * if both sides are closed, they are reusable
  218. */
  219. if(p->qref[0] == 0 && p->qref[1] == 0){
  220. qreopen(p->q[0]);
  221. qreopen(p->q[1]);
  222. }
  223. /*
  224. * free the structure on last close
  225. */
  226. p->ref--;
  227. if(p->ref == 0){
  228. qunlock(&p->lk);
  229. free(p->q[0]);
  230. free(p->q[1]);
  231. free(p);
  232. } else
  233. qunlock(&p->lk);
  234. }
  235. static long
  236. piperead(Chan *c, void *va, long n, vlong offset)
  237. {
  238. Pipe *p;
  239. USED(offset);
  240. p = c->aux;
  241. switch(NETTYPE(c->qid.path)){
  242. case Qdir:
  243. return devdirread(c, va, n, pipedir, NPIPEDIR, pipegen);
  244. case Qdata0:
  245. return qread(p->q[0], va, n);
  246. case Qdata1:
  247. return qread(p->q[1], va, n);
  248. default:
  249. panic("piperead");
  250. }
  251. return -1; /* not reached */
  252. }
  253. static Block*
  254. pipebread(Chan *c, long n, ulong offset)
  255. {
  256. Pipe *p;
  257. p = c->aux;
  258. switch(NETTYPE(c->qid.path)){
  259. case Qdata0:
  260. return qbread(p->q[0], n);
  261. case Qdata1:
  262. return qbread(p->q[1], n);
  263. }
  264. return devbread(c, n, offset);
  265. }
  266. /*
  267. * a write to a closed pipe causes a note to be sent to
  268. * the process.
  269. */
  270. static long
  271. pipewrite(Chan *c, void *va, long n, vlong offset)
  272. {
  273. Pipe *p;
  274. USED(offset);
  275. if(!islo())
  276. print("pipewrite hi %lux\n", getcallerpc(&c));
  277. if(waserror()) {
  278. /* avoid notes when pipe is a mounted queue */
  279. if((c->flag & CMSG) == 0)
  280. postnote(up, 1, "sys: write on closed pipe", NUser);
  281. nexterror();
  282. }
  283. p = c->aux;
  284. switch(NETTYPE(c->qid.path)){
  285. case Qdata0:
  286. n = qwrite(p->q[1], va, n);
  287. break;
  288. case Qdata1:
  289. n = qwrite(p->q[0], va, n);
  290. break;
  291. default:
  292. panic("pipewrite");
  293. }
  294. poperror();
  295. return n;
  296. }
  297. static long
  298. pipebwrite(Chan *c, Block *bp, ulong offset)
  299. {
  300. long n;
  301. Pipe *p;
  302. USED(offset);
  303. if(waserror()) {
  304. /* avoid notes when pipe is a mounted queue */
  305. if((c->flag & CMSG) == 0)
  306. postnote(up, 1, "sys: write on closed pipe", NUser);
  307. nexterror();
  308. }
  309. p = c->aux;
  310. switch(NETTYPE(c->qid.path)){
  311. case Qdata0:
  312. n = qbwrite(p->q[1], bp);
  313. break;
  314. case Qdata1:
  315. n = qbwrite(p->q[0], bp);
  316. break;
  317. default:
  318. n = 0;
  319. panic("pipebwrite");
  320. }
  321. poperror();
  322. return n;
  323. }
  324. Dev pipedevtab = {
  325. '|',
  326. "pipe",
  327. devreset,
  328. pipeinit,
  329. devshutdown,
  330. pipeattach,
  331. pipewalk,
  332. pipestat,
  333. pipeopen,
  334. devcreate,
  335. pipeclose,
  336. piperead,
  337. pipebread,
  338. pipewrite,
  339. pipebwrite,
  340. devremove,
  341. devwstat,
  342. };