5 Commits

Author SHA1 Message Date
Jake 5da185a826 chore: 🧹 Ruff clean up 2025-12-24 15:32:41 +00:00
Jake 1d21ab5f36 fix: 🐞 Fixed an ordering issue when saving to CSV 2025-12-24 15:31:36 +00:00
Jake 0e682aca35 docs: 📜 ReadMe Clarity tweaks 2025-12-17 09:54:46 +00:00
Jake-Pullen 354f4c7fc6 Now deleting existing combined csv files after confirmation at start. (#4) 2025-12-15 10:17:27 +00:00
Jake-Pullen a43edb1148 Extraction streamlining (#3)
* feat:  added the extraction process into the main multi threaded loop
Also added a warning when the app finds existing CSV files in the combined folder

* fix: 🐛 Fixed time calculations for ETA & Completion
2025-12-12 19:56:14 +00:00
7 changed files with 241 additions and 117 deletions
README.md  +9 -6
@@ -15,11 +15,13 @@ The project consists of a main pipeline workflow that processes multiple modules
 ### main.py
-- Orchestrates the entire workflow pipeline
-- Uncompress the packed .gz.tar files to DAT files
-- Processes DAT files to ASC format
-- Generates timeseries data for specified locations
-- Combines grouped CSV files into consolidated datasets formatted for Infoworks ICM
+- **Startup Safety Check**: Scans the `COMBINED_FOLDER` at startup and warns the user if existing files are found, deleting them if the user chooses to continue.
+- **Batch Processing**: Processes input tar files in configurable batches to manage resource usage.
+- **Tidy by Default**: Default settings will delete all mid-step files and keep only the original tar files. This can be changed in config.py.
+- **End-to-End Processing**: Extracts GZ files, processes DAT/ASC, and appends to CSV in a single thread per file.
+- **Concurrency**: Uses multi-threading to process individual GZ files within a batch concurrently.
+- **Cumulative Data**: Automatically appends new query results to the existing CSV files in `COMBINED_FOLDER` for each batch, ensuring no data is lost and columns are correctly aligned.
+- **Dynamic ETA**: Provides a real-time estimate of completion time.
 ### extract.py
@@ -30,7 +32,7 @@ The project consists of a main pipeline workflow that processes multiple modules
 - Process multiple NIMROD dat files
 - Automatically extract datetime from file data
-- Export clipped raster data to ASC format
+- Export raster data to ASC format
 ### generate_timeseries.py
@@ -73,6 +75,7 @@ The `config.py` file defines folder paths and file deletion options:
 - ASC_TOP_FOLDER = "./asc_files"
 - COMBINED_FOLDER = "./combined_files"
 - ZONE_FOLDER = "./zone_inputs"
+- BATCH_SIZE = 5 (Number of tar files to process per batch)
 Example of how the zone csv files should look:
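The batch-processing behaviour described above slices the sorted tar list into BATCH_SIZE-sized chunks and fully processes each chunk (extract, convert, append to the combined CSVs, clean up) before moving on, which bounds disk usage. A minimal sketch of that loop shape, with hypothetical file names:

```python
# Minimal sketch of the batching described in the README; file names are hypothetical.
BATCH_SIZE = 5  # mirrors Config.BATCH_SIZE
tar_files = sorted(f"radar_{i:03d}.tar" for i in range(12))  # hypothetical inputs

for i in range(0, len(tar_files), BATCH_SIZE):
    batch = tar_files[i : i + BATCH_SIZE]
    # extract -> process -> append to combined CSVs -> delete intermediates
    print(f"batch {i // BATCH_SIZE + 1}: {len(batch)} tar files")
```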
config.py  +2
@@ -11,3 +11,5 @@ class Config:
     delete_gz_after_processing = True
     delete_dat_after_processing = True
     delete_asc_after_processing = True
+    BATCH_SIZE = 5
main.py  +132 -28
@@ -4,6 +4,7 @@ import os
 import csv
 import concurrent.futures
 from pathlib import Path
+import shutil
 from config import Config
 from modules import BatchNimrod, GenerateTimeseries, Extract
@@ -13,14 +14,40 @@ logging.basicConfig(
 )
-def process_pipeline(dat_file):
-    # 1. Process DAT to ASC
-    asc_file = batch._process_single_file(dat_file)
-    if not asc_file:
+def process_pipeline(gz_file_path):
+    # 1. Extract GZ to DAT
+    gz_path = Path(gz_file_path)
+    # The dat file name is derived from the gz file name (removing .gz or .dat.gz);
+    # gz files are often named like 'NAME.dat.gz'.
+    dat_filename = gz_path.name.replace(".gz", "")
+    dat_path = Path(Config.DAT_TOP_FOLDER, dat_filename)
+    # Extract
+    try:
+        extraction.process_single_gz(gz_path, dat_path)
+    except Exception as e:
+        logging.error(f"Failed to extract {gz_path}: {e}")
         return None
-    # 2. Extract data from ASC
+    if not dat_path.exists():
+        logging.error(f"DAT file not found after extraction: {dat_path}")
+        return None
+    # 2. Process DAT to ASC
+    # BatchNimrod._process_single_file expects just the filename, not the full path
+    asc_file = batch._process_single_file(dat_filename)
+    if not asc_file:
+        # Clean up the failed DAT file if needed (BatchNimrod may or may not have done it)
+        if Config.delete_dat_after_processing and dat_path.exists():
+            try:
+                os.remove(dat_path)
+            except OSError:
+                pass
+        return None
+    # 3. Extract data from ASC
     file_results = timeseries.process_asc_file(asc_file, locations)
     return file_results
@@ -57,33 +84,74 @@ if __name__ == "__main__":
logging.info(f"Count of 1km Grids: {len(locations)}") logging.info(f"Count of 1km Grids: {len(locations)}")
logging.info(f"Count of Zones: {len(zones)}") logging.info(f"Count of Zones: {len(zones)}")
# Check for existing combined files
existing_combined = os.listdir(Config.COMBINED_FOLDER)
if existing_combined:
logging.warning("!" * 80)
logging.warning(
f"Found {len(existing_combined)} files in {Config.COMBINED_FOLDER}"
)
logging.warning(
"If you continue these WILL BE DELETED, Please make sure you have them saved."
)
logging.warning("!" * 80)
response = input("Continue? (Y/N): ").strip().lower()
if response != "y":
logging.info("Aborting...")
exit(0)
else:
shutil.rmtree(
Path(Config.COMBINED_FOLDER)
) # Delete everything including the directory
Path(Config.COMBINED_FOLDER).mkdir()
extraction = Extract(Config) extraction = Extract(Config)
batch = BatchNimrod(Config) batch = BatchNimrod(Config)
timeseries = GenerateTimeseries(Config, locations) timeseries = GenerateTimeseries(Config, locations)
start = time.time() start = time.time()
logging.info( logging.info(
"Starting interleaved processing of DAT files and Timeseries generation" "Starting interleaved processing of GZ files -> DAT -> ASC -> Timeseries"
) )
# Initialize results structure # Get list of all tar files
all_tar_files = [f for f in os.listdir(Config.TAR_TOP_FOLDER) if f.endswith(".tar")]
all_tar_files.sort()
total_tars = len(all_tar_files)
files_per_tar = 288
estimated_total_files = total_tars * files_per_tar
logging.info(f"Found {total_tars} tar files to process")
# Process in batches
for i in range(0, total_tars, Config.BATCH_SIZE):
batch_files = all_tar_files[i : i + Config.BATCH_SIZE]
logging.info(
f"Processing batch {i // Config.BATCH_SIZE + 1}: {len(batch_files)} tar files"
)
# Initialize results structure for this batch
results = {loc[0]: {"dates": [], "values": []} for loc in locations} results = {loc[0]: {"dates": [], "values": []} for loc in locations}
logging.info("Extracting tar and gz files") # 1. Extract batch (TAR -> GZ)
extraction.run_extraction() logging.info("Extracting tar files for batch")
extraction.extract_tar_batch(batch_files)
# Get list of DAT files gz_files_to_process = []
dat_files = [ for tar_file in batch_files:
f for f in os.listdir(Path(Config.DAT_TOP_FOLDER)) if not f.startswith(".") extract_folder = Path(Config.GZ_TOP_FOLDER, tar_file.replace(".tar", ""))
] if extract_folder.exists():
total_files = len(dat_files) for root, _, files in os.walk(extract_folder):
for file in files:
if file.endswith(".gz"):
gz_files_to_process.append(Path(root, file))
logging.info(f"Processing {total_files} files concurrently...") total_files = len(gz_files_to_process)
logging.info(f"Found {total_files} GZ files to process concurrently...")
with concurrent.futures.ThreadPoolExecutor() as executor: with concurrent.futures.ThreadPoolExecutor() as executor:
future_to_file = { future_to_file = {
executor.submit(process_pipeline, dat_file): dat_file executor.submit(process_pipeline, gz_file): gz_file
for dat_file in dat_files for gz_file in gz_files_to_process
} }
completed_count = 0 completed_count = 0
@@ -98,23 +166,59 @@ if __name__ == "__main__":
                 completed_count += 1
                 if completed_count % 100 == 0:
+                    files_processed_previous = i * files_per_tar
+                    files_processed_so_far = (
+                        files_processed_previous + completed_count
+                    )
                     elapsed_time = time.time() - start
-                    files_per_minute = (completed_count / elapsed_time) * 60
-                    remaining_files = total_files - completed_count
-                    eta_minutes = remaining_files / (files_per_minute / 60) / 60
-                    logging.info(f"""Processed {completed_count} out of {total_files} files.
-Speed: {files_per_minute:.2f} files/min. ETA: {eta_minutes:.2f} minutes""")
+                    rate_per_second = files_processed_so_far / elapsed_time
+                    remaining_files = estimated_total_files - files_processed_so_far
+                    if rate_per_second > 0:
+                        eta_seconds = remaining_files / rate_per_second
+                        if eta_seconds < 60:
+                            eta_str = f"{int(eta_seconds)}s"
+                        elif eta_seconds < 3600:
+                            eta_str = f"{int(eta_seconds // 60)}m {int(eta_seconds % 60)}s"
+                        else:
+                            eta_str = f"{int(eta_seconds // 3600)}h {int((eta_seconds % 3600) // 60)}m"
+                    else:
+                        eta_str = "Unknown"
+                    logging.info(f"""Progress: {files_processed_so_far}/{estimated_total_files} files ({files_processed_so_far / estimated_total_files * 100:.1f}%)
+Speed: {rate_per_second * 60:.2f} files/min. ETA: {eta_str}""")
         except KeyboardInterrupt:
-            logging.warning("KeyboardInterrupt received. Cancelling pending tasks...")
+            logging.warning(
+                "KeyboardInterrupt received. Cancelling pending tasks..."
+            )
             executor.shutdown(wait=False, cancel_futures=True)
             raise
-    elapsed_time = time.time() - start
-    logging.info(f"Interleaved processing completed in {elapsed_time:.2f} seconds")
-    logging.info("Writing CSV files...")
-    timeseries.write_results_to_csv(results, locations)
+        logging.info("Appending batch results to CSV files...")
+        timeseries.append_results_to_csv(results, locations)
+        # Cleanup GZ folders for this batch
+        # We loop through batch_files again to delete the folders we created
+        for tar_file in batch_files:
+            extract_folder = Path(Config.GZ_TOP_FOLDER, tar_file.replace(".tar", ""))
+            if extract_folder.exists():
+                try:
+                    shutil.rmtree(extract_folder)
+                except OSError as e:
+                    logging.warning(f"Failed to remove GZ folder {extract_folder}: {e}")
     end = time.time()
     elapsed_time = end - start
-    logging.info(f"All Complete total time {elapsed_time:.2f} seconds")
+    if elapsed_time < 60:
+        elapsed_time_str = f"{int(elapsed_time)}s"
+    elif elapsed_time < 3600:
+        elapsed_time_str = f"{int(elapsed_time // 60)}m {int(elapsed_time % 60)}s"
+    else:
+        elapsed_time_str = (
+            f"{int(elapsed_time // 3600)}h {int((elapsed_time % 3600) // 60)}m"
+        )
+    logging.info(f"All Complete total time {elapsed_time_str}")
extract.py  +30 -20
@@ -3,6 +3,7 @@ import gzip
 import shutil
 import os
 from pathlib import Path
+import concurrent.futures
 class Extract:
@@ -10,12 +11,8 @@ class Extract:
     def __init__(self, Config):
         self.config = Config
-    def _extract_tar(self):
-        for tar_file in os.listdir(self.config.TAR_TOP_FOLDER):
-            # only handle .tar files
-            if not tar_file.endswith(".tar"):
-                pass
+    def extract_tar_batch(self, tar_files):
+        for tar_file in tar_files:
             tar_path = Path(self.config.TAR_TOP_FOLDER, tar_file)
             # Create a folder for extracted tar contents
@@ -31,32 +28,45 @@ class Extract:
             if self.config.delete_tar_after_processing:
                 os.remove(tar_path)
-    def _extract_gz(self):
-        for root, _, files in os.walk(self.config.GZ_TOP_FOLDER):
-            for file in files:
-                # only handle .gz files
-                if not file.endswith(".dat.gz"):
-                    pass  # adjust if extension differs
-                gz_path = Path(root, file)
-                dat_path = Path(self.config.DAT_TOP_FOLDER, file.replace(".gz", ""))
-                # Unzip .gz file
+    def process_single_gz(self, gz_path, dat_path):
+        try:
             with gzip.open(gz_path, "rb") as f_in:
                 with open(dat_path, "wb") as f_out:
                     shutil.copyfileobj(f_in, f_out)
             if self.config.delete_gz_after_processing:
                 os.remove(gz_path)
+        except Exception as e:
+            print(f"Error extracting {gz_path}: {e}")
+    def extract_gz_batch(self):
+        gz_tasks = []
+        for root, _, files in os.walk(self.config.GZ_TOP_FOLDER):
+            for file in files:
+                # only handle .gz files
+                if not file.endswith(".dat.gz"):
+                    continue
+                gz_path = Path(root, file)
+                dat_path = Path(self.config.DAT_TOP_FOLDER, file.replace(".gz", ""))
+                gz_tasks.append((gz_path, dat_path))
+        print(f"Extracting {len(gz_tasks)} gz files concurrently...")
+        with concurrent.futures.ThreadPoolExecutor() as executor:
+            futures = [
+                executor.submit(self.process_single_gz, gz_path, dat_path)
+                for gz_path, dat_path in gz_tasks
+            ]
+            concurrent.futures.wait(futures)
         try:
             shutil.rmtree(self.config.GZ_TOP_FOLDER)
+            # Recreate the folder for the next batch
+            Path(self.config.GZ_TOP_FOLDER).mkdir(exist_ok=True)
             print("processing complete and GZ files deleted")
         except Exception as e:
             print(str(e))
             print(
                 f"processing complete but GZ folder delete failed. Please delete manually ({self.config.GZ_TOP_FOLDER})"
             )
-    def run_extraction(self):
-        self._extract_tar()
-        self._extract_gz()
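The body of extract_tar_batch between the two hunks is not shown in this diff. Based on the surrounding code (a per-tar extract folder under GZ_TOP_FOLDER, optional tar deletion afterwards), it presumably resembles the following sketch using the standard tarfile module; this is a hypothetical reconstruction, not the repository's code:

```python
import tarfile
from pathlib import Path

# Hypothetical sketch of the elided middle of extract_tar_batch;
# the real implementation may differ.
def _extract_one_tar(config, tar_path: Path) -> None:
    extract_folder = Path(config.GZ_TOP_FOLDER, tar_path.stem)
    extract_folder.mkdir(parents=True, exist_ok=True)
    with tarfile.open(tar_path, "r") as tar:
        tar.extractall(extract_folder)  # unpack the .dat.gz members
```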
generate_timeseries.py  +47 -42
@@ -56,11 +56,11 @@ class GenerateTimeseries:
         xpp = ncols_basin * cellres_basin
         ypp = nrows_basin * cellres_basin
-        start_col = np.floor(xp / cellres_radar)
-        end_col = np.ceil((xpp + xp) / cellres_radar)
-        start_row = np.floor(nrows_radar - ((yp + ypp) / cellres_radar))
-        end_row = np.ceil(nrows_radar - (yp / cellres_radar))
+        start_col = np.floor(xp / cellres_radar) - 1
+        end_col = np.ceil((xpp + xp) / cellres_radar) - 1
+        start_row = np.floor(nrows_radar - ((yp + ypp) / cellres_radar)) + 1
+        end_row = np.ceil(nrows_radar - (yp / cellres_radar)) + 1
         return int(start_col), int(start_row), int(end_col), int(end_row)
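Each bound shifts by exactly one radar cell: columns move down by one and rows move up by one. A quick worked example with illustrative numbers (not taken from the repository) shows the before/after values:

```python
import numpy as np

# Illustrative numbers only; not from the repository.
cellres_radar, nrows_radar = 1000, 2175
xp, yp, xpp, ypp = 4500, 3200, 2000, 2000

start_col = np.floor(xp / cellres_radar) - 1                          # was 4, now 3
end_col = np.ceil((xpp + xp) / cellres_radar) - 1                     # was 7, now 6
start_row = np.floor(nrows_radar - ((yp + ypp) / cellres_radar)) + 1  # was 2169, now 2170
end_row = np.ceil(nrows_radar - (yp / cellres_radar)) + 1             # was 2172, now 2173
```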
@@ -164,8 +164,8 @@ class GenerateTimeseries:
             executor.shutdown(wait=False, cancel_futures=True)
             raise
-    def write_results_to_csv(self, results, locations):
-        """Write extracted data to CSV files for each zone.
+    def append_results_to_csv(self, results, locations):
+        """Append extracted data to CSV files for each zone.
         Args:
             results (dict): Aggregated results {zone_id: {'dates': [], 'values': []}}
@@ -178,58 +178,63 @@ class GenerateTimeseries:
             zone_name = loc[3]
             if zone_name not in zone_data:
-                zone_data[zone_name] = {"dates": [], "values": {}}
-            zone_data[zone_name]["values"][zone_id] = results[zone_id]["values"]
-            zone_data[zone_name]["dates"].extend(results[zone_id]["dates"])
-        # Get unique sorted dates across all zones
-        for zone_name, data in zone_data.items():
-            data["dates"] = sorted(set(data["dates"]))
+                zone_data[zone_name] = {"dates": set(), "values": {}}
+            # Create date -> value map for this grid square
+            raw_dates = results[zone_id]["dates"]
+            raw_values = results[zone_id]["values"]
+            date_value_map = dict(zip(raw_dates, raw_values))
+            zone_data[zone_name]["values"][zone_id] = date_value_map
+            zone_data[zone_name]["dates"].update(raw_dates)
         # Now write one CSV per zone with aligned timestamps
         for zone_name, data in zone_data.items():
-            dates = data["dates"]
+            sorted_dates = sorted(data["dates"])
             values_dict = data["values"]
             # Create aligned DataFrame
-            df_dict = {"datetime": dates}
-            for grid_square, values in values_dict.items():
-                # Align values to the common dates
-                aligned_values = []
-                value_iter = iter(values)
-                date_iter = iter(dates)
-                current_date = next(date_iter, None)
-                current_value = next(value_iter, None)
-                for expected_date in dates:
-                    if current_date == expected_date:
-                        aligned_values.append(current_value)
-                        try:
-                            current_date = next(date_iter)
-                            current_value = next(value_iter)
-                        except StopIteration:
-                            current_date = None
-                            current_value = None
-                    else:
-                        aligned_values.append(None)  # Missing value
+            df_dict = {"datetime": sorted_dates}
+            for grid_square, dv_map in values_dict.items():
+                # Align values to the common sorted dates using the map
+                aligned_values = [dv_map.get(d) for d in sorted_dates]
                 df_dict[grid_square] = aligned_values
-            df = pd.DataFrame(df_dict)
-            # Sort by datetime (already sorted)
-            sorted_df = df.sort("datetime")
-            # Format datetime column
-            sorted_df = sorted_df.with_columns(
+            new_df = pd.DataFrame(df_dict)
+            # Format datetime column in new_df
+            new_df = new_df.with_columns(
                 pd.col("datetime").dt.strftime("%Y-%m-%d %H:%M:%S")
             )
             output_path = (
                 Path(self.config.COMBINED_FOLDER) / f"{zone_name}_timeseries_data.csv"
             )
+            if output_path.exists():
+                # Load existing CSV
+                existing_df = pd.read_csv(output_path)
+                # Reorder new_df to match existing_df
+                new_df = new_df.select(existing_df.columns)
+                # Concatenate
+                combined_df = pd.concat([existing_df, new_df])
+                # Sort by datetime
+                combined_df = combined_df.sort("datetime")
+                # Write back
+                combined_df.write_csv(output_path, float_precision=4)
+            else:
+                # Write new CSV
+                # Sort columns to ensure deterministic order (datetime first)
+                cols = new_df.columns
+                cols.remove("datetime")
+                cols.sort()
+                sorted_cols = ["datetime"] + cols
+                new_df = new_df.select(sorted_cols)
+                # Sort by datetime (already sorted but good practice)
+                sorted_df = new_df.sort("datetime")
                 sorted_df.write_csv(output_path, float_precision=4)
-        logging.info("All CSV files written.")
+        logging.info("All CSV files updated.")
pyproject.toml  +2 -2
@@ -1,7 +1,7 @@
 [project]
 name = "met-office"
-version = "1.2.0"
-description = "Convert .dat nimrod files to .asc files"
+version = "1.3.2"
+description = "Convert nimrod files to .csv timeseries"
 readme = "README.md"
 requires-python = ">=3.14"
 dependencies = [
uv.lock (generated)  +1 -1
@@ -4,7 +4,7 @@ requires-python = ">=3.14"
 [[package]]
 name = "met-office"
-version = "1.2.0"
+version = "1.3.2"
 source = { virtual = "." }
 dependencies = [
     { name = "numpy" },