From 2c2d5232f8eb304085d5306bc011453ba4c8d558 Mon Sep 17 00:00:00 2001 From: yarnom Date: Wed, 3 Dec 2025 11:30:13 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=96=B0=E5=A2=9E=E4=B8=80=E4=BA=9B?= =?UTF-8?q?=E8=BE=85=E5=8A=A9=E7=9A=84=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cmd/caiyun_parse/main.go | 279 +++++++++++++++ cmd/radar_export_csv/main.go | 448 ++++++++++++++++++++++++ cmd/radar_rain_export_csv/main.go | 553 ++++++++++++++++++++++++++++++ cmd/rainfetch/main.go | 227 ++++++++++++ cmd/service-rain/main.go | 28 ++ core/cmd/im_export_data/main.go | 235 +++++++++++++ 6 files changed, 1770 insertions(+) create mode 100644 cmd/caiyun_parse/main.go create mode 100644 cmd/radar_export_csv/main.go create mode 100644 cmd/radar_rain_export_csv/main.go create mode 100644 cmd/rainfetch/main.go create mode 100644 cmd/service-rain/main.go create mode 100644 core/cmd/im_export_data/main.go diff --git a/cmd/caiyun_parse/main.go b/cmd/caiyun_parse/main.go new file mode 100644 index 0000000..4d4c490 --- /dev/null +++ b/cmd/caiyun_parse/main.go @@ -0,0 +1,279 @@ +package main + +import ( + "bufio" + "encoding/csv" + "encoding/json" + "flag" + "fmt" + "io" + "os" + "sort" + "strings" + "time" +) + +// Minimal Caiyun hourly model focusing on required fields. +type caiyunHourly struct { + Status string `json:"status"` + Result struct { + Hourly struct { + Status string `json:"status"` + Temperature []valTime `json:"temperature"` + Humidity []valTime `json:"humidity"` + Visibility []valTime `json:"visibility"` + Dswrf []valTime `json:"dswrf"` + Pressure []valTime `json:"pressure"` + Wind []windTime `json:"wind"` + } `json:"hourly"` + } `json:"result"` +} + +type valTime struct { + Datetime string `json:"datetime"` + Value float64 `json:"value"` +} + +type windTime struct { + Datetime string `json:"datetime"` + Speed float64 `json:"speed"` + Direction float64 `json:"direction"` +} + +type row struct { + t time.Time + temperature *float64 + humidity *float64 + windSpeed *float64 + windDir *float64 + pressure *float64 + visibility *float64 + dswrf *float64 +} + +func main() { + var file string + var tz string + var mode string + var alias string + var lat float64 + var lon float64 + var sqlTable string + flag.StringVar(&file, "file", "", "Path to Caiyun hourly JSON; if empty, read from stdin") + flag.StringVar(&tz, "tz", "Asia/Shanghai", "Timezone for output timestamps") + flag.StringVar(&mode, "mode", "csv", "Output mode: csv | sql") + flag.StringVar(&alias, "alias", "", "Station alias for SQL output (required for mode=sql)") + flag.Float64Var(&lat, "lat", 0, "Latitude for SQL output") + flag.Float64Var(&lon, "lon", 0, "Longitude for SQL output") + flag.StringVar(&sqlTable, "sqltable", "radar_weather", "SQL table name for inserts") + flag.Parse() + + var r io.Reader + if file == "" { + r = bufio.NewReader(os.Stdin) + } else { + f, err := os.Open(file) + if err != nil { + fatalf("open file: %v", err) + } + defer f.Close() + r = f + } + + var payload caiyunHourly + dec := json.NewDecoder(r) + if err := dec.Decode(&payload); err != nil { + fatalf("decode json: %v", err) + } + if strings.ToLower(payload.Status) != "ok" && payload.Status != "" { + fatalf("top-level status not ok: %s", payload.Status) + } + + loc, _ := time.LoadLocation(tz) + if loc == nil { + loc = time.FixedZone("CST", 8*3600) + } + + // Merge series by timestamp + rowsByTime := map[time.Time]*row{} + upsert := func(ts string) *row { + t, ok := parseTime(ts, loc) + if !ok { + fatalf("parse time failed: %s", ts) + } + if v, exists := rowsByTime[t]; exists { + return v + } + nr := &row{t: t} + rowsByTime[t] = nr + return nr + } + + for _, v := range payload.Result.Hourly.Temperature { + rr := upsert(v.Datetime) + rr.temperature = ptr(v.Value) + } + for _, v := range payload.Result.Hourly.Humidity { + rr := upsert(v.Datetime) + rr.humidity = ptr(v.Value) + } + for _, v := range payload.Result.Hourly.Visibility { + rr := upsert(v.Datetime) + rr.visibility = ptr(v.Value) + } + for _, v := range payload.Result.Hourly.Dswrf { + rr := upsert(v.Datetime) + rr.dswrf = ptr(v.Value) + } + for _, v := range payload.Result.Hourly.Pressure { + rr := upsert(v.Datetime) + rr.pressure = ptr(v.Value) + } + for _, w := range payload.Result.Hourly.Wind { + rr := upsert(w.Datetime) + rr.windSpeed = ptr(w.Speed) + rr.windDir = ptr(w.Direction) + } + + // Sort by time + times := make([]time.Time, 0, len(rowsByTime)) + for t := range rowsByTime { + times = append(times, t) + } + sort.Slice(times, func(i, j int) bool { return times[i].Before(times[j]) }) + + if mode == "sql" { + if alias == "" { + fatalf("-alias is required for mode=sql") + } + // Emit upserts into radar_weather; convert wind_speed km/h -> m/s, keep humidity as ratio (0..1) + fmt.Println("BEGIN;") + for _, t := range times { + rr := rowsByTime[t] + // wind speed conversion + var ws string + if rr.windSpeed != nil { + v := *rr.windSpeed / 3.6 + ws = trimZeros(fmt.Sprintf("%.6f", v)) + } + // Build SQL with NULLs where missing + q := fmt.Sprintf( + "INSERT INTO %s (alias, lat, lon, dt, temperature, humidity, cloudrate, visibility, dswrf, wind_speed, wind_direction, pressure) "+ + "VALUES (%s, %s, %s, %s, %s, %s, NULL, %s, %s, %s, %s, %s) "+ + "ON CONFLICT (alias, dt) DO UPDATE SET "+ + "lat=EXCLUDED.lat, lon=EXCLUDED.lon, temperature=EXCLUDED.temperature, humidity=EXCLUDED.humidity, "+ + "visibility=EXCLUDED.visibility, dswrf=EXCLUDED.dswrf, wind_speed=EXCLUDED.wind_speed, "+ + "wind_direction=EXCLUDED.wind_direction, pressure=EXCLUDED.pressure;", + sqlTable, + sqlQuote(alias), + sqlNum(lat), + sqlNum(lon), + sqlTime(t), + sqlOpt(rr.temperature), + sqlOpt(rr.humidity), + sqlOpt(rr.visibility), + sqlOpt(rr.dswrf), + sqlStrOrNull(ws), + sqlOpt(rr.windDir), + sqlOpt(rr.pressure), + ) + fmt.Println(q) + } + fmt.Println("COMMIT;") + return + } + + // CSV output + w := csv.NewWriter(os.Stdout) + _ = w.Write([]string{"datetime", "temperature", "humidity", "wind_speed", "wind_direction", "pressure", "visibility", "dswrf"}) + for _, t := range times { + rr := rowsByTime[t] + var ws string + if rr.windSpeed != nil { + v := *rr.windSpeed / 3.6 + ws = trimZeros(fmt.Sprintf("%.6f", v)) + } + rec := []string{ + t.Format("2006-01-02 15:04:05"), + optf(rr.temperature), + optf(rr.humidity), + ws, + optf(rr.windDir), + optf(rr.pressure), + optf(rr.visibility), + optf(rr.dswrf), + } + _ = w.Write(rec) + } + w.Flush() + if err := w.Error(); err != nil { + fatalf("write csv: %v", err) + } +} + +func ptr(f float64) *float64 { return &f } + +func optf(p *float64) string { + if p == nil { + return "" + } + // Trim trailing zeros via fmt + return trimZeros(fmt.Sprintf("%.6f", *p)) +} + +func trimZeros(s string) string { + if !strings.Contains(s, ".") { + return s + } + s = strings.TrimRight(s, "0") + s = strings.TrimRight(s, ".") + return s +} + +// parseTime attempts RFC3339 and common Caiyun formats without seconds. +func parseTime(s string, loc *time.Location) (time.Time, bool) { + // Try RFC3339 first + if t, err := time.Parse(time.RFC3339, s); err == nil { + return t.In(loc), true + } + // Try without seconds, with offset, e.g. 2006-01-02T15:04+08:00 + if t, err := time.Parse("2006-01-02T15:04-07:00", s); err == nil { + return t.In(loc), true + } + // Try without offset (assume loc) + if t, err := time.ParseInLocation("2006-01-02 15:04", s, loc); err == nil { + return t.In(loc), true + } + return time.Time{}, false +} + +func fatalf(format string, args ...any) { + fmt.Fprintf(os.Stderr, format+"\n", args...) + os.Exit(1) +} + +func sqlQuote(s string) string { + return "'" + strings.ReplaceAll(s, "'", "''") + "'" +} + +func sqlNum(f float64) string { + return trimZeros(fmt.Sprintf("%.8f", f)) +} + +func sqlTime(t time.Time) string { + return sqlQuote(t.Format("2006-01-02 15:04:05")) +} + +func sqlOpt(p *float64) string { + if p == nil { + return "NULL" + } + return trimZeros(fmt.Sprintf("%.6f", *p)) +} + +func sqlStrOrNull(s string) string { + if s == "" { + return "NULL" + } + return s +} diff --git a/cmd/radar_export_csv/main.go b/cmd/radar_export_csv/main.go new file mode 100644 index 0000000..2c36877 --- /dev/null +++ b/cmd/radar_export_csv/main.go @@ -0,0 +1,448 @@ +package main + +import ( + "context" + "database/sql" + "encoding/binary" + "encoding/csv" + "errors" + "flag" + "fmt" + "log" + "math" + "os" + "strings" + "time" + "weatherstation/internal/database" +) + +type stationInfo struct { + ID string + Alias string + Lat float64 + Lon float64 + Z int + Y int + X int +} + +type tileRec struct { + DT time.Time + Width, Height int + West, South float64 + East, North float64 + ResDeg float64 + Data []byte +} + +func main() { + var stationID string + var startStr string + var endStr string + var outPath string + var verbose bool + + flag.StringVar(&stationID, "station_id", "", "站点ID(留空表示全部WH65LP且有经纬度的站)") + flag.StringVar(&startStr, "start", "", "起始时间(YYYY-MM-DD HH:MM:SS,CST)") + flag.StringVar(&endStr, "end", "", "结束时间(YYYY-MM-DD HH:MM:SS,CST)") + flag.StringVar(&outPath, "out", "radar_stats.csv", "输出CSV文件路径") + flag.BoolVar(&verbose, "info", false, "输出详细过程信息") + flag.Parse() + + if strings.TrimSpace(startStr) == "" || strings.TrimSpace(endStr) == "" { + log.Fatalln("必须提供 --start 与 --end,格式 YYYY-MM-DD HH:MM:SS") + } + loc, _ := time.LoadLocation("Asia/Shanghai") + if loc == nil { + loc = time.FixedZone("CST", 8*3600) + } + startT, err := time.ParseInLocation("2006-01-02 15:04:05", startStr, loc) + if err != nil { + log.Fatalf("解析 start 失败: %v", err) + } + endT, err := time.ParseInLocation("2006-01-02 15:04:05", endStr, loc) + if err != nil { + log.Fatalf("解析 end 失败: %v", err) + } + if !endT.After(startT) { + log.Fatalln("结束时间必须大于起始时间") + } + + // 初始化数据库 + _ = database.GetDB() + defer database.Close() + + // 获取站点列表 + stations, err := listStations(database.GetDB(), stationID) + if err != nil { + log.Fatalf("查询站点失败: %v", err) + } + if len(stations) == 0 { + log.Fatalln("没有符合条件的站点") + } + if verbose { + log.Printf("站点数量: %d", len(stations)) + for _, s := range stations { + log.Printf("站点: id=%s alias=%s lat=%.5f lon=%.5f z/y/x=%d/%d/%d", s.ID, s.Alias, s.Lat, s.Lon, s.Z, s.Y, s.X) + } + } + + // 创建CSV + f, err := os.Create(outPath) + if err != nil { + log.Fatalf("创建输出文件失败: %v", err) + } + defer f.Close() + w := csv.NewWriter(f) + defer w.Flush() + + header := []string{ + "station_id", "station_alias", "dt", "lat", "lon", "wind_speed_ms", "wind_dir_deg", + "sector_ge40_cnt", "sector_ge40_sum", "sector_ge30_cnt", "sector_ge30_sum", + "circle_ge40_cnt", "circle_ge40_sum", "circle_ge30_cnt", "circle_ge30_sum", + "rs485_rain_total_mm", + } + if err := w.Write(header); err != nil { + log.Fatalf("写入CSV表头失败: %v", err) + } + + ctx := context.Background() + totalRows := 0 + var totalTiles, skipNoZYX, skipNoWind, skipDecode int + for _, s := range stations { + if s.Z == 0 && s.Y == 0 && s.X == 0 { + log.Printf("跳过站点 %s(无z/y/x映射)", s.ID) + skipNoZYX++ + continue + } + tiles, err := listTiles(ctx, database.GetDB(), s.Z, s.Y, s.X, startT, endT) + if err != nil { + log.Printf("查询瓦片失败 station=%s: %v", s.ID, err) + continue + } + totalTiles += len(tiles) + if verbose { + log.Printf("站点 %s 瓦片数量: %d", s.ID, len(tiles)) + } + if len(tiles) == 0 { + log.Printf("站点 %s 在范围内无瓦片", s.ID) + } + + for _, t := range tiles { + // 10分钟向下取整时间(bucket) + bucket := bucket10(t.DT, loc) + // NOTE: 按需改为用 station_id 匹配 radar_weather.alias + windSpeed, windDir, ok, err := loadWindAt(database.GetDB(), s.ID, bucket) + if err != nil { + log.Printf("读取风失败 %s @%s: %v", s.ID, t.DT.Format(time.RFC3339), err) + continue + } + if !ok { // 无风场:跳过该时次 + skipNoWind++ + if verbose { + log.Printf("跳过: %s 瓦片@%s(桶=%s)在 radar_weather(alias=%s) 无记录", s.ID, t.DT.In(loc).Format("2006-01-02 15:04:05"), bucket.Format("2006-01-02 15:04:05"), s.ID) + } + continue + } + + // 解码 dBZ 网格 + vals, xs, ys, err := decodeTile(t) + if err != nil { + log.Printf("解码瓦片失败 %s @%s: %v", s.ID, t.DT.Format(time.RFC3339), err) + skipDecode++ + continue + } + + // 统计 + sec40Cnt, sec40Sum, sec30Cnt, sec30Sum, + cir40Cnt, cir40Sum, cir30Cnt, cir30Sum := computeStats(vals, xs, ys, s.Lat, s.Lon, windSpeed, windDir) + + // 最近一条累计雨量 + rainTotal, rainOK := loadNearestRain(database.GetDB(), s.ID, t.DT) + if verbose { + log.Printf("写出: %s dt=%s wind=%.3f m/s %.1f° 扇形(>=40:%d/%.1f >=30:%d/%.1f) 圆形(>=40:%d/%.1f >=30:%d/%.1f) rain_total=%v(%.3f)", + s.ID, + t.DT.In(loc).Format("2006-01-02 15:04:05"), + windSpeed, windDir, + sec40Cnt, sec40Sum, sec30Cnt, sec30Sum, + cir40Cnt, cir40Sum, cir30Cnt, cir30Sum, + rainOK, rainTotal, + ) + } + + rec := []string{ + s.ID, + s.Alias, + t.DT.In(loc).Format("2006-01-02 15:04:05"), + fmt.Sprintf("%.6f", s.Lat), + fmt.Sprintf("%.6f", s.Lon), + fmt.Sprintf("%.3f", windSpeed), + fmt.Sprintf("%.2f", windDir), + fmt.Sprintf("%d", sec40Cnt), + fmt.Sprintf("%.1f", sec40Sum), + fmt.Sprintf("%d", sec30Cnt), + fmt.Sprintf("%.1f", sec30Sum), + fmt.Sprintf("%d", cir40Cnt), + fmt.Sprintf("%.1f", cir40Sum), + fmt.Sprintf("%d", cir30Cnt), + fmt.Sprintf("%.1f", cir30Sum), + fmt.Sprintf("%.3f", rainTotal), + } + if err := w.Write(rec); err != nil { + log.Printf("写入CSV失败: %v", err) + } + totalRows++ + } + } + w.Flush() + if err := w.Error(); err != nil { + log.Fatalf("写入CSV失败: %v", err) + } + if verbose { + log.Printf("汇总: 站点数=%d 瓦片总数=%d 跳过(无z/y/x)=%d 跳过(无风)=%d 跳过(解码失败)=%d", len(stations), totalTiles, skipNoZYX, skipNoWind, skipDecode) + } + log.Printf("完成,输出 %d 行到 %s", totalRows, outPath) +} + +func listStations(db *sql.DB, stationID string) ([]stationInfo, error) { + // 与前端一致:device_type='WH65LP' 且 lat/lon 非空且非零 + if strings.TrimSpace(stationID) != "" { + const q = ` + SELECT station_id, + CASE WHEN COALESCE(station_alias,'')='' THEN station_id ELSE station_alias END AS alias, + latitude, longitude, + COALESCE(z,0), COALESCE(y,0), COALESCE(x,0) + FROM stations + WHERE device_type='WH65LP' AND station_id=$1 + AND latitude IS NOT NULL AND longitude IS NOT NULL + AND latitude<>0 AND longitude<>0` + var s stationInfo + err := db.QueryRow(q, stationID).Scan(&s.ID, &s.Alias, &s.Lat, &s.Lon, &s.Z, &s.Y, &s.X) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, nil + } + return nil, err + } + return []stationInfo{s}, nil + } + const qAll = ` + SELECT station_id, + CASE WHEN COALESCE(station_alias,'')='' THEN station_id ELSE station_alias END AS alias, + latitude, longitude, + COALESCE(z,0), COALESCE(y,0), COALESCE(x,0) + FROM stations + WHERE device_type='WH65LP' + AND latitude IS NOT NULL AND longitude IS NOT NULL + AND latitude<>0 AND longitude<>0 + ORDER BY station_id` + rows, err := db.Query(qAll) + if err != nil { + return nil, err + } + defer rows.Close() + var out []stationInfo + for rows.Next() { + var s stationInfo + if err := rows.Scan(&s.ID, &s.Alias, &s.Lat, &s.Lon, &s.Z, &s.Y, &s.X); err == nil { + out = append(out, s) + } + } + return out, nil +} + +func listTiles(ctx context.Context, db *sql.DB, z, y, x int, from, to time.Time) ([]tileRec, error) { + const q = ` + SELECT dt, width, height, west, south, east, north, res_deg, data + FROM radar_tiles + WHERE z=$1 AND y=$2 AND x=$3 AND dt BETWEEN $4 AND $5 + ORDER BY dt ASC` + rows, err := db.QueryContext(ctx, q, z, y, x, from, to) + if err != nil { + return nil, err + } + defer rows.Close() + var out []tileRec + for rows.Next() { + var r tileRec + if err := rows.Scan(&r.DT, &r.Width, &r.Height, &r.West, &r.South, &r.East, &r.North, &r.ResDeg, &r.Data); err == nil { + out = append(out, r) + } + } + return out, nil +} + +func bucket10(t time.Time, loc *time.Location) time.Time { + tt := t.In(loc) + m := (tt.Minute() / 10) * 10 + return time.Date(tt.Year(), tt.Month(), tt.Day(), tt.Hour(), m, 0, 0, loc) +} + +// loadWindAt 以别名(alias)精确匹配 radar_weather;本导出按需传入 station_id 作为 alias 参数 +func loadWindAt(db *sql.DB, alias string, dt time.Time) (speedMS float64, dirDeg float64, ok bool, err error) { + const q = ` + SELECT wind_speed, wind_direction + FROM radar_weather + WHERE alias=$1 AND dt=$2 + LIMIT 1` + var s, d sql.NullFloat64 + err = db.QueryRow(q, alias, dt).Scan(&s, &d) + if err == sql.ErrNoRows { + return 0, 0, false, nil + } + if err != nil { + return 0, 0, false, err + } + if !s.Valid || !d.Valid { + return 0, 0, false, nil + } + return s.Float64, d.Float64, true, nil +} + +func loadNearestRain(db *sql.DB, stationID string, dt time.Time) (rainTotal float64, ok bool) { + // 取最近一条累计雨量(单位mm)。如不存在返回0,false + const q = ` + SELECT rainfall + FROM rs485_weather_data + WHERE station_id=$1 + ORDER BY ABS(EXTRACT(EPOCH FROM (timestamp - $2))) ASC + LIMIT 1` + var r sql.NullFloat64 + if err := db.QueryRow(q, stationID, dt).Scan(&r); err != nil { + return 0, false + } + if !r.Valid { + return 0, false + } + return r.Float64, true +} + +func decodeTile(t tileRec) (vals [][]*float64, xs []float64, ys []float64, err error) { + w, h := t.Width, t.Height + if w <= 0 || h <= 0 { + return nil, nil, nil, fmt.Errorf("非法尺寸") + } + if len(t.Data) < w*h*2 { + return nil, nil, nil, fmt.Errorf("数据长度不足") + } + xs = make([]float64, w) + for c := 0; c < w; c++ { + xs[c] = t.West + (float64(c)+0.5)*t.ResDeg + } + ys = make([]float64, h) + for r := 0; r < h; r++ { + ys[r] = t.South + (float64(r)+0.5)*t.ResDeg + } + vals = make([][]*float64, h) + off := 0 + for r := 0; r < h; r++ { + row := make([]*float64, w) + for c := 0; c < w; c++ { + v := int16(binary.BigEndian.Uint16(t.Data[off : off+2])) + off += 2 + if v >= 32766 { + row[c] = nil + continue + } + dbz := float64(v) / 10.0 + if dbz < 0 { + dbz = 0 + } else if dbz > 75 { + dbz = 75 + } + vv := dbz + row[c] = &vv + } + vals[r] = row + } + return vals, xs, ys, nil +} + +func computeStats(vals [][]*float64, xs, ys []float64, stLat, stLon, windMS, windFromDeg float64) ( + sec40Cnt int, sec40Sum float64, sec30Cnt int, sec30Sum float64, + cir40Cnt int, cir40Sum float64, cir30Cnt int, cir30Sum float64, +) { + h := len(vals) + if h == 0 { + return + } + w := len(vals[0]) + // 半径(米)与半角 + halfAngle := 30.0 + rangeM := windMS * 3 * 3600 + circleR := 8000.0 + + for r := 0; r < h; r++ { + lat := ys[r] + row := vals[r] + for c := 0; c < w; c++ { + if row[c] == nil { + continue + } + dbz := *row[c] + lon := xs[c] + dist := haversine(stLat, stLon, lat, lon) + + // 8km 圆 + if dist <= circleR { + if dbz >= 40 { + cir40Cnt++ + cir40Sum += dbz + } + if dbz >= 30 { + cir30Cnt++ + cir30Sum += dbz + } + } + + // 扇形(需同时满足距离与角度) + if dist <= rangeM { + brg := bearingDeg(stLat, stLon, lat, lon) + if angDiff(brg, windFromDeg) <= halfAngle { + if dbz >= 40 { + sec40Cnt++ + sec40Sum += dbz + } + if dbz >= 30 { + sec30Cnt++ + sec30Sum += dbz + } + } + } + } + } + return +} + +func toRad(d float64) float64 { return d * math.Pi / 180 } +func toDeg(r float64) float64 { return r * 180 / math.Pi } + +func haversine(lat1, lon1, lat2, lon2 float64) float64 { + const R = 6371000.0 + dLat := toRad(lat2 - lat1) + dLon := toRad(lon2 - lon1) + a := math.Sin(dLat/2)*math.Sin(dLat/2) + math.Cos(toRad(lat1))*math.Cos(toRad(lat2))*math.Sin(dLon/2)*math.Sin(dLon/2) + c := 2 * math.Atan2(math.Sqrt(a), math.Sqrt(1-a)) + return R * c +} + +func bearingDeg(lat1, lon1, lat2, lon2 float64) float64 { + φ1 := toRad(lat1) + φ2 := toRad(lat2) + Δλ := toRad(lon2 - lon1) + y := math.Sin(Δλ) * math.Cos(φ2) + x := math.Cos(φ1)*math.Sin(φ2) - math.Sin(φ1)*math.Cos(φ2)*math.Cos(Δλ) + brg := toDeg(math.Atan2(y, x)) + if brg < 0 { + brg += 360 + } + return brg +} + +func angDiff(a, b float64) float64 { + d := math.Mod(a-b+540, 360) - 180 + if d < 0 { + d = -d + } + return math.Abs(d) +} diff --git a/cmd/radar_rain_export_csv/main.go b/cmd/radar_rain_export_csv/main.go new file mode 100644 index 0000000..f65a92a --- /dev/null +++ b/cmd/radar_rain_export_csv/main.go @@ -0,0 +1,553 @@ +package main + +import ( + "context" + "database/sql" + "encoding/binary" + "encoding/csv" + "errors" + "flag" + "fmt" + "log" + "math" + "os" + "strings" + "time" + + "weatherstation/internal/database" +) + +type stationInfo struct { + ID string + Alias string + Lat float64 + Lon float64 + Z int + Y int + X int +} + +type tileRec struct { + DT time.Time + Width, Height int + West, South float64 + East, North float64 + ResDeg float64 + Data []byte +} + +func main() { + var stationID string + var startStr string + var endStr string + var outPath string + var verbose bool + var useImdroid bool + + flag.StringVar(&stationID, "station_id", "", "站点ID(留空表示全部符合条件的站)") + flag.StringVar(&startStr, "start", "", "起始时间(YYYY-MM-DD HH:MM:SS,CST,表示区间左端点)") + flag.StringVar(&endStr, "end", "", "结束时间(YYYY-MM-DD HH:MM:SS,CST,表示区间左端点,非包含)") + flag.StringVar(&outPath, "out", "radar_hourly_stats.csv", "输出CSV文件路径") + flag.BoolVar(&verbose, "info", false, "输出详细过程信息") + flag.BoolVar(&useImdroid, "use_imdroid", false, "输出 imdroid 预报(右端点)") + flag.Parse() + + if strings.TrimSpace(startStr) == "" || strings.TrimSpace(endStr) == "" { + log.Fatalln("必须提供 --start 与 --end,格式 YYYY-MM-DD HH:MM:SS") + } + loc, _ := time.LoadLocation("Asia/Shanghai") + if loc == nil { + loc = time.FixedZone("CST", 8*3600) + } + startT, err := time.ParseInLocation("2006-01-02 15:04:05", startStr, loc) + if err != nil { + log.Fatalf("解析 start 失败: %v", err) + } + endT, err := time.ParseInLocation("2006-01-02 15:04:05", endStr, loc) + if err != nil { + log.Fatalf("解析 end 失败: %v", err) + } + if !endT.After(startT) { + log.Fatalln("结束时间必须大于起始时间") + } + + _ = database.GetDB() + defer database.Close() + + stations, err := listStations(database.GetDB(), stationID) + if err != nil { + log.Fatalf("查询站点失败: %v", err) + } + if len(stations) == 0 { + log.Fatalln("没有符合条件的站点") + } + if verbose { + log.Printf("站点数量: %d", len(stations)) + for _, s := range stations { + log.Printf("站点: id=%s alias=%s lat=%.5f lon=%.5f z/y/x=%d/%d/%d", s.ID, s.Alias, s.Lat, s.Lon, s.Z, s.Y, s.X) + } + } + + f, err := os.Create(outPath) + if err != nil { + log.Fatalf("创建输出文件失败: %v", err) + } + defer f.Close() + w := csv.NewWriter(f) + defer w.Flush() + + header := []string{ + "station_id", + "station_alias", + "hour_end", + "rain_actual_mm", + "wind_speed_ms", + "wind_dir_deg", + "openmeteo_rain_mm", + "openmeteo_issued", + "caiyun_rain_mm", + "caiyun_issued", + } + if useImdroid { + header = append(header, "imdroid_rain_mm", "imdroid_issued") + } + header = append(header, "radar_circle_max_dbz", "radar_sector_max_dbz") + if err := w.Write(header); err != nil { + log.Fatalf("写入CSV表头失败: %v", err) + } + + ctx := context.Background() + totalRows := 0 + hours := buildHourSlots(startT, endT) + + for _, s := range stations { + if verbose { + log.Printf("处理站点 %s,共 %d 个小时区间", s.ID, len(hours)) + } + for _, slot := range hours { + actual, windSpeed, windDir, hasObs, err := aggregateHourlyObs(ctx, database.GetDB(), s.ID, slot.from, slot.to) + if err != nil { + log.Printf("站点 %s 聚合观测失败 @%s: %v", s.ID, slot.to.Format(time.RFC3339), err) + continue + } + + openRain, openIssued, hasOpen, err := findLatestForecast(ctx, database.GetDB(), s.ID, "open-meteo", slot.to) + if err != nil { + log.Printf("站点 %s 读取 open-meteo 预报失败 @%s: %v", s.ID, slot.to.Format(time.RFC3339), err) + } + caiyunRain, caiyunIssued, hasCaiyun, err := findLatestForecast(ctx, database.GetDB(), s.ID, "caiyun", slot.to) + if err != nil { + log.Printf("站点 %s 读取 caiyun 预报失败 @%s: %v", s.ID, slot.to.Format(time.RFC3339), err) + } + + var ( + imdroidRain float64 + imdroidIssued time.Time + hasImdroid bool + ) + if useImdroid { + var errImdroid error + imdroidRain, imdroidIssued, hasImdroid, errImdroid = findLatestForecast(ctx, database.GetDB(), s.ID, "imdroid", slot.to) + if errImdroid != nil { + log.Printf("站点 %s 读取 imdroid 预报失败 @%s: %v", s.ID, slot.to.Format(time.RFC3339), errImdroid) + } + } + + circleMax, sectorMax, hasRadar, err := hourlyRadarMax(ctx, database.GetDB(), s, slot.from, slot.to, loc, verbose) + if err != nil { + log.Printf("站点 %s 统计雷达失败 @%s: %v", s.ID, slot.to.Format(time.RFC3339), err) + } + + rec := []string{ + s.ID, + s.Alias, + slot.to.Format("2006-01-02 15:04:05"), + formatFloat(actual, hasObs, 3), + formatFloat(windSpeed, hasObs && !math.IsNaN(windSpeed), 3), + formatFloat(windDir, hasObs && !math.IsNaN(windDir), 1), + formatFloat(openRain, hasOpen, 3), + formatTime(openIssued, hasOpen), + formatFloat(caiyunRain, hasCaiyun, 3), + formatTime(caiyunIssued, hasCaiyun), + } + if useImdroid { + rec = append(rec, + formatFloat(imdroidRain, hasImdroid, 3), + formatTime(imdroidIssued, hasImdroid), + ) + } + rec = append(rec, + formatFloat(circleMax, hasRadar && !math.IsNaN(circleMax), 1), + formatFloat(sectorMax, hasRadar && !math.IsNaN(sectorMax), 1), + ) + if err := w.Write(rec); err != nil { + log.Printf("写入CSV失败: %v", err) + } else { + totalRows++ + } + } + } + + w.Flush() + if err := w.Error(); err != nil { + log.Fatalf("写入CSV失败: %v", err) + } + log.Printf("完成,输出 %d 行到 %s", totalRows, outPath) +} + +type hourSlot struct { + from time.Time + to time.Time +} + +func buildHourSlots(from, to time.Time) []hourSlot { + var slots []hourSlot + cursor := from + for cursor.Before(to) { + end := cursor.Add(time.Hour) + if end.After(to) { + end = to + } + slots = append(slots, hourSlot{from: cursor, to: end}) + cursor = end + } + return slots +} + +func listStations(db *sql.DB, stationID string) ([]stationInfo, error) { + if strings.TrimSpace(stationID) != "" { + const q = ` + SELECT station_id, + CASE WHEN COALESCE(station_alias,'')='' THEN station_id ELSE station_alias END AS alias, + latitude, longitude, + COALESCE(z,0), COALESCE(y,0), COALESCE(x,0) + FROM stations + WHERE station_id=$1 + AND latitude IS NOT NULL AND longitude IS NOT NULL + AND latitude<>0 AND longitude<>0 + AND COALESCE(z,0)=7 AND COALESCE(y,0)=40 AND COALESCE(x,0)=102` + var s stationInfo + err := db.QueryRow(q, stationID).Scan(&s.ID, &s.Alias, &s.Lat, &s.Lon, &s.Z, &s.Y, &s.X) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, nil + } + return nil, err + } + return []stationInfo{s}, nil + } + const qAll = ` + SELECT station_id, + CASE WHEN COALESCE(station_alias,'')='' THEN station_id ELSE station_alias END AS alias, + latitude, longitude, + COALESCE(z,0), COALESCE(y,0), COALESCE(x,0) + FROM stations + WHERE device_type='WH65LP' + AND latitude IS NOT NULL AND longitude IS NOT NULL + AND latitude<>0 AND longitude<>0 + AND COALESCE(z,0)=7 AND COALESCE(y,0)=40 AND COALESCE(x,0)=102 + ORDER BY station_id` + rows, err := db.Query(qAll) + if err != nil { + return nil, err + } + defer rows.Close() + var out []stationInfo + for rows.Next() { + var s stationInfo + if err := rows.Scan(&s.ID, &s.Alias, &s.Lat, &s.Lon, &s.Z, &s.Y, &s.X); err == nil { + out = append(out, s) + } + } + return out, nil +} + +func aggregateHourlyObs(ctx context.Context, db *sql.DB, stationID string, from, to time.Time) (rain float64, windSpeed float64, windDir float64, ok bool, err error) { + const q = ` + SELECT wind_speed_ms_x1000, wind_dir_deg, rain_10m_mm_x1000 + FROM rs485_weather_10min + WHERE station_id=$1 AND bucket_start >= $2 AND bucket_start < $3` + rows, err := db.QueryContext(ctx, q, stationID, from, to) + if err != nil { + return 0, 0, 0, false, err + } + defer rows.Close() + + var totalRain int64 + var count int + var sumX, sumY float64 + + for rows.Next() { + var ws sql.NullInt64 + var wd sql.NullInt64 + var rainX sql.NullInt64 + if err := rows.Scan(&ws, &wd, &rainX); err != nil { + return 0, 0, 0, false, err + } + if rainX.Valid { + totalRain += rainX.Int64 + } + if ws.Valid && wd.Valid { + speed := float64(ws.Int64) / 1000.0 + dir := float64(wd.Int64) + rad := toRad(dir) + sumX += speed * math.Cos(rad) + sumY += speed * math.Sin(rad) + count++ + } + } + if err := rows.Err(); err != nil { + return 0, 0, 0, false, err + } + + rain = float64(totalRain) / 1000.0 + windSpeed = math.NaN() + windDir = math.NaN() + if count > 0 { + avgX := sumX / float64(count) + avgY := sumY / float64(count) + windSpeed = math.Hypot(avgX, avgY) + if windSpeed == 0 { + windDir = 0 + } else { + dir := toDeg(math.Atan2(avgY, avgX)) + if dir < 0 { + dir += 360 + } + windDir = dir + } + ok = true + } + return rain, windSpeed, windDir, totalRain > 0 || count > 0, nil +} + +func findLatestForecast(ctx context.Context, db *sql.DB, stationID, provider string, forecastTime time.Time) (rain float64, issued time.Time, ok bool, err error) { + const q = ` + SELECT issued_at, rain_mm_x1000 + FROM forecast_hourly + WHERE station_id=$1 AND provider=$2 AND forecast_time=$3 + ORDER BY issued_at DESC + LIMIT 1` + var issuedAt time.Time + var rainX sql.NullInt64 + err = db.QueryRowContext(ctx, q, stationID, provider, forecastTime).Scan(&issuedAt, &rainX) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return 0, time.Time{}, false, nil + } + return 0, time.Time{}, false, err + } + if !rainX.Valid { + return 0, issuedAt, true, nil + } + return float64(rainX.Int64) / 1000.0, issuedAt, true, nil +} + +func hourlyRadarMax(ctx context.Context, db *sql.DB, s stationInfo, from, to time.Time, loc *time.Location, verbose bool) (circleMax float64, sectorMax float64, ok bool, err error) { + tiles, err := listTiles(ctx, db, s.Z, s.Y, s.X, from, to) + if err != nil { + return math.NaN(), math.NaN(), false, err + } + if len(tiles) == 0 { + return math.NaN(), math.NaN(), false, nil + } + circleMax = math.NaN() + sectorMax = math.NaN() + alias := strings.TrimSpace(s.Alias) + if alias == "" { + alias = s.ID + } + + for _, t := range tiles { + bucket := bucket10(t.DT, loc) + windSpeed, windDir, windOK, err := loadWindAt(db, s.ID, alias, bucket) + if err != nil { + if verbose { + log.Printf("站点 %s 瓦片@%s 读取风失败: %v", s.ID, t.DT.Format(time.RFC3339), err) + } + continue + } + + vals, xs, ys, err := decodeTile(t) + if err != nil { + if verbose { + log.Printf("站点 %s 解码瓦片失败: %v", s.ID, err) + } + continue + } + + for r := 0; r < len(vals); r++ { + row := vals[r] + lat := ys[r] + for c := 0; c < len(row); c++ { + v := row[c] + if v == nil { + continue + } + dbz := *v + lon := xs[c] + dist := haversine(s.Lat, s.Lon, lat, lon) + + if dist <= 8000.0 { + if math.IsNaN(circleMax) || dbz > circleMax { + circleMax = dbz + } + } + + if windOK && windSpeed > 0 { + brg := bearingDeg(s.Lat, s.Lon, lat, lon) + if angDiff(brg, windDir) <= 30.0 && dist <= windSpeed*3*3600 { + if math.IsNaN(sectorMax) || dbz > sectorMax { + sectorMax = dbz + } + } + } + } + } + } + return circleMax, sectorMax, !math.IsNaN(circleMax) || !math.IsNaN(sectorMax), nil +} + +func listTiles(ctx context.Context, db *sql.DB, z, y, x int, from, to time.Time) ([]tileRec, error) { + const q = ` + SELECT dt, width, height, west, south, east, north, res_deg, data + FROM radar_tiles + WHERE z=$1 AND y=$2 AND x=$3 AND dt >= $4 AND dt < $5 + ORDER BY dt ASC` + rows, err := db.QueryContext(ctx, q, z, y, x, from, to) + if err != nil { + return nil, err + } + defer rows.Close() + var out []tileRec + for rows.Next() { + var r tileRec + if err := rows.Scan(&r.DT, &r.Width, &r.Height, &r.West, &r.South, &r.East, &r.North, &r.ResDeg, &r.Data); err == nil { + out = append(out, r) + } + } + return out, nil +} + +func bucket10(t time.Time, loc *time.Location) time.Time { + tt := t.In(loc) + m := (tt.Minute() / 10) * 10 + return time.Date(tt.Year(), tt.Month(), tt.Day(), tt.Hour(), m, 0, 0, loc) +} + +func loadWindAt(db *sql.DB, stationID, alias string, dt time.Time) (speedMS float64, dirDeg float64, ok bool, err error) { + const q = ` + SELECT wind_speed, wind_direction + FROM radar_weather + WHERE alias=$1 AND dt=$2 + LIMIT 1` + tryAlias := func(a string) (float64, float64, bool, error) { + var s, d sql.NullFloat64 + err := db.QueryRow(q, a, dt).Scan(&s, &d) + if err == sql.ErrNoRows { + return 0, 0, false, nil + } + if err != nil { + return 0, 0, false, err + } + if !s.Valid || !d.Valid { + return 0, 0, false, nil + } + return s.Float64, d.Float64, true, nil + } + if speed, dir, ok, err := tryAlias(stationID); err != nil { + return 0, 0, false, err + } else if ok { + return speed, dir, true, nil + } + return tryAlias(alias) +} + +func decodeTile(t tileRec) (vals [][]*float64, xs []float64, ys []float64, err error) { + w, h := t.Width, t.Height + if w <= 0 || h <= 0 { + return nil, nil, nil, fmt.Errorf("非法尺寸") + } + if len(t.Data) < w*h*2 { + return nil, nil, nil, fmt.Errorf("数据长度不足") + } + xs = make([]float64, w) + for c := 0; c < w; c++ { + xs[c] = t.West + (float64(c)+0.5)*t.ResDeg + } + ys = make([]float64, h) + for r := 0; r < h; r++ { + ys[r] = t.South + (float64(r)+0.5)*t.ResDeg + } + vals = make([][]*float64, h) + off := 0 + for r := 0; r < h; r++ { + row := make([]*float64, w) + for c := 0; c < w; c++ { + v := int16(binary.BigEndian.Uint16(t.Data[off : off+2])) + off += 2 + if v >= 32766 { + row[c] = nil + continue + } + dbz := float64(v) / 10.0 + if dbz < 0 { + dbz = 0 + } else if dbz > 75 { + dbz = 75 + } + value := dbz + row[c] = &value + } + vals[r] = row + } + return vals, xs, ys, nil +} + +func haversine(lat1, lon1, lat2, lon2 float64) float64 { + const R = 6371000.0 + dLat := toRad(lat2 - lat1) + dLon := toRad(lon2 - lon1) + a := math.Sin(dLat/2)*math.Sin(dLat/2) + math.Cos(toRad(lat1))*math.Cos(toRad(lat2))*math.Sin(dLon/2)*math.Sin(dLon/2) + c := 2 * math.Atan2(math.Sqrt(a), math.Sqrt(1-a)) + return R * c +} + +func bearingDeg(lat1, lon1, lat2, lon2 float64) float64 { + φ1 := toRad(lat1) + φ2 := toRad(lat2) + Δλ := toRad(lon2 - lon1) + y := math.Sin(Δλ) * math.Cos(φ2) + x := math.Cos(φ1)*math.Sin(φ2) - math.Sin(φ1)*math.Cos(φ2)*math.Cos(Δλ) + brg := toDeg(math.Atan2(y, x)) + if brg < 0 { + brg += 360 + } + return brg +} + +func angDiff(a, b float64) float64 { + d := math.Mod(a-b+540, 360) - 180 + if d < 0 { + d = -d + } + return math.Abs(d) +} + +func toRad(d float64) float64 { return d * math.Pi / 180 } +func toDeg(r float64) float64 { return r * 180 / math.Pi } + +func formatFloat(v float64, ok bool, digits int) string { + if !ok || math.IsNaN(v) { + return "" + } + format := fmt.Sprintf("%%.%df", digits) + return fmt.Sprintf(format, v) +} + +func formatTime(t time.Time, ok bool) string { + if !ok || t.IsZero() { + return "" + } + return t.Format("2006-01-02 15:04:05") +} diff --git a/cmd/rainfetch/main.go b/cmd/rainfetch/main.go new file mode 100644 index 0000000..69166a7 --- /dev/null +++ b/cmd/rainfetch/main.go @@ -0,0 +1,227 @@ +package main + +import ( + "context" + "database/sql" + "flag" + "fmt" + "io" + "log" + "net/http" + "os" + "path/filepath" + "strings" + "time" + + dbpkg "weatherstation/internal/database" + "weatherstation/internal/rain" +) + +// 简单的小时雨量(CMPA)按时间范围下载器: +// - 输入时间为北京时间(Asia/Shanghai) +// - 构造下载路径使用 UTC(本地整点 -8h) +// - 入库前通过 rain.StoreTileBytes 使用 URL 解析将 UTC 还原为北京时间并写库 +// 用法示例: +// +// go run ./cmd/rainfetch --from "2025-10-07 09:00:00" --to "2025-10-07 11:00:00" \ +// --tiles "7/40/102,7/40/104" --outdir rain_data +func main() { + var ( + fromStr = flag.String("from", "", "起始时间(北京时间,YYYY-MM-DD HH:MM:SS 或 YYYY-MM-DD)") + toStr = flag.String("to", "", "结束时间(北京时间,YYYY-MM-DD HH:MM:SS 或 YYYY-MM-DD)") + tiles = flag.String("tiles", "7/40/102,7/40/104", "瓦片列表,逗号分隔,每项为 z/y/x,如 7/40/102") + outDir = flag.String("outdir", "rain_data", "保存目录(同时也会写入数据库)") + baseURL = flag.String("base", "https://image.data.cma.cn/tiles/China/CMPA_RT_China_0P01_HOR-PRE_GISJPG_Tiles/%Y%m%d/%H/%M/{z}/{y}/{x}.bin", "下载基础URL模板(UTC路径时间)") + dryRun = flag.Bool("dry", false, "仅打印将要下载的URL与目标,不实际下载写库") + ) + flag.Parse() + + if strings.TrimSpace(*fromStr) == "" || strings.TrimSpace(*toStr) == "" { + log.Fatalln("必须提供 --from 与 --to(北京时间)") + } + + loc, _ := time.LoadLocation("Asia/Shanghai") + if loc == nil { + loc = time.FixedZone("CST", 8*3600) + } + + parseCST := func(s string) (time.Time, error) { + s = strings.TrimSpace(s) + var t time.Time + var err error + if len(s) == len("2006-01-02") { + // 日期:按 00:00:00 处理 + if tm, e := time.ParseInLocation("2006-01-02", s, loc); e == nil { + t = tm + } else { + err = e + } + } else { + t, err = time.ParseInLocation("2006-01-02 15:04:05", s, loc) + } + return t, err + } + + start, err1 := parseCST(*fromStr) + end, err2 := parseCST(*toStr) + if err1 != nil || err2 != nil { + log.Fatalf("解析时间失败: from=%v to=%v", err1, err2) + } + if end.Before(start) { + log.Fatalln("结束时间需不小于起始时间") + } + + // 小时步进(包含端点):先对齐到小时 + cur := start.Truncate(time.Hour) + end = end.Truncate(time.Hour) + + // 解析 tiles 参数 + type tcoord struct{ z, y, x int } + var tlist []tcoord + for _, part := range strings.Split(*tiles, ",") { + p := strings.TrimSpace(part) + if p == "" { + continue + } + var z, y, x int + if _, err := fmt.Sscanf(p, "%d/%d/%d", &z, &y, &x); err != nil { + log.Fatalf("无效的 tiles 项: %s", p) + } + tlist = append(tlist, tcoord{z, y, x}) + } + if len(tlist) == 0 { + log.Fatalln("tiles 解析后为空") + } + + if err := os.MkdirAll(*outDir, 0o755); err != nil { + log.Fatalf("创建输出目录失败: %v", err) + } + + ctx := context.Background() + total := 0 + success := 0 + for !cur.After(end) { + // 本地整点(CST)→ UTC 路径时间 + slotLocal := cur + slotUTC := slotLocal.Add(-8 * time.Hour).In(time.UTC) + dateStr := slotUTC.Format("20060102") + hh := slotUTC.Format("15") + mm := "00" + log.Printf("[rainfetch] 时次 local=%s, utc=%s", slotLocal.Format("2006-01-02 15:04"), slotUTC.Format("2006-01-02 15:04")) + + for _, tc := range tlist { + total++ + // 构造 URL + url := *baseURL + url = strings.ReplaceAll(url, "%Y%m%d", dateStr) + url = strings.ReplaceAll(url, "%H", hh) + url = strings.ReplaceAll(url, "%M", mm) + url = strings.ReplaceAll(url, "{z}", fmt.Sprintf("%d", tc.z)) + url = strings.ReplaceAll(url, "{y}", fmt.Sprintf("%d", tc.y)) + url = strings.ReplaceAll(url, "{x}", fmt.Sprintf("%d", tc.x)) + + fname := fmt.Sprintf("rain_z%d_y%d_x%d_%s.bin", tc.z, tc.y, tc.x, slotLocal.Format("20060102_1504")) + dest := filepath.Join(*outDir, fname) + + if *dryRun { + log.Printf("[rainfetch] DRY url=%s -> %s", url, dest) + continue + } + + // 若 DB 已有,则跳过 + if ref, e := rain.ParseCMPATileURL(url); e == nil { + exists, e2 := databaseHas(ctx, ref.Product, ref.DT, tc.z, tc.y, tc.x) + if e2 == nil && exists { + log.Printf("[rainfetch] skip exists in DB z=%d y=%d x=%d dt=%s", tc.z, tc.y, tc.x, ref.DT.Format("2006-01-02 15:04")) + continue + } + } + + if err := httpDownloadTo(ctx, url, dest); err != nil { + log.Printf("[rainfetch] 下载失败 z=%d y=%d x=%d: %v", tc.z, tc.y, tc.x, err) + continue + } + log.Printf("[rainfetch] 保存 %s", dest) + + // 写库(使用 URL 解析 UTC → CST 后 upsert) + b, rerr := os.ReadFile(dest) + if rerr != nil { + log.Printf("[rainfetch] 读文件失败: %v", rerr) + continue + } + if err := rain.StoreTileBytes(ctx, url, b); err != nil { + log.Printf("[rainfetch] 入库失败: %v", err) + continue + } + success++ + } + cur = cur.Add(1 * time.Hour) + } + log.Printf("[rainfetch] 完成:尝试 %d,成功 %d", total, success) +} + +// 轻量 DB 存在性检查(避免引入内部 database 包到该命令): +// 为了避免循环依赖,这里复制一份最小 SQL 调用;实际工程也可抽取共享函数。 +// 但当前 repo 中 database.GetDB 在 internal/database 包内,雨量 API 直接使用它。 + +// 注意:为保持最小侵入,这里通过 rain.StoreTileBytes 完成入库; +// 仅在下载前进行“是否已存在”查询,避免重复下载。为此需要访问 internal/database。 + +func databaseHas(ctx context.Context, product string, dt time.Time, z, y, x int) (bool, error) { + const q = `SELECT 1 FROM rain_tiles WHERE product=$1 AND dt=$2 AND z=$3 AND y=$4 AND x=$5 LIMIT 1` + var one int + err := dbpkg.GetDB().QueryRowContext(ctx, q, product, dt, z, y, x).Scan(&one) + if err == sql.ErrNoRows { + return false, nil + } + if err != nil { + return false, err + } + return true, nil +} + +func httpDownloadTo(ctx context.Context, url, dest string) error { + client := &http.Client{Timeout: 20 * time.Second} + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return fmt.Errorf("build request: %w", err) + } + req.Header.Set("Referer", "https://data.cma.cn/") + req.Header.Set("Origin", "https://data.cma.cn") + req.Header.Set("User-Agent", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36") + resp, err := client.Do(req) + if err != nil { + return fmt.Errorf("http get: %w", err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("unexpected status: %d", resp.StatusCode) + } + tmp := dest + ".part" + f, err := os.Create(tmp) + if err != nil { + return fmt.Errorf("create temp: %w", err) + } + _, copyErr := io.Copy(f, resp.Body) + closeErr := f.Close() + if copyErr != nil { + _ = os.Remove(tmp) + return fmt.Errorf("write body: %w", copyErr) + } + if closeErr != nil { + _ = os.Remove(tmp) + return fmt.Errorf("close temp: %w", closeErr) + } + if err := os.Rename(tmp, dest); err != nil { + // Cross-device fallback + data, rerr := os.ReadFile(tmp) + if rerr != nil { + return fmt.Errorf("read temp: %w", rerr) + } + if werr := os.WriteFile(dest, data, 0o644); werr != nil { + return fmt.Errorf("write final: %w", werr) + } + _ = os.Remove(tmp) + } + return nil +} diff --git a/cmd/service-rain/main.go b/cmd/service-rain/main.go new file mode 100644 index 0000000..244cbb8 --- /dev/null +++ b/cmd/service-rain/main.go @@ -0,0 +1,28 @@ +package main + +import ( + "context" + "log" + "os" + "os/signal" + "syscall" + "weatherstation/internal/rain" + "weatherstation/internal/server" +) + +// service-rain: standalone CMPA hourly rain tile downloader +// - Uses internal/rain scheduler with defaults +// - Controlled by env vars in internal/rain (e.g., RAIN_ENABLED, RAIN_DIR, RAIN_BASE_URL) +func main() { + server.SetupLogger() + + ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + defer stop() + + if err := rain.Start(ctx, rain.Options{StoreToDB: true}); err != nil { + log.Fatalf("service-rain start error: %v", err) + } + + <-ctx.Done() + log.Println("service-rain shutting down") +} diff --git a/core/cmd/im_export_data/main.go b/core/cmd/im_export_data/main.go new file mode 100644 index 0000000..db70a89 --- /dev/null +++ b/core/cmd/im_export_data/main.go @@ -0,0 +1,235 @@ +package main + +import ( + "context" + "database/sql" + "encoding/csv" + "flag" + "fmt" + "log" + "os" + "strings" + "time" + + "weatherstation/core/internal/data" +) + +type actualHour struct { + HourEnd time.Time + TempC float64 + HumidityPct float64 + PressureHpa float64 + WindSpeedMs float64 + WindDirDeg float64 + RainActualMM float64 +} + +func main() { + var stationID, startStr, endStr, providersCSV, outPath, tzName string + flag.StringVar(&stationID, "station", "", "站点ID,如 RS485-002A6E") + flag.StringVar(&startStr, "start", "", "开始时间,格式 2006-01-02 15:00") + flag.StringVar(&endStr, "end", "", "结束时间,格式 2006-01-02 15:00(开区间)") + flag.StringVar(&providersCSV, "providers", "caiyun,ec,wrf", "逗号分隔的预报源,默认 caiyun,ec,wrf") + flag.StringVar(&outPath, "out", "", "输出 CSV 文件路径;留空输出到 stdout") + flag.StringVar(&tzName, "tz", "Asia/Shanghai", "时区,例如 Asia/Shanghai") + flag.Parse() + + if strings.TrimSpace(stationID) == "" || strings.TrimSpace(startStr) == "" || strings.TrimSpace(endStr) == "" { + log.Fatalf("用法: im_export_data --station RS485-XXXXXX --start '2024-08-01 00:00' --end '2024-08-02 00:00' [--providers caiyun,ec,wrf] [--out out.csv]") + } + + loc, _ := time.LoadLocation(tzName) + if loc == nil { + loc = time.FixedZone("CST", 8*3600) + } + parse := func(s string) time.Time { + var t time.Time + var err error + for _, ly := range []string{"2006-01-02 15:04", "2006-01-02 15", "2006-01-02"} { + t, err = time.ParseInLocation(ly, s, loc) + if err == nil { + return t.Truncate(time.Hour) + } + } + log.Fatalf("无法解析时间: %s", s) + return time.Time{} + } + start := parse(startStr) + end := parse(endStr) + if !end.After(start) { + log.Fatalf("end 必须大于 start") + } + + providers := splitCSV(providersCSV) + if len(providers) == 0 { + providers = []string{"caiyun"} + } + + // Prepare writer + var out *csv.Writer + var file *os.File + if strings.TrimSpace(outPath) != "" { + f, err := os.Create(outPath) + if err != nil { + log.Fatalf("打开输出文件失败: %v", err) + } + defer f.Close() + out = csv.NewWriter(f) + file = f + } else { + out = csv.NewWriter(os.Stdout) + } + defer out.Flush() + + // Header + header := []string{"station_id", "hour_end", "temp_c", "humidity_pct", "wind_dir_deg", "wind_speed_ms", "pressure_hpa", "rain_actual_mm"} + for _, p := range providers { + header = append(header, fmt.Sprintf("%s_lead1_rain_mm", p)) + header = append(header, fmt.Sprintf("%s_lead2_rain_mm", p)) + header = append(header, fmt.Sprintf("%s_lead3_rain_mm", p)) + } + if err := out.Write(header); err != nil { + log.Fatalf("写入 CSV 失败: %v", err) + } + + ctx := context.Background() + rows, err := loadActualHourly(ctx, stationID, start, end) + if err != nil { + log.Fatalf("查询实况失败: %v", err) + } + + for _, row := range rows { + rec := []string{ + stationID, + row.HourEnd.Format("2006-01-02 15:04:05"), + fmt.Sprintf("%.2f", row.TempC), + fmt.Sprintf("%.2f", row.HumidityPct), + fmt.Sprintf("%.2f", row.WindDirDeg), + fmt.Sprintf("%.3f", row.WindSpeedMs), + fmt.Sprintf("%.2f", row.PressureHpa), + fmt.Sprintf("%.3f", row.RainActualMM), + } + for _, p := range providers { + // For each lead 1..3, get rain for forecast_time = hour_end, latest issued_at for that lead + for lead := 1; lead <= 3; lead++ { + v, _ := loadProviderRainAt(ctx, stationID, p, row.HourEnd, lead) + if v < 0 { + rec = append(rec, "") + } else { + rec = append(rec, fmt.Sprintf("%.3f", v)) + } + } + } + if err := out.Write(rec); err != nil { + log.Fatalf("写入 CSV 失败: %v", err) + } + } + out.Flush() + if err := out.Error(); err != nil { + log.Fatalf("写入 CSV 错误: %v", err) + } + if file != nil { + log.Printf("导出完成: %s,共 %d 行", outPath, len(rows)) + } +} + +func splitCSV(s string) []string { + parts := strings.Split(s, ",") + out := make([]string, 0, len(parts)) + for _, p := range parts { + p = strings.TrimSpace(p) + if p != "" { + out = append(out, p) + } + } + return out +} + +func loadActualHourly(ctx context.Context, stationID string, start, end time.Time) ([]actualHour, error) { + // Right-endpoint hourly aggregation from rs485_weather_10min + const q = ` +WITH base AS ( + SELECT * FROM rs485_weather_10min + WHERE station_id = $1 AND bucket_start >= $2 AND bucket_start < $3 +), g AS ( + SELECT date_trunc('hour', bucket_start) AS grp, + SUM(temp_c_x100 * sample_count)::bigint AS w_temp, + SUM(humidity_pct * sample_count)::bigint AS w_hum, + SUM(pressure_hpa_x100 * sample_count)::bigint AS w_p, + SUM(solar_wm2_x100 * sample_count)::bigint AS w_solar, + SUM(uv_index * sample_count)::bigint AS w_uv, + SUM(wind_speed_ms_x1000 * sample_count)::bigint AS w_ws, + MAX(wind_gust_ms_x1000) AS gust_max, + SUM(sin(radians(wind_dir_deg)) * sample_count)::double precision AS sin_sum, + SUM(cos(radians(wind_dir_deg)) * sample_count)::double precision AS cos_sum, + SUM(rain_10m_mm_x1000) AS rain_sum, + SUM(sample_count) AS n_sum + FROM base GROUP BY 1 +) +SELECT grp + interval '1 hour' AS hour_end, + (w_temp/NULLIF(n_sum,0))/100.0 AS temp_c, + (w_hum/NULLIF(n_sum,0))::double precision AS humidity_pct, + (w_p/NULLIF(n_sum,0))/100.0 AS pressure_hpa, + (w_ws/NULLIF(n_sum,0))/1000.0 AS wind_speed_ms, + CASE WHEN sin_sum IS NULL OR cos_sum IS NULL THEN NULL + ELSE ( + CASE WHEN degrees(atan2(sin_sum, cos_sum)) < 0 + THEN degrees(atan2(sin_sum, cos_sum)) + 360 + ELSE degrees(atan2(sin_sum, cos_sum)) END) + END AS wind_dir_deg, + (rain_sum/1000.0) AS rain_mm +FROM g +ORDER BY hour_end` + + rows, err := data.DB().QueryContext(ctx, q, stationID, start, end) + if err != nil { + return nil, err + } + defer rows.Close() + var out []actualHour + for rows.Next() { + var t time.Time + var ta, ua, pa, ws, dm, rain sql.NullFloat64 + if err := rows.Scan(&t, &ta, &ua, &pa, &ws, &dm, &rain); err != nil { + continue + } + out = append(out, actualHour{ + HourEnd: t, + TempC: nullF(ta), + HumidityPct: nullF(ua), + PressureHpa: nullF(pa), + WindSpeedMs: nullF(ws), + WindDirDeg: nullF(dm), + RainActualMM: nullF(rain), + }) + } + return out, nil +} + +// loadProviderRainAt returns rain(mm) for a provider at forecast_time=t with fixed lead, picking latest issued_at. +func loadProviderRainAt(ctx context.Context, stationID, provider string, t time.Time, lead int) (float64, error) { + const q = ` +SELECT COALESCE(rain_mm_x1000,0)::bigint +FROM ( + SELECT rain_mm_x1000, issued_at, + CEIL(EXTRACT(EPOCH FROM ($3 - issued_at)) / 3600.0)::int AS lead_hours + FROM forecast_hourly + WHERE station_id=$1 AND provider=$2 AND forecast_time=$3 +) x +WHERE lead_hours=$4 +ORDER BY issued_at DESC +LIMIT 1` + var v int64 + err := data.DB().QueryRowContext(ctx, q, stationID, provider, t, lead).Scan(&v) + if err != nil { + return -1, err + } + return float64(v) / 1000.0, nil +} + +func nullF(n sql.NullFloat64) float64 { + if n.Valid { + return n.Float64 + } + return 0 +}