123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459 |
- /* vim: set expandtab ts=4 sw=4: */
- /*
- * You may redistribute this program and/or modify it under the terms of
- * the GNU General Public License as published by the Free Software Foundation,
- * either version 3 of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- */
- #include "benc/serialization/standard/StandardBencSerializer.h"
- #include "benc/serialization/json/JsonBencSerializer.h"
- #include "benc/serialization/BencSerializer.h"
- #include "io/ArrayReader.h"
- #include "admin/angel/Core.h"
- #include "admin/AdminClient.h"
- #include "memory/MallocAllocator.h"
- #include "memory/Allocator.h"
- #include "util/log/FileWriterLog.h"
- #include "wire/Message.h"
- #include "interface/Interface.h"
- #include "util/events/EventBase.h"
- #include "crypto/random/Random.h"
- #include "crypto/random/libuv/LibuvEntropyProvider.h"
- #include "exception/Except.h"
- #include "util/events/Timeout.h"
- #include "crypto/Key.h"
- #include "util/log/Log_impl.h"
- #include "io/FileReader.h"
- #include "io/ArrayWriter.h"
- #include "util/Hex.h"
- #include "crypto_scalarmult_curve25519.h"
- #include <unistd.h> // isatty()
- struct NodeContext {
- struct Interface angelIface;
- struct Sockaddr* boundAddr;
- struct Allocator* alloc;
- struct EventBase* base;
- uint8_t privateKeyHex[64];
- String* publicKey;
- struct AdminClient* adminClient;
- char* nodeName;
- /** Admin socket to bind */
- String* bind;
- /** Admin password */
- String* pass;
- /** UDPInterface */
- int ifNum;
- struct Sockaddr* udpAddr;
- struct Log nodeLog;
- struct Log* parentLogger;
- Identity
- };
- static uint8_t messageToAngel(struct Message* msg, struct Interface* iface)
- {
- struct NodeContext* ctx = Identity_check((struct NodeContext*) iface);
- if (ctx->boundAddr) { return 0; }
- struct Allocator* alloc = Allocator_child(ctx->alloc);
- struct Reader* reader = ArrayReader_new(msg->bytes, msg->length, alloc);
- Dict* config = Dict_new(alloc);
- Assert_always(!StandardBencSerializer_get()->parseDictionary(reader, alloc, config));
- Dict* admin = Dict_getDict(config, String_CONST("admin"));
- String* bind = Dict_getString(admin, String_CONST("bind"));
- struct Sockaddr_storage ss;
- Assert_always(!Sockaddr_parse(bind->bytes, &ss));
- ctx->boundAddr = Sockaddr_clone(&ss.addr, ctx->alloc);
- Allocator_free(alloc);
- EventBase_endLoop(ctx->base);
- return 0;
- }
- static void sendFirstMessageToCore(void* vcontext)
- {
- struct NodeContext* ctx = Identity_check((struct NodeContext*) vcontext);
- struct Allocator* alloc = Allocator_child(ctx->alloc);
- struct Message* msg = Message_new(0, 512, alloc);
- Dict* d = Dict_new(alloc);
- Dict_putString(d, String_CONST("privateKey"), String_new(ctx->privateKeyHex, alloc), alloc);
- Dict* logging = Dict_new(alloc);
- {
- Dict_putString(logging, String_CONST("logTo"), String_CONST("stdout"), alloc);
- }
- Dict_putDict(d, String_CONST("logging"), logging, alloc);
- Dict* admin = Dict_new(alloc);
- {
- Dict_putString(admin, String_CONST("bind"), ctx->bind, alloc);
- Dict_putString(admin, String_CONST("pass"), ctx->pass, alloc);
- }
- Dict_putDict(d, String_CONST("admin"), admin, alloc);
- uint8_t buff[512];
- struct Writer* writer = ArrayWriter_new(buff, 512, alloc);
- Assert_always(!StandardBencSerializer_get()->serializeDictionary(writer, d));
- Message_push(msg, buff, Writer_bytesWritten(writer), NULL);
- Interface_receiveMessage(&ctx->angelIface, msg);
- Allocator_free(alloc);
- }
- struct RPCCall;
- typedef void (* RPCCallback)(struct RPCCall* call, struct AdminClient_Result* res);
- struct RPCCall
- {
- String* func;
- Dict* args;
- struct NodeContext* node;
- RPCCallback callback;
- };
- struct Context
- {
- struct RPCCall* rpcCalls;
- int rpcCallCount;
- int nextCall;
- struct Allocator* rpcAlloc;
- struct Random* rand;
- struct EventBase* base;
- struct Log* logger;
- struct Allocator* alloc;
- struct NodeContext** nodes;
- Dict* confNodes;
- String** names;
- Identity
- };
- static String* getPublicKey(char* privateKeyHex, struct Allocator* alloc)
- {
- uint8_t privateKey[32];
- uint8_t publicKey[32];
- Hex_decode(privateKey, 32, privateKeyHex, 65);
- crypto_scalarmult_curve25519_base(publicKey, privateKey);
- return Key_stringify(publicKey, alloc);
- }
- static void printLog(struct Log* log,
- enum Log_Level logLevel,
- const char* file,
- int line,
- const char* format,
- va_list args)
- {
- struct NodeContext* ctx = Identity_check(
- (struct NodeContext*) (((char*)log) - offsetof(struct NodeContext, nodeLog))
- );
- struct Allocator* alloc = Allocator_child(ctx->alloc);
- String* str = String_printf(alloc, "[%s] %s", ctx->nodeName, file);
- ctx->parentLogger->print(ctx->parentLogger, logLevel, str->bytes, line, format, args);
- Allocator_free(alloc);
- }
- static struct RPCCall* pushCall(struct Context* ctx)
- {
- ctx->rpcCalls = Allocator_realloc(ctx->rpcAlloc,
- ctx->rpcCalls,
- sizeof(struct RPCCall) * (ctx->rpcCallCount+1));
- Bits_memset(&ctx->rpcCalls[ctx->rpcCallCount], 0, sizeof(struct RPCCall));
- return &ctx->rpcCalls[ctx->rpcCallCount++];
- }
- static void bindUDPCallback(struct RPCCall* call, struct AdminClient_Result* res)
- {
- Assert_always(!res->err);
- Log_debug(&call->node->nodeLog, "UDPInterface_new() -> [%s]", res->messageBytes);
- String* addr = Dict_getString(res->responseDict, String_CONST("bindAddress"));
- int64_t* ifNum = Dict_getInt(res->responseDict, String_CONST("interfaceNumber"));
- struct Sockaddr_storage ss;
- Assert_always(!Sockaddr_parse(addr->bytes, &ss));
- call->node->ifNum = *ifNum;
- call->node->udpAddr = Sockaddr_clone(&ss.addr, call->node->alloc);
- }
- static void bindUDP(struct Context* ctx, struct NodeContext* node)
- {
- struct RPCCall* call = pushCall(ctx);
- call->func = String_new("UDPInterface_new", ctx->rpcAlloc);
- call->args = Dict_new(ctx->rpcAlloc);
- call->node = node;
- call->callback = bindUDPCallback;
- }
- static struct NodeContext* startNode(char* nodeName,
- char* privateKeyHex,
- Dict* admin,
- struct Context* ctx,
- struct Except* eh)
- {
- struct NodeContext* node = Allocator_clone(ctx->alloc, (&(struct NodeContext) {
- .angelIface = {
- .sendMessage = messageToAngel
- },
- .alloc = ctx->alloc,
- .base = ctx->base,
- .nodeLog = {
- .print = printLog
- },
- .parentLogger = ctx->logger,
- .nodeName = nodeName
- }));
- Identity_set(node);
- node->bind = Dict_getString(admin, String_CONST("bind"));
- if (!node->bind) {
- node->bind = String_new("127.0.0.1:0", ctx->alloc);
- }
- node->pass = Dict_getString(admin, String_CONST("password"));
- if (!node->pass) {
- node->pass = String_new("x", ctx->alloc);
- }
- Bits_memcpyConst(node->privateKeyHex, privateKeyHex, 64);
- Timeout_setTimeout(sendFirstMessageToCore, node, 0, ctx->base, node->alloc);
- Core_init(node->alloc, &node->nodeLog, ctx->base, &node->angelIface, ctx->rand, eh);
- // sendFirstMessageToCore causes the core to react causing messageToAngel which ends the loop
- EventBase_beginLoop(ctx->base);
- node->adminClient = AdminClient_new(node->boundAddr,
- node->pass,
- ctx->base,
- &node->nodeLog,
- node->alloc);
- node->adminClient->millisecondsToWait = 120000;
- bindUDP(ctx, node);
- node->publicKey = getPublicKey(privateKeyHex, node->alloc);
- return node;
- }
- static void beginConnectionCallback(struct RPCCall* call, struct AdminClient_Result* res)
- {
- Assert_always(!res->err);
- Log_debug(&call->node->nodeLog, "UDPInterface_beginConnection() -> [%s]", res->messageBytes);
- }
- static void linkNodes(struct Context* ctx, struct NodeContext* client, struct NodeContext* server)
- {
- Dict* addPasswordArgs = Dict_new(ctx->rpcAlloc);
- String* clientStr = String_printf(ctx->rpcAlloc, "%ld", (long) (uintptr_t) client);
- Dict_putString(addPasswordArgs,
- String_new("password", ctx->rpcAlloc),
- clientStr,
- ctx->rpcAlloc);
- Dict_putString(addPasswordArgs,
- String_new("user", ctx->rpcAlloc),
- clientStr,
- ctx->rpcAlloc);
- struct RPCCall* addPasswordCall = pushCall(ctx);
- addPasswordCall->func = String_new("AuthorizedPasswords_add", ctx->rpcAlloc);
- addPasswordCall->args = addPasswordArgs;
- addPasswordCall->node = server;
- // client
- Dict* beginConnectionArgs = Dict_new(ctx->rpcAlloc);
- Dict_putInt(beginConnectionArgs,
- String_new("interfaceNumber", ctx->rpcAlloc),
- client->ifNum,
- ctx->rpcAlloc);
- Dict_putString(beginConnectionArgs,
- String_new("password", ctx->rpcAlloc),
- clientStr,
- ctx->rpcAlloc);
- Dict_putString(beginConnectionArgs,
- String_new("publicKey", ctx->rpcAlloc),
- server->publicKey,
- ctx->rpcAlloc);
- char* udpAddr = Sockaddr_print(server->udpAddr, ctx->rpcAlloc);
- Dict_putString(beginConnectionArgs,
- String_new("address", ctx->rpcAlloc),
- String_new(udpAddr, ctx->rpcAlloc),
- ctx->rpcAlloc);
- Log_info(ctx->logger, "Linking [%s] with [%s/%s]",
- client->nodeName, server->nodeName, udpAddr);
- struct RPCCall* connectCall = pushCall(ctx);
- connectCall->func = String_new("UDPInterface_beginConnection", ctx->rpcAlloc);
- connectCall->args = beginConnectionArgs;
- connectCall->node = client;
- connectCall->callback = beginConnectionCallback;
- }
- static void linkAllNodes(struct Context* ctx)
- {
- int i = 0;
- String* key = NULL;
- Dict_forEach(ctx->confNodes, key) {
- Dict* val = Dict_getDict(ctx->confNodes, key);
- List* connectTo = Dict_getList(val, String_CONST("peers"));
- for (int j = 0; j < List_size(connectTo); j++) {
- String* server = List_getString(connectTo, j);
- Assert_always(server);
- for (int k = 0; k < Dict_size(ctx->confNodes); k++) {
- if (String_equals(server, ctx->names[k])) {
- linkNodes(ctx, ctx->nodes[i], ctx->nodes[k]);
- break;
- }
- }
- }
- i++;
- }
- ctx->confNodes = NULL;
- }
- static void startRpc(void* vcontext);
- static void rpcCallback(struct AdminClient_Promise* promise, struct AdminClient_Result* res)
- {
- struct Context* ctx = promise->userData;
- Identity_check(ctx);
- struct RPCCall* thisCall = &ctx->rpcCalls[ctx->nextCall];
- if (thisCall->callback) {
- thisCall->callback(thisCall, res);
- }
- ctx->nextCall++;
- startRpc(ctx);
- }
- static void startRpc(void* vcontext)
- {
- struct Context* ctx = vcontext;
- Identity_check(ctx);
- if (ctx->nextCall >= ctx->rpcCallCount) {
- if (ctx->confNodes) {
- linkAllNodes(ctx);
- }
- }
- if (ctx->nextCall >= ctx->rpcCallCount) {
- Log_info(ctx->logger, "\n\nCompleted setting up simulation\n\n");
- Allocator_free(ctx->rpcAlloc);
- ctx->rpcAlloc = NULL;
- ctx->rpcCalls = NULL;
- ctx->rpcCallCount = 0;
- return;
- }
- struct RPCCall* nextCall = &ctx->rpcCalls[ctx->nextCall];
- struct AdminClient_Promise* promise = AdminClient_rpcCall(nextCall->func,
- nextCall->args,
- nextCall->node->adminClient,
- ctx->rpcAlloc);
- promise->callback = rpcCallback;
- promise->userData = ctx;
- }
- static void letErRip(Dict* config, struct Allocator* alloc)
- {
- struct Except* eh = NULL;
- struct Log* logger = FileWriterLog_new(stdout, alloc);
- struct EventBase* base = EventBase_new(alloc);
- struct Random* rand = LibuvEntropyProvider_newDefaultRandom(base, logger, eh, alloc);
- Allocator_setCanary(alloc, (unsigned long)Random_uint64(rand));
- struct Context sctx = {
- .rpcAlloc = Allocator_child(alloc),
- .logger = logger,
- .base = base,
- .rand = rand,
- .alloc = alloc,
- };
- struct Context* ctx = &sctx;
- Identity_set(ctx);
- ctx->confNodes = Dict_getDict(config, String_CONST("nodes"));
- ctx->nodes = Allocator_calloc(alloc, sizeof(char*), Dict_size(ctx->confNodes));
- ctx->names = Allocator_calloc(alloc, sizeof(String*), Dict_size(ctx->confNodes));
- String* key = NULL;
- int i = 0;
- Dict_forEach(ctx->confNodes, key) {
- Dict* val = Dict_getDict(ctx->confNodes, key);
- String* privateKeyHex = Dict_getString(val, String_CONST("privateKey"));
- Dict* admin = Dict_getDict(val, String_CONST("admin"));
- ctx->names[i] = key;
- ctx->nodes[i] = startNode(key->bytes, privateKeyHex->bytes, admin, ctx, eh);
- i++;
- }
- // begin the chain of RPC calls which sets up the net
- Timeout_setTimeout(startRpc, ctx, 0, base, ctx->rpcAlloc);
- EventBase_beginLoop(base);
- Allocator_free(alloc);
- }
/**
 * Print how to invoke the program, followed by an example configuration.
 *
 * @param appName the program name to show in the example command line.
 * @return always 0, meant to be returned from main().
 */
static int usage(char* appName)
{
    printf(
        "Example usage: %s < config.json\n"
        "Example config:\n"
        "{\n"
        " \"nodes\": {\n"
        " \"alice\": {\n"
        " \"privateKey\": \"5e2295679394e5e1db67c238abbc10292ad9b127904394c52cc5fff39383e920\",\n"
        " \"peers\": []\n"
        " },\n"
        " \"bob\": {\n"
        " \"privateKey\": \"6569bf3f0d168faa6dfb2912f8ee5ee9b938319e97618fdf06caed73b1aad1cc\",\n"
        " \"peers\": [\n"
        " \"alice\"\n"
        " ]\n"
        " }\n"
        " }\n"
        "}\n",
        appName
    );
    return 0;
}
- int main(int argc, char** argv)
- {
- Assert_always(argc > 0);
- if (isatty(STDIN_FILENO)) {
- return usage(argv[0]);
- }
- struct Allocator* alloc = MallocAllocator_new(1<<30);
- struct Reader* stdinReader = FileReader_new(stdin, alloc);
- Dict config;
- if (JsonBencSerializer_get()->parseDictionary(stdinReader, alloc, &config)) {
- fprintf(stderr, "Failed to parse configuration.\n");
- return -1;
- }
- letErRip(&config, alloc);
- }
|