PeerDbPlugin.py 3.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. import time
  2. import sqlite3
  3. import random
  4. import atexit
  5. import gevent
  6. from Plugin import PluginManager
  7. @PluginManager.registerTo("ContentDb")
  8. class ContentDbPlugin(object):
  9. def __init__(self, *args, **kwargs):
  10. atexit.register(self.saveAllPeers)
  11. super(ContentDbPlugin, self).__init__(*args, **kwargs)
  12. def getSchema(self):
  13. schema = super(ContentDbPlugin, self).getSchema()
  14. schema["tables"]["peer"] = {
  15. "cols": [
  16. ["site_id", "INTEGER REFERENCES site (site_id) ON DELETE CASCADE"],
  17. ["address", "TEXT NOT NULL"],
  18. ["port", "INTEGER NOT NULL"],
  19. ["hashfield", "BLOB"],
  20. ["time_added", "INTEGER NOT NULL"]
  21. ],
  22. "indexes": [
  23. "CREATE UNIQUE INDEX peer_key ON peer (site_id, address, port)"
  24. ],
  25. "schema_changed": 1
  26. }
  27. return schema
  28. def loadPeers(self, site):
  29. s = time.time()
  30. site_id = self.site_ids.get(site.address)
  31. res = self.execute("SELECT * FROM peer WHERE site_id = :site_id", {"site_id": site_id})
  32. num = 0
  33. num_hashfield = 0
  34. for row in res:
  35. peer = site.addPeer(row["address"], row["port"])
  36. if not peer: # Already exist
  37. continue
  38. if row["hashfield"]:
  39. peer.hashfield.replaceFromString(row["hashfield"])
  40. num_hashfield += 1
  41. peer.time_added = row["time_added"]
  42. peer.reputation = int((time.time() - peer.time_added) / (60 * 60 * 24)) # Boost reputation for older peers
  43. if row["address"].endswith(".onion"):
  44. peer.reputation = peer.reputation / 2 # Onion peers less likely working
  45. num += 1
  46. site.log.debug("%s peers (%s with hashfield) loaded in %.3fs" % (num, num_hashfield, time.time() - s))
  47. def iteratePeers(self, site):
  48. site_id = self.site_ids.get(site.address)
  49. for key, peer in site.peers.iteritems():
  50. address, port = key.split(":")
  51. if peer.has_hashfield:
  52. hashfield = sqlite3.Binary(peer.hashfield.tostring())
  53. else:
  54. hashfield = ""
  55. yield (site_id, address, port, hashfield, int(peer.time_added))
  56. def savePeers(self, site, spawn=False):
  57. if spawn:
  58. # Save peers every hour (+random some secs to not update very site at same time)
  59. gevent.spawn_later(60 * 60 + random.randint(0, 60), self.savePeers, site, spawn=True)
  60. if not site.peers:
  61. site.log.debug("Peers not saved: No peers found")
  62. return
  63. s = time.time()
  64. site_id = self.site_ids.get(site.address)
  65. cur = self.getCursor()
  66. cur.execute("BEGIN")
  67. try:
  68. self.execute("DELETE FROM peer WHERE site_id = :site_id", {"site_id": site_id})
  69. self.cur.cursor.executemany(
  70. "INSERT INTO peer (site_id, address, port, hashfield, time_added) VALUES (?, ?, ?, ?, ?)",
  71. self.iteratePeers(site)
  72. )
  73. except Exception, err:
  74. site.log.error("Save peer error: %s" % err)
  75. finally:
  76. cur.execute("END")
  77. site.log.debug("Peers saved in %.3fs" % (time.time() - s))
  78. def initSite(self, site):
  79. super(ContentDbPlugin, self).initSite(site)
  80. gevent.spawn_later(0.5, self.loadPeers, site)
  81. gevent.spawn_later(60*60, self.savePeers, site, spawn=True)
  82. def saveAllPeers(self):
  83. for site in self.sites.values():
  84. try:
  85. self.savePeers(site)
  86. except Exception, err:
  87. site.log.error("Save peer error: %s" % err)