|
@@ -1183,7 +1183,16 @@ class PresenceHandler(BasePresenceHandler):
|
|
|
max_pos, deltas = await self.store.get_current_state_deltas(
|
|
|
self._event_pos, room_max_stream_ordering
|
|
|
)
|
|
|
- await self._handle_state_delta(deltas)
|
|
|
+
|
|
|
+ # We may get multiple deltas for different rooms, but we want to
|
|
|
+ # handle them on a room by room basis, so we batch them up by
|
|
|
+ # room.
|
|
|
+ deltas_by_room: Dict[str, List[JsonDict]] = {}
|
|
|
+ for delta in deltas:
|
|
|
+ deltas_by_room.setdefault(delta["room_id"], []).append(delta)
|
|
|
+
|
|
|
+ for room_id, deltas_for_room in deltas_by_room.items():
|
|
|
+ await self._handle_state_delta(room_id, deltas_for_room)
|
|
|
|
|
|
self._event_pos = max_pos
|
|
|
|
|
@@ -1192,17 +1201,21 @@ class PresenceHandler(BasePresenceHandler):
|
|
|
max_pos
|
|
|
)
|
|
|
|
|
|
- async def _handle_state_delta(self, deltas: List[JsonDict]) -> None:
|
|
|
- """Process current state deltas to find new joins that need to be
|
|
|
- handled.
|
|
|
+ async def _handle_state_delta(self, room_id: str, deltas: List[JsonDict]) -> None:
|
|
|
+ """Process current state deltas for the room to find new joins that need
|
|
|
+ to be handled.
|
|
|
"""
|
|
|
- # A map of destination to a set of user state that they should receive
|
|
|
- presence_destinations = {} # type: Dict[str, Set[UserPresenceState]]
|
|
|
+
|
|
|
+ # Sets of newly joined users. Note that if the local server is
|
|
|
+ # joining a remote room for the first time we'll see both the joining
|
|
|
+ # user and all remote users as newly joined.
|
|
|
+ newly_joined_users = set()
|
|
|
|
|
|
for delta in deltas:
|
|
|
+ assert room_id == delta["room_id"]
|
|
|
+
|
|
|
typ = delta["type"]
|
|
|
state_key = delta["state_key"]
|
|
|
- room_id = delta["room_id"]
|
|
|
event_id = delta["event_id"]
|
|
|
prev_event_id = delta["prev_event_id"]
|
|
|
|
|
@@ -1231,72 +1244,55 @@ class PresenceHandler(BasePresenceHandler):
|
|
|
# Ignore changes to join events.
|
|
|
continue
|
|
|
|
|
|
- # Retrieve any user presence state updates that need to be sent as a result,
|
|
|
- # and the destinations that need to receive it
|
|
|
- destinations, user_presence_states = await self._on_user_joined_room(
|
|
|
- room_id, state_key
|
|
|
- )
|
|
|
-
|
|
|
- # Insert the destinations and respective updates into our destinations dict
|
|
|
- for destination in destinations:
|
|
|
- presence_destinations.setdefault(destination, set()).update(
|
|
|
- user_presence_states
|
|
|
- )
|
|
|
-
|
|
|
- # Send out user presence updates for each destination
|
|
|
- for destination, user_state_set in presence_destinations.items():
|
|
|
- self._federation_queue.send_presence_to_destinations(
|
|
|
- destinations=[destination], states=user_state_set
|
|
|
- )
|
|
|
-
|
|
|
- async def _on_user_joined_room(
|
|
|
- self, room_id: str, user_id: str
|
|
|
- ) -> Tuple[List[str], List[UserPresenceState]]:
|
|
|
- """Called when we detect a user joining the room via the current state
|
|
|
- delta stream. Returns the destinations that need to be updated and the
|
|
|
- presence updates to send to them.
|
|
|
-
|
|
|
- Args:
|
|
|
- room_id: The ID of the room that the user has joined.
|
|
|
- user_id: The ID of the user that has joined the room.
|
|
|
-
|
|
|
- Returns:
|
|
|
- A tuple of destinations and presence updates to send to them.
|
|
|
- """
|
|
|
- if self.is_mine_id(user_id):
|
|
|
- # If this is a local user then we need to send their presence
|
|
|
- # out to hosts in the room (who don't already have it)
|
|
|
-
|
|
|
- # TODO: We should be able to filter the hosts down to those that
|
|
|
- # haven't previously seen the user
|
|
|
-
|
|
|
- remote_hosts = await self.state.get_current_hosts_in_room(room_id)
|
|
|
+ newly_joined_users.add(state_key)
|
|
|
|
|
|
- # Filter out ourselves.
|
|
|
- filtered_remote_hosts = [
|
|
|
- host for host in remote_hosts if host != self.server_name
|
|
|
- ]
|
|
|
-
|
|
|
- state = await self.current_state_for_user(user_id)
|
|
|
- return filtered_remote_hosts, [state]
|
|
|
- else:
|
|
|
- # A remote user has joined the room, so we need to:
|
|
|
- # 1. Check if this is a new server in the room
|
|
|
- # 2. If so send any presence they don't already have for
|
|
|
- # local users in the room.
|
|
|
-
|
|
|
- # TODO: We should be able to filter the users down to those that
|
|
|
- # the server hasn't previously seen
|
|
|
-
|
|
|
- # TODO: Check that this is actually a new server joining the
|
|
|
- # room.
|
|
|
-
|
|
|
- remote_host = get_domain_from_id(user_id)
|
|
|
+ if not newly_joined_users:
|
|
|
+ # If nobody has joined then there's nothing to do.
|
|
|
+ return
|
|
|
|
|
|
- users = await self.store.get_users_in_room(room_id)
|
|
|
- user_ids = list(filter(self.is_mine_id, users))
|
|
|
+ # We want to send:
|
|
|
+ # 1. presence states of all local users in the room to newly joined
|
|
|
+ # remote servers
|
|
|
+ # 2. presence states of newly joined users to all remote servers in
|
|
|
+ # the room.
|
|
|
+ #
|
|
|
+ # TODO: Only send presence states to remote hosts that don't already
|
|
|
+ # have them (because they already share rooms).
|
|
|
+
|
|
|
+ # Get all the users who were already in the room, by fetching the
|
|
|
+ # current users in the room and removing the newly joined users.
|
|
|
+ users = await self.store.get_users_in_room(room_id)
|
|
|
+ prev_users = set(users) - newly_joined_users
|
|
|
+
|
|
|
+ # Construct sets for all the local users and remote hosts that were
|
|
|
+ # already in the room
|
|
|
+ prev_local_users = []
|
|
|
+ prev_remote_hosts = set()
|
|
|
+ for user_id in prev_users:
|
|
|
+ if self.is_mine_id(user_id):
|
|
|
+ prev_local_users.append(user_id)
|
|
|
+ else:
|
|
|
+ prev_remote_hosts.add(get_domain_from_id(user_id))
|
|
|
+
|
|
|
+ # Similarly, construct sets for all the local users and remote hosts
|
|
|
+ # that were *not* already in the room. Care needs to be taken with the
|
|
|
+ # calculating the remote hosts, as a host may have already been in the
|
|
|
+ # room even if there is a newly joined user from that host.
|
|
|
+ newly_joined_local_users = []
|
|
|
+ newly_joined_remote_hosts = set()
|
|
|
+ for user_id in newly_joined_users:
|
|
|
+ if self.is_mine_id(user_id):
|
|
|
+ newly_joined_local_users.append(user_id)
|
|
|
+ else:
|
|
|
+ host = get_domain_from_id(user_id)
|
|
|
+ if host not in prev_remote_hosts:
|
|
|
+ newly_joined_remote_hosts.add(host)
|
|
|
|
|
|
- states_d = await self.current_state_for_users(user_ids)
|
|
|
+ # Send presence states of all local users in the room to newly joined
|
|
|
+ # remote servers. (We actually only send states for local users already
|
|
|
+ # in the room, as we'll send states for newly joined local users below.)
|
|
|
+ if prev_local_users and newly_joined_remote_hosts:
|
|
|
+ local_states = await self.current_state_for_users(prev_local_users)
|
|
|
|
|
|
# Filter out old presence, i.e. offline presence states where
|
|
|
# the user hasn't been active for a week. We can change this
|
|
@@ -1306,13 +1302,27 @@ class PresenceHandler(BasePresenceHandler):
|
|
|
now = self.clock.time_msec()
|
|
|
states = [
|
|
|
state
|
|
|
- for state in states_d.values()
|
|
|
+ for state in local_states.values()
|
|
|
if state.state != PresenceState.OFFLINE
|
|
|
or now - state.last_active_ts < 7 * 24 * 60 * 60 * 1000
|
|
|
or state.status_msg is not None
|
|
|
]
|
|
|
|
|
|
- return [remote_host], states
|
|
|
+ self._federation_queue.send_presence_to_destinations(
|
|
|
+ destinations=newly_joined_remote_hosts,
|
|
|
+ states=states,
|
|
|
+ )
|
|
|
+
|
|
|
+ # Send presence states of newly joined users to all remote servers in
|
|
|
+ # the room
|
|
|
+ if newly_joined_local_users and (
|
|
|
+ prev_remote_hosts or newly_joined_remote_hosts
|
|
|
+ ):
|
|
|
+ local_states = await self.current_state_for_users(newly_joined_local_users)
|
|
|
+ self._federation_queue.send_presence_to_destinations(
|
|
|
+ destinations=prev_remote_hosts | newly_joined_remote_hosts,
|
|
|
+ states=list(local_states.values()),
|
|
|
+ )
|
|
|
|
|
|
|
|
|
def should_notify(old_state: UserPresenceState, new_state: UserPresenceState) -> bool:
|