connectionthreads.cpp 42 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404
  1. /*
  2. Minetest
  3. Copyright (C) 2013-2017 celeron55, Perttu Ahola <celeron55@gmail.com>
  4. Copyright (C) 2017 celeron55, Loic Blot <loic.blot@unix-experience.fr>
  5. This program is free software; you can redistribute it and/or modify
  6. it under the terms of the GNU Lesser General Public License as published by
  7. the Free Software Foundation; either version 2.1 of the License, or
  8. (at your option) any later version.
  9. This program is distributed in the hope that it will be useful,
  10. but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. GNU Lesser General Public License for more details.
  13. You should have received a copy of the GNU Lesser General Public License along
  14. with this program; if not, write to the Free Software Foundation, Inc.,
  15. 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  16. */
  17. #include "connectionthreads.h"
  18. #include "log.h"
  19. #include "profiler.h"
  20. #include "settings.h"
  21. #include "network/networkpacket.h"
  22. #include "util/serialize.h"
  23. namespace con
  24. {
  25. /******************************************************************************/
  26. /* defines used for debugging and profiling */
  27. /******************************************************************************/
  28. #ifdef NDEBUG
  29. #define LOG(a) a
  30. #define PROFILE(a)
  31. #undef DEBUG_CONNECTION_KBPS
  32. #else
  33. /* this mutex is used to achieve log message consistency */
  34. std::mutex log_conthread_mutex;
  35. #define LOG(a) \
  36. { \
  37. MutexAutoLock loglock(log_conthread_mutex); \
  38. a; \
  39. }
  40. #define PROFILE(a) a
  41. //#define DEBUG_CONNECTION_KBPS
  42. #undef DEBUG_CONNECTION_KBPS
  43. #endif
  44. /* maximum number of retries for reliable packets */
  45. #define MAX_RELIABLE_RETRY 5
  46. #define WINDOW_SIZE 5
  47. static session_t readPeerId(u8 *packetdata)
  48. {
  49. return readU16(&packetdata[4]);
  50. }
  51. static u8 readChannel(u8 *packetdata)
  52. {
  53. return readU8(&packetdata[6]);
  54. }
  55. /******************************************************************************/
  56. /* Connection Threads */
  57. /******************************************************************************/
  58. ConnectionSendThread::ConnectionSendThread(unsigned int max_packet_size,
  59. float timeout) :
  60. Thread("ConnectionSend"),
  61. m_max_packet_size(max_packet_size),
  62. m_timeout(timeout),
  63. m_max_data_packets_per_iteration(g_settings->getU16("max_packets_per_iteration"))
  64. {
  65. }
  66. void *ConnectionSendThread::run()
  67. {
  68. assert(m_connection);
  69. LOG(dout_con << m_connection->getDesc()
  70. << "ConnectionSend thread started" << std::endl);
  71. u64 curtime = porting::getTimeMs();
  72. u64 lasttime = curtime;
  73. PROFILE(std::stringstream ThreadIdentifier);
  74. PROFILE(ThreadIdentifier << "ConnectionSend: [" << m_connection->getDesc() << "]");
  75. /* if stop is requested don't stop immediately but try to send all */
  76. /* packets first */
  77. while (!stopRequested() || packetsQueued()) {
  78. BEGIN_DEBUG_EXCEPTION_HANDLER
  79. PROFILE(ScopeProfiler sp(g_profiler, ThreadIdentifier.str(), SPT_AVG));
  80. m_iteration_packets_avaialble = m_max_data_packets_per_iteration;
  81. /* wait for trigger or timeout */
  82. m_send_sleep_semaphore.wait(50);
  83. /* remove all triggers */
  84. while (m_send_sleep_semaphore.wait(0)) {
  85. }
  86. lasttime = curtime;
  87. curtime = porting::getTimeMs();
  88. float dtime = CALC_DTIME(lasttime, curtime);
  89. /* first do all the reliable stuff */
  90. runTimeouts(dtime);
  91. /* translate commands to packets */
  92. ConnectionCommand c = m_connection->m_command_queue.pop_frontNoEx(0);
  93. while (c.type != CONNCMD_NONE) {
  94. if (c.reliable)
  95. processReliableCommand(c);
  96. else
  97. processNonReliableCommand(c);
  98. c = m_connection->m_command_queue.pop_frontNoEx(0);
  99. }
  100. /* send non reliable packets */
  101. sendPackets(dtime);
  102. END_DEBUG_EXCEPTION_HANDLER
  103. }
  104. PROFILE(g_profiler->remove(ThreadIdentifier.str()));
  105. return NULL;
  106. }
  107. void ConnectionSendThread::Trigger()
  108. {
  109. m_send_sleep_semaphore.post();
  110. }
  111. bool ConnectionSendThread::packetsQueued()
  112. {
  113. std::list<session_t> peerIds = m_connection->getPeerIDs();
  114. if (!m_outgoing_queue.empty() && !peerIds.empty())
  115. return true;
  116. for (session_t peerId : peerIds) {
  117. PeerHelper peer = m_connection->getPeerNoEx(peerId);
  118. if (!peer)
  119. continue;
  120. if (dynamic_cast<UDPPeer *>(&peer) == 0)
  121. continue;
  122. for (Channel &channel : (dynamic_cast<UDPPeer *>(&peer))->channels) {
  123. if (!channel.queued_commands.empty()) {
  124. return true;
  125. }
  126. }
  127. }
  128. return false;
  129. }
  130. void ConnectionSendThread::runTimeouts(float dtime)
  131. {
  132. std::list<session_t> timeouted_peers;
  133. std::list<session_t> peerIds = m_connection->getPeerIDs();
  134. for (session_t &peerId : peerIds) {
  135. PeerHelper peer = m_connection->getPeerNoEx(peerId);
  136. if (!peer)
  137. continue;
  138. UDPPeer *udpPeer = dynamic_cast<UDPPeer *>(&peer);
  139. if (!udpPeer)
  140. continue;
  141. PROFILE(std::stringstream peerIdentifier);
  142. PROFILE(peerIdentifier << "runTimeouts[" << m_connection->getDesc()
  143. << ";" << peerId << ";RELIABLE]");
  144. PROFILE(ScopeProfiler
  145. peerprofiler(g_profiler, peerIdentifier.str(), SPT_AVG));
  146. SharedBuffer<u8> data(2); // data for sending ping, required here because of goto
  147. /*
  148. Check peer timeout
  149. */
  150. if (peer->isTimedOut(m_timeout)) {
  151. infostream << m_connection->getDesc()
  152. << "RunTimeouts(): Peer " << peer->id
  153. << " has timed out."
  154. << " (source=peer->timeout_counter)"
  155. << std::endl;
  156. // Add peer to the list
  157. timeouted_peers.push_back(peer->id);
  158. // Don't bother going through the buffers of this one
  159. continue;
  160. }
  161. float resend_timeout = udpPeer->getResendTimeout();
  162. bool retry_count_exceeded = false;
  163. for (Channel &channel : udpPeer->channels) {
  164. std::list<BufferedPacket> timed_outs;
  165. if (udpPeer->getLegacyPeer())
  166. channel.setWindowSize(WINDOW_SIZE);
  167. // Remove timed out incomplete unreliable split packets
  168. channel.incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout);
  169. // Increment reliable packet times
  170. channel.outgoing_reliables_sent.incrementTimeouts(dtime);
  171. unsigned int numpeers = m_connection->m_peers.size();
  172. if (numpeers == 0)
  173. return;
  174. // Re-send timed out outgoing reliables
  175. timed_outs = channel.outgoing_reliables_sent.getTimedOuts(resend_timeout,
  176. (m_max_data_packets_per_iteration / numpeers));
  177. channel.UpdatePacketLossCounter(timed_outs.size());
  178. g_profiler->graphAdd("packets_lost", timed_outs.size());
  179. m_iteration_packets_avaialble -= timed_outs.size();
  180. for (std::list<BufferedPacket>::iterator k = timed_outs.begin();
  181. k != timed_outs.end(); ++k) {
  182. session_t peer_id = readPeerId(*(k->data));
  183. u8 channelnum = readChannel(*(k->data));
  184. u16 seqnum = readU16(&(k->data[BASE_HEADER_SIZE + 1]));
  185. channel.UpdateBytesLost(k->data.getSize());
  186. k->resend_count++;
  187. if (k->resend_count > MAX_RELIABLE_RETRY) {
  188. retry_count_exceeded = true;
  189. timeouted_peers.push_back(peer->id);
  190. /* no need to check additional packets if a single one did timeout*/
  191. break;
  192. }
  193. LOG(derr_con << m_connection->getDesc()
  194. << "RE-SENDING timed-out RELIABLE to "
  195. << k->address.serializeString()
  196. << "(t/o=" << resend_timeout << "): "
  197. << "from_peer_id=" << peer_id
  198. << ", channel=" << ((int) channelnum & 0xff)
  199. << ", seqnum=" << seqnum
  200. << std::endl);
  201. rawSend(*k);
  202. // do not handle rtt here as we can't decide if this packet was
  203. // lost or really takes more time to transmit
  204. }
  205. if (retry_count_exceeded) {
  206. break; /* no need to check other channels if we already did timeout */
  207. }
  208. channel.UpdateTimers(dtime, udpPeer->getLegacyPeer());
  209. }
  210. /* skip to next peer if we did timeout */
  211. if (retry_count_exceeded)
  212. continue;
  213. /* send ping if necessary */
  214. if (udpPeer->Ping(dtime, data)) {
  215. LOG(dout_con << m_connection->getDesc()
  216. << "Sending ping for peer_id: " << udpPeer->id << std::endl);
  217. /* this may fail if there ain't a sequence number left */
  218. if (!rawSendAsPacket(udpPeer->id, 0, data, true)) {
  219. //retrigger with reduced ping interval
  220. udpPeer->Ping(4.0, data);
  221. }
  222. }
  223. udpPeer->RunCommandQueues(m_max_packet_size,
  224. m_max_commands_per_iteration,
  225. m_max_packets_requeued);
  226. }
  227. // Remove timed out peers
  228. for (u16 timeouted_peer : timeouted_peers) {
  229. LOG(derr_con << m_connection->getDesc()
  230. << "RunTimeouts(): Removing peer " << timeouted_peer << std::endl);
  231. m_connection->deletePeer(timeouted_peer, true);
  232. }
  233. }
  234. void ConnectionSendThread::rawSend(const BufferedPacket &packet)
  235. {
  236. try {
  237. m_connection->m_udpSocket.Send(packet.address, *packet.data,
  238. packet.data.getSize());
  239. LOG(dout_con << m_connection->getDesc()
  240. << " rawSend: " << packet.data.getSize()
  241. << " bytes sent" << std::endl);
  242. } catch (SendFailedException &e) {
  243. LOG(derr_con << m_connection->getDesc()
  244. << "Connection::rawSend(): SendFailedException: "
  245. << packet.address.serializeString() << std::endl);
  246. }
  247. }
  248. void ConnectionSendThread::sendAsPacketReliable(BufferedPacket &p, Channel *channel)
  249. {
  250. try {
  251. p.absolute_send_time = porting::getTimeMs();
  252. // Buffer the packet
  253. channel->outgoing_reliables_sent.insert(p,
  254. (channel->readOutgoingSequenceNumber() - MAX_RELIABLE_WINDOW_SIZE)
  255. % (MAX_RELIABLE_WINDOW_SIZE + 1));
  256. }
  257. catch (AlreadyExistsException &e) {
  258. LOG(derr_con << m_connection->getDesc()
  259. << "WARNING: Going to send a reliable packet"
  260. << " in outgoing buffer" << std::endl);
  261. }
  262. // Send the packet
  263. rawSend(p);
  264. }
  265. bool ConnectionSendThread::rawSendAsPacket(session_t peer_id, u8 channelnum,
  266. SharedBuffer<u8> data, bool reliable)
  267. {
  268. PeerHelper peer = m_connection->getPeerNoEx(peer_id);
  269. if (!peer) {
  270. LOG(dout_con << m_connection->getDesc()
  271. << " INFO: dropped packet for non existent peer_id: "
  272. << peer_id << std::endl);
  273. FATAL_ERROR_IF(!reliable,
  274. "Trying to send raw packet reliable but no peer found!");
  275. return false;
  276. }
  277. Channel *channel = &(dynamic_cast<UDPPeer *>(&peer)->channels[channelnum]);
  278. if (reliable) {
  279. bool have_sequence_number_for_raw_packet = true;
  280. u16 seqnum =
  281. channel->getOutgoingSequenceNumber(have_sequence_number_for_raw_packet);
  282. if (!have_sequence_number_for_raw_packet)
  283. return false;
  284. SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum);
  285. Address peer_address;
  286. peer->getAddress(MTP_MINETEST_RELIABLE_UDP, peer_address);
  287. // Add base headers and make a packet
  288. BufferedPacket p = con::makePacket(peer_address, reliable,
  289. m_connection->GetProtocolID(), m_connection->GetPeerID(),
  290. channelnum);
  291. // first check if our send window is already maxed out
  292. if (channel->outgoing_reliables_sent.size()
  293. < channel->getWindowSize()) {
  294. LOG(dout_con << m_connection->getDesc()
  295. << " INFO: sending a reliable packet to peer_id " << peer_id
  296. << " channel: " << channelnum
  297. << " seqnum: " << seqnum << std::endl);
  298. sendAsPacketReliable(p, channel);
  299. return true;
  300. }
  301. LOG(dout_con << m_connection->getDesc()
  302. << " INFO: queueing reliable packet for peer_id: " << peer_id
  303. << " channel: " << channelnum
  304. << " seqnum: " << seqnum << std::endl);
  305. channel->queued_reliables.push(p);
  306. return false;
  307. }
  308. Address peer_address;
  309. if (peer->getAddress(MTP_UDP, peer_address)) {
  310. // Add base headers and make a packet
  311. BufferedPacket p = con::makePacket(peer_address, data,
  312. m_connection->GetProtocolID(), m_connection->GetPeerID(),
  313. channelnum);
  314. // Send the packet
  315. rawSend(p);
  316. return true;
  317. }
  318. LOG(dout_con << m_connection->getDesc()
  319. << " INFO: dropped unreliable packet for peer_id: " << peer_id
  320. << " because of (yet) missing udp address" << std::endl);
  321. return false;
  322. }
  323. void ConnectionSendThread::processReliableCommand(ConnectionCommand &c)
  324. {
  325. assert(c.reliable); // Pre-condition
  326. switch (c.type) {
  327. case CONNCMD_NONE:
  328. LOG(dout_con << m_connection->getDesc()
  329. << "UDP processing reliable CONNCMD_NONE" << std::endl);
  330. return;
  331. case CONNCMD_SEND:
  332. LOG(dout_con << m_connection->getDesc()
  333. << "UDP processing reliable CONNCMD_SEND" << std::endl);
  334. sendReliable(c);
  335. return;
  336. case CONNCMD_SEND_TO_ALL:
  337. LOG(dout_con << m_connection->getDesc()
  338. << "UDP processing CONNCMD_SEND_TO_ALL" << std::endl);
  339. sendToAllReliable(c);
  340. return;
  341. case CONCMD_CREATE_PEER:
  342. LOG(dout_con << m_connection->getDesc()
  343. << "UDP processing reliable CONCMD_CREATE_PEER" << std::endl);
  344. if (!rawSendAsPacket(c.peer_id, c.channelnum, c.data, c.reliable)) {
  345. /* put to queue if we couldn't send it immediately */
  346. sendReliable(c);
  347. }
  348. return;
  349. case CONCMD_DISABLE_LEGACY:
  350. LOG(dout_con << m_connection->getDesc()
  351. << "UDP processing reliable CONCMD_DISABLE_LEGACY" << std::endl);
  352. if (!rawSendAsPacket(c.peer_id, c.channelnum, c.data, c.reliable)) {
  353. /* put to queue if we couldn't send it immediately */
  354. sendReliable(c);
  355. }
  356. return;
  357. case CONNCMD_SERVE:
  358. case CONNCMD_CONNECT:
  359. case CONNCMD_DISCONNECT:
  360. case CONCMD_ACK:
  361. FATAL_ERROR("Got command that shouldn't be reliable as reliable command");
  362. default:
  363. LOG(dout_con << m_connection->getDesc()
  364. << " Invalid reliable command type: " << c.type << std::endl);
  365. }
  366. }
  367. void ConnectionSendThread::processNonReliableCommand(ConnectionCommand &c)
  368. {
  369. assert(!c.reliable); // Pre-condition
  370. switch (c.type) {
  371. case CONNCMD_NONE:
  372. LOG(dout_con << m_connection->getDesc()
  373. << " UDP processing CONNCMD_NONE" << std::endl);
  374. return;
  375. case CONNCMD_SERVE:
  376. LOG(dout_con << m_connection->getDesc()
  377. << " UDP processing CONNCMD_SERVE port="
  378. << c.address.serializeString() << std::endl);
  379. serve(c.address);
  380. return;
  381. case CONNCMD_CONNECT:
  382. LOG(dout_con << m_connection->getDesc()
  383. << " UDP processing CONNCMD_CONNECT" << std::endl);
  384. connect(c.address);
  385. return;
  386. case CONNCMD_DISCONNECT:
  387. LOG(dout_con << m_connection->getDesc()
  388. << " UDP processing CONNCMD_DISCONNECT" << std::endl);
  389. disconnect();
  390. return;
  391. case CONNCMD_DISCONNECT_PEER:
  392. LOG(dout_con << m_connection->getDesc()
  393. << " UDP processing CONNCMD_DISCONNECT_PEER" << std::endl);
  394. disconnect_peer(c.peer_id);
  395. return;
  396. case CONNCMD_SEND:
  397. LOG(dout_con << m_connection->getDesc()
  398. << " UDP processing CONNCMD_SEND" << std::endl);
  399. send(c.peer_id, c.channelnum, c.data);
  400. return;
  401. case CONNCMD_SEND_TO_ALL:
  402. LOG(dout_con << m_connection->getDesc()
  403. << " UDP processing CONNCMD_SEND_TO_ALL" << std::endl);
  404. sendToAll(c.channelnum, c.data);
  405. return;
  406. case CONCMD_ACK:
  407. LOG(dout_con << m_connection->getDesc()
  408. << " UDP processing CONCMD_ACK" << std::endl);
  409. sendAsPacket(c.peer_id, c.channelnum, c.data, true);
  410. return;
  411. case CONCMD_CREATE_PEER:
  412. FATAL_ERROR("Got command that should be reliable as unreliable command");
  413. default:
  414. LOG(dout_con << m_connection->getDesc()
  415. << " Invalid command type: " << c.type << std::endl);
  416. }
  417. }
  418. void ConnectionSendThread::serve(Address bind_address)
  419. {
  420. LOG(dout_con << m_connection->getDesc()
  421. << "UDP serving at port " << bind_address.serializeString() << std::endl);
  422. try {
  423. m_connection->m_udpSocket.Bind(bind_address);
  424. m_connection->SetPeerID(PEER_ID_SERVER);
  425. }
  426. catch (SocketException &e) {
  427. // Create event
  428. ConnectionEvent ce;
  429. ce.bindFailed();
  430. m_connection->putEvent(ce);
  431. }
  432. }
  433. void ConnectionSendThread::connect(Address address)
  434. {
  435. LOG(dout_con << m_connection->getDesc() << " connecting to "
  436. << address.serializeString()
  437. << ":" << address.getPort() << std::endl);
  438. UDPPeer *peer = m_connection->createServerPeer(address);
  439. // Create event
  440. ConnectionEvent e;
  441. e.peerAdded(peer->id, peer->address);
  442. m_connection->putEvent(e);
  443. Address bind_addr;
  444. if (address.isIPv6())
  445. bind_addr.setAddress((IPv6AddressBytes *) NULL);
  446. else
  447. bind_addr.setAddress(0, 0, 0, 0);
  448. m_connection->m_udpSocket.Bind(bind_addr);
  449. // Send a dummy packet to server with peer_id = PEER_ID_INEXISTENT
  450. m_connection->SetPeerID(PEER_ID_INEXISTENT);
  451. NetworkPacket pkt(0, 0);
  452. m_connection->Send(PEER_ID_SERVER, 0, &pkt, true);
  453. }
  454. void ConnectionSendThread::disconnect()
  455. {
  456. LOG(dout_con << m_connection->getDesc() << " disconnecting" << std::endl);
  457. // Create and send DISCO packet
  458. SharedBuffer<u8> data(2);
  459. writeU8(&data[0], PACKET_TYPE_CONTROL);
  460. writeU8(&data[1], CONTROLTYPE_DISCO);
  461. // Send to all
  462. std::list<session_t> peerids = m_connection->getPeerIDs();
  463. for (session_t peerid : peerids) {
  464. sendAsPacket(peerid, 0, data, false);
  465. }
  466. }
  467. void ConnectionSendThread::disconnect_peer(session_t peer_id)
  468. {
  469. LOG(dout_con << m_connection->getDesc() << " disconnecting peer" << std::endl);
  470. // Create and send DISCO packet
  471. SharedBuffer<u8> data(2);
  472. writeU8(&data[0], PACKET_TYPE_CONTROL);
  473. writeU8(&data[1], CONTROLTYPE_DISCO);
  474. sendAsPacket(peer_id, 0, data, false);
  475. PeerHelper peer = m_connection->getPeerNoEx(peer_id);
  476. if (!peer)
  477. return;
  478. if (dynamic_cast<UDPPeer *>(&peer) == 0) {
  479. return;
  480. }
  481. dynamic_cast<UDPPeer *>(&peer)->m_pending_disconnect = true;
  482. }
  483. void ConnectionSendThread::send(session_t peer_id, u8 channelnum,
  484. SharedBuffer<u8> data)
  485. {
  486. assert(channelnum < CHANNEL_COUNT); // Pre-condition
  487. PeerHelper peer = m_connection->getPeerNoEx(peer_id);
  488. if (!peer) {
  489. LOG(dout_con << m_connection->getDesc() << " peer: peer_id=" << peer_id
  490. << ">>>NOT<<< found on sending packet"
  491. << ", channel " << (channelnum % 0xFF)
  492. << ", size: " << data.getSize() << std::endl);
  493. return;
  494. }
  495. LOG(dout_con << m_connection->getDesc() << " sending to peer_id=" << peer_id
  496. << ", channel " << (channelnum % 0xFF)
  497. << ", size: " << data.getSize() << std::endl);
  498. u16 split_sequence_number = peer->getNextSplitSequenceNumber(channelnum);
  499. u32 chunksize_max = m_max_packet_size - BASE_HEADER_SIZE;
  500. std::list<SharedBuffer<u8>> originals;
  501. makeAutoSplitPacket(data, chunksize_max, split_sequence_number, &originals);
  502. peer->setNextSplitSequenceNumber(channelnum, split_sequence_number);
  503. for (const SharedBuffer<u8> &original : originals) {
  504. sendAsPacket(peer_id, channelnum, original);
  505. }
  506. }
  507. void ConnectionSendThread::sendReliable(ConnectionCommand &c)
  508. {
  509. PeerHelper peer = m_connection->getPeerNoEx(c.peer_id);
  510. if (!peer)
  511. return;
  512. peer->PutReliableSendCommand(c, m_max_packet_size);
  513. }
  514. void ConnectionSendThread::sendToAll(u8 channelnum, SharedBuffer<u8> data)
  515. {
  516. std::list<session_t> peerids = m_connection->getPeerIDs();
  517. for (session_t peerid : peerids) {
  518. send(peerid, channelnum, data);
  519. }
  520. }
  521. void ConnectionSendThread::sendToAllReliable(ConnectionCommand &c)
  522. {
  523. std::list<session_t> peerids = m_connection->getPeerIDs();
  524. for (session_t peerid : peerids) {
  525. PeerHelper peer = m_connection->getPeerNoEx(peerid);
  526. if (!peer)
  527. continue;
  528. peer->PutReliableSendCommand(c, m_max_packet_size);
  529. }
  530. }
  531. void ConnectionSendThread::sendPackets(float dtime)
  532. {
  533. std::list<session_t> peerIds = m_connection->getPeerIDs();
  534. std::list<session_t> pendingDisconnect;
  535. std::map<session_t, bool> pending_unreliable;
  536. for (session_t peerId : peerIds) {
  537. PeerHelper peer = m_connection->getPeerNoEx(peerId);
  538. //peer may have been removed
  539. if (!peer) {
  540. LOG(dout_con << m_connection->getDesc() << " Peer not found: peer_id="
  541. << peerId
  542. << std::endl);
  543. continue;
  544. }
  545. peer->m_increment_packets_remaining =
  546. m_iteration_packets_avaialble / m_connection->m_peers.size();
  547. UDPPeer *udpPeer = dynamic_cast<UDPPeer *>(&peer);
  548. if (!udpPeer) {
  549. continue;
  550. }
  551. if (udpPeer->m_pending_disconnect) {
  552. pendingDisconnect.push_back(peerId);
  553. }
  554. PROFILE(std::stringstream
  555. peerIdentifier);
  556. PROFILE(
  557. peerIdentifier << "sendPackets[" << m_connection->getDesc() << ";" << peerId
  558. << ";RELIABLE]");
  559. PROFILE(ScopeProfiler
  560. peerprofiler(g_profiler, peerIdentifier.str(), SPT_AVG));
  561. LOG(dout_con << m_connection->getDesc()
  562. << " Handle per peer queues: peer_id=" << peerId
  563. << " packet quota: " << peer->m_increment_packets_remaining << std::endl);
  564. // first send queued reliable packets for all peers (if possible)
  565. for (unsigned int i = 0; i < CHANNEL_COUNT; i++) {
  566. Channel &channel = udpPeer->channels[i];
  567. u16 next_to_ack = 0;
  568. channel.outgoing_reliables_sent.getFirstSeqnum(next_to_ack);
  569. u16 next_to_receive = 0;
  570. channel.incoming_reliables.getFirstSeqnum(next_to_receive);
  571. LOG(dout_con << m_connection->getDesc() << "\t channel: "
  572. << i << ", peer quota:"
  573. << peer->m_increment_packets_remaining
  574. << std::endl
  575. << "\t\t\treliables on wire: "
  576. << channel.outgoing_reliables_sent.size()
  577. << ", waiting for ack for " << next_to_ack
  578. << std::endl
  579. << "\t\t\tincoming_reliables: "
  580. << channel.incoming_reliables.size()
  581. << ", next reliable packet: "
  582. << channel.readNextIncomingSeqNum()
  583. << ", next queued: " << next_to_receive
  584. << std::endl
  585. << "\t\t\treliables queued : "
  586. << channel.queued_reliables.size()
  587. << std::endl
  588. << "\t\t\tqueued commands : "
  589. << channel.queued_commands.size()
  590. << std::endl);
  591. while (!channel.queued_reliables.empty() &&
  592. channel.outgoing_reliables_sent.size()
  593. < channel.getWindowSize() &&
  594. peer->m_increment_packets_remaining > 0) {
  595. BufferedPacket p = channel.queued_reliables.front();
  596. channel.queued_reliables.pop();
  597. LOG(dout_con << m_connection->getDesc()
  598. << " INFO: sending a queued reliable packet "
  599. << " channel: " << i
  600. << ", seqnum: " << readU16(&p.data[BASE_HEADER_SIZE + 1])
  601. << std::endl);
  602. sendAsPacketReliable(p, &channel);
  603. peer->m_increment_packets_remaining--;
  604. }
  605. }
  606. }
  607. if (!m_outgoing_queue.empty()) {
  608. LOG(dout_con << m_connection->getDesc()
  609. << " Handle non reliable queue ("
  610. << m_outgoing_queue.size() << " pkts)" << std::endl);
  611. }
  612. unsigned int initial_queuesize = m_outgoing_queue.size();
  613. /* send non reliable packets*/
  614. for (unsigned int i = 0; i < initial_queuesize; i++) {
  615. OutgoingPacket packet = m_outgoing_queue.front();
  616. m_outgoing_queue.pop();
  617. if (packet.reliable)
  618. continue;
  619. PeerHelper peer = m_connection->getPeerNoEx(packet.peer_id);
  620. if (!peer) {
  621. LOG(dout_con << m_connection->getDesc()
  622. << " Outgoing queue: peer_id=" << packet.peer_id
  623. << ">>>NOT<<< found on sending packet"
  624. << ", channel " << (packet.channelnum % 0xFF)
  625. << ", size: " << packet.data.getSize() << std::endl);
  626. continue;
  627. }
  628. /* send acks immediately */
  629. if (packet.ack) {
  630. rawSendAsPacket(packet.peer_id, packet.channelnum,
  631. packet.data, packet.reliable);
  632. peer->m_increment_packets_remaining =
  633. MYMIN(0, peer->m_increment_packets_remaining--);
  634. } else if (
  635. (peer->m_increment_packets_remaining > 0) ||
  636. (stopRequested())) {
  637. rawSendAsPacket(packet.peer_id, packet.channelnum,
  638. packet.data, packet.reliable);
  639. peer->m_increment_packets_remaining--;
  640. } else {
  641. m_outgoing_queue.push(packet);
  642. pending_unreliable[packet.peer_id] = true;
  643. }
  644. }
  645. for (session_t peerId : pendingDisconnect) {
  646. if (!pending_unreliable[peerId]) {
  647. m_connection->deletePeer(peerId, false);
  648. }
  649. }
  650. }
  651. void ConnectionSendThread::sendAsPacket(session_t peer_id, u8 channelnum,
  652. SharedBuffer<u8> data, bool ack)
  653. {
  654. OutgoingPacket packet(peer_id, channelnum, data, false, ack);
  655. m_outgoing_queue.push(packet);
  656. }
  657. ConnectionReceiveThread::ConnectionReceiveThread(unsigned int max_packet_size) :
  658. Thread("ConnectionReceive")
  659. {
  660. }
  661. void *ConnectionReceiveThread::run()
  662. {
  663. assert(m_connection);
  664. LOG(dout_con << m_connection->getDesc()
  665. << "ConnectionReceive thread started" << std::endl);
  666. PROFILE(std::stringstream
  667. ThreadIdentifier);
  668. PROFILE(ThreadIdentifier << "ConnectionReceive: [" << m_connection->getDesc() << "]");
  669. #ifdef DEBUG_CONNECTION_KBPS
  670. u64 curtime = porting::getTimeMs();
  671. u64 lasttime = curtime;
  672. float debug_print_timer = 0.0;
  673. #endif
  674. while (!stopRequested()) {
  675. BEGIN_DEBUG_EXCEPTION_HANDLER
  676. PROFILE(ScopeProfiler
  677. sp(g_profiler, ThreadIdentifier.str(), SPT_AVG));
  678. #ifdef DEBUG_CONNECTION_KBPS
  679. lasttime = curtime;
  680. curtime = porting::getTimeMs();
  681. float dtime = CALC_DTIME(lasttime,curtime);
  682. #endif
  683. /* receive packets */
  684. receive();
  685. #ifdef DEBUG_CONNECTION_KBPS
  686. debug_print_timer += dtime;
  687. if (debug_print_timer > 20.0) {
  688. debug_print_timer -= 20.0;
  689. std::list<session_t> peerids = m_connection->getPeerIDs();
  690. for (std::list<session_t>::iterator i = peerids.begin();
  691. i != peerids.end();
  692. i++)
  693. {
  694. PeerHelper peer = m_connection->getPeerNoEx(*i);
  695. if (!peer)
  696. continue;
  697. float peer_current = 0.0;
  698. float peer_loss = 0.0;
  699. float avg_rate = 0.0;
  700. float avg_loss = 0.0;
  701. for(u16 j=0; j<CHANNEL_COUNT; j++)
  702. {
  703. peer_current +=peer->channels[j].getCurrentDownloadRateKB();
  704. peer_loss += peer->channels[j].getCurrentLossRateKB();
  705. avg_rate += peer->channels[j].getAvgDownloadRateKB();
  706. avg_loss += peer->channels[j].getAvgLossRateKB();
  707. }
  708. std::stringstream output;
  709. output << std::fixed << std::setprecision(1);
  710. output << "OUT to Peer " << *i << " RATES (good / loss) " << std::endl;
  711. output << "\tcurrent (sum): " << peer_current << "kb/s "<< peer_loss << "kb/s" << std::endl;
  712. output << "\taverage (sum): " << avg_rate << "kb/s "<< avg_loss << "kb/s" << std::endl;
  713. output << std::setfill(' ');
  714. for(u16 j=0; j<CHANNEL_COUNT; j++)
  715. {
  716. output << "\tcha " << j << ":"
  717. << " CUR: " << std::setw(6) << peer->channels[j].getCurrentDownloadRateKB() <<"kb/s"
  718. << " AVG: " << std::setw(6) << peer->channels[j].getAvgDownloadRateKB() <<"kb/s"
  719. << " MAX: " << std::setw(6) << peer->channels[j].getMaxDownloadRateKB() <<"kb/s"
  720. << " /"
  721. << " CUR: " << std::setw(6) << peer->channels[j].getCurrentLossRateKB() <<"kb/s"
  722. << " AVG: " << std::setw(6) << peer->channels[j].getAvgLossRateKB() <<"kb/s"
  723. << " MAX: " << std::setw(6) << peer->channels[j].getMaxLossRateKB() <<"kb/s"
  724. << " / WS: " << peer->channels[j].getWindowSize()
  725. << std::endl;
  726. }
  727. fprintf(stderr,"%s\n",output.str().c_str());
  728. }
  729. }
  730. #endif
  731. END_DEBUG_EXCEPTION_HANDLER
  732. }
  733. PROFILE(g_profiler->remove(ThreadIdentifier.str()));
  734. return NULL;
  735. }
  736. // Receive packets from the network and buffers and create ConnectionEvents
  737. void ConnectionReceiveThread::receive()
  738. {
  739. // use IPv6 minimum allowed MTU as receive buffer size as this is
  740. // theoretical reliable upper boundary of a udp packet for all IPv6 enabled
  741. // infrastructure
  742. unsigned int packet_maxsize = 1500;
  743. SharedBuffer<u8> packetdata(packet_maxsize);
  744. bool packet_queued = true;
  745. unsigned int loop_count = 0;
  746. /* first of all read packets from socket */
  747. /* check for incoming data available */
  748. while ((loop_count < 10) &&
  749. (m_connection->m_udpSocket.WaitData(50))) {
  750. loop_count++;
  751. try {
  752. if (packet_queued) {
  753. bool data_left = true;
  754. session_t peer_id;
  755. SharedBuffer<u8> resultdata;
  756. while (data_left) {
  757. try {
  758. data_left = getFromBuffers(peer_id, resultdata);
  759. if (data_left) {
  760. ConnectionEvent e;
  761. e.dataReceived(peer_id, resultdata);
  762. m_connection->putEvent(e);
  763. }
  764. }
  765. catch (ProcessedSilentlyException &e) {
  766. /* try reading again */
  767. }
  768. }
  769. packet_queued = false;
  770. }
  771. Address sender;
  772. s32 received_size = m_connection->m_udpSocket.Receive(sender, *packetdata,
  773. packet_maxsize);
  774. if ((received_size < BASE_HEADER_SIZE) ||
  775. (readU32(&packetdata[0]) != m_connection->GetProtocolID())) {
  776. LOG(derr_con << m_connection->getDesc()
  777. << "Receive(): Invalid incoming packet, "
  778. << "size: " << received_size
  779. << ", protocol: "
  780. << ((received_size >= 4) ? readU32(&packetdata[0]) : -1)
  781. << std::endl);
  782. continue;
  783. }
  784. session_t peer_id = readPeerId(*packetdata);
  785. u8 channelnum = readChannel(*packetdata);
  786. if (channelnum > CHANNEL_COUNT - 1) {
  787. LOG(derr_con << m_connection->getDesc()
  788. << "Receive(): Invalid channel " << channelnum << std::endl);
  789. throw InvalidIncomingDataException("Channel doesn't exist");
  790. }
  791. /* Try to identify peer by sender address (may happen on join) */
  792. if (peer_id == PEER_ID_INEXISTENT) {
  793. peer_id = m_connection->lookupPeer(sender);
  794. // We do not have to remind the peer of its
  795. // peer id as the CONTROLTYPE_SET_PEER_ID
  796. // command was sent reliably.
  797. }
  798. /* The peer was not found in our lists. Add it. */
  799. if (peer_id == PEER_ID_INEXISTENT) {
  800. peer_id = m_connection->createPeer(sender, MTP_MINETEST_RELIABLE_UDP, 0);
  801. }
  802. PeerHelper peer = m_connection->getPeerNoEx(peer_id);
  803. if (!peer) {
  804. LOG(dout_con << m_connection->getDesc()
  805. << " got packet from unknown peer_id: "
  806. << peer_id << " Ignoring." << std::endl);
  807. continue;
  808. }
  809. // Validate peer address
  810. Address peer_address;
  811. if (peer->getAddress(MTP_UDP, peer_address)) {
  812. if (peer_address != sender) {
  813. LOG(derr_con << m_connection->getDesc()
  814. << m_connection->getDesc()
  815. << " Peer " << peer_id << " sending from different address."
  816. " Ignoring." << std::endl);
  817. continue;
  818. }
  819. } else {
  820. bool invalid_address = true;
  821. if (invalid_address) {
  822. LOG(derr_con << m_connection->getDesc()
  823. << m_connection->getDesc()
  824. << " Peer " << peer_id << " unknown."
  825. " Ignoring." << std::endl);
  826. continue;
  827. }
  828. }
  829. peer->ResetTimeout();
  830. Channel *channel = 0;
  831. if (dynamic_cast<UDPPeer *>(&peer) != 0) {
  832. channel = &(dynamic_cast<UDPPeer *>(&peer)->channels[channelnum]);
  833. }
  834. if (channel != 0) {
  835. channel->UpdateBytesReceived(received_size);
  836. }
  837. // Throw the received packet to channel->processPacket()
  838. // Make a new SharedBuffer from the data without the base headers
  839. SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
  840. memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
  841. strippeddata.getSize());
  842. try {
  843. // Process it (the result is some data with no headers made by us)
  844. SharedBuffer<u8> resultdata = processPacket
  845. (channel, strippeddata, peer_id, channelnum, false);
  846. LOG(dout_con << m_connection->getDesc()
  847. << " ProcessPacket from peer_id: " << peer_id
  848. << ",channel: " << (channelnum & 0xFF) << ", returned "
  849. << resultdata.getSize() << " bytes" << std::endl);
  850. ConnectionEvent e;
  851. e.dataReceived(peer_id, resultdata);
  852. m_connection->putEvent(e);
  853. }
  854. catch (ProcessedSilentlyException &e) {
  855. }
  856. catch (ProcessedQueued &e) {
  857. packet_queued = true;
  858. }
  859. }
  860. catch (InvalidIncomingDataException &e) {
  861. }
  862. catch (ProcessedSilentlyException &e) {
  863. }
  864. }
  865. }
  866. bool ConnectionReceiveThread::getFromBuffers(session_t &peer_id, SharedBuffer<u8> &dst)
  867. {
  868. std::list<session_t> peerids = m_connection->getPeerIDs();
  869. for (session_t peerid : peerids) {
  870. PeerHelper peer = m_connection->getPeerNoEx(peerid);
  871. if (!peer)
  872. continue;
  873. if (dynamic_cast<UDPPeer *>(&peer) == 0)
  874. continue;
  875. for (Channel &channel : (dynamic_cast<UDPPeer *>(&peer))->channels) {
  876. if (checkIncomingBuffers(&channel, peer_id, dst)) {
  877. return true;
  878. }
  879. }
  880. }
  881. return false;
  882. }
  883. bool ConnectionReceiveThread::checkIncomingBuffers(Channel *channel,
  884. session_t &peer_id, SharedBuffer<u8> &dst)
  885. {
  886. u16 firstseqnum = 0;
  887. if (channel->incoming_reliables.getFirstSeqnum(firstseqnum)) {
  888. if (firstseqnum == channel->readNextIncomingSeqNum()) {
  889. BufferedPacket p = channel->incoming_reliables.popFirst();
  890. peer_id = readPeerId(*p.data);
  891. u8 channelnum = readChannel(*p.data);
  892. u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE + 1]);
  893. LOG(dout_con << m_connection->getDesc()
  894. << "UNBUFFERING TYPE_RELIABLE"
  895. << " seqnum=" << seqnum
  896. << " peer_id=" << peer_id
  897. << " channel=" << ((int) channelnum & 0xff)
  898. << std::endl);
  899. channel->incNextIncomingSeqNum();
  900. u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
  901. // Get out the inside packet and re-process it
  902. SharedBuffer<u8> payload(p.data.getSize() - headers_size);
  903. memcpy(*payload, &p.data[headers_size], payload.getSize());
  904. dst = processPacket(channel, payload, peer_id, channelnum, true);
  905. return true;
  906. }
  907. }
  908. return false;
  909. }
  910. SharedBuffer<u8> ConnectionReceiveThread::processPacket(Channel *channel,
  911. SharedBuffer<u8> packetdata, session_t peer_id, u8 channelnum, bool reliable)
  912. {
  913. PeerHelper peer = m_connection->getPeerNoEx(peer_id);
  914. if (!peer) {
  915. errorstream << "Peer not found (possible timeout)" << std::endl;
  916. throw ProcessedSilentlyException("Peer not found (possible timeout)");
  917. }
  918. if (packetdata.getSize() < 1)
  919. throw InvalidIncomingDataException("packetdata.getSize() < 1");
  920. u8 type = readU8(&(packetdata[0]));
  921. if (MAX_UDP_PEERS <= 65535 && peer_id >= MAX_UDP_PEERS) {
  922. std::string errmsg = "Invalid peer_id=" + itos(peer_id);
  923. errorstream << errmsg << std::endl;
  924. throw InvalidIncomingDataException(errmsg.c_str());
  925. }
  926. if (type >= PACKET_TYPE_MAX) {
  927. derr_con << m_connection->getDesc() << "Got invalid type=" << ((int) type & 0xff)
  928. << std::endl;
  929. throw InvalidIncomingDataException("Invalid packet type");
  930. }
  931. const PacketTypeHandler &pHandle = packetTypeRouter[type];
  932. return (this->*pHandle.handler)(channel, packetdata, &peer, channelnum, reliable);
  933. }
  934. const ConnectionReceiveThread::PacketTypeHandler
  935. ConnectionReceiveThread::packetTypeRouter[PACKET_TYPE_MAX] = {
  936. {&ConnectionReceiveThread::handlePacketType_Control},
  937. {&ConnectionReceiveThread::handlePacketType_Original},
  938. {&ConnectionReceiveThread::handlePacketType_Split},
  939. {&ConnectionReceiveThread::handlePacketType_Reliable},
  940. };
  941. SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *channel,
  942. SharedBuffer<u8> packetdata, Peer *peer, u8 channelnum, bool reliable)
  943. {
  944. if (packetdata.getSize() < 2)
  945. throw InvalidIncomingDataException("packetdata.getSize() < 2");
  946. u8 controltype = readU8(&(packetdata[1]));
  947. if (controltype == CONTROLTYPE_ACK) {
  948. assert(channel != NULL);
  949. if (packetdata.getSize() < 4) {
  950. throw InvalidIncomingDataException(
  951. "packetdata.getSize() < 4 (ACK header size)");
  952. }
  953. u16 seqnum = readU16(&packetdata[2]);
  954. LOG(dout_con << m_connection->getDesc() << " [ CONTROLTYPE_ACK: channelnum="
  955. << ((int) channelnum & 0xff) << ", peer_id=" << peer->id << ", seqnum="
  956. << seqnum << " ]" << std::endl);
  957. try {
  958. BufferedPacket p = channel->outgoing_reliables_sent.popSeqnum(seqnum);
  959. // only calculate rtt from straight sent packets
  960. if (p.resend_count == 0) {
  961. // Get round trip time
  962. u64 current_time = porting::getTimeMs();
  963. // a overflow is quite unlikely but as it'd result in major
  964. // rtt miscalculation we handle it here
  965. if (current_time > p.absolute_send_time) {
  966. float rtt = (current_time - p.absolute_send_time) / 1000.0;
  967. // Let peer calculate stuff according to it
  968. // (avg_rtt and resend_timeout)
  969. dynamic_cast<UDPPeer *>(peer)->reportRTT(rtt);
  970. } else if (p.totaltime > 0) {
  971. float rtt = p.totaltime;
  972. // Let peer calculate stuff according to it
  973. // (avg_rtt and resend_timeout)
  974. dynamic_cast<UDPPeer *>(peer)->reportRTT(rtt);
  975. }
  976. }
  977. // put bytes for max bandwidth calculation
  978. channel->UpdateBytesSent(p.data.getSize(), 1);
  979. if (channel->outgoing_reliables_sent.size() == 0)
  980. m_connection->TriggerSend();
  981. } catch (NotFoundException &e) {
  982. LOG(derr_con << m_connection->getDesc()
  983. << "WARNING: ACKed packet not in outgoing queue" << std::endl);
  984. channel->UpdatePacketTooLateCounter();
  985. }
  986. throw ProcessedSilentlyException("Got an ACK");
  987. } else if (controltype == CONTROLTYPE_SET_PEER_ID) {
  988. // Got a packet to set our peer id
  989. if (packetdata.getSize() < 4)
  990. throw InvalidIncomingDataException
  991. ("packetdata.getSize() < 4 (SET_PEER_ID header size)");
  992. session_t peer_id_new = readU16(&packetdata[2]);
  993. LOG(dout_con << m_connection->getDesc() << "Got new peer id: " << peer_id_new
  994. << "... " << std::endl);
  995. if (m_connection->GetPeerID() != PEER_ID_INEXISTENT) {
  996. LOG(derr_con << m_connection->getDesc()
  997. << "WARNING: Not changing existing peer id." << std::endl);
  998. } else {
  999. LOG(dout_con << m_connection->getDesc() << "changing own peer id"
  1000. << std::endl);
  1001. m_connection->SetPeerID(peer_id_new);
  1002. }
  1003. ConnectionCommand cmd;
  1004. SharedBuffer<u8> reply(2);
  1005. writeU8(&reply[0], PACKET_TYPE_CONTROL);
  1006. writeU8(&reply[1], CONTROLTYPE_ENABLE_BIG_SEND_WINDOW);
  1007. cmd.disableLegacy(PEER_ID_SERVER, reply);
  1008. m_connection->putCommand(cmd);
  1009. throw ProcessedSilentlyException("Got a SET_PEER_ID");
  1010. } else if (controltype == CONTROLTYPE_PING) {
  1011. // Just ignore it, the incoming data already reset
  1012. // the timeout counter
  1013. LOG(dout_con << m_connection->getDesc() << "PING" << std::endl);
  1014. throw ProcessedSilentlyException("Got a PING");
  1015. } else if (controltype == CONTROLTYPE_DISCO) {
  1016. // Just ignore it, the incoming data already reset
  1017. // the timeout counter
  1018. LOG(dout_con << m_connection->getDesc() << "DISCO: Removing peer "
  1019. << peer->id << std::endl);
  1020. if (!m_connection->deletePeer(peer->id, false)) {
  1021. derr_con << m_connection->getDesc() << "DISCO: Peer not found" << std::endl;
  1022. }
  1023. throw ProcessedSilentlyException("Got a DISCO");
  1024. } else if (controltype == CONTROLTYPE_ENABLE_BIG_SEND_WINDOW) {
  1025. dynamic_cast<UDPPeer *>(peer)->setNonLegacyPeer();
  1026. throw ProcessedSilentlyException("Got non legacy control");
  1027. } else {
  1028. LOG(derr_con << m_connection->getDesc()
  1029. << "INVALID TYPE_CONTROL: invalid controltype="
  1030. << ((int) controltype & 0xff) << std::endl);
  1031. throw InvalidIncomingDataException("Invalid control type");
  1032. }
  1033. }
  1034. SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Original(Channel *channel,
  1035. SharedBuffer<u8> packetdata, Peer *peer, u8 channelnum, bool reliable)
  1036. {
  1037. if (packetdata.getSize() <= ORIGINAL_HEADER_SIZE)
  1038. throw InvalidIncomingDataException
  1039. ("packetdata.getSize() <= ORIGINAL_HEADER_SIZE");
  1040. LOG(dout_con << m_connection->getDesc() << "RETURNING TYPE_ORIGINAL to user"
  1041. << std::endl);
  1042. // Get the inside packet out and return it
  1043. SharedBuffer<u8> payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE);
  1044. memcpy(*payload, &(packetdata[ORIGINAL_HEADER_SIZE]), payload.getSize());
  1045. return payload;
  1046. }
  1047. SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Split(Channel *channel,
  1048. SharedBuffer<u8> packetdata, Peer *peer, u8 channelnum, bool reliable)
  1049. {
  1050. Address peer_address;
  1051. if (peer->getAddress(MTP_UDP, peer_address)) {
  1052. // We have to create a packet again for buffering
  1053. // This isn't actually too bad an idea.
  1054. BufferedPacket packet = makePacket(peer_address,
  1055. packetdata,
  1056. m_connection->GetProtocolID(),
  1057. peer->id,
  1058. channelnum);
  1059. // Buffer the packet
  1060. SharedBuffer<u8> data = peer->addSplitPacket(channelnum, packet, reliable);
  1061. if (data.getSize() != 0) {
  1062. LOG(dout_con << m_connection->getDesc()
  1063. << "RETURNING TYPE_SPLIT: Constructed full data, "
  1064. << "size=" << data.getSize() << std::endl);
  1065. return data;
  1066. }
  1067. LOG(dout_con << m_connection->getDesc() << "BUFFERED TYPE_SPLIT" << std::endl);
  1068. throw ProcessedSilentlyException("Buffered a split packet chunk");
  1069. }
  1070. // We should never get here.
  1071. FATAL_ERROR("Invalid execution point");
  1072. }
  1073. SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Reliable(Channel *channel,
  1074. SharedBuffer<u8> packetdata, Peer *peer, u8 channelnum, bool reliable)
  1075. {
  1076. assert(channel != NULL);
  1077. // Recursive reliable packets not allowed
  1078. if (reliable)
  1079. throw InvalidIncomingDataException("Found nested reliable packets");
  1080. if (packetdata.getSize() < RELIABLE_HEADER_SIZE)
  1081. throw InvalidIncomingDataException("packetdata.getSize() < RELIABLE_HEADER_SIZE");
  1082. u16 seqnum = readU16(&packetdata[1]);
  1083. bool is_future_packet = false;
  1084. bool is_old_packet = false;
  1085. /* packet is within our receive window send ack */
  1086. if (seqnum_in_window(seqnum,
  1087. channel->readNextIncomingSeqNum(), MAX_RELIABLE_WINDOW_SIZE)) {
  1088. m_connection->sendAck(peer->id, channelnum, seqnum);
  1089. } else {
  1090. is_future_packet = seqnum_higher(seqnum, channel->readNextIncomingSeqNum());
  1091. is_old_packet = seqnum_higher(channel->readNextIncomingSeqNum(), seqnum);
  1092. /* packet is not within receive window, don't send ack. *
  1093. * if this was a valid packet it's gonna be retransmitted */
  1094. if (is_future_packet)
  1095. throw ProcessedSilentlyException(
  1096. "Received packet newer then expected, not sending ack");
  1097. /* seems like our ack was lost, send another one for a old packet */
  1098. if (is_old_packet) {
  1099. LOG(dout_con << m_connection->getDesc()
  1100. << "RE-SENDING ACK: peer_id: " << peer->id
  1101. << ", channel: " << (channelnum & 0xFF)
  1102. << ", seqnum: " << seqnum << std::endl;)
  1103. m_connection->sendAck(peer->id, channelnum, seqnum);
  1104. // we already have this packet so this one was on wire at least
  1105. // the current timeout
  1106. // we don't know how long this packet was on wire don't do silly guessing
  1107. // dynamic_cast<UDPPeer*>(&peer)->
  1108. // reportRTT(dynamic_cast<UDPPeer*>(&peer)->getResendTimeout());
  1109. throw ProcessedSilentlyException("Retransmitting ack for old packet");
  1110. }
  1111. }
  1112. if (seqnum != channel->readNextIncomingSeqNum()) {
  1113. Address peer_address;
  1114. // this is a reliable packet so we have a udp address for sure
  1115. peer->getAddress(MTP_MINETEST_RELIABLE_UDP, peer_address);
  1116. // This one comes later, buffer it.
  1117. // Actually we have to make a packet to buffer one.
  1118. // Well, we have all the ingredients, so just do it.
  1119. BufferedPacket packet = con::makePacket(
  1120. peer_address,
  1121. packetdata,
  1122. m_connection->GetProtocolID(),
  1123. peer->id,
  1124. channelnum);
  1125. try {
  1126. channel->incoming_reliables.insert(packet, channel->readNextIncomingSeqNum());
  1127. LOG(dout_con << m_connection->getDesc()
  1128. << "BUFFERING, TYPE_RELIABLE peer_id: " << peer->id
  1129. << ", channel: " << (channelnum & 0xFF)
  1130. << ", seqnum: " << seqnum << std::endl;)
  1131. throw ProcessedQueued("Buffered future reliable packet");
  1132. } catch (AlreadyExistsException &e) {
  1133. } catch (IncomingDataCorruption &e) {
  1134. ConnectionCommand discon;
  1135. discon.disconnect_peer(peer->id);
  1136. m_connection->putCommand(discon);
  1137. LOG(derr_con << m_connection->getDesc()
  1138. << "INVALID, TYPE_RELIABLE peer_id: " << peer->id
  1139. << ", channel: " << (channelnum & 0xFF)
  1140. << ", seqnum: " << seqnum
  1141. << "DROPPING CLIENT!" << std::endl;)
  1142. }
  1143. }
  1144. /* we got a packet to process right now */
  1145. LOG(dout_con << m_connection->getDesc()
  1146. << "RECURSIVE, TYPE_RELIABLE peer_id: " << peer->id
  1147. << ", channel: " << (channelnum & 0xFF)
  1148. << ", seqnum: " << seqnum << std::endl;)
  1149. /* check for resend case */
  1150. u16 queued_seqnum = 0;
  1151. if (channel->incoming_reliables.getFirstSeqnum(queued_seqnum)) {
  1152. if (queued_seqnum == seqnum) {
  1153. BufferedPacket queued_packet = channel->incoming_reliables.popFirst();
  1154. /** TODO find a way to verify the new against the old packet */
  1155. }
  1156. }
  1157. channel->incNextIncomingSeqNum();
  1158. // Get out the inside packet and re-process it
  1159. SharedBuffer<u8> payload(packetdata.getSize() - RELIABLE_HEADER_SIZE);
  1160. memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize());
  1161. return processPacket(channel, payload, peer->id, channelnum, true);
  1162. }
  1163. }