# File: met_office_radar_data/main.py

import logging
import time
import os
import csv
import concurrent.futures
from pathlib import Path
from config import Config
from modules import BatchNimrod, GenerateTimeseries, Extract
# Configure the root logger once for the whole script: INFO and above,
# each line prefixed with a timestamp and the level name.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
def process_pipeline(dat_file):
    """Run both pipeline stages for a single DAT file.

    Stage 1 hands ``dat_file`` to ``batch._process_single_file``; stage 2
    passes whatever that returned to ``timeseries.process_asc_file`` together
    with ``locations``.

    ``batch``, ``timeseries`` and ``locations`` are module-level names that
    are only assigned in the ``__main__`` section of this script, so this
    function is usable only after that setup has run.

    Args:
        dat_file: Entry from the DAT folder listing to process.

    Returns:
        The value returned by ``timeseries.process_asc_file``, or ``None``
        when stage 1 produced a falsy result.
    """
    # Stage 1: DAT -> ASC.
    converted = batch._process_single_file(dat_file)

    # Stage 2: pull the per-location data out of the ASC file.
    if converted:
        return timeseries.process_asc_file(converted, locations)

    # Conversion yielded nothing, so there is nothing to extract.
    return None
def initialise_folders():
    """Create the working folders named in ``Config`` if they are missing.

    The folders are ``ASC_TOP_FOLDER``, ``COMBINED_FOLDER``, ``GZ_TOP_FOLDER``,
    ``DAT_TOP_FOLDER`` and ``TAR_TOP_FOLDER``. Folders that already exist are
    left untouched.

    Raises:
        FileExistsError: If one of the configured paths exists but is not a
            directory.
    """
    folder_list = (
        Config.ASC_TOP_FOLDER,
        Config.COMBINED_FOLDER,
        Config.GZ_TOP_FOLDER,
        Config.DAT_TOP_FOLDER,
        Config.TAR_TOP_FOLDER,
    )
    for path in folder_list:
        # parents=True so a nested configured path (e.g. "data/asc") can be
        # created on a fresh checkout instead of raising FileNotFoundError.
        Path(path).mkdir(parents=True, exist_ok=True)
def _load_locations(zone_folder):
    """Read every zone CSV in ``zone_folder`` into a list of grid locations.

    Each file is expected to start with a header row, followed by rows whose
    first four columns are: 1 km grid name, easting, northing, zone id.

    Hidden entries (names starting with ".") and anything that is not a
    regular file are skipped, as are empty files and blank rows. Files are
    read in sorted name order so the resulting list is deterministic.

    Args:
        zone_folder: Folder containing the zone CSV files.

    Returns:
        A ``(locations, zones)`` tuple. ``locations`` is a list of
        ``[grid_name, easting, northing, zone]`` lists (easting, northing and
        zone converted to ``int``); ``zones`` is the set of distinct zone ids.

    Raises:
        ValueError: If an easting, northing or zone cell is not an integer.
        IndexError: If a non-blank row has fewer than four columns.
    """
    locations = []
    zones = set()
    for zone_file in sorted(Path(zone_folder).iterdir()):
        # Same hidden-file rule the DAT listing uses; also ignore sub-folders.
        if zone_file.name.startswith(".") or not zone_file.is_file():
            continue
        # newline="" is what the csv module asks for so that embedded
        # newlines inside quoted fields are handled correctly.
        with open(zone_file, "r", newline="") as csvfile:
            reader = csv.reader(csvfile)
            if next(reader, None) is None:
                # Completely empty file: no header row, so nothing to read.
                continue
            for row in reader:
                if not row:
                    # csv.reader yields [] for blank lines.
                    continue
                grid_name = row[0]  # 1k Grid name
                easting = int(row[1])  # Easting column
                northing = int(row[2])  # Northing column
                zone = int(row[3])  # ZoneID column
                locations.append([grid_name, easting, northing, zone])
                zones.add(zone)
    return locations, zones


if __name__ == "__main__":
    initialise_folders()

    # NOTE: `locations`, `batch` and `timeseries` must stay module-level
    # names, because process_pipeline() reads them as globals.
    locations, zones = _load_locations(Config.ZONE_FOLDER)
    logging.info(f"Count of 1km Grids: {len(locations)}")
    logging.info(f"Count of Zones: {len(zones)}")

    extraction = Extract(Config)
    batch = BatchNimrod(Config)
    timeseries = GenerateTimeseries(Config, locations)

    start = time.time()
    logging.info(
        "Starting interleaved processing of DAT files and Timeseries generation"
    )

    # One accumulator per location, keyed by the first field of each location.
    results = {loc[0]: {"dates": [], "values": []} for loc in locations}

    logging.info("Extracting tar and gz files")
    extraction.run_extraction()

    # Names (not full paths) of the DAT folder entries, minus hidden files.
    dat_files = [
        f for f in os.listdir(Path(Config.DAT_TOP_FOLDER)) if not f.startswith(".")
    ]
    total_files = len(dat_files)
    logging.info(f"Processing {total_files} files concurrently...")

    # Separate clock for the speed/ETA figures so the time spent extracting
    # the tar/gz archives above does not drag the files/min rate down.
    processing_start = time.time()

    with concurrent.futures.ThreadPoolExecutor() as executor:
        future_to_file = {
            executor.submit(process_pipeline, dat_file): dat_file
            for dat_file in dat_files
        }
        completed_count = 0
        try:
            for future in concurrent.futures.as_completed(future_to_file):
                try:
                    file_results = future.result()
                except Exception:
                    # Say which input failed before the error propagates.
                    logging.exception(
                        f"Processing failed for {future_to_file[future]}"
                    )
                    raise

                if file_results:
                    for res in file_results:
                        # NOTE(review): `results` is keyed by loc[0] (the grid
                        # name) but is indexed here with res["zone_id"];
                        # presumably process_asc_file reports the grid name
                        # under that key - confirm, otherwise this raises
                        # KeyError.
                        zone_id = res["zone_id"]
                        results[zone_id]["dates"].append(res["date"])
                        results[zone_id]["values"].append(res["value"])

                completed_count += 1
                if completed_count % 100 == 0:
                    processing_elapsed = time.time() - processing_start
                    if processing_elapsed > 0:
                        files_per_minute = completed_count / processing_elapsed * 60
                        remaining_files = total_files - completed_count
                        eta_minutes = remaining_files / files_per_minute
                        logging.info(
                            f"Processed {completed_count} out of {total_files} files.\n"
                            f"Speed: {files_per_minute:.2f} files/min. "
                            f"ETA: {eta_minutes:.2f} minutes"
                        )
        except KeyboardInterrupt:
            logging.warning("KeyboardInterrupt received. Cancelling pending tasks...")
            executor.shutdown(wait=False, cancel_futures=True)
            raise
        except Exception:
            # Without this, leaving the `with` block would wait for every
            # queued file to be processed before the error reached the user.
            logging.error("Processing aborted. Cancelling pending tasks...")
            executor.shutdown(wait=False, cancel_futures=True)
            raise

    elapsed_time = time.time() - start
    logging.info(f"Interleaved processing completed in {elapsed_time:.2f} seconds")

    logging.info("Writing CSV files...")
    timeseries.write_results_to_csv(results, locations)

    end = time.time()
    elapsed_time = end - start
    logging.info(f"All Complete total time {elapsed_time:.2f} seconds")