feat: 接入雨量预测
This commit is contained in:
parent
e4b1c19064
commit
753d4dcbc7
@ -7,6 +7,7 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
"weatherstation/internal/database"
|
"weatherstation/internal/database"
|
||||||
|
"weatherstation/internal/forecast"
|
||||||
"weatherstation/internal/selftest"
|
"weatherstation/internal/selftest"
|
||||||
"weatherstation/internal/server"
|
"weatherstation/internal/server"
|
||||||
"weatherstation/internal/tools"
|
"weatherstation/internal/tools"
|
||||||
@ -25,6 +26,8 @@ func main() {
|
|||||||
// 自检控制
|
// 自检控制
|
||||||
var noSelftest = flag.Bool("no-selftest", false, "跳过启动自检")
|
var noSelftest = flag.Bool("no-selftest", false, "跳过启动自检")
|
||||||
var selftestOnly = flag.Bool("selftest_only", false, "仅执行自检后退出")
|
var selftestOnly = flag.Bool("selftest_only", false, "仅执行自检后退出")
|
||||||
|
// 预报抓取
|
||||||
|
var forecastOnly = flag.Bool("forecast_only", false, "仅执行一次open-meteo拉取并退出")
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
|
|
||||||
// 设置日志
|
// 设置日志
|
||||||
@ -45,6 +48,15 @@ func main() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 单次 open-meteo 拉取
|
||||||
|
if *forecastOnly {
|
||||||
|
if err := forecast.RunOpenMeteoFetch(context.Background()); err != nil {
|
||||||
|
log.Fatalf("open-meteo 拉取失败: %v", err)
|
||||||
|
}
|
||||||
|
log.Println("open-meteo 拉取完成")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// Backfill 调试路径
|
// Backfill 调试路径
|
||||||
if *doBackfill {
|
if *doBackfill {
|
||||||
if *bfFrom == "" || *bfTo == "" {
|
if *bfFrom == "" || *bfTo == "" {
|
||||||
|
|||||||
182
internal/forecast/open_meteo.go
Normal file
182
internal/forecast/open_meteo.go
Normal file
@ -0,0 +1,182 @@
|
|||||||
|
package forecast
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"weatherstation/internal/database"
|
||||||
|
)
|
||||||
|
|
||||||
|
type openMeteoResponse struct {
|
||||||
|
Hourly struct {
|
||||||
|
Time []string `json:"time"`
|
||||||
|
Rain []float64 `json:"rain"`
|
||||||
|
Temperature []float64 `json:"temperature_2m"`
|
||||||
|
Humidity []float64 `json:"relative_humidity_2m"`
|
||||||
|
WindSpeed []float64 `json:"wind_speed_10m"`
|
||||||
|
WindGusts []float64 `json:"wind_gusts_10m"`
|
||||||
|
WindDir []float64 `json:"wind_direction_10m"`
|
||||||
|
PrecipProb []float64 `json:"precipitation_probability"`
|
||||||
|
SurfacePres []float64 `json:"surface_pressure"`
|
||||||
|
} `json:"hourly"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// RunOpenMeteoFetch 拉取各站点未来三小时并写入 forecast_hourly
|
||||||
|
func RunOpenMeteoFetch(ctx context.Context) error {
|
||||||
|
db := database.GetDB()
|
||||||
|
stations, err := loadStationsWithLatLon(ctx, db)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
client := &http.Client{Timeout: 15 * time.Second}
|
||||||
|
loc, _ := time.LoadLocation("Asia/Shanghai")
|
||||||
|
if loc == nil {
|
||||||
|
loc = time.FixedZone("CST", 8*3600)
|
||||||
|
}
|
||||||
|
|
||||||
|
issuedAt := time.Now().In(loc)
|
||||||
|
startHour := issuedAt.Truncate(time.Hour)
|
||||||
|
targets := []time.Time{startHour.Add(1 * time.Hour), startHour.Add(2 * time.Hour), startHour.Add(3 * time.Hour)}
|
||||||
|
|
||||||
|
for _, s := range stations {
|
||||||
|
apiURL := buildOpenMeteoURL(s.lat, s.lon)
|
||||||
|
req, _ := http.NewRequestWithContext(ctx, http.MethodGet, apiURL, nil)
|
||||||
|
resp, err := client.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("open-meteo 请求失败 station=%s err=%v", s.id, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
var data openMeteoResponse
|
||||||
|
if err := json.NewDecoder(resp.Body).Decode(&data); err != nil {
|
||||||
|
resp.Body.Close()
|
||||||
|
log.Printf("open-meteo 解码失败 station=%s err=%v", s.id, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
resp.Body.Close()
|
||||||
|
|
||||||
|
// 建立 time->vals 映射(按CST解析)
|
||||||
|
table := map[time.Time]struct {
|
||||||
|
rain float64
|
||||||
|
temp float64
|
||||||
|
rh float64
|
||||||
|
ws float64
|
||||||
|
gust float64
|
||||||
|
wdir float64
|
||||||
|
prob float64
|
||||||
|
pres float64
|
||||||
|
}{}
|
||||||
|
for i := range data.Hourly.Time {
|
||||||
|
t, err := time.ParseInLocation("2006-01-02T15:04", data.Hourly.Time[i], loc)
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
v := table[t]
|
||||||
|
if i < len(data.Hourly.Rain) {
|
||||||
|
v.rain = data.Hourly.Rain[i]
|
||||||
|
}
|
||||||
|
if i < len(data.Hourly.Temperature) {
|
||||||
|
v.temp = data.Hourly.Temperature[i]
|
||||||
|
}
|
||||||
|
if i < len(data.Hourly.Humidity) {
|
||||||
|
v.rh = data.Hourly.Humidity[i]
|
||||||
|
}
|
||||||
|
if i < len(data.Hourly.WindSpeed) {
|
||||||
|
v.ws = data.Hourly.WindSpeed[i]
|
||||||
|
}
|
||||||
|
if i < len(data.Hourly.WindGusts) {
|
||||||
|
v.gust = data.Hourly.WindGusts[i]
|
||||||
|
}
|
||||||
|
if i < len(data.Hourly.WindDir) {
|
||||||
|
v.wdir = data.Hourly.WindDir[i]
|
||||||
|
}
|
||||||
|
if i < len(data.Hourly.PrecipProb) {
|
||||||
|
v.prob = data.Hourly.PrecipProb[i]
|
||||||
|
}
|
||||||
|
if i < len(data.Hourly.SurfacePres) {
|
||||||
|
v.pres = data.Hourly.SurfacePres[i]
|
||||||
|
}
|
||||||
|
table[t] = v
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, ft := range targets {
|
||||||
|
if v, ok := table[ft]; ok {
|
||||||
|
if err := upsertForecast(ctx, db, s.id, issuedAt, ft,
|
||||||
|
int64(v.rain*1000.0),
|
||||||
|
int64(v.temp*100.0),
|
||||||
|
int64(v.rh),
|
||||||
|
int64(v.ws*1000.0),
|
||||||
|
int64(v.gust*1000.0),
|
||||||
|
int64(v.wdir),
|
||||||
|
int64(v.prob),
|
||||||
|
int64(v.pres*100.0),
|
||||||
|
); err != nil {
|
||||||
|
log.Printf("写入forecast失败 station=%s time=%s err=%v", s.id, ft.Format(time.RFC3339), err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type station struct {
|
||||||
|
id string
|
||||||
|
lat sql.NullFloat64
|
||||||
|
lon sql.NullFloat64
|
||||||
|
}
|
||||||
|
|
||||||
|
func loadStationsWithLatLon(ctx context.Context, db *sql.DB) ([]station, error) {
|
||||||
|
rows, err := db.QueryContext(ctx, `SELECT station_id, latitude, longitude FROM stations WHERE latitude IS NOT NULL AND longitude IS NOT NULL`)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
var list []station
|
||||||
|
for rows.Next() {
|
||||||
|
var s station
|
||||||
|
if err := rows.Scan(&s.id, &s.lat, &s.lon); err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
list = append(list, s)
|
||||||
|
}
|
||||||
|
return list, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func buildOpenMeteoURL(lat, lon sql.NullFloat64) string {
|
||||||
|
q := url.Values{}
|
||||||
|
q.Set("latitude", fmt.Sprintf("%f", lat.Float64))
|
||||||
|
q.Set("longitude", fmt.Sprintf("%f", lon.Float64))
|
||||||
|
q.Set("hourly", "rain,temperature_2m,relative_humidity_2m,wind_speed_10m,wind_gusts_10m,wind_direction_10m,precipitation_probability,surface_pressure")
|
||||||
|
q.Set("timezone", "Asia/Shanghai")
|
||||||
|
return "https://api.open-meteo.com/v1/forecast?" + q.Encode()
|
||||||
|
}
|
||||||
|
|
||||||
|
func upsertForecast(ctx context.Context, db *sql.DB, stationID string, issuedAt, forecastTime time.Time,
|
||||||
|
rainMmX1000, tempCx100, humidityPct, wsMsX1000, gustMsX1000, wdirDeg, probPct, pressureHpaX100 int64,
|
||||||
|
) error {
|
||||||
|
_, err := db.ExecContext(ctx, `
|
||||||
|
INSERT INTO forecast_hourly (
|
||||||
|
station_id, provider, issued_at, forecast_time,
|
||||||
|
rain_mm_x1000, temp_c_x100, humidity_pct, wind_speed_ms_x1000,
|
||||||
|
wind_gust_ms_x1000, wind_dir_deg, precip_prob_pct, pressure_hpa_x100
|
||||||
|
) VALUES ($1, 'open-meteo', $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
|
||||||
|
ON CONFLICT (station_id, provider, issued_at, forecast_time)
|
||||||
|
DO UPDATE SET
|
||||||
|
rain_mm_x1000 = EXCLUDED.rain_mm_x1000,
|
||||||
|
temp_c_x100 = EXCLUDED.temp_c_x100,
|
||||||
|
humidity_pct = EXCLUDED.humidity_pct,
|
||||||
|
wind_speed_ms_x1000 = EXCLUDED.wind_speed_ms_x1000,
|
||||||
|
wind_gust_ms_x1000 = EXCLUDED.wind_gust_ms_x1000,
|
||||||
|
wind_dir_deg = EXCLUDED.wind_dir_deg,
|
||||||
|
precip_prob_pct = EXCLUDED.precip_prob_pct,
|
||||||
|
pressure_hpa_x100 = EXCLUDED.pressure_hpa_x100
|
||||||
|
`, stationID, issuedAt, forecastTime,
|
||||||
|
rainMmX1000, tempCx100, humidityPct, wsMsX1000, gustMsX1000, wdirDeg, probPct, pressureHpaX100)
|
||||||
|
return err
|
||||||
|
}
|
||||||
@ -15,6 +15,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
"unicode/utf8"
|
"unicode/utf8"
|
||||||
"weatherstation/internal/config"
|
"weatherstation/internal/config"
|
||||||
|
"weatherstation/internal/forecast"
|
||||||
"weatherstation/internal/tools"
|
"weatherstation/internal/tools"
|
||||||
"weatherstation/model"
|
"weatherstation/model"
|
||||||
)
|
)
|
||||||
@ -144,6 +145,20 @@ func StartUDPServer() error {
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
// 后台定时:每小时拉取open-meteo(全站)
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
now := time.Now()
|
||||||
|
next := now.Truncate(time.Hour).Add(time.Hour)
|
||||||
|
time.Sleep(time.Until(next))
|
||||||
|
if err := forecast.RunOpenMeteoFetch(context.Background()); err != nil {
|
||||||
|
log.Printf("open-meteo 定时拉取失败: %v", err)
|
||||||
|
} else {
|
||||||
|
log.Printf("open-meteo 定时拉取完成")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
n, addr, err := conn.ReadFrom(buffer)
|
n, addr, err := conn.ReadFrom(buffer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user