Files
met_office_radar_data/main.py
T
Jake 2c4c4a3f4e feat: added the extraction proces into the main multithreaded loop
Also added a warning when the app finds existing CSV files in the combined folder
2025-12-12 18:35:26 +00:00

217 lines
8.1 KiB
Python

import logging
import time
import os
import csv
import concurrent.futures
from pathlib import Path
import shutil
from config import Config
from modules import BatchNimrod, GenerateTimeseries, Extract
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
def process_pipeline(gz_file_path):
# 1. Extract GZ to DAT
gz_path = Path(gz_file_path)
# The dat file name is derived from the gz file name (removing .gz or .dat.gz)
# gz files are named like 'NAME.dat.gz' often.
dat_filename = gz_path.name.replace(".gz", "")
dat_path = Path(Config.DAT_TOP_FOLDER, dat_filename)
# Extract
try:
extraction.process_single_gz(gz_path, dat_path)
except Exception as e:
logging.error(f"Failed to extract {gz_path}: {e}")
return None
if not dat_path.exists():
logging.error(f"DAT file not found after extraction: {dat_path}")
return None
# 2. Process DAT to ASC
# BatchNimrod._process_single_file expects just the filename, not full path
asc_file = batch._process_single_file(dat_filename)
if not asc_file:
# Cleanup failed DAT file if needed (BatchNimrod might have done it or not)
if Config.delete_dat_after_processing and dat_path.exists():
try:
os.remove(dat_path)
except OSError:
pass
return None
# 3. Extract data from ASC
file_results = timeseries.process_asc_file(asc_file, locations)
return file_results
def initialise_folders():
folder_list = [
Config.ASC_TOP_FOLDER,
Config.COMBINED_FOLDER,
Config.GZ_TOP_FOLDER,
Config.DAT_TOP_FOLDER,
Config.TAR_TOP_FOLDER,
]
for path in folder_list:
Path(path).mkdir(exist_ok=True)
if __name__ == "__main__":
initialise_folders()
locations = []
zones = set()
# load zone inputs here
for file in os.listdir(Path(Config.ZONE_FOLDER)):
with open(Path(Config.ZONE_FOLDER, file), "r") as csvfile:
reader = csv.reader(csvfile)
header = next(reader) # Skip header row
for row in reader:
# Extract the relevant fields: 1K Grid, Easting, Northing, Zone
grid_name = row[0] # 1k Grid name
easting = int(row[1]) # Easting column
northing = int(row[2]) # Northing column
zone = int(row[3]) # ZoneID column
locations.append([grid_name, easting, northing, zone])
zones.add(zone)
logging.info(f"Count of 1km Grids: {len(locations)}")
logging.info(f"Count of Zones: {len(zones)}")
# Check for existing combined files
existing_combined = os.listdir(Config.COMBINED_FOLDER)
if existing_combined:
logging.warning("!" * 80)
logging.warning(
f"Found {len(existing_combined)} files in {Config.COMBINED_FOLDER}"
)
logging.warning(
"You may want to remove these before continuing to avoid duplicates or messy data."
)
logging.warning("!" * 80)
response = input("Continue? (Y/N): ").strip().lower()
if response != "y":
logging.info("Aborting...")
exit(0)
extraction = Extract(Config)
batch = BatchNimrod(Config)
timeseries = GenerateTimeseries(Config, locations)
start = time.time()
logging.info(
"Starting interleaved processing of GZ files -> DAT -> ASC -> Timeseries"
)
# Get list of all tar files
all_tar_files = [f for f in os.listdir(Config.TAR_TOP_FOLDER) if f.endswith(".tar")]
all_tar_files.sort()
total_tars = len(all_tar_files)
files_per_tar = 288
estimated_total_files = total_tars * files_per_tar
logging.info(f"Found {total_tars} tar files to process")
# Process in batches
for i in range(0, total_tars, Config.BATCH_SIZE):
batch_files = all_tar_files[i : i + Config.BATCH_SIZE]
logging.info(
f"Processing batch {i // Config.BATCH_SIZE + 1}: {len(batch_files)} tar files"
)
# Initialize results structure for this batch
results = {loc[0]: {"dates": [], "values": []} for loc in locations}
# 1. Extract batch (TAR -> GZ)
logging.info("Extracting tar files for batch")
extraction.extract_tar_batch(batch_files)
# Note: We do NOT run extract_gz_batch anymore. We will find GZ files and process them.
# Get list of GZ files (recursively or flat?)
# extract_tar_batch puts them in GZ_TOP_FOLDER/tar_name_without_ext
# So we need to look there.
# Ideally we know where we put them.
gz_files_to_process = []
for tar_file in batch_files:
extract_folder = Path(Config.GZ_TOP_FOLDER, tar_file.replace(".tar", ""))
if extract_folder.exists():
for root, _, files in os.walk(extract_folder):
for file in files:
if file.endswith(".gz"):
gz_files_to_process.append(Path(root, file))
total_files = len(gz_files_to_process)
logging.info(f"Found {total_files} GZ files to process concurrently...")
with concurrent.futures.ThreadPoolExecutor() as executor:
future_to_file = {
executor.submit(process_pipeline, gz_file): gz_file
for gz_file in gz_files_to_process
}
completed_count = 0
try:
for future in concurrent.futures.as_completed(future_to_file):
file_results = future.result()
if file_results:
for res in file_results:
zone_id = res["zone_id"]
results[zone_id]["dates"].append(res["date"])
results[zone_id]["values"].append(res["value"])
completed_count += 1
if completed_count % 100 == 0:
elapsed_time = time.time() - start
rate_per_second = completed_count / elapsed_time
files_processed_previous = i * files_per_tar
files_processed_so_far = (
files_processed_previous + completed_count
)
remaining_files = estimated_total_files - files_processed_so_far
if rate_per_second > 0:
eta_seconds = remaining_files / rate_per_second
if eta_seconds < 60:
eta_str = f"{int(eta_seconds)}s"
elif eta_seconds < 3600:
eta_str = f"{int(eta_seconds // 60)}m {int(eta_seconds % 60)}s"
else:
eta_str = f"{int(eta_seconds // 3600)}h {int((eta_seconds % 3600) // 60)}m"
else:
eta_str = "Unknown"
logging.info(f"""Progress: {files_processed_so_far}/{estimated_total_files} files ({files_processed_so_far / estimated_total_files * 100:.1f}%)
Speed: {rate_per_second * 60:.2f} files/min. ETA: {eta_str}""")
except KeyboardInterrupt:
logging.warning(
"KeyboardInterrupt received. Cancelling pending tasks..."
)
executor.shutdown(wait=False, cancel_futures=True)
raise
logging.info("Appending batch results to CSV files...")
timeseries.append_results_to_csv(results, locations)
# Cleanup GZ folders for this batch
# We loop through batch_files again to delete the folders we created
for tar_file in batch_files:
extract_folder = Path(Config.GZ_TOP_FOLDER, tar_file.replace(".tar", ""))
if extract_folder.exists():
try:
shutil.rmtree(extract_folder)
except OSError as e:
logging.warning(f"Failed to remove GZ folder {extract_folder}: {e}")
end = time.time()
elapsed_time = end - start
logging.info(f"All Complete total time {elapsed_time:.2f} seconds")