PeerLink.c 2.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. /* vim: set expandtab ts=4 sw=4: */
  2. /*
  3. * You may redistribute this program and/or modify it under the terms of
  4. * the GNU General Public License as published by the Free Software Foundation,
  5. * either version 3 of the License, or (at your option) any later version.
  6. *
  7. * This program is distributed in the hope that it will be useful,
  8. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. * GNU General Public License for more details.
  11. *
  12. * You should have received a copy of the GNU General Public License
  13. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  14. */
  15. #include "memory/Allocator.h"
  16. #include "net/PeerLink.h"
  17. #include "util/Identity.h"
  18. #include "util/Kbps.h"
  19. #include "wire/SwitchHeader.h"
  20. #include "util/events/Time.h"
  21. #define ArrayList_TYPE struct Message
  22. #define ArrayList_NAME Messages
  23. #include "util/ArrayList.h"
  24. struct PeerLink_pvt
  25. {
  26. struct PeerLink pub;
  27. struct Allocator* alloc;
  28. struct EventBase* base;
  29. struct ArrayList_Messages* queue;
  30. struct Kbps sendBw;
  31. struct Kbps recvBw;
  32. Identity
  33. };
  34. struct Message* PeerLink_poll(struct PeerLink* peerLink)
  35. {
  36. struct PeerLink_pvt* pl = Identity_check((struct PeerLink_pvt*) peerLink);
  37. struct Message* out = ArrayList_Messages_shift(pl->queue);
  38. if (!out) { return NULL; }
  39. Allocator_disown(pl->alloc, out->alloc);
  40. Kbps_accumulate(&pl->sendBw, Time_currentTimeMilliseconds(pl->base), out->length);
  41. return out;
  42. }
  43. int PeerLink_send(struct Message* msg, struct PeerLink* peerLink)
  44. {
  45. struct PeerLink_pvt* pl = Identity_check((struct PeerLink_pvt*) peerLink);
  46. Allocator_adopt(pl->alloc, msg->alloc);
  47. ArrayList_Messages_add(pl->queue, msg);
  48. return pl->queue->length;
  49. }
  50. void PeerLink_recv(struct Message* msg, struct PeerLink* peerLink)
  51. {
  52. struct PeerLink_pvt* pl = Identity_check((struct PeerLink_pvt*) peerLink);
  53. Kbps_accumulate(&pl->recvBw, Time_currentTimeMilliseconds(pl->base), msg->length);
  54. }
  55. void PeerLink_kbps(struct PeerLink* peerLink, struct PeerLink_Kbps* output)
  56. {
  57. struct PeerLink_pvt* pl = Identity_check((struct PeerLink_pvt*) peerLink);
  58. uint32_t now = Time_currentTimeMilliseconds(pl->base);
  59. output->recvKbps = Kbps_accumulate(&pl->recvBw, now, Kbps_accumulate_NO_PACKET);
  60. output->sendKbps = Kbps_accumulate(&pl->sendBw, now, Kbps_accumulate_NO_PACKET);
  61. }
  62. struct PeerLink* PeerLink_new(struct EventBase* base, struct Allocator* alloc)
  63. {
  64. struct PeerLink_pvt* pl = Allocator_calloc(alloc, sizeof(struct PeerLink_pvt), 1);
  65. Identity_set(pl);
  66. pl->base = base;
  67. pl->alloc = alloc;
  68. pl->queue = ArrayList_Messages_new(alloc);
  69. return &pl->pub;
  70. }