/* vim: set expandtab ts=4 sw=4: */
/*
* You may redistribute this program and/or modify it under the terms of
* the GNU General Public License as published by the Free Software Foundation,
* either version 3 of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see .
*/
#include "benc/serialization/standard/StandardBencSerializer.h"
#include "benc/serialization/json/JsonBencSerializer.h"
#include "benc/serialization/BencSerializer.h"
#include "io/ArrayReader.h"
#include "admin/angel/Core.h"
#include "admin/AdminClient.h"
#include "memory/MallocAllocator.h"
#include "memory/Allocator.h"
#include "util/log/FileWriterLog.h"
#include "wire/Message.h"
#include "interface/Interface.h"
#include "util/events/EventBase.h"
#include "crypto/random/Random.h"
#include "crypto/random/libuv/LibuvEntropyProvider.h"
#include "exception/Except.h"
#include "util/events/Timeout.h"
#include "crypto/Key.h"
#include "util/log/Log_impl.h"
#include "io/FileReader.h"
#include "io/ArrayWriter.h"
#include "util/Hex.h"
#include "crypto_scalarmult_curve25519.h"
#include // isatty()
struct NodeContext {
struct Interface angelIface;
struct Sockaddr* boundAddr;
struct Allocator* alloc;
struct EventBase* base;
uint8_t privateKeyHex[64];
String* publicKey;
struct AdminClient* adminClient;
char* nodeName;
/** Admin socket to bind */
String* bind;
/** Admin password */
String* pass;
/** UDPInterface */
int ifNum;
struct Sockaddr* udpAddr;
struct Log nodeLog;
struct Log* parentLogger;
Identity
};
static uint8_t messageToAngel(struct Message* msg, struct Interface* iface)
{
struct NodeContext* ctx = Identity_check((struct NodeContext*) iface);
if (ctx->boundAddr) { return 0; }
struct Allocator* alloc = Allocator_child(ctx->alloc);
struct Reader* reader = ArrayReader_new(msg->bytes, msg->length, alloc);
Dict* config = Dict_new(alloc);
Assert_always(!StandardBencSerializer_get()->parseDictionary(reader, alloc, config));
Dict* admin = Dict_getDict(config, String_CONST("admin"));
String* bind = Dict_getString(admin, String_CONST("bind"));
struct Sockaddr_storage ss;
Assert_always(!Sockaddr_parse(bind->bytes, &ss));
ctx->boundAddr = Sockaddr_clone(&ss.addr, ctx->alloc);
Allocator_free(alloc);
EventBase_endLoop(ctx->base);
return 0;
}
static void sendFirstMessageToCore(void* vcontext)
{
struct NodeContext* ctx = Identity_check((struct NodeContext*) vcontext);
struct Allocator* alloc = Allocator_child(ctx->alloc);
struct Message* msg = Message_new(0, 512, alloc);
Dict* d = Dict_new(alloc);
Dict_putString(d, String_CONST("privateKey"), String_new(ctx->privateKeyHex, alloc), alloc);
Dict* logging = Dict_new(alloc);
{
Dict_putString(logging, String_CONST("logTo"), String_CONST("stdout"), alloc);
}
Dict_putDict(d, String_CONST("logging"), logging, alloc);
Dict* admin = Dict_new(alloc);
{
Dict_putString(admin, String_CONST("bind"), ctx->bind, alloc);
Dict_putString(admin, String_CONST("pass"), ctx->pass, alloc);
}
Dict_putDict(d, String_CONST("admin"), admin, alloc);
uint8_t buff[512];
struct Writer* writer = ArrayWriter_new(buff, 512, alloc);
Assert_always(!StandardBencSerializer_get()->serializeDictionary(writer, d));
Message_push(msg, buff, Writer_bytesWritten(writer), NULL);
Interface_receiveMessage(&ctx->angelIface, msg);
Allocator_free(alloc);
}
struct RPCCall;
typedef void (* RPCCallback)(struct RPCCall* call, struct AdminClient_Result* res);
struct RPCCall
{
String* func;
Dict* args;
struct NodeContext* node;
RPCCallback callback;
};
struct Context
{
struct RPCCall* rpcCalls;
int rpcCallCount;
int nextCall;
struct Allocator* rpcAlloc;
struct Random* rand;
struct EventBase* base;
struct Log* logger;
struct Allocator* alloc;
struct NodeContext** nodes;
Dict* confNodes;
String** names;
Identity
};
static String* getPublicKey(char* privateKeyHex, struct Allocator* alloc)
{
uint8_t privateKey[32];
uint8_t publicKey[32];
Hex_decode(privateKey, 32, privateKeyHex, 65);
crypto_scalarmult_curve25519_base(publicKey, privateKey);
return Key_stringify(publicKey, alloc);
}
static void printLog(struct Log* log,
enum Log_Level logLevel,
const char* file,
int line,
const char* format,
va_list args)
{
struct NodeContext* ctx = Identity_check(
(struct NodeContext*) (((char*)log) - offsetof(struct NodeContext, nodeLog))
);
struct Allocator* alloc = Allocator_child(ctx->alloc);
String* str = String_printf(alloc, "[%s] %s", ctx->nodeName, file);
ctx->parentLogger->print(ctx->parentLogger, logLevel, str->bytes, line, format, args);
Allocator_free(alloc);
}
static struct RPCCall* pushCall(struct Context* ctx)
{
ctx->rpcCalls = Allocator_realloc(ctx->rpcAlloc,
ctx->rpcCalls,
sizeof(struct RPCCall) * (ctx->rpcCallCount+1));
Bits_memset(&ctx->rpcCalls[ctx->rpcCallCount], 0, sizeof(struct RPCCall));
return &ctx->rpcCalls[ctx->rpcCallCount++];
}
static void bindUDPCallback(struct RPCCall* call, struct AdminClient_Result* res)
{
Assert_always(!res->err);
Log_debug(&call->node->nodeLog, "UDPInterface_new() -> [%s]", res->messageBytes);
String* addr = Dict_getString(res->responseDict, String_CONST("bindAddress"));
int64_t* ifNum = Dict_getInt(res->responseDict, String_CONST("interfaceNumber"));
struct Sockaddr_storage ss;
Assert_always(!Sockaddr_parse(addr->bytes, &ss));
call->node->ifNum = *ifNum;
call->node->udpAddr = Sockaddr_clone(&ss.addr, call->node->alloc);
}
static void bindUDP(struct Context* ctx, struct NodeContext* node)
{
struct RPCCall* call = pushCall(ctx);
call->func = String_new("UDPInterface_new", ctx->rpcAlloc);
call->args = Dict_new(ctx->rpcAlloc);
call->node = node;
call->callback = bindUDPCallback;
}
static struct NodeContext* startNode(char* nodeName,
char* privateKeyHex,
Dict* admin,
struct Context* ctx,
struct Except* eh)
{
struct NodeContext* node = Allocator_clone(ctx->alloc, (&(struct NodeContext) {
.angelIface = {
.sendMessage = messageToAngel
},
.alloc = ctx->alloc,
.base = ctx->base,
.nodeLog = {
.print = printLog
},
.parentLogger = ctx->logger,
.nodeName = nodeName
}));
Identity_set(node);
node->bind = Dict_getString(admin, String_CONST("bind"));
if (!node->bind) {
node->bind = String_new("127.0.0.1:0", ctx->alloc);
}
node->pass = Dict_getString(admin, String_CONST("password"));
if (!node->pass) {
node->pass = String_new("x", ctx->alloc);
}
Bits_memcpyConst(node->privateKeyHex, privateKeyHex, 64);
Timeout_setTimeout(sendFirstMessageToCore, node, 0, ctx->base, node->alloc);
Core_init(node->alloc, &node->nodeLog, ctx->base, &node->angelIface, ctx->rand, eh);
// sendFirstMessageToCore causes the core to react causing messageToAngel which ends the loop
EventBase_beginLoop(ctx->base);
node->adminClient = AdminClient_new(node->boundAddr,
node->pass,
ctx->base,
&node->nodeLog,
node->alloc);
node->adminClient->millisecondsToWait = 120000;
bindUDP(ctx, node);
node->publicKey = getPublicKey(privateKeyHex, node->alloc);
return node;
}
static void beginConnectionCallback(struct RPCCall* call, struct AdminClient_Result* res)
{
Assert_always(!res->err);
Log_debug(&call->node->nodeLog, "UDPInterface_beginConnection() -> [%s]", res->messageBytes);
}
static void linkNodes(struct Context* ctx, struct NodeContext* client, struct NodeContext* server)
{
Dict* addPasswordArgs = Dict_new(ctx->rpcAlloc);
String* clientStr = String_printf(ctx->rpcAlloc, "%ld", (long) (uintptr_t) client);
Dict_putString(addPasswordArgs,
String_new("password", ctx->rpcAlloc),
clientStr,
ctx->rpcAlloc);
Dict_putString(addPasswordArgs,
String_new("user", ctx->rpcAlloc),
clientStr,
ctx->rpcAlloc);
struct RPCCall* addPasswordCall = pushCall(ctx);
addPasswordCall->func = String_new("AuthorizedPasswords_add", ctx->rpcAlloc);
addPasswordCall->args = addPasswordArgs;
addPasswordCall->node = server;
// client
Dict* beginConnectionArgs = Dict_new(ctx->rpcAlloc);
Dict_putInt(beginConnectionArgs,
String_new("interfaceNumber", ctx->rpcAlloc),
client->ifNum,
ctx->rpcAlloc);
Dict_putString(beginConnectionArgs,
String_new("password", ctx->rpcAlloc),
clientStr,
ctx->rpcAlloc);
Dict_putString(beginConnectionArgs,
String_new("publicKey", ctx->rpcAlloc),
server->publicKey,
ctx->rpcAlloc);
char* udpAddr = Sockaddr_print(server->udpAddr, ctx->rpcAlloc);
Dict_putString(beginConnectionArgs,
String_new("address", ctx->rpcAlloc),
String_new(udpAddr, ctx->rpcAlloc),
ctx->rpcAlloc);
Log_info(ctx->logger, "Linking [%s] with [%s/%s]",
client->nodeName, server->nodeName, udpAddr);
struct RPCCall* connectCall = pushCall(ctx);
connectCall->func = String_new("UDPInterface_beginConnection", ctx->rpcAlloc);
connectCall->args = beginConnectionArgs;
connectCall->node = client;
connectCall->callback = beginConnectionCallback;
}
static void linkAllNodes(struct Context* ctx)
{
int i = 0;
String* key = NULL;
Dict_forEach(ctx->confNodes, key) {
Dict* val = Dict_getDict(ctx->confNodes, key);
List* connectTo = Dict_getList(val, String_CONST("peers"));
for (int j = 0; j < List_size(connectTo); j++) {
String* server = List_getString(connectTo, j);
Assert_always(server);
for (int k = 0; k < Dict_size(ctx->confNodes); k++) {
if (String_equals(server, ctx->names[k])) {
linkNodes(ctx, ctx->nodes[i], ctx->nodes[k]);
break;
}
}
}
i++;
}
ctx->confNodes = NULL;
}
static void startRpc(void* vcontext);
static void rpcCallback(struct AdminClient_Promise* promise, struct AdminClient_Result* res)
{
struct Context* ctx = promise->userData;
Identity_check(ctx);
struct RPCCall* thisCall = &ctx->rpcCalls[ctx->nextCall];
if (thisCall->callback) {
thisCall->callback(thisCall, res);
}
ctx->nextCall++;
startRpc(ctx);
}
static void startRpc(void* vcontext)
{
struct Context* ctx = vcontext;
Identity_check(ctx);
if (ctx->nextCall >= ctx->rpcCallCount) {
if (ctx->confNodes) {
linkAllNodes(ctx);
}
}
if (ctx->nextCall >= ctx->rpcCallCount) {
Log_info(ctx->logger, "\n\nCompleted setting up simulation\n\n");
Allocator_free(ctx->rpcAlloc);
ctx->rpcAlloc = NULL;
ctx->rpcCalls = NULL;
ctx->rpcCallCount = 0;
return;
}
struct RPCCall* nextCall = &ctx->rpcCalls[ctx->nextCall];
struct AdminClient_Promise* promise = AdminClient_rpcCall(nextCall->func,
nextCall->args,
nextCall->node->adminClient,
ctx->rpcAlloc);
promise->callback = rpcCallback;
promise->userData = ctx;
}
static void letErRip(Dict* config, struct Allocator* alloc)
{
struct Except* eh = NULL;
struct Log* logger = FileWriterLog_new(stdout, alloc);
struct EventBase* base = EventBase_new(alloc);
struct Random* rand = LibuvEntropyProvider_newDefaultRandom(base, logger, eh, alloc);
Allocator_setCanary(alloc, (unsigned long)Random_uint64(rand));
struct Context sctx = {
.rpcAlloc = Allocator_child(alloc),
.logger = logger,
.base = base,
.rand = rand,
.alloc = alloc,
};
struct Context* ctx = &sctx;
Identity_set(ctx);
ctx->confNodes = Dict_getDict(config, String_CONST("nodes"));
ctx->nodes = Allocator_calloc(alloc, sizeof(char*), Dict_size(ctx->confNodes));
ctx->names = Allocator_calloc(alloc, sizeof(String*), Dict_size(ctx->confNodes));
String* key = NULL;
int i = 0;
Dict_forEach(ctx->confNodes, key) {
Dict* val = Dict_getDict(ctx->confNodes, key);
String* privateKeyHex = Dict_getString(val, String_CONST("privateKey"));
Dict* admin = Dict_getDict(val, String_CONST("admin"));
ctx->names[i] = key;
ctx->nodes[i] = startNode(key->bytes, privateKeyHex->bytes, admin, ctx, eh);
i++;
}
// begin the chain of RPC calls which sets up the net
Timeout_setTimeout(startRpc, ctx, 0, base, ctx->rpcAlloc);
EventBase_beginLoop(base);
Allocator_free(alloc);
}
static int usage(char* appName)
{
printf("Example usage: %s < config.json\n"
"Example config:\n"
"{\n"
" \"nodes\": {\n"
" \"alice\": {\n"
" \"privateKey\": "
"\"5e2295679394e5e1db67c238abbc10292ad9b127904394c52cc5fff39383e920\",\n"
" \"peers\": []\n"
" },\n"
" \"bob\": {\n"
" \"privateKey\": "
"\"6569bf3f0d168faa6dfb2912f8ee5ee9b938319e97618fdf06caed73b1aad1cc\",\n"
" \"peers\": [\n"
" \"alice\"\n"
" ]\n"
" }\n"
" }\n"
"}\n", appName);
return 0;
}
int main(int argc, char** argv)
{
Assert_always(argc > 0);
if (isatty(STDIN_FILENO)) {
return usage(argv[0]);
}
struct Allocator* alloc = MallocAllocator_new(1<<30);
struct Reader* stdinReader = FileReader_new(stdin, alloc);
Dict config;
if (JsonBencSerializer_get()->parseDictionary(stdinReader, alloc, &config)) {
fprintf(stderr, "Failed to parse configuration.\n");
return -1;
}
letErRip(&config, alloc);
}