diff --git a/README.MD b/README.MD
index 50fde5d..5c672dc 100644
--- a/README.MD
+++ b/README.MD
@@ -15,11 +15,12 @@ The project consists of a main pipeline workflow that processes multiple modules
 
 ### main.py
 
-- Orchestrates the entire workflow pipeline
-- Uncompress the packed .gz.tar files to DAT files
-- Processes DAT files to ASC format
-- Generates timeseries data for specified locations
-- Combines grouped CSV files into consolidated datasets formatted for Infoworks ICM
+- **Startup Safety Check**: Scans the `COMBINED_FOLDER` at startup and warns the user if existing files are found, offering a chance to abort to prevent accidental data mixing.
+- **Batch Processing**: Processes input tar files in configurable batches to manage resource usage.
+- **End-to-End Processing**: Extracts each GZ file, converts the DAT to ASC, and extracts its timeseries values in a single worker thread per file.
+- **Concurrency**: Uses multi-threading to process individual GZ files within a batch concurrently.
+- **Cumulative Data**: Automatically appends each batch's results to the existing CSV files in `COMBINED_FOLDER`, ensuring no data is lost and columns are correctly aligned.
+- **Dynamic ETA**: Provides a real-time estimate of completion time.
 
 ### extract.py
 
@@ -73,6 +74,7 @@ The `config.py` file defines folder paths and file deletion options:
 - ASC_TOP_FOLDER = "./asc_files"
 - COMBINED_FOLDER = "./combined_files"
 - ZONE_FOLDER = "./zone_inputs"
+- BATCH_SIZE = 5 (Number of tar files to process per batch)
 
 Example of how the zone csv files should look:
 
diff --git a/config.py b/config.py
index e5b008b..51c00ae 100644
--- a/config.py
+++ b/config.py
@@ -11,3 +11,5 @@ class Config:
     delete_gz_after_processing = True
     delete_dat_after_processing = True
     delete_asc_after_processing = True
+
+    BATCH_SIZE = 5
diff --git a/main.py b/main.py
index fa6fa40..4a52b93 100644
--- a/main.py
+++ b/main.py
@@ -4,6 +4,7 @@ import os
 import csv
 import concurrent.futures
 from pathlib import Path
+import shutil
 
 from config import Config
 from modules import BatchNimrod, GenerateTimeseries, Extract
@@ -13,14 +14,40 @@ logging.basicConfig(
 )
 
 
-def process_pipeline(dat_file):
-    # 1. Process DAT to ASC
-    asc_file = batch._process_single_file(dat_file)
-    if not asc_file:
+def process_pipeline(gz_file_path):
+    # 1. Extract GZ to DAT
+    gz_path = Path(gz_file_path)
+    # The DAT file name is derived from the GZ file name by stripping the .gz suffix
+    # (GZ files are typically named 'NAME.dat.gz').
+    dat_filename = gz_path.name.replace(".gz", "")
+    dat_path = Path(Config.DAT_TOP_FOLDER, dat_filename)
+
+    # Extract
+    try:
+        extraction.process_single_gz(gz_path, dat_path)
+    except Exception as e:
+        logging.error(f"Failed to extract {gz_path}: {e}")
         return None
 
-    # 2. Extract data from ASC
+    if not dat_path.exists():
+        logging.error(f"DAT file not found after extraction: {dat_path}")
+        return None
+
+    # 2. Process DAT to ASC
+    # BatchNimrod._process_single_file expects just the filename, not the full path
+    asc_file = batch._process_single_file(dat_filename)
+    if not asc_file:
+        # Clean up the DAT file if it is still present (BatchNimrod may have already removed it)
+        if Config.delete_dat_after_processing and dat_path.exists():
+            try:
+                os.remove(dat_path)
+            except OSError:
+                pass
+        return None
+
+    # 3. Extract data from ASC
     file_results = timeseries.process_asc_file(asc_file, locations)
+
     return file_results
 
 
@@ -57,64 +84,140 @@ if __name__ == "__main__":
     logging.info(f"Count of 1km Grids: {len(locations)}")
     logging.info(f"Count of Zones: {len(zones)}")
 
+    # Check for existing combined files
+    existing_combined = os.listdir(Config.COMBINED_FOLDER)
+    if existing_combined:
+        logging.warning("!" * 80)
+        logging.warning(
+            f"Found {len(existing_combined)} files in {Config.COMBINED_FOLDER}"
+        )
+        logging.warning(
+            "You may want to remove these before continuing to avoid duplicates or messy data."
+        )
+        logging.warning("!" * 80)
+        response = input("Continue? (Y/N): ").strip().lower()
+        if response != "y":
+            logging.info("Aborting...")
+            exit(0)
+
     extraction = Extract(Config)
     batch = BatchNimrod(Config)
     timeseries = GenerateTimeseries(Config, locations)
 
     start = time.time()
     logging.info(
-        "Starting interleaved processing of DAT files and Timeseries generation"
+        "Starting interleaved processing of GZ files -> DAT -> ASC -> Timeseries"
     )
 
-    # Initialize results structure
-    results = {loc[0]: {"dates": [], "values": []} for loc in locations}
+    # Get list of all tar files
+    all_tar_files = [f for f in os.listdir(Config.TAR_TOP_FOLDER) if f.endswith(".tar")]
+    all_tar_files.sort()
+    total_tars = len(all_tar_files)
+    files_per_tar = 288
+    estimated_total_files = total_tars * files_per_tar
+    logging.info(f"Found {total_tars} tar files to process")
 
-    logging.info("Extracting tar and gz files")
-    extraction.run_extraction()
+    # Process in batches
+    for i in range(0, total_tars, Config.BATCH_SIZE):
+        batch_files = all_tar_files[i : i + Config.BATCH_SIZE]
+        logging.info(
+            f"Processing batch {i // Config.BATCH_SIZE + 1}: {len(batch_files)} tar files"
+        )
 
-    # Get list of DAT files
-    dat_files = [
-        f for f in os.listdir(Path(Config.DAT_TOP_FOLDER)) if not f.startswith(".")
-    ]
-    total_files = len(dat_files)
+        # Initialize results structure for this batch
+        results = {loc[0]: {"dates": [], "values": []} for loc in locations}
 
-    logging.info(f"Processing {total_files} files concurrently...")
+        # 1. Extract batch (TAR -> GZ)
+        logging.info("Extracting tar files for batch")
+        extraction.extract_tar_batch(batch_files)
+        # Note: extract_gz_batch is no longer called here; the GZ files are located
+        # below and handed to process_pipeline one at a time.
 
-    with concurrent.futures.ThreadPoolExecutor() as executor:
-        future_to_file = {
-            executor.submit(process_pipeline, dat_file): dat_file
-            for dat_file in dat_files
-        }
+        # Collect the GZ files for this batch. extract_tar_batch places them in
+        # GZ_TOP_FOLDER/<tar name without extension>, so walk each of those folders.
-        completed_count = 0
-        try:
-            for future in concurrent.futures.as_completed(future_to_file):
-                file_results = future.result()
-                if file_results:
-                    for res in file_results:
-                        zone_id = res["zone_id"]
-                        results[zone_id]["dates"].append(res["date"])
-                        results[zone_id]["values"].append(res["value"])
+        gz_files_to_process = []
+        for tar_file in batch_files:
+            extract_folder = Path(Config.GZ_TOP_FOLDER, tar_file.replace(".tar", ""))
+            if extract_folder.exists():
+                for root, _, files in os.walk(extract_folder):
+                    for file in files:
+                        if file.endswith(".gz"):
+                            gz_files_to_process.append(Path(root, file))
 
-                completed_count += 1
-                if completed_count % 100 == 0:
-                    elapsed_time = time.time() - start
-                    files_per_minute = (completed_count / elapsed_time) * 60
-                    remaining_files = total_files - completed_count
-                    eta_minutes = remaining_files / (files_per_minute / 60) / 60
-                    logging.info(f"""Processed {completed_count} out of {total_files} files.
-                    Speed: {files_per_minute:.2f} files/min. ETA: {eta_minutes:.2f} minutes""")
-        except KeyboardInterrupt:
-            logging.warning("KeyboardInterrupt received. Cancelling pending tasks...")
-            executor.shutdown(wait=False, cancel_futures=True)
-            raise
+        total_files = len(gz_files_to_process)
+        logging.info(f"Found {total_files} GZ files to process concurrently...")
 
-    elapsed_time = time.time() - start
-    logging.info(f"Interleaved processing completed in {elapsed_time:.2f} seconds")
+        with concurrent.futures.ThreadPoolExecutor() as executor:
+            future_to_file = {
+                executor.submit(process_pipeline, gz_file): gz_file
+                for gz_file in gz_files_to_process
+            }
 
-    logging.info("Writing CSV files...")
-    timeseries.write_results_to_csv(results, locations)
+            completed_count = 0
+            try:
+                for future in concurrent.futures.as_completed(future_to_file):
+                    file_results = future.result()
+                    if file_results:
+                        for res in file_results:
+                            zone_id = res["zone_id"]
+                            results[zone_id]["dates"].append(res["date"])
+                            results[zone_id]["values"].append(res["value"])
+
+                    completed_count += 1
+                    if completed_count % 100 == 0:
+                        files_processed_previous = i * files_per_tar
+                        files_processed_so_far = (
+                            files_processed_previous + completed_count
+                        )
+
+                        elapsed_time = time.time() - start
+                        rate_per_second = files_processed_so_far / elapsed_time
+
+                        remaining_files = estimated_total_files - files_processed_so_far
+
+                        if rate_per_second > 0:
+                            eta_seconds = remaining_files / rate_per_second
+
+                            if eta_seconds < 60:
+                                eta_str = f"{int(eta_seconds)}s"
+                            elif eta_seconds < 3600:
+                                eta_str = f"{int(eta_seconds // 60)}m {int(eta_seconds % 60)}s"
+                            else:
+                                eta_str = f"{int(eta_seconds // 3600)}h {int((eta_seconds % 3600) // 60)}m"
+                        else:
+                            eta_str = "Unknown"
+
+                        logging.info(f"""Progress: {files_processed_so_far}/{estimated_total_files} files ({files_processed_so_far / estimated_total_files * 100:.1f}%)
+                        Speed: {rate_per_second * 60:.2f} files/min. ETA: {eta_str}""")
+            except KeyboardInterrupt:
+                logging.warning(
+                    "KeyboardInterrupt received. Cancelling pending tasks..."
+                )
+                executor.shutdown(wait=False, cancel_futures=True)
+                raise
+
+        logging.info("Appending batch results to CSV files...")
+        timeseries.append_results_to_csv(results, locations)
+
+        # Clean up the GZ folders created for this batch
+        # (loop over batch_files again to remove the folders extract_tar_batch created)
+        for tar_file in batch_files:
+            extract_folder = Path(Config.GZ_TOP_FOLDER, tar_file.replace(".tar", ""))
+            if extract_folder.exists():
+                try:
+                    shutil.rmtree(extract_folder)
+                except OSError as e:
+                    logging.warning(f"Failed to remove GZ folder {extract_folder}: {e}")
 
     end = time.time()
     elapsed_time = end - start
-    logging.info(f"All Complete total time {elapsed_time:.2f} seconds")
+    if elapsed_time < 60:
+        elapsed_time_str = f"{int(elapsed_time)}s"
+    elif elapsed_time < 3600:
+        elapsed_time_str = f"{int(elapsed_time // 60)}m {int(elapsed_time % 60)}s"
+    else:
+        elapsed_time_str = f"{int(elapsed_time // 3600)}h {int((elapsed_time % 3600) // 60)}m"
+
+    logging.info(f"All complete. Total time: {elapsed_time_str}")
diff --git a/modules/extract.py b/modules/extract.py
index 249b520..f0f5d68 100755
--- a/modules/extract.py
+++ b/modules/extract.py
@@ -3,6 +3,7 @@ import gzip
 import shutil
 import os
 from pathlib import Path
+import concurrent.futures
 
 
 class Extract:
@@ -10,12 +11,8 @@ def __init__(self, Config):
         self.config = Config
 
-    def _extract_tar(self):
-        for tar_file in os.listdir(self.config.TAR_TOP_FOLDER):
-            # only handle .tar files
-            if not tar_file.endswith(".tar"):
-                pass
-
+    def extract_tar_batch(self, tar_files):
+        for tar_file in tar_files:
             tar_path = Path(self.config.TAR_TOP_FOLDER, tar_file)
 
             # Create a folder for extracted tar contents
@@ -31,32 +28,45 @@
             if self.config.delete_tar_after_processing:
                 os.remove(tar_path)
 
-    def _extract_gz(self):
+    def process_single_gz(self, gz_path, dat_path):
+        try:
+            with gzip.open(gz_path, "rb") as f_in:
+                with open(dat_path, "wb") as f_out:
+                    shutil.copyfileobj(f_in, f_out)
+
+            if self.config.delete_gz_after_processing:
+                os.remove(gz_path)
+        except Exception as e:
+            print(f"Error extracting {gz_path}: {e}")
+
+    def extract_gz_batch(self):
+        gz_tasks = []
         for root, _, files in os.walk(self.config.GZ_TOP_FOLDER):
             for file in files:
                 # only handle .gz files
                 if not file.endswith(".dat.gz"):
-                    pass  # adjust if extension differs
+                    continue
+
                 gz_path = Path(root, file)
                 dat_path = Path(self.config.DAT_TOP_FOLDER, file.replace(".gz", ""))
+                gz_tasks.append((gz_path, dat_path))
 
-                # Unzip .gz file
-                with gzip.open(gz_path, "rb") as f_in:
-                    with open(dat_path, "wb") as f_out:
-                        shutil.copyfileobj(f_in, f_out)
+        print(f"Extracting {len(gz_tasks)} gz files concurrently...")
 
-                if self.config.delete_gz_after_processing:
-                    os.remove(gz_path)
+        with concurrent.futures.ThreadPoolExecutor() as executor:
+            futures = [
+                executor.submit(self.process_single_gz, gz_path, dat_path)
+                for gz_path, dat_path in gz_tasks
+            ]
+            concurrent.futures.wait(futures)
 
         try:
            shutil.rmtree(self.config.GZ_TOP_FOLDER)
+            # Recreate the folder for the next batch
+            Path(self.config.GZ_TOP_FOLDER).mkdir(exist_ok=True)
            print("processing complete and GZ files deleted")
        except Exception as e:
            print(str(e))
            print(
                f"processing complete but GZ folder delete failed. Please delete manually ({self.config.GZ_TOP_FOLDER})"
            )
-
-    def run_extraction(self):
-        self._extract_tar()
-        self._extract_gz()
diff --git a/modules/generate_timeseries.py b/modules/generate_timeseries.py
index 2011873..5988800 100644
--- a/modules/generate_timeseries.py
+++ b/modules/generate_timeseries.py
@@ -164,8 +164,8 @@
             executor.shutdown(wait=False, cancel_futures=True)
             raise
 
-    def write_results_to_csv(self, results, locations):
-        """Write extracted data to CSV files for each zone.
+    def append_results_to_csv(self, results, locations):
+        """Append extracted data to CSV files for each zone.
 
         Args:
             results (dict): Aggregated results {zone_id: {'dates': [], 'values': []}}
@@ -217,19 +217,41 @@
             df_dict[grid_square] = aligned_values
 
-            df = pd.DataFrame(df_dict)
+            new_df = pd.DataFrame(df_dict)
 
-            # Sort by datetime (already sorted)
-            sorted_df = df.sort("datetime")
-
-            # Format datetime column
-            sorted_df = sorted_df.with_columns(
+            # Format datetime column in new_df
+            new_df = new_df.with_columns(
                 pd.col("datetime").dt.strftime("%Y-%m-%d %H:%M:%S")
             )
 
             output_path = (
                 Path(self.config.COMBINED_FOLDER) / f"{zone_name}_timeseries_data.csv"
             )
 
-            sorted_df.write_csv(output_path, float_precision=4)
-
-        logging.info("All CSV files written.")
+            if output_path.exists():
+                # Load existing CSV
+                existing_df = pd.read_csv(output_path)
+
+                # Reorder new_df to match existing_df
+                new_df = new_df.select(existing_df.columns)
+
+                # Concatenate
+                combined_df = pd.concat([existing_df, new_df])
+                # Sort by datetime
+                combined_df = combined_df.sort("datetime")
+                # Write back
+                combined_df.write_csv(output_path, float_precision=4)
+            else:
+                # Write new CSV
+                # Sort columns to ensure deterministic order (datetime first)
+                cols = new_df.columns
+                cols.remove("datetime")
+                cols.sort()
+                sorted_cols = ["datetime"] + cols
+                new_df = new_df.select(sorted_cols)
+
+                # Sort by datetime (already sorted but good practice)
+                sorted_df = new_df.sort("datetime")
+                sorted_df.write_csv(output_path, float_precision=4)
+
+        logging.info("All CSV files updated.")
diff --git a/pyproject.toml b/pyproject.toml
index c8b4255..46833a1 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [project]
 name = "met-office"
-version = "1.2.0"
+version = "1.3.1"
 description = "Convert .dat nimrod files to .asc files"
 readme = "README.md"
 requires-python = ">=3.14"
diff --git a/uv.lock b/uv.lock
index e63ab11..6d8e156 100644
--- a/uv.lock
+++ b/uv.lock
@@ -4,7 +4,7 @@ requires-python = ">=3.14"
 
 [[package]]
 name = "met-office"
-version = "1.2.0"
+version = "1.3.1"
 source = { virtual = "." }
 dependencies = [
     { name = "numpy" },
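
For context, the cumulative append introduced in `append_results_to_csv` reduces to four steps: align the new batch to the existing column order, concatenate, re-sort on `datetime`, and rewrite the file. The standalone sketch below illustrates that pattern only; it is not part of the patch. It assumes the project's dataframe library is polars (the module appears to import it under the `pd` alias), and the output path and the `grid_1234` column are hypothetical names used purely for the example.

```python
# Illustrative sketch of the append/align/sort pattern; not part of this patch.
# Assumes polars; the zone file name and "grid_1234" column are hypothetical.
from pathlib import Path

import polars as pl

output_path = Path("combined_files/ZoneA_timeseries_data.csv")
output_path.parent.mkdir(parents=True, exist_ok=True)

# One batch of newly extracted values, with datetime already formatted as text
new_df = pl.DataFrame(
    {
        "datetime": ["2024-01-01 00:05:00", "2024-01-01 00:10:00"],
        "grid_1234": [0.0, 0.2],
    }
)

if output_path.exists():
    existing_df = pl.read_csv(output_path)
    # Align the new batch to the existing column order, append, then re-sort
    new_df = new_df.select(existing_df.columns)
    combined_df = pl.concat([existing_df, new_df]).sort("datetime")
    combined_df.write_csv(output_path, float_precision=4)
else:
    # First batch: datetime first, remaining columns in a deterministic order
    cols = sorted(c for c in new_df.columns if c != "datetime")
    new_df.select(["datetime", *cols]).sort("datetime").write_csv(
        output_path, float_precision=4
    )
```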