package dao import ( "database/sql" "math" "sort" "time" "go_rain_dtu/internal/model" "go_rain_dtu/pkg/logger" ) type SensorDAO struct { db *sql.DB } func NewSensorDAO(db *sql.DB) *SensorDAO { return &SensorDAO{db: db} } // 插入传感器数据 func (dao *SensorDAO) Insert(data *model.SensorData) error { query := ` INSERT INTO sensor_data ( timestamp, wind_speed, wind_force, wind_direction_8, wind_direction_360, humidity, temperature, atm_pressure, solar_radiation, rainfall ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ` _, err := dao.db.Exec(query, data.Timestamp, data.WindSpeed, data.WindForce, data.WindDirection8, data.WindDirection360, data.Humidity, data.Temperature, data.AtmPressure, data.SolarRadiation, data.OpticalRain, ) if err != nil { logger.Logger.Printf("插入传感器数据失败: %v", err) return err } return nil } // 获取聚合数据 func (dao *SensorDAO) GetAggregatedData(start, end time.Time, interval string) ([]model.AggregatedData, error) { // 参数化时间间隔 var intervalMinutes int switch interval { case "1min": intervalMinutes = 1 case "5min": intervalMinutes = 5 case "30min": intervalMinutes = 30 default: // 1hour intervalMinutes = 60 } // 使用时间桶函数创建标准化的时间戳,并将其包含在GROUP BY子句中 query := ` SELECT DATE_FORMAT( DATE_ADD( DATE(timestamp), INTERVAL (HOUR(timestamp) * 60 + FLOOR(MINUTE(timestamp) / ?) * ?) MINUTE ), '%Y-%m-%dT%H:%i:00' ) AS bucket_time, ROUND(AVG(temperature)/10, 1) AS avg_temp, ROUND(MAX(rainfall)/10, 1) AS rainfall, ROUND(AVG(humidity)/10, 1) AS avg_humidity, ROUND(AVG(wind_speed)/100, 1) AS avg_wind_speed, ROUND(AVG(atm_pressure)/10, 1) AS avg_atm_pressure, ROUND(AVG(solar_radiation), 1) AS avg_solar_radiation FROM sensor_data WHERE timestamp BETWEEN ? AND ? GROUP BY bucket_time ORDER BY bucket_time DESC ` // 执行查询 rows, err := dao.db.Query( query, intervalMinutes, intervalMinutes, // 时间桶计算参数 start, end) // 时间范围 if err != nil { logger.Logger.Printf("查询聚合数据失败: %v (间隔=%s, 开始=%s, 结束=%s)", err, interval, start.Format(time.RFC3339), end.Format(time.RFC3339)) return nil, err } defer rows.Close() var result []model.AggregatedData for rows.Next() { var data model.AggregatedData var tsStr string err := rows.Scan(&tsStr, &data.AvgTemperature, &data.Rainfall, &data.AvgHumidity, &data.AvgWindSpeed, &data.AvgAtmPressure, &data.AvgSolarRadiation) if err != nil { logger.Logger.Printf("扫描数据行失败: %v", err) continue } // 解析ISO格式的时间字符串 data.Timestamp, err = time.Parse("2006-01-02T15:04:00", tsStr) if err != nil { logger.Logger.Printf("解析时间失败: %v, 原始字符串: %s", err, tsStr) continue } result = append(result, data) } // 检查是否有游标错误 if err = rows.Err(); err != nil { logger.Logger.Printf("查询游标错误: %v", err) return nil, err } // 如果是1分钟粒度,且数据不足,可能需要进行插值 if intervalMinutes == 1 && len(result) > 0 { result = fillMissingMinutes(result, start, end) } logger.Logger.Printf("聚合查询成功: 返回%d条记录 (间隔=%s)", len(result), interval) return result, nil } // fillMissingMinutes 填充缺失的分钟数据点 func fillMissingMinutes(data []model.AggregatedData, start, end time.Time) []model.AggregatedData { // 只有在数据点少于应有数量的一半时才进行插值 expectedPoints := int(end.Sub(start).Minutes()) + 1 if len(data) > expectedPoints/2 { return data // 数据点足够多,不需要插值 } // 创建一个映射,用于快速查找已有数据点 dataMap := make(map[int64]model.AggregatedData) for _, point := range data { // 将时间转换为Unix时间戳(秒),便于比较 key := point.Timestamp.Unix() dataMap[key] = point } // 初始化结果数组,预分配足够的空间 result := make([]model.AggregatedData, 0, expectedPoints) // 创建按时间排序的索引 var timestamps []int64 for k := range dataMap { timestamps = append(timestamps, k) } sort.Slice(timestamps, func(i, j int) bool { return timestamps[i] < timestamps[j] }) // 如果没有数据点,直接返回 if len(timestamps) == 0 { return data } // 获取第一个和最后一个数据点 firstPointTime := time.Unix(timestamps[0], 0) lastPointTime := time.Unix(timestamps[len(timestamps)-1], 0) // 调整开始和结束时间,确保它们不会超出实际数据范围 if start.Before(firstPointTime) { start = firstPointTime } if end.After(lastPointTime) { end = lastPointTime } // 将start向下取整到整分钟 start = time.Date(start.Year(), start.Month(), start.Day(), start.Hour(), start.Minute(), 0, 0, start.Location()) // 按分钟迭代时间范围 for t := start; !t.After(end); t = t.Add(time.Minute) { key := t.Unix() if point, exists := dataMap[key]; exists { // 使用现有数据点 result = append(result, point) } else { // 使用最近的有效数据点进行插值 nearestPoint := findNearestPoint(t, timestamps, dataMap) if !nearestPoint.Timestamp.IsZero() { // 创建新的数据点 newPoint := nearestPoint newPoint.Timestamp = t result = append(result, newPoint) } } } return result } // findNearestPoint 查找最接近指定时间的数据点 func findNearestPoint(t time.Time, timestamps []int64, dataMap map[int64]model.AggregatedData) model.AggregatedData { target := t.Unix() nearest := int64(0) minDiff := int64(math.MaxInt64) for _, ts := range timestamps { diff := int64(math.Abs(float64(ts - target))) if diff < minDiff { minDiff = diff nearest = ts } } // 如果最近的点超过10分钟,返回空值 if minDiff > 10*60 { return model.AggregatedData{} } return dataMap[nearest] } // GetLatestSensorData 获取最新的一条传感器数据 func (dao *SensorDAO) GetLatestSensorData() (*model.SensorData, error) { query := ` SELECT id, timestamp, wind_speed, wind_force, wind_direction_8, wind_direction_360, humidity, temperature, atm_pressure, solar_radiation, rainfall FROM sensor_data ORDER BY timestamp DESC LIMIT 1 ` var data model.SensorData err := dao.db.QueryRow(query).Scan( &data.ID, &data.Timestamp, &data.WindSpeed, &data.WindForce, &data.WindDirection8, &data.WindDirection360, &data.Humidity, &data.Temperature, &data.AtmPressure, &data.SolarRadiation, &data.OpticalRain, ) if err != nil { if err == sql.ErrNoRows { logger.Logger.Printf("未找到传感器数据") return nil, nil } logger.Logger.Printf("获取最新传感器数据失败: %v", err) return nil, err } return &data, nil } // GetRawData 获取指定时间范围内的原始传感器数据 func (dao *SensorDAO) GetRawData(start, end time.Time) ([]model.AggregatedData, error) { query := ` SELECT timestamp, temperature/10 AS temperature, rainfall/10 AS rainfall, humidity/10 AS humidity, wind_speed/100 AS wind_speed, atm_pressure/10 AS atm_pressure, solar_radiation FROM sensor_data WHERE timestamp BETWEEN ? AND ? ORDER BY timestamp DESC ` // 执行查询 rows, err := dao.db.Query(query, start, end) if err != nil { logger.Logger.Printf("查询原始数据失败: %v (开始=%s, 结束=%s)", err, start.Format(time.RFC3339), end.Format(time.RFC3339)) return nil, err } defer rows.Close() var result []model.AggregatedData for rows.Next() { var data model.AggregatedData err := rows.Scan( &data.Timestamp, &data.AvgTemperature, &data.Rainfall, &data.AvgHumidity, &data.AvgWindSpeed, &data.AvgAtmPressure, &data.AvgSolarRadiation) if err != nil { logger.Logger.Printf("扫描数据行失败: %v", err) continue } result = append(result, data) } // 检查是否有游标错误 if err = rows.Err(); err != nil { logger.Logger.Printf("查询游标错误: %v", err) return nil, err } logger.Logger.Printf("原始数据查询成功: 返回%d条记录", len(result)) return result, nil }