|
33 | 33 | ) |
34 | 34 |
|
35 | 35 | ROLLING_UPGRADES_V5 = ( |
36 | | - UpgradePath('5.0.x', '5.1.x'), |
37 | | - UpgradePath('5.1.x', '5.2.x'), |
38 | | - UpgradePath('5.2.x', '5.3.x'), |
39 | | - UpgradePath('5.3.x', '5.4.x'), |
40 | | - UpgradePath('5.4.x', '5.5.x'), |
41 | | - UpgradePath('5.5.x', '5.6.x'), |
42 | | - UpgradePath('5.6.x', '5.7.x'), |
43 | | - UpgradePath('5.7.x', '5.8.x'), |
44 | 36 | UpgradePath('5.8.x', '5.9.x'), |
45 | 37 | UpgradePath('5.9.x', '5.10.x'), |
46 | 38 | UpgradePath('5.10.x', '6.0.x'), |
@@ -99,6 +91,13 @@ def _test_rolling_upgrade(self, path: UpgradePath, nodes: int): |
99 | 91 | with connect(node.http_url, error_trace=True) as conn: |
100 | 92 | new_shards = init_data(conn, node.version, shards, replicas) |
101 | 93 | expected_active_shards += new_shards |
| 94 | + if node.version >= (5, 7, 0): |
| 95 | + remote_cluster = self._new_cluster(path.from_version, 1, settings=settings, explicit_discovery=False) |
| 96 | + remote_cluster.start() |
| 97 | + remote_node = remote_cluster.node() |
| 98 | + with connect(remote_node.http_url, error_trace=True) as remote_conn: |
| 99 | + new_shards = init_foreign_data_wrapper_data(conn, remote_conn, node.addresses.psql.port, remote_node.addresses.psql.port) |
| 100 | + expected_active_shards += new_shards |
102 | 101 |
|
103 | 102 | for idx, node in enumerate(cluster): |
104 | 103 | # Enforce an old version node be a handler to make sure that an upgraded node can serve 'select *' from an old version node. |
@@ -129,6 +128,10 @@ def _test_rolling_upgrade(self, path: UpgradePath, nodes: int): |
129 | 128 | c = conn.cursor() |
130 | 129 | new_shards = self._test_queries_on_new_node(idx, c, node, new_node, nodes, shards, expected_active_shards) |
131 | 130 | expected_active_shards += new_shards |
| 131 | + if node.version >= (5, 7, 0): |
| 132 | + assert remote_node is not None |
| 133 | + with connect(remote_node.http_url, error_trace=True) as remote_conn: |
| 134 | + test_foreign_data_wrapper(self, conn, remote_conn) |
132 | 135 |
|
133 | 136 | # Finally validate that all shards (primaries and replicas) of all partitions are started |
134 | 137 | # and writes into the partitioned table while upgrading were successful |
@@ -328,3 +331,44 @@ def init_data(conn: Connection, version: tuple[int, int, int], shards: int, repl |
328 | 331 | c.execute("INSERT INTO doc.parted (id, value) VALUES (1, 1)") |
329 | 332 | new_shards += shards |
330 | 333 | return new_shards |
| 334 | + |
| 335 | + |
| 336 | +def init_foreign_data_wrapper_data(local_conn: Connection, remote_conn: Connection, local_psql_port: int, remote_psql_port: int) -> int: |
| 337 | + assert 5430 <= local_psql_port <= 5440 and 5430 <= remote_psql_port <= 5440 |
| 338 | + |
| 339 | + c = local_conn.cursor() |
| 340 | + rc = remote_conn.cursor() |
| 341 | + |
| 342 | + c.execute("create table doc.y (a int) clustered into 1 shards with (number_of_replicas=0)") |
| 343 | + rc.execute("create table doc.y (a int) clustered into 1 shards with (number_of_replicas=0)") |
| 344 | + new_shards = 1 |
| 345 | + |
| 346 | + rc.execute(f"CREATE SERVER source FOREIGN DATA WRAPPER jdbc OPTIONS (url 'jdbc:postgresql://localhost:{local_psql_port}/')") |
| 347 | + c.execute(f"CREATE SERVER remote FOREIGN DATA WRAPPER jdbc OPTIONS (url 'jdbc:postgresql://localhost:{remote_psql_port}/')") |
| 348 | + |
| 349 | + rc.execute("CREATE FOREIGN TABLE doc.remote_y (a int) SERVER source OPTIONS (schema_name 'doc', table_name 'y')") |
| 350 | + c.execute("CREATE FOREIGN TABLE doc.remote_y (a int) SERVER remote OPTIONS (schema_name 'doc', table_name 'y')") |
| 351 | + |
| 352 | + wait_for_active_shards(c) |
| 353 | + wait_for_active_shards(rc) |
| 354 | + |
| 355 | + return new_shards |
| 356 | + |
| 357 | + |
| 358 | +def test_foreign_data_wrapper(self, local_conn: Connection, remote_conn: Connection): |
| 359 | + c = local_conn.cursor() |
| 360 | + rc = remote_conn.cursor() |
| 361 | + |
| 362 | + rc.execute("select count(a) from doc.remote_y") |
| 363 | + count = rc.fetchall()[0][0] |
| 364 | + c.execute("insert into doc.y values (1)") |
| 365 | + c.execute("refresh table doc.y") |
| 366 | + rc.execute("select count(a) from doc.remote_y") |
| 367 | + self.assertEqual(rc.fetchall()[0][0], count + 1) |
| 368 | + |
| 369 | + c.execute("select count(a) from doc.remote_y") |
| 370 | + count = c.fetchall()[0][0] |
| 371 | + rc.execute("insert into doc.y values (1)") |
| 372 | + rc.execute("refresh table doc.y") |
| 373 | + c.execute("select count(a) from doc.remote_y") |
| 374 | + self.assertEqual(c.fetchall()[0][0], count + 1) |
0 commit comments