package tools

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"math"
	"time"

	"weatherstation/internal/database"
)

// BackfillOptions configures RunBackfill10Min.
type BackfillOptions struct {
	StationID     string    // station to process; empty processes every station
	FromTime      time.Time // inclusive lower bound on raw sample timestamps
	ToTime        time.Time // inclusive upper bound on raw sample timestamps
	WrapCycleMM   float64   // mm at which the cumulative rain counter wraps; <= 0 counts only the post-wrap reading when the counter decreases
	BucketMinutes int       // bucket width in minutes; <= 0 means 10
}

// RunBackfill10Min aggregates the 16-second raw samples in rs485_weather_data
// whose timestamps fall in [opts.FromTime, opts.ToTime] into fixed-width
// buckets (10 minutes by default) and upserts one row per station and bucket
// into rs485_weather_10min.
//
// Bucketing:
//   - bucket boundaries are aligned to Asia/Shanghai wall-clock time (a fixed
//     UTC+8 zone is used if the tz database cannot be loaded);
//   - buckets are left-open, right-closed: a sample exactly on a boundary
//     belongs to the bucket that ends at that boundary.
//
// Aggregation:
//   - temperature, humidity, wind speed, light, UV and pressure are averaged
//     over the samples in which that field is non-NULL (0 when none are);
//   - wind direction is the vector mean of the per-sample directions;
//   - gust is the maximum wind speed (0 when no sample has one);
//   - rain comes from increments of the cumulative rain counter. Each
//     increment is split across the buckets spanned by the interval between
//     two consecutive readings, in proportion to time. The first reading of
//     each station is seeded from the last non-NULL reading before it so the
//     first bucket does not lose rain.
//
// Only buckets that contain at least one raw sample are written, so rain the
// time-proportional split assigns to sample-less buckets is dropped. On
// conflict the upsert keeps existing averages when the new aggregate has
// fewer samples, and keeps the larger of the old and new gust/rain values.
func RunBackfill10Min(ctx context.Context, opts BackfillOptions) error {
	if opts.BucketMinutes <= 0 {
		opts.BucketMinutes = 10
	}
	width := time.Duration(opts.BucketMinutes) * time.Minute

	db := database.GetDB()

	// Ordered by station then time so consecutive rows of one station yield
	// rain-counter increments.
	query := `
		SELECT station_id, timestamp, temperature, humidity, wind_speed, wind_direction, rainfall, light, uv, pressure
		FROM rs485_weather_data
		WHERE timestamp >= $1 AND timestamp <= $2 %s
		ORDER BY station_id, timestamp`
	stationFilter := ""
	args := []any{opts.FromTime, opts.ToTime}
	if opts.StationID != "" {
		stationFilter = "AND station_id = $3"
		args = append(args, opts.StationID)
	}
	rows, err := db.QueryContext(ctx, fmt.Sprintf(query, stationFilter), args...)
	if err != nil {
		return fmt.Errorf("query raw failed: %w", err)
	}
	defer rows.Close()

	loc, err := time.LoadLocation("Asia/Shanghai")
	if err != nil {
		// tz database unavailable on this host: fall back to a fixed UTC+8 zone.
		loc = time.FixedZone("CST", 8*3600)
	}

	type agg struct {
		temp, hum, ws, light, uv, pressure backfillMean
		sinSum, cosSum                     float64 // sums of per-sample wind-direction unit vectors
		gustMax                            float64 // maximum wind speed; meaningful only when hasGust
		hasGust                            bool
		rainIncSum                         float64 // rain (mm) apportioned to this bucket
		lastTotal                          float64 // last cumulative rain reading seen in this bucket
		count                              int     // raw samples whose timestamp falls in this bucket
	}

	// station -> bucket start -> aggregate. Every key is produced by
	// backfillBucketFloor (optionally shifted by whole buckets), so keys for
	// the same instant share Location and carry no monotonic reading, which
	// makes time.Time safe to use as a map key here.
	buckets := make(map[string]map[time.Time]*agg)
	bucketFor := func(stationID string, start time.Time) *agg {
		byStart := buckets[stationID]
		if byStart == nil {
			byStart = make(map[time.Time]*agg)
			buckets[stationID] = byStart
		}
		ag := byStart[start]
		if ag == nil {
			ag = &agg{}
			byStart[start] = ag
		}
		return ag
	}

	var (
		currentStation string
		havePrev       bool      // prevTotal/prevTS hold a rain reading for currentStation
		prevTotal      float64   // previous cumulative rain reading (mm)
		prevTS         time.Time // timestamp of prevTotal
	)
	for rows.Next() {
		var (
			stationID                      string
			ts                             time.Time
			t, h, ws, wd, rf, light, uv, p sql.NullFloat64
		)
		if err := rows.Scan(&stationID, &ts, &t, &h, &ws, &wd, &rf, &light, &uv, &p); err != nil {
			return fmt.Errorf("scan failed: %w", err)
		}

		// Rain increments are only meaningful within a single station.
		if stationID != currentStation {
			currentStation = stationID
			havePrev = false
		}

		// Left-open, right-closed: a sample exactly on a boundary closes the
		// previous bucket.
		floor := backfillBucketFloor(ts, width, loc)
		bucketStart := floor
		if ts.Equal(floor) {
			bucketStart = floor.Add(-width)
		}
		ag := bucketFor(stationID, bucketStart)
		ag.count++

		ag.temp.add(t)
		ag.hum.add(h)
		ag.ws.add(ws)
		ag.light.add(light)
		ag.uv.add(uv)
		ag.pressure.add(p)
		if ws.Valid && (!ag.hasGust || ws.Float64 > ag.gustMax) {
			ag.gustMax = ws.Float64
			ag.hasGust = true
		}
		if wd.Valid {
			rad := wd.Float64 * math.Pi / 180.0
			ag.sinSum += math.Sin(rad)
			ag.cosSum += math.Cos(rad)
		}

		if !rf.Valid {
			continue
		}
		curr := rf.Float64

		if !havePrev {
			// First rain reading of this station in the window: seed from the
			// last non-NULL reading before it so the first bucket keeps its rain.
			// NOTE(review): this runs while rows is still open, so it needs a
			// second pooled connection — confirm the pool allows more than one.
			var (
				seedTS    time.Time
				seedTotal sql.NullFloat64
			)
			seedErr := db.QueryRowContext(ctx, `
				SELECT timestamp, rainfall
				FROM rs485_weather_data
				WHERE station_id = $1 AND timestamp < $2 AND rainfall IS NOT NULL
				ORDER BY timestamp DESC
				LIMIT 1`, stationID, ts).Scan(&seedTS, &seedTotal)
			switch {
			case seedErr == nil:
				if seedTotal.Valid {
					prevTotal, prevTS, havePrev = seedTotal.Float64, seedTS, true
				}
			case errors.Is(seedErr, sql.ErrNoRows):
				// No earlier reading: increments start at this sample.
			default:
				return fmt.Errorf("query rain seed failed: %w", seedErr)
			}
		}

		if havePrev {
			inc := backfillRainIncrement(prevTotal, curr, opts.WrapCycleMM)
			// Split [prevTS, ts] at bucket boundaries and give each bucket a
			// share of inc proportional to its overlap with the interval.
			if inc > 0 && ts.After(prevTS) {
				totalSec := ts.Sub(prevTS).Seconds()
				for segStart := prevTS; segStart.Before(ts); {
					segBucket := backfillBucketFloor(segStart, width, loc)
					segEnd := segBucket.Add(width)
					if ts.Before(segEnd) {
						segEnd = ts
					}
					bucketFor(stationID, segBucket).rainIncSum += inc * (segEnd.Sub(segStart).Seconds() / totalSec)
					segStart = segEnd
				}
			}
		}

		ag.lastTotal = curr
		prevTotal, prevTS, havePrev = curr, ts, true
	}
	if err := rows.Err(); err != nil {
		return fmt.Errorf("iterate rows failed: %w", err)
	}

	// Averaged columns are only overwritten when the new aggregate has at
	// least as many samples as the stored one; gust, rain and sample_count
	// keep the larger value. The SQL comment must stay on its own line or it
	// would comment out the rest of the SET clause.
	upsert := `
		INSERT INTO rs485_weather_10min (
			station_id, bucket_start, temp_c_x100, humidity_pct, wind_speed_ms_x1000, wind_gust_ms_x1000,
			wind_dir_deg, rain_10m_mm_x1000, rain_total_mm_x1000, solar_wm2_x100, uv_index, pressure_hpa_x100, sample_count
		) VALUES (
			$1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13
		) ON CONFLICT (station_id, bucket_start) DO UPDATE SET
			-- 仅当新聚合样本数不小于已有样本数时才用新值覆盖均值类字段,避免回退
			temp_c_x100 = CASE WHEN EXCLUDED.sample_count >= rs485_weather_10min.sample_count THEN EXCLUDED.temp_c_x100 ELSE rs485_weather_10min.temp_c_x100 END,
			humidity_pct = CASE WHEN EXCLUDED.sample_count >= rs485_weather_10min.sample_count THEN EXCLUDED.humidity_pct ELSE rs485_weather_10min.humidity_pct END,
			wind_speed_ms_x1000 = CASE WHEN EXCLUDED.sample_count >= rs485_weather_10min.sample_count THEN EXCLUDED.wind_speed_ms_x1000 ELSE rs485_weather_10min.wind_speed_ms_x1000 END,
			wind_gust_ms_x1000 = GREATEST(rs485_weather_10min.wind_gust_ms_x1000, EXCLUDED.wind_gust_ms_x1000),
			wind_dir_deg = CASE WHEN EXCLUDED.sample_count >= rs485_weather_10min.sample_count THEN EXCLUDED.wind_dir_deg ELSE rs485_weather_10min.wind_dir_deg END,
			rain_10m_mm_x1000 = GREATEST(rs485_weather_10min.rain_10m_mm_x1000, EXCLUDED.rain_10m_mm_x1000),
			rain_total_mm_x1000 = GREATEST(rs485_weather_10min.rain_total_mm_x1000, EXCLUDED.rain_total_mm_x1000),
			solar_wm2_x100 = CASE WHEN EXCLUDED.sample_count >= rs485_weather_10min.sample_count THEN EXCLUDED.solar_wm2_x100 ELSE rs485_weather_10min.solar_wm2_x100 END,
			uv_index = CASE WHEN EXCLUDED.sample_count >= rs485_weather_10min.sample_count THEN EXCLUDED.uv_index ELSE rs485_weather_10min.uv_index END,
			pressure_hpa_x100 = CASE WHEN EXCLUDED.sample_count >= rs485_weather_10min.sample_count THEN EXCLUDED.pressure_hpa_x100 ELSE rs485_weather_10min.pressure_hpa_x100 END,
			sample_count = GREATEST(rs485_weather_10min.sample_count, EXCLUDED.sample_count)`

	// scaled converts a physical value to the table's fixed-point integer form.
	scaled := func(v, factor float64) int { return int(math.Round(v * factor)) }

	for stationID, byStart := range buckets {
		for bucketStart, ag := range byStart {
			if ag.count == 0 {
				// Created only by the rain split; nothing to average, not written.
				continue
			}

			windDir := 0.0
			if ag.sinSum != 0 || ag.cosSum != 0 {
				windDir = math.Atan2(ag.sinSum, ag.cosSum) * 180 / math.Pi
				if windDir < 0 {
					windDir += 360
				}
			}
			gust := 0.0
			if ag.hasGust {
				gust = ag.gustMax
			}

			// NOTE(review): fields with no non-NULL sample are written as 0;
			// writing NULL may be preferable if the schema allows it.
			if _, err := db.ExecContext(ctx, upsert,
				stationID,
				bucketStart,
				scaled(ag.temp.value(), 100),
				scaled(ag.hum.value(), 1),
				scaled(ag.ws.value(), 1000),
				scaled(gust, 1000),
				scaled(windDir, 1)%360, // e.g. 359.6° rounds to 360, which is 0°
				scaled(ag.rainIncSum, 1000),
				scaled(ag.lastTotal, 1000),
				scaled(ag.light.value(), 100),
				scaled(ag.uv.value(), 1),
				scaled(ag.pressure.value(), 100),
				ag.count,
			); err != nil {
				return fmt.Errorf("upsert 10min failed: %w", err)
			}
		}
	}
	return nil
}

// backfillMean accumulates the mean of the non-NULL values passed to add.
type backfillMean struct {
	sum float64
	n   int
}

// add folds v into the running mean when it is non-NULL.
func (m *backfillMean) add(v sql.NullFloat64) {
	if !v.Valid {
		return
	}
	m.sum += v.Float64
	m.n++
}

// value returns the mean of the added values, or 0 when none were added.
func (m *backfillMean) value() float64 {
	if m.n == 0 {
		return 0
	}
	return m.sum / float64(m.n)
}

// backfillBucketFloor returns the start of the width-wide bucket containing t,
// aligned to wall-clock time in loc rather than to UTC. time.Time.Truncate
// works on absolute time since the zero instant (UTC), so t is shifted by its
// zone offset before truncating and shifted back afterwards. The result is
// expressed in loc and carries no monotonic clock reading.
func backfillBucketFloor(t time.Time, width time.Duration, loc *time.Location) time.Time {
	local := t.In(loc)
	_, offsetSec := local.Zone()
	shift := time.Duration(offsetSec) * time.Second
	return local.Add(shift).Truncate(width).Add(-shift).In(loc)
}

// backfillRainIncrement returns the rain (mm) between two readings of a
// cumulative counter. A decrease is treated as a counter wrap: with
// wrapCycle > 0 the increment is the distance to the wrap point plus the new
// reading; otherwise only the new reading is counted.
func backfillRainIncrement(prev, curr, wrapCycle float64) float64 {
	switch {
	case curr >= prev:
		return curr - prev
	case wrapCycle > 0:
		return (wrapCycle - prev) + curr
	default:
		return curr
	}
}