client.c 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492
  1. /*
  2. * This file is part of the UCB release of Plan 9. It is subject to the license
  3. * terms in the LICENSE file found in the top-level directory of this
  4. * distribution and at http://akaros.cs.berkeley.edu/files/Plan9License. No
  5. * part of the UCB release of Plan 9, including this file, may be copied,
  6. * modified, propagated, or distributed except according to the terms contained
  7. * in the LICENSE file.
  8. */
  9. /*
  10. * Sun RPC client.
  11. */
  12. #include <u.h>
  13. #include <libc.h>
  14. #include <thread.h>
  15. #include <sunrpc.h>
  16. typedef struct Out Out;
  17. struct Out
  18. {
  19. char err[ERRMAX]; /* error string */
  20. Channel *creply; /* send to finish rpc */
  21. uint8_t *p; /* pending request packet */
  22. int n; /* size of request */
  23. uint32_t tag; /* flush tag of pending request */
  24. uint32_t xid; /* xid of pending request */
  25. uint32_t st; /* first send time */
  26. uint32_t t; /* resend time */
  27. int nresend; /* number of resends */
  28. SunRpc rpc; /* response rpc */
  29. };
  30. static void
  31. udpThread(void *v)
  32. {
  33. uint8_t *p, *buf;
  34. Ioproc *io;
  35. int n;
  36. SunClient *cli;
  37. enum { BufSize = 65536 };
  38. cli = v;
  39. buf = emalloc(BufSize);
  40. io = ioproc();
  41. p = nil;
  42. for(;;){
  43. n = ioread(io, cli->fd, buf, BufSize);
  44. if(n <= 0)
  45. break;
  46. p = emalloc(4+n);
  47. memmove(p+4, buf, n);
  48. p[0] = n>>24;
  49. p[1] = n>>16;
  50. p[2] = n>>8;
  51. p[3] = n;
  52. if(sendp(cli->readchan, p) == 0)
  53. break;
  54. p = nil;
  55. }
  56. free(p);
  57. closeioproc(io);
  58. while(send(cli->dying, nil) == -1)
  59. ;
  60. }
  61. static void
  62. netThread(void *v)
  63. {
  64. uint8_t *p, buf[4];
  65. Ioproc *io;
  66. uint n, tot;
  67. int done;
  68. SunClient *cli;
  69. cli = v;
  70. io = ioproc();
  71. tot = 0;
  72. p = nil;
  73. for(;;){
  74. n = ioreadn(io, cli->fd, buf, 4);
  75. if(n != 4)
  76. break;
  77. n = (buf[0]<<24)|(buf[1]<<16)|(buf[2]<<8)|buf[3];
  78. if(cli->chatty)
  79. fprint(2, "%.8ux...", n);
  80. done = n&0x80000000;
  81. n &= ~0x80000000;
  82. if(tot == 0){
  83. p = emalloc(4+n);
  84. tot = 4;
  85. }else
  86. p = erealloc(p, tot+n);
  87. if(ioreadn(io, cli->fd, p+tot, n) != n)
  88. break;
  89. tot += n;
  90. if(done){
  91. p[0] = tot>>24;
  92. p[1] = tot>>16;
  93. p[2] = tot>>8;
  94. p[3] = tot;
  95. if(sendp(cli->readchan, p) == 0)
  96. break;
  97. p = nil;
  98. tot = 0;
  99. }
  100. }
  101. free(p);
  102. closeioproc(io);
  103. while(send(cli->dying, 0) == -1)
  104. ;
  105. }
  106. static void
  107. timerThread(void *v)
  108. {
  109. Ioproc *io;
  110. SunClient *cli;
  111. cli = v;
  112. io = ioproc();
  113. for(;;){
  114. if(iosleep(io, 200) < 0)
  115. break;
  116. if(sendul(cli->timerchan, 0) == 0)
  117. break;
  118. }
  119. closeioproc(io);
  120. while(send(cli->dying, 0) == -1)
  121. ;
  122. }
  123. static uint32_t
  124. msec(void)
  125. {
  126. return nsec()/1000000;
  127. }
  128. static uint32_t
  129. twait(uint32_t rtt, int nresend)
  130. {
  131. uint32_t t;
  132. t = rtt;
  133. if(nresend <= 1)
  134. {}
  135. else if(nresend <= 3)
  136. t *= 2;
  137. else if(nresend <= 18)
  138. t <<= nresend-2;
  139. else
  140. t = 60*1000;
  141. if(t > 60*1000)
  142. t = 60*1000;
  143. return t;
  144. }
  145. static void
  146. rpcMuxThread(void *v)
  147. {
  148. uint8_t *buf, *p, *ep;
  149. int i, n, nout, mout;
  150. uint32_t t, xidgen, tag;
  151. Alt a[5];
  152. Out *o, **out;
  153. SunRpc rpc;
  154. SunClient *cli;
  155. cli = v;
  156. mout = 16;
  157. nout = 0;
  158. out = emalloc(mout*sizeof(out[0]));
  159. xidgen = truerand();
  160. a[0].op = CHANRCV;
  161. a[0].c = cli->rpcchan;
  162. a[0].v = &o;
  163. a[1].op = CHANNOP;
  164. a[1].c = cli->timerchan;
  165. a[1].v = nil;
  166. a[2].op = CHANRCV;
  167. a[2].c = cli->flushchan;
  168. a[2].v = &tag;
  169. a[3].op = CHANRCV;
  170. a[3].c = cli->readchan;
  171. a[3].v = &buf;
  172. a[4].op = CHANEND;
  173. for(;;){
  174. switch(alt(a)){
  175. case 0: /* o = <-rpcchan */
  176. if(o == nil)
  177. goto Done;
  178. cli->nsend++;
  179. /* set xid */
  180. o->xid = ++xidgen;
  181. if(cli->needcount)
  182. p = o->p+4;
  183. else
  184. p = o->p;
  185. p[0] = xidgen>>24;
  186. p[1] = xidgen>>16;
  187. p[2] = xidgen>>8;
  188. p[3] = xidgen;
  189. if(write(cli->fd, o->p, o->n) != o->n){
  190. free(o->p);
  191. o->p = nil;
  192. snprint(o->err, sizeof o->err, "write: %r");
  193. sendp(o->creply, 0);
  194. break;
  195. }
  196. if(nout >= mout){
  197. mout *= 2;
  198. out = erealloc(out, mout*sizeof(out[0]));
  199. }
  200. o->st = msec();
  201. o->nresend = 0;
  202. o->t = o->st + twait(cli->rtt.avg, 0);
  203. if(cli->chatty) fprint(2, "send %lux %lud %lud\n", o->xid, o->st, o->t);
  204. out[nout++] = o;
  205. a[1].op = CHANRCV;
  206. break;
  207. case 1: /* <-timerchan */
  208. t = msec();
  209. for(i=0; i<nout; i++){
  210. o = out[i];
  211. if((int)(t - o->t) > 0){
  212. if(cli->chatty) fprint(2, "resend %lux %lud %lud\n", o->xid, t, o->t);
  213. if(cli->maxwait && t - o->st >= cli->maxwait){
  214. free(o->p);
  215. o->p = nil;
  216. strcpy(o->err, "timeout");
  217. sendp(o->creply, 0);
  218. out[i--] = out[--nout];
  219. continue;
  220. }
  221. cli->nresend++;
  222. o->nresend++;
  223. o->t = t + twait(cli->rtt.avg, o->nresend);
  224. if(write(cli->fd, o->p, o->n) != o->n){
  225. free(o->p);
  226. o->p = nil;
  227. snprint(o->err, sizeof o->err, "rewrite: %r");
  228. sendp(o->creply, 0);
  229. out[i--] = out[--nout];
  230. continue;
  231. }
  232. }
  233. }
  234. /* stop ticking if no work; rpcchan will turn it back on */
  235. if(nout == 0)
  236. a[1].op = CHANNOP;
  237. break;
  238. case 2: /* tag = <-flushchan */
  239. for(i=0; i<nout; i++){
  240. o = out[i];
  241. if(o->tag == tag){
  242. out[i--] = out[--nout];
  243. strcpy(o->err, "flushed");
  244. free(o->p);
  245. o->p = nil;
  246. sendp(o->creply, 0);
  247. }
  248. }
  249. break;
  250. case 3: /* buf = <-readchan */
  251. p = buf;
  252. n = (p[0]<<24)|(p[1]<<16)|(p[2]<<8)|p[3];
  253. p += 4;
  254. ep = p+n;
  255. if(sunRpcUnpack(p, ep, &p, &rpc) < 0){
  256. fprint(2, "in: %.*H unpack failed\n", n, buf+4);
  257. free(buf);
  258. break;
  259. }
  260. if(cli->chatty)
  261. fprint(2, "in: %B\n", &rpc);
  262. if(rpc.iscall){
  263. fprint(2, "did not get reply\n");
  264. free(buf);
  265. break;
  266. }
  267. o = nil;
  268. for(i=0; i<nout; i++){
  269. o = out[i];
  270. if(o->xid == rpc.xid)
  271. break;
  272. }
  273. if(i==nout){
  274. if(cli->chatty) fprint(2, "did not find waiting request\n");
  275. free(buf);
  276. break;
  277. }
  278. out[i] = out[--nout];
  279. free(o->p);
  280. o->p = nil;
  281. if(rpc.status == SunSuccess){
  282. o->p = buf;
  283. o->rpc = rpc;
  284. }else{
  285. o->p = nil;
  286. free(buf);
  287. sunErrstr(rpc.status);
  288. rerrstr(o->err, sizeof o->err);
  289. }
  290. sendp(o->creply, 0);
  291. break;
  292. }
  293. }
  294. Done:
  295. free(out);
  296. sendp(cli->dying, 0);
  297. }
  298. SunClient*
  299. sunDial(char *address)
  300. {
  301. int fd;
  302. SunClient *cli;
  303. if((fd = dial(address, 0, 0, 0)) < 0)
  304. return nil;
  305. cli = emalloc(sizeof(SunClient));
  306. cli->fd = fd;
  307. cli->maxwait = 15000;
  308. cli->rtt.avg = 1000;
  309. cli->dying = chancreate(sizeof(void*), 0);
  310. cli->rpcchan = chancreate(sizeof(Out*), 0);
  311. cli->timerchan = chancreate(sizeof(uint32_t), 0);
  312. cli->flushchan = chancreate(sizeof(uint32_t), 0);
  313. cli->readchan = chancreate(sizeof(uint8_t*), 0);
  314. if(strstr(address, "udp!")){
  315. cli->needcount = 0;
  316. cli->nettid = threadcreate(udpThread, cli, SunStackSize);
  317. cli->timertid = threadcreate(timerThread, cli, SunStackSize);
  318. }else{
  319. cli->needcount = 1;
  320. cli->nettid = threadcreate(netThread, cli, SunStackSize);
  321. /* assume reliable: don't need timer */
  322. /* BUG: netThread should know how to redial */
  323. }
  324. threadcreate(rpcMuxThread, cli, SunStackSize);
  325. return cli;
  326. }
  327. void
  328. sunClientClose(SunClient *cli)
  329. {
  330. int n;
  331. /*
  332. * Threadints get you out of any stuck system calls
  333. * or thread rendezvouses, but do nothing if the thread
  334. * is in the ready state. Keep interrupting until it takes.
  335. */
  336. n = 0;
  337. if(!cli->timertid)
  338. n++;
  339. while(n < 2){
  340. threadint(cli->nettid);
  341. if(cli->timertid)
  342. threadint(cli->timertid);
  343. yield();
  344. while(nbrecv(cli->dying, nil) == 1)
  345. n++;
  346. }
  347. sendp(cli->rpcchan, 0);
  348. recvp(cli->dying);
  349. /* everyone's gone: clean up */
  350. close(cli->fd);
  351. chanfree(cli->flushchan);
  352. chanfree(cli->readchan);
  353. chanfree(cli->timerchan);
  354. free(cli);
  355. }
  356. void
  357. sunClientFlushRpc(SunClient *cli, uint32_t tag)
  358. {
  359. sendul(cli->flushchan, tag);
  360. }
  361. void
  362. sunClientProg(SunClient *cli, SunProg *p)
  363. {
  364. if(cli->nprog%16 == 0)
  365. cli->prog = erealloc(cli->prog, (cli->nprog+16)*sizeof(cli->prog[0]));
  366. cli->prog[cli->nprog++] = p;
  367. }
  368. int
  369. sunClientRpc(SunClient *cli, uint32_t tag, SunCall *tx, SunCall *rx,
  370. uint8_t **tofree)
  371. {
  372. uint8_t *bp, *p, *ep;
  373. int i, n1, n2, n, nn;
  374. Out o;
  375. SunProg *prog;
  376. SunStatus ok;
  377. for(i=0; i<cli->nprog; i++)
  378. if(cli->prog[i]->prog == tx->rpc.prog && cli->prog[i]->vers == tx->rpc.vers)
  379. break;
  380. if(i==cli->nprog){
  381. werrstr("unknown sun rpc program %d version %d", tx->rpc.prog, tx->rpc.vers);
  382. return -1;
  383. }
  384. prog = cli->prog[i];
  385. if(cli->chatty){
  386. fprint(2, "out: %B\n", &tx->rpc);
  387. fprint(2, "\t%C\n", tx);
  388. }
  389. n1 = sunRpcSize(&tx->rpc);
  390. n2 = sunCallSize(prog, tx);
  391. n = n1+n2;
  392. if(cli->needcount)
  393. n += 4;
  394. bp = emalloc(n);
  395. ep = bp+n;
  396. p = bp;
  397. if(cli->needcount){
  398. nn = n-4;
  399. p[0] = (nn>>24)|0x80;
  400. p[1] = nn>>16;
  401. p[2] = nn>>8;
  402. p[3] = nn;
  403. p += 4;
  404. }
  405. if((ok = sunRpcPack(p, ep, &p, &tx->rpc)) != SunSuccess
  406. || (ok = sunCallPack(prog, p, ep, &p, tx)) != SunSuccess){
  407. sunErrstr(ok);
  408. free(bp);
  409. return -1;
  410. }
  411. if(p != ep){
  412. werrstr("rpc: packet size mismatch");
  413. free(bp);
  414. return -1;
  415. }
  416. memset(&o, 0, sizeof o);
  417. o.creply = chancreate(sizeof(void*), 0);
  418. o.tag = tag;
  419. o.p = bp;
  420. o.n = n;
  421. sendp(cli->rpcchan, &o);
  422. recvp(o.creply);
  423. chanfree(o.creply);
  424. if(o.p == nil){
  425. werrstr("%s", o.err);
  426. return -1;
  427. }
  428. p = o.rpc.data;
  429. ep = p+o.rpc.ndata;
  430. rx->rpc = o.rpc;
  431. rx->rpc.proc = tx->rpc.proc;
  432. rx->rpc.prog = tx->rpc.prog;
  433. rx->rpc.vers = tx->rpc.vers;
  434. rx->type = (rx->rpc.proc<<1)|1;
  435. if((ok = sunCallUnpack(prog, p, ep, &p, rx)) != SunSuccess){
  436. sunErrstr(ok);
  437. werrstr("unpack: %r");
  438. free(o.p);
  439. return -1;
  440. }
  441. if(cli->chatty){
  442. fprint(2, "in: %B\n", &rx->rpc);
  443. fprint(2, "in:\t%C\n", rx);
  444. }
  445. if(tofree)
  446. *tofree = o.p;
  447. else
  448. free(o.p);
  449. return 0;
  450. }