123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386 |
- # -*- coding: utf-8 -*-
- # Copyright 2017 Vector Creations Ltd
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- """Defines the various valid commands
- The VALID_SERVER_COMMANDS and VALID_CLIENT_COMMANDS define which commands are
- allowed to be sent by which side.
- """
- import logging
- import simplejson
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Shared encoder used by the commands below to serialise their JSON payloads.
# `namedtuple_as_object=False` asks simplejson to encode namedtuples as JSON
# arrays rather than as JSON objects.
_json_encoder = simplejson.JSONEncoder(namedtuple_as_object=False)
class Command(object):
    """Base class for every replication command.

    Subclasses must override NAME with the command's name as it appears on
    the wire. A complete wire line is built as `NAME + " " + to_line()`.

    This default implementation models commands of the form `<NAME> <data>`,
    where `<data>` is stored and emitted verbatim.
    """

    NAME = None

    def __init__(self, data):
        self.data = data

    @classmethod
    def from_line(cls, line):
        """Build a command from the text that followed the command name on
        the wire. `line` must not include the command name itself.
        """
        command = cls(line)
        return command

    def to_line(self):
        """Return the wire payload for this command, without the command
        name prefix.
        """
        payload = self.data
        return payload
class ServerCommand(Command):
    """Carries the server_name; sent by the server when a new connection is
    made.

    Format::

        SERVER <server_name>
    """

    NAME = "SERVER"
class RdataCommand(Command):
    """Sent by the server when a subscribed stream has an update.

    Format::

        RDATA <stream_name> <token> <row_json>

    `<token>` is either a numeric stream id or the literal "batch". "batch"
    allows several rows to share one stream id: every row is sent as its own
    RDATA, all but the last with a token of "batch" and the last with the
    final stream id.

    The client should buffer incoming "batch" RDATA (per stream_name) until it
    receives an RDATA carrying a numeric stream id.

    A `<token>` of "batch" is represented by the instance attribute `token`
    being None.

    An example of a batched series of RDATA::

        RDATA presence batch ["@foo:example.com", "online", ...]
        RDATA presence batch ["@bar:example.com", "online", ...]
        RDATA presence 59 ["@baz:example.com", "online", ...]
    """

    NAME = "RDATA"

    def __init__(self, stream_name, token, row):
        self.stream_name = stream_name
        self.token = token
        self.row = row

    @classmethod
    def from_line(cls, line):
        """Parse `<stream_name> <token> <row_json>` into an RdataCommand."""
        # Split at most twice: the JSON row may itself contain spaces.
        stream_name, raw_token, row_json = line.split(" ", 2)

        if raw_token == "batch":
            token = None
        else:
            token = int(raw_token)

        row = simplejson.loads(row_json)
        return cls(stream_name, token, row)

    def to_line(self):
        """Serialise to `<stream_name> <token> <row_json>`."""
        if self.token is None:
            token_text = "batch"
        else:
            token_text = str(self.token)

        fields = (
            self.stream_name,
            token_text,
            _json_encoder.encode(self.row),
        )
        return " ".join(fields)
class PositionCommand(Command):
    """Sent by the server to tell the client the stream position without
    needing to send an RDATA.

    Format::

        POSITION <stream_name> <token>

    Where <token> is an integer.
    """

    NAME = "POSITION"

    def __init__(self, stream_name, token):
        self.stream_name = stream_name
        self.token = token

    @classmethod
    def from_line(cls, line):
        """Parse `<stream_name> <token>`; raises ValueError if the line does
        not have two space-separated fields or the token is not an integer.
        """
        stream_name, token = line.split(" ", 1)
        return cls(stream_name, int(token))

    def to_line(self):
        """Serialise to `<stream_name> <token>`."""
        return " ".join((self.stream_name, str(self.token),))
class ErrorCommand(Command):
    """Sent by either side to report that an error occurred. The data is a
    string describing the error.
    """

    NAME = "ERROR"
class PingCommand(Command):
    """Keep-alive sent by either side. The data is arbitrary (often a
    timestamp).
    """

    NAME = "PING"
class NameCommand(Command):
    """Sent by the client to tell the server the client's identity. The data
    is the name.
    """

    NAME = "NAME"
class ReplicateCommand(Command):
    """Sent by the client to subscribe to a stream.

    Format::

        REPLICATE <stream_name> <token>

    Where <token> is one of:
        * a numeric stream_id to stream updates from
        * "NOW" to stream all subsequent updates.

    <stream_name> may be "ALL" to subscribe to every known stream, in which
    case <token> must be "NOW", i.e.::

        REPLICATE ALL NOW
    """

    NAME = "REPLICATE"

    def __init__(self, stream_name, token):
        self.stream_name = stream_name
        self.token = token

    @classmethod
    def from_line(cls, line):
        """Parse `<stream_name> <token>` into a ReplicateCommand."""
        stream_name, raw_token = line.split(" ", 1)

        # Both "NOW" and "now" are accepted; the stored form is always "NOW".
        # Anything else has to be an integer stream id.
        token = "NOW" if raw_token in ("NOW", "now") else int(raw_token)

        return cls(stream_name, token)

    def to_line(self):
        """Serialise to `<stream_name> <token>`."""
        fields = (self.stream_name, str(self.token))
        return " ".join(fields)
class UserSyncCommand(Command):
    """Sent by the client to inform the server that a user has started or
    stopped syncing. Used to calculate presence on the master.

    Includes a timestamp of when the last user sync was.

    Format::

        USER_SYNC <user_id> <state> <last_sync_ms>

    Where <state> is either "start" or "end"
    """

    NAME = "USER_SYNC"

    def __init__(self, user_id, is_syncing, last_sync_ms):
        self.user_id = user_id
        self.is_syncing = is_syncing
        self.last_sync_ms = last_sync_ms

    @classmethod
    def from_line(cls, line):
        """Parse `<user_id> <state> <last_sync_ms>`.

        Raises an Exception if <state> is neither "start" nor "end".
        """
        user_id, state, last_sync_ms = line.split(" ", 2)

        if state not in ("start", "end"):
            raise Exception("Invalid USER_SYNC state %r" % (state,))

        # The wire state is stored as a boolean: True for "start".
        return cls(user_id, state == "start", int(last_sync_ms))

    def to_line(self):
        """Serialise to `<user_id> <state> <last_sync_ms>`."""
        return " ".join((
            self.user_id, "start" if self.is_syncing else "end", str(self.last_sync_ms),
        ))
class FederationAckCommand(Command):
    """Sent by the client once it has processed the federation stream up to a
    given point, which lets the master drop its in-memory caches of the
    federation stream.

    This must only be sent from one worker (i.e. the one sending federation)

    Format::

        FEDERATION_ACK <token>
    """

    NAME = "FEDERATION_ACK"

    def __init__(self, token):
        self.token = token

    @classmethod
    def from_line(cls, line):
        """Parse the integer token that forms the whole payload."""
        token = int(line)
        return cls(token)

    def to_line(self):
        """Serialise the token as its string form."""
        token_text = str(self.token)
        return token_text
class SyncCommand(Command):
    """Used for testing: the client protocol implementation allows waiting
    on a SYNC command carrying a specified data value.
    """

    NAME = "SYNC"
class RemovePusherCommand(Command):
    """Sent by the client to ask the master to remove the given pusher.

    Format::

        REMOVE_PUSHER <app_id> <push_key> <user_id>
    """

    NAME = "REMOVE_PUSHER"

    def __init__(self, app_id, push_key, user_id):
        self.app_id = app_id
        self.push_key = push_key
        self.user_id = user_id

    @classmethod
    def from_line(cls, line):
        """Parse `<app_id> <push_key> <user_id>` into a RemovePusherCommand."""
        fields = line.split(" ", 2)
        app_id, push_key, user_id = fields
        return cls(app_id, push_key, user_id)

    def to_line(self):
        """Serialise to `<app_id> <push_key> <user_id>`."""
        fields = (self.app_id, self.push_key, self.user_id)
        return " ".join(fields)
class InvalidateCacheCommand(Command):
    """Sent by the client to invalidate an upstream cache.

    THIS IS NOT RELIABLE, AND SHOULD *NOT* BE USED EXCEPT FOR THINGS THAT ARE
    NOT DISASTROUS IF WE DROP ON THE FLOOR.

    Mainly used to invalidate destination retry timing caches.

    Format::

        INVALIDATE_CACHE <cache_func> <keys_json>

    Where <keys_json> is a json list.
    """

    NAME = "INVALIDATE_CACHE"

    def __init__(self, cache_func, keys):
        self.cache_func = cache_func
        self.keys = keys

    @classmethod
    def from_line(cls, line):
        """Parse `<cache_func> <keys_json>` into an InvalidateCacheCommand."""
        # Split once only: the JSON keys may contain spaces.
        cache_func, keys_json = line.split(" ", 1)
        keys = simplejson.loads(keys_json)
        return cls(cache_func, keys)

    def to_line(self):
        """Serialise to `<cache_func> <keys_json>`."""
        encoded_keys = _json_encoder.encode(self.keys)
        return " ".join((self.cache_func, encoded_keys))
class UserIpCommand(Command):
    """Sent periodically when a worker sees activity from a client.

    Format::

        USER_IP <user_id> <fields_json>

    Where <fields_json> is a json list of the form::

        [<access_token>, <ip>, <user_agent>, <device_id>, <last_seen>]
    """

    NAME = "USER_IP"

    def __init__(self, user_id, access_token, ip, user_agent, device_id, last_seen):
        self.user_id = user_id
        self.access_token = access_token
        self.ip = ip
        self.user_agent = user_agent
        self.device_id = device_id
        self.last_seen = last_seen

    @classmethod
    def from_line(cls, line):
        """Parse `<user_id> <fields_json>` into a UserIpCommand.

        The JSON list must contain exactly five elements, in the order given
        in the class docstring.
        """
        # Split once only: everything after the user_id is the JSON list.
        user_id, jsn = line.split(" ", 1)

        access_token, ip, user_agent, device_id, last_seen = simplejson.loads(jsn)

        return cls(
            user_id, access_token, ip, user_agent, device_id, last_seen
        )

    def to_line(self):
        """Serialise to `<user_id> <fields_json>`."""
        return self.user_id + " " + _json_encoder.encode((
            self.access_token, self.ip, self.user_agent, self.device_id,
            self.last_seen,
        ))
# Lookup table from a command's wire name to the class implementing it.
COMMAND_MAP = dict(
    (command_cls.NAME, command_cls)
    for command_cls in (
        ServerCommand,
        RdataCommand,
        PositionCommand,
        ErrorCommand,
        PingCommand,
        NameCommand,
        ReplicateCommand,
        UserSyncCommand,
        FederationAckCommand,
        SyncCommand,
        RemovePusherCommand,
        InvalidateCacheCommand,
        UserIpCommand,
    )
)
# Names of the commands that the server side is permitted to send.
VALID_SERVER_COMMANDS = tuple(
    command_cls.NAME
    for command_cls in (
        ServerCommand,
        RdataCommand,
        PositionCommand,
        ErrorCommand,
        PingCommand,
        SyncCommand,
    )
)
# Names of the commands that the client side is permitted to send.
VALID_CLIENT_COMMANDS = tuple(
    command_cls.NAME
    for command_cls in (
        NameCommand,
        ReplicateCommand,
        PingCommand,
        UserSyncCommand,
        FederationAckCommand,
        RemovePusherCommand,
        InvalidateCacheCommand,
        UserIpCommand,
        ErrorCommand,
    )
)
|