net.c 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454
  1. /* network i/o */
  2. #include "all.h"
  3. #include "io.h"
  4. #include <fcall.h> /* 9p2000 */
  5. #include <thread.h>
  /* tunables and connection states for the network i/o code */
  enum {
      Maxfdata = 8192,    /* bound on 9P message size is IOHDRSZ+Maxfdata */
      Nqueue = 200,       /* queue size (tunable) */

      /* values of Netconn.state */
      Netclosed = 0,
      Netopen = 1,
  };
  12. /*
  13. * the kernel file server read packets directly from
  14. * its ethernet(s) and did all the protocol processing.
  15. * if the incoming packets were 9p (over il/ip), they
  16. * were queued for the server processes to operate upon.
  17. *
  18. * in user mode, we have one process per incoming connection
  19. * instead, and those processes get just the data, minus
  20. * tcp and ip headers, so they just see a stream of 9p messages,
  21. * which they then queue for the server processes.
  22. *
  23. * there used to be more queueing (in the kernel), with separate
  24. * processes for ethernet input, il input, 9p processing, il output
  25. * and ethernet output, and queues connecting them. we now let
  26. * the kernel's network queues, protocol stacks and processes do
  27. * much of this work.
  28. *
  29. * partly as a result of this, we can now process 9p messages
  30. * transported via tcp, exploit multiple x86 processors, and
  31. * were able to shed 70% of the file server's source, by line count.
  32. *
  33. * the upshot is that Ether (now Network) is no longer a perfect fit for
  34. * the way network i/o is done now. the notion of `connection'
  35. * is being introduced to complement it.
  36. */
  /* short names for the structures defined in this file */
  typedef struct Conn9p Conn9p;
  typedef struct Netconn Netconn;
  typedef struct Network Network;
  40. /* a network, not necessarily an ethernet */
  41. struct Network {
  42. int ctlrno;
  43. char iname[NAMELEN];
  44. char oname[NAMELEN];
  45. char *dialstr;
  46. char anndir[40];
  47. char lisdir[40];
  48. int annfd; /* fd from announce */
  49. };
  50. /* an open tcp (or other transport) connection */
  51. struct Netconn {
  52. Queue* reply; /* network output */
  53. char* raddr; /* remote caller's addr */
  54. Chan* chan; /* list of tcp channels */
  55. int alloc; /* flag: allocated */
  56. int state;
  57. Conn9p* conn9p; /* not reference-counted */
  58. Lock;
  59. };
  60. /*
  61. * incoming 9P network connection from a given machine.
  62. * typically will multiplex 9P sessions for multiple users.
  63. */
  64. struct Conn9p {
  65. QLock;
  66. Ref;
  67. int fd;
  68. char* dir;
  69. Netconn*netconn; /* cross-connection */
  70. char* raddr;
  71. };
  72. static Network netif[Maxnets];
  73. static struct {
  74. Lock;
  75. Chan* chan;
  76. } netchans;
  77. static Queue *netoq; /* only one network output queue is needed */
  78. char *annstrs[Maxnets] = {
  79. "tcp!*!9fs",
  80. };
  81. /* never returns nil */
  82. static Chan*
  83. getchan(Conn9p *conn9p)
  84. {
  85. Netconn *netconn;
  86. Chan *cp, *xcp;
  87. lock(&netchans);
  88. /* look for conn9p's Chan */
  89. xcp = nil;
  90. for(cp = netchans.chan; cp; cp = netconn->chan) {
  91. netconn = cp->pdata;
  92. if(!netconn->alloc)
  93. xcp = cp; /* remember free Chan */
  94. else if(netconn->raddr != nil &&
  95. strcmp(conn9p->raddr, netconn->raddr) == 0) {
  96. unlock(&netchans);
  97. return cp; /* found conn9p's Chan */
  98. }
  99. }
  100. /* conn9p's Chan not found; if no free Chan, allocate & fill in one */
  101. cp = xcp;
  102. if(cp == nil) {
  103. cp = fs_chaninit(Devnet, 1, sizeof(Netconn));
  104. netconn = cp->pdata;
  105. netconn->chan = netchans.chan;
  106. netconn->state = Netopen; /* a guess */
  107. /* cross-connect netconn and conn9p */
  108. netconn->conn9p = conn9p; /* not reference-counted */
  109. conn9p->netconn = netconn;
  110. netchans.chan = cp;
  111. }
  112. /* fill in Chan's netconn */
  113. netconn = cp->pdata;
  114. netconn->raddr = strdup(conn9p->raddr);
  115. /* fill in Chan */
  116. cp->send = serveq;
  117. if (cp->reply == nil)
  118. cp->reply = netoq;
  119. netconn->reply = netoq;
  120. cp->protocol = nil;
  121. cp->msize = 0;
  122. cp->whotime = 0;
  123. strncpy(cp->whochan, conn9p->raddr, sizeof cp->whochan);
  124. // cp->whoprint = tcpwhoprint;
  125. netconn->alloc = 1;
  126. unlock(&netchans);
  127. return cp;
  128. }
  /*
   * return a malloced copy of the path name of fd, or of "/GOK"
   * if fd2path fails.  the caller frees the result.
   */
  static char *
  fd2name(int fd)
  {
      char path[128];
      char *name;

      name = "/GOK";
      if (fd2path(fd, path, sizeof path) >= 0)
          name = path;
      return strdup(name);
  }
  137. static void
  138. hangupdfd(int dfd)
  139. {
  140. int ctlfd;
  141. char *end, *data;
  142. data = fd2name(dfd);
  143. close(dfd);
  144. end = strstr(data, "/data");
  145. if (end != nil)
  146. strcpy(end, "/ctl");
  147. ctlfd = open(data, OWRITE);
  148. if (ctlfd >= 0) {
  149. hangup(ctlfd);
  150. close(ctlfd);
  151. }
  152. free(data);
  153. }
  154. void
  155. closechan(int n)
  156. {
  157. Chan *cp;
  158. for(cp = chans; cp; cp = cp->next)
  159. if(cp->whotime != 0 && cp->chan == n)
  160. fileinit(cp);
  161. }
  162. void
  163. nethangup(Chan *cp, char *msg, int dolock)
  164. {
  165. Netconn *netconn;
  166. netconn = cp->pdata;
  167. netconn->state = Netclosed;
  168. if(msg != nil)
  169. print("hangup! %s %s\n", msg, netconn->raddr);
  170. fileinit(cp);
  171. cp->whotime = 0;
  172. strcpy(cp->whoname, "<none>");
  173. if(dolock)
  174. lock(&netchans);
  175. netconn->alloc = 0;
  176. free(netconn->raddr);
  177. netconn->raddr = nil;
  178. if(dolock)
  179. unlock(&netchans);
  180. }
  181. void
  182. chanhangup(Chan *cp, char *msg, int dolock)
  183. {
  184. Netconn *netconn = cp->pdata;
  185. Conn9p *conn9p = netconn->conn9p;
  186. if (conn9p->fd > 0)
  187. hangupdfd(conn9p->fd); /* drop it */
  188. nethangup(cp, msg, dolock);
  189. }
  190. /*
  191. * returns length of next 9p message (including the length) and
  192. * leaves it in the first few bytes of abuf.
  193. */
  194. static long
  195. size9pmsg(int fd, void *abuf, uint n)
  196. {
  197. int m;
  198. uchar *buf = abuf;
  199. if (n < BIT32SZ)
  200. return -1; /* caller screwed up */
  201. /* read count */
  202. m = readn(fd, buf, BIT32SZ);
  203. if(m != BIT32SZ){
  204. if(m < 0)
  205. return -1;
  206. return 0;
  207. }
  208. return GBIT32(buf);
  209. }
  210. static int
  211. readalloc9pmsg(int fd, Msgbuf **mbp)
  212. {
  213. int m, len;
  214. uchar lenbuf[BIT32SZ];
  215. Msgbuf *mb;
  216. *mbp = nil;
  217. len = size9pmsg(fd, lenbuf, BIT32SZ);
  218. if (len <= 0)
  219. return len;
  220. if(len <= BIT32SZ || len > IOHDRSZ+Maxfdata){
  221. werrstr("bad length in 9P2000 message header");
  222. return -1;
  223. }
  224. if ((mb = mballoc(len, nil, Mbeth1)) == nil)
  225. panic("readalloc9pmsg: mballoc failed");
  226. *mbp = mb;
  227. memmove(mb->data, lenbuf, BIT32SZ);
  228. len -= BIT32SZ;
  229. m = readn(fd, mb->data+BIT32SZ, len);
  230. if(m < len)
  231. return 0;
  232. return BIT32SZ+m;
  233. }
  234. static void
  235. connection(void *v)
  236. {
  237. int n;
  238. char buf[64];
  239. Chan *chan9p;
  240. Conn9p *conn9p = v;
  241. Msgbuf *mb;
  242. NetConnInfo *nci;
  243. incref(conn9p); /* count connections */
  244. nci = getnetconninfo(conn9p->dir, conn9p->fd);
  245. if (nci == nil)
  246. panic("connection: getnetconninfo(%s, %d) failed",
  247. conn9p->dir, conn9p->fd);
  248. conn9p->raddr = nci->raddr;
  249. chan9p = getchan(conn9p);
  250. print("new connection on %s pid %d from %s\n",
  251. conn9p->dir, getpid(), conn9p->raddr);
  252. /*
  253. * reading from a pipe or a network device
  254. * will give an error after a few eof reads.
  255. * however, we cannot tell the difference
  256. * between a zero-length read and an interrupt
  257. * on the processes writing to us,
  258. * so we wait for the error.
  259. */
  260. while (conn9p->fd > 0 && (n = readalloc9pmsg(conn9p->fd, &mb)) >= 0) {
  261. if(n == 0)
  262. continue;
  263. mb->param = (uintptr)conn9p; /* has fd for replies */
  264. mb->chan = chan9p;
  265. assert(mb->magic == Mbmagic);
  266. incref(conn9p); /* & count packets in flight */
  267. fs_send(serveq, mb); /* to 9P server processes */
  268. /* mb will be freed by receiving process */
  269. }
  270. rerrstr(buf, sizeof buf);
  271. qlock(conn9p);
  272. print("connection hung up from %s\n", conn9p->dir);
  273. if (conn9p->fd > 0) /* not poisoned yet? */
  274. hangupdfd(conn9p->fd); /* poison the fd */
  275. nethangup(chan9p, "remote hung up", 1);
  276. closechan(chan9p->chan);
  277. conn9p->fd = -1; /* poison conn9p */
  278. if (decref(conn9p) == 0) { /* last conn.? turn the lights off */
  279. free(conn9p->dir);
  280. qunlock(conn9p);
  281. free(conn9p);
  282. } else
  283. qunlock(conn9p);
  284. freenetconninfo(nci);
  285. if(buf[0] == '\0' || strstr(buf, "hungup") != nil)
  286. exits("");
  287. sysfatal("mount read, pid %d", getpid());
  288. }
  289. static void
  290. neti(void *v)
  291. {
  292. int lisfd, accfd;
  293. Network *net;
  294. Conn9p *conn9p;
  295. net = v;
  296. print("net%di\n", net->ctlrno);
  297. for(;;) {
  298. lisfd = listen(net->anndir, net->lisdir);
  299. if (lisfd < 0) {
  300. print("listen %s failed: %r\n", net->anndir);
  301. continue;
  302. }
  303. /* got new call on lisfd */
  304. accfd = accept(lisfd, net->lisdir);
  305. if (accfd < 0) {
  306. print("accept %d (from %s) failed: %r\n",
  307. lisfd, net->lisdir);
  308. continue;
  309. }
  310. /* accepted that call */
  311. conn9p = malloc(sizeof *conn9p);
  312. conn9p->dir = strdup(net->lisdir);
  313. conn9p->fd = accfd;
  314. newproc(connection, conn9p, smprint("9P read %s", conn9p->dir));
  315. close(lisfd);
  316. }
  317. }
  318. /* only need one of these for all network connections, thus all interfaces */
  319. static void
  320. neto(void *)
  321. {
  322. int len, datafd;
  323. Msgbuf *mb;
  324. Conn9p *conn9p;
  325. print("neto\n");
  326. for(;;) {
  327. /* receive 9P answer from 9P server processes */
  328. while((mb = fs_recv(netoq, 0)) == nil)
  329. continue;
  330. if(mb->data == nil) {
  331. print("neto: pkt nil cat=%d free=%d\n",
  332. mb->category, mb->flags&FREE);
  333. if(!(mb->flags & FREE))
  334. mbfree(mb);
  335. continue;
  336. }
  337. /* send answer back over the network connection in the reply */
  338. len = mb->count;
  339. conn9p = (Conn9p *)mb->param;
  340. assert(conn9p);
  341. qlock(conn9p);
  342. datafd = conn9p->fd;
  343. assert(len >= 0);
  344. /* datafd < 0 probably indicates poisoning by the read side */
  345. if (datafd < 0 || write(datafd, mb->data, len) != len) {
  346. print( "network write error (%r);");
  347. print(" closing connection for %s\n", conn9p->dir);
  348. nethangup(getchan(conn9p), "network write error", 1);
  349. if (datafd > 0)
  350. hangupdfd(datafd); /* drop it */
  351. conn9p->fd = -1; /* poison conn9p */
  352. }
  353. mbfree(mb);
  354. if (decref(conn9p) == 0)
  355. panic("neto: zero ref count");
  356. qunlock(conn9p);
  357. }
  358. }
  359. void
  360. netstart(void)
  361. {
  362. int netorun = 0;
  363. Network *net;
  364. if(netoq == nil)
  365. netoq = newqueue(Nqueue, "network reply");
  366. for(net = &netif[0]; net < &netif[Maxnets]; net++){
  367. if(net->dialstr == nil)
  368. continue;
  369. sprint(net->oname, "neto");
  370. if (netorun++ == 0)
  371. newproc(neto, nil, net->oname);
  372. sprint(net->iname, "net%di", net->ctlrno);
  373. newproc(neti, net, net->iname);
  374. }
  375. }
  376. void
  377. netinit(void)
  378. {
  379. Network *net;
  380. for (net = netif; net < netif + Maxnets; net++) {
  381. net->dialstr = annstrs[net - netif];
  382. if (net->dialstr == nil)
  383. continue;
  384. net->annfd = announce(net->dialstr, net->anndir);
  385. /* /bin/service/tcp564 may already have grabbed the port */
  386. if (net->annfd < 0)
  387. sysfatal("can't announce %s: %r", net->dialstr);
  388. print("netinit: announced on %s\n", net->dialstr);
  389. }
  390. }