From 2ee30db41087a86cdd66cebfc0645260e6893673 Mon Sep 17 00:00:00 2001 From: yarnom Date: Fri, 17 Oct 2025 16:10:46 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=96=B0=E5=A2=9EWRF=E7=94=A8=E9=9B=B7?= =?UTF-8?q?=E8=BE=BE=E7=BB=84=E5=90=88=E5=8F=8D=E5=B0=84=E7=8E=87=E5=AF=BC?= =?UTF-8?q?=E5=87=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cmd/service-splitarea/main.go | 415 ++++++++++++++++++++++++++++++++++ 1 file changed, 415 insertions(+) create mode 100644 cmd/service-splitarea/main.go diff --git a/cmd/service-splitarea/main.go b/cmd/service-splitarea/main.go new file mode 100644 index 0000000..aeb6ff4 --- /dev/null +++ b/cmd/service-splitarea/main.go @@ -0,0 +1,415 @@ +package main + +import ( + "context" + "encoding/csv" + "encoding/json" + "errors" + "flag" + "fmt" + "log" + "math" + "net/http" + "net/url" + "os" + "path/filepath" + "strconv" + "strings" + "time" + + "weatherstation/internal/database" +) + +// Service that, at each hour, processes previous hour's :30 radar tile for z/y/x, +// splits a user region into 0.1° grid, averages dBZ (linear domain), fetches Open‑Meteo +// hourly variables at grid centers, and writes a CSV. + +type radarTileRecord struct { + DT time.Time + Z, Y, X int + Width, Height int + West, South, East, North float64 + ResDeg float64 + Data []byte +} + +func getRadarTileAt(ctx context.Context, z, y, x int, dt time.Time) (*radarTileRecord, error) { + const q = `SELECT dt, z, y, x, width, height, west, south, east, north, res_deg, data FROM radar_tiles WHERE z=$1 AND y=$2 AND x=$3 AND dt=$4 LIMIT 1` + row := database.GetDB().QueryRowContext(ctx, q, z, y, x, dt) + var r radarTileRecord + if err := row.Scan(&r.DT, &r.Z, &r.Y, &r.X, &r.Width, &r.Height, &r.West, &r.South, &r.East, &r.North, &r.ResDeg, &r.Data); err != nil { + return nil, err + } + return &r, nil +} + +// parseBounds parses "W,S,E,N" +func parseBounds(s string) (float64, float64, float64, float64, error) { + parts := strings.Split(s, ",") + if len(parts) != 4 { + return 0, 0, 0, 0, fmt.Errorf("bounds must be 'W,S,E,N'") + } + var vals [4]float64 + for i := 0; i < 4; i++ { + v, err := parseFloat(strings.TrimSpace(parts[i])) + if err != nil { + return 0, 0, 0, 0, fmt.Errorf("invalid bound %q: %v", parts[i], err) + } + vals[i] = v + } + w, s1, e, n := vals[0], vals[1], vals[2], vals[3] + if !(w < e && s1 < n) { + return 0, 0, 0, 0, errors.New("invalid bounds: require W= 32766 { // invalid mask + return 0, false + } + dbz := float64(v) / 10.0 + if dbz < 0 { // clip negative + return 0, false + } + return dbz, true +} + +// linearZ average over dBZs +func avgDbzLinear(vals []float64) float64 { + if len(vals) == 0 { + return math.NaN() + } + zsum := 0.0 + for _, d := range vals { + zsum += math.Pow(10, d/10.0) + } + meanZ := zsum / float64(len(vals)) + return 10.0 * math.Log10(meanZ) +} + +// open‑meteo client +type meteoResp struct { + Hourly struct { + Time []string `json:"time"` + Temp []float64 `json:"temperature_2m"` + RH []float64 `json:"relative_humidity_2m"` + Dew []float64 `json:"dew_point_2m"` + WS []float64 `json:"wind_speed_10m"` + WD []float64 `json:"wind_direction_10m"` + } `json:"hourly"` +} + +type meteoVals struct { + Temp, RH, Dew, WS, WD *float64 +} + +func fetchMeteo(ctx context.Context, client *http.Client, lon, lat float64, utcHour time.Time) (*meteoVals, error) { + base := "https://api.open-meteo.com/v1/forecast" + datePart := utcHour.UTC().Format("2006-01-02") + q := url.Values{} + q.Set("latitude", fmt.Sprintf("%.4f", lat)) + q.Set("longitude", fmt.Sprintf("%.4f", lon)) + q.Set("hourly", "dew_point_2m,wind_speed_10m,wind_direction_10m,relative_humidity_2m,temperature_2m") + q.Set("timezone", "UTC") + q.Set("start_date", datePart) + q.Set("end_date", datePart) + q.Set("wind_speed_unit", "ms") + req, _ := http.NewRequestWithContext(ctx, http.MethodGet, base+"?"+q.Encode(), nil) + req.Header.Set("User-Agent", "WeatherStation-splitarea/1.0") + resp, err := client.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("open-meteo status %d", resp.StatusCode) + } + var obj meteoResp + if err := json.NewDecoder(resp.Body).Decode(&obj); err != nil { + return nil, err + } + target := utcHour.UTC().Format("2006-01-02T15:00") + idx := -1 + for i, t := range obj.Hourly.Time { + if t == target { + idx = i + break + } + } + if idx < 0 { + return &meteoVals{}, nil + } + mv := meteoVals{} + pick := func(arr []float64) *float64 { + if arr == nil || idx >= len(arr) { + return nil + } + v := arr[idx] + return &v + } + mv.Temp = pick(obj.Hourly.Temp) + mv.RH = pick(obj.Hourly.RH) + mv.Dew = pick(obj.Hourly.Dew) + mv.WS = pick(obj.Hourly.WS) + mv.WD = pick(obj.Hourly.WD) + return &mv, nil +} + +// job executes the split+augment for a specific local time (CST) and fixed minute=30 of previous hour. +func job(ctx context.Context, z, y, x int, bounds string, tzOffset int, outDir string, runAt time.Time) error { + loc, _ := time.LoadLocation("Asia/Shanghai") + if loc == nil { + loc = time.FixedZone("CST", 8*3600) + } + runAt = runAt.In(loc) + + // Previous hour's :30 + targetLocal := runAt.Truncate(time.Hour).Add(-1 * time.Hour).Add(30 * time.Minute) + + // Fetch tile + rec, err := getRadarTileAt(ctx, z, y, x, targetLocal) + if err != nil { + return fmt.Errorf("load radar tile z=%d y=%d x=%d at %s: %w", z, y, x, targetLocal.Format("2006-01-02 15:04:05"), err) + } + // Bounds + Bw, Bs, Be, Bn, err := parseBounds(bounds) + if err != nil { + return err + } + // Clamp to tile + if !(rec.West <= Bw && Be <= rec.East && rec.South <= Bs && Bn <= rec.North) { + return fmt.Errorf("bounds not inside tile: tile=(%.4f,%.4f,%.4f,%.4f) B=(%.4f,%.4f,%.4f,%.4f)", rec.West, rec.South, rec.East, rec.North, Bw, Bs, Be, Bn) + } + Gw, Gs, Ge, Gn := align0p1(Bw, Bs, Be, Bn) + // clamp + Gw = math.Max(Gw, rec.West) + Gs = math.Max(Gs, rec.South) + Ge = math.Min(Ge, rec.East) + Gn = math.Min(Gn, rec.North) + if Ge <= Gw || Gn <= Gs { + return fmt.Errorf("aligned bounds empty") + } + + // Grid iterate with 0.1° + d := 0.1 + ncols := int(math.Round((Ge - Gw) / d)) + nrows := int(math.Round((Gn - Gs) / d)) + if ncols <= 0 || nrows <= 0 { + return fmt.Errorf("grid size zero") + } + + // Decode int16 big‑endian as we go; avoid full decode into []int16 to save mem + w, h := rec.Width, rec.Height + if w <= 0 || h <= 0 || len(rec.Data) < w*h*2 { + return fmt.Errorf("invalid tile data") + } + + // Prepare output dir: export_data/split_area/YYYYMMDD/HH/30 + ymd := targetLocal.Format("20060102") + hh := targetLocal.Format("15") + mm := targetLocal.Format("04") + dir := filepath.Join(outDir, ymd, hh, mm) + if err := os.MkdirAll(dir, 0o755); err != nil { + return err + } + base := fmt.Sprintf("%d-%d-%d", z, y, x) + outPath := filepath.Join(dir, base+"_radar.csv") + + // Prepare Open‑Meteo time window + // Target UTC hour = floor(local to hour) - tzOffset + floored := targetLocal.Truncate(time.Hour) + utcHour := floored.Add(-time.Duration(tzOffset) * time.Hour) + client := &http.Client{Timeout: 15 * time.Second} + cache := map[string]*meteoVals{} + keyOf := func(lon, lat float64) string { return fmt.Sprintf("%.4f,%.4f", lon, lat) } + + // CSV output + f, err := os.Create(outPath) + if err != nil { + return err + } + defer f.Close() + wcsv := csv.NewWriter(f) + defer wcsv.Flush() + // Header + _ = wcsv.Write([]string{"longitude", "latitude", "reflectivity_dbz", "temperature_2m", "relative_humidity_2m", "dew_point_2m", "wind_speed_10m", "wind_direction_10m"}) + + // Helper to read raw at (row,col) + readRaw := func(rr, cc int) int16 { + off := (rr*w + cc) * 2 + return int16(uint16(rec.Data[off])<<8 | uint16(rec.Data[off+1])) + } + + // Iterate grid cells + for ri := 0; ri < nrows; ri++ { + cellS := Gs + float64(ri)*d + cellN := cellS + d + row0 := maxInt(0, latToRow(rec.South, rec.ResDeg, cellS)) + row1 := minInt(h, int(math.Ceil((cellN-rec.South)/rec.ResDeg))) + for ci := 0; ci < ncols; ci++ { + cellW := Gw + float64(ci)*d + cellE := cellW + d + col0 := maxInt(0, lonToCol(rec.West, rec.ResDeg, cellW)) + col1 := minInt(w, int(math.Ceil((cellE-rec.West)/rec.ResDeg))) + // accumulate + dbzs := make([]float64, 0, (row1-row0)*(col1-col0)) + for rr := row0; rr < row1; rr++ { + for cc := col0; cc < col1; cc++ { + draw := readRaw(rr, cc) + if d, ok := dbzFromRaw(draw); ok { + dbzs = append(dbzs, d) + } + } + } + var cellDBZStr string + if len(dbzs) > 0 { + cellDBZ := avgDbzLinear(dbzs) + cellDBZStr = fmt.Sprintf("%.1f", cellDBZ) + } else { + cellDBZStr = "" + } + lon := (cellW + cellE) / 2.0 + lat := (cellS + cellN) / 2.0 + + // Fetch meteo (cache by rounded lon,lat) + k := keyOf(lon, lat) + mv := cache[k] + if mv == nil { + mv, _ = fetchMeteo(ctx, client, lon, lat, utcHour) + cache[k] = mv + } + // write row + wcsv.Write([]string{ + fmt.Sprintf("%.4f", lon), fmt.Sprintf("%.4f", lat), cellDBZStr, + fmtOptF(mv, func(m *meteoVals) *float64 { + if m == nil { + return nil + } + return m.Temp + }), + fmtOptF(mv, func(m *meteoVals) *float64 { + if m == nil { + return nil + } + return m.RH + }), + fmtOptF(mv, func(m *meteoVals) *float64 { + if m == nil { + return nil + } + return m.Dew + }), + fmtOptF(mv, func(m *meteoVals) *float64 { + if m == nil { + return nil + } + return m.WS + }), + fmtOptF(mv, func(m *meteoVals) *float64 { + if m == nil { + return nil + } + return m.WD + }), + }) + } + } + wcsv.Flush() + if err := wcsv.Error(); err != nil { + return err + } + log.Printf("[splitarea] saved %s", outPath) + return nil +} + +func fmtOptF(mv *meteoVals, pick func(*meteoVals) *float64) string { + if mv == nil { + return "" + } + p := pick(mv) + if p == nil { + return "" + } + return fmt.Sprintf("%.2f", *p) +} + +func maxInt(a, b int) int { + if a > b { + return a + } + return b +} +func minInt(a, b int) int { + if a < b { + return a + } + return b +} + +func getenvDefault(key, def string) string { + if v := os.Getenv(key); v != "" { + return v + } + return def +} + +func main() { + var ( + z = flag.Int("z", 7, "tile z") + y = flag.Int("y", 40, "tile y") + x = flag.Int("x", 102, "tile x") + b = flag.String("b", getenvDefault("SPLITAREA_B", "108.15,22.83,109.27,23.61"), "region bounds W,S,E,N") + outDir = flag.String("out", "export_data/split_area", "output base directory") + tzOffset = flag.Int("tz-offset", 8, "timezone offset hours to UTC for local time") + once = flag.Bool("once", false, "run once for previous hour and exit") + ) + flag.Parse() + // Bounds now have a sensible default; still validate format later in job() + // Ensure DB initialized + _ = database.GetDB() + + ctx := context.Background() + if *once { + if err := job(ctx, *z, *y, *x, *b, *tzOffset, *outDir, time.Now()); err != nil { + log.Fatalf("run once: %v", err) + } + return + } + + // Hourly scheduler: run at each hour boundary for previous hour :30 + loc, _ := time.LoadLocation("Asia/Shanghai") + if loc == nil { + loc = time.FixedZone("CST", 8*3600) + } + for { + now := time.Now().In(loc) + runAt := now.Truncate(time.Hour).Add(time.Hour) // next hour + time.Sleep(time.Until(runAt)) + // execute + if err := job(ctx, *z, *y, *x, *b, *tzOffset, *outDir, runAt); err != nil { + log.Printf("[splitarea] job error: %v", err) + } + } +}