9proc.c 7.2 KB


  1. #include "stdinc.h"
  2. #include "9.h"
  3. #include "dat.h"
  4. #include "fns.h"
  5. enum {
  6. NConInit = 128,
  7. NMsgInit = 20,
  8. NMsgProcInit = 4,
  9. NMsizeInit = 8192+IOHDRSZ,
  10. };
  11. static struct {
  12. VtLock* lock;
  13. Con** con; /* arena */
  14. int ncon; /* how many in arena */
  15. int hi; /* high watermark */
  16. int cur; /* hint for allocation */
  17. u32int msize;
  18. } cbox;
  19. static struct {
  20. VtLock* lock;
  21. Msg* free;
  22. VtRendez* alloc;
  23. Msg* head;
  24. Msg* tail;
  25. VtRendez* work;
  26. int maxmsg;
  27. int nmsg;
  28. int maxproc;
  29. int nproc;
  30. u32int msize; /* immutable */
  31. } mbox;
  32. static void
  33. msgFree(Msg* m)
  34. {
  35. vtLock(mbox.lock);
  36. if(mbox.nmsg > mbox.maxmsg){
  37. vtMemFree(m->data);
  38. vtMemFree(m);
  39. mbox.nmsg--;
  40. vtUnlock(mbox.lock);
  41. return;
  42. }
  43. m->next = mbox.free;
  44. mbox.free = m;
  45. if(m->next == nil)
  46. vtWakeup(mbox.alloc);
  47. vtUnlock(mbox.lock);
  48. }
  49. static void
  50. conFree(Con* con)
  51. {
  52. if(con->fd >= 0){
  53. close(con->fd);
  54. con->fd = -1;
  55. }
  56. assert(con->version == nil);
  57. assert(con->mhead == nil);
  58. assert(con->nmsg == 0);
  59. assert(con->nfid == 0);
  60. assert(con->state == CsMoribund);
  61. con->state = CsDead;
  62. }
  63. static void
  64. msgProc(void*)
  65. {
  66. int n;
  67. Msg *m;
  68. char *e;
  69. Con *con;
  70. vtThreadSetName("msg");
  71. vtLock(mbox.lock);
  72. while(mbox.nproc <= mbox.maxproc){
  73. while(mbox.head == nil)
  74. vtSleep(mbox.work);
  75. m = mbox.head;
  76. mbox.head = m->next;
  77. m->next = nil;
  78. e = nil;
  79. con = m->con;
  80. vtLock(con->lock);
  81. assert(con->state != CsDead);
  82. con->nmsg++;
  83. if(m->t.type == Tversion){
  84. con->version = m;
  85. con->state = CsDown;
  86. while(con->mhead != nil)
  87. vtSleep(con->active);
  88. assert(con->state == CsDown);
  89. if(con->version == m){
  90. con->version = nil;
  91. con->state = CsInit;
  92. }
  93. else
  94. e = "Tversion aborted";
  95. }
  96. else if(con->state != CsUp)
  97. e = "connection not ready";
  98. /*
  99. * Add Msg to end of active list.
  100. */
  101. if(con->mtail != nil){
  102. m->prev = con->mtail;
  103. con->mtail->next = m;
  104. }
  105. else{
  106. con->mhead = m;
  107. m->prev = nil;
  108. }
  109. con->mtail = m;
  110. m->next = nil;
  111. vtUnlock(con->lock);
  112. vtUnlock(mbox.lock);
  113. /*
  114. * Dispatch if not error already.
  115. */
  116. m->r.tag = m->t.tag;
  117. if(e == nil && !(*rFcall[m->t.type])(m))
  118. e = vtGetError();
  119. if(e != nil){
  120. m->r.type = Rerror;
  121. m->r.ename = e;
  122. }
  123. else
  124. m->r.type = m->t.type+1;
  125. vtLock(con->lock);
  126. /*
  127. * Remove Msg from active list.
  128. */
  129. if(m->prev != nil)
  130. m->prev->next = m->next;
  131. else
  132. con->mhead = m->next;
  133. if(m->next != nil)
  134. m->next->prev = m->prev;
  135. else
  136. con->mtail = m->prev;
  137. m->prev = m->next = nil;
  138. if(con->mhead == nil)
  139. vtWakeup(con->active);
  140. if(Dflag)
  141. fprint(2, "msgProc: r %F\n", &m->r);
  142. if((con->state == CsNew || con->state == CsUp) && !m->flush){
  143. /*
  144. * TODO: optimise this copy away somehow for
  145. * read, stat, etc.
  146. */
  147. assert(n = convS2M(&m->r, con->data, con->msize));
  148. if(write(con->fd, con->data, n) != n){
  149. if(con->fd >= 0){
  150. close(con->fd);
  151. con->fd = -1;
  152. }
  153. }
  154. }
  155. con->nmsg--;
  156. if(con->state == CsMoribund && con->nmsg == 0){
  157. vtUnlock(con->lock);
  158. conFree(con);
  159. }
  160. else
  161. vtUnlock(con->lock);
  162. vtLock(mbox.lock);
  163. m->next = mbox.free;
  164. mbox.free = m;
  165. if(m->next == nil)
  166. vtWakeup(mbox.alloc);
  167. }
  168. mbox.nproc--;
  169. vtUnlock(mbox.lock);
  170. }
  171. static void
  172. conProc(void* v)
  173. {
  174. Msg *m;
  175. Con *con;
  176. int eof, fd, n;
  177. vtThreadSetName("con");
  178. con = v;
  179. if(Dflag)
  180. fprint(2, "conProc: con->fd %d\n", con->fd);
  181. fd = con->fd;
  182. eof = 0;
  183. vtLock(mbox.lock);
  184. while(!eof){
  185. while(mbox.free == nil){
  186. if(mbox.nmsg >= mbox.maxmsg){
  187. vtSleep(mbox.alloc);
  188. continue;
  189. }
  190. m = vtMemAllocZ(sizeof(Msg));
  191. m->data = vtMemAlloc(mbox.msize);
  192. m->msize = mbox.msize;
  193. mbox.nmsg++;
  194. mbox.free = m;
  195. break;
  196. }
  197. m = mbox.free;
  198. mbox.free = m->next;
  199. m->next = nil;
  200. vtUnlock(mbox.lock);
  201. m->con = con;
  202. m->flush = 0;
  203. while((n = read9pmsg(fd, m->data, con->msize)) == 0)
  204. ;
  205. if(n < 0){
  206. m->t.type = Tversion;
  207. m->t.fid = NOFID;
  208. m->t.tag = NOTAG;
  209. m->t.msize = con->msize;
  210. m->t.version = "9PEoF";
  211. eof = 1;
  212. }
  213. else if(convM2S(m->data, n, &m->t) != n){
  214. if(Dflag)
  215. fprint(2, "conProc: convM2S error: %s\n",
  216. con->name);
  217. msgFree(m);
  218. vtLock(mbox.lock);
  219. continue;
  220. }
  221. if(Dflag)
  222. fprint(2, "conProc: t %F\n", &m->t);
  223. vtLock(mbox.lock);
  224. if(mbox.head == nil){
  225. mbox.head = m;
  226. if(!vtWakeup(mbox.work) && mbox.nproc < mbox.maxproc){
  227. if(vtThread(msgProc, nil) > 0)
  228. mbox.nproc++;
  229. }
  230. vtWakeup(mbox.work);
  231. }
  232. else
  233. mbox.tail->next = m;
  234. mbox.tail = m;
  235. }
  236. vtUnlock(mbox.lock);
  237. }
  238. Con*
  239. conAlloc(int fd, char* name)
  240. {
  241. Con *con;
  242. int cur, i;
  243. vtLock(cbox.lock);
  244. cur = cbox.cur;
  245. for(i = 0; i < cbox.hi; i++){
  246. /*
  247. * Look for any unallocated or CsDead up to the
  248. * high watermark; cur is a hint where to start.
  249. * Wrap around the whole arena.
  250. */
  251. if(cbox.con[cur] == nil || cbox.con[cur]->state == CsDead)
  252. break;
  253. if(++cur >= cbox.hi)
  254. cur = 0;
  255. }
  256. if(i >= cbox.hi){
  257. /*
  258. * None found.
  259. * If the high watermark is up to the limit of those
  260. * allocated, increase the size of the arena.
  261. * Bump up the watermark and take the next.
  262. */
  263. if(cbox.hi >= cbox.ncon){
  264. cbox.con = vtMemRealloc(cbox.con,
  265. (cbox.ncon+NConInit)*sizeof(Con*));
  266. memset(&cbox.con[cbox.ncon], 0, NConInit*sizeof(Con*));
  267. cbox.ncon += NConInit;
  268. }
  269. cur = cbox.hi++;
  270. }
  271. /*
  272. * Do one-time initialisation if necessary.
  273. * Put back a new hint.
  274. * Do specific initialisation and start the proc.
  275. */
  276. con = cbox.con[cur];
  277. if(con == nil){
  278. con = vtMemAllocZ(sizeof(Con));
  279. con->lock = vtLockAlloc();
  280. con->data = vtMemAlloc(cbox.msize);
  281. con->msize = cbox.msize;
  282. con->active = vtRendezAlloc(con->lock);
  283. con->fidlock = vtLockAlloc();
  284. cbox.con[cur] = con;
  285. }
  286. assert(con->mhead == nil);
  287. assert(con->nmsg == 0);
  288. assert(con->fhead == nil);
  289. assert(con->nfid == 0);
  290. con->state = CsNew;
  291. if(++cur >= cbox.hi)
  292. cur = 0;
  293. cbox.cur = cur;
  294. con->fd = fd;
  295. if(con->name != nil){
  296. vtMemFree(con->name);
  297. con->name = nil;
  298. }
  299. if(name != nil)
  300. con->name = vtStrDup(name);
  301. con->aok = 0;
  302. vtUnlock(cbox.lock);
  303. if(vtThread(conProc, con) < 0){
  304. conFree(con);
  305. return nil;
  306. }
  307. return con;
  308. }
  309. static int
  310. cmdMsg(int argc, char* argv[])
  311. {
  312. char *p;
  313. int maxmsg, maxproc;
  314. char *usage = "usage: msg [-m nmsg] [-p nproc]";
  315. maxmsg = maxproc = 0;
  316. ARGBEGIN{
  317. default:
  318. return cliError(usage);
  319. case 'm':
  320. p = ARGF();
  321. if(p == nil)
  322. return cliError(usage);
  323. maxmsg = strtol(argv[0], &p, 0);
  324. if(maxmsg <= 0 || p == argv[0] || *p != '\0')
  325. return cliError(usage);
  326. break;
  327. case 'p':
  328. p = ARGF();
  329. if(p == nil)
  330. return cliError(usage);
  331. maxproc = strtol(argv[0], &p, 0);
  332. if(maxproc <= 0 || p == argv[0] || *p != '\0')
  333. return cliError(usage);
  334. break;
  335. }ARGEND
  336. if(argc)
  337. return cliError(usage);
  338. vtLock(mbox.lock);
  339. if(maxmsg)
  340. mbox.maxmsg = maxmsg;
  341. maxmsg = mbox.maxmsg;
  342. if(maxproc)
  343. mbox.maxproc = maxproc;
  344. maxproc = mbox.maxproc;
  345. vtUnlock(mbox.lock);
  346. consPrint("\tmsg -m %d -p %d\n", maxmsg, maxproc);
  347. return 1;
  348. }
  349. void
  350. procInit(void)
  351. {
  352. mbox.lock = vtLockAlloc();
  353. mbox.alloc = vtRendezAlloc(mbox.lock);
  354. mbox.work = vtRendezAlloc(mbox.lock);
  355. mbox.maxmsg = NMsgInit;
  356. mbox.maxproc = NMsgProcInit;
  357. mbox.msize = NMsizeInit;
  358. cliAddCmd("msg", cmdMsg);
  359. cbox.lock = vtLockAlloc();
  360. cbox.con = nil;
  361. cbox.ncon = 0;
  362. cbox.msize = NMsizeInit;
  363. }