venti.c 5.5 KB


  1. #ifdef PLAN9PORT
  2. #include <u.h>
  3. #include <signal.h>
  4. #endif
  5. #include "stdinc.h"
  6. #include "dat.h"
  7. #include "fns.h"
  8. #include "whack.h"
  9. int debug;
  10. int nofork;
  11. int mainstacksize = 256*1024;
  12. VtSrv *ventisrv;
  13. static void ventiserver(void*);
  14. void
  15. usage(void)
  16. {
  17. fprint(2, "usage: venti [-Ldrsw] [-a ventiaddr] [-c config] "
  18. "[-h httpaddr] [-B blockcachesize] [-C cachesize] [-I icachesize] [-W webroot]\n");
  19. threadexitsall("usage");
  20. }
  21. void
  22. threadmain(int argc, char *argv[])
  23. {
  24. char *configfile, *haddr, *vaddr, *webroot;
  25. u32int mem, icmem, bcmem, minbcmem;
  26. Config config;
  27. traceinit();
  28. threadsetname("main");
  29. vaddr = nil;
  30. haddr = nil;
  31. configfile = nil;
  32. webroot = nil;
  33. mem = 0;
  34. icmem = 0;
  35. bcmem = 0;
  36. ARGBEGIN{
  37. case 'a':
  38. vaddr = EARGF(usage());
  39. break;
  40. case 'B':
  41. bcmem = unittoull(EARGF(usage()));
  42. break;
  43. case 'c':
  44. configfile = EARGF(usage());
  45. break;
  46. case 'C':
  47. mem = unittoull(EARGF(usage()));
  48. break;
  49. case 'D':
  50. settrace(EARGF(usage()));
  51. break;
  52. case 'd':
  53. debug = 1;
  54. nofork = 1;
  55. break;
  56. case 'h':
  57. haddr = EARGF(usage());
  58. break;
  59. case 'I':
  60. icmem = unittoull(EARGF(usage()));
  61. break;
  62. case 'L':
  63. ventilogging = 1;
  64. break;
  65. case 'r':
  66. readonly = 1;
  67. break;
  68. case 's':
  69. nofork = 1;
  70. break;
  71. case 'w': /* compatibility with old venti */
  72. queuewrites = 1;
  73. break;
  74. case 'W':
  75. webroot = EARGF(usage());
  76. break;
  77. default:
  78. usage();
  79. }ARGEND
  80. if(argc)
  81. usage();
  82. if(!nofork)
  83. rfork(RFNOTEG);
  84. #ifdef PLAN9PORT
  85. {
  86. /* sigh - needed to avoid signals when writing to hungup networks */
  87. struct sigaction sa;
  88. memset(&sa, 0, sizeof sa);
  89. sa.sa_handler = SIG_IGN;
  90. sigaction(SIGPIPE, &sa, nil);
  91. }
  92. #endif
  93. ventifmtinstall();
  94. trace(TraceQuiet, "venti started");
  95. fprint(2, "%T venti: ");
  96. if(configfile == nil)
  97. configfile = "venti.conf";
  98. fprint(2, "conf...");
  99. if(initventi(configfile, &config) < 0)
  100. sysfatal("can't init server: %r");
  101. if(mainindex->bloom && loadbloom(mainindex->bloom) < 0)
  102. sysfatal("can't load bloom filter: %r");
  103. if(mem == 0)
  104. mem = config.mem;
  105. if(bcmem == 0)
  106. bcmem = config.bcmem;
  107. if(icmem == 0)
  108. icmem = config.icmem;
  109. if(haddr == nil)
  110. haddr = config.haddr;
  111. if(vaddr == nil)
  112. vaddr = config.vaddr;
  113. if(vaddr == nil)
  114. vaddr = "tcp!*!venti";
  115. if(webroot == nil)
  116. webroot = config.webroot;
  117. if(queuewrites == 0)
  118. queuewrites = config.queuewrites;
  119. if(haddr){
  120. fprint(2, "httpd %s...", haddr);
  121. if(httpdinit(haddr, webroot) < 0)
  122. fprint(2, "warning: can't start http server: %r");
  123. }
  124. fprint(2, "init...");
  125. if(mem == 0xffffffffUL)
  126. mem = 1 * 1024 * 1024;
  127. if(0) fprint(2, "initialize %d bytes of lump cache for %d lumps\n",
  128. mem, mem / (8 * 1024));
  129. initlumpcache(mem, mem / (8 * 1024));
  130. initicache(icmem);
  131. initicachewrite();
  132. /*
  133. * need a block for every arena and every process
  134. */
  135. minbcmem = maxblocksize *
  136. (mainindex->narenas + mainindex->nsects*4 + 16);
  137. if(bcmem < minbcmem)
  138. bcmem = minbcmem;
  139. if(0) fprint(2, "initialize %d bytes of disk block cache\n", bcmem);
  140. initdcache(bcmem);
  141. if(mainindex->bloom)
  142. startbloomproc(mainindex->bloom);
  143. fprint(2, "sync...");
  144. if(!readonly && syncindex(mainindex) < 0)
  145. sysfatal("can't sync server: %r");
  146. if(!readonly && queuewrites){
  147. fprint(2, "queue...");
  148. if(initlumpqueues(mainindex->nsects) < 0){
  149. fprint(2, "can't initialize lump queues,"
  150. " disabling write queueing: %r");
  151. queuewrites = 0;
  152. }
  153. }
  154. if(initarenasum() < 0)
  155. fprint(2, "warning: can't initialize arena summing process: %r");
  156. fprint(2, "announce %s...", vaddr);
  157. ventisrv = vtlisten(vaddr);
  158. if(ventisrv == nil)
  159. sysfatal("can't announce %s: %r", vaddr);
  160. fprint(2, "serving.\n");
  161. if(nofork)
  162. ventiserver(nil);
  163. else
  164. vtproc(ventiserver, nil);
  165. }
  166. static void
  167. vtrerror(VtReq *r, char *error)
  168. {
  169. r->rx.msgtype = VtRerror;
  170. r->rx.error = estrdup(error);
  171. }
  172. static void
  173. ventiserver(void *v)
  174. {
  175. Packet *p;
  176. VtReq *r;
  177. char err[ERRMAX];
  178. uint ms;
  179. int cached, ok;
  180. USED(v);
  181. threadsetname("ventiserver");
  182. trace(TraceWork, "start");
  183. while((r = vtgetreq(ventisrv)) != nil){
  184. trace(TraceWork, "finish");
  185. trace(TraceWork, "start request %F", &r->tx);
  186. trace(TraceRpc, "<- %F", &r->tx);
  187. r->rx.msgtype = r->tx.msgtype+1;
  188. addstat(StatRpcTotal, 1);
  189. if(0) print("req (arenas[0]=%p sects[0]=%p) %F\n",
  190. mainindex->arenas[0], mainindex->sects[0], &r->tx);
  191. switch(r->tx.msgtype){
  192. default:
  193. vtrerror(r, "unknown request");
  194. break;
  195. case VtTread:
  196. ms = msec();
  197. r->rx.data = readlump(r->tx.score, r->tx.blocktype, r->tx.count, &cached);
  198. ms = msec() - ms;
  199. addstat2(StatRpcRead, 1, StatRpcReadTime, ms);
  200. if(r->rx.data == nil){
  201. addstat(StatRpcReadFail, 1);
  202. rerrstr(err, sizeof err);
  203. vtrerror(r, err);
  204. }else{
  205. addstat(StatRpcReadBytes, packetsize(r->rx.data));
  206. addstat(StatRpcReadOk, 1);
  207. if(cached)
  208. addstat2(StatRpcReadCached, 1, StatRpcReadCachedTime, ms);
  209. else
  210. addstat2(StatRpcReadUncached, 1, StatRpcReadUncachedTime, ms);
  211. }
  212. break;
  213. case VtTwrite:
  214. if(readonly){
  215. vtrerror(r, "read only");
  216. break;
  217. }
  218. p = r->tx.data;
  219. r->tx.data = nil;
  220. addstat(StatRpcWriteBytes, packetsize(p));
  221. ms = msec();
  222. ok = writelump(p, r->rx.score, r->tx.blocktype, 0, ms);
  223. ms = msec() - ms;
  224. addstat2(StatRpcWrite, 1, StatRpcWriteTime, ms);
  225. if(ok < 0){
  226. addstat(StatRpcWriteFail, 1);
  227. rerrstr(err, sizeof err);
  228. vtrerror(r, err);
  229. }
  230. break;
  231. case VtTsync:
  232. flushqueue();
  233. flushdcache();
  234. break;
  235. }
  236. trace(TraceRpc, "-> %F", &r->rx);
  237. vtrespond(r);
  238. trace(TraceWork, "start");
  239. }
  240. flushdcache();
  241. flushicache();
  242. threadexitsall(0);
  243. }