Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 60 additions & 1 deletion tests/bwc/test_rolling_upgrade.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from crate.client.exceptions import ProgrammingError
from cr8.run_crate import CrateNode

from crate.qa.tests import NodeProvider, insert_data, wait_for_active_shards, UpgradePath
from crate.qa.tests import NodeProvider, insert_data, wait_for_active_shards, UpgradePath, assert_busy

ROLLING_UPGRADES_V4 = (
# 4.0.0 -> 4.0.1 -> 4.0.2 don't support rolling upgrades due to a bug
Expand Down Expand Up @@ -106,6 +106,9 @@ def _test_rolling_upgrade(self, path: UpgradePath, nodes: int):
with connect(remote_node.http_url, error_trace=True) as remote_conn:
new_shards = init_foreign_data_wrapper_data(conn, remote_conn, node.addresses.psql.port, remote_node.addresses.psql.port)
expected_active_shards += new_shards
if node.version >= (5, 10, 0):
new_shards = init_logical_replication_data(self, conn, remote_conn, node.addresses.transport.port, remote_node.addresses.transport.port, expected_active_shards)
expected_active_shards += new_shards

for idx, node in enumerate(cluster):
# Enforce an old version node be a handler to make sure that an upgraded node can serve 'select *' from an old version node.
Expand Down Expand Up @@ -140,6 +143,8 @@ def _test_rolling_upgrade(self, path: UpgradePath, nodes: int):
assert remote_node is not None
with connect(remote_node.http_url, error_trace=True) as remote_conn:
test_foreign_data_wrapper(self, conn, remote_conn)
if node.version >= (5, 10, 0):
test_logical_replication_queries(self, conn, remote_conn)

# Finally validate that all shards (primaries and replicas) of all partitions are started
# and writes into the partitioned table while upgrading were successful
Expand Down Expand Up @@ -380,3 +385,57 @@ def test_foreign_data_wrapper(self, local_conn: Connection, remote_conn: Connect
rc.execute("refresh table doc.y")
c.execute("select count(a) from doc.remote_y")
self.assertEqual(c.fetchall()[0][0], count + 1)


def init_logical_replication_data(self, local_conn: Connection, remote_conn: Connection, local_transport_port:int, remote_transport_port: int, local_active_shards: int) -> int:
assert 4300 <= local_transport_port <= 4310 and 4300 <= remote_transport_port <= 4310

c = local_conn.cursor()
c.execute("create table doc.x (a int) clustered into 1 shards with (number_of_replicas=0)")
c.execute("create publication p for table doc.x")

rc = remote_conn.cursor()
rc.execute("create table doc.rx (a int) clustered into 1 shards with (number_of_replicas=0)")
rc.execute("create publication rp for table doc.rx")

rc.execute(f"create subscription rs connection 'crate://localhost:{local_transport_port}?user=crate&sslmode=sniff' publication p")
c.execute(f"create subscription s connection 'crate://localhost:{remote_transport_port}?user=crate&sslmode=sniff' publication rp")

new_shards = 2 # 1 shard for doc.x and another 1 shard for doc.rx
wait_for_active_shards(rc, new_shards)
wait_for_active_shards(c, local_active_shards + new_shards)
assert_busy(lambda: self.assertEqual(num_docs_x(rc), 0))
assert_busy(lambda: self.assertEqual(num_docs_rx(c), 0))

return new_shards


def test_logical_replication_queries(self, local_conn: Connection, remote_conn: Connection):
c = local_conn.cursor()
rc = remote_conn.cursor()

# Cannot drop replicated tables
with self.assertRaises(ProgrammingError):
rc.execute("drop table doc.x")
c.execute("drop table doc.rx")

count = num_docs_x(rc)
count2 = num_docs_rx(c)

c.execute("insert into doc.x values (1)")
c.execute("refresh table doc.x")
rc.execute("insert into doc.rx values (1)")
rc.execute("refresh table doc.rx")

assert_busy(lambda: self.assertEqual(num_docs_x(rc), count + 1))
assert_busy(lambda: self.assertEqual(num_docs_rx(c), count2 + 1))


def num_docs_x(cursor):
cursor.execute("select count(*) from doc.x")
return cursor.fetchall()[0][0]


def num_docs_rx(cursor):
cursor.execute("select count(*) from doc.rx")
return cursor.fetchall()[0][0]