# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2018-2019 New Vector Ltd
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import itertools
import logging
from collections import OrderedDict, namedtuple
from functools import wraps
from typing import TYPE_CHECKING, Dict, Iterable, List, Tuple

import attr
from canonicaljson import json
from prometheus_client import Counter

from twisted.internet import defer

import synapse.metrics
from synapse.api.constants import (
    EventContentFields,
    EventTypes,
    Membership,
    RelationTypes,
)
from synapse.api.room_versions import RoomVersions
from synapse.crypto.event_signing import compute_event_reference_hash
from synapse.events import EventBase  # noqa: F401
from synapse.events.snapshot import EventContext  # noqa: F401
from synapse.logging.utils import log_function
from synapse.storage._base import make_in_list_sql_clause
from synapse.storage.data_stores.main.search import SearchEntry
from synapse.storage.database import Database, LoggingTransaction
from synapse.storage.util.id_generators import StreamIdGenerator
from synapse.types import StateMap, get_domain_from_id
from synapse.util.frozenutils import frozendict_json_encoder
from synapse.util.iterutils import batch_iter

if TYPE_CHECKING:
    from synapse.storage.data_stores.main import DataStore
    from synapse.server import HomeServer


logger = logging.getLogger(__name__)

persist_event_counter = Counter("synapse_storage_events_persisted_events", "")
event_counter = Counter(
    "synapse_storage_events_persisted_events_sep",
    "",
    ["type", "origin_type", "origin_entity"],
)


def encode_json(json_object):
    """
    Encode a Python object as JSON and return it in a Unicode string.
    """
    out = frozendict_json_encoder.encode(json_object)
    if isinstance(out, bytes):
        out = out.decode("utf8")
    return out


_EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event"))


def _retry_on_integrity_error(func):
    """Wraps a database function so that it gets retried on IntegrityError,
    with `delete_existing=True` passed in.

    Args:
        func: function that returns a Deferred and accepts a `delete_existing` arg
    """
    @wraps(func)
    @defer.inlineCallbacks
    def f(self, *args, **kwargs):
        try:
            res = yield func(self, *args, delete_existing=False, **kwargs)
        except self.database_engine.module.IntegrityError:
            logger.exception("IntegrityError, retrying.")
            res = yield func(self, *args, delete_existing=True, **kwargs)
        return res

    return f


@attr.s(slots=True)
class DeltaState:
    """Deltas to use to update the `current_state_events` table.

    Attributes:
        to_delete: List of type/state_keys to delete from current state
        to_insert: Map of state to upsert into current state
        no_longer_in_room: The server is no longer in the room, so the room
            should e.g. be removed from the `current_state_events` table.
    """

    to_delete = attr.ib(type=List[Tuple[str, str]])
    to_insert = attr.ib(type=StateMap[str])
    no_longer_in_room = attr.ib(type=bool, default=False)
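
    # Illustrative example (assumed IDs): renaming a room while leaving the
    # rest of its state alone would be expressed as
    #   DeltaState(
    #       to_delete=[],
    #       to_insert={("m.room.name", ""): "$new_name_event_id"},
    #   )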


class PersistEventsStore:
    """Contains all the functions for writing events to the database.

    Should only be instantiated on one process (when using a worker mode setup).

    Note: This is not part of the `DataStore` mixin.
    """

    def __init__(self, hs: "HomeServer", db: Database, main_data_store: "DataStore"):
        self.hs = hs
        self.db = db
        self.store = main_data_store
        self.database_engine = db.engine
        self._clock = hs.get_clock()

        self._ephemeral_messages_enabled = hs.config.enable_ephemeral_messages
        self.is_mine_id = hs.is_mine_id

        # Ideally we'd move these ID gens here, unfortunately some other ID
        # generators are chained off them so doing so is a bit of a PITA.
        self._backfill_id_gen = self.store._backfill_id_gen  # type: StreamIdGenerator
        self._stream_id_gen = self.store._stream_id_gen  # type: StreamIdGenerator

        # This should only exist on instances that are configured to write
        assert (
            hs.config.worker.writers.events == hs.get_instance_name()
        ), "Can only instantiate EventsStore on master"

    @_retry_on_integrity_error
    @defer.inlineCallbacks
    def _persist_events_and_state_updates(
        self,
        events_and_contexts: List[Tuple[EventBase, EventContext]],
        current_state_for_room: Dict[str, StateMap[str]],
        state_delta_for_room: Dict[str, DeltaState],
        new_forward_extremeties: Dict[str, List[str]],
        backfilled: bool = False,
        delete_existing: bool = False,
    ):
        """Persist a set of events alongside updates to the current state and
        forward extremities tables.

        Args:
            events_and_contexts: events to persist
            current_state_for_room: Map from room_id to the current state of
                the room based on forward extremities
            state_delta_for_room: Map from room_id to the delta to apply to
                room state
            new_forward_extremeties: Map from room_id to list of event IDs
                that are the new forward extremities of the room.
            backfilled: True if the events were backfilled
            delete_existing: True to purge existing table rows for the events
                from the database, e.g. when retrying after an IntegrityError.

        Returns:
            Deferred: resolves when the events have been persisted
        """
        # We want to calculate the stream orderings as late as possible, as
        # we only notify after all events with a lesser stream ordering have
        # been persisted. I.e. if we spend 10s inside the with block then
        # that will delay all subsequent events from being notified about.
        # Hence why we do it down here rather than wrapping the entire
        # function.
        #
        # It's safe to do this after calculating the state deltas etc as we
        # only need to protect the *persistence* of the events. This is to
        # ensure that queries of the form "fetch events since X" don't
        # return events and stream positions after events that are still in
        # flight, as otherwise subsequent requests "fetch events since Y"
        # will not return those events.
        #
        # Note: Multiple instances of this function cannot be in flight at
        # the same time for the same room.
        if backfilled:
            stream_ordering_manager = self._backfill_id_gen.get_next_mult(
                len(events_and_contexts)
            )
        else:
            stream_ordering_manager = self._stream_id_gen.get_next_mult(
                len(events_and_contexts)
            )
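        # `get_next_mult(n)` hands back a context manager over a block of n
        # stream orderings (e.g. [101, 102, 103] for three events; values
        # illustrative). The reserved IDs are only published to readers once
        # the `with` block below exits.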
        with stream_ordering_manager as stream_orderings:
            for (event, context), stream in zip(events_and_contexts, stream_orderings):
                event.internal_metadata.stream_ordering = stream

            yield self.db.runInteraction(
                "persist_events",
                self._persist_events_txn,
                events_and_contexts=events_and_contexts,
                backfilled=backfilled,
                delete_existing=delete_existing,
                state_delta_for_room=state_delta_for_room,
                new_forward_extremeties=new_forward_extremeties,
            )
            persist_event_counter.inc(len(events_and_contexts))

            if not backfilled:
                # backfilled events have negative stream orderings, so we don't
                # want to set the event_persisted_position to that.
                synapse.metrics.event_persisted_position.set(
                    events_and_contexts[-1][0].internal_metadata.stream_ordering
                )

            for event, context in events_and_contexts:
                if context.app_service:
                    origin_type = "local"
                    origin_entity = context.app_service.id
                elif self.hs.is_mine_id(event.sender):
                    origin_type = "local"
                    origin_entity = "*client*"
                else:
                    origin_type = "remote"
                    origin_entity = get_domain_from_id(event.sender)

                event_counter.labels(event.type, origin_type, origin_entity).inc()

            for room_id, new_state in current_state_for_room.items():
                self.store.get_current_state_ids.prefill((room_id,), new_state)

            for room_id, latest_event_ids in new_forward_extremeties.items():
                self.store.get_latest_event_ids_in_room.prefill(
                    (room_id,), list(latest_event_ids)
                )

    @defer.inlineCallbacks
    def _get_events_which_are_prevs(self, event_ids):
        """Filter the supplied list of event_ids to get those which are prev_events of
        existing (non-outlier/rejected) events.

        Args:
            event_ids (Iterable[str]): event ids to filter

        Returns:
            Deferred[List[str]]: filtered event ids
        """
        results = []

        def _get_events_which_are_prevs_txn(txn, batch):
            sql = """
            SELECT prev_event_id, internal_metadata
            FROM event_edges
                INNER JOIN events USING (event_id)
                LEFT JOIN rejections USING (event_id)
                LEFT JOIN event_json USING (event_id)
            WHERE
                NOT events.outlier
                AND rejections.event_id IS NULL
                AND
            """

            clause, args = make_in_list_sql_clause(
                self.database_engine, "prev_event_id", batch
            )
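            # On Postgres this typically expands to "prev_event_id = ANY(?)"
            # with a single array parameter; on SQLite it becomes
            # "prev_event_id IN (?,?,...)" (forms illustrative).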
            txn.execute(sql + clause, args)
            results.extend(r[0] for r in txn if not json.loads(r[1]).get("soft_failed"))

        for chunk in batch_iter(event_ids, 100):
            yield self.db.runInteraction(
                "_get_events_which_are_prevs", _get_events_which_are_prevs_txn, chunk
            )

        return results

    @defer.inlineCallbacks
    def _get_prevs_before_rejected(self, event_ids):
        """Get soft-failed ancestors to remove from the extremities.

        Given a set of events, find all those that have been soft-failed or
        rejected. Returns those soft failed/rejected events and their prev
        events (whether soft-failed/rejected or not), and recurses up the
        prev-event graph until it finds no more soft-failed/rejected events.

        This is used to find extremities that are ancestors of new events, but
        are separated by soft failed events.

        Args:
            event_ids (Iterable[str]): Events to find prev events for. Note
                that these must have already been persisted.

        Returns:
            Deferred[set[str]]
        """
        # The set of event_ids to return. This includes all soft-failed events
        # and their prev events.
        existing_prevs = set()

        def _get_prevs_before_rejected_txn(txn, batch):
            to_recursively_check = batch

            while to_recursively_check:
                sql = """
                SELECT
                    event_id, prev_event_id, internal_metadata,
                    rejections.event_id IS NOT NULL
                FROM event_edges
                    INNER JOIN events USING (event_id)
                    LEFT JOIN rejections USING (event_id)
                    LEFT JOIN event_json USING (event_id)
                WHERE
                    NOT events.outlier
                    AND
                """

                clause, args = make_in_list_sql_clause(
                    self.database_engine, "event_id", to_recursively_check
                )

                txn.execute(sql + clause, args)
                to_recursively_check = []

                for event_id, prev_event_id, metadata, rejected in txn:
                    if prev_event_id in existing_prevs:
                        continue

                    soft_failed = json.loads(metadata).get("soft_failed")
                    if soft_failed or rejected:
                        to_recursively_check.append(prev_event_id)
                        existing_prevs.add(prev_event_id)

        for chunk in batch_iter(event_ids, 100):
            yield self.db.runInteraction(
                "_get_prevs_before_rejected", _get_prevs_before_rejected_txn, chunk
            )

        return existing_prevs

    @log_function
    def _persist_events_txn(
        self,
        txn: LoggingTransaction,
        events_and_contexts: List[Tuple[EventBase, EventContext]],
        backfilled: bool,
        delete_existing: bool = False,
        state_delta_for_room: Dict[str, DeltaState] = {},
        new_forward_extremeties: Dict[str, List[str]] = {},
    ):
        """Insert some number of room events into the necessary database tables.

        Rejected events are only inserted into the events table, the events_json table,
        and the rejections table. Things reading from those tables will need to check
        whether the event was rejected.

        Args:
            txn
            events_and_contexts: events to persist
            backfilled: True if the events were backfilled
            delete_existing: True to purge existing table rows for the events
                from the database. This is useful when retrying due to
                IntegrityError.
            state_delta_for_room: The current-state delta for each room.
            new_forward_extremeties: The new forward extremities for each room.
                For each room, a list of the event ids which are the forward
                extremities.
        """
        all_events_and_contexts = events_and_contexts

        min_stream_order = events_and_contexts[0][0].internal_metadata.stream_ordering
        max_stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering

        self._update_forward_extremities_txn(
            txn,
            new_forward_extremities=new_forward_extremeties,
            max_stream_order=max_stream_order,
        )

        # Ensure that we don't have the same event twice.
        events_and_contexts = self._filter_events_and_contexts_for_duplicates(
            events_and_contexts
        )

        self._update_room_depths_txn(
            txn, events_and_contexts=events_and_contexts, backfilled=backfilled
        )

        # _update_outliers_txn filters out any events which have already been
        # persisted, and returns the filtered list.
        events_and_contexts = self._update_outliers_txn(
            txn, events_and_contexts=events_and_contexts
        )

        # From this point onwards the events are only events that we haven't
        # seen before.

        if delete_existing:
            # For paranoia reasons, we go and delete all the existing entries
            # for these events so we can reinsert them.
            # This gets around any problems with some tables already having
            # entries.
            self._delete_existing_rows_txn(txn, events_and_contexts=events_and_contexts)

        self._store_event_txn(txn, events_and_contexts=events_and_contexts)

        # Insert into event_to_state_groups.
        self._store_event_state_mappings_txn(txn, events_and_contexts)

        # We want to store event_auth mappings for rejected events, as they're
        # used in state res v2.
        # This is only necessary if the rejected event appears in an accepted
        # event's auth chain, but it's easier for now just to store them (and
        # it doesn't take much storage compared to storing the entire event
        # anyway).
        self.db.simple_insert_many_txn(
            txn,
            table="event_auth",
            values=[
                {
                    "event_id": event.event_id,
                    "room_id": event.room_id,
                    "auth_id": auth_id,
                }
                for event, _ in events_and_contexts
                for auth_id in event.auth_event_ids()
                if event.is_state()
            ],
        )

        # _store_rejected_events_txn filters out any events which were
        # rejected, and returns the filtered list.
        events_and_contexts = self._store_rejected_events_txn(
            txn, events_and_contexts=events_and_contexts
        )

        # From this point onwards the events are only ones that weren't
        # rejected.

        self._update_metadata_tables_txn(
            txn,
            events_and_contexts=events_and_contexts,
            all_events_and_contexts=all_events_and_contexts,
            backfilled=backfilled,
        )

        # We call this last as it assumes we've inserted the events into
        # room_memberships, where applicable.
        self._update_current_state_txn(txn, state_delta_for_room, min_stream_order)

    def _update_current_state_txn(
        self,
        txn: LoggingTransaction,
        state_delta_by_room: Dict[str, DeltaState],
        stream_id: int,
    ):
        for room_id, delta_state in state_delta_by_room.items():
            to_delete = delta_state.to_delete
            to_insert = delta_state.to_insert

            if delta_state.no_longer_in_room:
                # Server is no longer in the room so we delete the room from
                # current_state_events, being careful we've already updated the
                # rooms.room_version column (which gets populated in a
                # background task).
                self._upsert_room_version_txn(txn, room_id)

                # Before deleting we populate the current_state_delta_stream
                # so that async background tasks get told what happened.
                sql = """
                    INSERT INTO current_state_delta_stream
                        (stream_id, room_id, type, state_key, event_id, prev_event_id)
                    SELECT ?, room_id, type, state_key, null, event_id
                        FROM current_state_events
                        WHERE room_id = ?
                """
                txn.execute(sql, (stream_id, room_id))

                self.db.simple_delete_txn(
                    txn, table="current_state_events", keyvalues={"room_id": room_id},
                )
            else:
                # We're still in the room, so we update the current state as normal.

                # First we add entries to the current_state_delta_stream. We
                # do this before updating the current_state_events table so
                # that we can use it to calculate the `prev_event_id`. (This
                # allows us to not have to pull out the existing state
                # unnecessarily).
                #
                # The stream_id for the update is chosen to be the minimum of the stream_ids
                # for the batch of the events that we are persisting; that means we do not
                # end up in a situation where workers see events before the
                # current_state_delta updates.
                #
                sql = """
                    INSERT INTO current_state_delta_stream
                        (stream_id, room_id, type, state_key, event_id, prev_event_id)
                    SELECT ?, ?, ?, ?, ?, (
                        SELECT event_id FROM current_state_events
                        WHERE room_id = ? AND type = ? AND state_key = ?
                    )
                """
                txn.executemany(
                    sql,
                    (
                        (
                            stream_id,
                            room_id,
                            etype,
                            state_key,
                            to_insert.get((etype, state_key)),
                            room_id,
                            etype,
                            state_key,
                        )
                        for etype, state_key in itertools.chain(to_delete, to_insert)
                    ),
                )

                # Now we actually update the current_state_events table

                txn.executemany(
                    "DELETE FROM current_state_events"
                    " WHERE room_id = ? AND type = ? AND state_key = ?",
                    (
                        (room_id, etype, state_key)
                        for etype, state_key in itertools.chain(to_delete, to_insert)
                    ),
                )

                # We include the membership in the current state table, hence we do
                # a lookup when we insert. This assumes that all events have already
                # been inserted into room_memberships.
                txn.executemany(
                    """INSERT INTO current_state_events
                        (room_id, type, state_key, event_id, membership)
                    VALUES (?, ?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?))
                    """,
                    [
                        (room_id, key[0], key[1], ev_id, ev_id)
                        for key, ev_id in to_insert.items()
                    ],
                )

            # We now update `local_current_membership`. We do this regardless
            # of whether we're still in the room or not to handle the case where
            # e.g. we just got banned (where we need to record that fact here).

            # Note: Do we really want to delete rows here (that we do not
            # subsequently reinsert below)? While technically correct it means
            # we have no record of the fact the user *was* a member of the
            # room but got, say, state reset out of it.
            if to_delete or to_insert:
                txn.executemany(
                    "DELETE FROM local_current_membership"
                    " WHERE room_id = ? AND user_id = ?",
                    (
                        (room_id, state_key)
                        for etype, state_key in itertools.chain(to_delete, to_insert)
                        if etype == EventTypes.Member and self.is_mine_id(state_key)
                    ),
                )

            if to_insert:
                txn.executemany(
                    """INSERT INTO local_current_membership
                        (room_id, user_id, event_id, membership)
                    VALUES (?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?))
                    """,
                    [
                        (room_id, key[1], ev_id, ev_id)
                        for key, ev_id in to_insert.items()
                        if key[0] == EventTypes.Member and self.is_mine_id(key[1])
                    ],
                )

            txn.call_after(
                self.store._curr_state_delta_stream_cache.entity_has_changed,
                room_id,
                stream_id,
            )

            # Invalidate the various caches

            # Figure out the changes of membership to invalidate the
            # `get_rooms_for_user` cache.
            # We find out which membership events we may have deleted
            # and which we have added, then we invalidate the caches for all
            # those users.
            members_changed = {
                state_key
                for ev_type, state_key in itertools.chain(to_delete, to_insert)
                if ev_type == EventTypes.Member
            }

            for member in members_changed:
                txn.call_after(
                    self.store.get_rooms_for_user_with_stream_ordering.invalidate,
                    (member,),
                )

            self.store._invalidate_state_caches_and_stream(
                txn, room_id, members_changed
            )

    def _upsert_room_version_txn(self, txn: LoggingTransaction, room_id: str):
        """Update the room version in the database based off current state
        events.

        This is used when we're about to delete current state and we want to
        ensure that the `rooms.room_version` column is up to date.
        """
  520. sql = """
  521. SELECT json FROM event_json
  522. INNER JOIN current_state_events USING (room_id, event_id)
  523. WHERE room_id = ? AND type = ? AND state_key = ?
  524. """
  525. txn.execute(sql, (room_id, EventTypes.Create, ""))
  526. row = txn.fetchone()
  527. if row:
  528. event_json = json.loads(row[0])
  529. content = event_json.get("content", {})
  530. creator = content.get("creator")
  531. room_version_id = content.get("room_version", RoomVersions.V1.identifier)
  532. self.db.simple_upsert_txn(
  533. txn,
  534. table="rooms",
  535. keyvalues={"room_id": room_id},
  536. values={"room_version": room_version_id},
  537. insertion_values={"is_public": False, "creator": creator},
  538. )

    def _update_forward_extremities_txn(
        self, txn, new_forward_extremities, max_stream_order
    ):
        for room_id, new_extrem in new_forward_extremities.items():
            self.db.simple_delete_txn(
                txn, table="event_forward_extremities", keyvalues={"room_id": room_id}
            )
            txn.call_after(
                self.store.get_latest_event_ids_in_room.invalidate, (room_id,)
            )

        self.db.simple_insert_many_txn(
            txn,
            table="event_forward_extremities",
            values=[
                {"event_id": ev_id, "room_id": room_id}
                for room_id, new_extrem in new_forward_extremities.items()
                for ev_id in new_extrem
            ],
        )

        # We now insert into stream_ordering_to_exterm a mapping from room_id,
        # new stream_ordering to new forward extremities in the room.
        # This allows us to later efficiently look up the forward extremities
        # for a room before a given stream_ordering
        self.db.simple_insert_many_txn(
            txn,
            table="stream_ordering_to_exterm",
            values=[
                {
                    "room_id": room_id,
                    "event_id": event_id,
                    "stream_ordering": max_stream_order,
                }
                for room_id, new_extrem in new_forward_extremities.items()
                for event_id in new_extrem
            ],
        )

    @classmethod
    def _filter_events_and_contexts_for_duplicates(cls, events_and_contexts):
        """Ensure that we don't have the same event twice.

        Pick the earliest non-outlier if there is one, else the earliest one.

        Args:
            events_and_contexts (list[(EventBase, EventContext)]):

        Returns:
            list[(EventBase, EventContext)]: filtered list
        """
        new_events_and_contexts = OrderedDict()
        for event, context in events_and_contexts:
            prev_event_context = new_events_and_contexts.get(event.event_id)
            if prev_event_context:
                if not event.internal_metadata.is_outlier():
                    if prev_event_context[0].internal_metadata.is_outlier():
                        # To ensure correct ordering we pop, as OrderedDict is
                        # ordered by first insertion.
                        new_events_and_contexts.pop(event.event_id, None)
                        new_events_and_contexts[event.event_id] = (event, context)
            else:
                new_events_and_contexts[event.event_id] = (event, context)
        return list(new_events_and_contexts.values())

    def _update_room_depths_txn(self, txn, events_and_contexts, backfilled):
        """Update min_depth for each room

        Args:
            txn (twisted.enterprise.adbapi.Connection): db connection
            events_and_contexts (list[(EventBase, EventContext)]): events
                we are persisting
            backfilled (bool): True if the events were backfilled
        """
        depth_updates = {}
        for event, context in events_and_contexts:
            # Remove any existing cache entries for the event_ids
            txn.call_after(self.store._invalidate_get_event_cache, event.event_id)
            if not backfilled:
                txn.call_after(
                    self.store._events_stream_cache.entity_has_changed,
                    event.room_id,
                    event.internal_metadata.stream_ordering,
                )

            if not event.internal_metadata.is_outlier() and not context.rejected:
                depth_updates[event.room_id] = max(
                    event.depth, depth_updates.get(event.room_id, event.depth)
                )

        for room_id, depth in depth_updates.items():
            self._update_min_depth_for_room_txn(txn, room_id, depth)

    def _update_outliers_txn(self, txn, events_and_contexts):
        """Update any outliers with new event info.

        This turns outliers into ex-outliers (unless the new event was
        rejected).

        Args:
            txn (twisted.enterprise.adbapi.Connection): db connection
            events_and_contexts (list[(EventBase, EventContext)]): events
                we are persisting

        Returns:
            list[(EventBase, EventContext)] new list, without events which
            are already in the events table.
        """
        txn.execute(
            "SELECT event_id, outlier FROM events WHERE event_id in (%s)"
            % (",".join(["?"] * len(events_and_contexts)),),
            [event.event_id for event, _ in events_and_contexts],
        )

        have_persisted = {event_id: outlier for event_id, outlier in txn}
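        # have_persisted maps already-stored event IDs to whether the stored
        # copy is an outlier, e.g. {"$ev1": True, "$ev2": False} (values
        # illustrative).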

        to_remove = set()
        for event, context in events_and_contexts:
            if event.event_id not in have_persisted:
                continue

            to_remove.add(event)

            if context.rejected:
                # If the event is rejected then we don't care if the event
                # was an outlier or not.
                continue

            outlier_persisted = have_persisted[event.event_id]
            if not event.internal_metadata.is_outlier() and outlier_persisted:
                # We received a copy of an event that we had already stored as
                # an outlier in the database. We now have some state at that
                # event, so we need to update the state_groups table with that
                # state.

                # insert into event_to_state_groups.
                try:
                    self._store_event_state_mappings_txn(txn, ((event, context),))
                except Exception:
                    logger.exception("")
                    raise

                metadata_json = encode_json(event.internal_metadata.get_dict())

                sql = "UPDATE event_json SET internal_metadata = ? WHERE event_id = ?"
                txn.execute(sql, (metadata_json, event.event_id))

                # Add an entry to the ex_outlier_stream table to replicate the
                # change in outlier status to our workers.
                stream_order = event.internal_metadata.stream_ordering
                state_group_id = context.state_group
                self.db.simple_insert_txn(
                    txn,
                    table="ex_outlier_stream",
                    values={
                        "event_stream_ordering": stream_order,
                        "event_id": event.event_id,
                        "state_group": state_group_id,
                    },
                )

                sql = "UPDATE events SET outlier = ? WHERE event_id = ?"
                txn.execute(sql, (False, event.event_id))

                # Update the event_backward_extremities table now that this
                # event isn't an outlier any more.
                self._update_backward_extremeties(txn, [event])

        return [ec for ec in events_and_contexts if ec[0] not in to_remove]

    @classmethod
    def _delete_existing_rows_txn(cls, txn, events_and_contexts):
        if not events_and_contexts:
            # nothing to do here
            return

        logger.info("Deleting existing")

        for table in (
            "events",
            "event_auth",
            "event_json",
            "event_edges",
            "event_forward_extremities",
            "event_reference_hashes",
            "event_search",
            "event_to_state_groups",
            "local_invites",
            "state_events",
            "rejections",
            "redactions",
            "room_memberships",
        ):
            txn.executemany(
                "DELETE FROM %s WHERE event_id = ?" % (table,),
                [(ev.event_id,) for ev, _ in events_and_contexts],
            )

        for table in ("event_push_actions",):
            txn.executemany(
                "DELETE FROM %s WHERE room_id = ? AND event_id = ?" % (table,),
                [(ev.room_id, ev.event_id) for ev, _ in events_and_contexts],
            )

    def _store_event_txn(self, txn, events_and_contexts):
        """Insert new events into the event and event_json tables

        Args:
            txn (twisted.enterprise.adbapi.Connection): db connection
            events_and_contexts (list[(EventBase, EventContext)]): events
                we are persisting
        """

        if not events_and_contexts:
            # nothing to do here
            return

        def event_dict(event):
            d = event.get_dict()
            d.pop("redacted", None)
            d.pop("redacted_because", None)
            return d

        self.db.simple_insert_many_txn(
            txn,
            table="event_json",
            values=[
                {
                    "event_id": event.event_id,
                    "room_id": event.room_id,
                    "internal_metadata": encode_json(
                        event.internal_metadata.get_dict()
                    ),
                    "json": encode_json(event_dict(event)),
                    "format_version": event.format_version,
                }
                for event, _ in events_and_contexts
            ],
        )

        self.db.simple_insert_many_txn(
            txn,
            table="events",
            values=[
                {
                    "stream_ordering": event.internal_metadata.stream_ordering,
                    "topological_ordering": event.depth,
                    "depth": event.depth,
                    "event_id": event.event_id,
                    "room_id": event.room_id,
                    "type": event.type,
                    "processed": True,
                    "outlier": event.internal_metadata.is_outlier(),
                    "origin_server_ts": int(event.origin_server_ts),
                    "received_ts": self._clock.time_msec(),
                    "sender": event.sender,
                    "contains_url": (
                        "url" in event.content and isinstance(event.content["url"], str)
                    ),
                }
                for event, _ in events_and_contexts
            ],
        )

        for event, _ in events_and_contexts:
            if not event.internal_metadata.is_redacted():
                # If we're persisting an unredacted event we go and ensure
                # that we mark any redactions that reference this event as
                # requiring censoring.
                self.db.simple_update_txn(
                    txn,
                    table="redactions",
                    keyvalues={"redacts": event.event_id},
                    updatevalues={"have_censored": False},
                )

    def _store_rejected_events_txn(self, txn, events_and_contexts):
        """Add rows to the 'rejections' table for received events which were
        rejected

        Args:
            txn (twisted.enterprise.adbapi.Connection): db connection
            events_and_contexts (list[(EventBase, EventContext)]): events
                we are persisting

        Returns:
            list[(EventBase, EventContext)] new list, without the rejected
                events.
        """
        # Remove the rejected events from the list now that we've added them
        # to the events table and the events_json table.
        to_remove = set()
        for event, context in events_and_contexts:
            if context.rejected:
                # Insert the event_id into the rejections table
                self._store_rejections_txn(txn, event.event_id, context.rejected)
                to_remove.add(event)

        return [ec for ec in events_and_contexts if ec[0] not in to_remove]

    def _update_metadata_tables_txn(
        self, txn, events_and_contexts, all_events_and_contexts, backfilled
    ):
        """Update all the miscellaneous tables for new events

        Args:
            txn (twisted.enterprise.adbapi.Connection): db connection
            events_and_contexts (list[(EventBase, EventContext)]): events
                we are persisting
            all_events_and_contexts (list[(EventBase, EventContext)]): all
                events that we were going to persist. This includes events
                we've already persisted, etc, that wouldn't appear in
                events_and_contexts.
            backfilled (bool): True if the events were backfilled
        """

        # Insert all the push actions into the event_push_actions table.
        self._set_push_actions_for_event_and_users_txn(
            txn,
            events_and_contexts=events_and_contexts,
            all_events_and_contexts=all_events_and_contexts,
        )

        if not events_and_contexts:
            # nothing to do here
            return

        for event, context in events_and_contexts:
            if event.type == EventTypes.Redaction and event.redacts is not None:
                # Remove the entries in the event_push_actions table for the
                # redacted event.
                self._remove_push_actions_for_event_id_txn(
                    txn, event.room_id, event.redacts
                )

                # Remove from relations table.
                self._handle_redaction(txn, event.redacts)

        # Update the event_forward_extremities, event_backward_extremities and
        # event_edges tables.
        self._handle_mult_prev_events(
            txn, events=[event for event, _ in events_and_contexts]
        )

        for event, _ in events_and_contexts:
            if event.type == EventTypes.Name:
                # Insert into the event_search table.
                self._store_room_name_txn(txn, event)
            elif event.type == EventTypes.Topic:
                # Insert into the event_search table.
                self._store_room_topic_txn(txn, event)
            elif event.type == EventTypes.Message:
                # Insert into the event_search table.
                self._store_room_message_txn(txn, event)
            elif event.type == EventTypes.Redaction and event.redacts is not None:
                # Insert into the redactions table.
                self._store_redaction(txn, event)
            elif event.type == EventTypes.Retention:
                # Update the room_retention table.
                self._store_retention_policy_for_room_txn(txn, event)

            self._handle_event_relations(txn, event)

            # Store the labels for this event.
            labels = event.content.get(EventContentFields.LABELS)
            if labels:
                self.insert_labels_for_event_txn(
                    txn, event.event_id, labels, event.room_id, event.depth
                )

            if self._ephemeral_messages_enabled:
                # If there's an expiry timestamp on the event, store it.
                expiry_ts = event.content.get(EventContentFields.SELF_DESTRUCT_AFTER)
                if isinstance(expiry_ts, int) and not event.is_state():
                    self._insert_event_expiry_txn(txn, event.event_id, expiry_ts)

        # Insert into the room_memberships table.
        self._store_room_members_txn(
            txn,
            [
                event
                for event, _ in events_and_contexts
                if event.type == EventTypes.Member
            ],
            backfilled=backfilled,
        )

        # Insert event_reference_hashes table.
        self._store_event_reference_hashes_txn(
            txn, [event for event, _ in events_and_contexts]
        )

        state_events_and_contexts = [
            ec for ec in events_and_contexts if ec[0].is_state()
        ]

        state_values = []
        for event, context in state_events_and_contexts:
            vals = {
                "event_id": event.event_id,
                "room_id": event.room_id,
                "type": event.type,
                "state_key": event.state_key,
            }

            # TODO: How does this work with backfilling?
            if hasattr(event, "replaces_state"):
                vals["prev_state"] = event.replaces_state

            state_values.append(vals)

        self.db.simple_insert_many_txn(txn, table="state_events", values=state_values)

        # Prefill the event cache
        self._add_to_cache(txn, events_and_contexts)

    def _add_to_cache(self, txn, events_and_contexts):
        to_prefill = []

        rows = []
        N = 200
        for i in range(0, len(events_and_contexts), N):
            ev_map = {e[0].event_id: e[0] for e in events_and_contexts[i : i + N]}
            if not ev_map:
                break

            sql = (
                "SELECT "
                " e.event_id as event_id, "
                " r.redacts as redacts,"
                " rej.event_id as rejects "
                " FROM events as e"
                " LEFT JOIN rejections as rej USING (event_id)"
                " LEFT JOIN redactions as r ON e.event_id = r.redacts"
                " WHERE "
            )

            clause, args = make_in_list_sql_clause(
                self.database_engine, "e.event_id", list(ev_map)
            )

            txn.execute(sql + clause, args)
            rows = self.db.cursor_to_dict(txn)
            for row in rows:
                event = ev_map[row["event_id"]]
                if not row["rejects"] and not row["redacts"]:
                    to_prefill.append(
                        _EventCacheEntry(event=event, redacted_event=None)
                    )

        def prefill():
            for cache_entry in to_prefill:
                self.store._get_event_cache.prefill(
                    (cache_entry[0].event_id,), cache_entry
                )

        txn.call_after(prefill)

    def _store_redaction(self, txn, event):
        # invalidate the cache for the redacted event
        txn.call_after(self.store._invalidate_get_event_cache, event.redacts)

        self.db.simple_insert_txn(
            txn,
            table="redactions",
            values={
                "event_id": event.event_id,
                "redacts": event.redacts,
                "received_ts": self._clock.time_msec(),
            },
        )

    def insert_labels_for_event_txn(
        self, txn, event_id, labels, room_id, topological_ordering
    ):
        """Store the mapping between an event's ID and its labels, with one row per
        (event_id, label) tuple.

        Args:
            txn (LoggingTransaction): The transaction to execute.
            event_id (str): The event's ID.
            labels (list[str]): A list of text labels.
            room_id (str): The ID of the room the event was sent to.
            topological_ordering (int): The position of the event in the room's topology.
        """
        return self.db.simple_insert_many_txn(
            txn=txn,
            table="event_labels",
            values=[
                {
                    "event_id": event_id,
                    "label": label,
                    "room_id": room_id,
                    "topological_ordering": topological_ordering,
                }
                for label in labels
            ],
        )

    def _insert_event_expiry_txn(self, txn, event_id, expiry_ts):
        """Save the expiry timestamp associated with a given event ID.

        Args:
            txn (LoggingTransaction): The database transaction to use.
            event_id (str): The event ID the expiry timestamp is associated with.
            expiry_ts (int): The timestamp at which to expire (delete) the event.
        """
        return self.db.simple_insert_txn(
            txn=txn,
            table="event_expiry",
            values={"event_id": event_id, "expiry_ts": expiry_ts},
        )

    def _store_event_reference_hashes_txn(self, txn, events):
        """Store a hash for a PDU

        Args:
            txn (cursor):
            events (list): list of Events.
        """

        vals = []
        for event in events:
            ref_alg, ref_hash_bytes = compute_event_reference_hash(event)
            vals.append(
                {
                    "event_id": event.event_id,
                    "algorithm": ref_alg,
                    "hash": memoryview(ref_hash_bytes),
                }
            )

        self.db.simple_insert_many_txn(txn, table="event_reference_hashes", values=vals)

    def _store_room_members_txn(self, txn, events, backfilled):
        """Store room membership events in the database.
        """
        self.db.simple_insert_many_txn(
            txn,
            table="room_memberships",
            values=[
                {
                    "event_id": event.event_id,
                    "user_id": event.state_key,
                    "sender": event.user_id,
                    "room_id": event.room_id,
                    "membership": event.membership,
                    "display_name": event.content.get("displayname", None),
                    "avatar_url": event.content.get("avatar_url", None),
                }
                for event in events
            ],
        )

        for event in events:
            txn.call_after(
                self.store._membership_stream_cache.entity_has_changed,
                event.state_key,
                event.internal_metadata.stream_ordering,
            )
            txn.call_after(
                self.store.get_invited_rooms_for_local_user.invalidate,
                (event.state_key,),
            )

            # We update the local_invites table only if the event is "current",
            # i.e., it's something that has just happened. If the event is an
            # outlier it is only current if it's an "out of band membership",
            # like a remote invite or a rejection of a remote invite.
            is_new_state = not backfilled and (
                not event.internal_metadata.is_outlier()
                or event.internal_metadata.is_out_of_band_membership()
            )
            is_mine = self.is_mine_id(event.state_key)
            if is_new_state and is_mine:
                if event.membership == Membership.INVITE:
                    self.db.simple_insert_txn(
                        txn,
                        table="local_invites",
                        values={
                            "event_id": event.event_id,
                            "invitee": event.state_key,
                            "inviter": event.sender,
                            "room_id": event.room_id,
                            "stream_id": event.internal_metadata.stream_ordering,
                        },
                    )
                else:
                    sql = (
                        "UPDATE local_invites SET stream_id = ?, replaced_by = ? WHERE"
                        " room_id = ? AND invitee = ? AND locally_rejected is NULL"
                        " AND replaced_by is NULL"
                    )

                    txn.execute(
                        sql,
                        (
                            event.internal_metadata.stream_ordering,
                            event.event_id,
                            event.room_id,
                            event.state_key,
                        ),
                    )

                # We also update the `local_current_membership` table with
                # latest invite info. This will usually get updated by the
                # `current_state_events` handling, unless it's an outlier.
                if event.internal_metadata.is_outlier():
                    # This should only happen for out of band memberships, so
                    # we add a paranoia check.
                    assert event.internal_metadata.is_out_of_band_membership()

                    self.db.simple_upsert_txn(
                        txn,
                        table="local_current_membership",
                        keyvalues={
                            "room_id": event.room_id,
                            "user_id": event.state_key,
                        },
                        values={
                            "event_id": event.event_id,
                            "membership": event.membership,
                        },
                    )

    def _handle_event_relations(self, txn, event):
        """Handles inserting relation data during persistence of events

        Args:
            txn
            event (EventBase)
        """
        relation = event.content.get("m.relates_to")
        if not relation:
            # No relations
            return

        rel_type = relation.get("rel_type")
        if rel_type not in (
            RelationTypes.ANNOTATION,
            RelationTypes.REFERENCE,
            RelationTypes.REPLACE,
        ):
            # Unknown relation type
            return

        parent_id = relation.get("event_id")
        if not parent_id:
            # Invalid relation
            return

        aggregation_key = relation.get("key")

        self.db.simple_insert_txn(
            txn,
            table="event_relations",
            values={
                "event_id": event.event_id,
                "relates_to_id": parent_id,
                "relation_type": rel_type,
                "aggregation_key": aggregation_key,
            },
        )

        txn.call_after(self.store.get_relations_for_event.invalidate_many, (parent_id,))
        txn.call_after(
            self.store.get_aggregation_groups_for_event.invalidate_many, (parent_id,)
        )

        if rel_type == RelationTypes.REPLACE:
            txn.call_after(self.store.get_applicable_edit.invalidate, (parent_id,))

    def _handle_redaction(self, txn, redacted_event_id):
        """Handles receiving a redaction and checking whether we need to remove
        any redacted relations from the database.

        Args:
            txn
            redacted_event_id (str): The event that was redacted.
        """

        self.db.simple_delete_txn(
            txn, table="event_relations", keyvalues={"event_id": redacted_event_id}
        )

    def _store_room_topic_txn(self, txn, event):
        if hasattr(event, "content") and "topic" in event.content:
            self.store_event_search_txn(
                txn, event, "content.topic", event.content["topic"]
            )

    def _store_room_name_txn(self, txn, event):
        if hasattr(event, "content") and "name" in event.content:
            self.store_event_search_txn(
                txn, event, "content.name", event.content["name"]
            )

    def _store_room_message_txn(self, txn, event):
        if hasattr(event, "content") and "body" in event.content:
            self.store_event_search_txn(
                txn, event, "content.body", event.content["body"]
            )

    def _store_retention_policy_for_room_txn(self, txn, event):
        if hasattr(event, "content") and (
            "min_lifetime" in event.content or "max_lifetime" in event.content
        ):
            if (
                "min_lifetime" in event.content
                and not isinstance(event.content.get("min_lifetime"), int)
            ) or (
                "max_lifetime" in event.content
                and not isinstance(event.content.get("max_lifetime"), int)
            ):
                # Ignore the event if one of the values isn't an integer.
                return

            self.db.simple_insert_txn(
                txn=txn,
                table="room_retention",
                values={
                    "room_id": event.room_id,
                    "event_id": event.event_id,
                    "min_lifetime": event.content.get("min_lifetime"),
                    "max_lifetime": event.content.get("max_lifetime"),
                },
            )

            self.store._invalidate_cache_and_stream(
                txn, self.store.get_retention_policy_for_room, (event.room_id,)
            )

    def store_event_search_txn(self, txn, event, key, value):
        """Add event to the search table

        Args:
            txn (cursor):
            event (EventBase):
            key (str):
            value (str):
        """
        self.store.store_search_entries_txn(
            txn,
            (
                SearchEntry(
                    key=key,
                    value=value,
                    event_id=event.event_id,
                    room_id=event.room_id,
                    stream_ordering=event.internal_metadata.stream_ordering,
                    origin_server_ts=event.origin_server_ts,
                ),
            ),
        )

    def _set_push_actions_for_event_and_users_txn(
        self, txn, events_and_contexts, all_events_and_contexts
    ):
        """Handles moving push actions from staging table to main
        event_push_actions table for all events in `events_and_contexts`.

        Also ensures that all events in `all_events_and_contexts` are removed
        from the push action staging area.

        Args:
            events_and_contexts (list[(EventBase, EventContext)]): events
                we are persisting
            all_events_and_contexts (list[(EventBase, EventContext)]): all
                events that we were going to persist. This includes events
                we've already persisted, etc, that wouldn't appear in
                events_and_contexts.
        """
  1205. sql = """
  1206. INSERT INTO event_push_actions (
  1207. room_id, event_id, user_id, actions, stream_ordering,
  1208. topological_ordering, notif, highlight
  1209. )
  1210. SELECT ?, event_id, user_id, actions, ?, ?, notif, highlight
  1211. FROM event_push_actions_staging
  1212. WHERE event_id = ?
  1213. """
  1214. if events_and_contexts:
  1215. txn.executemany(
  1216. sql,
  1217. (
  1218. (
  1219. event.room_id,
  1220. event.internal_metadata.stream_ordering,
  1221. event.depth,
  1222. event.event_id,
  1223. )
  1224. for event, _ in events_and_contexts
  1225. ),
  1226. )
  1227. for event, _ in events_and_contexts:
  1228. user_ids = self.db.simple_select_onecol_txn(
  1229. txn,
  1230. table="event_push_actions_staging",
  1231. keyvalues={"event_id": event.event_id},
  1232. retcol="user_id",
  1233. )
  1234. for uid in user_ids:
  1235. txn.call_after(
  1236. self.store.get_unread_event_push_actions_by_room_for_user.invalidate_many,
  1237. (event.room_id, uid),
  1238. )
  1239. # Now we delete the staging area for *all* events that were being
  1240. # persisted.
  1241. txn.executemany(
  1242. "DELETE FROM event_push_actions_staging WHERE event_id = ?",
  1243. ((event.event_id,) for event, _ in all_events_and_contexts),
  1244. )

    def _remove_push_actions_for_event_id_txn(self, txn, room_id, event_id):
        # Sad that we have to blow away the cache for the whole room here
        txn.call_after(
            self.store.get_unread_event_push_actions_by_room_for_user.invalidate_many,
            (room_id,),
        )
        txn.execute(
            "DELETE FROM event_push_actions WHERE room_id = ? AND event_id = ?",
            (room_id, event_id),
        )

    def _store_rejections_txn(self, txn, event_id, reason):
        self.db.simple_insert_txn(
            txn,
            table="rejections",
            values={
                "event_id": event_id,
                "reason": reason,
                "last_check": self._clock.time_msec(),
            },
        )

    def _store_event_state_mappings_txn(
        self, txn, events_and_contexts: Iterable[Tuple[EventBase, EventContext]]
    ):
        state_groups = {}
        for event, context in events_and_contexts:
            if event.internal_metadata.is_outlier():
                continue

            # if the event was rejected, just give it the same state as its
            # predecessor.
            if context.rejected:
                state_groups[event.event_id] = context.state_group_before_event
                continue

            state_groups[event.event_id] = context.state_group

        self.db.simple_insert_many_txn(
            txn,
            table="event_to_state_groups",
            values=[
                {"state_group": state_group_id, "event_id": event_id}
                for event_id, state_group_id in state_groups.items()
            ],
        )

        for event_id, state_group_id in state_groups.items():
            txn.call_after(
                self.store._get_state_group_for_event.prefill,
                (event_id,),
                state_group_id,
            )

    def _update_min_depth_for_room_txn(self, txn, room_id, depth):
        min_depth = self.store._get_min_depth_interaction(txn, room_id)

        if min_depth is not None and depth >= min_depth:
            return

        self.db.simple_upsert_txn(
            txn,
            table="room_depth",
            keyvalues={"room_id": room_id},
            values={"min_depth": depth},
        )

    def _handle_mult_prev_events(self, txn, events):
        """
        For the given events, update the event edges table and forward and
        backward extremities tables.
        """
        self.db.simple_insert_many_txn(
            txn,
            table="event_edges",
            values=[
                {
                    "event_id": ev.event_id,
                    "prev_event_id": e_id,
                    "room_id": ev.room_id,
                    "is_state": False,
                }
                for ev in events
                for e_id in ev.prev_event_ids()
            ],
        )

        self._update_backward_extremeties(txn, events)

    def _update_backward_extremeties(self, txn, events):
        """Updates the event_backward_extremities tables based on the new/updated
        events being persisted.

        This is called for new events *and* for events that were outliers, but
        are now being persisted as non-outliers.

        Forward extremities are handled when we first start persisting the events.
        """
        events_by_room = {}
        for ev in events:
            events_by_room.setdefault(ev.room_id, []).append(ev)

        query = (
            "INSERT INTO event_backward_extremities (event_id, room_id)"
            " SELECT ?, ? WHERE NOT EXISTS ("
            " SELECT 1 FROM event_backward_extremities"
            " WHERE event_id = ? AND room_id = ?"
            " )"
            " AND NOT EXISTS ("
            " SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
            " AND outlier = ?"
            " )"
        )

        txn.executemany(
            query,
            [
                (e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id, False)
                for ev in events
                for e_id in ev.prev_event_ids()
                if not ev.internal_metadata.is_outlier()
            ],
        )

        query = (
            "DELETE FROM event_backward_extremities"
            " WHERE event_id = ? AND room_id = ?"
        )
        txn.executemany(
            query,
            [
                (ev.event_id, ev.room_id)
                for ev in events
                if not ev.internal_metadata.is_outlier()
            ],
        )

    async def locally_reject_invite(self, user_id: str, room_id: str) -> int:
        """Mark the invite as having been rejected even though we failed to
        create a leave event for it.
        """

        sql = (
            "UPDATE local_invites SET stream_id = ?, locally_rejected = ? WHERE"
            " room_id = ? AND invitee = ? AND locally_rejected is NULL"
            " AND replaced_by is NULL"
        )

        def f(txn, stream_ordering):
            txn.execute(sql, (stream_ordering, True, room_id, user_id))

            # We also clear this entry from `local_current_membership`.
            # Ideally we'd point to a leave event, but we don't have one, so
            # never mind.
            self.db.simple_delete_txn(
                txn,
                table="local_current_membership",
                keyvalues={"room_id": room_id, "user_id": user_id},
            )

        with self._stream_id_gen.get_next() as stream_ordering:
            await self.db.runInteraction("locally_reject_invite", f, stream_ordering)

        return stream_ordering