package model import ( "database/sql" "fmt" "net" "time" "weatherstation/config" _ "github.com/lib/pq" ) var db *sql.DB // DBDevice 数据库中的设备信息 type DBDevice struct { StationID string IP string LastUpdate time.Time DeviceType DeviceType } func InitDB() error { cfg := config.GetConfig() connStr := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=%s", cfg.Database.Host, cfg.Database.Port, cfg.Database.User, cfg.Database.Password, cfg.Database.DBName, cfg.Database.SSLMode) var err error db, err = sql.Open("postgres", connStr) if err != nil { return fmt.Errorf("无法连接到数据库: %v", err) } err = db.Ping() if err != nil { return fmt.Errorf("数据库连接测试失败: %v", err) } return nil } func CloseDB() { if db != nil { db.Close() } } func ensureStationExists(stationID, password string, deviceType DeviceType) error { if db == nil { return fmt.Errorf("数据库未初始化") } var count int err := db.QueryRow("SELECT COUNT(*) FROM stations WHERE station_id = $1", stationID).Scan(&count) if err != nil { return fmt.Errorf("查询站点失败: %v", err) } if count == 0 { _, err = db.Exec("INSERT INTO stations (station_id, password, device_type) VALUES ($1, $2, $3)", stationID, password, deviceType) if err != nil { return fmt.Errorf("添加站点失败: %v", err) } } else { _, err = db.Exec("UPDATE stations SET password = $1, last_update = $2, device_type = $3 WHERE station_id = $4", password, time.Now(), deviceType, stationID) if err != nil { return fmt.Errorf("更新站点失败: %v", err) } } return nil } // RegisterDeviceInDB 在数据库中注册设备 func RegisterDeviceInDB(stationID string, addr net.Addr) error { if db == nil { return fmt.Errorf("数据库未初始化") } ipStr := addr.String() if ipStr == "" { return fmt.Errorf("无效的IP地址") } ip, _, err := net.SplitHostPort(ipStr) if err != nil { return fmt.Errorf("解析IP地址失败: %v", err) } _, err = db.Exec(` INSERT INTO device_ips (station_id, ip_address, last_update) VALUES ($1, $2, $3) ON CONFLICT (station_id) DO UPDATE SET ip_address = $2, last_update = $3 `, stationID, ip, time.Now()) if err != nil { return fmt.Errorf("注册设备IP失败: %v", err) } return nil } // GetOnlineDevicesFromDB 从数据库获取在线设备 func GetOnlineDevicesFromDB() []DBDevice { if db == nil { return nil } rows, err := db.Query(` SELECT d.station_id, d.ip_address, d.last_update, s.device_type FROM device_ips d JOIN stations s ON d.station_id = s.station_id WHERE d.last_update > $1 ORDER BY d.last_update DESC `, time.Now().Add(-24*time.Hour)) if err != nil { return nil } defer rows.Close() var devices []DBDevice for rows.Next() { var d DBDevice err := rows.Scan(&d.StationID, &d.IP, &d.LastUpdate, &d.DeviceType) if err != nil { continue } devices = append(devices, d) } return devices } func SaveWeatherData(data *WeatherData, rawData string) error { if db == nil { return fmt.Errorf("数据库未初始化") } err := ensureStationExists(data.StationID, data.Password, DeviceTypeEcowitt) if err != nil { return err } cst := time.FixedZone("CST", 8*60*60) timestamp := time.Now().In(cst) _, err = db.Exec(` INSERT INTO weather_data ( station_id, timestamp, temp_f, humidity, dewpoint_f, windchill_f, wind_dir, wind_speed_mph, wind_gust_mph, rain_in, daily_rain_in, weekly_rain_in, monthly_rain_in, yearly_rain_in, total_rain_in, solar_radiation, uv, indoor_temp_f, indoor_humidity, abs_barometer_in, barometer_in, low_battery, raw_data ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23)`, data.StationID, timestamp, int(data.TempF*10), data.Humidity, int(data.DewpointF*10), int(data.WindchillF*10), data.WindDir, int(data.WindSpeedMph*100), int(data.WindGustMph*100), int(data.RainIn*1000), int(data.DailyRainIn*1000), int(data.WeeklyRainIn*1000), int(data.MonthlyRainIn*1000), int(data.YearlyRainIn*1000), int(data.TotalRainIn*1000), int(data.SolarRadiation*100), data.UV, int(data.IndoorTempF*10), data.IndoorHumidity, int(data.AbsBarometerIn*1000), int(data.BarometerIn*1000), data.LowBattery, rawData) if err != nil { return fmt.Errorf("保存气象数据失败: %v", err) } return nil } func SaveWH65LPData(data *WH65LPData, rawData []byte) error { if db == nil { return fmt.Errorf("数据库未初始化") } // 确保设备存在,WH65LP设备没有密码 err := ensureStationExists(data.StationID, "", DeviceTypeWH65LP) if err != nil { return err } // 插入数据 _, err = db.Exec(` INSERT INTO wh65lp_data ( station_id, timestamp, temperature, humidity, wind_direction, wind_speed, wind_gust, rain, uv_index, light, pressure, low_battery, wsp_flag, raw_data ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)`, data.StationID, data.Timestamp, int(data.Temperature*10), // 温度保存为整数,精确到0.1 data.Humidity, data.WindDirection, int(data.WindSpeed*100), // 风速保存为整数,精确到0.01 int(data.WindGust*100), // 阵风保存为整数,精确到0.01 int(data.Rain*1000), // 降雨量保存为整数,精确到0.001 data.UV, int(data.Light*10), // 光照保存为整数,精确到0.1 int(data.Pressure*100), // 气压保存为整数,精确到0.01 data.LowBattery, data.WSPFlag, rawData) if err != nil { return fmt.Errorf("保存WH65LP数据失败: %v", err) } return nil }