|
@@ -500,12 +500,27 @@ class SyncHandler:
|
|
|
async def _load_filtered_recents(
|
|
|
self,
|
|
|
room_id: str,
|
|
|
+ sync_result_builder: "SyncResultBuilder",
|
|
|
sync_config: SyncConfig,
|
|
|
- now_token: StreamToken,
|
|
|
+ upto_token: StreamToken,
|
|
|
since_token: Optional[StreamToken] = None,
|
|
|
potential_recents: Optional[List[EventBase]] = None,
|
|
|
newly_joined_room: bool = False,
|
|
|
) -> TimelineBatch:
|
|
|
+ """Create a timeline batch for the room
|
|
|
+
|
|
|
+ Args:
|
|
|
+ room_id
|
|
|
+ sync_result_builder
|
|
|
+ sync_config
|
|
|
+ upto_token: The token up to which we should fetch (more) events.
|
|
|
+ If `potential_results` is non-empty then this is *start* of
|
|
|
+ the the list.
|
|
|
+ since_token
|
|
|
+ potential_recents: If non-empty, the events between the since token
|
|
|
+ and current token to send down to clients.
|
|
|
+ newly_joined_room
|
|
|
+ """
|
|
|
with Measure(self.clock, "load_filtered_recents"):
|
|
|
timeline_limit = sync_config.filter_collection.timeline_limit()
|
|
|
block_all_timeline = (
|
|
@@ -521,6 +536,20 @@ class SyncHandler:
|
|
|
else:
|
|
|
limited = False
|
|
|
|
|
|
+ # Check if there is a gap, if so we need to mark this as limited and
|
|
|
+ # recalculate which events to send down.
|
|
|
+ gap_token = await self.store.get_timeline_gaps(
|
|
|
+ room_id,
|
|
|
+ since_token.room_key if since_token else None,
|
|
|
+ sync_result_builder.now_token.room_key,
|
|
|
+ )
|
|
|
+ if gap_token:
|
|
|
+ # There's a gap, so we need to ignore the passed in
|
|
|
+ # `potential_recents`, and reset `upto_token` to match.
|
|
|
+ potential_recents = None
|
|
|
+ upto_token = sync_result_builder.now_token
|
|
|
+ limited = True
|
|
|
+
|
|
|
log_kv({"limited": limited})
|
|
|
|
|
|
if potential_recents:
|
|
@@ -559,10 +588,10 @@ class SyncHandler:
|
|
|
recents = []
|
|
|
|
|
|
if not limited or block_all_timeline:
|
|
|
- prev_batch_token = now_token
|
|
|
+ prev_batch_token = upto_token
|
|
|
if recents:
|
|
|
room_key = recents[0].internal_metadata.before
|
|
|
- prev_batch_token = now_token.copy_and_replace(
|
|
|
+ prev_batch_token = upto_token.copy_and_replace(
|
|
|
StreamKeyType.ROOM, room_key
|
|
|
)
|
|
|
|
|
@@ -573,11 +602,15 @@ class SyncHandler:
|
|
|
filtering_factor = 2
|
|
|
load_limit = max(timeline_limit * filtering_factor, 10)
|
|
|
max_repeat = 5 # Only try a few times per room, otherwise
|
|
|
- room_key = now_token.room_key
|
|
|
+ room_key = upto_token.room_key
|
|
|
end_key = room_key
|
|
|
|
|
|
since_key = None
|
|
|
- if since_token and not newly_joined_room:
|
|
|
+ if since_token and gap_token:
|
|
|
+ # If there is a gap then we need to only include events after
|
|
|
+ # it.
|
|
|
+ since_key = gap_token
|
|
|
+ elif since_token and not newly_joined_room:
|
|
|
since_key = since_token.room_key
|
|
|
|
|
|
while limited and len(recents) < timeline_limit and max_repeat:
|
|
@@ -647,7 +680,7 @@ class SyncHandler:
|
|
|
recents = recents[-timeline_limit:]
|
|
|
room_key = recents[0].internal_metadata.before
|
|
|
|
|
|
- prev_batch_token = now_token.copy_and_replace(StreamKeyType.ROOM, room_key)
|
|
|
+ prev_batch_token = upto_token.copy_and_replace(StreamKeyType.ROOM, room_key)
|
|
|
|
|
|
# Don't bother to bundle aggregations if the timeline is unlimited,
|
|
|
# as clients will have all the necessary information.
|
|
@@ -662,7 +695,9 @@ class SyncHandler:
|
|
|
return TimelineBatch(
|
|
|
events=recents,
|
|
|
prev_batch=prev_batch_token,
|
|
|
- limited=limited or newly_joined_room,
|
|
|
+ # Also mark as limited if this is a new room or there has been a gap
|
|
|
+ # (to force client to paginate the gap).
|
|
|
+ limited=limited or newly_joined_room or gap_token is not None,
|
|
|
bundled_aggregations=bundled_aggregations,
|
|
|
)
|
|
|
|
|
@@ -2397,8 +2432,9 @@ class SyncHandler:
|
|
|
|
|
|
batch = await self._load_filtered_recents(
|
|
|
room_id,
|
|
|
+ sync_result_builder,
|
|
|
sync_config,
|
|
|
- now_token=upto_token,
|
|
|
+ upto_token=upto_token,
|
|
|
since_token=since_token,
|
|
|
potential_recents=events,
|
|
|
newly_joined_room=newly_joined,
|