123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251 |
- #include <u.h>
- #include <libc.h>
- #include <venti.h>
- #include "queue.h"
- long ventisendbytes, ventisendpackets;
- long ventirecvbytes, ventirecvpackets;
- static int
- _vtsend(VtConn *z, Packet *p)
- {
- IOchunk ioc;
- int n, tot;
- uchar buf[2];
- if(z->state != VtStateConnected) {
- werrstr("session not connected");
- return -1;
- }
- /* add framing */
- n = packetsize(p);
- if(n >= (1<<16)) {
- werrstr("packet too large");
- packetfree(p);
- return -1;
- }
- buf[0] = n>>8;
- buf[1] = n;
- packetprefix(p, buf, 2);
- ventisendbytes += n+2;
- ventisendpackets++;
- tot = 0;
- for(;;){
- n = packetfragments(p, &ioc, 1, 0);
- if(n == 0)
- break;
- if(write(z->outfd, ioc.addr, ioc.len) < ioc.len){
- vtlog(VtServerLog, "<font size=-1>%T %s:</font> sending packet %p: %r<br>\n", z->addr, p);
- packetfree(p);
- return -1;
- }
- packetconsume(p, nil, ioc.len);
- tot += ioc.len;
- }
- vtlog(VtServerLog, "<font size=-1>%T %s:</font> sent packet %p (%d bytes)<br>\n", z->addr, p, tot);
- packetfree(p);
- return 1;
- }
- static int
- interrupted(void)
- {
- char e[ERRMAX];
- rerrstr(e, sizeof e);
- return strstr(e, "interrupted") != nil;
- }
- static Packet*
- _vtrecv(VtConn *z)
- {
- uchar buf[10], *b;
- int n;
- Packet *p;
- int size, len;
- if(z->state != VtStateConnected) {
- werrstr("session not connected");
- return nil;
- }
- p = z->part;
- /* get enough for head size */
- size = packetsize(p);
- while(size < 2) {
- b = packettrailer(p, 2);
- assert(b != nil);
- if(0) fprint(2, "%d read hdr\n", getpid());
- n = read(z->infd, b, 2);
- if(0) fprint(2, "%d got %d (%r)\n", getpid(), n);
- if(n==0 || (n<0 && !interrupted()))
- goto Err;
- size += n;
- packettrim(p, 0, size);
- }
- if(packetconsume(p, buf, 2) < 0)
- goto Err;
- len = (buf[0] << 8) | buf[1];
- size -= 2;
- while(size < len) {
- n = len - size;
- if(n > MaxFragSize)
- n = MaxFragSize;
- b = packettrailer(p, n);
- if(0) fprint(2, "%d read body %d\n", getpid(), n);
- n = read(z->infd, b, n);
- if(0) fprint(2, "%d got %d (%r)\n", getpid(), n);
- if(n > 0)
- size += n;
- packettrim(p, 0, size);
- if(n==0 || (n<0 && !interrupted()))
- goto Err;
- }
- ventirecvbytes += len;
- ventirecvpackets++;
- p = packetsplit(p, len);
- vtlog(VtServerLog, "<font size=-1>%T %s:</font> read packet %p len %d<br>\n", z->addr, p, len);
- return p;
- Err:
- vtlog(VtServerLog, "<font size=-1>%T %s:</font> error reading packet: %r<br>\n", z->addr);
- return nil;
- }
- /*
- * If you fork off two procs running vtrecvproc and vtsendproc,
- * then vtrecv/vtsend (and thus vtrpc) will never block except on
- * rendevouses, which is nice when it's running in one thread of many.
- */
- void
- vtrecvproc(void *v)
- {
- Packet *p;
- VtConn *z;
- Queue *q;
- z = v;
- q = _vtqalloc();
- qlock(&z->lk);
- z->readq = q;
- qlock(&z->inlk);
- rwakeup(&z->rpcfork);
- qunlock(&z->lk);
- while((p = _vtrecv(z)) != nil)
- if(_vtqsend(q, p) < 0){
- packetfree(p);
- break;
- }
- qunlock(&z->inlk);
- qlock(&z->lk);
- _vtqhangup(q);
- while((p = _vtnbqrecv(q)) != nil)
- packetfree(p);
- _vtqdecref(q);
- z->readq = nil;
- rwakeup(&z->rpcfork);
- qunlock(&z->lk);
- vthangup(z);
- }
- void
- vtsendproc(void *v)
- {
- Queue *q;
- Packet *p;
- VtConn *z;
- z = v;
- q = _vtqalloc();
- qlock(&z->lk);
- z->writeq = q;
- qlock(&z->outlk);
- rwakeup(&z->rpcfork);
- qunlock(&z->lk);
- while((p = _vtqrecv(q)) != nil)
- if(_vtsend(z, p) < 0)
- break;
- qunlock(&z->outlk);
- qlock(&z->lk);
- _vtqhangup(q);
- while((p = _vtnbqrecv(q)) != nil)
- packetfree(p);
- _vtqdecref(q);
- z->writeq = nil;
- rwakeup(&z->rpcfork);
- qunlock(&z->lk);
- return;
- }
- Packet*
- vtrecv(VtConn *z)
- {
- Packet *p;
- Queue *q;
- qlock(&z->lk);
- if(z->state != VtStateConnected){
- werrstr("not connected");
- qunlock(&z->lk);
- return nil;
- }
- if(z->readq){
- q = _vtqincref(z->readq);
- qunlock(&z->lk);
- p = _vtqrecv(q);
- _vtqdecref(q);
- return p;
- }
- qlock(&z->inlk);
- qunlock(&z->lk);
- p = _vtrecv(z);
- qunlock(&z->inlk);
- if(!p)
- vthangup(z);
- return p;
- }
- int
- vtsend(VtConn *z, Packet *p)
- {
- Queue *q;
- qlock(&z->lk);
- if(z->state != VtStateConnected){
- packetfree(p);
- werrstr("not connected");
- qunlock(&z->lk);
- return -1;
- }
- if(z->writeq){
- q = _vtqincref(z->writeq);
- qunlock(&z->lk);
- if(_vtqsend(q, p) < 0){
- _vtqdecref(q);
- packetfree(p);
- return -1;
- }
- _vtqdecref(q);
- return 0;
- }
- qlock(&z->outlk);
- qunlock(&z->lk);
- if(_vtsend(z, p) < 0){
- qunlock(&z->outlk);
- vthangup(z);
- return -1;
- }
- qunlock(&z->outlk);
- return 0;
- }
|