Compare commits
1 Commit
| Author | SHA1 | Date | |
|---|---|---|---|
|
2c4c4a3f4e
|
@@ -15,7 +15,7 @@ The project consists of a main pipeline workflow that processes multiple modules
|
|||||||
|
|
||||||
### main.py
|
### main.py
|
||||||
|
|
||||||
- **Startup Safety Check**: Scans the `COMBINED_FOLDER` at startup and warns the user if existing files are found, deleting the existing files if the user chooses to continue.
|
- **Startup Safety Check**: Scans the `COMBINED_FOLDER` at startup and warns the user if existing files are found, offering a chance to abort to prevent accidental data mixing.
|
||||||
- **Batch Processing**: Processes input tar files in configurable batches to manage resource usage.
|
- **Batch Processing**: Processes input tar files in configurable batches to manage resource usage.
|
||||||
- **End-to-End Processing**: Extracts GZ files, processes DAT/ASC, and appends to CSV in a single thread per file.
|
- **End-to-End Processing**: Extracts GZ files, processes DAT/ASC, and appends to CSV in a single thread per file.
|
||||||
- **Concurrency**: Uses multi-threading to process individual GZ files within a batch concurrently.
|
- **Concurrency**: Uses multi-threading to process individual GZ files within a batch concurrently.
|
||||||
|
|||||||
@@ -92,16 +92,13 @@ if __name__ == "__main__":
|
|||||||
f"Found {len(existing_combined)} files in {Config.COMBINED_FOLDER}"
|
f"Found {len(existing_combined)} files in {Config.COMBINED_FOLDER}"
|
||||||
)
|
)
|
||||||
logging.warning(
|
logging.warning(
|
||||||
"If you continue these WILL BE DELETED, Please make sure you have them saved."
|
"You may want to remove these before continuing to avoid duplicates or messy data."
|
||||||
)
|
)
|
||||||
logging.warning("!" * 80)
|
logging.warning("!" * 80)
|
||||||
response = input("Continue? (Y/N): ").strip().lower()
|
response = input("Continue? (Y/N): ").strip().lower()
|
||||||
if response != "y":
|
if response != "y":
|
||||||
logging.info("Aborting...")
|
logging.info("Aborting...")
|
||||||
exit(0)
|
exit(0)
|
||||||
else:
|
|
||||||
shutil.rmtree(Path(Config.COMBINED_FOLDER)) # Delete everything including the directory
|
|
||||||
Path(Config.COMBINED_FOLDER).mkdir()
|
|
||||||
|
|
||||||
extraction = Extract(Config)
|
extraction = Extract(Config)
|
||||||
batch = BatchNimrod(Config)
|
batch = BatchNimrod(Config)
|
||||||
@@ -133,6 +130,12 @@ if __name__ == "__main__":
|
|||||||
# 1. Extract batch (TAR -> GZ)
|
# 1. Extract batch (TAR -> GZ)
|
||||||
logging.info("Extracting tar files for batch")
|
logging.info("Extracting tar files for batch")
|
||||||
extraction.extract_tar_batch(batch_files)
|
extraction.extract_tar_batch(batch_files)
|
||||||
|
# Note: We do NOT run extract_gz_batch anymore. We will find GZ files and process them.
|
||||||
|
|
||||||
|
# Get list of GZ files (recursively or flat?)
|
||||||
|
# extract_tar_batch puts them in GZ_TOP_FOLDER/tar_name_without_ext
|
||||||
|
# So we need to look there.
|
||||||
|
# Ideally we know where we put them.
|
||||||
|
|
||||||
gz_files_to_process = []
|
gz_files_to_process = []
|
||||||
for tar_file in batch_files:
|
for tar_file in batch_files:
|
||||||
@@ -164,14 +167,14 @@ if __name__ == "__main__":
|
|||||||
|
|
||||||
completed_count += 1
|
completed_count += 1
|
||||||
if completed_count % 100 == 0:
|
if completed_count % 100 == 0:
|
||||||
|
elapsed_time = time.time() - start
|
||||||
|
rate_per_second = completed_count / elapsed_time
|
||||||
|
|
||||||
files_processed_previous = i * files_per_tar
|
files_processed_previous = i * files_per_tar
|
||||||
files_processed_so_far = (
|
files_processed_so_far = (
|
||||||
files_processed_previous + completed_count
|
files_processed_previous + completed_count
|
||||||
)
|
)
|
||||||
|
|
||||||
elapsed_time = time.time() - start
|
|
||||||
rate_per_second = files_processed_so_far / elapsed_time
|
|
||||||
|
|
||||||
remaining_files = estimated_total_files - files_processed_so_far
|
remaining_files = estimated_total_files - files_processed_so_far
|
||||||
|
|
||||||
if rate_per_second > 0:
|
if rate_per_second > 0:
|
||||||
@@ -210,11 +213,4 @@ if __name__ == "__main__":
|
|||||||
end = time.time()
|
end = time.time()
|
||||||
elapsed_time = end - start
|
elapsed_time = end - start
|
||||||
|
|
||||||
if elapsed_time < 60:
|
logging.info(f"All Complete total time {elapsed_time:.2f} seconds")
|
||||||
elapsed_time_str = f"{int(elapsed_time)}s"
|
|
||||||
elif elapsed_time < 3600:
|
|
||||||
elapsed_time_str = f"{int(elapsed_time // 60)}m {int(elapsed_time % 60)}s"
|
|
||||||
else:
|
|
||||||
elapsed_time_str = f"{int(elapsed_time // 3600)}h {int((elapsed_time % 3600) // 60)}m"
|
|
||||||
|
|
||||||
logging.info(f"All Complete total time {elapsed_time_str}")
|
|
||||||
|
|||||||
+2
-2
@@ -1,7 +1,7 @@
|
|||||||
[project]
|
[project]
|
||||||
name = "met-office"
|
name = "met-office"
|
||||||
version = "1.3.2"
|
version = "1.3.0"
|
||||||
description = "Convert nimrod files to .csv timeseries"
|
description = "Convert .dat nimrod files to .asc files"
|
||||||
readme = "README.md"
|
readme = "README.md"
|
||||||
requires-python = ">=3.14"
|
requires-python = ">=3.14"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ requires-python = ">=3.14"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "met-office"
|
name = "met-office"
|
||||||
version = "1.3.2"
|
version = "1.3.0"
|
||||||
source = { virtual = "." }
|
source = { virtual = "." }
|
||||||
dependencies = [
|
dependencies = [
|
||||||
{ name = "numpy" },
|
{ name = "numpy" },
|
||||||
|
|||||||
Reference in New Issue
Block a user