|
@@ -214,6 +214,10 @@ class Porter(object):
|
|
|
|
|
|
self.progress.add_table(table, postgres_size, table_size)
|
|
|
|
|
|
+ if table == "event_search":
|
|
|
+ yield self.handle_search_table(postgres_size, table_size, next_chunk)
|
|
|
+ return
|
|
|
+
|
|
|
select = (
|
|
|
"SELECT rowid, * FROM %s WHERE rowid >= ? ORDER BY rowid LIMIT ?"
|
|
|
% (table,)
|
|
@@ -232,60 +236,95 @@ class Porter(object):
|
|
|
if rows:
|
|
|
next_chunk = rows[-1][0] + 1
|
|
|
|
|
|
- if table == "event_search":
|
|
|
- # We have to treat event_search differently since it has a
|
|
|
- # different structure in the two different databases.
|
|
|
- def insert(txn):
|
|
|
- sql = (
|
|
|
- "INSERT INTO event_search (event_id, room_id, key, sender, vector)"
|
|
|
- " VALUES (?,?,?,?,to_tsvector('english', ?))"
|
|
|
- )
|
|
|
+ self._convert_rows(table, headers, rows)
|
|
|
|
|
|
- rows_dict = [
|
|
|
- dict(zip(headers, row))
|
|
|
- for row in rows
|
|
|
- ]
|
|
|
-
|
|
|
- txn.executemany(sql, [
|
|
|
- (
|
|
|
- row["event_id"],
|
|
|
- row["room_id"],
|
|
|
- row["key"],
|
|
|
- row["sender"],
|
|
|
- row["value"],
|
|
|
- )
|
|
|
- for row in rows_dict
|
|
|
- ])
|
|
|
-
|
|
|
- self.postgres_store._simple_update_one_txn(
|
|
|
- txn,
|
|
|
- table="port_from_sqlite3",
|
|
|
- keyvalues={"table_name": table},
|
|
|
- updatevalues={"rowid": next_chunk},
|
|
|
- )
|
|
|
- else:
|
|
|
- self._convert_rows(table, headers, rows)
|
|
|
+ def insert(txn):
|
|
|
+ self.postgres_store.insert_many_txn(
|
|
|
+ txn, table, headers[1:], rows
|
|
|
+ )
|
|
|
|
|
|
- def insert(txn):
|
|
|
- self.postgres_store.insert_many_txn(
|
|
|
- txn, table, headers[1:], rows
|
|
|
- )
|
|
|
+ self.postgres_store._simple_update_one_txn(
|
|
|
+ txn,
|
|
|
+ table="port_from_sqlite3",
|
|
|
+ keyvalues={"table_name": table},
|
|
|
+ updatevalues={"rowid": next_chunk},
|
|
|
+ )
|
|
|
+
|
|
|
+ yield self.postgres_store.execute(insert)
|
|
|
+
|
|
|
+ postgres_size += len(rows)
|
|
|
+
|
|
|
+ self.progress.update(table, postgres_size)
|
|
|
+ else:
|
|
|
+ return
|
|
|
+
|
|
|
+ @defer.inlineCallbacks
|
|
|
+ def handle_search_table(self, postgres_size, table_size, next_chunk):
|
|
|
+ select = (
|
|
|
+ "SELECT es.rowid, es.*, e.origin_server_ts, e.stream_ordering"
|
|
|
+ " FROM event_search as es"
|
|
|
+ " INNER JOIN events AS e USING (event_id, room_id)"
|
|
|
+ " WHERE es.rowid >= ?"
|
|
|
+ " ORDER BY es.rowid LIMIT ?"
|
|
|
+ )
|
|
|
|
|
|
- self.postgres_store._simple_update_one_txn(
|
|
|
- txn,
|
|
|
- table="port_from_sqlite3",
|
|
|
- keyvalues={"table_name": table},
|
|
|
- updatevalues={"rowid": next_chunk},
|
|
|
+ while True:
|
|
|
+ def r(txn):
|
|
|
+ txn.execute(select, (next_chunk, self.batch_size,))
|
|
|
+ rows = txn.fetchall()
|
|
|
+ headers = [column[0] for column in txn.description]
|
|
|
+
|
|
|
+ return headers, rows
|
|
|
+
|
|
|
+ headers, rows = yield self.sqlite_store.runInteraction("select", r)
|
|
|
+
|
|
|
+ if rows:
|
|
|
+ next_chunk = rows[-1][0] + 1
|
|
|
+
|
|
|
+ # We have to treat event_search differently since it has a
|
|
|
+ # different structure in the two different databases.
|
|
|
+ def insert(txn):
|
|
|
+ sql = (
|
|
|
+ "INSERT INTO event_search (event_id, room_id, key,"
|
|
|
+ " sender, vector, origin_server_ts, stream_ordering)"
|
|
|
+ " VALUES (?,?,?,?,to_tsvector('english', ?),?,?)"
|
|
|
+ )
|
|
|
+
|
|
|
+ rows_dict = [
|
|
|
+ dict(zip(headers, row))
|
|
|
+ for row in rows
|
|
|
+ ]
|
|
|
+
|
|
|
+ txn.executemany(sql, [
|
|
|
+ (
|
|
|
+ row["event_id"],
|
|
|
+ row["room_id"],
|
|
|
+ row["key"],
|
|
|
+ row["sender"],
|
|
|
+ row["value"],
|
|
|
+ row["origin_server_ts"],
|
|
|
+ row["stream_ordering"],
|
|
|
)
|
|
|
+ for row in rows_dict
|
|
|
+ ])
|
|
|
+
|
|
|
+ self.postgres_store._simple_update_one_txn(
|
|
|
+ txn,
|
|
|
+ table="port_from_sqlite3",
|
|
|
+ keyvalues={"table_name": "event_search"},
|
|
|
+ updatevalues={"rowid": next_chunk},
|
|
|
+ )
|
|
|
|
|
|
yield self.postgres_store.execute(insert)
|
|
|
|
|
|
postgres_size += len(rows)
|
|
|
|
|
|
- self.progress.update(table, postgres_size)
|
|
|
+ self.progress.update("event_search", postgres_size)
|
|
|
+
|
|
|
else:
|
|
|
return
|
|
|
|
|
|
+
|
|
|
def setup_db(self, db_config, database_engine):
|
|
|
db_conn = database_engine.module.connect(
|
|
|
**{
|