|
@@ -40,6 +40,9 @@ import synapse.metrics
|
|
|
from synapse.events import EventBase # noqa: F401
|
|
|
from synapse.events.snapshot import EventContext # noqa: F401
|
|
|
|
|
|
+from six.moves import range
|
|
|
+from six import itervalues, iteritems
|
|
|
+
|
|
|
from prometheus_client import Counter
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
@@ -245,7 +248,7 @@ class EventsStore(EventsWorkerStore):
|
|
|
partitioned.setdefault(event.room_id, []).append((event, ctx))
|
|
|
|
|
|
deferreds = []
|
|
|
- for room_id, evs_ctxs in partitioned.iteritems():
|
|
|
+ for room_id, evs_ctxs in iteritems(partitioned):
|
|
|
d = self._event_persist_queue.add_to_queue(
|
|
|
room_id, evs_ctxs,
|
|
|
backfilled=backfilled,
|
|
@@ -330,7 +333,7 @@ class EventsStore(EventsWorkerStore):
|
|
|
|
|
|
chunks = [
|
|
|
events_and_contexts[x:x + 100]
|
|
|
- for x in xrange(0, len(events_and_contexts), 100)
|
|
|
+ for x in range(0, len(events_and_contexts), 100)
|
|
|
]
|
|
|
|
|
|
for chunk in chunks:
|
|
@@ -364,7 +367,7 @@ class EventsStore(EventsWorkerStore):
|
|
|
(event, context)
|
|
|
)
|
|
|
|
|
|
- for room_id, ev_ctx_rm in events_by_room.iteritems():
|
|
|
+ for room_id, ev_ctx_rm in iteritems(events_by_room):
|
|
|
# Work out new extremities by recursively adding and removing
|
|
|
# the new events.
|
|
|
latest_event_ids = yield self.get_latest_event_ids_in_room(
|
|
@@ -459,12 +462,12 @@ class EventsStore(EventsWorkerStore):
|
|
|
|
|
|
event_counter.labels(event.type, origin_type, origin_entity).inc()
|
|
|
|
|
|
- for room_id, new_state in current_state_for_room.iteritems():
|
|
|
+ for room_id, new_state in iteritems(current_state_for_room):
|
|
|
self.get_current_state_ids.prefill(
|
|
|
(room_id, ), new_state
|
|
|
)
|
|
|
|
|
|
- for room_id, latest_event_ids in new_forward_extremeties.iteritems():
|
|
|
+ for room_id, latest_event_ids in iteritems(new_forward_extremeties):
|
|
|
self.get_latest_event_ids_in_room.prefill(
|
|
|
(room_id,), list(latest_event_ids)
|
|
|
)
|
|
@@ -641,20 +644,20 @@ class EventsStore(EventsWorkerStore):
|
|
|
"""
|
|
|
existing_state = yield self.get_current_state_ids(room_id)
|
|
|
|
|
|
- existing_events = set(existing_state.itervalues())
|
|
|
- new_events = set(ev_id for ev_id in current_state.itervalues())
|
|
|
+ existing_events = set(itervalues(existing_state))
|
|
|
+ new_events = set(ev_id for ev_id in itervalues(current_state))
|
|
|
changed_events = existing_events ^ new_events
|
|
|
|
|
|
if not changed_events:
|
|
|
return
|
|
|
|
|
|
to_delete = {
|
|
|
- key: ev_id for key, ev_id in existing_state.iteritems()
|
|
|
+ key: ev_id for key, ev_id in iteritems(existing_state)
|
|
|
if ev_id in changed_events
|
|
|
}
|
|
|
events_to_insert = (new_events - existing_events)
|
|
|
to_insert = {
|
|
|
- key: ev_id for key, ev_id in current_state.iteritems()
|
|
|
+ key: ev_id for key, ev_id in iteritems(current_state)
|
|
|
if ev_id in events_to_insert
|
|
|
}
|
|
|
|
|
@@ -757,11 +760,11 @@ class EventsStore(EventsWorkerStore):
|
|
|
)
|
|
|
|
|
|
def _update_current_state_txn(self, txn, state_delta_by_room, max_stream_order):
|
|
|
- for room_id, current_state_tuple in state_delta_by_room.iteritems():
|
|
|
+ for room_id, current_state_tuple in iteritems(state_delta_by_room):
|
|
|
to_delete, to_insert = current_state_tuple
|
|
|
txn.executemany(
|
|
|
"DELETE FROM current_state_events WHERE event_id = ?",
|
|
|
- [(ev_id,) for ev_id in to_delete.itervalues()],
|
|
|
+ [(ev_id,) for ev_id in itervalues(to_delete)],
|
|
|
)
|
|
|
|
|
|
self._simple_insert_many_txn(
|
|
@@ -774,7 +777,7 @@ class EventsStore(EventsWorkerStore):
|
|
|
"type": key[0],
|
|
|
"state_key": key[1],
|
|
|
}
|
|
|
- for key, ev_id in to_insert.iteritems()
|
|
|
+ for key, ev_id in iteritems(to_insert)
|
|
|
],
|
|
|
)
|
|
|
|
|
@@ -793,7 +796,7 @@ class EventsStore(EventsWorkerStore):
|
|
|
"event_id": ev_id,
|
|
|
"prev_event_id": to_delete.get(key, None),
|
|
|
}
|
|
|
- for key, ev_id in state_deltas.iteritems()
|
|
|
+ for key, ev_id in iteritems(state_deltas)
|
|
|
]
|
|
|
)
|
|
|
|
|
@@ -836,7 +839,7 @@ class EventsStore(EventsWorkerStore):
|
|
|
|
|
|
def _update_forward_extremities_txn(self, txn, new_forward_extremities,
|
|
|
max_stream_order):
|
|
|
- for room_id, new_extrem in new_forward_extremities.iteritems():
|
|
|
+ for room_id, new_extrem in iteritems(new_forward_extremities):
|
|
|
self._simple_delete_txn(
|
|
|
txn,
|
|
|
table="event_forward_extremities",
|
|
@@ -854,7 +857,7 @@ class EventsStore(EventsWorkerStore):
|
|
|
"event_id": ev_id,
|
|
|
"room_id": room_id,
|
|
|
}
|
|
|
- for room_id, new_extrem in new_forward_extremities.iteritems()
|
|
|
+ for room_id, new_extrem in iteritems(new_forward_extremities)
|
|
|
for ev_id in new_extrem
|
|
|
],
|
|
|
)
|
|
@@ -871,7 +874,7 @@ class EventsStore(EventsWorkerStore):
|
|
|
"event_id": event_id,
|
|
|
"stream_ordering": max_stream_order,
|
|
|
}
|
|
|
- for room_id, new_extrem in new_forward_extremities.iteritems()
|
|
|
+ for room_id, new_extrem in iteritems(new_forward_extremities)
|
|
|
for event_id in new_extrem
|
|
|
]
|
|
|
)
|
|
@@ -899,7 +902,7 @@ class EventsStore(EventsWorkerStore):
|
|
|
new_events_and_contexts[event.event_id] = (event, context)
|
|
|
else:
|
|
|
new_events_and_contexts[event.event_id] = (event, context)
|
|
|
- return new_events_and_contexts.values()
|
|
|
+ return list(new_events_and_contexts.values())
|
|
|
|
|
|
def _update_room_depths_txn(self, txn, events_and_contexts, backfilled):
|
|
|
"""Update min_depth for each room
|
|
@@ -925,7 +928,7 @@ class EventsStore(EventsWorkerStore):
|
|
|
event.depth, depth_updates.get(event.room_id, event.depth)
|
|
|
)
|
|
|
|
|
|
- for room_id, depth in depth_updates.iteritems():
|
|
|
+ for room_id, depth in iteritems(depth_updates):
|
|
|
self._update_min_depth_for_room_txn(txn, room_id, depth)
|
|
|
|
|
|
def _update_outliers_txn(self, txn, events_and_contexts):
|
|
@@ -1309,7 +1312,7 @@ class EventsStore(EventsWorkerStore):
|
|
|
" WHERE e.event_id IN (%s)"
|
|
|
) % (",".join(["?"] * len(ev_map)),)
|
|
|
|
|
|
- txn.execute(sql, ev_map.keys())
|
|
|
+ txn.execute(sql, list(ev_map))
|
|
|
rows = self.cursor_to_dict(txn)
|
|
|
for row in rows:
|
|
|
event = ev_map[row["event_id"]]
|
|
@@ -1572,7 +1575,7 @@ class EventsStore(EventsWorkerStore):
|
|
|
|
|
|
chunks = [
|
|
|
event_ids[i:i + 100]
|
|
|
- for i in xrange(0, len(event_ids), 100)
|
|
|
+ for i in range(0, len(event_ids), 100)
|
|
|
]
|
|
|
for chunk in chunks:
|
|
|
ev_rows = self._simple_select_many_txn(
|
|
@@ -1986,7 +1989,7 @@ class EventsStore(EventsWorkerStore):
|
|
|
logger.info("[purge] finding state groups which depend on redundant"
|
|
|
" state groups")
|
|
|
remaining_state_groups = []
|
|
|
- for i in xrange(0, len(state_rows), 100):
|
|
|
+ for i in range(0, len(state_rows), 100):
|
|
|
chunk = [sg for sg, in state_rows[i:i + 100]]
|
|
|
# look for state groups whose prev_state_group is one we are about
|
|
|
# to delete
|
|
@@ -2042,7 +2045,7 @@ class EventsStore(EventsWorkerStore):
|
|
|
"state_key": key[1],
|
|
|
"event_id": state_id,
|
|
|
}
|
|
|
- for key, state_id in curr_state.iteritems()
|
|
|
+ for key, state_id in iteritems(curr_state)
|
|
|
],
|
|
|
)
|
|
|
|