/* lumpqueue.c */
  1. #include "stdinc.h"
  2. #include "dat.h"
  3. #include "fns.h"
  /* Short type names for the two structures defined in this file. */
  typedef struct WLump WLump;
  typedef struct LumpQueue LumpQueue;
  /*
   * Number of slots in each write queue's ring.  Ring indices are
   * wrapped with "& (MaxLumpQ - 1)", so this must stay a power of two.
   */
  enum
  {
      MaxLumpQ = 8
  };
  10. struct WLump
  11. {
  12. Lump *u;
  13. Packet *p;
  14. int creator;
  15. int gen;
  16. };
  17. struct LumpQueue
  18. {
  19. VtLock *lock;
  20. VtRendez *flush;
  21. VtRendez *full;
  22. VtRendez *empty;
  23. WLump q[MaxLumpQ];
  24. int w;
  25. int r;
  26. };
  27. static LumpQueue *lumpqs;
  28. static int nqs;
  29. static VtLock *glk;
  30. static int gen;
  31. static void doQueue(void *vq);
  32. int
  33. initLumpQueues(int nq)
  34. {
  35. LumpQueue *q;
  36. int i;
  37. nqs = nq;
  38. glk = vtLockAlloc();
  39. lumpqs = MKNZ(LumpQueue, nq);
  40. for(i = 0; i < nq; i++){
  41. q = &lumpqs[i];
  42. q->lock = vtLockAlloc();
  43. q->full = vtRendezAlloc(q->lock);
  44. q->empty = vtRendezAlloc(q->lock);
  45. q->flush = vtRendezAlloc(q->lock);
  46. if(vtThread(doQueue, q) < 0){
  47. setErr(EOk, "can't start write queue slave: %R");
  48. return 0;
  49. }
  50. }
  51. return 1;
  52. }
  53. /*
  54. * queue a lump & it's packet data for writing
  55. */
  56. int
  57. queueWrite(Lump *u, Packet *p, int creator)
  58. {
  59. LumpQueue *q;
  60. int i;
  61. i = indexSect(mainIndex, u->score);
  62. if(i < 0 || i >= nqs){
  63. setErr(EBug, "internal error: illegal index section in queueWrite");
  64. return 0;
  65. }
  66. q = &lumpqs[i];
  67. vtLock(q->lock);
  68. while(q->r == ((q->w + 1) & (MaxLumpQ - 1)))
  69. vtSleep(q->full);
  70. q->q[q->w].u = u;
  71. q->q[q->w].p = p;
  72. q->q[q->w].creator = creator;
  73. q->q[q->w].gen = gen;
  74. q->w = (q->w + 1) & (MaxLumpQ - 1);
  75. vtWakeup(q->empty);
  76. vtUnlock(q->lock);
  77. return 1;
  78. }
  79. void
  80. queueFlush(void)
  81. {
  82. int i;
  83. LumpQueue *q;
  84. vtLock(glk);
  85. gen++;
  86. vtUnlock(glk);
  87. for(i=0; i<mainIndex->nsects; i++){
  88. q = &lumpqs[i];
  89. vtLock(q->lock);
  90. while(q->w != q->r && gen - q->q[q->r].gen > 0)
  91. vtSleep(q->flush);
  92. vtUnlock(q->lock);
  93. }
  94. }
  95. static void
  96. doQueue(void *vq)
  97. {
  98. LumpQueue *q;
  99. Lump *u;
  100. Packet *p;
  101. int creator;
  102. q = vq;
  103. for(;;){
  104. vtLock(q->lock);
  105. while(q->w == q->r)
  106. vtSleep(q->empty);
  107. u = q->q[q->r].u;
  108. p = q->q[q->r].p;
  109. creator = q->q[q->r].creator;
  110. vtWakeup(q->full);
  111. vtUnlock(q->lock);
  112. if(!writeQLump(u, p, creator))
  113. fprint(2, "failed to write lump for %V: %R", u->score);
  114. vtLock(q->lock);
  115. q->r = (q->r + 1) & (MaxLumpQ - 1);
  116. vtWakeup(q->flush);
  117. vtUnlock(q->lock);
  118. putLump(u);
  119. }
  120. }