feat: add data export

yarnom 2025-11-12 16:29:22 +08:00
parent 5ec67da170
commit 8123ecb5c8


@@ -0,0 +1,667 @@
package main
import (
"bufio"
"context"
"database/sql"
"encoding/csv"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"math"
"net/http"
"os"
"path/filepath"
"strings"
"time"
"weatherstation/internal/config"
"weatherstation/internal/database"
)
// This command maintains a rolling CSV export at weather_data_export/main.csv
// holding the last 48 hours of 10-minute bucket data, functionally mirroring
// internal/tools/exporter.go without reusing it directly.
const (
outBaseDir = "weather_data_export"
mainCSVName = "main.csv"
historyDir = "history"
csvHeader = "latitude,longitude,station_id,station_name,date_time,elevation,pressure,temperature,dewpoint,wind_speed,wind_direction,relative_humidity,ztd,pwv"
bucketMin = 10
windowHours = 48
)
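// Illustrative main.csv row matching csvHeader above (all values hypothetical;
// dewpoint and pwv stay blank but keep their comma so column positions hold):
//
// 30.5,114.3,WS001,,2025-11-12 16:20:00,42,1013.25,18.50,,2.345,270,65,235.7,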
type options struct {
// If true and CAIYUN_TOKEN provided, override wind fields from Caiyun realtime API.
overrideWind bool
caiyunToken string
}
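// Hypothetical invocation enabling the wind override (binary name and token
// are placeholders, not part of this commit):
//
// EXPORT_OVERRIDE_WIND=1 CAIYUN_TOKEN=<token> ./export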
func main() {
// Load config to initialize DB connections used by internal/database
_ = config.GetConfig()
pg := database.GetDB()
my := database.GetMySQL() // may be nil if not configured; lookupZTD handles nil
loc, err := time.LoadLocation("Asia/Shanghai")
if err != nil {
loc = time.FixedZone("CST", 8*3600)
}
// Options from env
opts := options{
overrideWind: isTruthy(os.Getenv("EXPORT_OVERRIDE_WIND")),
caiyunToken: getenvDefault("CAIYUN_TOKEN", ""),
}
// Ensure directories and header
base := outBaseDir
mainPath := filepath.Join(base, mainCSVName)
histPath := filepath.Join(base, historyDir)
mustMkdirAll(base)
mustMkdirAll(histPath)
ensureFileWithHeader(mainPath)
// On startup, backfill the last 48 hours (10-minute buckets) and enforce retention
now := time.Now().In(loc)
lastEnd := alignToPrevBucketEnd(now, bucketMin)
firstStart := lastEnd.Add(-windowHours * time.Hour)
// Iterate completed buckets only: bucket starts in [firstStart, lastEnd), ends in (firstStart, lastEnd];
// the in-progress bucket starting at lastEnd is left to the scheduler loop below
for b := firstStart; b.Before(lastEnd); b = b.Add(bucketMin * time.Minute) {
bucketStart := b
bucketEnd := b.Add(bucketMin * time.Minute)
if err := exportBucket(context.Background(), pg, my, loc, opts, bucketStart, bucketEnd, mainPath); err != nil {
log.Printf("startup export bucket %s-%s failed: %v", tf(bucketStart), tf(bucketEnd), err)
}
}
if err := enforceRetention(mainPath, histPath, loc, windowHours); err != nil {
log.Printf("startup retention failed: %v", err)
}
// Scheduler loop: every 10 minutes aligned to next bucket end + 10s
for {
now = time.Now().In(loc)
next := alignToNextBucketEnd(now, bucketMin).Add(10 * time.Second)
sleep := time.Until(next)
if sleep > 0 {
time.Sleep(sleep)
}
// Current bucket is (prevEnd-10m, prevEnd]
cur := time.Now().In(loc)
bucketEnd := alignToPrevBucketEnd(cur, bucketMin)
bucketStart := bucketEnd.Add(-bucketMin * time.Minute)
if err := exportBucket(context.Background(), pg, my, loc, opts, bucketStart, bucketEnd, mainPath); err != nil {
log.Printf("export bucket %s-%s failed: %v", tf(bucketStart), tf(bucketEnd), err)
}
if err := enforceRetention(mainPath, histPath, loc, windowHours); err != nil {
log.Printf("retention failed: %v", err)
}
}
}
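// Timing sketch for a hypothetical 16:29 start: startup backfills the 288
// ten-minute buckets ending in (16:20-48h, 16:20], then the scheduler sleeps
// until 16:30:10 and exports the (16:20, 16:30] bucket, repeating every 10 minutes.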
// exportBucket renders one 10-minute bucket and appends it to mainPath, first
// removing any existing lines for that bucket so repeated runs stay idempotent.
func exportBucket(ctx context.Context, pg, my *sql.DB, loc *time.Location, opts options, bucketStart, bucketEnd time.Time, mainPath string) error {
// Remove any existing lines for this bucket from mainPath first
if err := removeBucketFromMain(mainPath, loc, bucketEnd); err != nil {
return err
}
// First: WH65LP (10-min aggregated table)
rows, err := pg.QueryContext(ctx, `
SELECT
s.latitude,
s.longitude,
s.device_id,
s.altitude,
r.pressure_hpa_x100,
r.temp_c_x100,
r.wind_speed_ms_x1000,
r.wind_dir_deg,
r.humidity_pct,
r.bucket_start,
s.station_id
FROM stations s
JOIN rs485_weather_10min r ON r.station_id = s.station_id AND r.bucket_start = $1
WHERE s.device_type = 'WH65LP'
AND s.latitude IS NOT NULL AND s.longitude IS NOT NULL
AND s.latitude <> 0 AND s.longitude <> 0
ORDER BY s.station_id`, bucketStart)
if err != nil {
return fmt.Errorf("query bucket rows failed: %w", err)
}
defer rows.Close()
// Append rows
f, err := os.OpenFile(mainPath, os.O_WRONLY|os.O_APPEND, 0o644)
if err != nil {
return err
}
defer f.Close()
w := bufio.NewWriter(f)
defer w.Flush()
dateTimeStr := bucketEnd.In(loc).Format("2006-01-02 15:04:05")
for rows.Next() {
var (
lat, lon, elev sql.NullFloat64
deviceID string
pX100, tX100 sql.NullInt64
wsX1000 sql.NullInt64
wdDeg sql.NullInt64
rh sql.NullInt64
bucketStartTS time.Time
stationID string
)
if err := rows.Scan(&lat, &lon, &deviceID, &elev, &pX100, &tX100, &wsX1000, &wdDeg, &rh, &bucketStartTS, &stationID); err != nil {
log.Printf("scan row failed: %v", err)
continue
}
var pressureStr, tempStr, wsStr, wdStr, rhStr string
if pX100.Valid {
pressureStr = fmtFloat(float64(pX100.Int64)/100.0, 2)
}
if tX100.Valid {
tempStr = fmtFloat(float64(tX100.Int64)/100.0, 2)
}
if wsX1000.Valid {
wsStr = fmtFloat(float64(wsX1000.Int64)/1000.0, 3)
}
if wdDeg.Valid {
wdStr = fmtFloat(float64(wdDeg.Int64), 0)
}
if rh.Valid {
rhStr = fmtFloat(float64(rh.Int64), 0)
}
// Optional: override wind from Caiyun realtime
if opts.overrideWind && opts.caiyunToken != "" && lat.Valid && lon.Valid {
if spd, dir, ok := fetchCaiyunRealtimeWind(ctx, opts.caiyunToken, lat.Float64, lon.Float64); ok {
wsStr = fmtFloat(spd, 3)
wdStr = fmtFloat(dir, 0)
}
}
// ZTD lookup from MySQL within ±5 minutes around bucketEnd
ztdStr := lookupZTD(ctx, my, deviceID, bucketEnd)
// Build CSV line: use device_id as station_id, station_name empty, dewpoint/pwv empty
var b strings.Builder
b.WriteString(fmtNullFloat(lat))
b.WriteByte(',')
b.WriteString(fmtNullFloat(lon))
b.WriteByte(',')
b.WriteString(deviceID)
b.WriteByte(',')
b.WriteByte(',') // station_name
b.WriteString(dateTimeStr)
b.WriteByte(',')
b.WriteString(fmtNullFloat(elev))
b.WriteByte(',')
b.WriteString(pressureStr)
b.WriteByte(',')
b.WriteString(tempStr)
b.WriteByte(',')
b.WriteByte(',') // dewpoint
b.WriteString(wsStr)
b.WriteByte(',')
b.WriteString(wdStr)
b.WriteByte(',')
b.WriteString(rhStr)
b.WriteByte(',')
b.WriteString(ztdStr)
b.WriteByte(',') // pwv
b.WriteByte('\n')
if _, err := w.WriteString(b.String()); err != nil {
log.Printf("write csv failed: %v", err)
}
}
if err := rows.Err(); err != nil {
return err
}
// Second: RADAR stations -> latest radar_weather by station_alias
if err := exportRadarStations(ctx, pg, loc, bucketEnd, mainPath); err != nil {
return err
}
return nil
}
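// Worked example of the fixed-point columns consumed above (values hypothetical):
// pressure_hpa_x100=101325 -> "1013.25", temp_c_x100=1850 -> "18.50",
// wind_speed_ms_x1000=2345 -> "2.345", wind_dir_deg=270 -> "270".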
// exportRadarStations appends rows for device_type='RADAR' using latest radar_weather by alias.
func exportRadarStations(ctx context.Context, pg *sql.DB, loc *time.Location, bucketEnd time.Time, mainPath string) error {
// Load RADAR stations
stRows, err := pg.QueryContext(ctx, `
SELECT name, latitude, longitude, altitude, station_alias
FROM stations
WHERE device_type = 'RADAR'
AND latitude IS NOT NULL AND longitude IS NOT NULL
`)
if err != nil {
return fmt.Errorf("query RADAR stations failed: %w", err)
}
defer stRows.Close()
f, err := os.OpenFile(mainPath, os.O_WRONLY|os.O_APPEND, 0o644)
if err != nil {
return err
}
defer f.Close()
w := bufio.NewWriter(f)
defer w.Flush()
dateTimeStr := bucketEnd.In(loc).Format("2006-01-02 15:04:05")
for stRows.Next() {
var (
name string
lat, lon, elev sql.NullFloat64
alias sql.NullString
)
if err := stRows.Scan(&name, &lat, &lon, &elev, &alias); err != nil {
log.Printf("scan RADAR station failed: %v", err)
continue
}
if !alias.Valid || strings.TrimSpace(alias.String) == "" {
continue
}
// Latest radar_weather for this alias, not later than bucketEnd
var (
rwTemp, rwHum, rwWS, rwWD, rwP sql.NullFloat64
rwDT time.Time
)
err := pg.QueryRowContext(ctx, `
SELECT temperature, humidity, wind_speed, wind_direction, pressure, dt
FROM radar_weather
WHERE alias = $1 AND dt <= $2
ORDER BY dt DESC
LIMIT 1
`, alias.String, bucketEnd).Scan(&rwTemp, &rwHum, &rwWS, &rwWD, &rwP, &rwDT)
if err != nil {
if !errors.Is(err, sql.ErrNoRows) {
log.Printf("query radar_weather failed: alias=%s err=%v", alias.String, err)
}
continue
}
// Map fields
pressureStr := ""
if rwP.Valid {
// Convert: DB value / 100.0
pressureStr = fmtFloat(rwP.Float64/100.0, 2)
}
tempStr := ""
if rwTemp.Valid {
tempStr = fmtFloat(rwTemp.Float64, -1)
}
wsStr := ""
if rwWS.Valid {
wsStr = fmtFloat(rwWS.Float64, 3)
}
wdStr := ""
if rwWD.Valid {
wdStr = fmtFloat(rwWD.Float64, 0)
}
rhStr := ""
if rwHum.Valid {
rhStr = fmtFloat(rwHum.Float64, -1)
}
var b strings.Builder
b.WriteString(fmtNullFloat(lat))
b.WriteByte(',')
b.WriteString(fmtNullFloat(lon))
b.WriteByte(',')
// station_id = stations.name
b.WriteString(name)
b.WriteByte(',')
// station_name = stations.name
b.WriteString(name)
b.WriteByte(',')
b.WriteString(dateTimeStr)
b.WriteByte(',')
b.WriteString(fmtNullFloat(elev))
b.WriteByte(',')
b.WriteString(pressureStr)
b.WriteByte(',')
b.WriteString(tempStr)
b.WriteByte(',')
b.WriteByte(',') // dewpoint blank
b.WriteString(wsStr)
b.WriteByte(',')
b.WriteString(wdStr)
b.WriteByte(',')
b.WriteString(rhStr)
b.WriteByte(',')
b.WriteByte(',') // ztd blank; pwv is the empty field after the final comma
b.WriteByte('\n')
if _, err := w.WriteString(b.String()); err != nil {
log.Printf("write RADAR csv failed: %v", err)
}
}
return stRows.Err()
}
func lookupZTD(ctx context.Context, my *sql.DB, deviceID string, bucketEnd time.Time) string {
if my == nil {
return ""
}
var ztd sql.NullFloat64
var ts time.Time
// Pick the sample closest to bucketEnd within the ±5 minute window
err := my.QueryRowContext(ctx, `
SELECT ztd, timestamp FROM rtk_data
WHERE station_id = ?
AND ABS(TIMESTAMPDIFF(MINUTE, timestamp, ?)) <= 5
ORDER BY ABS(TIMESTAMPDIFF(SECOND, timestamp, ?))
LIMIT 1
`, deviceID, bucketEnd, bucketEnd).Scan(&ztd, &ts)
if err != nil {
if !errors.Is(err, sql.ErrNoRows) {
log.Printf("lookup ZTD failed: station=%s err=%v", deviceID, err)
}
return ""
}
if !ztd.Valid {
return ""
}
// Exported as ztd*100 (to match existing exporter behavior)
return fmtFloat(ztd.Float64*100.0, -1)
}
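// Illustrative scaling (value hypothetical): a stored ztd of 2.357 comes back
// as "235.7", matching the existing exporter's ztd*100 convention.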
// fetchCaiyunRealtimeWind returns speed (m/s) and direction (deg)
func fetchCaiyunRealtimeWind(ctx context.Context, token string, lat, lon float64) (float64, float64, bool) {
type realtimeResp struct {
Status string `json:"status"`
Unit string `json:"unit"`
Result struct {
Realtime struct {
Status string `json:"status"`
Wind struct {
Speed float64 `json:"speed"`
Direction float64 `json:"direction"`
} `json:"wind"`
} `json:"realtime"`
} `json:"result"`
}
url := fmt.Sprintf("https://api.caiyunapp.com/v2.6/%s/%f,%f/realtime?unit=SI", token, lon, lat)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return 0, 0, false
}
client := &http.Client{Timeout: 10 * time.Second}
resp, err := client.Do(req)
if err != nil {
return 0, 0, false
}
defer resp.Body.Close()
if resp.StatusCode/100 != 2 {
return 0, 0, false
}
body, err := io.ReadAll(resp.Body)
if err != nil {
return 0, 0, false
}
var data realtimeResp
if err := json.Unmarshal(body, &data); err != nil {
return 0, 0, false
}
if strings.ToLower(data.Status) != "ok" || strings.ToLower(data.Result.Realtime.Status) != "ok" {
return 0, 0, false
}
spd := data.Result.Realtime.Wind.Speed
// Caiyun documents wind.direction in degrees (0 = north, clockwise), not
// radians, so normalize to [0, 360) without a radian conversion
dirDeg := math.Mod(data.Result.Realtime.Wind.Direction, 360)
if dirDeg < 0 {
dirDeg += 360
}
return spd, dirDeg, true
}
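// Abbreviated shape of the response this parser expects (values hypothetical):
//
// {"status":"ok","result":{"realtime":{"status":"ok",
//   "wind":{"speed":3.2,"direction":270.0}}}}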
// enforceRetention keeps only rows with date_time >= now-keepHours in mainPath,
// moving older rows to history files grouped by UTC date (CSV header ensured).
func enforceRetention(mainPath, histDir string, loc *time.Location, keepHours int) error {
cutoff := time.Now().In(loc).Add(-time.Duration(keepHours) * time.Hour)
f, err := os.Open(mainPath)
if err != nil {
return err
}
defer f.Close()
r := csv.NewReader(f)
r.FieldsPerRecord = -1
// Read header
records, err := r.ReadAll()
if err != nil {
return err
}
if len(records) == 0 {
return nil
}
// Prepare buffers
header := records[0]
var keep [][]string
keep = append(keep, header)
// History writers cache per day; a deferred cleanup closes any opened
// history files on every return path, including early error returns
writers := map[string]*csv.Writer{}
files := map[string]*os.File{}
defer func() {
for _, f := range files {
_ = f.Close()
}
}()
ensureWriter := func(day string) (*csv.Writer, error) {
if w, ok := writers[day]; ok {
return w, nil
}
// history file path
histPath := filepath.Join(histDir, fmt.Sprintf("weather_data_%s.csv", day))
// ensureFileWithHeader writes csvHeader when it creates the file, so do
// not append the header again here (doing so would duplicate it)
_ = ensureFileWithHeader(histPath)
hf, err := os.OpenFile(histPath, os.O_WRONLY|os.O_APPEND, 0o644)
if err != nil {
return nil, err
}
hw := csv.NewWriter(hf)
writers[day] = hw
files[day] = hf
return hw, nil
}
for i := 1; i < len(records); i++ {
rec := records[i]
if len(rec) < 5 {
// keep malformed short records too, to avoid data loss
keep = append(keep, rec)
continue
}
dtStr := strings.TrimSpace(rec[4])
dt, err := time.ParseInLocation("2006-01-02 15:04:05", dtStr, loc)
if err != nil {
// keep malformed lines to avoid data loss
keep = append(keep, rec)
continue
}
if !dt.Before(cutoff) {
keep = append(keep, rec)
continue
}
// Move to history file by UTC day of dt
day := dt.UTC().Format("2006-01-02")
w, err := ensureWriter(day)
if err != nil {
return err
}
if err := w.Write(rec); err != nil {
return err
}
}
// Flush history writers; the deferred cleanup above closes the files
for _, w := range writers {
w.Flush()
}
// Rewrite main.csv with kept rows
tmp := mainPath + ".part"
outf, err := os.OpenFile(tmp, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o644)
if err != nil {
return err
}
cw := csv.NewWriter(outf)
if err := cw.WriteAll(keep); err != nil {
_ = outf.Close()
return err
}
cw.Flush()
if err := outf.Close(); err != nil {
return err
}
return os.Rename(tmp, mainPath)
}
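// Resulting on-disk layout sketch (dates hypothetical):
//
// weather_data_export/main.csv                             last 48 hours
// weather_data_export/history/weather_data_2025-11-10.csv  aged-out rows per UTC day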
// removeBucketFromMain removes all rows with date_time == bucketEnd from main CSV.
func removeBucketFromMain(mainPath string, loc *time.Location, bucketEnd time.Time) error {
dtStr := bucketEnd.In(loc).Format("2006-01-02 15:04:05")
// Read the whole file into memory, drop the bucket's rows, and rewrite
in, err := os.Open(mainPath)
if err != nil {
return err
}
defer in.Close()
r := csv.NewReader(in)
r.FieldsPerRecord = -1
recs, err := r.ReadAll()
if err != nil {
return err
}
if len(recs) == 0 {
return nil
}
out := make([][]string, 0, len(recs))
out = append(out, recs[0]) // header
for i := 1; i < len(recs); i++ {
rec := recs[i]
if len(rec) < 5 {
out = append(out, rec)
continue
}
if strings.TrimSpace(rec[4]) == dtStr {
continue // drop
}
out = append(out, rec)
}
tmp := mainPath + ".part"
outf, err := os.OpenFile(tmp, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o644)
if err != nil {
return err
}
cw := csv.NewWriter(outf)
if err := cw.WriteAll(out); err != nil {
_ = outf.Close()
return err
}
cw.Flush()
if err := outf.Close(); err != nil {
return err
}
return os.Rename(tmp, mainPath)
}
func ensureFileWithHeader(path string) bool {
if _, err := os.Stat(path); err == nil {
return false
}
_ = os.MkdirAll(filepath.Dir(path), 0o755)
f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY, 0o644)
if err != nil {
log.Printf("create csv failed: %v", err)
return false
}
// Write header
if _, err := f.WriteString(csvHeader + "\n"); err != nil {
_ = f.Close()
return false
}
_ = f.Close()
return true
}
func alignToNextBucketEnd(t time.Time, minutes int) time.Time {
m := t.Minute()
next := (m/minutes + 1) * minutes
return t.Truncate(time.Minute).Add(time.Duration(next-m) * time.Minute)
}
func alignToPrevBucketEnd(t time.Time, minutes int) time.Time {
m := t.Minute()
prev := (m / minutes) * minutes
return t.Truncate(time.Minute).Add(time.Duration(prev-m) * time.Minute)
}
func fmtNullFloat(v sql.NullFloat64) string {
if v.Valid {
return fmtFloat(v.Float64, -1)
}
return ""
}
// fmtFloat: prec < 0 -> trim trailing zeros
func fmtFloat(fv float64, prec int) string {
if prec >= 0 {
return fmt.Sprintf("%.*f", prec, fv)
}
s := fmt.Sprintf("%.10f", fv)
s = strings.TrimRight(s, "0")
s = strings.TrimRight(s, ".")
if s == "-0" {
s = "0"
}
if s == "" || s == "-" || s == "+" || s == "." {
return "0"
}
if math.Abs(fv) < 1e-9 {
return "0"
}
return s
}
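// Examples: fmtFloat(1013.25, 2) == "1013.25"; fmtFloat(2.500, -1) == "2.5";
// fmtFloat(1e-10, -1) == "0" (magnitudes below 1e-9 collapse to "0").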
func isTruthy(s string) bool {
switch strings.ToLower(strings.TrimSpace(s)) {
case "1", "true", "yes", "on":
return true
default:
return false
}
}
func getenvDefault(key, def string) string {
if v := os.Getenv(key); v != "" {
return v
}
return def
}
func mustMkdirAll(dir string) {
if err := os.MkdirAll(dir, 0o755); err != nil {
log.Fatalf("mkdir %s: %v", dir, err)
}
}
func tf(t time.Time) string { return t.Format("2006-01-02 15:04:05") }