fix:修改粒度解析

This commit is contained in:
fengyarnom 2025-05-16 15:29:06 +08:00
parent e7d70b32e0
commit 8fd2ccd7e8

View File

@ -58,69 +58,92 @@ func (dao *SensorDAO) GetAggregatedData(start, end time.Time, interval string) (
intervalMinutes = 60
}
// 使用时间桶函数创建标准化的时间戳并将其包含在GROUP BY子句中
query := `
SELECT
DATE_FORMAT(
DATE_ADD(
DATE(timestamp),
INTERVAL (HOUR(timestamp) * 60 + FLOOR(MINUTE(timestamp) / ?) * ?) MINUTE
),
'%Y-%m-%dT%H:%i:00'
) AS bucket_time,
ROUND(AVG(temperature)/10, 1) AS avg_temp,
ROUND(MAX(rainfall)/10, 1) AS rainfall,
ROUND(AVG(humidity)/10, 1) AS avg_humidity,
ROUND(AVG(wind_speed)/100, 1) AS avg_wind_speed,
ROUND(AVG(atm_pressure)/10, 1) AS avg_atm_pressure,
ROUND(AVG(solar_radiation), 1) AS avg_solar_radiation
FROM sensor_data
WHERE timestamp BETWEEN ? AND ?
GROUP BY bucket_time
ORDER BY bucket_time DESC
`
// 对于每个时间桶,我们需要找到该时间桶内最接近结束时间的记录
// 首先,生成所有的时间桶
var buckets []time.Time
current := start
for current.Before(end) || current.Equal(end) {
buckets = append(buckets, current)
// 移动到下一个时间桶
current = current.Add(time.Duration(intervalMinutes) * time.Minute)
}
// 执行查询
rows, err := dao.db.Query(
query,
intervalMinutes, intervalMinutes, // 时间桶计算参数
start, end) // 时间范围
// 如果没有时间桶,直接返回空结果
if len(buckets) == 0 {
return []model.AggregatedData{}, nil
}
// 查询原始数据
rawData, err := dao.GetRawData(start, end)
if err != nil {
logger.Logger.Printf("查询聚合数据失败: %v (间隔=%s, 开始=%s, 结束=%s)",
err, interval, start.Format(time.RFC3339), end.Format(time.RFC3339))
return nil, err
}
defer rows.Close()
// 按时间戳排序原始数据
sort.Slice(rawData, func(i, j int) bool {
return rawData[i].Timestamp.Before(rawData[j].Timestamp)
})
// 创建结果数组
var result []model.AggregatedData
for rows.Next() {
var data model.AggregatedData
var tsStr string
err := rows.Scan(&tsStr, &data.AvgTemperature, &data.Rainfall,
&data.AvgHumidity, &data.AvgWindSpeed, &data.AvgAtmPressure, &data.AvgSolarRadiation)
if err != nil {
logger.Logger.Printf("扫描数据行失败: %v", err)
// 对每个时间桶进行处理
for i, bucketTime := range buckets {
var nextBucketTime time.Time
if i < len(buckets)-1 {
nextBucketTime = buckets[i+1]
} else {
nextBucketTime = end.Add(time.Second)
}
// 收集该时间桶内的所有数据点
var bucketData []model.AggregatedData
for _, data := range rawData {
if (data.Timestamp.Equal(bucketTime) || data.Timestamp.After(bucketTime)) &&
data.Timestamp.Before(nextBucketTime) {
bucketData = append(bucketData, data)
}
}
if len(bucketData) == 0 {
// 如果该时间桶内没有数据,尝试使用上一个时间桶的最后数据
if i > 0 && len(result) > 0 {
lastBucketData := result[len(result)-1]
// 复制上一个时间桶的数据,但更新时间戳
newData := lastBucketData
newData.Timestamp = bucketTime
result = append(result, newData)
}
continue
}
// 解析ISO格式的时间字符串
data.Timestamp, err = time.Parse("2006-01-02T15:04:00", tsStr)
if err != nil {
logger.Logger.Printf("解析时间失败: %v, 原始字符串: %s", err, tsStr)
continue
// 计算该时间桶的聚合数据
var aggregated model.AggregatedData
aggregated.Timestamp = bucketTime
// 温度、湿度、风速、大气压和太阳辐射使用平均值
var sumTemp, sumHumidity, sumWindSpeed, sumAtmPressure, sumSolarRadiation float64
for _, data := range bucketData {
sumTemp += data.AvgTemperature
sumHumidity += data.AvgHumidity
sumWindSpeed += data.AvgWindSpeed
sumAtmPressure += data.AvgAtmPressure
sumSolarRadiation += data.AvgSolarRadiation
}
count := float64(len(bucketData))
aggregated.AvgTemperature = sumTemp / count
aggregated.AvgHumidity = sumHumidity / count
aggregated.AvgWindSpeed = sumWindSpeed / count
aggregated.AvgAtmPressure = sumAtmPressure / count
aggregated.AvgSolarRadiation = sumSolarRadiation / count
// 对于雨量,使用时间桶内最后一个(最新的)数据点的值
aggregated.Rainfall = bucketData[len(bucketData)-1].Rainfall
result = append(result, aggregated)
}
result = append(result, data)
}
// 检查是否有游标错误
if err = rows.Err(); err != nil {
logger.Logger.Printf("查询游标错误: %v", err)
return nil, err
}
// 如果是1分钟粒度且数据不足可能需要进行插值
// 检查是否需要进行插值处理
if intervalMinutes == 1 && len(result) > 0 {
result = fillMissingMinutes(result, start, end)
}