5 Commits

Author SHA1 Message Date
Jake 9aaf8a5e88 Now deleting existing combined csv files after confirmation at start. 2025-12-15 10:13:11 +00:00
Jake-Pullen a43edb1148 Extraction streamlining (#3)
* feat: ✨ Added the extraction process into the main multi-threaded loop
Also added a warning when the app finds existing CSV files in the combined folder

* fix: 🐛 Fixed time calculations for ETA & Completion
2025-12-12 19:56:14 +00:00
Jake-Pullen ad6b31e644 Merge pull request #2 from Jake-Pullen/extraction
feat: ✨ Extraction now part of the main workflow
2025-12-11 08:52:55 +00:00
Jake d386317957 feat: Extraction now part of the main workflow 2025-12-11 08:47:29 +00:00
Jake 1c6418e044 docs: 📜 Added some more clarity on the readme 2025-12-10 08:32:03 +00:00
9 changed files with 309 additions and 79 deletions
.gitignore  +2
@@ -10,6 +10,8 @@ wheels/
.venv
dat_other/*
tar_files/*
gz_files/*
dat_files/*
asc_files/*
csv_files/*
README.md  +25 -12
@@ -1,6 +1,6 @@
# UK Met Office Rain Radar NIMROD Data Processor
This project provides tools for processing UK Met Office Rain Radar NIMROD image files. It allows extraction of raster data from NIMROD .dat format files and conversion to ESRI ASCII (.asc) format. It also allows the creation of timeseries data from the ASC files.
This project provides tools for processing UK Met Office Rain Radar NIMROD image files. It allows extraction of raster data from NIMROD .dat format files and conversion to ESRI ASCII (.asc) format. It also allows the creation of timeseries data from the ASC files, formatted for Infoworks ICM.
## Overview
@@ -9,15 +9,23 @@ The project consists of a main pipeline workflow that processes multiple modules
- `main.py`: Main pipeline orchestrator that calls on the modules as needed
- `batch_nimrod.py`: Module for batch processing multiple NIMROD files with configurable bounding boxes
- `generate_timeseries.py`: Module for extracting cropped rain data and creating rainfall timeseries
- `extract.py`: Module for extracting the dat files from the .gz.tar files that are downloaded from source
## Features
### main.py
- Orchestrates the entire workflow pipeline
- Processes DAT files to ASC format
- Generates timeseries data for specified locations
- Combines grouped CSV files into consolidated datasets
- **Startup Safety Check**: Scans the `COMBINED_FOLDER` at startup and warns the user if existing files are found, deleting them if the user chooses to continue.
- **Batch Processing**: Processes input tar files in configurable batches to manage resource usage.
- **End-to-End Processing**: Extracts GZ files, processes DAT/ASC, and appends to CSV in a single thread per file.
- **Concurrency**: Uses multi-threading to process individual GZ files within a batch concurrently (see the sketch after this list).
- **Cumulative Data**: Automatically appends new query results to the existing CSV files in `COMBINED_FOLDER` for each batch, ensuring no data is lost and columns are correctly aligned.
- **Dynamic ETA**: Provides a real-time estimate of completion time.
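A minimal sketch of the batch-plus-thread-pool pattern these bullets describe, assuming the folder names from `config.py`; `process_pipeline` stands in for the per-file work that `main.py` (further down this diff) actually performs:

```python
import concurrent.futures
import os
from pathlib import Path

BATCH_SIZE = 5  # tar files per batch, as in config.py
TAR_TOP_FOLDER = "./tar_files"
GZ_TOP_FOLDER = "./gz_files"

def process_pipeline(gz_path: Path) -> None:
    """Placeholder for the per-file work: GZ -> DAT -> ASC -> timeseries rows."""
    ...

tar_files = sorted(f for f in os.listdir(TAR_TOP_FOLDER) if f.endswith(".tar"))

for i in range(0, len(tar_files), BATCH_SIZE):
    batch = tar_files[i : i + BATCH_SIZE]
    # (extract this batch of tar files into GZ_TOP_FOLDER here)
    gz_files = [
        p
        for tar in batch
        for p in Path(GZ_TOP_FOLDER, tar.removesuffix(".tar")).rglob("*.gz")
    ]
    # One worker thread per GZ file within the batch
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [executor.submit(process_pipeline, gz) for gz in gz_files]
        concurrent.futures.wait(futures)
```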
### extract.py
- Expands each .gz.tar file into its 288 constituent .gz files (one day of data at 5-minute intervals)
- Decompresses all .gz files to .dat files ready for processing (core step sketched below)
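The second step is plain `gzip` decompression; a minimal sketch of the core operation (file names are placeholders):

```python
import gzip
import shutil

# Decompress one .dat.gz member into a .dat file ready for processing
with gzip.open("frame.dat.gz", "rb") as f_in, open("frame.dat", "wb") as f_out:
    shutil.copyfileobj(f_in, f_out)
```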
### batch_nimrod.py
@@ -44,24 +52,29 @@ It is recommended to use UV for environment and package handling.
1. Ensure all required packages are installed `uv sync`
1. Adjust the config.py file to match your needs.
1. Ensure your .dat files are in the DAT_TOP_FOLDER (as per config location)
1. Ensure your .gz.tar files are in the TAR_TOP_FOLDER (as per config location)
1. Ensure your zone csv files are in the ZONE_FOLDER (as per config location)
1. Run the main pipeline with `uv run main.py`. Note that you must set the environment variable `PYTHON_GIL=0` first
1. Find the output in the COMBINED_FOLDER (as per config location)
The main pipeline will:
1. Process DAT files to ASC format if needed
1. Uncompress the .gz.tar files ready for processing
1. Process DAT files to ASC format
1. Generate timeseries data for specified locations
1. Combine grouped CSV files into consolidated datasets
1. Combine grouped locations into consolidated datasets
## Configuration
The `config.py` file defines folder paths:
The `config.py` file defines folder paths and file deletion options:
- DAT_TOP_FOLDER: "./dat_files"
- ASC_TOP_FOLDER: "./asc_files"
- COMBINED_FOLDER: "./combined_files"
- TAR_TOP_FOLDER = "./tar_files"
- GZ_TOP_FOLDER = "./gz_files"
- DAT_TOP_FOLDER = "./dat_files"
- ASC_TOP_FOLDER = "./asc_files"
- COMBINED_FOLDER = "./combined_files"
- ZONE_FOLDER = "./zone_inputs"
- BATCH_SIZE = 5 (Number of tar files to process per batch)
Example of how the zone csv files should look:
config.py  +8 -1
@@ -1,8 +1,15 @@
class Config:
TAR_TOP_FOLDER = "./tar_files"
GZ_TOP_FOLDER = "./gz_files"
DAT_TOP_FOLDER = "./dat_files"
ASC_TOP_FOLDER = "./asc_files"
COMBINED_FOLDER = "./combined_files"
ZONE_FOLDER = "./zone_inputs"
delete_dat_after_processing = False
delete_tar_after_processing = False
delete_gz_after_processing = True
delete_dat_after_processing = True
delete_asc_after_processing = True
BATCH_SIZE = 5
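A note on how these flags are consumed downstream (the pattern appears in `extract.py` and `main.py` later in this diff); a minimal sketch with a hypothetical `cleanup` helper:

```python
import os
from config import Config

def cleanup(path: str, flag: bool) -> None:
    """Remove an intermediate file if its deletion flag is enabled."""
    if flag and os.path.exists(path):
        os.remove(path)

cleanup("./gz_files/frame.dat.gz", Config.delete_gz_after_processing)
```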
main.py  +146 -29
@@ -4,27 +4,67 @@ import os
import csv
import concurrent.futures
from pathlib import Path
import shutil
from config import Config
from modules import BatchNimrod, GenerateTimeseries
from modules import BatchNimrod, GenerateTimeseries, Extract
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
def process_pipeline(dat_file):
# 1. Process DAT to ASC
asc_file = batch._process_single_file(dat_file)
if not asc_file:
def process_pipeline(gz_file_path):
# 1. Extract GZ to DAT
gz_path = Path(gz_file_path)
# The DAT file name is derived from the GZ file name by stripping the .gz suffix
# (GZ files are typically named 'NAME.dat.gz')
dat_filename = gz_path.name.replace(".gz", "")
dat_path = Path(Config.DAT_TOP_FOLDER, dat_filename)
# Extract
try:
extraction.process_single_gz(gz_path, dat_path)
except Exception as e:
logging.error(f"Failed to extract {gz_path}: {e}")
return None
# 2. Extract data from ASC
if not dat_path.exists():
logging.error(f"DAT file not found after extraction: {dat_path}")
return None
# 2. Process DAT to ASC
# BatchNimrod._process_single_file expects just the filename, not full path
asc_file = batch._process_single_file(dat_filename)
if not asc_file:
# Clean up the DAT file if configured (BatchNimrod may or may not have removed it already)
if Config.delete_dat_after_processing and dat_path.exists():
try:
os.remove(dat_path)
except OSError:
pass
return None
# 3. Extract data from ASC
file_results = timeseries.process_asc_file(asc_file, locations)
return file_results
def initialise_folders():
folder_list = [
Config.ASC_TOP_FOLDER,
Config.COMBINED_FOLDER,
Config.GZ_TOP_FOLDER,
Config.DAT_TOP_FOLDER,
Config.TAR_TOP_FOLDER,
]
for path in folder_list:
Path(path).mkdir(exist_ok=True)
if __name__ == "__main__":
os.makedirs(Path(Config.ASC_TOP_FOLDER), exist_ok=True)
os.makedirs(Path(Config.COMBINED_FOLDER), exist_ok=True)
initialise_folders()
locations = []
zones = set()
@@ -44,29 +84,72 @@ if __name__ == "__main__":
logging.info(f"Count of 1km Grids: {len(locations)}")
logging.info(f"Count of Zones: {len(zones)}")
# Check for existing combined files
existing_combined = os.listdir(Config.COMBINED_FOLDER)
if existing_combined:
logging.warning("!" * 80)
logging.warning(
f"Found {len(existing_combined)} files in {Config.COMBINED_FOLDER}"
)
logging.warning(
"If you continue these WILL BE DELETED, Please make sure you have them saved."
)
logging.warning("!" * 80)
response = input("Continue? (Y/N): ").strip().lower()
if response != "y":
logging.info("Aborting...")
exit(0)
else:
shutil.rmtree(Path(Config.COMBINED_FOLDER)) # Delete everything including the directory
Path(Config.COMBINED_FOLDER).mkdir()
extraction = Extract(Config)
batch = BatchNimrod(Config)
timeseries = GenerateTimeseries(Config, locations)
start = time.time()
logging.info(
"Starting interleaved processing of DAT files and Timeseries generation"
"Starting interleaved processing of GZ files -> DAT -> ASC -> Timeseries"
)
# Initialize results structure
# Get list of all tar files
all_tar_files = [f for f in os.listdir(Config.TAR_TOP_FOLDER) if f.endswith(".tar")]
all_tar_files.sort()
total_tars = len(all_tar_files)
files_per_tar = 288
estimated_total_files = total_tars * files_per_tar
logging.info(f"Found {total_tars} tar files to process")
# Process in batches
for i in range(0, total_tars, Config.BATCH_SIZE):
batch_files = all_tar_files[i : i + Config.BATCH_SIZE]
logging.info(
f"Processing batch {i // Config.BATCH_SIZE + 1}: {len(batch_files)} tar files"
)
# Initialize results structure for this batch
results = {loc[0]: {"dates": [], "values": []} for loc in locations}
# Get list of DAT files
dat_files = [
f for f in os.listdir(Path(Config.DAT_TOP_FOLDER)) if not f.startswith(".")
]
total_files = len(dat_files)
# 1. Extract batch (TAR -> GZ)
logging.info("Extracting tar files for batch")
extraction.extract_tar_batch(batch_files)
logging.info(f"Processing {total_files} files concurrently...")
gz_files_to_process = []
for tar_file in batch_files:
extract_folder = Path(Config.GZ_TOP_FOLDER, tar_file.replace(".tar", ""))
if extract_folder.exists():
for root, _, files in os.walk(extract_folder):
for file in files:
if file.endswith(".gz"):
gz_files_to_process.append(Path(root, file))
total_files = len(gz_files_to_process)
logging.info(f"Found {total_files} GZ files to process concurrently...")
with concurrent.futures.ThreadPoolExecutor() as executor:
future_to_file = {
executor.submit(process_pipeline, dat_file): dat_file
for dat_file in dat_files
executor.submit(process_pipeline, gz_file): gz_file
for gz_file in gz_files_to_process
}
completed_count = 0
@@ -81,23 +164,57 @@ if __name__ == "__main__":
completed_count += 1
if completed_count % 100 == 0:
files_processed_previous = i * files_per_tar
files_processed_so_far = (
files_processed_previous + completed_count
)
elapsed_time = time.time() - start
files_per_minute = (completed_count / elapsed_time) * 60
remaining_files = total_files - completed_count
eta_minutes = remaining_files / (files_per_minute / 60) / 60
logging.info(f"""Processed {completed_count} out of {total_files} files.
Speed: {files_per_minute:.2f} files/min. ETA: {eta_minutes:.2f} minutes""")
rate_per_second = files_processed_so_far / elapsed_time
remaining_files = estimated_total_files - files_processed_so_far
if rate_per_second > 0:
eta_seconds = remaining_files / rate_per_second
if eta_seconds < 60:
eta_str = f"{int(eta_seconds)}s"
elif eta_seconds < 3600:
eta_str = f"{int(eta_seconds // 60)}m {int(eta_seconds % 60)}s"
else:
eta_str = f"{int(eta_seconds // 3600)}h {int((eta_seconds % 3600) // 60)}m"
else:
eta_str = "Unknown"
logging.info(f"""Progress: {files_processed_so_far}/{estimated_total_files} files ({files_processed_so_far / estimated_total_files * 100:.1f}%)
Speed: {rate_per_second * 60:.2f} files/min. ETA: {eta_str}""")
except KeyboardInterrupt:
logging.warning("KeyboardInterrupt received. Cancelling pending tasks...")
logging.warning(
"KeyboardInterrupt received. Cancelling pending tasks..."
)
executor.shutdown(wait=False, cancel_futures=True)
raise
elapsed_time = time.time() - start
logging.info(f"Interleaved processing completed in {elapsed_time:.2f} seconds")
logging.info("Appending batch results to CSV files...")
timeseries.append_results_to_csv(results, locations)
logging.info("Writing CSV files...")
timeseries.write_results_to_csv(results, locations)
# Cleanup GZ folders for this batch
# We loop through batch_files again to delete the folders we created
for tar_file in batch_files:
extract_folder = Path(Config.GZ_TOP_FOLDER, tar_file.replace(".tar", ""))
if extract_folder.exists():
try:
shutil.rmtree(extract_folder)
except OSError as e:
logging.warning(f"Failed to remove GZ folder {extract_folder}: {e}")
end = time.time()
elapsed_time = end - start
logging.info(f"All Complete total time {elapsed_time:.2f} seconds")
if elapsed_time < 60:
elapsed_time_str = f"{int(elapsed_time)}s"
elif elapsed_time < 3600:
elapsed_time_str = f"{int(elapsed_time // 60)}m {int(elapsed_time % 60)}s"
else:
elapsed_time_str = f"{int(elapsed_time // 3600)}h {int((elapsed_time % 3600) // 60)}m"
logging.info(f"All Complete total time {elapsed_time_str}")
modules/__init__.py  +2 -5
@@ -1,9 +1,6 @@
from .nimrod import Nimrod
from .batch_nimrod import BatchNimrod
from .generate_timeseries import GenerateTimeseries
from .extract import Extract
__all__ = [
"Nimrod",
"BatchNimrod",
"GenerateTimeseries",
]
__all__ = ["Nimrod", "BatchNimrod", "GenerateTimeseries", "Extract"]
modules/extract.py  +72
@@ -0,0 +1,72 @@
import tarfile
import gzip
import shutil
import os
from pathlib import Path
import concurrent.futures
class Extract:
# Expands .tar archives from TAR_TOP_FOLDER and decompresses their .gz members
def __init__(self, Config):
self.config = Config
def extract_tar_batch(self, tar_files):
for tar_file in tar_files:
tar_path = Path(self.config.TAR_TOP_FOLDER, tar_file)
# Create a folder for extracted tar contents
extract_folder = Path(
self.config.GZ_TOP_FOLDER, tar_file.replace(".tar", "")
)
Path(extract_folder).mkdir(exist_ok=True)
# Extract .tar file
with tarfile.open(tar_path, "r") as tar:
tar.extractall(path=extract_folder)
if self.config.delete_tar_after_processing:
os.remove(tar_path)
def process_single_gz(self, gz_path, dat_path):
try:
with gzip.open(gz_path, "rb") as f_in:
with open(dat_path, "wb") as f_out:
shutil.copyfileobj(f_in, f_out)
if self.config.delete_gz_after_processing:
os.remove(gz_path)
except Exception as e:
print(f"Error extracting {gz_path}: {e}")
def extract_gz_batch(self):
gz_tasks = []
for root, _, files in os.walk(self.config.GZ_TOP_FOLDER):
for file in files:
# only handle .dat.gz files
if not file.endswith(".dat.gz"):
continue
gz_path = Path(root, file)
dat_path = Path(self.config.DAT_TOP_FOLDER, file.replace(".gz", ""))
gz_tasks.append((gz_path, dat_path))
print(f"Extracting {len(gz_tasks)} gz files concurrently...")
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = [
executor.submit(self.process_single_gz, gz_path, dat_path)
for gz_path, dat_path in gz_tasks
]
concurrent.futures.wait(futures)
try:
shutil.rmtree(self.config.GZ_TOP_FOLDER)
# Recreate the folder for the next batch
Path(self.config.GZ_TOP_FOLDER).mkdir(exist_ok=True)
print("processing complete and GZ files deleted")
except Exception as e:
print(str(e))
print(
f"Processing complete but the GZ folder could not be deleted. Please delete it manually ({self.config.GZ_TOP_FOLDER})"
)
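For context, a minimal usage sketch of this class, mirroring how `main.py` drives it (the archive and frame names are placeholders):

```python
from pathlib import Path

from config import Config
from modules import Extract

extraction = Extract(Config)

# Expand one batch of tar archives into GZ_TOP_FOLDER
extraction.extract_tar_batch(["radar-day-01.tar"])  # placeholder name

# Then decompress an individual member into a DAT file
gz_path = Path(Config.GZ_TOP_FOLDER, "radar-day-01", "frame.dat.gz")
dat_path = Path(Config.DAT_TOP_FOLDER, "frame.dat")
extraction.process_single_gz(gz_path, dat_path)
```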
modules/generate_timeseries.py  +31 -9
@@ -164,8 +164,8 @@ class GenerateTimeseries:
executor.shutdown(wait=False, cancel_futures=True)
raise
def write_results_to_csv(self, results, locations):
"""Write extracted data to CSV files for each zone.
def append_results_to_csv(self, results, locations):
"""Append extracted data to CSV files for each zone.
Args:
results (dict): Aggregated results {zone_id: {'dates': [], 'values': []}}
@@ -217,19 +217,41 @@ class GenerateTimeseries:
df_dict[grid_square] = aligned_values
df = pd.DataFrame(df_dict)
new_df = pd.DataFrame(df_dict)
# Sort by datetime (already sorted)
sorted_df = df.sort("datetime")
# Format datetime column
sorted_df = sorted_df.with_columns(
# Format datetime column in new_df
new_df = new_df.with_columns(
pd.col("datetime").dt.strftime("%Y-%m-%d %H:%M:%S")
)
output_path = (
Path(self.config.COMBINED_FOLDER) / f"{zone_name}_timeseries_data.csv"
)
if output_path.exists():
# Load existing CSV
existing_df = pd.read_csv(output_path)
# Reorder new_df to match existing_df
new_df = new_df.select(existing_df.columns)
# Concatenate
combined_df = pd.concat([existing_df, new_df])
# Sort by datetime
combined_df = combined_df.sort("datetime")
# Write back
combined_df.write_csv(output_path, float_precision=4)
else:
# Write new CSV
# Sort columns to ensure deterministic order (datetime first)
cols = new_df.columns
cols.remove("datetime")
cols.sort()
sorted_cols = ["datetime"] + cols
new_df = new_df.select(sorted_cols)
# Sort by datetime (already sorted but good practice)
sorted_df = new_df.sort("datetime")
sorted_df.write_csv(output_path, float_precision=4)
logging.info("All CSV files written.")
logging.info("All CSV files updated.")
pyproject.toml  +2 -2
@@ -1,7 +1,7 @@
[project]
name = "met-office"
version = "1.1.1"
description = "Convert .dat nimrod files to .asc files"
version = "1.3.2"
description = "Convert nimrod files to .csv timeseries"
readme = "README.md"
requires-python = ">=3.14"
dependencies = [
uv.lock  +1 -1 (generated)
@@ -4,7 +4,7 @@ requires-python = ">=3.14"
[[package]]
name = "met-office"
version = "1.1.1"
version = "1.3.2"
source = { virtual = "." }
dependencies = [
{ name = "numpy" },