exploring options

This commit is contained in:
2025-12-09 15:33:28 +00:00
parent bd0a421bb9
commit c415b81bc8
4 changed files with 57 additions and 23 deletions
+1 -1
View File
@@ -51,7 +51,7 @@ It is recommended to use UV for environment and package handling.
1. Adjust the config.py file to match your needs. 1. Adjust the config.py file to match your needs.
1. Ensure your .dat files are in the DAT_TOP_FOLDER (as per config location) 1. Ensure your .dat files are in the DAT_TOP_FOLDER (as per config location)
1. Ensure your zone csv files are in the ZONE_FOLDER (as per config location) 1. Ensure your zone csv files are in the ZONE_FOLDER (as per config location)
1. RunMain Pipeline `uv run main.py 1. RunMain Pipeline `uv run main.py` Note that you will have to set your environment variable `PYTHON_GIL=0` first
1. find the output in the COMBINED_FOLDER (as per config location) 1. find the output in the COMBINED_FOLDER (as per config location)
The main pipeline will: The main pipeline will:
+3 -1
View File
@@ -30,6 +30,7 @@ if __name__ == "__main__":
northing = int(row[2]) # Northing column northing = int(row[2]) # Northing column
zone = int(row[3]) # ZoneID column zone = int(row[3]) # ZoneID column
locations.append([zone_id, easting, northing, zone]) locations.append([zone_id, easting, northing, zone])
logging.info(f'Count of 1K Grids: {len(locations)}')
batch = BatchNimrod(Config) batch = BatchNimrod(Config)
timeseries = GenerateTimeseries(Config) timeseries = GenerateTimeseries(Config)
@@ -79,7 +80,7 @@ if __name__ == "__main__":
files_per_minute = (completed_count / elapsed_time) * 60 files_per_minute = (completed_count / elapsed_time) * 60
remaining_files = total_files - completed_count remaining_files = total_files - completed_count
eta_minutes = remaining_files / (files_per_minute / 60) / 60 eta_minutes = remaining_files / (files_per_minute / 60) / 60
logging.info(f''''Processed {completed_count} out of {total_files} files. logging.info(f'''Processed {completed_count} out of {total_files} files.
Speed: {files_per_minute:.2f} files/min. ETA: {eta_minutes:.2f} minutes''') Speed: {files_per_minute:.2f} files/min. ETA: {eta_minutes:.2f} minutes''')
except KeyboardInterrupt: except KeyboardInterrupt:
logging.warning("KeyboardInterrupt received. Cancelling pending tasks...") logging.warning("KeyboardInterrupt received. Cancelling pending tasks...")
@@ -91,6 +92,7 @@ if __name__ == "__main__":
logging.info("Writing CSV files...") logging.info("Writing CSV files...")
timeseries.write_results_to_csv(results, locations) timeseries.write_results_to_csv(results, locations)
results.clear()
logging.info("combining CSVs into groups") logging.info("combining CSVs into groups")
combiner.combine_csv_files() combiner.combine_csv_files()
+46 -13
View File
@@ -1,6 +1,6 @@
import polars as pd import polars as pd
import os import os
import logging
class CombineTimeseries: class CombineTimeseries:
def __init__(self, config, locations): def __init__(self, config, locations):
@@ -15,23 +15,56 @@ class CombineTimeseries:
if group not in self.grouped_locations: if group not in self.grouped_locations:
self.grouped_locations[group] = [] self.grouped_locations[group] = []
self.grouped_locations[group].append(location) self.grouped_locations[group].append(location)
logging.info(f'Count of zones: {len(self.grouped_locations)}')
# def combine_csv_files(self):
# to_delete = []
# for group, loc_list in self.grouped_locations.items():
# output_file =f"{self.config.COMBINED_FOLDER}/zone_{group}_timeseries_data.csv"
# combined_df = None
# for loc in loc_list:
# csv_to_load = f"{self.config.CSV_TOP_FOLDER}/{loc[0]}_timeseries_data.csv"
# df = pd.read_csv(csv_to_load, streaming=True)
# if combined_df is None:
# combined_df = df
# else:
# combined_df = combined_df.join(df, on='datetime')
# if self.config.delete_csv_after_combining:
# to_delete.append(csv_to_load)
# sorted_df = combined_df.sort('datetime')
# print(f'writing file to {output_file}')
# sorted_df.write_csv(output_file)
# if len(to_delete) > 0:
# for path in to_delete:
# print(f'deleting {path}')
# os.remove(path)
def combine_csv_files(self): def combine_csv_files(self):
to_delete = []
for group, loc_list in self.grouped_locations.items(): for group, loc_list in self.grouped_locations.items():
combined_df = None output_file = f"{self.config.COMBINED_FOLDER}/zone_{group}_timeseries_data.csv"
# Use LazyFrame for memory-efficient processing
lazy_dfs = []
for loc in loc_list: for loc in loc_list:
csv_to_load = f"./csv_files/{loc[0]}_timeseries_data.csv" csv_to_load = f"{self.config.CSV_TOP_FOLDER}/{loc[0]}_timeseries_data.csv"
df = pd.read_csv(csv_to_load) df = pd.scan_csv(csv_to_load) # Lazy read
if combined_df is None: lazy_dfs.append(df)
combined_df = df
else:
combined_df = combined_df.join(df, on='datetime')
if self.config.delete_csv_after_combining: if self.config.delete_csv_after_combining:
os.remove(csv_to_load) to_delete.append(csv_to_load)
output_file = ( # Combine with LazyFrame operations
f"{self.config.COMBINED_FOLDER}/zone_{group}_timeseries_data.csv" combined_lazy = pd.concat(lazy_dfs, how='align').collect(streaming=True) # Collect at the end
)
sorted_df = combined_df.sort('datetime') sorted_df = combined_lazy.sort('datetime')
print(f'writing file to {output_file}')
sorted_df.write_csv(output_file) sorted_df.write_csv(output_file)
if len(to_delete) > 0:
for path in to_delete:
print(f'deleting {path}')
os.remove(path)
+2 -3
View File
@@ -5,6 +5,7 @@ import polars as pd
from datetime import datetime from datetime import datetime
import os import os
import concurrent.futures import concurrent.futures
import logging
@@ -169,7 +170,6 @@ class GenerateTimeseries:
executor.shutdown(wait=False, cancel_futures=True) executor.shutdown(wait=False, cancel_futures=True)
raise raise
# Write CSVs for each location
def write_results_to_csv(self, results, locations): def write_results_to_csv(self, results, locations):
"""Write extracted data to CSV files for each location. """Write extracted data to CSV files for each location.
@@ -177,7 +177,6 @@ class GenerateTimeseries:
results (dict): Aggregated results {zone_id: {'dates': [], 'values': []}} results (dict): Aggregated results {zone_id: {'dates': [], 'values': []}}
locations (list): List of location data locations (list): List of location data
""" """
print("Writing CSV files...")
for location in locations: for location in locations:
zone_id = location[0] zone_id = location[0]
data = results[zone_id] data = results[zone_id]
@@ -201,4 +200,4 @@ class GenerateTimeseries:
output_path, output_path,
float_precision=4 float_precision=4
) )
print("All CSV files written.") logging.info("All CSV files written.")