FileRequest.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282
  1. # Included modules
  2. import os
  3. from cStringIO import StringIO
  4. # Third party modules
  5. import gevent
  6. from Debug import Debug
  7. from Config import config
  8. from util import RateLimit
  9. from util import StreamingMsgpack
  10. from util import helper
  # Largest chunk (in bytes) served by a single getFile / streamFile response: 512 KiB
FILE_BUFF = 512 * 1024
  # Request from me
class FileRequest(object):
    """Serve one request received from a peer on a file-server connection.

    route() records the request id and dispatches the command to the matching
    action* method. Every action answers through response(), which sends at
    most one reply per instance.
    """
    __slots__ = ("server", "connection", "req_id", "sites", "log", "responded")

    def __init__(self, server, connection):
        self.server = server
        self.connection = connection
        self.req_id = None  # Id of the request being answered; set by route()
        self.sites = self.server.sites  # Sites, looked up by params["site"]
        self.log = server.log
        self.responded = False  # Responded to the request

    def send(self, msg, streaming=False):
        """Send msg over the connection; silently dropped if the connection is closed."""
        if not self.connection.closed:
            self.connection.send(msg, streaming)

    def sendRawfile(self, file, read_bytes):
        """Send read_bytes raw bytes of file over the connection; dropped if it is closed."""
        if not self.connection.closed:
            self.connection.sendRawfile(file, read_bytes)

    def response(self, msg, streaming=False):
        """Reply to the current request; only the first call per instance sends anything.

        A non-dict msg is wrapped as {"body": msg}. The "cmd" and "to" keys are
        set on the (possibly caller-owned) dict before sending. Later calls only
        log a debug line.
        """
        if self.responded:
            self.log.debug("Req id %s already responded" % self.req_id)
            return
        if not isinstance(msg, dict):  # If msg not a dict create a {"body": msg}
            msg = {"body": msg}
        msg["cmd"] = "response"
        msg["to"] = self.req_id
        self.responded = True
        self.send(msg, streaming=streaming)

    def _getServingSite(self, params):
        """Return the site named by params["site"] if it is known and serving.

        Otherwise reply with an "Unknown site" error and return None.
        """
        site = self.sites.get(params["site"])
        if not site or not site.settings["serving"]:  # Site unknown or not serving
            self.response({"error": "Unknown site"})
            return None
        return site

    def _addRequesterPeer(self, site):
        """Add the requesting connection's ip:port to site and bind it to this connection.

        Returns whatever site.addPeer returned: the new peer (now connected
        through the current connection), or a falsy value when the peer was
        already registered.
        """
        connected_peer = site.addPeer(self.connection.ip, self.connection.port)
        if connected_peer:  # Just added
            connected_peer.connect(self.connection)  # Assign current connection to peer
        return connected_peer

    # Route file requests
    def route(self, cmd, req_id, params):
        """Dispatch command cmd (request id req_id) to its action handler.

        "update" requests are rate limited with a key built from the connection
        id, site and inner_path. When RateLimit reports an update for that key
        in the last 10 seconds, the request is acknowledged immediately with
        "File update queued". actionUpdate's own later reply is then dropped by
        response(). The update itself always goes through RateLimit.callAsync,
        which (per the original note) keeps only the last call within 10 seconds.
        """
        self.req_id = req_id
        if cmd == "getFile":
            self.actionGetFile(params)
        elif cmd == "streamFile":
            self.actionStreamFile(params)
        elif cmd == "update":
            event = "%s update %s %s" % (self.connection.id, params["site"], params["inner_path"])
            if not RateLimit.isAllowed(event):  # There was already an update for this file in the last 10 second
                self.response({"ok": "File update queued"})
            # If called more than once within 10 sec only keep the last update
            RateLimit.callAsync(event, 10, self.actionUpdate, params)
        elif cmd == "pex":
            self.actionPex(params)
        elif cmd == "listModified":
            self.actionListModified(params)
        elif cmd == "getHashfield":
            self.actionGetHashfield(params)
        elif cmd == "ping":
            self.actionPing()
        else:
            self.actionUnknown(cmd, params)

    # Update a site file request
    def actionUpdate(self, params):
        """Apply a file pushed by a peer ("update" command).

        Reads params "site", "inner_path" and "body" (new file content). When
        the pushed file verifies as unchanged, it also reads "peer" (the
        arguments passed to site.addPeer).

        Outcomes:
        - Valid: the file is written and onFileDone fires. For a content.json,
          the sender is added as a peer, a publish is scheduled for when the
          site completes, and downloadContent runs in a new greenlet.
        - Unchanged: the given peer is added to every ongoing peer-locked task.
        - Otherwise: a "File invalid" error is sent.

        Returns False only when the site is unknown or not serving.
        """
        site = self._getServingSite(params)
        if site is None:
            return False
        inner_path = params["inner_path"]

        if site.settings["own"] and inner_path.endswith("content.json"):
            self.log.debug(
                "Someone trying to push a file to own site %s, reload local %s first" %
                (site.address, inner_path)
            )
            changed, deleted = site.content_manager.loadContent(inner_path, add_bad_files=False)
            if changed or deleted:  # Content.json changed locally
                site.settings["size"] = site.content_manager.getTotalSize()  # Update site size

        buff = StringIO(params["body"])
        valid = site.content_manager.verifyFile(inner_path, buff)
        if valid is True:  # Valid and changed
            self.log.info("Update for %s looks valid, saving..." % inner_path)
            buff.seek(0)  # verifyFile may have read from buff; rewind before writing
            site.storage.write(inner_path, buff)
            site.onFileDone(inner_path)  # Trigger filedone

            if inner_path.endswith("content.json"):  # Download every changed file from peer
                peer = site.addPeer(self.connection.ip, self.connection.port, return_peer=True)  # Add or get peer
                # On complete publish to other peers
                site.onComplete.once(lambda: site.publish(inner_path=inner_path), "publish_%s" % inner_path)
                # Load new content file and download changed files in new thread
                gevent.spawn(site.downloadContent, inner_path, peer=peer)

            self.response({"ok": "Thanks, file %s updated!" % inner_path})

        elif valid is None:  # Not changed
            peer = site.addPeer(*params["peer"], return_peer=True)  # Add or get peer
            if peer:
                self.log.debug(
                    "Same version, adding new peer for locked files: %s, tasks: %s" %
                    (peer.key, len(site.worker_manager.tasks))
                )
                for task in site.worker_manager.tasks:  # New peer add to every ongoing task
                    if task["peers"]:
                        # Download file from this peer too if its peer locked
                        site.needFile(task["inner_path"], peer=peer, update=True, blocking=False)
            self.response({"ok": "File not changed"})

        else:  # Invalid sign or sha1 hash
            self.log.debug("Update for %s is invalid" % inner_path)
            self.response({"error": "File invalid"})

    # Send file content request
    def actionGetFile(self, params):
        """Send a chunk of params["inner_path"] starting at params["location"].

        The open file (with read_bytes = FILE_BUFF) is sent as the streamed
        "body". The reply also carries the total "size" and the next "location"
        (capped at size). params["location"] must satisfy 0 <= location < size.
        Any failure is logged and answered with "File read error: ...".

        Returns False on error (including unknown site).
        """
        site = self._getServingSite(params)
        if site is None:
            return False
        try:
            file_path = site.storage.getPath(params["inner_path"])
            if config.debug_socket:
                self.log.debug("Opening file: %s" % file_path)
            with StreamingMsgpack.FilePart(file_path, "rb") as file_obj:
                file_size = os.fstat(file_obj.fileno()).st_size
                location = params["location"]
                # Explicit check instead of assert: asserts are stripped under -O, and an
                # out-of-range location would then add a negative bytes_sent to the stats
                if not 0 <= location < file_size:
                    raise ValueError("Invalid location %s for file size %s" % (location, file_size))
                file_obj.seek(location)
                file_obj.read_bytes = FILE_BUFF

                back = {
                    "body": file_obj,
                    "size": file_size,
                    "location": min(location + FILE_BUFF, file_size)
                }
                if config.debug_socket:
                    self.log.debug(
                        "Sending file %s from position %s to %s" %
                        (file_path, location, back["location"])
                    )
                # Must happen while the file is still open: the file object is the body
                self.response(back, streaming=True)

            bytes_sent = min(FILE_BUFF, file_size - location)  # Number of bytes we are going to send
            site.settings["bytes_sent"] = site.settings.get("bytes_sent", 0) + bytes_sent
            if config.debug_socket:
                self.log.debug("File %s at position %s sent %s bytes" % (file_path, location, bytes_sent))

            # Add peer to site if not added before
            self._addRequesterPeer(site)
        except Exception as err:
            error = Debug.formatException(err)
            self.log.debug("GetFile read error: %s" % error)
            self.response({"error": "File read error: %s" % error})
            return False

    # New-style file streaming out of Msgpack context
    def actionStreamFile(self, params):
        """Send a chunk of params["inner_path"] as raw bytes after the msgpack reply.

        The reply holds "size", the next "location" and "stream_bytes". It is
        followed by sendRawfile(file, FILE_BUFF) from params["location"].
        params["location"] must satisfy 0 <= location <= size.
        Any failure is logged and answered with "File read error: ...".

        Returns False on error (including unknown site).
        """
        site = self._getServingSite(params)
        if site is None:
            return False
        try:
            inner_path = params["inner_path"]
            if config.debug_socket:
                self.log.debug("Opening file: %s" % inner_path)
            with site.storage.open(inner_path) as file_obj:
                file_size = os.fstat(file_obj.fileno()).st_size
                location = params["location"]
                # Explicit check instead of assert (stripped under -O): keeps stream_bytes >= 0
                if not 0 <= location <= file_size:
                    raise ValueError("Invalid location %s for file size %s" % (location, file_size))
                file_obj.seek(location)
                stream_bytes = min(FILE_BUFF, file_size - location)

                back = {
                    "size": file_size,
                    "location": min(location + FILE_BUFF, file_size),
                    "stream_bytes": stream_bytes
                }
                if config.debug_socket:
                    self.log.debug(
                        "Sending file %s from position %s to %s" %
                        (inner_path, location, back["location"])
                    )
                self.response(back)
                self.sendRawfile(file_obj, read_bytes=FILE_BUFF)

            site.settings["bytes_sent"] = site.settings.get("bytes_sent", 0) + stream_bytes
            if config.debug_socket:
                self.log.debug("File %s at position %s sent %s bytes" % (inner_path, location, stream_bytes))

            # Add peer to site if not added before
            self._addRequesterPeer(site)
        except Exception as err:
            error = Debug.formatException(err)
            self.log.debug("StreamFile read error: %s" % error)
            self.response({"error": "File read error: %s" % error})
            return False

    # Peer exchange request
    def actionPex(self, params):
        """Peer exchange: learn the requester's peers and reply with ours.

        Adds the requester and every packed address in params["peers"] to the
        site. It then replies with packed addresses from
        site.getConnectablePeers(params["need"], <keys the requester sent>),
        i.e. connectable peers the requester did not already send.
        params["need"] is presumably the wanted count; confirm in
        getConnectablePeers.
        """
        site = self._getServingSite(params)
        if site is None:
            return False

        got_peer_keys = []
        added = 0

        # Add requester peer to site
        if self._addRequesterPeer(site):  # It was not registered before
            added += 1

        # Add sent peers to site
        for packed_address in params["peers"]:
            address = helper.unpackAddress(packed_address)
            got_peer_keys.append("%s:%s" % address)
            if site.addPeer(*address):
                added += 1

        # Send back peers that is not in the sent list and connectable (not port 0)
        packed_peers = [peer.packMyAddress() for peer in site.getConnectablePeers(params["need"], got_peer_keys)]
        if added:
            site.worker_manager.onPeers()
            self.log.debug("Added %s peers to %s using pex, sending back %s" % (added, site, len(packed_peers)))
        self.response({"peers": packed_peers})

    # Get modified content.json files since
    def actionListModified(self, params):
        """Reply with {inner_path: modified} for each loaded content newer than params["since"]."""
        site = self._getServingSite(params)
        if site is None:
            return False
        modified_files = {
            inner_path: content["modified"]
            for inner_path, content in site.content_manager.contents.iteritems()
            if content["modified"] > params["since"]
        }

        # Add peer to site if not added before
        self._addRequesterPeer(site)

        self.response({"modified_files": modified_files})

    def actionGetHashfield(self, params):
        """Reply with the site's content_manager.hashfield serialized via tostring()."""
        site = self._getServingSite(params)
        if site is None:
            return False

        # Add peer to site if not added before
        self._addRequesterPeer(site)

        self.response({"hashfield_raw": site.content_manager.hashfield.tostring()})

    # Send a simple Pong! answer
    def actionPing(self):
        self.response("Pong!")

    # Unknown command
    def actionUnknown(self, cmd, params):
        self.response({"error": "Unknown command: %s" % cmd})