2025-08-26 19:37:13 +08:00

276 lines
8.8 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package tools
import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"math"
	"time"

	"weatherstation/internal/database"
)
// BackfillOptions selects the raw samples RunBackfill10Min aggregates and
// tunes how the aggregation is done.
type BackfillOptions struct {
	// StationID limits the backfill to a single station; empty means every station.
	StationID string
	// FromTime is the inclusive lower bound on the raw sample timestamp.
	FromTime time.Time
	// ToTime is the inclusive upper bound on the raw sample timestamp.
	ToTime time.Time
	// WrapCycleMM is the rainfall amount (mm) at which the device's cumulative
	// counter wraps around. When it is <= 0 and a wrap is observed, only the
	// reading taken after the wrap is counted as the increment.
	WrapCycleMM float64
	// BucketMinutes is the bucket width in minutes; values <= 0 mean 10.
	BucketMinutes int
}
// RunBackfill10Min aggregates raw rows of rs485_weather_data (the ~16-second
// samples) into fixed-width buckets - 10 minutes unless opts.BucketMinutes
// says otherwise - and upserts one row per (station, bucket) into
// rs485_weather_10min.
//
// Buckets are aligned to the Asia/Shanghai wall clock (a fixed UTC+8 zone is
// used if the tz database cannot be loaded). Per bucket it writes, as scaled
// integers:
//   - the mean of temperature, humidity, wind speed, light, UV and pressure,
//     each taken over the rows where that column is non-NULL (0 if none are);
//   - the vector-mean wind direction in whole degrees [0, 360) and the
//     maximum wind speed as the gust (0 if no wind speed was recorded);
//   - the rainfall increment derived from the device's cumulative counter,
//     split across buckets in proportion to elapsed time;
//   - the last cumulative rainfall reading seen in the bucket.
//
// Buckets without any raw row are skipped, even if they received a share of
// a rainfall increment. See the upsert statement for how existing rows are
// merged. The first query, scan or upsert error is returned wrapped; upserts
// already executed are not rolled back (each runs as its own statement).
func RunBackfill10Min(ctx context.Context, opts BackfillOptions) error {
	if opts.BucketMinutes <= 0 {
		opts.BucketMinutes = 10
	}
	bucketDur := time.Duration(opts.BucketMinutes) * time.Minute

	loc, err := time.LoadLocation("Asia/Shanghai")
	if err != nil {
		// tz database unavailable on this host: fall back to a fixed UTC+8 zone.
		loc = time.FixedZone("CST", 8*3600)
	}

	// bucketStartOf returns the start, in loc, of the bucket containing t.
	// time.Truncate rounds relative to the absolute zero time, which only
	// matches local wall-clock boundaries when the bucket width divides the
	// zone offset; shifting by the offset first aligns every width to local
	// time. Every bucket key is produced here, so the bucket of a sample and
	// the buckets receiving rain shares always compare equal as map keys
	// (same Location, no monotonic reading).
	bucketStartOf := func(t time.Time) time.Time {
		t = t.In(loc)
		_, offset := t.Zone()
		shift := time.Duration(offset) * time.Second
		return t.Add(shift).Truncate(bucketDur).Add(-shift)
	}

	db := database.GetDB()

	// Raw samples, ordered so each station's rows arrive in time order.
	query := `
SELECT station_id, timestamp, temperature, humidity, wind_speed, wind_direction,
rainfall, light, uv, pressure
FROM rs485_weather_data
WHERE timestamp >= $1 AND timestamp <= $2
%s
ORDER BY station_id, timestamp`
	stationFilter := ""
	args := []any{opts.FromTime, opts.ToTime}
	if opts.StationID != "" {
		stationFilter = "AND station_id = $3"
		args = append(args, opts.StationID)
	}
	rows, err := db.QueryContext(ctx, fmt.Sprintf(query, stationFilter), args...)
	if err != nil {
		return fmt.Errorf("query raw failed: %w", err)
	}
	defer rows.Close()

	// meanAcc is a running mean over the non-NULL values of one column, so a
	// NULL reading does not drag the mean towards zero.
	type meanAcc struct {
		sum float64
		n   int
	}
	addValue := func(m *meanAcc, v sql.NullFloat64) {
		if v.Valid {
			m.sum += v.Float64
			m.n++
		}
	}
	meanOf := func(m meanAcc) float64 {
		if m.n == 0 {
			return 0 // no valid reading in the bucket: written as 0
		}
		return m.sum / float64(m.n)
	}

	// agg accumulates one (station, bucket) pair.
	type agg struct {
		temp, hum, ws, light, uv, pressure meanAcc
		sinSum, cosSum                     float64 // unit-vector sums for the circular mean of wind direction
		gustMax                            float64 // max wind speed; stays 0 when no valid wind speed was seen
		rainIncSum                         float64 // rainfall apportioned to this bucket
		lastTotal                          float64 // last cumulative rainfall reading in this bucket
		count                              int     // raw rows in this bucket, written as sample_count
	}

	buckets := make(map[string]map[time.Time]*agg) // station -> bucket start -> aggregate
	bucketFor := func(stationID string, start time.Time) *agg {
		byStart := buckets[stationID]
		if byStart == nil {
			byStart = make(map[time.Time]*agg)
			buckets[stationID] = byStart
		}
		ag := byStart[start]
		if ag == nil {
			ag = &agg{}
			byStart[start] = ag
		}
		return ag
	}

	currentStation := ""
	prevTotal := math.NaN() // previous cumulative rainfall of currentStation; NaN = unknown
	var prevTS time.Time    // timestamp of prevTotal; zero = unknown
	for rows.Next() {
		var (
			stationID                      string
			ts                             time.Time
			t, h, ws, wd, rf, light, uv, p sql.NullFloat64
		)
		if err := rows.Scan(&stationID, &ts, &t, &h, &ws, &wd, &rf, &light, &uv, &p); err != nil {
			return fmt.Errorf("scan failed: %w", err)
		}
		// Rows are ordered by station, so a new ID means the rainfall baseline
		// of the previous station must not be carried over.
		if stationID != currentStation {
			currentStation = stationID
			prevTotal = math.NaN()
			prevTS = time.Time{}
		}

		ag := bucketFor(stationID, bucketStartOf(ts))
		ag.count++
		addValue(&ag.temp, t)
		addValue(&ag.hum, h)
		addValue(&ag.light, light)
		addValue(&ag.uv, uv)
		addValue(&ag.pressure, p)
		if ws.Valid {
			if ag.ws.n == 0 || ws.Float64 > ag.gustMax {
				ag.gustMax = ws.Float64
			}
			addValue(&ag.ws, ws)
		}
		if wd.Valid {
			rad := wd.Float64 * math.Pi / 180.0
			ag.sinSum += math.Sin(rad)
			ag.cosSum += math.Cos(rad)
		}

		if !rf.Valid {
			continue
		}
		curr := rf.Float64

		if math.IsNaN(prevTotal) || prevTS.IsZero() {
			// First rainfall reading of this station in the window: seed the
			// baseline from the sample just before it, so rain that fell since
			// then is not lost from the first bucket.
			// NOTE(review): this runs while rows is still open, so db must be
			// able to serve a second concurrent query (e.g. a pooled *sql.DB
			// with more than one connection) - confirm.
			var seedTS time.Time
			var seedTotal sql.NullFloat64
			err := db.QueryRowContext(ctx, `
SELECT timestamp, rainfall
FROM rs485_weather_data
WHERE station_id = $1 AND timestamp < $2
ORDER BY timestamp DESC
LIMIT 1
`, stationID, ts).Scan(&seedTS, &seedTotal)
			switch {
			case err == nil:
				if seedTotal.Valid {
					prevTotal = seedTotal.Float64
					prevTS = seedTS
				}
			case errors.Is(err, sql.ErrNoRows):
				// No earlier sample: this reading simply becomes the baseline.
			default:
				return fmt.Errorf("query rain seed failed: %w", err)
			}
		}

		if !math.IsNaN(prevTotal) && !prevTS.IsZero() {
			var inc float64
			switch {
			case curr >= prevTotal:
				inc = curr - prevTotal
			case opts.WrapCycleMM > 0:
				// Counter wrapped: remainder of the old cycle plus the new reading.
				inc = (opts.WrapCycleMM - prevTotal) + curr
			default:
				// Wrap size unknown: count only the current reading.
				inc = curr
			}

			// Spread the increment over [prevTS, ts] in proportion to the time
			// each bucket covers, instead of crediting it all to ts's bucket.
			startLocal, endLocal := prevTS.In(loc), ts.In(loc)
			if endLocal.After(startLocal) && inc > 0 {
				totalSec := endLocal.Sub(startLocal).Seconds()
				for segStart := startLocal; segStart.Before(endLocal); {
					segBucket := bucketStartOf(segStart)
					segEnd := segBucket.Add(bucketDur)
					if endLocal.Before(segEnd) {
						segEnd = endLocal
					}
					bucketFor(stationID, segBucket).rainIncSum += inc * (segEnd.Sub(segStart).Seconds() / totalSec)
					segStart = segEnd
				}
			}
		}

		ag.lastTotal = curr
		prevTotal = curr
		prevTS = ts
	}
	if err := rows.Err(); err != nil {
		return fmt.Errorf("iterate rows failed: %w", err)
	}

	// Insert or merge into the bucket table.
	upsert := `
INSERT INTO rs485_weather_10min (
station_id, bucket_start,
temp_c_x100, humidity_pct, wind_speed_ms_x1000, wind_gust_ms_x1000,
wind_dir_deg, rain_10m_mm_x1000, rain_total_mm_x1000,
solar_wm2_x100, uv_index, pressure_hpa_x100, sample_count
) VALUES (
$1, $2, $3, $4, $5, $6,
$7, $8, $9,
$10, $11, $12, $13
) ON CONFLICT (station_id, bucket_start) DO UPDATE SET
-- 仅当新聚合样本数不小于已有样本数时才用新值覆盖均值类字段,避免回退
temp_c_x100 = CASE WHEN EXCLUDED.sample_count >= rs485_weather_10min.sample_count THEN EXCLUDED.temp_c_x100 ELSE rs485_weather_10min.temp_c_x100 END,
humidity_pct = CASE WHEN EXCLUDED.sample_count >= rs485_weather_10min.sample_count THEN EXCLUDED.humidity_pct ELSE rs485_weather_10min.humidity_pct END,
wind_speed_ms_x1000 = CASE WHEN EXCLUDED.sample_count >= rs485_weather_10min.sample_count THEN EXCLUDED.wind_speed_ms_x1000 ELSE rs485_weather_10min.wind_speed_ms_x1000 END,
wind_gust_ms_x1000 = GREATEST(rs485_weather_10min.wind_gust_ms_x1000, EXCLUDED.wind_gust_ms_x1000),
wind_dir_deg = CASE WHEN EXCLUDED.sample_count >= rs485_weather_10min.sample_count THEN EXCLUDED.wind_dir_deg ELSE rs485_weather_10min.wind_dir_deg END,
rain_10m_mm_x1000 = GREATEST(rs485_weather_10min.rain_10m_mm_x1000, EXCLUDED.rain_10m_mm_x1000),
rain_total_mm_x1000 = GREATEST(rs485_weather_10min.rain_total_mm_x1000, EXCLUDED.rain_total_mm_x1000),
solar_wm2_x100 = CASE WHEN EXCLUDED.sample_count >= rs485_weather_10min.sample_count THEN EXCLUDED.solar_wm2_x100 ELSE rs485_weather_10min.solar_wm2_x100 END,
uv_index = CASE WHEN EXCLUDED.sample_count >= rs485_weather_10min.sample_count THEN EXCLUDED.uv_index ELSE rs485_weather_10min.uv_index END,
pressure_hpa_x100 = CASE WHEN EXCLUDED.sample_count >= rs485_weather_10min.sample_count THEN EXCLUDED.pressure_hpa_x100 ELSE rs485_weather_10min.pressure_hpa_x100 END,
sample_count = GREATEST(rs485_weather_10min.sample_count, EXCLUDED.sample_count)`
	for stationID, byStart := range buckets {
		for bucketStart, ag := range byStart {
			if ag.count == 0 {
				// Only received a rain share (or lies before the window): there
				// are no raw rows to average, so nothing is written.
				continue
			}

			// Circular mean of wind direction; atan2 is scale-invariant, so
			// the vector sums need no division by the sample count.
			windDir := 0.0
			if ag.sinSum != 0 || ag.cosSum != 0 {
				windDir = math.Atan2(ag.sinSum, ag.cosSum) * 180 / math.Pi
				if windDir < 0 {
					windDir += 360
				}
			}

			tempScaled := int(math.Round(meanOf(ag.temp) * 100))
			humScaled := int(math.Round(meanOf(ag.hum)))
			wsScaled := int(math.Round(meanOf(ag.ws) * 1000))
			gustScaled := int(math.Round(ag.gustMax * 1000))
			wdScaled := int(math.Round(windDir)) % 360 // 359.5 deg and above rounds to 360, i.e. 0
			rain10mScaled := int(math.Round(ag.rainIncSum * 1000))
			rainTotalScaled := int(math.Round(ag.lastTotal * 1000))
			solarScaled := int(math.Round(meanOf(ag.light) * 100))
			uvScaled := int(math.Round(meanOf(ag.uv)))
			pScaled := int(math.Round(meanOf(ag.pressure) * 100))

			if _, err := db.ExecContext(ctx, upsert,
				stationID, bucketStart,
				tempScaled, humScaled, wsScaled, gustScaled,
				wdScaled, rain10mScaled, rainTotalScaled,
				solarScaled, uvScaled, pScaled, ag.count,
			); err != nil {
				return fmt.Errorf("upsert 10min failed: %w", err)
			}
		}
	}
	return nil
}