258 lines
7.1 KiB
Go
258 lines
7.1 KiB
Go
package tools
|
||
|
||
import (
|
||
"context"
|
||
"database/sql"
|
||
"fmt"
|
||
"math"
|
||
"time"
|
||
|
||
"weatherstation/internal/database"
|
||
)
|
||
|
||
// BackfillOptions selects which raw samples RunBackfill10Min aggregates and
// how they are bucketed.
type BackfillOptions struct {
	// StationID restricts the run to one station; empty processes all stations.
	StationID string
	// FromTime is the start of the sample time range (inclusive).
	FromTime time.Time
	// ToTime is the end of the sample time range (inclusive).
	ToTime time.Time
	// WrapCycleMM is the millimetre value covered by one full wrap of the
	// device's cumulative rain counter. When <= 0, a wrap is handled in
	// degraded mode: only the reading after the wrap is counted.
	WrapCycleMM float64
	// BucketMinutes is the bucket width in minutes; values <= 0 default to 10.
	BucketMinutes int
}
|
||
|
||
// RunBackfill10Min 将 rs485_weather_data 的16秒数据汇总写入 rs485_weather_10min
|
||
func RunBackfill10Min(ctx context.Context, opts BackfillOptions) error {
|
||
if opts.BucketMinutes <= 0 {
|
||
opts.BucketMinutes = 10
|
||
}
|
||
|
||
db := database.GetDB()
|
||
// 取时序数据
|
||
query := `
|
||
SELECT station_id, timestamp, temperature, humidity, wind_speed, wind_direction,
|
||
rainfall, light, uv, pressure
|
||
FROM rs485_weather_data
|
||
WHERE timestamp >= $1 AND timestamp <= $2
|
||
%s
|
||
ORDER BY station_id, timestamp`
|
||
|
||
stationFilter := ""
|
||
args := []any{opts.FromTime, opts.ToTime}
|
||
if opts.StationID != "" {
|
||
stationFilter = "AND station_id = $3"
|
||
args = append(args, opts.StationID)
|
||
}
|
||
|
||
rows, err := db.QueryContext(ctx, fmt.Sprintf(query, stationFilter), args...)
|
||
if err != nil {
|
||
return fmt.Errorf("query raw failed: %w", err)
|
||
}
|
||
defer rows.Close()
|
||
|
||
// 载入上海时区用于分桶
|
||
loc, _ := time.LoadLocation("Asia/Shanghai")
|
||
if loc == nil {
|
||
loc = time.FixedZone("CST", 8*3600)
|
||
}
|
||
|
||
type agg struct {
|
||
// sums
|
||
sumTemp float64
|
||
sumHum float64
|
||
sumWS float64
|
||
sumLight float64
|
||
sumUV float64
|
||
sumP float64
|
||
sinSum float64
|
||
cosSum float64
|
||
gustMax float64
|
||
rainIncSum float64
|
||
count int
|
||
lastTotal float64 // 桶末累计
|
||
lastTS time.Time
|
||
}
|
||
|
||
currentStation := ""
|
||
var prevTotal float64
|
||
var prevTS time.Time
|
||
|
||
buckets := make(map[string]map[time.Time]*agg) // station -> bucketStart -> agg
|
||
|
||
for rows.Next() {
|
||
var (
|
||
stationID string
|
||
ts time.Time
|
||
t, h, ws, wd, rf, light, uv, p sql.NullFloat64
|
||
)
|
||
if err := rows.Scan(&stationID, &ts, &t, &h, &ws, &wd, &rf, &light, &uv, &p); err != nil {
|
||
return fmt.Errorf("scan failed: %w", err)
|
||
}
|
||
|
||
// 切换设备时重置 prevTotal
|
||
if stationID != currentStation {
|
||
currentStation = stationID
|
||
prevTotal = math.NaN()
|
||
prevTS = time.Time{}
|
||
}
|
||
|
||
// 计算该样本所在桶(CST对齐)
|
||
bucketLocal := ts.In(loc).Truncate(time.Duration(opts.BucketMinutes) * time.Minute)
|
||
bucketStart := time.Date(bucketLocal.Year(), bucketLocal.Month(), bucketLocal.Day(), bucketLocal.Hour(), bucketLocal.Minute(), 0, 0, loc)
|
||
|
||
if _, ok := buckets[stationID]; !ok {
|
||
buckets[stationID] = make(map[time.Time]*agg)
|
||
}
|
||
ag := buckets[stationID][bucketStart]
|
||
if ag == nil {
|
||
ag = &agg{gustMax: -1}
|
||
buckets[stationID][bucketStart] = ag
|
||
}
|
||
|
||
// 累加平均项
|
||
if t.Valid {
|
||
ag.sumTemp += t.Float64
|
||
}
|
||
if h.Valid {
|
||
ag.sumHum += h.Float64
|
||
}
|
||
if ws.Valid {
|
||
ag.sumWS += ws.Float64
|
||
if ws.Float64 > ag.gustMax {
|
||
ag.gustMax = ws.Float64
|
||
}
|
||
}
|
||
if light.Valid {
|
||
ag.sumLight += light.Float64
|
||
}
|
||
if uv.Valid {
|
||
ag.sumUV += uv.Float64
|
||
}
|
||
if p.Valid {
|
||
ag.sumP += p.Float64
|
||
}
|
||
if wd.Valid {
|
||
rad := wd.Float64 * math.Pi / 180.0
|
||
ag.sinSum += math.Sin(rad)
|
||
ag.cosSum += math.Cos(rad)
|
||
}
|
||
ag.count++
|
||
|
||
// 雨量增量:按时间比例切分到跨越的各个桶,避免边界全部被计入后一桶
|
||
if rf.Valid {
|
||
curr := rf.Float64
|
||
if !math.IsNaN(prevTotal) && !prevTS.IsZero() {
|
||
// 计算增量(带回绕)
|
||
inc := 0.0
|
||
if curr >= prevTotal {
|
||
inc = curr - prevTotal
|
||
} else {
|
||
if opts.WrapCycleMM > 0 {
|
||
inc = (opts.WrapCycleMM - prevTotal) + curr
|
||
} else {
|
||
// 降级:仅计当前值
|
||
inc = curr
|
||
}
|
||
}
|
||
|
||
// 将 [prevTS, ts] 区间按10分钟边界切分,按时长比例分配增量
|
||
startLocal := prevTS.In(loc)
|
||
endLocal := ts.In(loc)
|
||
if endLocal.After(startLocal) && inc > 0 {
|
||
totalSec := endLocal.Sub(startLocal).Seconds()
|
||
segStart := startLocal
|
||
for segStart.Before(endLocal) {
|
||
segBucketStart := segStart.Truncate(time.Duration(opts.BucketMinutes) * time.Minute)
|
||
segBucketEnd := segBucketStart.Add(time.Duration(opts.BucketMinutes) * time.Minute)
|
||
segEnd := endLocal
|
||
if segBucketEnd.Before(segEnd) {
|
||
segEnd = segBucketEnd
|
||
}
|
||
portion := inc * (segEnd.Sub(segStart).Seconds() / totalSec)
|
||
// 确保段对应桶存在
|
||
if _, ok := buckets[stationID][segBucketStart]; !ok {
|
||
buckets[stationID][segBucketStart] = &agg{gustMax: -1}
|
||
}
|
||
buckets[stationID][segBucketStart].rainIncSum += portion
|
||
segStart = segEnd
|
||
}
|
||
}
|
||
}
|
||
// 记录桶末累计到当前样本所在桶
|
||
ag.lastTotal = curr
|
||
prevTotal = curr
|
||
prevTS = ts
|
||
}
|
||
}
|
||
if err := rows.Err(); err != nil {
|
||
return fmt.Errorf("iterate rows failed: %w", err)
|
||
}
|
||
|
||
// 写入/更新到 10 分钟表
|
||
upsert := `
|
||
INSERT INTO rs485_weather_10min (
|
||
station_id, bucket_start,
|
||
temp_c_x100, humidity_pct, wind_speed_ms_x1000, wind_gust_ms_x1000,
|
||
wind_dir_deg, rain_10m_mm_x1000, rain_total_mm_x1000,
|
||
solar_wm2_x100, uv_index, pressure_hpa_x100, sample_count
|
||
) VALUES (
|
||
$1, $2, $3, $4, $5, $6,
|
||
$7, $8, $9,
|
||
$10, $11, $12, $13
|
||
) ON CONFLICT (station_id, bucket_start) DO UPDATE SET
|
||
temp_c_x100 = EXCLUDED.temp_c_x100,
|
||
humidity_pct = EXCLUDED.humidity_pct,
|
||
wind_speed_ms_x1000 = EXCLUDED.wind_speed_ms_x1000,
|
||
wind_gust_ms_x1000 = EXCLUDED.wind_gust_ms_x1000,
|
||
wind_dir_deg = EXCLUDED.wind_dir_deg,
|
||
rain_10m_mm_x1000 = EXCLUDED.rain_10m_mm_x1000,
|
||
rain_total_mm_x1000 = EXCLUDED.rain_total_mm_x1000,
|
||
solar_wm2_x100 = EXCLUDED.solar_wm2_x100,
|
||
uv_index = EXCLUDED.uv_index,
|
||
pressure_hpa_x100 = EXCLUDED.pressure_hpa_x100,
|
||
sample_count = EXCLUDED.sample_count`
|
||
|
||
for stationID, m := range buckets {
|
||
for bucketStart, ag := range m {
|
||
if ag.count == 0 {
|
||
continue
|
||
}
|
||
avgTemp := ag.sumTemp / float64(ag.count)
|
||
avgHum := ag.sumHum / float64(ag.count)
|
||
avgWS := ag.sumWS / float64(ag.count)
|
||
avgLight := ag.sumLight / float64(ag.count)
|
||
avgUV := ag.sumUV / float64(ag.count)
|
||
avgP := ag.sumP / float64(ag.count)
|
||
// 风向向量平均
|
||
windDir := 0.0
|
||
if ag.sinSum != 0 || ag.cosSum != 0 {
|
||
ang := math.Atan2(ag.sinSum/float64(ag.count), ag.cosSum/float64(ag.count)) * 180 / math.Pi
|
||
if ang < 0 {
|
||
ang += 360
|
||
}
|
||
windDir = ang
|
||
}
|
||
|
||
// 缩放整数
|
||
tempScaled := int(math.Round(avgTemp * 100))
|
||
humScaled := int(math.Round(avgHum))
|
||
wsScaled := int(math.Round(avgWS * 1000))
|
||
gustScaled := int(math.Round(ag.gustMax * 1000))
|
||
wdScaled := int(math.Round(windDir))
|
||
rain10mScaled := int(math.Round(ag.rainIncSum * 1000))
|
||
rainTotalScaled := int(math.Round(ag.lastTotal * 1000))
|
||
solarScaled := int(math.Round(avgLight * 100))
|
||
uvScaled := int(math.Round(avgUV))
|
||
pScaled := int(math.Round(avgP * 100))
|
||
|
||
if _, err := db.ExecContext(ctx, upsert,
|
||
stationID, bucketStart,
|
||
tempScaled, humScaled, wsScaled, gustScaled,
|
||
wdScaled, rain10mScaled, rainTotalScaled,
|
||
solarScaled, uvScaled, pScaled, ag.count,
|
||
); err != nil {
|
||
return fmt.Errorf("upsert 10min failed: %w", err)
|
||
}
|
||
}
|
||
}
|
||
|
||
return nil
|
||
}
|