# Copyright 2017 Vector Creations Ltd
# Copyright 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from itertools import groupby
from operator import itemgetter
from typing import (
    TYPE_CHECKING,
    Any,
    Awaitable,
    Dict,
    Iterable,
    Iterator,
    List,
    Optional,
    Set,
    Tuple,
    TypeVar,
    Union,
)

from prometheus_client import Counter
from typing_extensions import Deque

from twisted.internet.protocol import ReconnectingClientFactory

from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.tcp.client import DirectTcpReplicationClientFactory
from synapse.replication.tcp.commands import (
    ClearUserSyncsCommand,
    Command,
    FederationAckCommand,
    PositionCommand,
    RdataCommand,
    RemoteServerUpCommand,
    ReplicateCommand,
    UserIpCommand,
    UserSyncCommand,
)
from synapse.replication.tcp.protocol import IReplicationConnection
from synapse.replication.tcp.streams import (
    STREAMS_MAP,
    AccountDataStream,
    BackfillStream,
    CachesStream,
    EventsStream,
    FederationStream,
    PresenceFederationStream,
    PresenceStream,
    ReceiptsStream,
    Stream,
    TagAccountDataStream,
    ToDeviceStream,
    TypingStream,
)

if TYPE_CHECKING:
    from synapse.server import HomeServer
  65. logger = logging.getLogger(__name__)
  66. # number of updates received for each RDATA stream
  67. inbound_rdata_count = Counter(
  68. "synapse_replication_tcp_protocol_inbound_rdata_count", "", ["stream_name"]
  69. )
  70. user_sync_counter = Counter("synapse_replication_tcp_resource_user_sync", "")
  71. federation_ack_counter = Counter("synapse_replication_tcp_resource_federation_ack", "")
  72. remove_pusher_counter = Counter("synapse_replication_tcp_resource_remove_pusher", "")
  73. user_ip_cache_counter = Counter("synapse_replication_tcp_resource_user_ip_cache", "")
  74. # the type of the entries in _command_queues_by_stream
  75. _StreamCommandQueue = Deque[
  76. Tuple[Union[RdataCommand, PositionCommand], IReplicationConnection]
  77. ]
  78. class ReplicationCommandHandler:
  79. """Handles incoming commands from replication as well as sending commands
  80. back out to connections.
  81. """
  82. def __init__(self, hs: "HomeServer"):
  83. self._replication_data_handler = hs.get_replication_data_handler()
  84. self._presence_handler = hs.get_presence_handler()
  85. self._store = hs.get_datastore()
  86. self._notifier = hs.get_notifier()
  87. self._clock = hs.get_clock()
  88. self._instance_id = hs.get_instance_id()
  89. self._instance_name = hs.get_instance_name()
  90. self._is_presence_writer = (
  91. hs.get_instance_name() in hs.config.worker.writers.presence
  92. )
  93. self._streams: Dict[str, Stream] = {
  94. stream.NAME: stream(hs) for stream in STREAMS_MAP.values()
  95. }
  96. # List of streams that this instance is the source of
  97. self._streams_to_replicate: List[Stream] = []
  98. for stream in self._streams.values():
  99. if hs.config.redis.redis_enabled and stream.NAME == CachesStream.NAME:
  100. # All workers can write to the cache invalidation stream when
  101. # using redis.
  102. self._streams_to_replicate.append(stream)
  103. continue
  104. if isinstance(stream, (EventsStream, BackfillStream)):
  105. # Only add EventStream and BackfillStream as a source on the
  106. # instance in charge of event persistence.
  107. if hs.get_instance_name() in hs.config.worker.writers.events:
  108. self._streams_to_replicate.append(stream)
  109. continue
  110. if isinstance(stream, ToDeviceStream):
  111. # Only add ToDeviceStream as a source on instances in charge of
  112. # sending to device messages.
  113. if hs.get_instance_name() in hs.config.worker.writers.to_device:
  114. self._streams_to_replicate.append(stream)
  115. continue
  116. if isinstance(stream, TypingStream):
  117. # Only add TypingStream as a source on the instance in charge of
  118. # typing.
  119. if hs.get_instance_name() in hs.config.worker.writers.typing:
  120. self._streams_to_replicate.append(stream)
  121. continue
  122. if isinstance(stream, (AccountDataStream, TagAccountDataStream)):
  123. # Only add AccountDataStream and TagAccountDataStream as a source on the
  124. # instance in charge of account_data persistence.
  125. if hs.get_instance_name() in hs.config.worker.writers.account_data:
  126. self._streams_to_replicate.append(stream)
  127. continue
  128. if isinstance(stream, ReceiptsStream):
  129. # Only add ReceiptsStream as a source on the instance in charge of
  130. # receipts.
  131. if hs.get_instance_name() in hs.config.worker.writers.receipts:
  132. self._streams_to_replicate.append(stream)
  133. continue
  134. if isinstance(stream, (PresenceStream, PresenceFederationStream)):
  135. # Only add PresenceStream as a source on the instance in charge
  136. # of presence.
  137. if self._is_presence_writer:
  138. self._streams_to_replicate.append(stream)
  139. continue
  140. # Only add any other streams if we're on master.
  141. if hs.config.worker.worker_app is not None:
  142. continue
  143. if (
  144. stream.NAME == FederationStream.NAME
  145. and hs.config.worker.send_federation
  146. ):
  147. # We only support federation stream if federation sending
  148. # has been disabled on the master.
  149. continue
  150. self._streams_to_replicate.append(stream)
  151. # Map of stream name to batched updates. See RdataCommand for info on
  152. # how batching works.
  153. self._pending_batches: Dict[str, List[Any]] = {}
  154. # The factory used to create connections.
  155. self._factory: Optional[ReconnectingClientFactory] = None
  156. # The currently connected connections. (The list of places we need to send
  157. # outgoing replication commands to.)
  158. self._connections: List[IReplicationConnection] = []
  159. LaterGauge(
  160. "synapse_replication_tcp_resource_total_connections",
  161. "",
  162. [],
  163. lambda: len(self._connections),
  164. )
  165. # When POSITION or RDATA commands arrive, we stick them in a queue and process
  166. # them in order in a separate background process.
  167. # the streams which are currently being processed by _unsafe_process_queue
  168. self._processing_streams: Set[str] = set()
  169. # for each stream, a queue of commands that are awaiting processing, and the
  170. # connection that they arrived on.
  171. self._command_queues_by_stream = {
  172. stream_name: _StreamCommandQueue() for stream_name in self._streams
  173. }
  174. # For each connection, the incoming stream names that have received a POSITION
  175. # from that connection.
  176. self._streams_by_connection: Dict[IReplicationConnection, Set[str]] = {}
  177. LaterGauge(
  178. "synapse_replication_tcp_command_queue",
  179. "Number of inbound RDATA/POSITION commands queued for processing",
  180. ["stream_name"],
  181. lambda: {
  182. (stream_name,): len(queue)
  183. for stream_name, queue in self._command_queues_by_stream.items()
  184. },
  185. )
  186. self._is_master = hs.config.worker.worker_app is None
  187. self._federation_sender = None
  188. if self._is_master and not hs.config.worker.send_federation:
  189. self._federation_sender = hs.get_federation_sender()
  190. self._server_notices_sender = None
  191. if self._is_master:
  192. self._server_notices_sender = hs.get_server_notices_sender()
  193. def _add_command_to_stream_queue(
  194. self, conn: IReplicationConnection, cmd: Union[RdataCommand, PositionCommand]
  195. ) -> None:
  196. """Queue the given received command for processing
  197. Adds the given command to the per-stream queue, and processes the queue if
  198. necessary
  199. """
  200. stream_name = cmd.stream_name
  201. queue = self._command_queues_by_stream.get(stream_name)
  202. if queue is None:
  203. logger.error("Got %s for unknown stream: %s", cmd.NAME, stream_name)
  204. return
  205. queue.append((cmd, conn))
  206. # if we're already processing this stream, there's nothing more to do:
  207. # the new entry on the queue will get picked up in due course
  208. if stream_name in self._processing_streams:
  209. return
  210. # fire off a background process to start processing the queue.
  211. run_as_background_process(
  212. "process-replication-data", self._unsafe_process_queue, stream_name
  213. )
  214. async def _unsafe_process_queue(self, stream_name: str):
  215. """Processes the command queue for the given stream, until it is empty
  216. Does not check if there is already a thread processing the queue, hence "unsafe"
  217. """
  218. assert stream_name not in self._processing_streams
  219. self._processing_streams.add(stream_name)
  220. try:
  221. queue = self._command_queues_by_stream.get(stream_name)
  222. while queue:
  223. cmd, conn = queue.popleft()
  224. try:
  225. await self._process_command(cmd, conn, stream_name)
  226. except Exception:
  227. logger.exception("Failed to handle command %s", cmd)
  228. finally:
  229. self._processing_streams.discard(stream_name)
  230. async def _process_command(
  231. self,
  232. cmd: Union[PositionCommand, RdataCommand],
  233. conn: IReplicationConnection,
  234. stream_name: str,
  235. ) -> None:
  236. if isinstance(cmd, PositionCommand):
  237. await self._process_position(stream_name, conn, cmd)
  238. elif isinstance(cmd, RdataCommand):
  239. await self._process_rdata(stream_name, conn, cmd)
  240. else:
  241. # This shouldn't be possible
  242. raise Exception("Unrecognised command %s in stream queue", cmd.NAME)
  243. def start_replication(self, hs: "HomeServer"):
  244. """Helper method to start a replication connection to the remote server
  245. using TCP.
  246. """
  247. if hs.config.redis.redis_enabled:
  248. from synapse.replication.tcp.redis import (
  249. RedisDirectTcpReplicationClientFactory,
  250. )
  251. # First let's ensure that we have a ReplicationStreamer started.
  252. hs.get_replication_streamer()
  253. # We need two connections to redis, one for the subscription stream and
  254. # one to send commands to (as you can't send further redis commands to a
  255. # connection after SUBSCRIBE is called).
  256. # First create the connection for sending commands.
  257. outbound_redis_connection = hs.get_outbound_redis_connection()
  258. # Now create the factory/connection for the subscription stream.
  259. self._factory = RedisDirectTcpReplicationClientFactory(
  260. hs, outbound_redis_connection
  261. )
  262. hs.get_reactor().connectTCP(
  263. hs.config.redis.redis_host, # type: ignore[arg-type]
  264. hs.config.redis.redis_port,
  265. self._factory,
  266. timeout=30,
  267. bindAddress=None,
  268. )
  269. else:
  270. client_name = hs.get_instance_name()
  271. self._factory = DirectTcpReplicationClientFactory(hs, client_name, self)
  272. host = hs.config.worker.worker_replication_host
  273. port = hs.config.worker.worker_replication_port
  274. hs.get_reactor().connectTCP(
  275. host, # type: ignore[arg-type]
  276. port,
  277. self._factory,
  278. timeout=30,
  279. bindAddress=None,
  280. )
  281. def get_streams(self) -> Dict[str, Stream]:
  282. """Get a map from stream name to all streams."""
  283. return self._streams
  284. def get_streams_to_replicate(self) -> List[Stream]:
  285. """Get a list of streams that this instances replicates."""
  286. return self._streams_to_replicate
  287. def on_REPLICATE(self, conn: IReplicationConnection, cmd: ReplicateCommand):
  288. self.send_positions_to_connection(conn)
  289. def send_positions_to_connection(self, conn: IReplicationConnection):
  290. """Send current position of all streams this process is source of to
  291. the connection.
  292. """
  293. # We respond with current position of all streams this instance
  294. # replicates.
  295. for stream in self.get_streams_to_replicate():
  296. # Note that we use the current token as the prev token here (rather
  297. # than stream.last_token), as we can't be sure that there have been
  298. # no rows written between last token and the current token (since we
  299. # might be racing with the replication sending bg process).
  300. current_token = stream.current_token(self._instance_name)
  301. self.send_command(
  302. PositionCommand(
  303. stream.NAME,
  304. self._instance_name,
  305. current_token,
  306. current_token,
  307. )
  308. )
  309. def on_USER_SYNC(
  310. self, conn: IReplicationConnection, cmd: UserSyncCommand
  311. ) -> Optional[Awaitable[None]]:
  312. user_sync_counter.inc()
  313. if self._is_presence_writer:
  314. return self._presence_handler.update_external_syncs_row(
  315. cmd.instance_id, cmd.user_id, cmd.is_syncing, cmd.last_sync_ms
  316. )
  317. else:
  318. return None
  319. def on_CLEAR_USER_SYNC(
  320. self, conn: IReplicationConnection, cmd: ClearUserSyncsCommand
  321. ) -> Optional[Awaitable[None]]:
  322. if self._is_presence_writer:
  323. return self._presence_handler.update_external_syncs_clear(cmd.instance_id)
  324. else:
  325. return None
  326. def on_FEDERATION_ACK(
  327. self, conn: IReplicationConnection, cmd: FederationAckCommand
  328. ):
  329. federation_ack_counter.inc()
  330. if self._federation_sender:
  331. self._federation_sender.federation_ack(cmd.instance_name, cmd.token)
  332. def on_USER_IP(
  333. self, conn: IReplicationConnection, cmd: UserIpCommand
  334. ) -> Optional[Awaitable[None]]:
  335. user_ip_cache_counter.inc()
  336. if self._is_master:
  337. return self._handle_user_ip(cmd)
  338. else:
  339. return None
  340. async def _handle_user_ip(self, cmd: UserIpCommand):
  341. await self._store.insert_client_ip(
  342. cmd.user_id,
  343. cmd.access_token,
  344. cmd.ip,
  345. cmd.user_agent,
  346. cmd.device_id,
  347. cmd.last_seen,
  348. )
  349. assert self._server_notices_sender is not None
  350. await self._server_notices_sender.on_user_ip(cmd.user_id)
  351. def on_RDATA(self, conn: IReplicationConnection, cmd: RdataCommand):
  352. if cmd.instance_name == self._instance_name:
  353. # Ignore RDATA that are just our own echoes
  354. return
  355. stream_name = cmd.stream_name
  356. inbound_rdata_count.labels(stream_name).inc()
  357. # We put the received command into a queue here for two reasons:
  358. # 1. so we don't try and concurrently handle multiple rows for the
  359. # same stream, and
  360. # 2. so we don't race with getting a POSITION command and fetching
  361. # missing RDATA.
  362. self._add_command_to_stream_queue(conn, cmd)
  363. async def _process_rdata(
  364. self, stream_name: str, conn: IReplicationConnection, cmd: RdataCommand
  365. ) -> None:
  366. """Process an RDATA command
  367. Called after the command has been popped off the queue of inbound commands
  368. """
  369. try:
  370. row = STREAMS_MAP[stream_name].parse_row(cmd.row)
  371. except Exception as e:
  372. raise Exception(
  373. "Failed to parse RDATA: %r %r" % (stream_name, cmd.row)
  374. ) from e
  375. # make sure that we've processed a POSITION for this stream *on this
  376. # connection*. (A POSITION on another connection is no good, as there
  377. # is no guarantee that we have seen all the intermediate updates.)
  378. sbc = self._streams_by_connection.get(conn)
  379. if not sbc or stream_name not in sbc:
  380. # Let's drop the row for now, on the assumption we'll receive a
  381. # `POSITION` soon and we'll catch up correctly then.
  382. logger.debug(
  383. "Discarding RDATA for unconnected stream %s -> %s",
  384. stream_name,
  385. cmd.token,
  386. )
  387. return
  388. if cmd.token is None:
  389. # I.e. this is part of a batch of updates for this stream (in
  390. # which case batch until we get an update for the stream with a non
  391. # None token).
  392. self._pending_batches.setdefault(stream_name, []).append(row)
  393. return
  394. # Check if this is the last of a batch of updates
  395. rows = self._pending_batches.pop(stream_name, [])
  396. rows.append(row)
  397. stream = self._streams[stream_name]
  398. # Find where we previously streamed up to.
  399. current_token = stream.current_token(cmd.instance_name)
  400. # Discard this data if this token is earlier than the current
  401. # position. Note that streams can be reset (in which case you
  402. # expect an earlier token), but that must be preceded by a
  403. # POSITION command.
  404. if cmd.token <= current_token:
  405. logger.debug(
  406. "Discarding RDATA from stream %s at position %s before previous position %s",
  407. stream_name,
  408. cmd.token,
  409. current_token,
  410. )
  411. else:
  412. await self.on_rdata(stream_name, cmd.instance_name, cmd.token, rows)
  413. async def on_rdata(
  414. self, stream_name: str, instance_name: str, token: int, rows: list
  415. ):
  416. """Called to handle a batch of replication data with a given stream token.
  417. Args:
  418. stream_name: name of the replication stream for this batch of rows
  419. instance_name: the instance that wrote the rows.
  420. token: stream token for this batch of rows
  421. rows: a list of Stream.ROW_TYPE objects as returned by
  422. Stream.parse_row.
  423. """
  424. logger.debug("Received rdata %s (%s) -> %s", stream_name, instance_name, token)
  425. await self._replication_data_handler.on_rdata(
  426. stream_name, instance_name, token, rows
  427. )
  428. def on_POSITION(self, conn: IReplicationConnection, cmd: PositionCommand):
  429. if cmd.instance_name == self._instance_name:
  430. # Ignore POSITION that are just our own echoes
  431. return
  432. logger.info("Handling '%s %s'", cmd.NAME, cmd.to_line())
  433. self._add_command_to_stream_queue(conn, cmd)
  434. async def _process_position(
  435. self, stream_name: str, conn: IReplicationConnection, cmd: PositionCommand
  436. ) -> None:
  437. """Process a POSITION command
  438. Called after the command has been popped off the queue of inbound commands
  439. """
  440. stream = self._streams[stream_name]
  441. # We're about to go and catch up with the stream, so remove from set
  442. # of connected streams.
  443. for streams in self._streams_by_connection.values():
  444. streams.discard(stream_name)
  445. # We clear the pending batches for the stream as the fetching of the
  446. # missing updates below will fetch all rows in the batch.
  447. self._pending_batches.pop(stream_name, [])
  448. # Find where we previously streamed up to.
  449. current_token = stream.current_token(cmd.instance_name)
  450. # If the position token matches our current token then we're up to
  451. # date and there's nothing to do. Otherwise, fetch all updates
  452. # between then and now.
  453. missing_updates = cmd.prev_token != current_token
  454. while missing_updates:
  455. logger.info(
  456. "Fetching replication rows for '%s' between %i and %i",
  457. stream_name,
  458. current_token,
  459. cmd.new_token,
  460. )
  461. (updates, current_token, missing_updates) = await stream.get_updates_since(
  462. cmd.instance_name, current_token, cmd.new_token
  463. )
  464. # TODO: add some tests for this
  465. # Some streams return multiple rows with the same stream IDs,
  466. # which need to be processed in batches.
  467. for token, rows in _batch_updates(updates):
  468. await self.on_rdata(
  469. stream_name,
  470. cmd.instance_name,
  471. token,
  472. [stream.parse_row(row) for row in rows],
  473. )
  474. logger.info("Caught up with stream '%s' to %i", stream_name, cmd.new_token)
  475. # We've now caught up to position sent to us, notify handler.
  476. await self._replication_data_handler.on_position(
  477. cmd.stream_name, cmd.instance_name, cmd.new_token
  478. )
  479. self._streams_by_connection.setdefault(conn, set()).add(stream_name)
  480. def on_REMOTE_SERVER_UP(
  481. self, conn: IReplicationConnection, cmd: RemoteServerUpCommand
  482. ):
  483. """Called when get a new REMOTE_SERVER_UP command."""
  484. self._replication_data_handler.on_remote_server_up(cmd.data)
  485. self._notifier.notify_remote_server_up(cmd.data)
  486. # We relay to all other connections to ensure every instance gets the
  487. # notification.
  488. #
  489. # When configured to use redis we'll always only have one connection and
  490. # so this is a no-op (all instances will have already received the same
  491. # REMOTE_SERVER_UP command).
  492. #
  493. # For direct TCP connections this will relay to all other connections
  494. # connected to us. When on master this will correctly fan out to all
  495. # other direct TCP clients and on workers there'll only be the one
  496. # connection to master.
  497. #
  498. # (The logic here should also be sound if we have a mix of Redis and
  499. # direct TCP connections so long as there is only one traffic route
  500. # between two instances, but that is not currently supported).
  501. self.send_command(cmd, ignore_conn=conn)
  502. def new_connection(self, connection: IReplicationConnection):
  503. """Called when we have a new connection."""
  504. self._connections.append(connection)
  505. # If we are connected to replication as a client (rather than a server)
  506. # we need to reset the reconnection delay on the client factory (which
  507. # is used to do exponential back off when the connection drops).
  508. #
  509. # Ideally we would reset the delay when we've "fully established" the
  510. # connection (for some definition thereof) to stop us from tightlooping
  511. # on reconnection if something fails after this point and we drop the
  512. # connection. Unfortunately, we don't really have a better definition of
  513. # "fully established" than the connection being established.
  514. if self._factory:
  515. self._factory.resetDelay()
  516. # Tell the other end if we have any users currently syncing.
  517. currently_syncing = (
  518. self._presence_handler.get_currently_syncing_users_for_replication()
  519. )
  520. now = self._clock.time_msec()
  521. for user_id in currently_syncing:
  522. connection.send_command(
  523. UserSyncCommand(self._instance_id, user_id, True, now)
  524. )
  525. def lost_connection(self, connection: IReplicationConnection):
  526. """Called when a connection is closed/lost."""
  527. # we no longer need _streams_by_connection for this connection.
  528. streams = self._streams_by_connection.pop(connection, None)
  529. if streams:
  530. logger.info(
  531. "Lost replication connection; streams now disconnected: %s", streams
  532. )
  533. try:
  534. self._connections.remove(connection)
  535. except ValueError:
  536. pass
  537. def connected(self) -> bool:
  538. """Do we have any replication connections open?
  539. Is used by e.g. `ReplicationStreamer` to no-op if nothing is connected.
  540. """
  541. return bool(self._connections)
  542. def send_command(
  543. self, cmd: Command, ignore_conn: Optional[IReplicationConnection] = None
  544. ):
  545. """Send a command to all connected connections.
  546. Args:
  547. cmd
  548. ignore_conn: If set don't send command to the given connection.
  549. Used when relaying commands from one connection to all others.
  550. """
  551. if self._connections:
  552. for connection in self._connections:
  553. if connection == ignore_conn:
  554. continue
  555. try:
  556. connection.send_command(cmd)
  557. except Exception:
  558. # We probably want to catch some types of exceptions here
  559. # and log them as warnings (e.g. connection gone), but I
  560. # can't find what those exception types they would be.
  561. logger.exception(
  562. "Failed to write command %s to connection %s",
  563. cmd.NAME,
  564. connection,
  565. )
  566. else:
  567. logger.warning("Dropping command as not connected: %r", cmd.NAME)
  568. def send_federation_ack(self, token: int):
  569. """Ack data for the federation stream. This allows the master to drop
  570. data stored purely in memory.
  571. """
  572. self.send_command(FederationAckCommand(self._instance_name, token))
  573. def send_user_sync(
  574. self, instance_id: str, user_id: str, is_syncing: bool, last_sync_ms: int
  575. ):
  576. """Poke the master that a user has started/stopped syncing."""
  577. self.send_command(
  578. UserSyncCommand(instance_id, user_id, is_syncing, last_sync_ms)
  579. )
  580. def send_user_ip(
  581. self,
  582. user_id: str,
  583. access_token: str,
  584. ip: str,
  585. user_agent: str,
  586. device_id: str,
  587. last_seen: int,
  588. ):
  589. """Tell the master that the user made a request."""
  590. cmd = UserIpCommand(user_id, access_token, ip, user_agent, device_id, last_seen)
  591. self.send_command(cmd)
  592. def send_remote_server_up(self, server: str):
  593. self.send_command(RemoteServerUpCommand(server))
  594. def stream_update(self, stream_name: str, token: str, data: Any):
  595. """Called when a new update is available to stream to clients.
  596. We need to check if the client is interested in the stream or not
  597. """
  598. self.send_command(RdataCommand(stream_name, self._instance_name, token, data))
  599. UpdateToken = TypeVar("UpdateToken")
  600. UpdateRow = TypeVar("UpdateRow")
  601. def _batch_updates(
  602. updates: Iterable[Tuple[UpdateToken, UpdateRow]]
  603. ) -> Iterator[Tuple[UpdateToken, List[UpdateRow]]]:
  604. """Collect stream updates with the same token together
  605. Given a series of updates returned by Stream.get_updates_since(), collects
  606. the updates which share the same stream_id together.
  607. For example:
  608. [(1, a), (1, b), (2, c), (3, d), (3, e)]
  609. becomes:
  610. [
  611. (1, [a, b]),
  612. (2, [c]),
  613. (3, [d, e]),
  614. ]
  615. """
  616. update_iter = iter(updates)
  617. first_update = next(update_iter, None)
  618. if first_update is None:
  619. # empty input
  620. return
  621. current_batch_token = first_update[0]
  622. current_batch = [first_update[1]]
  623. for token, row in update_iter:
  624. if token != current_batch_token:
  625. # different token to the previous row: flush the previous
  626. # batch and start anew
  627. yield current_batch_token, current_batch
  628. current_batch_token = token
  629. current_batch = []
  630. current_batch.append(row)
  631. # flush the final batch
  632. yield current_batch_token, current_batch