devpipe.c 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391
  1. #include "u.h"
  2. #include "../port/lib.h"
  3. #include "mem.h"
  4. #include "dat.h"
  5. #include "fns.h"
  6. #include "../port/error.h"
  7. #include "netif.h"
  8. typedef struct Pipe Pipe;
  9. struct Pipe
  10. {
  11. QLock;
  12. Pipe *next;
  13. int ref;
  14. ulong path;
  15. Queue *q[2];
  16. int qref[2];
  17. };
  18. struct
  19. {
  20. Lock;
  21. ulong path;
  22. } pipealloc;
  23. enum
  24. {
  25. Qdir,
  26. Qdata0,
  27. Qdata1,
  28. };
  29. Dirtab pipedir[] =
  30. {
  31. ".", {Qdir,0,QTDIR}, 0, DMDIR|0500,
  32. "data", {Qdata0}, 0, 0600,
  33. "data1", {Qdata1}, 0, 0600,
  34. };
  35. #define NPIPEDIR 3
  36. static void
  37. pipeinit(void)
  38. {
  39. if(conf.pipeqsize == 0){
  40. if(conf.nmach > 1)
  41. conf.pipeqsize = 256*1024;
  42. else
  43. conf.pipeqsize = 32*1024;
  44. }
  45. }
  46. /*
  47. * create a pipe, no streams are created until an open
  48. */
  49. static Chan*
  50. pipeattach(char *spec)
  51. {
  52. Pipe *p;
  53. Chan *c;
  54. c = devattach('|', spec);
  55. p = malloc(sizeof(Pipe));
  56. if(p == 0)
  57. exhausted("memory");
  58. p->ref = 1;
  59. p->q[0] = qopen(conf.pipeqsize, 0, 0, 0);
  60. if(p->q[0] == 0){
  61. free(p);
  62. exhausted("memory");
  63. }
  64. p->q[1] = qopen(conf.pipeqsize, 0, 0, 0);
  65. if(p->q[1] == 0){
  66. free(p->q[0]);
  67. free(p);
  68. exhausted("memory");
  69. }
  70. lock(&pipealloc);
  71. p->path = ++pipealloc.path;
  72. unlock(&pipealloc);
  73. mkqid(&c->qid, NETQID(2*p->path, Qdir), 0, QTDIR);
  74. c->aux = p;
  75. c->dev = 0;
  76. return c;
  77. }
  78. static int
  79. pipegen(Chan *c, char*, Dirtab *tab, int ntab, int i, Dir *dp)
  80. {
  81. Qid q;
  82. int len;
  83. Pipe *p;
  84. if(i == DEVDOTDOT){
  85. devdir(c, c->qid, "#|", 0, eve, DMDIR|0555, dp);
  86. return 1;
  87. }
  88. i++; /* skip . */
  89. if(tab==0 || i>=ntab)
  90. return -1;
  91. tab += i;
  92. p = c->aux;
  93. switch((ulong)tab->qid.path){
  94. case Qdata0:
  95. len = qlen(p->q[0]);
  96. break;
  97. case Qdata1:
  98. len = qlen(p->q[1]);
  99. break;
  100. default:
  101. len = tab->length;
  102. break;
  103. }
  104. mkqid(&q, NETQID(NETID(c->qid.path), tab->qid.path), 0, QTFILE);
  105. devdir(c, q, tab->name, len, eve, tab->perm, dp);
  106. return 1;
  107. }
  108. static Walkqid*
  109. pipewalk(Chan *c, Chan *nc, char **name, int nname)
  110. {
  111. Walkqid *wq;
  112. Pipe *p;
  113. wq = devwalk(c, nc, name, nname, pipedir, NPIPEDIR, pipegen);
  114. if(wq != nil && wq->clone != nil && wq->clone != c){
  115. p = c->aux;
  116. qlock(p);
  117. p->ref++;
  118. if(c->flag & COPEN){
  119. print("channel open in pipewalk\n");
  120. switch(NETTYPE(c->qid.path)){
  121. case Qdata0:
  122. p->qref[0]++;
  123. break;
  124. case Qdata1:
  125. p->qref[1]++;
  126. break;
  127. }
  128. }
  129. qunlock(p);
  130. }
  131. return wq;
  132. }
  133. static int
  134. pipestat(Chan *c, uchar *db, int n)
  135. {
  136. Pipe *p;
  137. Dir dir;
  138. p = c->aux;
  139. switch(NETTYPE(c->qid.path)){
  140. case Qdir:
  141. devdir(c, c->qid, ".", 0, eve, DMDIR|0555, &dir);
  142. break;
  143. case Qdata0:
  144. devdir(c, c->qid, "data", qlen(p->q[0]), eve, 0600, &dir);
  145. break;
  146. case Qdata1:
  147. devdir(c, c->qid, "data1", qlen(p->q[1]), eve, 0600, &dir);
  148. break;
  149. default:
  150. panic("pipestat");
  151. }
  152. n = convD2M(&dir, db, n);
  153. if(n < BIT16SZ)
  154. error(Eshortstat);
  155. return n;
  156. }
  157. /*
  158. * if the stream doesn't exist, create it
  159. */
  160. static Chan*
  161. pipeopen(Chan *c, int omode)
  162. {
  163. Pipe *p;
  164. if(c->qid.type & QTDIR){
  165. if(omode != OREAD)
  166. error(Ebadarg);
  167. c->mode = omode;
  168. c->flag |= COPEN;
  169. c->offset = 0;
  170. return c;
  171. }
  172. p = c->aux;
  173. qlock(p);
  174. switch(NETTYPE(c->qid.path)){
  175. case Qdata0:
  176. p->qref[0]++;
  177. break;
  178. case Qdata1:
  179. p->qref[1]++;
  180. break;
  181. }
  182. qunlock(p);
  183. c->mode = openmode(omode);
  184. c->flag |= COPEN;
  185. c->offset = 0;
  186. c->iounit = qiomaxatomic;
  187. return c;
  188. }
  189. static void
  190. pipeclose(Chan *c)
  191. {
  192. Pipe *p;
  193. p = c->aux;
  194. qlock(p);
  195. if(c->flag & COPEN){
  196. /*
  197. * closing either side hangs up the stream
  198. */
  199. switch(NETTYPE(c->qid.path)){
  200. case Qdata0:
  201. p->qref[0]--;
  202. if(p->qref[0] == 0){
  203. qhangup(p->q[1], 0);
  204. qclose(p->q[0]);
  205. }
  206. break;
  207. case Qdata1:
  208. p->qref[1]--;
  209. if(p->qref[1] == 0){
  210. qhangup(p->q[0], 0);
  211. qclose(p->q[1]);
  212. }
  213. break;
  214. }
  215. }
  216. /*
  217. * if both sides are closed, they are reusable
  218. */
  219. if(p->qref[0] == 0 && p->qref[1] == 0){
  220. qreopen(p->q[0]);
  221. qreopen(p->q[1]);
  222. }
  223. /*
  224. * free the structure on last close
  225. */
  226. p->ref--;
  227. if(p->ref == 0){
  228. qunlock(p);
  229. free(p->q[0]);
  230. free(p->q[1]);
  231. free(p);
  232. } else
  233. qunlock(p);
  234. }
  235. static long
  236. piperead(Chan *c, void *va, long n, vlong)
  237. {
  238. Pipe *p;
  239. p = c->aux;
  240. switch(NETTYPE(c->qid.path)){
  241. case Qdir:
  242. return devdirread(c, va, n, pipedir, NPIPEDIR, pipegen);
  243. case Qdata0:
  244. return qread(p->q[0], va, n);
  245. case Qdata1:
  246. return qread(p->q[1], va, n);
  247. default:
  248. panic("piperead");
  249. }
  250. return -1; /* not reached */
  251. }
  252. static Block*
  253. pipebread(Chan *c, long n, ulong offset)
  254. {
  255. Pipe *p;
  256. p = c->aux;
  257. switch(NETTYPE(c->qid.path)){
  258. case Qdata0:
  259. return qbread(p->q[0], n);
  260. case Qdata1:
  261. return qbread(p->q[1], n);
  262. }
  263. return devbread(c, n, offset);
  264. }
  265. /*
  266. * a write to a closed pipe causes a note to be sent to
  267. * the process.
  268. */
  269. static long
  270. pipewrite(Chan *c, void *va, long n, vlong)
  271. {
  272. Pipe *p;
  273. if(!islo())
  274. print("pipewrite hi %#p\n", getcallerpc(&c));
  275. if(waserror()) {
  276. /* avoid notes when pipe is a mounted queue */
  277. if((c->flag & CMSG) == 0)
  278. postnote(up, 1, "sys: write on closed pipe", NUser);
  279. nexterror();
  280. }
  281. p = c->aux;
  282. switch(NETTYPE(c->qid.path)){
  283. case Qdata0:
  284. n = qwrite(p->q[1], va, n);
  285. break;
  286. case Qdata1:
  287. n = qwrite(p->q[0], va, n);
  288. break;
  289. default:
  290. panic("pipewrite");
  291. }
  292. poperror();
  293. return n;
  294. }
  295. static long
  296. pipebwrite(Chan *c, Block *bp, ulong)
  297. {
  298. long n;
  299. Pipe *p;
  300. if(waserror()) {
  301. /* avoid notes when pipe is a mounted queue */
  302. if((c->flag & CMSG) == 0)
  303. postnote(up, 1, "sys: write on closed pipe", NUser);
  304. nexterror();
  305. }
  306. p = c->aux;
  307. switch(NETTYPE(c->qid.path)){
  308. case Qdata0:
  309. n = qbwrite(p->q[1], bp);
  310. break;
  311. case Qdata1:
  312. n = qbwrite(p->q[0], bp);
  313. break;
  314. default:
  315. n = 0;
  316. panic("pipebwrite");
  317. }
  318. poperror();
  319. return n;
  320. }
  321. Dev pipedevtab = {
  322. '|',
  323. "pipe",
  324. devreset,
  325. pipeinit,
  326. devshutdown,
  327. pipeattach,
  328. pipewalk,
  329. pipestat,
  330. pipeopen,
  331. devcreate,
  332. pipeclose,
  333. piperead,
  334. pipebread,
  335. pipewrite,
  336. pipebwrite,
  337. devremove,
  338. devwstat,
  339. };