123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391 |
- # -*- coding: utf-8 -*-
- # Copyright 2014-2016 OpenMarket Ltd
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- """ This is an example of using the server to server implementation to do a
- basic chat style thing. It accepts commands from stdin and outputs to stdout.
- It assumes that ucids are of the form <user>@<domain>, and uses <domain> as
- the address of the remote home server to hit.
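- Usage:
- python test_messaging.py <user>
- where <user> is the local user's ucid, e.g. alice@localhost:8080.
- Currently assumes the local address is localhost:<port>; <port> is read from
- the <domain> part of <user> and defaults to 12345 when it has none.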
- """
- from synapse.federation import ReplicationHandler
- from synapse.federation.units import Pdu
- from synapse.util import origin_from_ucid
- from synapse.app.homeserver import SynapseHomeServer
- # from synapse.logging.utils import log_function
- from twisted.internet import reactor, defer
- from twisted.python import log
- import argparse
- import json
- import logging
- import os
- import re
- import cursesio
- import curses.wrapper
# Module-level logger for this example; its records propagate to the root
# logger, which main() wires up with the file and on-screen handlers.
logger = logging.getLogger("example")
def excpetion_errback(failure):
    """Log a failure handed to an errback, including its traceback.

    ``logging.exception`` only attaches a traceback when called from inside
    an ``except`` block. An errback is not, so the exception details are
    taken from the failure object itself and passed on explicitly.

    Args:
        failure: A Twisted ``Failure``-like object exposing ``type``,
            ``value`` and ``getTracebackObject()``.

    Returns:
        None. The failure is logged and not propagated further.
    """
    exc_info = (failure.type, failure.value, failure.getTracebackObject())
    logging.error("%s", failure, exc_info=exc_info)
class InputOutput(object):
    """Basic I/O so that a user can interact with the example app.

    Lines entered by the user are passed to ``on_line``, parsed as commands
    and forwarded to the home server set via ``set_home_server``. Output goes
    back through the ``screen`` object.
    """

    # (compiled pattern, handler method name) pairs, tried in order by
    # ``on_line``. Raw strings are used so that ``\S`` is a regex escape and
    # not an invalid string escape sequence.
    _COMMANDS = (
        (re.compile(r"^join (\S+)$"), "_cmd_join"),
        (re.compile(r"^invite (\S+) (\S+)$"), "_cmd_invite"),
        (re.compile(r"^send (\S+) (.*)$"), "_cmd_send"),
        (re.compile(r"^backfill (\S+)$"), "_cmd_backfill"),
    )

    def __init__(self, screen, user):
        """
        Args:
            screen: Object providing ``print_line(text)`` and
                ``print_log(text)``.
            user: Identifier of the local user issuing the commands.
        """
        self.screen = screen
        self.user = user
        # Populated later by set_home_server(); defined here so the attribute
        # always exists.
        self.server = None

    def set_home_server(self, server):
        """Set the home server object that commands are forwarded to."""
        self.server = server

    def on_line(self, line):
        """Process one line of user input as a command.

        Recognised commands are ``join <room>``, ``invite <room> <user>``,
        ``send <room> <text>`` and ``backfill <room>``. Anything else prints
        "Unrecognized command". Any exception raised while handling the line
        is logged rather than propagated.
        """
        try:
            for pattern, handler_name in self._COMMANDS:
                match = pattern.match(line)
                if match:
                    getattr(self, handler_name)(*match.groups())
                    return

            self.print_line("Unrecognized command")
        except Exception as e:
            logger.exception(e)

    def _cmd_join(self, room_name):
        # The local user wants to join a room.
        self.print_line("%s joining %s" % (self.user, room_name))
        self.server.join_room(room_name, self.user, self.user)

    def _cmd_invite(self, room_name, invitee):
        # The local user wants to invite someone to a room.
        self.print_line("%s invited to %s" % (invitee, room_name))
        self.server.invite_to_room(room_name, self.user, invitee)

    def _cmd_send(self, room_name, body):
        # The local user wants to message a room.
        self.print_line("%s send to %s" % (self.user, room_name))
        self.server.send_message(room_name, self.user, body)

    def _cmd_backfill(self, room_name):
        # The local user wants to backfill a room.
        self.print_line("backfill %s" % room_name)
        self.server.backfill(room_name)

    def print_line(self, text):
        """Write ``text`` to the screen via ``screen.print_line``."""
        self.screen.print_line(text)

    def print_log(self, text):
        """Write ``text`` to the screen via ``screen.print_log``."""
        self.screen.print_log(text)
class IOLoggerHandler(logging.Handler):
    """Logging handler that forwards warnings and above to ``io.print_log``.

    Records below WARNING level are dropped.
    """

    def __init__(self, io):
        super(IOLoggerHandler, self).__init__()
        self.io = io

    def emit(self, record):
        """Format ``record`` and print it, unless it is below WARNING."""
        if record.levelno >= logging.WARNING:
            self.io.print_log(self.format(record))
class Room(object):
    """In-memory membership state for a single room.

    Tracks who has joined, who has been invited, and which home servers
    PDUs associated with the room should be sent to.
    """

    def __init__(self, room_name):
        self.room_name = room_name

        # Users that have been invited but have not yet joined.
        self.invited = set()
        # Users that have joined.
        self.participants = set()
        # Origins (as returned by origin_from_ucid) of invited/joined users.
        self.servers = set()

        # Origin of the first participant added; None until someone joins.
        self.oldest_server = None
        self.have_got_metadata = False

    def add_participant(self, participant):
        """Record that ``participant`` has joined the room.

        Any pending invite for them is cleared and their origin server is
        added to the room's server set.
        """
        self.participants.add(participant)
        self.invited.discard(participant)

        origin = origin_from_ucid(participant)
        self.servers.add(origin)

        # Only the first joiner's origin is remembered as the oldest server.
        self.oldest_server = self.oldest_server or origin

    def add_invited(self, invitee):
        """Record that ``invitee`` has been invited to the room."""
        self.invited.add(invitee)

        origin = origin_from_ucid(invitee)
        self.servers.add(origin)
class HomeServer(ReplicationHandler):
    """A very basic home server implementation that allows people to join a
    room and then invite other people.

    Room state is kept in memory in ``joined_rooms`` (room name -> ``Room``).
    Incoming PDUs arrive through ``on_receive_pdu``; outgoing ones are sent
    through the replication layer.
    """

    def __init__(self, server_name, replication_layer, output):
        """
        Args:
            server_name: Name of this home server, used as the origin of
                every PDU it sends.
            replication_layer: Object used to send PDUs, fetch room state
                and backfill. This instance registers itself as its handler.
            output: Object providing ``print_line(text)`` for user-visible
                output.
        """
        self.server_name = server_name
        self.replication_layer = replication_layer
        self.replication_layer.set_handler(self)

        self.joined_rooms = {}

        self.output = output

    def on_receive_pdu(self, pdu):
        """Handle a received PDU by dispatching on its type.

        Messages and join/invite membership changes are handled; PDUs of any
        other type are printed as unrecognised.
        """
        pdu_type = pdu.pdu_type

        if pdu_type == "sy.room.message":
            self._on_message(pdu)
        elif pdu_type == "sy.room.member" and "membership" in pdu.content:
            if pdu.content["membership"] == "join":
                self._on_join(pdu.context, pdu.state_key)
            elif pdu.content["membership"] == "invite":
                self._on_invite(pdu.origin, pdu.context, pdu.state_key)
        else:
            self.output.print_line(
                "#%s (unrec) %s = %s"
                % (pdu.context, pdu.pdu_type, json.dumps(pdu.content))
            )

    def _on_message(self, pdu):
        """Print a received room message."""
        self.output.print_line(
            "#%s %s %s" % (pdu.context, pdu.content["sender"], pdu.content["body"])
        )

    def _on_join(self, context, joinee):
        """Record and announce that a user (remote or local) joined a room."""
        room = self._get_or_create_room(context)
        room.add_participant(joinee)

        self.output.print_line("#%s %s %s" % (context, joinee, "*** JOINED"))

    def _on_invite(self, origin, context, invitee):
        """Record and announce that a user has been invited to a room.

        The first time an invite for a room arrives from another server, the
        room state is requested from that server.
        """
        room = self._get_or_create_room(context)
        room.add_invited(invitee)

        self.output.print_line("#%s %s %s" % (context, invitee, "*** INVITED"))

        # Compare by value: two equal server-name strings are not guaranteed
        # to be the same object, so an identity check is unreliable here.
        if not room.have_got_metadata and origin != self.server_name:
            logger.debug("Get room state")
            self.replication_layer.get_state_for_context(origin, context)
            room.have_got_metadata = True

    @defer.inlineCallbacks
    def send_message(self, room_name, sender, body):
        """Send a message PDU to every server known for the room.

        Errors raised while sending are logged, not propagated.
        """
        destinations = yield self.get_servers_for_context(room_name)

        try:
            yield self.replication_layer.send_pdu(
                Pdu.create_new(
                    context=room_name,
                    pdu_type="sy.room.message",
                    content={"sender": sender, "body": body},
                    origin=self.server_name,
                    destinations=destinations,
                )
            )
        except Exception as e:
            logger.exception(e)

    @defer.inlineCallbacks
    def join_room(self, room_name, sender, joinee):
        """Join ``joinee`` to a room locally, then announce it to the room.

        ``sender`` is accepted for interface compatibility but is not used.
        Errors raised while sending are logged, not propagated.
        """
        self._on_join(room_name, joinee)

        destinations = yield self.get_servers_for_context(room_name)

        try:
            pdu = Pdu.create_new(
                context=room_name,
                pdu_type="sy.room.member",
                is_state=True,
                state_key=joinee,
                content={"membership": "join"},
                origin=self.server_name,
                destinations=destinations,
            )
            yield self.replication_layer.send_pdu(pdu)
        except Exception as e:
            logger.exception(e)

    @defer.inlineCallbacks
    def invite_to_room(self, room_name, sender, invitee):
        """Record an invite locally, then announce it to the room.

        ``sender`` is accepted for interface compatibility but is not used.
        Errors raised while sending are logged, not propagated.
        """
        self._on_invite(self.server_name, room_name, invitee)

        destinations = yield self.get_servers_for_context(room_name)

        try:
            yield self.replication_layer.send_pdu(
                Pdu.create_new(
                    context=room_name,
                    is_state=True,
                    pdu_type="sy.room.member",
                    state_key=invitee,
                    content={"membership": "invite"},
                    origin=self.server_name,
                    destinations=destinations,
                )
            )
        except Exception as e:
            logger.exception(e)

    def backfill(self, room_name, limit=5):
        """Ask the room's oldest known server for up to ``limit`` older PDUs.

        Returns:
            Whatever the replication layer's ``backfill`` returns, or None if
            the room is not known locally.
        """
        room = self.joined_rooms.get(room_name)

        if not room:
            return

        dest = room.oldest_server

        return self.replication_layer.backfill(dest, room_name, limit)

    def _get_room_remote_servers(self, room_name):
        """Return a list of the servers known for the room.

        The room is created if it does not exist yet, so the result is an
        empty list for an unknown room.
        """
        return list(self._get_or_create_room(room_name).servers)

    def _get_or_create_room(self, room_name):
        """Return the ``Room`` for ``room_name``, creating it on first use."""
        room = self.joined_rooms.get(room_name)
        if room is None:
            # Only build a Room when one is actually missing.
            room = self.joined_rooms[room_name] = Room(room_name)
        return room

    def get_servers_for_context(self, context):
        """Return an already-fired Deferred with the room's server set."""
        return defer.succeed(self._get_or_create_room(context).servers)
def main(stdscr):
    """Run the example chat client/home server until the reactor stops.

    Parses the command line, sets up logging to ``logs/<user>`` and to the
    screen, wires the curses I/O to a ``HomeServer`` and starts listening.

    Args:
        stdscr: The curses window supplied by ``curses.wrapper``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("user", type=str)
    # NOTE: the verbosity count is parsed but not currently used.
    parser.add_argument("-v", "--verbose", action="count")
    args = parser.parse_args()

    user = args.user
    server_name = origin_from_ucid(user)

    ## Set up logging ##

    root_logger = logging.getLogger()

    formatter = logging.Formatter(
        "%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(message)s"
    )
    if not os.path.exists("logs"):
        os.makedirs("logs")
    fh = logging.FileHandler("logs/%s" % user)
    fh.setFormatter(formatter)

    root_logger.addHandler(fh)
    root_logger.setLevel(logging.DEBUG)

    # Hack: The only way to get it to stop logging to sys.stderr :(
    log.theLogPublisher.observers = []
    observer = log.PythonLoggingObserver()
    observer.start()

    ## Set up synapse server ##

    curses_stdio = cursesio.CursesStdIO(stdscr)
    input_output = InputOutput(curses_stdio, user)

    curses_stdio.set_callback(input_output)

    app_hs = SynapseHomeServer(server_name, db_name="dbs/%s" % user)
    replication = app_hs.get_replication_layer()

    hs = HomeServer(server_name, replication, curses_stdio)

    input_output.set_home_server(hs)

    ## Add input_output logger ##

    io_logger = IOLoggerHandler(input_output)
    io_logger.setFormatter(formatter)
    root_logger.addHandler(io_logger)

    ## Start! ##

    # Use the port from "<host>:<port>" if the server name has one. Only the
    # two parse failures are caught, so KeyboardInterrupt and SystemExit are
    # no longer swallowed.
    try:
        port = int(server_name.split(":")[1])
    except (IndexError, ValueError):
        port = 12345

    app_hs.get_http_server().start_listening(port)

    reactor.addReader(curses_stdio)

    reactor.run()
if __name__ == "__main__":
    # curses.wrapper initialises the terminal, calls main(stdscr) and
    # restores the terminal state when main returns or raises.
    curses.wrapper(main)
|