# ConnectionServer.py
  1. import logging
  2. import time
  3. import sys
  4. import gevent
  5. import msgpack
  6. from gevent.server import StreamServer
  7. from gevent.pool import Pool
  8. from Debug import Debug
  9. from Connection import Connection
  10. from Config import config
  11. from Crypt import CryptConnection
  12. from Crypt import CryptHash
  13. from Tor import TorManager
  14. class ConnectionServer:
  15. def __init__(self, ip=None, port=None, request_handler=None):
  16. self.ip = ip
  17. self.port = port
  18. self.last_connection_id = 1 # Connection id incrementer
  19. self.log = logging.getLogger("ConnServer")
  20. self.port_opened = None
  21. if config.tor != "disabled":
  22. self.tor_manager = TorManager(self.ip, self.port)
  23. else:
  24. self.tor_manager = None
  25. self.connections = [] # Connections
  26. self.whitelist = config.ip_local # No flood protection on this ips
  27. self.ip_incoming = {} # Incoming connections from ip in the last minute to avoid connection flood
  28. self.broken_ssl_peer_ids = {} # Peerids of broken ssl connections
  29. self.ips = {} # Connection by ip
  30. self.has_internet = True # Internet outage detection
  31. self.running = True
  32. self.thread_checker = gevent.spawn(self.checkConnections)
  33. self.bytes_recv = 0
  34. self.bytes_sent = 0
  35. # Bittorrent style peerid
  36. self.peer_id = "-ZN0%s-%s" % (config.version.replace(".", ""), CryptHash.random(12, "base64"))
  37. # Check msgpack version
  38. if msgpack.version[0] == 0 and msgpack.version[1] < 4:
  39. self.log.error(
  40. "Error: Unsupported msgpack version: %s (<0.4.0), please run `sudo apt-get install python-pip; sudo pip install msgpack-python --upgrade`" %
  41. str(msgpack.version)
  42. )
  43. sys.exit(0)
  44. if port: # Listen server on a port
  45. self.pool = Pool(1000) # do not accept more than 1000 connections
  46. self.stream_server = StreamServer(
  47. (ip.replace("*", "0.0.0.0"), port), self.handleIncomingConnection, spawn=self.pool, backlog=500
  48. )
  49. if request_handler:
  50. self.handleRequest = request_handler
  51. def start(self):
  52. self.running = True
  53. CryptConnection.manager.loadCerts()
  54. self.log.debug("Binding to: %s:%s, (msgpack: %s), supported crypt: %s" % (
  55. self.ip, self.port,
  56. ".".join(map(str, msgpack.version)), CryptConnection.manager.crypt_supported)
  57. )
  58. try:
  59. self.stream_server.serve_forever() # Start normal connection server
  60. except Exception, err:
  61. self.log.info("StreamServer bind error, must be running already: %s" % err)
  62. def stop(self):
  63. self.running = False
  64. self.stream_server.stop()
  65. def handleIncomingConnection(self, sock, addr):
  66. ip, port = addr
  67. # Connection flood protection
  68. if ip in self.ip_incoming and ip not in self.whitelist:
  69. self.ip_incoming[ip] += 1
  70. if self.ip_incoming[ip] > 6: # Allow 6 in 1 minute from same ip
  71. self.log.debug("Connection flood detected from %s" % ip)
  72. time.sleep(30)
  73. sock.close()
  74. return False
  75. else:
  76. self.ip_incoming[ip] = 1
  77. connection = Connection(self, ip, port, sock)
  78. self.connections.append(connection)
  79. self.ips[ip] = connection
  80. connection.handleIncomingConnection(sock)
  81. def getConnection(self, ip=None, port=None, peer_id=None, create=True, site=None):
  82. if ip.endswith(".onion") and self.tor_manager.start_onions and site: # Site-unique connection for Tor
  83. site_onion = self.tor_manager.getOnion(site.address)
  84. key = ip + site_onion
  85. else:
  86. key = ip
  87. # Find connection by ip
  88. if key in self.ips:
  89. connection = self.ips[key]
  90. if not peer_id or connection.handshake.get("peer_id") == peer_id: # Filter by peer_id
  91. if not connection.connected and create:
  92. succ = connection.event_connected.get() # Wait for connection
  93. if not succ:
  94. raise Exception("Connection event return error")
  95. return connection
  96. # Recover from connection pool
  97. for connection in self.connections:
  98. if connection.ip == ip:
  99. if peer_id and connection.handshake.get("peer_id") != peer_id: # Does not match
  100. continue
  101. if ip.endswith(".onion") and self.tor_manager.start_onions and ip.replace(".onion", "") != connection.target_onion:
  102. # For different site
  103. continue
  104. if not connection.connected and create:
  105. succ = connection.event_connected.get() # Wait for connection
  106. if not succ:
  107. raise Exception("Connection event return error")
  108. return connection
  109. # No connection found
  110. if create: # Allow to create new connection if not found
  111. if port == 0:
  112. raise Exception("This peer is not connectable")
  113. try:
  114. if ip.endswith(".onion") and self.tor_manager.start_onions and site: # Lock connection to site
  115. connection = Connection(self, ip, port, target_onion=site_onion)
  116. else:
  117. connection = Connection(self, ip, port)
  118. self.ips[key] = connection
  119. self.connections.append(connection)
  120. succ = connection.connect()
  121. if not succ:
  122. connection.close("Connection event return error")
  123. raise Exception("Connection event return error")
  124. except Exception, err:
  125. connection.close("%s Connect error: %s" % (ip, Debug.formatException(err)))
  126. raise err
  127. return connection
  128. else:
  129. return None
  130. def removeConnection(self, connection):
  131. # Delete if same as in registry
  132. if self.ips.get(connection.ip) == connection:
  133. del self.ips[connection.ip]
  134. # Site locked connection
  135. if connection.target_onion:
  136. if self.ips.get(connection.ip + connection.target_onion) == connection:
  137. del self.ips[connection.ip + connection.target_onion]
  138. # Cert pinned connection
  139. if connection.cert_pin and self.ips.get(connection.ip + "#" + connection.cert_pin) == connection:
  140. del self.ips[connection.ip + "#" + connection.cert_pin]
  141. if connection in self.connections:
  142. self.connections.remove(connection)
  143. def checkConnections(self):
  144. run_i = 0
  145. while self.running:
  146. run_i += 1
  147. time.sleep(60) # Check every minute
  148. self.ip_incoming = {} # Reset connected ips counter
  149. self.broken_ssl_peer_ids = {} # Reset broken ssl peerids count
  150. last_message_time = 0
  151. for connection in self.connections[:]: # Make a copy
  152. idle = time.time() - max(connection.last_recv_time, connection.start_time, connection.last_message_time)
  153. last_message_time = max(last_message_time, connection.last_message_time)
  154. if connection.unpacker and idle > 30:
  155. # Delete the unpacker if not needed
  156. del connection.unpacker
  157. connection.unpacker = None
  158. elif connection.last_cmd == "announce" and idle > 20: # Bootstrapper connection close after 20 sec
  159. connection.close("[Cleanup] Tracker connection: %s" % idle)
  160. if idle > 60 * 60:
  161. # Wake up after 1h
  162. connection.close("[Cleanup] After wakeup, idle: %s" % idle)
  163. elif idle > 20 * 60 and connection.last_send_time < time.time() - 10:
  164. # Idle more than 20 min and we have not sent request in last 10 sec
  165. if not connection.ping():
  166. connection.close("[Cleanup] Ping timeout")
  167. elif idle > 10 and connection.incomplete_buff_recv > 0:
  168. # Incomplete data with more than 10 sec idle
  169. connection.close("[Cleanup] Connection buff stalled")
  170. elif idle > 10 and connection.waiting_requests and time.time() - connection.last_send_time > 20:
  171. # Sent command and no response in 10 sec
  172. connection.close(
  173. "[Cleanup] Command %s timeout: %.3fs" % (connection.last_cmd, time.time() - connection.last_send_time)
  174. )
  175. elif idle > 30 and connection.protocol == "?": # No connection after 30 sec
  176. connection.close(
  177. "[Cleanup] Connect timeout: %.3fs" % idle
  178. )
  179. elif idle < 60 and connection.bad_actions > 40:
  180. connection.close(
  181. "[Cleanup] Too many bad actions: %s" % connection.bad_actions
  182. )
  183. elif idle > 5*60 and connection.sites == 0:
  184. connection.close(
  185. "[Cleanup] No site for connection"
  186. )
  187. elif run_i % 30 == 0:
  188. # Reset bad action counter every 30 min
  189. connection.bad_actions = 0
  190. # Internet outage detection
  191. if time.time() - last_message_time > max(60, 60*10/max(1,float(len(self.connections))/50)):
  192. # Offline: Last message more than 60-600sec depending on connection number
  193. if self.has_internet:
  194. self.has_internet = False
  195. self.onInternetOffline()
  196. else:
  197. # Online
  198. if not self.has_internet:
  199. self.has_internet = True
  200. self.onInternetOnline()
  201. def onInternetOnline(self):
  202. self.log.info("Internet online")
  203. def onInternetOffline(self):
  204. self.log.info("Internet offline")