Db.py

import sqlite3
import json
import time
import logging
import re
import os

import gevent

from DbCursor import DbCursor

opened_dbs = []


# Close idle databases to save some memory
def dbCleanup():
    while 1:
        time.sleep(60 * 5)
        for db in opened_dbs[:]:
            if time.time() - db.last_query_time > 60 * 3:
                db.close()

gevent.spawn(dbCleanup)
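
# A minimal sketch of the schema dict this class expects (hypothetical values,
# inferred from how self.schema is used below; the real schema comes from a
# *.schema.json file such as zerotalk.schema.json):
#
# {
#     "db_name": "ZeroTalk",
#     "version": 1,
#     "maps": {
#         "users/.*/data.json": {
#             "to_keyvalue": ["next_topic_id"],
#             "to_table": ["topic"]
#         }
#     },
#     "tables": {
#         "topic": {
#             "cols": [
#                 ["topic_id", "INTEGER"],
#                 ["title", "TEXT"],
#                 ["json_id", "INTEGER REFERENCES json (json_id)"]
#             ],
#             "indexes": ["CREATE UNIQUE INDEX topic_key ON topic(topic_id, json_id)"],
#             "schema_changed": 1
#         }
#     }
# }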

class Db:

    def __init__(self, schema, db_path):
        self.db_path = db_path
        self.db_dir = os.path.dirname(db_path) + "/"
        self.schema = schema
        self.schema["version"] = self.schema.get("version", 1)
        self.conn = None
        self.cur = None
        self.log = logging.getLogger("Db:%s" % schema["db_name"])
        self.table_names = None
        self.collect_stats = False
        self.query_stats = {}
        self.db_keyvalues = {}
        self.last_query_time = time.time()

    def __repr__(self):
        return "<Db:%s>" % self.db_path

    def connect(self):
        if self not in opened_dbs:
            opened_dbs.append(self)
        self.log.debug("Connecting to %s (sqlite version: %s)..." % (self.db_path, sqlite3.version))
        if not os.path.isdir(self.db_dir):  # Directory does not exist yet
            os.makedirs(self.db_dir)
            self.log.debug("Created Db path: %s" % self.db_dir)
        if not os.path.isfile(self.db_path):
            self.log.debug("Db file not exist yet: %s" % self.db_path)
        self.conn = sqlite3.connect(self.db_path)
        self.conn.row_factory = sqlite3.Row
        self.conn.isolation_level = None
        self.cur = self.getCursor()
        # We need more speed than security
        self.cur.execute("PRAGMA journal_mode = WAL")
        self.cur.execute("PRAGMA journal_mode = MEMORY")  # The last journal_mode PRAGMA takes effect, so the journal ends up in memory
        self.cur.execute("PRAGMA synchronous = OFF")

    # Execute query using dbcursor
    def execute(self, query, params=None):
        self.last_query_time = time.time()
        if not self.conn:
            self.connect()
        return self.cur.execute(query, params)

    def close(self):
        if self in opened_dbs:
            opened_dbs.remove(self)
        self.log.debug("Closing")
        if self.cur:
            self.cur.close()
        if self.conn:
            self.conn.close()
        self.conn = None
        self.cur = None

    # Get a cursor object to the database
    # Return: Cursor class
    def getCursor(self):
        if not self.conn:
            self.connect()
        return DbCursor(self.conn, self)
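
    # The DbCursor class is defined in DbCursor.py and is not shown here; based on how
    # it is used in this file, it is assumed to provide at least:
    #   execute(query, params)      - run a query, including the "INSERT INTO x ?" dict syntax used below
    #   close()                     - close the underlying sqlite3 cursor
    #   logging                     - flag to toggle per-query debug logging
    #   needTable(name, cols, indexes, version) - create/upgrade a table, return True if it changed
    #   getJsonRow(relative_path)   - get (or create) the json table row for a file path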

    # Get the table version
    # Return: Table version, or 0 if the table does not exist yet
    def getTableVersion(self, table_name):
        """if not self.table_names: # Get existing table names
            res = self.cur.execute("SELECT name FROM sqlite_master WHERE type='table'")
            self.table_names = [row["name"] for row in res]
        if table_name not in self.table_names:
            return False
        else:"""
        if not self.db_keyvalues:  # Get db keyvalues
            try:
                res = self.cur.execute("SELECT * FROM keyvalue WHERE json_id=0")  # json_id = 0 is internal keyvalues
            except sqlite3.OperationalError, err:  # Table does not exist yet
                self.log.debug("Query error: %s" % err)
                return False
            for row in res:
                self.db_keyvalues[row["key"]] = row["value"]

        return self.db_keyvalues.get("table.%s.version" % table_name, 0)

    # Check Db tables
    # Return: <list> Changed table names
    def checkTables(self):
        s = time.time()
        changed_tables = []
        cur = self.getCursor()
        cur.execute("BEGIN")

        # Check internal tables
        # Check keyvalue table
        changed = cur.needTable("keyvalue", [
            ["keyvalue_id", "INTEGER PRIMARY KEY AUTOINCREMENT"],
            ["key", "TEXT"],
            ["value", "INTEGER"],
            ["json_id", "INTEGER REFERENCES json (json_id)"],
        ], [
            "CREATE UNIQUE INDEX key_id ON keyvalue(json_id, key)"
        ], version=self.schema["version"])
        if changed:
            changed_tables.append("keyvalue")

        # Check json table
        if self.schema["version"] == 1:
            changed = cur.needTable("json", [
                ["json_id", "INTEGER PRIMARY KEY AUTOINCREMENT"],
                ["path", "VARCHAR(255)"]
            ], [
                "CREATE UNIQUE INDEX path ON json(path)"
            ], version=self.schema["version"])
        else:
            changed = cur.needTable("json", [
                ["json_id", "INTEGER PRIMARY KEY AUTOINCREMENT"],
                ["directory", "VARCHAR(255)"],
                ["file_name", "VARCHAR(255)"]
            ], [
                "CREATE UNIQUE INDEX path ON json(directory, file_name)"
            ], version=self.schema["version"])
        if changed:
            changed_tables.append("json")

        # Check schema tables
        for table_name, table_settings in self.schema["tables"].items():
            changed = cur.needTable(
                table_name, table_settings["cols"],
                table_settings["indexes"], version=table_settings["schema_changed"]
            )
            if changed:
                changed_tables.append(table_name)

        cur.execute("COMMIT")
        self.log.debug("Db check done in %.3fs, changed tables: %s" % (time.time() - s, changed_tables))
        return changed_tables
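
    # For reference, with a "version": 1 schema the internal tables above roughly
    # correspond to the SQL below (assuming DbCursor.needTable builds a plain
    # CREATE TABLE from the column list; the exact statements are produced in DbCursor.py):
    #
    #   CREATE TABLE keyvalue (keyvalue_id INTEGER PRIMARY KEY AUTOINCREMENT, key TEXT,
    #                          value INTEGER, json_id INTEGER REFERENCES json (json_id));
    #   CREATE UNIQUE INDEX key_id ON keyvalue(json_id, key);
    #   CREATE TABLE json (json_id INTEGER PRIMARY KEY AUTOINCREMENT, path VARCHAR(255));
    #   CREATE UNIQUE INDEX path ON json(path);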

    # Load json file to db
    # Return: True if matched
    def loadJson(self, file_path, file=None, cur=None):
        if not file_path.startswith(self.db_dir):
            return False  # Not from the db dir: Skipping
        relative_path = re.sub("^%s" % self.db_dir, "", file_path)  # File path relative to db file

        # Check if the filename matches any of the mappings in the schema
        matched_maps = []
        for match, map_settings in self.schema["maps"].items():
            if re.match(match, relative_path):
                matched_maps.append(map_settings)

        # No match found for the file
        if not matched_maps:
            return False

        # Load the json file
        if not file:
            file = open(file_path)
        data = json.load(file)

        # No cursor specified
        if not cur:
            cur = self.getCursor()
            cur.execute("BEGIN")
            cur.logging = False
            commit_after_done = True
        else:
            commit_after_done = False

        # Row for current json file
        json_row = cur.getJsonRow(relative_path)

        # Check matched mappings in schema
        for map in matched_maps:
            # Insert non-relational key values
            if map.get("to_keyvalue"):
                # Get current values
                res = cur.execute("SELECT * FROM keyvalue WHERE json_id = ?", (json_row["json_id"],))
                current_keyvalue = {}
                current_keyvalue_id = {}
                for row in res:
                    current_keyvalue[row["key"]] = row["value"]
                    current_keyvalue_id[row["key"]] = row["keyvalue_id"]

                for key in map["to_keyvalue"]:
                    if key not in current_keyvalue:  # Keyvalue does not exist yet in the db
                        cur.execute(
                            "INSERT INTO keyvalue ?",
                            {"key": key, "value": data.get(key), "json_id": json_row["json_id"]}
                        )
                    elif data.get(key) != current_keyvalue[key]:  # Keyvalue has a different value
                        cur.execute(
                            "UPDATE keyvalue SET value = ? WHERE keyvalue_id = ?",
                            (data.get(key), current_keyvalue_id[key])
                        )

                """
                for key in map.get("to_keyvalue", []):
                    cur.execute("INSERT OR REPLACE INTO keyvalue ?",
                        {"key": key, "value": data.get(key), "json_id": json_row["json_id"]}
                    )
                """

            # Insert data to tables
            for table_settings in map.get("to_table", []):
                if isinstance(table_settings, dict):  # Custom settings
                    table_name = table_settings["table"]  # Table name to insert data
                    node = table_settings.get("node", table_name)  # Node keyname in data json file
                    key_col = table_settings.get("key_col")  # Map dict key as this col
                    val_col = table_settings.get("val_col")  # Map dict value as this col
                    import_cols = table_settings.get("import_cols")
                    replaces = table_settings.get("replaces")
                else:  # Simple settings
                    table_name = table_settings
                    node = table_settings
                    key_col = None
                    val_col = None
                    import_cols = None
                    replaces = None

                cur.execute("DELETE FROM %s WHERE json_id = ?" % table_name, (json_row["json_id"],))

                if node not in data:
                    continue

                if key_col:  # Map as dict
                    for key, val in data[node].iteritems():
                        if val_col:  # Single value
                            cur.execute(
                                "INSERT OR REPLACE INTO %s ?" % table_name,
                                {key_col: key, val_col: val, "json_id": json_row["json_id"]}
                            )
                        else:  # Multi value
                            if isinstance(val, dict):  # Single row
                                row = val
                                if import_cols:
                                    row = {key: row[key] for key in import_cols}  # Filter row by import_cols
                                row[key_col] = key
                                # Replace in value if necessary
                                if replaces:
                                    for replace_key, replace in replaces.iteritems():
                                        if replace_key in row:
                                            for replace_from, replace_to in replace.iteritems():
                                                row[replace_key] = row[replace_key].replace(replace_from, replace_to)
                                row["json_id"] = json_row["json_id"]
                                cur.execute("INSERT OR REPLACE INTO %s ?" % table_name, row)
                            else:  # Multi row
                                for row in val:
                                    row[key_col] = key
                                    row["json_id"] = json_row["json_id"]
                                    cur.execute("INSERT OR REPLACE INTO %s ?" % table_name, row)
                else:  # Map as list
                    for row in data[node]:
                        row["json_id"] = json_row["json_id"]
                        cur.execute("INSERT OR REPLACE INTO %s ?" % table_name, row)

        if commit_after_done:
            cur.execute("COMMIT")
        return True
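
# A hypothetical example of what loadJson does with a matching data file (the path and
# contents below are illustrative only, following the example schema sketched above):
#
#   data/users/1ExampleUser/data.json containing
#       {"next_topic_id": 2, "topic": [{"topic_id": 1, "title": "Hello"}]}
#   is stored as
#       keyvalue row: key="next_topic_id", value=2, json_id=<json row id of the file>
#       topic row:    topic_id=1, title="Hello", json_id=<same json row id>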

if __name__ == "__main__":
    s = time.time()
    console_log = logging.StreamHandler()
    logging.getLogger('').setLevel(logging.DEBUG)
    logging.getLogger('').addHandler(console_log)
    console_log.setLevel(logging.DEBUG)

    dbjson = Db(json.load(open("zerotalk.schema.json")), "data/users/zerotalk.db")
    dbjson.collect_stats = True
    dbjson.checkTables()
    cur = dbjson.getCursor()
    cur.execute("BEGIN")
    cur.logging = False
    dbjson.loadJson("data/users/content.json", cur=cur)
    for user_dir in os.listdir("data/users"):
        if os.path.isdir("data/users/%s" % user_dir):
            dbjson.loadJson("data/users/%s/data.json" % user_dir, cur=cur)
            # print ".",
    cur.logging = True
    cur.execute("COMMIT")
    print "Done in %.3fs" % (time.time() - s)
    for query, stats in sorted(dbjson.query_stats.items()):
        print "-", query, stats