receipts.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305
  1. # Copyright 2015, 2016 OpenMarket Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import logging
  15. from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple
  16. from synapse.api.constants import ReceiptTypes
  17. from synapse.appservice import ApplicationService
  18. from synapse.streams import EventSource
  19. from synapse.types import (
  20. JsonDict,
  21. ReadReceipt,
  22. StreamKeyType,
  23. UserID,
  24. get_domain_from_id,
  25. )
  26. if TYPE_CHECKING:
  27. from synapse.server import HomeServer
  28. logger = logging.getLogger(__name__)
  29. class ReceiptsHandler:
  30. def __init__(self, hs: "HomeServer"):
  31. self.notifier = hs.get_notifier()
  32. self.server_name = hs.config.server.server_name
  33. self.store = hs.get_datastores().main
  34. self.event_auth_handler = hs.get_event_auth_handler()
  35. self.hs = hs
  36. # We only need to poke the federation sender explicitly if its on the
  37. # same instance. Other federation sender instances will get notified by
  38. # `synapse.app.generic_worker.FederationSenderHandler` when it sees it
  39. # in the receipts stream.
  40. self.federation_sender = None
  41. if hs.should_send_federation():
  42. self.federation_sender = hs.get_federation_sender()
  43. # If we can handle the receipt EDUs we do so, otherwise we route them
  44. # to the appropriate worker.
  45. if hs.get_instance_name() in hs.config.worker.writers.receipts:
  46. hs.get_federation_registry().register_edu_handler(
  47. "m.receipt", self._received_remote_receipt
  48. )
  49. else:
  50. hs.get_federation_registry().register_instances_for_edu(
  51. "m.receipt",
  52. hs.config.worker.writers.receipts,
  53. )
  54. self.clock = self.hs.get_clock()
  55. self.state = hs.get_state_handler()
  56. async def _received_remote_receipt(self, origin: str, content: JsonDict) -> None:
  57. """Called when we receive an EDU of type m.receipt from a remote HS."""
  58. receipts = []
  59. for room_id, room_values in content.items():
  60. # If we're not in the room just ditch the event entirely. This is
  61. # probably an old server that has come back and thinks we're still in
  62. # the room (or we've been rejoined to the room by a state reset).
  63. is_in_room = await self.event_auth_handler.check_host_in_room(
  64. room_id, self.server_name
  65. )
  66. if not is_in_room:
  67. logger.info(
  68. "Ignoring receipt for room %r from server %s as we're not in the room",
  69. room_id,
  70. origin,
  71. )
  72. continue
  73. for receipt_type, users in room_values.items():
  74. for user_id, user_values in users.items():
  75. if get_domain_from_id(user_id) != origin:
  76. logger.info(
  77. "Received receipt for user %r from server %s, ignoring",
  78. user_id,
  79. origin,
  80. )
  81. continue
  82. receipts.append(
  83. ReadReceipt(
  84. room_id=room_id,
  85. receipt_type=receipt_type,
  86. user_id=user_id,
  87. event_ids=user_values["event_ids"],
  88. data=user_values.get("data", {}),
  89. )
  90. )
  91. await self._handle_new_receipts(receipts)
  92. async def _handle_new_receipts(self, receipts: List[ReadReceipt]) -> bool:
  93. """Takes a list of receipts, stores them and informs the notifier."""
  94. min_batch_id: Optional[int] = None
  95. max_batch_id: Optional[int] = None
  96. for receipt in receipts:
  97. res = await self.store.insert_receipt(
  98. receipt.room_id,
  99. receipt.receipt_type,
  100. receipt.user_id,
  101. receipt.event_ids,
  102. receipt.data,
  103. )
  104. if not res:
  105. # res will be None if this receipt is 'old'
  106. continue
  107. stream_id, max_persisted_id = res
  108. if min_batch_id is None or stream_id < min_batch_id:
  109. min_batch_id = stream_id
  110. if max_batch_id is None or max_persisted_id > max_batch_id:
  111. max_batch_id = max_persisted_id
  112. # Either both of these should be None or neither.
  113. if min_batch_id is None or max_batch_id is None:
  114. # no new receipts
  115. return False
  116. affected_room_ids = list({r.room_id for r in receipts})
  117. self.notifier.on_new_event(
  118. StreamKeyType.RECEIPT, max_batch_id, rooms=affected_room_ids
  119. )
  120. # Note that the min here shouldn't be relied upon to be accurate.
  121. await self.hs.get_pusherpool().on_new_receipts(
  122. min_batch_id, max_batch_id, affected_room_ids
  123. )
  124. return True
  125. async def received_client_receipt(
  126. self, room_id: str, receipt_type: str, user_id: str, event_id: str
  127. ) -> None:
  128. """Called when a client tells us a local user has read up to the given
  129. event_id in the room.
  130. """
  131. receipt = ReadReceipt(
  132. room_id=room_id,
  133. receipt_type=receipt_type,
  134. user_id=user_id,
  135. event_ids=[event_id],
  136. data={"ts": int(self.clock.time_msec())},
  137. )
  138. is_new = await self._handle_new_receipts([receipt])
  139. if not is_new:
  140. return
  141. if self.federation_sender and receipt_type != ReceiptTypes.READ_PRIVATE:
  142. await self.federation_sender.send_read_receipt(receipt)
  143. class ReceiptEventSource(EventSource[int, JsonDict]):
  144. def __init__(self, hs: "HomeServer"):
  145. self.store = hs.get_datastores().main
  146. self.config = hs.config
  147. @staticmethod
  148. def filter_out_private_receipts(
  149. rooms: List[JsonDict], user_id: str
  150. ) -> List[JsonDict]:
  151. """
  152. Filters a list of serialized receipts (as returned by /sync and /initialSync)
  153. and removes private read receipts of other users.
  154. This operates on the return value of get_linearized_receipts_for_rooms(),
  155. which is wrapped in a cache. Care must be taken to ensure that the input
  156. values are not modified.
  157. Args:
  158. rooms: A list of mappings, each mapping has a `content` field, which
  159. is a map of event ID -> receipt type -> user ID -> receipt information.
  160. Returns:
  161. The same as rooms, but filtered.
  162. """
  163. result = []
  164. # Iterate through each room's receipt content.
  165. for room in rooms:
  166. # The receipt content with other user's private read receipts removed.
  167. content = {}
  168. # Iterate over each event ID / receipts for that event.
  169. for event_id, orig_event_content in room.get("content", {}).items():
  170. event_content = orig_event_content
  171. # If there are private read receipts, additional logic is necessary.
  172. if ReceiptTypes.READ_PRIVATE in event_content:
  173. # Make a copy without private read receipts to avoid leaking
  174. # other user's private read receipts..
  175. event_content = {
  176. receipt_type: receipt_value
  177. for receipt_type, receipt_value in event_content.items()
  178. if receipt_type != ReceiptTypes.READ_PRIVATE
  179. }
  180. # Copy the current user's private read receipt from the
  181. # original content, if it exists.
  182. user_private_read_receipt = orig_event_content[
  183. ReceiptTypes.READ_PRIVATE
  184. ].get(user_id, None)
  185. if user_private_read_receipt:
  186. event_content[ReceiptTypes.READ_PRIVATE] = {
  187. user_id: user_private_read_receipt
  188. }
  189. # Include the event if there is at least one non-private read
  190. # receipt or the current user has a private read receipt.
  191. if event_content:
  192. content[event_id] = event_content
  193. # Include the event if there is at least one non-private read receipt
  194. # or the current user has a private read receipt.
  195. if content:
  196. # Build a new event to avoid mutating the cache.
  197. new_room = {k: v for k, v in room.items() if k != "content"}
  198. new_room["content"] = content
  199. result.append(new_room)
  200. return result
  201. async def get_new_events(
  202. self,
  203. user: UserID,
  204. from_key: int,
  205. limit: Optional[int],
  206. room_ids: Iterable[str],
  207. is_guest: bool,
  208. explicit_room_id: Optional[str] = None,
  209. ) -> Tuple[List[JsonDict], int]:
  210. from_key = int(from_key)
  211. to_key = self.get_current_key()
  212. if from_key == to_key:
  213. return [], to_key
  214. events = await self.store.get_linearized_receipts_for_rooms(
  215. room_ids, from_key=from_key, to_key=to_key
  216. )
  217. if self.config.experimental.msc2285_enabled:
  218. events = ReceiptEventSource.filter_out_private_receipts(
  219. events, user.to_string()
  220. )
  221. return events, to_key
  222. async def get_new_events_as(
  223. self, from_key: int, to_key: int, service: ApplicationService
  224. ) -> Tuple[List[JsonDict], int]:
  225. """Returns a set of new read receipt events that an appservice
  226. may be interested in.
  227. Args:
  228. from_key: the stream position at which events should be fetched from
  229. to_key: the stream position up to which events should be fetched to
  230. service: The appservice which may be interested
  231. Returns:
  232. A two-tuple containing the following:
  233. * A list of json dictionaries derived from read receipts that the
  234. appservice may be interested in.
  235. * The current read receipt stream token.
  236. """
  237. from_key = int(from_key)
  238. if from_key == to_key:
  239. return [], to_key
  240. # Fetch all read receipts for all rooms, up to a limit of 100. This is ordered
  241. # by most recent.
  242. rooms_to_events = await self.store.get_linearized_receipts_for_all_rooms(
  243. from_key=from_key, to_key=to_key
  244. )
  245. # Then filter down to rooms that the AS can read
  246. events = []
  247. for room_id, event in rooms_to_events.items():
  248. if not await service.is_interested_in_room(room_id, self.store):
  249. continue
  250. events.append(event)
  251. return events, to_key
  252. def get_current_key(self, direction: str = "f") -> int:
  253. return self.store.get_max_receipt_stream_id()