runq.c 12 KB


  1. #include "common.h"
  2. #include <ctype.h>
  3. void doalldirs(void);
  4. void dodir(char*);
  5. void dofile(Dir*);
  6. void rundir(char*);
  7. char* file(char*, char);
  8. void warning(char*, void*);
  9. void error(char*, void*);
  10. int returnmail(char**, char*, char*);
  11. void logit(char*, char*, char**);
  12. void doload(int);
  13. #define HUNK 32 /* not referenced anywhere in the visible code */
  14. char *cmd; /* program exec'd for each queued message (second positional argument) */
  15. char *root; /* queue root directory (first positional argument); main() chdirs here */
  16. int debug; /* -d: print progress messages on fd 2 */
  17. int giveup = 2*24*60*60; /* age in seconds after which a message is returned; -t sets it in hours */
  18. int load; /* -l: maximum number of concurrent runq's counted by doload(); 0 disables the check */
  19. int limit; /* -r: parsed in main() but not otherwise used in the visible code */
  20. /* the current directory: entries as read by rundir() */
  21. Dir *dirbuf;
  22. long ndirbuf = 0; /* not referenced anywhere in the visible code */
  23. int nfiles; /* number of entries in dirbuf */
  24. char *curdir; /* name of the directory being run; set by dodir(), used in log messages */
  25. char *runqlog = "runq"; /* log name passed to syslog() */
  26. int *pidlist; /* npid slots of child pids used by forkltd(); a value <= 0 marks a free slot */
  27. char **badsys; /* array of recalcitrant systems (av[3] of messages whose delivery asked for a retry) */
  28. int nbad; /* number of entries in badsys */
  29. int npid = 50; /* -n: maximum number of simultaneous children created by forkltd() */
  30. int sflag; /* single thread per directory */
  31. int aflag; /* all directories */
  32. int Eflag; /* ignore E.xxxxxx dates */
  33. int Rflag; /* no giving up, ever */
  /*
   * print the command-line synopsis on standard error and terminate.
   *
   * The synopsis lists every flag main() accepts (including -R), and the
   * process exits with a non-empty status so callers can tell a usage
   * error from a successful run.
   */
  void
  usage(void)
  {
      fprint(2, "usage: runq [-adsER] [-q dir] [-l load] [-t time] [-r nfiles] [-n nprocs] q-root cmd\n");
      exits("usage");
  }
  40. void
  41. main(int argc, char **argv)
  42. {
  43. char *qdir, *x;
  44. qdir = 0;
  45. ARGBEGIN{
  46. case 'l':
  47. x = ARGF();
  48. if(x == 0)
  49. usage();
  50. load = atoi(x);
  51. if(load < 0)
  52. load = 0;
  53. break;
  54. case 'E':
  55. Eflag++;
  56. break;
  57. case 'R': /* no giving up -- just leave stuff in the queue */
  58. Rflag++;
  59. break;
  60. case 'a':
  61. aflag++;
  62. break;
  63. case 'd':
  64. debug++;
  65. break;
  66. case 'r':
  67. limit = atoi(ARGF());
  68. break;
  69. case 's':
  70. sflag++;
  71. break;
  72. case 't':
  73. giveup = 60*60*atoi(ARGF());
  74. break;
  75. case 'q':
  76. qdir = ARGF();
  77. if(qdir == 0)
  78. usage();
  79. break;
  80. case 'n':
  81. npid = atoi(ARGF());
  82. if(npid == 0)
  83. usage();
  84. break;
  85. }ARGEND;
  86. if(argc != 2)
  87. usage();
  88. pidlist = malloc(npid*sizeof(*pidlist));
  89. if(pidlist == 0)
  90. error("can't malloc", 0);
  91. if(aflag == 0 && qdir == 0) {
  92. qdir = getuser();
  93. if(qdir == 0)
  94. error("unknown user", 0);
  95. }
  96. root = argv[0];
  97. cmd = argv[1];
  98. if(chdir(root) < 0)
  99. error("can't cd to %s", root);
  100. doload(1);
  101. if(aflag)
  102. doalldirs();
  103. else
  104. dodir(qdir);
  105. doload(0);
  106. exits(0);
  107. }
  108. int
  109. emptydir(char *name)
  110. {
  111. int fd;
  112. long n;
  113. char buf[2048];
  114. fd = open(name, OREAD);
  115. if(fd < 0)
  116. return 1;
  117. n = read(fd, buf, sizeof(buf));
  118. close(fd);
  119. if(n <= 0) {
  120. if(debug)
  121. fprint(2, "removing directory %s\n", name);
  122. syslog(0, runqlog, "rmdir %s", name);
  123. sysremove(name);
  124. return 1;
  125. }
  126. return 0;
  127. }
  128. int
  129. forkltd(void)
  130. {
  131. int i;
  132. int pid;
  133. for(i = 0; i < npid; i++){
  134. if(pidlist[i] <= 0)
  135. break;
  136. }
  137. while(i >= npid){
  138. pid = waitpid();
  139. if(pid < 0){
  140. syslog(0, runqlog, "forkltd confused");
  141. exits(0);
  142. }
  143. for(i = 0; i < npid; i++)
  144. if(pidlist[i] == pid)
  145. break;
  146. }
  147. pidlist[i] = fork();
  148. return pidlist[i];
  149. }
  150. /*
  151. * run all user directories, must be bootes (or root on unix) to do this
  152. */
  153. void
  154. doalldirs(void)
  155. {
  156. Dir *db;
  157. int fd;
  158. long i, n;
  159. fd = open(".", OREAD);
  160. if(fd == -1){
  161. warning("reading %s", root);
  162. return;
  163. }
  164. n = sysdirreadall(fd, &db);
  165. if(n > 0){
  166. for(i=0; i<n; i++){
  167. if(db[i].qid.type & QTDIR){
  168. if(emptydir(db[i].name))
  169. continue;
  170. switch(forkltd()){
  171. case -1:
  172. syslog(0, runqlog, "out of procs");
  173. doload(0);
  174. exits(0);
  175. case 0:
  176. if(sysdetach() < 0)
  177. error("%r", 0);
  178. dodir(db[i].name);
  179. exits(0);
  180. default:
  181. break;
  182. }
  183. }
  184. }
  185. free(db);
  186. }
  187. close(fd);
  188. }
  189. /*
  190. * cd to a user directory and run it
  191. */
  192. void
  193. dodir(char *name)
  194. {
  195. curdir = name;
  196. if(chdir(name) < 0){
  197. warning("cd to %s", name);
  198. return;
  199. }
  200. if(debug)
  201. fprint(2, "running %s\n", name);
  202. rundir(name);
  203. chdir("..");
  204. }
  205. /*
  206. * run the current directory
  207. */
  208. void
  209. rundir(char *name)
  210. {
  211. int fd;
  212. long i;
  213. if(aflag && sflag)
  214. fd = sysopenlocked(".", OREAD);
  215. else
  216. fd = open(".", OREAD);
  217. if(fd == -1){
  218. warning("reading %s", name);
  219. return;
  220. }
  221. nfiles = sysdirreadall(fd, &dirbuf);
  222. if(nfiles > 0){
  223. for(i=0; i<nfiles; i++){
  224. if(dirbuf[i].name[0]!='C' || dirbuf[i].name[1]!='.')
  225. continue;
  226. dofile(&dirbuf[i]);
  227. }
  228. free(dirbuf);
  229. }
  230. if(aflag && sflag)
  231. sysunlockfile(fd);
  232. else
  233. close(fd);
  234. }
  235. /*
  236. * free files matching name in the current directory
  237. */
  238. void
  239. remmatch(char *name)
  240. {
  241. long i;
  242. syslog(0, runqlog, "removing %s/%s", curdir, name);
  243. for(i=0; i<nfiles; i++){
  244. if(strcmp(&dirbuf[i].name[1], &name[1]) == 0)
  245. sysremove(dirbuf[i].name);
  246. }
  247. /* error file (may have) appeared after we read the directory */
  248. /* stomp on data file in case of phase error */
  249. sysremove(file(name, 'D'));
  250. sysremove(file(name, 'E'));
  251. }
  252. /*
  253. * try a message
  254. */
  255. void
  256. dofile(Dir *dp)
  257. {
  258. Dir *d;
  259. int dfd, ac, dtime, efd, pid, i, etime;
  260. char *buf, *cp, **av;
  261. Waitmsg *wm;
  262. Biobuf *b;
  263. if(debug)
  264. fprint(2, "dofile %s\n", dp->name);
  265. /*
  266. * if no data file or empty control or data file, just clean up
  267. * the empty control file must be 15 minutes old, to minimize the
  268. * chance of a race.
  269. */
  270. d = dirstat(file(dp->name, 'D'));
  271. if(d == nil){
  272. syslog(0, runqlog, "no data file for %s", dp->name);
  273. remmatch(dp->name);
  274. return;
  275. }
  276. if(dp->length == 0){
  277. if(time(0)-dp->mtime > 15*60){
  278. syslog(0, runqlog, "empty ctl file for %s", dp->name);
  279. remmatch(dp->name);
  280. }
  281. return;
  282. }
  283. dtime = d->mtime;
  284. free(d);
  285. /*
  286. * retry times depend on the age of the errors file
  287. */
  288. if(!Eflag && (d = dirstat(file(dp->name, 'E'))) != nil){
  289. etime = d->mtime;
  290. free(d);
  291. if(etime - dtime < 60*60){
  292. /* up to the first hour, try every 15 minutes */
  293. if(time(0) - etime < 15*60)
  294. return;
  295. } else {
  296. /* after the first hour, try once an hour */
  297. if(time(0) - etime < 60*60)
  298. return;
  299. }
  300. }
  301. /*
  302. * open control and data
  303. */
  304. b = sysopen(file(dp->name, 'C'), "rl", 0660);
  305. if(b == 0) {
  306. if(debug)
  307. fprint(2, "can't open %s: %r\n", file(dp->name, 'C'));
  308. return;
  309. }
  310. dfd = open(file(dp->name, 'D'), OREAD);
  311. if(dfd < 0){
  312. if(debug)
  313. fprint(2, "can't open %s: %r\n", file(dp->name, 'D'));
  314. Bterm(b);
  315. sysunlockfile(Bfildes(b));
  316. return;
  317. }
  318. /*
  319. * make arg list
  320. * - read args into (malloc'd) buffer
  321. * - malloc a vector and copy pointers to args into it
  322. */
  323. buf = malloc(dp->length+1);
  324. if(buf == 0){
  325. warning("buffer allocation", 0);
  326. Bterm(b);
  327. sysunlockfile(Bfildes(b));
  328. close(dfd);
  329. return;
  330. }
  331. if(Bread(b, buf, dp->length) != dp->length){
  332. warning("reading control file %s\n", dp->name);
  333. Bterm(b);
  334. sysunlockfile(Bfildes(b));
  335. close(dfd);
  336. free(buf);
  337. return;
  338. }
  339. buf[dp->length] = 0;
  340. av = malloc(2*sizeof(char*));
  341. if(av == 0){
  342. warning("argv allocation", 0);
  343. close(dfd);
  344. free(buf);
  345. Bterm(b);
  346. sysunlockfile(Bfildes(b));
  347. return;
  348. }
  349. for(ac = 1, cp = buf; *cp; ac++){
  350. while(isspace(*cp))
  351. *cp++ = 0;
  352. if(*cp == 0)
  353. break;
  354. av = realloc(av, (ac+2)*sizeof(char*));
  355. if(av == 0){
  356. warning("argv allocation", 0);
  357. close(dfd);
  358. free(buf);
  359. Bterm(b);
  360. sysunlockfile(Bfildes(b));
  361. return;
  362. }
  363. av[ac] = cp;
  364. while(*cp && !isspace(*cp)){
  365. if(*cp++ == '"'){
  366. while(*cp && *cp != '"')
  367. cp++;
  368. if(*cp)
  369. cp++;
  370. }
  371. }
  372. }
  373. av[0] = cmd;
  374. av[ac] = 0;
  375. if(!Eflag &&time(0) - dtime > giveup){
  376. if(returnmail(av, dp->name, "Giveup") != 0)
  377. logit("returnmail failed", dp->name, av);
  378. remmatch(dp->name);
  379. goto done;
  380. }
  381. for(i = 0; i < nbad; i++){
  382. if(strcmp(av[3], badsys[i]) == 0)
  383. goto done;
  384. }
  385. /*
  386. * transfer
  387. */
  388. pid = fork();
  389. switch(pid){
  390. case -1:
  391. sysunlockfile(Bfildes(b));
  392. syslog(0, runqlog, "out of procs");
  393. exits(0);
  394. case 0:
  395. if(debug) {
  396. fprint(2, "Starting %s", cmd);
  397. for(ac = 0; av[ac]; ac++)
  398. fprint(2, " %s", av[ac]);
  399. fprint(2, "\n");
  400. }
  401. logit("execing", dp->name, av);
  402. close(0);
  403. dup(dfd, 0);
  404. close(dfd);
  405. close(2);
  406. efd = open(file(dp->name, 'E'), OWRITE);
  407. if(efd < 0){
  408. if(debug) syslog(0, "runq", "open %s as %s: %r", file(dp->name,'E'), getuser());
  409. efd = create(file(dp->name, 'E'), OWRITE, 0666);
  410. if(efd < 0){
  411. if(debug) syslog(0, "runq", "create %s as %s: %r", file(dp->name, 'E'), getuser());
  412. exits("could not open error file - Retry");
  413. }
  414. }
  415. seek(efd, 0, 2);
  416. exec(cmd, av);
  417. error("can't exec %s", cmd);
  418. break;
  419. default:
  420. for(;;){
  421. wm = wait();
  422. if(wm == nil)
  423. error("wait failed: %r", "");
  424. if(wm->pid == pid)
  425. break;
  426. free(wm);
  427. }
  428. if(debug)
  429. fprint(2, "wm->pid %d wm->msg == %s\n", wm->pid, wm->msg);
  430. if(wm->msg[0]){
  431. if(debug)
  432. fprint(2, "[%d] wm->msg == %s\n", getpid(), wm->msg);
  433. if(!Rflag && strstr(wm->msg, "Retry")==0){
  434. /* return the message and remove it */
  435. if(returnmail(av, dp->name, wm->msg) != 0)
  436. logit("returnmail failed", dp->name, av);
  437. remmatch(dp->name);
  438. } else {
  439. /* add sys to bad list and try again later */
  440. nbad++;
  441. badsys = realloc(badsys, nbad*sizeof(char*));
  442. badsys[nbad-1] = strdup(av[3]);
  443. }
  444. } else {
  445. /* it worked remove the message */
  446. remmatch(dp->name);
  447. }
  448. free(wm);
  449. }
  450. done:
  451. Bterm(b);
  452. sysunlockfile(Bfildes(b));
  453. free(buf);
  454. free(av);
  455. close(dfd);
  456. }
  457. /*
  458. * return a name starting with the given character
  459. */
  460. char*
  461. file(char *name, char type)
  462. {
  463. static char nname[Elemlen+1];
  464. strncpy(nname, name, Elemlen);
  465. nname[Elemlen] = 0;
  466. nname[0] = type;
  467. return nname;
  468. }
  469. /*
  470. * send back the mail with an error message
  471. *
  472. * return 0 if successful
  473. */
  474. int
  475. returnmail(char **av, char *name, char *msg)
  476. {
  477. int pfd[2];
  478. Waitmsg *wm;
  479. int fd;
  480. char buf[256];
  481. char attachment[256];
  482. int i;
  483. long n;
  484. String *s;
  485. char *sender;
  486. if(av[1] == 0 || av[2] == 0){
  487. logit("runq - dumping bad file", name, av);
  488. return 0;
  489. }
  490. s = unescapespecial(s_copy(av[2]));
  491. sender = s_to_c(s);
  492. if(!returnable(sender) || strcmp(sender, "postmaster") == 0) {
  493. logit("runq - dumping p to p mail", name, av);
  494. return 0;
  495. }
  496. if(pipe(pfd) < 0){
  497. logit("runq - pipe failed", name, av);
  498. return -1;
  499. }
  500. switch(rfork(RFFDG|RFPROC|RFENVG)){
  501. case -1:
  502. logit("runq - fork failed", name, av);
  503. return -1;
  504. case 0:
  505. logit("returning", name, av);
  506. close(pfd[1]);
  507. close(0);
  508. dup(pfd[0], 0);
  509. close(pfd[0]);
  510. putenv("upasname", "/dev/null");
  511. snprint(buf, sizeof(buf), "%s/marshal", UPASBIN);
  512. snprint(attachment, sizeof(attachment), "%s", file(name, 'D'));
  513. execl(buf, "send", "-A", attachment, "-s", "permanent failure", sender, 0);
  514. error("can't exec", 0);
  515. break;
  516. default:
  517. break;
  518. }
  519. close(pfd[0]);
  520. fprint(pfd[1], "\n"); /* get out of headers */
  521. if(av[1]){
  522. fprint(pfd[1], "Your request ``%.20s ", av[1]);
  523. for(n = 3; av[n]; n++)
  524. fprint(pfd[1], "%s ", av[n]);
  525. }
  526. fprint(pfd[1], "'' failed (code %s).\nThe symptom was:\n\n", msg);
  527. fd = open(file(name, 'E'), OREAD);
  528. if(fd >= 0){
  529. for(;;){
  530. n = read(fd, buf, sizeof(buf));
  531. if(n <= 0)
  532. break;
  533. if(write(pfd[1], buf, n) != n){
  534. close(fd);
  535. goto out;
  536. }
  537. }
  538. close(fd);
  539. }
  540. close(pfd[1]);
  541. out:
  542. wm = wait();
  543. if(wm == nil){
  544. syslog(0, "runq", "wait: %r");
  545. logit("wait failed", name, av);
  546. return -1;
  547. }
  548. i = 0;
  549. if(wm->msg[0]){
  550. i = -1;
  551. syslog(0, "runq", "returnmail child: %s", wm->msg);
  552. logit("returnmail child failed", name, av);
  553. }
  554. free(wm);
  555. return i;
  556. }
  /*
   * print a warning and continue
   *
   * f is a print format taking the single argument a.  The formatted
   * text is written to fd 2, followed by the current system error string.
   */
  void
  warning(char *f, void *a)
  {
      char syserr[65];
      char text[256];

      /* fetch the error string first, before any other library call */
      rerrstr(syserr, sizeof syserr);
      snprint(text, sizeof text, f, a);
      fprint(2, "runq: %s: %s\n", text, syserr);
  }
  569. /*
  570. * print an error and die
  571. */
  572. void
  573. error(char *f, void *a)
  574. {
  575. char err[Errlen];
  576. char buf[256];
  577. rerrstr(err, sizeof(err));
  578. snprint(buf, sizeof(buf), f, a);
  579. fprint(2, "runq: %s: %s\n", buf, err);
  580. exits(buf);
  581. }
  582. void
  583. logit(char *msg, char *file, char **av)
  584. {
  585. int n, m;
  586. char buf[256];
  587. n = snprint(buf, sizeof(buf), "%s/%s: %s", curdir, file, msg);
  588. for(; *av; av++){
  589. m = strlen(*av);
  590. if(n + m + 4 > sizeof(buf))
  591. break;
  592. sprint(buf + n, " '%s'", *av);
  593. n += m + 3;
  594. }
  595. syslog(0, runqlog, "%s", buf);
  596. }
  597. char *loadfile = ".runqload";
  598. /*
  599. * load balancing
  600. */
  601. void
  602. doload(int start)
  603. {
  604. int fd;
  605. char buf[32];
  606. int i, n;
  607. Mlock *l;
  608. Dir *d;
  609. if(load <= 0)
  610. return;
  611. if(chdir(root) < 0){
  612. load = 0;
  613. return;
  614. }
  615. l = syslock(loadfile);
  616. fd = open(loadfile, ORDWR);
  617. if(fd < 0){
  618. fd = create(loadfile, 0666, ORDWR);
  619. if(fd < 0){
  620. load = 0;
  621. sysunlock(l);
  622. return;
  623. }
  624. }
  625. /* get current load */
  626. i = 0;
  627. n = read(fd, buf, sizeof(buf)-1);
  628. if(n >= 0){
  629. buf[n] = 0;
  630. i = atoi(buf);
  631. }
  632. if(i < 0)
  633. i = 0;
  634. /* ignore load if file hasn't been changed in 30 minutes */
  635. d = dirfstat(fd);
  636. if(d != nil){
  637. if(d->mtime + 30*60 < time(0))
  638. i = 0;
  639. free(d);
  640. }
  641. /* if load already too high, give up */
  642. if(start && i >= load){
  643. sysunlock(l);
  644. exits(0);
  645. }
  646. /* increment/decrement load */
  647. if(start)
  648. i++;
  649. else
  650. i--;
  651. seek(fd, 0, 0);
  652. fprint(fd, "%d\n", i);
  653. sysunlock(l);
  654. close(fd);
  655. }