449 lines
12 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package main
import (
"context"
"database/sql"
"encoding/binary"
"encoding/csv"
"errors"
"flag"
"fmt"
"log"
"math"
"os"
"strings"
"time"
"weatherstation/internal/database"
)
type stationInfo struct {
ID string
Alias string
Lat float64
Lon float64
Z int
Y int
X int
}
type tileRec struct {
DT time.Time
Width, Height int
West, South float64
East, North float64
ResDeg float64
Data []byte
}
func main() {
var stationID string
var startStr string
var endStr string
var outPath string
var verbose bool
flag.StringVar(&stationID, "station_id", "", "站点ID留空表示全部WH65LP且有经纬度的站")
flag.StringVar(&startStr, "start", "", "起始时间YYYY-MM-DD HH:MM:SSCST")
flag.StringVar(&endStr, "end", "", "结束时间YYYY-MM-DD HH:MM:SSCST")
flag.StringVar(&outPath, "out", "radar_stats.csv", "输出CSV文件路径")
flag.BoolVar(&verbose, "info", false, "输出详细过程信息")
flag.Parse()
if strings.TrimSpace(startStr) == "" || strings.TrimSpace(endStr) == "" {
log.Fatalln("必须提供 --start 与 --end格式 YYYY-MM-DD HH:MM:SS")
}
loc, _ := time.LoadLocation("Asia/Shanghai")
if loc == nil {
loc = time.FixedZone("CST", 8*3600)
}
startT, err := time.ParseInLocation("2006-01-02 15:04:05", startStr, loc)
if err != nil {
log.Fatalf("解析 start 失败: %v", err)
}
endT, err := time.ParseInLocation("2006-01-02 15:04:05", endStr, loc)
if err != nil {
log.Fatalf("解析 end 失败: %v", err)
}
if !endT.After(startT) {
log.Fatalln("结束时间必须大于起始时间")
}
// 初始化数据库
_ = database.GetDB()
defer database.Close()
// 获取站点列表
stations, err := listStations(database.GetDB(), stationID)
if err != nil {
log.Fatalf("查询站点失败: %v", err)
}
if len(stations) == 0 {
log.Fatalln("没有符合条件的站点")
}
if verbose {
log.Printf("站点数量: %d", len(stations))
for _, s := range stations {
log.Printf("站点: id=%s alias=%s lat=%.5f lon=%.5f z/y/x=%d/%d/%d", s.ID, s.Alias, s.Lat, s.Lon, s.Z, s.Y, s.X)
}
}
// 创建CSV
f, err := os.Create(outPath)
if err != nil {
log.Fatalf("创建输出文件失败: %v", err)
}
defer f.Close()
w := csv.NewWriter(f)
defer w.Flush()
header := []string{
"station_id", "station_alias", "dt", "lat", "lon", "wind_speed_ms", "wind_dir_deg",
"sector_ge40_cnt", "sector_ge40_sum", "sector_ge30_cnt", "sector_ge30_sum",
"circle_ge40_cnt", "circle_ge40_sum", "circle_ge30_cnt", "circle_ge30_sum",
"rs485_rain_total_mm",
}
if err := w.Write(header); err != nil {
log.Fatalf("写入CSV表头失败: %v", err)
}
ctx := context.Background()
totalRows := 0
var totalTiles, skipNoZYX, skipNoWind, skipDecode int
for _, s := range stations {
if s.Z == 0 && s.Y == 0 && s.X == 0 {
log.Printf("跳过站点 %s无z/y/x映射", s.ID)
skipNoZYX++
continue
}
tiles, err := listTiles(ctx, database.GetDB(), s.Z, s.Y, s.X, startT, endT)
if err != nil {
log.Printf("查询瓦片失败 station=%s: %v", s.ID, err)
continue
}
totalTiles += len(tiles)
if verbose {
log.Printf("站点 %s 瓦片数量: %d", s.ID, len(tiles))
}
if len(tiles) == 0 {
log.Printf("站点 %s 在范围内无瓦片", s.ID)
}
for _, t := range tiles {
// 10分钟向下取整时间bucket
bucket := bucket10(t.DT, loc)
// NOTE: 按需改为用 station_id 匹配 radar_weather.alias
windSpeed, windDir, ok, err := loadWindAt(database.GetDB(), s.ID, bucket)
if err != nil {
log.Printf("读取风失败 %s @%s: %v", s.ID, t.DT.Format(time.RFC3339), err)
continue
}
if !ok { // 无风场:跳过该时次
skipNoWind++
if verbose {
log.Printf("跳过: %s 瓦片@%s桶=%s在 radar_weather(alias=%s) 无记录", s.ID, t.DT.In(loc).Format("2006-01-02 15:04:05"), bucket.Format("2006-01-02 15:04:05"), s.ID)
}
continue
}
// 解码 dBZ 网格
vals, xs, ys, err := decodeTile(t)
if err != nil {
log.Printf("解码瓦片失败 %s @%s: %v", s.ID, t.DT.Format(time.RFC3339), err)
skipDecode++
continue
}
// 统计
sec40Cnt, sec40Sum, sec30Cnt, sec30Sum,
cir40Cnt, cir40Sum, cir30Cnt, cir30Sum := computeStats(vals, xs, ys, s.Lat, s.Lon, windSpeed, windDir)
// 最近一条累计雨量
rainTotal, rainOK := loadNearestRain(database.GetDB(), s.ID, t.DT)
if verbose {
log.Printf("写出: %s dt=%s wind=%.3f m/s %.1f° 扇形(>=40:%d/%.1f >=30:%d/%.1f) 圆形(>=40:%d/%.1f >=30:%d/%.1f) rain_total=%v(%.3f)",
s.ID,
t.DT.In(loc).Format("2006-01-02 15:04:05"),
windSpeed, windDir,
sec40Cnt, sec40Sum, sec30Cnt, sec30Sum,
cir40Cnt, cir40Sum, cir30Cnt, cir30Sum,
rainOK, rainTotal,
)
}
rec := []string{
s.ID,
s.Alias,
t.DT.In(loc).Format("2006-01-02 15:04:05"),
fmt.Sprintf("%.6f", s.Lat),
fmt.Sprintf("%.6f", s.Lon),
fmt.Sprintf("%.3f", windSpeed),
fmt.Sprintf("%.2f", windDir),
fmt.Sprintf("%d", sec40Cnt),
fmt.Sprintf("%.1f", sec40Sum),
fmt.Sprintf("%d", sec30Cnt),
fmt.Sprintf("%.1f", sec30Sum),
fmt.Sprintf("%d", cir40Cnt),
fmt.Sprintf("%.1f", cir40Sum),
fmt.Sprintf("%d", cir30Cnt),
fmt.Sprintf("%.1f", cir30Sum),
fmt.Sprintf("%.3f", rainTotal),
}
if err := w.Write(rec); err != nil {
log.Printf("写入CSV失败: %v", err)
}
totalRows++
}
}
w.Flush()
if err := w.Error(); err != nil {
log.Fatalf("写入CSV失败: %v", err)
}
if verbose {
log.Printf("汇总: 站点数=%d 瓦片总数=%d 跳过(无z/y/x)=%d 跳过(无风)=%d 跳过(解码失败)=%d", len(stations), totalTiles, skipNoZYX, skipNoWind, skipDecode)
}
log.Printf("完成,输出 %d 行到 %s", totalRows, outPath)
}
func listStations(db *sql.DB, stationID string) ([]stationInfo, error) {
// 与前端一致device_type='WH65LP' 且 lat/lon 非空且非零
if strings.TrimSpace(stationID) != "" {
const q = `
SELECT station_id,
CASE WHEN COALESCE(station_alias,'')='' THEN station_id ELSE station_alias END AS alias,
latitude, longitude,
COALESCE(z,0), COALESCE(y,0), COALESCE(x,0)
FROM stations
WHERE device_type='WH65LP' AND station_id=$1
AND latitude IS NOT NULL AND longitude IS NOT NULL
AND latitude<>0 AND longitude<>0`
var s stationInfo
err := db.QueryRow(q, stationID).Scan(&s.ID, &s.Alias, &s.Lat, &s.Lon, &s.Z, &s.Y, &s.X)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, nil
}
return nil, err
}
return []stationInfo{s}, nil
}
const qAll = `
SELECT station_id,
CASE WHEN COALESCE(station_alias,'')='' THEN station_id ELSE station_alias END AS alias,
latitude, longitude,
COALESCE(z,0), COALESCE(y,0), COALESCE(x,0)
FROM stations
WHERE device_type='WH65LP'
AND latitude IS NOT NULL AND longitude IS NOT NULL
AND latitude<>0 AND longitude<>0
ORDER BY station_id`
rows, err := db.Query(qAll)
if err != nil {
return nil, err
}
defer rows.Close()
var out []stationInfo
for rows.Next() {
var s stationInfo
if err := rows.Scan(&s.ID, &s.Alias, &s.Lat, &s.Lon, &s.Z, &s.Y, &s.X); err == nil {
out = append(out, s)
}
}
return out, nil
}
func listTiles(ctx context.Context, db *sql.DB, z, y, x int, from, to time.Time) ([]tileRec, error) {
const q = `
SELECT dt, width, height, west, south, east, north, res_deg, data
FROM radar_tiles
WHERE z=$1 AND y=$2 AND x=$3 AND dt BETWEEN $4 AND $5
ORDER BY dt ASC`
rows, err := db.QueryContext(ctx, q, z, y, x, from, to)
if err != nil {
return nil, err
}
defer rows.Close()
var out []tileRec
for rows.Next() {
var r tileRec
if err := rows.Scan(&r.DT, &r.Width, &r.Height, &r.West, &r.South, &r.East, &r.North, &r.ResDeg, &r.Data); err == nil {
out = append(out, r)
}
}
return out, nil
}
func bucket10(t time.Time, loc *time.Location) time.Time {
tt := t.In(loc)
m := (tt.Minute() / 10) * 10
return time.Date(tt.Year(), tt.Month(), tt.Day(), tt.Hour(), m, 0, 0, loc)
}
// loadWindAt 以别名(alias)精确匹配 radar_weather本导出按需传入 station_id 作为 alias 参数
func loadWindAt(db *sql.DB, alias string, dt time.Time) (speedMS float64, dirDeg float64, ok bool, err error) {
const q = `
SELECT wind_speed, wind_direction
FROM radar_weather
WHERE alias=$1 AND dt=$2
LIMIT 1`
var s, d sql.NullFloat64
err = db.QueryRow(q, alias, dt).Scan(&s, &d)
if err == sql.ErrNoRows {
return 0, 0, false, nil
}
if err != nil {
return 0, 0, false, err
}
if !s.Valid || !d.Valid {
return 0, 0, false, nil
}
return s.Float64, d.Float64, true, nil
}
func loadNearestRain(db *sql.DB, stationID string, dt time.Time) (rainTotal float64, ok bool) {
// 取最近一条累计雨量单位mm。如不存在返回0,false
const q = `
SELECT rainfall
FROM rs485_weather_data
WHERE station_id=$1
ORDER BY ABS(EXTRACT(EPOCH FROM (timestamp - $2))) ASC
LIMIT 1`
var r sql.NullFloat64
if err := db.QueryRow(q, stationID, dt).Scan(&r); err != nil {
return 0, false
}
if !r.Valid {
return 0, false
}
return r.Float64, true
}
func decodeTile(t tileRec) (vals [][]*float64, xs []float64, ys []float64, err error) {
w, h := t.Width, t.Height
if w <= 0 || h <= 0 {
return nil, nil, nil, fmt.Errorf("非法尺寸")
}
if len(t.Data) < w*h*2 {
return nil, nil, nil, fmt.Errorf("数据长度不足")
}
xs = make([]float64, w)
for c := 0; c < w; c++ {
xs[c] = t.West + (float64(c)+0.5)*t.ResDeg
}
ys = make([]float64, h)
for r := 0; r < h; r++ {
ys[r] = t.South + (float64(r)+0.5)*t.ResDeg
}
vals = make([][]*float64, h)
off := 0
for r := 0; r < h; r++ {
row := make([]*float64, w)
for c := 0; c < w; c++ {
v := int16(binary.BigEndian.Uint16(t.Data[off : off+2]))
off += 2
if v >= 32766 {
row[c] = nil
continue
}
dbz := float64(v) / 10.0
if dbz < 0 {
dbz = 0
} else if dbz > 75 {
dbz = 75
}
vv := dbz
row[c] = &vv
}
vals[r] = row
}
return vals, xs, ys, nil
}
func computeStats(vals [][]*float64, xs, ys []float64, stLat, stLon, windMS, windFromDeg float64) (
sec40Cnt int, sec40Sum float64, sec30Cnt int, sec30Sum float64,
cir40Cnt int, cir40Sum float64, cir30Cnt int, cir30Sum float64,
) {
h := len(vals)
if h == 0 {
return
}
w := len(vals[0])
// 半径(米)与半角
halfAngle := 30.0
rangeM := windMS * 3 * 3600
circleR := 8000.0
for r := 0; r < h; r++ {
lat := ys[r]
row := vals[r]
for c := 0; c < w; c++ {
if row[c] == nil {
continue
}
dbz := *row[c]
lon := xs[c]
dist := haversine(stLat, stLon, lat, lon)
// 8km 圆
if dist <= circleR {
if dbz >= 40 {
cir40Cnt++
cir40Sum += dbz
}
if dbz >= 30 {
cir30Cnt++
cir30Sum += dbz
}
}
// 扇形(需同时满足距离与角度)
if dist <= rangeM {
brg := bearingDeg(stLat, stLon, lat, lon)
if angDiff(brg, windFromDeg) <= halfAngle {
if dbz >= 40 {
sec40Cnt++
sec40Sum += dbz
}
if dbz >= 30 {
sec30Cnt++
sec30Sum += dbz
}
}
}
}
}
return
}
func toRad(d float64) float64 { return d * math.Pi / 180 }
func toDeg(r float64) float64 { return r * 180 / math.Pi }
func haversine(lat1, lon1, lat2, lon2 float64) float64 {
const R = 6371000.0
dLat := toRad(lat2 - lat1)
dLon := toRad(lon2 - lon1)
a := math.Sin(dLat/2)*math.Sin(dLat/2) + math.Cos(toRad(lat1))*math.Cos(toRad(lat2))*math.Sin(dLon/2)*math.Sin(dLon/2)
c := 2 * math.Atan2(math.Sqrt(a), math.Sqrt(1-a))
return R * c
}
func bearingDeg(lat1, lon1, lat2, lon2 float64) float64 {
φ1 := toRad(lat1)
φ2 := toRad(lat2)
Δλ := toRad(lon2 - lon1)
y := math.Sin(Δλ) * math.Cos(φ2)
x := math.Cos(φ1)*math.Sin(φ2) - math.Sin(φ1)*math.Cos(φ2)*math.Cos(Δλ)
brg := toDeg(math.Atan2(y, x))
if brg < 0 {
brg += 360
}
return brg
}
func angDiff(a, b float64) float64 {
d := math.Mod(a-b+540, 360) - 180
if d < 0 {
d = -d
}
return math.Abs(d)
}