# transaction_manager.py (6.6 KB)
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2019 New Vector Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. import logging
  16. from typing import TYPE_CHECKING, List
  17. from prometheus_client import Gauge
  18. from synapse.api.errors import HttpResponseException
  19. from synapse.events import EventBase
  20. from synapse.federation.persistence import TransactionActions
  21. from synapse.federation.units import Edu, Transaction
  22. from synapse.logging.opentracing import (
  23. extract_text_map,
  24. set_tag,
  25. start_active_span_follows_from,
  26. tags,
  27. whitelisted_homeserver,
  28. )
  29. from synapse.util import json_decoder
  30. from synapse.util.metrics import measure_func
  31. if TYPE_CHECKING:
  32. import synapse.server
  33. logger = logging.getLogger(__name__)
  34. last_pdu_ts_metric = Gauge(
  35. "synapse_federation_last_sent_pdu_time",
  36. "The timestamp of the last PDU which was successfully sent to the given domain",
  37. labelnames=("server_name",),
  38. )
  39. class TransactionManager:
  40. """Helper class which handles building and sending transactions
  41. shared between PerDestinationQueue objects
  42. """
  43. def __init__(self, hs: "synapse.server.HomeServer"):
  44. self._server_name = hs.hostname
  45. self.clock = hs.get_clock() # nb must be called this for @measure_func
  46. self._store = hs.get_datastore()
  47. self._transaction_actions = TransactionActions(self._store)
  48. self._transport_layer = hs.get_federation_transport_client()
  49. self._federation_metrics_domains = (
  50. hs.get_config().federation.federation_metrics_domains
  51. )
  52. # HACK to get unique tx id
  53. self._next_txn_id = int(self.clock.time_msec())
  54. @measure_func("_send_new_transaction")
  55. async def send_new_transaction(
  56. self,
  57. destination: str,
  58. pdus: List[EventBase],
  59. edus: List[Edu],
  60. ) -> bool:
  61. """
  62. Args:
  63. destination: The destination to send to (e.g. 'example.org')
  64. pdus: In-order list of PDUs to send
  65. edus: List of EDUs to send
  66. Returns:
  67. True iff the transaction was successful
  68. """
  69. # Make a transaction-sending opentracing span. This span follows on from
  70. # all the edus in that transaction. This needs to be done since there is
  71. # no active span here, so if the edus were not received by the remote the
  72. # span would have no causality and it would be forgotten.
  73. span_contexts = []
  74. keep_destination = whitelisted_homeserver(destination)
  75. for edu in edus:
  76. context = edu.get_context()
  77. if context:
  78. span_contexts.append(extract_text_map(json_decoder.decode(context)))
  79. if keep_destination:
  80. edu.strip_context()
  81. with start_active_span_follows_from("send_transaction", span_contexts):
  82. success = True
  83. logger.debug("TX [%s] _attempt_new_transaction", destination)
  84. txn_id = str(self._next_txn_id)
  85. logger.debug(
  86. "TX [%s] {%s} Attempting new transaction (pdus: %d, edus: %d)",
  87. destination,
  88. txn_id,
  89. len(pdus),
  90. len(edus),
  91. )
  92. transaction = Transaction.create_new(
  93. origin_server_ts=int(self.clock.time_msec()),
  94. transaction_id=txn_id,
  95. origin=self._server_name,
  96. destination=destination,
  97. pdus=pdus,
  98. edus=edus,
  99. )
  100. self._next_txn_id += 1
  101. logger.info(
  102. "TX [%s] {%s} Sending transaction [%s], (PDUs: %d, EDUs: %d)",
  103. destination,
  104. txn_id,
  105. transaction.transaction_id,
  106. len(pdus),
  107. len(edus),
  108. )
  109. # Actually send the transaction
  110. # FIXME (erikj): This is a bit of a hack to make the Pdu age
  111. # keys work
  112. # FIXME (richardv): I also believe it no longer works. We (now?) store
  113. # "age_ts" in "unsigned" rather than at the top level. See
  114. # https://github.com/matrix-org/synapse/issues/8429.
  115. def json_data_cb():
  116. data = transaction.get_dict()
  117. now = int(self.clock.time_msec())
  118. if "pdus" in data:
  119. for p in data["pdus"]:
  120. if "age_ts" in p:
  121. unsigned = p.setdefault("unsigned", {})
  122. unsigned["age"] = now - int(p["age_ts"])
  123. del p["age_ts"]
  124. return data
  125. try:
  126. response = await self._transport_layer.send_transaction(
  127. transaction, json_data_cb
  128. )
  129. code = 200
  130. except HttpResponseException as e:
  131. code = e.code
  132. response = e.response
  133. if e.code in (401, 404, 429) or 500 <= e.code:
  134. logger.info(
  135. "TX [%s] {%s} got %d response", destination, txn_id, code
  136. )
  137. raise e
  138. logger.info("TX [%s] {%s} got %d response", destination, txn_id, code)
  139. if code == 200:
  140. for e_id, r in response.get("pdus", {}).items():
  141. if "error" in r:
  142. logger.warning(
  143. "TX [%s] {%s} Remote returned error for %s: %s",
  144. destination,
  145. txn_id,
  146. e_id,
  147. r,
  148. )
  149. else:
  150. for p in pdus:
  151. logger.warning(
  152. "TX [%s] {%s} Failed to send event %s",
  153. destination,
  154. txn_id,
  155. p.event_id,
  156. )
  157. success = False
  158. if success and pdus and destination in self._federation_metrics_domains:
  159. last_pdu = pdus[-1]
  160. last_pdu_ts_metric.labels(server_name=destination).set(
  161. last_pdu.origin_server_ts / 1000
  162. )
  163. set_tag(tags.ERROR, not success)
  164. return success