venti.c 7.4 KB


  1. #ifdef PLAN9PORT
  2. #include <u.h>
  3. #include <signal.h>
  4. #endif
  5. #include "stdinc.h"
  6. #include <bio.h>
  7. #include "dat.h"
  8. #include "fns.h"
  9. #include "whack.h"
  10. int debug;
  11. int nofork;
  12. int mainstacksize = 256*1024;
  13. VtSrv *ventisrv;
  14. static void ventiserver(void*);
  15. static ulong
  16. freemem(void)
  17. {
  18. int nf, pgsize = 0;
  19. uvlong size, userpgs = 0, userused = 0;
  20. char *ln, *sl;
  21. char *fields[2];
  22. Biobuf *bp;
  23. size = 64*1024*1024;
  24. bp = Bopen("#c/swap", OREAD);
  25. if (bp != nil) {
  26. while ((ln = Brdline(bp, '\n')) != nil) {
  27. ln[Blinelen(bp)-1] = '\0';
  28. nf = tokenize(ln, fields, nelem(fields));
  29. if (nf != 2)
  30. continue;
  31. if (strcmp(fields[1], "pagesize") == 0)
  32. pgsize = atoi(fields[0]);
  33. else if (strcmp(fields[1], "user") == 0) {
  34. sl = strchr(fields[0], '/');
  35. if (sl == nil)
  36. continue;
  37. userpgs = atoll(sl+1);
  38. userused = atoll(fields[0]);
  39. }
  40. }
  41. Bterm(bp);
  42. if (pgsize > 0 && userpgs > 0)
  43. size = (userpgs - userused) * pgsize;
  44. }
  45. /* cap it to keep the size within 32 bits */
  46. if (size >= 3840UL * 1024 * 1024)
  47. size = 3840UL * 1024 * 1024;
  48. return size;
  49. }
  50. void
  51. usage(void)
  52. {
  53. fprint(2, "usage: venti [-Ldrsw] [-a ventiaddr] [-c config] "
  54. "[-h httpaddr] [-m %%] [-B blockcachesize] [-C cachesize] [-I icachesize] "
  55. "[-W webroot]\n");
  56. threadexitsall("usage");
  57. }
  58. void
  59. threadmain(int argc, char *argv[])
  60. {
  61. char *configfile, *haddr, *vaddr, *webroot;
  62. u32int mem, icmem, bcmem, minbcmem, mempcnt, stfree, aftblmfree, avail;
  63. vlong blmsize;
  64. Config config;
  65. traceinit();
  66. threadsetname("main");
  67. mempcnt = 0;
  68. vaddr = nil;
  69. haddr = nil;
  70. configfile = nil;
  71. webroot = nil;
  72. mem = 0;
  73. icmem = 0;
  74. bcmem = 0;
  75. stfree = 0;
  76. ARGBEGIN{
  77. case 'a':
  78. vaddr = EARGF(usage());
  79. break;
  80. case 'B':
  81. bcmem = unittoull(EARGF(usage()));
  82. break;
  83. case 'c':
  84. configfile = EARGF(usage());
  85. break;
  86. case 'C':
  87. mem = unittoull(EARGF(usage()));
  88. break;
  89. case 'D':
  90. settrace(EARGF(usage()));
  91. break;
  92. case 'd':
  93. debug = 1;
  94. nofork = 1;
  95. break;
  96. case 'h':
  97. haddr = EARGF(usage());
  98. break;
  99. case 'm':
  100. mempcnt = atoi(EARGF(usage()));
  101. if (mempcnt <= 0 || mempcnt >= 100)
  102. usage();
  103. break;
  104. case 'I':
  105. icmem = unittoull(EARGF(usage()));
  106. break;
  107. case 'L':
  108. ventilogging = 1;
  109. break;
  110. case 'r':
  111. readonly = 1;
  112. break;
  113. case 's':
  114. nofork = 1;
  115. break;
  116. case 'w': /* compatibility with old venti */
  117. queuewrites = 1;
  118. break;
  119. case 'W':
  120. webroot = EARGF(usage());
  121. break;
  122. default:
  123. usage();
  124. }ARGEND
  125. if(argc)
  126. usage();
  127. if(!nofork)
  128. rfork(RFNOTEG);
  129. #ifdef PLAN9PORT
  130. {
  131. /* sigh - needed to avoid signals when writing to hungup networks */
  132. struct sigaction sa;
  133. memset(&sa, 0, sizeof sa);
  134. sa.sa_handler = SIG_IGN;
  135. sigaction(SIGPIPE, &sa, nil);
  136. }
  137. #endif
  138. ventifmtinstall();
  139. trace(TraceQuiet, "venti started");
  140. fprint(2, "%T venti: ");
  141. if(configfile == nil)
  142. configfile = "venti.conf";
  143. /* automatic memory sizing? */
  144. if(mempcnt > 0)
  145. stfree = freemem();
  146. fprint(2, "conf...");
  147. if(initventi(configfile, &config) < 0)
  148. sysfatal("can't init server: %r");
  149. if(mainindex->bloom && loadbloom(mainindex->bloom) < 0)
  150. sysfatal("can't load bloom filter: %r");
  151. /* automatic memory sizing per venti(8) guidelines? */
  152. if(mempcnt > 0) {
  153. mem = bcmem = icmem = 0;
  154. aftblmfree = freemem();
  155. blmsize = stfree - aftblmfree;
  156. if (blmsize <= 0)
  157. blmsize = 0;
  158. avail = ((vlong)stfree * mempcnt) / 100 - blmsize;
  159. if (avail <= (1 + 2 + 6) * 1024 * 1024)
  160. fprint(2, "bloom filter bigger than mem pcnt; "
  161. "resorting to minimum values (9MB total)\n");
  162. else {
  163. if (avail >= 3840UL * 1024 * 1024)
  164. avail = 3840UL * 1024 * 1024; /* sanity */
  165. avail /= 2;
  166. icmem = avail;
  167. avail /= 3;
  168. mem = avail;
  169. bcmem = 2 * avail;
  170. }
  171. if (icmem < 6 * 1024 * 1024)
  172. icmem = 6 * 1024 * 1024;
  173. if (mem < 1 * 1024 * 1024) /* lumps */
  174. mem = 1 * 1024 * 1024;
  175. if (bcmem < 2 * 1024 * 1024)
  176. bcmem = 2 * 1024 * 1024;
  177. config.mem = mem;
  178. config.bcmem = bcmem;
  179. config.icmem = icmem;
  180. }
  181. if(mem == 0)
  182. mem = config.mem;
  183. if(bcmem == 0)
  184. bcmem = config.bcmem;
  185. if(icmem == 0)
  186. icmem = config.icmem;
  187. // fprint(2, "mem %d bcmem %d icmem %d...", mem, bcmem, icmem);
  188. if(haddr == nil)
  189. haddr = config.haddr;
  190. if(vaddr == nil)
  191. vaddr = config.vaddr;
  192. if(vaddr == nil)
  193. vaddr = "tcp!*!venti";
  194. if(webroot == nil)
  195. webroot = config.webroot;
  196. if(queuewrites == 0)
  197. queuewrites = config.queuewrites;
  198. if(haddr){
  199. fprint(2, "httpd %s...", haddr);
  200. if(httpdinit(haddr, webroot) < 0)
  201. fprint(2, "warning: can't start http server: %r");
  202. }
  203. fprint(2, "init...");
  204. if(mem == 0xffffffffUL)
  205. mem = 1 * 1024 * 1024;
  206. if(0) fprint(2, "initialize %d bytes of lump cache for %d lumps\n",
  207. mem, mem / (8 * 1024));
  208. initlumpcache(mem, mem / (8 * 1024));
  209. initicache(icmem);
  210. initicachewrite();
  211. /*
  212. * need a block for every arena and every process
  213. */
  214. minbcmem = maxblocksize *
  215. (mainindex->narenas + mainindex->nsects*4 + 16);
  216. if(bcmem < minbcmem)
  217. bcmem = minbcmem;
  218. if(0) fprint(2, "initialize %d bytes of disk block cache\n", bcmem);
  219. initdcache(bcmem);
  220. if(mainindex->bloom)
  221. startbloomproc(mainindex->bloom);
  222. fprint(2, "sync...");
  223. if(!readonly && syncindex(mainindex) < 0)
  224. sysfatal("can't sync server: %r");
  225. if(!readonly && queuewrites){
  226. fprint(2, "queue...");
  227. if(initlumpqueues(mainindex->nsects) < 0){
  228. fprint(2, "can't initialize lump queues,"
  229. " disabling write queueing: %r");
  230. queuewrites = 0;
  231. }
  232. }
  233. if(initarenasum() < 0)
  234. fprint(2, "warning: can't initialize arena summing process: %r");
  235. fprint(2, "announce %s...", vaddr);
  236. ventisrv = vtlisten(vaddr);
  237. if(ventisrv == nil)
  238. sysfatal("can't announce %s: %r", vaddr);
  239. fprint(2, "serving.\n");
  240. if(nofork)
  241. ventiserver(nil);
  242. else
  243. vtproc(ventiserver, nil);
  244. threadexits(nil);
  245. }
  246. static void
  247. vtrerror(VtReq *r, char *error)
  248. {
  249. r->rx.msgtype = VtRerror;
  250. r->rx.error = estrdup(error);
  251. }
  252. static void
  253. ventiserver(void *v)
  254. {
  255. Packet *p;
  256. VtReq *r;
  257. char err[ERRMAX];
  258. uint ms;
  259. int cached, ok;
  260. USED(v);
  261. threadsetname("ventiserver");
  262. trace(TraceWork, "start");
  263. while((r = vtgetreq(ventisrv)) != nil){
  264. trace(TraceWork, "finish");
  265. trace(TraceWork, "start request %F", &r->tx);
  266. trace(TraceRpc, "<- %F", &r->tx);
  267. r->rx.msgtype = r->tx.msgtype+1;
  268. addstat(StatRpcTotal, 1);
  269. if(0) print("req (arenas[0]=%p sects[0]=%p) %F\n",
  270. mainindex->arenas[0], mainindex->sects[0], &r->tx);
  271. switch(r->tx.msgtype){
  272. default:
  273. vtrerror(r, "unknown request");
  274. break;
  275. case VtTread:
  276. ms = msec();
  277. r->rx.data = readlump(r->tx.score, r->tx.blocktype, r->tx.count, &cached);
  278. ms = msec() - ms;
  279. addstat2(StatRpcRead, 1, StatRpcReadTime, ms);
  280. if(r->rx.data == nil){
  281. addstat(StatRpcReadFail, 1);
  282. rerrstr(err, sizeof err);
  283. vtrerror(r, err);
  284. }else{
  285. addstat(StatRpcReadBytes, packetsize(r->rx.data));
  286. addstat(StatRpcReadOk, 1);
  287. if(cached)
  288. addstat2(StatRpcReadCached, 1, StatRpcReadCachedTime, ms);
  289. else
  290. addstat2(StatRpcReadUncached, 1, StatRpcReadUncachedTime, ms);
  291. }
  292. break;
  293. case VtTwrite:
  294. if(readonly){
  295. vtrerror(r, "read only");
  296. break;
  297. }
  298. p = r->tx.data;
  299. r->tx.data = nil;
  300. addstat(StatRpcWriteBytes, packetsize(p));
  301. ms = msec();
  302. ok = writelump(p, r->rx.score, r->tx.blocktype, 0, ms);
  303. ms = msec() - ms;
  304. addstat2(StatRpcWrite, 1, StatRpcWriteTime, ms);
  305. if(ok < 0){
  306. addstat(StatRpcWriteFail, 1);
  307. rerrstr(err, sizeof err);
  308. vtrerror(r, err);
  309. }
  310. break;
  311. case VtTsync:
  312. flushqueue();
  313. flushdcache();
  314. break;
  315. }
  316. trace(TraceRpc, "-> %F", &r->rx);
  317. vtrespond(r);
  318. trace(TraceWork, "start");
  319. }
  320. flushdcache();
  321. flushicache();
  322. threadexitsall(0);
  323. }