package forecast import ( "context" "database/sql" "encoding/json" "fmt" "log" "net/http" "net/url" "time" "weatherstation/internal/database" ) type openMeteoResponse struct { Hourly struct { Time []string `json:"time"` Rain []float64 `json:"rain"` Temperature []float64 `json:"temperature_2m"` Humidity []float64 `json:"relative_humidity_2m"` WindSpeed []float64 `json:"wind_speed_10m"` WindGusts []float64 `json:"wind_gusts_10m"` WindDir []float64 `json:"wind_direction_10m"` PrecipProb []float64 `json:"precipitation_probability"` SurfacePres []float64 `json:"surface_pressure"` } `json:"hourly"` } // RunOpenMeteoFetch 拉取各站点未来三小时并写入 forecast_hourly func RunOpenMeteoFetch(ctx context.Context) error { db := database.GetDB() stations, err := loadStationsWithLatLon(ctx, db) if err != nil { return err } client := &http.Client{Timeout: 15 * time.Second} loc, _ := time.LoadLocation("Asia/Shanghai") if loc == nil { loc = time.FixedZone("CST", 8*3600) } issuedAt := time.Now().In(loc) startHour := issuedAt.Truncate(time.Hour) targets := []time.Time{startHour.Add(1 * time.Hour), startHour.Add(2 * time.Hour), startHour.Add(3 * time.Hour)} for _, s := range stations { apiURL := buildOpenMeteoURL(s.lat, s.lon) req, _ := http.NewRequestWithContext(ctx, http.MethodGet, apiURL, nil) resp, err := client.Do(req) if err != nil { log.Printf("open-meteo 请求失败 station=%s err=%v", s.id, err) continue } var data openMeteoResponse if err := json.NewDecoder(resp.Body).Decode(&data); err != nil { resp.Body.Close() log.Printf("open-meteo 解码失败 station=%s err=%v", s.id, err) continue } resp.Body.Close() // 建立 time->vals 映射(按CST解析) table := map[time.Time]struct { rain float64 temp float64 rh float64 ws float64 gust float64 wdir float64 prob float64 pres float64 }{} for i := range data.Hourly.Time { t, err := time.ParseInLocation("2006-01-02T15:04", data.Hourly.Time[i], loc) if err != nil { continue } v := table[t] if i < len(data.Hourly.Rain) { v.rain = data.Hourly.Rain[i] } if i < len(data.Hourly.Temperature) { v.temp = data.Hourly.Temperature[i] } if i < len(data.Hourly.Humidity) { v.rh = data.Hourly.Humidity[i] } if i < len(data.Hourly.WindSpeed) { // 将 km/h 转换为 m/s: m/s = km/h ÷ 3.6 v.ws = data.Hourly.WindSpeed[i] / 3.6 } if i < len(data.Hourly.WindGusts) { // 将 km/h 转换为 m/s: m/s = km/h ÷ 3.6 v.gust = data.Hourly.WindGusts[i] / 3.6 } if i < len(data.Hourly.WindDir) { v.wdir = data.Hourly.WindDir[i] } if i < len(data.Hourly.PrecipProb) { v.prob = data.Hourly.PrecipProb[i] } if i < len(data.Hourly.SurfacePres) { v.pres = data.Hourly.SurfacePres[i] } table[t] = v } for _, ft := range targets { if v, ok := table[ft]; ok { if err := upsertForecast(ctx, db, s.id, issuedAt, ft, int64(v.rain*1000.0), int64(v.temp*100.0), int64(v.rh), int64(v.ws*1000.0), int64(v.gust*1000.0), int64(v.wdir), int64(v.prob), int64(v.pres*100.0), ); err != nil { log.Printf("写入forecast失败 station=%s time=%s err=%v", s.id, ft.Format(time.RFC3339), err) } } } } return nil } type station struct { id string lat sql.NullFloat64 lon sql.NullFloat64 } func loadStationsWithLatLon(ctx context.Context, db *sql.DB) ([]station, error) { rows, err := db.QueryContext(ctx, `SELECT station_id, latitude, longitude FROM stations WHERE latitude IS NOT NULL AND longitude IS NOT NULL`) if err != nil { return nil, err } defer rows.Close() var list []station for rows.Next() { var s station if err := rows.Scan(&s.id, &s.lat, &s.lon); err != nil { continue } list = append(list, s) } return list, nil } func buildOpenMeteoURL(lat, lon sql.NullFloat64) string { q := url.Values{} q.Set("latitude", fmt.Sprintf("%f", lat.Float64)) q.Set("longitude", fmt.Sprintf("%f", lon.Float64)) q.Set("hourly", "rain,temperature_2m,relative_humidity_2m,wind_speed_10m,wind_gusts_10m,wind_direction_10m,precipitation_probability,surface_pressure") q.Set("timezone", "Asia/Shanghai") // 可以添加单位参数,但我们已经在代码中处理了单位转换,所以保持默认单位即可 // 默认单位:风速 km/h,温度 °C,降水量 mm,气压 hPa return "https://api.open-meteo.com/v1/forecast?" + q.Encode() } func upsertForecast(ctx context.Context, db *sql.DB, stationID string, issuedAt, forecastTime time.Time, rainMmX1000, tempCx100, humidityPct, wsMsX1000, gustMsX1000, wdirDeg, probPct, pressureHpaX100 int64, ) error { _, err := db.ExecContext(ctx, ` INSERT INTO forecast_hourly ( station_id, provider, issued_at, forecast_time, rain_mm_x1000, temp_c_x100, humidity_pct, wind_speed_ms_x1000, wind_gust_ms_x1000, wind_dir_deg, precip_prob_pct, pressure_hpa_x100 ) VALUES ($1, 'open-meteo', $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) ON CONFLICT (station_id, provider, issued_at, forecast_time) DO UPDATE SET rain_mm_x1000 = EXCLUDED.rain_mm_x1000, temp_c_x100 = EXCLUDED.temp_c_x100, humidity_pct = EXCLUDED.humidity_pct, wind_speed_ms_x1000 = EXCLUDED.wind_speed_ms_x1000, wind_gust_ms_x1000 = EXCLUDED.wind_gust_ms_x1000, wind_dir_deg = EXCLUDED.wind_dir_deg, precip_prob_pct = EXCLUDED.precip_prob_pct, pressure_hpa_x100 = EXCLUDED.pressure_hpa_x100 `, stationID, issuedAt, forecastTime, rainMmX1000, tempCx100, humidityPct, wsMsX1000, gustMsX1000, wdirDeg, probPct, pressureHpaX100) return err } // 新增:支持自定义provider的upsert func upsertForecastWithProvider(ctx context.Context, db *sql.DB, stationID, provider string, issuedAt, forecastTime time.Time, rainMmX1000, tempCx100, humidityPct, wsMsX1000, gustMsX1000, wdirDeg, probPct, pressureHpaX100 int64, ) error { // 调试日志 if provider == "open-meteo_historical" { log.Printf("写入历史数据: station=%s, time=%s, temp=%.2f, humidity=%d", stationID, forecastTime.Format("2006-01-02 15:04:05"), float64(tempCx100)/100.0, humidityPct) } _, err := db.ExecContext(ctx, ` INSERT INTO forecast_hourly ( station_id, provider, issued_at, forecast_time, rain_mm_x1000, temp_c_x100, humidity_pct, wind_speed_ms_x1000, wind_gust_ms_x1000, wind_dir_deg, precip_prob_pct, pressure_hpa_x100 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) ON CONFLICT (station_id, provider, issued_at, forecast_time) DO UPDATE SET rain_mm_x1000 = EXCLUDED.rain_mm_x1000, temp_c_x100 = EXCLUDED.temp_c_x100, humidity_pct = EXCLUDED.humidity_pct, wind_speed_ms_x1000 = EXCLUDED.wind_speed_ms_x1000, wind_gust_ms_x1000 = EXCLUDED.wind_gust_ms_x1000, wind_dir_deg = EXCLUDED.wind_dir_deg, precip_prob_pct = EXCLUDED.precip_prob_pct, pressure_hpa_x100 = EXCLUDED.pressure_hpa_x100 `, stationID, provider, issuedAt, forecastTime, rainMmX1000, tempCx100, humidityPct, wsMsX1000, gustMsX1000, wdirDeg, probPct, pressureHpaX100) return err } // RunOpenMeteoHistoricalFetch 拉取指定时间段的历史数据并写入 forecast_hourly(provider=open-meteo_historical) func RunOpenMeteoHistoricalFetch(ctx context.Context, startDate, endDate string) error { db := database.GetDB() stations, err := loadStationsWithLatLon(ctx, db) if err != nil { return fmt.Errorf("加载站点失败: %v", err) } client := &http.Client{Timeout: 30 * time.Second} loc, _ := time.LoadLocation("Asia/Shanghai") if loc == nil { loc = time.FixedZone("CST", 8*3600) } log.Printf("开始补完历史数据: %s 到 %s,共 %d 个站点", startDate, endDate, len(stations)) for i, s := range stations { log.Printf("处理站点 %d/%d: %s", i+1, len(stations), s.id) apiURL := buildOpenMeteoHistoricalURL(s.lat, s.lon, startDate, endDate) req, _ := http.NewRequestWithContext(ctx, http.MethodGet, apiURL, nil) resp, err := client.Do(req) if err != nil { log.Printf("历史数据请求失败 station=%s err=%v", s.id, err) continue } var data openMeteoResponse if err := json.NewDecoder(resp.Body).Decode(&data); err != nil { resp.Body.Close() log.Printf("历史数据解码失败 station=%s err=%v", s.id, err) continue } resp.Body.Close() // 处理并写入forecast_hourly(历史) count := 0 issuedAt := time.Now().In(loc) for i := range data.Hourly.Time { // 解析时间(使用CST时区) t, err := time.ParseInLocation("2006-01-02T15:04", data.Hourly.Time[i], loc) if err != nil { log.Printf("解析时间失败: %s, err=%v", data.Hourly.Time[i], err) continue } // 收集并转换(与forecast_hourly缩放一致) rainMmX1000 := int64(0) if i < len(data.Hourly.Rain) { rainMmX1000 = int64(data.Hourly.Rain[i] * 1000.0) } tempCx100 := int64(0) if i < len(data.Hourly.Temperature) { tempCx100 = int64(data.Hourly.Temperature[i] * 100.0) } humidityPct := int64(0) if i < len(data.Hourly.Humidity) { humidityPct = int64(data.Hourly.Humidity[i]) } wsMsX1000 := int64(0) if i < len(data.Hourly.WindSpeed) { wsMsX1000 = int64((data.Hourly.WindSpeed[i] / 3.6) * 1000.0) } gustMsX1000 := int64(0) // ERA5此接口未提供阵风,置0 wdirDeg := int64(0) if i < len(data.Hourly.WindDir) { wdirDeg = int64(data.Hourly.WindDir[i]) } probPct := int64(0) // 历史无降水概率,置0 pressureHpaX100 := int64(0) if i < len(data.Hourly.SurfacePres) { pressureHpaX100 = int64(data.Hourly.SurfacePres[i] * 100.0) } if err := upsertForecastWithProvider( ctx, db, s.id, "open-meteo_historical", issuedAt, t, rainMmX1000, tempCx100, humidityPct, wsMsX1000, gustMsX1000, wdirDeg, probPct, pressureHpaX100, ); err != nil { log.Printf("写入历史forecast失败 station=%s time=%s err=%v", s.id, t.Format(time.RFC3339), err) } else { count++ } } log.Printf("站点 %s 成功写入 %d 条历史forecast记录", s.id, count) // 防止请求过频 time.Sleep(100 * time.Millisecond) } return nil } func buildOpenMeteoHistoricalURL(lat, lon sql.NullFloat64, startDate, endDate string) string { q := url.Values{} q.Set("latitude", fmt.Sprintf("%f", lat.Float64)) q.Set("longitude", fmt.Sprintf("%f", lon.Float64)) q.Set("start_date", startDate) q.Set("end_date", endDate) q.Set("hourly", "temperature_2m,relative_humidity_2m,surface_pressure,wind_speed_10m,wind_direction_10m,rain") q.Set("timezone", "Asia/Shanghai") return "https://archive-api.open-meteo.com/v1/era5?" + q.Encode() } func insertHistoricalData(ctx context.Context, db *sql.DB, stationID string, timestamp time.Time, temp, humidity, pressure, windSpeed, windDir, rainfall *float64) error { _, err := db.ExecContext(ctx, ` INSERT INTO rs485_weather_data ( station_id, timestamp, temperature, humidity, pressure, wind_speed, wind_direction, rainfall, raw_data ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (station_id, timestamp) DO UPDATE SET temperature = EXCLUDED.temperature, humidity = EXCLUDED.humidity, pressure = EXCLUDED.pressure, wind_speed = EXCLUDED.wind_speed, wind_direction = EXCLUDED.wind_direction, rainfall = EXCLUDED.rainfall, raw_data = EXCLUDED.raw_data `, stationID, timestamp, temp, humidity, pressure, windSpeed, windDir, rainfall, fmt.Sprintf("open-meteo-historical:%s", timestamp.Format(time.RFC3339))) return err }