284 lines
7.7 KiB
Go
284 lines
7.7 KiB
Go
package db
|
||
|
||
import (
|
||
"database/sql"
|
||
"fmt"
|
||
_ "github.com/go-sql-driver/mysql"
|
||
"log"
|
||
"math"
|
||
"rain_monitor/models"
|
||
"time"
|
||
)
|
||
|
||
var db *sql.DB
|
||
|
||
type DBConfig struct {
|
||
Host string
|
||
Port int
|
||
User string
|
||
Password string
|
||
DBName string
|
||
}
|
||
|
||
func InitDB(config DBConfig) error {
|
||
dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&parseTime=true&loc=Local",
|
||
config.User, config.Password, config.Host, config.Port, config.DBName)
|
||
|
||
var err error
|
||
db, err = sql.Open("mysql", dsn)
|
||
if err != nil {
|
||
return fmt.Errorf("数据库连接失败: %v", err)
|
||
}
|
||
|
||
db.SetMaxOpenConns(20)
|
||
db.SetMaxIdleConns(10)
|
||
db.SetConnMaxLifetime(time.Hour)
|
||
|
||
if err = db.Ping(); err != nil {
|
||
return fmt.Errorf("数据库Ping失败: %v", err)
|
||
}
|
||
|
||
if err = createTables(); err != nil {
|
||
return fmt.Errorf("创建表失败: %v", err)
|
||
}
|
||
|
||
log.Println("数据库连接成功")
|
||
return nil
|
||
}
|
||
|
||
func CloseDB() {
|
||
if db != nil {
|
||
db.Close()
|
||
}
|
||
}
|
||
|
||
func createTables() error {
|
||
_, err := db.Exec(models.CreateWeatherDataTable)
|
||
if err != nil {
|
||
return fmt.Errorf("创建气象站数据表失败: %v", err)
|
||
}
|
||
|
||
_, err = db.Exec(models.CreateRainGaugeDataTable)
|
||
if err != nil {
|
||
return fmt.Errorf("创建雨量计数据表失败: %v", err)
|
||
}
|
||
|
||
// 创建EM3395TY相关表
|
||
_, err = db.Exec(models.CreateEM3395TYDevicesTable)
|
||
if err != nil {
|
||
return fmt.Errorf("创建EM3395TY设备表失败: %v", err)
|
||
}
|
||
|
||
_, err = db.Exec(models.CreateEM3395TYDataTable)
|
||
if err != nil {
|
||
return fmt.Errorf("创建EM3395TY数据表失败: %v", err)
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
func SaveWeatherData(data *models.WeatherData) (int64, error) {
|
||
result, err := db.Exec(models.InsertWeatherDataSQL,
|
||
data.Timestamp, data.WindSpeed, data.WindForce, data.WindDirection8, data.WindDirection360,
|
||
data.Humidity, data.Temperature, data.Noise, data.PM25, data.PM10, data.AtmPressure,
|
||
data.LuxHigh, data.LuxLow, data.LightIntensity, data.Rainfall, data.CompassAngle, data.SolarRadiation)
|
||
if err != nil {
|
||
return 0, fmt.Errorf("保存气象站数据失败: %v", err)
|
||
}
|
||
|
||
id, err := result.LastInsertId()
|
||
if err != nil {
|
||
return 0, fmt.Errorf("获取插入ID失败: %v", err)
|
||
}
|
||
|
||
return id, nil
|
||
}
|
||
|
||
func SaveRainGaugeData(data *models.RainGaugeData) (int64, error) {
|
||
fiveMinutesAgo := data.Timestamp.Add(-5 * time.Minute)
|
||
rows, err := db.Query(`
|
||
SELECT id, timestamp, total_rainfall
|
||
FROM rain_gauge_data
|
||
WHERE timestamp BETWEEN ? AND ?
|
||
ORDER BY timestamp DESC
|
||
`, fiveMinutesAgo, data.Timestamp)
|
||
|
||
if err != nil {
|
||
log.Printf("查询最近雨量计数据失败: %v", err)
|
||
} else {
|
||
defer rows.Close()
|
||
|
||
for rows.Next() {
|
||
var id int64
|
||
var ts time.Time
|
||
var rainfall float64
|
||
|
||
if err := rows.Scan(&id, &ts, &rainfall); err != nil {
|
||
log.Printf("扫描雨量计数据失败: %v", err)
|
||
continue
|
||
}
|
||
|
||
timeDiff := data.Timestamp.Sub(ts)
|
||
rainfallDiff := math.Abs(data.TotalRainfall - rainfall)
|
||
|
||
if timeDiff < time.Minute && rainfallDiff < 0.1 {
|
||
log.Printf("检测到重复的雨量计数据,跳过插入。ID=%d, 时间=%v", id, ts)
|
||
return id, nil
|
||
}
|
||
}
|
||
}
|
||
|
||
result, err := db.Exec(models.InsertRainGaugeDataSQL,
|
||
data.Timestamp, data.DailyRainfall, data.InstantRainfall, data.YesterdayRainfall,
|
||
data.TotalRainfall, data.HourlyRainfall, data.LastHourRainfall, data.Max24hRainfall,
|
||
data.Max24hRainfallPeriod, data.Min24hRainfall, data.Min24hRainfallPeriod)
|
||
if err != nil {
|
||
return 0, fmt.Errorf("保存雨量计数据失败: %v", err)
|
||
}
|
||
|
||
id, err := result.LastInsertId()
|
||
if err != nil {
|
||
return 0, fmt.Errorf("获取插入ID失败: %v", err)
|
||
}
|
||
log.Printf("雨量计数据已保存,ID=%d, 时间=%v", id, data.Timestamp)
|
||
|
||
return id, nil
|
||
}
|
||
|
||
func GetWeatherDataByTimeRange(start, end time.Time) ([]models.WeatherData, error) {
|
||
rows, err := db.Query(models.QueryWeatherDataByTimeRangeSQL, start, end)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("查询气象站数据失败: %v", err)
|
||
}
|
||
defer rows.Close()
|
||
|
||
var result []models.WeatherData
|
||
for rows.Next() {
|
||
var data models.WeatherData
|
||
err := rows.Scan(
|
||
&data.ID, &data.Timestamp, &data.WindSpeed, &data.WindForce, &data.WindDirection8,
|
||
&data.WindDirection360, &data.Humidity, &data.Temperature, &data.Noise, &data.PM25,
|
||
&data.PM10, &data.AtmPressure, &data.LuxHigh, &data.LuxLow, &data.LightIntensity,
|
||
&data.Rainfall, &data.CompassAngle, &data.SolarRadiation,
|
||
)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("扫描气象站数据失败: %v", err)
|
||
}
|
||
result = append(result, data)
|
||
}
|
||
|
||
return result, nil
|
||
}
|
||
|
||
func GetRainGaugeDataByTimeRange(start, end time.Time) ([]models.RainGaugeData, error) {
|
||
rows, err := db.Query(models.QueryRainGaugeDataByTimeRangeSQL, start, end)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("查询雨量计数据失败: %v", err)
|
||
}
|
||
defer rows.Close()
|
||
|
||
var result []models.RainGaugeData
|
||
for rows.Next() {
|
||
var data models.RainGaugeData
|
||
err := rows.Scan(
|
||
&data.ID, &data.Timestamp, &data.DailyRainfall, &data.InstantRainfall,
|
||
&data.YesterdayRainfall, &data.TotalRainfall, &data.HourlyRainfall,
|
||
&data.LastHourRainfall, &data.Max24hRainfall, &data.Max24hRainfallPeriod,
|
||
&data.Min24hRainfall, &data.Min24hRainfallPeriod,
|
||
)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("扫描雨量计数据失败: %v", err)
|
||
}
|
||
result = append(result, data)
|
||
}
|
||
|
||
return result, nil
|
||
}
|
||
|
||
func GetLatestWeatherData() (*models.WeatherData, error) {
|
||
row := db.QueryRow(models.QueryLatestWeatherDataSQL)
|
||
if row == nil {
|
||
return nil, fmt.Errorf("没有气象站数据")
|
||
}
|
||
|
||
data := &models.WeatherData{}
|
||
err := row.Scan(
|
||
&data.ID, &data.Timestamp, &data.WindSpeed, &data.WindForce, &data.WindDirection8,
|
||
&data.WindDirection360, &data.Humidity, &data.Temperature, &data.Noise, &data.PM25,
|
||
&data.PM10, &data.AtmPressure, &data.LuxHigh, &data.LuxLow, &data.LightIntensity,
|
||
&data.Rainfall, &data.CompassAngle, &data.SolarRadiation,
|
||
)
|
||
if err == sql.ErrNoRows {
|
||
return nil, nil
|
||
} else if err != nil {
|
||
return nil, fmt.Errorf("扫描最新气象站数据失败: %v", err)
|
||
}
|
||
|
||
return data, nil
|
||
}
|
||
|
||
func GetLatestRainGaugeData() (*models.RainGaugeData, error) {
|
||
row := db.QueryRow(models.QueryLatestRainGaugeDataSQL)
|
||
if row == nil {
|
||
return nil, fmt.Errorf("没有雨量计数据")
|
||
}
|
||
|
||
data := &models.RainGaugeData{}
|
||
err := row.Scan(
|
||
&data.ID, &data.Timestamp, &data.DailyRainfall, &data.InstantRainfall,
|
||
&data.YesterdayRainfall, &data.TotalRainfall, &data.HourlyRainfall,
|
||
&data.LastHourRainfall, &data.Max24hRainfall, &data.Max24hRainfallPeriod,
|
||
&data.Min24hRainfall, &data.Min24hRainfallPeriod,
|
||
)
|
||
if err == sql.ErrNoRows {
|
||
return nil, nil
|
||
} else if err != nil {
|
||
return nil, fmt.Errorf("扫描最新雨量计数据失败: %v", err)
|
||
}
|
||
|
||
return data, nil
|
||
}
|
||
|
||
func GetAggregatedData(start, end time.Time) ([]models.AggregatedData, error) {
|
||
log.Printf("GetAggregatedData调用 - 时间范围: 开始=%v, 结束=%v", start, end)
|
||
|
||
rows, err := db.Query(models.QueryAggregatedDataSQL, start, end, start, end)
|
||
if err != nil {
|
||
log.Printf("SQL查询失败: %v", err)
|
||
return nil, fmt.Errorf("查询聚合数据失败: %v", err)
|
||
}
|
||
defer rows.Close()
|
||
|
||
var result []models.AggregatedData
|
||
for rows.Next() {
|
||
var data models.AggregatedData
|
||
var timestampStr string
|
||
err := rows.Scan(
|
||
×tampStr, &data.Rainfall, &data.AvgTemperature, &data.AvgHumidity,
|
||
&data.AvgWindSpeed, &data.AtmPressure, &data.SolarRadiation,
|
||
)
|
||
if err != nil {
|
||
log.Printf("扫描行数据失败: %v", err)
|
||
return nil, fmt.Errorf("扫描聚合数据失败: %v", err)
|
||
}
|
||
|
||
data.FormattedTime = timestampStr
|
||
|
||
timestamp, err := time.Parse("2006-01-02 15:04:05", timestampStr)
|
||
if err != nil {
|
||
log.Printf("解析时间字符串失败: %v, 原始字符串: %s", err, timestampStr)
|
||
} else {
|
||
data.Timestamp = timestamp
|
||
}
|
||
|
||
result = append(result, data)
|
||
}
|
||
|
||
log.Printf("查询结果: 找到 %d 条记录", len(result))
|
||
return result, nil
|
||
}
|
||
|
||
func GetDB() *sql.DB {
|
||
return db
|
||
}
|