runq.c 12 KB


  1. #include "common.h"
  2. #include <ctype.h>
  3. void doalldirs(void);
  4. void dodir(char*);
  5. void dofile(Dir*);
  6. void rundir(char*);
  7. char* file(char*, char);
  8. void warning(char*, void*);
  9. void error(char*, void*);
  10. int returnmail(char**, char*, char*);
  11. void logit(char*, char*, char**);
  12. void doload(int);
  13. #define HUNK 32
  14. char *cmd;
  15. char *root;
  16. int debug;
  17. int giveup = 2*24*60*60;
  18. int load;
  19. int limit;
  20. /* the current directory */
  21. Dir *dirbuf;
  22. long ndirbuf = 0;
  23. int nfiles;
  24. char *curdir;
  25. char *runqlog = "runq";
  26. int *pidlist;
  27. char **badsys; /* array of recalcitrant systems */
  28. int nbad;
  29. int npid = 50;
  30. int sflag; /* single thread per directory */
  31. int aflag; /* all directories */
  32. int Eflag; /* ignore E.xxxxxx dates */
  33. int Rflag; /* no giving up, ever */
  34. void
  35. usage(void)
  36. {
  37. fprint(2, "usage: runq [-adsE] [-q dir] [-l load] [-t time] [-r nfiles] [-n nprocs] q-root cmd\n");
  38. exits("");
  39. }
  40. void
  41. main(int argc, char **argv)
  42. {
  43. char *qdir, *x;
  44. qdir = 0;
  45. ARGBEGIN{
  46. case 'l':
  47. x = ARGF();
  48. if(x == 0)
  49. usage();
  50. load = atoi(x);
  51. if(load < 0)
  52. load = 0;
  53. break;
  54. case 'E':
  55. Eflag++;
  56. break;
  57. case 'R': /* no giving up -- just leave stuff in the queue */
  58. Rflag++;
  59. break;
  60. case 'a':
  61. aflag++;
  62. break;
  63. case 'd':
  64. debug++;
  65. break;
  66. case 'r':
  67. limit = atoi(ARGF());
  68. break;
  69. case 's':
  70. sflag++;
  71. break;
  72. case 't':
  73. giveup = 60*60*atoi(ARGF());
  74. break;
  75. case 'q':
  76. qdir = ARGF();
  77. if(qdir == 0)
  78. usage();
  79. break;
  80. case 'n':
  81. npid = atoi(ARGF());
  82. if(npid == 0)
  83. usage();
  84. break;
  85. }ARGEND;
  86. if(argc != 2)
  87. usage();
  88. pidlist = malloc(npid*sizeof(*pidlist));
  89. if(pidlist == 0)
  90. error("can't malloc", 0);
  91. if(aflag == 0 && qdir == 0) {
  92. qdir = getuser();
  93. if(qdir == 0)
  94. error("unknown user", 0);
  95. }
  96. root = argv[0];
  97. cmd = argv[1];
  98. if(chdir(root) < 0)
  99. error("can't cd to %s", root);
  100. doload(1);
  101. if(aflag)
  102. doalldirs();
  103. else
  104. dodir(qdir);
  105. doload(0);
  106. exits(0);
  107. }
  108. int
  109. emptydir(char *name)
  110. {
  111. int fd;
  112. long n;
  113. char buf[2048];
  114. fd = open(name, OREAD);
  115. if(fd < 0)
  116. return 1;
  117. n = read(fd, buf, sizeof(buf));
  118. close(fd);
  119. if(n <= 0) {
  120. if(debug)
  121. fprint(2, "removing directory %s\n", name);
  122. syslog(0, runqlog, "rmdir %s", name);
  123. sysremove(name);
  124. return 1;
  125. }
  126. return 0;
  127. }
  128. int
  129. forkltd(void)
  130. {
  131. int i;
  132. int pid;
  133. for(i = 0; i < npid; i++){
  134. if(pidlist[i] <= 0)
  135. break;
  136. }
  137. while(i >= npid){
  138. pid = waitpid();
  139. if(pid < 0){
  140. syslog(0, runqlog, "forkltd confused");
  141. exits(0);
  142. }
  143. for(i = 0; i < npid; i++)
  144. if(pidlist[i] == pid)
  145. break;
  146. }
  147. pidlist[i] = fork();
  148. return pidlist[i];
  149. }
  150. /*
  151. * run all user directories, must be bootes (or root on unix) to do this
  152. */
  153. void
  154. doalldirs(void)
  155. {
  156. Dir *db;
  157. int fd;
  158. long i, n;
  159. fd = open(".", OREAD);
  160. if(fd == -1){
  161. warning("reading %s", root);
  162. return;
  163. }
  164. n = sysdirreadall(fd, &db);
  165. if(n > 0){
  166. for(i=0; i<n; i++){
  167. if(db[i].qid.type & QTDIR){
  168. if(emptydir(db[i].name))
  169. continue;
  170. switch(forkltd()){
  171. case -1:
  172. syslog(0, runqlog, "out of procs");
  173. doload(0);
  174. exits(0);
  175. case 0:
  176. if(sysdetach() < 0)
  177. error("%r", 0);
  178. dodir(db[i].name);
  179. exits(0);
  180. default:
  181. break;
  182. }
  183. }
  184. }
  185. free(db);
  186. }
  187. close(fd);
  188. }
  189. /*
  190. * cd to a user directory and run it
  191. */
  192. void
  193. dodir(char *name)
  194. {
  195. curdir = name;
  196. if(chdir(name) < 0){
  197. warning("cd to %s", name);
  198. return;
  199. }
  200. if(debug)
  201. fprint(2, "running %s\n", name);
  202. rundir(name);
  203. chdir("..");
  204. }
  205. /*
  206. * run the current directory
  207. */
  208. void
  209. rundir(char *name)
  210. {
  211. int fd;
  212. long i;
  213. if(aflag && sflag)
  214. fd = sysopenlocked(".", OREAD);
  215. else
  216. fd = open(".", OREAD);
  217. if(fd == -1){
  218. warning("reading %s", name);
  219. return;
  220. }
  221. nfiles = sysdirreadall(fd, &dirbuf);
  222. if(nfiles > 0){
  223. for(i=0; i<nfiles; i++){
  224. if(dirbuf[i].name[0]!='C' || dirbuf[i].name[1]!='.')
  225. continue;
  226. dofile(&dirbuf[i]);
  227. }
  228. free(dirbuf);
  229. }
  230. if(aflag && sflag)
  231. sysunlockfile(fd);
  232. else
  233. close(fd);
  234. }
  235. /*
  236. * free files matching name in the current directory
  237. */
  238. void
  239. remmatch(char *name)
  240. {
  241. long i;
  242. syslog(0, runqlog, "removing %s/%s", curdir, name);
  243. for(i=0; i<nfiles; i++){
  244. if(strcmp(&dirbuf[i].name[1], &name[1]) == 0)
  245. sysremove(dirbuf[i].name);
  246. }
  247. /* error file (may have) appeared after we read the directory */
  248. /* stomp on data file in case of phase error */
  249. sysremove(file(name, 'D'));
  250. sysremove(file(name, 'E'));
  251. }
  252. /*
  253. * try a message
  254. */
  255. void
  256. dofile(Dir *dp)
  257. {
  258. Dir *d;
  259. int dfd, ac, dtime, efd, pid, i, etime;
  260. char *buf, *cp, **av;
  261. Waitmsg *wm;
  262. Biobuf *b;
  263. if(debug)
  264. fprint(2, "dofile %s\n", dp->name);
  265. /*
  266. * if no data file or empty control or data file, just clean up
  267. * the empty control file must be 15 minutes old, to minimize the
  268. * chance of a race.
  269. */
  270. d = dirstat(file(dp->name, 'D'));
  271. if(d == nil){
  272. syslog(0, runqlog, "no data file for %s", dp->name);
  273. remmatch(dp->name);
  274. return;
  275. }
  276. if(dp->length == 0){
  277. if(time(0)-dp->mtime > 15*60){
  278. syslog(0, runqlog, "empty ctl file for %s", dp->name);
  279. remmatch(dp->name);
  280. }
  281. return;
  282. }
  283. dtime = d->mtime;
  284. free(d);
  285. /*
  286. * retry times depend on the age of the errors file
  287. */
  288. if(!Eflag && (d = dirstat(file(dp->name, 'E'))) != nil){
  289. etime = d->mtime;
  290. free(d);
  291. if(etime - dtime < 60*60){
  292. /* up to the first hour, try every 15 minutes */
  293. if(time(0) - etime < 15*60)
  294. return;
  295. } else {
  296. /* after the first hour, try once an hour */
  297. if(time(0) - etime < 60*60)
  298. return;
  299. }
  300. }
  301. /*
  302. * open control and data
  303. */
  304. b = sysopen(file(dp->name, 'C'), "rl", 0660);
  305. if(b == 0) {
  306. if(debug)
  307. fprint(2, "can't open %s: %r\n", file(dp->name, 'C'));
  308. return;
  309. }
  310. dfd = open(file(dp->name, 'D'), OREAD);
  311. if(dfd < 0){
  312. if(debug)
  313. fprint(2, "can't open %s: %r\n", file(dp->name, 'D'));
  314. Bterm(b);
  315. sysunlockfile(Bfildes(b));
  316. return;
  317. }
  318. /*
  319. * make arg list
  320. * - read args into (malloc'd) buffer
  321. * - malloc a vector and copy pointers to args into it
  322. */
  323. buf = malloc(dp->length+1);
  324. if(buf == 0){
  325. warning("buffer allocation", 0);
  326. Bterm(b);
  327. sysunlockfile(Bfildes(b));
  328. close(dfd);
  329. return;
  330. }
  331. if(Bread(b, buf, dp->length) != dp->length){
  332. warning("reading control file %s\n", dp->name);
  333. Bterm(b);
  334. sysunlockfile(Bfildes(b));
  335. close(dfd);
  336. free(buf);
  337. return;
  338. }
  339. buf[dp->length] = 0;
  340. av = malloc(2*sizeof(char*));
  341. if(av == 0){
  342. warning("argv allocation", 0);
  343. close(dfd);
  344. free(buf);
  345. Bterm(b);
  346. sysunlockfile(Bfildes(b));
  347. return;
  348. }
  349. for(ac = 1, cp = buf; *cp; ac++){
  350. while(isspace(*cp))
  351. *cp++ = 0;
  352. if(*cp == 0)
  353. break;
  354. av = realloc(av, (ac+2)*sizeof(char*));
  355. if(av == 0){
  356. warning("argv allocation", 0);
  357. close(dfd);
  358. free(buf);
  359. Bterm(b);
  360. sysunlockfile(Bfildes(b));
  361. return;
  362. }
  363. av[ac] = cp;
  364. while(*cp && !isspace(*cp)){
  365. if(*cp++ == '"'){
  366. while(*cp && *cp != '"')
  367. cp++;
  368. if(*cp)
  369. cp++;
  370. }
  371. }
  372. }
  373. av[0] = cmd;
  374. av[ac] = 0;
  375. if(!Eflag &&time(0) - dtime > giveup){
  376. if(returnmail(av, dp->name, "Giveup") == 0)
  377. remmatch(dp->name);
  378. else {
  379. if(time(0) - dtime < giveup + 2*60*60)
  380. logit("returnmail failed", dp->name, av);
  381. if(time(0) - dtime > giveup + 24*60*60)
  382. remmatch(dp->name);
  383. }
  384. goto done;
  385. }
  386. for(i = 0; i < nbad; i++){
  387. if(strcmp(av[3], badsys[i]) == 0)
  388. goto done;
  389. }
  390. /*
  391. * transfer
  392. */
  393. pid = fork();
  394. switch(pid){
  395. case -1:
  396. sysunlockfile(Bfildes(b));
  397. syslog(0, runqlog, "out of procs");
  398. exits(0);
  399. case 0:
  400. if(debug) {
  401. fprint(2, "Starting %s", cmd);
  402. for(ac = 0; av[ac]; ac++)
  403. fprint(2, " %s", av[ac]);
  404. fprint(2, "\n");
  405. }
  406. logit("execing", dp->name, av);
  407. close(0);
  408. dup(dfd, 0);
  409. close(dfd);
  410. close(2);
  411. efd = open(file(dp->name, 'E'), OWRITE);
  412. if(efd < 0)
  413. efd = create(file(dp->name, 'E'), OWRITE, 0664);
  414. if(efd < 0)
  415. exits("");
  416. seek(efd, 0, 2);
  417. exec(cmd, av);
  418. error("can't exec %s", cmd);
  419. break;
  420. default:
  421. for(;;){
  422. wm = wait();
  423. if(wm == nil)
  424. error("wait failed: %r", "");
  425. if(wm->pid == pid)
  426. break;
  427. free(wm);
  428. }
  429. if(wm->msg[0]){
  430. if(debug)
  431. fprint(2, "[%d] wm->msg == %s\n", getpid(), wm->msg);
  432. if(!Rflag && strstr(wm->msg, "Retry")==0){
  433. /* return the message and remove it */
  434. if(returnmail(av, dp->name, wm->msg) == 0)
  435. remmatch(dp->name);
  436. } else {
  437. /* add sys to bad list and try again later */
  438. nbad++;
  439. badsys = realloc(badsys, nbad*sizeof(char*));
  440. badsys[nbad-1] = strdup(av[3]);
  441. }
  442. } else {
  443. /* it worked remove the message */
  444. remmatch(dp->name);
  445. }
  446. free(wm);
  447. }
  448. done:
  449. Bterm(b);
  450. sysunlockfile(Bfildes(b));
  451. free(buf);
  452. free(av);
  453. close(dfd);
  454. }
  455. /*
  456. * return a name starting with the given character
  457. */
  458. char*
  459. file(char *name, char type)
  460. {
  461. static char nname[Elemlen+1];
  462. strncpy(nname, name, Elemlen);
  463. nname[Elemlen] = 0;
  464. nname[0] = type;
  465. return nname;
  466. }
  467. /*
  468. * send back the mail with an error message
  469. *
  470. * return 0 if successful
  471. */
  472. int
  473. returnmail(char **av, char *name, char *msg)
  474. {
  475. int pfd[2];
  476. Waitmsg *wm;
  477. int fd;
  478. char buf[256];
  479. char attachment[256];
  480. int i;
  481. long n;
  482. String *s;
  483. char *sender;
  484. if(av[1] == 0 || av[2] == 0){
  485. logit("runq - dumping bad file", name, av);
  486. return 0;
  487. }
  488. s = unescapespecial(s_copy(av[2]));
  489. sender = s_to_c(s);
  490. if(!returnable(sender) || strcmp(sender, "postmaster") == 0) {
  491. logit("runq - dumping p to p mail", name, av);
  492. return 0;
  493. }
  494. if(pipe(pfd) < 0)
  495. return -1;
  496. switch(rfork(RFFDG|RFPROC|RFENVG)){
  497. case -1:
  498. return -1;
  499. case 0:
  500. logit("returning", name, av);
  501. close(pfd[1]);
  502. close(0);
  503. dup(pfd[0], 0);
  504. close(pfd[0]);
  505. putenv("upasname", "/dev/null");
  506. snprint(buf, sizeof(buf), "%s/marshal", UPASBIN);
  507. snprint(attachment, sizeof(attachment), "%s", file(name, 'D'));
  508. execl(buf, "send", "-A", attachment, "-s", "permanent failure", sender, 0);
  509. error("can't exec", 0);
  510. break;
  511. default:
  512. break;
  513. }
  514. close(pfd[0]);
  515. fprint(pfd[1], "\n"); /* get out of headers */
  516. if(av[1]){
  517. fprint(pfd[1], "Your request ``%.20s ", av[1]);
  518. for(n = 3; av[n]; n++)
  519. fprint(pfd[1], "%s ", av[n]);
  520. }
  521. fprint(pfd[1], "'' failed (code %s).\nThe symptom was:\n\n", msg);
  522. fd = open(file(name, 'E'), OREAD);
  523. if(fd >= 0){
  524. for(;;){
  525. n = read(fd, buf, sizeof(buf));
  526. if(n <= 0)
  527. break;
  528. if(write(pfd[1], buf, n) != n){
  529. close(fd);
  530. goto out;
  531. }
  532. }
  533. close(fd);
  534. }
  535. close(pfd[1]);
  536. out:
  537. wm = wait();
  538. if(wm == nil)
  539. return -1;
  540. i = wm->msg[0] ? -1 : 0;
  541. free(wm);
  542. return i;
  543. }
  544. /*
  545. * print a warning and continue
  546. */
  547. void
  548. warning(char *f, void *a)
  549. {
  550. char err[65];
  551. char buf[256];
  552. rerrstr(err, sizeof(err));
  553. snprint(buf, sizeof(buf), f, a);
  554. fprint(2, "runq: %s: %s\n", buf, err);
  555. }
  556. /*
  557. * print an error and die
  558. */
  559. void
  560. error(char *f, void *a)
  561. {
  562. char err[Errlen];
  563. char buf[256];
  564. rerrstr(err, sizeof(err));
  565. snprint(buf, sizeof(buf), f, a);
  566. fprint(2, "runq: %s: %s\n", buf, err);
  567. exits(buf);
  568. }
  569. void
  570. logit(char *msg, char *file, char **av)
  571. {
  572. int n, m;
  573. char buf[256];
  574. n = snprint(buf, sizeof(buf), "%s/%s: %s", curdir, file, msg);
  575. for(; *av; av++){
  576. m = strlen(*av);
  577. if(n + m + 4 > sizeof(buf))
  578. break;
  579. sprint(buf + n, " '%s'", *av);
  580. n += m + 3;
  581. }
  582. syslog(0, runqlog, "%s", buf);
  583. }
  584. char *loadfile = ".runqload";
  585. /*
  586. * load balancing
  587. */
  588. void
  589. doload(int start)
  590. {
  591. int fd;
  592. char buf[32];
  593. int i, n;
  594. Mlock *l;
  595. Dir *d;
  596. if(load <= 0)
  597. return;
  598. if(chdir(root) < 0){
  599. load = 0;
  600. return;
  601. }
  602. l = syslock(loadfile);
  603. fd = open(loadfile, ORDWR);
  604. if(fd < 0){
  605. fd = create(loadfile, 0666, ORDWR);
  606. if(fd < 0){
  607. load = 0;
  608. sysunlock(l);
  609. return;
  610. }
  611. }
  612. /* get current load */
  613. i = 0;
  614. n = read(fd, buf, sizeof(buf)-1);
  615. if(n >= 0){
  616. buf[n] = 0;
  617. i = atoi(buf);
  618. }
  619. if(i < 0)
  620. i = 0;
  621. /* ignore load if file hasn't been changed in 30 minutes */
  622. d = dirfstat(fd);
  623. if(d != nil){
  624. if(d->mtime + 30*60 < time(0))
  625. i = 0;
  626. free(d);
  627. }
  628. /* if load already too high, give up */
  629. if(start && i >= load){
  630. sysunlock(l);
  631. exits(0);
  632. }
  633. /* increment/decrement load */
  634. if(start)
  635. i++;
  636. else
  637. i--;
  638. seek(fd, 0, 0);
  639. fprint(fd, "%d\n", i);
  640. sysunlock(l);
  641. close(fd);
  642. }