import logging import time import os import csv import concurrent.futures from pathlib import Path from config import Config from modules import BatchNimrod, GenerateTimeseries logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" ) def process_pipeline(dat_file): # 1. Process DAT to ASC asc_file = batch._process_single_file(dat_file) if not asc_file: return None # 2. Extract data from ASC file_results = timeseries.process_asc_file(asc_file, locations) return file_results if __name__ == "__main__": os.makedirs(Path(Config.ASC_TOP_FOLDER), exist_ok=True) os.makedirs(Path(Config.COMBINED_FOLDER), exist_ok=True) locations = [] zones = set() # load zone inputs here for file in os.listdir(Path(Config.ZONE_FOLDER)): with open(Path(Config.ZONE_FOLDER, file), "r") as csvfile: reader = csv.reader(csvfile) header = next(reader) # Skip header row for row in reader: # Extract the relevant fields: 1K Grid, Easting, Northing, Zone grid_name = row[0] # 1k Grid name easting = int(row[1]) # Easting column northing = int(row[2]) # Northing column zone = int(row[3]) # ZoneID column locations.append([grid_name, easting, northing, zone]) zones.add(zone) logging.info(f"Count of 1km Grids: {len(locations)}") logging.info(f"Count of Zones: {len(zones)}") batch = BatchNimrod(Config) timeseries = GenerateTimeseries(Config, locations) start = time.time() logging.info( "Starting interleaved processing of DAT files and Timeseries generation" ) # Initialize results structure results = {loc[0]: {"dates": [], "values": []} for loc in locations} # Get list of DAT files dat_files = [ f for f in os.listdir(Path(Config.DAT_TOP_FOLDER)) if not f.startswith(".") ] total_files = len(dat_files) logging.info(f"Processing {total_files} files concurrently...") with concurrent.futures.ThreadPoolExecutor() as executor: future_to_file = { executor.submit(process_pipeline, dat_file): dat_file for dat_file in dat_files } completed_count = 0 try: for future in concurrent.futures.as_completed(future_to_file): file_results = future.result() if file_results: for res in file_results: zone_id = res["zone_id"] results[zone_id]["dates"].append(res["date"]) results[zone_id]["values"].append(res["value"]) completed_count += 1 if completed_count % 100 == 0: elapsed_time = time.time() - start files_per_minute = (completed_count / elapsed_time) * 60 remaining_files = total_files - completed_count eta_minutes = remaining_files / (files_per_minute / 60) / 60 logging.info(f"""Processed {completed_count} out of {total_files} files. Speed: {files_per_minute:.2f} files/min. ETA: {eta_minutes:.2f} minutes""") except KeyboardInterrupt: logging.warning("KeyboardInterrupt received. Cancelling pending tasks...") executor.shutdown(wait=False, cancel_futures=True) raise elapsed_time = time.time() - start logging.info(f"Interleaved processing completed in {elapsed_time:.2f} seconds") logging.info("Writing CSV files...") timeseries.write_results_to_csv(results, locations) end = time.time() elapsed_time = end - start logging.info(f"All Complete total time {elapsed_time:.2f} seconds")