/* devfdmux.c */
  1. /*
  2. * This file is part of the UCB release of Plan 9. It is subject to the license
  3. * terms in the LICENSE file found in the top-level directory of this
  4. * distribution and at http://akaros.cs.berkeley.edu/files/Plan9License. No
  5. * part of the UCB release of Plan 9, including this file, may be copied,
  6. * modified, propagated, or distributed except according to the terms contained
  7. * in the LICENSE file.
  8. */
  9. /* fdmux is a way to mediate access to a read/write fd to a set of processes.
  10. * The owner of the mux reads and writes fd[0]. The children read and write fd[1].
  11. * Access to fd[1] is controlled by which process group a process is in.
  12. * We are not fooling with SIGHUP yet. Need to think on that.
  13. */
  14. #include "u.h"
  15. #include "../port/lib.h"
  16. #include "mem.h"
  17. #include "dat.h"
  18. #include "fns.h"
  19. #include "../port/error.h"
/*
 * Per-mux state, shared by every Chan attached to the same fdmux
 * (each such Chan points at it through c->aux).
 */
typedef struct Fdmux Fdmux;
struct Fdmux
{
	QLock QLock;	/* held while ref and qref are updated (walk, open, close) */
	Fdmux *next;	/* not referenced by any code in this file */
	int ref;	/* number of Chans pointing at this mux; the structure is freed when it reaches 0 */
	uint32_t path;	/* unique id taken from fdmuxalloc.path at attach time */
	Queue *q[2];	/* q[0] is read through "m" and written through "cons"; q[1] is the reverse */
	int qref[2];	/* open counts for "m" (index 0) and "cons" (index 1) */
	int owner; // pid of owner, e.g. regress/fdmux.
	int pgrpid; // id of processes allowed to read/write fd[1]. If they do not match, they sleep
	int slpid; // session leader. If > 0, we send them a note if anyone blocks on read/write.
	int active; // The active pid. Useful for interrupt.
	int dead;	/* non-zero once a 'k' ctl command was written; data reads and writes then return -1 */
	int debug;	/* non-zero once a 'd' ctl command was written; enables the print() tracing */
	Rendez r;	/* processes outside pgrpid tsleep() here before using "cons" */
};
/*
 * Global allocator for mux ids: fdmuxattach takes Lock and
 * pre-increments path to obtain the id of each new mux.
 */
struct
{
	Lock Lock;
	uint32_t path;	/* last id handed out */
} fdmuxalloc;
/*
 * File types inside one fdmux directory.  The value is stored in the
 * low bits of the qid path (see FDMUXTYPE) and matches the order of
 * the entries in fdmuxdir.
 */
enum
{
	Qdir,	/* the directory itself */
	Qdata0,	/* "m" */
	Qdata1,	/* "cons" */
	Qctl,	/* "consctl" */
};
/* this is intended to be mounted in /dev with MBEFORE. That way programs that open
 * /dev/cons* will get our version.
 *
 * Directory table handed to devwalk/devdirread: one entry per file,
 * in the same order as the Q* enum above.
 */
Dirtab fdmuxdir[] =
{
	{".", {Qdir,0,QTDIR}, 0, DMDIR|0500},
	{"m", {Qdata0}, 0, 0600},
	{"cons", {Qdata1}, 0, 0600},
	{"consctl", {Qctl}, 0, 0600},
};
  59. #define NFDMUXDIR 4
  60. #define FDMUXTYPE(x) (((unsigned)x)&0x1f)
  61. #define FDMUXID(x) ((((unsigned)x))>>5)
  62. #define FDMUXQID(i, t) ((((unsigned)i)<<5)|(t))
/* Size limit passed to qopen() for each of the two queues of a mux. */
enum
{
	/* Plan 9 default for nmach > 1 */
	Fdmuxqsize = 256*1024
};
  68. static int testready(void *v)
  69. {
  70. Chan *c = v;
  71. Proc *up = externup();
  72. Fdmux *p;
  73. p = c->aux;
  74. if (up->pgrp->pgrpid == p->pgrpid)
  75. return 1;
  76. return 0;
  77. }
/*
 * Device init hook (installed as .init in fdmuxdevtab).
 * Intentionally empty: nothing is set up here.
 */
static void
fdmuxinit(void)
{
}
  82. /*
  83. * create a fdmux, no streams are created until an open
  84. */
  85. static Chan*
  86. fdmuxattach(char *spec)
  87. {
  88. Fdmux *p;
  89. Chan *c;
  90. Proc *up = externup();
  91. c = devattach('<', spec);
  92. p = malloc(sizeof(Fdmux));
  93. if(p == 0)
  94. exhausted("memory");
  95. p->ref = 1;
  96. p->pgrpid = up->pgrp->pgrpid;
  97. p->q[0] = qopen(Fdmuxqsize, 0, 0, 0);
  98. if(p->q[0] == 0){
  99. free(p);
  100. exhausted("memory");
  101. }
  102. p->q[1] = qopen(Fdmuxqsize, 0, 0, 0);
  103. if(p->q[1] == 0){
  104. free(p->q[0]);
  105. free(p);
  106. exhausted("memory");
  107. }
  108. lock(&fdmuxalloc.Lock);
  109. p->path = ++fdmuxalloc.path;
  110. unlock(&fdmuxalloc.Lock);
  111. mkqid(&c->qid, FDMUXQID(2*p->path, Qdir), 0, QTDIR);
  112. c->aux = p;
  113. c->devno = 0;
  114. return c;
  115. }
  116. static int
  117. fdmuxgen(Chan *c, char* d, Dirtab *tab, int ntab, int i, Dir *dp)
  118. {
  119. Qid q;
  120. int len;
  121. Fdmux *p;
  122. if(i == DEVDOTDOT){
  123. devdir(c, c->qid, "#|", 0, eve, DMDIR|0555, dp);
  124. return 1;
  125. }
  126. i++; /* skip . */
  127. if(tab==0 || i>=ntab)
  128. return -1;
  129. tab += i;
  130. p = c->aux;
  131. switch((uint32_t)tab->qid.path){
  132. case Qdata0:
  133. len = qlen(p->q[0]);
  134. break;
  135. case Qdata1:
  136. len = qlen(p->q[1]);
  137. break;
  138. default:
  139. len = tab->length;
  140. break;
  141. }
  142. mkqid(&q, FDMUXQID(FDMUXID(c->qid.path), tab->qid.path), 0, QTFILE);
  143. devdir(c, q, tab->name, len, eve, tab->perm, dp);
  144. return 1;
  145. }
  146. static Walkqid*
  147. fdmuxwalk(Chan *c, Chan *nc, char **name, int nname)
  148. {
  149. Walkqid *wq;
  150. Fdmux *p;
  151. wq = devwalk(c, nc, name, nname, fdmuxdir, NFDMUXDIR, fdmuxgen);
  152. if(wq != nil && wq->clone != nil && wq->clone != c){
  153. p = c->aux;
  154. qlock(&p->QLock);
  155. p->ref++;
  156. if(c->flag & COPEN){
  157. print("channel open in fdmuxwalk\n");
  158. switch(FDMUXTYPE(c->qid.path)){
  159. case Qdata0:
  160. p->qref[0]++;
  161. break;
  162. case Qdata1:
  163. p->qref[1]++;
  164. break;
  165. }
  166. }
  167. qunlock(&p->QLock);
  168. }
  169. return wq;
  170. }
  171. static int32_t
  172. fdmuxstat(Chan *c, uint8_t *db, int32_t n)
  173. {
  174. Fdmux *p;
  175. Dir dir;
  176. p = c->aux;
  177. switch(FDMUXTYPE(c->qid.path)){
  178. case Qdir:
  179. devdir(c, c->qid, ".", 0, eve, DMDIR|0555, &dir);
  180. break;
  181. case Qdata0:
  182. devdir(c, c->qid, "data", qlen(p->q[0]), eve, 0600, &dir);
  183. break;
  184. case Qdata1:
  185. devdir(c, c->qid, "data1", qlen(p->q[1]), eve, 0600, &dir);
  186. break;
  187. default:
  188. panic("fdmuxstat");
  189. }
  190. n = convD2M(&dir, db, n);
  191. if(n < BIT16SZ)
  192. error(Eshortstat);
  193. return n;
  194. }
  195. /*
  196. * if the stream doesn't exist, create it
  197. */
  198. static Chan*
  199. fdmuxopen(Chan *c, int omode)
  200. {
  201. Fdmux *p;
  202. if(c->qid.type & QTDIR){
  203. if(omode != OREAD)
  204. error(Ebadarg);
  205. c->mode = omode;
  206. c->flag |= COPEN;
  207. c->offset = 0;
  208. return c;
  209. }
  210. p = c->aux;
  211. qlock(&p->QLock);
  212. switch(FDMUXTYPE(c->qid.path)){
  213. case Qdata0:
  214. p->qref[0]++;
  215. break;
  216. case Qdata1:
  217. p->qref[1]++;
  218. break;
  219. }
  220. qunlock(&p->QLock);
  221. c->mode = openmode(omode);
  222. c->flag |= COPEN;
  223. c->offset = 0;
  224. c->iounit = qiomaxatomic;
  225. return c;
  226. }
  227. static void
  228. fdmuxclose(Chan *c)
  229. {
  230. Fdmux *p;
  231. p = c->aux;
  232. /* Any close by the session leader kills the fdmux.
  233. * Problem: how do we detect this? Because session leaders
  234. * can do thinks like dup and close. Most annoying.
  235. if (up->pid == p->slpid)
  236. p->dead = 1;
  237. */
  238. qlock(&p->QLock);
  239. if(c->flag & COPEN){
  240. /*
  241. * closing either side hangs up the stream
  242. */
  243. switch(FDMUXTYPE(c->qid.path)){
  244. case Qdata0:
  245. p->qref[0]--;
  246. if(p->qref[0] == 0){
  247. qhangup(p->q[1], 0);
  248. qclose(p->q[0]);
  249. }
  250. break;
  251. case Qdata1:
  252. // TODO: what if we are the last member of this Pgrp? oopsie.
  253. // We *could* check the ref I guess.
  254. p->qref[1]--;
  255. if(p->qref[1] == 0){
  256. qhangup(p->q[0], 0);
  257. qclose(p->q[1]);
  258. }
  259. break;
  260. }
  261. }
  262. /*
  263. * free the structure on last close
  264. */
  265. p->ref--;
  266. if(p->ref == 0){
  267. qunlock(&p->QLock);
  268. free(p->q[0]);
  269. free(p->q[1]);
  270. free(p);
  271. } else
  272. qunlock(&p->QLock);
  273. }
  274. static int32_t
  275. fdmuxread(Chan *c, void *va, int32_t n, int64_t m)
  276. {
  277. Proc *up = externup();
  278. Fdmux *p;
  279. char buf[32];
  280. p = c->aux;
  281. switch(FDMUXTYPE(c->qid.path)){
  282. case Qdir:
  283. return devdirread(c, va, n, fdmuxdir, NFDMUXDIR, fdmuxgen);
  284. case Qctl:
  285. snprint(buf, sizeof(buf), "{pgripid: %d, pid: %d}", p->pgrpid, p->slpid);
  286. n = readstr(m, va, n, buf);
  287. return n;
  288. case Qdata0:
  289. if (p->debug)
  290. print("pid %d reads m\n", up->pid);
  291. if (p->dead)
  292. return -1;
  293. return qread(p->q[0], va, n);
  294. case Qdata1:
  295. /* TODO: proper locking */
  296. if (p->dead)
  297. return -1;
  298. if (up->pgrp->pgrpid != p->pgrpid)
  299. tsleep(&p->r, testready, c, 1000);
  300. p->active = up->pid;
  301. if (p->debug)
  302. print("pid %d reads s\n", up->pid);
  303. return qread(p->q[1], va, n);
  304. default:
  305. panic("fdmuxread");
  306. }
  307. return -1; /* not reached */
  308. }
  309. static Block*
  310. fdmuxbread(Chan *c, int32_t n, int64_t offset)
  311. {
  312. Proc *up = externup();
  313. Fdmux *p;
  314. int l;
  315. Block *b;
  316. p = c->aux;
  317. switch(FDMUXTYPE(c->qid.path)){
  318. case Qctl:
  319. b = iallocb(8);
  320. l = snprint((char *)b->wp, 8, "%d", p->pgrpid);
  321. b->wp += l;
  322. return b;
  323. case Qdata0:
  324. if (p->dead)
  325. return nil;
  326. return qbread(p->q[0], n);
  327. case Qdata1:
  328. if (p->dead)
  329. return nil;
  330. /* TODO: proper locking */
  331. if (up->pgrp->pgrpid != p->pgrpid)
  332. tsleep(&p->r, testready, c, 1000);
  333. return qbread(p->q[1], n);
  334. }
  335. return devbread(c, n, offset);
  336. }
  337. /*
  338. * a write to a closed fdmux causes a note to be sent to
  339. * the process.
  340. */
  341. static int32_t
  342. fdmuxwrite(Chan *c, void *va, int32_t n, int64_t mm)
  343. {
  344. Proc *up = externup();
  345. Fdmux *p;
  346. char buf[32];
  347. char notename[32];
  348. int id;
  349. int l;
  350. char *signal = "interrupt";
  351. int siglen = 9;
  352. if(0)if(!islo())
  353. print("fdmuxwrite hi %#p\n", getcallerpc()); // devmnt?
  354. if(waserror()) {
  355. /* avoid notes when fdmux is a mounted queue */
  356. if((c->flag & CMSG) == 0)
  357. postnote(up, 1, "sys: write on closed fdmux", NUser);
  358. nexterror();
  359. }
  360. p = c->aux;
  361. switch(FDMUXTYPE(c->qid.path)){
  362. /* single letter command a number. */
  363. case Qctl:
  364. if(n >= sizeof(buf))
  365. n = sizeof(buf)-1;
  366. strncpy(buf, va, n);
  367. buf[n] = 0;
  368. id = strtoul(&buf[1], 0, 0);
  369. switch(buf[0]) {
  370. case 'd':
  371. break;
  372. case 'k':
  373. break;
  374. case 'p':
  375. case 'l':
  376. break;
  377. case 'n':
  378. if (id == 0)
  379. id = p->active;
  380. break;
  381. case 's':
  382. signal = "stop";
  383. siglen = 4;
  384. if (id == 0)
  385. id = p->slpid;
  386. break;
  387. default:
  388. print("usage: k (kill) or d (debug) or [lnps][optional number]");
  389. break;
  390. }
  391. if (p->debug)
  392. print("pid %d writes cmd :%s:\n", up->pid, buf);
  393. switch(buf[0]) {
  394. case 'd':
  395. p->debug++;
  396. break;
  397. case 'k':
  398. p->dead++;
  399. break;
  400. case 'p':
  401. // NO checking. How would we know?
  402. if (p->debug)
  403. print("Set pgrpid to %d\n", id);
  404. p->pgrpid = id;
  405. break;
  406. case 'l':
  407. if (p->debug)
  408. print("Set sleader to %d\n", id);
  409. p->slpid = id;
  410. break;
  411. case 'n':
  412. l = snprint(notename, sizeof(notename), "#p/%d/note", id);
  413. c = namec(notename, Aopen, ORDWR, 0);
  414. if (p->debug)
  415. print("send note %s to %d c %p\n", notename, id, c);
  416. if (! c)
  417. error(notename);
  418. if (waserror()) {
  419. cclose(c);
  420. nexterror();
  421. }
  422. n = c->dev->write(c, signal, siglen, 0);
  423. poperror();
  424. if (p->debug)
  425. print("Wrote %s len %d res %d\n", notename, l, n);
  426. cclose(c);
  427. break;
  428. default:
  429. print("ignoring unsupported command :%s:\n", buf);
  430. break;
  431. }
  432. break;
  433. case Qdata0:
  434. if (p->debug)
  435. print("pid %d writes m\n", up->pid);
  436. if (p->dead) {
  437. n = -1;
  438. break;
  439. }
  440. n = qwrite(p->q[1], va, n);
  441. break;
  442. case Qdata1:
  443. /* TODO: proper locking */
  444. if (p->dead) {
  445. n = -1;
  446. break;
  447. }
  448. if (up->pgrp->pgrpid != p->pgrpid)
  449. tsleep(&p->r, testready, c, 1000);
  450. p->active = up->pid;
  451. if (p->debug)
  452. print("pid %d writes s\n", up->pid);
  453. n = qwrite(p->q[0], va, n);
  454. break;
  455. default:
  456. panic("fdmuxwrite");
  457. }
  458. poperror();
  459. return n;
  460. }
  461. static int32_t
  462. fdmuxbwrite(Chan *c, Block *bp, int64_t mm)
  463. {
  464. Proc *up = externup();
  465. int32_t n;
  466. Fdmux *p;
  467. if(waserror()) {
  468. /* avoid notes when fdmux is a mounted queue */
  469. if((c->flag & CMSG) == 0)
  470. postnote(up, 1, "sys: write on closed fdmux", NUser);
  471. nexterror();
  472. }
  473. p = c->aux;
  474. switch(FDMUXTYPE(c->qid.path)){
  475. case Qdata0:
  476. if (p->dead) {
  477. n = -1;
  478. break;
  479. }
  480. n = qbwrite(p->q[1], bp);
  481. break;
  482. case Qdata1:
  483. if (p->dead) {
  484. n = -1;
  485. break;
  486. }
  487. /* TODO: proper locking */
  488. if (up->pgrp->pgrpid != p->pgrpid)
  489. tsleep(&p->r, testready, c, 1000);
  490. n = qbwrite(p->q[0], bp);
  491. break;
  492. default:
  493. n = 0;
  494. panic("fdmuxbwrite");
  495. }
  496. poperror();
  497. return n;
  498. }
/*
 * Device table entry for the fdmux device, character '<'.
 * Operations without an fdmux-specific implementation use the generic
 * dev* routines.
 */
Dev fdmuxdevtab = {
	.dc = '<',
	.name = "fdmux",

	.reset = devreset,
	.init = fdmuxinit,
	.shutdown = devshutdown,
	.attach = fdmuxattach,
	.walk = fdmuxwalk,
	.stat = fdmuxstat,
	.open = fdmuxopen,
	.create = devcreate,
	.close = fdmuxclose,
	.read = fdmuxread,
	.bread = fdmuxbread,
	.write = fdmuxwrite,
	.bwrite = fdmuxbwrite,
	.remove = devremove,
	.wstat = devwstat,
};