user_directory.py 43 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164
  1. # Copyright 2017 Vector Creations Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import logging
  15. import re
  16. import unicodedata
  17. from typing import (
  18. TYPE_CHECKING,
  19. Iterable,
  20. List,
  21. Mapping,
  22. Optional,
  23. Sequence,
  24. Set,
  25. Tuple,
  26. cast,
  27. )
  28. try:
  29. # Figure out if ICU support is available for searching users.
  30. import icu
  31. USE_ICU = True
  32. except ModuleNotFoundError:
  33. USE_ICU = False
  34. from typing_extensions import TypedDict
  35. from synapse.api.errors import StoreError
  36. from synapse.util.stringutils import non_null_str_or_none
  37. if TYPE_CHECKING:
  38. from synapse.server import HomeServer
  39. from synapse.api.constants import EventTypes, HistoryVisibility, JoinRules
  40. from synapse.storage.database import (
  41. DatabasePool,
  42. LoggingDatabaseConnection,
  43. LoggingTransaction,
  44. )
  45. from synapse.storage.databases.main.state import StateFilter
  46. from synapse.storage.databases.main.state_deltas import StateDeltasStore
  47. from synapse.storage.engines import PostgresEngine, Sqlite3Engine
  48. from synapse.types import (
  49. JsonDict,
  50. UserID,
  51. UserProfile,
  52. get_domain_from_id,
  53. get_localpart_from_id,
  54. )
  55. from synapse.util.caches.descriptors import cached
  56. logger = logging.getLogger(__name__)
  57. TEMP_TABLE = "_temp_populate_user_directory"
  58. class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
  59. # How many records do we calculate before sending it to
  60. # add_users_who_share_private_rooms?
  61. SHARE_PRIVATE_WORKING_SET = 500
  62. def __init__(
  63. self,
  64. database: DatabasePool,
  65. db_conn: LoggingDatabaseConnection,
  66. hs: "HomeServer",
  67. ) -> None:
  68. super().__init__(database, db_conn, hs)
  69. self.server_name: str = hs.hostname
  70. self.db_pool.updates.register_background_update_handler(
  71. "populate_user_directory_createtables",
  72. self._populate_user_directory_createtables,
  73. )
  74. self.db_pool.updates.register_background_update_handler(
  75. "populate_user_directory_process_rooms",
  76. self._populate_user_directory_process_rooms,
  77. )
  78. self.db_pool.updates.register_background_update_handler(
  79. "populate_user_directory_process_users",
  80. self._populate_user_directory_process_users,
  81. )
  82. self.db_pool.updates.register_background_update_handler(
  83. "populate_user_directory_cleanup", self._populate_user_directory_cleanup
  84. )
  85. async def _populate_user_directory_createtables(
  86. self, progress: JsonDict, batch_size: int
  87. ) -> int:
  88. # Get all the rooms that we want to process.
  89. def _make_staging_area(txn: LoggingTransaction) -> None:
  90. sql = (
  91. "CREATE TABLE IF NOT EXISTS "
  92. + TEMP_TABLE
  93. + "_rooms(room_id TEXT NOT NULL, events BIGINT NOT NULL)"
  94. )
  95. txn.execute(sql)
  96. sql = (
  97. "CREATE TABLE IF NOT EXISTS "
  98. + TEMP_TABLE
  99. + "_position(position TEXT NOT NULL)"
  100. )
  101. txn.execute(sql)
  102. # Get rooms we want to process from the database
  103. sql = """
  104. SELECT room_id, count(*) FROM current_state_events
  105. GROUP BY room_id
  106. """
  107. txn.execute(sql)
  108. rooms = list(txn.fetchall())
  109. self.db_pool.simple_insert_many_txn(
  110. txn, TEMP_TABLE + "_rooms", keys=("room_id", "events"), values=rooms
  111. )
  112. del rooms
  113. sql = (
  114. "CREATE TABLE IF NOT EXISTS "
  115. + TEMP_TABLE
  116. + "_users(user_id TEXT NOT NULL)"
  117. )
  118. txn.execute(sql)
  119. txn.execute("SELECT name FROM users")
  120. users = list(txn.fetchall())
  121. self.db_pool.simple_insert_many_txn(
  122. txn, TEMP_TABLE + "_users", keys=("user_id",), values=users
  123. )
  124. new_pos = await self.get_max_stream_id_in_current_state_deltas()
  125. await self.db_pool.runInteraction(
  126. "populate_user_directory_temp_build", _make_staging_area
  127. )
  128. await self.db_pool.simple_insert(
  129. TEMP_TABLE + "_position", {"position": new_pos}
  130. )
  131. await self.db_pool.updates._end_background_update(
  132. "populate_user_directory_createtables"
  133. )
  134. return 1
  135. async def _populate_user_directory_cleanup(
  136. self,
  137. progress: JsonDict,
  138. batch_size: int,
  139. ) -> int:
  140. """
  141. Update the user directory stream position, then clean up the old tables.
  142. """
  143. position = await self.db_pool.simple_select_one_onecol(
  144. TEMP_TABLE + "_position", {}, "position"
  145. )
  146. await self.update_user_directory_stream_pos(position)
  147. def _delete_staging_area(txn: LoggingTransaction) -> None:
  148. txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_rooms")
  149. txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_users")
  150. txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_position")
  151. await self.db_pool.runInteraction(
  152. "populate_user_directory_cleanup", _delete_staging_area
  153. )
  154. await self.db_pool.updates._end_background_update(
  155. "populate_user_directory_cleanup"
  156. )
  157. return 1
  158. async def _populate_user_directory_process_rooms(
  159. self, progress: JsonDict, batch_size: int
  160. ) -> int:
  161. """
  162. Rescan the state of all rooms so we can track
  163. - who's in a public room;
  164. - which local users share a private room with other users (local
  165. and remote); and
  166. - who should be in the user_directory.
  167. Args:
  168. progress
  169. batch_size: Maximum number of state events to process per cycle.
  170. Returns:
  171. number of events processed.
  172. """
  173. # If we don't have progress filed, delete everything.
  174. if not progress:
  175. await self.delete_all_from_user_dir()
  176. def _get_next_batch(
  177. txn: LoggingTransaction,
  178. ) -> Optional[Sequence[Tuple[str, int]]]:
  179. # Only fetch 250 rooms, so we don't fetch too many at once, even
  180. # if those 250 rooms have less than batch_size state events.
  181. sql = """
  182. SELECT room_id, events FROM %s
  183. ORDER BY events DESC
  184. LIMIT 250
  185. """ % (
  186. TEMP_TABLE + "_rooms",
  187. )
  188. txn.execute(sql)
  189. rooms_to_work_on = cast(List[Tuple[str, int]], txn.fetchall())
  190. if not rooms_to_work_on:
  191. return None
  192. # Get how many are left to process, so we can give status on how
  193. # far we are in processing
  194. txn.execute("SELECT COUNT(*) FROM " + TEMP_TABLE + "_rooms")
  195. result = txn.fetchone()
  196. assert result is not None
  197. progress["remaining"] = result[0]
  198. return rooms_to_work_on
  199. rooms_to_work_on = await self.db_pool.runInteraction(
  200. "populate_user_directory_temp_read", _get_next_batch
  201. )
  202. # No more rooms -- complete the transaction.
  203. if not rooms_to_work_on:
  204. await self.db_pool.updates._end_background_update(
  205. "populate_user_directory_process_rooms"
  206. )
  207. return 1
  208. logger.debug(
  209. "Processing the next %d rooms of %d remaining"
  210. % (len(rooms_to_work_on), progress["remaining"])
  211. )
  212. processed_event_count = 0
  213. for room_id, event_count in rooms_to_work_on:
  214. is_in_room = await self.is_host_joined(room_id, self.server_name) # type: ignore[attr-defined]
  215. if is_in_room:
  216. users_with_profile = await self.get_users_in_room_with_profiles(room_id) # type: ignore[attr-defined]
  217. # Throw away users excluded from the directory.
  218. users_with_profile = {
  219. user_id: profile
  220. for user_id, profile in users_with_profile.items()
  221. if not self.hs.is_mine_id(user_id)
  222. or await self.should_include_local_user_in_dir(user_id)
  223. }
  224. # Upsert a user_directory record for each remote user we see.
  225. for user_id, profile in users_with_profile.items():
  226. # Local users are processed separately in
  227. # `_populate_user_directory_users`; there we can read from
  228. # the `profiles` table to ensure we don't leak their per-room
  229. # profiles. It also means we write local users to this table
  230. # exactly once, rather than once for every room they're in.
  231. if self.hs.is_mine_id(user_id):
  232. continue
  233. # TODO `users_with_profile` above reads from the `user_directory`
  234. # table, meaning that `profile` is bespoke to this room.
  235. # and this leaks remote users' per-room profiles to the user directory.
  236. await self.update_profile_in_user_dir(
  237. user_id, profile.display_name, profile.avatar_url
  238. )
  239. # Now update the room sharing tables to include this room.
  240. is_public = await self.is_room_world_readable_or_publicly_joinable(
  241. room_id
  242. )
  243. if is_public:
  244. if users_with_profile:
  245. await self.add_users_in_public_rooms(
  246. room_id, users_with_profile.keys()
  247. )
  248. else:
  249. to_insert = set()
  250. for user_id in users_with_profile:
  251. # We want the set of pairs (L, M) where L and M are
  252. # in `users_with_profile` and L is local.
  253. # Do so by looking for the local user L first.
  254. if not self.hs.is_mine_id(user_id):
  255. continue
  256. for other_user_id in users_with_profile:
  257. if user_id == other_user_id:
  258. continue
  259. user_set = (user_id, other_user_id)
  260. to_insert.add(user_set)
  261. # If it gets too big, stop and write to the database
  262. # to prevent storing too much in RAM.
  263. if len(to_insert) >= self.SHARE_PRIVATE_WORKING_SET:
  264. await self.add_users_who_share_private_room(
  265. room_id, to_insert
  266. )
  267. to_insert.clear()
  268. if to_insert:
  269. await self.add_users_who_share_private_room(room_id, to_insert)
  270. to_insert.clear()
  271. # We've finished a room. Delete it from the table.
  272. await self.db_pool.simple_delete_one(
  273. TEMP_TABLE + "_rooms", {"room_id": room_id}
  274. )
  275. # Update the remaining counter.
  276. progress["remaining"] -= 1
  277. await self.db_pool.runInteraction(
  278. "populate_user_directory",
  279. self.db_pool.updates._background_update_progress_txn,
  280. "populate_user_directory_process_rooms",
  281. progress,
  282. )
  283. processed_event_count += event_count
  284. if processed_event_count > batch_size:
  285. # Don't process any more rooms, we've hit our batch size.
  286. return processed_event_count
  287. return processed_event_count
  288. async def _populate_user_directory_process_users(
  289. self, progress: JsonDict, batch_size: int
  290. ) -> int:
  291. """
  292. Add all local users to the user directory.
  293. """
  294. def _get_next_batch(txn: LoggingTransaction) -> Optional[List[str]]:
  295. sql = "SELECT user_id FROM %s LIMIT %s" % (
  296. TEMP_TABLE + "_users",
  297. str(batch_size),
  298. )
  299. txn.execute(sql)
  300. user_result = cast(List[Tuple[str]], txn.fetchall())
  301. if not user_result:
  302. return None
  303. users_to_work_on = [x[0] for x in user_result]
  304. # Get how many are left to process, so we can give status on how
  305. # far we are in processing
  306. sql = "SELECT COUNT(*) FROM " + TEMP_TABLE + "_users"
  307. txn.execute(sql)
  308. count_result = txn.fetchone()
  309. assert count_result is not None
  310. progress["remaining"] = count_result[0]
  311. return users_to_work_on
  312. users_to_work_on = await self.db_pool.runInteraction(
  313. "populate_user_directory_temp_read", _get_next_batch
  314. )
  315. # No more users -- complete the transaction.
  316. if not users_to_work_on:
  317. await self.db_pool.updates._end_background_update(
  318. "populate_user_directory_process_users"
  319. )
  320. return 1
  321. logger.debug(
  322. "Processing the next %d users of %d remaining"
  323. % (len(users_to_work_on), progress["remaining"])
  324. )
  325. for user_id in users_to_work_on:
  326. if await self.should_include_local_user_in_dir(user_id):
  327. profile = await self.get_profileinfo(get_localpart_from_id(user_id)) # type: ignore[attr-defined]
  328. await self.update_profile_in_user_dir(
  329. user_id, profile.display_name, profile.avatar_url
  330. )
  331. # We've finished processing a user. Delete it from the table.
  332. await self.db_pool.simple_delete_one(
  333. TEMP_TABLE + "_users", {"user_id": user_id}
  334. )
  335. # Update the remaining counter.
  336. progress["remaining"] -= 1
  337. await self.db_pool.runInteraction(
  338. "populate_user_directory",
  339. self.db_pool.updates._background_update_progress_txn,
  340. "populate_user_directory_process_users",
  341. progress,
  342. )
  343. return len(users_to_work_on)
  344. async def should_include_local_user_in_dir(self, user: str) -> bool:
  345. """Certain classes of local user are omitted from the user directory.
  346. Is this user one of them?
  347. """
  348. # We're opting to exclude the appservice sender (user defined by the
  349. # `sender_localpart` in the appservice registration) even though
  350. # technically it could be DM-able. In the future, this could potentially
  351. # be configurable per-appservice whether the appservice sender can be
  352. # contacted.
  353. if self.get_app_service_by_user_id(user) is not None: # type: ignore[attr-defined]
  354. return False
  355. # We're opting to exclude appservice users (anyone matching the user
  356. # namespace regex in the appservice registration) even though technically
  357. # they could be DM-able. In the future, this could potentially
  358. # be configurable per-appservice whether the appservice users can be
  359. # contacted.
  360. if self.get_if_app_services_interested_in_user(user): # type: ignore[attr-defined]
  361. # TODO we might want to make this configurable for each app service
  362. return False
  363. # Support users are for diagnostics and should not appear in the user directory.
  364. if await self.is_support_user(user): # type: ignore[attr-defined]
  365. return False
  366. # Deactivated users aren't contactable, so should not appear in the user directory.
  367. try:
  368. if await self.get_user_deactivated_status(user): # type: ignore[attr-defined]
  369. return False
  370. except StoreError:
  371. # No such user in the users table. No need to do this when calling
  372. # is_support_user---that returns False if the user is missing.
  373. return False
  374. return True
  375. async def is_room_world_readable_or_publicly_joinable(self, room_id: str) -> bool:
  376. """Check if the room is either world_readable or publically joinable"""
  377. # Create a state filter that only queries join and history state event
  378. types_to_filter = (
  379. (EventTypes.JoinRules, ""),
  380. (EventTypes.RoomHistoryVisibility, ""),
  381. )
  382. # Getting the partial state is fine, as we're not looking at membership
  383. # events.
  384. current_state_ids = await self.get_partial_filtered_current_state_ids( # type: ignore[attr-defined]
  385. room_id, StateFilter.from_types(types_to_filter)
  386. )
  387. join_rules_id = current_state_ids.get((EventTypes.JoinRules, ""))
  388. if join_rules_id:
  389. join_rule_ev = await self.get_event(join_rules_id, allow_none=True) # type: ignore[attr-defined]
  390. if join_rule_ev:
  391. if join_rule_ev.content.get("join_rule") == JoinRules.PUBLIC:
  392. return True
  393. hist_vis_id = current_state_ids.get((EventTypes.RoomHistoryVisibility, ""))
  394. if hist_vis_id:
  395. hist_vis_ev = await self.get_event(hist_vis_id, allow_none=True) # type: ignore[attr-defined]
  396. if hist_vis_ev:
  397. if (
  398. hist_vis_ev.content.get("history_visibility")
  399. == HistoryVisibility.WORLD_READABLE
  400. ):
  401. return True
  402. return False
  403. async def set_remote_user_profile_in_user_dir_stale(
  404. self, user_id: str, next_try_at_ms: int, retry_counter: int
  405. ) -> None:
  406. """
  407. Marks a remote user as having a possibly-stale user directory profile.
  408. Args:
  409. user_id: the remote user who may have a stale profile on this server.
  410. next_try_at_ms: timestamp in ms after which the user directory profile can be
  411. refreshed.
  412. retry_counter: number of failures in refreshing the profile so far. Used for
  413. exponential backoff calculations.
  414. """
  415. assert not self.hs.is_mine_id(
  416. user_id
  417. ), "Can't mark a local user as a stale remote user."
  418. server_name = UserID.from_string(user_id).domain
  419. await self.db_pool.simple_upsert(
  420. table="user_directory_stale_remote_users",
  421. keyvalues={"user_id": user_id},
  422. values={
  423. "next_try_at_ts": next_try_at_ms,
  424. "retry_counter": retry_counter,
  425. "user_server_name": server_name,
  426. },
  427. desc="set_remote_user_profile_in_user_dir_stale",
  428. )
  429. async def clear_remote_user_profile_in_user_dir_stale(self, user_id: str) -> None:
  430. """
  431. Marks a remote user as no longer having a possibly-stale user directory profile.
  432. Args:
  433. user_id: the remote user who no longer has a stale profile on this server.
  434. """
  435. await self.db_pool.simple_delete(
  436. table="user_directory_stale_remote_users",
  437. keyvalues={"user_id": user_id},
  438. desc="clear_remote_user_profile_in_user_dir_stale",
  439. )
  440. async def get_remote_servers_with_profiles_to_refresh(
  441. self, now_ts: int, limit: int
  442. ) -> List[str]:
  443. """
  444. Get a list of up to `limit` server names which have users whose
  445. locally-cached profiles we believe to be stale
  446. and are refreshable given the current time `now_ts` in milliseconds.
  447. """
  448. def _get_remote_servers_with_refreshable_profiles_txn(
  449. txn: LoggingTransaction,
  450. ) -> List[str]:
  451. sql = """
  452. SELECT user_server_name
  453. FROM user_directory_stale_remote_users
  454. WHERE next_try_at_ts < ?
  455. GROUP BY user_server_name
  456. ORDER BY MIN(next_try_at_ts), user_server_name
  457. LIMIT ?
  458. """
  459. txn.execute(sql, (now_ts, limit))
  460. return [row[0] for row in txn]
  461. return await self.db_pool.runInteraction(
  462. "get_remote_servers_with_profiles_to_refresh",
  463. _get_remote_servers_with_refreshable_profiles_txn,
  464. )
  465. async def get_remote_users_to_refresh_on_server(
  466. self, server_name: str, now_ts: int, limit: int
  467. ) -> List[Tuple[str, int, int]]:
  468. """
  469. Get a list of up to `limit` user IDs from the server `server_name`
  470. whose locally-cached profiles we believe to be stale
  471. and are refreshable given the current time `now_ts` in milliseconds.
  472. Returns:
  473. tuple of:
  474. - User ID
  475. - Retry counter (number of failures so far)
  476. - Time the retry is scheduled for, in milliseconds
  477. """
  478. def _get_remote_users_to_refresh_on_server_txn(
  479. txn: LoggingTransaction,
  480. ) -> List[Tuple[str, int, int]]:
  481. sql = """
  482. SELECT user_id, retry_counter, next_try_at_ts
  483. FROM user_directory_stale_remote_users
  484. WHERE user_server_name = ? AND next_try_at_ts < ?
  485. ORDER BY next_try_at_ts
  486. LIMIT ?
  487. """
  488. txn.execute(sql, (server_name, now_ts, limit))
  489. return cast(List[Tuple[str, int, int]], txn.fetchall())
  490. return await self.db_pool.runInteraction(
  491. "get_remote_users_to_refresh_on_server",
  492. _get_remote_users_to_refresh_on_server_txn,
  493. )
  494. async def update_profile_in_user_dir(
  495. self, user_id: str, display_name: Optional[str], avatar_url: Optional[str]
  496. ) -> None:
  497. """
  498. Update or add a user's profile in the user directory.
  499. If the user is remote, the profile will be marked as not stale.
  500. """
  501. # If the display name or avatar URL are unexpected types, replace with None.
  502. display_name = non_null_str_or_none(display_name)
  503. avatar_url = non_null_str_or_none(avatar_url)
  504. def _update_profile_in_user_dir_txn(txn: LoggingTransaction) -> None:
  505. self.db_pool.simple_upsert_txn(
  506. txn,
  507. table="user_directory",
  508. keyvalues={"user_id": user_id},
  509. values={"display_name": display_name, "avatar_url": avatar_url},
  510. )
  511. if not self.hs.is_mine_id(user_id):
  512. # Remote users: Make sure the profile is not marked as stale anymore.
  513. self.db_pool.simple_delete_txn(
  514. txn,
  515. table="user_directory_stale_remote_users",
  516. keyvalues={"user_id": user_id},
  517. )
  518. # The display name that goes into the database index.
  519. index_display_name = display_name
  520. if index_display_name is not None:
  521. index_display_name = _filter_text_for_index(index_display_name)
  522. if isinstance(self.database_engine, PostgresEngine):
  523. # We weight the localpart most highly, then display name and finally
  524. # server name
  525. sql = """
  526. INSERT INTO user_directory_search(user_id, vector)
  527. VALUES (?,
  528. setweight(to_tsvector('simple', ?), 'A')
  529. || setweight(to_tsvector('simple', ?), 'D')
  530. || setweight(to_tsvector('simple', COALESCE(?, '')), 'B')
  531. ) ON CONFLICT (user_id) DO UPDATE SET vector=EXCLUDED.vector
  532. """
  533. txn.execute(
  534. sql,
  535. (
  536. user_id,
  537. get_localpart_from_id(user_id),
  538. get_domain_from_id(user_id),
  539. index_display_name,
  540. ),
  541. )
  542. elif isinstance(self.database_engine, Sqlite3Engine):
  543. value = (
  544. "%s %s" % (user_id, index_display_name)
  545. if index_display_name
  546. else user_id
  547. )
  548. self.db_pool.simple_upsert_txn(
  549. txn,
  550. table="user_directory_search",
  551. keyvalues={"user_id": user_id},
  552. values={"value": value},
  553. )
  554. else:
  555. # This should be unreachable.
  556. raise Exception("Unrecognized database engine")
  557. txn.call_after(self.get_user_in_directory.invalidate, (user_id,))
  558. await self.db_pool.runInteraction(
  559. "update_profile_in_user_dir", _update_profile_in_user_dir_txn
  560. )
  561. async def add_users_who_share_private_room(
  562. self, room_id: str, user_id_tuples: Iterable[Tuple[str, str]]
  563. ) -> None:
  564. """Insert entries into the users_who_share_private_rooms table. The first
  565. user should be a local user.
  566. Args:
  567. room_id
  568. user_id_tuples: iterable of 2-tuple of user IDs.
  569. """
  570. await self.db_pool.simple_upsert_many(
  571. table="users_who_share_private_rooms",
  572. key_names=["user_id", "other_user_id", "room_id"],
  573. key_values=[
  574. (user_id, other_user_id, room_id)
  575. for user_id, other_user_id in user_id_tuples
  576. ],
  577. value_names=(),
  578. value_values=(),
  579. desc="add_users_who_share_room",
  580. )
  581. async def add_users_in_public_rooms(
  582. self, room_id: str, user_ids: Iterable[str]
  583. ) -> None:
  584. """Insert entries into the users_in_public_rooms table.
  585. Args:
  586. room_id
  587. user_ids
  588. """
  589. await self.db_pool.simple_upsert_many(
  590. table="users_in_public_rooms",
  591. key_names=["user_id", "room_id"],
  592. key_values=[(user_id, room_id) for user_id in user_ids],
  593. value_names=(),
  594. value_values=(),
  595. desc="add_users_in_public_rooms",
  596. )
  597. async def delete_all_from_user_dir(self) -> None:
  598. """Delete the entire user directory"""
  599. def _delete_all_from_user_dir_txn(txn: LoggingTransaction) -> None:
  600. # SQLite doesn't support TRUNCATE.
  601. # On Postgres, DELETE FROM does a table scan but TRUNCATE is more efficient.
  602. truncate = (
  603. "DELETE FROM"
  604. if isinstance(self.database_engine, Sqlite3Engine)
  605. else "TRUNCATE"
  606. )
  607. txn.execute(f"{truncate} user_directory")
  608. txn.execute(f"{truncate} user_directory_search")
  609. txn.execute(f"{truncate} users_in_public_rooms")
  610. txn.execute(f"{truncate} users_who_share_private_rooms")
  611. txn.call_after(self.get_user_in_directory.invalidate_all)
  612. await self.db_pool.runInteraction(
  613. "delete_all_from_user_dir", _delete_all_from_user_dir_txn
  614. )
  615. @cached()
  616. async def get_user_in_directory(self, user_id: str) -> Optional[Mapping[str, str]]:
  617. return await self.db_pool.simple_select_one(
  618. table="user_directory",
  619. keyvalues={"user_id": user_id},
  620. retcols=("display_name", "avatar_url"),
  621. allow_none=True,
  622. desc="get_user_in_directory",
  623. )
  624. async def update_user_directory_stream_pos(self, stream_id: Optional[int]) -> None:
  625. await self.db_pool.simple_update_one(
  626. table="user_directory_stream_pos",
  627. keyvalues={},
  628. updatevalues={"stream_id": stream_id},
  629. desc="update_user_directory_stream_pos",
  630. )
  631. class SearchResult(TypedDict):
  632. limited: bool
  633. results: List[UserProfile]
  634. class UserDirectoryStore(UserDirectoryBackgroundUpdateStore):
  635. # How many records do we calculate before sending it to
  636. # add_users_who_share_private_rooms?
  637. SHARE_PRIVATE_WORKING_SET = 500
  638. def __init__(
  639. self,
  640. database: DatabasePool,
  641. db_conn: LoggingDatabaseConnection,
  642. hs: "HomeServer",
  643. ) -> None:
  644. super().__init__(database, db_conn, hs)
  645. self._prefer_local_users_in_search = (
  646. hs.config.userdirectory.user_directory_search_prefer_local_users
  647. )
  648. self._server_name = hs.config.server.server_name
  649. async def remove_from_user_dir(self, user_id: str) -> None:
  650. def _remove_from_user_dir_txn(txn: LoggingTransaction) -> None:
  651. self.db_pool.simple_delete_txn(
  652. txn, table="user_directory", keyvalues={"user_id": user_id}
  653. )
  654. self.db_pool.simple_delete_txn(
  655. txn, table="user_directory_search", keyvalues={"user_id": user_id}
  656. )
  657. self.db_pool.simple_delete_txn(
  658. txn, table="users_in_public_rooms", keyvalues={"user_id": user_id}
  659. )
  660. self.db_pool.simple_delete_txn(
  661. txn,
  662. table="users_who_share_private_rooms",
  663. keyvalues={"user_id": user_id},
  664. )
  665. self.db_pool.simple_delete_txn(
  666. txn,
  667. table="users_who_share_private_rooms",
  668. keyvalues={"other_user_id": user_id},
  669. )
  670. txn.call_after(self.get_user_in_directory.invalidate, (user_id,))
  671. await self.db_pool.runInteraction(
  672. "remove_from_user_dir", _remove_from_user_dir_txn
  673. )
  674. async def get_users_in_dir_due_to_room(self, room_id: str) -> Set[str]:
  675. """Get all user_ids that are in the room directory because they're
  676. in the given room_id
  677. """
  678. user_ids_share_pub = await self.db_pool.simple_select_onecol(
  679. table="users_in_public_rooms",
  680. keyvalues={"room_id": room_id},
  681. retcol="user_id",
  682. desc="get_users_in_dir_due_to_room",
  683. )
  684. user_ids_share_priv = await self.db_pool.simple_select_onecol(
  685. table="users_who_share_private_rooms",
  686. keyvalues={"room_id": room_id},
  687. retcol="other_user_id",
  688. desc="get_users_in_dir_due_to_room",
  689. )
  690. user_ids = set(user_ids_share_pub)
  691. user_ids.update(user_ids_share_priv)
  692. return user_ids
  693. async def remove_user_who_share_room(self, user_id: str, room_id: str) -> None:
  694. """
  695. Deletes entries in the users_who_share_*_rooms table. The first
  696. user should be a local user.
  697. Args:
  698. user_id
  699. room_id
  700. """
  701. def _remove_user_who_share_room_txn(txn: LoggingTransaction) -> None:
  702. self.db_pool.simple_delete_txn(
  703. txn,
  704. table="users_who_share_private_rooms",
  705. keyvalues={"user_id": user_id, "room_id": room_id},
  706. )
  707. self.db_pool.simple_delete_txn(
  708. txn,
  709. table="users_who_share_private_rooms",
  710. keyvalues={"other_user_id": user_id, "room_id": room_id},
  711. )
  712. self.db_pool.simple_delete_txn(
  713. txn,
  714. table="users_in_public_rooms",
  715. keyvalues={"user_id": user_id, "room_id": room_id},
  716. )
  717. await self.db_pool.runInteraction(
  718. "remove_user_who_share_room", _remove_user_who_share_room_txn
  719. )
  720. async def get_user_dir_rooms_user_is_in(self, user_id: str) -> List[str]:
  721. """
  722. Returns the rooms that a user is in.
  723. Args:
  724. user_id: Must be a local user
  725. Returns:
  726. List of room IDs
  727. """
  728. rows = await self.db_pool.simple_select_onecol(
  729. table="users_who_share_private_rooms",
  730. keyvalues={"user_id": user_id},
  731. retcol="room_id",
  732. desc="get_rooms_user_is_in",
  733. )
  734. pub_rows = await self.db_pool.simple_select_onecol(
  735. table="users_in_public_rooms",
  736. keyvalues={"user_id": user_id},
  737. retcol="room_id",
  738. desc="get_rooms_user_is_in",
  739. )
  740. users = set(pub_rows)
  741. users.update(rows)
  742. return list(users)
  743. async def get_user_directory_stream_pos(self) -> Optional[int]:
  744. """
  745. Get the stream ID of the user directory stream.
  746. Returns:
  747. The stream token or None if the initial background update hasn't happened yet.
  748. """
  749. return await self.db_pool.simple_select_one_onecol(
  750. table="user_directory_stream_pos",
  751. keyvalues={},
  752. retcol="stream_id",
  753. desc="get_user_directory_stream_pos",
  754. )
  755. async def search_user_dir(
  756. self, user_id: str, search_term: str, limit: int
  757. ) -> SearchResult:
  758. """Searches for users in directory
  759. Returns:
  760. dict of the form::
  761. {
  762. "limited": <bool>, # whether there were more results or not
  763. "results": [ # Ordered by best match first
  764. {
  765. "user_id": <user_id>,
  766. "display_name": <display_name>,
  767. "avatar_url": <avatar_url>
  768. }
  769. ]
  770. }
  771. """
  772. if self.hs.config.userdirectory.user_directory_search_all_users:
  773. join_args = (user_id,)
  774. where_clause = "user_id != ?"
  775. else:
  776. join_args = (user_id,)
  777. where_clause = """
  778. (
  779. EXISTS (select 1 from users_in_public_rooms WHERE user_id = t.user_id)
  780. OR EXISTS (
  781. SELECT 1 FROM users_who_share_private_rooms
  782. WHERE user_id = ? AND other_user_id = t.user_id
  783. )
  784. )
  785. """
  786. # We allow manipulating the ranking algorithm by injecting statements
  787. # based on config options.
  788. additional_ordering_statements = []
  789. ordering_arguments: Tuple[str, ...] = ()
  790. if isinstance(self.database_engine, PostgresEngine):
  791. full_query, exact_query, prefix_query = _parse_query_postgres(search_term)
  792. # If enabled, this config option will rank local users higher than those on
  793. # remote instances.
  794. if self._prefer_local_users_in_search:
  795. # This statement checks whether a given user's user ID contains a server name
  796. # that matches the local server
  797. statement = "* (CASE WHEN user_id LIKE ? THEN 2.0 ELSE 1.0 END)"
  798. additional_ordering_statements.append(statement)
  799. ordering_arguments += ("%:" + self._server_name,)
  800. # We order by rank and then if they have profile info
  801. # The ranking algorithm is hand tweaked for "best" results. Broadly
  802. # the idea is we give a higher weight to exact matches.
  803. # The array of numbers are the weights for the various part of the
  804. # search: (domain, _, display name, localpart)
  805. sql = """
  806. SELECT d.user_id AS user_id, display_name, avatar_url
  807. FROM user_directory_search as t
  808. INNER JOIN user_directory AS d USING (user_id)
  809. WHERE
  810. %(where_clause)s
  811. AND vector @@ to_tsquery('simple', ?)
  812. ORDER BY
  813. (CASE WHEN d.user_id IS NOT NULL THEN 4.0 ELSE 1.0 END)
  814. * (CASE WHEN display_name IS NOT NULL THEN 1.2 ELSE 1.0 END)
  815. * (CASE WHEN avatar_url IS NOT NULL THEN 1.2 ELSE 1.0 END)
  816. * (
  817. 3 * ts_rank_cd(
  818. '{0.1, 0.1, 0.9, 1.0}',
  819. vector,
  820. to_tsquery('simple', ?),
  821. 8
  822. )
  823. + ts_rank_cd(
  824. '{0.1, 0.1, 0.9, 1.0}',
  825. vector,
  826. to_tsquery('simple', ?),
  827. 8
  828. )
  829. )
  830. %(order_case_statements)s
  831. DESC,
  832. display_name IS NULL,
  833. avatar_url IS NULL
  834. LIMIT ?
  835. """ % {
  836. "where_clause": where_clause,
  837. "order_case_statements": " ".join(additional_ordering_statements),
  838. }
  839. args = (
  840. join_args
  841. + (full_query, exact_query, prefix_query)
  842. + ordering_arguments
  843. + (limit + 1,)
  844. )
  845. elif isinstance(self.database_engine, Sqlite3Engine):
  846. search_query = _parse_query_sqlite(search_term)
  847. # If enabled, this config option will rank local users higher than those on
  848. # remote instances.
  849. if self._prefer_local_users_in_search:
  850. # This statement checks whether a given user's user ID contains a server name
  851. # that matches the local server
  852. #
  853. # Note that we need to include a comma at the end for valid SQL
  854. statement = "user_id LIKE ? DESC,"
  855. additional_ordering_statements.append(statement)
  856. ordering_arguments += ("%:" + self._server_name,)
  857. sql = """
  858. SELECT d.user_id AS user_id, display_name, avatar_url
  859. FROM user_directory_search as t
  860. INNER JOIN user_directory AS d USING (user_id)
  861. WHERE
  862. %(where_clause)s
  863. AND value MATCH ?
  864. ORDER BY
  865. rank(matchinfo(user_directory_search)) DESC,
  866. %(order_statements)s
  867. display_name IS NULL,
  868. avatar_url IS NULL
  869. LIMIT ?
  870. """ % {
  871. "where_clause": where_clause,
  872. "order_statements": " ".join(additional_ordering_statements),
  873. }
  874. args = join_args + (search_query,) + ordering_arguments + (limit + 1,)
  875. else:
  876. # This should be unreachable.
  877. raise Exception("Unrecognized database engine")
  878. results = cast(
  879. List[UserProfile],
  880. await self.db_pool.execute(
  881. "search_user_dir", self.db_pool.cursor_to_dict, sql, *args
  882. ),
  883. )
  884. limited = len(results) > limit
  885. return {"limited": limited, "results": results[0:limit]}
  886. def _filter_text_for_index(text: str) -> str:
  887. """Transforms text before it is inserted into the user directory index, or searched
  888. for in the user directory index.
  889. Note that the user directory search table needs to be rebuilt whenever this function
  890. changes.
  891. """
  892. # Lowercase the text, to make searches case-insensitive.
  893. # This is necessary for both PostgreSQL and SQLite. PostgreSQL's
  894. # `to_tsquery/to_tsvector` functions don't lowercase non-ASCII characters when using
  895. # the "C" collation, while SQLite just doesn't lowercase non-ASCII characters at
  896. # all.
  897. text = text.lower()
  898. # Normalize the text. NFKC normalization has two effects:
  899. # 1. It canonicalizes the text, ie. maps all visually identical strings to the same
  900. # string. For example, ["e", "◌́"] is mapped to ["é"].
  901. # 2. It maps strings that are roughly equivalent to the same string.
  902. # For example, ["dž"] is mapped to ["d", "ž"], ["①"] to ["1"] and ["i⁹"] to
  903. # ["i", "9"].
  904. text = unicodedata.normalize("NFKC", text)
  905. # Note that nothing is done to make searches accent-insensitive.
  906. # That could be achieved by converting to NFKD form instead (with combining accents
  907. # split out) and filtering out combining accents using `unicodedata.combining(c)`.
  908. # The downside of this may be noisier search results, since search terms with
  909. # explicit accents will match characters with no accents, or completely different
  910. # accents.
  911. #
  912. # text = unicodedata.normalize("NFKD", text)
  913. # text = "".join([c for c in text if not unicodedata.combining(c)])
  914. return text
  915. def _parse_query_sqlite(search_term: str) -> str:
  916. """Takes a plain unicode string from the user and converts it into a form
  917. that can be passed to database.
  918. We use this so that we can add prefix matching, which isn't something
  919. that is supported by default.
  920. We specifically add both a prefix and non prefix matching term so that
  921. exact matches get ranked higher.
  922. """
  923. search_term = _filter_text_for_index(search_term)
  924. # Pull out the individual words, discarding any non-word characters.
  925. results = _parse_words(search_term)
  926. return " & ".join("(%s* OR %s)" % (result, result) for result in results)
  927. def _parse_query_postgres(search_term: str) -> Tuple[str, str, str]:
  928. """Takes a plain unicode string from the user and converts it into a form
  929. that can be passed to database.
  930. We use this so that we can add prefix matching, which isn't something
  931. that is supported by default.
  932. """
  933. search_term = _filter_text_for_index(search_term)
  934. escaped_words = []
  935. for word in _parse_words(search_term):
  936. # Postgres tsvector and tsquery quoting rules:
  937. # words potentially containing punctuation should be quoted
  938. # and then existing quotes and backslashes should be doubled
  939. # See: https://www.postgresql.org/docs/current/datatype-textsearch.html#DATATYPE-TSQUERY
  940. quoted_word = word.replace("'", "''").replace("\\", "\\\\")
  941. escaped_words.append(f"'{quoted_word}'")
  942. both = " & ".join("(%s:* | %s)" % (word, word) for word in escaped_words)
  943. exact = " & ".join("%s" % (word,) for word in escaped_words)
  944. prefix = " & ".join("%s:*" % (word,) for word in escaped_words)
  945. return both, exact, prefix
  946. def _parse_words(search_term: str) -> List[str]:
  947. """Split the provided search string into a list of its words.
  948. If support for ICU (International Components for Unicode) is available, use it.
  949. Otherwise, fall back to using a regex to detect word boundaries. This latter
  950. solution works well enough for most latin-based languages, but doesn't work as well
  951. with other languages.
  952. Args:
  953. search_term: The search string.
  954. Returns:
  955. A list of the words in the search string.
  956. """
  957. if USE_ICU:
  958. return _parse_words_with_icu(search_term)
  959. return _parse_words_with_regex(search_term)
  960. def _parse_words_with_regex(search_term: str) -> List[str]:
  961. """
  962. Break down search term into words, when we don't have ICU available.
  963. See: `_parse_words`
  964. """
  965. return re.findall(r"([\w\-]+)", search_term, re.UNICODE)
  966. def _parse_words_with_icu(search_term: str) -> List[str]:
  967. """Break down the provided search string into its individual words using ICU
  968. (International Components for Unicode).
  969. Args:
  970. search_term: The search string.
  971. Returns:
  972. A list of the words in the search string.
  973. """
  974. results = []
  975. breaker = icu.BreakIterator.createWordInstance(icu.Locale.getDefault())
  976. breaker.setText(search_term)
  977. i = 0
  978. while True:
  979. j = breaker.nextBoundary()
  980. if j < 0:
  981. break
  982. result = search_term[i:j]
  983. # libicu considers spaces and punctuation between words as words, but we don't
  984. # want to include those in results as they would result in syntax errors in SQL
  985. # queries (e.g. "foo bar" would result in the search query including "foo & &
  986. # bar").
  987. if len(re.findall(r"([\w\-]+)", result, re.UNICODE)):
  988. results.append(result)
  989. i = j
  990. return results