# Site.py

import os
import json
import logging
import hashlib
import re
import time
import random
import sys
import binascii
import struct
import socket
import urllib
import urllib2

import gevent
import gevent.event  # AsyncResult is used by publish()

import util
from lib import bencode
from lib.subtl.subtl import UdpTrackerClient
from Config import config
from Peer import Peer
from Worker import WorkerManager
from Debug import Debug
from Content import ContentManager
from SiteStorage import SiteStorage
from Crypt import CryptHash
from util import helper
import SiteManager
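
# A Site object represents one hosted site: it keeps track of known peers,
# announces the site to trackers, downloads the files described by its
# content.json files and pushes local updates to other peers.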
class Site:

    def __init__(self, address, allow_create=True):
        self.address = re.sub("[^A-Za-z0-9]", "", address)  # Make sure it's a valid address
        self.address_short = "%s..%s" % (self.address[:6], self.address[-4:])  # Short address for logging
        self.log = logging.getLogger("Site:%s" % self.address_short)
        self.addEventListeners()

        self.content = None  # Loaded content.json
        self.peers = {}  # Key: ip:port, Value: Peer.Peer
        self.peer_blacklist = SiteManager.peer_blacklist  # Ignore these peers (eg. myself)
        self.last_announce = 0  # Last announce time to tracker
        self.last_tracker_id = random.randint(0, 10)  # Last announced tracker id
        self.worker_manager = WorkerManager(self)  # Handle site download from other peers
        self.bad_files = {}  # SHA check failed files that need to be redownloaded, eg. {"inner.content": 1} (key: file, value: fail count)
        self.content_updated = None  # Content.json update time
        self.notifications = []  # Pending notifications displayed once on page load [error|ok|info, message, timeout]
        self.page_requested = False  # Page viewed in browser
        self.storage = SiteStorage(self, allow_create=allow_create)  # Save and load site files
        self.loadSettings()  # Load settings from sites.json
        self.content_manager = ContentManager(self)  # Load contents

        self.connection_server = None
        if "main" in sys.modules and "file_server" in dir(sys.modules["main"]):  # Use the global file server by default if possible
            self.connection_server = sys.modules["main"].file_server
        else:
            self.connection_server = None

        if not self.settings.get("auth_key"):  # To auth the user in the site (obsolete, will be removed)
            self.settings["auth_key"] = CryptHash.random()
            self.log.debug("New auth key: %s" % self.settings["auth_key"])
            self.saveSettings()

        if not self.settings.get("wrapper_key"):  # To auth websocket permissions
            self.settings["wrapper_key"] = CryptHash.random()
            self.log.debug("New wrapper key: %s" % self.settings["wrapper_key"])
            self.saveSettings()

        self.websockets = []  # Active site websocket connections

    def __str__(self):
        return "Site %s" % self.address_short

    def __repr__(self):
        return "<%s>" % self.__str__()

    # Load site settings from data/sites.json
    def loadSettings(self):
        sites_settings = json.load(open("%s/sites.json" % config.data_dir))
        if self.address in sites_settings:
            self.settings = sites_settings[self.address]
        else:
            if self.address == config.homepage:  # Add admin permissions to homepage
                permissions = ["ADMIN"]
            else:
                permissions = []
            self.settings = {"own": False, "serving": True, "permissions": permissions}  # Default
        return

    # Save site settings to data/sites.json
    def saveSettings(self):
        sites_settings = json.load(open("%s/sites.json" % config.data_dir))
        sites_settings[self.address] = self.settings
        helper.atomicWrite("%s/sites.json" % config.data_dir, json.dumps(sites_settings, indent=2, sort_keys=True))

    # Max site size in MB
    def getSizeLimit(self):
        return self.settings.get("size_limit", config.size_limit)

    # Next size limit based on current size
    def getNextSizeLimit(self):
        size_limits = [10, 20, 50, 100, 200, 500, 1000, 2000, 5000, 10000, 20000, 50000, 100000]
        size = self.settings.get("size", 0)
        for size_limit in size_limits:
            if size * 1.2 < size_limit * 1024 * 1024:
                return size_limit
        return 999999

    # Download all files listed in a content.json
    def downloadContent(self, inner_path, download_files=True, peer=None, check_modifications=False):
        s = time.time()
        self.log.debug("Downloading %s..." % inner_path)
        found = self.needFile(inner_path, update=self.bad_files.get(inner_path))
        content_inner_dir = helper.getDirname(inner_path)
        if not found:
            self.log.debug("Download %s failed, check_modifications: %s" % (inner_path, check_modifications))
            if check_modifications:  # Download failed, but recheck modifications if it succeeds later
                self.onFileDone.once(lambda file_name: self.checkModifications(0), "check_modifications")
            return False  # Could not download content.json

        self.log.debug("Got %s" % inner_path)
        changed, deleted = self.content_manager.loadContent(inner_path, load_includes=False)

        # Start downloading files
        file_threads = []
        if download_files:
            for file_relative_path in self.content_manager.contents[inner_path].get("files", {}).keys():
                file_inner_path = content_inner_dir + file_relative_path
                # Start download and don't wait for it to finish, return the event
                res = self.needFile(file_inner_path, blocking=False, update=self.bad_files.get(file_inner_path), peer=peer)
                if res is not True and res is not False:  # Needs downloading and the file is allowed
                    file_threads.append(res)  # Append evt

        # Wait for includes download
        include_threads = []
        for file_relative_path in self.content_manager.contents[inner_path].get("includes", {}).keys():
            file_inner_path = content_inner_dir + file_relative_path
            include_thread = gevent.spawn(self.downloadContent, file_inner_path, download_files=download_files, peer=peer)
            include_threads.append(include_thread)

        self.log.debug("%s: Downloading %s includes..." % (inner_path, len(include_threads)))
        gevent.joinall(include_threads)
        self.log.debug("%s: Includes download ended" % inner_path)

        if check_modifications:  # Check if every file is up-to-date
            self.checkModifications(0)

        self.log.debug("%s: Downloading %s files, changed: %s..." % (inner_path, len(file_threads), len(changed)))
        gevent.joinall(file_threads)
        self.log.debug("%s: DownloadContent ended in %.2fs" % (inner_path, time.time() - s))

        return True

    # Return bad files with fewer than 3 retries
    def getReachableBadFiles(self):
        if not self.bad_files:
            return False
        return [bad_file for bad_file, retry in self.bad_files.iteritems() if retry < 3]

    # Retry downloading bad files
    def retryBadFiles(self):
        for bad_file in self.bad_files.keys():
            self.needFile(bad_file, update=True, blocking=False)

    # Download all files of the site
    @util.Noparallel(blocking=False)
    def download(self, check_size=False, blind_includes=False):
        self.log.debug(
            "Start downloading, bad_files: %s, check_size: %s, blind_includes: %s" %
            (self.bad_files, check_size, blind_includes)
        )
        gevent.spawn(self.announce)
        if check_size:  # Check the size first
            valid = self.downloadContent("content.json", download_files=False)  # Just download the content.json files
            if not valid:
                return False  # Can't download the content.json files or the site does not fit in the size limit

        # Download everything
        valid = self.downloadContent("content.json", check_modifications=blind_includes)
        self.retryBadFiles()

        return valid
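
    # Note: updater() below is the worker used by checkModifications(): three of
    # these greenlets run in parallel, popping peers from the shared peers_try
    # list and appending successfully queried peers to the shared queried list.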

    # Update worker, try to find clients that support the listModifications command
    def updater(self, peers_try, queried, since):
        while 1:
            if not peers_try or len(queried) >= 3:  # Stop after 3 successful queries
                break
            peer = peers_try.pop(0)
            if not peer.connection and len(queried) < 2:
                peer.connect()  # Only open a new connection if fewer than 2 queried already
            if not peer.connection or peer.connection.handshake.get("rev", 0) < 126:
                continue  # Not compatible
            res = peer.listModified(since)
            if not res or "modified_files" not in res:
                continue  # Failed query
            queried.append(peer)
            for inner_path, modified in res["modified_files"].iteritems():  # Check if the peer has newer files than us
                content = self.content_manager.contents.get(inner_path)
                if (not content or modified > content["modified"]) and inner_path not in self.bad_files:
                    self.log.debug("New modified file from %s: %s" % (peer, inner_path))
                    # We don't have this file or ours is older
                    self.bad_files[inner_path] = self.bad_files.get(inner_path, 0) + 1  # Mark as bad file
                    gevent.spawn(self.downloadContent, inner_path)  # Download the content.json + the changed files

    # Check modified content.json files from peers and add modified files to bad_files
    # Return: Successfully queried peers [Peer, Peer...]
    def checkModifications(self, since=None):
        peers_try = []  # Try these peers
        queried = []  # Successfully queried from these peers

        peers = self.peers.values()
        random.shuffle(peers)
        for peer in peers:  # Try to find connected good peers, but we must have at least 5 peers
            if peer.findConnection() and peer.connection.handshake.get("rev", 0) > 125:  # Add to the beginning if rev > 125
                peers_try.insert(0, peer)
            elif len(peers_try) < 5:  # Backup peers, add to the end of the try list
                peers_try.append(peer)

        if since is None:  # No since defined, download from last modification time minus 1 day
            since = self.settings.get("modified", 60 * 60 * 24) - 60 * 60 * 24
        self.log.debug("Try to get listModifications from peers: %s since: %s" % (peers_try, since))

        updaters = []
        for i in range(3):
            updaters.append(gevent.spawn(self.updater, peers_try, queried, since))
        gevent.joinall(updaters, timeout=10)  # Wait 10 secs for the workers to finish querying modifications
        time.sleep(0.1)
        self.log.debug("Queried listModifications from: %s" % queried)

        return queried

    # Update content.json from peers and download changed files
    # Return: None
    @util.Noparallel()
    def update(self, announce=False):
        self.content_manager.loadContent("content.json")  # Reload content.json
        self.content_updated = None  # Reset content updated time
        self.updateWebsocket(updating=True)
        if announce:
            self.announce()

        queried = self.checkModifications()

        if not queried:  # No client found that supports listModifications
            self.log.debug("Fallback to old-style update")
            self.redownloadContents()

        if not self.settings["own"]:
            self.storage.checkFiles(quick_check=True)  # Quick check of files based on file size

        changed, deleted = self.content_manager.loadContent("content.json")

        if self.bad_files:
            self.download()

        self.settings["size"] = self.content_manager.getTotalSize()  # Update site size
        self.updateWebsocket(updated=True)

    # Update the site by redownloading all content.json files
    def redownloadContents(self):
        # Download all content.json files again
        content_threads = []
        for inner_path in self.content_manager.contents.keys():
            content_threads.append(self.needFile(inner_path, update=True, blocking=False))

        self.log.debug("Waiting %s content.json to finish..." % len(content_threads))
        gevent.joinall(content_threads)
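
    # Note: publisher() below is the worker used by publish(): up to 5 of these
    # greenlets pop peers from the shared peers list, push the file body to them
    # and append successful peers to the shared published list until the
    # publish limit is reached.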

    # Publish worker
    def publisher(self, inner_path, peers, published, limit, event_done=None):
        file_size = self.storage.getSize(inner_path)
        body = self.storage.read(inner_path)
        while 1:
            if not peers or len(published) >= limit:
                if event_done:
                    event_done.set(True)
                break  # All peers done, or published enough
            peer = peers.pop(0)
            if peer.connection and peer.connection.last_ping_delay:  # Peer connected
                # Timeout: 5 secs + size in kb + last_ping
                timeout = 5 + int(file_size / 1024) + peer.connection.last_ping_delay
            else:  # Peer not connected
                # Timeout: 5 secs + size in kb
                timeout = 5 + int(file_size / 1024)
            result = {"exception": "Timeout"}
            for retry in range(2):
                try:
                    with gevent.Timeout(timeout, False):
                        result = peer.request("update", {
                            "site": self.address,
                            "inner_path": inner_path,
                            "body": body,
                            "peer": (config.ip_external, config.fileserver_port)
                        })
                    if result:
                        break
                except Exception, err:
                    result = {"exception": Debug.formatException(err)}

            if result and "ok" in result:
                published.append(peer)
                self.log.info("[OK] %s: %s" % (peer.key, result["ok"]))
            else:
                if result == {"exception": "Timeout"}:
                    peer.onConnectionError()
                self.log.info("[FAILED] %s: %s" % (peer.key, result))

    # Update content.json on peers
    @util.Noparallel()
    def publish(self, limit=5, inner_path="content.json"):
        published = []  # Successfully published (Peer)
        publishers = []  # Publisher threads

        connected_peers = self.getConnectedPeers()
        if len(connected_peers) > limit * 2:  # Publish to already connected peers if possible
            peers = connected_peers
        else:
            peers = self.peers.values()

        self.log.info("Publishing to %s/%s peers (connected: %s)..." % (
            min(len(self.peers), limit), len(self.peers), len(connected_peers)
        ))

        if not peers:
            return 0  # No peers found

        random.shuffle(peers)
        event_done = gevent.event.AsyncResult()
        for i in range(min(len(self.peers), limit, 5)):  # Max 5 threads
            publisher = gevent.spawn(self.publisher, inner_path, peers, published, limit, event_done)
            publishers.append(publisher)

        event_done.get()  # Wait for done
        if len(published) < min(len(self.peers), limit):
            time.sleep(0.2)  # If fewer than we need, sleep a bit
        if len(published) == 0:
            gevent.joinall(publishers)  # No successful publish, wait for all publishers

        # Make sure the connected passive peers get the update
        passive_peers = [
            peer for peer in peers
            if peer.connection and not peer.connection.closed and peer.key.endswith(":0") and peer not in published
        ]  # Every connected passive peer that we haven't published to

        self.log.info(
            "Successfully published to %s peers, publishing to %s more passive peers" %
            (len(published), len(passive_peers))
        )

        for peer in passive_peers:
            gevent.spawn(self.publisher, inner_path, passive_peers, published, limit=10)

        return len(published)

    # Copy this site
    def clone(self, address, privatekey=None, address_index=None, overwrite=False):
        import shutil
        new_site = SiteManager.site_manager.need(address, all_file=False)
        default_dirs = []  # Don't copy these directories (they have a -default version)
        for dir_name in os.listdir(self.storage.directory):
            if "-default" in dir_name:
                default_dirs.append(dir_name.replace("-default", ""))

        self.log.debug("Cloning to %s, ignore dirs: %s" % (address, default_dirs))

        # Copy root content.json
        if not new_site.storage.isFile("content.json") and not overwrite:
            # Content.json does not exist yet, create a new one from the source site
            content_json = self.storage.loadJson("content.json")
            if "domain" in content_json:
                del content_json["domain"]
            content_json["title"] = "my" + content_json["title"]
            content_json["cloned_from"] = self.address
            if address_index:
                content_json["address_index"] = address_index  # Site owner's BIP32 index
            new_site.storage.writeJson("content.json", content_json)
            new_site.content_manager.loadContent(
                "content.json", add_bad_files=False, delete_removed_files=False, load_includes=False
            )

        # Copy files
        for content_inner_path, content in self.content_manager.contents.items():
            for file_relative_path in sorted(content["files"].keys()):
                file_inner_path = helper.getDirname(content_inner_path) + file_relative_path  # Relative to content.json
                file_inner_path = file_inner_path.strip("/")  # Strip leading /
                if file_inner_path.split("/")[0] in default_dirs:  # Don't copy directories that have a -default postfixed alternative
                    self.log.debug("[SKIP] %s (has default alternative)" % file_inner_path)
                    continue

                file_path = self.storage.getPath(file_inner_path)

                # Copy the file normally to keep the -default postfixed dir and file to allow cloning later
                file_path_dest = new_site.storage.getPath(file_inner_path)
                self.log.debug("[COPY] %s to %s..." % (file_inner_path, file_path_dest))
                dest_dir = os.path.dirname(file_path_dest)
                if not os.path.isdir(dest_dir):
                    os.makedirs(dest_dir)
                shutil.copy(file_path, file_path_dest)

                # If -default is in the path, create a -default-less copy of the file
                if "-default" in file_inner_path:
                    file_path_dest = new_site.storage.getPath(file_inner_path.replace("-default", ""))
                    if new_site.storage.isFile(file_path_dest) and not overwrite:  # Don't overwrite site files with default ones
                        self.log.debug("[SKIP] Default file: %s (already exists)" % file_inner_path)
                        continue
                    self.log.debug("[COPY] Default file: %s to %s..." % (file_inner_path, file_path_dest))
                    dest_dir = os.path.dirname(file_path_dest)
                    if not os.path.isdir(dest_dir):
                        os.makedirs(dest_dir)
                    shutil.copy(file_path, file_path_dest)

                    # Sign if it is a content.json
                    if file_path_dest.endswith("/content.json"):
                        new_site.storage.onUpdated(file_inner_path.replace("-default", ""))
                        new_site.content_manager.loadContent(
                            file_inner_path.replace("-default", ""), add_bad_files=False,
                            delete_removed_files=False, load_includes=False
                        )
                        if privatekey:
                            new_site.content_manager.sign(file_inner_path.replace("-default", ""), privatekey)
                            new_site.content_manager.loadContent(
                                file_inner_path, add_bad_files=False, delete_removed_files=False, load_includes=False
                            )

        if privatekey:
            new_site.content_manager.sign("content.json", privatekey)
            new_site.content_manager.loadContent(
                "content.json", add_bad_files=False, delete_removed_files=False, load_includes=False
            )

        # Rebuild DB
        if new_site.storage.isFile("dbschema.json"):
            new_site.storage.closeDb()
            new_site.storage.rebuildDb()

        return new_site

    # Check and download a file if it does not exist
    def needFile(self, inner_path, update=False, blocking=True, peer=None, priority=0):
        if self.storage.isFile(inner_path) and not update:  # File exists, no need to do anything
            return True
        elif self.settings["serving"] is False:  # Site not serving
            return False
        else:  # Wait until the file is downloaded
            self.bad_files[inner_path] = self.bad_files.get(inner_path, 0) + 1  # Mark as bad file
            if not self.content_manager.contents.get("content.json"):  # No content.json, download it first!
                self.log.debug("Need content.json first")
                gevent.spawn(self.announce)
                if inner_path != "content.json":  # Prevent double download
                    task = self.worker_manager.addTask("content.json", peer)
                    task.get()
                    self.content_manager.loadContent()
                    if not self.content_manager.contents.get("content.json"):
                        return False  # Content.json download failed

            if not inner_path.endswith("content.json") and not self.content_manager.getFileInfo(inner_path):
                # No info for the file, download all content.json files first
                self.log.debug("No info for %s, waiting for all content.json" % inner_path)
                success = self.downloadContent("content.json", download_files=False)
                if not success:
                    return False
                if not self.content_manager.getFileInfo(inner_path):
                    return False  # Still no info for the file

            task = self.worker_manager.addTask(inner_path, peer, priority=priority)
            if blocking:
                return task.get()
            else:
                return task

    # Add or update a peer to site
    # return_peer: Always return the peer even if it was already present
    def addPeer(self, ip, port, return_peer=False):
        if not ip:
            return False
        if (ip, port) in self.peer_blacklist:
            return False  # Ignore blacklist (eg. myself)
        key = "%s:%s" % (ip, port)
        if key in self.peers:  # Already has this ip
            self.peers[key].found()
            if return_peer:  # Always return peer
                return self.peers[key]
            else:
                return False
        else:  # New peer
            peer = Peer(ip, port, self)
            self.peers[key] = peer
            return peer

    # Gather peers from connected peers
    @util.Noparallel(blocking=False)
    def announcePex(self, query_num=2, need_num=5):
        peers = [peer for peer in self.peers.values() if peer.connection and peer.connection.connected]  # Connected peers
        if len(peers) == 0:  # No connected peers for this site, query any of them
            self.log.debug("Small number of peers detected... query all peers using pex")
            peers = self.peers.values()
            need_num = 10

        random.shuffle(peers)
        done = 0
        added = 0
        for peer in peers:
            if peer.connection:  # Has connection
                if "port_opened" in peer.connection.handshake:  # This field was added recently, so it probably supports peer exchange
                    res = peer.pex(need_num=need_num)
                else:
                    res = False
            else:  # No connection
                res = peer.pex(need_num=need_num)
            if type(res) == int:  # We have a result
                done += 1
                added += res
                if res:
                    self.worker_manager.onPeers()
                    self.updateWebsocket(peers_added=res)
            if done == query_num:
                break
        self.log.debug("Queried pex from %s peers, got %s new peers." % (done, added))

    # Gather peers from a tracker
    # Return: Complete time or False on error
    def announceTracker(self, protocol, address, fileserver_port, address_hash, my_peer_id):
        s = time.time()
        if protocol == "udp":  # UDP tracker
            if config.disable_udp:
                return False  # No UDP supported
            ip, port = address.split(":")
            tracker = UdpTrackerClient(ip, int(port))
            tracker.peer_port = fileserver_port
            try:
                tracker.connect()
                tracker.poll_once()
                tracker.announce(info_hash=address_hash, num_want=50)
                back = tracker.poll_once()
                peers = back["response"]["peers"]
            except Exception, err:
                return False

        else:  # HTTP tracker
            params = {
                'info_hash': binascii.a2b_hex(address_hash),
                'peer_id': my_peer_id, 'port': fileserver_port,
                'uploaded': 0, 'downloaded': 0, 'left': 0, 'compact': 1, 'numwant': 30,
                'event': 'started'
            }
            req = None
            try:
                url = "http://" + address + "?" + urllib.urlencode(params)
                # Load url
                with gevent.Timeout(10, False):  # Make sure of timeout
                    req = urllib2.urlopen(url, timeout=8)
                    response = req.read()
                    req.fp._sock.recv = None  # Hacky avoidance of memory leak for older python versions
                    req.close()
                    req = None
                if not response:
                    self.log.debug("Http tracker %s response error" % url)
                    return False
                # Decode peers (compact format: 6 bytes per peer, 4-byte IPv4 + 2-byte port)
                peer_data = bencode.decode(response)["peers"]
                response = None
                peer_count = len(peer_data) / 6
                peers = []
                for peer_offset in xrange(peer_count):
                    off = 6 * peer_offset
                    peer = peer_data[off:off + 6]
                    addr, port = struct.unpack('!LH', peer)
                    peers.append({"addr": socket.inet_ntoa(struct.pack('!L', addr)), "port": port})
            except Exception, err:
                self.log.debug("Http tracker %s error: %s" % (url, err))
                if req:
                    req.close()
                    req = None
                return False

        # Adding peers
        added = 0
        for peer in peers:
            if not peer["port"]:
                continue  # Don't add peers with port 0
            if self.addPeer(peer["addr"], peer["port"]):
                added += 1
        if added:
            self.worker_manager.onPeers()
            self.updateWebsocket(peers_added=added)
        self.log.debug("Found %s peers, new: %s" % (len(peers), added))
        return time.time() - s

    # Add myself and get other peers from trackers
    def announce(self, force=False, num=5, pex=True):
        if time.time() < self.last_announce + 30 and not force:
            return  # No reannouncing within 30 secs
        self.last_announce = time.time()

        trackers = config.trackers
        if num == 1:  # Only announce on one tracker, increment the queried tracker id
            self.last_tracker_id += 1
            self.last_tracker_id = self.last_tracker_id % len(trackers)
            trackers = [trackers[self.last_tracker_id]]  # We're only going to use this one

        errors = []
        slow = []
        address_hash = hashlib.sha1(self.address).hexdigest()  # Site address hash
        my_peer_id = sys.modules["main"].file_server.peer_id

        if sys.modules["main"].file_server.port_opened:
            fileserver_port = config.fileserver_port
        else:  # Port not opened, report port 0
            fileserver_port = 0

        s = time.time()
        announced = 0
        threads = []

        for tracker in trackers:  # Start announce threads
            protocol, address = tracker.split("://")
            thread = gevent.spawn(self.announceTracker, protocol, address, fileserver_port, address_hash, my_peer_id)
            threads.append(thread)
            thread.address = address
            thread.protocol = protocol
            if len(threads) > num:  # Announce limit
                break

        gevent.joinall(threads)  # Wait for the announces to finish

        for thread in threads:
            if thread.value:
                if thread.value > 1:
                    slow.append("%.2fs %s://%s" % (thread.value, thread.protocol, thread.address))
                announced += 1
            else:
                errors.append("%s://%s" % (thread.protocol, thread.address))

        # Save peer num
        self.settings["peers"] = len(self.peers)
        self.saveSettings()

        if len(errors) < min(num, len(trackers)):  # Fewer errors than the total number of trackers
            self.log.debug(
                "Announced port %s to %s trackers in %.3fs, errors: %s, slow: %s" %
                (fileserver_port, announced, time.time() - s, errors, slow)
            )
        else:
            self.log.error("Announce to %s trackers in %.3fs, failed" % (announced, time.time() - s))

        if pex:
            if not [peer for peer in self.peers.values() if peer.connection and peer.connection.connected]:
                # If no connected peer yet then wait for connections
                gevent.spawn_later(3, self.announcePex, need_num=10)  # Spawn 3 secs later
            else:  # Else announce immediately
                self.announcePex()

    # Keep connections to get the updates (required for passive clients)
    def needConnections(self, num=3):
        need = min(len(self.peers), num)  # Need num peers, but at most the total number of peers

        connected = 0
        for peer in self.peers.values():  # Check the current connected number
            if peer.connection and peer.connection.connected:
                connected += 1

        self.log.debug("Need connections: %s, Current: %s, Total: %s" % (need, connected, len(self.peers)))

        if connected < need:  # Need more than we have
            for peer in self.peers.values():
                if not peer.connection or not peer.connection.connected:  # No peer connection or disconnected
                    peer.pex()  # Initiate peer exchange
                    if peer.connection and peer.connection.connected:
                        connected += 1  # Successfully connected
                if connected >= need:
                    break
        return connected

    # Return: Probably working, connectable Peers
    def getConnectablePeers(self, need_num=5, ignore=[]):
        peers = self.peers.values()
        random.shuffle(peers)
        found = []
        for peer in peers:
            if peer.key.endswith(":0"):
                continue  # Not connectable
            if not peer.connection:
                continue  # No connection
            if peer.key in ignore:
                continue  # The requester already has this peer
            if time.time() - peer.connection.last_recv_time > 60 * 60 * 2:  # Last message more than 2 hours ago
                peer.connection = None  # Cleanup: Dead connection
                continue
            found.append(peer)
            if len(found) >= need_num:
                break  # Found the requested number of peers

        if (not found and not ignore) or (need_num > 5 and need_num < 100 and len(found) < need_num):
            # Return not-so-good peers: we found none and the requester has none, or we can't give enough peers
            found = [peer for peer in peers if not peer.key.endswith(":0") and peer.key not in ignore][0:need_num - len(found)]

        return found

    def getConnectedPeers(self):
        return [peer for peer in self.peers.values() if peer.connection and peer.connection.connected]

    # Cleanup probably dead peers
    def cleanupPeers(self):
        peers = self.peers.values()
        if len(peers) < 20:
            return False
        removed = 0

        for peer in peers:
            if peer.connection and peer.connection.connected:
                continue
            if peer.connection and not peer.connection.connected:
                peer.connection = None  # Dead connection
            if time.time() - peer.last_found > 60 * 60 * 4:  # Not seen on a tracker or via pex in the last 4 hours
                peer.remove()
                removed += 1
            if removed > 5:  # Don't remove too many at once
                break

        if removed:
            self.log.debug("Cleanup peers result: Removed %s, left: %s" % (removed, len(self.peers)))

    # - Events -

    # Add event listeners
    def addEventListeners(self):
        self.onFileStart = util.Event()  # If WorkerManager added a new task
        self.onFileDone = util.Event()  # If WorkerManager successfully downloaded a file
        self.onFileFail = util.Event()  # If WorkerManager failed to download a file
        self.onComplete = util.Event()  # All files finished

        self.onFileStart.append(lambda inner_path: self.fileStarted())  # No parameters to keep Noparallel batching working
        self.onFileDone.append(lambda inner_path: self.fileDone(inner_path))
        self.onFileFail.append(lambda inner_path: self.fileFailed(inner_path))

    # Send site status update to websocket clients
    def updateWebsocket(self, **kwargs):
        if kwargs:
            param = {"event": kwargs.items()[0]}
        else:
            param = None
        for ws in self.websockets:
            ws.event("siteChanged", self, param)

    # File download started
    @util.Noparallel(blocking=False)
    def fileStarted(self):
        time.sleep(0.001)  # Wait for other file adds
        self.updateWebsocket(file_started=True)

    # File downloaded successfully
    def fileDone(self, inner_path):
        # File downloaded, remove it from bad files
        if inner_path in self.bad_files:
            self.log.debug("Bad file solved: %s" % inner_path)
            del self.bad_files[inner_path]

        # Update content.json last download time
        if inner_path == "content.json":
            self.content_updated = time.time()

        self.updateWebsocket(file_done=inner_path)

    # File download failed
    def fileFailed(self, inner_path):
        if inner_path == "content.json":
            self.content_updated = False
            self.log.debug("Can't update content.json")
        if inner_path in self.bad_files:
            self.bad_files[inner_path] = self.bad_files.get(inner_path, 0) + 1

        self.updateWebsocket(file_failed=inner_path)