12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723 |
- #include "common.h"
- #include <ctype.h>
- void doalldirs(void);
- void dodir(char*);
- void dofile(Dir*);
- void rundir(char*);
- char* file(char*, char);
- void warning(char*, void*);
- void error(char*, void*);
- int returnmail(char**, char*, char*);
- void logit(char*, char*, char**);
- void doload(int);
- #define HUNK 32
- char *cmd;
- char *root;
- int debug;
- int giveup = 2*24*60*60;
- int load;
- int limit;
- /* the current directory */
- Dir *dirbuf;
- long ndirbuf = 0;
- int nfiles;
- char *curdir;
- char *runqlog = "runq";
- int *pidlist;
- char **badsys; /* array of recalcitrant systems */
- int nbad;
- int npid = 50;
- int sflag; /* single thread per directory */
- int aflag; /* all directories */
- int Eflag; /* ignore E.xxxxxx dates */
- int Rflag; /* no giving up, ever */
/*
 * print the command syntax and exit with a failure status.
 * (exits("") would report normal termination to the caller.)
 */
void
usage(void)
{
	fprint(2, "usage: runq [-adsER] [-q dir] [-l load] [-t time] [-r nfiles] [-n nprocs] q-root cmd\n");
	exits("usage");
}
- void
- main(int argc, char **argv)
- {
- char *qdir, *x;
- qdir = 0;
- ARGBEGIN{
- case 'l':
- x = ARGF();
- if(x == 0)
- usage();
- load = atoi(x);
- if(load < 0)
- load = 0;
- break;
- case 'E':
- Eflag++;
- break;
- case 'R': /* no giving up -- just leave stuff in the queue */
- Rflag++;
- break;
- case 'a':
- aflag++;
- break;
- case 'd':
- debug++;
- break;
- case 'r':
- limit = atoi(ARGF());
- break;
- case 's':
- sflag++;
- break;
- case 't':
- giveup = 60*60*atoi(ARGF());
- break;
- case 'q':
- qdir = ARGF();
- if(qdir == 0)
- usage();
- break;
- case 'n':
- npid = atoi(ARGF());
- if(npid == 0)
- usage();
- break;
- }ARGEND;
- if(argc != 2)
- usage();
- pidlist = malloc(npid*sizeof(*pidlist));
- if(pidlist == 0)
- error("can't malloc", 0);
- if(aflag == 0 && qdir == 0) {
- qdir = getuser();
- if(qdir == 0)
- error("unknown user", 0);
- }
- root = argv[0];
- cmd = argv[1];
- if(chdir(root) < 0)
- error("can't cd to %s", root);
- doload(1);
- if(aflag)
- doalldirs();
- else
- dodir(qdir);
- doload(0);
- exits(0);
- }
- int
- emptydir(char *name)
- {
- int fd;
- long n;
- char buf[2048];
- fd = open(name, OREAD);
- if(fd < 0)
- return 1;
- n = read(fd, buf, sizeof(buf));
- close(fd);
- if(n <= 0) {
- if(debug)
- fprint(2, "removing directory %s\n", name);
- syslog(0, runqlog, "rmdir %s", name);
- sysremove(name);
- return 1;
- }
- return 0;
- }
- int
- forkltd(void)
- {
- int i;
- int pid;
- for(i = 0; i < npid; i++){
- if(pidlist[i] <= 0)
- break;
- }
- while(i >= npid){
- pid = waitpid();
- if(pid < 0){
- syslog(0, runqlog, "forkltd confused");
- exits(0);
- }
- for(i = 0; i < npid; i++)
- if(pidlist[i] == pid)
- break;
- }
- pidlist[i] = fork();
- return pidlist[i];
- }
- /*
- * run all user directories, must be bootes (or root on unix) to do this
- */
- void
- doalldirs(void)
- {
- Dir *db;
- int fd;
- long i, n;
- fd = open(".", OREAD);
- if(fd == -1){
- warning("reading %s", root);
- return;
- }
- n = sysdirreadall(fd, &db);
- if(n > 0){
- for(i=0; i<n; i++){
- if(db[i].qid.type & QTDIR){
- if(emptydir(db[i].name))
- continue;
- switch(forkltd()){
- case -1:
- syslog(0, runqlog, "out of procs");
- doload(0);
- exits(0);
- case 0:
- if(sysdetach() < 0)
- error("%r", 0);
- dodir(db[i].name);
- exits(0);
- default:
- break;
- }
- }
- }
- free(db);
- }
- close(fd);
- }
- /*
- * cd to a user directory and run it
- */
- void
- dodir(char *name)
- {
- curdir = name;
- if(chdir(name) < 0){
- warning("cd to %s", name);
- return;
- }
- if(debug)
- fprint(2, "running %s\n", name);
- rundir(name);
- chdir("..");
- }
- /*
- * run the current directory
- */
- void
- rundir(char *name)
- {
- int fd;
- long i;
- if(aflag && sflag)
- fd = sysopenlocked(".", OREAD);
- else
- fd = open(".", OREAD);
- if(fd == -1){
- warning("reading %s", name);
- return;
- }
- nfiles = sysdirreadall(fd, &dirbuf);
- if(nfiles > 0){
- for(i=0; i<nfiles; i++){
- if(dirbuf[i].name[0]!='C' || dirbuf[i].name[1]!='.')
- continue;
- dofile(&dirbuf[i]);
- }
- free(dirbuf);
- }
- if(aflag && sflag)
- sysunlockfile(fd);
- else
- close(fd);
- }
- /*
- * free files matching name in the current directory
- */
- void
- remmatch(char *name)
- {
- long i;
- syslog(0, runqlog, "removing %s/%s", curdir, name);
- for(i=0; i<nfiles; i++){
- if(strcmp(&dirbuf[i].name[1], &name[1]) == 0)
- sysremove(dirbuf[i].name);
- }
- /* error file (may have) appeared after we read the directory */
- /* stomp on data file in case of phase error */
- sysremove(file(name, 'D'));
- sysremove(file(name, 'E'));
- }
- /*
- * try a message
- */
- void
- dofile(Dir *dp)
- {
- Dir *d;
- int dfd, ac, dtime, efd, pid, i, etime;
- char *buf, *cp, **av;
- Waitmsg *wm;
- Biobuf *b;
- if(debug)
- fprint(2, "dofile %s\n", dp->name);
- /*
- * if no data file or empty control or data file, just clean up
- * the empty control file must be 15 minutes old, to minimize the
- * chance of a race.
- */
- d = dirstat(file(dp->name, 'D'));
- if(d == nil){
- syslog(0, runqlog, "no data file for %s", dp->name);
- remmatch(dp->name);
- return;
- }
- if(dp->length == 0){
- if(time(0)-dp->mtime > 15*60){
- syslog(0, runqlog, "empty ctl file for %s", dp->name);
- remmatch(dp->name);
- }
- return;
- }
- dtime = d->mtime;
- free(d);
- /*
- * retry times depend on the age of the errors file
- */
- if(!Eflag && (d = dirstat(file(dp->name, 'E'))) != nil){
- etime = d->mtime;
- free(d);
- if(etime - dtime < 60*60){
- /* up to the first hour, try every 15 minutes */
- if(time(0) - etime < 15*60)
- return;
- } else {
- /* after the first hour, try once an hour */
- if(time(0) - etime < 60*60)
- return;
- }
-
- }
- /*
- * open control and data
- */
- b = sysopen(file(dp->name, 'C'), "rl", 0660);
- if(b == 0) {
- if(debug)
- fprint(2, "can't open %s: %r\n", file(dp->name, 'C'));
- return;
- }
- dfd = open(file(dp->name, 'D'), OREAD);
- if(dfd < 0){
- if(debug)
- fprint(2, "can't open %s: %r\n", file(dp->name, 'D'));
- Bterm(b);
- sysunlockfile(Bfildes(b));
- return;
- }
- /*
- * make arg list
- * - read args into (malloc'd) buffer
- * - malloc a vector and copy pointers to args into it
- */
- buf = malloc(dp->length+1);
- if(buf == 0){
- warning("buffer allocation", 0);
- Bterm(b);
- sysunlockfile(Bfildes(b));
- close(dfd);
- return;
- }
- if(Bread(b, buf, dp->length) != dp->length){
- warning("reading control file %s\n", dp->name);
- Bterm(b);
- sysunlockfile(Bfildes(b));
- close(dfd);
- free(buf);
- return;
- }
- buf[dp->length] = 0;
- av = malloc(2*sizeof(char*));
- if(av == 0){
- warning("argv allocation", 0);
- close(dfd);
- free(buf);
- Bterm(b);
- sysunlockfile(Bfildes(b));
- return;
- }
- for(ac = 1, cp = buf; *cp; ac++){
- while(isspace(*cp))
- *cp++ = 0;
- if(*cp == 0)
- break;
- av = realloc(av, (ac+2)*sizeof(char*));
- if(av == 0){
- warning("argv allocation", 0);
- close(dfd);
- free(buf);
- Bterm(b);
- sysunlockfile(Bfildes(b));
- return;
- }
- av[ac] = cp;
- while(*cp && !isspace(*cp)){
- if(*cp++ == '"'){
- while(*cp && *cp != '"')
- cp++;
- if(*cp)
- cp++;
- }
- }
- }
- av[0] = cmd;
- av[ac] = 0;
- if(!Eflag &&time(0) - dtime > giveup){
- if(returnmail(av, dp->name, "Giveup") != 0)
- logit("returnmail failed", dp->name, av);
- remmatch(dp->name);
- goto done;
- }
- for(i = 0; i < nbad; i++){
- if(strcmp(av[3], badsys[i]) == 0)
- goto done;
- }
- /*
- * transfer
- */
- pid = fork();
- switch(pid){
- case -1:
- sysunlockfile(Bfildes(b));
- syslog(0, runqlog, "out of procs");
- exits(0);
- case 0:
- if(debug) {
- fprint(2, "Starting %s", cmd);
- for(ac = 0; av[ac]; ac++)
- fprint(2, " %s", av[ac]);
- fprint(2, "\n");
- }
- logit("execing", dp->name, av);
- close(0);
- dup(dfd, 0);
- close(dfd);
- close(2);
- efd = open(file(dp->name, 'E'), OWRITE);
- if(efd < 0){
- if(debug) syslog(0, "runq", "open %s as %s: %r", file(dp->name,'E'), getuser());
- efd = create(file(dp->name, 'E'), OWRITE, 0666);
- if(efd < 0){
- if(debug) syslog(0, "runq", "create %s as %s: %r", file(dp->name, 'E'), getuser());
- exits("could not open error file - Retry");
- }
- }
- seek(efd, 0, 2);
- exec(cmd, av);
- error("can't exec %s", cmd);
- break;
- default:
- for(;;){
- wm = wait();
- if(wm == nil)
- error("wait failed: %r", "");
- if(wm->pid == pid)
- break;
- free(wm);
- }
- if(debug)
- fprint(2, "wm->pid %d wm->msg == %s\n", wm->pid, wm->msg);
- if(wm->msg[0]){
- if(debug)
- fprint(2, "[%d] wm->msg == %s\n", getpid(), wm->msg);
- if(!Rflag && strstr(wm->msg, "Retry")==0){
- /* return the message and remove it */
- if(returnmail(av, dp->name, wm->msg) != 0)
- logit("returnmail failed", dp->name, av);
- remmatch(dp->name);
- } else {
- /* add sys to bad list and try again later */
- nbad++;
- badsys = realloc(badsys, nbad*sizeof(char*));
- badsys[nbad-1] = strdup(av[3]);
- }
- } else {
- /* it worked remove the message */
- remmatch(dp->name);
- }
- free(wm);
- }
- done:
- Bterm(b);
- sysunlockfile(Bfildes(b));
- free(buf);
- free(av);
- close(dfd);
- }
- /*
- * return a name starting with the given character
- */
- char*
- file(char *name, char type)
- {
- static char nname[Elemlen+1];
- strncpy(nname, name, Elemlen);
- nname[Elemlen] = 0;
- nname[0] = type;
- return nname;
- }
- /*
- * send back the mail with an error message
- *
- * return 0 if successful
- */
- int
- returnmail(char **av, char *name, char *msg)
- {
- int pfd[2];
- Waitmsg *wm;
- int fd;
- char buf[256];
- char attachment[256];
- int i;
- long n;
- String *s;
- char *sender;
- if(av[1] == 0 || av[2] == 0){
- logit("runq - dumping bad file", name, av);
- return 0;
- }
- s = unescapespecial(s_copy(av[2]));
- sender = s_to_c(s);
- if(!returnable(sender) || strcmp(sender, "postmaster") == 0) {
- logit("runq - dumping p to p mail", name, av);
- return 0;
- }
- if(pipe(pfd) < 0){
- logit("runq - pipe failed", name, av);
- return -1;
- }
- switch(rfork(RFFDG|RFPROC|RFENVG)){
- case -1:
- logit("runq - fork failed", name, av);
- return -1;
- case 0:
- logit("returning", name, av);
- close(pfd[1]);
- close(0);
- dup(pfd[0], 0);
- close(pfd[0]);
- putenv("upasname", "/dev/null");
- snprint(buf, sizeof(buf), "%s/marshal", UPASBIN);
- snprint(attachment, sizeof(attachment), "%s", file(name, 'D'));
- execl(buf, "send", "-A", attachment, "-s", "permanent failure", sender, 0);
- error("can't exec", 0);
- break;
- default:
- break;
- }
- close(pfd[0]);
- fprint(pfd[1], "\n"); /* get out of headers */
- if(av[1]){
- fprint(pfd[1], "Your request ``%.20s ", av[1]);
- for(n = 3; av[n]; n++)
- fprint(pfd[1], "%s ", av[n]);
- }
- fprint(pfd[1], "'' failed (code %s).\nThe symptom was:\n\n", msg);
- fd = open(file(name, 'E'), OREAD);
- if(fd >= 0){
- for(;;){
- n = read(fd, buf, sizeof(buf));
- if(n <= 0)
- break;
- if(write(pfd[1], buf, n) != n){
- close(fd);
- goto out;
- }
- }
- close(fd);
- }
- close(pfd[1]);
- out:
- wm = wait();
- if(wm == nil){
- syslog(0, "runq", "wait: %r");
- logit("wait failed", name, av);
- return -1;
- }
- i = 0;
- if(wm->msg[0]){
- i = -1;
- syslog(0, "runq", "returnmail child: %s", wm->msg);
- logit("returnmail child failed", name, av);
- }
- free(wm);
- return i;
- }
- /*
- * print a warning and continue
- */
- void
- warning(char *f, void *a)
- {
- char err[65];
- char buf[256];
- rerrstr(err, sizeof(err));
- snprint(buf, sizeof(buf), f, a);
- fprint(2, "runq: %s: %s\n", buf, err);
- }
- /*
- * print an error and die
- */
- void
- error(char *f, void *a)
- {
- char err[Errlen];
- char buf[256];
- rerrstr(err, sizeof(err));
- snprint(buf, sizeof(buf), f, a);
- fprint(2, "runq: %s: %s\n", buf, err);
- exits(buf);
- }
- void
- logit(char *msg, char *file, char **av)
- {
- int n, m;
- char buf[256];
- n = snprint(buf, sizeof(buf), "%s/%s: %s", curdir, file, msg);
- for(; *av; av++){
- m = strlen(*av);
- if(n + m + 4 > sizeof(buf))
- break;
- sprint(buf + n, " '%s'", *av);
- n += m + 3;
- }
- syslog(0, runqlog, "%s", buf);
- }
- char *loadfile = ".runqload";
- /*
- * load balancing
- */
- void
- doload(int start)
- {
- int fd;
- char buf[32];
- int i, n;
- Mlock *l;
- Dir *d;
- if(load <= 0)
- return;
- if(chdir(root) < 0){
- load = 0;
- return;
- }
- l = syslock(loadfile);
- fd = open(loadfile, ORDWR);
- if(fd < 0){
- fd = create(loadfile, 0666, ORDWR);
- if(fd < 0){
- load = 0;
- sysunlock(l);
- return;
- }
- }
- /* get current load */
- i = 0;
- n = read(fd, buf, sizeof(buf)-1);
- if(n >= 0){
- buf[n] = 0;
- i = atoi(buf);
- }
- if(i < 0)
- i = 0;
- /* ignore load if file hasn't been changed in 30 minutes */
- d = dirfstat(fd);
- if(d != nil){
- if(d->mtime + 30*60 < time(0))
- i = 0;
- free(d);
- }
- /* if load already too high, give up */
- if(start && i >= load){
- sysunlock(l);
- exits(0);
- }
- /* increment/decrement load */
- if(start)
- i++;
- else
- i--;
- seek(fd, 0, 0);
- fprint(fd, "%d\n", i);
- sysunlock(l);
- close(fd);
- }
|