diff --git a/splunklib/searchcommands/monitor_dispatch.py b/splunklib/searchcommands/monitor_dispatch.py
new file mode 100644
index 000000000..4c3738ab3
--- /dev/null
+++ b/splunklib/searchcommands/monitor_dispatch.py
@@ -0,0 +1,107 @@
+
+import sys
+import os
+import csv
+import atexit
+from typing import Any, Dict, Union
+from logging import LoggerAdapter
+from .internals import CsvDialect
+from .monitor_fs import create_file_monitor, PollingFileMonitor, LinuxInotifyMonitor
+
+class DispatchMonitor:
+    """ Monitor the dispatch directory for file creations and update the job state
+    based on the created files. Avoids the need to repeatedly poll the job. """
+    def __init__(self, search_command_instance):
+        self.command = search_command_instance
+        self.logger = LoggerAdapter(search_command_instance.logger, {'component': 'DispatchMonitor'})
+        self.dispatch_dir: str = search_command_instance.metadata.searchinfo.dispatch_dir
+
+        self.fs_monitor: Union[LinuxInotifyMonitor, PollingFileMonitor] = \
+            create_file_monitor(
+                self.dispatch_dir,
+                self.on_create,
+                False
+            )
+        self._atexit_registered = False
+
+    def on_create(self, filepath):
+        """ Handle file creation events in the dispatch directory """
+        filename = os.path.basename(filepath)
+        self.logger.debug(f"File created in dispatch directory: {filename}")
+
+        # Handle creation of the finalize file
+        if filename == 'finalize':
+            self.logger.warning(f"Finalize file created; stopping search: {filepath}")
+            self.command._finalizing = True
+
+        # Handle status.csv file creation
+        elif filename == 'status.csv':
+            self.logger.debug(f"Status file created: {filepath}")
+
+            # Process the status.csv file
+            self.command._status = read_csv_to_dict(filepath)
+            print("Updated job status:", self.command._status, file=sys.stderr)
+
+            # TODO: Act on the job state read from the updated status.csv
+            job_status_updated = self.command._status.get('state', None)
+            self.logger.debug("Job state read: %s", job_status_updated)
+
+        # Handle info.csv file creation if needed
+        elif filename == 'info.csv':
+            self.logger.debug(f"Info file created: {filepath}")
+            #self.dispatch_info_result = read_csv_to_dict(filepath)
+            # Refresh the info from the job
+            self.command._search_results_info_refresh()
+
+    def start(self):
+        """
+        Start monitoring the dispatch directory for file creations.
+
+        The on_create handler sets the command's _finalizing flag when the
+        finalize file is created.
+        """
+        # Check if a monitoring observer is already running
+        if self.fs_monitor is not None and self.fs_monitor.running:
+            return
+
+        # Start the observer in a daemon thread
+        self.fs_monitor.start()
+
+        # Register an atexit handler to stop the observer cleanly
+        self.register()
+
+        print(f"Started filesystem observer for dispatch directory: {self.dispatch_dir}", file=sys.stderr)
+        return self.fs_monitor
+
+    def stop(self):
+        """ Stop the dispatch directory observer """
+        try:
+            if self.fs_monitor is not None and self.fs_monitor.running:
+                self.fs_monitor.stop()
+                print("Dispatch directory observer stopped cleanly at exit.", file=sys.stderr)
+        except Exception as e:
+            print(f"Error stopping dispatch directory observer: {e}", file=sys.stderr)
+
+    def register(self):
+        """
+        Register the stop method to be called at exit.
+        This is used to ensure that all observers are stopped gracefully.
+        """
+        if not self._atexit_registered:
+            atexit.register(self.stop)
+            self._atexit_registered = True
+
+
+def read_csv_to_dict(csv_path) -> Dict[str, Any]:
+    """
+    Read a CSV file and merge all rows into a single dictionary,
+    stripping leading underscores from field names.
+    """
+    result = {}
+    with open(csv_path, mode='r', encoding='utf-8') as csvfile:
+        reader = csv.DictReader(csvfile, dialect=CsvDialect)
+        for row in reader:
+            result = {**result, **{k.lstrip('_'): v for k, v in row.items()}}
+    return result
+
+__all__ = ['DispatchMonitor']
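Note that `read_csv_to_dict` collapses every row into one dictionary, with later rows overwriting earlier keys and leading underscores stripped. A minimal sketch of that merge outside Splunk, using the stdlib `excel` dialect in place of `CsvDialect` and invented field names:

```python
import csv
import io

# Hypothetical status.csv content; real dispatch files use Splunk's CsvDialect.
status_csv = "_state,dispatch_time\nRUNNING,1700000000\n"

result = {}
for row in csv.DictReader(io.StringIO(status_csv)):
    # Same merge as read_csv_to_dict: later rows win, leading '_' is stripped.
    result = {**result, **{k.lstrip('_'): v for k, v in row.items()}}

print(result)  # {'state': 'RUNNING', 'dispatch_time': '1700000000'}
```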
+ """ + if not self._atexit_registered: + atexit.register(self.stop) + self._atexit_registered = True + + +def read_csv_to_dict(csv_path) -> Dict[str, Any]: + """ + Read a CSV file and return a dictionary. Merge row data. + """ + result = {} + with open(csv_path, mode='r', encoding='utf-8') as csvfile: + reader = csv.DictReader(csvfile, dialect=CsvDialect) + for row in reader: + result = {**result, **{k.lstrip('_'): v for k, v in row.items()}} + return result + +__all__ = ['DispatchMonitor'] diff --git a/splunklib/searchcommands/monitor_fs.py b/splunklib/searchcommands/monitor_fs.py new file mode 100644 index 000000000..18042ad04 --- /dev/null +++ b/splunklib/searchcommands/monitor_fs.py @@ -0,0 +1,329 @@ +""" +Cross-platform file monitor for Splunk Enterprise +Uses Linux inotify when available, falls back to polling on Windows/other platforms +""" + +import os +import sys +import time +import threading +import struct +import select + +def create_file_monitor(watch_dir, callback, recursive=False): + """Factory function to create appropriate monitor for the platform""" + if sys.platform.startswith('linux'): + try: + return LinuxInotifyMonitor(watch_dir, callback, recursive) + except Exception as e: + print(f"Failed to create inotify monitor, falling back to polling: {e}") + return PollingFileMonitor(watch_dir, callback, recursive) + else: + return PollingFileMonitor(watch_dir, callback, recursive) + +class LinuxInotifyMonitor: + """Linux-specific file monitor using inotify via ctypes""" + + # inotify event constants + IN_CREATE = 0x00000100 + IN_MOVED_TO = 0x00000080 + IN_CLOSE_WRITE = 0x00000008 + IN_ISDIR = 0x40000000 + + def __init__(self, watch_dir, callback, recursive=False): + self.watch_dir = os.path.abspath(watch_dir) + self.callback = callback + self.recursive = recursive + self.running = False + self.watches = {} # watch_descriptor -> path mapping + self.pending_files = {} # Track files being written + + # Validate directory exists + if not os.path.isdir(self.watch_dir): + raise ValueError(f"Directory does not exist: {self.watch_dir}") + + # Load libc and set up inotify + self._setup_inotify() + + def _setup_inotify(self): + """Initialize inotify system calls""" + import ctypes + import ctypes.util + + libc = ctypes.CDLL(ctypes.util.find_library('c')) + + # inotify_init + self.inotify_init = libc.inotify_init + self.inotify_init.restype = ctypes.c_int + + # inotify_add_watch + self.inotify_add_watch = libc.inotify_add_watch + self.inotify_add_watch.argtypes = [ctypes.c_int, ctypes.c_char_p, ctypes.c_uint32] + self.inotify_add_watch.restype = ctypes.c_int + + # inotify_rm_watch + self.inotify_rm_watch = libc.inotify_rm_watch + self.inotify_rm_watch.argtypes = [ctypes.c_int, ctypes.c_int] + self.inotify_rm_watch.restype = ctypes.c_int + + # Initialize inotify file descriptor + self.fd = self.inotify_init() + if self.fd < 0: + raise OSError("Failed to initialize inotify") + + def start(self): + """Start monitoring for file changes""" + # Add initial watches + self._add_watch(self.watch_dir) + + if self.recursive: + self._add_recursive_watches(self.watch_dir) + + self.running = True + self.monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True) + self.cleanup_thread = threading.Thread(target=self._cleanup_pending_files, daemon=True) + + self.monitor_thread.start() + self.cleanup_thread.start() + + print(f"Started inotify monitoring on: {self.watch_dir}") + return self.monitor_thread + + def _add_watch(self, path): + """Add inotify watch to a directory""" + mask = 
+    def _handle_event(self, watch_path, filename, mask):
+        """Handle individual inotify events"""
+        if not filename:
+            return
+
+        full_path = os.path.join(watch_path, filename)
+
+        # Handle directory creation for recursive monitoring
+        if (mask & self.IN_ISDIR) and (mask & (self.IN_CREATE | self.IN_MOVED_TO)) and self.recursive:
+            self._add_watch(full_path)
+            return
+
+        # Handle file events
+        if mask & self.IN_CREATE:
+            # File created but possibly still being written
+            self.pending_files[full_path] = time.time()
+
+        elif mask & self.IN_MOVED_TO:
+            # File moved into the directory - treat as a creation
+            self.pending_files[full_path] = time.time()
+
+        elif mask & self.IN_CLOSE_WRITE:
+            # File finished writing; remove it from the pending files
+            if full_path in self.pending_files:
+                del self.pending_files[full_path]
+            self._safe_callback(full_path)
+
+    def _cleanup_pending_files(self):
+        """Fire the callback for files that have been pending for too long"""
+        while self.running:
+            current_time = time.time()
+            timeout_files = []
+
+            # Snapshot the dict; the monitor thread mutates it concurrently
+            for file_path, create_time in list(self.pending_files.items()):
+                if current_time - create_time > 5.0:  # 5 second timeout
+                    if os.path.exists(file_path):
+                        timeout_files.append(file_path)
+
+            for file_path in timeout_files:
+                if file_path in self.pending_files:
+                    del self.pending_files[file_path]
+                    self._safe_callback(file_path)
+
+            time.sleep(1.0)
+
+    def _safe_callback(self, file_path):
+        """Safely call the user callback"""
+        try:
+            # Verify the file still exists and is readable
+            if os.path.isfile(file_path):
+                self.callback(file_path)
+        except Exception as e:
+            print(f"Error in callback for {file_path}: {e}", file=sys.stderr)
+
+    def stop(self):
+        """Stop monitoring"""
+        self.running = False
+
+        # Let the monitor thread leave select() before closing the descriptor
+        if hasattr(self, 'monitor_thread'):
+            self.monitor_thread.join(timeout=2.0)
+
+        # Clean up watches
+        for wd in list(self.watches.keys()):
+            self.inotify_rm_watch(self.fd, wd)
+
+        # Close the inotify file descriptor
+        if hasattr(self, 'fd') and self.fd >= 0:
+            os.close(self.fd)
+            self.fd = -1
+
+        print("Stopped inotify monitoring", file=sys.stderr)
+
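Callers normally go through `create_file_monitor` rather than constructing either class directly. A minimal standalone sketch of that flow, assuming this patch is installed so the module path resolves (the two-second sleep is generous slack for the monitor thread):

```python
import os
import tempfile
import time

from splunklib.searchcommands.monitor_fs import create_file_monitor

def on_create(path):
    print(f"new file: {path}")

with tempfile.TemporaryDirectory() as watch_dir:
    # inotify on Linux, PollingFileMonitor everywhere else
    monitor = create_file_monitor(watch_dir, on_create)
    monitor.start()
    with open(os.path.join(watch_dir, "status.csv"), "w") as f:
        f.write("_state\nRUNNING\n")
    time.sleep(2)  # give the monitor thread time to fire the callback
    monitor.stop()
```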
+
+class PollingFileMonitor:
+    """Cross-platform file monitor using polling (fallback for Windows and other platforms)"""
+
+    def __init__(self, watch_dir, callback, recursive=False):
+        self.watch_dir = os.path.abspath(watch_dir)
+        self.callback = callback
+        self.recursive = recursive
+        self.running = False
+        self.known_files = {}     # path -> (size, mtime)
+        self.poll_interval = 0.5  # Seconds between polls
+
+        if not os.path.isdir(self.watch_dir):
+            raise ValueError(f"Directory does not exist: {self.watch_dir}")
+
+        # Record the known files so pre-existing files are not reported as new
+        self.known_files = self._scan_directory()
+
+    def start(self):
+        """Start monitoring for file changes"""
+        self.running = True
+        self.monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True)
+        self.monitor_thread.start()
+
+        print(f"Started polling file monitor on: {self.watch_dir} (interval: {self.poll_interval}s)", file=sys.stderr)
+        return self.monitor_thread
+
+    def _scan_directory(self):
+        """Scan the directory and return a {path: (size, mtime)} mapping"""
+        new_files = {}
+
+        if self.recursive:
+            for root, dirs, files in os.walk(self.watch_dir):
+                for filename in files:
+                    file_path = os.path.join(root, filename)
+                    new_files[file_path] = self._get_file_info(file_path)
+        else:
+            try:
+                for filename in os.listdir(self.watch_dir):
+                    file_path = os.path.join(self.watch_dir, filename)
+                    if os.path.isfile(file_path):
+                        new_files[file_path] = self._get_file_info(file_path)
+            except OSError:
+                pass  # Directory might have been deleted
+
+        return new_files
+
+    def _get_file_info(self, file_path):
+        """Get the file size and modification time"""
+        try:
+            stat_info = os.stat(file_path)
+            return (stat_info.st_size, stat_info.st_mtime)
+        except OSError:
+            return None
+
+    def _monitor_loop(self):
+        """Main polling loop"""
+        while self.running:
+            try:
+                current_files = self._scan_directory()
+
+                # Check for new files
+                for file_path in current_files:
+                    if file_path not in self.known_files:
+                        # New file detected
+                        if self._wait_for_stable_file(file_path):
+                            self._safe_callback(file_path)
+
+                self.known_files = current_files
+
+            except Exception as e:
+                print(f"Error in polling loop: {e}", file=sys.stderr)
+
+            time.sleep(self.poll_interval)
+
+    def _wait_for_stable_file(self, file_path, max_checks=50):
+        """Wait for a file to stabilize (stop growing)"""
+        stable_checks = 3
+        check_interval = 0.1
+
+        prev_info = self._get_file_info(file_path)
+        if not prev_info:
+            return False
+
+        # Count the stability checks down explicitly (mutating a for-loop's
+        # range has no effect), bounded by max_checks so a file that never
+        # stops changing cannot stall the poll loop forever.
+        for _ in range(max_checks):
+            time.sleep(check_interval)
+            current_info = self._get_file_info(file_path)
+
+            if not current_info:
+                return False
+
+            if current_info != prev_info:
+                # File is still changing
+                prev_info = current_info
+                stable_checks = 3  # Reset the countdown
+            else:
+                stable_checks -= 1
+                if stable_checks == 0:
+                    return True
+
+        return False
+
+    def _safe_callback(self, file_path):
+        """Safely call the user callback"""
+        try:
+            if os.path.isfile(file_path):
+                self.callback(file_path)
+        except Exception as e:
+            print(f"Error in callback for {file_path}: {e}", file=sys.stderr)
+
+    def stop(self):
+        """Stop monitoring"""
+        self.running = False
+        print("Stopped polling file monitor", file=sys.stderr)
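The polling fallback boils down to diffing successive directory snapshots keyed by (size, mtime). A condensed illustration of that core idea, using `os.scandir` here instead of the `os.listdir`/`os.stat` pair in the module:

```python
import os
import tempfile

def snapshot(directory):
    """(size, mtime) for each regular file: the same state _scan_directory tracks."""
    return {
        entry.path: (entry.stat().st_size, entry.stat().st_mtime)
        for entry in os.scandir(directory)
        if entry.is_file()
    }

with tempfile.TemporaryDirectory() as d:
    before = snapshot(d)
    open(os.path.join(d, "info.csv"), "w").close()  # a file appears between polls
    after = snapshot(d)
    print([p for p in after if p not in before])  # the newly created info.csv
```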
diff --git a/splunklib/searchcommands/search_command.py b/splunklib/searchcommands/search_command.py
index 2c4f2ab54..7ad9a2e4f 100644
--- a/splunklib/searchcommands/search_command.py
+++ b/splunklib/searchcommands/search_command.py
@@ -52,6 +52,7 @@
 )
 from ..client import Service
 from ..utils import ensure_str
+from .monitor_dispatch import DispatchMonitor
 
 
 # ----------------------------------------------------------------------------------------------------------------------
@@ -94,12 +95,14 @@ def __init__(self):
         self._configuration = self.ConfigurationSettings(self)
         self._input_header = InputHeader()
         self._fieldnames = None
+        self._finalizing = None
         self._finished = None
         self._metadata = None
         self._options = None
         self._protocol_version = None
         self._search_results_info = None
         self._service = None
+        self._status = None
 
         # Internal variables
 
@@ -265,6 +268,9 @@ def search_results_info(self):
         """
         if self._search_results_info is not None:
             return self._search_results_info
+        return self._search_results_info_refresh()
+
+    def _search_results_info_refresh(self):
 
         if self._protocol_version == 1:
             try:
@@ -665,6 +671,8 @@ def _process_protocol_v1(self, argv, ifile, ofile):
             ifile = self._prepare_protocol_v1(argv, ifile, ofile)
             self._records = self._records_protocol_v1
             self._metadata.action = "execute"
+            self.monitor = DispatchMonitor(self)
+            self.monitor.start()
             self._execute(ifile, None)
 
         else:
@@ -763,6 +771,8 @@ def _process_protocol_v2(self, argv, ifile, ofile):
                     f"{class_name}.metadata.searchinfo.dispatch_dir is undefined"
                 )
 
+            self.monitor = DispatchMonitor(self)
+            self.monitor.start()
             debug("  tempfile.tempdir=%r", tempfile.tempdir)
         except:
             self._record_writer = RecordWriterV2(ofile)
@@ -930,6 +940,13 @@ def _decode_list(mv):
         r"\$(?P<item>(?:\$\$|[^$])*)\$(?:;|$)"
     )  # matches a single value in an encoded list
 
+    def _raise_if_stopped(self, records):
+        """Wrap the records iterator, raising once the search has been finalized."""
+        for record in records:
+            if self._finalizing:
+                raise RuntimeError("Search has been finalized")
+            yield record
+
     # Note: Subclasses must override this method so that it can be called
     # called as self._execute(ifile, None)
     def _execute(self, ifile, process):
@@ -946,7 +963,8 @@ def _execute(self, ifile, process):
 
         """
         if self.protocol_version == 1:
-            self._record_writer.write_records(process(self._records(ifile)))
+            self._record_writer.write_records(
+                process(self._raise_if_stopped(self._records(ifile))))
             self.finish()
         else:
             assert self._protocol_version == 2
@@ -1066,6 +1084,7 @@ def _execute_v2(self, ifile, process):
             self._metadata.update(metadata)
             self._execute_chunk_v2(process, result)
 
+            # Tell downstream commands whether we finished or were stopped early
             self._record_writer.write_chunk(finished=self._finished)
 
     def _execute_chunk_v2(self, process, chunk):
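As of this diff only the protocol v1 path wraps records in `_raise_if_stopped`, but any long-running command can poll the new `_finalizing` flag itself. A sketch of what that enables; `SlowCommand` and its field names are hypothetical, not part of this patch:

```python
# Hypothetical long-running generating command, for illustration only.
from splunklib.searchcommands import Configuration, GeneratingCommand, dispatch

@Configuration()
class SlowCommand(GeneratingCommand):
    def generate(self):
        for i in range(1000000):
            # _finalizing is set by DispatchMonitor.on_create when the
            # dispatch directory's finalize file appears
            if self._finalizing:
                break  # stop emitting rows instead of running to completion
            yield {'_serial': i}

# dispatch(SlowCommand, module_name=__name__)
```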