# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import re
from collections import namedtuple

from six import string_types

from canonicaljson import json

from twisted.internet import defer

from synapse.api.errors import SynapseError
from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
from synapse.storage.data_stores.main.events_worker import EventRedactBehaviour
from synapse.storage.database import Database
from synapse.storage.engines import PostgresEngine, Sqlite3Engine

logger = logging.getLogger(__name__)

SearchEntry = namedtuple(
    "SearchEntry",
    ["key", "value", "event_id", "room_id", "stream_ordering", "origin_server_ts"],
)
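
# A SearchEntry pairs an extracted search key/value with the identifiers
# needed to index it. For example (made-up ids and timestamps, purely for
# illustration):
#
#     SearchEntry(
#         key="content.body",
#         value="hello world",
#         event_id="$someevent:example.com",
#         room_id="!someroom:example.com",
#         stream_ordering=1234,
#         origin_server_ts=1585000000000,
#     )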


class SearchBackgroundUpdateStore(SQLBaseStore):

    EVENT_SEARCH_UPDATE_NAME = "event_search"
    EVENT_SEARCH_ORDER_UPDATE_NAME = "event_search_order"
    EVENT_SEARCH_USE_GIST_POSTGRES_NAME = "event_search_postgres_gist"
    EVENT_SEARCH_USE_GIN_POSTGRES_NAME = "event_search_postgres_gin"

    def __init__(self, database: Database, db_conn, hs):
        super(SearchBackgroundUpdateStore, self).__init__(database, db_conn, hs)

        if not hs.config.enable_search:
            return

        self.db.updates.register_background_update_handler(
            self.EVENT_SEARCH_UPDATE_NAME, self._background_reindex_search
        )
        self.db.updates.register_background_update_handler(
            self.EVENT_SEARCH_ORDER_UPDATE_NAME, self._background_reindex_search_order
        )

        # we used to have a background update to turn the GIN index into a
        # GIST one; we no longer do that (obviously) because we actually want
        # a GIN index. However, it's possible that some people might still have
        # the background update queued, so we register a handler to clear the
        # background update.
        self.db.updates.register_noop_background_update(
            self.EVENT_SEARCH_USE_GIST_POSTGRES_NAME
        )

        self.db.updates.register_background_update_handler(
            self.EVENT_SEARCH_USE_GIN_POSTGRES_NAME, self._background_reindex_gin_search
        )

    @defer.inlineCallbacks
    def _background_reindex_search(self, progress, batch_size):
        # we work through the events table from highest stream id to lowest
        target_min_stream_id = progress["target_min_stream_id_inclusive"]
        max_stream_id = progress["max_stream_id_exclusive"]
        rows_inserted = progress.get("rows_inserted", 0)

        TYPES = ["m.room.name", "m.room.message", "m.room.topic"]

        def reindex_search_txn(txn):
            sql = (
                "SELECT stream_ordering, event_id, room_id, type, json, "
                " origin_server_ts FROM events"
                " JOIN event_json USING (room_id, event_id)"
                " WHERE ? <= stream_ordering AND stream_ordering < ?"
                " AND (%s)"
                " ORDER BY stream_ordering DESC"
                " LIMIT ?"
            ) % (" OR ".join("type = '%s'" % (t,) for t in TYPES),)

            txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))

            # we could stream straight from the results into
            # store_search_entries_txn with a generator function, but that
            # would mean having two cursors open on the database at once.
            # Instead we just build a list of results.
            rows = self.db.cursor_to_dict(txn)
            if not rows:
                return 0

            min_stream_id = rows[-1]["stream_ordering"]

            event_search_rows = []
            for row in rows:
                try:
                    event_id = row["event_id"]
                    room_id = row["room_id"]
                    etype = row["type"]
                    stream_ordering = row["stream_ordering"]
                    origin_server_ts = row["origin_server_ts"]
                    try:
                        event_json = json.loads(row["json"])
                        content = event_json["content"]
                    except Exception:
                        continue

                    if etype == "m.room.message":
                        key = "content.body"
                        value = content["body"]
                    elif etype == "m.room.topic":
                        key = "content.topic"
                        value = content["topic"]
                    elif etype == "m.room.name":
                        key = "content.name"
                        value = content["name"]
                    else:
                        raise Exception("unexpected event type %s" % etype)
                except (KeyError, AttributeError):
                    # If the event is missing a necessary field then
                    # skip over it.
                    continue

                if not isinstance(value, string_types):
                    # If the event body, name or topic isn't a string
                    # then skip over it
                    continue

                event_search_rows.append(
                    SearchEntry(
                        key=key,
                        value=value,
                        event_id=event_id,
                        room_id=room_id,
                        stream_ordering=stream_ordering,
                        origin_server_ts=origin_server_ts,
                    )
                )

            self.store_search_entries_txn(txn, event_search_rows)

            progress = {
                "target_min_stream_id_inclusive": target_min_stream_id,
                "max_stream_id_exclusive": min_stream_id,
                "rows_inserted": rows_inserted + len(event_search_rows),
            }

            self.db.updates._background_update_progress_txn(
                txn, self.EVENT_SEARCH_UPDATE_NAME, progress
            )

            return len(event_search_rows)

        result = yield self.db.runInteraction(
            self.EVENT_SEARCH_UPDATE_NAME, reindex_search_txn
        )

        if not result:
            yield self.db.updates._end_background_update(self.EVENT_SEARCH_UPDATE_NAME)

        return result
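
    # A worked example of the batching above, assuming contiguous stream
    # orderings (the numbers are illustrative): with
    # target_min_stream_id_inclusive=0, max_stream_id_exclusive=1000 and
    # batch_size=100, the first transaction reindexes stream orderings 999
    # down to 900 and persists max_stream_id_exclusive=900 as the new
    # progress; the next run then covers 899 down to 800, and so on, until a
    # batch returns no rows and the background update is ended.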

    @defer.inlineCallbacks
    def _background_reindex_gin_search(self, progress, batch_size):
        """This handles old synapses which used GIST indexes, if any;
        converting them back to be GIN as per the actual schema.
        """

        def create_index(conn):
            conn.rollback()

            # we have to set autocommit, because postgres refuses to
            # CREATE INDEX CONCURRENTLY without it.
            conn.set_session(autocommit=True)

            try:
                c = conn.cursor()

                # if we skipped the conversion to GIST, we may already/still
                # have an event_search_fts_idx; unfortunately postgres 9.4
                # doesn't support CREATE INDEX IF NOT EXISTS so we just catch
                # the exception and ignore it.
                #
                # psycopg2 is imported here rather than at module level so
                # that sqlite-only deployments don't need it installed.
                import psycopg2

                try:
                    c.execute(
                        "CREATE INDEX CONCURRENTLY event_search_fts_idx"
                        " ON event_search USING GIN (vector)"
                    )
                except psycopg2.ProgrammingError as e:
                    logger.warning(
                        "Ignoring error %r when trying to switch from GIST to GIN", e
                    )

                # we should now be able to delete the GIST index.
                c.execute("DROP INDEX IF EXISTS event_search_fts_idx_gist")
            finally:
                conn.set_session(autocommit=False)

        if isinstance(self.database_engine, PostgresEngine):
            yield self.db.runWithConnection(create_index)

        yield self.db.updates._end_background_update(
            self.EVENT_SEARCH_USE_GIN_POSTGRES_NAME
        )
        return 1

    @defer.inlineCallbacks
    def _background_reindex_search_order(self, progress, batch_size):
        target_min_stream_id = progress["target_min_stream_id_inclusive"]
        max_stream_id = progress["max_stream_id_exclusive"]
        rows_inserted = progress.get("rows_inserted", 0)
        have_added_index = progress["have_added_indexes"]

        if not have_added_index:

            def create_index(conn):
                conn.rollback()
                conn.set_session(autocommit=True)
                c = conn.cursor()

                # We create with NULLS FIRST so that when we search *backwards*
                # we get the ones with non null origin_server_ts *first*
                c.execute(
                    "CREATE INDEX CONCURRENTLY event_search_room_order ON event_search("
                    "room_id, origin_server_ts NULLS FIRST, stream_ordering NULLS FIRST)"
                )
                c.execute(
                    "CREATE INDEX CONCURRENTLY event_search_order ON event_search("
                    "origin_server_ts NULLS FIRST, stream_ordering NULLS FIRST)"
                )
                conn.set_session(autocommit=False)

            yield self.db.runWithConnection(create_index)

            pg = dict(progress)
            pg["have_added_indexes"] = True

            yield self.db.runInteraction(
                self.EVENT_SEARCH_ORDER_UPDATE_NAME,
                self.db.updates._background_update_progress_txn,
                self.EVENT_SEARCH_ORDER_UPDATE_NAME,
                pg,
            )

        def reindex_search_txn(txn):
            sql = (
                "UPDATE event_search AS es SET stream_ordering = e.stream_ordering,"
                " origin_server_ts = e.origin_server_ts"
                " FROM events AS e"
                " WHERE e.event_id = es.event_id"
                " AND ? <= e.stream_ordering AND e.stream_ordering < ?"
                " RETURNING es.stream_ordering"
            )

            min_stream_id = max_stream_id - batch_size
            txn.execute(sql, (min_stream_id, max_stream_id))
            rows = txn.fetchall()

            if min_stream_id < target_min_stream_id:
                # We've reached the end.
                return len(rows), False

            progress = {
                "target_min_stream_id_inclusive": target_min_stream_id,
                "max_stream_id_exclusive": min_stream_id,
                "rows_inserted": rows_inserted + len(rows),
                "have_added_indexes": True,
            }

            self.db.updates._background_update_progress_txn(
                txn, self.EVENT_SEARCH_ORDER_UPDATE_NAME, progress
            )

            return len(rows), True

        num_rows, finished = yield self.db.runInteraction(
            self.EVENT_SEARCH_ORDER_UPDATE_NAME, reindex_search_txn
        )

        if not finished:
            yield self.db.updates._end_background_update(
                self.EVENT_SEARCH_ORDER_UPDATE_NAME
            )

        return num_rows

    def store_search_entries_txn(self, txn, entries):
        """Add entries to the search table

        Args:
            txn (cursor):
            entries (iterable[SearchEntry]):
                entries to be added to the table
        """
        if not self.hs.config.enable_search:
            return

        if isinstance(self.database_engine, PostgresEngine):
            sql = (
                "INSERT INTO event_search"
                " (event_id, room_id, key, vector, stream_ordering, origin_server_ts)"
                " VALUES (?,?,?,to_tsvector('english', ?),?,?)"
            )

            args = (
                (
                    entry.event_id,
                    entry.room_id,
                    entry.key,
                    entry.value,
                    entry.stream_ordering,
                    entry.origin_server_ts,
                )
                for entry in entries
            )

            txn.executemany(sql, args)

        elif isinstance(self.database_engine, Sqlite3Engine):
            sql = (
                "INSERT INTO event_search (event_id, room_id, key, value)"
                " VALUES (?,?,?,?)"
            )
            args = (
                (entry.event_id, entry.room_id, entry.key, entry.value)
                for entry in entries
            )
            txn.executemany(sql, args)

        else:
            # This should be unreachable.
            raise Exception("Unrecognized database engine")
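
    # Note that on the Postgres path the stored vector is a parsed tsvector,
    # not the raw text: to_tsvector('english', 'The quick brown fox') produces
    # something like 'brown':3 'fox':4 'quick':2 (stop words such as "the" are
    # dropped), which is what the GIN index and the @@ queries below match
    # against.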


class SearchStore(SearchBackgroundUpdateStore):
    def __init__(self, database: Database, db_conn, hs):
        super(SearchStore, self).__init__(database, db_conn, hs)

    def store_event_search_txn(self, txn, event, key, value):
        """Add event to the search table

        Args:
            txn (cursor):
            event (EventBase):
            key (str):
            value (str):
        """
        self.store_search_entries_txn(
            txn,
            (
                SearchEntry(
                    key=key,
                    value=value,
                    event_id=event.event_id,
                    room_id=event.room_id,
                    stream_ordering=event.internal_metadata.stream_ordering,
                    origin_server_ts=event.origin_server_ts,
                ),
            ),
        )

    @defer.inlineCallbacks
    def search_msgs(self, room_ids, search_term, keys):
        """Performs a full text search over events with given keys.

        Args:
            room_ids (list): List of room ids to search in
            search_term (str): Search term to search for
            keys (list): List of keys to search in, currently supports
                "content.body", "content.name", "content.topic"

        Returns:
            Deferred[dict]: a dict with the keys "results" (a list of dicts),
                "highlights" and "count"
        """
        clauses = []

        search_query = _parse_query(self.database_engine, search_term)

        args = []

        # Make sure we don't explode because the person is in too many rooms.
        # We filter the results below regardless.
        if len(room_ids) < 500:
            clause, args = make_in_list_sql_clause(
                self.database_engine, "room_id", room_ids
            )
            clauses = [clause]

        local_clauses = []
        for key in keys:
            local_clauses.append("key = ?")
            args.append(key)

        clauses.append("(%s)" % (" OR ".join(local_clauses),))

        count_args = args
        count_clauses = clauses

        if isinstance(self.database_engine, PostgresEngine):
            sql = (
                "SELECT ts_rank_cd(vector, to_tsquery('english', ?)) AS rank,"
                " room_id, event_id"
                " FROM event_search"
                " WHERE vector @@ to_tsquery('english', ?)"
            )
            args = [search_query, search_query] + args

            count_sql = (
                "SELECT room_id, count(*) as count FROM event_search"
                " WHERE vector @@ to_tsquery('english', ?)"
            )
            count_args = [search_query] + count_args
        elif isinstance(self.database_engine, Sqlite3Engine):
            sql = (
                "SELECT rank(matchinfo(event_search)) as rank, room_id, event_id"
                " FROM event_search"
                " WHERE value MATCH ?"
            )
            args = [search_query] + args

            count_sql = (
                "SELECT room_id, count(*) as count FROM event_search"
                " WHERE value MATCH ?"
            )
            count_args = [search_term] + count_args
        else:
            # This should be unreachable.
            raise Exception("Unrecognized database engine")

        for clause in clauses:
            sql += " AND " + clause

        for clause in count_clauses:
            count_sql += " AND " + clause

        # We add an arbitrary limit here to ensure we don't try to pull the
        # entire table from the database.
        sql += " ORDER BY rank DESC LIMIT 500"

        results = yield self.db.execute(
            "search_msgs", self.db.cursor_to_dict, sql, *args
        )

        results = list(filter(lambda row: row["room_id"] in room_ids, results))

        # We set redact_behaviour to BLOCK here to prevent redacted events being
        # returned in search results (which is a data leak)
        events = yield self.get_events_as_list(
            [r["event_id"] for r in results],
            redact_behaviour=EventRedactBehaviour.BLOCK,
        )

        event_map = {ev.event_id: ev for ev in events}

        highlights = None
        if isinstance(self.database_engine, PostgresEngine):
            highlights = yield self._find_highlights_in_postgres(search_query, events)

        count_sql += " GROUP BY room_id"

        count_results = yield self.db.execute(
            "search_rooms_count", self.db.cursor_to_dict, count_sql, *count_args
        )

        count = sum(row["count"] for row in count_results if row["room_id"] in room_ids)

        return {
            "results": [
                {"event": event_map[r["event_id"]], "rank": r["rank"]}
                for r in results
                if r["event_id"] in event_map
            ],
            "highlights": highlights,
            "count": count,
        }

    @defer.inlineCallbacks
    def search_rooms(self, room_ids, search_term, keys, limit, pagination_token=None):
        """Performs a full text search over events with given keys.

        Args:
            room_ids (list): The room_ids to search in
            search_term (str): Search term to search for
            keys (list): List of keys to search in, currently supports
                "content.body", "content.name", "content.topic"
            limit (int): The maximum number of results to return
            pagination_token (str): A pagination token previously returned

        Returns:
            Deferred[dict]: a dict with the keys "results" (a list of dicts),
                "highlights" and "count"
        """
        clauses = []

        search_query = _parse_query(self.database_engine, search_term)

        args = []

        # Make sure we don't explode because the person is in too many rooms.
        # We filter the results below regardless.
        if len(room_ids) < 500:
            clause, args = make_in_list_sql_clause(
                self.database_engine, "room_id", room_ids
            )
            clauses = [clause]

        local_clauses = []
        for key in keys:
            local_clauses.append("key = ?")
            args.append(key)

        clauses.append("(%s)" % (" OR ".join(local_clauses),))

        # take copies of the current args and clauses lists, before adding
        # pagination clauses to main query.
        count_args = list(args)
        count_clauses = list(clauses)

        if pagination_token:
            try:
                origin_server_ts, stream = pagination_token.split(",")
                origin_server_ts = int(origin_server_ts)
                stream = int(stream)
            except Exception:
                raise SynapseError(400, "Invalid pagination token")

            clauses.append(
                "(origin_server_ts < ?"
                " OR (origin_server_ts = ? AND stream_ordering < ?))"
            )
            args.extend([origin_server_ts, origin_server_ts, stream])

        if isinstance(self.database_engine, PostgresEngine):
            sql = (
                "SELECT ts_rank_cd(vector, to_tsquery('english', ?)) as rank,"
                " origin_server_ts, stream_ordering, room_id, event_id"
                " FROM event_search"
                " WHERE vector @@ to_tsquery('english', ?) AND "
            )
            args = [search_query, search_query] + args

            count_sql = (
                "SELECT room_id, count(*) as count FROM event_search"
                " WHERE vector @@ to_tsquery('english', ?) AND "
            )
            count_args = [search_query] + count_args
        elif isinstance(self.database_engine, Sqlite3Engine):
            # We use CROSS JOIN here to ensure we use the right indexes.
            # https://sqlite.org/optoverview.html#crossjoin
            #
            # We want to use the full text search index on event_search to
            # extract all possible matches first, then lookup those matches
            # in the events table to get the topological ordering. We need
            # to use the indexes in this order because sqlite refuses to
            # MATCH unless it uses the full text search index
            sql = (
                "SELECT rank(matchinfo) as rank, room_id, event_id,"
                " origin_server_ts, stream_ordering"
                " FROM (SELECT key, event_id, matchinfo(event_search) as matchinfo"
                " FROM event_search"
                " WHERE value MATCH ?"
                " )"
                " CROSS JOIN events USING (event_id)"
                " WHERE "
            )
            args = [search_query] + args

            count_sql = (
                "SELECT room_id, count(*) as count FROM event_search"
                " WHERE value MATCH ? AND "
            )
            count_args = [search_term] + count_args
        else:
            # This should be unreachable.
            raise Exception("Unrecognized database engine")

        sql += " AND ".join(clauses)
        count_sql += " AND ".join(count_clauses)

        # We add an arbitrary limit here to ensure we don't try to pull the
        # entire table from the database.
        if isinstance(self.database_engine, PostgresEngine):
            sql += (
                " ORDER BY origin_server_ts DESC NULLS LAST,"
                " stream_ordering DESC NULLS LAST LIMIT ?"
            )
        elif isinstance(self.database_engine, Sqlite3Engine):
            sql += " ORDER BY origin_server_ts DESC, stream_ordering DESC LIMIT ?"
        else:
            raise Exception("Unrecognized database engine")

        args.append(limit)

        results = yield self.db.execute(
            "search_rooms", self.db.cursor_to_dict, sql, *args
        )

        results = list(filter(lambda row: row["room_id"] in room_ids, results))

        # We set redact_behaviour to BLOCK here to prevent redacted events being
        # returned in search results (which is a data leak)
        events = yield self.get_events_as_list(
            [r["event_id"] for r in results],
            redact_behaviour=EventRedactBehaviour.BLOCK,
        )

        event_map = {ev.event_id: ev for ev in events}

        highlights = None
        if isinstance(self.database_engine, PostgresEngine):
            highlights = yield self._find_highlights_in_postgres(search_query, events)

        count_sql += " GROUP BY room_id"

        count_results = yield self.db.execute(
            "search_rooms_count", self.db.cursor_to_dict, count_sql, *count_args
        )

        count = sum(row["count"] for row in count_results if row["room_id"] in room_ids)

        return {
            "results": [
                {
                    "event": event_map[r["event_id"]],
                    "rank": r["rank"],
                    "pagination_token": "%s,%s"
                    % (r["origin_server_ts"], r["stream_ordering"]),
                }
                for r in results
                if r["event_id"] in event_map
            ],
            "highlights": highlights,
            "count": count,
        }
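
    # Pagination tokens are the "%s,%s" pairs built above, i.e.
    # "<origin_server_ts>,<stream_ordering>" (e.g. "1585000000000,1234", with
    # illustrative values); passing such a token back in via pagination_token
    # restricts the next query to events strictly older than that position.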

    def _find_highlights_in_postgres(self, search_query, events):
        """Given a list of events and a search term, return a list of words
        that match from the content of the event.

        This is used to give a list of words that clients can match against to
        highlight the matching parts.

        Args:
            search_query (str)
            events (list): A list of events

        Returns:
            deferred : A set of strings.
        """

        def f(txn):
            highlight_words = set()
            for event in events:
                # As a hack we simply join values of all possible keys. This is
                # fine since we're only using them to find possible highlights.
                values = []
                for key in ("body", "name", "topic"):
                    v = event.content.get(key, None)
                    if v:
                        values.append(v)

                if not values:
                    continue

                value = " ".join(values)

                # We need to find some values for StartSel and StopSel that
                # aren't in the value so that we can pick results out.
                start_sel = "<"
                stop_sel = ">"

                while start_sel in value:
                    start_sel += "<"
                while stop_sel in value:
                    stop_sel += ">"

                query = "SELECT ts_headline(?, to_tsquery('english', ?), %s)" % (
                    _to_postgres_options(
                        {
                            "StartSel": start_sel,
                            "StopSel": stop_sel,
                            "MaxFragments": "50",
                        }
                    )
                )
                txn.execute(query, (value, search_query))
                (headline,) = txn.fetchall()[0]

                # Now we need to pick the possible highlights out of the
                # headline result.
                matcher_regex = "%s(.*?)%s" % (
                    re.escape(start_sel),
                    re.escape(stop_sel),
                )

                res = re.findall(matcher_regex, headline)
                highlight_words.update([r.lower() for r in res])

            return highlight_words

        return self.db.runInteraction("_find_highlights", f)
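
    # For reference, ts_headline wraps each match in the chosen
    # StartSel/StopSel markers; e.g. with StartSel=< and StopSel=>, running
    # the query hello:* over the value "hello world" would yield something
    # like "<hello> world", from which the regex above extracts "hello".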


def _to_postgres_options(options_dict):
    return "'%s'" % (",".join("%s=%s" % (k, v) for k, v in options_dict.items()),)
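
# For example (assuming an insertion-ordered dict, as in Python 3.7+):
#
#     _to_postgres_options({"StartSel": "<", "StopSel": ">", "MaxFragments": "50"})
#
# returns the single-quoted options string "'StartSel=<,StopSel=>,MaxFragments=50'".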


def _parse_query(database_engine, search_term):
    """Takes a plain unicode string from the user and converts it into a form
    that can be passed to the database.

    We use this so that we can add prefix matching, which isn't something
    that is supported by default.
    """

    # Pull out the individual words, discarding any non-word characters.
    results = re.findall(r"([\w\-]+)", search_term, re.UNICODE)

    if isinstance(database_engine, PostgresEngine):
        return " & ".join(result + ":*" for result in results)
    elif isinstance(database_engine, Sqlite3Engine):
        return " & ".join(result + "*" for result in results)
    else:
        # This should be unreachable.
        raise Exception("Unrecognized database engine")
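
# For example, for a search term of "Hello, World!" (the punctuation is
# discarded by the regex above):
#
#     _parse_query(postgres_engine, "Hello, World!")  # -> "Hello:* & World:*"
#     _parse_query(sqlite3_engine, "Hello, World!")   # -> "Hello* & World*"
#
# where postgres_engine / sqlite3_engine stand in for instances of the
# respective database engines.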