
Merge remote-tracking branch 'origin/develop' into dmr/pyproject-poetry

David Robertson · 2 years ago · commit 443b94f0d5

+ 4 - 5
.github/workflows/twisted_trunk.yml

@@ -65,16 +65,15 @@ jobs:
     steps:
       - uses: actions/checkout@v2
       - name: Patch dependencies
-        # Note: The script below runs poetry against a virtualenv in /src/.venv/
-        #       ...but the sytest-synapse container expects it to be in /venv/
-        #       So we temporarily symlink it, run poetry, then remove the link.
-        #       Tidying up is important because `/src` is a mounted volume.
+        # Note: The poetry commands want to create a virtualenv in /src/.venv/,
+        #       but the sytest-synapse container expects it to be in /venv/.
+        #       We symlink it before running poetry so that poetry actually
+        #       ends up installing to `/venv`.
         run: |
           ln -s -T /venv /src/.venv
           poetry remove twisted
           poetry add --extras tls git+https://github.com/twisted/twisted.git#trunk
           poetry install --no-interaction --extras "all test"
-          rm /src/.venv
         working-directory: /src
       - name: Run SyTest
         run: /bootstrap.sh synapse

+ 1 - 0
changelog.d/12213.bugfix

@@ -0,0 +1 @@
+Prevent a sync request from removing a user's busy presence status.

+ 1 - 0
changelog.d/12319.bugfix

@@ -0,0 +1 @@
+Fix bug with incremental sync missing events when rejoining/backfilling. Contributed by Nick @ Beeper.

+ 1 - 0
changelog.d/12340.doc

@@ -0,0 +1 @@
+Fix rendering of the documentation site when using the 'print' feature.

+ 1 - 0
changelog.d/12425.misc

@@ -0,0 +1 @@
+Run twisted trunk CI job in the locked poetry environment.

+ 1 - 0
changelog.d/12441.misc

@@ -0,0 +1 @@
+Bump twisted version in `poetry.lock` to work around [pip bug #9644](https://github.com/pypa/pip/issues/9644).

+ 1 - 0
changelog.d/12445.misc

@@ -0,0 +1 @@
+Change Mutual Rooms' `unstable_features` flag to `uk.half-shot.msc2666.mutual_rooms`, which matches the current MSC iteration.

+ 1 - 0
changelog.d/12454.misc

@@ -0,0 +1 @@
+Limit the length of `device_id` to at most 512 characters.

+ 1 - 0
changelog.d/12466.misc

@@ -0,0 +1 @@
+Dockerfile-workers: give the master its own log config.

+ 31 - 17
docker/configure_workers_and_start.py

@@ -29,7 +29,7 @@
 import os
 import subprocess
 import sys
-from typing import Any, Dict, Set
+from typing import Any, Dict, Mapping, Set
 
 import jinja2
 import yaml
@@ -341,7 +341,7 @@ def generate_worker_files(environ, config_path: str, data_dir: str):
     # base shared worker jinja2 template.
     #
     # This config file will be passed to all workers, including Synapse's main process.
-    shared_config = {"listeners": listeners}
+    shared_config: Dict[str, Any] = {"listeners": listeners}
 
     # The supervisord config, the contents of which will be inserted into the
     # base supervisord jinja2 template.
@@ -446,21 +446,7 @@ def generate_worker_files(environ, config_path: str, data_dir: str):
 
         # Write out the worker's logging config file
 
-        # Check whether we should write worker logs to disk, in addition to the console
-        extra_log_template_args = {}
-        if environ.get("SYNAPSE_WORKERS_WRITE_LOGS_TO_DISK"):
-            extra_log_template_args["LOG_FILE_PATH"] = "{dir}/logs/{name}.log".format(
-                dir=data_dir, name=worker_name
-            )
-
-        # Render and write the file
-        log_config_filepath = "/conf/workers/{name}.log.config".format(name=worker_name)
-        convert(
-            "/conf/log.config",
-            log_config_filepath,
-            worker_name=worker_name,
-            **extra_log_template_args,
-        )
+        log_config_filepath = generate_worker_log_config(environ, worker_name, data_dir)
 
         # Then a worker config file
         convert(
@@ -496,6 +482,10 @@ def generate_worker_files(environ, config_path: str, data_dir: str):
 
     # Finally, we'll write out the config files.
 
+    # log config for the master process
+    master_log_config = generate_worker_log_config(environ, "master", data_dir)
+    shared_config["log_config"] = master_log_config
+
     # Shared homeserver config
     convert(
         "/conf/shared.yaml.j2",
@@ -532,6 +522,30 @@ def generate_worker_files(environ, config_path: str, data_dir: str):
         os.mkdir(log_dir)
 
 
+def generate_worker_log_config(
+    environ: Mapping[str, str], worker_name: str, data_dir: str
+) -> str:
+    """Generate a log.config file for the given worker.
+
+    Returns: the path to the generated file
+    """
+    # Check whether we should write worker logs to disk, in addition to the console
+    extra_log_template_args = {}
+    if environ.get("SYNAPSE_WORKERS_WRITE_LOGS_TO_DISK"):
+        extra_log_template_args["LOG_FILE_PATH"] = "{dir}/logs/{name}.log".format(
+            dir=data_dir, name=worker_name
+        )
+    # Render and write the file
+    log_config_filepath = "/conf/workers/{name}.log.config".format(name=worker_name)
+    convert(
+        "/conf/log.config",
+        log_config_filepath,
+        worker_name=worker_name,
+        **extra_log_template_args,
+    )
+    return log_config_filepath
+
+
 def start_supervisord():
     """Starts up supervisord which then starts and monitors all other necessary processes
 

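The extracted `generate_worker_log_config` helper above is small enough to sketch standalone. Below is a minimal, illustrative version: the `render` stub is hypothetical and stands in for the script's `convert` (which renders a Jinja2 template to a file), showing how `SYNAPSE_WORKERS_WRITE_LOGS_TO_DISK` toggles the extra template argument:

```python
import os
from typing import Dict, Mapping


def render(src: str, dst: str, **template_vars: str) -> None:
    # Stub for the script's convert(): render the Jinja2 template at `src`
    # with `template_vars` and write the result to `dst`.
    print(f"would render {src} -> {dst} with {template_vars}")


def generate_worker_log_config(
    environ: Mapping[str, str], worker_name: str, data_dir: str
) -> str:
    """Generate a log config for the given worker; return the path written."""
    extra_log_template_args: Dict[str, str] = {}
    if environ.get("SYNAPSE_WORKERS_WRITE_LOGS_TO_DISK"):
        # Write logs to disk in addition to the console
        extra_log_template_args["LOG_FILE_PATH"] = f"{data_dir}/logs/{worker_name}.log"
    log_config_filepath = f"/conf/workers/{worker_name}.log.config"
    render(
        "/conf/log.config",
        log_config_filepath,
        worker_name=worker_name,
        **extra_log_template_args,
    )
    return log_config_filepath


# The master process now shares the same code path as the workers:
generate_worker_log_config(os.environ, "master", "/data")
```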
+ 14 - 0
docs/website_files/table-of-contents.js

@@ -75,6 +75,20 @@ function setTocEntry() {
  * Populate sidebar on load
  */
 window.addEventListener('load', () => {
+    // Prevent rendering the table of contents of the "print book" page, as it
+    // will end up being rendered into the output (in a broken-looking way)
+
+    // Get the name of the current page (e.g. 'print.html')
+    const pageNameExtension = window.location.pathname.split('/').pop();
+
+    // Split off the extension (as '.../print' is also a valid page name), which
+    // should result in 'print'
+    const pageName = pageNameExtension.split('.')[0];
+    if (pageName === "print") {
+        // Don't render the table of contents on this page
+        return;
+    }
+
     // Only create table of contents if there is more than one header on the page
     if (headers.length <= 1) {
         return;

+ 10 - 9
poetry.lock

@@ -1285,7 +1285,7 @@ urllib3 = ">=1.26.0"
 
 [[package]]
 name = "twisted"
-version = "22.2.0"
+version = "22.4.0"
 description = "An asynchronous networking framework written in Python"
 category = "main"
 optional = false
@@ -1305,19 +1305,20 @@ typing-extensions = ">=3.6.5"
 "zope.interface" = ">=4.4.2"
 
 [package.extras]
-all_non_platform = ["cython-test-exception-raiser (>=1.0.2,<2)", "PyHamcrest (>=1.9.0)", "pyopenssl (>=16.0.0)", "service-identity (>=18.1.0)", "idna (>=2.4)", "pyasn1", "cryptography (>=2.6)", "appdirs (>=1.4.0)", "bcrypt (>=3.0.0)", "pyserial (>=3.0)", "h2 (>=3.0,<4.0)", "priority (>=1.1.0,<2.0)", "pywin32 (!=226)", "contextvars (>=2.4,<3)"]
+all_non_platform = ["cython-test-exception-raiser (>=1.0.2,<2)", "PyHamcrest (>=1.9.0)", "pyopenssl (>=16.0.0)", "service-identity (>=18.1.0)", "idna (>=2.4)", "pyasn1", "cryptography (>=2.6)", "appdirs (>=1.4.0)", "bcrypt (>=3.0.0)", "pyserial (>=3.0)", "h2 (>=3.0,<5.0)", "priority (>=1.1.0,<2.0)", "pywin32 (!=226)", "contextvars (>=2.4,<3)"]
 conch = ["pyasn1", "cryptography (>=2.6)", "appdirs (>=1.4.0)", "bcrypt (>=3.0.0)"]
+conch_nacl = ["pyasn1", "cryptography (>=2.6)", "appdirs (>=1.4.0)", "bcrypt (>=3.0.0)", "pynacl"]
 contextvars = ["contextvars (>=2.4,<3)"]
 dev = ["towncrier (>=19.2,<20.0)", "sphinx-rtd-theme (>=0.5,<1.0)", "readthedocs-sphinx-ext (>=2.1,<3.0)", "sphinx (>=4.1.2,<6)", "pyflakes (>=2.2,<3.0)", "twistedchecker (>=0.7,<1.0)", "coverage (>=6b1,<7)", "python-subunit (>=1.4,<2.0)", "pydoctor (>=21.9.0,<21.10.0)"]
 dev_release = ["towncrier (>=19.2,<20.0)", "sphinx-rtd-theme (>=0.5,<1.0)", "readthedocs-sphinx-ext (>=2.1,<3.0)", "sphinx (>=4.1.2,<6)", "pydoctor (>=21.9.0,<21.10.0)"]
-http2 = ["h2 (>=3.0,<4.0)", "priority (>=1.1.0,<2.0)"]
-macos_platform = ["pyobjc-core", "pyobjc-framework-cfnetwork", "pyobjc-framework-cocoa", "cython-test-exception-raiser (>=1.0.2,<2)", "PyHamcrest (>=1.9.0)", "pyopenssl (>=16.0.0)", "service-identity (>=18.1.0)", "idna (>=2.4)", "pyasn1", "cryptography (>=2.6)", "appdirs (>=1.4.0)", "bcrypt (>=3.0.0)", "pyserial (>=3.0)", "h2 (>=3.0,<4.0)", "priority (>=1.1.0,<2.0)", "pywin32 (!=226)", "contextvars (>=2.4,<3)"]
-mypy = ["mypy (==0.930)", "mypy-zope (==0.3.4)", "types-setuptools", "types-pyopenssl", "towncrier (>=19.2,<20.0)", "sphinx-rtd-theme (>=0.5,<1.0)", "readthedocs-sphinx-ext (>=2.1,<3.0)", "sphinx (>=4.1.2,<6)", "pyflakes (>=2.2,<3.0)", "twistedchecker (>=0.7,<1.0)", "coverage (>=6b1,<7)", "cython-test-exception-raiser (>=1.0.2,<2)", "PyHamcrest (>=1.9.0)", "pyopenssl (>=16.0.0)", "service-identity (>=18.1.0)", "idna (>=2.4)", "pyasn1", "cryptography (>=2.6)", "appdirs (>=1.4.0)", "bcrypt (>=3.0.0)", "pyserial (>=3.0)", "h2 (>=3.0,<4.0)", "priority (>=1.1.0,<2.0)", "pywin32 (!=226)", "python-subunit (>=1.4,<2.0)", "contextvars (>=2.4,<3)", "pydoctor (>=21.9.0,<21.10.0)"]
-osx_platform = ["pyobjc-core", "pyobjc-framework-cfnetwork", "pyobjc-framework-cocoa", "cython-test-exception-raiser (>=1.0.2,<2)", "PyHamcrest (>=1.9.0)", "pyopenssl (>=16.0.0)", "service-identity (>=18.1.0)", "idna (>=2.4)", "pyasn1", "cryptography (>=2.6)", "appdirs (>=1.4.0)", "bcrypt (>=3.0.0)", "pyserial (>=3.0)", "h2 (>=3.0,<4.0)", "priority (>=1.1.0,<2.0)", "pywin32 (!=226)", "contextvars (>=2.4,<3)"]
+http2 = ["h2 (>=3.0,<5.0)", "priority (>=1.1.0,<2.0)"]
+macos_platform = ["pyobjc-core", "pyobjc-framework-cfnetwork", "pyobjc-framework-cocoa", "cython-test-exception-raiser (>=1.0.2,<2)", "PyHamcrest (>=1.9.0)", "pyopenssl (>=16.0.0)", "service-identity (>=18.1.0)", "idna (>=2.4)", "pyasn1", "cryptography (>=2.6)", "appdirs (>=1.4.0)", "bcrypt (>=3.0.0)", "pyserial (>=3.0)", "h2 (>=3.0,<5.0)", "priority (>=1.1.0,<2.0)", "pywin32 (!=226)", "contextvars (>=2.4,<3)"]
+mypy = ["mypy (==0.930)", "mypy-zope (==0.3.4)", "types-setuptools", "types-pyopenssl", "towncrier (>=19.2,<20.0)", "sphinx-rtd-theme (>=0.5,<1.0)", "readthedocs-sphinx-ext (>=2.1,<3.0)", "sphinx (>=4.1.2,<6)", "pyflakes (>=2.2,<3.0)", "twistedchecker (>=0.7,<1.0)", "coverage (>=6b1,<7)", "cython-test-exception-raiser (>=1.0.2,<2)", "PyHamcrest (>=1.9.0)", "pyopenssl (>=16.0.0)", "service-identity (>=18.1.0)", "idna (>=2.4)", "pyasn1", "cryptography (>=2.6)", "appdirs (>=1.4.0)", "bcrypt (>=3.0.0)", "pyserial (>=3.0)", "h2 (>=3.0,<5.0)", "priority (>=1.1.0,<2.0)", "pynacl", "pywin32 (!=226)", "python-subunit (>=1.4,<2.0)", "contextvars (>=2.4,<3)", "pydoctor (>=21.9.0,<21.10.0)"]
+osx_platform = ["pyobjc-core", "pyobjc-framework-cfnetwork", "pyobjc-framework-cocoa", "cython-test-exception-raiser (>=1.0.2,<2)", "PyHamcrest (>=1.9.0)", "pyopenssl (>=16.0.0)", "service-identity (>=18.1.0)", "idna (>=2.4)", "pyasn1", "cryptography (>=2.6)", "appdirs (>=1.4.0)", "bcrypt (>=3.0.0)", "pyserial (>=3.0)", "h2 (>=3.0,<5.0)", "priority (>=1.1.0,<2.0)", "pywin32 (!=226)", "contextvars (>=2.4,<3)"]
 serial = ["pyserial (>=3.0)", "pywin32 (!=226)"]
 test = ["cython-test-exception-raiser (>=1.0.2,<2)", "PyHamcrest (>=1.9.0)"]
 tls = ["pyopenssl (>=16.0.0)", "service-identity (>=18.1.0)", "idna (>=2.4)"]
-windows_platform = ["pywin32 (!=226)", "cython-test-exception-raiser (>=1.0.2,<2)", "PyHamcrest (>=1.9.0)", "pyopenssl (>=16.0.0)", "service-identity (>=18.1.0)", "idna (>=2.4)", "pyasn1", "cryptography (>=2.6)", "appdirs (>=1.4.0)", "bcrypt (>=3.0.0)", "pyserial (>=3.0)", "h2 (>=3.0,<4.0)", "priority (>=1.1.0,<2.0)", "pywin32 (!=226)", "contextvars (>=2.4,<3)"]
+windows_platform = ["pywin32 (!=226)", "cython-test-exception-raiser (>=1.0.2,<2)", "PyHamcrest (>=1.9.0)", "pyopenssl (>=16.0.0)", "service-identity (>=18.1.0)", "idna (>=2.4)", "pyasn1", "cryptography (>=2.6)", "appdirs (>=1.4.0)", "bcrypt (>=3.0.0)", "pyserial (>=3.0)", "h2 (>=3.0,<5.0)", "priority (>=1.1.0,<2.0)", "pywin32 (!=226)", "contextvars (>=2.4,<3)"]
 
 [[package]]
 name = "twisted-iocpsupport"
@@ -2592,8 +2593,8 @@ twine = [
     {file = "twine-3.8.0.tar.gz", hash = "sha256:8efa52658e0ae770686a13b675569328f1fba9837e5de1867bfe5f46a9aefe19"},
 ]
 twisted = [
-    {file = "Twisted-22.2.0-py3-none-any.whl", hash = "sha256:5c63c149eb6b8fe1e32a0215b1cef96fabdba04f705d8efb9174b1ccf5b49d49"},
-    {file = "Twisted-22.2.0.tar.gz", hash = "sha256:57f32b1f6838facb8c004c89467840367ad38e9e535f8252091345dba500b4f2"},
+    {file = "Twisted-22.4.0-py3-none-any.whl", hash = "sha256:f9f7a91f94932477a9fc3b169d57f54f96c6e74a23d78d9ce54039a7f48928a2"},
+    {file = "Twisted-22.4.0.tar.gz", hash = "sha256:a047990f57dfae1e0bd2b7df2526d4f16dcdc843774dc108b78c52f2a5f13680"},
 ]
 twisted-iocpsupport = [
     {file = "twisted-iocpsupport-1.0.2.tar.gz", hash = "sha256:72068b206ee809c9c596b57b5287259ea41ddb4774d86725b19f35bf56aa32a9"},

+ 4 - 2
synapse/handlers/events.py

@@ -16,7 +16,7 @@ import logging
 import random
 from typing import TYPE_CHECKING, Iterable, List, Optional
 
-from synapse.api.constants import EduTypes, EventTypes, Membership
+from synapse.api.constants import EduTypes, EventTypes, Membership, PresenceState
 from synapse.api.errors import AuthError, SynapseError
 from synapse.events import EventBase
 from synapse.events.utils import SerializeEventConfig
@@ -67,7 +67,9 @@ class EventStreamHandler:
         presence_handler = self.hs.get_presence_handler()
 
         context = await presence_handler.user_syncing(
-            auth_user_id, affect_presence=affect_presence
+            auth_user_id,
+            affect_presence=affect_presence,
+            presence_state=PresenceState.ONLINE,
         )
         with context:
             if timeout:

+ 5 - 9
synapse/handlers/message.py

@@ -175,17 +175,13 @@ class MessageHandler:
         state_filter = state_filter or StateFilter.all()
 
         if at_token:
-            # FIXME this claims to get the state at a stream position, but
-            # get_recent_events_for_room operates by topo ordering. This therefore
-            # does not reliably give you the state at the given stream position.
-            # (https://github.com/matrix-org/synapse/issues/3305)
-            last_events, _ = await self.store.get_recent_events_for_room(
-                room_id, end_token=at_token.room_key, limit=1
+            last_event = await self.store.get_last_event_in_room_before_stream_ordering(
+                room_id,
+                end_token=at_token.room_key,
             )
 
-            if not last_events:
+            if not last_event:
                 raise NotFoundError("Can't find event for token %s" % (at_token,))
-            last_event = last_events[0]
 
             # check whether the user is in the room at that time to determine
             # whether they should be treated as peeking.
@@ -204,7 +200,7 @@ class MessageHandler:
             visible_events = await filter_events_for_client(
                 self.storage,
                 user_id,
-                last_events,
+                [last_event],
                 filter_send_to_client=False,
                 is_peeking=is_peeking,
             )

+ 46 - 10
synapse/handlers/presence.py

@@ -151,7 +151,7 @@ class BasePresenceHandler(abc.ABC):
 
     @abc.abstractmethod
     async def user_syncing(
-        self, user_id: str, affect_presence: bool
+        self, user_id: str, affect_presence: bool, presence_state: str
     ) -> ContextManager[None]:
         """Returns a context manager that should surround any stream requests
         from the user.
@@ -165,6 +165,7 @@ class BasePresenceHandler(abc.ABC):
             affect_presence: If false this function will be a no-op.
                 Useful for streams that are not associated with an actual
                 client that is being used by a user.
+            presence_state: The presence state indicated in the sync request
         """
 
     @abc.abstractmethod
@@ -228,6 +229,11 @@ class BasePresenceHandler(abc.ABC):
 
         return states
 
+    async def current_state_for_user(self, user_id: str) -> UserPresenceState:
+        """Get the current presence state for a user."""
+        res = await self.current_state_for_users([user_id])
+        return res[user_id]
+
     @abc.abstractmethod
     async def set_state(
         self,
@@ -461,7 +467,7 @@ class WorkerPresenceHandler(BasePresenceHandler):
                 self.send_user_sync(user_id, False, last_sync_ms)
 
     async def user_syncing(
-        self, user_id: str, affect_presence: bool
+        self, user_id: str, affect_presence: bool, presence_state: str
     ) -> ContextManager[None]:
         """Record that a user is syncing.
 
@@ -471,6 +477,17 @@ class WorkerPresenceHandler(BasePresenceHandler):
         if not affect_presence or not self._presence_enabled:
             return _NullContextManager()
 
+        prev_state = await self.current_state_for_user(user_id)
+        if prev_state.state != PresenceState.BUSY:
+            # We set state here but pass ignore_status_msg = True as we don't want to
+            # cause the status message to be cleared.
+            # Note that this causes last_active_ts to be incremented which is not
+            # what the spec wants: see comment in the BasePresenceHandler version
+            # of this function.
+            await self.set_state(
+                UserID.from_string(user_id), {"presence": presence_state}, True
+            )
+
         curr_sync = self._user_to_num_current_syncs.get(user_id, 0)
         self._user_to_num_current_syncs[user_id] = curr_sync + 1
 
@@ -942,7 +959,10 @@ class PresenceHandler(BasePresenceHandler):
         await self._update_states([prev_state.copy_and_replace(**new_fields)])
 
     async def user_syncing(
-        self, user_id: str, affect_presence: bool = True
+        self,
+        user_id: str,
+        affect_presence: bool = True,
+        presence_state: str = PresenceState.ONLINE,
     ) -> ContextManager[None]:
         """Returns a context manager that should surround any stream requests
         from the user.
@@ -956,6 +976,7 @@ class PresenceHandler(BasePresenceHandler):
             affect_presence: If false this function will be a no-op.
                 Useful for streams that are not associated with an actual
                 client that is being used by a user.
+            presence_state: The presence state indicated in the sync request
         """
         # Override if it should affect the user's presence, if presence is
         # disabled.
@@ -967,9 +988,25 @@ class PresenceHandler(BasePresenceHandler):
             self.user_to_num_current_syncs[user_id] = curr_sync + 1
 
             prev_state = await self.current_state_for_user(user_id)
+
+            # If they're busy then they don't stop being busy just by syncing,
+            # so we only update the last sync time.
+            if prev_state.state != PresenceState.BUSY:
+                # XXX: We set_state separately here and just update the last_active_ts above
+                # This keeps the logic as similar as possible between the worker and single
+                # process modes. Using set_state will actually cause last_active_ts to be
+                # updated always, which is not what the spec calls for, but synapse has done
+                # this for... forever, I think.
+                await self.set_state(
+                    UserID.from_string(user_id), {"presence": presence_state}, True
+                )
+                # Retrieve the new state for the logic below. This should come from the
+                # in-memory cache.
+                prev_state = await self.current_state_for_user(user_id)
+
+            # To keep the single process behaviour consistent with worker mode, run the
+            # same logic as `update_external_syncs_row`, even though it looks weird.
             if prev_state.state == PresenceState.OFFLINE:
-                # If they're currently offline then bring them online, otherwise
-                # just update the last sync times.
                 await self._update_states(
                     [
                         prev_state.copy_and_replace(
@@ -979,6 +1016,10 @@ class PresenceHandler(BasePresenceHandler):
                         )
                     ]
                 )
+            # otherwise, set the new presence state & update the last sync time,
+            # but don't update last_active_ts as this isn't an indication that
+            # they've been active (even though it's probably been updated by
+            # set_state above)
             else:
                 await self._update_states(
                     [
@@ -1086,11 +1127,6 @@ class PresenceHandler(BasePresenceHandler):
             )
             self.external_process_last_updated_ms.pop(process_id, None)
 
-    async def current_state_for_user(self, user_id: str) -> UserPresenceState:
-        """Get the current presence state for a user."""
-        res = await self.current_state_for_users([user_id])
-        return res[user_id]
-
     async def _persist_and_notify(self, states: List[UserPresenceState]) -> None:
         """Persist states in the database, poke the notifier and send to
         interested remote servers

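Taken together, both `user_syncing` implementations now follow the same rule: a sync applies the requested presence state unless the user is busy, and never clears the status message. A condensed sketch of that shared logic (illustrative only, not the handler code verbatim):

```python
from synapse.api.constants import PresenceState
from synapse.types import UserID


async def apply_sync_presence(handler, user_id: str, presence_state: str) -> None:
    # `handler` is a BasePresenceHandler; `presence_state` comes from the
    # sync request's ?set_presence parameter.
    prev_state = await handler.current_state_for_user(user_id)
    if prev_state.state != PresenceState.BUSY:
        # ignore_status_msg=True: apply the new presence without clearing
        # the user's status message.
        await handler.set_state(
            UserID.from_string(user_id), {"presence": presence_state}, True
        )
    # If the user is busy, presence is left untouched; only the last-sync
    # bookkeeping is updated by the surrounding context manager.
```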
+ 7 - 8
synapse/handlers/sync.py

@@ -661,16 +661,15 @@ class SyncHandler:
             stream_position: point at which to get state
             state_filter: The state filter used to fetch state from the database.
         """
-        # FIXME this claims to get the state at a stream position, but
-        # get_recent_events_for_room operates by topo ordering. This therefore
-        # does not reliably give you the state at the given stream position.
-        # (https://github.com/matrix-org/synapse/issues/3305)
-        last_events, _ = await self.store.get_recent_events_for_room(
-            room_id, end_token=stream_position.room_key, limit=1
+        # FIXME: This gets the state at the latest event before the stream ordering,
+        # which might not be the same as the "current state" of the room at the time
+        # of the stream token if there were multiple forward extremities at the time.
+        last_event = await self.store.get_last_event_in_room_before_stream_ordering(
+            room_id,
+            end_token=stream_position.room_key,
         )
 
-        if last_events:
-            last_event = last_events[-1]
+        if last_event:
             state = await self.get_state_after_event(
                 last_event, state_filter=state_filter or StateFilter.all()
             )

+ 9 - 0
synapse/rest/client/login.py

@@ -342,6 +342,15 @@ class LoginRestServlet(RestServlet):
             user_id = canonical_uid
 
         device_id = login_submission.get("device_id")
+
+        # If device_id is present, check that it does not exceed a reasonable limit of 512 characters
+        if device_id and len(device_id) > 512:
+            raise LoginError(
+                400,
+                "device_id cannot be longer than 512 characters.",
+                errcode=Codes.INVALID_PARAM,
+            )
+
         initial_display_name = login_submission.get("initial_device_display_name")
         (
             device_id,

+ 3 - 6
synapse/rest/client/sync.py

@@ -180,13 +180,10 @@ class SyncRestServlet(RestServlet):
 
         affect_presence = set_presence != PresenceState.OFFLINE
 
-        if affect_presence:
-            await self.presence_handler.set_state(
-                user, {"presence": set_presence}, True
-            )
-
         context = await self.presence_handler.user_syncing(
-            user.to_string(), affect_presence=affect_presence
+            user.to_string(),
+            affect_presence=affect_presence,
+            presence_state=set_presence,
         )
         with context:
             sync_result = await self.sync_handler.wait_for_sync_for_user(

+ 1 - 1
synapse/rest/client/versions.py

@@ -86,7 +86,7 @@ class VersionsRestServlet(RestServlet):
                     # Implements additional endpoints as described in MSC2432
                     "org.matrix.msc2432": True,
                     # Implements additional endpoints as described in MSC2666
-                    "uk.half-shot.msc2666": True,
+                    "uk.half-shot.msc2666.mutual_rooms": True,
                     # Whether new rooms will be set to encrypted or not (based on presets).
                     "io.element.e2ee_forced.public": self.e2ee_forced_public,
                     "io.element.e2ee_forced.private": self.e2ee_forced_private,

+ 26 - 0
synapse/storage/databases/main/stream.py

@@ -758,6 +758,32 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             "get_room_event_before_stream_ordering", _f
         )
 
+    async def get_last_event_in_room_before_stream_ordering(
+        self,
+        room_id: str,
+        end_token: RoomStreamToken,
+    ) -> Optional[EventBase]:
+        """Returns the last event in a room at or before a stream ordering
+
+        Args:
+            room_id: The ID of the room to fetch the last event for.
+            end_token: The token used to stream from
+
+        Returns:
+            The most recent event.
+        """
+
+        last_row = await self.get_room_event_before_stream_ordering(
+            room_id=room_id,
+            stream_ordering=end_token.stream,
+        )
+        if last_row:
+            _, _, event_id = last_row
+            event = await self.get_event(event_id, get_prev_content=True)
+            return event
+
+        return None
+
     async def get_current_room_stream_token_for_room_id(
         self, room_id: Optional[str] = None
     ) -> RoomStreamToken:

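The `message.py` and `sync.py` changes above are the two callers of this new method; a sketch of the call pattern they now share (setup elided, names as in the diff):

```python
from typing import Optional

from synapse.api.errors import NotFoundError
from synapse.events import EventBase
from synapse.types import RoomStreamToken


async def last_event_before(
    store, room_id: str, room_key: RoomStreamToken
) -> EventBase:
    # A single event selected by stream ordering, replacing the old
    # get_recent_events_for_room(..., limit=1) call, which ordered
    # topologically and so could return the wrong event (issue #3305).
    last_event: Optional[EventBase] = (
        await store.get_last_event_in_room_before_stream_ordering(
            room_id,
            end_token=room_key,
        )
    )
    if last_event is None:
        raise NotFoundError("Can't find event for token %s" % (room_key,))
    return last_event
```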
+ 79 - 0
tests/handlers/test_presence.py

@@ -657,6 +657,85 @@ class PresenceHandlerTestCase(unittest.HomeserverTestCase):
         # Mark user as online and `status_msg = None`
         self._set_presencestate_with_status_msg(user_id, PresenceState.ONLINE, None)
 
+    def test_set_presence_from_syncing_not_set(self):
+        """Test that presence is not set by syncing if affect_presence is false"""
+        user_id = "@test:server"
+        status_msg = "I'm here!"
+
+        self._set_presencestate_with_status_msg(
+            user_id, PresenceState.UNAVAILABLE, status_msg
+        )
+
+        self.get_success(
+            self.presence_handler.user_syncing(user_id, False, PresenceState.ONLINE)
+        )
+
+        state = self.get_success(
+            self.presence_handler.get_state(UserID.from_string(user_id))
+        )
+        # we should still be unavailable
+        self.assertEqual(state.state, PresenceState.UNAVAILABLE)
+        # and status message should still be the same
+        self.assertEqual(state.status_msg, status_msg)
+
+    def test_set_presence_from_syncing_is_set(self):
+        """Test that presence is set by syncing if affect_presence is true"""
+        user_id = "@test:server"
+        status_msg = "I'm here!"
+
+        self._set_presencestate_with_status_msg(
+            user_id, PresenceState.UNAVAILABLE, status_msg
+        )
+
+        self.get_success(
+            self.presence_handler.user_syncing(user_id, True, PresenceState.ONLINE)
+        )
+
+        state = self.get_success(
+            self.presence_handler.get_state(UserID.from_string(user_id))
+        )
+        # we should now be online
+        self.assertEqual(state.state, PresenceState.ONLINE)
+
+    def test_set_presence_from_syncing_keeps_status(self):
+        """Test that presence set by syncing retains status message"""
+        user_id = "@test:server"
+        status_msg = "I'm here!"
+
+        self._set_presencestate_with_status_msg(
+            user_id, PresenceState.UNAVAILABLE, status_msg
+        )
+
+        self.get_success(
+            self.presence_handler.user_syncing(user_id, True, PresenceState.ONLINE)
+        )
+
+        state = self.get_success(
+            self.presence_handler.get_state(UserID.from_string(user_id))
+        )
+        # our status message should be the same as it was before
+        self.assertEqual(state.status_msg, status_msg)
+
+    def test_set_presence_from_syncing_keeps_busy(self):
+        """Test that presence set by syncing doesn't affect busy status"""
+        # Enable support for busy presence, which isn't on by default.
+        self.presence_handler._busy_presence_enabled = True
+
+        user_id = "@test:server"
+        status_msg = "I'm busy!"
+
+        self._set_presencestate_with_status_msg(user_id, PresenceState.BUSY, status_msg)
+
+        self.get_success(
+            self.presence_handler.user_syncing(user_id, True, PresenceState.ONLINE)
+        )
+
+        state = self.get_success(
+            self.presence_handler.get_state(UserID.from_string(user_id))
+        )
+        # we should still be busy
+        self.assertEqual(state.state, PresenceState.BUSY)
+
     def _set_presencestate_with_status_msg(
         self, user_id: str, state: str, status_msg: Optional[str]
     ):

+ 26 - 1
tests/rest/client/test_login.py

@@ -11,7 +11,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
+import json
 import time
 import urllib.parse
 from typing import Any, Dict, List, Optional, Union
@@ -384,6 +384,31 @@ class LoginRestServletTestCase(unittest.HomeserverTestCase):
         channel = self.make_request(b"POST", "/logout/all", access_token=access_token)
         self.assertEqual(channel.result["code"], b"200", channel.result)
 
+    def test_login_with_overly_long_device_id_fails(self) -> None:
+        self.register_user("mickey", "cheese")
+
+        # create a device_id longer than 512 characters
+        device_id = "yolo" * 512
+
+        body = {
+            "type": "m.login.password",
+            "user": "mickey",
+            "password": "cheese",
+            "device_id": device_id,
+        }
+
+        # make a login request with the bad device_id
+        channel = self.make_request(
+            "POST",
+            "/_matrix/client/v3/login",
+            json.dumps(body).encode("utf8"),
+            custom_headers=None,
+        )
+
+        # test that the login fails with the correct error code
+        self.assertEqual(channel.code, 400)
+        self.assertEqual(channel.json_body["errcode"], "M_INVALID_PARAM")
+
 
 @skip_unless(has_saml2 and HAS_OIDC, "Requires SAML2 and OIDC")
 class MultiSSOTestCase(unittest.HomeserverTestCase):

+ 123 - 2
tests/rest/client/test_room_batch.py

@@ -7,9 +7,9 @@ from twisted.test.proto_helpers import MemoryReactor
 from synapse.api.constants import EventContentFields, EventTypes
 from synapse.appservice import ApplicationService
 from synapse.rest import admin
-from synapse.rest.client import login, register, room, room_batch
+from synapse.rest.client import login, register, room, room_batch, sync
 from synapse.server import HomeServer
-from synapse.types import JsonDict
+from synapse.types import JsonDict, RoomStreamToken
 from synapse.util import Clock
 
 from tests import unittest
@@ -63,6 +63,7 @@ class RoomBatchTestCase(unittest.HomeserverTestCase):
         room.register_servlets,
         register.register_servlets,
         login.register_servlets,
+        sync.register_servlets,
     ]
 
     def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
@@ -178,3 +179,123 @@ class RoomBatchTestCase(unittest.HomeserverTestCase):
             "Expected a single state_group to be returned by saw state_groups=%s"
             % (state_group_map.keys(),),
         )
+
+    @unittest.override_config({"experimental_features": {"msc2716_enabled": True}})
+    def test_sync_while_batch_importing(self) -> None:
+        """
+        Make sure that /sync correctly returns full room state when a user joins
+        during ongoing batch backfilling.
+        See: https://github.com/matrix-org/synapse/issues/12281
+        """
+        # Create user who will be invited & join room
+        user_id = self.register_user("beep", "test")
+        user_tok = self.login("beep", "test")
+
+        time_before_room = int(self.clock.time_msec())
+
+        # Create a room with some events
+        room_id, _, _, _ = self._create_test_room()
+        # Invite the user
+        self.helper.invite(
+            room_id, src=self.appservice.sender, tok=self.appservice.token, targ=user_id
+        )
+
+        # Create another room, send a bunch of events to advance the stream token
+        other_room_id = self.helper.create_room_as(
+            self.appservice.sender, tok=self.appservice.token
+        )
+        for _ in range(5):
+            self.helper.send_event(
+                room_id=other_room_id,
+                type=EventTypes.Message,
+                content={"msgtype": "m.text", "body": "C"},
+                tok=self.appservice.token,
+            )
+
+        # Join the room as the normal user
+        self.helper.join(room_id, user_id, tok=user_tok)
+
+        # Create an event to hang the historical batch from. In order to see
+        # the failure case originally reported in #12281, the historical batch
+        # must be hung from the most recent event in the room, so that the base
+        # insertion event ends up with the highest `topological_ordering`
+        # (`depth`) in the room but a negative `stream_ordering`, because it's
+        # a `historical` event. Previously, when assembling the `state` for the
+        # `/sync` response, the buggy logic would sort by
+        # `topological_ordering` descending and pick up the base insertion
+        # event because its negative `stream_ordering` falls below the given
+        # pagination token. Now we properly sort by `stream_ordering`
+        # descending, which puts `historical` events with their negative
+        # `stream_ordering` at the bottom, so they aren't selected, as expected.
+        response = self.helper.send_event(
+            room_id=room_id,
+            type=EventTypes.Message,
+            content={
+                "msgtype": "m.text",
+                "body": "C",
+            },
+            tok=self.appservice.token,
+        )
+        event_to_hang_id = response["event_id"]
+
+        channel = self.make_request(
+            "POST",
+            "/_matrix/client/unstable/org.matrix.msc2716/rooms/%s/batch_send?prev_event_id=%s"
+            % (room_id, event_to_hang_id),
+            content={
+                "events": _create_message_events_for_batch_send_request(
+                    self.virtual_user_id, time_before_room, 3
+                ),
+                "state_events_at_start": _create_join_state_events_for_batch_send_request(
+                    [self.virtual_user_id], time_before_room
+                ),
+            },
+            access_token=self.appservice.token,
+        )
+        self.assertEqual(channel.code, 200, channel.result)
+
+        # Now we need to find the invite + join events stream tokens so we can sync between
+        main_store = self.hs.get_datastores().main
+        events, next_key = self.get_success(
+            main_store.get_recent_events_for_room(
+                room_id,
+                50,
+                end_token=main_store.get_room_max_token(),
+            ),
+        )
+        invite_event_position = None
+        for event in events:
+            if (
+                event.type == "m.room.member"
+                and event.content["membership"] == "invite"
+            ):
+                invite_event_position = self.get_success(
+                    main_store.get_topological_token_for_event(event.event_id)
+                )
+                break
+
+        assert invite_event_position is not None, "No invite event found"
+
+        # Remove the topological order from the token by re-creating w/stream only
+        invite_event_position = RoomStreamToken(None, invite_event_position.stream)
+
+        # Sync everything after this token
+        since_token = self.get_success(invite_event_position.to_string(main_store))
+        sync_response = self.make_request(
+            "GET",
+            f"/sync?since={since_token}",
+            access_token=user_tok,
+        )
+
+        # Assert that, for this room, the user was considered to have joined and thus
+        # receives the full state history
+        state_event_types = [
+            event["type"]
+            for event in sync_response.json_body["rooms"]["join"][room_id]["state"][
+                "events"
+            ]
+        ]
+
+        assert (
+            "m.room.create" in state_event_types
+        ), "Missing full room state in sync response"