rudp.c 21 KB


  1. /*
  2. * This protocol is compatible with UDP's packet format.
  3. * It could be done over UDP if need be.
  4. */
  5. #include "u.h"
  6. #include "../port/lib.h"
  7. #include "mem.h"
  8. #include "dat.h"
  9. #include "fns.h"
  10. #include "../port/error.h"
  11. #include "ip.h"
  12. #define DEBUG 0
  13. #define DPRINT if(DEBUG)print
  14. #define SEQDIFF(a,b) ( (a)>=(b)?\
  15. (a)-(b):\
  16. 0xffffffffUL-((b)-(a)) )
  17. #define INSEQ(a,start,end) ( (start)<=(end)?\
  18. ((a)>(start)&&(a)<=(end)):\
  19. ((a)>(start)||(a)<=(end)) )
  20. #define UNACKED(r) SEQDIFF(r->sndseq, r->ackrcvd)
  21. #define NEXTSEQ(a) ( (a)+1 == 0 ? 1 : (a)+1 )
  22. enum
  23. {
  24. UDP_HDRSIZE = 20, /* pseudo header + udp header */
  25. UDP_PHDRSIZE = 12, /* pseudo header */
  26. UDP_RHDRSIZE = 36, /* pseudo header + udp header + rudp header */
  27. UDP_IPHDR = 8, /* ip header */
  28. IP_UDPPROTO = 254,
  29. UDP_USEAD7 = 52,
  30. UDP_USEAD6 = 36,
  31. UDP_USEAD4 = 12,
  32. Rudprxms = 200,
  33. Rudptickms = 50,
  34. Rudpmaxxmit = 10,
  35. Maxunacked = 100,
  36. };
  37. #define Hangupgen 0xffffffff /* used only in hangup messages */
  38. typedef struct Udphdr Udphdr;
  39. struct Udphdr
  40. {
  41. /* ip header */
  42. uchar vihl; /* Version and header length */
  43. uchar tos; /* Type of service */
  44. uchar length[2]; /* packet length */
  45. uchar id[2]; /* Identification */
  46. uchar frag[2]; /* Fragment information */
  47. /* pseudo header starts here */
  48. uchar Unused;
  49. uchar udpproto; /* Protocol */
  50. uchar udpplen[2]; /* Header plus data length */
  51. uchar udpsrc[4]; /* Ip source */
  52. uchar udpdst[4]; /* Ip destination */
  53. /* udp header */
  54. uchar udpsport[2]; /* Source port */
  55. uchar udpdport[2]; /* Destination port */
  56. uchar udplen[2]; /* data length */
  57. uchar udpcksum[2]; /* Checksum */
  58. };
  59. typedef struct Rudphdr Rudphdr;
  60. struct Rudphdr
  61. {
  62. /* ip header */
  63. uchar vihl; /* Version and header length */
  64. uchar tos; /* Type of service */
  65. uchar length[2]; /* packet length */
  66. uchar id[2]; /* Identification */
  67. uchar frag[2]; /* Fragment information */
  68. /* pseudo header starts here */
  69. uchar Unused;
  70. uchar udpproto; /* Protocol */
  71. uchar udpplen[2]; /* Header plus data length */
  72. uchar udpsrc[4]; /* Ip source */
  73. uchar udpdst[4]; /* Ip destination */
  74. /* udp header */
  75. uchar udpsport[2]; /* Source port */
  76. uchar udpdport[2]; /* Destination port */
  77. uchar udplen[2]; /* data length (includes rudp header) */
  78. uchar udpcksum[2]; /* Checksum */
  79. /* rudp header */
  80. uchar relseq[4]; /* id of this packet (or 0) */
  81. uchar relsgen[4]; /* generation/time stamp */
  82. uchar relack[4]; /* packet being acked (or 0) */
  83. uchar relagen[4]; /* generation/time stamp */
  84. };
  85. /*
  86. * one state structure per destination
  87. */
  88. typedef struct Reliable Reliable;
  89. struct Reliable
  90. {
  91. Ref;
  92. Reliable *next;
  93. uchar addr[IPaddrlen]; /* always V6 when put here */
  94. ushort port;
  95. Block *unacked; /* unacked msg list */
  96. Block *unackedtail; /* and its tail */
  97. int timeout; /* time since first unacked msg sent */
  98. int xmits; /* number of times first unacked msg sent */
  99. ulong sndseq; /* next packet to be sent */
  100. ulong sndgen; /* and its generation */
  101. ulong rcvseq; /* last packet received */
  102. ulong rcvgen; /* and its generation */
  103. ulong acksent; /* last ack sent */
  104. ulong ackrcvd; /* last msg for which ack was rcvd */
  105. /* flow control */
  106. QLock lock;
  107. Rendez vous;
  108. int blocked;
  109. };
  110. /* MIB II counters */
  111. typedef struct Rudpstats Rudpstats;
  112. struct Rudpstats
  113. {
  114. ulong rudpInDatagrams;
  115. ulong rudpNoPorts;
  116. ulong rudpInErrors;
  117. ulong rudpOutDatagrams;
  118. };
  119. typedef struct Rudppriv Rudppriv;
  120. struct Rudppriv
  121. {
  122. Ipht ht;
  123. /* MIB counters */
  124. Rudpstats ustats;
  125. /* non-MIB stats */
  126. ulong csumerr; /* checksum errors */
  127. ulong lenerr; /* short packet */
  128. ulong rxmits; /* # of retransmissions */
  129. ulong orders; /* # of out of order pkts */
  130. /* keeping track of the ack kproc */
  131. int ackprocstarted;
  132. QLock apl;
  133. };
  134. static ulong generation = 0;
  135. static Rendez rend;
  136. /*
  137. * protocol specific part of Conv
  138. */
  139. typedef struct Rudpcb Rudpcb;
  140. struct Rudpcb
  141. {
  142. QLock;
  143. uchar headers;
  144. uchar randdrop;
  145. Reliable *r;
  146. };
  147. /*
  148. * local functions
  149. */
  150. void relsendack(Conv*, Reliable*, int);
  151. int reliput(Conv*, Block*, uchar*, ushort);
  152. Reliable *relstate(Rudpcb*, uchar*, ushort, char*);
  153. void relput(Reliable*);
  154. void relforget(Conv *, uchar*, int, int);
  155. void relackproc(void *);
  156. void relackq(Reliable *, Block*);
  157. void relhangup(Conv *, Reliable*);
  158. void relrexmit(Conv *, Reliable*);
  159. void relput(Reliable*);
  160. void rudpkick(void *x);
  161. static void
  162. rudpstartackproc(Proto *rudp)
  163. {
  164. Rudppriv *rpriv;
  165. char kpname[KNAMELEN];
  166. rpriv = rudp->priv;
  167. if(rpriv->ackprocstarted == 0){
  168. qlock(&rpriv->apl);
  169. if(rpriv->ackprocstarted == 0){
  170. sprint(kpname, "#I%drudpack", rudp->f->dev);
  171. kproc(kpname, relackproc, rudp);
  172. rpriv->ackprocstarted = 1;
  173. }
  174. qunlock(&rpriv->apl);
  175. }
  176. }
  177. static char*
  178. rudpconnect(Conv *c, char **argv, int argc)
  179. {
  180. char *e;
  181. Rudppriv *upriv;
  182. upriv = c->p->priv;
  183. rudpstartackproc(c->p);
  184. e = Fsstdconnect(c, argv, argc);
  185. Fsconnected(c, e);
  186. iphtadd(&upriv->ht, c);
  187. return e;
  188. }
  189. static int
  190. rudpstate(Conv *c, char *state, int n)
  191. {
  192. Rudpcb *ucb;
  193. Reliable *r;
  194. int m;
  195. m = snprint(state, n, "%s", c->inuse?"Open":"Closed");
  196. ucb = (Rudpcb*)c->ptcl;
  197. qlock(ucb);
  198. for(r = ucb->r; r; r = r->next)
  199. m += snprint(state+m, n-m, " %I/%ld", r->addr, UNACKED(r));
  200. qunlock(ucb);
  201. return m;
  202. }
  203. static char*
  204. rudpannounce(Conv *c, char** argv, int argc)
  205. {
  206. char *e;
  207. Rudppriv *upriv;
  208. upriv = c->p->priv;
  209. rudpstartackproc(c->p);
  210. e = Fsstdannounce(c, argv, argc);
  211. if(e != nil)
  212. return e;
  213. Fsconnected(c, nil);
  214. iphtadd(&upriv->ht, c);
  215. return nil;
  216. }
  217. static void
  218. rudpcreate(Conv *c)
  219. {
  220. c->rq = qopen(64*1024, Qmsg, 0, 0);
  221. c->wq = qopen(64*1024, Qkick, rudpkick, c);
  222. }
  223. static void
  224. rudpclose(Conv *c)
  225. {
  226. Rudpcb *ucb;
  227. Reliable *r, *nr;
  228. Rudppriv *upriv;
  229. upriv = c->p->priv;
  230. iphtrem(&upriv->ht, c);
  231. /* force out any delayed acks */
  232. ucb = (Rudpcb*)c->ptcl;
  233. qlock(ucb);
  234. for(r = ucb->r; r; r = r->next){
  235. if(r->acksent != r->rcvseq)
  236. relsendack(c, r, 0);
  237. }
  238. qunlock(ucb);
  239. qclose(c->rq);
  240. qclose(c->wq);
  241. qclose(c->eq);
  242. ipmove(c->laddr, IPnoaddr);
  243. ipmove(c->raddr, IPnoaddr);
  244. c->lport = 0;
  245. c->rport = 0;
  246. ucb->headers = 0;
  247. ucb->randdrop = 0;
  248. qlock(ucb);
  249. for(r = ucb->r; r; r = nr){
  250. if(r->acksent != r->rcvseq)
  251. relsendack(c, r, 0);
  252. nr = r->next;
  253. relhangup(c, r);
  254. relput(r);
  255. }
  256. ucb->r = 0;
  257. qunlock(ucb);
  258. }
  259. /*
  260. * randomly don't send packets
  261. */
  262. static void
  263. doipoput(Conv *c, Fs *f, Block *bp, int x, int ttl, int tos)
  264. {
  265. Rudpcb *ucb;
  266. ucb = (Rudpcb*)c->ptcl;
  267. if(ucb->randdrop && nrand(100) < ucb->randdrop)
  268. freeblist(bp);
  269. else
  270. ipoput4(f, bp, x, ttl, tos, nil);
  271. }
  272. int
  273. flow(void *v)
  274. {
  275. Reliable *r = v;
  276. return UNACKED(r) <= Maxunacked;
  277. }
  278. void
  279. rudpkick(void *x)
  280. {
  281. Conv *c = x;
  282. Udphdr *uh;
  283. ushort rport;
  284. uchar laddr[IPaddrlen], raddr[IPaddrlen];
  285. Block *bp;
  286. Rudpcb *ucb;
  287. Rudphdr *rh;
  288. Reliable *r;
  289. int dlen, ptcllen;
  290. Rudppriv *upriv;
  291. Fs *f;
  292. upriv = c->p->priv;
  293. f = c->p->f;
  294. netlog(c->p->f, Logrudp, "rudp: kick\n");
  295. bp = qget(c->wq);
  296. if(bp == nil)
  297. return;
  298. ucb = (Rudpcb*)c->ptcl;
  299. switch(ucb->headers) {
  300. case 7:
  301. /* get user specified addresses */
  302. bp = pullupblock(bp, UDP_USEAD7);
  303. if(bp == nil)
  304. return;
  305. ipmove(raddr, bp->rp);
  306. bp->rp += IPaddrlen;
  307. ipmove(laddr, bp->rp);
  308. bp->rp += IPaddrlen;
  309. /* pick interface closest to dest */
  310. if(ipforme(f, laddr) != Runi)
  311. findlocalip(f, laddr, raddr);
  312. bp->rp += IPaddrlen; /* Ignore ifc address */
  313. rport = nhgets(bp->rp);
  314. bp->rp += 2+2; /* Ignore local port */
  315. break;
  316. case 6: /* OBS */
  317. /* get user specified addresses */
  318. bp = pullupblock(bp, UDP_USEAD6);
  319. if(bp == nil)
  320. return;
  321. ipmove(raddr, bp->rp);
  322. bp->rp += IPaddrlen;
  323. ipmove(laddr, bp->rp);
  324. bp->rp += IPaddrlen;
  325. /* pick interface closest to dest */
  326. if(ipforme(f, laddr) != Runi)
  327. findlocalip(f, laddr, raddr);
  328. rport = nhgets(bp->rp);
  329. bp->rp += 4; /* Igonore local port */
  330. break;
  331. default:
  332. ipmove(raddr, c->raddr);
  333. ipmove(laddr, c->laddr);
  334. rport = c->rport;
  335. break;
  336. }
  337. dlen = blocklen(bp);
  338. /* Make space to fit rudp & ip header */
  339. bp = padblock(bp, UDP_IPHDR+UDP_RHDRSIZE);
  340. if(bp == nil)
  341. return;
  342. uh = (Udphdr *)(bp->rp);
  343. uh->vihl = IP_VER4;
  344. rh = (Rudphdr*)uh;
  345. ptcllen = dlen + (UDP_RHDRSIZE-UDP_PHDRSIZE);
  346. uh->Unused = 0;
  347. uh->udpproto = IP_UDPPROTO;
  348. uh->frag[0] = 0;
  349. uh->frag[1] = 0;
  350. hnputs(uh->udpplen, ptcllen);
  351. switch(ucb->headers){
  352. case 6: /* OBS */
  353. case 7:
  354. v6tov4(uh->udpdst, raddr);
  355. hnputs(uh->udpdport, rport);
  356. v6tov4(uh->udpsrc, laddr);
  357. break;
  358. default:
  359. v6tov4(uh->udpdst, c->raddr);
  360. hnputs(uh->udpdport, c->rport);
  361. if(ipcmp(c->laddr, IPnoaddr) == 0)
  362. findlocalip(f, c->laddr, c->raddr);
  363. v6tov4(uh->udpsrc, c->laddr);
  364. break;
  365. }
  366. hnputs(uh->udpsport, c->lport);
  367. hnputs(uh->udplen, ptcllen);
  368. uh->udpcksum[0] = 0;
  369. uh->udpcksum[1] = 0;
  370. qlock(ucb);
  371. r = relstate(ucb, raddr, rport, "kick");
  372. r->sndseq = NEXTSEQ(r->sndseq);
  373. hnputl(rh->relseq, r->sndseq);
  374. hnputl(rh->relsgen, r->sndgen);
  375. hnputl(rh->relack, r->rcvseq); /* ACK last rcvd packet */
  376. hnputl(rh->relagen, r->rcvgen);
  377. if(r->rcvseq != r->acksent)
  378. r->acksent = r->rcvseq;
  379. hnputs(uh->udpcksum, ptclcsum(bp, UDP_IPHDR, dlen+UDP_RHDRSIZE));
  380. relackq(r, bp);
  381. qunlock(ucb);
  382. upriv->ustats.rudpOutDatagrams++;
  383. DPRINT("sent: %lud/%lud, %lud/%lud\n",
  384. r->sndseq, r->sndgen, r->rcvseq, r->rcvgen);
  385. doipoput(c, f, bp, 0, c->ttl, c->tos);
  386. if(waserror()) {
  387. relput(r);
  388. qunlock(&r->lock);
  389. nexterror();
  390. }
  391. /* flow control of sorts */
  392. qlock(&r->lock);
  393. if(UNACKED(r) > Maxunacked){
  394. r->blocked = 1;
  395. sleep(&r->vous, flow, r);
  396. r->blocked = 0;
  397. }
  398. qunlock(&r->lock);
  399. relput(r);
  400. poperror();
  401. }
  402. void
  403. rudpiput(Proto *rudp, Ipifc *ifc, Block *bp)
  404. {
  405. int len, olen, ottl;
  406. Udphdr *uh;
  407. Conv *c;
  408. Rudpcb *ucb;
  409. uchar raddr[IPaddrlen], laddr[IPaddrlen];
  410. ushort rport, lport;
  411. Rudppriv *upriv;
  412. Fs *f;
  413. uchar *p;
  414. upriv = rudp->priv;
  415. f = rudp->f;
  416. upriv->ustats.rudpInDatagrams++;
  417. uh = (Udphdr*)(bp->rp);
  418. /* Put back pseudo header for checksum
  419. * (remember old values for icmpnoconv())
  420. */
  421. ottl = uh->Unused;
  422. uh->Unused = 0;
  423. len = nhgets(uh->udplen);
  424. olen = nhgets(uh->udpplen);
  425. hnputs(uh->udpplen, len);
  426. v4tov6(raddr, uh->udpsrc);
  427. v4tov6(laddr, uh->udpdst);
  428. lport = nhgets(uh->udpdport);
  429. rport = nhgets(uh->udpsport);
  430. if(nhgets(uh->udpcksum)) {
  431. if(ptclcsum(bp, UDP_IPHDR, len+UDP_PHDRSIZE)) {
  432. upriv->ustats.rudpInErrors++;
  433. upriv->csumerr++;
  434. netlog(f, Logrudp, "rudp: checksum error %I\n", raddr);
  435. DPRINT("rudp: checksum error %I\n", raddr);
  436. freeblist(bp);
  437. return;
  438. }
  439. }
  440. qlock(rudp);
  441. c = iphtlook(&upriv->ht, raddr, rport, laddr, lport);
  442. if(c == nil){
  443. /* no converstation found */
  444. upriv->ustats.rudpNoPorts++;
  445. qunlock(rudp);
  446. netlog(f, Logudp, "udp: no conv %I!%d -> %I!%d\n", raddr, rport,
  447. laddr, lport);
  448. uh->Unused = ottl;
  449. hnputs(uh->udpplen, olen);
  450. icmpnoconv(f, bp);
  451. freeblist(bp);
  452. return;
  453. }
  454. ucb = (Rudpcb*)c->ptcl;
  455. qlock(ucb);
  456. qunlock(rudp);
  457. if(reliput(c, bp, raddr, rport) < 0){
  458. qunlock(ucb);
  459. freeb(bp);
  460. return;
  461. }
  462. /*
  463. * Trim the packet down to data size
  464. */
  465. len -= (UDP_RHDRSIZE-UDP_PHDRSIZE);
  466. bp = trimblock(bp, UDP_IPHDR+UDP_RHDRSIZE, len);
  467. if(bp == nil) {
  468. netlog(f, Logrudp, "rudp: len err %I.%d -> %I.%d\n",
  469. raddr, rport, laddr, lport);
  470. DPRINT("rudp: len err %I.%d -> %I.%d\n",
  471. raddr, rport, laddr, lport);
  472. upriv->lenerr++;
  473. return;
  474. }
  475. netlog(f, Logrudpmsg, "rudp: %I.%d -> %I.%d l %d\n",
  476. raddr, rport, laddr, lport, len);
  477. switch(ucb->headers){
  478. case 7:
  479. /* pass the src address */
  480. bp = padblock(bp, UDP_USEAD7);
  481. p = bp->rp;
  482. ipmove(p, raddr); p += IPaddrlen;
  483. ipmove(p, laddr); p += IPaddrlen;
  484. ipmove(p, ifc->lifc->local); p += IPaddrlen;
  485. hnputs(p, rport); p += 2;
  486. hnputs(p, lport);
  487. break;
  488. case 6: /* OBS */
  489. /* pass the src address */
  490. bp = padblock(bp, UDP_USEAD6);
  491. p = bp->rp;
  492. ipmove(p, raddr); p += IPaddrlen;
  493. ipmove(p, ipforme(f, laddr)==Runi ? laddr : ifc->lifc->local); p += IPaddrlen;
  494. hnputs(p, rport); p += 2;
  495. hnputs(p, lport);
  496. break;
  497. default:
  498. /* connection oriented rudp */
  499. if(ipcmp(c->raddr, IPnoaddr) == 0){
  500. /* save the src address in the conversation */
  501. ipmove(c->raddr, raddr);
  502. c->rport = rport;
  503. /* reply with the same ip address (if not broadcast) */
  504. if(ipforme(f, laddr) == Runi)
  505. ipmove(c->laddr, laddr);
  506. else
  507. v4tov6(c->laddr, ifc->lifc->local);
  508. }
  509. break;
  510. }
  511. if(bp->next)
  512. bp = concatblock(bp);
  513. if(qfull(c->rq)) {
  514. netlog(f, Logrudp, "rudp: qfull %I.%d -> %I.%d\n", raddr, rport,
  515. laddr, lport);
  516. freeblist(bp);
  517. }
  518. else
  519. qpass(c->rq, bp);
  520. qunlock(ucb);
  521. }
  522. static char *rudpunknown = "unknown rudp ctl request";
  523. char*
  524. rudpctl(Conv *c, char **f, int n)
  525. {
  526. Rudpcb *ucb;
  527. uchar ip[IPaddrlen];
  528. int x;
  529. ucb = (Rudpcb*)c->ptcl;
  530. if(n < 1)
  531. return rudpunknown;
  532. if(strcmp(f[0], "headers++4") == 0){
  533. ucb->headers = 7; /* new headers format */
  534. return nil;
  535. } else if(strcmp(f[0], "headers") == 0){ /* OBS */
  536. ucb->headers = 6;
  537. return nil;
  538. } else if(strcmp(f[0], "hangup") == 0){
  539. if(n < 3)
  540. return "bad syntax";
  541. parseip(ip, f[1]);
  542. x = atoi(f[2]);
  543. qlock(ucb);
  544. relforget(c, ip, x, 1);
  545. qunlock(ucb);
  546. return nil;
  547. } else if(strcmp(f[0], "randdrop") == 0){
  548. x = 10; /* default is 10% */
  549. if(n > 1)
  550. x = atoi(f[1]);
  551. if(x > 100 || x < 0)
  552. return "illegal rudp drop rate";
  553. ucb->randdrop = x;
  554. return nil;
  555. }
  556. return rudpunknown;
  557. }
  558. void
  559. rudpadvise(Proto *rudp, Block *bp, char *msg)
  560. {
  561. Udphdr *h;
  562. uchar source[IPaddrlen], dest[IPaddrlen];
  563. ushort psource, pdest;
  564. Conv *s, **p;
  565. h = (Udphdr*)(bp->rp);
  566. v4tov6(dest, h->udpdst);
  567. v4tov6(source, h->udpsrc);
  568. psource = nhgets(h->udpsport);
  569. pdest = nhgets(h->udpdport);
  570. /* Look for a connection */
  571. for(p = rudp->conv; *p; p++) {
  572. s = *p;
  573. if(s->rport == pdest)
  574. if(s->lport == psource)
  575. if(ipcmp(s->raddr, dest) == 0)
  576. if(ipcmp(s->laddr, source) == 0){
  577. qhangup(s->rq, msg);
  578. qhangup(s->wq, msg);
  579. break;
  580. }
  581. }
  582. freeblist(bp);
  583. }
  584. int
  585. rudpstats(Proto *rudp, char *buf, int len)
  586. {
  587. Rudppriv *upriv;
  588. upriv = rudp->priv;
  589. return snprint(buf, len, "%lud %lud %lud %lud %lud %lud\n",
  590. upriv->ustats.rudpInDatagrams,
  591. upriv->ustats.rudpNoPorts,
  592. upriv->ustats.rudpInErrors,
  593. upriv->ustats.rudpOutDatagrams,
  594. upriv->rxmits,
  595. upriv->orders);
  596. }
  597. void
  598. rudpinit(Fs *fs)
  599. {
  600. Proto *rudp;
  601. rudp = smalloc(sizeof(Proto));
  602. rudp->priv = smalloc(sizeof(Rudppriv));
  603. rudp->name = "rudp";
  604. rudp->connect = rudpconnect;
  605. rudp->announce = rudpannounce;
  606. rudp->ctl = rudpctl;
  607. rudp->state = rudpstate;
  608. rudp->create = rudpcreate;
  609. rudp->close = rudpclose;
  610. rudp->rcv = rudpiput;
  611. rudp->advise = rudpadvise;
  612. rudp->stats = rudpstats;
  613. rudp->ipproto = IP_UDPPROTO;
  614. rudp->nc = 16;
  615. rudp->ptclsize = sizeof(Rudpcb);
  616. Fsproto(fs, rudp);
  617. }
  618. /*********************************************/
  619. /* Here starts the reliable helper functions */
  620. /*********************************************/
  621. /*
  622. * Enqueue a copy of an unacked block for possible retransmissions
  623. */
  624. void
  625. relackq(Reliable *r, Block *bp)
  626. {
  627. Block *np;
  628. np = copyblock(bp, blocklen(bp));
  629. if(r->unacked)
  630. r->unackedtail->list = np;
  631. else {
  632. /* restart timer */
  633. r->timeout = 0;
  634. r->xmits = 1;
  635. r->unacked = np;
  636. }
  637. r->unackedtail = np;
  638. np->list = nil;
  639. }
  640. /*
  641. * retransmit unacked blocks
  642. */
  643. void
  644. relackproc(void *a)
  645. {
  646. Rudpcb *ucb;
  647. Proto *rudp;
  648. Reliable *r;
  649. Conv **s, *c;
  650. rudp = (Proto *)a;
  651. loop:
  652. tsleep(&up->sleep, return0, 0, Rudptickms);
  653. for(s = rudp->conv; *s; s++) {
  654. c = *s;
  655. ucb = (Rudpcb*)c->ptcl;
  656. qlock(ucb);
  657. for(r = ucb->r; r; r = r->next) {
  658. if(r->unacked != nil){
  659. r->timeout += Rudptickms;
  660. if(r->timeout > Rudprxms*r->xmits)
  661. relrexmit(c, r);
  662. }
  663. if(r->acksent != r->rcvseq)
  664. relsendack(c, r, 0);
  665. }
  666. qunlock(ucb);
  667. }
  668. goto loop;
  669. }
  670. /*
  671. * get the state record for a conversation
  672. */
  673. Reliable*
  674. relstate(Rudpcb *ucb, uchar *addr, ushort port, char *from)
  675. {
  676. Reliable *r, **l;
  677. l = &ucb->r;
  678. for(r = *l; r; r = *l){
  679. if(memcmp(addr, r->addr, IPaddrlen) == 0 &&
  680. port == r->port)
  681. break;
  682. l = &r->next;
  683. }
  684. /* no state for this addr/port, create some */
  685. if(r == nil){
  686. while(generation == 0)
  687. generation = rand();
  688. DPRINT("from %s new state %lud for %I!%ud\n",
  689. from, generation, addr, port);
  690. r = smalloc(sizeof(Reliable));
  691. memmove(r->addr, addr, IPaddrlen);
  692. r->port = port;
  693. r->unacked = 0;
  694. if(generation == Hangupgen)
  695. generation++;
  696. r->sndgen = generation++;
  697. r->sndseq = 0;
  698. r->ackrcvd = 0;
  699. r->rcvgen = 0;
  700. r->rcvseq = 0;
  701. r->acksent = 0;
  702. r->xmits = 0;
  703. r->timeout = 0;
  704. r->ref = 0;
  705. incref(r); /* one reference for being in the list */
  706. *l = r;
  707. }
  708. incref(r);
  709. return r;
  710. }
  711. void
  712. relput(Reliable *r)
  713. {
  714. if(decref(r) == 0)
  715. free(r);
  716. }
  717. /*
  718. * forget a Reliable state
  719. */
  720. void
  721. relforget(Conv *c, uchar *ip, int port, int originator)
  722. {
  723. Rudpcb *ucb;
  724. Reliable *r, **l;
  725. ucb = (Rudpcb*)c->ptcl;
  726. l = &ucb->r;
  727. for(r = *l; r; r = *l){
  728. if(ipcmp(ip, r->addr) == 0 && port == r->port){
  729. *l = r->next;
  730. if(originator)
  731. relsendack(c, r, 1);
  732. relhangup(c, r);
  733. relput(r); /* remove from the list */
  734. break;
  735. }
  736. l = &r->next;
  737. }
  738. }
  739. /*
  740. * process a rcvd reliable packet. return -1 if not to be passed to user process,
  741. * 0 therwise.
  742. *
  743. * called with ucb locked.
  744. */
  745. int
  746. reliput(Conv *c, Block *bp, uchar *addr, ushort port)
  747. {
  748. Block *nbp;
  749. Rudpcb *ucb;
  750. Rudppriv *upriv;
  751. Udphdr *uh;
  752. Reliable *r;
  753. Rudphdr *rh;
  754. ulong seq, ack, sgen, agen, ackreal;
  755. int rv = -1;
  756. /* get fields */
  757. uh = (Udphdr*)(bp->rp);
  758. rh = (Rudphdr*)uh;
  759. seq = nhgetl(rh->relseq);
  760. sgen = nhgetl(rh->relsgen);
  761. ack = nhgetl(rh->relack);
  762. agen = nhgetl(rh->relagen);
  763. upriv = c->p->priv;
  764. ucb = (Rudpcb*)c->ptcl;
  765. r = relstate(ucb, addr, port, "input");
  766. DPRINT("rcvd %lud/%lud, %lud/%lud, r->sndgen = %lud\n",
  767. seq, sgen, ack, agen, r->sndgen);
  768. /* if acking an incorrect generation, ignore */
  769. if(ack && agen != r->sndgen)
  770. goto out;
  771. /* Look for a hangup */
  772. if(sgen == Hangupgen) {
  773. if(agen == r->sndgen)
  774. relforget(c, addr, port, 0);
  775. goto out;
  776. }
  777. /* make sure we're not talking to a new remote side */
  778. if(r->rcvgen != sgen){
  779. if(seq != 0 && seq != 1)
  780. goto out;
  781. /* new connection */
  782. if(r->rcvgen != 0){
  783. DPRINT("new con r->rcvgen = %lud, sgen = %lud\n", r->rcvgen, sgen);
  784. relhangup(c, r);
  785. }
  786. r->rcvgen = sgen;
  787. }
  788. /* dequeue acked packets */
  789. if(ack && agen == r->sndgen){
  790. ackreal = 0;
  791. while(r->unacked != nil && INSEQ(ack, r->ackrcvd, r->sndseq)){
  792. nbp = r->unacked;
  793. r->unacked = nbp->list;
  794. DPRINT("%lud/%lud acked, r->sndgen = %lud\n",
  795. ack, agen, r->sndgen);
  796. freeb(nbp);
  797. r->ackrcvd = NEXTSEQ(r->ackrcvd);
  798. ackreal = 1;
  799. }
  800. /* flow control */
  801. if(UNACKED(r) < Maxunacked/8 && r->blocked)
  802. wakeup(&r->vous);
  803. /*
  804. * retransmit next packet if the acked packet
  805. * was transmitted more than once
  806. */
  807. if(ackreal && r->unacked != nil){
  808. r->timeout = 0;
  809. if(r->xmits > 1){
  810. r->xmits = 1;
  811. relrexmit(c, r);
  812. }
  813. }
  814. }
  815. /* no message or input queue full */
  816. if(seq == 0 || qfull(c->rq))
  817. goto out;
  818. /* refuse out of order delivery */
  819. if(seq != NEXTSEQ(r->rcvseq)){
  820. relsendack(c, r, 0); /* tell him we got it already */
  821. upriv->orders++;
  822. DPRINT("out of sequence %lud not %lud\n", seq, NEXTSEQ(r->rcvseq));
  823. goto out;
  824. }
  825. r->rcvseq = seq;
  826. rv = 0;
  827. out:
  828. relput(r);
  829. return rv;
  830. }
  831. void
  832. relsendack(Conv *c, Reliable *r, int hangup)
  833. {
  834. Udphdr *uh;
  835. Block *bp;
  836. Rudphdr *rh;
  837. int ptcllen;
  838. Fs *f;
  839. bp = allocb(UDP_IPHDR + UDP_RHDRSIZE);
  840. if(bp == nil)
  841. return;
  842. bp->wp += UDP_IPHDR + UDP_RHDRSIZE;
  843. f = c->p->f;
  844. uh = (Udphdr *)(bp->rp);
  845. uh->vihl = IP_VER4;
  846. rh = (Rudphdr*)uh;
  847. ptcllen = (UDP_RHDRSIZE-UDP_PHDRSIZE);
  848. uh->Unused = 0;
  849. uh->udpproto = IP_UDPPROTO;
  850. uh->frag[0] = 0;
  851. uh->frag[1] = 0;
  852. hnputs(uh->udpplen, ptcllen);
  853. v6tov4(uh->udpdst, r->addr);
  854. hnputs(uh->udpdport, r->port);
  855. hnputs(uh->udpsport, c->lport);
  856. if(ipcmp(c->laddr, IPnoaddr) == 0)
  857. findlocalip(f, c->laddr, c->raddr);
  858. v6tov4(uh->udpsrc, c->laddr);
  859. hnputs(uh->udplen, ptcllen);
  860. if(hangup)
  861. hnputl(rh->relsgen, Hangupgen);
  862. else
  863. hnputl(rh->relsgen, r->sndgen);
  864. hnputl(rh->relseq, 0);
  865. hnputl(rh->relagen, r->rcvgen);
  866. hnputl(rh->relack, r->rcvseq);
  867. if(r->acksent < r->rcvseq)
  868. r->acksent = r->rcvseq;
  869. uh->udpcksum[0] = 0;
  870. uh->udpcksum[1] = 0;
  871. hnputs(uh->udpcksum, ptclcsum(bp, UDP_IPHDR, UDP_RHDRSIZE));
  872. DPRINT("sendack: %lud/%lud, %lud/%lud\n", 0L, r->sndgen, r->rcvseq, r->rcvgen);
  873. doipoput(c, f, bp, 0, c->ttl, c->tos);
  874. }
  875. /*
  876. * called with ucb locked (and c locked if user initiated close)
  877. */
  878. void
  879. relhangup(Conv *c, Reliable *r)
  880. {
  881. int n;
  882. Block *bp;
  883. char hup[ERRMAX];
  884. n = snprint(hup, sizeof(hup), "hangup %I!%d", r->addr, r->port);
  885. qproduce(c->eq, hup, n);
  886. /*
  887. * dump any unacked outgoing messages
  888. */
  889. for(bp = r->unacked; bp != nil; bp = r->unacked){
  890. r->unacked = bp->list;
  891. bp->list = nil;
  892. freeb(bp);
  893. }
  894. r->rcvgen = 0;
  895. r->rcvseq = 0;
  896. r->acksent = 0;
  897. if(generation == Hangupgen)
  898. generation++;
  899. r->sndgen = generation++;
  900. r->sndseq = 0;
  901. r->ackrcvd = 0;
  902. r->xmits = 0;
  903. r->timeout = 0;
  904. wakeup(&r->vous);
  905. }
  906. /*
  907. * called with ucb locked
  908. */
  909. void
  910. relrexmit(Conv *c, Reliable *r)
  911. {
  912. Rudppriv *upriv;
  913. Block *np;
  914. Fs *f;
  915. upriv = c->p->priv;
  916. f = c->p->f;
  917. r->timeout = 0;
  918. if(r->xmits++ > Rudpmaxxmit){
  919. relhangup(c, r);
  920. return;
  921. }
  922. upriv->rxmits++;
  923. np = copyblock(r->unacked, blocklen(r->unacked));
  924. DPRINT("rxmit r->ackrvcd+1 = %lud\n", r->ackrcvd+1);
  925. doipoput(c, f, np, 0, c->ttl, c->tos);
  926. }