diff --git a/cmd/weatherstation/main.go b/cmd/weatherstation/main.go index 6801bc7..4430f65 100644 --- a/cmd/weatherstation/main.go +++ b/cmd/weatherstation/main.go @@ -7,6 +7,7 @@ import ( "sync" "time" "weatherstation/internal/database" + "weatherstation/internal/forecast" "weatherstation/internal/selftest" "weatherstation/internal/server" "weatherstation/internal/tools" @@ -25,6 +26,8 @@ func main() { // 自检控制 var noSelftest = flag.Bool("no-selftest", false, "跳过启动自检") var selftestOnly = flag.Bool("selftest_only", false, "仅执行自检后退出") + // 预报抓取 + var forecastOnly = flag.Bool("forecast_only", false, "仅执行一次open-meteo拉取并退出") flag.Parse() // 设置日志 @@ -45,6 +48,15 @@ func main() { } } + // 单次 open-meteo 拉取 + if *forecastOnly { + if err := forecast.RunOpenMeteoFetch(context.Background()); err != nil { + log.Fatalf("open-meteo 拉取失败: %v", err) + } + log.Println("open-meteo 拉取完成") + return + } + // Backfill 调试路径 if *doBackfill { if *bfFrom == "" || *bfTo == "" { diff --git a/internal/forecast/open_meteo.go b/internal/forecast/open_meteo.go new file mode 100644 index 0000000..0a9547d --- /dev/null +++ b/internal/forecast/open_meteo.go @@ -0,0 +1,182 @@ +package forecast + +import ( + "context" + "database/sql" + "encoding/json" + "fmt" + "log" + "net/http" + "net/url" + "time" + + "weatherstation/internal/database" +) + +type openMeteoResponse struct { + Hourly struct { + Time []string `json:"time"` + Rain []float64 `json:"rain"` + Temperature []float64 `json:"temperature_2m"` + Humidity []float64 `json:"relative_humidity_2m"` + WindSpeed []float64 `json:"wind_speed_10m"` + WindGusts []float64 `json:"wind_gusts_10m"` + WindDir []float64 `json:"wind_direction_10m"` + PrecipProb []float64 `json:"precipitation_probability"` + SurfacePres []float64 `json:"surface_pressure"` + } `json:"hourly"` +} + +// RunOpenMeteoFetch 拉取各站点未来三小时并写入 forecast_hourly +func RunOpenMeteoFetch(ctx context.Context) error { + db := database.GetDB() + stations, err := loadStationsWithLatLon(ctx, db) + if err != nil { + return err + } + + client := &http.Client{Timeout: 15 * time.Second} + loc, _ := time.LoadLocation("Asia/Shanghai") + if loc == nil { + loc = time.FixedZone("CST", 8*3600) + } + + issuedAt := time.Now().In(loc) + startHour := issuedAt.Truncate(time.Hour) + targets := []time.Time{startHour.Add(1 * time.Hour), startHour.Add(2 * time.Hour), startHour.Add(3 * time.Hour)} + + for _, s := range stations { + apiURL := buildOpenMeteoURL(s.lat, s.lon) + req, _ := http.NewRequestWithContext(ctx, http.MethodGet, apiURL, nil) + resp, err := client.Do(req) + if err != nil { + log.Printf("open-meteo 请求失败 station=%s err=%v", s.id, err) + continue + } + var data openMeteoResponse + if err := json.NewDecoder(resp.Body).Decode(&data); err != nil { + resp.Body.Close() + log.Printf("open-meteo 解码失败 station=%s err=%v", s.id, err) + continue + } + resp.Body.Close() + + // 建立 time->vals 映射(按CST解析) + table := map[time.Time]struct { + rain float64 + temp float64 + rh float64 + ws float64 + gust float64 + wdir float64 + prob float64 + pres float64 + }{} + for i := range data.Hourly.Time { + t, err := time.ParseInLocation("2006-01-02T15:04", data.Hourly.Time[i], loc) + if err != nil { + continue + } + v := table[t] + if i < len(data.Hourly.Rain) { + v.rain = data.Hourly.Rain[i] + } + if i < len(data.Hourly.Temperature) { + v.temp = data.Hourly.Temperature[i] + } + if i < len(data.Hourly.Humidity) { + v.rh = data.Hourly.Humidity[i] + } + if i < len(data.Hourly.WindSpeed) { + v.ws = data.Hourly.WindSpeed[i] + } + if i < len(data.Hourly.WindGusts) { + v.gust = data.Hourly.WindGusts[i] + } + if i < len(data.Hourly.WindDir) { + v.wdir = data.Hourly.WindDir[i] + } + if i < len(data.Hourly.PrecipProb) { + v.prob = data.Hourly.PrecipProb[i] + } + if i < len(data.Hourly.SurfacePres) { + v.pres = data.Hourly.SurfacePres[i] + } + table[t] = v + } + + for _, ft := range targets { + if v, ok := table[ft]; ok { + if err := upsertForecast(ctx, db, s.id, issuedAt, ft, + int64(v.rain*1000.0), + int64(v.temp*100.0), + int64(v.rh), + int64(v.ws*1000.0), + int64(v.gust*1000.0), + int64(v.wdir), + int64(v.prob), + int64(v.pres*100.0), + ); err != nil { + log.Printf("写入forecast失败 station=%s time=%s err=%v", s.id, ft.Format(time.RFC3339), err) + } + } + } + } + return nil +} + +type station struct { + id string + lat sql.NullFloat64 + lon sql.NullFloat64 +} + +func loadStationsWithLatLon(ctx context.Context, db *sql.DB) ([]station, error) { + rows, err := db.QueryContext(ctx, `SELECT station_id, latitude, longitude FROM stations WHERE latitude IS NOT NULL AND longitude IS NOT NULL`) + if err != nil { + return nil, err + } + defer rows.Close() + var list []station + for rows.Next() { + var s station + if err := rows.Scan(&s.id, &s.lat, &s.lon); err != nil { + continue + } + list = append(list, s) + } + return list, nil +} + +func buildOpenMeteoURL(lat, lon sql.NullFloat64) string { + q := url.Values{} + q.Set("latitude", fmt.Sprintf("%f", lat.Float64)) + q.Set("longitude", fmt.Sprintf("%f", lon.Float64)) + q.Set("hourly", "rain,temperature_2m,relative_humidity_2m,wind_speed_10m,wind_gusts_10m,wind_direction_10m,precipitation_probability,surface_pressure") + q.Set("timezone", "Asia/Shanghai") + return "https://api.open-meteo.com/v1/forecast?" + q.Encode() +} + +func upsertForecast(ctx context.Context, db *sql.DB, stationID string, issuedAt, forecastTime time.Time, + rainMmX1000, tempCx100, humidityPct, wsMsX1000, gustMsX1000, wdirDeg, probPct, pressureHpaX100 int64, +) error { + _, err := db.ExecContext(ctx, ` + INSERT INTO forecast_hourly ( + station_id, provider, issued_at, forecast_time, + rain_mm_x1000, temp_c_x100, humidity_pct, wind_speed_ms_x1000, + wind_gust_ms_x1000, wind_dir_deg, precip_prob_pct, pressure_hpa_x100 + ) VALUES ($1, 'open-meteo', $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) + ON CONFLICT (station_id, provider, issued_at, forecast_time) + DO UPDATE SET + rain_mm_x1000 = EXCLUDED.rain_mm_x1000, + temp_c_x100 = EXCLUDED.temp_c_x100, + humidity_pct = EXCLUDED.humidity_pct, + wind_speed_ms_x1000 = EXCLUDED.wind_speed_ms_x1000, + wind_gust_ms_x1000 = EXCLUDED.wind_gust_ms_x1000, + wind_dir_deg = EXCLUDED.wind_dir_deg, + precip_prob_pct = EXCLUDED.precip_prob_pct, + pressure_hpa_x100 = EXCLUDED.pressure_hpa_x100 + `, stationID, issuedAt, forecastTime, + rainMmX1000, tempCx100, humidityPct, wsMsX1000, gustMsX1000, wdirDeg, probPct, pressureHpaX100) + return err +} diff --git a/internal/server/udp.go b/internal/server/udp.go index d0a50b4..6c26398 100644 --- a/internal/server/udp.go +++ b/internal/server/udp.go @@ -15,6 +15,7 @@ import ( "time" "unicode/utf8" "weatherstation/internal/config" + "weatherstation/internal/forecast" "weatherstation/internal/tools" "weatherstation/model" ) @@ -144,6 +145,20 @@ func StartUDPServer() error { } }() + // 后台定时:每小时拉取open-meteo(全站) + go func() { + for { + now := time.Now() + next := now.Truncate(time.Hour).Add(time.Hour) + time.Sleep(time.Until(next)) + if err := forecast.RunOpenMeteoFetch(context.Background()); err != nil { + log.Printf("open-meteo 定时拉取失败: %v", err) + } else { + log.Printf("open-meteo 定时拉取完成") + } + } + }() + for { n, addr, err := conn.ReadFrom(buffer) if err != nil {