123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961 |
- /*
- * Rebuild the index from scratch, in place.
- */
- #include "stdinc.h"
- #include "dat.h"
- #include "fns.h"
- enum
- {
- MinBufSize = 64*1024,
- MaxBufSize = 4*1024*1024,
- };
- int dumb;
- int errors;
- char **isect;
- int nisect;
- int bloom;
- int zero;
- u32int isectmem;
- u64int totalbuckets;
- u64int totalclumps;
- Channel *arenadonechan;
- Channel *isectdonechan;
- Index *ix;
- u64int arenaentries;
- u64int skipentries;
- u64int indexentries;
- static int shouldprocess(ISect*);
- static void isectproc(void*);
- static void arenapartproc(void*);
- void
- usage(void)
- {
- fprint(2, "usage: buildindex [-b] [-i isect]... [-M imem] venti.conf\n");
- threadexitsall("usage");
- }
- void
- threadmain(int argc, char *argv[])
- {
- int fd, i, napart, nfinish, maxdisks;
- u32int bcmem, imem;
- Config conf;
- Part *p;
-
- maxdisks = 100000;
- ventifmtinstall();
- imem = 256*1024*1024;
- ARGBEGIN{
- case 'b':
- bloom = 1;
- break;
- case 'd': /* debugging - make sure to run all 3 passes */
- dumb = 1;
- break;
- case 'i':
- isect = vtrealloc(isect, (nisect+1)*sizeof(isect[0]));
- isect[nisect++] = EARGF(usage());
- break;
- case 'M':
- imem = unittoull(EARGF(usage()));
- break;
- case 'm': /* temporary - might go away */
- maxdisks = atoi(EARGF(usage()));
- break;
- default:
- usage();
- break;
- }ARGEND
- if(argc != 1)
- usage();
- if(initventi(argv[0], &conf) < 0)
- sysfatal("can't init venti: %r");
- ix = mainindex;
- if(nisect == 0 && ix->bloom)
- bloom = 1;
- if(bloom && ix->bloom && resetbloom(ix->bloom) < 0)
- sysfatal("loadbloom: %r");
- if(bloom && !ix->bloom)
- sysfatal("-b specified but no bloom filter");
- if(!bloom)
- ix->bloom = nil;
- isectmem = imem/ix->nsects;
- /*
- * safety first - only need read access to arenas
- */
- p = nil;
- for(i=0; i<ix->narenas; i++){
- if(ix->arenas[i]->part != p){
- p = ix->arenas[i]->part;
- if((fd = open(p->filename, OREAD)) < 0)
- sysfatal("cannot reopen %s: %r", p->filename);
- dup(fd, p->fd);
- close(fd);
- }
- }
-
- /*
- * need a block for every arena
- */
- bcmem = maxblocksize * (mainindex->narenas + 16);
- if(0) fprint(2, "initialize %d bytes of disk block cache\n", bcmem);
- initdcache(bcmem);
-
- totalclumps = 0;
- for(i=0; i<ix->narenas; i++)
- totalclumps += ix->arenas[i]->diskstats.clumps;
-
- totalbuckets = 0;
- for(i=0; i<ix->nsects; i++)
- totalbuckets += ix->sects[i]->blocks;
- fprint(2, "%,lld clumps, %,lld buckets\n", totalclumps, totalbuckets);
- /* start index procs */
- fprint(2, "%T read index\n");
- isectdonechan = chancreate(sizeof(void*), 0);
- for(i=0; i<ix->nsects; i++){
- if(shouldprocess(ix->sects[i])){
- ix->sects[i]->writechan = chancreate(sizeof(IEntry), 0);
- vtproc(isectproc, ix->sects[i]);
- }
- }
-
- for(i=0; i<nisect; i++)
- if(isect[i])
- fprint(2, "warning: did not find index section %s\n", isect[i]);
- /* start arena procs */
- p = nil;
- napart = 0;
- nfinish = 0;
- arenadonechan = chancreate(sizeof(void*), 0);
- for(i=0; i<ix->narenas; i++){
- if(ix->arenas[i]->part != p){
- p = ix->arenas[i]->part;
- vtproc(arenapartproc, p);
- if(++napart >= maxdisks){
- recvp(arenadonechan);
- nfinish++;
- }
- }
- }
- /* wait for arena procs to finish */
- for(nfinish=0; nfinish<napart; nfinish++)
- recvp(arenadonechan);
- /* tell index procs to finish */
- for(i=0; i<ix->nsects; i++)
- if(ix->sects[i]->writechan)
- send(ix->sects[i]->writechan, nil);
- /* wait for index procs to finish */
- for(i=0; i<ix->nsects; i++)
- if(ix->sects[i]->writechan)
- recvp(isectdonechan);
- if(ix->bloom && writebloom(ix->bloom) < 0)
- fprint(2, "writing bloom filter: %r\n");
- fprint(2, "%T done arenaentries=%,lld indexed=%,lld (nskip=%,lld)\n",
- arenaentries, indexentries, skipentries);
- threadexitsall(nil);
- }
- static int
- shouldprocess(ISect *is)
- {
- int i;
-
- if(nisect == 0)
- return 1;
- for(i=0; i<nisect; i++)
- if(isect[i] && strcmp(isect[i], is->name) == 0){
- isect[i] = nil;
- return 1;
- }
- return 0;
- }
- static void
- add(u64int *a, u64int n)
- {
- static Lock l;
-
- lock(&l);
- *a += n;
- unlock(&l);
- }
- /*
- * Read through an arena partition and send each of its IEntries
- * to the appropriate index section. When finished, send on
- * arenadonechan.
- */
- enum
- {
- ClumpChunks = 32*1024,
- };
- static void
- arenapartproc(void *v)
- {
- int i, j, n, nskip, x;
- u32int clump;
- u64int addr, tot;
- Arena *a;
- ClumpInfo *ci, *cis;
- IEntry ie;
- Part *p;
-
- p = v;
- threadsetname("arenaproc %s", p->name);
- nskip = 0;
- tot = 0;
- cis = MKN(ClumpInfo, ClumpChunks);
- for(i=0; i<ix->narenas; i++){
- a = ix->arenas[i];
- if(a->part != p)
- continue;
- if(a->memstats.clumps)
- fprint(2, "%T arena %s: %d entries\n",
- a->name, a->memstats.clumps);
- /*
- * Running the loop backwards accesses the
- * clump info blocks forwards, since they are
- * stored in reverse order at the end of the arena.
- * This speeds things slightly.
- */
- addr = ix->amap[i].start + a->memstats.used;
- for(clump=a->memstats.clumps; clump > 0; clump-=n){
- n = ClumpChunks;
- if(n > clump)
- n = clump;
- if(readclumpinfos(a, clump-n, cis, n) != n){
- fprint(2, "%T arena %s: directory read: %r\n", a->name);
- errors = 1;
- break;
- }
- for(j=n-1; j>=0; j--){
- ci = &cis[j];
- ie.ia.type = ci->type;
- ie.ia.size = ci->uncsize;
- addr -= ci->size + ClumpSize;
- ie.ia.addr = addr;
- ie.ia.blocks = (ci->size + ClumpSize + (1<<ABlockLog)-1) >> ABlockLog;
- scorecp(ie.score, ci->score);
- if(ci->type == VtCorruptType)
- nskip++;
- else{
- tot++;
- x = indexsect(ix, ie.score);
- assert(0 <= x && x < ix->nsects);
- if(ix->sects[x]->writechan)
- send(ix->sects[x]->writechan, &ie);
- if(ix->bloom)
- markbloomfilter(ix->bloom, ie.score);
- }
- }
- }
- if(addr != ix->amap[i].start)
- fprint(2, "%T arena %s: clump miscalculation %lld != %lld\n", a->name, addr, ix->amap[i].start);
- }
- add(&arenaentries, tot);
- add(&skipentries, nskip);
- sendp(arenadonechan, p);
- }
- /*
- * Convert score into relative bucket number in isect.
- * Can pass a packed ientry instead of score - score is first.
- */
- static u32int
- score2bucket(ISect *is, uchar *score)
- {
- u32int b;
-
- b = hashbits(score, 32)/ix->div;
- if(b < is->start || b >= is->stop){
- fprint(2, "score2bucket: score=%V div=%d b=%ud start=%ud stop=%ud\n",
- score, ix->div, b, is->start, is->stop);
- }
- assert(is->start <= b && b < is->stop);
- return b - is->start;
- }
- /*
- * Convert offset in index section to bucket number.
- */
- static u32int
- offset2bucket(ISect *is, u64int offset)
- {
- u32int b;
-
- assert(is->blockbase <= offset);
- offset -= is->blockbase;
- b = offset/is->blocksize;
- assert(b < is->stop-is->start);
- return b;
- }
- /*
- * Convert bucket number to offset.
- */
- static u64int
- bucket2offset(ISect *is, u32int b)
- {
- assert(b <= is->stop-is->start);
- return is->blockbase + (u64int)b*is->blocksize;
- }
- /*
- * IEntry buffers to hold initial round of spraying.
- */
- typedef struct Buf Buf;
- struct Buf
- {
- Part *part; /* partition being written */
- uchar *bp; /* current block */
- uchar *ep; /* end of block */
- uchar *wp; /* write position in block */
- u64int boffset; /* start offset */
- u64int woffset; /* next write offset */
- u64int eoffset; /* end offset */
- u32int nentry; /* number of entries written */
- };
- static void
- bflush(Buf *buf)
- {
- u32int bufsize;
-
- if(buf->woffset >= buf->eoffset)
- sysfatal("buf index chunk overflow - need bigger index");
- bufsize = buf->ep - buf->bp;
- if(writepart(buf->part, buf->woffset, buf->bp, bufsize) < 0){
- fprint(2, "write %s: %r\n", buf->part->name);
- errors = 1;
- }
- buf->woffset += bufsize;
- memset(buf->bp, 0, bufsize);
- buf->wp = buf->bp;
- }
- static void
- bwrite(Buf *buf, IEntry *ie)
- {
- if(buf->wp+IEntrySize > buf->ep)
- bflush(buf);
- assert(buf->bp <= buf->wp && buf->wp < buf->ep);
- packientry(ie, buf->wp);
- buf->wp += IEntrySize;
- assert(buf->bp <= buf->wp && buf->wp <= buf->ep);
- buf->nentry++;
- }
- /*
- * Minibuffer. In-memory data structure holds our place
- * in the buffer but has no block data. We are writing and
- * reading the minibuffers at the same time. (Careful!)
- */
- typedef struct Minibuf Minibuf;
- struct Minibuf
- {
- u64int boffset; /* start offset */
- u64int roffset; /* read offset */
- u64int woffset; /* write offset */
- u64int eoffset; /* end offset */
- u32int nentry; /* # entries left to read */
- u32int nwentry; /* # entries written */
- };
- /*
- * Index entry pool. Used when trying to shuffle around
- * the entries in a big buffer into the corresponding M minibuffers.
- * Sized to hold M*EntriesPerBlock entries, so that there will always
- * either be room in the pool for another block worth of entries
- * or there will be an entire block worth of sorted entries to
- * write out.
- */
- typedef struct IEntryLink IEntryLink;
- typedef struct IPool IPool;
- struct IEntryLink
- {
- uchar ie[IEntrySize]; /* raw IEntry */
- IEntryLink *next; /* next in chain */
- };
- struct IPool
- {
- ISect *isect;
- u32int buck0; /* first bucket in pool */
- u32int mbufbuckets; /* buckets per minibuf */
- IEntryLink *entry; /* all IEntryLinks */
- u32int nentry; /* # of IEntryLinks */
- IEntryLink *free; /* free list */
- u32int nfree; /* # on free list */
- Minibuf *mbuf; /* all minibufs */
- u32int nmbuf; /* # of minibufs */
- IEntryLink **mlist; /* lists for each minibuf */
- u32int *mcount; /* # on each mlist[i] */
- u32int bufsize; /* block buffer size */
- uchar *rbuf; /* read buffer */
- uchar *wbuf; /* write buffer */
- u32int epbuf; /* entries per block buffer */
- };
- /*
- static int
- countsokay(IPool *p)
- {
- int i;
- u64int n;
-
- n = 0;
- for(i=0; i<p->nmbuf; i++)
- n += p->mcount[i];
- n += p->nfree;
- if(n != p->nentry){
- print("free %ud:", p->nfree);
- for(i=0; i<p->nmbuf; i++)
- print(" %ud", p->mcount[i]);
- print(" = %lld nentry: %ud\n", n, p->nentry);
- }
- return n == p->nentry;
- }
- */
- static IPool*
- mkipool(ISect *isect, Minibuf *mbuf, u32int nmbuf,
- u32int mbufbuckets, u32int bufsize)
- {
- u32int i, nentry;
- uchar *data;
- IPool *p;
- IEntryLink *l;
-
- nentry = (nmbuf+1)*bufsize / IEntrySize;
- p = ezmalloc(sizeof(IPool)
- +nentry*sizeof(IEntry)
- +nmbuf*sizeof(IEntryLink*)
- +nmbuf*sizeof(u32int)
- +3*bufsize);
-
- p->isect = isect;
- p->mbufbuckets = mbufbuckets;
- p->bufsize = bufsize;
- p->entry = (IEntryLink*)(p+1);
- p->nentry = nentry;
- p->mlist = (IEntryLink**)(p->entry+nentry);
- p->mcount = (u32int*)(p->mlist+nmbuf);
- p->nmbuf = nmbuf;
- p->mbuf = mbuf;
- data = (uchar*)(p->mcount+nmbuf);
- data += bufsize - (uintptr)data%bufsize;
- p->rbuf = data;
- p->wbuf = data+bufsize;
- p->epbuf = bufsize/IEntrySize;
- for(i=0; i<p->nentry; i++){
- l = &p->entry[i];
- l->next = p->free;
- p->free = l;
- p->nfree++;
- }
- return p;
- }
- /*
- * Add the index entry ie to the pool p.
- * Caller must know there is room.
- */
- static void
- ipoolinsert(IPool *p, uchar *ie)
- {
- u32int buck, x;
- IEntryLink *l;
- assert(p->free != nil);
- buck = score2bucket(p->isect, ie);
- x = (buck-p->buck0) / p->mbufbuckets;
- if(x >= p->nmbuf){
- fprint(2, "buck=%ud mbufbucket=%ud x=%ud\n",
- buck, p->mbufbuckets, x);
- }
- assert(x < p->nmbuf);
- l = p->free;
- p->free = l->next;
- p->nfree--;
- memmove(l->ie, ie, IEntrySize);
- l->next = p->mlist[x];
- p->mlist[x] = l;
- p->mcount[x]++;
- }
- /*
- * Pull out a block containing as many
- * entries as possible for minibuffer x.
- */
- static u32int
- ipoolgetbuf(IPool *p, u32int x)
- {
- uchar *bp, *ep, *wp;
- IEntryLink *l;
- u32int n;
-
- bp = p->wbuf;
- ep = p->wbuf + p->bufsize;
- n = 0;
- assert(x < p->nmbuf);
- for(wp=bp; wp+IEntrySize<=ep && p->mlist[x]; wp+=IEntrySize){
- l = p->mlist[x];
- p->mlist[x] = l->next;
- p->mcount[x]--;
- memmove(wp, l->ie, IEntrySize);
- l->next = p->free;
- p->free = l;
- p->nfree++;
- n++;
- }
- memset(wp, 0, ep-wp);
- return n;
- }
- /*
- * Read a block worth of entries from the minibuf
- * into the pool. Caller must know there is room.
- */
- static void
- ipoolloadblock(IPool *p, Minibuf *mb)
- {
- u32int i, n;
-
- assert(mb->nentry > 0);
- assert(mb->roffset >= mb->woffset);
- assert(mb->roffset < mb->eoffset);
- n = p->bufsize/IEntrySize;
- if(n > mb->nentry)
- n = mb->nentry;
- if(readpart(p->isect->part, mb->roffset, p->rbuf, p->bufsize) < 0)
- fprint(2, "readpart %s: %r\n", p->isect->part->name);
- else{
- for(i=0; i<n; i++)
- ipoolinsert(p, p->rbuf+i*IEntrySize);
- }
- mb->nentry -= n;
- mb->roffset += p->bufsize;
- }
- /*
- * Write out a block worth of entries to minibuffer x.
- * If necessary, pick up the data there before overwriting it.
- */
- static void
- ipoolflush0(IPool *pool, u32int x)
- {
- u32int bufsize;
- Minibuf *mb;
-
- mb = pool->mbuf+x;
- bufsize = pool->bufsize;
- mb->nwentry += ipoolgetbuf(pool, x);
- if(mb->nentry > 0 && mb->roffset == mb->woffset){
- assert(pool->nfree >= pool->bufsize/IEntrySize);
- /*
- * There will be room in the pool -- we just
- * removed a block worth.
- */
- ipoolloadblock(pool, mb);
- }
- if(writepart(pool->isect->part, mb->woffset, pool->wbuf, bufsize) < 0)
- fprint(2, "writepart %s: %r\n", pool->isect->part->name);
- mb->woffset += bufsize;
- }
- /*
- * Write out some full block of entries.
- * (There must be one -- the pool is almost full!)
- */
- static void
- ipoolflush1(IPool *pool)
- {
- u32int i;
- assert(pool->nfree <= pool->epbuf);
- for(i=0; i<pool->nmbuf; i++){
- if(pool->mcount[i] >= pool->epbuf){
- ipoolflush0(pool, i);
- return;
- }
- }
- /* can't be reached - someone must be full */
- sysfatal("ipoolflush1");
- }
- /*
- * Flush all the entries in the pool out to disk.
- * Nothing more to read from disk.
- */
- static void
- ipoolflush(IPool *pool)
- {
- u32int i;
-
- for(i=0; i<pool->nmbuf; i++)
- while(pool->mlist[i])
- ipoolflush0(pool, i);
- assert(pool->nfree == pool->nentry);
- }
- /*
- * Third pass. Pick up each minibuffer from disk into
- * memory and then write out the buckets.
- */
- /*
- * Compare two packed index entries.
- * Usual ordering except break ties by putting higher
- * index addresses first (assumes have duplicates
- * due to corruption in the lower addresses).
- */
- static int
- ientrycmpaddr(const void *va, const void *vb)
- {
- int i;
- uchar *a, *b;
-
- a = (uchar*)va;
- b = (uchar*)vb;
- i = ientrycmp(a, b);
- if(i)
- return i;
- return -memcmp(a+IEntryAddrOff, b+IEntryAddrOff, 8);
- }
- static void
- zerorange(Part *p, u64int o, u64int e)
- {
- static uchar zero[MaxIoSize];
- u32int n;
-
- for(; o<e; o+=n){
- n = sizeof zero;
- if(o+n > e)
- n = e-o;
- if(writepart(p, o, zero, n) < 0)
- fprint(2, "writepart %s: %r\n", p->name);
- }
- }
- /*
- * Load a minibuffer into memory and write out the
- * corresponding buckets.
- */
- static void
- sortminibuffer(ISect *is, Minibuf *mb, uchar *buf, u32int nbuf, u32int bufsize)
- {
- uchar *buckdata, *p, *q, *ep;
- u32int b, lastb, memsize, n;
- u64int o;
- IBucket ib;
- Part *part;
-
- part = is->part;
- buckdata = emalloc(is->blocksize);
-
- if(mb->nwentry == 0)
- return;
- /*
- * read entire buffer.
- */
- assert(mb->nwentry*IEntrySize <= mb->woffset-mb->boffset);
- assert(mb->woffset-mb->boffset <= nbuf);
- if(readpart(part, mb->boffset, buf, mb->woffset-mb->boffset) < 0){
- fprint(2, "readpart %s: %r\n", part->name);
- errors = 1;
- return;
- }
- assert(*(uint*)buf != 0xa5a5a5a5);
-
- /*
- * remove fragmentation due to IEntrySize
- * not evenly dividing Bufsize
- */
- memsize = (bufsize/IEntrySize)*IEntrySize;
- for(o=mb->boffset, p=q=buf; o<mb->woffset; o+=bufsize){
- memmove(p, q, memsize);
- p += memsize;
- q += bufsize;
- }
- ep = buf + mb->nwentry*IEntrySize;
- assert(ep <= buf+nbuf);
- /*
- * sort entries
- */
- qsort(buf, mb->nwentry, IEntrySize, ientrycmpaddr);
- /*
- * write buckets out
- */
- n = 0;
- lastb = offset2bucket(is, mb->boffset);
- for(p=buf; p<ep; p=q){
- b = score2bucket(is, p);
- for(q=p; q<ep && score2bucket(is, q)==b; q+=IEntrySize)
- ;
- if(lastb+1 < b && zero)
- zerorange(part, bucket2offset(is, lastb+1), bucket2offset(is, b));
- if(IBucketSize+(q-p) > is->blocksize)
- sysfatal("bucket overflow - make index bigger");
- memmove(buckdata+IBucketSize, p, q-p);
- ib.n = (q-p)/IEntrySize;
- n += ib.n;
- packibucket(&ib, buckdata, is->bucketmagic);
- if(writepart(part, bucket2offset(is, b), buckdata, is->blocksize) < 0)
- fprint(2, "write %s: %r\n", part->name);
- lastb = b;
- }
- if(lastb+1 < is->stop-is->start && zero)
- zerorange(part, bucket2offset(is, lastb+1), bucket2offset(is, is->stop - is->start));
- if(n != mb->nwentry)
- fprint(2, "sortminibuffer bug: n=%ud nwentry=%ud have=%ld\n", n, mb->nwentry, (ep-buf)/IEntrySize);
- free(buckdata);
- }
- static void
- isectproc(void *v)
- {
- u32int buck, bufbuckets, bufsize, epbuf, i, j;
- u32int mbufbuckets, n, nbucket, nn, space;
- u32int nbuf, nminibuf, xminiclump, prod;
- u64int blocksize, offset, xclump;
- uchar *data, *p;
- Buf *buf;
- IEntry ie;
- IPool *ipool;
- ISect *is;
- Minibuf *mbuf, *mb;
-
- is = v;
- blocksize = is->blocksize;
- nbucket = is->stop - is->start;
- /*
- * Three passes:
- * pass 1 - write index entries from arenas into
- * large sequential sections on index disk.
- * requires nbuf * bufsize memory.
- *
- * pass 2 - split each section into minibufs.
- * requires nminibuf * bufsize memory.
- *
- * pass 3 - read each minibuf into memory and
- * write buckets out.
- * requires entries/minibuf * IEntrySize memory.
- *
- * The larger we set bufsize the less seeking hurts us.
- *
- * The fewer sections and minibufs we have, the less
- * seeking hurts us.
- *
- * The fewer sections and minibufs we have, the
- * more entries we end up with in each minibuf
- * at the end.
- *
- * Shoot for using half our memory to hold each
- * minibuf. The chance of a random distribution
- * getting off by 2x is quite low.
- *
- * Once that is decided, figure out the smallest
- * nminibuf and nsection/biggest bufsize we can use
- * and still fit in the memory constraints.
- */
-
- /* expected number of clump index entries we'll see */
- xclump = nbucket * (double)totalclumps/totalbuckets;
-
- /* number of clumps we want to see in a minibuf */
- xminiclump = isectmem/2/IEntrySize;
-
- /* total number of minibufs we need */
- prod = (xclump+xminiclump-1) / xminiclump;
-
- /* if possible, skip second pass */
- if(!dumb && prod*MinBufSize < isectmem){
- nbuf = prod;
- nminibuf = 1;
- }else{
- /* otherwise use nsection = sqrt(nmini) */
- for(nbuf=1; nbuf*nbuf<prod; nbuf++)
- ;
- if(nbuf*MinBufSize > isectmem)
- sysfatal("not enough memory");
- nminibuf = nbuf;
- }
- /* size buffer to use extra memory */
- bufsize = MinBufSize;
- while(bufsize*2*nbuf <= isectmem && bufsize < MaxBufSize)
- bufsize *= 2;
- data = emalloc(nbuf*bufsize);
- epbuf = bufsize/IEntrySize;
- fprint(2, "%T %s: %,ud buckets, %,ud groups, %,ud minigroups, %,ud buffer\n",
- is->part->name, nbucket, nbuf, nminibuf, bufsize);
- /*
- * Accept index entries from arena procs.
- */
- buf = MKNZ(Buf, nbuf);
- p = data;
- offset = is->blockbase;
- bufbuckets = (nbucket+nbuf-1)/nbuf;
- for(i=0; i<nbuf; i++){
- buf[i].part = is->part;
- buf[i].bp = p;
- buf[i].wp = p;
- p += bufsize;
- buf[i].ep = p;
- buf[i].boffset = offset;
- buf[i].woffset = offset;
- if(i < nbuf-1){
- offset += bufbuckets*blocksize;
- buf[i].eoffset = offset;
- }else{
- offset = is->blockbase + nbucket*blocksize;
- buf[i].eoffset = offset;
- }
- }
- assert(p == data+nbuf*bufsize);
- n = 0;
- while(recv(is->writechan, &ie) == 1){
- if(ie.ia.addr == 0)
- break;
- buck = score2bucket(is, ie.score);
- i = buck/bufbuckets;
- assert(i < nbuf);
- bwrite(&buf[i], &ie);
- n++;
- }
- add(&indexentries, n);
-
- nn = 0;
- for(i=0; i<nbuf; i++){
- bflush(&buf[i]);
- buf[i].bp = nil;
- buf[i].ep = nil;
- buf[i].wp = nil;
- nn += buf[i].nentry;
- }
- if(n != nn)
- fprint(2, "isectproc bug: n=%ud nn=%ud\n", n, nn);
-
- free(data);
- fprint(2, "%T %s: reordering\n", is->part->name);
-
- /*
- * Rearrange entries into minibuffers and then
- * split each minibuffer into buckets.
- * The minibuffer must be sized so that it is
- * a multiple of blocksize -- ipoolloadblock assumes
- * that each minibuf starts aligned on a blocksize
- * boundary.
- */
- mbuf = MKN(Minibuf, nminibuf);
- mbufbuckets = (bufbuckets+nminibuf-1)/nminibuf;
- while(mbufbuckets*blocksize % bufsize)
- mbufbuckets++;
- for(i=0; i<nbuf; i++){
- /*
- * Set up descriptors.
- */
- n = buf[i].nentry;
- nn = 0;
- offset = buf[i].boffset;
- memset(mbuf, 0, nminibuf*sizeof(mbuf[0]));
- for(j=0; j<nminibuf; j++){
- mb = &mbuf[j];
- mb->boffset = offset;
- offset += mbufbuckets*blocksize;
- if(offset > buf[i].eoffset)
- offset = buf[i].eoffset;
- mb->eoffset = offset;
- mb->roffset = mb->boffset;
- mb->woffset = mb->boffset;
- mb->nentry = epbuf * (mb->eoffset - mb->boffset)/bufsize;
- if(mb->nentry > buf[i].nentry)
- mb->nentry = buf[i].nentry;
- buf[i].nentry -= mb->nentry;
- nn += mb->nentry;
- }
- if(n != nn)
- fprint(2, "isectproc bug2: n=%ud nn=%ud (i=%d)\n", n, nn, i);;
- /*
- * Rearrange.
- */
- if(!dumb && nminibuf == 1){
- mbuf[0].nwentry = mbuf[0].nentry;
- mbuf[0].woffset = buf[i].woffset;
- }else{
- ipool = mkipool(is, mbuf, nminibuf, mbufbuckets, bufsize);
- ipool->buck0 = bufbuckets*i;
- for(j=0; j<nminibuf; j++){
- mb = &mbuf[j];
- while(mb->nentry > 0){
- if(ipool->nfree < epbuf){
- ipoolflush1(ipool);
- /* ipoolflush1 might change mb->nentry */
- continue;
- }
- assert(ipool->nfree >= epbuf);
- ipoolloadblock(ipool, mb);
- }
- }
- ipoolflush(ipool);
- nn = 0;
- for(j=0; j<nminibuf; j++)
- nn += mbuf[j].nwentry;
- if(n != nn)
- fprint(2, "isectproc bug3: n=%ud nn=%ud (i=%d)\n", n, nn, i);
- free(ipool);
- }
- /*
- * Make buckets.
- */
- space = 0;
- for(j=0; j<nminibuf; j++)
- if(space < mbuf[j].woffset - mbuf[j].boffset)
- space = mbuf[j].woffset - mbuf[j].boffset;
- data = emalloc(space);
- for(j=0; j<nminibuf; j++){
- mb = &mbuf[j];
- sortminibuffer(is, mb, data, space, bufsize);
- }
- free(data);
- }
-
- sendp(isectdonechan, is);
- }
|