560 lines
16 KiB
Go
560 lines
16 KiB
Go
package tools
|
||
|
||
import (
|
||
"compress/gzip"
|
||
"context"
|
||
"database/sql"
|
||
"encoding/json"
|
||
"errors"
|
||
"fmt"
|
||
"io"
|
||
"log"
|
||
"math"
|
||
"net/http"
|
||
"os"
|
||
"path/filepath"
|
||
"strings"
|
||
"time"
|
||
"weatherstation/internal/database"
|
||
)
|
||
|
||
// Exporter 负责每10分钟导出 CSV(含ZTD融合)
|
||
type Exporter struct {
|
||
pg *sql.DB
|
||
my *sql.DB
|
||
loc *time.Location // Asia/Shanghai
|
||
logger *log.Logger // 专用日志记录器
|
||
opts ExporterOptions
|
||
httpClient *http.Client
|
||
}
|
||
|
||
// ExporterOptions 导出器可选项
|
||
type ExporterOptions struct {
|
||
// OverrideWindWithCaiyun 为 true 时,导出CSV时用彩云实况覆盖风速/风向
|
||
OverrideWindWithCaiyun bool
|
||
// CaiyunToken 彩云API令牌
|
||
CaiyunToken string
|
||
}
|
||
|
||
func NewExporter() *Exporter {
|
||
return NewExporterWithOptions(ExporterOptions{})
|
||
}
|
||
|
||
func NewExporterWithOptions(opts ExporterOptions) *Exporter {
|
||
loc, _ := time.LoadLocation("Asia/Shanghai")
|
||
if loc == nil {
|
||
loc = time.FixedZone("CST", 8*3600)
|
||
}
|
||
|
||
// 创建导出专用日志文件(追加模式)
|
||
if err := os.MkdirAll("export_data", 0o755); err != nil {
|
||
log.Printf("创建导出日志目录失败: %v", err)
|
||
}
|
||
f, err := os.OpenFile(filepath.Join("export_data", "export.log"),
|
||
os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644)
|
||
if err != nil {
|
||
log.Printf("创建导出日志文件失败: %v", err)
|
||
}
|
||
|
||
// 使用自定义日志格式,包含时间戳
|
||
logger := log.New(f, "", log.Ldate|log.Ltime|log.Lmicroseconds)
|
||
|
||
return &Exporter{
|
||
pg: database.GetDB(),
|
||
my: database.GetMySQL(),
|
||
loc: loc,
|
||
logger: logger,
|
||
opts: opts,
|
||
httpClient: &http.Client{Timeout: 10 * time.Second},
|
||
}
|
||
}
|
||
|
||
// Start 启动调度循环(阻塞)
|
||
func (e *Exporter) Start(ctx context.Context) error {
|
||
for {
|
||
select {
|
||
case <-ctx.Done():
|
||
return ctx.Err()
|
||
default:
|
||
}
|
||
|
||
now := time.Now().In(e.loc)
|
||
// 下一个10分钟边界 + 10秒
|
||
next := alignToNextBucketEnd(now, 10).Add(10 * time.Second)
|
||
e.logger.Printf("调度: 当前=%s, 下次执行=%s (延迟10秒)", now.Format("2006-01-02 15:04:05"), next.Format("2006-01-02 15:04:05"))
|
||
delay := time.Until(next)
|
||
if delay > 0 {
|
||
timer := time.NewTimer(delay)
|
||
select {
|
||
case <-ctx.Done():
|
||
timer.Stop()
|
||
return ctx.Err()
|
||
case <-timer.C:
|
||
}
|
||
}
|
||
|
||
// 当前时间在 21:11:xx 时,应导出桶 [21:00, 21:10)
|
||
currentTime := time.Now().In(e.loc)
|
||
bucketEnd := alignToPrevBucketEnd(currentTime, 10)
|
||
bucketStart := bucketEnd.Add(-10 * time.Minute)
|
||
|
||
e.logger.Printf("当前时间=%s, 导出桶开始时间=%s, 桶结束时间=%s",
|
||
currentTime.Format("2006-01-02 15:04:05"),
|
||
bucketStart.Format("2006-01-02 15:04:05"),
|
||
bucketEnd.Format("2006-01-02 15:04:05"))
|
||
|
||
if err := e.exportBucket(ctx, bucketStart, bucketEnd); err != nil {
|
||
e.logger.Printf("导出桶 %s-%s 失败: %v", bucketStart.Format("2006-01-02 15:04:05"), bucketEnd.Format("2006-01-02 15:04:05"), err)
|
||
}
|
||
}
|
||
}
|
||
|
||
// exportBucket 导出一个10分钟桶(CST)
|
||
func (e *Exporter) exportBucket(ctx context.Context, bucketStart, bucketEnd time.Time) error {
|
||
utcDay := bucketEnd.UTC().Format("2006-01-02")
|
||
outDir := filepath.Join("export_data")
|
||
histDir := filepath.Join("export_data", "history")
|
||
if err := os.MkdirAll(histDir, 0o755); err != nil {
|
||
return err
|
||
}
|
||
activePath := filepath.Join(outDir, fmt.Sprintf("weather_data_%s.csv", utcDay))
|
||
e.logger.Printf("开始导出: 桶开始时间=%s, 桶结束时间=%s, 文件=%s",
|
||
bucketStart.Format("2006-01-02 15:04:05"),
|
||
bucketEnd.Format("2006-01-02 15:04:05"),
|
||
activePath)
|
||
|
||
// 先查询所有符合条件的站点,用于后续比对缺失
|
||
var eligibleStations []struct {
|
||
StationID string
|
||
DeviceID string
|
||
}
|
||
stationsRows, err := e.pg.QueryContext(ctx, `
|
||
SELECT station_id, device_id
|
||
FROM stations
|
||
WHERE device_type = 'WH65LP'
|
||
AND latitude IS NOT NULL AND longitude IS NOT NULL
|
||
AND latitude <> 0 AND longitude <> 0
|
||
ORDER BY station_id
|
||
`)
|
||
if err != nil {
|
||
e.logger.Printf("查询合格站点失败: %v", err)
|
||
} else {
|
||
defer stationsRows.Close()
|
||
for stationsRows.Next() {
|
||
var station struct {
|
||
StationID string
|
||
DeviceID string
|
||
}
|
||
if err := stationsRows.Scan(&station.StationID, &station.DeviceID); err == nil {
|
||
eligibleStations = append(eligibleStations, station)
|
||
}
|
||
}
|
||
e.logger.Printf("合格站点总数: %d", len(eligibleStations))
|
||
for _, s := range eligibleStations {
|
||
e.logger.Printf("合格站点: station_id=%s, device_id=%s", s.StationID, s.DeviceID)
|
||
}
|
||
}
|
||
|
||
// 轮转上一 UTC 日的文件(若存在且未压缩)
|
||
if err := rotatePreviousUTC(outDir, histDir, utcDay); err != nil {
|
||
e.logger.Printf("轮转上一日文件失败: %v", err)
|
||
}
|
||
|
||
needHeader := ensureFileWithHeader(activePath)
|
||
|
||
// 查询该桶的数据 - 使用桶开始时间查询
|
||
e.logger.Printf("查询数据: 使用bucket_start=%s", bucketStart.Format("2006-01-02 15:04:05"))
|
||
rows, err := e.pg.QueryContext(ctx, `
|
||
SELECT
|
||
s.latitude,
|
||
s.longitude,
|
||
s.device_id,
|
||
s.altitude,
|
||
r.pressure_hpa_x100,
|
||
r.temp_c_x100,
|
||
r.wind_speed_ms_x1000,
|
||
r.wind_dir_deg,
|
||
r.humidity_pct,
|
||
r.bucket_start,
|
||
s.station_id
|
||
FROM stations s
|
||
JOIN rs485_weather_10min r ON r.station_id = s.station_id AND r.bucket_start = $1
|
||
WHERE s.device_type = 'WH65LP'
|
||
AND s.latitude IS NOT NULL AND s.longitude IS NOT NULL
|
||
AND s.latitude <> 0 AND s.longitude <> 0
|
||
ORDER BY s.station_id
|
||
`, bucketStart)
|
||
if err != nil {
|
||
return fmt.Errorf("查询10分钟数据失败: %v", err)
|
||
}
|
||
defer rows.Close()
|
||
|
||
f, err := os.OpenFile(activePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
defer f.Close()
|
||
if needHeader {
|
||
if _, err := f.WriteString(headerLine() + "\n"); err != nil {
|
||
return err
|
||
}
|
||
}
|
||
|
||
var total, ztdHit, ztdMiss int
|
||
processedStationIDs := make(map[string]bool)
|
||
|
||
for rows.Next() {
|
||
var (
|
||
lat, lon, elev sql.NullFloat64
|
||
deviceID string
|
||
pX100, tX100 sql.NullInt64
|
||
wsX1000 sql.NullInt64
|
||
wdDeg sql.NullInt64
|
||
rh sql.NullInt64
|
||
bucketStartTS time.Time
|
||
stationID string
|
||
)
|
||
if err := rows.Scan(&lat, &lon, &deviceID, &elev, &pX100, &tX100, &wsX1000, &wdDeg, &rh, &bucketStartTS, &stationID); err != nil {
|
||
e.logger.Printf("扫描行失败: %v", err)
|
||
continue
|
||
}
|
||
|
||
processedStationIDs[stationID] = true
|
||
e.logger.Printf("处理站点: station_id=%s, device_id=%s, 经度=%.6f, 纬度=%.6f", stationID, deviceID, lon.Float64, lat.Float64)
|
||
|
||
// CSV 使用桶末时间作为 date_time(用于表意 10 分钟区间的右端点)
|
||
dateTimeStr := bucketEnd.In(e.loc).Format("2006-01-02 15:04:05")
|
||
e.logger.Printf("站点 %s: 写出时间(桶末)=%s,用于 ZTD 对齐参考", stationID, dateTimeStr)
|
||
|
||
var pressureStr, tempStr, wsStr, wdStr, rhStr string
|
||
if pX100.Valid {
|
||
pressureStr = fmtFloat(float64(pX100.Int64)/100.0, 2)
|
||
}
|
||
if tX100.Valid {
|
||
tempStr = fmtFloat(float64(tX100.Int64)/100.0, 2)
|
||
}
|
||
if wsX1000.Valid {
|
||
wsStr = fmtFloat(float64(wsX1000.Int64)/1000.0, 3)
|
||
}
|
||
if wdDeg.Valid {
|
||
wdStr = fmtFloat(float64(wdDeg.Int64), 0)
|
||
}
|
||
if rh.Valid {
|
||
rhStr = fmtFloat(float64(rh.Int64), 0)
|
||
}
|
||
|
||
// 如果需要,使用彩云实况覆盖风速/风向
|
||
if e.opts.OverrideWindWithCaiyun && lat.Valid && lon.Valid && e.opts.CaiyunToken != "" {
|
||
if spd, dir, ok := e.fetchCaiyunRealtimeWind(ctx, lat.Float64, lon.Float64); ok {
|
||
wsStr = fmtFloat(spd, 3)
|
||
wdStr = fmtFloat(dir, 0)
|
||
e.logger.Printf("站点 %s: 使用彩云实况覆盖风: speed=%.3f m/s, dir=%.0f°", stationID, spd, dir)
|
||
} else {
|
||
e.logger.Printf("站点 %s: 彩云实况风获取失败,保留数据库值", stationID)
|
||
}
|
||
}
|
||
|
||
// 使用device_id查询ZTD,使用桶末时间
|
||
ztdStr := e.lookupZTD(ctx, deviceID, bucketEnd)
|
||
if ztdStr != "" {
|
||
ztdHit++
|
||
e.logger.Printf("站点 %s (device_id=%s): ZTD 数据正常, 值=%s", stationID, deviceID, ztdStr)
|
||
} else {
|
||
ztdMiss++
|
||
e.logger.Printf("站点 %s (device_id=%s): ZTD 数据缺失或超出5分钟窗口", stationID, deviceID)
|
||
}
|
||
|
||
var b strings.Builder
|
||
b.WriteString(fmtNullFloat(lat))
|
||
b.WriteByte(',')
|
||
b.WriteString(fmtNullFloat(lon))
|
||
b.WriteByte(',')
|
||
b.WriteString(deviceID) // CSV输出用device_id作为station_id
|
||
b.WriteByte(',')
|
||
b.WriteByte(',') // station_name 留空
|
||
b.WriteString(dateTimeStr)
|
||
b.WriteByte(',')
|
||
b.WriteString(fmtNullFloat(elev))
|
||
b.WriteByte(',')
|
||
b.WriteString(pressureStr)
|
||
b.WriteByte(',')
|
||
b.WriteString(tempStr)
|
||
b.WriteByte(',')
|
||
b.WriteByte(',') // dewpoint 留空
|
||
b.WriteString(wsStr)
|
||
b.WriteByte(',')
|
||
b.WriteString(wdStr)
|
||
b.WriteByte(',')
|
||
b.WriteString(rhStr)
|
||
b.WriteByte(',')
|
||
b.WriteString(ztdStr)
|
||
b.WriteByte(',') // pwv 留空
|
||
|
||
if _, err := f.WriteString(b.String() + "\n"); err != nil {
|
||
e.logger.Printf("写入CSV失败: %v", err)
|
||
}
|
||
total++
|
||
}
|
||
if err := rows.Err(); err != nil {
|
||
return err
|
||
}
|
||
|
||
// 检查哪些站点在这个桶完全没有数据
|
||
var missingCount int
|
||
for _, station := range eligibleStations {
|
||
if !processedStationIDs[station.StationID] {
|
||
e.logger.Printf("站点缺失: station_id=%s, device_id=%s (rs485_weather_10min 表中未找到记录)",
|
||
station.StationID, station.DeviceID)
|
||
missingCount++
|
||
}
|
||
}
|
||
|
||
e.logger.Printf("导出完成: 桶开始时间=%s, 桶结束时间=%s, 总行数=%d, ZTD命中=%d, ZTD未命中=%d, 缺失站点数=%d",
|
||
bucketStart.Format("2006-01-02 15:04:05"),
|
||
bucketEnd.Format("2006-01-02 15:04:05"),
|
||
total, ztdHit, ztdMiss, missingCount)
|
||
|
||
return nil
|
||
}
|
||
|
||
// 导出一个日期范围内的全部10分钟桶(CST),start 与 end 为“日期起止(含)”
|
||
func (e *Exporter) ExportRange(ctx context.Context, startDate, endDate string) error {
|
||
// 解析日期,支持 YYYY-MM-DD 或 YYYYMMDD
|
||
parse := func(s string) (time.Time, error) {
|
||
if len(s) == 8 {
|
||
// YYYYMMDD -> YYYY-MM-DD
|
||
s = s[:4] + "-" + s[4:6] + "-" + s[6:8]
|
||
}
|
||
loc := e.loc
|
||
if loc == nil {
|
||
loc = time.FixedZone("CST", 8*3600)
|
||
}
|
||
return time.ParseInLocation("2006-01-02", s, loc)
|
||
}
|
||
|
||
fromDay, err := parse(startDate)
|
||
if err != nil {
|
||
return fmt.Errorf("起始日期解析失败: %v", err)
|
||
}
|
||
toDay, err := parse(endDate)
|
||
if err != nil {
|
||
return fmt.Errorf("结束日期解析失败: %v", err)
|
||
}
|
||
if toDay.Before(fromDay) {
|
||
return fmt.Errorf("结束日期早于起始日期")
|
||
}
|
||
|
||
// 日期起止 -> 时间范围(含):[fromDay 00:00, toDay 23:59:59]
|
||
from := time.Date(fromDay.Year(), fromDay.Month(), fromDay.Day(), 0, 0, 0, 0, e.loc)
|
||
to := time.Date(toDay.Year(), toDay.Month(), toDay.Day(), 23, 59, 59, 0, e.loc)
|
||
|
||
firstBucket := from.Truncate(10 * time.Minute)
|
||
lastBucket := to.Truncate(10 * time.Minute)
|
||
|
||
for b := firstBucket; !b.After(lastBucket); b = b.Add(10 * time.Minute) {
|
||
select {
|
||
case <-ctx.Done():
|
||
return ctx.Err()
|
||
default:
|
||
}
|
||
bucketStart := b
|
||
bucketEnd := b.Add(10 * time.Minute)
|
||
if err := e.exportBucket(ctx, bucketStart, bucketEnd); err != nil {
|
||
// 不中断整个范围导出,记录错误继续
|
||
e.logger.Printf("范围导出: 桶 %s-%s 导出失败: %v", bucketStart.Format("2006-01-02 15:04:05"), bucketEnd.Format("2006-01-02 15:04:05"), err)
|
||
}
|
||
}
|
||
return nil
|
||
}
|
||
|
||
func (e *Exporter) lookupZTD(ctx context.Context, deviceID string, bucketEnd time.Time) string {
|
||
if e.my == nil {
|
||
return ""
|
||
}
|
||
var ztd sql.NullFloat64
|
||
var ts time.Time
|
||
err := e.my.QueryRowContext(ctx, `
|
||
SELECT ztd, timestamp FROM rtk_data
|
||
WHERE station_id = ?
|
||
AND ABS(TIMESTAMPDIFF(MINUTE, timestamp, ?)) <= 5
|
||
LIMIT 1
|
||
`, deviceID, bucketEnd).Scan(&ztd, &ts)
|
||
if err != nil {
|
||
if !errors.Is(err, sql.ErrNoRows) {
|
||
e.logger.Printf("查询ZTD失败: station_id=%s, error=%v", deviceID, err)
|
||
}
|
||
return ""
|
||
}
|
||
if !ztd.Valid {
|
||
e.logger.Printf("站点 device_id=%s: ZTD值为NULL", deviceID)
|
||
return ""
|
||
}
|
||
return fmtFloat(ztd.Float64*100.0, -1)
|
||
}
|
||
|
||
// fetchCaiyunRealtimeWind 拉取彩云实时风(m/s, 度)。lat,lon为纬度、经度。
|
||
func (e *Exporter) fetchCaiyunRealtimeWind(ctx context.Context, lat, lon float64) (float64, float64, bool) {
|
||
if e.httpClient == nil || e.opts.CaiyunToken == "" {
|
||
return 0, 0, false
|
||
}
|
||
type realtimeResp struct {
|
||
Status string `json:"status"`
|
||
Unit string `json:"unit"`
|
||
Result struct {
|
||
Realtime struct {
|
||
Status string `json:"status"`
|
||
Wind struct {
|
||
Speed float64 `json:"speed"`
|
||
Direction float64 `json:"direction"`
|
||
} `json:"wind"`
|
||
} `json:"realtime"`
|
||
} `json:"result"`
|
||
}
|
||
url := fmt.Sprintf("https://api.caiyunapp.com/v2.6/%s/%f,%f/realtime?unit=SI", e.opts.CaiyunToken, lon, lat)
|
||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
|
||
if err != nil {
|
||
return 0, 0, false
|
||
}
|
||
resp, err := e.httpClient.Do(req)
|
||
if err != nil {
|
||
return 0, 0, false
|
||
}
|
||
defer resp.Body.Close()
|
||
if resp.StatusCode/100 != 2 {
|
||
return 0, 0, false
|
||
}
|
||
body, err := io.ReadAll(resp.Body)
|
||
if err != nil {
|
||
return 0, 0, false
|
||
}
|
||
var data realtimeResp
|
||
if err := json.Unmarshal(body, &data); err != nil {
|
||
return 0, 0, false
|
||
}
|
||
if strings.ToLower(data.Status) != "ok" || strings.ToLower(data.Result.Realtime.Status) != "ok" {
|
||
return 0, 0, false
|
||
}
|
||
// 使用 SI 单位,风速为 m/s;风向为弧度,这里转换为度[0,360)
|
||
spd := data.Result.Realtime.Wind.Speed
|
||
dirRad := data.Result.Realtime.Wind.Direction
|
||
dirDeg := dirRad * 180.0 / math.Pi
|
||
for dirDeg < 0 {
|
||
dirDeg += 360
|
||
}
|
||
for dirDeg >= 360 {
|
||
dirDeg -= 360
|
||
}
|
||
return spd, dirDeg, true
|
||
}
|
||
|
||
func ensureFileWithHeader(path string) bool {
|
||
if _, err := os.Stat(path); err == nil {
|
||
return false
|
||
}
|
||
dir := filepath.Dir(path)
|
||
_ = os.MkdirAll(dir, 0o755)
|
||
f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY, 0o644)
|
||
if err != nil {
|
||
log.Printf("create csv failed: %v", err)
|
||
return false
|
||
}
|
||
_ = f.Close()
|
||
return true
|
||
}
|
||
|
||
func headerLine() string {
|
||
return "latitude,longitude,station_id,station_name,date_time,elevation,pressure,temperature,dewpoint,wind_speed,wind_direction,relative_humidity,ztd,pwv"
|
||
}
|
||
|
||
func alignToNextBucketEnd(t time.Time, minutes int) time.Time {
|
||
m := t.Minute()
|
||
next := (m/minutes + 1) * minutes
|
||
dt := time.Duration(next-m) * time.Minute
|
||
return t.Truncate(time.Minute).Add(dt).Truncate(time.Minute)
|
||
}
|
||
|
||
func alignToPrevBucketEnd(t time.Time, minutes int) time.Time {
|
||
m := t.Minute()
|
||
prev := (m / minutes) * minutes
|
||
// 返回不超过当前时间的10分钟整点(例如 21:21 -> 21:20)
|
||
return t.Truncate(time.Minute).Add(time.Duration(prev-m) * time.Minute)
|
||
}
|
||
|
||
func fmtNullFloat(v sql.NullFloat64) string {
|
||
if v.Valid {
|
||
return fmtFloat(v.Float64, -1)
|
||
}
|
||
return ""
|
||
}
|
||
|
||
// fmtFloat: prec < 0 表示用不固定小数(去除无意义零),否则保留指定小数位
|
||
func fmtFloat(fv float64, prec int) string {
|
||
if prec >= 0 {
|
||
return fmt.Sprintf("%.*f", prec, fv)
|
||
}
|
||
s := fmt.Sprintf("%.10f", fv)
|
||
s = strings.TrimRight(s, "0")
|
||
s = strings.TrimRight(s, ".")
|
||
if s == "-0" {
|
||
s = "0"
|
||
}
|
||
if s == "" || s == "-" || s == "+" || s == "." {
|
||
return "0"
|
||
}
|
||
if math.Abs(fv) < 1e-9 {
|
||
return "0"
|
||
}
|
||
return s
|
||
}
|
||
|
||
// rotatePreviousUTC 将上一UTC日的活跃CSV压缩到history目录
|
||
func rotatePreviousUTC(outDir, histDir, currentUTC string) error {
|
||
// 计算昨日 UTC 日期
|
||
curDay, err := time.Parse("2006-01-02", currentUTC)
|
||
if err != nil {
|
||
return nil
|
||
}
|
||
yesterday := curDay.Add(-24 * time.Hour).Format("2006-01-02")
|
||
prevPath := filepath.Join(outDir, fmt.Sprintf("weather_data_%s.csv", yesterday))
|
||
gzPath := filepath.Join(histDir, fmt.Sprintf("weather_data_%s.csv.gz", yesterday))
|
||
|
||
if _, err := os.Stat(prevPath); err != nil {
|
||
return nil
|
||
}
|
||
if _, err := os.Stat(gzPath); err == nil {
|
||
// 已压缩
|
||
return nil
|
||
}
|
||
return gzipFile(prevPath, gzPath)
|
||
}
|
||
|
||
func gzipFile(src, dst string) error {
|
||
srcF, err := os.Open(src)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
defer srcF.Close()
|
||
|
||
if err := os.MkdirAll(filepath.Dir(dst), 0o755); err != nil {
|
||
return err
|
||
}
|
||
dstF, err := os.Create(dst)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
defer func() {
|
||
_ = dstF.Close()
|
||
}()
|
||
|
||
gw := gzip.NewWriter(dstF)
|
||
gw.Name = filepath.Base(src)
|
||
gw.ModTime = time.Now()
|
||
defer func() { _ = gw.Close() }()
|
||
|
||
if _, err := io.Copy(gw, srcF); err != nil {
|
||
return err
|
||
}
|
||
// 压缩成功后删除原文件
|
||
return os.Remove(src)
|
||
}
|