package tools

import (
	"compress/gzip"
	"context"
	"database/sql"
	"errors"
	"fmt"
	"io"
	"log"
	"math"
	"os"
	"path/filepath"
	"strings"
	"time"

	"weatherstation/internal/database"
)

// Exporter exports a CSV snapshot every 10 minutes, merging ZTD values
// looked up from MySQL into the weather rows read from PostgreSQL.
type Exporter struct {
	pg  *sql.DB
	my  *sql.DB
	loc *time.Location // Asia/Shanghai
}

// NewExporter builds an Exporter from the shared database handles and the
// Asia/Shanghai time zone, falling back to a fixed UTC+8 zone when the
// tzdata lookup fails.
func NewExporter() *Exporter {
	loc, err := time.LoadLocation("Asia/Shanghai")
	if err != nil {
		loc = time.FixedZone("CST", 8*3600)
	}
	return &Exporter{
		pg:  database.GetDB(),
		my:  database.GetMySQL(),
		loc: loc,
	}
}
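
// A minimal usage sketch (hypothetical caller, not part of this package):
// run the exporter on a cancellable context so shutdown stops the loop.
//
//	ctx, cancel := context.WithCancel(context.Background())
//	defer cancel()
//	exp := tools.NewExporter()
//	if err := exp.Start(ctx); err != nil && !errors.Is(err, context.Canceled) {
//		log.Printf("exporter stopped: %v", err)
//	}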

// Start runs the export scheduling loop until ctx is cancelled (blocking).
func (e *Exporter) Start(ctx context.Context) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}
		now := time.Now().In(e.loc)
		next := alignToNextBucketEnd(now, 10).Add(30 * time.Second)
		log.Printf("exporter: now=%s next_run=%s", now.Format("2006-01-02 15:04:05"), next.Format("2006-01-02 15:04:05"))
		delay := time.Until(next)
		if delay > 0 {
			timer := time.NewTimer(delay)
			select {
			case <-ctx.Done():
				timer.Stop()
				return ctx.Err()
			case <-timer.C:
			}
		}
		end := time.Now().In(e.loc).Truncate(time.Minute)
		end = alignToPrevBucketEnd(end, 10)
		start := end.Add(-10 * time.Minute)
		if err := e.exportBucket(ctx, start, end); err != nil {
			log.Printf("export bucket %s-%s failed: %v", start.Format("2006-01-02 15:04:05"), end.Format("2006-01-02 15:04:05"), err)
		}
	}
}
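
// Timing sketch (illustrative CST values): at now = 12:03:17,
// alignToNextBucketEnd(now, 10) is 12:10:00, so the run fires at 12:10:30;
// the extra 30 seconds is presumably a grace period for the upstream
// 10-minute aggregation to land before the bucket is read.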

// exportBucket exports one 10-minute bucket (timestamps in CST) into the
// active CSV file for the current UTC day.
func (e *Exporter) exportBucket(ctx context.Context, bucketStart, bucketEnd time.Time) error {
	utcDay := bucketEnd.UTC().Format("2006-01-02")
	outDir := filepath.Join("export_data")
	histDir := filepath.Join("export_data", "history")
	if err := os.MkdirAll(histDir, 0o755); err != nil {
		return err
	}
	activePath := filepath.Join(outDir, fmt.Sprintf("weather_data_%s.csv", utcDay))
	log.Printf("exporter: begin bucket start=%s end=%s -> %s", bucketStart.Format("2006-01-02 15:04:05"), bucketEnd.Format("2006-01-02 15:04:05"), activePath)
	// Rotate the previous UTC day's file (if it exists and is not yet compressed).
	if err := rotatePreviousUTC(outDir, histDir, utcDay); err != nil {
		log.Printf("rotate previous day failed: %v", err)
	}
	needHeader := ensureFileWithHeader(activePath)
	rows, err := e.pg.QueryContext(ctx, `
		SELECT
			s.latitude,
			s.longitude,
			s.device_id,
			s.altitude,
			r.pressure_hpa_x100,
			r.temp_c_x100,
			r.wind_speed_ms_x1000,
			r.wind_dir_deg,
			r.humidity_pct,
			r.bucket_start
		FROM stations s
		JOIN rs485_weather_10min r ON r.station_id = s.station_id AND r.bucket_start = $1
		WHERE s.device_type = 'WH65LP'
		  AND s.latitude IS NOT NULL AND s.longitude IS NOT NULL
		  AND s.latitude <> 0 AND s.longitude <> 0
		ORDER BY s.device_id
	`, bucketStart)
	if err != nil {
		return err
	}
	defer rows.Close()
	f, err := os.OpenFile(activePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644)
	if err != nil {
		return err
	}
	defer f.Close()
	if needHeader {
		if _, err := f.WriteString(headerLine() + "\n"); err != nil {
			return err
		}
	}
	var total, ztdHit, ztdMiss int
	for rows.Next() {
		var (
			lat, lon, elev sql.NullFloat64
			deviceID       string
			pX100, tX100   sql.NullInt64
			wsX1000        sql.NullInt64
			wdDeg          sql.NullInt64
			rh             sql.NullInt64
			bucketStartTS  time.Time
		)
		if err := rows.Scan(&lat, &lon, &deviceID, &elev, &pX100, &tX100, &wsX1000, &wdDeg, &rh, &bucketStartTS); err != nil {
			log.Printf("scan row failed: %v", err)
			continue
		}
		// date_time is formatted on the Go side in CST.
		dateTimeStr := bucketStartTS.In(e.loc).Format("2006-01-02 15:04:05")
		var pressureStr, tempStr, wsStr, wdStr, rhStr string
		if pX100.Valid {
			pressureStr = fmtFloat(float64(pX100.Int64)/100.0, 2)
		}
		if tX100.Valid {
			tempStr = fmtFloat(float64(tX100.Int64)/100.0, 2)
		}
		if wsX1000.Valid {
			wsStr = fmtFloat(float64(wsX1000.Int64)/1000.0, 3)
		}
		if wdDeg.Valid {
			wdStr = fmtFloat(float64(wdDeg.Int64), 0)
		}
		if rh.Valid {
			rhStr = fmtFloat(float64(rh.Int64), 0)
		}
		ztdStr := e.lookupZTD(ctx, deviceID, bucketEnd)
		if ztdStr != "" {
			ztdHit++
		} else {
			ztdMiss++
		}
		var b strings.Builder
		b.WriteString(fmtNullFloat(lat))
		b.WriteByte(',')
		b.WriteString(fmtNullFloat(lon))
		b.WriteByte(',')
		b.WriteString(deviceID)
		b.WriteByte(',')
		b.WriteByte(',') // station_name left empty
		b.WriteString(dateTimeStr)
		b.WriteByte(',')
		b.WriteString(fmtNullFloat(elev))
		b.WriteByte(',')
		b.WriteString(pressureStr)
		b.WriteByte(',')
		b.WriteString(tempStr)
		b.WriteByte(',')
		b.WriteByte(',') // dewpoint left empty
		b.WriteString(wsStr)
		b.WriteByte(',')
		b.WriteString(wdStr)
		b.WriteByte(',')
		b.WriteString(rhStr)
		b.WriteByte(',')
		b.WriteString(ztdStr)
		b.WriteByte(',') // pwv left empty
		if _, err := f.WriteString(b.String() + "\n"); err != nil {
			log.Printf("write csv failed: %v", err)
		}
		total++
	}
	if err := rows.Err(); err != nil {
		return err
	}
	log.Printf("exporter: done bucket start=%s end=%s, rows=%d, ztd_hit=%d, ztd_miss=%d", bucketStart.Format("2006-01-02 15:04:05"), bucketEnd.Format("2006-01-02 15:04:05"), total, ztdHit, ztdMiss)
	return nil
}

// lookupZTD fetches a ZTD value for the device from MySQL, looking for an
// rtk_data row whose timestamp is within about five minutes of bucketEnd.
// The stored value is scaled by 1000 before being written to the CSV.
// It returns "" when MySQL is unavailable or no matching row exists.
func (e *Exporter) lookupZTD(ctx context.Context, deviceID string, bucketEnd time.Time) string {
	if e.my == nil {
		return ""
	}
	var ztd sql.NullFloat64
	var ts time.Time
	err := e.my.QueryRowContext(ctx, `
		SELECT ztd, timestamp FROM rtk_data
		WHERE station_id = ?
		  AND ABS(TIMESTAMPDIFF(MINUTE, timestamp, ?)) <= 5
		LIMIT 1
	`, deviceID, bucketEnd).Scan(&ztd, &ts)
	if err != nil {
		if !errors.Is(err, sql.ErrNoRows) {
			log.Printf("mysql query ztd failed: %v", err)
		}
		return ""
	}
	if !ztd.Valid {
		return ""
	}
	return fmtFloat(ztd.Float64*1000.0, -1)
}

// ensureFileWithHeader creates the CSV file if it does not exist yet and
// reports whether the header line still needs to be written.
func ensureFileWithHeader(path string) bool {
	if _, err := os.Stat(path); err == nil {
		return false
	}
	dir := filepath.Dir(path)
	_ = os.MkdirAll(dir, 0o755)
	f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY, 0o644)
	if err != nil {
		log.Printf("create csv failed: %v", err)
		return false
	}
	_ = f.Close()
	return true
}

// headerLine returns the fixed CSV column header.
func headerLine() string {
	return "latitude,longitude,station_id,station_name,date_time,elevation,pressure,temperature,dewpoint,wind_speed,wind_direction,relative_humidity,ztd,pwv"
}
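
// A sample data line under this header (all values illustrative only; the
// device ID is hypothetical). station_name, dewpoint and pwv are left empty,
// and ztd is the MySQL value scaled by 1000 (see lookupZTD):
//
//	30.123456,114.654321,WH65LP-0001,,2024-05-01 12:30:00,52.3,1013.25,21.56,,2.345,180,63,2456.7,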

// alignToNextBucketEnd returns the next `minutes`-aligned boundary strictly
// after t (seconds truncated).
func alignToNextBucketEnd(t time.Time, minutes int) time.Time {
	m := t.Minute()
	next := (m/minutes + 1) * minutes
	dt := time.Duration(next-m) * time.Minute
	return t.Truncate(time.Minute).Add(dt)
}

// alignToPrevBucketEnd returns the end of the `minutes`-wide bucket whose
// start boundary is at or before t (for t exactly on a boundary this is
// t plus a full bucket).
func alignToPrevBucketEnd(t time.Time, minutes int) time.Time {
	m := t.Minute()
	prev := (m / minutes) * minutes
	return t.Truncate(time.Minute).Add(time.Duration(prev-m) * time.Minute).Add(time.Duration(minutes) * time.Minute)
}
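
// Alignment sketch (illustrative CST wall-clock values):
//
//	alignToNextBucketEnd(12:03, 10) == 12:10 // next 10-minute boundary
//	alignToNextBucketEnd(12:10, 10) == 12:20 // a boundary advances a full bucket
//	alignToPrevBucketEnd(12:13, 10) == 12:20 // end of the bucket containing t
//	alignToPrevBucketEnd(12:10, 10) == 12:20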

// fmtNullFloat formats a nullable float, returning "" for NULL.
func fmtNullFloat(v sql.NullFloat64) string {
	if v.Valid {
		return fmtFloat(v.Float64, -1)
	}
	return ""
}

// fmtFloat formats fv with prec decimal places; prec < 0 means variable
// precision (trailing zeros and a trailing decimal point are stripped).
func fmtFloat(fv float64, prec int) string {
	if prec >= 0 {
		return fmt.Sprintf("%.*f", prec, fv)
	}
	s := fmt.Sprintf("%.10f", fv)
	s = strings.TrimRight(s, "0")
	s = strings.TrimRight(s, ".")
	if s == "-0" {
		s = "0"
	}
	if s == "" || s == "-" || s == "+" || s == "." {
		return "0"
	}
	if math.Abs(fv) < 1e-9 {
		return "0"
	}
	return s
}

// rotatePreviousUTC compresses the previous UTC day's active CSV into the
// history directory, if it exists and has not been compressed yet.
func rotatePreviousUTC(outDir, histDir, currentUTC string) error {
	// Work out yesterday's UTC date from the current one.
	curDay, err := time.Parse("2006-01-02", currentUTC)
	if err != nil {
		return nil
	}
	yesterday := curDay.Add(-24 * time.Hour).Format("2006-01-02")
	prevPath := filepath.Join(outDir, fmt.Sprintf("weather_data_%s.csv", yesterday))
	gzPath := filepath.Join(histDir, fmt.Sprintf("weather_data_%s.csv.gz", yesterday))
	if _, err := os.Stat(prevPath); err != nil {
		return nil
	}
	if _, err := os.Stat(gzPath); err == nil {
		// Already compressed.
		return nil
	}
	return gzipFile(prevPath, gzPath)
}
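
// Resulting on-disk layout (dates illustrative):
//
//	export_data/weather_data_2024-05-02.csv            // active UTC day, appended to every 10 minutes
//	export_data/history/weather_data_2024-05-01.csv.gz // rotated previous UTC day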

// gzipFile compresses src into dst and removes src on success.
func gzipFile(src, dst string) error {
	srcF, err := os.Open(src)
	if err != nil {
		return err
	}
	defer srcF.Close()
	if err := os.MkdirAll(filepath.Dir(dst), 0o755); err != nil {
		return err
	}
	dstF, err := os.Create(dst)
	if err != nil {
		return err
	}
	gw := gzip.NewWriter(dstF)
	gw.Name = filepath.Base(src)
	gw.ModTime = time.Now()
	if _, err := io.Copy(gw, srcF); err != nil {
		_ = gw.Close()
		_ = dstF.Close()
		return err
	}
	// Flush and close the gzip stream and the output file before touching src,
	// so a failed write can never leave a corrupt archive with the original gone.
	if err := gw.Close(); err != nil {
		_ = dstF.Close()
		return err
	}
	if err := dstF.Close(); err != nil {
		return err
	}
	// Delete the original only after the compressed copy is safely on disk.
	return os.Remove(src)
}
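
// Read-back sketch for a rotated day (hypothetical path; errors elided for
// brevity, handle them in real code):
//
//	f, _ := os.Open("export_data/history/weather_data_2024-05-01.csv.gz")
//	gr, _ := gzip.NewReader(f)
//	_, _ = io.Copy(os.Stdout, gr)
//	_ = gr.Close()
//	_ = f.Close()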