rudp.c 21 KB


  1. /*
  2. * This protocol is compatible with UDP's packet format.
  3. * It could be done over UDP if need be.
  4. */
  5. #include "u.h"
  6. #include "../port/lib.h"
  7. #include "mem.h"
  8. #include "dat.h"
  9. #include "fns.h"
  10. #include "../port/error.h"
  11. #include "ip.h"
#define DEBUG 0
/*
 * debug printing, compiled away unless DEBUG is set.
 * NOTE(review): expands to a bare if statement, so using it as the
 * unbraced body of an if/else can capture the following else.
 */
#define DPRINT if(DEBUG)print
/*
 * Sequence number arithmetic.  Sequence 0 is never used for data
 * (NEXTSEQ skips it), so the wrapped distance from b forward to a is
 * 0xffffffff-(b-a).  Assumes 32-bit ulong, as 0xffffffffUL implies.
 */
/* number of sequence steps from b forward to a */
#define SEQDIFF(a,b) ( (a)>=(b)?\
(a)-(b):\
0xffffffffUL-((b)-(a)) )
/* true if a lies in the window (start, end], allowing for wrap */
#define INSEQ(a,start,end) ( (start)<=(end)?\
((a)>(start)&&(a)<=(end)):\
((a)>(start)||(a)<=(end)) )
/* number of messages sent to r's destination and not yet acked */
#define UNACKED(r) SEQDIFF(r->sndseq, r->ackrcvd)
/* successor of sequence number a, skipping 0 on wrap */
#define NEXTSEQ(a) ( (a)+1 == 0 ? 1 : (a)+1 )
/*
 * Header sizes (in bytes) and protocol parameters.
 */
enum
{
	UDP_HDRSIZE = 20, /* pseudo header + udp header */
	UDP_PHDRSIZE = 12, /* pseudo header */
	UDP_RHDRSIZE = 36, /* pseudo header + udp header + rudp header */
	UDP_IPHDR = 8, /* ip header bytes that precede the pseudo header */
	IP_UDPPROTO = 254, /* ip protocol number used for rudp packets */
	/*
	 * size of the address prefix exchanged with the user in "headers"
	 * mode: raddr[16] laddr[16] rport[2] lport[2]
	 */
	UDP_USEAD6 = 36,
	/* same for "headers4" mode: raddr[4] laddr[4] rport[2] lport[2] */
	UDP_USEAD4 = 12,
	Rudprxms = 200, /* retransmit after this many ms per transmission so far */
	Rudptickms = 50, /* period of relackproc, in ms */
	Rudpmaxxmit = 10, /* hang up once a message was sent more than this many times */
	Maxunacked = 100, /* writers block in rudpkick beyond this many unacked msgs */
};
#define Hangupgen 0xffffffff /* used only in hangup messages */
/*
 * Packet layout as seen by this module: the first 8 bytes of the ip
 * header, then a udp pseudo header occupying the rest of the ip header
 * (rebuilt in place on input for the checksum), then the udp header.
 */
typedef struct Udphdr Udphdr;
struct Udphdr
{
	/* ip header */
	uchar vihl; /* Version and header length */
	uchar tos; /* Type of service */
	uchar length[2]; /* packet length */
	uchar id[2]; /* Identification */
	uchar frag[2]; /* Fragment information */
	/* pseudo header starts here */
	uchar Unused; /* the ip ttl on input; zeroed to form the pseudo header */
	uchar udpproto; /* Protocol */
	uchar udpplen[2]; /* Header plus data length */
	uchar udpsrc[4]; /* Ip source */
	uchar udpdst[4]; /* Ip destination */
	/* udp header */
	uchar udpsport[2]; /* Source port */
	uchar udpdport[2]; /* Destination port */
	uchar udplen[2]; /* data length */
	uchar udpcksum[2]; /* Checksum */
};
/*
 * Udphdr followed by the 16-byte rudp header.
 */
typedef struct Rudphdr Rudphdr;
struct Rudphdr
{
	/* ip header */
	uchar vihl; /* Version and header length */
	uchar tos; /* Type of service */
	uchar length[2]; /* packet length */
	uchar id[2]; /* Identification */
	uchar frag[2]; /* Fragment information */
	/* pseudo header starts here */
	uchar Unused;
	uchar udpproto; /* Protocol */
	uchar udpplen[2]; /* Header plus data length */
	uchar udpsrc[4]; /* Ip source */
	uchar udpdst[4]; /* Ip destination */
	/* udp header */
	uchar udpsport[2]; /* Source port */
	uchar udpdport[2]; /* Destination port */
	uchar udplen[2]; /* data length (includes rudp header) */
	uchar udpcksum[2]; /* Checksum */
	/* rudp header */
	uchar relseq[4]; /* id of this packet (or 0: pure ack, no data) */
	uchar relsgen[4]; /* sender's generation; Hangupgen in a hangup message */
	uchar relack[4]; /* packet being acked (or 0) */
	uchar relagen[4]; /* generation of the acked packet (the peer's sndgen) */
};
/*
 * one state structure per destination
 */
typedef struct Reliable Reliable;
struct Reliable
{
	Ref; /* one ref for the Rudpcb list plus one per relstate caller */
	Reliable *next; /* next destination in the Rudpcb list */
	uchar addr[IPaddrlen]; /* always V6 when put here */
	ushort port;
	Block *unacked; /* unacked msg list */
	Block *unackedtail; /* and its tail */
	int timeout; /* time since first unacked msg sent (ms, in Rudptickms steps) */
	int xmits; /* number of times first unacked msg sent */
	ulong sndseq; /* next packet to be sent */
	ulong sndgen; /* and its generation */
	ulong rcvseq; /* last packet received */
	ulong rcvgen; /* and its generation (0: none yet) */
	ulong acksent; /* last ack sent */
	ulong ackrcvd; /* last msg for which ack was rcvd */
	/* flow control */
	QLock lock; /* held by a writer in rudpkick while it may block */
	Rendez vous; /* blocked writers sleep here */
	int blocked; /* set while a writer sleeps on vous */
};
/* MIB II counters (reported by rudpstats) */
typedef struct Rudpstats Rudpstats;
struct Rudpstats
{
	ulong rudpInDatagrams; /* packets handed to rudpiput */
	ulong rudpNoPorts; /* packets with no matching conversation */
	ulong rudpInErrors; /* packets dropped for checksum (or length) errors */
	ulong rudpOutDatagrams; /* data packets sent by rudpkick */
};
/*
 * per-protocol private state (Proto.priv)
 */
typedef struct Rudppriv Rudppriv;
struct Rudppriv
{
	Rendez vous; /* relackproc sleeps here between ticks */
	Ipht ht; /* conversations, looked up by address/port in rudpiput */
	/* MIB counters */
	Rudpstats ustats;
	/* non-MIB stats */
	ulong csumerr; /* checksum errors */
	ulong lenerr; /* short packet */
	ulong rxmits; /* # of retransmissions */
	ulong orders; /* # of out of order pkts */
	/* keeping track of the ack kproc */
	int ackprocstarted;
	QLock apl; /* guards starting the ack kproc */
};
  134. static ulong generation = 0;
  135. static Rendez rend;
/*
 * protocol specific part of Conv
 */
typedef struct Rudpcb Rudpcb;
struct Rudpcb
{
	QLock; /* guards the list r and the Reliable state on it */
	uchar headers; /* 0, 4 ("headers4") or 6 ("headers"): user address prefix mode */
	uchar randdrop; /* percent of outgoing packets to drop ("randdrop" ctl) */
	Reliable *r; /* per-destination reliability state */
};
  147. /*
  148. * local functions
  149. */
  150. void relsendack(Conv*, Reliable*, int);
  151. int reliput(Conv*, Block*, uchar*, ushort);
  152. Reliable *relstate(Rudpcb*, uchar*, ushort, char*);
  153. void relput(Reliable*);
  154. void relforget(Conv *, uchar*, int, int);
  155. void relackproc(void *);
  156. void relackq(Reliable *, Block*);
  157. void relhangup(Conv *, Reliable*);
  158. void relrexmit(Conv *, Reliable*);
  159. void relput(Reliable*);
  160. void rudpkick(void *x);
  161. static void
  162. rudpstartackproc(Proto *rudp)
  163. {
  164. Rudppriv *rpriv;
  165. char kpname[KNAMELEN];
  166. rpriv = rudp->priv;
  167. if(rpriv->ackprocstarted == 0){
  168. qlock(&rpriv->apl);
  169. if(rpriv->ackprocstarted == 0){
  170. sprint(kpname, "#I%drudpack", rudp->f->dev);
  171. kproc(kpname, relackproc, rudp);
  172. rpriv->ackprocstarted = 1;
  173. }
  174. qunlock(&rpriv->apl);
  175. }
  176. }
  177. static char*
  178. rudpconnect(Conv *c, char **argv, int argc)
  179. {
  180. char *e;
  181. Rudppriv *upriv;
  182. upriv = c->p->priv;
  183. rudpstartackproc(c->p);
  184. e = Fsstdconnect(c, argv, argc);
  185. Fsconnected(c, e);
  186. iphtadd(&upriv->ht, c);
  187. return e;
  188. }
  189. static int
  190. rudpstate(Conv *c, char *state, int n)
  191. {
  192. Rudpcb *ucb;
  193. Reliable *r;
  194. int m;
  195. m = snprint(state, n, "%s", c->inuse?"Open":"Closed");
  196. ucb = (Rudpcb*)c->ptcl;
  197. qlock(ucb);
  198. for(r = ucb->r; r; r = r->next)
  199. m += snprint(state+m, n-m, " %I/%ld", r->addr, UNACKED(r));
  200. qunlock(ucb);
  201. return m;
  202. }
  203. static char*
  204. rudpannounce(Conv *c, char** argv, int argc)
  205. {
  206. char *e;
  207. Rudppriv *upriv;
  208. upriv = c->p->priv;
  209. rudpstartackproc(c->p);
  210. e = Fsstdannounce(c, argv, argc);
  211. if(e != nil)
  212. return e;
  213. Fsconnected(c, nil);
  214. iphtadd(&upriv->ht, c);
  215. return nil;
  216. }
  217. static void
  218. rudpcreate(Conv *c)
  219. {
  220. c->rq = qopen(64*1024, Qmsg, 0, 0);
  221. c->wq = qopen(64*1024, Qkick, rudpkick, c);
  222. }
/*
 * Close a conversation.  Remove it from the lookup table, flush any
 * delayed acks, close its queues and clear its addresses and modes.
 * Then hang up and release every per-destination Reliable.
 */
static void
rudpclose(Conv *c)
{
	Rudpcb *ucb;
	Reliable *r, *nr;
	Rudppriv *upriv;

	upriv = c->p->priv;
	/* rudpiput can no longer find this conversation */
	iphtrem(&upriv->ht, c);

	/* force out any delayed acks */
	ucb = (Rudpcb*)c->ptcl;
	qlock(ucb);
	for(r = ucb->r; r; r = r->next){
		if(r->acksent != r->rcvseq)
			relsendack(c, r, 0);
	}
	qunlock(ucb);

	/*
	 * NOTE(review): wq is closed before relhangup (below) wakes any
	 * writer blocked in rudpkick; presumably this order keeps woken
	 * writers from queueing new data - confirm before reordering.
	 */
	qclose(c->rq);
	qclose(c->wq);
	qclose(c->eq);
	ipmove(c->laddr, IPnoaddr);
	ipmove(c->raddr, IPnoaddr);
	c->lport = 0;
	c->rport = 0;

	ucb->headers = 0;
	ucb->randdrop = 0;
	qlock(ucb);
	/*
	 * NOTE(review): any ack sent here goes out after laddr/lport were
	 * cleared above.  Normally the first loop already advanced acksent
	 * to rcvseq, so none are sent.
	 */
	for(r = ucb->r; r; r = nr){
		if(r->acksent != r->rcvseq)
			relsendack(c, r, 0);
		nr = r->next;
		relhangup(c, r);
		relput(r);	/* drop the list's reference */
	}
	ucb->r = 0;

	qunlock(ucb);
}
  259. /*
  260. * randomly don't send packets
  261. */
  262. static void
  263. doipoput(Conv *c, Fs *f, Block *bp, int x, int ttl, int tos)
  264. {
  265. Rudpcb *ucb;
  266. ucb = (Rudpcb*)c->ptcl;
  267. if(ucb->randdrop && nrand(100) < ucb->randdrop)
  268. freeblist(bp);
  269. else
  270. ipoput4(f, bp, x, ttl, tos, nil);
  271. }
  272. int
  273. flow(void *v)
  274. {
  275. Reliable *r = v;
  276. return UNACKED(r) <= Maxunacked;
  277. }
  278. void
  279. rudpkick(void *x)
  280. {
  281. Conv *c = x;
  282. Udphdr *uh;
  283. ushort rport;
  284. uchar laddr[IPaddrlen], raddr[IPaddrlen];
  285. Block *bp;
  286. Rudpcb *ucb;
  287. Rudphdr *rh;
  288. Reliable *r;
  289. int dlen, ptcllen;
  290. Rudppriv *upriv;
  291. Fs *f;
  292. upriv = c->p->priv;
  293. f = c->p->f;
  294. netlog(c->p->f, Logrudp, "rudp: kick\n");
  295. bp = qget(c->wq);
  296. if(bp == nil)
  297. return;
  298. ucb = (Rudpcb*)c->ptcl;
  299. switch(ucb->headers) {
  300. case 6:
  301. /* get user specified addresses */
  302. bp = pullupblock(bp, UDP_USEAD6);
  303. if(bp == nil)
  304. return;
  305. ipmove(raddr, bp->rp);
  306. bp->rp += IPaddrlen;
  307. ipmove(laddr, bp->rp);
  308. bp->rp += IPaddrlen;
  309. /* pick interface closest to dest */
  310. if(ipforme(f, laddr) != Runi)
  311. findlocalip(f, laddr, raddr);
  312. rport = nhgets(bp->rp);
  313. bp->rp += 4; /* Igonore local port */
  314. break;
  315. case 4:
  316. bp = pullupblock(bp, UDP_USEAD4);
  317. if(bp == nil)
  318. return;
  319. v4tov6(raddr, bp->rp);
  320. bp->rp += IPv4addrlen;
  321. v4tov6(laddr, bp->rp);
  322. bp->rp += IPv4addrlen;
  323. if(ipforme(f, laddr) != Runi)
  324. findlocalip(f, laddr, raddr);
  325. rport = nhgets(bp->rp);
  326. bp->rp += 4; /* Igonore local port */
  327. break;
  328. default:
  329. ipmove(raddr, c->raddr);
  330. ipmove(laddr, c->laddr);
  331. rport = c->rport;
  332. break;
  333. }
  334. dlen = blocklen(bp);
  335. /* Make space to fit rudp & ip header */
  336. bp = padblock(bp, UDP_IPHDR+UDP_RHDRSIZE);
  337. if(bp == nil)
  338. return;
  339. uh = (Udphdr *)(bp->rp);
  340. uh->vihl = IP_VER4;
  341. rh = (Rudphdr*)uh;
  342. ptcllen = dlen + (UDP_RHDRSIZE-UDP_PHDRSIZE);
  343. uh->Unused = 0;
  344. uh->udpproto = IP_UDPPROTO;
  345. uh->frag[0] = 0;
  346. uh->frag[1] = 0;
  347. hnputs(uh->udpplen, ptcllen);
  348. switch(ucb->headers){
  349. case 4:
  350. case 6:
  351. v6tov4(uh->udpdst, raddr);
  352. hnputs(uh->udpdport, rport);
  353. v6tov4(uh->udpsrc, laddr);
  354. break;
  355. default:
  356. v6tov4(uh->udpdst, c->raddr);
  357. hnputs(uh->udpdport, c->rport);
  358. if(ipcmp(c->laddr, IPnoaddr) == 0)
  359. findlocalip(f, c->laddr, c->raddr);
  360. v6tov4(uh->udpsrc, c->laddr);
  361. break;
  362. }
  363. hnputs(uh->udpsport, c->lport);
  364. hnputs(uh->udplen, ptcllen);
  365. uh->udpcksum[0] = 0;
  366. uh->udpcksum[1] = 0;
  367. qlock(ucb);
  368. r = relstate(ucb, raddr, rport, "kick");
  369. r->sndseq = NEXTSEQ(r->sndseq);
  370. hnputl(rh->relseq, r->sndseq);
  371. hnputl(rh->relsgen, r->sndgen);
  372. hnputl(rh->relack, r->rcvseq); /* ACK last rcvd packet */
  373. hnputl(rh->relagen, r->rcvgen);
  374. if(r->rcvseq != r->acksent)
  375. r->acksent = r->rcvseq;
  376. hnputs(uh->udpcksum, ptclcsum(bp, UDP_IPHDR, dlen+UDP_RHDRSIZE));
  377. relackq(r, bp);
  378. qunlock(ucb);
  379. upriv->ustats.rudpOutDatagrams++;
  380. DPRINT("sent: %lud/%lud, %lud/%lud\n",
  381. r->sndseq, r->sndgen, r->rcvseq, r->rcvgen);
  382. doipoput(c, f, bp, 0, c->ttl, c->tos);
  383. if(waserror()) {
  384. relput(r);
  385. qunlock(&r->lock);
  386. nexterror();
  387. }
  388. /* flow control of sorts */
  389. qlock(&r->lock);
  390. if(UNACKED(r) > Maxunacked){
  391. r->blocked = 1;
  392. sleep(&r->vous, flow, r);
  393. r->blocked = 0;
  394. }
  395. qunlock(&r->lock);
  396. relput(r);
  397. poperror();
  398. }
  399. void
  400. rudpiput(Proto *rudp, Ipifc *ifc, Block *bp)
  401. {
  402. int len, olen, ottl;
  403. Udphdr *uh;
  404. Conv *c;
  405. Rudpcb *ucb;
  406. uchar raddr[IPaddrlen], laddr[IPaddrlen];
  407. ushort rport, lport;
  408. Rudppriv *upriv;
  409. Fs *f;
  410. upriv = rudp->priv;
  411. f = rudp->f;
  412. upriv->ustats.rudpInDatagrams++;
  413. uh = (Udphdr*)(bp->rp);
  414. /* Put back pseudo header for checksum
  415. * (remember old values for icmpnoconv())
  416. */
  417. ottl = uh->Unused;
  418. uh->Unused = 0;
  419. len = nhgets(uh->udplen);
  420. olen = nhgets(uh->udpplen);
  421. hnputs(uh->udpplen, len);
  422. v4tov6(raddr, uh->udpsrc);
  423. v4tov6(laddr, uh->udpdst);
  424. lport = nhgets(uh->udpdport);
  425. rport = nhgets(uh->udpsport);
  426. if(nhgets(uh->udpcksum)) {
  427. if(ptclcsum(bp, UDP_IPHDR, len+UDP_PHDRSIZE)) {
  428. upriv->ustats.rudpInErrors++;
  429. upriv->csumerr++;
  430. netlog(f, Logrudp, "rudp: checksum error %I\n", raddr);
  431. DPRINT("rudp: checksum error %I\n", raddr);
  432. freeblist(bp);
  433. return;
  434. }
  435. }
  436. qlock(rudp);
  437. c = iphtlook(&upriv->ht, raddr, rport, laddr, lport);
  438. if(c == nil){
  439. /* no converstation found */
  440. upriv->ustats.rudpNoPorts++;
  441. qunlock(rudp);
  442. netlog(f, Logudp, "udp: no conv %I!%d -> %I!%d\n", raddr, rport,
  443. laddr, lport);
  444. uh->Unused = ottl;
  445. hnputs(uh->udpplen, olen);
  446. icmpnoconv(f, bp);
  447. freeblist(bp);
  448. return;
  449. }
  450. ucb = (Rudpcb*)c->ptcl;
  451. qlock(ucb);
  452. qunlock(rudp);
  453. if(reliput(c, bp, raddr, rport) < 0){
  454. qunlock(ucb);
  455. freeb(bp);
  456. return;
  457. }
  458. /*
  459. * Trim the packet down to data size
  460. */
  461. len -= (UDP_RHDRSIZE-UDP_PHDRSIZE);
  462. bp = trimblock(bp, UDP_IPHDR+UDP_RHDRSIZE, len);
  463. if(bp == nil) {
  464. netlog(f, Logrudp, "rudp: len err %I.%d -> %I.%d\n",
  465. raddr, rport, laddr, lport);
  466. DPRINT("rudp: len err %I.%d -> %I.%d\n",
  467. raddr, rport, laddr, lport);
  468. upriv->lenerr++;
  469. return;
  470. }
  471. netlog(f, Logrudpmsg, "rudp: %I.%d -> %I.%d l %d\n",
  472. raddr, rport, laddr, lport, len);
  473. switch(ucb->headers){
  474. case 6:
  475. /* pass the src address */
  476. bp = padblock(bp, UDP_USEAD6);
  477. ipmove(bp->rp, raddr);
  478. if(ipforme(f, laddr) == Runi)
  479. ipmove(bp->rp+IPaddrlen, laddr);
  480. else
  481. ipmove(bp->rp+IPaddrlen, ifc->lifc->local);
  482. hnputs(bp->rp+2*IPaddrlen, rport);
  483. hnputs(bp->rp+2*IPaddrlen+2, lport);
  484. break;
  485. case 4:
  486. /* pass the src address */
  487. bp = padblock(bp, UDP_USEAD4);
  488. v6tov4(bp->rp, raddr);
  489. if(ipforme(f, laddr) == Runi)
  490. v6tov4(bp->rp+IPv4addrlen, laddr);
  491. else
  492. v6tov4(bp->rp+IPv4addrlen, ifc->lifc->local);
  493. hnputs(bp->rp + 2*IPv4addrlen, rport);
  494. hnputs(bp->rp + 2*IPv4addrlen + 2, lport);
  495. break;
  496. default:
  497. /* connection oriented rudp */
  498. if(ipcmp(c->raddr, IPnoaddr) == 0){
  499. /* save the src address in the conversation */
  500. ipmove(c->raddr, raddr);
  501. c->rport = rport;
  502. /* reply with the same ip address (if not broadcast) */
  503. if(ipforme(f, laddr) == Runi)
  504. ipmove(c->laddr, laddr);
  505. else
  506. v4tov6(c->laddr, ifc->lifc->local);
  507. }
  508. break;
  509. }
  510. if(bp->next)
  511. bp = concatblock(bp);
  512. if(qfull(c->rq)) {
  513. netlog(f, Logrudp, "rudp: qfull %I.%d -> %I.%d\n", raddr, rport,
  514. laddr, lport);
  515. freeblist(bp);
  516. }
  517. else
  518. qpass(c->rq, bp);
  519. qunlock(ucb);
  520. }
  521. static char *rudpunknown = "unknown rudp ctl request";
  522. char*
  523. rudpctl(Conv *c, char **f, int n)
  524. {
  525. Rudpcb *ucb;
  526. uchar ip[IPaddrlen];
  527. int x;
  528. ucb = (Rudpcb*)c->ptcl;
  529. if(n < 1)
  530. return rudpunknown;
  531. if(strcmp(f[0], "headers4") == 0){
  532. ucb->headers = 4;
  533. return nil;
  534. } else if(strcmp(f[0], "headers") == 0){
  535. ucb->headers = 6;
  536. return nil;
  537. } else if(strcmp(f[0], "hangup") == 0){
  538. if(n < 3)
  539. return "bad syntax";
  540. parseip(ip, f[1]);
  541. x = atoi(f[2]);
  542. qlock(ucb);
  543. relforget(c, ip, x, 1);
  544. qunlock(ucb);
  545. return nil;
  546. } else if(strcmp(f[0], "randdrop") == 0){
  547. x = 10; /* default is 10% */
  548. if(n > 1)
  549. x = atoi(f[1]);
  550. if(x > 100 || x < 0)
  551. return "illegal rudp drop rate";
  552. ucb->randdrop = x;
  553. return nil;
  554. }
  555. return rudpunknown;
  556. }
  557. void
  558. rudpadvise(Proto *rudp, Block *bp, char *msg)
  559. {
  560. Udphdr *h;
  561. uchar source[IPaddrlen], dest[IPaddrlen];
  562. ushort psource, pdest;
  563. Conv *s, **p;
  564. h = (Udphdr*)(bp->rp);
  565. v4tov6(dest, h->udpdst);
  566. v4tov6(source, h->udpsrc);
  567. psource = nhgets(h->udpsport);
  568. pdest = nhgets(h->udpdport);
  569. /* Look for a connection */
  570. for(p = rudp->conv; *p; p++) {
  571. s = *p;
  572. if(s->rport == pdest)
  573. if(s->lport == psource)
  574. if(ipcmp(s->raddr, dest) == 0)
  575. if(ipcmp(s->laddr, source) == 0){
  576. qhangup(s->rq, msg);
  577. qhangup(s->wq, msg);
  578. break;
  579. }
  580. }
  581. freeblist(bp);
  582. }
  583. int
  584. rudpstats(Proto *rudp, char *buf, int len)
  585. {
  586. Rudppriv *upriv;
  587. upriv = rudp->priv;
  588. return snprint(buf, len, "%lud %lud %lud %lud %lud %lud\n",
  589. upriv->ustats.rudpInDatagrams,
  590. upriv->ustats.rudpNoPorts,
  591. upriv->ustats.rudpInErrors,
  592. upriv->ustats.rudpOutDatagrams,
  593. upriv->rxmits,
  594. upriv->orders);
  595. }
  596. void
  597. rudpinit(Fs *fs)
  598. {
  599. Proto *rudp;
  600. rudp = smalloc(sizeof(Proto));
  601. rudp->priv = smalloc(sizeof(Rudppriv));
  602. rudp->name = "rudp";
  603. rudp->connect = rudpconnect;
  604. rudp->announce = rudpannounce;
  605. rudp->ctl = rudpctl;
  606. rudp->state = rudpstate;
  607. rudp->create = rudpcreate;
  608. rudp->close = rudpclose;
  609. rudp->rcv = rudpiput;
  610. rudp->advise = rudpadvise;
  611. rudp->stats = rudpstats;
  612. rudp->ipproto = IP_UDPPROTO;
  613. rudp->nc = 16;
  614. rudp->ptclsize = sizeof(Rudpcb);
  615. Fsproto(fs, rudp);
  616. }
  617. /*********************************************/
  618. /* Here starts the reliable helper functions */
  619. /*********************************************/
  620. /*
  621. * Enqueue a copy of an unacked block for possible retransmissions
  622. */
  623. void
  624. relackq(Reliable *r, Block *bp)
  625. {
  626. Block *np;
  627. np = copyblock(bp, blocklen(bp));
  628. if(r->unacked)
  629. r->unackedtail->list = np;
  630. else {
  631. /* restart timer */
  632. r->timeout = 0;
  633. r->xmits = 1;
  634. r->unacked = np;
  635. }
  636. r->unackedtail = np;
  637. np->list = nil;
  638. }
  639. /*
  640. * retransmit unacked blocks
  641. */
  642. void
  643. relackproc(void *a)
  644. {
  645. Rudpcb *ucb;
  646. Proto *rudp;
  647. Reliable *r;
  648. Conv **s, *c;
  649. Rudppriv *upriv;
  650. rudp = (Proto *)a;
  651. upriv = rudp->priv;
  652. loop:
  653. tsleep(&upriv->vous, return0, 0, Rudptickms);
  654. for(s = rudp->conv; *s; s++) {
  655. c = *s;
  656. ucb = (Rudpcb*)c->ptcl;
  657. qlock(ucb);
  658. for(r = ucb->r; r; r = r->next) {
  659. if(r->unacked != nil){
  660. r->timeout += Rudptickms;
  661. if(r->timeout > Rudprxms*r->xmits)
  662. relrexmit(c, r);
  663. }
  664. if(r->acksent != r->rcvseq)
  665. relsendack(c, r, 0);
  666. }
  667. qunlock(ucb);
  668. }
  669. goto loop;
  670. }
  671. /*
  672. * get the state record for a conversation
  673. */
  674. Reliable*
  675. relstate(Rudpcb *ucb, uchar *addr, ushort port, char *from)
  676. {
  677. Reliable *r, **l;
  678. l = &ucb->r;
  679. for(r = *l; r; r = *l){
  680. if(memcmp(addr, r->addr, IPaddrlen) == 0 &&
  681. port == r->port)
  682. break;
  683. l = &r->next;
  684. }
  685. /* no state for this addr/port, create some */
  686. if(r == nil){
  687. while(generation == 0)
  688. generation = rand();
  689. DPRINT("from %s new state %lud for %I!%ud\n",
  690. from, generation, addr, port);
  691. r = smalloc(sizeof(Reliable));
  692. memmove(r->addr, addr, IPaddrlen);
  693. r->port = port;
  694. r->unacked = 0;
  695. if(generation == Hangupgen)
  696. generation++;
  697. r->sndgen = generation++;
  698. r->sndseq = 0;
  699. r->ackrcvd = 0;
  700. r->rcvgen = 0;
  701. r->rcvseq = 0;
  702. r->acksent = 0;
  703. r->xmits = 0;
  704. r->timeout = 0;
  705. r->ref = 0;
  706. incref(r); /* one reference for being in the list */
  707. *l = r;
  708. }
  709. incref(r);
  710. return r;
  711. }
  712. void
  713. relput(Reliable *r)
  714. {
  715. if(decref(r) == 0)
  716. free(r);
  717. }
  718. /*
  719. * forget a Reliable state
  720. */
  721. void
  722. relforget(Conv *c, uchar *ip, int port, int originator)
  723. {
  724. Rudpcb *ucb;
  725. Reliable *r, **l;
  726. ucb = (Rudpcb*)c->ptcl;
  727. l = &ucb->r;
  728. for(r = *l; r; r = *l){
  729. if(ipcmp(ip, r->addr) == 0 && port == r->port){
  730. *l = r->next;
  731. if(originator)
  732. relsendack(c, r, 1);
  733. relhangup(c, r);
  734. relput(r); /* remove from the list */
  735. break;
  736. }
  737. l = &r->next;
  738. }
  739. }
/*
 * process a rcvd reliable packet. return -1 if not to be passed to user process,
 * 0 otherwise.
 *
 * called with ucb locked.
 *
 * Steps, in order:
 *  - ignore acks for a stale generation;
 *  - handle hangup messages;
 *  - detect a change of the peer's generation (a new connection);
 *  - free our acked messages;
 *  - accept the message itself only if it is the next in sequence.
 */
int
reliput(Conv *c, Block *bp, uchar *addr, ushort port)
{
	Block *nbp;
	Rudpcb *ucb;
	Rudppriv *upriv;
	Udphdr *uh;
	Reliable *r;
	Rudphdr *rh;
	ulong seq, ack, sgen, agen, ackreal;
	int rv = -1;

	/* get fields */
	uh = (Udphdr*)(bp->rp);
	rh = (Rudphdr*)uh;
	seq = nhgetl(rh->relseq);
	sgen = nhgetl(rh->relsgen);
	ack = nhgetl(rh->relack);
	agen = nhgetl(rh->relagen);

	upriv = c->p->priv;
	ucb = (Rudpcb*)c->ptcl;
	r = relstate(ucb, addr, port, "input");	/* reference dropped at out */

	DPRINT("rcvd %lud/%lud, %lud/%lud, r->sndgen = %lud\n",
		seq, sgen, ack, agen, r->sndgen);

	/* if acking an incorrect generation, ignore */
	if(ack && agen != r->sndgen)
		goto out;

	/* Look for a hangup; honored only if it names our current generation */
	if(sgen == Hangupgen) {
		if(agen == r->sndgen)
			relforget(c, addr, port, 0);
		goto out;
	}

	/* make sure we're not talking to a new remote side */
	if(r->rcvgen != sgen){
		/* a new generation must start at the beginning: seq 1 or a pure ack */
		if(seq != 0 && seq != 1)
			goto out;

		/* new connection */
		if(r->rcvgen != 0){
			DPRINT("new con r->rcvgen = %lud, sgen = %lud\n", r->rcvgen, sgen);
			relhangup(c, r);
		}
		r->rcvgen = sgen;
	}

	/* dequeue acked packets */
	if(ack && agen == r->sndgen){
		ackreal = 0;
		/* free unacked messages up to and including ack */
		while(r->unacked != nil && INSEQ(ack, r->ackrcvd, r->sndseq)){
			nbp = r->unacked;
			r->unacked = nbp->list;
			DPRINT("%lud/%lud acked, r->sndgen = %lud\n",
			       ack, agen, r->sndgen);
			freeb(nbp);
			r->ackrcvd = NEXTSEQ(r->ackrcvd);
			ackreal = 1;
		}

		/* flow control: wake a writer blocked in rudpkick once well below the limit */
		if(UNACKED(r) < Maxunacked/8 && r->blocked)
			wakeup(&r->vous);

		/*
		 * retransmit next packet if the acked packet
		 * was transmitted more than once
		 */
		if(ackreal && r->unacked != nil){
			r->timeout = 0;
			if(r->xmits > 1){
				r->xmits = 1;
				relrexmit(c, r);
			}
		}
		
	}

	/* no message or input queue full */
	if(seq == 0 || qfull(c->rq))
		goto out;

	/* refuse out of order delivery */
	if(seq != NEXTSEQ(r->rcvseq)){
		relsendack(c, r, 0);	/* tell him we got it already */
		upriv->orders++;
		DPRINT("out of sequence %lud not %lud\n", seq, NEXTSEQ(r->rcvseq));
		goto out;
	}
	/* accepted; acked later by relackproc or piggybacked by rudpkick */
	r->rcvseq = seq;

	rv = 0;
out:
	relput(r);
	return rv;
}
  832. void
  833. relsendack(Conv *c, Reliable *r, int hangup)
  834. {
  835. Udphdr *uh;
  836. Block *bp;
  837. Rudphdr *rh;
  838. int ptcllen;
  839. Fs *f;
  840. bp = allocb(UDP_IPHDR + UDP_RHDRSIZE);
  841. if(bp == nil)
  842. return;
  843. bp->wp += UDP_IPHDR + UDP_RHDRSIZE;
  844. f = c->p->f;
  845. uh = (Udphdr *)(bp->rp);
  846. uh->vihl = IP_VER4;
  847. rh = (Rudphdr*)uh;
  848. ptcllen = (UDP_RHDRSIZE-UDP_PHDRSIZE);
  849. uh->Unused = 0;
  850. uh->udpproto = IP_UDPPROTO;
  851. uh->frag[0] = 0;
  852. uh->frag[1] = 0;
  853. hnputs(uh->udpplen, ptcllen);
  854. v6tov4(uh->udpdst, r->addr);
  855. hnputs(uh->udpdport, r->port);
  856. hnputs(uh->udpsport, c->lport);
  857. if(ipcmp(c->laddr, IPnoaddr) == 0)
  858. findlocalip(f, c->laddr, c->raddr);
  859. v6tov4(uh->udpsrc, c->laddr);
  860. hnputs(uh->udplen, ptcllen);
  861. if(hangup)
  862. hnputl(rh->relsgen, Hangupgen);
  863. else
  864. hnputl(rh->relsgen, r->sndgen);
  865. hnputl(rh->relseq, 0);
  866. hnputl(rh->relagen, r->rcvgen);
  867. hnputl(rh->relack, r->rcvseq);
  868. if(r->acksent < r->rcvseq)
  869. r->acksent = r->rcvseq;
  870. uh->udpcksum[0] = 0;
  871. uh->udpcksum[1] = 0;
  872. hnputs(uh->udpcksum, ptclcsum(bp, UDP_IPHDR, UDP_RHDRSIZE));
  873. DPRINT("sendack: %lud/%lud, %lud/%lud\n", 0L, r->sndgen, r->rcvseq, r->rcvgen);
  874. doipoput(c, f, bp, 0, c->ttl, c->tos);
  875. }
  876. /*
  877. * called with ucb locked (and c locked if user initiated close)
  878. */
  879. void
  880. relhangup(Conv *c, Reliable *r)
  881. {
  882. int n;
  883. Block *bp;
  884. char hup[ERRMAX];
  885. n = snprint(hup, sizeof(hup), "hangup %I!%d", r->addr, r->port);
  886. qproduce(c->eq, hup, n);
  887. /*
  888. * dump any unacked outgoing messages
  889. */
  890. for(bp = r->unacked; bp != nil; bp = r->unacked){
  891. r->unacked = bp->list;
  892. bp->list = nil;
  893. freeb(bp);
  894. }
  895. r->rcvgen = 0;
  896. r->rcvseq = 0;
  897. r->acksent = 0;
  898. if(generation == Hangupgen)
  899. generation++;
  900. r->sndgen = generation++;
  901. r->sndseq = 0;
  902. r->ackrcvd = 0;
  903. r->xmits = 0;
  904. r->timeout = 0;
  905. wakeup(&r->vous);
  906. }
  907. /*
  908. * called with ucb locked
  909. */
  910. void
  911. relrexmit(Conv *c, Reliable *r)
  912. {
  913. Rudppriv *upriv;
  914. Block *np;
  915. Fs *f;
  916. upriv = c->p->priv;
  917. f = c->p->f;
  918. r->timeout = 0;
  919. if(r->xmits++ > Rudpmaxxmit){
  920. relhangup(c, r);
  921. return;
  922. }
  923. upriv->rxmits++;
  924. np = copyblock(r->unacked, blocklen(r->unacked));
  925. DPRINT("rxmit r->ackrvcd+1 = %lud\n", r->ackrcvd+1);
  926. doipoput(c, f, np, 0, c->ttl, c->tos);
  927. }