transaction_manager.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2019 New Vector Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. import logging
  16. from twisted.internet import defer
  17. from synapse.api.errors import HttpResponseException
  18. from synapse.federation.persistence import TransactionActions
  19. from synapse.federation.units import Transaction
  20. from synapse.util.metrics import measure_func
  21. logger = logging.getLogger(__name__)
  22. class TransactionManager(object):
  23. """Helper class which handles building and sending transactions
  24. shared between PerDestinationQueue objects
  25. """
  26. def __init__(self, hs):
  27. self._server_name = hs.hostname
  28. self.clock = hs.get_clock() # nb must be called this for @measure_func
  29. self._store = hs.get_datastore()
  30. self._transaction_actions = TransactionActions(self._store)
  31. self._transport_layer = hs.get_federation_transport_client()
  32. # HACK to get unique tx id
  33. self._next_txn_id = int(self.clock.time_msec())
  34. @measure_func("_send_new_transaction")
  35. @defer.inlineCallbacks
  36. def send_new_transaction(self, destination, pending_pdus, pending_edus):
  37. # Sort based on the order field
  38. pending_pdus.sort(key=lambda t: t[1])
  39. pdus = [x[0] for x in pending_pdus]
  40. edus = pending_edus
  41. success = True
  42. logger.debug("TX [%s] _attempt_new_transaction", destination)
  43. txn_id = str(self._next_txn_id)
  44. logger.debug(
  45. "TX [%s] {%s} Attempting new transaction" " (pdus: %d, edus: %d)",
  46. destination,
  47. txn_id,
  48. len(pdus),
  49. len(edus),
  50. )
  51. transaction = Transaction.create_new(
  52. origin_server_ts=int(self.clock.time_msec()),
  53. transaction_id=txn_id,
  54. origin=self._server_name,
  55. destination=destination,
  56. pdus=pdus,
  57. edus=edus,
  58. )
  59. self._next_txn_id += 1
  60. logger.info(
  61. "TX [%s] {%s} Sending transaction [%s]," " (PDUs: %d, EDUs: %d)",
  62. destination,
  63. txn_id,
  64. transaction.transaction_id,
  65. len(pdus),
  66. len(edus),
  67. )
  68. # Actually send the transaction
  69. # FIXME (erikj): This is a bit of a hack to make the Pdu age
  70. # keys work
  71. def json_data_cb():
  72. data = transaction.get_dict()
  73. now = int(self.clock.time_msec())
  74. if "pdus" in data:
  75. for p in data["pdus"]:
  76. if "age_ts" in p:
  77. unsigned = p.setdefault("unsigned", {})
  78. unsigned["age"] = now - int(p["age_ts"])
  79. del p["age_ts"]
  80. return data
  81. try:
  82. response = yield self._transport_layer.send_transaction(
  83. transaction, json_data_cb
  84. )
  85. code = 200
  86. except HttpResponseException as e:
  87. code = e.code
  88. response = e.response
  89. if e.code in (401, 404, 429) or 500 <= e.code:
  90. logger.info("TX [%s] {%s} got %d response", destination, txn_id, code)
  91. raise e
  92. logger.info("TX [%s] {%s} got %d response", destination, txn_id, code)
  93. if code == 200:
  94. for e_id, r in response.get("pdus", {}).items():
  95. if "error" in r:
  96. logger.warn(
  97. "TX [%s] {%s} Remote returned error for %s: %s",
  98. destination,
  99. txn_id,
  100. e_id,
  101. r,
  102. )
  103. else:
  104. for p in pdus:
  105. logger.warn(
  106. "TX [%s] {%s} Failed to send event %s",
  107. destination,
  108. txn_id,
  109. p.event_id,
  110. )
  111. success = False
  112. return success