342 lines
11 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package forecast
import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
	"log"
	"math"
	"net/http"
	"net/url"
	"time"

	"weatherstation/internal/database"
)
// openMeteoResponse mirrors the part of an Open-Meteo JSON reply that this
// package decodes. Only the "hourly" object is read; any other key in the
// payload is ignored by the decoder.
//
// The slices under Hourly are parallel series: the code in this file reads
// element i of each value slice as belonging to the timestamp at Time[i], and
// bounds-checks every value slice separately because a request may omit some
// variables.
type openMeteoResponse struct {
	Hourly struct {
		// Time holds the hour labels as strings; this file parses them with
		// the layout "2006-01-02T15:04".
		Time []string `json:"time"`

		// Value series, one entry per element of Time. Wind speeds are
		// divided by 3.6 (km/h to m/s) by the code that consumes them.
		Rain        []float64 `json:"rain"`
		Temperature []float64 `json:"temperature_2m"`
		Humidity    []float64 `json:"relative_humidity_2m"`
		WindSpeed   []float64 `json:"wind_speed_10m"`
		WindGusts   []float64 `json:"wind_gusts_10m"`
		WindDir     []float64 `json:"wind_direction_10m"`
		PrecipProb  []float64 `json:"precipitation_probability"`
		SurfacePres []float64 `json:"surface_pressure"`
	} `json:"hourly"`
}
// RunOpenMeteoFetch fetches the Open-Meteo hourly forecast for every station
// that has coordinates and upserts the next three whole hours (relative to the
// current Asia/Shanghai time) into forecast_hourly.
//
// Failures that affect a single station (building or sending the request, a
// non-200 reply, a decode error, a failed write) are logged and that station
// or hour is skipped. An error is returned only when the station list cannot
// be loaded or when ctx is done before every station has been processed.
func RunOpenMeteoFetch(ctx context.Context) error {
	db := database.GetDB()
	stations, err := loadStationsWithLatLon(ctx, db)
	if err != nil {
		return err
	}

	client := &http.Client{Timeout: 15 * time.Second}

	// Fall back to a fixed UTC+8 zone when the named zone cannot be loaded.
	loc, err := time.LoadLocation("Asia/Shanghai")
	if err != nil || loc == nil {
		loc = time.FixedZone("CST", 8*3600)
	}

	issuedAt := time.Now().In(loc)
	startHour := issuedAt.Truncate(time.Hour)
	targets := []time.Time{
		startHour.Add(1 * time.Hour),
		startHour.Add(2 * time.Hour),
		startHour.Add(3 * time.Hour),
	}

	// valueAt reads series[i], yielding 0 when the series is shorter than the
	// time axis (for example when a variable is absent from the reply).
	valueAt := func(series []float64, i int) float64 {
		if i < len(series) {
			return series[i]
		}
		return 0
	}
	// scaled converts a reading to the fixed-point integer stored in the
	// table. It rounds to nearest: a plain int64() conversion truncates, so
	// float noise such as 0.29*100 = 28.999999999999996 would be stored as 28.
	scaled := func(v, factor float64) int64 {
		return int64(math.Round(v * factor))
	}

	for _, s := range stations {
		// Stop promptly instead of issuing a doomed request per station.
		if err := ctx.Err(); err != nil {
			return err
		}

		apiURL := buildOpenMeteoURL(s.lat, s.lon)
		req, err := http.NewRequestWithContext(ctx, http.MethodGet, apiURL, nil)
		if err != nil {
			log.Printf("open-meteo 构建请求失败 station=%s err=%v", s.id, err)
			continue
		}
		resp, err := client.Do(req)
		if err != nil {
			log.Printf("open-meteo 请求失败 station=%s err=%v", s.id, err)
			continue
		}
		// A non-200 body is not forecast data; decoding it would silently
		// produce an empty response.
		if resp.StatusCode != http.StatusOK {
			resp.Body.Close()
			log.Printf("open-meteo 响应状态异常 station=%s status=%s", s.id, resp.Status)
			continue
		}
		var data openMeteoResponse
		err = json.NewDecoder(resp.Body).Decode(&data)
		resp.Body.Close()
		if err != nil {
			log.Printf("open-meteo 解码失败 station=%s err=%v", s.id, err)
			continue
		}

		// Map each hour label (parsed in loc) to its position in the series.
		// Keys are Unix seconds rather than time.Time values so the lookup
		// does not depend on the internal representation of time.Time.
		hourly := &data.Hourly
		hourIndex := make(map[int64]int, len(hourly.Time))
		for i, label := range hourly.Time {
			t, err := time.ParseInLocation("2006-01-02T15:04", label, loc)
			if err != nil {
				continue
			}
			hourIndex[t.Unix()] = i
		}

		for _, ft := range targets {
			i, ok := hourIndex[ft.Unix()]
			if !ok {
				continue
			}
			if err := upsertForecast(ctx, db, s.id, issuedAt, ft,
				scaled(valueAt(hourly.Rain, i), 1000),
				scaled(valueAt(hourly.Temperature, i), 100),
				scaled(valueAt(hourly.Humidity, i), 1),
				// Wind values are converted from km/h to m/s (divide by 3.6).
				scaled(valueAt(hourly.WindSpeed, i)/3.6, 1000),
				scaled(valueAt(hourly.WindGusts, i)/3.6, 1000),
				scaled(valueAt(hourly.WindDir, i), 1),
				scaled(valueAt(hourly.PrecipProb, i), 1),
				scaled(valueAt(hourly.SurfacePres, i), 100),
			); err != nil {
				log.Printf("写入forecast失败 station=%s time=%s err=%v", s.id, ft.Format(time.RFC3339), err)
			}
		}
	}
	return nil
}
// station is one row read from the stations table by loadStationsWithLatLon:
// the station_id column plus its latitude and longitude.
type station struct {
	id       string          // station_id column
	lat, lon sql.NullFloat64 // latitude and longitude columns, scanned as nullable floats
}
// loadStationsWithLatLon returns every row of the stations table whose
// latitude and longitude are both non-NULL.
//
// A row that fails to scan is logged and skipped so the remaining stations are
// still returned. An error from running the query, or one that interrupts row
// iteration (reported by rows.Err), is returned together with a nil slice;
// without that check a broken result set would look like a short, valid list.
func loadStationsWithLatLon(ctx context.Context, db *sql.DB) ([]station, error) {
	rows, err := db.QueryContext(ctx, `SELECT station_id, latitude, longitude FROM stations WHERE latitude IS NOT NULL AND longitude IS NOT NULL`)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var list []station
	for rows.Next() {
		var s station
		if err := rows.Scan(&s.id, &s.lat, &s.lon); err != nil {
			// Best effort: one malformed row must not hide the others.
			log.Printf("站点记录解析失败 err=%v", err)
			continue
		}
		list = append(list, s)
	}
	// rows.Next returns false both at the end of the data and on failure;
	// rows.Err tells the two apart.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return list, nil
}
// buildOpenMeteoURL returns the Open-Meteo forecast request URL for one
// coordinate pair. The Float64 fields are formatted with six decimals; the
// Valid flags are not consulted. The request asks for the eight hourly
// variables listed below, reported in the Asia/Shanghai timezone.
func buildOpenMeteoURL(lat, lon sql.NullFloat64) string {
	// No unit parameters are sent: unit conversion is already handled in code,
	// so the API defaults are kept (wind speed km/h, temperature °C,
	// precipitation mm, pressure hPa).
	params := url.Values{
		"latitude":  {fmt.Sprintf("%f", lat.Float64)},
		"longitude": {fmt.Sprintf("%f", lon.Float64)},
		"hourly":    {"rain,temperature_2m,relative_humidity_2m,wind_speed_10m,wind_gusts_10m,wind_direction_10m,precipitation_probability,surface_pressure"},
		"timezone":  {"Asia/Shanghai"},
	}
	endpoint := url.URL{
		Scheme:   "https",
		Host:     "api.open-meteo.com",
		Path:     "/v1/forecast",
		RawQuery: params.Encode(),
	}
	return endpoint.String()
}
// upsertForecast writes one hourly forecast row under the fixed provider name
// "open-meteo". It delegates to upsertForecastWithProvider so that both entry
// points share a single SQL statement instead of two copies that can drift.
//
// The integer arguments are the already-scaled column values, named after the
// columns they fill (for example tempCx100 goes to temp_c_x100). The returned
// error is whatever the database call reports.
func upsertForecast(ctx context.Context, db *sql.DB, stationID string, issuedAt, forecastTime time.Time,
	rainMmX1000, tempCx100, humidityPct, wsMsX1000, gustMsX1000, wdirDeg, probPct, pressureHpaX100 int64,
) error {
	return upsertForecastWithProvider(ctx, db, stationID, "open-meteo", issuedAt, forecastTime,
		rainMmX1000, tempCx100, humidityPct, wsMsX1000, gustMsX1000, wdirDeg, probPct, pressureHpaX100)
}

// upsertForecastWithProvider inserts one row into forecast_hourly for the given
// provider. When a row with the same (station_id, provider, issued_at,
// forecast_time) already exists, its eight measurement columns are overwritten
// with the new values.
//
// Rows written for the provider "open-meteo_historical" are additionally
// traced to the log (station, time, temperature, humidity) for debugging.
// The returned error is whatever the database call reports.
func upsertForecastWithProvider(ctx context.Context, db *sql.DB, stationID, provider string, issuedAt, forecastTime time.Time,
	rainMmX1000, tempCx100, humidityPct, wsMsX1000, gustMsX1000, wdirDeg, probPct, pressureHpaX100 int64,
) error {
	// Debug trace for historical backfill writes only.
	if provider == "open-meteo_historical" {
		log.Printf("写入历史数据: station=%s, time=%s, temp=%.2f, humidity=%d",
			stationID, forecastTime.Format("2006-01-02 15:04:05"), float64(tempCx100)/100.0, humidityPct)
	}
	_, err := db.ExecContext(ctx, `
INSERT INTO forecast_hourly (
station_id, provider, issued_at, forecast_time,
rain_mm_x1000, temp_c_x100, humidity_pct, wind_speed_ms_x1000,
wind_gust_ms_x1000, wind_dir_deg, precip_prob_pct, pressure_hpa_x100
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
ON CONFLICT (station_id, provider, issued_at, forecast_time)
DO UPDATE SET
rain_mm_x1000 = EXCLUDED.rain_mm_x1000,
temp_c_x100 = EXCLUDED.temp_c_x100,
humidity_pct = EXCLUDED.humidity_pct,
wind_speed_ms_x1000 = EXCLUDED.wind_speed_ms_x1000,
wind_gust_ms_x1000 = EXCLUDED.wind_gust_ms_x1000,
wind_dir_deg = EXCLUDED.wind_dir_deg,
precip_prob_pct = EXCLUDED.precip_prob_pct,
pressure_hpa_x100 = EXCLUDED.pressure_hpa_x100
`, stationID, provider, issuedAt, forecastTime,
		rainMmX1000, tempCx100, humidityPct, wsMsX1000, gustMsX1000, wdirDeg, probPct, pressureHpaX100)
	return err
}
// RunOpenMeteoHistoricalFetch backfills forecast_hourly with archive data for
// every station that has coordinates, storing the rows under the provider name
// "open-meteo_historical". startDate and endDate are passed unchanged to the
// request as its start_date and end_date parameters.
//
// Failures that affect a single station or hour are logged and skipped. An
// error is returned only when the station list cannot be loaded (the cause is
// wrapped and available through errors.Is / errors.As) or when ctx is done
// before every station has been processed.
//
// NOTE(review): issued_at is set to the current time for each station, and it
// is part of the conflict key used by upsertForecastWithProvider, so running
// the same backfill twice looks like it adds new rows instead of updating the
// earlier ones. Confirm that this is intended.
func RunOpenMeteoHistoricalFetch(ctx context.Context, startDate, endDate string) error {
	db := database.GetDB()
	stations, err := loadStationsWithLatLon(ctx, db)
	if err != nil {
		return fmt.Errorf("加载站点失败: %w", err)
	}

	client := &http.Client{Timeout: 30 * time.Second}

	// Fall back to a fixed UTC+8 zone when the named zone cannot be loaded.
	loc, err := time.LoadLocation("Asia/Shanghai")
	if err != nil || loc == nil {
		loc = time.FixedZone("CST", 8*3600)
	}

	log.Printf("开始补完历史数据: %s 到 %s共 %d 个站点", startDate, endDate, len(stations))

	// valueAt reads series[i], yielding 0 when the series is shorter than the
	// time axis.
	valueAt := func(series []float64, i int) float64 {
		if i < len(series) {
			return series[i]
		}
		return 0
	}
	// scaled converts a reading to the fixed-point integer stored in the
	// table, rounding to nearest; a plain int64() conversion truncates and
	// stores values such as 0.29*100 = 28.999999999999996 as 28.
	scaled := func(v, factor float64) int64 {
		return int64(math.Round(v * factor))
	}
	// throttle spaces out requests to avoid hitting the API too often, but
	// wakes immediately when ctx is done.
	throttle := func() {
		select {
		case <-ctx.Done():
		case <-time.After(100 * time.Millisecond):
		}
	}

	for idx, s := range stations {
		// Stop promptly instead of issuing a doomed request per station.
		if err := ctx.Err(); err != nil {
			return err
		}
		log.Printf("处理站点 %d/%d: %s", idx+1, len(stations), s.id)

		apiURL := buildOpenMeteoHistoricalURL(s.lat, s.lon, startDate, endDate)
		req, err := http.NewRequestWithContext(ctx, http.MethodGet, apiURL, nil)
		if err != nil {
			log.Printf("历史数据构建请求失败 station=%s err=%v", s.id, err)
			continue
		}
		resp, err := client.Do(req)
		if err != nil {
			log.Printf("历史数据请求失败 station=%s err=%v", s.id, err)
			continue
		}
		// A non-200 body is not archive data; decoding it would silently
		// produce an empty response.
		if resp.StatusCode != http.StatusOK {
			resp.Body.Close()
			log.Printf("历史数据响应状态异常 station=%s status=%s", s.id, resp.Status)
			throttle()
			continue
		}
		var data openMeteoResponse
		err = json.NewDecoder(resp.Body).Decode(&data)
		resp.Body.Close()
		if err != nil {
			log.Printf("历史数据解码失败 station=%s err=%v", s.id, err)
			continue
		}

		// Convert each hour with the same scaling as the forecast rows and
		// write it to forecast_hourly.
		count := 0
		issuedAt := time.Now().In(loc)
		hourly := &data.Hourly
		for i, label := range hourly.Time {
			// Hour labels are parsed in the Asia/Shanghai location.
			t, err := time.ParseInLocation("2006-01-02T15:04", label, loc)
			if err != nil {
				log.Printf("解析时间失败: %s, err=%v", label, err)
				continue
			}
			err = upsertForecastWithProvider(
				ctx, db, s.id, "open-meteo_historical", issuedAt, t,
				scaled(valueAt(hourly.Rain, i), 1000),
				scaled(valueAt(hourly.Temperature, i), 100),
				scaled(valueAt(hourly.Humidity, i), 1),
				// Wind speed is converted from km/h to m/s (divide by 3.6).
				scaled(valueAt(hourly.WindSpeed, i)/3.6, 1000),
				0, // gusts are not provided by this ERA5 endpoint; store 0
				scaled(valueAt(hourly.WindDir, i), 1),
				0, // historical data has no precipitation probability; store 0
				scaled(valueAt(hourly.SurfacePres, i), 100),
			)
			if err != nil {
				log.Printf("写入历史forecast失败 station=%s time=%s err=%v", s.id, t.Format(time.RFC3339), err)
				continue
			}
			count++
		}
		log.Printf("站点 %s 成功写入 %d 条历史forecast记录", s.id, count)

		throttle()
	}
	return nil
}
// buildOpenMeteoHistoricalURL returns the Open-Meteo archive (ERA5) request URL
// for one coordinate pair and date range. The Float64 fields are formatted
// with six decimals; the Valid flags are not consulted. startDate and endDate
// are sent unchanged as start_date and end_date. Six hourly variables are
// requested, reported in the Asia/Shanghai timezone.
func buildOpenMeteoHistoricalURL(lat, lon sql.NullFloat64, startDate, endDate string) string {
	params := url.Values{
		"latitude":   {fmt.Sprintf("%f", lat.Float64)},
		"longitude":  {fmt.Sprintf("%f", lon.Float64)},
		"start_date": {startDate},
		"end_date":   {endDate},
		"hourly":     {"temperature_2m,relative_humidity_2m,surface_pressure,wind_speed_10m,wind_direction_10m,rain"},
		"timezone":   {"Asia/Shanghai"},
	}
	endpoint := url.URL{
		Scheme:   "https",
		Host:     "archive-api.open-meteo.com",
		Path:     "/v1/era5",
		RawQuery: params.Encode(),
	}
	return endpoint.String()
}
// insertHistoricalData upserts one row into rs485_weather_data. When a row
// with the same (station_id, timestamp) already exists, its measurement
// columns and raw_data are overwritten.
//
// The six measurements are pointers and are handed to database/sql unchanged
// (its default argument conversion stores a nil pointer as NULL). raw_data is
// set to the tag "open-meteo-historical:" followed by the timestamp in RFC
// 3339 form. The returned error is whatever the database call reports.
func insertHistoricalData(ctx context.Context, db *sql.DB, stationID string, timestamp time.Time,
	temp, humidity, pressure, windSpeed, windDir, rainfall *float64) error {
	const upsertSQL = `
INSERT INTO rs485_weather_data (
station_id, timestamp, temperature, humidity, pressure,
wind_speed, wind_direction, rainfall, raw_data
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
ON CONFLICT (station_id, timestamp) DO UPDATE SET
temperature = EXCLUDED.temperature,
humidity = EXCLUDED.humidity,
pressure = EXCLUDED.pressure,
wind_speed = EXCLUDED.wind_speed,
wind_direction = EXCLUDED.wind_direction,
rainfall = EXCLUDED.rainfall,
raw_data = EXCLUDED.raw_data
`
	// Provenance marker stored alongside the measurements.
	rawData := fmt.Sprintf("open-meteo-historical:%s", timestamp.Format(time.RFC3339))

	if _, err := db.ExecContext(ctx, upsertSQL,
		stationID, timestamp,
		temp, humidity, pressure, windSpeed, windDir, rainfall,
		rawData,
	); err != nil {
		return err
	}
	return nil
}