- TCP Replication
- ===============
- Motivation
- ----------
- Previously the workers used an HTTP long poll mechanism to get updates from the
- master, which had the problem of causing a lot of duplicate work on the server.
- This TCP protocol replaces those APIs with the aim of increased efficiency.
- Overview
- --------
- The protocol is based on fire-and-forget, line-based commands. An example flow
- would be (where '>' indicates master to worker and '<' worker to master flows)::
- > SERVER example.com
- < REPLICATE events 53
- > RDATA events 54 ["$foo1:bar.com", ...]
- > RDATA events 55 ["$foo4:bar.com", ...]
- The example shows the server accepting a new connection and sending its identity
- with the ``SERVER`` command, followed by the client asking to subscribe to the
- ``events`` stream from the token ``53``. The server then periodically sends ``RDATA``
- commands which have the format ``RDATA <stream_name> <token> <row>``, where the
- format of ``<row>`` is defined by the individual streams.
- Error reporting happens by either the client or server sending an ``ERROR``
- command, after which the connection will usually be closed.
- Since the protocol is simple and line based, it's possible to manually connect to
- the server using a tool like netcat. A few things should be noted when manually
- using the protocol:
- * When subscribing to a stream using ``REPLICATE``, the special token ``NOW`` can
- be used to get all future updates. The special stream name ``ALL`` can be used
- with ``NOW`` to subscribe to all available streams.
- * The federation stream is only available if federation sending has been
- disabled on the main process.
- * The server will only time out connections that have sent a ``PING`` command.
- If a ping is sent then the connection will be closed if no further commands
- are received within 15s. Both the client and server protocol implementations
- will send an initial ``PING`` on connection and ensure at least one command every
- 5s is sent (not necessarily ``PING``).
- * ``RDATA`` commands *usually* include a numeric token, however if the stream
- has multiple rows to replicate per token the server will send multiple
- ``RDATA`` commands, with all but the last having a token of ``batch``. See
- the documentation on ``commands.RdataCommand`` for further details.
- Architecture
- ------------
- The basic structure of the protocol is line based, where the initial word of
- each line specifies the command. The rest of the line is parsed based on the
- command. For example, the ``RDATA`` command is defined as::
- RDATA <stream_name> <token> <row_json>
- (Note that ``<row_json>`` may contain spaces, but cannot contain newlines.)
- Blank lines are ignored.
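As a rough illustration of the line format described above, the following sketch splits a wire line into its command word and remainder, then parses the remainder of an ``RDATA`` line. The names ``Rdata``, ``parse_line`` and ``parse_rdata`` are hypothetical and are not taken from the Synapse code base; the handling of the ``batch`` token follows the description given in the overview.

```python
import json
from typing import NamedTuple, Optional, Tuple


class Rdata(NamedTuple):
    """Hypothetical container for one parsed RDATA line."""
    stream_name: str
    token: Optional[int]  # None when the wire token is the literal "batch"
    row: object


def parse_line(line: str) -> Optional[Tuple[str, str]]:
    """Split a wire line into (command, rest). Blank lines yield None."""
    line = line.strip()
    if not line:
        return None
    command, _, rest = line.partition(" ")
    return command, rest


def parse_rdata(rest: str) -> Rdata:
    """Parse '<stream_name> <token> <row_json>'.

    Only the first two spaces are treated as separators, because
    <row_json> may itself contain spaces.
    """
    stream_name, token, row_json = rest.split(" ", 2)
    parsed_token = None if token == "batch" else int(token)
    return Rdata(stream_name, parsed_token, json.loads(row_json))
```

Other commands would need their own parsers for the remainder of the line, since each command defines its own format.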
- Keep alives
- ~~~~~~~~~~~
- Both sides are expected to send at least one command every 5s or so, and
- should send a ``PING`` command if necessary. If either side does not receive a
- command within e.g. 15s then the connection should be closed.
- Because the server may be connected to manually using e.g. netcat, the timeouts
- aren't enabled until an initial ``PING`` command is seen. Both the client and
- server implementations below send a ``PING`` command immediately on connection to
- ensure the timeouts are enabled.
- This ensures that both sides can quickly realize if the TCP connection has gone
- away and handle the situation appropriately.
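The keep-alive rules above can be sketched as a small piece of bookkeeping. This is a minimal sketch under stated assumptions, not the actual implementation: the class name ``KeepAlive`` and its methods are hypothetical, the 5s and 15s values come from the text, and the caller is assumed to pass in a clock reading in seconds.

```python
class KeepAlive:
    """Hypothetical per-connection keep-alive bookkeeping."""

    SEND_INTERVAL = 5.0     # send at least one command this often (seconds)
    RECEIVE_TIMEOUT = 15.0  # close if nothing is received for this long

    def __init__(self, now: float) -> None:
        self.last_sent = now
        self.last_received = now
        # Timeouts stay disabled until the peer has sent a PING, so that
        # manual connections (e.g. via netcat) are not dropped.
        self.timeouts_enabled = False

    def on_command_received(self, command: str, now: float) -> None:
        self.last_received = now
        if command == "PING":
            self.timeouts_enabled = True

    def on_command_sent(self, now: float) -> None:
        self.last_sent = now

    def should_send_ping(self, now: float) -> bool:
        """True if no command has been sent within the send interval."""
        return now - self.last_sent >= self.SEND_INTERVAL

    def should_close(self, now: float) -> bool:
        """True if timeouts are enabled and the peer has gone quiet."""
        return self.timeouts_enabled and now - self.last_received > self.RECEIVE_TIMEOUT
```

A connection handler could call ``should_send_ping`` and ``should_close`` from a periodic timer, passing the same clock it uses for the ``on_command_*`` callbacks.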
- Start up
- ~~~~~~~~
- When a new connection is made, the server:
- * Sends a ``SERVER`` command, which includes the identity of the server, allowing
- the client to detect if it's connected to the expected server
- * Sends a ``PING`` command as above, to enable the client to time out connections
- promptly.
- The client:
- * Sends a ``NAME`` command, allowing the server to associate a human friendly
- name with the connection. This is optional.
- * Sends a ``PING`` as above
- * For each stream the client wishes to subscribe to, it sends a ``REPLICATE``
- command with the stream name and the token it wants to subscribe from.
- * On receipt of a ``SERVER`` command, checks that the server name matches the
- expected server name.
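The client side of the start-up sequence above might look roughly like the following. The function names ``client_handshake`` and ``check_server`` are hypothetical, and the sketch only builds and checks wire lines; it does not open a socket.

```python
from typing import Dict, List, Optional, Union


def client_handshake(
    name: Optional[str],
    subscriptions: Dict[str, Union[int, str]],
    now_ms: int,
) -> List[str]:
    """Build the lines a client sends after connecting.

    `subscriptions` maps stream name to the token to subscribe from
    (a number, or the special token "NOW").
    """
    lines = []
    if name is not None:  # NAME is optional
        lines.append(f"NAME {name}")
    lines.append(f"PING {now_ms}")
    for stream_name, token in subscriptions.items():
        lines.append(f"REPLICATE {stream_name} {token}")
    return lines


def check_server(line: str, expected_server_name: str) -> bool:
    """Return True if `line` is a SERVER command naming the expected server."""
    command, _, server_name = line.partition(" ")
    return command == "SERVER" and server_name == expected_server_name
```

If ``check_server`` fails, a client would presumably send an ``ERROR`` command and close the connection.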
- Error handling
- ~~~~~~~~~~~~~~
- If either side detects an error it can send an ``ERROR`` command and close the
- connection.
- If the client side loses the connection to the server it should reconnect,
- following the steps above.
- Congestion
- ~~~~~~~~~~
- If the server sends messages faster than the client can consume them, the server
- will first buffer a (fairly large) number of commands and then disconnect the
- client. This ensures that we don't queue up an unbounded number of commands in
- memory and gives us a potential opportunity to squawk loudly. When/if the client
- recovers it can reconnect to the server and ask for missed messages.
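The buffer-then-disconnect behaviour can be illustrated with a bounded queue. This is only a sketch: ``OutgoingBuffer`` and ``max_pending`` are hypothetical names, and the text does not state the real buffer limit, so the limit is left as a parameter.

```python
from collections import deque


class OutgoingBuffer:
    """Hypothetical bounded buffer of commands waiting to be written."""

    def __init__(self, max_pending: int) -> None:
        self.max_pending = max_pending
        self.pending = deque()
        self.disconnected = False

    def queue(self, line: str) -> bool:
        """Queue a command; return False if the client has been dropped."""
        if self.disconnected:
            return False
        if len(self.pending) >= self.max_pending:
            # The client is too far behind: drop everything and disconnect
            # rather than letting the queue grow without bound.
            self.pending.clear()
            self.disconnected = True
            return False
        self.pending.append(line)
        return True
```

The point where ``disconnected`` is set is also where a server could log loudly about the slow client.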
- Reliability
- ~~~~~~~~~~~
- In general the replication stream should be considered an unreliable transport
- since e.g. commands are not resent if the connection disappears.
- The exception is the replication streams, i.e. ``RDATA`` commands, since
- these include tokens which can be used to restart the stream on connection
- errors.
- The client should keep track of the token in the last ``RDATA`` command received
- for each stream so that on reconnection it can start streaming from the correct
- place. Note: not all ``RDATA`` commands have valid tokens due to batching. See
- ``RdataCommand`` for more details.
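One way a client might track tokens for reconnection is sketched below. ``StreamPositions`` is a hypothetical name; the sketch assumes the token arrives as the raw string from the wire and that ``batch`` tokens must not advance the stored position.

```python
from typing import Dict, List


class StreamPositions:
    """Hypothetical record of the last valid token seen per stream."""

    def __init__(self) -> None:
        self.tokens: Dict[str, int] = {}

    def on_rdata(self, stream_name: str, token: str) -> None:
        # Rows sent with the "batch" token carry no position of their own,
        # so only numeric tokens are recorded.
        if token != "batch":
            self.tokens[stream_name] = int(token)

    def resubscribe_commands(self) -> List[str]:
        """Build the REPLICATE lines to send after reconnecting."""
        return [
            f"REPLICATE {stream_name} {token}"
            for stream_name, token in sorted(self.tokens.items())
        ]
```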
- Example
- ~~~~~~~
- An example interaction is shown below. Each line is prefixed with '>' or '<' to
- indicate which side is sending; these prefixes are *not* included on the wire::
- * connection established *
- > SERVER localhost:8823
- > PING 1490197665618
- < NAME synapse.app.appservice
- < PING 1490197665618
- < REPLICATE events 1
- < REPLICATE backfill 1
- < REPLICATE caches 1
- > POSITION events 1
- > POSITION backfill 1
- > POSITION caches 1
- > RDATA caches 2 ["get_user_by_id",["@01register-user:localhost:8823"],1490197670513]
- > RDATA events 14 ["$149019767112vOHxz:localhost:8823",
- "!AFDCvgApUmpdfVjIXm:localhost:8823","m.room.guest_access","",null]
- < PING 1490197675618
- > ERROR server stopping
- * connection closed by server *
- The ``POSITION`` command sent by the server is used to set the client's position
- without needing to send data with the ``RDATA`` command.
- An example of a batched set of ``RDATA`` is::
- > RDATA caches batch ["get_user_by_id",["@test:localhost:8823"],1490197670513]
- > RDATA caches batch ["get_user_by_id",["@test2:localhost:8823"],1490197670513]
- > RDATA caches batch ["get_user_by_id",["@test3:localhost:8823"],1490197670513]
- > RDATA caches 54 ["get_user_by_id",["@test4:localhost:8823"],1490197670513]
- In this case the client shouldn't advance its ``caches`` token until it sees
- the last ``RDATA``.
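A client could handle batched rows by holding them back until the closing numeric token arrives. The following is a minimal sketch under that assumption; ``BatchCollector`` is a hypothetical name and does not correspond to a class in the Synapse code base.

```python
import json
from typing import Dict, List, Optional, Tuple


class BatchCollector:
    """Hypothetical accumulator for batched RDATA rows."""

    def __init__(self) -> None:
        self.pending: Dict[str, List[object]] = {}

    def on_rdata(
        self, stream_name: str, token: str, row_json: str
    ) -> Optional[Tuple[int, List[object]]]:
        """Record one row.

        Returns None while the batch is still open, or (token, rows) once
        a row with a numeric token completes the batch.
        """
        rows = self.pending.setdefault(stream_name, [])
        rows.append(json.loads(row_json))
        if token == "batch":
            return None
        del self.pending[stream_name]
        return int(token), rows
```

Only when ``on_rdata`` returns a result would the client process the rows and advance its token for that stream.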
- List of commands
- ~~~~~~~~~~~~~~~~
- The list of valid commands, with which side can send each: server (S) or client (C):
- SERVER (S)
- Sent at the start to identify which server the client is talking to
- RDATA (S)
- A single update in a stream
- POSITION (S)
- The position of the stream has been updated. Sent to the client after all
- missing updates for a stream have been sent to the client and they're now
- up to date.
- ERROR (S, C)
- There was an error
- PING (S, C)
- Sent periodically to ensure the connection is still alive
- NAME (C)
- Sent at the start by client to inform the server who they are
- REPLICATE (C)
- Asks the server to replicate a given stream
- USER_SYNC (C)
- A user has started or stopped syncing
- FEDERATION_ACK (C)
- Acknowledge receipt of some federation data
- REMOVE_PUSHER (C)
- Inform the server a pusher should be removed
- INVALIDATE_CACHE (C)
- Inform the server a cache should be invalidated
- SYNC (S, C)
- Used exclusively in tests
- See ``synapse/replication/tcp/commands.py`` for a detailed description and the
- format of each command.
- Cache Invalidation Stream
- ~~~~~~~~~~~~~~~~~~~~~~~~~
- The cache invalidation stream is used to inform workers when they need to
- invalidate any of their caches in the data store. This is done by streaming all
- cache invalidations done on master down to the workers, assuming that any caches
- on the workers also exist on the master.
- Each individual cache invalidation results in a row being sent down replication,
- which includes the cache name (the name of the function) and the key to
- invalidate. For example::
- > RDATA caches 550953771 ["get_user_by_id", ["@bob:example.com"], 1550574873251]
- However, there are times when a number of caches need to be invalidated at the
- same time with the same key. To reduce traffic we batch those invalidations into
- a single poke by defining a special cache name, which workers expand into
- invalidations of the correct caches.
- Currently the special cache names are declared in ``synapse/storage/_base.py``
- and are:
- 1. ``cs_cache_fake``: invalidates caches that depend on the current state
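To make the ordinary (non-special) case concrete, here is a sketch of how a worker might apply a single cache invalidation row such as the ``get_user_by_id`` example above. The function ``apply_invalidation`` and the dict-of-dicts cache layout are assumptions made for illustration. The third element of the row is ignored because the text does not describe it, and the expansion of special names such as ``cs_cache_fake`` is not shown.

```python
import json
from typing import Dict, Tuple


def apply_invalidation(caches: Dict[str, Dict[Tuple, object]], row_json: str) -> bool:
    """Invalidate one cache entry named by a replication row.

    `caches` maps a cache name (the function name) to a dict keyed by the
    tuple of arguments. Returns True if an entry was actually removed.
    """
    cache_name, key, _unused = json.loads(row_json)
    cache = caches.get(cache_name)
    if cache is None:
        # This worker does not hold the named cache; nothing to do.
        return False
    return cache.pop(tuple(key), None) is not None
```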