361 lines
10 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package dao
import (
"database/sql"
"math"
"sort"
"time"
"go_rain_dtu/internal/model"
"go_rain_dtu/pkg/logger"
)
type SensorDAO struct {
db *sql.DB
}
func NewSensorDAO(db *sql.DB) *SensorDAO {
return &SensorDAO{db: db}
}
// 插入传感器数据
func (dao *SensorDAO) Insert(data *model.SensorData) error {
query := `
INSERT INTO sensor_data (
timestamp, wind_speed, wind_force, wind_direction_8,
wind_direction_360, humidity, temperature, atm_pressure,
solar_radiation, rainfall
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`
_, err := dao.db.Exec(query,
data.Timestamp, data.WindSpeed, data.WindForce, data.WindDirection8,
data.WindDirection360, data.Humidity, data.Temperature, data.AtmPressure,
data.SolarRadiation, data.OpticalRain,
)
if err != nil {
logger.Logger.Printf("插入传感器数据失败: %v", err)
return err
}
return nil
}
// 获取聚合数据
func (dao *SensorDAO) GetAggregatedData(start, end time.Time, interval string) ([]model.AggregatedData, error) {
// 参数化时间间隔
var intervalMinutes int
switch interval {
case "1min":
intervalMinutes = 1
case "5min":
intervalMinutes = 5
case "30min":
intervalMinutes = 30
default: // 1hour
intervalMinutes = 60
}
// 对于每个时间桶,我们需要找到该时间桶内最接近结束时间的记录
// 首先,生成所有的时间桶
var buckets []time.Time
current := start
for current.Before(end) || current.Equal(end) {
buckets = append(buckets, current)
// 移动到下一个时间桶
current = current.Add(time.Duration(intervalMinutes) * time.Minute)
}
// 如果没有时间桶,直接返回空结果
if len(buckets) == 0 {
return []model.AggregatedData{}, nil
}
// 查询原始数据
rawData, err := dao.GetRawData(start, end)
if err != nil {
return nil, err
}
// 按时间戳排序原始数据
sort.Slice(rawData, func(i, j int) bool {
return rawData[i].Timestamp.Before(rawData[j].Timestamp)
})
// 创建结果数组
var result []model.AggregatedData
// 对每个时间桶进行处理
var lastKnownRainfall float64 = 0
var firstBucketProcessed bool = false
for i, bucketTime := range buckets {
var nextBucketTime time.Time
if i < len(buckets)-1 {
nextBucketTime = buckets[i+1]
} else {
nextBucketTime = end.Add(time.Second)
}
// 收集该时间桶内的所有数据点
var bucketData []model.AggregatedData
for _, data := range rawData {
if (data.Timestamp.Equal(bucketTime) || data.Timestamp.After(bucketTime)) &&
data.Timestamp.Before(nextBucketTime) {
bucketData = append(bucketData, data)
}
}
if len(bucketData) == 0 {
// 如果该时间桶内没有数据,表示没有新增雨量
var newData model.AggregatedData
newData.Timestamp = bucketTime
// 使用上一个有效时间桶的其他数据(温度等)
if i > 0 && len(result) > 0 {
lastBucketData := result[len(result)-1]
newData.AvgTemperature = lastBucketData.AvgTemperature
newData.AvgHumidity = lastBucketData.AvgHumidity
newData.AvgWindSpeed = lastBucketData.AvgWindSpeed
newData.AvgAtmPressure = lastBucketData.AvgAtmPressure
newData.AvgSolarRadiation = lastBucketData.AvgSolarRadiation
}
// 新增雨量为0
newData.Rainfall = 0
result = append(result, newData)
continue
}
// 计算该时间桶的聚合数据
var aggregated model.AggregatedData
aggregated.Timestamp = bucketTime
// 温度、湿度、风速、大气压和太阳辐射使用平均值
var sumTemp, sumHumidity, sumWindSpeed, sumAtmPressure, sumSolarRadiation float64
for _, data := range bucketData {
sumTemp += data.AvgTemperature
sumHumidity += data.AvgHumidity
sumWindSpeed += data.AvgWindSpeed
sumAtmPressure += data.AvgAtmPressure
sumSolarRadiation += data.AvgSolarRadiation
}
count := float64(len(bucketData))
aggregated.AvgTemperature = sumTemp / count
aggregated.AvgHumidity = sumHumidity / count
aggregated.AvgWindSpeed = sumWindSpeed / count
aggregated.AvgAtmPressure = sumAtmPressure / count
aggregated.AvgSolarRadiation = sumSolarRadiation / count
// 获取该时间桶内最后一个(最新的)数据点的累积雨量值
currentRainfall := bucketData[len(bucketData)-1].Rainfall
// 计算新增雨量
if !firstBucketProcessed {
// 第一个有数据的时间桶,记录累积雨量作为基准
lastKnownRainfall = currentRainfall
// 如果是第一个时间桶,新增雨量就是当前累积值
aggregated.Rainfall = currentRainfall
firstBucketProcessed = true
} else {
// 后续时间桶,计算相对于上一个已知累积雨量的差值
if currentRainfall > lastKnownRainfall {
// 如果当前累积雨量增加,计算新增部分
aggregated.Rainfall = currentRainfall - lastKnownRainfall
lastKnownRainfall = currentRainfall
} else if currentRainfall < lastKnownRainfall {
// 如果当前累积雨量减少(可能是雨量计重置),视为新一轮累积
aggregated.Rainfall = currentRainfall
lastKnownRainfall = currentRainfall
} else {
// 如果累积雨量没有变化,表示没有新增雨量
aggregated.Rainfall = 0
}
}
result = append(result, aggregated)
}
// 检查是否需要进行插值处理
if intervalMinutes == 1 && len(result) > 0 {
result = fillMissingMinutes(result, start, end)
}
logger.Logger.Printf("聚合查询成功: 返回%d条记录 (间隔=%s)", len(result), interval)
return result, nil
}
// fillMissingMinutes 填充缺失的分钟数据点
func fillMissingMinutes(data []model.AggregatedData, start, end time.Time) []model.AggregatedData {
// 只有在数据点少于应有数量的一半时才进行插值
expectedPoints := int(end.Sub(start).Minutes()) + 1
if len(data) > expectedPoints/2 {
return data // 数据点足够多,不需要插值
}
// 创建一个映射,用于快速查找已有数据点
dataMap := make(map[int64]model.AggregatedData)
for _, point := range data {
// 将时间转换为Unix时间戳便于比较
key := point.Timestamp.Unix()
dataMap[key] = point
}
// 初始化结果数组,预分配足够的空间
result := make([]model.AggregatedData, 0, expectedPoints)
// 创建按时间排序的索引
var timestamps []int64
for k := range dataMap {
timestamps = append(timestamps, k)
}
sort.Slice(timestamps, func(i, j int) bool {
return timestamps[i] < timestamps[j]
})
// 如果没有数据点,直接返回
if len(timestamps) == 0 {
return data
}
// 获取第一个和最后一个数据点
firstPointTime := time.Unix(timestamps[0], 0)
lastPointTime := time.Unix(timestamps[len(timestamps)-1], 0)
// 调整开始和结束时间,确保它们不会超出实际数据范围
if start.Before(firstPointTime) {
start = firstPointTime
}
if end.After(lastPointTime) {
end = lastPointTime
}
// 将start向下取整到整分钟
start = time.Date(start.Year(), start.Month(), start.Day(), start.Hour(), start.Minute(), 0, 0, start.Location())
// 按分钟迭代时间范围
for t := start; !t.After(end); t = t.Add(time.Minute) {
key := t.Unix()
if point, exists := dataMap[key]; exists {
// 使用现有数据点
result = append(result, point)
} else {
// 使用最近的有效数据点进行插值
nearestPoint := findNearestPoint(t, timestamps, dataMap)
if !nearestPoint.Timestamp.IsZero() {
// 创建新的数据点
newPoint := nearestPoint
newPoint.Timestamp = t
result = append(result, newPoint)
}
}
}
return result
}
// findNearestPoint 查找最接近指定时间的数据点
func findNearestPoint(t time.Time, timestamps []int64, dataMap map[int64]model.AggregatedData) model.AggregatedData {
target := t.Unix()
nearest := int64(0)
minDiff := int64(math.MaxInt64)
for _, ts := range timestamps {
diff := int64(math.Abs(float64(ts - target)))
if diff < minDiff {
minDiff = diff
nearest = ts
}
}
// 如果最近的点超过10分钟返回空值
if minDiff > 10*60 {
return model.AggregatedData{}
}
return dataMap[nearest]
}
// GetLatestSensorData 获取最新的一条传感器数据
func (dao *SensorDAO) GetLatestSensorData() (*model.SensorData, error) {
query := `
SELECT id, timestamp, wind_speed, wind_force, wind_direction_8,
wind_direction_360, humidity, temperature, atm_pressure,
solar_radiation, rainfall
FROM sensor_data
ORDER BY timestamp DESC
LIMIT 1
`
var data model.SensorData
err := dao.db.QueryRow(query).Scan(
&data.ID, &data.Timestamp, &data.WindSpeed, &data.WindForce, &data.WindDirection8,
&data.WindDirection360, &data.Humidity, &data.Temperature, &data.AtmPressure,
&data.SolarRadiation, &data.OpticalRain,
)
if err != nil {
if err == sql.ErrNoRows {
logger.Logger.Printf("未找到传感器数据")
return nil, nil
}
logger.Logger.Printf("获取最新传感器数据失败: %v", err)
return nil, err
}
return &data, nil
}
// GetRawData 获取指定时间范围内的原始传感器数据
func (dao *SensorDAO) GetRawData(start, end time.Time) ([]model.AggregatedData, error) {
query := `
SELECT
timestamp,
temperature/10 AS temperature,
rainfall/10 AS rainfall,
humidity/10 AS humidity,
wind_speed/100 AS wind_speed,
atm_pressure/10 AS atm_pressure,
solar_radiation
FROM sensor_data
WHERE timestamp BETWEEN ? AND ?
ORDER BY timestamp DESC
`
// 执行查询
rows, err := dao.db.Query(query, start, end)
if err != nil {
logger.Logger.Printf("查询原始数据失败: %v (开始=%s, 结束=%s)",
err, start.Format(time.RFC3339), end.Format(time.RFC3339))
return nil, err
}
defer rows.Close()
var result []model.AggregatedData
for rows.Next() {
var data model.AggregatedData
err := rows.Scan(
&data.Timestamp,
&data.AvgTemperature,
&data.Rainfall,
&data.AvgHumidity,
&data.AvgWindSpeed,
&data.AvgAtmPressure,
&data.AvgSolarRadiation)
if err != nil {
logger.Logger.Printf("扫描数据行失败: %v", err)
continue
}
result = append(result, data)
}
// 检查是否有游标错误
if err = rows.Err(); err != nil {
logger.Logger.Printf("查询游标错误: %v", err)
return nil, err
}
logger.Logger.Printf("原始数据查询成功: 返回%d条记录", len(result))
return result, nil
}