Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions tests/bwc/test_rolling_upgrade.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,13 @@ def _test_rolling_upgrade(self, path: UpgradePath, nodes: int):
with connect(node.http_url, error_trace=True) as conn:
new_shards = init_data(conn, node.version, shards, replicas)
expected_active_shards += new_shards
if node.version >= (5, 7, 0):
remote_cluster = self._new_cluster(path.from_version, 1, settings=settings, explicit_discovery=False)
remote_cluster.start()
remote_node = remote_cluster.node()
with connect(remote_node.http_url, error_trace=True) as remote_conn:
new_shards = init_foreign_data_wrapper_data(conn, remote_conn, node.addresses.psql.port, remote_node.addresses.psql.port)
expected_active_shards += new_shards

for idx, node in enumerate(cluster):
# Enforce an old version node be a handler to make sure that an upgraded node can serve 'select *' from an old version node.
Expand Down Expand Up @@ -129,6 +136,10 @@ def _test_rolling_upgrade(self, path: UpgradePath, nodes: int):
c = conn.cursor()
new_shards = self._test_queries_on_new_node(idx, c, node, new_node, nodes, shards, expected_active_shards)
expected_active_shards += new_shards
if node.version >= (5, 7, 0):
assert remote_node is not None
with connect(remote_node.http_url, error_trace=True) as remote_conn:
test_foreign_data_wrapper(self, conn, remote_conn)

# Finally validate that all shards (primaries and replicas) of all partitions are started
# and writes into the partitioned table while upgrading were successful
Expand Down Expand Up @@ -328,3 +339,44 @@ def init_data(conn: Connection, version: tuple[int, int, int], shards: int, repl
c.execute("INSERT INTO doc.parted (id, value) VALUES (1, 1)")
new_shards += shards
return new_shards


def init_foreign_data_wrapper_data(local_conn: Connection, remote_conn: Connection, local_psql_port: int, remote_psql_port: int) -> int:
assert 5430 <= local_psql_port <= 5440 and 5430 <= remote_psql_port <= 5440

c = local_conn.cursor()
rc = remote_conn.cursor()

c.execute("create table doc.y (a int) clustered into 1 shards with (number_of_replicas=0)")
rc.execute("create table doc.y (a int) clustered into 1 shards with (number_of_replicas=0)")
new_shards = 1

rc.execute(f"CREATE SERVER source FOREIGN DATA WRAPPER jdbc OPTIONS (url 'jdbc:postgresql://localhost:{local_psql_port}/')")
c.execute(f"CREATE SERVER remote FOREIGN DATA WRAPPER jdbc OPTIONS (url 'jdbc:postgresql://localhost:{remote_psql_port}/')")

rc.execute("CREATE FOREIGN TABLE doc.remote_y (a int) SERVER source OPTIONS (schema_name 'doc', table_name 'y')")
c.execute("CREATE FOREIGN TABLE doc.remote_y (a int) SERVER remote OPTIONS (schema_name 'doc', table_name 'y')")

wait_for_active_shards(c)
wait_for_active_shards(rc)

return new_shards


def test_foreign_data_wrapper(self, local_conn: Connection, remote_conn: Connection):
c = local_conn.cursor()
rc = remote_conn.cursor()

rc.execute("select count(a) from doc.remote_y")
count = rc.fetchall()[0][0]
c.execute("insert into doc.y values (1)")
c.execute("refresh table doc.y")
rc.execute("select count(a) from doc.remote_y")
self.assertEqual(rc.fetchall()[0][0], count + 1)

c.execute("select count(a) from doc.remote_y")
count = c.fetchall()[0][0]
rc.execute("insert into doc.y values (1)")
rc.execute("refresh table doc.y")
c.execute("select count(a) from doc.remote_y")
self.assertEqual(c.fetchall()[0][0], count + 1)