runq.c 13 KB


  1. #include "common.h"
  2. #include <ctype.h>
  3. void doalldirs(void);
  4. void dodir(char*);
  5. void dofile(Dir*);
  6. void rundir(char*);
  7. char* file(char*, char);
  8. void warning(char*, void*);
  9. void error(char*, void*);
  10. int returnmail(char**, char*, char*);
  11. void logit(char*, char*, char**);
  12. void doload(int);
  13. #define HUNK 32
  14. char *cmd;
  15. char *root;
  16. int debug;
  17. int giveup = 2*24*60*60;
  18. int load;
  19. int limit;
  20. /* the current directory */
  21. Dir *dirbuf;
  22. long ndirbuf = 0;
  23. int nfiles;
  24. char *curdir;
  25. char *runqlog = "runq";
  26. int *pidlist;
  27. char **badsys; /* array of recalcitrant systems */
  28. int nbad;
  29. int npid = 50;
  30. int sflag; /* single thread per directory */
  31. int aflag; /* all directories */
  32. int Eflag; /* ignore E.xxxxxx dates */
  33. int Rflag; /* no giving up, ever */
  34. void
  35. usage(void)
  36. {
  37. fprint(2, "usage: runq [-adsE] [-q dir] [-l load] [-t time] [-r nfiles] [-n nprocs] q-root cmd\n");
  38. exits("");
  39. }
  40. void
  41. main(int argc, char **argv)
  42. {
  43. char *qdir, *x;
  44. qdir = 0;
  45. ARGBEGIN{
  46. case 'l':
  47. x = ARGF();
  48. if(x == 0)
  49. usage();
  50. load = atoi(x);
  51. if(load < 0)
  52. load = 0;
  53. break;
  54. case 'E':
  55. Eflag++;
  56. break;
  57. case 'R': /* no giving up -- just leave stuff in the queue */
  58. Rflag++;
  59. break;
  60. case 'a':
  61. aflag++;
  62. break;
  63. case 'd':
  64. debug++;
  65. break;
  66. case 'r':
  67. limit = atoi(ARGF());
  68. break;
  69. case 's':
  70. sflag++;
  71. break;
  72. case 't':
  73. giveup = 60*60*atoi(ARGF());
  74. break;
  75. case 'q':
  76. qdir = ARGF();
  77. if(qdir == 0)
  78. usage();
  79. break;
  80. case 'n':
  81. npid = atoi(ARGF());
  82. if(npid == 0)
  83. usage();
  84. break;
  85. }ARGEND;
  86. if(argc != 2)
  87. usage();
  88. pidlist = malloc(npid*sizeof(*pidlist));
  89. if(pidlist == 0)
  90. error("can't malloc", 0);
  91. if(aflag == 0 && qdir == 0) {
  92. qdir = getuser();
  93. if(qdir == 0)
  94. error("unknown user", 0);
  95. }
  96. root = argv[0];
  97. cmd = argv[1];
  98. if(chdir(root) < 0)
  99. error("can't cd to %s", root);
  100. doload(1);
  101. if(aflag)
  102. doalldirs();
  103. else
  104. dodir(qdir);
  105. doload(0);
  106. exits(0);
  107. }
  108. int
  109. emptydir(char *name)
  110. {
  111. int fd;
  112. long n;
  113. char buf[2048];
  114. fd = open(name, OREAD);
  115. if(fd < 0)
  116. return 1;
  117. n = read(fd, buf, sizeof(buf));
  118. close(fd);
  119. if(n <= 0) {
  120. if(debug)
  121. fprint(2, "removing directory %s\n", name);
  122. syslog(0, runqlog, "rmdir %s", name);
  123. sysremove(name);
  124. return 1;
  125. }
  126. return 0;
  127. }
  128. int
  129. forkltd(void)
  130. {
  131. int i;
  132. int pid;
  133. for(i = 0; i < npid; i++){
  134. if(pidlist[i] <= 0)
  135. break;
  136. }
  137. while(i >= npid){
  138. pid = waitpid();
  139. if(pid < 0){
  140. syslog(0, runqlog, "forkltd confused");
  141. exits(0);
  142. }
  143. for(i = 0; i < npid; i++)
  144. if(pidlist[i] == pid)
  145. break;
  146. }
  147. pidlist[i] = fork();
  148. return pidlist[i];
  149. }
  150. /*
  151. * run all user directories, must be bootes (or root on unix) to do this
  152. */
  153. void
  154. doalldirs(void)
  155. {
  156. Dir *db;
  157. int fd;
  158. long i, n;
  159. fd = open(".", OREAD);
  160. if(fd == -1){
  161. warning("reading %s", root);
  162. return;
  163. }
  164. n = sysdirreadall(fd, &db);
  165. if(n > 0){
  166. for(i=0; i<n; i++){
  167. if(db[i].qid.type & QTDIR){
  168. if(emptydir(db[i].name))
  169. continue;
  170. switch(forkltd()){
  171. case -1:
  172. syslog(0, runqlog, "out of procs");
  173. doload(0);
  174. exits(0);
  175. case 0:
  176. if(sysdetach() < 0)
  177. error("%r", 0);
  178. dodir(db[i].name);
  179. exits(0);
  180. default:
  181. break;
  182. }
  183. }
  184. }
  185. free(db);
  186. }
  187. close(fd);
  188. }
  189. /*
  190. * cd to a user directory and run it
  191. */
  192. void
  193. dodir(char *name)
  194. {
  195. curdir = name;
  196. if(chdir(name) < 0){
  197. warning("cd to %s", name);
  198. return;
  199. }
  200. if(debug)
  201. fprint(2, "running %s\n", name);
  202. rundir(name);
  203. chdir("..");
  204. }
  205. /*
  206. * run the current directory
  207. */
  208. void
  209. rundir(char *name)
  210. {
  211. int fd;
  212. long i;
  213. if(aflag && sflag)
  214. fd = sysopenlocked(".", OREAD);
  215. else
  216. fd = open(".", OREAD);
  217. if(fd == -1){
  218. warning("reading %s", name);
  219. return;
  220. }
  221. nfiles = sysdirreadall(fd, &dirbuf);
  222. if(nfiles > 0){
  223. for(i=0; i<nfiles; i++){
  224. if(dirbuf[i].name[0]!='C' || dirbuf[i].name[1]!='.')
  225. continue;
  226. dofile(&dirbuf[i]);
  227. }
  228. free(dirbuf);
  229. }
  230. if(aflag && sflag)
  231. sysunlockfile(fd);
  232. else
  233. close(fd);
  234. }
  235. /*
  236. * free files matching name in the current directory
  237. */
  238. void
  239. remmatch(char *name)
  240. {
  241. long i;
  242. syslog(0, runqlog, "removing %s/%s", curdir, name);
  243. for(i=0; i<nfiles; i++){
  244. if(strcmp(&dirbuf[i].name[1], &name[1]) == 0)
  245. sysremove(dirbuf[i].name);
  246. }
  247. /* error file (may have) appeared after we read the directory */
  248. /* stomp on data file in case of phase error */
  249. sysremove(file(name, 'D'));
  250. sysremove(file(name, 'E'));
  251. }
  252. /*
  253. * like trylock, but we've already got the lock on fd,
  254. * and don't want an L. lock file.
  255. */
  256. static Mlock *
  257. keeplockalive(char *path, int fd)
  258. {
  259. char buf[1];
  260. Mlock *l;
  261. l = malloc(sizeof(Mlock));
  262. if(l == 0)
  263. return 0;
  264. l->fd = fd;
  265. l->name = s_new();
  266. s_append(l->name, path);
  267. /* fork process to keep lock alive until sysunlock(l) */
  268. switch(l->pid = rfork(RFPROC)){
  269. default:
  270. break;
  271. case 0:
  272. fd = l->fd;
  273. for(;;){
  274. sleep(1000*60);
  275. if(pread(fd, buf, 1, 0) < 0)
  276. break;
  277. }
  278. _exits(0);
  279. }
  280. return l;
  281. }
  282. /*
  283. * try a message
  284. */
  285. void
  286. dofile(Dir *dp)
  287. {
  288. Dir *d;
  289. int dfd, ac, dtime, efd, pid, i, etime;
  290. char *buf, *cp, **av;
  291. Waitmsg *wm;
  292. Biobuf *b;
  293. Mlock *l = nil;
  294. if(debug)
  295. fprint(2, "dofile %s\n", dp->name);
  296. /*
  297. * if no data file or empty control or data file, just clean up
  298. * the empty control file must be 15 minutes old, to minimize the
  299. * chance of a race.
  300. */
  301. d = dirstat(file(dp->name, 'D'));
  302. if(d == nil){
  303. syslog(0, runqlog, "no data file for %s", dp->name);
  304. remmatch(dp->name);
  305. return;
  306. }
  307. if(dp->length == 0){
  308. if(time(0)-dp->mtime > 15*60){
  309. syslog(0, runqlog, "empty ctl file for %s", dp->name);
  310. remmatch(dp->name);
  311. }
  312. return;
  313. }
  314. dtime = d->mtime;
  315. free(d);
  316. /*
  317. * retry times depend on the age of the errors file
  318. */
  319. if(!Eflag && (d = dirstat(file(dp->name, 'E'))) != nil){
  320. etime = d->mtime;
  321. free(d);
  322. if(etime - dtime < 60*60){
  323. /* up to the first hour, try every 15 minutes */
  324. if(time(0) - etime < 15*60)
  325. return;
  326. } else {
  327. /* after the first hour, try once an hour */
  328. if(time(0) - etime < 60*60)
  329. return;
  330. }
  331. }
  332. /*
  333. * open control and data
  334. */
  335. b = sysopen(file(dp->name, 'C'), "rl", 0660);
  336. if(b == 0) {
  337. if(debug)
  338. fprint(2, "can't open %s: %r\n", file(dp->name, 'C'));
  339. return;
  340. }
  341. dfd = open(file(dp->name, 'D'), OREAD);
  342. if(dfd < 0){
  343. if(debug)
  344. fprint(2, "can't open %s: %r\n", file(dp->name, 'D'));
  345. Bterm(b);
  346. sysunlockfile(Bfildes(b));
  347. return;
  348. }
  349. /*
  350. * make arg list
  351. * - read args into (malloc'd) buffer
  352. * - malloc a vector and copy pointers to args into it
  353. */
  354. buf = malloc(dp->length+1);
  355. if(buf == 0){
  356. warning("buffer allocation", 0);
  357. Bterm(b);
  358. sysunlockfile(Bfildes(b));
  359. close(dfd);
  360. return;
  361. }
  362. if(Bread(b, buf, dp->length) != dp->length){
  363. warning("reading control file %s\n", dp->name);
  364. Bterm(b);
  365. sysunlockfile(Bfildes(b));
  366. close(dfd);
  367. free(buf);
  368. return;
  369. }
  370. buf[dp->length] = 0;
  371. av = malloc(2*sizeof(char*));
  372. if(av == 0){
  373. warning("argv allocation", 0);
  374. close(dfd);
  375. free(buf);
  376. Bterm(b);
  377. sysunlockfile(Bfildes(b));
  378. return;
  379. }
  380. for(ac = 1, cp = buf; *cp; ac++){
  381. while(isspace(*cp))
  382. *cp++ = 0;
  383. if(*cp == 0)
  384. break;
  385. av = realloc(av, (ac+2)*sizeof(char*));
  386. if(av == 0){
  387. warning("argv allocation", 0);
  388. close(dfd);
  389. free(buf);
  390. Bterm(b);
  391. sysunlockfile(Bfildes(b));
  392. return;
  393. }
  394. av[ac] = cp;
  395. while(*cp && !isspace(*cp)){
  396. if(*cp++ == '"'){
  397. while(*cp && *cp != '"')
  398. cp++;
  399. if(*cp)
  400. cp++;
  401. }
  402. }
  403. }
  404. av[0] = cmd;
  405. av[ac] = 0;
  406. if(!Eflag &&time(0) - dtime > giveup){
  407. if(returnmail(av, dp->name, "Giveup") != 0)
  408. logit("returnmail failed", dp->name, av);
  409. remmatch(dp->name);
  410. goto done;
  411. }
  412. for(i = 0; i < nbad; i++){
  413. if(strcmp(av[3], badsys[i]) == 0)
  414. goto done;
  415. }
  416. /*
  417. * Ken's fs, for example, gives us 5 minutes of inactivity before
  418. * the lock goes stale, so we have to keep reading it.
  419. */
  420. l = keeplockalive(file(dp->name, 'C'), Bfildes(b));
  421. /*
  422. * transfer
  423. */
  424. pid = fork();
  425. switch(pid){
  426. case -1:
  427. sysunlock(l);
  428. sysunlockfile(Bfildes(b));
  429. syslog(0, runqlog, "out of procs");
  430. exits(0);
  431. case 0:
  432. if(debug) {
  433. fprint(2, "Starting %s", cmd);
  434. for(ac = 0; av[ac]; ac++)
  435. fprint(2, " %s", av[ac]);
  436. fprint(2, "\n");
  437. }
  438. logit("execing", dp->name, av);
  439. close(0);
  440. dup(dfd, 0);
  441. close(dfd);
  442. close(2);
  443. efd = open(file(dp->name, 'E'), OWRITE);
  444. if(efd < 0){
  445. if(debug) syslog(0, "runq", "open %s as %s: %r", file(dp->name,'E'), getuser());
  446. efd = create(file(dp->name, 'E'), OWRITE, 0666);
  447. if(efd < 0){
  448. if(debug) syslog(0, "runq", "create %s as %s: %r", file(dp->name, 'E'), getuser());
  449. exits("could not open error file - Retry");
  450. }
  451. }
  452. seek(efd, 0, 2);
  453. exec(cmd, av);
  454. error("can't exec %s", cmd);
  455. break;
  456. default:
  457. for(;;){
  458. wm = wait();
  459. if(wm == nil)
  460. error("wait failed: %r", "");
  461. if(wm->pid == pid)
  462. break;
  463. free(wm);
  464. }
  465. if(debug)
  466. fprint(2, "wm->pid %d wm->msg == %s\n", wm->pid, wm->msg);
  467. if(wm->msg[0]){
  468. if(debug)
  469. fprint(2, "[%d] wm->msg == %s\n", getpid(), wm->msg);
  470. if(!Rflag && strstr(wm->msg, "Retry")==0){
  471. /* return the message and remove it */
  472. if(returnmail(av, dp->name, wm->msg) != 0)
  473. logit("returnmail failed", dp->name, av);
  474. remmatch(dp->name);
  475. } else {
  476. /* add sys to bad list and try again later */
  477. nbad++;
  478. badsys = realloc(badsys, nbad*sizeof(char*));
  479. badsys[nbad-1] = strdup(av[3]);
  480. }
  481. } else {
  482. /* it worked remove the message */
  483. remmatch(dp->name);
  484. }
  485. free(wm);
  486. }
  487. done:
  488. if (l)
  489. sysunlock(l);
  490. Bterm(b);
  491. sysunlockfile(Bfildes(b));
  492. free(buf);
  493. free(av);
  494. close(dfd);
  495. }
  496. /*
  497. * return a name starting with the given character
  498. */
  499. char*
  500. file(char *name, char type)
  501. {
  502. static char nname[Elemlen+1];
  503. strncpy(nname, name, Elemlen);
  504. nname[Elemlen] = 0;
  505. nname[0] = type;
  506. return nname;
  507. }
  508. /*
  509. * send back the mail with an error message
  510. *
  511. * return 0 if successful
  512. */
  513. int
  514. returnmail(char **av, char *name, char *msg)
  515. {
  516. int pfd[2];
  517. Waitmsg *wm;
  518. int fd;
  519. char buf[256];
  520. char attachment[256];
  521. int i;
  522. long n;
  523. String *s;
  524. char *sender;
  525. if(av[1] == 0 || av[2] == 0){
  526. logit("runq - dumping bad file", name, av);
  527. return 0;
  528. }
  529. s = unescapespecial(s_copy(av[2]));
  530. sender = s_to_c(s);
  531. if(!returnable(sender) || strcmp(sender, "postmaster") == 0) {
  532. logit("runq - dumping p to p mail", name, av);
  533. return 0;
  534. }
  535. if(pipe(pfd) < 0){
  536. logit("runq - pipe failed", name, av);
  537. return -1;
  538. }
  539. switch(rfork(RFFDG|RFPROC|RFENVG)){
  540. case -1:
  541. logit("runq - fork failed", name, av);
  542. return -1;
  543. case 0:
  544. logit("returning", name, av);
  545. close(pfd[1]);
  546. close(0);
  547. dup(pfd[0], 0);
  548. close(pfd[0]);
  549. putenv("upasname", "/dev/null");
  550. snprint(buf, sizeof(buf), "%s/marshal", UPASBIN);
  551. snprint(attachment, sizeof(attachment), "%s", file(name, 'D'));
  552. execl(buf, "send", "-A", attachment, "-s", "permanent failure", sender, nil);
  553. error("can't exec", 0);
  554. break;
  555. default:
  556. break;
  557. }
  558. close(pfd[0]);
  559. fprint(pfd[1], "\n"); /* get out of headers */
  560. if(av[1]){
  561. fprint(pfd[1], "Your request ``%.20s ", av[1]);
  562. for(n = 3; av[n]; n++)
  563. fprint(pfd[1], "%s ", av[n]);
  564. }
  565. fprint(pfd[1], "'' failed (code %s).\nThe symptom was:\n\n", msg);
  566. fd = open(file(name, 'E'), OREAD);
  567. if(fd >= 0){
  568. for(;;){
  569. n = read(fd, buf, sizeof(buf));
  570. if(n <= 0)
  571. break;
  572. if(write(pfd[1], buf, n) != n){
  573. close(fd);
  574. goto out;
  575. }
  576. }
  577. close(fd);
  578. }
  579. close(pfd[1]);
  580. out:
  581. wm = wait();
  582. if(wm == nil){
  583. syslog(0, "runq", "wait: %r");
  584. logit("wait failed", name, av);
  585. return -1;
  586. }
  587. i = 0;
  588. if(wm->msg[0]){
  589. i = -1;
  590. syslog(0, "runq", "returnmail child: %s", wm->msg);
  591. logit("returnmail child failed", name, av);
  592. }
  593. free(wm);
  594. return i;
  595. }
  596. /*
  597. * print a warning and continue
  598. */
  599. void
  600. warning(char *f, void *a)
  601. {
  602. char err[65];
  603. char buf[256];
  604. rerrstr(err, sizeof(err));
  605. snprint(buf, sizeof(buf), f, a);
  606. fprint(2, "runq: %s: %s\n", buf, err);
  607. }
  608. /*
  609. * print an error and die
  610. */
  611. void
  612. error(char *f, void *a)
  613. {
  614. char err[Errlen];
  615. char buf[256];
  616. rerrstr(err, sizeof(err));
  617. snprint(buf, sizeof(buf), f, a);
  618. fprint(2, "runq: %s: %s\n", buf, err);
  619. exits(buf);
  620. }
  621. void
  622. logit(char *msg, char *file, char **av)
  623. {
  624. int n, m;
  625. char buf[256];
  626. n = snprint(buf, sizeof(buf), "%s/%s: %s", curdir, file, msg);
  627. for(; *av; av++){
  628. m = strlen(*av);
  629. if(n + m + 4 > sizeof(buf))
  630. break;
  631. sprint(buf + n, " '%s'", *av);
  632. n += m + 3;
  633. }
  634. syslog(0, runqlog, "%s", buf);
  635. }
  636. char *loadfile = ".runqload";
  637. /*
  638. * load balancing
  639. */
  640. void
  641. doload(int start)
  642. {
  643. int fd;
  644. char buf[32];
  645. int i, n;
  646. Mlock *l;
  647. Dir *d;
  648. if(load <= 0)
  649. return;
  650. if(chdir(root) < 0){
  651. load = 0;
  652. return;
  653. }
  654. l = syslock(loadfile);
  655. fd = open(loadfile, ORDWR);
  656. if(fd < 0){
  657. fd = create(loadfile, 0666, ORDWR);
  658. if(fd < 0){
  659. load = 0;
  660. sysunlock(l);
  661. return;
  662. }
  663. }
  664. /* get current load */
  665. i = 0;
  666. n = read(fd, buf, sizeof(buf)-1);
  667. if(n >= 0){
  668. buf[n] = 0;
  669. i = atoi(buf);
  670. }
  671. if(i < 0)
  672. i = 0;
  673. /* ignore load if file hasn't been changed in 30 minutes */
  674. d = dirfstat(fd);
  675. if(d != nil){
  676. if(d->mtime + 30*60 < time(0))
  677. i = 0;
  678. free(d);
  679. }
  680. /* if load already too high, give up */
  681. if(start && i >= load){
  682. sysunlock(l);
  683. exits(0);
  684. }
  685. /* increment/decrement load */
  686. if(start)
  687. i++;
  688. else
  689. i--;
  690. seek(fd, 0, 0);
  691. fprint(fd, "%d\n", i);
  692. sysunlock(l);
  693. close(fd);
  694. }