# BroadcastServer.py
  1. import socket
  2. import logging
  3. import time
  4. from contextlib import closing
  5. import msgpack
  6. from Debug import Debug
  7. from util import UpnpPunch
  8. class BroadcastServer(object):
  9. def __init__(self, service_name, listen_port=1544, listen_ip=''):
  10. self.log = logging.getLogger("BroadcastServer")
  11. self.listen_port = listen_port
  12. self.listen_ip = listen_ip
  13. self.running = False
  14. self.sock = None
  15. self.sender_info = {"service": service_name}
  16. def createBroadcastSocket(self):
  17. sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  18. sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
  19. sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  20. if hasattr(socket, 'SO_REUSEPORT'):
  21. try:
  22. sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
  23. except Exception as err:
  24. self.log.warning("Error setting SO_REUSEPORT: %s" % err)
  25. binded = False
  26. for retry in range(3):
  27. try:
  28. sock.bind((self.listen_ip, self.listen_port))
  29. binded = True
  30. break
  31. except Exception as err:
  32. self.log.error(
  33. "Socket bind to %s:%s error: %s, retry #%s" %
  34. (self.listen_ip, self.listen_port, Debug.formatException(err), retry)
  35. )
  36. time.sleep(retry)
  37. if binded:
  38. return sock
  39. else:
  40. return False
  41. def start(self): # Listens for discover requests
  42. self.sock = self.createBroadcastSocket()
  43. if not self.sock:
  44. self.log.error("Unable to listen on port %s" % self.listen_port)
  45. return
  46. self.log.debug("Started on port %s" % self.listen_port)
  47. self.running = True
  48. while self.running:
  49. try:
  50. data, addr = self.sock.recvfrom(8192)
  51. except Exception as err:
  52. if self.running:
  53. self.log.error("Listener receive error: %s" % err)
  54. continue
  55. if not self.running:
  56. break
  57. try:
  58. message = msgpack.unpackb(data)
  59. response_addr, message = self.handleMessage(addr, message)
  60. if message:
  61. self.send(response_addr, message)
  62. except Exception as err:
  63. self.log.error("Handlemessage error: %s" % Debug.formatException(err))
  64. self.log.debug("Stopped listening on port %s" % self.listen_port)
  65. def stop(self):
  66. self.log.debug("Stopping, socket: %s" % self.sock)
  67. self.running = False
  68. if self.sock:
  69. self.sock.close()
  70. def send(self, addr, message):
  71. if type(message) is not list:
  72. message = [message]
  73. for message_part in message:
  74. message_part["sender"] = self.sender_info
  75. self.log.debug("Send to %s: %s" % (addr, message_part["cmd"]))
  76. with closing(socket.socket(socket.AF_INET, socket.SOCK_DGRAM)) as sock:
  77. sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  78. sock.sendto(msgpack.packb(message_part), addr)
  79. def getMyIps(self):
  80. return UpnpPunch._get_local_ips()
  81. def broadcast(self, message, port=None):
  82. if not port:
  83. port = self.listen_port
  84. my_ips = self.getMyIps()
  85. addr = ("255.255.255.255", port)
  86. message["sender"] = self.sender_info
  87. self.log.debug("Broadcast using ips %s on port %s: %s" % (my_ips, port, message["cmd"]))
  88. for my_ip in my_ips:
  89. try:
  90. with closing(socket.socket(socket.AF_INET, socket.SOCK_DGRAM)) as sock:
  91. sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  92. sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
  93. sock.bind((my_ip, 0))
  94. sock.sendto(msgpack.packb(message), addr)
  95. except Exception as err:
  96. self.log.warning("Error sending broadcast using ip %s: %s" % (my_ip, err))
  97. def handleMessage(self, addr, message):
  98. self.log.debug("Got from %s: %s" % (addr, message["cmd"]))
  99. cmd = message["cmd"]
  100. params = message.get("params", {})
  101. sender = message["sender"]
  102. sender["ip"] = addr[0]
  103. func_name = "action" + cmd[0].upper() + cmd[1:]
  104. func = getattr(self, func_name, None)
  105. if sender["service"] != "zeronet" or sender["peer_id"] == self.sender_info["peer_id"]:
  106. # Skip messages not for us or sent by us
  107. message = None
  108. elif func:
  109. message = func(sender, params)
  110. else:
  111. self.log.debug("Unknown cmd: %s" % cmd)
  112. message = None
  113. return (sender["ip"], sender["broadcast_port"]), message