channel.c 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626
  1. /*
  2. * This file is part of the UCB release of Plan 9. It is subject to the license
  3. * terms in the LICENSE file found in the top-level directory of this
  4. * distribution and at http://akaros.cs.berkeley.edu/files/Plan9License. No
  5. * part of the UCB release of Plan 9, including this file, may be copied,
  6. * modified, propagated, or distributed except according to the terms contained
  7. * in the LICENSE file.
  8. */
  9. #include <u.h>
  10. #include <libc.h>
  11. #include <thread.h>
  12. #include "threadimpl.h"
  13. /* Value to indicate the channel is closed */
  14. enum {
  15. CHANCLOSD = 0xc105ed,
  16. };
  17. static char errcl[] = "channel was closed";
  18. static Lock chanlock; /* central channel access lock */
  19. static void enqueue(Alt*, Channel**);
  20. static void dequeue(Alt*);
  21. static int canexec(Alt*);
  22. static int altexec(Alt*, int);
  23. #define Closed ((void*)CHANCLOSD)
  24. #define Intred ((void*)~0) /* interrupted */
  25. static void
  26. _chanfree(Channel *c)
  27. {
  28. int i, inuse;
  29. if(c->closed == 1) /* chanclose is ongoing */
  30. inuse = 1;
  31. else{
  32. inuse = 0;
  33. for(i = 0; i < c->nentry; i++) /* alt ongoing */
  34. if(c->qentry[i])
  35. inuse = 1;
  36. }
  37. if(inuse)
  38. c->freed = 1;
  39. else{
  40. if(c->qentry)
  41. free(c->qentry);
  42. free(c);
  43. }
  44. }
  45. void
  46. chanfree(Channel *c)
  47. {
  48. lock(&chanlock);
  49. _chanfree(c);
  50. unlock(&chanlock);
  51. }
  52. int
  53. chaninit(Channel *c, int elemsize, int elemcnt)
  54. {
  55. if(elemcnt < 0 || elemsize <= 0 || c == nil)
  56. return -1;
  57. c->f = 0;
  58. c->n = 0;
  59. c->closed = 0;
  60. c->freed = 0;
  61. c->e = elemsize;
  62. c->s = elemcnt;
  63. _threaddebug(DBGCHAN, "chaninit %p", c);
  64. return 1;
  65. }
  66. Channel*
  67. chancreate(int elemsize, int elemcnt)
  68. {
  69. Channel *c;
  70. if(elemcnt < 0 || elemsize <= 0)
  71. return nil;
  72. c = _threadmalloc(sizeof(Channel)+elemsize*elemcnt, 1);
  73. c->e = elemsize;
  74. c->s = elemcnt;
  75. _threaddebug(DBGCHAN, "chancreate %p", c);
  76. return c;
  77. }
  78. /* isopenfor is meant to be called under the chanlock. */
  79. static int
  80. isopenfor(Channel *c, int op)
  81. {
  82. return c->closed == 0 || (op == CHANRCV && c->n > 0);
  83. }
  84. int
  85. alt(Alt *alts)
  86. {
  87. Alt *a, *xa, *ca;
  88. Channel volatile *c;
  89. int n, s, waiting, allreadycl;
  90. void* r;
  91. Thread *t;
  92. /*
  93. * The point of going splhi here is that note handlers
  94. * might reasonably want to use channel operations,
  95. * but that will hang if the note comes while we hold the
  96. * chanlock. Instead, we delay the note until we've dropped
  97. * the lock.
  98. */
  99. t = _threadgetproc()->thread;
  100. if(t->moribund || _threadexitsallstatus)
  101. yield(); /* won't return */
  102. s = _procsplhi();
  103. lock(&chanlock);
  104. t->alt = alts;
  105. t->chan = Chanalt;
  106. /* test whether any channels can proceed */
  107. n = 0;
  108. a = nil;
  109. for(xa=alts; xa->op!=CHANEND && xa->op!=CHANNOBLK; xa++){
  110. xa->entryno = -1;
  111. if(xa->op == CHANNOP)
  112. continue;
  113. c = xa->c;
  114. if(c==nil){
  115. unlock(&chanlock);
  116. _procsplx(s);
  117. t->chan = Channone;
  118. return -1;
  119. }
  120. if(isopenfor((Channel*)c, xa->op) && canexec(xa))
  121. if(nrand(++n) == 0)
  122. a = xa;
  123. }
  124. if(a==nil){
  125. /* nothing can proceed */
  126. if(xa->op == CHANNOBLK){
  127. unlock(&chanlock);
  128. _procsplx(s);
  129. t->chan = Channone;
  130. if(xa->op == CHANNOBLK)
  131. return xa - alts;
  132. }
  133. /* enqueue on all channels open for us. */
  134. c = nil;
  135. ca = nil;
  136. waiting = 0;
  137. allreadycl = 0;
  138. for(xa=alts; xa->op!=CHANEND; xa++)
  139. if(xa->op==CHANNOP)
  140. continue;
  141. else if(isopenfor(xa->c, xa->op)){
  142. waiting = 1;
  143. enqueue(xa, (Channel**)&c);
  144. } else if(xa->err != errcl)
  145. ca = xa;
  146. else
  147. allreadycl = 1;
  148. if(waiting == 0) {
  149. if(ca != nil){
  150. /* everything was closed, select last channel */
  151. ca->err = errcl;
  152. unlock(&chanlock);
  153. _procsplx(s);
  154. t->chan = Channone;
  155. return ca - alts;
  156. } else if(allreadycl){
  157. /* everything was already closed */
  158. unlock(&chanlock);
  159. _procsplx(s);
  160. t->chan = Channone;
  161. return -1;
  162. }
  163. }
  164. /*
  165. * wait for successful rendezvous.
  166. * we can't just give up if the rendezvous
  167. * is interrupted -- someone else might come
  168. * along and try to rendezvous with us, so
  169. * we need to be here.
  170. * if the channel was closed, the op is done
  171. * and we flag an error for the entry.
  172. */
  173. Again:
  174. unlock(&chanlock);
  175. _procsplx(s);
  176. r = _threadrendezvous(&c, 0);
  177. s = _procsplhi();
  178. lock(&chanlock);
  179. if(r==Intred){ /* interrupted */
  180. if(c!=nil) /* someone will meet us; go back */
  181. goto Again;
  182. c = (Channel*)~0; /* so no one tries to meet us */
  183. }
  184. /* dequeue from channels, find selected one */
  185. a = nil;
  186. for(xa=alts; xa->op!=CHANEND; xa++){
  187. if(xa->op==CHANNOP)
  188. continue;
  189. if(xa->c == c){
  190. a = xa;
  191. a->err = nil;
  192. if(r == Closed)
  193. a->err = errcl;
  194. }
  195. dequeue(xa);
  196. }
  197. unlock(&chanlock);
  198. _procsplx(s);
  199. if(a == nil){ /* we were interrupted */
  200. assert(c==(Channel*)~0);
  201. return -1;
  202. }
  203. }else
  204. altexec(a, s); /* unlocks chanlock, does splx */
  205. _sched();
  206. t->chan = Channone;
  207. return a - alts;
  208. }
  209. int
  210. chanclose(Channel *c)
  211. {
  212. volatile Alt *a;
  213. int i, s;
  214. s = _procsplhi(); /* note handlers; see :/^alt */
  215. lock(&chanlock);
  216. if(c->closed){
  217. /* Already close; we fail but it's ok. don't print */
  218. unlock(&chanlock);
  219. _procsplx(s);
  220. return -1;
  221. }
  222. c->closed = 1; /* Being closed */
  223. /*
  224. * Locate entries that will fail due to close
  225. * (send, and receive if nothing buffered) and wake them up.
  226. * the situation cannot change because all queries
  227. * should be committed by now and new ones will find the channel
  228. * closed. We still need to take the lock during the iteration
  229. * because we can wake threads on qentrys we have not seen yet
  230. * as in alt and there would be a race in the access to *a.
  231. */
  232. for(i = 0; i < c->nentry; i++){
  233. if((a = c->qentry[i]) == nil || *a->tag != nil)
  234. continue;
  235. if(a->op != CHANSND && (a->op != CHANRCV || c->n != 0))
  236. continue;
  237. *a->tag = c;
  238. unlock(&chanlock);
  239. _procsplx(s);
  240. while(_threadrendezvous(a->tag, Closed) == Intred)
  241. ;
  242. s = _procsplhi();
  243. lock(&chanlock);
  244. }
  245. c->closed = 2; /* Fully closed */
  246. if(c->freed)
  247. _chanfree(c);
  248. unlock(&chanlock);
  249. _procsplx(s);
  250. return 0;
  251. }
  252. int
  253. chanclosing(Channel *c)
  254. {
  255. int n, s;
  256. s = _procsplhi(); /* note handlers; see :/^alt */
  257. lock(&chanlock);
  258. if(c->closed == 0)
  259. n = -1;
  260. else
  261. n = c->n;
  262. unlock(&chanlock);
  263. _procsplx(s);
  264. return n;
  265. }
  266. /*
  267. * superseded by chanclosing
  268. int
  269. chanisclosed(Channel *c)
  270. {
  271. return chanisclosing(c) >= 0;
  272. }
  273. */
  274. static int
  275. runop(int op, Channel *c, void *v, int nb)
  276. {
  277. int r;
  278. Alt a[2];
  279. /*
  280. * we could do this without calling alt,
  281. * but the only reason would be performance,
  282. * and i'm not convinced it matters.
  283. */
  284. a[0].op = op;
  285. a[0].c = c;
  286. a[0].v = v;
  287. a[0].err = nil;
  288. a[1].op = CHANEND;
  289. if(nb)
  290. a[1].op = CHANNOBLK;
  291. switch(r=alt(a)){
  292. case -1: /* interrupted */
  293. return -1;
  294. case 1: /* nonblocking, didn't accomplish anything */
  295. assert(nb);
  296. return 0;
  297. case 0:
  298. /*
  299. * Okay, but return -1 if the op is done because of a close.
  300. */
  301. if(a[0].err != nil)
  302. return -1;
  303. return 1;
  304. default:
  305. fprint(2, "ERROR: channel alt returned %d\n", r);
  306. abort();
  307. return -1;
  308. }
  309. }
  310. int
  311. recv(Channel *c, void *v)
  312. {
  313. return runop(CHANRCV, c, v, 0);
  314. }
  315. int
  316. nbrecv(Channel *c, void *v)
  317. {
  318. return runop(CHANRCV, c, v, 1);
  319. }
  320. int
  321. send(Channel *c, void *v)
  322. {
  323. return runop(CHANSND, c, v, 0);
  324. }
  325. int
  326. nbsend(Channel *c, void *v)
  327. {
  328. return runop(CHANSND, c, v, 1);
  329. }
  330. static void
  331. channelsize(Channel *c, int sz)
  332. {
  333. if(c->e != sz){
  334. fprint(2, "expected channel with elements of size %d, got size %d\n",
  335. sz, c->e);
  336. abort();
  337. }
  338. }
  339. int
  340. sendul(Channel *c, uint32_t v)
  341. {
  342. channelsize(c, sizeof(uint32_t));
  343. return send(c, &v);
  344. }
  345. uint32_t
  346. recvul(Channel *c)
  347. {
  348. uint32_t v;
  349. channelsize(c, sizeof(uint32_t));
  350. if(recv(c, &v) < 0)
  351. return ~0;
  352. return v;
  353. }
  354. int
  355. sendp(Channel *c, void *v)
  356. {
  357. channelsize(c, sizeof(void*));
  358. return send(c, &v);
  359. }
  360. void*
  361. recvp(Channel *c)
  362. {
  363. void *v;
  364. channelsize(c, sizeof(void*));
  365. if(recv(c, &v) < 0)
  366. return nil;
  367. return v;
  368. }
  369. int
  370. nbsendul(Channel *c, uint32_t v)
  371. {
  372. channelsize(c, sizeof(uint32_t));
  373. return nbsend(c, &v);
  374. }
  375. uint32_t
  376. nbrecvul(Channel *c)
  377. {
  378. uint32_t v;
  379. channelsize(c, sizeof(uint32_t));
  380. if(nbrecv(c, &v) == 0)
  381. return 0;
  382. return v;
  383. }
  384. int
  385. nbsendp(Channel *c, void *v)
  386. {
  387. channelsize(c, sizeof(void*));
  388. return nbsend(c, &v);
  389. }
  390. void*
  391. nbrecvp(Channel *c)
  392. {
  393. void *v;
  394. channelsize(c, sizeof(void*));
  395. if(nbrecv(c, &v) == 0)
  396. return nil;
  397. return v;
  398. }
  399. static int
  400. emptyentry(Channel *c)
  401. {
  402. int i, extra;
  403. assert((c->nentry==0 && c->qentry==nil) || (c->nentry && c->qentry));
  404. for(i=0; i<c->nentry; i++)
  405. if(c->qentry[i]==nil)
  406. return i;
  407. extra = 16;
  408. c->nentry += extra;
  409. c->qentry = realloc((void*)c->qentry, c->nentry*sizeof(c->qentry[0]));
  410. if(c->qentry == nil)
  411. sysfatal("realloc channel entries: %r");
  412. memset(&c->qentry[i], 0, extra*sizeof(c->qentry[0]));
  413. return i;
  414. }
  415. /* enqueue is called under the chanlock, the Channel can be treated as non-volatile. */
  416. static void
  417. enqueue(Alt *a, Channel **c)
  418. {
  419. int i;
  420. _threaddebug(DBGCHAN, "Queuing alt %p on channel %p", a, a->c);
  421. a->tag = c;
  422. i = emptyentry(a->c);
  423. a->c->qentry[i] = a;
  424. }
  425. static void
  426. dequeue(Alt *a)
  427. {
  428. int i;
  429. Channel *c;
  430. c = a->c;
  431. for(i=0; i<c->nentry; i++)
  432. if(c->qentry[i]==a){
  433. _threaddebug(DBGCHAN, "Dequeuing alt %p from channel %p", a, a->c);
  434. c->qentry[i] = nil;
  435. /* release if freed and not closing */
  436. if(c->freed && c->closed != 1)
  437. _chanfree(c);
  438. return;
  439. }
  440. }
  441. static int
  442. canexec(Alt *a)
  443. {
  444. int i, otherop;
  445. Channel *c;
  446. c = a->c;
  447. /* are there senders or receivers blocked? */
  448. otherop = (CHANSND+CHANRCV) - a->op;
  449. for(i=0; i<c->nentry; i++)
  450. if(c->qentry[i] && c->qentry[i]->op==otherop && *c->qentry[i]->tag==nil){
  451. _threaddebug(DBGCHAN, "can rendez alt %p chan %p", a, c);
  452. return 1;
  453. }
  454. /* is there room in the channel? */
  455. if((a->op==CHANSND && c->n < c->s)
  456. || (a->op==CHANRCV && c->n > 0)){
  457. _threaddebug(DBGCHAN, "can buffer alt %p chan %p", a, c);
  458. return 1;
  459. }
  460. return 0;
  461. }
  462. static void*
  463. altexecbuffered(Alt *a, int willreplace)
  464. {
  465. uint8_t *v;
  466. Channel *c;
  467. c = a->c;
  468. /* use buffered channel queue */
  469. if(a->op==CHANRCV && c->n > 0){
  470. _threaddebug(DBGCHAN, "buffer recv alt %p chan %p", a, c);
  471. v = c->v + c->e*(c->f%c->s);
  472. if(!willreplace)
  473. c->n--;
  474. c->f++;
  475. return v;
  476. }
  477. if(a->op==CHANSND && c->n < c->s){
  478. _threaddebug(DBGCHAN, "buffer send alt %p chan %p", a, c);
  479. v = c->v + c->e*((c->f+c->n)%c->s);
  480. if(!willreplace)
  481. c->n++;
  482. return v;
  483. }
  484. abort();
  485. return nil;
  486. }
  487. static void
  488. altcopy(void *dst, void *src, int sz)
  489. {
  490. if(dst){
  491. if(src)
  492. memmove(dst, src, sz);
  493. else
  494. memset(dst, 0, sz);
  495. }
  496. }
  497. static int
  498. altexec(Alt *a, int spl)
  499. {
  500. volatile Alt *b;
  501. int i, n, otherop;
  502. Channel *c;
  503. void *me, *waiter, *buf;
  504. c = a->c;
  505. /* rendezvous with others */
  506. otherop = (CHANSND+CHANRCV) - a->op;
  507. n = 0;
  508. b = nil;
  509. me = a->v;
  510. for(i=0; i<c->nentry; i++)
  511. if(c->qentry[i] && c->qentry[i]->op==otherop && *c->qentry[i]->tag==nil)
  512. if(nrand(++n) == 0)
  513. b = c->qentry[i];
  514. if(b != nil){
  515. _threaddebug(DBGCHAN, "rendez %s alt %p chan %p alt %p", a->op==CHANRCV?"recv":"send", a, c, b);
  516. waiter = b->v;
  517. if(c->s && c->n){
  518. /*
  519. * if buffer is full and there are waiters
  520. * and we're meeting a waiter,
  521. * we must be receiving.
  522. *
  523. * we use the value in the channel buffer,
  524. * copy the waiter's value into the channel buffer
  525. * on behalf of the waiter, and then wake the waiter.
  526. */
  527. if(a->op!=CHANRCV)
  528. abort();
  529. buf = altexecbuffered(a, 1);
  530. altcopy(me, buf, c->e);
  531. altcopy(buf, waiter, c->e);
  532. }else{
  533. if(a->op==CHANRCV)
  534. altcopy(me, waiter, c->e);
  535. else
  536. altcopy(waiter, me, c->e);
  537. }
  538. *b->tag = c; /* commits us to rendezvous */
  539. _threaddebug(DBGCHAN, "unlocking the chanlock");
  540. unlock(&chanlock);
  541. _procsplx(spl);
  542. _threaddebug(DBGCHAN, "chanlock is %lu",
  543. *(uint32_t*)&chanlock);
  544. while(_threadrendezvous(b->tag, 0) == Intred)
  545. ;
  546. return 1;
  547. }
  548. buf = altexecbuffered(a, 0);
  549. if(a->op==CHANRCV)
  550. altcopy(me, buf, c->e);
  551. else
  552. altcopy(buf, me, c->e);
  553. unlock(&chanlock);
  554. _procsplx(spl);
  555. return 1;
  556. }