ChartDb.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. from Config import config
  2. from Db import Db
  3. import time
  class ChartDb(Db):
      """Chart statistics database stored in ``chart.db`` under ``config.data_dir``.

      Two in-memory lookup caches are kept next to the tables:

      * ``self.sites`` maps a site address to its ``site_id``.
      * ``self.types`` maps a type name to its ``type_id``.
      """

      # Width of one archive window in seconds (7 days).
      SECONDS_PER_WEEK = 60 * 60 * 24 * 7

      def __init__(self):
          """Open the database, check its tables and fill the id caches."""
          # The version is assigned before the base initializer runs; keep this order.
          self.version = 2
          super(ChartDb, self).__init__(self.getSchema(), "%s/chart.db" % config.data_dir)
          # NOTE(review): presumably read by the Db base class - confirm.
          self.foreign_keys = True
          self.checkTables()
          self.sites = self.loadSites()
          self.types = self.loadTypes()

      def getSchema(self):
          """Return the schema description handed to the ``Db`` base class.

          Returns:
              A dict with ``db_name`` and a ``tables`` mapping that describes the
              ``data``, ``type`` and ``site`` tables (columns, optional indexes
              and a per-table ``schema_changed`` number).
          """
          return {
              "db_name": "Chart",
              "tables": {
                  "data": {
                      "cols": [
                          ["data_id", "INTEGER PRIMARY KEY ASC AUTOINCREMENT NOT NULL UNIQUE"],
                          ["type_id", "INTEGER NOT NULL"],
                          ["site_id", "INTEGER"],
                          ["value", "INTEGER"],
                          ["date_added", "DATETIME DEFAULT (CURRENT_TIMESTAMP)"]
                      ],
                      "indexes": [
                          "CREATE INDEX site_id ON data (site_id)",
                          "CREATE INDEX date_added ON data (date_added)"
                      ],
                      "schema_changed": 2
                  },
                  "type": {
                      "cols": [
                          ["type_id", "INTEGER PRIMARY KEY NOT NULL UNIQUE"],
                          ["name", "TEXT"]
                      ],
                      "schema_changed": 1
                  },
                  "site": {
                      "cols": [
                          ["site_id", "INTEGER PRIMARY KEY NOT NULL UNIQUE"],
                          ["address", "TEXT"]
                      ],
                      "schema_changed": 1
                  }
              }
          }

      def getTypeId(self, name):
          """Return the ``type_id`` for ``name``, inserting a ``type`` row if unseen.

          Args:
              name: Type name, used as the key of the ``self.types`` cache.

          Returns:
              The cached id, or the ``lastrowid`` of the row inserted just now.
          """
          if name not in self.types:
              # NOTE(review): the bare "?" with a dict argument is presumably
              # expanded by the Db layer - confirm against Db.execute.
              self.execute("INSERT INTO type ?", {"name": name})
              self.types[name] = self.cur.cursor.lastrowid
          return self.types[name]

      def getSiteId(self, address):
          """Return the ``site_id`` for ``address``, inserting a ``site`` row if unseen.

          Args:
              address: Site address, used as the key of the ``self.sites`` cache.

          Returns:
              The cached id, or the ``lastrowid`` of the row inserted just now.
          """
          if address not in self.sites:
              self.execute("INSERT INTO site ?", {"address": address})
              self.sites[address] = self.cur.cursor.lastrowid
          return self.sites[address]

      def loadSites(self):
          """Read the ``site`` table and return an ``{address: site_id}`` dict."""
          return {
              row["address"]: row["site_id"]
              for row in self.execute("SELECT * FROM site")
          }

      def loadTypes(self):
          """Read the ``type`` table and return a ``{name: type_id}`` dict."""
          return {
              row["name"]: row["type_id"]
              for row in self.execute("SELECT * FROM type")
          }

      def deleteSite(self, address):
          """Remove a site and its data rows; do nothing for an unknown address.

          Args:
              address: Site address to remove.
          """
          if address not in self.sites:
              return
          # The cache entry is dropped before the rows are deleted, exactly as
          # the original code did; the statement order is intentionally kept.
          site_id = self.sites.pop(address)
          self.execute("DELETE FROM site WHERE ?", {"site_id": site_id})
          self.execute("DELETE FROM data WHERE ?", {"site_id": site_id})

      def archive(self):
          """Merge old rows without a site into one row per hour and type.

          Walks backwards one week at a time, starting with the window between
          two weeks and one week ago, and stops at the first window in which
          nothing was merged. The database is vacuumed afterwards only when at
          least one row was merged.
          """
          week_back = 1
          total_archived = 0
          while True:
              started = time.time()
              num_archived = self._archiveWeek(week_back)
              total_archived += num_archived
              self.log.debug("Archived %s data from %s weeks ago in %.3fs" % (num_archived, week_back, time.time() - started))
              week_back += 1
              time.sleep(0.1)
              if num_archived == 0:
                  break

          # The previous guard (`week_back > 1`) was always true because the
          # counter is incremented on the very first pass, so VACUUM ran on every
          # call. Only rewrite the database file when rows were really merged.
          if total_archived > 0:
              self.execute("VACUUM")

      def _archiveWeek(self, week_back):
          """Merge the rows of one week-long window and return how many were merged.

          Args:
              week_back: Window selector; the window ends ``week_back`` weeks
                  before now and starts one week earlier.

          Returns:
              Number of original rows that were replaced by summary rows.
          """
          date_added_from = time.time() - self.SECONDS_PER_WEEK * (week_back + 1)
          date_added_to = date_added_from + self.SECONDS_PER_WEEK
          res = self.execute("""
              SELECT
               MAX(date_added) AS date_added,
               SUM(value) AS value,
               GROUP_CONCAT(data_id) AS data_ids,
               type_id,
               site_id,
               COUNT(*) AS num
              FROM data
              WHERE
               site_id IS NULL AND
               date_added > :date_added_from AND
               date_added < :date_added_to
              GROUP BY strftime('%Y-%m-%d %H', date_added, 'unixepoch', 'localtime'), type_id
          """, {"date_added_from": date_added_from, "date_added_to": date_added_to})

          num_archived = 0
          cur = self.getCursor()
          for row in res:
              if row["num"] == 1:
                  # A group of one row is already a single row; nothing to merge.
                  continue
              cur.execute("INSERT INTO data ?", {
                  "type_id": row["type_id"],
                  "site_id": row["site_id"],
                  "value": row["value"],
                  "date_added": row["date_added"]
              })
              # data_ids is the GROUP_CONCAT of data_id values selected above,
              # i.e. ids produced by the database itself, not external input.
              cur.execute("DELETE FROM data WHERE data_id IN (%s)" % row["data_ids"])
              num_archived += row["num"]
          return num_archived
  111. self.execute("VACUUM")