# PeerDbPlugin.py (3.9 KB)
  1. import time
  2. import sqlite3
  3. import random
  4. import atexit
  5. import gevent
  6. from Plugin import PluginManager
  7. @PluginManager.registerTo("ContentDb")
  8. class ContentDbPlugin(object):
  9. def __init__(self, *args, **kwargs):
  10. atexit.register(self.saveAllPeers)
  11. super(ContentDbPlugin, self).__init__(*args, **kwargs)
  12. def getSchema(self):
  13. schema = super(ContentDbPlugin, self).getSchema()
  14. schema["tables"]["peer"] = {
  15. "cols": [
  16. ["site_id", "INTEGER REFERENCES site (site_id) ON DELETE CASCADE"],
  17. ["address", "TEXT NOT NULL"],
  18. ["port", "INTEGER NOT NULL"],
  19. ["hashfield", "BLOB"],
  20. ["reputation", "INTEGER NOT NULL"],
  21. ["time_added", "INTEGER NOT NULL"],
  22. ["time_found", "INTEGER NOT NULL"]
  23. ],
  24. "indexes": [
  25. "CREATE UNIQUE INDEX peer_key ON peer (site_id, address, port)"
  26. ],
  27. "schema_changed": 2
  28. }
  29. return schema
  30. def loadPeers(self, site):
  31. s = time.time()
  32. site_id = self.site_ids.get(site.address)
  33. res = self.execute("SELECT * FROM peer WHERE site_id = :site_id", {"site_id": site_id})
  34. num = 0
  35. num_hashfield = 0
  36. for row in res:
  37. peer = site.addPeer(str(row["address"]), row["port"])
  38. if not peer: # Already exist
  39. continue
  40. if row["hashfield"]:
  41. peer.hashfield.replaceFromString(row["hashfield"])
  42. num_hashfield += 1
  43. peer.time_added = row["time_added"]
  44. peer.time_found = row["time_found"]
  45. peer.reputation = row["reputation"]
  46. if row["address"].endswith(".onion"):
  47. peer.reputation = peer.reputation / 2 - 1 # Onion peers less likely working
  48. num += 1
  49. if num_hashfield:
  50. site.content_manager.has_optional_files = True
  51. site.log.debug("%s peers (%s with hashfield) loaded in %.3fs" % (num, num_hashfield, time.time() - s))
  52. def iteratePeers(self, site):
  53. site_id = self.site_ids.get(site.address)
  54. for key, peer in site.peers.iteritems():
  55. address, port = key.rsplit(":", 1)
  56. if peer.has_hashfield:
  57. hashfield = sqlite3.Binary(peer.hashfield.tostring())
  58. else:
  59. hashfield = ""
  60. yield (site_id, address, port, hashfield, peer.reputation, int(peer.time_added), int(peer.time_found))
  61. def savePeers(self, site, spawn=False):
  62. if spawn:
  63. # Save peers every hour (+random some secs to not update very site at same time)
  64. gevent.spawn_later(60 * 60 + random.randint(0, 60), self.savePeers, site, spawn=True)
  65. if not site.peers:
  66. site.log.debug("Peers not saved: No peers found")
  67. return
  68. s = time.time()
  69. site_id = self.site_ids.get(site.address)
  70. cur = self.getCursor()
  71. cur.execute("BEGIN")
  72. try:
  73. cur.execute("DELETE FROM peer WHERE site_id = :site_id", {"site_id": site_id})
  74. cur.cursor.executemany(
  75. "INSERT INTO peer (site_id, address, port, hashfield, reputation, time_added, time_found) VALUES (?, ?, ?, ?, ?, ?, ?)",
  76. self.iteratePeers(site)
  77. )
  78. except Exception as err:
  79. site.log.error("Save peer error: %s" % err)
  80. finally:
  81. cur.execute("END")
  82. site.log.debug("Peers saved in %.3fs" % (time.time() - s))
  83. def initSite(self, site):
  84. super(ContentDbPlugin, self).initSite(site)
  85. gevent.spawn_later(0.5, self.loadPeers, site)
  86. gevent.spawn_later(60*60, self.savePeers, site, spawn=True)
  87. def saveAllPeers(self):
  88. for site in self.sites.values():
  89. try:
  90. self.savePeers(site)
  91. except Exception, err:
  92. site.log.error("Save peer error: %s" % err)