341 lines
9.4 KiB
Go
341 lines
9.4 KiB
Go
package dao
|
||
|
||
import (
|
||
"database/sql"
|
||
"math"
|
||
"sort"
|
||
"time"
|
||
|
||
"go_rain_dtu/internal/model"
|
||
"go_rain_dtu/pkg/logger"
|
||
)
|
||
|
||
type SensorDAO struct {
|
||
db *sql.DB
|
||
}
|
||
|
||
func NewSensorDAO(db *sql.DB) *SensorDAO {
|
||
return &SensorDAO{db: db}
|
||
}
|
||
|
||
// 插入传感器数据
|
||
func (dao *SensorDAO) Insert(data *model.SensorData) error {
|
||
query := `
|
||
INSERT INTO sensor_data (
|
||
timestamp, wind_speed, wind_force, wind_direction_8,
|
||
wind_direction_360, humidity, temperature, atm_pressure,
|
||
solar_radiation, rainfall
|
||
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||
`
|
||
|
||
_, err := dao.db.Exec(query,
|
||
data.Timestamp, data.WindSpeed, data.WindForce, data.WindDirection8,
|
||
data.WindDirection360, data.Humidity, data.Temperature, data.AtmPressure,
|
||
data.SolarRadiation, data.OpticalRain,
|
||
)
|
||
|
||
if err != nil {
|
||
logger.Logger.Printf("插入传感器数据失败: %v", err)
|
||
return err
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// 获取聚合数据
|
||
func (dao *SensorDAO) GetAggregatedData(start, end time.Time, interval string) ([]model.AggregatedData, error) {
|
||
// 参数化时间间隔
|
||
var intervalMinutes int
|
||
|
||
switch interval {
|
||
case "1min":
|
||
intervalMinutes = 1
|
||
case "5min":
|
||
intervalMinutes = 5
|
||
case "30min":
|
||
intervalMinutes = 30
|
||
default: // 1hour
|
||
intervalMinutes = 60
|
||
}
|
||
|
||
// 对于每个时间桶,我们需要找到该时间桶内最接近结束时间的记录
|
||
// 首先,生成所有的时间桶
|
||
var buckets []time.Time
|
||
current := start
|
||
for current.Before(end) || current.Equal(end) {
|
||
buckets = append(buckets, current)
|
||
// 移动到下一个时间桶
|
||
current = current.Add(time.Duration(intervalMinutes) * time.Minute)
|
||
}
|
||
|
||
// 如果没有时间桶,直接返回空结果
|
||
if len(buckets) == 0 {
|
||
return []model.AggregatedData{}, nil
|
||
}
|
||
|
||
// 查询原始数据
|
||
rawData, err := dao.GetRawData(start, end)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
// 按时间戳排序原始数据
|
||
sort.Slice(rawData, func(i, j int) bool {
|
||
return rawData[i].Timestamp.Before(rawData[j].Timestamp)
|
||
})
|
||
|
||
// 创建结果数组
|
||
var result []model.AggregatedData
|
||
|
||
// 对每个时间桶进行处理
|
||
for i, bucketTime := range buckets {
|
||
var nextBucketTime time.Time
|
||
if i < len(buckets)-1 {
|
||
nextBucketTime = buckets[i+1]
|
||
} else {
|
||
nextBucketTime = end.Add(time.Second)
|
||
}
|
||
|
||
// 收集该时间桶内的所有数据点
|
||
var bucketData []model.AggregatedData
|
||
for _, data := range rawData {
|
||
if (data.Timestamp.Equal(bucketTime) || data.Timestamp.After(bucketTime)) &&
|
||
data.Timestamp.Before(nextBucketTime) {
|
||
bucketData = append(bucketData, data)
|
||
}
|
||
}
|
||
|
||
if len(bucketData) == 0 {
|
||
// 如果该时间桶内没有数据,尝试使用上一个时间桶的最后数据
|
||
if i > 0 && len(result) > 0 {
|
||
lastBucketData := result[len(result)-1]
|
||
// 复制上一个时间桶的数据,但更新时间戳
|
||
newData := lastBucketData
|
||
newData.Timestamp = bucketTime
|
||
result = append(result, newData)
|
||
}
|
||
continue
|
||
}
|
||
|
||
// 计算该时间桶的聚合数据
|
||
var aggregated model.AggregatedData
|
||
aggregated.Timestamp = bucketTime
|
||
|
||
// 温度、湿度、风速、大气压和太阳辐射使用平均值
|
||
var sumTemp, sumHumidity, sumWindSpeed, sumAtmPressure, sumSolarRadiation float64
|
||
for _, data := range bucketData {
|
||
sumTemp += data.AvgTemperature
|
||
sumHumidity += data.AvgHumidity
|
||
sumWindSpeed += data.AvgWindSpeed
|
||
sumAtmPressure += data.AvgAtmPressure
|
||
sumSolarRadiation += data.AvgSolarRadiation
|
||
}
|
||
count := float64(len(bucketData))
|
||
aggregated.AvgTemperature = sumTemp / count
|
||
aggregated.AvgHumidity = sumHumidity / count
|
||
aggregated.AvgWindSpeed = sumWindSpeed / count
|
||
aggregated.AvgAtmPressure = sumAtmPressure / count
|
||
aggregated.AvgSolarRadiation = sumSolarRadiation / count
|
||
|
||
// 对于雨量,使用时间桶内最后一个(最新的)数据点的值
|
||
aggregated.Rainfall = bucketData[len(bucketData)-1].Rainfall
|
||
|
||
// 计算新增雨量(相对于前一个时间桶)
|
||
if i > 0 && len(result) > 0 {
|
||
prevRainfall := result[len(result)-1].Rainfall
|
||
// 如果当前雨量大于前一个时间桶的雨量,计算差值作为新增雨量
|
||
if aggregated.Rainfall > prevRainfall {
|
||
aggregated.Rainfall = aggregated.Rainfall - prevRainfall
|
||
} else if aggregated.Rainfall == prevRainfall {
|
||
// 如果雨量没有变化,表示没有新增雨量
|
||
aggregated.Rainfall = 0
|
||
}
|
||
// 如果当前雨量小于前一个时间桶的雨量(可能是雨量计重置),保留当前值作为新增雨量
|
||
}
|
||
|
||
result = append(result, aggregated)
|
||
}
|
||
|
||
// 检查是否需要进行插值处理
|
||
if intervalMinutes == 1 && len(result) > 0 {
|
||
result = fillMissingMinutes(result, start, end)
|
||
}
|
||
|
||
logger.Logger.Printf("聚合查询成功: 返回%d条记录 (间隔=%s)", len(result), interval)
|
||
return result, nil
|
||
}
|
||
|
||
// fillMissingMinutes 填充缺失的分钟数据点
|
||
func fillMissingMinutes(data []model.AggregatedData, start, end time.Time) []model.AggregatedData {
|
||
// 只有在数据点少于应有数量的一半时才进行插值
|
||
expectedPoints := int(end.Sub(start).Minutes()) + 1
|
||
if len(data) > expectedPoints/2 {
|
||
return data // 数据点足够多,不需要插值
|
||
}
|
||
|
||
// 创建一个映射,用于快速查找已有数据点
|
||
dataMap := make(map[int64]model.AggregatedData)
|
||
for _, point := range data {
|
||
// 将时间转换为Unix时间戳(秒),便于比较
|
||
key := point.Timestamp.Unix()
|
||
dataMap[key] = point
|
||
}
|
||
|
||
// 初始化结果数组,预分配足够的空间
|
||
result := make([]model.AggregatedData, 0, expectedPoints)
|
||
|
||
// 创建按时间排序的索引
|
||
var timestamps []int64
|
||
for k := range dataMap {
|
||
timestamps = append(timestamps, k)
|
||
}
|
||
sort.Slice(timestamps, func(i, j int) bool {
|
||
return timestamps[i] < timestamps[j]
|
||
})
|
||
|
||
// 如果没有数据点,直接返回
|
||
if len(timestamps) == 0 {
|
||
return data
|
||
}
|
||
|
||
// 获取第一个和最后一个数据点
|
||
firstPointTime := time.Unix(timestamps[0], 0)
|
||
lastPointTime := time.Unix(timestamps[len(timestamps)-1], 0)
|
||
|
||
// 调整开始和结束时间,确保它们不会超出实际数据范围
|
||
if start.Before(firstPointTime) {
|
||
start = firstPointTime
|
||
}
|
||
if end.After(lastPointTime) {
|
||
end = lastPointTime
|
||
}
|
||
|
||
// 将start向下取整到整分钟
|
||
start = time.Date(start.Year(), start.Month(), start.Day(), start.Hour(), start.Minute(), 0, 0, start.Location())
|
||
|
||
// 按分钟迭代时间范围
|
||
for t := start; !t.After(end); t = t.Add(time.Minute) {
|
||
key := t.Unix()
|
||
if point, exists := dataMap[key]; exists {
|
||
// 使用现有数据点
|
||
result = append(result, point)
|
||
} else {
|
||
// 使用最近的有效数据点进行插值
|
||
nearestPoint := findNearestPoint(t, timestamps, dataMap)
|
||
if !nearestPoint.Timestamp.IsZero() {
|
||
// 创建新的数据点
|
||
newPoint := nearestPoint
|
||
newPoint.Timestamp = t
|
||
result = append(result, newPoint)
|
||
}
|
||
}
|
||
}
|
||
|
||
return result
|
||
}
|
||
|
||
// findNearestPoint 查找最接近指定时间的数据点
|
||
func findNearestPoint(t time.Time, timestamps []int64, dataMap map[int64]model.AggregatedData) model.AggregatedData {
|
||
target := t.Unix()
|
||
nearest := int64(0)
|
||
minDiff := int64(math.MaxInt64)
|
||
|
||
for _, ts := range timestamps {
|
||
diff := int64(math.Abs(float64(ts - target)))
|
||
if diff < minDiff {
|
||
minDiff = diff
|
||
nearest = ts
|
||
}
|
||
}
|
||
|
||
// 如果最近的点超过10分钟,返回空值
|
||
if minDiff > 10*60 {
|
||
return model.AggregatedData{}
|
||
}
|
||
|
||
return dataMap[nearest]
|
||
}
|
||
|
||
// GetLatestSensorData 获取最新的一条传感器数据
|
||
func (dao *SensorDAO) GetLatestSensorData() (*model.SensorData, error) {
|
||
query := `
|
||
SELECT id, timestamp, wind_speed, wind_force, wind_direction_8,
|
||
wind_direction_360, humidity, temperature, atm_pressure,
|
||
solar_radiation, rainfall
|
||
FROM sensor_data
|
||
ORDER BY timestamp DESC
|
||
LIMIT 1
|
||
`
|
||
|
||
var data model.SensorData
|
||
err := dao.db.QueryRow(query).Scan(
|
||
&data.ID, &data.Timestamp, &data.WindSpeed, &data.WindForce, &data.WindDirection8,
|
||
&data.WindDirection360, &data.Humidity, &data.Temperature, &data.AtmPressure,
|
||
&data.SolarRadiation, &data.OpticalRain,
|
||
)
|
||
|
||
if err != nil {
|
||
if err == sql.ErrNoRows {
|
||
logger.Logger.Printf("未找到传感器数据")
|
||
return nil, nil
|
||
}
|
||
logger.Logger.Printf("获取最新传感器数据失败: %v", err)
|
||
return nil, err
|
||
}
|
||
|
||
return &data, nil
|
||
}
|
||
|
||
// GetRawData 获取指定时间范围内的原始传感器数据
|
||
func (dao *SensorDAO) GetRawData(start, end time.Time) ([]model.AggregatedData, error) {
|
||
query := `
|
||
SELECT
|
||
timestamp,
|
||
temperature/10 AS temperature,
|
||
rainfall/10 AS rainfall,
|
||
humidity/10 AS humidity,
|
||
wind_speed/100 AS wind_speed,
|
||
atm_pressure/10 AS atm_pressure,
|
||
solar_radiation
|
||
FROM sensor_data
|
||
WHERE timestamp BETWEEN ? AND ?
|
||
ORDER BY timestamp DESC
|
||
`
|
||
|
||
// 执行查询
|
||
rows, err := dao.db.Query(query, start, end)
|
||
if err != nil {
|
||
logger.Logger.Printf("查询原始数据失败: %v (开始=%s, 结束=%s)",
|
||
err, start.Format(time.RFC3339), end.Format(time.RFC3339))
|
||
return nil, err
|
||
}
|
||
defer rows.Close()
|
||
|
||
var result []model.AggregatedData
|
||
for rows.Next() {
|
||
var data model.AggregatedData
|
||
err := rows.Scan(
|
||
&data.Timestamp,
|
||
&data.AvgTemperature,
|
||
&data.Rainfall,
|
||
&data.AvgHumidity,
|
||
&data.AvgWindSpeed,
|
||
&data.AvgAtmPressure,
|
||
&data.AvgSolarRadiation)
|
||
if err != nil {
|
||
logger.Logger.Printf("扫描数据行失败: %v", err)
|
||
continue
|
||
}
|
||
result = append(result, data)
|
||
}
|
||
|
||
// 检查是否有游标错误
|
||
if err = rows.Err(); err != nil {
|
||
logger.Logger.Printf("查询游标错误: %v", err)
|
||
return nil, err
|
||
}
|
||
|
||
logger.Logger.Printf("原始数据查询成功: 返回%d条记录", len(result))
|
||
return result, nil
|
||
}
|