ChartCollector.py

import time
import sys
import collections
import itertools
import logging

import gevent

from util import helper
from Config import config
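

# Collects global and per-site statistics from the file server in the
# background and writes the samples into the chart database.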
class ChartCollector(object):
    def __init__(self, db):
        self.db = db
        if config.action == "main":
            gevent.spawn_later(60 * 3, self.collector)
        self.log = logging.getLogger("ChartCollector")
        self.last_values = collections.defaultdict(dict)

    def setInitialLastValues(self, sites):
        # Recover last values of site bytes recv/sent
        for site in sites:
            self.last_values["site:" + site.address]["site_bytes_recv"] = site.settings.get("bytes_recv", 0)
            self.last_values["site:" + site.address]["site_bytes_sent"] = site.settings.get("bytes_sent", 0)
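
    # Global collectors: stat name -> zero-argument lambda. Keys ending in
    # "|change" are stored as deltas since the previous sample; keys starting
    # with "peer" are called with the unique peer set instead (see collectDatas).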
    def getCollectors(self):
        collectors = {}
        file_server = sys.modules["main"].file_server
        sites = file_server.sites
        if not sites:
            return collectors
        content_db = sites.values()[0].content_manager.contents.db

        # Connection stats
        collectors["connection"] = lambda: len(file_server.connections)
        collectors["connection_in"] = (
            lambda: len([1 for connection in file_server.connections if connection.type == "in"])
        )
        collectors["connection_onion"] = (
            lambda: len([1 for connection in file_server.connections if connection.ip.endswith(".onion")])
        )
        collectors["connection_ping_avg"] = (
            lambda: round(1000 * helper.avg(
                [connection.last_ping_delay for connection in file_server.connections if connection.last_ping_delay]
            ))
        )
        collectors["connection_ping_min"] = (
            lambda: round(1000 * min(
                [connection.last_ping_delay for connection in file_server.connections if connection.last_ping_delay]
            ))
        )
        collectors["connection_rev_avg"] = (
            lambda: helper.avg(
                [connection.handshake["rev"] for connection in file_server.connections if connection.handshake]
            )
        )

        # Request stats
        collectors["file_bytes_recv|change"] = lambda: file_server.bytes_recv
        collectors["file_bytes_sent|change"] = lambda: file_server.bytes_sent
        collectors["request_num_recv|change"] = lambda: file_server.num_recv
        collectors["request_num_sent|change"] = lambda: file_server.num_sent

        # Limit
        collectors["optional_limit"] = lambda: content_db.getOptionalLimitBytes()
        collectors["optional_used"] = lambda: content_db.getOptionalUsedBytes()
        collectors["optional_downloaded"] = lambda: sum([site.settings.get("optional_downloaded", 0) for site in sites.values()])

        # Peers
        collectors["peer"] = lambda (peers): len(peers)
        collectors["peer_onion"] = lambda (peers): len([True for peer in peers if ".onion" in peer])

        # Size
        collectors["size"] = lambda: sum([site.settings.get("size", 0) for site in sites.values()])
        collectors["size_optional"] = lambda: sum([site.settings.get("size_optional", 0) for site in sites.values()])
        collectors["content"] = lambda: sum([len(site.content_manager.contents) for site in sites.values()])

        return collectors
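
    # Per-site collectors: each lambda receives the Site object to sample.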
    def getSiteCollectors(self):
        site_collectors = {}

        # Size
        site_collectors["site_size"] = lambda(site): site.settings.get("size", 0)
        site_collectors["site_size_optional"] = lambda(site): site.settings.get("size_optional", 0)
        site_collectors["site_optional_downloaded"] = lambda(site): site.settings.get("optional_downloaded", 0)
        site_collectors["site_content"] = lambda(site): len(site.content_manager.contents)

        # Data transfer
        site_collectors["site_bytes_recv|change"] = lambda(site): site.settings.get("bytes_recv", 0)
        site_collectors["site_bytes_sent|change"] = lambda(site): site.settings.get("bytes_sent", 0)

        # Peers
        site_collectors["site_peer"] = lambda(site): len(site.peers)
        site_collectors["site_peer_onion"] = lambda(site): len(
            [True for peer in site.peers.itervalues() if peer.ip.endswith(".onion")]
        )
        site_collectors["site_peer_connected"] = lambda(site): len([True for peer in site.peers.itervalues() if peer.connection])

        return site_collectors
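
    # Union of peer keys across all sites, so peers shared between sites are counted once.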
    def getUniquePeers(self):
        sites = sys.modules["main"].file_server.sites
        return set(itertools.chain.from_iterable(
            [site.peers.keys() for site in sites.values()]
        ))
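
    # Run every collector and return {stat name: value}. Site collectors get
    # the site, "peer*" collectors get the pre-computed peer set, the rest are
    # called without arguments. "|change" stats are converted to deltas here.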
    def collectDatas(self, collectors, last_values, site=None):
        if site is None:
            peers = self.getUniquePeers()
        datas = {}
        for key, collector in collectors.iteritems():
            try:
                if site:
                    value = collector(site)
                elif key.startswith("peer"):
                    value = collector(peers)
                else:
                    value = collector()
            except Exception as err:
                self.log.info("Collector %s error: %s" % (key, err))
                value = None
            if "|change" in key:  # Store changes relative to last value
                key = key.replace("|change", "")
                last_value = last_values.get(key, 0)
                last_values[key] = value
                if value is not None:  # Keep None on collector error instead of raising TypeError
                    value = value - last_value
            if value is None:
                datas[key] = None
            else:
                datas[key] = round(value, 3)
        return datas
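
    # Sample all global collectors and insert the values in one transaction.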
    def collectGlobal(self, collectors, last_values):
        now = int(time.time())
        s = time.time()
        datas = self.collectDatas(collectors, last_values["global"])
        values = []
        for key, value in datas.iteritems():
            values.append((self.db.getTypeId(key), value, now))
        self.log.debug("Global collectors done in %.3fs" % (time.time() - s))

        s = time.time()
        cur = self.db.getCursor()
        cur.execute("BEGIN")
        cur.cursor.executemany("INSERT INTO data (type_id, value, date_added) VALUES (?, ?, ?)", values)
        cur.execute("END")
        cur.close()
        self.log.debug("Global collectors inserted in %.3fs" % (time.time() - s))
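
    # Sample every site with the per-site collectors and insert the values
    # in one transaction.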
    def collectSites(self, sites, collectors, last_values):
        now = int(time.time())
        s = time.time()
        values = []
        for address, site in sites.iteritems():
            site_datas = self.collectDatas(collectors, last_values["site:%s" % address], site)
            for key, value in site_datas.iteritems():
                values.append((self.db.getTypeId(key), self.db.getSiteId(address), value, now))
            time.sleep(0.000001)
        self.log.debug("Site collections done in %.3fs" % (time.time() - s))

        s = time.time()
        cur = self.db.getCursor()
        cur.execute("BEGIN")
        cur.cursor.executemany("INSERT INTO data (type_id, site_id, value, date_added) VALUES (?, ?, ?, ?)", values)
        cur.execute("END")
        cur.close()
        self.log.debug("Site collectors inserted in %.3fs" % (time.time() - s))
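
    # Main loop, spawned 3 minutes after startup: samples global stats every
    # 5 minutes and per-site stats every 12th pass (roughly hourly).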
    def collector(self):
        collectors = self.getCollectors()
        site_collectors = self.getSiteCollectors()
        sites = sys.modules["main"].file_server.sites
        i = 0
        while 1:
            self.collectGlobal(collectors, self.last_values)
            if i % 12 == 0:  # Only collect sites data every hour
                self.collectSites(sites, site_collectors, self.last_values)
            time.sleep(60 * 5)
            i += 1
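
# Minimal usage sketch (assumes a database object exposing getTypeId,
# getSiteId and getCursor, as used above; "ChartDb" is a hypothetical name):
#
#   db = ChartDb()
#   chart_collector = ChartCollector(db)  # Spawns collector() when action == "main"
#   chart_collector.setInitialLastValues(sites.values())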