From 24dca2f4891f4c4e33858b1f907d1fac886e696e Mon Sep 17 00:00:00 2001 From: yarnom Date: Fri, 22 Aug 2025 20:15:07 +0800 Subject: [PATCH] =?UTF-8?q?feat=EF=BC=9A=E6=96=B0=E5=A2=9E=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E5=AF=BC=E5=87=BA=E5=99=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cmd/weatherstation/main.go | 35 +++- config.yaml | 8 + go.mod | 4 +- go.sum | 13 ++ internal/config/config.go | 14 ++ internal/database/db.go | 38 +++++ internal/tools/exporter.go | 338 +++++++++++++++++++++++++++++++++++++ 7 files changed, 444 insertions(+), 6 deletions(-) create mode 100644 internal/tools/exporter.go diff --git a/cmd/weatherstation/main.go b/cmd/weatherstation/main.go index 5ef4bde..3d1f2cc 100644 --- a/cmd/weatherstation/main.go +++ b/cmd/weatherstation/main.go @@ -38,8 +38,11 @@ func main() { server.SetupLogger() // 初始化数据库连接 - _ = database.GetDB() // 确保数据库连接已初始化 + _ = database.GetDB() // 确保PostgreSQL连接已初始化 defer database.Close() + // 初始化MySQL连接(如果配置存在) + _ = database.GetMySQL() + defer database.CloseMySQL() // 启动前自检 if !*noSelftest { @@ -115,21 +118,42 @@ func main() { } // 根据命令行参数启动服务 + startExporterBackground := func(wg *sync.WaitGroup) { + if wg != nil { + wg.Add(1) + } + go func() { + defer func() { + if wg != nil { + wg.Done() + } + }() + log.Println("启动数据导出器(10分钟)...") + ctx := context.Background() + exporter := tools.NewExporter() + if err := exporter.Start(ctx); err != nil { + log.Printf("导出器退出: %v", err) + } + }() + } + if *webOnly { - // 只启动Web服务器 + // 只启动Web服务器 + 导出器 + startExporterBackground(nil) log.Println("启动Web服务器模式...") if err := server.StartGinServer(); err != nil { log.Fatalf("启动Web服务器失败: %v", err) } } else if *udpOnly { - // 只启动UDP服务器 + // 只启动UDP服务器 + 导出器 + startExporterBackground(nil) log.Println("启动UDP服务器模式...") if err := server.StartUDPServer(); err != nil { log.Fatalf("启动UDP服务器失败: %v", err) } } else { - // 同时启动UDP和Web服务器 - log.Println("启动完整模式:UDP + Web服务器...") + // 同时启动UDP和Web服务器 + 导出器 + log.Println("启动完整模式:UDP + Web服务器 + 导出器...") var wg sync.WaitGroup wg.Add(2) @@ -152,6 +176,7 @@ func main() { } }() + startExporterBackground(&wg) wg.Wait() } } diff --git a/config.yaml b/config.yaml index 118fe6a..ac02389 100644 --- a/config.yaml +++ b/config.yaml @@ -11,3 +11,11 @@ database: forecast: caiyun_token: "ZAcZq49qzibr10F0" + +mysql: + host: "127.0.0.1" + port: 3306 + user: "remote" + password: "your_password" + dbname: "rtk_data" + params: "parseTime=true&loc=Asia%2FShanghai" diff --git a/go.mod b/go.mod index 31efa88..797a897 100644 --- a/go.mod +++ b/go.mod @@ -6,11 +6,13 @@ toolchain go1.24.5 require ( github.com/gin-gonic/gin v1.10.1 + github.com/go-sql-driver/mysql v1.8.1 github.com/lib/pq v1.10.9 gopkg.in/yaml.v3 v3.0.1 ) require ( + filippo.io/edwards25519 v1.1.0 // indirect github.com/bytedance/sonic v1.11.6 // indirect github.com/bytedance/sonic/loader v0.1.1 // indirect github.com/cloudwego/base64x v0.1.4 // indirect @@ -36,4 +38,4 @@ require ( golang.org/x/sys v0.20.0 // indirect golang.org/x/text v0.15.0 // indirect google.golang.org/protobuf v1.34.1 // indirect -) \ No newline at end of file +) diff --git a/go.sum b/go.sum index a202db1..f87f65d 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= +filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= github.com/bytedance/sonic v1.11.6 h1:oUp34TzMlL+OY1OUWxHqsdkgC/Zfc85zGqw9siXjrc0= github.com/bytedance/sonic v1.11.6/go.mod h1:LysEHSvpvDySVdC2f87zGWf6CIKJcAvqab1ZaiQtds4= github.com/bytedance/sonic/loader v0.1.1 h1:c+e5Pt1k/cy5wMveRDyk2X4B9hF4g7an8N3zCYjJFNM= @@ -7,6 +9,7 @@ github.com/cloudwego/base64x v0.1.4/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJ github.com/cloudwego/iasm v0.2.0 h1:1KNIy1I1H9hNNFEEH3DVnI4UujN+1zjpuk6gwHLTssg= github.com/cloudwego/iasm v0.2.0/go.mod h1:8rXZaNYT2n95jn+zTI1sDr+IgcD2GVs0nlbbQPiEFhY= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/gabriel-vasile/mimetype v1.4.3 h1:in2uUcidCuFcDKtdcBxlR0rJ1+fsokWf+uqxgUFjbI0= github.com/gabriel-vasile/mimetype v1.4.3/go.mod h1:d8uq/6HKRL6CGdk+aubisF/M5GcPfT7nKyLpA0lbSSk= @@ -14,14 +17,20 @@ github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI= github.com/gin-gonic/gin v1.10.1 h1:T0ujvqyCSqRopADpgPgiTT63DUQVSfojyME59Ei63pQ= github.com/gin-gonic/gin v1.10.1/go.mod h1:4PMNQiOhvDRa013RKVbsiNwoyezlm2rm0uX/T7kzp5Y= +github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s= +github.com/go-playground/assert/v2 v2.2.0/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA= github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY= github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY= github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY= github.com/go-playground/validator/v10 v10.20.0 h1:K9ISHbSaI0lyB2eWMPJo+kOS/FBExVwjEviJTixqxL8= github.com/go-playground/validator/v10 v10.20.0/go.mod h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM= +github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y= +github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg= github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= +github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= @@ -42,6 +51,7 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= @@ -53,6 +63,7 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI= github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08= @@ -71,6 +82,8 @@ golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk= golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg= google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= diff --git a/internal/config/config.go b/internal/config/config.go index 0a8f609..d2c801a 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -28,10 +28,21 @@ type ForecastConfig struct { CaiyunToken string `yaml:"caiyun_token"` } +// MySQLConfig MySQL 连接配置(用于 rtk_data) +type MySQLConfig struct { + Host string `yaml:"host"` + Port int `yaml:"port"` + User string `yaml:"user"` + Password string `yaml:"password"` + DBName string `yaml:"dbname"` + Params string `yaml:"params"` // 例如: parseTime=true&loc=Asia%2FShanghai +} + type Config struct { Server ServerConfig `yaml:"server"` Database DatabaseConfig `yaml:"database"` Forecast ForecastConfig `yaml:"forecast"` + MySQL MySQLConfig `yaml:"mysql"` } var ( @@ -89,6 +100,9 @@ func (c *Config) validate() error { if c.Database.SSLMode == "" { c.Database.SSLMode = "disable" // 默认禁用SSL } + if c.MySQL.Port <= 0 { + c.MySQL.Port = 3306 + } // CaiyunToken 允许为空:表示不启用彩云定时任务 return nil } diff --git a/internal/database/db.go b/internal/database/db.go index d735ea9..0d355b9 100644 --- a/internal/database/db.go +++ b/internal/database/db.go @@ -6,6 +6,7 @@ import ( "sync" "weatherstation/internal/config" + _ "github.com/go-sql-driver/mysql" _ "github.com/lib/pq" ) @@ -48,3 +49,40 @@ func Close() error { } return nil } + +// -------------------- MySQL 连接(rtk_data) -------------------- + +var ( + mysqlInstance *sql.DB + mysqlOnce sync.Once +) + +// GetMySQL 返回 MySQL 连接单例(rtk_data) +func GetMySQL() *sql.DB { + mysqlOnce.Do(func() { + cfg := config.GetConfig().MySQL + var dsn string + if cfg.Params != "" { + dsn = fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?%s", cfg.User, cfg.Password, cfg.Host, cfg.Port, cfg.DBName, cfg.Params) + } else { + dsn = fmt.Sprintf("%s:%s@tcp(%s:%d)/%s", cfg.User, cfg.Password, cfg.Host, cfg.Port, cfg.DBName) + } + var err error + mysqlInstance, err = sql.Open("mysql", dsn) + if err != nil { + panic(fmt.Sprintf("无法连接到 MySQL: %v", err)) + } + if err = mysqlInstance.Ping(); err != nil { + panic(fmt.Sprintf("MySQL 连接测试失败: %v", err)) + } + }) + return mysqlInstance +} + +// CloseMySQL 关闭 MySQL 连接 +func CloseMySQL() error { + if mysqlInstance != nil { + return mysqlInstance.Close() + } + return nil +} diff --git a/internal/tools/exporter.go b/internal/tools/exporter.go new file mode 100644 index 0000000..e439f4c --- /dev/null +++ b/internal/tools/exporter.go @@ -0,0 +1,338 @@ +package tools + +import ( + "compress/gzip" + "context" + "database/sql" + "errors" + "fmt" + "io" + "log" + "math" + "os" + "path/filepath" + "strings" + "time" + "weatherstation/internal/database" +) + +// Exporter 负责每10分钟导出 CSV(含ZTD融合) +type Exporter struct { + pg *sql.DB + my *sql.DB + loc *time.Location // Asia/Shanghai +} + +func NewExporter() *Exporter { + loc, _ := time.LoadLocation("Asia/Shanghai") + if loc == nil { + loc = time.FixedZone("CST", 8*3600) + } + return &Exporter{ + pg: database.GetDB(), + my: database.GetMySQL(), + loc: loc, + } +} + +// Start 启动调度循环(阻塞) +func (e *Exporter) Start(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + now := time.Now().In(e.loc) + next := alignToNextBucketEnd(now, 10).Add(30 * time.Second) + log.Printf("exporter: now=%s next_run=%s", now.Format("2006-01-02 15:04:05"), next.Format("2006-01-02 15:04:05")) + delay := time.Until(next) + if delay > 0 { + timer := time.NewTimer(delay) + select { + case <-ctx.Done(): + timer.Stop() + return ctx.Err() + case <-timer.C: + } + } + + end := time.Now().In(e.loc).Truncate(time.Minute) + end = alignToPrevBucketEnd(end, 10) + start := end.Add(-10 * time.Minute) + + if err := e.exportBucket(ctx, start, end); err != nil { + log.Printf("export bucket %s-%s failed: %v", start.Format("2006-01-02 15:04:05"), end.Format("2006-01-02 15:04:05"), err) + } + } +} + +// exportBucket 导出一个10分钟桶(CST) +func (e *Exporter) exportBucket(ctx context.Context, bucketStart, bucketEnd time.Time) error { + utcDay := bucketEnd.UTC().Format("2006-01-02") + outDir := filepath.Join("export_data") + histDir := filepath.Join("export_data", "history") + if err := os.MkdirAll(histDir, 0o755); err != nil { + return err + } + activePath := filepath.Join(outDir, fmt.Sprintf("weather_data_%s.csv", utcDay)) + log.Printf("exporter: begin bucket start=%s end=%s -> %s", bucketStart.Format("2006-01-02 15:04:05"), bucketEnd.Format("2006-01-02 15:04:05"), activePath) + + // 轮转上一 UTC 日的文件(若存在且未压缩) + if err := rotatePreviousUTC(outDir, histDir, utcDay); err != nil { + log.Printf("rotate previous day failed: %v", err) + } + + needHeader := ensureFileWithHeader(activePath) + + rows, err := e.pg.QueryContext(ctx, ` + SELECT + s.latitude, + s.longitude, + s.device_id, + s.altitude, + r.pressure_hpa_x100, + r.temp_c_x100, + r.wind_speed_ms_x1000, + r.wind_dir_deg, + r.humidity_pct, + r.bucket_start + FROM stations s + JOIN rs485_weather_10min r ON r.station_id = s.station_id AND r.bucket_start = $1 + WHERE s.device_type = 'WH65LP' + AND s.latitude IS NOT NULL AND s.longitude IS NOT NULL + AND s.latitude <> 0 AND s.longitude <> 0 + ORDER BY s.device_id + `, bucketStart) + if err != nil { + return err + } + defer rows.Close() + + f, err := os.OpenFile(activePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644) + if err != nil { + return err + } + defer f.Close() + if needHeader { + if _, err := f.WriteString(headerLine() + "\n"); err != nil { + return err + } + } + + var total, ztdHit, ztdMiss int + + for rows.Next() { + var ( + lat, lon, elev sql.NullFloat64 + deviceID string + pX100, tX100 sql.NullInt64 + wsX1000 sql.NullInt64 + wdDeg sql.NullInt64 + rh sql.NullInt64 + bucketStartTS time.Time + ) + if err := rows.Scan(&lat, &lon, &deviceID, &elev, &pX100, &tX100, &wsX1000, &wdDeg, &rh, &bucketStartTS); err != nil { + log.Printf("scan row failed: %v", err) + continue + } + + // date_time 使用 Go 端按 CST 格式化 + dateTimeStr := bucketStartTS.In(e.loc).Format("2006-01-02 15:04:05") + + var pressureStr, tempStr, wsStr, wdStr, rhStr string + if pX100.Valid { + pressureStr = fmtFloat(float64(pX100.Int64)/100.0, 2) + } + if tX100.Valid { + tempStr = fmtFloat(float64(tX100.Int64)/100.0, 2) + } + if wsX1000.Valid { + wsStr = fmtFloat(float64(wsX1000.Int64)/1000.0, 3) + } + if wdDeg.Valid { + wdStr = fmtFloat(float64(wdDeg.Int64), 0) + } + if rh.Valid { + rhStr = fmtFloat(float64(rh.Int64), 0) + } + + ztdStr := e.lookupZTD(ctx, deviceID, bucketEnd) + if ztdStr != "" { + ztdHit++ + } else { + ztdMiss++ + } + + var b strings.Builder + b.WriteString(fmtNullFloat(lat)) + b.WriteByte(',') + b.WriteString(fmtNullFloat(lon)) + b.WriteByte(',') + b.WriteString(deviceID) + b.WriteByte(',') + b.WriteByte(',') // station_name 留空 + b.WriteString(dateTimeStr) + b.WriteByte(',') + b.WriteString(fmtNullFloat(elev)) + b.WriteByte(',') + b.WriteString(pressureStr) + b.WriteByte(',') + b.WriteString(tempStr) + b.WriteByte(',') + b.WriteByte(',') // dewpoint 留空 + b.WriteString(wsStr) + b.WriteByte(',') + b.WriteString(wdStr) + b.WriteByte(',') + b.WriteString(rhStr) + b.WriteByte(',') + b.WriteString(ztdStr) + b.WriteByte(',') // pwv 留空 + + if _, err := f.WriteString(b.String() + "\n"); err != nil { + log.Printf("write csv failed: %v", err) + } + total++ + } + if err := rows.Err(); err != nil { + return err + } + log.Printf("exporter: done bucket start=%s end=%s, rows=%d, ztd_hit=%d, ztd_miss=%d", bucketStart.Format("2006-01-02 15:04:05"), bucketEnd.Format("2006-01-02 15:04:05"), total, ztdHit, ztdMiss) + return nil +} + +func (e *Exporter) lookupZTD(ctx context.Context, deviceID string, bucketEnd time.Time) string { + if e.my == nil { + return "" + } + var ztd sql.NullFloat64 + var ts time.Time + err := e.my.QueryRowContext(ctx, ` + SELECT ztd, timestamp FROM rtk_data + WHERE station_id = ? + AND ABS(TIMESTAMPDIFF(MINUTE, timestamp, ?)) <= 5 + LIMIT 1 + `, deviceID, bucketEnd).Scan(&ztd, &ts) + if err != nil { + if !errors.Is(err, sql.ErrNoRows) { + log.Printf("mysql query ztd failed: %v", err) + } + return "" + } + if !ztd.Valid { + return "" + } + return fmtFloat(ztd.Float64*1000.0, -1) +} + +func ensureFileWithHeader(path string) bool { + if _, err := os.Stat(path); err == nil { + return false + } + dir := filepath.Dir(path) + _ = os.MkdirAll(dir, 0o755) + f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY, 0o644) + if err != nil { + log.Printf("create csv failed: %v", err) + return false + } + _ = f.Close() + return true +} + +func headerLine() string { + return "latitude,longitude,station_id,station_name,date_time,elevation,pressure,temperature,dewpoint,wind_speed,wind_direction,relative_humidity,ztd,pwv" +} + +func alignToNextBucketEnd(t time.Time, minutes int) time.Time { + m := t.Minute() + next := (m/minutes + 1) * minutes + dt := time.Duration(next-m) * time.Minute + return t.Truncate(time.Minute).Add(dt).Truncate(time.Minute) +} + +func alignToPrevBucketEnd(t time.Time, minutes int) time.Time { + m := t.Minute() + prev := (m / minutes) * minutes + return t.Truncate(time.Minute).Add(time.Duration(prev-m) * time.Minute).Add(time.Duration(minutes) * time.Minute) +} + +func fmtNullFloat(v sql.NullFloat64) string { + if v.Valid { + return fmtFloat(v.Float64, -1) + } + return "" +} + +// fmtFloat: prec < 0 表示用不固定小数(去除无意义零),否则保留指定小数位 +func fmtFloat(fv float64, prec int) string { + if prec >= 0 { + return fmt.Sprintf("%.*f", prec, fv) + } + s := fmt.Sprintf("%.10f", fv) + s = strings.TrimRight(s, "0") + s = strings.TrimRight(s, ".") + if s == "-0" { + s = "0" + } + if s == "" || s == "-" || s == "+" || s == "." { + return "0" + } + if math.Abs(fv) < 1e-9 { + return "0" + } + return s +} + +// rotatePreviousUTC 将上一UTC日的活跃CSV压缩到history目录 +func rotatePreviousUTC(outDir, histDir, currentUTC string) error { + // 计算昨日 UTC 日期 + curDay, err := time.Parse("2006-01-02", currentUTC) + if err != nil { + return nil + } + yesterday := curDay.Add(-24 * time.Hour).Format("2006-01-02") + prevPath := filepath.Join(outDir, fmt.Sprintf("weather_data_%s.csv", yesterday)) + gzPath := filepath.Join(histDir, fmt.Sprintf("weather_data_%s.csv.gz", yesterday)) + + if _, err := os.Stat(prevPath); err != nil { + return nil + } + if _, err := os.Stat(gzPath); err == nil { + // 已压缩 + return nil + } + return gzipFile(prevPath, gzPath) +} + +func gzipFile(src, dst string) error { + srcF, err := os.Open(src) + if err != nil { + return err + } + defer srcF.Close() + + if err := os.MkdirAll(filepath.Dir(dst), 0o755); err != nil { + return err + } + dstF, err := os.Create(dst) + if err != nil { + return err + } + defer func() { + _ = dstF.Close() + }() + + gw := gzip.NewWriter(dstF) + gw.Name = filepath.Base(src) + gw.ModTime = time.Now() + defer func() { _ = gw.Close() }() + + if _, err := io.Copy(gw, srcF); err != nil { + return err + } + // 压缩成功后删除原文件 + return os.Remove(src) +}