Files
Jake-Pullen a43edb1148 Extraction streamlining (#3)
* feat:  added the extraction process into the main multi threaded loop
Also added a warning when the app finds existing CSV files in the combined folder

* fix: 🐛 Fixed time calculations for ETA & Completion
2025-12-12 19:56:14 +00:00

73 lines
2.4 KiB
Python
Executable File

import tarfile
import gzip
import shutil
import os
from pathlib import Path
import concurrent.futures
class Extract:
    """Unpack .tar archives into per-archive folders and inflate the .dat.gz files inside.

    The config object passed to the constructor must expose:
      * TAR_TOP_FOLDER - directory containing the .tar files
      * GZ_TOP_FOLDER  - directory that receives the extracted tar contents
      * DAT_TOP_FOLDER - directory that receives the decompressed .dat files
      * delete_tar_after_processing - remove each .tar once it is extracted
      * delete_gz_after_processing  - remove each .gz once it is decompressed
    """

    def __init__(self, Config):
        self.config = Config

    def extract_tar_batch(self, tar_files):
        """Extract every archive in *tar_files* into its own folder under GZ_TOP_FOLDER.

        Args:
            tar_files: iterable of file names (str) relative to TAR_TOP_FOLDER.

        Raises:
            Whatever tarfile / os raise for an unreadable or unsafe archive;
            errors are deliberately not swallowed so a bad archive is never
            deleted or silently skipped.
        """
        for tar_file in tar_files:
            tar_path = Path(self.config.TAR_TOP_FOLDER, tar_file)

            # Strip only the trailing ".tar"; a blanket replace() would also
            # eat a ".tar" that appears in the middle of the name.
            if tar_file.endswith(".tar"):
                folder_name = tar_file[: -len(".tar")]
            else:
                folder_name = tar_file.replace(".tar", "")
            extract_folder = Path(self.config.GZ_TOP_FOLDER, folder_name)
            # parents=True so a missing GZ_TOP_FOLDER does not abort the batch.
            extract_folder.mkdir(parents=True, exist_ok=True)

            with tarfile.open(tar_path, "r") as tar:
                if hasattr(tarfile, "data_filter"):
                    # Refuse members that would escape extract_folder
                    # (absolute paths, "..", links pointing outside).
                    tar.extractall(path=extract_folder, filter="data")
                else:
                    # Older interpreters without extraction filters.
                    tar.extractall(path=extract_folder)

            # Delete only after the archive handle is closed.
            if self.config.delete_tar_after_processing:
                os.remove(tar_path)

    def process_single_gz(self, gz_path, dat_path):
        """Decompress *gz_path* into *dat_path*.

        The data is written to a sibling "<dat_path>.part" file and renamed
        into place only once decompression has finished, so a failed run never
        leaves a truncated file that looks like a valid result.

        Returns:
            True when *dat_path* was written, False when extraction failed
            (the error is printed, not raised).
        """
        part_path = os.fspath(dat_path) + ".part"
        try:
            with gzip.open(gz_path, "rb") as f_in, open(part_path, "wb") as f_out:
                shutil.copyfileobj(f_in, f_out)
            os.replace(part_path, dat_path)
        except Exception as e:
            print(f"Error extracting {gz_path}: {e}")
            try:
                os.remove(part_path)
            except OSError:
                pass  # nothing was written, or it is already gone
            return False

        if self.config.delete_gz_after_processing:
            try:
                os.remove(gz_path)
            except OSError as e:
                # The data itself was extracted fine; only the cleanup failed.
                print(f"Could not delete {gz_path}: {e}")
        return True

    def extract_gz_batch(self):
        """Decompress every *.dat.gz under GZ_TOP_FOLDER into DAT_TOP_FOLDER.

        Files are processed concurrently in a thread pool. When everything
        succeeds the whole GZ_TOP_FOLDER is wiped and recreated for the next
        batch. When any file fails, the successfully processed sources are
        removed but the failed ones are kept so they can be inspected or
        retried instead of being lost.

        NOTE(review): output names are flattened into DAT_TOP_FOLDER, so two
        archives containing the same file name would write the same target.
        """
        gz_root = Path(self.config.GZ_TOP_FOLDER)
        dat_root = Path(self.config.DAT_TOP_FOLDER)
        # Without the output folder every extraction would fail.
        dat_root.mkdir(parents=True, exist_ok=True)

        gz_tasks = []
        for root, _, files in os.walk(gz_root):
            for file in files:
                # only handle .gz files
                if not file.endswith(".dat.gz"):
                    continue
                # Drop just the trailing ".gz".
                dat_name = file[: -len(".gz")]
                gz_tasks.append((Path(root, file), dat_root / dat_name))

        print(f"Extracting {len(gz_tasks)} gz files concurrently...")
        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = [
                executor.submit(self.process_single_gz, gz_path, dat_path)
                for gz_path, dat_path in gz_tasks
            ]
        # Leaving the executor context waits for every task to finish.

        failed = {
            gz_path
            for (gz_path, _), future in zip(gz_tasks, futures)
            if future.exception() is not None or future.result() is False
        }

        if failed:
            # Clear out what was handled, keep what was not.
            for gz_path, _ in gz_tasks:
                if gz_path in failed:
                    continue
                try:
                    os.remove(gz_path)
                except FileNotFoundError:
                    pass  # already removed by process_single_gz
                except OSError as e:
                    print(f"Could not delete {gz_path}: {e}")
            print(
                f"processing complete but {len(failed)} gz file(s) failed to extract "
                f"and were kept in {self.config.GZ_TOP_FOLDER}"
            )
            return

        try:
            shutil.rmtree(self.config.GZ_TOP_FOLDER)
            # Recreate the folder for the next batch
            Path(self.config.GZ_TOP_FOLDER).mkdir(exist_ok=True)
            print("processing complete and GZ files deleted")
        except Exception as e:
            print(str(e))
            print(
                f"processing complete but GZ folder delete failed. Please delete manually ({self.config.GZ_TOP_FOLDER})"
            )