8 Commits

Author SHA1 Message Date
Jake e4f8c2d502 feat: Reduced complexity & formatted files 2025-12-09 18:03:37 +00:00
Jake 59f459d4d0 feat: Reduced the amount of steps and saved a lot of ram 2025-12-09 16:29:48 +00:00
Jake 84ba6c837c fix: 🐛 bring the combine into the write to csv step 2025-12-09 16:19:51 +00:00
Jake c415b81bc8 exploring options 2025-12-09 15:33:28 +00:00
Jake bd0a421bb9 chore: 🔧 Minor tweaks for neatness 2025-12-09 09:04:22 +00:00
Jake 4bd32641bd style: 💅 Extra logging info 2025-12-09 07:46:13 +00:00
Jake 83405eb17e style: 💅 Changed order of business 2025-12-08 19:58:12 +00:00
Jake 009c40e08a fix: 🐛 Quick Exit on keyboard interrupt 2025-12-08 19:14:02 +00:00
11 changed files with 220 additions and 152 deletions
.gitignore +4 -1
@@ -9,10 +9,13 @@ wheels/
# Virtual environments
.venv
dat_other/*
dat_files/*
asc_files/*
csv_files/*
combined_files/*
zone_inputs/*
*.tar.gz
generate_test_data.py
README.md +18 -11
@@ -5,6 +5,7 @@ This project provides tools for processing UK Met Office Rain Radar NIMROD image
## Overview
The project consists of a main pipeline workflow that processes multiple modules in sequence:
- `main.py`: Main pipeline orchestrator that calls on the modules as needed
- `batch_nimrod.py`: Module for batch processing multiple NIMROD files with configurable bounding boxes
- `generate_timeseries.py`: Module for extracting cropped rain data and creating rainfall timeseries
@@ -13,22 +14,26 @@ The project consists of a main pipeline workflow that processes multiple modules
## Features
### main.py
- Orchestrates the entire workflow pipeline
- Processes DAT files to ASC format
- Generates timeseries data for specified locations
- Combines grouped CSV files into consolidated datasets
### batch_nimrod.py
- Process multiple NIMROD dat files
- Automatically extract datetime from file data
- Export clipped raster data to ASC format
### generate_timeseries.py
- Extract cropped rain data based on specified locations
- Create rainfall timeseries CSVs for each location
- Parse datetime from filename and create proper datetime index
### combine_timeseries.py
- Combine multiple timeseries CSV files into grouped datasets
- Group locations by specified output groups
- Create consolidated CSV files for each group
@@ -46,32 +51,34 @@ It is recommended to use UV for environment and package handling.
1. Adjust the config.py file to match your needs.
1. Ensure your .dat files are in the DAT_TOP_FOLDER (as per config location)
1. Ensure your zone csv files are in the ZONE_FOLDER (as per config location)
1. Run the main pipeline: `uv run main.py`
1. Run the main pipeline: `uv run main.py`. Note that you must set the environment variable `PYTHON_GIL=0` first (the project targets free-threaded Python).
1. Find the output in the COMBINED_FOLDER (as per config location)
The main pipeline will (see the sketch after this list):
1. Process DAT files to ASC format if needed
2. Generate timeseries data for specified locations
3. Combine grouped CSV files into consolidated datasets
1. Generate timeseries data for specified locations
1. Combine grouped CSV files into consolidated datasets
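For reference, here is a condensed sketch of that sequence. It mirrors the flow in `main.py` from this changeset, minus the thread pool and progress logging; the `.dat` filename and the single location row are hypothetical stand-ins.

```python
from config import Config
from modules import BatchNimrod, GenerateTimeseries

# [grid name, easting, northing, zone number], as parsed from the zone CSVs
locations = [["TM0816", 608500, 216500, 1]]
batch = BatchNimrod(Config)
timeseries = GenerateTimeseries(Config, locations)
results = {loc[0]: {"dates": [], "values": []} for loc in locations}

for dat_file in ["example.dat"]:  # hypothetical input in DAT_TOP_FOLDER
    asc_file = batch._process_single_file(dat_file)  # 1. DAT -> ASC
    if not asc_file:
        continue
    for res in timeseries.process_asc_file(asc_file, locations):  # 2. extract
        results[res["zone_id"]]["dates"].append(res["date"])
        results[res["zone_id"]]["values"].append(res["value"])

timeseries.write_results_to_csv(results, locations)  # 3. per-zone CSVs
```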
## Configuration
The `config.py` file defines folder paths:
- DAT_TOP_FOLDER: "./dat_files"
- ASC_TOP_FOLDER: "./asc_files"
- CSV_TOP_FOLDER: "./csv_files"
- COMBINED_FOLDER: "./combined_files"
Example of how the zone csv files should look:
```
filler, zone_name, easting, northing, other_filler, last_filler, zone_number
aa, TM0816, 608500, 216500, a, a, 1
aa, TF6842, 568500, 342500, a, a, 1
```

```csv
1K Grid, easting, northing, zone_number
TM0816, 608500, 216500, 1
TF6842, 568500, 342500, 1
```
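For illustration, a minimal sketch of how the pipeline parses these rows; it mirrors the zone-loading loop in `main.py`, with a hypothetical filename.

```python
import csv
from pathlib import Path

locations = []
zones = set()
with open(Path("zone_inputs", "zones.csv")) as csvfile:  # hypothetical file
    reader = csv.reader(csvfile)
    next(reader)  # skip the header row
    for row in reader:
        grid_name = row[0]  # 1K Grid
        easting = int(row[1])  # easting
        northing = int(row[2])  # northing
        zone = int(row[3])  # zone_number
        locations.append([grid_name, easting, northing, zone])
        zones.add(zone)
```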
## Acknowledgments
Thank you to the following projects for their inspiration and code:
* [Richard Thomas - Original Nimrod dat to asc file conversion](https://github.com/richard-thomas/MetOffice_NIMROD)
* [Declan Valters - building the timeseries from the asc files](https://github.com/dvalters/NIMROD-toolbox)
- [Richard Thomas - Original Nimrod dat to asc file conversion](https://github.com/richard-thomas/MetOffice_NIMROD)
- [Declan Valters - building the timeseries from the asc files](https://github.com/dvalters/NIMROD-toolbox)
config.py -2
@@ -1,10 +1,8 @@
class Config:
DAT_TOP_FOLDER = "./dat_files"
ASC_TOP_FOLDER = "./asc_files"
CSV_TOP_FOLDER = "./csv_files"
COMBINED_FOLDER = "./combined_files"
ZONE_FOLDER = "./zone_inputs"
delete_dat_after_processing = False
delete_asc_after_processing = True
delete_csv_after_combining = True
main.py +71 -28
@@ -2,10 +2,11 @@ import logging
import time
import os
import csv
import concurrent.futures
from pathlib import Path
from config import Config
from modules import BatchNimrod, GenerateTimeseries, CombineTimeseries
from modules import BatchNimrod, GenerateTimeseries
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
@@ -13,47 +14,89 @@ logging.basicConfig(
if __name__ == "__main__":
os.makedirs(Path(Config.ASC_TOP_FOLDER), exist_ok=True)
os.makedirs(Path(Config.CSV_TOP_FOLDER), exist_ok=True)
os.makedirs(Path(Config.COMBINED_FOLDER), exist_ok=True)
locations = []
#load zone inputs here
zones = set()
# load zone inputs here
for file in os.listdir(Path(Config.ZONE_FOLDER)):
with open(Path(Config.ZONE_FOLDER,file), 'r') as csvfile:
with open(Path(Config.ZONE_FOLDER, file), "r") as csvfile:
reader = csv.reader(csvfile)
header = next(reader) # Skip header row
for row in reader:
# Extract the relevant fields: Ossheet (location ID), Easting, Northing, Zone
zone_id = row[1] # Ossheet column
easting = int(row[2]) # Easting column
northing = int(row[3]) # Northing column
zone = int(row[6]) # ZoneID column
locations.append([zone_id, easting, northing, zone])
# Extract the relevant fields: 1K Grid, Easting, Northing, Zone
grid_name = row[0] # 1k Grid name
easting = int(row[1]) # Easting column
northing = int(row[2]) # Northing column
zone = int(row[3]) # ZoneID column
locations.append([grid_name, easting, northing, zone])
zones.add(zone)
logging.info(f"Count of 1km Grids: {len(locations)}")
logging.info(f"Count of Zones: {len(zones)}")
batch = BatchNimrod(Config)
timeseries = GenerateTimeseries(Config)
combiner = CombineTimeseries(Config, locations)
timeseries = GenerateTimeseries(Config, locations)
start = time.time()
logging.info("Starting to process DAT to ASC")
logging.info(
"Starting interleaved processing of DAT files and Timeseries generation"
)
batch.process_nimrod_files()
batch_checkpoint = time.time()
elapsed_time = batch_checkpoint - start
logging.info(f"DAT to ASC completed in {elapsed_time:.2f} seconds")
# Initialize results structure
results = {loc[0]: {"dates": [], "values": []} for loc in locations}
logging.info("Starting generating timeseries data for all locations.")
place_start = time.time()
timeseries.extract_data_for_all_locations(locations)
place_end = time.time()
place_create_time = place_end - place_start
elapsed_time = place_end - start
logging.info(f"Timeseries generation completed in {place_create_time:.2f} seconds")
logging.info(f"Total time so far {elapsed_time:.2f} seconds")
def process_pipeline(dat_file):
# 1. Process DAT to ASC
asc_file = batch._process_single_file(dat_file)
if not asc_file:
return None
logging.info("combining CSVs into groups")
combiner.combine_csv_files()
logging.info("CSVs combined!")
# 2. Extract data from ASC
file_results = timeseries.process_asc_file(asc_file, locations)
return file_results
# Get list of DAT files
dat_files = [
f for f in os.listdir(Path(Config.DAT_TOP_FOLDER)) if not f.startswith(".")
]
total_files = len(dat_files)
logging.info(f"Processing {total_files} files concurrently...")
with concurrent.futures.ThreadPoolExecutor() as executor:
future_to_file = {
executor.submit(process_pipeline, dat_file): dat_file
for dat_file in dat_files
}
completed_count = 0
try:
for future in concurrent.futures.as_completed(future_to_file):
file_results = future.result()
if file_results:
for res in file_results:
zone_id = res["zone_id"]
results[zone_id]["dates"].append(res["date"])
results[zone_id]["values"].append(res["value"])
completed_count += 1
if completed_count % 100 == 0:
elapsed_time = time.time() - start
files_per_minute = (completed_count / elapsed_time) * 60
remaining_files = total_files - completed_count
eta_minutes = remaining_files / files_per_minute
logging.info(f"""Processed {completed_count} out of {total_files} files.
Speed: {files_per_minute:.2f} files/min. ETA: {eta_minutes:.2f} minutes""")
except KeyboardInterrupt:
logging.warning("KeyboardInterrupt received. Cancelling pending tasks...")
executor.shutdown(wait=False, cancel_futures=True)
raise
elapsed_time = time.time() - start
logging.info(f"Interleaved processing completed in {elapsed_time:.2f} seconds")
logging.info("Writing CSV files...")
timeseries.write_results_to_csv(results, locations)
end = time.time()
elapsed_time = end - start
modules/__init__.py +1 -3
@@ -1,11 +1,9 @@
from .nimrod import Nimrod
from .batch_nimrod import BatchNimrod
from .generate_timeseries import GenerateTimeseries
from .combine_timeseries import CombineTimeseries
__all__ = [
"Nimrod",
"BatchNimrod",
"GenerateTimeseries",
"CombineTimeseries"
]
]
modules/batch_nimrod.py +26 -15
@@ -5,27 +5,26 @@ import logging
import concurrent.futures
class BatchNimrod:
def __init__(self, config) -> None:
self.config = config
def _process_single_file(self, in_file):
"""Process a single Nimrod DAT file.
Args:
in_file (str): Filename of the DAT file.
Returns:
str | None: Output ASC file name if successful, None otherwise.
"""
in_file_full = Path(self.config.DAT_TOP_FOLDER, in_file)
try:
# We need to open the file here, inside the thread
with open(in_file_full, "rb") as f:
image = Nimrod(f)
out_file_name = f"{image.get_validity_time()}.asc"
out_file_path = Path(self.config.ASC_TOP_FOLDER, out_file_name)
@@ -36,7 +35,7 @@ class BatchNimrod:
os.remove(in_file_full)
logging.debug(f"Successfully processed: {in_file_full}")
return True
return out_file_name
except Nimrod.HeaderReadError as e:
logging.error(f"Failed to read file {in_file_full}, is it corrupt?")
@@ -59,21 +58,33 @@ class BatchNimrod:
box for each area, and exports clipped raster data to OUT_TOP_FOLDER.
"""
# Read all file names in the folder
files_to_process = [f for f in os.listdir(Path(self.config.DAT_TOP_FOLDER)) if not f.startswith('.')]
files_to_process = [
f
for f in os.listdir(Path(self.config.DAT_TOP_FOLDER))
if not f.startswith(".")
]
total_files = len(files_to_process)
logging.info(f"Processing {total_files} files concurrently...")
with concurrent.futures.ThreadPoolExecutor() as executor:
# Submit all tasks
future_to_file = {
executor.submit(self._process_single_file, in_file): in_file
for in_file in files_to_process
}
completed_count = 0
for future in concurrent.futures.as_completed(future_to_file):
completed_count += 1
if completed_count % 10 == 0:
logging.info(f'processed {completed_count} out of {total_files} files')
completed_count = 0
try:
for future in concurrent.futures.as_completed(future_to_file):
completed_count += 1
if completed_count % 10 == 0:
logging.info(
f"processed {completed_count} out of {total_files} files"
)
except KeyboardInterrupt:
logging.warning(
"KeyboardInterrupt received. Cancelling pending tasks..."
)
executor.shutdown(wait=False, cancel_futures=True)
raise
modules/combine_timeseries.py -37
@@ -1,37 +0,0 @@
import polars as pd
import os
class CombineTimeseries:
def __init__(self, config, locations):
self.config = config
self.locations = locations
self.grouped_locations = {}
self.build_location_groups()
def build_location_groups(self):
for location in self.locations:
group = location[3] # zone number
if group not in self.grouped_locations:
self.grouped_locations[group] = []
self.grouped_locations[group].append(location)
def combine_csv_files(self):
for group, loc_list in self.grouped_locations.items():
combined_df = None
for loc in loc_list:
csv_to_load = f"./csv_files/{loc[0]}_timeseries_data.csv"
df = pd.read_csv(csv_to_load)
if combined_df is None:
combined_df = df
else:
combined_df = combined_df.join(df, on='datetime')
if self.config.delete_csv_after_combining:
os.remove(csv_to_load)
output_file = (
f"{self.config.COMBINED_FOLDER}/zone_{group}_timeseries_data.csv"
)
sorted_df = combined_df.sort('datetime')
sorted_df.write_csv(output_file)
modules/generate_timeseries.py +91 -48
@@ -5,12 +5,13 @@ import polars as pd
from datetime import datetime
import os
import concurrent.futures
import logging
class GenerateTimeseries:
def __init__(self, config):
def __init__(self, config, locations):
self.config = config
self.locations = locations
def _read_ascii_header(self, ascii_raster_file: str) -> list:
"""Reads header information from an ASCII DEM
@@ -63,19 +64,19 @@ class GenerateTimeseries:
return int(start_col), int(start_row), int(end_col), int(end_row)
def _process_single_file(self, file_name, locations):
def process_asc_file(self, file_name, locations):
"""Process a single ASC file and extract data for all locations.
Args:
file_name (str): Name of the ASC file.
locations (list): List of locations.
Returns:
list: A list of dictionaries containing extracted data for each location,
or None if processing fails.
Format: [{'zone_id': id, 'date': datetime, 'value': float}, ...]
"""
if not file_name.endswith('.asc'):
if not file_name.endswith(".asc"):
return None
file_path = Path(self.config.ASC_TOP_FOLDER, file_name)
@@ -83,7 +84,7 @@ class GenerateTimeseries:
try:
radar_header = self._read_ascii_header(str(file_path))
# Read grid once
cur_rawgrid = np.loadtxt(file_path, skiprows=6, dtype=float, delimiter=None)
@@ -96,14 +97,14 @@ class GenerateTimeseries:
# Extract data for each location
for location in locations:
zone_id = location[0]
# Calculate crop coordinates
start_col, start_row, end_col, end_row = self._calculate_crop_coords(
location, radar_header
)
cur_croppedrain = cur_rawgrid[start_row:end_row, start_col:end_col]
if cur_croppedrain.size > 2:
val = cur_croppedrain.flatten()[2] / 32
else:
@@ -111,17 +112,12 @@ class GenerateTimeseries:
# print(f"Warning: Crop too small for {zone_id} in {file_name}")
val = 0.0
results.append({
'zone_id': zone_id,
'date': parsed_date,
'value': val
})
results.append({"zone_id": zone_id, "date": parsed_date, "value": val})
if self.config.delete_asc_after_processing:
os.remove(file_path)
return results
except Exception as e:
print(f"Error processing file {file_name}: {e}")
@@ -134,7 +130,7 @@ class GenerateTimeseries:
locations (list): List of location data [zone_id, easting, northing, zone]
"""
# Initialize data structure to hold results: {zone_id: {'dates': [], 'values': []}}
results = {loc[0]: {'dates': [], 'values': []} for loc in locations}
results = {loc[0]: {"dates": [], "values": []} for loc in locations}
# Get list of ASC files
asc_files = sorted(os.listdir(Path(self.config.ASC_TOP_FOLDER)))
@@ -143,50 +139,97 @@ class GenerateTimeseries:
# Use ThreadPoolExecutor for concurrent processing
# Since we are using Python 3.14t (free-threaded), this should scale well even for CPU work
# mixed with I/O.
with concurrent.futures.ThreadPoolExecutor() as executor:
# Submit all tasks
future_to_file = {
executor.submit(self._process_single_file, file_name, locations): file_name
executor.submit(self.process_asc_file, file_name, locations): file_name
for file_name in asc_files
}
completed_count = 0
for future in concurrent.futures.as_completed(future_to_file):
file_results = future.result()
if file_results:
for res in file_results:
zone_id = res['zone_id']
results[zone_id]['dates'].append(res['date'])
results[zone_id]['values'].append(res['value'])
completed_count += 1
if completed_count % 100 == 0:
print(f"Processed {completed_count}/{total_files} files")
try:
for future in concurrent.futures.as_completed(future_to_file):
file_results = future.result()
if file_results:
for res in file_results:
zone_id = res["zone_id"]
results[zone_id]["dates"].append(res["date"])
results[zone_id]["values"].append(res["value"])
# Write CSVs for each location
print("Writing CSV files...")
for location in locations:
zone_id = location[0]
data = results[zone_id]
if not data['dates']:
print(f"No data found for {zone_id}")
continue
completed_count += 1
if completed_count % 100 == 0:
print(f"Processed {completed_count}/{total_files} files")
except KeyboardInterrupt:
print("KeyboardInterrupt received. Cancelling pending tasks...")
executor.shutdown(wait=False, cancel_futures=True)
raise
df = pd.DataFrame({"datetime": data['dates'], zone_id: data['values']})
def write_results_to_csv(self, results, locations):
"""Write extracted data to CSV files for each zone.
# Sort the dataframe into date order
Args:
results (dict): Aggregated results {zone_id: {'dates': [], 'values': []}}
locations (list): List of location data [zone_id, easting, northing, zone]
"""
# Group results by zone and collect all unique dates
zone_data = {}
for loc in locations:
zone_id = loc[0]
zone_name = loc[3]
if zone_name not in zone_data:
zone_data[zone_name] = {"dates": [], "values": {}}
zone_data[zone_name]["values"][zone_id] = results[zone_id]["values"]
zone_data[zone_name]["dates"].extend(results[zone_id]["dates"])
# Get unique sorted dates across all zones
for zone_name, data in zone_data.items():
data["dates"] = sorted(set(data["dates"]))
# Now write one CSV per zone with aligned timestamps
for zone_name, data in zone_data.items():
dates = data["dates"]
values_dict = data["values"]
# Create aligned DataFrame
df_dict = {"datetime": dates}
for grid_square, values in values_dict.items():
# Align values to the common dates: pair each value with the date it
# was extracted for, then look each common date up. Iterating the
# union of dates in lockstep with the values would misalign any grid
# that is missing a date; None marks a missing value.
value_by_date = dict(zip(results[grid_square]["dates"], values))
df_dict[grid_square] = [value_by_date.get(d) for d in dates]
df = pd.DataFrame(df_dict)
# Sort by datetime (defensive; the common date index is already sorted)
sorted_df = df.sort("datetime")
# Format datetime column
sorted_df = sorted_df.with_columns(
pd.col("datetime").dt.strftime("%Y-%m-%d %H:%M:%S")
)
output_path = Path(self.config.CSV_TOP_FOLDER) / f"{zone_id}_timeseries_data.csv"
sorted_df.write_csv(
output_path,
float_precision=4
)
output_path = (
Path(self.config.COMBINED_FOLDER) / f"{zone_name}_timeseries_data.csv"
)
print("All CSV files written.")
sorted_df.write_csv(output_path, float_precision=4)
logging.info("All CSV files written.")
modules/nimrod.py +7 -5
@@ -258,12 +258,12 @@ class Nimrod:
# Read data as big-endian 16-bit integers
# numpy.frombuffer is efficient for reading from bytes
data_bytes = infile.read(array_size * 2)
self.data = np.frombuffer(data_bytes, dtype='>h').astype(np.int16)
self.data = np.frombuffer(data_bytes, dtype=">h").astype(np.int16)
# Reshape to (nrows, ncols) for easier 2D manipulation
# Note: NIMROD data is row-major (C-style), starting from top-left
self.data = self.data.reshape((self.nrows, self.ncols))
except Exception:
infile.close()
raise Nimrod.PayloadReadError
@@ -392,7 +392,9 @@ class Nimrod:
# Use numpy slicing to extract the sub-array
# Note: y indices correspond to rows, x indices to columns
# Slicing is [start:end], so we need +1 for the end index
self.data = self.data[yMinPixelId : yMaxPixelId + 1, xMinPixelId : xMaxPixelId + 1]
self.data = self.data[
yMinPixelId : yMaxPixelId + 1, xMinPixelId : xMaxPixelId + 1
]
# Update object where necessary
self.x_right = self.x_left + xMaxPixelId * self.x_pixel_size
@@ -435,7 +437,7 @@ class Nimrod:
# Write raster data to output file using numpy.savetxt
# This is significantly faster than iterating in Python
np.savetxt(outfile, self.data, fmt='%d', delimiter=' ')
np.savetxt(outfile, self.data, fmt="%d", delimiter=" ")
outfile.close()
pyproject.toml +1 -1
@@ -1,6 +1,6 @@
[project]
name = "met-office"
version = "1.0.0"
version = "1.1.0"
description = "Convert .dat nimrod files to .asc files"
readme = "README.md"
requires-python = ">=3.14"
uv.lock (generated) +1 -1
@@ -4,7 +4,7 @@ requires-python = ">=3.14"
[[package]]
name = "met-office"
version = "1.0.0"
version = "1.1.0"
source = { virtual = "." }
dependencies = [
{ name = "numpy" },