receipts.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319
  1. # Copyright 2015, 2016 OpenMarket Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import logging
  15. from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple
  16. from synapse.api.constants import EduTypes, ReceiptTypes
  17. from synapse.appservice import ApplicationService
  18. from synapse.streams import EventSource
  19. from synapse.types import (
  20. JsonDict,
  21. ReadReceipt,
  22. StreamKeyType,
  23. UserID,
  24. get_domain_from_id,
  25. )
  26. if TYPE_CHECKING:
  27. from synapse.server import HomeServer
# Module-level logger named after this module; used below to record receipts
# that are ignored (e.g. rooms we're not in, users from a different server).
logger = logging.getLogger(__name__)
  29. class ReceiptsHandler:
  30. def __init__(self, hs: "HomeServer"):
  31. self.notifier = hs.get_notifier()
  32. self.server_name = hs.config.server.server_name
  33. self.store = hs.get_datastores().main
  34. self.event_auth_handler = hs.get_event_auth_handler()
  35. self.hs = hs
  36. # We only need to poke the federation sender explicitly if its on the
  37. # same instance. Other federation sender instances will get notified by
  38. # `synapse.app.generic_worker.FederationSenderHandler` when it sees it
  39. # in the receipts stream.
  40. self.federation_sender = None
  41. if hs.should_send_federation():
  42. self.federation_sender = hs.get_federation_sender()
  43. # If we can handle the receipt EDUs we do so, otherwise we route them
  44. # to the appropriate worker.
  45. if hs.get_instance_name() in hs.config.worker.writers.receipts:
  46. hs.get_federation_registry().register_edu_handler(
  47. EduTypes.RECEIPT, self._received_remote_receipt
  48. )
  49. else:
  50. hs.get_federation_registry().register_instances_for_edu(
  51. EduTypes.RECEIPT,
  52. hs.config.worker.writers.receipts,
  53. )
  54. self.clock = self.hs.get_clock()
  55. self.state = hs.get_state_handler()
  56. async def _received_remote_receipt(self, origin: str, content: JsonDict) -> None:
  57. """Called when we receive an EDU of type m.receipt from a remote HS."""
  58. receipts = []
  59. for room_id, room_values in content.items():
  60. # If we're not in the room just ditch the event entirely. This is
  61. # probably an old server that has come back and thinks we're still in
  62. # the room (or we've been rejoined to the room by a state reset).
  63. is_in_room = await self.event_auth_handler.is_host_in_room(
  64. room_id, self.server_name
  65. )
  66. if not is_in_room:
  67. logger.info(
  68. "Ignoring receipt for room %r from server %s as we're not in the room",
  69. room_id,
  70. origin,
  71. )
  72. continue
  73. for receipt_type, users in room_values.items():
  74. for user_id, user_values in users.items():
  75. if get_domain_from_id(user_id) != origin:
  76. logger.info(
  77. "Received receipt for user %r from server %s, ignoring",
  78. user_id,
  79. origin,
  80. )
  81. continue
  82. # Check if these receipts apply to a thread.
  83. data = user_values.get("data", {})
  84. thread_id = data.get("thread_id")
  85. # If the thread ID is invalid, consider it missing.
  86. if not isinstance(thread_id, str):
  87. thread_id = None
  88. receipts.append(
  89. ReadReceipt(
  90. room_id=room_id,
  91. receipt_type=receipt_type,
  92. user_id=user_id,
  93. event_ids=user_values["event_ids"],
  94. thread_id=thread_id,
  95. data=data,
  96. )
  97. )
  98. await self._handle_new_receipts(receipts)
  99. async def _handle_new_receipts(self, receipts: List[ReadReceipt]) -> bool:
  100. """Takes a list of receipts, stores them and informs the notifier."""
  101. min_batch_id: Optional[int] = None
  102. max_batch_id: Optional[int] = None
  103. for receipt in receipts:
  104. res = await self.store.insert_receipt(
  105. receipt.room_id,
  106. receipt.receipt_type,
  107. receipt.user_id,
  108. receipt.event_ids,
  109. receipt.thread_id,
  110. receipt.data,
  111. )
  112. if not res:
  113. # res will be None if this receipt is 'old'
  114. continue
  115. stream_id, max_persisted_id = res
  116. if min_batch_id is None or stream_id < min_batch_id:
  117. min_batch_id = stream_id
  118. if max_batch_id is None or max_persisted_id > max_batch_id:
  119. max_batch_id = max_persisted_id
  120. # Either both of these should be None or neither.
  121. if min_batch_id is None or max_batch_id is None:
  122. # no new receipts
  123. return False
  124. affected_room_ids = list({r.room_id for r in receipts})
  125. self.notifier.on_new_event(
  126. StreamKeyType.RECEIPT, max_batch_id, rooms=affected_room_ids
  127. )
  128. # Note that the min here shouldn't be relied upon to be accurate.
  129. await self.hs.get_pusherpool().on_new_receipts(
  130. min_batch_id, max_batch_id, affected_room_ids
  131. )
  132. return True
  133. async def received_client_receipt(
  134. self,
  135. room_id: str,
  136. receipt_type: str,
  137. user_id: str,
  138. event_id: str,
  139. thread_id: Optional[str],
  140. ) -> None:
  141. """Called when a client tells us a local user has read up to the given
  142. event_id in the room.
  143. """
  144. receipt = ReadReceipt(
  145. room_id=room_id,
  146. receipt_type=receipt_type,
  147. user_id=user_id,
  148. event_ids=[event_id],
  149. thread_id=thread_id,
  150. data={"ts": int(self.clock.time_msec())},
  151. )
  152. is_new = await self._handle_new_receipts([receipt])
  153. if not is_new:
  154. return
  155. if self.federation_sender and receipt_type != ReceiptTypes.READ_PRIVATE:
  156. await self.federation_sender.send_read_receipt(receipt)
  157. class ReceiptEventSource(EventSource[int, JsonDict]):
  158. def __init__(self, hs: "HomeServer"):
  159. self.store = hs.get_datastores().main
  160. self.config = hs.config
  161. @staticmethod
  162. def filter_out_private_receipts(
  163. rooms: List[JsonDict], user_id: str
  164. ) -> List[JsonDict]:
  165. """
  166. Filters a list of serialized receipts (as returned by /sync and /initialSync)
  167. and removes private read receipts of other users.
  168. This operates on the return value of get_linearized_receipts_for_rooms(),
  169. which is wrapped in a cache. Care must be taken to ensure that the input
  170. values are not modified.
  171. Args:
  172. rooms: A list of mappings, each mapping has a `content` field, which
  173. is a map of event ID -> receipt type -> user ID -> receipt information.
  174. Returns:
  175. The same as rooms, but filtered.
  176. """
  177. result = []
  178. # Iterate through each room's receipt content.
  179. for room in rooms:
  180. # The receipt content with other user's private read receipts removed.
  181. content = {}
  182. # Iterate over each event ID / receipts for that event.
  183. for event_id, orig_event_content in room.get("content", {}).items():
  184. event_content = orig_event_content
  185. # If there are private read receipts, additional logic is necessary.
  186. if ReceiptTypes.READ_PRIVATE in event_content:
  187. # Make a copy without private read receipts to avoid leaking
  188. # other user's private read receipts..
  189. event_content = {
  190. receipt_type: receipt_value
  191. for receipt_type, receipt_value in event_content.items()
  192. if receipt_type != ReceiptTypes.READ_PRIVATE
  193. }
  194. # Copy the current user's private read receipt from the
  195. # original content, if it exists.
  196. user_private_read_receipt = orig_event_content[
  197. ReceiptTypes.READ_PRIVATE
  198. ].get(user_id, None)
  199. if user_private_read_receipt:
  200. event_content[ReceiptTypes.READ_PRIVATE] = {
  201. user_id: user_private_read_receipt
  202. }
  203. # Include the event if there is at least one non-private read
  204. # receipt or the current user has a private read receipt.
  205. if event_content:
  206. content[event_id] = event_content
  207. # Include the event if there is at least one non-private read receipt
  208. # or the current user has a private read receipt.
  209. if content:
  210. # Build a new event to avoid mutating the cache.
  211. new_room = {k: v for k, v in room.items() if k != "content"}
  212. new_room["content"] = content
  213. result.append(new_room)
  214. return result
  215. async def get_new_events(
  216. self,
  217. user: UserID,
  218. from_key: int,
  219. limit: int,
  220. room_ids: Iterable[str],
  221. is_guest: bool,
  222. explicit_room_id: Optional[str] = None,
  223. ) -> Tuple[List[JsonDict], int]:
  224. from_key = int(from_key)
  225. to_key = self.get_current_key()
  226. if from_key == to_key:
  227. return [], to_key
  228. events = await self.store.get_linearized_receipts_for_rooms(
  229. room_ids, from_key=from_key, to_key=to_key
  230. )
  231. events = ReceiptEventSource.filter_out_private_receipts(
  232. events, user.to_string()
  233. )
  234. return events, to_key
  235. async def get_new_events_as(
  236. self, from_key: int, to_key: int, service: ApplicationService
  237. ) -> Tuple[List[JsonDict], int]:
  238. """Returns a set of new read receipt events that an appservice
  239. may be interested in.
  240. Args:
  241. from_key: the stream position at which events should be fetched from
  242. to_key: the stream position up to which events should be fetched to
  243. service: The appservice which may be interested
  244. Returns:
  245. A two-tuple containing the following:
  246. * A list of json dictionaries derived from read receipts that the
  247. appservice may be interested in.
  248. * The current read receipt stream token.
  249. """
  250. from_key = int(from_key)
  251. if from_key == to_key:
  252. return [], to_key
  253. # Fetch all read receipts for all rooms, up to a limit of 100. This is ordered
  254. # by most recent.
  255. rooms_to_events = await self.store.get_linearized_receipts_for_all_rooms(
  256. from_key=from_key, to_key=to_key
  257. )
  258. # Then filter down to rooms that the AS can read
  259. events = []
  260. for room_id, event in rooms_to_events.items():
  261. if not await service.is_interested_in_room(room_id, self.store):
  262. continue
  263. events.append(event)
  264. return events, to_key
  265. def get_current_key(self, direction: str = "f") -> int:
  266. return self.store.get_max_receipt_stream_id()