# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import re
from collections import namedtuple

from twisted.internet import defer

from synapse.api.errors import SynapseError
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
from synapse.storage.database import DatabasePool
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.storage.engines import PostgresEngine, Sqlite3Engine

logger = logging.getLogger(__name__)

SearchEntry = namedtuple(
    "SearchEntry",
    ["key", "value", "event_id", "room_id", "stream_ordering", "origin_server_ts"],
)
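# An illustrative SearchEntry (example values, not real data): the body of an
# m.room.message event, keyed by the JSON path the text was extracted from:
#
#   SearchEntry(
#       key="content.body",
#       value="hello world",
#       event_id="$event_id:example.com",
#       room_id="!room_id:example.com",
#       stream_ordering=42,
#       origin_server_ts=1600000000000,
#   )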
class SearchWorkerStore(SQLBaseStore):
    def store_search_entries_txn(self, txn, entries):
        """Add entries to the search table

        Args:
            txn (cursor):
            entries (iterable[SearchEntry]):
                entries to be added to the table
        """
        if not self.hs.config.enable_search:
            return
        if isinstance(self.database_engine, PostgresEngine):
            sql = (
                "INSERT INTO event_search"
                " (event_id, room_id, key, vector, stream_ordering, origin_server_ts)"
                " VALUES (?,?,?,to_tsvector('english', ?),?,?)"
            )

            args = (
                (
                    entry.event_id,
                    entry.room_id,
                    entry.key,
                    entry.value,
                    entry.stream_ordering,
                    entry.origin_server_ts,
                )
                for entry in entries
            )

            txn.executemany(sql, args)
        elif isinstance(self.database_engine, Sqlite3Engine):
            sql = (
                "INSERT INTO event_search (event_id, room_id, key, value)"
                " VALUES (?,?,?,?)"
            )
            args = (
                (entry.event_id, entry.room_id, entry.key, entry.value)
                for entry in entries
            )

            txn.executemany(sql, args)
        else:
            # This should be unreachable.
            raise Exception("Unrecognized database engine")
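# A note on the two INSERT branches above: on postgres the value is indexed as
# a tsvector, so the queries below can match it with
# "vector @@ to_tsquery('english', ...)"; on sqlite, event_search is a
# full-text-search virtual table and the raw value is matched with
# "value MATCH ?".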
class SearchBackgroundUpdateStore(SearchWorkerStore):

    EVENT_SEARCH_UPDATE_NAME = "event_search"
    EVENT_SEARCH_ORDER_UPDATE_NAME = "event_search_order"
    EVENT_SEARCH_USE_GIST_POSTGRES_NAME = "event_search_postgres_gist"
    EVENT_SEARCH_USE_GIN_POSTGRES_NAME = "event_search_postgres_gin"

    def __init__(self, database: DatabasePool, db_conn, hs):
        super(SearchBackgroundUpdateStore, self).__init__(database, db_conn, hs)

        if not hs.config.enable_search:
            return

        self.db_pool.updates.register_background_update_handler(
            self.EVENT_SEARCH_UPDATE_NAME, self._background_reindex_search
        )
        self.db_pool.updates.register_background_update_handler(
            self.EVENT_SEARCH_ORDER_UPDATE_NAME, self._background_reindex_search_order
        )

        # we used to have a background update to turn the GIN index into a
        # GIST one; we no longer do that (obviously) because we actually want
        # a GIN index. However, it's possible that some people might still have
        # the background update queued, so we register a handler to clear the
        # background update.
        self.db_pool.updates.register_noop_background_update(
            self.EVENT_SEARCH_USE_GIST_POSTGRES_NAME
        )

        self.db_pool.updates.register_background_update_handler(
            self.EVENT_SEARCH_USE_GIN_POSTGRES_NAME, self._background_reindex_gin_search
        )
    @defer.inlineCallbacks
    def _background_reindex_search(self, progress, batch_size):
        # we work through the events table from highest stream id to lowest
        target_min_stream_id = progress["target_min_stream_id_inclusive"]
        max_stream_id = progress["max_stream_id_exclusive"]
        rows_inserted = progress.get("rows_inserted", 0)

        TYPES = ["m.room.name", "m.room.message", "m.room.topic"]

        def reindex_search_txn(txn):
            sql = (
                "SELECT stream_ordering, event_id, room_id, type, json, "
                " origin_server_ts FROM events"
                " JOIN event_json USING (room_id, event_id)"
                " WHERE ? <= stream_ordering AND stream_ordering < ?"
                " AND (%s)"
                " ORDER BY stream_ordering DESC"
                " LIMIT ?"
            ) % (" OR ".join("type = '%s'" % (t,) for t in TYPES),)

            txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))

            # we could stream straight from the results into
            # store_search_entries_txn with a generator function, but that
            # would mean having two cursors open on the database at once.
            # Instead we just build a list of results.
            rows = self.db_pool.cursor_to_dict(txn)
            if not rows:
                return 0

            min_stream_id = rows[-1]["stream_ordering"]

            event_search_rows = []
            for row in rows:
                try:
                    event_id = row["event_id"]
                    room_id = row["room_id"]
                    etype = row["type"]
                    stream_ordering = row["stream_ordering"]
                    origin_server_ts = row["origin_server_ts"]
                    try:
                        event_json = db_to_json(row["json"])
                        content = event_json["content"]
                    except Exception:
                        continue

                    if etype == "m.room.message":
                        key = "content.body"
                        value = content["body"]
                    elif etype == "m.room.topic":
                        key = "content.topic"
                        value = content["topic"]
                    elif etype == "m.room.name":
                        key = "content.name"
                        value = content["name"]
                    else:
                        raise Exception("unexpected event type %s" % etype)
                except (KeyError, AttributeError):
                    # If the event is missing a necessary field then
                    # skip over it.
                    continue

                if not isinstance(value, str):
                    # If the event body, name or topic isn't a string
                    # then skip over it
                    continue

                event_search_rows.append(
                    SearchEntry(
                        key=key,
                        value=value,
                        event_id=event_id,
                        room_id=room_id,
                        stream_ordering=stream_ordering,
                        origin_server_ts=origin_server_ts,
                    )
                )

            self.store_search_entries_txn(txn, event_search_rows)

            progress = {
                "target_min_stream_id_inclusive": target_min_stream_id,
                "max_stream_id_exclusive": min_stream_id,
                "rows_inserted": rows_inserted + len(event_search_rows),
            }

            self.db_pool.updates._background_update_progress_txn(
                txn, self.EVENT_SEARCH_UPDATE_NAME, progress
            )

            return len(event_search_rows)

        result = yield self.db_pool.runInteraction(
            self.EVENT_SEARCH_UPDATE_NAME, reindex_search_txn
        )

        if not result:
            yield self.db_pool.updates._end_background_update(
                self.EVENT_SEARCH_UPDATE_NAME
            )

        return result
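    # The update above walks the events table downwards in batches, persisting
    # its position between runs; the stored progress dict looks like
    # (illustrative values):
    #
    #   {
    #       "target_min_stream_id_inclusive": 0,
    #       "max_stream_id_exclusive": 123456,  # shrinks towards the target
    #       "rows_inserted": 5000,
    #   }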
    @defer.inlineCallbacks
    def _background_reindex_gin_search(self, progress, batch_size):
        """This handles old synapses which used GIST indexes, if any;
        converting them back to be GIN as per the actual schema.
        """

        def create_index(conn):
            conn.rollback()

            # we have to set autocommit, because postgres refuses to
            # CREATE INDEX CONCURRENTLY without it.
            conn.set_session(autocommit=True)

            try:
                c = conn.cursor()
                # if we skipped the conversion to GIST, we may already/still
                # have an event_search_fts_idx; unfortunately postgres 9.4
                # doesn't support CREATE INDEX IF NOT EXISTS so we just catch
                # the exception and ignore it.
                import psycopg2
                try:
                    c.execute(
                        "CREATE INDEX CONCURRENTLY event_search_fts_idx"
                        " ON event_search USING GIN (vector)"
                    )
                except psycopg2.ProgrammingError as e:
                    logger.warning(
                        "Ignoring error %r when trying to switch from GIST to GIN", e
                    )

                # we should now be able to delete the GIST index.
                c.execute("DROP INDEX IF EXISTS event_search_fts_idx_gist")
            finally:
                conn.set_session(autocommit=False)

        if isinstance(self.database_engine, PostgresEngine):
            yield self.db_pool.runWithConnection(create_index)

        yield self.db_pool.updates._end_background_update(
            self.EVENT_SEARCH_USE_GIN_POSTGRES_NAME
        )
        return 1
    @defer.inlineCallbacks
    def _background_reindex_search_order(self, progress, batch_size):
        target_min_stream_id = progress["target_min_stream_id_inclusive"]
        max_stream_id = progress["max_stream_id_exclusive"]
        rows_inserted = progress.get("rows_inserted", 0)
        have_added_index = progress["have_added_indexes"]

        if not have_added_index:

            def create_index(conn):
                conn.rollback()
                conn.set_session(autocommit=True)
                c = conn.cursor()

                # We create with NULLS FIRST so that when we search *backwards*
                # we get the ones with non null origin_server_ts *first*
                c.execute(
                    "CREATE INDEX CONCURRENTLY event_search_room_order ON event_search("
                    "room_id, origin_server_ts NULLS FIRST, stream_ordering NULLS FIRST)"
                )
                c.execute(
                    "CREATE INDEX CONCURRENTLY event_search_order ON event_search("
                    "origin_server_ts NULLS FIRST, stream_ordering NULLS FIRST)"
                )
                conn.set_session(autocommit=False)

            yield self.db_pool.runWithConnection(create_index)

            pg = dict(progress)
            pg["have_added_indexes"] = True

            yield self.db_pool.runInteraction(
                self.EVENT_SEARCH_ORDER_UPDATE_NAME,
                self.db_pool.updates._background_update_progress_txn,
                self.EVENT_SEARCH_ORDER_UPDATE_NAME,
                pg,
            )

        def reindex_search_txn(txn):
            sql = (
                "UPDATE event_search AS es SET stream_ordering = e.stream_ordering,"
                " origin_server_ts = e.origin_server_ts"
                " FROM events AS e"
                " WHERE e.event_id = es.event_id"
                " AND ? <= e.stream_ordering AND e.stream_ordering < ?"
                " RETURNING es.stream_ordering"
            )

            min_stream_id = max_stream_id - batch_size
            txn.execute(sql, (min_stream_id, max_stream_id))
            rows = txn.fetchall()
            if min_stream_id < target_min_stream_id:
                # We've reached the end.
                return len(rows), False
            progress = {
                "target_min_stream_id_inclusive": target_min_stream_id,
                "max_stream_id_exclusive": min_stream_id,
                "rows_inserted": rows_inserted + len(rows),
                "have_added_indexes": True,
            }

            self.db_pool.updates._background_update_progress_txn(
                txn, self.EVENT_SEARCH_ORDER_UPDATE_NAME, progress
            )

            return len(rows), True

        num_rows, finished = yield self.db_pool.runInteraction(
            self.EVENT_SEARCH_ORDER_UPDATE_NAME, reindex_search_txn
        )

        if not finished:
            yield self.db_pool.updates._end_background_update(
                self.EVENT_SEARCH_ORDER_UPDATE_NAME
            )

        return num_rows
class SearchStore(SearchBackgroundUpdateStore):
    def __init__(self, database: DatabasePool, db_conn, hs):
        super(SearchStore, self).__init__(database, db_conn, hs)

    @defer.inlineCallbacks
    def search_msgs(self, room_ids, search_term, keys):
        """Performs a full text search over events with given keys.

        Args:
            room_ids (list): List of room ids to search in
            search_term (str): Search term to search for
            keys (list): List of keys to search in, currently supports
                "content.body", "content.name", "content.topic"

        Returns:
            Deferred[dict]: A dict with "results" (a list of result dicts),
                "highlights" and "count" keys.
        """
        clauses = []

        search_query = _parse_query(self.database_engine, search_term)

        args = []

        # Make sure we don't explode because the person is in too many rooms.
        # We filter the results below regardless.
        if len(room_ids) < 500:
            clause, args = make_in_list_sql_clause(
                self.database_engine, "room_id", room_ids
            )
            clauses = [clause]
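            # For reference, make_in_list_sql_clause returns something like
            # ("room_id = ANY(?)", [["!a:x", "!b:y"]]) on postgres or
            # ("room_id IN (?,?)", ["!a:x", "!b:y"]) on sqlite
            # (illustrative values).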
        local_clauses = []
        for key in keys:
            local_clauses.append("key = ?")
            args.append(key)

        clauses.append("(%s)" % (" OR ".join(local_clauses),))

        count_args = args
        count_clauses = clauses

        if isinstance(self.database_engine, PostgresEngine):
            sql = (
                "SELECT ts_rank_cd(vector, to_tsquery('english', ?)) AS rank,"
                " room_id, event_id"
                " FROM event_search"
                " WHERE vector @@ to_tsquery('english', ?)"
            )
            args = [search_query, search_query] + args

            count_sql = (
                "SELECT room_id, count(*) as count FROM event_search"
                " WHERE vector @@ to_tsquery('english', ?)"
            )
            count_args = [search_query] + count_args
        elif isinstance(self.database_engine, Sqlite3Engine):
            sql = (
                "SELECT rank(matchinfo(event_search)) as rank, room_id, event_id"
                " FROM event_search"
                " WHERE value MATCH ?"
            )
            args = [search_query] + args

            count_sql = (
                "SELECT room_id, count(*) as count FROM event_search"
                " WHERE value MATCH ?"
            )
            # use the parsed query here too, so the count matches the main
            # query above
            count_args = [search_query] + count_args
        else:
            # This should be unreachable.
            raise Exception("Unrecognized database engine")

        for clause in clauses:
            sql += " AND " + clause

        for clause in count_clauses:
            count_sql += " AND " + clause

        # We add an arbitrary limit here to ensure we don't try to pull the
        # entire table from the database.
        sql += " ORDER BY rank DESC LIMIT 500"

        results = yield self.db_pool.execute(
            "search_msgs", self.db_pool.cursor_to_dict, sql, *args
        )

        results = list(filter(lambda row: row["room_id"] in room_ids, results))

        # We set redact_behaviour to BLOCK here to prevent redacted events being returned in
        # search results (which is a data leak)
        events = yield self.get_events_as_list(
            [r["event_id"] for r in results],
            redact_behaviour=EventRedactBehaviour.BLOCK,
        )

        event_map = {ev.event_id: ev for ev in events}

        highlights = None
        if isinstance(self.database_engine, PostgresEngine):
            highlights = yield self._find_highlights_in_postgres(search_query, events)

        count_sql += " GROUP BY room_id"

        count_results = yield self.db_pool.execute(
            "search_rooms_count", self.db_pool.cursor_to_dict, count_sql, *count_args
        )

        count = sum(row["count"] for row in count_results if row["room_id"] in room_ids)

        return {
            "results": [
                {"event": event_map[r["event_id"]], "rank": r["rank"]}
                for r in results
                if r["event_id"] in event_map
            ],
            "highlights": highlights,
            "count": count,
        }
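    # The dict returned by search_msgs looks like (illustrative values):
    #
    #   {
    #       "results": [{"event": <EventBase>, "rank": 0.75}, ...],
    #       "highlights": {"hello", "world"},  # None on sqlite
    #       "count": 7,
    #   }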
    @defer.inlineCallbacks
    def search_rooms(self, room_ids, search_term, keys, limit, pagination_token=None):
        """Performs a full text search over events with given keys.

        Args:
            room_ids (list): The room_ids to search in
            search_term (str): Search term to search for
            keys (list): List of keys to search in, currently supports
                "content.body", "content.name", "content.topic"
            limit (int): Maximum number of results to return
            pagination_token (str): A pagination token previously returned

        Returns:
            Deferred[dict]: A dict with "results" (a list of result dicts,
                each including a "pagination_token"), "highlights" and
                "count" keys.
        """
        clauses = []

        search_query = _parse_query(self.database_engine, search_term)

        args = []

        # Make sure we don't explode because the person is in too many rooms.
        # We filter the results below regardless.
        if len(room_ids) < 500:
            clause, args = make_in_list_sql_clause(
                self.database_engine, "room_id", room_ids
            )
            clauses = [clause]

        local_clauses = []
        for key in keys:
            local_clauses.append("key = ?")
            args.append(key)

        clauses.append("(%s)" % (" OR ".join(local_clauses),))

        # take copies of the current args and clauses lists, before adding
        # pagination clauses to main query.
        count_args = list(args)
        count_clauses = list(clauses)

        if pagination_token:
            try:
                origin_server_ts, stream = pagination_token.split(",")
                origin_server_ts = int(origin_server_ts)
                stream = int(stream)
            except Exception:
                raise SynapseError(400, "Invalid pagination token")

            clauses.append(
                "(origin_server_ts < ?"
                " OR (origin_server_ts = ? AND stream_ordering < ?))"
            )
            args.extend([origin_server_ts, origin_server_ts, stream])
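            # A token has the form "<origin_server_ts>,<stream_ordering>":
            # e.g. (illustrative) "1600000000000,42" resumes the search
            # strictly before that point in the ordering, matching the
            # pagination_token format produced in the results below.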
        if isinstance(self.database_engine, PostgresEngine):
            sql = (
                "SELECT ts_rank_cd(vector, to_tsquery('english', ?)) as rank,"
                " origin_server_ts, stream_ordering, room_id, event_id"
                " FROM event_search"
                " WHERE vector @@ to_tsquery('english', ?) AND "
            )
            args = [search_query, search_query] + args

            count_sql = (
                "SELECT room_id, count(*) as count FROM event_search"
                " WHERE vector @@ to_tsquery('english', ?) AND "
            )
            count_args = [search_query] + count_args
        elif isinstance(self.database_engine, Sqlite3Engine):
            # We use CROSS JOIN here to ensure we use the right indexes.
            # https://sqlite.org/optoverview.html#crossjoin
            #
            # We want to use the full text search index on event_search to
            # extract all possible matches first, then lookup those matches
            # in the events table to get the topological ordering. We need
            # to use the indexes in this order because sqlite refuses to
            # MATCH unless it uses the full text search index
            sql = (
                "SELECT rank(matchinfo) as rank, room_id, event_id,"
                " origin_server_ts, stream_ordering"
                " FROM (SELECT key, event_id, matchinfo(event_search) as matchinfo"
                " FROM event_search"
                " WHERE value MATCH ?"
                " )"
                " CROSS JOIN events USING (event_id)"
                " WHERE "
            )
            args = [search_query] + args

            count_sql = (
                "SELECT room_id, count(*) as count FROM event_search"
                " WHERE value MATCH ? AND "
            )
            # use the parsed query here too, so the count matches the main
            # query above
            count_args = [search_query] + count_args
        else:
            # This should be unreachable.
            raise Exception("Unrecognized database engine")

        sql += " AND ".join(clauses)
        count_sql += " AND ".join(count_clauses)

        # We add an arbitrary limit here to ensure we don't try to pull the
        # entire table from the database.
        if isinstance(self.database_engine, PostgresEngine):
            sql += (
                " ORDER BY origin_server_ts DESC NULLS LAST,"
                " stream_ordering DESC NULLS LAST LIMIT ?"
            )
        elif isinstance(self.database_engine, Sqlite3Engine):
            sql += " ORDER BY origin_server_ts DESC, stream_ordering DESC LIMIT ?"
        else:
            raise Exception("Unrecognized database engine")

        args.append(limit)

        results = yield self.db_pool.execute(
            "search_rooms", self.db_pool.cursor_to_dict, sql, *args
        )

        results = list(filter(lambda row: row["room_id"] in room_ids, results))

        # We set redact_behaviour to BLOCK here to prevent redacted events being returned in
        # search results (which is a data leak)
        events = yield self.get_events_as_list(
            [r["event_id"] for r in results],
            redact_behaviour=EventRedactBehaviour.BLOCK,
        )

        event_map = {ev.event_id: ev for ev in events}

        highlights = None
        if isinstance(self.database_engine, PostgresEngine):
            highlights = yield self._find_highlights_in_postgres(search_query, events)

        count_sql += " GROUP BY room_id"

        count_results = yield self.db_pool.execute(
            "search_rooms_count", self.db_pool.cursor_to_dict, count_sql, *count_args
        )

        count = sum(row["count"] for row in count_results if row["room_id"] in room_ids)

        return {
            "results": [
                {
                    "event": event_map[r["event_id"]],
                    "rank": r["rank"],
                    "pagination_token": "%s,%s"
                    % (r["origin_server_ts"], r["stream_ordering"]),
                }
                for r in results
                if r["event_id"] in event_map
            ],
            "highlights": highlights,
            "count": count,
        }
    def _find_highlights_in_postgres(self, search_query, events):
        """Given a list of events and a search term, return a list of words
        that match from the content of the event.

        This is used to give a list of words that clients can match against to
        highlight the matching parts.

        Args:
            search_query (str)
            events (list): A list of events

        Returns:
            deferred : A set of strings.
        """

        def f(txn):
            highlight_words = set()
            for event in events:
                # As a hack we simply join values of all possible keys. This is
                # fine since we're only using them to find possible highlights.
                values = []
                for key in ("body", "name", "topic"):
                    v = event.content.get(key, None)
                    if v:
                        values.append(v)

                if not values:
                    continue

                value = " ".join(values)

                # We need to find some values for StartSel and StopSel that
                # aren't in the value so that we can pick results out.
                start_sel = "<"
                stop_sel = ">"

                while start_sel in value:
                    start_sel += "<"
                while stop_sel in value:
                    stop_sel += ">"

                query = "SELECT ts_headline(?, to_tsquery('english', ?), %s)" % (
                    _to_postgres_options(
                        {
                            "StartSel": start_sel,
                            "StopSel": stop_sel,
                            "MaxFragments": "50",
                        }
                    )
                )
                txn.execute(query, (value, search_query))
                (headline,) = txn.fetchall()[0]
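                # ts_headline returns the value with each match wrapped in
                # the selectors, e.g. (illustrative):
                #   "say <hello> to the <world>"
                # which the regex below picks apart.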
                # Now we need to pick the possible highlights out of the
                # headline result.
                matcher_regex = "%s(.*?)%s" % (
                    re.escape(start_sel),
                    re.escape(stop_sel),
                )

                res = re.findall(matcher_regex, headline)
                highlight_words.update([r.lower() for r in res])

            return highlight_words

        return self.db_pool.runInteraction("_find_highlights", f)
def _to_postgres_options(options_dict):
    return "'%s'" % (",".join("%s=%s" % (k, v) for k, v in options_dict.items()),)
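# e.g. (illustrative): _to_postgres_options({"StartSel": "<", "StopSel": ">"})
# returns "'StartSel=<,StopSel=>'", ready to interpolate into the ts_headline
# query above.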
def _parse_query(database_engine, search_term):
    """Takes a plain unicode string from the user and converts it into a form
    that can be passed to the database.

    We use this so that we can add prefix matching, which isn't something
    that is supported by default.
    """

    # Pull out the individual words, discarding any non-word characters.
    results = re.findall(r"([\w\-]+)", search_term, re.UNICODE)

    if isinstance(database_engine, PostgresEngine):
        return " & ".join(result + ":*" for result in results)
    elif isinstance(database_engine, Sqlite3Engine):
        return " & ".join(result + "*" for result in results)
    else:
        # This should be unreachable.
        raise Exception("Unrecognized database engine")
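# For example (illustrative), a search for "foo bar" becomes:
#   "foo:* & bar:*"  on postgres (to_tsquery prefix matching)
#   "foo* & bar*"    on sqlite (FTS prefix matching)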