style: 💅 Changed order of business
@@ -2,6 +2,7 @@ import logging
 import time
 import os
 import csv
+import concurrent.futures
 from pathlib import Path
 
 from config import Config
@@ -35,21 +36,56 @@ if __name__ == "__main__":
     combiner = CombineTimeseries(Config, locations)
 
     start = time.time()
-    logging.info("Starting to process DAT to ASC")
+    logging.info("Starting interleaved processing of DAT files and Timeseries generation")
 
-    batch.process_nimrod_files()
-    batch_checkpoint = time.time()
-    elapsed_time = batch_checkpoint - start
-    logging.info(f"DAT to ASC completed in {elapsed_time:.2f} seconds")
-
-    logging.info("Starting generating timeseries data for all locations.")
-    place_start = time.time()
-    timeseries.extract_data_for_all_locations(locations)
-    place_end = time.time()
-    place_create_time = place_end - place_start
-    elapsed_time = place_end - start
-    logging.info(f"Timeseries generation completed in {place_create_time:.2f} seconds")
-    logging.info(f"Total time so far {elapsed_time:.2f} seconds")
+    # Initialize results structure
+    results = {loc[0]: {'dates': [], 'values': []} for loc in locations}
+
+    def process_pipeline(dat_file):
+        # 1. Process DAT to ASC
+        asc_file = batch._process_single_file(dat_file)
+        if not asc_file:
+            return None
+
+        # 2. Extract data from ASC
+        file_results = timeseries.process_asc_file(asc_file, locations)
+        return file_results
+
+    # Get list of DAT files
+    dat_files = [f for f in os.listdir(Path(Config.DAT_TOP_FOLDER)) if not f.startswith('.')]
+    total_files = len(dat_files)
+
+    logging.info(f"Processing {total_files} files concurrently...")
+
+    with concurrent.futures.ThreadPoolExecutor() as executor:
+        future_to_file = {
+            executor.submit(process_pipeline, dat_file): dat_file
+            for dat_file in dat_files
+        }
+
+        completed_count = 0
+        try:
+            for future in concurrent.futures.as_completed(future_to_file):
+                file_results = future.result()
+                if file_results:
+                    for res in file_results:
+                        zone_id = res['zone_id']
+                        results[zone_id]['dates'].append(res['date'])
+                        results[zone_id]['values'].append(res['value'])
+
+                completed_count += 1
+                if completed_count % 10 == 0:
+                    logging.info(f'Processed {completed_count} out of {total_files} files')
+        except KeyboardInterrupt:
+            logging.warning("KeyboardInterrupt received. Cancelling pending tasks...")
+            executor.shutdown(wait=False, cancel_futures=True)
+            raise
+
+    elapsed_time = time.time() - start
+    logging.info(f"Interleaved processing completed in {elapsed_time:.2f} seconds")
+
+    logging.info("Writing CSV files...")
+    timeseries.write_results_to_csv(results, locations)
 
     logging.info("combining CSVs into groups")
     combiner.combine_csv_files()
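Note: the fan-out/aggregate pattern the new __main__ block relies on (a dict mapping futures to their inputs, as_completed for aggregation, and cancel_futures on Ctrl-C) can be exercised in isolation. Below is a minimal sketch with a placeholder do_work function and dummy inputs rather than the project's classes; cancel_futures requires Python 3.9 or newer.

import concurrent.futures
import logging
import time

logging.basicConfig(level=logging.INFO)

def do_work(item):
    # Stand-in for the real DAT -> ASC -> timeseries extraction pipeline.
    time.sleep(0.1)
    return [{'zone_id': item, 'date': '2024-01-01', 'value': item * 1.5}]

items = list(range(25))
results = {i: {'dates': [], 'values': []} for i in items}

with concurrent.futures.ThreadPoolExecutor() as executor:
    future_to_item = {executor.submit(do_work, i): i for i in items}
    try:
        for future in concurrent.futures.as_completed(future_to_item):
            for res in future.result() or []:
                results[res['zone_id']]['dates'].append(res['date'])
                results[res['zone_id']]['values'].append(res['value'])
    except KeyboardInterrupt:
        # cancel_futures needs Python 3.9+; pending tasks are dropped, running ones finish.
        executor.shutdown(wait=False, cancel_futures=True)
        raise

Because the shared results dict is only mutated in the main thread, inside the as_completed loop, no locking is required.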
@@ -36,7 +36,7 @@ class BatchNimrod:
             os.remove(in_file_full)
 
             logging.debug(f"Successfully processed: {in_file_full}")
-            return True
+            return out_file_name
 
         except Nimrod.HeaderReadError as e:
             logging.error(f"Failed to read file {in_file_full}, is it corrupt?")
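Note: returning the generated file name instead of True is what lets __main__ chain the two stages per file. If the HeaderReadError branch only logs and falls through, the method implicitly returns None, so the caller's falsy check still treats the file as failed. A hedged sketch of that caller-side contract, with a hypothetical file name:

# 'batch' and 'timeseries' are the instances created in __main__; the .dat name is made up.
asc_file = batch._process_single_file("example.dat")
if asc_file:
    # Success: the ASC file name comes back, so we can chain straight into extraction.
    file_results = timeseries.process_asc_file(asc_file, locations)
else:
    # Falsy (implicit None after a logged HeaderReadError): skip this file.
    file_results = None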
@@ -63,7 +63,7 @@ class GenerateTimeseries:
 
         return int(start_col), int(start_row), int(end_col), int(end_row)
 
-    def _process_single_file(self, file_name, locations):
+    def process_asc_file(self, file_name, locations):
         """Process a single ASC file and extract data for all locations.
 
         Args:
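Note: the rename makes the per-file method part of the class's public surface, since __main__ now calls it directly. Its return shape is not shown in this hunk; inferred from the aggregation loop in __main__, it should look roughly like the following (the values are made up):

# One dict per location, with 'zone_id', 'date' and 'value' keys.
example_file_results = [
    {'zone_id': 'Z001', 'date': '2024-01-01 00:00', 'value': 0.25},
    {'zone_id': 'Z002', 'date': '2024-01-01 00:00', 'value': 0.0},
]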
@@ -147,7 +147,7 @@ class GenerateTimeseries:
         with concurrent.futures.ThreadPoolExecutor() as executor:
             # Submit all tasks
            future_to_file = {
-                executor.submit(self._process_single_file, file_name, locations): file_name
+                executor.submit(self.process_asc_file, file_name, locations): file_name
                 for file_name in asc_files
            }
 
@@ -170,6 +170,13 @@ class GenerateTimeseries:
                 raise
 
         # Write CSVs for each location
+    def write_results_to_csv(self, results, locations):
+        """Write extracted data to CSV files for each location.
+
+        Args:
+            results (dict): Aggregated results {zone_id: {'dates': [], 'values': []}}
+            locations (list): List of location data
+        """
         print("Writing CSV files...")
         for location in locations:
             zone_id = location[0]
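Note: this hunk only shows the new method's signature, docstring and first lines. A standalone sketch of what the body plausibly does, with an assumed output folder and column layout that are not taken from the repository:

import csv
from pathlib import Path

def write_results_to_csv(results, locations, out_dir="output"):
    """One CSV per location with 'date' and 'value' columns (assumed layout)."""
    Path(out_dir).mkdir(parents=True, exist_ok=True)
    for location in locations:
        zone_id = location[0]
        with open(Path(out_dir) / f"{zone_id}.csv", "w", newline="") as f:
            writer = csv.writer(f)
            writer.writerow(["date", "value"])
            writer.writerows(zip(results[zone_id]['dates'],
                                 results[zone_id]['values']))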