|
@@ -16,7 +16,6 @@ import logging
|
|
|
|
|
|
from twisted.internet import defer
|
|
from twisted.internet import defer
|
|
|
|
|
|
-from synapse.metrics.background_process_metrics import run_as_background_process
|
|
|
|
from synapse.types import get_domain_from_id
|
|
from synapse.types import get_domain_from_id
|
|
|
|
|
|
from ._base import BaseHandler
|
|
from ._base import BaseHandler
|
|
@@ -38,31 +37,6 @@ class ReceiptsHandler(BaseHandler):
|
|
self.clock = self.hs.get_clock()
|
|
self.clock = self.hs.get_clock()
|
|
self.state = hs.get_state_handler()
|
|
self.state = hs.get_state_handler()
|
|
|
|
|
|
- @defer.inlineCallbacks
|
|
|
|
- def received_client_receipt(self, room_id, receipt_type, user_id,
|
|
|
|
- event_id):
|
|
|
|
- """Called when a client tells us a local user has read up to the given
|
|
|
|
- event_id in the room.
|
|
|
|
- """
|
|
|
|
- receipt = {
|
|
|
|
- "room_id": room_id,
|
|
|
|
- "receipt_type": receipt_type,
|
|
|
|
- "user_id": user_id,
|
|
|
|
- "event_ids": [event_id],
|
|
|
|
- "data": {
|
|
|
|
- "ts": int(self.clock.time_msec()),
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- is_new = yield self._handle_new_receipts([receipt])
|
|
|
|
-
|
|
|
|
- if is_new:
|
|
|
|
- # fire off a process in the background to send the receipt to
|
|
|
|
- # remote servers
|
|
|
|
- run_as_background_process(
|
|
|
|
- 'push_receipts_to_remotes', self._push_remotes, receipt
|
|
|
|
- )
|
|
|
|
-
|
|
|
|
@defer.inlineCallbacks
|
|
@defer.inlineCallbacks
|
|
def _received_remote_receipt(self, origin, content):
|
|
def _received_remote_receipt(self, origin, content):
|
|
"""Called when we receive an EDU of type m.receipt from a remote HS.
|
|
"""Called when we receive an EDU of type m.receipt from a remote HS.
|
|
@@ -128,43 +102,54 @@ class ReceiptsHandler(BaseHandler):
|
|
defer.returnValue(True)
|
|
defer.returnValue(True)
|
|
|
|
|
|
@defer.inlineCallbacks
|
|
@defer.inlineCallbacks
|
|
- def _push_remotes(self, receipt):
|
|
|
|
- """Given a receipt, works out which remote servers should be
|
|
|
|
- poked and pokes them.
|
|
|
|
|
|
+ def received_client_receipt(self, room_id, receipt_type, user_id,
|
|
|
|
+ event_id):
|
|
|
|
+ """Called when a client tells us a local user has read up to the given
|
|
|
|
+ event_id in the room.
|
|
"""
|
|
"""
|
|
- try:
|
|
|
|
- # TODO: optimise this to move some of the work to the workers.
|
|
|
|
- room_id = receipt["room_id"]
|
|
|
|
- receipt_type = receipt["receipt_type"]
|
|
|
|
- user_id = receipt["user_id"]
|
|
|
|
- event_ids = receipt["event_ids"]
|
|
|
|
- data = receipt["data"]
|
|
|
|
|
|
+ receipt = {
|
|
|
|
+ "room_id": room_id,
|
|
|
|
+ "receipt_type": receipt_type,
|
|
|
|
+ "user_id": user_id,
|
|
|
|
+ "event_ids": [event_id],
|
|
|
|
+ "data": {
|
|
|
|
+ "ts": int(self.clock.time_msec()),
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
|
|
- users = yield self.state.get_current_user_in_room(room_id)
|
|
|
|
- remotedomains = set(get_domain_from_id(u) for u in users)
|
|
|
|
- remotedomains = remotedomains.copy()
|
|
|
|
- remotedomains.discard(self.server_name)
|
|
|
|
-
|
|
|
|
- logger.debug("Sending receipt to: %r", remotedomains)
|
|
|
|
-
|
|
|
|
- for domain in remotedomains:
|
|
|
|
- self.federation.build_and_send_edu(
|
|
|
|
- destination=domain,
|
|
|
|
- edu_type="m.receipt",
|
|
|
|
- content={
|
|
|
|
- room_id: {
|
|
|
|
- receipt_type: {
|
|
|
|
- user_id: {
|
|
|
|
- "event_ids": event_ids,
|
|
|
|
- "data": data,
|
|
|
|
- }
|
|
|
|
|
|
+ is_new = yield self._handle_new_receipts([receipt])
|
|
|
|
+ if not is_new:
|
|
|
|
+ return
|
|
|
|
+
|
|
|
|
+ # Work out which remote servers should be poked and poke them.
|
|
|
|
+
|
|
|
|
+ # TODO: optimise this to move some of the work to the workers.
|
|
|
|
+ data = receipt["data"]
|
|
|
|
+
|
|
|
|
+ # XXX why does this not use state.get_current_hosts_in_room() ?
|
|
|
|
+ users = yield self.state.get_current_user_in_room(room_id)
|
|
|
|
+ remotedomains = set(get_domain_from_id(u) for u in users)
|
|
|
|
+ remotedomains = remotedomains.copy()
|
|
|
|
+ remotedomains.discard(self.server_name)
|
|
|
|
+
|
|
|
|
+ logger.debug("Sending receipt to: %r", remotedomains)
|
|
|
|
+
|
|
|
|
+ for domain in remotedomains:
|
|
|
|
+ self.federation.build_and_send_edu(
|
|
|
|
+ destination=domain,
|
|
|
|
+ edu_type="m.receipt",
|
|
|
|
+ content={
|
|
|
|
+ room_id: {
|
|
|
|
+ receipt_type: {
|
|
|
|
+ user_id: {
|
|
|
|
+ "event_ids": [event_id],
|
|
|
|
+ "data": data,
|
|
}
|
|
}
|
|
- },
|
|
|
|
|
|
+ }
|
|
},
|
|
},
|
|
- key=(room_id, receipt_type, user_id),
|
|
|
|
- )
|
|
|
|
- except Exception:
|
|
|
|
- logger.exception("Error pushing receipts to remote servers")
|
|
|
|
|
|
+ },
|
|
|
|
+ key=(room_id, receipt_type, user_id),
|
|
|
|
+ )
|
|
|
|
|
|
@defer.inlineCallbacks
|
|
@defer.inlineCallbacks
|
|
def get_receipts_for_room(self, room_id, to_key):
|
|
def get_receipts_for_room(self, room_id, to_key):
|