|
@@ -495,9 +495,6 @@ class WorkerPresenceHandler(BasePresenceHandler):
|
|
|
users=users_to_states.keys(),
|
|
|
)
|
|
|
|
|
|
- # If this is a federation sender, notify about presence updates.
|
|
|
- await self.maybe_send_presence_to_interested_destinations(states)
|
|
|
-
|
|
|
async def process_replication_rows(
|
|
|
self, stream_name: str, instance_name: str, token: int, rows: list
|
|
|
):
|
|
@@ -519,11 +516,27 @@ class WorkerPresenceHandler(BasePresenceHandler):
|
|
|
for row in rows
|
|
|
]
|
|
|
|
|
|
- for state in states:
|
|
|
- self.user_to_current_state[state.user_id] = state
|
|
|
+ # The list of states to notify sync streams and remote servers about.
|
|
|
+ # This is calculated by comparing the old and new states for each user
|
|
|
+ # using `should_notify(..)`.
|
|
|
+ #
|
|
|
+ # Note that this is necessary as the presence writer will periodically
|
|
|
+ # flush presence state changes that should not be notified about to the
|
|
|
+ # DB, and so will be sent over the replication stream.
|
|
|
+ state_to_notify = []
|
|
|
+
|
|
|
+ for new_state in states:
|
|
|
+ old_state = self.user_to_current_state.get(new_state.user_id)
|
|
|
+ self.user_to_current_state[new_state.user_id] = new_state
|
|
|
+
|
|
|
+ if not old_state or should_notify(old_state, new_state):
|
|
|
+ state_to_notify.append(new_state)
|
|
|
|
|
|
stream_id = token
|
|
|
- await self.notify_from_replication(states, stream_id)
|
|
|
+ await self.notify_from_replication(state_to_notify, stream_id)
|
|
|
+
|
|
|
+ # If this is a federation sender, notify about presence updates.
|
|
|
+ await self.maybe_send_presence_to_interested_destinations(state_to_notify)
|
|
|
|
|
|
def get_currently_syncing_users_for_replication(self) -> Iterable[str]:
|
|
|
return [
|