package tools

import (
	"compress/gzip"
	"context"
	"database/sql"
	"errors"
	"fmt"
	"io"
	"log"
	"math"
	"os"
	"path/filepath"
	"strings"
	"time"

	"weatherstation/internal/database"
)

// Exporter exports a CSV every 10 minutes, with ZTD fusion.
type Exporter struct {
	pg     *sql.DB
	my     *sql.DB
	loc    *time.Location // Asia/Shanghai
	logger *log.Logger    // dedicated logger for export runs
}

func NewExporter() *Exporter {
	loc, _ := time.LoadLocation("Asia/Shanghai")
	if loc == nil {
		loc = time.FixedZone("CST", 8*3600)
	}
	// Create the dedicated export log file (append mode).
	if err := os.MkdirAll("export_data", 0o755); err != nil {
		log.Printf("failed to create export log directory: %v", err)
	}
	f, err := os.OpenFile(filepath.Join("export_data", "export.log"),
		os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644)
	if err != nil {
		log.Printf("failed to create export log file: %v", err)
		f = os.Stderr // fall back to stderr so the logger never writes to a nil file
	}
	// Custom log format including date, time and microseconds.
	logger := log.New(f, "", log.Ldate|log.Ltime|log.Lmicroseconds)
	return &Exporter{
		pg:     database.GetDB(),
		my:     database.GetMySQL(),
		loc:    loc,
		logger: logger,
	}
}
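
// A typical caller would look roughly like this sketch (the surrounding
// context and error handling are assumptions, not part of this file):
//
//	exp := NewExporter()
//	if err := exp.Start(ctx); err != nil {
//		log.Printf("exporter stopped: %v", err)
//	}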

// Start runs the scheduling loop (blocking).
func (e *Exporter) Start(ctx context.Context) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}
		now := time.Now().In(e.loc)
		// Next 10-minute boundary + 10 seconds.
		next := alignToNextBucketEnd(now, 10).Add(10 * time.Second)
		e.logger.Printf("schedule: now=%s, next run=%s (10s delay)", now.Format("2006-01-02 15:04:05"), next.Format("2006-01-02 15:04:05"))
		delay := time.Until(next)
		if delay > 0 {
			timer := time.NewTimer(delay)
			select {
			case <-ctx.Done():
				timer.Stop()
				return ctx.Err()
			case <-timer.C:
			}
		}
		// At, say, 21:11:xx the bucket [21:00, 21:10) should be exported.
		currentTime := time.Now().In(e.loc)
		bucketEnd := alignToPrevBucketEnd(currentTime, 10)
		bucketStart := bucketEnd.Add(-10 * time.Minute)
		e.logger.Printf("current time=%s, bucket start=%s, bucket end=%s",
			currentTime.Format("2006-01-02 15:04:05"),
			bucketStart.Format("2006-01-02 15:04:05"),
			bucketEnd.Format("2006-01-02 15:04:05"))
		if err := e.exportBucket(ctx, bucketStart, bucketEnd); err != nil {
			e.logger.Printf("export of bucket %s-%s failed: %v", bucketStart.Format("2006-01-02 15:04:05"), bucketEnd.Format("2006-01-02 15:04:05"), err)
		}
	}
}
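
// Schedule example: at 21:04 Asia/Shanghai the loop sleeps until 21:10:10,
// then exports the bucket [21:00, 21:10) and appends it to the CSV named
// after the UTC day of the bucket end.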

// exportBucket exports one 10-minute bucket (CST).
func (e *Exporter) exportBucket(ctx context.Context, bucketStart, bucketEnd time.Time) error {
	utcDay := bucketEnd.UTC().Format("2006-01-02")
	outDir := filepath.Join("export_data")
	histDir := filepath.Join("export_data", "history")
	if err := os.MkdirAll(histDir, 0o755); err != nil {
		return err
	}
	activePath := filepath.Join(outDir, fmt.Sprintf("weather_data_%s.csv", utcDay))
	e.logger.Printf("export started: bucket start=%s, bucket end=%s, file=%s",
		bucketStart.Format("2006-01-02 15:04:05"),
		bucketEnd.Format("2006-01-02 15:04:05"),
		activePath)
	// First query all eligible stations so missing ones can be reported later.
	var eligibleStations []struct {
		StationID string
		DeviceID  string
	}
	stationsRows, err := e.pg.QueryContext(ctx, `
		SELECT station_id, device_id
		FROM stations
		WHERE device_type = 'WH65LP'
		  AND latitude IS NOT NULL AND longitude IS NOT NULL
		  AND latitude <> 0 AND longitude <> 0
		ORDER BY station_id
	`)
	if err != nil {
		e.logger.Printf("failed to query eligible stations: %v", err)
	} else {
		defer stationsRows.Close()
		for stationsRows.Next() {
			var station struct {
				StationID string
				DeviceID  string
			}
			if err := stationsRows.Scan(&station.StationID, &station.DeviceID); err == nil {
				eligibleStations = append(eligibleStations, station)
			}
		}
		e.logger.Printf("eligible station count: %d", len(eligibleStations))
		for _, s := range eligibleStations {
			e.logger.Printf("eligible station: station_id=%s, device_id=%s", s.StationID, s.DeviceID)
		}
	}
	// Rotate the previous UTC day's file (if it exists and is not yet compressed).
	if err := rotatePreviousUTC(outDir, histDir, utcDay); err != nil {
		e.logger.Printf("failed to rotate previous day's file: %v", err)
	}
	needHeader := ensureFileWithHeader(activePath)
	// Query the bucket's data, keyed by the bucket start time.
	e.logger.Printf("querying data with bucket_start=%s", bucketStart.Format("2006-01-02 15:04:05"))
	rows, err := e.pg.QueryContext(ctx, `
		SELECT
			s.latitude,
			s.longitude,
			s.device_id,
			s.altitude,
			r.pressure_hpa_x100,
			r.temp_c_x100,
			r.wind_speed_ms_x1000,
			r.wind_dir_deg,
			r.humidity_pct,
			r.bucket_start,
			s.station_id
		FROM stations s
		JOIN rs485_weather_10min r ON r.station_id = s.station_id AND r.bucket_start = $1
		WHERE s.device_type = 'WH65LP'
		  AND s.latitude IS NOT NULL AND s.longitude IS NOT NULL
		  AND s.latitude <> 0 AND s.longitude <> 0
		ORDER BY s.station_id
	`, bucketStart)
	if err != nil {
		return fmt.Errorf("failed to query 10-minute data: %v", err)
	}
	defer rows.Close()
	f, err := os.OpenFile(activePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644)
	if err != nil {
		return err
	}
	defer f.Close()
	if needHeader {
		if _, err := f.WriteString(headerLine() + "\n"); err != nil {
			return err
		}
	}
	var total, ztdHit, ztdMiss int
	processedStationIDs := make(map[string]bool)
	for rows.Next() {
		var (
			lat, lon, elev sql.NullFloat64
			deviceID       string
			pX100, tX100   sql.NullInt64
			wsX1000        sql.NullInt64
			wdDeg          sql.NullInt64
			rh             sql.NullInt64
			bucketStartTS  time.Time
			stationID      string
		)
		if err := rows.Scan(&lat, &lon, &deviceID, &elev, &pX100, &tX100, &wsX1000, &wdDeg, &rh, &bucketStartTS, &stationID); err != nil {
			e.logger.Printf("failed to scan row: %v", err)
			continue
		}
		processedStationIDs[stationID] = true
		e.logger.Printf("processing station: station_id=%s, device_id=%s, longitude=%.6f, latitude=%.6f", stationID, deviceID, lon.Float64, lat.Float64)
		// The CSV uses the bucket end time as date_time (the right endpoint of the 10-minute interval).
		dateTimeStr := bucketEnd.In(e.loc).Format("2006-01-02 15:04:05")
		e.logger.Printf("station %s: output time (bucket end)=%s, used as the ZTD alignment reference", stationID, dateTimeStr)
		var pressureStr, tempStr, wsStr, wdStr, rhStr string
		if pX100.Valid {
			pressureStr = fmtFloat(float64(pX100.Int64)/100.0, 2)
		}
		if tX100.Valid {
			tempStr = fmtFloat(float64(tX100.Int64)/100.0, 2)
		}
		if wsX1000.Valid {
			wsStr = fmtFloat(float64(wsX1000.Int64)/1000.0, 3)
		}
		if wdDeg.Valid {
			wdStr = fmtFloat(float64(wdDeg.Int64), 0)
		}
		if rh.Valid {
			rhStr = fmtFloat(float64(rh.Int64), 0)
		}
		// Look up ZTD by device_id, using the bucket end time.
		ztdStr := e.lookupZTD(ctx, deviceID, bucketEnd)
		if ztdStr != "" {
			ztdHit++
			e.logger.Printf("station %s (device_id=%s): ZTD found, value=%s", stationID, deviceID, ztdStr)
		} else {
			ztdMiss++
			e.logger.Printf("station %s (device_id=%s): ZTD missing or outside the 5-minute window", stationID, deviceID)
		}
		var b strings.Builder
		b.WriteString(fmtNullFloat(lat))
		b.WriteByte(',')
		b.WriteString(fmtNullFloat(lon))
		b.WriteByte(',')
		b.WriteString(deviceID) // the CSV uses device_id as station_id
		b.WriteByte(',')
		b.WriteByte(',') // station_name left empty
		b.WriteString(dateTimeStr)
		b.WriteByte(',')
		b.WriteString(fmtNullFloat(elev))
		b.WriteByte(',')
		b.WriteString(pressureStr)
		b.WriteByte(',')
		b.WriteString(tempStr)
		b.WriteByte(',')
		b.WriteByte(',') // dewpoint left empty
		b.WriteString(wsStr)
		b.WriteByte(',')
		b.WriteString(wdStr)
		b.WriteByte(',')
		b.WriteString(rhStr)
		b.WriteByte(',')
		b.WriteString(ztdStr)
		b.WriteByte(',') // pwv left empty
		if _, err := f.WriteString(b.String() + "\n"); err != nil {
			e.logger.Printf("failed to write CSV row: %v", err)
		}
		total++
	}
	if err := rows.Err(); err != nil {
		return err
	}
	// Check which stations have no data at all for this bucket.
	var missingCount int
	for _, station := range eligibleStations {
		if !processedStationIDs[station.StationID] {
			e.logger.Printf("station missing: station_id=%s, device_id=%s (no record found in rs485_weather_10min)",
				station.StationID, station.DeviceID)
			missingCount++
		}
	}
	e.logger.Printf("export finished: bucket start=%s, bucket end=%s, rows=%d, ZTD hits=%d, ZTD misses=%d, missing stations=%d",
		bucketStart.Format("2006-01-02 15:04:05"),
		bucketEnd.Format("2006-01-02 15:04:05"),
		total, ztdHit, ztdMiss, missingCount)
	return nil
}

// lookupZTD returns the ZTD value (scaled by 100) for the given device at the
// bucket end time, accepting any rtk_data row within ±5 minutes; it returns
// the empty string when MySQL is unavailable or no usable row exists.
func (e *Exporter) lookupZTD(ctx context.Context, deviceID string, bucketEnd time.Time) string {
	if e.my == nil {
		return ""
	}
	var ztd sql.NullFloat64
	var ts time.Time
	err := e.my.QueryRowContext(ctx, `
		SELECT ztd, timestamp FROM rtk_data
		WHERE station_id = ?
		  AND ABS(TIMESTAMPDIFF(MINUTE, timestamp, ?)) <= 5
		LIMIT 1
	`, deviceID, bucketEnd).Scan(&ztd, &ts)
	if err != nil {
		if !errors.Is(err, sql.ErrNoRows) {
			e.logger.Printf("failed to query ZTD: station_id=%s, error=%v", deviceID, err)
		}
		return ""
	}
	if !ztd.Valid {
		e.logger.Printf("device_id=%s: ZTD value is NULL", deviceID)
		return ""
	}
	return fmtFloat(ztd.Float64*100.0, -1)
}

// ensureFileWithHeader creates the CSV file if it does not exist yet and
// reports whether the header line still needs to be written.
func ensureFileWithHeader(path string) bool {
	if _, err := os.Stat(path); err == nil {
		return false
	}
	dir := filepath.Dir(path)
	_ = os.MkdirAll(dir, 0o755)
	f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY, 0o644)
	if err != nil {
		log.Printf("create csv failed: %v", err)
		return false
	}
	_ = f.Close()
	return true
}

// headerLine returns the CSV header row.
func headerLine() string {
	return "latitude,longitude,station_id,station_name,date_time,elevation,pressure,temperature,dewpoint,wind_speed,wind_direction,relative_humidity,ztd,pwv"
}

// alignToNextBucketEnd returns the next multiple of minutes past the hour
// strictly after t's current minute (seconds truncated).
func alignToNextBucketEnd(t time.Time, minutes int) time.Time {
	m := t.Minute()
	next := (m/minutes + 1) * minutes
	dt := time.Duration(next-m) * time.Minute
	return t.Truncate(time.Minute).Add(dt).Truncate(time.Minute)
}

// alignToPrevBucketEnd returns the last multiple of minutes past the hour that
// is not after t, e.g. 21:21 -> 21:20 for 10-minute buckets.
func alignToPrevBucketEnd(t time.Time, minutes int) time.Time {
	m := t.Minute()
	prev := (m / minutes) * minutes
	return t.Truncate(time.Minute).Add(time.Duration(prev-m) * time.Minute)
}

// fmtNullFloat formats a nullable float, returning the empty string for NULL.
func fmtNullFloat(v sql.NullFloat64) string {
	if v.Valid {
		return fmtFloat(v.Float64, -1)
	}
	return ""
}

// fmtFloat formats fv with the given number of decimals; prec < 0 means variable
// precision (trailing zeros and a trailing decimal point are trimmed).
func fmtFloat(fv float64, prec int) string {
	if prec >= 0 {
		return fmt.Sprintf("%.*f", prec, fv)
	}
	s := fmt.Sprintf("%.10f", fv)
	s = strings.TrimRight(s, "0")
	s = strings.TrimRight(s, ".")
	if s == "-0" {
		s = "0"
	}
	if s == "" || s == "-" || s == "+" || s == "." {
		return "0"
	}
	if math.Abs(fv) < 1e-9 {
		return "0"
	}
	return s
}

// rotatePreviousUTC compresses the previous UTC day's active CSV into the history directory.
func rotatePreviousUTC(outDir, histDir, currentUTC string) error {
	// Compute yesterday's UTC date.
	curDay, err := time.Parse("2006-01-02", currentUTC)
	if err != nil {
		return nil
	}
	yesterday := curDay.Add(-24 * time.Hour).Format("2006-01-02")
	prevPath := filepath.Join(outDir, fmt.Sprintf("weather_data_%s.csv", yesterday))
	gzPath := filepath.Join(histDir, fmt.Sprintf("weather_data_%s.csv.gz", yesterday))
	if _, err := os.Stat(prevPath); err != nil {
		return nil
	}
	if _, err := os.Stat(gzPath); err == nil {
		// Already compressed.
		return nil
	}
	return gzipFile(prevPath, gzPath)
}

// gzipFile compresses src into dst and removes src once the archive has been
// fully written and closed.
func gzipFile(src, dst string) error {
	srcF, err := os.Open(src)
	if err != nil {
		return err
	}
	defer srcF.Close()
	if err := os.MkdirAll(filepath.Dir(dst), 0o755); err != nil {
		return err
	}
	dstF, err := os.Create(dst)
	if err != nil {
		return err
	}
	gw := gzip.NewWriter(dstF)
	gw.Name = filepath.Base(src)
	gw.ModTime = time.Now()
	if _, err := io.Copy(gw, srcF); err != nil {
		_ = gw.Close()
		_ = dstF.Close()
		return err
	}
	// Close the gzip stream and the file before removing the source, so a failed
	// flush cannot leave a truncated archive with the original already deleted.
	if err := gw.Close(); err != nil {
		_ = dstF.Close()
		return err
	}
	if err := dstF.Close(); err != nil {
		return err
	}
	// Remove the source file only after the archive is safely on disk.
	return os.Remove(src)
}