فهرست منبع

Added beginnings of peerLink, for the moment all it does is track kbps of links

Caleb James DeLisle 9 سال پیش
والد
کامیت
63e23e921e
9فایلهای تغییر یافته به همراه141 افزوده شده و 32 حذف شده
  1. 5 0
      net/InterfaceController.c
  2. 3 0
      net/InterfaceController.h
  3. 7 0
      net/InterfaceController_admin.c
  4. 18 31
      net/PeerLink.c
  5. 8 0
      net/PeerLink.h
  6. 2 1
      tools/peerStats
  7. 2 0
      util/Constant.h
  8. 11 0
      util/Constant.js
  9. 85 0
      util/Kbps.h

+ 5 - 0
net/InterfaceController.c

@@ -907,6 +907,11 @@ int InterfaceController_getPeerStats(struct InterfaceController* ifController,
             s->duplicates = rp->duplicates;
             s->lostPackets = rp->lostPackets;
             s->receivedOutOfRange = rp->receivedOutOfRange;
+
+            struct PeerLink_Kbps kbps;
+            PeerLink_kbps(peer->peerLink, &kbps);
+            s->sendKbps = kbps.sendKbps;
+            s->recvKbps = kbps.recvKbps;
         }
     }
 

+ 3 - 0
net/InterfaceController.h

@@ -87,6 +87,9 @@ struct InterfaceController_PeerStats
     uint32_t duplicates;
     uint32_t lostPackets;
     uint32_t receivedOutOfRange;
+
+    uint32_t sendKbps;
+    uint32_t recvKbps;
 };
 
 struct InterfaceController

+ 7 - 0
net/InterfaceController_admin.c

@@ -54,6 +54,9 @@ static void adminPeerStats(Dict* args, void* vcontext, String* txid, struct Allo
     String* user = String_CONST("user");
     String* version = String_CONST("version");
 
+    String* recvKbps = String_CONST("recvKbps");
+    String* sendKbps = String_CONST("sendKbps");
+
     String* duplicates = String_CONST("duplicates");
     String* lostPackets = String_CONST("lostPackets");
     String* receivedOutOfRange = String_CONST("receivedOutOfRange");
@@ -63,6 +66,10 @@ static void adminPeerStats(Dict* args, void* vcontext, String* txid, struct Allo
         Dict* d = Dict_new(alloc);
         Dict_putInt(d, bytesIn, stats[i].bytesIn, alloc);
         Dict_putInt(d, bytesOut, stats[i].bytesOut, alloc);
+
+        Dict_putInt(d, recvKbps, stats[i].recvKbps, alloc);
+        Dict_putInt(d, sendKbps, stats[i].sendKbps, alloc);
+
         Dict_putString(d, addr, Address_toString(&stats[i].addr, alloc), alloc);
         Dict_putString(d, pubKey, Key_stringify(stats[i].addr.key, alloc), alloc);
 

+ 18 - 31
net/PeerLink.c

@@ -15,53 +15,32 @@
 #include "memory/Allocator.h"
 #include "net/PeerLink.h"
 #include "util/Identity.h"
+#include "util/Kbps.h"
 #include "wire/SwitchHeader.h"
+#include "util/events/Time.h"
 
 #define ArrayList_TYPE struct Message
 #define ArrayList_NAME Messages
 #include "util/ArrayList.h"
 
-struct Bandwidth
-{
-    uint32_t lastMessageTime;
-    struct Average avg;
-};
-
 struct PeerLink_pvt
 {
     struct PeerLink pub;
     struct Allocator* alloc;
     struct EventBase* base;
     struct ArrayList_Messages* queue;
-    struct Bandwidth sendBw;
-    struct Bandwidth recvBw;
+    struct Kbps sendBw;
+    struct Kbps recvBw;
     Identity
 };
 
-/**
- * messageLength / millisecondsSinceLast -> bytes per millisecond
- * (messageLength << 10) / millisecondsSinceLast -> bytes per second
- */
-static uint64_t instantaniousBandwidth(uint64_t messageLength, uint64_t millisecondsSinceLast)
-{
-    return (messageLength << 10) / millisecondsSinceLast;
-}
-
-static void calcNextBandwidth(struct Bandwidth* bw, int messageLength, struct PeerLink_pvt* pl)
-{
-    uint64_t now = Time_currentTimeMilliseconds(pl->base);
-    if (now <= bw->lastMessageTime) { now = bw->lastMessageTime; }
-    uint64_t ibw = instantaniousBandwidth(messageLength, now - bw->lastMessageTime);
-    
-}
-
 struct Message* PeerLink_poll(struct PeerLink* peerLink)
 {
     struct PeerLink_pvt* pl = Identity_check((struct PeerLink_pvt*) peerLink);
     struct Message* out = ArrayList_Messages_shift(pl->queue);
-    if (out) {
-        Allocator_disown(pl->alloc, out->alloc);
-    }
+    if (!out) { return NULL; }
+    Allocator_disown(pl->alloc, out->alloc);
+    Kbps_accumulate(&pl->sendBw, Time_currentTimeMilliseconds(pl->base), out->length);
     return out;
 }
 
@@ -73,10 +52,18 @@ int PeerLink_send(struct Message* msg, struct PeerLink* peerLink)
     return pl->queue->length;
 }
 
-void PeerLink_recv(struct Message* msg, struct PeerLink* pl)
+void PeerLink_recv(struct Message* msg, struct PeerLink* peerLink)
 {
-    //struct PeerLink_pvt* pl = Identity_check((struct PeerLink_pvt*) peerLink);
-    // do nothing for now, later we will begin to check headers.
+    struct PeerLink_pvt* pl = Identity_check((struct PeerLink_pvt*) peerLink);
+    Kbps_accumulate(&pl->recvBw, Time_currentTimeMilliseconds(pl->base), msg->length);
+}
+
+void PeerLink_kbps(struct PeerLink* peerLink, struct PeerLink_Kbps* output)
+{
+    struct PeerLink_pvt* pl = Identity_check((struct PeerLink_pvt*) peerLink);
+    uint32_t now = Time_currentTimeMilliseconds(pl->base);
+    output->recvKbps = Kbps_accumulate(&pl->recvBw, now, Kbps_accumulate_NO_PACKET);
+    output->sendKbps = Kbps_accumulate(&pl->sendBw, now, Kbps_accumulate_NO_PACKET);
 }
 
 struct PeerLink* PeerLink_new(struct EventBase* base, struct Allocator* alloc)

+ 8 - 0
net/PeerLink.h

@@ -37,6 +37,12 @@ struct PeerLink
     bool peerHeaderEnabled;
 };
 
+struct PeerLink_Kbps
+{
+    uint32_t sendKbps;
+    uint32_t recvKbps;
+};
+
 /**
  * Attempt to get a message from the peerlink to send, if it is time to send one.
  * If there are no messages in the queue or the link is already at capacity, NULL will be returned.
@@ -57,6 +63,8 @@ int PeerLink_send(struct Message* msg, struct PeerLink* pl);
  */
 void PeerLink_recv(struct Message* msg, struct PeerLink* pl);
 
+void PeerLink_kbps(struct PeerLink* peerLink, struct PeerLink_Kbps* output);
+
 
 struct PeerLink* PeerLink_new(struct EventBase* base, struct Allocator* alloc);
 

+ 2 - 1
tools/peerStats

@@ -24,7 +24,8 @@ Cjdns.connectAsAnon(function (cjdns) {
             if (err) { throw err; }
             ret.peers.forEach(function (peer) {
                 p = peer['addr'] + ' ' + peer['state'] +
-                    ' in ' + peer['bytesIn'] + ' out ' + peer['bytesOut'];
+                    ' in ' + peer['recvKbps'] + 'kb/s' +
+                    ' out ' + peer['sendKbps'] + 'kb/s';
 
                     if (Number(peer['duplicates']) !== 0) {
                         p += ' ' + ' DUP ' + peer['duplicates'];

+ 2 - 0
util/Constant.h

@@ -25,4 +25,6 @@
 
 #define Constant_randHexString(len) <?js return file.Constant_JS.randHexString(#len); ?>
 
+#define Constant_log2(num) <?js return file.Constant_JS.log2(num); ?>
+
 #endif

+ 11 - 0
util/Constant.js

@@ -78,6 +78,17 @@ var randHexString = module.exports.randHexString = function (lenStr) {
     return '"' + hex.substring(0,len) + '"';
 };
 
+var log2 = module.exports.log2 = function (val) {
+    var x = 1;
+    for (var i = 0; i < 31; i++) {
+        if (x === val) {
+            if ((1 << i) !== val) { throw new Error(); }
+            return i;
+        }
+        x = x + x;
+    }
+    throw new Error("not an even power of 2");
+};
 
 if (!module.parent) {
     console.log("testing " + __filename);

+ 85 - 0
util/Kbps.h

@@ -0,0 +1,85 @@
+/* vim: set expandtab ts=4 sw=4: */
+/*
+ * You may redistribute this program and/or modify it under the terms of
+ * the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+#ifndef Kbps_H
+#define Kbps_H
+
+#include "util/Constant.h"
+
+// Must be multiples of 2
+#define Kbps_WINDOW_SIZE 8
+
+// Must be a multiple of 1024 (1 second)
+#define Kbps_TIMESPAN    1024
+
+#define Kbps_WINDOW_SH Constant_log2(Kbps_WINDOW_SIZE)
+#define Kbps_TIMESPAN_SH Constant_log2(Kbps_TIMESPAN)
+
+struct Kbps
+{
+    /** bookkeeping */
+    uint32_t window[Kbps_WINDOW_SIZE];
+
+    /** Millisecond time of last message. */
+    uint32_t lastTime;
+
+    /** KiloBytes per timeslot. */
+    uint32_t currentBpt;
+};
+
+#define Kbps_accumulate_NO_PACKET 0xffffffff
+static inline uint32_t Kbps_accumulate(struct Kbps* ctx, uint32_t now, uint32_t packetSize)
+{
+    // now >> 10-3 == now / 1024 * 8 (eighth second Kbps_WINDOW_SIZE)
+    uint32_t xnow = now;
+    if ((xnow - ctx->lastTime) >> 31) { xnow = ctx->lastTime; }
+    int slot = (xnow >> (Kbps_TIMESPAN_SH - Kbps_WINDOW_SH) ) % Kbps_WINDOW_SIZE;
+    int lslot = ( ctx->lastTime >> (Kbps_TIMESPAN_SH - Kbps_WINDOW_SH) ) % Kbps_WINDOW_SIZE;
+    if (xnow - ctx->lastTime > Kbps_TIMESPAN ||
+        ((lslot == slot) && (xnow - ctx->lastTime > (Kbps_TIMESPAN >> 1))))
+    {
+        for (int i = 0; i < Kbps_WINDOW_SIZE; i++) {
+            ctx->currentBpt -= ctx->window[i];
+            ctx->window[i] = 0;
+        }
+        Assert_true(!ctx->currentBpt);
+    } else if (lslot != slot) {
+        for (int i = lslot + 1; ; i++) {
+            i %= Kbps_WINDOW_SIZE;
+            ctx->currentBpt -= ctx->window[i];
+            ctx->window[i] = 0;
+            if (i == slot) { break; }
+        }
+    }
+    // Make sure it didn't go under zero and roll over
+    Assert_true(!(ctx->currentBpt >> 31));
+    if (packetSize != Kbps_accumulate_NO_PACKET) {
+        ctx->window[slot] += packetSize;
+        ctx->currentBpt += packetSize;
+    }
+    ctx->lastTime = now;
+
+    // /= 2 ** Kbps_TIMESPAN_SH --> bytes per timespan to bytes per millisecond
+    // *= 2 ** 10               --> bytes per millisecond to bytes per second
+    // *= 2 ** 3                --> bytes per second to bits per second
+    // /= 2 ** 10               --> bits per second to kbits per second
+    return (
+        ctx->currentBpt <?js
+            var x = (Number(Kbps_TIMESPAN_SH) - 10 - 3 + 10);
+            return ( ((x) < 0) ? "<<" : ">>" ) + " " + Math.abs(x);
+        ?>
+    );
+}
+
+#endif