commands.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386
# -*- coding: utf-8 -*-
# Copyright 2017 Vector Creations Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Defines the various valid commands

The VALID_SERVER_COMMANDS and VALID_CLIENT_COMMANDS define which commands are
allowed to be sent by which side.
"""
  19. import logging
  20. import simplejson
  21. logger = logging.getLogger(__name__)
  22. _json_encoder = simplejson.JSONEncoder(namedtuple_as_object=False)
  23. class Command(object):
  24. """The base command class.
  25. All subclasses must set the NAME variable which equates to the name of the
  26. command on the wire.
  27. A full command line on the wire is constructed from `NAME + " " + to_line()`
  28. The default implementation creates a command of form `<NAME> <data>`
  29. """
  30. NAME = None
  31. def __init__(self, data):
  32. self.data = data
  33. @classmethod
  34. def from_line(cls, line):
  35. """Deserialises a line from the wire into this command. `line` does not
  36. include the command.
  37. """
  38. return cls(line)
  39. def to_line(self):
  40. """Serialises the comamnd for the wire. Does not include the command
  41. prefix.
  42. """
  43. return self.data
  44. class ServerCommand(Command):
  45. """Sent by the server on new connection and includes the server_name.
  46. Format::
  47. SERVER <server_name>
  48. """
  49. NAME = "SERVER"
  50. class RdataCommand(Command):
  51. """Sent by server when a subscribed stream has an update.
  52. Format::
  53. RDATA <stream_name> <token> <row_json>
  54. The `<token>` may either be a numeric stream id OR "batch". The latter case
  55. is used to support sending multiple updates with the same stream ID. This
  56. is done by sending an RDATA for each row, with all but the last RDATA having
  57. a token of "batch" and the last having the final stream ID.
  58. The client should batch all incoming RDATA with a token of "batch" (per
  59. stream_name) until it sees an RDATA with a numeric stream ID.
  60. `<token>` of "batch" maps to the instance variable `token` being None.
  61. An example of a batched series of RDATA::
  62. RDATA presence batch ["@foo:example.com", "online", ...]
  63. RDATA presence batch ["@bar:example.com", "online", ...]
  64. RDATA presence 59 ["@baz:example.com", "online", ...]
  65. """
  66. NAME = "RDATA"
  67. def __init__(self, stream_name, token, row):
  68. self.stream_name = stream_name
  69. self.token = token
  70. self.row = row
  71. @classmethod
  72. def from_line(cls, line):
  73. stream_name, token, row_json = line.split(" ", 2)
  74. return cls(
  75. stream_name,
  76. None if token == "batch" else int(token),
  77. simplejson.loads(row_json)
  78. )
  79. def to_line(self):
  80. return " ".join((
  81. self.stream_name,
  82. str(self.token) if self.token is not None else "batch",
  83. _json_encoder.encode(self.row),
  84. ))
  85. class PositionCommand(Command):
  86. """Sent by the client to tell the client the stream postition without
  87. needing to send an RDATA.
  88. """
  89. NAME = "POSITION"
  90. def __init__(self, stream_name, token):
  91. self.stream_name = stream_name
  92. self.token = token
  93. @classmethod
  94. def from_line(cls, line):
  95. stream_name, token = line.split(" ", 1)
  96. return cls(stream_name, int(token))
  97. def to_line(self):
  98. return " ".join((self.stream_name, str(self.token),))
  99. class ErrorCommand(Command):
  100. """Sent by either side if there was an ERROR. The data is a string describing
  101. the error.
  102. """
  103. NAME = "ERROR"
  104. class PingCommand(Command):
  105. """Sent by either side as a keep alive. The data is arbitary (often timestamp)
  106. """
  107. NAME = "PING"
  108. class NameCommand(Command):
  109. """Sent by client to inform the server of the client's identity. The data
  110. is the name
  111. """
  112. NAME = "NAME"
  113. class ReplicateCommand(Command):
  114. """Sent by the client to subscribe to the stream.
  115. Format::
  116. REPLICATE <stream_name> <token>
  117. Where <token> may be either:
  118. * a numeric stream_id to stream updates from
  119. * "NOW" to stream all subsequent updates.
  120. The <stream_name> can be "ALL" to subscribe to all known streams, in which
  121. case the <token> must be set to "NOW", i.e.::
  122. REPLICATE ALL NOW
  123. """
  124. NAME = "REPLICATE"
  125. def __init__(self, stream_name, token):
  126. self.stream_name = stream_name
  127. self.token = token
  128. @classmethod
  129. def from_line(cls, line):
  130. stream_name, token = line.split(" ", 1)
  131. if token in ("NOW", "now"):
  132. token = "NOW"
  133. else:
  134. token = int(token)
  135. return cls(stream_name, token)
  136. def to_line(self):
  137. return " ".join((self.stream_name, str(self.token),))
  138. class UserSyncCommand(Command):
  139. """Sent by the client to inform the server that a user has started or
  140. stopped syncing. Used to calculate presence on the master.
  141. Includes a timestamp of when the last user sync was.
  142. Format::
  143. USER_SYNC <user_id> <state> <last_sync_ms>
  144. Where <state> is either "start" or "stop"
  145. """
  146. NAME = "USER_SYNC"
  147. def __init__(self, user_id, is_syncing, last_sync_ms):
  148. self.user_id = user_id
  149. self.is_syncing = is_syncing
  150. self.last_sync_ms = last_sync_ms
  151. @classmethod
  152. def from_line(cls, line):
  153. user_id, state, last_sync_ms = line.split(" ", 2)
  154. if state not in ("start", "end"):
  155. raise Exception("Invalid USER_SYNC state %r" % (state,))
  156. return cls(user_id, state == "start", int(last_sync_ms))
  157. def to_line(self):
  158. return " ".join((
  159. self.user_id, "start" if self.is_syncing else "end", str(self.last_sync_ms),
  160. ))
  161. class FederationAckCommand(Command):
  162. """Sent by the client when it has processed up to a given point in the
  163. federation stream. This allows the master to drop in-memory caches of the
  164. federation stream.
  165. This must only be sent from one worker (i.e. the one sending federation)
  166. Format::
  167. FEDERATION_ACK <token>
  168. """
  169. NAME = "FEDERATION_ACK"
  170. def __init__(self, token):
  171. self.token = token
  172. @classmethod
  173. def from_line(cls, line):
  174. return cls(int(line))
  175. def to_line(self):
  176. return str(self.token)
  177. class SyncCommand(Command):
  178. """Used for testing. The client protocol implementation allows waiting
  179. on a SYNC command with a specified data.
  180. """
  181. NAME = "SYNC"
  182. class RemovePusherCommand(Command):
  183. """Sent by the client to request the master remove the given pusher.
  184. Format::
  185. REMOVE_PUSHER <app_id> <push_key> <user_id>
  186. """
  187. NAME = "REMOVE_PUSHER"
  188. def __init__(self, app_id, push_key, user_id):
  189. self.user_id = user_id
  190. self.app_id = app_id
  191. self.push_key = push_key
  192. @classmethod
  193. def from_line(cls, line):
  194. app_id, push_key, user_id = line.split(" ", 2)
  195. return cls(app_id, push_key, user_id)
  196. def to_line(self):
  197. return " ".join((self.app_id, self.push_key, self.user_id))
  198. class InvalidateCacheCommand(Command):
  199. """Sent by the client to invalidate an upstream cache.
  200. THIS IS NOT RELIABLE, AND SHOULD *NOT* BE USED ACCEPT FOR THINGS THAT ARE
  201. NOT DISASTROUS IF WE DROP ON THE FLOOR.
  202. Mainly used to invalidate destination retry timing caches.
  203. Format::
  204. INVALIDATE_CACHE <cache_func> <keys_json>
  205. Where <keys_json> is a json list.
  206. """
  207. NAME = "INVALIDATE_CACHE"
  208. def __init__(self, cache_func, keys):
  209. self.cache_func = cache_func
  210. self.keys = keys
  211. @classmethod
  212. def from_line(cls, line):
  213. cache_func, keys_json = line.split(" ", 1)
  214. return cls(cache_func, simplejson.loads(keys_json))
  215. def to_line(self):
  216. return " ".join((
  217. self.cache_func, _json_encoder.encode(self.keys),
  218. ))
  219. class UserIpCommand(Command):
  220. """Sent periodically when a worker sees activity from a client.
  221. Format::
  222. USER_IP <user_id>, <access_token>, <ip>, <device_id>, <last_seen>, <user_agent>
  223. """
  224. NAME = "USER_IP"
  225. def __init__(self, user_id, access_token, ip, user_agent, device_id, last_seen):
  226. self.user_id = user_id
  227. self.access_token = access_token
  228. self.ip = ip
  229. self.user_agent = user_agent
  230. self.device_id = device_id
  231. self.last_seen = last_seen
  232. @classmethod
  233. def from_line(cls, line):
  234. user_id, jsn = line.split(" ", 1)
  235. access_token, ip, user_agent, device_id, last_seen = simplejson.loads(jsn)
  236. return cls(
  237. user_id, access_token, ip, user_agent, device_id, last_seen
  238. )
  239. def to_line(self):
  240. return self.user_id + " " + _json_encoder.encode((
  241. self.access_token, self.ip, self.user_agent, self.device_id,
  242. self.last_seen,
  243. ))
  244. # Map of command name to command type.
  245. COMMAND_MAP = {
  246. cmd.NAME: cmd
  247. for cmd in (
  248. ServerCommand,
  249. RdataCommand,
  250. PositionCommand,
  251. ErrorCommand,
  252. PingCommand,
  253. NameCommand,
  254. ReplicateCommand,
  255. UserSyncCommand,
  256. FederationAckCommand,
  257. SyncCommand,
  258. RemovePusherCommand,
  259. InvalidateCacheCommand,
  260. UserIpCommand,
  261. )
  262. }
  263. # The commands the server is allowed to send
  264. VALID_SERVER_COMMANDS = (
  265. ServerCommand.NAME,
  266. RdataCommand.NAME,
  267. PositionCommand.NAME,
  268. ErrorCommand.NAME,
  269. PingCommand.NAME,
  270. SyncCommand.NAME,
  271. )
  272. # The commands the client is allowed to send
  273. VALID_CLIENT_COMMANDS = (
  274. NameCommand.NAME,
  275. ReplicateCommand.NAME,
  276. PingCommand.NAME,
  277. UserSyncCommand.NAME,
  278. FederationAckCommand.NAME,
  279. RemovePusherCommand.NAME,
  280. InvalidateCacheCommand.NAME,
  281. UserIpCommand.NAME,
  282. ErrorCommand.NAME,
  283. )