
Merge branch 'develop' of github.com:matrix-org/synapse into release-v0.17.1

Erik Johnston committed 7 years ago (commit 90d5983d7a)

+ 3 - 3
synapse/appservice/scheduler.py

@@ -150,12 +150,12 @@ class _TransactionController(object):
             if service_is_up:
                 sent = yield txn.send(self.as_api)
                 if sent:
-                    txn.complete(self.store)
+                    yield txn.complete(self.store)
                 else:
-                    self._start_recoverer(service)
+                    preserve_fn(self._start_recoverer)(service)
         except Exception as e:
             logger.exception(e)
-            self._start_recoverer(service)
+            preserve_fn(self._start_recoverer)(service)
 
     @defer.inlineCallbacks
     def on_recovered(self, recoverer):
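The two changes here share a goal: `txn.complete(self.store)` is now yielded so a failure there propagates into the surrounding try/except, and the fire-and-forget `_start_recoverer` call is wrapped in `preserve_fn` so it runs with the caller's logging context. A minimal sketch of that shape (plain Python, not part of the diff; everything except the logcontext helper is illustrative):

    from twisted.internet import defer
    from synapse.util.logcontext import preserve_fn

    @defer.inlineCallbacks
    def send_and_maybe_recover(txn, as_api, store, start_recoverer, service):
        sent = yield txn.send(as_api)
        if sent:
            # yielded so errors from complete() reach this function's caller
            yield txn.complete(store)
        else:
            # fire-and-forget, but preserve_fn keeps the logging context intact
            preserve_fn(start_recoverer)(service)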

+ 18 - 18
synapse/crypto/keyring.py

@@ -308,15 +308,15 @@ class Keyring(object):
 
     @defer.inlineCallbacks
     def get_keys_from_store(self, server_name_and_key_ids):
-        res = yield defer.gatherResults(
+        res = yield preserve_context_over_deferred(defer.gatherResults(
             [
-                self.store.get_server_verify_keys(
+                preserve_fn(self.store.get_server_verify_keys)(
                     server_name, key_ids
                 ).addCallback(lambda ks, server: (server, ks), server_name)
                 for server_name, key_ids in server_name_and_key_ids
             ],
             consumeErrors=True,
-        ).addErrback(unwrapFirstError)
+        )).addErrback(unwrapFirstError)
 
         defer.returnValue(dict(res))
 
@@ -337,13 +337,13 @@ class Keyring(object):
                 )
                 defer.returnValue({})
 
-        results = yield defer.gatherResults(
+        results = yield preserve_context_over_deferred(defer.gatherResults(
             [
-                get_key(p_name, p_keys)
+                preserve_fn(get_key)(p_name, p_keys)
                 for p_name, p_keys in self.perspective_servers.items()
             ],
             consumeErrors=True,
-        ).addErrback(unwrapFirstError)
+        )).addErrback(unwrapFirstError)
 
         union_of_keys = {}
         for result in results:
@@ -383,13 +383,13 @@ class Keyring(object):
 
             defer.returnValue(keys)
 
-        results = yield defer.gatherResults(
+        results = yield preserve_context_over_deferred(defer.gatherResults(
             [
-                get_key(server_name, key_ids)
+                preserve_fn(get_key)(server_name, key_ids)
                 for server_name, key_ids in server_name_and_key_ids
             ],
             consumeErrors=True,
-        ).addErrback(unwrapFirstError)
+        )).addErrback(unwrapFirstError)
 
         merged = {}
         for result in results:
@@ -466,9 +466,9 @@ class Keyring(object):
             for server_name, response_keys in processed_response.items():
                 keys.setdefault(server_name, {}).update(response_keys)
 
-        yield defer.gatherResults(
+        yield preserve_context_over_deferred(defer.gatherResults(
             [
-                self.store_keys(
+                preserve_fn(self.store_keys)(
                     server_name=server_name,
                     from_server=perspective_name,
                     verify_keys=response_keys,
@@ -476,7 +476,7 @@ class Keyring(object):
                 for server_name, response_keys in keys.items()
             ],
             consumeErrors=True
-        ).addErrback(unwrapFirstError)
+        )).addErrback(unwrapFirstError)
 
         defer.returnValue(keys)
 
@@ -524,7 +524,7 @@ class Keyring(object):
 
             keys.update(response_keys)
 
-        yield defer.gatherResults(
+        yield preserve_context_over_deferred(defer.gatherResults(
             [
                 preserve_fn(self.store_keys)(
                     server_name=key_server_name,
@@ -534,7 +534,7 @@ class Keyring(object):
                 for key_server_name, verify_keys in keys.items()
             ],
             consumeErrors=True
-        ).addErrback(unwrapFirstError)
+        )).addErrback(unwrapFirstError)
 
         defer.returnValue(keys)
 
@@ -600,7 +600,7 @@ class Keyring(object):
         response_keys.update(verify_keys)
         response_keys.update(old_verify_keys)
 
-        yield defer.gatherResults(
+        yield preserve_context_over_deferred(defer.gatherResults(
             [
                 preserve_fn(self.store.store_server_keys_json)(
                     server_name=server_name,
@@ -613,7 +613,7 @@ class Keyring(object):
                 for key_id in updated_key_ids
             ],
             consumeErrors=True,
-        ).addErrback(unwrapFirstError)
+        )).addErrback(unwrapFirstError)
 
         results[server_name] = response_keys
 
@@ -702,7 +702,7 @@ class Keyring(object):
             A deferred that completes when the keys are stored.
         """
         # TODO(markjh): Store whether the keys have expired.
-        yield defer.gatherResults(
+        yield preserve_context_over_deferred(defer.gatherResults(
             [
                 preserve_fn(self.store.store_server_verify_key)(
                     server_name, server_name, key.time_added, key
@@ -710,4 +710,4 @@ class Keyring(object):
                 for key_id, key in verify_keys.items()
             ],
             consumeErrors=True,
-        ).addErrback(unwrapFirstError)
+        )).addErrback(unwrapFirstError)
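Every hunk in this file applies the same recipe: each deferred handed to `defer.gatherResults` is created through `preserve_fn(...)`, and the gathered deferred itself is wrapped in `preserve_context_over_deferred(...)` before being yielded. A compact sketch of that recipe, assuming the 0.17-era helpers; `fetch_key` and `servers` are illustrative names:

    from twisted.internet import defer
    from synapse.util import unwrapFirstError
    from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred

    @defer.inlineCallbacks
    def fetch_all_keys(fetch_key, servers):
        results = yield preserve_context_over_deferred(defer.gatherResults(
            [
                # each child call starts under (and then restores) our context
                preserve_fn(fetch_key)(server_name)
                for server_name in servers
            ],
            consumeErrors=True,
        )).addErrback(unwrapFirstError)

        # gatherResults preserves input order, so pairing with servers is safe
        defer.returnValue(dict(zip(servers, results)))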

+ 4 - 3
synapse/federation/federation_base.py

@@ -23,6 +23,7 @@ from synapse.crypto.event_signing import check_event_content_hash
 from synapse.api.errors import SynapseError
 
 from synapse.util import unwrapFirstError
+from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
 
 import logging
 
@@ -102,10 +103,10 @@ class FederationBase(object):
                 warn, pdu
             )
 
-        valid_pdus = yield defer.gatherResults(
+        valid_pdus = yield preserve_context_over_deferred(defer.gatherResults(
             deferreds,
             consumeErrors=True
-        ).addErrback(unwrapFirstError)
+        )).addErrback(unwrapFirstError)
 
         if include_none:
             defer.returnValue(valid_pdus)
@@ -129,7 +130,7 @@ class FederationBase(object):
             for pdu in pdus
         ]
 
-        deferreds = self.keyring.verify_json_objects_for_server([
+        deferreds = preserve_fn(self.keyring.verify_json_objects_for_server)([
             (p.origin, p.get_pdu_json())
             for p in redacted_pdus
         ])

+ 11 - 6
synapse/federation/federation_client.py

@@ -27,6 +27,7 @@ from synapse.util import unwrapFirstError
 from synapse.util.async import concurrently_execute
 from synapse.util.caches.expiringcache import ExpiringCache
 from synapse.util.logutils import log_function
+from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
 from synapse.events import FrozenEvent
 import synapse.metrics
 
@@ -225,10 +226,10 @@ class FederationClient(FederationBase):
         ]
 
         # FIXME: We should handle signature failures more gracefully.
-        pdus[:] = yield defer.gatherResults(
+        pdus[:] = yield preserve_context_over_deferred(defer.gatherResults(
             self._check_sigs_and_hashes(pdus),
             consumeErrors=True,
-        ).addErrback(unwrapFirstError)
+        )).addErrback(unwrapFirstError)
 
         defer.returnValue(pdus)
 
@@ -457,14 +458,16 @@ class FederationClient(FederationBase):
             batch = set(missing_events[i:i + batch_size])
 
             deferreds = [
-                self.get_pdu(
+                preserve_fn(self.get_pdu)(
                     destinations=random_server_list(),
                     event_id=e_id,
                 )
                 for e_id in batch
             ]
 
-            res = yield defer.DeferredList(deferreds, consumeErrors=True)
+            res = yield preserve_context_over_deferred(
+                defer.DeferredList(deferreds, consumeErrors=True)
+            )
             for success, result in res:
                 if success:
                     signed_events.append(result)
@@ -853,14 +856,16 @@ class FederationClient(FederationBase):
                 return srvs
 
             deferreds = [
-                self.get_pdu(
+                preserve_fn(self.get_pdu)(
                     destinations=random_server_list(),
                     event_id=e_id,
                 )
                 for e_id, depth in ordered_missing[:limit - len(signed_events)]
             ]
 
-            res = yield defer.DeferredList(deferreds, consumeErrors=True)
+            res = yield preserve_context_over_deferred(
+                defer.DeferredList(deferreds, consumeErrors=True)
+            )
             for (result, val), (e_id, _) in zip(res, ordered_missing):
                 if result and val:
                     signed_events.append(val)

+ 4 - 4
synapse/handlers/appservice.py

@@ -17,7 +17,7 @@ from twisted.internet import defer
 
 from synapse.api.constants import EventTypes
 from synapse.util.metrics import Measure
-from synapse.util.logcontext import preserve_fn
+from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
 
 import logging
 
@@ -163,10 +163,10 @@ class ApplicationServicesHandler(object):
     def query_3pe(self, kind, protocol, fields):
         services = yield self._get_services_for_3pn(protocol)
 
-        results = yield defer.DeferredList([
-            self.appservice_api.query_3pe(service, kind, protocol, fields)
+        results = yield preserve_context_over_deferred(defer.DeferredList([
+            preserve_fn(self.appservice_api.query_3pe)(service, kind, protocol, fields)
             for service in services
-        ], consumeErrors=True)
+        ], consumeErrors=True))
 
         ret = []
         for (success, result) in results:

+ 17 - 15
synapse/handlers/federation.py

@@ -26,7 +26,9 @@ from synapse.api.errors import (
 from synapse.api.constants import EventTypes, Membership, RejectedReason
 from synapse.events.validator import EventValidator
 from synapse.util import unwrapFirstError
-from synapse.util.logcontext import PreserveLoggingContext, preserve_fn
+from synapse.util.logcontext import (
+    PreserveLoggingContext, preserve_fn, preserve_context_over_deferred
+)
 from synapse.util.logutils import log_function
 from synapse.util.async import run_on_reactor
 from synapse.util.frozenutils import unfreeze
@@ -361,9 +363,9 @@ class FederationHandler(BaseHandler):
                     missing_auth - failed_to_fetch
                 )
 
-                results = yield defer.gatherResults(
+                results = yield preserve_context_over_deferred(defer.gatherResults(
                     [
-                        self.replication_layer.get_pdu(
+                        preserve_fn(self.replication_layer.get_pdu)(
                             [dest],
                             event_id,
                             outlier=True,
@@ -372,10 +374,10 @@ class FederationHandler(BaseHandler):
                         for event_id in missing_auth - failed_to_fetch
                     ],
                     consumeErrors=True
-                ).addErrback(unwrapFirstError)
-                auth_events.update({a.event_id: a for a in results})
+                )).addErrback(unwrapFirstError)
+                auth_events.update({a.event_id: a for a in results if a})
                 required_auth.update(
-                    a_id for event in results for a_id, _ in event.auth_events
+                    a_id for event in results for a_id, _ in event.auth_events if event
                 )
                 missing_auth = required_auth - set(auth_events)
 
@@ -552,10 +554,10 @@ class FederationHandler(BaseHandler):
 
         event_ids = list(extremities.keys())
 
-        states = yield defer.gatherResults([
-            self.state_handler.resolve_state_groups(room_id, [e])
+        states = yield preserve_context_over_deferred(defer.gatherResults([
+            preserve_fn(self.state_handler.resolve_state_groups)(room_id, [e])
             for e in event_ids
-        ])
+        ]))
         states = dict(zip(event_ids, [s[1] for s in states]))
 
         for e_id, _ in sorted_extremeties_tuple:
@@ -1166,9 +1168,9 @@ class FederationHandler(BaseHandler):
         a bunch of outliers, but not a chunk of individual events that depend
         on each other for state calculations.
         """
-        contexts = yield defer.gatherResults(
+        contexts = yield preserve_context_over_deferred(defer.gatherResults(
             [
-                self._prep_event(
+                preserve_fn(self._prep_event)(
                     origin,
                     ev_info["event"],
                     state=ev_info.get("state"),
@@ -1176,7 +1178,7 @@ class FederationHandler(BaseHandler):
                 )
                 for ev_info in event_infos
             ]
-        )
+        ))
 
         yield self.store.persist_events(
             [
@@ -1460,9 +1462,9 @@ class FederationHandler(BaseHandler):
             # Do auth conflict res.
             logger.info("Different auth: %s", different_auth)
 
-            different_events = yield defer.gatherResults(
+            different_events = yield preserve_context_over_deferred(defer.gatherResults(
                 [
-                    self.store.get_event(
+                    preserve_fn(self.store.get_event)(
                         d,
                         allow_none=True,
                         allow_rejected=False,
@@ -1471,7 +1473,7 @@ class FederationHandler(BaseHandler):
                     if d in have_events and not have_events[d]
                 ],
                 consumeErrors=True
-            ).addErrback(unwrapFirstError)
+            )).addErrback(unwrapFirstError)
 
             if different_events:
                 local_view = dict(auth_events)

+ 20 - 15
synapse/handlers/message.py

@@ -28,7 +28,8 @@ from synapse.types import (
 from synapse.util import unwrapFirstError
 from synapse.util.async import concurrently_execute, run_on_reactor, ReadWriteLock
 from synapse.util.caches.snapshot_cache import SnapshotCache
-from synapse.util.logcontext import preserve_fn
+from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
+from synapse.util.metrics import measure_func
 from synapse.visibility import filter_events_for_client
 
 from ._base import BaseHandler
@@ -502,15 +503,17 @@ class MessageHandler(BaseHandler):
                         lambda states: states[event.event_id]
                     )
 
-                (messages, token), current_state = yield defer.gatherResults(
-                    [
-                        self.store.get_recent_events_for_room(
-                            event.room_id,
-                            limit=limit,
-                            end_token=room_end_token,
-                        ),
-                        deferred_room_state,
-                    ]
+                (messages, token), current_state = yield preserve_context_over_deferred(
+                    defer.gatherResults(
+                        [
+                            preserve_fn(self.store.get_recent_events_for_room)(
+                                event.room_id,
+                                limit=limit,
+                                end_token=room_end_token,
+                            ),
+                            deferred_room_state,
+                        ]
+                    )
                 ).addErrback(unwrapFirstError)
 
                 messages = yield filter_events_for_client(
@@ -719,9 +722,9 @@ class MessageHandler(BaseHandler):
 
         presence, receipts, (messages, token) = yield defer.gatherResults(
             [
-                get_presence(),
-                get_receipts(),
-                self.store.get_recent_events_for_room(
+                preserve_fn(get_presence)(),
+                preserve_fn(get_receipts)(),
+                preserve_fn(self.store.get_recent_events_for_room)(
                     room_id,
                     limit=limit,
                     end_token=now_token.room_key,
@@ -755,6 +758,7 @@ class MessageHandler(BaseHandler):
 
         defer.returnValue(ret)
 
+    @measure_func("_create_new_client_event")
     @defer.inlineCallbacks
     def _create_new_client_event(self, builder, prev_event_ids=None):
         if prev_event_ids:
@@ -806,6 +810,7 @@ class MessageHandler(BaseHandler):
             (event, context,)
         )
 
+    @measure_func("handle_new_client_event")
     @defer.inlineCallbacks
     def handle_new_client_event(
         self,
@@ -934,7 +939,7 @@ class MessageHandler(BaseHandler):
         @defer.inlineCallbacks
         def _notify():
             yield run_on_reactor()
-            self.notifier.on_new_room_event(
+            yield self.notifier.on_new_room_event(
                 event, event_stream_id, max_stream_id,
                 extra_users=extra_users
             )
@@ -944,6 +949,6 @@ class MessageHandler(BaseHandler):
         # If invite, remove room_state from unsigned before sending.
         event.unsigned.pop("invite_room_state", None)
 
-        federation_handler.handle_new_event(
+        preserve_fn(federation_handler.handle_new_event)(
             event, destinations=destinations,
         )

+ 12 - 2
synapse/handlers/room_member.py

@@ -59,10 +59,13 @@ class RoomMemberHandler(BaseHandler):
         prev_event_ids,
         txn_id=None,
         ratelimit=True,
+        content=None,
     ):
+        if content is None:
+            content = {}
         msg_handler = self.hs.get_handlers().message_handler
 
-        content = {"membership": membership}
+        content["membership"] = membership
         if requester.is_guest:
             content["kind"] = "guest"
 
@@ -140,6 +143,7 @@ class RoomMemberHandler(BaseHandler):
             remote_room_hosts=None,
             third_party_signed=None,
             ratelimit=True,
+            content=None,
     ):
         key = (room_id,)
 
@@ -153,6 +157,7 @@ class RoomMemberHandler(BaseHandler):
                 remote_room_hosts=remote_room_hosts,
                 third_party_signed=third_party_signed,
                 ratelimit=ratelimit,
+                content=content,
             )
 
         defer.returnValue(result)
@@ -168,7 +173,11 @@ class RoomMemberHandler(BaseHandler):
             remote_room_hosts=None,
             third_party_signed=None,
             ratelimit=True,
+            content=None,
     ):
+        if content is None:
+            content = {}
+
         effective_membership_state = action
         if action in ["kick", "unban"]:
             effective_membership_state = "leave"
@@ -218,7 +227,7 @@ class RoomMemberHandler(BaseHandler):
                 if inviter and not self.hs.is_mine(inviter):
                     remote_room_hosts.append(inviter.domain)
 
-                content = {"membership": Membership.JOIN}
+                content["membership"] = Membership.JOIN
 
                 profile = self.hs.get_handlers().profile_handler
                 content["displayname"] = yield profile.get_displayname(target)
@@ -272,6 +281,7 @@ class RoomMemberHandler(BaseHandler):
             txn_id=txn_id,
             ratelimit=ratelimit,
             prev_event_ids=latest_event_ids,
+            content=content,
         )
 
     @defer.inlineCallbacks
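The membership helpers now accept an optional `content` dict, defaulting to `None` and creating a fresh dict inside the function. That avoids the shared-mutable-default pitfall while letting callers (such as the join servlet below) pass extra fields that get merged with the `membership` key instead of being overwritten by it. A tiny self-contained illustration of the default-argument pattern (plain Python, illustrative name):

    def build_membership_content(membership, content=None):
        # a dict literal as the default would be shared between calls; None is not
        if content is None:
            content = {}
        content["membership"] = membership
        return content

    assert build_membership_content("join", {"displayname": "Alice"}) == {
        "displayname": "Alice",
        "membership": "join",
    }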

+ 8 - 4
synapse/handlers/typing.py

@@ -16,7 +16,9 @@
 from twisted.internet import defer
 
 from synapse.api.errors import SynapseError, AuthError
-from synapse.util.logcontext import PreserveLoggingContext
+from synapse.util.logcontext import (
+    PreserveLoggingContext, preserve_fn, preserve_context_over_deferred,
+)
 from synapse.util.metrics import Measure
 from synapse.types import UserID
 
@@ -169,13 +171,13 @@ class TypingHandler(object):
         deferreds = []
         for domain in domains:
             if domain == self.server_name:
-                self._push_update_local(
+                preserve_fn(self._push_update_local)(
                     room_id=room_id,
                     user_id=user_id,
                     typing=typing
                 )
             else:
-                deferreds.append(self.federation.send_edu(
+                deferreds.append(preserve_fn(self.federation.send_edu)(
                     destination=domain,
                     edu_type="m.typing",
                     content={
@@ -185,7 +187,9 @@ class TypingHandler(object):
                     },
                 ))
 
-        yield defer.DeferredList(deferreds, consumeErrors=True)
+        yield preserve_context_over_deferred(
+            defer.DeferredList(deferreds, consumeErrors=True)
+        )
 
     @defer.inlineCallbacks
     def _recv_edu(self, origin, content):

+ 6 - 1
synapse/notifier.py

@@ -19,7 +19,7 @@ from synapse.api.errors import AuthError
 
 from synapse.util.logutils import log_function
 from synapse.util.async import ObservableDeferred
-from synapse.util.logcontext import PreserveLoggingContext
+from synapse.util.logcontext import PreserveLoggingContext, preserve_fn
 from synapse.util.metrics import Measure
 from synapse.types import StreamToken
 from synapse.visibility import filter_events_for_client
@@ -174,6 +174,7 @@ class Notifier(object):
             lambda: len(self.user_to_user_stream),
         )
 
+    @preserve_fn
     def on_new_room_event(self, event, room_stream_id, max_room_stream_id,
                           extra_users=[]):
         """ Used by handlers to inform the notifier something has happened
@@ -195,6 +196,7 @@ class Notifier(object):
 
             self.notify_replication()
 
+    @preserve_fn
     def _notify_pending_new_room_events(self, max_room_stream_id):
         """Notify for the room events that were queued waiting for a previous
         event to be persisted.
@@ -212,6 +214,7 @@ class Notifier(object):
             else:
                 self._on_new_room_event(event, room_stream_id, extra_users)
 
+    @preserve_fn
     def _on_new_room_event(self, event, room_stream_id, extra_users=[]):
         """Notify any user streams that are interested in this room event"""
         # poke any interested application service.
@@ -226,6 +229,7 @@ class Notifier(object):
             rooms=[event.room_id],
         )
 
+    @preserve_fn
     def on_new_event(self, stream_key, new_token, users=[], rooms=[]):
         """ Used to inform listeners that something has happend event wise.
 
@@ -252,6 +256,7 @@ class Notifier(object):
 
                 self.notify_replication()
 
+    @preserve_fn
     def on_new_replication_data(self):
         """Used to inform replication listeners that something has happend
         without waking up any of the normal user event streams"""

+ 5 - 4
synapse/push/push_tools.py

@@ -17,14 +17,15 @@ from twisted.internet import defer
 from synapse.util.presentable_names import (
     calculate_room_name, name_from_member_event
 )
+from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
 
 
 @defer.inlineCallbacks
 def get_badge_count(store, user_id):
-    invites, joins = yield defer.gatherResults([
-        store.get_invited_rooms_for_user(user_id),
-        store.get_rooms_for_user(user_id),
-    ], consumeErrors=True)
+    invites, joins = yield preserve_context_over_deferred(defer.gatherResults([
+        preserve_fn(store.get_invited_rooms_for_user)(user_id),
+        preserve_fn(store.get_rooms_for_user)(user_id),
+    ], consumeErrors=True))
 
     my_receipts_by_room = yield store.get_receipts_for_user(
         user_id, "m.read",

+ 7 - 5
synapse/push/pusherpool.py

@@ -17,7 +17,7 @@
 from twisted.internet import defer
 
 import pusher
-from synapse.util.logcontext import preserve_fn
+from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
 from synapse.util.async import run_on_reactor
 
 import logging
@@ -130,10 +130,12 @@ class PusherPool:
                 if u in self.pushers:
                     for p in self.pushers[u].values():
                         deferreds.append(
-                            p.on_new_notifications(min_stream_id, max_stream_id)
+                            preserve_fn(p.on_new_notifications)(
+                                min_stream_id, max_stream_id
+                            )
                         )
 
-            yield defer.gatherResults(deferreds)
+            yield preserve_context_over_deferred(defer.gatherResults(deferreds))
         except:
             logger.exception("Exception in pusher on_new_notifications")
 
@@ -155,10 +157,10 @@ class PusherPool:
                 if u in self.pushers:
                     for p in self.pushers[u].values():
                         deferreds.append(
-                            p.on_new_receipts(min_stream_id, max_stream_id)
+                            preserve_fn(p.on_new_receipts)(min_stream_id, max_stream_id)
                         )
 
-            yield defer.gatherResults(deferreds)
+            yield preserve_context_over_deferred(defer.gatherResults(deferreds))
         except:
             logger.exception("Exception in pusher on_new_receipts")
 

+ 1 - 0
synapse/rest/client/v1/room.py

@@ -268,6 +268,7 @@ class JoinRoomAliasServlet(ClientV1RestServlet):
             action="join",
             txn_id=txn_id,
             remote_room_hosts=remote_room_hosts,
+            content=content,
             third_party_signed=content.get("third_party_signed", None),
         )
 

+ 1 - 2
synapse/rest/client/v2_alpha/register.py

@@ -403,10 +403,9 @@ class RegisterRestServlet(RestServlet):
         # register the user's device
         device_id = params.get("device_id")
         initial_display_name = params.get("initial_device_display_name")
-        device_id = self.device_handler.check_device_registered(
+        return self.device_handler.check_device_registered(
             user_id, device_id, initial_display_name
         )
-        return device_id
 
     @defer.inlineCallbacks
     def _do_guest_registration(self):

+ 10 - 6
synapse/storage/events.py

@@ -20,7 +20,9 @@ from synapse.events import FrozenEvent, USE_FROZEN_DICTS
 from synapse.events.utils import prune_event
 
 from synapse.util.async import ObservableDeferred
-from synapse.util.logcontext import preserve_fn, PreserveLoggingContext
+from synapse.util.logcontext import (
+    preserve_fn, PreserveLoggingContext, preserve_context_over_deferred
+)
 from synapse.util.logutils import log_function
 from synapse.util.metrics import Measure
 from synapse.api.constants import EventTypes
@@ -202,7 +204,7 @@ class EventsStore(SQLBaseStore):
 
         deferreds = []
         for room_id, evs_ctxs in partitioned.items():
-            d = self._event_persist_queue.add_to_queue(
+            d = preserve_fn(self._event_persist_queue.add_to_queue)(
                 room_id, evs_ctxs,
                 backfilled=backfilled,
                 current_state=None,
@@ -212,7 +214,9 @@ class EventsStore(SQLBaseStore):
         for room_id in partitioned.keys():
             self._maybe_start_persisting(room_id)
 
-        return defer.gatherResults(deferreds, consumeErrors=True)
+        return preserve_context_over_deferred(
+            defer.gatherResults(deferreds, consumeErrors=True)
+        )
 
     @defer.inlineCallbacks
     @log_function
@@ -225,7 +229,7 @@ class EventsStore(SQLBaseStore):
 
         self._maybe_start_persisting(event.room_id)
 
-        yield deferred
+        yield preserve_context_over_deferred(deferred)
 
         max_persisted_id = yield self._stream_id_gen.get_current_token()
         defer.returnValue((event.internal_metadata.stream_ordering, max_persisted_id))
@@ -1088,7 +1092,7 @@ class EventsStore(SQLBaseStore):
         if not allow_rejected:
             rows[:] = [r for r in rows if not r["rejects"]]
 
-        res = yield defer.gatherResults(
+        res = yield preserve_context_over_deferred(defer.gatherResults(
             [
                 preserve_fn(self._get_event_from_row)(
                     row["internal_metadata"], row["json"], row["redacts"],
@@ -1097,7 +1101,7 @@ class EventsStore(SQLBaseStore):
                 for row in rows
             ],
             consumeErrors=True
-        )
+        ))
 
         defer.returnValue({
             e.event.event_id: e

+ 32 - 0
synapse/storage/schema/delta/34/received_txn_purge.py

@@ -0,0 +1,32 @@
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.storage.engines import PostgresEngine
+
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+def run_create(cur, database_engine, *args, **kwargs):
+    if isinstance(database_engine, PostgresEngine):
+        cur.execute("TRUNCATE received_transactions")
+    else:
+        cur.execute("DELETE FROM received_transactions")
+
+    cur.execute("CREATE INDEX received_transactions_ts ON received_transactions(ts)")
+
+
+def run_upgrade(cur, database_engine, *args, **kwargs):
+    pass

+ 3 - 3
synapse/storage/stream.py

@@ -39,7 +39,7 @@ from ._base import SQLBaseStore
 from synapse.util.caches.descriptors import cached
 from synapse.api.constants import EventTypes
 from synapse.types import RoomStreamToken
-from synapse.util.logcontext import preserve_fn
+from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
 from synapse.storage.engines import PostgresEngine, Sqlite3Engine
 
 import logging
@@ -234,12 +234,12 @@ class StreamStore(SQLBaseStore):
         results = {}
         room_ids = list(room_ids)
         for rm_ids in (room_ids[i:i + 20] for i in xrange(0, len(room_ids), 20)):
-            res = yield defer.gatherResults([
+            res = yield preserve_context_over_deferred(defer.gatherResults([
                 preserve_fn(self.get_room_events_stream_for_room)(
                     room_id, from_key, to_key, limit, order=order,
                 )
                 for room_id in rm_ids
-            ])
+            ]))
             results.update(dict(zip(rm_ids, res)))
 
         defer.returnValue(results)

+ 13 - 4
synapse/storage/transactions.py

@@ -62,10 +62,9 @@ class TransactionStore(SQLBaseStore):
         self.last_transaction = {}
 
         reactor.addSystemEventTrigger("before", "shutdown", self._persist_in_mem_txns)
-        hs.get_clock().looping_call(
-            self._persist_in_mem_txns,
-            1000,
-        )
+        self._clock.looping_call(self._persist_in_mem_txns, 1000)
+
+        self._clock.looping_call(self._cleanup_transactions, 30 * 60 * 1000)
 
     def get_received_txn_response(self, transaction_id, origin):
         """For an incoming transaction from a given origin, check if we have
@@ -127,6 +126,7 @@ class TransactionStore(SQLBaseStore):
                 "origin": origin,
                 "response_code": code,
                 "response_json": buffer(encode_canonical_json(response_dict)),
+                "ts": self._clock.time_msec(),
             },
             or_ignore=True,
             desc="set_received_txn_response",
@@ -383,3 +383,12 @@ class TransactionStore(SQLBaseStore):
                 yield self.runInteraction("_persist_in_mem_txns", f)
         except:
             logger.exception("Failed to persist transactions!")
+
+    def _cleanup_transactions(self):
+        now = self._clock.time_msec()
+        month_ago = now - 30 * 24 * 60 * 60 * 1000
+
+        def _cleanup_transactions_txn(txn):
+            txn.execute("DELETE FROM received_transactions WHERE ts < ?", (month_ago,))
+
+        return self.runInteraction("_persist_in_mem_txns", _cleanup_transactions_txn)
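Together with the schema delta above, received transactions are now stored with a `ts` timestamp (and an index on it), and the constructor schedules `_cleanup_transactions` on a half-hourly `looping_call` to delete rows older than thirty days. A standalone sketch of that periodic-cleanup shape using Twisted's `LoopingCall` directly (table and column names mirror the delta; everything else is illustrative):

    import time
    from twisted.internet import task

    THIRTY_DAYS_MS = 30 * 24 * 60 * 60 * 1000

    def cleanup_received_transactions(conn):
        month_ago = int(time.time() * 1000) - THIRTY_DAYS_MS
        # the new received_transactions_ts index keeps this range delete cheap
        conn.execute("DELETE FROM received_transactions WHERE ts < ?", (month_ago,))

    def schedule_cleanup(conn):
        loop = task.LoopingCall(cleanup_received_transactions, conn)
        loop.start(30 * 60, now=False)  # every 30 minutes, as in the commit
        return loop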

+ 5 - 4
synapse/util/async.py

@@ -146,10 +146,10 @@ def concurrently_execute(func, args, limit):
         except StopIteration:
             pass
 
-    return defer.gatherResults([
+    return preserve_context_over_deferred(defer.gatherResults([
         preserve_fn(_concurrently_execute_inner)()
         for _ in xrange(limit)
-    ], consumeErrors=True).addErrback(unwrapFirstError)
+    ], consumeErrors=True)).addErrback(unwrapFirstError)
 
 
 class Linearizer(object):
@@ -181,7 +181,8 @@ class Linearizer(object):
         self.key_to_defer[key] = new_defer
 
         if current_defer:
-            yield preserve_context_over_deferred(current_defer)
+            with PreserveLoggingContext():
+                yield current_defer
 
         @contextmanager
         def _ctx_manager():
@@ -264,7 +265,7 @@ class ReadWriteLock(object):
         curr_readers.clear()
         self.key_to_current_writer[key] = new_defer
 
-        yield defer.gatherResults(to_wait_on)
+        yield preserve_context_over_deferred(defer.gatherResults(to_wait_on))
 
         @contextmanager
         def _ctx_manager():
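The `Linearizer` change swaps `preserve_context_over_deferred` for yielding inside a bare `PreserveLoggingContext()` block: while blocked on the previous holder's deferred the code sits in the sentinel context, and its own context is restored once the wait finishes. A minimal sketch of that idiom (the helper is Synapse's; `wait_for` is illustrative):

    from twisted.internet import defer
    from synapse.util.logcontext import PreserveLoggingContext

    @defer.inlineCallbacks
    def wait_for(d):
        # drop into the sentinel context while blocked on someone else's
        # Deferred, then restore our own context when it fires
        with PreserveLoggingContext():
            yield d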

+ 11 - 4
synapse/util/logcontext.py

@@ -297,12 +297,13 @@ def preserve_context_over_fn(fn, *args, **kwargs):
         return res
 
 
-def preserve_context_over_deferred(deferred):
+def preserve_context_over_deferred(deferred, context=None):
     """Given a deferred wrap it such that any callbacks added later to it will
     be invoked with the current context.
     """
-    current_context = LoggingContext.current_context()
-    d = _PreservingContextDeferred(current_context)
+    if context is None:
+        context = LoggingContext.current_context()
+    d = _PreservingContextDeferred(context)
     deferred.chainDeferred(d)
     return d
 
@@ -316,7 +317,13 @@ def preserve_fn(f):
 
     def g(*args, **kwargs):
         with PreserveLoggingContext(current):
-            return f(*args, **kwargs)
+            res = f(*args, **kwargs)
+            if isinstance(res, defer.Deferred):
+                return preserve_context_over_deferred(
+                    res, context=LoggingContext.sentinel
+                )
+            else:
+                return res
     return g
 
 

+ 3 - 3
synapse/visibility.py

@@ -17,7 +17,7 @@ from twisted.internet import defer
 
 from synapse.api.constants import Membership, EventTypes
 
-from synapse.util.logcontext import preserve_fn
+from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
 
 import logging
 
@@ -55,12 +55,12 @@ def filter_events_for_clients(store, user_tuples, events, event_id_to_state):
             given events
         events ([synapse.events.EventBase]): list of events to filter
     """
-    forgotten = yield defer.gatherResults([
+    forgotten = yield preserve_context_over_deferred(defer.gatherResults([
         preserve_fn(store.who_forgot_in_room)(
             room_id,
         )
         for room_id in frozenset(e.room_id for e in events)
-    ], consumeErrors=True)
+    ], consumeErrors=True))
 
     # Set of membership event_ids that have been forgotten
     event_id_forgotten = frozenset(