@@ -364,147 +364,161 @@ class EventsStore(
         if not events_and_contexts:
             return

-        if backfilled:
-            stream_ordering_manager = self._backfill_id_gen.get_next_mult(
-                len(events_and_contexts)
-            )
-        else:
-            stream_ordering_manager = self._stream_id_gen.get_next_mult(
-                len(events_and_contexts)
-            )
-
-        with stream_ordering_manager as stream_orderings:
-            for (event, context), stream in zip(events_and_contexts, stream_orderings):
-                event.internal_metadata.stream_ordering = stream
-
-            chunks = [
-                events_and_contexts[x : x + 100]
-                for x in range(0, len(events_and_contexts), 100)
-            ]
-
-            for chunk in chunks:
-                # We can't easily parallelize these since different chunks
-                # might contain the same event. :(
+        chunks = [
+            events_and_contexts[x : x + 100]
+            for x in range(0, len(events_and_contexts), 100)
+        ]

-                # NB: Assumes that we are only persisting events for one room
-                # at a time.
+        for chunk in chunks:
+            # We can't easily parallelize these since different chunks
+            # might contain the same event. :(

-                # map room_id->list[event_ids] giving the new forward
-                # extremities in each room
-                new_forward_extremeties = {}
+            # NB: Assumes that we are only persisting events for one room
+            # at a time.

-                # map room_id->(type,state_key)->event_id tracking the full
-                # state in each room after adding these events.
-                # This is simply used to prefill the get_current_state_ids
-                # cache
-                current_state_for_room = {}
+            # map room_id->list[event_ids] giving the new forward
+            # extremities in each room
+            new_forward_extremeties = {}

-                # map room_id->(to_delete, to_insert) where to_delete is a list
-                # of type/state keys to remove from current state, and to_insert
-                # is a map (type,key)->event_id giving the state delta in each
-                # room
-                state_delta_for_room = {}
+            # map room_id->(type,state_key)->event_id tracking the full
+            # state in each room after adding these events.
+            # This is simply used to prefill the get_current_state_ids
+            # cache
+            current_state_for_room = {}

-                if not backfilled:
-                    with Measure(self._clock, "_calculate_state_and_extrem"):
-                        # Work out the new "current state" for each room.
-                        # We do this by working out what the new extremities are and then
-                        # calculating the state from that.
-                        events_by_room = {}
-                        for event, context in chunk:
-                            events_by_room.setdefault(event.room_id, []).append(
-                                (event, context)
-                            )
+            # map room_id->(to_delete, to_insert) where to_delete is a list
+            # of type/state keys to remove from current state, and to_insert
+            # is a map (type,key)->event_id giving the state delta in each
+            # room
+            state_delta_for_room = {}

-                        for room_id, ev_ctx_rm in iteritems(events_by_room):
-                            latest_event_ids = yield self.get_latest_event_ids_in_room(
-                                room_id
-                            )
-                            new_latest_event_ids = yield self._calculate_new_extremities(
-                                room_id, ev_ctx_rm, latest_event_ids
+            if not backfilled:
+                with Measure(self._clock, "_calculate_state_and_extrem"):
+                    # Work out the new "current state" for each room.
+                    # We do this by working out what the new extremities are and then
+                    # calculating the state from that.
+                    events_by_room = {}
+                    for event, context in chunk:
+                        events_by_room.setdefault(event.room_id, []).append(
+                            (event, context)
+                        )
+
+                    for room_id, ev_ctx_rm in iteritems(events_by_room):
+                        latest_event_ids = yield self.get_latest_event_ids_in_room(
+                            room_id
+                        )
+                        new_latest_event_ids = yield self._calculate_new_extremities(
+                            room_id, ev_ctx_rm, latest_event_ids
+                        )
+
+                        latest_event_ids = set(latest_event_ids)
+                        if new_latest_event_ids == latest_event_ids:
+                            # No change in extremities, so no change in state
+                            continue
+
+                        # there should always be at least one forward extremity.
+                        # (except during the initial persistence of the send_join
+                        # results, in which case there will be no existing
+                        # extremities, so we'll `continue` above and skip this bit.)
+                        assert new_latest_event_ids, "No forward extremities left!"
+
+                        new_forward_extremeties[room_id] = new_latest_event_ids
+
+                        len_1 = (
+                            len(latest_event_ids) == 1
+                            and len(new_latest_event_ids) == 1
+                        )
+                        if len_1:
+                            all_single_prev_not_state = all(
+                                len(event.prev_event_ids()) == 1
+                                and not event.is_state()
+                                for event, ctx in ev_ctx_rm
                             )
-
-                            latest_event_ids = set(latest_event_ids)
-                            if new_latest_event_ids == latest_event_ids:
-                                # No change in extremities, so no change in state
+                            # Don't bother calculating state if they're just
+                            # a long chain of single ancestor non-state events.
+                            if all_single_prev_not_state:
                                 continue

-                            # there should always be at least one forward extremity.
-                            # (except during the initial persistence of the send_join
-                            # results, in which case there will be no existing
-                            # extremities, so we'll `continue` above and skip this bit.)
-                            assert new_latest_event_ids, "No forward extremities left!"
-
-                            new_forward_extremeties[room_id] = new_latest_event_ids
-
-                            len_1 = (
-                                len(latest_event_ids) == 1
-                                and len(new_latest_event_ids) == 1
+                        state_delta_counter.inc()
+                        if len(new_latest_event_ids) == 1:
+                            state_delta_single_event_counter.inc()
+
+                            # This is a fairly handwavey check to see if we could
+                            # have guessed what the delta would have been when
+                            # processing one of these events.
+                            # What we're interested in is if the latest extremities
+                            # were the same when we created the event as they are
+                            # now. When this server creates a new event (as opposed
+                            # to receiving it over federation) it will use the
+                            # forward extremities as the prev_events, so we can
+                            # guess this by looking at the prev_events and checking
+                            # if they match the current forward extremities.
+                            for ev, _ in ev_ctx_rm:
+                                prev_event_ids = set(ev.prev_event_ids())
+                                if latest_event_ids == prev_event_ids:
+                                    state_delta_reuse_delta_counter.inc()
+                                    break
+
+                        logger.info("Calculating state delta for room %s", room_id)
+                        with Measure(
+                            self._clock, "persist_events.get_new_state_after_events"
+                        ):
+                            res = yield self._get_new_state_after_events(
+                                room_id,
+                                ev_ctx_rm,
+                                latest_event_ids,
+                                new_latest_event_ids,
                             )
-                            if len_1:
-                                all_single_prev_not_state = all(
-                                    len(event.prev_event_ids()) == 1
-                                    and not event.is_state()
-                                    for event, ctx in ev_ctx_rm
-                                )
-                                # Don't bother calculating state if they're just
-                                # a long chain of single ancestor non-state events.
-                                if all_single_prev_not_state:
-                                    continue
-
-                            state_delta_counter.inc()
-                            if len(new_latest_event_ids) == 1:
-                                state_delta_single_event_counter.inc()
-
-                                # This is a fairly handwavey check to see if we could
-                                # have guessed what the delta would have been when
-                                # processing one of these events.
-                                # What we're interested in is if the latest extremities
-                                # were the same when we created the event as they are
-                                # now. When this server creates a new event (as opposed
-                                # to receiving it over federation) it will use the
-                                # forward extremities as the prev_events, so we can
-                                # guess this by looking at the prev_events and checking
-                                # if they match the current forward extremities.
-                                for ev, _ in ev_ctx_rm:
-                                    prev_event_ids = set(ev.prev_event_ids())
-                                    if latest_event_ids == prev_event_ids:
-                                        state_delta_reuse_delta_counter.inc()
-                                        break
-
-                            logger.info("Calculating state delta for room %s", room_id)
+                            current_state, delta_ids = res
+
+                        # If either are not None then there has been a change,
+                        # and we need to work out the delta (or use that
+                        # given)
+                        if delta_ids is not None:
+                            # If there is a delta we know that we've
+                            # only added or replaced state, never
+                            # removed keys entirely.
+                            state_delta_for_room[room_id] = ([], delta_ids)
+                        elif current_state is not None:
                             with Measure(
-                                self._clock, "persist_events.get_new_state_after_events"
+                                self._clock, "persist_events.calculate_state_delta"
                             ):
-                                res = yield self._get_new_state_after_events(
-                                    room_id,
-                                    ev_ctx_rm,
-                                    latest_event_ids,
-                                    new_latest_event_ids,
+                                delta = yield self._calculate_state_delta(
+                                    room_id, current_state
                                 )
-                                current_state, delta_ids = res
-
-                            # If either are not None then there has been a change,
-                            # and we need to work out the delta (or use that
-                            # given)
-                            if delta_ids is not None:
-                                # If there is a delta we know that we've
-                                # only added or replaced state, never
-                                # removed keys entirely.
-                                state_delta_for_room[room_id] = ([], delta_ids)
-                            elif current_state is not None:
-                                with Measure(
-                                    self._clock, "persist_events.calculate_state_delta"
-                                ):
-                                    delta = yield self._calculate_state_delta(
-                                        room_id, current_state
-                                    )
-                                state_delta_for_room[room_id] = delta
-
-                            # If we have the current_state then lets prefill
-                            # the cache with it.
-                            if current_state is not None:
-                                current_state_for_room[room_id] = current_state
+                            state_delta_for_room[room_id] = delta
+
+                        # If we have the current_state then lets prefill
+                        # the cache with it.
+                        if current_state is not None:
+                            current_state_for_room[room_id] = current_state
+
+            # We want to calculate the stream orderings as late as possible, as
+            # we only notify after all events with a lesser stream ordering have
+            # been persisted. I.e. if we spend 10s inside the with block then
+            # that will delay all subsequent events from being notified about.
+            # Hence why we do it down here rather than wrapping the entire
+            # function.
+            #
+            # Its safe to do this after calculating the state deltas etc as we
+            # only need to protect the *persistence* of the events. This is to
+            # ensure that queries of the form "fetch events since X" don't
+            # return events and stream positions after events that are still in
+            # flight, as otherwise subsequent requests "fetch event since Y"
+            # will not return those events.
+            #
+            # Note: Multiple instances of this function cannot be in flight at
+            # the same time for the same room.
+            if backfilled:
+                stream_ordering_manager = self._backfill_id_gen.get_next_mult(
+                    len(chunk)
+                )
+            else:
+                stream_ordering_manager = self._stream_id_gen.get_next_mult(len(chunk))
+
+            with stream_ordering_manager as stream_orderings:
+                for (event, context), stream in zip(chunk, stream_orderings):
+                    event.internal_metadata.stream_ordering = stream

                 yield self.runInteraction(
                     "persist_events",