215 lines
5.5 KiB
Go
215 lines
5.5 KiB
Go
package model
|
||
|
||
import (
|
||
"database/sql"
|
||
"fmt"
|
||
"net"
|
||
"time"
|
||
|
||
"weatherstation/config"
|
||
|
||
_ "github.com/lib/pq"
|
||
)
|
||
|
||
var db *sql.DB
|
||
|
||
// DBDevice 数据库中的设备信息
|
||
type DBDevice struct {
|
||
StationID string
|
||
IP string
|
||
LastUpdate time.Time
|
||
DeviceType DeviceType
|
||
}
|
||
|
||
func InitDB() error {
|
||
cfg := config.GetConfig()
|
||
|
||
connStr := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=%s",
|
||
cfg.Database.Host, cfg.Database.Port, cfg.Database.User,
|
||
cfg.Database.Password, cfg.Database.DBName, cfg.Database.SSLMode)
|
||
|
||
var err error
|
||
db, err = sql.Open("postgres", connStr)
|
||
if err != nil {
|
||
return fmt.Errorf("无法连接到数据库: %v", err)
|
||
}
|
||
|
||
err = db.Ping()
|
||
if err != nil {
|
||
return fmt.Errorf("数据库连接测试失败: %v", err)
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
func CloseDB() {
|
||
if db != nil {
|
||
db.Close()
|
||
}
|
||
}
|
||
|
||
func ensureStationExists(stationID, password string, deviceType DeviceType) error {
|
||
if db == nil {
|
||
return fmt.Errorf("数据库未初始化")
|
||
}
|
||
|
||
var count int
|
||
err := db.QueryRow("SELECT COUNT(*) FROM stations WHERE station_id = $1", stationID).Scan(&count)
|
||
if err != nil {
|
||
return fmt.Errorf("查询站点失败: %v", err)
|
||
}
|
||
|
||
if count == 0 {
|
||
_, err = db.Exec("INSERT INTO stations (station_id, password, device_type) VALUES ($1, $2, $3)",
|
||
stationID, password, deviceType)
|
||
if err != nil {
|
||
return fmt.Errorf("添加站点失败: %v", err)
|
||
}
|
||
} else {
|
||
_, err = db.Exec("UPDATE stations SET password = $1, last_update = $2, device_type = $3 WHERE station_id = $4",
|
||
password, time.Now(), deviceType, stationID)
|
||
if err != nil {
|
||
return fmt.Errorf("更新站点失败: %v", err)
|
||
}
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// RegisterDeviceInDB 在数据库中注册设备
|
||
func RegisterDeviceInDB(stationID string, addr net.Addr) error {
|
||
if db == nil {
|
||
return fmt.Errorf("数据库未初始化")
|
||
}
|
||
|
||
ipStr := addr.String()
|
||
if ipStr == "" {
|
||
return fmt.Errorf("无效的IP地址")
|
||
}
|
||
|
||
ip, _, err := net.SplitHostPort(ipStr)
|
||
if err != nil {
|
||
return fmt.Errorf("解析IP地址失败: %v", err)
|
||
}
|
||
|
||
_, err = db.Exec(`
|
||
INSERT INTO device_ips (station_id, ip_address, last_update)
|
||
VALUES ($1, $2, $3)
|
||
ON CONFLICT (station_id)
|
||
DO UPDATE SET ip_address = $2, last_update = $3
|
||
`, stationID, ip, time.Now())
|
||
|
||
if err != nil {
|
||
return fmt.Errorf("注册设备IP失败: %v", err)
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// GetOnlineDevicesFromDB 从数据库获取在线设备
|
||
func GetOnlineDevicesFromDB() []DBDevice {
|
||
if db == nil {
|
||
return nil
|
||
}
|
||
|
||
rows, err := db.Query(`
|
||
SELECT d.station_id, d.ip_address, d.last_update, s.device_type
|
||
FROM device_ips d
|
||
JOIN stations s ON d.station_id = s.station_id
|
||
WHERE d.last_update > $1
|
||
ORDER BY d.last_update DESC
|
||
`, time.Now().Add(-24*time.Hour))
|
||
|
||
if err != nil {
|
||
return nil
|
||
}
|
||
defer rows.Close()
|
||
|
||
var devices []DBDevice
|
||
for rows.Next() {
|
||
var d DBDevice
|
||
err := rows.Scan(&d.StationID, &d.IP, &d.LastUpdate, &d.DeviceType)
|
||
if err != nil {
|
||
continue
|
||
}
|
||
devices = append(devices, d)
|
||
}
|
||
|
||
return devices
|
||
}
|
||
|
||
func SaveWeatherData(data *WeatherData, rawData string) error {
|
||
if db == nil {
|
||
return fmt.Errorf("数据库未初始化")
|
||
}
|
||
|
||
err := ensureStationExists(data.StationID, data.Password, DeviceTypeEcowitt)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
cst := time.FixedZone("CST", 8*60*60)
|
||
timestamp := time.Now().In(cst)
|
||
|
||
_, err = db.Exec(`
|
||
INSERT INTO weather_data (
|
||
station_id, timestamp, temp_f, humidity, dewpoint_f, windchill_f,
|
||
wind_dir, wind_speed_mph, wind_gust_mph, rain_in, daily_rain_in,
|
||
weekly_rain_in, monthly_rain_in, yearly_rain_in, total_rain_in,
|
||
solar_radiation, uv, indoor_temp_f, indoor_humidity,
|
||
abs_barometer_in, barometer_in, low_battery, raw_data
|
||
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23)`,
|
||
data.StationID, timestamp,
|
||
int(data.TempF*10), data.Humidity, int(data.DewpointF*10), int(data.WindchillF*10),
|
||
data.WindDir, int(data.WindSpeedMph*100), int(data.WindGustMph*100),
|
||
int(data.RainIn*1000), int(data.DailyRainIn*1000), int(data.WeeklyRainIn*1000),
|
||
int(data.MonthlyRainIn*1000), int(data.YearlyRainIn*1000), int(data.TotalRainIn*1000),
|
||
int(data.SolarRadiation*100), data.UV, int(data.IndoorTempF*10), data.IndoorHumidity,
|
||
int(data.AbsBarometerIn*1000), int(data.BarometerIn*1000), data.LowBattery, rawData)
|
||
|
||
if err != nil {
|
||
return fmt.Errorf("保存气象数据失败: %v", err)
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
func SaveWH65LPData(data *WH65LPData, rawData []byte) error {
|
||
if db == nil {
|
||
return fmt.Errorf("数据库未初始化")
|
||
}
|
||
|
||
// 确保设备存在,WH65LP设备没有密码
|
||
err := ensureStationExists(data.StationID, "", DeviceTypeWH65LP)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
// 插入数据
|
||
_, err = db.Exec(`
|
||
INSERT INTO wh65lp_data (
|
||
station_id, timestamp, temperature, humidity, wind_direction,
|
||
wind_speed, wind_gust, rain, uv_index, light, pressure,
|
||
low_battery, wsp_flag, raw_data
|
||
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)`,
|
||
data.StationID, data.Timestamp,
|
||
int(data.Temperature*10), // 温度保存为整数,精确到0.1
|
||
data.Humidity,
|
||
data.WindDirection,
|
||
int(data.WindSpeed*100), // 风速保存为整数,精确到0.01
|
||
int(data.WindGust*100), // 阵风保存为整数,精确到0.01
|
||
int(data.Rain*1000), // 降雨量保存为整数,精确到0.001
|
||
data.UV,
|
||
int(data.Light*10), // 光照保存为整数,精确到0.1
|
||
int(data.Pressure*100), // 气压保存为整数,精确到0.01
|
||
data.LowBattery,
|
||
data.WSPFlag,
|
||
rawData)
|
||
|
||
if err != nil {
|
||
return fmt.Errorf("保存WH65LP数据失败: %v", err)
|
||
}
|
||
|
||
return nil
|
||
}
|