TCP Replication
===============

Motivation
----------

Previously the workers used an HTTP long poll mechanism to get updates from the
master, which had the problem of causing a lot of duplicate work on the server.
This TCP protocol replaces those APIs with the aim of increased efficiency.

Overview
--------

The protocol is based on fire-and-forget, line-based commands. An example flow
would be (where '>' indicates master to worker and '<' worker to master flows)::

    > SERVER example.com
    < REPLICATE events 53
    > RDATA events 54 ["$foo1:bar.com", ...]
    > RDATA events 55 ["$foo4:bar.com", ...]

The example shows the server accepting a new connection and sending its identity
with the ``SERVER`` command, followed by the client asking to subscribe to the
``events`` stream from the token ``53``. The server then periodically sends ``RDATA``
commands which have the format ``RDATA <stream_name> <token> <row>``, where the
format of ``<row>`` is defined by the individual streams.

Error reporting happens by either the client or server sending an ``ERROR``
command, and usually the connection will be closed.

Since the protocol is a simple line-based one, it's possible to manually connect to
the server using a tool like netcat. A few things should be noted when manually
using the protocol:

* When subscribing to a stream using ``REPLICATE``, the special token ``NOW`` can
  be used to get all future updates. The special stream name ``ALL`` can be used
  with ``NOW`` to subscribe to all available streams.
* The federation stream is only available if federation sending has been
  disabled on the main process.
* The server only times out connections that have sent a ``PING`` command.
  If a ping is sent then the connection will be closed if no further commands
  are received within 15s. Both the client and server protocol implementations
  send an initial ``PING`` on connection and ensure at least one command
  (not necessarily ``PING``) is sent every 5s.
* ``RDATA`` commands *usually* include a numeric token. However, if the stream
  has multiple rows to replicate per token the server will send multiple
  ``RDATA`` commands, with all but the last having a token of ``batch``. See
  the documentation on ``commands.RdataCommand`` for further details.

Architecture
------------

The basic structure of the protocol is line based, where the initial word of
each line specifies the command. The rest of the line is parsed based on the
command. For example, the ``RDATA`` command is defined as::

    RDATA <stream_name> <token> <row_json>

(Note that ``<row_json>`` may contain spaces, but cannot contain newlines.)

Blank lines are ignored.

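
The parsing rule above can be sketched in a few lines of Python. This is an
illustration only, not the code in ``synapse/replication/tcp/commands.py``: the
names ``Rdata`` and ``parse_line`` are hypothetical, and commands other than
``RDATA`` are simply returned as a ``(command, rest_of_line)`` pair.

```python
import json
from typing import NamedTuple, Optional, Union


class Rdata(NamedTuple):
    stream_name: str
    token: Optional[int]  # None when the wire token is "batch"
    row: object


def parse_line(line: str) -> Optional[Union[Rdata, tuple]]:
    """Split one wire line into a command name and its arguments."""
    line = line.strip()
    if not line:
        return None  # blank lines are ignored
    # The first word names the command; the rest depends on the command.
    command, _, rest = line.partition(" ")
    if command == "RDATA":
        # Split only twice: <row_json> may itself contain spaces.
        stream_name, token, row_json = rest.split(" ", 2)
        return Rdata(
            stream_name,
            None if token == "batch" else int(token),
            json.loads(row_json),
        )
    return (command, rest)
```

Limiting the split to two separators is what lets ``<row_json>`` carry spaces
without any escaping.
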
Keep alives
~~~~~~~~~~~

Both sides are expected to send at least one command every 5s or so, and
should send a ``PING`` command if necessary. If either side does not receive a
command within e.g. 15s then the connection should be closed.

Because the server may be connected to manually using e.g. netcat, the timeouts
aren't enabled until an initial ``PING`` command is seen. Both the client and
server implementations send a ``PING`` command immediately on connection to
ensure the timeouts are enabled.

This ensures that both sides can quickly realize if the TCP connection has gone
away and handle the situation appropriately.

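
A minimal sketch of the keep-alive bookkeeping described above, assuming the
5s and 15s figures are used as exact thresholds. The class ``KeepAlive`` and
its method names are hypothetical, and the current time is passed in explicitly
so the logic can be exercised without a real connection.

```python
PING_INTERVAL_S = 5.0  # send some command at least this often
TIMEOUT_S = 15.0       # close if nothing is received for this long


class KeepAlive:
    """Tracks send and receive times for one connection."""

    def __init__(self, now: float) -> None:
        self.last_sent = now
        self.last_received = now
        self.seen_ping = False  # timeouts stay disabled until the peer pings

    def on_received(self, command: str, now: float) -> None:
        self.last_received = now
        if command == "PING":
            self.seen_ping = True

    def on_sent(self, now: float) -> None:
        self.last_sent = now

    def should_send_ping(self, now: float) -> bool:
        # Any command counts as traffic, so a PING is only needed when idle.
        return now - self.last_sent >= PING_INTERVAL_S

    def should_close(self, now: float) -> bool:
        return self.seen_ping and now - self.last_received >= TIMEOUT_S
```

A peer that never sends ``PING`` (for example a netcat session) is never timed
out by this sketch, which mirrors the behaviour described above.
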
Start up
~~~~~~~~

When a new connection is made, the server:

* Sends a ``SERVER`` command, which includes the identity of the server, allowing
  the client to detect if it's connected to the expected server
* Sends a ``PING`` command as above, to enable the client to time out connections
  promptly.

The client:

* Sends a ``NAME`` command, allowing the server to associate a human friendly
  name with the connection. This is optional.
* Sends a ``PING`` as above
* For each stream the client wishes to subscribe to it sends a ``REPLICATE``
  with the stream_name and token it wants to subscribe from.
* On receipt of a ``SERVER`` command, checks that the server name matches the
  expected server name.

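
The client side of the start-up sequence above could be sketched as follows.
The function names ``client_startup_commands`` and ``check_server`` are
hypothetical, and the sketch only builds and checks lines of text; it does not
open a socket.

```python
from typing import Dict, List


def client_startup_commands(
    name: str, ping_ms: int, subscriptions: Dict[str, str]
) -> List[str]:
    """Build the lines a client sends after connecting, in the order listed above."""
    lines = [f"NAME {name}", f"PING {ping_ms}"]
    # One REPLICATE per stream, each with the token to subscribe from.
    for stream_name, token in subscriptions.items():
        lines.append(f"REPLICATE {stream_name} {token}")
    return lines


def check_server(line: str, expected_server: str) -> bool:
    """Return True if a SERVER line names the server the client expected."""
    command, _, server_name = line.strip().partition(" ")
    return command == "SERVER" and server_name == expected_server
```

Tokens are kept as strings here so that the special token ``NOW`` can be passed
through unchanged.
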
Error handling
~~~~~~~~~~~~~~

If either side detects an error it can send an ``ERROR`` command and close the
connection.

If the client side loses the connection to the server it should reconnect,
following the steps above.

Congestion
~~~~~~~~~~

If the server sends messages faster than the client can consume them the server
will first buffer a (fairly large) number of commands and then disconnect the
client. This ensures that we don't queue up an unbounded number of commands in
memory and gives us a potential opportunity to squawk loudly. When/if the client
recovers it can reconnect to the server and ask for missed messages.

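
The buffer-then-disconnect policy above might look roughly like this. The class
``SendBuffer`` and the ``max_pending`` parameter are hypothetical; the text
above does not state the real buffer size or how the server tracks it.

```python
from collections import deque
from typing import Deque


class SendBuffer:
    """Bounded queue of pending commands for one slow client."""

    def __init__(self, max_pending: int) -> None:
        self.max_pending = max_pending  # assumed limit, not a documented value
        self.pending: Deque[str] = deque()
        self.disconnected = False

    def queue(self, line: str) -> bool:
        """Queue a command; return False once the client has been dropped."""
        if self.disconnected:
            return False
        if len(self.pending) >= self.max_pending:
            # Buffer is full: drop the client rather than grow without bound.
            self.pending.clear()
            self.disconnected = True
            return False
        self.pending.append(line)
        return True
```

Dropping the whole queue is safe for ``RDATA`` because the client can reconnect
and ask for what it missed using its last token.
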
Reliability
~~~~~~~~~~~

In general the replication stream should be considered an unreliable transport
since e.g. commands are not resent if the connection disappears.

The exception is the replication streams, i.e. ``RDATA`` commands, since
these include tokens which can be used to restart the stream on connection
errors.

The client should keep track of the token in the last ``RDATA`` command received
for each stream so that on reconnection it can start streaming from the correct
place. Note: not all ``RDATA`` commands have valid tokens due to batching. See
``RdataCommand`` for more details.

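
One way a client could track its position per stream and resume after a
reconnect is sketched below. ``StreamPositions`` and its methods are
hypothetical names, and tokens are taken as the raw strings seen on the wire.

```python
from typing import Dict, List


class StreamPositions:
    """Remembers the last numeric token seen for each stream."""

    def __init__(self) -> None:
        self.tokens: Dict[str, int] = {}

    def on_rdata(self, stream_name: str, token: str) -> None:
        # "batch" rows carry no usable token, so they never move the position.
        if token != "batch":
            self.tokens[stream_name] = int(token)

    def resubscribe_commands(self) -> List[str]:
        """REPLICATE lines to send after reconnecting."""
        return [f"REPLICATE {name} {token}" for name, token in self.tokens.items()]
```
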
Example
~~~~~~~

An example interaction is shown below. Each line is prefixed with '>' or '<' to
indicate which side is sending; these are *not* included on the wire::

    * connection established *
    > SERVER localhost:8823
    > PING 1490197665618
    < NAME synapse.app.appservice
    < PING 1490197665618
    < REPLICATE events 1
    < REPLICATE backfill 1
    < REPLICATE caches 1
    > POSITION events 1
    > POSITION backfill 1
    > POSITION caches 1
    > RDATA caches 2 ["get_user_by_id",["@01register-user:localhost:8823"],1490197670513]
    > RDATA events 14 ["$149019767112vOHxz:localhost:8823",
        "!AFDCvgApUmpdfVjIXm:localhost:8823","m.room.guest_access","",null]
    < PING 1490197675618
    > ERROR server stopping
    * connection closed by server *

The ``POSITION`` command sent by the server is used to set the client's position
without needing to send data with the ``RDATA`` command.

An example of a batched set of ``RDATA`` is::

    > RDATA caches batch ["get_user_by_id",["@test:localhost:8823"],1490197670513]
    > RDATA caches batch ["get_user_by_id",["@test2:localhost:8823"],1490197670513]
    > RDATA caches batch ["get_user_by_id",["@test3:localhost:8823"],1490197670513]
    > RDATA caches 54 ["get_user_by_id",["@test4:localhost:8823"],1490197670513]

In this case the client shouldn't advance its caches token until it sees the
last ``RDATA``.

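
The batching rule above can be sketched as a small collector that holds rows
until the closing numeric token arrives. ``BatchCollector`` is a hypothetical
helper, not part of the protocol implementation; rows are whatever the caller
has already decoded from ``<row_json>``.

```python
from collections import defaultdict
from typing import Any, DefaultDict, List, Optional, Tuple


class BatchCollector:
    """Groups RDATA rows so a token is only acted on once its batch is complete."""

    def __init__(self) -> None:
        self._pending: DefaultDict[str, List[Any]] = defaultdict(list)

    def add(
        self, stream_name: str, token: str, row: Any
    ) -> Optional[Tuple[int, List[Any]]]:
        self._pending[stream_name].append(row)
        if token == "batch":
            return None  # more rows for the same token are still to come
        # A numeric token closes the batch: hand back every row collected so far.
        rows = self._pending.pop(stream_name)
        return int(token), rows
```

An ``RDATA`` with a numeric token and no preceding ``batch`` rows is simply a
batch of one.
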
List of commands
~~~~~~~~~~~~~~~~

The list of valid commands, with which side can send it: server (S) or client (C):

SERVER (S)
    Sent at the start to identify which server the client is talking to

RDATA (S)
    A single update in a stream

POSITION (S)
    The position of the stream has been updated. Sent to the client after all
    missing updates for a stream have been sent to the client and they're now
    up to date.

ERROR (S, C)
    There was an error

PING (S, C)
    Sent periodically to ensure the connection is still alive

NAME (C)
    Sent at the start by client to inform the server who they are

REPLICATE (C)
    Asks the server to replicate a given stream

USER_SYNC (C)
    A user has started or stopped syncing

FEDERATION_ACK (C)
    Acknowledge receipt of some federation data

REMOVE_PUSHER (C)
    Inform the server a pusher should be removed

INVALIDATE_CACHE (C)
    Inform the server a cache should be invalidated

SYNC (S, C)
    Used exclusively in tests

See ``synapse/replication/tcp/commands.py`` for a detailed description and the
format of each command.

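
As an illustration, the table above can be written down as a lookup that
rejects commands arriving from the wrong side. The names ``VALID_SENDERS`` and
``may_send`` are hypothetical; the real definitions live in
``synapse/replication/tcp/commands.py`` and may be organised differently.

```python
# Which side may send each command: "S" for server, "C" for client.
VALID_SENDERS = {
    "SERVER": {"S"},
    "RDATA": {"S"},
    "POSITION": {"S"},
    "ERROR": {"S", "C"},
    "PING": {"S", "C"},
    "NAME": {"C"},
    "REPLICATE": {"C"},
    "USER_SYNC": {"C"},
    "FEDERATION_ACK": {"C"},
    "REMOVE_PUSHER": {"C"},
    "INVALIDATE_CACHE": {"C"},
    "SYNC": {"S", "C"},
}


def may_send(command: str, side: str) -> bool:
    """Return True if `side` ("S" or "C") is allowed to send `command`."""
    # Unknown commands map to an empty set, so they are always rejected.
    return side in VALID_SENDERS.get(command, set())
```
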
Cache Invalidation Stream
~~~~~~~~~~~~~~~~~~~~~~~~~

The cache invalidation stream is used to inform workers when they need to
invalidate any of their caches in the data store. This is done by streaming all
cache invalidations done on master down to the workers, assuming that any caches
on the workers also exist on the master.

Each individual cache invalidation results in a row being sent down replication,
which includes the cache name (the name of the function) and the key to
invalidate. For example::

    > RDATA caches 550953771 ["get_user_by_id", ["@bob:example.com"], 1550574873251]

However, there are times when a number of caches need to be invalidated at the
same time with the same key. To reduce traffic we batch those invalidations into
a single poke by defining a special cache name, which workers expand into the
full set of caches to invalidate.

Currently the special cache names are declared in ``synapse/storage/_base.py``
and are:

1. ``cs_cache_fake``: invalidates caches that depend on the current state
  173. Currently the special cache names are declared in ``synapse/storage/_base.py``
  174. and are:
  175. 1. ``cs_cache_fake`` ─ invalidates caches that depend on the current state