From 83405eb17e97cceb9c5a98da86399ffeffa8e990 Mon Sep 17 00:00:00 2001
From: Jake Pullen
Date: Mon, 8 Dec 2025 19:58:12 +0000
Subject: [PATCH] =?UTF-8?q?style:=20=F0=9F=92=85=20Changed=20order=20of=20?=
 =?UTF-8?q?business?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 main.py                        | 62 +++++++++++++++++++++++++++-------
 modules/batch_nimrod.py        |  2 +-
 modules/generate_timeseries.py | 11 ++++--
 3 files changed, 59 insertions(+), 16 deletions(-)

diff --git a/main.py b/main.py
index d0a50d0..843c895 100644
--- a/main.py
+++ b/main.py
@@ -2,6 +2,7 @@ import logging
 import time
 import os
 import csv
+import concurrent.futures
 
 from pathlib import Path
 from config import Config
@@ -35,22 +36,57 @@ if __name__ == "__main__":
     combiner = CombineTimeseries(Config, locations)
 
     start = time.time()
-    logging.info("Starting to process DAT to ASC")
+    logging.info("Starting interleaved processing of DAT files and Timeseries generation")
+
+    # Initialize results structure
+    results = {loc[0]: {'dates': [], 'values': []} for loc in locations}
 
-    batch.process_nimrod_files()
-    batch_checkpoint = time.time()
-    elapsed_time = batch_checkpoint - start
-    logging.info(f"DAT to ASC completed in {elapsed_time:.2f} seconds")
+    def process_pipeline(dat_file):
+        # 1. Process DAT to ASC
+        asc_file = batch._process_single_file(dat_file)
+        if not asc_file:
+            return None
+
+        # 2. Extract data from ASC
+        file_results = timeseries.process_asc_file(asc_file, locations)
+        return file_results
 
-    logging.info("Starting generating timeseries data for all locations.")
-    place_start = time.time()
-    timeseries.extract_data_for_all_locations(locations)
-    place_end = time.time()
-    place_create_time = place_end - place_start
-    elapsed_time = place_end - start
-    logging.info(f"Timeseries generation completed in {place_create_time:.2f} seconds")
-    logging.info(f"Total time so far {elapsed_time:.2f} seconds")
+    # Get list of DAT files
+    dat_files = [f for f in os.listdir(Path(Config.DAT_TOP_FOLDER)) if not f.startswith('.')]
+    total_files = len(dat_files)
+
+    logging.info(f"Processing {total_files} files concurrently...")
+    with concurrent.futures.ThreadPoolExecutor() as executor:
+        future_to_file = {
+            executor.submit(process_pipeline, dat_file): dat_file
+            for dat_file in dat_files
+        }
+
+        completed_count = 0
+        try:
+            for future in concurrent.futures.as_completed(future_to_file):
+                file_results = future.result()
+                if file_results:
+                    for res in file_results:
+                        zone_id = res['zone_id']
+                        results[zone_id]['dates'].append(res['date'])
+                        results[zone_id]['values'].append(res['value'])
+
+                completed_count += 1
+                if completed_count % 10 == 0:
+                    logging.info(f'Processed {completed_count} out of {total_files} files')
+        except KeyboardInterrupt:
+            logging.warning("KeyboardInterrupt received. "
+                            "Cancelling pending tasks...")
+            executor.shutdown(wait=False, cancel_futures=True)
+            raise
+
+    elapsed_time = time.time() - start
+    logging.info(f"Interleaved processing completed in {elapsed_time:.2f} seconds")
+
+    logging.info("Writing CSV files...")
+    timeseries.write_results_to_csv(results, locations)
 
     logging.info("combining CSVs into groups")
     combiner.combine_csv_files()
     logging.info("CSVs combined!")
diff --git a/modules/batch_nimrod.py b/modules/batch_nimrod.py
index e38ae50..db829db 100644
--- a/modules/batch_nimrod.py
+++ b/modules/batch_nimrod.py
@@ -36,7 +36,7 @@ class BatchNimrod:
                 os.remove(in_file_full)
 
             logging.debug(f"Successfully processed: {in_file_full}")
-            return True
+            return out_file_name
 
         except Nimrod.HeaderReadError as e:
             logging.error(f"Failed to read file {in_file_full}, is it corrupt?")
diff --git a/modules/generate_timeseries.py b/modules/generate_timeseries.py
index a771f83..a534b38 100644
--- a/modules/generate_timeseries.py
+++ b/modules/generate_timeseries.py
@@ -63,7 +63,7 @@ class GenerateTimeseries:
 
         return int(start_col), int(start_row), int(end_col), int(end_row)
 
-    def _process_single_file(self, file_name, locations):
+    def process_asc_file(self, file_name, locations):
         """Process a single ASC file and extract data for all locations.
 
         Args:
@@ -147,7 +147,7 @@ class GenerateTimeseries:
         with concurrent.futures.ThreadPoolExecutor() as executor:
             # Submit all tasks
             future_to_file = {
-                executor.submit(self._process_single_file, file_name, locations): file_name
+                executor.submit(self.process_asc_file, file_name, locations): file_name
                 for file_name in asc_files
             }
 
@@ -170,6 +170,13 @@
             raise
 
     # Write CSVs for each location
+    def write_results_to_csv(self, results, locations):
+        """Write extracted data to CSV files for each location.
+
+        Args:
+            results (dict): Aggregated results {zone_id: {'dates': [], 'values': []}}
+            locations (list): List of location data
+        """
         print("Writing CSV files...")
         for location in locations:
             zone_id = location[0]
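-- 
Note for reviewers, not part of the commit: the aggregation loop added to
main.py assumes process_asc_file returns a list of dicts carrying
'zone_id', 'date', and 'value' keys. A minimal sketch of that assumed
contract follows; the zone IDs and date strings are hypothetical sample
values, and the loop body mirrors the one in the patch above.

    from collections import defaultdict

    # One dict per (location, timestep) extracted from a single ASC file;
    # keys mirror what the aggregation loop in main.py reads.
    file_results = [
        {'zone_id': 'Z001', 'date': '2025-12-08 19:00', 'value': 0.4},
        {'zone_id': 'Z002', 'date': '2025-12-08 19:00', 'value': 1.2},
    ]

    # Aggregate per zone, as in the try block added to main.py.
    results = defaultdict(lambda: {'dates': [], 'values': []})
    for res in file_results:
        results[res['zone_id']]['dates'].append(res['date'])
        results[res['zone_id']]['values'].append(res['value'])

    assert results['Z001'] == {'dates': ['2025-12-08 19:00'], 'values': [0.4]}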