From 4d3afaad7a405bc89c36b085435756b8bb87ce68 Mon Sep 17 00:00:00 2001 From: Serhii Mamontov Date: Wed, 29 Oct 2025 00:57:37 +0200 Subject: [PATCH 01/12] feat(here-now): add `limit` and `offset` configuration options Add `limit` (default `1000`) and `offset` parameters for `here_now` to fetch presence in portions. --- pubnub/endpoints/presence/here_now.py | 15 ++++++++++++++- tests/functional/test_here_now.py | 14 +++++++++----- 2 files changed, 23 insertions(+), 6 deletions(-) diff --git a/pubnub/endpoints/presence/here_now.py b/pubnub/endpoints/presence/here_now.py index e1d22a7e..1ca4e819 100644 --- a/pubnub/endpoints/presence/here_now.py +++ b/pubnub/endpoints/presence/here_now.py @@ -29,6 +29,8 @@ def __init__(self, pubnub, channels: Union[str, List[str]] = None, channel_group self._include_state = include_state self._include_uuids = include_uuids + self._offset = None + self._limit = 1000 def channels(self, channels: Union[str, List[str]]) -> 'HereNow': utils.extend_list(self._channels, channels) @@ -46,8 +48,16 @@ def include_uuids(self, include_uuids) -> 'HereNow': self._include_uuids = include_uuids return self + def limit(self, limit: int) -> 'HereNow': + self._limit = limit + return self + + def offset(self, offset: int) -> 'HereNow': + self._offset = offset + return self + def custom_params(self): - params = {} + params = { 'limit': self._limit } if len(self._channel_groups) > 0: params['channel-group'] = utils.join_items_and_encode(self._channel_groups) @@ -58,6 +68,9 @@ def custom_params(self): if not self._include_uuids: params['disable_uuids'] = "1" + if self._offset is not None: + params['offset'] = self._offset + return params def build_path(self): diff --git a/tests/functional/test_here_now.py b/tests/functional/test_here_now.py index 48be47ea..6a3d8381 100644 --- a/tests/functional/test_here_now.py +++ b/tests/functional/test_here_now.py @@ -30,11 +30,12 @@ def test_here_now(self): self.assertEqual(self.here_now.build_params_callback()({}), { 'pnsdk': sdk_name, - 'uuid': self.pubnub.uuid + 'uuid': self.pubnub.uuid, + 'limit': 1000, }) def test_here_now_groups(self): - self.here_now.channel_groups("gr1") + self.here_now.channel_groups("gr1").limit(10000) self.assertEqual(self.here_now.build_path(), HereNow.HERE_NOW_PATH % (pnconf.subscribe_key, ",")) @@ -42,11 +43,12 @@ def test_here_now_groups(self): self.assertEqual(self.here_now.build_params_callback()({}), { 'channel-group': 'gr1', 'pnsdk': sdk_name, - 'uuid': self.pubnub.uuid + 'uuid': self.pubnub.uuid, + 'limit': 10000, }) def test_here_now_with_options(self): - self.here_now.channels(["ch1"]).channel_groups("gr1").include_state(True).include_uuids(False) + self.here_now.channels(["ch1"]).channel_groups("gr1").include_state(True).include_uuids(False).offset(3) self.assertEqual(self.here_now.build_path(), HereNow.HERE_NOW_PATH % (pnconf.subscribe_key, "ch1")) @@ -56,5 +58,7 @@ def test_here_now_with_options(self): 'state': '1', 'disable_uuids': '1', 'pnsdk': sdk_name, - 'uuid': self.pubnub.uuid + 'uuid': self.pubnub.uuid, + 'limit': 1000, + 'offset': 3, }) From 9f205dcd6ed34ee456b7ba938666260e79b55fa8 Mon Sep 17 00:00:00 2001 From: Serhii Mamontov Date: Wed, 12 Nov 2025 11:40:31 +0200 Subject: [PATCH 02/12] fix(subscribe-heartbeat): fix duplicated channels issue Fix issue because of which it was possible to add duplicated entries of `channels` and `groups` to the `subscribe`, `heartbeat`, and `leave` requests. --- pubnub/endpoints/presence/heartbeat.py | 18 ++++---- pubnub/endpoints/presence/leave.py | 30 +++++-------- pubnub/endpoints/pubsub/subscribe.py | 22 ++++----- pubnub/event_engine/models/states.py | 2 +- pubnub/utils.py | 20 +++++---- tests/functional/test_heartbeat.py | 33 +++++++++++--- tests/functional/test_leave.py | 36 +++++++++++---- tests/functional/test_subscribe.py | 45 +++++++++++++++---- .../integrational/asyncio/test_change_uuid.py | 4 +- .../asyncio/test_message_count.py | 4 +- tests/integrational/asyncio/test_where_now.py | 4 +- 11 files changed, 141 insertions(+), 77 deletions(-) diff --git a/pubnub/endpoints/presence/heartbeat.py b/pubnub/endpoints/presence/heartbeat.py index 9fc2267c..df84f255 100644 --- a/pubnub/endpoints/presence/heartbeat.py +++ b/pubnub/endpoints/presence/heartbeat.py @@ -1,4 +1,4 @@ -from typing import Dict, Optional, Union, List +from typing import Dict, Optional, Union, List, Set from pubnub import utils from pubnub.endpoints.endpoint import Endpoint from pubnub.enums import HttpMethod, PNOperationType @@ -13,22 +13,22 @@ class Heartbeat(Endpoint): def __init__(self, pubnub, channels: Union[str, List[str]] = None, channel_groups: Union[str, List[str]] = None, state: Optional[Dict[str, any]] = None): super(Heartbeat, self).__init__(pubnub) - self._channels = [] - self._groups = [] + self._channels: Set[str] = set() + self._groups: Set[str] = set() if channels: - utils.extend_list(self._channels, channels) + utils.update_set(self._channels, channels) if channel_groups: - utils.extend_list(self._groups, channel_groups) + utils.update_set(self._groups, channel_groups) self._state = state def channels(self, channels: Union[str, List[str]]) -> 'Heartbeat': - utils.extend_list(self._channels, channels) + utils.update_set(self._channels, channels) return self def channel_groups(self, channel_groups: Union[str, List[str]]) -> 'Heartbeat': - utils.extend_list(self._groups, channel_groups) + utils.update_set(self._groups, channel_groups) return self def state(self, state: Dict[str, any]) -> 'Heartbeat': @@ -46,14 +46,14 @@ def validate_params(self): raise PubNubException(pn_error=PNERR_CHANNEL_OR_GROUP_MISSING) def build_path(self): - channels = utils.join_channels(self._channels) + channels = utils.join_channels(self._channels, True) return Heartbeat.HEARTBEAT_PATH % (self.pubnub.config.subscribe_key, channels) def custom_params(self): params = {'heartbeat': str(self.pubnub.config.presence_timeout)} if len(self._groups) > 0: - params['channel-group'] = utils.join_items(self._groups) + params['channel-group'] = utils.join_items(self._groups, True) if self._state is not None and len(self._state) > 0: params['state'] = utils.url_write(self._state) diff --git a/pubnub/endpoints/presence/leave.py b/pubnub/endpoints/presence/leave.py index 113150e8..cefa5aa5 100644 --- a/pubnub/endpoints/presence/leave.py +++ b/pubnub/endpoints/presence/leave.py @@ -1,3 +1,5 @@ +from typing import Set, Union, List + from pubnub import utils from pubnub.endpoints.endpoint import Endpoint from pubnub.errors import PNERR_CHANNEL_OR_GROUP_MISSING @@ -11,30 +13,22 @@ class Leave(Endpoint): def __init__(self, pubnub): Endpoint.__init__(self, pubnub) - self._channels = [] - self._groups = [] - - def channels(self, channels): - if isinstance(channels, (list, tuple)): - self._channels.extend(channels) - else: - self._channels.extend(utils.split_items(channels)) + self._channels: Set[str] = set() + self._groups: Set[str] = set() + def channels(self, channels: Union[str, List[str]]) -> 'Leave': + utils.update_set(self._channels, channels) return self - def channel_groups(self, channel_groups): - if isinstance(channel_groups, (list, tuple)): - self._groups.extend(channel_groups) - else: - self._groups.extend(utils.split_items(channel_groups)) - + def channel_groups(self, channel_groups: Union[str, List[str]]) -> 'Leave': + utils.update_set(self._groups, channel_groups) return self def custom_params(self): params = {} if len(self._groups) > 0: - params['channel-group'] = utils.join_items(self._groups) + params['channel-group'] = utils.join_items(self._groups, True) if hasattr(self.pubnub, '_subscription_manager'): params.update(self.pubnub._subscription_manager.get_custom_params()) @@ -42,7 +36,7 @@ def custom_params(self): return params def build_path(self): - return Leave.LEAVE_PATH % (self.pubnub.config.subscribe_key, utils.join_channels(self._channels)) + return Leave.LEAVE_PATH % (self.pubnub.config.subscribe_key, utils.join_channels(self._channels, True)) def http_method(self): return HttpMethod.GET @@ -60,10 +54,10 @@ def is_auth_required(self): return True def affected_channels(self): - return self._channels + return list(self._channels) def affected_channels_groups(self): - return self._groups + return list(self._groups) def request_timeout(self): return self.pubnub.config.non_subscribe_request_timeout diff --git a/pubnub/endpoints/pubsub/subscribe.py b/pubnub/endpoints/pubsub/subscribe.py index d91a8ca0..277e5562 100644 --- a/pubnub/endpoints/pubsub/subscribe.py +++ b/pubnub/endpoints/pubsub/subscribe.py @@ -1,4 +1,4 @@ -from typing import Optional, Union, List +from typing import Optional, Union, List, Set from pubnub import utils from pubnub.endpoints.endpoint import Endpoint from pubnub.enums import HttpMethod, PNOperationType @@ -25,12 +25,12 @@ def __init__(self, pubnub, channels: Union[str, List[str]] = None, with_presence: Optional[str] = None, state: Optional[str] = None): super(Subscribe, self).__init__(pubnub) - self._channels = [] + self._channels: Set[str] = set() + self._groups: Set[str] = set() if channels: - utils.extend_list(self._channels, channels) - self._groups = [] + utils.update_set(self._channels, channels) if groups: - utils.extend_list(self._groups, groups) + utils.update_set(self._groups, groups) self._region = region self._filter_expression = filter_expression @@ -39,11 +39,11 @@ def __init__(self, pubnub, channels: Union[str, List[str]] = None, self._state = state def channels(self, channels: Union[str, List[str]]) -> 'Subscribe': - utils.extend_list(self._channels, channels) + utils.update_set(self._channels, channels) return self def channel_groups(self, groups: Union[str, List[str]]) -> 'Subscribe': - utils.extend_list(self._groups, groups) + utils.update_set(self._groups, groups) return self def timetoken(self, timetoken) -> 'Subscribe': @@ -72,14 +72,14 @@ def validate_params(self): raise PubNubException(pn_error=PNERR_CHANNEL_OR_GROUP_MISSING) def build_path(self): - channels = utils.join_channels(self._channels) + channels = utils.join_channels(self._channels, True) return Subscribe.SUBSCRIBE_PATH % (self.pubnub.config.subscribe_key, channels) def custom_params(self): params = {} if len(self._groups) > 0: - params['channel-group'] = utils.join_items_and_encode(self._groups) + params['channel-group'] = utils.join_items_and_encode(self._groups, True) if self._filter_expression is not None and len(self._filter_expression) > 0: params['filter-expr'] = utils.url_encode(self._filter_expression) @@ -108,10 +108,10 @@ def is_auth_required(self): return True def affected_channels(self): - return self._channels + return list(self._channels) def affected_channels_groups(self): - return self._groups + return list(self._groups) def request_timeout(self): return self.pubnub.config.subscribe_request_timeout diff --git a/pubnub/event_engine/models/states.py b/pubnub/event_engine/models/states.py index d9873323..6d40b3e5 100644 --- a/pubnub/event_engine/models/states.py +++ b/pubnub/event_engine/models/states.py @@ -568,7 +568,7 @@ def reconnect_failure(self, event: events.ReceiveReconnectFailureEvent, context: return PNTransition( state=ReceiveReconnectingState, context=self._context, - invocation=invocations.EmitStatusInvocation(PNStatusCategory.UnexpectedDisconnectCategory, + invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNUnexpectedDisconnectCategory, operation=PNOperationType.PNSubscribeOperation, context=self._context) ) diff --git a/pubnub/utils.py b/pubnub/utils.py index 3b5d2976..ebab6f43 100644 --- a/pubnub/utils.py +++ b/pubnub/utils.py @@ -8,6 +8,7 @@ import warnings from hashlib import sha256 +from typing import Set, List, Union from pubnub.enums import PNStatusCategory, PNOperationType, PNPushType, HttpMethod, PAMPermissions from pubnub.models.consumer.common import PNStatus @@ -54,19 +55,17 @@ def split_items(items_string): return items_string.split(",") -def join_items(items_list): - return ",".join(items_list) +def join_items(items_list, sort_items=False): + return ",".join(sorted(items_list) if sort_items else items_list) +def join_items_and_encode(items_list, sort_items=False): + return ",".join(url_encode(x) for x in (sorted(items_list) if sort_items else items_list)) -def join_items_and_encode(items_list): - return ",".join(url_encode(x) for x in items_list) - - -def join_channels(items_list): +def join_channels(items_list, sort_items=False): if len(items_list) == 0: return "," else: - return join_items_and_encode(items_list) + return join_items_and_encode(items_list, sort_items) def extend_list(existing_items, new_items): @@ -75,6 +74,11 @@ def extend_list(existing_items, new_items): else: existing_items.extend(new_items) +def update_set(existing_items: Set[str], new_items: Union[str, List[str]]): + if isinstance(new_items, str): + existing_items.update(split_items(new_items)) + else: + existing_items.update(new_items) def build_url(scheme, origin, path, params={}): return urllib.parse.urlunsplit((scheme, origin, path, params, '')) diff --git a/tests/functional/test_heartbeat.py b/tests/functional/test_heartbeat.py index cf144afe..56c7753d 100644 --- a/tests/functional/test_heartbeat.py +++ b/tests/functional/test_heartbeat.py @@ -32,7 +32,7 @@ def test_sub_single_channel(self): 'heartbeat': '20' }) - self.assertEqual(self.hb._channels, ['ch']) + self.assertEqual(list(self.hb._channels), ['ch']) def test_hb_multiple_channels_using_list(self): self.hb.channels(['ch1', 'ch2', 'ch3']) @@ -46,7 +46,15 @@ def test_hb_multiple_channels_using_list(self): 'heartbeat': '20' }) - self.assertEqual(self.hb._channels, ['ch1', 'ch2', 'ch3']) + self.assertEqual(sorted(self.hb._channels), ['ch1', 'ch2', 'ch3']) + + def test_hb_unique_channels_using_list(self): + self.hb.channels(['ch1', 'ch2', 'ch1']) + + self.assertEqual(self.hb.build_path(), Heartbeat.HEARTBEAT_PATH + % (pnconf.subscribe_key, "ch1,ch2")) + + self.assertEqual(sorted(self.hb._channels), ['ch1', 'ch2']) def test_hb_single_group(self): self.hb.channel_groups("gr") @@ -61,7 +69,7 @@ def test_hb_single_group(self): 'heartbeat': '20' }) - self.assertEqual(self.hb._groups, ['gr']) + self.assertEqual(list(self.hb._groups), ['gr']) def test_hb_multiple_groups_using_list(self): self.hb.channel_groups(['gr1', 'gr2', 'gr3']) @@ -76,7 +84,20 @@ def test_hb_multiple_groups_using_list(self): 'heartbeat': '20' }) - self.assertEqual(self.hb._groups, ['gr1', 'gr2', 'gr3']) + self.assertEqual(sorted(self.hb._groups), ['gr1', 'gr2', 'gr3']) + + def test_hb_unique_channel_groups_using_list(self): + self.hb.channel_groups(['gr1', 'gr2', 'gr1']) + + self.assertEqual(self.hb.build_path(), Heartbeat.HEARTBEAT_PATH + % (pnconf.subscribe_key, ",")) + + self.assertEqual(self.hb.build_params_callback()({}), { + 'channel-group': 'gr1,gr2', + 'pnsdk': sdk_name, + 'uuid': self.pubnub.uuid, + 'heartbeat': '20' + }) def test_hb_with_state(self): state = {"name": "Alex", "count": 7} @@ -95,5 +116,5 @@ def test_hb_with_state(self): 'state': state }) - self.assertEqual(self.hb._groups, []) - self.assertEqual(self.hb._channels, ['ch1', 'ch2']) + self.assertEqual(list(self.hb._groups), []) + self.assertEqual(sorted(self.hb._channels), ['ch1', 'ch2']) diff --git a/tests/functional/test_leave.py b/tests/functional/test_leave.py index 0d56ae8b..b8e38802 100644 --- a/tests/functional/test_leave.py +++ b/tests/functional/test_leave.py @@ -33,7 +33,7 @@ def test_leave_single_channel(self): 'uuid': self.pubnub.uuid }) - self.assertEqual(self.leave._channels, ['ch']) + self.assertEqual(sorted(list(self.leave._channels)), ['ch']) def test_leave_multiple_channels(self): self.leave.channels("ch1,ch2,ch3") @@ -45,7 +45,7 @@ def test_leave_multiple_channels(self): 'uuid': self.pubnub.uuid }) - self.assertEqual(self.leave._channels, ['ch1', 'ch2', 'ch3']) + self.assertEqual(sorted(list(self.leave._channels)), ['ch1', 'ch2', 'ch3']) def test_leave_multiple_channels_using_list(self): self.leave.channels(['ch1', 'ch2', 'ch3']) @@ -57,7 +57,7 @@ def test_leave_multiple_channels_using_list(self): 'uuid': self.pubnub.uuid }) - self.assertEqual(self.leave._channels, ['ch1', 'ch2', 'ch3']) + self.assertEqual(sorted(list(self.leave._channels)), ['ch1', 'ch2', 'ch3']) def test_leave_multiple_channels_using_tuple(self): self.leave.channels(('ch1', 'ch2', 'ch3')) @@ -69,7 +69,14 @@ def test_leave_multiple_channels_using_tuple(self): 'uuid': self.pubnub.uuid }) - self.assertEqual(self.leave._channels, ['ch1', 'ch2', 'ch3']) + self.assertEqual(sorted(list(self.leave._channels)), ['ch1', 'ch2', 'ch3']) + + def test_leave_unique_channels_using_list(self): + self.leave.channels(['ch1', 'ch2', 'ch1']) + + self.assertEqual(self.leave.build_path(), Leave.LEAVE_PATH % (pnconf.subscribe_key, "ch1,ch2")) + + self.assertEqual(sorted(list(self.leave._channels)), ['ch1', 'ch2']) def test_leave_single_group(self): self.leave.channel_groups("gr") @@ -83,7 +90,7 @@ def test_leave_single_group(self): 'uuid': self.pubnub.uuid }) - self.assertEqual(self.leave._groups, ['gr']) + self.assertEqual(list(self.leave._groups), ['gr']) def test_leave_multiple_groups_using_string(self): self.leave.channel_groups("gr1,gr2,gr3") @@ -97,7 +104,7 @@ def test_leave_multiple_groups_using_string(self): 'uuid': self.pubnub.uuid }) - self.assertEqual(self.leave._groups, ['gr1', 'gr2', 'gr3']) + self.assertEqual(sorted(list(self.leave._groups)), ['gr1', 'gr2', 'gr3']) def test_leave_multiple_groups_using_list(self): self.leave.channel_groups(['gr1', 'gr2', 'gr3']) @@ -111,7 +118,18 @@ def test_leave_multiple_groups_using_list(self): 'uuid': self.pubnub.uuid }) - self.assertEqual(self.leave._groups, ['gr1', 'gr2', 'gr3']) + self.assertEqual(sorted(list(self.leave._groups)), ['gr1', 'gr2', 'gr3']) + + def test_leave_unique_channel_groups_using_list(self): + self.leave.channel_groups(['gr1', 'gr2', 'gr1']) + + self.assertEqual(self.leave.build_params_callback()({}), { + 'channel-group': 'gr1,gr2', + 'pnsdk': sdk_name, + 'uuid': self.pubnub.uuid + }) + + self.assertEqual(sorted(list(self.leave._groups)), ['gr1', 'gr2']) def test_leave_channels_and_groups(self): self.leave.channels('ch1,ch2').channel_groups(["gr1", "gr2"]) @@ -125,5 +143,5 @@ def test_leave_channels_and_groups(self): 'channel-group': 'gr1,gr2', }) - self.assertEqual(self.leave._groups, ['gr1', 'gr2']) - self.assertEqual(self.leave._channels, ['ch1', 'ch2']) + self.assertEqual(sorted(list(self.leave._groups)), ['gr1', 'gr2']) + self.assertEqual(sorted(list(self.leave._channels)), ['ch1', 'ch2']) diff --git a/tests/functional/test_subscribe.py b/tests/functional/test_subscribe.py index 792d1227..fb57371e 100644 --- a/tests/functional/test_subscribe.py +++ b/tests/functional/test_subscribe.py @@ -30,7 +30,7 @@ def test_pub_single_channel(self): 'uuid': self.pubnub.uuid }) - self.assertEqual(self.sub._channels, ['ch']) + self.assertEqual(list(self.sub._channels), ['ch']) def test_sub_multiple_channels_using_string(self): self.sub.channels("ch1,ch2,ch3") @@ -43,7 +43,7 @@ def test_sub_multiple_channels_using_string(self): 'uuid': self.pubnub.uuid }) - self.assertEqual(self.sub._channels, ['ch1', 'ch2', 'ch3']) + self.assertEqual(sorted(self.sub._channels), ['ch1', 'ch2', 'ch3']) def test_sub_multiple_channels_using_list(self): self.sub.channels(['ch1', 'ch2', 'ch3']) @@ -56,7 +56,7 @@ def test_sub_multiple_channels_using_list(self): 'uuid': self.pubnub.uuid }) - self.assertEqual(self.sub._channels, ['ch1', 'ch2', 'ch3']) + self.assertEqual(sorted(self.sub._channels), ['ch1', 'ch2', 'ch3']) def test_sub_multiple_channels_using_tuple(self): self.sub.channels(('ch1', 'ch2', 'ch3')) @@ -69,7 +69,20 @@ def test_sub_multiple_channels_using_tuple(self): 'uuid': self.pubnub.uuid }) - self.assertEqual(self.sub._channels, ['ch1', 'ch2', 'ch3']) + self.assertEqual(sorted(self.sub._channels), ['ch1', 'ch2', 'ch3']) + + def test_sub_unique_channels_using_list(self): + self.sub.channels(['ch1', 'ch2', 'ch1']) + + self.assertEqual(self.sub.build_path(), Subscribe.SUBSCRIBE_PATH + % (pnconf.subscribe_key, "ch1,ch2")) + + self.assertEqual(self.sub.build_params_callback()({}), { + 'pnsdk': sdk_name, + 'uuid': self.pubnub.uuid + }) + + self.assertEqual(sorted(self.sub._channels), ['ch1', 'ch2']) def test_sub_single_group(self): self.sub.channel_groups("gr") @@ -83,7 +96,7 @@ def test_sub_single_group(self): 'uuid': self.pubnub.uuid }) - self.assertEqual(self.sub._groups, ['gr']) + self.assertEqual(list(self.sub._groups), ['gr']) def test_sub_multiple_groups_using_string(self): self.sub.channel_groups("gr1,gr2,gr3") @@ -97,7 +110,7 @@ def test_sub_multiple_groups_using_string(self): 'uuid': self.pubnub.uuid }) - self.assertEqual(self.sub._groups, ['gr1', 'gr2', 'gr3']) + self.assertEqual(sorted(self.sub._groups), ['gr1', 'gr2', 'gr3']) def test_sub_multiple_groups_using_list(self): self.sub.channel_groups(['gr1', 'gr2', 'gr3']) @@ -111,7 +124,21 @@ def test_sub_multiple_groups_using_list(self): 'uuid': self.pubnub.uuid }) - self.assertEqual(self.sub._groups, ['gr1', 'gr2', 'gr3']) + self.assertEqual(sorted(self.sub._groups), ['gr1', 'gr2', 'gr3']) + + def test_sub_unique_channel_groups_using_list(self): + self.sub.channel_groups(['gr1', 'gr2', 'gr1']) + + self.assertEqual(self.sub.build_path(), Subscribe.SUBSCRIBE_PATH + % (pnconf.subscribe_key, ",")) + + self.assertEqual(self.sub.build_params_callback()({}), { + 'channel-group': 'gr1,gr2', + 'pnsdk': sdk_name, + 'uuid': self.pubnub.uuid + }) + + self.assertEqual(sorted(self.sub._groups), ['gr1', 'gr2']) def test_sub_multiple(self): self.sub.channels('ch1,ch2').filter_expression('blah').region('us-east-1').timetoken('123') @@ -127,8 +154,8 @@ def test_sub_multiple(self): 'tt': '123' }) - self.assertEqual(self.sub._groups, []) - self.assertEqual(self.sub._channels, ['ch1', 'ch2']) + self.assertEqual(list(self.sub._groups), []) + self.assertEqual(sorted(self.sub._channels), ['ch1', 'ch2']) def test_affected_channels_returns_provided_channels(self): self.sub.channels(('ch1', 'ch2', 'ch3')) diff --git a/tests/integrational/asyncio/test_change_uuid.py b/tests/integrational/asyncio/test_change_uuid.py index 2fb5a0a9..e545d396 100644 --- a/tests/integrational/asyncio/test_change_uuid.py +++ b/tests/integrational/asyncio/test_change_uuid.py @@ -52,12 +52,12 @@ async def test_change_uuid_no_lock(): assert isinstance(envelope.status, PNStatus) -def test_uuid_validation_at_init(_function_event_loop): +def test_uuid_validation_at_init(): with pytest.raises(AssertionError) as exception: pnconf = PNConfiguration() pnconf.publish_key = "demo" pnconf.subscribe_key = "demo" - PubNubAsyncio(pnconf, custom_event_loop=_function_event_loop) + PubNubAsyncio(pnconf) assert str(exception.value) == 'UUID missing or invalid type' diff --git a/tests/integrational/asyncio/test_message_count.py b/tests/integrational/asyncio/test_message_count.py index f2f547c2..c5c714fe 100644 --- a/tests/integrational/asyncio/test_message_count.py +++ b/tests/integrational/asyncio/test_message_count.py @@ -9,10 +9,10 @@ @pytest.fixture -def pn(_function_event_loop): +def pn(): config = pnconf_mc_copy() config.enable_subscribe = False - pn = PubNubAsyncio(config, custom_event_loop=_function_event_loop) + pn = PubNubAsyncio(config) yield pn diff --git a/tests/integrational/asyncio/test_where_now.py b/tests/integrational/asyncio/test_where_now.py index a40a1b43..6a7cae3c 100644 --- a/tests/integrational/asyncio/test_where_now.py +++ b/tests/integrational/asyncio/test_where_now.py @@ -82,8 +82,8 @@ async def test_multiple_channels(): # @pytest.mark.asyncio @pytest.mark.skip(reason="Needs to be reworked to use VCR") -async def test_where_now_super_admin_call(_function_event_loop): - pubnub = PubNubAsyncio(pnconf_pam_copy(), custom_event_loop=_function_event_loop) +async def test_where_now_super_admin_call(): + pubnub = PubNubAsyncio(pnconf_pam_copy()) uuid = 'test-where-now-asyncio-uuid-.*|@' pubnub.config.uuid = uuid From f254bdd6a04e76db4969706a76a2e2bae0acf858 Mon Sep 17 00:00:00 2001 From: Serhii Mamontov Date: Wed, 12 Nov 2025 11:44:59 +0200 Subject: [PATCH 03/12] refactor(lint): fix issue reported by linter --- pubnub/endpoints/presence/here_now.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pubnub/endpoints/presence/here_now.py b/pubnub/endpoints/presence/here_now.py index 1ca4e819..4c094b79 100644 --- a/pubnub/endpoints/presence/here_now.py +++ b/pubnub/endpoints/presence/here_now.py @@ -57,7 +57,7 @@ def offset(self, offset: int) -> 'HereNow': return self def custom_params(self): - params = { 'limit': self._limit } + params = {'limit': self._limit} if len(self._channel_groups) > 0: params['channel-group'] = utils.join_items_and_encode(self._channel_groups) From 4f5be7685b0144a9dea1af054ee03998726c8dd8 Mon Sep 17 00:00:00 2001 From: Serhii Mamontov Date: Wed, 12 Nov 2025 11:47:41 +0200 Subject: [PATCH 04/12] refactor(lint): fix issue reported by linter --- pubnub/utils.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pubnub/utils.py b/pubnub/utils.py index ebab6f43..8de978eb 100644 --- a/pubnub/utils.py +++ b/pubnub/utils.py @@ -58,9 +58,11 @@ def split_items(items_string): def join_items(items_list, sort_items=False): return ",".join(sorted(items_list) if sort_items else items_list) + def join_items_and_encode(items_list, sort_items=False): return ",".join(url_encode(x) for x in (sorted(items_list) if sort_items else items_list)) + def join_channels(items_list, sort_items=False): if len(items_list) == 0: return "," @@ -74,12 +76,14 @@ def extend_list(existing_items, new_items): else: existing_items.extend(new_items) + def update_set(existing_items: Set[str], new_items: Union[str, List[str]]): if isinstance(new_items, str): existing_items.update(split_items(new_items)) else: existing_items.update(new_items) + def build_url(scheme, origin, path, params={}): return urllib.parse.urlunsplit((scheme, origin, path, params, '')) From fd6754b049e7e6b12adc4a461092ef61cd955501 Mon Sep 17 00:00:00 2001 From: Serhii Mamontov Date: Wed, 12 Nov 2025 12:12:37 +0200 Subject: [PATCH 05/12] test: fix issues causing tests fail --- pubnub/endpoints/presence/leave.py | 4 ++-- pubnub/endpoints/pubsub/subscribe.py | 4 ++-- pytest.ini | 2 ++ 3 files changed, 6 insertions(+), 4 deletions(-) create mode 100644 pytest.ini diff --git a/pubnub/endpoints/presence/leave.py b/pubnub/endpoints/presence/leave.py index cefa5aa5..88e4a40f 100644 --- a/pubnub/endpoints/presence/leave.py +++ b/pubnub/endpoints/presence/leave.py @@ -54,10 +54,10 @@ def is_auth_required(self): return True def affected_channels(self): - return list(self._channels) + return sorted(self._channels) def affected_channels_groups(self): - return list(self._groups) + return sorted(self._groups) def request_timeout(self): return self.pubnub.config.non_subscribe_request_timeout diff --git a/pubnub/endpoints/pubsub/subscribe.py b/pubnub/endpoints/pubsub/subscribe.py index 277e5562..d616fcf3 100644 --- a/pubnub/endpoints/pubsub/subscribe.py +++ b/pubnub/endpoints/pubsub/subscribe.py @@ -108,10 +108,10 @@ def is_auth_required(self): return True def affected_channels(self): - return list(self._channels) + return sorted(self._channels) def affected_channels_groups(self): - return list(self._groups) + return sorted(self._groups) def request_timeout(self): return self.pubnub.config.subscribe_request_timeout diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 00000000..d280de04 --- /dev/null +++ b/pytest.ini @@ -0,0 +1,2 @@ +[pytest] +asyncio_mode = auto \ No newline at end of file From 18fdf5c5bce593760be199280cf8f04059407658 Mon Sep 17 00:00:00 2001 From: Serhii Mamontov Date: Wed, 12 Nov 2025 13:19:55 +0200 Subject: [PATCH 06/12] test(acceptance): define missing steps --- pytest.ini | 2 - tests/acceptance/pam/steps/then_steps.py | 76 +++++++++++++++++++ .../integrational/asyncio/test_change_uuid.py | 14 +++- .../asyncio/test_message_count.py | 18 ++++- 4 files changed, 103 insertions(+), 7 deletions(-) delete mode 100644 pytest.ini diff --git a/pytest.ini b/pytest.ini deleted file mode 100644 index d280de04..00000000 --- a/pytest.ini +++ /dev/null @@ -1,2 +0,0 @@ -[pytest] -asyncio_mode = auto \ No newline at end of file diff --git a/tests/acceptance/pam/steps/then_steps.py b/tests/acceptance/pam/steps/then_steps.py index 6f3d4b8a..89593a56 100644 --- a/tests/acceptance/pam/steps/then_steps.py +++ b/tests/acceptance/pam/steps/then_steps.py @@ -1,5 +1,7 @@ import json from behave import then +from soupsieve.util import lower + from pubnub.exceptions import PubNubException @@ -18,6 +20,11 @@ def step_impl(context, channel): context.token_resource = context.parsed_token["resources"]["channels"].get(channel.strip("'")) assert context.token_resource +@then("token {data_type} permission {permission}") +def step_impl(context, data_type, permission): + assert context.token_resource + assert context.token_resource[lower(permission)] == True + @then("the token contains the authorized UUID {test_uuid}") def step_impl(context, test_uuid): @@ -80,6 +87,75 @@ def step_impl(context): context.pam_call_error = json.loads(context.pam_call_result._errormsg) +@then("the error status code is {error_code}") +def step_impl(context, error_code): + assert context.pam_call_error['status'] == int(error_code) + + +@then("the auth error message is '{error_message}'") +@then("the error message is '{error_message}'") +def step_impl(context, error_message): + if 'message' in context.pam_call_error: + assert context.pam_call_error['message'] == error_message + elif 'error' in context.pam_call_error and 'message' in context.pam_call_error['error']: + assert context.pam_call_error['error']['message'] == error_message + else: + raise AssertionError("Unexpected payload: {}".format(context.pam_call_error)) + + +@then("the error detail message is not empty") +def step_impl(context, details_message): + if 'error' in context.pam_call_error and 'details' in context.pam_call_error['error']: + assert len(context.pam_call_error['error']['details']) > 0 + assert 'message' in context.pam_call_error['error']['details'][0] + assert len(context.pam_call_error['error']['details'][0]['message']) > 0 + else: + raise AssertionError("Unexpected payload: {}".format(context.pam_call_error)) + + +@then("the error detail message is '{details_message}'") +def step_impl(context, details_message): + if 'error' in context.pam_call_error and 'details' in context.pam_call_error['error']: + assert len(context.pam_call_error['error']['details']) > 0 + assert 'message' in context.pam_call_error['error']['details'][0] + assert context.pam_call_error['error']['details'][0]['message'] == details_message + else: + raise AssertionError("Unexpected payload: {}".format(context.pam_call_error)) + + +@then("the error detail location is '{details_location}'") +def step_impl(context, details_location): + if 'error' in context.pam_call_error and 'details' in context.pam_call_error['error']: + assert len(context.pam_call_error['error']['details']) > 0 + assert 'location' in context.pam_call_error['error']['details'][0] + assert context.pam_call_error['error']['details'][0]['location'] == details_location + else: + raise AssertionError("Unexpected payload: {}".format(context.pam_call_error)) + + +@then("the error detail location type is '{details_location_type}'") +def step_impl(context, details_location_type): + if 'error' in context.pam_call_error and 'details' in context.pam_call_error['error']: + assert len(context.pam_call_error['error']['details']) > 0 + assert 'locationType' in context.pam_call_error['error']['details'][0] + assert context.pam_call_error['error']['details'][0]['locationType'] == details_location_type + else: + raise AssertionError("Unexpected payload: {}".format(context.pam_call_error)) + + +@then("the error service is '{error_service}'") +def step_impl(context, error_service): + assert context.pam_call_error['service'] == error_service + + +@then("the error source is '{error_source}'") +def step_impl(context, error_source): + if 'error' in context.pam_call_error and 'source' in context.pam_call_error['error']: + assert context.pam_call_error['error']['source'] == error_source + else: + raise AssertionError("Unexpected payload: {}".format(context.pam_call_error)) + + @then("the result is successful") def step_impl(context): assert context.publish_result.result.timetoken diff --git a/tests/integrational/asyncio/test_change_uuid.py b/tests/integrational/asyncio/test_change_uuid.py index e545d396..6458557c 100644 --- a/tests/integrational/asyncio/test_change_uuid.py +++ b/tests/integrational/asyncio/test_change_uuid.py @@ -1,3 +1,4 @@ +import asyncio import pytest from pubnub.models.consumer.signal import PNSignalResult @@ -52,12 +53,21 @@ async def test_change_uuid_no_lock(): assert isinstance(envelope.status, PNStatus) -def test_uuid_validation_at_init(): +@pytest.fixture +def event_loop(): + loop = asyncio.new_event_loop() + try: + yield loop + finally: + loop.run_until_complete(asyncio.sleep(0)) + loop.close() + +def test_uuid_validation_at_init(event_loop): with pytest.raises(AssertionError) as exception: pnconf = PNConfiguration() pnconf.publish_key = "demo" pnconf.subscribe_key = "demo" - PubNubAsyncio(pnconf) + PubNubAsyncio(pnconf, custom_event_loop=event_loop) assert str(exception.value) == 'UUID missing or invalid type' diff --git a/tests/integrational/asyncio/test_message_count.py b/tests/integrational/asyncio/test_message_count.py index c5c714fe..33ed768f 100644 --- a/tests/integrational/asyncio/test_message_count.py +++ b/tests/integrational/asyncio/test_message_count.py @@ -1,3 +1,4 @@ +import asyncio import pytest from pubnub.pubnub_asyncio import PubNubAsyncio @@ -7,13 +8,24 @@ from tests.helper import pnconf_mc_copy from tests.integrational.vcr_helper import pn_vcr +@pytest.fixture +def event_loop(): + loop = asyncio.new_event_loop() + try: + yield loop + finally: + loop.run_until_complete(asyncio.sleep(0)) + loop.close() @pytest.fixture -def pn(): +def pn(event_loop): config = pnconf_mc_copy() config.enable_subscribe = False - pn = PubNubAsyncio(config) - yield pn + pn = PubNubAsyncio(config, custom_event_loop=event_loop) + try: + yield pn + finally: + event_loop.run_until_complete(pn.stop()) @pn_vcr.use_cassette( From 496bea24af6023bae6f11ee81d8db3e8c6d8f700 Mon Sep 17 00:00:00 2001 From: Serhii Mamontov Date: Wed, 12 Nov 2025 13:25:56 +0200 Subject: [PATCH 07/12] test: remove 'lower' --- tests/acceptance/pam/steps/then_steps.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/acceptance/pam/steps/then_steps.py b/tests/acceptance/pam/steps/then_steps.py index 89593a56..ba453788 100644 --- a/tests/acceptance/pam/steps/then_steps.py +++ b/tests/acceptance/pam/steps/then_steps.py @@ -1,6 +1,5 @@ import json from behave import then -from soupsieve.util import lower from pubnub.exceptions import PubNubException @@ -23,7 +22,7 @@ def step_impl(context, channel): @then("token {data_type} permission {permission}") def step_impl(context, data_type, permission): assert context.token_resource - assert context.token_resource[lower(permission)] == True + assert context.token_resource[permission.lower()] == True @then("the token contains the authorized UUID {test_uuid}") From 717156ef47466d921999ab5c7f619d05f7a41681 Mon Sep 17 00:00:00 2001 From: Serhii Mamontov Date: Wed, 12 Nov 2025 13:57:04 +0200 Subject: [PATCH 08/12] test(acceptance): fix implementation declaration --- tests/acceptance/pam/steps/then_steps.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/acceptance/pam/steps/then_steps.py b/tests/acceptance/pam/steps/then_steps.py index ba453788..99c7ec38 100644 --- a/tests/acceptance/pam/steps/then_steps.py +++ b/tests/acceptance/pam/steps/then_steps.py @@ -103,7 +103,7 @@ def step_impl(context, error_message): @then("the error detail message is not empty") -def step_impl(context, details_message): +def step_impl(context): if 'error' in context.pam_call_error and 'details' in context.pam_call_error['error']: assert len(context.pam_call_error['error']['details']) > 0 assert 'message' in context.pam_call_error['error']['details'][0] From 0ca20f1f33eee180699cd2a49a9f1a63dfbeb4c2 Mon Sep 17 00:00:00 2001 From: Serhii Mamontov Date: Wed, 12 Nov 2025 14:08:51 +0200 Subject: [PATCH 09/12] test(asyncio): set current loop --- tests/integrational/asyncio/test_change_uuid.py | 2 ++ tests/integrational/asyncio/test_message_count.py | 2 ++ 2 files changed, 4 insertions(+) diff --git a/tests/integrational/asyncio/test_change_uuid.py b/tests/integrational/asyncio/test_change_uuid.py index 6458557c..1b349574 100644 --- a/tests/integrational/asyncio/test_change_uuid.py +++ b/tests/integrational/asyncio/test_change_uuid.py @@ -56,10 +56,12 @@ async def test_change_uuid_no_lock(): @pytest.fixture def event_loop(): loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) try: yield loop finally: loop.run_until_complete(asyncio.sleep(0)) + asyncio.set_event_loop(None) loop.close() def test_uuid_validation_at_init(event_loop): diff --git a/tests/integrational/asyncio/test_message_count.py b/tests/integrational/asyncio/test_message_count.py index 33ed768f..a47191af 100644 --- a/tests/integrational/asyncio/test_message_count.py +++ b/tests/integrational/asyncio/test_message_count.py @@ -11,10 +11,12 @@ @pytest.fixture def event_loop(): loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) try: yield loop finally: loop.run_until_complete(asyncio.sleep(0)) + asyncio.set_event_loop(None) loop.close() @pytest.fixture From e36136f13e9d19aaef06610a3d5c8bf8ab51a84c Mon Sep 17 00:00:00 2001 From: Serhii Mamontov Date: Wed, 12 Nov 2025 14:39:22 +0200 Subject: [PATCH 10/12] test: remove legacy key from behave --- .github/workflows/run-tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/run-tests.yml b/.github/workflows/run-tests.yml index 32dbcd60..7070c6a9 100644 --- a/.github/workflows/run-tests.yml +++ b/.github/workflows/run-tests.yml @@ -87,7 +87,7 @@ jobs: pip3 install --user --ignore-installed -r requirements-dev.txt behave --junit tests/acceptance/pam - behave --junit tests/acceptance/encryption/cryptor-module.feature -t=~na=python -k + behave --junit tests/acceptance/encryption/cryptor-module.feature -t=~na=python behave --junit tests/acceptance/subscribe - name: Expose acceptance tests reports uses: actions/upload-artifact@v4 From 1c37a158b596a613527a26363e73c17ce3a9eaf7 Mon Sep 17 00:00:00 2001 From: Serhii Mamontov Date: Thu, 13 Nov 2025 00:35:13 +0200 Subject: [PATCH 11/12] test(acceptance): add missing steps Add missing presence and subscribe acceptance test steps definition. refactor(subscribe): check for `t` presence in response Refactor `handshake` and `receive` subscribe `effects` to check for cursor presence in response (adjustment for acceptance tests server). --- pubnub/event_engine/effects.py | 4 ++-- tests/acceptance/subscribe/steps/then_steps.py | 2 ++ 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/pubnub/event_engine/effects.py b/pubnub/event_engine/effects.py index e14e7e86..d7c0b28d 100644 --- a/pubnub/event_engine/effects.py +++ b/pubnub/event_engine/effects.py @@ -88,7 +88,7 @@ async def handshake_async(self, channels, groups, stop_event, timetoken: int = 0 self.logger.warning(f'Handshake failed: {response.status.error_data.__dict__}') handshake_failure = events.HandshakeFailureEvent(response.status.error_data, 1, timetoken=timetoken) self.event_engine.trigger(handshake_failure) - else: + elif 't' in response.result: cursor = response.result['t'] timetoken = timetoken if timetoken > 0 else cursor['t'] region = cursor['r'] @@ -134,7 +134,7 @@ async def receive_messages_async(self, channels, groups, timetoken, region): self.logger.warning(f'Recieve messages failed: {response.status.error_data.__dict__}') recieve_failure = events.ReceiveFailureEvent(response.status.error_data, 1, timetoken=timetoken) self.event_engine.trigger(recieve_failure) - else: + elif 't' in response.result: cursor = response.result['t'] timetoken = cursor['t'] region = cursor['r'] diff --git a/tests/acceptance/subscribe/steps/then_steps.py b/tests/acceptance/subscribe/steps/then_steps.py index b97d7940..60e9187e 100644 --- a/tests/acceptance/subscribe/steps/then_steps.py +++ b/tests/acceptance/subscribe/steps/then_steps.py @@ -25,6 +25,7 @@ async def step_impl(ctx: PNContext): await ctx.pubnub.stop() +@then("I observe the following:") @then("I observe the following") @async_run_until_complete async def step_impl(ctx): @@ -74,6 +75,7 @@ async def step_impl(ctx: PNContext, wait_time: str): await asyncio.sleep(int(wait_time)) +@then(u'I observe the following Events and Invocations of the Presence EE:') @then(u'I observe the following Events and Invocations of the Presence EE') @async_run_until_complete async def step_impl(ctx): From 2b28a0cc98fed0a4d395299e41766c50282fad51 Mon Sep 17 00:00:00 2001 From: Serhii Mamontov Date: Thu, 13 Nov 2025 00:44:58 +0200 Subject: [PATCH 12/12] refactor(lint): fix issue reported by linter --- tests/acceptance/pam/steps/then_steps.py | 3 ++- tests/integrational/asyncio/test_change_uuid.py | 1 + tests/integrational/asyncio/test_message_count.py | 2 ++ 3 files changed, 5 insertions(+), 1 deletion(-) diff --git a/tests/acceptance/pam/steps/then_steps.py b/tests/acceptance/pam/steps/then_steps.py index 99c7ec38..6a1a4ff1 100644 --- a/tests/acceptance/pam/steps/then_steps.py +++ b/tests/acceptance/pam/steps/then_steps.py @@ -19,10 +19,11 @@ def step_impl(context, channel): context.token_resource = context.parsed_token["resources"]["channels"].get(channel.strip("'")) assert context.token_resource + @then("token {data_type} permission {permission}") def step_impl(context, data_type, permission): assert context.token_resource - assert context.token_resource[permission.lower()] == True + assert context.token_resource[permission.lower()] @then("the token contains the authorized UUID {test_uuid}") diff --git a/tests/integrational/asyncio/test_change_uuid.py b/tests/integrational/asyncio/test_change_uuid.py index 1b349574..b3bfb49e 100644 --- a/tests/integrational/asyncio/test_change_uuid.py +++ b/tests/integrational/asyncio/test_change_uuid.py @@ -64,6 +64,7 @@ def event_loop(): asyncio.set_event_loop(None) loop.close() + def test_uuid_validation_at_init(event_loop): with pytest.raises(AssertionError) as exception: pnconf = PNConfiguration() diff --git a/tests/integrational/asyncio/test_message_count.py b/tests/integrational/asyncio/test_message_count.py index a47191af..8a6555f4 100644 --- a/tests/integrational/asyncio/test_message_count.py +++ b/tests/integrational/asyncio/test_message_count.py @@ -8,6 +8,7 @@ from tests.helper import pnconf_mc_copy from tests.integrational.vcr_helper import pn_vcr + @pytest.fixture def event_loop(): loop = asyncio.new_event_loop() @@ -19,6 +20,7 @@ def event_loop(): asyncio.set_event_loop(None) loop.close() + @pytest.fixture def pn(event_loop): config = pnconf_mc_copy()