/*
 * client.c
 *
 * Per-client state for execnet: client slot allocation, the queues of
 * pending read/write requests and buffered messages, the reader and
 * writer threads, and control-message handling (ctlwrite).
 */
  1. #include <u.h>
  2. #include <libc.h>
  3. #include <fcall.h>
  4. #include <thread.h>
  5. #include <9p.h>
  6. #include "dat.h"
  7. int nclient;
  8. Client **client;
  9. #define Zmsg ((Msg*)~0)
  10. char nocmd[] = "";
  11. static void readthread(void*);
  12. static void writethread(void*);
  13. static void kickwriter(Client*);
  14. int
  15. newclient(void)
  16. {
  17. int i;
  18. Client *c;
  19. for(i=0; i<nclient; i++)
  20. if(client[i]->ref==0 && !client[i]->moribund)
  21. return i;
  22. c = emalloc(sizeof(Client));
  23. c->writerkick = chancreate(sizeof(void*), 1);
  24. c->execpid = chancreate(sizeof(ulong), 0);
  25. c->cmd = nocmd;
  26. c->readerproc = ioproc();
  27. c->writerproc = ioproc();
  28. c->num = nclient;
  29. if(nclient%16 == 0)
  30. client = erealloc(client, (nclient+16)*sizeof(client[0]));
  31. client[nclient++] = c;
  32. return nclient-1;
  33. }
  34. void
  35. die(Client *c)
  36. {
  37. Msg *m, *next;
  38. Req *r, *rnext;
  39. c->moribund = 1;
  40. kickwriter(c);
  41. iointerrupt(c->readerproc);
  42. iointerrupt(c->writerproc);
  43. if(--c->activethread == 0){
  44. if(c->cmd != nocmd){
  45. free(c->cmd);
  46. c->cmd = nocmd;
  47. }
  48. c->pid = 0;
  49. c->moribund = 0;
  50. c->status = Closed;
  51. for(m=c->mq; m && m != Zmsg; m=next){
  52. next = m->link;
  53. free(m);
  54. }
  55. c->mq = nil;
  56. if(c->rq != nil){
  57. for(r=c->rq; r; r=rnext){
  58. rnext = r->aux;
  59. respond(r, "hangup");
  60. }
  61. c->rq = nil;
  62. }
  63. if(c->wq != nil){
  64. for(r=c->wq; r; r=rnext){
  65. rnext = r->aux;
  66. respond(r, "hangup");
  67. }
  68. c->wq = nil;
  69. }
  70. c->rq = nil;
  71. c->wq = nil;
  72. c->emq = nil;
  73. c->erq = nil;
  74. c->ewq = nil;
  75. }
  76. }
  77. void
  78. closeclient(Client *c)
  79. {
  80. if(--c->ref == 0){
  81. if(c->pid > 0)
  82. postnote(PNPROC, c->pid, "kill");
  83. c->status = Hangup;
  84. close(c->fd[0]);
  85. c->fd[0] = c->fd[1] = -1;
  86. c->moribund = 1;
  87. kickwriter(c);
  88. iointerrupt(c->readerproc);
  89. iointerrupt(c->writerproc);
  90. c->activethread++;
  91. die(c);
  92. }
  93. }
  94. void
  95. queuerdreq(Client *c, Req *r)
  96. {
  97. if(c->rq==nil)
  98. c->erq = &c->rq;
  99. *c->erq = r;
  100. r->aux = nil;
  101. c->erq = (Req**)&r->aux;
  102. }
  103. void
  104. queuewrreq(Client *c, Req *r)
  105. {
  106. if(c->wq==nil)
  107. c->ewq = &c->wq;
  108. *c->ewq = r;
  109. r->aux = nil;
  110. c->ewq = (Req**)&r->aux;
  111. }
  112. void
  113. queuemsg(Client *c, Msg *m)
  114. {
  115. if(c->mq==nil)
  116. c->emq = &c->mq;
  117. *c->emq = m;
  118. if(m != Zmsg){
  119. m->link = nil;
  120. c->emq = (Msg**)&m->link;
  121. }else
  122. c->emq = nil;
  123. }
  124. void
  125. matchmsgs(Client *c)
  126. {
  127. Req *r;
  128. Msg *m;
  129. int n, rm;
  130. while(c->rq && c->mq){
  131. r = c->rq;
  132. c->rq = r->aux;
  133. rm = 0;
  134. m = c->mq;
  135. if(m == Zmsg){
  136. respond(r, "execnet: no more data");
  137. break;
  138. }
  139. n = r->ifcall.count;
  140. if(n >= m->ep - m->rp){
  141. n = m->ep - m->rp;
  142. c->mq = m->link;
  143. rm = 1;
  144. }
  145. if(n)
  146. memmove(r->ofcall.data, m->rp, n);
  147. if(rm)
  148. free(m);
  149. else
  150. m->rp += n;
  151. r->ofcall.count = n;
  152. respond(r, nil);
  153. }
  154. }
  155. void
  156. findrdreq(Client *c, Req *r)
  157. {
  158. Req **l;
  159. for(l=&c->rq; *l; l=(Req**)&(*l)->aux){
  160. if(*l == r){
  161. *l = r->aux;
  162. if(*l == nil)
  163. c->erq = l;
  164. respond(r, "flushed");
  165. break;
  166. }
  167. }
  168. }
  169. void
  170. findwrreq(Client *c, Req *r)
  171. {
  172. Req **l;
  173. for(l=&c->wq; *l; l=(Req**)&(*l)->aux){
  174. if(*l == r){
  175. *l = r->aux;
  176. if(*l == nil)
  177. c->ewq = l;
  178. respond(r, "flushed");
  179. return;
  180. }
  181. }
  182. }
  183. void
  184. dataread(Req *r, Client *c)
  185. {
  186. queuerdreq(c, r);
  187. matchmsgs(c);
  188. }
  189. static void
  190. readthread(void *a)
  191. {
  192. uchar *buf;
  193. int n;
  194. Client *c;
  195. Ioproc *io;
  196. Msg *m;
  197. char tmp[32];
  198. c = a;
  199. snprint(tmp, sizeof tmp, "read%d", c->num);
  200. threadsetname(tmp);
  201. buf = emalloc(8192);
  202. io = c->readerproc;
  203. while((n = ioread(io, c->fd[0], buf, 8192)) >= 0){
  204. m = emalloc(sizeof(Msg)+n);
  205. m->rp = (uchar*)&m[1];
  206. m->ep = m->rp + n;
  207. if(n)
  208. memmove(m->rp, buf, n);
  209. queuemsg(c, m);
  210. matchmsgs(c);
  211. }
  212. queuemsg(c, Zmsg);
  213. free(buf);
  214. die(c);
  215. }
  216. static void
  217. kickwriter(Client *c)
  218. {
  219. nbsendp(c->writerkick, nil);
  220. }
  221. void
  222. clientflush(Req *or, Client *c)
  223. {
  224. if(or->ifcall.type == Tread)
  225. findrdreq(c, or);
  226. else{
  227. if(c->execreq == or){
  228. c->execreq = nil;
  229. iointerrupt(c->writerproc);
  230. }
  231. findwrreq(c, or);
  232. if(c->curw == or){
  233. c->curw = nil;
  234. iointerrupt(c->writerproc);
  235. kickwriter(c);
  236. }
  237. }
  238. }
  239. void
  240. datawrite(Req *r, Client *c)
  241. {
  242. queuewrreq(c, r);
  243. kickwriter(c);
  244. }
  245. static void
  246. writethread(void *a)
  247. {
  248. char e[ERRMAX];
  249. uchar *buf;
  250. int n;
  251. Ioproc *io;
  252. Req *r;
  253. Client *c;
  254. char tmp[32];
  255. c = a;
  256. snprint(tmp, sizeof tmp, "write%d", c->num);
  257. threadsetname(tmp);
  258. buf = emalloc(8192);
  259. io = c->writerproc;
  260. for(;;){
  261. while(c->wq == nil){
  262. if(c->moribund)
  263. goto Out;
  264. recvp(c->writerkick);
  265. if(c->moribund)
  266. goto Out;
  267. }
  268. r = c->wq;
  269. c->wq = r->aux;
  270. c->curw = r;
  271. n = iowrite(io, c->fd[1], r->ifcall.data, r->ifcall.count);
  272. if(chatty9p)
  273. fprint(2, "io->write returns %d\n", n);
  274. if(n >= 0){
  275. r->ofcall.count = n;
  276. respond(r, nil);
  277. }else{
  278. rerrstr(e, sizeof e);
  279. respond(r, e);
  280. }
  281. }
  282. Out:
  283. free(buf);
  284. die(c);
  285. }
  286. static void
  287. execproc(void *a)
  288. {
  289. int i, fd;
  290. Client *c;
  291. char tmp[32];
  292. c = a;
  293. snprint(tmp, sizeof tmp, "execproc%d", c->num);
  294. threadsetname(tmp);
  295. if(pipe(c->fd) < 0){
  296. rerrstr(c->err, sizeof c->err);
  297. sendul(c->execpid, -1);
  298. return;
  299. }
  300. rfork(RFFDG);
  301. fd = c->fd[1];
  302. close(c->fd[0]);
  303. dup(fd, 0);
  304. dup(fd, 1);
  305. for(i=3; i<100; i++) /* should do better */
  306. close(i);
  307. strcpy(c->err, "exec failed");
  308. procexecl(c->execpid, "/bin/rc", "rc", "-c", c->cmd, nil);
  309. }
  310. static void
  311. execthread(void *a)
  312. {
  313. Client *c;
  314. int p;
  315. char tmp[32];
  316. c = a;
  317. snprint(tmp, sizeof tmp, "exec%d", c->num);
  318. threadsetname(tmp);
  319. c->execpid = chancreate(sizeof(ulong), 0);
  320. proccreate(execproc, c, STACK);
  321. p = recvul(c->execpid);
  322. chanfree(c->execpid);
  323. c->execpid = nil;
  324. close(c->fd[1]);
  325. c->fd[1] = c->fd[0];
  326. if(p != -1){
  327. c->pid = p;
  328. c->activethread = 2;
  329. threadcreate(readthread, c, STACK);
  330. threadcreate(writethread, c, STACK);
  331. if(c->execreq)
  332. respond(c->execreq, nil);
  333. }else{
  334. if(c->execreq)
  335. respond(c->execreq, c->err);
  336. }
  337. }
  338. void
  339. ctlwrite(Req *r, Client *c)
  340. {
  341. char *f[3], *s, *p;
  342. int nf;
  343. s = emalloc(r->ifcall.count+1);
  344. memmove(s, r->ifcall.data, r->ifcall.count);
  345. s[r->ifcall.count] = '\0';
  346. f[0] = s;
  347. p = strchr(s, ' ');
  348. if(p == nil)
  349. nf = 1;
  350. else{
  351. *p++ = '\0';
  352. f[1] = p;
  353. nf = 2;
  354. }
  355. if(f[0][0] == '\0'){
  356. free(s);
  357. respond(r, nil);
  358. return;
  359. }
  360. r->ofcall.count = r->ifcall.count;
  361. if(strcmp(f[0], "hangup") == 0){
  362. if(c->pid == 0){
  363. respond(r, "connection already hung up");
  364. goto Out;
  365. }
  366. postnote(PNPROC, c->pid, "kill");
  367. respond(r, nil);
  368. goto Out;
  369. }
  370. if(strcmp(f[0], "connect") == 0){
  371. if(c->cmd != nocmd){
  372. respond(r, "already have connection");
  373. goto Out;
  374. }
  375. if(nf == 1){
  376. respond(r, "need argument to connect");
  377. goto Out;
  378. }
  379. c->status = Exec;
  380. if(p = strrchr(f[1], '!'))
  381. *p = '\0';
  382. c->cmd = emalloc(4+1+strlen(f[1])+1);
  383. strcpy(c->cmd, "exec ");
  384. strcat(c->cmd, f[1]);
  385. c->execreq = r;
  386. threadcreate(execthread, c, STACK);
  387. goto Out;
  388. }
  389. respond(r, "bad or inappropriate control message");
  390. Out:
  391. free(s);
  392. }