12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358 |
- /*
- Minetest
- Copyright (C) 2013-2017 celeron55, Perttu Ahola <celeron55@gmail.com>
- Copyright (C) 2017 celeron55, Loic Blot <loic.blot@unix-experience.fr>
- This program is free software; you can redistribute it and/or modify
- it under the terms of the GNU Lesser General Public License as published by
- the Free Software Foundation; either version 2.1 of the License, or
- (at your option) any later version.
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU Lesser General Public License for more details.
- You should have received a copy of the GNU Lesser General Public License along
- with this program; if not, write to the Free Software Foundation, Inc.,
- 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
- */
- #include "connectionthreads.h"
- #include "log.h"
- #include "profiler.h"
- #include "settings.h"
- #include "network/networkpacket.h"
- #include "util/serialize.h"
- namespace con
- {
- /******************************************************************************/
- /* defines used for debugging and profiling */
- /******************************************************************************/
- #ifdef NDEBUG
- #define PROFILE(a)
- #undef DEBUG_CONNECTION_KBPS
- #else
- /* this mutex is used to achieve log message consistency */
- #define PROFILE(a) a
- //#define DEBUG_CONNECTION_KBPS
- #undef DEBUG_CONNECTION_KBPS
- #endif
- // TODO: Clean this up.
- #define LOG(a) a
- #define WINDOW_SIZE 5
- static session_t readPeerId(const u8 *packetdata)
- {
- return readU16(&packetdata[4]);
- }
- static u8 readChannel(const u8 *packetdata)
- {
- return readU8(&packetdata[6]);
- }
- /******************************************************************************/
- /* Connection Threads */
- /******************************************************************************/
- ConnectionSendThread::ConnectionSendThread(unsigned int max_packet_size,
- float timeout) :
- Thread("ConnectionSend"),
- m_max_packet_size(max_packet_size),
- m_timeout(timeout),
- m_max_data_packets_per_iteration(g_settings->getU16("max_packets_per_iteration"))
- {
- SANITY_CHECK(m_max_data_packets_per_iteration > 1);
- }
- void *ConnectionSendThread::run()
- {
- assert(m_connection);
- LOG(dout_con << m_connection->getDesc()
- << "ConnectionSend thread started" << std::endl);
- u64 curtime = porting::getTimeMs();
- u64 lasttime = curtime;
- PROFILE(std::stringstream ThreadIdentifier);
- PROFILE(ThreadIdentifier << "ConnectionSend: [" << m_connection->getDesc() << "]");
- /* if stop is requested don't stop immediately but try to send all */
- /* packets first */
- while (!stopRequested() || packetsQueued()) {
- BEGIN_DEBUG_EXCEPTION_HANDLER
- PROFILE(ScopeProfiler sp(g_profiler, ThreadIdentifier.str(), SPT_AVG));
- m_iteration_packets_avaialble = m_max_data_packets_per_iteration;
- /* wait for trigger or timeout */
- m_send_sleep_semaphore.wait(50);
- /* remove all triggers */
- while (m_send_sleep_semaphore.wait(0)) {
- }
- lasttime = curtime;
- curtime = porting::getTimeMs();
- float dtime = CALC_DTIME(lasttime, curtime);
- /* first resend timed-out packets */
- runTimeouts(dtime);
- if (m_iteration_packets_avaialble == 0) {
- LOG(warningstream << m_connection->getDesc()
- << " Packet quota used up after re-sending packets, "
- << "max=" << m_max_data_packets_per_iteration << std::endl);
- }
- /* translate commands to packets */
- auto c = m_connection->m_command_queue.pop_frontNoEx(0);
- while (c && c->type != CONNCMD_NONE) {
- if (c->reliable)
- processReliableCommand(c);
- else
- processNonReliableCommand(c);
- c = m_connection->m_command_queue.pop_frontNoEx(0);
- }
- /* send queued packets */
- sendPackets(dtime);
- END_DEBUG_EXCEPTION_HANDLER
- }
- PROFILE(g_profiler->remove(ThreadIdentifier.str()));
- return NULL;
- }
- void ConnectionSendThread::Trigger()
- {
- m_send_sleep_semaphore.post();
- }
- bool ConnectionSendThread::packetsQueued()
- {
- std::vector<session_t> peerIds = m_connection->getPeerIDs();
- if (!m_outgoing_queue.empty() && !peerIds.empty())
- return true;
- for (session_t peerId : peerIds) {
- PeerHelper peer = m_connection->getPeerNoEx(peerId);
- if (!peer)
- continue;
- if (dynamic_cast<UDPPeer *>(&peer) == 0)
- continue;
- for (Channel &channel : (dynamic_cast<UDPPeer *>(&peer))->channels) {
- if (!channel.queued_commands.empty()) {
- return true;
- }
- }
- }
- return false;
- }
- void ConnectionSendThread::runTimeouts(float dtime)
- {
- std::vector<session_t> timeouted_peers;
- std::vector<session_t> peerIds = m_connection->getPeerIDs();
- const u32 numpeers = m_connection->m_peers.size();
- if (numpeers == 0)
- return;
- for (session_t &peerId : peerIds) {
- PeerHelper peer = m_connection->getPeerNoEx(peerId);
- if (!peer)
- continue;
- UDPPeer *udpPeer = dynamic_cast<UDPPeer *>(&peer);
- if (!udpPeer)
- continue;
- PROFILE(std::stringstream peerIdentifier);
- PROFILE(peerIdentifier << "runTimeouts[" << m_connection->getDesc()
- << ";" << peerId << ";RELIABLE]");
- PROFILE(ScopeProfiler
- peerprofiler(g_profiler, peerIdentifier.str(), SPT_AVG));
- SharedBuffer<u8> data(2); // data for sending ping, required here because of goto
- /*
- Check peer timeout
- */
- if (peer->isTimedOut(m_timeout)) {
- infostream << m_connection->getDesc()
- << "RunTimeouts(): Peer " << peer->id
- << " has timed out."
- << std::endl;
- // Add peer to the list
- timeouted_peers.push_back(peer->id);
- // Don't bother going through the buffers of this one
- continue;
- }
- float resend_timeout = udpPeer->getResendTimeout();
- for (Channel &channel : udpPeer->channels) {
- // Remove timed out incomplete unreliable split packets
- channel.incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout);
- // Increment reliable packet times
- channel.outgoing_reliables_sent.incrementTimeouts(dtime);
- // Re-send timed out outgoing reliables
- auto timed_outs = channel.outgoing_reliables_sent.getTimedOuts(resend_timeout,
- (m_max_data_packets_per_iteration / numpeers));
- channel.UpdatePacketLossCounter(timed_outs.size());
- g_profiler->graphAdd("packets_lost", timed_outs.size());
- m_iteration_packets_avaialble -= timed_outs.size();
- for (const auto &k : timed_outs) {
- u8 channelnum = readChannel(k->data);
- u16 seqnum = k->getSeqnum();
- channel.UpdateBytesLost(k->size());
- LOG(derr_con << m_connection->getDesc()
- << "RE-SENDING timed-out RELIABLE to "
- << k->address.serializeString()
- << "(t/o=" << resend_timeout << "): "
- << "count=" << k->resend_count
- << ", channel=" << ((int) channelnum & 0xff)
- << ", seqnum=" << seqnum
- << std::endl);
- rawSend(k.get());
- // do not handle rtt here as we can't decide if this packet was
- // lost or really takes more time to transmit
- }
- channel.UpdateTimers(dtime);
- }
- /* send ping if necessary */
- if (udpPeer->Ping(dtime, data)) {
- LOG(dout_con << m_connection->getDesc()
- << "Sending ping for peer_id: " << udpPeer->id << std::endl);
- /* this may fail if there ain't a sequence number left */
- if (!rawSendAsPacket(udpPeer->id, 0, data, true)) {
- //retrigger with reduced ping interval
- udpPeer->Ping(4.0, data);
- }
- }
- udpPeer->RunCommandQueues(m_max_packet_size,
- m_max_commands_per_iteration,
- m_max_packets_requeued);
- }
- // Remove timed out peers
- for (u16 timeouted_peer : timeouted_peers) {
- LOG(dout_con << m_connection->getDesc()
- << "RunTimeouts(): Removing peer " << timeouted_peer << std::endl);
- m_connection->deletePeer(timeouted_peer, true);
- }
- }
- void ConnectionSendThread::rawSend(const BufferedPacket *p)
- {
- try {
- m_connection->m_udpSocket.Send(p->address, p->data, p->size());
- LOG(dout_con << m_connection->getDesc()
- << " rawSend: " << p->size()
- << " bytes sent" << std::endl);
- } catch (SendFailedException &e) {
- LOG(derr_con << m_connection->getDesc()
- << "Connection::rawSend(): SendFailedException: "
- << p->address.serializeString() << std::endl);
- }
- }
- void ConnectionSendThread::sendAsPacketReliable(BufferedPacketPtr &p, Channel *channel)
- {
- try {
- p->absolute_send_time = porting::getTimeMs();
- // Buffer the packet
- channel->outgoing_reliables_sent.insert(p,
- (channel->readOutgoingSequenceNumber() - MAX_RELIABLE_WINDOW_SIZE)
- % (MAX_RELIABLE_WINDOW_SIZE + 1));
- }
- catch (AlreadyExistsException &e) {
- LOG(derr_con << m_connection->getDesc()
- << "WARNING: Going to send a reliable packet"
- << " in outgoing buffer" << std::endl);
- }
- // Send the packet
- rawSend(p.get());
- }
- bool ConnectionSendThread::rawSendAsPacket(session_t peer_id, u8 channelnum,
- const SharedBuffer<u8> &data, bool reliable)
- {
- PeerHelper peer = m_connection->getPeerNoEx(peer_id);
- if (!peer) {
- LOG(errorstream << m_connection->getDesc()
- << " dropped " << (reliable ? "reliable " : "")
- << "packet for non existent peer_id: " << peer_id << std::endl);
- return false;
- }
- Channel *channel = &(dynamic_cast<UDPPeer *>(&peer)->channels[channelnum]);
- if (reliable) {
- bool have_seqnum = false;
- const u16 seqnum = channel->getOutgoingSequenceNumber(have_seqnum);
- if (!have_seqnum)
- return false;
- SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum);
- Address peer_address;
- peer->getAddress(MTP_MINETEST_RELIABLE_UDP, peer_address);
- // Add base headers and make a packet
- BufferedPacketPtr p = con::makePacket(peer_address, reliable,
- m_connection->GetProtocolID(), m_connection->GetPeerID(),
- channelnum);
- // first check if our send window is already maxed out
- if (channel->outgoing_reliables_sent.size() < channel->getWindowSize()) {
- LOG(dout_con << m_connection->getDesc()
- << " INFO: sending a reliable packet to peer_id " << peer_id
- << " channel: " << (u32)channelnum
- << " seqnum: " << seqnum << std::endl);
- sendAsPacketReliable(p, channel);
- return true;
- }
- LOG(dout_con << m_connection->getDesc()
- << " INFO: queueing reliable packet for peer_id: " << peer_id
- << " channel: " << (u32)channelnum
- << " seqnum: " << seqnum << std::endl);
- channel->queued_reliables.push(p);
- return false;
- }
- Address peer_address;
- if (peer->getAddress(MTP_UDP, peer_address)) {
- // Add base headers and make a packet
- BufferedPacketPtr p = con::makePacket(peer_address, data,
- m_connection->GetProtocolID(), m_connection->GetPeerID(),
- channelnum);
- // Send the packet
- rawSend(p.get());
- return true;
- }
- LOG(dout_con << m_connection->getDesc()
- << " INFO: dropped unreliable packet for peer_id: " << peer_id
- << " because of (yet) missing udp address" << std::endl);
- return false;
- }
- void ConnectionSendThread::processReliableCommand(ConnectionCommandPtr &c)
- {
- assert(c->reliable); // Pre-condition
- switch (c->type) {
- case CONNCMD_NONE:
- LOG(dout_con << m_connection->getDesc()
- << "UDP processing reliable CONNCMD_NONE" << std::endl);
- return;
- case CONNCMD_SEND:
- LOG(dout_con << m_connection->getDesc()
- << "UDP processing reliable CONNCMD_SEND" << std::endl);
- sendReliable(c);
- return;
- case CONNCMD_SEND_TO_ALL:
- LOG(dout_con << m_connection->getDesc()
- << "UDP processing CONNCMD_SEND_TO_ALL" << std::endl);
- sendToAllReliable(c);
- return;
- case CONCMD_CREATE_PEER:
- LOG(dout_con << m_connection->getDesc()
- << "UDP processing reliable CONCMD_CREATE_PEER" << std::endl);
- if (!rawSendAsPacket(c->peer_id, c->channelnum, c->data, c->reliable)) {
- /* put to queue if we couldn't send it immediately */
- sendReliable(c);
- }
- return;
- case CONNCMD_SERVE:
- case CONNCMD_CONNECT:
- case CONNCMD_DISCONNECT:
- case CONCMD_ACK:
- FATAL_ERROR("Got command that shouldn't be reliable as reliable command");
- default:
- LOG(dout_con << m_connection->getDesc()
- << " Invalid reliable command type: " << c->type << std::endl);
- }
- }
- void ConnectionSendThread::processNonReliableCommand(ConnectionCommandPtr &c_ptr)
- {
- const ConnectionCommand &c = *c_ptr;
- assert(!c.reliable); // Pre-condition
- switch (c.type) {
- case CONNCMD_NONE:
- LOG(dout_con << m_connection->getDesc()
- << " UDP processing CONNCMD_NONE" << std::endl);
- return;
- case CONNCMD_SERVE:
- LOG(dout_con << m_connection->getDesc()
- << " UDP processing CONNCMD_SERVE port="
- << c.address.serializeString() << std::endl);
- serve(c.address);
- return;
- case CONNCMD_CONNECT:
- LOG(dout_con << m_connection->getDesc()
- << " UDP processing CONNCMD_CONNECT" << std::endl);
- connect(c.address);
- return;
- case CONNCMD_DISCONNECT:
- LOG(dout_con << m_connection->getDesc()
- << " UDP processing CONNCMD_DISCONNECT" << std::endl);
- disconnect();
- return;
- case CONNCMD_DISCONNECT_PEER:
- LOG(dout_con << m_connection->getDesc()
- << " UDP processing CONNCMD_DISCONNECT_PEER" << std::endl);
- disconnect_peer(c.peer_id);
- return;
- case CONNCMD_SEND:
- LOG(dout_con << m_connection->getDesc()
- << " UDP processing CONNCMD_SEND" << std::endl);
- send(c.peer_id, c.channelnum, c.data);
- return;
- case CONNCMD_SEND_TO_ALL:
- LOG(dout_con << m_connection->getDesc()
- << " UDP processing CONNCMD_SEND_TO_ALL" << std::endl);
- sendToAll(c.channelnum, c.data);
- return;
- case CONCMD_ACK:
- LOG(dout_con << m_connection->getDesc()
- << " UDP processing CONCMD_ACK" << std::endl);
- sendAsPacket(c.peer_id, c.channelnum, c.data, true);
- return;
- case CONCMD_CREATE_PEER:
- FATAL_ERROR("Got command that should be reliable as unreliable command");
- default:
- LOG(dout_con << m_connection->getDesc()
- << " Invalid command type: " << c.type << std::endl);
- }
- }
- void ConnectionSendThread::serve(Address bind_address)
- {
- LOG(dout_con << m_connection->getDesc()
- << "UDP serving at port " << bind_address.serializeString() << std::endl);
- try {
- m_connection->m_udpSocket.Bind(bind_address);
- m_connection->SetPeerID(PEER_ID_SERVER);
- }
- catch (SocketException &e) {
- // Create event
- m_connection->putEvent(ConnectionEvent::bindFailed());
- }
- }
- void ConnectionSendThread::connect(Address address)
- {
- LOG(dout_con << m_connection->getDesc() << " connecting to "
- << address.serializeString()
- << ":" << address.getPort() << std::endl);
- UDPPeer *peer = m_connection->createServerPeer(address);
- // Create event
- m_connection->putEvent(ConnectionEvent::peerAdded(peer->id, peer->address));
- Address bind_addr;
- if (address.isIPv6())
- bind_addr.setAddress((IPv6AddressBytes *) NULL);
- else
- bind_addr.setAddress(0, 0, 0, 0);
- m_connection->m_udpSocket.Bind(bind_addr);
- // Send a dummy packet to server with peer_id = PEER_ID_INEXISTENT
- m_connection->SetPeerID(PEER_ID_INEXISTENT);
- NetworkPacket pkt(0, 0);
- m_connection->Send(PEER_ID_SERVER, 0, &pkt, true);
- }
- void ConnectionSendThread::disconnect()
- {
- LOG(dout_con << m_connection->getDesc() << " disconnecting" << std::endl);
- // Create and send DISCO packet
- SharedBuffer<u8> data(2);
- writeU8(&data[0], PACKET_TYPE_CONTROL);
- writeU8(&data[1], CONTROLTYPE_DISCO);
- // Send to all
- std::vector<session_t> peerids = m_connection->getPeerIDs();
- for (session_t peerid : peerids) {
- sendAsPacket(peerid, 0, data, false);
- }
- }
- void ConnectionSendThread::disconnect_peer(session_t peer_id)
- {
- LOG(dout_con << m_connection->getDesc() << " disconnecting peer" << std::endl);
- // Create and send DISCO packet
- SharedBuffer<u8> data(2);
- writeU8(&data[0], PACKET_TYPE_CONTROL);
- writeU8(&data[1], CONTROLTYPE_DISCO);
- sendAsPacket(peer_id, 0, data, false);
- PeerHelper peer = m_connection->getPeerNoEx(peer_id);
- if (!peer)
- return;
- if (dynamic_cast<UDPPeer *>(&peer) == 0) {
- return;
- }
- dynamic_cast<UDPPeer *>(&peer)->m_pending_disconnect = true;
- }
- void ConnectionSendThread::send(session_t peer_id, u8 channelnum,
- const SharedBuffer<u8> &data)
- {
- assert(channelnum < CHANNEL_COUNT); // Pre-condition
- PeerHelper peer = m_connection->getPeerNoEx(peer_id);
- if (!peer) {
- LOG(dout_con << m_connection->getDesc() << " peer: peer_id=" << peer_id
- << ">>>NOT<<< found on sending packet"
- << ", channel " << (channelnum % 0xFF)
- << ", size: " << data.getSize() << std::endl);
- return;
- }
- LOG(dout_con << m_connection->getDesc() << " sending to peer_id=" << peer_id
- << ", channel " << (channelnum % 0xFF)
- << ", size: " << data.getSize() << std::endl);
- u16 split_sequence_number = peer->getNextSplitSequenceNumber(channelnum);
- u32 chunksize_max = m_max_packet_size - BASE_HEADER_SIZE;
- std::list<SharedBuffer<u8>> originals;
- makeAutoSplitPacket(data, chunksize_max, split_sequence_number, &originals);
- peer->setNextSplitSequenceNumber(channelnum, split_sequence_number);
- for (const SharedBuffer<u8> &original : originals) {
- sendAsPacket(peer_id, channelnum, original);
- }
- }
- void ConnectionSendThread::sendReliable(ConnectionCommandPtr &c)
- {
- PeerHelper peer = m_connection->getPeerNoEx(c->peer_id);
- if (!peer)
- return;
- peer->PutReliableSendCommand(c, m_max_packet_size);
- }
- void ConnectionSendThread::sendToAll(u8 channelnum, const SharedBuffer<u8> &data)
- {
- std::vector<session_t> peerids = m_connection->getPeerIDs();
- for (session_t peerid : peerids) {
- send(peerid, channelnum, data);
- }
- }
- void ConnectionSendThread::sendToAllReliable(ConnectionCommandPtr &c)
- {
- std::vector<session_t> peerids = m_connection->getPeerIDs();
- for (session_t peerid : peerids) {
- PeerHelper peer = m_connection->getPeerNoEx(peerid);
- if (!peer)
- continue;
- peer->PutReliableSendCommand(c, m_max_packet_size);
- }
- }
- void ConnectionSendThread::sendPackets(float dtime)
- {
- std::vector<session_t> peerIds = m_connection->getPeerIDs();
- std::vector<session_t> pendingDisconnect;
- std::map<session_t, bool> pending_unreliable;
- const unsigned int peer_packet_quota = m_iteration_packets_avaialble
- / MYMAX(peerIds.size(), 1);
- for (session_t peerId : peerIds) {
- PeerHelper peer = m_connection->getPeerNoEx(peerId);
- //peer may have been removed
- if (!peer) {
- LOG(dout_con << m_connection->getDesc() << " Peer not found: peer_id="
- << peerId
- << std::endl);
- continue;
- }
- peer->m_increment_packets_remaining = peer_packet_quota;
- UDPPeer *udpPeer = dynamic_cast<UDPPeer *>(&peer);
- if (!udpPeer) {
- continue;
- }
- if (udpPeer->m_pending_disconnect) {
- pendingDisconnect.push_back(peerId);
- }
- PROFILE(std::stringstream
- peerIdentifier);
- PROFILE(
- peerIdentifier << "sendPackets[" << m_connection->getDesc() << ";" << peerId
- << ";RELIABLE]");
- PROFILE(ScopeProfiler
- peerprofiler(g_profiler, peerIdentifier.str(), SPT_AVG));
- LOG(dout_con << m_connection->getDesc()
- << " Handle per peer queues: peer_id=" << peerId
- << " packet quota: " << peer->m_increment_packets_remaining << std::endl);
- // first send queued reliable packets for all peers (if possible)
- for (unsigned int i = 0; i < CHANNEL_COUNT; i++) {
- Channel &channel = udpPeer->channels[i];
- // Reduces logging verbosity
- if (channel.queued_reliables.empty())
- continue;
- u16 next_to_ack = 0;
- channel.outgoing_reliables_sent.getFirstSeqnum(next_to_ack);
- u16 next_to_receive = 0;
- channel.incoming_reliables.getFirstSeqnum(next_to_receive);
- LOG(dout_con << m_connection->getDesc() << "\t channel: "
- << i << ", peer quota:"
- << peer->m_increment_packets_remaining
- << std::endl
- << "\t\t\treliables on wire: "
- << channel.outgoing_reliables_sent.size()
- << ", waiting for ack for " << next_to_ack
- << std::endl
- << "\t\t\tincoming_reliables: "
- << channel.incoming_reliables.size()
- << ", next reliable packet: "
- << channel.readNextIncomingSeqNum()
- << ", next queued: " << next_to_receive
- << std::endl
- << "\t\t\treliables queued : "
- << channel.queued_reliables.size()
- << std::endl
- << "\t\t\tqueued commands : "
- << channel.queued_commands.size()
- << std::endl);
- while (!channel.queued_reliables.empty() &&
- channel.outgoing_reliables_sent.size()
- < channel.getWindowSize() &&
- peer->m_increment_packets_remaining > 0) {
- BufferedPacketPtr p = channel.queued_reliables.front();
- channel.queued_reliables.pop();
- LOG(dout_con << m_connection->getDesc()
- << " INFO: sending a queued reliable packet "
- << " channel: " << i
- << ", seqnum: " << p->getSeqnum()
- << std::endl);
- sendAsPacketReliable(p, &channel);
- peer->m_increment_packets_remaining--;
- }
- }
- }
- if (!m_outgoing_queue.empty()) {
- LOG(dout_con << m_connection->getDesc()
- << " Handle non reliable queue ("
- << m_outgoing_queue.size() << " pkts)" << std::endl);
- }
- unsigned int initial_queuesize = m_outgoing_queue.size();
- /* send non reliable packets*/
- for (unsigned int i = 0; i < initial_queuesize; i++) {
- OutgoingPacket packet = m_outgoing_queue.front();
- m_outgoing_queue.pop();
- if (packet.reliable)
- continue;
- PeerHelper peer = m_connection->getPeerNoEx(packet.peer_id);
- if (!peer) {
- LOG(dout_con << m_connection->getDesc()
- << " Outgoing queue: peer_id=" << packet.peer_id
- << ">>>NOT<<< found on sending packet"
- << ", channel " << (packet.channelnum % 0xFF)
- << ", size: " << packet.data.getSize() << std::endl);
- continue;
- }
- /* send acks immediately */
- if (packet.ack || peer->m_increment_packets_remaining > 0 || stopRequested()) {
- rawSendAsPacket(packet.peer_id, packet.channelnum,
- packet.data, packet.reliable);
- if (peer->m_increment_packets_remaining > 0)
- peer->m_increment_packets_remaining--;
- } else {
- m_outgoing_queue.push(packet);
- pending_unreliable[packet.peer_id] = true;
- }
- }
- if (peer_packet_quota > 0) {
- for (session_t peerId : peerIds) {
- PeerHelper peer = m_connection->getPeerNoEx(peerId);
- if (!peer)
- continue;
- if (peer->m_increment_packets_remaining == 0) {
- LOG(warningstream << m_connection->getDesc()
- << " Packet quota used up for peer_id=" << peerId
- << ", was " << peer_packet_quota << " pkts" << std::endl);
- }
- }
- }
- for (session_t peerId : pendingDisconnect) {
- if (!pending_unreliable[peerId]) {
- m_connection->deletePeer(peerId, false);
- }
- }
- }
- void ConnectionSendThread::sendAsPacket(session_t peer_id, u8 channelnum,
- const SharedBuffer<u8> &data, bool ack)
- {
- OutgoingPacket packet(peer_id, channelnum, data, false, ack);
- m_outgoing_queue.push(packet);
- }
- ConnectionReceiveThread::ConnectionReceiveThread(unsigned int max_packet_size) :
- Thread("ConnectionReceive")
- {
- }
- void *ConnectionReceiveThread::run()
- {
- assert(m_connection);
- LOG(dout_con << m_connection->getDesc()
- << "ConnectionReceive thread started" << std::endl);
- PROFILE(std::stringstream
- ThreadIdentifier);
- PROFILE(ThreadIdentifier << "ConnectionReceive: [" << m_connection->getDesc() << "]");
- // use IPv6 minimum allowed MTU as receive buffer size as this is
- // theoretical reliable upper boundary of a udp packet for all IPv6 enabled
- // infrastructure
- const unsigned int packet_maxsize = 1500;
- SharedBuffer<u8> packetdata(packet_maxsize);
- bool packet_queued = true;
- #ifdef DEBUG_CONNECTION_KBPS
- u64 curtime = porting::getTimeMs();
- u64 lasttime = curtime;
- float debug_print_timer = 0.0;
- #endif
- while (!stopRequested()) {
- BEGIN_DEBUG_EXCEPTION_HANDLER
- PROFILE(ScopeProfiler
- sp(g_profiler, ThreadIdentifier.str(), SPT_AVG));
- #ifdef DEBUG_CONNECTION_KBPS
- lasttime = curtime;
- curtime = porting::getTimeMs();
- float dtime = CALC_DTIME(lasttime,curtime);
- #endif
- /* receive packets */
- receive(packetdata, packet_queued);
- #ifdef DEBUG_CONNECTION_KBPS
- debug_print_timer += dtime;
- if (debug_print_timer > 20.0) {
- debug_print_timer -= 20.0;
- std::vector<session_t> peerids = m_connection->getPeerIDs();
- for (auto id : peerids)
- {
- PeerHelper peer = m_connection->getPeerNoEx(id);
- if (!peer)
- continue;
- float peer_current = 0.0;
- float peer_loss = 0.0;
- float avg_rate = 0.0;
- float avg_loss = 0.0;
- for(u16 j=0; j<CHANNEL_COUNT; j++)
- {
- peer_current +=peer->channels[j].getCurrentDownloadRateKB();
- peer_loss += peer->channels[j].getCurrentLossRateKB();
- avg_rate += peer->channels[j].getAvgDownloadRateKB();
- avg_loss += peer->channels[j].getAvgLossRateKB();
- }
- std::stringstream output;
- output << std::fixed << std::setprecision(1);
- output << "OUT to Peer " << *i << " RATES (good / loss) " << std::endl;
- output << "\tcurrent (sum): " << peer_current << "kb/s "<< peer_loss << "kb/s" << std::endl;
- output << "\taverage (sum): " << avg_rate << "kb/s "<< avg_loss << "kb/s" << std::endl;
- output << std::setfill(' ');
- for(u16 j=0; j<CHANNEL_COUNT; j++)
- {
- output << "\tcha " << j << ":"
- << " CUR: " << std::setw(6) << peer->channels[j].getCurrentDownloadRateKB() <<"kb/s"
- << " AVG: " << std::setw(6) << peer->channels[j].getAvgDownloadRateKB() <<"kb/s"
- << " MAX: " << std::setw(6) << peer->channels[j].getMaxDownloadRateKB() <<"kb/s"
- << " /"
- << " CUR: " << std::setw(6) << peer->channels[j].getCurrentLossRateKB() <<"kb/s"
- << " AVG: " << std::setw(6) << peer->channels[j].getAvgLossRateKB() <<"kb/s"
- << " MAX: " << std::setw(6) << peer->channels[j].getMaxLossRateKB() <<"kb/s"
- << " / WS: " << peer->channels[j].getWindowSize()
- << std::endl;
- }
- fprintf(stderr,"%s\n",output.str().c_str());
- }
- }
- #endif
- END_DEBUG_EXCEPTION_HANDLER
- }
- PROFILE(g_profiler->remove(ThreadIdentifier.str()));
- return NULL;
- }
- // Receive packets from the network and buffers and create ConnectionEvents
- void ConnectionReceiveThread::receive(SharedBuffer<u8> &packetdata,
- bool &packet_queued)
- {
- try {
- // First, see if there any buffered packets we can process now
- if (packet_queued) {
- session_t peer_id;
- SharedBuffer<u8> resultdata;
- while (true) {
- try {
- if (!getFromBuffers(peer_id, resultdata))
- break;
- m_connection->putEvent(ConnectionEvent::dataReceived(peer_id, resultdata));
- }
- catch (ProcessedSilentlyException &e) {
- /* try reading again */
- }
- }
- packet_queued = false;
- }
- // Call Receive() to wait for incoming data
- Address sender;
- s32 received_size = m_connection->m_udpSocket.Receive(sender,
- *packetdata, packetdata.getSize());
- if (received_size < 0)
- return;
- if ((received_size < BASE_HEADER_SIZE) ||
- (readU32(&packetdata[0]) != m_connection->GetProtocolID())) {
- LOG(derr_con << m_connection->getDesc()
- << "Receive(): Invalid incoming packet, "
- << "size: " << received_size
- << ", protocol: "
- << ((received_size >= 4) ? readU32(&packetdata[0]) : -1)
- << std::endl);
- return;
- }
- session_t peer_id = readPeerId(*packetdata);
- u8 channelnum = readChannel(*packetdata);
- if (channelnum > CHANNEL_COUNT - 1) {
- LOG(derr_con << m_connection->getDesc()
- << "Receive(): Invalid channel " << (u32)channelnum << std::endl);
- return;
- }
- /* Try to identify peer by sender address (may happen on join) */
- if (peer_id == PEER_ID_INEXISTENT) {
- peer_id = m_connection->lookupPeer(sender);
- // We do not have to remind the peer of its
- // peer id as the CONTROLTYPE_SET_PEER_ID
- // command was sent reliably.
- }
- if (peer_id == PEER_ID_INEXISTENT) {
- /* Ignore it if we are a client */
- if (m_connection->ConnectedToServer())
- return;
- /* The peer was not found in our lists. Add it. */
- peer_id = m_connection->createPeer(sender, MTP_MINETEST_RELIABLE_UDP, 0);
- }
- PeerHelper peer = m_connection->getPeerNoEx(peer_id);
- if (!peer) {
- LOG(dout_con << m_connection->getDesc()
- << " got packet from unknown peer_id: "
- << peer_id << " Ignoring." << std::endl);
- return;
- }
- // Validate peer address
- Address peer_address;
- if (peer->getAddress(MTP_UDP, peer_address)) {
- if (peer_address != sender) {
- LOG(derr_con << m_connection->getDesc()
- << " Peer " << peer_id << " sending from different address."
- " Ignoring." << std::endl);
- return;
- }
- } else {
- LOG(derr_con << m_connection->getDesc()
- << " Peer " << peer_id << " doesn't have an address?!"
- " Ignoring." << std::endl);
- return;
- }
- peer->ResetTimeout();
- Channel *channel = nullptr;
- if (dynamic_cast<UDPPeer *>(&peer)) {
- channel = &dynamic_cast<UDPPeer *>(&peer)->channels[channelnum];
- } else {
- LOG(derr_con << m_connection->getDesc()
- << "Receive(): peer_id=" << peer_id << " isn't an UDPPeer?!"
- " Ignoring." << std::endl);
- return;
- }
- channel->UpdateBytesReceived(received_size);
- // Throw the received packet to channel->processPacket()
- // Make a new SharedBuffer from the data without the base headers
- SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
- memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
- strippeddata.getSize());
- try {
- // Process it (the result is some data with no headers made by us)
- SharedBuffer<u8> resultdata = processPacket
- (channel, strippeddata, peer_id, channelnum, false);
- LOG(dout_con << m_connection->getDesc()
- << " ProcessPacket from peer_id: " << peer_id
- << ", channel: " << (u32)channelnum << ", returned "
- << resultdata.getSize() << " bytes" << std::endl);
- m_connection->putEvent(ConnectionEvent::dataReceived(peer_id, resultdata));
- }
- catch (ProcessedSilentlyException &e) {
- }
- catch (ProcessedQueued &e) {
- // we set it to true anyway (see below)
- }
- /* Every time we receive a packet it can happen that a previously
- * buffered packet is now ready to process. */
- packet_queued = true;
- }
- catch (InvalidIncomingDataException &e) {
- }
- }
- bool ConnectionReceiveThread::getFromBuffers(session_t &peer_id, SharedBuffer<u8> &dst)
- {
- std::vector<session_t> peerids = m_connection->getPeerIDs();
- for (session_t peerid : peerids) {
- PeerHelper peer = m_connection->getPeerNoEx(peerid);
- if (!peer)
- continue;
- UDPPeer *p = dynamic_cast<UDPPeer *>(&peer);
- if (!p)
- continue;
- for (Channel &channel : p->channels) {
- if (checkIncomingBuffers(&channel, peer_id, dst)) {
- return true;
- }
- }
- }
- return false;
- }
- bool ConnectionReceiveThread::checkIncomingBuffers(Channel *channel,
- session_t &peer_id, SharedBuffer<u8> &dst)
- {
- u16 firstseqnum = 0;
- if (!channel->incoming_reliables.getFirstSeqnum(firstseqnum))
- return false;
- if (firstseqnum != channel->readNextIncomingSeqNum())
- return false;
- BufferedPacketPtr p = channel->incoming_reliables.popFirst();
- peer_id = readPeerId(p->data); // Carried over to caller function
- u8 channelnum = readChannel(p->data);
- u16 seqnum = p->getSeqnum();
- LOG(dout_con << m_connection->getDesc()
- << "UNBUFFERING TYPE_RELIABLE"
- << " seqnum=" << seqnum
- << " peer_id=" << peer_id
- << " channel=" << ((int) channelnum & 0xff)
- << std::endl);
- channel->incNextIncomingSeqNum();
- u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
- // Get out the inside packet and re-process it
- SharedBuffer<u8> payload(p->size() - headers_size);
- memcpy(*payload, &p->data[headers_size], payload.getSize());
- dst = processPacket(channel, payload, peer_id, channelnum, true);
- return true;
- }
- SharedBuffer<u8> ConnectionReceiveThread::processPacket(Channel *channel,
- const SharedBuffer<u8> &packetdata, session_t peer_id, u8 channelnum, bool reliable)
- {
- PeerHelper peer = m_connection->getPeerNoEx(peer_id);
- if (!peer) {
- errorstream << "Peer not found (possible timeout)" << std::endl;
- throw ProcessedSilentlyException("Peer not found (possible timeout)");
- }
- if (packetdata.getSize() < 1)
- throw InvalidIncomingDataException("packetdata.getSize() < 1");
- u8 type = readU8(&(packetdata[0]));
- if (MAX_UDP_PEERS <= 65535 && peer_id >= MAX_UDP_PEERS) {
- std::string errmsg = "Invalid peer_id=" + itos(peer_id);
- errorstream << errmsg << std::endl;
- throw InvalidIncomingDataException(errmsg.c_str());
- }
- if (type >= PACKET_TYPE_MAX) {
- derr_con << m_connection->getDesc() << "Got invalid type=" << ((int) type & 0xff)
- << std::endl;
- throw InvalidIncomingDataException("Invalid packet type");
- }
- const PacketTypeHandler &pHandle = packetTypeRouter[type];
- return (this->*pHandle.handler)(channel, packetdata, &peer, channelnum, reliable);
- }
- const ConnectionReceiveThread::PacketTypeHandler
- ConnectionReceiveThread::packetTypeRouter[PACKET_TYPE_MAX] = {
- {&ConnectionReceiveThread::handlePacketType_Control},
- {&ConnectionReceiveThread::handlePacketType_Original},
- {&ConnectionReceiveThread::handlePacketType_Split},
- {&ConnectionReceiveThread::handlePacketType_Reliable},
- };
- SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *channel,
- const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable)
- {
- if (packetdata.getSize() < 2)
- throw InvalidIncomingDataException("packetdata.getSize() < 2");
- ControlType controltype = (ControlType)readU8(&(packetdata[1]));
- if (controltype == CONTROLTYPE_ACK) {
- assert(channel != NULL);
- if (packetdata.getSize() < 4) {
- throw InvalidIncomingDataException(
- "packetdata.getSize() < 4 (ACK header size)");
- }
- u16 seqnum = readU16(&packetdata[2]);
- LOG(dout_con << m_connection->getDesc() << " [ CONTROLTYPE_ACK: channelnum="
- << ((int) channelnum & 0xff) << ", peer_id=" << peer->id << ", seqnum="
- << seqnum << " ]" << std::endl);
- try {
- BufferedPacketPtr p = channel->outgoing_reliables_sent.popSeqnum(seqnum);
- // the rtt calculation will be a bit off for re-sent packets but that's okay
- {
- // Get round trip time
- u64 current_time = porting::getTimeMs();
- // a overflow is quite unlikely but as it'd result in major
- // rtt miscalculation we handle it here
- if (current_time > p->absolute_send_time) {
- float rtt = (current_time - p->absolute_send_time) / 1000.0;
- // Let peer calculate stuff according to it
- // (avg_rtt and resend_timeout)
- dynamic_cast<UDPPeer *>(peer)->reportRTT(rtt);
- } else if (p->totaltime > 0) {
- float rtt = p->totaltime;
- // Let peer calculate stuff according to it
- // (avg_rtt and resend_timeout)
- dynamic_cast<UDPPeer *>(peer)->reportRTT(rtt);
- }
- }
- // put bytes for max bandwidth calculation
- channel->UpdateBytesSent(p->size(), 1);
- if (channel->outgoing_reliables_sent.size() == 0)
- m_connection->TriggerSend();
- } catch (NotFoundException &e) {
- LOG(derr_con << m_connection->getDesc()
- << "WARNING: ACKed packet not in outgoing queue"
- << " seqnum=" << seqnum << std::endl);
- channel->UpdatePacketTooLateCounter();
- }
- throw ProcessedSilentlyException("Got an ACK");
- } else if (controltype == CONTROLTYPE_SET_PEER_ID) {
- // Got a packet to set our peer id
- if (packetdata.getSize() < 4)
- throw InvalidIncomingDataException
- ("packetdata.getSize() < 4 (SET_PEER_ID header size)");
- session_t peer_id_new = readU16(&packetdata[2]);
- LOG(dout_con << m_connection->getDesc() << "Got new peer id: " << peer_id_new
- << "... " << std::endl);
- if (m_connection->GetPeerID() != PEER_ID_INEXISTENT) {
- LOG(derr_con << m_connection->getDesc()
- << "WARNING: Not changing existing peer id." << std::endl);
- } else {
- LOG(dout_con << m_connection->getDesc() << "changing own peer id"
- << std::endl);
- m_connection->SetPeerID(peer_id_new);
- }
- throw ProcessedSilentlyException("Got a SET_PEER_ID");
- } else if (controltype == CONTROLTYPE_PING) {
- // Just ignore it, the incoming data already reset
- // the timeout counter
- LOG(dout_con << m_connection->getDesc() << "PING" << std::endl);
- throw ProcessedSilentlyException("Got a PING");
- } else if (controltype == CONTROLTYPE_DISCO) {
- // Just ignore it, the incoming data already reset
- // the timeout counter
- LOG(dout_con << m_connection->getDesc() << "DISCO: Removing peer "
- << peer->id << std::endl);
- if (!m_connection->deletePeer(peer->id, false)) {
- derr_con << m_connection->getDesc() << "DISCO: Peer not found" << std::endl;
- }
- throw ProcessedSilentlyException("Got a DISCO");
- } else {
- LOG(derr_con << m_connection->getDesc()
- << "INVALID controltype="
- << ((int) controltype & 0xff) << std::endl);
- throw InvalidIncomingDataException("Invalid control type");
- }
- }
- SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Original(Channel *channel,
- const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable)
- {
- if (packetdata.getSize() <= ORIGINAL_HEADER_SIZE)
- throw InvalidIncomingDataException
- ("packetdata.getSize() <= ORIGINAL_HEADER_SIZE");
- LOG(dout_con << m_connection->getDesc() << "RETURNING TYPE_ORIGINAL to user"
- << std::endl);
- // Get the inside packet out and return it
- SharedBuffer<u8> payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE);
- memcpy(*payload, &(packetdata[ORIGINAL_HEADER_SIZE]), payload.getSize());
- return payload;
- }
- SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Split(Channel *channel,
- const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable)
- {
- Address peer_address;
- if (peer->getAddress(MTP_UDP, peer_address)) {
- // We have to create a packet again for buffering
- // This isn't actually too bad an idea.
- BufferedPacketPtr packet = con::makePacket(peer_address,
- packetdata,
- m_connection->GetProtocolID(),
- peer->id,
- channelnum);
- // Buffer the packet
- SharedBuffer<u8> data = peer->addSplitPacket(channelnum, packet, reliable);
- if (data.getSize() != 0) {
- LOG(dout_con << m_connection->getDesc()
- << "RETURNING TYPE_SPLIT: Constructed full data, "
- << "size=" << data.getSize() << std::endl);
- return data;
- }
- LOG(dout_con << m_connection->getDesc() << "BUFFERED TYPE_SPLIT" << std::endl);
- throw ProcessedSilentlyException("Buffered a split packet chunk");
- }
- // We should never get here.
- FATAL_ERROR("Invalid execution point");
- }
- SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Reliable(Channel *channel,
- const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable)
- {
- assert(channel != NULL);
- // Recursive reliable packets not allowed
- if (reliable)
- throw InvalidIncomingDataException("Found nested reliable packets");
- if (packetdata.getSize() < RELIABLE_HEADER_SIZE)
- throw InvalidIncomingDataException("packetdata.getSize() < RELIABLE_HEADER_SIZE");
- const u16 seqnum = readU16(&packetdata[1]);
- bool is_future_packet = false;
- bool is_old_packet = false;
- /* packet is within our receive window send ack */
- if (seqnum_in_window(seqnum,
- channel->readNextIncomingSeqNum(), MAX_RELIABLE_WINDOW_SIZE)) {
- m_connection->sendAck(peer->id, channelnum, seqnum);
- } else {
- is_future_packet = seqnum_higher(seqnum, channel->readNextIncomingSeqNum());
- is_old_packet = seqnum_higher(channel->readNextIncomingSeqNum(), seqnum);
- /* packet is not within receive window, don't send ack. *
- * if this was a valid packet it's gonna be retransmitted */
- if (is_future_packet)
- throw ProcessedSilentlyException(
- "Received packet newer then expected, not sending ack");
- /* seems like our ack was lost, send another one for a old packet */
- if (is_old_packet) {
- LOG(dout_con << m_connection->getDesc()
- << "RE-SENDING ACK: peer_id: " << peer->id
- << ", channel: " << (channelnum & 0xFF)
- << ", seqnum: " << seqnum << std::endl;)
- m_connection->sendAck(peer->id, channelnum, seqnum);
- // we already have this packet so this one was on wire at least
- // the current timeout
- // we don't know how long this packet was on wire don't do silly guessing
- // dynamic_cast<UDPPeer*>(&peer)->
- // reportRTT(dynamic_cast<UDPPeer*>(&peer)->getResendTimeout());
- throw ProcessedSilentlyException("Retransmitting ack for old packet");
- }
- }
- if (seqnum != channel->readNextIncomingSeqNum()) {
- Address peer_address;
- // this is a reliable packet so we have a udp address for sure
- peer->getAddress(MTP_MINETEST_RELIABLE_UDP, peer_address);
- // This one comes later, buffer it.
- // Actually we have to make a packet to buffer one.
- // Well, we have all the ingredients, so just do it.
- BufferedPacketPtr packet = con::makePacket(
- peer_address,
- packetdata,
- m_connection->GetProtocolID(),
- peer->id,
- channelnum);
- try {
- channel->incoming_reliables.insert(packet, channel->readNextIncomingSeqNum());
- LOG(dout_con << m_connection->getDesc()
- << "BUFFERING, TYPE_RELIABLE peer_id: " << peer->id
- << ", channel: " << (channelnum & 0xFF)
- << ", seqnum: " << seqnum << std::endl;)
- throw ProcessedQueued("Buffered future reliable packet");
- } catch (AlreadyExistsException &e) {
- } catch (IncomingDataCorruption &e) {
- m_connection->putCommand(ConnectionCommand::disconnect_peer(peer->id));
- LOG(derr_con << m_connection->getDesc()
- << "INVALID, TYPE_RELIABLE peer_id: " << peer->id
- << ", channel: " << (channelnum & 0xFF)
- << ", seqnum: " << seqnum
- << "DROPPING CLIENT!" << std::endl;)
- }
- }
- /* we got a packet to process right now */
- LOG(dout_con << m_connection->getDesc()
- << "RECURSIVE, TYPE_RELIABLE peer_id: " << peer->id
- << ", channel: " << (channelnum & 0xFF)
- << ", seqnum: " << seqnum << std::endl;)
- /* check for resend case */
- u16 queued_seqnum = 0;
- if (channel->incoming_reliables.getFirstSeqnum(queued_seqnum)) {
- if (queued_seqnum == seqnum) {
- BufferedPacketPtr queued_packet = channel->incoming_reliables.popFirst();
- /** TODO find a way to verify the new against the old packet */
- }
- }
- channel->incNextIncomingSeqNum();
- // Get out the inside packet and re-process it
- SharedBuffer<u8> payload(packetdata.getSize() - RELIABLE_HEADER_SIZE);
- memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize());
- return processPacket(channel, payload, peer->id, channelnum, true);
- }
- }
|