aan.c 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533
  1. /*
  2. * This file is part of the UCB release of Plan 9. It is subject to the license
  3. * terms in the LICENSE file found in the top-level directory of this
  4. * distribution and at http://akaros.cs.berkeley.edu/files/Plan9License. No
  5. * part of the UCB release of Plan 9, including this file, may be copied,
  6. * modified, propagated, or distributed except according to the terms contained
  7. * in the LICENSE file.
  8. */
  9. #include <u.h>
  10. #include <libc.h>
  11. #include <auth.h>
  12. #include <fcall.h>
  13. #include <thread.h>
  14. #define NS(x) ((int64_t)x)
  15. #define US(x) (NS(x) * 1000LL)
  16. #define MS(x) (US(x) * 1000LL)
  17. #define S(x) (MS(x) * 1000LL)
  18. #define LOGNAME "aan"
  19. enum {
  20. Synctime = S(8),
  21. Nbuf = 10,
  22. K = 1024,
  23. Bufsize = 8 * K,
  24. Stacksize = 8 * K,
  25. Timer = 0, /* Alt channels. */
  26. Unsent = 1,
  27. Maxto = 24 * 3600, /* A full day to reconnect. */
  28. Hdrsz = 12,
  29. };
  30. typedef struct Endpoints Endpoints;
  31. struct Endpoints {
  32. char *lsys;
  33. char *lserv;
  34. char *rsys;
  35. char *rserv;
  36. };
  37. typedef struct {
  38. uint32_t nb; /* Number of data bytes in this message */
  39. uint32_t msg; /* Message number */
  40. uint32_t acked; /* Number of messages acked */
  41. } Hdr;
  42. typedef struct {
  43. Hdr hdr;
  44. uint8_t buf[Bufsize];
  45. } Buf;
  46. static char *Logname = LOGNAME;
  47. static int client;
  48. static int debug;
  49. static char *devdir;
  50. static char *dialstring;
  51. static int done;
  52. static int inmsg;
  53. static int maxto = Maxto;
  54. static int netfd;
  55. static Channel *empty;
  56. static Channel *unacked;
  57. static Channel *unsent;
  58. static Alt a[] = {
  59. /* c v op */
  60. { nil, nil, CHANRCV }, /* timer */
  61. { nil, nil, CHANRCV }, /* unsent */
  62. { nil, nil, CHANEND },
  63. };
  64. static void dmessage(int, char *, ...);
  65. static void freeendpoints(Endpoints *);
  66. static void fromclient(void*);
  67. static void fromnet(void*);
  68. static Endpoints *getendpoints(char *);
  69. static void packhdr(Hdr *, uint8_t *);
  70. static void reconnect(void);
  71. static void showmsg(int, char *, Buf *);
  72. static void synchronize(void);
  73. static void timerproc(void *);
  74. static void unpackhdr(Hdr *, uint8_t *);
  75. static int writen(int, uint8_t *, int);
  76. static void
  77. usage(void)
  78. {
  79. fprint(2, "Usage: %s [-cd] [-m maxto] dialstring|netdir\n", argv0);
  80. threadexitsall("usage");
  81. }
  82. static int
  83. catch(void *v, char *s)
  84. {
  85. if (strstr(s, "alarm") != nil) {
  86. syslog(0, Logname, "Timed out waiting for client on %s, exiting...",
  87. devdir);
  88. threadexitsall(nil);
  89. }
  90. return 0;
  91. }
  92. void
  93. threadmain(int argc, char **argv)
  94. {
  95. int i, fd, failed, delta;
  96. int64_t synctime, now;
  97. char *p;
  98. unsigned char buf[Hdrsz];
  99. Buf *b, *eb;
  100. Channel *timer;
  101. Hdr hdr;
  102. ARGBEGIN {
  103. case 'c':
  104. client++;
  105. break;
  106. case 'd':
  107. debug++;
  108. break;
  109. case 'm':
  110. maxto = strtol(EARGF(usage()), (char **)nil, 0);
  111. break;
  112. default:
  113. usage();
  114. } ARGEND;
  115. if (argc != 1)
  116. usage();
  117. if (!client) {
  118. devdir = argv[0];
  119. if ((p = strstr(devdir, "/local")) != nil)
  120. *p = '\0';
  121. }else
  122. dialstring = argv[0];
  123. if (debug > 0) {
  124. fd = open("#c/cons", OWRITE|OCEXEC);
  125. dup(fd, 2);
  126. }
  127. fmtinstall('F', fcallfmt);
  128. atnotify(catch, 1);
  129. unsent = chancreate(sizeof(Buf *), Nbuf);
  130. unacked= chancreate(sizeof(Buf *), Nbuf);
  131. empty = chancreate(sizeof(Buf *), Nbuf);
  132. timer = chancreate(sizeof(unsigned char *), 1);
  133. for (i = 0; i != Nbuf; i++) {
  134. eb = malloc(sizeof(Buf));
  135. sendp(empty, eb);
  136. }
  137. netfd = -1;
  138. if (proccreate(fromnet, nil, Stacksize) < 0)
  139. sysfatal("Cannot start fromnet; %r");
  140. reconnect(); /* Set up the initial connection. */
  141. synchronize();
  142. if (proccreate(fromclient, nil, Stacksize) < 0)
  143. sysfatal("cannot start fromclient; %r");
  144. if (proccreate(timerproc, timer, Stacksize) < 0)
  145. sysfatal("Cannot start timerproc; %r");
  146. a[Timer].c = timer;
  147. a[Unsent].c = unsent;
  148. a[Unsent].v = &b;
  149. synctime = nsec() + Synctime;
  150. failed = 0;
  151. while (!done) {
  152. if (failed) {
  153. /* Wait for the netreader to die. */
  154. while (netfd >= 0) {
  155. dmessage(1, "main; waiting for netreader to die\n");
  156. sleep(1000);
  157. }
  158. /* the reader died; reestablish the world. */
  159. reconnect();
  160. synchronize();
  161. failed = 0;
  162. }
  163. now = nsec();
  164. delta = (synctime - nsec()) / MS(1);
  165. if (delta <= 0) {
  166. hdr.nb = 0;
  167. hdr.acked = inmsg;
  168. hdr.msg = -1;
  169. packhdr(&hdr, buf);
  170. if (writen(netfd, buf, sizeof(buf)) < 0) {
  171. dmessage(2, "main; writen failed; %r\n");
  172. failed = 1;
  173. continue;
  174. }
  175. synctime = nsec() + Synctime;
  176. assert(synctime > now);
  177. }
  178. switch (alt(a)) {
  179. case Timer:
  180. break;
  181. case Unsent:
  182. sendp(unacked, b);
  183. b->hdr.acked = inmsg;
  184. packhdr(&b->hdr, buf);
  185. if (writen(netfd, buf, sizeof(buf)) < 0 ||
  186. writen(netfd, b->buf, b->hdr.nb) < 0) {
  187. dmessage(2, "main; writen failed; %r\n");
  188. failed = 1;
  189. }
  190. if (b->hdr.nb == 0)
  191. done = 1;
  192. break;
  193. }
  194. }
  195. syslog(0, Logname, "exiting...");
  196. threadexitsall(nil);
  197. }
  198. static void
  199. fromclient(void *v)
  200. {
  201. Buf *b;
  202. static int outmsg;
  203. do {
  204. b = recvp(empty);
  205. if ((int)(b->hdr.nb = read(0, b->buf, Bufsize)) <= 0) {
  206. if ((int)b->hdr.nb < 0)
  207. dmessage(2, "fromclient; Cannot read 9P message; %r\n");
  208. else
  209. dmessage(2, "fromclient; Client terminated\n");
  210. b->hdr.nb = 0;
  211. }
  212. b->hdr.msg = outmsg++;
  213. showmsg(1, "fromclient", b);
  214. sendp(unsent, b);
  215. } while (b->hdr.nb != 0);
  216. }
  217. static void
  218. fromnet(void *v)
  219. {
  220. int len, acked, i;
  221. uint8_t buf[Hdrsz];
  222. Buf *b, *rb;
  223. static int lastacked;
  224. b = (Buf *)malloc(sizeof(Buf));
  225. assert(b);
  226. while (!done) {
  227. while (netfd < 0) {
  228. dmessage(1, "fromnet; waiting for connection... (inmsg %d)\n",
  229. inmsg);
  230. sleep(1000);
  231. }
  232. /* Read the header. */
  233. if ((len = readn(netfd, buf, sizeof(buf))) <= 0) {
  234. if (len < 0)
  235. dmessage(1, "fromnet; (hdr) network failure; %r\n");
  236. else
  237. dmessage(1, "fromnet; (hdr) network closed\n");
  238. close(netfd);
  239. netfd = -1;
  240. continue;
  241. }
  242. unpackhdr(&b->hdr, buf);
  243. dmessage(2, "fromnet: Got message, size %d, nb %d, msg %d\n",
  244. len, b->hdr.nb, b->hdr.msg);
  245. if (b->hdr.nb == 0) {
  246. if ((int32_t)b->hdr.msg >= 0) {
  247. dmessage(1, "fromnet; network closed\n");
  248. break;
  249. }
  250. continue;
  251. }
  252. if ((len = readn(netfd, b->buf, b->hdr.nb)) <= 0 ||
  253. len != b->hdr.nb) {
  254. if (len == 0)
  255. dmessage(1, "fromnet; network closed\n");
  256. else
  257. dmessage(1, "fromnet; network failure; %r\n");
  258. close(netfd);
  259. netfd = -1;
  260. continue;
  261. }
  262. if (b->hdr.msg < inmsg) {
  263. dmessage(1, "fromnet; skipping message %d, currently at %d\n",
  264. b->hdr.msg, inmsg);
  265. continue;
  266. }
  267. /* Process the acked list. */
  268. acked = b->hdr.acked - lastacked;
  269. for (i = 0; i != acked; i++) {
  270. rb = recvp(unacked);
  271. if (rb->hdr.msg != lastacked + i) {
  272. dmessage(1, "rb %p, msg %d, lastacked %d, i %d\n",
  273. rb, rb? rb->hdr.msg: -2, lastacked, i);
  274. assert(0);
  275. }
  276. rb->hdr.msg = -1;
  277. sendp(empty, rb);
  278. }
  279. lastacked = b->hdr.acked;
  280. inmsg++;
  281. showmsg(1, "fromnet", b);
  282. if (writen(1, b->buf, len) < 0)
  283. sysfatal("fromnet; cannot write to client; %r");
  284. }
  285. done = 1;
  286. }
  287. static void
  288. reconnect(void)
  289. {
  290. char err[32], ldir[40];
  291. int lcfd, fd;
  292. Endpoints *ep;
  293. if (dialstring) {
  294. syslog(0, Logname, "dialing %s", dialstring);
  295. while ((fd = dial(dialstring, nil, nil, nil)) < 0) {
  296. err[0] = '\0';
  297. errstr(err, sizeof err);
  298. if (strstr(err, "connection refused")) {
  299. dmessage(1, "reconnect; server died...\n");
  300. threadexitsall("server died...");
  301. }
  302. dmessage(1, "reconnect: dialed %s; %s\n", dialstring, err);
  303. sleep(1000);
  304. }
  305. syslog(0, Logname, "reconnected to %s", dialstring);
  306. } else {
  307. syslog(0, Logname, "waiting for connection on %s", devdir);
  308. alarm(maxto * 1000);
  309. if ((lcfd = listen(devdir, ldir)) < 0)
  310. sysfatal("reconnect; cannot listen; %r");
  311. if ((fd = accept(lcfd, ldir)) < 0)
  312. sysfatal("reconnect; cannot accept; %r");
  313. alarm(0);
  314. close(lcfd);
  315. ep = getendpoints(ldir);
  316. dmessage(1, "rsys '%s'\n", ep->rsys);
  317. syslog(0, Logname, "connected from %s", ep->rsys);
  318. freeendpoints(ep);
  319. }
  320. netfd = fd; /* Wakes up the netreader. */
  321. }
  322. static void
  323. synchronize(void)
  324. {
  325. Channel *tmp;
  326. Buf *b;
  327. uint8_t buf[Hdrsz];
  328. /*
  329. * Ignore network errors here. If we fail during
  330. * synchronization, the next alarm will pick up
  331. * the error.
  332. */
  333. tmp = chancreate(sizeof(Buf *), Nbuf);
  334. while ((b = nbrecvp(unacked)) != nil) {
  335. packhdr(&b->hdr, buf);
  336. writen(netfd, buf, sizeof(buf));
  337. writen(netfd, b->buf, b->hdr.nb);
  338. sendp(tmp, b);
  339. }
  340. chanfree(unacked);
  341. unacked = tmp;
  342. }
  343. static void
  344. showmsg(int level, char *s, Buf *b)
  345. {
  346. if (b == nil) {
  347. dmessage(level, "%s; b == nil\n", s);
  348. return;
  349. }
  350. dmessage(level, "%s; (len %d) %X %X %X %X %X %X %X %X %X (%p)\n", s,
  351. b->hdr.nb,
  352. b->buf[0], b->buf[1], b->buf[2],
  353. b->buf[3], b->buf[4], b->buf[5],
  354. b->buf[6], b->buf[7], b->buf[8], b);
  355. }
  356. static int
  357. writen(int fd, uint8_t *buf, int nb)
  358. {
  359. int n, len = nb;
  360. while (nb > 0) {
  361. if (fd < 0)
  362. return -1;
  363. if ((n = write(fd, buf, nb)) < 0) {
  364. dmessage(1, "writen; Write failed; %r\n");
  365. return -1;
  366. }
  367. dmessage(2, "writen: wrote %d bytes\n", n);
  368. buf += n;
  369. nb -= n;
  370. }
  371. return len;
  372. }
  373. static void
  374. timerproc(void *x)
  375. {
  376. Channel *timer = x;
  377. while (!done) {
  378. sleep((Synctime / MS(1)) >> 1);
  379. sendp(timer, "timer");
  380. }
  381. }
  382. static void
  383. dmessage(int level, char *fmt, ...)
  384. {
  385. va_list arg;
  386. if (level > debug)
  387. return;
  388. va_start(arg, fmt);
  389. vfprint(2, fmt, arg);
  390. va_end(arg);
  391. }
  392. static void
  393. getendpoint(char *dir, char *file, char **sysp, char **servp)
  394. {
  395. int fd, n;
  396. char buf[128];
  397. char *sys, *serv;
  398. sys = serv = 0;
  399. snprint(buf, sizeof buf, "%s/%s", dir, file);
  400. fd = open(buf, OREAD);
  401. if(fd >= 0){
  402. n = read(fd, buf, sizeof(buf)-1);
  403. if(n>0){
  404. buf[n-1] = 0;
  405. serv = strchr(buf, '!');
  406. if(serv){
  407. *serv++ = 0;
  408. serv = strdup(serv);
  409. }
  410. sys = strdup(buf);
  411. }
  412. close(fd);
  413. }
  414. if(serv == 0)
  415. serv = strdup("unknown");
  416. if(sys == 0)
  417. sys = strdup("unknown");
  418. *servp = serv;
  419. *sysp = sys;
  420. }
  421. static Endpoints *
  422. getendpoints(char *dir)
  423. {
  424. Endpoints *ep;
  425. ep = malloc(sizeof(*ep));
  426. getendpoint(dir, "local", &ep->lsys, &ep->lserv);
  427. getendpoint(dir, "remote", &ep->rsys, &ep->rserv);
  428. return ep;
  429. }
  430. static void
  431. freeendpoints(Endpoints *ep)
  432. {
  433. free(ep->lsys);
  434. free(ep->rsys);
  435. free(ep->lserv);
  436. free(ep->rserv);
  437. free(ep);
  438. }
  439. /* p must be a unsigned char* */
  440. #define U32GET(p) (p[0] | p[1]<<8 | p[2]<<16 | p[3]<<24)
  441. #define U32PUT(p,v) (p)[0] = (v); (p)[1] = (v)>>8; \
  442. (p)[2] = (v)>>16; (p)[3] = (v)>>24
  443. static void
  444. packhdr(Hdr *hdr, uint8_t *buf)
  445. {
  446. uint8_t *p;
  447. p = buf;
  448. U32PUT(p, hdr->nb);
  449. p += 4;
  450. U32PUT(p, hdr->msg);
  451. p += 4;
  452. U32PUT(p, hdr->acked);
  453. }
  454. static void
  455. unpackhdr(Hdr *hdr, uint8_t *buf)
  456. {
  457. uint8_t *p;
  458. p = buf;
  459. hdr->nb = U32GET(p);
  460. p += 4;
  461. hdr->msg = U32GET(p);
  462. p += 4;
  463. hdr->acked = U32GET(p);
  464. }