9proc.c 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745
  1. #include "stdinc.h"
  2. #include "9.h"
  3. #include "dat.h"
  4. #include "fns.h"
  5. enum {
  6. NConInit = 128,
  7. NMsgInit = 20,
  8. NMsgProcInit = 4,
  9. NMsizeInit = 8192+IOHDRSZ,
  10. };
  11. static struct {
  12. VtLock* alock; /* alloc */
  13. Msg* ahead;
  14. VtRendez* arendez;
  15. int maxmsg;
  16. int nmsg;
  17. int nmsgstarve;
  18. VtLock* rlock; /* read */
  19. Msg* rhead;
  20. Msg* rtail;
  21. VtRendez* rrendez;
  22. int maxproc;
  23. int nproc;
  24. int nprocstarve;
  25. u32int msize; /* immutable */
  26. } mbox;
  27. static struct {
  28. VtLock* alock; /* alloc */
  29. Con* ahead;
  30. VtRendez* arendez;
  31. VtLock* clock;
  32. Con* chead;
  33. Con* ctail;
  34. int maxcon;
  35. int ncon;
  36. int nconstarve;
  37. u32int msize;
  38. } cbox;
  39. static void
  40. conFree(Con* con)
  41. {
  42. assert(con->version == nil);
  43. assert(con->mhead == nil);
  44. assert(con->whead == nil);
  45. assert(con->nfid == 0);
  46. assert(con->state == ConMoribund);
  47. if(con->fd >= 0){
  48. close(con->fd);
  49. con->fd = -1;
  50. }
  51. con->state = ConDead;
  52. vtLock(cbox.alock);
  53. if(con->cprev != nil)
  54. con->cprev->cnext = con->cnext;
  55. else
  56. cbox.chead = con->cnext;
  57. if(con->cnext != nil)
  58. con->cnext->cprev = con->cprev;
  59. else
  60. cbox.ctail = con->cprev;
  61. con->cprev = con->cnext = nil;
  62. if(cbox.ncon > cbox.maxcon){
  63. if(con->name != nil)
  64. vtMemFree(con->name);
  65. vtLockFree(con->fidlock);
  66. vtMemFree(con->data);
  67. vtRendezAlloc(con->wrendez);
  68. vtLockFree(con->wlock);
  69. vtRendezAlloc(con->mrendez);
  70. vtLockFree(con->mlock);
  71. vtRendezAlloc(con->rendez);
  72. vtLockFree(con->lock);
  73. vtMemFree(con);
  74. cbox.ncon--;
  75. vtUnlock(cbox.alock);
  76. return;
  77. }
  78. con->anext = cbox.ahead;
  79. cbox.ahead = con;
  80. if(con->anext == nil)
  81. vtWakeup(cbox.arendez);
  82. vtUnlock(cbox.alock);
  83. }
  84. static void
  85. msgFree(Msg* m)
  86. {
  87. assert(m->rwnext == nil);
  88. assert(m->fnext == nil && m->fprev == nil);
  89. vtLock(mbox.alock);
  90. if(mbox.nmsg > mbox.maxmsg){
  91. vtMemFree(m->data);
  92. vtMemFree(m);
  93. mbox.nmsg--;
  94. vtUnlock(mbox.alock);
  95. return;
  96. }
  97. m->anext = mbox.ahead;
  98. mbox.ahead = m;
  99. if(m->anext == nil)
  100. vtWakeup(mbox.arendez);
  101. vtUnlock(mbox.alock);
  102. }
  103. static Msg*
  104. msgAlloc(Con* con)
  105. {
  106. Msg *m;
  107. vtLock(mbox.alock);
  108. while(mbox.ahead == nil){
  109. if(mbox.nmsg >= mbox.maxmsg){
  110. mbox.nmsgstarve++;
  111. vtSleep(mbox.arendez);
  112. continue;
  113. }
  114. m = vtMemAllocZ(sizeof(Msg));
  115. m->data = vtMemAlloc(mbox.msize);
  116. m->msize = mbox.msize;
  117. mbox.nmsg++;
  118. mbox.ahead = m;
  119. break;
  120. }
  121. m = mbox.ahead;
  122. mbox.ahead = m->anext;
  123. m->anext = nil;
  124. vtUnlock(mbox.alock);
  125. m->con = con;
  126. m->state = MsgR;
  127. return m;
  128. }
  129. static void
  130. msgMunlink(Msg* m)
  131. {
  132. Con *con;
  133. con = m->con;
  134. if(m->mprev != nil)
  135. m->mprev->mnext = m->mnext;
  136. else
  137. con->mhead = m->mnext;
  138. if(m->mnext != nil)
  139. m->mnext->mprev = m->mprev;
  140. else
  141. con->mtail = m->mprev;
  142. m->mprev = m->mnext = nil;
  143. }
  144. static void
  145. msgUnlinkUnlockAndFree(Msg* m)
  146. {
  147. /*
  148. * Unlink the message from the flush and message queues,
  149. * unlock the connection message lock and free the message.
  150. * Called with con->mlock locked.
  151. */
  152. if(m->fprev != nil)
  153. m->fprev->fnext = m->fnext;
  154. if(m->fnext != nil)
  155. m->fnext->fprev = m->fprev;
  156. m->fprev = m->fnext = nil;
  157. msgMunlink(m);
  158. vtUnlock(m->con->mlock);
  159. msgFree(m);
  160. }
  161. void
  162. msgFlush(Msg* m)
  163. {
  164. Msg *old;
  165. Con *con;
  166. con = m->con;
  167. /*
  168. * Look for the message to be flushed in the
  169. * queue of all messages still on this connection.
  170. */
  171. vtLock(con->mlock);
  172. for(old = con->mhead; old != nil; old = old->mnext)
  173. if(old->t.tag == m->t.oldtag)
  174. break;
  175. if(old == nil){
  176. if(Dflag)
  177. fprint(2, "msgFlush: cannot find %d\n", m->t.oldtag);
  178. vtUnlock(con->mlock);
  179. return;
  180. }
  181. /*
  182. * Found it.
  183. *
  184. * Easy case is no 9P processing done yet,
  185. * message is on the read queue.
  186. * Mark the message as flushed and let the read
  187. * process throw it away after after pulling
  188. * it off the read queue.
  189. */
  190. if(old->state == MsgR){
  191. old->state = MsgF;
  192. if(Dflag)
  193. fprint(2, "msgFlush: change %d from MsgR to MsgF\n", m->t.oldtag);
  194. vtUnlock(con->mlock);
  195. return;
  196. }
  197. /*
  198. * Flushing flushes.
  199. * Since they don't affect the server state, flushes
  200. * can be deleted when in Msg9 or MsgW state.
  201. */
  202. if(old->t.type == Tflush){
  203. /*
  204. * For Msg9 state, the old message may
  205. * or may not be on the write queue.
  206. * Mark the message as flushed and let
  207. * the write process throw it away after
  208. * after pulling it off the write queue.
  209. */
  210. if(old->state == Msg9){
  211. old->state = MsgF;
  212. if(Dflag)
  213. fprint(2, "msgFlush: change %d from Msg9 to MsgF\n", m->t.oldtag);
  214. vtUnlock(con->mlock);
  215. return;
  216. }
  217. assert(old->state == MsgW);
  218. /*
  219. * A flush in MsgW state implies it is waiting
  220. * for its corresponding old message to be written,
  221. * so it can be deleted right here, right now...
  222. * right here, right now... right here, right now...
  223. * right about now... the funk soul brother.
  224. */
  225. if(Dflag)
  226. fprint(2, "msgFlush: delete pending flush %F\n", &old->t);
  227. msgUnlinkUnlockAndFree(old);
  228. return;
  229. }
  230. /*
  231. * Must wait for the old message to be written.
  232. * Add m to old's flush queue.
  233. * Old is the head of its own flush queue.
  234. */
  235. m->fprev = old;
  236. m->fnext = old->fnext;
  237. if(m->fnext)
  238. m->fnext->fprev = m;
  239. old->fnext = m;
  240. if(Dflag)
  241. fprint(2, "msgFlush: add %d to %d queue\n", m->t.tag, old->t.tag);
  242. vtUnlock(con->mlock);
  243. }
  244. static void
  245. msgProc(void*)
  246. {
  247. Msg *m;
  248. char *e;
  249. Con *con;
  250. vtThreadSetName("msgProc");
  251. for(;;){
  252. /*
  253. * If surplus to requirements, exit.
  254. * If not, wait for and pull a message off
  255. * the read queue.
  256. */
  257. vtLock(mbox.rlock);
  258. if(mbox.nproc > mbox.maxproc){
  259. mbox.nproc--;
  260. vtUnlock(mbox.rlock);
  261. break;
  262. }
  263. while(mbox.rhead == nil)
  264. vtSleep(mbox.rrendez);
  265. m = mbox.rhead;
  266. mbox.rhead = m->rwnext;
  267. m->rwnext = nil;
  268. vtUnlock(mbox.rlock);
  269. con = m->con;
  270. e = nil;
  271. /*
  272. * If the message has been flushed before any
  273. * 9P processing has started, just throw it away.
  274. */
  275. vtLock(con->mlock);
  276. if(m->state == MsgF){
  277. msgUnlinkUnlockAndFree(m);
  278. continue;
  279. }
  280. m->state = Msg9;
  281. vtUnlock(con->mlock);
  282. /*
  283. * explain this
  284. */
  285. vtLock(con->lock);
  286. if(m->t.type == Tversion){
  287. con->version = m;
  288. con->state = ConDown;
  289. while(con->mhead != m)
  290. vtSleep(con->rendez);
  291. assert(con->state == ConDown);
  292. if(con->version == m){
  293. con->version = nil;
  294. con->state = ConInit;
  295. }
  296. else
  297. e = "Tversion aborted";
  298. }
  299. else if(con->state != ConUp)
  300. e = "connection not ready";
  301. vtUnlock(con->lock);
  302. /*
  303. * Dispatch if not error already.
  304. */
  305. m->r.tag = m->t.tag;
  306. if(e == nil && !(*rFcall[m->t.type])(m))
  307. e = vtGetError();
  308. if(e != nil){
  309. m->r.type = Rerror;
  310. m->r.ename = e;
  311. }
  312. else
  313. m->r.type = m->t.type+1;
  314. /*
  315. * Put the message (with reply) on the
  316. * write queue and wakeup the write process.
  317. */
  318. vtLock(con->wlock);
  319. if(con->whead == nil)
  320. con->whead = m;
  321. else
  322. con->wtail->rwnext = m;
  323. con->wtail = m;
  324. vtWakeup(con->wrendez);
  325. vtUnlock(con->wlock);
  326. }
  327. }
  328. static void
  329. msgRead(void* v)
  330. {
  331. Msg *m;
  332. Con *con;
  333. int eof, fd, n;
  334. vtThreadSetName("msgRead");
  335. con = v;
  336. fd = con->fd;
  337. eof = 0;
  338. while(!eof){
  339. m = msgAlloc(con);
  340. while((n = read9pmsg(fd, m->data, con->msize)) == 0)
  341. ;
  342. if(n < 0){
  343. m->t.type = Tversion;
  344. m->t.fid = NOFID;
  345. m->t.tag = NOTAG;
  346. m->t.msize = con->msize;
  347. m->t.version = "9PEoF";
  348. eof = 1;
  349. }
  350. else if(convM2S(m->data, n, &m->t) != n){
  351. if(Dflag)
  352. fprint(2, "msgRead: convM2S error: %s\n",
  353. con->name);
  354. msgFree(m);
  355. continue;
  356. }
  357. if(Dflag)
  358. fprint(2, "msgRead: t %F\n", &m->t);
  359. vtLock(con->mlock);
  360. if(con->mtail != nil){
  361. m->mprev = con->mtail;
  362. con->mtail->mnext = m;
  363. }
  364. else{
  365. con->mhead = m;
  366. m->mprev = nil;
  367. }
  368. con->mtail = m;
  369. vtUnlock(con->mlock);
  370. vtLock(mbox.rlock);
  371. if(mbox.rhead == nil){
  372. mbox.rhead = m;
  373. if(!vtWakeup(mbox.rrendez)){
  374. if(mbox.nproc < mbox.maxproc){
  375. if(vtThread(msgProc, nil) > 0)
  376. mbox.nproc++;
  377. }
  378. else
  379. mbox.nprocstarve++;
  380. }
  381. /*
  382. * don't need this surely?
  383. vtWakeup(mbox.rrendez);
  384. */
  385. }
  386. else
  387. mbox.rtail->rwnext = m;
  388. mbox.rtail = m;
  389. vtUnlock(mbox.rlock);
  390. }
  391. }
  392. static int
  393. _msgWrite(Msg* m)
  394. {
  395. Con *con;
  396. int eof, n;
  397. con = m->con;
  398. /*
  399. * An Rflush with a .fprev implies it is on a flush queue waiting for
  400. * its corresponding 'oldtag' message to go out first, so punt
  401. * until the 'oldtag' message goes out (see below).
  402. */
  403. if(m->r.type == Rflush && m->fprev != nil){
  404. fprint(2, "msgWrite: delay r %F\n", &m->r);
  405. return 0;
  406. }
  407. msgMunlink(m);
  408. vtUnlock(con->mlock);
  409. /*
  410. * TODO: optimise this copy away somehow for
  411. * read, stat, etc.
  412. */
  413. assert(n = convS2M(&m->r, con->data, con->msize));
  414. if(write(con->fd, con->data, n) != n)
  415. eof = 1;
  416. else
  417. eof = 0;
  418. if(Dflag)
  419. fprint(2, "msgWrite: r %F\n", &m->r);
  420. /*
  421. * Just wrote a reply. If it has any flushes waiting
  422. * for it to have gone out, recurse down the list writing
  423. * them out too.
  424. */
  425. vtLock(con->mlock);
  426. if(m->fnext != nil){
  427. m->fnext->fprev = nil;
  428. eof += _msgWrite(m->fnext);
  429. m->fnext = nil;
  430. }
  431. msgFree(m);
  432. return eof;
  433. }
  434. static void
  435. msgWrite(void* v)
  436. {
  437. Msg *m;
  438. int eof;
  439. Con *con;
  440. vtThreadSetName("msgWrite");
  441. con = v;
  442. if(vtThread(msgRead, con) < 0){
  443. conFree(con);
  444. return;
  445. }
  446. for(;;){
  447. /*
  448. * Wait for and pull a message off the write queue.
  449. */
  450. vtLock(con->wlock);
  451. while(con->whead == nil)
  452. vtSleep(con->wrendez);
  453. m = con->whead;
  454. con->whead = m->rwnext;
  455. m->rwnext = nil;
  456. vtUnlock(con->wlock);
  457. /*
  458. * Throw the message away if it's a flushed flush,
  459. * otherwise change its state and try to write it out.
  460. */
  461. vtLock(con->mlock);
  462. if(m->state == MsgF){
  463. assert(m->t.type == Tflush);
  464. msgUnlinkUnlockAndFree(m);
  465. continue;
  466. }
  467. m->state = MsgW;
  468. eof = _msgWrite(m);
  469. vtUnlock(con->mlock);
  470. vtLock(con->lock);
  471. if(eof && con->fd >= 0){
  472. close(con->fd);
  473. con->fd = -1;
  474. }
  475. if(con->state == ConDown)
  476. vtWakeup(con->rendez);
  477. if(con->state == ConMoribund && con->mhead == nil){
  478. vtUnlock(con->lock);
  479. conFree(con);
  480. break;
  481. }
  482. vtUnlock(con->lock);
  483. }
  484. }
  485. Con*
  486. conAlloc(int fd, char* name)
  487. {
  488. Con *con;
  489. vtLock(cbox.alock);
  490. while(cbox.ahead == nil){
  491. if(cbox.ncon >= cbox.maxcon){
  492. cbox.nconstarve++;
  493. vtSleep(cbox.arendez);
  494. continue;
  495. }
  496. con = vtMemAllocZ(sizeof(Con));
  497. con->lock = vtLockAlloc();
  498. con->rendez = vtRendezAlloc(con->lock);
  499. con->data = vtMemAlloc(cbox.msize);
  500. con->msize = cbox.msize;
  501. con->alock = vtLockAlloc();
  502. con->mlock = vtLockAlloc();
  503. con->mrendez = vtRendezAlloc(con->mlock);
  504. con->wlock = vtLockAlloc();
  505. con->wrendez = vtRendezAlloc(con->wlock);
  506. con->fidlock = vtLockAlloc();
  507. cbox.ncon++;
  508. cbox.ahead = con;
  509. break;
  510. }
  511. con = cbox.ahead;
  512. cbox.ahead = con->anext;
  513. con->anext = nil;
  514. if(cbox.ctail != nil){
  515. con->cprev = cbox.ctail;
  516. cbox.ctail->cnext = con;
  517. }
  518. else{
  519. cbox.chead = con;
  520. con->cprev = nil;
  521. }
  522. cbox.ctail = con;
  523. assert(con->mhead == nil);
  524. assert(con->whead == nil);
  525. assert(con->fhead == nil);
  526. assert(con->nfid == 0);
  527. con->state = ConNew;
  528. con->fd = fd;
  529. if(con->name != nil){
  530. vtMemFree(con->name);
  531. con->name = nil;
  532. }
  533. if(name != nil)
  534. con->name = vtStrDup(name);
  535. else
  536. con->name = vtStrDup("unknown");
  537. con->aok = 0;
  538. vtUnlock(cbox.alock);
  539. if(vtThread(msgWrite, con) < 0){
  540. conFree(con);
  541. return nil;
  542. }
  543. return con;
  544. }
  545. static int
  546. cmdMsg(int argc, char* argv[])
  547. {
  548. char *p;
  549. char *usage = "usage: msg [-m nmsg] [-p nproc]";
  550. int maxmsg, nmsg, nmsgstarve, maxproc, nproc, nprocstarve;
  551. maxmsg = maxproc = 0;
  552. ARGBEGIN{
  553. default:
  554. return cliError(usage);
  555. case 'm':
  556. p = ARGF();
  557. if(p == nil)
  558. return cliError(usage);
  559. maxmsg = strtol(argv[0], &p, 0);
  560. if(maxmsg <= 0 || p == argv[0] || *p != '\0')
  561. return cliError(usage);
  562. break;
  563. case 'p':
  564. p = ARGF();
  565. if(p == nil)
  566. return cliError(usage);
  567. maxproc = strtol(argv[0], &p, 0);
  568. if(maxproc <= 0 || p == argv[0] || *p != '\0')
  569. return cliError(usage);
  570. break;
  571. }ARGEND
  572. if(argc)
  573. return cliError(usage);
  574. vtLock(mbox.alock);
  575. if(maxmsg)
  576. mbox.maxmsg = maxmsg;
  577. maxmsg = mbox.maxmsg;
  578. nmsg = mbox.nmsg;
  579. nmsgstarve = mbox.nmsgstarve;
  580. vtUnlock(mbox.alock);
  581. vtLock(mbox.rlock);
  582. if(maxproc)
  583. mbox.maxproc = maxproc;
  584. maxproc = mbox.maxproc;
  585. nproc = mbox.nproc;
  586. nprocstarve = mbox.nprocstarve;
  587. vtUnlock(mbox.rlock);
  588. consPrint("\tmsg -m %d -p %d\n", maxmsg, maxproc);
  589. consPrint("\tnmsg %d nmsgstarve %d nproc %d nprocstarve %d\n",
  590. nmsg, nmsgstarve, nproc, nprocstarve);
  591. return 1;
  592. }
  593. void
  594. msgInit(void)
  595. {
  596. mbox.alock = vtLockAlloc();
  597. mbox.arendez = vtRendezAlloc(mbox.alock);
  598. mbox.rlock = vtLockAlloc();
  599. mbox.rrendez = vtRendezAlloc(mbox.rlock);
  600. mbox.maxmsg = NMsgInit;
  601. mbox.maxproc = NMsgProcInit;
  602. mbox.msize = NMsizeInit;
  603. cliAddCmd("msg", cmdMsg);
  604. }
  605. static int
  606. cmdCon(int argc, char* argv[])
  607. {
  608. char *p;
  609. Con *con;
  610. char *usage = "usage: con [-m ncon]";
  611. int maxcon, ncon, nconstarve;
  612. maxcon = 0;
  613. ARGBEGIN{
  614. default:
  615. return cliError(usage);
  616. case 'm':
  617. p = ARGF();
  618. if(p == nil)
  619. return cliError(usage);
  620. maxcon = strtol(argv[0], &p, 0);
  621. if(maxcon <= 0 || p == argv[0] || *p != '\0')
  622. return cliError(usage);
  623. break;
  624. }ARGEND
  625. if(argc)
  626. return cliError(usage);
  627. vtLock(cbox.clock);
  628. if(maxcon)
  629. cbox.maxcon = maxcon;
  630. maxcon = cbox.maxcon;
  631. ncon = cbox.ncon;
  632. nconstarve = cbox.nconstarve;
  633. vtUnlock(cbox.clock);
  634. consPrint("\tcon -m %d\n", maxcon);
  635. consPrint("\tncon %d nconstarve %d\n", ncon, nconstarve);
  636. vtRLock(cbox.clock);
  637. for(con = cbox.chead; con != nil; con = con->cnext){
  638. consPrint("\t%s\n", con->name);
  639. }
  640. vtRUnlock(cbox.clock);
  641. return 1;
  642. }
  643. void
  644. conInit(void)
  645. {
  646. cbox.alock = vtLockAlloc();
  647. cbox.arendez = vtRendezAlloc(cbox.alock);
  648. cbox.clock = vtLockAlloc();
  649. cbox.maxcon = NConInit;
  650. cbox.msize = NMsizeInit;
  651. cliAddCmd("con", cmdCon);
  652. }