package rain import ( "context" "errors" "fmt" "io" "log" "net/http" "os" "path/filepath" "strconv" "strings" "time" "weatherstation/internal/database" ) // Options controls the rain (CMPA hourly precip) scheduler behavior. type Options struct { Enable bool OutputDir string BaseURL string // template with %Y%m%d/%H/%M and {z}/{y}/{x}; time is UTC MaxRetries int StoreToDB bool Tiles [][3]int // list of (z,y,x); defaults to [[7,40,102],[7,40,104]] } // Start starts the CMPA hourly rain tile downloader. // Runs every 10 minutes aligned to 10-minute boundaries. For each tick at local time T, // constructs the slot as floor_to_hour(T) - 1h (last completed hour), converts to UTC // (slot_utc = slot_local - 8h), and downloads 0-minute tile for each configured (z,y,x). func Start(ctx context.Context, opts Options) error { if !opts.Enable && !envEnabledDefaultTrue() { log.Println("[rain] scheduler disabled") return nil } if opts.OutputDir == "" { opts.OutputDir = getenvDefault("RAIN_DIR", "rain_data") } if opts.MaxRetries == 0 { opts.MaxRetries = getenvIntDefault("RAIN_MAX_RETRIES", 2) } if opts.BaseURL == "" { opts.BaseURL = getenvDefault("RAIN_BASE_URL", "https://image.data.cma.cn/tiles/China/CMPA_RT_China_0P01_HOR-PRE_GISJPG_Tiles/%Y%m%d/%H/%M/{z}/{y}/{x}.bin") } if len(opts.Tiles) == 0 { opts.Tiles = [][3]int{{7, 40, 102}, {7, 40, 104}} } if err := os.MkdirAll(opts.OutputDir, 0o755); err != nil { return fmt.Errorf("create rain output dir: %w", err) } loc, _ := time.LoadLocation("Asia/Shanghai") if loc == nil { loc = time.FixedZone("CST", 8*3600) } // immediate first run go func() { if err := runOnce(ctx, opts, loc); err != nil { log.Printf("[rain] first run error: %v", err) } }() // every 10 minutes go func() { for { if ctx.Err() != nil { return } now := time.Now().In(loc) runAt := roundDownN(now, 10*time.Minute).Add(10 * time.Minute) sleep := time.Until(runAt) if sleep < 0 { sleep = 0 } timer := time.NewTimer(sleep) select { case <-ctx.Done(): timer.Stop() return case <-timer.C: if err := runOnce(ctx, opts, loc); err != nil { log.Printf("[rain] run error: %v", err) } } } }() log.Printf("[rain] scheduler started (10m, dir=%s, tiles=%d)", opts.OutputDir, len(opts.Tiles)) return nil } func runOnce(ctx context.Context, opts Options, loc *time.Location) error { // target hour: current hour at 00 (floor_to_hour(now)) // e.g., 10:15 -> 10:00; if尚未发布则下载可能失败,等待下一次10分钟重试 now := time.Now().In(loc) slotLocal := now.Truncate(time.Hour) // UTC for URL path slotUTC := slotLocal.Add(-8 * time.Hour).In(time.UTC) log.Printf("[rain] tick target hour: local=%s (CST), utc=%s (UTC)", slotLocal.Format("2006-01-02 15:04"), slotUTC.Format("2006-01-02 15:04")) dateStr := slotUTC.Format("20060102") hh := slotUTC.Format("15") mm := "00" // hourly product uses minute 00 for _, t := range opts.Tiles { z, y, x := t[0], t[1], t[2] if err := downloadAndStoreTile(ctx, slotLocal, dateStr, hh, mm, z, y, x, opts); err != nil { log.Printf("[rain] download/store z=%d y=%d x=%d failed: %v", z, y, x, err) } } return nil } func downloadAndStoreTile(ctx context.Context, local time.Time, dateStr, hh, mm string, z, y, x int, opts Options) error { url := fmt.Sprintf("https://image.data.cma.cn/tiles/China/CMPA_RT_China_0P01_HOR-PRE_GISJPG_Tiles/%s/%s/%s/%d/%d/%d.bin", dateStr, hh, mm, z, y, x) // skip if exists in DB if ref, err := ParseCMPATileURL(url); err == nil { exists, err := databaseHas(ctx, ref.Product, ref.DT, z, y, x) if err != nil { return err } if exists { log.Printf("[rain] skip: already in DB z=%d y=%d x=%d dt(local)=%s url=%s", z, y, x, ref.DT.Format("2006-01-02 15:04"), url) return nil } } fname := fmt.Sprintf("rain_z%d_y%d_x%d_%s.bin", z, y, x, local.Format("20060102_1504")) dest := filepath.Join(opts.OutputDir, fname) if _, err := os.Stat(dest); err == nil { log.Printf("[rain] skip: file exists %s", dest) return nil } if err := httpDownloadTo(ctx, url, dest); err != nil { return err } log.Printf("[rain] saved %s (url=%s)", dest, url) if opts.StoreToDB { b, rerr := os.ReadFile(dest) if rerr != nil { return fmt.Errorf("read saved tile: %w", rerr) } if err := StoreTileBytes(ctx, url, b); err != nil { return fmt.Errorf("store tile db: %w", err) } log.Printf("[rain] stored to DB: %s", fname) } return nil } func httpDownloadTo(ctx context.Context, url, dest string) error { client := &http.Client{Timeout: 20 * time.Second} req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) if err != nil { return fmt.Errorf("build request: %w", err) } req.Header.Set("Referer", "https://data.cma.cn/") req.Header.Set("Origin", "https://data.cma.cn") req.Header.Set("User-Agent", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36") resp, err := client.Do(req) if err != nil { return fmt.Errorf("http get: %w", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { return fmt.Errorf("unexpected status: %d", resp.StatusCode) } tmp := dest + ".part" f, err := os.Create(tmp) if err != nil { return fmt.Errorf("create temp: %w", err) } _, copyErr := io.Copy(f, resp.Body) closeErr := f.Close() if copyErr != nil { _ = os.Remove(tmp) return fmt.Errorf("write body: %w", copyErr) } if closeErr != nil { _ = os.Remove(tmp) return fmt.Errorf("close temp: %w", closeErr) } if err := os.Rename(tmp, dest); err != nil { // Cross-device fallback if !errors.Is(err, os.ErrInvalid) { return fmt.Errorf("rename: %w", err) } data, rerr := os.ReadFile(tmp) if rerr != nil { return fmt.Errorf("read temp: %w", rerr) } if werr := os.WriteFile(dest, data, 0o644); werr != nil { return fmt.Errorf("write final: %w", werr) } _ = os.Remove(tmp) } return nil } // small env helpers (duplicated minimal set to avoid cross-package deps) func getenvDefault(k, def string) string { if v := os.Getenv(k); v != "" { return v } return def } func getenvIntDefault(k string, def int) int { if v := os.Getenv(k); v != "" { if n, err := strconv.Atoi(v); err == nil { return n } } return def } func envEnabledDefaultTrue() bool { v := strings.ToLower(os.Getenv("RAIN_ENABLED")) if v == "" { return true } return v == "1" || v == "true" || v == "yes" } func databaseHas(ctx context.Context, product string, dt time.Time, z, y, x int) (bool, error) { return database.HasRainTile(ctx, database.GetDB(), product, dt, z, y, x) } func roundDownN(t time.Time, d time.Duration) time.Time { return t.Truncate(d) }