lumpqueue.c 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. #include "stdinc.h"
  2. #include "dat.h"
  3. #include "fns.h"
  4. typedef struct LumpQueue LumpQueue;
  5. typedef struct WLump WLump;
  6. enum
  7. {
  8. MaxLumpQ = 1 << 3 /* max. lumps on a single write queue, must be pow 2 */
  9. };
  10. struct WLump
  11. {
  12. Lump *u;
  13. Packet *p;
  14. int creator;
  15. int gen;
  16. uint ms;
  17. };
  18. struct LumpQueue
  19. {
  20. QLock lock;
  21. Rendez flush;
  22. Rendez full;
  23. Rendez empty;
  24. WLump q[MaxLumpQ];
  25. int w;
  26. int r;
  27. };
  28. static LumpQueue *lumpqs;
  29. static int nqs;
  30. static QLock glk;
  31. static int gen;
  32. static void queueproc(void *vq);
  33. int
  34. initlumpqueues(int nq)
  35. {
  36. LumpQueue *q;
  37. int i;
  38. nqs = nq;
  39. lumpqs = MKNZ(LumpQueue, nq);
  40. for(i = 0; i < nq; i++){
  41. q = &lumpqs[i];
  42. q->full.l = &q->lock;
  43. q->empty.l = &q->lock;
  44. q->flush.l = &q->lock;
  45. if(vtproc(queueproc, q) < 0){
  46. seterr(EOk, "can't start write queue slave: %r");
  47. return -1;
  48. }
  49. }
  50. return 0;
  51. }
  52. /*
  53. * queue a lump & it's packet data for writing
  54. */
  55. int
  56. queuewrite(Lump *u, Packet *p, int creator, uint ms)
  57. {
  58. LumpQueue *q;
  59. int i;
  60. trace(TraceProc, "queuewrite");
  61. i = indexsect(mainindex, u->score);
  62. if(i < 0 || i >= nqs){
  63. seterr(EBug, "internal error: illegal index section in queuewrite");
  64. return -1;
  65. }
  66. q = &lumpqs[i];
  67. qlock(&q->lock);
  68. while(q->r == ((q->w + 1) & (MaxLumpQ - 1))){
  69. trace(TraceProc, "queuewrite sleep");
  70. rsleep(&q->full);
  71. }
  72. q->q[q->w].u = u;
  73. q->q[q->w].p = p;
  74. q->q[q->w].creator = creator;
  75. q->q[q->w].ms = ms;
  76. q->q[q->w].gen = gen;
  77. q->w = (q->w + 1) & (MaxLumpQ - 1);
  78. trace(TraceProc, "queuewrite wakeup");
  79. rwakeup(&q->empty);
  80. qunlock(&q->lock);
  81. return 0;
  82. }
  83. void
  84. flushqueue(void)
  85. {
  86. int i;
  87. LumpQueue *q;
  88. if(!lumpqs)
  89. return;
  90. trace(TraceProc, "flushqueue");
  91. qlock(&glk);
  92. gen++;
  93. qunlock(&glk);
  94. for(i=0; i<mainindex->nsects; i++){
  95. q = &lumpqs[i];
  96. qlock(&q->lock);
  97. while(q->w != q->r && gen - q->q[q->r].gen > 0){
  98. trace(TraceProc, "flushqueue sleep q%d", i);
  99. rsleep(&q->flush);
  100. }
  101. qunlock(&q->lock);
  102. }
  103. }
  104. static void
  105. queueproc(void *vq)
  106. {
  107. LumpQueue *q;
  108. Lump *u;
  109. Packet *p;
  110. int creator;
  111. uint ms;
  112. threadsetname("queueproc");
  113. q = vq;
  114. for(;;){
  115. qlock(&q->lock);
  116. while(q->w == q->r){
  117. trace(TraceProc, "queueproc sleep empty");
  118. rsleep(&q->empty);
  119. }
  120. u = q->q[q->r].u;
  121. p = q->q[q->r].p;
  122. creator = q->q[q->r].creator;
  123. ms = q->q[q->r].ms;
  124. q->r = (q->r + 1) & (MaxLumpQ - 1);
  125. trace(TraceProc, "queueproc wakeup flush");
  126. rwakeupall(&q->flush);
  127. trace(TraceProc, "queueproc wakeup full");
  128. rwakeup(&q->full);
  129. qunlock(&q->lock);
  130. trace(TraceProc, "queueproc writelump %V", u->score);
  131. if(writeqlump(u, p, creator, ms) < 0)
  132. fprint(2, "failed to write lump for %V: %r", u->score);
  133. trace(TraceProc, "queueproc wrotelump %V", u->score);
  134. putlump(u);
  135. }
  136. }