From eeeffa3e959540fbb90e41409bc01916ec53be45 Mon Sep 17 00:00:00 2001 From: yarnom Date: Wed, 27 Aug 2025 12:47:55 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E7=94=A8=E5=BD=A9=E4=BA=91=E7=9A=84?= =?UTF-8?q?=E9=A2=84=E6=8A=A5=E6=9B=BF=E4=BB=A3=E9=A3=8E=E5=90=91=E9=A3=8E?= =?UTF-8?q?=E9=80=9F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cmd/weatherstation/main.go | 20 ++++++++- internal/tools/exporter.go | 91 ++++++++++++++++++++++++++++++++++---- 2 files changed, 101 insertions(+), 10 deletions(-) diff --git a/cmd/weatherstation/main.go b/cmd/weatherstation/main.go index c5832ae..f02730b 100644 --- a/cmd/weatherstation/main.go +++ b/cmd/weatherstation/main.go @@ -36,6 +36,8 @@ func main() { var historicalOnly = flag.Bool("historical_only", false, "仅执行历史数据补完并退出") var historicalStart = flag.String("historical_start", "", "历史数据开始日期(格式YYYY-MM-DD)") var historicalEnd = flag.String("historical_end", "", "历史数据结束日期(格式YYYY-MM-DD)") + // 覆盖风:使用彩云实况替换导出中的风速/风向 + var useWindOverride = flag.Bool("wind", false, "使用彩云实况覆盖导出CSV中的风速/风向") flag.Parse() // 设置日志 @@ -102,7 +104,7 @@ func main() { if err := forecast.RunOpenMeteoHistoricalFetch(context.Background(), *historicalStart, *historicalEnd); err != nil { log.Fatalf("历史数据补完失败: %v", err) } - log.Println("历史数据补完完成") + log.Println("历史数据补完成") return } @@ -150,7 +152,21 @@ func main() { }() log.Println("启动数据导出器(10分钟)...") ctx := context.Background() - exporter := tools.NewExporter() + // 处理 --wind 覆盖 + var opts tools.ExporterOptions + if *useWindOverride { + token := os.Getenv("CAIYUN_TOKEN") + if token == "" { + token = config.GetConfig().Forecast.CaiyunToken + } + if token == "" { + log.Println("警告: 指定了 --wind 但未提供彩云 token,忽略风覆盖") + } else { + opts.OverrideWindWithCaiyun = true + opts.CaiyunToken = token + } + } + exporter := tools.NewExporterWithOptions(opts) if err := exporter.Start(ctx); err != nil { log.Printf("导出器退出: %v", err) } diff --git a/internal/tools/exporter.go b/internal/tools/exporter.go index 6b410b0..533c249 100644 --- a/internal/tools/exporter.go +++ b/internal/tools/exporter.go @@ -4,11 +4,13 @@ import ( "compress/gzip" "context" "database/sql" + "encoding/json" "errors" "fmt" "io" "log" "math" + "net/http" "os" "path/filepath" "strings" @@ -18,13 +20,27 @@ import ( // Exporter 负责每10分钟导出 CSV(含ZTD融合) type Exporter struct { - pg *sql.DB - my *sql.DB - loc *time.Location // Asia/Shanghai - logger *log.Logger // 专用日志记录器 + pg *sql.DB + my *sql.DB + loc *time.Location // Asia/Shanghai + logger *log.Logger // 专用日志记录器 + opts ExporterOptions + httpClient *http.Client +} + +// ExporterOptions 导出器可选项 +type ExporterOptions struct { + // OverrideWindWithCaiyun 为 true 时,导出CSV时用彩云实况覆盖风速/风向 + OverrideWindWithCaiyun bool + // CaiyunToken 彩云API令牌 + CaiyunToken string } func NewExporter() *Exporter { + return NewExporterWithOptions(ExporterOptions{}) +} + +func NewExporterWithOptions(opts ExporterOptions) *Exporter { loc, _ := time.LoadLocation("Asia/Shanghai") if loc == nil { loc = time.FixedZone("CST", 8*3600) @@ -44,10 +60,12 @@ func NewExporter() *Exporter { logger := log.New(f, "", log.Ldate|log.Ltime|log.Lmicroseconds) return &Exporter{ - pg: database.GetDB(), - my: database.GetMySQL(), - loc: loc, - logger: logger, + pg: database.GetDB(), + my: database.GetMySQL(), + loc: loc, + logger: logger, + opts: opts, + httpClient: &http.Client{Timeout: 10 * time.Second}, } } @@ -225,6 +243,17 @@ func (e *Exporter) exportBucket(ctx context.Context, bucketStart, bucketEnd time rhStr = fmtFloat(float64(rh.Int64), 0) } + // 如果需要,使用彩云实况覆盖风速/风向 + if e.opts.OverrideWindWithCaiyun && lat.Valid && lon.Valid && e.opts.CaiyunToken != "" { + if spd, dir, ok := e.fetchCaiyunRealtimeWind(ctx, lat.Float64, lon.Float64); ok { + wsStr = fmtFloat(spd, 3) + wdStr = fmtFloat(dir, 0) + e.logger.Printf("站点 %s: 使用彩云实况覆盖风: speed=%.3f m/s, dir=%.0f°", stationID, spd, dir) + } else { + e.logger.Printf("站点 %s: 彩云实况风获取失败,保留数据库值", stationID) + } + } + // 使用device_id查询ZTD,使用桶末时间 ztdStr := e.lookupZTD(ctx, deviceID, bucketEnd) if ztdStr != "" { @@ -313,6 +342,52 @@ func (e *Exporter) lookupZTD(ctx context.Context, deviceID string, bucketEnd tim return fmtFloat(ztd.Float64*100.0, -1) } +// fetchCaiyunRealtimeWind 拉取彩云实时风(m/s, 度)。lat,lon为纬度、经度。 +func (e *Exporter) fetchCaiyunRealtimeWind(ctx context.Context, lat, lon float64) (float64, float64, bool) { + if e.httpClient == nil || e.opts.CaiyunToken == "" { + return 0, 0, false + } + type realtimeResp struct { + Status string `json:"status"` + Unit string `json:"unit"` + Result struct { + Realtime struct { + Status string `json:"status"` + Wind struct { + Speed float64 `json:"speed"` + Direction float64 `json:"direction"` + } `json:"wind"` + } `json:"realtime"` + } `json:"result"` + } + url := fmt.Sprintf("https://api.caiyunapp.com/v2.6/%s/%f,%f/realtime?unit=SI", e.opts.CaiyunToken, lon, lat) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return 0, 0, false + } + resp, err := e.httpClient.Do(req) + if err != nil { + return 0, 0, false + } + defer resp.Body.Close() + if resp.StatusCode/100 != 2 { + return 0, 0, false + } + body, err := io.ReadAll(resp.Body) + if err != nil { + return 0, 0, false + } + var data realtimeResp + if err := json.Unmarshal(body, &data); err != nil { + return 0, 0, false + } + if strings.ToLower(data.Status) != "ok" || strings.ToLower(data.Result.Realtime.Status) != "ok" { + return 0, 0, false + } + // 使用 SI 单位,风速直接为 m/s + return data.Result.Realtime.Wind.Speed, data.Result.Realtime.Wind.Direction, true +} + func ensureFileWithHeader(path string) bool { if _, err := os.Stat(path); err == nil { return false