Prechádzať zdrojové kódy

Merge remote-tracking branch 'origin/develop' into rav/event_auth/1

Richard van der Hoff 4 rokov pred
rodič
commit
2b22faded7

+ 6 - 0
CHANGES.md

@@ -1,3 +1,9 @@
+Synapse 1.4.1 (2019-10-18)
+==========================
+
+No changes since 1.4.1rc1.
+
+
 Synapse 1.4.1rc1 (2019-10-17)
 =============================
 

+ 1 - 0
changelog.d/6196.misc

@@ -0,0 +1 @@
+Port synapse.rest.admin module to use async/await.

+ 1 - 0
changelog.d/6197.docker

@@ -0,0 +1 @@
+Fix logging getting lost for the docker image.

+ 1 - 0
changelog.d/6212.bugfix

@@ -0,0 +1 @@
+Fix bug where presence would not get timed out correctly if a synchrotron worker is used and restarted.

+ 1 - 0
changelog.d/6216.bugfix

@@ -0,0 +1 @@
+synapse_port_db: Add 2 additional BOOLEAN_COLUMNS to be able to convert from database schema v56.

+ 11 - 24
contrib/docker/README.md

@@ -1,39 +1,26 @@
-# Synapse Docker
-
-FIXME: this is out-of-date as of
-https://github.com/matrix-org/synapse/issues/5518. Contributions to bring it up
-to date would be welcome.
-
-### Automated configuration
-
-It is recommended that you use Docker Compose to run your containers, including
-this image and a Postgres server. A sample ``docker-compose.yml`` is provided,
-including example labels for reverse proxying and other artifacts.
-
-Read the section about environment variables and set at least mandatory variables,
-then run the server:
-
-```
-docker-compose up -d
-```
 
-If secrets are not specified in the environment variables, they will be generated
-as part of the startup. Please ensure these secrets are kept between launches of the
-Docker container, as their loss may require users to log in again.
+# Synapse Docker
 
-### Manual configuration
+### Configuration
 
 A sample ``docker-compose.yml`` is provided, including example labels for
 reverse proxying and other artifacts. The docker-compose file is an example,
 please comment/uncomment sections that are not suitable for your usecase.
 
 Specify a ``SYNAPSE_CONFIG_PATH``, preferably to a persistent path,
-to use manual configuration. To generate a fresh ``homeserver.yaml``, simply run:
+to use manual configuration.
+
+To generate a fresh `homeserver.yaml`, you can use the `generate` command.
+(See the [documentation](../../docker/README.md#generating-a-configuration-file)
+for more information.) You will need to specify appropriate values for at least the
+`SYNAPSE_SERVER_NAME` and `SYNAPSE_REPORT_STATS` environment variables. For example:
 
 ```
-docker-compose run --rm -e SYNAPSE_SERVER_NAME=my.matrix.host synapse generate
+docker-compose run --rm -e SYNAPSE_SERVER_NAME=my.matrix.host -e SYNAPSE_REPORT_STATS=yes synapse generate
 ```
 
+(This will also generate necessary signing keys.)
+
 Then, customize your configuration and run the server:
 
 ```

+ 16 - 5
contrib/docker/docker-compose.yml

@@ -15,13 +15,10 @@ services:
     restart: unless-stopped
     # See the readme for a full documentation of the environment settings
     environment:
-      - SYNAPSE_SERVER_NAME=my.matrix.host
-      - SYNAPSE_REPORT_STATS=no
-      - SYNAPSE_ENABLE_REGISTRATION=yes
-      - SYNAPSE_LOG_LEVEL=INFO
-      - POSTGRES_PASSWORD=changeme
+      - SYNAPSE_CONFIG_PATH=/etc/homeserver.yaml
     volumes:
       # You may either store all the files in a local folder
+      - ./matrix-config:/etc
       - ./files:/data
       # .. or you may split this between different storage points
       # - ./files:/data
@@ -35,9 +32,23 @@ services:
       - 8448:8448/tcp
     # ... or use a reverse proxy, here is an example for traefik:
     labels:
+      # The following lines are valid for Traefik version 1.x:
       - traefik.enable=true
       - traefik.frontend.rule=Host:my.matrix.Host
       - traefik.port=8008
+      # Alternatively, for Traefik version 2.0:
+      - traefik.enable=true
+      - traefik.http.routers.http-synapse.entryPoints=http
+      - traefik.http.routers.http-synapse.rule=Host(`my.matrix.host`)
+      - traefik.http.middlewares.https_redirect.redirectscheme.scheme=https
+      - traefik.http.middlewares.https_redirect.redirectscheme.permanent=true
+      - traefik.http.routers.http-synapse.middlewares=https_redirect
+      - traefik.http.routers.https-synapse.entryPoints=https
+      - traefik.http.routers.https-synapse.rule=Host(`my.matrix.host`)
+      - traefik.http.routers.https-synapse.service=synapse
+      - traefik.http.routers.https-synapse.tls=true
+      - traefik.http.services.synapse.loadbalancer.server.port=8008
+      - traefik.http.routers.https-synapse.tls.certResolver=le-ssl
 
   db:
     image: docker.io/postgres:10-alpine

+ 6 - 0
debian/changelog

@@ -1,3 +1,9 @@
+matrix-synapse-py3 (1.4.1) stable; urgency=medium
+
+  * New synapse release 1.4.1.
+
+ -- Synapse Packaging team <packages@matrix.org>  Fri, 18 Oct 2019 10:13:27 +0100
+
 matrix-synapse-py3 (1.4.0) stable; urgency=medium
 
   * New synapse release 1.4.0.

+ 2 - 0
docker/conf/log.config

@@ -24,3 +24,5 @@ loggers:
 root:
     level: {{ SYNAPSE_LOG_LEVEL or "INFO" }}
     handlers: [console]
+
+disable_existing_loggers: false

+ 2 - 0
scripts/synapse_port_db

@@ -55,6 +55,8 @@ BOOLEAN_COLUMNS = {
     "local_group_membership": ["is_publicised", "is_admin"],
     "e2e_room_keys": ["is_verified"],
     "account_validity": ["email_sent"],
+    "redactions": ["have_censored"],
+    "room_stats_state": ["is_federatable"],
 }
 
 

+ 1 - 1
synapse/__init__.py

@@ -36,7 +36,7 @@ try:
 except ImportError:
     pass
 
-__version__ = "1.4.1rc1"
+__version__ = "1.4.1"
 
 if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
     # We import here so that we don't have to install a bunch of deps when

+ 2 - 3
synapse/config/logger.py

@@ -68,9 +68,6 @@ handlers:
         filters: [context]
 
 loggers:
-    synapse:
-        level: INFO
-
     synapse.storage.SQL:
         # beware: increasing this to DEBUG will make synapse log sensitive
         # information such as access tokens.
@@ -79,6 +76,8 @@ loggers:
 root:
     level: INFO
     handlers: [file, console]
+
+disable_existing_loggers: false
 """
 )
 

+ 9 - 4
synapse/handlers/presence.py

@@ -24,6 +24,7 @@ The methods that define policy are:
 
 import logging
 from contextlib import contextmanager
+from typing import Dict, Set
 
 from six import iteritems, itervalues
 
@@ -179,8 +180,9 @@ class PresenceHandler(object):
         # we assume that all the sync requests on that process have stopped.
         # Stored as a dict from process_id to set of user_id, and a dict of
         # process_id to millisecond timestamp last updated.
-        self.external_process_to_current_syncs = {}
-        self.external_process_last_updated_ms = {}
+        self.external_process_to_current_syncs = {}  # type: Dict[int, Set[str]]
+        self.external_process_last_updated_ms = {}  # type: Dict[int, int]
+
         self.external_sync_linearizer = Linearizer(name="external_sync_linearizer")
 
         # Start a LoopingCall in 30s that fires every 5s.
@@ -349,10 +351,13 @@ class PresenceHandler(object):
             if now - last_update > EXTERNAL_PROCESS_EXPIRY
         ]
         for process_id in expired_process_ids:
+            # For each expired process drop tracking info and check the users
+            # that were syncing on that process to see if they need to be timed
+            # out.
             users_to_check.update(
-                self.external_process_last_updated_ms.pop(process_id, ())
+                self.external_process_to_current_syncs.pop(process_id, ())
             )
-            self.external_process_last_update.pop(process_id)
+            self.external_process_last_updated_ms.pop(process_id)
 
         states = [
             self.user_to_current_state.get(user_id, UserPresenceState.default(user_id))

+ 58 - 72
synapse/rest/admin/__init__.py

@@ -23,8 +23,6 @@ import re
 from six import text_type
 from six.moves import http_client
 
-from twisted.internet import defer
-
 import synapse
 from synapse.api.constants import Membership, UserTypes
 from synapse.api.errors import Codes, NotFoundError, SynapseError
@@ -46,6 +44,7 @@ from synapse.rest.admin.purge_room_servlet import PurgeRoomServlet
 from synapse.rest.admin.server_notice_servlet import SendServerNoticeServlet
 from synapse.rest.admin.users import UserAdminServlet
 from synapse.types import UserID, create_requester
+from synapse.util.async_helpers import maybe_awaitable
 from synapse.util.versionstring import get_version_string
 
 logger = logging.getLogger(__name__)
@@ -59,15 +58,14 @@ class UsersRestServlet(RestServlet):
         self.auth = hs.get_auth()
         self.handlers = hs.get_handlers()
 
-    @defer.inlineCallbacks
-    def on_GET(self, request, user_id):
+    async def on_GET(self, request, user_id):
         target_user = UserID.from_string(user_id)
-        yield assert_requester_is_admin(self.auth, request)
+        await assert_requester_is_admin(self.auth, request)
 
         if not self.hs.is_mine(target_user):
             raise SynapseError(400, "Can only users a local user")
 
-        ret = yield self.handlers.admin_handler.get_users()
+        ret = await self.handlers.admin_handler.get_users()
 
         return 200, ret
 
@@ -122,8 +120,7 @@ class UserRegisterServlet(RestServlet):
         self.nonces[nonce] = int(self.reactor.seconds())
         return 200, {"nonce": nonce}
 
-    @defer.inlineCallbacks
-    def on_POST(self, request):
+    async def on_POST(self, request):
         self._clear_old_nonces()
 
         if not self.hs.config.registration_shared_secret:
@@ -204,14 +201,14 @@ class UserRegisterServlet(RestServlet):
 
         register = RegisterRestServlet(self.hs)
 
-        user_id = yield register.registration_handler.register_user(
+        user_id = await register.registration_handler.register_user(
             localpart=body["username"].lower(),
             password=body["password"],
             admin=bool(admin),
             user_type=user_type,
         )
 
-        result = yield register._create_registration_details(user_id, body)
+        result = await register._create_registration_details(user_id, body)
         return 200, result
 
 
@@ -223,19 +220,18 @@ class WhoisRestServlet(RestServlet):
         self.auth = hs.get_auth()
         self.handlers = hs.get_handlers()
 
-    @defer.inlineCallbacks
-    def on_GET(self, request, user_id):
+    async def on_GET(self, request, user_id):
         target_user = UserID.from_string(user_id)
-        requester = yield self.auth.get_user_by_req(request)
+        requester = await self.auth.get_user_by_req(request)
         auth_user = requester.user
 
         if target_user != auth_user:
-            yield assert_user_is_admin(self.auth, auth_user)
+            await assert_user_is_admin(self.auth, auth_user)
 
         if not self.hs.is_mine(target_user):
             raise SynapseError(400, "Can only whois a local user")
 
-        ret = yield self.handlers.admin_handler.get_whois(target_user)
+        ret = await self.handlers.admin_handler.get_whois(target_user)
 
         return 200, ret
 
@@ -255,9 +251,8 @@ class PurgeHistoryRestServlet(RestServlet):
         self.store = hs.get_datastore()
         self.auth = hs.get_auth()
 
-    @defer.inlineCallbacks
-    def on_POST(self, request, room_id, event_id):
-        yield assert_requester_is_admin(self.auth, request)
+    async def on_POST(self, request, room_id, event_id):
+        await assert_requester_is_admin(self.auth, request)
 
         body = parse_json_object_from_request(request, allow_empty_body=True)
 
@@ -270,12 +265,12 @@ class PurgeHistoryRestServlet(RestServlet):
             event_id = body.get("purge_up_to_event_id")
 
         if event_id is not None:
-            event = yield self.store.get_event(event_id)
+            event = await self.store.get_event(event_id)
 
             if event.room_id != room_id:
                 raise SynapseError(400, "Event is for wrong room.")
 
-            token = yield self.store.get_topological_token_for_event(event_id)
+            token = await self.store.get_topological_token_for_event(event_id)
 
             logger.info("[purge] purging up to token %s (event_id %s)", token, event_id)
         elif "purge_up_to_ts" in body:
@@ -285,12 +280,10 @@ class PurgeHistoryRestServlet(RestServlet):
                     400, "purge_up_to_ts must be an int", errcode=Codes.BAD_JSON
                 )
 
-            stream_ordering = (yield self.store.find_first_stream_ordering_after_ts(ts))
+            stream_ordering = await self.store.find_first_stream_ordering_after_ts(ts)
 
-            r = (
-                yield self.store.get_room_event_after_stream_ordering(
-                    room_id, stream_ordering
-                )
+            r = await self.store.get_room_event_after_stream_ordering(
+                room_id, stream_ordering
             )
             if not r:
                 logger.warn(
@@ -318,7 +311,7 @@ class PurgeHistoryRestServlet(RestServlet):
                 errcode=Codes.BAD_JSON,
             )
 
-        purge_id = yield self.pagination_handler.start_purge_history(
+        purge_id = self.pagination_handler.start_purge_history(
             room_id, token, delete_local_events=delete_local_events
         )
 
@@ -339,9 +332,8 @@ class PurgeHistoryStatusRestServlet(RestServlet):
         self.pagination_handler = hs.get_pagination_handler()
         self.auth = hs.get_auth()
 
-    @defer.inlineCallbacks
-    def on_GET(self, request, purge_id):
-        yield assert_requester_is_admin(self.auth, request)
+    async def on_GET(self, request, purge_id):
+        await assert_requester_is_admin(self.auth, request)
 
         purge_status = self.pagination_handler.get_purge_status(purge_id)
         if purge_status is None:
@@ -357,9 +349,8 @@ class DeactivateAccountRestServlet(RestServlet):
         self._deactivate_account_handler = hs.get_deactivate_account_handler()
         self.auth = hs.get_auth()
 
-    @defer.inlineCallbacks
-    def on_POST(self, request, target_user_id):
-        yield assert_requester_is_admin(self.auth, request)
+    async def on_POST(self, request, target_user_id):
+        await assert_requester_is_admin(self.auth, request)
         body = parse_json_object_from_request(request, allow_empty_body=True)
         erase = body.get("erase", False)
         if not isinstance(erase, bool):
@@ -371,7 +362,7 @@ class DeactivateAccountRestServlet(RestServlet):
 
         UserID.from_string(target_user_id)
 
-        result = yield self._deactivate_account_handler.deactivate_account(
+        result = await self._deactivate_account_handler.deactivate_account(
             target_user_id, erase
         )
         if result:
@@ -405,10 +396,9 @@ class ShutdownRoomRestServlet(RestServlet):
         self.room_member_handler = hs.get_room_member_handler()
         self.auth = hs.get_auth()
 
-    @defer.inlineCallbacks
-    def on_POST(self, request, room_id):
-        requester = yield self.auth.get_user_by_req(request)
-        yield assert_user_is_admin(self.auth, requester.user)
+    async def on_POST(self, request, room_id):
+        requester = await self.auth.get_user_by_req(request)
+        await assert_user_is_admin(self.auth, requester.user)
 
         content = parse_json_object_from_request(request)
         assert_params_in_dict(content, ["new_room_user_id"])
@@ -419,7 +409,7 @@ class ShutdownRoomRestServlet(RestServlet):
         message = content.get("message", self.DEFAULT_MESSAGE)
         room_name = content.get("room_name", "Content Violation Notification")
 
-        info = yield self._room_creation_handler.create_room(
+        info = await self._room_creation_handler.create_room(
             room_creator_requester,
             config={
                 "preset": "public_chat",
@@ -438,9 +428,9 @@ class ShutdownRoomRestServlet(RestServlet):
 
         # This will work even if the room is already blocked, but that is
         # desirable in case the first attempt at blocking the room failed below.
-        yield self.store.block_room(room_id, requester_user_id)
+        await self.store.block_room(room_id, requester_user_id)
 
-        users = yield self.state.get_current_users_in_room(room_id)
+        users = await self.state.get_current_users_in_room(room_id)
         kicked_users = []
         failed_to_kick_users = []
         for user_id in users:
@@ -451,7 +441,7 @@ class ShutdownRoomRestServlet(RestServlet):
 
             try:
                 target_requester = create_requester(user_id)
-                yield self.room_member_handler.update_membership(
+                await self.room_member_handler.update_membership(
                     requester=target_requester,
                     target=target_requester.user,
                     room_id=room_id,
@@ -461,9 +451,9 @@ class ShutdownRoomRestServlet(RestServlet):
                     require_consent=False,
                 )
 
-                yield self.room_member_handler.forget(target_requester.user, room_id)
+                await self.room_member_handler.forget(target_requester.user, room_id)
 
-                yield self.room_member_handler.update_membership(
+                await self.room_member_handler.update_membership(
                     requester=target_requester,
                     target=target_requester.user,
                     room_id=new_room_id,
@@ -480,7 +470,7 @@ class ShutdownRoomRestServlet(RestServlet):
                 )
                 failed_to_kick_users.append(user_id)
 
-        yield self.event_creation_handler.create_and_send_nonmember_event(
+        await self.event_creation_handler.create_and_send_nonmember_event(
             room_creator_requester,
             {
                 "type": "m.room.message",
@@ -491,9 +481,11 @@ class ShutdownRoomRestServlet(RestServlet):
             ratelimit=False,
         )
 
-        aliases_for_room = yield self.store.get_aliases_for_room(room_id)
+        aliases_for_room = await maybe_awaitable(
+            self.store.get_aliases_for_room(room_id)
+        )
 
-        yield self.store.update_aliases_for_room(
+        await self.store.update_aliases_for_room(
             room_id, new_room_id, requester_user_id
         )
 
@@ -532,13 +524,12 @@ class ResetPasswordRestServlet(RestServlet):
         self.auth = hs.get_auth()
         self._set_password_handler = hs.get_set_password_handler()
 
-    @defer.inlineCallbacks
-    def on_POST(self, request, target_user_id):
+    async def on_POST(self, request, target_user_id):
         """Post request to allow an administrator reset password for a user.
         This needs user to have administrator access in Synapse.
         """
-        requester = yield self.auth.get_user_by_req(request)
-        yield assert_user_is_admin(self.auth, requester.user)
+        requester = await self.auth.get_user_by_req(request)
+        await assert_user_is_admin(self.auth, requester.user)
 
         UserID.from_string(target_user_id)
 
@@ -546,7 +537,7 @@ class ResetPasswordRestServlet(RestServlet):
         assert_params_in_dict(params, ["new_password"])
         new_password = params["new_password"]
 
-        yield self._set_password_handler.set_password(
+        await self._set_password_handler.set_password(
             target_user_id, new_password, requester
         )
         return 200, {}
@@ -572,12 +563,11 @@ class GetUsersPaginatedRestServlet(RestServlet):
         self.auth = hs.get_auth()
         self.handlers = hs.get_handlers()
 
-    @defer.inlineCallbacks
-    def on_GET(self, request, target_user_id):
+    async def on_GET(self, request, target_user_id):
         """Get request to get specific number of users from Synapse.
         This needs user to have administrator access in Synapse.
         """
-        yield assert_requester_is_admin(self.auth, request)
+        await assert_requester_is_admin(self.auth, request)
 
         target_user = UserID.from_string(target_user_id)
 
@@ -590,11 +580,10 @@ class GetUsersPaginatedRestServlet(RestServlet):
 
         logger.info("limit: %s, start: %s", limit, start)
 
-        ret = yield self.handlers.admin_handler.get_users_paginate(order, start, limit)
+        ret = await self.handlers.admin_handler.get_users_paginate(order, start, limit)
         return 200, ret
 
-    @defer.inlineCallbacks
-    def on_POST(self, request, target_user_id):
+    async def on_POST(self, request, target_user_id):
         """Post request to get specific number of users from Synapse..
         This needs user to have administrator access in Synapse.
         Example:
@@ -608,7 +597,7 @@ class GetUsersPaginatedRestServlet(RestServlet):
         Returns:
             200 OK with json object {list[dict[str, Any]], count} or empty object.
         """
-        yield assert_requester_is_admin(self.auth, request)
+        await assert_requester_is_admin(self.auth, request)
         UserID.from_string(target_user_id)
 
         order = "name"  # order by name in user table
@@ -618,7 +607,7 @@ class GetUsersPaginatedRestServlet(RestServlet):
         start = params["start"]
         logger.info("limit: %s, start: %s", limit, start)
 
-        ret = yield self.handlers.admin_handler.get_users_paginate(order, start, limit)
+        ret = await self.handlers.admin_handler.get_users_paginate(order, start, limit)
         return 200, ret
 
 
@@ -641,13 +630,12 @@ class SearchUsersRestServlet(RestServlet):
         self.auth = hs.get_auth()
         self.handlers = hs.get_handlers()
 
-    @defer.inlineCallbacks
-    def on_GET(self, request, target_user_id):
+    async def on_GET(self, request, target_user_id):
         """Get request to search user table for specific users according to
         search term.
         This needs user to have a administrator access in Synapse.
         """
-        yield assert_requester_is_admin(self.auth, request)
+        await assert_requester_is_admin(self.auth, request)
 
         target_user = UserID.from_string(target_user_id)
 
@@ -661,7 +649,7 @@ class SearchUsersRestServlet(RestServlet):
         term = parse_string(request, "term", required=True)
         logger.info("term: %s ", term)
 
-        ret = yield self.handlers.admin_handler.search_users(term)
+        ret = await self.handlers.admin_handler.search_users(term)
         return 200, ret
 
 
@@ -676,15 +664,14 @@ class DeleteGroupAdminRestServlet(RestServlet):
         self.is_mine_id = hs.is_mine_id
         self.auth = hs.get_auth()
 
-    @defer.inlineCallbacks
-    def on_POST(self, request, group_id):
-        requester = yield self.auth.get_user_by_req(request)
-        yield assert_user_is_admin(self.auth, requester.user)
+    async def on_POST(self, request, group_id):
+        requester = await self.auth.get_user_by_req(request)
+        await assert_user_is_admin(self.auth, requester.user)
 
         if not self.is_mine_id(group_id):
             raise SynapseError(400, "Can only delete local groups")
 
-        yield self.group_server.delete_group(group_id, requester.user.to_string())
+        await self.group_server.delete_group(group_id, requester.user.to_string())
         return 200, {}
 
 
@@ -700,16 +687,15 @@ class AccountValidityRenewServlet(RestServlet):
         self.account_activity_handler = hs.get_account_validity_handler()
         self.auth = hs.get_auth()
 
-    @defer.inlineCallbacks
-    def on_POST(self, request):
-        yield assert_requester_is_admin(self.auth, request)
+    async def on_POST(self, request):
+        await assert_requester_is_admin(self.auth, request)
 
         body = parse_json_object_from_request(request)
 
         if "user_id" not in body:
             raise SynapseError(400, "Missing property 'user_id' in the request body")
 
-        expiration_ts = yield self.account_activity_handler.renew_account_for_user(
+        expiration_ts = await self.account_activity_handler.renew_account_for_user(
             body["user_id"],
             body.get("expiration_ts"),
             not body.get("enable_renewal_emails", True),

+ 5 - 9
synapse/rest/admin/_base.py

@@ -15,8 +15,6 @@
 
 import re
 
-from twisted.internet import defer
-
 from synapse.api.errors import AuthError
 
 
@@ -42,8 +40,7 @@ def historical_admin_path_patterns(path_regex):
     )
 
 
-@defer.inlineCallbacks
-def assert_requester_is_admin(auth, request):
+async def assert_requester_is_admin(auth, request):
     """Verify that the requester is an admin user
 
     WARNING: MAKE SURE YOU YIELD ON THE RESULT!
@@ -58,12 +55,11 @@ def assert_requester_is_admin(auth, request):
     Raises:
         AuthError if the requester is not an admin
     """
-    requester = yield auth.get_user_by_req(request)
-    yield assert_user_is_admin(auth, requester.user)
+    requester = await auth.get_user_by_req(request)
+    await assert_user_is_admin(auth, requester.user)
 
 
-@defer.inlineCallbacks
-def assert_user_is_admin(auth, user_id):
+async def assert_user_is_admin(auth, user_id):
     """Verify that the given user is an admin user
 
     WARNING: MAKE SURE YOU YIELD ON THE RESULT!
@@ -79,6 +75,6 @@ def assert_user_is_admin(auth, user_id):
         AuthError if the user is not an admin
     """
 
-    is_admin = yield auth.is_server_admin(user_id)
+    is_admin = await auth.is_server_admin(user_id)
     if not is_admin:
         raise AuthError(403, "You are not a server admin")

+ 11 - 16
synapse/rest/admin/media.py

@@ -16,8 +16,6 @@
 
 import logging
 
-from twisted.internet import defer
-
 from synapse.api.errors import AuthError
 from synapse.http.servlet import RestServlet, parse_integer
 from synapse.rest.admin._base import (
@@ -40,12 +38,11 @@ class QuarantineMediaInRoom(RestServlet):
         self.store = hs.get_datastore()
         self.auth = hs.get_auth()
 
-    @defer.inlineCallbacks
-    def on_POST(self, request, room_id):
-        requester = yield self.auth.get_user_by_req(request)
-        yield assert_user_is_admin(self.auth, requester.user)
+    async def on_POST(self, request, room_id):
+        requester = await self.auth.get_user_by_req(request)
+        await assert_user_is_admin(self.auth, requester.user)
 
-        num_quarantined = yield self.store.quarantine_media_ids_in_room(
+        num_quarantined = await self.store.quarantine_media_ids_in_room(
             room_id, requester.user.to_string()
         )
 
@@ -62,14 +59,13 @@ class ListMediaInRoom(RestServlet):
         self.store = hs.get_datastore()
         self.auth = hs.get_auth()
 
-    @defer.inlineCallbacks
-    def on_GET(self, request, room_id):
-        requester = yield self.auth.get_user_by_req(request)
-        is_admin = yield self.auth.is_server_admin(requester.user)
+    async def on_GET(self, request, room_id):
+        requester = await self.auth.get_user_by_req(request)
+        is_admin = await self.auth.is_server_admin(requester.user)
         if not is_admin:
             raise AuthError(403, "You are not a server admin")
 
-        local_mxcs, remote_mxcs = yield self.store.get_media_mxcs_in_room(room_id)
+        local_mxcs, remote_mxcs = await self.store.get_media_mxcs_in_room(room_id)
 
         return 200, {"local": local_mxcs, "remote": remote_mxcs}
 
@@ -81,14 +77,13 @@ class PurgeMediaCacheRestServlet(RestServlet):
         self.media_repository = hs.get_media_repository()
         self.auth = hs.get_auth()
 
-    @defer.inlineCallbacks
-    def on_POST(self, request):
-        yield assert_requester_is_admin(self.auth, request)
+    async def on_POST(self, request):
+        await assert_requester_is_admin(self.auth, request)
 
         before_ts = parse_integer(request, "before_ts", required=True)
         logger.info("before_ts: %r", before_ts)
 
-        ret = yield self.media_repository.delete_old_remote_media(before_ts)
+        ret = await self.media_repository.delete_old_remote_media(before_ts)
 
         return 200, ret
 

+ 3 - 6
synapse/rest/admin/server_notice_servlet.py

@@ -14,8 +14,6 @@
 # limitations under the License.
 import re
 
-from twisted.internet import defer
-
 from synapse.api.constants import EventTypes
 from synapse.api.errors import SynapseError
 from synapse.http.servlet import (
@@ -69,9 +67,8 @@ class SendServerNoticeServlet(RestServlet):
             self.__class__.__name__,
         )
 
-    @defer.inlineCallbacks
-    def on_POST(self, request, txn_id=None):
-        yield assert_requester_is_admin(self.auth, request)
+    async def on_POST(self, request, txn_id=None):
+        await assert_requester_is_admin(self.auth, request)
         body = parse_json_object_from_request(request)
         assert_params_in_dict(body, ("user_id", "content"))
         event_type = body.get("type", EventTypes.Message)
@@ -85,7 +82,7 @@ class SendServerNoticeServlet(RestServlet):
         if not self.hs.is_mine_id(user_id):
             raise SynapseError(400, "Server notices can only be sent to local users")
 
-        event = yield self.snm.send_notice(
+        event = await self.snm.send_notice(
             user_id=body["user_id"],
             type=event_type,
             state_key=state_key,

+ 7 - 11
synapse/rest/admin/users.py

@@ -14,8 +14,6 @@
 # limitations under the License.
 import re
 
-from twisted.internet import defer
-
 from synapse.api.errors import SynapseError
 from synapse.http.servlet import (
     RestServlet,
@@ -59,24 +57,22 @@ class UserAdminServlet(RestServlet):
         self.auth = hs.get_auth()
         self.handlers = hs.get_handlers()
 
-    @defer.inlineCallbacks
-    def on_GET(self, request, user_id):
-        yield assert_requester_is_admin(self.auth, request)
+    async def on_GET(self, request, user_id):
+        await assert_requester_is_admin(self.auth, request)
 
         target_user = UserID.from_string(user_id)
 
         if not self.hs.is_mine(target_user):
             raise SynapseError(400, "Only local users can be admins of this homeserver")
 
-        is_admin = yield self.handlers.admin_handler.get_user_server_admin(target_user)
+        is_admin = await self.handlers.admin_handler.get_user_server_admin(target_user)
         is_admin = bool(is_admin)
 
         return 200, {"admin": is_admin}
 
-    @defer.inlineCallbacks
-    def on_PUT(self, request, user_id):
-        requester = yield self.auth.get_user_by_req(request)
-        yield assert_user_is_admin(self.auth, requester.user)
+    async def on_PUT(self, request, user_id):
+        requester = await self.auth.get_user_by_req(request)
+        await assert_user_is_admin(self.auth, requester.user)
         auth_user = requester.user
 
         target_user = UserID.from_string(user_id)
@@ -93,7 +89,7 @@ class UserAdminServlet(RestServlet):
         if target_user == auth_user and not set_admin_to:
             raise SynapseError(400, "You may not demote yourself.")
 
-        yield self.handlers.admin_handler.set_user_server_admin(
+        await self.handlers.admin_handler.set_user_server_admin(
             target_user, set_admin_to
         )
 

+ 29 - 0
synapse/util/async_helpers.py

@@ -21,6 +21,8 @@ from typing import Dict, Sequence, Set, Union
 
 from six.moves import range
 
+import attr
+
 from twisted.internet import defer
 from twisted.internet.defer import CancelledError
 from twisted.python import failure
@@ -483,3 +485,30 @@ def timeout_deferred(deferred, timeout, reactor, on_timeout_cancel=None):
     deferred.addCallbacks(success_cb, failure_cb)
 
     return new_d
+
+
+@attr.s(slots=True, frozen=True)
+class DoneAwaitable(object):
+    """Simple awaitable that returns the provided value.
+    """
+
+    value = attr.ib()
+
+    def __await__(self):
+        return self
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        raise StopIteration(self.value)
+
+
+def maybe_awaitable(value):
+    """Convert a value to an awaitable if not already an awaitable.
+    """
+
+    if hasattr(value, "__await__"):
+        return value
+
+    return DoneAwaitable(value)

+ 39 - 0
tests/handlers/test_presence.py

@@ -22,6 +22,7 @@ from synapse.api.constants import EventTypes, Membership, PresenceState
 from synapse.events import room_version_to_event_format
 from synapse.events.builder import EventBuilder
 from synapse.handlers.presence import (
+    EXTERNAL_PROCESS_EXPIRY,
     FEDERATION_PING_INTERVAL,
     FEDERATION_TIMEOUT,
     IDLE_TIMER,
@@ -413,6 +414,44 @@ class PresenceTimeoutTestCase(unittest.TestCase):
         self.assertEquals(state, new_state)
 
 
+class PresenceHandlerTestCase(unittest.HomeserverTestCase):
+    def prepare(self, reactor, clock, hs):
+        self.presence_handler = hs.get_presence_handler()
+        self.clock = hs.get_clock()
+
+    def test_external_process_timeout(self):
+        """Test that if an external process doesn't update the records for a while
+        we time out their syncing users presence.
+        """
+        process_id = 1
+        user_id = "@test:server"
+
+        # Notify handler that a user is now syncing.
+        self.get_success(
+            self.presence_handler.update_external_syncs_row(
+                process_id, user_id, True, self.clock.time_msec()
+            )
+        )
+
+        # Check that if we wait a while without telling the handler the user has
+        # stopped syncing that their presence state doesn't get timed out.
+        self.reactor.advance(EXTERNAL_PROCESS_EXPIRY / 2)
+
+        state = self.get_success(
+            self.presence_handler.get_state(UserID.from_string(user_id))
+        )
+        self.assertEqual(state.state, PresenceState.ONLINE)
+
+        # Check that if the external process timeout fires, then the syncing
+        # user gets timed out
+        self.reactor.advance(EXTERNAL_PROCESS_EXPIRY)
+
+        state = self.get_success(
+            self.presence_handler.get_state(UserID.from_string(user_id))
+        )
+        self.assertEqual(state.state, PresenceState.OFFLINE)
+
+
 class PresenceJoinTestCase(unittest.HomeserverTestCase):
     """Tests remote servers get told about presence of users in the room when
     they join and when new local users join.

+ 1 - 1
tox.ini

@@ -161,7 +161,7 @@ basepython = python3.7
 skip_install = True
 deps =
     {[base]deps}
-    mypy
+    mypy==0.730
     mypy-zope
 env =
     MYPYPATH = stubs/