disk.c 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336
  1. #include "stdinc.h"
  2. #include "dat.h"
  3. #include "fns.h"
  4. #include "error.h"
  5. static void diskThread(void *a);
  6. enum {
  7. QueueSize = 100, /* maximum block to queue */
  8. };
  9. struct Disk {
  10. VtLock *lk;
  11. int ref;
  12. int fd;
  13. Header h;
  14. VtRendez *flow;
  15. VtRendez *starve;
  16. VtRendez *flush;
  17. VtRendez *die;
  18. int nqueue;
  19. Block *cur; /* block to do on current scan */
  20. Block *next; /* blocks to do next scan */
  21. };
  22. Disk *
  23. diskAlloc(int fd)
  24. {
  25. u8int buf[HeaderSize];
  26. Header h;
  27. Disk *disk;
  28. if(pread(fd, buf, HeaderSize, HeaderOffset) < HeaderSize){
  29. vtOSError();
  30. return nil;
  31. }
  32. if(!headerUnpack(&h, buf))
  33. return nil;
  34. disk = vtMemAllocZ(sizeof(Disk));
  35. disk->lk = vtLockAlloc();
  36. disk->starve = vtRendezAlloc(disk->lk);
  37. disk->flow = vtRendezAlloc(disk->lk);
  38. disk->flush = vtRendezAlloc(disk->lk);
  39. disk->fd = fd;
  40. disk->h = h;
  41. disk->ref = 2;
  42. vtThread(diskThread, disk);
  43. return disk;
  44. }
  45. void
  46. diskFree(Disk *disk)
  47. {
  48. diskFlush(disk);
  49. /* kill slave */
  50. vtLock(disk->lk);
  51. disk->die = vtRendezAlloc(disk->lk);
  52. vtWakeup(disk->starve);
  53. while(disk->ref > 1)
  54. vtSleep(disk->die);
  55. vtUnlock(disk->lk);
  56. vtRendezFree(disk->flow);
  57. vtRendezFree(disk->starve);
  58. vtRendezFree(disk->die);
  59. vtLockFree(disk->lk);
  60. close(disk->fd);
  61. vtMemFree(disk);
  62. }
  63. static u32int
  64. partStart(Disk *disk, int part)
  65. {
  66. switch(part){
  67. default:
  68. assert(0);
  69. case PartSuper:
  70. return disk->h.super;
  71. case PartLabel:
  72. return disk->h.label;
  73. case PartData:
  74. return disk->h.data;
  75. }
  76. }
  77. static u32int
  78. partEnd(Disk *disk, int part)
  79. {
  80. switch(part){
  81. default:
  82. assert(0);
  83. case PartSuper:
  84. return disk->h.super+1;
  85. case PartLabel:
  86. return disk->h.data;
  87. case PartData:
  88. return disk->h.end;
  89. }
  90. }
  91. int
  92. diskReadRaw(Disk *disk, int part, u32int addr, uchar *buf)
  93. {
  94. ulong start, end;
  95. u64int offset;
  96. int n, nn;
  97. start = partStart(disk, part);
  98. end = partEnd(disk, part);
  99. if(addr >= end-start){
  100. vtSetError(EBadAddr);
  101. return 0;
  102. }
  103. offset = ((u64int)(addr + start))*disk->h.blockSize;
  104. n = disk->h.blockSize;
  105. while(n > 0){
  106. nn = pread(disk->fd, buf, n, offset);
  107. if(nn < 0){
  108. vtOSError();
  109. return 0;
  110. }
  111. if(nn == 0){
  112. vtSetError(EIO);
  113. return 0;
  114. }
  115. n -= nn;
  116. offset += nn;
  117. buf += nn;
  118. }
  119. return 1;
  120. }
  121. int
  122. diskWriteRaw(Disk *disk, int part, u32int addr, uchar *buf)
  123. {
  124. ulong start, end;
  125. u64int offset;
  126. int n;
  127. start = partStart(disk, part);
  128. end = partEnd(disk, part);
  129. if(addr >= end-start){
  130. vtSetError(EBadAddr);
  131. return 0;
  132. }
  133. offset = ((u64int)(addr + start))*disk->h.blockSize;
  134. n = pwrite(disk->fd, buf, disk->h.blockSize, offset);
  135. if(n < 0){
  136. vtOSError();
  137. return 0;
  138. }
  139. if(n < disk->h.blockSize) {
  140. vtSetError("short write");
  141. return 0;
  142. }
  143. return 1;
  144. }
  145. static void
  146. diskQueue(Disk *disk, Block *b)
  147. {
  148. Block **bp, *bb;
  149. vtLock(disk->lk);
  150. while(disk->nqueue >= QueueSize)
  151. vtSleep(disk->flow);
  152. if(disk->cur == nil || b->addr > disk->cur->addr)
  153. bp = &disk->cur;
  154. else
  155. bp = &disk->next;
  156. for(bb=*bp; bb; bb=*bp){
  157. if(b->addr < bb->addr)
  158. break;
  159. bp = &bb->ionext;
  160. }
  161. b->ionext = bb;
  162. *bp = b;
  163. if(disk->nqueue == 0)
  164. vtWakeup(disk->starve);
  165. disk->nqueue++;
  166. vtUnlock(disk->lk);
  167. }
  168. void
  169. diskRead(Disk *disk, Block *b)
  170. {
  171. assert(b->iostate == BioEmpty || b->iostate == BioLabel);
  172. blockSetIOState(b, BioReading);
  173. diskQueue(disk, b);
  174. }
  175. void
  176. diskWrite(Disk *disk, Block *b)
  177. {
  178. assert(b->iostate == BioDirty);
  179. blockSetIOState(b, BioWriting);
  180. diskQueue(disk, b);
  181. }
  182. int
  183. diskBlockSize(Disk *disk)
  184. {
  185. return disk->h.blockSize; /* immuttable */
  186. }
  187. int
  188. diskFlush(Disk *disk)
  189. {
  190. Dir dir;
  191. vtLock(disk->lk);
  192. while(disk->nqueue > 0)
  193. vtSleep(disk->flush);
  194. vtUnlock(disk->lk);
  195. /* there really should be a cleaner interface to flush an fd */
  196. nulldir(&dir);
  197. if(dirfwstat(disk->fd, &dir) < 0){
  198. vtOSError();
  199. return 0;
  200. }
  201. return 1;
  202. }
  203. u32int
  204. diskSize(Disk *disk, int part)
  205. {
  206. return partEnd(disk, part) - partStart(disk, part);
  207. }
  208. static void
  209. diskThread(void *a)
  210. {
  211. Disk *disk = a;
  212. Block *b;
  213. uchar *buf, *p;
  214. double t;
  215. int nio;
  216. vtThreadSetName("disk");
  217. fprint(2, "diskThread %d\n", getpid());
  218. buf = vtMemAlloc(disk->h.blockSize);
  219. vtLock(disk->lk);
  220. nio = 0;
  221. t = -nsec();
  222. for(;;){
  223. while(disk->nqueue == 0){
  224. t += nsec();
  225. if(nio >= 10000){
  226. fprint(2, "disk: io=%d at %.3fms\n", nio, t*1e-6/nio);
  227. nio = 0;
  228. t = 0.;
  229. }
  230. if(disk->die != nil)
  231. goto Done;
  232. vtSleep(disk->starve);
  233. t -= nsec();
  234. }
  235. assert(disk->cur != nil || disk->next != nil);
  236. if(disk->cur == nil){
  237. disk->cur = disk->next;
  238. disk->next = nil;
  239. }
  240. b = disk->cur;
  241. disk->cur = b->ionext;
  242. vtUnlock(disk->lk);
  243. /*
  244. * no one should hold onto blocking in the
  245. * reading or writing state, so this lock should
  246. * not cause deadlock.
  247. */
  248. if(0)fprint(2, "diskThread: %d:%d %x\n", getpid(), b->part, b->addr);
  249. bwatchLock(b);
  250. vtLock(b->lk);
  251. assert(b->nlock == 1);
  252. switch(b->iostate){
  253. default:
  254. abort();
  255. case BioReading:
  256. if(!diskReadRaw(disk, b->part, b->addr, b->data)){
  257. fprint(2, "diskReadRaw failed: part=%d addr=%ux: %r\n", b->part, b->addr);
  258. blockSetIOState(b, BioReadError);
  259. }else
  260. blockSetIOState(b, BioClean);
  261. break;
  262. case BioWriting:
  263. p = blockRollback(b, buf);
  264. if(!diskWriteRaw(disk, b->part, b->addr, p)){
  265. fprint(2, "diskWriteRaw failed: date=%s part=%d addr=%ux: %r\n", ctime(times(0)), b->part, b->addr);
  266. break;
  267. }
  268. if(p != buf)
  269. blockSetIOState(b, BioClean);
  270. else
  271. blockSetIOState(b, BioDirty);
  272. break;
  273. }
  274. blockPut(b); /* remove extra reference, unlock */
  275. vtLock(disk->lk);
  276. disk->nqueue--;
  277. if(disk->nqueue == QueueSize-1)
  278. vtWakeup(disk->flow);
  279. if(disk->nqueue == 0)
  280. vtWakeup(disk->flush);
  281. nio++;
  282. }
  283. Done:
  284. fprint(2, "diskThread done\n");
  285. disk->ref--;
  286. vtWakeup(disk->die);
  287. vtUnlock(disk->lk);
  288. vtMemFree(buf);
  289. }