diff --git a/tests/bwc/test_rolling_upgrade.py b/tests/bwc/test_rolling_upgrade.py index ecaa19cc..371b3a71 100644 --- a/tests/bwc/test_rolling_upgrade.py +++ b/tests/bwc/test_rolling_upgrade.py @@ -5,7 +5,7 @@ from crate.client.exceptions import ProgrammingError from cr8.run_crate import CrateNode -from crate.qa.tests import NodeProvider, insert_data, wait_for_active_shards, UpgradePath +from crate.qa.tests import NodeProvider, insert_data, wait_for_active_shards, UpgradePath, assert_busy ROLLING_UPGRADES_V4 = ( # 4.0.0 -> 4.0.1 -> 4.0.2 don't support rolling upgrades due to a bug @@ -106,6 +106,9 @@ def _test_rolling_upgrade(self, path: UpgradePath, nodes: int): with connect(remote_node.http_url, error_trace=True) as remote_conn: new_shards = init_foreign_data_wrapper_data(conn, remote_conn, node.addresses.psql.port, remote_node.addresses.psql.port) expected_active_shards += new_shards + if node.version >= (5, 10, 0): + new_shards = init_logical_replication_data(self, conn, remote_conn, node.addresses.transport.port, remote_node.addresses.transport.port, expected_active_shards) + expected_active_shards += new_shards for idx, node in enumerate(cluster): # Enforce an old version node be a handler to make sure that an upgraded node can serve 'select *' from an old version node. @@ -140,6 +143,8 @@ def _test_rolling_upgrade(self, path: UpgradePath, nodes: int): assert remote_node is not None with connect(remote_node.http_url, error_trace=True) as remote_conn: test_foreign_data_wrapper(self, conn, remote_conn) + if node.version >= (5, 10, 0): + test_logical_replication_queries(self, conn, remote_conn) # Finally validate that all shards (primaries and replicas) of all partitions are started # and writes into the partitioned table while upgrading were successful @@ -380,3 +385,57 @@ def test_foreign_data_wrapper(self, local_conn: Connection, remote_conn: Connect rc.execute("refresh table doc.y") c.execute("select count(a) from doc.remote_y") self.assertEqual(c.fetchall()[0][0], count + 1) + + +def init_logical_replication_data(self, local_conn: Connection, remote_conn: Connection, local_transport_port:int, remote_transport_port: int, local_active_shards: int) -> int: + assert 4300 <= local_transport_port <= 4310 and 4300 <= remote_transport_port <= 4310 + + c = local_conn.cursor() + c.execute("create table doc.x (a int) clustered into 1 shards with (number_of_replicas=0)") + c.execute("create publication p for table doc.x") + + rc = remote_conn.cursor() + rc.execute("create table doc.rx (a int) clustered into 1 shards with (number_of_replicas=0)") + rc.execute("create publication rp for table doc.rx") + + rc.execute(f"create subscription rs connection 'crate://localhost:{local_transport_port}?user=crate&sslmode=sniff' publication p") + c.execute(f"create subscription s connection 'crate://localhost:{remote_transport_port}?user=crate&sslmode=sniff' publication rp") + + new_shards = 2 # 1 shard for doc.x and another 1 shard for doc.rx + wait_for_active_shards(rc, new_shards) + wait_for_active_shards(c, local_active_shards + new_shards) + assert_busy(lambda: self.assertEqual(num_docs_x(rc), 0)) + assert_busy(lambda: self.assertEqual(num_docs_rx(c), 0)) + + return new_shards + + +def test_logical_replication_queries(self, local_conn: Connection, remote_conn: Connection): + c = local_conn.cursor() + rc = remote_conn.cursor() + + # Cannot drop replicated tables + with self.assertRaises(ProgrammingError): + rc.execute("drop table doc.x") + c.execute("drop table doc.rx") + + count = num_docs_x(rc) + count2 = num_docs_rx(c) + + c.execute("insert into doc.x values (1)") + c.execute("refresh table doc.x") + rc.execute("insert into doc.rx values (1)") + rc.execute("refresh table doc.rx") + + assert_busy(lambda: self.assertEqual(num_docs_x(rc), count + 1)) + assert_busy(lambda: self.assertEqual(num_docs_rx(c), count2 + 1)) + + +def num_docs_x(cursor): + cursor.execute("select count(*) from doc.x") + return cursor.fetchall()[0][0] + + +def num_docs_rx(cursor): + cursor.execute("select count(*) from doc.rx") + return cursor.fetchall()[0][0] \ No newline at end of file