// connection.cpp (78 KB)
  1. /*
  2. Minetest
  3. Copyright (C) 2013 celeron55, Perttu Ahola <celeron55@gmail.com>
  4. This program is free software; you can redistribute it and/or modify
  5. it under the terms of the GNU Lesser General Public License as published by
  6. the Free Software Foundation; either version 2.1 of the License, or
  7. (at your option) any later version.
  8. This program is distributed in the hope that it will be useful,
  9. but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  11. GNU Lesser General Public License for more details.
  12. You should have received a copy of the GNU Lesser General Public License along
  13. with this program; if not, write to the Free Software Foundation, Inc.,
  14. 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  15. */
  16. #include <iomanip>
  17. #include <cerrno>
  18. #include "connection.h"
  19. #include "serialization.h"
  20. #include "log.h"
  21. #include "porting.h"
  22. #include "network/networkpacket.h"
  23. #include "network/peerhandler.h"
  24. #include "util/serialize.h"
  25. #include "util/numeric.h"
  26. #include "util/string.h"
  27. #include "settings.h"
  28. #include "profiler.h"
  29. namespace con
  30. {
  31. /******************************************************************************/
  32. /* defines used for debugging and profiling */
  33. /******************************************************************************/
  /*
   * Logging / profiling helper macros.
   *
   * With NDEBUG defined: LOG(a) expands to the bare expression `a` and
   * PROFILE(a) expands to nothing.
   * Without NDEBUG: LOG(a) evaluates `a` inside a braced scope while holding
   * log_message_mutex, and PROFILE(a) expands to its argument.
   * DEBUG_CONNECTION_KBPS is undefined in both configurations.
   *
   * NOTE(review): the two LOG variants expand to different statement shapes
   * (expression vs. braced block); keep call sites of the form `LOG(...);`.
   */
  #ifdef NDEBUG
  #define LOG(a) a
  #define PROFILE(a)
  #undef DEBUG_CONNECTION_KBPS
  #else
  /* this mutex is used to achieve log message consistency */
  std::mutex log_message_mutex;
  #define LOG(a) \
  { \
  MutexAutoLock loglock(log_message_mutex); \
  a; \
  }
  #define PROFILE(a) a
  //#define DEBUG_CONNECTION_KBPS
  #undef DEBUG_CONNECTION_KBPS
  #endif
  50. static inline float CALC_DTIME(u64 lasttime, u64 curtime)
  51. {
  52. float value = ( curtime - lasttime) / 1000.0;
  53. return MYMAX(MYMIN(value,0.1),0.0);
  54. }
  /* NOTE(review): presumably the upper bound for UDP peer ids/count; confirm at use sites */
  #define MAX_UDP_PEERS 65535
  /* NOTE(review): presumably expressed in seconds; confirm at use sites */
  #define PING_TIMEOUT 5.0
  /* maximum number of retries for reliable packets */
  #define MAX_RELIABLE_RETRY 5
  59. static u16 readPeerId(u8 *packetdata)
  60. {
  61. return readU16(&packetdata[4]);
  62. }
  63. static u8 readChannel(u8 *packetdata)
  64. {
  65. return readU8(&packetdata[6]);
  66. }
  67. BufferedPacket makePacket(Address &address, u8 *data, u32 datasize,
  68. u32 protocol_id, u16 sender_peer_id, u8 channel)
  69. {
  70. u32 packet_size = datasize + BASE_HEADER_SIZE;
  71. BufferedPacket p(packet_size);
  72. p.address = address;
  73. writeU32(&p.data[0], protocol_id);
  74. writeU16(&p.data[4], sender_peer_id);
  75. writeU8(&p.data[6], channel);
  76. memcpy(&p.data[BASE_HEADER_SIZE], data, datasize);
  77. return p;
  78. }
  79. BufferedPacket makePacket(Address &address, SharedBuffer<u8> &data,
  80. u32 protocol_id, u16 sender_peer_id, u8 channel)
  81. {
  82. return makePacket(address, *data, data.getSize(),
  83. protocol_id, sender_peer_id, channel);
  84. }
  85. SharedBuffer<u8> makeOriginalPacket(
  86. SharedBuffer<u8> data)
  87. {
  88. u32 header_size = 1;
  89. u32 packet_size = data.getSize() + header_size;
  90. SharedBuffer<u8> b(packet_size);
  91. writeU8(&(b[0]), TYPE_ORIGINAL);
  92. if (data.getSize() > 0) {
  93. memcpy(&(b[header_size]), *data, data.getSize());
  94. }
  95. return b;
  96. }
  97. std::list<SharedBuffer<u8> > makeSplitPacket(
  98. SharedBuffer<u8> data,
  99. u32 chunksize_max,
  100. u16 seqnum)
  101. {
  102. // Chunk packets, containing the TYPE_SPLIT header
  103. std::list<SharedBuffer<u8> > chunks;
  104. u32 chunk_header_size = 7;
  105. u32 maximum_data_size = chunksize_max - chunk_header_size;
  106. u32 start = 0;
  107. u32 end = 0;
  108. u32 chunk_num = 0;
  109. u16 chunk_count = 0;
  110. do{
  111. end = start + maximum_data_size - 1;
  112. if (end > data.getSize() - 1)
  113. end = data.getSize() - 1;
  114. u32 payload_size = end - start + 1;
  115. u32 packet_size = chunk_header_size + payload_size;
  116. SharedBuffer<u8> chunk(packet_size);
  117. writeU8(&chunk[0], TYPE_SPLIT);
  118. writeU16(&chunk[1], seqnum);
  119. // [3] u16 chunk_count is written at next stage
  120. writeU16(&chunk[5], chunk_num);
  121. memcpy(&chunk[chunk_header_size], &data[start], payload_size);
  122. chunks.push_back(chunk);
  123. chunk_count++;
  124. start = end + 1;
  125. chunk_num++;
  126. }
  127. while(end != data.getSize() - 1);
  128. for (SharedBuffer<u8> &chunk : chunks) {
  129. // Write chunk_count
  130. writeU16(&(chunk[3]), chunk_count);
  131. }
  132. return chunks;
  133. }
  134. std::list<SharedBuffer<u8> > makeAutoSplitPacket(
  135. SharedBuffer<u8> data,
  136. u32 chunksize_max,
  137. u16 &split_seqnum)
  138. {
  139. u32 original_header_size = 1;
  140. std::list<SharedBuffer<u8> > list;
  141. if (data.getSize() + original_header_size > chunksize_max)
  142. {
  143. list = makeSplitPacket(data, chunksize_max, split_seqnum);
  144. split_seqnum++;
  145. return list;
  146. }
  147. list.push_back(makeOriginalPacket(data));
  148. return list;
  149. }
  150. SharedBuffer<u8> makeReliablePacket(
  151. const SharedBuffer<u8> &data,
  152. u16 seqnum)
  153. {
  154. u32 header_size = 3;
  155. u32 packet_size = data.getSize() + header_size;
  156. SharedBuffer<u8> b(packet_size);
  157. writeU8(&b[0], TYPE_RELIABLE);
  158. writeU16(&b[1], seqnum);
  159. memcpy(&b[header_size], *data, data.getSize());
  160. return b;
  161. }
  162. /*
  163. ReliablePacketBuffer
  164. */
  165. void ReliablePacketBuffer::print()
  166. {
  167. MutexAutoLock listlock(m_list_mutex);
  168. LOG(dout_con<<"Dump of ReliablePacketBuffer:" << std::endl);
  169. unsigned int index = 0;
  170. for (BufferedPacket &bufferedPacket : m_list) {
  171. u16 s = readU16(&(bufferedPacket.data[BASE_HEADER_SIZE+1]));
  172. LOG(dout_con<<index<< ":" << s << std::endl);
  173. index++;
  174. }
  175. }
  176. bool ReliablePacketBuffer::empty()
  177. {
  178. MutexAutoLock listlock(m_list_mutex);
  179. return m_list.empty();
  180. }
  181. u32 ReliablePacketBuffer::size()
  182. {
  183. return m_list_size;
  184. }
  185. bool ReliablePacketBuffer::containsPacket(u16 seqnum)
  186. {
  187. return !(findPacket(seqnum) == m_list.end());
  188. }
  189. RPBSearchResult ReliablePacketBuffer::findPacket(u16 seqnum)
  190. {
  191. std::list<BufferedPacket>::iterator i = m_list.begin();
  192. for(; i != m_list.end(); ++i)
  193. {
  194. u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
  195. /*dout_con<<"findPacket(): finding seqnum="<<seqnum
  196. <<", comparing to s="<<s<<std::endl;*/
  197. if (s == seqnum)
  198. break;
  199. }
  200. return i;
  201. }
  202. RPBSearchResult ReliablePacketBuffer::notFound()
  203. {
  204. return m_list.end();
  205. }
  206. bool ReliablePacketBuffer::getFirstSeqnum(u16& result)
  207. {
  208. MutexAutoLock listlock(m_list_mutex);
  209. if (m_list.empty())
  210. return false;
  211. BufferedPacket p = *m_list.begin();
  212. result = readU16(&p.data[BASE_HEADER_SIZE+1]);
  213. return true;
  214. }
  215. BufferedPacket ReliablePacketBuffer::popFirst()
  216. {
  217. MutexAutoLock listlock(m_list_mutex);
  218. if (m_list.empty())
  219. throw NotFoundException("Buffer is empty");
  220. BufferedPacket p = *m_list.begin();
  221. m_list.erase(m_list.begin());
  222. --m_list_size;
  223. if (m_list_size == 0) {
  224. m_oldest_non_answered_ack = 0;
  225. } else {
  226. m_oldest_non_answered_ack =
  227. readU16(&(*m_list.begin()).data[BASE_HEADER_SIZE+1]);
  228. }
  229. return p;
  230. }
  231. BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum)
  232. {
  233. MutexAutoLock listlock(m_list_mutex);
  234. RPBSearchResult r = findPacket(seqnum);
  235. if (r == notFound()) {
  236. LOG(dout_con<<"Sequence number: " << seqnum
  237. << " not found in reliable buffer"<<std::endl);
  238. throw NotFoundException("seqnum not found in buffer");
  239. }
  240. BufferedPacket p = *r;
  241. RPBSearchResult next = r;
  242. ++next;
  243. if (next != notFound()) {
  244. u16 s = readU16(&(next->data[BASE_HEADER_SIZE+1]));
  245. m_oldest_non_answered_ack = s;
  246. }
  247. m_list.erase(r);
  248. --m_list_size;
  249. if (m_list_size == 0)
  250. { m_oldest_non_answered_ack = 0; }
  251. else
  252. { m_oldest_non_answered_ack = readU16(&(*m_list.begin()).data[BASE_HEADER_SIZE+1]); }
  253. return p;
  254. }
  255. void ReliablePacketBuffer::insert(BufferedPacket &p,u16 next_expected)
  256. {
  257. MutexAutoLock listlock(m_list_mutex);
  258. if (p.data.getSize() < BASE_HEADER_SIZE + 3) {
  259. errorstream << "ReliablePacketBuffer::insert(): Invalid data size for "
  260. "reliable packet" << std::endl;
  261. return;
  262. }
  263. u8 type = readU8(&p.data[BASE_HEADER_SIZE + 0]);
  264. if (type != TYPE_RELIABLE) {
  265. errorstream << "ReliablePacketBuffer::insert(): type is not reliable"
  266. << std::endl;
  267. return;
  268. }
  269. u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE + 1]);
  270. if (!seqnum_in_window(seqnum, next_expected, MAX_RELIABLE_WINDOW_SIZE)) {
  271. errorstream << "ReliablePacketBuffer::insert(): seqnum is outside of "
  272. "expected window " << std::endl;
  273. return;
  274. }
  275. if (seqnum == next_expected) {
  276. errorstream << "ReliablePacketBuffer::insert(): seqnum is next expected"
  277. << std::endl;
  278. return;
  279. }
  280. ++m_list_size;
  281. sanity_check(m_list_size <= SEQNUM_MAX+1); // FIXME: Handle the error?
  282. // Find the right place for the packet and insert it there
  283. // If list is empty, just add it
  284. if (m_list.empty())
  285. {
  286. m_list.push_back(p);
  287. m_oldest_non_answered_ack = seqnum;
  288. // Done.
  289. return;
  290. }
  291. // Otherwise find the right place
  292. std::list<BufferedPacket>::iterator i = m_list.begin();
  293. // Find the first packet in the list which has a higher seqnum
  294. u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
  295. /* case seqnum is smaller then next_expected seqnum */
  296. /* this is true e.g. on wrap around */
  297. if (seqnum < next_expected) {
  298. while(((s < seqnum) || (s >= next_expected)) && (i != m_list.end())) {
  299. ++i;
  300. if (i != m_list.end())
  301. s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
  302. }
  303. }
  304. /* non wrap around case (at least for incoming and next_expected */
  305. else
  306. {
  307. while(((s < seqnum) && (s >= next_expected)) && (i != m_list.end())) {
  308. ++i;
  309. if (i != m_list.end())
  310. s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
  311. }
  312. }
  313. if (s == seqnum) {
  314. if (
  315. (readU16(&(i->data[BASE_HEADER_SIZE+1])) != seqnum) ||
  316. (i->data.getSize() != p.data.getSize()) ||
  317. (i->address != p.address)
  318. )
  319. {
  320. /* if this happens your maximum transfer window may be to big */
  321. fprintf(stderr,
  322. "Duplicated seqnum %d non matching packet detected:\n",
  323. seqnum);
  324. fprintf(stderr, "Old: seqnum: %05d size: %04d, address: %s\n",
  325. readU16(&(i->data[BASE_HEADER_SIZE+1])),i->data.getSize(),
  326. i->address.serializeString().c_str());
  327. fprintf(stderr, "New: seqnum: %05d size: %04u, address: %s\n",
  328. readU16(&(p.data[BASE_HEADER_SIZE+1])),p.data.getSize(),
  329. p.address.serializeString().c_str());
  330. throw IncomingDataCorruption("duplicated packet isn't same as original one");
  331. }
  332. /* nothing to do this seems to be a resent packet */
  333. /* for paranoia reason data should be compared */
  334. --m_list_size;
  335. }
  336. /* insert or push back */
  337. else if (i != m_list.end()) {
  338. m_list.insert(i, p);
  339. }
  340. else {
  341. m_list.push_back(p);
  342. }
  343. /* update last packet number */
  344. m_oldest_non_answered_ack = readU16(&(*m_list.begin()).data[BASE_HEADER_SIZE+1]);
  345. }
  346. void ReliablePacketBuffer::incrementTimeouts(float dtime)
  347. {
  348. MutexAutoLock listlock(m_list_mutex);
  349. for (BufferedPacket &bufferedPacket : m_list) {
  350. bufferedPacket.time += dtime;
  351. bufferedPacket.totaltime += dtime;
  352. }
  353. }
  354. std::list<BufferedPacket> ReliablePacketBuffer::getTimedOuts(float timeout,
  355. unsigned int max_packets)
  356. {
  357. MutexAutoLock listlock(m_list_mutex);
  358. std::list<BufferedPacket> timed_outs;
  359. for (BufferedPacket &bufferedPacket : m_list) {
  360. if (bufferedPacket.time >= timeout) {
  361. timed_outs.push_back(bufferedPacket);
  362. //this packet will be sent right afterwards reset timeout here
  363. bufferedPacket.time = 0.0f;
  364. if (timed_outs.size() >= max_packets)
  365. break;
  366. }
  367. }
  368. return timed_outs;
  369. }
  370. /*
  371. IncomingSplitBuffer
  372. */
  373. IncomingSplitBuffer::~IncomingSplitBuffer()
  374. {
  375. MutexAutoLock listlock(m_map_mutex);
  376. for (auto &i : m_buf) {
  377. delete i.second;
  378. }
  379. }
  380. /*
  381. This will throw a GotSplitPacketException when a full
  382. split packet is constructed.
  383. */
  384. SharedBuffer<u8> IncomingSplitBuffer::insert(BufferedPacket &p, bool reliable)
  385. {
  386. MutexAutoLock listlock(m_map_mutex);
  387. u32 headersize = BASE_HEADER_SIZE + 7;
  388. if (p.data.getSize() < headersize) {
  389. errorstream << "Invalid data size for split packet" << std::endl;
  390. return SharedBuffer<u8>();
  391. }
  392. u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
  393. u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
  394. u16 chunk_count = readU16(&p.data[BASE_HEADER_SIZE+3]);
  395. u16 chunk_num = readU16(&p.data[BASE_HEADER_SIZE+5]);
  396. if (type != TYPE_SPLIT) {
  397. errorstream << "IncomingSplitBuffer::insert(): type is not split"
  398. << std::endl;
  399. return SharedBuffer<u8>();
  400. }
  401. // Add if doesn't exist
  402. if (m_buf.find(seqnum) == m_buf.end())
  403. {
  404. IncomingSplitPacket *sp = new IncomingSplitPacket();
  405. sp->chunk_count = chunk_count;
  406. sp->reliable = reliable;
  407. m_buf[seqnum] = sp;
  408. }
  409. IncomingSplitPacket *sp = m_buf[seqnum];
  410. // TODO: These errors should be thrown or something? Dunno.
  411. if (chunk_count != sp->chunk_count)
  412. LOG(derr_con<<"Connection: WARNING: chunk_count="<<chunk_count
  413. <<" != sp->chunk_count="<<sp->chunk_count
  414. <<std::endl);
  415. if (reliable != sp->reliable)
  416. LOG(derr_con<<"Connection: WARNING: reliable="<<reliable
  417. <<" != sp->reliable="<<sp->reliable
  418. <<std::endl);
  419. // If chunk already exists, ignore it.
  420. // Sometimes two identical packets may arrive when there is network
  421. // lag and the server re-sends stuff.
  422. if (sp->chunks.find(chunk_num) != sp->chunks.end())
  423. return SharedBuffer<u8>();
  424. // Cut chunk data out of packet
  425. u32 chunkdatasize = p.data.getSize() - headersize;
  426. SharedBuffer<u8> chunkdata(chunkdatasize);
  427. memcpy(*chunkdata, &(p.data[headersize]), chunkdatasize);
  428. // Set chunk data in buffer
  429. sp->chunks[chunk_num] = chunkdata;
  430. // If not all chunks are received, return empty buffer
  431. if (!sp->allReceived())
  432. return SharedBuffer<u8>();
  433. // Calculate total size
  434. u32 totalsize = 0;
  435. for (const auto &chunk : sp->chunks) {
  436. totalsize += chunk.second.getSize();
  437. }
  438. SharedBuffer<u8> fulldata(totalsize);
  439. // Copy chunks to data buffer
  440. u32 start = 0;
  441. for(u32 chunk_i=0; chunk_i<sp->chunk_count;
  442. chunk_i++)
  443. {
  444. SharedBuffer<u8> buf = sp->chunks[chunk_i];
  445. u16 chunkdatasize = buf.getSize();
  446. memcpy(&fulldata[start], *buf, chunkdatasize);
  447. start += chunkdatasize;;
  448. }
  449. // Remove sp from buffer
  450. m_buf.erase(seqnum);
  451. delete sp;
  452. return fulldata;
  453. }
  454. void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout)
  455. {
  456. std::list<u16> remove_queue;
  457. {
  458. MutexAutoLock listlock(m_map_mutex);
  459. for (auto &i : m_buf) {
  460. IncomingSplitPacket *p = i.second;
  461. // Reliable ones are not removed by timeout
  462. if (p->reliable)
  463. continue;
  464. p->time += dtime;
  465. if (p->time >= timeout)
  466. remove_queue.push_back(i.first);
  467. }
  468. }
  469. for (u16 j : remove_queue) {
  470. MutexAutoLock listlock(m_map_mutex);
  471. LOG(dout_con<<"NOTE: Removing timed out unreliable split packet"<<std::endl);
  472. delete m_buf[j];
  473. m_buf.erase(j);
  474. }
  475. }
  476. /*
  477. ConnectionCommand
  478. */
  479. void ConnectionCommand::send(u16 peer_id_, u8 channelnum_, NetworkPacket *pkt,
  480. bool reliable_)
  481. {
  482. type = CONNCMD_SEND;
  483. peer_id = peer_id_;
  484. channelnum = channelnum_;
  485. data = pkt->oldForgePacket();
  486. reliable = reliable_;
  487. }
  488. /*
  489. Channel
  490. */
  491. u16 Channel::readNextIncomingSeqNum()
  492. {
  493. MutexAutoLock internal(m_internal_mutex);
  494. return next_incoming_seqnum;
  495. }
  496. u16 Channel::incNextIncomingSeqNum()
  497. {
  498. MutexAutoLock internal(m_internal_mutex);
  499. u16 retval = next_incoming_seqnum;
  500. next_incoming_seqnum++;
  501. return retval;
  502. }
  503. u16 Channel::readNextSplitSeqNum()
  504. {
  505. MutexAutoLock internal(m_internal_mutex);
  506. return next_outgoing_split_seqnum;
  507. }
  508. void Channel::setNextSplitSeqNum(u16 seqnum)
  509. {
  510. MutexAutoLock internal(m_internal_mutex);
  511. next_outgoing_split_seqnum = seqnum;
  512. }
  513. u16 Channel::getOutgoingSequenceNumber(bool& successful)
  514. {
  515. MutexAutoLock internal(m_internal_mutex);
  516. u16 retval = next_outgoing_seqnum;
  517. u16 lowest_unacked_seqnumber;
  518. /* shortcut if there ain't any packet in outgoing list */
  519. if (outgoing_reliables_sent.empty())
  520. {
  521. next_outgoing_seqnum++;
  522. return retval;
  523. }
  524. if (outgoing_reliables_sent.getFirstSeqnum(lowest_unacked_seqnumber))
  525. {
  526. if (lowest_unacked_seqnumber < next_outgoing_seqnum) {
  527. // ugly cast but this one is required in order to tell compiler we
  528. // know about difference of two unsigned may be negative in general
  529. // but we already made sure it won't happen in this case
  530. if (((u16)(next_outgoing_seqnum - lowest_unacked_seqnumber)) > window_size) {
  531. successful = false;
  532. return 0;
  533. }
  534. }
  535. else {
  536. // ugly cast but this one is required in order to tell compiler we
  537. // know about difference of two unsigned may be negative in general
  538. // but we already made sure it won't happen in this case
  539. if ((next_outgoing_seqnum + (u16)(SEQNUM_MAX - lowest_unacked_seqnumber)) >
  540. window_size) {
  541. successful = false;
  542. return 0;
  543. }
  544. }
  545. }
  546. next_outgoing_seqnum++;
  547. return retval;
  548. }
  549. u16 Channel::readOutgoingSequenceNumber()
  550. {
  551. MutexAutoLock internal(m_internal_mutex);
  552. return next_outgoing_seqnum;
  553. }
  554. bool Channel::putBackSequenceNumber(u16 seqnum)
  555. {
  556. if (((seqnum + 1) % (SEQNUM_MAX+1)) == next_outgoing_seqnum) {
  557. next_outgoing_seqnum = seqnum;
  558. return true;
  559. }
  560. return false;
  561. }
  562. void Channel::UpdateBytesSent(unsigned int bytes, unsigned int packets)
  563. {
  564. MutexAutoLock internal(m_internal_mutex);
  565. current_bytes_transfered += bytes;
  566. current_packet_successfull += packets;
  567. }
  568. void Channel::UpdateBytesReceived(unsigned int bytes) {
  569. MutexAutoLock internal(m_internal_mutex);
  570. current_bytes_received += bytes;
  571. }
  572. void Channel::UpdateBytesLost(unsigned int bytes)
  573. {
  574. MutexAutoLock internal(m_internal_mutex);
  575. current_bytes_lost += bytes;
  576. }
  577. void Channel::UpdatePacketLossCounter(unsigned int count)
  578. {
  579. MutexAutoLock internal(m_internal_mutex);
  580. current_packet_loss += count;
  581. }
  582. void Channel::UpdatePacketTooLateCounter()
  583. {
  584. MutexAutoLock internal(m_internal_mutex);
  585. current_packet_too_late++;
  586. }
  587. void Channel::UpdateTimers(float dtime,bool legacy_peer)
  588. {
  589. bpm_counter += dtime;
  590. packet_loss_counter += dtime;
  591. if (packet_loss_counter > 1.0)
  592. {
  593. packet_loss_counter -= 1.0;
  594. unsigned int packet_loss = 11; /* use a neutral value for initialization */
  595. unsigned int packets_successfull = 0;
  596. //unsigned int packet_too_late = 0;
  597. bool reasonable_amount_of_data_transmitted = false;
  598. {
  599. MutexAutoLock internal(m_internal_mutex);
  600. packet_loss = current_packet_loss;
  601. //packet_too_late = current_packet_too_late;
  602. packets_successfull = current_packet_successfull;
  603. if (current_bytes_transfered > (unsigned int) (window_size*512/2))
  604. {
  605. reasonable_amount_of_data_transmitted = true;
  606. }
  607. current_packet_loss = 0;
  608. current_packet_too_late = 0;
  609. current_packet_successfull = 0;
  610. }
  611. /* dynamic window size is only available for non legacy peers */
  612. if (!legacy_peer) {
  613. float successfull_to_lost_ratio = 0.0;
  614. bool done = false;
  615. if (packets_successfull > 0) {
  616. successfull_to_lost_ratio = packet_loss/packets_successfull;
  617. }
  618. else if (packet_loss > 0)
  619. {
  620. window_size = MYMAX(
  621. (window_size - 10),
  622. MIN_RELIABLE_WINDOW_SIZE);
  623. done = true;
  624. }
  625. if (!done)
  626. {
  627. if ((successfull_to_lost_ratio < 0.01) &&
  628. (window_size < MAX_RELIABLE_WINDOW_SIZE))
  629. {
  630. /* don't even think about increasing if we didn't even
  631. * use major parts of our window */
  632. if (reasonable_amount_of_data_transmitted)
  633. window_size = MYMIN(
  634. (window_size + 100),
  635. MAX_RELIABLE_WINDOW_SIZE);
  636. }
  637. else if ((successfull_to_lost_ratio < 0.05) &&
  638. (window_size < MAX_RELIABLE_WINDOW_SIZE))
  639. {
  640. /* don't even think about increasing if we didn't even
  641. * use major parts of our window */
  642. if (reasonable_amount_of_data_transmitted)
  643. window_size = MYMIN(
  644. (window_size + 50),
  645. MAX_RELIABLE_WINDOW_SIZE);
  646. }
  647. else if (successfull_to_lost_ratio > 0.15)
  648. {
  649. window_size = MYMAX(
  650. (window_size - 100),
  651. MIN_RELIABLE_WINDOW_SIZE);
  652. }
  653. else if (successfull_to_lost_ratio > 0.1)
  654. {
  655. window_size = MYMAX(
  656. (window_size - 50),
  657. MIN_RELIABLE_WINDOW_SIZE);
  658. }
  659. }
  660. }
  661. }
  662. if (bpm_counter > 10.0)
  663. {
  664. {
  665. MutexAutoLock internal(m_internal_mutex);
  666. cur_kbps =
  667. (((float) current_bytes_transfered)/bpm_counter)/1024.0;
  668. current_bytes_transfered = 0;
  669. cur_kbps_lost =
  670. (((float) current_bytes_lost)/bpm_counter)/1024.0;
  671. current_bytes_lost = 0;
  672. cur_incoming_kbps =
  673. (((float) current_bytes_received)/bpm_counter)/1024.0;
  674. current_bytes_received = 0;
  675. bpm_counter = 0;
  676. }
  677. if (cur_kbps > max_kbps)
  678. {
  679. max_kbps = cur_kbps;
  680. }
  681. if (cur_kbps_lost > max_kbps_lost)
  682. {
  683. max_kbps_lost = cur_kbps_lost;
  684. }
  685. if (cur_incoming_kbps > max_incoming_kbps) {
  686. max_incoming_kbps = cur_incoming_kbps;
  687. }
  688. rate_samples = MYMIN(rate_samples+1,10);
  689. float old_fraction = ((float) (rate_samples-1) )/( (float) rate_samples);
  690. avg_kbps = avg_kbps * old_fraction +
  691. cur_kbps * (1.0 - old_fraction);
  692. avg_kbps_lost = avg_kbps_lost * old_fraction +
  693. cur_kbps_lost * (1.0 - old_fraction);
  694. avg_incoming_kbps = avg_incoming_kbps * old_fraction +
  695. cur_incoming_kbps * (1.0 - old_fraction);
  696. }
  697. }
  698. /*
  699. Peer
  700. */
  701. PeerHelper::PeerHelper(Peer* peer) :
  702. m_peer(peer)
  703. {
  704. if (peer && !peer->IncUseCount())
  705. m_peer = nullptr;
  706. }
  707. PeerHelper::~PeerHelper()
  708. {
  709. if (m_peer)
  710. m_peer->DecUseCount();
  711. m_peer = nullptr;
  712. }
  713. PeerHelper& PeerHelper::operator=(Peer* peer)
  714. {
  715. m_peer = peer;
  716. if (peer && !peer->IncUseCount())
  717. m_peer = nullptr;
  718. return *this;
  719. }
  720. Peer* PeerHelper::operator->() const
  721. {
  722. return m_peer;
  723. }
  724. Peer* PeerHelper::operator&() const
  725. {
  726. return m_peer;
  727. }
  728. bool PeerHelper::operator!() {
  729. return ! m_peer;
  730. }
  731. bool PeerHelper::operator!=(void* ptr)
  732. {
  733. return ((void*) m_peer != ptr);
  734. }
  735. bool Peer::IncUseCount()
  736. {
  737. MutexAutoLock lock(m_exclusive_access_mutex);
  738. if (!m_pending_deletion) {
  739. this->m_usage++;
  740. return true;
  741. }
  742. return false;
  743. }
  744. void Peer::DecUseCount()
  745. {
  746. {
  747. MutexAutoLock lock(m_exclusive_access_mutex);
  748. sanity_check(m_usage > 0);
  749. m_usage--;
  750. if (!((m_pending_deletion) && (m_usage == 0)))
  751. return;
  752. }
  753. delete this;
  754. }
  755. void Peer::RTTStatistics(float rtt, const std::string &profiler_id,
  756. unsigned int num_samples) {
  757. if (m_last_rtt > 0) {
  758. /* set min max values */
  759. if (rtt < m_rtt.min_rtt)
  760. m_rtt.min_rtt = rtt;
  761. if (rtt >= m_rtt.max_rtt)
  762. m_rtt.max_rtt = rtt;
  763. /* do average calculation */
  764. if (m_rtt.avg_rtt < 0.0)
  765. m_rtt.avg_rtt = rtt;
  766. else
  767. m_rtt.avg_rtt = m_rtt.avg_rtt * (num_samples/(num_samples-1)) +
  768. rtt * (1/num_samples);
  769. /* do jitter calculation */
  770. //just use some neutral value at beginning
  771. float jitter = m_rtt.jitter_min;
  772. if (rtt > m_last_rtt)
  773. jitter = rtt-m_last_rtt;
  774. if (rtt <= m_last_rtt)
  775. jitter = m_last_rtt - rtt;
  776. if (jitter < m_rtt.jitter_min)
  777. m_rtt.jitter_min = jitter;
  778. if (jitter >= m_rtt.jitter_max)
  779. m_rtt.jitter_max = jitter;
  780. if (m_rtt.jitter_avg < 0.0)
  781. m_rtt.jitter_avg = jitter;
  782. else
  783. m_rtt.jitter_avg = m_rtt.jitter_avg * (num_samples/(num_samples-1)) +
  784. jitter * (1/num_samples);
  785. if (!profiler_id.empty()) {
  786. g_profiler->graphAdd(profiler_id + "_rtt", rtt);
  787. g_profiler->graphAdd(profiler_id + "_jitter", jitter);
  788. }
  789. }
  790. /* save values required for next loop */
  791. m_last_rtt = rtt;
  792. }
  793. bool Peer::isTimedOut(float timeout)
  794. {
  795. MutexAutoLock lock(m_exclusive_access_mutex);
  796. u64 current_time = porting::getTimeMs();
  797. float dtime = CALC_DTIME(m_last_timeout_check,current_time);
  798. m_last_timeout_check = current_time;
  799. m_timeout_counter += dtime;
  800. return m_timeout_counter > timeout;
  801. }
  802. void Peer::Drop()
  803. {
  804. {
  805. MutexAutoLock usage_lock(m_exclusive_access_mutex);
  806. m_pending_deletion = true;
  807. if (m_usage != 0)
  808. return;
  809. }
  810. PROFILE(std::stringstream peerIdentifier1);
  811. PROFILE(peerIdentifier1 << "runTimeouts[" << m_connection->getDesc()
  812. << ";" << id << ";RELIABLE]");
  813. PROFILE(g_profiler->remove(peerIdentifier1.str()));
  814. PROFILE(std::stringstream peerIdentifier2);
  815. PROFILE(peerIdentifier2 << "sendPackets[" << m_connection->getDesc()
  816. << ";" << id << ";RELIABLE]");
  817. PROFILE(ScopeProfiler peerprofiler(g_profiler, peerIdentifier2.str(), SPT_AVG));
  818. delete this;
  819. }
  820. UDPPeer::UDPPeer(u16 a_id, Address a_address, Connection* connection) :
  821. Peer(a_address,a_id,connection)
  822. {
  823. }
  824. bool UDPPeer::getAddress(MTProtocols type,Address& toset)
  825. {
  826. if ((type == MTP_UDP) || (type == MTP_MINETEST_RELIABLE_UDP) || (type == MTP_PRIMARY))
  827. {
  828. toset = address;
  829. return true;
  830. }
  831. return false;
  832. }
  833. void UDPPeer::setNonLegacyPeer()
  834. {
  835. m_legacy_peer = false;
  836. for(unsigned int i=0; i< CHANNEL_COUNT; i++)
  837. {
  838. channels->setWindowSize(g_settings->getU16("max_packets_per_iteration"));
  839. }
  840. }
  841. void UDPPeer::reportRTT(float rtt)
  842. {
  843. if (rtt < 0.0) {
  844. return;
  845. }
  846. RTTStatistics(rtt,"rudp",MAX_RELIABLE_WINDOW_SIZE*10);
  847. float timeout = getStat(AVG_RTT) * RESEND_TIMEOUT_FACTOR;
  848. if (timeout < RESEND_TIMEOUT_MIN)
  849. timeout = RESEND_TIMEOUT_MIN;
  850. if (timeout > RESEND_TIMEOUT_MAX)
  851. timeout = RESEND_TIMEOUT_MAX;
  852. MutexAutoLock usage_lock(m_exclusive_access_mutex);
  853. resend_timeout = timeout;
  854. }
  855. bool UDPPeer::Ping(float dtime,SharedBuffer<u8>& data)
  856. {
  857. m_ping_timer += dtime;
  858. if (m_ping_timer >= PING_TIMEOUT)
  859. {
  860. // Create and send PING packet
  861. writeU8(&data[0], TYPE_CONTROL);
  862. writeU8(&data[1], CONTROLTYPE_PING);
  863. m_ping_timer = 0.0;
  864. return true;
  865. }
  866. return false;
  867. }
  868. void UDPPeer::PutReliableSendCommand(ConnectionCommand &c,
  869. unsigned int max_packet_size)
  870. {
  871. if (m_pending_disconnect)
  872. return;
  873. if ( channels[c.channelnum].queued_commands.empty() &&
  874. /* don't queue more packets then window size */
  875. (channels[c.channelnum].queued_reliables.size()
  876. < (channels[c.channelnum].getWindowSize()/2))) {
  877. LOG(dout_con<<m_connection->getDesc()
  878. <<" processing reliable command for peer id: " << c.peer_id
  879. <<" data size: " << c.data.getSize() << std::endl);
  880. if (!processReliableSendCommand(c,max_packet_size)) {
  881. channels[c.channelnum].queued_commands.push_back(c);
  882. }
  883. }
  884. else {
  885. LOG(dout_con<<m_connection->getDesc()
  886. <<" Queueing reliable command for peer id: " << c.peer_id
  887. <<" data size: " << c.data.getSize() <<std::endl);
  888. channels[c.channelnum].queued_commands.push_back(c);
  889. }
  890. }
  891. bool UDPPeer::processReliableSendCommand(
  892. ConnectionCommand &c,
  893. unsigned int max_packet_size)
  894. {
  895. if (m_pending_disconnect)
  896. return true;
  897. u32 chunksize_max = max_packet_size
  898. - BASE_HEADER_SIZE
  899. - RELIABLE_HEADER_SIZE;
  900. sanity_check(c.data.getSize() < MAX_RELIABLE_WINDOW_SIZE*512);
  901. std::list<SharedBuffer<u8> > originals;
  902. u16 split_sequence_number = channels[c.channelnum].readNextSplitSeqNum();
  903. if (c.raw)
  904. {
  905. originals.emplace_back(c.data);
  906. }
  907. else {
  908. originals = makeAutoSplitPacket(c.data, chunksize_max,split_sequence_number);
  909. channels[c.channelnum].setNextSplitSeqNum(split_sequence_number);
  910. }
  911. bool have_sequence_number = true;
  912. bool have_initial_sequence_number = false;
  913. std::queue<BufferedPacket> toadd;
  914. volatile u16 initial_sequence_number = 0;
  915. for (SharedBuffer<u8> &original : originals) {
  916. u16 seqnum = channels[c.channelnum].getOutgoingSequenceNumber(have_sequence_number);
  917. /* oops, we don't have enough sequence numbers to send this packet */
  918. if (!have_sequence_number)
  919. break;
  920. if (!have_initial_sequence_number)
  921. {
  922. initial_sequence_number = seqnum;
  923. have_initial_sequence_number = true;
  924. }
  925. SharedBuffer<u8> reliable = makeReliablePacket(original, seqnum);
  926. // Add base headers and make a packet
  927. BufferedPacket p = con::makePacket(address, reliable,
  928. m_connection->GetProtocolID(), m_connection->GetPeerID(),
  929. c.channelnum);
  930. toadd.push(p);
  931. }
  932. if (have_sequence_number) {
  933. volatile u16 pcount = 0;
  934. while (!toadd.empty()) {
  935. BufferedPacket p = toadd.front();
  936. toadd.pop();
  937. // LOG(dout_con<<connection->getDesc()
  938. // << " queuing reliable packet for peer_id: " << c.peer_id
  939. // << " channel: " << (c.channelnum&0xFF)
  940. // << " seqnum: " << readU16(&p.data[BASE_HEADER_SIZE+1])
  941. // << std::endl)
  942. channels[c.channelnum].queued_reliables.push(p);
  943. pcount++;
  944. }
  945. sanity_check(channels[c.channelnum].queued_reliables.size() < 0xFFFF);
  946. return true;
  947. }
  948. volatile u16 packets_available = toadd.size();
  949. /* we didn't get a single sequence number no need to fill queue */
  950. if (!have_initial_sequence_number) {
  951. return false;
  952. }
  953. while (!toadd.empty()) {
  954. /* remove packet */
  955. toadd.pop();
  956. bool successfully_put_back_sequence_number
  957. = channels[c.channelnum].putBackSequenceNumber(
  958. (initial_sequence_number+toadd.size() % (SEQNUM_MAX+1)));
  959. FATAL_ERROR_IF(!successfully_put_back_sequence_number, "error");
  960. }
  961. LOG(dout_con<<m_connection->getDesc()
  962. << " Windowsize exceeded on reliable sending "
  963. << c.data.getSize() << " bytes"
  964. << std::endl << "\t\tinitial_sequence_number: "
  965. << initial_sequence_number
  966. << std::endl << "\t\tgot at most : "
  967. << packets_available << " packets"
  968. << std::endl << "\t\tpackets queued : "
  969. << channels[c.channelnum].outgoing_reliables_sent.size()
  970. << std::endl);
  971. return false;
  972. }
  973. void UDPPeer::RunCommandQueues(
  974. unsigned int max_packet_size,
  975. unsigned int maxcommands,
  976. unsigned int maxtransfer)
  977. {
  978. for (Channel &channel : channels) {
  979. unsigned int commands_processed = 0;
  980. if ((!channel.queued_commands.empty()) &&
  981. (channel.queued_reliables.size() < maxtransfer) &&
  982. (commands_processed < maxcommands)) {
  983. try {
  984. ConnectionCommand c = channel.queued_commands.front();
  985. LOG(dout_con << m_connection->getDesc()
  986. << " processing queued reliable command " << std::endl);
  987. // Packet is processed, remove it from queue
  988. if (processReliableSendCommand(c,max_packet_size)) {
  989. channel.queued_commands.pop_front();
  990. } else {
  991. LOG(dout_con << m_connection->getDesc()
  992. << " Failed to queue packets for peer_id: " << c.peer_id
  993. << ", delaying sending of " << c.data.getSize()
  994. << " bytes" << std::endl);
  995. }
  996. }
  997. catch (ItemNotFoundException &e) {
  998. // intentionally empty
  999. }
  1000. }
  1001. }
  1002. }
  1003. u16 UDPPeer::getNextSplitSequenceNumber(u8 channel)
  1004. {
  1005. assert(channel < CHANNEL_COUNT); // Pre-condition
  1006. return channels[channel].readNextSplitSeqNum();
  1007. }
  1008. void UDPPeer::setNextSplitSequenceNumber(u8 channel, u16 seqnum)
  1009. {
  1010. assert(channel < CHANNEL_COUNT); // Pre-condition
  1011. channels[channel].setNextSplitSeqNum(seqnum);
  1012. }
  1013. SharedBuffer<u8> UDPPeer::addSpiltPacket(u8 channel,
  1014. BufferedPacket toadd,
  1015. bool reliable)
  1016. {
  1017. assert(channel < CHANNEL_COUNT); // Pre-condition
  1018. return channels[channel].incoming_splits.insert(toadd,reliable);
  1019. }
  1020. /******************************************************************************/
  1021. /* Connection Threads */
  1022. /******************************************************************************/
  1023. ConnectionSendThread::ConnectionSendThread(unsigned int max_packet_size,
  1024. float timeout) :
  1025. Thread("ConnectionSend"),
  1026. m_max_packet_size(max_packet_size),
  1027. m_timeout(timeout),
  1028. m_max_data_packets_per_iteration(g_settings->getU16("max_packets_per_iteration"))
  1029. {
  1030. }
  1031. void * ConnectionSendThread::run()
  1032. {
  1033. assert(m_connection);
  1034. LOG(dout_con<<m_connection->getDesc()
  1035. <<"ConnectionSend thread started"<<std::endl);
  1036. u64 curtime = porting::getTimeMs();
  1037. u64 lasttime = curtime;
  1038. PROFILE(std::stringstream ThreadIdentifier);
  1039. PROFILE(ThreadIdentifier << "ConnectionSend: [" << m_connection->getDesc() << "]");
  1040. /* if stop is requested don't stop immediately but try to send all */
  1041. /* packets first */
  1042. while(!stopRequested() || packetsQueued()) {
  1043. BEGIN_DEBUG_EXCEPTION_HANDLER
  1044. PROFILE(ScopeProfiler sp(g_profiler, ThreadIdentifier.str(), SPT_AVG));
  1045. m_iteration_packets_avaialble = m_max_data_packets_per_iteration;
  1046. /* wait for trigger or timeout */
  1047. m_send_sleep_semaphore.wait(50);
  1048. /* remove all triggers */
  1049. while(m_send_sleep_semaphore.wait(0)) {}
  1050. lasttime = curtime;
  1051. curtime = porting::getTimeMs();
  1052. float dtime = CALC_DTIME(lasttime,curtime);
  1053. /* first do all the reliable stuff */
  1054. runTimeouts(dtime);
  1055. /* translate commands to packets */
  1056. ConnectionCommand c = m_connection->m_command_queue.pop_frontNoEx(0);
  1057. while(c.type != CONNCMD_NONE)
  1058. {
  1059. if (c.reliable)
  1060. processReliableCommand(c);
  1061. else
  1062. processNonReliableCommand(c);
  1063. c = m_connection->m_command_queue.pop_frontNoEx(0);
  1064. }
  1065. /* send non reliable packets */
  1066. sendPackets(dtime);
  1067. END_DEBUG_EXCEPTION_HANDLER
  1068. }
  1069. PROFILE(g_profiler->remove(ThreadIdentifier.str()));
  1070. return NULL;
  1071. }
  1072. void ConnectionSendThread::Trigger()
  1073. {
  1074. m_send_sleep_semaphore.post();
  1075. }
  1076. bool ConnectionSendThread::packetsQueued()
  1077. {
  1078. std::list<u16> peerIds = m_connection->getPeerIDs();
  1079. if (!m_outgoing_queue.empty() && !peerIds.empty())
  1080. return true;
  1081. for (u16 peerId : peerIds) {
  1082. PeerHelper peer = m_connection->getPeerNoEx(peerId);
  1083. if (!peer)
  1084. continue;
  1085. if (dynamic_cast<UDPPeer*>(&peer) == 0)
  1086. continue;
  1087. for (Channel &channel : (dynamic_cast<UDPPeer *>(&peer))->channels) {
  1088. if (!channel.queued_commands.empty()) {
  1089. return true;
  1090. }
  1091. }
  1092. }
  1093. return false;
  1094. }
  1095. void ConnectionSendThread::runTimeouts(float dtime)
  1096. {
  1097. std::list<u16> timeouted_peers;
  1098. std::list<u16> peerIds = m_connection->getPeerIDs();
  1099. for (u16 &peerId : peerIds) {
  1100. PeerHelper peer = m_connection->getPeerNoEx(peerId);
  1101. if (!peer)
  1102. continue;
  1103. if (dynamic_cast<UDPPeer*>(&peer) == 0)
  1104. continue;
  1105. PROFILE(std::stringstream peerIdentifier);
  1106. PROFILE(peerIdentifier << "runTimeouts[" << m_connection->getDesc()
  1107. << ";" << peerId << ";RELIABLE]");
  1108. PROFILE(ScopeProfiler peerprofiler(g_profiler, peerIdentifier.str(), SPT_AVG));
  1109. SharedBuffer<u8> data(2); // data for sending ping, required here because of goto
  1110. /*
  1111. Check peer timeout
  1112. */
  1113. if (peer->isTimedOut(m_timeout))
  1114. {
  1115. infostream<<m_connection->getDesc()
  1116. <<"RunTimeouts(): Peer "<<peer->id
  1117. <<" has timed out."
  1118. <<" (source=peer->timeout_counter)"
  1119. <<std::endl;
  1120. // Add peer to the list
  1121. timeouted_peers.push_back(peer->id);
  1122. // Don't bother going through the buffers of this one
  1123. continue;
  1124. }
  1125. float resend_timeout = dynamic_cast<UDPPeer*>(&peer)->getResendTimeout();
  1126. bool retry_count_exceeded = false;
  1127. for (Channel &channel : (dynamic_cast<UDPPeer *>(&peer))->channels) {
  1128. std::list<BufferedPacket> timed_outs;
  1129. if (dynamic_cast<UDPPeer*>(&peer)->getLegacyPeer())
  1130. channel.setWindowSize(g_settings->getU16("workaround_window_size"));
  1131. // Remove timed out incomplete unreliable split packets
  1132. channel.incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout);
  1133. // Increment reliable packet times
  1134. channel.outgoing_reliables_sent.incrementTimeouts(dtime);
  1135. unsigned int numpeers = m_connection->m_peers.size();
  1136. if (numpeers == 0)
  1137. return;
  1138. // Re-send timed out outgoing reliables
  1139. timed_outs = channel.outgoing_reliables_sent.getTimedOuts(resend_timeout,
  1140. (m_max_data_packets_per_iteration/numpeers));
  1141. channel.UpdatePacketLossCounter(timed_outs.size());
  1142. g_profiler->graphAdd("packets_lost", timed_outs.size());
  1143. m_iteration_packets_avaialble -= timed_outs.size();
  1144. for(std::list<BufferedPacket>::iterator k = timed_outs.begin();
  1145. k != timed_outs.end(); ++k)
  1146. {
  1147. u16 peer_id = readPeerId(*(k->data));
  1148. u8 channelnum = readChannel(*(k->data));
  1149. u16 seqnum = readU16(&(k->data[BASE_HEADER_SIZE+1]));
  1150. channel.UpdateBytesLost(k->data.getSize());
  1151. k->resend_count++;
  1152. if (k-> resend_count > MAX_RELIABLE_RETRY) {
  1153. retry_count_exceeded = true;
  1154. timeouted_peers.push_back(peer->id);
  1155. /* no need to check additional packets if a single one did timeout*/
  1156. break;
  1157. }
  1158. LOG(derr_con<<m_connection->getDesc()
  1159. <<"RE-SENDING timed-out RELIABLE to "
  1160. << k->address.serializeString()
  1161. << "(t/o="<<resend_timeout<<"): "
  1162. <<"from_peer_id="<<peer_id
  1163. <<", channel="<<((int)channelnum&0xff)
  1164. <<", seqnum="<<seqnum
  1165. <<std::endl);
  1166. rawSend(*k);
  1167. // do not handle rtt here as we can't decide if this packet was
  1168. // lost or really takes more time to transmit
  1169. }
  1170. if (retry_count_exceeded) {
  1171. break; /* no need to check other channels if we already did timeout */
  1172. }
  1173. channel.UpdateTimers(dtime,dynamic_cast<UDPPeer*>(&peer)->getLegacyPeer());
  1174. }
  1175. /* skip to next peer if we did timeout */
  1176. if (retry_count_exceeded)
  1177. continue;
  1178. /* send ping if necessary */
  1179. if (dynamic_cast<UDPPeer*>(&peer)->Ping(dtime,data)) {
  1180. LOG(dout_con<<m_connection->getDesc()
  1181. <<"Sending ping for peer_id: "
  1182. << dynamic_cast<UDPPeer*>(&peer)->id <<std::endl);
  1183. /* this may fail if there ain't a sequence number left */
  1184. if (!rawSendAsPacket(dynamic_cast<UDPPeer*>(&peer)->id, 0, data, true))
  1185. {
  1186. //retrigger with reduced ping interval
  1187. dynamic_cast<UDPPeer*>(&peer)->Ping(4.0,data);
  1188. }
  1189. }
  1190. dynamic_cast<UDPPeer*>(&peer)->RunCommandQueues(m_max_packet_size,
  1191. m_max_commands_per_iteration,
  1192. m_max_packets_requeued);
  1193. }
  1194. // Remove timed out peers
  1195. for (u16 timeouted_peer : timeouted_peers) {
  1196. LOG(derr_con << m_connection->getDesc()
  1197. << "RunTimeouts(): Removing peer "<< timeouted_peer <<std::endl);
  1198. m_connection->deletePeer(timeouted_peer, true);
  1199. }
  1200. }
  1201. void ConnectionSendThread::rawSend(const BufferedPacket &packet)
  1202. {
  1203. try{
  1204. m_connection->m_udpSocket.Send(packet.address, *packet.data,
  1205. packet.data.getSize());
  1206. LOG(dout_con <<m_connection->getDesc()
  1207. << " rawSend: " << packet.data.getSize()
  1208. << " bytes sent" << std::endl);
  1209. } catch(SendFailedException &e) {
  1210. LOG(derr_con<<m_connection->getDesc()
  1211. <<"Connection::rawSend(): SendFailedException: "
  1212. <<packet.address.serializeString()<<std::endl);
  1213. }
  1214. }
  1215. void ConnectionSendThread::sendAsPacketReliable(BufferedPacket& p, Channel* channel)
  1216. {
  1217. try{
  1218. p.absolute_send_time = porting::getTimeMs();
  1219. // Buffer the packet
  1220. channel->outgoing_reliables_sent.insert(p,
  1221. (channel->readOutgoingSequenceNumber() - MAX_RELIABLE_WINDOW_SIZE)
  1222. % (MAX_RELIABLE_WINDOW_SIZE+1));
  1223. }
  1224. catch(AlreadyExistsException &e)
  1225. {
  1226. LOG(derr_con<<m_connection->getDesc()
  1227. <<"WARNING: Going to send a reliable packet"
  1228. <<" in outgoing buffer" <<std::endl);
  1229. }
  1230. // Send the packet
  1231. rawSend(p);
  1232. }
  1233. bool ConnectionSendThread::rawSendAsPacket(u16 peer_id, u8 channelnum,
  1234. SharedBuffer<u8> data, bool reliable)
  1235. {
  1236. PeerHelper peer = m_connection->getPeerNoEx(peer_id);
  1237. if (!peer) {
  1238. LOG(dout_con<<m_connection->getDesc()
  1239. <<" INFO: dropped packet for non existent peer_id: "
  1240. << peer_id << std::endl);
  1241. FATAL_ERROR_IF(!reliable, "Trying to send raw packet reliable but no peer found!");
  1242. return false;
  1243. }
  1244. Channel *channel = &(dynamic_cast<UDPPeer*>(&peer)->channels[channelnum]);
  1245. if (reliable)
  1246. {
  1247. bool have_sequence_number_for_raw_packet = true;
  1248. u16 seqnum =
  1249. channel->getOutgoingSequenceNumber(have_sequence_number_for_raw_packet);
  1250. if (!have_sequence_number_for_raw_packet)
  1251. return false;
  1252. SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum);
  1253. Address peer_address;
  1254. peer->getAddress(MTP_MINETEST_RELIABLE_UDP, peer_address);
  1255. // Add base headers and make a packet
  1256. BufferedPacket p = con::makePacket(peer_address, reliable,
  1257. m_connection->GetProtocolID(), m_connection->GetPeerID(),
  1258. channelnum);
  1259. // first check if our send window is already maxed out
  1260. if (channel->outgoing_reliables_sent.size()
  1261. < channel->getWindowSize()) {
  1262. LOG(dout_con<<m_connection->getDesc()
  1263. <<" INFO: sending a reliable packet to peer_id " << peer_id
  1264. <<" channel: " << channelnum
  1265. <<" seqnum: " << seqnum << std::endl);
  1266. sendAsPacketReliable(p,channel);
  1267. return true;
  1268. }
  1269. LOG(dout_con<<m_connection->getDesc()
  1270. <<" INFO: queueing reliable packet for peer_id: " << peer_id
  1271. <<" channel: " << channelnum
  1272. <<" seqnum: " << seqnum << std::endl);
  1273. channel->queued_reliables.push(p);
  1274. return false;
  1275. }
  1276. Address peer_address;
  1277. if (peer->getAddress(MTP_UDP, peer_address)) {
  1278. // Add base headers and make a packet
  1279. BufferedPacket p = con::makePacket(peer_address, data,
  1280. m_connection->GetProtocolID(), m_connection->GetPeerID(),
  1281. channelnum);
  1282. // Send the packet
  1283. rawSend(p);
  1284. return true;
  1285. }
  1286. LOG(dout_con << m_connection->getDesc()
  1287. << " INFO: dropped unreliable packet for peer_id: " << peer_id
  1288. << " because of (yet) missing udp address" << std::endl);
  1289. return false;
  1290. }
  1291. void ConnectionSendThread::processReliableCommand(ConnectionCommand &c)
  1292. {
  1293. assert(c.reliable); // Pre-condition
  1294. switch(c.type) {
  1295. case CONNCMD_NONE:
  1296. LOG(dout_con<<m_connection->getDesc()
  1297. <<"UDP processing reliable CONNCMD_NONE"<<std::endl);
  1298. return;
  1299. case CONNCMD_SEND:
  1300. LOG(dout_con<<m_connection->getDesc()
  1301. <<"UDP processing reliable CONNCMD_SEND"<<std::endl);
  1302. sendReliable(c);
  1303. return;
  1304. case CONNCMD_SEND_TO_ALL:
  1305. LOG(dout_con<<m_connection->getDesc()
  1306. <<"UDP processing CONNCMD_SEND_TO_ALL"<<std::endl);
  1307. sendToAllReliable(c);
  1308. return;
  1309. case CONCMD_CREATE_PEER:
  1310. LOG(dout_con<<m_connection->getDesc()
  1311. <<"UDP processing reliable CONCMD_CREATE_PEER"<<std::endl);
  1312. if (!rawSendAsPacket(c.peer_id,c.channelnum,c.data,c.reliable))
  1313. {
  1314. /* put to queue if we couldn't send it immediately */
  1315. sendReliable(c);
  1316. }
  1317. return;
  1318. case CONCMD_DISABLE_LEGACY:
  1319. LOG(dout_con<<m_connection->getDesc()
  1320. <<"UDP processing reliable CONCMD_DISABLE_LEGACY"<<std::endl);
  1321. if (!rawSendAsPacket(c.peer_id,c.channelnum,c.data,c.reliable))
  1322. {
  1323. /* put to queue if we couldn't send it immediately */
  1324. sendReliable(c);
  1325. }
  1326. return;
  1327. case CONNCMD_SERVE:
  1328. case CONNCMD_CONNECT:
  1329. case CONNCMD_DISCONNECT:
  1330. case CONCMD_ACK:
  1331. FATAL_ERROR("Got command that shouldn't be reliable as reliable command");
  1332. default:
  1333. LOG(dout_con<<m_connection->getDesc()
  1334. <<" Invalid reliable command type: " << c.type <<std::endl);
  1335. }
  1336. }
  1337. void ConnectionSendThread::processNonReliableCommand(ConnectionCommand &c)
  1338. {
  1339. assert(!c.reliable); // Pre-condition
  1340. switch(c.type) {
  1341. case CONNCMD_NONE:
  1342. LOG(dout_con<<m_connection->getDesc()
  1343. <<" UDP processing CONNCMD_NONE"<<std::endl);
  1344. return;
  1345. case CONNCMD_SERVE:
  1346. LOG(dout_con<<m_connection->getDesc()
  1347. <<" UDP processing CONNCMD_SERVE port="
  1348. <<c.address.serializeString()<<std::endl);
  1349. serve(c.address);
  1350. return;
  1351. case CONNCMD_CONNECT:
  1352. LOG(dout_con<<m_connection->getDesc()
  1353. <<" UDP processing CONNCMD_CONNECT"<<std::endl);
  1354. connect(c.address);
  1355. return;
  1356. case CONNCMD_DISCONNECT:
  1357. LOG(dout_con<<m_connection->getDesc()
  1358. <<" UDP processing CONNCMD_DISCONNECT"<<std::endl);
  1359. disconnect();
  1360. return;
  1361. case CONNCMD_DISCONNECT_PEER:
  1362. LOG(dout_con<<m_connection->getDesc()
  1363. <<" UDP processing CONNCMD_DISCONNECT_PEER"<<std::endl);
  1364. disconnect_peer(c.peer_id);
  1365. return;
  1366. case CONNCMD_SEND:
  1367. LOG(dout_con<<m_connection->getDesc()
  1368. <<" UDP processing CONNCMD_SEND"<<std::endl);
  1369. send(c.peer_id, c.channelnum, c.data);
  1370. return;
  1371. case CONNCMD_SEND_TO_ALL:
  1372. LOG(dout_con<<m_connection->getDesc()
  1373. <<" UDP processing CONNCMD_SEND_TO_ALL"<<std::endl);
  1374. sendToAll(c.channelnum, c.data);
  1375. return;
  1376. case CONCMD_ACK:
  1377. LOG(dout_con<<m_connection->getDesc()
  1378. <<" UDP processing CONCMD_ACK"<<std::endl);
  1379. sendAsPacket(c.peer_id,c.channelnum,c.data,true);
  1380. return;
  1381. case CONCMD_CREATE_PEER:
  1382. FATAL_ERROR("Got command that should be reliable as unreliable command");
  1383. default:
  1384. LOG(dout_con<<m_connection->getDesc()
  1385. <<" Invalid command type: " << c.type <<std::endl);
  1386. }
  1387. }
  1388. void ConnectionSendThread::serve(Address bind_address)
  1389. {
  1390. LOG(dout_con<<m_connection->getDesc()
  1391. <<"UDP serving at port " << bind_address.serializeString() <<std::endl);
  1392. try{
  1393. m_connection->m_udpSocket.Bind(bind_address);
  1394. m_connection->SetPeerID(PEER_ID_SERVER);
  1395. }
  1396. catch(SocketException &e) {
  1397. // Create event
  1398. ConnectionEvent ce;
  1399. ce.bindFailed();
  1400. m_connection->putEvent(ce);
  1401. }
  1402. }
  1403. void ConnectionSendThread::connect(Address address)
  1404. {
  1405. LOG(dout_con<<m_connection->getDesc()<<" connecting to "<<address.serializeString()
  1406. <<":"<<address.getPort()<<std::endl);
  1407. UDPPeer *peer = m_connection->createServerPeer(address);
  1408. // Create event
  1409. ConnectionEvent e;
  1410. e.peerAdded(peer->id, peer->address);
  1411. m_connection->putEvent(e);
  1412. Address bind_addr;
  1413. if (address.isIPv6())
  1414. bind_addr.setAddress((IPv6AddressBytes*) NULL);
  1415. else
  1416. bind_addr.setAddress(0,0,0,0);
  1417. m_connection->m_udpSocket.Bind(bind_addr);
  1418. // Send a dummy packet to server with peer_id = PEER_ID_INEXISTENT
  1419. m_connection->SetPeerID(PEER_ID_INEXISTENT);
  1420. NetworkPacket pkt(0,0);
  1421. m_connection->Send(PEER_ID_SERVER, 0, &pkt, true);
  1422. }
  1423. void ConnectionSendThread::disconnect()
  1424. {
  1425. LOG(dout_con<<m_connection->getDesc()<<" disconnecting"<<std::endl);
  1426. // Create and send DISCO packet
  1427. SharedBuffer<u8> data(2);
  1428. writeU8(&data[0], TYPE_CONTROL);
  1429. writeU8(&data[1], CONTROLTYPE_DISCO);
  1430. // Send to all
  1431. std::list<u16> peerids = m_connection->getPeerIDs();
  1432. for (u16 peerid : peerids) {
  1433. sendAsPacket(peerid, 0,data,false);
  1434. }
  1435. }
  1436. void ConnectionSendThread::disconnect_peer(u16 peer_id)
  1437. {
  1438. LOG(dout_con<<m_connection->getDesc()<<" disconnecting peer"<<std::endl);
  1439. // Create and send DISCO packet
  1440. SharedBuffer<u8> data(2);
  1441. writeU8(&data[0], TYPE_CONTROL);
  1442. writeU8(&data[1], CONTROLTYPE_DISCO);
  1443. sendAsPacket(peer_id, 0,data,false);
  1444. PeerHelper peer = m_connection->getPeerNoEx(peer_id);
  1445. if (!peer)
  1446. return;
  1447. if (dynamic_cast<UDPPeer*>(&peer) == 0)
  1448. {
  1449. return;
  1450. }
  1451. dynamic_cast<UDPPeer*>(&peer)->m_pending_disconnect = true;
  1452. }
  1453. void ConnectionSendThread::send(u16 peer_id, u8 channelnum,
  1454. SharedBuffer<u8> data)
  1455. {
  1456. assert(channelnum < CHANNEL_COUNT); // Pre-condition
  1457. PeerHelper peer = m_connection->getPeerNoEx(peer_id);
  1458. if (!peer)
  1459. {
  1460. LOG(dout_con<<m_connection->getDesc()<<" peer: peer_id="<<peer_id
  1461. << ">>>NOT<<< found on sending packet"
  1462. << ", channel " << (channelnum % 0xFF)
  1463. << ", size: " << data.getSize() <<std::endl);
  1464. return;
  1465. }
  1466. LOG(dout_con<<m_connection->getDesc()<<" sending to peer_id="<<peer_id
  1467. << ", channel " << (channelnum % 0xFF)
  1468. << ", size: " << data.getSize() <<std::endl);
  1469. u16 split_sequence_number = peer->getNextSplitSequenceNumber(channelnum);
  1470. u32 chunksize_max = m_max_packet_size - BASE_HEADER_SIZE;
  1471. std::list<SharedBuffer<u8> > originals;
  1472. originals = makeAutoSplitPacket(data, chunksize_max,split_sequence_number);
  1473. peer->setNextSplitSequenceNumber(channelnum,split_sequence_number);
  1474. for (const SharedBuffer<u8> &original : originals) {
  1475. sendAsPacket(peer_id, channelnum, original);
  1476. }
  1477. }
  1478. void ConnectionSendThread::sendReliable(ConnectionCommand &c)
  1479. {
  1480. PeerHelper peer = m_connection->getPeerNoEx(c.peer_id);
  1481. if (!peer)
  1482. return;
  1483. peer->PutReliableSendCommand(c,m_max_packet_size);
  1484. }
  1485. void ConnectionSendThread::sendToAll(u8 channelnum, SharedBuffer<u8> data)
  1486. {
  1487. std::list<u16> peerids = m_connection->getPeerIDs();
  1488. for (u16 peerid : peerids) {
  1489. send(peerid, channelnum, data);
  1490. }
  1491. }
  1492. void ConnectionSendThread::sendToAllReliable(ConnectionCommand &c)
  1493. {
  1494. std::list<u16> peerids = m_connection->getPeerIDs();
  1495. for (u16 peerid : peerids) {
  1496. PeerHelper peer = m_connection->getPeerNoEx(peerid);
  1497. if (!peer)
  1498. continue;
  1499. peer->PutReliableSendCommand(c,m_max_packet_size);
  1500. }
  1501. }
  1502. void ConnectionSendThread::sendPackets(float dtime)
  1503. {
  1504. std::list<u16> peerIds = m_connection->getPeerIDs();
  1505. std::list<u16> pendingDisconnect;
  1506. std::map<u16,bool> pending_unreliable;
  1507. for (u16 peerId : peerIds) {
  1508. PeerHelper peer = m_connection->getPeerNoEx(peerId);
  1509. //peer may have been removed
  1510. if (!peer) {
  1511. LOG(dout_con<<m_connection->getDesc()<< " Peer not found: peer_id=" << peerId
  1512. << std::endl);
  1513. continue;
  1514. }
  1515. peer->m_increment_packets_remaining = m_iteration_packets_avaialble/m_connection->m_peers.size();
  1516. UDPPeer *udpPeer = dynamic_cast<UDPPeer*>(&peer);
  1517. if (!udpPeer) {
  1518. continue;
  1519. }
  1520. if (udpPeer->m_pending_disconnect) {
  1521. pendingDisconnect.push_back(peerId);
  1522. }
  1523. PROFILE(std::stringstream peerIdentifier);
  1524. PROFILE(peerIdentifier << "sendPackets[" << m_connection->getDesc() << ";" << peerId
  1525. << ";RELIABLE]");
  1526. PROFILE(ScopeProfiler peerprofiler(g_profiler, peerIdentifier.str(), SPT_AVG));
  1527. LOG(dout_con<<m_connection->getDesc()
  1528. << " Handle per peer queues: peer_id=" << peerId
  1529. << " packet quota: " << peer->m_increment_packets_remaining << std::endl);
  1530. // first send queued reliable packets for all peers (if possible)
  1531. for (unsigned int i=0; i < CHANNEL_COUNT; i++) {
  1532. Channel &channel = udpPeer->channels[i];
  1533. u16 next_to_ack = 0;
  1534. channel.outgoing_reliables_sent.getFirstSeqnum(next_to_ack);
  1535. u16 next_to_receive = 0;
  1536. channel.incoming_reliables.getFirstSeqnum(next_to_receive);
  1537. LOG(dout_con<<m_connection->getDesc()<< "\t channel: "
  1538. << i << ", peer quota:"
  1539. << peer->m_increment_packets_remaining
  1540. << std::endl
  1541. << "\t\t\treliables on wire: "
  1542. << channel.outgoing_reliables_sent.size()
  1543. << ", waiting for ack for " << next_to_ack
  1544. << std::endl
  1545. << "\t\t\tincoming_reliables: "
  1546. << channel.incoming_reliables.size()
  1547. << ", next reliable packet: "
  1548. << channel.readNextIncomingSeqNum()
  1549. << ", next queued: " << next_to_receive
  1550. << std::endl
  1551. << "\t\t\treliables queued : "
  1552. << channel.queued_reliables.size()
  1553. << std::endl
  1554. << "\t\t\tqueued commands : "
  1555. << channel.queued_commands.size()
  1556. << std::endl);
  1557. while ((!channel.queued_reliables.empty()) &&
  1558. (channel.outgoing_reliables_sent.size()
  1559. < channel.getWindowSize())&&
  1560. (peer->m_increment_packets_remaining > 0))
  1561. {
  1562. BufferedPacket p = channel.queued_reliables.front();
  1563. channel.queued_reliables.pop();
  1564. LOG(dout_con<<m_connection->getDesc()
  1565. <<" INFO: sending a queued reliable packet "
  1566. <<" channel: " << i
  1567. <<", seqnum: " << readU16(&p.data[BASE_HEADER_SIZE+1])
  1568. << std::endl);
  1569. sendAsPacketReliable(p, &channel);
  1570. peer->m_increment_packets_remaining--;
  1571. }
  1572. }
  1573. }
  1574. if (!m_outgoing_queue.empty()) {
  1575. LOG(dout_con<<m_connection->getDesc()
  1576. << " Handle non reliable queue ("
  1577. << m_outgoing_queue.size() << " pkts)" << std::endl);
  1578. }
  1579. unsigned int initial_queuesize = m_outgoing_queue.size();
  1580. /* send non reliable packets*/
  1581. for(unsigned int i=0;i < initial_queuesize;i++) {
  1582. OutgoingPacket packet = m_outgoing_queue.front();
  1583. m_outgoing_queue.pop();
  1584. if (packet.reliable)
  1585. continue;
  1586. PeerHelper peer = m_connection->getPeerNoEx(packet.peer_id);
  1587. if (!peer) {
  1588. LOG(dout_con<<m_connection->getDesc()
  1589. <<" Outgoing queue: peer_id="<<packet.peer_id
  1590. << ">>>NOT<<< found on sending packet"
  1591. << ", channel " << (packet.channelnum % 0xFF)
  1592. << ", size: " << packet.data.getSize() <<std::endl);
  1593. continue;
  1594. }
  1595. /* send acks immediately */
  1596. if (packet.ack) {
  1597. rawSendAsPacket(packet.peer_id, packet.channelnum,
  1598. packet.data, packet.reliable);
  1599. peer->m_increment_packets_remaining =
  1600. MYMIN(0,peer->m_increment_packets_remaining--);
  1601. }
  1602. else if (
  1603. ( peer->m_increment_packets_remaining > 0) ||
  1604. (stopRequested())) {
  1605. rawSendAsPacket(packet.peer_id, packet.channelnum,
  1606. packet.data, packet.reliable);
  1607. peer->m_increment_packets_remaining--;
  1608. }
  1609. else {
  1610. m_outgoing_queue.push(packet);
  1611. pending_unreliable[packet.peer_id] = true;
  1612. }
  1613. }
  1614. for (u16 peerId : pendingDisconnect) {
  1615. if (!pending_unreliable[peerId])
  1616. {
  1617. m_connection->deletePeer(peerId,false);
  1618. }
  1619. }
  1620. }
  1621. void ConnectionSendThread::sendAsPacket(u16 peer_id, u8 channelnum,
  1622. SharedBuffer<u8> data, bool ack)
  1623. {
  1624. OutgoingPacket packet(peer_id, channelnum, data, false, ack);
  1625. m_outgoing_queue.push(packet);
  1626. }
  1627. ConnectionReceiveThread::ConnectionReceiveThread(unsigned int max_packet_size) :
  1628. Thread("ConnectionReceive")
  1629. {
  1630. }
  1631. void * ConnectionReceiveThread::run()
  1632. {
  1633. assert(m_connection);
  1634. LOG(dout_con<<m_connection->getDesc()
  1635. <<"ConnectionReceive thread started"<<std::endl);
  1636. PROFILE(std::stringstream ThreadIdentifier);
  1637. PROFILE(ThreadIdentifier << "ConnectionReceive: [" << m_connection->getDesc() << "]");
  1638. #ifdef DEBUG_CONNECTION_KBPS
  1639. u64 curtime = porting::getTimeMs();
  1640. u64 lasttime = curtime;
  1641. float debug_print_timer = 0.0;
  1642. #endif
  1643. while(!stopRequested()) {
  1644. BEGIN_DEBUG_EXCEPTION_HANDLER
  1645. PROFILE(ScopeProfiler sp(g_profiler, ThreadIdentifier.str(), SPT_AVG));
  1646. #ifdef DEBUG_CONNECTION_KBPS
  1647. lasttime = curtime;
  1648. curtime = porting::getTimeMs();
  1649. float dtime = CALC_DTIME(lasttime,curtime);
  1650. #endif
  1651. /* receive packets */
  1652. receive();
  1653. #ifdef DEBUG_CONNECTION_KBPS
  1654. debug_print_timer += dtime;
  1655. if (debug_print_timer > 20.0) {
  1656. debug_print_timer -= 20.0;
  1657. std::list<u16> peerids = m_connection->getPeerIDs();
  1658. for (std::list<u16>::iterator i = peerids.begin();
  1659. i != peerids.end();
  1660. i++)
  1661. {
  1662. PeerHelper peer = m_connection->getPeerNoEx(*i);
  1663. if (!peer)
  1664. continue;
  1665. float peer_current = 0.0;
  1666. float peer_loss = 0.0;
  1667. float avg_rate = 0.0;
  1668. float avg_loss = 0.0;
  1669. for(u16 j=0; j<CHANNEL_COUNT; j++)
  1670. {
  1671. peer_current +=peer->channels[j].getCurrentDownloadRateKB();
  1672. peer_loss += peer->channels[j].getCurrentLossRateKB();
  1673. avg_rate += peer->channels[j].getAvgDownloadRateKB();
  1674. avg_loss += peer->channels[j].getAvgLossRateKB();
  1675. }
  1676. std::stringstream output;
  1677. output << std::fixed << std::setprecision(1);
  1678. output << "OUT to Peer " << *i << " RATES (good / loss) " << std::endl;
  1679. output << "\tcurrent (sum): " << peer_current << "kb/s "<< peer_loss << "kb/s" << std::endl;
  1680. output << "\taverage (sum): " << avg_rate << "kb/s "<< avg_loss << "kb/s" << std::endl;
  1681. output << std::setfill(' ');
  1682. for(u16 j=0; j<CHANNEL_COUNT; j++)
  1683. {
  1684. output << "\tcha " << j << ":"
  1685. << " CUR: " << std::setw(6) << peer->channels[j].getCurrentDownloadRateKB() <<"kb/s"
  1686. << " AVG: " << std::setw(6) << peer->channels[j].getAvgDownloadRateKB() <<"kb/s"
  1687. << " MAX: " << std::setw(6) << peer->channels[j].getMaxDownloadRateKB() <<"kb/s"
  1688. << " /"
  1689. << " CUR: " << std::setw(6) << peer->channels[j].getCurrentLossRateKB() <<"kb/s"
  1690. << " AVG: " << std::setw(6) << peer->channels[j].getAvgLossRateKB() <<"kb/s"
  1691. << " MAX: " << std::setw(6) << peer->channels[j].getMaxLossRateKB() <<"kb/s"
  1692. << " / WS: " << peer->channels[j].getWindowSize()
  1693. << std::endl;
  1694. }
  1695. fprintf(stderr,"%s\n",output.str().c_str());
  1696. }
  1697. }
  1698. #endif
  1699. END_DEBUG_EXCEPTION_HANDLER
  1700. }
  1701. PROFILE(g_profiler->remove(ThreadIdentifier.str()));
  1702. return NULL;
  1703. }
  1704. // Receive packets from the network and buffers and create ConnectionEvents
  1705. void ConnectionReceiveThread::receive()
  1706. {
  1707. // use IPv6 minimum allowed MTU as receive buffer size as this is
  1708. // theoretical reliable upper boundary of a udp packet for all IPv6 enabled
  1709. // infrastructure
  1710. unsigned int packet_maxsize = 1500;
  1711. SharedBuffer<u8> packetdata(packet_maxsize);
  1712. bool packet_queued = true;
  1713. unsigned int loop_count = 0;
  1714. /* first of all read packets from socket */
  1715. /* check for incoming data available */
  1716. while( (loop_count < 10) &&
  1717. (m_connection->m_udpSocket.WaitData(50))) {
  1718. loop_count++;
  1719. try {
  1720. if (packet_queued) {
  1721. bool data_left = true;
  1722. u16 peer_id;
  1723. SharedBuffer<u8> resultdata;
  1724. while(data_left) {
  1725. try {
  1726. data_left = getFromBuffers(peer_id, resultdata);
  1727. if (data_left) {
  1728. ConnectionEvent e;
  1729. e.dataReceived(peer_id, resultdata);
  1730. m_connection->putEvent(e);
  1731. }
  1732. }
  1733. catch(ProcessedSilentlyException &e) {
  1734. /* try reading again */
  1735. }
  1736. }
  1737. packet_queued = false;
  1738. }
  1739. Address sender;
  1740. s32 received_size = m_connection->m_udpSocket.Receive(sender, *packetdata, packet_maxsize);
  1741. if ((received_size < BASE_HEADER_SIZE) ||
  1742. (readU32(&packetdata[0]) != m_connection->GetProtocolID()))
  1743. {
  1744. LOG(derr_con<<m_connection->getDesc()
  1745. <<"Receive(): Invalid incoming packet, "
  1746. <<"size: " << received_size
  1747. <<", protocol: "
  1748. << ((received_size >= 4) ? readU32(&packetdata[0]) : -1)
  1749. << std::endl);
  1750. continue;
  1751. }
  1752. u16 peer_id = readPeerId(*packetdata);
  1753. u8 channelnum = readChannel(*packetdata);
  1754. if (channelnum > CHANNEL_COUNT-1) {
  1755. LOG(derr_con<<m_connection->getDesc()
  1756. <<"Receive(): Invalid channel "<<channelnum<<std::endl);
  1757. throw InvalidIncomingDataException("Channel doesn't exist");
  1758. }
  1759. /* Try to identify peer by sender address (may happen on join) */
  1760. if (peer_id == PEER_ID_INEXISTENT) {
  1761. peer_id = m_connection->lookupPeer(sender);
  1762. // We do not have to remind the peer of its
  1763. // peer id as the CONTROLTYPE_SET_PEER_ID
  1764. // command was sent reliably.
  1765. }
  1766. /* The peer was not found in our lists. Add it. */
  1767. if (peer_id == PEER_ID_INEXISTENT) {
  1768. peer_id = m_connection->createPeer(sender, MTP_MINETEST_RELIABLE_UDP, 0);
  1769. }
  1770. PeerHelper peer = m_connection->getPeerNoEx(peer_id);
  1771. if (!peer) {
  1772. LOG(dout_con<<m_connection->getDesc()
  1773. <<" got packet from unknown peer_id: "
  1774. <<peer_id<<" Ignoring."<<std::endl);
  1775. continue;
  1776. }
  1777. // Validate peer address
  1778. Address peer_address;
  1779. if (peer->getAddress(MTP_UDP, peer_address)) {
  1780. if (peer_address != sender) {
  1781. LOG(derr_con<<m_connection->getDesc()
  1782. <<m_connection->getDesc()
  1783. <<" Peer "<<peer_id<<" sending from different address."
  1784. " Ignoring."<<std::endl);
  1785. continue;
  1786. }
  1787. }
  1788. else {
  1789. bool invalid_address = true;
  1790. if (invalid_address) {
  1791. LOG(derr_con<<m_connection->getDesc()
  1792. <<m_connection->getDesc()
  1793. <<" Peer "<<peer_id<<" unknown."
  1794. " Ignoring."<<std::endl);
  1795. continue;
  1796. }
  1797. }
  1798. peer->ResetTimeout();
  1799. Channel *channel = 0;
  1800. if (dynamic_cast<UDPPeer*>(&peer) != 0)
  1801. {
  1802. channel = &(dynamic_cast<UDPPeer*>(&peer)->channels[channelnum]);
  1803. }
  1804. if (channel != 0) {
  1805. channel->UpdateBytesReceived(received_size);
  1806. }
  1807. // Throw the received packet to channel->processPacket()
  1808. // Make a new SharedBuffer from the data without the base headers
  1809. SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
  1810. memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
  1811. strippeddata.getSize());
  1812. try{
  1813. // Process it (the result is some data with no headers made by us)
  1814. SharedBuffer<u8> resultdata = processPacket
  1815. (channel, strippeddata, peer_id, channelnum, false);
  1816. LOG(dout_con<<m_connection->getDesc()
  1817. <<" ProcessPacket from peer_id: " << peer_id
  1818. << ",channel: " << (channelnum & 0xFF) << ", returned "
  1819. << resultdata.getSize() << " bytes" <<std::endl);
  1820. ConnectionEvent e;
  1821. e.dataReceived(peer_id, resultdata);
  1822. m_connection->putEvent(e);
  1823. }
  1824. catch(ProcessedSilentlyException &e) {
  1825. }
  1826. catch(ProcessedQueued &e) {
  1827. packet_queued = true;
  1828. }
  1829. }
  1830. catch(InvalidIncomingDataException &e) {
  1831. }
  1832. catch(ProcessedSilentlyException &e) {
  1833. }
  1834. }
  1835. }
  1836. bool ConnectionReceiveThread::getFromBuffers(u16 &peer_id, SharedBuffer<u8> &dst)
  1837. {
  1838. std::list<u16> peerids = m_connection->getPeerIDs();
  1839. for (u16 peerid : peerids) {
  1840. PeerHelper peer = m_connection->getPeerNoEx(peerid);
  1841. if (!peer)
  1842. continue;
  1843. if (dynamic_cast<UDPPeer*>(&peer) == 0)
  1844. continue;
  1845. for (Channel &channel : (dynamic_cast<UDPPeer *>(&peer))->channels) {
  1846. if (checkIncomingBuffers(&channel, peer_id, dst)) {
  1847. return true;
  1848. }
  1849. }
  1850. }
  1851. return false;
  1852. }
  1853. bool ConnectionReceiveThread::checkIncomingBuffers(Channel *channel,
  1854. u16 &peer_id, SharedBuffer<u8> &dst)
  1855. {
  1856. u16 firstseqnum = 0;
  1857. if (channel->incoming_reliables.getFirstSeqnum(firstseqnum))
  1858. {
  1859. if (firstseqnum == channel->readNextIncomingSeqNum())
  1860. {
  1861. BufferedPacket p = channel->incoming_reliables.popFirst();
  1862. peer_id = readPeerId(*p.data);
  1863. u8 channelnum = readChannel(*p.data);
  1864. u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
  1865. LOG(dout_con<<m_connection->getDesc()
  1866. <<"UNBUFFERING TYPE_RELIABLE"
  1867. <<" seqnum="<<seqnum
  1868. <<" peer_id="<<peer_id
  1869. <<" channel="<<((int)channelnum&0xff)
  1870. <<std::endl);
  1871. channel->incNextIncomingSeqNum();
  1872. u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
  1873. // Get out the inside packet and re-process it
  1874. SharedBuffer<u8> payload(p.data.getSize() - headers_size);
  1875. memcpy(*payload, &p.data[headers_size], payload.getSize());
  1876. dst = processPacket(channel, payload, peer_id, channelnum, true);
  1877. return true;
  1878. }
  1879. }
  1880. return false;
  1881. }
  1882. SharedBuffer<u8> ConnectionReceiveThread::processPacket(Channel *channel,
  1883. SharedBuffer<u8> packetdata, u16 peer_id, u8 channelnum, bool reliable)
  1884. {
  1885. PeerHelper peer = m_connection->getPeerNoEx(peer_id);
  1886. if (!peer) {
  1887. errorstream << "Peer not found (possible timeout)" << std::endl;
  1888. throw ProcessedSilentlyException("Peer not found (possible timeout)");
  1889. }
  1890. if (packetdata.getSize() < 1)
  1891. throw InvalidIncomingDataException("packetdata.getSize() < 1");
  1892. u8 type = readU8(&(packetdata[0]));
  1893. if (MAX_UDP_PEERS <= 65535 && peer_id >= MAX_UDP_PEERS) {
  1894. std::string errmsg = "Invalid peer_id=" + itos(peer_id);
  1895. errorstream << errmsg << std::endl;
  1896. throw InvalidIncomingDataException(errmsg.c_str());
  1897. }
  1898. if (type == TYPE_CONTROL)
  1899. {
  1900. if (packetdata.getSize() < 2)
  1901. throw InvalidIncomingDataException("packetdata.getSize() < 2");
  1902. u8 controltype = readU8(&(packetdata[1]));
  1903. if (controltype == CONTROLTYPE_ACK)
  1904. {
  1905. assert(channel != NULL);
  1906. if (packetdata.getSize() < 4) {
  1907. throw InvalidIncomingDataException(
  1908. "packetdata.getSize() < 4 (ACK header size)");
  1909. }
  1910. u16 seqnum = readU16(&packetdata[2]);
  1911. LOG(dout_con<<m_connection->getDesc()
  1912. <<" [ CONTROLTYPE_ACK: channelnum="
  1913. <<((int)channelnum&0xff)<<", peer_id="<<peer_id
  1914. <<", seqnum="<<seqnum<< " ]"<<std::endl);
  1915. try{
  1916. BufferedPacket p =
  1917. channel->outgoing_reliables_sent.popSeqnum(seqnum);
  1918. // only calculate rtt from straight sent packets
  1919. if (p.resend_count == 0) {
  1920. // Get round trip time
  1921. u64 current_time = porting::getTimeMs();
  1922. // a overflow is quite unlikely but as it'd result in major
  1923. // rtt miscalculation we handle it here
  1924. if (current_time > p.absolute_send_time)
  1925. {
  1926. float rtt = (current_time - p.absolute_send_time) / 1000.0;
  1927. // Let peer calculate stuff according to it
  1928. // (avg_rtt and resend_timeout)
  1929. dynamic_cast<UDPPeer*>(&peer)->reportRTT(rtt);
  1930. }
  1931. else if (p.totaltime > 0)
  1932. {
  1933. float rtt = p.totaltime;
  1934. // Let peer calculate stuff according to it
  1935. // (avg_rtt and resend_timeout)
  1936. dynamic_cast<UDPPeer*>(&peer)->reportRTT(rtt);
  1937. }
  1938. }
  1939. //put bytes for max bandwidth calculation
  1940. channel->UpdateBytesSent(p.data.getSize(),1);
  1941. if (channel->outgoing_reliables_sent.size() == 0)
  1942. {
  1943. m_connection->TriggerSend();
  1944. }
  1945. }
  1946. catch(NotFoundException &e) {
  1947. LOG(derr_con<<m_connection->getDesc()
  1948. <<"WARNING: ACKed packet not "
  1949. "in outgoing queue"
  1950. <<std::endl);
  1951. channel->UpdatePacketTooLateCounter();
  1952. }
  1953. throw ProcessedSilentlyException("Got an ACK");
  1954. }
  1955. else if (controltype == CONTROLTYPE_SET_PEER_ID) {
  1956. // Got a packet to set our peer id
  1957. if (packetdata.getSize() < 4)
  1958. throw InvalidIncomingDataException
  1959. ("packetdata.getSize() < 4 (SET_PEER_ID header size)");
  1960. u16 peer_id_new = readU16(&packetdata[2]);
  1961. LOG(dout_con<<m_connection->getDesc()
  1962. <<"Got new peer id: "<<peer_id_new<<"... "<<std::endl);
  1963. if (m_connection->GetPeerID() != PEER_ID_INEXISTENT)
  1964. {
  1965. LOG(derr_con<<m_connection->getDesc()
  1966. <<"WARNING: Not changing"
  1967. " existing peer id."<<std::endl);
  1968. }
  1969. else
  1970. {
  1971. LOG(dout_con<<m_connection->getDesc()<<"changing own peer id"<<std::endl);
  1972. m_connection->SetPeerID(peer_id_new);
  1973. }
  1974. ConnectionCommand cmd;
  1975. SharedBuffer<u8> reply(2);
  1976. writeU8(&reply[0], TYPE_CONTROL);
  1977. writeU8(&reply[1], CONTROLTYPE_ENABLE_BIG_SEND_WINDOW);
  1978. cmd.disableLegacy(PEER_ID_SERVER,reply);
  1979. m_connection->putCommand(cmd);
  1980. throw ProcessedSilentlyException("Got a SET_PEER_ID");
  1981. }
  1982. else if (controltype == CONTROLTYPE_PING)
  1983. {
  1984. // Just ignore it, the incoming data already reset
  1985. // the timeout counter
  1986. LOG(dout_con<<m_connection->getDesc()<<"PING"<<std::endl);
  1987. throw ProcessedSilentlyException("Got a PING");
  1988. }
  1989. else if (controltype == CONTROLTYPE_DISCO)
  1990. {
  1991. // Just ignore it, the incoming data already reset
  1992. // the timeout counter
  1993. LOG(dout_con<<m_connection->getDesc()
  1994. <<"DISCO: Removing peer "<<(peer_id)<<std::endl);
  1995. if (!m_connection->deletePeer(peer_id, false)) {
  1996. derr_con<<m_connection->getDesc()
  1997. <<"DISCO: Peer not found"<<std::endl;
  1998. }
  1999. throw ProcessedSilentlyException("Got a DISCO");
  2000. }
  2001. else if (controltype == CONTROLTYPE_ENABLE_BIG_SEND_WINDOW)
  2002. {
  2003. dynamic_cast<UDPPeer*>(&peer)->setNonLegacyPeer();
  2004. throw ProcessedSilentlyException("Got non legacy control");
  2005. }
  2006. else{
  2007. LOG(derr_con<<m_connection->getDesc()
  2008. <<"INVALID TYPE_CONTROL: invalid controltype="
  2009. <<((int)controltype&0xff)<<std::endl);
  2010. throw InvalidIncomingDataException("Invalid control type");
  2011. }
  2012. }
  2013. else if (type == TYPE_ORIGINAL)
  2014. {
  2015. if (packetdata.getSize() <= ORIGINAL_HEADER_SIZE)
  2016. throw InvalidIncomingDataException
  2017. ("packetdata.getSize() <= ORIGINAL_HEADER_SIZE");
  2018. LOG(dout_con<<m_connection->getDesc()
  2019. <<"RETURNING TYPE_ORIGINAL to user"
  2020. <<std::endl);
  2021. // Get the inside packet out and return it
  2022. SharedBuffer<u8> payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE);
  2023. memcpy(*payload, &(packetdata[ORIGINAL_HEADER_SIZE]), payload.getSize());
  2024. return payload;
  2025. }
  2026. else if (type == TYPE_SPLIT)
  2027. {
  2028. Address peer_address;
  2029. if (peer->getAddress(MTP_UDP, peer_address)) {
  2030. // We have to create a packet again for buffering
  2031. // This isn't actually too bad an idea.
  2032. BufferedPacket packet = makePacket(
  2033. peer_address,
  2034. packetdata,
  2035. m_connection->GetProtocolID(),
  2036. peer_id,
  2037. channelnum);
  2038. // Buffer the packet
  2039. SharedBuffer<u8> data =
  2040. peer->addSpiltPacket(channelnum,packet,reliable);
  2041. if (data.getSize() != 0)
  2042. {
  2043. LOG(dout_con<<m_connection->getDesc()
  2044. <<"RETURNING TYPE_SPLIT: Constructed full data, "
  2045. <<"size="<<data.getSize()<<std::endl);
  2046. return data;
  2047. }
  2048. LOG(dout_con<<m_connection->getDesc()<<"BUFFERED TYPE_SPLIT"<<std::endl);
  2049. throw ProcessedSilentlyException("Buffered a split packet chunk");
  2050. }
  2051. else {
  2052. //TODO throw some error
  2053. }
  2054. }
  2055. else if (type == TYPE_RELIABLE)
  2056. {
  2057. assert(channel != NULL);
  2058. // Recursive reliable packets not allowed
  2059. if (reliable)
  2060. throw InvalidIncomingDataException("Found nested reliable packets");
  2061. if (packetdata.getSize() < RELIABLE_HEADER_SIZE)
  2062. throw InvalidIncomingDataException
  2063. ("packetdata.getSize() < RELIABLE_HEADER_SIZE");
  2064. u16 seqnum = readU16(&packetdata[1]);
  2065. bool is_future_packet = false;
  2066. bool is_old_packet = false;
  2067. /* packet is within our receive window send ack */
  2068. if (seqnum_in_window(seqnum, channel->readNextIncomingSeqNum(),MAX_RELIABLE_WINDOW_SIZE))
  2069. {
  2070. m_connection->sendAck(peer_id,channelnum,seqnum);
  2071. }
  2072. else {
  2073. is_future_packet = seqnum_higher(seqnum, channel->readNextIncomingSeqNum());
  2074. is_old_packet = seqnum_higher(channel->readNextIncomingSeqNum(), seqnum);
  2075. /* packet is not within receive window, don't send ack. *
  2076. * if this was a valid packet it's gonna be retransmitted */
  2077. if (is_future_packet)
  2078. {
  2079. throw ProcessedSilentlyException("Received packet newer then expected, not sending ack");
  2080. }
  2081. /* seems like our ack was lost, send another one for a old packet */
  2082. if (is_old_packet)
  2083. {
  2084. LOG(dout_con<<m_connection->getDesc()
  2085. << "RE-SENDING ACK: peer_id: " << peer_id
  2086. << ", channel: " << (channelnum&0xFF)
  2087. << ", seqnum: " << seqnum << std::endl;)
  2088. m_connection->sendAck(peer_id,channelnum,seqnum);
  2089. // we already have this packet so this one was on wire at least
  2090. // the current timeout
  2091. // we don't know how long this packet was on wire don't do silly guessing
  2092. // dynamic_cast<UDPPeer*>(&peer)->reportRTT(dynamic_cast<UDPPeer*>(&peer)->getResendTimeout());
  2093. throw ProcessedSilentlyException("Retransmitting ack for old packet");
  2094. }
  2095. }
  2096. if (seqnum != channel->readNextIncomingSeqNum())
  2097. {
  2098. Address peer_address;
  2099. // this is a reliable packet so we have a udp address for sure
  2100. peer->getAddress(MTP_MINETEST_RELIABLE_UDP, peer_address);
  2101. // This one comes later, buffer it.
  2102. // Actually we have to make a packet to buffer one.
  2103. // Well, we have all the ingredients, so just do it.
  2104. BufferedPacket packet = con::makePacket(
  2105. peer_address,
  2106. packetdata,
  2107. m_connection->GetProtocolID(),
  2108. peer_id,
  2109. channelnum);
  2110. try{
  2111. channel->incoming_reliables.insert(packet,channel->readNextIncomingSeqNum());
  2112. LOG(dout_con<<m_connection->getDesc()
  2113. << "BUFFERING, TYPE_RELIABLE peer_id: " << peer_id
  2114. << ", channel: " << (channelnum&0xFF)
  2115. << ", seqnum: " << seqnum << std::endl;)
  2116. throw ProcessedQueued("Buffered future reliable packet");
  2117. }
  2118. catch(AlreadyExistsException &e)
  2119. {
  2120. }
  2121. catch(IncomingDataCorruption &e)
  2122. {
  2123. ConnectionCommand discon;
  2124. discon.disconnect_peer(peer_id);
  2125. m_connection->putCommand(discon);
  2126. LOG(derr_con<<m_connection->getDesc()
  2127. << "INVALID, TYPE_RELIABLE peer_id: " << peer_id
  2128. << ", channel: " << (channelnum&0xFF)
  2129. << ", seqnum: " << seqnum
  2130. << "DROPPING CLIENT!" << std::endl;)
  2131. }
  2132. }
  2133. /* we got a packet to process right now */
  2134. LOG(dout_con<<m_connection->getDesc()
  2135. << "RECURSIVE, TYPE_RELIABLE peer_id: " << peer_id
  2136. << ", channel: " << (channelnum&0xFF)
  2137. << ", seqnum: " << seqnum << std::endl;)
  2138. /* check for resend case */
  2139. u16 queued_seqnum = 0;
  2140. if (channel->incoming_reliables.getFirstSeqnum(queued_seqnum))
  2141. {
  2142. if (queued_seqnum == seqnum)
  2143. {
  2144. BufferedPacket queued_packet = channel->incoming_reliables.popFirst();
  2145. /** TODO find a way to verify the new against the old packet */
  2146. }
  2147. }
  2148. channel->incNextIncomingSeqNum();
  2149. // Get out the inside packet and re-process it
  2150. SharedBuffer<u8> payload(packetdata.getSize() - RELIABLE_HEADER_SIZE);
  2151. memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize());
  2152. return processPacket(channel, payload, peer_id, channelnum, true);
  2153. }
  2154. else
  2155. {
  2156. derr_con<<m_connection->getDesc()
  2157. <<"Got invalid type="<<((int)type&0xff)<<std::endl;
  2158. throw InvalidIncomingDataException("Invalid packet type");
  2159. }
  2160. // We should never get here.
  2161. FATAL_ERROR("Invalid execution point");
  2162. }
  2163. /*
  2164. Connection
  2165. */
  2166. Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout,
  2167. bool ipv6, PeerHandler *peerhandler) :
  2168. m_udpSocket(ipv6),
  2169. m_protocol_id(protocol_id),
  2170. m_sendThread(max_packet_size, timeout),
  2171. m_receiveThread(max_packet_size),
  2172. m_bc_peerhandler(peerhandler)
  2173. {
  2174. m_udpSocket.setTimeoutMs(5);
  2175. m_sendThread.setParent(this);
  2176. m_receiveThread.setParent(this);
  2177. m_sendThread.start();
  2178. m_receiveThread.start();
  2179. }
  2180. Connection::~Connection()
  2181. {
  2182. m_shutting_down = true;
  2183. // request threads to stop
  2184. m_sendThread.stop();
  2185. m_receiveThread.stop();
  2186. //TODO for some unkonwn reason send/receive threads do not exit as they're
  2187. // supposed to be but wait on peer timeout. To speed up shutdown we reduce
  2188. // timeout to half a second.
  2189. m_sendThread.setPeerTimeout(0.5);
  2190. // wait for threads to finish
  2191. m_sendThread.wait();
  2192. m_receiveThread.wait();
  2193. // Delete peers
  2194. for (auto &peer : m_peers) {
  2195. delete peer.second;
  2196. }
  2197. }
  2198. /* Internal stuff */
  2199. void Connection::putEvent(ConnectionEvent &e)
  2200. {
  2201. assert(e.type != CONNEVENT_NONE); // Pre-condition
  2202. m_event_queue.push_back(e);
  2203. }
  2204. PeerHelper Connection::getPeer(u16 peer_id)
  2205. {
  2206. MutexAutoLock peerlock(m_peers_mutex);
  2207. std::map<u16, Peer*>::iterator node = m_peers.find(peer_id);
  2208. if (node == m_peers.end()) {
  2209. throw PeerNotFoundException("GetPeer: Peer not found (possible timeout)");
  2210. }
  2211. // Error checking
  2212. FATAL_ERROR_IF(node->second->id != peer_id, "Invalid peer id");
  2213. return PeerHelper(node->second);
  2214. }
  2215. PeerHelper Connection::getPeerNoEx(u16 peer_id)
  2216. {
  2217. MutexAutoLock peerlock(m_peers_mutex);
  2218. std::map<u16, Peer*>::iterator node = m_peers.find(peer_id);
  2219. if (node == m_peers.end()) {
  2220. return PeerHelper(NULL);
  2221. }
  2222. // Error checking
  2223. FATAL_ERROR_IF(node->second->id != peer_id, "Invalid peer id");
  2224. return PeerHelper(node->second);
  2225. }
  2226. /* find peer_id for address */
  2227. u16 Connection::lookupPeer(Address& sender)
  2228. {
  2229. MutexAutoLock peerlock(m_peers_mutex);
  2230. std::map<u16, Peer*>::iterator j;
  2231. j = m_peers.begin();
  2232. for(; j != m_peers.end(); ++j)
  2233. {
  2234. Peer *peer = j->second;
  2235. if (peer->isPendingDeletion())
  2236. continue;
  2237. Address tocheck;
  2238. if ((peer->getAddress(MTP_MINETEST_RELIABLE_UDP, tocheck)) && (tocheck == sender))
  2239. return peer->id;
  2240. if ((peer->getAddress(MTP_UDP, tocheck)) && (tocheck == sender))
  2241. return peer->id;
  2242. }
  2243. return PEER_ID_INEXISTENT;
  2244. }
  2245. std::list<Peer*> Connection::getPeers()
  2246. {
  2247. std::list<Peer*> list;
  2248. for (auto &p : m_peers) {
  2249. Peer *peer = p.second;
  2250. list.push_back(peer);
  2251. }
  2252. return list;
  2253. }
  2254. bool Connection::deletePeer(u16 peer_id, bool timeout)
  2255. {
  2256. Peer *peer = 0;
  2257. /* lock list as short as possible */
  2258. {
  2259. MutexAutoLock peerlock(m_peers_mutex);
  2260. if (m_peers.find(peer_id) == m_peers.end())
  2261. return false;
  2262. peer = m_peers[peer_id];
  2263. m_peers.erase(peer_id);
  2264. m_peer_ids.remove(peer_id);
  2265. }
  2266. Address peer_address;
  2267. //any peer has a primary address this never fails!
  2268. peer->getAddress(MTP_PRIMARY, peer_address);
  2269. // Create event
  2270. ConnectionEvent e;
  2271. e.peerRemoved(peer_id, timeout, peer_address);
  2272. putEvent(e);
  2273. peer->Drop();
  2274. return true;
  2275. }
  2276. /* Interface */
  2277. ConnectionEvent Connection::waitEvent(u32 timeout_ms)
  2278. {
  2279. try {
  2280. return m_event_queue.pop_front(timeout_ms);
  2281. } catch(ItemNotFoundException &ex) {
  2282. ConnectionEvent e;
  2283. e.type = CONNEVENT_NONE;
  2284. return e;
  2285. }
  2286. }
  2287. void Connection::putCommand(ConnectionCommand &c)
  2288. {
  2289. if (!m_shutting_down) {
  2290. m_command_queue.push_back(c);
  2291. m_sendThread.Trigger();
  2292. }
  2293. }
  2294. void Connection::Serve(Address bind_addr)
  2295. {
  2296. ConnectionCommand c;
  2297. c.serve(bind_addr);
  2298. putCommand(c);
  2299. }
  2300. void Connection::Connect(Address address)
  2301. {
  2302. ConnectionCommand c;
  2303. c.connect(address);
  2304. putCommand(c);
  2305. }
  2306. bool Connection::Connected()
  2307. {
  2308. MutexAutoLock peerlock(m_peers_mutex);
  2309. if (m_peers.size() != 1)
  2310. return false;
  2311. std::map<u16, Peer*>::iterator node = m_peers.find(PEER_ID_SERVER);
  2312. if (node == m_peers.end())
  2313. return false;
  2314. if (m_peer_id == PEER_ID_INEXISTENT)
  2315. return false;
  2316. return true;
  2317. }
  2318. void Connection::Disconnect()
  2319. {
  2320. ConnectionCommand c;
  2321. c.disconnect();
  2322. putCommand(c);
  2323. }
  2324. void Connection::Receive(NetworkPacket* pkt)
  2325. {
  2326. for(;;) {
  2327. ConnectionEvent e = waitEvent(m_bc_receive_timeout);
  2328. if (e.type != CONNEVENT_NONE)
  2329. LOG(dout_con << getDesc() << ": Receive: got event: "
  2330. << e.describe() << std::endl);
  2331. switch(e.type) {
  2332. case CONNEVENT_NONE:
  2333. throw NoIncomingDataException("No incoming data");
  2334. case CONNEVENT_DATA_RECEIVED:
  2335. // Data size is lesser than command size, ignoring packet
  2336. if (e.data.getSize() < 2) {
  2337. continue;
  2338. }
  2339. pkt->putRawPacket(*e.data, e.data.getSize(), e.peer_id);
  2340. return;
  2341. case CONNEVENT_PEER_ADDED: {
  2342. UDPPeer tmp(e.peer_id, e.address, this);
  2343. if (m_bc_peerhandler)
  2344. m_bc_peerhandler->peerAdded(&tmp);
  2345. continue;
  2346. }
  2347. case CONNEVENT_PEER_REMOVED: {
  2348. UDPPeer tmp(e.peer_id, e.address, this);
  2349. if (m_bc_peerhandler)
  2350. m_bc_peerhandler->deletingPeer(&tmp, e.timeout);
  2351. continue;
  2352. }
  2353. case CONNEVENT_BIND_FAILED:
  2354. throw ConnectionBindFailed("Failed to bind socket "
  2355. "(port already in use?)");
  2356. }
  2357. }
  2358. throw NoIncomingDataException("No incoming data");
  2359. }
  2360. void Connection::Send(u16 peer_id, u8 channelnum,
  2361. NetworkPacket* pkt, bool reliable)
  2362. {
  2363. assert(channelnum < CHANNEL_COUNT); // Pre-condition
  2364. ConnectionCommand c;
  2365. c.send(peer_id, channelnum, pkt, reliable);
  2366. putCommand(c);
  2367. }
  2368. Address Connection::GetPeerAddress(u16 peer_id)
  2369. {
  2370. PeerHelper peer = getPeerNoEx(peer_id);
  2371. if (!peer)
  2372. throw PeerNotFoundException("No address for peer found!");
  2373. Address peer_address;
  2374. peer->getAddress(MTP_PRIMARY, peer_address);
  2375. return peer_address;
  2376. }
  2377. float Connection::getPeerStat(u16 peer_id, rtt_stat_type type)
  2378. {
  2379. PeerHelper peer = getPeerNoEx(peer_id);
  2380. if (!peer) return -1;
  2381. return peer->getStat(type);
  2382. }
  2383. float Connection::getLocalStat(rate_stat_type type)
  2384. {
  2385. PeerHelper peer = getPeerNoEx(PEER_ID_SERVER);
  2386. FATAL_ERROR_IF(!peer, "Connection::getLocalStat we couldn't get our own peer? are you serious???");
  2387. float retval = 0.0;
  2388. for (Channel &channel : dynamic_cast<UDPPeer *>(&peer)->channels) {
  2389. switch(type) {
  2390. case CUR_DL_RATE:
  2391. retval += channel.getCurrentDownloadRateKB();
  2392. break;
  2393. case AVG_DL_RATE:
  2394. retval += channel.getAvgDownloadRateKB();
  2395. break;
  2396. case CUR_INC_RATE:
  2397. retval += channel.getCurrentIncomingRateKB();
  2398. break;
  2399. case AVG_INC_RATE:
  2400. retval += channel.getAvgIncomingRateKB();
  2401. break;
  2402. case AVG_LOSS_RATE:
  2403. retval += channel.getAvgLossRateKB();
  2404. break;
  2405. case CUR_LOSS_RATE:
  2406. retval += channel.getCurrentLossRateKB();
  2407. break;
  2408. default:
  2409. FATAL_ERROR("Connection::getLocalStat Invalid stat type");
  2410. }
  2411. }
  2412. return retval;
  2413. }
  2414. u16 Connection::createPeer(Address& sender, MTProtocols protocol, int fd)
  2415. {
  2416. // Somebody wants to make a new connection
  2417. // Get a unique peer id (2 or higher)
  2418. u16 peer_id_new = m_next_remote_peer_id;
  2419. u16 overflow = MAX_UDP_PEERS;
  2420. /*
  2421. Find an unused peer id
  2422. */
  2423. MutexAutoLock lock(m_peers_mutex);
  2424. bool out_of_ids = false;
  2425. for(;;) {
  2426. // Check if exists
  2427. if (m_peers.find(peer_id_new) == m_peers.end())
  2428. break;
  2429. // Check for overflow
  2430. if (peer_id_new == overflow) {
  2431. out_of_ids = true;
  2432. break;
  2433. }
  2434. peer_id_new++;
  2435. }
  2436. if (out_of_ids) {
  2437. errorstream << getDesc() << " ran out of peer ids" << std::endl;
  2438. return PEER_ID_INEXISTENT;
  2439. }
  2440. // Create a peer
  2441. Peer *peer = 0;
  2442. peer = new UDPPeer(peer_id_new, sender, this);
  2443. m_peers[peer->id] = peer;
  2444. m_peer_ids.push_back(peer->id);
  2445. m_next_remote_peer_id = (peer_id_new +1 ) % MAX_UDP_PEERS;
  2446. LOG(dout_con << getDesc()
  2447. << "createPeer(): giving peer_id=" << peer_id_new << std::endl);
  2448. ConnectionCommand cmd;
  2449. SharedBuffer<u8> reply(4);
  2450. writeU8(&reply[0], TYPE_CONTROL);
  2451. writeU8(&reply[1], CONTROLTYPE_SET_PEER_ID);
  2452. writeU16(&reply[2], peer_id_new);
  2453. cmd.createPeer(peer_id_new,reply);
  2454. putCommand(cmd);
  2455. // Create peer addition event
  2456. ConnectionEvent e;
  2457. e.peerAdded(peer_id_new, sender);
  2458. putEvent(e);
  2459. // We're now talking to a valid peer_id
  2460. return peer_id_new;
  2461. }
  2462. void Connection::PrintInfo(std::ostream &out)
  2463. {
  2464. m_info_mutex.lock();
  2465. out<<getDesc()<<": ";
  2466. m_info_mutex.unlock();
  2467. }
  2468. void Connection::PrintInfo()
  2469. {
  2470. PrintInfo(dout_con);
  2471. }
  2472. const std::string Connection::getDesc()
  2473. {
  2474. return std::string("con(")+
  2475. itos(m_udpSocket.GetHandle())+"/"+itos(m_peer_id)+")";
  2476. }
  2477. void Connection::DisconnectPeer(u16 peer_id)
  2478. {
  2479. ConnectionCommand discon;
  2480. discon.disconnect_peer(peer_id);
  2481. putCommand(discon);
  2482. }
  2483. void Connection::sendAck(u16 peer_id, u8 channelnum, u16 seqnum)
  2484. {
  2485. assert(channelnum < CHANNEL_COUNT); // Pre-condition
  2486. LOG(dout_con<<getDesc()
  2487. <<" Queuing ACK command to peer_id: " << peer_id <<
  2488. " channel: " << (channelnum & 0xFF) <<
  2489. " seqnum: " << seqnum << std::endl);
  2490. ConnectionCommand c;
  2491. SharedBuffer<u8> ack(4);
  2492. writeU8(&ack[0], TYPE_CONTROL);
  2493. writeU8(&ack[1], CONTROLTYPE_ACK);
  2494. writeU16(&ack[2], seqnum);
  2495. c.ack(peer_id, channelnum, ack);
  2496. putCommand(c);
  2497. m_sendThread.Trigger();
  2498. }
  2499. UDPPeer* Connection::createServerPeer(Address& address)
  2500. {
  2501. if (getPeerNoEx(PEER_ID_SERVER) != 0)
  2502. {
  2503. throw ConnectionException("Already connected to a server");
  2504. }
  2505. UDPPeer *peer = new UDPPeer(PEER_ID_SERVER, address, this);
  2506. {
  2507. MutexAutoLock lock(m_peers_mutex);
  2508. m_peers[peer->id] = peer;
  2509. m_peer_ids.push_back(peer->id);
  2510. }
  2511. return peer;
  2512. }
  2513. } // namespace