438 lines
12 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package main
import (
"context"
"database/sql"
"encoding/csv"
"encoding/json"
"errors"
"flag"
"fmt"
"log"
"math"
"net/http"
"net/url"
"os"
"path/filepath"
"strconv"
"strings"
"time"
"weatherstation/internal/database"
)
// Service that, at hh:45 each hour, processes current hour's :30 radar tile for z/y/x,
// splits a user region into 0.1° grid, averages dBZ (linear domain), fetches OpenMeteo
// hourly variables at grid centers, and writes a CSV.
type radarTileRecord struct {
DT time.Time
Z, Y, X int
Width, Height int
West, South, East, North float64
ResDeg float64
Data []byte
}
func getRadarTileAt(ctx context.Context, z, y, x int, dt time.Time) (*radarTileRecord, error) {
const q = `SELECT dt, z, y, x, width, height, west, south, east, north, res_deg, data FROM radar_tiles WHERE z=$1 AND y=$2 AND x=$3 AND dt=$4 LIMIT 1`
row := database.GetDB().QueryRowContext(ctx, q, z, y, x, dt)
var r radarTileRecord
if err := row.Scan(&r.DT, &r.Z, &r.Y, &r.X, &r.Width, &r.Height, &r.West, &r.South, &r.East, &r.North, &r.ResDeg, &r.Data); err != nil {
return nil, err
}
return &r, nil
}
// parseBounds parses "W,S,E,N"
func parseBounds(s string) (float64, float64, float64, float64, error) {
parts := strings.Split(s, ",")
if len(parts) != 4 {
return 0, 0, 0, 0, fmt.Errorf("bounds must be 'W,S,E,N'")
}
var vals [4]float64
for i := 0; i < 4; i++ {
v, err := parseFloat(strings.TrimSpace(parts[i]))
if err != nil {
return 0, 0, 0, 0, fmt.Errorf("invalid bound %q: %v", parts[i], err)
}
vals[i] = v
}
w, s1, e, n := vals[0], vals[1], vals[2], vals[3]
if !(w < e && s1 < n) {
return 0, 0, 0, 0, errors.New("invalid bounds: require W<E and S<N")
}
return w, s1, e, n, nil
}
func parseFloat(s string) (float64, error) {
// Simple parser to avoid locale issues
return strconvParseFloat(s)
}
// Wrap strconv.ParseFloat to keep imports minimal in patch header
func strconvParseFloat(s string) (float64, error) { return strconv.ParseFloat(s, 64) }
// align0p1 snaps to 0.1° grid
func align0p1(w, s, e, n float64) (float64, float64, float64, float64) {
w2 := math.Floor(w*10.0) / 10.0
s2 := math.Floor(s*10.0) / 10.0
e2 := math.Ceil(e*10.0) / 10.0
n2 := math.Ceil(n*10.0) / 10.0
return w2, s2, e2, n2
}
func lonToCol(west, res float64, lon float64) int { return int(math.Floor((lon - west) / res)) }
func latToRow(south, res float64, lat float64) int { return int(math.Floor((lat - south) / res)) }
// dbzFromRaw converts raw bigendian int16 to dBZ, applying validity checks as in API
func dbzFromRaw(v int16) (float64, bool) {
if v >= 32766 { // invalid mask
return 0, false
}
dbz := float64(v) / 10.0
if dbz < 0 { // clip negative
return 0, false
}
return dbz, true
}
// linearZ average over dBZs
func avgDbzLinear(vals []float64) float64 {
if len(vals) == 0 {
return math.NaN()
}
zsum := 0.0
for _, d := range vals {
zsum += math.Pow(10, d/10.0)
}
meanZ := zsum / float64(len(vals))
return 10.0 * math.Log10(meanZ)
}
// openmeteo client
type meteoResp struct {
Hourly struct {
Time []string `json:"time"`
Temp []float64 `json:"temperature_2m"`
RH []float64 `json:"relative_humidity_2m"`
Dew []float64 `json:"dew_point_2m"`
WS []float64 `json:"wind_speed_10m"`
WD []float64 `json:"wind_direction_10m"`
} `json:"hourly"`
}
type meteoVals struct {
Temp, RH, Dew, WS, WD *float64
}
func fetchMeteo(ctx context.Context, client *http.Client, lon, lat float64, utcHour time.Time) (*meteoVals, error) {
base := "https://api.open-meteo.com/v1/forecast"
datePart := utcHour.UTC().Format("2006-01-02")
q := url.Values{}
q.Set("latitude", fmt.Sprintf("%.4f", lat))
q.Set("longitude", fmt.Sprintf("%.4f", lon))
q.Set("hourly", "dew_point_2m,wind_speed_10m,wind_direction_10m,relative_humidity_2m,temperature_2m")
q.Set("timezone", "UTC")
q.Set("start_date", datePart)
q.Set("end_date", datePart)
q.Set("wind_speed_unit", "ms")
req, _ := http.NewRequestWithContext(ctx, http.MethodGet, base+"?"+q.Encode(), nil)
req.Header.Set("User-Agent", "WeatherStation-splitarea/1.0")
resp, err := client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("open-meteo status %d", resp.StatusCode)
}
var obj meteoResp
if err := json.NewDecoder(resp.Body).Decode(&obj); err != nil {
return nil, err
}
target := utcHour.UTC().Format("2006-01-02T15:00")
idx := -1
for i, t := range obj.Hourly.Time {
if t == target {
idx = i
break
}
}
if idx < 0 {
return &meteoVals{}, nil
}
mv := meteoVals{}
pick := func(arr []float64) *float64 {
if arr == nil || idx >= len(arr) {
return nil
}
v := arr[idx]
return &v
}
mv.Temp = pick(obj.Hourly.Temp)
mv.RH = pick(obj.Hourly.RH)
mv.Dew = pick(obj.Hourly.Dew)
mv.WS = pick(obj.Hourly.WS)
mv.WD = pick(obj.Hourly.WD)
return &mv, nil
}
// job executes the split+augment for a specific local time (CST) and fixed minute=30 of the same hour.
func job(ctx context.Context, z, y, x int, bounds string, tzOffset int, outDir string, runAt time.Time) error {
loc, _ := time.LoadLocation("Asia/Shanghai")
if loc == nil {
loc = time.FixedZone("CST", 8*3600)
}
runAt = runAt.In(loc)
// Prefer current hour's :30 tile, fallback to :24 then :00 if missing.
targetBase := runAt.Truncate(time.Hour)
candidates := []int{30, 24, 0}
var (
targetLocal time.Time
rec *radarTileRecord
)
for _, minute := range candidates {
candidate := targetBase.Add(time.Duration(minute) * time.Minute)
tile, err := getRadarTileAt(ctx, z, y, x, candidate)
if err == nil {
if minute != candidates[0] {
log.Printf("[splitarea] fallback to %02d-minute radar tile at %s", minute, candidate.Format("2006-01-02 15:04:05"))
}
targetLocal = candidate
rec = tile
break
}
if !errors.Is(err, sql.ErrNoRows) {
return fmt.Errorf("load radar tile z=%d y=%d x=%d at %s: %w", z, y, x, candidate.Format("2006-01-02 15:04:05"), err)
}
}
if rec == nil {
return fmt.Errorf("no radar tile found for z=%d y=%d x=%d near %s (checked minutes %v)", z, y, x, targetBase.Format("2006-01-02 15:04:05"), candidates)
}
// Bounds
Bw, Bs, Be, Bn, err := parseBounds(bounds)
if err != nil {
return err
}
// Clamp to tile
if !(rec.West <= Bw && Be <= rec.East && rec.South <= Bs && Bn <= rec.North) {
return fmt.Errorf("bounds not inside tile: tile=(%.4f,%.4f,%.4f,%.4f) B=(%.4f,%.4f,%.4f,%.4f)", rec.West, rec.South, rec.East, rec.North, Bw, Bs, Be, Bn)
}
Gw, Gs, Ge, Gn := align0p1(Bw, Bs, Be, Bn)
// clamp
Gw = math.Max(Gw, rec.West)
Gs = math.Max(Gs, rec.South)
Ge = math.Min(Ge, rec.East)
Gn = math.Min(Gn, rec.North)
if Ge <= Gw || Gn <= Gs {
return fmt.Errorf("aligned bounds empty")
}
// Grid iterate with 0.1°
d := 0.1
ncols := int(math.Round((Ge - Gw) / d))
nrows := int(math.Round((Gn - Gs) / d))
if ncols <= 0 || nrows <= 0 {
return fmt.Errorf("grid size zero")
}
// Decode int16 bigendian as we go; avoid full decode into []int16 to save mem
w, h := rec.Width, rec.Height
if w <= 0 || h <= 0 || len(rec.Data) < w*h*2 {
return fmt.Errorf("invalid tile data")
}
// Prepare output dir: export_data/split_area/YYYYMMDD/HH/mm
ymd := targetLocal.Format("20060102")
hh := targetLocal.Format("15")
mm := targetLocal.Format("04")
dir := filepath.Join(outDir, ymd, hh, mm)
if err := os.MkdirAll(dir, 0o755); err != nil {
return err
}
base := fmt.Sprintf("%d-%d-%d", z, y, x)
outPath := filepath.Join(dir, base+"_radar.csv")
// Prepare OpenMeteo time window
// Target UTC hour = floor(local to hour) - tzOffset
floored := targetLocal.Truncate(time.Hour)
utcHour := floored.Add(-time.Duration(tzOffset) * time.Hour)
client := &http.Client{Timeout: 15 * time.Second}
cache := map[string]*meteoVals{}
keyOf := func(lon, lat float64) string { return fmt.Sprintf("%.4f,%.4f", lon, lat) }
// CSV output
f, err := os.Create(outPath)
if err != nil {
return err
}
defer f.Close()
wcsv := csv.NewWriter(f)
defer wcsv.Flush()
// Header
_ = wcsv.Write([]string{"longitude", "latitude", "reflectivity_dbz", "temperature_2m", "relative_humidity_2m", "dew_point_2m", "wind_speed_10m", "wind_direction_10m"})
// Helper to read raw at (row,col)
readRaw := func(rr, cc int) int16 {
off := (rr*w + cc) * 2
return int16(uint16(rec.Data[off])<<8 | uint16(rec.Data[off+1]))
}
// Iterate grid cells
for ri := 0; ri < nrows; ri++ {
cellS := Gs + float64(ri)*d
cellN := cellS + d
row0 := maxInt(0, latToRow(rec.South, rec.ResDeg, cellS))
row1 := minInt(h, int(math.Ceil((cellN-rec.South)/rec.ResDeg)))
for ci := 0; ci < ncols; ci++ {
cellW := Gw + float64(ci)*d
cellE := cellW + d
col0 := maxInt(0, lonToCol(rec.West, rec.ResDeg, cellW))
col1 := minInt(w, int(math.Ceil((cellE-rec.West)/rec.ResDeg)))
// accumulate
dbzs := make([]float64, 0, (row1-row0)*(col1-col0))
for rr := row0; rr < row1; rr++ {
for cc := col0; cc < col1; cc++ {
draw := readRaw(rr, cc)
if d, ok := dbzFromRaw(draw); ok {
dbzs = append(dbzs, d)
}
}
}
var cellDBZStr string
if len(dbzs) > 0 {
cellDBZ := avgDbzLinear(dbzs)
cellDBZStr = fmt.Sprintf("%.1f", cellDBZ)
} else {
cellDBZStr = ""
}
lon := (cellW + cellE) / 2.0
lat := (cellS + cellN) / 2.0
// Fetch meteo (cache by rounded lon,lat)
k := keyOf(lon, lat)
mv := cache[k]
if mv == nil {
mv, _ = fetchMeteo(ctx, client, lon, lat, utcHour)
cache[k] = mv
}
// write row
wcsv.Write([]string{
fmt.Sprintf("%.4f", lon), fmt.Sprintf("%.4f", lat), cellDBZStr,
fmtOptF(mv, func(m *meteoVals) *float64 {
if m == nil {
return nil
}
return m.Temp
}),
fmtOptF(mv, func(m *meteoVals) *float64 {
if m == nil {
return nil
}
return m.RH
}),
fmtOptF(mv, func(m *meteoVals) *float64 {
if m == nil {
return nil
}
return m.Dew
}),
fmtOptF(mv, func(m *meteoVals) *float64 {
if m == nil {
return nil
}
return m.WS
}),
fmtOptF(mv, func(m *meteoVals) *float64 {
if m == nil {
return nil
}
return m.WD
}),
})
}
}
wcsv.Flush()
if err := wcsv.Error(); err != nil {
return err
}
log.Printf("[splitarea] saved %s", outPath)
return nil
}
func fmtOptF(mv *meteoVals, pick func(*meteoVals) *float64) string {
if mv == nil {
return ""
}
p := pick(mv)
if p == nil {
return ""
}
return fmt.Sprintf("%.2f", *p)
}
func maxInt(a, b int) int {
if a > b {
return a
}
return b
}
func minInt(a, b int) int {
if a < b {
return a
}
return b
}
func getenvDefault(key, def string) string {
if v := os.Getenv(key); v != "" {
return v
}
return def
}
func main() {
var (
z = flag.Int("z", 7, "tile z")
y = flag.Int("y", 40, "tile y")
x = flag.Int("x", 102, "tile x")
b = flag.String("b", getenvDefault("SPLITAREA_B", "108.15,22.83,109.27,23.61"), "region bounds W,S,E,N")
outDir = flag.String("out", "export_data/split_area", "output base directory")
tzOffset = flag.Int("tz-offset", 8, "timezone offset hours to UTC for local time")
once = flag.Bool("once", false, "run once for previous hour and exit")
)
flag.Parse()
// Bounds now have a sensible default; still validate format later in job()
// Ensure DB initialized
_ = database.GetDB()
ctx := context.Background()
if *once {
if err := job(ctx, *z, *y, *x, *b, *tzOffset, *outDir, time.Now()); err != nil {
log.Fatalf("run once: %v", err)
}
return
}
// Scheduler: run hourly at hh:45 for current hour's :30 radar tile
loc, _ := time.LoadLocation("Asia/Shanghai")
if loc == nil {
loc = time.FixedZone("CST", 8*3600)
}
for {
now := time.Now().In(loc)
runAt := now.Truncate(time.Hour).Add(45 * time.Minute)
if now.After(runAt) {
runAt = runAt.Add(time.Hour)
}
time.Sleep(time.Until(runAt))
// execute
if err := job(ctx, *z, *y, *x, *b, *tzOffset, *outDir, runAt); err != nil {
log.Printf("[splitarea] job error: %v", err)
}
}
}