9 changes: 7 additions & 2 deletions custom-recipes/pi-system-retrieve-list/recipe.py
@@ -7,7 +7,8 @@
get_credentials, get_interpolated_parameters, normalize_af_path,
get_combined_description, get_base_for_data_type, check_debug_mode,
PerformanceTimer, get_max_count, check_must_convert_object_to_string,
convert_schema_objects_to_string, get_summary_parameters, get_advanced_parameters
convert_schema_objects_to_string, get_summary_parameters, get_advanced_parameters,
get_batch_parameters
)
from osisoft_client import OSIsoftClient
from osisoft_constants import OSIsoftConstants
@@ -63,6 +64,8 @@ def get_step_value(item):
record_boundary_type = config.get("record_boundary_type") if data_type == "RecordedData" else None
summary_type, summary_duration = get_summary_parameters(config)
do_duplicate_input_row = config.get("do_duplicate_input_row", False)
max_request_size, estimated_density, maximum_points_returned = get_batch_parameters(config)
max_time_to_retrieve_per_batch = maximum_points_returned / estimated_density  # points / (points per hour) -> budget in hours

network_timer = PerformanceTimer()
processing_timer = PerformanceTimer()
@@ -150,7 +153,9 @@ def get_step_value(item):
object_id=object_id,
summary_type=summary_type,
summary_duration=summary_duration,
endpoint_type="AF"
endpoint_type="AF",
estimated_density=estimated_density,
maximum_points_returned=maximum_points_returned
)
batch_buffer_size = 0
buffer = []
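The budget added to recipe.py is simply the points cap divided by the expected point density: how many hours of data one batch may request before it would exceed the maximum number of points a response can carry. A quick worked example (illustrative only; the 2 points/hour and 1,000,000-point figures mirror the developer comments in BatchTimeCounter below, not fixed plugin defaults):

```python
# Illustrative arithmetic only, not plugin code.
estimated_density = 2                  # expected points per hour for one tag
maximum_points_returned = 1_000_000    # cap on points a single batch may return
max_time_to_retrieve_per_batch = maximum_points_returned / estimated_density
print(max_time_to_retrieve_per_batch)  # 500000.0 hours of data per batch
```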
14 changes: 11 additions & 3 deletions python-lib/osisoft_client.py
@@ -10,7 +10,8 @@
from osisoft_plugin_common import (
assert_server_url_ok, build_requests_params,
is_filtered_out, is_server_throttling, escape, epoch_to_iso,
iso_to_epoch, RecordsLimit, is_iso8601, get_next_page_url, change_key_in_dict
iso_to_epoch, RecordsLimit, is_iso8601, get_next_page_url, change_key_in_dict,
BatchTimeCounter
)
from osisoft_pagination import OffsetPagination
from safe_logger import SafeLogger
@@ -243,7 +244,10 @@ def get_rows_from_webid(self, webid, data_type, **kwargs):
def get_rows_from_webids(self, input_rows, data_type, **kwargs):
endpoint_type = kwargs.get("endpoint_type", "event_frames")
batch_size = kwargs.get("batch_size", 500)

estimated_density = kwargs.get("estimated_density", 500)
maximum_points_returned = kwargs.get("maximum_points_returned", 500)
max_time_to_retrieve_per_batch = maximum_points_returned / estimated_density  # points / (points per hour) -> budget in hours
batch_time = BatchTimeCounter(max_time_to_retrieve_per_batch)
batch_requests_parameters = []
number_processed_webids = 0
number_of_webids_to_process = len(input_rows)
@@ -259,14 +263,18 @@ def get_rows_from_webids(self, input_rows, data_type, **kwargs):
else:
webid = input_row
url = self.endpoint.get_data_from_webid_url(endpoint_type, data_type, webid)
start_date = kwargs.get("start_date")
end_date = kwargs.get("end_date")
interval = kwargs.get("interval")
requests_kwargs = self.generic_get_kwargs(**kwargs)
batch_time.add(start_date, end_date, interval)
requests_kwargs['url'] = build_query_string(url, requests_kwargs.get("params"))
web_ids.append(webid)
event_start_times.append(event_start_time)
event_end_times.append(event_end_time)
batch_requests_parameters.append(requests_kwargs)
number_processed_webids += 1
if (len(batch_requests_parameters) >= batch_size) or (number_processed_webids == number_of_webids_to_process):
if (len(batch_requests_parameters) >= batch_size) or (number_processed_webids == number_of_webids_to_process) or batch_time.is_batch_full():
json_responses = self._batch_requests(batch_requests_parameters)
batch_requests_parameters = []
response_index = 0
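With this change, get_rows_from_webids flushes a batch on either of two triggers: the existing batch_size cap or the new time budget tracked by BatchTimeCounter. A minimal, self-contained sketch of that flush rule (not the plugin's actual client code; the function name and figures are illustrative):

```python
# Sketch of the combined flush condition: count cap OR accumulated-time budget.
def group_requests(time_spans_hours, batch_size, max_hours_per_batch):
    """time_spans_hours: hours of data requested by each WebID, in order."""
    batches, current, hours_in_batch = [], [], 0
    for position, span in enumerate(time_spans_hours, start=1):
        current.append(span)
        hours_in_batch += span
        is_last = position == len(time_spans_hours)
        if len(current) >= batch_size or hours_in_batch > max_hours_per_batch or is_last:
            batches.append(current)
            current, hours_in_batch = [], 0
    return batches


# Three WebIDs asking for 24 h each with a 36 h budget flush as [2, 1].
print([len(b) for b in group_requests([24, 24, 24], batch_size=500, max_hours_per_batch=36)])
```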
21 changes: 9 additions & 12 deletions python-lib/osisoft_plugin_common.py
@@ -442,9 +442,9 @@ def epoch_to_iso(epoch):


def iso_to_epoch(iso_timestamp):
logger.info("Converting iso timestamp '{}' to epoch".format(iso_timestamp))
# logger.info("Converting iso timestamp '{}' to epoch".format(iso_timestamp))
if is_epoch(iso_timestamp):
logger.info("Timestamp is already epoch")
# logger.info("Timestamp is already epoch")
return iso_timestamp
epoch_timestamp = None
try:
@@ -453,7 +453,7 @@ def iso_to_epoch(iso_timestamp):
except Exception:
logger.error("Error when converting iso timestamp '{}' to epoch".format(iso_timestamp))
return None
logger.info("Timestamp is now '{}'".format(epoch_timestamp))
# logger.info("Timestamp is now '{}'".format(epoch_timestamp))
return epoch_timestamp


@@ -619,21 +619,18 @@ def get_worst_performers(self):

class BatchTimeCounter(object):
def __init__(self, max_time_to_retrieve_per_batch):
logger.info("BatchTimeCounter:max_time_to_retrieve_per_batch={}s".format(max_time_to_retrieve_per_batch * 60 * 60))
self.max_time_to_retrieve_per_batch = max_time_to_retrieve_per_batch * 60 * 60
self.total_batch_time = 0
# 2 points /h each line
# max 1 000 000 lines back -> 500k hours max
self.total_batched_time = 0

def is_batch_full(self):
# return False
if self.max_time_to_retrieve_per_batch < 0:
return False
if self.total_batch_time > self.max_time_to_retrieve_per_batch:
logger.warning("batch contains {}s of request, needs to flush now".format(self.total_batch_time))
self.total_batch_time = 0
if self.total_batched_time > self.max_time_to_retrieve_per_batch:
logger.warning("batch contains {}s of request, needs to flush now".format(self.total_batched_time))
self.total_batched_time = 0
return True
logger.info("Batch below time threshold")
return False

def add(self, start_time, end_time, interval):
self.total_batch_time += compute_time_spent(start_time, end_time, interval)
self.total_batched_time += compute_time_spent(start_time, end_time, interval)
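compute_time_spent is referenced by BatchTimeCounter.add but is not part of this diff. Assuming it returns the requested time span in seconds, a hedged usage sketch of the counter (the expected outputs are inferences from the code above, not observed behavior):

```python
from osisoft_plugin_common import BatchTimeCounter  # module shown in this diff

counter = BatchTimeCounter(max_time_to_retrieve_per_batch=24)  # 24 h budget -> 86400 s
counter.add(start_time=0, end_time=12 * 3600, interval=None)   # 12 h of data requested
print(counter.is_batch_full())  # expected False: still under the 86400 s budget
counter.add(start_time=0, end_time=20 * 3600, interval=None)   # 32 h accumulated in total
print(counter.is_batch_full())  # expected True: budget exceeded, counter resets to 0
```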