@@ -35,6 +35,8 @@ class CompressionMode(Enum):
3535class WorkerBase (object , metaclass = abc .ABCMeta ):
3636 """Worker template."""
3737
38+ _fetching_segment = "Fetching new segment {segment_name}"
39+
3840 @abc .abstractmethod
3941 def is_running (self ):
4042 """Return whether the working is running."""
@@ -226,27 +228,30 @@ def _apply_iff_if_needed(self, event):
226228 segment_list = update_feature_flag_storage (self ._feature_flag_storage , [new_feature_flag ], event .change_number )
227229 for segment_name in segment_list :
228230 if self ._segment_storage .get (segment_name ) is None :
229- _LOGGER .debug ('Fetching new segment %s' , segment_name )
231+ _LOGGER .debug (self . _fetching_segment . format ( segment_name = segment_name ) )
230232 self ._segment_handler (segment_name , event .change_number )
231233
232234 referenced_rbs = self ._get_referenced_rbs (new_feature_flag )
233- if len (referenced_rbs ) > 0 and not self ._rule_based_segment_storage .contains (referenced_rbs ):
234- _LOGGER .debug ('Fetching new rule based segment(s) %s' , referenced_rbs )
235- self ._handler (None , event .change_number )
235+ self ._fetch_rbs_segment_if_needed (referenced_rbs , event )
236236 self ._telemetry_runtime_producer .record_update_from_sse (UpdateFromSSE .SPLIT_UPDATE )
237237 else :
238238 new_rbs = rbs_from_raw (json .loads (self ._get_object_definition (event )))
239239 segment_list = update_rule_based_segment_storage (self ._rule_based_segment_storage , [new_rbs ], event .change_number )
240240 for segment_name in segment_list :
241241 if self ._segment_storage .get (segment_name ) is None :
242- _LOGGER .debug ('Fetching new segment %s' , segment_name )
242+ _LOGGER .debug (self . _fetching_segment . format ( segment_name = segment_name ) )
243243 self ._segment_handler (segment_name , event .change_number )
244244 self ._telemetry_runtime_producer .record_update_from_sse (UpdateFromSSE .RBS_UPDATE )
245245 return True
246246
247247 except Exception as e :
248248 raise SplitStorageException (e )
249249
250+ def _fetch_rbs_segment_if_needed (self , referenced_rbs , event ):
251+ if len (referenced_rbs ) > 0 and not self ._rule_based_segment_storage .contains (referenced_rbs ):
252+ _LOGGER .debug ('Fetching new rule based segment(s) %s' , referenced_rbs )
253+ self ._handler (None , event .change_number )
254+
250255 def _check_instant_ff_update (self , event ):
251256 if event .update_type == UpdateType .SPLIT_UPDATE and event .compression is not None and event .previous_change_number == self ._feature_flag_storage .get_change_number ():
252257 return True
@@ -264,16 +269,15 @@ def _run(self):
264269 break
265270 if event == self ._centinel :
266271 continue
272+
267273 _LOGGER .debug ('Processing feature flag update %d' , event .change_number )
268274 try :
269275 if self ._apply_iff_if_needed (event ):
270276 continue
277+
271278 till = None
272279 rbs_till = None
273- if event .update_type == UpdateType .SPLIT_UPDATE :
274- till = event .change_number
275- else :
276- rbs_till = event .change_number
280+ till , rbs_till = self ._check_update_type (till , rbs_till , event )
277281 sync_result = self ._handler (till , rbs_till )
278282 if not sync_result .success and sync_result .error_code is not None and sync_result .error_code == 414 :
279283 _LOGGER .error ("URI too long exception caught, sync failed" )
@@ -288,6 +292,14 @@ def _run(self):
288292 _LOGGER .error ('Exception raised in feature flag synchronization' )
289293 _LOGGER .debug ('Exception information: ' , exc_info = True )
290294
295+ def _check_update_type (self , till , rbs_till , event ):
296+ if event .update_type == UpdateType .SPLIT_UPDATE :
297+ till = event .change_number
298+ else :
299+ rbs_till = event .change_number
300+
301+ return till , rbs_till
302+
291303 def start (self ):
292304 """Start worker."""
293305 if self .is_running ():
@@ -354,27 +366,30 @@ async def _apply_iff_if_needed(self, event):
354366 segment_list = await update_feature_flag_storage_async (self ._feature_flag_storage , [new_feature_flag ], event .change_number )
355367 for segment_name in segment_list :
356368 if await self ._segment_storage .get (segment_name ) is None :
357- _LOGGER .debug ('Fetching new segment %s' , segment_name )
369+ _LOGGER .debug (self . _fetching_segment . format ( segment_name = segment_name ) )
358370 await self ._segment_handler (segment_name , event .change_number )
359371
360372 referenced_rbs = self ._get_referenced_rbs (new_feature_flag )
361- if len (referenced_rbs ) > 0 and not await self ._rule_based_segment_storage .contains (referenced_rbs ):
362- await self ._handler (None , event .change_number )
363-
373+ await self ._fetch_rbs_segment_if_needed (referenced_rbs , event )
364374 await self ._telemetry_runtime_producer .record_update_from_sse (UpdateFromSSE .SPLIT_UPDATE )
365375 else :
366376 new_rbs = rbs_from_raw (json .loads (self ._get_object_definition (event )))
367377 segment_list = await update_rule_based_segment_storage_async (self ._rule_based_segment_storage , [new_rbs ], event .change_number )
368378 for segment_name in segment_list :
369379 if await self ._segment_storage .get (segment_name ) is None :
370- _LOGGER .debug ('Fetching new segment %s' , segment_name )
380+ _LOGGER .debug (self . _fetching_segment . format ( segment_name = segment_name ) )
371381 await self ._segment_handler (segment_name , event .change_number )
372382 await self ._telemetry_runtime_producer .record_update_from_sse (UpdateFromSSE .RBS_UPDATE )
373383 return True
374384
375385 except Exception as e :
376386 raise SplitStorageException (e )
377387
388+ async def _fetch_rbs_segment_if_needed (self , referenced_rbs , event ):
389+ if len (referenced_rbs ) > 0 and not await self ._rule_based_segment_storage .contains (referenced_rbs ):
390+ _LOGGER .debug ('Fetching new rule based segment(s) %s' , referenced_rbs )
391+ await self ._handler (None , event .change_number )
392+
378393 async def _check_instant_ff_update (self , event ):
379394 if event .update_type == UpdateType .SPLIT_UPDATE and event .compression is not None and event .previous_change_number == await self ._feature_flag_storage .get_change_number ():
380395 return True
0 commit comments