diff --git a/cmd/weatherstation/main.go b/cmd/weatherstation/main.go new file mode 100644 index 0000000..371d2d2 --- /dev/null +++ b/cmd/weatherstation/main.go @@ -0,0 +1,64 @@ +package main + +import ( + "flag" + "log" + "sync" + "weatherstation/internal/database" + "weatherstation/internal/server" +) + +func main() { + // 命令行参数 + var webOnly = flag.Bool("web", false, "只启动Web服务器(Gin)") + var udpOnly = flag.Bool("udp", false, "只启动UDP服务器") + flag.Parse() + + // 设置日志 + server.SetupLogger() + + // 初始化数据库连接 + _ = database.GetDB() // 确保数据库连接已初始化 + defer database.Close() + + // 根据命令行参数启动服务 + if *webOnly { + // 只启动Web服务器 + log.Println("启动Web服务器模式...") + if err := server.StartGinServer(); err != nil { + log.Fatalf("启动Web服务器失败: %v", err) + } + } else if *udpOnly { + // 只启动UDP服务器 + log.Println("启动UDP服务器模式...") + if err := server.StartUDPServer(); err != nil { + log.Fatalf("启动UDP服务器失败: %v", err) + } + } else { + // 同时启动UDP和Web服务器 + log.Println("启动完整模式:UDP + Web服务器...") + + var wg sync.WaitGroup + wg.Add(2) + + // 启动UDP服务器 + go func() { + defer wg.Done() + log.Println("正在启动UDP服务器...") + if err := server.StartUDPServer(); err != nil { + log.Printf("UDP服务器异常退出: %v", err) + } + }() + + // 启动Web服务器 + go func() { + defer wg.Done() + log.Println("正在启动Web服务器...") + if err := server.StartGinServer(); err != nil { + log.Printf("Web服务器异常退出: %v", err) + } + }() + + wg.Wait() + } +} diff --git a/gin_server.go b/gin_server.go deleted file mode 100644 index 6495565..0000000 --- a/gin_server.go +++ /dev/null @@ -1,356 +0,0 @@ -package main - -import ( - "database/sql" - "fmt" - "net/http" - "strconv" - "time" - - "weatherstation/config" - - "github.com/gin-gonic/gin" - _ "github.com/lib/pq" -) - -var ginDB *sql.DB - -func initGinDB() error { - cfg := config.GetConfig() - - connStr := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=%s", - cfg.Database.Host, cfg.Database.Port, cfg.Database.User, - cfg.Database.Password, cfg.Database.DBName, cfg.Database.SSLMode) - - var err error - ginDB, err = sql.Open("postgres", connStr) - if err != nil { - return fmt.Errorf("无法连接到数据库: %v", err) - } - - err = ginDB.Ping() - if err != nil { - return fmt.Errorf("数据库连接测试失败: %v", err) - } - - return nil -} - -// 获取在线设备数量 -func getOnlineDevicesCount() int { - if ginDB == nil { - return 0 - } - - query := ` - SELECT COUNT(DISTINCT station_id) - FROM rs485_weather_data - WHERE timestamp > NOW() - INTERVAL '5 minutes'` - - var count int - err := ginDB.QueryRow(query).Scan(&count) - if err != nil { - return 0 - } - return count -} - -// 主页面处理器 -func indexHandler(c *gin.Context) { - data := PageData{ - Title: "英卓气象站", - ServerTime: time.Now().Format("2006-01-02 15:04:05"), - OnlineDevices: getOnlineDevicesCount(), - TiandituKey: "0c260b8a094a4e0bc507808812cefdac", - } - c.HTML(http.StatusOK, "index.html", data) -} - -// 系统状态API -func systemStatusHandler(c *gin.Context) { - status := SystemStatus{ - OnlineDevices: getOnlineDevicesCount(), - ServerTime: time.Now().Format("2006-01-02 15:04:05"), - } - c.JSON(http.StatusOK, status) -} - -// 获取WH65LP站点列表API -func getStationsHandler(c *gin.Context) { - query := ` - SELECT DISTINCT s.station_id, - COALESCE(s.password, '') as station_name, - 'WH65LP' as device_type, - COALESCE(MAX(r.timestamp), '1970-01-01'::timestamp) as last_update, - COALESCE(s.latitude, 0) as latitude, - COALESCE(s.longitude, 0) as longitude, - COALESCE(s.name, '') as name, - COALESCE(s.location, '') as location - FROM stations s - LEFT JOIN rs485_weather_data r ON s.station_id = r.station_id - WHERE s.station_id LIKE 'RS485-%' - GROUP BY s.station_id, s.password, s.latitude, s.longitude, s.name, s.location - ORDER BY s.station_id` - - rows, err := ginDB.Query(query) - if err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": "查询站点失败"}) - return - } - defer rows.Close() - - var stations []Station - for rows.Next() { - var station Station - var lastUpdate time.Time - err := rows.Scan( - &station.StationID, - &station.StationName, - &station.DeviceType, - &lastUpdate, - &station.Latitude, - &station.Longitude, - &station.Name, - &station.Location, - ) - if err != nil { - continue - } - station.LastUpdate = lastUpdate.Format("2006-01-02 15:04:05") - - // 从station_id中提取十六进制ID并转换为十进制 - if len(station.StationID) > 6 { - hexID := station.StationID[len(station.StationID)-6:] - if decimalID, err := strconv.ParseInt(hexID, 16, 64); err == nil { - station.DecimalID = strconv.FormatInt(decimalID, 10) - } - } - - stations = append(stations, station) - } - - c.JSON(http.StatusOK, stations) -} - -// 获取历史数据API -func getDataHandler(c *gin.Context) { - // 获取查询参数 - decimalID := c.Query("decimal_id") - startTime := c.Query("start_time") - endTime := c.Query("end_time") - interval := c.Query("interval") - - // 将十进制ID转换为十六进制(补足6位) - decimalNum, err := strconv.ParseInt(decimalID, 10, 64) - if err != nil { - c.JSON(http.StatusBadRequest, gin.H{"error": "无效的站点编号"}) - return - } - hexID := fmt.Sprintf("%06X", decimalNum) - stationID := fmt.Sprintf("RS485-%s", hexID) - - // 构建查询SQL(统一风向矢量平均,雨量为累计量的正增量求和) - var query string - switch interval { - case "10min": - query = ` - WITH base AS ( - SELECT - date_trunc('hour', timestamp) + (floor(date_part('minute', timestamp) / 10) * interval '10 minute') as time_group, - timestamp as ts, - temperature, humidity, pressure, wind_speed, wind_direction, rainfall, light, uv - FROM rs485_weather_data - WHERE station_id = $1 AND timestamp BETWEEN $2 AND $3 - ), - rain_inc AS ( - SELECT time_group, GREATEST(rainfall - LAG(rainfall) OVER (PARTITION BY time_group ORDER BY ts), 0) AS inc - FROM base - ), - rain_sum AS ( - SELECT time_group, SUM(inc) AS rainfall - FROM rain_inc - GROUP BY time_group - ), - grouped_data AS ( - SELECT - time_group, - AVG(temperature) as temperature, - AVG(humidity) as humidity, - AVG(pressure) as pressure, - AVG(wind_speed) as wind_speed, - DEGREES(ATAN2(AVG(SIN(RADIANS(wind_direction))), AVG(COS(RADIANS(wind_direction))))) AS wind_direction_raw, - AVG(light) as light, - AVG(uv) as uv - FROM base - GROUP BY time_group - ) - SELECT - to_char(g.time_group, 'YYYY-MM-DD HH24:MI:SS') as date_time, - ROUND(g.temperature::numeric, 2) as temperature, - ROUND(g.humidity::numeric, 2) as humidity, - ROUND(g.pressure::numeric, 2) as pressure, - ROUND(g.wind_speed::numeric, 2) as wind_speed, - ROUND((CASE WHEN g.wind_direction_raw < 0 THEN g.wind_direction_raw + 360 ELSE g.wind_direction_raw END)::numeric, 2) AS wind_direction, - ROUND(COALESCE(r.rainfall, 0)::numeric, 3) as rainfall, - ROUND(g.light::numeric, 2) as light, - ROUND(g.uv::numeric, 2) as uv - FROM grouped_data g - LEFT JOIN rain_sum r ON r.time_group = g.time_group - ORDER BY g.time_group` - case "30min": - query = ` - WITH base AS ( - SELECT - date_trunc('hour', timestamp) + (floor(date_part('minute', timestamp) / 30) * interval '30 minute') as time_group, - timestamp as ts, - temperature, humidity, pressure, wind_speed, wind_direction, rainfall, light, uv - FROM rs485_weather_data - WHERE station_id = $1 AND timestamp BETWEEN $2 AND $3 - ), - rain_inc AS ( - SELECT time_group, GREATEST(rainfall - LAG(rainfall) OVER (PARTITION BY time_group ORDER BY ts), 0) AS inc - FROM base - ), - rain_sum AS ( - SELECT time_group, SUM(inc) AS rainfall - FROM rain_inc - GROUP BY time_group - ), - grouped_data AS ( - SELECT - time_group, - AVG(temperature) as temperature, - AVG(humidity) as humidity, - AVG(pressure) as pressure, - AVG(wind_speed) as wind_speed, - DEGREES(ATAN2(AVG(SIN(RADIANS(wind_direction))), AVG(COS(RADIANS(wind_direction))))) AS wind_direction_raw, - AVG(light) as light, - AVG(uv) as uv - FROM base - GROUP BY time_group - ) - SELECT - to_char(g.time_group, 'YYYY-MM-DD HH24:MI:SS') as date_time, - ROUND(g.temperature::numeric, 2) as temperature, - ROUND(g.humidity::numeric, 2) as humidity, - ROUND(g.pressure::numeric, 2) as pressure, - ROUND(g.wind_speed::numeric, 2) as wind_speed, - ROUND((CASE WHEN g.wind_direction_raw < 0 THEN g.wind_direction_raw + 360 ELSE g.wind_direction_raw END)::numeric, 2) AS wind_direction, - ROUND(COALESCE(r.rainfall, 0)::numeric, 3) as rainfall, - ROUND(g.light::numeric, 2) as light, - ROUND(g.uv::numeric, 2) as uv - FROM grouped_data g - LEFT JOIN rain_sum r ON r.time_group = g.time_group - ORDER BY g.time_group` - default: // 1hour - query = ` - WITH base AS ( - SELECT - date_trunc('hour', timestamp) as time_group, - timestamp as ts, - temperature, humidity, pressure, wind_speed, wind_direction, rainfall, light, uv - FROM rs485_weather_data - WHERE station_id = $1 AND timestamp BETWEEN $2 AND $3 - ), - rain_inc AS ( - SELECT time_group, GREATEST(rainfall - LAG(rainfall) OVER (PARTITION BY time_group ORDER BY ts), 0) AS inc - FROM base - ), - rain_sum AS ( - SELECT time_group, SUM(inc) AS rainfall - FROM rain_inc - GROUP BY time_group - ), - grouped_data AS ( - SELECT - time_group, - AVG(temperature) as temperature, - AVG(humidity) as humidity, - AVG(pressure) as pressure, - AVG(wind_speed) as wind_speed, - DEGREES(ATAN2(AVG(SIN(RADIANS(wind_direction))), AVG(COS(RADIANS(wind_direction))))) AS wind_direction_raw, - AVG(light) as light, - AVG(uv) as uv - FROM base - GROUP BY time_group - ) - SELECT - to_char(g.time_group, 'YYYY-MM-DD HH24:MI:SS') as date_time, - ROUND(g.temperature::numeric, 2) as temperature, - ROUND(g.humidity::numeric, 2) as humidity, - ROUND(g.pressure::numeric, 2) as pressure, - ROUND(g.wind_speed::numeric, 2) as wind_speed, - ROUND((CASE WHEN g.wind_direction_raw < 0 THEN g.wind_direction_raw + 360 ELSE g.wind_direction_raw END)::numeric, 2) AS wind_direction, - ROUND(COALESCE(r.rainfall, 0)::numeric, 3) as rainfall, - ROUND(g.light::numeric, 2) as light, - ROUND(g.uv::numeric, 2) as uv - FROM grouped_data g - LEFT JOIN rain_sum r ON r.time_group = g.time_group - ORDER BY g.time_group` - } - - // 执行查询 - rows, err := ginDB.Query(query, stationID, startTime, endTime) - if err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": "查询数据失败"}) - return - } - defer rows.Close() - - var weatherPoints []WeatherPoint - for rows.Next() { - var point WeatherPoint - err := rows.Scan( - &point.DateTime, - &point.Temperature, - &point.Humidity, - &point.Pressure, - &point.WindSpeed, - &point.WindDir, - &point.Rainfall, - &point.Light, - &point.UV, - ) - if err != nil { - continue - } - weatherPoints = append(weatherPoints, point) - } - - c.JSON(http.StatusOK, weatherPoints) -} - -func StartGinServer() { - err := initGinDB() - if err != nil { - fmt.Printf("初始化Gin数据库连接失败: %v\n", err) - return - } - - // 设置Gin模式 - gin.SetMode(gin.ReleaseMode) - - // 创建Gin引擎 - r := gin.Default() - - // 加载HTML模板 - r.LoadHTMLGlob("templates/*") - - // 静态文件服务 - r.Static("/static", "./static") - - // 路由设置 - r.GET("/", indexHandler) - - // API路由组 - api := r.Group("/api") - { - api.GET("/system/status", systemStatusHandler) - api.GET("/stations", getStationsHandler) - api.GET("/data", getDataHandler) - } - - // 启动服务器 - fmt.Println("Gin Web服务器启动,监听端口 10003...") - r.Run(":10003") -} diff --git a/go.mod b/go.mod index 76bfe67..31efa88 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.23.0 toolchain go1.24.5 require ( + github.com/gin-gonic/gin v1.10.1 github.com/lib/pq v1.10.9 gopkg.in/yaml.v3 v3.0.1 ) @@ -16,7 +17,6 @@ require ( github.com/cloudwego/iasm v0.2.0 // indirect github.com/gabriel-vasile/mimetype v1.4.3 // indirect github.com/gin-contrib/sse v0.1.0 // indirect - github.com/gin-gonic/gin v1.10.1 // indirect github.com/go-playground/locales v0.14.1 // indirect github.com/go-playground/universal-translator v0.18.1 // indirect github.com/go-playground/validator/v10 v10.20.0 // indirect @@ -36,4 +36,4 @@ require ( golang.org/x/sys v0.20.0 // indirect golang.org/x/text v0.15.0 // indirect google.golang.org/protobuf v1.34.1 // indirect -) +) \ No newline at end of file diff --git a/internal/config/config.go b/internal/config/config.go new file mode 100644 index 0000000..378cc03 --- /dev/null +++ b/internal/config/config.go @@ -0,0 +1,87 @@ +package config + +import ( + "fmt" + "os" + "path/filepath" + "sync" + + "gopkg.in/yaml.v3" +) + +type ServerConfig struct { + WebPort int `yaml:"web_port"` // Gin Web服务器端口 + UDPPort int `yaml:"udp_port"` // UDP服务器端口 +} + +type DatabaseConfig struct { + Host string `yaml:"host"` + Port int `yaml:"port"` + User string `yaml:"user"` + Password string `yaml:"password"` + DBName string `yaml:"dbname"` + SSLMode string `yaml:"sslmode"` +} + +type Config struct { + Server ServerConfig `yaml:"server"` + Database DatabaseConfig `yaml:"database"` +} + +var ( + instance *Config + once sync.Once +) + +// GetConfig 返回配置单例 +func GetConfig() *Config { + once.Do(func() { + instance = &Config{} + if err := instance.loadConfig(); err != nil { + panic(fmt.Sprintf("加载配置文件失败: %v", err)) + } + }) + return instance +} + +// loadConfig 从配置文件加载配置 +func (c *Config) loadConfig() error { + // 尝试多个位置查找配置文件 + configPaths := []string{ + "config.yaml", // 当前目录 + "../config.yaml", // 上级目录 + "../../config.yaml", // 项目根目录 + filepath.Join(os.Getenv("HOME"), ".weatherstation/config.yaml"), // 用户目录 + } + + var data []byte + var err error + for _, path := range configPaths { + if data, err = os.ReadFile(path); err == nil { + break + } + } + if err != nil { + return fmt.Errorf("未找到配置文件: %v", err) + } + + if err := yaml.Unmarshal(data, c); err != nil { + return fmt.Errorf("解析配置文件失败: %v", err) + } + + return c.validate() +} + +// validate 验证配置有效性 +func (c *Config) validate() error { + if c.Server.WebPort <= 0 { + c.Server.WebPort = 10003 // 默认Web端口 + } + if c.Server.UDPPort <= 0 { + c.Server.UDPPort = 10001 // 默认UDP端口 + } + if c.Database.SSLMode == "" { + c.Database.SSLMode = "disable" // 默认禁用SSL + } + return nil +} diff --git a/internal/database/db.go b/internal/database/db.go new file mode 100644 index 0000000..d735ea9 --- /dev/null +++ b/internal/database/db.go @@ -0,0 +1,50 @@ +package database + +import ( + "database/sql" + "fmt" + "sync" + "weatherstation/internal/config" + + _ "github.com/lib/pq" +) + +var ( + instance *sql.DB + once sync.Once +) + +// GetDB 返回数据库连接单例 +func GetDB() *sql.DB { + once.Do(func() { + cfg := config.GetConfig() + connStr := fmt.Sprintf( + "host=%s port=%d user=%s password=%s dbname=%s sslmode=%s", + cfg.Database.Host, + cfg.Database.Port, + cfg.Database.User, + cfg.Database.Password, + cfg.Database.DBName, + cfg.Database.SSLMode, + ) + + var err error + instance, err = sql.Open("postgres", connStr) + if err != nil { + panic(fmt.Sprintf("无法连接到数据库: %v", err)) + } + + if err = instance.Ping(); err != nil { + panic(fmt.Sprintf("数据库连接测试失败: %v", err)) + } + }) + return instance +} + +// Close 关闭数据库连接 +func Close() error { + if instance != nil { + return instance.Close() + } + return nil +} diff --git a/internal/database/models.go b/internal/database/models.go new file mode 100644 index 0000000..db013fa --- /dev/null +++ b/internal/database/models.go @@ -0,0 +1,161 @@ +package database + +import ( + "database/sql" + "time" + "weatherstation/pkg/types" +) + +// GetOnlineDevicesCount 获取在线设备数量 +func GetOnlineDevicesCount(db *sql.DB) int { + query := ` + SELECT COUNT(DISTINCT station_id) + FROM rs485_weather_data + WHERE timestamp > NOW() - INTERVAL '5 minutes'` + + var count int + if err := db.QueryRow(query).Scan(&count); err != nil { + return 0 + } + return count +} + +// GetStations 获取所有WH65LP站点列表 +func GetStations(db *sql.DB) ([]types.Station, error) { + query := ` + SELECT DISTINCT s.station_id, + COALESCE(s.password, '') as station_name, + 'WH65LP' as device_type, + COALESCE(MAX(r.timestamp), '1970-01-01'::timestamp) as last_update, + COALESCE(s.latitude, 0) as latitude, + COALESCE(s.longitude, 0) as longitude, + COALESCE(s.name, '') as name, + COALESCE(s.location, '') as location + FROM stations s + LEFT JOIN rs485_weather_data r ON s.station_id = r.station_id + WHERE s.station_id LIKE 'RS485-%' + GROUP BY s.station_id, s.password, s.latitude, s.longitude, s.name, s.location + ORDER BY s.station_id` + + rows, err := db.Query(query) + if err != nil { + return nil, err + } + defer rows.Close() + + var stations []types.Station + for rows.Next() { + var station types.Station + var lastUpdate time.Time + err := rows.Scan( + &station.StationID, + &station.StationName, + &station.DeviceType, + &lastUpdate, + &station.Latitude, + &station.Longitude, + &station.Name, + &station.Location, + ) + if err != nil { + continue + } + station.LastUpdate = lastUpdate.Format("2006-01-02 15:04:05") + stations = append(stations, station) + } + + return stations, nil +} + +// GetWeatherData 获取指定站点的历史天气数据 +func GetWeatherData(db *sql.DB, stationID string, startTime, endTime time.Time, interval string) ([]types.WeatherPoint, error) { + // 构建查询SQL(统一风向矢量平均,雨量为累计量的正增量求和) + var query string + var intervalStr string + switch interval { + case "10min": + intervalStr = "10 minutes" + case "30min": + intervalStr = "30 minutes" + default: // 1hour + intervalStr = "1 hour" + } + query = buildWeatherDataQuery(intervalStr) + + rows, err := db.Query(query, intervalStr, stationID, startTime, endTime) + if err != nil { + return nil, err + } + defer rows.Close() + + var points []types.WeatherPoint + for rows.Next() { + var point types.WeatherPoint + err := rows.Scan( + &point.DateTime, + &point.Temperature, + &point.Humidity, + &point.Pressure, + &point.WindSpeed, + &point.WindDir, + &point.Rainfall, + &point.Light, + &point.UV, + ) + if err != nil { + continue + } + points = append(points, point) + } + + return points, nil +} + +// buildWeatherDataQuery 构建天气数据查询SQL +func buildWeatherDataQuery(interval string) string { + return ` + WITH base AS ( + SELECT + date_trunc('hour', timestamp) + + (floor(date_part('minute', timestamp) / extract(epoch from $1::interval) * 60) * $1::interval) as time_group, + timestamp as ts, + temperature, humidity, pressure, wind_speed, wind_direction, rainfall, light, uv + FROM rs485_weather_data + WHERE station_id = $2 AND timestamp BETWEEN $3 AND $4 + ), + rain_inc AS ( + SELECT time_group, GREATEST(rainfall - LAG(rainfall) OVER (PARTITION BY time_group ORDER BY ts), 0) AS inc + FROM base + ), + rain_sum AS ( + SELECT time_group, SUM(inc) AS rainfall + FROM rain_inc + GROUP BY time_group + ), + grouped_data AS ( + SELECT + time_group, + AVG(temperature) as temperature, + AVG(humidity) as humidity, + AVG(pressure) as pressure, + AVG(wind_speed) as wind_speed, + DEGREES(ATAN2(AVG(SIN(RADIANS(wind_direction))), AVG(COS(RADIANS(wind_direction))))) AS wind_direction_raw, + AVG(light) as light, + AVG(uv) as uv + FROM base + GROUP BY time_group + ) + SELECT + to_char(g.time_group, 'YYYY-MM-DD HH24:MI:SS') as date_time, + ROUND(g.temperature::numeric, 2) as temperature, + ROUND(g.humidity::numeric, 2) as humidity, + ROUND(g.pressure::numeric, 2) as pressure, + ROUND(g.wind_speed::numeric, 2) as wind_speed, + ROUND((CASE WHEN g.wind_direction_raw < 0 THEN g.wind_direction_raw + 360 ELSE g.wind_direction_raw END)::numeric, 2) AS wind_direction, + ROUND(COALESCE(r.rainfall, 0)::numeric, 3) as rainfall, + ROUND(g.light::numeric, 2) as light, + ROUND(g.uv::numeric, 2) as uv + FROM grouped_data g + LEFT JOIN rain_sum r ON r.time_group = g.time_group + ORDER BY g.time_group` +} diff --git a/internal/server/gin.go b/internal/server/gin.go new file mode 100644 index 0000000..8a1ffa0 --- /dev/null +++ b/internal/server/gin.go @@ -0,0 +1,130 @@ +package server + +import ( + "fmt" + "net/http" + "strconv" + "time" + "weatherstation/internal/config" + "weatherstation/internal/database" + "weatherstation/pkg/types" + + "github.com/gin-gonic/gin" +) + +// StartGinServer 启动Gin Web服务器 +func StartGinServer() error { + // 设置Gin模式 + gin.SetMode(gin.ReleaseMode) + + // 创建Gin引擎 + r := gin.Default() + + // 加载HTML模板 + r.LoadHTMLGlob("templates/*") + + // 静态文件服务 + r.Static("/static", "./static") + + // 路由设置 + r.GET("/", indexHandler) + + // API路由组 + api := r.Group("/api") + { + api.GET("/system/status", systemStatusHandler) + api.GET("/stations", getStationsHandler) + api.GET("/data", getDataHandler) + } + + // 获取配置的Web端口 + port := config.GetConfig().Server.WebPort + if port == 0 { + port = 10003 // 默认端口 + } + + // 启动服务器 + fmt.Printf("Gin Web服务器启动,监听端口 %d...\n", port) + return r.Run(fmt.Sprintf(":%d", port)) +} + +// indexHandler 处理主页请求 +func indexHandler(c *gin.Context) { + data := types.PageData{ + Title: "英卓气象站", + ServerTime: time.Now().Format("2006-01-02 15:04:05"), + OnlineDevices: database.GetOnlineDevicesCount(database.GetDB()), + TiandituKey: "0c260b8a094a4e0bc507808812cefdac", + } + c.HTML(http.StatusOK, "index.html", data) +} + +// systemStatusHandler 处理系统状态API请求 +func systemStatusHandler(c *gin.Context) { + status := types.SystemStatus{ + OnlineDevices: database.GetOnlineDevicesCount(database.GetDB()), + ServerTime: time.Now().Format("2006-01-02 15:04:05"), + } + c.JSON(http.StatusOK, status) +} + +// getStationsHandler 处理获取站点列表API请求 +func getStationsHandler(c *gin.Context) { + stations, err := database.GetStations(database.GetDB()) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "查询站点失败"}) + return + } + + // 为每个站点计算十进制ID + for i := range stations { + if len(stations[i].StationID) > 6 { + hexID := stations[i].StationID[len(stations[i].StationID)-6:] + if decimalID, err := strconv.ParseInt(hexID, 16, 64); err == nil { + stations[i].DecimalID = strconv.FormatInt(decimalID, 10) + } + } + } + + c.JSON(http.StatusOK, stations) +} + +// getDataHandler 处理获取历史数据API请求 +func getDataHandler(c *gin.Context) { + // 获取查询参数 + decimalID := c.Query("decimal_id") + startTime := c.Query("start_time") + endTime := c.Query("end_time") + interval := c.Query("interval") + + // 将十进制ID转换为十六进制(补足6位) + decimalNum, err := strconv.ParseInt(decimalID, 10, 64) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "无效的站点编号"}) + return + } + hexID := fmt.Sprintf("%06X", decimalNum) + stationID := fmt.Sprintf("RS485-%s", hexID) + + // 解析时间 + start, err := time.Parse("2006-01-02 15:04:05", startTime) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "无效的开始时间"}) + return + } + + end, err := time.Parse("2006-01-02 15:04:05", endTime) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "无效的结束时间"}) + return + } + + // 获取数据 + points, err := database.GetWeatherData(database.GetDB(), stationID, start, end, interval) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "查询数据失败"}) + return + } + + c.JSON(http.StatusOK, points) +} diff --git a/udp_server.go b/internal/server/udp.go similarity index 53% rename from udp_server.go rename to internal/server/udp.go index fd72c2a..41cc302 100644 --- a/udp_server.go +++ b/internal/server/udp.go @@ -1,4 +1,4 @@ -package main +package server import ( "bufio" @@ -13,19 +13,21 @@ import ( "sync" "time" "unicode/utf8" - - "weatherstation/config" + "weatherstation/internal/config" "weatherstation/model" ) +// UTF8Writer 包装一个io.Writer,确保写入的数据是有效的UTF-8 type UTF8Writer struct { w io.Writer } +// NewUTF8Writer 创建一个新的UTF8Writer func NewUTF8Writer(w io.Writer) *UTF8Writer { return &UTF8Writer{w: w} } +// Write 实现io.Writer接口 func (w *UTF8Writer) Write(p []byte) (n int, err error) { if utf8.Valid(p) { return w.w.Write(p) @@ -41,11 +43,13 @@ var ( currentLogDay int ) +// getLogFileName 获取当前日期的日志文件名 func getLogFileName() string { currentTime := time.Now() return filepath.Join("log", fmt.Sprintf("%s.log", currentTime.Format("2006-01-02"))) } +// openLogFile 打开日志文件 func openLogFile() (*os.File, error) { logDir := "log" if _, err := os.Stat(logDir); os.IsNotExist(err) { @@ -56,7 +60,8 @@ func openLogFile() (*os.File, error) { return os.OpenFile(logFileName, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) } -func setupLogger() { +// SetupLogger 设置日志系统 +func SetupLogger() { var err error logFile, err = openLogFile() if err != nil { @@ -99,21 +104,19 @@ func setupLogger() { log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile) } -func startUDP() { +// StartUDPServer 启动UDP服务器 +func StartUDPServer() error { cfg := config.GetConfig() - err := model.InitDB() - if err != nil { - log.Fatalf("初始化数据库失败: %v", err) - } - defer model.CloseDB() addr := fmt.Sprintf(":%d", cfg.Server.UDPPort) conn, err := net.ListenPacket("udp", addr) if err != nil { - log.Fatalf("无法监听UDP端口 %d: %v", cfg.Server.UDPPort, err) + return fmt.Errorf("无法监听UDP端口 %d: %v", cfg.Server.UDPPort, err) } defer conn.Close() + log.Printf("UDP服务器已启动,监听端口 %d...", cfg.Server.UDPPort) buffer := make([]byte, 2048) + for { n, addr, err := conn.ReadFrom(buffer) if err != nil { @@ -129,88 +132,98 @@ func startUDP() { log.Printf("ASCII码:\n%s", asciiDump) if len(rawData) == 25 && rawData[0] == 0x24 { - log.Println("485 型气象站数据") - - // 生成源码字符串(用于日志记录) - sourceHex := strings.ReplaceAll(strings.TrimSpace(hexDump), "\n", " ") - log.Printf("源码: %s", sourceHex) - - // 解析RS485数据 - protocol := model.NewProtocol(rawData) - rs485Protocol := model.NewRS485Protocol(rawData) - - // 获取设备ID - idParts, err := protocol.GetCompleteID() - if err != nil { - log.Printf("获取设备ID失败: %v", err) - continue - } - - // 解析RS485数据 - rs485Data, err := rs485Protocol.ParseRS485Data() - if err != nil { - log.Printf("解析RS485数据失败: %v", err) - continue - } - - // 添加设备ID和时间戳 - rs485Data.DeviceID = idParts.Complete.Hex - rs485Data.ReceivedAt = time.Now() - rs485Data.RawDataHex = sourceHex - - // 打印解析结果到日志 - log.Println("=== RS485 ===") - log.Printf("设备ID: RS485-%s", rs485Data.DeviceID) - log.Printf("温度: %.2f°C", rs485Data.Temperature) - log.Printf("湿度: %.1f%%", rs485Data.Humidity) - log.Printf("风速: %.5f m/s", rs485Data.WindSpeed) - log.Printf("风向: %.1f°", rs485Data.WindDirection) - log.Printf("降雨量: %.3f mm", rs485Data.Rainfall) - log.Printf("光照: %.1f lux", rs485Data.Light) - log.Printf("紫外线: %.1f", rs485Data.UV) - log.Printf("气压: %.2f hPa", rs485Data.Pressure) - log.Printf("接收时间: %s", rs485Data.ReceivedAt.Format("2006-01-02 15:04:05")) - - // 注册设备 - stationID := fmt.Sprintf("RS485-%s", rs485Data.DeviceID) - model.RegisterDevice(stationID, addr) - log.Printf("设备 %s 已注册,IP: %s", stationID, addr.String()) - - // 保存到数据库 - err = model.SaveWeatherData(rs485Data, string(rawData)) - if err != nil { - log.Printf("保存数据到数据库失败: %v", err) - } else { - log.Printf("数据已成功保存到数据库") - } - + handleRS485Data(rawData, addr, hexDump) } else { - // 尝试解析WIFI数据 - data, deviceType, err := model.ParseData(rawData) - if err != nil { - log.Printf("解析数据失败: %v", err) - continue - } + handleWiFiData(rawData, addr) + } + } +} - log.Println("成功解析气象站数据:") - log.Printf("设备类型: %s", getDeviceTypeString(deviceType)) - log.Println(data) +// handleRS485Data 处理RS485设备数据 +func handleRS485Data(rawData []byte, addr net.Addr, hexDump string) { + log.Println("485 型气象站数据") - if deviceType == model.DeviceTypeWIFI { - if wifiData, ok := data.(*model.WeatherData); ok { - stationID := wifiData.StationID - if stationID != "" { - model.RegisterDevice(stationID, addr) - log.Printf("设备 %s 已注册,IP: %s", stationID, addr.String()) - } else { - log.Printf("警告: 收到的数据没有站点ID") - } - } + // 生成源码字符串(用于日志记录) + sourceHex := strings.ReplaceAll(strings.TrimSpace(hexDump), "\n", " ") + log.Printf("源码: %s", sourceHex) + + // 解析RS485数据 + protocol := model.NewProtocol(rawData) + rs485Protocol := model.NewRS485Protocol(rawData) + + // 获取设备ID + idParts, err := protocol.GetCompleteID() + if err != nil { + log.Printf("获取设备ID失败: %v", err) + return + } + + // 解析RS485数据 + rs485Data, err := rs485Protocol.ParseRS485Data() + if err != nil { + log.Printf("解析RS485数据失败: %v", err) + return + } + + // 添加设备ID和时间戳 + rs485Data.DeviceID = idParts.Complete.Hex + rs485Data.ReceivedAt = time.Now() + rs485Data.RawDataHex = sourceHex + + // 打印解析结果到日志 + log.Println("=== RS485 ===") + log.Printf("设备ID: RS485-%s", rs485Data.DeviceID) + log.Printf("温度: %.2f°C", rs485Data.Temperature) + log.Printf("湿度: %.1f%%", rs485Data.Humidity) + log.Printf("风速: %.5f m/s", rs485Data.WindSpeed) + log.Printf("风向: %.1f°", rs485Data.WindDirection) + log.Printf("降雨量: %.3f mm", rs485Data.Rainfall) + log.Printf("光照: %.1f lux", rs485Data.Light) + log.Printf("紫外线: %.1f", rs485Data.UV) + log.Printf("气压: %.2f hPa", rs485Data.Pressure) + log.Printf("接收时间: %s", rs485Data.ReceivedAt.Format("2006-01-02 15:04:05")) + + // 注册设备 + stationID := fmt.Sprintf("RS485-%s", rs485Data.DeviceID) + model.RegisterDevice(stationID, addr) + log.Printf("设备 %s 已注册,IP: %s", stationID, addr.String()) + + // 保存到数据库 + err = model.SaveWeatherData(rs485Data, string(rawData)) + if err != nil { + log.Printf("保存数据到数据库失败: %v", err) + } else { + log.Printf("数据已成功保存到数据库") + } +} + +// handleWiFiData 处理WiFi设备数据 +func handleWiFiData(rawData []byte, addr net.Addr) { + // 尝试解析WIFI数据 + data, deviceType, err := model.ParseData(rawData) + if err != nil { + log.Printf("解析数据失败: %v", err) + return + } + + log.Println("成功解析气象站数据:") + log.Printf("设备类型: %s", getDeviceTypeString(deviceType)) + log.Println(data) + + if deviceType == model.DeviceTypeWIFI { + if wifiData, ok := data.(*model.WeatherData); ok { + stationID := wifiData.StationID + if stationID != "" { + model.RegisterDevice(stationID, addr) + log.Printf("设备 %s 已注册,IP: %s", stationID, addr.String()) + } else { + log.Printf("警告: 收到的数据没有站点ID") } } } } +// getDeviceTypeString 获取设备类型字符串 func getDeviceTypeString(deviceType model.DeviceType) string { switch deviceType { case model.DeviceTypeWIFI: @@ -222,6 +235,7 @@ func getDeviceTypeString(deviceType model.DeviceType) string { } } +// hexDump 生成十六进制转储 func hexDump(data []byte) string { var result strings.Builder for i := 0; i < len(data); i += 16 { @@ -242,6 +256,7 @@ func hexDump(data []byte) string { return result.String() } +// asciiDump 生成ASCII转储 func asciiDump(data []byte) string { var result strings.Builder for i := 0; i < len(data); i += 64 { diff --git a/launcher.go b/launcher.go deleted file mode 100644 index 1188190..0000000 --- a/launcher.go +++ /dev/null @@ -1,54 +0,0 @@ -package main - -import ( - "flag" - "log" - "sync" -) - -func main() { - var webOnly = flag.Bool("web", false, "只启动Web服务器(原生http)") - var ginOnly = flag.Bool("gin", false, "只启动Gin Web服务器") - var udpOnly = flag.Bool("udp", false, "只启动UDP服务器") - flag.Parse() - - // 设置日志 - setupLogger() - - var wg sync.WaitGroup - - if *webOnly { - // 只启动原生Web服务器 - log.Println("启动原生Web服务器模式...") - StartWebServer() - } else if *ginOnly { - // 只启动Gin Web服务器 - log.Println("启动Gin Web服务器模式...") - StartGinServer() - } else if *udpOnly { - // 只启动UDP服务器 - log.Println("启动UDP服务器模式...") - startUDP() - } else { - // 同时启动UDP和Gin Web服务器 - log.Println("启动完整模式:UDP + Gin Web服务器...") - - wg.Add(2) - - // 启动UDP服务器 - go func() { - defer wg.Done() - log.Println("正在启动UDP服务器...") - startUDP() - }() - - // 启动Gin Web服务器 - go func() { - defer wg.Done() - log.Println("正在启动Gin Web服务器...") - StartGinServer() - }() - - wg.Wait() - } -} diff --git a/types.go b/pkg/types/types.go similarity index 98% rename from types.go rename to pkg/types/types.go index 12169d5..f930099 100644 --- a/types.go +++ b/pkg/types/types.go @@ -1,4 +1,4 @@ -package main +package types // Station 站点信息 type Station struct { diff --git a/web_server.go b/web_server.go deleted file mode 100644 index 5a5994d..0000000 --- a/web_server.go +++ /dev/null @@ -1,251 +0,0 @@ -package main - -import ( - "database/sql" - "encoding/json" - "fmt" - "log" - "net/http" - "path/filepath" - "strings" - "time" - - "weatherstation/config" - - _ "github.com/lib/pq" -) - -var db *sql.DB - -func initWebDB() error { - cfg := config.GetConfig() - - connStr := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=%s", - cfg.Database.Host, cfg.Database.Port, cfg.Database.User, - cfg.Database.Password, cfg.Database.DBName, cfg.Database.SSLMode) - - var err error - db, err = sql.Open("postgres", connStr) - if err != nil { - return fmt.Errorf("无法连接到数据库: %v", err) - } - - err = db.Ping() - if err != nil { - return fmt.Errorf("数据库连接测试失败: %v", err) - } - - return nil -} - -// 获取WH65LP站点列表 -func getWH65LPStations(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "application/json") - w.Header().Set("Access-Control-Allow-Origin", "*") - - query := ` - SELECT DISTINCT s.station_id, - COALESCE(s.password, '') as station_name, - 'WH65LP' as device_type, - COALESCE(MAX(r.timestamp), '1970-01-01'::timestamp) as last_update - FROM stations s - LEFT JOIN rs485_weather_data r ON s.station_id = r.station_id - WHERE s.station_id LIKE 'RS485-%' - GROUP BY s.station_id, s.password - ORDER BY s.station_id` - - rows, err := db.Query(query) - if err != nil { - http.Error(w, "查询站点失败", http.StatusInternalServerError) - log.Printf("查询站点失败: %v", err) - return - } - defer rows.Close() - - var stations []Station - for rows.Next() { - var station Station - var lastUpdate time.Time - err := rows.Scan(&station.StationID, &station.StationName, &station.DeviceType, &lastUpdate) - if err != nil { - log.Printf("扫描站点数据失败: %v", err) - continue - } - station.LastUpdate = lastUpdate.Format("2006-01-02 15:04:05") - stations = append(stations, station) - } - - json.NewEncoder(w).Encode(stations) -} - -// 获取站点历史数据 -func getStationData(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "application/json") - w.Header().Set("Access-Control-Allow-Origin", "*") - - stationID := r.URL.Query().Get("station_id") - startTime := r.URL.Query().Get("start_time") - endTime := r.URL.Query().Get("end_time") - interval := r.URL.Query().Get("interval") - - if stationID == "" || startTime == "" || endTime == "" { - http.Error(w, "缺少必要参数", http.StatusBadRequest) - return - } - - // 默认间隔为1小时 - if interval == "" { - interval = "1hour" - } - - var query string - var intervalSQL string - - switch interval { - case "10min": - intervalSQL = "10 minutes" - case "30min": - intervalSQL = "30 minutes" - case "1hour": - intervalSQL = "1 hour" - default: - intervalSQL = "1 hour" - } - - // 构建查询SQL - 使用时间窗口聚合(风向矢量平均,雨量按累计量正增量求和) - query = fmt.Sprintf(` - WITH time_series AS ( - SELECT - date_trunc('hour', timestamp) + - INTERVAL '%s' * FLOOR(EXTRACT(EPOCH FROM timestamp - date_trunc('hour', timestamp)) / EXTRACT(EPOCH FROM INTERVAL '%s')) as time_bucket, - timestamp as ts, - temperature, - humidity, - pressure, - wind_speed, - wind_direction, - rainfall, - light, - uv - FROM rs485_weather_data - WHERE station_id = $1 - AND timestamp >= $2::timestamp - AND timestamp <= $3::timestamp - ), - rain_increments AS ( - SELECT - time_bucket, - GREATEST(rainfall - LAG(rainfall) OVER (PARTITION BY time_bucket ORDER BY ts), 0) AS inc - FROM time_series - ), - rain_sums AS ( - SELECT time_bucket, SUM(inc) AS rainfall - FROM rain_increments - GROUP BY time_bucket - ), - aggregated_data AS ( - SELECT - time_bucket, - ROUND(AVG(temperature)::numeric, 2) as temperature, - ROUND(AVG(humidity)::numeric, 2) as humidity, - ROUND(AVG(pressure)::numeric, 2) as pressure, - ROUND(AVG(wind_speed)::numeric, 2) as wind_speed, - -- 风向使用矢量平均 - ROUND(DEGREES(ATAN2( - AVG(SIN(RADIANS(wind_direction))), - AVG(COS(RADIANS(wind_direction))) - ))::numeric + CASE - WHEN DEGREES(ATAN2( - AVG(SIN(RADIANS(wind_direction))), - AVG(COS(RADIANS(wind_direction))) - )) < 0 THEN 360 - ELSE 0 - END, 2) as wind_direction, - ROUND(AVG(light)::numeric, 2) as light, - ROUND(AVG(uv)::numeric, 2) as uv - FROM time_series - GROUP BY time_bucket - ) - SELECT - ad.time_bucket as time_bucket, - COALESCE(ad.temperature, 0) as temperature, - COALESCE(ad.humidity, 0) as humidity, - COALESCE(ad.pressure, 0) as pressure, - COALESCE(ad.wind_speed, 0) as wind_speed, - COALESCE(ad.wind_direction, 0) as wind_direction, - COALESCE(rs.rainfall, 0) as rainfall, - COALESCE(ad.light, 0) as light, - COALESCE(ad.uv, 0) as uv - FROM aggregated_data ad - LEFT JOIN rain_sums rs ON rs.time_bucket = ad.time_bucket - ORDER BY ad.time_bucket`, intervalSQL, intervalSQL) - - rows, err := db.Query(query, stationID, startTime, endTime) - if err != nil { - http.Error(w, "查询数据失败", http.StatusInternalServerError) - log.Printf("查询数据失败: %v", err) - return - } - defer rows.Close() - - var data []WeatherPoint - for rows.Next() { - var point WeatherPoint - var timestamp time.Time - err := rows.Scan(×tamp, &point.Temperature, &point.Humidity, - &point.Pressure, &point.WindSpeed, &point.WindDir, - &point.Rainfall, &point.Light, &point.UV) - if err != nil { - log.Printf("扫描数据失败: %v", err) - continue - } - point.DateTime = timestamp.Format("2006-01-02 15:04:05") - data = append(data, point) - } - - json.NewEncoder(w).Encode(data) -} - -// 提供静态文件服务 -func serveStaticFiles() { - // 获取当前工作目录 - workDir := "/home/yarnom/Archive/code/WeatherStation" - webDir := filepath.Join(workDir, "web") - - // 创建文件服务器 - fs := http.FileServer(http.Dir(webDir)) - - // 处理根路径请求 - http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { - if r.URL.Path == "/" { - http.ServeFile(w, r, filepath.Join(webDir, "index.html")) - return - } - - // 检查文件是否存在 - if _, err := http.Dir(webDir).Open(strings.TrimPrefix(r.URL.Path, "/")); err != nil { - http.NotFound(w, r) - return - } - - // 提供静态文件 - fs.ServeHTTP(w, r) - }) -} - -func StartWebServer() { - err := initWebDB() - if err != nil { - log.Fatalf("初始化Web数据库连接失败: %v", err) - } - - // API路由 - http.HandleFunc("/api/stations", getWH65LPStations) - http.HandleFunc("/api/data", getStationData) - - // 静态文件服务 - serveStaticFiles() - - log.Println("Web服务器启动,监听端口 10003...") - log.Fatal(http.ListenAndServe(":10003", nil)) -}