client.c 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482
/*
 * Sun RPC client.
 */
  4. #include <u.h>
  5. #include <libc.h>
  6. #include <thread.h>
  7. #include <sunrpc.h>
  8. typedef struct Out Out;
  9. struct Out
  10. {
  11. char err[ERRMAX]; /* error string */
  12. Channel *creply; /* send to finish rpc */
  13. uchar *p; /* pending request packet */
  14. int n; /* size of request */
  15. ulong tag; /* flush tag of pending request */
  16. ulong xid; /* xid of pending request */
  17. ulong st; /* first send time */
  18. ulong t; /* resend time */
  19. int nresend; /* number of resends */
  20. SunRpc rpc; /* response rpc */
  21. };
  22. static void
  23. udpThread(void *v)
  24. {
  25. uchar *p, *buf;
  26. Ioproc *io;
  27. int n;
  28. SunClient *cli;
  29. enum { BufSize = 65536 };
  30. cli = v;
  31. buf = emalloc(BufSize);
  32. io = ioproc();
  33. p = nil;
  34. for(;;){
  35. n = ioread(io, cli->fd, buf, BufSize);
  36. if(n <= 0)
  37. break;
  38. p = emalloc(4+n);
  39. memmove(p+4, buf, n);
  40. p[0] = n>>24;
  41. p[1] = n>>16;
  42. p[2] = n>>8;
  43. p[3] = n;
  44. if(sendp(cli->readchan, p) == 0)
  45. break;
  46. p = nil;
  47. }
  48. free(p);
  49. closeioproc(io);
  50. while(send(cli->dying, nil) == -1)
  51. ;
  52. }
  53. static void
  54. netThread(void *v)
  55. {
  56. uchar *p, buf[4];
  57. Ioproc *io;
  58. uint n, tot;
  59. int done;
  60. SunClient *cli;
  61. cli = v;
  62. io = ioproc();
  63. tot = 0;
  64. p = nil;
  65. for(;;){
  66. n = ioreadn(io, cli->fd, buf, 4);
  67. if(n != 4)
  68. break;
  69. n = (buf[0]<<24)|(buf[1]<<16)|(buf[2]<<8)|buf[3];
  70. if(cli->chatty)
  71. fprint(2, "%.8ux...", n);
  72. done = n&0x80000000;
  73. n &= ~0x80000000;
  74. if(tot == 0){
  75. p = emalloc(4+n);
  76. tot = 4;
  77. }else
  78. p = erealloc(p, tot+n);
  79. if(ioreadn(io, cli->fd, p+tot, n) != n)
  80. break;
  81. tot += n;
  82. if(done){
  83. p[0] = tot>>24;
  84. p[1] = tot>>16;
  85. p[2] = tot>>8;
  86. p[3] = tot;
  87. if(sendp(cli->readchan, p) == 0)
  88. break;
  89. p = nil;
  90. tot = 0;
  91. }
  92. }
  93. free(p);
  94. closeioproc(io);
  95. while(send(cli->dying, 0) == -1)
  96. ;
  97. }
  98. static void
  99. timerThread(void *v)
  100. {
  101. Ioproc *io;
  102. SunClient *cli;
  103. cli = v;
  104. io = ioproc();
  105. for(;;){
  106. if(iosleep(io, 200) < 0)
  107. break;
  108. if(sendul(cli->timerchan, 0) == 0)
  109. break;
  110. }
  111. closeioproc(io);
  112. while(send(cli->dying, 0) == -1)
  113. ;
  114. }
  115. static ulong
  116. msec(void)
  117. {
  118. return nsec()/1000000;
  119. }
  120. static ulong
  121. twait(ulong rtt, int nresend)
  122. {
  123. ulong t;
  124. t = rtt;
  125. if(nresend <= 1)
  126. {}
  127. else if(nresend <= 3)
  128. t *= 2;
  129. else if(nresend <= 18)
  130. t <<= nresend-2;
  131. else
  132. t = 60*1000;
  133. if(t > 60*1000)
  134. t = 60*1000;
  135. return t;
  136. }
  137. static void
  138. rpcMuxThread(void *v)
  139. {
  140. uchar *buf, *p, *ep;
  141. int i, n, nout, mout;
  142. ulong t, xidgen, tag;
  143. Alt a[5];
  144. Out *o, **out;
  145. SunRpc rpc;
  146. SunClient *cli;
  147. cli = v;
  148. mout = 16;
  149. nout = 0;
  150. out = emalloc(mout*sizeof(out[0]));
  151. xidgen = truerand();
  152. a[0].op = CHANRCV;
  153. a[0].c = cli->rpcchan;
  154. a[0].v = &o;
  155. a[1].op = CHANNOP;
  156. a[1].c = cli->timerchan;
  157. a[1].v = nil;
  158. a[2].op = CHANRCV;
  159. a[2].c = cli->flushchan;
  160. a[2].v = &tag;
  161. a[3].op = CHANRCV;
  162. a[3].c = cli->readchan;
  163. a[3].v = &buf;
  164. a[4].op = CHANEND;
  165. for(;;){
  166. switch(alt(a)){
  167. case 0: /* o = <-rpcchan */
  168. if(o == nil)
  169. goto Done;
  170. cli->nsend++;
  171. /* set xid */
  172. o->xid = ++xidgen;
  173. if(cli->needcount)
  174. p = o->p+4;
  175. else
  176. p = o->p;
  177. p[0] = xidgen>>24;
  178. p[1] = xidgen>>16;
  179. p[2] = xidgen>>8;
  180. p[3] = xidgen;
  181. if(write(cli->fd, o->p, o->n) != o->n){
  182. free(o->p);
  183. o->p = nil;
  184. snprint(o->err, sizeof o->err, "write: %r");
  185. sendp(o->creply, 0);
  186. break;
  187. }
  188. if(nout >= mout){
  189. mout *= 2;
  190. out = erealloc(out, mout*sizeof(out[0]));
  191. }
  192. o->st = msec();
  193. o->nresend = 0;
  194. o->t = o->st + twait(cli->rtt.avg, 0);
  195. if(cli->chatty) fprint(2, "send %lux %lud %lud\n", o->xid, o->st, o->t);
  196. out[nout++] = o;
  197. a[1].op = CHANRCV;
  198. break;
  199. case 1: /* <-timerchan */
  200. t = msec();
  201. for(i=0; i<nout; i++){
  202. o = out[i];
  203. if((int)(t - o->t) > 0){
  204. if(cli->chatty) fprint(2, "resend %lux %lud %lud\n", o->xid, t, o->t);
  205. if(cli->maxwait && t - o->st >= cli->maxwait){
  206. free(o->p);
  207. o->p = nil;
  208. strcpy(o->err, "timeout");
  209. sendp(o->creply, 0);
  210. out[i--] = out[--nout];
  211. continue;
  212. }
  213. cli->nresend++;
  214. o->nresend++;
  215. o->t = t + twait(cli->rtt.avg, o->nresend);
  216. if(write(cli->fd, o->p, o->n) != o->n){
  217. free(o->p);
  218. o->p = nil;
  219. snprint(o->err, sizeof o->err, "rewrite: %r");
  220. sendp(o->creply, 0);
  221. out[i--] = out[--nout];
  222. continue;
  223. }
  224. }
  225. }
  226. /* stop ticking if no work; rpcchan will turn it back on */
  227. if(nout == 0)
  228. a[1].op = CHANNOP;
  229. break;
  230. case 2: /* tag = <-flushchan */
  231. for(i=0; i<nout; i++){
  232. o = out[i];
  233. if(o->tag == tag){
  234. out[i--] = out[--nout];
  235. strcpy(o->err, "flushed");
  236. free(o->p);
  237. o->p = nil;
  238. sendp(o->creply, 0);
  239. }
  240. }
  241. break;
  242. case 3: /* buf = <-readchan */
  243. p = buf;
  244. n = (p[0]<<24)|(p[1]<<16)|(p[2]<<8)|p[3];
  245. p += 4;
  246. ep = p+n;
  247. if(sunRpcUnpack(p, ep, &p, &rpc) < 0){
  248. fprint(2, "in: %.*H unpack failed\n", n, buf+4);
  249. free(buf);
  250. break;
  251. }
  252. if(cli->chatty)
  253. fprint(2, "in: %B\n", &rpc);
  254. if(rpc.iscall){
  255. fprint(2, "did not get reply\n");
  256. free(buf);
  257. break;
  258. }
  259. o = nil;
  260. for(i=0; i<nout; i++){
  261. o = out[i];
  262. if(o->xid == rpc.xid)
  263. break;
  264. }
  265. if(i==nout){
  266. if(cli->chatty) fprint(2, "did not find waiting request\n");
  267. free(buf);
  268. break;
  269. }
  270. out[i] = out[--nout];
  271. free(o->p);
  272. o->p = nil;
  273. if(rpc.status == SunSuccess){
  274. o->p = buf;
  275. o->rpc = rpc;
  276. }else{
  277. o->p = nil;
  278. free(buf);
  279. sunErrstr(rpc.status);
  280. rerrstr(o->err, sizeof o->err);
  281. }
  282. sendp(o->creply, 0);
  283. break;
  284. }
  285. }
  286. Done:
  287. free(out);
  288. sendp(cli->dying, 0);
  289. }
  290. SunClient*
  291. sunDial(char *address)
  292. {
  293. int fd;
  294. SunClient *cli;
  295. if((fd = dial(address, 0, 0, 0)) < 0)
  296. return nil;
  297. cli = emalloc(sizeof(SunClient));
  298. cli->fd = fd;
  299. cli->maxwait = 15000;
  300. cli->rtt.avg = 1000;
  301. cli->dying = chancreate(sizeof(void*), 0);
  302. cli->rpcchan = chancreate(sizeof(Out*), 0);
  303. cli->timerchan = chancreate(sizeof(ulong), 0);
  304. cli->flushchan = chancreate(sizeof(ulong), 0);
  305. cli->readchan = chancreate(sizeof(uchar*), 0);
  306. if(strstr(address, "udp!")){
  307. cli->needcount = 0;
  308. cli->nettid = threadcreate(udpThread, cli, SunStackSize);
  309. cli->timertid = threadcreate(timerThread, cli, SunStackSize);
  310. }else{
  311. cli->needcount = 1;
  312. cli->nettid = threadcreate(netThread, cli, SunStackSize);
  313. /* assume reliable: don't need timer */
  314. /* BUG: netThread should know how to redial */
  315. }
  316. threadcreate(rpcMuxThread, cli, SunStackSize);
  317. return cli;
  318. }
  319. void
  320. sunClientClose(SunClient *cli)
  321. {
  322. int n;
  323. /*
  324. * Threadints get you out of any stuck system calls
  325. * or thread rendezvouses, but do nothing if the thread
  326. * is in the ready state. Keep interrupting until it takes.
  327. */
  328. n = 0;
  329. if(!cli->timertid)
  330. n++;
  331. while(n < 2){
  332. threadint(cli->nettid);
  333. if(cli->timertid)
  334. threadint(cli->timertid);
  335. yield();
  336. while(nbrecv(cli->dying, nil) == 1)
  337. n++;
  338. }
  339. sendp(cli->rpcchan, 0);
  340. recvp(cli->dying);
  341. /* everyone's gone: clean up */
  342. close(cli->fd);
  343. chanfree(cli->flushchan);
  344. chanfree(cli->readchan);
  345. chanfree(cli->timerchan);
  346. free(cli);
  347. }
  348. void
  349. sunClientFlushRpc(SunClient *cli, ulong tag)
  350. {
  351. sendul(cli->flushchan, tag);
  352. }
  353. void
  354. sunClientProg(SunClient *cli, SunProg *p)
  355. {
  356. if(cli->nprog%16 == 0)
  357. cli->prog = erealloc(cli->prog, (cli->nprog+16)*sizeof(cli->prog[0]));
  358. cli->prog[cli->nprog++] = p;
  359. }
  360. int
  361. sunClientRpc(SunClient *cli, ulong tag, SunCall *tx, SunCall *rx, uchar **tofree)
  362. {
  363. uchar *bp, *p, *ep;
  364. int i, n1, n2, n, nn;
  365. Out o;
  366. SunProg *prog;
  367. SunStatus ok;
  368. for(i=0; i<cli->nprog; i++)
  369. if(cli->prog[i]->prog == tx->rpc.prog && cli->prog[i]->vers == tx->rpc.vers)
  370. break;
  371. if(i==cli->nprog){
  372. werrstr("unknown sun rpc program %d version %d", tx->rpc.prog, tx->rpc.vers);
  373. return -1;
  374. }
  375. prog = cli->prog[i];
  376. if(cli->chatty){
  377. fprint(2, "out: %B\n", &tx->rpc);
  378. fprint(2, "\t%C\n", tx);
  379. }
  380. n1 = sunRpcSize(&tx->rpc);
  381. n2 = sunCallSize(prog, tx);
  382. n = n1+n2;
  383. if(cli->needcount)
  384. n += 4;
  385. bp = emalloc(n);
  386. ep = bp+n;
  387. p = bp;
  388. if(cli->needcount){
  389. nn = n-4;
  390. p[0] = (nn>>24)|0x80;
  391. p[1] = nn>>16;
  392. p[2] = nn>>8;
  393. p[3] = nn;
  394. p += 4;
  395. }
  396. if((ok = sunRpcPack(p, ep, &p, &tx->rpc)) != SunSuccess
  397. || (ok = sunCallPack(prog, p, ep, &p, tx)) != SunSuccess){
  398. sunErrstr(ok);
  399. free(bp);
  400. return -1;
  401. }
  402. if(p != ep){
  403. werrstr("rpc: packet size mismatch");
  404. free(bp);
  405. return -1;
  406. }
  407. memset(&o, 0, sizeof o);
  408. o.creply = chancreate(sizeof(void*), 0);
  409. o.tag = tag;
  410. o.p = bp;
  411. o.n = n;
  412. sendp(cli->rpcchan, &o);
  413. recvp(o.creply);
  414. chanfree(o.creply);
  415. if(o.p == nil){
  416. werrstr("%s", o.err);
  417. return -1;
  418. }
  419. p = o.rpc.data;
  420. ep = p+o.rpc.ndata;
  421. rx->rpc = o.rpc;
  422. rx->rpc.proc = tx->rpc.proc;
  423. rx->rpc.prog = tx->rpc.prog;
  424. rx->rpc.vers = tx->rpc.vers;
  425. rx->type = (rx->rpc.proc<<1)|1;
  426. if((ok = sunCallUnpack(prog, p, ep, &p, rx)) != SunSuccess){
  427. sunErrstr(ok);
  428. werrstr("unpack: %r");
  429. free(o.p);
  430. return -1;
  431. }
  432. if(cli->chatty){
  433. fprint(2, "in: %B\n", &rx->rpc);
  434. fprint(2, "in:\t%C\n", rx);
  435. }
  436. if(tofree)
  437. *tofree = o.p;
  438. else
  439. free(o.p);
  440. return 0;
  441. }