From e4f8c2d50265824cac0d0299ae9b8a4551d60540 Mon Sep 17 00:00:00 2001
From: Jake Pullen
Date: Tue, 9 Dec 2025 18:03:37 +0000
Subject: [PATCH] =?UTF-8?q?feat:=20=E2=9C=A8=20Reduced=20complexity=20&=20?=
 =?UTF-8?q?formatted=20files?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 config.py                      |  4 +--
 main.py                        | 50 +++++++++++++++-------------
 modules/__init__.py            |  4 +--
 modules/batch_nimrod.py        | 30 ++++++++++-------
 modules/combine_timeseries.py  | 16 --------
 modules/generate_timeseries.py | 60 +++++++++++++++-------------------
 modules/nimrod.py              | 12 ++++---
 pyproject.toml                 |  2 +-
 uv.lock                        |  2 +-
 9 files changed, 85 insertions(+), 95 deletions(-)
 delete mode 100644 modules/combine_timeseries.py

diff --git a/config.py b/config.py
index 4c11afd..22bbd56 100644
--- a/config.py
+++ b/config.py
@@ -4,5 +4,5 @@ class Config:
     COMBINED_FOLDER = "./combined_files"
     ZONE_FOLDER = "./zone_inputs"
 
-    delete_dat_after_processing = True
-    delete_asc_after_processing = True
\ No newline at end of file
+    delete_dat_after_processing = False
+    delete_asc_after_processing = True
diff --git a/main.py b/main.py
index 4f30814..d244034 100644
--- a/main.py
+++ b/main.py
@@ -6,7 +6,7 @@ import concurrent.futures
 from pathlib import Path
 from config import Config
-from modules import BatchNimrod, GenerateTimeseries, CombineTimeseries
+from modules import BatchNimrod, GenerateTimeseries
 
 logging.basicConfig(
     level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
 )
@@ -17,70 +17,76 @@ if __name__ == "__main__":
     os.makedirs(Path(Config.COMBINED_FOLDER), exist_ok=True)
 
     locations = []
-    #load zone inputs here
+    zones = set()
+    # load zone inputs here
     for file in os.listdir(Path(Config.ZONE_FOLDER)):
-        with open(Path(Config.ZONE_FOLDER,file), 'r') as csvfile:
+        with open(Path(Config.ZONE_FOLDER, file), "r") as csvfile:
             reader = csv.reader(csvfile)
             header = next(reader)  # Skip header row
             for row in reader:
                 # Extract the relevant fields: 1K Grid, Easting, Northing, Zone
-                zone_id = row[0]  # Ossheet column
+                grid_name = row[0]  # 1k Grid name
                 easting = int(row[1])  # Easting column
                 northing = int(row[2])  # Northing column
                 zone = int(row[3])  # ZoneID column
-                locations.append([zone_id, easting, northing, zone])
-    logging.info(f'Count of 1K Grids: {len(locations)}')
+                locations.append([grid_name, easting, northing, zone])
+                zones.add(zone)
+    logging.info(f"Count of 1km Grids: {len(locations)}")
+    logging.info(f"Count of Zones: {len(zones)}")
 
     batch = BatchNimrod(Config)
-    timeseries = GenerateTimeseries(Config)
-    combiner = CombineTimeseries(Config, locations)
+    timeseries = GenerateTimeseries(Config, locations)
 
     start = time.time()
-    logging.info("Starting interleaved processing of DAT files and Timeseries generation")
-    
+    logging.info(
+        "Starting interleaved processing of DAT files and Timeseries generation"
+    )
+
     # Initialize results structure
-    results = {loc[0]: {'dates': [], 'values': []} for loc in locations}
+    results = {loc[0]: {"dates": [], "values": []} for loc in locations}
 
     def process_pipeline(dat_file):
        # 1. Process DAT to ASC
        asc_file = batch._process_single_file(dat_file)
        if not asc_file:
            return None
-        
+
        # 2. Extract data from ASC
        file_results = timeseries.process_asc_file(asc_file, locations)
        return file_results
 
     # Get list of DAT files
-    dat_files = [f for f in os.listdir(Path(Config.DAT_TOP_FOLDER)) if not f.startswith('.')]
+    dat_files = [
+        f for f in os.listdir(Path(Config.DAT_TOP_FOLDER)) if not f.startswith(".")
+    ]
     total_files = len(dat_files)
-    
+
     logging.info(f"Processing {total_files} files concurrently...")
 
     with concurrent.futures.ThreadPoolExecutor() as executor:
         future_to_file = {
-            executor.submit(process_pipeline, dat_file): dat_file 
+            executor.submit(process_pipeline, dat_file): dat_file
             for dat_file in dat_files
         }
-        
+
         completed_count = 0
         try:
             for future in concurrent.futures.as_completed(future_to_file):
                 file_results = future.result()
                 if file_results:
                     for res in file_results:
-                        zone_id = res['zone_id']
-                        results[zone_id]['dates'].append(res['date'])
-                        results[zone_id]['values'].append(res['value'])
-                    
+                        zone_id = res["zone_id"]
+                        results[zone_id]["dates"].append(res["date"])
+                        results[zone_id]["values"].append(res["value"])
+
                 completed_count += 1
                 if completed_count % 100 == 0:
                     elapsed_time = time.time() - start
                     files_per_minute = (completed_count / elapsed_time) * 60
                     remaining_files = total_files - completed_count
                     eta_minutes = remaining_files / (files_per_minute / 60) / 60
-                    logging.info(f'''Processed {completed_count} out of {total_files} files.
-                    Speed: {files_per_minute:.2f} files/min. ETA: {eta_minutes:.2f} minutes''')
+                    logging.info(f"""Processed {completed_count} out of {total_files} files.
+                    Speed: {files_per_minute:.2f} files/min. ETA: {eta_minutes:.2f} minutes""")
         except KeyboardInterrupt:
             logging.warning("KeyboardInterrupt received. Cancelling pending tasks...")
             executor.shutdown(wait=False, cancel_futures=True)
diff --git a/modules/__init__.py b/modules/__init__.py
index 6c3385d..6745012 100644
--- a/modules/__init__.py
+++ b/modules/__init__.py
@@ -1,11 +1,9 @@
 from .nimrod import Nimrod
 from .batch_nimrod import BatchNimrod
 from .generate_timeseries import GenerateTimeseries
-from .combine_timeseries import CombineTimeseries
 
 __all__ = [
     "Nimrod",
     "BatchNimrod",
     "GenerateTimeseries",
-    "CombineTimeseries"
-]
\ No newline at end of file
+]
diff --git a/modules/batch_nimrod.py b/modules/batch_nimrod.py
index db829db..0f0760a 100644
--- a/modules/batch_nimrod.py
+++ b/modules/batch_nimrod.py
@@ -5,27 +5,26 @@ import logging
 import concurrent.futures
 
 
-
 class BatchNimrod:
     def __init__(self, config) -> None:
         self.config = config
 
     def _process_single_file(self, in_file):
         """Process a single Nimrod DAT file.
-        
+
         Args:
             in_file (str): Filename of the DAT file.
-        
+
         Returns:
             bool: True if successful, False otherwise.
         """
         in_file_full = Path(self.config.DAT_TOP_FOLDER, in_file)
-        
+
         try:
             # We need to open the file here, inside the thread
             with open(in_file_full, "rb") as f:
                 image = Nimrod(f)
-                
+
                 out_file_name = f"{image.get_validity_time()}.asc"
                 out_file_path = Path(self.config.ASC_TOP_FOLDER, out_file_name)
@@ -59,26 +58,33 @@ class BatchNimrod:
         box for each area, and exports clipped raster data to OUT_TOP_FOLDER.
""" # Read all file names in the folder - files_to_process = [f for f in os.listdir(Path(self.config.DAT_TOP_FOLDER)) if not f.startswith('.')] + files_to_process = [ + f + for f in os.listdir(Path(self.config.DAT_TOP_FOLDER)) + if not f.startswith(".") + ] total_files = len(files_to_process) logging.info(f"Processing {total_files} files concurrently...") - + with concurrent.futures.ThreadPoolExecutor() as executor: # Submit all tasks future_to_file = { - executor.submit(self._process_single_file, in_file): in_file + executor.submit(self._process_single_file, in_file): in_file for in_file in files_to_process } - + completed_count = 0 try: for future in concurrent.futures.as_completed(future_to_file): completed_count += 1 if completed_count % 10 == 0: - logging.info(f'processed {completed_count} out of {total_files} files') + logging.info( + f"processed {completed_count} out of {total_files} files" + ) except KeyboardInterrupt: - logging.warning("KeyboardInterrupt received. Cancelling pending tasks...") + logging.warning( + "KeyboardInterrupt received. Cancelling pending tasks..." + ) executor.shutdown(wait=False, cancel_futures=True) raise - diff --git a/modules/combine_timeseries.py b/modules/combine_timeseries.py deleted file mode 100644 index 2671e5c..0000000 --- a/modules/combine_timeseries.py +++ /dev/null @@ -1,16 +0,0 @@ -import logging - -class CombineTimeseries: - def __init__(self, config, locations): - self.config = config - self.locations = locations - self.grouped_locations = {} - self.build_location_groups() - - def build_location_groups(self): - for location in self.locations: - group = location[3] # zone number - if group not in self.grouped_locations: - self.grouped_locations[group] = [] - self.grouped_locations[group].append(location) - logging.info(f'Count of zones: {len(self.grouped_locations)}') diff --git a/modules/generate_timeseries.py b/modules/generate_timeseries.py index 09b5847..2011873 100644 --- a/modules/generate_timeseries.py +++ b/modules/generate_timeseries.py @@ -8,10 +8,10 @@ import concurrent.futures import logging - class GenerateTimeseries: - def __init__(self, config): + def __init__(self, config, locations): self.config = config + self.locations = locations def _read_ascii_header(self, ascii_raster_file: str) -> list: """Reads header information from an ASCII DEM @@ -66,17 +66,17 @@ class GenerateTimeseries: def process_asc_file(self, file_name, locations): """Process a single ASC file and extract data for all locations. - + Args: file_name (str): Name of the ASC file. locations (list): List of locations. - + Returns: - list: A list of dictionaries containing extracted data for each location, + list: A list of dictionaries containing extracted data for each location, or None if processing fails. Format: [{'zone_id': id, 'date': datetime, 'value': float}, ...] 
""" - if not file_name.endswith('.asc'): + if not file_name.endswith(".asc"): return None file_path = Path(self.config.ASC_TOP_FOLDER, file_name) @@ -84,7 +84,7 @@ class GenerateTimeseries: try: radar_header = self._read_ascii_header(str(file_path)) - + # Read grid once cur_rawgrid = np.loadtxt(file_path, skiprows=6, dtype=float, delimiter=None) @@ -97,14 +97,14 @@ class GenerateTimeseries: # Extract data for each location for location in locations: zone_id = location[0] - + # Calculate crop coordinates start_col, start_row, end_col, end_row = self._calculate_crop_coords( location, radar_header ) cur_croppedrain = cur_rawgrid[start_row:end_row, start_col:end_col] - + if cur_croppedrain.size > 2: val = cur_croppedrain.flatten()[2] / 32 else: @@ -112,17 +112,12 @@ class GenerateTimeseries: # print(f"Warning: Crop too small for {zone_id} in {file_name}") val = 0.0 - results.append({ - 'zone_id': zone_id, - 'date': parsed_date, - 'value': val - }) + results.append({"zone_id": zone_id, "date": parsed_date, "value": val}) if self.config.delete_asc_after_processing: os.remove(file_path) return results - except Exception as e: print(f"Error processing file {file_name}: {e}") @@ -135,7 +130,7 @@ class GenerateTimeseries: locations (list): List of location data [zone_id, easting, northing, zone] """ # Initialize data structure to hold results: {zone_id: {'dates': [], 'values': []}} - results = {loc[0]: {'dates': [], 'values': []} for loc in locations} + results = {loc[0]: {"dates": [], "values": []} for loc in locations} # Get list of ASC files asc_files = sorted(os.listdir(Path(self.config.ASC_TOP_FOLDER))) @@ -147,20 +142,20 @@ class GenerateTimeseries: with concurrent.futures.ThreadPoolExecutor() as executor: # Submit all tasks future_to_file = { - executor.submit(self.process_asc_file, file_name, locations): file_name + executor.submit(self.process_asc_file, file_name, locations): file_name for file_name in asc_files } - + completed_count = 0 try: for future in concurrent.futures.as_completed(future_to_file): file_results = future.result() if file_results: for res in file_results: - zone_id = res['zone_id'] - results[zone_id]['dates'].append(res['date']) - results[zone_id]['values'].append(res['value']) - + zone_id = res["zone_id"] + results[zone_id]["dates"].append(res["date"]) + results[zone_id]["values"].append(res["value"]) + completed_count += 1 if completed_count % 100 == 0: print(f"Processed {completed_count}/{total_files} files") @@ -176,9 +171,6 @@ class GenerateTimeseries: results (dict): Aggregated results {zone_id: {'dates': [], 'values': []}} locations (list): List of location data [zone_id, easting, northing, zone] """ - # Map zone_id -> zone - zone_map = {loc[0]: loc[3] for loc in locations} - # Group results by zone and collect all unique dates zone_data = {} for loc in locations: @@ -186,19 +178,19 @@ class GenerateTimeseries: zone_name = loc[3] if zone_name not in zone_data: - zone_data[zone_name] = {'dates': [], 'values': {}} + zone_data[zone_name] = {"dates": [], "values": {}} - zone_data[zone_name]['values'][zone_id] = results[zone_id]['values'] - zone_data[zone_name]['dates'].extend(results[zone_id]['dates']) + zone_data[zone_name]["values"][zone_id] = results[zone_id]["values"] + zone_data[zone_name]["dates"].extend(results[zone_id]["dates"]) # Get unique sorted dates across all zones for zone_name, data in zone_data.items(): - data['dates'] = sorted(set(data['dates'])) + data["dates"] = sorted(set(data["dates"])) # Now write one CSV per zone with aligned timestamps for 
         for zone_name, data in zone_data.items():
-            dates = data['dates']
-            values_dict = data['values']
+            dates = data["dates"]
+            values_dict = data["values"]
 
             # Create aligned DataFrame
             df_dict = {"datetime": dates}
@@ -235,7 +227,9 @@ class GenerateTimeseries:
                 pd.col("datetime").dt.strftime("%Y-%m-%d %H:%M:%S")
             )
 
-            output_path = Path(self.config.COMBINED_FOLDER) / f"{zone_name}_timeseries_data.csv"
+            output_path = (
+                Path(self.config.COMBINED_FOLDER) / f"{zone_name}_timeseries_data.csv"
+            )
             sorted_df.write_csv(output_path, float_precision=4)
 
-        logging.info("All CSV files written.")
\ No newline at end of file
+        logging.info("All CSV files written.")
diff --git a/modules/nimrod.py b/modules/nimrod.py
index 3ed9436..b93a6e4 100644
--- a/modules/nimrod.py
+++ b/modules/nimrod.py
@@ -258,12 +258,12 @@ class Nimrod:
             # Read data as big-endian 16-bit integers
             # numpy.frombuffer is efficient for reading from bytes
             data_bytes = infile.read(array_size * 2)
-            self.data = np.frombuffer(data_bytes, dtype='>h').astype(np.int16)
-            
+            self.data = np.frombuffer(data_bytes, dtype=">h").astype(np.int16)
+
             # Reshape to (nrows, ncols) for easier 2D manipulation
             # Note: NIMROD data is row-major (C-style), starting from top-left
             self.data = self.data.reshape((self.nrows, self.ncols))
-            
+
         except Exception:
             infile.close()
             raise Nimrod.PayloadReadError
@@ -392,7 +392,9 @@ class Nimrod:
         # Use numpy slicing to extract the sub-array
         # Note: y indices correspond to rows, x indices to columns
         # Slicing is [start:end], so we need +1 for the end index
-        self.data = self.data[yMinPixelId : yMaxPixelId + 1, xMinPixelId : xMaxPixelId + 1]
+        self.data = self.data[
+            yMinPixelId : yMaxPixelId + 1, xMinPixelId : xMaxPixelId + 1
+        ]
 
         # Update object where necessary
         self.x_right = self.x_left + xMaxPixelId * self.x_pixel_size
@@ -435,7 +437,7 @@ class Nimrod:
 
         # Write raster data to output file using numpy.savetxt
         # This is significantly faster than iterating in Python
-        np.savetxt(outfile, self.data, fmt='%d', delimiter=' ')
+        np.savetxt(outfile, self.data, fmt="%d", delimiter=" ")
 
         outfile.close()
diff --git a/pyproject.toml b/pyproject.toml
index 6de0cec..c3ecfef 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [project]
 name = "met-office"
-version = "1.0.0"
+version = "1.1.0"
 description = "Convert .dat nimrod files to .asc files"
 readme = "README.md"
 requires-python = ">=3.14"
diff --git a/uv.lock b/uv.lock
index 59360e5..a0b486f 100644
--- a/uv.lock
+++ b/uv.lock
@@ -4,7 +4,7 @@ requires-python = ">=3.14"
 
 [[package]]
 name = "met-office"
-version = "1.0.0"
+version = "1.1.0"
 source = { virtual = "." }
 dependencies = [
     { name = "numpy" },
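
Reviewer sketch (supplementary, not part of the patch): the commit replaces the
old two-phase flow (convert every DAT file, then generate timeseries, then
combine) with a single interleaved pipeline in main.py, where each worker
converts one DAT file and immediately extracts the per-grid values from the
resulting ASC file. Below is a minimal, self-contained Python illustration of
that pattern under stated assumptions: convert_dat, extract_values, the grid
name "NT2769" and the file names are hypothetical stand-ins for
BatchNimrod._process_single_file, GenerateTimeseries.process_asc_file and the
real inputs.

import concurrent.futures

def convert_dat(dat_name: str) -> str | None:
    # Stand-in for stage 1 (DAT -> ASC conversion); returns the ASC file name,
    # or None when conversion fails, mirroring _process_single_file.
    return dat_name.removesuffix(".dat") + ".asc"

def extract_values(asc_name: str) -> list[dict]:
    # Stand-in for stage 2 (per-grid value extraction from one ASC raster).
    return [{"zone_id": "NT2769", "value": 0.5}]

def process_pipeline(dat_name: str) -> list[dict] | None:
    asc_name = convert_dat(dat_name)  # stage 1: convert one file
    if not asc_name:
        return None
    return extract_values(asc_name)  # stage 2: extract straight away

results: dict[str, list[float]] = {"NT2769": []}
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = [executor.submit(process_pipeline, f) for f in ("t000.dat", "t001.dat")]
    for future in concurrent.futures.as_completed(futures):
        rows = future.result()
        if rows:
            for row in rows:
                results[row["zone_id"]].append(row["value"])
print(results)  # {'NT2769': [0.5, 0.5]}

One design note: a ThreadPoolExecutor is a reasonable fit here because the
heavy work is file I/O and NumPy calls that release the GIL; if profiling
showed the threads serialising on pure-Python work, ProcessPoolExecutor would
be the drop-in alternative.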