connectionthreads.cpp 41 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358
  1. /*
  2. Minetest
  3. Copyright (C) 2013-2017 celeron55, Perttu Ahola <celeron55@gmail.com>
  4. Copyright (C) 2017 celeron55, Loic Blot <loic.blot@unix-experience.fr>
  5. This program is free software; you can redistribute it and/or modify
  6. it under the terms of the GNU Lesser General Public License as published by
  7. the Free Software Foundation; either version 2.1 of the License, or
  8. (at your option) any later version.
  9. This program is distributed in the hope that it will be useful,
  10. but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. GNU Lesser General Public License for more details.
  13. You should have received a copy of the GNU Lesser General Public License along
  14. with this program; if not, write to the Free Software Foundation, Inc.,
  15. 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  16. */
  17. #include "connectionthreads.h"
  18. #include "log.h"
  19. #include "profiler.h"
  20. #include "settings.h"
  21. #include "network/networkpacket.h"
  22. #include "util/serialize.h"
  23. namespace con
  24. {
  25. /******************************************************************************/
  26. /* defines used for debugging and profiling */
  27. /******************************************************************************/
  28. #ifdef NDEBUG
  29. #define PROFILE(a)
  30. #undef DEBUG_CONNECTION_KBPS
  31. #else
  32. /* this mutex is used to achieve log message consistency */
  33. #define PROFILE(a) a
  34. //#define DEBUG_CONNECTION_KBPS
  35. #undef DEBUG_CONNECTION_KBPS
  36. #endif
  37. // TODO: Clean this up.
  38. #define LOG(a) a
  39. #define WINDOW_SIZE 5
  40. static session_t readPeerId(const u8 *packetdata)
  41. {
  42. return readU16(&packetdata[4]);
  43. }
  44. static u8 readChannel(const u8 *packetdata)
  45. {
  46. return readU8(&packetdata[6]);
  47. }
  48. /******************************************************************************/
  49. /* Connection Threads */
  50. /******************************************************************************/
  51. ConnectionSendThread::ConnectionSendThread(unsigned int max_packet_size,
  52. float timeout) :
  53. Thread("ConnectionSend"),
  54. m_max_packet_size(max_packet_size),
  55. m_timeout(timeout),
  56. m_max_data_packets_per_iteration(g_settings->getU16("max_packets_per_iteration"))
  57. {
  58. SANITY_CHECK(m_max_data_packets_per_iteration > 1);
  59. }
  60. void *ConnectionSendThread::run()
  61. {
  62. assert(m_connection);
  63. LOG(dout_con << m_connection->getDesc()
  64. << "ConnectionSend thread started" << std::endl);
  65. u64 curtime = porting::getTimeMs();
  66. u64 lasttime = curtime;
  67. PROFILE(std::stringstream ThreadIdentifier);
  68. PROFILE(ThreadIdentifier << "ConnectionSend: [" << m_connection->getDesc() << "]");
  69. /* if stop is requested don't stop immediately but try to send all */
  70. /* packets first */
  71. while (!stopRequested() || packetsQueued()) {
  72. BEGIN_DEBUG_EXCEPTION_HANDLER
  73. PROFILE(ScopeProfiler sp(g_profiler, ThreadIdentifier.str(), SPT_AVG));
  74. m_iteration_packets_avaialble = m_max_data_packets_per_iteration;
  75. /* wait for trigger or timeout */
  76. m_send_sleep_semaphore.wait(50);
  77. /* remove all triggers */
  78. while (m_send_sleep_semaphore.wait(0)) {
  79. }
  80. lasttime = curtime;
  81. curtime = porting::getTimeMs();
  82. float dtime = CALC_DTIME(lasttime, curtime);
  83. /* first resend timed-out packets */
  84. runTimeouts(dtime);
  85. if (m_iteration_packets_avaialble == 0) {
  86. LOG(warningstream << m_connection->getDesc()
  87. << " Packet quota used up after re-sending packets, "
  88. << "max=" << m_max_data_packets_per_iteration << std::endl);
  89. }
  90. /* translate commands to packets */
  91. auto c = m_connection->m_command_queue.pop_frontNoEx(0);
  92. while (c && c->type != CONNCMD_NONE) {
  93. if (c->reliable)
  94. processReliableCommand(c);
  95. else
  96. processNonReliableCommand(c);
  97. c = m_connection->m_command_queue.pop_frontNoEx(0);
  98. }
  99. /* send queued packets */
  100. sendPackets(dtime);
  101. END_DEBUG_EXCEPTION_HANDLER
  102. }
  103. PROFILE(g_profiler->remove(ThreadIdentifier.str()));
  104. return NULL;
  105. }
  106. void ConnectionSendThread::Trigger()
  107. {
  108. m_send_sleep_semaphore.post();
  109. }
  110. bool ConnectionSendThread::packetsQueued()
  111. {
  112. std::vector<session_t> peerIds = m_connection->getPeerIDs();
  113. if (!m_outgoing_queue.empty() && !peerIds.empty())
  114. return true;
  115. for (session_t peerId : peerIds) {
  116. PeerHelper peer = m_connection->getPeerNoEx(peerId);
  117. if (!peer)
  118. continue;
  119. if (dynamic_cast<UDPPeer *>(&peer) == 0)
  120. continue;
  121. for (Channel &channel : (dynamic_cast<UDPPeer *>(&peer))->channels) {
  122. if (!channel.queued_commands.empty()) {
  123. return true;
  124. }
  125. }
  126. }
  127. return false;
  128. }
  129. void ConnectionSendThread::runTimeouts(float dtime)
  130. {
  131. std::vector<session_t> timeouted_peers;
  132. std::vector<session_t> peerIds = m_connection->getPeerIDs();
  133. const u32 numpeers = m_connection->m_peers.size();
  134. if (numpeers == 0)
  135. return;
  136. for (session_t &peerId : peerIds) {
  137. PeerHelper peer = m_connection->getPeerNoEx(peerId);
  138. if (!peer)
  139. continue;
  140. UDPPeer *udpPeer = dynamic_cast<UDPPeer *>(&peer);
  141. if (!udpPeer)
  142. continue;
  143. PROFILE(std::stringstream peerIdentifier);
  144. PROFILE(peerIdentifier << "runTimeouts[" << m_connection->getDesc()
  145. << ";" << peerId << ";RELIABLE]");
  146. PROFILE(ScopeProfiler
  147. peerprofiler(g_profiler, peerIdentifier.str(), SPT_AVG));
  148. SharedBuffer<u8> data(2); // data for sending ping, required here because of goto
  149. /*
  150. Check peer timeout
  151. */
  152. if (peer->isTimedOut(m_timeout)) {
  153. infostream << m_connection->getDesc()
  154. << "RunTimeouts(): Peer " << peer->id
  155. << " has timed out."
  156. << std::endl;
  157. // Add peer to the list
  158. timeouted_peers.push_back(peer->id);
  159. // Don't bother going through the buffers of this one
  160. continue;
  161. }
  162. float resend_timeout = udpPeer->getResendTimeout();
  163. for (Channel &channel : udpPeer->channels) {
  164. // Remove timed out incomplete unreliable split packets
  165. channel.incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout);
  166. // Increment reliable packet times
  167. channel.outgoing_reliables_sent.incrementTimeouts(dtime);
  168. // Re-send timed out outgoing reliables
  169. auto timed_outs = channel.outgoing_reliables_sent.getTimedOuts(resend_timeout,
  170. (m_max_data_packets_per_iteration / numpeers));
  171. channel.UpdatePacketLossCounter(timed_outs.size());
  172. g_profiler->graphAdd("packets_lost", timed_outs.size());
  173. m_iteration_packets_avaialble -= timed_outs.size();
  174. for (const auto &k : timed_outs) {
  175. u8 channelnum = readChannel(k->data);
  176. u16 seqnum = k->getSeqnum();
  177. channel.UpdateBytesLost(k->size());
  178. LOG(derr_con << m_connection->getDesc()
  179. << "RE-SENDING timed-out RELIABLE to "
  180. << k->address.serializeString()
  181. << "(t/o=" << resend_timeout << "): "
  182. << "count=" << k->resend_count
  183. << ", channel=" << ((int) channelnum & 0xff)
  184. << ", seqnum=" << seqnum
  185. << std::endl);
  186. rawSend(k.get());
  187. // do not handle rtt here as we can't decide if this packet was
  188. // lost or really takes more time to transmit
  189. }
  190. channel.UpdateTimers(dtime);
  191. }
  192. /* send ping if necessary */
  193. if (udpPeer->Ping(dtime, data)) {
  194. LOG(dout_con << m_connection->getDesc()
  195. << "Sending ping for peer_id: " << udpPeer->id << std::endl);
  196. /* this may fail if there ain't a sequence number left */
  197. if (!rawSendAsPacket(udpPeer->id, 0, data, true)) {
  198. //retrigger with reduced ping interval
  199. udpPeer->Ping(4.0, data);
  200. }
  201. }
  202. udpPeer->RunCommandQueues(m_max_packet_size,
  203. m_max_commands_per_iteration,
  204. m_max_packets_requeued);
  205. }
  206. // Remove timed out peers
  207. for (u16 timeouted_peer : timeouted_peers) {
  208. LOG(dout_con << m_connection->getDesc()
  209. << "RunTimeouts(): Removing peer " << timeouted_peer << std::endl);
  210. m_connection->deletePeer(timeouted_peer, true);
  211. }
  212. }
  213. void ConnectionSendThread::rawSend(const BufferedPacket *p)
  214. {
  215. try {
  216. m_connection->m_udpSocket.Send(p->address, p->data, p->size());
  217. LOG(dout_con << m_connection->getDesc()
  218. << " rawSend: " << p->size()
  219. << " bytes sent" << std::endl);
  220. } catch (SendFailedException &e) {
  221. LOG(derr_con << m_connection->getDesc()
  222. << "Connection::rawSend(): SendFailedException: "
  223. << p->address.serializeString() << std::endl);
  224. }
  225. }
  226. void ConnectionSendThread::sendAsPacketReliable(BufferedPacketPtr &p, Channel *channel)
  227. {
  228. try {
  229. p->absolute_send_time = porting::getTimeMs();
  230. // Buffer the packet
  231. channel->outgoing_reliables_sent.insert(p,
  232. (channel->readOutgoingSequenceNumber() - MAX_RELIABLE_WINDOW_SIZE)
  233. % (MAX_RELIABLE_WINDOW_SIZE + 1));
  234. }
  235. catch (AlreadyExistsException &e) {
  236. LOG(derr_con << m_connection->getDesc()
  237. << "WARNING: Going to send a reliable packet"
  238. << " in outgoing buffer" << std::endl);
  239. }
  240. // Send the packet
  241. rawSend(p.get());
  242. }
  243. bool ConnectionSendThread::rawSendAsPacket(session_t peer_id, u8 channelnum,
  244. const SharedBuffer<u8> &data, bool reliable)
  245. {
  246. PeerHelper peer = m_connection->getPeerNoEx(peer_id);
  247. if (!peer) {
  248. LOG(errorstream << m_connection->getDesc()
  249. << " dropped " << (reliable ? "reliable " : "")
  250. << "packet for non existent peer_id: " << peer_id << std::endl);
  251. return false;
  252. }
  253. Channel *channel = &(dynamic_cast<UDPPeer *>(&peer)->channels[channelnum]);
  254. if (reliable) {
  255. bool have_seqnum = false;
  256. const u16 seqnum = channel->getOutgoingSequenceNumber(have_seqnum);
  257. if (!have_seqnum)
  258. return false;
  259. SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum);
  260. Address peer_address;
  261. peer->getAddress(MTP_MINETEST_RELIABLE_UDP, peer_address);
  262. // Add base headers and make a packet
  263. BufferedPacketPtr p = con::makePacket(peer_address, reliable,
  264. m_connection->GetProtocolID(), m_connection->GetPeerID(),
  265. channelnum);
  266. // first check if our send window is already maxed out
  267. if (channel->outgoing_reliables_sent.size() < channel->getWindowSize()) {
  268. LOG(dout_con << m_connection->getDesc()
  269. << " INFO: sending a reliable packet to peer_id " << peer_id
  270. << " channel: " << (u32)channelnum
  271. << " seqnum: " << seqnum << std::endl);
  272. sendAsPacketReliable(p, channel);
  273. return true;
  274. }
  275. LOG(dout_con << m_connection->getDesc()
  276. << " INFO: queueing reliable packet for peer_id: " << peer_id
  277. << " channel: " << (u32)channelnum
  278. << " seqnum: " << seqnum << std::endl);
  279. channel->queued_reliables.push(p);
  280. return false;
  281. }
  282. Address peer_address;
  283. if (peer->getAddress(MTP_UDP, peer_address)) {
  284. // Add base headers and make a packet
  285. BufferedPacketPtr p = con::makePacket(peer_address, data,
  286. m_connection->GetProtocolID(), m_connection->GetPeerID(),
  287. channelnum);
  288. // Send the packet
  289. rawSend(p.get());
  290. return true;
  291. }
  292. LOG(dout_con << m_connection->getDesc()
  293. << " INFO: dropped unreliable packet for peer_id: " << peer_id
  294. << " because of (yet) missing udp address" << std::endl);
  295. return false;
  296. }
  297. void ConnectionSendThread::processReliableCommand(ConnectionCommandPtr &c)
  298. {
  299. assert(c->reliable); // Pre-condition
  300. switch (c->type) {
  301. case CONNCMD_NONE:
  302. LOG(dout_con << m_connection->getDesc()
  303. << "UDP processing reliable CONNCMD_NONE" << std::endl);
  304. return;
  305. case CONNCMD_SEND:
  306. LOG(dout_con << m_connection->getDesc()
  307. << "UDP processing reliable CONNCMD_SEND" << std::endl);
  308. sendReliable(c);
  309. return;
  310. case CONNCMD_SEND_TO_ALL:
  311. LOG(dout_con << m_connection->getDesc()
  312. << "UDP processing CONNCMD_SEND_TO_ALL" << std::endl);
  313. sendToAllReliable(c);
  314. return;
  315. case CONCMD_CREATE_PEER:
  316. LOG(dout_con << m_connection->getDesc()
  317. << "UDP processing reliable CONCMD_CREATE_PEER" << std::endl);
  318. if (!rawSendAsPacket(c->peer_id, c->channelnum, c->data, c->reliable)) {
  319. /* put to queue if we couldn't send it immediately */
  320. sendReliable(c);
  321. }
  322. return;
  323. case CONNCMD_SERVE:
  324. case CONNCMD_CONNECT:
  325. case CONNCMD_DISCONNECT:
  326. case CONCMD_ACK:
  327. FATAL_ERROR("Got command that shouldn't be reliable as reliable command");
  328. default:
  329. LOG(dout_con << m_connection->getDesc()
  330. << " Invalid reliable command type: " << c->type << std::endl);
  331. }
  332. }
  333. void ConnectionSendThread::processNonReliableCommand(ConnectionCommandPtr &c_ptr)
  334. {
  335. const ConnectionCommand &c = *c_ptr;
  336. assert(!c.reliable); // Pre-condition
  337. switch (c.type) {
  338. case CONNCMD_NONE:
  339. LOG(dout_con << m_connection->getDesc()
  340. << " UDP processing CONNCMD_NONE" << std::endl);
  341. return;
  342. case CONNCMD_SERVE:
  343. LOG(dout_con << m_connection->getDesc()
  344. << " UDP processing CONNCMD_SERVE port="
  345. << c.address.serializeString() << std::endl);
  346. serve(c.address);
  347. return;
  348. case CONNCMD_CONNECT:
  349. LOG(dout_con << m_connection->getDesc()
  350. << " UDP processing CONNCMD_CONNECT" << std::endl);
  351. connect(c.address);
  352. return;
  353. case CONNCMD_DISCONNECT:
  354. LOG(dout_con << m_connection->getDesc()
  355. << " UDP processing CONNCMD_DISCONNECT" << std::endl);
  356. disconnect();
  357. return;
  358. case CONNCMD_DISCONNECT_PEER:
  359. LOG(dout_con << m_connection->getDesc()
  360. << " UDP processing CONNCMD_DISCONNECT_PEER" << std::endl);
  361. disconnect_peer(c.peer_id);
  362. return;
  363. case CONNCMD_SEND:
  364. LOG(dout_con << m_connection->getDesc()
  365. << " UDP processing CONNCMD_SEND" << std::endl);
  366. send(c.peer_id, c.channelnum, c.data);
  367. return;
  368. case CONNCMD_SEND_TO_ALL:
  369. LOG(dout_con << m_connection->getDesc()
  370. << " UDP processing CONNCMD_SEND_TO_ALL" << std::endl);
  371. sendToAll(c.channelnum, c.data);
  372. return;
  373. case CONCMD_ACK:
  374. LOG(dout_con << m_connection->getDesc()
  375. << " UDP processing CONCMD_ACK" << std::endl);
  376. sendAsPacket(c.peer_id, c.channelnum, c.data, true);
  377. return;
  378. case CONCMD_CREATE_PEER:
  379. FATAL_ERROR("Got command that should be reliable as unreliable command");
  380. default:
  381. LOG(dout_con << m_connection->getDesc()
  382. << " Invalid command type: " << c.type << std::endl);
  383. }
  384. }
  385. void ConnectionSendThread::serve(Address bind_address)
  386. {
  387. LOG(dout_con << m_connection->getDesc()
  388. << "UDP serving at port " << bind_address.serializeString() << std::endl);
  389. try {
  390. m_connection->m_udpSocket.Bind(bind_address);
  391. m_connection->SetPeerID(PEER_ID_SERVER);
  392. }
  393. catch (SocketException &e) {
  394. // Create event
  395. m_connection->putEvent(ConnectionEvent::bindFailed());
  396. }
  397. }
  398. void ConnectionSendThread::connect(Address address)
  399. {
  400. LOG(dout_con << m_connection->getDesc() << " connecting to "
  401. << address.serializeString()
  402. << ":" << address.getPort() << std::endl);
  403. UDPPeer *peer = m_connection->createServerPeer(address);
  404. // Create event
  405. m_connection->putEvent(ConnectionEvent::peerAdded(peer->id, peer->address));
  406. Address bind_addr;
  407. if (address.isIPv6())
  408. bind_addr.setAddress((IPv6AddressBytes *) NULL);
  409. else
  410. bind_addr.setAddress(0, 0, 0, 0);
  411. m_connection->m_udpSocket.Bind(bind_addr);
  412. // Send a dummy packet to server with peer_id = PEER_ID_INEXISTENT
  413. m_connection->SetPeerID(PEER_ID_INEXISTENT);
  414. NetworkPacket pkt(0, 0);
  415. m_connection->Send(PEER_ID_SERVER, 0, &pkt, true);
  416. }
  417. void ConnectionSendThread::disconnect()
  418. {
  419. LOG(dout_con << m_connection->getDesc() << " disconnecting" << std::endl);
  420. // Create and send DISCO packet
  421. SharedBuffer<u8> data(2);
  422. writeU8(&data[0], PACKET_TYPE_CONTROL);
  423. writeU8(&data[1], CONTROLTYPE_DISCO);
  424. // Send to all
  425. std::vector<session_t> peerids = m_connection->getPeerIDs();
  426. for (session_t peerid : peerids) {
  427. sendAsPacket(peerid, 0, data, false);
  428. }
  429. }
  430. void ConnectionSendThread::disconnect_peer(session_t peer_id)
  431. {
  432. LOG(dout_con << m_connection->getDesc() << " disconnecting peer" << std::endl);
  433. // Create and send DISCO packet
  434. SharedBuffer<u8> data(2);
  435. writeU8(&data[0], PACKET_TYPE_CONTROL);
  436. writeU8(&data[1], CONTROLTYPE_DISCO);
  437. sendAsPacket(peer_id, 0, data, false);
  438. PeerHelper peer = m_connection->getPeerNoEx(peer_id);
  439. if (!peer)
  440. return;
  441. if (dynamic_cast<UDPPeer *>(&peer) == 0) {
  442. return;
  443. }
  444. dynamic_cast<UDPPeer *>(&peer)->m_pending_disconnect = true;
  445. }
  446. void ConnectionSendThread::send(session_t peer_id, u8 channelnum,
  447. const SharedBuffer<u8> &data)
  448. {
  449. assert(channelnum < CHANNEL_COUNT); // Pre-condition
  450. PeerHelper peer = m_connection->getPeerNoEx(peer_id);
  451. if (!peer) {
  452. LOG(dout_con << m_connection->getDesc() << " peer: peer_id=" << peer_id
  453. << ">>>NOT<<< found on sending packet"
  454. << ", channel " << (channelnum % 0xFF)
  455. << ", size: " << data.getSize() << std::endl);
  456. return;
  457. }
  458. LOG(dout_con << m_connection->getDesc() << " sending to peer_id=" << peer_id
  459. << ", channel " << (channelnum % 0xFF)
  460. << ", size: " << data.getSize() << std::endl);
  461. u16 split_sequence_number = peer->getNextSplitSequenceNumber(channelnum);
  462. u32 chunksize_max = m_max_packet_size - BASE_HEADER_SIZE;
  463. std::list<SharedBuffer<u8>> originals;
  464. makeAutoSplitPacket(data, chunksize_max, split_sequence_number, &originals);
  465. peer->setNextSplitSequenceNumber(channelnum, split_sequence_number);
  466. for (const SharedBuffer<u8> &original : originals) {
  467. sendAsPacket(peer_id, channelnum, original);
  468. }
  469. }
  470. void ConnectionSendThread::sendReliable(ConnectionCommandPtr &c)
  471. {
  472. PeerHelper peer = m_connection->getPeerNoEx(c->peer_id);
  473. if (!peer)
  474. return;
  475. peer->PutReliableSendCommand(c, m_max_packet_size);
  476. }
  477. void ConnectionSendThread::sendToAll(u8 channelnum, const SharedBuffer<u8> &data)
  478. {
  479. std::vector<session_t> peerids = m_connection->getPeerIDs();
  480. for (session_t peerid : peerids) {
  481. send(peerid, channelnum, data);
  482. }
  483. }
  484. void ConnectionSendThread::sendToAllReliable(ConnectionCommandPtr &c)
  485. {
  486. std::vector<session_t> peerids = m_connection->getPeerIDs();
  487. for (session_t peerid : peerids) {
  488. PeerHelper peer = m_connection->getPeerNoEx(peerid);
  489. if (!peer)
  490. continue;
  491. peer->PutReliableSendCommand(c, m_max_packet_size);
  492. }
  493. }
  494. void ConnectionSendThread::sendPackets(float dtime)
  495. {
  496. std::vector<session_t> peerIds = m_connection->getPeerIDs();
  497. std::vector<session_t> pendingDisconnect;
  498. std::map<session_t, bool> pending_unreliable;
  499. const unsigned int peer_packet_quota = m_iteration_packets_avaialble
  500. / MYMAX(peerIds.size(), 1);
  501. for (session_t peerId : peerIds) {
  502. PeerHelper peer = m_connection->getPeerNoEx(peerId);
  503. //peer may have been removed
  504. if (!peer) {
  505. LOG(dout_con << m_connection->getDesc() << " Peer not found: peer_id="
  506. << peerId
  507. << std::endl);
  508. continue;
  509. }
  510. peer->m_increment_packets_remaining = peer_packet_quota;
  511. UDPPeer *udpPeer = dynamic_cast<UDPPeer *>(&peer);
  512. if (!udpPeer) {
  513. continue;
  514. }
  515. if (udpPeer->m_pending_disconnect) {
  516. pendingDisconnect.push_back(peerId);
  517. }
  518. PROFILE(std::stringstream
  519. peerIdentifier);
  520. PROFILE(
  521. peerIdentifier << "sendPackets[" << m_connection->getDesc() << ";" << peerId
  522. << ";RELIABLE]");
  523. PROFILE(ScopeProfiler
  524. peerprofiler(g_profiler, peerIdentifier.str(), SPT_AVG));
  525. LOG(dout_con << m_connection->getDesc()
  526. << " Handle per peer queues: peer_id=" << peerId
  527. << " packet quota: " << peer->m_increment_packets_remaining << std::endl);
  528. // first send queued reliable packets for all peers (if possible)
  529. for (unsigned int i = 0; i < CHANNEL_COUNT; i++) {
  530. Channel &channel = udpPeer->channels[i];
  531. // Reduces logging verbosity
  532. if (channel.queued_reliables.empty())
  533. continue;
  534. u16 next_to_ack = 0;
  535. channel.outgoing_reliables_sent.getFirstSeqnum(next_to_ack);
  536. u16 next_to_receive = 0;
  537. channel.incoming_reliables.getFirstSeqnum(next_to_receive);
  538. LOG(dout_con << m_connection->getDesc() << "\t channel: "
  539. << i << ", peer quota:"
  540. << peer->m_increment_packets_remaining
  541. << std::endl
  542. << "\t\t\treliables on wire: "
  543. << channel.outgoing_reliables_sent.size()
  544. << ", waiting for ack for " << next_to_ack
  545. << std::endl
  546. << "\t\t\tincoming_reliables: "
  547. << channel.incoming_reliables.size()
  548. << ", next reliable packet: "
  549. << channel.readNextIncomingSeqNum()
  550. << ", next queued: " << next_to_receive
  551. << std::endl
  552. << "\t\t\treliables queued : "
  553. << channel.queued_reliables.size()
  554. << std::endl
  555. << "\t\t\tqueued commands : "
  556. << channel.queued_commands.size()
  557. << std::endl);
  558. while (!channel.queued_reliables.empty() &&
  559. channel.outgoing_reliables_sent.size()
  560. < channel.getWindowSize() &&
  561. peer->m_increment_packets_remaining > 0) {
  562. BufferedPacketPtr p = channel.queued_reliables.front();
  563. channel.queued_reliables.pop();
  564. LOG(dout_con << m_connection->getDesc()
  565. << " INFO: sending a queued reliable packet "
  566. << " channel: " << i
  567. << ", seqnum: " << p->getSeqnum()
  568. << std::endl);
  569. sendAsPacketReliable(p, &channel);
  570. peer->m_increment_packets_remaining--;
  571. }
  572. }
  573. }
  574. if (!m_outgoing_queue.empty()) {
  575. LOG(dout_con << m_connection->getDesc()
  576. << " Handle non reliable queue ("
  577. << m_outgoing_queue.size() << " pkts)" << std::endl);
  578. }
  579. unsigned int initial_queuesize = m_outgoing_queue.size();
  580. /* send non reliable packets*/
  581. for (unsigned int i = 0; i < initial_queuesize; i++) {
  582. OutgoingPacket packet = m_outgoing_queue.front();
  583. m_outgoing_queue.pop();
  584. if (packet.reliable)
  585. continue;
  586. PeerHelper peer = m_connection->getPeerNoEx(packet.peer_id);
  587. if (!peer) {
  588. LOG(dout_con << m_connection->getDesc()
  589. << " Outgoing queue: peer_id=" << packet.peer_id
  590. << ">>>NOT<<< found on sending packet"
  591. << ", channel " << (packet.channelnum % 0xFF)
  592. << ", size: " << packet.data.getSize() << std::endl);
  593. continue;
  594. }
  595. /* send acks immediately */
  596. if (packet.ack || peer->m_increment_packets_remaining > 0 || stopRequested()) {
  597. rawSendAsPacket(packet.peer_id, packet.channelnum,
  598. packet.data, packet.reliable);
  599. if (peer->m_increment_packets_remaining > 0)
  600. peer->m_increment_packets_remaining--;
  601. } else {
  602. m_outgoing_queue.push(packet);
  603. pending_unreliable[packet.peer_id] = true;
  604. }
  605. }
  606. if (peer_packet_quota > 0) {
  607. for (session_t peerId : peerIds) {
  608. PeerHelper peer = m_connection->getPeerNoEx(peerId);
  609. if (!peer)
  610. continue;
  611. if (peer->m_increment_packets_remaining == 0) {
  612. LOG(warningstream << m_connection->getDesc()
  613. << " Packet quota used up for peer_id=" << peerId
  614. << ", was " << peer_packet_quota << " pkts" << std::endl);
  615. }
  616. }
  617. }
  618. for (session_t peerId : pendingDisconnect) {
  619. if (!pending_unreliable[peerId]) {
  620. m_connection->deletePeer(peerId, false);
  621. }
  622. }
  623. }
  624. void ConnectionSendThread::sendAsPacket(session_t peer_id, u8 channelnum,
  625. const SharedBuffer<u8> &data, bool ack)
  626. {
  627. OutgoingPacket packet(peer_id, channelnum, data, false, ack);
  628. m_outgoing_queue.push(packet);
  629. }
  630. ConnectionReceiveThread::ConnectionReceiveThread(unsigned int max_packet_size) :
  631. Thread("ConnectionReceive")
  632. {
  633. }
  634. void *ConnectionReceiveThread::run()
  635. {
  636. assert(m_connection);
  637. LOG(dout_con << m_connection->getDesc()
  638. << "ConnectionReceive thread started" << std::endl);
  639. PROFILE(std::stringstream
  640. ThreadIdentifier);
  641. PROFILE(ThreadIdentifier << "ConnectionReceive: [" << m_connection->getDesc() << "]");
  642. // use IPv6 minimum allowed MTU as receive buffer size as this is
  643. // theoretical reliable upper boundary of a udp packet for all IPv6 enabled
  644. // infrastructure
  645. const unsigned int packet_maxsize = 1500;
  646. SharedBuffer<u8> packetdata(packet_maxsize);
  647. bool packet_queued = true;
  648. #ifdef DEBUG_CONNECTION_KBPS
  649. u64 curtime = porting::getTimeMs();
  650. u64 lasttime = curtime;
  651. float debug_print_timer = 0.0;
  652. #endif
  653. while (!stopRequested()) {
  654. BEGIN_DEBUG_EXCEPTION_HANDLER
  655. PROFILE(ScopeProfiler
  656. sp(g_profiler, ThreadIdentifier.str(), SPT_AVG));
  657. #ifdef DEBUG_CONNECTION_KBPS
  658. lasttime = curtime;
  659. curtime = porting::getTimeMs();
  660. float dtime = CALC_DTIME(lasttime,curtime);
  661. #endif
  662. /* receive packets */
  663. receive(packetdata, packet_queued);
  664. #ifdef DEBUG_CONNECTION_KBPS
  665. debug_print_timer += dtime;
  666. if (debug_print_timer > 20.0) {
  667. debug_print_timer -= 20.0;
  668. std::vector<session_t> peerids = m_connection->getPeerIDs();
  669. for (auto id : peerids)
  670. {
  671. PeerHelper peer = m_connection->getPeerNoEx(id);
  672. if (!peer)
  673. continue;
  674. float peer_current = 0.0;
  675. float peer_loss = 0.0;
  676. float avg_rate = 0.0;
  677. float avg_loss = 0.0;
  678. for(u16 j=0; j<CHANNEL_COUNT; j++)
  679. {
  680. peer_current +=peer->channels[j].getCurrentDownloadRateKB();
  681. peer_loss += peer->channels[j].getCurrentLossRateKB();
  682. avg_rate += peer->channels[j].getAvgDownloadRateKB();
  683. avg_loss += peer->channels[j].getAvgLossRateKB();
  684. }
  685. std::stringstream output;
  686. output << std::fixed << std::setprecision(1);
  687. output << "OUT to Peer " << *i << " RATES (good / loss) " << std::endl;
  688. output << "\tcurrent (sum): " << peer_current << "kb/s "<< peer_loss << "kb/s" << std::endl;
  689. output << "\taverage (sum): " << avg_rate << "kb/s "<< avg_loss << "kb/s" << std::endl;
  690. output << std::setfill(' ');
  691. for(u16 j=0; j<CHANNEL_COUNT; j++)
  692. {
  693. output << "\tcha " << j << ":"
  694. << " CUR: " << std::setw(6) << peer->channels[j].getCurrentDownloadRateKB() <<"kb/s"
  695. << " AVG: " << std::setw(6) << peer->channels[j].getAvgDownloadRateKB() <<"kb/s"
  696. << " MAX: " << std::setw(6) << peer->channels[j].getMaxDownloadRateKB() <<"kb/s"
  697. << " /"
  698. << " CUR: " << std::setw(6) << peer->channels[j].getCurrentLossRateKB() <<"kb/s"
  699. << " AVG: " << std::setw(6) << peer->channels[j].getAvgLossRateKB() <<"kb/s"
  700. << " MAX: " << std::setw(6) << peer->channels[j].getMaxLossRateKB() <<"kb/s"
  701. << " / WS: " << peer->channels[j].getWindowSize()
  702. << std::endl;
  703. }
  704. fprintf(stderr,"%s\n",output.str().c_str());
  705. }
  706. }
  707. #endif
  708. END_DEBUG_EXCEPTION_HANDLER
  709. }
  710. PROFILE(g_profiler->remove(ThreadIdentifier.str()));
  711. return NULL;
  712. }
  713. // Receive packets from the network and buffers and create ConnectionEvents
  714. void ConnectionReceiveThread::receive(SharedBuffer<u8> &packetdata,
  715. bool &packet_queued)
  716. {
  717. try {
  718. // First, see if there any buffered packets we can process now
  719. if (packet_queued) {
  720. session_t peer_id;
  721. SharedBuffer<u8> resultdata;
  722. while (true) {
  723. try {
  724. if (!getFromBuffers(peer_id, resultdata))
  725. break;
  726. m_connection->putEvent(ConnectionEvent::dataReceived(peer_id, resultdata));
  727. }
  728. catch (ProcessedSilentlyException &e) {
  729. /* try reading again */
  730. }
  731. }
  732. packet_queued = false;
  733. }
  734. // Call Receive() to wait for incoming data
  735. Address sender;
  736. s32 received_size = m_connection->m_udpSocket.Receive(sender,
  737. *packetdata, packetdata.getSize());
  738. if (received_size < 0)
  739. return;
  740. if ((received_size < BASE_HEADER_SIZE) ||
  741. (readU32(&packetdata[0]) != m_connection->GetProtocolID())) {
  742. LOG(derr_con << m_connection->getDesc()
  743. << "Receive(): Invalid incoming packet, "
  744. << "size: " << received_size
  745. << ", protocol: "
  746. << ((received_size >= 4) ? readU32(&packetdata[0]) : -1)
  747. << std::endl);
  748. return;
  749. }
  750. session_t peer_id = readPeerId(*packetdata);
  751. u8 channelnum = readChannel(*packetdata);
  752. if (channelnum > CHANNEL_COUNT - 1) {
  753. LOG(derr_con << m_connection->getDesc()
  754. << "Receive(): Invalid channel " << (u32)channelnum << std::endl);
  755. return;
  756. }
  757. /* Try to identify peer by sender address (may happen on join) */
  758. if (peer_id == PEER_ID_INEXISTENT) {
  759. peer_id = m_connection->lookupPeer(sender);
  760. // We do not have to remind the peer of its
  761. // peer id as the CONTROLTYPE_SET_PEER_ID
  762. // command was sent reliably.
  763. }
  764. if (peer_id == PEER_ID_INEXISTENT) {
  765. /* Ignore it if we are a client */
  766. if (m_connection->ConnectedToServer())
  767. return;
  768. /* The peer was not found in our lists. Add it. */
  769. peer_id = m_connection->createPeer(sender, MTP_MINETEST_RELIABLE_UDP, 0);
  770. }
  771. PeerHelper peer = m_connection->getPeerNoEx(peer_id);
  772. if (!peer) {
  773. LOG(dout_con << m_connection->getDesc()
  774. << " got packet from unknown peer_id: "
  775. << peer_id << " Ignoring." << std::endl);
  776. return;
  777. }
  778. // Validate peer address
  779. Address peer_address;
  780. if (peer->getAddress(MTP_UDP, peer_address)) {
  781. if (peer_address != sender) {
  782. LOG(derr_con << m_connection->getDesc()
  783. << " Peer " << peer_id << " sending from different address."
  784. " Ignoring." << std::endl);
  785. return;
  786. }
  787. } else {
  788. LOG(derr_con << m_connection->getDesc()
  789. << " Peer " << peer_id << " doesn't have an address?!"
  790. " Ignoring." << std::endl);
  791. return;
  792. }
  793. peer->ResetTimeout();
  794. Channel *channel = nullptr;
  795. if (dynamic_cast<UDPPeer *>(&peer)) {
  796. channel = &dynamic_cast<UDPPeer *>(&peer)->channels[channelnum];
  797. } else {
  798. LOG(derr_con << m_connection->getDesc()
  799. << "Receive(): peer_id=" << peer_id << " isn't an UDPPeer?!"
  800. " Ignoring." << std::endl);
  801. return;
  802. }
  803. channel->UpdateBytesReceived(received_size);
  804. // Throw the received packet to channel->processPacket()
  805. // Make a new SharedBuffer from the data without the base headers
  806. SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
  807. memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
  808. strippeddata.getSize());
  809. try {
  810. // Process it (the result is some data with no headers made by us)
  811. SharedBuffer<u8> resultdata = processPacket
  812. (channel, strippeddata, peer_id, channelnum, false);
  813. LOG(dout_con << m_connection->getDesc()
  814. << " ProcessPacket from peer_id: " << peer_id
  815. << ", channel: " << (u32)channelnum << ", returned "
  816. << resultdata.getSize() << " bytes" << std::endl);
  817. m_connection->putEvent(ConnectionEvent::dataReceived(peer_id, resultdata));
  818. }
  819. catch (ProcessedSilentlyException &e) {
  820. }
  821. catch (ProcessedQueued &e) {
  822. // we set it to true anyway (see below)
  823. }
  824. /* Every time we receive a packet it can happen that a previously
  825. * buffered packet is now ready to process. */
  826. packet_queued = true;
  827. }
  828. catch (InvalidIncomingDataException &e) {
  829. }
  830. }
  831. bool ConnectionReceiveThread::getFromBuffers(session_t &peer_id, SharedBuffer<u8> &dst)
  832. {
  833. std::vector<session_t> peerids = m_connection->getPeerIDs();
  834. for (session_t peerid : peerids) {
  835. PeerHelper peer = m_connection->getPeerNoEx(peerid);
  836. if (!peer)
  837. continue;
  838. UDPPeer *p = dynamic_cast<UDPPeer *>(&peer);
  839. if (!p)
  840. continue;
  841. for (Channel &channel : p->channels) {
  842. if (checkIncomingBuffers(&channel, peer_id, dst)) {
  843. return true;
  844. }
  845. }
  846. }
  847. return false;
  848. }
  849. bool ConnectionReceiveThread::checkIncomingBuffers(Channel *channel,
  850. session_t &peer_id, SharedBuffer<u8> &dst)
  851. {
  852. u16 firstseqnum = 0;
  853. if (!channel->incoming_reliables.getFirstSeqnum(firstseqnum))
  854. return false;
  855. if (firstseqnum != channel->readNextIncomingSeqNum())
  856. return false;
  857. BufferedPacketPtr p = channel->incoming_reliables.popFirst();
  858. peer_id = readPeerId(p->data); // Carried over to caller function
  859. u8 channelnum = readChannel(p->data);
  860. u16 seqnum = p->getSeqnum();
  861. LOG(dout_con << m_connection->getDesc()
  862. << "UNBUFFERING TYPE_RELIABLE"
  863. << " seqnum=" << seqnum
  864. << " peer_id=" << peer_id
  865. << " channel=" << ((int) channelnum & 0xff)
  866. << std::endl);
  867. channel->incNextIncomingSeqNum();
  868. u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
  869. // Get out the inside packet and re-process it
  870. SharedBuffer<u8> payload(p->size() - headers_size);
  871. memcpy(*payload, &p->data[headers_size], payload.getSize());
  872. dst = processPacket(channel, payload, peer_id, channelnum, true);
  873. return true;
  874. }
  875. SharedBuffer<u8> ConnectionReceiveThread::processPacket(Channel *channel,
  876. const SharedBuffer<u8> &packetdata, session_t peer_id, u8 channelnum, bool reliable)
  877. {
  878. PeerHelper peer = m_connection->getPeerNoEx(peer_id);
  879. if (!peer) {
  880. errorstream << "Peer not found (possible timeout)" << std::endl;
  881. throw ProcessedSilentlyException("Peer not found (possible timeout)");
  882. }
  883. if (packetdata.getSize() < 1)
  884. throw InvalidIncomingDataException("packetdata.getSize() < 1");
  885. u8 type = readU8(&(packetdata[0]));
  886. if (MAX_UDP_PEERS <= 65535 && peer_id >= MAX_UDP_PEERS) {
  887. std::string errmsg = "Invalid peer_id=" + itos(peer_id);
  888. errorstream << errmsg << std::endl;
  889. throw InvalidIncomingDataException(errmsg.c_str());
  890. }
  891. if (type >= PACKET_TYPE_MAX) {
  892. derr_con << m_connection->getDesc() << "Got invalid type=" << ((int) type & 0xff)
  893. << std::endl;
  894. throw InvalidIncomingDataException("Invalid packet type");
  895. }
  896. const PacketTypeHandler &pHandle = packetTypeRouter[type];
  897. return (this->*pHandle.handler)(channel, packetdata, &peer, channelnum, reliable);
  898. }
  899. const ConnectionReceiveThread::PacketTypeHandler
  900. ConnectionReceiveThread::packetTypeRouter[PACKET_TYPE_MAX] = {
  901. {&ConnectionReceiveThread::handlePacketType_Control},
  902. {&ConnectionReceiveThread::handlePacketType_Original},
  903. {&ConnectionReceiveThread::handlePacketType_Split},
  904. {&ConnectionReceiveThread::handlePacketType_Reliable},
  905. };
  906. SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *channel,
  907. const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable)
  908. {
  909. if (packetdata.getSize() < 2)
  910. throw InvalidIncomingDataException("packetdata.getSize() < 2");
  911. ControlType controltype = (ControlType)readU8(&(packetdata[1]));
  912. if (controltype == CONTROLTYPE_ACK) {
  913. assert(channel != NULL);
  914. if (packetdata.getSize() < 4) {
  915. throw InvalidIncomingDataException(
  916. "packetdata.getSize() < 4 (ACK header size)");
  917. }
  918. u16 seqnum = readU16(&packetdata[2]);
  919. LOG(dout_con << m_connection->getDesc() << " [ CONTROLTYPE_ACK: channelnum="
  920. << ((int) channelnum & 0xff) << ", peer_id=" << peer->id << ", seqnum="
  921. << seqnum << " ]" << std::endl);
  922. try {
  923. BufferedPacketPtr p = channel->outgoing_reliables_sent.popSeqnum(seqnum);
  924. // the rtt calculation will be a bit off for re-sent packets but that's okay
  925. {
  926. // Get round trip time
  927. u64 current_time = porting::getTimeMs();
  928. // an overflow is quite unlikely but as it'd result in major
  929. // rtt miscalculation we handle it here
  930. if (current_time > p->absolute_send_time) {
  931. float rtt = (current_time - p->absolute_send_time) / 1000.0;
  932. // Let peer calculate stuff according to it
  933. // (avg_rtt and resend_timeout)
  934. dynamic_cast<UDPPeer *>(peer)->reportRTT(rtt);
  935. } else if (p->totaltime > 0) {
  936. float rtt = p->totaltime;
  937. // Let peer calculate stuff according to it
  938. // (avg_rtt and resend_timeout)
  939. dynamic_cast<UDPPeer *>(peer)->reportRTT(rtt);
  940. }
  941. }
  942. // put bytes for max bandwidth calculation
  943. channel->UpdateBytesSent(p->size(), 1);
  944. if (channel->outgoing_reliables_sent.size() == 0)
  945. m_connection->TriggerSend();
  946. } catch (NotFoundException &e) {
  947. LOG(derr_con << m_connection->getDesc()
  948. << "WARNING: ACKed packet not in outgoing queue"
  949. << " seqnum=" << seqnum << std::endl);
  950. channel->UpdatePacketTooLateCounter();
  951. }
  952. throw ProcessedSilentlyException("Got an ACK");
  953. } else if (controltype == CONTROLTYPE_SET_PEER_ID) {
  954. // Got a packet to set our peer id
  955. if (packetdata.getSize() < 4)
  956. throw InvalidIncomingDataException
  957. ("packetdata.getSize() < 4 (SET_PEER_ID header size)");
  958. session_t peer_id_new = readU16(&packetdata[2]);
  959. LOG(dout_con << m_connection->getDesc() << "Got new peer id: " << peer_id_new
  960. << "... " << std::endl);
  961. if (m_connection->GetPeerID() != PEER_ID_INEXISTENT) {
  962. LOG(derr_con << m_connection->getDesc()
  963. << "WARNING: Not changing existing peer id." << std::endl);
  964. } else {
  965. LOG(dout_con << m_connection->getDesc() << "changing own peer id"
  966. << std::endl);
  967. m_connection->SetPeerID(peer_id_new);
  968. }
  969. throw ProcessedSilentlyException("Got a SET_PEER_ID");
  970. } else if (controltype == CONTROLTYPE_PING) {
  971. // Just ignore it, the incoming data already reset
  972. // the timeout counter
  973. LOG(dout_con << m_connection->getDesc() << "PING" << std::endl);
  974. throw ProcessedSilentlyException("Got a PING");
  975. } else if (controltype == CONTROLTYPE_DISCO) {
  976. // Just ignore it, the incoming data already reset
  977. // the timeout counter
  978. LOG(dout_con << m_connection->getDesc() << "DISCO: Removing peer "
  979. << peer->id << std::endl);
  980. if (!m_connection->deletePeer(peer->id, false)) {
  981. derr_con << m_connection->getDesc() << "DISCO: Peer not found" << std::endl;
  982. }
  983. throw ProcessedSilentlyException("Got a DISCO");
  984. } else {
  985. LOG(derr_con << m_connection->getDesc()
  986. << "INVALID controltype="
  987. << ((int) controltype & 0xff) << std::endl);
  988. throw InvalidIncomingDataException("Invalid control type");
  989. }
  990. }
  991. SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Original(Channel *channel,
  992. const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable)
  993. {
  994. if (packetdata.getSize() <= ORIGINAL_HEADER_SIZE)
  995. throw InvalidIncomingDataException
  996. ("packetdata.getSize() <= ORIGINAL_HEADER_SIZE");
  997. LOG(dout_con << m_connection->getDesc() << "RETURNING TYPE_ORIGINAL to user"
  998. << std::endl);
  999. // Get the inside packet out and return it
  1000. SharedBuffer<u8> payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE);
  1001. memcpy(*payload, &(packetdata[ORIGINAL_HEADER_SIZE]), payload.getSize());
  1002. return payload;
  1003. }
  1004. SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Split(Channel *channel,
  1005. const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable)
  1006. {
  1007. Address peer_address;
  1008. if (peer->getAddress(MTP_UDP, peer_address)) {
  1009. // We have to create a packet again for buffering
  1010. // This isn't actually too bad an idea.
  1011. BufferedPacketPtr packet = con::makePacket(peer_address,
  1012. packetdata,
  1013. m_connection->GetProtocolID(),
  1014. peer->id,
  1015. channelnum);
  1016. // Buffer the packet
  1017. SharedBuffer<u8> data = peer->addSplitPacket(channelnum, packet, reliable);
  1018. if (data.getSize() != 0) {
  1019. LOG(dout_con << m_connection->getDesc()
  1020. << "RETURNING TYPE_SPLIT: Constructed full data, "
  1021. << "size=" << data.getSize() << std::endl);
  1022. return data;
  1023. }
  1024. LOG(dout_con << m_connection->getDesc() << "BUFFERED TYPE_SPLIT" << std::endl);
  1025. throw ProcessedSilentlyException("Buffered a split packet chunk");
  1026. }
  1027. // We should never get here.
  1028. FATAL_ERROR("Invalid execution point");
  1029. }
  1030. SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Reliable(Channel *channel,
  1031. const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable)
  1032. {
  1033. assert(channel != NULL);
  1034. // Recursive reliable packets not allowed
  1035. if (reliable)
  1036. throw InvalidIncomingDataException("Found nested reliable packets");
  1037. if (packetdata.getSize() < RELIABLE_HEADER_SIZE)
  1038. throw InvalidIncomingDataException("packetdata.getSize() < RELIABLE_HEADER_SIZE");
  1039. const u16 seqnum = readU16(&packetdata[1]);
  1040. bool is_future_packet = false;
  1041. bool is_old_packet = false;
  1042. /* packet is within our receive window send ack */
  1043. if (seqnum_in_window(seqnum,
  1044. channel->readNextIncomingSeqNum(), MAX_RELIABLE_WINDOW_SIZE)) {
  1045. m_connection->sendAck(peer->id, channelnum, seqnum);
  1046. } else {
  1047. is_future_packet = seqnum_higher(seqnum, channel->readNextIncomingSeqNum());
  1048. is_old_packet = seqnum_higher(channel->readNextIncomingSeqNum(), seqnum);
  1049. /* packet is not within receive window, don't send ack. *
  1050. * if this was a valid packet it's gonna be retransmitted */
  1051. if (is_future_packet)
  1052. throw ProcessedSilentlyException(
  1053. "Received packet newer then expected, not sending ack");
  1054. /* seems like our ack was lost, send another one for an old packet */
  1055. if (is_old_packet) {
  1056. LOG(dout_con << m_connection->getDesc()
  1057. << "RE-SENDING ACK: peer_id: " << peer->id
  1058. << ", channel: " << (channelnum & 0xFF)
  1059. << ", seqnum: " << seqnum << std::endl;)
  1060. m_connection->sendAck(peer->id, channelnum, seqnum);
  1061. // we already have this packet so this one was on wire at least
  1062. // the current timeout
  1063. // we don't know how long this packet was on wire don't do silly guessing
  1064. // dynamic_cast<UDPPeer*>(&peer)->
  1065. // reportRTT(dynamic_cast<UDPPeer*>(&peer)->getResendTimeout());
  1066. throw ProcessedSilentlyException("Retransmitting ack for old packet");
  1067. }
  1068. }
  1069. if (seqnum != channel->readNextIncomingSeqNum()) {
  1070. Address peer_address;
  1071. // this is a reliable packet so we have a udp address for sure
  1072. peer->getAddress(MTP_MINETEST_RELIABLE_UDP, peer_address);
  1073. // This one comes later, buffer it.
  1074. // Actually we have to make a packet to buffer one.
  1075. // Well, we have all the ingredients, so just do it.
  1076. BufferedPacketPtr packet = con::makePacket(
  1077. peer_address,
  1078. packetdata,
  1079. m_connection->GetProtocolID(),
  1080. peer->id,
  1081. channelnum);
  1082. try {
  1083. channel->incoming_reliables.insert(packet, channel->readNextIncomingSeqNum());
  1084. LOG(dout_con << m_connection->getDesc()
  1085. << "BUFFERING, TYPE_RELIABLE peer_id: " << peer->id
  1086. << ", channel: " << (channelnum & 0xFF)
  1087. << ", seqnum: " << seqnum << std::endl;)
  1088. throw ProcessedQueued("Buffered future reliable packet");
  1089. } catch (AlreadyExistsException &e) {
  1090. } catch (IncomingDataCorruption &e) {
  1091. m_connection->putCommand(ConnectionCommand::disconnect_peer(peer->id));
  1092. LOG(derr_con << m_connection->getDesc()
  1093. << "INVALID, TYPE_RELIABLE peer_id: " << peer->id
  1094. << ", channel: " << (channelnum & 0xFF)
  1095. << ", seqnum: " << seqnum
  1096. << "DROPPING CLIENT!" << std::endl;)
  1097. }
  1098. }
  1099. /* we got a packet to process right now */
  1100. LOG(dout_con << m_connection->getDesc()
  1101. << "RECURSIVE, TYPE_RELIABLE peer_id: " << peer->id
  1102. << ", channel: " << (channelnum & 0xFF)
  1103. << ", seqnum: " << seqnum << std::endl;)
  1104. /* check for resend case */
  1105. u16 queued_seqnum = 0;
  1106. if (channel->incoming_reliables.getFirstSeqnum(queued_seqnum)) {
  1107. if (queued_seqnum == seqnum) {
  1108. BufferedPacketPtr queued_packet = channel->incoming_reliables.popFirst();
  1109. /** TODO find a way to verify the new against the old packet */
  1110. }
  1111. }
  1112. channel->incNextIncomingSeqNum();
  1113. // Get out the inside packet and re-process it
  1114. SharedBuffer<u8> payload(packetdata.getSize() - RELIABLE_HEADER_SIZE);
  1115. memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize());
  1116. return processPacket(channel, payload, peer->id, channelnum, true);
  1117. }
  1118. }