267 lines
7.6 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package rain
import (
"context"
"errors"
"fmt"
"io"
"log"
"net/http"
"os"
"path/filepath"
"strconv"
"strings"
"time"
"weatherstation/internal/database"
)
// Options controls the rain (CMPA hourly precip) scheduler behavior.
type Options struct {
Enable bool
OutputDir string
BaseURL string // template with %Y%m%d/%H/%M and {z}/{y}/{x}; time is UTC
MaxRetries int
StoreToDB bool
Tiles [][3]int // list of (z,y,x); defaults to [[7,40,102],[7,40,104]]
}
// Start starts the CMPA hourly rain tile downloader.
// Runs every 10 minutes aligned to 10-minute boundaries. For each tick at local time T,
// constructs the slot as floor_to_hour(T) - 1h (last completed hour), converts to UTC
// (slot_utc = slot_local - 8h), and downloads 0-minute tile for each configured (z,y,x).
func Start(ctx context.Context, opts Options) error {
if !opts.Enable && !envEnabledDefaultTrue() {
log.Println("[rain] scheduler disabled")
return nil
}
if opts.OutputDir == "" {
if v := os.Getenv("RAIN_DIR"); v != "" {
opts.OutputDir = v
} else {
exe, _ := os.Executable()
exeDir := filepath.Dir(exe)
opts.OutputDir = filepath.Join(exeDir, "rain_data")
}
}
if opts.MaxRetries == 0 {
opts.MaxRetries = getenvIntDefault("RAIN_MAX_RETRIES", 2)
}
if opts.BaseURL == "" {
opts.BaseURL = getenvDefault("RAIN_BASE_URL", "https://image.data.cma.cn/tiles/China/CMPA_RT_China_0P01_HOR-PRE_GISJPG_Tiles/%Y%m%d/%H/%M/{z}/{y}/{x}.bin")
}
if len(opts.Tiles) == 0 {
opts.Tiles = [][3]int{{7, 40, 102}, {7, 40, 104}}
}
if err := os.MkdirAll(opts.OutputDir, 0o755); err != nil {
return fmt.Errorf("create rain output dir: %w", err)
}
loc, _ := time.LoadLocation("Asia/Shanghai")
if loc == nil {
loc = time.FixedZone("CST", 8*3600)
}
// immediate first run
go func() {
if err := runOnce(ctx, opts, loc); err != nil {
log.Printf("[rain] first run error: %v", err)
}
}()
// every 10 minutes
go func() {
for {
if ctx.Err() != nil {
return
}
now := time.Now().In(loc)
runAt := roundDownN(now, 10*time.Minute).Add(10 * time.Minute)
sleep := time.Until(runAt)
if sleep < 0 {
sleep = 0
}
timer := time.NewTimer(sleep)
select {
case <-ctx.Done():
timer.Stop()
return
case <-timer.C:
if err := runOnce(ctx, opts, loc); err != nil {
log.Printf("[rain] run error: %v", err)
}
}
}
}()
log.Printf("[rain] scheduler started (10m, dir=%s, tiles=%d)", opts.OutputDir, len(opts.Tiles))
return nil
}
func runOnce(ctx context.Context, opts Options, loc *time.Location) error {
// target hour: current hour at 00 (floor_to_hour(now))
// e.g., 10:15 -> 10:00; if尚未发布则下载可能失败等待下一次10分钟重试
now := time.Now().In(loc)
slotLocal := now.Truncate(time.Hour)
// UTC for URL path
slotUTC := slotLocal.Add(-8 * time.Hour).In(time.UTC)
log.Printf("[rain] tick target hour: local=%s (CST), utc=%s (UTC)", slotLocal.Format("2006-01-02 15:04"), slotUTC.Format("2006-01-02 15:04"))
dateStr := slotUTC.Format("20060102")
hh := slotUTC.Format("15")
mm := "00" // hourly product uses minute 00
for _, t := range opts.Tiles {
z, y, x := t[0], t[1], t[2]
if err := downloadAndStoreTile(ctx, slotLocal, dateStr, hh, mm, z, y, x, opts); err != nil {
log.Printf("[rain] download/store z=%d y=%d x=%d failed: %v", z, y, x, err)
}
}
return nil
}
func downloadAndStoreTile(ctx context.Context, local time.Time, dateStr, hh, mm string, z, y, x int, opts Options) error {
url := fmt.Sprintf("https://image.data.cma.cn/tiles/China/CMPA_RT_China_0P01_HOR-PRE_GISJPG_Tiles/%s/%s/%s/%d/%d/%d.bin", dateStr, hh, mm, z, y, x)
// skip if exists in DB; dt source configurable (default: local slot time)
dtSource := strings.ToLower(getenvDefault("RAIN_DT_SOURCE", "local")) // local|url
var product string
var dtForKey time.Time
if ref, err := ParseCMPATileURL(url); err == nil {
product = ref.Product
if dtSource == "url" {
dtForKey = ref.DT
} else {
dtForKey = local
}
exists, err := databaseHas(ctx, product, dtForKey, z, y, x)
if err != nil {
return err
}
if exists {
log.Printf("[rain] skip: already in DB z=%d y=%d x=%d dt(%s)=%s url=%s", z, y, x, dtSource, dtForKey.Format("2006-01-02 15:04"), url)
return nil
}
}
fname := fmt.Sprintf("rain_z%d_y%d_x%d_%s.bin", z, y, x, local.Format("20060102_1504"))
dest := filepath.Join(opts.OutputDir, fname)
if _, err := os.Stat(dest); err == nil {
log.Printf("[rain] skip: file exists %s", dest)
return nil
}
if err := httpDownloadTo(ctx, url, dest); err != nil {
return err
}
log.Printf("[rain] saved %s (url=%s)", dest, url)
if opts.StoreToDB {
b, rerr := os.ReadFile(dest)
if rerr != nil {
return fmt.Errorf("read saved tile: %w", rerr)
}
// Determine product and dt according to dtSource
if product == "" {
if ref, e := ParseCMPATileURL(url); e == nil {
product = ref.Product
if dtSource == "url" {
dtForKey = ref.DT
} else {
dtForKey = local
}
}
}
if product == "" {
return fmt.Errorf("cannot parse product from url for DB store")
}
if dtForKey.IsZero() {
if dtSource == "url" {
return fmt.Errorf("dt source=url but failed to parse dt")
}
dtForKey = local
}
if err := database.UpsertRainTile(ctx, database.GetDB(), product, dtForKey, z, y, x, 256, 256, b); err != nil {
return fmt.Errorf("store tile db: %w", err)
}
log.Printf("[rain] stored to DB: %s (dt=%s, source=%s)", fname, dtForKey.Format("2006-01-02 15:04:05"), dtSource)
}
return nil
}
func httpDownloadTo(ctx context.Context, url, dest string) error {
client := &http.Client{Timeout: 20 * time.Second}
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return fmt.Errorf("build request: %w", err)
}
req.Header.Set("Referer", "https://data.cma.cn/")
req.Header.Set("Origin", "https://data.cma.cn")
req.Header.Set("User-Agent", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36")
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("http get: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("unexpected status: %d", resp.StatusCode)
}
tmp := dest + ".part"
f, err := os.Create(tmp)
if err != nil {
return fmt.Errorf("create temp: %w", err)
}
_, copyErr := io.Copy(f, resp.Body)
closeErr := f.Close()
if copyErr != nil {
_ = os.Remove(tmp)
return fmt.Errorf("write body: %w", copyErr)
}
if closeErr != nil {
_ = os.Remove(tmp)
return fmt.Errorf("close temp: %w", closeErr)
}
if err := os.Rename(tmp, dest); err != nil {
// Cross-device fallback
if !errors.Is(err, os.ErrInvalid) {
return fmt.Errorf("rename: %w", err)
}
data, rerr := os.ReadFile(tmp)
if rerr != nil {
return fmt.Errorf("read temp: %w", rerr)
}
if werr := os.WriteFile(dest, data, 0o644); werr != nil {
return fmt.Errorf("write final: %w", werr)
}
_ = os.Remove(tmp)
}
return nil
}
// small env helpers (duplicated minimal set to avoid cross-package deps)
func getenvDefault(k, def string) string {
if v := os.Getenv(k); v != "" {
return v
}
return def
}
func getenvIntDefault(k string, def int) int {
if v := os.Getenv(k); v != "" {
if n, err := strconv.Atoi(v); err == nil {
return n
}
}
return def
}
func envEnabledDefaultTrue() bool {
v := strings.ToLower(os.Getenv("RAIN_ENABLED"))
if v == "" {
return true
}
return v == "1" || v == "true" || v == "yes"
}
func databaseHas(ctx context.Context, product string, dt time.Time, z, y, x int) (bool, error) {
return database.HasRainTile(ctx, database.GetDB(), product, dt, z, y, x)
}
func roundDownN(t time.Time, d time.Duration) time.Time { return t.Truncate(d) }