# search.py
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2015, 2016 OpenMarket Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. import itertools
  16. import logging
  17. from unpaddedbase64 import decode_base64, encode_base64
  18. from twisted.internet import defer
  19. from synapse.api.constants import EventTypes, Membership
  20. from synapse.api.errors import SynapseError
  21. from synapse.api.filtering import Filter
  22. from synapse.events.utils import serialize_event
  23. from synapse.storage.state import StateFilter
  24. from synapse.visibility import filter_events_for_client
  25. from ._base import BaseHandler
  26. logger = logging.getLogger(__name__)
class SearchHandler(BaseHandler):
    """Handler for performing full text searches over room events on
    behalf of a user (see the `search` method below).
    """

    def __init__(self, hs):
        super(SearchHandler, self).__init__(hs)
  30. @defer.inlineCallbacks
  31. def get_old_rooms_from_upgraded_room(self, room_id):
  32. """Retrieves room IDs of old rooms in the history of an upgraded room.
  33. We do so by checking the m.room.create event of the room for a
  34. `predecessor` key. If it exists, we add the room ID to our return
  35. list and then check that room for a m.room.create event and so on
  36. until we can no longer find any more previous rooms.
  37. The full list of all found rooms in then returned.
  38. Args:
  39. room_id (str): id of the room to search through.
  40. Returns:
  41. Deferred[iterable[unicode]]: predecessor room ids
  42. """
  43. historical_room_ids = []
  44. while True:
  45. predecessor = yield self.store.get_room_predecessor(room_id)
  46. # If no predecessor, assume we've hit a dead end
  47. if not predecessor:
  48. break
  49. # Add predecessor's room ID
  50. historical_room_ids.append(predecessor["room_id"])
  51. # Scan through the old room for further predecessors
  52. room_id = predecessor["room_id"]
  53. defer.returnValue(historical_room_ids)
  54. @defer.inlineCallbacks
  55. def search(self, user, content, batch=None):
  56. """Performs a full text search for a user.
  57. Args:
  58. user (UserID)
  59. content (dict): Search parameters
  60. batch (str): The next_batch parameter. Used for pagination.
  61. Returns:
  62. dict to be returned to the client with results of search
  63. """
  64. if not self.hs.config.enable_search:
  65. raise SynapseError(400, "Search is disabled on this homeserver")
  66. batch_group = None
  67. batch_group_key = None
  68. batch_token = None
  69. if batch:
  70. try:
  71. b = decode_base64(batch).decode('ascii')
  72. batch_group, batch_group_key, batch_token = b.split("\n")
  73. assert batch_group is not None
  74. assert batch_group_key is not None
  75. assert batch_token is not None
  76. except Exception:
  77. raise SynapseError(400, "Invalid batch")
  78. logger.info(
  79. "Search batch properties: %r, %r, %r",
  80. batch_group, batch_group_key, batch_token,
  81. )
  82. logger.info("Search content: %s", content)
  83. try:
  84. room_cat = content["search_categories"]["room_events"]
  85. # The actual thing to query in FTS
  86. search_term = room_cat["search_term"]
  87. # Which "keys" to search over in FTS query
  88. keys = room_cat.get("keys", [
  89. "content.body", "content.name", "content.topic",
  90. ])
  91. # Filter to apply to results
  92. filter_dict = room_cat.get("filter", {})
  93. # What to order results by (impacts whether pagination can be doen)
  94. order_by = room_cat.get("order_by", "rank")
  95. # Return the current state of the rooms?
  96. include_state = room_cat.get("include_state", False)
  97. # Include context around each event?
  98. event_context = room_cat.get(
  99. "event_context", None
  100. )
  101. # Group results together? May allow clients to paginate within a
  102. # group
  103. group_by = room_cat.get("groupings", {}).get("group_by", {})
  104. group_keys = [g["key"] for g in group_by]
  105. if event_context is not None:
  106. before_limit = int(event_context.get(
  107. "before_limit", 5
  108. ))
  109. after_limit = int(event_context.get(
  110. "after_limit", 5
  111. ))
  112. # Return the historic display name and avatar for the senders
  113. # of the events?
  114. include_profile = bool(event_context.get("include_profile", False))
  115. except KeyError:
  116. raise SynapseError(400, "Invalid search query")
  117. if order_by not in ("rank", "recent"):
  118. raise SynapseError(400, "Invalid order by: %r" % (order_by,))
  119. if set(group_keys) - {"room_id", "sender"}:
  120. raise SynapseError(
  121. 400,
  122. "Invalid group by keys: %r" % (set(group_keys) - {"room_id", "sender"},)
  123. )
  124. search_filter = Filter(filter_dict)
  125. # TODO: Search through left rooms too
  126. rooms = yield self.store.get_rooms_for_user_where_membership_is(
  127. user.to_string(),
  128. membership_list=[Membership.JOIN],
  129. # membership_list=[Membership.JOIN, Membership.LEAVE, Membership.Ban],
  130. )
  131. room_ids = set(r.room_id for r in rooms)
  132. # If doing a subset of all rooms seearch, check if any of the rooms
  133. # are from an upgraded room, and search their contents as well
  134. if search_filter.rooms:
  135. historical_room_ids = []
  136. for room_id in search_filter.rooms:
  137. # Add any previous rooms to the search if they exist
  138. ids = yield self.get_old_rooms_from_upgraded_room(room_id)
  139. historical_room_ids += ids
  140. # Prevent any historical events from being filtered
  141. search_filter = search_filter.with_room_ids(historical_room_ids)
  142. room_ids = search_filter.filter_rooms(room_ids)
  143. if batch_group == "room_id":
  144. room_ids.intersection_update({batch_group_key})
  145. if not room_ids:
  146. defer.returnValue({
  147. "search_categories": {
  148. "room_events": {
  149. "results": [],
  150. "count": 0,
  151. "highlights": [],
  152. }
  153. }
  154. })
  155. rank_map = {} # event_id -> rank of event
  156. allowed_events = []
  157. room_groups = {} # Holds result of grouping by room, if applicable
  158. sender_group = {} # Holds result of grouping by sender, if applicable
  159. # Holds the next_batch for the entire result set if one of those exists
  160. global_next_batch = None
  161. highlights = set()
  162. count = None
  163. if order_by == "rank":
  164. search_result = yield self.store.search_msgs(
  165. room_ids, search_term, keys
  166. )
  167. count = search_result["count"]
  168. if search_result["highlights"]:
  169. highlights.update(search_result["highlights"])
  170. results = search_result["results"]
  171. results_map = {r["event"].event_id: r for r in results}
  172. rank_map.update({r["event"].event_id: r["rank"] for r in results})
  173. filtered_events = search_filter.filter([r["event"] for r in results])
  174. events = yield filter_events_for_client(
  175. self.store, user.to_string(), filtered_events
  176. )
  177. events.sort(key=lambda e: -rank_map[e.event_id])
  178. allowed_events = events[:search_filter.limit()]
  179. for e in allowed_events:
  180. rm = room_groups.setdefault(e.room_id, {
  181. "results": [],
  182. "order": rank_map[e.event_id],
  183. })
  184. rm["results"].append(e.event_id)
  185. s = sender_group.setdefault(e.sender, {
  186. "results": [],
  187. "order": rank_map[e.event_id],
  188. })
  189. s["results"].append(e.event_id)
  190. elif order_by == "recent":
  191. room_events = []
  192. i = 0
  193. pagination_token = batch_token
  194. # We keep looping and we keep filtering until we reach the limit
  195. # or we run out of things.
  196. # But only go around 5 times since otherwise synapse will be sad.
  197. while len(room_events) < search_filter.limit() and i < 5:
  198. i += 1
  199. search_result = yield self.store.search_rooms(
  200. room_ids, search_term, keys, search_filter.limit() * 2,
  201. pagination_token=pagination_token,
  202. )
  203. if search_result["highlights"]:
  204. highlights.update(search_result["highlights"])
  205. count = search_result["count"]
  206. results = search_result["results"]
  207. results_map = {r["event"].event_id: r for r in results}
  208. rank_map.update({r["event"].event_id: r["rank"] for r in results})
  209. filtered_events = search_filter.filter([
  210. r["event"] for r in results
  211. ])
  212. events = yield filter_events_for_client(
  213. self.store, user.to_string(), filtered_events
  214. )
  215. room_events.extend(events)
  216. room_events = room_events[:search_filter.limit()]
  217. if len(results) < search_filter.limit() * 2:
  218. pagination_token = None
  219. break
  220. else:
  221. pagination_token = results[-1]["pagination_token"]
  222. for event in room_events:
  223. group = room_groups.setdefault(event.room_id, {
  224. "results": [],
  225. })
  226. group["results"].append(event.event_id)
  227. if room_events and len(room_events) >= search_filter.limit():
  228. last_event_id = room_events[-1].event_id
  229. pagination_token = results_map[last_event_id]["pagination_token"]
  230. # We want to respect the given batch group and group keys so
  231. # that if people blindly use the top level `next_batch` token
  232. # it returns more from the same group (if applicable) rather
  233. # than reverting to searching all results again.
  234. if batch_group and batch_group_key:
  235. global_next_batch = encode_base64(("%s\n%s\n%s" % (
  236. batch_group, batch_group_key, pagination_token
  237. )).encode('ascii'))
  238. else:
  239. global_next_batch = encode_base64(("%s\n%s\n%s" % (
  240. "all", "", pagination_token
  241. )).encode('ascii'))
  242. for room_id, group in room_groups.items():
  243. group["next_batch"] = encode_base64(("%s\n%s\n%s" % (
  244. "room_id", room_id, pagination_token
  245. )).encode('ascii'))
  246. allowed_events.extend(room_events)
  247. else:
  248. # We should never get here due to the guard earlier.
  249. raise NotImplementedError()
  250. logger.info("Found %d events to return", len(allowed_events))
  251. # If client has asked for "context" for each event (i.e. some surrounding
  252. # events and state), fetch that
  253. if event_context is not None:
  254. now_token = yield self.hs.get_event_sources().get_current_token()
  255. contexts = {}
  256. for event in allowed_events:
  257. res = yield self.store.get_events_around(
  258. event.room_id, event.event_id, before_limit, after_limit,
  259. )
  260. logger.info(
  261. "Context for search returned %d and %d events",
  262. len(res["events_before"]), len(res["events_after"]),
  263. )
  264. res["events_before"] = yield filter_events_for_client(
  265. self.store, user.to_string(), res["events_before"]
  266. )
  267. res["events_after"] = yield filter_events_for_client(
  268. self.store, user.to_string(), res["events_after"]
  269. )
  270. res["start"] = now_token.copy_and_replace(
  271. "room_key", res["start"]
  272. ).to_string()
  273. res["end"] = now_token.copy_and_replace(
  274. "room_key", res["end"]
  275. ).to_string()
  276. if include_profile:
  277. senders = set(
  278. ev.sender
  279. for ev in itertools.chain(
  280. res["events_before"], [event], res["events_after"]
  281. )
  282. )
  283. if res["events_after"]:
  284. last_event_id = res["events_after"][-1].event_id
  285. else:
  286. last_event_id = event.event_id
  287. state_filter = StateFilter.from_types(
  288. [(EventTypes.Member, sender) for sender in senders]
  289. )
  290. state = yield self.store.get_state_for_event(
  291. last_event_id, state_filter
  292. )
  293. res["profile_info"] = {
  294. s.state_key: {
  295. "displayname": s.content.get("displayname", None),
  296. "avatar_url": s.content.get("avatar_url", None),
  297. }
  298. for s in state.values()
  299. if s.type == EventTypes.Member and s.state_key in senders
  300. }
  301. contexts[event.event_id] = res
  302. else:
  303. contexts = {}
  304. # TODO: Add a limit
  305. time_now = self.clock.time_msec()
  306. for context in contexts.values():
  307. context["events_before"] = [
  308. serialize_event(e, time_now)
  309. for e in context["events_before"]
  310. ]
  311. context["events_after"] = [
  312. serialize_event(e, time_now)
  313. for e in context["events_after"]
  314. ]
  315. state_results = {}
  316. if include_state:
  317. rooms = set(e.room_id for e in allowed_events)
  318. for room_id in rooms:
  319. state = yield self.state_handler.get_current_state(room_id)
  320. state_results[room_id] = list(state.values())
  321. state_results.values()
  322. # We're now about to serialize the events. We should not make any
  323. # blocking calls after this. Otherwise the 'age' will be wrong
  324. results = [
  325. {
  326. "rank": rank_map[e.event_id],
  327. "result": serialize_event(e, time_now),
  328. "context": contexts.get(e.event_id, {}),
  329. }
  330. for e in allowed_events
  331. ]
  332. rooms_cat_res = {
  333. "results": results,
  334. "count": count,
  335. "highlights": list(highlights),
  336. }
  337. if state_results:
  338. rooms_cat_res["state"] = {
  339. room_id: [serialize_event(e, time_now) for e in state]
  340. for room_id, state in state_results.items()
  341. }
  342. if room_groups and "room_id" in group_keys:
  343. rooms_cat_res.setdefault("groups", {})["room_id"] = room_groups
  344. if sender_group and "sender" in group_keys:
  345. rooms_cat_res.setdefault("groups", {})["sender"] = sender_group
  346. if global_next_batch:
  347. rooms_cat_res["next_batch"] = global_next_batch
  348. defer.returnValue({
  349. "search_categories": {
  350. "room_events": rooms_cat_res
  351. }
  352. })