123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404 |
- /*
- Minetest
- Copyright (C) 2013-2017 celeron55, Perttu Ahola <celeron55@gmail.com>
- Copyright (C) 2017 celeron55, Loic Blot <loic.blot@unix-experience.fr>
- This program is free software; you can redistribute it and/or modify
- it under the terms of the GNU Lesser General Public License as published by
- the Free Software Foundation; either version 2.1 of the License, or
- (at your option) any later version.
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU Lesser General Public License for more details.
- You should have received a copy of the GNU Lesser General Public License along
- with this program; if not, write to the Free Software Foundation, Inc.,
- 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
- */
- #include "connectionthreads.h"
- #include "log.h"
- #include "profiler.h"
- #include "settings.h"
- #include "network/networkpacket.h"
- #include "util/serialize.h"
- namespace con
- {
- /******************************************************************************/
- /* defines used for debugging and profiling */
- /******************************************************************************/
- #ifdef NDEBUG
- #define LOG(a) a
- #define PROFILE(a)
- #undef DEBUG_CONNECTION_KBPS
- #else
- /* this mutex is used to achieve log message consistency */
- std::mutex log_conthread_mutex;
- #define LOG(a) \
- { \
- MutexAutoLock loglock(log_conthread_mutex); \
- a; \
- }
- #define PROFILE(a) a
- //#define DEBUG_CONNECTION_KBPS
- #undef DEBUG_CONNECTION_KBPS
- #endif
- /* maximum number of retries for reliable packets */
- #define MAX_RELIABLE_RETRY 5
- #define WINDOW_SIZE 5
- static session_t readPeerId(u8 *packetdata)
- {
- return readU16(&packetdata[4]);
- }
- static u8 readChannel(u8 *packetdata)
- {
- return readU8(&packetdata[6]);
- }
- /******************************************************************************/
- /* Connection Threads */
- /******************************************************************************/
- ConnectionSendThread::ConnectionSendThread(unsigned int max_packet_size,
- float timeout) :
- Thread("ConnectionSend"),
- m_max_packet_size(max_packet_size),
- m_timeout(timeout),
- m_max_data_packets_per_iteration(g_settings->getU16("max_packets_per_iteration"))
- {
- }
- void *ConnectionSendThread::run()
- {
- assert(m_connection);
- LOG(dout_con << m_connection->getDesc()
- << "ConnectionSend thread started" << std::endl);
- u64 curtime = porting::getTimeMs();
- u64 lasttime = curtime;
- PROFILE(std::stringstream ThreadIdentifier);
- PROFILE(ThreadIdentifier << "ConnectionSend: [" << m_connection->getDesc() << "]");
- /* if stop is requested don't stop immediately but try to send all */
- /* packets first */
- while (!stopRequested() || packetsQueued()) {
- BEGIN_DEBUG_EXCEPTION_HANDLER
- PROFILE(ScopeProfiler sp(g_profiler, ThreadIdentifier.str(), SPT_AVG));
- m_iteration_packets_avaialble = m_max_data_packets_per_iteration;
- /* wait for trigger or timeout */
- m_send_sleep_semaphore.wait(50);
- /* remove all triggers */
- while (m_send_sleep_semaphore.wait(0)) {
- }
- lasttime = curtime;
- curtime = porting::getTimeMs();
- float dtime = CALC_DTIME(lasttime, curtime);
- /* first do all the reliable stuff */
- runTimeouts(dtime);
- /* translate commands to packets */
- ConnectionCommand c = m_connection->m_command_queue.pop_frontNoEx(0);
- while (c.type != CONNCMD_NONE) {
- if (c.reliable)
- processReliableCommand(c);
- else
- processNonReliableCommand(c);
- c = m_connection->m_command_queue.pop_frontNoEx(0);
- }
- /* send non reliable packets */
- sendPackets(dtime);
- END_DEBUG_EXCEPTION_HANDLER
- }
- PROFILE(g_profiler->remove(ThreadIdentifier.str()));
- return NULL;
- }
- void ConnectionSendThread::Trigger()
- {
- m_send_sleep_semaphore.post();
- }
- bool ConnectionSendThread::packetsQueued()
- {
- std::list<session_t> peerIds = m_connection->getPeerIDs();
- if (!m_outgoing_queue.empty() && !peerIds.empty())
- return true;
- for (session_t peerId : peerIds) {
- PeerHelper peer = m_connection->getPeerNoEx(peerId);
- if (!peer)
- continue;
- if (dynamic_cast<UDPPeer *>(&peer) == 0)
- continue;
- for (Channel &channel : (dynamic_cast<UDPPeer *>(&peer))->channels) {
- if (!channel.queued_commands.empty()) {
- return true;
- }
- }
- }
- return false;
- }
- void ConnectionSendThread::runTimeouts(float dtime)
- {
- std::list<session_t> timeouted_peers;
- std::list<session_t> peerIds = m_connection->getPeerIDs();
- for (session_t &peerId : peerIds) {
- PeerHelper peer = m_connection->getPeerNoEx(peerId);
- if (!peer)
- continue;
- UDPPeer *udpPeer = dynamic_cast<UDPPeer *>(&peer);
- if (!udpPeer)
- continue;
- PROFILE(std::stringstream peerIdentifier);
- PROFILE(peerIdentifier << "runTimeouts[" << m_connection->getDesc()
- << ";" << peerId << ";RELIABLE]");
- PROFILE(ScopeProfiler
- peerprofiler(g_profiler, peerIdentifier.str(), SPT_AVG));
- SharedBuffer<u8> data(2); // data for sending ping, required here because of goto
- /*
- Check peer timeout
- */
- if (peer->isTimedOut(m_timeout)) {
- infostream << m_connection->getDesc()
- << "RunTimeouts(): Peer " << peer->id
- << " has timed out."
- << " (source=peer->timeout_counter)"
- << std::endl;
- // Add peer to the list
- timeouted_peers.push_back(peer->id);
- // Don't bother going through the buffers of this one
- continue;
- }
- float resend_timeout = udpPeer->getResendTimeout();
- bool retry_count_exceeded = false;
- for (Channel &channel : udpPeer->channels) {
- std::list<BufferedPacket> timed_outs;
- if (udpPeer->getLegacyPeer())
- channel.setWindowSize(WINDOW_SIZE);
- // Remove timed out incomplete unreliable split packets
- channel.incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout);
- // Increment reliable packet times
- channel.outgoing_reliables_sent.incrementTimeouts(dtime);
- unsigned int numpeers = m_connection->m_peers.size();
- if (numpeers == 0)
- return;
- // Re-send timed out outgoing reliables
- timed_outs = channel.outgoing_reliables_sent.getTimedOuts(resend_timeout,
- (m_max_data_packets_per_iteration / numpeers));
- channel.UpdatePacketLossCounter(timed_outs.size());
- g_profiler->graphAdd("packets_lost", timed_outs.size());
- m_iteration_packets_avaialble -= timed_outs.size();
- for (std::list<BufferedPacket>::iterator k = timed_outs.begin();
- k != timed_outs.end(); ++k) {
- session_t peer_id = readPeerId(*(k->data));
- u8 channelnum = readChannel(*(k->data));
- u16 seqnum = readU16(&(k->data[BASE_HEADER_SIZE + 1]));
- channel.UpdateBytesLost(k->data.getSize());
- k->resend_count++;
- if (k->resend_count > MAX_RELIABLE_RETRY) {
- retry_count_exceeded = true;
- timeouted_peers.push_back(peer->id);
- /* no need to check additional packets if a single one did timeout*/
- break;
- }
- LOG(derr_con << m_connection->getDesc()
- << "RE-SENDING timed-out RELIABLE to "
- << k->address.serializeString()
- << "(t/o=" << resend_timeout << "): "
- << "from_peer_id=" << peer_id
- << ", channel=" << ((int) channelnum & 0xff)
- << ", seqnum=" << seqnum
- << std::endl);
- rawSend(*k);
- // do not handle rtt here as we can't decide if this packet was
- // lost or really takes more time to transmit
- }
- if (retry_count_exceeded) {
- break; /* no need to check other channels if we already did timeout */
- }
- channel.UpdateTimers(dtime, udpPeer->getLegacyPeer());
- }
- /* skip to next peer if we did timeout */
- if (retry_count_exceeded)
- continue;
- /* send ping if necessary */
- if (udpPeer->Ping(dtime, data)) {
- LOG(dout_con << m_connection->getDesc()
- << "Sending ping for peer_id: " << udpPeer->id << std::endl);
- /* this may fail if there ain't a sequence number left */
- if (!rawSendAsPacket(udpPeer->id, 0, data, true)) {
- //retrigger with reduced ping interval
- udpPeer->Ping(4.0, data);
- }
- }
- udpPeer->RunCommandQueues(m_max_packet_size,
- m_max_commands_per_iteration,
- m_max_packets_requeued);
- }
- // Remove timed out peers
- for (u16 timeouted_peer : timeouted_peers) {
- LOG(derr_con << m_connection->getDesc()
- << "RunTimeouts(): Removing peer " << timeouted_peer << std::endl);
- m_connection->deletePeer(timeouted_peer, true);
- }
- }
- void ConnectionSendThread::rawSend(const BufferedPacket &packet)
- {
- try {
- m_connection->m_udpSocket.Send(packet.address, *packet.data,
- packet.data.getSize());
- LOG(dout_con << m_connection->getDesc()
- << " rawSend: " << packet.data.getSize()
- << " bytes sent" << std::endl);
- } catch (SendFailedException &e) {
- LOG(derr_con << m_connection->getDesc()
- << "Connection::rawSend(): SendFailedException: "
- << packet.address.serializeString() << std::endl);
- }
- }
- void ConnectionSendThread::sendAsPacketReliable(BufferedPacket &p, Channel *channel)
- {
- try {
- p.absolute_send_time = porting::getTimeMs();
- // Buffer the packet
- channel->outgoing_reliables_sent.insert(p,
- (channel->readOutgoingSequenceNumber() - MAX_RELIABLE_WINDOW_SIZE)
- % (MAX_RELIABLE_WINDOW_SIZE + 1));
- }
- catch (AlreadyExistsException &e) {
- LOG(derr_con << m_connection->getDesc()
- << "WARNING: Going to send a reliable packet"
- << " in outgoing buffer" << std::endl);
- }
- // Send the packet
- rawSend(p);
- }
- bool ConnectionSendThread::rawSendAsPacket(session_t peer_id, u8 channelnum,
- SharedBuffer<u8> data, bool reliable)
- {
- PeerHelper peer = m_connection->getPeerNoEx(peer_id);
- if (!peer) {
- LOG(dout_con << m_connection->getDesc()
- << " INFO: dropped packet for non existent peer_id: "
- << peer_id << std::endl);
- FATAL_ERROR_IF(!reliable,
- "Trying to send raw packet reliable but no peer found!");
- return false;
- }
- Channel *channel = &(dynamic_cast<UDPPeer *>(&peer)->channels[channelnum]);
- if (reliable) {
- bool have_sequence_number_for_raw_packet = true;
- u16 seqnum =
- channel->getOutgoingSequenceNumber(have_sequence_number_for_raw_packet);
- if (!have_sequence_number_for_raw_packet)
- return false;
- SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum);
- Address peer_address;
- peer->getAddress(MTP_MINETEST_RELIABLE_UDP, peer_address);
- // Add base headers and make a packet
- BufferedPacket p = con::makePacket(peer_address, reliable,
- m_connection->GetProtocolID(), m_connection->GetPeerID(),
- channelnum);
- // first check if our send window is already maxed out
- if (channel->outgoing_reliables_sent.size()
- < channel->getWindowSize()) {
- LOG(dout_con << m_connection->getDesc()
- << " INFO: sending a reliable packet to peer_id " << peer_id
- << " channel: " << channelnum
- << " seqnum: " << seqnum << std::endl);
- sendAsPacketReliable(p, channel);
- return true;
- }
- LOG(dout_con << m_connection->getDesc()
- << " INFO: queueing reliable packet for peer_id: " << peer_id
- << " channel: " << channelnum
- << " seqnum: " << seqnum << std::endl);
- channel->queued_reliables.push(p);
- return false;
- }
- Address peer_address;
- if (peer->getAddress(MTP_UDP, peer_address)) {
- // Add base headers and make a packet
- BufferedPacket p = con::makePacket(peer_address, data,
- m_connection->GetProtocolID(), m_connection->GetPeerID(),
- channelnum);
- // Send the packet
- rawSend(p);
- return true;
- }
- LOG(dout_con << m_connection->getDesc()
- << " INFO: dropped unreliable packet for peer_id: " << peer_id
- << " because of (yet) missing udp address" << std::endl);
- return false;
- }
- void ConnectionSendThread::processReliableCommand(ConnectionCommand &c)
- {
- assert(c.reliable); // Pre-condition
- switch (c.type) {
- case CONNCMD_NONE:
- LOG(dout_con << m_connection->getDesc()
- << "UDP processing reliable CONNCMD_NONE" << std::endl);
- return;
- case CONNCMD_SEND:
- LOG(dout_con << m_connection->getDesc()
- << "UDP processing reliable CONNCMD_SEND" << std::endl);
- sendReliable(c);
- return;
- case CONNCMD_SEND_TO_ALL:
- LOG(dout_con << m_connection->getDesc()
- << "UDP processing CONNCMD_SEND_TO_ALL" << std::endl);
- sendToAllReliable(c);
- return;
- case CONCMD_CREATE_PEER:
- LOG(dout_con << m_connection->getDesc()
- << "UDP processing reliable CONCMD_CREATE_PEER" << std::endl);
- if (!rawSendAsPacket(c.peer_id, c.channelnum, c.data, c.reliable)) {
- /* put to queue if we couldn't send it immediately */
- sendReliable(c);
- }
- return;
- case CONCMD_DISABLE_LEGACY:
- LOG(dout_con << m_connection->getDesc()
- << "UDP processing reliable CONCMD_DISABLE_LEGACY" << std::endl);
- if (!rawSendAsPacket(c.peer_id, c.channelnum, c.data, c.reliable)) {
- /* put to queue if we couldn't send it immediately */
- sendReliable(c);
- }
- return;
- case CONNCMD_SERVE:
- case CONNCMD_CONNECT:
- case CONNCMD_DISCONNECT:
- case CONCMD_ACK:
- FATAL_ERROR("Got command that shouldn't be reliable as reliable command");
- default:
- LOG(dout_con << m_connection->getDesc()
- << " Invalid reliable command type: " << c.type << std::endl);
- }
- }
- void ConnectionSendThread::processNonReliableCommand(ConnectionCommand &c)
- {
- assert(!c.reliable); // Pre-condition
- switch (c.type) {
- case CONNCMD_NONE:
- LOG(dout_con << m_connection->getDesc()
- << " UDP processing CONNCMD_NONE" << std::endl);
- return;
- case CONNCMD_SERVE:
- LOG(dout_con << m_connection->getDesc()
- << " UDP processing CONNCMD_SERVE port="
- << c.address.serializeString() << std::endl);
- serve(c.address);
- return;
- case CONNCMD_CONNECT:
- LOG(dout_con << m_connection->getDesc()
- << " UDP processing CONNCMD_CONNECT" << std::endl);
- connect(c.address);
- return;
- case CONNCMD_DISCONNECT:
- LOG(dout_con << m_connection->getDesc()
- << " UDP processing CONNCMD_DISCONNECT" << std::endl);
- disconnect();
- return;
- case CONNCMD_DISCONNECT_PEER:
- LOG(dout_con << m_connection->getDesc()
- << " UDP processing CONNCMD_DISCONNECT_PEER" << std::endl);
- disconnect_peer(c.peer_id);
- return;
- case CONNCMD_SEND:
- LOG(dout_con << m_connection->getDesc()
- << " UDP processing CONNCMD_SEND" << std::endl);
- send(c.peer_id, c.channelnum, c.data);
- return;
- case CONNCMD_SEND_TO_ALL:
- LOG(dout_con << m_connection->getDesc()
- << " UDP processing CONNCMD_SEND_TO_ALL" << std::endl);
- sendToAll(c.channelnum, c.data);
- return;
- case CONCMD_ACK:
- LOG(dout_con << m_connection->getDesc()
- << " UDP processing CONCMD_ACK" << std::endl);
- sendAsPacket(c.peer_id, c.channelnum, c.data, true);
- return;
- case CONCMD_CREATE_PEER:
- FATAL_ERROR("Got command that should be reliable as unreliable command");
- default:
- LOG(dout_con << m_connection->getDesc()
- << " Invalid command type: " << c.type << std::endl);
- }
- }
- void ConnectionSendThread::serve(Address bind_address)
- {
- LOG(dout_con << m_connection->getDesc()
- << "UDP serving at port " << bind_address.serializeString() << std::endl);
- try {
- m_connection->m_udpSocket.Bind(bind_address);
- m_connection->SetPeerID(PEER_ID_SERVER);
- }
- catch (SocketException &e) {
- // Create event
- ConnectionEvent ce;
- ce.bindFailed();
- m_connection->putEvent(ce);
- }
- }
- void ConnectionSendThread::connect(Address address)
- {
- LOG(dout_con << m_connection->getDesc() << " connecting to "
- << address.serializeString()
- << ":" << address.getPort() << std::endl);
- UDPPeer *peer = m_connection->createServerPeer(address);
- // Create event
- ConnectionEvent e;
- e.peerAdded(peer->id, peer->address);
- m_connection->putEvent(e);
- Address bind_addr;
- if (address.isIPv6())
- bind_addr.setAddress((IPv6AddressBytes *) NULL);
- else
- bind_addr.setAddress(0, 0, 0, 0);
- m_connection->m_udpSocket.Bind(bind_addr);
- // Send a dummy packet to server with peer_id = PEER_ID_INEXISTENT
- m_connection->SetPeerID(PEER_ID_INEXISTENT);
- NetworkPacket pkt(0, 0);
- m_connection->Send(PEER_ID_SERVER, 0, &pkt, true);
- }
- void ConnectionSendThread::disconnect()
- {
- LOG(dout_con << m_connection->getDesc() << " disconnecting" << std::endl);
- // Create and send DISCO packet
- SharedBuffer<u8> data(2);
- writeU8(&data[0], PACKET_TYPE_CONTROL);
- writeU8(&data[1], CONTROLTYPE_DISCO);
- // Send to all
- std::list<session_t> peerids = m_connection->getPeerIDs();
- for (session_t peerid : peerids) {
- sendAsPacket(peerid, 0, data, false);
- }
- }
- void ConnectionSendThread::disconnect_peer(session_t peer_id)
- {
- LOG(dout_con << m_connection->getDesc() << " disconnecting peer" << std::endl);
- // Create and send DISCO packet
- SharedBuffer<u8> data(2);
- writeU8(&data[0], PACKET_TYPE_CONTROL);
- writeU8(&data[1], CONTROLTYPE_DISCO);
- sendAsPacket(peer_id, 0, data, false);
- PeerHelper peer = m_connection->getPeerNoEx(peer_id);
- if (!peer)
- return;
- if (dynamic_cast<UDPPeer *>(&peer) == 0) {
- return;
- }
- dynamic_cast<UDPPeer *>(&peer)->m_pending_disconnect = true;
- }
- void ConnectionSendThread::send(session_t peer_id, u8 channelnum,
- SharedBuffer<u8> data)
- {
- assert(channelnum < CHANNEL_COUNT); // Pre-condition
- PeerHelper peer = m_connection->getPeerNoEx(peer_id);
- if (!peer) {
- LOG(dout_con << m_connection->getDesc() << " peer: peer_id=" << peer_id
- << ">>>NOT<<< found on sending packet"
- << ", channel " << (channelnum % 0xFF)
- << ", size: " << data.getSize() << std::endl);
- return;
- }
- LOG(dout_con << m_connection->getDesc() << " sending to peer_id=" << peer_id
- << ", channel " << (channelnum % 0xFF)
- << ", size: " << data.getSize() << std::endl);
- u16 split_sequence_number = peer->getNextSplitSequenceNumber(channelnum);
- u32 chunksize_max = m_max_packet_size - BASE_HEADER_SIZE;
- std::list<SharedBuffer<u8>> originals;
- makeAutoSplitPacket(data, chunksize_max, split_sequence_number, &originals);
- peer->setNextSplitSequenceNumber(channelnum, split_sequence_number);
- for (const SharedBuffer<u8> &original : originals) {
- sendAsPacket(peer_id, channelnum, original);
- }
- }
- void ConnectionSendThread::sendReliable(ConnectionCommand &c)
- {
- PeerHelper peer = m_connection->getPeerNoEx(c.peer_id);
- if (!peer)
- return;
- peer->PutReliableSendCommand(c, m_max_packet_size);
- }
- void ConnectionSendThread::sendToAll(u8 channelnum, SharedBuffer<u8> data)
- {
- std::list<session_t> peerids = m_connection->getPeerIDs();
- for (session_t peerid : peerids) {
- send(peerid, channelnum, data);
- }
- }
- void ConnectionSendThread::sendToAllReliable(ConnectionCommand &c)
- {
- std::list<session_t> peerids = m_connection->getPeerIDs();
- for (session_t peerid : peerids) {
- PeerHelper peer = m_connection->getPeerNoEx(peerid);
- if (!peer)
- continue;
- peer->PutReliableSendCommand(c, m_max_packet_size);
- }
- }
- void ConnectionSendThread::sendPackets(float dtime)
- {
- std::list<session_t> peerIds = m_connection->getPeerIDs();
- std::list<session_t> pendingDisconnect;
- std::map<session_t, bool> pending_unreliable;
- for (session_t peerId : peerIds) {
- PeerHelper peer = m_connection->getPeerNoEx(peerId);
- //peer may have been removed
- if (!peer) {
- LOG(dout_con << m_connection->getDesc() << " Peer not found: peer_id="
- << peerId
- << std::endl);
- continue;
- }
- peer->m_increment_packets_remaining =
- m_iteration_packets_avaialble / m_connection->m_peers.size();
- UDPPeer *udpPeer = dynamic_cast<UDPPeer *>(&peer);
- if (!udpPeer) {
- continue;
- }
- if (udpPeer->m_pending_disconnect) {
- pendingDisconnect.push_back(peerId);
- }
- PROFILE(std::stringstream
- peerIdentifier);
- PROFILE(
- peerIdentifier << "sendPackets[" << m_connection->getDesc() << ";" << peerId
- << ";RELIABLE]");
- PROFILE(ScopeProfiler
- peerprofiler(g_profiler, peerIdentifier.str(), SPT_AVG));
- LOG(dout_con << m_connection->getDesc()
- << " Handle per peer queues: peer_id=" << peerId
- << " packet quota: " << peer->m_increment_packets_remaining << std::endl);
- // first send queued reliable packets for all peers (if possible)
- for (unsigned int i = 0; i < CHANNEL_COUNT; i++) {
- Channel &channel = udpPeer->channels[i];
- u16 next_to_ack = 0;
- channel.outgoing_reliables_sent.getFirstSeqnum(next_to_ack);
- u16 next_to_receive = 0;
- channel.incoming_reliables.getFirstSeqnum(next_to_receive);
- LOG(dout_con << m_connection->getDesc() << "\t channel: "
- << i << ", peer quota:"
- << peer->m_increment_packets_remaining
- << std::endl
- << "\t\t\treliables on wire: "
- << channel.outgoing_reliables_sent.size()
- << ", waiting for ack for " << next_to_ack
- << std::endl
- << "\t\t\tincoming_reliables: "
- << channel.incoming_reliables.size()
- << ", next reliable packet: "
- << channel.readNextIncomingSeqNum()
- << ", next queued: " << next_to_receive
- << std::endl
- << "\t\t\treliables queued : "
- << channel.queued_reliables.size()
- << std::endl
- << "\t\t\tqueued commands : "
- << channel.queued_commands.size()
- << std::endl);
- while (!channel.queued_reliables.empty() &&
- channel.outgoing_reliables_sent.size()
- < channel.getWindowSize() &&
- peer->m_increment_packets_remaining > 0) {
- BufferedPacket p = channel.queued_reliables.front();
- channel.queued_reliables.pop();
- LOG(dout_con << m_connection->getDesc()
- << " INFO: sending a queued reliable packet "
- << " channel: " << i
- << ", seqnum: " << readU16(&p.data[BASE_HEADER_SIZE + 1])
- << std::endl);
- sendAsPacketReliable(p, &channel);
- peer->m_increment_packets_remaining--;
- }
- }
- }
- if (!m_outgoing_queue.empty()) {
- LOG(dout_con << m_connection->getDesc()
- << " Handle non reliable queue ("
- << m_outgoing_queue.size() << " pkts)" << std::endl);
- }
- unsigned int initial_queuesize = m_outgoing_queue.size();
- /* send non reliable packets*/
- for (unsigned int i = 0; i < initial_queuesize; i++) {
- OutgoingPacket packet = m_outgoing_queue.front();
- m_outgoing_queue.pop();
- if (packet.reliable)
- continue;
- PeerHelper peer = m_connection->getPeerNoEx(packet.peer_id);
- if (!peer) {
- LOG(dout_con << m_connection->getDesc()
- << " Outgoing queue: peer_id=" << packet.peer_id
- << ">>>NOT<<< found on sending packet"
- << ", channel " << (packet.channelnum % 0xFF)
- << ", size: " << packet.data.getSize() << std::endl);
- continue;
- }
- /* send acks immediately */
- if (packet.ack) {
- rawSendAsPacket(packet.peer_id, packet.channelnum,
- packet.data, packet.reliable);
- peer->m_increment_packets_remaining =
- MYMIN(0, peer->m_increment_packets_remaining--);
- } else if (
- (peer->m_increment_packets_remaining > 0) ||
- (stopRequested())) {
- rawSendAsPacket(packet.peer_id, packet.channelnum,
- packet.data, packet.reliable);
- peer->m_increment_packets_remaining--;
- } else {
- m_outgoing_queue.push(packet);
- pending_unreliable[packet.peer_id] = true;
- }
- }
- for (session_t peerId : pendingDisconnect) {
- if (!pending_unreliable[peerId]) {
- m_connection->deletePeer(peerId, false);
- }
- }
- }
- void ConnectionSendThread::sendAsPacket(session_t peer_id, u8 channelnum,
- SharedBuffer<u8> data, bool ack)
- {
- OutgoingPacket packet(peer_id, channelnum, data, false, ack);
- m_outgoing_queue.push(packet);
- }
- ConnectionReceiveThread::ConnectionReceiveThread(unsigned int max_packet_size) :
- Thread("ConnectionReceive")
- {
- }
- void *ConnectionReceiveThread::run()
- {
- assert(m_connection);
- LOG(dout_con << m_connection->getDesc()
- << "ConnectionReceive thread started" << std::endl);
- PROFILE(std::stringstream
- ThreadIdentifier);
- PROFILE(ThreadIdentifier << "ConnectionReceive: [" << m_connection->getDesc() << "]");
- #ifdef DEBUG_CONNECTION_KBPS
- u64 curtime = porting::getTimeMs();
- u64 lasttime = curtime;
- float debug_print_timer = 0.0;
- #endif
- while (!stopRequested()) {
- BEGIN_DEBUG_EXCEPTION_HANDLER
- PROFILE(ScopeProfiler
- sp(g_profiler, ThreadIdentifier.str(), SPT_AVG));
- #ifdef DEBUG_CONNECTION_KBPS
- lasttime = curtime;
- curtime = porting::getTimeMs();
- float dtime = CALC_DTIME(lasttime,curtime);
- #endif
- /* receive packets */
- receive();
- #ifdef DEBUG_CONNECTION_KBPS
- debug_print_timer += dtime;
- if (debug_print_timer > 20.0) {
- debug_print_timer -= 20.0;
- std::list<session_t> peerids = m_connection->getPeerIDs();
- for (std::list<session_t>::iterator i = peerids.begin();
- i != peerids.end();
- i++)
- {
- PeerHelper peer = m_connection->getPeerNoEx(*i);
- if (!peer)
- continue;
- float peer_current = 0.0;
- float peer_loss = 0.0;
- float avg_rate = 0.0;
- float avg_loss = 0.0;
- for(u16 j=0; j<CHANNEL_COUNT; j++)
- {
- peer_current +=peer->channels[j].getCurrentDownloadRateKB();
- peer_loss += peer->channels[j].getCurrentLossRateKB();
- avg_rate += peer->channels[j].getAvgDownloadRateKB();
- avg_loss += peer->channels[j].getAvgLossRateKB();
- }
- std::stringstream output;
- output << std::fixed << std::setprecision(1);
- output << "OUT to Peer " << *i << " RATES (good / loss) " << std::endl;
- output << "\tcurrent (sum): " << peer_current << "kb/s "<< peer_loss << "kb/s" << std::endl;
- output << "\taverage (sum): " << avg_rate << "kb/s "<< avg_loss << "kb/s" << std::endl;
- output << std::setfill(' ');
- for(u16 j=0; j<CHANNEL_COUNT; j++)
- {
- output << "\tcha " << j << ":"
- << " CUR: " << std::setw(6) << peer->channels[j].getCurrentDownloadRateKB() <<"kb/s"
- << " AVG: " << std::setw(6) << peer->channels[j].getAvgDownloadRateKB() <<"kb/s"
- << " MAX: " << std::setw(6) << peer->channels[j].getMaxDownloadRateKB() <<"kb/s"
- << " /"
- << " CUR: " << std::setw(6) << peer->channels[j].getCurrentLossRateKB() <<"kb/s"
- << " AVG: " << std::setw(6) << peer->channels[j].getAvgLossRateKB() <<"kb/s"
- << " MAX: " << std::setw(6) << peer->channels[j].getMaxLossRateKB() <<"kb/s"
- << " / WS: " << peer->channels[j].getWindowSize()
- << std::endl;
- }
- fprintf(stderr,"%s\n",output.str().c_str());
- }
- }
- #endif
- END_DEBUG_EXCEPTION_HANDLER
- }
- PROFILE(g_profiler->remove(ThreadIdentifier.str()));
- return NULL;
- }
- // Receive packets from the network and buffers and create ConnectionEvents
- void ConnectionReceiveThread::receive()
- {
- // use IPv6 minimum allowed MTU as receive buffer size as this is
- // theoretical reliable upper boundary of a udp packet for all IPv6 enabled
- // infrastructure
- unsigned int packet_maxsize = 1500;
- SharedBuffer<u8> packetdata(packet_maxsize);
- bool packet_queued = true;
- unsigned int loop_count = 0;
- /* first of all read packets from socket */
- /* check for incoming data available */
- while ((loop_count < 10) &&
- (m_connection->m_udpSocket.WaitData(50))) {
- loop_count++;
- try {
- if (packet_queued) {
- bool data_left = true;
- session_t peer_id;
- SharedBuffer<u8> resultdata;
- while (data_left) {
- try {
- data_left = getFromBuffers(peer_id, resultdata);
- if (data_left) {
- ConnectionEvent e;
- e.dataReceived(peer_id, resultdata);
- m_connection->putEvent(e);
- }
- }
- catch (ProcessedSilentlyException &e) {
- /* try reading again */
- }
- }
- packet_queued = false;
- }
- Address sender;
- s32 received_size = m_connection->m_udpSocket.Receive(sender, *packetdata,
- packet_maxsize);
- if ((received_size < BASE_HEADER_SIZE) ||
- (readU32(&packetdata[0]) != m_connection->GetProtocolID())) {
- LOG(derr_con << m_connection->getDesc()
- << "Receive(): Invalid incoming packet, "
- << "size: " << received_size
- << ", protocol: "
- << ((received_size >= 4) ? readU32(&packetdata[0]) : -1)
- << std::endl);
- continue;
- }
- session_t peer_id = readPeerId(*packetdata);
- u8 channelnum = readChannel(*packetdata);
- if (channelnum > CHANNEL_COUNT - 1) {
- LOG(derr_con << m_connection->getDesc()
- << "Receive(): Invalid channel " << channelnum << std::endl);
- throw InvalidIncomingDataException("Channel doesn't exist");
- }
- /* Try to identify peer by sender address (may happen on join) */
- if (peer_id == PEER_ID_INEXISTENT) {
- peer_id = m_connection->lookupPeer(sender);
- // We do not have to remind the peer of its
- // peer id as the CONTROLTYPE_SET_PEER_ID
- // command was sent reliably.
- }
- /* The peer was not found in our lists. Add it. */
- if (peer_id == PEER_ID_INEXISTENT) {
- peer_id = m_connection->createPeer(sender, MTP_MINETEST_RELIABLE_UDP, 0);
- }
- PeerHelper peer = m_connection->getPeerNoEx(peer_id);
- if (!peer) {
- LOG(dout_con << m_connection->getDesc()
- << " got packet from unknown peer_id: "
- << peer_id << " Ignoring." << std::endl);
- continue;
- }
- // Validate peer address
- Address peer_address;
- if (peer->getAddress(MTP_UDP, peer_address)) {
- if (peer_address != sender) {
- LOG(derr_con << m_connection->getDesc()
- << m_connection->getDesc()
- << " Peer " << peer_id << " sending from different address."
- " Ignoring." << std::endl);
- continue;
- }
- } else {
- bool invalid_address = true;
- if (invalid_address) {
- LOG(derr_con << m_connection->getDesc()
- << m_connection->getDesc()
- << " Peer " << peer_id << " unknown."
- " Ignoring." << std::endl);
- continue;
- }
- }
- peer->ResetTimeout();
- Channel *channel = 0;
- if (dynamic_cast<UDPPeer *>(&peer) != 0) {
- channel = &(dynamic_cast<UDPPeer *>(&peer)->channels[channelnum]);
- }
- if (channel != 0) {
- channel->UpdateBytesReceived(received_size);
- }
- // Throw the received packet to channel->processPacket()
- // Make a new SharedBuffer from the data without the base headers
- SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
- memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
- strippeddata.getSize());
- try {
- // Process it (the result is some data with no headers made by us)
- SharedBuffer<u8> resultdata = processPacket
- (channel, strippeddata, peer_id, channelnum, false);
- LOG(dout_con << m_connection->getDesc()
- << " ProcessPacket from peer_id: " << peer_id
- << ",channel: " << (channelnum & 0xFF) << ", returned "
- << resultdata.getSize() << " bytes" << std::endl);
- ConnectionEvent e;
- e.dataReceived(peer_id, resultdata);
- m_connection->putEvent(e);
- }
- catch (ProcessedSilentlyException &e) {
- }
- catch (ProcessedQueued &e) {
- packet_queued = true;
- }
- }
- catch (InvalidIncomingDataException &e) {
- }
- catch (ProcessedSilentlyException &e) {
- }
- }
- }
- bool ConnectionReceiveThread::getFromBuffers(session_t &peer_id, SharedBuffer<u8> &dst)
- {
- std::list<session_t> peerids = m_connection->getPeerIDs();
- for (session_t peerid : peerids) {
- PeerHelper peer = m_connection->getPeerNoEx(peerid);
- if (!peer)
- continue;
- if (dynamic_cast<UDPPeer *>(&peer) == 0)
- continue;
- for (Channel &channel : (dynamic_cast<UDPPeer *>(&peer))->channels) {
- if (checkIncomingBuffers(&channel, peer_id, dst)) {
- return true;
- }
- }
- }
- return false;
- }
- bool ConnectionReceiveThread::checkIncomingBuffers(Channel *channel,
- session_t &peer_id, SharedBuffer<u8> &dst)
- {
- u16 firstseqnum = 0;
- if (channel->incoming_reliables.getFirstSeqnum(firstseqnum)) {
- if (firstseqnum == channel->readNextIncomingSeqNum()) {
- BufferedPacket p = channel->incoming_reliables.popFirst();
- peer_id = readPeerId(*p.data);
- u8 channelnum = readChannel(*p.data);
- u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE + 1]);
- LOG(dout_con << m_connection->getDesc()
- << "UNBUFFERING TYPE_RELIABLE"
- << " seqnum=" << seqnum
- << " peer_id=" << peer_id
- << " channel=" << ((int) channelnum & 0xff)
- << std::endl);
- channel->incNextIncomingSeqNum();
- u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
- // Get out the inside packet and re-process it
- SharedBuffer<u8> payload(p.data.getSize() - headers_size);
- memcpy(*payload, &p.data[headers_size], payload.getSize());
- dst = processPacket(channel, payload, peer_id, channelnum, true);
- return true;
- }
- }
- return false;
- }
- SharedBuffer<u8> ConnectionReceiveThread::processPacket(Channel *channel,
- SharedBuffer<u8> packetdata, session_t peer_id, u8 channelnum, bool reliable)
- {
- PeerHelper peer = m_connection->getPeerNoEx(peer_id);
- if (!peer) {
- errorstream << "Peer not found (possible timeout)" << std::endl;
- throw ProcessedSilentlyException("Peer not found (possible timeout)");
- }
- if (packetdata.getSize() < 1)
- throw InvalidIncomingDataException("packetdata.getSize() < 1");
- u8 type = readU8(&(packetdata[0]));
- if (MAX_UDP_PEERS <= 65535 && peer_id >= MAX_UDP_PEERS) {
- std::string errmsg = "Invalid peer_id=" + itos(peer_id);
- errorstream << errmsg << std::endl;
- throw InvalidIncomingDataException(errmsg.c_str());
- }
- if (type >= PACKET_TYPE_MAX) {
- derr_con << m_connection->getDesc() << "Got invalid type=" << ((int) type & 0xff)
- << std::endl;
- throw InvalidIncomingDataException("Invalid packet type");
- }
- const PacketTypeHandler &pHandle = packetTypeRouter[type];
- return (this->*pHandle.handler)(channel, packetdata, &peer, channelnum, reliable);
- }
- const ConnectionReceiveThread::PacketTypeHandler
- ConnectionReceiveThread::packetTypeRouter[PACKET_TYPE_MAX] = {
- {&ConnectionReceiveThread::handlePacketType_Control},
- {&ConnectionReceiveThread::handlePacketType_Original},
- {&ConnectionReceiveThread::handlePacketType_Split},
- {&ConnectionReceiveThread::handlePacketType_Reliable},
- };
- SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *channel,
- SharedBuffer<u8> packetdata, Peer *peer, u8 channelnum, bool reliable)
- {
- if (packetdata.getSize() < 2)
- throw InvalidIncomingDataException("packetdata.getSize() < 2");
- u8 controltype = readU8(&(packetdata[1]));
- if (controltype == CONTROLTYPE_ACK) {
- assert(channel != NULL);
- if (packetdata.getSize() < 4) {
- throw InvalidIncomingDataException(
- "packetdata.getSize() < 4 (ACK header size)");
- }
- u16 seqnum = readU16(&packetdata[2]);
- LOG(dout_con << m_connection->getDesc() << " [ CONTROLTYPE_ACK: channelnum="
- << ((int) channelnum & 0xff) << ", peer_id=" << peer->id << ", seqnum="
- << seqnum << " ]" << std::endl);
- try {
- BufferedPacket p = channel->outgoing_reliables_sent.popSeqnum(seqnum);
- // only calculate rtt from straight sent packets
- if (p.resend_count == 0) {
- // Get round trip time
- u64 current_time = porting::getTimeMs();
- // a overflow is quite unlikely but as it'd result in major
- // rtt miscalculation we handle it here
- if (current_time > p.absolute_send_time) {
- float rtt = (current_time - p.absolute_send_time) / 1000.0;
- // Let peer calculate stuff according to it
- // (avg_rtt and resend_timeout)
- dynamic_cast<UDPPeer *>(peer)->reportRTT(rtt);
- } else if (p.totaltime > 0) {
- float rtt = p.totaltime;
- // Let peer calculate stuff according to it
- // (avg_rtt and resend_timeout)
- dynamic_cast<UDPPeer *>(peer)->reportRTT(rtt);
- }
- }
- // put bytes for max bandwidth calculation
- channel->UpdateBytesSent(p.data.getSize(), 1);
- if (channel->outgoing_reliables_sent.size() == 0)
- m_connection->TriggerSend();
- } catch (NotFoundException &e) {
- LOG(derr_con << m_connection->getDesc()
- << "WARNING: ACKed packet not in outgoing queue" << std::endl);
- channel->UpdatePacketTooLateCounter();
- }
- throw ProcessedSilentlyException("Got an ACK");
- } else if (controltype == CONTROLTYPE_SET_PEER_ID) {
- // Got a packet to set our peer id
- if (packetdata.getSize() < 4)
- throw InvalidIncomingDataException
- ("packetdata.getSize() < 4 (SET_PEER_ID header size)");
- session_t peer_id_new = readU16(&packetdata[2]);
- LOG(dout_con << m_connection->getDesc() << "Got new peer id: " << peer_id_new
- << "... " << std::endl);
- if (m_connection->GetPeerID() != PEER_ID_INEXISTENT) {
- LOG(derr_con << m_connection->getDesc()
- << "WARNING: Not changing existing peer id." << std::endl);
- } else {
- LOG(dout_con << m_connection->getDesc() << "changing own peer id"
- << std::endl);
- m_connection->SetPeerID(peer_id_new);
- }
- ConnectionCommand cmd;
- SharedBuffer<u8> reply(2);
- writeU8(&reply[0], PACKET_TYPE_CONTROL);
- writeU8(&reply[1], CONTROLTYPE_ENABLE_BIG_SEND_WINDOW);
- cmd.disableLegacy(PEER_ID_SERVER, reply);
- m_connection->putCommand(cmd);
- throw ProcessedSilentlyException("Got a SET_PEER_ID");
- } else if (controltype == CONTROLTYPE_PING) {
- // Just ignore it, the incoming data already reset
- // the timeout counter
- LOG(dout_con << m_connection->getDesc() << "PING" << std::endl);
- throw ProcessedSilentlyException("Got a PING");
- } else if (controltype == CONTROLTYPE_DISCO) {
- // Just ignore it, the incoming data already reset
- // the timeout counter
- LOG(dout_con << m_connection->getDesc() << "DISCO: Removing peer "
- << peer->id << std::endl);
- if (!m_connection->deletePeer(peer->id, false)) {
- derr_con << m_connection->getDesc() << "DISCO: Peer not found" << std::endl;
- }
- throw ProcessedSilentlyException("Got a DISCO");
- } else if (controltype == CONTROLTYPE_ENABLE_BIG_SEND_WINDOW) {
- dynamic_cast<UDPPeer *>(peer)->setNonLegacyPeer();
- throw ProcessedSilentlyException("Got non legacy control");
- } else {
- LOG(derr_con << m_connection->getDesc()
- << "INVALID TYPE_CONTROL: invalid controltype="
- << ((int) controltype & 0xff) << std::endl);
- throw InvalidIncomingDataException("Invalid control type");
- }
- }
- SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Original(Channel *channel,
- SharedBuffer<u8> packetdata, Peer *peer, u8 channelnum, bool reliable)
- {
- if (packetdata.getSize() <= ORIGINAL_HEADER_SIZE)
- throw InvalidIncomingDataException
- ("packetdata.getSize() <= ORIGINAL_HEADER_SIZE");
- LOG(dout_con << m_connection->getDesc() << "RETURNING TYPE_ORIGINAL to user"
- << std::endl);
- // Get the inside packet out and return it
- SharedBuffer<u8> payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE);
- memcpy(*payload, &(packetdata[ORIGINAL_HEADER_SIZE]), payload.getSize());
- return payload;
- }
- SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Split(Channel *channel,
- SharedBuffer<u8> packetdata, Peer *peer, u8 channelnum, bool reliable)
- {
- Address peer_address;
- if (peer->getAddress(MTP_UDP, peer_address)) {
- // We have to create a packet again for buffering
- // This isn't actually too bad an idea.
- BufferedPacket packet = makePacket(peer_address,
- packetdata,
- m_connection->GetProtocolID(),
- peer->id,
- channelnum);
- // Buffer the packet
- SharedBuffer<u8> data = peer->addSplitPacket(channelnum, packet, reliable);
- if (data.getSize() != 0) {
- LOG(dout_con << m_connection->getDesc()
- << "RETURNING TYPE_SPLIT: Constructed full data, "
- << "size=" << data.getSize() << std::endl);
- return data;
- }
- LOG(dout_con << m_connection->getDesc() << "BUFFERED TYPE_SPLIT" << std::endl);
- throw ProcessedSilentlyException("Buffered a split packet chunk");
- }
- // We should never get here.
- FATAL_ERROR("Invalid execution point");
- }
- SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Reliable(Channel *channel,
- SharedBuffer<u8> packetdata, Peer *peer, u8 channelnum, bool reliable)
- {
- assert(channel != NULL);
- // Recursive reliable packets not allowed
- if (reliable)
- throw InvalidIncomingDataException("Found nested reliable packets");
- if (packetdata.getSize() < RELIABLE_HEADER_SIZE)
- throw InvalidIncomingDataException("packetdata.getSize() < RELIABLE_HEADER_SIZE");
- u16 seqnum = readU16(&packetdata[1]);
- bool is_future_packet = false;
- bool is_old_packet = false;
- /* packet is within our receive window send ack */
- if (seqnum_in_window(seqnum,
- channel->readNextIncomingSeqNum(), MAX_RELIABLE_WINDOW_SIZE)) {
- m_connection->sendAck(peer->id, channelnum, seqnum);
- } else {
- is_future_packet = seqnum_higher(seqnum, channel->readNextIncomingSeqNum());
- is_old_packet = seqnum_higher(channel->readNextIncomingSeqNum(), seqnum);
- /* packet is not within receive window, don't send ack. *
- * if this was a valid packet it's gonna be retransmitted */
- if (is_future_packet)
- throw ProcessedSilentlyException(
- "Received packet newer then expected, not sending ack");
- /* seems like our ack was lost, send another one for a old packet */
- if (is_old_packet) {
- LOG(dout_con << m_connection->getDesc()
- << "RE-SENDING ACK: peer_id: " << peer->id
- << ", channel: " << (channelnum & 0xFF)
- << ", seqnum: " << seqnum << std::endl;)
- m_connection->sendAck(peer->id, channelnum, seqnum);
- // we already have this packet so this one was on wire at least
- // the current timeout
- // we don't know how long this packet was on wire don't do silly guessing
- // dynamic_cast<UDPPeer*>(&peer)->
- // reportRTT(dynamic_cast<UDPPeer*>(&peer)->getResendTimeout());
- throw ProcessedSilentlyException("Retransmitting ack for old packet");
- }
- }
- if (seqnum != channel->readNextIncomingSeqNum()) {
- Address peer_address;
- // this is a reliable packet so we have a udp address for sure
- peer->getAddress(MTP_MINETEST_RELIABLE_UDP, peer_address);
- // This one comes later, buffer it.
- // Actually we have to make a packet to buffer one.
- // Well, we have all the ingredients, so just do it.
- BufferedPacket packet = con::makePacket(
- peer_address,
- packetdata,
- m_connection->GetProtocolID(),
- peer->id,
- channelnum);
- try {
- channel->incoming_reliables.insert(packet, channel->readNextIncomingSeqNum());
- LOG(dout_con << m_connection->getDesc()
- << "BUFFERING, TYPE_RELIABLE peer_id: " << peer->id
- << ", channel: " << (channelnum & 0xFF)
- << ", seqnum: " << seqnum << std::endl;)
- throw ProcessedQueued("Buffered future reliable packet");
- } catch (AlreadyExistsException &e) {
- } catch (IncomingDataCorruption &e) {
- ConnectionCommand discon;
- discon.disconnect_peer(peer->id);
- m_connection->putCommand(discon);
- LOG(derr_con << m_connection->getDesc()
- << "INVALID, TYPE_RELIABLE peer_id: " << peer->id
- << ", channel: " << (channelnum & 0xFF)
- << ", seqnum: " << seqnum
- << "DROPPING CLIENT!" << std::endl;)
- }
- }
- /* we got a packet to process right now */
- LOG(dout_con << m_connection->getDesc()
- << "RECURSIVE, TYPE_RELIABLE peer_id: " << peer->id
- << ", channel: " << (channelnum & 0xFF)
- << ", seqnum: " << seqnum << std::endl;)
- /* check for resend case */
- u16 queued_seqnum = 0;
- if (channel->incoming_reliables.getFirstSeqnum(queued_seqnum)) {
- if (queued_seqnum == seqnum) {
- BufferedPacket queued_packet = channel->incoming_reliables.popFirst();
- /** TODO find a way to verify the new against the old packet */
- }
- }
- channel->incNextIncomingSeqNum();
- // Get out the inside packet and re-process it
- SharedBuffer<u8> payload(packetdata.getSize() - RELIABLE_HEADER_SIZE);
- memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize());
- return processPacket(channel, payload, peer->id, channelnum, true);
- }
- }
|