419 lines
11 KiB
Go
419 lines
11 KiB
Go
package main
|
||
|
||
import (
|
||
"context"
|
||
"encoding/csv"
|
||
"encoding/json"
|
||
"errors"
|
||
"flag"
|
||
"fmt"
|
||
"log"
|
||
"math"
|
||
"net/http"
|
||
"net/url"
|
||
"os"
|
||
"path/filepath"
|
||
"strconv"
|
||
"strings"
|
||
"time"
|
||
|
||
"weatherstation/internal/database"
|
||
)
|
||
|
||
// Service that, at hh:45 each hour, processes current hour's :30 radar tile for z/y/x,
|
||
// splits a user region into 0.1° grid, averages dBZ (linear domain), fetches Open‑Meteo
|
||
// hourly variables at grid centers, and writes a CSV.
|
||
|
||
type radarTileRecord struct {
|
||
DT time.Time
|
||
Z, Y, X int
|
||
Width, Height int
|
||
West, South, East, North float64
|
||
ResDeg float64
|
||
Data []byte
|
||
}
|
||
|
||
func getRadarTileAt(ctx context.Context, z, y, x int, dt time.Time) (*radarTileRecord, error) {
|
||
const q = `SELECT dt, z, y, x, width, height, west, south, east, north, res_deg, data FROM radar_tiles WHERE z=$1 AND y=$2 AND x=$3 AND dt=$4 LIMIT 1`
|
||
row := database.GetDB().QueryRowContext(ctx, q, z, y, x, dt)
|
||
var r radarTileRecord
|
||
if err := row.Scan(&r.DT, &r.Z, &r.Y, &r.X, &r.Width, &r.Height, &r.West, &r.South, &r.East, &r.North, &r.ResDeg, &r.Data); err != nil {
|
||
return nil, err
|
||
}
|
||
return &r, nil
|
||
}
|
||
|
||
// parseBounds parses "W,S,E,N"
|
||
func parseBounds(s string) (float64, float64, float64, float64, error) {
|
||
parts := strings.Split(s, ",")
|
||
if len(parts) != 4 {
|
||
return 0, 0, 0, 0, fmt.Errorf("bounds must be 'W,S,E,N'")
|
||
}
|
||
var vals [4]float64
|
||
for i := 0; i < 4; i++ {
|
||
v, err := parseFloat(strings.TrimSpace(parts[i]))
|
||
if err != nil {
|
||
return 0, 0, 0, 0, fmt.Errorf("invalid bound %q: %v", parts[i], err)
|
||
}
|
||
vals[i] = v
|
||
}
|
||
w, s1, e, n := vals[0], vals[1], vals[2], vals[3]
|
||
if !(w < e && s1 < n) {
|
||
return 0, 0, 0, 0, errors.New("invalid bounds: require W<E and S<N")
|
||
}
|
||
return w, s1, e, n, nil
|
||
}
|
||
|
||
func parseFloat(s string) (float64, error) {
|
||
// Simple parser to avoid locale issues
|
||
return strconvParseFloat(s)
|
||
}
|
||
|
||
// Wrap strconv.ParseFloat to keep imports minimal in patch header
|
||
func strconvParseFloat(s string) (float64, error) { return strconv.ParseFloat(s, 64) }
|
||
|
||
// align0p1 snaps to 0.1° grid
|
||
func align0p1(w, s, e, n float64) (float64, float64, float64, float64) {
|
||
w2 := math.Floor(w*10.0) / 10.0
|
||
s2 := math.Floor(s*10.0) / 10.0
|
||
e2 := math.Ceil(e*10.0) / 10.0
|
||
n2 := math.Ceil(n*10.0) / 10.0
|
||
return w2, s2, e2, n2
|
||
}
|
||
|
||
func lonToCol(west, res float64, lon float64) int { return int(math.Floor((lon - west) / res)) }
|
||
func latToRow(south, res float64, lat float64) int { return int(math.Floor((lat - south) / res)) }
|
||
|
||
// dbzFromRaw converts raw big‑endian int16 to dBZ, applying validity checks as in API
|
||
func dbzFromRaw(v int16) (float64, bool) {
|
||
if v >= 32766 { // invalid mask
|
||
return 0, false
|
||
}
|
||
dbz := float64(v) / 10.0
|
||
if dbz < 0 { // clip negative
|
||
return 0, false
|
||
}
|
||
return dbz, true
|
||
}
|
||
|
||
// linearZ average over dBZs
|
||
func avgDbzLinear(vals []float64) float64 {
|
||
if len(vals) == 0 {
|
||
return math.NaN()
|
||
}
|
||
zsum := 0.0
|
||
for _, d := range vals {
|
||
zsum += math.Pow(10, d/10.0)
|
||
}
|
||
meanZ := zsum / float64(len(vals))
|
||
return 10.0 * math.Log10(meanZ)
|
||
}
|
||
|
||
// open‑meteo client
|
||
type meteoResp struct {
|
||
Hourly struct {
|
||
Time []string `json:"time"`
|
||
Temp []float64 `json:"temperature_2m"`
|
||
RH []float64 `json:"relative_humidity_2m"`
|
||
Dew []float64 `json:"dew_point_2m"`
|
||
WS []float64 `json:"wind_speed_10m"`
|
||
WD []float64 `json:"wind_direction_10m"`
|
||
} `json:"hourly"`
|
||
}
|
||
|
||
type meteoVals struct {
|
||
Temp, RH, Dew, WS, WD *float64
|
||
}
|
||
|
||
func fetchMeteo(ctx context.Context, client *http.Client, lon, lat float64, utcHour time.Time) (*meteoVals, error) {
|
||
base := "https://api.open-meteo.com/v1/forecast"
|
||
datePart := utcHour.UTC().Format("2006-01-02")
|
||
q := url.Values{}
|
||
q.Set("latitude", fmt.Sprintf("%.4f", lat))
|
||
q.Set("longitude", fmt.Sprintf("%.4f", lon))
|
||
q.Set("hourly", "dew_point_2m,wind_speed_10m,wind_direction_10m,relative_humidity_2m,temperature_2m")
|
||
q.Set("timezone", "UTC")
|
||
q.Set("start_date", datePart)
|
||
q.Set("end_date", datePart)
|
||
q.Set("wind_speed_unit", "ms")
|
||
req, _ := http.NewRequestWithContext(ctx, http.MethodGet, base+"?"+q.Encode(), nil)
|
||
req.Header.Set("User-Agent", "WeatherStation-splitarea/1.0")
|
||
resp, err := client.Do(req)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
defer resp.Body.Close()
|
||
if resp.StatusCode != http.StatusOK {
|
||
return nil, fmt.Errorf("open-meteo status %d", resp.StatusCode)
|
||
}
|
||
var obj meteoResp
|
||
if err := json.NewDecoder(resp.Body).Decode(&obj); err != nil {
|
||
return nil, err
|
||
}
|
||
target := utcHour.UTC().Format("2006-01-02T15:00")
|
||
idx := -1
|
||
for i, t := range obj.Hourly.Time {
|
||
if t == target {
|
||
idx = i
|
||
break
|
||
}
|
||
}
|
||
if idx < 0 {
|
||
return &meteoVals{}, nil
|
||
}
|
||
mv := meteoVals{}
|
||
pick := func(arr []float64) *float64 {
|
||
if arr == nil || idx >= len(arr) {
|
||
return nil
|
||
}
|
||
v := arr[idx]
|
||
return &v
|
||
}
|
||
mv.Temp = pick(obj.Hourly.Temp)
|
||
mv.RH = pick(obj.Hourly.RH)
|
||
mv.Dew = pick(obj.Hourly.Dew)
|
||
mv.WS = pick(obj.Hourly.WS)
|
||
mv.WD = pick(obj.Hourly.WD)
|
||
return &mv, nil
|
||
}
|
||
|
||
// job executes the split+augment for a specific local time (CST) and fixed minute=30 of the same hour.
|
||
func job(ctx context.Context, z, y, x int, bounds string, tzOffset int, outDir string, runAt time.Time) error {
|
||
loc, _ := time.LoadLocation("Asia/Shanghai")
|
||
if loc == nil {
|
||
loc = time.FixedZone("CST", 8*3600)
|
||
}
|
||
runAt = runAt.In(loc)
|
||
|
||
// Current hour's :30
|
||
targetLocal := runAt.Truncate(time.Hour).Add(24 * time.Minute)
|
||
|
||
// Fetch tile
|
||
rec, err := getRadarTileAt(ctx, z, y, x, targetLocal)
|
||
if err != nil {
|
||
return fmt.Errorf("load radar tile z=%d y=%d x=%d at %s: %w", z, y, x, targetLocal.Format("2006-01-02 15:04:05"), err)
|
||
}
|
||
|
||
// Bounds
|
||
Bw, Bs, Be, Bn, err := parseBounds(bounds)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
// Clamp to tile
|
||
if !(rec.West <= Bw && Be <= rec.East && rec.South <= Bs && Bn <= rec.North) {
|
||
return fmt.Errorf("bounds not inside tile: tile=(%.4f,%.4f,%.4f,%.4f) B=(%.4f,%.4f,%.4f,%.4f)", rec.West, rec.South, rec.East, rec.North, Bw, Bs, Be, Bn)
|
||
}
|
||
Gw, Gs, Ge, Gn := align0p1(Bw, Bs, Be, Bn)
|
||
// clamp
|
||
Gw = math.Max(Gw, rec.West)
|
||
Gs = math.Max(Gs, rec.South)
|
||
Ge = math.Min(Ge, rec.East)
|
||
Gn = math.Min(Gn, rec.North)
|
||
if Ge <= Gw || Gn <= Gs {
|
||
return fmt.Errorf("aligned bounds empty")
|
||
}
|
||
|
||
// Grid iterate with 0.1°
|
||
d := 0.1
|
||
ncols := int(math.Round((Ge - Gw) / d))
|
||
nrows := int(math.Round((Gn - Gs) / d))
|
||
if ncols <= 0 || nrows <= 0 {
|
||
return fmt.Errorf("grid size zero")
|
||
}
|
||
|
||
// Decode int16 big‑endian as we go; avoid full decode into []int16 to save mem
|
||
w, h := rec.Width, rec.Height
|
||
if w <= 0 || h <= 0 || len(rec.Data) < w*h*2 {
|
||
return fmt.Errorf("invalid tile data")
|
||
}
|
||
|
||
// Prepare output dir: export_data/split_area/YYYYMMDD/HH/30 (keep legacy minute path)
|
||
ymd := targetLocal.Format("20060102")
|
||
hh := targetLocal.Format("15")
|
||
dir := filepath.Join(outDir, ymd, hh, "30")
|
||
if err := os.MkdirAll(dir, 0o755); err != nil {
|
||
return err
|
||
}
|
||
base := fmt.Sprintf("%d-%d-%d", z, y, x)
|
||
outPath := filepath.Join(dir, base+"_radar.csv")
|
||
|
||
// Prepare Open‑Meteo time window
|
||
// Target UTC hour = floor(local to hour) - tzOffset
|
||
floored := targetLocal.Truncate(time.Hour)
|
||
utcHour := floored.Add(-time.Duration(tzOffset) * time.Hour)
|
||
client := &http.Client{Timeout: 15 * time.Second}
|
||
cache := map[string]*meteoVals{}
|
||
keyOf := func(lon, lat float64) string { return fmt.Sprintf("%.4f,%.4f", lon, lat) }
|
||
|
||
// CSV output
|
||
f, err := os.Create(outPath)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
defer f.Close()
|
||
wcsv := csv.NewWriter(f)
|
||
defer wcsv.Flush()
|
||
// Header
|
||
_ = wcsv.Write([]string{"longitude", "latitude", "reflectivity_dbz", "temperature_2m", "relative_humidity_2m", "dew_point_2m", "wind_speed_10m", "wind_direction_10m"})
|
||
|
||
// Helper to read raw at (row,col)
|
||
readRaw := func(rr, cc int) int16 {
|
||
off := (rr*w + cc) * 2
|
||
return int16(uint16(rec.Data[off])<<8 | uint16(rec.Data[off+1]))
|
||
}
|
||
|
||
// Iterate grid cells
|
||
for ri := 0; ri < nrows; ri++ {
|
||
cellS := Gs + float64(ri)*d
|
||
cellN := cellS + d
|
||
row0 := maxInt(0, latToRow(rec.South, rec.ResDeg, cellS))
|
||
row1 := minInt(h, int(math.Ceil((cellN-rec.South)/rec.ResDeg)))
|
||
for ci := 0; ci < ncols; ci++ {
|
||
cellW := Gw + float64(ci)*d
|
||
cellE := cellW + d
|
||
col0 := maxInt(0, lonToCol(rec.West, rec.ResDeg, cellW))
|
||
col1 := minInt(w, int(math.Ceil((cellE-rec.West)/rec.ResDeg)))
|
||
// accumulate
|
||
dbzs := make([]float64, 0, (row1-row0)*(col1-col0))
|
||
for rr := row0; rr < row1; rr++ {
|
||
for cc := col0; cc < col1; cc++ {
|
||
draw := readRaw(rr, cc)
|
||
if d, ok := dbzFromRaw(draw); ok {
|
||
dbzs = append(dbzs, d)
|
||
}
|
||
}
|
||
}
|
||
var cellDBZStr string
|
||
if len(dbzs) > 0 {
|
||
cellDBZ := avgDbzLinear(dbzs)
|
||
cellDBZStr = fmt.Sprintf("%.1f", cellDBZ)
|
||
} else {
|
||
cellDBZStr = ""
|
||
}
|
||
lon := (cellW + cellE) / 2.0
|
||
lat := (cellS + cellN) / 2.0
|
||
|
||
// Fetch meteo (cache by rounded lon,lat)
|
||
k := keyOf(lon, lat)
|
||
mv := cache[k]
|
||
if mv == nil {
|
||
mv, _ = fetchMeteo(ctx, client, lon, lat, utcHour)
|
||
cache[k] = mv
|
||
}
|
||
// write row
|
||
wcsv.Write([]string{
|
||
fmt.Sprintf("%.4f", lon), fmt.Sprintf("%.4f", lat), cellDBZStr,
|
||
fmtOptF(mv, func(m *meteoVals) *float64 {
|
||
if m == nil {
|
||
return nil
|
||
}
|
||
return m.Temp
|
||
}),
|
||
fmtOptF(mv, func(m *meteoVals) *float64 {
|
||
if m == nil {
|
||
return nil
|
||
}
|
||
return m.RH
|
||
}),
|
||
fmtOptF(mv, func(m *meteoVals) *float64 {
|
||
if m == nil {
|
||
return nil
|
||
}
|
||
return m.Dew
|
||
}),
|
||
fmtOptF(mv, func(m *meteoVals) *float64 {
|
||
if m == nil {
|
||
return nil
|
||
}
|
||
return m.WS
|
||
}),
|
||
fmtOptF(mv, func(m *meteoVals) *float64 {
|
||
if m == nil {
|
||
return nil
|
||
}
|
||
return m.WD
|
||
}),
|
||
})
|
||
}
|
||
}
|
||
wcsv.Flush()
|
||
if err := wcsv.Error(); err != nil {
|
||
return err
|
||
}
|
||
log.Printf("[splitarea] saved %s", outPath)
|
||
return nil
|
||
}
|
||
|
||
func fmtOptF(mv *meteoVals, pick func(*meteoVals) *float64) string {
|
||
if mv == nil {
|
||
return ""
|
||
}
|
||
p := pick(mv)
|
||
if p == nil {
|
||
return ""
|
||
}
|
||
return fmt.Sprintf("%.2f", *p)
|
||
}
|
||
|
||
func maxInt(a, b int) int {
|
||
if a > b {
|
||
return a
|
||
}
|
||
return b
|
||
}
|
||
func minInt(a, b int) int {
|
||
if a < b {
|
||
return a
|
||
}
|
||
return b
|
||
}
|
||
|
||
func getenvDefault(key, def string) string {
|
||
if v := os.Getenv(key); v != "" {
|
||
return v
|
||
}
|
||
return def
|
||
}
|
||
|
||
func main() {
|
||
var (
|
||
z = flag.Int("z", 7, "tile z")
|
||
y = flag.Int("y", 40, "tile y")
|
||
x = flag.Int("x", 102, "tile x")
|
||
b = flag.String("b", getenvDefault("SPLITAREA_B", "108.15,22.83,109.27,23.61"), "region bounds W,S,E,N")
|
||
outDir = flag.String("out", "export_data/split_area", "output base directory")
|
||
tzOffset = flag.Int("tz-offset", 8, "timezone offset hours to UTC for local time")
|
||
once = flag.Bool("once", false, "run once for previous hour and exit")
|
||
)
|
||
flag.Parse()
|
||
// Bounds now have a sensible default; still validate format later in job()
|
||
// Ensure DB initialized
|
||
_ = database.GetDB()
|
||
|
||
ctx := context.Background()
|
||
if *once {
|
||
if err := job(ctx, *z, *y, *x, *b, *tzOffset, *outDir, time.Now()); err != nil {
|
||
log.Fatalf("run once: %v", err)
|
||
}
|
||
return
|
||
}
|
||
|
||
// Scheduler: run hourly at hh:45 for current hour's :30 radar tile
|
||
loc, _ := time.LoadLocation("Asia/Shanghai")
|
||
if loc == nil {
|
||
loc = time.FixedZone("CST", 8*3600)
|
||
}
|
||
for {
|
||
now := time.Now().In(loc)
|
||
runAt := now.Truncate(time.Hour).Add(45 * time.Minute)
|
||
if now.After(runAt) {
|
||
runAt = runAt.Add(time.Hour)
|
||
}
|
||
time.Sleep(time.Until(runAt))
|
||
// execute
|
||
if err := job(ctx, *z, *y, *x, *b, *tzOffset, *outDir, runAt); err != nil {
|
||
log.Printf("[splitarea] job error: %v", err)
|
||
}
|
||
}
|
||
}
|