2025-05-15 18:38:18 +08:00

222 lines
5.9 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package dao
import (
"database/sql"
"math"
"sort"
"time"
"go_rain_dtu/internal/model"
"go_rain_dtu/pkg/logger"
)
type SensorDAO struct {
db *sql.DB
}
func NewSensorDAO(db *sql.DB) *SensorDAO {
return &SensorDAO{db: db}
}
// 插入传感器数据
func (dao *SensorDAO) Insert(data *model.SensorData) error {
query := `
INSERT INTO sensor_data (
timestamp, wind_speed, wind_force, wind_direction_8,
wind_direction_360, humidity, temperature, atm_pressure,
solar_radiation, rainfall
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`
_, err := dao.db.Exec(query,
data.Timestamp, data.WindSpeed, data.WindForce, data.WindDirection8,
data.WindDirection360, data.Humidity, data.Temperature, data.AtmPressure,
data.SolarRadiation, data.Rainfall,
)
if err != nil {
logger.Logger.Printf("插入传感器数据失败: %v", err)
return err
}
return nil
}
// 获取聚合数据
func (dao *SensorDAO) GetAggregatedData(start, end time.Time, interval string) ([]model.AggregatedData, error) {
// 参数化时间间隔
var intervalMinutes int
switch interval {
case "1min":
intervalMinutes = 1
case "5min":
intervalMinutes = 5
case "30min":
intervalMinutes = 30
default: // 1hour
intervalMinutes = 60
}
// 使用时间桶函数创建标准化的时间戳并将其包含在GROUP BY子句中
query := `
SELECT
DATE_FORMAT(
DATE_ADD(
DATE(timestamp),
INTERVAL (HOUR(timestamp) * 60 + FLOOR(MINUTE(timestamp) / ?) * ?) MINUTE
),
'%Y-%m-%dT%H:%i:00'
) AS bucket_time,
ROUND(AVG(temperature)/10, 1) AS avg_temp,
MAX(rainfall) - MIN(rainfall) AS rainfall,
ROUND(AVG(humidity)/10, 1) AS avg_humidity,
ROUND(AVG(wind_speed)/100, 1) AS avg_wind_speed,
ROUND(AVG(atm_pressure)/10, 1) AS avg_atm_pressure,
ROUND(AVG(solar_radiation), 1) AS avg_solar_radiation
FROM sensor_data
WHERE timestamp BETWEEN ? AND ?
GROUP BY bucket_time
ORDER BY bucket_time DESC
`
// 执行查询
rows, err := dao.db.Query(
query,
intervalMinutes, intervalMinutes, // 时间桶计算参数
start, end) // 时间范围
if err != nil {
logger.Logger.Printf("查询聚合数据失败: %v (间隔=%s, 开始=%s, 结束=%s)",
err, interval, start.Format(time.RFC3339), end.Format(time.RFC3339))
return nil, err
}
defer rows.Close()
var result []model.AggregatedData
for rows.Next() {
var data model.AggregatedData
var tsStr string
err := rows.Scan(&tsStr, &data.AvgTemperature, &data.Rainfall,
&data.AvgHumidity, &data.AvgWindSpeed, &data.AvgAtmPressure, &data.AvgSolarRadiation)
if err != nil {
logger.Logger.Printf("扫描数据行失败: %v", err)
continue
}
// 解析ISO格式的时间字符串
data.Timestamp, err = time.Parse("2006-01-02T15:04:00", tsStr)
if err != nil {
logger.Logger.Printf("解析时间失败: %v, 原始字符串: %s", err, tsStr)
continue
}
result = append(result, data)
}
// 检查是否有游标错误
if err = rows.Err(); err != nil {
logger.Logger.Printf("查询游标错误: %v", err)
return nil, err
}
// 如果是1分钟粒度且数据不足可能需要进行插值
if intervalMinutes == 1 && len(result) > 0 {
result = fillMissingMinutes(result, start, end)
}
logger.Logger.Printf("聚合查询成功: 返回%d条记录 (间隔=%s)", len(result), interval)
return result, nil
}
// fillMissingMinutes 填充缺失的分钟数据点
func fillMissingMinutes(data []model.AggregatedData, start, end time.Time) []model.AggregatedData {
// 只有在数据点少于应有数量的一半时才进行插值
expectedPoints := int(end.Sub(start).Minutes()) + 1
if len(data) > expectedPoints/2 {
return data // 数据点足够多,不需要插值
}
// 创建一个映射,用于快速查找已有数据点
dataMap := make(map[int64]model.AggregatedData)
for _, point := range data {
// 将时间转换为Unix时间戳便于比较
key := point.Timestamp.Unix()
dataMap[key] = point
}
// 初始化结果数组,预分配足够的空间
result := make([]model.AggregatedData, 0, expectedPoints)
// 创建按时间排序的索引
var timestamps []int64
for k := range dataMap {
timestamps = append(timestamps, k)
}
sort.Slice(timestamps, func(i, j int) bool {
return timestamps[i] < timestamps[j]
})
// 如果没有数据点,直接返回
if len(timestamps) == 0 {
return data
}
// 获取第一个和最后一个数据点
firstPointTime := time.Unix(timestamps[0], 0)
lastPointTime := time.Unix(timestamps[len(timestamps)-1], 0)
// 调整开始和结束时间,确保它们不会超出实际数据范围
if start.Before(firstPointTime) {
start = firstPointTime
}
if end.After(lastPointTime) {
end = lastPointTime
}
// 将start向下取整到整分钟
start = time.Date(start.Year(), start.Month(), start.Day(), start.Hour(), start.Minute(), 0, 0, start.Location())
// 按分钟迭代时间范围
for t := start; !t.After(end); t = t.Add(time.Minute) {
key := t.Unix()
if point, exists := dataMap[key]; exists {
// 使用现有数据点
result = append(result, point)
} else {
// 使用最近的有效数据点进行插值
nearestPoint := findNearestPoint(t, timestamps, dataMap)
if !nearestPoint.Timestamp.IsZero() {
// 创建新的数据点
newPoint := nearestPoint
newPoint.Timestamp = t
result = append(result, newPoint)
}
}
}
return result
}
// findNearestPoint 查找最接近指定时间的数据点
func findNearestPoint(t time.Time, timestamps []int64, dataMap map[int64]model.AggregatedData) model.AggregatedData {
target := t.Unix()
nearest := int64(0)
minDiff := int64(math.MaxInt64)
for _, ts := range timestamps {
diff := int64(math.Abs(float64(ts - target)))
if diff < minDiff {
minDiff = diff
nearest = ts
}
}
// 如果最近的点超过10分钟返回空值
if minDiff > 10*60 {
return model.AggregatedData{}
}
return dataMap[nearest]
}