123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484 |
- #include <u.h>
- #include <libc.h>
- #include <thread.h>
- #include "threadimpl.h"
- static Lock chanlock; /* central channel access lock */
- static void enqueue(Alt*, Channel**);
- static void dequeue(Alt*);
- static int canexec(Alt*);
- static int altexec(Alt*, int);
- static void
- _chanfree(Channel *c)
- {
- int i, inuse;
- inuse = 0;
- for(i = 0; i < c->nentry; i++)
- if(c->qentry[i])
- inuse = 1;
- if(inuse)
- c->freed = 1;
- else{
- if(c->qentry)
- free(c->qentry);
- free(c);
- }
- }
- void
- chanfree(Channel *c)
- {
- lock(&chanlock);
- _chanfree(c);
- unlock(&chanlock);
- }
- int
- chaninit(Channel *c, int elemsize, int elemcnt)
- {
- if(elemcnt < 0 || elemsize <= 0 || c == nil)
- return -1;
- c->f = 0;
- c->n = 0;
- c->freed = 0;
- c->e = elemsize;
- c->s = elemcnt;
- _threaddebug(DBGCHAN, "chaninit %p", c);
- return 1;
- }
- Channel*
- chancreate(int elemsize, int elemcnt)
- {
- Channel *c;
- if(elemcnt < 0 || elemsize <= 0)
- return nil;
- c = _threadmalloc(sizeof(Channel)+elemsize*elemcnt, 1);
- c->e = elemsize;
- c->s = elemcnt;
- _threaddebug(DBGCHAN, "chancreate %p", c);
- return c;
- }
- int
- alt(Alt *alts)
- {
- Alt *a, *xa;
- Channel volatile *c;
- int n, s;
- void* r;
- Thread *t;
- /*
- * The point of going splhi here is that note handlers
- * might reasonably want to use channel operations,
- * but that will hang if the note comes while we hold the
- * chanlock. Instead, we delay the note until we've dropped
- * the lock.
- */
- t = _threadgetproc()->thread;
- if(t->moribund || _threadexitsallstatus)
- yield(); /* won't return */
- s = _procsplhi();
- lock(&chanlock);
- t->alt = alts;
- t->chan = Chanalt;
- /* test whether any channels can proceed */
- n = 0;
- a = nil;
- for(xa=alts; xa->op!=CHANEND && xa->op!=CHANNOBLK; xa++){
- xa->entryno = -1;
- if(xa->op == CHANNOP)
- continue;
-
- c = xa->c;
- if(c==nil){
- unlock(&chanlock);
- _procsplx(s);
- t->chan = Channone;
- return -1;
- }
- if(canexec(xa))
- if(nrand(++n) == 0)
- a = xa;
- }
- if(a==nil){
- /* nothing can proceed */
- if(xa->op == CHANNOBLK){
- unlock(&chanlock);
- _procsplx(s);
- t->chan = Channone;
- return xa - alts;
- }
- /* enqueue on all channels. */
- c = nil;
- for(xa=alts; xa->op!=CHANEND; xa++){
- if(xa->op==CHANNOP)
- continue;
- enqueue(xa, &c);
- }
- /*
- * wait for successful rendezvous.
- * we can't just give up if the rendezvous
- * is interrupted -- someone else might come
- * along and try to rendezvous with us, so
- * we need to be here.
- */
- Again:
- unlock(&chanlock);
- _procsplx(s);
- r = _threadrendezvous(&c, 0);
- s = _procsplhi();
- lock(&chanlock);
- if(r==(void*)~0){ /* interrupted */
- if(c!=nil) /* someone will meet us; go back */
- goto Again;
- c = (Channel*)~0; /* so no one tries to meet us */
- }
- /* dequeue from channels, find selected one */
- a = nil;
- for(xa=alts; xa->op!=CHANEND; xa++){
- if(xa->op==CHANNOP)
- continue;
- if(xa->c == c)
- a = xa;
- dequeue(xa);
- }
- unlock(&chanlock);
- _procsplx(s);
- if(a == nil){ /* we were interrupted */
- assert(c==(Channel*)~0);
- return -1;
- }
- }else{
- altexec(a, s); /* unlocks chanlock, does splx */
- }
- _sched();
- t->chan = Channone;
- return a - alts;
- }
- static int
- runop(int op, Channel *c, void *v, int nb)
- {
- int r;
- Alt a[2];
- /*
- * we could do this without calling alt,
- * but the only reason would be performance,
- * and i'm not convinced it matters.
- */
- a[0].op = op;
- a[0].c = c;
- a[0].v = v;
- a[1].op = CHANEND;
- if(nb)
- a[1].op = CHANNOBLK;
- switch(r=alt(a)){
- case -1: /* interrupted */
- return -1;
- case 1: /* nonblocking, didn't accomplish anything */
- assert(nb);
- return 0;
- case 0:
- return 1;
- default:
- fprint(2, "ERROR: channel alt returned %d\n", r);
- abort();
- return -1;
- }
- }
- int
- recv(Channel *c, void *v)
- {
- return runop(CHANRCV, c, v, 0);
- }
- int
- nbrecv(Channel *c, void *v)
- {
- return runop(CHANRCV, c, v, 1);
- }
- int
- send(Channel *c, void *v)
- {
- return runop(CHANSND, c, v, 0);
- }
- int
- nbsend(Channel *c, void *v)
- {
- return runop(CHANSND, c, v, 1);
- }
- static void
- channelsize(Channel *c, int sz)
- {
- if(c->e != sz){
- fprint(2, "expected channel with elements of size %d, got size %d\n",
- sz, c->e);
- abort();
- }
- }
- int
- sendul(Channel *c, ulong v)
- {
- channelsize(c, sizeof(ulong));
- return send(c, &v);
- }
- ulong
- recvul(Channel *c)
- {
- ulong v;
- channelsize(c, sizeof(ulong));
- if(recv(c, &v) < 0)
- return ~0;
- return v;
- }
- int
- sendp(Channel *c, void *v)
- {
- channelsize(c, sizeof(void*));
- return send(c, &v);
- }
- void*
- recvp(Channel *c)
- {
- void *v;
- channelsize(c, sizeof(void*));
- if(recv(c, &v) < 0)
- return nil;
- return v;
- }
- int
- nbsendul(Channel *c, ulong v)
- {
- channelsize(c, sizeof(ulong));
- return nbsend(c, &v);
- }
- ulong
- nbrecvul(Channel *c)
- {
- ulong v;
- channelsize(c, sizeof(ulong));
- if(nbrecv(c, &v) == 0)
- return 0;
- return v;
- }
- int
- nbsendp(Channel *c, void *v)
- {
- channelsize(c, sizeof(void*));
- return nbsend(c, &v);
- }
- void*
- nbrecvp(Channel *c)
- {
- void *v;
- channelsize(c, sizeof(void*));
- if(nbrecv(c, &v) == 0)
- return nil;
- return v;
- }
- static int
- emptyentry(Channel *c)
- {
- int i, extra;
- assert((c->nentry==0 && c->qentry==nil) || (c->nentry && c->qentry));
- for(i=0; i<c->nentry; i++)
- if(c->qentry[i]==nil)
- return i;
- extra = 16;
- c->nentry += extra;
- c->qentry = realloc((void*)c->qentry, c->nentry*sizeof(c->qentry[0]));
- if(c->qentry == nil)
- sysfatal("realloc channel entries: %r");
- memset(&c->qentry[i], 0, extra*sizeof(c->qentry[0]));
- return i;
- }
- static void
- enqueue(Alt *a, Channel **c)
- {
- int i;
- _threaddebug(DBGCHAN, "Queuing alt %p on channel %p", a, a->c);
- a->tag = c;
- i = emptyentry(a->c);
- a->c->qentry[i] = a;
- }
- static void
- dequeue(Alt *a)
- {
- int i;
- Channel *c;
- c = a->c;
- for(i=0; i<c->nentry; i++)
- if(c->qentry[i]==a){
- _threaddebug(DBGCHAN, "Dequeuing alt %p from channel %p", a, a->c);
- c->qentry[i] = nil;
- if(c->freed)
- _chanfree(c);
- return;
- }
- }
- static int
- canexec(Alt *a)
- {
- int i, otherop;
- Channel *c;
- c = a->c;
- /* are there senders or receivers blocked? */
- otherop = (CHANSND+CHANRCV) - a->op;
- for(i=0; i<c->nentry; i++)
- if(c->qentry[i] && c->qentry[i]->op==otherop && *c->qentry[i]->tag==nil){
- _threaddebug(DBGCHAN, "can rendez alt %p chan %p", a, c);
- return 1;
- }
- /* is there room in the channel? */
- if((a->op==CHANSND && c->n < c->s)
- || (a->op==CHANRCV && c->n > 0)){
- _threaddebug(DBGCHAN, "can buffer alt %p chan %p", a, c);
- return 1;
- }
- return 0;
- }
- static void*
- altexecbuffered(Alt *a, int willreplace)
- {
- uchar *v;
- Channel *c;
- c = a->c;
- /* use buffered channel queue */
- if(a->op==CHANRCV && c->n > 0){
- _threaddebug(DBGCHAN, "buffer recv alt %p chan %p", a, c);
- v = c->v + c->e*(c->f%c->s);
- if(!willreplace)
- c->n--;
- c->f++;
- return v;
- }
- if(a->op==CHANSND && c->n < c->s){
- _threaddebug(DBGCHAN, "buffer send alt %p chan %p", a, c);
- v = c->v + c->e*((c->f+c->n)%c->s);
- if(!willreplace)
- c->n++;
- return v;
- }
- abort();
- return nil;
- }
- static void
- altcopy(void *dst, void *src, int sz)
- {
- if(dst){
- if(src)
- memmove(dst, src, sz);
- else
- memset(dst, 0, sz);
- }
- }
- static int
- altexec(Alt *a, int spl)
- {
- volatile Alt *b;
- int i, n, otherop;
- Channel *c;
- void *me, *waiter, *buf;
- c = a->c;
- /* rendezvous with others */
- otherop = (CHANSND+CHANRCV) - a->op;
- n = 0;
- b = nil;
- me = a->v;
- for(i=0; i<c->nentry; i++)
- if(c->qentry[i] && c->qentry[i]->op==otherop && *c->qentry[i]->tag==nil)
- if(nrand(++n) == 0)
- b = c->qentry[i];
- if(b != nil){
- _threaddebug(DBGCHAN, "rendez %s alt %p chan %p alt %p", a->op==CHANRCV?"recv":"send", a, c, b);
- waiter = b->v;
- if(c->s && c->n){
- /*
- * if buffer is full and there are waiters
- * and we're meeting a waiter,
- * we must be receiving.
- *
- * we use the value in the channel buffer,
- * copy the waiter's value into the channel buffer
- * on behalf of the waiter, and then wake the waiter.
- */
- if(a->op!=CHANRCV)
- abort();
- buf = altexecbuffered(a, 1);
- altcopy(me, buf, c->e);
- altcopy(buf, waiter, c->e);
- }else{
- if(a->op==CHANRCV)
- altcopy(me, waiter, c->e);
- else
- altcopy(waiter, me, c->e);
- }
- *b->tag = c; /* commits us to rendezvous */
- _threaddebug(DBGCHAN, "unlocking the chanlock");
- unlock(&chanlock);
- _procsplx(spl);
- _threaddebug(DBGCHAN, "chanlock is %lud", *(ulong*)&chanlock);
- while(_threadrendezvous(b->tag, 0) == (void*)~0)
- ;
- return 1;
- }
- buf = altexecbuffered(a, 0);
- if(a->op==CHANRCV)
- altcopy(me, buf, c->e);
- else
- altcopy(buf, me, c->e);
- unlock(&chanlock);
- _procsplx(spl);
- return 1;
- }
|