buildindex.c

/*
 * Rebuild the index from scratch, in place.
 */

#include "stdinc.h"
#include "dat.h"
#include "fns.h"

enum
{
	MinBufSize = 64*1024,
	MaxBufSize = 4*1024*1024,
};

int	dumb;
int	errors;
char	**isect;
int	nisect;
int	bloom;
int	zero;

u32int	isectmem;
u64int	totalbuckets;
u64int	totalclumps;
Channel	*arenadonechan;
Channel	*isectdonechan;
Index	*ix;

u64int	arenaentries;
u64int	skipentries;
u64int	indexentries;

static int	shouldprocess(ISect*);
static void	isectproc(void*);
static void	arenapartproc(void*);

void
usage(void)
{
	fprint(2, "usage: buildindex [-b] [-i isect]... [-M imem] venti.conf\n");
	threadexitsall("usage");
}
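
/*
 * Illustrative invocation (an assumption, not taken from this source):
 * rebuild the whole index with 1GB of sorting memory, updating the
 * bloom filter as a side effect:
 *
 *	buildindex -b -M 1g venti.conf
 *
 * -i restricts the rebuild to the named index sections, and -M is
 * parsed by unittoull, which accepts scaled sizes like the 1g above.
 */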

void
threadmain(int argc, char *argv[])
{
	int fd, i, napart, nfinish, maxdisks;
	u32int bcmem, imem;
	Config conf;
	Part *p;

	maxdisks = 100000;
	ventifmtinstall();
	imem = 256*1024*1024;
	ARGBEGIN{
	case 'b':
		bloom = 1;
		break;
	case 'd':	/* debugging - make sure to run all 3 passes */
		dumb = 1;
		break;
	case 'i':
		isect = vtrealloc(isect, (nisect+1)*sizeof(isect[0]));
		isect[nisect++] = EARGF(usage());
		break;
	case 'M':
		imem = unittoull(EARGF(usage()));
		break;
	case 'm':	/* temporary - might go away */
		maxdisks = atoi(EARGF(usage()));
		break;
	default:
		usage();
		break;
	}ARGEND

	if(argc != 1)
		usage();
	if(initventi(argv[0], &conf) < 0)
		sysfatal("can't init venti: %r");
	ix = mainindex;
	if(nisect == 0 && ix->bloom)
		bloom = 1;
	if(bloom && ix->bloom && resetbloom(ix->bloom) < 0)
		sysfatal("loadbloom: %r");
	if(bloom && !ix->bloom)
		sysfatal("-b specified but no bloom filter");
	if(!bloom)
		ix->bloom = nil;
	isectmem = imem/ix->nsects;

	/*
	 * safety first - only need read access to arenas
	 */
	p = nil;
	for(i=0; i<ix->narenas; i++){
		if(ix->arenas[i]->part != p){
			p = ix->arenas[i]->part;
			if((fd = open(p->filename, OREAD)) < 0)
				sysfatal("cannot reopen %s: %r", p->filename);
			dup(fd, p->fd);
			close(fd);
		}
	}

	/*
	 * need a block for every arena
	 */
	bcmem = maxblocksize * (mainindex->narenas + 16);
	if(0) fprint(2, "initialize %d bytes of disk block cache\n", bcmem);
	initdcache(bcmem);

	totalclumps = 0;
	for(i=0; i<ix->narenas; i++)
		totalclumps += ix->arenas[i]->diskstats.clumps;

	totalbuckets = 0;
	for(i=0; i<ix->nsects; i++)
		totalbuckets += ix->sects[i]->blocks;
	fprint(2, "%,lld clumps, %,lld buckets\n", totalclumps, totalbuckets);

	/* start index procs */
	fprint(2, "%T read index\n");
	isectdonechan = chancreate(sizeof(void*), 0);
	for(i=0; i<ix->nsects; i++){
		if(shouldprocess(ix->sects[i])){
			ix->sects[i]->writechan = chancreate(sizeof(IEntry), 0);
			vtproc(isectproc, ix->sects[i]);
		}
	}
	for(i=0; i<nisect; i++)
		if(isect[i])
			fprint(2, "warning: did not find index section %s\n", isect[i]);

	/* start arena procs */
	p = nil;
	napart = 0;
	nfinish = 0;
	arenadonechan = chancreate(sizeof(void*), 0);
	for(i=0; i<ix->narenas; i++){
		if(ix->arenas[i]->part != p){
			p = ix->arenas[i]->part;
			vtproc(arenapartproc, p);
			if(++napart >= maxdisks){
				recvp(arenadonechan);
				nfinish++;
			}
		}
	}

	/*
	 * Wait for the remaining arena procs to finish;
	 * nfinish already counts the procs reaped by the
	 * maxdisks throttle above, so don't reset it here.
	 */
	for(; nfinish<napart; nfinish++)
		recvp(arenadonechan);

	/* tell index procs to finish */
	for(i=0; i<ix->nsects; i++)
		if(ix->sects[i]->writechan)
			send(ix->sects[i]->writechan, nil);

	/* wait for index procs to finish */
	for(i=0; i<ix->nsects; i++)
		if(ix->sects[i]->writechan)
			recvp(isectdonechan);

	if(ix->bloom && writebloom(ix->bloom) < 0)
		fprint(2, "writing bloom filter: %r\n");

	fprint(2, "%T done arenaentries=%,lld indexed=%,lld (nskip=%,lld)\n",
		arenaentries, indexentries, skipentries);
	threadexitsall(nil);
}
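
/*
 * Shape of the pipeline above (a summary, not in the original): one
 * arenapartproc per arena disk reads clump directories and sprays
 * IEntries over the per-section writechans; one isectproc per index
 * section receives them and runs the three sorting passes described
 * in isectproc below.  The done channels exist only so threadmain
 * can rendezvous with both sides.
 */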

static int
shouldprocess(ISect *is)
{
	int i;

	if(nisect == 0)
		return 1;

	for(i=0; i<nisect; i++)
		if(isect[i] && strcmp(isect[i], is->name) == 0){
			isect[i] = nil;
			return 1;
		}
	return 0;
}

static void
add(u64int *a, u64int n)
{
	static Lock l;

	lock(&l);
	*a += n;
	unlock(&l);
}

/*
 * Read through an arena partition and send each of its IEntries
 * to the appropriate index section.  When finished, send on
 * arenadonechan.
 */
enum
{
	ClumpChunks = 32*1024,
};

static void
arenapartproc(void *v)
{
	int i, j, n, nskip, x;
	u32int clump;
	u64int addr, tot;
	Arena *a;
	ClumpInfo *ci, *cis;
	IEntry ie;
	Part *p;

	p = v;
	threadsetname("arenaproc %s", p->name);

	nskip = 0;
	tot = 0;
	cis = MKN(ClumpInfo, ClumpChunks);
	for(i=0; i<ix->narenas; i++){
		a = ix->arenas[i];
		if(a->part != p)
			continue;
		if(a->memstats.clumps)
			fprint(2, "%T arena %s: %d entries\n",
				a->name, a->memstats.clumps);
		/*
		 * Running the loop backwards accesses the
		 * clump info blocks forwards, since they are
		 * stored in reverse order at the end of the arena.
		 * This speeds things slightly.
		 */
		addr = ix->amap[i].start + a->memstats.used;
		for(clump=a->memstats.clumps; clump > 0; clump-=n){
			n = ClumpChunks;
			if(n > clump)
				n = clump;
			if(readclumpinfos(a, clump-n, cis, n) != n){
				fprint(2, "%T arena %s: directory read: %r\n", a->name);
				errors = 1;
				break;
			}
			for(j=n-1; j>=0; j--){
				ci = &cis[j];
				ie.ia.type = ci->type;
				ie.ia.size = ci->uncsize;
				addr -= ci->size + ClumpSize;
				ie.ia.addr = addr;
				ie.ia.blocks = (ci->size + ClumpSize + (1<<ABlockLog)-1) >> ABlockLog;
				scorecp(ie.score, ci->score);
				if(ci->type == VtCorruptType)
					nskip++;
				else{
					tot++;
					x = indexsect(ix, ie.score);
					assert(0 <= x && x < ix->nsects);
					if(ix->sects[x]->writechan)
						send(ix->sects[x]->writechan, &ie);
					if(ix->bloom)
						markbloomfilter(ix->bloom, ie.score);
				}
			}
		}
		if(addr != ix->amap[i].start)
			fprint(2, "%T arena %s: clump miscalculation %lld != %lld\n",
				a->name, addr, ix->amap[i].start);
	}
	add(&arenaentries, tot);
	add(&skipentries, nskip);
	sendp(arenadonechan, p);
}

/*
 * Convert score into relative bucket number in isect.
 * Can pass a packed ientry instead of score - score is first.
 */
static u32int
score2bucket(ISect *is, uchar *score)
{
	u32int b;

	b = hashbits(score, 32)/ix->div;
	if(b < is->start || b >= is->stop){
		fprint(2, "score2bucket: score=%V div=%d b=%ud start=%ud stop=%ud\n",
			score, ix->div, b, is->start, is->stop);
	}
	assert(is->start <= b && b < is->stop);
	return b - is->start;
}
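
/*
 * A worked example of the mapping above (numbers invented for
 * illustration): hashbits(score, 32) folds the 20-byte score into a
 * 32-bit hash, and ix->div scales that hash down into the index's
 * bucket space.  With ix->div = 4096, a hash of 0x12345678 (305419896)
 * lands in absolute bucket 305419896/4096 = 74565; an index section
 * covering buckets [65536, 131072) stores it as relative bucket
 * 74565 - 65536 = 9029.
 */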

/*
 * Convert offset in index section to bucket number.
 */
static u32int
offset2bucket(ISect *is, u64int offset)
{
	u32int b;

	assert(is->blockbase <= offset);
	offset -= is->blockbase;
	b = offset/is->blocksize;
	assert(b < is->stop-is->start);
	return b;
}

/*
 * Convert bucket number to offset.
 */
static u64int
bucket2offset(ISect *is, u32int b)
{
	assert(b <= is->stop-is->start);
	return is->blockbase + (u64int)b*is->blocksize;
}

/*
 * IEntry buffers to hold initial round of spraying.
 */
typedef struct Buf Buf;
struct Buf
{
	Part	*part;		/* partition being written */
	uchar	*bp;		/* current block */
	uchar	*ep;		/* end of block */
	uchar	*wp;		/* write position in block */
	u64int	boffset;	/* start offset */
	u64int	woffset;	/* next write offset */
	u64int	eoffset;	/* end offset */
	u32int	nentry;		/* number of entries written */
};

static void
bflush(Buf *buf)
{
	u32int bufsize;

	if(buf->woffset >= buf->eoffset)
		sysfatal("buf index chunk overflow - need bigger index");
	bufsize = buf->ep - buf->bp;
	if(writepart(buf->part, buf->woffset, buf->bp, bufsize) < 0){
		fprint(2, "write %s: %r\n", buf->part->name);
		errors = 1;
	}
	buf->woffset += bufsize;
	memset(buf->bp, 0, bufsize);
	buf->wp = buf->bp;
}

static void
bwrite(Buf *buf, IEntry *ie)
{
	if(buf->wp+IEntrySize > buf->ep)
		bflush(buf);
	assert(buf->bp <= buf->wp && buf->wp < buf->ep);
	packientry(ie, buf->wp);
	buf->wp += IEntrySize;
	assert(buf->bp <= buf->wp && buf->wp <= buf->ep);
	buf->nentry++;
}

/*
 * Minibuffer.  In-memory data structure holds our place
 * in the buffer but has no block data.  We are writing and
 * reading the minibuffers at the same time.  (Careful!)
 */
typedef struct Minibuf Minibuf;
struct Minibuf
{
	u64int	boffset;	/* start offset */
	u64int	roffset;	/* read offset */
	u64int	woffset;	/* write offset */
	u64int	eoffset;	/* end offset */
	u32int	nentry;		/* # entries left to read */
	u32int	nwentry;	/* # entries written */
};
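
/*
 * Invariant sketch (not stated in the original): within a minibuffer,
 * woffset never passes roffset while entries remain.  ipoolflush0 only
 * overwrites the block at woffset after ipoolloadblock has advanced
 * roffset past it, so unread packed entries are always lifted into the
 * pool before their disk blocks are reused.
 */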

/*
 * Index entry pool.  Used when trying to shuffle around
 * the entries in a big buffer into the corresponding M minibuffers.
 * Sized to hold M*EntriesPerBlock entries, so that there will always
 * either be room in the pool for another block worth of entries
 * or there will be an entire block worth of sorted entries to
 * write out.
 */
typedef struct IEntryLink IEntryLink;
typedef struct IPool IPool;

struct IEntryLink
{
	uchar	ie[IEntrySize];	/* raw IEntry */
	IEntryLink	*next;	/* next in chain */
};

struct IPool
{
	ISect	*isect;
	u32int	buck0;		/* first bucket in pool */
	u32int	mbufbuckets;	/* buckets per minibuf */
	IEntryLink	*entry;	/* all IEntryLinks */
	u32int	nentry;		/* # of IEntryLinks */
	IEntryLink	*free;	/* free list */
	u32int	nfree;		/* # on free list */
	Minibuf	*mbuf;		/* all minibufs */
	u32int	nmbuf;		/* # of minibufs */
	IEntryLink	**mlist;	/* lists for each minibuf */
	u32int	*mcount;	/* # on each mlist[i] */
	u32int	bufsize;	/* block buffer size */
	uchar	*rbuf;		/* read buffer */
	uchar	*wbuf;		/* write buffer */
	u32int	epbuf;		/* entries per block buffer */
};

/*
static int
countsokay(IPool *p)
{
	int i;
	u64int n;

	n = 0;
	for(i=0; i<p->nmbuf; i++)
		n += p->mcount[i];
	n += p->nfree;
	if(n != p->nentry){
		print("free %ud:", p->nfree);
		for(i=0; i<p->nmbuf; i++)
			print(" %ud", p->mcount[i]);
		print(" = %lld nentry: %ud\n", n, p->nentry);
	}
	return n == p->nentry;
}
*/

static IPool*
mkipool(ISect *isect, Minibuf *mbuf, u32int nmbuf,
	u32int mbufbuckets, u32int bufsize)
{
	u32int i, nentry;
	uchar *data;
	IPool *p;
	IEntryLink *l;

	nentry = (nmbuf+1)*bufsize / IEntrySize;
	p = ezmalloc(sizeof(IPool)
		+nentry*sizeof(IEntry)
		+nmbuf*sizeof(IEntryLink*)
		+nmbuf*sizeof(u32int)
		+3*bufsize);

	p->isect = isect;
	p->mbufbuckets = mbufbuckets;
	p->bufsize = bufsize;
	p->entry = (IEntryLink*)(p+1);
	p->nentry = nentry;
	p->mlist = (IEntryLink**)(p->entry+nentry);
	p->mcount = (u32int*)(p->mlist+nmbuf);
	p->nmbuf = nmbuf;
	p->mbuf = mbuf;
	data = (uchar*)(p->mcount+nmbuf);
	data += bufsize - (uintptr)data%bufsize;
	p->rbuf = data;
	p->wbuf = data+bufsize;
	p->epbuf = bufsize/IEntrySize;

	for(i=0; i<p->nentry; i++){
		l = &p->entry[i];
		l->next = p->free;
		p->free = l;
		p->nfree++;
	}
	return p;
}
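
/*
 * Illustrative sizing (invented numbers, taking IEntrySize = 40):
 * with nmbuf = 8 minibufs and bufsize = 64kB, the pool holds
 * (8+1)*65536/40 = 14745 entry links -- one block worth more than a
 * block per minibuf, which is what lets ipoolflush1 always find a
 * full block to evict.  The extra 3*bufsize covers rbuf, wbuf, and
 * the slack consumed by aligning them on a bufsize boundary.
 */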

/*
 * Add the index entry ie to the pool p.
 * Caller must know there is room.
 */
static void
ipoolinsert(IPool *p, uchar *ie)
{
	u32int buck, x;
	IEntryLink *l;

	assert(p->free != nil);

	buck = score2bucket(p->isect, ie);
	x = (buck-p->buck0) / p->mbufbuckets;
	if(x >= p->nmbuf){
		fprint(2, "buck=%ud mbufbucket=%ud x=%ud\n",
			buck, p->mbufbuckets, x);
	}
	assert(x < p->nmbuf);

	l = p->free;
	p->free = l->next;
	p->nfree--;
	memmove(l->ie, ie, IEntrySize);
	l->next = p->mlist[x];
	p->mlist[x] = l;
	p->mcount[x]++;
}

/*
 * Pull out a block containing as many
 * entries as possible for minibuffer x.
 */
static u32int
ipoolgetbuf(IPool *p, u32int x)
{
	uchar *bp, *ep, *wp;
	IEntryLink *l;
	u32int n;

	bp = p->wbuf;
	ep = p->wbuf + p->bufsize;
	n = 0;
	assert(x < p->nmbuf);
	for(wp=bp; wp+IEntrySize<=ep && p->mlist[x]; wp+=IEntrySize){
		l = p->mlist[x];
		p->mlist[x] = l->next;
		p->mcount[x]--;
		memmove(wp, l->ie, IEntrySize);
		l->next = p->free;
		p->free = l;
		p->nfree++;
		n++;
	}
	memset(wp, 0, ep-wp);
	return n;
}

/*
 * Read a block worth of entries from the minibuf
 * into the pool.  Caller must know there is room.
 */
static void
ipoolloadblock(IPool *p, Minibuf *mb)
{
	u32int i, n;

	assert(mb->nentry > 0);
	assert(mb->roffset >= mb->woffset);
	assert(mb->roffset < mb->eoffset);

	n = p->bufsize/IEntrySize;
	if(n > mb->nentry)
		n = mb->nentry;
	if(readpart(p->isect->part, mb->roffset, p->rbuf, p->bufsize) < 0)
		fprint(2, "readpart %s: %r\n", p->isect->part->name);
	else{
		for(i=0; i<n; i++)
			ipoolinsert(p, p->rbuf+i*IEntrySize);
	}
	mb->nentry -= n;
	mb->roffset += p->bufsize;
}

/*
 * Write out a block worth of entries to minibuffer x.
 * If necessary, pick up the data there before overwriting it.
 */
static void
ipoolflush0(IPool *pool, u32int x)
{
	u32int bufsize;
	Minibuf *mb;

	mb = pool->mbuf+x;
	bufsize = pool->bufsize;
	mb->nwentry += ipoolgetbuf(pool, x);
	if(mb->nentry > 0 && mb->roffset == mb->woffset){
		assert(pool->nfree >= pool->bufsize/IEntrySize);
		/*
		 * There will be room in the pool -- we just
		 * removed a block worth.
		 */
		ipoolloadblock(pool, mb);
	}
	if(writepart(pool->isect->part, mb->woffset, pool->wbuf, bufsize) < 0)
		fprint(2, "writepart %s: %r\n", pool->isect->part->name);
	mb->woffset += bufsize;
}

/*
 * Write out some full block of entries.
 * There must be one: the pool holds (nmbuf+1) blocks worth of
 * links and this is called only when at most one block worth is
 * free, so the nmbuf minibuffer lists hold at least nmbuf blocks
 * worth of entries between them and, by pigeonhole, some list
 * holds a full block.
 */
static void
ipoolflush1(IPool *pool)
{
	u32int i;

	assert(pool->nfree <= pool->epbuf);
	for(i=0; i<pool->nmbuf; i++){
		if(pool->mcount[i] >= pool->epbuf){
			ipoolflush0(pool, i);
			return;
		}
	}
	/* can't be reached - someone must be full */
	sysfatal("ipoolflush1");
}

/*
 * Flush all the entries in the pool out to disk.
 * Nothing more to read from disk.
 */
static void
ipoolflush(IPool *pool)
{
	u32int i;

	for(i=0; i<pool->nmbuf; i++)
		while(pool->mlist[i])
			ipoolflush0(pool, i);
	assert(pool->nfree == pool->nentry);
}

/*
 * Third pass.  Pick up each minibuffer from disk into
 * memory and then write out the buckets.
 */

/*
 * Compare two packed index entries.
 * Usual ordering except break ties by putting higher
 * index addresses first (assumes any duplicates are
 * due to corruption at the lower addresses).
 */
static int
ientrycmpaddr(const void *va, const void *vb)
{
	int i;
	uchar *a, *b;

	a = (uchar*)va;
	b = (uchar*)vb;
	i = ientrycmp(a, b);
	if(i)
		return i;
	return -memcmp(a+IEntryAddrOff, b+IEntryAddrOff, 8);
}
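
/*
 * Hypothetical example of the tie-break: if ientrycmp calls two packed
 * entries equal but their addresses are 0x1000 and 0x2000, the negated
 * memcmp on the big-endian address bytes sorts 0x2000 first, so the
 * copy written later in the log shadows the suspect earlier duplicate.
 */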

static void
zerorange(Part *p, u64int o, u64int e)
{
	static uchar zero[MaxIoSize];
	u32int n;

	for(; o<e; o+=n){
		n = sizeof zero;
		if(o+n > e)
			n = e-o;
		if(writepart(p, o, zero, n) < 0)
			fprint(2, "writepart %s: %r\n", p->name);
	}
}

/*
 * Load a minibuffer into memory and write out the
 * corresponding buckets.
 */
static void
sortminibuffer(ISect *is, Minibuf *mb, uchar *buf, u32int nbuf, u32int bufsize)
{
	uchar *buckdata, *p, *q, *ep;
	u32int b, lastb, memsize, n;
	u64int o;
	IBucket ib;
	Part *part;

	part = is->part;
	if(mb->nwentry == 0)
		return;
	buckdata = emalloc(is->blocksize);

	/*
	 * read entire buffer.
	 */
	assert(mb->nwentry*IEntrySize <= mb->woffset-mb->boffset);
	assert(mb->woffset-mb->boffset <= nbuf);
	if(readpart(part, mb->boffset, buf, mb->woffset-mb->boffset) < 0){
		fprint(2, "readpart %s: %r\n", part->name);
		errors = 1;
		return;
	}
	assert(*(uint*)buf != 0xa5a5a5a5);

	/*
	 * remove fragmentation due to IEntrySize
	 * not evenly dividing bufsize
	 */
	memsize = (bufsize/IEntrySize)*IEntrySize;
	for(o=mb->boffset, p=q=buf; o<mb->woffset; o+=bufsize){
		memmove(p, q, memsize);
		p += memsize;
		q += bufsize;
	}
	ep = buf + mb->nwentry*IEntrySize;
	assert(ep <= buf+nbuf);

	/*
	 * sort entries
	 */
	qsort(buf, mb->nwentry, IEntrySize, ientrycmpaddr);

	/*
	 * write buckets out
	 */
	n = 0;
	lastb = offset2bucket(is, mb->boffset);
	for(p=buf; p<ep; p=q){
		b = score2bucket(is, p);
		for(q=p; q<ep && score2bucket(is, q)==b; q+=IEntrySize)
			;
		if(lastb+1 < b && zero)
			zerorange(part, bucket2offset(is, lastb+1), bucket2offset(is, b));
		if(IBucketSize+(q-p) > is->blocksize)
			sysfatal("bucket overflow - make index bigger");
		memmove(buckdata+IBucketSize, p, q-p);
		ib.n = (q-p)/IEntrySize;
		n += ib.n;
		packibucket(&ib, buckdata, is->bucketmagic);
		if(writepart(part, bucket2offset(is, b), buckdata, is->blocksize) < 0)
			fprint(2, "write %s: %r\n", part->name);
		lastb = b;
	}
	if(lastb+1 < is->stop-is->start && zero)
		zerorange(part, bucket2offset(is, lastb+1), bucket2offset(is, is->stop - is->start));

	if(n != mb->nwentry)
		fprint(2, "sortminibuffer bug: n=%ud nwentry=%ud have=%ld\n",
			n, mb->nwentry, (ep-buf)/IEntrySize);

	free(buckdata);
}

static void
isectproc(void *v)
{
	u32int buck, bufbuckets, bufsize, epbuf, i, j;
	u32int mbufbuckets, n, nbucket, nn, space;
	u32int nbuf, nminibuf, xminiclump, prod;
	u64int blocksize, offset, xclump;
	uchar *data, *p;
	Buf *buf;
	IEntry ie;
	IPool *ipool;
	ISect *is;
	Minibuf *mbuf, *mb;

	is = v;
	blocksize = is->blocksize;
	nbucket = is->stop - is->start;

	/*
	 * Three passes:
	 *	pass 1 - write index entries from arenas into
	 *		large sequential sections on index disk.
	 *		requires nbuf * bufsize memory.
	 *
	 *	pass 2 - split each section into minibufs.
	 *		requires nminibuf * bufsize memory.
	 *
	 *	pass 3 - read each minibuf into memory and
	 *		write buckets out.
	 *		requires entries/minibuf * IEntrySize memory.
	 *
	 * The larger we set bufsize the less seeking hurts us.
	 *
	 * The fewer sections and minibufs we have, the less
	 * seeking hurts us.
	 *
	 * The fewer sections and minibufs we have, the
	 * more entries we end up with in each minibuf
	 * at the end.
	 *
	 * Shoot for using half our memory to hold each
	 * minibuf.  The chance of a random distribution
	 * getting off by 2x is quite low.
	 *
	 * Once that is decided, figure out the smallest
	 * nminibuf and nsection/biggest bufsize we can use
	 * and still fit in the memory constraints.
	 */
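
	/*
	 * Worked example of the sizing below (invented numbers, taking
	 * IEntrySize = 40): for nbucket = 1M buckets and an index-wide
	 * average of 100 clumps per bucket, xclump = 100M expected
	 * entries.  With isectmem = 256MB, xminiclump = 256MB/2/40 =
	 * 3.3M entries per minibuf, so prod = 30 minibufs in total.
	 * 30*MinBufSize is roughly 1.9MB, well under 256MB, so the
	 * second pass is skipped: nbuf = 30, nminibuf = 1.
	 */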

	/* expected number of clump index entries we'll see */
	xclump = nbucket * (double)totalclumps/totalbuckets;

	/* number of clumps we want to see in a minibuf */
	xminiclump = isectmem/2/IEntrySize;

	/* total number of minibufs we need */
	prod = (xclump+xminiclump-1) / xminiclump;

	/* if possible, skip second pass */
	if(!dumb && prod*MinBufSize < isectmem){
		nbuf = prod;
		nminibuf = 1;
	}else{
		/* otherwise use nsection = sqrt(nmini) */
		for(nbuf=1; nbuf*nbuf<prod; nbuf++)
			;
		if(nbuf*MinBufSize > isectmem)
			sysfatal("not enough memory");
		nminibuf = nbuf;
	}

	/* size buffer to use extra memory */
	bufsize = MinBufSize;
	while(bufsize*2*nbuf <= isectmem && bufsize < MaxBufSize)
		bufsize *= 2;
	data = emalloc(nbuf*bufsize);
	epbuf = bufsize/IEntrySize;
	fprint(2, "%T %s: %,ud buckets, %,ud groups, %,ud minigroups, %,ud buffer\n",
		is->part->name, nbucket, nbuf, nminibuf, bufsize);

	/*
	 * Accept index entries from arena procs.
	 */
	buf = MKNZ(Buf, nbuf);
	p = data;
	offset = is->blockbase;
	bufbuckets = (nbucket+nbuf-1)/nbuf;
	for(i=0; i<nbuf; i++){
		buf[i].part = is->part;
		buf[i].bp = p;
		buf[i].wp = p;
		p += bufsize;
		buf[i].ep = p;
		buf[i].boffset = offset;
		buf[i].woffset = offset;
		if(i < nbuf-1){
			offset += bufbuckets*blocksize;
			buf[i].eoffset = offset;
		}else{
			offset = is->blockbase + nbucket*blocksize;
			buf[i].eoffset = offset;
		}
	}
	assert(p == data+nbuf*bufsize);

	n = 0;
	while(recv(is->writechan, &ie) == 1){
		if(ie.ia.addr == 0)
			break;
		buck = score2bucket(is, ie.score);
		i = buck/bufbuckets;
		assert(i < nbuf);
		bwrite(&buf[i], &ie);
		n++;
	}
	add(&indexentries, n);

	nn = 0;
	for(i=0; i<nbuf; i++){
		bflush(&buf[i]);
		buf[i].bp = nil;
		buf[i].ep = nil;
		buf[i].wp = nil;
		nn += buf[i].nentry;
	}
	if(n != nn)
		fprint(2, "isectproc bug: n=%ud nn=%ud\n", n, nn);
	free(data);

	fprint(2, "%T %s: reordering\n", is->part->name);

	/*
	 * Rearrange entries into minibuffers and then
	 * split each minibuffer into buckets.
	 * The minibuffer must be sized so that it is
	 * a multiple of blocksize -- ipoolloadblock assumes
	 * that each minibuf starts aligned on a blocksize
	 * boundary.
	 */
	mbuf = MKN(Minibuf, nminibuf);
	mbufbuckets = (bufbuckets+nminibuf-1)/nminibuf;
	while(mbufbuckets*blocksize % bufsize)
		mbufbuckets++;
	for(i=0; i<nbuf; i++){
		/*
		 * Set up descriptors.
		 */
		n = buf[i].nentry;
		nn = 0;
		offset = buf[i].boffset;
		memset(mbuf, 0, nminibuf*sizeof(mbuf[0]));
		for(j=0; j<nminibuf; j++){
			mb = &mbuf[j];
			mb->boffset = offset;
			offset += mbufbuckets*blocksize;
			if(offset > buf[i].eoffset)
				offset = buf[i].eoffset;
			mb->eoffset = offset;
			mb->roffset = mb->boffset;
			mb->woffset = mb->boffset;
			mb->nentry = epbuf * (mb->eoffset - mb->boffset)/bufsize;
			if(mb->nentry > buf[i].nentry)
				mb->nentry = buf[i].nentry;
			buf[i].nentry -= mb->nentry;
			nn += mb->nentry;
		}
		if(n != nn)
			fprint(2, "isectproc bug2: n=%ud nn=%ud (i=%d)\n", n, nn, i);

		/*
		 * Rearrange.
		 */
		if(!dumb && nminibuf == 1){
			mbuf[0].nwentry = mbuf[0].nentry;
			mbuf[0].woffset = buf[i].woffset;
		}else{
			ipool = mkipool(is, mbuf, nminibuf, mbufbuckets, bufsize);
			ipool->buck0 = bufbuckets*i;
			for(j=0; j<nminibuf; j++){
				mb = &mbuf[j];
				while(mb->nentry > 0){
					if(ipool->nfree < epbuf){
						ipoolflush1(ipool);
						/* ipoolflush1 might change mb->nentry */
						continue;
					}
					assert(ipool->nfree >= epbuf);
					ipoolloadblock(ipool, mb);
				}
			}
			ipoolflush(ipool);
			nn = 0;
			for(j=0; j<nminibuf; j++)
				nn += mbuf[j].nwentry;
			if(n != nn)
				fprint(2, "isectproc bug3: n=%ud nn=%ud (i=%d)\n", n, nn, i);
			free(ipool);
		}

		/*
		 * Make buckets.
		 */
		space = 0;
		for(j=0; j<nminibuf; j++)
			if(space < mbuf[j].woffset - mbuf[j].boffset)
				space = mbuf[j].woffset - mbuf[j].boffset;

		data = emalloc(space);
		for(j=0; j<nminibuf; j++){
			mb = &mbuf[j];
			sortminibuffer(is, mb, data, space, bufsize);
		}
		free(data);
	}

	sendp(isectdonechan, is);
}