feat: 新增WRF用雷达组合反射率导出

This commit is contained in:
yarnom 2025-10-17 16:10:46 +08:00
parent d78bfd381b
commit 2ee30db410

View File

@ -0,0 +1,415 @@
package main
import (
"context"
"encoding/csv"
"encoding/json"
"errors"
"flag"
"fmt"
"log"
"math"
"net/http"
"net/url"
"os"
"path/filepath"
"strconv"
"strings"
"time"
"weatherstation/internal/database"
)
// Service that, at each hour, processes previous hour's :30 radar tile for z/y/x,
// splits a user region into 0.1° grid, averages dBZ (linear domain), fetches OpenMeteo
// hourly variables at grid centers, and writes a CSV.
type radarTileRecord struct {
DT time.Time
Z, Y, X int
Width, Height int
West, South, East, North float64
ResDeg float64
Data []byte
}
func getRadarTileAt(ctx context.Context, z, y, x int, dt time.Time) (*radarTileRecord, error) {
const q = `SELECT dt, z, y, x, width, height, west, south, east, north, res_deg, data FROM radar_tiles WHERE z=$1 AND y=$2 AND x=$3 AND dt=$4 LIMIT 1`
row := database.GetDB().QueryRowContext(ctx, q, z, y, x, dt)
var r radarTileRecord
if err := row.Scan(&r.DT, &r.Z, &r.Y, &r.X, &r.Width, &r.Height, &r.West, &r.South, &r.East, &r.North, &r.ResDeg, &r.Data); err != nil {
return nil, err
}
return &r, nil
}
// parseBounds parses "W,S,E,N"
func parseBounds(s string) (float64, float64, float64, float64, error) {
parts := strings.Split(s, ",")
if len(parts) != 4 {
return 0, 0, 0, 0, fmt.Errorf("bounds must be 'W,S,E,N'")
}
var vals [4]float64
for i := 0; i < 4; i++ {
v, err := parseFloat(strings.TrimSpace(parts[i]))
if err != nil {
return 0, 0, 0, 0, fmt.Errorf("invalid bound %q: %v", parts[i], err)
}
vals[i] = v
}
w, s1, e, n := vals[0], vals[1], vals[2], vals[3]
if !(w < e && s1 < n) {
return 0, 0, 0, 0, errors.New("invalid bounds: require W<E and S<N")
}
return w, s1, e, n, nil
}
func parseFloat(s string) (float64, error) {
// Simple parser to avoid locale issues
return strconvParseFloat(s)
}
// Wrap strconv.ParseFloat to keep imports minimal in patch header
func strconvParseFloat(s string) (float64, error) { return strconv.ParseFloat(s, 64) }
// align0p1 snaps to 0.1° grid
func align0p1(w, s, e, n float64) (float64, float64, float64, float64) {
w2 := math.Floor(w*10.0) / 10.0
s2 := math.Floor(s*10.0) / 10.0
e2 := math.Ceil(e*10.0) / 10.0
n2 := math.Ceil(n*10.0) / 10.0
return w2, s2, e2, n2
}
func lonToCol(west, res float64, lon float64) int { return int(math.Floor((lon - west) / res)) }
func latToRow(south, res float64, lat float64) int { return int(math.Floor((lat - south) / res)) }
// dbzFromRaw converts raw bigendian int16 to dBZ, applying validity checks as in API
func dbzFromRaw(v int16) (float64, bool) {
if v >= 32766 { // invalid mask
return 0, false
}
dbz := float64(v) / 10.0
if dbz < 0 { // clip negative
return 0, false
}
return dbz, true
}
// linearZ average over dBZs
func avgDbzLinear(vals []float64) float64 {
if len(vals) == 0 {
return math.NaN()
}
zsum := 0.0
for _, d := range vals {
zsum += math.Pow(10, d/10.0)
}
meanZ := zsum / float64(len(vals))
return 10.0 * math.Log10(meanZ)
}
// openmeteo client
type meteoResp struct {
Hourly struct {
Time []string `json:"time"`
Temp []float64 `json:"temperature_2m"`
RH []float64 `json:"relative_humidity_2m"`
Dew []float64 `json:"dew_point_2m"`
WS []float64 `json:"wind_speed_10m"`
WD []float64 `json:"wind_direction_10m"`
} `json:"hourly"`
}
type meteoVals struct {
Temp, RH, Dew, WS, WD *float64
}
func fetchMeteo(ctx context.Context, client *http.Client, lon, lat float64, utcHour time.Time) (*meteoVals, error) {
base := "https://api.open-meteo.com/v1/forecast"
datePart := utcHour.UTC().Format("2006-01-02")
q := url.Values{}
q.Set("latitude", fmt.Sprintf("%.4f", lat))
q.Set("longitude", fmt.Sprintf("%.4f", lon))
q.Set("hourly", "dew_point_2m,wind_speed_10m,wind_direction_10m,relative_humidity_2m,temperature_2m")
q.Set("timezone", "UTC")
q.Set("start_date", datePart)
q.Set("end_date", datePart)
q.Set("wind_speed_unit", "ms")
req, _ := http.NewRequestWithContext(ctx, http.MethodGet, base+"?"+q.Encode(), nil)
req.Header.Set("User-Agent", "WeatherStation-splitarea/1.0")
resp, err := client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("open-meteo status %d", resp.StatusCode)
}
var obj meteoResp
if err := json.NewDecoder(resp.Body).Decode(&obj); err != nil {
return nil, err
}
target := utcHour.UTC().Format("2006-01-02T15:00")
idx := -1
for i, t := range obj.Hourly.Time {
if t == target {
idx = i
break
}
}
if idx < 0 {
return &meteoVals{}, nil
}
mv := meteoVals{}
pick := func(arr []float64) *float64 {
if arr == nil || idx >= len(arr) {
return nil
}
v := arr[idx]
return &v
}
mv.Temp = pick(obj.Hourly.Temp)
mv.RH = pick(obj.Hourly.RH)
mv.Dew = pick(obj.Hourly.Dew)
mv.WS = pick(obj.Hourly.WS)
mv.WD = pick(obj.Hourly.WD)
return &mv, nil
}
// job executes the split+augment for a specific local time (CST) and fixed minute=30 of previous hour.
func job(ctx context.Context, z, y, x int, bounds string, tzOffset int, outDir string, runAt time.Time) error {
loc, _ := time.LoadLocation("Asia/Shanghai")
if loc == nil {
loc = time.FixedZone("CST", 8*3600)
}
runAt = runAt.In(loc)
// Previous hour's :30
targetLocal := runAt.Truncate(time.Hour).Add(-1 * time.Hour).Add(30 * time.Minute)
// Fetch tile
rec, err := getRadarTileAt(ctx, z, y, x, targetLocal)
if err != nil {
return fmt.Errorf("load radar tile z=%d y=%d x=%d at %s: %w", z, y, x, targetLocal.Format("2006-01-02 15:04:05"), err)
}
// Bounds
Bw, Bs, Be, Bn, err := parseBounds(bounds)
if err != nil {
return err
}
// Clamp to tile
if !(rec.West <= Bw && Be <= rec.East && rec.South <= Bs && Bn <= rec.North) {
return fmt.Errorf("bounds not inside tile: tile=(%.4f,%.4f,%.4f,%.4f) B=(%.4f,%.4f,%.4f,%.4f)", rec.West, rec.South, rec.East, rec.North, Bw, Bs, Be, Bn)
}
Gw, Gs, Ge, Gn := align0p1(Bw, Bs, Be, Bn)
// clamp
Gw = math.Max(Gw, rec.West)
Gs = math.Max(Gs, rec.South)
Ge = math.Min(Ge, rec.East)
Gn = math.Min(Gn, rec.North)
if Ge <= Gw || Gn <= Gs {
return fmt.Errorf("aligned bounds empty")
}
// Grid iterate with 0.1°
d := 0.1
ncols := int(math.Round((Ge - Gw) / d))
nrows := int(math.Round((Gn - Gs) / d))
if ncols <= 0 || nrows <= 0 {
return fmt.Errorf("grid size zero")
}
// Decode int16 bigendian as we go; avoid full decode into []int16 to save mem
w, h := rec.Width, rec.Height
if w <= 0 || h <= 0 || len(rec.Data) < w*h*2 {
return fmt.Errorf("invalid tile data")
}
// Prepare output dir: export_data/split_area/YYYYMMDD/HH/30
ymd := targetLocal.Format("20060102")
hh := targetLocal.Format("15")
mm := targetLocal.Format("04")
dir := filepath.Join(outDir, ymd, hh, mm)
if err := os.MkdirAll(dir, 0o755); err != nil {
return err
}
base := fmt.Sprintf("%d-%d-%d", z, y, x)
outPath := filepath.Join(dir, base+"_radar.csv")
// Prepare OpenMeteo time window
// Target UTC hour = floor(local to hour) - tzOffset
floored := targetLocal.Truncate(time.Hour)
utcHour := floored.Add(-time.Duration(tzOffset) * time.Hour)
client := &http.Client{Timeout: 15 * time.Second}
cache := map[string]*meteoVals{}
keyOf := func(lon, lat float64) string { return fmt.Sprintf("%.4f,%.4f", lon, lat) }
// CSV output
f, err := os.Create(outPath)
if err != nil {
return err
}
defer f.Close()
wcsv := csv.NewWriter(f)
defer wcsv.Flush()
// Header
_ = wcsv.Write([]string{"longitude", "latitude", "reflectivity_dbz", "temperature_2m", "relative_humidity_2m", "dew_point_2m", "wind_speed_10m", "wind_direction_10m"})
// Helper to read raw at (row,col)
readRaw := func(rr, cc int) int16 {
off := (rr*w + cc) * 2
return int16(uint16(rec.Data[off])<<8 | uint16(rec.Data[off+1]))
}
// Iterate grid cells
for ri := 0; ri < nrows; ri++ {
cellS := Gs + float64(ri)*d
cellN := cellS + d
row0 := maxInt(0, latToRow(rec.South, rec.ResDeg, cellS))
row1 := minInt(h, int(math.Ceil((cellN-rec.South)/rec.ResDeg)))
for ci := 0; ci < ncols; ci++ {
cellW := Gw + float64(ci)*d
cellE := cellW + d
col0 := maxInt(0, lonToCol(rec.West, rec.ResDeg, cellW))
col1 := minInt(w, int(math.Ceil((cellE-rec.West)/rec.ResDeg)))
// accumulate
dbzs := make([]float64, 0, (row1-row0)*(col1-col0))
for rr := row0; rr < row1; rr++ {
for cc := col0; cc < col1; cc++ {
draw := readRaw(rr, cc)
if d, ok := dbzFromRaw(draw); ok {
dbzs = append(dbzs, d)
}
}
}
var cellDBZStr string
if len(dbzs) > 0 {
cellDBZ := avgDbzLinear(dbzs)
cellDBZStr = fmt.Sprintf("%.1f", cellDBZ)
} else {
cellDBZStr = ""
}
lon := (cellW + cellE) / 2.0
lat := (cellS + cellN) / 2.0
// Fetch meteo (cache by rounded lon,lat)
k := keyOf(lon, lat)
mv := cache[k]
if mv == nil {
mv, _ = fetchMeteo(ctx, client, lon, lat, utcHour)
cache[k] = mv
}
// write row
wcsv.Write([]string{
fmt.Sprintf("%.4f", lon), fmt.Sprintf("%.4f", lat), cellDBZStr,
fmtOptF(mv, func(m *meteoVals) *float64 {
if m == nil {
return nil
}
return m.Temp
}),
fmtOptF(mv, func(m *meteoVals) *float64 {
if m == nil {
return nil
}
return m.RH
}),
fmtOptF(mv, func(m *meteoVals) *float64 {
if m == nil {
return nil
}
return m.Dew
}),
fmtOptF(mv, func(m *meteoVals) *float64 {
if m == nil {
return nil
}
return m.WS
}),
fmtOptF(mv, func(m *meteoVals) *float64 {
if m == nil {
return nil
}
return m.WD
}),
})
}
}
wcsv.Flush()
if err := wcsv.Error(); err != nil {
return err
}
log.Printf("[splitarea] saved %s", outPath)
return nil
}
func fmtOptF(mv *meteoVals, pick func(*meteoVals) *float64) string {
if mv == nil {
return ""
}
p := pick(mv)
if p == nil {
return ""
}
return fmt.Sprintf("%.2f", *p)
}
func maxInt(a, b int) int {
if a > b {
return a
}
return b
}
func minInt(a, b int) int {
if a < b {
return a
}
return b
}
func getenvDefault(key, def string) string {
if v := os.Getenv(key); v != "" {
return v
}
return def
}
func main() {
var (
z = flag.Int("z", 7, "tile z")
y = flag.Int("y", 40, "tile y")
x = flag.Int("x", 102, "tile x")
b = flag.String("b", getenvDefault("SPLITAREA_B", "108.15,22.83,109.27,23.61"), "region bounds W,S,E,N")
outDir = flag.String("out", "export_data/split_area", "output base directory")
tzOffset = flag.Int("tz-offset", 8, "timezone offset hours to UTC for local time")
once = flag.Bool("once", false, "run once for previous hour and exit")
)
flag.Parse()
// Bounds now have a sensible default; still validate format later in job()
// Ensure DB initialized
_ = database.GetDB()
ctx := context.Background()
if *once {
if err := job(ctx, *z, *y, *x, *b, *tzOffset, *outDir, time.Now()); err != nil {
log.Fatalf("run once: %v", err)
}
return
}
// Hourly scheduler: run at each hour boundary for previous hour :30
loc, _ := time.LoadLocation("Asia/Shanghai")
if loc == nil {
loc = time.FixedZone("CST", 8*3600)
}
for {
now := time.Now().In(loc)
runAt := now.Truncate(time.Hour).Add(time.Hour) // next hour
time.Sleep(time.Until(runAt))
// execute
if err := job(ctx, *z, *y, *x, *b, *tzOffset, *outDir, runAt); err != nil {
log.Printf("[splitarea] job error: %v", err)
}
}
}