|
@@ -23,7 +23,7 @@ from functools import wraps
|
|
|
from six import iteritems, text_type
|
|
|
from six.moves import range
|
|
|
|
|
|
-from canonicaljson import json
|
|
|
+from canonicaljson import encode_canonical_json, json
|
|
|
from prometheus_client import Counter, Histogram
|
|
|
|
|
|
from twisted.internet import defer
|
|
@@ -33,6 +33,7 @@ from synapse.api.constants import EventTypes
|
|
|
from synapse.api.errors import SynapseError
|
|
|
from synapse.events import EventBase # noqa: F401
|
|
|
from synapse.events.snapshot import EventContext # noqa: F401
|
|
|
+from synapse.events.utils import prune_event_dict
|
|
|
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
|
|
|
from synapse.logging.utils import log_function
|
|
|
from synapse.metrics import BucketCollector
|
|
@@ -262,6 +263,14 @@ class EventsStore(
|
|
|
|
|
|
hs.get_clock().looping_call(read_forward_extremities, 60 * 60 * 1000)
|
|
|
|
|
|
+ def _censor_redactions():
|
|
|
+ return run_as_background_process(
|
|
|
+ "_censor_redactions", self._censor_redactions
|
|
|
+ )
|
|
|
+
|
|
|
+ if self.hs.config.redaction_retention_period is not None:
|
|
|
+ hs.get_clock().looping_call(_censor_redactions, 5 * 60 * 1000)
|
|
|
+
|
|
|
@defer.inlineCallbacks
|
|
|
def _read_forward_extremities(self):
|
|
|
def fetch(txn):
|
|
@@ -1548,6 +1557,98 @@ class EventsStore(
|
|
|
(event.event_id, event.redacts),
|
|
|
)
|
|
|
|
|
|
+ @defer.inlineCallbacks
|
|
|
+ def _censor_redactions(self):
|
|
|
+ """Censors all redactions older than the configured period that haven't
|
|
|
+ been censored yet.
|
|
|
+
|
|
|
+ By censor we mean update the event_json table with the redacted event.
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ Deferred
|
|
|
+ """
|
|
|
+
|
|
|
+ if self.hs.config.redaction_retention_period is None:
|
|
|
+ return
|
|
|
+
|
|
|
+ max_pos = yield self.find_first_stream_ordering_after_ts(
|
|
|
+ self._clock.time_msec() - self.hs.config.redaction_retention_period
|
|
|
+ )
|
|
|
+
|
|
|
+ # We fetch all redactions that:
|
|
|
+ # 1. point to an event we have,
|
|
|
+ # 2. has a stream ordering from before the cut off, and
|
|
|
+ # 3. we haven't yet censored.
|
|
|
+ #
|
|
|
+ # This is limited to 100 events to ensure that we don't try and do too
|
|
|
+ # much at once. We'll get called again so this should eventually catch
|
|
|
+ # up.
|
|
|
+ #
|
|
|
+ # We use the range [-max_pos, max_pos] to handle backfilled events,
|
|
|
+ # which are given negative stream ordering.
|
|
|
+ sql = """
|
|
|
+ SELECT redact_event.event_id, redacts FROM redactions
|
|
|
+ INNER JOIN events AS redact_event USING (event_id)
|
|
|
+ INNER JOIN events AS original_event ON (
|
|
|
+ redact_event.room_id = original_event.room_id
|
|
|
+ AND redacts = original_event.event_id
|
|
|
+ )
|
|
|
+ WHERE NOT have_censored
|
|
|
+ AND ? <= redact_event.stream_ordering AND redact_event.stream_ordering <= ?
|
|
|
+ ORDER BY redact_event.stream_ordering ASC
|
|
|
+ LIMIT ?
|
|
|
+ """
|
|
|
+
|
|
|
+ rows = yield self._execute(
|
|
|
+ "_censor_redactions_fetch", None, sql, -max_pos, max_pos, 100
|
|
|
+ )
|
|
|
+
|
|
|
+ updates = []
|
|
|
+
|
|
|
+ for redaction_id, event_id in rows:
|
|
|
+ redaction_event = yield self.get_event(redaction_id, allow_none=True)
|
|
|
+ original_event = yield self.get_event(
|
|
|
+ event_id, allow_rejected=True, allow_none=True
|
|
|
+ )
|
|
|
+
|
|
|
+ # The SQL above ensures that we have both the redaction and
|
|
|
+ # original event, so if the `get_event` calls return None it
|
|
|
+ # means that the redaction wasn't allowed. Either way we know that
|
|
|
+ # the result won't change so we mark the fact that we've checked.
|
|
|
+ if (
|
|
|
+ redaction_event
|
|
|
+ and original_event
|
|
|
+ and original_event.internal_metadata.is_redacted()
|
|
|
+ ):
|
|
|
+ # Redaction was allowed
|
|
|
+ pruned_json = encode_canonical_json(
|
|
|
+ prune_event_dict(original_event.get_dict())
|
|
|
+ )
|
|
|
+ else:
|
|
|
+ # Redaction wasn't allowed
|
|
|
+ pruned_json = None
|
|
|
+
|
|
|
+ updates.append((redaction_id, event_id, pruned_json))
|
|
|
+
|
|
|
+ def _update_censor_txn(txn):
|
|
|
+ for redaction_id, event_id, pruned_json in updates:
|
|
|
+ if pruned_json:
|
|
|
+ self._simple_update_one_txn(
|
|
|
+ txn,
|
|
|
+ table="event_json",
|
|
|
+ keyvalues={"event_id": event_id},
|
|
|
+ updatevalues={"json": pruned_json},
|
|
|
+ )
|
|
|
+
|
|
|
+ self._simple_update_one_txn(
|
|
|
+ txn,
|
|
|
+ table="redactions",
|
|
|
+ keyvalues={"event_id": redaction_id},
|
|
|
+ updatevalues={"have_censored": True},
|
|
|
+ )
|
|
|
+
|
|
|
+ yield self.runInteraction("_update_censor_txn", _update_censor_txn)
|
|
|
+
|
|
|
@defer.inlineCallbacks
|
|
|
def count_daily_messages(self):
|
|
|
"""
|