user_directory.py 36 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965
  1. # Copyright 2017 Vector Creations Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import logging
  15. import re
  16. from typing import (
  17. TYPE_CHECKING,
  18. Dict,
  19. Iterable,
  20. List,
  21. Optional,
  22. Sequence,
  23. Set,
  24. Tuple,
  25. cast,
  26. )
  27. from typing_extensions import TypedDict
  28. from synapse.api.errors import StoreError
  29. from synapse.util.stringutils import non_null_str_or_none
  30. if TYPE_CHECKING:
  31. from synapse.server import HomeServer
  32. from synapse.api.constants import EventTypes, HistoryVisibility, JoinRules
  33. from synapse.storage.database import (
  34. DatabasePool,
  35. LoggingDatabaseConnection,
  36. LoggingTransaction,
  37. )
  38. from synapse.storage.databases.main.state import StateFilter
  39. from synapse.storage.databases.main.state_deltas import StateDeltasStore
  40. from synapse.storage.engines import PostgresEngine, Sqlite3Engine
  41. from synapse.types import (
  42. JsonDict,
  43. UserProfile,
  44. get_domain_from_id,
  45. get_localpart_from_id,
  46. )
  47. from synapse.util.caches.descriptors import cached
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Name prefix shared by the staging tables (`_rooms`, `_users`, `_position`)
# that the populate_user_directory_* background updates create, consume and
# finally drop.
TEMP_TABLE = "_temp_populate_user_directory"
  50. class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
  51. # How many records do we calculate before sending it to
  52. # add_users_who_share_private_rooms?
  53. SHARE_PRIVATE_WORKING_SET = 500
  54. def __init__(
  55. self,
  56. database: DatabasePool,
  57. db_conn: LoggingDatabaseConnection,
  58. hs: "HomeServer",
  59. ) -> None:
  60. super().__init__(database, db_conn, hs)
  61. self.server_name: str = hs.hostname
  62. self.db_pool.updates.register_background_update_handler(
  63. "populate_user_directory_createtables",
  64. self._populate_user_directory_createtables,
  65. )
  66. self.db_pool.updates.register_background_update_handler(
  67. "populate_user_directory_process_rooms",
  68. self._populate_user_directory_process_rooms,
  69. )
  70. self.db_pool.updates.register_background_update_handler(
  71. "populate_user_directory_process_users",
  72. self._populate_user_directory_process_users,
  73. )
  74. self.db_pool.updates.register_background_update_handler(
  75. "populate_user_directory_cleanup", self._populate_user_directory_cleanup
  76. )
  77. async def _populate_user_directory_createtables(
  78. self, progress: JsonDict, batch_size: int
  79. ) -> int:
  80. # Get all the rooms that we want to process.
  81. def _make_staging_area(txn: LoggingTransaction) -> None:
  82. sql = (
  83. "CREATE TABLE IF NOT EXISTS "
  84. + TEMP_TABLE
  85. + "_rooms(room_id TEXT NOT NULL, events BIGINT NOT NULL)"
  86. )
  87. txn.execute(sql)
  88. sql = (
  89. "CREATE TABLE IF NOT EXISTS "
  90. + TEMP_TABLE
  91. + "_position(position TEXT NOT NULL)"
  92. )
  93. txn.execute(sql)
  94. # Get rooms we want to process from the database
  95. sql = """
  96. SELECT room_id, count(*) FROM current_state_events
  97. GROUP BY room_id
  98. """
  99. txn.execute(sql)
  100. rooms = list(txn.fetchall())
  101. self.db_pool.simple_insert_many_txn(
  102. txn, TEMP_TABLE + "_rooms", keys=("room_id", "events"), values=rooms
  103. )
  104. del rooms
  105. sql = (
  106. "CREATE TABLE IF NOT EXISTS "
  107. + TEMP_TABLE
  108. + "_users(user_id TEXT NOT NULL)"
  109. )
  110. txn.execute(sql)
  111. txn.execute("SELECT name FROM users")
  112. users = list(txn.fetchall())
  113. self.db_pool.simple_insert_many_txn(
  114. txn, TEMP_TABLE + "_users", keys=("user_id",), values=users
  115. )
  116. new_pos = await self.get_max_stream_id_in_current_state_deltas()
  117. await self.db_pool.runInteraction(
  118. "populate_user_directory_temp_build", _make_staging_area
  119. )
  120. await self.db_pool.simple_insert(
  121. TEMP_TABLE + "_position", {"position": new_pos}
  122. )
  123. await self.db_pool.updates._end_background_update(
  124. "populate_user_directory_createtables"
  125. )
  126. return 1
  127. async def _populate_user_directory_cleanup(
  128. self,
  129. progress: JsonDict,
  130. batch_size: int,
  131. ) -> int:
  132. """
  133. Update the user directory stream position, then clean up the old tables.
  134. """
  135. position = await self.db_pool.simple_select_one_onecol(
  136. TEMP_TABLE + "_position", {}, "position"
  137. )
  138. await self.update_user_directory_stream_pos(position)
  139. def _delete_staging_area(txn: LoggingTransaction) -> None:
  140. txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_rooms")
  141. txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_users")
  142. txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_position")
  143. await self.db_pool.runInteraction(
  144. "populate_user_directory_cleanup", _delete_staging_area
  145. )
  146. await self.db_pool.updates._end_background_update(
  147. "populate_user_directory_cleanup"
  148. )
  149. return 1
  150. async def _populate_user_directory_process_rooms(
  151. self, progress: JsonDict, batch_size: int
  152. ) -> int:
  153. """
  154. Rescan the state of all rooms so we can track
  155. - who's in a public room;
  156. - which local users share a private room with other users (local
  157. and remote); and
  158. - who should be in the user_directory.
  159. Args:
  160. progress (dict)
  161. batch_size (int): Maximum number of state events to process
  162. per cycle.
  163. Returns:
  164. number of events processed.
  165. """
  166. # If we don't have progress filed, delete everything.
  167. if not progress:
  168. await self.delete_all_from_user_dir()
  169. def _get_next_batch(
  170. txn: LoggingTransaction,
  171. ) -> Optional[Sequence[Tuple[str, int]]]:
  172. # Only fetch 250 rooms, so we don't fetch too many at once, even
  173. # if those 250 rooms have less than batch_size state events.
  174. sql = """
  175. SELECT room_id, events FROM %s
  176. ORDER BY events DESC
  177. LIMIT 250
  178. """ % (
  179. TEMP_TABLE + "_rooms",
  180. )
  181. txn.execute(sql)
  182. rooms_to_work_on = cast(List[Tuple[str, int]], txn.fetchall())
  183. if not rooms_to_work_on:
  184. return None
  185. # Get how many are left to process, so we can give status on how
  186. # far we are in processing
  187. txn.execute("SELECT COUNT(*) FROM " + TEMP_TABLE + "_rooms")
  188. result = txn.fetchone()
  189. assert result is not None
  190. progress["remaining"] = result[0]
  191. return rooms_to_work_on
  192. rooms_to_work_on = await self.db_pool.runInteraction(
  193. "populate_user_directory_temp_read", _get_next_batch
  194. )
  195. # No more rooms -- complete the transaction.
  196. if not rooms_to_work_on:
  197. await self.db_pool.updates._end_background_update(
  198. "populate_user_directory_process_rooms"
  199. )
  200. return 1
  201. logger.debug(
  202. "Processing the next %d rooms of %d remaining"
  203. % (len(rooms_to_work_on), progress["remaining"])
  204. )
  205. processed_event_count = 0
  206. for room_id, event_count in rooms_to_work_on:
  207. is_in_room = await self.is_host_joined(room_id, self.server_name) # type: ignore[attr-defined]
  208. if is_in_room:
  209. users_with_profile = await self.get_users_in_room_with_profiles(room_id) # type: ignore[attr-defined]
  210. # Throw away users excluded from the directory.
  211. users_with_profile = {
  212. user_id: profile
  213. for user_id, profile in users_with_profile.items()
  214. if not self.hs.is_mine_id(user_id)
  215. or await self.should_include_local_user_in_dir(user_id)
  216. }
  217. # Upsert a user_directory record for each remote user we see.
  218. for user_id, profile in users_with_profile.items():
  219. # Local users are processed separately in
  220. # `_populate_user_directory_users`; there we can read from
  221. # the `profiles` table to ensure we don't leak their per-room
  222. # profiles. It also means we write local users to this table
  223. # exactly once, rather than once for every room they're in.
  224. if self.hs.is_mine_id(user_id):
  225. continue
  226. # TODO `users_with_profile` above reads from the `user_directory`
  227. # table, meaning that `profile` is bespoke to this room.
  228. # and this leaks remote users' per-room profiles to the user directory.
  229. await self.update_profile_in_user_dir(
  230. user_id, profile.display_name, profile.avatar_url
  231. )
  232. # Now update the room sharing tables to include this room.
  233. is_public = await self.is_room_world_readable_or_publicly_joinable(
  234. room_id
  235. )
  236. if is_public:
  237. if users_with_profile:
  238. await self.add_users_in_public_rooms(
  239. room_id, users_with_profile.keys()
  240. )
  241. else:
  242. to_insert = set()
  243. for user_id in users_with_profile:
  244. # We want the set of pairs (L, M) where L and M are
  245. # in `users_with_profile` and L is local.
  246. # Do so by looking for the local user L first.
  247. if not self.hs.is_mine_id(user_id):
  248. continue
  249. for other_user_id in users_with_profile:
  250. if user_id == other_user_id:
  251. continue
  252. user_set = (user_id, other_user_id)
  253. to_insert.add(user_set)
  254. # If it gets too big, stop and write to the database
  255. # to prevent storing too much in RAM.
  256. if len(to_insert) >= self.SHARE_PRIVATE_WORKING_SET:
  257. await self.add_users_who_share_private_room(
  258. room_id, to_insert
  259. )
  260. to_insert.clear()
  261. if to_insert:
  262. await self.add_users_who_share_private_room(room_id, to_insert)
  263. to_insert.clear()
  264. # We've finished a room. Delete it from the table.
  265. await self.db_pool.simple_delete_one(
  266. TEMP_TABLE + "_rooms", {"room_id": room_id}
  267. )
  268. # Update the remaining counter.
  269. progress["remaining"] -= 1
  270. await self.db_pool.runInteraction(
  271. "populate_user_directory",
  272. self.db_pool.updates._background_update_progress_txn,
  273. "populate_user_directory_process_rooms",
  274. progress,
  275. )
  276. processed_event_count += event_count
  277. if processed_event_count > batch_size:
  278. # Don't process any more rooms, we've hit our batch size.
  279. return processed_event_count
  280. return processed_event_count
  281. async def _populate_user_directory_process_users(
  282. self, progress: JsonDict, batch_size: int
  283. ) -> int:
  284. """
  285. Add all local users to the user directory.
  286. """
  287. def _get_next_batch(txn: LoggingTransaction) -> Optional[List[str]]:
  288. sql = "SELECT user_id FROM %s LIMIT %s" % (
  289. TEMP_TABLE + "_users",
  290. str(batch_size),
  291. )
  292. txn.execute(sql)
  293. user_result = cast(List[Tuple[str]], txn.fetchall())
  294. if not user_result:
  295. return None
  296. users_to_work_on = [x[0] for x in user_result]
  297. # Get how many are left to process, so we can give status on how
  298. # far we are in processing
  299. sql = "SELECT COUNT(*) FROM " + TEMP_TABLE + "_users"
  300. txn.execute(sql)
  301. count_result = txn.fetchone()
  302. assert count_result is not None
  303. progress["remaining"] = count_result[0]
  304. return users_to_work_on
  305. users_to_work_on = await self.db_pool.runInteraction(
  306. "populate_user_directory_temp_read", _get_next_batch
  307. )
  308. # No more users -- complete the transaction.
  309. if not users_to_work_on:
  310. await self.db_pool.updates._end_background_update(
  311. "populate_user_directory_process_users"
  312. )
  313. return 1
  314. logger.debug(
  315. "Processing the next %d users of %d remaining"
  316. % (len(users_to_work_on), progress["remaining"])
  317. )
  318. for user_id in users_to_work_on:
  319. if await self.should_include_local_user_in_dir(user_id):
  320. profile = await self.get_profileinfo(get_localpart_from_id(user_id)) # type: ignore[attr-defined]
  321. await self.update_profile_in_user_dir(
  322. user_id, profile.display_name, profile.avatar_url
  323. )
  324. # We've finished processing a user. Delete it from the table.
  325. await self.db_pool.simple_delete_one(
  326. TEMP_TABLE + "_users", {"user_id": user_id}
  327. )
  328. # Update the remaining counter.
  329. progress["remaining"] -= 1
  330. await self.db_pool.runInteraction(
  331. "populate_user_directory",
  332. self.db_pool.updates._background_update_progress_txn,
  333. "populate_user_directory_process_users",
  334. progress,
  335. )
  336. return len(users_to_work_on)
  337. async def should_include_local_user_in_dir(self, user: str) -> bool:
  338. """Certain classes of local user are omitted from the user directory.
  339. Is this user one of them?
  340. """
  341. # We're opting to exclude the appservice sender (user defined by the
  342. # `sender_localpart` in the appservice registration) even though
  343. # technically it could be DM-able. In the future, this could potentially
  344. # be configurable per-appservice whether the appservice sender can be
  345. # contacted.
  346. if self.get_app_service_by_user_id(user) is not None: # type: ignore[attr-defined]
  347. return False
  348. # We're opting to exclude appservice users (anyone matching the user
  349. # namespace regex in the appservice registration) even though technically
  350. # they could be DM-able. In the future, this could potentially
  351. # be configurable per-appservice whether the appservice users can be
  352. # contacted.
  353. if self.get_if_app_services_interested_in_user(user): # type: ignore[attr-defined]
  354. # TODO we might want to make this configurable for each app service
  355. return False
  356. # Support users are for diagnostics and should not appear in the user directory.
  357. if await self.is_support_user(user): # type: ignore[attr-defined]
  358. return False
  359. # Deactivated users aren't contactable, so should not appear in the user directory.
  360. try:
  361. if await self.get_user_deactivated_status(user): # type: ignore[attr-defined]
  362. return False
  363. except StoreError:
  364. # No such user in the users table. No need to do this when calling
  365. # is_support_user---that returns False if the user is missing.
  366. return False
  367. return True
  368. async def is_room_world_readable_or_publicly_joinable(self, room_id: str) -> bool:
  369. """Check if the room is either world_readable or publically joinable"""
  370. # Create a state filter that only queries join and history state event
  371. types_to_filter = (
  372. (EventTypes.JoinRules, ""),
  373. (EventTypes.RoomHistoryVisibility, ""),
  374. )
  375. current_state_ids = await self.get_filtered_current_state_ids( # type: ignore[attr-defined]
  376. room_id, StateFilter.from_types(types_to_filter)
  377. )
  378. join_rules_id = current_state_ids.get((EventTypes.JoinRules, ""))
  379. if join_rules_id:
  380. join_rule_ev = await self.get_event(join_rules_id, allow_none=True) # type: ignore[attr-defined]
  381. if join_rule_ev:
  382. if join_rule_ev.content.get("join_rule") == JoinRules.PUBLIC:
  383. return True
  384. hist_vis_id = current_state_ids.get((EventTypes.RoomHistoryVisibility, ""))
  385. if hist_vis_id:
  386. hist_vis_ev = await self.get_event(hist_vis_id, allow_none=True) # type: ignore[attr-defined]
  387. if hist_vis_ev:
  388. if (
  389. hist_vis_ev.content.get("history_visibility")
  390. == HistoryVisibility.WORLD_READABLE
  391. ):
  392. return True
  393. return False
  394. async def update_profile_in_user_dir(
  395. self, user_id: str, display_name: Optional[str], avatar_url: Optional[str]
  396. ) -> None:
  397. """
  398. Update or add a user's profile in the user directory.
  399. """
  400. # If the display name or avatar URL are unexpected types, replace with None.
  401. display_name = non_null_str_or_none(display_name)
  402. avatar_url = non_null_str_or_none(avatar_url)
  403. def _update_profile_in_user_dir_txn(txn: LoggingTransaction) -> None:
  404. self.db_pool.simple_upsert_txn(
  405. txn,
  406. table="user_directory",
  407. keyvalues={"user_id": user_id},
  408. values={"display_name": display_name, "avatar_url": avatar_url},
  409. lock=False, # We're only inserter
  410. )
  411. if isinstance(self.database_engine, PostgresEngine):
  412. # We weight the localpart most highly, then display name and finally
  413. # server name
  414. sql = """
  415. INSERT INTO user_directory_search(user_id, vector)
  416. VALUES (?,
  417. setweight(to_tsvector('simple', ?), 'A')
  418. || setweight(to_tsvector('simple', ?), 'D')
  419. || setweight(to_tsvector('simple', COALESCE(?, '')), 'B')
  420. ) ON CONFLICT (user_id) DO UPDATE SET vector=EXCLUDED.vector
  421. """
  422. txn.execute(
  423. sql,
  424. (
  425. user_id,
  426. get_localpart_from_id(user_id),
  427. get_domain_from_id(user_id),
  428. display_name,
  429. ),
  430. )
  431. elif isinstance(self.database_engine, Sqlite3Engine):
  432. value = "%s %s" % (user_id, display_name) if display_name else user_id
  433. self.db_pool.simple_upsert_txn(
  434. txn,
  435. table="user_directory_search",
  436. keyvalues={"user_id": user_id},
  437. values={"value": value},
  438. lock=False, # We're only inserter
  439. )
  440. else:
  441. # This should be unreachable.
  442. raise Exception("Unrecognized database engine")
  443. txn.call_after(self.get_user_in_directory.invalidate, (user_id,))
  444. await self.db_pool.runInteraction(
  445. "update_profile_in_user_dir", _update_profile_in_user_dir_txn
  446. )
  447. async def add_users_who_share_private_room(
  448. self, room_id: str, user_id_tuples: Iterable[Tuple[str, str]]
  449. ) -> None:
  450. """Insert entries into the users_who_share_private_rooms table. The first
  451. user should be a local user.
  452. Args:
  453. room_id
  454. user_id_tuples: iterable of 2-tuple of user IDs.
  455. """
  456. await self.db_pool.simple_upsert_many(
  457. table="users_who_share_private_rooms",
  458. key_names=["user_id", "other_user_id", "room_id"],
  459. key_values=[
  460. (user_id, other_user_id, room_id)
  461. for user_id, other_user_id in user_id_tuples
  462. ],
  463. value_names=(),
  464. value_values=(),
  465. desc="add_users_who_share_room",
  466. )
  467. async def add_users_in_public_rooms(
  468. self, room_id: str, user_ids: Iterable[str]
  469. ) -> None:
  470. """Insert entries into the users_in_public_rooms table.
  471. Args:
  472. room_id
  473. user_ids
  474. """
  475. await self.db_pool.simple_upsert_many(
  476. table="users_in_public_rooms",
  477. key_names=["user_id", "room_id"],
  478. key_values=[(user_id, room_id) for user_id in user_ids],
  479. value_names=(),
  480. value_values=(),
  481. desc="add_users_in_public_rooms",
  482. )
  483. async def delete_all_from_user_dir(self) -> None:
  484. """Delete the entire user directory"""
  485. def _delete_all_from_user_dir_txn(txn: LoggingTransaction) -> None:
  486. txn.execute("DELETE FROM user_directory")
  487. txn.execute("DELETE FROM user_directory_search")
  488. txn.execute("DELETE FROM users_in_public_rooms")
  489. txn.execute("DELETE FROM users_who_share_private_rooms")
  490. txn.call_after(self.get_user_in_directory.invalidate_all)
  491. await self.db_pool.runInteraction(
  492. "delete_all_from_user_dir", _delete_all_from_user_dir_txn
  493. )
  494. @cached()
  495. async def get_user_in_directory(self, user_id: str) -> Optional[Dict[str, str]]:
  496. return await self.db_pool.simple_select_one(
  497. table="user_directory",
  498. keyvalues={"user_id": user_id},
  499. retcols=("display_name", "avatar_url"),
  500. allow_none=True,
  501. desc="get_user_in_directory",
  502. )
  503. async def update_user_directory_stream_pos(self, stream_id: Optional[int]) -> None:
  504. await self.db_pool.simple_update_one(
  505. table="user_directory_stream_pos",
  506. keyvalues={},
  507. updatevalues={"stream_id": stream_id},
  508. desc="update_user_directory_stream_pos",
  509. )
class SearchResult(TypedDict):
    """Shape of the dict returned by `search_user_dir`."""

    # True when the query matched more rows than the requested limit.
    limited: bool
    # The matching user profiles.
    results: List[UserProfile]
  513. class UserDirectoryStore(UserDirectoryBackgroundUpdateStore):
  514. # How many records do we calculate before sending it to
  515. # add_users_who_share_private_rooms?
  516. SHARE_PRIVATE_WORKING_SET = 500
  517. def __init__(
  518. self,
  519. database: DatabasePool,
  520. db_conn: LoggingDatabaseConnection,
  521. hs: "HomeServer",
  522. ) -> None:
  523. super().__init__(database, db_conn, hs)
  524. self._prefer_local_users_in_search = (
  525. hs.config.userdirectory.user_directory_search_prefer_local_users
  526. )
  527. self._server_name = hs.config.server.server_name
  528. async def remove_from_user_dir(self, user_id: str) -> None:
  529. def _remove_from_user_dir_txn(txn: LoggingTransaction) -> None:
  530. self.db_pool.simple_delete_txn(
  531. txn, table="user_directory", keyvalues={"user_id": user_id}
  532. )
  533. self.db_pool.simple_delete_txn(
  534. txn, table="user_directory_search", keyvalues={"user_id": user_id}
  535. )
  536. self.db_pool.simple_delete_txn(
  537. txn, table="users_in_public_rooms", keyvalues={"user_id": user_id}
  538. )
  539. self.db_pool.simple_delete_txn(
  540. txn,
  541. table="users_who_share_private_rooms",
  542. keyvalues={"user_id": user_id},
  543. )
  544. self.db_pool.simple_delete_txn(
  545. txn,
  546. table="users_who_share_private_rooms",
  547. keyvalues={"other_user_id": user_id},
  548. )
  549. txn.call_after(self.get_user_in_directory.invalidate, (user_id,))
  550. await self.db_pool.runInteraction(
  551. "remove_from_user_dir", _remove_from_user_dir_txn
  552. )
  553. async def get_users_in_dir_due_to_room(self, room_id: str) -> Set[str]:
  554. """Get all user_ids that are in the room directory because they're
  555. in the given room_id
  556. """
  557. user_ids_share_pub = await self.db_pool.simple_select_onecol(
  558. table="users_in_public_rooms",
  559. keyvalues={"room_id": room_id},
  560. retcol="user_id",
  561. desc="get_users_in_dir_due_to_room",
  562. )
  563. user_ids_share_priv = await self.db_pool.simple_select_onecol(
  564. table="users_who_share_private_rooms",
  565. keyvalues={"room_id": room_id},
  566. retcol="other_user_id",
  567. desc="get_users_in_dir_due_to_room",
  568. )
  569. user_ids = set(user_ids_share_pub)
  570. user_ids.update(user_ids_share_priv)
  571. return user_ids
  572. async def remove_user_who_share_room(self, user_id: str, room_id: str) -> None:
  573. """
  574. Deletes entries in the users_who_share_*_rooms table. The first
  575. user should be a local user.
  576. Args:
  577. user_id
  578. room_id
  579. """
  580. def _remove_user_who_share_room_txn(txn: LoggingTransaction) -> None:
  581. self.db_pool.simple_delete_txn(
  582. txn,
  583. table="users_who_share_private_rooms",
  584. keyvalues={"user_id": user_id, "room_id": room_id},
  585. )
  586. self.db_pool.simple_delete_txn(
  587. txn,
  588. table="users_who_share_private_rooms",
  589. keyvalues={"other_user_id": user_id, "room_id": room_id},
  590. )
  591. self.db_pool.simple_delete_txn(
  592. txn,
  593. table="users_in_public_rooms",
  594. keyvalues={"user_id": user_id, "room_id": room_id},
  595. )
  596. await self.db_pool.runInteraction(
  597. "remove_user_who_share_room", _remove_user_who_share_room_txn
  598. )
  599. async def get_user_dir_rooms_user_is_in(self, user_id: str) -> List[str]:
  600. """
  601. Returns the rooms that a user is in.
  602. Args:
  603. user_id(str): Must be a local user
  604. Returns:
  605. list: user_id
  606. """
  607. rows = await self.db_pool.simple_select_onecol(
  608. table="users_who_share_private_rooms",
  609. keyvalues={"user_id": user_id},
  610. retcol="room_id",
  611. desc="get_rooms_user_is_in",
  612. )
  613. pub_rows = await self.db_pool.simple_select_onecol(
  614. table="users_in_public_rooms",
  615. keyvalues={"user_id": user_id},
  616. retcol="room_id",
  617. desc="get_rooms_user_is_in",
  618. )
  619. users = set(pub_rows)
  620. users.update(rows)
  621. return list(users)
  622. async def get_mutual_rooms_for_users(
  623. self, user_id: str, other_user_id: str
  624. ) -> Set[str]:
  625. """
  626. Returns the rooms that a local user shares with another local or remote user.
  627. Args:
  628. user_id: The MXID of a local user
  629. other_user_id: The MXID of the other user
  630. Returns:
  631. A set of room ID's that the users share.
  632. """
  633. def _get_mutual_rooms_for_users_txn(
  634. txn: LoggingTransaction,
  635. ) -> List[Dict[str, str]]:
  636. txn.execute(
  637. """
  638. SELECT p1.room_id
  639. FROM users_in_public_rooms as p1
  640. INNER JOIN users_in_public_rooms as p2
  641. ON p1.room_id = p2.room_id
  642. AND p1.user_id = ?
  643. AND p2.user_id = ?
  644. UNION
  645. SELECT room_id
  646. FROM users_who_share_private_rooms
  647. WHERE
  648. user_id = ?
  649. AND other_user_id = ?
  650. """,
  651. (user_id, other_user_id, user_id, other_user_id),
  652. )
  653. rows = self.db_pool.cursor_to_dict(txn)
  654. return rows
  655. rows = await self.db_pool.runInteraction(
  656. "get_mutual_rooms_for_users", _get_mutual_rooms_for_users_txn
  657. )
  658. return {row["room_id"] for row in rows}
  659. async def get_user_directory_stream_pos(self) -> Optional[int]:
  660. """
  661. Get the stream ID of the user directory stream.
  662. Returns:
  663. The stream token or None if the initial background update hasn't happened yet.
  664. """
  665. return await self.db_pool.simple_select_one_onecol(
  666. table="user_directory_stream_pos",
  667. keyvalues={},
  668. retcol="stream_id",
  669. desc="get_user_directory_stream_pos",
  670. )
  671. async def search_user_dir(
  672. self, user_id: str, search_term: str, limit: int
  673. ) -> SearchResult:
  674. """Searches for users in directory
  675. Returns:
  676. dict of the form::
  677. {
  678. "limited": <bool>, # whether there were more results or not
  679. "results": [ # Ordered by best match first
  680. {
  681. "user_id": <user_id>,
  682. "display_name": <display_name>,
  683. "avatar_url": <avatar_url>
  684. }
  685. ]
  686. }
  687. """
  688. if self.hs.config.userdirectory.user_directory_search_all_users:
  689. join_args = (user_id,)
  690. where_clause = "user_id != ?"
  691. else:
  692. join_args = (user_id,)
  693. where_clause = """
  694. (
  695. EXISTS (select 1 from users_in_public_rooms WHERE user_id = t.user_id)
  696. OR EXISTS (
  697. SELECT 1 FROM users_who_share_private_rooms
  698. WHERE user_id = ? AND other_user_id = t.user_id
  699. )
  700. )
  701. """
  702. # We allow manipulating the ranking algorithm by injecting statements
  703. # based on config options.
  704. additional_ordering_statements = []
  705. ordering_arguments: Tuple[str, ...] = ()
  706. if isinstance(self.database_engine, PostgresEngine):
  707. full_query, exact_query, prefix_query = _parse_query_postgres(search_term)
  708. # If enabled, this config option will rank local users higher than those on
  709. # remote instances.
  710. if self._prefer_local_users_in_search:
  711. # This statement checks whether a given user's user ID contains a server name
  712. # that matches the local server
  713. statement = "* (CASE WHEN user_id LIKE ? THEN 2.0 ELSE 1.0 END)"
  714. additional_ordering_statements.append(statement)
  715. ordering_arguments += ("%:" + self._server_name,)
  716. # We order by rank and then if they have profile info
  717. # The ranking algorithm is hand tweaked for "best" results. Broadly
  718. # the idea is we give a higher weight to exact matches.
  719. # The array of numbers are the weights for the various part of the
  720. # search: (domain, _, display name, localpart)
  721. sql = """
  722. SELECT d.user_id AS user_id, display_name, avatar_url
  723. FROM user_directory_search as t
  724. INNER JOIN user_directory AS d USING (user_id)
  725. WHERE
  726. %(where_clause)s
  727. AND vector @@ to_tsquery('simple', ?)
  728. ORDER BY
  729. (CASE WHEN d.user_id IS NOT NULL THEN 4.0 ELSE 1.0 END)
  730. * (CASE WHEN display_name IS NOT NULL THEN 1.2 ELSE 1.0 END)
  731. * (CASE WHEN avatar_url IS NOT NULL THEN 1.2 ELSE 1.0 END)
  732. * (
  733. 3 * ts_rank_cd(
  734. '{0.1, 0.1, 0.9, 1.0}',
  735. vector,
  736. to_tsquery('simple', ?),
  737. 8
  738. )
  739. + ts_rank_cd(
  740. '{0.1, 0.1, 0.9, 1.0}',
  741. vector,
  742. to_tsquery('simple', ?),
  743. 8
  744. )
  745. )
  746. %(order_case_statements)s
  747. DESC,
  748. display_name IS NULL,
  749. avatar_url IS NULL
  750. LIMIT ?
  751. """ % {
  752. "where_clause": where_clause,
  753. "order_case_statements": " ".join(additional_ordering_statements),
  754. }
  755. args = (
  756. join_args
  757. + (full_query, exact_query, prefix_query)
  758. + ordering_arguments
  759. + (limit + 1,)
  760. )
  761. elif isinstance(self.database_engine, Sqlite3Engine):
  762. search_query = _parse_query_sqlite(search_term)
  763. # If enabled, this config option will rank local users higher than those on
  764. # remote instances.
  765. if self._prefer_local_users_in_search:
  766. # This statement checks whether a given user's user ID contains a server name
  767. # that matches the local server
  768. #
  769. # Note that we need to include a comma at the end for valid SQL
  770. statement = "user_id LIKE ? DESC,"
  771. additional_ordering_statements.append(statement)
  772. ordering_arguments += ("%:" + self._server_name,)
  773. sql = """
  774. SELECT d.user_id AS user_id, display_name, avatar_url
  775. FROM user_directory_search as t
  776. INNER JOIN user_directory AS d USING (user_id)
  777. WHERE
  778. %(where_clause)s
  779. AND value MATCH ?
  780. ORDER BY
  781. rank(matchinfo(user_directory_search)) DESC,
  782. %(order_statements)s
  783. display_name IS NULL,
  784. avatar_url IS NULL
  785. LIMIT ?
  786. """ % {
  787. "where_clause": where_clause,
  788. "order_statements": " ".join(additional_ordering_statements),
  789. }
  790. args = join_args + (search_query,) + ordering_arguments + (limit + 1,)
  791. else:
  792. # This should be unreachable.
  793. raise Exception("Unrecognized database engine")
  794. results = cast(
  795. List[UserProfile],
  796. await self.db_pool.execute(
  797. "search_user_dir", self.db_pool.cursor_to_dict, sql, *args
  798. ),
  799. )
  800. limited = len(results) > limit
  801. return {"limited": limited, "results": results}
  802. def _parse_query_sqlite(search_term: str) -> str:
  803. """Takes a plain unicode string from the user and converts it into a form
  804. that can be passed to database.
  805. We use this so that we can add prefix matching, which isn't something
  806. that is supported by default.
  807. We specifically add both a prefix and non prefix matching term so that
  808. exact matches get ranked higher.
  809. """
  810. # Pull out the individual words, discarding any non-word characters.
  811. results = re.findall(r"([\w\-]+)", search_term, re.UNICODE)
  812. return " & ".join("(%s* OR %s)" % (result, result) for result in results)
  813. def _parse_query_postgres(search_term: str) -> Tuple[str, str, str]:
  814. """Takes a plain unicode string from the user and converts it into a form
  815. that can be passed to database.
  816. We use this so that we can add prefix matching, which isn't something
  817. that is supported by default.
  818. """
  819. # Pull out the individual words, discarding any non-word characters.
  820. results = re.findall(r"([\w\-]+)", search_term, re.UNICODE)
  821. both = " & ".join("(%s:* | %s)" % (result, result) for result in results)
  822. exact = " & ".join("%s" % (result,) for result in results)
  823. prefix = " & ".join("%s:*" % (result,) for result in results)
  824. return both, exact, prefix