225 lines
5.5 KiB
Go
225 lines
5.5 KiB
Go
package main
|
||
|
||
import (
|
||
"database/sql"
|
||
"fmt"
|
||
"time"
|
||
)
|
||
|
||
// 定期更新小时雨量统计
|
||
func startHourlyRainfallUpdater() {
|
||
// 确保小时雨量表存在
|
||
ensureHourlyRainfallTable()
|
||
|
||
// 更新历史数据
|
||
updateHistoricalHourlyRainfall()
|
||
|
||
// 启动定时更新
|
||
go periodicHourlyRainfallUpdate()
|
||
}
|
||
|
||
// 确保小时雨量表存在
|
||
func ensureHourlyRainfallTable() {
|
||
_, err := db.Exec(`
|
||
CREATE TABLE IF NOT EXISTS hourly_rainfall (
|
||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||
hour_start DATETIME NOT NULL,
|
||
hour_end DATETIME NOT NULL,
|
||
rainfall INTEGER NOT NULL,
|
||
min_value INTEGER,
|
||
max_value INTEGER,
|
||
samples INTEGER,
|
||
created_at DATETIME NOT NULL,
|
||
UNIQUE(hour_start)
|
||
)
|
||
`)
|
||
|
||
if err != nil {
|
||
logger.Printf("创建小时雨量表错误: %v", err)
|
||
}
|
||
}
|
||
|
||
// 更新历史小时雨量数据(从2025-04-21 18:00:00开始)
|
||
func updateHistoricalHourlyRainfall() {
|
||
logger.Printf("开始更新历史小时雨量数据...")
|
||
|
||
// 查询现有的小时数据
|
||
rows, err := db.Query(`
|
||
SELECT
|
||
datetime(strftime('%Y-%m-%d %H:00:00', timestamp)) as hour_start,
|
||
datetime(strftime('%Y-%m-%d %H:59:59', timestamp)) as hour_end,
|
||
MIN(optical_rain) as min_value,
|
||
MAX(optical_rain) as max_value,
|
||
COUNT(*) as samples,
|
||
MAX(optical_rain) - MIN(optical_rain) as rainfall
|
||
FROM sensor_data
|
||
WHERE timestamp >= '2025-04-21 18:00:00'
|
||
GROUP BY strftime('%Y-%m-%d %H', timestamp)
|
||
ORDER BY hour_start
|
||
`)
|
||
|
||
if err != nil {
|
||
logger.Printf("查询历史数据错误: %v", err)
|
||
return
|
||
}
|
||
defer rows.Close()
|
||
|
||
// 准备插入语句
|
||
stmt, err := db.Prepare(`
|
||
INSERT OR IGNORE INTO hourly_rainfall
|
||
(hour_start, hour_end, rainfall, min_value, max_value, samples, created_at)
|
||
VALUES (?, ?, ?, ?, ?, ?, datetime('now'))
|
||
`)
|
||
|
||
if err != nil {
|
||
logger.Printf("准备插入语句错误: %v", err)
|
||
return
|
||
}
|
||
defer stmt.Close()
|
||
|
||
// 插入数据
|
||
count := 0
|
||
for rows.Next() {
|
||
var hourStart, hourEnd string
|
||
var minValue, maxValue, samples, rainfall int
|
||
|
||
err := rows.Scan(&hourStart, &hourEnd, &minValue, &maxValue, &samples, &rainfall)
|
||
if err != nil {
|
||
logger.Printf("读取行数据错误: %v", err)
|
||
continue
|
||
}
|
||
|
||
_, err = stmt.Exec(hourStart, hourEnd, rainfall, minValue, maxValue, samples)
|
||
if err != nil {
|
||
logger.Printf("插入数据错误: %v", err)
|
||
continue
|
||
}
|
||
|
||
count++
|
||
}
|
||
|
||
logger.Printf("历史小时雨量数据更新完成,共插入 %d 条记录", count)
|
||
}
|
||
|
||
// 定期更新小时雨量
|
||
func periodicHourlyRainfallUpdate() {
|
||
// 计算下一个整点后5分钟的时间(给足够时间收集整点数据)
|
||
now := time.Now()
|
||
nextHour := time.Date(now.Year(), now.Month(), now.Day(), now.Hour()+1, 5, 0, 0, now.Location())
|
||
delay := nextHour.Sub(now)
|
||
|
||
// 首次等待到下一个整点后5分钟
|
||
time.Sleep(delay)
|
||
|
||
// 然后每小时更新一次
|
||
ticker := time.NewTicker(time.Hour)
|
||
for range ticker.C {
|
||
updateLastHourRainfall()
|
||
}
|
||
}
|
||
|
||
// 更新上一个小时的雨量数据
|
||
func updateLastHourRainfall() {
|
||
// 计算上一个小时的时间范围
|
||
now := time.Now()
|
||
lastHourEnd := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, now.Location())
|
||
lastHourStart := lastHourEnd.Add(-time.Hour)
|
||
|
||
// 查询这个小时的数据
|
||
var minValue, maxValue, samples, rainfall sql.NullInt64
|
||
|
||
err := db.QueryRow(`
|
||
SELECT
|
||
MIN(optical_rain),
|
||
MAX(optical_rain),
|
||
COUNT(*),
|
||
MAX(optical_rain) - MIN(optical_rain)
|
||
FROM sensor_data
|
||
WHERE timestamp >= ? AND timestamp < ?
|
||
`, lastHourStart.Format("2006-01-02 15:04:05"), lastHourEnd.Format("2006-01-02 15:04:05")).Scan(
|
||
&minValue, &maxValue, &samples, &rainfall)
|
||
|
||
if err != nil {
|
||
logger.Printf("查询上一小时数据错误: %v", err)
|
||
return
|
||
}
|
||
|
||
// 检查是否有足够的数据
|
||
if !samples.Valid || samples.Int64 < 2 {
|
||
logger.Printf("上一小时 (%s) 数据样本不足,跳过更新", lastHourStart.Format("2006-01-02 15:04"))
|
||
return
|
||
}
|
||
|
||
// 插入或更新小时雨量记录
|
||
_, err = db.Exec(`
|
||
INSERT OR REPLACE INTO hourly_rainfall
|
||
(hour_start, hour_end, rainfall, min_value, max_value, samples, created_at)
|
||
VALUES (?, ?, ?, ?, ?, ?, datetime('now'))
|
||
`,
|
||
lastHourStart.Format("2006-01-02 15:04:05"),
|
||
lastHourEnd.Format("2006-01-02 15:04:05"),
|
||
rainfall.Int64,
|
||
minValue.Int64,
|
||
maxValue.Int64,
|
||
samples.Int64)
|
||
|
||
if err != nil {
|
||
logger.Printf("更新小时雨量记录错误: %v", err)
|
||
return
|
||
}
|
||
|
||
logger.Printf("已更新 %s 小时的雨量数据: %.1fmm (%d 个样本)",
|
||
lastHourStart.Format("2006-01-02 15:04"),
|
||
float64(rainfall.Int64)/10.0,
|
||
samples.Int64)
|
||
}
|
||
|
||
// 获取最近24小时的降雨数据
|
||
func getRecentHourlyRainfall(hours int) ([]HourlyRainfall, error) {
|
||
if hours <= 0 {
|
||
hours = 24
|
||
}
|
||
|
||
rows, err := db.Query(`
|
||
SELECT hour_start, rainfall, samples
|
||
FROM hourly_rainfall
|
||
ORDER BY hour_start DESC
|
||
LIMIT ?
|
||
`, hours)
|
||
|
||
if err != nil {
|
||
return nil, fmt.Errorf("查询小时雨量错误: %v", err)
|
||
}
|
||
defer rows.Close()
|
||
|
||
var result []HourlyRainfall
|
||
for rows.Next() {
|
||
var hr HourlyRainfall
|
||
var hourStart string
|
||
|
||
err := rows.Scan(&hourStart, &hr.Rainfall, &hr.Samples)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("读取小时雨量错误: %v", err)
|
||
}
|
||
|
||
// 解析时间
|
||
hr.HourStart, _ = time.Parse("2006-01-02 15:04:05", hourStart)
|
||
|
||
result = append(result, hr)
|
||
}
|
||
|
||
// 反转数组,使其按时间顺序排列
|
||
for i, j := 0, len(result)-1; i < j; i, j = i+1, j-1 {
|
||
result[i], result[j] = result[j], result[i]
|
||
}
|
||
|
||
return result, nil
|
||
}
|
||
|
||
// 小时雨量数据结构
|
||
type HourlyRainfall struct {
|
||
HourStart time.Time
|
||
Rainfall int64
|
||
Samples int64
|
||
}
|