feat: 添加探索性数据分析和多时间尺度高温风险预测模型

This commit is contained in:
2026-05-26 20:09:07 +08:00
parent a0478b0b11
commit eeab4d1330
3 changed files with 346 additions and 0 deletions
+82
View File
@@ -0,0 +1,82 @@
"""LSTM + Multi-Head Attention 多时间尺度预警模型"""
import torch
import torch.nn as nn
import torch.nn.functional as F
from src.utils.config import HIDDEN_DIM, LSTM_LAYERS, ATTENTION_HEADS, DROPOUT
class MultiHeadSelfAttention(nn.Module):
"""多头自注意力层"""
def __init__(self, embed_dim: int, num_heads: int = ATTENTION_HEADS,
dropout: float = DROPOUT):
super().__init__()
assert embed_dim % num_heads == 0, f"embed_dim {embed_dim} must be divisible by num_heads {num_heads}"
self.embed_dim = embed_dim
self.num_heads = num_heads
self.head_dim = embed_dim // num_heads
self.qkv = nn.Linear(embed_dim, 3 * embed_dim)
self.out_proj = nn.Linear(embed_dim, embed_dim)
self.dropout = nn.Dropout(dropout)
def forward(self, x: torch.Tensor) -> torch.Tensor:
B, T, D = x.shape
qkv = self.qkv(x).reshape(B, T, 3, self.num_heads, self.head_dim)
qkv = qkv.permute(2, 0, 3, 1, 4) # (3, B, heads, T, head_dim)
q, k, v = qkv[0], qkv[1], qkv[2]
scale = self.head_dim ** -0.5
attn = (q @ k.transpose(-2, -1)) * scale
attn = F.softmax(attn, dim=-1)
attn = self.dropout(attn)
out = attn @ v # (B, heads, T, head_dim)
out = out.permute(0, 2, 1, 3).reshape(B, T, D)
return self.out_proj(out)
class HeatRiskPredictor(nn.Module):
"""LSTM-Attention 多时间尺度高温风险预测模型
Input: (B, T, input_dim) sequence of meteorological features
Output: dict with 'short', 'medium', 'long' keys, each (B, 4) logits
"""
def __init__(self, input_dim: int, hidden_dim: int = HIDDEN_DIM,
num_layers: int = LSTM_LAYERS, num_classes: int = 4):
super().__init__()
self.input_proj = nn.Linear(input_dim, hidden_dim)
self.lstm = nn.LSTM(
hidden_dim, hidden_dim, num_layers,
batch_first=True, bidirectional=True, dropout=DROPOUT if num_layers > 1 else 0,
)
lstm_out_dim = hidden_dim * 2 # bidirectional
self.attention = MultiHeadSelfAttention(lstm_out_dim)
self.lstm_proj = nn.Linear(lstm_out_dim, hidden_dim)
self.head_short = nn.Sequential(
nn.Linear(hidden_dim, hidden_dim // 2),
nn.ReLU(), nn.Dropout(DROPOUT),
nn.Linear(hidden_dim // 2, num_classes),
)
self.head_medium = nn.Sequential(
nn.Linear(hidden_dim, hidden_dim // 2),
nn.ReLU(), nn.Dropout(DROPOUT),
nn.Linear(hidden_dim // 2, num_classes),
)
self.head_long = nn.Sequential(
nn.Linear(hidden_dim, hidden_dim // 2),
nn.ReLU(), nn.Dropout(DROPOUT),
nn.Linear(hidden_dim // 2, num_classes),
)
def forward(self, x: torch.Tensor) -> dict:
x = self.input_proj(x)
lstm_out, _ = self.lstm(x)
attn_out = self.attention(lstm_out)
last_hidden = self.lstm_proj(attn_out[:, -1, :])
return {
"short": self.head_short(last_hidden),
"medium": self.head_medium(last_hidden),
"long": self.head_long(last_hidden),
}
+91
View File
@@ -0,0 +1,91 @@
"""XGBoost 基线模型 — 三个独立分类器用于多时间尺度对比"""
import numpy as np
import xgboost as xgb
from sklearn.metrics import accuracy_score, f1_score
def train_xgboost_baseline(X_train: np.ndarray, y_train: np.ndarray,
X_test: np.ndarray, y_test: np.ndarray) -> dict:
"""
训练三个独立的 XGBoost 分类器 (短/中/长期)。
Args:
X_train: (N, T, D) 训练特征,自动展平为 (N, T*D)
y_train: (N, 3) 标签矩阵,列顺序: short, medium, long
X_test: (N_test, T, D) 测试特征
y_test: (N_test, 3) 测试标签
Returns:
dict: {
"short": {"model": ..., "accuracy": ..., "f1_macro": ..., "predictions": ...},
"medium": {...},
"long": {...},
}
"""
# 展平时序特征为二维
N_train, T, D = X_train.shape
X_train_flat = X_train.reshape(N_train, T * D)
N_test = X_test.shape[0]
X_test_flat = X_test.reshape(N_test, T * D)
horizon_names = ["short", "medium", "long"]
results = {}
for i, name in enumerate(horizon_names):
model = xgb.XGBClassifier(
n_estimators=200,
max_depth=6,
learning_rate=0.05,
subsample=0.8,
colsample_bytree=0.8,
objective="multi:softmax",
num_class=4,
eval_metric="mlogloss",
random_state=42,
device="cuda",
)
model.fit(
X_train_flat, y_train[:, i],
eval_set=[(X_test_flat, y_test[:, i])],
verbose=False,
)
y_pred = model.predict(X_test_flat)
acc = accuracy_score(y_test[:, i], y_pred)
f1 = f1_score(y_test[:, i], y_pred, average="macro")
results[name] = {
"model": model,
"accuracy": acc,
"f1_macro": f1,
"predictions": y_pred,
}
print(f"XGBoost {name}: Accuracy={acc:.4f}, F1 Macro={f1:.4f}")
return results
def train_xgboost_single(X_train: np.ndarray, y_train: np.ndarray,
X_test: np.ndarray, y_test: np.ndarray,
horizon_idx: int = 0) -> dict:
"""训练单个时间尺度的XGBoost模型(用于单独调用)"""
N_train, T, D = X_train.shape
X_train_flat = X_train.reshape(N_train, T * D)
N_test = X_test.shape[0]
X_test_flat = X_test.reshape(N_test, T * D)
model = xgb.XGBClassifier(
n_estimators=200, max_depth=6, learning_rate=0.05,
subsample=0.8, colsample_bytree=0.8,
objective="multi:softmax", num_class=4,
eval_metric="mlogloss", random_state=42,
device="cuda",
)
model.fit(X_train_flat, y_train[:, horizon_idx], verbose=False)
y_pred = model.predict(X_test_flat)
return {
"model": model,
"accuracy": accuracy_score(y_test[:, horizon_idx], y_pred),
"f1_macro": f1_score(y_test[:, horizon_idx], y_pred, average="macro"),
"predictions": y_pred,
}