Browse Source

use farey function

Erik Johnston 6 years ago
parent
commit
1205362f1d
2 changed files with 90 additions and 73 deletions
  1. 72 67
      synapse/storage/chunk_ordered_table.py
  2. 18 6
      tests/storage/test_chunk_linearizer_table.py

+ 72 - 67
synapse/storage/chunk_ordered_table.py

@@ -21,6 +21,7 @@ from gmpy2 import mpq as Fraction
 from fractions import Fraction as FractionPy
 
 from synapse.storage._base import SQLBaseStore
+from synapse.storage.engines import PostgresEngine
 from synapse.util.katriel_bodlaender import OrderedListStore
 
 import synapse.metrics
@@ -91,12 +92,13 @@ class ChunkDBOrderedListStore(OrderedListStore):
             this.
     """
     def __init__(self,
-                 txn, room_id, clock,
+                 txn, room_id, clock, database_engine,
                  rebalance_max_denominator=100,
                  max_denominator=100000):
         self.txn = txn
         self.room_id = room_id
         self.clock = clock
+        self.database_engine = database_engine
 
         self.rebalance_md = rebalance_max_denominator
         self.max_denominator = max_denominator
@@ -390,69 +392,43 @@ class ChunkDBOrderedListStore(OrderedListStore):
         logger.info("Rebalancing room %s, chunk %s", self.room_id, node_id)
 
         old_order = self._get_order(node_id)
-        new_order = FractionPy(
-            int(old_order.numerator),
-            int(old_order.denominator),
-        ).limit_denominator(
-            self.rebalance_md,
-        )
-        new_order = Fraction(new_order.numerator, new_order.denominator)
-        if new_order < old_order:
-            new_order += Fraction(1, new_order.denominator)
-
-        count_nodes = [node_id]
-        next_id = node_id
-        while True:
-            next_id = self.get_next(next_id)
-
-            if not next_id:
-                max_order = None
-                break
-
-            count_nodes.append(next_id)
-
-            max_order = self._get_order(next_id)
-
-            if len(count_nodes) < self.rebalance_md * (max_order - new_order):
-                break
-
-        if len(count_nodes) == 1:
-            orders = [new_order]
-        if max_order:
-            orders = stern_brocot_range(len(count_nodes), new_order, max_order)
-            orders.sort(reverse=True)
-        else:
-            orders = [
-                Fraction(int(math.ceil(new_order)) + i, 1)
-                for i in xrange(0, len(count_nodes))
-            ]
-            orders.reverse()
 
-        assert len(count_nodes) == len(orders)
-
-        next_id = node_id
-        prev_order = old_order
-        while orders:
-            order = orders.pop()
-
-            if max_order:
-                assert old_order <= new_order <= max_order
-            else:
-                assert old_order <= new_order
-
-            assert prev_order < order
-
-            SQLBaseStore._simple_update_txn(
-                self.txn,
-                table="chunk_linearized",
-                keyvalues={"chunk_id": next_id},
-                updatevalues={
-                    "numerator": int(order.numerator),
-                    "denominator": int(order.denominator),
-                },
+        a, b, c, d = find_farey_terms(old_order, self.rebalance_md)
+        n = max(b, d)
+
+        with_sql = """
+            WITH RECURSIVE chunks (chunk_id, next, n, a, b, c, d) AS (
+                    SELECT chunk_id, next_chunk_id, ?, ?, ?, ?, ?
+                    FROM chunk_linearized WHERE chunk_id = ?
+                UNION ALL
+                    SELECT n.chunk_id, n.next_chunk_id, n, c, d, ((n + b) / d) * c - a, ((n + b) / d) * d - b
+                    FROM chunks AS c
+                    INNER JOIN chunk_linearized AS l ON l.chunk_id = c.chunk_id
+                    INNER JOIN chunk_linearized AS n ON n.chunk_id = l.next_chunk_id
+                    WHERE c * 1.0 / d > n.numerator * 1.0 / n.denominator
             )
+        """
 
-            next_id = self.get_next(next_id)
+        if isinstance(self.database_engine, PostgresEngine):
+            sql = with_sql + """
+                UPDATE chunk_linearized AS l
+                SET numerator = a, denominator = b
+                FROM chunks AS c
+                WHERE c.chunk_id = l.chunk_id
+            """
+        else:
+            sql = with_sql + """
+                UPDATE chunk_linearized
+                SET (numerator, denominator) = (
+                    SELECT a, b FROM chunks
+                    WHERE chunks.chunk_id = chunk_linearized.chunk_id
+                )
+                WHERE chunk_id in (SELECT chunk_id FROM chunks)
+            """
+
+        self.txn.execute(sql, (
+            n, a, b, c, d, node_id
+        ))
 
         rebalance_counter.inc()
 
@@ -512,7 +488,7 @@ def stern_brocot_single(min_frac, max_frac):
 def stern_brocot_range_depth(min_frac, max_denom):
     assert 0 < min_frac
 
-    states = stern_brocot_single(min_frac)
+    states = stern_brocot_singless(min_frac)
 
     while len(states):
         a, b, c, d = states.pop()
@@ -534,22 +510,51 @@ def stern_brocot_range_depth(min_frac, max_denom):
 
 
 
-def stern_brocot_single(min_frac):
+def stern_brocot_state(min_frac, target_d):
     assert 0 <= min_frac
 
     states = []
 
     a, b, c, d = 0, 1, 1, 0
 
-    states.append((a, b, c, d))
-
     while True:
         f = Fraction(a + c, b + d)
+
+        if b + d >= target_d:
+            return a + c, b + d, c, d
+
         if f < min_frac:
             a, b, c, d = a + c, b + d, c, d
         elif f == min_frac:
-            return states
+            return a + c, b + d, c, d
         else:
             a, b, c, d = a, b, a + c, b + d
 
-        states.append((a, b, c, d))
+
+def find_farey_terms(min_frac, max_denom):
+    states = deque([(0, 1, 1, 0)])
+
+    while True:
+        a, b, c, d = states.popleft()
+
+        left = a / float(b)
+        mid = (a + c) / float(b + d)
+        right = c / float(d) if d > 0 else None
+
+        if min_frac < left:
+            if b >= max_denom or d >= max_denom:
+                return a, b, c, d
+            if b + d  >= max_denom:
+                return a + c, b + d, c, d
+
+            states.append((a, b, a + c, b + d))
+        elif min_frac < mid:
+            if b + d  >= max_denom:
+                return a + c, b + d, c, d
+
+            states.append((a, b, a + c, b + d))
+            states.append((a + c, b + d, c, d))
+        elif right is None or min_frac < right:
+            states.append((a + c, b + d, c, d))
+        else:
+            states.append((a + c, b + d, c, d))

+ 18 - 6
tests/storage/test_chunk_linearizer_table.py

@@ -44,7 +44,9 @@ class ChunkLinearizerStoreTestCase(tests.unittest.TestCase):
 
         def test_txn(txn):
             table = ChunkDBOrderedListStore(
-                txn, room_id, self.clock, 5, 100,
+                txn, room_id, self.clock,
+                self.store.database_engine,
+                5, 100,
             )
 
             table.add_node("A")
@@ -71,7 +73,9 @@ class ChunkLinearizerStoreTestCase(tests.unittest.TestCase):
 
         def test_txn(txn):
             table = ChunkDBOrderedListStore(
-                txn, room_id, self.clock, 5, 100,
+                txn, room_id, self.clock,
+                self.store.database_engine,
+                5, 100,
             )
 
             nodes = [(i, "node_%d" % (i,)) for i in xrange(1, 1000)]
@@ -116,7 +120,9 @@ class ChunkLinearizerStoreTestCase(tests.unittest.TestCase):
 
         def test_txn(txn):
             table = ChunkDBOrderedListStore(
-                txn, room_id, self.clock, 5, 1000,
+                txn, room_id, self.clock,
+                self.store.database_engine,
+                5, 1000,
             )
 
             table.add_node("a")
@@ -152,7 +158,9 @@ class ChunkLinearizerStoreTestCase(tests.unittest.TestCase):
 
         def test_txn(txn):
             table = ChunkDBOrderedListStore(
-                txn, room_id, self.clock, 5, 100,
+                txn, room_id, self.clock,
+                self.store.database_engine,
+                5, 100,
             )
 
             table.add_node("a")
@@ -193,7 +201,9 @@ class ChunkLinearizerStoreTestCase(tests.unittest.TestCase):
 
         def test_txn(txn):
             table = ChunkDBOrderedListStore(
-                txn, room_id, self.clock, 5, 100,
+                txn, room_id, self.clock,
+                self.store.database_engine,
+                5, 100,
             )
 
             table.add_node("A")
@@ -216,7 +226,9 @@ class ChunkLinearizerStoreTestCase(tests.unittest.TestCase):
 
         def test_txn(txn):
             table = ChunkDBOrderedListStore(
-                txn, room_id, self.clock, 5, 100,
+                txn, room_id, self.clock,
+                self.store.database_engine,
+                5, 100,
             )
 
             table.add_node("A")