rudp.c 20 KB


  1. /*
  2. * Reliable User Datagram Protocol, currently only for IPv4.
  3. * This protocol is compatible with UDP's packet format.
  4. * It could be done over UDP if need be.
  5. */
  6. #include "u.h"
  7. #include "../port/lib.h"
  8. #include "mem.h"
  9. #include "dat.h"
  10. #include "fns.h"
  11. #include "../port/error.h"
  12. #include "ip.h"
  #define DEBUG 0
  #define DPRINT if(DEBUG)print

  /*
   * Sequence number arithmetic.  NEXTSEQ never produces 0, so the
   * sequence space has 0xffffffff usable values and wraps from
   * 0xffffffff back to 1.
   *
   * SEQDIFF(a,b)        distance from b forward to a, allowing for wrap.
   * INSEQ(a,start,end)  true when a lies in the half-open window
   *                     (start, end], allowing for wrap.
   * UNACKED(r)          number of messages sent on r but not yet acked.
   * NEXTSEQ(a)          successor of a, skipping 0.
   *
   * All of these evaluate their arguments more than once; do not pass
   * expressions with side effects.
   */
  #define SEQDIFF(a,b) ( (a)>=(b)?\
              (a)-(b):\
              0xffffffffUL-((b)-(a)) )
  #define INSEQ(a,start,end) ( (start)<=(end)?\
              ((a)>(start)&&(a)<=(end)):\
              ((a)>(start)||(a)<=(end)) )
  /* r is parenthesized so that any pointer expression may be passed */
  #define UNACKED(r) SEQDIFF((r)->sndseq, (r)->ackrcvd)
  #define NEXTSEQ(a) ( (a)+1 == 0 ? 1 : (a)+1 )
  /*
   * Header sizes and protocol tunables.
   */
  enum
  {
      /* header geometry, in bytes */
      UDP_PHDRSIZE    = 12,   /* pseudo header alone */
      UDP_RHDRSIZE    = 36,   /* pseudo header + udp header + rudp header
                               * (pseudo header + udp header would be 20) */
      UDP_IPHDR       = 8,    /* leading part of the ip header that
                               * precedes the pseudo header */
      UDP_USEAD7      = 52,   /* size of the "headers" address block:
                               * three 16-byte addresses + two 2-byte ports */

      IP_UDPPROTO     = 254,  /* ip protocol number used by rudp */

      /* retransmission and flow control */
      Rudprxms        = 200,  /* base retransmit interval, ms */
      Rudptickms      = 50,   /* period of the ack/retransmit kproc, ms */
      Rudpmaxxmit     = 10,   /* transmissions before the peer is hung up */
      Maxunacked      = 100,  /* unacked messages before the sender blocks */
  };

  #define Hangupgen 0xffffffff /* generation value used only in hangup messages */
  37. typedef struct Udphdr Udphdr;
  38. struct Udphdr
  39. {
  40. /* ip header */
  41. uchar vihl; /* Version and header length */
  42. uchar tos; /* Type of service */
  43. uchar length[2]; /* packet length */
  44. uchar id[2]; /* Identification */
  45. uchar frag[2]; /* Fragment information */
  46. /* pseudo header starts here */
  47. uchar Unused;
  48. uchar udpproto; /* Protocol */
  49. uchar udpplen[2]; /* Header plus data length */
  50. uchar udpsrc[4]; /* Ip source */
  51. uchar udpdst[4]; /* Ip destination */
  52. /* udp header */
  53. uchar udpsport[2]; /* Source port */
  54. uchar udpdport[2]; /* Destination port */
  55. uchar udplen[2]; /* data length */
  56. uchar udpcksum[2]; /* Checksum */
  57. };
  58. typedef struct Rudphdr Rudphdr;
  59. struct Rudphdr
  60. {
  61. /* ip header */
  62. uchar vihl; /* Version and header length */
  63. uchar tos; /* Type of service */
  64. uchar length[2]; /* packet length */
  65. uchar id[2]; /* Identification */
  66. uchar frag[2]; /* Fragment information */
  67. /* pseudo header starts here */
  68. uchar Unused;
  69. uchar udpproto; /* Protocol */
  70. uchar udpplen[2]; /* Header plus data length */
  71. uchar udpsrc[4]; /* Ip source */
  72. uchar udpdst[4]; /* Ip destination */
  73. /* udp header */
  74. uchar udpsport[2]; /* Source port */
  75. uchar udpdport[2]; /* Destination port */
  76. uchar udplen[2]; /* data length (includes rudp header) */
  77. uchar udpcksum[2]; /* Checksum */
  78. /* rudp header */
  79. uchar relseq[4]; /* id of this packet (or 0) */
  80. uchar relsgen[4]; /* generation/time stamp */
  81. uchar relack[4]; /* packet being acked (or 0) */
  82. uchar relagen[4]; /* generation/time stamp */
  83. };
  84. /*
  85. * one state structure per destination
  86. */
  87. typedef struct Reliable Reliable;
  88. struct Reliable
  89. {
  90. Ref;
  91. Reliable *next;
  92. uchar addr[IPaddrlen]; /* always V6 when put here */
  93. ushort port;
  94. Block *unacked; /* unacked msg list */
  95. Block *unackedtail; /* and its tail */
  96. int timeout; /* time since first unacked msg sent */
  97. int xmits; /* number of times first unacked msg sent */
  98. ulong sndseq; /* next packet to be sent */
  99. ulong sndgen; /* and its generation */
  100. ulong rcvseq; /* last packet received */
  101. ulong rcvgen; /* and its generation */
  102. ulong acksent; /* last ack sent */
  103. ulong ackrcvd; /* last msg for which ack was rcvd */
  104. /* flow control */
  105. QLock lock;
  106. Rendez vous;
  107. int blocked;
  108. };
  109. /* MIB II counters */
  110. typedef struct Rudpstats Rudpstats;
  111. struct Rudpstats
  112. {
  113. ulong rudpInDatagrams;
  114. ulong rudpNoPorts;
  115. ulong rudpInErrors;
  116. ulong rudpOutDatagrams;
  117. };
  118. typedef struct Rudppriv Rudppriv;
  119. struct Rudppriv
  120. {
  121. Ipht ht;
  122. /* MIB counters */
  123. Rudpstats ustats;
  124. /* non-MIB stats */
  125. ulong csumerr; /* checksum errors */
  126. ulong lenerr; /* short packet */
  127. ulong rxmits; /* # of retransmissions */
  128. ulong orders; /* # of out of order pkts */
  129. /* keeping track of the ack kproc */
  130. int ackprocstarted;
  131. QLock apl;
  132. };
  133. static ulong generation = 0;
  134. static Rendez rend;
  135. /*
  136. * protocol specific part of Conv
  137. */
  138. typedef struct Rudpcb Rudpcb;
  139. struct Rudpcb
  140. {
  141. QLock;
  142. uchar headers;
  143. uchar randdrop;
  144. Reliable *r;
  145. };
  146. /*
  147. * local functions
  148. */
  149. void relsendack(Conv*, Reliable*, int);
  150. int reliput(Conv*, Block*, uchar*, ushort);
  151. Reliable *relstate(Rudpcb*, uchar*, ushort, char*);
  152. void relput(Reliable*);
  153. void relforget(Conv *, uchar*, int, int);
  154. void relackproc(void *);
  155. void relackq(Reliable *, Block*);
  156. void relhangup(Conv *, Reliable*);
  157. void relrexmit(Conv *, Reliable*);
  158. void relput(Reliable*);
  159. void rudpkick(void *x);
  160. static void
  161. rudpstartackproc(Proto *rudp)
  162. {
  163. Rudppriv *rpriv;
  164. char kpname[KNAMELEN];
  165. rpriv = rudp->priv;
  166. if(rpriv->ackprocstarted == 0){
  167. qlock(&rpriv->apl);
  168. if(rpriv->ackprocstarted == 0){
  169. sprint(kpname, "#I%drudpack", rudp->f->dev);
  170. kproc(kpname, relackproc, rudp);
  171. rpriv->ackprocstarted = 1;
  172. }
  173. qunlock(&rpriv->apl);
  174. }
  175. }
  176. static char*
  177. rudpconnect(Conv *c, char **argv, int argc)
  178. {
  179. char *e;
  180. Rudppriv *upriv;
  181. upriv = c->p->priv;
  182. rudpstartackproc(c->p);
  183. e = Fsstdconnect(c, argv, argc);
  184. Fsconnected(c, e);
  185. iphtadd(&upriv->ht, c);
  186. return e;
  187. }
  188. static int
  189. rudpstate(Conv *c, char *state, int n)
  190. {
  191. Rudpcb *ucb;
  192. Reliable *r;
  193. int m;
  194. m = snprint(state, n, "%s", c->inuse?"Open":"Closed");
  195. ucb = (Rudpcb*)c->ptcl;
  196. qlock(ucb);
  197. for(r = ucb->r; r; r = r->next)
  198. m += snprint(state+m, n-m, " %I/%ld", r->addr, UNACKED(r));
  199. m += snprint(state+m, n-m, "\n");
  200. qunlock(ucb);
  201. return m;
  202. }
  203. static char*
  204. rudpannounce(Conv *c, char** argv, int argc)
  205. {
  206. char *e;
  207. Rudppriv *upriv;
  208. upriv = c->p->priv;
  209. rudpstartackproc(c->p);
  210. e = Fsstdannounce(c, argv, argc);
  211. if(e != nil)
  212. return e;
  213. Fsconnected(c, nil);
  214. iphtadd(&upriv->ht, c);
  215. return nil;
  216. }
  217. static void
  218. rudpcreate(Conv *c)
  219. {
  220. c->rq = qopen(64*1024, Qmsg, 0, 0);
  221. c->wq = qopen(64*1024, Qkick, rudpkick, c);
  222. }
  223. static void
  224. rudpclose(Conv *c)
  225. {
  226. Rudpcb *ucb;
  227. Reliable *r, *nr;
  228. Rudppriv *upriv;
  229. upriv = c->p->priv;
  230. iphtrem(&upriv->ht, c);
  231. /* force out any delayed acks */
  232. ucb = (Rudpcb*)c->ptcl;
  233. qlock(ucb);
  234. for(r = ucb->r; r; r = r->next){
  235. if(r->acksent != r->rcvseq)
  236. relsendack(c, r, 0);
  237. }
  238. qunlock(ucb);
  239. qclose(c->rq);
  240. qclose(c->wq);
  241. qclose(c->eq);
  242. ipmove(c->laddr, IPnoaddr);
  243. ipmove(c->raddr, IPnoaddr);
  244. c->lport = 0;
  245. c->rport = 0;
  246. ucb->headers = 0;
  247. ucb->randdrop = 0;
  248. qlock(ucb);
  249. for(r = ucb->r; r; r = nr){
  250. if(r->acksent != r->rcvseq)
  251. relsendack(c, r, 0);
  252. nr = r->next;
  253. relhangup(c, r);
  254. relput(r);
  255. }
  256. ucb->r = 0;
  257. qunlock(ucb);
  258. }
  259. /*
  260. * randomly don't send packets
  261. */
  262. static void
  263. doipoput(Conv *c, Fs *f, Block *bp, int x, int ttl, int tos)
  264. {
  265. Rudpcb *ucb;
  266. ucb = (Rudpcb*)c->ptcl;
  267. if(ucb->randdrop && nrand(100) < ucb->randdrop)
  268. freeblist(bp);
  269. else
  270. ipoput4(f, bp, x, ttl, tos, nil);
  271. }
  272. int
  273. flow(void *v)
  274. {
  275. Reliable *r = v;
  276. return UNACKED(r) <= Maxunacked;
  277. }
  278. void
  279. rudpkick(void *x)
  280. {
  281. Conv *c = x;
  282. Udphdr *uh;
  283. ushort rport;
  284. uchar laddr[IPaddrlen], raddr[IPaddrlen];
  285. Block *bp;
  286. Rudpcb *ucb;
  287. Rudphdr *rh;
  288. Reliable *r;
  289. int dlen, ptcllen;
  290. Rudppriv *upriv;
  291. Fs *f;
  292. upriv = c->p->priv;
  293. f = c->p->f;
  294. netlog(c->p->f, Logrudp, "rudp: kick\n");
  295. bp = qget(c->wq);
  296. if(bp == nil)
  297. return;
  298. ucb = (Rudpcb*)c->ptcl;
  299. switch(ucb->headers) {
  300. case 7:
  301. /* get user specified addresses */
  302. bp = pullupblock(bp, UDP_USEAD7);
  303. if(bp == nil)
  304. return;
  305. ipmove(raddr, bp->rp);
  306. bp->rp += IPaddrlen;
  307. ipmove(laddr, bp->rp);
  308. bp->rp += IPaddrlen;
  309. /* pick interface closest to dest */
  310. if(ipforme(f, laddr) != Runi)
  311. findlocalip(f, laddr, raddr);
  312. bp->rp += IPaddrlen; /* Ignore ifc address */
  313. rport = nhgets(bp->rp);
  314. bp->rp += 2+2; /* Ignore local port */
  315. break;
  316. default:
  317. ipmove(raddr, c->raddr);
  318. ipmove(laddr, c->laddr);
  319. rport = c->rport;
  320. break;
  321. }
  322. dlen = blocklen(bp);
  323. /* Make space to fit rudp & ip header */
  324. bp = padblock(bp, UDP_IPHDR+UDP_RHDRSIZE);
  325. if(bp == nil)
  326. return;
  327. uh = (Udphdr *)(bp->rp);
  328. uh->vihl = IP_VER4;
  329. rh = (Rudphdr*)uh;
  330. ptcllen = dlen + (UDP_RHDRSIZE-UDP_PHDRSIZE);
  331. uh->Unused = 0;
  332. uh->udpproto = IP_UDPPROTO;
  333. uh->frag[0] = 0;
  334. uh->frag[1] = 0;
  335. hnputs(uh->udpplen, ptcllen);
  336. switch(ucb->headers){
  337. case 7:
  338. v6tov4(uh->udpdst, raddr);
  339. hnputs(uh->udpdport, rport);
  340. v6tov4(uh->udpsrc, laddr);
  341. break;
  342. default:
  343. v6tov4(uh->udpdst, c->raddr);
  344. hnputs(uh->udpdport, c->rport);
  345. if(ipcmp(c->laddr, IPnoaddr) == 0)
  346. findlocalip(f, c->laddr, c->raddr);
  347. v6tov4(uh->udpsrc, c->laddr);
  348. break;
  349. }
  350. hnputs(uh->udpsport, c->lport);
  351. hnputs(uh->udplen, ptcllen);
  352. uh->udpcksum[0] = 0;
  353. uh->udpcksum[1] = 0;
  354. qlock(ucb);
  355. r = relstate(ucb, raddr, rport, "kick");
  356. r->sndseq = NEXTSEQ(r->sndseq);
  357. hnputl(rh->relseq, r->sndseq);
  358. hnputl(rh->relsgen, r->sndgen);
  359. hnputl(rh->relack, r->rcvseq); /* ACK last rcvd packet */
  360. hnputl(rh->relagen, r->rcvgen);
  361. if(r->rcvseq != r->acksent)
  362. r->acksent = r->rcvseq;
  363. hnputs(uh->udpcksum, ptclcsum(bp, UDP_IPHDR, dlen+UDP_RHDRSIZE));
  364. relackq(r, bp);
  365. qunlock(ucb);
  366. upriv->ustats.rudpOutDatagrams++;
  367. DPRINT("sent: %lud/%lud, %lud/%lud\n",
  368. r->sndseq, r->sndgen, r->rcvseq, r->rcvgen);
  369. doipoput(c, f, bp, 0, c->ttl, c->tos);
  370. if(waserror()) {
  371. relput(r);
  372. qunlock(&r->lock);
  373. nexterror();
  374. }
  375. /* flow control of sorts */
  376. qlock(&r->lock);
  377. if(UNACKED(r) > Maxunacked){
  378. r->blocked = 1;
  379. sleep(&r->vous, flow, r);
  380. r->blocked = 0;
  381. }
  382. qunlock(&r->lock);
  383. relput(r);
  384. poperror();
  385. }
  386. void
  387. rudpiput(Proto *rudp, Ipifc *ifc, Block *bp)
  388. {
  389. int len, olen, ottl;
  390. Udphdr *uh;
  391. Conv *c;
  392. Rudpcb *ucb;
  393. uchar raddr[IPaddrlen], laddr[IPaddrlen];
  394. ushort rport, lport;
  395. Rudppriv *upriv;
  396. Fs *f;
  397. uchar *p;
  398. upriv = rudp->priv;
  399. f = rudp->f;
  400. upriv->ustats.rudpInDatagrams++;
  401. uh = (Udphdr*)(bp->rp);
  402. /* Put back pseudo header for checksum
  403. * (remember old values for icmpnoconv())
  404. */
  405. ottl = uh->Unused;
  406. uh->Unused = 0;
  407. len = nhgets(uh->udplen);
  408. olen = nhgets(uh->udpplen);
  409. hnputs(uh->udpplen, len);
  410. v4tov6(raddr, uh->udpsrc);
  411. v4tov6(laddr, uh->udpdst);
  412. lport = nhgets(uh->udpdport);
  413. rport = nhgets(uh->udpsport);
  414. if(nhgets(uh->udpcksum)) {
  415. if(ptclcsum(bp, UDP_IPHDR, len+UDP_PHDRSIZE)) {
  416. upriv->ustats.rudpInErrors++;
  417. upriv->csumerr++;
  418. netlog(f, Logrudp, "rudp: checksum error %I\n", raddr);
  419. DPRINT("rudp: checksum error %I\n", raddr);
  420. freeblist(bp);
  421. return;
  422. }
  423. }
  424. qlock(rudp);
  425. c = iphtlook(&upriv->ht, raddr, rport, laddr, lport);
  426. if(c == nil){
  427. /* no conversation found */
  428. upriv->ustats.rudpNoPorts++;
  429. qunlock(rudp);
  430. netlog(f, Logudp, "udp: no conv %I!%d -> %I!%d\n", raddr, rport,
  431. laddr, lport);
  432. uh->Unused = ottl;
  433. hnputs(uh->udpplen, olen);
  434. icmpnoconv(f, bp);
  435. freeblist(bp);
  436. return;
  437. }
  438. ucb = (Rudpcb*)c->ptcl;
  439. qlock(ucb);
  440. qunlock(rudp);
  441. if(reliput(c, bp, raddr, rport) < 0){
  442. qunlock(ucb);
  443. freeb(bp);
  444. return;
  445. }
  446. /*
  447. * Trim the packet down to data size
  448. */
  449. len -= (UDP_RHDRSIZE-UDP_PHDRSIZE);
  450. bp = trimblock(bp, UDP_IPHDR+UDP_RHDRSIZE, len);
  451. if(bp == nil) {
  452. netlog(f, Logrudp, "rudp: len err %I.%d -> %I.%d\n",
  453. raddr, rport, laddr, lport);
  454. DPRINT("rudp: len err %I.%d -> %I.%d\n",
  455. raddr, rport, laddr, lport);
  456. upriv->lenerr++;
  457. return;
  458. }
  459. netlog(f, Logrudpmsg, "rudp: %I.%d -> %I.%d l %d\n",
  460. raddr, rport, laddr, lport, len);
  461. switch(ucb->headers){
  462. case 7:
  463. /* pass the src address */
  464. bp = padblock(bp, UDP_USEAD7);
  465. p = bp->rp;
  466. ipmove(p, raddr); p += IPaddrlen;
  467. ipmove(p, laddr); p += IPaddrlen;
  468. ipmove(p, ifc->lifc->local); p += IPaddrlen;
  469. hnputs(p, rport); p += 2;
  470. hnputs(p, lport);
  471. break;
  472. default:
  473. /* connection oriented rudp */
  474. if(ipcmp(c->raddr, IPnoaddr) == 0){
  475. /* save the src address in the conversation */
  476. ipmove(c->raddr, raddr);
  477. c->rport = rport;
  478. /* reply with the same ip address (if not broadcast) */
  479. if(ipforme(f, laddr) == Runi)
  480. ipmove(c->laddr, laddr);
  481. else
  482. v4tov6(c->laddr, ifc->lifc->local);
  483. }
  484. break;
  485. }
  486. if(bp->next)
  487. bp = concatblock(bp);
  488. if(qfull(c->rq)) {
  489. netlog(f, Logrudp, "rudp: qfull %I.%d -> %I.%d\n", raddr, rport,
  490. laddr, lport);
  491. freeblist(bp);
  492. }
  493. else
  494. qpass(c->rq, bp);
  495. qunlock(ucb);
  496. }
  497. static char *rudpunknown = "unknown rudp ctl request";
  498. char*
  499. rudpctl(Conv *c, char **f, int n)
  500. {
  501. Rudpcb *ucb;
  502. uchar ip[IPaddrlen];
  503. int x;
  504. ucb = (Rudpcb*)c->ptcl;
  505. if(n < 1)
  506. return rudpunknown;
  507. if(strcmp(f[0], "headers") == 0){
  508. ucb->headers = 7; /* new headers format */
  509. return nil;
  510. } else if(strcmp(f[0], "hangup") == 0){
  511. if(n < 3)
  512. return "bad syntax";
  513. if (parseip(ip, f[1]) == -1)
  514. return Ebadip;
  515. x = atoi(f[2]);
  516. qlock(ucb);
  517. relforget(c, ip, x, 1);
  518. qunlock(ucb);
  519. return nil;
  520. } else if(strcmp(f[0], "randdrop") == 0){
  521. x = 10; /* default is 10% */
  522. if(n > 1)
  523. x = atoi(f[1]);
  524. if(x > 100 || x < 0)
  525. return "illegal rudp drop rate";
  526. ucb->randdrop = x;
  527. return nil;
  528. }
  529. return rudpunknown;
  530. }
  531. void
  532. rudpadvise(Proto *rudp, Block *bp, char *msg)
  533. {
  534. Udphdr *h;
  535. uchar source[IPaddrlen], dest[IPaddrlen];
  536. ushort psource, pdest;
  537. Conv *s, **p;
  538. h = (Udphdr*)(bp->rp);
  539. v4tov6(dest, h->udpdst);
  540. v4tov6(source, h->udpsrc);
  541. psource = nhgets(h->udpsport);
  542. pdest = nhgets(h->udpdport);
  543. /* Look for a connection */
  544. for(p = rudp->conv; *p; p++) {
  545. s = *p;
  546. if(s->rport == pdest)
  547. if(s->lport == psource)
  548. if(ipcmp(s->raddr, dest) == 0)
  549. if(ipcmp(s->laddr, source) == 0){
  550. qhangup(s->rq, msg);
  551. qhangup(s->wq, msg);
  552. break;
  553. }
  554. }
  555. freeblist(bp);
  556. }
  557. int
  558. rudpstats(Proto *rudp, char *buf, int len)
  559. {
  560. Rudppriv *upriv;
  561. upriv = rudp->priv;
  562. return snprint(buf, len, "%lud %lud %lud %lud %lud %lud\n",
  563. upriv->ustats.rudpInDatagrams,
  564. upriv->ustats.rudpNoPorts,
  565. upriv->ustats.rudpInErrors,
  566. upriv->ustats.rudpOutDatagrams,
  567. upriv->rxmits,
  568. upriv->orders);
  569. }
  570. void
  571. rudpinit(Fs *fs)
  572. {
  573. Proto *rudp;
  574. rudp = smalloc(sizeof(Proto));
  575. rudp->priv = smalloc(sizeof(Rudppriv));
  576. rudp->name = "rudp";
  577. rudp->connect = rudpconnect;
  578. rudp->announce = rudpannounce;
  579. rudp->ctl = rudpctl;
  580. rudp->state = rudpstate;
  581. rudp->create = rudpcreate;
  582. rudp->close = rudpclose;
  583. rudp->rcv = rudpiput;
  584. rudp->advise = rudpadvise;
  585. rudp->stats = rudpstats;
  586. rudp->ipproto = IP_UDPPROTO;
  587. rudp->nc = 16;
  588. rudp->ptclsize = sizeof(Rudpcb);
  589. Fsproto(fs, rudp);
  590. }
  591. /*********************************************/
  592. /* Here start the reliable helper functions  */
  593. /*********************************************/
  594. /*
  595. * Enqueue a copy of an unacked block for possible retransmissions
  596. */
  597. void
  598. relackq(Reliable *r, Block *bp)
  599. {
  600. Block *np;
  601. np = copyblock(bp, blocklen(bp));
  602. if(r->unacked)
  603. r->unackedtail->list = np;
  604. else {
  605. /* restart timer */
  606. r->timeout = 0;
  607. r->xmits = 1;
  608. r->unacked = np;
  609. }
  610. r->unackedtail = np;
  611. np->list = nil;
  612. }
  613. /*
  614. * retransmit unacked blocks
  615. */
  616. void
  617. relackproc(void *a)
  618. {
  619. Rudpcb *ucb;
  620. Proto *rudp;
  621. Reliable *r;
  622. Conv **s, *c;
  623. rudp = (Proto *)a;
  624. loop:
  625. tsleep(&up->sleep, return0, 0, Rudptickms);
  626. for(s = rudp->conv; *s; s++) {
  627. c = *s;
  628. ucb = (Rudpcb*)c->ptcl;
  629. qlock(ucb);
  630. for(r = ucb->r; r; r = r->next) {
  631. if(r->unacked != nil){
  632. r->timeout += Rudptickms;
  633. if(r->timeout > Rudprxms*r->xmits)
  634. relrexmit(c, r);
  635. }
  636. if(r->acksent != r->rcvseq)
  637. relsendack(c, r, 0);
  638. }
  639. qunlock(ucb);
  640. }
  641. goto loop;
  642. }
  643. /*
  644. * get the state record for a conversation
  645. */
  646. Reliable*
  647. relstate(Rudpcb *ucb, uchar *addr, ushort port, char *from)
  648. {
  649. Reliable *r, **l;
  650. l = &ucb->r;
  651. for(r = *l; r; r = *l){
  652. if(memcmp(addr, r->addr, IPaddrlen) == 0 &&
  653. port == r->port)
  654. break;
  655. l = &r->next;
  656. }
  657. /* no state for this addr/port, create some */
  658. if(r == nil){
  659. while(generation == 0)
  660. generation = rand();
  661. DPRINT("from %s new state %lud for %I!%ud\n",
  662. from, generation, addr, port);
  663. r = smalloc(sizeof(Reliable));
  664. memmove(r->addr, addr, IPaddrlen);
  665. r->port = port;
  666. r->unacked = 0;
  667. if(generation == Hangupgen)
  668. generation++;
  669. r->sndgen = generation++;
  670. r->sndseq = 0;
  671. r->ackrcvd = 0;
  672. r->rcvgen = 0;
  673. r->rcvseq = 0;
  674. r->acksent = 0;
  675. r->xmits = 0;
  676. r->timeout = 0;
  677. r->ref = 0;
  678. incref(r); /* one reference for being in the list */
  679. *l = r;
  680. }
  681. incref(r);
  682. return r;
  683. }
  684. void
  685. relput(Reliable *r)
  686. {
  687. if(decref(r) == 0)
  688. free(r);
  689. }
  690. /*
  691. * forget a Reliable state
  692. */
  693. void
  694. relforget(Conv *c, uchar *ip, int port, int originator)
  695. {
  696. Rudpcb *ucb;
  697. Reliable *r, **l;
  698. ucb = (Rudpcb*)c->ptcl;
  699. l = &ucb->r;
  700. for(r = *l; r; r = *l){
  701. if(ipcmp(ip, r->addr) == 0 && port == r->port){
  702. *l = r->next;
  703. if(originator)
  704. relsendack(c, r, 1);
  705. relhangup(c, r);
  706. relput(r); /* remove from the list */
  707. break;
  708. }
  709. l = &r->next;
  710. }
  711. }
  712. /*
  713. * process a rcvd reliable packet. return -1 if not to be passed to user process,
  714. * 0 therwise.
  715. *
  716. * called with ucb locked.
  717. */
  718. int
  719. reliput(Conv *c, Block *bp, uchar *addr, ushort port)
  720. {
  721. Block *nbp;
  722. Rudpcb *ucb;
  723. Rudppriv *upriv;
  724. Udphdr *uh;
  725. Reliable *r;
  726. Rudphdr *rh;
  727. ulong seq, ack, sgen, agen, ackreal;
  728. int rv = -1;
  729. /* get fields */
  730. uh = (Udphdr*)(bp->rp);
  731. rh = (Rudphdr*)uh;
  732. seq = nhgetl(rh->relseq);
  733. sgen = nhgetl(rh->relsgen);
  734. ack = nhgetl(rh->relack);
  735. agen = nhgetl(rh->relagen);
  736. upriv = c->p->priv;
  737. ucb = (Rudpcb*)c->ptcl;
  738. r = relstate(ucb, addr, port, "input");
  739. DPRINT("rcvd %lud/%lud, %lud/%lud, r->sndgen = %lud\n",
  740. seq, sgen, ack, agen, r->sndgen);
  741. /* if acking an incorrect generation, ignore */
  742. if(ack && agen != r->sndgen)
  743. goto out;
  744. /* Look for a hangup */
  745. if(sgen == Hangupgen) {
  746. if(agen == r->sndgen)
  747. relforget(c, addr, port, 0);
  748. goto out;
  749. }
  750. /* make sure we're not talking to a new remote side */
  751. if(r->rcvgen != sgen){
  752. if(seq != 0 && seq != 1)
  753. goto out;
  754. /* new connection */
  755. if(r->rcvgen != 0){
  756. DPRINT("new con r->rcvgen = %lud, sgen = %lud\n", r->rcvgen, sgen);
  757. relhangup(c, r);
  758. }
  759. r->rcvgen = sgen;
  760. }
  761. /* dequeue acked packets */
  762. if(ack && agen == r->sndgen){
  763. ackreal = 0;
  764. while(r->unacked != nil && INSEQ(ack, r->ackrcvd, r->sndseq)){
  765. nbp = r->unacked;
  766. r->unacked = nbp->list;
  767. DPRINT("%lud/%lud acked, r->sndgen = %lud\n",
  768. ack, agen, r->sndgen);
  769. freeb(nbp);
  770. r->ackrcvd = NEXTSEQ(r->ackrcvd);
  771. ackreal = 1;
  772. }
  773. /* flow control */
  774. if(UNACKED(r) < Maxunacked/8 && r->blocked)
  775. wakeup(&r->vous);
  776. /*
  777. * retransmit next packet if the acked packet
  778. * was transmitted more than once
  779. */
  780. if(ackreal && r->unacked != nil){
  781. r->timeout = 0;
  782. if(r->xmits > 1){
  783. r->xmits = 1;
  784. relrexmit(c, r);
  785. }
  786. }
  787. }
  788. /* no message or input queue full */
  789. if(seq == 0 || qfull(c->rq))
  790. goto out;
  791. /* refuse out of order delivery */
  792. if(seq != NEXTSEQ(r->rcvseq)){
  793. relsendack(c, r, 0); /* tell him we got it already */
  794. upriv->orders++;
  795. DPRINT("out of sequence %lud not %lud\n", seq, NEXTSEQ(r->rcvseq));
  796. goto out;
  797. }
  798. r->rcvseq = seq;
  799. rv = 0;
  800. out:
  801. relput(r);
  802. return rv;
  803. }
  804. void
  805. relsendack(Conv *c, Reliable *r, int hangup)
  806. {
  807. Udphdr *uh;
  808. Block *bp;
  809. Rudphdr *rh;
  810. int ptcllen;
  811. Fs *f;
  812. bp = allocb(UDP_IPHDR + UDP_RHDRSIZE);
  813. if(bp == nil)
  814. return;
  815. bp->wp += UDP_IPHDR + UDP_RHDRSIZE;
  816. f = c->p->f;
  817. uh = (Udphdr *)(bp->rp);
  818. uh->vihl = IP_VER4;
  819. rh = (Rudphdr*)uh;
  820. ptcllen = (UDP_RHDRSIZE-UDP_PHDRSIZE);
  821. uh->Unused = 0;
  822. uh->udpproto = IP_UDPPROTO;
  823. uh->frag[0] = 0;
  824. uh->frag[1] = 0;
  825. hnputs(uh->udpplen, ptcllen);
  826. v6tov4(uh->udpdst, r->addr);
  827. hnputs(uh->udpdport, r->port);
  828. hnputs(uh->udpsport, c->lport);
  829. if(ipcmp(c->laddr, IPnoaddr) == 0)
  830. findlocalip(f, c->laddr, c->raddr);
  831. v6tov4(uh->udpsrc, c->laddr);
  832. hnputs(uh->udplen, ptcllen);
  833. if(hangup)
  834. hnputl(rh->relsgen, Hangupgen);
  835. else
  836. hnputl(rh->relsgen, r->sndgen);
  837. hnputl(rh->relseq, 0);
  838. hnputl(rh->relagen, r->rcvgen);
  839. hnputl(rh->relack, r->rcvseq);
  840. if(r->acksent < r->rcvseq)
  841. r->acksent = r->rcvseq;
  842. uh->udpcksum[0] = 0;
  843. uh->udpcksum[1] = 0;
  844. hnputs(uh->udpcksum, ptclcsum(bp, UDP_IPHDR, UDP_RHDRSIZE));
  845. DPRINT("sendack: %lud/%lud, %lud/%lud\n", 0L, r->sndgen, r->rcvseq, r->rcvgen);
  846. doipoput(c, f, bp, 0, c->ttl, c->tos);
  847. }
  848. /*
  849. * called with ucb locked (and c locked if user initiated close)
  850. */
  851. void
  852. relhangup(Conv *c, Reliable *r)
  853. {
  854. int n;
  855. Block *bp;
  856. char hup[ERRMAX];
  857. n = snprint(hup, sizeof(hup), "hangup %I!%d", r->addr, r->port);
  858. qproduce(c->eq, hup, n);
  859. /*
  860. * dump any unacked outgoing messages
  861. */
  862. for(bp = r->unacked; bp != nil; bp = r->unacked){
  863. r->unacked = bp->list;
  864. bp->list = nil;
  865. freeb(bp);
  866. }
  867. r->rcvgen = 0;
  868. r->rcvseq = 0;
  869. r->acksent = 0;
  870. if(generation == Hangupgen)
  871. generation++;
  872. r->sndgen = generation++;
  873. r->sndseq = 0;
  874. r->ackrcvd = 0;
  875. r->xmits = 0;
  876. r->timeout = 0;
  877. wakeup(&r->vous);
  878. }
  879. /*
  880. * called with ucb locked
  881. */
  882. void
  883. relrexmit(Conv *c, Reliable *r)
  884. {
  885. Rudppriv *upriv;
  886. Block *np;
  887. Fs *f;
  888. upriv = c->p->priv;
  889. f = c->p->f;
  890. r->timeout = 0;
  891. if(r->xmits++ > Rudpmaxxmit){
  892. relhangup(c, r);
  893. return;
  894. }
  895. upriv->rxmits++;
  896. np = copyblock(r->unacked, blocklen(r->unacked));
  897. DPRINT("rxmit r->ackrvcd+1 = %lud\n", r->ackrcvd+1);
  898. doipoput(c, f, np, 0, c->ttl, c->tos);
  899. }