"""从 Copernicus CDS 下载 ERA5-Land 再分析数据(逐月,支持并行)""" import logging import time from pathlib import Path from concurrent.futures import ThreadPoolExecutor, as_completed import cdsapi from src.utils.config import ( CITIES, DATA_RAW, ERA5_START_YEAR, ERA5_END_YEAR, ERA5_VARIABLES, ) logger = logging.getLogger(__name__) def build_request(city: str, year: int, month: int) -> dict: lat = CITIES[city]["lat"] lon = CITIES[city]["lon"] return { "product_type": ["reanalysis"], "format": "netcdf", "variable": ERA5_VARIABLES, "year": [str(year)], "month": [f"{month:02d}"], "day": [f"{d:02d}" for d in range(1, 32)], "time": [f"{h:02d}:00" for h in [0, 6, 12, 18]], "area": [lat + 0.5, lon - 0.5, lat - 0.5, lon + 0.5], } def download_one_month(city: str, year: int, month: int) -> bool: """下载单月数据,返回 True 表示成功""" client = cdsapi.Client() out_dir = Path(DATA_RAW) / "era5" / city out_dir.mkdir(parents=True, exist_ok=True) out_path = out_dir / f"era5_{city}_{year}_{month:02d}.nc" if out_path.exists(): return True # 已存在,跳过 request = build_request(city, year, month) for attempt in range(1, 6): try: logger.info("请求 %s %d-%02d (第 %d/5 次)", city, year, month, attempt) client.retrieve("reanalysis-era5-land", request, str(out_path)) if out_path.exists() and out_path.stat().st_size > 0: return True else: logger.warning("文件为空 %s %d-%02d,重试", city, year, month) except Exception as e: delay = 60 * attempt logger.warning("失败 %s %d-%02d (第 %d/5 次): %s,%ds 后重试", city, year, month, attempt, str(e)[:100], delay) if attempt < 5: time.sleep(delay) return False def download_city(city: str, start_year: int = ERA5_START_YEAR, end_year: int = ERA5_END_YEAR, max_workers: int = 1): """并行下载(3线程),兼顾速度和 CDS 限流""" name = CITIES[city]["name"] tasks = [(city, y, m) for y in range(start_year, end_year + 1) for m in range(1, 13)] total = len(tasks) done = 0 fail = 0 # 先统计已存在的 existed = sum(1 for _, y, m in tasks if (Path(DATA_RAW) / "era5" / city / f"era5_{city}_{y}_{m:02d}.nc").exists()) if existed > 0: logger.info("%s: %d/%d 已存在,跳过", name, existed, total) done = existed with ThreadPoolExecutor(max_workers=max_workers) as pool: futures = {pool.submit(download_one_month, c, y, m): (y, m) for c, y, m in tasks if not (Path(DATA_RAW) / "era5" / city / f"era5_{c}_{y}_{m:02d}.nc").exists()} for f in as_completed(futures): y, m = futures[f] if f.result(): done += 1 else: fail += 1 if (done + fail) % 10 == 0 or (done + fail) == (total - existed): logger.info("%s: %d/%d 完成 (%d 失败)", name, done + existed, total, fail) if __name__ == "__main__": logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", ) for city_name in CITIES: download_city(city_name)