123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282 |
- # Included modules
- import os
- from cStringIO import StringIO
- # Third party modules
- import gevent
- from Debug import Debug
- from Config import config
- from util import RateLimit
- from util import StreamingMsgpack
- from util import helper
# Upper bound on the number of file bytes served in a single response (512 KiB)
FILE_BUFF = 512 * 1024
- # Request from me
class FileRequest(object):
    """Handle one request received on a peer connection and answer it.

    `route()` dispatches the command to the matching `action*` method and
    `response()` sends the answer back on the same connection. At most one
    response is sent per FileRequest instance.
    """
    __slots__ = ("server", "connection", "req_id", "sites", "log", "responded")

    def __init__(self, server, connection):
        """Bind the request to its server (source of `sites` and `log`) and connection."""
        self.server = server
        self.connection = connection

        self.req_id = None  # Set by route(), echoed back as "to" in the response
        self.sites = self.server.sites
        self.log = server.log
        self.responded = False  # Responded to the request

    def send(self, msg, streaming=False):
        """Forward msg to the connection unless the connection is closed."""
        if not self.connection.closed:
            self.connection.send(msg, streaming)

    def sendRawfile(self, file, read_bytes):
        """Forward a raw file transfer to the connection unless the connection is closed."""
        if not self.connection.closed:
            self.connection.sendRawfile(file, read_bytes)

    def response(self, msg, streaming=False):
        """Send the response for the current request.

        A non-dict msg is wrapped as {"body": msg}. The dict gets "cmd" and
        "to" keys added in place. Only the first call sends anything; later
        calls are logged and ignored.
        """
        if self.responded:
            self.log.debug("Req id %s already responded" % self.req_id)
            return
        if not isinstance(msg, dict):  # If msg not a dict create a {"body": msg}
            msg = {"body": msg}
        msg["cmd"] = "response"
        msg["to"] = self.req_id
        self.responded = True
        self.send(msg, streaming=streaming)

    def _getServingSite(self, params):
        """Return the site named by params["site"] if it is known and serving.

        Otherwise answer the request with an "Unknown site" error and return None.
        """
        site = self.sites.get(params["site"])
        if not site or not site.settings["serving"]:  # Site unknown or not serving
            self.response({"error": "Unknown site"})
            return None
        return site

    def _registerConnectedPeer(self, site):
        """Add the requesting connection's address to the site's peers.

        Returns whatever site.addPeer() returned; when that is truthy (peer
        was just added) the current connection is assigned to the peer.
        """
        connected_peer = site.addPeer(self.connection.ip, self.connection.port)
        if connected_peer:  # Just added
            connected_peer.connect(self.connection)  # Assign current connection to peer
        return connected_peer

    # Route file requests
    def route(self, cmd, req_id, params):
        """Remember req_id and dispatch cmd to the matching action method."""
        self.req_id = req_id

        if cmd == "getFile":
            self.actionGetFile(params)
        elif cmd == "streamFile":
            self.actionStreamFile(params)
        elif cmd == "update":
            event = "%s update %s %s" % (self.connection.id, params["site"], params["inner_path"])
            if not RateLimit.isAllowed(event):  # There was already an update for this file in the last 10 second
                self.response({"ok": "File update queued"})
            # If called more than once within 10 sec only keep the last update
            RateLimit.callAsync(event, 10, self.actionUpdate, params)
        elif cmd == "pex":
            self.actionPex(params)
        elif cmd == "listModified":
            self.actionListModified(params)
        elif cmd == "getHashfield":
            self.actionGetHashfield(params)
        elif cmd == "ping":
            self.actionPing()
        else:
            self.actionUnknown(cmd, params)

    # Update a site file request
    def actionUpdate(self, params):
        """Verify a pushed file and store it if it is valid and changed.

        Reads params["site"], params["inner_path"], params["body"] and,
        optionally, params["peer"] (arguments for site.addPeer). Returns False
        when the site is unknown or not serving, otherwise None.
        """
        site = self._getServingSite(params)
        if site is None:
            return False

        inner_path = params["inner_path"]
        if site.settings["own"] and inner_path.endswith("content.json"):
            self.log.debug(
                "Someone trying to push a file to own site %s, reload local %s first" %
                (site.address, inner_path)
            )
            changed, deleted = site.content_manager.loadContent(inner_path, add_bad_files=False)
            if changed or deleted:  # Content.json changed locally
                site.settings["size"] = site.content_manager.getTotalSize()  # Update site size

        buff = StringIO(params["body"])
        valid = site.content_manager.verifyFile(inner_path, buff)
        if valid is True:  # Valid and changed
            self.log.info("Update for %s looks valid, saving..." % inner_path)
            buff.seek(0)  # Rewind so the whole body is written
            site.storage.write(inner_path, buff)

            site.onFileDone(inner_path)  # Trigger filedone

            if inner_path.endswith("content.json"):  # Download every changed file from peer
                peer = site.addPeer(self.connection.ip, self.connection.port, return_peer=True)  # Add or get peer
                # On complete publish to other peers
                site.onComplete.once(lambda: site.publish(inner_path=inner_path), "publish_%s" % inner_path)

                # Load new content file and download changed files in new thread
                gevent.spawn(
                    lambda: site.downloadContent(inner_path, peer=peer)
                )

            self.response({"ok": "Thanks, file %s updated!" % inner_path})

        elif valid is None:  # Not changed
            # The sender may omit "peer"; fall back to the address of the
            # connection the request arrived on instead of failing with KeyError
            if params.get("peer"):
                peer = site.addPeer(*params["peer"], return_peer=True)  # Add or get peer
            else:
                peer = site.addPeer(self.connection.ip, self.connection.port, return_peer=True)  # Add or get peer
            if peer:
                self.log.debug(
                    "Same version, adding new peer for locked files: %s, tasks: %s" %
                    (peer.key, len(site.worker_manager.tasks))
                )
                for task in site.worker_manager.tasks:  # New peer add to every ongoing task
                    if task["peers"]:
                        # Download file from this peer too if its peer locked
                        site.needFile(task["inner_path"], peer=peer, update=True, blocking=False)

            self.response({"ok": "File not changed"})

        else:  # Invalid sign or sha1 hash
            self.log.debug("Update for %s is invalid" % inner_path)
            self.response({"error": "File invalid"})

    # Send file content request
    def actionGetFile(self, params):
        """Answer with up to FILE_BUFF bytes of a site file, starting at params["location"].

        Reads params["site"], params["inner_path"] and params["location"].
        The response carries the open file as "body" (sent in streaming mode),
        the total "size" and the "location" where the next part starts.
        Returns False on unknown site or read error, otherwise None.
        """
        site = self._getServingSite(params)
        if site is None:
            return False
        try:
            file_path = site.storage.getPath(params["inner_path"])
            if config.debug_socket:
                self.log.debug("Opening file: %s" % file_path)
            with StreamingMsgpack.FilePart(file_path, "rb") as file_part:
                file_part.seek(params["location"])
                file_part.read_bytes = FILE_BUFF
                file_size = os.fstat(file_part.fileno()).st_size
                # Explicit check instead of assert: asserts are stripped under -O
                if not params["location"] < file_size:
                    raise ValueError(
                        "Requested location %s is not inside the file (size: %s)" %
                        (params["location"], file_size)
                    )

                back = {
                    "body": file_part,
                    "size": file_size,
                    "location": min(file_part.tell() + FILE_BUFF, file_size)
                }
                if config.debug_socket:
                    self.log.debug(
                        "Sending file %s from position %s to %s" %
                        (file_path, params["location"], back["location"])
                    )
                # Must happen while the file is still open: the body is streamed from it
                self.response(back, streaming=True)

                bytes_sent = min(FILE_BUFF, file_size - params["location"])  # Number of bytes we going to send
                site.settings["bytes_sent"] = site.settings.get("bytes_sent", 0) + bytes_sent
            if config.debug_socket:
                self.log.debug("File %s at position %s sent %s bytes" % (file_path, params["location"], bytes_sent))

            # Add peer to site if not added before
            self._registerConnectedPeer(site)

        except Exception as err:
            self.log.debug("GetFile read error: %s" % Debug.formatException(err))
            self.response({"error": "File read error: %s" % Debug.formatException(err)})
            return False

    # New-style file streaming out of Msgpack context
    def actionStreamFile(self, params):
        """Answer with file metadata, then send the file content as a raw stream.

        Reads params["site"], params["inner_path"] and params["location"].
        The response carries "size", the next "location" and "stream_bytes"
        (number of raw bytes announced to follow). Returns False on unknown
        site or read error, otherwise None.
        """
        site = self._getServingSite(params)
        if site is None:
            return False
        try:
            if config.debug_socket:
                self.log.debug("Opening file: %s" % params["inner_path"])
            with site.storage.open(params["inner_path"]) as stream_file:
                stream_file.seek(params["location"])
                file_size = os.fstat(stream_file.fileno()).st_size
                stream_bytes = min(FILE_BUFF, file_size - params["location"])
                # Explicit check instead of assert: asserts are stripped under -O
                if not stream_bytes >= 0:
                    raise ValueError(
                        "Requested location %s is beyond the end of the file (size: %s)" %
                        (params["location"], file_size)
                    )

                back = {
                    "size": file_size,
                    "location": min(stream_file.tell() + FILE_BUFF, file_size),
                    "stream_bytes": stream_bytes
                }
                if config.debug_socket:
                    self.log.debug(
                        "Sending file %s from position %s to %s" %
                        (params["inner_path"], params["location"], back["location"])
                    )
                self.response(back)
                self.sendRawfile(stream_file, read_bytes=FILE_BUFF)

                site.settings["bytes_sent"] = site.settings.get("bytes_sent", 0) + stream_bytes
            if config.debug_socket:
                self.log.debug("File %s at position %s sent %s bytes" % (params["inner_path"], params["location"], stream_bytes))

            # Add peer to site if not added before
            self._registerConnectedPeer(site)

        except Exception as err:
            self.log.debug("StreamFile read error: %s" % Debug.formatException(err))
            self.response({"error": "File read error: %s" % Debug.formatException(err)})
            return False

    # Peer exchange request
    def actionPex(self, params):
        """Exchange peers: store the packed addresses in params["peers"], answer with ours.

        Reads params["site"], params["peers"] and params["need"] (passed to
        site.getConnectablePeers). Returns False when the site is unknown or
        not serving, otherwise None.
        """
        site = self._getServingSite(params)
        if site is None:
            return False

        got_peer_keys = []
        added = 0

        # Add requester peer to site
        if self._registerConnectedPeer(site):  # It was not registered before
            added += 1

        # Add sent peers to site
        for packed_address in params["peers"]:
            address = helper.unpackAddress(packed_address)
            got_peer_keys.append("%s:%s" % address)
            if site.addPeer(*address):
                added += 1

        # Send back peers that is not in the sent list and connectable (not port 0)
        packed_peers = [peer.packMyAddress() for peer in site.getConnectablePeers(params["need"], got_peer_keys)]
        if added:
            site.worker_manager.onPeers()
            self.log.debug("Added %s peers to %s using pex, sending back %s" % (added, site, len(packed_peers)))
        self.response({"peers": packed_peers})

    # Get modified content.json files since
    def actionListModified(self, params):
        """Answer with {inner_path: modified} for contents modified after params["since"].

        Returns False when the site is unknown or not serving, otherwise None.
        """
        site = self._getServingSite(params)
        if site is None:
            return False
        modified_files = {
            inner_path: content["modified"]
            for inner_path, content in site.content_manager.contents.iteritems()
            if content["modified"] > params["since"]
        }

        # Add peer to site if not added before
        self._registerConnectedPeer(site)

        self.response({"modified_files": modified_files})

    def actionGetHashfield(self, params):
        """Answer with the site's hashfield serialized by its tostring() method.

        Returns False when the site is unknown or not serving, otherwise None.
        """
        site = self._getServingSite(params)
        if site is None:
            return False

        # Add peer to site if not added before
        self._registerConnectedPeer(site)

        self.response({"hashfield_raw": site.content_manager.hashfield.tostring()})

    # Send a simple Pong! answer
    def actionPing(self):
        """Answer with the body "Pong!"."""
        self.response("Pong!")

    # Unknown command
    def actionUnknown(self, cmd, params):
        """Answer with an error naming the unrecognized command."""
        self.response({"error": "Unknown command: %s" % cmd})
|