user_directory.py 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923
  1. # Copyright 2017 Vector Creations Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import logging
  15. import re
  16. from typing import (
  17. TYPE_CHECKING,
  18. Dict,
  19. Iterable,
  20. List,
  21. Optional,
  22. Sequence,
  23. Set,
  24. Tuple,
  25. cast,
  26. )
  27. from typing_extensions import TypedDict
  28. from synapse.api.errors import StoreError
  29. from synapse.util.stringutils import non_null_str_or_none
  30. if TYPE_CHECKING:
  31. from synapse.server import HomeServer
  32. from synapse.api.constants import EventTypes, HistoryVisibility, JoinRules
  33. from synapse.storage.database import (
  34. DatabasePool,
  35. LoggingDatabaseConnection,
  36. LoggingTransaction,
  37. )
  38. from synapse.storage.databases.main.state import StateFilter
  39. from synapse.storage.databases.main.state_deltas import StateDeltasStore
  40. from synapse.storage.engines import PostgresEngine, Sqlite3Engine
  41. from synapse.types import (
  42. JsonDict,
  43. UserProfile,
  44. get_domain_from_id,
  45. get_localpart_from_id,
  46. )
  47. from synapse.util.caches.descriptors import cached
  48. logger = logging.getLogger(__name__)
  49. TEMP_TABLE = "_temp_populate_user_directory"
  50. class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
    # How many (user, other user) pairs do we accumulate before flushing them
    # to add_users_who_share_private_room?
  53. SHARE_PRIVATE_WORKING_SET = 500
  54. def __init__(
  55. self,
  56. database: DatabasePool,
  57. db_conn: LoggingDatabaseConnection,
  58. hs: "HomeServer",
  59. ) -> None:
  60. super().__init__(database, db_conn, hs)
  61. self.server_name: str = hs.hostname
  62. self.db_pool.updates.register_background_update_handler(
  63. "populate_user_directory_createtables",
  64. self._populate_user_directory_createtables,
  65. )
  66. self.db_pool.updates.register_background_update_handler(
  67. "populate_user_directory_process_rooms",
  68. self._populate_user_directory_process_rooms,
  69. )
  70. self.db_pool.updates.register_background_update_handler(
  71. "populate_user_directory_process_users",
  72. self._populate_user_directory_process_users,
  73. )
  74. self.db_pool.updates.register_background_update_handler(
  75. "populate_user_directory_cleanup", self._populate_user_directory_cleanup
  76. )
  77. async def _populate_user_directory_createtables(
  78. self, progress: JsonDict, batch_size: int
  79. ) -> int:
  80. # Get all the rooms that we want to process.
  81. def _make_staging_area(txn: LoggingTransaction) -> None:
  82. sql = (
  83. "CREATE TABLE IF NOT EXISTS "
  84. + TEMP_TABLE
  85. + "_rooms(room_id TEXT NOT NULL, events BIGINT NOT NULL)"
  86. )
  87. txn.execute(sql)
  88. sql = (
  89. "CREATE TABLE IF NOT EXISTS "
  90. + TEMP_TABLE
  91. + "_position(position TEXT NOT NULL)"
  92. )
  93. txn.execute(sql)
  94. # Get rooms we want to process from the database
  95. sql = """
  96. SELECT room_id, count(*) FROM current_state_events
  97. GROUP BY room_id
  98. """
  99. txn.execute(sql)
  100. rooms = list(txn.fetchall())
  101. self.db_pool.simple_insert_many_txn(
  102. txn, TEMP_TABLE + "_rooms", keys=("room_id", "events"), values=rooms
  103. )
  104. del rooms
  105. sql = (
  106. "CREATE TABLE IF NOT EXISTS "
  107. + TEMP_TABLE
  108. + "_users(user_id TEXT NOT NULL)"
  109. )
  110. txn.execute(sql)
  111. txn.execute("SELECT name FROM users")
  112. users = list(txn.fetchall())
  113. self.db_pool.simple_insert_many_txn(
  114. txn, TEMP_TABLE + "_users", keys=("user_id",), values=users
  115. )
  116. new_pos = await self.get_max_stream_id_in_current_state_deltas()
  117. await self.db_pool.runInteraction(
  118. "populate_user_directory_temp_build", _make_staging_area
  119. )
  120. await self.db_pool.simple_insert(
  121. TEMP_TABLE + "_position", {"position": new_pos}
  122. )
  123. await self.db_pool.updates._end_background_update(
  124. "populate_user_directory_createtables"
  125. )
  126. return 1
  127. async def _populate_user_directory_cleanup(
  128. self,
  129. progress: JsonDict,
  130. batch_size: int,
  131. ) -> int:
  132. """
  133. Update the user directory stream position, then clean up the old tables.
  134. """
  135. position = await self.db_pool.simple_select_one_onecol(
  136. TEMP_TABLE + "_position", {}, "position"
  137. )
  138. await self.update_user_directory_stream_pos(position)
  139. def _delete_staging_area(txn: LoggingTransaction) -> None:
  140. txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_rooms")
  141. txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_users")
  142. txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_position")
  143. await self.db_pool.runInteraction(
  144. "populate_user_directory_cleanup", _delete_staging_area
  145. )
  146. await self.db_pool.updates._end_background_update(
  147. "populate_user_directory_cleanup"
  148. )
  149. return 1
  150. async def _populate_user_directory_process_rooms(
  151. self, progress: JsonDict, batch_size: int
  152. ) -> int:
  153. """
  154. Rescan the state of all rooms so we can track
  155. - who's in a public room;
  156. - which local users share a private room with other users (local
  157. and remote); and
  158. - who should be in the user_directory.
  159. Args:
  160. progress
  161. batch_size: Maximum number of state events to process per cycle.
  162. Returns:
  163. number of events processed.
  164. """
  165. # If we don't have progress filed, delete everything.
  166. if not progress:
  167. await self.delete_all_from_user_dir()
  168. def _get_next_batch(
  169. txn: LoggingTransaction,
  170. ) -> Optional[Sequence[Tuple[str, int]]]:
  171. # Only fetch 250 rooms, so we don't fetch too many at once, even
  172. # if those 250 rooms have less than batch_size state events.
  173. sql = """
  174. SELECT room_id, events FROM %s
  175. ORDER BY events DESC
  176. LIMIT 250
  177. """ % (
  178. TEMP_TABLE + "_rooms",
  179. )
  180. txn.execute(sql)
  181. rooms_to_work_on = cast(List[Tuple[str, int]], txn.fetchall())
  182. if not rooms_to_work_on:
  183. return None
  184. # Get how many are left to process, so we can give status on how
  185. # far we are in processing
  186. txn.execute("SELECT COUNT(*) FROM " + TEMP_TABLE + "_rooms")
  187. result = txn.fetchone()
  188. assert result is not None
  189. progress["remaining"] = result[0]
  190. return rooms_to_work_on
  191. rooms_to_work_on = await self.db_pool.runInteraction(
  192. "populate_user_directory_temp_read", _get_next_batch
  193. )
  194. # No more rooms -- complete the transaction.
  195. if not rooms_to_work_on:
  196. await self.db_pool.updates._end_background_update(
  197. "populate_user_directory_process_rooms"
  198. )
  199. return 1
  200. logger.debug(
  201. "Processing the next %d rooms of %d remaining"
  202. % (len(rooms_to_work_on), progress["remaining"])
  203. )
  204. processed_event_count = 0
  205. for room_id, event_count in rooms_to_work_on:
  206. is_in_room = await self.is_host_joined(room_id, self.server_name) # type: ignore[attr-defined]
  207. if is_in_room:
  208. users_with_profile = await self.get_users_in_room_with_profiles(room_id) # type: ignore[attr-defined]
  209. # Throw away users excluded from the directory.
  210. users_with_profile = {
  211. user_id: profile
  212. for user_id, profile in users_with_profile.items()
  213. if not self.hs.is_mine_id(user_id)
  214. or await self.should_include_local_user_in_dir(user_id)
  215. }
  216. # Upsert a user_directory record for each remote user we see.
  217. for user_id, profile in users_with_profile.items():
  218. # Local users are processed separately in
  219. # `_populate_user_directory_users`; there we can read from
  220. # the `profiles` table to ensure we don't leak their per-room
  221. # profiles. It also means we write local users to this table
  222. # exactly once, rather than once for every room they're in.
  223. if self.hs.is_mine_id(user_id):
  224. continue
  225. # TODO `users_with_profile` above reads from the `user_directory`
  226. # table, meaning that `profile` is bespoke to this room.
  227. # and this leaks remote users' per-room profiles to the user directory.
  228. await self.update_profile_in_user_dir(
  229. user_id, profile.display_name, profile.avatar_url
  230. )
  231. # Now update the room sharing tables to include this room.
  232. is_public = await self.is_room_world_readable_or_publicly_joinable(
  233. room_id
  234. )
  235. if is_public:
  236. if users_with_profile:
  237. await self.add_users_in_public_rooms(
  238. room_id, users_with_profile.keys()
  239. )
  240. else:
  241. to_insert = set()
  242. for user_id in users_with_profile:
  243. # We want the set of pairs (L, M) where L and M are
  244. # in `users_with_profile` and L is local.
  245. # Do so by looking for the local user L first.
  246. if not self.hs.is_mine_id(user_id):
  247. continue
  248. for other_user_id in users_with_profile:
  249. if user_id == other_user_id:
  250. continue
  251. user_set = (user_id, other_user_id)
  252. to_insert.add(user_set)
  253. # If it gets too big, stop and write to the database
  254. # to prevent storing too much in RAM.
  255. if len(to_insert) >= self.SHARE_PRIVATE_WORKING_SET:
  256. await self.add_users_who_share_private_room(
  257. room_id, to_insert
  258. )
  259. to_insert.clear()
  260. if to_insert:
  261. await self.add_users_who_share_private_room(room_id, to_insert)
  262. to_insert.clear()
  263. # We've finished a room. Delete it from the table.
  264. await self.db_pool.simple_delete_one(
  265. TEMP_TABLE + "_rooms", {"room_id": room_id}
  266. )
  267. # Update the remaining counter.
  268. progress["remaining"] -= 1
  269. await self.db_pool.runInteraction(
  270. "populate_user_directory",
  271. self.db_pool.updates._background_update_progress_txn,
  272. "populate_user_directory_process_rooms",
  273. progress,
  274. )
  275. processed_event_count += event_count
  276. if processed_event_count > batch_size:
  277. # Don't process any more rooms, we've hit our batch size.
  278. return processed_event_count
  279. return processed_event_count
  280. async def _populate_user_directory_process_users(
  281. self, progress: JsonDict, batch_size: int
  282. ) -> int:
  283. """
  284. Add all local users to the user directory.
  285. """
  286. def _get_next_batch(txn: LoggingTransaction) -> Optional[List[str]]:
  287. sql = "SELECT user_id FROM %s LIMIT %s" % (
  288. TEMP_TABLE + "_users",
  289. str(batch_size),
  290. )
  291. txn.execute(sql)
  292. user_result = cast(List[Tuple[str]], txn.fetchall())
  293. if not user_result:
  294. return None
  295. users_to_work_on = [x[0] for x in user_result]
  296. # Get how many are left to process, so we can give status on how
  297. # far we are in processing
  298. sql = "SELECT COUNT(*) FROM " + TEMP_TABLE + "_users"
  299. txn.execute(sql)
  300. count_result = txn.fetchone()
  301. assert count_result is not None
  302. progress["remaining"] = count_result[0]
  303. return users_to_work_on
  304. users_to_work_on = await self.db_pool.runInteraction(
  305. "populate_user_directory_temp_read", _get_next_batch
  306. )
  307. # No more users -- complete the transaction.
  308. if not users_to_work_on:
  309. await self.db_pool.updates._end_background_update(
  310. "populate_user_directory_process_users"
  311. )
  312. return 1
  313. logger.debug(
  314. "Processing the next %d users of %d remaining"
  315. % (len(users_to_work_on), progress["remaining"])
  316. )
  317. for user_id in users_to_work_on:
  318. if await self.should_include_local_user_in_dir(user_id):
  319. profile = await self.get_profileinfo(get_localpart_from_id(user_id)) # type: ignore[attr-defined]
  320. await self.update_profile_in_user_dir(
  321. user_id, profile.display_name, profile.avatar_url
  322. )
  323. # We've finished processing a user. Delete it from the table.
  324. await self.db_pool.simple_delete_one(
  325. TEMP_TABLE + "_users", {"user_id": user_id}
  326. )
  327. # Update the remaining counter.
  328. progress["remaining"] -= 1
  329. await self.db_pool.runInteraction(
  330. "populate_user_directory",
  331. self.db_pool.updates._background_update_progress_txn,
  332. "populate_user_directory_process_users",
  333. progress,
  334. )
  335. return len(users_to_work_on)
  336. async def should_include_local_user_in_dir(self, user: str) -> bool:
  337. """Certain classes of local user are omitted from the user directory.
  338. Is this user one of them?
  339. """
  340. # We're opting to exclude the appservice sender (user defined by the
  341. # `sender_localpart` in the appservice registration) even though
  342. # technically it could be DM-able. In the future, this could potentially
  343. # be configurable per-appservice whether the appservice sender can be
  344. # contacted.
  345. if self.get_app_service_by_user_id(user) is not None: # type: ignore[attr-defined]
  346. return False
  347. # We're opting to exclude appservice users (anyone matching the user
  348. # namespace regex in the appservice registration) even though technically
  349. # they could be DM-able. In the future, this could potentially
  350. # be configurable per-appservice whether the appservice users can be
  351. # contacted.
  352. if self.get_if_app_services_interested_in_user(user): # type: ignore[attr-defined]
  353. # TODO we might want to make this configurable for each app service
  354. return False
  355. # Support users are for diagnostics and should not appear in the user directory.
  356. if await self.is_support_user(user): # type: ignore[attr-defined]
  357. return False
  358. # Deactivated users aren't contactable, so should not appear in the user directory.
  359. try:
  360. if await self.get_user_deactivated_status(user): # type: ignore[attr-defined]
  361. return False
  362. except StoreError:
  363. # No such user in the users table. No need to do this when calling
  364. # is_support_user---that returns False if the user is missing.
  365. return False
  366. return True
  367. async def is_room_world_readable_or_publicly_joinable(self, room_id: str) -> bool:
  368. """Check if the room is either world_readable or publically joinable"""
  369. # Create a state filter that only queries join and history state event
  370. types_to_filter = (
  371. (EventTypes.JoinRules, ""),
  372. (EventTypes.RoomHistoryVisibility, ""),
  373. )
  374. # Getting the partial state is fine, as we're not looking at membership
  375. # events.
  376. current_state_ids = await self.get_partial_filtered_current_state_ids( # type: ignore[attr-defined]
  377. room_id, StateFilter.from_types(types_to_filter)
  378. )
  379. join_rules_id = current_state_ids.get((EventTypes.JoinRules, ""))
  380. if join_rules_id:
  381. join_rule_ev = await self.get_event(join_rules_id, allow_none=True) # type: ignore[attr-defined]
  382. if join_rule_ev:
  383. if join_rule_ev.content.get("join_rule") == JoinRules.PUBLIC:
  384. return True
  385. hist_vis_id = current_state_ids.get((EventTypes.RoomHistoryVisibility, ""))
  386. if hist_vis_id:
  387. hist_vis_ev = await self.get_event(hist_vis_id, allow_none=True) # type: ignore[attr-defined]
  388. if hist_vis_ev:
  389. if (
  390. hist_vis_ev.content.get("history_visibility")
  391. == HistoryVisibility.WORLD_READABLE
  392. ):
  393. return True
  394. return False
  395. async def update_profile_in_user_dir(
  396. self, user_id: str, display_name: Optional[str], avatar_url: Optional[str]
  397. ) -> None:
  398. """
  399. Update or add a user's profile in the user directory.
  400. """
  401. # If the display name or avatar URL are unexpected types, replace with None.
  402. display_name = non_null_str_or_none(display_name)
  403. avatar_url = non_null_str_or_none(avatar_url)
  404. def _update_profile_in_user_dir_txn(txn: LoggingTransaction) -> None:
  405. self.db_pool.simple_upsert_txn(
  406. txn,
  407. table="user_directory",
  408. keyvalues={"user_id": user_id},
  409. values={"display_name": display_name, "avatar_url": avatar_url},
  410. lock=False, # We're only inserter
  411. )
  412. if isinstance(self.database_engine, PostgresEngine):
  413. # We weight the localpart most highly, then display name and finally
  414. # server name
  415. sql = """
  416. INSERT INTO user_directory_search(user_id, vector)
  417. VALUES (?,
  418. setweight(to_tsvector('simple', ?), 'A')
  419. || setweight(to_tsvector('simple', ?), 'D')
  420. || setweight(to_tsvector('simple', COALESCE(?, '')), 'B')
  421. ) ON CONFLICT (user_id) DO UPDATE SET vector=EXCLUDED.vector
  422. """
  423. txn.execute(
  424. sql,
  425. (
  426. user_id,
  427. get_localpart_from_id(user_id),
  428. get_domain_from_id(user_id),
  429. display_name,
  430. ),
  431. )
  432. elif isinstance(self.database_engine, Sqlite3Engine):
  433. value = "%s %s" % (user_id, display_name) if display_name else user_id
  434. self.db_pool.simple_upsert_txn(
  435. txn,
  436. table="user_directory_search",
  437. keyvalues={"user_id": user_id},
  438. values={"value": value},
  439. lock=False, # We're only inserter
  440. )
  441. else:
  442. # This should be unreachable.
  443. raise Exception("Unrecognized database engine")
  444. txn.call_after(self.get_user_in_directory.invalidate, (user_id,))
  445. await self.db_pool.runInteraction(
  446. "update_profile_in_user_dir", _update_profile_in_user_dir_txn
  447. )
  448. async def add_users_who_share_private_room(
  449. self, room_id: str, user_id_tuples: Iterable[Tuple[str, str]]
  450. ) -> None:
  451. """Insert entries into the users_who_share_private_rooms table. The first
  452. user should be a local user.
  453. Args:
  454. room_id
  455. user_id_tuples: iterable of 2-tuple of user IDs.
  456. """
  457. await self.db_pool.simple_upsert_many(
  458. table="users_who_share_private_rooms",
  459. key_names=["user_id", "other_user_id", "room_id"],
  460. key_values=[
  461. (user_id, other_user_id, room_id)
  462. for user_id, other_user_id in user_id_tuples
  463. ],
  464. value_names=(),
  465. value_values=(),
  466. desc="add_users_who_share_room",
  467. )
  468. async def add_users_in_public_rooms(
  469. self, room_id: str, user_ids: Iterable[str]
  470. ) -> None:
  471. """Insert entries into the users_in_public_rooms table.
  472. Args:
  473. room_id
  474. user_ids
  475. """
  476. await self.db_pool.simple_upsert_many(
  477. table="users_in_public_rooms",
  478. key_names=["user_id", "room_id"],
  479. key_values=[(user_id, room_id) for user_id in user_ids],
  480. value_names=(),
  481. value_values=(),
  482. desc="add_users_in_public_rooms",
  483. )
  484. async def delete_all_from_user_dir(self) -> None:
  485. """Delete the entire user directory"""
  486. def _delete_all_from_user_dir_txn(txn: LoggingTransaction) -> None:
  487. txn.execute("DELETE FROM user_directory")
  488. txn.execute("DELETE FROM user_directory_search")
  489. txn.execute("DELETE FROM users_in_public_rooms")
  490. txn.execute("DELETE FROM users_who_share_private_rooms")
  491. txn.call_after(self.get_user_in_directory.invalidate_all)
  492. await self.db_pool.runInteraction(
  493. "delete_all_from_user_dir", _delete_all_from_user_dir_txn
  494. )
  495. @cached()
  496. async def get_user_in_directory(self, user_id: str) -> Optional[Dict[str, str]]:
  497. return await self.db_pool.simple_select_one(
  498. table="user_directory",
  499. keyvalues={"user_id": user_id},
  500. retcols=("display_name", "avatar_url"),
  501. allow_none=True,
  502. desc="get_user_in_directory",
  503. )
  504. async def update_user_directory_stream_pos(self, stream_id: Optional[int]) -> None:
  505. await self.db_pool.simple_update_one(
  506. table="user_directory_stream_pos",
  507. keyvalues={},
  508. updatevalues={"stream_id": stream_id},
  509. desc="update_user_directory_stream_pos",
  510. )
class SearchResult(TypedDict):
    """Shape of the dict returned by `UserDirectoryStore.search_user_dir`."""

    # Whether there were more matching results than the caller's limit.
    limited: bool
    # The matching user profiles, ordered by best match first.
    results: List[UserProfile]
  514. class UserDirectoryStore(UserDirectoryBackgroundUpdateStore):
    # How many (user, other user) pairs do we accumulate before flushing them
    # to add_users_who_share_private_room?
  517. SHARE_PRIVATE_WORKING_SET = 500
  518. def __init__(
  519. self,
  520. database: DatabasePool,
  521. db_conn: LoggingDatabaseConnection,
  522. hs: "HomeServer",
  523. ) -> None:
  524. super().__init__(database, db_conn, hs)
  525. self._prefer_local_users_in_search = (
  526. hs.config.userdirectory.user_directory_search_prefer_local_users
  527. )
  528. self._server_name = hs.config.server.server_name
  529. async def remove_from_user_dir(self, user_id: str) -> None:
  530. def _remove_from_user_dir_txn(txn: LoggingTransaction) -> None:
  531. self.db_pool.simple_delete_txn(
  532. txn, table="user_directory", keyvalues={"user_id": user_id}
  533. )
  534. self.db_pool.simple_delete_txn(
  535. txn, table="user_directory_search", keyvalues={"user_id": user_id}
  536. )
  537. self.db_pool.simple_delete_txn(
  538. txn, table="users_in_public_rooms", keyvalues={"user_id": user_id}
  539. )
  540. self.db_pool.simple_delete_txn(
  541. txn,
  542. table="users_who_share_private_rooms",
  543. keyvalues={"user_id": user_id},
  544. )
  545. self.db_pool.simple_delete_txn(
  546. txn,
  547. table="users_who_share_private_rooms",
  548. keyvalues={"other_user_id": user_id},
  549. )
  550. txn.call_after(self.get_user_in_directory.invalidate, (user_id,))
  551. await self.db_pool.runInteraction(
  552. "remove_from_user_dir", _remove_from_user_dir_txn
  553. )
  554. async def get_users_in_dir_due_to_room(self, room_id: str) -> Set[str]:
  555. """Get all user_ids that are in the room directory because they're
  556. in the given room_id
  557. """
  558. user_ids_share_pub = await self.db_pool.simple_select_onecol(
  559. table="users_in_public_rooms",
  560. keyvalues={"room_id": room_id},
  561. retcol="user_id",
  562. desc="get_users_in_dir_due_to_room",
  563. )
  564. user_ids_share_priv = await self.db_pool.simple_select_onecol(
  565. table="users_who_share_private_rooms",
  566. keyvalues={"room_id": room_id},
  567. retcol="other_user_id",
  568. desc="get_users_in_dir_due_to_room",
  569. )
  570. user_ids = set(user_ids_share_pub)
  571. user_ids.update(user_ids_share_priv)
  572. return user_ids
  573. async def remove_user_who_share_room(self, user_id: str, room_id: str) -> None:
  574. """
  575. Deletes entries in the users_who_share_*_rooms table. The first
  576. user should be a local user.
  577. Args:
  578. user_id
  579. room_id
  580. """
  581. def _remove_user_who_share_room_txn(txn: LoggingTransaction) -> None:
  582. self.db_pool.simple_delete_txn(
  583. txn,
  584. table="users_who_share_private_rooms",
  585. keyvalues={"user_id": user_id, "room_id": room_id},
  586. )
  587. self.db_pool.simple_delete_txn(
  588. txn,
  589. table="users_who_share_private_rooms",
  590. keyvalues={"other_user_id": user_id, "room_id": room_id},
  591. )
  592. self.db_pool.simple_delete_txn(
  593. txn,
  594. table="users_in_public_rooms",
  595. keyvalues={"user_id": user_id, "room_id": room_id},
  596. )
  597. await self.db_pool.runInteraction(
  598. "remove_user_who_share_room", _remove_user_who_share_room_txn
  599. )
  600. async def get_user_dir_rooms_user_is_in(self, user_id: str) -> List[str]:
  601. """
  602. Returns the rooms that a user is in.
  603. Args:
  604. user_id: Must be a local user
  605. Returns:
  606. List of room IDs
  607. """
  608. rows = await self.db_pool.simple_select_onecol(
  609. table="users_who_share_private_rooms",
  610. keyvalues={"user_id": user_id},
  611. retcol="room_id",
  612. desc="get_rooms_user_is_in",
  613. )
  614. pub_rows = await self.db_pool.simple_select_onecol(
  615. table="users_in_public_rooms",
  616. keyvalues={"user_id": user_id},
  617. retcol="room_id",
  618. desc="get_rooms_user_is_in",
  619. )
  620. users = set(pub_rows)
  621. users.update(rows)
  622. return list(users)
  623. async def get_user_directory_stream_pos(self) -> Optional[int]:
  624. """
  625. Get the stream ID of the user directory stream.
  626. Returns:
  627. The stream token or None if the initial background update hasn't happened yet.
  628. """
  629. return await self.db_pool.simple_select_one_onecol(
  630. table="user_directory_stream_pos",
  631. keyvalues={},
  632. retcol="stream_id",
  633. desc="get_user_directory_stream_pos",
  634. )
  635. async def search_user_dir(
  636. self, user_id: str, search_term: str, limit: int
  637. ) -> SearchResult:
  638. """Searches for users in directory
  639. Returns:
  640. dict of the form::
  641. {
  642. "limited": <bool>, # whether there were more results or not
  643. "results": [ # Ordered by best match first
  644. {
  645. "user_id": <user_id>,
  646. "display_name": <display_name>,
  647. "avatar_url": <avatar_url>
  648. }
  649. ]
  650. }
  651. """
  652. if self.hs.config.userdirectory.user_directory_search_all_users:
  653. join_args = (user_id,)
  654. where_clause = "user_id != ?"
  655. else:
  656. join_args = (user_id,)
  657. where_clause = """
  658. (
  659. EXISTS (select 1 from users_in_public_rooms WHERE user_id = t.user_id)
  660. OR EXISTS (
  661. SELECT 1 FROM users_who_share_private_rooms
  662. WHERE user_id = ? AND other_user_id = t.user_id
  663. )
  664. )
  665. """
  666. # We allow manipulating the ranking algorithm by injecting statements
  667. # based on config options.
  668. additional_ordering_statements = []
  669. ordering_arguments: Tuple[str, ...] = ()
  670. if isinstance(self.database_engine, PostgresEngine):
  671. full_query, exact_query, prefix_query = _parse_query_postgres(search_term)
  672. # If enabled, this config option will rank local users higher than those on
  673. # remote instances.
  674. if self._prefer_local_users_in_search:
  675. # This statement checks whether a given user's user ID contains a server name
  676. # that matches the local server
  677. statement = "* (CASE WHEN user_id LIKE ? THEN 2.0 ELSE 1.0 END)"
  678. additional_ordering_statements.append(statement)
  679. ordering_arguments += ("%:" + self._server_name,)
  680. # We order by rank and then if they have profile info
  681. # The ranking algorithm is hand tweaked for "best" results. Broadly
  682. # the idea is we give a higher weight to exact matches.
  683. # The array of numbers are the weights for the various part of the
  684. # search: (domain, _, display name, localpart)
  685. sql = """
  686. SELECT d.user_id AS user_id, display_name, avatar_url
  687. FROM user_directory_search as t
  688. INNER JOIN user_directory AS d USING (user_id)
  689. WHERE
  690. %(where_clause)s
  691. AND vector @@ to_tsquery('simple', ?)
  692. ORDER BY
  693. (CASE WHEN d.user_id IS NOT NULL THEN 4.0 ELSE 1.0 END)
  694. * (CASE WHEN display_name IS NOT NULL THEN 1.2 ELSE 1.0 END)
  695. * (CASE WHEN avatar_url IS NOT NULL THEN 1.2 ELSE 1.0 END)
  696. * (
  697. 3 * ts_rank_cd(
  698. '{0.1, 0.1, 0.9, 1.0}',
  699. vector,
  700. to_tsquery('simple', ?),
  701. 8
  702. )
  703. + ts_rank_cd(
  704. '{0.1, 0.1, 0.9, 1.0}',
  705. vector,
  706. to_tsquery('simple', ?),
  707. 8
  708. )
  709. )
  710. %(order_case_statements)s
  711. DESC,
  712. display_name IS NULL,
  713. avatar_url IS NULL
  714. LIMIT ?
  715. """ % {
  716. "where_clause": where_clause,
  717. "order_case_statements": " ".join(additional_ordering_statements),
  718. }
  719. args = (
  720. join_args
  721. + (full_query, exact_query, prefix_query)
  722. + ordering_arguments
  723. + (limit + 1,)
  724. )
  725. elif isinstance(self.database_engine, Sqlite3Engine):
  726. search_query = _parse_query_sqlite(search_term)
  727. # If enabled, this config option will rank local users higher than those on
  728. # remote instances.
  729. if self._prefer_local_users_in_search:
  730. # This statement checks whether a given user's user ID contains a server name
  731. # that matches the local server
  732. #
  733. # Note that we need to include a comma at the end for valid SQL
  734. statement = "user_id LIKE ? DESC,"
  735. additional_ordering_statements.append(statement)
  736. ordering_arguments += ("%:" + self._server_name,)
  737. sql = """
  738. SELECT d.user_id AS user_id, display_name, avatar_url
  739. FROM user_directory_search as t
  740. INNER JOIN user_directory AS d USING (user_id)
  741. WHERE
  742. %(where_clause)s
  743. AND value MATCH ?
  744. ORDER BY
  745. rank(matchinfo(user_directory_search)) DESC,
  746. %(order_statements)s
  747. display_name IS NULL,
  748. avatar_url IS NULL
  749. LIMIT ?
  750. """ % {
  751. "where_clause": where_clause,
  752. "order_statements": " ".join(additional_ordering_statements),
  753. }
  754. args = join_args + (search_query,) + ordering_arguments + (limit + 1,)
  755. else:
  756. # This should be unreachable.
  757. raise Exception("Unrecognized database engine")
  758. results = cast(
  759. List[UserProfile],
  760. await self.db_pool.execute(
  761. "search_user_dir", self.db_pool.cursor_to_dict, sql, *args
  762. ),
  763. )
  764. limited = len(results) > limit
  765. return {"limited": limited, "results": results}
  766. def _parse_query_sqlite(search_term: str) -> str:
  767. """Takes a plain unicode string from the user and converts it into a form
  768. that can be passed to database.
  769. We use this so that we can add prefix matching, which isn't something
  770. that is supported by default.
  771. We specifically add both a prefix and non prefix matching term so that
  772. exact matches get ranked higher.
  773. """
  774. # Pull out the individual words, discarding any non-word characters.
  775. results = re.findall(r"([\w\-]+)", search_term, re.UNICODE)
  776. return " & ".join("(%s* OR %s)" % (result, result) for result in results)
  777. def _parse_query_postgres(search_term: str) -> Tuple[str, str, str]:
  778. """Takes a plain unicode string from the user and converts it into a form
  779. that can be passed to database.
  780. We use this so that we can add prefix matching, which isn't something
  781. that is supported by default.
  782. """
  783. # Pull out the individual words, discarding any non-word characters.
  784. results = re.findall(r"([\w\-]+)", search_term, re.UNICODE)
  785. both = " & ".join("(%s:* | %s)" % (result, result) for result in results)
  786. exact = " & ".join("%s" % (result,) for result in results)
  787. prefix = " & ".join("%s:*" % (result,) for result in results)
  788. return both, exact, prefix