From dd63cd5e218e3b90aff7339415dcea4a3c35fd2c Mon Sep 17 00:00:00 2001 From: jeeminso Date: Thu, 23 Oct 2025 19:26:25 -0400 Subject: [PATCH] Improve metadata bwc test - FDW --- tests/bwc/test_rolling_upgrade.py | 52 +++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/tests/bwc/test_rolling_upgrade.py b/tests/bwc/test_rolling_upgrade.py index bb60c5d7..ecaa19cc 100644 --- a/tests/bwc/test_rolling_upgrade.py +++ b/tests/bwc/test_rolling_upgrade.py @@ -99,6 +99,13 @@ def _test_rolling_upgrade(self, path: UpgradePath, nodes: int): with connect(node.http_url, error_trace=True) as conn: new_shards = init_data(conn, node.version, shards, replicas) expected_active_shards += new_shards + if node.version >= (5, 7, 0): + remote_cluster = self._new_cluster(path.from_version, 1, settings=settings, explicit_discovery=False) + remote_cluster.start() + remote_node = remote_cluster.node() + with connect(remote_node.http_url, error_trace=True) as remote_conn: + new_shards = init_foreign_data_wrapper_data(conn, remote_conn, node.addresses.psql.port, remote_node.addresses.psql.port) + expected_active_shards += new_shards for idx, node in enumerate(cluster): # Enforce an old version node be a handler to make sure that an upgraded node can serve 'select *' from an old version node. @@ -129,6 +136,10 @@ def _test_rolling_upgrade(self, path: UpgradePath, nodes: int): c = conn.cursor() new_shards = self._test_queries_on_new_node(idx, c, node, new_node, nodes, shards, expected_active_shards) expected_active_shards += new_shards + if node.version >= (5, 7, 0): + assert remote_node is not None + with connect(remote_node.http_url, error_trace=True) as remote_conn: + test_foreign_data_wrapper(self, conn, remote_conn) # Finally validate that all shards (primaries and replicas) of all partitions are started # and writes into the partitioned table while upgrading were successful @@ -328,3 +339,44 @@ def init_data(conn: Connection, version: tuple[int, int, int], shards: int, repl c.execute("INSERT INTO doc.parted (id, value) VALUES (1, 1)") new_shards += shards return new_shards + + +def init_foreign_data_wrapper_data(local_conn: Connection, remote_conn: Connection, local_psql_port: int, remote_psql_port: int) -> int: + assert 5430 <= local_psql_port <= 5440 and 5430 <= remote_psql_port <= 5440 + + c = local_conn.cursor() + rc = remote_conn.cursor() + + c.execute("create table doc.y (a int) clustered into 1 shards with (number_of_replicas=0)") + rc.execute("create table doc.y (a int) clustered into 1 shards with (number_of_replicas=0)") + new_shards = 1 + + rc.execute(f"CREATE SERVER source FOREIGN DATA WRAPPER jdbc OPTIONS (url 'jdbc:postgresql://localhost:{local_psql_port}/')") + c.execute(f"CREATE SERVER remote FOREIGN DATA WRAPPER jdbc OPTIONS (url 'jdbc:postgresql://localhost:{remote_psql_port}/')") + + rc.execute("CREATE FOREIGN TABLE doc.remote_y (a int) SERVER source OPTIONS (schema_name 'doc', table_name 'y')") + c.execute("CREATE FOREIGN TABLE doc.remote_y (a int) SERVER remote OPTIONS (schema_name 'doc', table_name 'y')") + + wait_for_active_shards(c) + wait_for_active_shards(rc) + + return new_shards + + +def test_foreign_data_wrapper(self, local_conn: Connection, remote_conn: Connection): + c = local_conn.cursor() + rc = remote_conn.cursor() + + rc.execute("select count(a) from doc.remote_y") + count = rc.fetchall()[0][0] + c.execute("insert into doc.y values (1)") + c.execute("refresh table doc.y") + rc.execute("select count(a) from doc.remote_y") + self.assertEqual(rc.fetchall()[0][0], count + 1) + + c.execute("select count(a) from doc.remote_y") + count = c.fetchall()[0][0] + rc.execute("insert into doc.y values (1)") + rc.execute("refresh table doc.y") + c.execute("select count(a) from doc.remote_y") + self.assertEqual(c.fetchall()[0][0], count + 1)