events.py 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. # Copyright 2014-2016 OpenMarket Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import logging
  15. import random
  16. from typing import TYPE_CHECKING, Iterable, List, Optional
  17. from synapse.api.constants import EduTypes, EventTypes, Membership, PresenceState
  18. from synapse.api.errors import AuthError, SynapseError
  19. from synapse.events import EventBase
  20. from synapse.events.utils import SerializeEventConfig
  21. from synapse.handlers.presence import format_user_presence_state
  22. from synapse.storage.databases.main.events_worker import EventRedactBehaviour
  23. from synapse.streams.config import PaginationConfig
  24. from synapse.types import JsonDict, UserID
  25. from synapse.visibility import filter_events_for_client
  26. if TYPE_CHECKING:
  27. from synapse.server import HomeServer
  28. logger = logging.getLogger(__name__)
  29. class EventStreamHandler:
  30. def __init__(self, hs: "HomeServer"):
  31. self.store = hs.get_datastores().main
  32. self.clock = hs.get_clock()
  33. self.hs = hs
  34. self.notifier = hs.get_notifier()
  35. self.state = hs.get_state_handler()
  36. self._server_notices_sender = hs.get_server_notices_sender()
  37. self._event_serializer = hs.get_event_client_serializer()
  38. async def get_stream(
  39. self,
  40. auth_user_id: str,
  41. pagin_config: PaginationConfig,
  42. timeout: int = 0,
  43. as_client_event: bool = True,
  44. affect_presence: bool = True,
  45. room_id: Optional[str] = None,
  46. is_guest: bool = False,
  47. ) -> JsonDict:
  48. """Fetches the events stream for a given user."""
  49. if room_id:
  50. blocked = await self.store.is_room_blocked(room_id)
  51. if blocked:
  52. raise SynapseError(403, "This room has been blocked on this server")
  53. # send any outstanding server notices to the user.
  54. await self._server_notices_sender.on_user_syncing(auth_user_id)
  55. auth_user = UserID.from_string(auth_user_id)
  56. presence_handler = self.hs.get_presence_handler()
  57. context = await presence_handler.user_syncing(
  58. auth_user_id,
  59. affect_presence=affect_presence,
  60. presence_state=PresenceState.ONLINE,
  61. )
  62. with context:
  63. if timeout:
  64. # If they've set a timeout set a minimum limit.
  65. timeout = max(timeout, 500)
  66. # Add some randomness to this value to try and mitigate against
  67. # thundering herds on restart.
  68. timeout = random.randint(int(timeout * 0.9), int(timeout * 1.1))
  69. stream_result = await self.notifier.get_events_for(
  70. auth_user,
  71. pagin_config,
  72. timeout,
  73. is_guest=is_guest,
  74. explicit_room_id=room_id,
  75. )
  76. events = stream_result.events
  77. time_now = self.clock.time_msec()
  78. # When the user joins a new room, or another user joins a currently
  79. # joined room, we need to send down presence for those users.
  80. to_add: List[JsonDict] = []
  81. for event in events:
  82. if not isinstance(event, EventBase):
  83. continue
  84. if event.type == EventTypes.Member:
  85. if event.membership != Membership.JOIN:
  86. continue
  87. # Send down presence.
  88. if event.state_key == auth_user_id:
  89. # Send down presence for everyone in the room.
  90. users: Iterable[str] = await self.store.get_users_in_room(
  91. event.room_id
  92. )
  93. else:
  94. users = [event.state_key]
  95. states = await presence_handler.get_states(users)
  96. to_add.extend(
  97. {
  98. "type": EduTypes.PRESENCE,
  99. "content": format_user_presence_state(state, time_now),
  100. }
  101. for state in states
  102. )
  103. events.extend(to_add)
  104. chunks = self._event_serializer.serialize_events(
  105. events,
  106. time_now,
  107. config=SerializeEventConfig(as_client_event=as_client_event),
  108. )
  109. chunk = {
  110. "chunk": chunks,
  111. "start": await stream_result.start_token.to_string(self.store),
  112. "end": await stream_result.end_token.to_string(self.store),
  113. }
  114. return chunk
  115. class EventHandler:
  116. def __init__(self, hs: "HomeServer"):
  117. self.store = hs.get_datastores().main
  118. self._storage_controllers = hs.get_storage_controllers()
  119. async def get_event(
  120. self,
  121. user: UserID,
  122. room_id: Optional[str],
  123. event_id: str,
  124. show_redacted: bool = False,
  125. ) -> Optional[EventBase]:
  126. """Retrieve a single specified event.
  127. Args:
  128. user: The local user requesting the event
  129. room_id: The expected room id. We'll return None if the
  130. event's room does not match.
  131. event_id: The event ID to obtain.
  132. show_redacted: Should the full content of redacted events be returned?
  133. Returns:
  134. An event, or None if there is no event matching this ID.
  135. Raises:
  136. SynapseError if there was a problem retrieving this event, or
  137. AuthError if the user does not have the rights to inspect this
  138. event.
  139. """
  140. redact_behaviour = (
  141. EventRedactBehaviour.as_is if show_redacted else EventRedactBehaviour.redact
  142. )
  143. event = await self.store.get_event(
  144. event_id, check_room_id=room_id, redact_behaviour=redact_behaviour
  145. )
  146. if not event:
  147. return None
  148. is_user_in_room = await self.store.check_local_user_in_room(
  149. user_id=user.to_string(), room_id=event.room_id
  150. )
  151. # The user is peeking if they aren't in the room already
  152. is_peeking = not is_user_in_room
  153. filtered = await filter_events_for_client(
  154. self._storage_controllers, user.to_string(), [event], is_peeking=is_peeking
  155. )
  156. if not filtered:
  157. raise AuthError(403, "You don't have permission to access that event.")
  158. return event