From c415b81bc8ed4b3cee3d6d22cfe25936d095d074 Mon Sep 17 00:00:00 2001 From: Jake Pullen Date: Tue, 9 Dec 2025 15:33:28 +0000 Subject: [PATCH] exploring options --- README.MD | 2 +- main.py | 6 ++- modules/combine_timeseries.py | 67 +++++++++++++++++++++++++--------- modules/generate_timeseries.py | 5 +-- 4 files changed, 57 insertions(+), 23 deletions(-) diff --git a/README.MD b/README.MD index 164081f..1a67d0f 100644 --- a/README.MD +++ b/README.MD @@ -51,7 +51,7 @@ It is recommended to use UV for environment and package handling. 1. Adjust the config.py file to match your needs. 1. Ensure your .dat files are in the DAT_TOP_FOLDER (as per config location) 1. Ensure your zone csv files are in the ZONE_FOLDER (as per config location) -1. RunMain Pipeline `uv run main.py +1. RunMain Pipeline `uv run main.py` Note that you will have to set your environment variable `PYTHON_GIL=0` first 1. find the output in the COMBINED_FOLDER (as per config location) The main pipeline will: diff --git a/main.py b/main.py index 1b0e509..2aafe41 100644 --- a/main.py +++ b/main.py @@ -30,6 +30,7 @@ if __name__ == "__main__": northing = int(row[2]) # Northing column zone = int(row[3]) # ZoneID column locations.append([zone_id, easting, northing, zone]) + logging.info(f'Count of 1K Grids: {len(locations)}') batch = BatchNimrod(Config) timeseries = GenerateTimeseries(Config) @@ -79,7 +80,7 @@ if __name__ == "__main__": files_per_minute = (completed_count / elapsed_time) * 60 remaining_files = total_files - completed_count eta_minutes = remaining_files / (files_per_minute / 60) / 60 - logging.info(f''''Processed {completed_count} out of {total_files} files. + logging.info(f'''Processed {completed_count} out of {total_files} files. Speed: {files_per_minute:.2f} files/min. ETA: {eta_minutes:.2f} minutes''') except KeyboardInterrupt: logging.warning("KeyboardInterrupt received. Cancelling pending tasks...") @@ -91,7 +92,8 @@ if __name__ == "__main__": logging.info("Writing CSV files...") timeseries.write_results_to_csv(results, locations) - + results.clear() + logging.info("combining CSVs into groups") combiner.combine_csv_files() logging.info("CSVs combined!") diff --git a/modules/combine_timeseries.py b/modules/combine_timeseries.py index fb14354..a4b32c6 100644 --- a/modules/combine_timeseries.py +++ b/modules/combine_timeseries.py @@ -1,6 +1,6 @@ import polars as pd import os - +import logging class CombineTimeseries: def __init__(self, config, locations): @@ -15,23 +15,56 @@ class CombineTimeseries: if group not in self.grouped_locations: self.grouped_locations[group] = [] self.grouped_locations[group].append(location) + logging.info(f'Count of zones: {len(self.grouped_locations)}') - def combine_csv_files(self): - for group, loc_list in self.grouped_locations.items(): - combined_df = None - for loc in loc_list: - csv_to_load = f"./csv_files/{loc[0]}_timeseries_data.csv" - df = pd.read_csv(csv_to_load) - if combined_df is None: - combined_df = df - else: - combined_df = combined_df.join(df, on='datetime') + # def combine_csv_files(self): + # to_delete = [] + # for group, loc_list in self.grouped_locations.items(): + # output_file =f"{self.config.COMBINED_FOLDER}/zone_{group}_timeseries_data.csv" + # combined_df = None + # for loc in loc_list: + # csv_to_load = f"{self.config.CSV_TOP_FOLDER}/{loc[0]}_timeseries_data.csv" + # df = pd.read_csv(csv_to_load, streaming=True) + # if combined_df is None: + # combined_df = df + # else: + # combined_df = combined_df.join(df, on='datetime') + + # if self.config.delete_csv_after_combining: + # to_delete.append(csv_to_load) + + # sorted_df = combined_df.sort('datetime') + # print(f'writing file to {output_file}') + # sorted_df.write_csv(output_file) + + # if len(to_delete) > 0: + # for path in to_delete: + # print(f'deleting {path}') + # os.remove(path) + + def combine_csv_files(self): + to_delete = [] + for group, loc_list in self.grouped_locations.items(): + output_file = f"{self.config.COMBINED_FOLDER}/zone_{group}_timeseries_data.csv" + + # Use LazyFrame for memory-efficient processing + lazy_dfs = [] + for loc in loc_list: + csv_to_load = f"{self.config.CSV_TOP_FOLDER}/{loc[0]}_timeseries_data.csv" + df = pd.scan_csv(csv_to_load) # Lazy read + lazy_dfs.append(df) if self.config.delete_csv_after_combining: - os.remove(csv_to_load) + to_delete.append(csv_to_load) - output_file = ( - f"{self.config.COMBINED_FOLDER}/zone_{group}_timeseries_data.csv" - ) - sorted_df = combined_df.sort('datetime') - sorted_df.write_csv(output_file) + # Combine with LazyFrame operations + combined_lazy = pd.concat(lazy_dfs, how='align').collect(streaming=True) # Collect at the end + + sorted_df = combined_lazy.sort('datetime') + print(f'writing file to {output_file}') + sorted_df.write_csv(output_file) + + if len(to_delete) > 0: + for path in to_delete: + print(f'deleting {path}') + os.remove(path) \ No newline at end of file diff --git a/modules/generate_timeseries.py b/modules/generate_timeseries.py index a534b38..012911a 100644 --- a/modules/generate_timeseries.py +++ b/modules/generate_timeseries.py @@ -5,6 +5,7 @@ import polars as pd from datetime import datetime import os import concurrent.futures +import logging @@ -169,7 +170,6 @@ class GenerateTimeseries: executor.shutdown(wait=False, cancel_futures=True) raise - # Write CSVs for each location def write_results_to_csv(self, results, locations): """Write extracted data to CSV files for each location. @@ -177,7 +177,6 @@ class GenerateTimeseries: results (dict): Aggregated results {zone_id: {'dates': [], 'values': []}} locations (list): List of location data """ - print("Writing CSV files...") for location in locations: zone_id = location[0] data = results[zone_id] @@ -201,4 +200,4 @@ class GenerateTimeseries: output_path, float_precision=4 ) - print("All CSV files written.") + logging.info("All CSV files written.")