feat: ✨ Reduced the amount of steps and saved a lot of ram
This commit is contained in:
@@ -66,7 +66,6 @@ The `config.py` file defines folder paths:
|
|||||||
|
|
||||||
- DAT_TOP_FOLDER: "./dat_files"
|
- DAT_TOP_FOLDER: "./dat_files"
|
||||||
- ASC_TOP_FOLDER: "./asc_files"
|
- ASC_TOP_FOLDER: "./asc_files"
|
||||||
- CSV_TOP_FOLDER: "./csv_files"
|
|
||||||
- COMBINED_FOLDER: "./combined_files"
|
- COMBINED_FOLDER: "./combined_files"
|
||||||
|
|
||||||
Example of how the zone csv files should look:
|
Example of how the zone csv files should look:
|
||||||
|
|||||||
@@ -1,10 +1,8 @@
|
|||||||
class Config:
|
class Config:
|
||||||
DAT_TOP_FOLDER = "./dat_files"
|
DAT_TOP_FOLDER = "./dat_files"
|
||||||
ASC_TOP_FOLDER = "./asc_files"
|
ASC_TOP_FOLDER = "./asc_files"
|
||||||
CSV_TOP_FOLDER = "./csv_files"
|
|
||||||
COMBINED_FOLDER = "./combined_files"
|
COMBINED_FOLDER = "./combined_files"
|
||||||
ZONE_FOLDER = "./zone_inputs"
|
ZONE_FOLDER = "./zone_inputs"
|
||||||
|
|
||||||
delete_dat_after_processing = False
|
delete_dat_after_processing = True
|
||||||
delete_asc_after_processing = True
|
delete_asc_after_processing = True
|
||||||
delete_csv_after_combining = True
|
|
||||||
@@ -14,7 +14,6 @@ logging.basicConfig(
|
|||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
os.makedirs(Path(Config.ASC_TOP_FOLDER), exist_ok=True)
|
os.makedirs(Path(Config.ASC_TOP_FOLDER), exist_ok=True)
|
||||||
#os.makedirs(Path(Config.CSV_TOP_FOLDER), exist_ok=True)
|
|
||||||
os.makedirs(Path(Config.COMBINED_FOLDER), exist_ok=True)
|
os.makedirs(Path(Config.COMBINED_FOLDER), exist_ok=True)
|
||||||
|
|
||||||
locations = []
|
locations = []
|
||||||
@@ -92,11 +91,6 @@ if __name__ == "__main__":
|
|||||||
|
|
||||||
logging.info("Writing CSV files...")
|
logging.info("Writing CSV files...")
|
||||||
timeseries.write_results_to_csv(results, locations)
|
timeseries.write_results_to_csv(results, locations)
|
||||||
# results.clear()
|
|
||||||
|
|
||||||
# logging.info("combining CSVs into groups")
|
|
||||||
# combiner.combine_csv_files()
|
|
||||||
# logging.info("CSVs combined!")
|
|
||||||
end = time.time()
|
end = time.time()
|
||||||
elapsed_time = end - start
|
elapsed_time = end - start
|
||||||
|
|
||||||
|
|||||||
@@ -1,5 +1,3 @@
|
|||||||
import polars as pd
|
|
||||||
import os
|
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
class CombineTimeseries:
|
class CombineTimeseries:
|
||||||
@@ -16,55 +14,3 @@ class CombineTimeseries:
|
|||||||
self.grouped_locations[group] = []
|
self.grouped_locations[group] = []
|
||||||
self.grouped_locations[group].append(location)
|
self.grouped_locations[group].append(location)
|
||||||
logging.info(f'Count of zones: {len(self.grouped_locations)}')
|
logging.info(f'Count of zones: {len(self.grouped_locations)}')
|
||||||
|
|
||||||
# def combine_csv_files(self):
|
|
||||||
# to_delete = []
|
|
||||||
# for group, loc_list in self.grouped_locations.items():
|
|
||||||
# output_file =f"{self.config.COMBINED_FOLDER}/zone_{group}_timeseries_data.csv"
|
|
||||||
# combined_df = None
|
|
||||||
# for loc in loc_list:
|
|
||||||
# csv_to_load = f"{self.config.CSV_TOP_FOLDER}/{loc[0]}_timeseries_data.csv"
|
|
||||||
# df = pd.read_csv(csv_to_load, streaming=True)
|
|
||||||
# if combined_df is None:
|
|
||||||
# combined_df = df
|
|
||||||
# else:
|
|
||||||
# combined_df = combined_df.join(df, on='datetime')
|
|
||||||
|
|
||||||
# if self.config.delete_csv_after_combining:
|
|
||||||
# to_delete.append(csv_to_load)
|
|
||||||
|
|
||||||
# sorted_df = combined_df.sort('datetime')
|
|
||||||
# print(f'writing file to {output_file}')
|
|
||||||
# sorted_df.write_csv(output_file)
|
|
||||||
|
|
||||||
# if len(to_delete) > 0:
|
|
||||||
# for path in to_delete:
|
|
||||||
# print(f'deleting {path}')
|
|
||||||
# os.remove(path)
|
|
||||||
|
|
||||||
def combine_csv_files(self):
|
|
||||||
to_delete = []
|
|
||||||
for group, loc_list in self.grouped_locations.items():
|
|
||||||
output_file = f"{self.config.COMBINED_FOLDER}/zone_{group}_timeseries_data.csv"
|
|
||||||
|
|
||||||
# Use LazyFrame for memory-efficient processing
|
|
||||||
lazy_dfs = []
|
|
||||||
for loc in loc_list:
|
|
||||||
csv_to_load = f"{self.config.CSV_TOP_FOLDER}/{loc[0]}_timeseries_data.csv"
|
|
||||||
df = pd.scan_csv(csv_to_load) # Lazy read
|
|
||||||
lazy_dfs.append(df)
|
|
||||||
|
|
||||||
if self.config.delete_csv_after_combining:
|
|
||||||
to_delete.append(csv_to_load)
|
|
||||||
|
|
||||||
# Combine with LazyFrame operations
|
|
||||||
combined_lazy = pd.concat(lazy_dfs, how='align').collect(streaming=True) # Collect at the end
|
|
||||||
|
|
||||||
sorted_df = combined_lazy.sort('datetime')
|
|
||||||
print(f'writing file to {output_file}')
|
|
||||||
sorted_df.write_csv(output_file)
|
|
||||||
|
|
||||||
if len(to_delete) > 0:
|
|
||||||
for path in to_delete:
|
|
||||||
print(f'deleting {path}')
|
|
||||||
os.remove(path)
|
|
||||||
@@ -144,7 +144,6 @@ class GenerateTimeseries:
|
|||||||
|
|
||||||
# Use ThreadPoolExecutor for concurrent processing
|
# Use ThreadPoolExecutor for concurrent processing
|
||||||
# Since we are using Python 3.14t (free-threaded), this should scale well even for CPU work
|
# Since we are using Python 3.14t (free-threaded), this should scale well even for CPU work
|
||||||
# mixed with I/O.
|
|
||||||
with concurrent.futures.ThreadPoolExecutor() as executor:
|
with concurrent.futures.ThreadPoolExecutor() as executor:
|
||||||
# Submit all tasks
|
# Submit all tasks
|
||||||
future_to_file = {
|
future_to_file = {
|
||||||
@@ -170,39 +169,6 @@ class GenerateTimeseries:
|
|||||||
executor.shutdown(wait=False, cancel_futures=True)
|
executor.shutdown(wait=False, cancel_futures=True)
|
||||||
raise
|
raise
|
||||||
|
|
||||||
# def write_results_to_csv(self, results, locations):
|
|
||||||
# """Write extracted data to CSV files for each location.
|
|
||||||
|
|
||||||
# Args:
|
|
||||||
# results (dict): Aggregated results {zone_id: {'dates': [], 'values': []}}
|
|
||||||
# locations (list): List of location data
|
|
||||||
# """
|
|
||||||
# for location in locations:
|
|
||||||
# grid_square = location[0]
|
|
||||||
# zone = location[3]
|
|
||||||
# data = results[grid_square]
|
|
||||||
|
|
||||||
# if not data['dates']:
|
|
||||||
# print(f"No data found for {grid_square}")
|
|
||||||
# continue
|
|
||||||
|
|
||||||
# df = pd.DataFrame({"datetime": data['dates'], grid_square: data['values']})
|
|
||||||
|
|
||||||
# # Sort the dataframe into date order
|
|
||||||
# sorted_df = df.sort("datetime")
|
|
||||||
|
|
||||||
# # Format datetime column
|
|
||||||
# sorted_df = sorted_df.with_columns(
|
|
||||||
# pd.col("datetime").dt.strftime("%Y-%m-%d %H:%M:%S")
|
|
||||||
# )
|
|
||||||
|
|
||||||
# output_path = Path(self.config.CSV_TOP_FOLDER) / f"{zone}_timeseries_data.csv"
|
|
||||||
# sorted_df.write_csv(
|
|
||||||
# output_path,
|
|
||||||
# float_precision=4
|
|
||||||
# )
|
|
||||||
# logging.info("All CSV files written.")
|
|
||||||
|
|
||||||
def write_results_to_csv(self, results, locations):
|
def write_results_to_csv(self, results, locations):
|
||||||
"""Write extracted data to CSV files for each zone.
|
"""Write extracted data to CSV files for each zone.
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user