/* vim: set expandtab ts=4 sw=4: */
/*
* You may redistribute this program and/or modify it under the terms of
* the GNU General Public License as published by the Free Software Foundation,
* either version 3 of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see .
*/
#include "crypto/AddressCalc.h"
#include "crypto/CryptoAuth_pvt.h"
#include "net/DefaultInterfaceController.h"
#include "dht/dhtcore/RumorMill.h"
#include "memory/Allocator.h"
#include "net/SwitchPinger.h"
#include "util/Base32.h"
#include "util/Bits.h"
#include "util/events/Time.h"
#include "util/events/Timeout.h"
#include "util/Identity.h"
#include "util/version/Version.h"
#include "util/AddrTools.h"
#include "wire/Error.h"
#include "wire/Message.h"
#include // offsetof
/** After this number of milliseconds, a node will be regarded as unresponsive. */
#define UNRESPONSIVE_AFTER_MILLISECONDS (20*1024)
/**
* After this number of milliseconds without a valid incoming message,
* a peer is "lazy" and should be pinged.
*/
#define PING_AFTER_MILLISECONDS (3*1024)
/** How often to ping "lazy" peers, "unresponsive" peers are only pinged 20% of the time. */
#define PING_INTERVAL_MILLISECONDS 1024
/** The number of milliseconds to wait for a ping response. */
#define TIMEOUT_MILLISECONDS (2*1024)
/**
* The number of seconds to wait before an unresponsive peer
* making an incoming connection is forgotten.
*/
#define FORGET_AFTER_MILLISECONDS (256*1024)
/*--------------------Structs--------------------*/
struct IFCPeer
{
/** The interface which is registered with the switch. */
struct Interface switchIf;
/** The internal (wrapped by CryptoAuth) interface. */
struct Interface* cryptoAuthIf;
/** The external (network side) interface. */
struct Interface* external;
/** The label for this endpoint, needed to ping the endpoint. */
uint64_t switchLabel;
/** Milliseconds since the epoch when the last *valid* message was received. */
uint64_t timeOfLastMessage;
/** Time when the last switch ping response was received from this node. */
uint64_t timeOfLastPing;
/** The handle which can be used to look up this endpoint in the endpoint set. */
uint32_t handle;
/** True if we should forget about the peer if they do not respond. */
bool isIncomingConnection : 1;
/**
* If InterfaceController_PeerState_UNAUTHENTICATED, no permanent state will be kept.
* During transition from HANDSHAKE to ESTABLISHED, a check is done for a registeration of a
* node which is already registered in a different switch slot, if there is one and the
* handshake completes, it will be moved.
*/
int state : 31;
// traffic counters
uint64_t bytesOut;
uint64_t bytesIn;
Identity
};
#define Map_NAME OfIFCPeerByExernalIf
#define Map_ENABLE_HANDLES
#define Map_KEY_TYPE struct Interface*
#define Map_VALUE_TYPE struct IFCPeer*
#include "util/Map.h"
struct Context
{
/** Public functions and fields for this ifcontroller. */
struct InterfaceController pub;
/** Used to get a peer by its handle. */
struct Map_OfIFCPeerByExernalIf peerMap;
struct Allocator* const allocator;
struct CryptoAuth* const ca;
/** Switch for adding nodes when they are discovered. */
struct SwitchCore* const switchCore;
/** Router needed to inject newly added nodes to bootstrap the system. */
struct RouterModule* const routerModule;
struct RumorMill* const rumorMill;
struct Log* const logger;
struct EventBase* const eventBase;
/** After this number of milliseconds, a neoghbor will be regarded as unresponsive. */
uint32_t unresponsiveAfterMilliseconds;
/** The number of milliseconds to wait before pinging. */
uint32_t pingAfterMilliseconds;
/** The number of milliseconds to let a ping go before timing it out. */
uint32_t timeoutMilliseconds;
/** After this number of milliseconds, an incoming connection is forgotten entirely. */
uint32_t forgetAfterMilliseconds;
/** A counter to allow for 3/4 of all pings to be skipped when a node is definitely down. */
uint32_t pingCount;
/** The timeout event to use for pinging potentially unresponsive neighbors. */
struct Timeout* const pingInterval;
/** For pinging lazy/unresponsive nodes. */
struct SwitchPinger* const switchPinger;
/** A password which is generated per-startup and sent out in beacon messages. */
uint8_t beaconPassword[Headers_Beacon_PASSWORD_LEN];
Identity
};
//---------------//
static inline struct Context* ifcontrollerForPeer(struct IFCPeer* ep)
{
return Identity_check((struct Context*) ep->switchIf.senderContext);
}
static void onPingResponse(enum SwitchPinger_Result result,
uint64_t label,
String* data,
uint32_t millisecondsLag,
uint32_t version,
void* onResponseContext)
{
if (SwitchPinger_Result_OK != result) {
return;
}
struct IFCPeer* ep = Identity_check((struct IFCPeer*) onResponseContext);
struct Context* ic = ifcontrollerForPeer(ep);
struct Address addr;
Bits_memset(&addr, 0, sizeof(struct Address));
Bits_memcpyConst(addr.key, CryptoAuth_getHerPublicKey(ep->cryptoAuthIf), 32);
addr.path = ep->switchLabel;
addr.protocolVersion = version;
ep->timeOfLastPing = Time_currentTimeMilliseconds(ic->eventBase);
#ifdef Log_DEBUG
uint8_t addrStr[60];
Address_print(addrStr, &addr);
#endif
if (!Version_isCompatible(Version_CURRENT_PROTOCOL, version)) {
Log_debug(ic->logger, "got switch pong from node [%s] with incompatible version [%d]",
addrStr, version);
} else {
Log_debug(ic->logger, "got switch pong from node with version [%d]", version);
}
struct Node_Two* nn = RouterModule_nodeForPath(label, ic->routerModule);
if (!nn) {
RumorMill_addNode(ic->rumorMill, &addr);
} else if (!Node_getBestParent(nn)) {
RouterModule_peerIsReachable(label, millisecondsLag, ic->routerModule);
}
#ifdef Log_DEBUG
// This will be false if it times out.
//Assert_true(label == ep->switchLabel);
uint8_t path[20];
AddrTools_printPath(path, label);
uint8_t sl[20];
AddrTools_printPath(sl, ep->switchLabel);
Log_debug(ic->logger, "Received [%s] from lazy endpoint [%s] [%s]",
SwitchPinger_resultString(result)->bytes, path, sl);
#endif
}
// Called from the pingInteral timeout.
static void pingCallback(void* vic)
{
struct Context* ic = Identity_check((struct Context*) vic);
uint64_t now = Time_currentTimeMilliseconds(ic->eventBase);
ic->pingCount++;
// scan for endpoints have not sent anything recently.
for (uint32_t i = 0; i < ic->peerMap.count; i++) {
struct IFCPeer* ep = ic->peerMap.values[i];
if (now < ep->timeOfLastMessage + ic->pingAfterMilliseconds) {
if (now < ep->timeOfLastPing + ic->pingAfterMilliseconds) {
// Possibly an out-of-date node which is mangling packets, don't ping too often
// because it causes the RumorMill to be filled with this node over and over.
continue;
}
// This is here because of a pathological state where the connection is in ESTABLISHED
// state but the *direct peer* has somehow been dropped from the routing table
// usually because of a call to NodeStore_brokenPath()
struct Node_Two* peerNode = RouterModule_nodeForPath(ep->switchLabel, ic->routerModule);
if (peerNode && Node_getBestParent(peerNode)) {
continue;
}
}
#ifdef Log_DEBUG
uint8_t key[56];
Base32_encode(key, 56, CryptoAuth_getHerPublicKey(ep->cryptoAuthIf), 32);
#endif
if (ep->isIncomingConnection
&& now > ep->timeOfLastMessage + ic->forgetAfterMilliseconds)
{
Log_debug(ic->logger, "Unresponsive peer [%s.k] has not responded in [%u] "
"seconds, dropping connection",
key, ic->forgetAfterMilliseconds / 1024);
Allocator_free(ep->external->allocator);
return;
}
bool unresponsive = (now > ep->timeOfLastMessage + ic->unresponsiveAfterMilliseconds);
if (unresponsive) {
// flush the peer from the table...
RouterModule_brokenPath(ep->switchLabel, ic->routerModule);
// Lets skip 87% of pings when they're really down.
if (ic->pingCount % 8) {
continue;
}
ep->state = InterfaceController_PeerState_UNRESPONSIVE;
}
struct SwitchPinger_Ping* ping =
SwitchPinger_newPing(ep->switchLabel,
String_CONST(""),
ic->timeoutMilliseconds,
onPingResponse,
ic->allocator,
ic->switchPinger);
#ifdef Log_DEBUG
uint32_t lag = (now - ep->timeOfLastMessage) / 1024;
#endif
if (!ping) {
Log_debug(ic->logger,
"Failed to ping %s peer [%s.k] lag [%u], out of ping slots.",
(unresponsive ? "unresponsive" : "lazy"), key, lag);
return;
}
ping->onResponseContext = ep;
Log_debug(ic->logger,
"Pinging %s peer [%s.k] lag [%u]",
(unresponsive ? "unresponsive" : "lazy"), key, lag);
}
}
/** If there's already an endpoint with the same public key, merge the new one with the old one. */
static void moveEndpointIfNeeded(struct IFCPeer* ep, struct Context* ic)
{
Log_debug(ic->logger, "Checking for old sessions to merge with.");
uint8_t* key = CryptoAuth_getHerPublicKey(ep->cryptoAuthIf);
for (uint32_t i = 0; i < ic->peerMap.count; i++) {
struct IFCPeer* thisEp = ic->peerMap.values[i];
uint8_t* thisKey = CryptoAuth_getHerPublicKey(thisEp->cryptoAuthIf);
if (thisEp != ep && !Bits_memcmp(thisKey, key, 32)) {
Log_info(ic->logger, "Moving endpoint to merge new session with old.");
ep->switchLabel = thisEp->switchLabel;
SwitchCore_swapInterfaces(&thisEp->switchIf, &ep->switchIf);
Allocator_free(thisEp->external->allocator);
return;
}
}
}
// Incoming message which has passed through the cryptoauth and needs to be forwarded to the switch.
static uint8_t receivedAfterCryptoAuth(struct Message* msg, struct Interface* cryptoAuthIf)
{
struct IFCPeer* ep = Identity_check((struct IFCPeer*) cryptoAuthIf->receiverContext);
struct Context* ic = ifcontrollerForPeer(ep);
ep->bytesIn += msg->length;
if (ep->state < InterfaceController_PeerState_ESTABLISHED) {
if (CryptoAuth_getState(cryptoAuthIf) >= CryptoAuth_HANDSHAKE3) {
moveEndpointIfNeeded(ep, ic);
ep->state = InterfaceController_PeerState_ESTABLISHED;
} else {
ep->state = InterfaceController_PeerState_HANDSHAKE;
// prevent some kinds of nasty things which could be done with packet replay.
// This is checking the message switch header and will drop it unless the label
// directs it to *this* router.
if (msg->length < 8 || msg->bytes[7] != 1) {
Log_info(ic->logger, "DROP message because CA is not established.");
return Error_NONE;
} else {
// When a "server" gets a new connection from a "client" the router doesn't
// know about that client so if the client sends a packet to the server, the
// server will be unable to handle it until the client has sent inter-router
// communication to the server. Here we will ping the client so when the
// server gets the ping response, it will insert the client into its table
// and know its version.
// prevent DoS by limiting the number of times this can be called per second
// limit it to 7, this will affect innocent packets but it doesn't matter much
// since this is mostly just an optimization and for keeping the tests happy.
if ((ic->pingCount + 1) % 7) {
pingCallback(ic);
}
}
}
} else if (ep->state == InterfaceController_PeerState_UNRESPONSIVE
&& CryptoAuth_getState(cryptoAuthIf) >= CryptoAuth_HANDSHAKE3)
{
ep->state = InterfaceController_PeerState_ESTABLISHED;
} else {
ep->timeOfLastMessage = Time_currentTimeMilliseconds(ic->eventBase);
}
return ep->switchIf.receiveMessage(msg, &ep->switchIf);
}
// This is directly called from SwitchCore, message is not encrypted.
static uint8_t sendFromSwitch(struct Message* msg, struct Interface* switchIf)
{
struct IFCPeer* ep = Identity_check((struct IFCPeer*) switchIf);
ep->bytesOut += msg->length;
struct Context* ic = ifcontrollerForPeer(ep);
uint8_t ret;
uint64_t now = Time_currentTimeMilliseconds(ic->eventBase);
if (now - ep->timeOfLastMessage > ic->unresponsiveAfterMilliseconds) {
// TODO(cjd): This is a hack because if the time of last message exceeds the
// unresponsive time, we need to send back an error and that means
// mangling the message which would otherwise be in the queue.
struct Allocator* tempAlloc = Allocator_child(ic->allocator);
struct Message* toSend = Message_clone(msg, tempAlloc);
ret = Interface_sendMessage(ep->cryptoAuthIf, toSend);
Allocator_free(tempAlloc);
} else {
ret = Interface_sendMessage(ep->cryptoAuthIf, msg);
}
// If this node is unresponsive then return an error.
if (ret || now - ep->timeOfLastMessage > ic->unresponsiveAfterMilliseconds) {
return ret ? ret : Error_UNDELIVERABLE;
} else {
/* Way way way too much noise
Log_debug(ic->logger, "Sending to neighbor, last message from this node was [%u] ms ago.",
(now - ep->timeOfLastMessage));
*/
}
return Error_NONE;
}
static int closeInterface(struct Allocator_OnFreeJob* job)
{
struct IFCPeer* toClose = Identity_check((struct IFCPeer*) job->userData);
struct Context* ic = ifcontrollerForPeer(toClose);
// flush the peer from the table...
RouterModule_brokenPath(toClose->switchLabel, ic->routerModule);
int index = Map_OfIFCPeerByExernalIf_indexForHandle(toClose->handle, &ic->peerMap);
Assert_true(index >= 0);
Map_OfIFCPeerByExernalIf_remove(index, &ic->peerMap);
return 0;
}
static int registerPeer(struct InterfaceController* ifController,
uint8_t herPublicKey[32],
String* password,
bool requireAuth,
bool isIncomingConnection,
struct Interface* externalInterface)
{
struct Context* ic = Identity_check((struct Context*) ifController);
if (Map_OfIFCPeerByExernalIf_indexForKey(&externalInterface, &ic->peerMap) > -1) {
return 0;
}
Log_debug(ic->logger, "registerPeer [%p] total [%u]",
(void*)externalInterface, ic->peerMap.count);
uint8_t ip6[16];
if (herPublicKey) {
AddressCalc_addressForPublicKey(ip6, herPublicKey);
if (!AddressCalc_validAddress(ip6)) {
return InterfaceController_registerPeer_BAD_KEY;
}
if (!Bits_memcmp(ic->ca->publicKey, herPublicKey, 32)) {
// can't link with yourself, wiseguy
return InterfaceController_registerPeer_BAD_KEY;
}
} else {
Assert_true(requireAuth);
}
struct Allocator* epAllocator = externalInterface->allocator;
struct IFCPeer* ep = Allocator_calloc(epAllocator, sizeof(struct IFCPeer), 1);
ep->bytesOut = 0;
ep->bytesIn = 0;
ep->external = externalInterface;
int setIndex = Map_OfIFCPeerByExernalIf_put(&externalInterface, &ep, &ic->peerMap);
ep->handle = ic->peerMap.handles[setIndex];
Identity_set(ep);
Allocator_onFree(epAllocator, closeInterface, ep);
// If the other end need not supply a valid password to connect
// we will set the connection state to HANDSHAKE because we don't
// want the connection to be trashed after the first invalid packet.
if (!requireAuth) {
ep->state = InterfaceController_PeerState_HANDSHAKE;
}
ep->cryptoAuthIf = CryptoAuth_wrapInterface(externalInterface,
herPublicKey,
NULL,
requireAuth,
"outer",
ic->ca);
ep->cryptoAuthIf->receiveMessage = receivedAfterCryptoAuth;
ep->cryptoAuthIf->receiverContext = ep;
// Always use authType 1 until something else comes along, then we'll have to refactor.
if (password) {
CryptoAuth_setAuth(password, 1, ep->cryptoAuthIf);
}
ep->isIncomingConnection = isIncomingConnection;
Bits_memcpyConst(&ep->switchIf, (&(struct Interface) {
.sendMessage = sendFromSwitch,
// ifcontrollerForPeer uses this.
// sendFromSwitch relies on the fact that the
// switchIf is the same memory location as the Peer.
.senderContext = ic,
.allocator = epAllocator
}), sizeof(struct Interface));
int ret = SwitchCore_addInterface(&ep->switchIf, 0, &ep->switchLabel, ic->switchCore);
if (ret) {
return (ret == SwitchCore_addInterface_OUT_OF_SPACE)
? InterfaceController_registerPeer_OUT_OF_SPACE
: InterfaceController_registerPeer_INTERNAL;
}
// We want the node to immedietly be pinged but we don't want it to appear unresponsive because
// the pinger will only ping every (PING_INTERVAL * 8) so we set timeOfLastMessage to
// (now - pingAfterMilliseconds - 1) so it will be considered a "lazy node".
ep->timeOfLastMessage =
Time_currentTimeMilliseconds(ic->eventBase) - ic->pingAfterMilliseconds - 1;
if (herPublicKey) {
#ifdef Log_INFO
uint8_t printAddr[60];
AddrTools_printIp(printAddr, ip6);
Log_info(ic->logger, "Adding peer [%s]", printAddr);
#endif
// Kick the ping callback so that the node will be pinged ASAP.
pingCallback(ic);
}
return 0;
}
static enum InterfaceController_PeerState getPeerState(struct Interface* iface)
{
struct Interface* cryptoAuthIf = CryptoAuth_getConnectedInterface(iface);
struct IFCPeer* p = Identity_check((struct IFCPeer*) cryptoAuthIf->receiverContext);
return p->state;
}
static void populateBeacon(struct InterfaceController* ifc, struct Headers_Beacon* beacon)
{
struct Context* ic = Identity_check((struct Context*) ifc);
beacon->version_be = Endian_hostToBigEndian32(Version_CURRENT_PROTOCOL);
Bits_memcpyConst(beacon->password, ic->beaconPassword, Headers_Beacon_PASSWORD_LEN);
Bits_memcpyConst(beacon->publicKey, ic->ca->publicKey, 32);
}
static int getPeerStats(struct InterfaceController* ifController,
struct Allocator* alloc,
struct InterfaceController_peerStats** statsOut)
{
struct Context* ic = Identity_check((struct Context*) ifController);
int count = ic->peerMap.count;
struct InterfaceController_peerStats* stats =
Allocator_malloc(alloc, sizeof(struct InterfaceController_peerStats)*count);
for (int i = 0; i < count; i++) {
struct IFCPeer* peer = ic->peerMap.values[i];
struct InterfaceController_peerStats* s = &stats[i];
s->pubKey = CryptoAuth_getHerPublicKey(peer->cryptoAuthIf);
s->bytesOut = peer->bytesOut;
s->bytesIn = peer->bytesIn;
s->timeOfLastMessage = peer->timeOfLastMessage;
s->state = peer->state;
s->switchLabel = peer->switchLabel;
s->isIncomingConnection = peer->isIncomingConnection;
s->user = NULL;
if (s->isIncomingConnection) {
s->user = CryptoAuth_getUser(peer->cryptoAuthIf);
}
struct ReplayProtector* rp = CryptoAuth_getReplayProtector(peer->cryptoAuthIf);
s->duplicates = rp->duplicates;
s->lostPackets = rp->lostPackets;
s->receivedOutOfRange = rp->receivedOutOfRange;
}
*statsOut = stats;
return count;
}
static int disconnectPeer(struct InterfaceController* ifController, uint8_t herPublicKey[32])
{
struct Context* ic = Identity_check((struct Context*) ifController);
for (uint32_t i = 0; i < ic->peerMap.count; i++) {
struct IFCPeer* peer = ic->peerMap.values[i];
if (!Bits_memcmp(herPublicKey, CryptoAuth_getHerPublicKey(peer->cryptoAuthIf), 32)) {
Allocator_free(peer->external->allocator);
return 0;
}
}
return InterfaceController_disconnectPeer_NOTFOUND;
}
struct InterfaceController* DefaultInterfaceController_new(struct CryptoAuth* ca,
struct SwitchCore* switchCore,
struct RouterModule* routerModule,
struct RumorMill* rumorMill,
struct Log* logger,
struct EventBase* eventBase,
struct SwitchPinger* switchPinger,
struct Random* rand,
struct Allocator* allocator)
{
struct Context* out = Allocator_malloc(allocator, sizeof(struct Context));
Bits_memcpyConst(out, (&(struct Context) {
.pub = {
.registerPeer = registerPeer,
.disconnectPeer = disconnectPeer,
.getPeerState = getPeerState,
.populateBeacon = populateBeacon,
.getPeerStats = getPeerStats,
},
.peerMap = {
.allocator = allocator
},
.allocator = allocator,
.ca = ca,
.switchCore = switchCore,
.routerModule = routerModule,
.rumorMill = rumorMill,
.logger = logger,
.eventBase = eventBase,
.switchPinger = switchPinger,
.unresponsiveAfterMilliseconds = UNRESPONSIVE_AFTER_MILLISECONDS,
.pingAfterMilliseconds = PING_AFTER_MILLISECONDS,
.timeoutMilliseconds = TIMEOUT_MILLISECONDS,
.forgetAfterMilliseconds = FORGET_AFTER_MILLISECONDS,
.pingInterval = (switchPinger)
? Timeout_setInterval(pingCallback,
out,
PING_INTERVAL_MILLISECONDS,
eventBase,
allocator)
: NULL
}), sizeof(struct Context));
Identity_set(out);
// Add the beaconing password.
Random_bytes(rand, out->beaconPassword, Headers_Beacon_PASSWORD_LEN);
String strPass = { .bytes=(char*)out->beaconPassword, .len=Headers_Beacon_PASSWORD_LEN };
int ret = CryptoAuth_addUser(&strPass, 1, String_CONST("Local Peers"), ca);
if (ret) {
Log_warn(logger, "CryptoAuth_addUser() returned [%d]", ret);
}
return &out->pub;
}