|
@@ -12,7 +12,7 @@
|
|
|
# See the License for the specific language governing permissions and
|
|
|
# limitations under the License.
|
|
|
import logging
|
|
|
-from typing import TYPE_CHECKING, Collection, Dict, Iterable, List, Optional, Union
|
|
|
+from typing import TYPE_CHECKING, Collection, Dict, Iterable, List, Optional, Union, Set
|
|
|
|
|
|
from prometheus_client import Counter
|
|
|
|
|
@@ -194,6 +194,7 @@ class ApplicationServicesHandler:
|
|
|
stream_key: str,
|
|
|
new_token: Union[int, RoomStreamToken],
|
|
|
users: Collection[Union[str, UserID]],
|
|
|
+ room_ids: Optional[Collection[str]] = None,
|
|
|
) -> None:
|
|
|
"""
|
|
|
This is called by the notifier in the background when an ephemeral event is handled
|
|
@@ -218,6 +219,9 @@ class ApplicationServicesHandler:
|
|
|
|
|
|
new_token: The stream token of the event.
|
|
|
users: The users that should be informed of the new event, if any.
|
|
|
+ room_ids: The room IDs whose joined members should be notified of
|
|
|
+ the new event. Application services that register explicit interest
|
|
|
+ in a room ID in this list are also be notified.
|
|
|
"""
|
|
|
if not self.notify_appservices:
|
|
|
return
|
|
@@ -271,7 +275,7 @@ class ApplicationServicesHandler:
|
|
|
# We only start a new background process if necessary rather than
|
|
|
# optimistically (to cut down on overhead).
|
|
|
self._notify_interested_services_ephemeral(
|
|
|
- services, stream_key, new_token, users
|
|
|
+ services, stream_key, new_token, users, room_ids
|
|
|
)
|
|
|
|
|
|
@wrap_as_background_process("notify_interested_services_ephemeral")
|
|
@@ -281,7 +285,17 @@ class ApplicationServicesHandler:
|
|
|
stream_key: str,
|
|
|
new_token: int,
|
|
|
users: Collection[Union[str, UserID]],
|
|
|
+ room_ids: Optional[Collection[str]] = None,
|
|
|
) -> None:
|
|
|
+ # Calculate potential users to notify from the joined members of the given rooms
|
|
|
+ if room_ids:
|
|
|
+ joined_members: Set[Union[str, UserID]] = set()
|
|
|
+ for room_id in room_ids:
|
|
|
+ room_members = await self.store.get_users_in_room(room_id)
|
|
|
+ joined_members.update(room_members)
|
|
|
+
|
|
|
+ users = joined_members.union(users)
|
|
|
+
|
|
|
logger.debug("Checking interested services for %s", stream_key)
|
|
|
with Measure(self.clock, "notify_interested_services_ephemeral"):
|
|
|
for service in services:
|