/* vim: set expandtab ts=4 sw=4: */
/*
* You may redistribute this program and/or modify it under the terms of
* the GNU General Public License as published by the Free Software Foundation,
* either version 3 of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see .
*/
#include "memory/Allocator.h"
#include "wire/PFChan.h"
#include "net/EventEmitter.h"
#include "util/Identity.h"
#include "util/log/Log.h"
#include "wire/Error.h"
#include
#define ArrayList_TYPE struct Iface
#define ArrayList_NAME Ifaces
#include "util/ArrayList.h"
#define PING_MAGIC 0x01234567
struct Pathfinder
{
struct EventEmitter_pvt* ee;
struct Iface iface;
struct Allocator* alloc;
uint8_t userAgent[64];
uint32_t superiority;
uint32_t version;
#define Pathfinder_state_DISCONNECTED 0
#define Pathfinder_state_CONNECTED 1
#define Pathfinder_state_ERROR 2
uint16_t state;
uint16_t pathfinderId;
/**
* Number of bytes sent since the last ping, each ping contains this number at the time
* of pinging, then the number is incremented as more messages are sent, if it goes over
* 65535, the state is set to error.
*/
uint32_t bytesSinceLastPing;
Identity
};
#define ArrayList_TYPE struct Pathfinder
#define ArrayList_NAME Pathfinders
#include "util/ArrayList.h"
struct EventEmitter_pvt
{
struct EventEmitter pub;
struct Iface trickIf;
struct Allocator* alloc;
struct Log* log;
struct ArrayList_Ifaces* listTable[PFChan_Pathfinder__TOO_HIGH - PFChan_Pathfinder__TOO_LOW];
struct ArrayList_Pathfinders* pathfinders;
uint8_t publicKey[32];
Identity
};
#define OFFSET(ev) (ev - PFChan_Pathfinder__TOO_LOW - 1)
static struct ArrayList_Ifaces* getHandlers(struct EventEmitter_pvt* ee,
enum PFChan_Pathfinder ev,
bool create)
{
if (ev <= PFChan_Pathfinder__TOO_LOW || ev >= PFChan_Pathfinder__TOO_HIGH) { return NULL; }
struct ArrayList_Ifaces* out = ee->listTable[OFFSET(ev)];
if (!out) {
out = ee->listTable[OFFSET(ev)] = ArrayList_Ifaces_new(ee->alloc);
}
return out;
}
static Iface_DEFUN sendToPathfinder(Message_t* msg, struct Pathfinder* pf)
{
if (!pf || pf->state != Pathfinder_state_CONNECTED) { return NULL; }
if (pf->bytesSinceLastPing < 8192 && pf->bytesSinceLastPing + Message_getLength(msg) >= 8192) {
Message_t* ping = Message_new(0, 512, Message_getAlloc(msg));
Er_assert(Message_epush32be(ping, pf->bytesSinceLastPing));
Er_assert(Message_epush32be(ping, PING_MAGIC));
Er_assert(Message_epush32be(ping, PFChan_Core_PING));
Iface_send(&pf->iface, ping);
}
pf->bytesSinceLastPing += Message_getLength(msg);
return Iface_next(&pf->iface, msg);
}
static bool PFChan_Pathfinder_sizeOk(enum PFChan_Pathfinder ev, int size)
{
switch (ev) {
case PFChan_Pathfinder_CONNECT:
return (size == 8 + PFChan_Pathfinder_Connect_SIZE);
case PFChan_Pathfinder_SUPERIORITY:
return (size == 8 + PFChan_Pathfinder_Superiority_SIZE);
case PFChan_Pathfinder_NODE:
case PFChan_Pathfinder_SNODE:
return (size == 8 + PFChan_Node_SIZE);
case PFChan_Pathfinder_SENDMSG:
return (size >= 8 + PFChan_Msg_MIN_SIZE);
case PFChan_Pathfinder_PING:
case PFChan_Pathfinder_PONG:
return (size >= 8 + PFChan_Ping_SIZE);
case PFChan_Pathfinder_SESSIONS:
case PFChan_Pathfinder_PEERS:
case PFChan_Pathfinder_PATHFINDERS:
return (size == 8);
case PFChan_Pathfinder_CTRL_SENDMSG:
return (size >= 8 + PFChan_CtrlMsg_MIN_SIZE);
default:;
}
Assert_failure("invalid event [%d]", ev);
}
// Forget to add the event here? :)
Assert_compileTime(PFChan_Pathfinder__TOO_LOW == 511);
Assert_compileTime(PFChan_Pathfinder__TOO_HIGH == 523);
static bool PFChan_Core_sizeOk(enum PFChan_Core ev, int size)
{
switch (ev) {
case PFChan_Core_CONNECT:
return (size == 8 + PFChan_Core_Connect_SIZE);
case PFChan_Core_PATHFINDER:
case PFChan_Core_PATHFINDER_GONE:
return (size == 8 + PFChan_Core_Pathfinder_SIZE);
case PFChan_Core_SWITCH_ERR:
return (size >= 8 + PFChan_Core_SwitchErr_MIN_SIZE);
case PFChan_Core_SEARCH_REQ:
return (size == 8 + PFChan_Core_SearchReq_SIZE);
case PFChan_Core_PEER:
case PFChan_Core_PEER_GONE:
case PFChan_Core_SESSION:
case PFChan_Core_SESSION_ENDED:
case PFChan_Core_DISCOVERED_PATH:
case PFChan_Core_UNSETUP_SESSION:
return (size == 8 + PFChan_Node_SIZE);
case PFChan_Core_MSG:
return (size >= 8 + PFChan_Msg_MIN_SIZE);
case PFChan_Core_PING:
case PFChan_Core_PONG:
return (size == 8 + PFChan_Ping_SIZE);
case PFChan_Core_CTRL_MSG:
return (size >= 8 + PFChan_CtrlMsg_MIN_SIZE);
case PFChan_Core_LINK_STATE:
return (size >= 8 + PFChan_LinkState_Entry_SIZE) &&
!((size - 8) % PFChan_LinkState_Entry_SIZE);
default:;
}
Assert_failure("invalid event [%d]", ev);
}
// Remember to add the event to this function too!
Assert_compileTime(PFChan_Core__TOO_LOW == 1023);
Assert_compileTime(PFChan_Core__TOO_HIGH == 1040);
static Iface_DEFUN incomingFromCore(Message_t* msg, struct Iface* trickIf)
{
struct EventEmitter_pvt* ee = Identity_containerOf(trickIf, struct EventEmitter_pvt, trickIf);
Assert_true(!((uintptr_t)Message_bytes(msg) % 4) && "alignment");
enum PFChan_Core ev = Er_assert(Message_epop32be(msg));
Assert_true(PFChan_Core_sizeOk(ev, Message_getLength(msg)+4));
uint32_t pathfinderNum = Er_assert(Message_epop32be(msg));
Er_assert(Message_epush32be(msg, ev));
if (pathfinderNum != 0xffffffff) {
struct Pathfinder* pf = ArrayList_Pathfinders_get(ee->pathfinders, pathfinderNum);
Assert_true(pf && pf->state == Pathfinder_state_CONNECTED);
return sendToPathfinder(msg, pf);
} else {
for (int i = 0; i < ee->pathfinders->length; i++) {
struct Pathfinder* pf = ArrayList_Pathfinders_get(ee->pathfinders, i);
if (!pf || pf->state != Pathfinder_state_CONNECTED) { continue; }
Message_t* messageClone = Message_clone(msg, Message_getAlloc(msg));
Iface_CALL(sendToPathfinder, messageClone, pf);
}
}
return NULL;
}
static Message_t* pathfinderMsg(enum PFChan_Core ev,
struct Pathfinder* pf,
struct Allocator* alloc)
{
Message_t* msg = Message_new(PFChan_Core_Pathfinder_SIZE, 512, alloc);
struct PFChan_Core_Pathfinder* pathfinder = (struct PFChan_Core_Pathfinder*) Message_bytes(msg);
pathfinder->superiority_be = Endian_hostToBigEndian32(pf->superiority);
pathfinder->pathfinderId_be = Endian_hostToBigEndian32(pf->pathfinderId);
Bits_memcpy(pathfinder->userAgent, pf->userAgent, 64);
Er_assert(Message_epush32be(msg, 0xffffffff));
Er_assert(Message_epush32be(msg, ev));
return msg;
}
static int handleFromPathfinder(enum PFChan_Pathfinder ev,
Message_t* msg,
struct EventEmitter_pvt* ee,
struct Pathfinder* pf)
{
switch (ev) {
default: return false;
case PFChan_Pathfinder_CONNECT: {
struct PFChan_Pathfinder_Connect connect;
Er_assert(Message_eshift(msg, -8));
Er_assert(Message_epop(msg, &connect, PFChan_Pathfinder_Connect_SIZE));
pf->superiority = Endian_bigEndianToHost32(connect.superiority_be);
pf->version = Endian_bigEndianToHost32(connect.version_be);
Bits_memcpy(pf->userAgent, connect.userAgent, 64);
pf->state = Pathfinder_state_CONNECTED;
struct PFChan_Core_Connect resp;
resp.version_be = Endian_bigEndianToHost32(Version_CURRENT_PROTOCOL);
resp.pathfinderId_be = Endian_hostToBigEndian32(pf->pathfinderId);
Bits_memcpy(resp.publicKey, ee->publicKey, 32);
Er_assert(Message_epush(msg, &resp, PFChan_Core_Connect_SIZE));
Er_assert(Message_epush32be(msg, PFChan_Core_CONNECT));
Message_t* sendMsg = Message_clone(msg, Message_getAlloc(msg));
Iface_CALL(sendToPathfinder, sendMsg, pf);
break;
}
case PFChan_Pathfinder_SUPERIORITY: {
Er_assert(Message_eshift(msg, -8));
pf->superiority = Er_assert(Message_epop32be(msg));
Message_t* resp = pathfinderMsg(PFChan_Core_PATHFINDER, pf, Message_getAlloc(msg));
Iface_CALL(incomingFromCore, resp, &ee->trickIf);
break;
}
case PFChan_Pathfinder_PING: {
Message_t* sendMsg = Message_clone(msg, Message_getAlloc(msg));
Iface_send(&pf->iface, sendMsg);
break;
}
case PFChan_Pathfinder_PONG: {
Er_assert(Message_eshift(msg, -8));
uint32_t cookie = Er_assert(Message_epop32be(msg));
uint32_t count = Er_assert(Message_epop32be(msg));
if (cookie != PING_MAGIC || count > pf->bytesSinceLastPing) {
pf->state = Pathfinder_state_ERROR;
Message_t* resp = pathfinderMsg(PFChan_Core_PATHFINDER_GONE, pf, Message_getAlloc(msg));
Iface_CALL(incomingFromCore, resp, &ee->trickIf);
} else {
pf->bytesSinceLastPing -= count;
}
break;
}
case PFChan_Pathfinder_PATHFINDERS: {
for (int i = 0; i < ee->pathfinders->length; i++) {
struct Pathfinder* xpf = ArrayList_Pathfinders_get(ee->pathfinders, i);
if (!xpf || xpf->state != Pathfinder_state_CONNECTED) { continue; }
struct Allocator* alloc = Allocator_child(Message_getAlloc(msg));
Message_t* resp = pathfinderMsg(PFChan_Core_PATHFINDER, pf, alloc);
Iface_CALL(sendToPathfinder, resp, pf);
Allocator_free(alloc);
}
break;
}
}
return true;
}
static Iface_DEFUN incomingFromPathfinder(Message_t* msg, struct Iface* iface)
{
struct Pathfinder* pf = Identity_containerOf(iface, struct Pathfinder, iface);
struct EventEmitter_pvt* ee = Identity_check((struct EventEmitter_pvt*) pf->ee);
if (Message_getLength(msg) < 4) {
Log_debug(ee->log, "DROPPF runt");
return Error(msg, "RUNT");
}
enum PFChan_Pathfinder ev = Er_assert(Message_epop32be(msg));
Er_assert(Message_epush32be(msg, pf->pathfinderId));
Er_assert(Message_epush32be(msg, ev));
if (ev <= PFChan_Pathfinder__TOO_LOW || ev >= PFChan_Pathfinder__TOO_HIGH) {
Log_debug(ee->log, "DROPPF invalid type [%d]", ev);
return Error(msg, "INVALID");
}
if (!PFChan_Pathfinder_sizeOk(ev, Message_getLength(msg))) {
Log_debug(ee->log, "DROPPF incorrect length[%d] for type [%d]", Message_getLength(msg), ev);
return Error(msg, "INVALID");
}
if (pf->state == Pathfinder_state_DISCONNECTED) {
if (ev != PFChan_Pathfinder_CONNECT) {
Log_debug(ee->log, "DROPPF disconnected and event != CONNECT event:[%d]", ev);
return Error(msg, "INVALID");
}
} else if (pf->state != Pathfinder_state_CONNECTED) {
Log_debug(ee->log, "DROPPF error state");
return Error(msg, "INVALID");
}
if (handleFromPathfinder(ev, msg, ee, pf)) { return NULL; }
struct ArrayList_Ifaces* handlers = getHandlers(ee, ev, false);
if (!handlers) { return NULL; }
for (int i = 0; i < handlers->length; i++) {
Message_t* messageClone = Message_clone(msg, Message_getAlloc(msg));
struct Iface* iface = ArrayList_Ifaces_get(handlers, i);
// We have to call this manually because we don't have an interface handy which is
// actually plumbed with this one.
Assert_true(iface);
Assert_true(iface->send);
Iface_CALL(iface->send, messageClone, iface);
}
return NULL;
}
void EventEmitter_regCore(struct EventEmitter* eventEmitter,
struct Iface* iface,
enum PFChan_Pathfinder ev)
{
struct EventEmitter_pvt* ee = Identity_check((struct EventEmitter_pvt*) eventEmitter);
iface->connectedIf = &ee->trickIf;
Iface_setIdentity(iface);
struct ArrayList_Ifaces* l = getHandlers(ee, ev, true);
if (!l) {
Assert_true(ev == 0);
return;
}
ArrayList_Ifaces_add(l, iface);
}
void EventEmitter_regPathfinderIface(struct EventEmitter* emitter, struct Iface* iface)
{
struct EventEmitter_pvt* ee = Identity_check((struct EventEmitter_pvt*) emitter);
struct Allocator* alloc = Allocator_child(ee->alloc);
struct Pathfinder* pf = Allocator_calloc(alloc, sizeof(struct Pathfinder), 1);
pf->ee = ee;
pf->iface.send = incomingFromPathfinder;
pf->alloc = alloc;
Iface_plumb(&pf->iface, iface);
Identity_set(pf);
int i = 0;
for (; i < ee->pathfinders->length; i++) {
struct Pathfinder* xpf = ArrayList_Pathfinders_get(ee->pathfinders, i);
if (!xpf) { break; }
}
pf->pathfinderId = ArrayList_Pathfinders_put(ee->pathfinders, i, pf);
}
struct EventEmitter* EventEmitter_new(struct Allocator* allocator,
struct Log* log,
uint8_t* publicKey)
{
struct Allocator* alloc = Allocator_child(allocator);
struct EventEmitter_pvt* ee = Allocator_calloc(alloc, sizeof(struct EventEmitter_pvt), 1);
ee->log = log;
ee->alloc = alloc;
ee->trickIf.send = incomingFromCore;
Iface_setIdentity(&ee->trickIf);
ee->pathfinders = ArrayList_Pathfinders_new(ee->alloc);
Bits_memcpy(ee->publicKey, publicKey, 32);
Identity_set(ee);
return &ee->pub;
}