Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/run-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ jobs:

pip3 install --user --ignore-installed -r requirements-dev.txt
behave --junit tests/acceptance/pam
behave --junit tests/acceptance/encryption/cryptor-module.feature -t=~na=python -k
behave --junit tests/acceptance/encryption/cryptor-module.feature -t=~na=python
behave --junit tests/acceptance/subscribe
- name: Expose acceptance tests reports
uses: actions/upload-artifact@v4
Expand Down
18 changes: 9 additions & 9 deletions pubnub/endpoints/presence/heartbeat.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Dict, Optional, Union, List
from typing import Dict, Optional, Union, List, Set
from pubnub import utils
from pubnub.endpoints.endpoint import Endpoint
from pubnub.enums import HttpMethod, PNOperationType
Expand All @@ -13,22 +13,22 @@ class Heartbeat(Endpoint):
def __init__(self, pubnub, channels: Union[str, List[str]] = None, channel_groups: Union[str, List[str]] = None,
state: Optional[Dict[str, any]] = None):
super(Heartbeat, self).__init__(pubnub)
self._channels = []
self._groups = []
self._channels: Set[str] = set()
self._groups: Set[str] = set()
if channels:
utils.extend_list(self._channels, channels)
utils.update_set(self._channels, channels)

if channel_groups:
utils.extend_list(self._groups, channel_groups)
utils.update_set(self._groups, channel_groups)

self._state = state

def channels(self, channels: Union[str, List[str]]) -> 'Heartbeat':
utils.extend_list(self._channels, channels)
utils.update_set(self._channels, channels)
return self

def channel_groups(self, channel_groups: Union[str, List[str]]) -> 'Heartbeat':
utils.extend_list(self._groups, channel_groups)
utils.update_set(self._groups, channel_groups)
return self

def state(self, state: Dict[str, any]) -> 'Heartbeat':
Expand All @@ -46,14 +46,14 @@ def validate_params(self):
raise PubNubException(pn_error=PNERR_CHANNEL_OR_GROUP_MISSING)

def build_path(self):
channels = utils.join_channels(self._channels)
channels = utils.join_channels(self._channels, True)
return Heartbeat.HEARTBEAT_PATH % (self.pubnub.config.subscribe_key, channels)

def custom_params(self):
params = {'heartbeat': str(self.pubnub.config.presence_timeout)}

if len(self._groups) > 0:
params['channel-group'] = utils.join_items(self._groups)
params['channel-group'] = utils.join_items(self._groups, True)

if self._state is not None and len(self._state) > 0:
params['state'] = utils.url_write(self._state)
Expand Down
15 changes: 14 additions & 1 deletion pubnub/endpoints/presence/here_now.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ def __init__(self, pubnub, channels: Union[str, List[str]] = None, channel_group

self._include_state = include_state
self._include_uuids = include_uuids
self._offset = None
self._limit = 1000

def channels(self, channels: Union[str, List[str]]) -> 'HereNow':
utils.extend_list(self._channels, channels)
Expand All @@ -46,8 +48,16 @@ def include_uuids(self, include_uuids) -> 'HereNow':
self._include_uuids = include_uuids
return self

def limit(self, limit: int) -> 'HereNow':
self._limit = limit
return self

def offset(self, offset: int) -> 'HereNow':
self._offset = offset
return self

def custom_params(self):
params = {}
params = {'limit': self._limit}

if len(self._channel_groups) > 0:
params['channel-group'] = utils.join_items_and_encode(self._channel_groups)
Expand All @@ -58,6 +68,9 @@ def custom_params(self):
if not self._include_uuids:
params['disable_uuids'] = "1"

if self._offset is not None:
params['offset'] = self._offset

return params

def build_path(self):
Expand Down
30 changes: 12 additions & 18 deletions pubnub/endpoints/presence/leave.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Set, Union, List

from pubnub import utils
from pubnub.endpoints.endpoint import Endpoint
from pubnub.errors import PNERR_CHANNEL_OR_GROUP_MISSING
Expand All @@ -11,38 +13,30 @@ class Leave(Endpoint):

def __init__(self, pubnub):
Endpoint.__init__(self, pubnub)
self._channels = []
self._groups = []

def channels(self, channels):
if isinstance(channels, (list, tuple)):
self._channels.extend(channels)
else:
self._channels.extend(utils.split_items(channels))
self._channels: Set[str] = set()
self._groups: Set[str] = set()

def channels(self, channels: Union[str, List[str]]) -> 'Leave':
utils.update_set(self._channels, channels)
return self

def channel_groups(self, channel_groups):
if isinstance(channel_groups, (list, tuple)):
self._groups.extend(channel_groups)
else:
self._groups.extend(utils.split_items(channel_groups))

def channel_groups(self, channel_groups: Union[str, List[str]]) -> 'Leave':
utils.update_set(self._groups, channel_groups)
return self

def custom_params(self):
params = {}

if len(self._groups) > 0:
params['channel-group'] = utils.join_items(self._groups)
params['channel-group'] = utils.join_items(self._groups, True)

if hasattr(self.pubnub, '_subscription_manager'):
params.update(self.pubnub._subscription_manager.get_custom_params())

return params

def build_path(self):
return Leave.LEAVE_PATH % (self.pubnub.config.subscribe_key, utils.join_channels(self._channels))
return Leave.LEAVE_PATH % (self.pubnub.config.subscribe_key, utils.join_channels(self._channels, True))

def http_method(self):
return HttpMethod.GET
Expand All @@ -60,10 +54,10 @@ def is_auth_required(self):
return True

def affected_channels(self):
return self._channels
return sorted(self._channels)

def affected_channels_groups(self):
return self._groups
return sorted(self._groups)

def request_timeout(self):
return self.pubnub.config.non_subscribe_request_timeout
Expand Down
22 changes: 11 additions & 11 deletions pubnub/endpoints/pubsub/subscribe.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Optional, Union, List
from typing import Optional, Union, List, Set
from pubnub import utils
from pubnub.endpoints.endpoint import Endpoint
from pubnub.enums import HttpMethod, PNOperationType
Expand All @@ -25,12 +25,12 @@ def __init__(self, pubnub, channels: Union[str, List[str]] = None,
with_presence: Optional[str] = None, state: Optional[str] = None):

super(Subscribe, self).__init__(pubnub)
self._channels = []
self._channels: Set[str] = set()
self._groups: Set[str] = set()
if channels:
utils.extend_list(self._channels, channels)
self._groups = []
utils.update_set(self._channels, channels)
if groups:
utils.extend_list(self._groups, groups)
utils.update_set(self._groups, groups)

self._region = region
self._filter_expression = filter_expression
Expand All @@ -39,11 +39,11 @@ def __init__(self, pubnub, channels: Union[str, List[str]] = None,
self._state = state

def channels(self, channels: Union[str, List[str]]) -> 'Subscribe':
utils.extend_list(self._channels, channels)
utils.update_set(self._channels, channels)
return self

def channel_groups(self, groups: Union[str, List[str]]) -> 'Subscribe':
utils.extend_list(self._groups, groups)
utils.update_set(self._groups, groups)
return self

def timetoken(self, timetoken) -> 'Subscribe':
Expand Down Expand Up @@ -72,14 +72,14 @@ def validate_params(self):
raise PubNubException(pn_error=PNERR_CHANNEL_OR_GROUP_MISSING)

def build_path(self):
channels = utils.join_channels(self._channels)
channels = utils.join_channels(self._channels, True)
return Subscribe.SUBSCRIBE_PATH % (self.pubnub.config.subscribe_key, channels)

def custom_params(self):
params = {}

if len(self._groups) > 0:
params['channel-group'] = utils.join_items_and_encode(self._groups)
params['channel-group'] = utils.join_items_and_encode(self._groups, True)

if self._filter_expression is not None and len(self._filter_expression) > 0:
params['filter-expr'] = utils.url_encode(self._filter_expression)
Expand Down Expand Up @@ -108,10 +108,10 @@ def is_auth_required(self):
return True

def affected_channels(self):
return self._channels
return sorted(self._channels)

def affected_channels_groups(self):
return self._groups
return sorted(self._groups)

def request_timeout(self):
return self.pubnub.config.subscribe_request_timeout
Expand Down
4 changes: 2 additions & 2 deletions pubnub/event_engine/effects.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ async def handshake_async(self, channels, groups, stop_event, timetoken: int = 0
self.logger.warning(f'Handshake failed: {response.status.error_data.__dict__}')
handshake_failure = events.HandshakeFailureEvent(response.status.error_data, 1, timetoken=timetoken)
self.event_engine.trigger(handshake_failure)
else:
elif 't' in response.result:
cursor = response.result['t']
timetoken = timetoken if timetoken > 0 else cursor['t']
region = cursor['r']
Expand Down Expand Up @@ -134,7 +134,7 @@ async def receive_messages_async(self, channels, groups, timetoken, region):
self.logger.warning(f'Recieve messages failed: {response.status.error_data.__dict__}')
recieve_failure = events.ReceiveFailureEvent(response.status.error_data, 1, timetoken=timetoken)
self.event_engine.trigger(recieve_failure)
else:
elif 't' in response.result:
cursor = response.result['t']
timetoken = cursor['t']
region = cursor['r']
Expand Down
2 changes: 1 addition & 1 deletion pubnub/event_engine/models/states.py
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,7 @@ def reconnect_failure(self, event: events.ReceiveReconnectFailureEvent, context:
return PNTransition(
state=ReceiveReconnectingState,
context=self._context,
invocation=invocations.EmitStatusInvocation(PNStatusCategory.UnexpectedDisconnectCategory,
invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNUnexpectedDisconnectCategory,
operation=PNOperationType.PNSubscribeOperation,
context=self._context)
)
Expand Down
20 changes: 14 additions & 6 deletions pubnub/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import warnings

from hashlib import sha256
from typing import Set, List, Union

from pubnub.enums import PNStatusCategory, PNOperationType, PNPushType, HttpMethod, PAMPermissions
from pubnub.models.consumer.common import PNStatus
Expand Down Expand Up @@ -54,19 +55,19 @@ def split_items(items_string):
return items_string.split(",")


def join_items(items_list):
return ",".join(items_list)
def join_items(items_list, sort_items=False):
return ",".join(sorted(items_list) if sort_items else items_list)


def join_items_and_encode(items_list):
return ",".join(url_encode(x) for x in items_list)
def join_items_and_encode(items_list, sort_items=False):
return ",".join(url_encode(x) for x in (sorted(items_list) if sort_items else items_list))


def join_channels(items_list):
def join_channels(items_list, sort_items=False):
if len(items_list) == 0:
return ","
else:
return join_items_and_encode(items_list)
return join_items_and_encode(items_list, sort_items)


def extend_list(existing_items, new_items):
Expand All @@ -76,6 +77,13 @@ def extend_list(existing_items, new_items):
existing_items.extend(new_items)


def update_set(existing_items: Set[str], new_items: Union[str, List[str]]):
if isinstance(new_items, str):
existing_items.update(split_items(new_items))
else:
existing_items.update(new_items)


def build_url(scheme, origin, path, params={}):
return urllib.parse.urlunsplit((scheme, origin, path, params, ''))

Expand Down
Loading
Loading