/* channel.c - thread-library channel operations: alt, send/recv and their helpers */
  1. #include <u.h>
  2. #include <libc.h>
  3. #include <thread.h>
  4. #include "threadimpl.h"
  5. static Lock chanlock; /* central channel access lock */
  6. static void enqueue(Alt*, Channel**);
  7. static void dequeue(Alt*);
  8. static int canexec(Alt*);
  9. static int altexec(Alt*, int);
  10. static void
  11. _chanfree(Channel *c)
  12. {
  13. int i, inuse;
  14. inuse = 0;
  15. for(i = 0; i < c->nentry; i++)
  16. if(c->qentry[i])
  17. inuse = 1;
  18. if(inuse)
  19. c->freed = 1;
  20. else{
  21. if(c->qentry)
  22. free(c->qentry);
  23. free(c);
  24. }
  25. }
  26. void
  27. chanfree(Channel *c)
  28. {
  29. lock(&chanlock);
  30. _chanfree(c);
  31. unlock(&chanlock);
  32. }
  33. int
  34. chaninit(Channel *c, int elemsize, int elemcnt)
  35. {
  36. if(elemcnt < 0 || elemsize <= 0 || c == nil)
  37. return -1;
  38. c->f = 0;
  39. c->n = 0;
  40. c->freed = 0;
  41. c->e = elemsize;
  42. c->s = elemcnt;
  43. _threaddebug(DBGCHAN, "chaninit %p", c);
  44. return 1;
  45. }
  46. Channel*
  47. chancreate(int elemsize, int elemcnt)
  48. {
  49. Channel *c;
  50. if(elemcnt < 0 || elemsize <= 0)
  51. return nil;
  52. c = _threadmalloc(sizeof(Channel)+elemsize*elemcnt, 1);
  53. c->e = elemsize;
  54. c->s = elemcnt;
  55. _threaddebug(DBGCHAN, "chancreate %p", c);
  56. return c;
  57. }
  58. int
  59. alt(Alt *alts)
  60. {
  61. Alt *a, *xa;
  62. Channel volatile *c;
  63. int n, s;
  64. void* r;
  65. Thread *t;
  66. /*
  67. * The point of going splhi here is that note handlers
  68. * might reasonably want to use channel operations,
  69. * but that will hang if the note comes while we hold the
  70. * chanlock. Instead, we delay the note until we've dropped
  71. * the lock.
  72. */
  73. t = _threadgetproc()->thread;
  74. if(t->moribund || _threadexitsallstatus)
  75. yield(); /* won't return */
  76. s = _procsplhi();
  77. lock(&chanlock);
  78. t->alt = alts;
  79. t->chan = Chanalt;
  80. /* test whether any channels can proceed */
  81. n = 0;
  82. a = nil;
  83. for(xa=alts; xa->op!=CHANEND && xa->op!=CHANNOBLK; xa++){
  84. xa->entryno = -1;
  85. if(xa->op == CHANNOP)
  86. continue;
  87. c = xa->c;
  88. if(c==nil){
  89. unlock(&chanlock);
  90. _procsplx(s);
  91. t->chan = Channone;
  92. return -1;
  93. }
  94. if(canexec(xa))
  95. if(nrand(++n) == 0)
  96. a = xa;
  97. }
  98. if(a==nil){
  99. /* nothing can proceed */
  100. if(xa->op == CHANNOBLK){
  101. unlock(&chanlock);
  102. _procsplx(s);
  103. t->chan = Channone;
  104. return xa - alts;
  105. }
  106. /* enqueue on all channels. */
  107. c = nil;
  108. for(xa=alts; xa->op!=CHANEND; xa++){
  109. if(xa->op==CHANNOP)
  110. continue;
  111. enqueue(xa, &c);
  112. }
  113. /*
  114. * wait for successful rendezvous.
  115. * we can't just give up if the rendezvous
  116. * is interrupted -- someone else might come
  117. * along and try to rendezvous with us, so
  118. * we need to be here.
  119. */
  120. Again:
  121. unlock(&chanlock);
  122. _procsplx(s);
  123. r = _threadrendezvous(&c, 0);
  124. s = _procsplhi();
  125. lock(&chanlock);
  126. if(r==(void*)~0){ /* interrupted */
  127. if(c!=nil) /* someone will meet us; go back */
  128. goto Again;
  129. c = (Channel*)~0; /* so no one tries to meet us */
  130. }
  131. /* dequeue from channels, find selected one */
  132. a = nil;
  133. for(xa=alts; xa->op!=CHANEND; xa++){
  134. if(xa->op==CHANNOP)
  135. continue;
  136. if(xa->c == c)
  137. a = xa;
  138. dequeue(xa);
  139. }
  140. unlock(&chanlock);
  141. _procsplx(s);
  142. if(a == nil){ /* we were interrupted */
  143. assert(c==(Channel*)~0);
  144. return -1;
  145. }
  146. }else{
  147. altexec(a, s); /* unlocks chanlock, does splx */
  148. }
  149. _sched();
  150. t->chan = Channone;
  151. return a - alts;
  152. }
  153. static int
  154. runop(int op, Channel *c, void *v, int nb)
  155. {
  156. int r;
  157. Alt a[2];
  158. /*
  159. * we could do this without calling alt,
  160. * but the only reason would be performance,
  161. * and i'm not convinced it matters.
  162. */
  163. a[0].op = op;
  164. a[0].c = c;
  165. a[0].v = v;
  166. a[1].op = CHANEND;
  167. if(nb)
  168. a[1].op = CHANNOBLK;
  169. switch(r=alt(a)){
  170. case -1: /* interrupted */
  171. return -1;
  172. case 1: /* nonblocking, didn't accomplish anything */
  173. assert(nb);
  174. return 0;
  175. case 0:
  176. return 1;
  177. default:
  178. fprint(2, "ERROR: channel alt returned %d\n", r);
  179. abort();
  180. return -1;
  181. }
  182. }
  183. int
  184. recv(Channel *c, void *v)
  185. {
  186. return runop(CHANRCV, c, v, 0);
  187. }
  188. int
  189. nbrecv(Channel *c, void *v)
  190. {
  191. return runop(CHANRCV, c, v, 1);
  192. }
  193. int
  194. send(Channel *c, void *v)
  195. {
  196. return runop(CHANSND, c, v, 0);
  197. }
  198. int
  199. nbsend(Channel *c, void *v)
  200. {
  201. return runop(CHANSND, c, v, 1);
  202. }
  203. static void
  204. channelsize(Channel *c, int sz)
  205. {
  206. if(c->e != sz){
  207. fprint(2, "expected channel with elements of size %d, got size %d\n",
  208. sz, c->e);
  209. abort();
  210. }
  211. }
  212. int
  213. sendul(Channel *c, ulong v)
  214. {
  215. channelsize(c, sizeof(ulong));
  216. return send(c, &v);
  217. }
  218. ulong
  219. recvul(Channel *c)
  220. {
  221. ulong v;
  222. channelsize(c, sizeof(ulong));
  223. if(recv(c, &v) < 0)
  224. return ~0;
  225. return v;
  226. }
  227. int
  228. sendp(Channel *c, void *v)
  229. {
  230. channelsize(c, sizeof(void*));
  231. return send(c, &v);
  232. }
  233. void*
  234. recvp(Channel *c)
  235. {
  236. void *v;
  237. channelsize(c, sizeof(void*));
  238. if(recv(c, &v) < 0)
  239. return nil;
  240. return v;
  241. }
  242. int
  243. nbsendul(Channel *c, ulong v)
  244. {
  245. channelsize(c, sizeof(ulong));
  246. return nbsend(c, &v);
  247. }
  248. ulong
  249. nbrecvul(Channel *c)
  250. {
  251. ulong v;
  252. channelsize(c, sizeof(ulong));
  253. if(nbrecv(c, &v) == 0)
  254. return 0;
  255. return v;
  256. }
  257. int
  258. nbsendp(Channel *c, void *v)
  259. {
  260. channelsize(c, sizeof(void*));
  261. return nbsend(c, &v);
  262. }
  263. void*
  264. nbrecvp(Channel *c)
  265. {
  266. void *v;
  267. channelsize(c, sizeof(void*));
  268. if(nbrecv(c, &v) == 0)
  269. return nil;
  270. return v;
  271. }
  272. static int
  273. emptyentry(Channel *c)
  274. {
  275. int i, extra;
  276. assert((c->nentry==0 && c->qentry==nil) || (c->nentry && c->qentry));
  277. for(i=0; i<c->nentry; i++)
  278. if(c->qentry[i]==nil)
  279. return i;
  280. extra = 16;
  281. c->nentry += extra;
  282. c->qentry = realloc((void*)c->qentry, c->nentry*sizeof(c->qentry[0]));
  283. if(c->qentry == nil)
  284. sysfatal("realloc channel entries: %r");
  285. memset(&c->qentry[i], 0, extra*sizeof(c->qentry[0]));
  286. return i;
  287. }
  288. static void
  289. enqueue(Alt *a, Channel **c)
  290. {
  291. int i;
  292. _threaddebug(DBGCHAN, "Queuing alt %p on channel %p", a, a->c);
  293. a->tag = c;
  294. i = emptyentry(a->c);
  295. a->c->qentry[i] = a;
  296. }
  297. static void
  298. dequeue(Alt *a)
  299. {
  300. int i;
  301. Channel *c;
  302. c = a->c;
  303. for(i=0; i<c->nentry; i++)
  304. if(c->qentry[i]==a){
  305. _threaddebug(DBGCHAN, "Dequeuing alt %p from channel %p", a, a->c);
  306. c->qentry[i] = nil;
  307. if(c->freed)
  308. _chanfree(c);
  309. return;
  310. }
  311. }
  312. static int
  313. canexec(Alt *a)
  314. {
  315. int i, otherop;
  316. Channel *c;
  317. c = a->c;
  318. /* are there senders or receivers blocked? */
  319. otherop = (CHANSND+CHANRCV) - a->op;
  320. for(i=0; i<c->nentry; i++)
  321. if(c->qentry[i] && c->qentry[i]->op==otherop && *c->qentry[i]->tag==nil){
  322. _threaddebug(DBGCHAN, "can rendez alt %p chan %p", a, c);
  323. return 1;
  324. }
  325. /* is there room in the channel? */
  326. if((a->op==CHANSND && c->n < c->s)
  327. || (a->op==CHANRCV && c->n > 0)){
  328. _threaddebug(DBGCHAN, "can buffer alt %p chan %p", a, c);
  329. return 1;
  330. }
  331. return 0;
  332. }
  333. static void*
  334. altexecbuffered(Alt *a, int willreplace)
  335. {
  336. uchar *v;
  337. Channel *c;
  338. c = a->c;
  339. /* use buffered channel queue */
  340. if(a->op==CHANRCV && c->n > 0){
  341. _threaddebug(DBGCHAN, "buffer recv alt %p chan %p", a, c);
  342. v = c->v + c->e*(c->f%c->s);
  343. if(!willreplace)
  344. c->n--;
  345. c->f++;
  346. return v;
  347. }
  348. if(a->op==CHANSND && c->n < c->s){
  349. _threaddebug(DBGCHAN, "buffer send alt %p chan %p", a, c);
  350. v = c->v + c->e*((c->f+c->n)%c->s);
  351. if(!willreplace)
  352. c->n++;
  353. return v;
  354. }
  355. abort();
  356. return nil;
  357. }
  358. static void
  359. altcopy(void *dst, void *src, int sz)
  360. {
  361. if(dst){
  362. if(src)
  363. memmove(dst, src, sz);
  364. else
  365. memset(dst, 0, sz);
  366. }
  367. }
  368. static int
  369. altexec(Alt *a, int spl)
  370. {
  371. volatile Alt *b;
  372. int i, n, otherop;
  373. Channel *c;
  374. void *me, *waiter, *buf;
  375. c = a->c;
  376. /* rendezvous with others */
  377. otherop = (CHANSND+CHANRCV) - a->op;
  378. n = 0;
  379. b = nil;
  380. me = a->v;
  381. for(i=0; i<c->nentry; i++)
  382. if(c->qentry[i] && c->qentry[i]->op==otherop && *c->qentry[i]->tag==nil)
  383. if(nrand(++n) == 0)
  384. b = c->qentry[i];
  385. if(b != nil){
  386. _threaddebug(DBGCHAN, "rendez %s alt %p chan %p alt %p", a->op==CHANRCV?"recv":"send", a, c, b);
  387. waiter = b->v;
  388. if(c->s && c->n){
  389. /*
  390. * if buffer is full and there are waiters
  391. * and we're meeting a waiter,
  392. * we must be receiving.
  393. *
  394. * we use the value in the channel buffer,
  395. * copy the waiter's value into the channel buffer
  396. * on behalf of the waiter, and then wake the waiter.
  397. */
  398. if(a->op!=CHANRCV)
  399. abort();
  400. buf = altexecbuffered(a, 1);
  401. altcopy(me, buf, c->e);
  402. altcopy(buf, waiter, c->e);
  403. }else{
  404. if(a->op==CHANRCV)
  405. altcopy(me, waiter, c->e);
  406. else
  407. altcopy(waiter, me, c->e);
  408. }
  409. *b->tag = c; /* commits us to rendezvous */
  410. _threaddebug(DBGCHAN, "unlocking the chanlock");
  411. unlock(&chanlock);
  412. _procsplx(spl);
  413. _threaddebug(DBGCHAN, "chanlock is %lud", *(ulong*)&chanlock);
  414. while(_threadrendezvous(b->tag, 0) == (void*)~0)
  415. ;
  416. return 1;
  417. }
  418. buf = altexecbuffered(a, 0);
  419. if(a->op==CHANRCV)
  420. altcopy(me, buf, c->e);
  421. else
  422. altcopy(buf, me, c->e);
  423. unlock(&chanlock);
  424. _procsplx(spl);
  425. return 1;
  426. }