channel.c 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628
  1. /*
  2. * This file is part of the UCB release of Plan 9. It is subject to the license
  3. * terms in the LICENSE file found in the top-level directory of this
  4. * distribution and at http://akaros.cs.berkeley.edu/files/Plan9License. No
  5. * part of the UCB release of Plan 9, including this file, may be copied,
  6. * modified, propagated, or distributed except according to the terms contained
  7. * in the LICENSE file.
  8. */
  9. #include <u.h>
  10. #include <libc.h>
  11. #include <thread.h>
  12. #include "threadimpl.h"
  13. /* Value to indicate the channel is closed */
  14. enum {
  15. CHANCLOSD = 0xc105ed,
  16. };
  17. static char errcl[] = "channel was closed";
  18. static Lock chanlock; /* central channel access lock */
  19. static void enqueue(Alt*, Channel**);
  20. static void dequeue(Alt*);
  21. static int canexec(Alt*);
  22. static int altexec(Alt*, int);
  23. #define Closed ((void*)CHANCLOSD)
  24. #define Intred ((void*)~0) /* interrupted */
  25. static void
  26. _chanfree(Channel *c)
  27. {
  28. int i, inuse;
  29. if(c->closed == 1) /* chanclose is ongoing */
  30. inuse = 1;
  31. else{
  32. inuse = 0;
  33. for(i = 0; i < c->nentry; i++) /* alt ongoing */
  34. if(c->qentry[i])
  35. inuse = 1;
  36. }
  37. if(inuse)
  38. c->freed = 1;
  39. else{
  40. if(c->qentry)
  41. free(c->qentry);
  42. free(c);
  43. }
  44. }
  45. void
  46. chanfree(Channel *c)
  47. {
  48. lock(&chanlock);
  49. _chanfree(c);
  50. unlock(&chanlock);
  51. }
  52. int
  53. chaninit(Channel *c, int elemsize, int elemcnt)
  54. {
  55. if(elemcnt < 0 || elemsize <= 0 || c == nil)
  56. return -1;
  57. c->f = 0;
  58. c->n = 0;
  59. c->closed = 0;
  60. c->freed = 0;
  61. c->e = elemsize;
  62. c->s = elemcnt;
  63. _threaddebug(DBGCHAN, "chaninit %p", c);
  64. return 1;
  65. }
  66. Channel*
  67. chancreate(int elemsize, int elemcnt)
  68. {
  69. Channel *c;
  70. if(elemcnt < 0 || elemsize <= 0)
  71. return nil;
  72. c = _threadmalloc(sizeof(Channel)+elemsize*elemcnt, 1);
  73. c->e = elemsize;
  74. c->s = elemcnt;
  75. _threaddebug(DBGCHAN, "chancreate %p", c);
  76. return c;
  77. }
  78. /* isopenfor is meant to be called under the chanlock. */
  79. static int
  80. isopenfor(Channel *c, int op)
  81. {
  82. return c->closed == 0 || (op == CHANRCV && c->n > 0);
  83. }
  84. int
  85. alt(Alt *alts)
  86. {
  87. Alt *a, *xa, *ca;
  88. Channel *c;
  89. int n, s, waiting, allreadycl;
  90. void* r;
  91. Thread *t;
  92. /*
  93. * The point of going splhi here is that note handlers
  94. * might reasonably want to use channel operations,
  95. * but that will hang if the note comes while we hold the
  96. * chanlock. Instead, we delay the note until we've dropped
  97. * the lock.
  98. */
  99. t = _threadgetproc()->thread;
  100. if(t->moribund || _threadexitsallstatus)
  101. yield(); /* won't return */
  102. s = _procsplhi();
  103. lock(&chanlock);
  104. t->alt = alts;
  105. t->chan = Chanalt;
  106. /* test whether any channels can proceed */
  107. n = 0;
  108. a = nil;
  109. for(xa=alts; xa->op!=CHANEND && xa->op!=CHANNOBLK; xa++){
  110. xa->entryno = -1;
  111. if(xa->op == CHANNOP)
  112. continue;
  113. c = xa->c;
  114. if(c==nil){
  115. unlock(&chanlock);
  116. _procsplx(s);
  117. t->chan = Channone;
  118. return -1;
  119. }
  120. if(isopenfor(c, xa->op) && canexec(xa))
  121. if(nrand(++n) == 0)
  122. a = xa;
  123. }
  124. if(a==nil){
  125. /* nothing can proceed */
  126. if(xa->op == CHANNOBLK){
  127. unlock(&chanlock);
  128. _procsplx(s);
  129. t->chan = Channone;
  130. if(xa->op == CHANNOBLK)
  131. return xa - alts;
  132. }
  133. /* enqueue on all channels open for us. */
  134. c = nil;
  135. ca = nil;
  136. waiting = 0;
  137. allreadycl = 0;
  138. for(xa=alts; xa->op!=CHANEND; xa++)
  139. if(xa->op==CHANNOP)
  140. continue;
  141. else if(isopenfor(xa->c, xa->op)){
  142. waiting = 1;
  143. enqueue(xa, &c);
  144. } else if(xa->err != errcl)
  145. ca = xa;
  146. else
  147. allreadycl = 1;
  148. if(waiting == 0)
  149. if(ca != nil){
  150. /* everything was closed, select last channel */
  151. ca->err = errcl;
  152. unlock(&chanlock);
  153. _procsplx(s);
  154. t->chan = Channone;
  155. return ca - alts;
  156. } else if(allreadycl){
  157. /* everything was already closed */
  158. unlock(&chanlock);
  159. _procsplx(s);
  160. t->chan = Channone;
  161. return -1;
  162. }
  163. /*
  164. * wait for successful rendezvous.
  165. * we can't just give up if the rendezvous
  166. * is interrupted -- someone else might come
  167. * along and try to rendezvous with us, so
  168. * we need to be here.
  169. * if the channel was closed, the op is done
  170. * and we flag an error for the entry.
  171. */
  172. Channel volatile *vc;
  173. vc = (Channel volatile *)c;
  174. Again:
  175. unlock(&chanlock);
  176. _procsplx(s);
  177. r = _threadrendezvous(&vc, 0);
  178. s = _procsplhi();
  179. lock(&chanlock);
  180. if(r==Intred){ /* interrupted */
  181. if(vc!=nil) /* someone will meet us; go back */
  182. goto Again;
  183. vc = (Channel*)~0; /* so no one tries to meet us */
  184. }
  185. c = (Channel *)vc;
  186. /* dequeue from channels, find selected one */
  187. a = nil;
  188. for(xa=alts; xa->op!=CHANEND; xa++){
  189. if(xa->op==CHANNOP)
  190. continue;
  191. if(xa->c == c){
  192. a = xa;
  193. a->err = nil;
  194. if(r == Closed)
  195. a->err = errcl;
  196. }
  197. dequeue(xa);
  198. }
  199. unlock(&chanlock);
  200. _procsplx(s);
  201. if(a == nil){ /* we were interrupted */
  202. assert(c==(Channel*)~0);
  203. return -1;
  204. }
  205. }else
  206. altexec(a, s); /* unlocks chanlock, does splx */
  207. _sched();
  208. t->chan = Channone;
  209. return a - alts;
  210. }
  211. int
  212. chanclose(Channel *c)
  213. {
  214. volatile Alt *a;
  215. int i, s;
  216. s = _procsplhi(); /* note handlers; see :/^alt */
  217. lock(&chanlock);
  218. if(c->closed){
  219. /* Already close; we fail but it's ok. don't print */
  220. unlock(&chanlock);
  221. _procsplx(s);
  222. return -1;
  223. }
  224. c->closed = 1; /* Being closed */
  225. /*
  226. * Locate entries that will fail due to close
  227. * (send, and receive if nothing buffered) and wake them up.
  228. * the situation cannot change because all queries
  229. * should be committed by now and new ones will find the channel
  230. * closed. We still need to take the lock during the iteration
  231. * because we can wake threads on qentrys we have not seen yet
  232. * as in alt and there would be a race in the access to *a.
  233. */
  234. for(i = 0; i < c->nentry; i++){
  235. if((a = c->qentry[i]) == nil || *a->tag != nil)
  236. continue;
  237. if(a->op != CHANSND && (a->op != CHANRCV || c->n != 0))
  238. continue;
  239. *a->tag = c;
  240. unlock(&chanlock);
  241. _procsplx(s);
  242. while(_threadrendezvous(a->tag, Closed) == Intred)
  243. ;
  244. s = _procsplhi();
  245. lock(&chanlock);
  246. }
  247. c->closed = 2; /* Fully closed */
  248. if(c->freed)
  249. _chanfree(c);
  250. unlock(&chanlock);
  251. _procsplx(s);
  252. return 0;
  253. }
  254. int
  255. chanclosing(Channel *c)
  256. {
  257. int n, s;
  258. s = _procsplhi(); /* note handlers; see :/^alt */
  259. lock(&chanlock);
  260. if(c->closed == 0)
  261. n = -1;
  262. else
  263. n = c->n;
  264. unlock(&chanlock);
  265. _procsplx(s);
  266. return n;
  267. }
  268. /*
  269. * superseded by chanclosing
  270. int
  271. chanisclosed(Channel *c)
  272. {
  273. return chanisclosing(c) >= 0;
  274. }
  275. */
  276. static int
  277. runop(int op, Channel *c, void *v, int nb)
  278. {
  279. int r;
  280. Alt a[2];
  281. /*
  282. * we could do this without calling alt,
  283. * but the only reason would be performance,
  284. * and i'm not convinced it matters.
  285. */
  286. a[0].op = op;
  287. a[0].c = c;
  288. a[0].v = v;
  289. a[0].err = nil;
  290. a[1].op = CHANEND;
  291. if(nb)
  292. a[1].op = CHANNOBLK;
  293. switch(r=alt(a)){
  294. case -1: /* interrupted */
  295. return -1;
  296. case 1: /* nonblocking, didn't accomplish anything */
  297. assert(nb);
  298. return 0;
  299. case 0:
  300. /*
  301. * Okay, but return -1 if the op is done because of a close.
  302. */
  303. if(a[0].err != nil)
  304. return -1;
  305. return 1;
  306. default:
  307. fprint(2, "ERROR: channel alt returned %d\n", r);
  308. abort();
  309. return -1;
  310. }
  311. }
  312. int
  313. recv(Channel *c, void *v)
  314. {
  315. return runop(CHANRCV, c, v, 0);
  316. }
  317. int
  318. nbrecv(Channel *c, void *v)
  319. {
  320. return runop(CHANRCV, c, v, 1);
  321. }
  322. int
  323. send(Channel *c, void *v)
  324. {
  325. return runop(CHANSND, c, v, 0);
  326. }
  327. int
  328. nbsend(Channel *c, void *v)
  329. {
  330. return runop(CHANSND, c, v, 1);
  331. }
  332. static void
  333. channelsize(Channel *c, int sz)
  334. {
  335. if(c->e != sz){
  336. fprint(2, "expected channel with elements of size %d, got size %d\n",
  337. sz, c->e);
  338. abort();
  339. }
  340. }
  341. int
  342. sendul(Channel *c, uint32_t v)
  343. {
  344. channelsize(c, sizeof(uint32_t));
  345. return send(c, &v);
  346. }
  347. uint32_t
  348. recvul(Channel *c)
  349. {
  350. uint32_t v;
  351. channelsize(c, sizeof(uint32_t));
  352. if(recv(c, &v) < 0)
  353. return ~0;
  354. return v;
  355. }
  356. int
  357. sendp(Channel *c, void *v)
  358. {
  359. channelsize(c, sizeof(void*));
  360. return send(c, &v);
  361. }
  362. void*
  363. recvp(Channel *c)
  364. {
  365. void *v;
  366. channelsize(c, sizeof(void*));
  367. if(recv(c, &v) < 0)
  368. return nil;
  369. return v;
  370. }
  371. int
  372. nbsendul(Channel *c, uint32_t v)
  373. {
  374. channelsize(c, sizeof(uint32_t));
  375. return nbsend(c, &v);
  376. }
  377. uint32_t
  378. nbrecvul(Channel *c)
  379. {
  380. uint32_t v;
  381. channelsize(c, sizeof(uint32_t));
  382. if(nbrecv(c, &v) == 0)
  383. return 0;
  384. return v;
  385. }
  386. int
  387. nbsendp(Channel *c, void *v)
  388. {
  389. channelsize(c, sizeof(void*));
  390. return nbsend(c, &v);
  391. }
  392. void*
  393. nbrecvp(Channel *c)
  394. {
  395. void *v;
  396. channelsize(c, sizeof(void*));
  397. if(nbrecv(c, &v) == 0)
  398. return nil;
  399. return v;
  400. }
  401. static int
  402. emptyentry(Channel *c)
  403. {
  404. int i, extra;
  405. assert((c->nentry==0 && c->qentry==nil) || (c->nentry && c->qentry));
  406. for(i=0; i<c->nentry; i++)
  407. if(c->qentry[i]==nil)
  408. return i;
  409. extra = 16;
  410. c->nentry += extra;
  411. c->qentry = realloc((void*)c->qentry, c->nentry*sizeof(c->qentry[0]));
  412. if(c->qentry == nil)
  413. sysfatal("realloc channel entries: %r");
  414. memset(&c->qentry[i], 0, extra*sizeof(c->qentry[0]));
  415. return i;
  416. }
  417. /* enqueue is called under the chanlock, the Channel can be treated as non-volatile. */
  418. static void
  419. enqueue(Alt *a, Channel **c)
  420. {
  421. int i;
  422. _threaddebug(DBGCHAN, "Queuing alt %p on channel %p", a, a->c);
  423. a->tag = c;
  424. i = emptyentry(a->c);
  425. a->c->qentry[i] = a;
  426. }
  427. static void
  428. dequeue(Alt *a)
  429. {
  430. int i;
  431. Channel *c;
  432. c = a->c;
  433. for(i=0; i<c->nentry; i++)
  434. if(c->qentry[i]==a){
  435. _threaddebug(DBGCHAN, "Dequeuing alt %p from channel %p", a, a->c);
  436. c->qentry[i] = nil;
  437. /* release if freed and not closing */
  438. if(c->freed && c->closed != 1)
  439. _chanfree(c);
  440. return;
  441. }
  442. }
  443. static int
  444. canexec(Alt *a)
  445. {
  446. int i, otherop;
  447. Channel *c;
  448. c = a->c;
  449. /* are there senders or receivers blocked? */
  450. otherop = (CHANSND+CHANRCV) - a->op;
  451. for(i=0; i<c->nentry; i++)
  452. if(c->qentry[i] && c->qentry[i]->op==otherop && *c->qentry[i]->tag==nil){
  453. _threaddebug(DBGCHAN, "can rendez alt %p chan %p", a, c);
  454. return 1;
  455. }
  456. /* is there room in the channel? */
  457. if((a->op==CHANSND && c->n < c->s)
  458. || (a->op==CHANRCV && c->n > 0)){
  459. _threaddebug(DBGCHAN, "can buffer alt %p chan %p", a, c);
  460. return 1;
  461. }
  462. return 0;
  463. }
  464. static void*
  465. altexecbuffered(Alt *a, int willreplace)
  466. {
  467. uint8_t *v;
  468. Channel *c;
  469. c = a->c;
  470. /* use buffered channel queue */
  471. if(a->op==CHANRCV && c->n > 0){
  472. _threaddebug(DBGCHAN, "buffer recv alt %p chan %p", a, c);
  473. v = c->v + c->e*(c->f%c->s);
  474. if(!willreplace)
  475. c->n--;
  476. c->f++;
  477. return v;
  478. }
  479. if(a->op==CHANSND && c->n < c->s){
  480. _threaddebug(DBGCHAN, "buffer send alt %p chan %p", a, c);
  481. v = c->v + c->e*((c->f+c->n)%c->s);
  482. if(!willreplace)
  483. c->n++;
  484. return v;
  485. }
  486. abort();
  487. return nil;
  488. }
  489. static void
  490. altcopy(void *dst, void *src, int sz)
  491. {
  492. if(dst){
  493. if(src)
  494. memmove(dst, src, sz);
  495. else
  496. memset(dst, 0, sz);
  497. }
  498. }
  499. static int
  500. altexec(Alt *a, int spl)
  501. {
  502. volatile Alt *b;
  503. int i, n, otherop;
  504. Channel *c;
  505. void *me, *waiter, *buf;
  506. c = a->c;
  507. /* rendezvous with others */
  508. otherop = (CHANSND+CHANRCV) - a->op;
  509. n = 0;
  510. b = nil;
  511. me = a->v;
  512. for(i=0; i<c->nentry; i++)
  513. if(c->qentry[i] && c->qentry[i]->op==otherop && *c->qentry[i]->tag==nil)
  514. if(nrand(++n) == 0)
  515. b = c->qentry[i];
  516. if(b != nil){
  517. _threaddebug(DBGCHAN, "rendez %s alt %p chan %p alt %p", a->op==CHANRCV?"recv":"send", a, c, b);
  518. waiter = b->v;
  519. if(c->s && c->n){
  520. /*
  521. * if buffer is full and there are waiters
  522. * and we're meeting a waiter,
  523. * we must be receiving.
  524. *
  525. * we use the value in the channel buffer,
  526. * copy the waiter's value into the channel buffer
  527. * on behalf of the waiter, and then wake the waiter.
  528. */
  529. if(a->op!=CHANRCV)
  530. abort();
  531. buf = altexecbuffered(a, 1);
  532. altcopy(me, buf, c->e);
  533. altcopy(buf, waiter, c->e);
  534. }else{
  535. if(a->op==CHANRCV)
  536. altcopy(me, waiter, c->e);
  537. else
  538. altcopy(waiter, me, c->e);
  539. }
  540. *b->tag = c; /* commits us to rendezvous */
  541. _threaddebug(DBGCHAN, "unlocking the chanlock");
  542. unlock(&chanlock);
  543. _procsplx(spl);
  544. _threaddebug(DBGCHAN, "chanlock is %lud",
  545. *(uint32_t*)&chanlock);
  546. while(_threadrendezvous(b->tag, 0) == Intred)
  547. ;
  548. return 1;
  549. }
  550. buf = altexecbuffered(a, 0);
  551. if(a->op==CHANRCV)
  552. altcopy(me, buf, c->e);
  553. else
  554. altcopy(buf, me, c->e);
  555. unlock(&chanlock);
  556. _procsplx(spl);
  557. return 1;
  558. }