fts.py 2.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. # Copyright 2015, 2016 OpenMarket Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import logging
  15. from synapse.storage.prepare_database import get_statements
  16. from synapse.storage.engines import PostgresEngine, Sqlite3Engine
  17. import simplejson
  18. logger = logging.getLogger(__name__)
  19. POSTGRES_TABLE = """
  20. CREATE TABLE IF NOT EXISTS event_search (
  21. event_id TEXT,
  22. room_id TEXT,
  23. sender TEXT,
  24. key TEXT,
  25. vector tsvector
  26. );
  27. CREATE INDEX event_search_fts_idx ON event_search USING gin(vector);
  28. CREATE INDEX event_search_ev_idx ON event_search(event_id);
  29. CREATE INDEX event_search_ev_ridx ON event_search(room_id);
  30. """
  31. SQLITE_TABLE = (
  32. "CREATE VIRTUAL TABLE event_search"
  33. " USING fts4 ( event_id, room_id, sender, key, value )"
  34. )
  35. def run_create(cur, database_engine, *args, **kwargs):
  36. if isinstance(database_engine, PostgresEngine):
  37. for statement in get_statements(POSTGRES_TABLE.splitlines()):
  38. cur.execute(statement)
  39. elif isinstance(database_engine, Sqlite3Engine):
  40. cur.execute(SQLITE_TABLE)
  41. else:
  42. raise Exception("Unrecognized database engine")
  43. cur.execute("SELECT MIN(stream_ordering) FROM events")
  44. rows = cur.fetchall()
  45. min_stream_id = rows[0][0]
  46. cur.execute("SELECT MAX(stream_ordering) FROM events")
  47. rows = cur.fetchall()
  48. max_stream_id = rows[0][0]
  49. if min_stream_id is not None and max_stream_id is not None:
  50. progress = {
  51. "target_min_stream_id_inclusive": min_stream_id,
  52. "max_stream_id_exclusive": max_stream_id + 1,
  53. "rows_inserted": 0,
  54. }
  55. progress_json = simplejson.dumps(progress)
  56. sql = (
  57. "INSERT into background_updates (update_name, progress_json)"
  58. " VALUES (?, ?)"
  59. )
  60. sql = database_engine.convert_param_style(sql)
  61. cur.execute(sql, ("event_search", progress_json))
  62. def run_upgrade(*args, **kwargs):
  63. pass