Преглед на файлове

Add a new ephemeral AS handler for to_device message edus

Here we add the ability for the application service ephemeral message
processor to handle new events on the "to_device" stream.

We keep track of a stream id (token) per application service, and every
time a new to-device message comes in, for each appservice we pull the
messages between the last-recorded and current stream id and check
whether any of the messages are for a user in that appservice's user
namespace.

get_new_messages is implemented in the next commit.

since we rebased off latest develop.
Andrew Morgan преди 2 години
родител
ревизия
b7a44d4402
променени са 2 файла, в които са добавени 114 реда и са изтрити 15 реда
  1. 97 6
      synapse/handlers/appservice.py
  2. 17 9
      synapse/notifier.py

+ 97 - 6
synapse/handlers/appservice.py

@@ -202,8 +202,9 @@ class ApplicationServicesHandler:
         Args:
             stream_key: The stream the event came from.
 
-                `stream_key` can be "typing_key", "receipt_key" or "presence_key". Any other
-                value for `stream_key` will cause this function to return early.
+                `stream_key` can be "typing_key", "receipt_key", "presence_key" or
+                "to_device_key". Any other value for `stream_key` will cause this function
+                to return early.
 
                 Ephemeral events will only be pushed to appservices that have opted into
                 receiving them by setting `push_ephemeral` to true in their registration
@@ -219,10 +220,6 @@ class ApplicationServicesHandler:
         if not self.notify_appservices:
             return
 
-        # Ignore any unsupported streams
-        if stream_key not in ("typing_key", "receipt_key", "presence_key"):
-            return
-
         # Assert that new_token is an integer (and not a RoomStreamToken).
         # All of the supported streams that this function handles use an
         # integer to track progress (rather than a RoomStreamToken - a
@@ -236,6 +233,13 @@ class ApplicationServicesHandler:
         # Additional context: https://github.com/matrix-org/synapse/pull/11137
         assert isinstance(new_token, int)
 
+        # Ignore to-device messages if the feature flag is not enabled
+        if (
+            stream_key == "to_device_key"
+            and not self.msc2409_to_device_messages_enabled
+        ):
+            return
+
         # Check whether there are any appservices which have registered to receive
         # ephemeral events.
         #
@@ -310,6 +314,23 @@ class ApplicationServicesHandler:
                             service, "presence", new_token
                         )
 
+                    elif (
+                        stream_key == "to_device_key"
+                        and self.msc2409_to_device_messages_enabled
+                    ):
+                        # Retrieve an iterable of to-device message events, as well as the
+                        # maximum stream token of the messages we were able to retrieve.
+                        events = await self._handle_to_device(service, new_token, users)
+                        if events:
+                            self.scheduler.submit_ephemeral_events_for_as(
+                                service, events
+                            )
+
+                        # Persist the latest handled stream token for this appservice
+                        await self.store.set_type_stream_id_for_appservice(
+                            service, "to_device", new_token
+                        )
+
     async def _handle_typing(
         self, service: ApplicationService, new_token: int
     ) -> List[JsonDict]:
@@ -443,6 +464,76 @@ class ApplicationServicesHandler:
 
         return events
 
+    async def _handle_to_device(
+        self,
+        service: ApplicationService,
+        new_token: int,
+        users: Collection[Union[str, UserID]],
+    ) -> List[JsonDict]:
+        """
+        Given an application service, determine which events it should receive
+        from those between the last-recorded typing event stream token for this
+        appservice and the given stream token.
+
+        Args:
+            service: The application service to check for which events it should receive.
+            new_token: The latest to-device event stream token.
+            users: The users that should receive new to-device messages.
+
+        Returns:
+            A list of JSON dictionaries containing data derived from the typing events
+                that should be sent to the given application service.
+        """
+        # Get the stream token that this application service has processed up until
+        from_key = await self.store.get_type_stream_id_for_appservice(
+            service, "to_device"
+        )
+
+        # Filter out users that this appservice is not interested in
+        users_appservice_is_interested_in: List[str] = []
+        for user in users:
+            if isinstance(user, UserID):
+                user = user.to_string()
+
+            if service.is_interested_in_user(user):
+                users_appservice_is_interested_in.append(user)
+
+        if not users_appservice_is_interested_in:
+            # Return early if the AS was not interested in any of these users
+            return []
+
+        # Retrieve the to-device messages for each user
+        recipient_user_id_device_id_to_messages = await self.store.get_new_messages(
+            users_appservice_is_interested_in,
+            from_key,
+            new_token,
+        )
+
+        # According to MSC2409, we'll need to add 'to_user_id' and 'to_device_id' fields
+        # to the event JSON so that the application service will know which user/device
+        # combination this messages was intended for.
+        #
+        # So we mangle this dict into a flat list of to-device messages with the relevant
+        # user ID and device ID embedded inside each message dict.
+        message_payload: List[JsonDict] = []
+        for (
+            user_id,
+            device_id,
+        ), messages in recipient_user_id_device_id_to_messages.items():
+            for message_json in messages:
+                # Remove 'message_id' from the to-device message, as it's an internal ID
+                message_json.pop("message_id", None)
+
+                message_payload.append(
+                    {
+                        "to_user_id": user_id,
+                        "to_device_id": device_id,
+                        **message_json,
+                    }
+                )
+
+        return message_payload
+
     async def query_user_exists(self, user_id: str) -> bool:
         """Check if any application service knows this user_id exists.
 

+ 17 - 9
synapse/notifier.py

@@ -444,15 +444,23 @@ class Notifier:
 
             self.notify_replication()
 
-            # Notify appservices.
-            try:
-                self.appservice_handler.notify_interested_services_ephemeral(
-                    stream_key,
-                    new_token,
-                    users,
-                )
-            except Exception:
-                logger.exception("Error notifying application services of event")
+            # Notify appservices of updates in ephemeral event streams.
+            # Only the following streams are currently supported.
+            if stream_key in (
+                "typing_key",
+                "receipt_key",
+                "presence_key",
+                "to_device_key",
+            ):
+                # Notify appservices.
+                try:
+                    self.appservice_handler.notify_interested_services_ephemeral(
+                        stream_key,
+                        new_token,
+                        users,
+                    )
+                except Exception:
+                    logger.exception("Error notifying application services of event")
 
     def on_new_replication_data(self) -> None:
         """Used to inform replication listeners that something has happened