package main import ( "context" "database/sql" "encoding/binary" "encoding/csv" "errors" "flag" "fmt" "log" "math" "os" "strings" "time" "weatherstation/internal/database" ) type stationInfo struct { ID string Alias string Lat float64 Lon float64 Z int Y int X int } type tileRec struct { DT time.Time Width, Height int West, South float64 East, North float64 ResDeg float64 Data []byte } func main() { var stationID string var startStr string var endStr string var outPath string var verbose bool var useImdroid bool flag.StringVar(&stationID, "station_id", "", "站点ID(留空表示全部符合条件的站)") flag.StringVar(&startStr, "start", "", "起始时间(YYYY-MM-DD HH:MM:SS,CST,表示区间左端点)") flag.StringVar(&endStr, "end", "", "结束时间(YYYY-MM-DD HH:MM:SS,CST,表示区间左端点,非包含)") flag.StringVar(&outPath, "out", "radar_hourly_stats.csv", "输出CSV文件路径") flag.BoolVar(&verbose, "info", false, "输出详细过程信息") flag.BoolVar(&useImdroid, "use_imdroid", false, "输出 imdroid 预报(右端点)") flag.Parse() if strings.TrimSpace(startStr) == "" || strings.TrimSpace(endStr) == "" { log.Fatalln("必须提供 --start 与 --end,格式 YYYY-MM-DD HH:MM:SS") } loc, _ := time.LoadLocation("Asia/Shanghai") if loc == nil { loc = time.FixedZone("CST", 8*3600) } startT, err := time.ParseInLocation("2006-01-02 15:04:05", startStr, loc) if err != nil { log.Fatalf("解析 start 失败: %v", err) } endT, err := time.ParseInLocation("2006-01-02 15:04:05", endStr, loc) if err != nil { log.Fatalf("解析 end 失败: %v", err) } if !endT.After(startT) { log.Fatalln("结束时间必须大于起始时间") } _ = database.GetDB() defer database.Close() stations, err := listStations(database.GetDB(), stationID) if err != nil { log.Fatalf("查询站点失败: %v", err) } if len(stations) == 0 { log.Fatalln("没有符合条件的站点") } if verbose { log.Printf("站点数量: %d", len(stations)) for _, s := range stations { log.Printf("站点: id=%s alias=%s lat=%.5f lon=%.5f z/y/x=%d/%d/%d", s.ID, s.Alias, s.Lat, s.Lon, s.Z, s.Y, s.X) } } f, err := os.Create(outPath) if err != nil { log.Fatalf("创建输出文件失败: %v", err) } defer f.Close() w := csv.NewWriter(f) defer w.Flush() header := []string{ "station_id", "station_alias", "hour_end", "rain_actual_mm", "wind_speed_ms", "wind_dir_deg", "openmeteo_rain_mm", "openmeteo_issued", "caiyun_rain_mm", "caiyun_issued", } if useImdroid { header = append(header, "imdroid_rain_mm", "imdroid_issued") } header = append(header, "radar_circle_max_dbz", "radar_sector_max_dbz") if err := w.Write(header); err != nil { log.Fatalf("写入CSV表头失败: %v", err) } ctx := context.Background() totalRows := 0 hours := buildHourSlots(startT, endT) for _, s := range stations { if verbose { log.Printf("处理站点 %s,共 %d 个小时区间", s.ID, len(hours)) } for _, slot := range hours { actual, windSpeed, windDir, hasObs, err := aggregateHourlyObs(ctx, database.GetDB(), s.ID, slot.from, slot.to) if err != nil { log.Printf("站点 %s 聚合观测失败 @%s: %v", s.ID, slot.to.Format(time.RFC3339), err) continue } openRain, openIssued, hasOpen, err := findLatestForecast(ctx, database.GetDB(), s.ID, "open-meteo", slot.to) if err != nil { log.Printf("站点 %s 读取 open-meteo 预报失败 @%s: %v", s.ID, slot.to.Format(time.RFC3339), err) } caiyunRain, caiyunIssued, hasCaiyun, err := findLatestForecast(ctx, database.GetDB(), s.ID, "caiyun", slot.to) if err != nil { log.Printf("站点 %s 读取 caiyun 预报失败 @%s: %v", s.ID, slot.to.Format(time.RFC3339), err) } var ( imdroidRain float64 imdroidIssued time.Time hasImdroid bool ) if useImdroid { var errImdroid error imdroidRain, imdroidIssued, hasImdroid, errImdroid = findLatestForecast(ctx, database.GetDB(), s.ID, "imdroid", slot.to) if errImdroid != nil { log.Printf("站点 %s 读取 imdroid 预报失败 @%s: %v", s.ID, slot.to.Format(time.RFC3339), errImdroid) } } circleMax, sectorMax, hasRadar, err := hourlyRadarMax(ctx, database.GetDB(), s, slot.from, slot.to, loc, verbose) if err != nil { log.Printf("站点 %s 统计雷达失败 @%s: %v", s.ID, slot.to.Format(time.RFC3339), err) } rec := []string{ s.ID, s.Alias, slot.to.Format("2006-01-02 15:04:05"), formatFloat(actual, hasObs, 3), formatFloat(windSpeed, hasObs && !math.IsNaN(windSpeed), 3), formatFloat(windDir, hasObs && !math.IsNaN(windDir), 1), formatFloat(openRain, hasOpen, 3), formatTime(openIssued, hasOpen), formatFloat(caiyunRain, hasCaiyun, 3), formatTime(caiyunIssued, hasCaiyun), } if useImdroid { rec = append(rec, formatFloat(imdroidRain, hasImdroid, 3), formatTime(imdroidIssued, hasImdroid), ) } rec = append(rec, formatFloat(circleMax, hasRadar && !math.IsNaN(circleMax), 1), formatFloat(sectorMax, hasRadar && !math.IsNaN(sectorMax), 1), ) if err := w.Write(rec); err != nil { log.Printf("写入CSV失败: %v", err) } else { totalRows++ } } } w.Flush() if err := w.Error(); err != nil { log.Fatalf("写入CSV失败: %v", err) } log.Printf("完成,输出 %d 行到 %s", totalRows, outPath) } type hourSlot struct { from time.Time to time.Time } func buildHourSlots(from, to time.Time) []hourSlot { var slots []hourSlot cursor := from for cursor.Before(to) { end := cursor.Add(time.Hour) if end.After(to) { end = to } slots = append(slots, hourSlot{from: cursor, to: end}) cursor = end } return slots } func listStations(db *sql.DB, stationID string) ([]stationInfo, error) { if strings.TrimSpace(stationID) != "" { const q = ` SELECT station_id, CASE WHEN COALESCE(station_alias,'')='' THEN station_id ELSE station_alias END AS alias, latitude, longitude, COALESCE(z,0), COALESCE(y,0), COALESCE(x,0) FROM stations WHERE station_id=$1 AND latitude IS NOT NULL AND longitude IS NOT NULL AND latitude<>0 AND longitude<>0 AND COALESCE(z,0)=7 AND COALESCE(y,0)=40 AND COALESCE(x,0)=102` var s stationInfo err := db.QueryRow(q, stationID).Scan(&s.ID, &s.Alias, &s.Lat, &s.Lon, &s.Z, &s.Y, &s.X) if err != nil { if errors.Is(err, sql.ErrNoRows) { return nil, nil } return nil, err } return []stationInfo{s}, nil } const qAll = ` SELECT station_id, CASE WHEN COALESCE(station_alias,'')='' THEN station_id ELSE station_alias END AS alias, latitude, longitude, COALESCE(z,0), COALESCE(y,0), COALESCE(x,0) FROM stations WHERE device_type='WH65LP' AND latitude IS NOT NULL AND longitude IS NOT NULL AND latitude<>0 AND longitude<>0 AND COALESCE(z,0)=7 AND COALESCE(y,0)=40 AND COALESCE(x,0)=102 ORDER BY station_id` rows, err := db.Query(qAll) if err != nil { return nil, err } defer rows.Close() var out []stationInfo for rows.Next() { var s stationInfo if err := rows.Scan(&s.ID, &s.Alias, &s.Lat, &s.Lon, &s.Z, &s.Y, &s.X); err == nil { out = append(out, s) } } return out, nil } func aggregateHourlyObs(ctx context.Context, db *sql.DB, stationID string, from, to time.Time) (rain float64, windSpeed float64, windDir float64, ok bool, err error) { const q = ` SELECT wind_speed_ms_x1000, wind_dir_deg, rain_10m_mm_x1000 FROM rs485_weather_10min WHERE station_id=$1 AND bucket_start >= $2 AND bucket_start < $3` rows, err := db.QueryContext(ctx, q, stationID, from, to) if err != nil { return 0, 0, 0, false, err } defer rows.Close() var totalRain int64 var count int var sumX, sumY float64 for rows.Next() { var ws sql.NullInt64 var wd sql.NullInt64 var rainX sql.NullInt64 if err := rows.Scan(&ws, &wd, &rainX); err != nil { return 0, 0, 0, false, err } if rainX.Valid { totalRain += rainX.Int64 } if ws.Valid && wd.Valid { speed := float64(ws.Int64) / 1000.0 dir := float64(wd.Int64) rad := toRad(dir) sumX += speed * math.Cos(rad) sumY += speed * math.Sin(rad) count++ } } if err := rows.Err(); err != nil { return 0, 0, 0, false, err } rain = float64(totalRain) / 1000.0 windSpeed = math.NaN() windDir = math.NaN() if count > 0 { avgX := sumX / float64(count) avgY := sumY / float64(count) windSpeed = math.Hypot(avgX, avgY) if windSpeed == 0 { windDir = 0 } else { dir := toDeg(math.Atan2(avgY, avgX)) if dir < 0 { dir += 360 } windDir = dir } ok = true } return rain, windSpeed, windDir, totalRain > 0 || count > 0, nil } func findLatestForecast(ctx context.Context, db *sql.DB, stationID, provider string, forecastTime time.Time) (rain float64, issued time.Time, ok bool, err error) { const q = ` SELECT issued_at, rain_mm_x1000 FROM forecast_hourly WHERE station_id=$1 AND provider=$2 AND forecast_time=$3 ORDER BY issued_at DESC LIMIT 1` var issuedAt time.Time var rainX sql.NullInt64 err = db.QueryRowContext(ctx, q, stationID, provider, forecastTime).Scan(&issuedAt, &rainX) if err != nil { if errors.Is(err, sql.ErrNoRows) { return 0, time.Time{}, false, nil } return 0, time.Time{}, false, err } if !rainX.Valid { return 0, issuedAt, true, nil } return float64(rainX.Int64) / 1000.0, issuedAt, true, nil } func hourlyRadarMax(ctx context.Context, db *sql.DB, s stationInfo, from, to time.Time, loc *time.Location, verbose bool) (circleMax float64, sectorMax float64, ok bool, err error) { tiles, err := listTiles(ctx, db, s.Z, s.Y, s.X, from, to) if err != nil { return math.NaN(), math.NaN(), false, err } if len(tiles) == 0 { return math.NaN(), math.NaN(), false, nil } circleMax = math.NaN() sectorMax = math.NaN() alias := strings.TrimSpace(s.Alias) if alias == "" { alias = s.ID } for _, t := range tiles { bucket := bucket10(t.DT, loc) windSpeed, windDir, windOK, err := loadWindAt(db, s.ID, alias, bucket) if err != nil { if verbose { log.Printf("站点 %s 瓦片@%s 读取风失败: %v", s.ID, t.DT.Format(time.RFC3339), err) } continue } vals, xs, ys, err := decodeTile(t) if err != nil { if verbose { log.Printf("站点 %s 解码瓦片失败: %v", s.ID, err) } continue } for r := 0; r < len(vals); r++ { row := vals[r] lat := ys[r] for c := 0; c < len(row); c++ { v := row[c] if v == nil { continue } dbz := *v lon := xs[c] dist := haversine(s.Lat, s.Lon, lat, lon) if dist <= 8000.0 { if math.IsNaN(circleMax) || dbz > circleMax { circleMax = dbz } } if windOK && windSpeed > 0 { brg := bearingDeg(s.Lat, s.Lon, lat, lon) if angDiff(brg, windDir) <= 30.0 && dist <= windSpeed*3*3600 { if math.IsNaN(sectorMax) || dbz > sectorMax { sectorMax = dbz } } } } } } return circleMax, sectorMax, !math.IsNaN(circleMax) || !math.IsNaN(sectorMax), nil } func listTiles(ctx context.Context, db *sql.DB, z, y, x int, from, to time.Time) ([]tileRec, error) { const q = ` SELECT dt, width, height, west, south, east, north, res_deg, data FROM radar_tiles WHERE z=$1 AND y=$2 AND x=$3 AND dt >= $4 AND dt < $5 ORDER BY dt ASC` rows, err := db.QueryContext(ctx, q, z, y, x, from, to) if err != nil { return nil, err } defer rows.Close() var out []tileRec for rows.Next() { var r tileRec if err := rows.Scan(&r.DT, &r.Width, &r.Height, &r.West, &r.South, &r.East, &r.North, &r.ResDeg, &r.Data); err == nil { out = append(out, r) } } return out, nil } func bucket10(t time.Time, loc *time.Location) time.Time { tt := t.In(loc) m := (tt.Minute() / 10) * 10 return time.Date(tt.Year(), tt.Month(), tt.Day(), tt.Hour(), m, 0, 0, loc) } func loadWindAt(db *sql.DB, stationID, alias string, dt time.Time) (speedMS float64, dirDeg float64, ok bool, err error) { const q = ` SELECT wind_speed, wind_direction FROM radar_weather WHERE alias=$1 AND dt=$2 LIMIT 1` tryAlias := func(a string) (float64, float64, bool, error) { var s, d sql.NullFloat64 err := db.QueryRow(q, a, dt).Scan(&s, &d) if err == sql.ErrNoRows { return 0, 0, false, nil } if err != nil { return 0, 0, false, err } if !s.Valid || !d.Valid { return 0, 0, false, nil } return s.Float64, d.Float64, true, nil } if speed, dir, ok, err := tryAlias(stationID); err != nil { return 0, 0, false, err } else if ok { return speed, dir, true, nil } return tryAlias(alias) } func decodeTile(t tileRec) (vals [][]*float64, xs []float64, ys []float64, err error) { w, h := t.Width, t.Height if w <= 0 || h <= 0 { return nil, nil, nil, fmt.Errorf("非法尺寸") } if len(t.Data) < w*h*2 { return nil, nil, nil, fmt.Errorf("数据长度不足") } xs = make([]float64, w) for c := 0; c < w; c++ { xs[c] = t.West + (float64(c)+0.5)*t.ResDeg } ys = make([]float64, h) for r := 0; r < h; r++ { ys[r] = t.South + (float64(r)+0.5)*t.ResDeg } vals = make([][]*float64, h) off := 0 for r := 0; r < h; r++ { row := make([]*float64, w) for c := 0; c < w; c++ { v := int16(binary.BigEndian.Uint16(t.Data[off : off+2])) off += 2 if v >= 32766 { row[c] = nil continue } dbz := float64(v) / 10.0 if dbz < 0 { dbz = 0 } else if dbz > 75 { dbz = 75 } value := dbz row[c] = &value } vals[r] = row } return vals, xs, ys, nil } func haversine(lat1, lon1, lat2, lon2 float64) float64 { const R = 6371000.0 dLat := toRad(lat2 - lat1) dLon := toRad(lon2 - lon1) a := math.Sin(dLat/2)*math.Sin(dLat/2) + math.Cos(toRad(lat1))*math.Cos(toRad(lat2))*math.Sin(dLon/2)*math.Sin(dLon/2) c := 2 * math.Atan2(math.Sqrt(a), math.Sqrt(1-a)) return R * c } func bearingDeg(lat1, lon1, lat2, lon2 float64) float64 { φ1 := toRad(lat1) φ2 := toRad(lat2) Δλ := toRad(lon2 - lon1) y := math.Sin(Δλ) * math.Cos(φ2) x := math.Cos(φ1)*math.Sin(φ2) - math.Sin(φ1)*math.Cos(φ2)*math.Cos(Δλ) brg := toDeg(math.Atan2(y, x)) if brg < 0 { brg += 360 } return brg } func angDiff(a, b float64) float64 { d := math.Mod(a-b+540, 360) - 180 if d < 0 { d = -d } return math.Abs(d) } func toRad(d float64) float64 { return d * math.Pi / 180 } func toDeg(r float64) float64 { return r * 180 / math.Pi } func formatFloat(v float64, ok bool, digits int) string { if !ok || math.IsNaN(v) { return "" } format := fmt.Sprintf("%%.%df", digits) return fmt.Sprintf(format, v) } func formatTime(t time.Time, ok bool) string { if !ok || t.IsZero() { return "" } return t.Format("2006-01-02 15:04:05") }