2 Commits

Author SHA1 Message Date
Jake 9aaf8a5e88 Now deleting existing combined csv files after confirmation at start. 2025-12-15 10:13:11 +00:00
Jake-Pullen a43edb1148 Extraction streamlining (#3)
* feat:  added the extraction process into the main multi threaded loop
Also added a warning when the app finds existing CSV files in the combined folder

* fix: 🐛 Fixed time calculations for ETA & Completion
2025-12-12 19:56:14 +00:00
4 changed files with 19 additions and 15 deletions
+1 -1
View File
@@ -15,7 +15,7 @@ The project consists of a main pipeline workflow that processes multiple modules
### main.py ### main.py
- **Startup Safety Check**: Scans the `COMBINED_FOLDER` at startup and warns the user if existing files are found, offering a chance to abort to prevent accidental data mixing. - **Startup Safety Check**: Scans the `COMBINED_FOLDER` at startup and warns the user if existing files are found, deleting the existing files if the user chooses to continue.
- **Batch Processing**: Processes input tar files in configurable batches to manage resource usage. - **Batch Processing**: Processes input tar files in configurable batches to manage resource usage.
- **End-to-End Processing**: Extracts GZ files, processes DAT/ASC, and appends to CSV in a single thread per file. - **End-to-End Processing**: Extracts GZ files, processes DAT/ASC, and appends to CSV in a single thread per file.
- **Concurrency**: Uses multi-threading to process individual GZ files within a batch concurrently. - **Concurrency**: Uses multi-threading to process individual GZ files within a batch concurrently.
+15 -11
View File
@@ -92,13 +92,16 @@ if __name__ == "__main__":
f"Found {len(existing_combined)} files in {Config.COMBINED_FOLDER}" f"Found {len(existing_combined)} files in {Config.COMBINED_FOLDER}"
) )
logging.warning( logging.warning(
"You may want to remove these before continuing to avoid duplicates or messy data." "If you continue these WILL BE DELETED, Please make sure you have them saved."
) )
logging.warning("!" * 80) logging.warning("!" * 80)
response = input("Continue? (Y/N): ").strip().lower() response = input("Continue? (Y/N): ").strip().lower()
if response != "y": if response != "y":
logging.info("Aborting...") logging.info("Aborting...")
exit(0) exit(0)
else:
shutil.rmtree(Path(Config.COMBINED_FOLDER)) # Delete everything including the directory
Path(Config.COMBINED_FOLDER).mkdir()
extraction = Extract(Config) extraction = Extract(Config)
batch = BatchNimrod(Config) batch = BatchNimrod(Config)
@@ -130,12 +133,6 @@ if __name__ == "__main__":
# 1. Extract batch (TAR -> GZ) # 1. Extract batch (TAR -> GZ)
logging.info("Extracting tar files for batch") logging.info("Extracting tar files for batch")
extraction.extract_tar_batch(batch_files) extraction.extract_tar_batch(batch_files)
# Note: We do NOT run extract_gz_batch anymore. We will find GZ files and process them.
# Get list of GZ files (recursively or flat?)
# extract_tar_batch puts them in GZ_TOP_FOLDER/tar_name_without_ext
# So we need to look there.
# Ideally we know where we put them.
gz_files_to_process = [] gz_files_to_process = []
for tar_file in batch_files: for tar_file in batch_files:
@@ -167,13 +164,13 @@ if __name__ == "__main__":
completed_count += 1 completed_count += 1
if completed_count % 100 == 0: if completed_count % 100 == 0:
elapsed_time = time.time() - start
rate_per_second = completed_count / elapsed_time
files_processed_previous = i * files_per_tar files_processed_previous = i * files_per_tar
files_processed_so_far = ( files_processed_so_far = (
files_processed_previous + completed_count files_processed_previous + completed_count
) )
elapsed_time = time.time() - start
rate_per_second = files_processed_so_far / elapsed_time
remaining_files = estimated_total_files - files_processed_so_far remaining_files = estimated_total_files - files_processed_so_far
@@ -213,4 +210,11 @@ if __name__ == "__main__":
end = time.time() end = time.time()
elapsed_time = end - start elapsed_time = end - start
logging.info(f"All Complete total time {elapsed_time:.2f} seconds") if elapsed_time < 60:
elapsed_time_str = f"{int(elapsed_time)}s"
elif elapsed_time < 3600:
elapsed_time_str = f"{int(elapsed_time // 60)}m {int(elapsed_time % 60)}s"
else:
elapsed_time_str = f"{int(elapsed_time // 3600)}h {int((elapsed_time % 3600) // 60)}m"
logging.info(f"All Complete total time {elapsed_time_str}")
+2 -2
View File
@@ -1,7 +1,7 @@
[project] [project]
name = "met-office" name = "met-office"
version = "1.3.0" version = "1.3.2"
description = "Convert .dat nimrod files to .asc files" description = "Convert nimrod files to .csv timeseries"
readme = "README.md" readme = "README.md"
requires-python = ">=3.14" requires-python = ">=3.14"
dependencies = [ dependencies = [
Generated
+1 -1
View File
@@ -4,7 +4,7 @@ requires-python = ">=3.14"
[[package]] [[package]]
name = "met-office" name = "met-office"
version = "1.3.0" version = "1.3.2"
source = { virtual = "." } source = { virtual = "." }
dependencies = [ dependencies = [
{ name = "numpy" }, { name = "numpy" },