2855 lines
129 KiB
Python
2855 lines
129 KiB
Python
#!/usr/bin/env python
|
||
# -*- coding: utf-8 -*-
|
||
|
||
"""
|
||
未来多时间点降雨预报系统
|
||
功能:
|
||
1. 预测未来一小时、两小时、三小时是否会降雨
|
||
2. 判断降雨后,分析未来一小时、两小时、三小时的降雨量
|
||
"""
|
||
|
||
import numpy as np
|
||
import pandas as pd
|
||
import matplotlib.pyplot as plt
|
||
import seaborn as sns
|
||
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor, GradientBoostingClassifier, GradientBoostingRegressor
|
||
from sklearn.linear_model import LogisticRegression, Ridge
|
||
from sklearn.model_selection import train_test_split, GridSearchCV, StratifiedKFold, cross_val_predict
|
||
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
|
||
from sklearn.metrics import roc_curve, auc, precision_recall_curve
|
||
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
|
||
from sklearn.preprocessing import StandardScaler, RobustScaler
|
||
from sklearn.calibration import CalibratedClassifierCV
|
||
from sklearn.base import clone
|
||
from sklearn.utils import resample
|
||
import joblib
|
||
import os
|
||
import warnings
|
||
import datetime
|
||
from datetime import timedelta
|
||
import time
|
||
import argparse
|
||
import traceback
|
||
warnings.filterwarnings('ignore')
|
||
|
||
class RainfallTwoStageModel:
|
||
"""
|
||
两阶段降雨预测模型:
|
||
- 第一阶段: 分类器预测是否会下雨(二分类)
|
||
- 第二阶段: 对预测有雨的样本,先分类降雨强度,再使用分层回归模型预测具体降雨量
|
||
"""
|
||
def __init__(self, debug=True, clf_model_type='ensemble', reg_model_type='gbdt', output_dir='figures'):
|
||
# 初始化模型组件
|
||
self.binary_classifier = None # 二分类器(判断是否下雨)
|
||
self.regressor = None # 回归器(单一模型,用作备选)
|
||
self.regressors = {} # 分层回归器,针对不同降雨强度
|
||
self.rain_category_classifier = None # 降雨类别分类器(小、中、大、暴雨)
|
||
self.scaler = None # 特征缩放器
|
||
self.threshold = 0.5 # 分类阈值
|
||
self.feature_importances_clf = None # 分类器特征重要性
|
||
self.feature_importances_reg = None # 回归器特征重要性
|
||
self.feature_names = None # 特征名称
|
||
self.debug = debug # 调试输出
|
||
self.clf_model_type = clf_model_type # 分类器类型
|
||
self.reg_model_type = reg_model_type # 回归器类型
|
||
self.log_transform = True # 是否对降雨量进行对数变换
|
||
self.feature_stats = {} # 存储特征统计信息
|
||
self.stratified_regression = True # 是否使用分层回归
|
||
self.rain_thresholds = [0.5, 3.0, 10.0] # 降雨类别阈值:小雨、中雨、大雨、暴雨
|
||
self.heavy_rain_factor = 3.0 # 大雨预测安全系数
|
||
self.extreme_rain_factor = 5.0 # 暴雨预测安全系数
|
||
self.time_points = [1, 2, 3] # 添加时间点属性
|
||
|
||
# 创建图表输出目录
|
||
self.output_dir = output_dir
|
||
if not os.path.exists(output_dir):
|
||
os.makedirs(output_dir)
|
||
|
||
def debug_print(self, message):
|
||
"""调试输出"""
|
||
if self.debug:
|
||
print(f"[调试] {message}")
|
||
|
||
def preprocess_data(self, data, silent=False):
|
||
"""
|
||
数据预处理:特征工程与数据清洗,增强时间序列特征
|
||
|
||
参数:
|
||
data: DataFrame 原始数据
|
||
silent: bool 是否抑制输出信息
|
||
|
||
返回:
|
||
df: DataFrame 处理后的数据
|
||
"""
|
||
df = data.copy()
|
||
|
||
#print("aaa")
|
||
#print(df)
|
||
# 确保datetime列是datetime格式
|
||
if 'datetime' in df.columns:
|
||
df['datetime'] = pd.to_datetime(df['datetime'])
|
||
|
||
# 数据类型转换,确保所有特征都是数值型
|
||
for col in df.columns:
|
||
if col not in ['datetime', 'timestamp', 'date', 'time']:
|
||
try:
|
||
df[col] = pd.to_numeric(df[col], errors='coerce')
|
||
except:
|
||
if not silent:
|
||
print(f"警告: 无法将列 '{col}' 转换为数值类型")
|
||
|
||
# 处理缺失值
|
||
if 'rain' in df.columns:
|
||
df = df.dropna(subset=['rain']) # 删除目标变量缺失的行
|
||
df = df.fillna(method='ffill').fillna(method='bfill') # 填充其他缺失值
|
||
|
||
# 温度单位检查(保留原始单位)
|
||
|
||
# 保存或使用特征统计信息
|
||
if not self.feature_stats:
|
||
# 训练期间:计算并保存统计数据
|
||
for var in ['rh', 'PWV', 'Temp', 'Press', 'ZTD']:
|
||
if var in df.columns:
|
||
self.feature_stats[f"{var}_mean"] = df[var].mean()
|
||
self.feature_stats[f"{var}_std"] = df[var].std()
|
||
|
||
#print(df.head(10))
|
||
# 计算全面的时间序列特征
|
||
# 1. 异常值(相对平均值的标准化偏差)
|
||
for var in ['rh', 'PWV', 'Temp', 'Press', 'ZTD']:
|
||
if var in df.columns:
|
||
# 使用保存的均值或计算新均值
|
||
mean_value = self.feature_stats.get(f"{var}_mean", df[var].mean())
|
||
std_value = self.feature_stats.get(f"{var}_std", df[var].std())
|
||
if std_value > 0:
|
||
df[f'{var}_anomaly'] = (df[var] - mean_value) / std_value
|
||
else:
|
||
df[f'{var}_anomaly'] = 0
|
||
|
||
# 2. 多时间尺度的变化率
|
||
for var in ['rh', 'PWV', 'ZTD', 'Press', 'Temp']:
|
||
if var in df.columns:
|
||
# 短期变化(1小时,3小时)
|
||
df[f'{var}_1h_change'] = df[var].diff(1).fillna(0)
|
||
df[f'{var}_3h_change'] = df[var].diff(3).fillna(0)
|
||
|
||
# 中期变化(6小时)
|
||
df[f'{var}_6h_change'] = df[var].diff(6).fillna(0)
|
||
|
||
# 长期变化(12小时)
|
||
df[f'{var}_12h_change'] = df[var].diff(12).fillna(0)
|
||
|
||
# 变化加速度(变化率的变化率)
|
||
df[f'{var}_change_accel'] = df[f'{var}_1h_change'].diff(1).fillna(0)
|
||
|
||
# 相对变化(百分比)
|
||
if df[var].mean() != 0:
|
||
df[f'{var}_rel_change'] = df[f'{var}_1h_change'] / df[var].shift(1).fillna(df[var].mean())
|
||
|
||
# 急剧变化指标(针对极端事件)
|
||
var_std = df[f'{var}_1h_change'].std()
|
||
if var_std > 0:
|
||
df[f'{var}_rapid_change'] = (np.abs(df[f'{var}_1h_change']) > (2 * var_std)).astype(int)
|
||
else:
|
||
df[f'{var}_rapid_change'] = 0
|
||
|
||
# 3. 时间窗口特征 - 多种滚动窗口
|
||
for var in ['PWV', 'rh', 'Temp', 'Press', 'ZTD']:
|
||
if var in df.columns:
|
||
# 短窗口(3小时滚动平均)
|
||
df[f'{var}_3h_mean'] = df[var].rolling(window=3, min_periods=1).mean()
|
||
|
||
# 中窗口(6小时滚动平均和标准差)
|
||
df[f'{var}_6h_mean'] = df[var].rolling(window=6, min_periods=1).mean()
|
||
df[f'{var}_6h_std'] = df[var].rolling(window=6, min_periods=1).std().fillna(0)
|
||
|
||
# 长窗口(12小时滚动平均)
|
||
df[f'{var}_12h_mean'] = df[var].rolling(window=12, min_periods=1).mean()
|
||
|
||
# 趋势指标:当前值与滚动平均的差异
|
||
df[f'{var}_vs_6h'] = df[var] - df[f'{var}_6h_mean']
|
||
|
||
# 最大/最小值指标
|
||
df[f'{var}_12h_max'] = df[var].rolling(window=12, min_periods=1).max()
|
||
df[f'{var}_12h_min'] = df[var].rolling(window=12, min_periods=1).min()
|
||
df[f'{var}_12h_range'] = df[f'{var}_12h_max'] - df[f'{var}_12h_min']
|
||
|
||
# 4. 天气模式指标和特征组合
|
||
if 'PWV' in df.columns and 'rh' in df.columns:
|
||
# 湿度可用性指数
|
||
df['PWV_rh'] = df['PWV'] * df['rh'] / 100
|
||
|
||
# 湿度变化指标
|
||
if 'PWV_1h_change' in df.columns and 'rh_1h_change' in df.columns:
|
||
df['moisture_change'] = df['PWV_1h_change'] * df['rh_1h_change']
|
||
|
||
# 累积湿度变化
|
||
df['PWV_cumul_change'] = df['PWV'].diff(1).rolling(window=6).sum().fillna(0)
|
||
df['rh_cumul_change'] = df['rh'].diff(1).rolling(window=6).sum().fillna(0)
|
||
|
||
if 'PWV' in df.columns and 'Temp' in df.columns:
|
||
# 温度-湿度相互作用
|
||
df['PWV_Temp'] = df['PWV'] * df['Temp']
|
||
|
||
# 饱和潜力
|
||
temp_exp = np.exp(17.27 * (df['Temp']-273.15) / ((df['Temp']-273.15) + 237.3)) # 转换为摄氏度计算
|
||
df['saturation_index'] = df['PWV'] / (0.1 * temp_exp)
|
||
|
||
if 'Press' in df.columns and 'Press_1h_change' in df.columns:
|
||
# 气压变化强度(较大的变化表示更强的天气系统)
|
||
df['Press_change_intensity'] = df['Press_1h_change'].abs()
|
||
|
||
# 气压系统变化:下降的气压往往与降雨系统有关
|
||
df['Press_falling'] = (df['Press_1h_change'] < 0).astype(int)
|
||
|
||
# 气压快速下降指标(与强降水相关)
|
||
press_quantile = df['Press_1h_change'].abs().quantile(0.75)
|
||
df['Press_rapid_fall'] = ((df['Press_1h_change'] < 0) &
|
||
(df['Press_1h_change'].abs() > press_quantile)).astype(int)
|
||
|
||
if 'ZTD' in df.columns and 'PWV' in df.columns:
|
||
# ZTD-PWV比率(大气条件指标)
|
||
df['ZTD_PWV_ratio'] = df['ZTD'] / df['PWV'].replace(0, np.nan).fillna(df['PWV'].mean())
|
||
|
||
# 组合变化指标
|
||
if 'ZTD_1h_change' in df.columns and 'PWV_1h_change' in df.columns:
|
||
df['ZTD_PWV_change'] = df['ZTD_1h_change'] * df['PWV_1h_change']
|
||
|
||
# 湿度不稳定指标
|
||
pwv_mean = df['PWV_6h_mean'].mean()
|
||
if pwv_mean > 0:
|
||
df['moisture_instability'] = df['PWV_6h_std'] / df['PWV_6h_mean']
|
||
else:
|
||
df['moisture_instability'] = 0
|
||
|
||
# 5. 基于多变量的降雨潜力指标
|
||
if all(var in df.columns for var in ['PWV', 'rh', 'Temp', 'Press_1h_change']):
|
||
# 组合降雨指标
|
||
df['rain_potential'] = (df['PWV'] * df['rh'] / 100) * (0.1 - df['Press_1h_change']) / (df['Temp'] + 10)
|
||
|
||
# 强降雨潜力指标
|
||
pwv_mean = df['PWV'].mean() + 0.1 # 避免除以零
|
||
df['heavy_rain_potential'] = df['PWV'] * (df['rh'] / 100) * (df['Press_rapid_fall'] + 1) * (
|
||
1 + np.abs(df['PWV_1h_change']) / pwv_mean)
|
||
|
||
# 降雨持续性指标
|
||
df['rain_persistence'] = df['PWV_cumul_change'] * df['rh_cumul_change'] * (
|
||
1 + df['Press_change_intensity'])
|
||
|
||
# 创建二值标签 - 简单明了:1表示有雨,0表示无雨
|
||
if 'rain' in df.columns:
|
||
df['rain_binary'] = (df['rain'] > 0.0).astype(int)
|
||
#df.loc[:, 'rain_binary'] = (df['rain'] > 0).astype(int)
|
||
|
||
|
||
|
||
# 创建降雨类别标签用于分层回归
|
||
df['rain_category'] = np.zeros(len(df), dtype=int) # 默认:无雨
|
||
if self.stratified_regression:
|
||
df.loc[df['rain'] > 0, 'rain_category'] = 1 # 小雨
|
||
df.loc[df['rain'] > self.rain_thresholds[0], 'rain_category'] = 2 # 中雨
|
||
df.loc[df['rain'] > self.rain_thresholds[1], 'rain_category'] = 3 # 大雨
|
||
df.loc[df['rain'] > self.rain_thresholds[2], 'rain_category'] = 4 # 暴雨
|
||
|
||
# 只在非静默模式下打印信息
|
||
if not silent:
|
||
# 打印降雨分布信息
|
||
rain_count = df['rain_binary'].sum()
|
||
print(f"\n降雨分布统计:")
|
||
print(f"总样本数: {len(df)}")
|
||
print(f"有雨样本: {rain_count} ({rain_count/len(df)*100:.2f}%)")
|
||
print(f"无雨样本: {len(df) - rain_count} ({(len(df)-rain_count)/len(df)*100:.2f}%)")
|
||
|
||
# 分析降雨量分布和类别
|
||
rain_samples = df[df['rain'] > 0]
|
||
if len(rain_samples) > 0:
|
||
print(f"\n降雨统计(仅有雨样本):")
|
||
print(f"最小值: {rain_samples['rain'].min():.2f}mm")
|
||
print(f"最大值: {rain_samples['rain'].max():.2f}mm")
|
||
print(f"平均值: {rain_samples['rain'].mean():.2f}mm")
|
||
print(f"中位数: {rain_samples['rain'].median():.2f}mm")
|
||
|
||
# 打印降雨类别
|
||
if self.stratified_regression:
|
||
light_rain = sum((df['rain'] > 0) & (df['rain'] <= self.rain_thresholds[0]))
|
||
medium_rain = sum((df['rain'] > self.rain_thresholds[0]) & (df['rain'] <= self.rain_thresholds[1]))
|
||
heavy_rain = sum((df['rain'] > self.rain_thresholds[1]) & (df['rain'] <= self.rain_thresholds[2]))
|
||
extreme_rain = sum(df['rain'] > self.rain_thresholds[2])
|
||
|
||
print(f"\n降雨类别:")
|
||
print(f"小雨 (0-{self.rain_thresholds[0]}mm): {light_rain} 样本 ({light_rain/rain_count*100:.2f}%)")
|
||
print(f"中雨 ({self.rain_thresholds[0]}-{self.rain_thresholds[1]}mm): {medium_rain} 样本 ({medium_rain/rain_count*100:.2f}%)")
|
||
print(f"大雨 ({self.rain_thresholds[1]}-{self.rain_thresholds[2]}mm): {heavy_rain} 样本 ({heavy_rain/rain_count*100:.2f}%)")
|
||
print(f"暴雨 (>{self.rain_thresholds[2]}mm): {extreme_rain} 样本 ({extreme_rain/rain_count*100:.2f}%)")
|
||
|
||
# 绘制降雨量直方图
|
||
plt.figure(figsize=(10, 6))
|
||
plt.hist(rain_samples['rain'].values, bins=20)
|
||
plt.title('Rainfall Distribution Histogram (Rain Samples Only)')
|
||
plt.xlabel('Rainfall (mm)')
|
||
plt.ylabel('Number of Samples')
|
||
plt.grid(True, alpha=0.3)
|
||
plt.savefig(f"{self.output_dir}/rainfall_distribution.png")
|
||
plt.close()
|
||
|
||
# 对数变换降雨量分析
|
||
plt.figure(figsize=(10, 6))
|
||
plt.hist(np.log1p(rain_samples['rain'].values), bins=20)
|
||
plt.title('Log-Transformed Rainfall Distribution (Rain Samples Only)')
|
||
plt.xlabel('log(Rainfall+1)')
|
||
plt.ylabel('Number of Samples')
|
||
plt.grid(True, alpha=0.3)
|
||
plt.savefig(f"{self.output_dir}/rainfall_log_distribution.png")
|
||
plt.close()
|
||
|
||
# 如果数据中有时间特征,添加时间相关特征
|
||
if 'datetime' in df.columns:
|
||
# 小时特征(一天中的时刻)
|
||
df['hour'] = df['datetime'].dt.hour
|
||
df['hour_sin'] = np.sin(2 * np.pi * df['hour'] / 24)
|
||
df['hour_cos'] = np.cos(2 * np.pi * df['hour'] / 24)
|
||
|
||
# 星期特征
|
||
df['day_of_week'] = df['datetime'].dt.dayofweek
|
||
df['weekend'] = (df['day_of_week'] >= 5).astype(int)
|
||
|
||
# 月份特征(季节性)
|
||
df['month'] = df['datetime'].dt.month
|
||
df['season'] = (df['month'] % 12 + 3) // 3
|
||
|
||
# 季节性循环特征
|
||
df['month_sin'] = np.sin(2 * np.pi * df['month'] / 12)
|
||
df['month_cos'] = np.cos(2 * np.pi * df['month'] / 12)
|
||
|
||
return df
|
||
|
||
def select_features(self, data, n_features=30, silent=False):
|
||
"""
|
||
特征选择:基于相关性和实验结果选择最佳特征子集
|
||
|
||
参数:
|
||
data: DataFrame 预处理后的数据
|
||
n_features: int 要选择的特征数量
|
||
silent: bool 是否抑制输出信息
|
||
|
||
返回:
|
||
X: DataFrame 选定的特征矩阵
|
||
y_binary: Series 二值分类标签
|
||
y_amount: Series 降雨量标签
|
||
y_category: Series (可选) 降雨类别标签
|
||
"""
|
||
# 选择基本特征(如果在预定义的feature_names中已存在)
|
||
if self.feature_names is not None:
|
||
# 检查预定义特征是否在数据中
|
||
available_features = [f for f in self.feature_names if f in data.columns]
|
||
if len(available_features) != len(self.feature_names):
|
||
missing = set(self.feature_names) - set(available_features)
|
||
if not silent:
|
||
self.debug_print(f"警告:以下预定义特征在数据中缺失:{missing}")
|
||
selected_features = available_features
|
||
else:
|
||
# 选择新特征 - 优先考虑时间序列和变化特征
|
||
basic_features = ['rh', 'Temp', 'PWV', 'ZTD', 'ws', 'Press']
|
||
|
||
# 不同时间尺度的变化特征
|
||
change_features = [
|
||
'PWV_1h_change', 'PWV_3h_change', 'PWV_6h_change', 'PWV_12h_change',
|
||
'ZTD_1h_change', 'ZTD_3h_change', 'ZTD_6h_change',
|
||
'Temp_1h_change', 'Temp_6h_change',
|
||
'Press_1h_change', 'Press_6h_change',
|
||
'rh_1h_change', 'rh_6h_change',
|
||
'PWV_change_accel', 'ZTD_change_accel',
|
||
'PWV_rapid_change', 'ZTD_rapid_change', 'Press_rapid_change'
|
||
]
|
||
|
||
# 趋势和窗口特征
|
||
window_features = [
|
||
'PWV_3h_mean', 'PWV_6h_mean', 'PWV_vs_6h',
|
||
'ZTD_6h_mean', 'ZTD_vs_6h',
|
||
'Temp_6h_mean', 'Press_6h_mean', 'rh_6h_mean',
|
||
'PWV_6h_std', 'ZTD_6h_std',
|
||
'PWV_12h_range', 'ZTD_12h_range'
|
||
]
|
||
|
||
# 异常和标准化特征
|
||
anomaly_features = [
|
||
'rh_anomaly', 'PWV_anomaly', 'ZTD_anomaly',
|
||
'Temp_anomaly', 'Press_anomaly'
|
||
]
|
||
|
||
# 组合特征和指标
|
||
combined_features = [
|
||
'PWV_rh', 'PWV_Temp', 'ZTD_PWV_ratio', 'moisture_change',
|
||
'ZTD_PWV_change', 'Press_change_intensity', 'rain_potential',
|
||
'saturation_index', 'moisture_instability', 'heavy_rain_potential',
|
||
'rain_persistence', 'PWV_cumul_change', 'Press_rapid_fall',
|
||
'Press_falling'
|
||
]
|
||
|
||
# 时间相关特征
|
||
time_features = [
|
||
'hour', 'hour_sin', 'hour_cos',
|
||
'day_of_week', 'weekend',
|
||
'month', 'season', 'month_sin', 'month_cos'
|
||
]
|
||
|
||
# 合并特征列表并过滤仅存在于数据集中的特征
|
||
all_features = (basic_features + change_features + window_features +
|
||
anomaly_features + combined_features + time_features)
|
||
|
||
# 过滤出数据中存在的特征
|
||
selected_features = [f for f in all_features if f in data.columns]
|
||
|
||
# 如果可用特征超过所需数量,根据对降雨预测的价值进行优先排序
|
||
if len(selected_features) > n_features and 'rain' in data.columns:
|
||
# 计算与降雨量的相关性
|
||
try:
|
||
rain_corr = data[selected_features].corrwith(data['rain']).abs()
|
||
|
||
# 按降雨相关性排序特征
|
||
sorted_features = rain_corr.sort_values(ascending=False).index.tolist()
|
||
except Exception as e:
|
||
if not silent:
|
||
print(f"计算特征相关性时出错: {str(e)}")
|
||
sorted_features = selected_features
|
||
|
||
# 始终保留一些基本特征,无论相关性如何
|
||
must_have_features = [f for f in [
|
||
'PWV', 'ZTD', 'rh', 'Temp', 'PWV_1h_change',
|
||
'ZTD_1h_change', 'Press_1h_change', 'saturation_index',
|
||
'heavy_rain_potential', 'rain_persistence'
|
||
] if f in selected_features]
|
||
|
||
# 将必备特征与相关性最高的特征结合,避免重复
|
||
remaining_features = [f for f in sorted_features if f not in must_have_features]
|
||
|
||
# 选择最终特征集
|
||
selected_features = must_have_features + remaining_features[:n_features - len(must_have_features)]
|
||
|
||
# 如果仍然有太多特征,只取前n_features个
|
||
if len(selected_features) > n_features:
|
||
selected_features = selected_features[:n_features]
|
||
|
||
# 保存选择的特征
|
||
self.feature_names = selected_features
|
||
|
||
# 获取特征矩阵
|
||
X = data[selected_features].copy()
|
||
|
||
# 获取目标变量
|
||
y_binary = data['rain_binary'].copy() if 'rain_binary' in data.columns else None
|
||
y_amount = data['rain'].copy() if 'rain' in data.columns else None
|
||
y_category = data['rain_category'].copy() if 'rain_category' in data.columns else None
|
||
|
||
# 打印选择的特征
|
||
if not silent:
|
||
print(f"\n选择的特征({len(selected_features)}):")
|
||
for i, feature in enumerate(selected_features):
|
||
print(f" {i+1}. {feature}")
|
||
|
||
return X, y_binary, y_amount, y_category
|
||
|
||
def split_data(self, X, y_binary, y_amount, y_category=None, time_based=False, test_size=0.2):
|
||
"""数据分割:可选择时间序列分割或随机分割"""
|
||
if time_based:
|
||
# 使用时间序列分割 - 假设数据按时间排序
|
||
train_size = int(len(X) * (1 - test_size))
|
||
X_train, X_test = X.iloc[:train_size], X.iloc[train_size:]
|
||
y_binary_train, y_binary_test = y_binary.iloc[:train_size], y_binary.iloc[train_size:]
|
||
y_amount_train, y_amount_test = y_amount.iloc[:train_size], y_amount.iloc[train_size:]
|
||
|
||
if y_category is not None:
|
||
y_category_train, y_category_test = y_category.iloc[:train_size], y_category.iloc[train_size:]
|
||
else:
|
||
y_category_train, y_category_test = None, None
|
||
|
||
print("\n使用基于时间序列的数据分割...")
|
||
else:
|
||
# 使用随机分层抽样
|
||
if y_category is not None:
|
||
X_train, X_test, y_binary_train, y_binary_test, y_amount_train, y_amount_test, y_category_train, y_category_test = train_test_split(
|
||
X, y_binary, y_amount, y_category, test_size=test_size, random_state=42, stratify=y_binary
|
||
)
|
||
else:
|
||
X_train, X_test, y_binary_train, y_binary_test, y_amount_train, y_amount_test = train_test_split(
|
||
X, y_binary, y_amount, test_size=test_size, random_state=42, stratify=y_binary
|
||
)
|
||
y_category_train, y_category_test = None, None
|
||
|
||
print("\n使用随机分层抽样进行数据分割...")
|
||
|
||
# 输出分割信息
|
||
print(f"训练集: {len(X_train)} 样本 ({sum(y_binary_train == 1)} 有雨, {sum(y_binary_train == 0)} 无雨)")
|
||
print(f"测试集: {len(X_test)} 样本 ({sum(y_binary_test == 1)} 有雨, {sum(y_binary_test == 0)} 无雨)")
|
||
|
||
# 检查训练集和测试集中的雨量比例
|
||
train_rain_ratio = sum(y_binary_train == 1) / len(y_binary_train)
|
||
test_rain_ratio = sum(y_binary_test == 1) / len(y_binary_test)
|
||
print(f"训练集降雨比例: {train_rain_ratio:.2%}")
|
||
print(f"测试集降雨比例: {test_rain_ratio:.2%}")
|
||
|
||
# 如果使用分层回归,打印类别分布
|
||
if y_category is not None:
|
||
for cat in sorted(y_category_train.unique()):
|
||
cat_count_train = sum(y_category_train == cat)
|
||
cat_count_test = sum(y_category_test == cat)
|
||
if cat > 0: # 仅打印降雨类别
|
||
cat_name = "小雨" if cat == 1 else "中雨" if cat == 2 else "大雨" if cat == 3 else "暴雨"
|
||
print(f"{cat_name}: {cat_count_train} 训练样本, {cat_count_test} 测试样本")
|
||
|
||
return X_train, X_test, y_binary_train, y_binary_test, y_amount_train, y_amount_test, y_category_train, y_category_test
|
||
|
||
def scale_features(self, X_train, X_test):
|
||
"""特征标准化:使用RobustScaler减少异常值的影响"""
|
||
# 首次调用时初始化缩放器
|
||
if self.scaler is None:
|
||
self.scaler = RobustScaler()
|
||
|
||
# 将DataFrame转换为numpy数组
|
||
if isinstance(X_train, pd.DataFrame):
|
||
X_train_np = X_train.values
|
||
else:
|
||
X_train_np = X_train
|
||
|
||
if isinstance(X_test, pd.DataFrame):
|
||
X_test_np = X_test.values
|
||
else:
|
||
X_test_np = X_test
|
||
|
||
# 应用RobustScaler
|
||
X_train_scaled = self.scaler.fit_transform(X_train_np)
|
||
X_test_scaled = self.scaler.transform(X_test_np)
|
||
|
||
return X_train_scaled, X_test_scaled
|
||
|
||
def build_binary_classifier(self):
|
||
"""构建二分类模型(是否下雨)"""
|
||
if self.clf_model_type == 'lr':
|
||
# 逻辑回归 - 简单且稳健
|
||
base_model = LogisticRegression(
|
||
C=0.05, # 更强的L2正则化
|
||
penalty='elasticnet',
|
||
solver='saga',
|
||
l1_ratio=0.2,
|
||
class_weight='balanced',
|
||
max_iter=3000,
|
||
random_state=42
|
||
)
|
||
param_grid = {
|
||
'C': [0.01, 0.05, 0.1, 0.5],
|
||
'l1_ratio':[0.1, 0.2, 0.3]
|
||
}
|
||
|
||
elif self.clf_model_type == 'rf':
|
||
# 随机森林 - 控制复杂度以避免过拟合
|
||
base_model = RandomForestClassifier(
|
||
n_estimators=200,
|
||
max_depth=8, # 限制树深度
|
||
min_samples_split=8,
|
||
min_samples_leaf=4,
|
||
max_features='sqrt', # 使用特征子集
|
||
bootstrap=True,
|
||
oob_score=True, # 启用袋外评分
|
||
class_weight='balanced_subsample',
|
||
random_state=42
|
||
)
|
||
param_grid = {
|
||
'max_depth': [6, 8, 10],
|
||
'n_estimators': [150, 200, 250],
|
||
'min_samples_leaf': [2, 4, 6]
|
||
}
|
||
|
||
elif self.clf_model_type == 'gbdt':
|
||
# 梯度提升树 - 通过慢学习率降低过拟合
|
||
base_model = GradientBoostingClassifier(
|
||
n_estimators=250,
|
||
max_depth=4, # 浅树
|
||
learning_rate=0.03, # 慢学习率
|
||
subsample=0.7, # 样本子采样
|
||
min_samples_split=15,
|
||
min_samples_leaf=10,
|
||
max_features=0.8, # 特征子采样
|
||
random_state=42
|
||
)
|
||
param_grid = {
|
||
'n_estimators': [200, 250, 300],
|
||
'max_depth': [3, 4, 5],
|
||
'learning_rate': [0.01, 0.03, 0.05],
|
||
'subsample': [0.7, 0.8, 0.9]
|
||
}
|
||
|
||
# 在RainfallTwoStageModel类的build_binary_classifier方法中添加
|
||
elif self.clf_model_type == 'xgboost':
|
||
from xgboost import XGBClassifier
|
||
base_model = XGBClassifier(
|
||
n_estimators=300, # 增加树数量
|
||
max_depth=5, # 适当增加深度
|
||
learning_rate=0.03, # 降低学习率
|
||
subsample=0.7, # 减少样本比例
|
||
colsample_bytree=0.7, # 减少特征比例
|
||
min_child_weight=5, # 增加叶子节点要求
|
||
gamma=0.2, # 增加分裂阈值
|
||
scale_pos_weight=15, # 提高正样本权重
|
||
reg_alpha=0.1, # L1正则化
|
||
reg_lambda=1.0, # L2正则化
|
||
random_state=42,
|
||
tree_method='hist', # 更快的直方图算法
|
||
eval_metric='auc' # 评估指标
|
||
)
|
||
param_grid = {
|
||
'n_estimators': [250, 300, 350],
|
||
'max_depth': [4, 5, 6],
|
||
'min_child_weight': [3, 5, 7],
|
||
'scale_pos_weight': [10, 15, 20],
|
||
'subsample': [0.6, 0.7, 0.8]
|
||
}
|
||
|
||
elif self.clf_model_type == 'ensemble':
|
||
# 简单投票集成
|
||
from sklearn.ensemble import VotingClassifier
|
||
from xgboost import XGBClassifier
|
||
|
||
estimators = [
|
||
('lr', LogisticRegression(C=0.05, class_weight='balanced', max_iter=3000, solver='saga')),
|
||
('rf', RandomForestClassifier(n_estimators=200, max_depth=6, min_samples_leaf=4, class_weight='balanced_subsample')),
|
||
('gb', GradientBoostingClassifier(n_estimators=200, max_depth=4, learning_rate=0.03, subsample=0.7)),
|
||
('xgb', XGBClassifier(n_estimators=250, max_depth=5, learning_rate=0.03, scale_pos_weight=15))
|
||
]
|
||
|
||
base_model = VotingClassifier(estimators=estimators, voting='soft')
|
||
param_grid = {} # 简化集成模型调优
|
||
|
||
else:
|
||
raise ValueError(f"不支持的分类器类型: {self.clf_model_type}")
|
||
|
||
return base_model, param_grid
|
||
|
||
def build_regressor(self):
|
||
"""构建回归模型"""
|
||
if self.reg_model_type == 'rf':
|
||
# 随机森林回归器
|
||
base_model = RandomForestRegressor(
|
||
n_estimators=100,
|
||
max_depth=5,
|
||
min_samples_split=5,
|
||
min_samples_leaf=5,
|
||
random_state=42
|
||
)
|
||
param_grid = {
|
||
'max_depth': [3, 5, 8],
|
||
'min_samples_leaf': [3, 5, 10]
|
||
}
|
||
|
||
elif self.reg_model_type == 'gbdt':
|
||
# 梯度提升树回归器
|
||
base_model = GradientBoostingRegressor(
|
||
n_estimators=100,
|
||
max_depth=3,
|
||
learning_rate=0.05,
|
||
subsample=0.8,
|
||
loss='squared_error',
|
||
random_state=42
|
||
)
|
||
param_grid = {
|
||
'max_depth': [2, 3],
|
||
'learning_rate': [0.01, 0.05, 0.1]
|
||
}
|
||
|
||
elif self.reg_model_type == 'ridge':
|
||
# 岭回归 - 简单线性模型
|
||
base_model = Ridge(
|
||
alpha=1.0,
|
||
random_state=42
|
||
)
|
||
param_grid = {
|
||
'alpha': [0.1, 1.0, 10.0]
|
||
}
|
||
|
||
elif self.reg_model_type == 'xgboost':
|
||
# XGBoost回归器
|
||
try:
|
||
from xgboost import XGBRegressor
|
||
|
||
# 基础模型配置 - 增强了过拟合控制
|
||
base_model = XGBRegressor(
|
||
n_estimators=200, # 增加树的数量以提高性能
|
||
max_depth=4, # 保持不变
|
||
learning_rate=0.05, # 保持不变
|
||
subsample=0.8, # 保持不变,有助于防止过拟合
|
||
colsample_bytree=0.8, # 保持不变,有助于防止过拟合
|
||
min_child_weight=3, # 新增:防止在少量样本上过拟合
|
||
gamma=0.1, # 新增:控制树节点分裂的最小损失减少
|
||
reg_alpha=0.1, # 新增:L1正则化,减少模型复杂性
|
||
reg_lambda=1.0, # 新增:L2正则化,减少模型复杂性
|
||
random_state=42,
|
||
n_jobs=-1 # 使用所有CPU核心加速训练
|
||
)
|
||
|
||
# 增加参数网格搜索范围
|
||
param_grid = {
|
||
'max_depth': [3, 4, 5],
|
||
'learning_rate': [0.01, 0.05, 0.1],
|
||
'min_child_weight': [1, 3, 5], # 新增:控制叶节点所需的最小样本权重和
|
||
'gamma': [0, 0.1, 0.2], # 新增:控制分裂所需的最小损失减少
|
||
'subsample': [0.7, 0.8, 0.9], # 新增:每棵树随机采样的比例
|
||
'colsample_bytree': [0.7, 0.8, 0.9] # 新增:每棵树随机采样的特征比例
|
||
}
|
||
|
||
# 针对不同时间点调整的特定参数
|
||
for time_point in self.time_points:
|
||
if time_point == 1: # 1小时预测
|
||
# 1小时预测结果较好,可使用略微复杂的模型
|
||
base_model.max_depth = 5
|
||
base_model.learning_rate = 0.1
|
||
param_grid = {
|
||
'max_depth': [4, 5, 6],
|
||
'learning_rate': [0.05, 0.1, 0.15],
|
||
'min_child_weight': [1, 3],
|
||
'gamma': [0, 0.1]
|
||
}
|
||
elif time_point == 2: # 2小时预测
|
||
# 2小时预测XGBoost性能略差,增强正则化
|
||
base_model.max_depth = 3
|
||
base_model.learning_rate = 0.05
|
||
base_model.subsample = 0.7
|
||
base_model.reg_alpha = 0.5
|
||
param_grid = {
|
||
'max_depth': [2, 3, 4],
|
||
'learning_rate': [0.01, 0.05],
|
||
'min_child_weight': [3, 5],
|
||
'gamma': [0.1, 0.2]
|
||
}
|
||
elif time_point >= 3: # 3小时及以上预测
|
||
# 3小时预测需要更强的正则化防止过拟合
|
||
base_model.max_depth = 3
|
||
base_model.learning_rate = 0.03
|
||
base_model.subsample = 0.7
|
||
base_model.colsample_bytree = 0.7
|
||
base_model.reg_alpha = 1.0
|
||
base_model.reg_lambda = 2.0
|
||
param_grid = {
|
||
'max_depth': [2, 3, 4],
|
||
'learning_rate': [0.01, 0.03, 0.05],
|
||
'min_child_weight': [3, 5, 7],
|
||
'gamma': [0.1, 0.2, 0.3]
|
||
}
|
||
|
||
# 如果是对暴雨类别数据的预测,采用特殊设置
|
||
if hasattr(self, 'current_rain_category') and self.current_rain_category == 'storm':
|
||
base_model.max_depth = 2
|
||
base_model.learning_rate = 0.01
|
||
base_model.min_child_weight = 5
|
||
base_model.subsample = 0.6
|
||
base_model.colsample_bytree = 0.6
|
||
base_model.reg_alpha = 2.0
|
||
base_model.reg_lambda = 3.0
|
||
param_grid = {
|
||
'max_depth': [2, 3],
|
||
'learning_rate': [0.005, 0.01],
|
||
'min_child_weight': [5, 7]
|
||
}
|
||
except ImportError:
|
||
print("XGBoost不可用,回退到GBDT")
|
||
return self.build_regressor()
|
||
|
||
else:
|
||
raise ValueError(f"不支持的回归器类型: {self.reg_model_type}")
|
||
|
||
return base_model, param_grid
|
||
|
||
def train_binary_classifier(self, X_train, y_train, X_val=None, y_val=None):
|
||
"""训练二分类降雨分类器(判断是否下雨)"""
|
||
print(f"\n训练降雨二分类器 ({self.clf_model_type})...")
|
||
|
||
# 构建基础模型
|
||
base_model, param_grid = self.build_binary_classifier()
|
||
|
||
# 如果有参数需要调优且不是集成模型
|
||
if param_grid and self.clf_model_type != 'ensemble':
|
||
# 使用GridSearchCV查找最佳参数
|
||
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
|
||
grid_search = GridSearchCV(
|
||
estimator=base_model,
|
||
param_grid=param_grid,
|
||
cv=cv,
|
||
scoring='f1', # 使用F1分数
|
||
n_jobs=-1
|
||
)
|
||
|
||
# 训练模型
|
||
grid_search.fit(X_train, y_train)
|
||
|
||
# 获取最佳模型
|
||
best_model = grid_search.best_estimator_
|
||
print(f"最佳分类器参数: {grid_search.best_params_}")
|
||
else:
|
||
# 直接使用基础模型
|
||
best_model = base_model
|
||
best_model.fit(X_train, y_train)
|
||
|
||
# 应用概率校准 - 使用验证集进行校准
|
||
if X_val is not None and y_val is not None and hasattr(best_model, "predict_proba"):
|
||
print("使用验证集进行概率校准...")
|
||
calibrator = CalibratedClassifierCV(
|
||
best_model,
|
||
method='sigmoid', # 或 'isotonic'
|
||
cv='prefit' # 使用已训练模型
|
||
)
|
||
calibrator.fit(X_val, y_val)
|
||
self.binary_classifier = calibrator
|
||
else:
|
||
self.binary_classifier = best_model
|
||
|
||
# 检查特征重要性(如果模型支持)
|
||
if hasattr(best_model, 'feature_importances_'):
|
||
self.feature_importances_clf = best_model.feature_importances_
|
||
|
||
# 输出重要特征
|
||
if self.feature_names is not None:
|
||
importance_pairs = sorted(zip(self.feature_names, self.feature_importances_clf),
|
||
key=lambda x: x[1], reverse=True)
|
||
print("\n分类器特征重要性:")
|
||
for feature, importance in importance_pairs:
|
||
print(f" {feature}: {importance:.4f}")
|
||
|
||
# 绘制特征重要性
|
||
plt.figure(figsize=(12, 8))
|
||
features = [x[0] for x in importance_pairs]
|
||
importances = [x[1] for x in importance_pairs]
|
||
plt.barh(range(len(features)), importances, align='center')
|
||
plt.yticks(range(len(features)), features)
|
||
plt.xlabel('Importance')
|
||
plt.title('Binary Classifier Feature Importance')
|
||
plt.tight_layout()
|
||
plt.savefig(f"{self.output_dir}/binary_classifier_feature_importance.png")
|
||
plt.close()
|
||
|
||
# 评估训练集性能
|
||
y_train_pred = self.binary_classifier.predict(X_train)
|
||
train_accuracy = accuracy_score(y_train, y_train_pred)
|
||
train_precision = precision_score(y_train, y_train_pred)
|
||
train_recall = recall_score(y_train, y_train_pred)
|
||
train_f1 = f1_score(y_train, y_train_pred)
|
||
|
||
print(f"二分类器训练集性能: 准确率={train_accuracy:.4f}, 精确率={train_precision:.4f}, "
|
||
f"召回率={train_recall:.4f}, F1分数={train_f1:.4f}")
|
||
|
||
return self.binary_classifier
|
||
|
||
def train_category_classifier(self, X_train, y_category, rain_mask):
|
||
"""训练降雨强度类别分类器(小雨、中雨、大雨、暴雨)"""
|
||
# 仅使用降雨样本进行类别预测
|
||
X_train_rain = X_train[rain_mask]
|
||
y_category_rain = y_category[rain_mask]
|
||
|
||
unique_categories = np.unique(y_category_rain)
|
||
if len(unique_categories) < 2:
|
||
print(f"警告: 降雨类别样本不足,无法训练分类器 (唯一类别: {unique_categories})")
|
||
return None
|
||
|
||
print(f"\n训练降雨类别分类器,使用 {len(X_train_rain)} 个降雨样本...")
|
||
|
||
# 计算类别权重 - 反比于类别频率,加强对大雨和暴雨的权重
|
||
try:
|
||
# 将类别转换为整数
|
||
y_category_rain_int = y_category_rain.astype(int)
|
||
|
||
# 计算类别频率
|
||
class_counts = np.bincount(y_category_rain_int)[1:] # 跳过0类别
|
||
total_samples = len(y_category_rain)
|
||
|
||
# 应用不平衡权重 - 大雨和暴雨类别权重更高
|
||
class_weights = {}
|
||
for i, count in enumerate(class_counts):
|
||
category = i + 1 # 类别从1开始
|
||
# 小雨:1倍,中雨:2倍,大雨:4倍,暴雨:8倍基础权重
|
||
multiplier = 1 if category == 1 else 2 if category == 2 else 4 if category == 3 else 8
|
||
if count > 0:
|
||
class_weights[category] = (total_samples / (count * len(class_counts))) * multiplier
|
||
else:
|
||
class_weights[category] = 1.0
|
||
|
||
print("类别权重:")
|
||
for cat, weight in class_weights.items():
|
||
cat_name = "小雨" if cat == 1 else "中雨" if cat == 2 else "大雨" if cat == 3 else "暴雨"
|
||
print(f" {cat_name}: {weight:.2f}")
|
||
except Exception as e:
|
||
print(f"计算类别权重时出错: {str(e)}")
|
||
# 使用默认权重
|
||
class_weights = {1: 1.0, 2: 2.0, 3: 4.0, 4: 8.0}
|
||
print("使用默认类别权重")
|
||
|
||
# 对大雨和暴雨样本进行过采样
|
||
oversampled_X = []
|
||
oversampled_y = []
|
||
|
||
# 添加所有原始样本
|
||
oversampled_X.append(X_train_rain)
|
||
oversampled_y.append(y_category_rain)
|
||
|
||
# 过采样大雨样本
|
||
try:
|
||
heavy_mask = (y_category_rain == 3)
|
||
heavy_count = sum(heavy_mask)
|
||
if heavy_count > 0 and heavy_count < 100:
|
||
X_heavy = X_train_rain[heavy_mask]
|
||
y_heavy = y_category_rain[heavy_mask]
|
||
|
||
# 过采样到100个样本或原始样本的5倍,以较小者为准
|
||
target_samples = min(100, heavy_count * 5)
|
||
X_heavy_resampled, y_heavy_resampled = resample(
|
||
X_heavy, y_heavy,
|
||
replace=True,
|
||
n_samples=target_samples,
|
||
random_state=42
|
||
)
|
||
oversampled_X.append(X_heavy_resampled)
|
||
oversampled_y.append(y_heavy_resampled)
|
||
print(f"过采样大雨样本: {heavy_count} -> {target_samples}")
|
||
except Exception as e:
|
||
print(f"过采样大雨样本时出错: {str(e)}")
|
||
|
||
# 过采样暴雨样本
|
||
try:
|
||
extreme_mask = (y_category_rain == 4)
|
||
extreme_count = sum(extreme_mask)
|
||
if extreme_count > 0 and extreme_count < 50:
|
||
X_extreme = X_train_rain[extreme_mask]
|
||
y_extreme = y_category_rain[extreme_mask]
|
||
|
||
# 过采样到50个样本或原始样本的10倍,以较小者为准
|
||
target_samples = min(50, extreme_count * 10)
|
||
X_extreme_resampled, y_extreme_resampled = resample(
|
||
X_extreme, y_extreme,
|
||
replace=True,
|
||
n_samples=target_samples,
|
||
random_state=42
|
||
)
|
||
oversampled_X.append(X_extreme_resampled)
|
||
oversampled_y.append(y_extreme_resampled)
|
||
print(f"过采样暴雨样本: {extreme_count} -> {target_samples}")
|
||
except Exception as e:
|
||
print(f"过采样暴雨样本时出错: {str(e)}")
|
||
|
||
# 合并所有样本
|
||
try:
|
||
if len(oversampled_X) > 1:
|
||
X_train_rain_balanced = np.vstack(oversampled_X)
|
||
y_category_rain_balanced = np.hstack(oversampled_y)
|
||
else:
|
||
X_train_rain_balanced = X_train_rain
|
||
y_category_rain_balanced = y_category_rain
|
||
|
||
print(f"平衡后的训练集大小: {len(X_train_rain_balanced)}")
|
||
except Exception as e:
|
||
print(f"合并过采样数据时出错: {str(e)}")
|
||
# 使用原始数据
|
||
X_train_rain_balanced = X_train_rain
|
||
y_category_rain_balanced = y_category_rain
|
||
print("使用原始未平衡数据")
|
||
|
||
# 使用随机森林分类器进行类别预测,增加树的深度和数量
|
||
classifier = RandomForestClassifier(
|
||
n_estimators=200, # 增加树的数量
|
||
max_depth=10, # 增加深度
|
||
min_samples_leaf=2,
|
||
class_weight=class_weights, # 使用类别权重
|
||
random_state=42
|
||
)
|
||
|
||
# 训练分类器
|
||
classifier.fit(X_train_rain_balanced, y_category_rain_balanced)
|
||
|
||
# 检查性能
|
||
y_pred = classifier.predict(X_train_rain)
|
||
accuracy = accuracy_score(y_category_rain, y_pred)
|
||
|
||
print(f"降雨类别分类器训练准确率: {accuracy:.4f}")
|
||
|
||
# 检查类别分布
|
||
try:
|
||
class_counts = np.bincount(y_category_rain.astype(int))
|
||
class_predictions = np.bincount(y_pred.astype(int))
|
||
|
||
# 确保结果具有相同的大小
|
||
if len(class_counts) > len(class_predictions):
|
||
class_predictions = np.pad(class_predictions, (0, len(class_counts) - len(class_predictions)))
|
||
elif len(class_predictions) > len(class_counts):
|
||
class_counts = np.pad(class_counts, (0, len(class_predictions) - len(class_counts)))
|
||
|
||
print("降雨类别分布:")
|
||
categories = ["无雨", "小雨", "中雨", "大雨", "暴雨"]
|
||
for i in range(1, min(len(class_counts), len(categories))): # 跳过"无雨"(0)
|
||
print(f" {categories[i]}: {class_counts[i]} 实际, {class_predictions[i]} 预测")
|
||
except Exception as e:
|
||
print(f"统计类别分布时出错: {str(e)}")
|
||
|
||
# 混淆矩阵
|
||
try:
|
||
confusion = confusion_matrix(y_category_rain, y_pred)
|
||
print("\n类别分类器混淆矩阵:")
|
||
|
||
# 显示混淆矩阵,标签为雨量类别
|
||
unique_cats = sorted(np.unique(np.concatenate([y_category_rain, y_pred])))
|
||
label_names = ["小雨" if cat == 1 else "中雨" if cat == 2 else "大雨" if cat == 3 else "暴雨" for cat in unique_cats]
|
||
|
||
print("预测 →")
|
||
print("实际 ↓")
|
||
print(" " + " ".join(f"{name:8s}" for name in label_names))
|
||
for i, name in enumerate(label_names):
|
||
row_values = " ".join(f"{confusion[i, j]:8d}" for j in range(len(label_names)))
|
||
print(f"{name:6s} {row_values}")
|
||
except Exception as e:
|
||
print(f"计算混淆矩阵时出错: {str(e)}")
|
||
|
||
# 保存分类器
|
||
self.rain_category_classifier = classifier
|
||
|
||
return classifier
|
||
|
||
def train_stratified_regressors(self, X_train, y_train, y_category, rain_mask):
|
||
"""训练针对不同降雨强度的分层回归模型"""
|
||
print("\n训练分层降雨回归器...")
|
||
|
||
# 获取降雨样本
|
||
X_train_rain = X_train[rain_mask]
|
||
y_train_rain = y_train[rain_mask]
|
||
y_category_rain = y_category[rain_mask]
|
||
|
||
# 应用降雨量的对数变换
|
||
if self.log_transform:
|
||
print("对降雨量应用对数变换 (log(rain+1))...")
|
||
y_train_rain_transformed = np.log1p(y_train_rain)
|
||
else:
|
||
y_train_rain_transformed = y_train_rain
|
||
|
||
# 检查降雨类别的分布
|
||
categories = sorted(np.unique(y_category_rain))
|
||
for cat in categories:
|
||
if cat == 0: # 跳过无雨类别
|
||
continue
|
||
|
||
cat_mask = (y_category_rain == cat)
|
||
count = sum(cat_mask)
|
||
|
||
cat_name = "小雨" if cat == 1 else "中雨" if cat == 2 else "大雨" if cat == 3 else "暴雨"
|
||
print(f"{cat_name}样本: {count}")
|
||
|
||
if count < 10:
|
||
print(f" 警告: {cat_name}样本太少,无法训练可靠模型")
|
||
|
||
# 为每个降雨类别构建并训练模型
|
||
base_model, _ = self.build_regressor()
|
||
self.regressors = {}
|
||
|
||
for cat in categories:
|
||
if cat == 0: # 跳过无雨类别
|
||
continue
|
||
|
||
cat_mask = (y_category_rain == cat)
|
||
count = sum(cat_mask)
|
||
|
||
if count < 5:
|
||
print(f"跳过样本数量为{count}的类别{cat},样本太少")
|
||
continue # 跳过样本太少的类别
|
||
|
||
cat_name = "light" if cat == 1 else "medium" if cat == 2 else "heavy" if cat == 3 else "extreme"
|
||
reg_name = "light" if cat == 1 else "medium" if cat == 2 else "heavy" if cat == 3 else "extreme"
|
||
|
||
# 对大雨和暴雨样本进行过采样
|
||
try:
|
||
if cat >= 3 and count < 100: # 大雨和暴雨样本
|
||
# 简单过采样
|
||
X_rain_cat = X_train_rain[cat_mask]
|
||
y_rain_cat = y_train_rain_transformed[cat_mask]
|
||
|
||
# 过采样量取决于类别:大雨5倍,暴雨10倍
|
||
multiplier = 5 if cat == 3 else 10
|
||
target_samples = min(100, count * multiplier)
|
||
X_resampled, y_resampled = resample(
|
||
X_rain_cat, y_rain_cat,
|
||
replace=True,
|
||
n_samples=target_samples,
|
||
random_state=42
|
||
)
|
||
print(f"\n训练{cat_name}回归器,使用 {count} 个原始样本, 过采样至 {target_samples} 个样本...")
|
||
|
||
# 克隆基础模型并训练
|
||
regressor = clone(base_model)
|
||
regressor.fit(X_resampled, y_resampled)
|
||
else:
|
||
print(f"\n训练{cat_name}回归器,使用 {count} 个样本...")
|
||
|
||
# 克隆基础模型并训练
|
||
regressor = clone(base_model)
|
||
regressor.fit(X_train_rain[cat_mask], y_train_rain_transformed[cat_mask])
|
||
except Exception as e:
|
||
print(f"训练{cat_name}回归器时出错: {str(e)}")
|
||
# 尝试使用基本训练方法(无过采样)
|
||
try:
|
||
print(f"尝试使用基本训练方法...")
|
||
regressor = clone(base_model)
|
||
regressor.fit(X_train_rain[cat_mask], y_train_rain_transformed[cat_mask])
|
||
except Exception as e2:
|
||
print(f"基本训练方法也失败: {str(e2)}")
|
||
continue # 跳过此类别
|
||
|
||
# 存储训练好的模型
|
||
self.regressors[reg_name] = regressor
|
||
|
||
# 在训练数据上评估
|
||
try:
|
||
y_pred_transformed = regressor.predict(X_train_rain[cat_mask])
|
||
|
||
if self.log_transform:
|
||
y_pred = np.expm1(y_pred_transformed)
|
||
else:
|
||
y_pred = y_pred_transformed
|
||
|
||
# 防止负值
|
||
y_pred = np.maximum(y_pred, 0)
|
||
|
||
# 计算评估指标
|
||
rmse = np.sqrt(mean_squared_error(y_train_rain[cat_mask], y_pred))
|
||
mae = mean_absolute_error(y_train_rain[cat_mask], y_pred)
|
||
r2 = r2_score(y_train_rain[cat_mask], y_pred)
|
||
|
||
print(f"{cat_name}回归器训练指标: RMSE={rmse:.4f}, MAE={mae:.4f}, R²={r2:.4f}")
|
||
|
||
# 如果模型有特征重要性,保存它们
|
||
if hasattr(regressor, 'feature_importances_') and self.feature_names is not None:
|
||
importance_pairs = sorted(zip(self.feature_names, regressor.feature_importances_),
|
||
key=lambda x: x[1], reverse=True)
|
||
print(f"\n{cat_name}回归器特征重要性(前10):")
|
||
for feature, importance in importance_pairs[:10]:
|
||
print(f" {feature}: {importance:.4f}")
|
||
|
||
# 绘制特征重要性
|
||
plt.figure(figsize=(12, 8))
|
||
features = [x[0] for x in importance_pairs[:15]]
|
||
importances = [x[1] for x in importance_pairs[:15]]
|
||
plt.barh(range(len(features)), importances, align='center')
|
||
plt.yticks(range(len(features)), features)
|
||
plt.xlabel('Importance')
|
||
plt.title(f'{cat_name} Rain Regressor Feature Importance')
|
||
plt.tight_layout()
|
||
plt.savefig(f"{self.output_dir}/{reg_name}_regressor_importance.png")
|
||
plt.close()
|
||
|
||
# 绘制实际值与预测值
|
||
plt.figure(figsize=(10, 6))
|
||
plt.scatter(y_train_rain[cat_mask], y_pred, alpha=0.6)
|
||
max_val = max(max(y_train_rain[cat_mask]), max(y_pred))
|
||
plt.plot([0, max_val], [0, max_val], 'r--')
|
||
plt.xlabel('Actual Rainfall (mm)')
|
||
plt.ylabel('Predicted Rainfall (mm)')
|
||
plt.title(f'{cat_name} Rain - Actual vs Predicted (Training)')
|
||
plt.grid(True)
|
||
plt.tight_layout()
|
||
plt.savefig(f"{self.output_dir}/{reg_name}_train_performance.png")
|
||
plt.close()
|
||
except Exception as e:
|
||
print(f"评估{cat_name}回归器时出错: {str(e)}")
|
||
|
||
return self.regressors
|
||
|
||
def train_regressor(self, X_train, y_train, rain_mask):
|
||
"""训练降雨量回归器,仅使用降雨样本"""
|
||
# 获取降雨样本
|
||
X_train_rain = X_train[rain_mask]
|
||
y_train_rain = y_train[rain_mask]
|
||
|
||
if len(X_train_rain) < 10:
|
||
print("警告: 降雨样本太少,无法训练回归模型")
|
||
return None
|
||
|
||
print(f"\n训练降雨量回归器 ({self.reg_model_type}),使用 {len(X_train_rain)} 个降雨样本...")
|
||
|
||
# 考虑降雨量的对数变换
|
||
log_transform = True # 默认使用对数变换
|
||
self.log_transform = log_transform
|
||
|
||
if log_transform:
|
||
print("对降雨量应用对数变换 (log(rain+1))...")
|
||
y_train_rain_transformed = np.log1p(y_train_rain)
|
||
else:
|
||
y_train_rain_transformed = y_train_rain
|
||
|
||
# 构建回归模型
|
||
base_model, param_grid = self.build_regressor()
|
||
|
||
# 如果样本足够多,调整参数
|
||
try:
|
||
if len(X_train_rain) >= 50 and param_grid:
|
||
from sklearn.model_selection import KFold
|
||
cv = KFold(n_splits=5, shuffle=True, random_state=42)
|
||
grid_search = GridSearchCV(
|
||
estimator=base_model,
|
||
param_grid=param_grid,
|
||
cv=cv,
|
||
scoring='neg_mean_squared_error',
|
||
n_jobs=-1
|
||
)
|
||
|
||
# 训练模型
|
||
grid_search.fit(X_train_rain, y_train_rain_transformed)
|
||
|
||
# 获取最佳模型
|
||
best_model = grid_search.best_estimator_
|
||
print(f"最佳回归器参数: {grid_search.best_params_}")
|
||
else:
|
||
# 直接使用基础模型
|
||
best_model = base_model
|
||
best_model.fit(X_train_rain, y_train_rain_transformed)
|
||
|
||
# 保存模型
|
||
self.regressor = best_model
|
||
except Exception as e:
|
||
print(f"训练回归器时出错: {str(e)}")
|
||
print(f"尝试使用简化的回归器...")
|
||
# 使用简化的回归器
|
||
try:
|
||
simple_model = RandomForestRegressor(n_estimators=50, max_depth=3, random_state=42)
|
||
simple_model.fit(X_train_rain, y_train_rain_transformed)
|
||
self.regressor = simple_model
|
||
except Exception as e2:
|
||
print(f"简化回归器训练也失败: {str(e2)}")
|
||
return None
|
||
|
||
# 检查特征重要性(如果模型支持)
|
||
if hasattr(self.regressor, 'feature_importances_'):
|
||
self.feature_importances_reg = self.regressor.feature_importances_
|
||
|
||
# 输出重要特征
|
||
if self.feature_names is not None:
|
||
importance_pairs = sorted(zip(self.feature_names, self.feature_importances_reg),
|
||
key=lambda x: x[1], reverse=True)
|
||
print("\n回归器特征重要性:")
|
||
for feature, importance in importance_pairs:
|
||
print(f" {feature}: {importance:.4f}")
|
||
|
||
# 绘制特征重要性
|
||
plt.figure(figsize=(12, 8))
|
||
features = [x[0] for x in importance_pairs]
|
||
importances = [x[1] for x in importance_pairs]
|
||
plt.barh(range(len(features)), importances, align='center')
|
||
plt.yticks(range(len(features)), features)
|
||
plt.xlabel('Importance')
|
||
plt.title('Regressor Feature Importance')
|
||
plt.tight_layout()
|
||
plt.savefig(f"{self.output_dir}/regressor_feature_importance.png")
|
||
plt.close()
|
||
|
||
# 评估训练集性能
|
||
try:
|
||
if log_transform:
|
||
y_train_rain_pred_transformed = self.regressor.predict(X_train_rain)
|
||
y_train_rain_pred = np.expm1(y_train_rain_pred_transformed)
|
||
else:
|
||
y_train_rain_pred = self.regressor.predict(X_train_rain)
|
||
|
||
# 计算评估指标
|
||
train_rmse = np.sqrt(mean_squared_error(y_train_rain, y_train_rain_pred))
|
||
train_mae = mean_absolute_error(y_train_rain, y_train_rain_pred)
|
||
train_r2 = r2_score(y_train_rain, y_train_rain_pred)
|
||
|
||
print(f"回归器训练集性能: RMSE={train_rmse:.4f}, MAE={train_mae:.4f}, R²={train_r2:.4f}")
|
||
|
||
# 绘制训练集的实际值与预测值对比图
|
||
plt.figure(figsize=(10, 6))
|
||
plt.scatter(y_train_rain, y_train_rain_pred, alpha=0.5)
|
||
max_val = max(max(y_train_rain), max(y_train_rain_pred))
|
||
plt.plot([0, max_val], [0, max_val], 'r--')
|
||
plt.xlabel('Actual Rainfall (mm)')
|
||
plt.ylabel('Predicted Rainfall (mm)')
|
||
plt.title('Actual vs Predicted Rainfall - Training Set')
|
||
plt.grid(True)
|
||
plt.savefig(f"{self.output_dir}/regressor_train_performance.png")
|
||
plt.close()
|
||
except Exception as e:
|
||
print(f"评估回归器性能时出错: {str(e)}")
|
||
|
||
return self.regressor
|
||
|
||
def optimize_threshold(self, X_test, y_test):
|
||
"""优化分类阈值"""
|
||
print("\n优化分类阈值...")
|
||
|
||
# 获取预测概率
|
||
y_pred_proba = self.binary_classifier.predict_proba(X_test)[:, 1]
|
||
|
||
# 分析预测概率分布
|
||
print(f"预测概率分布: 最小值={y_pred_proba.min():.4f}, 最大值={y_pred_proba.max():.4f}, 平均值={y_pred_proba.mean():.4f}")
|
||
|
||
# 计算有雨和无雨样本的平均预测概率
|
||
rain_probs = y_pred_proba[y_test == 1]
|
||
no_rain_probs = y_pred_proba[y_test == 0]
|
||
|
||
if len(rain_probs) > 0:
|
||
print(f"有雨样本的平均预测概率: {rain_probs.mean():.4f}")
|
||
if len(no_rain_probs) > 0:
|
||
print(f"无雨样本的平均预测概率: {no_rain_probs.mean():.4f}")
|
||
|
||
# 绘制概率分布直方图
|
||
plt.figure(figsize=(10, 6))
|
||
if len(no_rain_probs) > 0:
|
||
plt.hist(no_rain_probs, bins=20, alpha=0.5, label='No Rain Samples', color='blue')
|
||
if len(rain_probs) > 0:
|
||
plt.hist(rain_probs, bins=20, alpha=0.5, label='Rain Samples', color='red')
|
||
plt.xlabel('Prediction Probability')
|
||
plt.ylabel('Number of Samples')
|
||
plt.title('Prediction Probability Distribution')
|
||
plt.legend()
|
||
plt.grid(True, alpha=0.3)
|
||
plt.savefig(f"{self.output_dir}/prediction_probability_distribution.png")
|
||
plt.close()
|
||
|
||
# 绘制ROC曲线
|
||
fpr, tpr, thresholds_roc = roc_curve(y_test, y_pred_proba)
|
||
roc_auc = auc(fpr, tpr)
|
||
|
||
plt.figure(figsize=(10, 6))
|
||
plt.plot(fpr, tpr, color='darkorange', lw=2,
|
||
label=f'ROC Curve (AUC = {roc_auc:.4f})')
|
||
plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
|
||
plt.xlim([0.0, 1.0])
|
||
plt.ylim([0.0, 1.05])
|
||
plt.xlabel('False Positive Rate')
|
||
plt.ylabel('True Positive Rate')
|
||
plt.title('Receiver Operating Characteristic Curve')
|
||
plt.legend(loc="lower right")
|
||
plt.grid(True, alpha=0.3)
|
||
plt.savefig(f"{self.output_dir}/roc_curve.png")
|
||
plt.close()
|
||
|
||
# 计算不同阈值的F1分数
|
||
thresholds = np.linspace(0.05, 0.95, 19)
|
||
f1_scores = []
|
||
precision_scores = []
|
||
recall_scores = []
|
||
|
||
for threshold in thresholds:
|
||
y_pred = (y_pred_proba >= threshold).astype(int)
|
||
f1 = f1_score(y_test, y_pred)
|
||
precision = precision_score(y_test, y_pred)
|
||
recall = recall_score(y_test, y_pred)
|
||
|
||
f1_scores.append(f1)
|
||
precision_scores.append(precision)
|
||
recall_scores.append(recall)
|
||
|
||
print(f"阈值={threshold:.2f}: 精确率={precision:.4f}, 召回率={recall:.4f}, F1分数={f1:.4f}")
|
||
|
||
# 绘制不同阈值的性能曲线
|
||
plt.figure(figsize=(10, 6))
|
||
plt.plot(thresholds, precision_scores, 'b-', marker='o', label='Precision')
|
||
plt.plot(thresholds, recall_scores, 'g-', marker='s', label='Recall')
|
||
plt.plot(thresholds, f1_scores, 'r-', marker='^', label='F1 Score')
|
||
plt.xlabel('Threshold')
|
||
plt.ylabel('Score')
|
||
plt.title('Performance at Different Thresholds')
|
||
plt.legend()
|
||
plt.grid(True)
|
||
plt.savefig(f"{self.output_dir}/threshold_performance.png")
|
||
plt.close()
|
||
|
||
# 找到F1分数最高的阈值
|
||
best_idx = np.argmax(f1_scores)
|
||
self.threshold = thresholds[best_idx]
|
||
|
||
print(f"最佳阈值: {self.threshold:.4f} (F1分数: {f1_scores[best_idx]:.4f})")
|
||
|
||
return self.threshold
|
||
|
||
def evaluate_binary_classifier(self, X_test, y_test):
|
||
"""评估二分类器性能"""
|
||
print("\n评估二分类器性能...")
|
||
|
||
# 预测测试集
|
||
y_pred_proba = self.binary_classifier.predict_proba(X_test)[:, 1]
|
||
y_pred = (y_pred_proba >= self.threshold).astype(int)
|
||
|
||
# 基本分类指标
|
||
accuracy = accuracy_score(y_test, y_pred)
|
||
precision = precision_score(y_test, y_pred)
|
||
recall = recall_score(y_test, y_pred)
|
||
f1 = f1_score(y_test, y_pred)
|
||
|
||
print("二分类性能指标:")
|
||
print(f"准确率: {accuracy:.4f}")
|
||
print(f"精确率: {precision:.4f}")
|
||
print(f"召回率: {recall:.4f}")
|
||
print(f"F1分数: {f1:.4f}")
|
||
|
||
# 混淆矩阵
|
||
cm = confusion_matrix(y_test, y_pred)
|
||
plt.figure(figsize=(8, 6))
|
||
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
|
||
plt.title('Binary Classifier Confusion Matrix')
|
||
plt.ylabel('True Label')
|
||
plt.xlabel('Predicted Label')
|
||
plt.savefig(f"{self.output_dir}/binary_confusion_matrix.png")
|
||
plt.close()
|
||
|
||
# 预测统计
|
||
true_rain_count = np.sum(y_test == 1)
|
||
pred_rain_count = np.sum(y_pred == 1)
|
||
correct_rain_count = np.sum((y_test == 1) & (y_pred == 1))
|
||
|
||
print("\n预测统计:")
|
||
print(f"总样本数: {len(y_test)}")
|
||
print(f"实际有雨样本: {true_rain_count} ({true_rain_count/len(y_test)*100:.2f}%)")
|
||
print(f"预测有雨样本: {pred_rain_count} ({pred_rain_count/len(y_test)*100:.2f}%)")
|
||
|
||
if true_rain_count > 0:
|
||
print(f"正确预测有雨: {correct_rain_count} ({correct_rain_count/true_rain_count*100:.2f}% 实际有雨样本)")
|
||
|
||
return {
|
||
'accuracy': accuracy,
|
||
'precision': precision,
|
||
'recall': recall,
|
||
'f1': f1,
|
||
'confusion_matrix': cm,
|
||
'predictions': y_pred,
|
||
'probabilities': y_pred_proba
|
||
}
|
||
|
||
def evaluate_category_classifier(self, X_test, y_category_test, mask_true, mask_pred):
|
||
"""评估降雨类别分类器性能"""
|
||
if self.rain_category_classifier is None:
|
||
print("类别分类器未训练,无法评估")
|
||
return None
|
||
|
||
# 只评估正确识别为降雨的样本
|
||
mask_both = mask_true & mask_pred
|
||
|
||
if sum(mask_both) == 0:
|
||
print("没有正确分类的降雨样本,无法评估类别分类器")
|
||
return None
|
||
|
||
X_test_rain = X_test[mask_both]
|
||
y_category_test_rain = y_category_test[mask_both]
|
||
|
||
# 进行预测
|
||
y_category_pred = self.rain_category_classifier.predict(X_test_rain)
|
||
|
||
# 计算准确率
|
||
accuracy = accuracy_score(y_category_test_rain, y_category_pred)
|
||
|
||
print("\n降雨类别分类器评估:")
|
||
print(f"准确率: {accuracy:.4f}")
|
||
|
||
# 混淆矩阵
|
||
try:
|
||
categories = sorted(np.unique(np.concatenate([y_category_test_rain, y_category_pred])))
|
||
cm = confusion_matrix(y_category_test_rain, y_category_pred, labels=categories)
|
||
|
||
# 打印混淆矩阵
|
||
category_names = ["小雨", "中雨", "大雨", "暴雨"]
|
||
print("\n混淆矩阵:")
|
||
print("预测 →")
|
||
print("实际 ↓")
|
||
|
||
# 创建表头
|
||
header = " " + " ".join(f"{category_names[cat-1]:8s}" for cat in categories if cat > 0 and cat-1 < len(category_names))
|
||
print(header)
|
||
print("-" * len(header))
|
||
|
||
for i, cat in enumerate(categories):
|
||
if cat > 0 and cat-1 < len(category_names):
|
||
row = f"{category_names[cat-1]:6s} "
|
||
row += " ".join(f"{cm[i, j]:8d}" for j in range(len(categories)))
|
||
print(row)
|
||
|
||
# 绘制混淆矩阵热图
|
||
plt.figure(figsize=(10, 8))
|
||
category_labels = [category_names[cat-1] if cat > 0 and cat-1 < len(category_names) else f"类别{cat}"
|
||
for cat in categories]
|
||
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
|
||
xticklabels=category_labels,
|
||
yticklabels=category_labels)
|
||
plt.title('Rain Category Classifier Confusion Matrix')
|
||
plt.ylabel('True Category')
|
||
plt.xlabel('Predicted Category')
|
||
plt.tight_layout()
|
||
plt.savefig(f"{self.output_dir}/category_confusion_matrix.png")
|
||
plt.close()
|
||
except Exception as e:
|
||
print(f"绘制类别混淆矩阵时出错: {str(e)}")
|
||
|
||
return {
|
||
'accuracy': accuracy,
|
||
'confusion_matrix': cm if 'cm' in locals() else None,
|
||
'predictions': y_category_pred
|
||
}
|
||
|
||
def evaluate_stratified_regressors(self, X_test, y_test, y_category_test, mask_true, mask_pred):
|
||
"""评估分层回归模型的性能"""
|
||
if not self.regressors:
|
||
print("分层回归器未训练,无法评估")
|
||
return None
|
||
|
||
print("\n评估分层回归器性能...")
|
||
|
||
# 只评估正确识别为降雨的样本
|
||
mask_both = mask_true & mask_pred
|
||
|
||
if sum(mask_both) == 0:
|
||
print("没有正确分类的降雨样本,无法评估回归器")
|
||
return None
|
||
|
||
X_test_rain = X_test[mask_both]
|
||
y_test_rain = y_test[mask_both]
|
||
y_category_test_rain = y_category_test[mask_both]
|
||
|
||
# 准备存储预测结果
|
||
y_pred_rain = np.zeros_like(y_test_rain)
|
||
|
||
# 对于每个降雨样本,根据实际类别使用适当的回归器
|
||
try:
|
||
for i, (x, category) in enumerate(zip(X_test_rain, y_category_test_rain)):
|
||
reg_name = "light" if category == 1 else "medium" if category == 2 else "heavy" if category == 3 else "extreme"
|
||
|
||
# 检查是否有该类别的回归器
|
||
if reg_name in self.regressors:
|
||
regressor = self.regressors[reg_name]
|
||
|
||
# 进行预测
|
||
x_reshaped = x.reshape(1, -1)
|
||
if self.log_transform:
|
||
pred_log = regressor.predict(x_reshaped)[0]
|
||
pred = np.expm1(pred_log)
|
||
else:
|
||
pred = regressor.predict(x_reshaped)[0]
|
||
|
||
# 确保预测值非负
|
||
pred = max(0, pred)
|
||
|
||
# 对大雨和暴雨应用安全系数
|
||
if category == 3: # 大雨
|
||
pred = pred * self.heavy_rain_factor
|
||
elif category == 4: # 暴雨
|
||
pred = pred * self.extreme_rain_factor
|
||
|
||
y_pred_rain[i] = pred
|
||
else:
|
||
# 该类别没有模型,使用默认模型(如果有)
|
||
if self.regressor is not None:
|
||
x_reshaped = x.reshape(1, -1)
|
||
if self.log_transform:
|
||
pred_log = self.regressor.predict(x_reshaped)[0]
|
||
pred = np.expm1(pred_log)
|
||
else:
|
||
pred = self.regressor.predict(x_reshaped)[0]
|
||
|
||
# 如果是大雨或暴雨类别但没有专门的回归器,仍应用安全系数
|
||
if category == 3: # 大雨
|
||
pred = pred * self.heavy_rain_factor
|
||
elif category == 4: # 暴雨
|
||
pred = pred * self.extreme_rain_factor
|
||
|
||
y_pred_rain[i] = max(0, pred)
|
||
except Exception as e:
|
||
print(f"分层回归预测时出错: {str(e)}")
|
||
print("尝试使用简化的预测方法...")
|
||
|
||
# 使用单一回归器进行所有预测
|
||
if self.regressor is not None:
|
||
try:
|
||
if self.log_transform:
|
||
y_pred_transformed = self.regressor.predict(X_test_rain)
|
||
y_pred_rain = np.expm1(y_pred_transformed)
|
||
else:
|
||
y_pred_rain = self.regressor.predict(X_test_rain)
|
||
y_pred_rain = np.maximum(y_pred_rain, 0)
|
||
except Exception as e2:
|
||
print(f"单一回归器预测也失败: {str(e2)}")
|
||
return None
|
||
else:
|
||
return None
|
||
|
||
# 计算总体指标
|
||
try:
|
||
rmse = np.sqrt(mean_squared_error(y_test_rain, y_pred_rain))
|
||
mae = mean_absolute_error(y_test_rain, y_pred_rain)
|
||
r2 = r2_score(y_test_rain, y_pred_rain)
|
||
|
||
print("\n分层回归器总体性能:")
|
||
print(f"样本数: {len(y_test_rain)}")
|
||
print(f"RMSE: {rmse:.4f}")
|
||
print(f"MAE: {mae:.4f}")
|
||
print(f"R²: {r2:.4f}")
|
||
except Exception as e:
|
||
print(f"计算分层回归器总体性能时出错: {str(e)}")
|
||
rmse, mae, r2 = float('nan'), float('nan'), float('nan')
|
||
|
||
# 计算各类别的指标
|
||
try:
|
||
for category in sorted(np.unique(y_category_test_rain)):
|
||
cat_mask = (y_category_test_rain == category)
|
||
if sum(cat_mask) < 5:
|
||
continue # 跳过样本过少的类别
|
||
|
||
cat_name = "小雨" if category == 1 else "中雨" if category == 2 else "大雨" if category == 3 else "暴雨"
|
||
|
||
cat_rmse = np.sqrt(mean_squared_error(y_test_rain[cat_mask], y_pred_rain[cat_mask]))
|
||
cat_mae = mean_absolute_error(y_test_rain[cat_mask], y_pred_rain[cat_mask])
|
||
cat_r2 = r2_score(y_test_rain[cat_mask], y_pred_rain[cat_mask])
|
||
|
||
print(f"\n{cat_name}性能:")
|
||
print(f"样本数: {sum(cat_mask)}")
|
||
print(f"RMSE: {cat_rmse:.4f}")
|
||
print(f"MAE: {cat_mae:.4f}")
|
||
print(f"R²: {cat_r2:.4f}")
|
||
|
||
# 打印一些示例预测
|
||
if sum(cat_mask) >= 3:
|
||
sample_indices = np.where(cat_mask)[0][:3] # 最多3个示例
|
||
print(f"\n{cat_name}示例预测:")
|
||
for idx in sample_indices:
|
||
# 使用索引从数组或Series获取值
|
||
if isinstance(y_test_rain, pd.Series):
|
||
actual = y_test_rain.iloc[idx]
|
||
else:
|
||
actual = y_test_rain[idx]
|
||
|
||
pred = y_pred_rain[idx]
|
||
error_pct = ((pred-actual)/actual*100) if actual > 0 else 0
|
||
print(f" 实际: {actual:.2f}mm, 预测: {pred:.2f}mm, 误差: {pred-actual:.2f}mm ({error_pct:.1f}%)")
|
||
except Exception as e:
|
||
print(f"计算分层回归器分类性能时出错: {str(e)}")
|
||
|
||
# 绘制实际值与预测值
|
||
try:
|
||
plt.figure(figsize=(10, 6))
|
||
|
||
# 按类别着色点
|
||
colors = ['blue', 'green', 'red', 'purple']
|
||
markers = ['o', 's', '^', 'd']
|
||
categories = sorted(np.unique(y_category_test_rain))
|
||
|
||
for i, category in enumerate(categories):
|
||
if category > 0 and i < len(colors):
|
||
cat_mask = (y_category_test_rain == category)
|
||
cat_name = "Light" if category == 1 else "Medium" if category == 2 else "Heavy" if category == 3 else "Extreme"
|
||
plt.scatter(y_test_rain[cat_mask], y_pred_rain[cat_mask],
|
||
color=colors[i], marker=markers[i], alpha=0.7, label=f"{cat_name} Rain")
|
||
|
||
# 添加对角线
|
||
max_val = max(max(y_test_rain), max(y_pred_rain))
|
||
plt.plot([0, max_val], [0, max_val], 'k--')
|
||
|
||
plt.xlabel('Actual Rainfall (mm)')
|
||
plt.ylabel('Predicted Rainfall (mm)')
|
||
plt.title('Actual vs Predicted Values (Stratified Regressors)')
|
||
plt.legend()
|
||
plt.grid(True)
|
||
plt.tight_layout()
|
||
plt.savefig(f"{self.output_dir}/stratified_regressors_performance.png")
|
||
plt.close()
|
||
except Exception as e:
|
||
print(f"绘制分层回归性能图时出错: {str(e)}")
|
||
|
||
return {
|
||
'rmse': rmse,
|
||
'mae': mae,
|
||
'r2': r2,
|
||
'predictions': y_pred_rain
|
||
}
|
||
|
||
def evaluate_regressor(self, X_test, y_test, mask_true, mask_pred):
|
||
"""评估回归器性能"""
|
||
if self.regressor is None:
|
||
print("回归器未训练,无法评估")
|
||
return None
|
||
|
||
print("\n评估单一回归器性能...")
|
||
|
||
# 所有实际降雨样本的性能
|
||
X_test_true_rain = X_test[mask_true]
|
||
y_test_true_rain = y_test[mask_true]
|
||
|
||
if len(X_test_true_rain) > 0:
|
||
try:
|
||
# 降雨预测
|
||
if hasattr(self, 'log_transform') and self.log_transform:
|
||
y_pred_transformed = self.regressor.predict(X_test_true_rain)
|
||
y_pred_true_rain = np.expm1(y_pred_transformed)
|
||
else:
|
||
y_pred_true_rain = self.regressor.predict(X_test_true_rain)
|
||
|
||
# 防止负值
|
||
y_pred_true_rain = np.maximum(y_pred_true_rain, 0)
|
||
|
||
# 计算评估指标
|
||
rmse = np.sqrt(mean_squared_error(y_test_true_rain, y_pred_true_rain))
|
||
mae = mean_absolute_error(y_test_true_rain, y_pred_true_rain)
|
||
r2 = r2_score(y_test_true_rain, y_pred_true_rain)
|
||
|
||
print("\n所有实际降雨样本的性能:")
|
||
print(f"样本数: {len(X_test_true_rain)}")
|
||
print(f"RMSE: {rmse:.4f}")
|
||
print(f"MAE: {mae:.4f}")
|
||
print(f"R²: {r2:.4f}")
|
||
|
||
# 绘制实际值与预测值散点图
|
||
plt.figure(figsize=(10, 6))
|
||
plt.scatter(y_test_true_rain, y_pred_true_rain, alpha=0.5)
|
||
|
||
# 添加对角线
|
||
max_val = max(max(y_test_true_rain), max(y_pred_true_rain))
|
||
plt.plot([0, max_val], [0, max_val], 'r--')
|
||
|
||
plt.xlabel('Actual Rainfall (mm)')
|
||
plt.ylabel('Predicted Rainfall (mm)')
|
||
plt.title('Actual vs Predicted Values (All Actual Rain Samples)')
|
||
plt.grid(True)
|
||
plt.tight_layout()
|
||
plt.savefig(f"{self.output_dir}/actual_vs_predicted_all_rain.png")
|
||
plt.close()
|
||
except Exception as e:
|
||
print(f"评估所有实际降雨样本时出错: {str(e)}")
|
||
|
||
# 评估分类器和回归器都正确预测的样本
|
||
mask_both_correct = mask_true & mask_pred
|
||
X_test_both = X_test[mask_both_correct]
|
||
y_test_both = y_test[mask_both_correct]
|
||
|
||
if len(X_test_both) > 0:
|
||
try:
|
||
# 降雨预测
|
||
if hasattr(self, 'log_transform') and self.log_transform:
|
||
y_pred_transformed = self.regressor.predict(X_test_both)
|
||
y_pred_both = np.expm1(y_pred_transformed)
|
||
else:
|
||
y_pred_both = self.regressor.predict(X_test_both)
|
||
|
||
# 防止负值
|
||
y_pred_both = np.maximum(y_pred_both, 0)
|
||
|
||
# 计算评估指标
|
||
rmse = np.sqrt(mean_squared_error(y_test_both, y_pred_both))
|
||
mae = mean_absolute_error(y_test_both, y_pred_both)
|
||
r2 = r2_score(y_test_both, y_pred_both)
|
||
|
||
print("\n正确分类的降雨样本性能:")
|
||
print(f"样本数: {len(X_test_both)}")
|
||
print(f"RMSE: {rmse:.4f}")
|
||
print(f"MAE: {mae:.4f}")
|
||
print(f"R²: {r2:.4f}")
|
||
|
||
# 绘制实际值与预测值散点图
|
||
plt.figure(figsize=(10, 6))
|
||
plt.scatter(y_test_both, y_pred_both, alpha=0.5)
|
||
|
||
# 添加对角线
|
||
max_val = max(max(y_test_both), max(y_pred_both))
|
||
plt.plot([0, max_val], [0, max_val], 'r--')
|
||
|
||
plt.xlabel('Actual Rainfall (mm)')
|
||
plt.ylabel('Predicted Rainfall (mm)')
|
||
plt.title('Actual vs Predicted Values (Correctly Classified Samples)')
|
||
plt.grid(True)
|
||
plt.tight_layout()
|
||
plt.savefig(f"{self.output_dir}/actual_vs_predicted_correct_classified.png")
|
||
plt.close()
|
||
|
||
return {
|
||
'rmse': rmse,
|
||
'mae': mae,
|
||
'r2': r2,
|
||
'predictions': y_pred_both
|
||
}
|
||
except Exception as e:
|
||
print(f"评估正确分类的降雨样本时出错: {str(e)}")
|
||
return None
|
||
else:
|
||
print("没有正确分类的降雨样本,无法评估回归性能")
|
||
return None
|
||
|
||
def train_and_evaluate(self, data, n_features=30, time_based_split=True):
|
||
"""完整的训练和评估过程"""
|
||
try:
|
||
# 步骤1: 数据预处理
|
||
processed_data = self.preprocess_data(data)
|
||
|
||
# 步骤2: 特征选择
|
||
X, y_binary, y_amount, y_category = self.select_features(processed_data, n_features=n_features)
|
||
|
||
# 步骤3: 数据分割
|
||
X_train, X_test, y_binary_train, y_binary_test, y_amount_train, y_amount_test, y_category_train, y_category_test = \
|
||
self.split_data(X, y_binary, y_amount, y_category, time_based=time_based_split)
|
||
|
||
# 步骤4: 特征标准化
|
||
X_train_scaled, X_test_scaled = self.scale_features(X_train, X_test)
|
||
|
||
# 步骤5: 训练二分类器(是否下雨)
|
||
# 使用测试集的前一半进行概率校准
|
||
val_size = len(X_test_scaled) // 2
|
||
X_val, y_val = X_test_scaled[:val_size], y_binary_test[:val_size]
|
||
X_test_final, y_test_final = X_test_scaled[val_size:], y_binary_test[val_size:]
|
||
y_amount_test_final = y_amount_test[val_size:]
|
||
y_category_test_final = y_category_test[val_size:] if y_category_test is not None else None
|
||
|
||
self.train_binary_classifier(X_train_scaled, y_binary_train, X_val, y_val)
|
||
|
||
# 步骤6: 优化分类阈值
|
||
self.optimize_threshold(X_test_final, y_test_final)
|
||
|
||
# 步骤7: 评估二分类器性能
|
||
clf_results = self.evaluate_binary_classifier(X_test_final, y_test_final)
|
||
|
||
# 步骤8: 训练降雨类别分类器
|
||
rain_mask_train = y_binary_train == 1
|
||
if self.stratified_regression and y_category_train is not None:
|
||
self.train_category_classifier(X_train_scaled, y_category_train, rain_mask_train)
|
||
|
||
# 步骤9: 训练针对不同降雨强度的分层回归器
|
||
if self.stratified_regression and y_category_train is not None:
|
||
self.train_stratified_regressors(X_train_scaled, y_amount_train, y_category_train, rain_mask_train)
|
||
|
||
# 步骤10: 同时训练单一回归器作为备选
|
||
self.train_regressor(X_train_scaled, y_amount_train, rain_mask_train)
|
||
|
||
# 步骤11: 评估回归性能
|
||
mask_true = y_test_final == 1 # 实际有雨
|
||
mask_pred = clf_results['predictions'] == 1 # 预测有雨
|
||
|
||
# 如果有分层回归器,评估其性能
|
||
strat_reg_results = None
|
||
if self.stratified_regression and y_category_test_final is not None and self.regressors:
|
||
strat_reg_results = self.evaluate_stratified_regressors(
|
||
X_test_final, y_amount_test_final, y_category_test_final, mask_true, mask_pred)
|
||
|
||
# 同时评估类别分类器
|
||
if self.rain_category_classifier is not None:
|
||
self.evaluate_category_classifier(X_test_final, y_category_test_final, mask_true, mask_pred)
|
||
|
||
# 评估单一回归器
|
||
reg_results = self.evaluate_regressor(X_test_final, y_amount_test_final, mask_true, mask_pred)
|
||
|
||
# 步骤12: 综合评估
|
||
print("\n两阶段降雨预测系统评估总结:")
|
||
print(f"二分类器 ({self.clf_model_type}) F1分数: {clf_results['f1']:.4f}")
|
||
|
||
if strat_reg_results:
|
||
print(f"分层回归器 RMSE: {strat_reg_results['rmse']:.4f}, R²: {strat_reg_results['r2']:.4f}")
|
||
|
||
if reg_results:
|
||
print(f"单一回归器 ({self.reg_model_type}) RMSE: {reg_results['rmse']:.4f}, R²: {reg_results['r2']:.4f}")
|
||
|
||
return {
|
||
'classifier_results': clf_results,
|
||
'regressor_results': reg_results,
|
||
'stratified_regressor_results': strat_reg_results
|
||
}
|
||
except Exception as e:
|
||
print(f"训练和评估过程中出错: {str(e)}")
|
||
traceback.print_exc()
|
||
return None
|
||
|
||
def predict(self, new_data, preprocess=True):
|
||
"""
|
||
使用训练好的模型进行预测
|
||
|
||
参数:
|
||
new_data: DataFrame或numpy数组,输入数据
|
||
preprocess: bool,是否预处理输入数据
|
||
|
||
返回:
|
||
dict: 包含降雨概率、二值分类结果和降雨量预测的字典
|
||
"""
|
||
if self.binary_classifier is None:
|
||
raise ValueError("模型未训练,请先调用train_and_evaluate方法")
|
||
|
||
# 如果需要预处理输入数据
|
||
if preprocess:
|
||
# 执行与训练期间相同的预处理
|
||
processed_data = self.preprocess_data(new_data, silent=True)
|
||
|
||
# 使用特征选择,但不更新feature_names
|
||
X, _, _, _ = self.select_features(processed_data, silent=True)
|
||
|
||
# 确保只使用训练过的特征(解决维度不匹配问题)
|
||
if isinstance(X, pd.DataFrame) and self.feature_names is not None:
|
||
# 检查是否所有必需的特征都存在
|
||
missing_features = [f for f in self.feature_names if f not in X.columns]
|
||
if missing_features:
|
||
print(f"警告: 预处理数据中缺少以下特征: {missing_features}")
|
||
# 为缺失特征填充0
|
||
for f in missing_features:
|
||
X[f] = 0
|
||
|
||
# 只选择训练中使用的特征
|
||
X = X[self.feature_names]
|
||
else:
|
||
# 直接使用输入数据,假设已经预处理
|
||
if isinstance(new_data, pd.DataFrame):
|
||
# 检查是否所有必需的特征都存在
|
||
if self.feature_names is not None:
|
||
missing_features = [f for f in self.feature_names if f not in new_data.columns]
|
||
if missing_features:
|
||
print(f"警告: 输入数据中缺少以下特征: {missing_features}")
|
||
# 为缺失特征填充0
|
||
for f in missing_features:
|
||
new_data[f] = 0
|
||
|
||
# 只选择训练中使用的特征
|
||
X = new_data[self.feature_names]
|
||
else:
|
||
X = new_data
|
||
else:
|
||
# 假设输入是按顺序排列的特征数组
|
||
X = new_data
|
||
|
||
# 应用特征标准化
|
||
if isinstance(X, pd.DataFrame):
|
||
X_np = X.values
|
||
else:
|
||
X_np = X
|
||
|
||
if self.scaler is None:
|
||
raise ValueError("缩放器未初始化,请先训练模型")
|
||
|
||
X_scaled = self.scaler.transform(X_np)
|
||
|
||
# 第一阶段: 二分类预测是否会下雨
|
||
try:
|
||
rain_probabilities = self.binary_classifier.predict_proba(X_scaled)[:, 1]
|
||
rain_binary = (rain_probabilities >= self.threshold).astype(int)
|
||
except Exception as e:
|
||
print(f"二分类预测时出错: {str(e)}")
|
||
# 使用一个安全的默认值
|
||
rain_probabilities = np.zeros(len(X))
|
||
rain_binary = np.zeros(len(X), dtype=int)
|
||
|
||
# 第二阶段: 对预测有雨的样本,预测降雨量
|
||
rainfall_amount = np.zeros(len(X))
|
||
rain_categories = np.zeros(len(X), dtype=int)
|
||
rain_indices = np.where(rain_binary == 1)[0]
|
||
|
||
if len(rain_indices) > 0:
|
||
# 如果有分层回归,先预测降雨类别
|
||
if self.stratified_regression and self.regressors and self.rain_category_classifier is not None:
|
||
try:
|
||
# 获取降雨样本
|
||
X_rain_scaled = X_scaled[rain_indices]
|
||
|
||
# 预测降雨类别
|
||
predicted_categories = self.rain_category_classifier.predict(X_rain_scaled)
|
||
|
||
# 填充类别预测结果
|
||
for i, idx in enumerate(rain_indices):
|
||
rain_categories[idx] = predicted_categories[i]
|
||
|
||
# 对每个雨样本,根据预测类别使用适当的回归器
|
||
for i, (idx, category, prob) in enumerate(zip(rain_indices, predicted_categories, rain_probabilities[rain_indices])):
|
||
reg_name = "light" if category == 1 else "medium" if category == 2 else "heavy" if category == 3 else "extreme"
|
||
|
||
# 检查是否有该类别的回归器
|
||
if reg_name in self.regressors:
|
||
regressor = self.regressors[reg_name]
|
||
else:
|
||
# 回退到单一回归器
|
||
regressor = self.regressor
|
||
|
||
if regressor is not None:
|
||
# 进行预测
|
||
x = X_scaled[idx].reshape(1, -1)
|
||
try:
|
||
if self.log_transform:
|
||
pred_log = regressor.predict(x)[0]
|
||
pred = np.expm1(pred_log)
|
||
else:
|
||
pred = regressor.predict(x)[0]
|
||
|
||
# 确保预测值非负
|
||
pred = max(0, pred)
|
||
|
||
# 对大雨和暴雨类别应用安全系数
|
||
if category == 3: # 大雨
|
||
pred = pred * self.heavy_rain_factor
|
||
elif category == 4: # 暴雨
|
||
pred = pred * self.extreme_rain_factor
|
||
|
||
# 对于高概率样本,进一步增加预测量
|
||
if prob > 0.7 and category >= 3: # 高概率大雨或暴雨
|
||
pred = pred * 1.5 # 额外的高概率因子
|
||
|
||
rainfall_amount[idx] = pred
|
||
except Exception as e:
|
||
print(f"预测降雨量时出错: {str(e)}")
|
||
except Exception as e:
|
||
print(f"分层回归预测时出错: {str(e)}")
|
||
# 如果分层回归失败,尝试使用单一回归器
|
||
if self.regressor is not None:
|
||
try:
|
||
X_rain = X_scaled[rain_indices]
|
||
if self.log_transform:
|
||
rain_predictions_transformed = self.regressor.predict(X_rain)
|
||
rain_predictions = np.expm1(rain_predictions_transformed)
|
||
else:
|
||
rain_predictions = self.regressor.predict(X_rain)
|
||
rainfall_amount[rain_indices] = np.maximum(rain_predictions, 0)
|
||
except Exception as e2:
|
||
print(f"单一回归器预测也失败: {str(e2)}")
|
||
|
||
# 如果没有分层回归或失败,使用单一回归器
|
||
elif self.regressor is not None:
|
||
try:
|
||
X_rain = X_scaled[rain_indices]
|
||
|
||
# 预测降雨量
|
||
if hasattr(self, 'log_transform') and self.log_transform:
|
||
rain_predictions_transformed = self.regressor.predict(X_rain)
|
||
rain_predictions = np.expm1(rain_predictions_transformed)
|
||
else:
|
||
rain_predictions = self.regressor.predict(X_rain)
|
||
|
||
# 确保降雨量非负
|
||
rain_predictions = np.maximum(rain_predictions, 0)
|
||
|
||
# 填充预测结果
|
||
rainfall_amount[rain_indices] = rain_predictions
|
||
except Exception as e:
|
||
print(f"单一回归器预测时出错: {str(e)}")
|
||
|
||
return {
|
||
'rain_probability': rain_probabilities, # 降雨概率
|
||
'rain_binary': rain_binary, # 是否会下雨(二值分类结果)
|
||
'rain_category': rain_categories, # 降雨类别(1:小雨, 2:中雨, 3:大雨, 4:暴雨)
|
||
'rainfall_amount': rainfall_amount # 预测的降雨量(毫米)
|
||
}
|
||
|
||
def save_model(self, filename='rainfall_two_stage_model.joblib'):
|
||
"""保存训练好的模型"""
|
||
if self.binary_classifier is None:
|
||
raise ValueError("模型未训练,无法保存")
|
||
|
||
model_data = {
|
||
'binary_classifier': self.binary_classifier,
|
||
'regressor': self.regressor,
|
||
'regressors': self.regressors,
|
||
'rain_category_classifier': self.rain_category_classifier,
|
||
'scaler': self.scaler,
|
||
'threshold': self.threshold,
|
||
'feature_names': self.feature_names,
|
||
'feature_importances_clf': self.feature_importances_clf,
|
||
'feature_importances_reg': self.feature_importances_reg,
|
||
'clf_model_type': self.clf_model_type,
|
||
'reg_model_type': self.reg_model_type,
|
||
'log_transform': getattr(self, 'log_transform', False),
|
||
'feature_stats': self.feature_stats,
|
||
'stratified_regression': self.stratified_regression,
|
||
'rain_thresholds': self.rain_thresholds,
|
||
'heavy_rain_factor': self.heavy_rain_factor,
|
||
'extreme_rain_factor': self.extreme_rain_factor
|
||
}
|
||
|
||
joblib.dump(model_data, filename)
|
||
print(f"模型已保存到 {filename}")
|
||
|
||
@classmethod
|
||
def load_model(cls, filename='rainfall_two_stage_model.joblib'):
|
||
"""加载保存的模型"""
|
||
model_data = joblib.load(filename)
|
||
|
||
# 创建新实例
|
||
instance = cls(
|
||
debug=False,
|
||
clf_model_type=model_data.get('clf_model_type', 'ensemble'),
|
||
reg_model_type=model_data.get('reg_model_type', 'gbdt')
|
||
)
|
||
|
||
# 加载模型组件
|
||
instance.binary_classifier = model_data['binary_classifier']
|
||
instance.regressor = model_data.get('regressor')
|
||
instance.regressors = model_data.get('regressors', {})
|
||
instance.rain_category_classifier = model_data.get('rain_category_classifier')
|
||
instance.scaler = model_data['scaler']
|
||
instance.threshold = model_data['threshold']
|
||
instance.feature_names = model_data['feature_names']
|
||
instance.feature_importances_clf = model_data.get('feature_importances_clf')
|
||
instance.feature_importances_reg = model_data.get('feature_importances_reg')
|
||
|
||
# 加载其他设置
|
||
if 'log_transform' in model_data:
|
||
instance.log_transform = model_data['log_transform']
|
||
|
||
# 加载特征统计信息
|
||
if 'feature_stats' in model_data:
|
||
instance.feature_stats = model_data['feature_stats']
|
||
|
||
# 加载分层回归设置
|
||
if 'stratified_regression' in model_data:
|
||
instance.stratified_regression = model_data['stratified_regression']
|
||
|
||
if 'rain_thresholds' in model_data:
|
||
instance.rain_thresholds = model_data['rain_thresholds']
|
||
|
||
if 'heavy_rain_factor' in model_data:
|
||
instance.heavy_rain_factor = model_data['heavy_rain_factor']
|
||
|
||
if 'extreme_rain_factor' in model_data:
|
||
instance.extreme_rain_factor = model_data['extreme_rain_factor']
|
||
|
||
return instance
|
||
|
||
|
||
class FutureRainfallForecastSystem:
|
||
"""
|
||
未来多时间点降雨预报系统
|
||
基于RainfallTwoStageModel,预测未来1小时、2小时、3小时的降雨情况
|
||
"""
|
||
def __init__(self, model_path=None, debug=False):
|
||
# 存储不同时间点的预测模型
|
||
self.forecast_models = {}
|
||
self.time_points = [1, 2, 3] # 未来1小时、2小时、3小时
|
||
self.debug = debug
|
||
|
||
# 如果提供了模型路径,尝试加载模型
|
||
if model_path:
|
||
self.load_models(model_path)
|
||
|
||
def train_forecast_models(self, data, features_config=None):
|
||
"""
|
||
为每个时间点训练一个单独的预测模型
|
||
|
||
参数:
|
||
data: DataFrame 训练数据,应包含时间特征和未来时间点的降雨数据
|
||
features_config: dict 特征配置(可选)
|
||
"""
|
||
print("开始训练未来降雨预报模型...")
|
||
|
||
# 检查数据格式
|
||
if not isinstance(data, pd.DataFrame):
|
||
try:
|
||
data = pd.DataFrame(data)
|
||
except:
|
||
raise TypeError("数据必须是可转换为DataFrame的格式")
|
||
|
||
# 打印调试信息
|
||
print(f"原始数据形状: {data.shape}")
|
||
print(f"原始数据列: {data.columns.tolist()}")
|
||
print(f"原始数据rain列类型: {data['rain'].dtype}")
|
||
|
||
# 确保datetime列是datetime格式
|
||
if 'datetime' in data.columns:
|
||
data['datetime'] = pd.to_datetime(data['datetime'])
|
||
|
||
# 检查数据中是否有降雨信息
|
||
if 'rain' not in data.columns:
|
||
raise ValueError("数据必须包含降雨量列 (rain)")
|
||
|
||
# 打印数据概要
|
||
print(f"\n数据概要:")
|
||
print(f"行数: {len(data)}")
|
||
print(f"列数: {len(data.columns)}")
|
||
print(f"降雨量列: rain")
|
||
print(f"有雨样本数: {sum(data['rain'] > 0)} ({sum(data['rain'] > 0)/len(data)*100:.1f}%)")
|
||
|
||
# 默认特征配置
|
||
if features_config is None:
|
||
features_config = {
|
||
'n_features': 30, # 每个模型使用的特征数
|
||
'clf_model_type': 'gbdt', # 分类器类型
|
||
'reg_model_type': 'gbdt', # 回归器类型
|
||
'time_based_split': True # 使用时间序列分割
|
||
}
|
||
|
||
# 为每个未来时间点准备数据并训练模型
|
||
for time_point in self.time_points:
|
||
print(f"\n准备未来{time_point}小时降雨预报模型...")
|
||
|
||
try:
|
||
# 准备数据:移动目标变量以表示未来降雨
|
||
# 例如,如果time_point=1,则当前行的特征对应下一行的降雨情况
|
||
future_data = self._prepare_future_data(data, time_point)
|
||
|
||
if len(future_data) < 100:
|
||
print(f"警告: 未来{time_point}小时的有效数据样本过少 ({len(future_data)})")
|
||
continue
|
||
|
||
# 创建针对此时间点的模型
|
||
model = RainfallTwoStageModel(
|
||
debug=self.debug,
|
||
clf_model_type=features_config['clf_model_type'],
|
||
reg_model_type=features_config['reg_model_type'],
|
||
output_dir=f'rainfall_forecast_{time_point}h_figures'
|
||
)
|
||
|
||
# 训练模型
|
||
results = model.train_and_evaluate(
|
||
future_data,
|
||
n_features=features_config['n_features'],
|
||
time_based_split=features_config['time_based_split']
|
||
)
|
||
|
||
if results is None:
|
||
print(f"未来{time_point}小时模型训练失败,跳过...")
|
||
continue
|
||
|
||
# 保存训练好的模型
|
||
model.save_model(f'rainfall_forecast_{time_point}h_model.joblib')
|
||
|
||
# 将模型添加到字典中
|
||
self.forecast_models[time_point] = model
|
||
|
||
print(f"未来{time_point}小时降雨预报模型训练完成!")
|
||
except Exception as e:
|
||
print(f"训练未来{time_point}小时模型时出错: {str(e)}")
|
||
traceback.print_exc()
|
||
|
||
if self.forecast_models:
|
||
print("\n所有时间点的降雨预报模型训练完成!")
|
||
else:
|
||
print("\n警告: 所有模型训练都失败了!")
|
||
|
||
def _prepare_future_data(self, data, time_point):
|
||
"""
|
||
准备用于未来时间点预测的数据
|
||
将目标变量滞后处理,使当前特征与未来降雨对齐
|
||
"""
|
||
df = data.copy()
|
||
|
||
# 确保索引是连续的
|
||
df = df.reset_index(drop=True)
|
||
|
||
# 确保时间列是正确的格式
|
||
if 'datetime' in df.columns and not pd.api.types.is_datetime64_any_dtype(df['datetime']):
|
||
try:
|
||
df['datetime'] = pd.to_datetime(df['datetime'])
|
||
except:
|
||
print(f"警告:无法转换时间列 datetime")
|
||
|
||
# 创建未来降雨量和降雨二值列
|
||
# 采用移位操作:当前行的特征对应未来行的降雨
|
||
df[f'future_{time_point}h_rain'] = df['rain'].shift(-time_point)
|
||
df[f'future_{time_point}h_rain_binary'] = (df[f'future_{time_point}h_rain'] > 0).astype(int)
|
||
|
||
# 删除含有NaN的行(通常是最后几行,因为它们没有对应的未来降雨数据)
|
||
df = df.dropna(subset=[f'future_{time_point}h_rain'])
|
||
|
||
# 修改后 - 如果已存在rain列,先删除原来的rain列
|
||
if 'rain' in df.columns:
|
||
df = df.drop('rain', axis=1) # 删除原始rain列
|
||
if 'rain_binary' in df.columns:
|
||
df = df.drop('rain_binary', axis=1) # 删除原始rain_binary列(如果有)
|
||
|
||
# 将未来降雨列重命名为标准名称,以便模型处理
|
||
df = df.rename(columns={
|
||
f'future_{time_point}h_rain': 'rain',
|
||
f'future_{time_point}h_rain_binary': 'rain_binary'
|
||
})
|
||
|
||
return df
|
||
|
||
def forecast(self, current_data):
|
||
"""
|
||
预测未来多个时间点的降雨情况
|
||
|
||
参数:
|
||
current_data: DataFrame 当前气象数据
|
||
|
||
返回:
|
||
dict: 包含各时间点降雨预测结果的字典
|
||
"""
|
||
if not self.forecast_models:
|
||
raise ValueError("模型未训练,请先调用train_forecast_models方法")
|
||
|
||
forecast_results = {}
|
||
|
||
# 对每个时间点进行预测
|
||
for time_point, model in self.forecast_models.items():
|
||
print(f"\n预测未来{time_point}小时降雨情况...")
|
||
|
||
# 使用当前模型预测未来降雨
|
||
try:
|
||
prediction = model.predict(current_data)
|
||
forecast_results[time_point] = prediction
|
||
|
||
# 显示摘要结果
|
||
rain_prob = prediction['rain_probability'].mean()
|
||
rain_samples = sum(prediction['rain_binary'])
|
||
|
||
print(f"未来{time_point}小时降雨概率: {rain_prob:.4f}")
|
||
print(f"预测有雨样本数: {rain_samples}/{len(current_data)} ({rain_samples/len(current_data)*100:.2f}%)")
|
||
|
||
# 如果有降雨预测,显示降雨量情况
|
||
if rain_samples > 0:
|
||
rain_amounts = prediction['rainfall_amount'][prediction['rain_binary'] == 1]
|
||
if len(rain_amounts)>0:
|
||
|
||
# 对降雨量进行排序
|
||
sorted_rain = np.sort(rain_amounts)
|
||
|
||
# 计算截取中间60%的索引位置
|
||
n = len(sorted_rain)
|
||
start_idx = int(n * 0.2) # 跳过最低的20%
|
||
end_idx = int(n * 0.8) # 跳过最高的20%
|
||
|
||
# 确保至少有一个元素
|
||
if start_idx == end_idx:
|
||
middle_rain = sorted_rain[start_idx:start_idx+1]
|
||
else:
|
||
middle_rain = sorted_rain[start_idx:end_idx]
|
||
|
||
# 计算中间60%的平均值
|
||
middle_avg = np.mean(middle_rain)
|
||
|
||
print(f"预测降雨量: 中间60%平均 {middle_avg:.2f}mm, 全部平均 {rain_amounts.mean():.2f}mm, 最大 {rain_amounts.max():.2f}mm")
|
||
|
||
# 统计降雨类别
|
||
categories = prediction['rain_category'][prediction['rain_binary'] == 1]
|
||
if len(categories) > 0:
|
||
try:
|
||
cat_counts = np.bincount(categories.astype(int))
|
||
cat_names = ["", "小雨", "中雨", "大雨", "暴雨"]
|
||
for cat, count in enumerate(cat_counts):
|
||
if cat > 0 and count > 0 and cat < len(cat_names):
|
||
print(f" {cat_names[cat]}: {count}个样本 ({count/rain_samples*100:.1f}%)")
|
||
except Exception as e:
|
||
print(f"统计降雨类别时出错: {str(e)}")
|
||
|
||
except Exception as e:
|
||
print(f"未来{time_point}小时预测出错: {str(e)}")
|
||
forecast_results[time_point] = None
|
||
|
||
return forecast_results
|
||
|
||
def load_models(self, base_path):
|
||
"""加载所有时间点的预测模型"""
|
||
models_loaded = 0
|
||
|
||
try:
|
||
for time_point in self.time_points:
|
||
model_path = f'{base_path}/rainfall_forecast_{time_point}h_model.joblib'
|
||
if os.path.exists(model_path):
|
||
print(f"加载未来{time_point}小时降雨预报模型...")
|
||
try:
|
||
model = RainfallTwoStageModel.load_model(model_path)
|
||
self.forecast_models[time_point] = model
|
||
models_loaded += 1
|
||
except Exception as e:
|
||
print(f"加载未来{time_point}小时模型时出错: {str(e)}")
|
||
else:
|
||
print(f"未找到未来{time_point}小时的模型文件: {model_path}")
|
||
|
||
if models_loaded > 0:
|
||
print(f"成功加载 {models_loaded} 个时间点的预报模型")
|
||
else:
|
||
print("警告: 未能加载任何预报模型")
|
||
|
||
except Exception as e:
|
||
print(f"加载模型出错: {str(e)}")
|
||
|
||
def save_models(self, base_path):
|
||
"""保存所有时间点的预测模型"""
|
||
os.makedirs(base_path, exist_ok=True)
|
||
|
||
for time_point, model in self.forecast_models.items():
|
||
model_path = f'{base_path}/rainfall_forecast_{time_point}h_model.joblib'
|
||
print(f"保存未来{time_point}小时降雨预报模型...")
|
||
model.save_model(model_path)
|
||
|
||
print(f"成功保存 {len(self.forecast_models)} 个时间点的预报模型")
|
||
|
||
def visualize_forecast(self, forecast_results, output_dir='forecast_visualizations'):
|
||
"""
|
||
可视化预测结果
|
||
|
||
参数:
|
||
forecast_results: dict 各时间点的预测结果
|
||
output_dir: str 输出目录
|
||
"""
|
||
os.makedirs(output_dir, exist_ok=True)
|
||
|
||
if not forecast_results:
|
||
print("没有预测结果可供可视化")
|
||
return
|
||
|
||
# 准备绘图数据
|
||
time_labels = [f"{t}hour" for t in forecast_results.keys()]
|
||
rain_probs = [forecast_results[t]['rain_probability'].mean() for t in forecast_results.keys()]
|
||
|
||
# 准备降雨量数据
|
||
rain_amounts = []
|
||
for t in forecast_results.keys():
|
||
result = forecast_results[t]
|
||
if sum(result['rain_binary']) > 0:
|
||
# 计算有雨样本的平均降雨量
|
||
amounts = result['rainfall_amount'][result['rain_binary'] == 1]
|
||
rain_amounts.append(amounts.mean())
|
||
else:
|
||
rain_amounts.append(0)
|
||
|
||
# 绘制降雨概率随时间变化图
|
||
plt.figure(figsize=(10, 6))
|
||
plt.bar(time_labels, rain_probs, color='skyblue')
|
||
plt.axhline(y=0.5, color='r', linestyle='--', label='50%概率阈值')
|
||
plt.ylim(0, 1)
|
||
plt.title('Prediction of rainfall probability at various time points in the future')
|
||
plt.ylabel('Rainfall probability')
|
||
plt.xlabel('pred_time')
|
||
plt.grid(True, alpha=0.3)
|
||
plt.legend()
|
||
plt.tight_layout()
|
||
plt.savefig(f"{output_dir}/rainfall_probability_forecast.png")
|
||
plt.close()
|
||
|
||
# 绘制降雨量随时间变化图
|
||
plt.figure(figsize=(10, 6))
|
||
plt.bar(time_labels, rain_amounts, color='blue')
|
||
plt.title('Prediction of rainfall at different time points in the future')
|
||
plt.ylabel('rain (mm)')
|
||
plt.xlabel('pred_time')
|
||
plt.grid(True, alpha=0.3)
|
||
plt.tight_layout()
|
||
plt.savefig(f"{output_dir}/rainfall_amount_forecast.png")
|
||
plt.close()
|
||
|
||
def get_current_weather_data():
|
||
"""
|
||
模拟从气象站获取当前的气象数据
|
||
在实际应用中,这个函数会从传感器、API或其他数据源获取数据
|
||
"""
|
||
# 模拟数据 - 在实际应用中会被真实数据替代
|
||
current_data = {
|
||
'datetime': [datetime.datetime.now()],
|
||
'PWV': [25.5], # 可降水量
|
||
'ZTD': [2450.3], # 对流层延迟
|
||
'Temp': [305.6], # 温度(开尔文)
|
||
'Press': [1008.5], # 气压(hPa)
|
||
'rh': [78.5], # 相对湿度(%)
|
||
'ws': [3.2] # 风速(m/s)
|
||
}
|
||
|
||
# 创建DataFrame
|
||
current_df = pd.DataFrame(current_data)
|
||
|
||
# 加载历史数据以计算变化率和滚动统计
|
||
try:
|
||
history_df = pd.read_csv('weather_history_cache.csv')
|
||
history_df['datetime'] = pd.to_datetime(history_df['datetime'])
|
||
|
||
# 合并当前数据与历史数据
|
||
combined_df = pd.concat([history_df, current_df]).reset_index(drop=True)
|
||
|
||
# 确保按时间排序
|
||
combined_df = combined_df.sort_values(by='datetime')
|
||
|
||
# 保存最新的24小时数据作为历史缓存
|
||
combined_df.tail(24).to_csv('weather_history_cache.csv', index=False)
|
||
|
||
# 计算变化率
|
||
for var in ['PWV', 'ZTD', 'Temp', 'Press', 'rh']:
|
||
# 1小时变化(假设数据是每小时一条)
|
||
combined_df[f'{var}_1h_change'] = combined_df[var].diff(1)
|
||
|
||
# 3小时变化
|
||
combined_df[f'{var}_3h_change'] = combined_df[var].diff(3)
|
||
|
||
# 6小时变化
|
||
combined_df[f'{var}_6h_change'] = combined_df[var].diff(6)
|
||
|
||
# 计算滚动均值
|
||
combined_df[f'{var}_3h_mean'] = combined_df[var].rolling(window=3, min_periods=1).mean()
|
||
combined_df[f'{var}_6h_mean'] = combined_df[var].rolling(window=6, min_periods=1).mean()
|
||
|
||
# 滚动标准差
|
||
combined_df[f'{var}_6h_std'] = combined_df[var].rolling(window=6, min_periods=1).std().fillna(0)
|
||
|
||
# 异常值(相对于平均值的标准化偏差)
|
||
var_mean = combined_df[var].mean()
|
||
var_std = combined_df[var].std()
|
||
combined_df[f'{var}_anomaly'] = (combined_df[var] - var_mean) / var_std if var_std > 0 else 0
|
||
|
||
# 急剧变化指标
|
||
var_std = combined_df[f'{var}_1h_change'].std()
|
||
if var_std > 0:
|
||
combined_df[f'{var}_rapid_change'] = (np.abs(combined_df[f'{var}_1h_change']) > (2 * var_std)).astype(int)
|
||
else:
|
||
combined_df[f'{var}_rapid_change'] = 0
|
||
|
||
# 变化加速度(变化率的变化率)
|
||
combined_df[f'{var}_change_accel'] = combined_df[f'{var}_1h_change'].diff(1).fillna(0)
|
||
|
||
# 添加组合特征
|
||
if all(var in combined_df.columns for var in ['PWV', 'rh']):
|
||
# 湿度可用性指数
|
||
combined_df['PWV_rh'] = combined_df['PWV'] * combined_df['rh'] / 100
|
||
|
||
# 累积湿度变化
|
||
combined_df['PWV_cumul_change'] = combined_df['PWV'].diff(1).rolling(window=6).sum().fillna(0)
|
||
combined_df['rh_cumul_change'] = combined_df['rh'].diff(1).rolling(window=6).sum().fillna(0)
|
||
|
||
if all(var in combined_df.columns for var in ['PWV', 'Temp']):
|
||
# 温度-湿度相互作用
|
||
combined_df['PWV_Temp'] = combined_df['PWV'] * combined_df['Temp']
|
||
|
||
# 饱和潜力
|
||
temp_exp = np.exp(17.27 * (combined_df['Temp']-273.15) / ((combined_df['Temp']-273.15) + 237.3))
|
||
combined_df['saturation_index'] = combined_df['PWV'] / (0.1 * temp_exp)
|
||
|
||
if all(var in combined_df.columns for var in ['Press', 'Press_1h_change']):
|
||
# 气压变化强度
|
||
combined_df['Press_change_intensity'] = combined_df['Press_1h_change'].abs()
|
||
|
||
# 气压下降指标
|
||
combined_df['Press_falling'] = (combined_df['Press_1h_change'] < 0).astype(int)
|
||
|
||
# 气压快速下降指标
|
||
press_quantile = combined_df['Press_1h_change'].abs().quantile(0.75)
|
||
combined_df['Press_rapid_fall'] = ((combined_df['Press_1h_change'] < 0) &
|
||
(combined_df['Press_1h_change'].abs() > press_quantile)).astype(int)
|
||
|
||
if all(var in combined_df.columns for var in ['ZTD', 'PWV']):
|
||
# ZTD-PWV比率
|
||
combined_df['ZTD_PWV_ratio'] = combined_df['ZTD'] / combined_df['PWV'].replace(0, np.nan).fillna(combined_df['PWV'].mean())
|
||
|
||
# 如果有变化率特征
|
||
if all(var in combined_df.columns for var in ['ZTD_1h_change', 'PWV_1h_change']):
|
||
combined_df['ZTD_PWV_change'] = combined_df['ZTD_1h_change'] * combined_df['PWV_1h_change']
|
||
|
||
# 湿度不稳定指标
|
||
pwv_mean = combined_df['PWV_6h_mean'].mean()
|
||
if pwv_mean > 0:
|
||
combined_df['moisture_instability'] = combined_df['PWV_6h_std'] / combined_df['PWV_6h_mean']
|
||
else:
|
||
combined_df['moisture_instability'] = 0
|
||
|
||
# 添加降雨潜力指标
|
||
if all(var in combined_df.columns for var in ['PWV', 'rh', 'Temp', 'Press_1h_change']):
|
||
# 组合降雨指标
|
||
combined_df['rain_potential'] = ((combined_df['PWV'] * combined_df['rh'] / 100) *
|
||
(0.1 - combined_df['Press_1h_change']) /
|
||
(combined_df['Temp'] + 10))
|
||
|
||
# 强降雨潜力指标
|
||
pwv_mean = combined_df['PWV'].mean() + 0.1 # 避免除以零
|
||
combined_df['heavy_rain_potential'] = (combined_df['PWV'] * (combined_df['rh'] / 100) *
|
||
(combined_df['Press_rapid_fall'] + 1) *
|
||
(1 + np.abs(combined_df['PWV_1h_change']) /
|
||
pwv_mean))
|
||
|
||
# 降雨持续性指标
|
||
combined_df['rain_persistence'] = (combined_df['PWV_cumul_change'] *
|
||
combined_df['rh_cumul_change'] *
|
||
(1 + combined_df['Press_change_intensity']))
|
||
|
||
# 获取最新一行作为当前数据,包含所有计算的特征
|
||
current_df = combined_df.iloc[-1:].reset_index(drop=True)
|
||
|
||
except Exception as e:
|
||
print(f"计算特征时出错: {str(e)}")
|
||
print("创建一个新的历史缓存文件...")
|
||
|
||
# 如果没有历史数据或出错,将当前数据作为历史缓存的第一条记录
|
||
current_df.to_csv('weather_history_cache.csv', index=False)
|
||
|
||
# 这种情况下无法计算时间序列特征
|
||
print("警告: 无法计算变化率和滚动统计特征,首次运行只能使用基础特征")
|
||
|
||
return current_df
|
||
|
||
def run_realtime_rainfall_forecast():
|
||
"""
|
||
运行实时降雨预报系统
|
||
"""
|
||
print("启动实时降雨预报系统...")
|
||
|
||
# 检查模型目录是否存在
|
||
model_dir = 'rainfall_forecast_models'
|
||
if not os.path.exists(model_dir):
|
||
print(f"错误: 未找到模型目录 '{model_dir}'")
|
||
print("请先使用历史数据训练模型")
|
||
return
|
||
|
||
# 加载预测模型
|
||
print("加载未来降雨预报模型...")
|
||
forecast_system = FutureRainfallForecastSystem()
|
||
forecast_system.load_models(model_dir)
|
||
|
||
if not forecast_system.forecast_models:
|
||
print("错误: 未能加载任何模型")
|
||
return
|
||
|
||
# 获取当前气象数据
|
||
print("获取当前气象数据...")
|
||
current_weather = get_current_weather_data()
|
||
|
||
# 检查数据是否完整
|
||
if current_weather is None or len(current_weather) == 0:
|
||
print("错误: 无法获取当前气象数据")
|
||
return
|
||
|
||
print("\n当前气象条件:")
|
||
print(f"时间: {current_weather['datetime'].iloc[0]}")
|
||
print(f"温度: {current_weather['Temp'].iloc[0]:.1f}K")
|
||
print(f"湿度: {current_weather['rh'].iloc[0]:.1f}%")
|
||
print(f"气压: {current_weather['Press'].iloc[0]:.1f} hPa")
|
||
print(f"可降水量(PWV): {current_weather['PWV'].iloc[0]:.1f} mm")
|
||
|
||
# 进行未来降雨预测
|
||
print("\n开始预测未来降雨情况...")
|
||
forecast_results = forecast_system.forecast(current_weather)
|
||
|
||
# 可视化预测结果
|
||
print("\n生成降雨预报可视化...")
|
||
forecast_system.visualize_forecast(forecast_results, output_dir='realtime_forecast_results')
|
||
|
||
# 输出预测报告
|
||
print("\n未来降雨预报摘要:")
|
||
print("=" * 50)
|
||
|
||
# 当前时间
|
||
current_time = datetime.datetime.now()
|
||
print(f"预报时间: {current_time.strftime('%Y-%m-%d %H:%M')}")
|
||
|
||
# 用于存储预报结果的列表,以便可能的后续处理或发送通知
|
||
forecast_summary = []
|
||
|
||
for time_point, result in forecast_results.items():
|
||
# 计算预测时间
|
||
forecast_time = current_time + timedelta(hours=time_point)
|
||
|
||
print(f"\n未来{time_point}小时 ({forecast_time.strftime('%H:%M')}) 降雨预报:")
|
||
print("-" * 50)
|
||
|
||
# 降雨概率
|
||
rain_prob = result['rain_probability'].mean()
|
||
|
||
# 是否有雨
|
||
rain_predicted = sum(result['rain_binary']) > 0
|
||
rain_status = "有" if rain_predicted else "无"
|
||
|
||
print(f"降雨概率: {rain_prob:.2f}")
|
||
print(f"预计降雨: {rain_status}")
|
||
|
||
forecast_record = {
|
||
'time_point': time_point,
|
||
'forecast_time': forecast_time,
|
||
'rain_probability': rain_prob,
|
||
'rain_predicted': rain_predicted
|
||
}
|
||
|
||
# 如果预测有雨,显示详细信息
|
||
if rain_predicted:
|
||
rain_indices = np.where(result['rain_binary'] == 1)[0]
|
||
rain_amounts = result['rainfall_amount'][rain_indices]
|
||
rain_categories = result['rain_category'][rain_indices]
|
||
|
||
# 计算平均降雨量和最大降雨量
|
||
avg_rain = rain_amounts.mean()
|
||
max_rain = rain_amounts.max()
|
||
|
||
print(f"预计降雨量: {avg_rain:.2f}mm (平均), {max_rain:.2f}mm (最大)")
|
||
|
||
forecast_record['avg_rainfall'] = avg_rain
|
||
forecast_record['max_rainfall'] = max_rain
|
||
|
||
# 统计降雨类别
|
||
try:
|
||
cat_counts = np.bincount(rain_categories.astype(int))
|
||
cat_names = ["", "小雨", "中雨", "大雨", "暴雨"]
|
||
|
||
print("降雨类型:")
|
||
most_common_cat = 0
|
||
most_common_count = 0
|
||
|
||
for cat, count in enumerate(cat_counts):
|
||
if cat > 0 and count > 0:
|
||
if count > most_common_count:
|
||
most_common_cat = cat
|
||
most_common_count = count
|
||
print(f" {cat_names[cat]}: {count}个样本 ({count/len(rain_indices)*100:.1f}%)")
|
||
|
||
forecast_record['rain_category'] = most_common_cat
|
||
forecast_record['rain_category_name'] = cat_names[most_common_cat] if most_common_cat > 0 else ""
|
||
|
||
# 主要降雨类型
|
||
if most_common_cat > 0:
|
||
print(f"主要降雨类型: {cat_names[most_common_cat]}")
|
||
|
||
# 给出降雨建议
|
||
advice = ""
|
||
if most_common_cat >= 3: # 大雨或暴雨
|
||
advice = "建议避免外出,注意防范强降雨可能带来的积水和交通影响"
|
||
elif most_common_cat == 2: # 中雨
|
||
advice = "出门请携带雨具,道路可能湿滑,驾车注意安全"
|
||
else: # 小雨
|
||
advice = "带好雨具,不会对出行造成太大影响"
|
||
|
||
print(f"出行建议: {advice}")
|
||
forecast_record['advice'] = advice
|
||
except Exception as e:
|
||
print(f"处理降雨类别时出错: {str(e)}")
|
||
forecast_record['rain_category'] = 0
|
||
forecast_record['rain_category_name'] = ""
|
||
forecast_record['advice'] = "请注意携带雨具"
|
||
else:
|
||
forecast_record['avg_rainfall'] = 0
|
||
forecast_record['max_rainfall'] = 0
|
||
forecast_record['rain_category'] = 0
|
||
forecast_record['rain_category_name'] = ""
|
||
forecast_record['advice'] = "天气良好,适合户外活动"
|
||
|
||
forecast_summary.append(forecast_record)
|
||
|
||
# 保存预报结果用于历史比较和验证
|
||
try:
|
||
forecast_history = pd.DataFrame(forecast_summary)
|
||
forecast_time_str = current_time.strftime('%Y%m%d_%H%M')
|
||
forecast_history.to_csv(f'realtime_forecast_results/forecast_{forecast_time_str}.csv', index=False)
|
||
except Exception as e:
|
||
print(f"保存预报结果时出错: {str(e)}")
|
||
|
||
print("\n预报完成.")
|
||
|
||
# 检查是否需要发送预警
|
||
for record in forecast_summary:
|
||
if record['rain_predicted'] and record.get('rain_category', 0) >= 3: # 大雨或暴雨
|
||
print(f"\n⚠️ 预警: 未来{record['time_point']}小时预计有{record['rain_category_name']}!")
|
||
# 这里可以添加发送邮件、短信或其他通知的代码
|
||
|
||
return forecast_summary
|
||
|
||
|
||
# 主程序入口
|
||
if __name__ == "__main__":
|
||
parser = argparse.ArgumentParser(description="未来降雨预报系统 - 训练模型和实时预测")
|
||
parser.add_argument('--mode', type=str, default='train', choices=['train', 'predict', 'realtime'],
|
||
help='执行模式: train-训练模型, predict-单次预测, realtime-实时预测')
|
||
parser.add_argument('--data', type=str, default='rainfall_data_time.csv',
|
||
help='数据文件路径')
|
||
parser.add_argument('--model_dir', type=str, default='rainfall_forecast_models',
|
||
help='模型保存/加载目录')
|
||
parser.add_argument('--n_features', type=int, default=35,
|
||
help='使用的特征数量')
|
||
parser.add_argument('--clf_model', type=str, default='ensemble',
|
||
choices=['lr', 'rf', 'gbdt', 'xgboost', 'ensemble'],
|
||
help='分类器类型')
|
||
parser.add_argument('--reg_model', type=str, default='gbdt',
|
||
choices=['rf', 'gbdt', 'ridge', 'xgboost'],
|
||
help='回归器类型')
|
||
parser.add_argument('--time_based', action='store_true',
|
||
help='是否使用基于时间的数据分割(默认使用随机分割)')
|
||
parser.add_argument('--debug', action='store_true',
|
||
help='启用调试模式,输出更详细的信息')
|
||
|
||
args = parser.parse_args()
|
||
|
||
# 创建输出目录
|
||
os.makedirs('realtime_forecast_results', exist_ok=True)
|
||
|
||
try:
|
||
if args.mode == 'train':
|
||
# 训练模式
|
||
print(f"加载训练数据: {args.data}")
|
||
try:
|
||
# 读取历史数据
|
||
historical_data = pd.read_csv(args.data)
|
||
|
||
# 确保时间列是日期时间格式
|
||
if 'datetime' in historical_data.columns:
|
||
historical_data['datetime'] = pd.to_datetime(historical_data['datetime'])
|
||
|
||
# 按时间排序数据
|
||
historical_data = historical_data.sort_values(by='datetime')
|
||
|
||
print(f"数据时间范围: {historical_data['datetime'].min()} 到 {historical_data['datetime'].max()}")
|
||
else:
|
||
print("警告: 数据中没有找到时间列,假设数据已按时间排序")
|
||
|
||
print(f"总样本数: {len(historical_data)}")
|
||
|
||
# 创建未来降雨预报系统
|
||
print("\n创建未来降雨预报系统...")
|
||
forecast_system = FutureRainfallForecastSystem(debug=args.debug)
|
||
|
||
# 配置特征
|
||
features_config = {
|
||
'n_features': args.n_features,
|
||
'clf_model_type': args.clf_model,
|
||
'reg_model_type': args.reg_model,
|
||
'time_based_split': args.time_based
|
||
}
|
||
|
||
# 训练未来各时间点的预报模型
|
||
print("\n开始训练未来降雨预报模型...")
|
||
forecast_system.train_forecast_models(historical_data, features_config)
|
||
|
||
# 保存训练好的模型
|
||
print("\n保存训练好的模型...")
|
||
forecast_system.save_models(args.model_dir)
|
||
|
||
print("\n未来降雨预报系统训练完成!")
|
||
|
||
except FileNotFoundError:
|
||
print(f"错误: 找不到训练数据文件 '{args.data}'")
|
||
print("请确保该文件存在于当前目录中")
|
||
|
||
except Exception as e:
|
||
print(f"训练过程中出错: {str(e)}")
|
||
traceback.print_exc()
|
||
|
||
elif args.mode == 'predict':
|
||
# 单次预测模式
|
||
print(f"加载当前气象数据: {args.data}")
|
||
try:
|
||
# 读取当前气象数据
|
||
current_data = pd.read_csv(args.data)
|
||
|
||
# 加载预测模型
|
||
print("\n加载未来降雨预报模型...")
|
||
forecast_system = FutureRainfallForecastSystem(model_path=args.model_dir, debug=args.debug)
|
||
|
||
# 进行预测
|
||
print("\n开始预测未来降雨情况...")
|
||
forecast_results = forecast_system.forecast(current_data)
|
||
|
||
# 可视化预测结果
|
||
print("\n生成预测结果可视化...")
|
||
forecast_system.visualize_forecast(forecast_results)
|
||
|
||
print("\n预测完成.")
|
||
|
||
except FileNotFoundError:
|
||
print(f"错误: 找不到当前气象数据文件 '{args.data}'")
|
||
|
||
except Exception as e:
|
||
print(f"预测过程中出错: {str(e)}")
|
||
traceback.print_exc()
|
||
|
||
elif args.mode == 'realtime':
|
||
# 实时预测模式
|
||
try:
|
||
run_realtime_rainfall_forecast()
|
||
except Exception as e:
|
||
print(f"实时预测过程中出错: {str(e)}")
|
||
traceback.print_exc()
|
||
|
||
# 如果需要周期性运行
|
||
"""
|
||
while True:
|
||
try:
|
||
run_realtime_rainfall_forecast()
|
||
print("\n等待下次预报...")
|
||
# 等待1小时
|
||
time.sleep(3600)
|
||
except KeyboardInterrupt:
|
||
print("\n用户中断,系统退出")
|
||
break
|
||
except Exception as e:
|
||
print(f"\n运行时错误: {str(e)}")
|
||
traceback.print_exc()
|
||
# 出错后等待10分钟再重试
|
||
time.sleep(600)
|
||
"""
|
||
|
||
except KeyboardInterrupt:
|
||
print("\n用户中断,系统退出")
|
||
except Exception as e:
|
||
print(f"\n系统错误: {str(e)}")
|
||
traceback.print_exc()
|