"""数据预处理管道 — 将 ERA5 NetCDF 原始数据转换为 ML 就绪的序列数据 工作流: NetCDF → 日聚合 → 特征工程 → 风险标签 → 序列化 (NPZ) 八个核心函数: 1. load_era5_city — 加载并拼接城市月度 NetCDF 2. compute_daily_aggregates — 6h→日平均, K→°C, 列重命名 3. compute_relative_humidity — Magnus 公式计算相对湿度 4. compute_heat_index — NOAA Rothfusz 公式计算体感温度 5. build_features — 滚动均值、滞后、热浪检测、季节 6. compute_risk_labels — 四级风险标签 (0-3) 7. create_sequences — 滑动窗口构建 (X, y) 样本 8. preprocess_all — 遍历所有城市执行完整管线 """ import logging from pathlib import Path import numpy as np import pandas as pd import xarray as xr from scipy import stats from src.utils.config import ( CITIES, DATA_PROCESSED, DATA_RAW, LOOKBACK_DAYS, PREDICTION_WINDOWS, ) logger = logging.getLogger(__name__) # ============================================================================ # 函数 1: 加载 ERA5 数据 # ============================================================================ def load_era5_city(city: str) -> xr.Dataset: """加载指定城市的所有月度 ERA5 NetCDF 文件并沿时间维度拼接 Args: city: 城市键名 (如 "jiaozuo", "zhengzhou") Returns: 沿 valid_time 维度拼接并去重排序后的 xarray Dataset Raises: FileNotFoundError: 当数据目录不存在或未找到任何 NetCDF 文件时 """ era5_dir = Path(DATA_RAW) / "era5" / city if not era5_dir.exists(): raise FileNotFoundError( f"ERA5 数据目录不存在: {era5_dir}\n" f"请先运行 python -m src.data.download_era5 下载数据" ) nc_files = sorted(era5_dir.glob("era5_*.nc")) if not nc_files: raise FileNotFoundError( f"在 {era5_dir} 中未找到任何 ERA5 NetCDF 文件\n" f"请先运行 python -m src.data.download_era5 下载数据" ) logger.info("加载 %s 的 %d 个月度 NetCDF 文件...", city, len(nc_files)) # 使用 open_mfdataset 自动沿时间维度拼接 combined = xr.open_mfdataset( nc_files, combine="by_coords", engine="netcdf4", chunks=None, # 小区域数据直接加载到内存 ) # 确保时间维度已排序且无重复 if "valid_time" in combined.dims: combined = combined.sortby("valid_time") _, unique_idx = np.unique(combined["valid_time"], return_index=True) combined = combined.isel(valid_time=sorted(unique_idx)) t0 = str(combined["valid_time"].values[0])[:10] t1 = str(combined["valid_time"].values[-1])[:10] logger.info("已加载 %s: %d 时间步 (%s ~ %s)", city, combined.dims["valid_time"], t0, t1) return combined # ============================================================================ # 函数 2: 日聚合 # ============================================================================ def compute_daily_aggregates(ds: xr.Dataset) -> pd.DataFrame: """将 6 小时间隔的 ERA5 数据重采样为日平均值 执行以下转换: - 重采样: 6h (valid_time) → 1D (天) - 温度单位: K (开尔文) → °C (摄氏度,减 273.15) - 降水单位: m → mm (乘 1000) - 列重命名: t2m→temp_mean, d2m→dewpoint_mean, sp→pressure_mean, u10→u_wind, v10→v_wind, tp→precip Args: ds: xarray Dataset,包含 valid_time 维度和 ERA5 变量 Returns: DataFrame,索引为 valid_time,列为重命名后的日平均值 """ # ERA5 变量短名 → 目标列名 VAR_MAP = { "t2m": "temp_mean", "d2m": "dewpoint_mean", "sp": "pressure_mean", "u10": "u_wind", "v10": "v_wind", "tp": "precip", } # 检查数据集中实际存在的变量 available = {era5_name: col_name for era5_name, col_name in VAR_MAP.items() if era5_name in ds.variables} if not available: logger.warning("数据集中无预期气象变量,可用变量: %s", list(ds.variables)) return pd.DataFrame() # 选取可用变量后重采样为日平均 daily_ds = ds[list(available.keys())].resample(valid_time="1D").mean() # 转为 DataFrame df = daily_ds.to_dataframe().reset_index() # 重命名列 df = df.rename(columns={k: v for k, v in available.items()}) # 温度变量: K → °C for temp_col in ["temp_mean", "dewpoint_mean"]: if temp_col in df.columns: df[temp_col] = df[temp_col] - 273.15 # 降水: m (ERA5 日均累积) → mm if "precip" in df.columns: df["precip"] = df["precip"] * 1000.0 logger.info("日聚合完成: %d 天, %d 变量", len(df), len(available)) return df # ============================================================================ # 函数 3: 相对湿度 (Magnus 公式) # ============================================================================ def compute_relative_humidity(temp_c: np.ndarray, dewpoint_c: np.ndarray) -> np.ndarray: """使用 Magnus 公式从气温和露点温度计算相对湿度 公式: e_s(T) = exp(a*T / (b+T)) (饱和水汽压) e_a(Td) = exp(a*Td / (b+Td)) (实际水汽压) RH = 100 * e_a / e_s = 100 * exp(a*Td/(b+Td) - a*T/(b+T)) Args: temp_c: 气温数组 (°C) dewpoint_c: 露点温度数组 (°C) Returns: 相对湿度数组 (%),值域 [0, 100] """ a = 17.27 b = 237.7 # °C gamma = (a * dewpoint_c) / (b + dewpoint_c) - (a * temp_c) / (b + temp_c) rh = 100.0 * np.exp(gamma) return np.clip(rh, 0.0, 100.0) # ============================================================================ # 函数 4: 体感温度 (NOAA Heat Index) # ============================================================================ def compute_heat_index(temp_c: np.ndarray, rh: np.ndarray) -> np.ndarray: """使用 NOAA Rothfusz 回归公式计算体感温度 (Heat Index) 算法: 1. °C → °F 转换 2. T ≥ 80°F (≈26.7°C) 时使用 Rothfusz 回归公式 3. T < 80°F 时使用简化线性公式 4. 根据湿度条件进行修正 (NOAA 官方方法) 5. °F → °C 转换 Args: temp_c: 气温 (°C) rh: 相对湿度 (%) Returns: 体感温度 (°C) """ # °C → °F t_f = temp_c * 9.0 / 5.0 + 32.0 # 简化公式: 用于 T < 80°F 时 # HI = 0.5 * [T + 61.0 + (T - 68.0)*1.2 + RH*0.094] hi_simple = 0.5 * (t_f + 61.0 + (t_f - 68.0) * 1.2 + (rh * 0.094)) # Rothfusz 回归公式: 用于 T ≥ 80°F 时 hi_rothfusz = ( -42.379 + 2.04901523 * t_f + 10.14333127 * rh - 0.22475541 * t_f * rh - 6.83783e-3 * (t_f ** 2) - 5.481717e-2 * (rh ** 2) + 1.22874e-3 * (t_f ** 2) * rh + 8.5282e-4 * t_f * (rh ** 2) - 1.99e-6 * (t_f ** 2) * (rh ** 2) ) # 湿度修正 (仅对符合条件的元素计算,避免 NaN 产生的 RuntimeWarning) # 低湿修正: RH < 13% 且 80°F < T < 112°F mask_low = (rh < 13.0) & (t_f > 80.0) & (t_f < 112.0) adj_low = np.where( mask_low, ((13.0 - rh) / 4.0) * np.sqrt(np.maximum((17.0 - np.abs(t_f - 95.0)) / 17.0, 0.0)), 0.0, ) # 高湿修正: RH > 85% 且 80°F < T < 87°F mask_high = (rh > 85.0) & (t_f > 80.0) & (t_f < 87.0) adj_high = np.where( mask_high, ((rh - 85.0) / 10.0) * ((87.0 - t_f) / 5.0), 0.0, ) # 组合: 选择公式 → 应用修正 hi_f = np.where(t_f >= 80.0, hi_rothfusz, hi_simple) hi_f = np.where(mask_low, hi_f - adj_low, hi_f) hi_f = np.where(mask_high, hi_f + adj_high, hi_f) # 体感温度不能低于实际气温 hi_f = np.maximum(hi_f, t_f) # °F → °C return (hi_f - 32.0) * 5.0 / 9.0 # ============================================================================ # 函数 5: 特征工程 # ============================================================================ def build_features(df: pd.DataFrame) -> pd.DataFrame: """从日聚合气象数据构建 ML 模型特征 生成以下特征: - rh : 相对湿度 - heat_index : 体感温度 (NOAA) - temp_7d_avg : 7 天滚动平均气温 - temp_14d_avg : 14 天滚动平均气温 - temp_lag_0..7: 滞后 0, 1, 3, 7 天的气温 - heatwave : 热浪标记 (连续 3 天体感温度 > 35°C) - heatwave_strength : 热浪期间平均体感温度 - month : 月份 (1-12) - season : 季节 (1=冬/2=春/3=夏/4=秋) Args: df: 日聚合 DataFrame,至少包含 temp_mean 和 dewpoint_mean Returns: 添加了所有特征列的 DataFrame (含 NaN 在滞后特征起始位置) Raises: KeyError: 缺少必要列时 """ df = df.copy() # 验证必要列 required = {"temp_mean", "dewpoint_mean"} missing = required - set(df.columns) if missing: raise KeyError(f"缺少必要列: {missing}。请确认 compute_daily_aggregates 的输出") # 检测时间列 if "valid_time" in df.columns: time_col = "valid_time" elif "time" in df.columns: time_col = "time" else: raise KeyError("DataFrame 缺少时间列 ('valid_time' 或 'time')") # --- 推导特征 --- # 相对湿度 df["rh"] = compute_relative_humidity( df["temp_mean"].values, df["dewpoint_mean"].values ) # 体感温度 df["heat_index"] = compute_heat_index( df["temp_mean"].values, df["rh"].values ) # 滚动平均气温 (min_periods=1 避免起始 NaN) df["temp_7d_avg"] = df["temp_mean"].rolling(window=7, min_periods=1).mean() df["temp_14d_avg"] = df["temp_mean"].rolling(window=14, min_periods=1).mean() # 滞后气温 df["temp_lag_0"] = df["temp_mean"] df["temp_lag_1"] = df["temp_mean"].shift(1) df["temp_lag_3"] = df["temp_mean"].shift(3) df["temp_lag_7"] = df["temp_mean"].shift(7) # 热浪检测: 连续 3 天体感温度 > 35°C hot_mask = (df["heat_index"] > 35.0).astype(int) df["heatwave"] = ( hot_mask.rolling(window=3, min_periods=3).sum() >= 3 ).astype(int) # 热浪强度: 热浪期间的体感温度均值 (非热浪天填 0) df["heatwave_strength"] = np.where( df["heatwave"] == 1, df["heat_index"].rolling(window=3, min_periods=3).mean(), 0.0, ) # 时间特征 time_series = pd.to_datetime(df[time_col]) df["month"] = time_series.dt.month # 季节编码: 12,1,2=冬(1) 3,4,5=春(2) 6,7,8=夏(3) 9,10,11=秋(4) # month % 12 // 3 + 1 恰好满足此映射 df["season"] = (time_series.dt.month % 12) // 3 + 1 # 统一时间列名为 "time" if time_col != "time": df["time"] = df[time_col] logger.info("特征工程完成: %d 行 x %d 列", len(df), len(df.columns)) return df # ============================================================================ # 函数 6: 风险等级标签 # ============================================================================ def compute_risk_labels(df: pd.DataFrame) -> pd.DataFrame: """根据体感温度和热浪状态计算四级风险等级标签 等级定义: 0 (低) : heat_index < 32°C 1 (中) : 32°C ≤ heat_index < 35°C 2 (高) : 35°C ≤ heat_index < 38°C OR 热浪期间 3 (严重) : heat_index ≥ 38°C AND 热浪期间 Args: df: 包含 heat_index 和 heatwave 列的 DataFrame Returns: 添加了 risk_label 列 (int64) 的 DataFrame """ df = df.copy() hi = df["heat_index"].values hw = df["heatwave"].values # 0 或 1 # 初始化全为 0 (低风险) risk = np.zeros(len(df), dtype=np.int64) # 等级 1: 中风险 risk = np.where((hi >= 32.0) & (hi < 35.0), 1, risk) # 等级 2: 高风险 (体感温度达到阈值 OR 热浪期间) risk = np.where(((hi >= 35.0) & (hi < 38.0)) | (hw == 1), 2, risk) # 等级 3: 严重风险 (体感温度极高 AND 热浪期间) risk = np.where((hi >= 38.0) & (hw == 1), 3, risk) df["risk_label"] = risk # 统计各等级分布 label_names = {0: "低", 1: "中", 2: "高", 3: "严重"} for level in range(4): count = int((risk == level).sum()) pct = count / len(df) * 100 logger.info(" 等级 %d (%s): %d 天 (%.1f%%)", level, label_names[level], count, pct) return df # ============================================================================ # 函数 7: 创建 ML 序列 # ============================================================================ def create_sequences( df: pd.DataFrame, lookback: int = LOOKBACK_DAYS, horizons: dict | None = None, ) -> tuple[np.ndarray, np.ndarray, list[str]]: """从特征 DataFrame 创建监督学习时间序列样本 对每个时间步 i (从 lookback 到数据末尾): X[i - lookback] = 特征矩阵 [i-lookback, i) 行,所有特征列 y[i - lookback] = [ 未来 3 天风险等级众数, 未来 7 天风险等级众数, 未来 30 天风险等级众数, ] 排除的特征列: time, valid_time, city, city_name, risk_label, month, season Args: df: 包含特征列和 risk_label 的 DataFrame lookback: 输入序列天数 horizons: 预测窗口字典 {"short": N, "medium": N, "long": N} Returns: X: float32 数组 (N_samples, lookback, n_features) y: int64 数组 (N_samples, 3),列对应 [short, medium, long] feature_cols: 用于 X 的特征列名列表 """ if horizons is None: horizons = PREDICTION_WINDOWS # 排除非特征列 exclude_cols = {"time", "valid_time", "city", "city_name", "risk_label", "month", "season"} feature_cols = [c for c in df.columns if c not in exclude_cols] # 仅保留数值型特征 feature_cols = [c for c in feature_cols if pd.api.types.is_numeric_dtype(df[c])] logger.info("序列特征 (%d): %s", len(feature_cols), feature_cols) n_total = len(df) horizon_order = ["short", "medium", "long"] horizon_values = [horizons[h] for h in horizon_order] max_horizon = max(horizon_values) X_list: list[np.ndarray] = [] y_list: list[list[int]] = [] for i in range(lookback, n_total): # 输入窗口: 前 lookback 天 x_win = df.iloc[i - lookback : i][feature_cols].values.astype(np.float32) X_list.append(x_win) # 目标: 各预测窗口的风险等级众数 y_row: list[int] = [] for h in horizon_values: end_idx = min(i + h, n_total) if end_idx > i: future = df.iloc[i:end_idx]["risk_label"].values mode_result = stats.mode(future, keepdims=False) # mode 可能是 0-d array 或标量 y_row.append(int(np.atleast_1d(mode_result.mode)[0])) else: y_row.append(int(df.iloc[-1]["risk_label"])) y_list.append(y_row) X = np.array(X_list, dtype=np.float32) y = np.array(y_list, dtype=np.int64) logger.info("序列创建完成: X%s, y%s", X.shape, y.shape) # 打印各窗口标签分布 for j, name in enumerate(horizon_order): values, counts = np.unique(y[:, j], return_counts=True) dist = {int(v): int(c) for v, c in zip(values, counts)} logger.info(" y_%s 分布: %s", name, dist) return X, y, feature_cols # ============================================================================ # 函数 8: 完整预处理管线 # ============================================================================ def preprocess_all() -> None: """执行完整的数据预处理管线 对配置中每个城市依次执行: 1. load_era5_city — 加载 NetCDF 2. compute_daily_aggregates — 日聚合 3. build_features — 特征工程 4. compute_risk_labels — 风险标签 5. dropna → 保存 feature CSV 6. create_sequences — 构建序列 → 保存 NPZ 最后合并所有城市数据,保存 combined CSV 和 NPZ 若 ERA5 数据尚未下载,会记录警告并跳过对应城市。 """ DATA_PROCESSED.mkdir(parents=True, exist_ok=True) combined_dfs: list[pd.DataFrame] = [] # 记录第一个城市的特征列名,用于合并 NPZ saved_feature_cols: list[str] = [] for city_key, city_info in CITIES.items(): city_name = city_info["name"] logger.info("=" * 60) logger.info(">>> 处理城市: %s (%s)", city_name, city_key) # ---- 1. 加载 ---- try: ds = load_era5_city(city_key) except FileNotFoundError as e: logger.warning("跳过 %s: %s", city_key, e) continue # ---- 2. 日聚合 ---- df = compute_daily_aggregates(ds) if df.empty: logger.warning("跳过 %s: 日聚合结果为空", city_key) continue # 添加城市标识列 df["city"] = city_key df["city_name"] = city_name # ---- 3. 特征工程 ---- df = build_features(df) # ---- 4. 风险标签 ---- df = compute_risk_labels(df) # ---- 5. 删除含 NaN 的行并保存 ---- df_clean = df.dropna().reset_index(drop=True) csv_path = DATA_PROCESSED / f"features_{city_key}.csv" df_clean.to_csv(csv_path, index=False, encoding="utf-8-sig") logger.info("已保存特征 CSV: %s (%d 行 x %d 列)", csv_path.name, len(df_clean), len(df_clean.columns)) # ---- 6. 创建序列 ---- X, y, feature_cols = create_sequences(df_clean) if not saved_feature_cols: saved_feature_cols = feature_cols npz_path = DATA_PROCESSED / f"sequences_{city_key}.npz" np.savez_compressed( npz_path, X=X, y=y, feature_cols=np.array(feature_cols, dtype=object), ) logger.info("已保存序列 NPZ: %s (X%s, y%s)", npz_path.name, X.shape, y.shape) combined_dfs.append(df_clean) # ---- 合并所有城市 ---- if not combined_dfs: logger.warning("没有城市完成处理。请先下载 ERA5 数据") logger.warning("运行: python -m src.data.download_era5") return # 合并 CSV combined = pd.concat(combined_dfs, ignore_index=True) combined_csv = DATA_PROCESSED / "features_combined.csv" combined.to_csv(combined_csv, index=False, encoding="utf-8-sig") logger.info("已保存合并特征 CSV: %s (%d 行)", combined_csv.name, len(combined)) # 合并 NPZ all_X, all_y = [], [] for city_key in CITIES: npz_path = DATA_PROCESSED / f"sequences_{city_key}.npz" if npz_path.exists(): data = np.load(npz_path, allow_pickle=True) all_X.append(data["X"]) all_y.append(data["y"]) if all_X and saved_feature_cols: combined_X = np.concatenate(all_X, axis=0) combined_y = np.concatenate(all_y, axis=0) combined_npz = DATA_PROCESSED / "sequences_combined.npz" np.savez_compressed( combined_npz, X=combined_X, y=combined_y, feature_cols=np.array(saved_feature_cols, dtype=object), ) logger.info("已保存合并序列 NPZ: %s (X%s, y%s)", combined_npz.name, combined_X.shape, combined_y.shape) logger.info("=" * 60) logger.info("数据预处理管线全部完成!") # ============================================================================ # CLI 入口 # ============================================================================ if __name__ == "__main__": logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) preprocess_all()