rudp.c 21 KB


  1. /*
  2. * Reliable User Datagram Protocol, currently only for IPv4.
  3. * This protocol is compatible with UDP's packet format.
  4. * It could be done over UDP if need be.
  5. */
  6. #include "u.h"
  7. #include "../port/lib.h"
  8. #include "mem.h"
  9. #include "dat.h"
  10. #include "fns.h"
  11. #include "../port/error.h"
  12. #include "ip.h"
  13. #define DEBUG 0
  14. #define DPRINT if(DEBUG)print
  15. #define SEQDIFF(a,b) ( (a)>=(b)?\
  16. (a)-(b):\
  17. 0xffffffffUL-((b)-(a)) )
  18. #define INSEQ(a,start,end) ( (start)<=(end)?\
  19. ((a)>(start)&&(a)<=(end)):\
  20. ((a)>(start)||(a)<=(end)) )
  21. #define UNACKED(r) SEQDIFF(r->sndseq, r->ackrcvd)
  22. #define NEXTSEQ(a) ( (a)+1 == 0 ? 1 : (a)+1 )
  23. enum
  24. {
  25. UDP_HDRSIZE = 20, /* pseudo header + udp header */
  26. UDP_PHDRSIZE = 12, /* pseudo header */
  27. UDP_RHDRSIZE = 36, /* pseudo header + udp header + rudp header */
  28. UDP_IPHDR = 8, /* ip header */
  29. IP_UDPPROTO = 254,
  30. UDP_USEAD7 = 52,
  31. UDP_USEAD6 = 36,
  32. UDP_USEAD4 = 12,
  33. Rudprxms = 200,
  34. Rudptickms = 50,
  35. Rudpmaxxmit = 10,
  36. Maxunacked = 100,
  37. };
  38. #define Hangupgen 0xffffffff /* used only in hangup messages */
typedef struct Udphdr Udphdr;
/*
 * Overlay for the front of an rudp packet, without the rudp-specific
 * fields (used where only the UDP-level fields are needed, e.g. in
 * rudpadvise).  The first UDP_IPHDR bytes are the start of the IPv4
 * header.  They are followed by the UDP pseudo header, whose bytes
 * share space with the rest of the IP header: rudpiput saves Unused
 * (the ttl) and udpplen before rewriting them for the checksum, and
 * restores them for icmpnoconv.  Multi-byte fields are big-endian
 * byte arrays accessed with nhgets/hnputs.
 */
struct Udphdr
{
	/* ip header */
	uchar	vihl;		/* Version and header length */
	uchar	tos;		/* Type of service */
	uchar	length[2];	/* packet length */
	uchar	id[2];		/* Identification */
	uchar	frag[2];	/* Fragment information */

	/* pseudo header starts here */
	uchar	Unused;		/* zeroed for the checksum; holds the ttl on input */
	uchar	udpproto;	/* Protocol */
	uchar	udpplen[2];	/* Header plus data length */
	uchar	udpsrc[4];	/* Ip source */
	uchar	udpdst[4];	/* Ip destination */

	/* udp header */
	uchar	udpsport[2];	/* Source port */
	uchar	udpdport[2];	/* Destination port */
	uchar	udplen[2];	/* data length */
	uchar	udpcksum[2];	/* Checksum */
};
typedef struct Rudphdr Rudphdr;
/*
 * Same layout as Udphdr, followed by the 16-byte rudp header.  All
 * rudp fields are big-endian 32-bit values (nhgetl/hnputl).  A packet
 * with relseq 0 carries no data and is a pure acknowledgement; a
 * relsgen of Hangupgen marks a hangup request.
 */
struct Rudphdr
{
	/* ip header */
	uchar	vihl;		/* Version and header length */
	uchar	tos;		/* Type of service */
	uchar	length[2];	/* packet length */
	uchar	id[2];		/* Identification */
	uchar	frag[2];	/* Fragment information */

	/* pseudo header starts here */
	uchar	Unused;
	uchar	udpproto;	/* Protocol */
	uchar	udpplen[2];	/* Header plus data length */
	uchar	udpsrc[4];	/* Ip source */
	uchar	udpdst[4];	/* Ip destination */

	/* udp header */
	uchar	udpsport[2];	/* Source port */
	uchar	udpdport[2];	/* Destination port */
	uchar	udplen[2];	/* data length (includes rudp header) */
	uchar	udpcksum[2];	/* Checksum */

	/* rudp header */
	uchar	relseq[4];	/* id of this packet (or 0) */
	uchar	relsgen[4];	/* sender's generation (or Hangupgen) */
	uchar	relack[4];	/* last in-order packet received from the peer (cumulative ack, or 0) */
	uchar	relagen[4];	/* generation of the peer being acked */
};
/*
 * one state structure per destination (remote addr!port), kept on the
 * owning Rudpcb's list and protected by that Rudpcb's QLock
 */
typedef struct Reliable Reliable;
struct Reliable
{
	Ref;			/* one ref for list membership, plus one per relstate caller */
	Reliable *next;		/* next destination on the Rudpcb list */

	uchar	addr[IPaddrlen];	/* always V6 when put here */
	ushort	port;

	Block	*unacked;	/* unacked msg list (copies, linked through Block.list) */
	Block	*unackedtail;	/* and its tail */
	int	timeout;	/* ms (in Rudptickms steps) since the head of unacked was last (re)sent */
	int	xmits;		/* number of times first unacked msg sent */

	ulong	sndseq;		/* sequence number of the last packet sent (0 = none yet) */
	ulong	sndgen;		/* and its generation */
	ulong	rcvseq;		/* last packet received */
	ulong	rcvgen;		/* and its generation */
	ulong	acksent;	/* last ack sent */
	ulong	ackrcvd;	/* last msg for which ack was rcvd */

	/* flow control: a writer in rudpkick sleeps on vous while too many packets are unacked */
	QLock	lock;
	Rendez	vous;
	int	blocked;	/* set while a writer is asleep on vous */
};
/* MIB II counters (reported by rudpstats) */
typedef struct Rudpstats Rudpstats;
struct Rudpstats
{
	ulong	rudpInDatagrams;	/* packets handed to rudpiput */
	ulong	rudpNoPorts;		/* packets with no matching conversation */
	ulong	rudpInErrors;		/* packets dropped for bad checksum */
	ulong	rudpOutDatagrams;	/* data packets sent by rudpkick */
};
/* per-protocol-instance private state (Proto.priv) */
typedef struct Rudppriv Rudppriv;
struct Rudppriv
{
	Ipht	ht;		/* conversations by address/port, searched by rudpiput */

	/* MIB counters */
	Rudpstats	ustats;

	/* non-MIB stats */
	ulong	csumerr;		/* checksum errors */
	ulong	lenerr;			/* short packet */
	ulong	rxmits;			/* # of retransmissions */
	ulong	orders;			/* # of out of order pkts */

	/* keeping track of the ack kproc */
	int	ackprocstarted;	/* set once relackproc has been started */
	QLock	apl;		/* serializes starting relackproc */
};
/*
 * Next send generation handed out by relstate and relhangup.  Seeded
 * from rand() on first use; Hangupgen is skipped because it is
 * reserved for hangup messages.
 */
static ulong generation = 0;
/* NOTE(review): not referenced anywhere in this file; possibly dead */
static Rendez rend;
/*
 * protocol specific part of Conv
 */
typedef struct Rudpcb Rudpcb;
struct Rudpcb
{
	QLock;			/* protects r and the Reliable states on it */
	uchar	headers;	/* 0: connection oriented; 6 or 7: address prefix mode set by rudpctl */
	uchar	randdrop;	/* percentage (0-100) of outgoing packets doipoput discards */
	Reliable *r;		/* per-destination reliability state */
};
  148. /*
  149. * local functions
  150. */
  151. void relsendack(Conv*, Reliable*, int);
  152. int reliput(Conv*, Block*, uchar*, ushort);
  153. Reliable *relstate(Rudpcb*, uchar*, ushort, char*);
  154. void relput(Reliable*);
  155. void relforget(Conv *, uchar*, int, int);
  156. void relackproc(void *);
  157. void relackq(Reliable *, Block*);
  158. void relhangup(Conv *, Reliable*);
  159. void relrexmit(Conv *, Reliable*);
  160. void relput(Reliable*);
  161. void rudpkick(void *x);
  162. static void
  163. rudpstartackproc(Proto *rudp)
  164. {
  165. Rudppriv *rpriv;
  166. char kpname[KNAMELEN];
  167. rpriv = rudp->priv;
  168. if(rpriv->ackprocstarted == 0){
  169. qlock(&rpriv->apl);
  170. if(rpriv->ackprocstarted == 0){
  171. sprint(kpname, "#I%drudpack", rudp->f->dev);
  172. kproc(kpname, relackproc, rudp);
  173. rpriv->ackprocstarted = 1;
  174. }
  175. qunlock(&rpriv->apl);
  176. }
  177. }
  178. static char*
  179. rudpconnect(Conv *c, char **argv, int argc)
  180. {
  181. char *e;
  182. Rudppriv *upriv;
  183. upriv = c->p->priv;
  184. rudpstartackproc(c->p);
  185. e = Fsstdconnect(c, argv, argc);
  186. Fsconnected(c, e);
  187. iphtadd(&upriv->ht, c);
  188. return e;
  189. }
  190. static int
  191. rudpstate(Conv *c, char *state, int n)
  192. {
  193. Rudpcb *ucb;
  194. Reliable *r;
  195. int m;
  196. m = snprint(state, n, "%s", c->inuse?"Open":"Closed");
  197. ucb = (Rudpcb*)c->ptcl;
  198. qlock(ucb);
  199. for(r = ucb->r; r; r = r->next)
  200. m += snprint(state+m, n-m, " %I/%ld", r->addr, UNACKED(r));
  201. m += snprint(state+m, n-m, "\n");
  202. qunlock(ucb);
  203. return m;
  204. }
  205. static char*
  206. rudpannounce(Conv *c, char** argv, int argc)
  207. {
  208. char *e;
  209. Rudppriv *upriv;
  210. upriv = c->p->priv;
  211. rudpstartackproc(c->p);
  212. e = Fsstdannounce(c, argv, argc);
  213. if(e != nil)
  214. return e;
  215. Fsconnected(c, nil);
  216. iphtadd(&upriv->ht, c);
  217. return nil;
  218. }
  219. static void
  220. rudpcreate(Conv *c)
  221. {
  222. c->rq = qopen(64*1024, Qmsg, 0, 0);
  223. c->wq = qopen(64*1024, Qkick, rudpkick, c);
  224. }
/*
 * Tear down a conversation: remove it from the lookup hash, flush
 * pending acks, close its queues and clear its addresses, then hang
 * up and release every per-destination Reliable state.
 */
static void
rudpclose(Conv *c)
{
	Rudpcb *ucb;
	Reliable *r, *nr;
	Rudppriv *upriv;

	upriv = c->p->priv;
	/* rudpiput can no longer find this conversation */
	iphtrem(&upriv->ht, c);

	/* force out any delayed acks */
	ucb = (Rudpcb*)c->ptcl;
	qlock(ucb);
	for(r = ucb->r; r; r = r->next){
		if(r->acksent != r->rcvseq)
			relsendack(c, r, 0);
	}
	qunlock(ucb);

	qclose(c->rq);
	qclose(c->wq);
	qclose(c->eq);
	ipmove(c->laddr, IPnoaddr);
	ipmove(c->raddr, IPnoaddr);
	c->lport = 0;
	c->rport = 0;

	/* NOTE(review): headers/randdrop are reset without holding ucb */
	ucb->headers = 0;
	ucb->randdrop = 0;
	qlock(ucb);
	for(r = ucb->r; r; r = nr){
		/*
		 * NOTE(review): lport/laddr were cleared above, so an ack sent
		 * here would carry source port 0.  Normally relsendack in the
		 * first loop has already set acksent = rcvseq, so this rarely
		 * fires.
		 */
		if(r->acksent != r->rcvseq)
			relsendack(c, r, 0);
		nr = r->next;
		/*
		 * relhangup posts to c->eq (already closed above, so the
		 * message is presumably discarded) and wakes any writer
		 * blocked in flow control.
		 */
		relhangup(c, r);
		relput(r);	/* drop the list's reference */
	}
	ucb->r = 0;

	qunlock(ucb);
}
  261. /*
  262. * randomly don't send packets
  263. */
  264. static void
  265. doipoput(Conv *c, Fs *f, Block *bp, int x, int ttl, int tos)
  266. {
  267. Rudpcb *ucb;
  268. ucb = (Rudpcb*)c->ptcl;
  269. if(ucb->randdrop && nrand(100) < ucb->randdrop)
  270. freeblist(bp);
  271. else
  272. ipoput4(f, bp, x, ttl, tos, nil);
  273. }
  274. int
  275. flow(void *v)
  276. {
  277. Reliable *r = v;
  278. return UNACKED(r) <= Maxunacked;
  279. }
  280. void
  281. rudpkick(void *x)
  282. {
  283. Conv *c = x;
  284. Udphdr *uh;
  285. ushort rport;
  286. uchar laddr[IPaddrlen], raddr[IPaddrlen];
  287. Block *bp;
  288. Rudpcb *ucb;
  289. Rudphdr *rh;
  290. Reliable *r;
  291. int dlen, ptcllen;
  292. Rudppriv *upriv;
  293. Fs *f;
  294. upriv = c->p->priv;
  295. f = c->p->f;
  296. netlog(c->p->f, Logrudp, "rudp: kick\n");
  297. bp = qget(c->wq);
  298. if(bp == nil)
  299. return;
  300. ucb = (Rudpcb*)c->ptcl;
  301. switch(ucb->headers) {
  302. case 7:
  303. /* get user specified addresses */
  304. bp = pullupblock(bp, UDP_USEAD7);
  305. if(bp == nil)
  306. return;
  307. ipmove(raddr, bp->rp);
  308. bp->rp += IPaddrlen;
  309. ipmove(laddr, bp->rp);
  310. bp->rp += IPaddrlen;
  311. /* pick interface closest to dest */
  312. if(ipforme(f, laddr) != Runi)
  313. findlocalip(f, laddr, raddr);
  314. bp->rp += IPaddrlen; /* Ignore ifc address */
  315. rport = nhgets(bp->rp);
  316. bp->rp += 2+2; /* Ignore local port */
  317. break;
  318. case 6: /* OBS */
  319. /* get user specified addresses */
  320. bp = pullupblock(bp, UDP_USEAD6);
  321. if(bp == nil)
  322. return;
  323. ipmove(raddr, bp->rp);
  324. bp->rp += IPaddrlen;
  325. ipmove(laddr, bp->rp);
  326. bp->rp += IPaddrlen;
  327. /* pick interface closest to dest */
  328. if(ipforme(f, laddr) != Runi)
  329. findlocalip(f, laddr, raddr);
  330. rport = nhgets(bp->rp);
  331. bp->rp += 4; /* Igonore local port */
  332. break;
  333. default:
  334. ipmove(raddr, c->raddr);
  335. ipmove(laddr, c->laddr);
  336. rport = c->rport;
  337. break;
  338. }
  339. dlen = blocklen(bp);
  340. /* Make space to fit rudp & ip header */
  341. bp = padblock(bp, UDP_IPHDR+UDP_RHDRSIZE);
  342. if(bp == nil)
  343. return;
  344. uh = (Udphdr *)(bp->rp);
  345. uh->vihl = IP_VER4;
  346. rh = (Rudphdr*)uh;
  347. ptcllen = dlen + (UDP_RHDRSIZE-UDP_PHDRSIZE);
  348. uh->Unused = 0;
  349. uh->udpproto = IP_UDPPROTO;
  350. uh->frag[0] = 0;
  351. uh->frag[1] = 0;
  352. hnputs(uh->udpplen, ptcllen);
  353. switch(ucb->headers){
  354. case 6: /* OBS */
  355. case 7:
  356. v6tov4(uh->udpdst, raddr);
  357. hnputs(uh->udpdport, rport);
  358. v6tov4(uh->udpsrc, laddr);
  359. break;
  360. default:
  361. v6tov4(uh->udpdst, c->raddr);
  362. hnputs(uh->udpdport, c->rport);
  363. if(ipcmp(c->laddr, IPnoaddr) == 0)
  364. findlocalip(f, c->laddr, c->raddr);
  365. v6tov4(uh->udpsrc, c->laddr);
  366. break;
  367. }
  368. hnputs(uh->udpsport, c->lport);
  369. hnputs(uh->udplen, ptcllen);
  370. uh->udpcksum[0] = 0;
  371. uh->udpcksum[1] = 0;
  372. qlock(ucb);
  373. r = relstate(ucb, raddr, rport, "kick");
  374. r->sndseq = NEXTSEQ(r->sndseq);
  375. hnputl(rh->relseq, r->sndseq);
  376. hnputl(rh->relsgen, r->sndgen);
  377. hnputl(rh->relack, r->rcvseq); /* ACK last rcvd packet */
  378. hnputl(rh->relagen, r->rcvgen);
  379. if(r->rcvseq != r->acksent)
  380. r->acksent = r->rcvseq;
  381. hnputs(uh->udpcksum, ptclcsum(bp, UDP_IPHDR, dlen+UDP_RHDRSIZE));
  382. relackq(r, bp);
  383. qunlock(ucb);
  384. upriv->ustats.rudpOutDatagrams++;
  385. DPRINT("sent: %lud/%lud, %lud/%lud\n",
  386. r->sndseq, r->sndgen, r->rcvseq, r->rcvgen);
  387. doipoput(c, f, bp, 0, c->ttl, c->tos);
  388. if(waserror()) {
  389. relput(r);
  390. qunlock(&r->lock);
  391. nexterror();
  392. }
  393. /* flow control of sorts */
  394. qlock(&r->lock);
  395. if(UNACKED(r) > Maxunacked){
  396. r->blocked = 1;
  397. sleep(&r->vous, flow, r);
  398. r->blocked = 0;
  399. }
  400. qunlock(&r->lock);
  401. relput(r);
  402. poperror();
  403. }
  404. void
  405. rudpiput(Proto *rudp, Ipifc *ifc, Block *bp)
  406. {
  407. int len, olen, ottl;
  408. Udphdr *uh;
  409. Conv *c;
  410. Rudpcb *ucb;
  411. uchar raddr[IPaddrlen], laddr[IPaddrlen];
  412. ushort rport, lport;
  413. Rudppriv *upriv;
  414. Fs *f;
  415. uchar *p;
  416. upriv = rudp->priv;
  417. f = rudp->f;
  418. upriv->ustats.rudpInDatagrams++;
  419. uh = (Udphdr*)(bp->rp);
  420. /* Put back pseudo header for checksum
  421. * (remember old values for icmpnoconv())
  422. */
  423. ottl = uh->Unused;
  424. uh->Unused = 0;
  425. len = nhgets(uh->udplen);
  426. olen = nhgets(uh->udpplen);
  427. hnputs(uh->udpplen, len);
  428. v4tov6(raddr, uh->udpsrc);
  429. v4tov6(laddr, uh->udpdst);
  430. lport = nhgets(uh->udpdport);
  431. rport = nhgets(uh->udpsport);
  432. if(nhgets(uh->udpcksum)) {
  433. if(ptclcsum(bp, UDP_IPHDR, len+UDP_PHDRSIZE)) {
  434. upriv->ustats.rudpInErrors++;
  435. upriv->csumerr++;
  436. netlog(f, Logrudp, "rudp: checksum error %I\n", raddr);
  437. DPRINT("rudp: checksum error %I\n", raddr);
  438. freeblist(bp);
  439. return;
  440. }
  441. }
  442. qlock(rudp);
  443. c = iphtlook(&upriv->ht, raddr, rport, laddr, lport);
  444. if(c == nil){
  445. /* no conversation found */
  446. upriv->ustats.rudpNoPorts++;
  447. qunlock(rudp);
  448. netlog(f, Logudp, "udp: no conv %I!%d -> %I!%d\n", raddr, rport,
  449. laddr, lport);
  450. uh->Unused = ottl;
  451. hnputs(uh->udpplen, olen);
  452. icmpnoconv(f, bp);
  453. freeblist(bp);
  454. return;
  455. }
  456. ucb = (Rudpcb*)c->ptcl;
  457. qlock(ucb);
  458. qunlock(rudp);
  459. if(reliput(c, bp, raddr, rport) < 0){
  460. qunlock(ucb);
  461. freeb(bp);
  462. return;
  463. }
  464. /*
  465. * Trim the packet down to data size
  466. */
  467. len -= (UDP_RHDRSIZE-UDP_PHDRSIZE);
  468. bp = trimblock(bp, UDP_IPHDR+UDP_RHDRSIZE, len);
  469. if(bp == nil) {
  470. netlog(f, Logrudp, "rudp: len err %I.%d -> %I.%d\n",
  471. raddr, rport, laddr, lport);
  472. DPRINT("rudp: len err %I.%d -> %I.%d\n",
  473. raddr, rport, laddr, lport);
  474. upriv->lenerr++;
  475. return;
  476. }
  477. netlog(f, Logrudpmsg, "rudp: %I.%d -> %I.%d l %d\n",
  478. raddr, rport, laddr, lport, len);
  479. switch(ucb->headers){
  480. case 7:
  481. /* pass the src address */
  482. bp = padblock(bp, UDP_USEAD7);
  483. p = bp->rp;
  484. ipmove(p, raddr); p += IPaddrlen;
  485. ipmove(p, laddr); p += IPaddrlen;
  486. ipmove(p, ifc->lifc->local); p += IPaddrlen;
  487. hnputs(p, rport); p += 2;
  488. hnputs(p, lport);
  489. break;
  490. case 6: /* OBS */
  491. /* pass the src address */
  492. bp = padblock(bp, UDP_USEAD6);
  493. p = bp->rp;
  494. ipmove(p, raddr); p += IPaddrlen;
  495. ipmove(p, ipforme(f, laddr)==Runi ? laddr : ifc->lifc->local); p += IPaddrlen;
  496. hnputs(p, rport); p += 2;
  497. hnputs(p, lport);
  498. break;
  499. default:
  500. /* connection oriented rudp */
  501. if(ipcmp(c->raddr, IPnoaddr) == 0){
  502. /* save the src address in the conversation */
  503. ipmove(c->raddr, raddr);
  504. c->rport = rport;
  505. /* reply with the same ip address (if not broadcast) */
  506. if(ipforme(f, laddr) == Runi)
  507. ipmove(c->laddr, laddr);
  508. else
  509. v4tov6(c->laddr, ifc->lifc->local);
  510. }
  511. break;
  512. }
  513. if(bp->next)
  514. bp = concatblock(bp);
  515. if(qfull(c->rq)) {
  516. netlog(f, Logrudp, "rudp: qfull %I.%d -> %I.%d\n", raddr, rport,
  517. laddr, lport);
  518. freeblist(bp);
  519. }
  520. else
  521. qpass(c->rq, bp);
  522. qunlock(ucb);
  523. }
  524. static char *rudpunknown = "unknown rudp ctl request";
  525. char*
  526. rudpctl(Conv *c, char **f, int n)
  527. {
  528. Rudpcb *ucb;
  529. uchar ip[IPaddrlen];
  530. int x;
  531. ucb = (Rudpcb*)c->ptcl;
  532. if(n < 1)
  533. return rudpunknown;
  534. if(strcmp(f[0], "headers++4") == 0){
  535. ucb->headers = 7; /* new headers format */
  536. return nil;
  537. } else if(strcmp(f[0], "headers") == 0){ /* OBS */
  538. ucb->headers = 6;
  539. return nil;
  540. } else if(strcmp(f[0], "hangup") == 0){
  541. if(n < 3)
  542. return "bad syntax";
  543. parseip(ip, f[1]);
  544. x = atoi(f[2]);
  545. qlock(ucb);
  546. relforget(c, ip, x, 1);
  547. qunlock(ucb);
  548. return nil;
  549. } else if(strcmp(f[0], "randdrop") == 0){
  550. x = 10; /* default is 10% */
  551. if(n > 1)
  552. x = atoi(f[1]);
  553. if(x > 100 || x < 0)
  554. return "illegal rudp drop rate";
  555. ucb->randdrop = x;
  556. return nil;
  557. }
  558. return rudpunknown;
  559. }
  560. void
  561. rudpadvise(Proto *rudp, Block *bp, char *msg)
  562. {
  563. Udphdr *h;
  564. uchar source[IPaddrlen], dest[IPaddrlen];
  565. ushort psource, pdest;
  566. Conv *s, **p;
  567. h = (Udphdr*)(bp->rp);
  568. v4tov6(dest, h->udpdst);
  569. v4tov6(source, h->udpsrc);
  570. psource = nhgets(h->udpsport);
  571. pdest = nhgets(h->udpdport);
  572. /* Look for a connection */
  573. for(p = rudp->conv; *p; p++) {
  574. s = *p;
  575. if(s->rport == pdest)
  576. if(s->lport == psource)
  577. if(ipcmp(s->raddr, dest) == 0)
  578. if(ipcmp(s->laddr, source) == 0){
  579. qhangup(s->rq, msg);
  580. qhangup(s->wq, msg);
  581. break;
  582. }
  583. }
  584. freeblist(bp);
  585. }
  586. int
  587. rudpstats(Proto *rudp, char *buf, int len)
  588. {
  589. Rudppriv *upriv;
  590. upriv = rudp->priv;
  591. return snprint(buf, len, "%lud %lud %lud %lud %lud %lud\n",
  592. upriv->ustats.rudpInDatagrams,
  593. upriv->ustats.rudpNoPorts,
  594. upriv->ustats.rudpInErrors,
  595. upriv->ustats.rudpOutDatagrams,
  596. upriv->rxmits,
  597. upriv->orders);
  598. }
  599. void
  600. rudpinit(Fs *fs)
  601. {
  602. Proto *rudp;
  603. rudp = smalloc(sizeof(Proto));
  604. rudp->priv = smalloc(sizeof(Rudppriv));
  605. rudp->name = "rudp";
  606. rudp->connect = rudpconnect;
  607. rudp->announce = rudpannounce;
  608. rudp->ctl = rudpctl;
  609. rudp->state = rudpstate;
  610. rudp->create = rudpcreate;
  611. rudp->close = rudpclose;
  612. rudp->rcv = rudpiput;
  613. rudp->advise = rudpadvise;
  614. rudp->stats = rudpstats;
  615. rudp->ipproto = IP_UDPPROTO;
  616. rudp->nc = 16;
  617. rudp->ptclsize = sizeof(Rudpcb);
  618. Fsproto(fs, rudp);
  619. }
/*********************************************/
/* The reliability helper functions follow.  */
/*********************************************/
  623. /*
  624. * Enqueue a copy of an unacked block for possible retransmissions
  625. */
  626. void
  627. relackq(Reliable *r, Block *bp)
  628. {
  629. Block *np;
  630. np = copyblock(bp, blocklen(bp));
  631. if(r->unacked)
  632. r->unackedtail->list = np;
  633. else {
  634. /* restart timer */
  635. r->timeout = 0;
  636. r->xmits = 1;
  637. r->unacked = np;
  638. }
  639. r->unackedtail = np;
  640. np->list = nil;
  641. }
  642. /*
  643. * retransmit unacked blocks
  644. */
  645. void
  646. relackproc(void *a)
  647. {
  648. Rudpcb *ucb;
  649. Proto *rudp;
  650. Reliable *r;
  651. Conv **s, *c;
  652. rudp = (Proto *)a;
  653. loop:
  654. tsleep(&up->sleep, return0, 0, Rudptickms);
  655. for(s = rudp->conv; *s; s++) {
  656. c = *s;
  657. ucb = (Rudpcb*)c->ptcl;
  658. qlock(ucb);
  659. for(r = ucb->r; r; r = r->next) {
  660. if(r->unacked != nil){
  661. r->timeout += Rudptickms;
  662. if(r->timeout > Rudprxms*r->xmits)
  663. relrexmit(c, r);
  664. }
  665. if(r->acksent != r->rcvseq)
  666. relsendack(c, r, 0);
  667. }
  668. qunlock(ucb);
  669. }
  670. goto loop;
  671. }
  672. /*
  673. * get the state record for a conversation
  674. */
  675. Reliable*
  676. relstate(Rudpcb *ucb, uchar *addr, ushort port, char *from)
  677. {
  678. Reliable *r, **l;
  679. l = &ucb->r;
  680. for(r = *l; r; r = *l){
  681. if(memcmp(addr, r->addr, IPaddrlen) == 0 &&
  682. port == r->port)
  683. break;
  684. l = &r->next;
  685. }
  686. /* no state for this addr/port, create some */
  687. if(r == nil){
  688. while(generation == 0)
  689. generation = rand();
  690. DPRINT("from %s new state %lud for %I!%ud\n",
  691. from, generation, addr, port);
  692. r = smalloc(sizeof(Reliable));
  693. memmove(r->addr, addr, IPaddrlen);
  694. r->port = port;
  695. r->unacked = 0;
  696. if(generation == Hangupgen)
  697. generation++;
  698. r->sndgen = generation++;
  699. r->sndseq = 0;
  700. r->ackrcvd = 0;
  701. r->rcvgen = 0;
  702. r->rcvseq = 0;
  703. r->acksent = 0;
  704. r->xmits = 0;
  705. r->timeout = 0;
  706. r->ref = 0;
  707. incref(r); /* one reference for being in the list */
  708. *l = r;
  709. }
  710. incref(r);
  711. return r;
  712. }
  713. void
  714. relput(Reliable *r)
  715. {
  716. if(decref(r) == 0)
  717. free(r);
  718. }
  719. /*
  720. * forget a Reliable state
  721. */
  722. void
  723. relforget(Conv *c, uchar *ip, int port, int originator)
  724. {
  725. Rudpcb *ucb;
  726. Reliable *r, **l;
  727. ucb = (Rudpcb*)c->ptcl;
  728. l = &ucb->r;
  729. for(r = *l; r; r = *l){
  730. if(ipcmp(ip, r->addr) == 0 && port == r->port){
  731. *l = r->next;
  732. if(originator)
  733. relsendack(c, r, 1);
  734. relhangup(c, r);
  735. relput(r); /* remove from the list */
  736. break;
  737. }
  738. l = &r->next;
  739. }
  740. }
/*
 * Process a received reliable packet for addr!port.
 *
 * Returns -1 if the packet is not to be passed to the user process,
 * 0 otherwise.  Along the way it:
 *   - processes the cumulative ack it carries,
 *   - detects hangups and new remote generations,
 *   - accepts only the next in-order sequence number.
 *
 * Called with ucb locked.
 */
int
reliput(Conv *c, Block *bp, uchar *addr, ushort port)
{
	Block *nbp;
	Rudpcb *ucb;
	Rudppriv *upriv;
	Udphdr *uh;
	Reliable *r;
	Rudphdr *rh;
	ulong seq, ack, sgen, agen, ackreal;
	int rv = -1;

	/* get fields */
	uh = (Udphdr*)(bp->rp);
	rh = (Rudphdr*)uh;
	seq = nhgetl(rh->relseq);
	sgen = nhgetl(rh->relsgen);
	ack = nhgetl(rh->relack);
	agen = nhgetl(rh->relagen);

	upriv = c->p->priv;
	ucb = (Rudpcb*)c->ptcl;
	/* holds a reference, dropped at out: */
	r = relstate(ucb, addr, port, "input");

	DPRINT("rcvd %lud/%lud, %lud/%lud, r->sndgen = %lud\n",
		seq, sgen, ack, agen, r->sndgen);

	/* if acking an incorrect generation, ignore */
	if(ack && agen != r->sndgen)
		goto out;

	/* Look for a hangup */
	if(sgen == Hangupgen) {
		if(agen == r->sndgen)
			relforget(c, addr, port, 0);
		goto out;
	}

	/* make sure we're not talking to a new remote side */
	if(r->rcvgen != sgen){
		/* a new generation must start at the beginning of its sequence */
		if(seq != 0 && seq != 1)
			goto out;

		/* new connection */
		if(r->rcvgen != 0){
			DPRINT("new con r->rcvgen = %lud, sgen = %lud\n", r->rcvgen, sgen);
			relhangup(c, r);
		}
		r->rcvgen = sgen;
	}

	/*
	 * dequeue acked packets.  agen is re-checked because relhangup
	 * above may have given r a new sndgen.
	 */
	if(ack && agen == r->sndgen){
		ackreal = 0;
		/* the ack is cumulative: free everything up to and including it */
		while(r->unacked != nil && INSEQ(ack, r->ackrcvd, r->sndseq)){
			nbp = r->unacked;
			r->unacked = nbp->list;
			DPRINT("%lud/%lud acked, r->sndgen = %lud\n",
			       ack, agen, r->sndgen);
			freeb(nbp);
			r->ackrcvd = NEXTSEQ(r->ackrcvd);
			ackreal = 1;
		}

		/* flow control: wake a blocked writer once well below the limit */
		if(UNACKED(r) < Maxunacked/8 && r->blocked)
			wakeup(&r->vous);

		/*
		 * retransmit next packet if the acked packet
		 * was transmitted more than once
		 */
		if(ackreal && r->unacked != nil){
			r->timeout = 0;
			if(r->xmits > 1){
				r->xmits = 1;
				relrexmit(c, r);
			}
		}
		
	}

	/*
	 * no message or input queue full.  In the latter case rcvseq is
	 * not advanced, so the packet is not acked and the peer
	 * presumably retransmits it.
	 */
	if(seq == 0 || qfull(c->rq))
		goto out;

	/* refuse out of order delivery */
	if(seq != NEXTSEQ(r->rcvseq)){
		relsendack(c, r, 0);	/* tell him we got it already */
		upriv->orders++;
		DPRINT("out of sequence %lud not %lud\n", seq, NEXTSEQ(r->rcvseq));
		goto out;
	}
	r->rcvseq = seq;

	rv = 0;
out:
	relput(r);
	return rv;
}
  833. void
  834. relsendack(Conv *c, Reliable *r, int hangup)
  835. {
  836. Udphdr *uh;
  837. Block *bp;
  838. Rudphdr *rh;
  839. int ptcllen;
  840. Fs *f;
  841. bp = allocb(UDP_IPHDR + UDP_RHDRSIZE);
  842. if(bp == nil)
  843. return;
  844. bp->wp += UDP_IPHDR + UDP_RHDRSIZE;
  845. f = c->p->f;
  846. uh = (Udphdr *)(bp->rp);
  847. uh->vihl = IP_VER4;
  848. rh = (Rudphdr*)uh;
  849. ptcllen = (UDP_RHDRSIZE-UDP_PHDRSIZE);
  850. uh->Unused = 0;
  851. uh->udpproto = IP_UDPPROTO;
  852. uh->frag[0] = 0;
  853. uh->frag[1] = 0;
  854. hnputs(uh->udpplen, ptcllen);
  855. v6tov4(uh->udpdst, r->addr);
  856. hnputs(uh->udpdport, r->port);
  857. hnputs(uh->udpsport, c->lport);
  858. if(ipcmp(c->laddr, IPnoaddr) == 0)
  859. findlocalip(f, c->laddr, c->raddr);
  860. v6tov4(uh->udpsrc, c->laddr);
  861. hnputs(uh->udplen, ptcllen);
  862. if(hangup)
  863. hnputl(rh->relsgen, Hangupgen);
  864. else
  865. hnputl(rh->relsgen, r->sndgen);
  866. hnputl(rh->relseq, 0);
  867. hnputl(rh->relagen, r->rcvgen);
  868. hnputl(rh->relack, r->rcvseq);
  869. if(r->acksent < r->rcvseq)
  870. r->acksent = r->rcvseq;
  871. uh->udpcksum[0] = 0;
  872. uh->udpcksum[1] = 0;
  873. hnputs(uh->udpcksum, ptclcsum(bp, UDP_IPHDR, UDP_RHDRSIZE));
  874. DPRINT("sendack: %lud/%lud, %lud/%lud\n", 0L, r->sndgen, r->rcvseq, r->rcvgen);
  875. doipoput(c, f, bp, 0, c->ttl, c->tos);
  876. }
  877. /*
  878. * called with ucb locked (and c locked if user initiated close)
  879. */
  880. void
  881. relhangup(Conv *c, Reliable *r)
  882. {
  883. int n;
  884. Block *bp;
  885. char hup[ERRMAX];
  886. n = snprint(hup, sizeof(hup), "hangup %I!%d", r->addr, r->port);
  887. qproduce(c->eq, hup, n);
  888. /*
  889. * dump any unacked outgoing messages
  890. */
  891. for(bp = r->unacked; bp != nil; bp = r->unacked){
  892. r->unacked = bp->list;
  893. bp->list = nil;
  894. freeb(bp);
  895. }
  896. r->rcvgen = 0;
  897. r->rcvseq = 0;
  898. r->acksent = 0;
  899. if(generation == Hangupgen)
  900. generation++;
  901. r->sndgen = generation++;
  902. r->sndseq = 0;
  903. r->ackrcvd = 0;
  904. r->xmits = 0;
  905. r->timeout = 0;
  906. wakeup(&r->vous);
  907. }
  908. /*
  909. * called with ucb locked
  910. */
  911. void
  912. relrexmit(Conv *c, Reliable *r)
  913. {
  914. Rudppriv *upriv;
  915. Block *np;
  916. Fs *f;
  917. upriv = c->p->priv;
  918. f = c->p->f;
  919. r->timeout = 0;
  920. if(r->xmits++ > Rudpmaxxmit){
  921. relhangup(c, r);
  922. return;
  923. }
  924. upriv->rxmits++;
  925. np = copyblock(r->unacked, blocklen(r->unacked));
  926. DPRINT("rxmit r->ackrvcd+1 = %lud\n", r->ackrcvd+1);
  927. doipoput(c, f, np, 0, c->ttl, c->tos);
  928. }