# -*- coding: utf-8 -*-
# Copyright 2017 Vector Creations Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
- """This module contains the implementation of both the client and server
- protocols.
- The basic structure of the protocol is line based, where the initial word of
- each line specifies the command. The rest of the line is parsed based on the
- command. For example, the `RDATA` command is defined as::
- RDATA <stream_name> <token> <row_json>
- (Note that `<row_json>` may contains spaces, but cannot contain newlines.)
- Blank lines are ignored.
- # Example
- An example iteraction is shown below. Each line is prefixed with '>' or '<' to
- indicate which side is sending, these are *not* included on the wire::
- * connection established *
- > SERVER localhost:8823
- > PING 1490197665618
- < NAME synapse.app.appservice
- < PING 1490197665618
- < REPLICATE events 1
- < REPLICATE backfill 1
- < REPLICATE caches 1
- > POSITION events 1
- > POSITION backfill 1
- > POSITION caches 1
- > RDATA caches 2 ["get_user_by_id",["@01register-user:localhost:8823"],1490197670513]
- > RDATA events 14 ["$149019767112vOHxz:localhost:8823",
- "!AFDCvgApUmpdfVjIXm:localhost:8823","m.room.guest_access","",null]
- < PING 1490197675618
- > ERROR server stopping
- * connection closed by server *
- """

import fcntl
import logging
import struct
from collections import defaultdict

from six import iteritems, iterkeys

from prometheus_client import Counter

from twisted.internet import defer
from twisted.protocols.basic import LineOnlyReceiver
from twisted.python.failure import Failure

from synapse.metrics import LaterGauge
from synapse.util.stringutils import random_string

from .commands import (
    COMMAND_MAP,
    VALID_CLIENT_COMMANDS,
    VALID_SERVER_COMMANDS,
    ErrorCommand,
    NameCommand,
    PingCommand,
    PositionCommand,
    RdataCommand,
    ReplicateCommand,
    ServerCommand,
    SyncCommand,
    UserSyncCommand,
)
from .streams import STREAMS_MAP

connection_close_counter = Counter(
    "synapse_replication_tcp_protocol_close_reason", "", ["reason_type"],
)

# A list of all connected protocols. This allows us to send metrics about the
# connections.
connected_connections = []

logger = logging.getLogger(__name__)

PING_TIME = 5000
PING_TIMEOUT_MULTIPLIER = 5
PING_TIMEOUT_MS = PING_TIME * PING_TIMEOUT_MULTIPLIER
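# With the values above, a connection that has gone 5000 * 5 = 25,000 ms (25s)
# without receiving any command is considered timed out and is closed.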


class ConnectionStates(object):
    CONNECTING = "connecting"
    ESTABLISHED = "established"
    PAUSED = "paused"
    CLOSED = "closed"


class BaseReplicationStreamProtocol(LineOnlyReceiver):
    """Base replication protocol shared between client and server.

    Reads lines (ignoring blank ones) and parses them into command classes,
    asserting that they are valid for the given direction, i.e. server commands
    are only sent by the server.

    On receiving a new command it calls `on_<COMMAND_NAME>` with the parsed
    command.

    It also sends `PING` periodically, and correctly times out remote
    connections (if they send a `PING` command).
    """

    delimiter = b'\n'

    VALID_INBOUND_COMMANDS = []  # Valid commands we expect to receive
    VALID_OUTBOUND_COMMANDS = []  # Valid commands we can send

    max_line_buffer = 10000

    def __init__(self, clock):
        self.clock = clock

        self.last_received_command = self.clock.time_msec()
        self.last_sent_command = 0
        self.time_we_closed = None  # When we requested the connection be closed
        self.received_ping = False  # Have we received a ping from the other side

        self.state = ConnectionStates.CONNECTING

        self.name = "anon"  # The name sent by a client.
        self.conn_id = random_string(5)  # To dedupe in case of name clashes.

        # List of pending commands to send once we've established the connection
        self.pending_commands = []

        # The LoopingCall for sending pings.
        self._send_ping_loop = None

        self.inbound_commands_counter = defaultdict(int)
        self.outbound_commands_counter = defaultdict(int)

    def connectionMade(self):
        logger.info("[%s] Connection established", self.id())

        self.state = ConnectionStates.ESTABLISHED

        connected_connections.append(self)  # Register connection for metrics

        self.transport.registerProducer(self, True)  # For the *Producing callbacks

        self._send_pending_commands()

        # Starts sending pings (PING_TIME is in milliseconds)
        self._send_ping_loop = self.clock.looping_call(self.send_ping, PING_TIME)

        # Always send the initial PING so that the other side knows that they
        # can time us out.
        self.send_command(PingCommand(self.clock.time_msec()))

    def send_ping(self):
        """Periodically sends a ping and checks if we should close the
        connection due to the other side timing out.
        """
        now = self.clock.time_msec()

        if self.time_we_closed:
            if now - self.time_we_closed > PING_TIMEOUT_MS:
                logger.info(
                    "[%s] Failed to close connection gracefully, aborting", self.id()
                )
                self.transport.abortConnection()
        else:
            if now - self.last_sent_command >= PING_TIME:
                self.send_command(PingCommand(now))

            if self.received_ping and now - self.last_received_command > PING_TIMEOUT_MS:
                logger.info(
                    "[%s] Connection hasn't received command in %r ms. Closing.",
                    self.id(), now - self.last_received_command
                )
                self.send_error("ping timeout")

    def lineReceived(self, line):
        """Called when we've received a line.
        """
        # `line` is still raw bytes at this point, so compare against bytes.
        if line.strip() == b"":
            # Ignore blank lines
            return

        line = line.decode("utf-8")
        cmd_name, rest_of_line = line.split(" ", 1)

        if cmd_name not in self.VALID_INBOUND_COMMANDS:
            logger.error("[%s] invalid command %s", self.id(), cmd_name)
            self.send_error("invalid command: %s", cmd_name)
            return

        self.last_received_command = self.clock.time_msec()

        self.inbound_commands_counter[cmd_name] = (
            self.inbound_commands_counter[cmd_name] + 1
        )

        cmd_cls = COMMAND_MAP[cmd_name]
        try:
            cmd = cmd_cls.from_line(rest_of_line)
        except Exception as e:
            logger.exception(
                "[%s] failed to parse line %r: %r", self.id(), cmd_name, rest_of_line
            )
            self.send_error(
                "failed to parse line for %r: %r (%r):" % (cmd_name, e, rest_of_line)
            )
            return

        # Now let's try and call the on_<CMD_NAME> function
        try:
            getattr(self, "on_%s" % (cmd_name,))(cmd)
        except Exception:
            logger.exception("[%s] Failed to handle line: %r", self.id(), line)

    def close(self):
        logger.warning("[%s] Closing connection", self.id())
        self.time_we_closed = self.clock.time_msec()
        self.transport.loseConnection()
        self.on_connection_closed()

    def send_error(self, error_string, *args):
        """Send an error to remote and close the connection.
        """
        self.send_command(ErrorCommand(error_string % args))
        self.close()

    def send_command(self, cmd, do_buffer=True):
        """Send a command if connection has been established.

        Args:
            cmd (Command)
            do_buffer (bool): Whether to buffer the message or always attempt
                to send the command. This is mostly used to send an error
                message if we're about to close the connection due to our
                buffers becoming full.
        """
        if self.state == ConnectionStates.CLOSED:
            logger.debug("[%s] Not sending, connection closed", self.id())
            return

        if do_buffer and self.state != ConnectionStates.ESTABLISHED:
            self._queue_command(cmd)
            return

        self.outbound_commands_counter[cmd.NAME] = (
            self.outbound_commands_counter[cmd.NAME] + 1
        )

        string = "%s %s" % (cmd.NAME, cmd.to_line())
        if "\n" in string:
            raise Exception("Unexpected newline in command: %r" % (string,))

        self.sendLine(string.encode("utf-8"))

        self.last_sent_command = self.clock.time_msec()

    def _queue_command(self, cmd):
        """Queue the command until the connection is ready to write to again.
        """
        logger.debug("[%s] Queuing as conn %r, cmd: %r", self.id(), self.state, cmd)
        self.pending_commands.append(cmd)

        if len(self.pending_commands) > self.max_line_buffer:
            # The other side is failing to keep up and our buffers are becoming
            # full, so let's close the connection.
            # XXX: should we squawk more loudly?
            logger.error("[%s] Remote failed to keep up", self.id())
            self.send_command(ErrorCommand("Failed to keep up"), do_buffer=False)
            self.close()

    def _send_pending_commands(self):
        """Send any queued commands.
        """
        pending = self.pending_commands
        self.pending_commands = []
        for cmd in pending:
            self.send_command(cmd)

    def on_PING(self, line):
        self.received_ping = True

    def on_ERROR(self, cmd):
        logger.error("[%s] Remote reported error: %r", self.id(), cmd.data)

    def pauseProducing(self):
        """This is called when both the kernel send buffer and the twisted
        tcp connection send buffers have become full.

        We don't actually have any control over those sizes, so we buffer some
        commands ourselves before knifing the connection due to the remote
        failing to keep up.
        """
        logger.info("[%s] Pause producing", self.id())
        self.state = ConnectionStates.PAUSED

    def resumeProducing(self):
        """The remote has caught up after we started buffering!
        """
        logger.info("[%s] Resume producing", self.id())
        self.state = ConnectionStates.ESTABLISHED
        self._send_pending_commands()

    def stopProducing(self):
        """We're never going to send any more data (normally because either
        we or the remote has closed the connection).
        """
        logger.info("[%s] Stop producing", self.id())
        self.on_connection_closed()

    def connectionLost(self, reason):
        logger.info("[%s] Replication connection closed: %r", self.id(), reason)
        if isinstance(reason, Failure):
            connection_close_counter.labels(reason.type.__name__).inc()
        else:
            connection_close_counter.labels(reason.__class__.__name__).inc()

        try:
            # Remove us from list of connections to be monitored
            connected_connections.remove(self)
        except ValueError:
            pass

        # Stop the looping call sending pings.
        if self._send_ping_loop and self._send_ping_loop.running:
            self._send_ping_loop.stop()

        self.on_connection_closed()

    def on_connection_closed(self):
        logger.info("[%s] Connection was closed", self.id())

        self.state = ConnectionStates.CLOSED
        self.pending_commands = []

        if self.transport:
            self.transport.unregisterProducer()

    def __str__(self):
        return "ReplicationConnection<name=%s,conn_id=%s,addr=%s>" % (
            self.name, self.conn_id, self.addr,
        )

    def id(self):
        return "%s-%s" % (self.name, self.conn_id)


class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
    VALID_INBOUND_COMMANDS = VALID_CLIENT_COMMANDS
    VALID_OUTBOUND_COMMANDS = VALID_SERVER_COMMANDS

    def __init__(self, server_name, clock, streamer, addr):
        BaseReplicationStreamProtocol.__init__(self, clock)  # Old style class

        self.server_name = server_name
        self.streamer = streamer
        self.addr = addr

        # The streams the client has subscribed to and is up to date with
        self.replication_streams = set()

        # The streams the client is currently subscribing to.
        self.connecting_streams = set()

        # Map from stream name to list of updates to send once we've finished
        # subscribing the client to the stream.
        self.pending_rdata = {}

    def connectionMade(self):
        self.send_command(ServerCommand(self.server_name))
        BaseReplicationStreamProtocol.connectionMade(self)
        self.streamer.new_connection(self)

    def on_NAME(self, cmd):
        logger.info("[%s] Renamed to %r", self.id(), cmd.data)
        self.name = cmd.data

    def on_USER_SYNC(self, cmd):
        self.streamer.on_user_sync(
            self.conn_id, cmd.user_id, cmd.is_syncing, cmd.last_sync_ms,
        )

    def on_REPLICATE(self, cmd):
        stream_name = cmd.stream_name
        token = cmd.token

        if stream_name == "ALL":
            # Subscribe to all streams we're publishing to.
            for stream in iterkeys(self.streamer.streams_by_name):
                self.subscribe_to_stream(stream, token)
        else:
            self.subscribe_to_stream(stream_name, token)

    def on_FEDERATION_ACK(self, cmd):
        self.streamer.federation_ack(cmd.token)

    def on_REMOVE_PUSHER(self, cmd):
        self.streamer.on_remove_pusher(cmd.app_id, cmd.push_key, cmd.user_id)

    def on_INVALIDATE_CACHE(self, cmd):
        self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys)

    def on_USER_IP(self, cmd):
        self.streamer.on_user_ip(
            cmd.user_id, cmd.access_token, cmd.ip, cmd.user_agent, cmd.device_id,
            cmd.last_seen,
        )

    @defer.inlineCallbacks
    def subscribe_to_stream(self, stream_name, token):
        """Subscribe the remote to a stream.

        This involves checking if they've missed anything and sending those
        updates down if they have. During that time new updates for the stream
        are queued and sent once we've sent down any missed updates.
        """
        self.replication_streams.discard(stream_name)
        self.connecting_streams.add(stream_name)

        try:
            # Get missing updates
            updates, current_token = yield self.streamer.get_stream_updates(
                stream_name, token,
            )

            # Send all the missing updates
            for update in updates:
                token, row = update[0], update[1]
                self.send_command(RdataCommand(stream_name, token, row))

            # We send a POSITION command to ensure that they have an up to
            # date token (especially useful if we didn't send any updates
            # above)
            self.send_command(PositionCommand(stream_name, current_token))

            # Now we can send any updates that came in while we were subscribing
            pending_rdata = self.pending_rdata.pop(stream_name, [])
            for token, update in pending_rdata:
                # Only send updates newer than the current token
                if token > current_token:
                    self.send_command(RdataCommand(stream_name, token, update))

            # They're now fully subscribed
            self.replication_streams.add(stream_name)
        except Exception as e:
            logger.exception("[%s] Failed to handle REPLICATE command", self.id())
            self.send_error("failed to handle replicate: %r", e)
        finally:
            self.connecting_streams.discard(stream_name)
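
    # To illustrate the race handled above: if a new update for "events"
    # arrives while the catch-up query is still running, `stream_update`
    # (below) sees "events" in `connecting_streams` and parks the update in
    # `pending_rdata`; once the missed updates and the POSITION command have
    # been sent, only the parked updates newer than `current_token` are
    # flushed.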

    def stream_update(self, stream_name, token, data):
        """Called when a new update is available to stream to clients.

        We need to check if the client is interested in the stream or not.
        """
        if stream_name in self.replication_streams:
            # The client is subscribed to the stream
            self.send_command(RdataCommand(stream_name, token, data))
        elif stream_name in self.connecting_streams:
            # The client is being subscribed to the stream
            logger.debug("[%s] Queuing RDATA %r %r", self.id(), stream_name, token)
            self.pending_rdata.setdefault(stream_name, []).append((token, data))
        else:
            # The client isn't subscribed
            logger.debug("[%s] Dropping RDATA %r %r", self.id(), stream_name, token)

    def send_sync(self, data):
        self.send_command(SyncCommand(data))

    def on_connection_closed(self):
        BaseReplicationStreamProtocol.on_connection_closed(self)
        self.streamer.lost_connection(self)


class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
    VALID_INBOUND_COMMANDS = VALID_SERVER_COMMANDS
    VALID_OUTBOUND_COMMANDS = VALID_CLIENT_COMMANDS

    def __init__(self, client_name, server_name, clock, handler):
        BaseReplicationStreamProtocol.__init__(self, clock)

        self.client_name = client_name
        self.server_name = server_name
        self.handler = handler

        # Map of stream to batched updates. See RdataCommand for info on how
        # batching works.
        self.pending_batches = {}

    def connectionMade(self):
        self.send_command(NameCommand(self.client_name))
        BaseReplicationStreamProtocol.connectionMade(self)

        # Once we've connected subscribe to the necessary streams
        for stream_name, token in iteritems(self.handler.get_streams_to_replicate()):
            self.replicate(stream_name, token)

        # Tell the server if we have any users currently syncing (should only
        # happen on synchrotrons)
        currently_syncing = self.handler.get_currently_syncing_users()
        now = self.clock.time_msec()
        for user_id in currently_syncing:
            self.send_command(UserSyncCommand(user_id, True, now))

        # We've now finished connecting, so inform the client handler
        self.handler.update_connection(self)

    def on_SERVER(self, cmd):
        if cmd.data != self.server_name:
            logger.error("[%s] Connected to wrong remote: %r", self.id(), cmd.data)
            self.send_error("Wrong remote")

    def on_RDATA(self, cmd):
        stream_name = cmd.stream_name
        inbound_rdata_count.labels(stream_name).inc()

        try:
            row = STREAMS_MAP[stream_name].ROW_TYPE(*cmd.row)
        except Exception:
            logger.exception(
                "[%s] Failed to parse RDATA: %r %r",
                self.id(), stream_name, cmd.row
            )
            raise

        if cmd.token is None:
            # I.e. this is part of a batch of updates for this stream. Batch
            # until we get an update for the stream with a non None token
            self.pending_batches.setdefault(stream_name, []).append(row)
        else:
            # Check if this is the last of a batch of updates
            rows = self.pending_batches.pop(stream_name, [])
            rows.append(row)

            self.handler.on_rdata(stream_name, cmd.token, rows)
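
    # For example, a three-row update for "events" at token 14 arrives as two
    # RDATA commands with no token (each row is parked in `pending_batches`)
    # followed by one carrying token 14, at which point all three rows are
    # handed to the handler together. See RdataCommand for the exact wire
    # encoding of a "no token" RDATA.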

    def on_POSITION(self, cmd):
        self.handler.on_position(cmd.stream_name, cmd.token)

    def on_SYNC(self, cmd):
        self.handler.on_sync(cmd.data)

    def replicate(self, stream_name, token):
        """Send the subscription request to the server.
        """
        if stream_name not in STREAMS_MAP:
            raise Exception("Invalid stream name %r" % (stream_name,))

        logger.info(
            "[%s] Subscribing to replication stream: %r from %r",
            self.id(), stream_name, token
        )

        self.send_command(ReplicateCommand(stream_name, token))

    def on_connection_closed(self):
        BaseReplicationStreamProtocol.on_connection_closed(self)
        self.handler.update_connection(None)
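

# A minimal sketch of how the client protocol is typically wired into a
# Twisted factory; illustrative only, and the `client_name`, `server_name`,
# `clock` and `handler` arguments are assumed to satisfy the interfaces used
# above:
#
#   from twisted.internet.protocol import ClientFactory
#
#   class ExampleReplicationClientFactory(ClientFactory):
#       def __init__(self, client_name, server_name, clock, handler):
#           self.client_name = client_name
#           self.server_name = server_name
#           self.clock = clock
#           self.handler = handler
#
#       def buildProtocol(self, addr):
#           return ClientReplicationStreamProtocol(
#               self.client_name, self.server_name, self.clock, self.handler,
#           )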


# The following simply registers metrics for the replication connections

pending_commands = LaterGauge(
    "synapse_replication_tcp_protocol_pending_commands", "", ["name", "conn_id"],
    lambda: {
        (p.name, p.conn_id): len(p.pending_commands)
        for p in connected_connections
    },
)


def transport_buffer_size(protocol):
    # Number of bytes queued in the twisted transport's own send buffer.
    if protocol.transport:
        size = len(protocol.transport.dataBuffer) + protocol.transport._tempDataLen
        return size
    return 0


transport_send_buffer = LaterGauge(
    "synapse_replication_tcp_transport_send_buffer", "", ["name", "conn_id"],
    lambda: {
        (p.name, p.conn_id): transport_buffer_size(p)
        for p in connected_connections
    },
)


def transport_kernel_read_buffer_size(protocol, read=True):
    # Linux-only ioctls that report how many bytes are sitting in the
    # kernel's receive (SIOCINQ) and send (SIOCOUTQ) queues for a socket.
    SIOCINQ = 0x541B
    SIOCOUTQ = 0x5411

    if protocol.transport:
        fileno = protocol.transport.getHandle().fileno()
        if read:
            op = SIOCINQ
        else:
            op = SIOCOUTQ
        size = struct.unpack("I", fcntl.ioctl(fileno, op, b"\0\0\0\0"))[0]
        return size
    return 0


tcp_transport_kernel_send_buffer = LaterGauge(
    "synapse_replication_tcp_transport_kernel_send_buffer", "", ["name", "conn_id"],
    lambda: {
        (p.name, p.conn_id): transport_kernel_read_buffer_size(p, False)
        for p in connected_connections
    },
)


tcp_transport_kernel_read_buffer = LaterGauge(
    "synapse_replication_tcp_transport_kernel_read_buffer", "", ["name", "conn_id"],
    lambda: {
        (p.name, p.conn_id): transport_kernel_read_buffer_size(p, True)
        for p in connected_connections
    },
)


tcp_inbound_commands = LaterGauge(
    "synapse_replication_tcp_inbound_commands", "", ["command", "name", "conn_id"],
    lambda: {
        # The counter dicts are keyed by the command-name string, which maps
        # directly onto the "command" label.
        (k, p.name, p.conn_id): count
        for p in connected_connections
        for k, count in iteritems(p.inbound_commands_counter)
    },
)


tcp_outbound_commands = LaterGauge(
    "synapse_replication_tcp_outbound_commands", "", ["command", "name", "conn_id"],
    lambda: {
        (k, p.name, p.conn_id): count
        for p in connected_connections
        for k, count in iteritems(p.outbound_commands_counter)
    },
)


# number of updates received for each RDATA stream
inbound_rdata_count = Counter(
    "synapse_replication_tcp_inbound_rdata_count", "", ["stream_name"],
)