rudp.c 21 KB


  1. /*
  2. * This file is part of the UCB release of Plan 9. It is subject to the license
  3. * terms in the LICENSE file found in the top-level directory of this
  4. * distribution and at http://akaros.cs.berkeley.edu/files/Plan9License. No
  5. * part of the UCB release of Plan 9, including this file, may be copied,
  6. * modified, propagated, or distributed except according to the terms contained
  7. * in the LICENSE file.
  8. */
  9. /*
  10. * Reliable User Datagram Protocol, currently only for IPv4.
  11. * This protocol is compatible with UDP's packet format.
  12. * It could be done over UDP if need be.
  13. */
  14. #include "u.h"
  15. #include "../port/lib.h"
  16. #include "mem.h"
  17. #include "dat.h"
  18. #include "fns.h"
  19. #include "../port/error.h"
  20. #include "ip.h"
  21. #define DEBUG 0
  22. #define DPRINT if(DEBUG)print
  23. #define SEQDIFF(a,b) ( (a)>=(b)?\
  24. (a)-(b):\
  25. 0xffffffffUL-((b)-(a)) )
  26. #define INSEQ(a,start,end) ( (start)<=(end)?\
  27. ((a)>(start)&&(a)<=(end)):\
  28. ((a)>(start)||(a)<=(end)) )
  29. #define UNACKED(r) SEQDIFF(r->sndseq, r->ackrcvd)
  30. #define NEXTSEQ(a) ( (a)+1 == 0 ? 1 : (a)+1 )
  31. enum
  32. {
  33. UDP_PHDRSIZE = 12, /* pseudo header */
  34. // UDP_HDRSIZE = 20, /* pseudo header + udp header */
  35. UDP_RHDRSIZE = 36, /* pseudo header + udp header + rudp header */
  36. UDP_IPHDR = 8, /* ip header */
  37. IP_UDPPROTO = 254,
  38. UDP_USEAD7 = 52, /* size of new ipv6 headers struct */
  39. Rudprxms = 200,
  40. Rudptickms = 50,
  41. Rudpmaxxmit = 10,
  42. Maxunacked = 100,
  43. };
  44. #define Hangupgen 0xffffffff /* used only in hangup messages */
  45. typedef struct Udphdr Udphdr;
  46. struct Udphdr
  47. {
  48. /* ip header */
  49. uint8_t vihl; /* Version and header length */
  50. uint8_t tos; /* Type of service */
  51. uint8_t length[2]; /* packet length */
  52. uint8_t id[2]; /* Identification */
  53. uint8_t frag[2]; /* Fragment information */
  54. /* pseudo header starts here */
  55. uint8_t Unused;
  56. uint8_t udpproto; /* Protocol */
  57. uint8_t udpplen[2]; /* Header plus data length */
  58. uint8_t udpsrc[4]; /* Ip source */
  59. uint8_t udpdst[4]; /* Ip destination */
  60. /* udp header */
  61. uint8_t udpsport[2]; /* Source port */
  62. uint8_t udpdport[2]; /* Destination port */
  63. uint8_t udplen[2]; /* data length */
  64. uint8_t udpcksum[2]; /* Checksum */
  65. };
  66. typedef struct Rudphdr Rudphdr;
  67. struct Rudphdr
  68. {
  69. /* ip header */
  70. uint8_t vihl; /* Version and header length */
  71. uint8_t tos; /* Type of service */
  72. uint8_t length[2]; /* packet length */
  73. uint8_t id[2]; /* Identification */
  74. uint8_t frag[2]; /* Fragment information */
  75. /* pseudo header starts here */
  76. uint8_t Unused;
  77. uint8_t udpproto; /* Protocol */
  78. uint8_t udpplen[2]; /* Header plus data length */
  79. uint8_t udpsrc[4]; /* Ip source */
  80. uint8_t udpdst[4]; /* Ip destination */
  81. /* udp header */
  82. uint8_t udpsport[2]; /* Source port */
  83. uint8_t udpdport[2]; /* Destination port */
  84. uint8_t udplen[2]; /* data length (includes rudp header) */
  85. uint8_t udpcksum[2]; /* Checksum */
  86. /* rudp header */
  87. uint8_t relseq[4]; /* id of this packet (or 0) */
  88. uint8_t relsgen[4]; /* generation/time stamp */
  89. uint8_t relack[4]; /* packet being acked (or 0) */
  90. uint8_t relagen[4]; /* generation/time stamp */
  91. };
  92. /*
  93. * one state structure per destination
  94. */
  95. typedef struct Reliable Reliable;
  96. struct Reliable
  97. {
  98. Ref;
  99. Reliable *next;
  100. uint8_t addr[IPaddrlen]; /* always V6 when put here */
  101. uint16_t port;
  102. Block *unacked; /* unacked msg list */
  103. Block *unackedtail; /* and its tail */
  104. int timeout; /* time since first unacked msg sent */
  105. int xmits; /* number of times first unacked msg sent */
  106. uint32_t sndseq; /* next packet to be sent */
  107. uint32_t sndgen; /* and its generation */
  108. uint32_t rcvseq; /* last packet received */
  109. uint32_t rcvgen; /* and its generation */
  110. uint32_t acksent; /* last ack sent */
  111. uint32_t ackrcvd; /* last msg for which ack was rcvd */
  112. /* flow control */
  113. QLock lock;
  114. Rendez vous;
  115. int blocked;
  116. };
  117. /* MIB II counters */
  118. typedef struct Rudpstats Rudpstats;
  119. struct Rudpstats
  120. {
  121. uint32_t rudpInDatagrams;
  122. uint32_t rudpNoPorts;
  123. uint32_t rudpInErrors;
  124. uint32_t rudpOutDatagrams;
  125. };
  126. typedef struct Rudppriv Rudppriv;
  127. struct Rudppriv
  128. {
  129. Ipht ht;
  130. /* MIB counters */
  131. Rudpstats ustats;
  132. /* non-MIB stats */
  133. uint32_t csumerr; /* checksum errors */
  134. uint32_t lenerr; /* short packet */
  135. uint32_t rxmits; /* # of retransmissions */
  136. uint32_t orders; /* # of out of order pkts */
  137. /* keeping track of the ack kproc */
  138. int ackprocstarted;
  139. QLock apl;
  140. };
  141. static uint32_t generation = 0;
  142. //static Rendez rend;
  143. /*
  144. * protocol specific part of Conv
  145. */
  146. typedef struct Rudpcb Rudpcb;
  147. struct Rudpcb
  148. {
  149. QLock;
  150. uint8_t headers;
  151. uint8_t randdrop;
  152. Reliable *r;
  153. };
  154. /*
  155. * local functions
  156. */
  157. void relsendack(Conv*, Reliable*, int);
  158. int reliput(Conv*, Block*, uint8_t*, uint16_t);
  159. Reliable *relstate(Rudpcb*, uint8_t*, uint16_t, char*);
  160. void relput(Reliable*);
  161. void relforget(Conv *, uint8_t*, int, int);
  162. void relackproc(void *);
  163. void relackq(Reliable *, Block*);
  164. void relhangup(Conv *, Reliable*);
  165. void relrexmit(Conv *, Reliable*);
  166. void relput(Reliable*);
  167. void rudpkick(void *x);
  168. static void
  169. rudpstartackproc(Proto *rudp)
  170. {
  171. Rudppriv *rpriv;
  172. char kpname[KNAMELEN];
  173. rpriv = rudp->priv;
  174. if(rpriv->ackprocstarted == 0){
  175. qlock(&rpriv->apl);
  176. if(rpriv->ackprocstarted == 0){
  177. snprint(kpname, sizeof kpname, "#I%drudpack",
  178. rudp->f->dev);
  179. kproc(kpname, relackproc, rudp);
  180. rpriv->ackprocstarted = 1;
  181. }
  182. qunlock(&rpriv->apl);
  183. }
  184. }
  185. static char*
  186. rudpconnect(Conv *c, char **argv, int argc)
  187. {
  188. char *e;
  189. Rudppriv *upriv;
  190. upriv = c->p->priv;
  191. rudpstartackproc(c->p);
  192. e = Fsstdconnect(c, argv, argc);
  193. Fsconnected(c, e);
  194. iphtadd(&upriv->ht, c);
  195. return e;
  196. }
  197. static int
  198. rudpstate(Conv *c, char *state, int n)
  199. {
  200. Rudpcb *ucb;
  201. Reliable *r;
  202. int m;
  203. m = snprint(state, n, "%s", c->inuse?"Open":"Closed");
  204. ucb = (Rudpcb*)c->ptcl;
  205. qlock(ucb);
  206. for(r = ucb->r; r; r = r->next)
  207. m += snprint(state+m, n-m, " %I/%ld", r->addr, UNACKED(r));
  208. m += snprint(state+m, n-m, "\n");
  209. qunlock(ucb);
  210. return m;
  211. }
  212. static char*
  213. rudpannounce(Conv *c, char** argv, int argc)
  214. {
  215. char *e;
  216. Rudppriv *upriv;
  217. upriv = c->p->priv;
  218. rudpstartackproc(c->p);
  219. e = Fsstdannounce(c, argv, argc);
  220. if(e != nil)
  221. return e;
  222. Fsconnected(c, nil);
  223. iphtadd(&upriv->ht, c);
  224. return nil;
  225. }
  226. static void
  227. rudpcreate(Conv *c)
  228. {
  229. c->rq = qopen(64*1024, Qmsg, 0, 0);
  230. c->wq = qopen(64*1024, Qkick, rudpkick, c);
  231. }
  232. static void
  233. rudpclose(Conv *c)
  234. {
  235. Rudpcb *ucb;
  236. Reliable *r, *nr;
  237. Rudppriv *upriv;
  238. upriv = c->p->priv;
  239. iphtrem(&upriv->ht, c);
  240. /* force out any delayed acks */
  241. ucb = (Rudpcb*)c->ptcl;
  242. qlock(ucb);
  243. for(r = ucb->r; r; r = r->next){
  244. if(r->acksent != r->rcvseq)
  245. relsendack(c, r, 0);
  246. }
  247. qunlock(ucb);
  248. qclose(c->rq);
  249. qclose(c->wq);
  250. qclose(c->eq);
  251. ipmove(c->laddr, IPnoaddr);
  252. ipmove(c->raddr, IPnoaddr);
  253. c->lport = 0;
  254. c->rport = 0;
  255. ucb->headers = 0;
  256. ucb->randdrop = 0;
  257. qlock(ucb);
  258. for(r = ucb->r; r; r = nr){
  259. if(r->acksent != r->rcvseq)
  260. relsendack(c, r, 0);
  261. nr = r->next;
  262. relhangup(c, r);
  263. relput(r);
  264. }
  265. ucb->r = 0;
  266. qunlock(ucb);
  267. }
  268. /*
  269. * randomly don't send packets
  270. */
  271. static void
  272. doipoput(Conv *c, Fs *f, Block *bp, int x, int ttl, int tos)
  273. {
  274. Rudpcb *ucb;
  275. ucb = (Rudpcb*)c->ptcl;
  276. if(ucb->randdrop && nrand(100) < ucb->randdrop)
  277. freeblist(bp);
  278. else
  279. ipoput4(f, bp, x, ttl, tos, nil);
  280. }
  281. int
  282. flow(void *v)
  283. {
  284. Reliable *r = v;
  285. return UNACKED(r) <= Maxunacked;
  286. }
  287. void
  288. rudpkick(void *x)
  289. {
  290. Proc *up = externup();
  291. Conv *c = x;
  292. Udphdr *uh;
  293. uint16_t rport;
  294. uint8_t laddr[IPaddrlen], raddr[IPaddrlen];
  295. Block *bp;
  296. Rudpcb *ucb;
  297. Rudphdr *rh;
  298. Reliable *r;
  299. int dlen, ptcllen;
  300. Rudppriv *upriv;
  301. Fs *f;
  302. upriv = c->p->priv;
  303. f = c->p->f;
  304. netlog(c->p->f, Logrudp, "rudp: kick\n");
  305. bp = qget(c->wq);
  306. if(bp == nil)
  307. return;
  308. ucb = (Rudpcb*)c->ptcl;
  309. switch(ucb->headers) {
  310. case 7:
  311. /* get user specified addresses */
  312. bp = pullupblock(bp, UDP_USEAD7);
  313. if(bp == nil)
  314. return;
  315. ipmove(raddr, bp->rp);
  316. bp->rp += IPaddrlen;
  317. ipmove(laddr, bp->rp);
  318. bp->rp += IPaddrlen;
  319. /* pick interface closest to dest */
  320. if(ipforme(f, laddr) != Runi)
  321. findlocalip(f, laddr, raddr);
  322. bp->rp += IPaddrlen; /* Ignore ifc address */
  323. rport = nhgets(bp->rp);
  324. bp->rp += 2+2; /* Ignore local port */
  325. break;
  326. default:
  327. ipmove(raddr, c->raddr);
  328. ipmove(laddr, c->laddr);
  329. rport = c->rport;
  330. break;
  331. }
  332. dlen = blocklen(bp);
  333. /* Make space to fit rudp & ip header */
  334. bp = padblock(bp, UDP_IPHDR+UDP_RHDRSIZE);
  335. if(bp == nil)
  336. return;
  337. uh = (Udphdr *)(bp->rp);
  338. uh->vihl = IP_VER4;
  339. rh = (Rudphdr*)uh;
  340. ptcllen = dlen + (UDP_RHDRSIZE-UDP_PHDRSIZE);
  341. uh->Unused = 0;
  342. uh->udpproto = IP_UDPPROTO;
  343. uh->frag[0] = 0;
  344. uh->frag[1] = 0;
  345. hnputs(uh->udpplen, ptcllen);
  346. switch(ucb->headers){
  347. case 7:
  348. v6tov4(uh->udpdst, raddr);
  349. hnputs(uh->udpdport, rport);
  350. v6tov4(uh->udpsrc, laddr);
  351. break;
  352. default:
  353. v6tov4(uh->udpdst, c->raddr);
  354. hnputs(uh->udpdport, c->rport);
  355. if(ipcmp(c->laddr, IPnoaddr) == 0)
  356. findlocalip(f, c->laddr, c->raddr);
  357. v6tov4(uh->udpsrc, c->laddr);
  358. break;
  359. }
  360. hnputs(uh->udpsport, c->lport);
  361. hnputs(uh->udplen, ptcllen);
  362. uh->udpcksum[0] = 0;
  363. uh->udpcksum[1] = 0;
  364. qlock(ucb);
  365. r = relstate(ucb, raddr, rport, "kick");
  366. r->sndseq = NEXTSEQ(r->sndseq);
  367. hnputl(rh->relseq, r->sndseq);
  368. hnputl(rh->relsgen, r->sndgen);
  369. hnputl(rh->relack, r->rcvseq); /* ACK last rcvd packet */
  370. hnputl(rh->relagen, r->rcvgen);
  371. if(r->rcvseq != r->acksent)
  372. r->acksent = r->rcvseq;
  373. hnputs(uh->udpcksum, ptclcsum(bp, UDP_IPHDR, dlen+UDP_RHDRSIZE));
  374. relackq(r, bp);
  375. qunlock(ucb);
  376. upriv->ustats.rudpOutDatagrams++;
  377. DPRINT("sent: %lud/%lud, %lud/%lud\n",
  378. r->sndseq, r->sndgen, r->rcvseq, r->rcvgen);
  379. doipoput(c, f, bp, 0, c->ttl, c->tos);
  380. if(waserror()) {
  381. relput(r);
  382. qunlock(&r->lock);
  383. nexterror();
  384. }
  385. /* flow control of sorts */
  386. qlock(&r->lock);
  387. if(UNACKED(r) > Maxunacked){
  388. r->blocked = 1;
  389. sleep(&r->vous, flow, r);
  390. r->blocked = 0;
  391. }
  392. qunlock(&r->lock);
  393. relput(r);
  394. poperror();
  395. }
  396. void
  397. rudpiput(Proto *rudp, Ipifc *ifc, Block *bp)
  398. {
  399. int len, olen, ottl;
  400. Udphdr *uh;
  401. Conv *c;
  402. Rudpcb *ucb;
  403. uint8_t raddr[IPaddrlen], laddr[IPaddrlen];
  404. uint16_t rport, lport;
  405. Rudppriv *upriv;
  406. Fs *f;
  407. uint8_t *p;
  408. upriv = rudp->priv;
  409. f = rudp->f;
  410. upriv->ustats.rudpInDatagrams++;
  411. uh = (Udphdr*)(bp->rp);
  412. /* Put back pseudo header for checksum
  413. * (remember old values for icmpnoconv())
  414. */
  415. ottl = uh->Unused;
  416. uh->Unused = 0;
  417. len = nhgets(uh->udplen);
  418. olen = nhgets(uh->udpplen);
  419. hnputs(uh->udpplen, len);
  420. v4tov6(raddr, uh->udpsrc);
  421. v4tov6(laddr, uh->udpdst);
  422. lport = nhgets(uh->udpdport);
  423. rport = nhgets(uh->udpsport);
  424. if(nhgets(uh->udpcksum)) {
  425. if(ptclcsum(bp, UDP_IPHDR, len+UDP_PHDRSIZE)) {
  426. upriv->ustats.rudpInErrors++;
  427. upriv->csumerr++;
  428. netlog(f, Logrudp, "rudp: checksum error %I\n", raddr);
  429. DPRINT("rudp: checksum error %I\n", raddr);
  430. freeblist(bp);
  431. return;
  432. }
  433. }
  434. qlock(rudp);
  435. c = iphtlook(&upriv->ht, raddr, rport, laddr, lport);
  436. if(c == nil){
  437. /* no conversation found */
  438. upriv->ustats.rudpNoPorts++;
  439. qunlock(rudp);
  440. netlog(f, Logudp, "udp: no conv %I!%d -> %I!%d\n", raddr, rport,
  441. laddr, lport);
  442. uh->Unused = ottl;
  443. hnputs(uh->udpplen, olen);
  444. icmpnoconv(f, bp);
  445. freeblist(bp);
  446. return;
  447. }
  448. ucb = (Rudpcb*)c->ptcl;
  449. qlock(ucb);
  450. qunlock(rudp);
  451. if(reliput(c, bp, raddr, rport) < 0){
  452. qunlock(ucb);
  453. freeb(bp);
  454. return;
  455. }
  456. /*
  457. * Trim the packet down to data size
  458. */
  459. len -= (UDP_RHDRSIZE-UDP_PHDRSIZE);
  460. bp = trimblock(bp, UDP_IPHDR+UDP_RHDRSIZE, len);
  461. if(bp == nil) {
  462. netlog(f, Logrudp, "rudp: len err %I.%d -> %I.%d\n",
  463. raddr, rport, laddr, lport);
  464. DPRINT("rudp: len err %I.%d -> %I.%d\n",
  465. raddr, rport, laddr, lport);
  466. upriv->lenerr++;
  467. return;
  468. }
  469. netlog(f, Logrudpmsg, "rudp: %I.%d -> %I.%d l %d\n",
  470. raddr, rport, laddr, lport, len);
  471. switch(ucb->headers){
  472. case 7:
  473. /* pass the src address */
  474. bp = padblock(bp, UDP_USEAD7);
  475. p = bp->rp;
  476. ipmove(p, raddr); p += IPaddrlen;
  477. ipmove(p, laddr); p += IPaddrlen;
  478. ipmove(p, ifc->lifc->local); p += IPaddrlen;
  479. hnputs(p, rport); p += 2;
  480. hnputs(p, lport);
  481. break;
  482. default:
  483. /* connection oriented rudp */
  484. if(ipcmp(c->raddr, IPnoaddr) == 0){
  485. /* save the src address in the conversation */
  486. ipmove(c->raddr, raddr);
  487. c->rport = rport;
  488. /* reply with the same ip address (if not broadcast) */
  489. if(ipforme(f, laddr) == Runi)
  490. ipmove(c->laddr, laddr);
  491. else
  492. v4tov6(c->laddr, ifc->lifc->local);
  493. }
  494. break;
  495. }
  496. if(bp->next)
  497. bp = concatblock(bp);
  498. if(qfull(c->rq)) {
  499. netlog(f, Logrudp, "rudp: qfull %I.%d -> %I.%d\n", raddr, rport,
  500. laddr, lport);
  501. freeblist(bp);
  502. }
  503. else
  504. qpass(c->rq, bp);
  505. qunlock(ucb);
  506. }
  507. static char *rudpunknown = "unknown rudp ctl request";
  508. char*
  509. rudpctl(Conv *c, char **f, int n)
  510. {
  511. Rudpcb *ucb;
  512. uint8_t ip[IPaddrlen];
  513. int x;
  514. ucb = (Rudpcb*)c->ptcl;
  515. if(n < 1)
  516. return rudpunknown;
  517. if(strcmp(f[0], "headers") == 0){
  518. ucb->headers = 7; /* new headers format */
  519. return nil;
  520. } else if(strcmp(f[0], "hangup") == 0){
  521. if(n < 3)
  522. return "bad syntax";
  523. if (parseip(ip, f[1]) == -1)
  524. return Ebadip;
  525. x = atoi(f[2]);
  526. qlock(ucb);
  527. relforget(c, ip, x, 1);
  528. qunlock(ucb);
  529. return nil;
  530. } else if(strcmp(f[0], "randdrop") == 0){
  531. x = 10; /* default is 10% */
  532. if(n > 1)
  533. x = atoi(f[1]);
  534. if(x > 100 || x < 0)
  535. return "illegal rudp drop rate";
  536. ucb->randdrop = x;
  537. return nil;
  538. }
  539. return rudpunknown;
  540. }
  541. void
  542. rudpadvise(Proto *rudp, Block *bp, char *msg)
  543. {
  544. Udphdr *h;
  545. uint8_t source[IPaddrlen], dest[IPaddrlen];
  546. uint16_t psource, pdest;
  547. Conv *s, **p;
  548. h = (Udphdr*)(bp->rp);
  549. v4tov6(dest, h->udpdst);
  550. v4tov6(source, h->udpsrc);
  551. psource = nhgets(h->udpsport);
  552. pdest = nhgets(h->udpdport);
  553. /* Look for a connection */
  554. for(p = rudp->conv; *p; p++) {
  555. s = *p;
  556. if(s->rport == pdest)
  557. if(s->lport == psource)
  558. if(ipcmp(s->raddr, dest) == 0)
  559. if(ipcmp(s->laddr, source) == 0){
  560. qhangup(s->rq, msg);
  561. qhangup(s->wq, msg);
  562. break;
  563. }
  564. }
  565. freeblist(bp);
  566. }
  567. int
  568. rudpstats(Proto *rudp, char *buf, int len)
  569. {
  570. Rudppriv *upriv;
  571. upriv = rudp->priv;
  572. return snprint(buf, len, "%lud %lud %lud %lud %lud %lud\n",
  573. upriv->ustats.rudpInDatagrams,
  574. upriv->ustats.rudpNoPorts,
  575. upriv->ustats.rudpInErrors,
  576. upriv->ustats.rudpOutDatagrams,
  577. upriv->rxmits,
  578. upriv->orders);
  579. }
  580. void
  581. rudpinit(Fs *fs)
  582. {
  583. Proto *rudp;
  584. rudp = smalloc(sizeof(Proto));
  585. rudp->priv = smalloc(sizeof(Rudppriv));
  586. rudp->name = "rudp";
  587. rudp->connect = rudpconnect;
  588. rudp->announce = rudpannounce;
  589. rudp->ctl = rudpctl;
  590. rudp->state = rudpstate;
  591. rudp->create = rudpcreate;
  592. rudp->close = rudpclose;
  593. rudp->rcv = rudpiput;
  594. rudp->advise = rudpadvise;
  595. rudp->stats = rudpstats;
  596. rudp->ipproto = IP_UDPPROTO;
  597. rudp->nc = 32;
  598. rudp->ptclsize = sizeof(Rudpcb);
  599. Fsproto(fs, rudp);
  600. }
  601. /*********************************************/
  602. /* Here starts the reliable helper functions */
  603. /*********************************************/
  604. /*
  605. * Enqueue a copy of an unacked block for possible retransmissions
  606. */
  607. void
  608. relackq(Reliable *r, Block *bp)
  609. {
  610. Block *np;
  611. np = copyblock(bp, blocklen(bp));
  612. if(r->unacked)
  613. r->unackedtail->list = np;
  614. else {
  615. /* restart timer */
  616. r->timeout = 0;
  617. r->xmits = 1;
  618. r->unacked = np;
  619. }
  620. r->unackedtail = np;
  621. np->list = nil;
  622. }
  623. /*
  624. * retransmit unacked blocks
  625. */
  626. void
  627. relackproc(void *a)
  628. {
  629. Proc *up = externup();
  630. Rudpcb *ucb;
  631. Proto *rudp;
  632. Reliable *r;
  633. Conv **s, *c;
  634. rudp = (Proto *)a;
  635. loop:
  636. tsleep(&up->sleep, return0, 0, Rudptickms);
  637. for(s = rudp->conv; *s; s++) {
  638. c = *s;
  639. ucb = (Rudpcb*)c->ptcl;
  640. qlock(ucb);
  641. for(r = ucb->r; r; r = r->next) {
  642. if(r->unacked != nil){
  643. r->timeout += Rudptickms;
  644. if(r->timeout > Rudprxms*r->xmits)
  645. relrexmit(c, r);
  646. }
  647. if(r->acksent != r->rcvseq)
  648. relsendack(c, r, 0);
  649. }
  650. qunlock(ucb);
  651. }
  652. goto loop;
  653. }
  654. /*
  655. * get the state record for a conversation
  656. */
  657. Reliable*
  658. relstate(Rudpcb *ucb, uint8_t *addr, uint16_t port, char *from)
  659. {
  660. Reliable *r, **l;
  661. l = &ucb->r;
  662. for(r = *l; r; r = *l){
  663. if(memcmp(addr, r->addr, IPaddrlen) == 0 &&
  664. port == r->port)
  665. break;
  666. l = &r->next;
  667. }
  668. /* no state for this addr/port, create some */
  669. if(r == nil){
  670. while(generation == 0)
  671. generation = rand();
  672. DPRINT("from %s new state %lud for %I!%ud\n",
  673. from, generation, addr, port);
  674. r = smalloc(sizeof(Reliable));
  675. memmove(r->addr, addr, IPaddrlen);
  676. r->port = port;
  677. r->unacked = 0;
  678. if(generation == Hangupgen)
  679. generation++;
  680. r->sndgen = generation++;
  681. r->sndseq = 0;
  682. r->ackrcvd = 0;
  683. r->rcvgen = 0;
  684. r->rcvseq = 0;
  685. r->acksent = 0;
  686. r->xmits = 0;
  687. r->timeout = 0;
  688. r->ref = 0;
  689. incref(r); /* one reference for being in the list */
  690. *l = r;
  691. }
  692. incref(r);
  693. return r;
  694. }
  695. void
  696. relput(Reliable *r)
  697. {
  698. if(decref(r) == 0)
  699. free(r);
  700. }
  701. /*
  702. * forget a Reliable state
  703. */
  704. void
  705. relforget(Conv *c, uint8_t *ip, int port, int originator)
  706. {
  707. Rudpcb *ucb;
  708. Reliable *r, **l;
  709. ucb = (Rudpcb*)c->ptcl;
  710. l = &ucb->r;
  711. for(r = *l; r; r = *l){
  712. if(ipcmp(ip, r->addr) == 0 && port == r->port){
  713. *l = r->next;
  714. if(originator)
  715. relsendack(c, r, 1);
  716. relhangup(c, r);
  717. relput(r); /* remove from the list */
  718. break;
  719. }
  720. l = &r->next;
  721. }
  722. }
  723. /*
  724. * process a rcvd reliable packet. return -1 if not to be passed to user process,
  725. * 0 therwise.
  726. *
  727. * called with ucb locked.
  728. */
  729. int
  730. reliput(Conv *c, Block *bp, uint8_t *addr, uint16_t port)
  731. {
  732. Block *nbp;
  733. Rudpcb *ucb;
  734. Rudppriv *upriv;
  735. Udphdr *uh;
  736. Reliable *r;
  737. Rudphdr *rh;
  738. uint32_t seq, ack, sgen, agen, ackreal;
  739. int rv = -1;
  740. /* get fields */
  741. uh = (Udphdr*)(bp->rp);
  742. rh = (Rudphdr*)uh;
  743. seq = nhgetl(rh->relseq);
  744. sgen = nhgetl(rh->relsgen);
  745. ack = nhgetl(rh->relack);
  746. agen = nhgetl(rh->relagen);
  747. upriv = c->p->priv;
  748. ucb = (Rudpcb*)c->ptcl;
  749. r = relstate(ucb, addr, port, "input");
  750. DPRINT("rcvd %lud/%lud, %lud/%lud, r->sndgen = %lud\n",
  751. seq, sgen, ack, agen, r->sndgen);
  752. /* if acking an incorrect generation, ignore */
  753. if(ack && agen != r->sndgen)
  754. goto out;
  755. /* Look for a hangup */
  756. if(sgen == Hangupgen) {
  757. if(agen == r->sndgen)
  758. relforget(c, addr, port, 0);
  759. goto out;
  760. }
  761. /* make sure we're not talking to a new remote side */
  762. if(r->rcvgen != sgen){
  763. if(seq != 0 && seq != 1)
  764. goto out;
  765. /* new connection */
  766. if(r->rcvgen != 0){
  767. DPRINT("new con r->rcvgen = %lud, sgen = %lud\n", r->rcvgen, sgen);
  768. relhangup(c, r);
  769. }
  770. r->rcvgen = sgen;
  771. }
  772. /* dequeue acked packets */
  773. if(ack && agen == r->sndgen){
  774. ackreal = 0;
  775. while(r->unacked != nil && INSEQ(ack, r->ackrcvd, r->sndseq)){
  776. nbp = r->unacked;
  777. r->unacked = nbp->list;
  778. DPRINT("%lud/%lud acked, r->sndgen = %lud\n",
  779. ack, agen, r->sndgen);
  780. freeb(nbp);
  781. r->ackrcvd = NEXTSEQ(r->ackrcvd);
  782. ackreal = 1;
  783. }
  784. /* flow control */
  785. if(UNACKED(r) < Maxunacked/8 && r->blocked)
  786. wakeup(&r->vous);
  787. /*
  788. * retransmit next packet if the acked packet
  789. * was transmitted more than once
  790. */
  791. if(ackreal && r->unacked != nil){
  792. r->timeout = 0;
  793. if(r->xmits > 1){
  794. r->xmits = 1;
  795. relrexmit(c, r);
  796. }
  797. }
  798. }
  799. /* no message or input queue full */
  800. if(seq == 0 || qfull(c->rq))
  801. goto out;
  802. /* refuse out of order delivery */
  803. if(seq != NEXTSEQ(r->rcvseq)){
  804. relsendack(c, r, 0); /* tell him we got it already */
  805. upriv->orders++;
  806. DPRINT("out of sequence %lud not %lud\n", seq, NEXTSEQ(r->rcvseq));
  807. goto out;
  808. }
  809. r->rcvseq = seq;
  810. rv = 0;
  811. out:
  812. relput(r);
  813. return rv;
  814. }
  815. void
  816. relsendack(Conv *c, Reliable *r, int hangup)
  817. {
  818. Udphdr *uh;
  819. Block *bp;
  820. Rudphdr *rh;
  821. int ptcllen;
  822. Fs *f;
  823. bp = allocb(UDP_IPHDR + UDP_RHDRSIZE);
  824. if(bp == nil)
  825. return;
  826. bp->wp += UDP_IPHDR + UDP_RHDRSIZE;
  827. f = c->p->f;
  828. uh = (Udphdr *)(bp->rp);
  829. uh->vihl = IP_VER4;
  830. rh = (Rudphdr*)uh;
  831. ptcllen = (UDP_RHDRSIZE-UDP_PHDRSIZE);
  832. uh->Unused = 0;
  833. uh->udpproto = IP_UDPPROTO;
  834. uh->frag[0] = 0;
  835. uh->frag[1] = 0;
  836. hnputs(uh->udpplen, ptcllen);
  837. v6tov4(uh->udpdst, r->addr);
  838. hnputs(uh->udpdport, r->port);
  839. hnputs(uh->udpsport, c->lport);
  840. if(ipcmp(c->laddr, IPnoaddr) == 0)
  841. findlocalip(f, c->laddr, c->raddr);
  842. v6tov4(uh->udpsrc, c->laddr);
  843. hnputs(uh->udplen, ptcllen);
  844. if(hangup)
  845. hnputl(rh->relsgen, Hangupgen);
  846. else
  847. hnputl(rh->relsgen, r->sndgen);
  848. hnputl(rh->relseq, 0);
  849. hnputl(rh->relagen, r->rcvgen);
  850. hnputl(rh->relack, r->rcvseq);
  851. if(r->acksent < r->rcvseq)
  852. r->acksent = r->rcvseq;
  853. uh->udpcksum[0] = 0;
  854. uh->udpcksum[1] = 0;
  855. hnputs(uh->udpcksum, ptclcsum(bp, UDP_IPHDR, UDP_RHDRSIZE));
  856. DPRINT("sendack: %lud/%lud, %lud/%lud\n", 0L, r->sndgen, r->rcvseq, r->rcvgen);
  857. doipoput(c, f, bp, 0, c->ttl, c->tos);
  858. }
  859. /*
  860. * called with ucb locked (and c locked if user initiated close)
  861. */
  862. void
  863. relhangup(Conv *c, Reliable *r)
  864. {
  865. int n;
  866. Block *bp;
  867. char hup[ERRMAX];
  868. n = snprint(hup, sizeof(hup), "hangup %I!%d", r->addr, r->port);
  869. qproduce(c->eq, hup, n);
  870. /*
  871. * dump any unacked outgoing messages
  872. */
  873. for(bp = r->unacked; bp != nil; bp = r->unacked){
  874. r->unacked = bp->list;
  875. bp->list = nil;
  876. freeb(bp);
  877. }
  878. r->rcvgen = 0;
  879. r->rcvseq = 0;
  880. r->acksent = 0;
  881. if(generation == Hangupgen)
  882. generation++;
  883. r->sndgen = generation++;
  884. r->sndseq = 0;
  885. r->ackrcvd = 0;
  886. r->xmits = 0;
  887. r->timeout = 0;
  888. wakeup(&r->vous);
  889. }
  890. /*
  891. * called with ucb locked
  892. */
  893. void
  894. relrexmit(Conv *c, Reliable *r)
  895. {
  896. Rudppriv *upriv;
  897. Block *np;
  898. Fs *f;
  899. upriv = c->p->priv;
  900. f = c->p->f;
  901. r->timeout = 0;
  902. if(r->xmits++ > Rudpmaxxmit){
  903. relhangup(c, r);
  904. return;
  905. }
  906. upriv->rxmits++;
  907. np = copyblock(r->unacked, blocklen(r->unacked));
  908. DPRINT("rxmit r->ackrvcd+1 = %lud\n", r->ackrcvd+1);
  909. doipoput(c, f, np, 0, c->ttl, c->tos);
  910. }