feat: 用彩云的预报替代风向风速
This commit is contained in:
parent
3e50260c51
commit
eeeffa3e95
@ -36,6 +36,8 @@ func main() {
|
|||||||
var historicalOnly = flag.Bool("historical_only", false, "仅执行历史数据补完并退出")
|
var historicalOnly = flag.Bool("historical_only", false, "仅执行历史数据补完并退出")
|
||||||
var historicalStart = flag.String("historical_start", "", "历史数据开始日期(格式YYYY-MM-DD)")
|
var historicalStart = flag.String("historical_start", "", "历史数据开始日期(格式YYYY-MM-DD)")
|
||||||
var historicalEnd = flag.String("historical_end", "", "历史数据结束日期(格式YYYY-MM-DD)")
|
var historicalEnd = flag.String("historical_end", "", "历史数据结束日期(格式YYYY-MM-DD)")
|
||||||
|
// 覆盖风:使用彩云实况替换导出中的风速/风向
|
||||||
|
var useWindOverride = flag.Bool("wind", false, "使用彩云实况覆盖导出CSV中的风速/风向")
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
|
|
||||||
// 设置日志
|
// 设置日志
|
||||||
@ -102,7 +104,7 @@ func main() {
|
|||||||
if err := forecast.RunOpenMeteoHistoricalFetch(context.Background(), *historicalStart, *historicalEnd); err != nil {
|
if err := forecast.RunOpenMeteoHistoricalFetch(context.Background(), *historicalStart, *historicalEnd); err != nil {
|
||||||
log.Fatalf("历史数据补完失败: %v", err)
|
log.Fatalf("历史数据补完失败: %v", err)
|
||||||
}
|
}
|
||||||
log.Println("历史数据补完完成")
|
log.Println("历史数据补完成")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -150,7 +152,21 @@ func main() {
|
|||||||
}()
|
}()
|
||||||
log.Println("启动数据导出器(10分钟)...")
|
log.Println("启动数据导出器(10分钟)...")
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
exporter := tools.NewExporter()
|
// 处理 --wind 覆盖
|
||||||
|
var opts tools.ExporterOptions
|
||||||
|
if *useWindOverride {
|
||||||
|
token := os.Getenv("CAIYUN_TOKEN")
|
||||||
|
if token == "" {
|
||||||
|
token = config.GetConfig().Forecast.CaiyunToken
|
||||||
|
}
|
||||||
|
if token == "" {
|
||||||
|
log.Println("警告: 指定了 --wind 但未提供彩云 token,忽略风覆盖")
|
||||||
|
} else {
|
||||||
|
opts.OverrideWindWithCaiyun = true
|
||||||
|
opts.CaiyunToken = token
|
||||||
|
}
|
||||||
|
}
|
||||||
|
exporter := tools.NewExporterWithOptions(opts)
|
||||||
if err := exporter.Start(ctx); err != nil {
|
if err := exporter.Start(ctx); err != nil {
|
||||||
log.Printf("导出器退出: %v", err)
|
log.Printf("导出器退出: %v", err)
|
||||||
}
|
}
|
||||||
|
|||||||
@ -4,11 +4,13 @@ import (
|
|||||||
"compress/gzip"
|
"compress/gzip"
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"log"
|
"log"
|
||||||
"math"
|
"math"
|
||||||
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strings"
|
"strings"
|
||||||
@ -18,13 +20,27 @@ import (
|
|||||||
|
|
||||||
// Exporter 负责每10分钟导出 CSV(含ZTD融合)
|
// Exporter 负责每10分钟导出 CSV(含ZTD融合)
|
||||||
type Exporter struct {
|
type Exporter struct {
|
||||||
pg *sql.DB
|
pg *sql.DB
|
||||||
my *sql.DB
|
my *sql.DB
|
||||||
loc *time.Location // Asia/Shanghai
|
loc *time.Location // Asia/Shanghai
|
||||||
logger *log.Logger // 专用日志记录器
|
logger *log.Logger // 专用日志记录器
|
||||||
|
opts ExporterOptions
|
||||||
|
httpClient *http.Client
|
||||||
|
}
|
||||||
|
|
||||||
|
// ExporterOptions 导出器可选项
|
||||||
|
type ExporterOptions struct {
|
||||||
|
// OverrideWindWithCaiyun 为 true 时,导出CSV时用彩云实况覆盖风速/风向
|
||||||
|
OverrideWindWithCaiyun bool
|
||||||
|
// CaiyunToken 彩云API令牌
|
||||||
|
CaiyunToken string
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewExporter() *Exporter {
|
func NewExporter() *Exporter {
|
||||||
|
return NewExporterWithOptions(ExporterOptions{})
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewExporterWithOptions(opts ExporterOptions) *Exporter {
|
||||||
loc, _ := time.LoadLocation("Asia/Shanghai")
|
loc, _ := time.LoadLocation("Asia/Shanghai")
|
||||||
if loc == nil {
|
if loc == nil {
|
||||||
loc = time.FixedZone("CST", 8*3600)
|
loc = time.FixedZone("CST", 8*3600)
|
||||||
@ -44,10 +60,12 @@ func NewExporter() *Exporter {
|
|||||||
logger := log.New(f, "", log.Ldate|log.Ltime|log.Lmicroseconds)
|
logger := log.New(f, "", log.Ldate|log.Ltime|log.Lmicroseconds)
|
||||||
|
|
||||||
return &Exporter{
|
return &Exporter{
|
||||||
pg: database.GetDB(),
|
pg: database.GetDB(),
|
||||||
my: database.GetMySQL(),
|
my: database.GetMySQL(),
|
||||||
loc: loc,
|
loc: loc,
|
||||||
logger: logger,
|
logger: logger,
|
||||||
|
opts: opts,
|
||||||
|
httpClient: &http.Client{Timeout: 10 * time.Second},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -225,6 +243,17 @@ func (e *Exporter) exportBucket(ctx context.Context, bucketStart, bucketEnd time
|
|||||||
rhStr = fmtFloat(float64(rh.Int64), 0)
|
rhStr = fmtFloat(float64(rh.Int64), 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 如果需要,使用彩云实况覆盖风速/风向
|
||||||
|
if e.opts.OverrideWindWithCaiyun && lat.Valid && lon.Valid && e.opts.CaiyunToken != "" {
|
||||||
|
if spd, dir, ok := e.fetchCaiyunRealtimeWind(ctx, lat.Float64, lon.Float64); ok {
|
||||||
|
wsStr = fmtFloat(spd, 3)
|
||||||
|
wdStr = fmtFloat(dir, 0)
|
||||||
|
e.logger.Printf("站点 %s: 使用彩云实况覆盖风: speed=%.3f m/s, dir=%.0f°", stationID, spd, dir)
|
||||||
|
} else {
|
||||||
|
e.logger.Printf("站点 %s: 彩云实况风获取失败,保留数据库值", stationID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// 使用device_id查询ZTD,使用桶末时间
|
// 使用device_id查询ZTD,使用桶末时间
|
||||||
ztdStr := e.lookupZTD(ctx, deviceID, bucketEnd)
|
ztdStr := e.lookupZTD(ctx, deviceID, bucketEnd)
|
||||||
if ztdStr != "" {
|
if ztdStr != "" {
|
||||||
@ -313,6 +342,52 @@ func (e *Exporter) lookupZTD(ctx context.Context, deviceID string, bucketEnd tim
|
|||||||
return fmtFloat(ztd.Float64*100.0, -1)
|
return fmtFloat(ztd.Float64*100.0, -1)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// fetchCaiyunRealtimeWind 拉取彩云实时风(m/s, 度)。lat,lon为纬度、经度。
|
||||||
|
func (e *Exporter) fetchCaiyunRealtimeWind(ctx context.Context, lat, lon float64) (float64, float64, bool) {
|
||||||
|
if e.httpClient == nil || e.opts.CaiyunToken == "" {
|
||||||
|
return 0, 0, false
|
||||||
|
}
|
||||||
|
type realtimeResp struct {
|
||||||
|
Status string `json:"status"`
|
||||||
|
Unit string `json:"unit"`
|
||||||
|
Result struct {
|
||||||
|
Realtime struct {
|
||||||
|
Status string `json:"status"`
|
||||||
|
Wind struct {
|
||||||
|
Speed float64 `json:"speed"`
|
||||||
|
Direction float64 `json:"direction"`
|
||||||
|
} `json:"wind"`
|
||||||
|
} `json:"realtime"`
|
||||||
|
} `json:"result"`
|
||||||
|
}
|
||||||
|
url := fmt.Sprintf("https://api.caiyunapp.com/v2.6/%s/%f,%f/realtime?unit=SI", e.opts.CaiyunToken, lon, lat)
|
||||||
|
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
|
||||||
|
if err != nil {
|
||||||
|
return 0, 0, false
|
||||||
|
}
|
||||||
|
resp, err := e.httpClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return 0, 0, false
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
if resp.StatusCode/100 != 2 {
|
||||||
|
return 0, 0, false
|
||||||
|
}
|
||||||
|
body, err := io.ReadAll(resp.Body)
|
||||||
|
if err != nil {
|
||||||
|
return 0, 0, false
|
||||||
|
}
|
||||||
|
var data realtimeResp
|
||||||
|
if err := json.Unmarshal(body, &data); err != nil {
|
||||||
|
return 0, 0, false
|
||||||
|
}
|
||||||
|
if strings.ToLower(data.Status) != "ok" || strings.ToLower(data.Result.Realtime.Status) != "ok" {
|
||||||
|
return 0, 0, false
|
||||||
|
}
|
||||||
|
// 使用 SI 单位,风速直接为 m/s
|
||||||
|
return data.Result.Realtime.Wind.Speed, data.Result.Realtime.Wind.Direction, true
|
||||||
|
}
|
||||||
|
|
||||||
func ensureFileWithHeader(path string) bool {
|
func ensureFileWithHeader(path string) bool {
|
||||||
if _, err := os.Stat(path); err == nil {
|
if _, err := os.Stat(path); err == nil {
|
||||||
return false
|
return false
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user