Răsfoiți Sursa

Merge branch 'develop' of github.com:matrix-org/synapse into erikj/groups_merged

Erik Johnston 6 ani în urmă
părinte
comite
b5feaa5a49

+ 47 - 0
.github/ISSUE_TEMPLATE.md

@@ -0,0 +1,47 @@
+<!-- 
+
+**IF YOU HAVE SUPPORT QUESTIONS ABOUT RUNNING OR CONFIGURING YOUR OWN HOME SERVER**: 
+You will likely get better support more quickly if you ask in ** #matrix:matrix.org ** ;)
+
+
+This is a bug report template. By following the instructions below and
+filling out the sections with your information, you will help the us to get all
+the necessary data to fix your issue.
+
+You can also preview your report before submitting it. You may remove sections
+that aren't relevant to your particular case.
+
+Text between <!-- and --​> marks will be invisible in the report.
+
+-->
+
+### Description
+
+Describe here the problem that you are experiencing, or the feature you are requesting.
+
+### Steps to reproduce
+
+- For bugs, list the steps
+- that reproduce the bug
+- using hyphens as bullet points
+
+Describe how what happens differs from what you expected.
+
+If you can identify any relevant log snippets from _homeserver.log_, please include
+those here (please be careful to remove any personal or private data):
+
+### Version information
+
+<!-- IMPORTANT: please answer the following questions, to help us narrow down the problem -->
+
+- **Homeserver**: Was this issue identified on matrix.org or another homeserver?
+
+If not matrix.org:
+- **Version**:        What version of Synapse is running? <!-- 
+You can find the Synapse version by inspecting the server headers (replace matrix.org with
+your own homeserver domain):
+$ curl -v https://matrix.org/_matrix/client/versions 2>&1 | grep "Server:"
+-->
+- **Install method**: package manager/git clone/pip      
+- **Platform**:       Tell us about the environment in which your homeserver is operating
+                      - distro, hardware, if it's running in a vm/container, etc.

+ 1 - 0
MANIFEST.in

@@ -27,4 +27,5 @@ exclude jenkins*.sh
 exclude jenkins*
 recursive-exclude jenkins *.sh
 
+prune .github
 prune demo/etc

+ 42 - 33
UPGRADE.rst

@@ -5,39 +5,48 @@ Before upgrading check if any special steps are required to upgrade from the
 what you currently have installed to current version of synapse. The extra
 instructions that may be required are listed later in this document.
 
-If synapse was installed in a virtualenv then active that virtualenv before
-upgrading. If synapse is installed in a virtualenv in ``~/.synapse/`` then run:
+1. If synapse was installed in a virtualenv then active that virtualenv before
+   upgrading. If synapse is installed in a virtualenv in ``~/.synapse/`` then
+   run:
 
-.. code:: bash
+   .. code:: bash
 
-    source ~/.synapse/bin/activate
+       source ~/.synapse/bin/activate
 
-If synapse was installed using pip then upgrade to the latest version by
-running:
+2. If synapse was installed using pip then upgrade to the latest version by
+   running:
 
-.. code:: bash
+   .. code:: bash
 
-    pip install --upgrade --process-dependency-links https://github.com/matrix-org/synapse/tarball/master
+       pip install --upgrade --process-dependency-links https://github.com/matrix-org/synapse/tarball/master
 
-If synapse was installed using git then upgrade to the latest version by
-running:
+       # restart synapse
+       synctl restart
 
-.. code:: bash
 
-    # Pull the latest version of the master branch.
-    git pull
-    # Update the versions of synapse's python dependencies.
-    python synapse/python_dependencies.py | xargs -n1 pip install --upgrade
-	
-To check whether your update was sucessfull, run:
+   If synapse was installed using git then upgrade to the latest version by
+   running:
 
-.. code:: bash
+   .. code:: bash
+
+       # Pull the latest version of the master branch.
+       git pull
+       # Update the versions of synapse's python dependencies.
+       python synapse/python_dependencies.py | xargs pip install --upgrade
 
-	 # replace your.server.domain with ther domain of your synapse homeserver
-	 curl https://<your.server.domain>/_matrix/federation/v1/version 
+       # restart synapse
+       ./synctl restart
 
-So for the Matrix.org HS server the URL would be: https://matrix.org/_matrix/federation/v1/version.
 
+To check whether your update was sucessful, you can check the Server header
+returned by the Client-Server API:
+
+.. code:: bash
+
+    # replace <host.name> with the hostname of your synapse homeserver.
+    # You may need to specify a port (eg, :8448) if your server is not
+    # configured on port 443.
+    curl -kv https://<host.name>/_matrix/client/versions 2>&1 | grep "Server:"
 
 Upgrading to v0.15.0
 ====================
@@ -77,7 +86,7 @@ It has been replaced by specifying a list of application service registrations i
 ``homeserver.yaml``::
 
   app_service_config_files: ["registration-01.yaml", "registration-02.yaml"]
-  
+
 Where ``registration-01.yaml`` looks like::
 
   url: <String>  # e.g. "https://my.application.service.com"
@@ -166,7 +175,7 @@ This release completely changes the database schema and so requires upgrading
 it before starting the new version of the homeserver.
 
 The script "database-prepare-for-0.5.0.sh" should be used to upgrade the
-database. This will save all user information, such as logins and profiles, 
+database. This will save all user information, such as logins and profiles,
 but will otherwise purge the database. This includes messages, which
 rooms the home server was a member of and room alias mappings.
 
@@ -175,18 +184,18 @@ file and ask for help in #matrix:matrix.org. The upgrade process is,
 unfortunately, non trivial and requires human intervention to resolve any
 resulting conflicts during the upgrade process.
 
-Before running the command the homeserver should be first completely 
+Before running the command the homeserver should be first completely
 shutdown. To run it, simply specify the location of the database, e.g.:
 
   ./scripts/database-prepare-for-0.5.0.sh "homeserver.db"
 
-Once this has successfully completed it will be safe to restart the 
-homeserver. You may notice that the homeserver takes a few seconds longer to 
+Once this has successfully completed it will be safe to restart the
+homeserver. You may notice that the homeserver takes a few seconds longer to
 restart than usual as it reinitializes the database.
 
 On startup of the new version, users can either rejoin remote rooms using room
 aliases or by being reinvited. Alternatively, if any other homeserver sends a
-message to a room that the homeserver was previously in the local HS will 
+message to a room that the homeserver was previously in the local HS will
 automatically rejoin the room.
 
 Upgrading to v0.4.0
@@ -245,7 +254,7 @@ automatically generate default config use::
         --config-path homeserver.config \
         --generate-config
 
-This config can be edited if desired, for example to specify a different SSL 
+This config can be edited if desired, for example to specify a different SSL
 certificate to use. Once done you can run the home server using::
 
     $ python synapse/app/homeserver.py --config-path homeserver.config
@@ -266,20 +275,20 @@ This release completely changes the database schema and so requires upgrading
 it before starting the new version of the homeserver.
 
 The script "database-prepare-for-0.0.1.sh" should be used to upgrade the
-database. This will save all user information, such as logins and profiles, 
+database. This will save all user information, such as logins and profiles,
 but will otherwise purge the database. This includes messages, which
 rooms the home server was a member of and room alias mappings.
 
-Before running the command the homeserver should be first completely 
+Before running the command the homeserver should be first completely
 shutdown. To run it, simply specify the location of the database, e.g.:
 
   ./scripts/database-prepare-for-0.0.1.sh "homeserver.db"
 
-Once this has successfully completed it will be safe to restart the 
-homeserver. You may notice that the homeserver takes a few seconds longer to 
+Once this has successfully completed it will be safe to restart the
+homeserver. You may notice that the homeserver takes a few seconds longer to
 restart than usual as it reinitializes the database.
 
 On startup of the new version, users can either rejoin remote rooms using room
 aliases or by being reinvited. Alternatively, if any other homeserver sends a
-message to a room that the homeserver was previously in the local HS will 
+message to a room that the homeserver was previously in the local HS will
 automatically rejoin the room.

+ 20 - 0
contrib/prometheus/README

@@ -0,0 +1,20 @@
+This directory contains some sample monitoring config for using the
+'Prometheus' monitoring server against synapse.
+
+To use it, first install prometheus by following the instructions at
+
+  http://prometheus.io/
+
+Then add a new job to the main prometheus.conf file:
+
+  job: {
+    name: "synapse"
+
+    target_group: {
+      target: "http://SERVER.LOCATION.HERE:PORT/_synapse/metrics"
+    }
+  }
+
+Metrics are disabled by default when running synapse; they must be enabled
+with the 'enable-metrics' option, either in the synapse config file or as a
+command-line option.

+ 395 - 0
contrib/prometheus/consoles/synapse.html

@@ -0,0 +1,395 @@
+{{ template "head" . }}
+
+{{ template "prom_content_head" . }}
+<h1>System Resources</h1>
+
+<h3>CPU</h3>
+<div id="process_resource_utime"></div>
+<script>
+new PromConsole.Graph({
+  node: document.querySelector("#process_resource_utime"),
+  expr: "rate(process_cpu_seconds_total[2m]) * 100",
+  name: "[[job]]",  
+  min: 0,
+  max: 100,
+  renderer: "line",
+  height: 150,
+  yAxisFormatter: PromConsole.NumberFormatter.humanize,
+  yHoverFormatter: PromConsole.NumberFormatter.humanize,
+  yUnits: "%",
+  yTitle: "CPU Usage"
+})
+</script>
+
+<h3>Memory</h3>
+<div id="process_resource_maxrss"></div>
+<script>
+new PromConsole.Graph({
+  node: document.querySelector("#process_resource_maxrss"),
+  expr: "process_psutil_rss:max",
+  name: "Maxrss",
+  min: 0,
+  renderer: "line",
+  height: 150,
+  yAxisFormatter: PromConsole.NumberFormatter.humanizeNoSmallPrefix,
+  yHoverFormatter: PromConsole.NumberFormatter.humanizeNoSmallPrefix,
+  yUnits: "bytes",
+  yTitle: "Usage"
+})
+</script>
+
+<h3>File descriptors</h3>
+<div id="process_fds"></div>
+<script>
+new PromConsole.Graph({
+  node: document.querySelector("#process_fds"),
+  expr: "process_open_fds{job='synapse'}",
+  name: "FDs",
+  min: 0,
+  renderer: "line",
+  height: 150,
+  yAxisFormatter: PromConsole.NumberFormatter.humanize,
+  yHoverFormatter: PromConsole.NumberFormatter.humanize,
+  yUnits: "",
+  yTitle: "Descriptors"
+})
+</script>
+
+<h1>Reactor</h1>
+
+<h3>Total reactor time</h3>
+<div id="reactor_total_time"></div>
+<script>
+new PromConsole.Graph({
+  node: document.querySelector("#reactor_total_time"),
+  expr: "rate(python_twisted_reactor_tick_time:total[2m]) / 1000",
+  name: "time",
+  max: 1,
+  min: 0,
+  renderer: "area",
+  height: 150,
+  yAxisFormatter: PromConsole.NumberFormatter.humanize,
+  yHoverFormatter: PromConsole.NumberFormatter.humanize,
+  yUnits: "s/s",
+  yTitle: "Usage"
+})
+</script>
+
+<h3>Average reactor tick time</h3>
+<div id="reactor_average_time"></div>
+<script>
+new PromConsole.Graph({
+  node: document.querySelector("#reactor_average_time"),
+  expr: "rate(python_twisted_reactor_tick_time:total[2m]) / rate(python_twisted_reactor_tick_time:count[2m]) / 1000",
+  name: "time",
+  min: 0,
+  renderer: "line",
+  height: 150,
+  yAxisFormatter: PromConsole.NumberFormatter.humanize,
+  yHoverFormatter: PromConsole.NumberFormatter.humanize,
+  yUnits: "s",
+  yTitle: "Time"
+})
+</script>
+
+<h3>Pending calls per tick</h3>
+<div id="reactor_pending_calls"></div>
+<script>
+new PromConsole.Graph({
+  node: document.querySelector("#reactor_pending_calls"),
+  expr: "rate(python_twisted_reactor_pending_calls:total[30s])/rate(python_twisted_reactor_pending_calls:count[30s])",
+  name: "calls",
+  min: 0,
+  renderer: "line",
+  height: 150,
+  yAxisFormatter: PromConsole.NumberFormatter.humanize,
+  yHoverFormatter: PromConsole.NumberFormatter.humanize,
+  yTitle: "Pending Cals"
+})
+</script>
+
+<h1>Storage</h1>
+
+<h3>Queries</h3>
+<div id="synapse_storage_query_time"></div>
+<script>
+new PromConsole.Graph({
+  node: document.querySelector("#synapse_storage_query_time"),
+  expr: "rate(synapse_storage_query_time:count[2m])",
+  name: "[[verb]]",
+  yAxisFormatter: PromConsole.NumberFormatter.humanizeNoSmallPrefix,
+  yHoverFormatter: PromConsole.NumberFormatter.humanizeNoSmallPrefix,
+  yUnits: "queries/s",
+  yTitle: "Queries"
+})
+</script>
+
+<h3>Transactions</h3>
+<div id="synapse_storage_transaction_time"></div>
+<script>
+new PromConsole.Graph({
+  node: document.querySelector("#synapse_storage_transaction_time"),
+  expr: "rate(synapse_storage_transaction_time:count[2m])",
+  name: "[[desc]]",
+  min: 0,
+  yAxisFormatter: PromConsole.NumberFormatter.humanizeNoSmallPrefix,
+  yHoverFormatter: PromConsole.NumberFormatter.humanizeNoSmallPrefix,
+  yUnits: "txn/s",
+  yTitle: "Transactions"
+})
+</script>
+
+<h3>Transaction execution time</h3>
+<div id="synapse_storage_transactions_time_msec"></div>
+<script>
+new PromConsole.Graph({
+  node: document.querySelector("#synapse_storage_transactions_time_msec"),
+  expr: "rate(synapse_storage_transaction_time:total[2m]) / 1000",
+  name: "[[desc]]",
+  min: 0,
+  yAxisFormatter: PromConsole.NumberFormatter.humanize,
+  yHoverFormatter: PromConsole.NumberFormatter.humanize,
+  yUnits: "s/s",
+  yTitle: "Usage"
+})
+</script>
+
+<h3>Database scheduling latency</h3>
+<div id="synapse_storage_schedule_time"></div>
+<script>
+new PromConsole.Graph({
+  node: document.querySelector("#synapse_storage_schedule_time"),
+  expr: "rate(synapse_storage_schedule_time:total[2m]) / 1000",
+  name: "Total latency",
+  min: 0,
+  yAxisFormatter: PromConsole.NumberFormatter.humanize,
+  yHoverFormatter: PromConsole.NumberFormatter.humanize,
+  yUnits: "s/s",
+  yTitle: "Usage"
+})
+</script>
+
+<h3>Cache hit ratio</h3>
+<div id="synapse_cache_ratio"></div>
+<script>
+new PromConsole.Graph({
+  node: document.querySelector("#synapse_cache_ratio"),
+  expr: "rate(synapse_util_caches_cache:total[2m]) * 100",
+  name: "[[name]]",
+  min: 0,
+  max: 100,
+  yAxisFormatter: PromConsole.NumberFormatter.humanizeNoSmallPrefix,
+  yHoverFormatter: PromConsole.NumberFormatter.humanizeNoSmallPrefix,
+  yUnits: "%",
+  yTitle: "Percentage"
+})
+</script>
+
+<h3>Cache size</h3>
+<div id="synapse_cache_size"></div>
+<script>
+new PromConsole.Graph({
+  node: document.querySelector("#synapse_cache_size"),
+  expr: "synapse_util_caches_cache:size",
+  name: "[[name]]",
+  yAxisFormatter: PromConsole.NumberFormatter.humanizeNoSmallPrefix,
+  yHoverFormatter: PromConsole.NumberFormatter.humanizeNoSmallPrefix,
+  yUnits: "",
+  yTitle: "Items"
+})
+</script>
+
+<h1>Requests</h1>
+
+<h3>Requests by Servlet</h3>
+<div id="synapse_http_server_requests_servlet"></div>
+<script>
+new PromConsole.Graph({
+  node: document.querySelector("#synapse_http_server_requests_servlet"),
+  expr: "rate(synapse_http_server_requests:servlet[2m])",
+  name: "[[servlet]]",
+  yAxisFormatter: PromConsole.NumberFormatter.humanize,
+  yHoverFormatter: PromConsole.NumberFormatter.humanize,
+  yUnits: "req/s",
+  yTitle: "Requests"
+})
+</script>
+<h4>&nbsp;(without <tt>EventStreamRestServlet</tt> or <tt>SyncRestServlet</tt>)</h4>
+<div id="synapse_http_server_requests_servlet_minus_events"></div>
+<script>
+new PromConsole.Graph({
+  node: document.querySelector("#synapse_http_server_requests_servlet_minus_events"),
+  expr: "rate(synapse_http_server_requests:servlet{servlet!=\"EventStreamRestServlet\", servlet!=\"SyncRestServlet\"}[2m])",
+  name: "[[servlet]]",
+  yAxisFormatter: PromConsole.NumberFormatter.humanize,
+  yHoverFormatter: PromConsole.NumberFormatter.humanize,
+  yUnits: "req/s",
+  yTitle: "Requests"
+})
+</script>
+
+<h3>Average response times</h3>
+<div id="synapse_http_server_response_time_avg"></div>
+<script>
+new PromConsole.Graph({
+  node: document.querySelector("#synapse_http_server_response_time_avg"),
+  expr: "rate(synapse_http_server_response_time:total[2m]) / rate(synapse_http_server_response_time:count[2m]) / 1000",
+  name: "[[servlet]]",
+  yAxisFormatter: PromConsole.NumberFormatter.humanize,
+  yHoverFormatter: PromConsole.NumberFormatter.humanize,
+  yUnits: "s/req",
+  yTitle: "Response time"
+})
+</script>
+
+<h3>All responses by code</h3>
+<div id="synapse_http_server_responses"></div>
+<script>
+new PromConsole.Graph({
+  node: document.querySelector("#synapse_http_server_responses"),
+  expr: "rate(synapse_http_server_responses[2m])",
+  name: "[[method]] / [[code]]",
+  yAxisFormatter: PromConsole.NumberFormatter.humanize,
+  yHoverFormatter: PromConsole.NumberFormatter.humanize,
+  yUnits: "req/s",
+  yTitle: "Requests"
+})
+</script>
+
+<h3>Error responses by code</h3>
+<div id="synapse_http_server_responses_err"></div>
+<script>
+new PromConsole.Graph({
+  node: document.querySelector("#synapse_http_server_responses_err"),
+  expr: "rate(synapse_http_server_responses{code=~\"[45]..\"}[2m])",
+  name: "[[method]] / [[code]]",
+  yAxisFormatter: PromConsole.NumberFormatter.humanize,
+  yHoverFormatter: PromConsole.NumberFormatter.humanize,
+  yUnits: "req/s",
+  yTitle: "Requests"
+})
+</script>
+
+
+<h3>CPU Usage</h3>
+<div id="synapse_http_server_response_ru_utime"></div>
+<script>
+new PromConsole.Graph({
+  node: document.querySelector("#synapse_http_server_response_ru_utime"),
+  expr: "rate(synapse_http_server_response_ru_utime:total[2m])",
+  name: "[[servlet]]",
+  yAxisFormatter: PromConsole.NumberFormatter.humanize,
+  yHoverFormatter: PromConsole.NumberFormatter.humanize,
+  yUnits: "s/s",
+  yTitle: "CPU Usage"
+})
+</script>
+
+
+<h3>DB Usage</h3>
+<div id="synapse_http_server_response_db_txn_duration"></div>
+<script>
+new PromConsole.Graph({
+  node: document.querySelector("#synapse_http_server_response_db_txn_duration"),
+  expr: "rate(synapse_http_server_response_db_txn_duration:total[2m])",
+  name: "[[servlet]]",
+  yAxisFormatter: PromConsole.NumberFormatter.humanize,
+  yHoverFormatter: PromConsole.NumberFormatter.humanize,
+  yUnits: "s/s",
+  yTitle: "DB Usage"
+})
+</script>
+
+
+<h3>Average event send times</h3>
+<div id="synapse_http_server_send_time_avg"></div>
+<script>
+new PromConsole.Graph({
+  node: document.querySelector("#synapse_http_server_send_time_avg"),
+  expr: "rate(synapse_http_server_response_time:total{servlet='RoomSendEventRestServlet'}[2m]) / rate(synapse_http_server_response_time:count{servlet='RoomSendEventRestServlet'}[2m]) / 1000",
+  name: "[[servlet]]",
+  yAxisFormatter: PromConsole.NumberFormatter.humanize,
+  yHoverFormatter: PromConsole.NumberFormatter.humanize,
+  yUnits: "s/req",
+  yTitle: "Response time"
+})
+</script>
+
+<h1>Federation</h1>
+
+<h3>Sent Messages</h3>
+<div id="synapse_federation_client_sent"></div>
+<script>
+new PromConsole.Graph({
+  node: document.querySelector("#synapse_federation_client_sent"),
+  expr: "rate(synapse_federation_client_sent[2m])",
+  name: "[[type]]",
+  yAxisFormatter: PromConsole.NumberFormatter.humanize,
+  yHoverFormatter: PromConsole.NumberFormatter.humanize,
+  yUnits: "req/s",
+  yTitle: "Requests"
+})
+</script>
+
+<h3>Received Messages</h3>
+<div id="synapse_federation_server_received"></div>
+<script>
+new PromConsole.Graph({
+  node: document.querySelector("#synapse_federation_server_received"),
+  expr: "rate(synapse_federation_server_received[2m])",
+  name: "[[type]]",
+  yAxisFormatter: PromConsole.NumberFormatter.humanize,
+  yHoverFormatter: PromConsole.NumberFormatter.humanize,
+  yUnits: "req/s",
+  yTitle: "Requests"
+})
+</script>
+
+<h3>Pending</h3>
+<div id="synapse_federation_transaction_queue_pending"></div>
+<script>
+new PromConsole.Graph({
+  node: document.querySelector("#synapse_federation_transaction_queue_pending"),
+  expr: "synapse_federation_transaction_queue_pending",
+  name: "[[type]]",
+  yAxisFormatter: PromConsole.NumberFormatter.humanizeNoSmallPrefix,
+  yHoverFormatter: PromConsole.NumberFormatter.humanizeNoSmallPrefix,
+  yUnits: "",
+  yTitle: "Units"
+})
+</script>
+
+<h1>Clients</h1>
+
+<h3>Notifiers</h3>
+<div id="synapse_notifier_listeners"></div>
+<script>
+new PromConsole.Graph({
+  node: document.querySelector("#synapse_notifier_listeners"),
+  expr: "synapse_notifier_listeners",
+  name: "listeners",
+  min: 0,
+  yAxisFormatter: PromConsole.NumberFormatter.humanizeNoSmallPrefix,
+  yHoverFormatter: PromConsole.NumberFormatter.humanizeNoSmallPrefix,
+  yUnits: "",
+  yTitle: "Listeners"
+})
+</script>
+
+<h3>Notified Events</h3>
+<div id="synapse_notifier_notified_events"></div>
+<script>
+new PromConsole.Graph({
+  node: document.querySelector("#synapse_notifier_notified_events"),
+  expr: "rate(synapse_notifier_notified_events[2m])",
+  name: "events",
+  yAxisFormatter: PromConsole.NumberFormatter.humanize,
+  yHoverFormatter: PromConsole.NumberFormatter.humanize,
+  yUnits: "events/s",
+  yTitle: "Event rate"
+})
+</script>
+
+{{ template "prom_content_tail" . }}
+
+{{ template "tail" }}

+ 21 - 0
contrib/prometheus/synapse.rules

@@ -0,0 +1,21 @@
+synapse_federation_transaction_queue_pendingEdus:total = sum(synapse_federation_transaction_queue_pendingEdus or absent(synapse_federation_transaction_queue_pendingEdus)*0)
+synapse_federation_transaction_queue_pendingPdus:total = sum(synapse_federation_transaction_queue_pendingPdus or absent(synapse_federation_transaction_queue_pendingPdus)*0)
+
+synapse_http_server_requests:method{servlet=""} = sum(synapse_http_server_requests) by (method)
+synapse_http_server_requests:servlet{method=""} = sum(synapse_http_server_requests) by (servlet)
+
+synapse_http_server_requests:total{servlet=""} = sum(synapse_http_server_requests:by_method) by (servlet)
+
+synapse_cache:hit_ratio_5m = rate(synapse_util_caches_cache:hits[5m]) / rate(synapse_util_caches_cache:total[5m])
+synapse_cache:hit_ratio_30s = rate(synapse_util_caches_cache:hits[30s]) / rate(synapse_util_caches_cache:total[30s])
+
+synapse_federation_client_sent{type="EDU"} = synapse_federation_client_sent_edus + 0
+synapse_federation_client_sent{type="PDU"} = synapse_federation_client_sent_pdu_destinations:count + 0
+synapse_federation_client_sent{type="Query"} = sum(synapse_federation_client_sent_queries) by (job)
+
+synapse_federation_server_received{type="EDU"} = synapse_federation_server_received_edus + 0
+synapse_federation_server_received{type="PDU"} = synapse_federation_server_received_pdus + 0
+synapse_federation_server_received{type="Query"} = sum(synapse_federation_server_received_queries) by (job)
+
+synapse_federation_transaction_queue_pending{type="EDU"} = synapse_federation_transaction_queue_pending_edus + 0
+synapse_federation_transaction_queue_pending{type="PDU"} = synapse_federation_transaction_queue_pending_pdus + 0

+ 2 - 0
docs/postgres.rst

@@ -1,6 +1,8 @@
 Using Postgres
 --------------
 
+Postgres version 9.4 or later is known to work.
+
 Set up database
 ===============
 

+ 1 - 0
jenkins-dendron-haproxy-postgres.sh

@@ -17,6 +17,7 @@ export HAPROXY_BIN=/home/haproxy/haproxy-1.6.11/haproxy
 ./sytest/jenkins/prep_sytest_for_postgres.sh
 
 ./sytest/jenkins/install_and_run.sh \
+    --python $WORKSPACE/.tox/py27/bin/python \
     --synapse-directory $WORKSPACE \
     --dendron $WORKSPACE/dendron/bin/dendron \
     --haproxy \

+ 1 - 0
jenkins-dendron-postgres.sh

@@ -15,5 +15,6 @@ export SYNAPSE_CACHE_FACTOR=1
 ./sytest/jenkins/prep_sytest_for_postgres.sh
 
 ./sytest/jenkins/install_and_run.sh \
+    --python $WORKSPACE/.tox/py27/bin/python \
     --synapse-directory $WORKSPACE \
     --dendron $WORKSPACE/dendron/bin/dendron \

+ 1 - 0
jenkins-postgres.sh

@@ -14,4 +14,5 @@ export SYNAPSE_CACHE_FACTOR=1
 ./sytest/jenkins/prep_sytest_for_postgres.sh
 
 ./sytest/jenkins/install_and_run.sh \
+    --python $WORKSPACE/.tox/py27/bin/python \
     --synapse-directory $WORKSPACE \

+ 1 - 0
jenkins-sqlite.sh

@@ -12,4 +12,5 @@ export SYNAPSE_CACHE_FACTOR=1
 ./jenkins/clone.sh sytest https://github.com/matrix-org/sytest.git
 
 ./sytest/jenkins/install_and_run.sh \
+    --python $WORKSPACE/.tox/py27/bin/python \
     --synapse-directory $WORKSPACE \

+ 79 - 8
scripts-dev/federation_client.py

@@ -1,10 +1,30 @@
+#!/usr/bin/env python
+#
+# Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2017 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function
+
+import argparse
 import nacl.signing
 import json
 import base64
 import requests
 import sys
 import srvlookup
-
+import yaml
 
 def encode_base64(input_bytes):
     """Encode bytes as a base64 string without any padding."""
@@ -120,11 +140,13 @@ def get_json(origin_name, origin_key, destination, path):
             origin_name, key, sig,
         )
         authorization_headers.append(bytes(header))
-        sys.stderr.write(header)
-        sys.stderr.write("\n")
+        print ("Authorization: %s" % header, file=sys.stderr)
+
+    dest = lookup(destination, path)
+    print ("Requesting %s" % dest, file=sys.stderr)
 
     result = requests.get(
-        lookup(destination, path),
+        dest,
         headers={"Authorization": authorization_headers[0]},
         verify=False,
     )
@@ -133,17 +155,66 @@ def get_json(origin_name, origin_key, destination, path):
 
 
 def main():
-    origin_name, keyfile, destination, path = sys.argv[1:]
+    parser = argparse.ArgumentParser(
+        description=
+            "Signs and sends a federation request to a matrix homeserver",
+    )
+
+    parser.add_argument(
+        "-N", "--server-name",
+        help="Name to give as the local homeserver. If unspecified, will be "
+             "read from the config file.",
+    )
+
+    parser.add_argument(
+        "-k", "--signing-key-path",
+        help="Path to the file containing the private ed25519 key to sign the "
+             "request with.",
+    )
+
+    parser.add_argument(
+        "-c", "--config",
+        default="homeserver.yaml",
+        help="Path to server config file. Ignored if --server-name and "
+             "--signing-key-path are both given.",
+    )
 
-    with open(keyfile) as f:
+    parser.add_argument(
+        "-d", "--destination",
+        default="matrix.org",
+        help="name of the remote homeserver. We will do SRV lookups and "
+             "connect appropriately.",
+    )
+
+    parser.add_argument(
+        "path",
+        help="request path. We will add '/_matrix/federation/v1/' to this."
+    )
+
+    args = parser.parse_args()
+
+    if not args.server_name or not args.signing_key_path:
+        read_args_from_config(args)
+
+    with open(args.signing_key_path) as f:
         key = read_signing_keys(f)[0]
 
     result = get_json(
-        origin_name, key, destination, "/_matrix/federation/v1/" + path
+        args.server_name, key, args.destination, "/_matrix/federation/v1/" + args.path
     )
 
     json.dump(result, sys.stdout)
-    print ""
+    print ("")
+
+
+def read_args_from_config(args):
+    with open(args.config, 'r') as fh:
+        config = yaml.safe_load(fh)
+        if not args.server_name:
+            args.server_name = config['server_name']
+        if not args.signing_key_path:
+            args.signing_key_path = config['signing_key_path']
+
 
 if __name__ == "__main__":
     main()

+ 19 - 0
scripts/synapse_port_db

@@ -252,6 +252,25 @@ class Porter(object):
             )
             return
 
+        if table in (
+            "user_directory", "user_directory_search", "users_who_share_rooms",
+            "users_in_pubic_room",
+        ):
+            # We don't port these tables, as they're a faff and we can regenreate
+            # them anyway.
+            self.progress.update(table, table_size)  # Mark table as done
+            return
+
+        if table == "user_directory_stream_pos":
+            # We need to make sure there is a single row, `(X, null), as that is
+            # what synapse expects to be there.
+            yield self.postgres_store._simple_insert(
+                table=table,
+                values={"stream_id": None},
+            )
+            self.progress.update(table, table_size)  # Mark table as done
+            return
+
         forward_select = (
             "SELECT rowid, * FROM %s WHERE rowid >= ? ORDER BY rowid LIMIT ?"
             % (table,)

+ 99 - 0
synapse/app/_base.py

@@ -0,0 +1,99 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import gc
+import logging
+
+import affinity
+from daemonize import Daemonize
+from synapse.util import PreserveLoggingContext
+from synapse.util.rlimit import change_resource_limit
+from twisted.internet import reactor
+
+
+def start_worker_reactor(appname, config):
+    """ Run the reactor in the main process
+
+    Daemonizes if necessary, and then configures some resources, before starting
+    the reactor. Pulls configuration from the 'worker' settings in 'config'.
+
+    Args:
+        appname (str): application name which will be sent to syslog
+        config (synapse.config.Config): config object
+    """
+
+    logger = logging.getLogger(config.worker_app)
+
+    start_reactor(
+        appname,
+        config.soft_file_limit,
+        config.gc_thresholds,
+        config.worker_pid_file,
+        config.worker_daemonize,
+        config.worker_cpu_affinity,
+        logger,
+    )
+
+
+def start_reactor(
+        appname,
+        soft_file_limit,
+        gc_thresholds,
+        pid_file,
+        daemonize,
+        cpu_affinity,
+        logger,
+):
+    """ Run the reactor in the main process
+
+    Daemonizes if necessary, and then configures some resources, before starting
+    the reactor
+
+    Args:
+        appname (str): application name which will be sent to syslog
+        soft_file_limit (int):
+        gc_thresholds:
+        pid_file (str): name of pid file to write to if daemonize is True
+        daemonize (bool): true to run the reactor in a background process
+        cpu_affinity (int|None): cpu affinity mask
+        logger (logging.Logger): logger instance to pass to Daemonize
+    """
+
+    def run():
+        # make sure that we run the reactor with the sentinel log context,
+        # otherwise other PreserveLoggingContext instances will get confused
+        # and complain when they see the logcontext arbitrarily swapping
+        # between the sentinel and `run` logcontexts.
+        with PreserveLoggingContext():
+            logger.info("Running")
+            if cpu_affinity is not None:
+                logger.info("Setting CPU affinity to %s" % cpu_affinity)
+                affinity.set_process_affinity_mask(0, cpu_affinity)
+            change_resource_limit(soft_file_limit)
+            if gc_thresholds:
+                gc.set_threshold(*gc_thresholds)
+            reactor.run()
+
+    if daemonize:
+        daemon = Daemonize(
+            app=appname,
+            pid=pid_file,
+            action=run,
+            auto_close_fds=False,
+            verbose=True,
+            logger=logger,
+        )
+        daemon.start()
+    else:
+        run()

+ 10 - 40
synapse/app/appservice.py

@@ -13,38 +13,31 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import logging
+import sys
 
 import synapse
-
-from synapse.server import HomeServer
+from synapse import events
+from synapse.app import _base
 from synapse.config._base import ConfigError
-from synapse.config.logger import setup_logging
 from synapse.config.homeserver import HomeServerConfig
+from synapse.config.logger import setup_logging
 from synapse.http.site import SynapseSite
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
+from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
 from synapse.replication.slave.storage.directory import DirectoryStore
 from synapse.replication.slave.storage.events import SlavedEventStore
-from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
 from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn
+from synapse.util.logcontext import LoggingContext, preserve_fn
 from synapse.util.manhole import manhole
-from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
-
-from synapse import events
-
 from twisted.internet import reactor
 from twisted.web.resource import Resource
 
-from daemonize import Daemonize
-
-import sys
-import logging
-import gc
-
 logger = logging.getLogger("synapse.app.appservice")
 
 
@@ -181,36 +174,13 @@ def start(config_options):
     ps.setup()
     ps.start_listening(config.worker_listeners)
 
-    def run():
-        # make sure that we run the reactor with the sentinel log context,
-        # otherwise other PreserveLoggingContext instances will get confused
-        # and complain when they see the logcontext arbitrarily swapping
-        # between the sentinel and `run` logcontexts.
-        with PreserveLoggingContext():
-            logger.info("Running")
-            change_resource_limit(config.soft_file_limit)
-            if config.gc_thresholds:
-                gc.set_threshold(*config.gc_thresholds)
-            reactor.run()
-
     def start():
         ps.get_datastore().start_profiling()
         ps.get_state_handler().start_caching()
 
     reactor.callWhenRunning(start)
 
-    if config.worker_daemonize:
-        daemon = Daemonize(
-            app="synapse-appservice",
-            pid=config.worker_pid_file,
-            action=run,
-            auto_close_fds=False,
-            verbose=True,
-            logger=logger,
-        )
-        daemon.start()
-    else:
-        run()
+    _base.start_worker_reactor("synapse-appservice", config)
 
 
 if __name__ == '__main__':

+ 11 - 42
synapse/app/client_reader.py

@@ -13,47 +13,39 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import logging
+import sys
 
 import synapse
-
+from synapse import events
+from synapse.app import _base
 from synapse.config._base import ConfigError
 from synapse.config.homeserver import HomeServerConfig
 from synapse.config.logger import setup_logging
-from synapse.http.site import SynapseSite
+from synapse.crypto import context_factory
 from synapse.http.server import JsonResource
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.http.site import SynapseSite
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
 from synapse.replication.slave.storage._base import BaseSlavedStore
 from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
 from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
+from synapse.replication.slave.storage.directory import DirectoryStore
 from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.keys import SlavedKeyStore
-from synapse.replication.slave.storage.room import RoomStore
-from synapse.replication.slave.storage.directory import DirectoryStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
+from synapse.replication.slave.storage.room import RoomStore
 from synapse.replication.slave.storage.transactions import TransactionStore
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.rest.client.v1.room import PublicRoomListRestServlet
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+from synapse.util.logcontext import LoggingContext
 from synapse.util.manhole import manhole
-from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
-from synapse.crypto import context_factory
-
-from synapse import events
-
-
 from twisted.internet import reactor
 from twisted.web.resource import Resource
 
-from daemonize import Daemonize
-
-import sys
-import logging
-import gc
-
 logger = logging.getLogger("synapse.app.client_reader")
 
 
@@ -183,36 +175,13 @@ def start(config_options):
     ss.get_handlers()
     ss.start_listening(config.worker_listeners)
 
-    def run():
-        # make sure that we run the reactor with the sentinel log context,
-        # otherwise other PreserveLoggingContext instances will get confused
-        # and complain when they see the logcontext arbitrarily swapping
-        # between the sentinel and `run` logcontexts.
-        with PreserveLoggingContext():
-            logger.info("Running")
-            change_resource_limit(config.soft_file_limit)
-            if config.gc_thresholds:
-                gc.set_threshold(*config.gc_thresholds)
-            reactor.run()
-
     def start():
         ss.get_state_handler().start_caching()
         ss.get_datastore().start_profiling()
 
     reactor.callWhenRunning(start)
 
-    if config.worker_daemonize:
-        daemon = Daemonize(
-            app="synapse-client-reader",
-            pid=config.worker_pid_file,
-            action=run,
-            auto_close_fds=False,
-            verbose=True,
-            logger=logger,
-        )
-        daemon.start()
-    else:
-        run()
+    _base.start_worker_reactor("synapse-client-reader", config)
 
 
 if __name__ == '__main__':

+ 11 - 42
synapse/app/federation_reader.py

@@ -13,44 +13,36 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import logging
+import sys
 
 import synapse
-
+from synapse import events
+from synapse.api.urls import FEDERATION_PREFIX
+from synapse.app import _base
 from synapse.config._base import ConfigError
 from synapse.config.homeserver import HomeServerConfig
 from synapse.config.logger import setup_logging
+from synapse.crypto import context_factory
+from synapse.federation.transport.server import TransportLayerServer
 from synapse.http.site import SynapseSite
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
 from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.replication.slave.storage.directory import DirectoryStore
 from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.keys import SlavedKeyStore
 from synapse.replication.slave.storage.room import RoomStore
 from synapse.replication.slave.storage.transactions import TransactionStore
-from synapse.replication.slave.storage.directory import DirectoryStore
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+from synapse.util.logcontext import LoggingContext
 from synapse.util.manhole import manhole
-from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
-from synapse.api.urls import FEDERATION_PREFIX
-from synapse.federation.transport.server import TransportLayerServer
-from synapse.crypto import context_factory
-
-from synapse import events
-
-
 from twisted.internet import reactor
 from twisted.web.resource import Resource
 
-from daemonize import Daemonize
-
-import sys
-import logging
-import gc
-
 logger = logging.getLogger("synapse.app.federation_reader")
 
 
@@ -172,36 +164,13 @@ def start(config_options):
     ss.get_handlers()
     ss.start_listening(config.worker_listeners)
 
-    def run():
-        # make sure that we run the reactor with the sentinel log context,
-        # otherwise other PreserveLoggingContext instances will get confused
-        # and complain when they see the logcontext arbitrarily swapping
-        # between the sentinel and `run` logcontexts.
-        with PreserveLoggingContext():
-            logger.info("Running")
-            change_resource_limit(config.soft_file_limit)
-            if config.gc_thresholds:
-                gc.set_threshold(*config.gc_thresholds)
-            reactor.run()
-
     def start():
         ss.get_state_handler().start_caching()
         ss.get_datastore().start_profiling()
 
     reactor.callWhenRunning(start)
 
-    if config.worker_daemonize:
-        daemon = Daemonize(
-            app="synapse-federation-reader",
-            pid=config.worker_pid_file,
-            action=run,
-            auto_close_fds=False,
-            verbose=True,
-            logger=logger,
-        )
-        daemon.start()
-    else:
-        run()
+    _base.start_worker_reactor("synapse-federation-reader", config)
 
 
 if __name__ == '__main__':

+ 13 - 44
synapse/app/federation_sender.py

@@ -13,44 +13,37 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import logging
+import sys
 
 import synapse
-
-from synapse.server import HomeServer
+from synapse import events
+from synapse.app import _base
 from synapse.config._base import ConfigError
-from synapse.config.logger import setup_logging
 from synapse.config.homeserver import HomeServerConfig
+from synapse.config.logger import setup_logging
 from synapse.crypto import context_factory
-from synapse.http.site import SynapseSite
 from synapse.federation import send_queue
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.http.site import SynapseSite
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
 from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
+from synapse.replication.slave.storage.devices import SlavedDeviceStore
 from synapse.replication.slave.storage.events import SlavedEventStore
+from synapse.replication.slave.storage.presence import SlavedPresenceStore
 from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
-from synapse.replication.slave.storage.presence import SlavedPresenceStore
 from synapse.replication.slave.storage.transactions import TransactionStore
-from synapse.replication.slave.storage.devices import SlavedDeviceStore
 from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
 from synapse.util.async import Linearizer
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn
+from synapse.util.logcontext import LoggingContext, preserve_fn
 from synapse.util.manhole import manhole
-from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
-
-from synapse import events
-
-from twisted.internet import reactor, defer
+from twisted.internet import defer, reactor
 from twisted.web.resource import Resource
 
-from daemonize import Daemonize
-
-import sys
-import logging
-import gc
-
 logger = logging.getLogger("synapse.app.federation_sender")
 
 
@@ -213,36 +206,12 @@ def start(config_options):
     ps.setup()
     ps.start_listening(config.worker_listeners)
 
-    def run():
-        # make sure that we run the reactor with the sentinel log context,
-        # otherwise other PreserveLoggingContext instances will get confused
-        # and complain when they see the logcontext arbitrarily swapping
-        # between the sentinel and `run` logcontexts.
-        with PreserveLoggingContext():
-            logger.info("Running")
-            change_resource_limit(config.soft_file_limit)
-            if config.gc_thresholds:
-                gc.set_threshold(*config.gc_thresholds)
-            reactor.run()
-
     def start():
         ps.get_datastore().start_profiling()
         ps.get_state_handler().start_caching()
 
     reactor.callWhenRunning(start)
-
-    if config.worker_daemonize:
-        daemon = Daemonize(
-            app="synapse-federation-sender",
-            pid=config.worker_pid_file,
-            action=run,
-            auto_close_fds=False,
-            verbose=True,
-            logger=logger,
-        )
-        daemon.start()
-    else:
-        run()
+    _base.start_worker_reactor("synapse-federation-sender", config)
 
 
 class FederationSenderHandler(object):

+ 16 - 48
synapse/app/frontend_proxy.py

@@ -13,48 +13,39 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import logging
+import sys
 
 import synapse
-
+from synapse import events
+from synapse.api.errors import SynapseError
+from synapse.app import _base
 from synapse.config._base import ConfigError
 from synapse.config.homeserver import HomeServerConfig
 from synapse.config.logger import setup_logging
-from synapse.http.site import SynapseSite
+from synapse.crypto import context_factory
 from synapse.http.server import JsonResource
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.http.servlet import (
+    RestServlet, parse_json_object_from_request,
+)
+from synapse.http.site import SynapseSite
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
 from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
 from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
 from synapse.replication.slave.storage.devices import SlavedDeviceStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
-from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
 from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.rest.client.v2_alpha._base import client_v2_patterns
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+from synapse.util.logcontext import LoggingContext
 from synapse.util.manhole import manhole
-from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
-from synapse.crypto import context_factory
-from synapse.api.errors import SynapseError
-from synapse.http.servlet import (
-    RestServlet, parse_json_object_from_request,
-)
-from synapse.rest.client.v2_alpha._base import client_v2_patterns
-
-from synapse import events
-
-
-from twisted.internet import reactor, defer
+from twisted.internet import defer, reactor
 from twisted.web.resource import Resource
 
-from daemonize import Daemonize
-
-import sys
-import logging
-import gc
-
-
 logger = logging.getLogger("synapse.app.frontend_proxy")
 
 
@@ -234,36 +225,13 @@ def start(config_options):
     ss.get_handlers()
     ss.start_listening(config.worker_listeners)
 
-    def run():
-        # make sure that we run the reactor with the sentinel log context,
-        # otherwise other PreserveLoggingContext instances will get confused
-        # and complain when they see the logcontext arbitrarily swapping
-        # between the sentinel and `run` logcontexts.
-        with PreserveLoggingContext():
-            logger.info("Running")
-            change_resource_limit(config.soft_file_limit)
-            if config.gc_thresholds:
-                gc.set_threshold(*config.gc_thresholds)
-            reactor.run()
-
     def start():
         ss.get_state_handler().start_caching()
         ss.get_datastore().start_profiling()
 
     reactor.callWhenRunning(start)
 
-    if config.worker_daemonize:
-        daemon = Daemonize(
-            app="synapse-frontend-proxy",
-            pid=config.worker_pid_file,
-            action=run,
-            auto_close_fds=False,
-            verbose=True,
-            logger=logger,
-        )
-        daemon.start()
-    else:
-        run()
+    _base.start_worker_reactor("synapse-frontend-proxy", config)
 
 
 if __name__ == '__main__':

+ 41 - 73
synapse/app/homeserver.py

@@ -13,61 +13,48 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-import synapse
-
 import gc
 import logging
 import os
 import sys
 
+import synapse
 import synapse.config.logger
+from synapse import events
+from synapse.api.urls import CONTENT_REPO_PREFIX, FEDERATION_PREFIX, \
+    LEGACY_MEDIA_PREFIX, MEDIA_PREFIX, SERVER_KEY_PREFIX, SERVER_KEY_V2_PREFIX, \
+    STATIC_PREFIX, WEB_CLIENT_PREFIX
+from synapse.app import _base
 from synapse.config._base import ConfigError
-
-from synapse.python_dependencies import (
-    check_requirements, CONDITIONAL_REQUIREMENTS
-)
-
-from synapse.rest import ClientRestResource
-from synapse.storage.engines import create_engine, IncorrectDatabaseSetup
-from synapse.storage import are_all_users_on_domain
-from synapse.storage.prepare_database import UpgradeDatabaseException, prepare_database
-
-from synapse.server import HomeServer
-
-from twisted.internet import reactor, defer
-from twisted.application import service
-from twisted.web.resource import Resource, EncodingResourceWrapper
-from twisted.web.static import File
-from twisted.web.server import GzipEncoderFactory
-from synapse.http.server import RootRedirect
-from synapse.rest.media.v0.content_repository import ContentRepoResource
-from synapse.rest.media.v1.media_repository import MediaRepositoryResource
-from synapse.rest.key.v1.server_key_resource import LocalKey
-from synapse.rest.key.v2 import KeyApiV2Resource
-from synapse.api.urls import (
-    FEDERATION_PREFIX, WEB_CLIENT_PREFIX, CONTENT_REPO_PREFIX,
-    SERVER_KEY_PREFIX, LEGACY_MEDIA_PREFIX, MEDIA_PREFIX, STATIC_PREFIX,
-    SERVER_KEY_V2_PREFIX,
-)
 from synapse.config.homeserver import HomeServerConfig
 from synapse.crypto import context_factory
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+from synapse.federation.transport.server import TransportLayerServer
+from synapse.http.server import RootRedirect
+from synapse.http.site import SynapseSite
 from synapse.metrics import register_memory_metrics
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
+from synapse.python_dependencies import CONDITIONAL_REQUIREMENTS, \
+    check_requirements
 from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
-from synapse.federation.transport.server import TransportLayerServer
-
-from synapse.util.rlimit import change_resource_limit
-from synapse.util.versionstring import get_version_string
+from synapse.rest import ClientRestResource
+from synapse.rest.key.v1.server_key_resource import LocalKey
+from synapse.rest.key.v2 import KeyApiV2Resource
+from synapse.rest.media.v0.content_repository import ContentRepoResource
+from synapse.rest.media.v1.media_repository import MediaRepositoryResource
+from synapse.server import HomeServer
+from synapse.storage import are_all_users_on_domain
+from synapse.storage.engines import IncorrectDatabaseSetup, create_engine
+from synapse.storage.prepare_database import UpgradeDatabaseException, prepare_database
 from synapse.util.httpresourcetree import create_resource_tree
+from synapse.util.logcontext import LoggingContext
 from synapse.util.manhole import manhole
-
-from synapse.http.site import SynapseSite
-
-from synapse import events
-
-from daemonize import Daemonize
+from synapse.util.rlimit import change_resource_limit
+from synapse.util.versionstring import get_version_string
+from twisted.application import service
+from twisted.internet import defer, reactor
+from twisted.web.resource import EncodingResourceWrapper, Resource
+from twisted.web.server import GzipEncoderFactory
+from twisted.web.static import File
 
 logger = logging.getLogger("synapse.app.homeserver")
 
@@ -446,37 +433,18 @@ def run(hs):
         # be quite busy the first few minutes
         clock.call_later(5 * 60, phone_stats_home)
 
-    def in_thread():
-        # Uncomment to enable tracing of log context changes.
-        # sys.settrace(logcontext_tracer)
-
-        # make sure that we run the reactor with the sentinel log context,
-        # otherwise other PreserveLoggingContext instances will get confused
-        # and complain when they see the logcontext arbitrarily swapping
-        # between the sentinel and `run` logcontexts.
-        with PreserveLoggingContext():
-            change_resource_limit(hs.config.soft_file_limit)
-            if hs.config.gc_thresholds:
-                gc.set_threshold(*hs.config.gc_thresholds)
-            reactor.run()
-
-    if hs.config.daemonize:
-
-        if hs.config.print_pidfile:
-            print (hs.config.pid_file)
-
-        daemon = Daemonize(
-            app="synapse-homeserver",
-            pid=hs.config.pid_file,
-            action=lambda: in_thread(),
-            auto_close_fds=False,
-            verbose=True,
-            logger=logger,
-        )
-
-        daemon.start()
-    else:
-        in_thread()
+    if hs.config.daemonize and hs.config.print_pidfile:
+        print (hs.config.pid_file)
+
+    _base.start_reactor(
+        "synapse-homeserver",
+        hs.config.soft_file_limit,
+        hs.config.gc_thresholds,
+        hs.config.pid_file,
+        hs.config.daemonize,
+        hs.config.cpu_affinity,
+        logger,
+    )
 
 
 def main():

+ 11 - 42
synapse/app/media_repository.py

@@ -13,14 +13,21 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import logging
+import sys
 
 import synapse
-
+from synapse import events
+from synapse.api.urls import (
+    CONTENT_REPO_PREFIX, LEGACY_MEDIA_PREFIX, MEDIA_PREFIX
+)
+from synapse.app import _base
 from synapse.config._base import ConfigError
 from synapse.config.homeserver import HomeServerConfig
 from synapse.config.logger import setup_logging
+from synapse.crypto import context_factory
 from synapse.http.site import SynapseSite
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
 from synapse.replication.slave.storage._base import BaseSlavedStore
 from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
 from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
@@ -33,27 +40,12 @@ from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
 from synapse.storage.media_repository import MediaRepositoryStore
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+from synapse.util.logcontext import LoggingContext
 from synapse.util.manhole import manhole
-from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
-from synapse.api.urls import (
-    CONTENT_REPO_PREFIX, LEGACY_MEDIA_PREFIX, MEDIA_PREFIX
-)
-from synapse.crypto import context_factory
-
-from synapse import events
-
-
 from twisted.internet import reactor
 from twisted.web.resource import Resource
 
-from daemonize import Daemonize
-
-import sys
-import logging
-import gc
-
 logger = logging.getLogger("synapse.app.media_repository")
 
 
@@ -180,36 +172,13 @@ def start(config_options):
     ss.get_handlers()
     ss.start_listening(config.worker_listeners)
 
-    def run():
-        # make sure that we run the reactor with the sentinel log context,
-        # otherwise other PreserveLoggingContext instances will get confused
-        # and complain when they see the logcontext arbitrarily swapping
-        # between the sentinel and `run` logcontexts.
-        with PreserveLoggingContext():
-            logger.info("Running")
-            change_resource_limit(config.soft_file_limit)
-            if config.gc_thresholds:
-                gc.set_threshold(*config.gc_thresholds)
-            reactor.run()
-
     def start():
         ss.get_state_handler().start_caching()
         ss.get_datastore().start_profiling()
 
     reactor.callWhenRunning(start)
 
-    if config.worker_daemonize:
-        daemon = Daemonize(
-            app="synapse-media-repository",
-            pid=config.worker_pid_file,
-            action=run,
-            auto_close_fds=False,
-            verbose=True,
-            logger=logger,
-        )
-        daemon.start()
-    else:
-        run()
+    _base.start_worker_reactor("synapse-media-repository", config)
 
 
 if __name__ == '__main__':

+ 13 - 44
synapse/app/pusher.py

@@ -13,41 +13,33 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import logging
+import sys
 
 import synapse
-
-from synapse.server import HomeServer
+from synapse import events
+from synapse.app import _base
 from synapse.config._base import ConfigError
-from synapse.config.logger import setup_logging
 from synapse.config.homeserver import HomeServerConfig
+from synapse.config.logger import setup_logging
 from synapse.http.site import SynapseSite
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
-from synapse.storage.roommember import RoomMemberStore
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
+from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
 from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.pushers import SlavedPusherStore
 from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
-from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
 from synapse.replication.tcp.client import ReplicationClientHandler
-from synapse.storage.engines import create_engine
+from synapse.server import HomeServer
 from synapse.storage import DataStore
+from synapse.storage.engines import create_engine
+from synapse.storage.roommember import RoomMemberStore
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, preserve_fn, \
-    PreserveLoggingContext
+from synapse.util.logcontext import LoggingContext, preserve_fn
 from synapse.util.manhole import manhole
-from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
-
-from synapse import events
-
-from twisted.internet import reactor, defer
+from twisted.internet import defer, reactor
 from twisted.web.resource import Resource
 
-from daemonize import Daemonize
-
-import sys
-import logging
-import gc
-
 logger = logging.getLogger("synapse.app.pusher")
 
 
@@ -244,18 +236,6 @@ def start(config_options):
     ps.setup()
     ps.start_listening(config.worker_listeners)
 
-    def run():
-        # make sure that we run the reactor with the sentinel log context,
-        # otherwise other PreserveLoggingContext instances will get confused
-        # and complain when they see the logcontext arbitrarily swapping
-        # between the sentinel and `run` logcontexts.
-        with PreserveLoggingContext():
-            logger.info("Running")
-            change_resource_limit(config.soft_file_limit)
-            if config.gc_thresholds:
-                gc.set_threshold(*config.gc_thresholds)
-            reactor.run()
-
     def start():
         ps.get_pusherpool().start()
         ps.get_datastore().start_profiling()
@@ -263,18 +243,7 @@ def start(config_options):
 
     reactor.callWhenRunning(start)
 
-    if config.worker_daemonize:
-        daemon = Daemonize(
-            app="synapse-pusher",
-            pid=config.worker_pid_file,
-            action=run,
-            auto_close_fds=False,
-            verbose=True,
-            logger=logger,
-        )
-        daemon.start()
-    else:
-        run()
+    _base.start_worker_reactor("synapse-pusher", config)
 
 
 if __name__ == '__main__':

+ 20 - 49
synapse/app/synchrotron.py

@@ -13,57 +13,51 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import contextlib
+import logging
+import sys
 
 import synapse
-
 from synapse.api.constants import EventTypes
+from synapse.app import _base
 from synapse.config._base import ConfigError
 from synapse.config.homeserver import HomeServerConfig
 from synapse.config.logger import setup_logging
 from synapse.handlers.presence import PresenceHandler, get_interested_parties
-from synapse.http.site import SynapseSite
 from synapse.http.server import JsonResource
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
-from synapse.rest.client.v2_alpha import sync
-from synapse.rest.client.v1 import events
-from synapse.rest.client.v1.room import RoomInitialSyncRestServlet
-from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
+from synapse.http.site import SynapseSite
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
 from synapse.replication.slave.storage._base import BaseSlavedStore
-from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
-from synapse.replication.slave.storage.events import SlavedEventStore
-from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
 from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
 from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
-from synapse.replication.slave.storage.registration import SlavedRegistrationStore
-from synapse.replication.slave.storage.filtering import SlavedFilteringStore
-from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
-from synapse.replication.slave.storage.presence import SlavedPresenceStore
+from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
 from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
 from synapse.replication.slave.storage.devices import SlavedDeviceStore
+from synapse.replication.slave.storage.events import SlavedEventStore
+from synapse.replication.slave.storage.filtering import SlavedFilteringStore
+from synapse.replication.slave.storage.presence import SlavedPresenceStore
+from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
+from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
+from synapse.replication.slave.storage.registration import SlavedRegistrationStore
 from synapse.replication.slave.storage.room import RoomStore
 from synapse.replication.slave.storage.groups import SlavedGroupServerStore
 from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.rest.client.v1 import events
+from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
+from synapse.rest.client.v1.room import RoomInitialSyncRestServlet
+from synapse.rest.client.v2_alpha import sync
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
 from synapse.storage.presence import UserPresenceState
 from synapse.storage.roommember import RoomMemberStore
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn
+from synapse.util.logcontext import LoggingContext, preserve_fn
 from synapse.util.manhole import manhole
-from synapse.util.rlimit import change_resource_limit
 from synapse.util.stringutils import random_string
 from synapse.util.versionstring import get_version_string
-
-from twisted.internet import reactor, defer
+from twisted.internet import defer, reactor
 from twisted.web.resource import Resource
 
-from daemonize import Daemonize
-
-import sys
-import logging
-import contextlib
-import gc
-
 logger = logging.getLogger("synapse.app.synchrotron")
 
 
@@ -446,36 +440,13 @@ def start(config_options):
     ss.setup()
     ss.start_listening(config.worker_listeners)
 
-    def run():
-        # make sure that we run the reactor with the sentinel log context,
-        # otherwise other PreserveLoggingContext instances will get confused
-        # and complain when they see the logcontext arbitrarily swapping
-        # between the sentinel and `run` logcontexts.
-        with PreserveLoggingContext():
-            logger.info("Running")
-            change_resource_limit(config.soft_file_limit)
-            if config.gc_thresholds:
-                gc.set_threshold(*config.gc_thresholds)
-            reactor.run()
-
     def start():
         ss.get_datastore().start_profiling()
         ss.get_state_handler().start_caching()
 
     reactor.callWhenRunning(start)
 
-    if config.worker_daemonize:
-        daemon = Daemonize(
-            app="synapse-synchrotron",
-            pid=config.worker_pid_file,
-            action=run,
-            auto_close_fds=False,
-            verbose=True,
-            logger=logger,
-        )
-        daemon.start()
-    else:
-        run()
+    _base.start_worker_reactor("synapse-synchrotron", config)
 
 
 if __name__ == '__main__':

+ 12 - 41
synapse/app/user_dir.py

@@ -14,16 +14,19 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import synapse
+import logging
+import sys
 
-from synapse.server import HomeServer
+import synapse
+from synapse import events
+from synapse.app import _base
 from synapse.config._base import ConfigError
-from synapse.config.logger import setup_logging
 from synapse.config.homeserver import HomeServerConfig
+from synapse.config.logger import setup_logging
 from synapse.crypto import context_factory
-from synapse.http.site import SynapseSite
 from synapse.http.server import JsonResource
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.http.site import SynapseSite
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
 from synapse.replication.slave.storage._base import BaseSlavedStore
 from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
 from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
@@ -31,26 +34,17 @@ from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.rest.client.v2_alpha import user_directory
+from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
 from synapse.storage.user_directory import UserDirectoryStore
+from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn
+from synapse.util.logcontext import LoggingContext, preserve_fn
 from synapse.util.manhole import manhole
-from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
-from synapse.util.caches.stream_change_cache import StreamChangeCache
-
-from synapse import events
-
 from twisted.internet import reactor
 from twisted.web.resource import Resource
 
-from daemonize import Daemonize
-
-import sys
-import logging
-import gc
-
 logger = logging.getLogger("synapse.app.user_dir")
 
 
@@ -233,36 +227,13 @@ def start(config_options):
     ps.setup()
     ps.start_listening(config.worker_listeners)
 
-    def run():
-        # make sure that we run the reactor with the sentinel log context,
-        # otherwise other PreserveLoggingContext instances will get confused
-        # and complain when they see the logcontext arbitrarily swapping
-        # between the sentinel and `run` logcontexts.
-        with PreserveLoggingContext():
-            logger.info("Running")
-            change_resource_limit(config.soft_file_limit)
-            if config.gc_thresholds:
-                gc.set_threshold(*config.gc_thresholds)
-            reactor.run()
-
     def start():
         ps.get_datastore().start_profiling()
         ps.get_state_handler().start_caching()
 
     reactor.callWhenRunning(start)
 
-    if config.worker_daemonize:
-        daemon = Daemonize(
-            app="synapse-user-dir",
-            pid=config.worker_pid_file,
-            action=run,
-            auto_close_fds=False,
-            verbose=True,
-            logger=logger,
-        )
-        daemon.start()
-    else:
-        run()
+    _base.start_worker_reactor("synapse-user-dir", config)
 
 
 if __name__ == '__main__':

+ 23 - 0
synapse/config/server.py

@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2017 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -29,6 +30,7 @@ class ServerConfig(Config):
         self.user_agent_suffix = config.get("user_agent_suffix")
         self.use_frozen_dicts = config.get("use_frozen_dicts", False)
         self.public_baseurl = config.get("public_baseurl")
+        self.cpu_affinity = config.get("cpu_affinity")
 
         # Whether to send federation traffic out in this process. This only
         # applies to some federation traffic, and so shouldn't be used to
@@ -147,6 +149,27 @@ class ServerConfig(Config):
         # When running as a daemon, the file to store the pid in
         pid_file: %(pid_file)s
 
+        # CPU affinity mask. Setting this restricts the CPUs on which the
+        # process will be scheduled. It is represented as a bitmask, with the
+        # lowest order bit corresponding to the first logical CPU and the
+        # highest order bit corresponding to the last logical CPU. Not all CPUs
+        # may exist on a given system but a mask may specify more CPUs than are
+        # present.
+        #
+        # For example:
+        #    0x00000001  is processor #0,
+        #    0x00000003  is processors #0 and #1,
+        #    0xFFFFFFFF  is all processors (#0 through #31).
+        #
+        # Pinning a Python process to a single CPU is desirable, because Python
+        # is inherently single-threaded due to the GIL, and can suffer a
+        # 30-40%% slowdown due to cache blow-out and thread context switching
+        # if the scheduler happens to schedule the underlying threads across
+        # different cores. See
+        # https://www.mirantis.com/blog/improve-performance-python-programs-restricting-single-cpu/.
+        #
+        # cpu_affinity: 0xFFFFFFFF
+
         # Whether to serve a web client from the HTTP/HTTPS root resource.
         web_client: True
 

+ 1 - 0
synapse/config/workers.py

@@ -33,6 +33,7 @@ class WorkerConfig(Config):
         self.worker_name = config.get("worker_name", self.worker_app)
 
         self.worker_main_http_uri = config.get("worker_main_http_uri", None)
+        self.worker_cpu_affinity = config.get("worker_cpu_affinity")
 
         if self.worker_listeners:
             for listener in self.worker_listeners:

+ 36 - 28
synapse/crypto/keyring.py

@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2017 New Vector Ltd.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -15,10 +16,9 @@
 
 from synapse.crypto.keyclient import fetch_server_key
 from synapse.api.errors import SynapseError, Codes
-from synapse.util import unwrapFirstError
-from synapse.util.async import ObservableDeferred
+from synapse.util import unwrapFirstError, logcontext
 from synapse.util.logcontext import (
-    preserve_context_over_deferred, preserve_context_over_fn, PreserveLoggingContext,
+    preserve_context_over_fn, PreserveLoggingContext,
     preserve_fn
 )
 from synapse.util.metrics import Measure
@@ -74,6 +74,11 @@ class Keyring(object):
         self.perspective_servers = self.config.perspectives
         self.hs = hs
 
+        # map from server name to Deferred. Has an entry for each server with
+        # an ongoing key download; the Deferred completes once the download
+        # completes.
+        #
+        # These are regular, logcontext-agnostic Deferreds.
         self.key_downloads = {}
 
     def verify_json_for_server(self, server_name, json_object):
@@ -82,7 +87,7 @@ class Keyring(object):
         )[0]
 
     def verify_json_objects_for_server(self, server_and_json):
-        """Bulk verfies signatures of json objects, bulk fetching keys as
+        """Bulk verifies signatures of json objects, bulk fetching keys as
         necessary.
 
         Args:
@@ -212,7 +217,13 @@ class Keyring(object):
         Args:
             server_names (list): list of server_names we want to lookup
             server_to_deferred (dict): server_name to deferred which gets
-                resolved once we've finished looking up keys for that server
+                resolved once we've finished looking up keys for that server.
+                The Deferreds should be regular twisted ones which call their
+                callbacks with no logcontext.
+
+        Returns: a Deferred which resolves once all key lookups for the given
+            servers have completed. Follows the synapse rules of logcontext
+            preservation.
         """
         while True:
             wait_on = [
@@ -226,15 +237,13 @@ class Keyring(object):
             else:
                 break
 
-        for server_name, deferred in server_to_deferred.items():
-            d = ObservableDeferred(preserve_context_over_deferred(deferred))
-            self.key_downloads[server_name] = d
-
-            def rm(r, server_name):
-                self.key_downloads.pop(server_name, None)
-                return r
+        def rm(r, server_name_):
+            self.key_downloads.pop(server_name_, None)
+            return r
 
-            d.addBoth(rm, server_name)
+        for server_name, deferred in server_to_deferred.items():
+            self.key_downloads[server_name] = deferred
+            deferred.addBoth(rm, server_name)
 
     def get_server_verify_keys(self, verify_requests):
         """Tries to find at least one key for each verify request
@@ -333,7 +342,7 @@ class Keyring(object):
             Deferred: resolves to dict[str, dict[str, VerifyKey]]: map from
                 server_name -> key_id -> VerifyKey
         """
-        res = yield preserve_context_over_deferred(defer.gatherResults(
+        res = yield logcontext.make_deferred_yieldable(defer.gatherResults(
             [
                 preserve_fn(self.store.get_server_verify_keys)(
                     server_name, key_ids
@@ -341,7 +350,7 @@ class Keyring(object):
                 for server_name, key_ids in server_name_and_key_ids
             ],
             consumeErrors=True,
-        )).addErrback(unwrapFirstError)
+        ).addErrback(unwrapFirstError))
 
         defer.returnValue(dict(res))
 
@@ -362,13 +371,13 @@ class Keyring(object):
                 )
                 defer.returnValue({})
 
-        results = yield preserve_context_over_deferred(defer.gatherResults(
+        results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
             [
                 preserve_fn(get_key)(p_name, p_keys)
                 for p_name, p_keys in self.perspective_servers.items()
             ],
             consumeErrors=True,
-        )).addErrback(unwrapFirstError)
+        ).addErrback(unwrapFirstError))
 
         union_of_keys = {}
         for result in results:
@@ -402,13 +411,13 @@ class Keyring(object):
 
             defer.returnValue(keys)
 
-        results = yield preserve_context_over_deferred(defer.gatherResults(
+        results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
             [
                 preserve_fn(get_key)(server_name, key_ids)
                 for server_name, key_ids in server_name_and_key_ids
             ],
             consumeErrors=True,
-        )).addErrback(unwrapFirstError)
+        ).addErrback(unwrapFirstError))
 
         merged = {}
         for result in results:
@@ -485,7 +494,7 @@ class Keyring(object):
             for server_name, response_keys in processed_response.items():
                 keys.setdefault(server_name, {}).update(response_keys)
 
-        yield preserve_context_over_deferred(defer.gatherResults(
+        yield logcontext.make_deferred_yieldable(defer.gatherResults(
             [
                 preserve_fn(self.store_keys)(
                     server_name=server_name,
@@ -495,7 +504,7 @@ class Keyring(object):
                 for server_name, response_keys in keys.items()
             ],
             consumeErrors=True
-        )).addErrback(unwrapFirstError)
+        ).addErrback(unwrapFirstError))
 
         defer.returnValue(keys)
 
@@ -543,7 +552,7 @@ class Keyring(object):
 
             keys.update(response_keys)
 
-        yield preserve_context_over_deferred(defer.gatherResults(
+        yield logcontext.make_deferred_yieldable(defer.gatherResults(
             [
                 preserve_fn(self.store_keys)(
                     server_name=key_server_name,
@@ -553,7 +562,7 @@ class Keyring(object):
                 for key_server_name, verify_keys in keys.items()
             ],
             consumeErrors=True
-        )).addErrback(unwrapFirstError)
+        ).addErrback(unwrapFirstError))
 
         defer.returnValue(keys)
 
@@ -619,7 +628,7 @@ class Keyring(object):
         response_keys.update(verify_keys)
         response_keys.update(old_verify_keys)
 
-        yield preserve_context_over_deferred(defer.gatherResults(
+        yield logcontext.make_deferred_yieldable(defer.gatherResults(
             [
                 preserve_fn(self.store.store_server_keys_json)(
                     server_name=server_name,
@@ -632,7 +641,7 @@ class Keyring(object):
                 for key_id in updated_key_ids
             ],
             consumeErrors=True,
-        )).addErrback(unwrapFirstError)
+        ).addErrback(unwrapFirstError))
 
         results[server_name] = response_keys
 
@@ -710,7 +719,6 @@ class Keyring(object):
 
         defer.returnValue(verify_keys)
 
-    @defer.inlineCallbacks
     def store_keys(self, server_name, from_server, verify_keys):
         """Store a collection of verify keys for a given server
         Args:
@@ -721,7 +729,7 @@ class Keyring(object):
             A deferred that completes when the keys are stored.
         """
         # TODO(markjh): Store whether the keys have expired.
-        yield preserve_context_over_deferred(defer.gatherResults(
+        return logcontext.make_deferred_yieldable(defer.gatherResults(
             [
                 preserve_fn(self.store.store_server_verify_key)(
                     server_name, server_name, key.time_added, key
@@ -729,4 +737,4 @@ class Keyring(object):
                 for key_id, key in verify_keys.items()
             ],
             consumeErrors=True,
-        )).addErrback(unwrapFirstError)
+        ).addErrback(unwrapFirstError))

+ 57 - 11
synapse/handlers/device.py

@@ -270,6 +270,8 @@ class DeviceHandler(BaseHandler):
             user_id (str)
             from_token (StreamToken)
         """
+        now_token = yield self.hs.get_event_sources().get_current_token()
+
         room_ids = yield self.store.get_rooms_for_user(user_id)
 
         # First we check if any devices have changed
@@ -280,11 +282,30 @@ class DeviceHandler(BaseHandler):
         # Then work out if any users have since joined
         rooms_changed = self.store.get_rooms_that_changed(room_ids, from_token.room_key)
 
+        member_events = yield self.store.get_membership_changes_for_user(
+            user_id, from_token.room_key, now_token.room_key
+        )
+        rooms_changed.update(event.room_id for event in member_events)
+
         stream_ordering = RoomStreamToken.parse_stream_token(
-            from_token.room_key).stream
+            from_token.room_key
+        ).stream
 
         possibly_changed = set(changed)
+        possibly_left = set()
         for room_id in rooms_changed:
+            current_state_ids = yield self.store.get_current_state_ids(room_id)
+
+            # The user may have left the room
+            # TODO: Check if they actually did or if we were just invited.
+            if room_id not in room_ids:
+                for key, event_id in current_state_ids.iteritems():
+                    etype, state_key = key
+                    if etype != EventTypes.Member:
+                        continue
+                    possibly_left.add(state_key)
+                continue
+
             # Fetch the current state at the time.
             try:
                 event_ids = yield self.store.get_forward_extremeties_for_room(
@@ -295,8 +316,6 @@ class DeviceHandler(BaseHandler):
                 # ordering: treat it the same as a new room
                 event_ids = []
 
-            current_state_ids = yield self.store.get_current_state_ids(room_id)
-
             # special-case for an empty prev state: include all members
             # in the changed list
             if not event_ids:
@@ -307,9 +326,25 @@ class DeviceHandler(BaseHandler):
                     possibly_changed.add(state_key)
                 continue
 
+            current_member_id = current_state_ids.get((EventTypes.Member, user_id))
+            if not current_member_id:
+                continue
+
             # mapping from event_id -> state_dict
             prev_state_ids = yield self.store.get_state_ids_for_events(event_ids)
 
+            # Check if we've joined the room? If so we just blindly add all the users to
+            # the "possibly changed" users.
+            for state_dict in prev_state_ids.itervalues():
+                member_event = state_dict.get((EventTypes.Member, user_id), None)
+                if not member_event or member_event != current_member_id:
+                    for key, event_id in current_state_ids.iteritems():
+                        etype, state_key = key
+                        if etype != EventTypes.Member:
+                            continue
+                        possibly_changed.add(state_key)
+                    break
+
             # If there has been any change in membership, include them in the
             # possibly changed list. We'll check if they are joined below,
             # and we're not toooo worried about spuriously adding users.
@@ -320,19 +355,30 @@ class DeviceHandler(BaseHandler):
 
                 # check if this member has changed since any of the extremities
                 # at the stream_ordering, and add them to the list if so.
-                for state_dict in prev_state_ids.values():
+                for state_dict in prev_state_ids.itervalues():
                     prev_event_id = state_dict.get(key, None)
                     if not prev_event_id or prev_event_id != event_id:
-                        possibly_changed.add(state_key)
+                        if state_key != user_id:
+                            possibly_changed.add(state_key)
                         break
 
-        users_who_share_room = yield self.store.get_users_who_share_room_with_user(
-            user_id
-        )
+        if possibly_changed or possibly_left:
+            users_who_share_room = yield self.store.get_users_who_share_room_with_user(
+                user_id
+            )
 
-        # Take the intersection of the users whose devices may have changed
-        # and those that actually still share a room with the user
-        defer.returnValue(users_who_share_room & possibly_changed)
+            # Take the intersection of the users whose devices may have changed
+            # and those that actually still share a room with the user
+            possibly_joined = possibly_changed & users_who_share_room
+            possibly_left = (possibly_changed | possibly_left) - users_who_share_room
+        else:
+            possibly_joined = []
+            possibly_left = []
+
+        defer.returnValue({
+            "changed": list(possibly_joined),
+            "left": list(possibly_left),
+        })
 
     @defer.inlineCallbacks
     def on_federation_query_user_devices(self, user_id):

+ 1 - 1
synapse/handlers/federation.py

@@ -1606,7 +1606,7 @@ class FederationHandler(BaseHandler):
 
             context.rejected = RejectedReason.AUTH_ERROR
 
-        if event.type == EventTypes.GuestAccess:
+        if event.type == EventTypes.GuestAccess and not context.rejected:
             yield self.maybe_kick_guest_users(event)
 
         defer.returnValue(context)

+ 105 - 21
synapse/handlers/sync.py

@@ -119,6 +119,16 @@ class GroupsSyncResult(collections.namedtuple("GroupsSyncResult", [
         return bool(self.join or self.invite or self.leave)
 
 
+class DeviceLists(collections.namedtuple("DeviceLists", [
+    "changed",   # list of user_ids whose devices may have changed
+    "left",      # list of user_ids whose devices we no longer track
+])):
+    __slots__ = []
+
+    def __nonzero__(self):
+        return bool(self.changed or self.left)
+
+
 class SyncResult(collections.namedtuple("SyncResult", [
     "next_batch",  # Token for the next sync
     "presence",  # List of presence events for the user.
@@ -296,6 +306,11 @@ class SyncHandler(object):
             timeline_limit = sync_config.filter_collection.timeline_limit()
             block_all_timeline = sync_config.filter_collection.blocks_all_room_timeline()
 
+            # Pull out the current state, as we always want to include those events
+            # in the timeline if they're there.
+            current_state_ids = yield self.state.get_current_state_ids(room_id)
+            current_state_ids = frozenset(current_state_ids.itervalues())
+
             if recents is None or newly_joined_room or timeline_limit < len(recents):
                 limited = True
             else:
@@ -307,6 +322,7 @@ class SyncHandler(object):
                     self.store,
                     sync_config.user.to_string(),
                     recents,
+                    always_include_ids=current_state_ids,
                 )
             else:
                 recents = []
@@ -342,6 +358,7 @@ class SyncHandler(object):
                     self.store,
                     sync_config.user.to_string(),
                     loaded_recents,
+                    always_include_ids=current_state_ids,
                 )
                 loaded_recents.extend(recents)
                 recents = loaded_recents
@@ -548,7 +565,8 @@ class SyncHandler(object):
         res = yield self._generate_sync_entry_for_rooms(
             sync_result_builder, account_data_by_room
         )
-        newly_joined_rooms, newly_joined_users = res
+        newly_joined_rooms, newly_joined_users, _, _ = res
+        _, _, newly_left_rooms, newly_left_users = res
 
         block_all_presence_data = (
             since_token is None and
@@ -562,7 +580,11 @@ class SyncHandler(object):
         yield self._generate_sync_entry_for_to_device(sync_result_builder)
 
         device_lists = yield self._generate_sync_entry_for_device_list(
-            sync_result_builder
+            sync_result_builder,
+            newly_joined_rooms=newly_joined_rooms,
+            newly_joined_users=newly_joined_users,
+            newly_left_rooms=newly_left_rooms,
+            newly_left_users=newly_left_users,
         )
 
         device_id = sync_config.device_id
@@ -635,25 +657,50 @@ class SyncHandler(object):
 
     @measure_func("_generate_sync_entry_for_device_list")
     @defer.inlineCallbacks
-    def _generate_sync_entry_for_device_list(self, sync_result_builder):
+    def _generate_sync_entry_for_device_list(self, sync_result_builder,
+                                             newly_joined_rooms, newly_joined_users,
+                                             newly_left_rooms, newly_left_users):
         user_id = sync_result_builder.sync_config.user.to_string()
         since_token = sync_result_builder.since_token
 
         if since_token and since_token.device_list_key:
-            room_ids = yield self.store.get_rooms_for_user(user_id)
-
-            user_ids_changed = set()
             changed = yield self.store.get_user_whose_devices_changed(
                 since_token.device_list_key
             )
-            for other_user_id in changed:
-                other_room_ids = yield self.store.get_rooms_for_user(other_user_id)
-                if room_ids.intersection(other_room_ids):
-                    user_ids_changed.add(other_user_id)
 
-            defer.returnValue(user_ids_changed)
+            # TODO: Be more clever than this, i.e. remove users who we already
+            # share a room with?
+            for room_id in newly_joined_rooms:
+                joined_users = yield self.state.get_current_user_in_room(room_id)
+                newly_joined_users.update(joined_users)
+
+            for room_id in newly_left_rooms:
+                left_users = yield self.state.get_current_user_in_room(room_id)
+                newly_left_users.update(left_users)
+
+            # TODO: Check that these users are actually new, i.e. either they
+            # weren't in the previous sync *or* they left and rejoined.
+            changed.update(newly_joined_users)
+
+            if not changed and not newly_left_users:
+                defer.returnValue(DeviceLists(
+                    changed=[],
+                    left=newly_left_users,
+                ))
+
+            users_who_share_room = yield self.store.get_users_who_share_room_with_user(
+                user_id
+            )
+
+            defer.returnValue(DeviceLists(
+                changed=users_who_share_room & changed,
+                left=set(newly_left_users) - users_who_share_room,
+            ))
         else:
-            defer.returnValue([])
+            defer.returnValue(DeviceLists(
+                changed=[],
+                left=[],
+            ))
 
     @defer.inlineCallbacks
     def _generate_sync_entry_for_to_device(self, sync_result_builder):
@@ -817,8 +864,8 @@ class SyncHandler(object):
             account_data_by_room(dict): Dictionary of per room account data
 
         Returns:
-            Deferred(tuple): Returns a 2-tuple of
-            `(newly_joined_rooms, newly_joined_users)`
+            Deferred(tuple): Returns a 4-tuple of
+            `(newly_joined_rooms, newly_joined_users, newly_left_rooms, newly_left_users)`
         """
         user_id = sync_result_builder.sync_config.user.to_string()
         block_all_room_ephemeral = (
@@ -849,7 +896,7 @@ class SyncHandler(object):
                     )
                     if not tags_by_room:
                         logger.debug("no-oping sync")
-                        defer.returnValue(([], []))
+                        defer.returnValue(([], [], [], []))
 
         ignored_account_data = yield self.store.get_global_account_data_by_type_for_user(
             "m.ignored_user_list", user_id=user_id,
@@ -862,7 +909,7 @@ class SyncHandler(object):
 
         if since_token:
             res = yield self._get_rooms_changed(sync_result_builder, ignored_users)
-            room_entries, invited, newly_joined_rooms = res
+            room_entries, invited, newly_joined_rooms, newly_left_rooms = res
 
             tags_by_room = yield self.store.get_updated_tags(
                 user_id, since_token.account_data_key,
@@ -870,6 +917,7 @@ class SyncHandler(object):
         else:
             res = yield self._get_all_rooms(sync_result_builder, ignored_users)
             room_entries, invited, newly_joined_rooms = res
+            newly_left_rooms = []
 
             tags_by_room = yield self.store.get_tags_for_user(user_id)
 
@@ -890,17 +938,30 @@ class SyncHandler(object):
 
         # Now we want to get any newly joined users
         newly_joined_users = set()
+        newly_left_users = set()
         if since_token:
             for joined_sync in sync_result_builder.joined:
                 it = itertools.chain(
-                    joined_sync.timeline.events, joined_sync.state.values()
+                    joined_sync.timeline.events, joined_sync.state.itervalues()
                 )
                 for event in it:
                     if event.type == EventTypes.Member:
                         if event.membership == Membership.JOIN:
                             newly_joined_users.add(event.state_key)
-
-        defer.returnValue((newly_joined_rooms, newly_joined_users))
+                        else:
+                            prev_content = event.unsigned.get("prev_content", {})
+                            prev_membership = prev_content.get("membership", None)
+                            if prev_membership == Membership.JOIN:
+                                newly_left_users.add(event.state_key)
+
+        newly_left_users -= newly_joined_users
+
+        defer.returnValue((
+            newly_joined_rooms,
+            newly_joined_users,
+            newly_left_rooms,
+            newly_left_users,
+        ))
 
     @defer.inlineCallbacks
     def _have_rooms_changed(self, sync_result_builder):
@@ -970,15 +1031,17 @@ class SyncHandler(object):
             mem_change_events_by_room_id.setdefault(event.room_id, []).append(event)
 
         newly_joined_rooms = []
+        newly_left_rooms = []
         room_entries = []
         invited = []
-        for room_id, events in mem_change_events_by_room_id.items():
+        for room_id, events in mem_change_events_by_room_id.iteritems():
             non_joins = [e for e in events if e.membership != Membership.JOIN]
             has_join = len(non_joins) != len(events)
 
             # We want to figure out if we joined the room at some point since
             # the last sync (even if we have since left). This is to make sure
             # we do send down the room, and with full state, where necessary
+            old_state_ids = None
             if room_id in joined_room_ids or has_join:
                 old_state_ids = yield self.get_state_at(room_id, since_token)
                 old_mem_ev_id = old_state_ids.get((EventTypes.Member, user_id), None)
@@ -996,6 +1059,26 @@ class SyncHandler(object):
             if not non_joins:
                 continue
 
+            # Check if we have left the room. This can either be because we were
+            # joined before *or* that we since joined and then left.
+            if events[-1].membership != Membership.JOIN:
+                if has_join:
+                    newly_left_rooms.append(room_id)
+                else:
+                    if not old_state_ids:
+                        old_state_ids = yield self.get_state_at(room_id, since_token)
+                        old_mem_ev_id = old_state_ids.get(
+                            (EventTypes.Member, user_id),
+                            None,
+                        )
+                        old_mem_ev = None
+                        if old_mem_ev_id:
+                            old_mem_ev = yield self.store.get_event(
+                                old_mem_ev_id, allow_none=True
+                            )
+                    if old_mem_ev and old_mem_ev.membership == Membership.JOIN:
+                        newly_left_rooms.append(room_id)
+
             # Only bother if we're still currently invited
             should_invite = non_joins[-1].membership == Membership.INVITE
             if should_invite:
@@ -1073,7 +1156,7 @@ class SyncHandler(object):
                     upto_token=since_token,
                 ))
 
-        defer.returnValue((room_entries, invited, newly_joined_rooms))
+        defer.returnValue((room_entries, invited, newly_joined_rooms, newly_left_rooms))
 
     @defer.inlineCallbacks
     def _get_all_rooms(self, sync_result_builder, ignored_users):
@@ -1322,6 +1405,7 @@ class SyncResultBuilder(object):
         self.archived = []
         self.device = []
         self.groups = None
+        self.to_device = []
 
 
 class RoomSyncResultBuilder(object):

+ 54 - 2
synapse/push/bulk_push_rule_evaluator.py

@@ -20,6 +20,8 @@ from twisted.internet import defer
 from .push_rule_evaluator import PushRuleEvaluatorForEvent
 
 from synapse.api.constants import EventTypes, Membership
+from synapse.metrics import get_metrics_for
+from synapse.util.caches import metrics as cache_metrics
 from synapse.util.caches.descriptors import cached
 from synapse.util.async import Linearizer
 
@@ -31,6 +33,23 @@ logger = logging.getLogger(__name__)
 
 rules_by_room = {}
 
+push_metrics = get_metrics_for(__name__)
+
+push_rules_invalidation_counter = push_metrics.register_counter(
+    "push_rules_invalidation_counter"
+)
+push_rules_state_size_counter = push_metrics.register_counter(
+    "push_rules_state_size_counter"
+)
+
+# Measures whether we use the fast path of using state deltas, or if we have to
+# recalculate from scratch
+push_rules_delta_state_cache_metric = cache_metrics.register_cache(
+    "cache",
+    size_callback=lambda: 0,  # Meaningless size, as this isn't a cache that stores values
+    cache_name="push_rules_delta_state_cache_metric",
+)
+
 
 class BulkPushRuleEvaluator(object):
     """Calculates the outcome of push rules for an event for all users in the
@@ -41,6 +60,12 @@ class BulkPushRuleEvaluator(object):
         self.hs = hs
         self.store = hs.get_datastore()
 
+        self.room_push_rule_cache_metrics = cache_metrics.register_cache(
+            "cache",
+            size_callback=lambda: 0,  # There's not good value for this
+            cache_name="room_push_rule_cache",
+        )
+
     @defer.inlineCallbacks
     def _get_rules_for_event(self, event, context):
         """This gets the rules for all users in the room at the time of the event,
@@ -78,7 +103,10 @@ class BulkPushRuleEvaluator(object):
         # It's important that RulesForRoom gets added to self._get_rules_for_room.cache
         # before any lookup methods get called on it as otherwise there may be
         # a race if invalidate_all gets called (which assumes its in the cache)
-        return RulesForRoom(self.hs, room_id, self._get_rules_for_room.cache)
+        return RulesForRoom(
+            self.hs, room_id, self._get_rules_for_room.cache,
+            self.room_push_rule_cache_metrics,
+        )
 
     @defer.inlineCallbacks
     def action_for_event_by_user(self, event, context):
@@ -161,17 +189,19 @@ class RulesForRoom(object):
     the entire cache for the room.
     """
 
-    def __init__(self, hs, room_id, rules_for_room_cache):
+    def __init__(self, hs, room_id, rules_for_room_cache, room_push_rule_cache_metrics):
         """
         Args:
             hs (HomeServer)
             room_id (str)
             rules_for_room_cache(Cache): The cache object that caches these
                 RoomsForUser objects.
+            room_push_rule_cache_metrics (CacheMetric)
         """
         self.room_id = room_id
         self.is_mine_id = hs.is_mine_id
         self.store = hs.get_datastore()
+        self.room_push_rule_cache_metrics = room_push_rule_cache_metrics
 
         self.linearizer = Linearizer(name="rules_for_room")
 
@@ -213,11 +243,19 @@ class RulesForRoom(object):
         """
         state_group = context.state_group
 
+        if state_group and self.state_group == state_group:
+            logger.debug("Using cached rules for %r", self.room_id)
+            self.room_push_rule_cache_metrics.inc_hits()
+            defer.returnValue(self.rules_by_user)
+
         with (yield self.linearizer.queue(())):
             if state_group and self.state_group == state_group:
                 logger.debug("Using cached rules for %r", self.room_id)
+                self.room_push_rule_cache_metrics.inc_hits()
                 defer.returnValue(self.rules_by_user)
 
+            self.room_push_rule_cache_metrics.inc_misses()
+
             ret_rules_by_user = {}
             missing_member_event_ids = {}
             if state_group and self.state_group == context.prev_group:
@@ -225,8 +263,13 @@ class RulesForRoom(object):
                 # results.
                 ret_rules_by_user = self.rules_by_user
                 current_state_ids = context.delta_ids
+
+                push_rules_delta_state_cache_metric.inc_hits()
             else:
                 current_state_ids = context.current_state_ids
+                push_rules_delta_state_cache_metric.inc_misses()
+
+            push_rules_state_size_counter.inc_by(len(current_state_ids))
 
             logger.debug(
                 "Looking for member changes in %r %r", state_group, current_state_ids
@@ -273,6 +316,14 @@ class RulesForRoom(object):
                 yield self._update_rules_with_member_event_ids(
                     ret_rules_by_user, missing_member_event_ids, state_group, event
                 )
+            else:
+                # The push rules didn't change but lets update the cache anyway
+                self.update_cache(
+                    self.sequence,
+                    members={},  # There were no membership changes
+                    rules_by_user=ret_rules_by_user,
+                    state_group=state_group
+                )
 
         if logger.isEnabledFor(logging.DEBUG):
             logger.debug(
@@ -371,6 +422,7 @@ class RulesForRoom(object):
         self.state_group = object()
         self.member_map = {}
         self.rules_by_user = {}
+        push_rules_invalidation_counter.inc()
 
     def update_cache(self, sequence, members, rules_by_user, state_group):
         if sequence == self.sequence:

+ 20 - 0
synapse/push/httppusher.py

@@ -244,6 +244,26 @@ class HttpPusher(object):
 
     @defer.inlineCallbacks
     def _build_notification_dict(self, event, tweaks, badge):
+        if self.data.get('format') == 'event_id_only':
+            d = {
+                'notification': {
+                    'event_id': event.event_id,
+                    'room_id': event.room_id,
+                    'counts': {
+                        'unread': badge,
+                    },
+                    'devices': [
+                        {
+                            'app_id': self.app_id,
+                            'pushkey': self.pushkey,
+                            'pushkey_ts': long(self.pushkey_ts / 1000),
+                            'data': self.data_minus_url,
+                        }
+                    ]
+                }
+            }
+            defer.returnValue(d)
+
         ctx = yield push_tools.get_context_for_event(
             self.store, self.state_handler, event, self.user_id
         )

+ 2 - 1
synapse/python_dependencies.py

@@ -31,7 +31,7 @@ REQUIREMENTS = {
     "pyyaml": ["yaml"],
     "pyasn1": ["pyasn1"],
     "daemonize": ["daemonize"],
-    "py-bcrypt": ["bcrypt"],
+    "bcrypt": ["bcrypt"],
     "pillow": ["PIL"],
     "pydenticon": ["pydenticon"],
     "ujson": ["ujson"],
@@ -40,6 +40,7 @@ REQUIREMENTS = {
     "pymacaroons-pynacl": ["pymacaroons"],
     "msgpack-python>=0.3.0": ["msgpack"],
     "phonenumbers>=8.2.0": ["phonenumbers"],
+    "affinity": ["affinity"],
 }
 CONDITIONAL_REQUIREMENTS = {
     "web_client": {

+ 8 - 8
synapse/rest/client/v1/admin.py

@@ -168,7 +168,7 @@ class ShutdownRoomRestServlet(ClientV1RestServlet):
 
     DEFAULT_MESSAGE = (
         "Sharing illegal content on this server is not permitted and rooms in"
-        " violatation will be blocked."
+        " violation will be blocked."
     )
 
     def __init__(self, hs):
@@ -296,7 +296,7 @@ class QuarantineMediaInRoom(ClientV1RestServlet):
 
 class ResetPasswordRestServlet(ClientV1RestServlet):
     """Post request to allow an administrator reset password for a user.
-    This need a user have a administrator access in Synapse.
+    This needs user to have administrator access in Synapse.
         Example:
             http://localhost:8008/_matrix/client/api/v1/admin/reset_password/
             @user:to_reset_password?access_token=admin_access_token
@@ -319,7 +319,7 @@ class ResetPasswordRestServlet(ClientV1RestServlet):
     @defer.inlineCallbacks
     def on_POST(self, request, target_user_id):
         """Post request to allow an administrator reset password for a user.
-        This need a user have a administrator access in Synapse.
+        This needs user to have administrator access in Synapse.
         """
         UserID.from_string(target_user_id)
         requester = yield self.auth.get_user_by_req(request)
@@ -343,7 +343,7 @@ class ResetPasswordRestServlet(ClientV1RestServlet):
 
 class GetUsersPaginatedRestServlet(ClientV1RestServlet):
     """Get request to get specific number of users from Synapse.
-    This need a user have a administrator access in Synapse.
+    This needs user to have administrator access in Synapse.
         Example:
             http://localhost:8008/_matrix/client/api/v1/admin/users_paginate/
             @admin:user?access_token=admin_access_token&start=0&limit=10
@@ -362,7 +362,7 @@ class GetUsersPaginatedRestServlet(ClientV1RestServlet):
     @defer.inlineCallbacks
     def on_GET(self, request, target_user_id):
         """Get request to get specific number of users from Synapse.
-        This need a user have a administrator access in Synapse.
+        This needs user to have administrator access in Synapse.
         """
         target_user = UserID.from_string(target_user_id)
         requester = yield self.auth.get_user_by_req(request)
@@ -395,7 +395,7 @@ class GetUsersPaginatedRestServlet(ClientV1RestServlet):
     @defer.inlineCallbacks
     def on_POST(self, request, target_user_id):
         """Post request to get specific number of users from Synapse..
-        This need a user have a administrator access in Synapse.
+        This needs user to have administrator access in Synapse.
         Example:
             http://localhost:8008/_matrix/client/api/v1/admin/users_paginate/
             @admin:user?access_token=admin_access_token
@@ -433,7 +433,7 @@ class GetUsersPaginatedRestServlet(ClientV1RestServlet):
 class SearchUsersRestServlet(ClientV1RestServlet):
     """Get request to search user table for specific users according to
     search term.
-    This need a user have a administrator access in Synapse.
+    This needs user to have administrator access in Synapse.
         Example:
             http://localhost:8008/_matrix/client/api/v1/admin/search_users/
             @admin:user?access_token=admin_access_token&term=alice
@@ -453,7 +453,7 @@ class SearchUsersRestServlet(ClientV1RestServlet):
     def on_GET(self, request, target_user_id):
         """Get request to search user table for specific users according to
         search term.
-        This need a user have a administrator access in Synapse.
+        This needs user to have a administrator access in Synapse.
         """
         target_user = UserID.from_string(target_user_id)
         requester = yield self.auth.get_user_by_req(request)

+ 2 - 4
synapse/rest/client/v2_alpha/keys.py

@@ -188,13 +188,11 @@ class KeyChangesServlet(RestServlet):
 
         user_id = requester.user.to_string()
 
-        changed = yield self.device_handler.get_user_ids_changed(
+        results = yield self.device_handler.get_user_ids_changed(
             user_id, from_token,
         )
 
-        defer.returnValue((200, {
-            "changed": list(changed),
-        }))
+        defer.returnValue((200, results))
 
 
 class OneTimeKeyServlet(RestServlet):

+ 3 - 2
synapse/rest/client/v2_alpha/sync.py

@@ -110,7 +110,7 @@ class SyncRestServlet(RestServlet):
         filter_id = parse_string(request, "filter", default=None)
         full_state = parse_boolean(request, "full_state", default=False)
 
-        logger.info(
+        logger.debug(
             "/sync: user=%r, timeout=%r, since=%r,"
             " set_presence=%r, filter_id=%r, device_id=%r" % (
                 user, timeout, since, set_presence, filter_id, device_id
@@ -189,7 +189,8 @@ class SyncRestServlet(RestServlet):
             "account_data": {"events": sync_result.account_data},
             "to_device": {"events": sync_result.to_device},
             "device_lists": {
-                "changed": list(sync_result.device_lists),
+                "changed": list(sync_result.device_lists.changed),
+                "left": list(sync_result.device_lists.left),
             },
             "presence": SyncRestServlet.encode_presence(
                 sync_result.presence, time_now

+ 11 - 3
synapse/visibility.py

@@ -43,7 +43,8 @@ MEMBERSHIP_PRIORITY = (
 
 
 @defer.inlineCallbacks
-def filter_events_for_clients(store, user_tuples, events, event_id_to_state):
+def filter_events_for_clients(store, user_tuples, events, event_id_to_state,
+                              always_include_ids=frozenset()):
     """ Returns dict of user_id -> list of events that user is allowed to
     see.
 
@@ -54,6 +55,8 @@ def filter_events_for_clients(store, user_tuples, events, event_id_to_state):
             * the user has not been a member of the room since the
             given events
         events ([synapse.events.EventBase]): list of events to filter
+        always_include_ids (set(event_id)): set of event ids to specifically
+            include (unless sender is ignored)
     """
     forgotten = yield preserve_context_over_deferred(defer.gatherResults([
         defer.maybeDeferred(
@@ -91,6 +94,9 @@ def filter_events_for_clients(store, user_tuples, events, event_id_to_state):
         if not event.is_state() and event.sender in ignore_list:
             return False
 
+        if event.event_id in always_include_ids:
+            return True
+
         state = event_id_to_state[event.event_id]
 
         # get the room_visibility at the time of the event.
@@ -189,7 +195,8 @@ def filter_events_for_clients(store, user_tuples, events, event_id_to_state):
 
 
 @defer.inlineCallbacks
-def filter_events_for_client(store, user_id, events, is_peeking=False):
+def filter_events_for_client(store, user_id, events, is_peeking=False,
+                             always_include_ids=frozenset()):
     """
     Check which events a user is allowed to see
 
@@ -213,6 +220,7 @@ def filter_events_for_client(store, user_id, events, is_peeking=False):
         types=types
     )
     res = yield filter_events_for_clients(
-        store, [(user_id, is_peeking)], events, event_id_to_state
+        store, [(user_id, is_peeking)], events, event_id_to_state,
+        always_include_ids=always_include_ids,
     )
     defer.returnValue(res.get(user_id, []))

+ 74 - 0
tests/crypto/test_keyring.py

@@ -0,0 +1,74 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 New Vector Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.crypto import keyring
+from synapse.util.logcontext import LoggingContext
+from tests import utils, unittest
+from twisted.internet import defer
+
+
+class KeyringTestCase(unittest.TestCase):
+    @defer.inlineCallbacks
+    def setUp(self):
+        self.hs = yield utils.setup_test_homeserver(handlers=None)
+
+    @defer.inlineCallbacks
+    def test_wait_for_previous_lookups(self):
+        sentinel_context = LoggingContext.current_context()
+
+        kr = keyring.Keyring(self.hs)
+
+        def check_context(_, expected):
+            self.assertEquals(
+                LoggingContext.current_context().test_key, expected
+            )
+
+        lookup_1_deferred = defer.Deferred()
+        lookup_2_deferred = defer.Deferred()
+
+        with LoggingContext("one") as context_one:
+            context_one.test_key = "one"
+
+            wait_1_deferred = kr.wait_for_previous_lookups(
+                ["server1"],
+                {"server1": lookup_1_deferred},
+            )
+
+            # there were no previous lookups, so the deferred should be ready
+            self.assertTrue(wait_1_deferred.called)
+            # ... so we should have preserved the LoggingContext.
+            self.assertIs(LoggingContext.current_context(), context_one)
+            wait_1_deferred.addBoth(check_context, "one")
+
+        with LoggingContext("two") as context_two:
+            context_two.test_key = "two"
+
+            # set off another wait. It should block because the first lookup
+            # hasn't yet completed.
+            wait_2_deferred = kr.wait_for_previous_lookups(
+                ["server1"],
+                {"server1": lookup_2_deferred},
+            )
+            self.assertFalse(wait_2_deferred.called)
+            # ... so we should have reset the LoggingContext.
+            self.assertIs(LoggingContext.current_context(), sentinel_context)
+            wait_2_deferred.addBoth(check_context, "two")
+
+            # let the first lookup complete (in the sentinel context)
+            lookup_1_deferred.callback(None)
+
+            # now the second wait should complete and restore our
+            # loggingcontext.
+            yield wait_2_deferred

+ 29 - 5
tox.ini

@@ -14,14 +14,38 @@ deps =
 
 setenv =
     PYTHONDONTWRITEBYTECODE = no_byte_code
-    # As of twisted 16.4, trial tries to import the tests as a package, which
-    # means it needs to be on the pythonpath.
-    PYTHONPATH = {toxinidir}
+
 commands =
-    /bin/sh -c "find {toxinidir} -name '*.pyc' -delete ; coverage run {env:COVERAGE_OPTS:} --source={toxinidir}/synapse \
-        {envbindir}/trial {env:TRIAL_FLAGS:} {posargs:tests} {env:TOXSUFFIX:}"
+    /usr/bin/find "{toxinidir}" -name '*.pyc' -delete
+    coverage run {env:COVERAGE_OPTS:} --source="{toxinidir}/synapse" \
+        "{envbindir}/trial" {env:TRIAL_FLAGS:} {posargs:tests} {env:TOXSUFFIX:}
     {env:DUMP_COVERAGE_COMMAND:coverage report -m}
 
+[testenv:py27]
+
+# As of twisted 16.4, trial tries to import the tests as a package (previously
+# it loaded the files explicitly), which means they need to be on the
+# pythonpath. Our sdist doesn't include the 'tests' package, so normally it
+# doesn't work within the tox virtualenv.
+#
+# As a workaround, we tell tox to do install with 'pip -e', which just
+# creates a symlink to the project directory instead of unpacking the sdist.
+#
+# (An alternative to this would be to set PYTHONPATH to include the project
+# directory. Note two problems with this:
+#
+#   - if you set it via `setenv`, then it is also set during the 'install'
+#     phase, which inhibits unpacking the sdist, so the virtualenv isn't
+#     useful for anything else without setting PYTHONPATH similarly.
+#
+#   - `synapse` is also loaded from PYTHONPATH so even if you only set
+#     PYTHONPATH for the test phase, we're still running the tests against
+#     the working copy rather than the contents of the sdist. So frankly
+#     you might as well use -e in the first place.
+#
+# )
+usedevelop=true
+
 [testenv:packaging]
 deps =
     check-manifest