multithreading

This commit is contained in:
2025-11-18 22:09:20 +00:00
parent 69c1b86bf1
commit aa1879582a
5 changed files with 150 additions and 178 deletions
+60 -32
View File
@@ -2,50 +2,78 @@ from modules.nimrod import Nimrod
import os
from pathlib import Path
import logging
import concurrent.futures
class BatchNimrod:
def __init__(self, config) -> None:
self.config = config
def process_nimrod_files(self) -> None:
def _process_single_file(self, in_file):
"""Process a single Nimrod DAT file.
Args:
in_file (str): Filename of the DAT file.
Returns:
bool: True if successful, False otherwise.
"""
Process all Nimrod files in the input directory, applying bounding box clipping
and exporting to ASC format.
This function reads all files from DAT_TOP_FOLDER, applies the appropriate bounding
box for each area, and exports clipped raster data to OUT_TOP_FOLDER.
"""
# Read all file names in the folder
files_to_process = len([f for f in os.listdir(Path(self.config.DAT_TOP_FOLDER))])
logging.info(f"Processing {files_to_process} files...")
file_counter = 0
for in_file in os.listdir(Path(self.config.DAT_TOP_FOLDER)):
in_file_full = Path(self.config.DAT_TOP_FOLDER, in_file)
try:
image = Nimrod(open(in_file_full, "rb"))
in_file_full = Path(self.config.DAT_TOP_FOLDER, in_file)
try:
# We need to open the file here, inside the thread
with open(in_file_full, "rb") as f:
image = Nimrod(f)
out_file_name = f"{image.get_validity_time()}.asc"
out_file_path = Path(self.config.ASC_TOP_FOLDER, out_file_name)
with open(out_file_path, "w") as outfile:
image.extract_asc(outfile)
if self.config.delete_dat_after_processing:
os.remove(in_file_full)
if self.config.delete_dat_after_processing:
os.remove(in_file_full)
file_counter += 1
logging.debug(f"Successfully processed: {in_file_full}")
if file_counter %10 == 0:
logging.info(f'processed {file_counter} out of {files_to_process} files')
logging.debug(f"Successfully processed: {in_file_full}")
return True
except Nimrod.HeaderReadError as e:
logging.error(f"Failed to read file {in_file_full}, is it corrupt?")
logging.error(e)
return False
except Nimrod.PayloadReadError as e:
logging.error(f"Failed to load the raster data in {in_file_full}")
logging.error(e)
return False
except Exception as e:
logging.error(f"Unexpected error processing {in_file_full}: {e}")
return False
def process_nimrod_files(self) -> None:
"""
Process all Nimrod files in the input directory concurrently, applying bounding box clipping
and exporting to ASC format.
This function reads all files from DAT_TOP_FOLDER, applies the appropriate bounding
box for each area, and exports clipped raster data to OUT_TOP_FOLDER.
"""
# Read all file names in the folder
files_to_process = [f for f in os.listdir(Path(self.config.DAT_TOP_FOLDER)) if not f.startswith('.')]
total_files = len(files_to_process)
logging.info(f"Processing {total_files} files concurrently...")
with concurrent.futures.ThreadPoolExecutor() as executor:
# Submit all tasks
future_to_file = {
executor.submit(self._process_single_file, in_file): in_file
for in_file in files_to_process
}
completed_count = 0
for future in concurrent.futures.as_completed(future_to_file):
completed_count += 1
if completed_count % 10 == 0:
logging.info(f'processed {completed_count} out of {total_files} files')
except Nimrod.HeaderReadError as e:
logging.error(f"Failed to read file {in_file_full}, is it corrupt?")
logging.error(e)
continue
except Nimrod.PayloadReadError as e:
logging.error(f"Failed to load the raster data in {in_file_full}")
logging.error(e)
continue
+87 -70
View File
@@ -4,6 +4,8 @@ from pathlib import Path
import polars as pd
from datetime import datetime
import os
import concurrent.futures
class GenerateTimeseries:
@@ -61,8 +63,67 @@ class GenerateTimeseries:
return int(start_col), int(start_row), int(end_col), int(end_row)
def _process_single_file(self, file_name, locations):
"""Process a single ASC file and extract data for all locations.
Args:
file_name (str): Name of the ASC file.
locations (list): List of locations.
Returns:
list: A list of dictionaries containing extracted data for each location,
or None if processing fails.
Format: [{'zone_id': id, 'date': datetime, 'value': float}, ...]
"""
if not file_name.endswith('.asc'):
return None
file_path = Path(self.config.ASC_TOP_FOLDER, file_name)
results = []
try:
radar_header = self._read_ascii_header(str(file_path))
# Read grid once
cur_rawgrid = np.loadtxt(file_path, skiprows=6, dtype=float, delimiter=None)
# Parse datetime from filename once
filename = os.path.basename(file_path)
date_str = filename[:8] # YYYYMMDD
time_str = filename[8:12] # HHMM
parsed_date = datetime.strptime(f"{date_str}{time_str}", "%Y%m%d%H%M")
# Extract data for each location
for location in locations:
zone_id = location[0]
# Calculate crop coordinates
start_col, start_row, end_col, end_row = self._calculate_crop_coords(
location, radar_header
)
cur_croppedrain = cur_rawgrid[start_row:end_row, start_col:end_col]
if cur_croppedrain.size > 2:
val = cur_croppedrain.flatten()[2] / 32
else:
# Handle edge case
# print(f"Warning: Crop too small for {zone_id} in {file_name}")
val = 0.0
results.append({
'zone_id': zone_id,
'date': parsed_date,
'value': val
})
return results
except Exception as e:
print(f"Error processing file {file_name}: {e}")
return None
def extract_data_for_all_locations(self, locations):
"""Extract cropped rain data for all locations by iterating over ASC files once.
"""Extract cropped rain data for all locations by iterating over ASC files concurrently.
Args:
locations (list): List of location data [zone_id, easting, northing, zone]
@@ -70,74 +131,33 @@ class GenerateTimeseries:
# Initialize data structure to hold results: {zone_id: {'dates': [], 'values': []}}
results = {loc[0]: {'dates': [], 'values': []} for loc in locations}
# Get list of ASC files and sort them to ensure chronological order if needed
# Get list of ASC files
asc_files = sorted(os.listdir(Path(self.config.ASC_TOP_FOLDER)))
total_files = len(asc_files)
print(f"Processing {total_files} ASC files...")
print(f"Processing {total_files} ASC files concurrently...")
for i, file_name in enumerate(asc_files):
if not file_name.endswith('.asc'):
continue
file_path = Path(self.config.ASC_TOP_FOLDER, file_name)
try:
radar_header = self._read_ascii_header(str(file_path))
# Read grid once
cur_rawgrid = np.loadtxt(file_path, skiprows=6, dtype=float, delimiter=None)
# Parse datetime from filename once
filename = os.path.basename(file_path)
date_str = filename[:8] # YYYYMMDD
time_str = filename[8:12] # HHMM
parsed_date = datetime.strptime(f"{date_str}{time_str}", "%Y%m%d%H%M")
# Extract data for each location
for location in locations:
zone_id = location[0]
# Calculate crop coordinates
start_col, start_row, end_col, end_row = self._calculate_crop_coords(
location, radar_header
)
# Extract value
# Note: The original code used cur_croppedrain.flatten()[2] / 32
# We need to ensure the crop is valid and has enough elements.
# Assuming the crop size is fixed as per original code (2x2 basin -> 4 cells?)
# Original:
# nrows_basin = 2
# ncols_basin = 2
# cellres_basin = 1000
# cellres_radar = radar_header[4] (usually 1000)
# So it's likely a small grid.
cur_croppedrain = cur_rawgrid[start_row:end_row, start_col:end_col]
# Original logic: rainfile.append(cur_croppedrain.flatten()[2] / 32)
# We replicate this exactly.
if cur_croppedrain.size > 2:
val = cur_croppedrain.flatten()[2] / 32
else:
val = 0.0 # Handle edge case if crop is too small? Or maybe NaN?
# For now, let's assume it works as before, but maybe add a check?
# If the original code worked, this should work too provided indices are correct.
# If size is too small, it would raise IndexError in original code too.
if cur_croppedrain.size <= 2:
print(f"Warning: Crop too small for {zone_id} in {file_name}")
val = 0.0 # Default or error?
results[zone_id]['dates'].append(parsed_date)
results[zone_id]['values'].append(val)
# Use ThreadPoolExecutor for concurrent processing
# Since we are using Python 3.14t (free-threaded), this should scale well even for CPU work
# mixed with I/O.
with concurrent.futures.ThreadPoolExecutor() as executor:
# Submit all tasks
future_to_file = {
executor.submit(self._process_single_file, file_name, locations): file_name
for file_name in asc_files
}
except Exception as e:
print(f"Error processing file {file_name}: {e}")
continue
if (i + 1) % 100 == 0:
print(f"Processed {i + 1}/{total_files} files")
completed_count = 0
for future in concurrent.futures.as_completed(future_to_file):
file_results = future.result()
if file_results:
for res in file_results:
zone_id = res['zone_id']
results[zone_id]['dates'].append(res['date'])
results[zone_id]['values'].append(res['value'])
completed_count += 1
if completed_count % 100 == 0:
print(f"Processed {completed_count}/{total_files} files")
# Write CSVs for each location
print("Writing CSV files...")
@@ -151,12 +171,9 @@ class GenerateTimeseries:
df = pd.DataFrame({"datetime": data['dates'], zone_id: data['values']})
# Sort and set index (Polars)
# Sort the dataframe into date order
sorted_df = df.sort("datetime")
sorted_df = sorted_df.with_columns(
pd.Series(data['dates']).alias("datetime")
).set_sorted("datetime")
output_path = Path(self.config.CSV_TOP_FOLDER) / f"{zone_id}_timeseries_data.csv"
sorted_df.write_csv(
output_path,