Merge remote-tracking branch 'origin/develop' into dmr/test-mypy-zope-92

David Robertson · 1 year ago · commit 0c09fccab1
47 changed files with 1223 additions and 355 deletions
  1. .ci/scripts/prepare_old_deps.sh (+4 -3)
  2. .github/workflows/docs.yaml (+58 -19)
  3. .gitignore (+2 -1)
  4. Cargo.lock (+11 -11)
  5. changelog.d/15128.misc (+1 -0)
  6. changelog.d/15265.misc (+1 -0)
  7. changelog.d/15280.misc (+1 -0)
  8. changelog.d/15311.misc (+1 -0)
  9. changelog.d/15316.misc (+1 -0)
  10. changelog.d/15319.misc (+1 -0)
  11. changelog.d/15324.misc (+1 -0)
  12. changelog.d/15325.misc (+1 -0)
  13. changelog.d/15326.misc (+1 -0)
  14. changelog.d/15327.misc (+1 -0)
  15. changelog.d/15329.misc (+1 -0)
  16. changelog.d/15330.misc (+1 -0)
  17. dev-docs/Makefile (+20 -0)
  18. dev-docs/conf.py (+50 -0)
  19. dev-docs/index.rst (+22 -0)
  20. dev-docs/modules/federation_sender.md (+5 -0)
  21. poetry.lock (+527 -183)
  22. pyproject.toml (+12 -3)
  23. scripts-dev/lint.sh (+1 -0)
  24. synapse/_scripts/synapse_port_db.py (+5 -1)
  25. synapse/events/validator.py (+32 -10)
  26. synapse/federation/sender/__init__.py (+113 -0)
  27. synapse/handlers/auth.py (+6 -2)
  28. synapse/handlers/device.py (+2 -0)
  29. synapse/handlers/register.py (+2 -2)
  30. synapse/http/servlet.py (+16 -6)
  31. synapse/push/__init__.py (+6 -1)
  32. synapse/push/pusherpool.py (+42 -16)
  33. synapse/rest/admin/users.py (+0 -1)
  34. synapse/rest/client/pusher.py (+0 -1)
  35. synapse/storage/databases/main/events.py (+17 -6)
  36. synapse/storage/databases/main/purge_events.py (+4 -2)
  37. synapse/storage/databases/main/pusher.py (+31 -9)
  38. synapse/storage/databases/main/user_directory.py (+11 -4)
  39. synapse/storage/schema/__init__.py (+10 -4)
  40. synapse/storage/schema/main/delta/74/01membership_tables_event_stream_ordering.sql (+20 -0)
  41. synapse/storage/schema/main/delta/74/02_set_device_id_for_pushers_bg_update.sql (+19 -0)
  42. synapse/storage/schema/main/delta/74/02membership_tables_event_stream_ordering_triggers.py (+79 -0)
  43. tests/push/test_bulk_push_rule_evaluator.py (+56 -38)
  44. tests/push/test_email.py (+3 -3)
  45. tests/push/test_http.py (+21 -25)
  46. tests/replication/test_pusher_shard.py (+2 -2)
  47. tests/rest/admin/test_user.py (+2 -2)

+ 4 - 3
.ci/scripts/prepare_old_deps.sh

@@ -35,9 +35,9 @@ sed -i \
 # compatible (as far as the package metadata declares, anyway); pip's package resolver
 # is more lax.
 #
-# Rather than `poetry install --no-dev`, we drop all dev dependencies from the
-# toml file. This means we don't have to ensure compatibility between old deps and
-# dev tools.
+# Rather than `poetry install --no-dev`, we drop all dev dependencies and the dev-docs
+# group from the toml file. This means we don't have to ensure compatibility between
+# old deps and dev tools.
 
 pip install toml wheel
 
@@ -47,6 +47,7 @@ with open('pyproject.toml', 'r') as f:
     data = toml.loads(f.read())
 
 del data['tool']['poetry']['dev-dependencies']
+del data['tool']['poetry']['group']['dev-docs']
 
 with open('pyproject.toml', 'w') as f:
     toml.dump(data, f)
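
For reference, the Python step embedded in this script now amounts to the following; a minimal standalone sketch (assuming a checkout containing `pyproject.toml` and the `toml` package, which the script installs above):

```python
import toml

# Load the project file, drop the legacy dev-dependencies table and the new
# optional dev-docs group, then write the result back. Note that `del` raises
# KeyError if either key is missing, so this assumes both are present, as
# they are in Synapse's pyproject.toml after this change.
with open("pyproject.toml", "r") as f:
    data = toml.loads(f.read())

del data["tool"]["poetry"]["dev-dependencies"]
del data["tool"]["poetry"]["group"]["dev-docs"]

with open("pyproject.toml", "w") as f:
    toml.dump(data, f)
```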

+ 58 - 19
.github/workflows/docs.yaml

@@ -13,25 +13,10 @@ on:
   workflow_dispatch:
 
 jobs:
-  pages:
-    name: GitHub Pages
+  pre:
+    name: Calculate variables for GitHub Pages deployment
     runs-on: ubuntu-latest
     steps:
-      - uses: actions/checkout@v3
-
-      - name: Setup mdbook
-        uses: peaceiris/actions-mdbook@adeb05db28a0c0004681db83893d56c0388ea9ea # v1.2.0
-        with:
-          mdbook-version: '0.4.17'
-
-      - name: Build the documentation
-        # mdbook will only create an index.html if we're including docs/README.md in SUMMARY.md.
-        # However, we're using docs/README.md for other purposes and need to pick a new page
-        # as the default. Let's opt for the welcome page instead.
-        run: |
-          mdbook build
-          cp book/welcome_and_overview.html book/index.html
-
       # Figure out the target directory.
       #
       # The target directory depends on the name of the branch
@@ -55,11 +40,65 @@ jobs:
 
           # finally, set the 'branch-version' var.
           echo "branch-version=$branch" >> "$GITHUB_OUTPUT"
-          
+    outputs:
+      branch-version: ${{ steps.vars.outputs.branch-version }}
+
+################################################################################
+  pages-docs:
+    name: GitHub Pages
+    runs-on: ubuntu-latest
+    needs:
+      - pre
+    steps:
+      - uses: actions/checkout@v3
+
+      - name: Setup mdbook
+        uses: peaceiris/actions-mdbook@adeb05db28a0c0004681db83893d56c0388ea9ea # v1.2.0
+        with:
+          mdbook-version: '0.4.17'
+
+      - name: Build the documentation
+        # mdbook will only create an index.html if we're including docs/README.md in SUMMARY.md.
+        # However, we're using docs/README.md for other purposes and need to pick a new page
+        # as the default. Let's opt for the welcome page instead.
+        run: |
+          mdbook build
+          cp book/welcome_and_overview.html book/index.html
+
       # Deploy to the target directory.
       - name: Deploy to gh pages
         uses: peaceiris/actions-gh-pages@bd8c6b06eba6b3d25d72b7a1767993c0aeee42e7 # v3.9.2
         with:
           github_token: ${{ secrets.GITHUB_TOKEN }}
           publish_dir: ./book
-          destination_dir: ./${{ steps.vars.outputs.branch-version }}
+          destination_dir: ./${{ needs.pre.outputs.branch-version }}
+
+################################################################################
+  pages-devdocs:
+    name: GitHub Pages (developer docs)
+    runs-on: ubuntu-latest
+    needs:
+      - pre
+    steps:
+      - uses: actions/checkout@v3
+
+      - name: "Set up Sphinx"
+        uses: matrix-org/setup-python-poetry@v1
+        with:
+          python-version: "3.x"
+          poetry-version: "1.3.2"
+          groups: "dev-docs"
+          extras: ""
+
+      - name: Build the documentation
+        run: |
+          cd dev-docs
+          poetry run make html
+
+      # Deploy to the target directory.
+      - name: Deploy to gh pages
+        uses: peaceiris/actions-gh-pages@bd8c6b06eba6b3d25d72b7a1767993c0aeee42e7 # v3.9.2
+        with:
+          github_token: ${{ secrets.GITHUB_TOKEN }}
+          publish_dir: ./dev-docs/_build/html
+          destination_dir: ./dev-docs/${{ needs.pre.outputs.branch-version }}

+ 2 - 1
.gitignore

@@ -53,6 +53,7 @@ __pycache__/
 /coverage.*
 /dist/
 /docs/build/
+/dev-docs/_build/
 /htmlcov
 /pip-wheel-metadata/
 
@@ -61,7 +62,7 @@ book/
 
 # complement
 /complement-*
-/master.tar.gz
+/main.tar.gz
 
 # rust
 /target/

+ 11 - 11
Cargo.lock

@@ -294,9 +294,9 @@ dependencies = [
 
 [[package]]
 name = "regex"
-version = "1.7.1"
+version = "1.7.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "48aaa5748ba571fb95cd2c85c09f629215d3a6ece942baa100950af03a34f733"
+checksum = "8b1f693b24f6ac912f4893ef08244d70b6067480d2f1a46e950c9691e6749d1d"
 dependencies = [
  "aho-corasick",
  "memchr",
@@ -305,9 +305,9 @@ dependencies = [
 
 [[package]]
 name = "regex-syntax"
-version = "0.6.27"
+version = "0.6.29"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a3f87b73ce11b1619a3c6332f45341e0047173771e8b8b73f87bfeefb7b56244"
+checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1"
 
 [[package]]
 name = "ryu"
@@ -323,22 +323,22 @@ checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
 
 [[package]]
 name = "serde"
-version = "1.0.157"
+version = "1.0.158"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "707de5fcf5df2b5788fca98dd7eab490bc2fd9b7ef1404defc462833b83f25ca"
+checksum = "771d4d9c4163ee138805e12c710dd365e4f44be8be0503cb1bb9eb989425d9c9"
 dependencies = [
  "serde_derive",
 ]
 
 [[package]]
 name = "serde_derive"
-version = "1.0.157"
+version = "1.0.158"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "78997f4555c22a7971214540c4a661291970619afd56de19f77e0de86296e1e5"
+checksum = "e801c1712f48475582b7696ac71e0ca34ebb30e09338425384269d9717c62cad"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.2",
+ "syn 2.0.10",
 ]
 
 [[package]]
@@ -377,9 +377,9 @@ dependencies = [
 
 [[package]]
 name = "syn"
-version = "2.0.2"
+version = "2.0.10"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "59d3276aee1fa0c33612917969b5172b5be2db051232a6e4826f1a1a9191b045"
+checksum = "5aad1363ed6d37b84299588d62d3a7d95b5a5c2d9aad5c85609fda12afaa1f40"
 dependencies = [
  "proc-macro2",
  "quote",

+ 1 - 0
changelog.d/15128.misc

@@ -0,0 +1 @@
+Add denormalised event stream ordering column to membership state tables for future use. Contributed by Nick @ Beeper (@fizzadar).

+ 1 - 0
changelog.d/15265.misc

@@ -0,0 +1 @@
+Add developer documentation for the Federation Sender and add a documentation mechanism using Sphinx.

+ 1 - 0
changelog.d/15280.misc

@@ -0,0 +1 @@
+Make the pushers rely on the `device_id` instead of the `access_token_id` for various operations.

+ 1 - 0
changelog.d/15311.misc

@@ -0,0 +1 @@
+Reject events with an invalid "mentions" property per [MSC3952](https://github.com/matrix-org/matrix-spec-proposals/pull/3952).

+ 1 - 0
changelog.d/15316.misc

@@ -0,0 +1 @@
+As an optimisation, use `TRUNCATE` on Postgres when clearing the user directory tables.

+ 1 - 0
changelog.d/15319.misc

@@ -0,0 +1 @@
+Fix `.gitignore` rule for the Complement source tarball downloaded automatically by `complement.sh`.

+ 1 - 0
changelog.d/15324.misc

@@ -0,0 +1 @@
+Bump serde from 1.0.157 to 1.0.158.

+ 1 - 0
changelog.d/15325.misc

@@ -0,0 +1 @@
+Bump regex from 1.7.1 to 1.7.3.

+ 1 - 0
changelog.d/15326.misc

@@ -0,0 +1 @@
+Bump types-pyopenssl from 23.0.0.4 to 23.1.0.0.

+ 1 - 0
changelog.d/15327.misc

@@ -0,0 +1 @@
+Bump furo from 2022.12.7 to 2023.3.23.

+ 1 - 0
changelog.d/15329.misc

@@ -0,0 +1 @@
+Bump cryptography from 40.0.0 to 40.0.1.

+ 1 - 0
changelog.d/15330.misc

@@ -0,0 +1 @@
+Bump mypy-zope from 0.9.0 to 0.9.1.

+ 20 - 0
dev-docs/Makefile

@@ -0,0 +1,20 @@
+# Minimal makefile for Sphinx documentation
+#
+
+# You can set these variables from the command line, and also
+# from the environment for the first two.
+SPHINXOPTS    ?=
+SPHINXBUILD   ?= sphinx-build
+SOURCEDIR     = .
+BUILDDIR      = _build
+
+# Put it first so that "make" without argument is like "make help".
+help:
+	@$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
+
+.PHONY: help Makefile
+
+# Catch-all target: route all unknown targets to Sphinx using the new
+# "make mode" option.  $(O) is meant as a shortcut for $(SPHINXOPTS).
+%: Makefile
+	@$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)

+ 50 - 0
dev-docs/conf.py

@@ -0,0 +1,50 @@
+# Configuration file for the Sphinx documentation builder.
+#
+# For the full list of built-in configuration values, see the documentation:
+# https://www.sphinx-doc.org/en/master/usage/configuration.html
+
+# -- Project information -----------------------------------------------------
+# https://www.sphinx-doc.org/en/master/usage/configuration.html#project-information
+
+project = "Synapse development"
+copyright = "2023, The Matrix.org Foundation C.I.C."
+author = "The Synapse Maintainers and Community"
+
+# -- General configuration ---------------------------------------------------
+# https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration
+
+extensions = [
+    "autodoc2",
+    "myst_parser",
+]
+
+templates_path = ["_templates"]
+exclude_patterns = ["_build", "Thumbs.db", ".DS_Store"]
+
+
+# -- Options for Autodoc2 ----------------------------------------------------
+
+autodoc2_docstring_parser_regexes = [
+    # this will render all docstrings as 'MyST' Markdown
+    (r".*", "myst"),
+]
+
+autodoc2_packages = [
+    {
+        "path": "../synapse",
+        # Don't render documentation for everything as a matter of course
+        "auto_mode": False,
+    },
+]
+
+
+# -- Options for MyST (Markdown) ---------------------------------------------
+
+# myst_heading_anchors = 2
+
+
+# -- Options for HTML output -------------------------------------------------
+# https://www.sphinx-doc.org/en/master/usage/configuration.html#options-for-html-output
+
+html_theme = "furo"
+html_static_path = ["_static"]

+ 22 - 0
dev-docs/index.rst

@@ -0,0 +1,22 @@
+.. Synapse Developer Documentation documentation master file, created by
+   sphinx-quickstart on Mon Mar 13 08:59:51 2023.
+   You can adapt this file completely to your liking, but it should at least
+   contain the root `toctree` directive.
+
+Welcome to the Synapse Developer Documentation!
+===========================================================
+
+.. toctree::
+   :maxdepth: 2
+   :caption: Contents:
+
+   modules/federation_sender
+
+
+
+Indices and tables
+==================
+
+* :ref:`genindex`
+* :ref:`modindex`
+* :ref:`search`

+ 5 - 0
dev-docs/modules/federation_sender.md

@@ -0,0 +1,5 @@
+Federation Sender
+=================
+
+```{autodoc2-docstring} synapse.federation.sender
+```

File diff suppressed because it is too large
+ 527 - 183
poetry.lock


+ 12 - 3
pyproject.toml

@@ -315,7 +315,8 @@ ruff = "0.0.252"
 
 # Typechecking
 mypy = "*"
-mypy-zope = "*"
+mypy-zope = {git = "https://github.com/Shoobx/mypy-zope.git", rev = "fix-unreachable"}
+
 types-bleach = ">=4.1.0"
 types-commonmark = ">=0.9.2"
 types-jsonschema = ">=3.2.0"
@@ -350,8 +351,16 @@ towncrier = ">=18.6.0rc1"
 # Used for checking the Poetry lockfile
 tomli = ">=1.2.3"
 
-[tool.poetry.group.dev.dependencies]
-mypy-zope = {git = "https://github.com/Shoobx/mypy-zope.git", rev = "fix-unreachable"}
+# Dependencies for building the development documentation
+[tool.poetry.group.dev-docs]
+optional = true
+
+[tool.poetry.group.dev-docs.dependencies]
+sphinx = {version = "^6.1", python = "^3.8"}
+sphinx-autodoc2 = {version = "^0.4.2", python = "^3.8"}
+myst-parser = {version = "^1.0.0", python = "^3.8"}
+furo = ">=2022.12.7,<2024.0.0"
+
 
 [build-system]
 # The upper bounds here are defensive, intended to prevent situations like
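
Since the `dev-docs` group is marked `optional = true`, a plain `poetry install` skips it; building the developer docs locally should be a matter of `poetry install --with dev-docs` followed by `cd dev-docs && poetry run make html`, mirroring the CI job above.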

+ 1 - 0
scripts-dev/lint.sh

@@ -91,6 +91,7 @@ else
           "synapse" "docker" "tests"
           "scripts-dev"
           "contrib" "synmark" "stubs" ".ci"
+          "dev-docs"
       )
   fi
 fi

+ 5 - 1
synapse/_scripts/synapse_port_db.py

@@ -68,7 +68,10 @@ from synapse.storage.databases.main.media_repository import (
     MediaRepositoryBackgroundUpdateStore,
 )
 from synapse.storage.databases.main.presence import PresenceBackgroundUpdateStore
-from synapse.storage.databases.main.pusher import PusherWorkerStore
+from synapse.storage.databases.main.pusher import (
+    PusherBackgroundUpdatesStore,
+    PusherWorkerStore,
+)
 from synapse.storage.databases.main.receipts import ReceiptsBackgroundUpdateStore
 from synapse.storage.databases.main.registration import (
     RegistrationBackgroundUpdateStore,
@@ -226,6 +229,7 @@ class Store(
     AccountDataWorkerStore,
     PushRuleStore,
     PusherWorkerStore,
+    PusherBackgroundUpdatesStore,
     PresenceBackgroundUpdateStore,
     ReceiptsBackgroundUpdateStore,
     RelationsWorkerStore,

+ 32 - 10
synapse/events/validator.py

@@ -12,11 +12,17 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import collections.abc
-from typing import Iterable, Type, Union, cast
+from typing import Iterable, List, Type, Union, cast
 
 import jsonschema
+from pydantic import Field, StrictBool, StrictStr
 
-from synapse.api.constants import MAX_ALIAS_LENGTH, EventTypes, Membership
+from synapse.api.constants import (
+    MAX_ALIAS_LENGTH,
+    EventContentFields,
+    EventTypes,
+    Membership,
+)
 from synapse.api.errors import Codes, SynapseError
 from synapse.api.room_versions import EventFormatVersions
 from synapse.config.homeserver import HomeServerConfig
@@ -28,6 +34,8 @@ from synapse.events.utils import (
     validate_canonicaljson,
 )
 from synapse.federation.federation_server import server_matches_acl_event
+from synapse.http.servlet import validate_json_object
+from synapse.rest.models import RequestBodyModel
 from synapse.types import EventID, JsonDict, RoomID, UserID
 
 
@@ -88,27 +96,27 @@ class EventValidator:
                             Codes.INVALID_PARAM,
                         )
 
-        if event.type == EventTypes.Retention:
+        elif event.type == EventTypes.Retention:
             self._validate_retention(event)
 
-        if event.type == EventTypes.ServerACL:
+        elif event.type == EventTypes.ServerACL:
             if not server_matches_acl_event(config.server.server_name, event):
                 raise SynapseError(
                     400, "Can't create an ACL event that denies the local server"
                 )
 
-        if event.type == EventTypes.PowerLevels:
+        elif event.type == EventTypes.PowerLevels:
             try:
                 jsonschema.validate(
                     instance=event.content,
                     schema=POWER_LEVELS_SCHEMA,
-                    cls=plValidator,
+                    cls=POWER_LEVELS_VALIDATOR,
                 )
             except jsonschema.ValidationError as e:
                 if e.path:
                     # example: "users_default": '0' is not of type 'integer'
                     # cast safety: path entries can be integers, if we fail to validate
-                    # items in an array. However the POWER_LEVELS_SCHEMA doesn't expect
+                    # items in an array. However, the POWER_LEVELS_SCHEMA doesn't expect
                     # to see any arrays.
                     message = (
                         '"' + cast(str, e.path[-1]) + '": ' + e.message  # noqa: B306
@@ -125,6 +133,15 @@ class EventValidator:
                     errcode=Codes.BAD_JSON,
                 )
 
+        # If the event contains a mentions key, validate it.
+        if (
+            EventContentFields.MSC3952_MENTIONS in event.content
+            and config.experimental.msc3952_intentional_mentions
+        ):
+            validate_json_object(
+                event.content[EventContentFields.MSC3952_MENTIONS], Mentions
+            )
+
     def _validate_retention(self, event: EventBase) -> None:
         """Checks that an event that defines the retention policy for a room respects the
         format enforced by the spec.
@@ -253,10 +270,15 @@ POWER_LEVELS_SCHEMA = {
 }
 
 
+class Mentions(RequestBodyModel):
+    user_ids: List[StrictStr] = Field(default_factory=list)
+    room: StrictBool = False
+
+
 # This could return something newer than Draft 7, but that's the current "latest"
 # validator.
-def _create_power_level_validator() -> Type[jsonschema.Draft7Validator]:
-    validator = jsonschema.validators.validator_for(POWER_LEVELS_SCHEMA)
+def _create_validator(schema: JsonDict) -> Type[jsonschema.Draft7Validator]:
+    validator = jsonschema.validators.validator_for(schema)
 
     # by default jsonschema does not consider a immutabledict to be an object so
     # we need to use a custom type checker
@@ -268,4 +290,4 @@ def _create_power_level_validator() -> Type[jsonschema.Draft7Validator]:
     return jsonschema.validators.extend(validator, type_checker=type_checker)
 
 
-plValidator = _create_power_level_validator()
+POWER_LEVELS_VALIDATOR = _create_validator(POWER_LEVELS_SCHEMA)
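
To illustrate the new validation path, here is a minimal sketch of how the `Mentions` model rejects malformed mentions content. It substitutes a plain pydantic `BaseModel` for Synapse's `RequestBodyModel` base and elides the error mapping done by `validate_json_object`:

```python
from typing import List

from pydantic import BaseModel, Field, StrictBool, StrictStr, ValidationError

class Mentions(BaseModel):
    # Strict types: pydantic must not coerce e.g. ints into these fields.
    user_ids: List[StrictStr] = Field(default_factory=list)
    room: StrictBool = False

print(Mentions.parse_obj({"user_ids": ["@alice:example.org"]}))  # accepted
for bad in ({"user_ids": "not-a-list"}, {"room": 1}):
    try:
        Mentions.parse_obj(bad)
    except ValidationError as e:
        # In Synapse this is turned into a SynapseError with a 400 status.
        print(e.errors()[0]["msg"])
```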

+ 113 - 0
synapse/federation/sender/__init__.py

@@ -11,6 +11,119 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+"""
+The Federation Sender is responsible for sending Persistent Data Units (PDUs)
+and Ephemeral Data Units (EDUs) to other homeservers using
+the `/send` Federation API.
+
+
+## How do PDUs get sent?
+
+The Federation Sender is made aware of new PDUs due to `FederationSender.notify_new_events`.
+When the sender is notified about a newly-persisted PDU that originates from this homeserver
+and is not an out-of-band event, we pass the PDU to the `_PerDestinationQueue` for each
+remote homeserver that is in the room at that point in the DAG.
+
+
+### Per-Destination Queues
+
+There is one `PerDestinationQueue` per 'destination' homeserver.
+The `PerDestinationQueue` maintains the following information about the destination:
+
+- whether the destination is currently in [catch-up mode (see below)](#catch-up-mode);
+- a queue of PDUs to be sent to the destination; and
+- a queue of EDUs to be sent to the destination (not considered in this section).
+
+Upon a new PDU being enqueued, `attempt_new_transaction` is called to start a new
+transaction if there is not already one in progress.
+
+
+### Transactions and the Transaction Transmission Loop
+
+Each federation HTTP request to the `/send` endpoint is referred to as a 'transaction'.
+The body of the HTTP request contains a list of PDUs and EDUs to send to the destination.
+
+The *Transaction Transmission Loop* (`_transaction_transmission_loop`) is responsible
+for emptying the queued PDUs (and EDUs) from a `PerDestinationQueue` by sending
+them to the destination.
+
+There can only be one transaction in flight for a given destination at any time.
+(Other than preventing us from overloading the destination, this also makes it easier to
+reason about because we process events sequentially for each destination.
+This is useful for *Catch-Up Mode*, described later.)
+
+The loop continues so long as there is anything to send. At each iteration of the loop, we:
+
+- dequeue up to 50 PDUs (and up to 100 EDUs).
+- make the `/send` request to the destination homeserver with the dequeued PDUs and EDUs.
+- if successful, make note of the fact that we succeeded in transmitting PDUs up to
+  the given `stream_ordering` of the latest PDU, by advancing the destination's
+  `last_successful_stream_ordering` (see *Catch-Up Mode* below).
+- if unsuccessful, back off from the remote homeserver for some time.
+  If we have been unsuccessful for too long (when the backoff interval grows to exceed 1 hour),
+  the in-memory queues are emptied and we enter [*Catch-Up Mode*, described below](#catch-up-mode).
+
+
+### Catch-Up Mode
+
+When the `PerDestinationQueue` has the catch-up flag set, the *Catch-Up Transmission Loop*
+(`_catch_up_transmission_loop`) is used in lieu of the regular `_transaction_transmission_loop`.
+(Only once the catch-up mode has been exited can the regular transaction transmission behaviour
+be resumed.)
+
+*Catch-Up Mode*, entered upon Synapse startup or once a homeserver has fallen behind due to
+connection problems, is responsible for sending PDUs that have been missed by the destination
+homeserver. (PDUs can be missed because the `PerDestinationQueue` is volatile — i.e. resets
+on startup — and it does not hold PDUs forever if `/send` requests to the destination fail.)
+
+The catch-up mechanism makes use of the `last_successful_stream_ordering` column in the
+`destinations` table (which gives the `stream_ordering` of the most recent successfully
+sent PDU) and the `stream_ordering` column in the `destination_rooms` table (which gives,
+for each room, the `stream_ordering` of the most recent PDU that needs to be sent to this
+destination).
+
+Each iteration of the loop pulls out 50 `destination_rooms` entries with the oldest
+`stream_ordering`s that are greater than the `last_successful_stream_ordering`.
+In other words, from the set of latest PDUs in each room to be sent to the destination,
+the 50 oldest such PDUs are pulled out.
+
+These PDUs could, in principle, now be directly sent to the destination. However, as an
+optimisation intended to prevent overloading destination homeservers, we instead attempt
+to send the latest forward extremities so long as the destination homeserver is still
+eligible to receive those.
+This reduces load on the destination **in aggregate** because all Synapse homeservers
+will behave according to this principle and therefore avoid sending lots of different PDUs
+at different points in the DAG to a recovering homeserver.
+*This optimisation is not currently valid in rooms which are partial-state on this homeserver,
+since we are unable to determine whether the destination homeserver is eligible to receive
+the latest forward extremities unless this homeserver sent those PDUs — in this case, we
+just send the latest PDUs originating from this server and skip this optimisation.*
+
+Whilst PDUs are sent through this mechanism, the position of `last_successful_stream_ordering`
+is advanced as normal.
+Once there are no longer any rooms containing outstanding PDUs to be sent to the destination
+*that are not already in the `PerDestinationQueue` because they arrived since Catch-Up Mode
+was enabled*, Catch-Up Mode is exited and we return to `_transaction_transmission_loop`.
+
+
+#### A note on failures and back-offs
+
+If a remote server is unreachable over federation, we back off from that server,
+with an exponentially-increasing retry interval.
+Whilst we don't automatically retry after the interval, we prevent making new attempts
+until such time as the back-off has cleared.
+Once the back-off is cleared and a new PDU or EDU arrives for transmission, the transmission
+loop resumes and empties the queue by making federation requests.
+
+If the backoff grows too large (> 1 hour), the in-memory queue is emptied (to prevent
+unbounded growth) and Catch-Up Mode is entered.
+
+It is worth noting that the back-off for a remote server is cleared once an inbound
+request from that remote server is received (see `notify_remote_server_up`).
+At this point, the transaction transmission loop is also started up, to proactively
+send missed PDUs and EDUs to the destination (i.e. you don't need to wait for a new PDU
+or EDU, destined for that destination, to be created in order to send out missed PDUs and
+EDUs).
+"""
 
 import abc
 import logging
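
The queueing discipline described in this docstring ("one transaction in flight per destination; drain up to 50 PDUs per transaction") can be sketched in a few lines. This is a toy model for illustration only, with names mirroring the docstring; it has no EDU handling, back-off, or catch-up:

```python
import asyncio
from typing import List

class PerDestinationQueue:
    MAX_PDUS_PER_TXN = 50  # the docstring's "up to 50 PDUs" per transaction

    def __init__(self, destination: str) -> None:
        self.destination = destination
        self._pending_pdus: List[str] = []
        self._transmission_loop_running = False

    def send_pdu(self, pdu: str) -> None:
        self._pending_pdus.append(pdu)
        self.attempt_new_transaction()

    def attempt_new_transaction(self) -> None:
        # Only ever one transaction in flight for a given destination.
        if self._transmission_loop_running:
            return
        self._transmission_loop_running = True
        asyncio.ensure_future(self._transaction_transmission_loop())

    async def _transaction_transmission_loop(self) -> None:
        try:
            # Keep draining the queue until there is nothing left to send.
            while self._pending_pdus:
                batch = self._pending_pdus[: self.MAX_PDUS_PER_TXN]
                self._pending_pdus = self._pending_pdus[self.MAX_PDUS_PER_TXN :]
                await asyncio.sleep(0)  # stand-in for the `/send` request
                print(f"sent {len(batch)} PDU(s) to {self.destination}")
        finally:
            self._transmission_loop_running = False

async def main() -> None:
    queue = PerDestinationQueue("remote.example.org")
    for i in range(3):
        queue.send_pdu(f"$event{i}")  # all three coalesce into one transaction
    await asyncio.sleep(0.1)

asyncio.run(main())
```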

+ 6 - 2
synapse/handlers/auth.py

@@ -1504,8 +1504,10 @@ class AuthHandler:
         )
 
         # delete pushers associated with this access token
+        # XXX(quenting): This is only needed until the 'set_device_id_for_pushers'
+        # background update completes.
         if token.token_id is not None:
-            await self.hs.get_pusherpool().remove_pushers_by_access_token(
+            await self.hs.get_pusherpool().remove_pushers_by_access_tokens(
                 token.user_id, (token.token_id,)
             )
 
@@ -1535,7 +1537,9 @@ class AuthHandler:
             )
 
         # delete pushers associated with the access tokens
-        await self.hs.get_pusherpool().remove_pushers_by_access_token(
+        # XXX(quenting): This is only needed until the 'set_device_id_for_pushers'
+        # background update completes.
+        await self.hs.get_pusherpool().remove_pushers_by_access_tokens(
             user_id, (token_id for _, token_id, _ in tokens_and_devices)
         )
 

+ 2 - 0
synapse/handlers/device.py

@@ -503,6 +503,8 @@ class DeviceHandler(DeviceWorkerHandler):
             else:
                 raise
 
+        await self.hs.get_pusherpool().remove_pushers_by_devices(user_id, device_ids)
+
         # Delete data specific to each device. Not optimised as it is not
         # considered as part of a critical path.
         for device_id in device_ids:

+ 2 - 2
synapse/handlers/register.py

@@ -1013,11 +1013,11 @@ class RegistrationHandler:
             user_tuple = await self.store.get_user_by_access_token(token)
             # The token better still exist.
             assert user_tuple
-            token_id = user_tuple.token_id
+            device_id = user_tuple.device_id
 
             await self.pusher_pool.add_or_update_pusher(
                 user_id=user_id,
-                access_token=token_id,
+                device_id=device_id,
                 kind="email",
                 app_id="m.email",
                 app_display_name="Email Notifications",

+ 16 - 6
synapse/http/servlet.py

@@ -778,17 +778,13 @@ def parse_json_object_from_request(
 Model = TypeVar("Model", bound=BaseModel)
 
 
-def parse_and_validate_json_object_from_request(
-    request: Request, model_type: Type[Model]
-) -> Model:
-    """Parse a JSON object from the body of a twisted HTTP request, then deserialise and
-    validate using the given pydantic model.
+def validate_json_object(content: JsonDict, model_type: Type[Model]) -> Model:
+    """Validate a deserialized JSON object using the given pydantic model.
 
     Raises:
         SynapseError if the given object fails validation against the model.
     """
-    content = parse_json_object_from_request(request, allow_empty_body=False)
     try:
         instance = model_type.parse_obj(content)
     except ValidationError as e:
@@ -811,6 +807,20 @@ def parse_and_validate_json_object_from_request(
     return instance
 
 
+def parse_and_validate_json_object_from_request(
+    request: Request, model_type: Type[Model]
+) -> Model:
+    """Parse a JSON object from the body of a twisted HTTP request, then deserialise and
+    validate using the given pydantic model.
+
+    Raises:
+        SynapseError if the request body couldn't be decoded as JSON or
+            if it wasn't a JSON object.
+    """
+    content = parse_json_object_from_request(request, allow_empty_body=False)
+    return validate_json_object(content, model_type)
+
+
 def assert_params_in_dict(body: JsonDict, required: Iterable[str]) -> None:
     absent = []
     for k in required:

+ 6 - 1
synapse/push/__init__.py

@@ -103,7 +103,7 @@ class PusherConfig:
 
     id: Optional[str]
     user_name: str
-    access_token: Optional[int]
+
     profile_tag: str
     kind: str
     app_id: str
@@ -119,6 +119,11 @@ class PusherConfig:
     enabled: bool
     device_id: Optional[str]
 
+    # XXX(quenting): The access_token is not persisted anymore for new pushers, but we
+    # keep it when reading from the database, so that we don't get stale pushers
+    # while the "set_device_id_for_pushers" background update is running.
+    access_token: Optional[int]
+
     def as_dict(self) -> Dict[str, Any]:
         """Information that can be retrieved about a pusher after creation."""
         return {

+ 42 - 16
synapse/push/pusherpool.py

@@ -25,7 +25,7 @@ from synapse.metrics.background_process_metrics import (
 from synapse.push import Pusher, PusherConfig, PusherConfigException
 from synapse.push.pusher import PusherFactory
 from synapse.replication.http.push import ReplicationRemovePusherRestServlet
-from synapse.types import JsonDict, RoomStreamToken
+from synapse.types import JsonDict, RoomStreamToken, StrCollection
 from synapse.util.async_helpers import concurrently_execute
 from synapse.util.threepids import canonicalise_email
 
@@ -97,7 +97,6 @@ class PusherPool:
     async def add_or_update_pusher(
         self,
         user_id: str,
-        access_token: Optional[int],
         kind: str,
         app_id: str,
         app_display_name: str,
@@ -128,6 +127,22 @@ class PusherPool:
         # stream ordering, so it will process pushes from this point onwards.
         last_stream_ordering = self.store.get_room_max_stream_ordering()
 
+        # Before we actually persist the pusher, we check if the user already has one
+        # for this app ID and pushkey. If so, we want to keep the access token and
+        # device ID in place, since this could be one device modifying
+        # (e.g. enabling/disabling) another device's pusher.
+        # XXX(quenting): Even though we're not persisting the access_token_id for new
+        # pushers anymore, we still need to copy existing access_token_ids over when
+        # updating a pusher, in case the "set_device_id_for_pushers" background update
+        # hasn't run yet.
+        access_token_id = None
+        existing_config = await self._get_pusher_config_for_user_by_app_id_and_pushkey(
+            user_id, app_id, pushkey
+        )
+        if existing_config:
+            device_id = existing_config.device_id
+            access_token_id = existing_config.access_token
+
         # we try to create the pusher just to validate the config: it
         # will then get pulled out of the database,
         # recreated, added and started: this means we have only one
@@ -136,7 +151,6 @@ class PusherPool:
             PusherConfig(
                 id=None,
                 user_name=user_id,
-                access_token=access_token,
                 profile_tag=profile_tag,
                 kind=kind,
                 app_id=app_id,
@@ -151,23 +165,12 @@ class PusherPool:
                 failing_since=None,
                 enabled=enabled,
                 device_id=device_id,
+                access_token=access_token_id,
             )
         )
 
-        # Before we actually persist the pusher, we check if the user already has one
-        # this app ID and pushkey. If so, we want to keep the access token and device ID
-        # in place, since this could be one device modifying (e.g. enabling/disabling)
-        # another device's pusher.
-        existing_config = await self._get_pusher_config_for_user_by_app_id_and_pushkey(
-            user_id, app_id, pushkey
-        )
-        if existing_config:
-            access_token = existing_config.access_token
-            device_id = existing_config.device_id
-
         await self.store.add_pusher(
             user_id=user_id,
-            access_token=access_token,
             kind=kind,
             app_id=app_id,
             app_display_name=app_display_name,
@@ -180,6 +183,7 @@ class PusherPool:
             profile_tag=profile_tag,
             enabled=enabled,
             device_id=device_id,
+            access_token_id=access_token_id,
         )
         pusher = await self.process_pusher_change_by_id(app_id, pushkey, user_id)
 
@@ -199,7 +203,7 @@ class PusherPool:
                 )
                 await self.remove_pusher(p.app_id, p.pushkey, p.user_name)
 
-    async def remove_pushers_by_access_token(
+    async def remove_pushers_by_access_tokens(
         self, user_id: str, access_tokens: Iterable[int]
     ) -> None:
         """Remove the pushers for a given user corresponding to a set of
@@ -209,6 +213,8 @@ class PusherPool:
             user_id: user to remove pushers for
             access_tokens: access token *ids* to remove pushers for
         """
+        # XXX(quenting): This is only needed until the "set_device_id_for_pushers"
+        # background update finishes
         tokens = set(access_tokens)
         for p in await self.store.get_pushers_by_user_id(user_id):
             if p.access_token in tokens:
@@ -220,6 +226,26 @@ class PusherPool:
                 )
                 await self.remove_pusher(p.app_id, p.pushkey, p.user_name)
 
+    async def remove_pushers_by_devices(
+        self, user_id: str, devices: StrCollection
+    ) -> None:
+        """Remove the pushers for a given user corresponding to a set of devices
+
+        Args:
+            user_id: user to remove pushers for
+            devices: device IDs to remove pushers for
+        """
+        device_ids = set(devices)
+        for p in await self.store.get_pushers_by_user_id(user_id):
+            if p.device_id in device_ids:
+                logger.info(
+                    "Removing pusher for app id %s, pushkey %s, user %s",
+                    p.app_id,
+                    p.pushkey,
+                    p.user_name,
+                )
+                await self.remove_pusher(p.app_id, p.pushkey, p.user_name)
+
     def on_new_notifications(self, max_token: RoomStreamToken) -> None:
         if not self.pushers:
             # nothing to do here.

+ 0 - 1
synapse/rest/admin/users.py

@@ -425,7 +425,6 @@ class UserRestServletV2(RestServlet):
                     ):
                         await self.pusher_pool.add_or_update_pusher(
                             user_id=user_id,
-                            access_token=None,
                             kind="email",
                             app_id="m.email",
                             app_display_name="Email Notifications",

+ 0 - 1
synapse/rest/client/pusher.py

@@ -126,7 +126,6 @@ class PushersSetRestServlet(RestServlet):
         try:
             await self.pusher_pool.add_or_update_pusher(
                 user_id=user.to_string(),
-                access_token=requester.access_token_id,
                 kind=content["kind"],
                 app_id=content["app_id"],
                 app_display_name=content["app_display_name"],

+ 17 - 6
synapse/storage/databases/main/events.py

@@ -1126,11 +1126,15 @@ class PersistEventsStore:
                 # been inserted into room_memberships.
                 txn.execute_batch(
                     """INSERT INTO current_state_events
-                        (room_id, type, state_key, event_id, membership)
-                    VALUES (?, ?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?))
+                        (room_id, type, state_key, event_id, membership, event_stream_ordering)
+                    VALUES (
+                        ?, ?, ?, ?,
+                        (SELECT membership FROM room_memberships WHERE event_id = ?),
+                        (SELECT stream_ordering FROM events WHERE event_id = ?)
+                    )
                     """,
                     [
-                        (room_id, key[0], key[1], ev_id, ev_id)
+                        (room_id, key[0], key[1], ev_id, ev_id, ev_id)
                         for key, ev_id in to_insert.items()
                     ],
                 )
@@ -1157,11 +1161,15 @@ class PersistEventsStore:
             if to_insert:
                 txn.execute_batch(
                     """INSERT INTO local_current_membership
-                        (room_id, user_id, event_id, membership)
-                    VALUES (?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?))
+                        (room_id, user_id, event_id, membership, event_stream_ordering)
+                    VALUES (
+                        ?, ?, ?,
+                        (SELECT membership FROM room_memberships WHERE event_id = ?),
+                        (SELECT stream_ordering FROM events WHERE event_id = ?)
+                    )
                     """,
                     [
-                        (room_id, key[1], ev_id, ev_id)
+                        (room_id, key[1], ev_id, ev_id, ev_id)
                         for key, ev_id in to_insert.items()
                         if key[0] == EventTypes.Member and self.is_mine_id(key[1])
                     ],
@@ -1769,6 +1777,7 @@ class PersistEventsStore:
             table="room_memberships",
             keys=(
                 "event_id",
+                "event_stream_ordering",
                 "user_id",
                 "sender",
                 "room_id",
@@ -1779,6 +1788,7 @@ class PersistEventsStore:
             values=[
                 (
                     event.event_id,
+                    event.internal_metadata.stream_ordering,
                     event.state_key,
                     event.user_id,
                     event.room_id,
@@ -1811,6 +1821,7 @@ class PersistEventsStore:
                     keyvalues={"room_id": event.room_id, "user_id": event.state_key},
                     values={
                         "event_id": event.event_id,
+                        "event_stream_ordering": event.internal_metadata.stream_ordering,
                         "membership": event.membership,
                     },
                 )
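
The effect of the subselects is easiest to see in isolation. Here is a minimal, self-contained SQLite re-creation of the `current_state_events` insert above, with the schemas cut down to the relevant columns:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE events (event_id TEXT PRIMARY KEY, stream_ordering BIGINT);
CREATE TABLE room_memberships (event_id TEXT, membership TEXT);
CREATE TABLE current_state_events (room_id TEXT, type TEXT, state_key TEXT,
    event_id TEXT, membership TEXT, event_stream_ordering BIGINT);
""")
conn.execute("INSERT INTO events VALUES ('$ev', 7)")
conn.execute("INSERT INTO room_memberships VALUES ('$ev', 'join')")

# Both denormalised columns are filled from other tables at insert time.
conn.execute(
    """INSERT INTO current_state_events
        (room_id, type, state_key, event_id, membership, event_stream_ordering)
    VALUES (?, ?, ?, ?,
        (SELECT membership FROM room_memberships WHERE event_id = ?),
        (SELECT stream_ordering FROM events WHERE event_id = ?))""",
    ("!room:x", "m.room.member", "@alice:x", "$ev", "$ev", "$ev"),
)
row = conn.execute(
    "SELECT membership, event_stream_ordering FROM current_state_events"
).fetchone()
print(row)  # ('join', 7)
```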

+ 4 - 2
synapse/storage/databases/main/purge_events.py

@@ -428,14 +428,16 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
             "partial_state_events",
             "partial_state_rooms_servers",
             "partial_state_rooms",
+            # Note: the _membership(s) tables have foreign keys to the `events` table
+            # so must be deleted first.
+            "local_current_membership",
+            "room_memberships",
             "events",
             "federation_inbound_events_staging",
-            "local_current_membership",
             "receipts_graph",
             "receipts_linearized",
             "room_aliases",
             "room_depth",
-            "room_memberships",
             "room_stats_state",
             "room_stats_current",
             "room_stats_earliest_token",

+ 31 - 9
synapse/storage/databases/main/pusher.py

@@ -509,19 +509,24 @@ class PusherBackgroundUpdatesStore(SQLBaseStore):
     async def _set_device_id_for_pushers(
         self, progress: JsonDict, batch_size: int
     ) -> int:
-        """Background update to populate the device_id column of the pushers table."""
+        """
+        Background update to populate the device_id column and clear the access_token
+        column for the pushers table.
+        """
         last_pusher_id = progress.get("pusher_id", 0)
 
         def set_device_id_for_pushers_txn(txn: LoggingTransaction) -> int:
             txn.execute(
                 """
-                    SELECT p.id, at.device_id
+                    SELECT 
+                        p.id AS pusher_id,
+                        p.device_id AS pusher_device_id,
+                        at.device_id AS token_device_id
                     FROM pushers AS p
-                    INNER JOIN access_tokens AS at
+                    LEFT JOIN access_tokens AS at
                         ON p.access_token = at.id
                     WHERE
                         p.access_token IS NOT NULL
-                        AND at.device_id IS NOT NULL
                         AND p.id > ?
                     ORDER BY p.id
                     LIMIT ?
@@ -533,13 +538,27 @@ class PusherBackgroundUpdatesStore(SQLBaseStore):
             if len(rows) == 0:
                 return 0
 
+            # The reason we're clearing the access_token column here is a bit subtle.
+            # When a user logs out, we:
+            #  (1) delete the access token
+            #  (2) delete the device
+            #
+            # Ideally, we would delete the pushers only via their link to the device
+            # during (2), but since this background update might not have fully run yet,
+            # we're still deleting the pushers via the access token during (1).
             self.db_pool.simple_update_many_txn(
                 txn=txn,
                 table="pushers",
                 key_names=("id",),
-                key_values=[(row["id"],) for row in rows],
-                value_names=("device_id",),
-                value_values=[(row["device_id"],) for row in rows],
+                key_values=[(row["pusher_id"],) for row in rows],
+                value_names=("device_id", "access_token"),
+                # If there was already a device_id on the pusher, we only want to clear
+                # the access_token column, so we keep the existing device_id. Otherwise,
+                # we set the device_id we got from joining the access_tokens table.
+                value_values=[
+                    (row["pusher_device_id"] or row["token_device_id"], None)
+                    for row in rows
+                ],
             )
 
             self.db_pool.updates._background_update_progress_txn(
@@ -568,7 +587,6 @@ class PusherStore(PusherWorkerStore, PusherBackgroundUpdatesStore):
     async def add_pusher(
         self,
         user_id: str,
-        access_token: Optional[int],
         kind: str,
         app_id: str,
         app_display_name: str,
@@ -581,13 +599,13 @@ class PusherStore(PusherWorkerStore, PusherBackgroundUpdatesStore):
         profile_tag: str = "",
         enabled: bool = True,
         device_id: Optional[str] = None,
+        access_token_id: Optional[int] = None,
     ) -> None:
         async with self._pushers_id_gen.get_next() as stream_id:
             await self.db_pool.simple_upsert(
                 table="pushers",
                 keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
                 values={
-                    "access_token": access_token,
                     "kind": kind,
                     "app_display_name": app_display_name,
                     "device_display_name": device_display_name,
@@ -599,6 +617,10 @@ class PusherStore(PusherWorkerStore, PusherBackgroundUpdatesStore):
                     "id": stream_id,
                     "enabled": enabled,
                     "device_id": device_id,
+                    # XXX(quenting): We're only really persisting the access token ID
+                    # when updating an existing pusher. This is in case the
+                    # 'set_device_id_for_pushers' background update hasn't finished yet.
+                    "access_token": access_token_id,
                 },
                 desc="add_pusher",
             )
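
The update itself follows Synapse's usual paginate-by-id background-update shape: each call handles `batch_size` rows starting after the id recorded in `progress`, and returning 0 marks the update finished. A toy in-memory version, with `PUSHERS` and `TOKEN_DEVICES` standing in for the real tables:

```python
from typing import Dict, List

# Seven pushers that still hang off an access token and lack a device_id.
PUSHERS: List[Dict] = [
    {"id": i, "access_token": i, "device_id": None} for i in range(1, 8)
]
TOKEN_DEVICES = {i: f"DEVICE{i}" for i in range(1, 8)}  # access_tokens.device_id

def set_device_id_for_pushers(progress: Dict, batch_size: int) -> int:
    last_id = progress.get("pusher_id", 0)
    batch = [p for p in PUSHERS if p["id"] > last_id][:batch_size]
    if not batch:
        return 0  # nothing left: the background update is complete
    for p in batch:
        # Keep any existing device_id; otherwise take the access token's.
        p["device_id"] = p["device_id"] or TOKEN_DEVICES.get(p["access_token"])
        p["access_token"] = None  # pushers no longer hang off access tokens
    progress["pusher_id"] = batch[-1]["id"]
    return len(batch)

progress: Dict = {}
while set_device_id_for_pushers(progress, batch_size=3):
    pass
print(PUSHERS[0])  # {'id': 1, 'access_token': None, 'device_id': 'DEVICE1'}
```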

+ 11 - 4
synapse/storage/databases/main/user_directory.py

@@ -698,10 +698,17 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
         """Delete the entire user directory"""
 
         def _delete_all_from_user_dir_txn(txn: LoggingTransaction) -> None:
-            txn.execute("DELETE FROM user_directory")
-            txn.execute("DELETE FROM user_directory_search")
-            txn.execute("DELETE FROM users_in_public_rooms")
-            txn.execute("DELETE FROM users_who_share_private_rooms")
+            # SQLite doesn't support TRUNCATE.
+            # On Postgres, DELETE FROM does a table scan but TRUNCATE is more efficient.
+            truncate = (
+                "DELETE FROM"
+                if isinstance(self.database_engine, Sqlite3Engine)
+                else "TRUNCATE"
+            )
+            txn.execute(f"{truncate} user_directory")
+            txn.execute(f"{truncate} user_directory_search")
+            txn.execute(f"{truncate} users_in_public_rooms")
+            txn.execute(f"{truncate} users_who_share_private_rooms")
             txn.call_after(self.get_user_in_directory.invalidate_all)
 
         await self.db_pool.runInteraction(

+ 10 - 4
synapse/storage/schema/__init__.py

@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-SCHEMA_VERSION = 74  # remember to update the list below when updating
+SCHEMA_VERSION = 75  # remember to update the list below when updating
 """Represents the expectations made by the codebase about the database schema
 
 This should be incremented whenever the codebase changes its requirements on the
@@ -91,13 +91,19 @@ Changes in SCHEMA_VERSION = 74:
     - A query on `event_stream_ordering` column has now been disambiguated (i.e. the
       codebase can handle the `current_state_events`, `local_current_memberships` and
       `room_memberships` tables having an `event_stream_ordering` column).
+
+Changes in SCHEMA_VERSION = 75:
+    - The `event_stream_ordering` column in membership tables (`current_state_events`,
+      `local_current_membership` & `room_memberships`) is now being populated for new
+      rows. When the background job to populate historical rows lands this will
+      become the compat schema version.
 """
 
 
 SCHEMA_COMPAT_VERSION = (
-    # The threads_id column must exist for event_push_actions, event_push_summary,
-    # receipts_linearized, and receipts_graph.
-    73
+    # Queries against `event_stream_ordering` columns in membership tables must
+    # be disambiguated.
+    74
 )
 """Limit on how far the synapse codebase can be rolled back without breaking db compat
 

+ 20 - 0
synapse/storage/schema/main/delta/74/01membership_tables_event_stream_ordering.sql

@@ -0,0 +1,20 @@
+/* Copyright 2022 Beeper
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Each of these is a denormalised copy of `stream_ordering` from the corresponding row
+-- in `events`, which we use to improve database performance by reducing JOINs.
+ALTER TABLE current_state_events ADD COLUMN event_stream_ordering BIGINT REFERENCES events(stream_ordering);
+ALTER TABLE local_current_membership ADD COLUMN event_stream_ordering BIGINT REFERENCES events(stream_ordering);
+ALTER TABLE room_memberships ADD COLUMN event_stream_ordering BIGINT REFERENCES events(stream_ordering);

+ 19 - 0
synapse/storage/schema/main/delta/74/02_set_device_id_for_pushers_bg_update.sql

@@ -0,0 +1,19 @@
+/* Copyright 2023 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Triggers the background update to set the device_id for pushers
+-- that don't have one, and clear the access_token column.
+INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+    (7402, 'set_device_id_for_pushers', '{}');

+ 79 - 0
synapse/storage/schema/main/delta/74/02membership_tables_event_stream_ordering_triggers.py

@@ -0,0 +1,79 @@
+# Copyright 2022 Beeper
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+"""
+This migration adds triggers to the room membership tables to enforce consistency.
+Triggers cannot be expressed in .sql files, so we have to use a separate file.
+"""
+from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
+from synapse.storage.types import Cursor
+
+
+def run_create(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs):
+    # Complain if the `event_stream_ordering` in membership tables doesn't match
+    # the `stream_ordering` row with the same `event_id` in `events`.
+    if isinstance(database_engine, Sqlite3Engine):
+        for table in (
+            "current_state_events",
+            "local_current_membership",
+            "room_memberships",
+        ):
+            cur.execute(
+                f"""
+                CREATE TRIGGER IF NOT EXISTS {table}_bad_event_stream_ordering
+                BEFORE INSERT ON {table}
+                FOR EACH ROW
+                BEGIN
+                    SELECT RAISE(ABORT, 'Incorrect event_stream_ordering in {table}')
+                    WHERE EXISTS (
+                        SELECT 1 FROM events
+                        WHERE events.event_id = NEW.event_id
+                           AND events.stream_ordering != NEW.event_stream_ordering
+                    );
+                END;
+                """
+            )
+    elif isinstance(database_engine, PostgresEngine):
+        cur.execute(
+            """
+            CREATE OR REPLACE FUNCTION check_event_stream_ordering() RETURNS trigger AS $BODY$
+            BEGIN
+                IF EXISTS (
+                    SELECT 1 FROM events
+                    WHERE events.event_id = NEW.event_id
+                       AND events.stream_ordering != NEW.event_stream_ordering
+                ) THEN
+                    RAISE EXCEPTION 'Incorrect event_stream_ordering';
+                END IF;
+                RETURN NEW;
+            END;
+            $BODY$ LANGUAGE plpgsql;
+            """
+        )
+
+        for table in (
+            "current_state_events",
+            "local_current_membership",
+            "room_memberships",
+        ):
+            cur.execute(
+                f"""
+                CREATE TRIGGER check_event_stream_ordering BEFORE INSERT OR UPDATE ON {table}
+                FOR EACH ROW
+                EXECUTE PROCEDURE check_event_stream_ordering()
+                """
+            )
+    else:
+        raise NotImplementedError("Unknown database engine")
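
The SQLite variant can be exercised directly with Python's built-in `sqlite3` module. A minimal demo of the trigger on one of the three tables, using cut-down schemas:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE events (event_id TEXT PRIMARY KEY, stream_ordering BIGINT);
CREATE TABLE room_memberships (event_id TEXT, event_stream_ordering BIGINT);
CREATE TRIGGER IF NOT EXISTS room_memberships_bad_event_stream_ordering
BEFORE INSERT ON room_memberships
FOR EACH ROW
BEGIN
    SELECT RAISE(ABORT, 'Incorrect event_stream_ordering in room_memberships')
    WHERE EXISTS (
        SELECT 1 FROM events
        WHERE events.event_id = NEW.event_id
           AND events.stream_ordering != NEW.event_stream_ordering
    );
END;
""")
conn.execute("INSERT INTO events VALUES ('$ev', 42)")
conn.execute("INSERT INTO room_memberships VALUES ('$ev', 42)")  # consistent: OK
try:
    conn.execute("INSERT INTO room_memberships VALUES ('$ev', 99)")  # mismatched
except sqlite3.IntegrityError as e:
    print(e)  # Incorrect event_stream_ordering in room_memberships
```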

+ 56 - 38
tests/push/test_bulk_push_rule_evaluator.py

@@ -243,22 +243,28 @@ class TestBulkPushRuleEvaluator(HomeserverTestCase):
         )
 
         # Non-dict mentions should be ignored.
-        mentions: Any
-        for mentions in (None, True, False, 1, "foo", []):
-            self.assertFalse(
-                self._create_and_process(
-                    bulk_evaluator, {EventContentFields.MSC3952_MENTIONS: mentions}
+        #
+        # Avoid C-S event validation, as these invalid values would otherwise be rejected.
+        with patch(
+            "synapse.events.validator.EventValidator.validate_new",
+            new=lambda s, event, config: True,
+        ):
+            mentions: Any
+            for mentions in (None, True, False, 1, "foo", []):
+                self.assertFalse(
+                    self._create_and_process(
+                        bulk_evaluator, {EventContentFields.MSC3952_MENTIONS: mentions}
+                    )
                 )
-            )
 
-        # A non-list should be ignored.
-        for mentions in (None, True, False, 1, "foo", {}):
-            self.assertFalse(
-                self._create_and_process(
-                    bulk_evaluator,
-                    {EventContentFields.MSC3952_MENTIONS: {"user_ids": mentions}},
+            # A non-list should be ignored.
+            for mentions in (None, True, False, 1, "foo", {}):
+                self.assertFalse(
+                    self._create_and_process(
+                        bulk_evaluator,
+                        {EventContentFields.MSC3952_MENTIONS: {"user_ids": mentions}},
+                    )
                 )
-            )
 
         # The Matrix ID appearing anywhere in the list should notify.
         self.assertTrue(
@@ -291,26 +297,32 @@ class TestBulkPushRuleEvaluator(HomeserverTestCase):
         )
 
         # Invalid entries in the list are ignored.
-        self.assertFalse(
-            self._create_and_process(
-                bulk_evaluator,
-                {
-                    EventContentFields.MSC3952_MENTIONS: {
-                        "user_ids": [None, True, False, {}, []]
-                    }
-                },
+        #
+        # Avoid C-S event validation, as these invalid values would otherwise be rejected.
+        with patch(
+            "synapse.events.validator.EventValidator.validate_new",
+            new=lambda s, event, config: True,
+        ):
+            self.assertFalse(
+                self._create_and_process(
+                    bulk_evaluator,
+                    {
+                        EventContentFields.MSC3952_MENTIONS: {
+                            "user_ids": [None, True, False, {}, []]
+                        }
+                    },
+                )
             )
-        )
-        self.assertTrue(
-            self._create_and_process(
-                bulk_evaluator,
-                {
-                    EventContentFields.MSC3952_MENTIONS: {
-                        "user_ids": [None, True, False, {}, [], self.alice]
-                    }
-                },
+            self.assertTrue(
+                self._create_and_process(
+                    bulk_evaluator,
+                    {
+                        EventContentFields.MSC3952_MENTIONS: {
+                            "user_ids": [None, True, False, {}, [], self.alice]
+                        }
+                    },
+                )
             )
-        )
 
         # The legacy push rule should not mention if the mentions field exists.
         self.assertFalse(
@@ -351,14 +363,20 @@ class TestBulkPushRuleEvaluator(HomeserverTestCase):
         )
 
         # Invalid data should not notify.
-        mentions: Any
-        for mentions in (None, False, 1, "foo", [], {}):
-            self.assertFalse(
-                self._create_and_process(
-                    bulk_evaluator,
-                    {EventContentFields.MSC3952_MENTIONS: {"room": mentions}},
+        #
+        # Avoid C-S validation, as these invalid values would otherwise be rejected.
+        with patch(
+            "synapse.events.validator.EventValidator.validate_new",
+            new=lambda s, event, config: True,
+        ):
+            mentions: Any
+            for mentions in (None, False, 1, "foo", [], {}):
+                self.assertFalse(
+                    self._create_and_process(
+                        bulk_evaluator,
+                        {EventContentFields.MSC3952_MENTIONS: {"room": mentions}},
+                    )
                 )
-            )
 
         # The legacy push rule should not trigger a mention if the mentions field exists.
         self.assertFalse(
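
The with patch(...) blocks above swap Synapse's event validator out for a no-op for the duration of each loop, so the deliberately malformed mentions payloads reach the push-rule evaluator instead of being rejected by client-server validation first. Below is a minimal, self-contained sketch of the same idiom; Validator and Service are illustrative stand-ins, not Synapse's real classes. Because new= installs a plain function on the class, that function must accept self as its first parameter, hence the extra "s" argument in the lambdas above (their return value is ignored by callers, just as validate_new's would be).

from unittest.mock import patch

class Validator:
    def validate_new(self, event: dict, config: object) -> None:
        # Stand-in for the real check, which rejects malformed payloads outright.
        if not isinstance(event.get("content"), dict):
            raise ValueError("rejected by validation")

class Service:
    def __init__(self) -> None:
        self.validator = Validator()

    def process(self, event: dict, config: object) -> bool:
        self.validator.validate_new(event, config)
        return True  # pretend the event was handled

service = Service()

# Replace the method on the class for the duration of the block only;
# the tests above use the dotted-string form of the same call.
with patch.object(Validator, "validate_new", new=lambda s, event, config: True):
    assert service.process({"content": None}, config=None)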

+ 3 - 3
tests/push/test_email.py

@@ -105,7 +105,7 @@ class EmailPusherTests(HomeserverTestCase):
             self.hs.get_datastores().main.get_user_by_access_token(self.access_token)
         )
         assert user_tuple is not None
-        self.token_id = user_tuple.token_id
+        self.device_id = user_tuple.device_id
 
         # We need to add email to account before we can create a pusher.
         self.get_success(
@@ -117,7 +117,7 @@ class EmailPusherTests(HomeserverTestCase):
         pusher = self.get_success(
             self.hs.get_pusherpool().add_or_update_pusher(
                 user_id=self.user_id,
-                access_token=self.token_id,
+                device_id=self.device_id,
                 kind="email",
                 app_id="m.email",
                 app_display_name="Email Notifications",
@@ -141,7 +141,7 @@ class EmailPusherTests(HomeserverTestCase):
             self.get_success_or_raise(
                 self.hs.get_pusherpool().add_or_update_pusher(
                     user_id=self.user_id,
-                    access_token=self.token_id,
+                    device_id=self.device_id,
                     kind="email",
                     app_id="m.email",
                     app_display_name="Email Notifications",
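
One pattern worth noting in these hunks: the "assert user_tuple is not None" line is kept even though the tests know the token is valid, because the lookup returns an Optional and the assert both guards at runtime and narrows the type for mypy before the .device_id access. A small sketch of that narrowing, with simplified illustrative types rather than Synapse's real storage classes:

from dataclasses import dataclass
from typing import Optional

@dataclass
class TokenLookupResult:
    device_id: Optional[str]

def get_user_by_access_token(token: str) -> Optional[TokenLookupResult]:
    # Illustrative lookup; the real store queries the database.
    return TokenLookupResult(device_id="DEVICE1") if token else None

user_tuple = get_user_by_access_token("secret")
assert user_tuple is not None  # narrows Optional[TokenLookupResult] to TokenLookupResult
device_id = user_tuple.device_id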

+ 21 - 25
tests/push/test_http.py

@@ -67,13 +67,13 @@ class HTTPPusherTests(HomeserverTestCase):
             self.hs.get_datastores().main.get_user_by_access_token(access_token)
         )
         assert user_tuple is not None
-        token_id = user_tuple.token_id
+        device_id = user_tuple.device_id
 
         def test_data(data: Any) -> None:
             self.get_failure(
                 self.hs.get_pusherpool().add_or_update_pusher(
                     user_id=user_id,
-                    access_token=token_id,
+                    device_id=device_id,
                     kind="http",
                     app_id="m.http",
                     app_display_name="HTTP Push Notifications",
@@ -114,12 +114,12 @@ class HTTPPusherTests(HomeserverTestCase):
             self.hs.get_datastores().main.get_user_by_access_token(access_token)
         )
         assert user_tuple is not None
-        token_id = user_tuple.token_id
+        device_id = user_tuple.device_id
 
         self.get_success(
             self.hs.get_pusherpool().add_or_update_pusher(
                 user_id=user_id,
-                access_token=token_id,
+                device_id=device_id,
                 kind="http",
                 app_id="m.http",
                 app_display_name="HTTP Push Notifications",
@@ -235,12 +235,12 @@ class HTTPPusherTests(HomeserverTestCase):
             self.hs.get_datastores().main.get_user_by_access_token(access_token)
         )
         assert user_tuple is not None
-        token_id = user_tuple.token_id
+        device_id = user_tuple.device_id
 
         self.get_success(
             self.hs.get_pusherpool().add_or_update_pusher(
                 user_id=user_id,
-                access_token=token_id,
+                device_id=device_id,
                 kind="http",
                 app_id="m.http",
                 app_display_name="HTTP Push Notifications",
@@ -356,12 +356,12 @@ class HTTPPusherTests(HomeserverTestCase):
             self.hs.get_datastores().main.get_user_by_access_token(access_token)
         )
         assert user_tuple is not None
-        token_id = user_tuple.token_id
+        device_id = user_tuple.device_id
 
         self.get_success(
             self.hs.get_pusherpool().add_or_update_pusher(
                 user_id=user_id,
-                access_token=token_id,
+                device_id=device_id,
                 kind="http",
                 app_id="m.http",
                 app_display_name="HTTP Push Notifications",
@@ -443,12 +443,12 @@ class HTTPPusherTests(HomeserverTestCase):
             self.hs.get_datastores().main.get_user_by_access_token(access_token)
         )
         assert user_tuple is not None
-        token_id = user_tuple.token_id
+        device_id = user_tuple.device_id
 
         self.get_success(
             self.hs.get_pusherpool().add_or_update_pusher(
                 user_id=user_id,
-                access_token=token_id,
+                device_id=device_id,
                 kind="http",
                 app_id="m.http",
                 app_display_name="HTTP Push Notifications",
@@ -521,12 +521,12 @@ class HTTPPusherTests(HomeserverTestCase):
             self.hs.get_datastores().main.get_user_by_access_token(access_token)
         )
         assert user_tuple is not None
-        token_id = user_tuple.token_id
+        device_id = user_tuple.device_id
 
         self.get_success(
             self.hs.get_pusherpool().add_or_update_pusher(
                 user_id=user_id,
-                access_token=token_id,
+                device_id=device_id,
                 kind="http",
                 app_id="m.http",
                 app_display_name="HTTP Push Notifications",
@@ -628,12 +628,12 @@ class HTTPPusherTests(HomeserverTestCase):
             self.hs.get_datastores().main.get_user_by_access_token(access_token)
         )
         assert user_tuple is not None
-        token_id = user_tuple.token_id
+        device_id = user_tuple.device_id
 
         self.get_success(
             self.hs.get_pusherpool().add_or_update_pusher(
                 user_id=user_id,
-                access_token=token_id,
+                device_id=device_id,
                 kind="http",
                 app_id="m.http",
                 app_display_name="HTTP Push Notifications",
@@ -764,12 +764,12 @@ class HTTPPusherTests(HomeserverTestCase):
             self.hs.get_datastores().main.get_user_by_access_token(access_token)
         )
         assert user_tuple is not None
-        token_id = user_tuple.token_id
+        device_id = user_tuple.device_id
 
         self.get_success(
             self.hs.get_pusherpool().add_or_update_pusher(
                 user_id=user_id,
-                access_token=token_id,
+                device_id=device_id,
                 kind="http",
                 app_id="m.http",
                 app_display_name="HTTP Push Notifications",
@@ -778,7 +778,6 @@ class HTTPPusherTests(HomeserverTestCase):
                 lang=None,
                 data={"url": "http://example.com/_matrix/push/v1/notify"},
                 enabled=enabled,
-                device_id=user_tuple.device_id,
             )
         )
 
@@ -895,19 +894,17 @@ class HTTPPusherTests(HomeserverTestCase):
 
     def test_update_different_device_access_token_device_id(self) -> None:
         """Tests that if we create a pusher from one device, the update it from another
-        device, the access token and device ID associated with the pusher stays the
-        same.
+        device, the device ID associated with the pusher stays the same.
         """
         # Create a user with a pusher.
         user_id, access_token = self._make_user_with_pusher("user")
 
-        # Get the token ID for the current access token, since that's what we store in
-        # the pushers table. Also get the device ID from it.
+        # Get the device ID for the current access token, since that's what we store in
+        # the pushers table.
         user_tuple = self.get_success(
             self.hs.get_datastores().main.get_user_by_access_token(access_token)
         )
         assert user_tuple is not None
-        token_id = user_tuple.token_id
         device_id = user_tuple.device_id
 
         # Generate a new access token, and update the pusher with it.
@@ -920,10 +917,9 @@ class HTTPPusherTests(HomeserverTestCase):
         )
         pushers: List[PusherConfig] = list(ret)
 
-        # Check that we still have one pusher, and that the access token and device ID
-        # associated with it didn't change.
+        # Check that we still have one pusher, and that the device ID associated with
+        # it didn't change.
         self.assertEqual(len(pushers), 1)
-        self.assertEqual(pushers[0].access_token, token_id)
         self.assertEqual(pushers[0].device_id, device_id)
 
     @override_config({"experimental_features": {"msc3881_enabled": True}})

+ 2 - 2
tests/replication/test_pusher_shard.py

@@ -51,12 +51,12 @@ class PusherShardTestCase(BaseMultiWorkerStreamTestCase):
             self.hs.get_datastores().main.get_user_by_access_token(access_token)
         )
         assert user_dict is not None
-        token_id = user_dict.token_id
+        device_id = user_dict.device_id
 
         self.get_success(
             self.hs.get_pusherpool().add_or_update_pusher(
                 user_id=user_id,
-                access_token=token_id,
+                device_id=device_id,
                 kind="http",
                 app_id="m.http",
                 app_display_name="HTTP Push Notifications",

+ 2 - 2
tests/rest/admin/test_user.py

@@ -3047,12 +3047,12 @@ class PushersRestTestCase(unittest.HomeserverTestCase):
             self.store.get_user_by_access_token(other_user_token)
         )
         assert user_tuple is not None
-        token_id = user_tuple.token_id
+        device_id = user_tuple.device_id
 
         self.get_success(
             self.hs.get_pusherpool().add_or_update_pusher(
                 user_id=self.other_user,
-                access_token=token_id,
+                device_id=device_id,
                 kind="http",
                 app_id="m.http",
                 app_display_name="HTTP Push Notifications",

Some files were not shown because too many files changed in this diff