_base.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2014-2016 OpenMarket Ltd
  3. # Copyright 2017-2018 New Vector Ltd
  4. # Copyright 2019 The Matrix.org Foundation C.I.C.
  5. #
  6. # Licensed under the Apache License, Version 2.0 (the "License");
  7. # you may not use this file except in compliance with the License.
  8. # You may obtain a copy of the License at
  9. #
  10. # http://www.apache.org/licenses/LICENSE-2.0
  11. #
  12. # Unless required by applicable law or agreed to in writing, software
  13. # distributed under the License is distributed on an "AS IS" BASIS,
  14. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. # See the License for the specific language governing permissions and
  16. # limitations under the License.
  17. import logging
  18. import random
  19. from abc import ABCMeta
  20. from typing import Any, Optional
  21. from synapse.storage.database import LoggingTransaction # noqa: F401
  22. from synapse.storage.database import make_in_list_sql_clause # noqa: F401
  23. from synapse.storage.database import DatabasePool
  24. from synapse.types import Collection, get_domain_from_id
  25. from synapse.util import json_decoder
  26. logger = logging.getLogger(__name__)
  27. # some of our subclasses have abstract methods, so we use the ABCMeta metaclass.
  28. class SQLBaseStore(metaclass=ABCMeta):
  29. """Base class for data stores that holds helper functions.
  30. Note that multiple instances of this class will exist as there will be one
  31. per data store (and not one per physical database).
  32. """
  33. def __init__(self, database: DatabasePool, db_conn, hs):
  34. self.hs = hs
  35. self._clock = hs.get_clock()
  36. self.database_engine = database.engine
  37. self.db_pool = database
  38. self.rand = random.SystemRandom()
  39. def process_replication_rows(self, stream_name, instance_name, token, rows):
  40. pass
  41. def _invalidate_state_caches(self, room_id, members_changed):
  42. """Invalidates caches that are based on the current state, but does
  43. not stream invalidations down replication.
  44. Args:
  45. room_id (str): Room where state changed
  46. members_changed (iterable[str]): The user_ids of members that have
  47. changed
  48. """
  49. for host in {get_domain_from_id(u) for u in members_changed}:
  50. self._attempt_to_invalidate_cache("is_host_joined", (room_id, host))
  51. self._attempt_to_invalidate_cache("get_users_in_room", (room_id,))
  52. self._attempt_to_invalidate_cache("get_room_summary", (room_id,))
  53. self._attempt_to_invalidate_cache("get_current_state_ids", (room_id,))
  54. def _attempt_to_invalidate_cache(
  55. self, cache_name: str, key: Optional[Collection[Any]]
  56. ):
  57. """Attempts to invalidate the cache of the given name, ignoring if the
  58. cache doesn't exist. Mainly used for invalidating caches on workers,
  59. where they may not have the cache.
  60. Args:
  61. cache_name
  62. key: Entry to invalidate. If None then invalidates the entire
  63. cache.
  64. """
  65. try:
  66. if key is None:
  67. getattr(self, cache_name).invalidate_all()
  68. else:
  69. getattr(self, cache_name).invalidate(tuple(key))
  70. except AttributeError:
  71. # We probably haven't pulled in the cache in this worker,
  72. # which is fine.
  73. pass
  74. def db_to_json(db_content):
  75. """
  76. Take some data from a database row and return a JSON-decoded object.
  77. Args:
  78. db_content (memoryview|buffer|bytes|bytearray|unicode)
  79. """
  80. # psycopg2 on Python 3 returns memoryview objects, which we need to
  81. # cast to bytes to decode
  82. if isinstance(db_content, memoryview):
  83. db_content = db_content.tobytes()
  84. # Decode it to a Unicode string before feeding it to the JSON decoder, since
  85. # Python 3.5 does not support deserializing bytes.
  86. if isinstance(db_content, (bytes, bytearray)):
  87. db_content = db_content.decode("utf8")
  88. try:
  89. return json_decoder.decode(db_content)
  90. except Exception:
  91. logging.warning("Tried to decode '%r' as JSON and failed", db_content)
  92. raise