user_directory.py 30 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2017 Vector Creations Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. import logging
  16. import re
  17. from twisted.internet import defer
  18. from synapse.api.constants import EventTypes, JoinRules
  19. from synapse.storage.background_updates import BackgroundUpdateStore
  20. from synapse.storage.engines import PostgresEngine, Sqlite3Engine
  21. from synapse.storage.state import StateFilter
  22. from synapse.storage.state_deltas import StateDeltasStore
  23. from synapse.types import get_domain_from_id, get_localpart_from_id
  24. from synapse.util.caches.descriptors import cached
logger = logging.getLogger(__name__)

# Name prefix shared by the temporary staging tables ("_rooms", "_users" and
# "_position") that the populate_user_directory_* background updates in this
# file create, read from and finally drop.
TEMP_TABLE = "_temp_populate_user_directory"
  27. class UserDirectoryStore(StateDeltasStore, BackgroundUpdateStore):
  28. # How many records do we calculate before sending it to
  29. # add_users_who_share_private_rooms?
  30. SHARE_PRIVATE_WORKING_SET = 500
  31. def __init__(self, db_conn, hs):
  32. super(UserDirectoryStore, self).__init__(db_conn, hs)
  33. self.server_name = hs.hostname
  34. self.register_background_update_handler(
  35. "populate_user_directory_createtables",
  36. self._populate_user_directory_createtables,
  37. )
  38. self.register_background_update_handler(
  39. "populate_user_directory_process_rooms",
  40. self._populate_user_directory_process_rooms,
  41. )
  42. self.register_background_update_handler(
  43. "populate_user_directory_process_users",
  44. self._populate_user_directory_process_users,
  45. )
  46. self.register_background_update_handler(
  47. "populate_user_directory_cleanup", self._populate_user_directory_cleanup
  48. )
  49. @defer.inlineCallbacks
  50. def _populate_user_directory_createtables(self, progress, batch_size):
  51. # Get all the rooms that we want to process.
  52. def _make_staging_area(txn):
  53. sql = (
  54. "CREATE TABLE IF NOT EXISTS "
  55. + TEMP_TABLE
  56. + "_rooms(room_id TEXT NOT NULL, events BIGINT NOT NULL)"
  57. )
  58. txn.execute(sql)
  59. sql = (
  60. "CREATE TABLE IF NOT EXISTS "
  61. + TEMP_TABLE
  62. + "_position(position TEXT NOT NULL)"
  63. )
  64. txn.execute(sql)
  65. # Get rooms we want to process from the database
  66. sql = """
  67. SELECT room_id, count(*) FROM current_state_events
  68. GROUP BY room_id
  69. """
  70. txn.execute(sql)
  71. rooms = [{"room_id": x[0], "events": x[1]} for x in txn.fetchall()]
  72. self._simple_insert_many_txn(txn, TEMP_TABLE + "_rooms", rooms)
  73. del rooms
  74. # If search all users is on, get all the users we want to add.
  75. if self.hs.config.user_directory_search_all_users:
  76. sql = (
  77. "CREATE TABLE IF NOT EXISTS "
  78. + TEMP_TABLE
  79. + "_users(user_id TEXT NOT NULL)"
  80. )
  81. txn.execute(sql)
  82. txn.execute("SELECT name FROM users")
  83. users = [{"user_id": x[0]} for x in txn.fetchall()]
  84. self._simple_insert_many_txn(txn, TEMP_TABLE + "_users", users)
  85. new_pos = yield self.get_max_stream_id_in_current_state_deltas()
  86. yield self.runInteraction(
  87. "populate_user_directory_temp_build", _make_staging_area
  88. )
  89. yield self._simple_insert(TEMP_TABLE + "_position", {"position": new_pos})
  90. yield self._end_background_update("populate_user_directory_createtables")
  91. return 1
  92. @defer.inlineCallbacks
  93. def _populate_user_directory_cleanup(self, progress, batch_size):
  94. """
  95. Update the user directory stream position, then clean up the old tables.
  96. """
  97. position = yield self._simple_select_one_onecol(
  98. TEMP_TABLE + "_position", None, "position"
  99. )
  100. yield self.update_user_directory_stream_pos(position)
  101. def _delete_staging_area(txn):
  102. txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_rooms")
  103. txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_users")
  104. txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_position")
  105. yield self.runInteraction(
  106. "populate_user_directory_cleanup", _delete_staging_area
  107. )
  108. yield self._end_background_update("populate_user_directory_cleanup")
  109. return 1
  110. @defer.inlineCallbacks
  111. def _populate_user_directory_process_rooms(self, progress, batch_size):
  112. """
  113. Args:
  114. progress (dict)
  115. batch_size (int): Maximum number of state events to process
  116. per cycle.
  117. """
  118. state = self.hs.get_state_handler()
  119. # If we don't have progress filed, delete everything.
  120. if not progress:
  121. yield self.delete_all_from_user_dir()
  122. def _get_next_batch(txn):
  123. # Only fetch 250 rooms, so we don't fetch too many at once, even
  124. # if those 250 rooms have less than batch_size state events.
  125. sql = """
  126. SELECT room_id, events FROM %s
  127. ORDER BY events DESC
  128. LIMIT 250
  129. """ % (
  130. TEMP_TABLE + "_rooms",
  131. )
  132. txn.execute(sql)
  133. rooms_to_work_on = txn.fetchall()
  134. if not rooms_to_work_on:
  135. return None
  136. # Get how many are left to process, so we can give status on how
  137. # far we are in processing
  138. txn.execute("SELECT COUNT(*) FROM " + TEMP_TABLE + "_rooms")
  139. progress["remaining"] = txn.fetchone()[0]
  140. return rooms_to_work_on
  141. rooms_to_work_on = yield self.runInteraction(
  142. "populate_user_directory_temp_read", _get_next_batch
  143. )
  144. # No more rooms -- complete the transaction.
  145. if not rooms_to_work_on:
  146. yield self._end_background_update("populate_user_directory_process_rooms")
  147. return 1
  148. logger.info(
  149. "Processing the next %d rooms of %d remaining"
  150. % (len(rooms_to_work_on), progress["remaining"])
  151. )
  152. processed_event_count = 0
  153. for room_id, event_count in rooms_to_work_on:
  154. is_in_room = yield self.is_host_joined(room_id, self.server_name)
  155. if is_in_room:
  156. is_public = yield self.is_room_world_readable_or_publicly_joinable(
  157. room_id
  158. )
  159. users_with_profile = yield state.get_current_users_in_room(room_id)
  160. user_ids = set(users_with_profile)
  161. # Update each user in the user directory.
  162. for user_id, profile in users_with_profile.items():
  163. yield self.update_profile_in_user_dir(
  164. user_id, profile.display_name, profile.avatar_url
  165. )
  166. to_insert = set()
  167. if is_public:
  168. for user_id in user_ids:
  169. if self.get_if_app_services_interested_in_user(user_id):
  170. continue
  171. to_insert.add(user_id)
  172. if to_insert:
  173. yield self.add_users_in_public_rooms(room_id, to_insert)
  174. to_insert.clear()
  175. else:
  176. for user_id in user_ids:
  177. if not self.hs.is_mine_id(user_id):
  178. continue
  179. if self.get_if_app_services_interested_in_user(user_id):
  180. continue
  181. for other_user_id in user_ids:
  182. if user_id == other_user_id:
  183. continue
  184. user_set = (user_id, other_user_id)
  185. to_insert.add(user_set)
  186. # If it gets too big, stop and write to the database
  187. # to prevent storing too much in RAM.
  188. if len(to_insert) >= self.SHARE_PRIVATE_WORKING_SET:
  189. yield self.add_users_who_share_private_room(
  190. room_id, to_insert
  191. )
  192. to_insert.clear()
  193. if to_insert:
  194. yield self.add_users_who_share_private_room(room_id, to_insert)
  195. to_insert.clear()
  196. # We've finished a room. Delete it from the table.
  197. yield self._simple_delete_one(TEMP_TABLE + "_rooms", {"room_id": room_id})
  198. # Update the remaining counter.
  199. progress["remaining"] -= 1
  200. yield self.runInteraction(
  201. "populate_user_directory",
  202. self._background_update_progress_txn,
  203. "populate_user_directory_process_rooms",
  204. progress,
  205. )
  206. processed_event_count += event_count
  207. if processed_event_count > batch_size:
  208. # Don't process any more rooms, we've hit our batch size.
  209. return processed_event_count
  210. return processed_event_count
  211. @defer.inlineCallbacks
  212. def _populate_user_directory_process_users(self, progress, batch_size):
  213. """
  214. If search_all_users is enabled, add all of the users to the user directory.
  215. """
  216. if not self.hs.config.user_directory_search_all_users:
  217. yield self._end_background_update("populate_user_directory_process_users")
  218. return 1
  219. def _get_next_batch(txn):
  220. sql = "SELECT user_id FROM %s LIMIT %s" % (
  221. TEMP_TABLE + "_users",
  222. str(batch_size),
  223. )
  224. txn.execute(sql)
  225. users_to_work_on = txn.fetchall()
  226. if not users_to_work_on:
  227. return None
  228. users_to_work_on = [x[0] for x in users_to_work_on]
  229. # Get how many are left to process, so we can give status on how
  230. # far we are in processing
  231. sql = "SELECT COUNT(*) FROM " + TEMP_TABLE + "_users"
  232. txn.execute(sql)
  233. progress["remaining"] = txn.fetchone()[0]
  234. return users_to_work_on
  235. users_to_work_on = yield self.runInteraction(
  236. "populate_user_directory_temp_read", _get_next_batch
  237. )
  238. # No more users -- complete the transaction.
  239. if not users_to_work_on:
  240. yield self._end_background_update("populate_user_directory_process_users")
  241. return 1
  242. logger.info(
  243. "Processing the next %d users of %d remaining"
  244. % (len(users_to_work_on), progress["remaining"])
  245. )
  246. for user_id in users_to_work_on:
  247. profile = yield self.get_profileinfo(get_localpart_from_id(user_id))
  248. yield self.update_profile_in_user_dir(
  249. user_id, profile.display_name, profile.avatar_url
  250. )
  251. # We've finished processing a user. Delete it from the table.
  252. yield self._simple_delete_one(TEMP_TABLE + "_users", {"user_id": user_id})
  253. # Update the remaining counter.
  254. progress["remaining"] -= 1
  255. yield self.runInteraction(
  256. "populate_user_directory",
  257. self._background_update_progress_txn,
  258. "populate_user_directory_process_users",
  259. progress,
  260. )
  261. return len(users_to_work_on)
  262. @defer.inlineCallbacks
  263. def is_room_world_readable_or_publicly_joinable(self, room_id):
  264. """Check if the room is either world_readable or publically joinable
  265. """
  266. # Create a state filter that only queries join and history state event
  267. types_to_filter = (
  268. (EventTypes.JoinRules, ""),
  269. (EventTypes.RoomHistoryVisibility, ""),
  270. )
  271. current_state_ids = yield self.get_filtered_current_state_ids(
  272. room_id, StateFilter.from_types(types_to_filter)
  273. )
  274. join_rules_id = current_state_ids.get((EventTypes.JoinRules, ""))
  275. if join_rules_id:
  276. join_rule_ev = yield self.get_event(join_rules_id, allow_none=True)
  277. if join_rule_ev:
  278. if join_rule_ev.content.get("join_rule") == JoinRules.PUBLIC:
  279. return True
  280. hist_vis_id = current_state_ids.get((EventTypes.RoomHistoryVisibility, ""))
  281. if hist_vis_id:
  282. hist_vis_ev = yield self.get_event(hist_vis_id, allow_none=True)
  283. if hist_vis_ev:
  284. if hist_vis_ev.content.get("history_visibility") == "world_readable":
  285. return True
  286. return False
  287. def update_profile_in_user_dir(self, user_id, display_name, avatar_url):
  288. """
  289. Update or add a user's profile in the user directory.
  290. """
  291. def _update_profile_in_user_dir_txn(txn):
  292. new_entry = self._simple_upsert_txn(
  293. txn,
  294. table="user_directory",
  295. keyvalues={"user_id": user_id},
  296. values={"display_name": display_name, "avatar_url": avatar_url},
  297. lock=False, # We're only inserter
  298. )
  299. if isinstance(self.database_engine, PostgresEngine):
  300. # We weight the localpart most highly, then display name and finally
  301. # server name
  302. if self.database_engine.can_native_upsert:
  303. sql = """
  304. INSERT INTO user_directory_search(user_id, vector)
  305. VALUES (?,
  306. setweight(to_tsvector('english', ?), 'A')
  307. || setweight(to_tsvector('english', ?), 'D')
  308. || setweight(to_tsvector('english', COALESCE(?, '')), 'B')
  309. ) ON CONFLICT (user_id) DO UPDATE SET vector=EXCLUDED.vector
  310. """
  311. txn.execute(
  312. sql,
  313. (
  314. user_id,
  315. get_localpart_from_id(user_id),
  316. get_domain_from_id(user_id),
  317. display_name,
  318. ),
  319. )
  320. else:
  321. # TODO: Remove this code after we've bumped the minimum version
  322. # of postgres to always support upserts, so we can get rid of
  323. # `new_entry` usage
  324. if new_entry is True:
  325. sql = """
  326. INSERT INTO user_directory_search(user_id, vector)
  327. VALUES (?,
  328. setweight(to_tsvector('english', ?), 'A')
  329. || setweight(to_tsvector('english', ?), 'D')
  330. || setweight(to_tsvector('english', COALESCE(?, '')), 'B')
  331. )
  332. """
  333. txn.execute(
  334. sql,
  335. (
  336. user_id,
  337. get_localpart_from_id(user_id),
  338. get_domain_from_id(user_id),
  339. display_name,
  340. ),
  341. )
  342. elif new_entry is False:
  343. sql = """
  344. UPDATE user_directory_search
  345. SET vector = setweight(to_tsvector('english', ?), 'A')
  346. || setweight(to_tsvector('english', ?), 'D')
  347. || setweight(to_tsvector('english', COALESCE(?, '')), 'B')
  348. WHERE user_id = ?
  349. """
  350. txn.execute(
  351. sql,
  352. (
  353. get_localpart_from_id(user_id),
  354. get_domain_from_id(user_id),
  355. display_name,
  356. user_id,
  357. ),
  358. )
  359. else:
  360. raise RuntimeError(
  361. "upsert returned None when 'can_native_upsert' is False"
  362. )
  363. elif isinstance(self.database_engine, Sqlite3Engine):
  364. value = "%s %s" % (user_id, display_name) if display_name else user_id
  365. self._simple_upsert_txn(
  366. txn,
  367. table="user_directory_search",
  368. keyvalues={"user_id": user_id},
  369. values={"value": value},
  370. lock=False, # We're only inserter
  371. )
  372. else:
  373. # This should be unreachable.
  374. raise Exception("Unrecognized database engine")
  375. txn.call_after(self.get_user_in_directory.invalidate, (user_id,))
  376. return self.runInteraction(
  377. "update_profile_in_user_dir", _update_profile_in_user_dir_txn
  378. )
  379. def remove_from_user_dir(self, user_id):
  380. def _remove_from_user_dir_txn(txn):
  381. self._simple_delete_txn(
  382. txn, table="user_directory", keyvalues={"user_id": user_id}
  383. )
  384. self._simple_delete_txn(
  385. txn, table="user_directory_search", keyvalues={"user_id": user_id}
  386. )
  387. self._simple_delete_txn(
  388. txn, table="users_in_public_rooms", keyvalues={"user_id": user_id}
  389. )
  390. self._simple_delete_txn(
  391. txn,
  392. table="users_who_share_private_rooms",
  393. keyvalues={"user_id": user_id},
  394. )
  395. self._simple_delete_txn(
  396. txn,
  397. table="users_who_share_private_rooms",
  398. keyvalues={"other_user_id": user_id},
  399. )
  400. txn.call_after(self.get_user_in_directory.invalidate, (user_id,))
  401. return self.runInteraction("remove_from_user_dir", _remove_from_user_dir_txn)
  402. @defer.inlineCallbacks
  403. def get_users_in_dir_due_to_room(self, room_id):
  404. """Get all user_ids that are in the room directory because they're
  405. in the given room_id
  406. """
  407. user_ids_share_pub = yield self._simple_select_onecol(
  408. table="users_in_public_rooms",
  409. keyvalues={"room_id": room_id},
  410. retcol="user_id",
  411. desc="get_users_in_dir_due_to_room",
  412. )
  413. user_ids_share_priv = yield self._simple_select_onecol(
  414. table="users_who_share_private_rooms",
  415. keyvalues={"room_id": room_id},
  416. retcol="other_user_id",
  417. desc="get_users_in_dir_due_to_room",
  418. )
  419. user_ids = set(user_ids_share_pub)
  420. user_ids.update(user_ids_share_priv)
  421. return user_ids
  422. def add_users_who_share_private_room(self, room_id, user_id_tuples):
  423. """Insert entries into the users_who_share_private_rooms table. The first
  424. user should be a local user.
  425. Args:
  426. room_id (str)
  427. user_id_tuples([(str, str)]): iterable of 2-tuple of user IDs.
  428. """
  429. def _add_users_who_share_room_txn(txn):
  430. self._simple_upsert_many_txn(
  431. txn,
  432. table="users_who_share_private_rooms",
  433. key_names=["user_id", "other_user_id", "room_id"],
  434. key_values=[
  435. (user_id, other_user_id, room_id)
  436. for user_id, other_user_id in user_id_tuples
  437. ],
  438. value_names=(),
  439. value_values=None,
  440. )
  441. return self.runInteraction(
  442. "add_users_who_share_room", _add_users_who_share_room_txn
  443. )
  444. def add_users_in_public_rooms(self, room_id, user_ids):
  445. """Insert entries into the users_who_share_private_rooms table. The first
  446. user should be a local user.
  447. Args:
  448. room_id (str)
  449. user_ids (list[str])
  450. """
  451. def _add_users_in_public_rooms_txn(txn):
  452. self._simple_upsert_many_txn(
  453. txn,
  454. table="users_in_public_rooms",
  455. key_names=["user_id", "room_id"],
  456. key_values=[(user_id, room_id) for user_id in user_ids],
  457. value_names=(),
  458. value_values=None,
  459. )
  460. return self.runInteraction(
  461. "add_users_in_public_rooms", _add_users_in_public_rooms_txn
  462. )
  463. def remove_user_who_share_room(self, user_id, room_id):
  464. """
  465. Deletes entries in the users_who_share_*_rooms table. The first
  466. user should be a local user.
  467. Args:
  468. user_id (str)
  469. room_id (str)
  470. """
  471. def _remove_user_who_share_room_txn(txn):
  472. self._simple_delete_txn(
  473. txn,
  474. table="users_who_share_private_rooms",
  475. keyvalues={"user_id": user_id, "room_id": room_id},
  476. )
  477. self._simple_delete_txn(
  478. txn,
  479. table="users_who_share_private_rooms",
  480. keyvalues={"other_user_id": user_id, "room_id": room_id},
  481. )
  482. self._simple_delete_txn(
  483. txn,
  484. table="users_in_public_rooms",
  485. keyvalues={"user_id": user_id, "room_id": room_id},
  486. )
  487. return self.runInteraction(
  488. "remove_user_who_share_room", _remove_user_who_share_room_txn
  489. )
  490. @defer.inlineCallbacks
  491. def get_user_dir_rooms_user_is_in(self, user_id):
  492. """
  493. Returns the rooms that a user is in.
  494. Args:
  495. user_id(str): Must be a local user
  496. Returns:
  497. list: user_id
  498. """
  499. rows = yield self._simple_select_onecol(
  500. table="users_who_share_private_rooms",
  501. keyvalues={"user_id": user_id},
  502. retcol="room_id",
  503. desc="get_rooms_user_is_in",
  504. )
  505. pub_rows = yield self._simple_select_onecol(
  506. table="users_in_public_rooms",
  507. keyvalues={"user_id": user_id},
  508. retcol="room_id",
  509. desc="get_rooms_user_is_in",
  510. )
  511. users = set(pub_rows)
  512. users.update(rows)
  513. return list(users)
  514. @defer.inlineCallbacks
  515. def get_rooms_in_common_for_users(self, user_id, other_user_id):
  516. """Given two user_ids find out the list of rooms they share.
  517. """
  518. sql = """
  519. SELECT room_id FROM (
  520. SELECT c.room_id FROM current_state_events AS c
  521. INNER JOIN room_memberships AS m USING (event_id)
  522. WHERE type = 'm.room.member'
  523. AND m.membership = 'join'
  524. AND state_key = ?
  525. ) AS f1 INNER JOIN (
  526. SELECT c.room_id FROM current_state_events AS c
  527. INNER JOIN room_memberships AS m USING (event_id)
  528. WHERE type = 'm.room.member'
  529. AND m.membership = 'join'
  530. AND state_key = ?
  531. ) f2 USING (room_id)
  532. """
  533. rows = yield self._execute(
  534. "get_rooms_in_common_for_users", None, sql, user_id, other_user_id
  535. )
  536. return [room_id for room_id, in rows]
  537. def delete_all_from_user_dir(self):
  538. """Delete the entire user directory
  539. """
  540. def _delete_all_from_user_dir_txn(txn):
  541. txn.execute("DELETE FROM user_directory")
  542. txn.execute("DELETE FROM user_directory_search")
  543. txn.execute("DELETE FROM users_in_public_rooms")
  544. txn.execute("DELETE FROM users_who_share_private_rooms")
  545. txn.call_after(self.get_user_in_directory.invalidate_all)
  546. return self.runInteraction(
  547. "delete_all_from_user_dir", _delete_all_from_user_dir_txn
  548. )
  549. @cached()
  550. def get_user_in_directory(self, user_id):
  551. return self._simple_select_one(
  552. table="user_directory",
  553. keyvalues={"user_id": user_id},
  554. retcols=("display_name", "avatar_url"),
  555. allow_none=True,
  556. desc="get_user_in_directory",
  557. )
  558. def get_user_directory_stream_pos(self):
  559. return self._simple_select_one_onecol(
  560. table="user_directory_stream_pos",
  561. keyvalues={},
  562. retcol="stream_id",
  563. desc="get_user_directory_stream_pos",
  564. )
  565. def update_user_directory_stream_pos(self, stream_id):
  566. return self._simple_update_one(
  567. table="user_directory_stream_pos",
  568. keyvalues={},
  569. updatevalues={"stream_id": stream_id},
  570. desc="update_user_directory_stream_pos",
  571. )
  572. @defer.inlineCallbacks
  573. def search_user_dir(self, user_id, search_term, limit):
  574. """Searches for users in directory
  575. Returns:
  576. dict of the form::
  577. {
  578. "limited": <bool>, # whether there were more results or not
  579. "results": [ # Ordered by best match first
  580. {
  581. "user_id": <user_id>,
  582. "display_name": <display_name>,
  583. "avatar_url": <avatar_url>
  584. }
  585. ]
  586. }
  587. """
  588. if self.hs.config.user_directory_search_all_users:
  589. join_args = (user_id,)
  590. where_clause = "user_id != ?"
  591. else:
  592. join_args = (user_id,)
  593. where_clause = """
  594. (
  595. EXISTS (select 1 from users_in_public_rooms WHERE user_id = t.user_id)
  596. OR EXISTS (
  597. SELECT 1 FROM users_who_share_private_rooms
  598. WHERE user_id = ? AND other_user_id = t.user_id
  599. )
  600. )
  601. """
  602. if isinstance(self.database_engine, PostgresEngine):
  603. full_query, exact_query, prefix_query = _parse_query_postgres(search_term)
  604. # We order by rank and then if they have profile info
  605. # The ranking algorithm is hand tweaked for "best" results. Broadly
  606. # the idea is we give a higher weight to exact matches.
  607. # The array of numbers are the weights for the various part of the
  608. # search: (domain, _, display name, localpart)
  609. sql = """
  610. SELECT d.user_id AS user_id, display_name, avatar_url
  611. FROM user_directory_search as t
  612. INNER JOIN user_directory AS d USING (user_id)
  613. WHERE
  614. %s
  615. AND vector @@ to_tsquery('english', ?)
  616. ORDER BY
  617. (CASE WHEN d.user_id IS NOT NULL THEN 4.0 ELSE 1.0 END)
  618. * (CASE WHEN display_name IS NOT NULL THEN 1.2 ELSE 1.0 END)
  619. * (CASE WHEN avatar_url IS NOT NULL THEN 1.2 ELSE 1.0 END)
  620. * (
  621. 3 * ts_rank_cd(
  622. '{0.1, 0.1, 0.9, 1.0}',
  623. vector,
  624. to_tsquery('english', ?),
  625. 8
  626. )
  627. + ts_rank_cd(
  628. '{0.1, 0.1, 0.9, 1.0}',
  629. vector,
  630. to_tsquery('english', ?),
  631. 8
  632. )
  633. )
  634. DESC,
  635. display_name IS NULL,
  636. avatar_url IS NULL
  637. LIMIT ?
  638. """ % (
  639. where_clause,
  640. )
  641. args = join_args + (full_query, exact_query, prefix_query, limit + 1)
  642. elif isinstance(self.database_engine, Sqlite3Engine):
  643. search_query = _parse_query_sqlite(search_term)
  644. sql = """
  645. SELECT d.user_id AS user_id, display_name, avatar_url
  646. FROM user_directory_search as t
  647. INNER JOIN user_directory AS d USING (user_id)
  648. WHERE
  649. %s
  650. AND value MATCH ?
  651. ORDER BY
  652. rank(matchinfo(user_directory_search)) DESC,
  653. display_name IS NULL,
  654. avatar_url IS NULL
  655. LIMIT ?
  656. """ % (
  657. where_clause,
  658. )
  659. args = join_args + (search_query, limit + 1)
  660. else:
  661. # This should be unreachable.
  662. raise Exception("Unrecognized database engine")
  663. results = yield self._execute(
  664. "search_user_dir", self.cursor_to_dict, sql, *args
  665. )
  666. limited = len(results) > limit
  667. return {"limited": limited, "results": results}
  668. def _parse_query_sqlite(search_term):
  669. """Takes a plain unicode string from the user and converts it into a form
  670. that can be passed to database.
  671. We use this so that we can add prefix matching, which isn't something
  672. that is supported by default.
  673. We specifically add both a prefix and non prefix matching term so that
  674. exact matches get ranked higher.
  675. """
  676. # Pull out the individual words, discarding any non-word characters.
  677. results = re.findall(r"([\w\-]+)", search_term, re.UNICODE)
  678. return " & ".join("(%s* OR %s)" % (result, result) for result in results)
  679. def _parse_query_postgres(search_term):
  680. """Takes a plain unicode string from the user and converts it into a form
  681. that can be passed to database.
  682. We use this so that we can add prefix matching, which isn't something
  683. that is supported by default.
  684. """
  685. # Pull out the individual words, discarding any non-word characters.
  686. results = re.findall(r"([\w\-]+)", search_term, re.UNICODE)
  687. both = " & ".join("(%s:* | %s)" % (result, result) for result in results)
  688. exact = " & ".join("%s" % (result,) for result in results)
  689. prefix = " & ".join("%s:*" % (result,) for result in results)
  690. return both, exact, prefix