events.py 38 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2014-2016 OpenMarket Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. from ._base import SQLBaseStore, _RollbackButIsFineException
  16. from twisted.internet import defer, reactor
  17. from synapse.events import FrozenEvent, USE_FROZEN_DICTS
  18. from synapse.events.utils import prune_event
  19. from synapse.util.logcontext import preserve_fn, PreserveLoggingContext
  20. from synapse.util.logutils import log_function
  21. from synapse.api.constants import EventTypes
  22. from canonicaljson import encode_canonical_json
  23. from contextlib import contextmanager
  24. import logging
  25. import math
  26. import ujson as json
  27. logger = logging.getLogger(__name__)
  28. def encode_json(json_object):
  29. if USE_FROZEN_DICTS:
  30. # ujson doesn't like frozen_dicts
  31. return encode_canonical_json(json_object)
  32. else:
  33. return json.dumps(json_object, ensure_ascii=False)
# These values are used in the `_enqueue_events` and `_do_fetch` methods to
# control how we batch/bulk fetch events from the database.
# The values are plucked out of thin air to make initial sync run faster
# on jki.re
# TODO: Make these configurable.
EVENT_QUEUE_THREADS = 3  # Max number of threads that will fetch events
EVENT_QUEUE_ITERATIONS = 3  # No. times we block waiting for requests for events
EVENT_QUEUE_TIMEOUT_S = 0.1  # Timeout when waiting for requests for events
  42. class EventsStore(SQLBaseStore):
  43. EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
  44. def __init__(self, hs):
  45. super(EventsStore, self).__init__(hs)
  46. self.register_background_update_handler(
  47. self.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts
  48. )
  49. @defer.inlineCallbacks
  50. def persist_events(self, events_and_contexts, backfilled=False,
  51. is_new_state=True):
  52. if not events_and_contexts:
  53. return
  54. if backfilled:
  55. start = self.min_stream_token - 1
  56. self.min_stream_token -= len(events_and_contexts) + 1
  57. stream_orderings = range(start, self.min_stream_token, -1)
  58. @contextmanager
  59. def stream_ordering_manager():
  60. yield stream_orderings
  61. stream_ordering_manager = stream_ordering_manager()
  62. else:
  63. stream_ordering_manager = self._stream_id_gen.get_next_mult(
  64. len(events_and_contexts)
  65. )
  66. with stream_ordering_manager as stream_orderings:
  67. for (event, _), stream in zip(events_and_contexts, stream_orderings):
  68. event.internal_metadata.stream_ordering = stream
  69. chunks = [
  70. events_and_contexts[x:x + 100]
  71. for x in xrange(0, len(events_and_contexts), 100)
  72. ]
  73. for chunk in chunks:
  74. # We can't easily parallelize these since different chunks
  75. # might contain the same event. :(
  76. yield self.runInteraction(
  77. "persist_events",
  78. self._persist_events_txn,
  79. events_and_contexts=chunk,
  80. backfilled=backfilled,
  81. is_new_state=is_new_state,
  82. )
  83. @defer.inlineCallbacks
  84. @log_function
  85. def persist_event(self, event, context,
  86. is_new_state=True, current_state=None):
  87. try:
  88. with self._stream_id_gen.get_next() as stream_ordering:
  89. event.internal_metadata.stream_ordering = stream_ordering
  90. yield self.runInteraction(
  91. "persist_event",
  92. self._persist_event_txn,
  93. event=event,
  94. context=context,
  95. is_new_state=is_new_state,
  96. current_state=current_state,
  97. )
  98. except _RollbackButIsFineException:
  99. pass
  100. max_persisted_id = yield self._stream_id_gen.get_max_token()
  101. defer.returnValue((stream_ordering, max_persisted_id))
  102. @defer.inlineCallbacks
  103. def get_event(self, event_id, check_redacted=True,
  104. get_prev_content=False, allow_rejected=False,
  105. allow_none=False):
  106. """Get an event from the database by event_id.
  107. Args:
  108. event_id (str): The event_id of the event to fetch
  109. check_redacted (bool): If True, check if event has been redacted
  110. and redact it.
  111. get_prev_content (bool): If True and event is a state event,
  112. include the previous states content in the unsigned field.
  113. allow_rejected (bool): If True return rejected events.
  114. allow_none (bool): If True, return None if no event found, if
  115. False throw an exception.
  116. Returns:
  117. Deferred : A FrozenEvent.
  118. """
  119. events = yield self._get_events(
  120. [event_id],
  121. check_redacted=check_redacted,
  122. get_prev_content=get_prev_content,
  123. allow_rejected=allow_rejected,
  124. )
  125. if not events and not allow_none:
  126. raise RuntimeError("Could not find event %s" % (event_id,))
  127. defer.returnValue(events[0] if events else None)
  128. @defer.inlineCallbacks
  129. def get_events(self, event_ids, check_redacted=True,
  130. get_prev_content=False, allow_rejected=False):
  131. """Get events from the database
  132. Args:
  133. event_ids (list): The event_ids of the events to fetch
  134. check_redacted (bool): If True, check if event has been redacted
  135. and redact it.
  136. get_prev_content (bool): If True and event is a state event,
  137. include the previous states content in the unsigned field.
  138. allow_rejected (bool): If True return rejected events.
  139. Returns:
  140. Deferred : Dict from event_id to event.
  141. """
  142. events = yield self._get_events(
  143. event_ids,
  144. check_redacted=check_redacted,
  145. get_prev_content=get_prev_content,
  146. allow_rejected=allow_rejected,
  147. )
  148. defer.returnValue({e.event_id: e for e in events})
  149. @log_function
  150. def _persist_event_txn(self, txn, event, context,
  151. is_new_state=True, current_state=None):
  152. # We purposefully do this first since if we include a `current_state`
  153. # key, we *want* to update the `current_state_events` table
  154. if current_state:
  155. txn.call_after(self._get_current_state_for_key.invalidate_all)
  156. txn.call_after(self.get_rooms_for_user.invalidate_all)
  157. txn.call_after(self.get_users_in_room.invalidate, (event.room_id,))
  158. txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,))
  159. txn.call_after(self.get_room_name_and_aliases, event.room_id)
  160. self._simple_delete_txn(
  161. txn,
  162. table="current_state_events",
  163. keyvalues={"room_id": event.room_id},
  164. )
  165. for s in current_state:
  166. self._simple_insert_txn(
  167. txn,
  168. "current_state_events",
  169. {
  170. "event_id": s.event_id,
  171. "room_id": s.room_id,
  172. "type": s.type,
  173. "state_key": s.state_key,
  174. }
  175. )
  176. return self._persist_events_txn(
  177. txn,
  178. [(event, context)],
  179. backfilled=False,
  180. is_new_state=is_new_state,
  181. )
  182. @log_function
  183. def _persist_events_txn(self, txn, events_and_contexts, backfilled,
  184. is_new_state=True):
  185. depth_updates = {}
  186. for event, context in events_and_contexts:
  187. # Remove the any existing cache entries for the event_ids
  188. txn.call_after(self._invalidate_get_event_cache, event.event_id)
  189. if not backfilled:
  190. txn.call_after(
  191. self._events_stream_cache.entity_has_changed,
  192. event.room_id, event.internal_metadata.stream_ordering,
  193. )
  194. if not event.internal_metadata.is_outlier():
  195. depth_updates[event.room_id] = max(
  196. event.depth, depth_updates.get(event.room_id, event.depth)
  197. )
  198. if context.push_actions:
  199. self._set_push_actions_for_event_and_users_txn(
  200. txn, event, context.push_actions
  201. )
  202. if event.type == EventTypes.Redaction and event.redacts is not None:
  203. self._remove_push_actions_for_event_id_txn(
  204. txn, event.room_id, event.redacts
  205. )
  206. for room_id, depth in depth_updates.items():
  207. self._update_min_depth_for_room_txn(txn, room_id, depth)
  208. txn.execute(
  209. "SELECT event_id, outlier FROM events WHERE event_id in (%s)" % (
  210. ",".join(["?"] * len(events_and_contexts)),
  211. ),
  212. [event.event_id for event, _ in events_and_contexts]
  213. )
  214. have_persisted = {
  215. event_id: outlier
  216. for event_id, outlier in txn.fetchall()
  217. }
  218. event_map = {}
  219. to_remove = set()
  220. for event, context in events_and_contexts:
  221. # Handle the case of the list including the same event multiple
  222. # times. The tricky thing here is when they differ by whether
  223. # they are an outlier.
  224. if event.event_id in event_map:
  225. other = event_map[event.event_id]
  226. if not other.internal_metadata.is_outlier():
  227. to_remove.add(event)
  228. continue
  229. elif not event.internal_metadata.is_outlier():
  230. to_remove.add(event)
  231. continue
  232. else:
  233. to_remove.add(other)
  234. event_map[event.event_id] = event
  235. if event.event_id not in have_persisted:
  236. continue
  237. to_remove.add(event)
  238. outlier_persisted = have_persisted[event.event_id]
  239. if not event.internal_metadata.is_outlier() and outlier_persisted:
  240. self._store_state_groups_txn(
  241. txn, event, context,
  242. )
  243. metadata_json = encode_json(
  244. event.internal_metadata.get_dict()
  245. ).decode("UTF-8")
  246. sql = (
  247. "UPDATE event_json SET internal_metadata = ?"
  248. " WHERE event_id = ?"
  249. )
  250. txn.execute(
  251. sql,
  252. (metadata_json, event.event_id,)
  253. )
  254. sql = (
  255. "UPDATE events SET outlier = ?"
  256. " WHERE event_id = ?"
  257. )
  258. txn.execute(
  259. sql,
  260. (False, event.event_id,)
  261. )
  262. self._update_extremeties(txn, [event])
  263. events_and_contexts = filter(
  264. lambda ec: ec[0] not in to_remove,
  265. events_and_contexts
  266. )
  267. if not events_and_contexts:
  268. return
  269. self._store_mult_state_groups_txn(txn, [
  270. (event, context)
  271. for event, context in events_and_contexts
  272. if not event.internal_metadata.is_outlier()
  273. ])
  274. self._handle_mult_prev_events(
  275. txn,
  276. events=[event for event, _ in events_and_contexts],
  277. )
  278. for event, _ in events_and_contexts:
  279. if event.type == EventTypes.Name:
  280. self._store_room_name_txn(txn, event)
  281. elif event.type == EventTypes.Topic:
  282. self._store_room_topic_txn(txn, event)
  283. elif event.type == EventTypes.Message:
  284. self._store_room_message_txn(txn, event)
  285. elif event.type == EventTypes.Redaction:
  286. self._store_redaction(txn, event)
  287. elif event.type == EventTypes.RoomHistoryVisibility:
  288. self._store_history_visibility_txn(txn, event)
  289. elif event.type == EventTypes.GuestAccess:
  290. self._store_guest_access_txn(txn, event)
  291. self._store_room_members_txn(
  292. txn,
  293. [
  294. event
  295. for event, _ in events_and_contexts
  296. if event.type == EventTypes.Member
  297. ]
  298. )
  299. def event_dict(event):
  300. return {
  301. k: v
  302. for k, v in event.get_dict().items()
  303. if k not in [
  304. "redacted",
  305. "redacted_because",
  306. ]
  307. }
  308. self._simple_insert_many_txn(
  309. txn,
  310. table="event_json",
  311. values=[
  312. {
  313. "event_id": event.event_id,
  314. "room_id": event.room_id,
  315. "internal_metadata": encode_json(
  316. event.internal_metadata.get_dict()
  317. ).decode("UTF-8"),
  318. "json": encode_json(event_dict(event)).decode("UTF-8"),
  319. }
  320. for event, _ in events_and_contexts
  321. ],
  322. )
  323. self._simple_insert_many_txn(
  324. txn,
  325. table="events",
  326. values=[
  327. {
  328. "stream_ordering": event.internal_metadata.stream_ordering,
  329. "topological_ordering": event.depth,
  330. "depth": event.depth,
  331. "event_id": event.event_id,
  332. "room_id": event.room_id,
  333. "type": event.type,
  334. "processed": True,
  335. "outlier": event.internal_metadata.is_outlier(),
  336. "content": encode_json(event.content).decode("UTF-8"),
  337. "origin_server_ts": int(event.origin_server_ts),
  338. }
  339. for event, _ in events_and_contexts
  340. ],
  341. )
  342. if context.rejected:
  343. self._store_rejections_txn(
  344. txn, event.event_id, context.rejected
  345. )
  346. self._simple_insert_many_txn(
  347. txn,
  348. table="event_auth",
  349. values=[
  350. {
  351. "event_id": event.event_id,
  352. "room_id": event.room_id,
  353. "auth_id": auth_id,
  354. }
  355. for event, _ in events_and_contexts
  356. for auth_id, _ in event.auth_events
  357. ],
  358. )
  359. self._store_event_reference_hashes_txn(
  360. txn, [event for event, _ in events_and_contexts]
  361. )
  362. state_events_and_contexts = filter(
  363. lambda i: i[0].is_state(),
  364. events_and_contexts,
  365. )
  366. state_values = []
  367. for event, context in state_events_and_contexts:
  368. vals = {
  369. "event_id": event.event_id,
  370. "room_id": event.room_id,
  371. "type": event.type,
  372. "state_key": event.state_key,
  373. }
  374. # TODO: How does this work with backfilling?
  375. if hasattr(event, "replaces_state"):
  376. vals["prev_state"] = event.replaces_state
  377. state_values.append(vals)
  378. self._simple_insert_many_txn(
  379. txn,
  380. table="state_events",
  381. values=state_values,
  382. )
  383. self._simple_insert_many_txn(
  384. txn,
  385. table="event_edges",
  386. values=[
  387. {
  388. "event_id": event.event_id,
  389. "prev_event_id": prev_id,
  390. "room_id": event.room_id,
  391. "is_state": True,
  392. }
  393. for event, _ in state_events_and_contexts
  394. for prev_id, _ in event.prev_state
  395. ],
  396. )
  397. if is_new_state:
  398. for event, _ in state_events_and_contexts:
  399. if not context.rejected:
  400. txn.call_after(
  401. self._get_current_state_for_key.invalidate,
  402. (event.room_id, event.type, event.state_key,)
  403. )
  404. if event.type in [EventTypes.Name, EventTypes.Aliases]:
  405. txn.call_after(
  406. self.get_room_name_and_aliases.invalidate,
  407. (event.room_id,)
  408. )
  409. self._simple_upsert_txn(
  410. txn,
  411. "current_state_events",
  412. keyvalues={
  413. "room_id": event.room_id,
  414. "type": event.type,
  415. "state_key": event.state_key,
  416. },
  417. values={
  418. "event_id": event.event_id,
  419. }
  420. )
  421. return
  422. def _store_redaction(self, txn, event):
  423. # invalidate the cache for the redacted event
  424. txn.call_after(self._invalidate_get_event_cache, event.redacts)
  425. txn.execute(
  426. "INSERT INTO redactions (event_id, redacts) VALUES (?,?)",
  427. (event.event_id, event.redacts)
  428. )
  429. def have_events(self, event_ids):
  430. """Given a list of event ids, check if we have already processed them.
  431. Returns:
  432. dict: Has an entry for each event id we already have seen. Maps to
  433. the rejected reason string if we rejected the event, else maps to
  434. None.
  435. """
  436. if not event_ids:
  437. return defer.succeed({})
  438. def f(txn):
  439. sql = (
  440. "SELECT e.event_id, reason FROM events as e "
  441. "LEFT JOIN rejections as r ON e.event_id = r.event_id "
  442. "WHERE e.event_id = ?"
  443. )
  444. res = {}
  445. for event_id in event_ids:
  446. txn.execute(sql, (event_id,))
  447. row = txn.fetchone()
  448. if row:
  449. _, rejected = row
  450. res[event_id] = rejected
  451. return res
  452. return self.runInteraction(
  453. "have_events", f,
  454. )
  455. @defer.inlineCallbacks
  456. def _get_events(self, event_ids, check_redacted=True,
  457. get_prev_content=False, allow_rejected=False):
  458. if not event_ids:
  459. defer.returnValue([])
  460. event_id_list = event_ids
  461. event_ids = set(event_ids)
  462. event_map = self._get_events_from_cache(
  463. event_ids,
  464. check_redacted=check_redacted,
  465. get_prev_content=get_prev_content,
  466. allow_rejected=allow_rejected,
  467. )
  468. missing_events_ids = [e for e in event_ids if e not in event_map]
  469. if missing_events_ids:
  470. missing_events = yield self._enqueue_events(
  471. missing_events_ids,
  472. check_redacted=check_redacted,
  473. get_prev_content=get_prev_content,
  474. allow_rejected=allow_rejected,
  475. )
  476. event_map.update(missing_events)
  477. defer.returnValue([
  478. event_map[e_id] for e_id in event_id_list
  479. if e_id in event_map and event_map[e_id]
  480. ])
  481. def _get_events_txn(self, txn, event_ids, check_redacted=True,
  482. get_prev_content=False, allow_rejected=False):
  483. if not event_ids:
  484. return []
  485. event_map = self._get_events_from_cache(
  486. event_ids,
  487. check_redacted=check_redacted,
  488. get_prev_content=get_prev_content,
  489. allow_rejected=allow_rejected,
  490. )
  491. missing_events_ids = [e for e in event_ids if e not in event_map]
  492. if not missing_events_ids:
  493. return [
  494. event_map[e_id] for e_id in event_ids
  495. if e_id in event_map and event_map[e_id]
  496. ]
  497. missing_events = self._fetch_events_txn(
  498. txn,
  499. missing_events_ids,
  500. check_redacted=check_redacted,
  501. get_prev_content=get_prev_content,
  502. allow_rejected=allow_rejected,
  503. )
  504. event_map.update(missing_events)
  505. return [
  506. event_map[e_id] for e_id in event_ids
  507. if e_id in event_map and event_map[e_id]
  508. ]
  509. def _invalidate_get_event_cache(self, event_id):
  510. for check_redacted in (False, True):
  511. for get_prev_content in (False, True):
  512. self._get_event_cache.invalidate(
  513. (event_id, check_redacted, get_prev_content)
  514. )
  515. def _get_event_txn(self, txn, event_id, check_redacted=True,
  516. get_prev_content=False, allow_rejected=False):
  517. events = self._get_events_txn(
  518. txn, [event_id],
  519. check_redacted=check_redacted,
  520. get_prev_content=get_prev_content,
  521. allow_rejected=allow_rejected,
  522. )
  523. return events[0] if events else None
  524. def _get_events_from_cache(self, events, check_redacted, get_prev_content,
  525. allow_rejected):
  526. event_map = {}
  527. for event_id in events:
  528. try:
  529. ret = self._get_event_cache.get(
  530. (event_id, check_redacted, get_prev_content,)
  531. )
  532. if allow_rejected or not ret.rejected_reason:
  533. event_map[event_id] = ret
  534. else:
  535. event_map[event_id] = None
  536. except KeyError:
  537. pass
  538. return event_map
    def _do_fetch(self, conn):
        """Takes a database connection and waits for requests for events from
        the _event_fetch_list queue.

        Each pass takes everything queued in ``self._event_fetch_list``,
        loads the rows for all requested ids in one transaction, and hands
        the results to the waiting deferreds via
        ``reactor.callFromThread``.

        The loop returns (after decrementing ``self._event_fetch_ongoing``)
        when the queue is empty and either the database engine is
        single-threaded or more than ``EVENT_QUEUE_ITERATIONS`` consecutive
        waits have found nothing queued.

        Args:
            conn: The database connection to run the fetch transactions on.
        """
        event_list = []
        i = 0  # Number of consecutive waits that found nothing queued.
        while True:
            try:
                with self._event_fetch_lock:
                    # Take ownership of everything queued so far.
                    event_list = self._event_fetch_list
                    self._event_fetch_list = []

                    if not event_list:
                        single_threaded = self.database_engine.single_threaded
                        if single_threaded or i > EVENT_QUEUE_ITERATIONS:
                            self._event_fetch_ongoing -= 1
                            return
                        else:
                            # Wait for more requests, then check the queue
                            # again from the top of the loop.
                            self._event_fetch_lock.wait(EVENT_QUEUE_TIMEOUT_S)
                            i += 1
                            continue
                    i = 0

                # Each queue entry is an (event_ids, deferred) pair; collect
                # just the id lists and flatten them into one list.
                # NOTE(review): indexing the result of zip() relies on it
                # returning a list, as it does on Python 2.
                event_id_lists = zip(*event_list)[0]
                event_ids = [
                    item for sublist in event_id_lists for item in sublist
                ]

                rows = self._new_transaction(
                    conn, "do_fetch", [], None, self._fetch_event_rows, event_ids
                )

                row_dict = {
                    r["event_id"]: r
                    for r in rows
                }

                # We only want to resolve deferreds from the main thread
                def fire(lst, res):
                    for ids, d in lst:
                        if not d.called:
                            try:
                                with PreserveLoggingContext():
                                    # Each requester only gets the rows for
                                    # the ids it asked for that were found.
                                    d.callback([
                                        res[i]
                                        for i in ids
                                        if i in res
                                    ])
                            except:
                                logger.exception("Failed to callback")
                with PreserveLoggingContext():
                    reactor.callFromThread(fire, event_list, row_dict)
            except Exception as e:
                logger.exception("do_fetch")

                # We only want to resolve deferreds from the main thread
                def fire(evs):
                    for _, d in evs:
                        if not d.called:
                            with PreserveLoggingContext():
                                d.errback(e)

                # Fail every request taken in this pass, then keep looping.
                if event_list:
                    with PreserveLoggingContext():
                        reactor.callFromThread(fire, event_list)
  597. @defer.inlineCallbacks
  598. def _enqueue_events(self, events, check_redacted=True,
  599. get_prev_content=False, allow_rejected=False):
  600. """Fetches events from the database using the _event_fetch_list. This
  601. allows batch and bulk fetching of events - it allows us to fetch events
  602. without having to create a new transaction for each request for events.
  603. """
  604. if not events:
  605. defer.returnValue({})
  606. events_d = defer.Deferred()
  607. with self._event_fetch_lock:
  608. self._event_fetch_list.append(
  609. (events, events_d)
  610. )
  611. self._event_fetch_lock.notify()
  612. if self._event_fetch_ongoing < EVENT_QUEUE_THREADS:
  613. self._event_fetch_ongoing += 1
  614. should_start = True
  615. else:
  616. should_start = False
  617. if should_start:
  618. with PreserveLoggingContext():
  619. self.runWithConnection(
  620. self._do_fetch
  621. )
  622. with PreserveLoggingContext():
  623. rows = yield events_d
  624. if not allow_rejected:
  625. rows[:] = [r for r in rows if not r["rejects"]]
  626. res = yield defer.gatherResults(
  627. [
  628. preserve_fn(self._get_event_from_row)(
  629. row["internal_metadata"], row["json"], row["redacts"],
  630. check_redacted=check_redacted,
  631. get_prev_content=get_prev_content,
  632. rejected_reason=row["rejects"],
  633. )
  634. for row in rows
  635. ],
  636. consumeErrors=True
  637. )
  638. defer.returnValue({
  639. e.event_id: e
  640. for e in res if e
  641. })
  642. def _fetch_event_rows(self, txn, events):
  643. rows = []
  644. N = 200
  645. for i in range(1 + len(events) / N):
  646. evs = events[i * N:(i + 1) * N]
  647. if not evs:
  648. break
  649. sql = (
  650. "SELECT "
  651. " e.event_id as event_id, "
  652. " e.internal_metadata,"
  653. " e.json,"
  654. " r.redacts as redacts,"
  655. " rej.event_id as rejects "
  656. " FROM event_json as e"
  657. " LEFT JOIN rejections as rej USING (event_id)"
  658. " LEFT JOIN redactions as r ON e.event_id = r.redacts"
  659. " WHERE e.event_id IN (%s)"
  660. ) % (",".join(["?"] * len(evs)),)
  661. txn.execute(sql, evs)
  662. rows.extend(self.cursor_to_dict(txn))
  663. return rows
  664. def _fetch_events_txn(self, txn, events, check_redacted=True,
  665. get_prev_content=False, allow_rejected=False):
  666. if not events:
  667. return {}
  668. rows = self._fetch_event_rows(
  669. txn, events,
  670. )
  671. if not allow_rejected:
  672. rows[:] = [r for r in rows if not r["rejects"]]
  673. res = [
  674. self._get_event_from_row_txn(
  675. txn,
  676. row["internal_metadata"], row["json"], row["redacts"],
  677. check_redacted=check_redacted,
  678. get_prev_content=get_prev_content,
  679. rejected_reason=row["rejects"],
  680. )
  681. for row in rows
  682. ]
  683. return {
  684. r.event_id: r
  685. for r in res
  686. }
  687. @defer.inlineCallbacks
  688. def _get_event_from_row(self, internal_metadata, js, redacted,
  689. check_redacted=True, get_prev_content=False,
  690. rejected_reason=None):
  691. d = json.loads(js)
  692. internal_metadata = json.loads(internal_metadata)
  693. if rejected_reason:
  694. rejected_reason = yield self._simple_select_one_onecol(
  695. table="rejections",
  696. keyvalues={"event_id": rejected_reason},
  697. retcol="reason",
  698. desc="_get_event_from_row",
  699. )
  700. ev = FrozenEvent(
  701. d,
  702. internal_metadata_dict=internal_metadata,
  703. rejected_reason=rejected_reason,
  704. )
  705. if check_redacted and redacted:
  706. ev = prune_event(ev)
  707. redaction_id = yield self._simple_select_one_onecol(
  708. table="redactions",
  709. keyvalues={"redacts": ev.event_id},
  710. retcol="event_id",
  711. desc="_get_event_from_row",
  712. )
  713. ev.unsigned["redacted_by"] = redaction_id
  714. # Get the redaction event.
  715. because = yield self.get_event(
  716. redaction_id,
  717. check_redacted=False,
  718. allow_none=True,
  719. )
  720. if because:
  721. # It's fine to do add the event directly, since get_pdu_json
  722. # will serialise this field correctly
  723. ev.unsigned["redacted_because"] = because
  724. if get_prev_content and "replaces_state" in ev.unsigned:
  725. prev = yield self.get_event(
  726. ev.unsigned["replaces_state"],
  727. get_prev_content=False,
  728. allow_none=True,
  729. )
  730. if prev:
  731. ev.unsigned["prev_content"] = prev.content
  732. ev.unsigned["prev_sender"] = prev.sender
  733. self._get_event_cache.prefill(
  734. (ev.event_id, check_redacted, get_prev_content), ev
  735. )
  736. defer.returnValue(ev)
  737. def _get_event_from_row_txn(self, txn, internal_metadata, js, redacted,
  738. check_redacted=True, get_prev_content=False,
  739. rejected_reason=None):
  740. d = json.loads(js)
  741. internal_metadata = json.loads(internal_metadata)
  742. if rejected_reason:
  743. rejected_reason = self._simple_select_one_onecol_txn(
  744. txn,
  745. table="rejections",
  746. keyvalues={"event_id": rejected_reason},
  747. retcol="reason",
  748. )
  749. ev = FrozenEvent(
  750. d,
  751. internal_metadata_dict=internal_metadata,
  752. rejected_reason=rejected_reason,
  753. )
  754. if check_redacted and redacted:
  755. ev = prune_event(ev)
  756. redaction_id = self._simple_select_one_onecol_txn(
  757. txn,
  758. table="redactions",
  759. keyvalues={"redacts": ev.event_id},
  760. retcol="event_id",
  761. )
  762. ev.unsigned["redacted_by"] = redaction_id
  763. # Get the redaction event.
  764. because = self._get_event_txn(
  765. txn,
  766. redaction_id,
  767. check_redacted=False
  768. )
  769. if because:
  770. ev.unsigned["redacted_because"] = because
  771. if get_prev_content and "replaces_state" in ev.unsigned:
  772. prev = self._get_event_txn(
  773. txn,
  774. ev.unsigned["replaces_state"],
  775. get_prev_content=False,
  776. )
  777. if prev:
  778. ev.unsigned["prev_content"] = prev.content
  779. ev.unsigned["prev_sender"] = prev.sender
  780. self._get_event_cache.prefill(
  781. (ev.event_id, check_redacted, get_prev_content), ev
  782. )
  783. return ev
  784. def _parse_events_txn(self, txn, rows):
  785. event_ids = [r["event_id"] for r in rows]
  786. return self._get_events_txn(txn, event_ids)
  @defer.inlineCallbacks
  def count_daily_messages(self):
      """
      Estimate how many messages were sent over the last day.

      Each call that finds at least one event replaces the contents of the
      stats_reporting table with the newest stream ordering and the current
      time. The count covers the events between the previously stored
      stream ordering and the new one.

      Returns (via the Deferred) the message count, or None when:
        * the events table is empty,
        * there was no previously stored report, or
        * the previous report was made more than an hour away from
          exactly 24 hours ago.
      """

      def _count_messages(txn):
          now = self.hs.get_clock().time()

          # Read the previous checkpoint first: it is overwritten below.
          txn.execute(
              "SELECT reported_stream_token, reported_time FROM stats_reporting"
          )
          previous_reports = self.cursor_to_dict(txn)

          txn.execute(
              "SELECT stream_ordering"
              " FROM events"
              " ORDER BY stream_ordering DESC"
              " LIMIT 1"
          )
          newest_events = self.cursor_to_dict(txn)
          if not newest_events:
              logger.info("Calculating daily messages skipped; no now_reporting")
              return None
          current_token = newest_events[0]["stream_ordering"]

          # Store the new checkpoint even if no count can be produced this
          # time, so that the next call has something to compare against.
          txn.execute("DELETE FROM stats_reporting")
          txn.execute(
              "INSERT INTO stats_reporting"
              " (reported_stream_token, reported_time)"
              " VALUES (?, ?)",
              (current_token, now),
          )

          if not previous_reports:
              logger.info("Calculating daily messages skipped; no last_reported")
              return None
          previous = previous_reports[0]

          # Close enough to correct for our purposes.
          yesterday = now - 24 * 60 * 60
          since_yesterday_seconds = yesterday - previous["reported_time"]
          if math.fabs(since_yesterday_seconds) > 60 * 60:
              # The previous report is not about a day old, so the interval
              # would not represent a daily figure.
              logger.info(
                  "Calculating daily messages skipped; since_yesterday_seconds: %d" %
                  (since_yesterday_seconds,)
              )
              return None

          # Substring match on the raw JSON: this is an estimate, not an
          # exact count of m.room.message events.
          txn.execute(
              "SELECT COUNT(*) as messages"
              " FROM events NATURAL JOIN event_json"
              " WHERE json like '%m.room.message%'"
              " AND stream_ordering > ?"
              " AND stream_ordering <= ?",
              (previous["reported_stream_token"], current_token),
          )
          counts = self.cursor_to_dict(txn)
          if not counts:
              logger.info("Calculating daily messages skipped; messages count missing")
              return None
          return counts[0]["messages"]

      message_count = yield self.runInteraction("count_messages", _count_messages)
      defer.returnValue(message_count)
  @defer.inlineCallbacks
  def _background_reindex_origin_server_ts(self, progress, batch_size):
      """Background update step: fill in events.origin_server_ts.

      Walks the events table from high to low stream ordering, one batch
      per call, and writes each event's origin_server_ts into its row.

      Args:
          progress (dict): Stored progress of the update. Reads
              "target_min_stream_id_inclusive", "max_stream_id_exclusive"
              and, optionally, "rows_inserted".
          batch_size (int): Maximum number of event rows to scan per call.

      Returns:
          Deferred[int]: The number of rows updated by this batch.
      """
      target_min_stream_id = progress["target_min_stream_id_inclusive"]
      max_stream_id = progress["max_stream_id_exclusive"]
      rows_inserted = progress.get("rows_inserted", 0)

      INSERT_CLUMP_SIZE = 1000

      def reindex_search_txn(txn):
          """Process one batch; returns (rows scanned, rows updated)."""
          sql = (
              "SELECT stream_ordering, event_id FROM events"
              " WHERE ? <= stream_ordering AND stream_ordering < ?"
              " ORDER BY stream_ordering DESC"
              " LIMIT ?"
          )

          txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))

          stream_rows = txn.fetchall()
          if not stream_rows:
              return 0, 0

          # Rows are ordered descending, so the last one is the lowest
          # stream ordering seen; the next batch resumes below it.
          min_stream_id = stream_rows[-1][0]
          event_ids = [row[1] for row in stream_rows]

          events = self._get_events_txn(txn, event_ids)

          updates = []
          for event in events:
              try:
                  event_id = event.event_id
                  origin_server_ts = event.origin_server_ts
              except (KeyError, AttributeError):
                  # If the event is missing a necessary field then
                  # skip over it.
                  continue

              updates.append((origin_server_ts, event_id))

          sql = (
              "UPDATE events SET origin_server_ts = ? WHERE event_id = ?"
          )

          for index in range(0, len(updates), INSERT_CLUMP_SIZE):
              clump = updates[index:index + INSERT_CLUMP_SIZE]
              txn.executemany(sql, clump)

          new_progress = {
              "target_min_stream_id_inclusive": target_min_stream_id,
              "max_stream_id_exclusive": min_stream_id,
              "rows_inserted": rows_inserted + len(updates)
          }

          self._background_update_progress_txn(
              txn, self.EVENT_ORIGIN_SERVER_TS_NAME, new_progress
          )

          return len(stream_rows), len(updates)

      scanned, updated = yield self.runInteraction(
          self.EVENT_ORIGIN_SERVER_TS_NAME, reindex_search_txn
      )

      # Only finish once no rows are left in the target range. A batch in
      # which every event was skipped updates nothing, but older events may
      # still remain, so "nothing updated" must not end the update.
      if not scanned:
          yield self._end_background_update(self.EVENT_ORIGIN_SERVER_TS_NAME)

      defer.returnValue(updated)
  def get_current_backfill_token(self):
      """Return the current minimum token that backfilled events have reached.

      The value is the negation of `self.min_stream_token`.
      """
      # TODO: Fix race with the persist_event txn by using one of the
      # stream id managers
      backfill_token = -self.min_stream_token
      return backfill_token
  def get_all_new_events(self, last_backfill_id, last_forward_id,
                         current_backfill_id, current_forward_id, limit):
      """Fetch events that arrived since the given stream positions.

      Both forward (newly arrived) and backfilled events are fetched, each
      capped at `limit` rows. A stream whose last and current ids are equal
      is not queried and yields an empty list.

      Args:
          last_backfill_id: Backfill position already seen; negated before
              being compared with stream_ordering.
          last_forward_id: Forward position already seen (exclusive).
          current_backfill_id: Backfill position to read up to; negated
              before being compared with stream_ordering (inclusive).
          current_forward_id: Forward position to read up to (inclusive).
          limit: Maximum number of rows to fetch per stream.

      Returns:
          The result of `runInteraction`, wrapping a tuple
          (new_forward_events, new_backfill_events). Each entry is a row of
          (stream_ordering, internal_metadata, json); for backfill rows the
          stream ordering is negated.
      """
      forward_sql = (
          "SELECT e.stream_ordering, ej.internal_metadata, ej.json"
          " FROM events as e"
          " JOIN event_json as ej"
          " ON e.event_id = ej.event_id AND e.room_id = ej.room_id"
          " WHERE ? < e.stream_ordering AND e.stream_ordering <= ?"
          " ORDER BY e.stream_ordering ASC"
          " LIMIT ?"
      )
      backfill_sql = (
          "SELECT -e.stream_ordering, ej.internal_metadata, ej.json"
          " FROM events as e"
          " JOIN event_json as ej"
          " ON e.event_id = ej.event_id AND e.room_id = ej.room_id"
          " WHERE ? > e.stream_ordering AND e.stream_ordering >= ?"
          " ORDER BY e.stream_ordering DESC"
          " LIMIT ?"
      )

      def get_all_new_events_txn(txn):
          forward_rows = []
          if last_forward_id != current_forward_id:
              txn.execute(
                  forward_sql, (last_forward_id, current_forward_id, limit)
              )
              forward_rows = txn.fetchall()

          backfill_rows = []
          if last_backfill_id != current_backfill_id:
              # The backfill ids are negated to match the stored
              # stream_ordering values they are compared against.
              txn.execute(
                  backfill_sql, (-last_backfill_id, -current_backfill_id, limit)
              )
              backfill_rows = txn.fetchall()

          return (forward_rows, backfill_rows)

      return self.runInteraction("get_all_new_events", get_all_new_events_txn)