diff --git a/internal/dao/sensor.go b/internal/dao/sensor.go index 0b3525d..8b7b5e8 100644 --- a/internal/dao/sensor.go +++ b/internal/dao/sensor.go @@ -58,69 +58,92 @@ func (dao *SensorDAO) GetAggregatedData(start, end time.Time, interval string) ( intervalMinutes = 60 } - // 使用时间桶函数创建标准化的时间戳,并将其包含在GROUP BY子句中 - query := ` - SELECT - DATE_FORMAT( - DATE_ADD( - DATE(timestamp), - INTERVAL (HOUR(timestamp) * 60 + FLOOR(MINUTE(timestamp) / ?) * ?) MINUTE - ), - '%Y-%m-%dT%H:%i:00' - ) AS bucket_time, - ROUND(AVG(temperature)/10, 1) AS avg_temp, - ROUND(MAX(rainfall)/10, 1) AS rainfall, - ROUND(AVG(humidity)/10, 1) AS avg_humidity, - ROUND(AVG(wind_speed)/100, 1) AS avg_wind_speed, - ROUND(AVG(atm_pressure)/10, 1) AS avg_atm_pressure, - ROUND(AVG(solar_radiation), 1) AS avg_solar_radiation - FROM sensor_data - WHERE timestamp BETWEEN ? AND ? - GROUP BY bucket_time - ORDER BY bucket_time DESC - ` + // 对于每个时间桶,我们需要找到该时间桶内最接近结束时间的记录 + // 首先,生成所有的时间桶 + var buckets []time.Time + current := start + for current.Before(end) || current.Equal(end) { + buckets = append(buckets, current) + // 移动到下一个时间桶 + current = current.Add(time.Duration(intervalMinutes) * time.Minute) + } - // 执行查询 - rows, err := dao.db.Query( - query, - intervalMinutes, intervalMinutes, // 时间桶计算参数 - start, end) // 时间范围 + // 如果没有时间桶,直接返回空结果 + if len(buckets) == 0 { + return []model.AggregatedData{}, nil + } + // 查询原始数据 + rawData, err := dao.GetRawData(start, end) if err != nil { - logger.Logger.Printf("查询聚合数据失败: %v (间隔=%s, 开始=%s, 结束=%s)", - err, interval, start.Format(time.RFC3339), end.Format(time.RFC3339)) return nil, err } - defer rows.Close() + // 按时间戳排序原始数据 + sort.Slice(rawData, func(i, j int) bool { + return rawData[i].Timestamp.Before(rawData[j].Timestamp) + }) + + // 创建结果数组 var result []model.AggregatedData - for rows.Next() { - var data model.AggregatedData - var tsStr string - err := rows.Scan(&tsStr, &data.AvgTemperature, &data.Rainfall, - &data.AvgHumidity, &data.AvgWindSpeed, &data.AvgAtmPressure, &data.AvgSolarRadiation) - if err != nil { - logger.Logger.Printf("扫描数据行失败: %v", err) + + // 对每个时间桶进行处理 + for i, bucketTime := range buckets { + var nextBucketTime time.Time + if i < len(buckets)-1 { + nextBucketTime = buckets[i+1] + } else { + nextBucketTime = end.Add(time.Second) + } + + // 收集该时间桶内的所有数据点 + var bucketData []model.AggregatedData + for _, data := range rawData { + if (data.Timestamp.Equal(bucketTime) || data.Timestamp.After(bucketTime)) && + data.Timestamp.Before(nextBucketTime) { + bucketData = append(bucketData, data) + } + } + + if len(bucketData) == 0 { + // 如果该时间桶内没有数据,尝试使用上一个时间桶的最后数据 + if i > 0 && len(result) > 0 { + lastBucketData := result[len(result)-1] + // 复制上一个时间桶的数据,但更新时间戳 + newData := lastBucketData + newData.Timestamp = bucketTime + result = append(result, newData) + } continue } - // 解析ISO格式的时间字符串 - data.Timestamp, err = time.Parse("2006-01-02T15:04:00", tsStr) - if err != nil { - logger.Logger.Printf("解析时间失败: %v, 原始字符串: %s", err, tsStr) - continue + // 计算该时间桶的聚合数据 + var aggregated model.AggregatedData + aggregated.Timestamp = bucketTime + + // 温度、湿度、风速、大气压和太阳辐射使用平均值 + var sumTemp, sumHumidity, sumWindSpeed, sumAtmPressure, sumSolarRadiation float64 + for _, data := range bucketData { + sumTemp += data.AvgTemperature + sumHumidity += data.AvgHumidity + sumWindSpeed += data.AvgWindSpeed + sumAtmPressure += data.AvgAtmPressure + sumSolarRadiation += data.AvgSolarRadiation } + count := float64(len(bucketData)) + aggregated.AvgTemperature = sumTemp / count + aggregated.AvgHumidity = sumHumidity / count + aggregated.AvgWindSpeed = sumWindSpeed / count + aggregated.AvgAtmPressure = sumAtmPressure / count + aggregated.AvgSolarRadiation = sumSolarRadiation / count - result = append(result, data) + // 对于雨量,使用时间桶内最后一个(最新的)数据点的值 + aggregated.Rainfall = bucketData[len(bucketData)-1].Rainfall + + result = append(result, aggregated) } - // 检查是否有游标错误 - if err = rows.Err(); err != nil { - logger.Logger.Printf("查询游标错误: %v", err) - return nil, err - } - - // 如果是1分钟粒度,且数据不足,可能需要进行插值 + // 检查是否需要进行插值处理 if intervalMinutes == 1 && len(result) > 0 { result = fillMissingMinutes(result, start, end) }