267 lines
7.6 KiB
Go
267 lines
7.6 KiB
Go
package rain
|
||
|
||
import (
|
||
"context"
|
||
"errors"
|
||
"fmt"
|
||
"io"
|
||
"log"
|
||
"net/http"
|
||
"os"
|
||
"path/filepath"
|
||
"strconv"
|
||
"strings"
|
||
"time"
|
||
|
||
"weatherstation/internal/database"
|
||
)
|
||
|
||
// Options controls the rain (CMPA hourly precip) scheduler behavior.
|
||
type Options struct {
|
||
Enable bool
|
||
OutputDir string
|
||
BaseURL string // template with %Y%m%d/%H/%M and {z}/{y}/{x}; time is UTC
|
||
MaxRetries int
|
||
StoreToDB bool
|
||
Tiles [][3]int // list of (z,y,x); defaults to [[7,40,102],[7,40,104]]
|
||
}
|
||
|
||
// Start starts the CMPA hourly rain tile downloader.
|
||
// Runs every 10 minutes aligned to 10-minute boundaries. For each tick at local time T,
|
||
// constructs the slot as floor_to_hour(T) - 1h (last completed hour), converts to UTC
|
||
// (slot_utc = slot_local - 8h), and downloads 0-minute tile for each configured (z,y,x).
|
||
func Start(ctx context.Context, opts Options) error {
|
||
if !opts.Enable && !envEnabledDefaultTrue() {
|
||
log.Println("[rain] scheduler disabled")
|
||
return nil
|
||
}
|
||
if opts.OutputDir == "" {
|
||
if v := os.Getenv("RAIN_DIR"); v != "" {
|
||
opts.OutputDir = v
|
||
} else {
|
||
exe, _ := os.Executable()
|
||
exeDir := filepath.Dir(exe)
|
||
opts.OutputDir = filepath.Join(exeDir, "rain_data")
|
||
}
|
||
}
|
||
if opts.MaxRetries == 0 {
|
||
opts.MaxRetries = getenvIntDefault("RAIN_MAX_RETRIES", 2)
|
||
}
|
||
if opts.BaseURL == "" {
|
||
opts.BaseURL = getenvDefault("RAIN_BASE_URL", "https://image.data.cma.cn/tiles/China/CMPA_RT_China_0P01_HOR-PRE_GISJPG_Tiles/%Y%m%d/%H/%M/{z}/{y}/{x}.bin")
|
||
}
|
||
if len(opts.Tiles) == 0 {
|
||
opts.Tiles = [][3]int{{7, 40, 102}, {7, 40, 104}}
|
||
}
|
||
if err := os.MkdirAll(opts.OutputDir, 0o755); err != nil {
|
||
return fmt.Errorf("create rain output dir: %w", err)
|
||
}
|
||
|
||
loc, _ := time.LoadLocation("Asia/Shanghai")
|
||
if loc == nil {
|
||
loc = time.FixedZone("CST", 8*3600)
|
||
}
|
||
|
||
// immediate first run
|
||
go func() {
|
||
if err := runOnce(ctx, opts, loc); err != nil {
|
||
log.Printf("[rain] first run error: %v", err)
|
||
}
|
||
}()
|
||
|
||
// every 10 minutes
|
||
go func() {
|
||
for {
|
||
if ctx.Err() != nil {
|
||
return
|
||
}
|
||
now := time.Now().In(loc)
|
||
runAt := roundDownN(now, 10*time.Minute).Add(10 * time.Minute)
|
||
sleep := time.Until(runAt)
|
||
if sleep < 0 {
|
||
sleep = 0
|
||
}
|
||
timer := time.NewTimer(sleep)
|
||
select {
|
||
case <-ctx.Done():
|
||
timer.Stop()
|
||
return
|
||
case <-timer.C:
|
||
if err := runOnce(ctx, opts, loc); err != nil {
|
||
log.Printf("[rain] run error: %v", err)
|
||
}
|
||
}
|
||
}
|
||
}()
|
||
|
||
log.Printf("[rain] scheduler started (10m, dir=%s, tiles=%d)", opts.OutputDir, len(opts.Tiles))
|
||
return nil
|
||
}
|
||
|
||
func runOnce(ctx context.Context, opts Options, loc *time.Location) error {
|
||
// target hour: current hour at 00 (floor_to_hour(now))
|
||
// e.g., 10:15 -> 10:00; if尚未发布则下载可能失败,等待下一次10分钟重试
|
||
now := time.Now().In(loc)
|
||
slotLocal := now.Truncate(time.Hour)
|
||
// UTC for URL path
|
||
slotUTC := slotLocal.Add(-8 * time.Hour).In(time.UTC)
|
||
log.Printf("[rain] tick target hour: local=%s (CST), utc=%s (UTC)", slotLocal.Format("2006-01-02 15:04"), slotUTC.Format("2006-01-02 15:04"))
|
||
dateStr := slotUTC.Format("20060102")
|
||
hh := slotUTC.Format("15")
|
||
mm := "00" // hourly product uses minute 00
|
||
|
||
for _, t := range opts.Tiles {
|
||
z, y, x := t[0], t[1], t[2]
|
||
if err := downloadAndStoreTile(ctx, slotLocal, dateStr, hh, mm, z, y, x, opts); err != nil {
|
||
log.Printf("[rain] download/store z=%d y=%d x=%d failed: %v", z, y, x, err)
|
||
}
|
||
}
|
||
return nil
|
||
}
|
||
|
||
func downloadAndStoreTile(ctx context.Context, local time.Time, dateStr, hh, mm string, z, y, x int, opts Options) error {
|
||
url := fmt.Sprintf("https://image.data.cma.cn/tiles/China/CMPA_RT_China_0P01_HOR-PRE_GISJPG_Tiles/%s/%s/%s/%d/%d/%d.bin", dateStr, hh, mm, z, y, x)
|
||
// skip if exists in DB; dt source configurable (default: local slot time)
|
||
dtSource := strings.ToLower(getenvDefault("RAIN_DT_SOURCE", "local")) // local|url
|
||
var product string
|
||
var dtForKey time.Time
|
||
if ref, err := ParseCMPATileURL(url); err == nil {
|
||
product = ref.Product
|
||
if dtSource == "url" {
|
||
dtForKey = ref.DT
|
||
} else {
|
||
dtForKey = local
|
||
}
|
||
exists, err := databaseHas(ctx, product, dtForKey, z, y, x)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
if exists {
|
||
log.Printf("[rain] skip: already in DB z=%d y=%d x=%d dt(%s)=%s url=%s", z, y, x, dtSource, dtForKey.Format("2006-01-02 15:04"), url)
|
||
return nil
|
||
}
|
||
}
|
||
|
||
fname := fmt.Sprintf("rain_z%d_y%d_x%d_%s.bin", z, y, x, local.Format("20060102_1504"))
|
||
dest := filepath.Join(opts.OutputDir, fname)
|
||
if _, err := os.Stat(dest); err == nil {
|
||
log.Printf("[rain] skip: file exists %s", dest)
|
||
return nil
|
||
}
|
||
|
||
if err := httpDownloadTo(ctx, url, dest); err != nil {
|
||
return err
|
||
}
|
||
log.Printf("[rain] saved %s (url=%s)", dest, url)
|
||
|
||
if opts.StoreToDB {
|
||
b, rerr := os.ReadFile(dest)
|
||
if rerr != nil {
|
||
return fmt.Errorf("read saved tile: %w", rerr)
|
||
}
|
||
// Determine product and dt according to dtSource
|
||
if product == "" {
|
||
if ref, e := ParseCMPATileURL(url); e == nil {
|
||
product = ref.Product
|
||
if dtSource == "url" {
|
||
dtForKey = ref.DT
|
||
} else {
|
||
dtForKey = local
|
||
}
|
||
}
|
||
}
|
||
if product == "" {
|
||
return fmt.Errorf("cannot parse product from url for DB store")
|
||
}
|
||
if dtForKey.IsZero() {
|
||
if dtSource == "url" {
|
||
return fmt.Errorf("dt source=url but failed to parse dt")
|
||
}
|
||
dtForKey = local
|
||
}
|
||
if err := database.UpsertRainTile(ctx, database.GetDB(), product, dtForKey, z, y, x, 256, 256, b); err != nil {
|
||
return fmt.Errorf("store tile db: %w", err)
|
||
}
|
||
log.Printf("[rain] stored to DB: %s (dt=%s, source=%s)", fname, dtForKey.Format("2006-01-02 15:04:05"), dtSource)
|
||
}
|
||
return nil
|
||
}
|
||
|
||
func httpDownloadTo(ctx context.Context, url, dest string) error {
|
||
client := &http.Client{Timeout: 20 * time.Second}
|
||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
|
||
if err != nil {
|
||
return fmt.Errorf("build request: %w", err)
|
||
}
|
||
req.Header.Set("Referer", "https://data.cma.cn/")
|
||
req.Header.Set("Origin", "https://data.cma.cn")
|
||
req.Header.Set("User-Agent", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36")
|
||
resp, err := client.Do(req)
|
||
if err != nil {
|
||
return fmt.Errorf("http get: %w", err)
|
||
}
|
||
defer resp.Body.Close()
|
||
if resp.StatusCode != http.StatusOK {
|
||
return fmt.Errorf("unexpected status: %d", resp.StatusCode)
|
||
}
|
||
tmp := dest + ".part"
|
||
f, err := os.Create(tmp)
|
||
if err != nil {
|
||
return fmt.Errorf("create temp: %w", err)
|
||
}
|
||
_, copyErr := io.Copy(f, resp.Body)
|
||
closeErr := f.Close()
|
||
if copyErr != nil {
|
||
_ = os.Remove(tmp)
|
||
return fmt.Errorf("write body: %w", copyErr)
|
||
}
|
||
if closeErr != nil {
|
||
_ = os.Remove(tmp)
|
||
return fmt.Errorf("close temp: %w", closeErr)
|
||
}
|
||
if err := os.Rename(tmp, dest); err != nil {
|
||
// Cross-device fallback
|
||
if !errors.Is(err, os.ErrInvalid) {
|
||
return fmt.Errorf("rename: %w", err)
|
||
}
|
||
data, rerr := os.ReadFile(tmp)
|
||
if rerr != nil {
|
||
return fmt.Errorf("read temp: %w", rerr)
|
||
}
|
||
if werr := os.WriteFile(dest, data, 0o644); werr != nil {
|
||
return fmt.Errorf("write final: %w", werr)
|
||
}
|
||
_ = os.Remove(tmp)
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// small env helpers (duplicated minimal set to avoid cross-package deps)
|
||
func getenvDefault(k, def string) string {
|
||
if v := os.Getenv(k); v != "" {
|
||
return v
|
||
}
|
||
return def
|
||
}
|
||
func getenvIntDefault(k string, def int) int {
|
||
if v := os.Getenv(k); v != "" {
|
||
if n, err := strconv.Atoi(v); err == nil {
|
||
return n
|
||
}
|
||
}
|
||
return def
|
||
}
|
||
func envEnabledDefaultTrue() bool {
|
||
v := strings.ToLower(os.Getenv("RAIN_ENABLED"))
|
||
if v == "" {
|
||
return true
|
||
}
|
||
return v == "1" || v == "true" || v == "yes"
|
||
}
|
||
|
||
func databaseHas(ctx context.Context, product string, dt time.Time, z, y, x int) (bool, error) {
|
||
return database.HasRainTile(ctx, database.GetDB(), product, dt, z, y, x)
|
||
}
|
||
|
||
func roundDownN(t time.Time, d time.Duration) time.Time { return t.Truncate(d) }
|