curvecpmessage.c 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654
  1. #include <sys/types.h>
  2. #include <sys/wait.h>
  3. #include <unistd.h>
  4. #include <signal.h>
  5. #include <poll.h>
  6. #include "open.h"
  7. #include "blocking.h"
  8. #include "e.h"
  9. #include "die.h"
  10. #include "randommod.h"
  11. #include "byte.h"
  12. #include "crypto_uint32.h"
  13. #include "uint16_pack.h"
  14. #include "uint32_pack.h"
  15. #include "uint64_pack.h"
  16. #include "uint16_unpack.h"
  17. #include "uint32_unpack.h"
  18. #include "uint64_unpack.h"
  19. #include "nanoseconds.h"
  20. #include "writeall.h"
  /* Verbosity: 0 after -q, 1 by default or after -Q, 2 (then 3) after -v. */
  int flagverbose = 1;

  /* Nonzero when running as a server (default, -s); zero after -c or -C. */
  int flagserver = 1;

  /* 1: ping after a second; 2: ping immediately */
  int wantping = 0;

  /* Usage text handed to the die_* helpers by die_usage(). */
  #define USAGE \
    "curvecpmessage: how to use:\n" \
    "curvecpmessage: -q (optional): no error messages\n" \
    "curvecpmessage: -Q (optional): print error messages (default)\n" \
    "curvecpmessage: -v (optional): print extra information\n" \
    "curvecpmessage: -c (optional): program is a client; server starts first\n" \
    "curvecpmessage: -C (optional): program is a client that starts first\n" \
    "curvecpmessage: -s (optional): program is a server (default)\n" \
    "curvecpmessage: prog: run this program\n"
  34. void die_usage(const char *s)
  35. {
  36. if (s) die_4(100,USAGE,"curvecpmessage: fatal: ",s,"\n");
  37. die_1(100,USAGE);
  38. }
  39. void die_fatal(const char *trouble,const char *d,const char *fn)
  40. {
  41. if (!flagverbose) die_0(111);
  42. if (d) {
  43. if (fn) die_9(111,"curvecpmessage: fatal: ",trouble," ",d,"/",fn,": ",e_str(errno),"\n");
  44. die_7(111,"curvecpmessage: fatal: ",trouble," ",d,": ",e_str(errno),"\n");
  45. }
  46. if (errno) die_5(111,"curvecpmessage: fatal: ",trouble,": ",e_str(errno),"\n");
  47. die_3(111,"curvecpmessage: fatal: ",trouble,"\n");
  48. }
  /*
   * Fatal exit for malformed or truncated input on descriptor 8:
   * sets errno to EPROTO and reports it as a read failure.
   */
  void die_badmessage(void)
  {
    errno = EPROTO;
    die_fatal(
      "unable to read from file descriptor 8",
      0,
      0
    );
  }
  /*
   * Fatal exit for a violated internal invariant:
   * sets errno to EPROTO and reports "internal error".
   */
  void die_internalerror(void)
  {
    errno = EPROTO;
    die_fatal(
      "internal error",
      0,
      0
    );
  }
  59. int tochild[2] = {-1,-1};
  60. int fromchild[2] = {-1,-1};
  61. pid_t child = -1;
  62. int childstatus;
  63. struct pollfd p[3];
  64. long long sendacked = 0; /* number of initial bytes sent and fully acknowledged */
  65. long long sendbytes = 0; /* number of additional bytes to send */
  66. unsigned char sendbuf[131072]; /* circular queue with the additional bytes; size must be power of 2 */
  67. long long sendprocessed = 0; /* within sendbytes, number of bytes absorbed into blocks */
  68. crypto_uint16 sendeof = 0; /* 2048 for normal eof after sendbytes, 4096 for error after sendbytes */
  69. int sendeofprocessed = 0;
  70. int sendeofacked = 0;
  71. long long totalblocktransmissions = 0;
  72. long long totalblocks = 0;
  73. #define OUTGOING 128 /* must be power of 2 */
  74. long long blocknum = 0; /* number of outgoing blocks being tracked */
  75. long long blockfirst = 0; /* circular queue */
  76. long long blockpos[OUTGOING]; /* position of block's first byte within stream */
  77. long long blocklen[OUTGOING]; /* number of bytes in this block */
  78. crypto_uint16 blockeof[OUTGOING]; /* 0, 2048, 4096 */
  79. long long blocktransmissions[OUTGOING];
  80. long long blocktime[OUTGOING]; /* time of last message sending this block; 0 means acked */
  81. long long earliestblocktime = 0; /* if nonzero, minimum of active blocktime values */
  82. crypto_uint32 blockid[OUTGOING]; /* ID of last message sending this block */
  83. #define INCOMING 64 /* must be power of 2 */
  84. long long messagenum = 0; /* number of messages in incoming queue */
  85. long long messagefirst = 0; /* position of first message; circular queue */
  86. unsigned char messagelen[INCOMING]; /* times 16 */
  87. unsigned char message[INCOMING][1088];
  88. unsigned char messagetodo[2048];
  89. long long messagetodolen = 0;
  90. long long receivebytes = 0; /* number of initial bytes fully received */
  91. long long receivewritten = 0; /* within receivebytes, number of bytes given to child */
  92. crypto_uint16 receiveeof = 0; /* 0, 2048, 4096 */
  93. long long receivetotalbytes = 0; /* total number of bytes in stream, if receiveeof */
  94. unsigned char receivebuf[131072]; /* circular queue beyond receivewritten; size must be power of 2 */
  95. unsigned char receivevalid[131072]; /* 1 for byte successfully received; XXX: use buddy structure to speed this up */
  96. long long maxblocklen = 512;
  97. crypto_uint32 nextmessageid = 1;
  98. unsigned char buf[4096];
  99. long long lastblocktime = 0;
  100. long long nsecperblock = 1000000000;
  101. long long lastspeedadjustment = 0;
  102. long long lastedge = 0;
  103. long long lastdoubling = 0;
  104. long long rtt;
  105. long long rtt_delta;
  106. long long rtt_average = 0;
  107. long long rtt_deviation = 0;
  108. long long rtt_lowwater = 0;
  109. long long rtt_highwater = 0;
  110. long long rtt_timeout = 1000000000;
  111. long long rtt_seenrecenthigh = 0;
  112. long long rtt_seenrecentlow = 0;
  113. long long rtt_seenolderhigh = 0;
  114. long long rtt_seenolderlow = 0;
  115. long long rtt_phase = 0;
  116. long long lastpanic = 0;
  117. void earliestblocktime_compute(void) /* XXX: use priority queue */
  118. {
  119. long long i;
  120. long long pos;
  121. earliestblocktime = 0;
  122. for (i = 0;i < blocknum;++i) {
  123. pos = (blockfirst + i) & (OUTGOING - 1);
  124. if (blocktime[pos]) {
  125. if (!earliestblocktime)
  126. earliestblocktime = blocktime[pos];
  127. else
  128. if (blocktime[pos] < earliestblocktime)
  129. earliestblocktime = blocktime[pos];
  130. }
  131. }
  132. }
  133. void acknowledged(unsigned long long start,unsigned long long stop)
  134. {
  135. long long i;
  136. long long pos;
  137. if (stop == start) return;
  138. for (i = 0;i < blocknum;++i) {
  139. pos = (blockfirst + i) & (OUTGOING - 1);
  140. if (blockpos[pos] >= start && blockpos[pos] + blocklen[pos] <= stop) {
  141. blocktime[pos] = 0;
  142. totalblocktransmissions += blocktransmissions[pos];
  143. totalblocks += 1;
  144. }
  145. }
  146. while (blocknum) {
  147. pos = blockfirst & (OUTGOING - 1);
  148. if (blocktime[pos]) break;
  149. sendacked += blocklen[pos];
  150. sendbytes -= blocklen[pos];
  151. sendprocessed -= blocklen[pos];
  152. ++blockfirst;
  153. --blocknum;
  154. }
  155. if (sendeof)
  156. if (start == 0)
  157. if (stop > sendacked + sendbytes)
  158. if (!sendeofacked) {
  159. sendeofacked = 1;
  160. }
  161. earliestblocktime_compute();
  162. }
  163. int main(int argc,char **argv)
  164. {
  165. long long pos;
  166. long long len;
  167. long long u;
  168. long long r;
  169. long long i;
  170. long long k;
  171. long long recent;
  172. long long nextaction;
  173. long long timeout;
  174. struct pollfd *q;
  175. struct pollfd *watch8;
  176. struct pollfd *watchtochild;
  177. struct pollfd *watchfromchild;
  178. signal(SIGPIPE,SIG_IGN);
  179. if (!argv[0]) die_usage(0);
  180. for (;;) {
  181. char *x;
  182. if (!argv[1]) break;
  183. if (argv[1][0] != '-') break;
  184. x = *++argv;
  185. if (x[0] == '-' && x[1] == 0) break;
  186. if (x[0] == '-' && x[1] == '-' && x[2] == 0) break;
  187. while (*++x) {
  188. if (*x == 'q') { flagverbose = 0; continue; }
  189. if (*x == 'Q') { flagverbose = 1; continue; }
  190. if (*x == 'v') { if (flagverbose == 2) flagverbose = 3; else flagverbose = 2; continue; }
  191. if (*x == 'c') { flagserver = 0; wantping = 2; continue; }
  192. if (*x == 'C') { flagserver = 0; wantping = 1; continue; }
  193. if (*x == 's') { flagserver = 1; wantping = 0; continue; }
  194. die_usage(0);
  195. }
  196. }
  197. if (!*++argv) die_usage("missing prog");
  198. for (;;) {
  199. r = open_read("/dev/null");
  200. if (r == -1) die_fatal("unable to open /dev/null",0,0);
  201. if (r > 9) { close(r); break; }
  202. }
  203. if (open_pipe(tochild) == -1) die_fatal("unable to create pipe",0,0);
  204. if (open_pipe(fromchild) == -1) die_fatal("unable to create pipe",0,0);
  205. blocking_enable(tochild[0]);
  206. blocking_enable(fromchild[1]);
  207. child = fork();
  208. if (child == -1) die_fatal("unable to fork",0,0);
  209. if (child == 0) {
  210. close(8);
  211. close(9);
  212. if (flagserver) {
  213. close(0);
  214. if (dup(tochild[0]) != 0) die_fatal("unable to dup",0,0);
  215. close(1);
  216. if (dup(fromchild[1]) != 1) die_fatal("unable to dup",0,0);
  217. } else {
  218. close(6);
  219. if (dup(tochild[0]) != 6) die_fatal("unable to dup",0,0);
  220. close(7);
  221. if (dup(fromchild[1]) != 7) die_fatal("unable to dup",0,0);
  222. }
  223. signal(SIGPIPE,SIG_DFL);
  224. execvp(*argv,argv);
  225. die_fatal("unable to run",*argv,0);
  226. }
  227. close(tochild[0]);
  228. close(fromchild[1]);
  229. recent = nanoseconds();
  230. lastspeedadjustment = recent;
  231. if (flagserver) maxblocklen = 1024;
  232. for (;;) {
  233. if (sendeofacked)
  234. if (receivewritten == receivetotalbytes)
  235. if (receiveeof)
  236. if (tochild[1] < 0)
  237. break; /* XXX: to re-ack should enter a TIME-WAIT state here */
  238. q = p;
  239. watch8 = q;
  240. if (watch8) { q->fd = 8; q->events = POLLIN; ++q; }
  241. watchtochild = q;
  242. if (tochild[1] < 0) watchtochild = 0;
  243. if (receivewritten >= receivebytes) watchtochild = 0;
  244. if (watchtochild) { q->fd = tochild[1]; q->events = POLLOUT; ++q; }
  245. watchfromchild = q;
  246. if (sendeof) watchfromchild = 0;
  247. if (sendbytes + 4096 > sizeof sendbuf) watchfromchild = 0;
  248. if (watchfromchild) { q->fd = fromchild[0]; q->events = POLLIN; ++q; }
  249. nextaction = recent + 60000000000LL;
  250. if (wantping == 1) nextaction = recent + 1000000000;
  251. if (wantping == 2)
  252. if (nextaction > lastblocktime + nsecperblock) nextaction = lastblocktime + nsecperblock;
  253. if (blocknum < OUTGOING)
  254. if (!(sendeof ? sendeofprocessed : sendprocessed >= sendbytes))
  255. if (nextaction > lastblocktime + nsecperblock) nextaction = lastblocktime + nsecperblock;
  256. if (earliestblocktime)
  257. if (earliestblocktime + rtt_timeout > lastblocktime + nsecperblock)
  258. if (earliestblocktime + rtt_timeout < nextaction)
  259. nextaction = earliestblocktime + rtt_timeout;
  260. if (messagenum)
  261. if (!watchtochild)
  262. nextaction = 0;
  263. if (nextaction <= recent)
  264. timeout = 0;
  265. else
  266. timeout = (nextaction - recent) / 1000000 + 1;
  267. if (poll(p,q - p,timeout) < 0) {
  268. watch8 = 0;
  269. watchtochild = 0;
  270. watchfromchild = 0;
  271. } else {
  272. if (watch8) if (!watch8->revents) watch8 = 0;
  273. if (watchtochild) if (!watchtochild->revents) watchtochild = 0;
  274. if (watchfromchild) if (!watchfromchild->revents) watchfromchild = 0;
  275. }
  276. /* XXX: keepalives */
  277. do { /* try receiving data from child: */
  278. if (!watchfromchild) break;
  279. if (sendeof) break;
  280. if (sendbytes + 4096 > sizeof sendbuf) break;
  281. pos = (sendacked & (sizeof sendbuf - 1)) + sendbytes;
  282. if (pos < sizeof sendbuf) {
  283. r = read(fromchild[0],sendbuf + pos,sizeof sendbuf - pos);
  284. } else {
  285. r = read(fromchild[0],sendbuf + pos - sizeof sendbuf,sizeof sendbuf - sendbytes);
  286. }
  287. if (r == -1) if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN) break;
  288. if (r < 0) { sendeof = 4096; break; }
  289. if (r == 0) { sendeof = 2048; break; }
  290. sendbytes += r;
  291. if (sendbytes >= 1152921504606846976LL) die_internalerror();
  292. } while(0);
  293. recent = nanoseconds();
  294. do { /* try re-sending an old block: */
  295. if (recent < lastblocktime + nsecperblock) break;
  296. if (earliestblocktime == 0) break;
  297. if (recent < earliestblocktime + rtt_timeout) break;
  298. for (i = 0;i < blocknum;++i) {
  299. pos = (blockfirst + i) & (OUTGOING - 1);
  300. if (blocktime[pos] == earliestblocktime) {
  301. if (recent > lastpanic + 4 * rtt_timeout) {
  302. nsecperblock *= 2;
  303. lastpanic = recent;
  304. lastedge = recent;
  305. }
  306. goto sendblock;
  307. }
  308. }
  309. } while(0);
  310. do { /* try sending a new block: */
  311. if (recent < lastblocktime + nsecperblock) break;
  312. if (blocknum >= OUTGOING) break;
  313. if (!wantping)
  314. if (sendeof ? sendeofprocessed : sendprocessed >= sendbytes) break;
  315. /* XXX: if any Nagle-type processing is desired, do it here */
  316. pos = (blockfirst + blocknum) & (OUTGOING - 1);
  317. ++blocknum;
  318. blockpos[pos] = sendacked + sendprocessed;
  319. blocklen[pos] = sendbytes - sendprocessed;
  320. if (blocklen[pos] > maxblocklen) blocklen[pos] = maxblocklen;
  321. if ((blockpos[pos] & (sizeof sendbuf - 1)) + blocklen[pos] > sizeof sendbuf)
  322. blocklen[pos] = sizeof sendbuf - (blockpos[pos] & (sizeof sendbuf - 1));
  323. /* XXX: or could have the full block in post-buffer space */
  324. sendprocessed += blocklen[pos];
  325. blockeof[pos] = 0;
  326. if (sendprocessed == sendbytes) {
  327. blockeof[pos] = sendeof;
  328. if (sendeof) sendeofprocessed = 1;
  329. }
  330. blocktransmissions[pos] = 0;
  331. sendblock:
  332. blocktransmissions[pos] += 1;
  333. blocktime[pos] = recent;
  334. blockid[pos] = nextmessageid;
  335. if (!++nextmessageid) ++nextmessageid;
  336. /* constraints: u multiple of 16; u >= 16; u <= 1088; u >= 48 + blocklen[pos] */
  337. u = 64 + blocklen[pos];
  338. if (u <= 192) u = 192;
  339. else if (u <= 320) u = 320;
  340. else if (u <= 576) u = 576;
  341. else if (u <= 1088) u = 1088;
  342. else die_internalerror();
  343. if (blocklen[pos] < 0 || blocklen[pos] > 1024) die_internalerror();
  344. byte_zero(buf + 8,u);
  345. buf[7] = u / 16;
  346. uint32_pack(buf + 8,blockid[pos]);
  347. /* XXX: include any acknowledgments that have piled up */
  348. uint16_pack(buf + 46,blockeof[pos] | (crypto_uint16) blocklen[pos]);
  349. uint64_pack(buf + 48,blockpos[pos]);
  350. byte_copy(buf + 8 + u - blocklen[pos],blocklen[pos],sendbuf + (blockpos[pos] & (sizeof sendbuf - 1)));
  351. if (writeall(9,buf + 7,u + 1) == -1) die_fatal("unable to write descriptor 9",0,0);
  352. lastblocktime = recent;
  353. wantping = 0;
  354. earliestblocktime_compute();
  355. } while(0);
  356. do { /* try receiving messages: */
  357. if (!watch8) break;
  358. r = read(8,buf,sizeof buf);
  359. if (r == -1) if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN) break;
  360. if (r == 0) die_badmessage();
  361. if (r < 0) die_fatal("unable to read from file descriptor 8",0,0);
  362. for (k = 0;k < r;++k) {
  363. messagetodo[messagetodolen++] = buf[k];
  364. u = 16 * (unsigned long long) messagetodo[0];
  365. if (u < 16) die_badmessage();
  366. if (u > 1088) die_badmessage();
  367. if (messagetodolen == 1 + u) {
  368. if (messagenum < INCOMING) {
  369. pos = (messagefirst + messagenum) & (INCOMING - 1);
  370. messagelen[pos] = messagetodo[0];
  371. byte_copy(message[pos],u,messagetodo + 1);
  372. ++messagenum;
  373. } else {
  374. ; /* drop tail */
  375. }
  376. messagetodolen = 0;
  377. }
  378. }
  379. } while(0);
  380. do { /* try processing a message: */
  381. if (!messagenum) break;
  382. if (tochild[1] >= 0 && receivewritten < receivebytes) break;
  383. maxblocklen = 1024;
  384. pos = messagefirst & (INCOMING - 1);
  385. len = 16 * (unsigned long long) messagelen[pos];
  386. do { /* handle this message if it's comprehensible: */
  387. unsigned long long D;
  388. unsigned long long SF;
  389. unsigned long long startbyte;
  390. unsigned long long stopbyte;
  391. crypto_uint32 id;
  392. long long i;
  393. if (len < 48) break;
  394. if (len > 1088) break;
  395. id = uint32_unpack(message[pos] + 4);
  396. for (i = 0;i < blocknum;++i) {
  397. k = (blockfirst + i) & (OUTGOING - 1);
  398. if (blockid[k] == id) {
  399. rtt = recent - blocktime[k];
  400. if (!rtt_average) {
  401. nsecperblock = rtt;
  402. rtt_average = rtt;
  403. rtt_deviation = rtt / 2;
  404. rtt_highwater = rtt;
  405. rtt_lowwater = rtt;
  406. }
  407. /* Jacobson's retransmission timeout calculation: */
  408. rtt_delta = rtt - rtt_average;
  409. rtt_average += rtt_delta / 8;
  410. if (rtt_delta < 0) rtt_delta = -rtt_delta;
  411. rtt_delta -= rtt_deviation;
  412. rtt_deviation += rtt_delta / 4;
  413. rtt_timeout = rtt_average + 4 * rtt_deviation;
  414. /* adjust for delayed acks with anti-spiking: */
  415. rtt_timeout += 8 * nsecperblock;
  416. /* recognizing top and bottom of congestion cycle: */
  417. rtt_delta = rtt - rtt_highwater;
  418. rtt_highwater += rtt_delta / 1024;
  419. rtt_delta = rtt - rtt_lowwater;
  420. if (rtt_delta > 0) rtt_lowwater += rtt_delta / 8192;
  421. else rtt_lowwater += rtt_delta / 256;
  422. if (rtt_average > rtt_highwater + 5000000) rtt_seenrecenthigh = 1;
  423. else if (rtt_average < rtt_lowwater) rtt_seenrecentlow = 1;
  424. if (recent >= lastspeedadjustment + 16 * nsecperblock) {
  425. if (recent - lastspeedadjustment > 10000000000LL) {
  426. nsecperblock = 1000000000; /* slow restart */
  427. nsecperblock += randommod(nsecperblock / 8);
  428. }
  429. lastspeedadjustment = recent;
  430. if (nsecperblock >= 131072) {
  431. /* additive increase: adjust 1/N by a constant c */
  432. /* rtt-fair additive increase: adjust 1/N by a constant c every nanosecond */
  433. /* approximation: adjust 1/N by cN every N nanoseconds */
  434. /* i.e., N <- 1/(1/N + cN) = N/(1 + cN^2) every N nanoseconds */
  435. if (nsecperblock < 16777216) {
  436. /* N/(1+cN^2) approx N - cN^3 */
  437. u = nsecperblock / 131072;
  438. nsecperblock -= u * u * u;
  439. } else {
  440. double d = nsecperblock;
  441. nsecperblock = d/(1 + d*d / 2251799813685248.0);
  442. }
  443. }
  444. if (rtt_phase == 0) {
  445. if (rtt_seenolderhigh) {
  446. rtt_phase = 1;
  447. lastedge = recent;
  448. nsecperblock += randommod(nsecperblock / 4);
  449. }
  450. } else {
  451. if (rtt_seenolderlow) {
  452. rtt_phase = 0;
  453. }
  454. }
  455. rtt_seenolderhigh = rtt_seenrecenthigh;
  456. rtt_seenolderlow = rtt_seenrecentlow;
  457. rtt_seenrecenthigh = 0;
  458. rtt_seenrecentlow = 0;
  459. }
  460. do {
  461. if (recent - lastedge < 60000000000LL) {
  462. if (recent < lastdoubling + 4 * nsecperblock + 64 * rtt_timeout + 5000000000LL) break;
  463. } else {
  464. if (recent < lastdoubling + 4 * nsecperblock + 2 * rtt_timeout) break;
  465. }
  466. if (nsecperblock <= 65535) break;
  467. nsecperblock /= 2;
  468. lastdoubling = recent;
  469. if (lastedge) lastedge = recent;
  470. } while(0);
  471. }
  472. }
  473. stopbyte = uint64_unpack(message[pos] + 8);
  474. acknowledged(0,stopbyte);
  475. startbyte = stopbyte + (unsigned long long) uint32_unpack(message[pos] + 16);
  476. stopbyte = startbyte + (unsigned long long) uint16_unpack(message[pos] + 20);
  477. acknowledged(startbyte,stopbyte);
  478. startbyte = stopbyte + (unsigned long long) uint16_unpack(message[pos] + 22);
  479. stopbyte = startbyte + (unsigned long long) uint16_unpack(message[pos] + 24);
  480. acknowledged(startbyte,stopbyte);
  481. startbyte = stopbyte + (unsigned long long) uint16_unpack(message[pos] + 26);
  482. stopbyte = startbyte + (unsigned long long) uint16_unpack(message[pos] + 28);
  483. acknowledged(startbyte,stopbyte);
  484. startbyte = stopbyte + (unsigned long long) uint16_unpack(message[pos] + 30);
  485. stopbyte = startbyte + (unsigned long long) uint16_unpack(message[pos] + 32);
  486. acknowledged(startbyte,stopbyte);
  487. startbyte = stopbyte + (unsigned long long) uint16_unpack(message[pos] + 34);
  488. stopbyte = startbyte + (unsigned long long) uint16_unpack(message[pos] + 36);
  489. acknowledged(startbyte,stopbyte);
  490. D = uint16_unpack(message[pos] + 38);
  491. SF = D & (2048 + 4096);
  492. D -= SF;
  493. if (D > 1024) break;
  494. if (48 + D > len) break;
  495. startbyte = uint64_unpack(message[pos] + 40);
  496. stopbyte = startbyte + D;
  497. if (stopbyte > receivewritten + sizeof receivebuf) {
  498. break;
  499. /* of course, flow control would avoid this case */
  500. }
  501. if (SF) {
  502. receiveeof = SF;
  503. receivetotalbytes = stopbyte;
  504. }
  505. for (k = 0;k < D;++k) {
  506. unsigned char ch = message[pos][len - D + k];
  507. unsigned long long where = startbyte + k;
  508. if (where >= receivewritten && where < receivewritten + sizeof receivebuf) {
  509. receivevalid[where & (sizeof receivebuf - 1)] = 1;
  510. receivebuf[where & (sizeof receivebuf - 1)] = ch;
  511. }
  512. }
  513. for (;;) {
  514. if (receivebytes >= receivewritten + sizeof receivebuf) break;
  515. if (!receivevalid[receivebytes & (sizeof receivebuf - 1)]) break;
  516. ++receivebytes;
  517. }
  518. if (!uint32_unpack(message[pos])) break; /* never acknowledge a pure acknowledgment */
  519. /* XXX: delay acknowledgments */
  520. u = 192;
  521. byte_zero(buf + 8,u);
  522. buf[7] = u / 16;
  523. byte_copy(buf + 12,4,message[pos]);
  524. if (receiveeof && receivebytes == receivetotalbytes) {
  525. uint64_pack(buf + 16,receivebytes + 1);
  526. } else
  527. uint64_pack(buf + 16,receivebytes);
  528. /* XXX: incorporate selective acknowledgments */
  529. if (writeall(9,buf + 7,u + 1) == -1) die_fatal("unable to write descriptor 9",0,0);
  530. } while(0);
  531. ++messagefirst;
  532. --messagenum;
  533. } while(0);
  534. do { /* try sending data to child: */
  535. if (!watchtochild) break;
  536. if (tochild[1] < 0) { receivewritten = receivebytes; break; }
  537. if (receivewritten >= receivebytes) break;
  538. pos = receivewritten & (sizeof receivebuf - 1);
  539. len = receivebytes - receivewritten;
  540. if (pos + len > sizeof receivebuf) len = sizeof receivebuf - pos;
  541. r = write(tochild[1],receivebuf + pos,len);
  542. if (r == -1) if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN) break;
  543. if (r <= 0) {
  544. close(tochild[1]);
  545. tochild[1] = -1;
  546. break;
  547. }
  548. byte_zero(receivevalid + pos,r);
  549. receivewritten += r;
  550. } while(0);
  551. do { /* try closing pipe to child: */
  552. if (!receiveeof) break;
  553. if (receivewritten < receivetotalbytes) break;
  554. if (tochild[1] < 0) break;
  555. if (receiveeof == 4096)
  556. ; /* XXX: UNIX doesn't provide a way to signal an error through a pipe */
  557. close(tochild[1]);
  558. tochild[1] = -1;
  559. } while(0);
  560. }
  561. do {
  562. r = waitpid(child,&childstatus,0);
  563. } while (r == -1 && errno == EINTR);
  564. if (!WIFEXITED(childstatus)) { errno = 0; die_fatal("process killed by signal",0,0); }
  565. return WEXITSTATUS(childstatus);
  566. }