Browse Source

Use fractions for ordering of chunks

Using floats turned out to be a bad idea, as it broke subtely if the
needed precision was too large. This PR replaces the implementation with
one that uses fractions and stores them in the database as two integers.
Erik Johnston 6 years ago
parent
commit
918a5055ff

+ 366 - 108
synapse/storage/chunk_ordered_table.py

@@ -16,9 +16,11 @@
 import math
 import logging
 
+from fractions import Fraction
+
 from synapse.storage._base import SQLBaseStore
+from synapse.storage.engines import PostgresEngine
 from synapse.util.katriel_bodlaender import OrderedListStore
-from synapse.util.metrics import Measure
 
 import synapse.metrics
 
@@ -59,11 +61,12 @@ class ChunkDBOrderedListStore(OrderedListStore):
     re-instantiated in each transaction, so all state needs to be stored
     in the database.
 
-    Internally the ordering is implemented using floats, and the average is
-    taken when a node is inserted between other nodes. To avoid precision
-    errors a minimum difference between sucessive orderings is attempted to be
-    kept; whenever the difference is too small we attempt to rebalance. See
-    the `_rebalance` function for implementation details.
+    Internally the ordering is implemented using a linked list and assigning
+    each chunk a fraction. `get_next` and `get_prev` are implemented via linked
+    lists, and comparisons implemented using the fractions. When inserting
+    chunks fractions are picked such that their denominator is the smallest
+    possible. However, if the denominators grow too big then a rebalancing has
+    to take place to reduce the denominators; see `_rebalance` for details.
 
     Note that OrderedListStore orders nodes such that source of an edge
     comes before the target. This is counter intuitive when edges represent
@@ -80,23 +83,24 @@ class ChunkDBOrderedListStore(OrderedListStore):
         txn
         room_id (str)
         clock
-        rebalance_digits (int): When a rebalance is triggered we rebalance
-            in a range around the node, where the bounds are rounded to this
-            number of digits.
-        min_difference (int): A rebalance is triggered when the difference
-            between two successive orderings is less than the reciprocal of
-            this.
+        database_engine
+        rebalance_max_denominator (int): When a rebalance is triggered we
+            replace existing orders with those that have a denominator smaller
+            or equal to this
+        max_denominator (int): A rebalance is triggered when a node has an
+            ordering with a denominator greater than this
     """
     def __init__(self,
-                 txn, room_id, clock,
-                 rebalance_digits=3,
-                 min_difference=1000000):
+                 txn, room_id, clock, database_engine,
+                 rebalance_max_denominator=100,
+                 max_denominator=100000):
         self.txn = txn
         self.room_id = room_id
         self.clock = clock
+        self.database_engine = database_engine
 
-        self.rebalance_digits = rebalance_digits
-        self.min_difference = 1. / min_difference
+        self.rebalance_md = rebalance_max_denominator
+        self.max_denominator = max_denominator
 
     def is_before(self, a, b):
         """Implements OrderedListStore"""
@@ -104,16 +108,13 @@ class ChunkDBOrderedListStore(OrderedListStore):
 
     def get_prev(self, node_id):
         """Implements OrderedListStore"""
-        order = self._get_order(node_id)
 
         sql = """
             SELECT chunk_id FROM chunk_linearized
-            WHERE ordering < ? AND room_id = ?
-            ORDER BY ordering DESC
-            LIMIT 1
+            WHERE next_chunk_id = ?
         """
 
-        self.txn.execute(sql, (order, self.room_id,))
+        self.txn.execute(sql, (node_id,))
 
         row = self.txn.fetchone()
         if row:
@@ -122,16 +123,13 @@ class ChunkDBOrderedListStore(OrderedListStore):
 
     def get_next(self, node_id):
         """Implements OrderedListStore"""
-        order = self._get_order(node_id)
 
         sql = """
-            SELECT chunk_id FROM chunk_linearized
-            WHERE ordering > ? AND room_id = ?
-            ORDER BY ordering ASC
-            LIMIT 1
+            SELECT next_chunk_id FROM chunk_linearized
+            WHERE chunk_id = ?
         """
 
-        self.txn.execute(sql, (order, self.room_id,))
+        self.txn.execute(sql, (node_id,))
 
         row = self.txn.fetchone()
         if row:
@@ -144,27 +142,26 @@ class ChunkDBOrderedListStore(OrderedListStore):
         rebalance = False  # Set to true if we need to trigger a rebalance
 
         if target_id:
-            target_order = self._get_order(target_id)
             before_id = self.get_prev(target_id)
-
             if before_id:
-                before_order = self._get_order(before_id)
-                new_order = (target_order + before_order) / 2.
-
-                rebalance = math.fabs(target_order - before_order) < self.min_difference
+                new_order = self._insert_between(node_id, before_id, target_id)
             else:
-                new_order = math.floor(target_order) - 1
+                new_order = self._insert_at_start(node_id, target_id)
         else:
             # If target_id is None then we insert at the end.
             self.txn.execute("""
-                SELECT COALESCE(MAX(ordering), 0) + 1
+                SELECT chunk_id
                 FROM chunk_linearized
-                WHERE room_id = ?
+                WHERE room_id = ? AND next_chunk_id is NULL
             """, (self.room_id,))
 
-            new_order, = self.txn.fetchone()
+            row = self.txn.fetchone()
+            if row:
+                new_order = self._insert_at_end(node_id, row[0])
+            else:
+                new_order = self._insert_first(node_id)
 
-        self._insert(node_id, new_order)
+        rebalance = new_order.denominator > self.max_denominator
 
         if rebalance:
             self._rebalance(node_id)
@@ -174,64 +171,204 @@ class ChunkDBOrderedListStore(OrderedListStore):
 
         rebalance = False  # Set to true if we need to trigger a rebalance
 
+        next_chunk_id = None
         if target_id:
-            target_order = self._get_order(target_id)
-            after_id = self.get_next(target_id)
-            if after_id:
-                after_order = self._get_order(after_id)
-                new_order = (target_order + after_order) / 2.
-
-                rebalance = math.fabs(target_order - after_order) < self.min_difference
+            next_chunk_id = self.get_next(target_id)
+            if next_chunk_id:
+                new_order = self._insert_between(node_id, target_id, next_chunk_id)
             else:
-                new_order = math.ceil(target_order) + 1
+                new_order = self._insert_at_end(node_id, target_id)
         else:
             # If target_id is None then we insert at the start.
             self.txn.execute("""
-                SELECT COALESCE(MIN(ordering), 0) - 1
+                SELECT chunk_id
                 FROM chunk_linearized
+                NATURAL JOIN chunk_linearized_first
                 WHERE room_id = ?
             """, (self.room_id,))
 
-            new_order, = self.txn.fetchone()
+            row = self.txn.fetchone()
+            if row:
+                new_order = self._insert_at_start(node_id, row[0])
+            else:
+                new_order = self._insert_first(node_id)
 
-        self._insert(node_id, new_order)
+        rebalance = new_order.denominator > self.max_denominator
 
         if rebalance:
             self._rebalance(node_id)
 
+    def _insert_between(self, node_id, left_id, right_id):
+        """Inserts node between given existing nodes.
+        """
+
+        left_order = self._get_order(left_id)
+        right_order = self._get_order(right_id)
+
+        assert left_order < right_order
+
+        new_order = get_fraction_in_range(left_order, right_order)
+
+        SQLBaseStore._simple_update_one_txn(
+            self.txn,
+            table="chunk_linearized",
+            keyvalues={"chunk_id": left_id},
+            updatevalues={"next_chunk_id": node_id},
+        )
+
+        SQLBaseStore._simple_insert_txn(
+            self.txn,
+            table="chunk_linearized",
+            values={
+                "chunk_id": node_id,
+                "room_id": self.room_id,
+                "next_chunk_id": right_id,
+                "numerator": int(new_order.numerator),
+                "denominator": int(new_order.denominator),
+            }
+        )
+
+        return new_order
+
+    def _insert_at_end(self, node_id, last_id):
+        """Inserts node at the end using existing last node.
+        """
+
+        last_order = self._get_order(last_id)
+        new_order = Fraction(int(math.ceil(last_order)) + 1, 1)
+
+        SQLBaseStore._simple_update_one_txn(
+            self.txn,
+            table="chunk_linearized",
+            keyvalues={"chunk_id": last_id},
+            updatevalues={"next_chunk_id": node_id},
+        )
+
+        SQLBaseStore._simple_insert_txn(
+            self.txn,
+            table="chunk_linearized",
+            values={
+                "chunk_id": node_id,
+                "room_id": self.room_id,
+                "next_chunk_id": None,
+                "numerator": int(new_order.numerator),
+                "denominator": int(new_order.denominator),
+            }
+        )
+
+        return new_order
+
+    def _insert_at_start(self, node_id, first_id):
+        """Inserts node at the start using existing first node.
+        """
+
+        first_order = self._get_order(first_id)
+        new_order = get_fraction_in_range(0, first_order)
+
+        SQLBaseStore._simple_update_one_txn(
+            self.txn,
+            table="chunk_linearized_first",
+            keyvalues={"room_id": self.room_id},
+            updatevalues={"chunk_id": node_id},
+        )
+
+        SQLBaseStore._simple_insert_txn(
+            self.txn,
+            table="chunk_linearized",
+            values={
+                "chunk_id": node_id,
+                "room_id": self.room_id,
+                "next_chunk_id": first_id,
+                "numerator": int(new_order.numerator),
+                "denominator": int(new_order.denominator),
+            }
+        )
+
+        return new_order
+
+    def _insert_first(self, node_id):
+        """Inserts the first node for this room.
+        """
+
+        SQLBaseStore._simple_insert_txn(
+            self.txn,
+            table="chunk_linearized_first",
+            values={
+                "room_id": self.room_id,
+                "chunk_id": node_id,
+            },
+        )
+
+        SQLBaseStore._simple_insert_txn(
+            self.txn,
+            table="chunk_linearized",
+            values={
+                "chunk_id": node_id,
+                "room_id": self.room_id,
+                "next_chunk_id": None,
+                "numerator": 1,
+                "denominator": 1,
+            }
+        )
+
+        return Fraction(1, 1)
+
     def get_nodes_with_edges_to(self, node_id):
         """Implements OrderedListStore"""
 
         # Note that we use the inverse relation here
         sql = """
-            SELECT l.ordering, l.chunk_id FROM chunk_graph AS g
+            SELECT l.chunk_id, l.numerator, l.denominator FROM chunk_graph AS g
             INNER JOIN chunk_linearized AS l ON g.prev_id = l.chunk_id
             WHERE g.chunk_id = ?
         """
         self.txn.execute(sql, (node_id,))
-        return self.txn.fetchall()
+        return [(Fraction(n, d), c) for c, n, d in self.txn]
 
     def get_nodes_with_edges_from(self, node_id):
         """Implements OrderedListStore"""
 
         # Note that we use the inverse relation here
         sql = """
-            SELECT l.ordering, l.chunk_id FROM chunk_graph AS g
+            SELECT  l.chunk_id, l.numerator, l.denominator FROM chunk_graph AS g
             INNER JOIN chunk_linearized AS l ON g.chunk_id = l.chunk_id
             WHERE g.prev_id = ?
         """
         self.txn.execute(sql, (node_id,))
-        return self.txn.fetchall()
+        return [(Fraction(n, d), c) for c, n, d in self.txn]
 
     def _delete_ordering(self, node_id):
         """Implements OrderedListStore"""
 
+        next_chunk_id = SQLBaseStore._simple_select_one_onecol_txn(
+            self.txn,
+            table="chunk_linearized",
+            keyvalues={
+                "chunk_id": node_id,
+            },
+            retcol="next_chunk_id",
+        )
+
         SQLBaseStore._simple_delete_txn(
             self.txn,
             table="chunk_linearized",
             keyvalues={"chunk_id": node_id},
         )
 
+        sql = """
+            UPDATE chunk_linearized SET next_chunk_id = ?
+            WHERE next_chunk_id = ?
+        """
+
+        self.txn.execute(sql, (next_chunk_id, node_id,))
+
+        sql = """
+            UPDATE chunk_linearized_first SET chunk_id = ?
+            WHERE chunk_id = ?
+        """
+
+        self.txn.execute(sql, (next_chunk_id, node_id,))
+
     def _add_edge_to_graph(self, source_id, target_id):
         """Implements OrderedListStore"""
 
@@ -242,78 +379,199 @@ class ChunkDBOrderedListStore(OrderedListStore):
             values={"chunk_id": target_id, "prev_id": source_id}
         )
 
-    def _insert(self, node_id, order):
-        """Inserts the node with the given ordering.
-        """
-        SQLBaseStore._simple_insert_txn(
-            self.txn,
-            table="chunk_linearized",
-            values={
-                "chunk_id": node_id,
-                "room_id": self.room_id,
-                "ordering": order,
-            }
-        )
-
     def _get_order(self, node_id):
         """Get the ordering of the given node.
         """
 
-        return SQLBaseStore._simple_select_one_onecol_txn(
+        row = SQLBaseStore._simple_select_one_txn(
             self.txn,
             table="chunk_linearized",
             keyvalues={"chunk_id": node_id},
-            retcol="ordering"
+            retcols=("numerator", "denominator",),
         )
+        return Fraction(row["numerator"], row["denominator"])
 
     def _rebalance(self, node_id):
         """Rebalances the list around the given node to ensure that the
-        ordering floats don't get too small.
+        ordering denominators aren't too big.
+
+        This is done by starting at the given chunk and generating new orders
+        based on a Farey sequence of order `self.rebalance_md` for all
+        subsequent chunks that have an order less than that of the ordering
+        generated by the Farey sequence.
+
+        For example say we have chunks (and orders): A (23/90),  B (24/91) and
+        C (2/3), and we have rebalance_md set to 5, a rebalancing would produce:
+
+            A: 23/90 -> 1/3
+            B: 24/91 -> 2/5
+            C: 2/3  (no change)
 
-        This works by finding a range that includes the given node, and
-        recalculating the ordering floats such that they're equidistant in
-        that range.
+        Since the farey sequence is 1/5, 1/4, 1/3, 2/5, 1/2, ... and 1/3 is the
+        smallest term greater than 23/90.
+
+        Note that we've extended Farey Sequence to be infinite by repeating the
+        sequence with an added integer. For example sequence with order 3:
+
+            0/1, 1/3, 2/3, 1/1, 4/3, 5/3, 2/1, 7/3, ...
         """
 
         logger.info("Rebalancing room %s, chunk %s", self.room_id, node_id)
 
-        with Measure(self.clock, "chunk_rebalance"):
-            # We pick the interval to try and minimise the number of decimal
-            # places, i.e. we round to nearest float with `rebalance_digits` and
-            # use that as one side of the interval
-            order = self._get_order(node_id)
-            a = round(order, self.rebalance_digits)
-            min_order = a - 10 ** -self.rebalance_digits
-            max_order = a + 10 ** -self.rebalance_digits
-
-            # Now we get all the nodes in the range. We add the minimum difference
-            # to the bounds to ensure that we don't accidentally move a node to be
-            # within the minimum difference of a node outside the range.
-            sql = """
-                SELECT chunk_id FROM chunk_linearized
-                WHERE ordering >= ? AND ordering <= ? AND room_id = ?
-            """
-            self.txn.execute(sql, (
-                min_order - self.min_difference,
-                max_order + self.min_difference,
-                self.room_id,
-            ))
-
-            chunk_ids = [c for c, in self.txn]
+        old_order = self._get_order(node_id)
+
+        a, b, c, d = find_farey_terms(old_order, self.rebalance_md)
+        assert old_order < Fraction(a, b)
+        assert b + d > self.rebalance_md
+
+        # Since we can easily produce farey sequence terms with an iterative
+        # algorithm, we can use WITH RECURSIVE to do so. This is less clear
+        # than doing it in python, but saves us being killed by the RTT to the
+        # DB if we need to rebalance a large number of nodes.
+        with_sql = """
+            WITH RECURSIVE chunks (chunk_id, next, n, a, b, c, d) AS (
+                    SELECT chunk_id, next_chunk_id, ?, ?, ?, ?, ?
+                    FROM chunk_linearized WHERE chunk_id = ?
+                UNION ALL
+                    SELECT n.chunk_id, n.next_chunk_id, n,
+                    c, d, ((n + b) / d) * c - a, ((n + b) / d) * d - b
+                    FROM chunks AS c
+                    INNER JOIN chunk_linearized AS l ON l.chunk_id = c.chunk_id
+                    INNER JOIN chunk_linearized AS n ON n.chunk_id = l.next_chunk_id
+                    WHERE c * 1.0 / d > n.numerator * 1.0 / n.denominator
+            )
+        """
 
-            sql = """
+        # Annoyingly, postgres 9.4 doesn't support the standard SQL subquery
+        # syntax for updates.
+        if isinstance(self.database_engine, PostgresEngine):
+            sql = with_sql + """
+                UPDATE chunk_linearized AS l
+                SET numerator = a, denominator = b
+                FROM chunks AS c
+                WHERE c.chunk_id = l.chunk_id
+            """
+        else:
+            sql = with_sql + """
                 UPDATE chunk_linearized
-                SET ordering = ?
-                WHERE chunk_id = ?
+                SET (numerator, denominator) = (
+                    SELECT a, b FROM chunks
+                    WHERE chunks.chunk_id = chunk_linearized.chunk_id
+                )
+                WHERE chunk_id in (SELECT chunk_id FROM chunks)
             """
 
-            step = (max_order - min_order) / len(chunk_ids)
-            self.txn.executemany(
-                sql,
-                (
-                    ((idx * step + min_order), chunk_id)
-                    for idx, chunk_id in enumerate(chunk_ids)
-                )
-            )
+        self.txn.execute(sql, (
+            self.rebalance_md, a, b, c, d, node_id
+        ))
+
+        logger.info("Rebalanced %d chunks in room %s", self.txn.rowcount, self.room_id)
+
+        rebalance_counter.inc()
+
+
+def get_fraction_in_range(min_frac, max_frac):
+    """Gets a fraction in between the given numbers.
+
+    Uses Stern-Brocot tree to generate the fraction with the smallest
+    denominator.
+
+    See https://en.wikipedia.org/wiki/Stern%E2%80%93Brocot_tree
+
+    Args:
+        min_frac (numbers.Rational)
+        max_frac (numbers.Rational)
+
+    Returns:
+        numbers.Rational
+    """
+
+    assert 0 <= min_frac < max_frac
+
+    # If the determinant is 1 then the fraction with smallest numerator and
+    # denominator in the range is the mediant, so we don't have to use the
+    # stern brocot tree to search for it.
+    determinant = (
+        min_frac.denominator * max_frac.numerator
+        - min_frac.numerator * max_frac.denominator
+    )
+
+    if determinant == 1:
+        return Fraction(
+            min_frac.numerator + max_frac.numerator,
+            min_frac.denominator + max_frac.denominator,
+        )
+
+    # This works by tracking two fractions a/b and c/d and repeatedly replacing
+    # one of them with their mediant, depending on if the mediant is smaller
+    # or greater than the specified range.
+    a, b, c, d = 0, 1, 1, 0
+
+    while True:
+        f = Fraction(a + c, b + d)
+
+        if f <= min_frac:
+            a, b, c, d = a + c, b + d, c, d
+        elif min_frac < f < max_frac:
+            return f
+        else:
+            a, b, c, d = a, b, a + c, b + d
+
+
+def find_farey_terms(min_frac, max_denom):
+    """Find the smallest pair of fractions that are part of the Farey sequence
+    of order `max_denom` (the ordered sequence of all fraction with denominator
+    less than or equal to max_denom).
+
+    This is useful as it can be fed into a simple iterative algorithm to
+    generate subsequent entries in the sequence.
+
+    A pair of fractions a/b, c/d are neighbours in the sequence of order
+    max(b, d) if and only if their determinant is one, i.e. bc - ad = 1. Note
+    that the next order sequence is generate by taking the mediants of the
+    previous order, so a/b and c/d are neighbours in all sequences with orders
+    between max(b, d) and b + d.
+
+    We can therefore use the Stern-Brocot tree to find the closest pair of
+    fractions to min_frac such that b + d is strictly greater than max_denom,
+    since all neighbouring fractions in Stern-Brocot satisfy the necessary
+    determinant property.
+
+    Note that we've extended Farey Sequence to be infinite by repeating the
+    sequence with an added integer. For example sequence with order 3:
+
+        0/1, 1/3, 2/3, 1/1, 4/3, 5/3, 2/1, 7/3, ...
+
+    See https://en.wikipedia.org/wiki/Farey_sequence
+
+    Args:
+        min_frac (numbers.Rational)
+        max_frac (int)
+
+    Returns:
+        tuple[int, int, int, int]
+    """
+
+    a, b, c, d = 0, 1, 1, 0
+
+    while True:
+        cur_frac = Fraction(a + c, b + d)
+
+        if b + d > max_denom:
+            break
+
+        if cur_frac <= min_frac:
+            a, b, c, d = a + c, b + d, c, d
+        elif min_frac < cur_frac:
+            a, b, c, d = a, b, a + c, b + d
+
+    # a/b may be smaller than min_frac, so we run the algorithm to generate
+    # next Farey sequence terms until a/b is strictly greater than min_frac
+    while Fraction(a, b) <= min_frac:
+        k = int((max_denom + b) / d)
+        a, b, c, d = c, d, k * c - a, k * d - b
+
+    assert min_frac < Fraction(a, b) < Fraction(c, d)
+    assert b * c - a * d == 1
 
-            rebalance_counter.inc()
+    return a, b, c, d

+ 1 - 1
synapse/storage/events.py

@@ -1446,7 +1446,7 @@ class EventsStore(EventsWorkerStore):
             sibling_events.update(pes)
 
         table = ChunkDBOrderedListStore(
-            txn, room_id, self.clock,
+            txn, room_id, self.clock, self.database_engine,
         )
 
         # If there is only one previous chunk (and that isn't None), then this

+ 39 - 8
synapse/storage/schema/delta/49/event_chunks.py

@@ -53,11 +53,23 @@ CREATE INDEX chunk_backwards_extremities_event_id ON chunk_backwards_extremities
 CREATE TABLE chunk_linearized (
     chunk_id BIGINT NOT NULL,
     room_id TEXT NOT NULL,
-    ordering DOUBLE PRECISION NOT NULL
+    next_chunk_id BIGINT,  -- The chunk directly after this chunk, or NULL if last chunk
+    numerator BIGINT NOT NULL,
+    denominator BIGINT NOT NULL
 );
 
 CREATE UNIQUE INDEX chunk_linearized_id ON chunk_linearized (chunk_id);
-CREATE INDEX chunk_linearized_ordering ON chunk_linearized (room_id, ordering);
+CREATE UNIQUE INDEX chunk_linearized_next_id ON chunk_linearized (
+    next_chunk_id, room_id
+);
+
+-- Records the first chunk in a room.
+CREATE TABLE chunk_linearized_first (
+    chunk_id BIGINT NOT NULL,
+    room_id TEXT NOT NULL
+);
+
+CREATE UNIQUE INDEX chunk_linearized_first_id ON chunk_linearized_first (room_id);
 
 INSERT into background_updates (update_name, progress_json)
     VALUES ('event_fields_chunk_id', '{}');
@@ -69,10 +81,6 @@ def run_create(cur, database_engine, *args, **kwargs):
     for statement in get_statements(SQL.splitlines()):
         cur.execute(statement)
 
-    # We now go through and assign chunk IDs for all forward extremities.
-    # Note that we know that extremities can't reference each other, so we
-    # can simply assign each event a new chunk ID with an arbitrary order.
-
     txn = LoggingTransaction(
         cur, "schema_update", database_engine, [], [],
     )
@@ -86,6 +94,7 @@ def run_create(cur, database_engine, *args, **kwargs):
 
     next_chunk_id = 1
     room_to_next_order = {}
+    prev_chunks_by_room = {}
 
     for row in rows:
         chunk_id = next_chunk_id
@@ -101,19 +110,41 @@ def run_create(cur, database_engine, *args, **kwargs):
             updatevalues={"chunk_id": chunk_id},
         )
 
-        ordering = room_to_next_order.get(room_id, 0)
+        ordering = room_to_next_order.get(room_id, 1)
         room_to_next_order[room_id] = ordering + 1
 
+        prev_chunks = prev_chunks_by_room.setdefault(room_id, [])
+
         SQLBaseStore._simple_insert_txn(
             txn,
             table="chunk_linearized",
             values={
                 "chunk_id": chunk_id,
                 "room_id": row["room_id"],
-                "ordering": 0,
+                "numerator": ordering,
+                "denominator": 1,
             },
         )
 
+        if prev_chunks:
+            SQLBaseStore._simple_update_one_txn(
+                txn,
+                table="chunk_linearized",
+                keyvalues={"chunk_id": prev_chunks[-1]},
+                updatevalues={"next_chunk_id": chunk_id},
+            )
+        else:
+            SQLBaseStore._simple_insert_txn(
+                txn,
+                table="chunk_linearized_first",
+                values={
+                    "chunk_id": chunk_id,
+                    "room_id": row["room_id"],
+                },
+            )
+
+        prev_chunks.append(chunk_id)
+
 
 def run_upgrade(*args, **kwargs):
     pass

+ 1 - 1
synapse/storage/stream.py

@@ -792,7 +792,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
                 iterated_chunks = [chunk_id]
 
         table = ChunkDBOrderedListStore(
-            txn, room_id, self.clock,
+            txn, room_id, self.clock, self.database_engine,
         )
 
         if filter_clause:

+ 135 - 18
tests/storage/test_chunk_linearizer_table.py

@@ -15,11 +15,16 @@
 
 from twisted.internet import defer
 
+import itertools
 import random
 import tests.unittest
 import tests.utils
 
-from synapse.storage.chunk_ordered_table import ChunkDBOrderedListStore
+from fractions import Fraction
+
+from synapse.storage.chunk_ordered_table import (
+    ChunkDBOrderedListStore, find_farey_terms, get_fraction_in_range,
+)
 
 
 class ChunkLinearizerStoreTestCase(tests.unittest.TestCase):
@@ -42,23 +47,26 @@ class ChunkLinearizerStoreTestCase(tests.unittest.TestCase):
 
         def test_txn(txn):
             table = ChunkDBOrderedListStore(
-                txn, room_id, self.clock, 1, 100,
+                txn, room_id, self.clock,
+                self.store.database_engine,
+                5, 100,
             )
 
             table.add_node("A")
             table._insert_after("B", "A")
             table._insert_before("C", "A")
+            table._insert_after("D", "A")
 
             sql = """
-                SELECT chunk_id FROM chunk_linearized
+                SELECT chunk_id, numerator, denominator FROM chunk_linearized
                 WHERE room_id = ?
-                ORDER BY ordering ASC
             """
             txn.execute(sql, (room_id,))
 
-            ordered = [r for r, in txn]
+            ordered = sorted([(Fraction(n, d), r) for r, n, d in txn])
+            ordered = [c for _, c in ordered]
 
-            self.assertEqual(["C", "A", "B"], ordered)
+            self.assertEqual(["C", "A", "D", "B"], ordered)
 
         yield self.store.runInteraction("test", test_txn)
 
@@ -68,7 +76,9 @@ class ChunkLinearizerStoreTestCase(tests.unittest.TestCase):
 
         def test_txn(txn):
             table = ChunkDBOrderedListStore(
-                txn, room_id, self.clock, 1, 20,
+                txn, room_id, self.clock,
+                self.store.database_engine,
+                5, 100,
             )
 
             nodes = [(i, "node_%d" % (i,)) for i in xrange(1, 1000)]
@@ -95,13 +105,13 @@ class ChunkLinearizerStoreTestCase(tests.unittest.TestCase):
                 already_inserted.sort()
 
             sql = """
-                SELECT chunk_id FROM chunk_linearized
+                SELECT chunk_id, numerator, denominator FROM chunk_linearized
                 WHERE room_id = ?
-                ORDER BY ordering ASC
             """
             txn.execute(sql, (room_id,))
 
-            ordered = [r for r, in txn]
+            ordered = sorted([(Fraction(n, d), r) for r, n, d in txn])
+            ordered = [c for _, c in ordered]
 
             self.assertEqual(expected, ordered)
 
@@ -113,7 +123,9 @@ class ChunkLinearizerStoreTestCase(tests.unittest.TestCase):
 
         def test_txn(txn):
             table = ChunkDBOrderedListStore(
-                txn, room_id, self.clock, 1, 20,
+                txn, room_id, self.clock,
+                self.store.database_engine,
+                5, 1000,
             )
 
             table.add_node("a")
@@ -131,13 +143,13 @@ class ChunkLinearizerStoreTestCase(tests.unittest.TestCase):
                 expected.append(node_id)
 
             sql = """
-                SELECT chunk_id FROM chunk_linearized
+                SELECT chunk_id, numerator, denominator FROM chunk_linearized
                 WHERE room_id = ?
-                ORDER BY ordering ASC
             """
             txn.execute(sql, (room_id,))
 
-            ordered = [r for r, in txn]
+            ordered = sorted([(Fraction(n, d), r) for r, n, d in txn])
+            ordered = [c for _, c in ordered]
 
             self.assertEqual(expected, ordered)
 
@@ -149,7 +161,9 @@ class ChunkLinearizerStoreTestCase(tests.unittest.TestCase):
 
         def test_txn(txn):
             table = ChunkDBOrderedListStore(
-                txn, room_id, self.clock, 1, 100,
+                txn, room_id, self.clock,
+                self.store.database_engine,
+                5, 100,
             )
 
             table.add_node("a")
@@ -170,16 +184,119 @@ class ChunkLinearizerStoreTestCase(tests.unittest.TestCase):
                 prev_node = node_id
 
             sql = """
-                SELECT chunk_id FROM chunk_linearized
+                SELECT chunk_id, numerator, denominator FROM chunk_linearized
                 WHERE room_id = ?
-                ORDER BY ordering ASC
             """
             txn.execute(sql, (room_id,))
 
-            ordered = [r for r, in txn]
+            ordered = sorted([(Fraction(n, d), r) for r, n, d in txn])
+            ordered = [c for _, c in ordered]
 
             expected = expected_prefix + list(reversed(expected_suffix))
 
             self.assertEqual(expected, ordered)
 
         yield self.store.runInteraction("test", test_txn)
+
+    @defer.inlineCallbacks
+    def test_get_edges_to(self):
+        room_id = "foo_room4"
+
+        def test_txn(txn):
+            table = ChunkDBOrderedListStore(
+                txn, room_id, self.clock,
+                self.store.database_engine,
+                5, 100,
+            )
+
+            table.add_node("A")
+            table._insert_after("B", "A")
+            table._add_edge_to_graph("A", "B")
+            table._insert_before("C", "A")
+            table._add_edge_to_graph("C", "A")
+
+            nodes = table.get_nodes_with_edges_from("A")
+            self.assertEqual([n for _, n in nodes], ["B"])
+
+            nodes = table.get_nodes_with_edges_to("A")
+            self.assertEqual([n for _, n in nodes], ["C"])
+
+        yield self.store.runInteraction("test", test_txn)
+
+    @defer.inlineCallbacks
+    def test_get_next_and_prev(self):
+        room_id = "foo_room5"
+
+        def test_txn(txn):
+            table = ChunkDBOrderedListStore(
+                txn, room_id, self.clock,
+                self.store.database_engine,
+                5, 100,
+            )
+
+            table.add_node("A")
+            table._insert_after("B", "A")
+            table._insert_before("C", "A")
+
+            self.assertEqual(table.get_next("A"), "B")
+            self.assertEqual(table.get_prev("A"), "C")
+
+        yield self.store.runInteraction("test", test_txn)
+
+    def test_find_farey_terms(self):
+        def _test(min_frac, max_denom):
+            """"Calls `find_farey_terms` with given values and checks they
+            are neighbours in the Farey Sequence.
+            """
+
+            a, b, c, d = find_farey_terms(min_frac, max_denom)
+
+            p = Fraction(a, b)
+            q = Fraction(c, d)
+
+            assert min_frac < p < q
+
+            for x, y in _pairwise(_farey_generator(max_denom)):
+                if min_frac < x < y:
+                    self.assertEqual(x, p)
+                    self.assertEqual(y, q)
+                    break
+
+        _test(Fraction(5, 3), 12)
+        _test(Fraction(1, 3), 12)
+        _test(Fraction(1, 2), 9)
+        _test(Fraction(1, 2), 10)
+        _test(Fraction(1, 2), 15)
+
+    def test_get_fraction_in_range(self):
+        def _test(x, y):
+            assert x < get_fraction_in_range(x, y) < y
+
+        _test(Fraction(1, 2), Fraction(2, 3))
+        _test(Fraction(1, 2), Fraction(3, 2))
+        _test(Fraction(5, 203), Fraction(6, 204))
+
+
+def _farey_generator(n):
+    """Generates Farey sequence of order `n`.
+
+    Note that this doesn't terminate.
+
+    Taken from https://en.wikipedia.org/wiki/Farey_sequence#Next_term
+    """
+
+    a, b, c, d = 0, 1, 1, n
+
+    yield Fraction(a, b)
+
+    while True:
+        k = int((n + b) / d)
+        a, b, c, d = c, d, (k * c - a), (k * d - b)
+        yield Fraction(a, b)
+
+
+def _pairwise(iterable):
+    "s -> (s0,s1), (s1,s2), (s2, s3), ..."
+    a, b = itertools.tee(iterable)
+    next(b, None)
+    return itertools.izip(a, b)

+ 26 - 0
tests/util/test_katriel_bodlaender.py

@@ -56,3 +56,29 @@ class KatrielBodlaenderTests(unittest.TestCase):
         store.add_edge("node_4", "node_3")
 
         self.assertEqual(list(reversed(nodes)), store.list)
+
+    def test_divergent_graph(self):
+        store = InMemoryOrderedListStore()
+
+        nodes = [
+            "node_1",
+            "node_2",
+            "node_3",
+            "node_4",
+            "node_5",
+            "node_6",
+        ]
+
+        for node in reversed(nodes):
+            store.add_node(node)
+
+        store.add_edge("node_2", "node_3")
+        store.add_edge("node_2", "node_5")
+        store.add_edge("node_1", "node_2")
+        store.add_edge("node_3", "node_4")
+        store.add_edge("node_1", "node_3")
+        store.add_edge("node_4", "node_5")
+        store.add_edge("node_5", "node_6")
+        store.add_edge("node_4", "node_6")
+
+        self.assertEqual(nodes, store.list)