diff --git a/cmd/weatherstation/main.go b/cmd/weatherstation/main.go index b380726..0e00836 100644 --- a/cmd/weatherstation/main.go +++ b/cmd/weatherstation/main.go @@ -10,6 +10,7 @@ import ( "weatherstation/internal/config" "weatherstation/internal/database" "weatherstation/internal/forecast" + "weatherstation/internal/radar" "weatherstation/internal/selftest" "weatherstation/internal/server" "weatherstation/internal/tools" @@ -44,6 +45,10 @@ func main() { var exportRangeOnly = flag.Bool("export_range", false, "按日期范围导出10分钟CSV(含ZTD融合),并退出。日期格式支持 YYYY-MM-DD 或 YYYYMMDD") var exportStart = flag.String("export_start", "", "导出起始日期(含),格式 YYYY-MM-DD 或 YYYYMMDD") var exportEnd = flag.String("export_end", "", "导出结束日期(含),格式 YYYY-MM-DD 或 YYYYMMDD") + // 雷达:导入单个CMA瓦片到数据库 + var importTile = flag.Bool("import_tile", false, "导入一个CMA雷达瓦片到数据库并退出") + var tileURL = flag.String("tile_url", "", "瓦片URL或/tiles/...路径,用于解析product/时间/z/y/x") + var tilePath = flag.String("tile_path", "", "瓦片本地文件路径(.bin)") flag.Parse() // 设置日志 @@ -110,6 +115,18 @@ func main() { return } + // 导入一个CMA雷达瓦片到数据库 + if *importTile { + if *tileURL == "" || *tilePath == "" { + log.Fatalln("import_tile 需要提供 --tile_url 与 --tile_path") + } + if err := radar.ImportTileFile(context.Background(), *tileURL, *tilePath); err != nil { + log.Fatalf("导入雷达瓦片失败: %v", err) + } + log.Println("导入雷达瓦片完成") + return + } + // 历史CSV范围导出 if *exportRangeOnly { if *exportStart == "" || *exportEnd == "" { @@ -222,9 +239,26 @@ func main() { }() } + startRadarSchedulerBackground := func(wg *sync.WaitGroup) { + if wg != nil { + wg.Add(1) + } + go func() { + defer func() { + if wg != nil { + wg.Done() + } + }() + log.Println("启动雷达下载任务(每10分钟,无延迟,固定瓦片 7/40/102)...") + ctx := context.Background() + _ = radar.Start(ctx, radar.Options{StoreToDB: true, Z: 7, Y: 40, X: 102}) + }() + } + if *webOnly { // 只启动Web服务器 + 导出器 startExporterBackground(nil) + startRadarSchedulerBackground(nil) log.Println("启动Web服务器模式...") if err := server.StartGinServer(); err != nil { log.Fatalf("启动Web服务器失败: %v", err) @@ -232,6 +266,7 @@ func main() { } else if *udpOnly { // 只启动UDP服务器 + 导出器 startExporterBackground(nil) + startRadarSchedulerBackground(nil) log.Println("启动UDP服务器模式...") if err := server.StartUDPServer(); err != nil { log.Fatalf("启动UDP服务器失败: %v", err) @@ -262,6 +297,7 @@ func main() { }() startExporterBackground(&wg) + startRadarSchedulerBackground(&wg) wg.Wait() } } diff --git a/db/schema.sql b/db/schema.sql index 66a8d6e..eea7b5f 100644 --- a/db/schema.sql +++ b/db/schema.sql @@ -537,4 +537,30 @@ CREATE INDEX idx_fcast_station_time ON public.forecast_hourly USING btree (stati COMMENT ON TABLE public.forecast_hourly IS '小时级预报表,按issued_at版本化;要素使用缩放整数存储'; COMMENT ON COLUMN public.forecast_hourly.issued_at IS '预报方案发布时间(版本时间)'; COMMENT ON COLUMN public.forecast_hourly.forecast_time IS '目标小时时间戳'; -COMMENT ON COLUMN public.forecast_hourly.rain_mm_x1000 IS '该小时降雨量,单位mm×1000'; \ No newline at end of file +COMMENT ON COLUMN public.forecast_hourly.rain_mm_x1000 IS '该小时降雨量,单位mm×1000'; + +-- +-- Name: radar_weather; Type: TABLE; Schema: public; Owner: - +-- 用途:雷达站实时气象(彩云实时),每10分钟采样一条 +-- +CREATE TABLE IF NOT EXISTS public.radar_weather ( + id SERIAL PRIMARY KEY, + alias TEXT NOT NULL, + lat DOUBLE PRECISION NOT NULL, + lon DOUBLE PRECISION NOT NULL, + dt TIMESTAMPTZ NOT NULL, + temperature DOUBLE PRECISION, + humidity DOUBLE PRECISION, + cloudrate DOUBLE PRECISION, + visibility DOUBLE PRECISION, + dswrf DOUBLE PRECISION, + wind_speed DOUBLE PRECISION, + wind_direction DOUBLE PRECISION, + pressure DOUBLE PRECISION, + created_at TIMESTAMPTZ DEFAULT now() +); + +-- 约束与索引 +CREATE UNIQUE INDEX IF NOT EXISTS radar_weather_udx ON public.radar_weather(alias, dt); +CREATE INDEX IF NOT EXISTS idx_radar_weather_dt ON public.radar_weather(dt); +COMMENT ON TABLE public.radar_weather IS '雷达站实时气象数据表(彩云Realtime),按10分钟存档'; diff --git a/internal/database/radar_tiles.go b/internal/database/radar_tiles.go new file mode 100644 index 0000000..f0357b0 --- /dev/null +++ b/internal/database/radar_tiles.go @@ -0,0 +1,63 @@ +package database + +import ( + "context" + "crypto/md5" + "database/sql" + "encoding/hex" + "fmt" + "math" + "time" +) + +// UpsertRadarTile stores a radar tile into table `radar_tiles`. +// Assumes the table exists with schema compatible to columns used below. +func UpsertRadarTile(ctx context.Context, db *sql.DB, product string, dt time.Time, z, y, x int, width, height int, data []byte) error { + if width == 0 { + width = 256 + } + if height == 0 { + height = 256 + } + step := 360.0 / math.Pow(2, float64(z)) + west := -180.0 + float64(x)*step + south := -90.0 + float64(y)*step + east := west + step + north := south + step + res := step / float64(width) + + sum := md5.Sum(data) + md5hex := hex.EncodeToString(sum[:]) + + q := ` + INSERT INTO radar_tiles ( + product, dt, z, y, x, width, height, + west, south, east, north, res_deg, + data, checksum_md5 + ) VALUES ( + $1,$2,$3,$4,$5,$6,$7, + $8,$9,$10,$11,$12, + $13,$14 + ) + ON CONFLICT (product, dt, z, y, x) + DO UPDATE SET + width = EXCLUDED.width, + height = EXCLUDED.height, + west = EXCLUDED.west, + south = EXCLUDED.south, + east = EXCLUDED.east, + north = EXCLUDED.north, + res_deg = EXCLUDED.res_deg, + data = EXCLUDED.data, + checksum_md5 = EXCLUDED.checksum_md5` + + _, err := db.ExecContext(ctx, q, + product, dt, z, y, x, width, height, + west, south, east, north, res, + data, md5hex, + ) + if err != nil { + return fmt.Errorf("upsert radar tile (%s %s z=%d y=%d x=%d): %w", product, dt.Format(time.RFC3339), z, y, x, err) + } + return nil +} diff --git a/internal/database/radar_weather.go b/internal/database/radar_weather.go new file mode 100644 index 0000000..e6c2845 --- /dev/null +++ b/internal/database/radar_weather.go @@ -0,0 +1,70 @@ +package database + +import ( + "context" + "database/sql" + "fmt" + "time" +) + +// UpsertRadarWeather stores a realtime snapshot for a radar station. +// Table schema (expected): +// +// CREATE TABLE IF NOT EXISTS radar_weather ( +// id SERIAL PRIMARY KEY, +// alias TEXT NOT NULL, +// lat DOUBLE PRECISION NOT NULL, +// lon DOUBLE PRECISION NOT NULL, +// dt TIMESTAMPTZ NOT NULL, +// temperature DOUBLE PRECISION, +// humidity DOUBLE PRECISION, +// cloudrate DOUBLE PRECISION, +// visibility DOUBLE PRECISION, +// dswrf DOUBLE PRECISION, +// wind_speed DOUBLE PRECISION, +// wind_direction DOUBLE PRECISION, +// pressure DOUBLE PRECISION, +// created_at TIMESTAMPTZ DEFAULT now() +// ); +// CREATE UNIQUE INDEX IF NOT EXISTS radar_weather_udx ON radar_weather(alias, dt); +func UpsertRadarWeather( + ctx context.Context, + db *sql.DB, + alias string, + lat, lon float64, + dt time.Time, + temperature, humidity, cloudrate, visibility, dswrf, windSpeed, windDir, pressure float64, +) error { + const q = ` + INSERT INTO radar_weather ( + alias, lat, lon, dt, + temperature, humidity, cloudrate, visibility, dswrf, + wind_speed, wind_direction, pressure + ) VALUES ( + $1,$2,$3,$4, + $5,$6,$7,$8,$9, + $10,$11,$12 + ) + ON CONFLICT (alias, dt) + DO UPDATE SET + lat = EXCLUDED.lat, + lon = EXCLUDED.lon, + temperature = EXCLUDED.temperature, + humidity = EXCLUDED.humidity, + cloudrate = EXCLUDED.cloudrate, + visibility = EXCLUDED.visibility, + dswrf = EXCLUDED.dswrf, + wind_speed = EXCLUDED.wind_speed, + wind_direction = EXCLUDED.wind_direction, + pressure = EXCLUDED.pressure` + + _, err := db.ExecContext(ctx, q, + alias, lat, lon, dt, + temperature, humidity, cloudrate, visibility, dswrf, + windSpeed, windDir, pressure, + ) + if err != nil { + return fmt.Errorf("upsert radar_weather (%s %s): %w", alias, dt.Format(time.RFC3339), err) + } + return nil +} diff --git a/internal/radar/scheduler.go b/internal/radar/scheduler.go new file mode 100644 index 0000000..796bf4c --- /dev/null +++ b/internal/radar/scheduler.go @@ -0,0 +1,508 @@ +package radar + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "io" + "log" + "net/http" + neturl "net/url" + "os" + "path/filepath" + "regexp" + "strings" + "time" + "weatherstation/internal/config" + "weatherstation/internal/database" +) + +// Options controls the radar scheduler behavior. +type Options struct { + Enable bool + OutputDir string + Delay time.Duration // time after each 6-minute boundary to trigger download + BaseURL string // optional: where to download from (template-based) + MaxRetries int + // Tile indices (4326 pyramid). Defaults: z=7,y=40,x=102 (Nanning region example) + Z int + Y int + X int + // StoreToDB controls whether to store fetched tiles into PostgreSQL `radar_tiles`. + StoreToDB bool +} + +// Start starts the radar download scheduler. It reads options from env if zero value provided. +// Env vars: +// +// RADAR_ENABLED=true|false (default: true) +// RADAR_DIR=radar_data (default) +// RADAR_DELAY_SEC=120 (2 minutes; trigger after each boundary) +// RADAR_MAX_RETRIES=2 +// RADAR_BASE_URL=