sortientry.c 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365
  1. #include "stdinc.h"
  2. #include "dat.h"
  3. #include "fns.h"
  4. #include <bio.h>
  5. typedef struct IEBuck IEBuck;
  6. typedef struct IEBucks IEBucks;
  7. enum
  8. {
  9. ClumpChunks = 32*1024
  10. };
  11. struct IEBuck
  12. {
  13. u32int head; /* head of chain of chunks on the disk */
  14. u32int used; /* usage of the last chunk */
  15. u64int total; /* total number of bytes in this bucket */
  16. u8int *buf; /* chunk of entries for this bucket */
  17. };
  18. struct IEBucks
  19. {
  20. Part *part;
  21. u64int off; /* offset for writing data in the partition */
  22. u32int chunks; /* total chunks written to fd */
  23. u64int max; /* max bytes entered in any one bucket */
  24. int bits; /* number of bits in initial bucket sort */
  25. int nbucks; /* 1 << bits, the number of buckets */
  26. u32int size; /* bytes in each of the buckets chunks */
  27. u32int usable; /* amount usable for IEntry data */
  28. u8int *buf; /* buffer for all chunks */
  29. u8int *xbuf;
  30. IEBuck *bucks;
  31. };
  32. #define U32GET(p) (((p)[0]<<24)|((p)[1]<<16)|((p)[2]<<8)|(p)[3])
  33. #define U32PUT(p,v) (p)[0]=(v)>>24;(p)[1]=(v)>>16;(p)[2]=(v)>>8;(p)[3]=(v)
  34. static IEBucks *initiebucks(Part *part, int bits, u32int size);
  35. static int flushiebuck(IEBucks *ib, int b, int reset);
  36. static int flushiebucks(IEBucks *ib);
  37. static u32int sortiebuck(IEBucks *ib, int b);
  38. static u64int sortiebucks(IEBucks *ib);
  39. static int sprayientry(IEBucks *ib, IEntry *ie);
  40. static u32int readarenainfo(IEBucks *ib, Arena *arena, u64int a, Bloom *b);
  41. static u32int readiebuck(IEBucks *ib, int b);
  42. static void freeiebucks(IEBucks *ib);
  43. /*
  44. * build a sorted file with all IEntries which should be in ix.
  45. * assumes the arenas' directories are up to date.
  46. * reads each, converts the entries to index entries,
  47. * and sorts them.
  48. */
  49. u64int
  50. sortrawientries(Index *ix, Part *tmp, u64int *base, Bloom *bloom)
  51. {
  52. IEBucks *ib;
  53. u64int clumps, sorted;
  54. u32int n;
  55. int i, ok;
  56. /* ZZZ should allow configuration of bits, bucket size */
  57. ib = initiebucks(tmp, 8, 64*1024);
  58. if(ib == nil){
  59. seterr(EOk, "can't create sorting buckets: %r");
  60. return TWID64;
  61. }
  62. ok = 0;
  63. clumps = 0;
  64. fprint(2, "constructing entry list\n");
  65. for(i = 0; i < ix->narenas; i++){
  66. n = readarenainfo(ib, ix->arenas[i], ix->amap[i].start, bloom);
  67. if(n == TWID32){
  68. ok = -1;
  69. break;
  70. }
  71. clumps += n;
  72. }
  73. fprint(2, "sorting %lld entries\n", clumps);
  74. if(ok == 0){
  75. sorted = sortiebucks(ib);
  76. *base = (u64int)ib->chunks * ib->size;
  77. if(sorted != clumps){
  78. fprint(2, "sorting messed up: clumps=%lld sorted=%lld\n", clumps, sorted);
  79. ok = -1;
  80. }
  81. }
  82. freeiebucks(ib);
  83. if(ok < 0)
  84. return TWID64;
  85. return clumps;
  86. }
  87. #define CHECK(cis) if(((ulong*)cis)[-4] != 0xA110C09) xabort();
  88. void
  89. xabort(void)
  90. {
  91. int *x;
  92. x = 0;
  93. *x = 0;
  94. }
  95. /*
  96. * read in all of the arena's clump directory,
  97. * convert to IEntry format, and bucket sort based
  98. * on the first few bits.
  99. */
  100. static u32int
  101. readarenainfo(IEBucks *ib, Arena *arena, u64int a, Bloom *b)
  102. {
  103. IEntry ie;
  104. ClumpInfo *ci, *cis;
  105. u32int clump;
  106. int i, n, ok, nskip;
  107. if(arena->memstats.clumps)
  108. fprint(2, "\tarena %s: %d entries\n", arena->name, arena->memstats.clumps);
  109. else
  110. fprint(2, "[%s] ", arena->name);
  111. cis = MKN(ClumpInfo, ClumpChunks);
  112. ok = 0;
  113. nskip = 0;
  114. memset(&ie, 0, sizeof(IEntry));
  115. for(clump = 0; clump < arena->memstats.clumps; clump += n){
  116. n = ClumpChunks;
  117. if(n > arena->memstats.clumps - clump)
  118. n = arena->memstats.clumps - clump;
  119. if(readclumpinfos(arena, clump, cis, n) != n){
  120. seterr(EOk, "arena directory read failed: %r");
  121. ok = -1;
  122. break;
  123. }
  124. for(i = 0; i < n; i++){
  125. ci = &cis[i];
  126. ie.ia.type = ci->type;
  127. ie.ia.size = ci->uncsize;
  128. ie.ia.addr = a;
  129. a += ci->size + ClumpSize;
  130. ie.ia.blocks = (ci->size + ClumpSize + (1 << ABlockLog) - 1) >> ABlockLog;
  131. scorecp(ie.score, ci->score);
  132. if(ci->type == VtCorruptType){
  133. if(0) print("! %V %22lld %3d %5d %3d\n",
  134. ie.score, ie.ia.addr, ie.ia.type, ie.ia.size, ie.ia.blocks);
  135. nskip++;
  136. }else
  137. sprayientry(ib, &ie);
  138. markbloomfilter(b, ie.score);
  139. }
  140. }
  141. free(cis);
  142. if(ok < 0)
  143. return TWID32;
  144. return clump - nskip;
  145. }
  146. /*
  147. * initialize the external bucket sorting data structures
  148. */
  149. static IEBucks*
  150. initiebucks(Part *part, int bits, u32int size)
  151. {
  152. IEBucks *ib;
  153. int i;
  154. ib = MKZ(IEBucks);
  155. if(ib == nil){
  156. seterr(EOk, "out of memory");
  157. return nil;
  158. }
  159. ib->bits = bits;
  160. ib->nbucks = 1 << bits;
  161. ib->size = size;
  162. ib->usable = (size - U32Size) / IEntrySize * IEntrySize;
  163. ib->bucks = MKNZ(IEBuck, ib->nbucks);
  164. if(ib->bucks == nil){
  165. seterr(EOk, "out of memory allocation sorting buckets");
  166. freeiebucks(ib);
  167. return nil;
  168. }
  169. ib->xbuf = MKN(u8int, size * ((1 << bits)+1));
  170. ib->buf = (u8int*)(((uintptr)ib->xbuf+size-1)&~(uintptr)(size-1));
  171. if(ib->buf == nil){
  172. seterr(EOk, "out of memory allocating sorting buckets' buffers");
  173. freeiebucks(ib);
  174. return nil;
  175. }
  176. for(i = 0; i < ib->nbucks; i++){
  177. ib->bucks[i].head = TWID32;
  178. ib->bucks[i].buf = &ib->buf[i * size];
  179. }
  180. ib->part = part;
  181. return ib;
  182. }
  183. static void
  184. freeiebucks(IEBucks *ib)
  185. {
  186. if(ib == nil)
  187. return;
  188. free(ib->bucks);
  189. free(ib->buf);
  190. free(ib);
  191. }
  192. /*
  193. * initial sort: put the entry into the correct bucket
  194. */
  195. static int
  196. sprayientry(IEBucks *ib, IEntry *ie)
  197. {
  198. u32int n;
  199. int b;
  200. b = hashbits(ie->score, ib->bits);
  201. n = ib->bucks[b].used;
  202. if(n + IEntrySize > ib->usable){
  203. /* should be flushed below, but if flush fails, this can happen */
  204. seterr(EOk, "out of space in bucket");
  205. return -1;
  206. }
  207. packientry(ie, &ib->bucks[b].buf[n]);
  208. n += IEntrySize;
  209. ib->bucks[b].used = n;
  210. if(n + IEntrySize <= ib->usable)
  211. return 0;
  212. return flushiebuck(ib, b, 1);
  213. }
  214. /*
  215. * finish sorting:
  216. * for each bucket, read it in and sort it
  217. * write out the the final file
  218. */
  219. static u64int
  220. sortiebucks(IEBucks *ib)
  221. {
  222. u64int tot;
  223. u32int n;
  224. int i;
  225. if(flushiebucks(ib) < 0)
  226. return TWID64;
  227. for(i = 0; i < ib->nbucks; i++)
  228. ib->bucks[i].buf = nil;
  229. ib->off = (u64int)ib->chunks * ib->size;
  230. free(ib->xbuf);
  231. ib->buf = MKN(u8int, ib->max + U32Size);
  232. if(ib->buf == nil){
  233. seterr(EOk, "out of memory allocating final sorting buffer; try more buckets");
  234. return TWID64;
  235. }
  236. tot = 0;
  237. for(i = 0; i < ib->nbucks; i++){
  238. n = sortiebuck(ib, i);
  239. if(n == TWID32)
  240. return TWID64;
  241. if(n != ib->bucks[i].total/IEntrySize)
  242. fprint(2, "bucket %d changed count %d => %d\n",
  243. i, (int)(ib->bucks[i].total/IEntrySize), n);
  244. tot += n;
  245. }
  246. return tot;
  247. }
  248. /*
  249. * sort from bucket b of ib into the output file to
  250. */
  251. static u32int
  252. sortiebuck(IEBucks *ib, int b)
  253. {
  254. u32int n;
  255. n = readiebuck(ib, b);
  256. if(n == TWID32)
  257. return TWID32;
  258. qsort(ib->buf, n, IEntrySize, ientrycmp);
  259. if(writepart(ib->part, ib->off, ib->buf, n*IEntrySize) < 0){
  260. seterr(EOk, "can't write sorted bucket: %r");
  261. return TWID32;
  262. }
  263. ib->off += n * IEntrySize;
  264. return n;
  265. }
  266. /*
  267. * write out a single bucket
  268. */
  269. static int
  270. flushiebuck(IEBucks *ib, int b, int reset)
  271. {
  272. u32int n;
  273. if(ib->bucks[b].used == 0)
  274. return 0;
  275. n = ib->bucks[b].used;
  276. U32PUT(&ib->bucks[b].buf[n], ib->bucks[b].head);
  277. n += U32Size;
  278. USED(n);
  279. if(writepart(ib->part, (u64int)ib->chunks * ib->size, ib->bucks[b].buf, ib->size) < 0){
  280. seterr(EOk, "can't write sorting bucket to file: %r");
  281. xabort();
  282. return -1;
  283. }
  284. ib->bucks[b].head = ib->chunks++;
  285. ib->bucks[b].total += ib->bucks[b].used;
  286. if(reset)
  287. ib->bucks[b].used = 0;
  288. return 0;
  289. }
  290. /*
  291. * write out all of the buckets, and compute
  292. * the maximum size of any bucket
  293. */
  294. static int
  295. flushiebucks(IEBucks *ib)
  296. {
  297. int i;
  298. for(i = 0; i < ib->nbucks; i++){
  299. if(flushiebuck(ib, i, 0) < 0)
  300. return -1;
  301. if(ib->bucks[i].total > ib->max)
  302. ib->max = ib->bucks[i].total;
  303. }
  304. return 0;
  305. }
  306. /*
  307. * read in the chained buffers for bucket b,
  308. * and return it's total number of IEntries
  309. */
  310. static u32int
  311. readiebuck(IEBucks *ib, int b)
  312. {
  313. u32int head, m, n;
  314. head = ib->bucks[b].head;
  315. n = 0;
  316. m = ib->bucks[b].used;
  317. if(m == 0)
  318. m = ib->usable;
  319. if(0) if(ib->bucks[b].total)
  320. fprint(2, "\tbucket %d: %lld entries\n", b, ib->bucks[b].total/IEntrySize);
  321. while(head != TWID32){
  322. if(readpart(ib->part, (u64int)head * ib->size, &ib->buf[n], m+U32Size) < 0){
  323. seterr(EOk, "can't read index sort bucket: %r");
  324. return TWID32;
  325. }
  326. n += m;
  327. head = U32GET(&ib->buf[n]);
  328. m = ib->usable;
  329. }
  330. if(n != ib->bucks[b].total)
  331. fprint(2, "\tbucket %d: expected %d entries, got %d\n",
  332. b, (int)ib->bucks[b].total/IEntrySize, n/IEntrySize);
  333. return n / IEntrySize;
  334. }