# Peer.py

import logging
import time
import array
import gevent

from cStringIO import StringIO

from Debug import Debug
from Config import config
from util import helper
from PeerHashfield import PeerHashfield

if config.use_tempfiles:
    import tempfile


# Communicate with remote peers
class Peer(object):
    __slots__ = (
        "ip", "port", "site", "key", "connection", "last_found", "last_response", "last_ping", "last_hashfield",
        "hashfield", "added", "connection_error", "hash_failed", "download_bytes", "download_time"
    )

    def __init__(self, ip, port, site=None):
        self.ip = ip
        self.port = port
        self.site = site
        self.key = "%s:%s" % (ip, port)

        self.connection = None
        self.hashfield = PeerHashfield()  # Hash ids of optional files the peer has
        self.last_hashfield = 0  # Last time the hashfield was downloaded
        self.last_found = time.time()  # Time of last found in the torrent tracker
        self.last_response = None  # Time of last successful response from peer
        self.last_ping = None  # Last response time for ping
        self.added = time.time()

        self.connection_error = 0  # Series of connection errors
        self.hash_failed = 0  # Number of bad files received from the peer
        self.download_bytes = 0  # Bytes downloaded
        self.download_time = 0  # Time spent downloading

    def log(self, text):
        if self.site:
            self.site.log.debug("%s:%s %s" % (self.ip, self.port, text))
        else:
            logging.debug("%s:%s %s" % (self.ip, self.port, text))

    # Connect to host
    def connect(self, connection=None):
        if self.connection:
            self.log("Getting connection (Closing %s)..." % self.connection)
            self.connection.close()
        else:
            self.log("Getting connection...")

        if connection:  # Connection specified
            self.connection = connection
        else:  # Try to find from connection pool or create new connection
            self.connection = None

            try:
                self.connection = self.site.connection_server.getConnection(self.ip, self.port)
            except Exception, err:
                self.onConnectionError()
                self.log("Getting connection error: %s (connection_error: %s, hash_failed: %s)" %
                         (Debug.formatException(err), self.connection_error, self.hash_failed))
                self.connection = None

    # Check if we have a connection to the peer
    def findConnection(self):
        if self.connection and self.connection.connected:  # We have connection to peer
            return self.connection
        else:  # Try to find from other sites' connections
            self.connection = self.site.connection_server.getConnection(self.ip, self.port, create=False)
        return self.connection
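
    # Note: connect() may open a brand new connection, while findConnection()
    # only reuses an already open one from the connection pool (create=False).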

    def __str__(self):
        return "Peer:%-12s" % self.ip

    def __repr__(self):
        return "<%s>" % self.__str__()

    def packMyAddress(self):
        return helper.packAddress(self.ip, self.port)

    # Found a peer on tracker
    def found(self):
        self.last_found = time.time()

    # Send a command to the peer
    def request(self, cmd, params={}, stream_to=None):
        if not self.connection or self.connection.closed:
            self.connect()
            if not self.connection:
                self.onConnectionError()
                return None  # Connection failed

        for retry in range(1, 3):  # Retry up to 2 times
            try:
                response = self.connection.request(cmd, params, stream_to)
                if not response:
                    raise Exception("Send error")
                if "error" in response:
                    self.log("%s error: %s" % (cmd, response["error"]))
                    self.onConnectionError()
                else:  # Successful request, reset connection error num
                    self.connection_error = 0
                self.last_response = time.time()
                return response
            except Exception, err:
                if type(err).__name__ == "Notify":  # Greenlet killed by worker
                    self.log("Peer worker got killed: %s, aborting cmd: %s" % (err.message, cmd))
                    break
                else:
                    self.onConnectionError()
                    self.log(
                        "%s (connection_error: %s, hash_failed: %s, retry: %s)" %
                        (Debug.formatException(err), self.connection_error, self.hash_failed, retry)
                    )
                    time.sleep(1 * retry)  # Wait a bit longer after each failure
                    self.connect()
        return None  # Failed after retries
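
    # Note on request(): callers generally check the returned dict for None or an
    # "error" key before using it. Hedged usage sketch (assumes `peer` is a Peer
    # with a reachable connection):
    #
    #   response = peer.request("ping")
    #   if response and response.get("body") == "Pong!":
    #       print "peer answered"
    #
    # Every failed attempt calls onConnectionError(), which removes the peer after
    # 3 consecutive errors (see the EVENTS section below).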

    # Get a file content from peer
    def getFile(self, site, inner_path):
        # Use streamFile if client supports it
        if config.stream_downloads and self.connection and self.connection.handshake and self.connection.handshake["rev"] > 310:
            return self.streamFile(site, inner_path)

        location = 0
        if config.use_tempfiles:
            buff = tempfile.SpooledTemporaryFile(max_size=16 * 1024, mode='w+b')
        else:
            buff = StringIO()

        s = time.time()
        while True:  # Read in 512k parts
            back = self.request("getFile", {"site": site, "inner_path": inner_path, "location": location})

            if not back or "body" not in back:  # Error
                return False

            buff.write(back["body"])
            back["body"] = None  # Save memory
            if back["location"] == back["size"]:  # End of file
                break
            else:
                location = back["location"]

        self.download_bytes += back["location"]
        self.download_time += (time.time() - s)
        self.site.settings["bytes_recv"] = self.site.settings.get("bytes_recv", 0) + back["location"]
        buff.seek(0)
        return buff
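
    # getFile() returns a seekable buffer (SpooledTemporaryFile or StringIO)
    # rewound to position 0, or False on error. Hedged sketch (the site address
    # and inner path below are placeholders):
    #
    #   buff = peer.getFile("1ExampleSiteAddress", "content.json")
    #   if buff:
    #       data = buff.read()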

    # Download file out of msgpack context to save memory and cpu
    def streamFile(self, site, inner_path):
        location = 0
        if config.use_tempfiles:
            buff = tempfile.SpooledTemporaryFile(max_size=16 * 1024, mode='w+b')
        else:
            buff = StringIO()

        s = time.time()
        while True:  # Read in 512k parts
            back = self.request("streamFile", {"site": site, "inner_path": inner_path, "location": location}, stream_to=buff)

            if not back:  # Error
                self.log("Invalid response: %s" % back)
                return False

            if back["location"] == back["size"]:  # End of file
                break
            else:
                location = back["location"]

        self.download_bytes += back["location"]
        self.download_time += (time.time() - s)
        self.site.settings["bytes_recv"] = self.site.settings.get("bytes_recv", 0) + back["location"]
        buff.seek(0)
        return buff
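
    # streamFile() behaves like getFile(), but passes stream_to=buff so the remote
    # side streams the file body straight into the buffer instead of returning it
    # as one large msgpack "body" value, saving memory and CPU.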

    # Send a ping request
    def ping(self):
        response_time = None
        for retry in range(1, 3):  # Retry up to 2 times
            s = time.time()
            with gevent.Timeout(10.0, False):  # 10 sec timeout, don't raise exception
                response = self.request("ping")

                if response and "body" in response and response["body"] == "Pong!":
                    response_time = time.time() - s
                    break  # All fine, exit from for loop
            # Timeout reached or bad response
            self.onConnectionError()
            self.connect()
            time.sleep(1)

        if response_time:
            self.log("Ping: %.3f" % response_time)
        else:
            self.log("Ping failed")
        self.last_ping = response_time
        return response_time
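
    # ping() measures the round-trip time of a "ping" request; the gevent.Timeout
    # context silently abandons attempts that take longer than 10 seconds.
    # Hedged sketch:
    #
    #   rtt = peer.ping()
    #   if rtt is None:
    #       print "peer did not answer"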

    # Request peer exchange from peer
    def pex(self, site=None, need_num=5):
        if not site:
            site = self.site  # If no site defined, request peers for this site
        # Offer the peer 5 connectable peers in exchange
        packed_peers = [peer.packMyAddress() for peer in self.site.getConnectablePeers(5)]
        response = self.request("pex", {"site": site.address, "peers": packed_peers, "need": need_num})
        if not response or "error" in response:
            return False
        added = 0
        for peer in response.get("peers", []):
            address = helper.unpackAddress(peer)
            if site.addPeer(*address):
                added += 1
        if added:
            self.log("Added peers using pex: %s" % added)
        return added
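
    # pex() trades peer lists: it offers packed addresses of up to 5 of our own
    # connectable peers and asks for `need_num` in return; unpacked addresses are
    # fed to site.addPeer(). Hedged sketch:
    #
    #   added = peer.pex(need_num=10)
    #   if added:
    #       print "learned %s new peers via pex" % added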

    # Request optional files hashfield from peer
    def getHashfield(self):
        self.last_hashfield = time.time()
        res = self.request("getHashfield", {"site": self.site.address})
        if res and "error" not in res:
            self.hashfield.replaceFromString(res["hashfield_raw"])
            return self.hashfield
        else:
            return False

    # List modified files since the date
    # Return: {inner_path: modification date, ...}
    def listModified(self, since):
        return self.request("listModified", {"since": since, "site": self.site.address})

    # Stop and remove from site
    def remove(self):
        self.log("Removing peer... Connection error: %s, Hash failed: %s" % (self.connection_error, self.hash_failed))
        if self.site and self.key in self.site.peers:
            del self.site.peers[self.key]
        if self.connection:
            self.connection.close()

    # - HASHFIELD -

    def updateHashfield(self, force=False):
        # Don't update hashfield again within 15 min unless forced
        if self.last_hashfield and time.time() - self.last_hashfield < 60 * 15 and not force:
            return False

        response = self.request("getHashfield", {"site": self.site.address})
        if not response or "error" in response:
            return False
        self.last_hashfield = time.time()
        self.hashfield = response["hashfield"]
        return self.hashfield

    def setHashfield(self, hashfield_dump):
        self.hashfield.fromstring(hashfield_dump)

    def hasHash(self, hash_id):
        return hash_id in self.hashfield

    # Return: [(ip, port), (ip, port), ...] of peers that have the hash
    def findHash(self, hash_id):
        response = self.request("findHash", {"site": self.site.address, "hash_id": hash_id})
        if not response or "error" in response:
            return False
        return [helper.unpackAddress(peer) for peer in response["peers"]]
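
    # findHash() asks the peer which of its known peers claim to have the optional
    # file identified by hash_id, returning their addresses as (ip, port) tuples.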

    # - EVENTS -

    # On connection error
    def onConnectionError(self):
        self.connection_error += 1
        if self.connection_error >= 3:  # Dead peer
            self.remove()

    # Done working with peer
    def onWorkerDone(self):
        pass
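

if __name__ == "__main__":
    # Minimal, hedged usage sketch: a Peer created without a Site can only do
    # local operations (logging, address packing). Real network calls such as
    # request() or getFile() need a Site whose connection_server can open
    # connections; the address and port below are placeholders.
    logging.basicConfig(level=logging.DEBUG)
    demo_peer = Peer("127.0.0.1", 15441)
    demo_peer.log("Created for demonstration")
    print "Packed address: %r" % demo_peer.packMyAddress()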