599 lines
18 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package radar
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"net/http"
neturl "net/url"
"os"
"path/filepath"
"regexp"
"strings"
"time"
"weatherstation/internal/config"
"weatherstation/internal/database"
)
// Options controls the radar scheduler behavior.
type Options struct {
Enable bool
OutputDir string
Delay time.Duration // time after each 6-minute boundary to trigger download
BaseURL string // optional: where to download from (template-based)
MaxRetries int
// Tile indices (4326 pyramid). Defaults: z=7,y=40,x=102 (Nanning region example)
Z int
Y int
X int
// StoreToDB controls whether to store fetched tiles into PostgreSQL `radar_tiles`.
StoreToDB bool
}
// Start starts the radar download scheduler. It reads options from env if zero value provided.
// Env vars:
//
// RADAR_ENABLED=true|false (default: true)
// RADAR_DIR=radar_data (default)
// RADAR_DELAY_SEC=120 (2 minutes; trigger after each boundary)
// RADAR_MAX_RETRIES=2
// RADAR_BASE_URL=<template URL, optional>
func Start(ctx context.Context, opts Options) error {
if !opts.Enable && !envEnabledDefaultTrue() {
log.Println("[radar] scheduler disabled")
return nil
}
if opts.OutputDir == "" {
opts.OutputDir = getenvDefault("RADAR_DIR", "radar_data")
}
// Delay 不再用于 10 分钟调度流程,这里保留读取但不使用
if opts.Delay == 0 {
delaySec := getenvIntDefault("RADAR_DELAY_SEC", 0)
opts.Delay = time.Duration(delaySec) * time.Second
}
if opts.MaxRetries == 0 {
opts.MaxRetries = getenvIntDefault("RADAR_MAX_RETRIES", 2)
}
if opts.BaseURL == "" {
// Default to CMA image server tiles
// Placeholders: %Y %m %d %H %M {z} {y} {x}
opts.BaseURL = getenvDefault("RADAR_BASE_URL", "https://image.data.cma.cn/tiles/China/RADAR_L3_MST_CREF_GISJPG_Tiles_CR/%Y%m%d/%H/%M/{z}/{y}/{x}.bin")
}
if opts.Z == 0 && opts.Y == 0 && opts.X == 0 {
// Default tile requested
opts.Z, opts.Y, opts.X = getenvIntDefault("RADAR_Z", 7), getenvIntDefault("RADAR_Y", 40), getenvIntDefault("RADAR_X", 102)
}
if err := os.MkdirAll(opts.OutputDir, 0o755); err != nil {
return fmt.Errorf("create radar output dir: %w", err)
}
loc, _ := time.LoadLocation("Asia/Shanghai")
if loc == nil {
loc = time.FixedZone("CST", 8*3600)
}
// 先立即执行一次(不延迟):拉取一次瓦片并抓取一次彩云实况
go func() {
if err := runOnceFromNMC(ctx, opts); err != nil {
log.Printf("[radar] first run error: %v", err)
}
}()
// 瓦片每3分钟查询一次
go loop3(ctx, loc, opts)
// 实况:按配置开关运行(默认关闭)
rtEnabled := config.GetConfig().Radar.RealtimeEnabled
rtMin := config.GetConfig().Radar.RealtimeIntervalMinutes
if rtEnabled {
if rtMin != 10 && rtMin != 30 && rtMin != 60 {
rtMin = 10
}
go loopRealtime(ctx, loc, opts, time.Duration(rtMin)*time.Minute)
}
if rtEnabled {
log.Printf("[radar] scheduler started (tiles=3m, realtime=%dm, dir=%s, tile=%d/%d/%d)", rtMin, opts.OutputDir, opts.Z, opts.Y, opts.X)
} else {
log.Printf("[radar] scheduler started (tiles=3m, realtime=disabled, dir=%s, tile=%d/%d/%d)", opts.OutputDir, opts.Z, opts.Y, opts.X)
}
return nil
}
// loopRealtime 周期性拉取彩云实况,按 interval 对齐边界运行
func loopRealtime(ctx context.Context, loc *time.Location, opts Options, interval time.Duration) {
for {
if ctx.Err() != nil {
return
}
now := time.Now().In(loc)
// 对齐到 interval 边界
runAt := roundDownN(now, interval).Add(interval)
sleep := time.Until(runAt)
if sleep < 0 {
sleep = 0
}
timer := time.NewTimer(sleep)
select {
case <-ctx.Done():
timer.Stop()
return
case <-timer.C:
if err := runRealtimeFromCaiyun(ctx); err != nil {
log.Printf("[radar] realtime run error: %v", err)
}
}
}
}
// 每3分钟的瓦片轮询
func loop3(ctx context.Context, loc *time.Location, opts Options) {
for {
if ctx.Err() != nil {
return
}
now := time.Now().In(loc)
runAt := roundDownN(now, 3*time.Minute).Add(3 * time.Minute)
sleep := time.Until(runAt)
if sleep < 0 {
sleep = 0
}
timer := time.NewTimer(sleep)
select {
case <-ctx.Done():
timer.Stop()
return
case <-timer.C:
if err := runTilesFromNMC(ctx, opts); err != nil {
log.Printf("[radar] tiles run error: %v", err)
}
}
}
}
func tryStartupCatchup(ctx context.Context, loc *time.Location, opts Options) {
now := time.Now().In(loc)
lastSlot := roundDown6(now).Add(-opts.Delay)
_ = downloadForSlot(ctx, lastSlot, opts)
}
// roundDown6 returns t truncated to the nearest lower multiple of 6 minutes.
func roundDown6(t time.Time) time.Time {
// Truncate supports arbitrary durations.
return t.Truncate(6 * time.Minute)
}
func roundDownN(t time.Time, d time.Duration) time.Time {
return t.Truncate(d)
}
// downloadForSlot performs the actual download for the given nominal run time.
// It constructs a filename like: radar_COMP_20060102_1504.png under opts.OutputDir.
// If RADAR_BASE_URL is provided, it's treated as a format string with Go time
// layout tokens, e.g. https://example/COMP/%Y/%m/%d/%H%M.png (Go layout applied).
func downloadForSlot(ctx context.Context, runAt time.Time, opts Options) error {
// Determine the product nominal time: align to boundary (6-minute steps)
slot := roundDown6(runAt)
fname := fmt.Sprintf("radar_z%d_y%d_x%d_%s.bin", opts.Z, opts.Y, opts.X, slot.Format("20060102_1504"))
dest := filepath.Join(opts.OutputDir, fname)
// If file already exists, skip.
if _, err := os.Stat(dest); err == nil {
return nil
}
if opts.BaseURL == "" {
// No remote configured: create a placeholder to prove scheduling works.
content := []byte(fmt.Sprintf("placeholder for %s\n", slot.Format(time.RFC3339)))
if err := os.WriteFile(dest, content, 0o644); err != nil {
return fmt.Errorf("write placeholder: %w", err)
}
log.Printf("[radar] wrote placeholder %s", dest)
return nil
}
// Convert a possibly strftime-like template to Go layout tokens.
url := buildURLFromTemplate(opts.BaseURL, slot, opts.Z, opts.Y, opts.X)
// HTTP GET with timeout.
client := &http.Client{Timeout: 20 * time.Second}
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return fmt.Errorf("build request: %w", err)
}
// CMA requires referer/origin headers typically
req.Header.Set("Referer", "https://data.cma.cn/")
req.Header.Set("Origin", "https://data.cma.cn")
req.Header.Set("User-Agent", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36")
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("http get: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("unexpected status: %d", resp.StatusCode)
}
// Write to temp then rename.
tmp := dest + ".part"
f, err := os.Create(tmp)
if err != nil {
return fmt.Errorf("create temp: %w", err)
}
_, copyErr := io.Copy(f, resp.Body)
closeErr := f.Close()
if copyErr != nil {
_ = os.Remove(tmp)
return fmt.Errorf("write body: %w", copyErr)
}
if closeErr != nil {
_ = os.Remove(tmp)
return fmt.Errorf("close temp: %w", closeErr)
}
if err := os.Rename(tmp, dest); err != nil {
// If cross-device rename fails, fallback to copy
if !errors.Is(err, os.ErrInvalid) {
return fmt.Errorf("rename: %w", err)
}
data, rerr := os.ReadFile(tmp)
if rerr != nil {
return fmt.Errorf("read temp for copy: %w", rerr)
}
if werr := os.WriteFile(dest, data, 0o644); werr != nil {
return fmt.Errorf("write final: %w", werr)
}
_ = os.Remove(tmp)
}
log.Printf("[radar] saved %s (url=%s)", dest, url)
// Optionally store to DB
if opts.StoreToDB {
// Read the just-written bytes to pass to DB store; alternatively stream earlier
b, rerr := os.ReadFile(dest)
if rerr != nil {
return fmt.Errorf("read saved tile for DB: %w", rerr)
}
if err := StoreTileBytes(ctx, url, b); err != nil {
return fmt.Errorf("store tile to DB: %w", err)
}
log.Printf("[radar] stored to DB: z=%d y=%d x=%d t=%s", opts.Z, opts.Y, opts.X, slot.Format("2006-01-02 15:04"))
}
return nil
}
func buildURLFromTemplate(tpl string, t time.Time, z, y, x int) string {
// Support a minimal subset of strftime tokens to Go layout.
repl := map[string]string{
"%Y": "2006",
"%m": "01",
"%d": "02",
"%H": "15",
"%M": "04",
}
out := tpl
for k, v := range repl {
out = strings.ReplaceAll(out, k, t.Format(v))
}
// Replace index placeholders
out = strings.ReplaceAll(out, "{z}", fmt.Sprintf("%d", z))
out = strings.ReplaceAll(out, "{y}", fmt.Sprintf("%d", y))
out = strings.ReplaceAll(out, "{x}", fmt.Sprintf("%d", x))
return out
}
// ------------------- NMC -> CMA pipeline -------------------
type nmcRadar struct {
Title string `json:"title"`
Image string `json:"image"`
URL string `json:"url"`
}
type nmcResp struct {
Radar nmcRadar `json:"radar"`
}
var reDigits17 = regexp.MustCompile(`([0-9]{17})`)
// runOnceFromNMC fetches NMC JSON, extracts timestamp, shifts +8h, then downloads CMA tile for opts.Z/Y/X.
func runOnceFromNMC(ctx context.Context, opts Options) error {
if err := runTilesFromNMC(ctx, opts); err != nil {
return err
}
if config.GetConfig().Radar.RealtimeEnabled {
return runRealtimeFromCaiyun(ctx)
}
return nil
}
// 仅瓦片下载:查询 NMC解析时间按该时刻下载 CMA 瓦片若DB已存在则跳过
func runTilesFromNMC(ctx context.Context, opts Options) error {
// 1) Fetch NMC JSON
api := "https://www.nmc.cn/rest/weather?stationid=Wqsps"
client := &http.Client{Timeout: 15 * time.Second}
req, err := http.NewRequestWithContext(ctx, http.MethodGet, api, nil)
if err != nil {
return fmt.Errorf("nmc request: %w", err)
}
req.Header.Set("Referer", "https://www.nmc.cn/")
req.Header.Set("User-Agent", "Mozilla/5.0")
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("nmc get: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("nmc status: %d", resp.StatusCode)
}
// 仅从 data.radar.image 读取
var top struct {
Data struct {
Radar nmcRadar `json:"radar"`
} `json:"data"`
}
if err := json.NewDecoder(resp.Body).Decode(&top); err != nil {
return fmt.Errorf("nmc decode: %w", err)
}
img := top.Data.Radar.Image
if img == "" {
return fmt.Errorf("nmc data.radar.image empty")
}
// 2) Extract filename and 17-digit timestamp from image
// Example: /product/2025/09/23/RDCP/SEVP_AOC_RDCP_SLDAS3_ECREF_ANCN_L88_PI_20250923033600000.PNG?v=...
// filename: SEVP_AOC_RDCP_SLDAS3_ECREF_ANCN_L88_PI_20250923033600000.PNG
// digits: 20250923033600000 -> use first 12 as yyyyMMddHHmm
if u, err := neturl.Parse(img); err == nil { // strip query if present
img = u.Path
}
parts := strings.Split(img, "/")
fname := parts[len(parts)-1]
digits := reDigits17.FindString(fname)
if digits == "" {
return fmt.Errorf("no 17-digit timestamp in %s", fname)
}
// Parse yyyyMMddHHmm from first 12 digits as UTC, then +8h
utc12 := digits[:12]
utcT, err := time.ParseInLocation("200601021504", utc12, time.UTC)
if err != nil {
return fmt.Errorf("parse utc time: %w", err)
}
local := utcT.Add(8 * time.Hour)
// 3) Build CMA tile URL(s) for fixed z/y/x
dateStr := local.Format("20060102")
hh := local.Format("15")
mm := local.Format("04")
// Prepare tile list: primary (opts or default Nanning) + Guangzhou (7/40/104) + Wuhan (7/42/104)
z, y, x := opts.Z, opts.Y, opts.X
if z == 0 && y == 0 && x == 0 {
z, y, x = 7, 40, 102
}
type tcoord struct{ z, y, x int }
tiles := []tcoord{{z, y, x}, {7, 40, 104}, {7, 42, 104}}
// de-duplicate if same
seen := map[string]bool{}
for _, tc := range tiles {
key := fmt.Sprintf("%d/%d/%d", tc.z, tc.y, tc.x)
if seen[key] {
continue
}
seen[key] = true
if err := downloadAndStoreTile(ctx, local, dateStr, hh, mm, tc.z, tc.y, tc.x, opts); err != nil {
log.Printf("[radar] download/store %s failed: %v", key, err)
}
}
return nil
}
// 仅彩云实况10分钟一次
func runRealtimeFromCaiyun(ctx context.Context) error {
// 1) 配置中的别名列表
cfg := config.GetConfig()
for _, a := range cfg.Radar.Aliases {
if err := fetchAndStoreRadarRealtimeFor(ctx, a.Alias, a.Lat, a.Lon); err != nil {
log.Printf("[radar] realtime(alias=%s) failed: %v", a.Alias, err)
}
}
// 2) WH65LP 设备批量
token := os.Getenv("CAIYUN_TOKEN")
if token == "" {
token = cfg.Forecast.CaiyunToken
}
if token == "" {
log.Printf("[radar] skip station realtime: missing CAIYUN_TOKEN")
return nil
}
coords, err := database.ListWH65LPStationsWithLatLon(ctx, database.GetDB())
if err != nil {
log.Printf("[radar] list WH65LP stations failed: %v", err)
return nil
}
for _, s := range coords {
if err := fetchAndStoreRadarRealtimeFor(ctx, s.StationID, s.Lat, s.Lon); err != nil {
log.Printf("[radar] realtime(station=%s) failed: %v", s.StationID, err)
}
}
return nil
}
func downloadAndStoreTile(ctx context.Context, local time.Time, dateStr, hh, mm string, z, y, x int, opts Options) error {
url := fmt.Sprintf("https://image.data.cma.cn/tiles/China/RADAR_L3_MST_CREF_GISJPG_Tiles_CR/%s/%s/%s/%d/%d/%d.bin", dateStr, hh, mm, z, y, x)
// 若数据库已有该瓦片则跳过
if ref, err := ParseCMATileURL(url); err == nil {
exists, err := database.HasRadarTile(ctx, database.GetDB(), ref.Product, ref.DT, ref.Z, ref.Y, ref.X)
if err != nil {
return err
}
if exists {
log.Printf("[radar] skip existing tile in DB: %s %s z=%d y=%d x=%d", ref.Product, ref.DT.Format("2006-01-02 15:04"), ref.Z, ref.Y, ref.X)
return nil
}
}
fnameOut := fmt.Sprintf("radar_z%d_y%d_x%d_%s.bin", z, y, x, local.Format("20060102_1504"))
dest := filepath.Join(opts.OutputDir, fnameOut)
if _, err := os.Stat(dest); err == nil {
return nil // already exists
}
if err := httpDownloadTo(ctx, url, dest); err != nil {
return err
}
log.Printf("[radar] saved %s (url=%s)", dest, url)
if opts.StoreToDB {
b, rerr := os.ReadFile(dest)
if rerr != nil {
return fmt.Errorf("read saved tile: %w", rerr)
}
if err := StoreTileBytes(ctx, url, b); err != nil {
return fmt.Errorf("store tile db: %w", err)
}
log.Printf("[radar] stored to DB: %s", fnameOut)
}
return nil
}
func httpDownloadTo(ctx context.Context, url, dest string) error {
client := &http.Client{Timeout: 20 * time.Second}
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return fmt.Errorf("build request: %w", err)
}
req.Header.Set("Referer", "https://data.cma.cn/")
req.Header.Set("Origin", "https://data.cma.cn")
req.Header.Set("User-Agent", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36")
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("http get: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("unexpected status: %d", resp.StatusCode)
}
tmp := dest + ".part"
f, err := os.Create(tmp)
if err != nil {
return fmt.Errorf("create temp: %w", err)
}
_, copyErr := io.Copy(f, resp.Body)
closeErr := f.Close()
if copyErr != nil {
_ = os.Remove(tmp)
return fmt.Errorf("write body: %w", copyErr)
}
if closeErr != nil {
_ = os.Remove(tmp)
return fmt.Errorf("close temp: %w", closeErr)
}
if err := os.Rename(tmp, dest); err != nil {
return fmt.Errorf("rename: %w", err)
}
return nil
}
//
// fetchAndStoreRadarRealtime calls Caiyun realtime API for the Nanning radar station
// and stores selected fields into table `radar_weather` with 10-minute bucketed dt.
func fetchAndStoreRadarRealtimeFor(ctx context.Context, alias string, lat, lon float64) error {
// Token: prefer env CAIYUN_TOKEN, else config
token := os.Getenv("CAIYUN_TOKEN")
if token == "" {
token = config.GetConfig().Forecast.CaiyunToken
}
if token == "" {
return fmt.Errorf("missing CAIYUN_TOKEN for Caiyun realtime API")
}
// Build URL: lon,lat order; metric units
api := fmt.Sprintf("https://api.caiyunapp.com/v2.6/%s/%.6f,%.6f/realtime?lang=zh_CN&unit=metric", token, lon, lat)
client := &http.Client{Timeout: 12 * time.Second}
req, err := http.NewRequestWithContext(ctx, http.MethodGet, api, nil)
if err != nil {
return fmt.Errorf("build realtime request: %w", err)
}
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("realtime http get: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("realtime status: %d", resp.StatusCode)
}
var payload struct {
Status string `json:"status"`
Result struct {
Realtime struct {
Temperature float64 `json:"temperature"`
Humidity float64 `json:"humidity"`
Cloudrate float64 `json:"cloudrate"`
Visibility float64 `json:"visibility"`
Dswrf float64 `json:"dswrf"`
Wind struct {
Speed float64 `json:"speed"`
Direction float64 `json:"direction"`
} `json:"wind"`
Pressure float64 `json:"pressure"`
} `json:"realtime"`
} `json:"result"`
}
if err := json.NewDecoder(resp.Body).Decode(&payload); err != nil {
return fmt.Errorf("decode realtime: %w", err)
}
if payload.Status != "ok" {
return fmt.Errorf("realtime api status=%s", payload.Status)
}
// Align to configured bucket in Asia/Shanghai
loc, _ := time.LoadLocation("Asia/Shanghai")
if loc == nil {
loc = time.FixedZone("CST", 8*3600)
}
bucketMin := config.GetConfig().Radar.RealtimeIntervalMinutes
if bucketMin != 10 && bucketMin != 30 && bucketMin != 60 {
bucketMin = 10
}
dt := roundDownN(time.Now().In(loc), time.Duration(bucketMin)*time.Minute)
// Store
db := database.GetDB()
rt := payload.Result.Realtime
// Caiyun wind.speed is in km/h under metric; convert to m/s for storage/display
windSpeedMS := rt.Wind.Speed / 3.6
return database.UpsertRadarWeather(ctx, db, alias, lat, lon, dt,
rt.Temperature, rt.Humidity, rt.Cloudrate, rt.Visibility, rt.Dswrf,
windSpeedMS, rt.Wind.Direction, rt.Pressure,
)
}
func envEnabledDefaultTrue() bool {
v := strings.ToLower(strings.TrimSpace(os.Getenv("RADAR_ENABLED")))
if v == "" {
return true
}
return v == "1" || v == "true" || v == "yes" || v == "on"
}
func getenvDefault(key, def string) string {
v := os.Getenv(key)
if v == "" {
return def
}
return v
}
func getenvIntDefault(key string, def int) int {
v := os.Getenv(key)
if v == "" {
return def
}
var n int
_, err := fmt.Sscanf(v, "%d", &n)
if err != nil {
return def
}
return n
}