2 Commits

Author SHA1 Message Date
Jake 9aaf8a5e88 Now deleting existing combined csv files after confirmation at start. 2025-12-15 10:13:11 +00:00
Jake-Pullen a43edb1148 Extraction streamlining (#3)
* feat: Added the extraction process into the main multi-threaded loop.
Also added a warning when the app finds existing CSV files in the combined folder.

* fix: 🐛 Fixed time calculations for ETA & Completion
2025-12-12 19:56:14 +00:00
7 changed files with 219 additions and 83 deletions
+7 -5
@@ -15,11 +15,12 @@ The project consists of a main pipeline workflow that processes multiple modules
### main.py
- Orchestrates the entire workflow pipeline
- Uncompresses the packed .tar archives (containing .dat.gz files) into DAT files
- Processes DAT files to ASC format
- Generates timeseries data for specified locations
- Combines grouped CSV files into consolidated datasets formatted for Infoworks ICM
- **Startup Safety Check**: Scans the `COMBINED_FOLDER` at startup and warns the user if existing files are found; if the user chooses to continue, those files are deleted.
- **Batch Processing**: Processes input tar files in configurable batches to manage resource usage.
- **End-to-End Processing**: Extracts GZ files, processes DAT/ASC, and appends to CSV in a single thread per file.
- **Concurrency**: Uses multi-threading to process individual GZ files within a batch concurrently.
- **Cumulative Data**: Automatically appends new query results to the existing CSV files in `COMBINED_FOLDER` for each batch, ensuring no data is lost and columns are correctly aligned.
- **Dynamic ETA**: Provides a real-time estimate of completion time.
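
As a quick orientation, the batch flow these bullets describe looks roughly like the sketch below. Names such as `process_pipeline`, `extraction`, `timeseries`, `all_tar_files` and `gz_files_to_process` come from the `main.py` diff further down; the result-collection step is condensed, so treat this as an outline rather than the exact code:

```python
import concurrent.futures
from config import Config

# Sketch only: extraction, timeseries, process_pipeline, all_tar_files,
# gz_files_to_process, results and locations are set up as in main.py.
for i in range(0, len(all_tar_files), Config.BATCH_SIZE):
    batch_files = all_tar_files[i : i + Config.BATCH_SIZE]
    extraction.extract_tar_batch(batch_files)                 # TAR -> per-tar folder of .gz files
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [executor.submit(process_pipeline, gz)      # GZ -> DAT -> ASC -> timeseries rows
                   for gz in gz_files_to_process]
        for future in concurrent.futures.as_completed(futures):
            for res in future.result() or []:
                results[res["zone_id"]]["dates"].append(res["date"])
                results[res["zone_id"]]["values"].append(res["value"])
    timeseries.append_results_to_csv(results, locations)      # cumulative per-batch append
```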
### extract.py
@@ -73,6 +74,7 @@ The `config.py` file defines folder paths and file deletion options:
- ASC_TOP_FOLDER = "./asc_files"
- COMBINED_FOLDER = "./combined_files"
- ZONE_FOLDER = "./zone_inputs"
- BATCH_SIZE = 5 (Number of tar files to process per batch)
Example of how the zone csv files should look:
+2
@@ -11,3 +11,5 @@ class Config:
delete_gz_after_processing = True
delete_dat_after_processing = True
delete_asc_after_processing = True
BATCH_SIZE = 5
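
Combining the README list above with this diff, the `Config` class presumably ends up looking something like the sketch below. The TAR/GZ/DAT folder paths are not shown anywhere in this changeset, so those values are placeholders:

```python
class Config:
    # Folder layout (TAR/GZ/DAT paths are assumptions; the rest match the README)
    TAR_TOP_FOLDER = "./tar_files"        # placeholder
    GZ_TOP_FOLDER = "./gz_files"          # placeholder
    DAT_TOP_FOLDER = "./dat_files"        # placeholder
    ASC_TOP_FOLDER = "./asc_files"
    COMBINED_FOLDER = "./combined_files"
    ZONE_FOLDER = "./zone_inputs"

    # Cleanup flags and batching
    delete_tar_after_processing = True
    delete_gz_after_processing = True
    delete_dat_after_processing = True
    delete_asc_after_processing = True
    BATCH_SIZE = 5  # number of tar files processed per batch
```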
+147 -47
@@ -4,6 +4,7 @@ import os
import csv
import concurrent.futures
from pathlib import Path
import shutil
from config import Config
from modules import BatchNimrod, GenerateTimeseries, Extract
@@ -13,14 +14,40 @@ logging.basicConfig(
)
def process_pipeline(dat_file):
# 1. Process DAT to ASC
asc_file = batch._process_single_file(dat_file)
if not asc_file:
def process_pipeline(gz_file_path):
# 1. Extract GZ to DAT
gz_path = Path(gz_file_path)
# The dat file name is derived from the gz file name (removing .gz or .dat.gz)
# gz files are named like 'NAME.dat.gz' often.
dat_filename = gz_path.name.replace(".gz", "")
dat_path = Path(Config.DAT_TOP_FOLDER, dat_filename)
# Extract
try:
extraction.process_single_gz(gz_path, dat_path)
except Exception as e:
logging.error(f"Failed to extract {gz_path}: {e}")
return None
# 2. Extract data from ASC
if not dat_path.exists():
logging.error(f"DAT file not found after extraction: {dat_path}")
return None
# 2. Process DAT to ASC
# BatchNimrod._process_single_file expects just the filename, not full path
asc_file = batch._process_single_file(dat_filename)
if not asc_file:
# Cleanup failed DAT file if needed (BatchNimrod might have done it or not)
if Config.delete_dat_after_processing and dat_path.exists():
try:
os.remove(dat_path)
except OSError:
pass
return None
# 3. Extract data from ASC
file_results = timeseries.process_asc_file(asc_file, locations)
return file_results
@@ -57,64 +84,137 @@ if __name__ == "__main__":
logging.info(f"Count of 1km Grids: {len(locations)}")
logging.info(f"Count of Zones: {len(zones)}")
# Check for existing combined files
existing_combined = os.listdir(Config.COMBINED_FOLDER)
if existing_combined:
logging.warning("!" * 80)
logging.warning(
f"Found {len(existing_combined)} files in {Config.COMBINED_FOLDER}"
)
logging.warning(
"If you continue these WILL BE DELETED, Please make sure you have them saved."
)
logging.warning("!" * 80)
response = input("Continue? (Y/N): ").strip().lower()
if response != "y":
logging.info("Aborting...")
exit(0)
else:
shutil.rmtree(Path(Config.COMBINED_FOLDER)) # Delete everything including the directory
Path(Config.COMBINED_FOLDER).mkdir()
extraction = Extract(Config)
batch = BatchNimrod(Config)
timeseries = GenerateTimeseries(Config, locations)
start = time.time()
logging.info(
"Starting interleaved processing of DAT files and Timeseries generation"
"Starting interleaved processing of GZ files -> DAT -> ASC -> Timeseries"
)
# Initialize results structure
results = {loc[0]: {"dates": [], "values": []} for loc in locations}
# Get list of all tar files
all_tar_files = [f for f in os.listdir(Config.TAR_TOP_FOLDER) if f.endswith(".tar")]
all_tar_files.sort()
total_tars = len(all_tar_files)
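# assumption: 288 = one day of 5-minute NIMROD frames per tar archive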
files_per_tar = 288
estimated_total_files = total_tars * files_per_tar
logging.info(f"Found {total_tars} tar files to process")
logging.info("Extracting tar and gz files")
extraction.run_extraction()
# Process in batches
for i in range(0, total_tars, Config.BATCH_SIZE):
batch_files = all_tar_files[i : i + Config.BATCH_SIZE]
logging.info(
f"Processing batch {i // Config.BATCH_SIZE + 1}: {len(batch_files)} tar files"
)
# Get list of DAT files
dat_files = [
f for f in os.listdir(Path(Config.DAT_TOP_FOLDER)) if not f.startswith(".")
]
total_files = len(dat_files)
# Initialize results structure for this batch
results = {loc[0]: {"dates": [], "values": []} for loc in locations}
logging.info(f"Processing {total_files} files concurrently...")
# 1. Extract batch (TAR -> GZ)
logging.info("Extracting tar files for batch")
extraction.extract_tar_batch(batch_files)
with concurrent.futures.ThreadPoolExecutor() as executor:
future_to_file = {
executor.submit(process_pipeline, dat_file): dat_file
for dat_file in dat_files
}
gz_files_to_process = []
for tar_file in batch_files:
extract_folder = Path(Config.GZ_TOP_FOLDER, tar_file.replace(".tar", ""))
if extract_folder.exists():
for root, _, files in os.walk(extract_folder):
for file in files:
if file.endswith(".gz"):
gz_files_to_process.append(Path(root, file))
completed_count = 0
try:
for future in concurrent.futures.as_completed(future_to_file):
file_results = future.result()
if file_results:
for res in file_results:
zone_id = res["zone_id"]
results[zone_id]["dates"].append(res["date"])
results[zone_id]["values"].append(res["value"])
total_files = len(gz_files_to_process)
logging.info(f"Found {total_files} GZ files to process concurrently...")
completed_count += 1
if completed_count % 100 == 0:
elapsed_time = time.time() - start
files_per_minute = (completed_count / elapsed_time) * 60
remaining_files = total_files - completed_count
eta_minutes = remaining_files / (files_per_minute / 60) / 60
logging.info(f"""Processed {completed_count} out of {total_files} files.
Speed: {files_per_minute:.2f} files/min. ETA: {eta_minutes:.2f} minutes""")
except KeyboardInterrupt:
logging.warning("KeyboardInterrupt received. Cancelling pending tasks...")
executor.shutdown(wait=False, cancel_futures=True)
raise
with concurrent.futures.ThreadPoolExecutor() as executor:
future_to_file = {
executor.submit(process_pipeline, gz_file): gz_file
for gz_file in gz_files_to_process
}
elapsed_time = time.time() - start
logging.info(f"Interleaved processing completed in {elapsed_time:.2f} seconds")
completed_count = 0
try:
for future in concurrent.futures.as_completed(future_to_file):
file_results = future.result()
if file_results:
for res in file_results:
zone_id = res["zone_id"]
results[zone_id]["dates"].append(res["date"])
results[zone_id]["values"].append(res["value"])
logging.info("Writing CSV files...")
timeseries.write_results_to_csv(results, locations)
completed_count += 1
if completed_count % 100 == 0:
files_processed_previous = i * files_per_tar
files_processed_so_far = (
files_processed_previous + completed_count
)
elapsed_time = time.time() - start
rate_per_second = files_processed_so_far / elapsed_time
remaining_files = estimated_total_files - files_processed_so_far
if rate_per_second > 0:
eta_seconds = remaining_files / rate_per_second
if eta_seconds < 60:
eta_str = f"{int(eta_seconds)}s"
elif eta_seconds < 3600:
eta_str = f"{int(eta_seconds // 60)}m {int(eta_seconds % 60)}s"
else:
eta_str = f"{int(eta_seconds // 3600)}h {int((eta_seconds % 3600) // 60)}m"
else:
eta_str = "Unknown"
logging.info(f"""Progress: {files_processed_so_far}/{estimated_total_files} files ({files_processed_so_far / estimated_total_files * 100:.1f}%)
Speed: {rate_per_second * 60:.2f} files/min. ETA: {eta_str}""")
except KeyboardInterrupt:
logging.warning(
"KeyboardInterrupt received. Cancelling pending tasks..."
)
executor.shutdown(wait=False, cancel_futures=True)
raise
logging.info("Appending batch results to CSV files...")
timeseries.append_results_to_csv(results, locations)
# Cleanup GZ folders for this batch
# We loop through batch_files again to delete the folders we created
for tar_file in batch_files:
extract_folder = Path(Config.GZ_TOP_FOLDER, tar_file.replace(".tar", ""))
if extract_folder.exists():
try:
shutil.rmtree(extract_folder)
except OSError as e:
logging.warning(f"Failed to remove GZ folder {extract_folder}: {e}")
end = time.time()
elapsed_time = end - start
logging.info(f"All Complete total time {elapsed_time:.2f} seconds")
if elapsed_time < 60:
elapsed_time_str = f"{int(elapsed_time)}s"
elif elapsed_time < 3600:
elapsed_time_str = f"{int(elapsed_time // 60)}m {int(elapsed_time % 60)}s"
else:
elapsed_time_str = f"{int(elapsed_time // 3600)}h {int((elapsed_time % 3600) // 60)}m"
logging.info(f"All Complete total time {elapsed_time_str}")
+28 -18
@@ -3,6 +3,7 @@ import gzip
import shutil
import os
from pathlib import Path
import concurrent.futures
class Extract:
@@ -10,12 +11,8 @@ class Extract:
def __init__(self, Config):
self.config = Config
def _extract_tar(self):
for tar_file in os.listdir(self.config.TAR_TOP_FOLDER):
# only handle .tar files
if not tar_file.endswith(".tar"):
pass
def extract_tar_batch(self, tar_files):
for tar_file in tar_files:
tar_path = Path(self.config.TAR_TOP_FOLDER, tar_file)
# Create a folder for extracted tar contents
@@ -31,32 +28,45 @@ class Extract:
if self.config.delete_tar_after_processing:
os.remove(tar_path)
def _extract_gz(self):
def process_single_gz(self, gz_path, dat_path):
try:
with gzip.open(gz_path, "rb") as f_in:
with open(dat_path, "wb") as f_out:
shutil.copyfileobj(f_in, f_out)
if self.config.delete_gz_after_processing:
os.remove(gz_path)
except Exception as e:
print(f"Error extracting {gz_path}: {e}")
def extract_gz_batch(self):
gz_tasks = []
for root, _, files in os.walk(self.config.GZ_TOP_FOLDER):
for file in files:
# only handle .gz files
if not file.endswith(".dat.gz"):
pass # adjust if extension differs
continue
gz_path = Path(root, file)
dat_path = Path(self.config.DAT_TOP_FOLDER, file.replace(".gz", ""))
gz_tasks.append((gz_path, dat_path))
# Unzip .gz file
with gzip.open(gz_path, "rb") as f_in:
with open(dat_path, "wb") as f_out:
shutil.copyfileobj(f_in, f_out)
print(f"Extracting {len(gz_tasks)} gz files concurrently...")
if self.config.delete_gz_after_processing:
os.remove(gz_path)
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = [
executor.submit(self.process_single_gz, gz_path, dat_path)
for gz_path, dat_path in gz_tasks
]
concurrent.futures.wait(futures)
try:
shutil.rmtree(self.config.GZ_TOP_FOLDER)
# Recreate the folder for the next batch
Path(self.config.GZ_TOP_FOLDER).mkdir(exist_ok=True)
print("processing complete and GZ files deleted")
except Exception as e:
print(str(e))
print(
f"processing complete but GZ folder delete failed. Please delete manually ({self.config.GZ_TOP_FOLDER})"
)
def run_extraction(self):
self._extract_tar()
self._extract_gz()
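
For reference, the new `Extract` entry points are driven from `main.py` roughly as follows; the archive and frame file names below are made up for illustration:

```python
from pathlib import Path
from config import Config
from modules import Extract

extraction = Extract(Config)

# One batch of tar archives -> one folder of .gz files per tar
extraction.extract_tar_batch(["metoffice_day1.tar"])  # hypothetical archive name

# Each worker thread then turns a single .gz into a .dat
gz_path = Path(Config.GZ_TOP_FOLDER, "metoffice_day1", "frame_0000.dat.gz")  # hypothetical
dat_path = Path(Config.DAT_TOP_FOLDER, gz_path.name.replace(".gz", ""))
extraction.process_single_gz(gz_path, dat_path)
```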
+32 -10
@@ -164,8 +164,8 @@ class GenerateTimeseries:
executor.shutdown(wait=False, cancel_futures=True)
raise
def write_results_to_csv(self, results, locations):
"""Write extracted data to CSV files for each zone.
def append_results_to_csv(self, results, locations):
"""Append extracted data to CSV files for each zone.
Args:
results (dict): Aggregated results {zone_id: {'dates': [], 'values': []}}
@@ -217,19 +217,41 @@ class GenerateTimeseries:
df_dict[grid_square] = aligned_values
df = pd.DataFrame(df_dict)
new_df = pd.DataFrame(df_dict)
# Sort by datetime (already sorted)
sorted_df = df.sort("datetime")
# Format datetime column
sorted_df = sorted_df.with_columns(
# Format datetime column in new_df
new_df = new_df.with_columns(
pd.col("datetime").dt.strftime("%Y-%m-%d %H:%M:%S")
)
output_path = (
Path(self.config.COMBINED_FOLDER) / f"{zone_name}_timeseries_data.csv"
)
sorted_df.write_csv(output_path, float_precision=4)
logging.info("All CSV files written.")
if output_path.exists():
# Load existing CSV
existing_df = pd.read_csv(output_path)
# Reorder new_df to match existing_df
new_df = new_df.select(existing_df.columns)
# Concatenate
combined_df = pd.concat([existing_df, new_df])
# Sort by datetime
combined_df = combined_df.sort("datetime")
# Write back
combined_df.write_csv(output_path, float_precision=4)
else:
# Write new CSV
# Sort columns to ensure deterministic order (datetime first)
cols = new_df.columns
cols.remove("datetime")
cols.sort()
sorted_cols = ["datetime"] + cols
new_df = new_df.select(sorted_cols)
# Sort by datetime (already sorted but good practice)
sorted_df = new_df.sort("datetime")
sorted_df.write_csv(output_path, float_precision=4)
logging.info("All CSV files updated.")
+2 -2
@@ -1,7 +1,7 @@
[project]
name = "met-office"
version = "1.2.0"
description = "Convert .dat nimrod files to .asc files"
version = "1.3.2"
description = "Convert nimrod files to .csv timeseries"
readme = "README.md"
requires-python = ">=3.14"
dependencies = [
Generated
+1 -1
@@ -4,7 +4,7 @@ requires-python = ">=3.14"
[[package]]
name = "met-office"
version = "1.2.0"
version = "1.3.2"
source = { virtual = "." }
dependencies = [
{ name = "numpy" },