123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171 |
- #include "stdinc.h"
- #include "dat.h"
- #include "fns.h"
/* Short type names for the write-queue structures defined in this file. */
typedef struct WLump WLump;
typedef struct LumpQueue LumpQueue;
enum
{
	/*
	 * Number of slots in each write queue's ring buffer.
	 * Must be a power of two: ring indices are wrapped with
	 * & (MaxLumpQ - 1).  queuewrite treats the ring as full when
	 * advancing w would reach r, so at most MaxLumpQ - 1 lumps
	 * are queued at once.
	 */
	MaxLumpQ = 1<<3
};
- struct WLump
- {
- Lump *u;
- Packet *p;
- int creator;
- int gen;
- uint ms;
- };
- struct LumpQueue
- {
- QLock lock;
- Rendez flush;
- Rendez full;
- Rendez empty;
- WLump q[MaxLumpQ];
- int w;
- int r;
- };
- static LumpQueue *lumpqs;
- static int nqs;
- static QLock glk;
- static int gen;
- static void queueproc(void *vq);
- int
- initlumpqueues(int nq)
- {
- LumpQueue *q;
- int i;
- nqs = nq;
- lumpqs = MKNZ(LumpQueue, nq);
- for(i = 0; i < nq; i++){
- q = &lumpqs[i];
- q->full.l = &q->lock;
- q->empty.l = &q->lock;
- q->flush.l = &q->lock;
- if(vtproc(queueproc, q) < 0){
- seterr(EOk, "can't start write queue slave: %r");
- return -1;
- }
- }
- return 0;
- }
- /*
- * queue a lump & it's packet data for writing
- */
- int
- queuewrite(Lump *u, Packet *p, int creator, uint ms)
- {
- LumpQueue *q;
- int i;
- trace(TraceProc, "queuewrite");
- i = indexsect(mainindex, u->score);
- if(i < 0 || i >= nqs){
- seterr(EBug, "internal error: illegal index section in queuewrite");
- return -1;
- }
- q = &lumpqs[i];
- qlock(&q->lock);
- while(q->r == ((q->w + 1) & (MaxLumpQ - 1))){
- trace(TraceProc, "queuewrite sleep");
- rsleep(&q->full);
- }
- q->q[q->w].u = u;
- q->q[q->w].p = p;
- q->q[q->w].creator = creator;
- q->q[q->w].ms = ms;
- q->q[q->w].gen = gen;
- q->w = (q->w + 1) & (MaxLumpQ - 1);
- trace(TraceProc, "queuewrite wakeup");
- rwakeup(&q->empty);
- qunlock(&q->lock);
- return 0;
- }
- void
- flushqueue(void)
- {
- int i;
- LumpQueue *q;
- if(!lumpqs)
- return;
- trace(TraceProc, "flushqueue");
- qlock(&glk);
- gen++;
- qunlock(&glk);
- for(i=0; i<mainindex->nsects; i++){
- q = &lumpqs[i];
- qlock(&q->lock);
- while(q->w != q->r && gen - q->q[q->r].gen > 0){
- trace(TraceProc, "flushqueue sleep q%d", i);
- rsleep(&q->flush);
- }
- qunlock(&q->lock);
- }
- }
-
- static void
- queueproc(void *vq)
- {
- LumpQueue *q;
- Lump *u;
- Packet *p;
- int creator;
- uint ms;
- threadsetname("queueproc");
- q = vq;
- for(;;){
- qlock(&q->lock);
- while(q->w == q->r){
- trace(TraceProc, "queueproc sleep empty");
- rsleep(&q->empty);
- }
- u = q->q[q->r].u;
- p = q->q[q->r].p;
- creator = q->q[q->r].creator;
- ms = q->q[q->r].ms;
- q->r = (q->r + 1) & (MaxLumpQ - 1);
- trace(TraceProc, "queueproc wakeup flush");
- rwakeupall(&q->flush);
- trace(TraceProc, "queueproc wakeup full");
- rwakeup(&q->full);
- qunlock(&q->lock);
- trace(TraceProc, "queueproc writelump %V", u->score);
- if(writeqlump(u, p, creator, ms) < 0)
- fprint(2, "failed to write lump for %V: %r", u->score);
- trace(TraceProc, "queueproc wrotelump %V", u->score);
- putlump(u);
- }
- }
|