feat: 优化项目结构

This commit is contained in:
yarnom 2025-08-21 15:57:09 +08:00
parent cc5c607457
commit 3152c6bb14
11 changed files with 595 additions and 749 deletions

View File

@ -0,0 +1,64 @@
package main
import (
"flag"
"log"
"sync"
"weatherstation/internal/database"
"weatherstation/internal/server"
)
func main() {
// 命令行参数
var webOnly = flag.Bool("web", false, "只启动Web服务器Gin")
var udpOnly = flag.Bool("udp", false, "只启动UDP服务器")
flag.Parse()
// 设置日志
server.SetupLogger()
// 初始化数据库连接
_ = database.GetDB() // 确保数据库连接已初始化
defer database.Close()
// 根据命令行参数启动服务
if *webOnly {
// 只启动Web服务器
log.Println("启动Web服务器模式...")
if err := server.StartGinServer(); err != nil {
log.Fatalf("启动Web服务器失败: %v", err)
}
} else if *udpOnly {
// 只启动UDP服务器
log.Println("启动UDP服务器模式...")
if err := server.StartUDPServer(); err != nil {
log.Fatalf("启动UDP服务器失败: %v", err)
}
} else {
// 同时启动UDP和Web服务器
log.Println("启动完整模式UDP + Web服务器...")
var wg sync.WaitGroup
wg.Add(2)
// 启动UDP服务器
go func() {
defer wg.Done()
log.Println("正在启动UDP服务器...")
if err := server.StartUDPServer(); err != nil {
log.Printf("UDP服务器异常退出: %v", err)
}
}()
// 启动Web服务器
go func() {
defer wg.Done()
log.Println("正在启动Web服务器...")
if err := server.StartGinServer(); err != nil {
log.Printf("Web服务器异常退出: %v", err)
}
}()
wg.Wait()
}
}

View File

@ -1,356 +0,0 @@
package main
import (
"database/sql"
"fmt"
"net/http"
"strconv"
"time"
"weatherstation/config"
"github.com/gin-gonic/gin"
_ "github.com/lib/pq"
)
var ginDB *sql.DB
func initGinDB() error {
cfg := config.GetConfig()
connStr := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=%s",
cfg.Database.Host, cfg.Database.Port, cfg.Database.User,
cfg.Database.Password, cfg.Database.DBName, cfg.Database.SSLMode)
var err error
ginDB, err = sql.Open("postgres", connStr)
if err != nil {
return fmt.Errorf("无法连接到数据库: %v", err)
}
err = ginDB.Ping()
if err != nil {
return fmt.Errorf("数据库连接测试失败: %v", err)
}
return nil
}
// 获取在线设备数量
func getOnlineDevicesCount() int {
if ginDB == nil {
return 0
}
query := `
SELECT COUNT(DISTINCT station_id)
FROM rs485_weather_data
WHERE timestamp > NOW() - INTERVAL '5 minutes'`
var count int
err := ginDB.QueryRow(query).Scan(&count)
if err != nil {
return 0
}
return count
}
// 主页面处理器
func indexHandler(c *gin.Context) {
data := PageData{
Title: "英卓气象站",
ServerTime: time.Now().Format("2006-01-02 15:04:05"),
OnlineDevices: getOnlineDevicesCount(),
TiandituKey: "0c260b8a094a4e0bc507808812cefdac",
}
c.HTML(http.StatusOK, "index.html", data)
}
// 系统状态API
func systemStatusHandler(c *gin.Context) {
status := SystemStatus{
OnlineDevices: getOnlineDevicesCount(),
ServerTime: time.Now().Format("2006-01-02 15:04:05"),
}
c.JSON(http.StatusOK, status)
}
// 获取WH65LP站点列表API
func getStationsHandler(c *gin.Context) {
query := `
SELECT DISTINCT s.station_id,
COALESCE(s.password, '') as station_name,
'WH65LP' as device_type,
COALESCE(MAX(r.timestamp), '1970-01-01'::timestamp) as last_update,
COALESCE(s.latitude, 0) as latitude,
COALESCE(s.longitude, 0) as longitude,
COALESCE(s.name, '') as name,
COALESCE(s.location, '') as location
FROM stations s
LEFT JOIN rs485_weather_data r ON s.station_id = r.station_id
WHERE s.station_id LIKE 'RS485-%'
GROUP BY s.station_id, s.password, s.latitude, s.longitude, s.name, s.location
ORDER BY s.station_id`
rows, err := ginDB.Query(query)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "查询站点失败"})
return
}
defer rows.Close()
var stations []Station
for rows.Next() {
var station Station
var lastUpdate time.Time
err := rows.Scan(
&station.StationID,
&station.StationName,
&station.DeviceType,
&lastUpdate,
&station.Latitude,
&station.Longitude,
&station.Name,
&station.Location,
)
if err != nil {
continue
}
station.LastUpdate = lastUpdate.Format("2006-01-02 15:04:05")
// 从station_id中提取十六进制ID并转换为十进制
if len(station.StationID) > 6 {
hexID := station.StationID[len(station.StationID)-6:]
if decimalID, err := strconv.ParseInt(hexID, 16, 64); err == nil {
station.DecimalID = strconv.FormatInt(decimalID, 10)
}
}
stations = append(stations, station)
}
c.JSON(http.StatusOK, stations)
}
// 获取历史数据API
func getDataHandler(c *gin.Context) {
// 获取查询参数
decimalID := c.Query("decimal_id")
startTime := c.Query("start_time")
endTime := c.Query("end_time")
interval := c.Query("interval")
// 将十进制ID转换为十六进制补足6位
decimalNum, err := strconv.ParseInt(decimalID, 10, 64)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "无效的站点编号"})
return
}
hexID := fmt.Sprintf("%06X", decimalNum)
stationID := fmt.Sprintf("RS485-%s", hexID)
// 构建查询SQL统一风向矢量平均雨量为累计量的正增量求和
var query string
switch interval {
case "10min":
query = `
WITH base AS (
SELECT
date_trunc('hour', timestamp) + (floor(date_part('minute', timestamp) / 10) * interval '10 minute') as time_group,
timestamp as ts,
temperature, humidity, pressure, wind_speed, wind_direction, rainfall, light, uv
FROM rs485_weather_data
WHERE station_id = $1 AND timestamp BETWEEN $2 AND $3
),
rain_inc AS (
SELECT time_group, GREATEST(rainfall - LAG(rainfall) OVER (PARTITION BY time_group ORDER BY ts), 0) AS inc
FROM base
),
rain_sum AS (
SELECT time_group, SUM(inc) AS rainfall
FROM rain_inc
GROUP BY time_group
),
grouped_data AS (
SELECT
time_group,
AVG(temperature) as temperature,
AVG(humidity) as humidity,
AVG(pressure) as pressure,
AVG(wind_speed) as wind_speed,
DEGREES(ATAN2(AVG(SIN(RADIANS(wind_direction))), AVG(COS(RADIANS(wind_direction))))) AS wind_direction_raw,
AVG(light) as light,
AVG(uv) as uv
FROM base
GROUP BY time_group
)
SELECT
to_char(g.time_group, 'YYYY-MM-DD HH24:MI:SS') as date_time,
ROUND(g.temperature::numeric, 2) as temperature,
ROUND(g.humidity::numeric, 2) as humidity,
ROUND(g.pressure::numeric, 2) as pressure,
ROUND(g.wind_speed::numeric, 2) as wind_speed,
ROUND((CASE WHEN g.wind_direction_raw < 0 THEN g.wind_direction_raw + 360 ELSE g.wind_direction_raw END)::numeric, 2) AS wind_direction,
ROUND(COALESCE(r.rainfall, 0)::numeric, 3) as rainfall,
ROUND(g.light::numeric, 2) as light,
ROUND(g.uv::numeric, 2) as uv
FROM grouped_data g
LEFT JOIN rain_sum r ON r.time_group = g.time_group
ORDER BY g.time_group`
case "30min":
query = `
WITH base AS (
SELECT
date_trunc('hour', timestamp) + (floor(date_part('minute', timestamp) / 30) * interval '30 minute') as time_group,
timestamp as ts,
temperature, humidity, pressure, wind_speed, wind_direction, rainfall, light, uv
FROM rs485_weather_data
WHERE station_id = $1 AND timestamp BETWEEN $2 AND $3
),
rain_inc AS (
SELECT time_group, GREATEST(rainfall - LAG(rainfall) OVER (PARTITION BY time_group ORDER BY ts), 0) AS inc
FROM base
),
rain_sum AS (
SELECT time_group, SUM(inc) AS rainfall
FROM rain_inc
GROUP BY time_group
),
grouped_data AS (
SELECT
time_group,
AVG(temperature) as temperature,
AVG(humidity) as humidity,
AVG(pressure) as pressure,
AVG(wind_speed) as wind_speed,
DEGREES(ATAN2(AVG(SIN(RADIANS(wind_direction))), AVG(COS(RADIANS(wind_direction))))) AS wind_direction_raw,
AVG(light) as light,
AVG(uv) as uv
FROM base
GROUP BY time_group
)
SELECT
to_char(g.time_group, 'YYYY-MM-DD HH24:MI:SS') as date_time,
ROUND(g.temperature::numeric, 2) as temperature,
ROUND(g.humidity::numeric, 2) as humidity,
ROUND(g.pressure::numeric, 2) as pressure,
ROUND(g.wind_speed::numeric, 2) as wind_speed,
ROUND((CASE WHEN g.wind_direction_raw < 0 THEN g.wind_direction_raw + 360 ELSE g.wind_direction_raw END)::numeric, 2) AS wind_direction,
ROUND(COALESCE(r.rainfall, 0)::numeric, 3) as rainfall,
ROUND(g.light::numeric, 2) as light,
ROUND(g.uv::numeric, 2) as uv
FROM grouped_data g
LEFT JOIN rain_sum r ON r.time_group = g.time_group
ORDER BY g.time_group`
default: // 1hour
query = `
WITH base AS (
SELECT
date_trunc('hour', timestamp) as time_group,
timestamp as ts,
temperature, humidity, pressure, wind_speed, wind_direction, rainfall, light, uv
FROM rs485_weather_data
WHERE station_id = $1 AND timestamp BETWEEN $2 AND $3
),
rain_inc AS (
SELECT time_group, GREATEST(rainfall - LAG(rainfall) OVER (PARTITION BY time_group ORDER BY ts), 0) AS inc
FROM base
),
rain_sum AS (
SELECT time_group, SUM(inc) AS rainfall
FROM rain_inc
GROUP BY time_group
),
grouped_data AS (
SELECT
time_group,
AVG(temperature) as temperature,
AVG(humidity) as humidity,
AVG(pressure) as pressure,
AVG(wind_speed) as wind_speed,
DEGREES(ATAN2(AVG(SIN(RADIANS(wind_direction))), AVG(COS(RADIANS(wind_direction))))) AS wind_direction_raw,
AVG(light) as light,
AVG(uv) as uv
FROM base
GROUP BY time_group
)
SELECT
to_char(g.time_group, 'YYYY-MM-DD HH24:MI:SS') as date_time,
ROUND(g.temperature::numeric, 2) as temperature,
ROUND(g.humidity::numeric, 2) as humidity,
ROUND(g.pressure::numeric, 2) as pressure,
ROUND(g.wind_speed::numeric, 2) as wind_speed,
ROUND((CASE WHEN g.wind_direction_raw < 0 THEN g.wind_direction_raw + 360 ELSE g.wind_direction_raw END)::numeric, 2) AS wind_direction,
ROUND(COALESCE(r.rainfall, 0)::numeric, 3) as rainfall,
ROUND(g.light::numeric, 2) as light,
ROUND(g.uv::numeric, 2) as uv
FROM grouped_data g
LEFT JOIN rain_sum r ON r.time_group = g.time_group
ORDER BY g.time_group`
}
// 执行查询
rows, err := ginDB.Query(query, stationID, startTime, endTime)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "查询数据失败"})
return
}
defer rows.Close()
var weatherPoints []WeatherPoint
for rows.Next() {
var point WeatherPoint
err := rows.Scan(
&point.DateTime,
&point.Temperature,
&point.Humidity,
&point.Pressure,
&point.WindSpeed,
&point.WindDir,
&point.Rainfall,
&point.Light,
&point.UV,
)
if err != nil {
continue
}
weatherPoints = append(weatherPoints, point)
}
c.JSON(http.StatusOK, weatherPoints)
}
func StartGinServer() {
err := initGinDB()
if err != nil {
fmt.Printf("初始化Gin数据库连接失败: %v\n", err)
return
}
// 设置Gin模式
gin.SetMode(gin.ReleaseMode)
// 创建Gin引擎
r := gin.Default()
// 加载HTML模板
r.LoadHTMLGlob("templates/*")
// 静态文件服务
r.Static("/static", "./static")
// 路由设置
r.GET("/", indexHandler)
// API路由组
api := r.Group("/api")
{
api.GET("/system/status", systemStatusHandler)
api.GET("/stations", getStationsHandler)
api.GET("/data", getDataHandler)
}
// 启动服务器
fmt.Println("Gin Web服务器启动监听端口 10003...")
r.Run(":10003")
}

2
go.mod
View File

@ -5,6 +5,7 @@ go 1.23.0
toolchain go1.24.5 toolchain go1.24.5
require ( require (
github.com/gin-gonic/gin v1.10.1
github.com/lib/pq v1.10.9 github.com/lib/pq v1.10.9
gopkg.in/yaml.v3 v3.0.1 gopkg.in/yaml.v3 v3.0.1
) )
@ -16,7 +17,6 @@ require (
github.com/cloudwego/iasm v0.2.0 // indirect github.com/cloudwego/iasm v0.2.0 // indirect
github.com/gabriel-vasile/mimetype v1.4.3 // indirect github.com/gabriel-vasile/mimetype v1.4.3 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect github.com/gin-contrib/sse v0.1.0 // indirect
github.com/gin-gonic/gin v1.10.1 // indirect
github.com/go-playground/locales v0.14.1 // indirect github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.1 // indirect github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/go-playground/validator/v10 v10.20.0 // indirect github.com/go-playground/validator/v10 v10.20.0 // indirect

87
internal/config/config.go Normal file
View File

@ -0,0 +1,87 @@
package config
import (
"fmt"
"os"
"path/filepath"
"sync"
"gopkg.in/yaml.v3"
)
type ServerConfig struct {
WebPort int `yaml:"web_port"` // Gin Web服务器端口
UDPPort int `yaml:"udp_port"` // UDP服务器端口
}
type DatabaseConfig struct {
Host string `yaml:"host"`
Port int `yaml:"port"`
User string `yaml:"user"`
Password string `yaml:"password"`
DBName string `yaml:"dbname"`
SSLMode string `yaml:"sslmode"`
}
type Config struct {
Server ServerConfig `yaml:"server"`
Database DatabaseConfig `yaml:"database"`
}
var (
instance *Config
once sync.Once
)
// GetConfig 返回配置单例
func GetConfig() *Config {
once.Do(func() {
instance = &Config{}
if err := instance.loadConfig(); err != nil {
panic(fmt.Sprintf("加载配置文件失败: %v", err))
}
})
return instance
}
// loadConfig 从配置文件加载配置
func (c *Config) loadConfig() error {
// 尝试多个位置查找配置文件
configPaths := []string{
"config.yaml", // 当前目录
"../config.yaml", // 上级目录
"../../config.yaml", // 项目根目录
filepath.Join(os.Getenv("HOME"), ".weatherstation/config.yaml"), // 用户目录
}
var data []byte
var err error
for _, path := range configPaths {
if data, err = os.ReadFile(path); err == nil {
break
}
}
if err != nil {
return fmt.Errorf("未找到配置文件: %v", err)
}
if err := yaml.Unmarshal(data, c); err != nil {
return fmt.Errorf("解析配置文件失败: %v", err)
}
return c.validate()
}
// validate 验证配置有效性
func (c *Config) validate() error {
if c.Server.WebPort <= 0 {
c.Server.WebPort = 10003 // 默认Web端口
}
if c.Server.UDPPort <= 0 {
c.Server.UDPPort = 10001 // 默认UDP端口
}
if c.Database.SSLMode == "" {
c.Database.SSLMode = "disable" // 默认禁用SSL
}
return nil
}

50
internal/database/db.go Normal file
View File

@ -0,0 +1,50 @@
package database
import (
"database/sql"
"fmt"
"sync"
"weatherstation/internal/config"
_ "github.com/lib/pq"
)
var (
instance *sql.DB
once sync.Once
)
// GetDB 返回数据库连接单例
func GetDB() *sql.DB {
once.Do(func() {
cfg := config.GetConfig()
connStr := fmt.Sprintf(
"host=%s port=%d user=%s password=%s dbname=%s sslmode=%s",
cfg.Database.Host,
cfg.Database.Port,
cfg.Database.User,
cfg.Database.Password,
cfg.Database.DBName,
cfg.Database.SSLMode,
)
var err error
instance, err = sql.Open("postgres", connStr)
if err != nil {
panic(fmt.Sprintf("无法连接到数据库: %v", err))
}
if err = instance.Ping(); err != nil {
panic(fmt.Sprintf("数据库连接测试失败: %v", err))
}
})
return instance
}
// Close 关闭数据库连接
func Close() error {
if instance != nil {
return instance.Close()
}
return nil
}

161
internal/database/models.go Normal file
View File

@ -0,0 +1,161 @@
package database
import (
"database/sql"
"time"
"weatherstation/pkg/types"
)
// GetOnlineDevicesCount 获取在线设备数量
func GetOnlineDevicesCount(db *sql.DB) int {
query := `
SELECT COUNT(DISTINCT station_id)
FROM rs485_weather_data
WHERE timestamp > NOW() - INTERVAL '5 minutes'`
var count int
if err := db.QueryRow(query).Scan(&count); err != nil {
return 0
}
return count
}
// GetStations 获取所有WH65LP站点列表
func GetStations(db *sql.DB) ([]types.Station, error) {
query := `
SELECT DISTINCT s.station_id,
COALESCE(s.password, '') as station_name,
'WH65LP' as device_type,
COALESCE(MAX(r.timestamp), '1970-01-01'::timestamp) as last_update,
COALESCE(s.latitude, 0) as latitude,
COALESCE(s.longitude, 0) as longitude,
COALESCE(s.name, '') as name,
COALESCE(s.location, '') as location
FROM stations s
LEFT JOIN rs485_weather_data r ON s.station_id = r.station_id
WHERE s.station_id LIKE 'RS485-%'
GROUP BY s.station_id, s.password, s.latitude, s.longitude, s.name, s.location
ORDER BY s.station_id`
rows, err := db.Query(query)
if err != nil {
return nil, err
}
defer rows.Close()
var stations []types.Station
for rows.Next() {
var station types.Station
var lastUpdate time.Time
err := rows.Scan(
&station.StationID,
&station.StationName,
&station.DeviceType,
&lastUpdate,
&station.Latitude,
&station.Longitude,
&station.Name,
&station.Location,
)
if err != nil {
continue
}
station.LastUpdate = lastUpdate.Format("2006-01-02 15:04:05")
stations = append(stations, station)
}
return stations, nil
}
// GetWeatherData 获取指定站点的历史天气数据
func GetWeatherData(db *sql.DB, stationID string, startTime, endTime time.Time, interval string) ([]types.WeatherPoint, error) {
// 构建查询SQL统一风向矢量平均雨量为累计量的正增量求和
var query string
var intervalStr string
switch interval {
case "10min":
intervalStr = "10 minutes"
case "30min":
intervalStr = "30 minutes"
default: // 1hour
intervalStr = "1 hour"
}
query = buildWeatherDataQuery(intervalStr)
rows, err := db.Query(query, intervalStr, stationID, startTime, endTime)
if err != nil {
return nil, err
}
defer rows.Close()
var points []types.WeatherPoint
for rows.Next() {
var point types.WeatherPoint
err := rows.Scan(
&point.DateTime,
&point.Temperature,
&point.Humidity,
&point.Pressure,
&point.WindSpeed,
&point.WindDir,
&point.Rainfall,
&point.Light,
&point.UV,
)
if err != nil {
continue
}
points = append(points, point)
}
return points, nil
}
// buildWeatherDataQuery 构建天气数据查询SQL
func buildWeatherDataQuery(interval string) string {
return `
WITH base AS (
SELECT
date_trunc('hour', timestamp) +
(floor(date_part('minute', timestamp) / extract(epoch from $1::interval) * 60) * $1::interval) as time_group,
timestamp as ts,
temperature, humidity, pressure, wind_speed, wind_direction, rainfall, light, uv
FROM rs485_weather_data
WHERE station_id = $2 AND timestamp BETWEEN $3 AND $4
),
rain_inc AS (
SELECT time_group, GREATEST(rainfall - LAG(rainfall) OVER (PARTITION BY time_group ORDER BY ts), 0) AS inc
FROM base
),
rain_sum AS (
SELECT time_group, SUM(inc) AS rainfall
FROM rain_inc
GROUP BY time_group
),
grouped_data AS (
SELECT
time_group,
AVG(temperature) as temperature,
AVG(humidity) as humidity,
AVG(pressure) as pressure,
AVG(wind_speed) as wind_speed,
DEGREES(ATAN2(AVG(SIN(RADIANS(wind_direction))), AVG(COS(RADIANS(wind_direction))))) AS wind_direction_raw,
AVG(light) as light,
AVG(uv) as uv
FROM base
GROUP BY time_group
)
SELECT
to_char(g.time_group, 'YYYY-MM-DD HH24:MI:SS') as date_time,
ROUND(g.temperature::numeric, 2) as temperature,
ROUND(g.humidity::numeric, 2) as humidity,
ROUND(g.pressure::numeric, 2) as pressure,
ROUND(g.wind_speed::numeric, 2) as wind_speed,
ROUND((CASE WHEN g.wind_direction_raw < 0 THEN g.wind_direction_raw + 360 ELSE g.wind_direction_raw END)::numeric, 2) AS wind_direction,
ROUND(COALESCE(r.rainfall, 0)::numeric, 3) as rainfall,
ROUND(g.light::numeric, 2) as light,
ROUND(g.uv::numeric, 2) as uv
FROM grouped_data g
LEFT JOIN rain_sum r ON r.time_group = g.time_group
ORDER BY g.time_group`
}

130
internal/server/gin.go Normal file
View File

@ -0,0 +1,130 @@
package server
import (
"fmt"
"net/http"
"strconv"
"time"
"weatherstation/internal/config"
"weatherstation/internal/database"
"weatherstation/pkg/types"
"github.com/gin-gonic/gin"
)
// StartGinServer 启动Gin Web服务器
func StartGinServer() error {
// 设置Gin模式
gin.SetMode(gin.ReleaseMode)
// 创建Gin引擎
r := gin.Default()
// 加载HTML模板
r.LoadHTMLGlob("templates/*")
// 静态文件服务
r.Static("/static", "./static")
// 路由设置
r.GET("/", indexHandler)
// API路由组
api := r.Group("/api")
{
api.GET("/system/status", systemStatusHandler)
api.GET("/stations", getStationsHandler)
api.GET("/data", getDataHandler)
}
// 获取配置的Web端口
port := config.GetConfig().Server.WebPort
if port == 0 {
port = 10003 // 默认端口
}
// 启动服务器
fmt.Printf("Gin Web服务器启动监听端口 %d...\n", port)
return r.Run(fmt.Sprintf(":%d", port))
}
// indexHandler 处理主页请求
func indexHandler(c *gin.Context) {
data := types.PageData{
Title: "英卓气象站",
ServerTime: time.Now().Format("2006-01-02 15:04:05"),
OnlineDevices: database.GetOnlineDevicesCount(database.GetDB()),
TiandituKey: "0c260b8a094a4e0bc507808812cefdac",
}
c.HTML(http.StatusOK, "index.html", data)
}
// systemStatusHandler 处理系统状态API请求
func systemStatusHandler(c *gin.Context) {
status := types.SystemStatus{
OnlineDevices: database.GetOnlineDevicesCount(database.GetDB()),
ServerTime: time.Now().Format("2006-01-02 15:04:05"),
}
c.JSON(http.StatusOK, status)
}
// getStationsHandler 处理获取站点列表API请求
func getStationsHandler(c *gin.Context) {
stations, err := database.GetStations(database.GetDB())
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "查询站点失败"})
return
}
// 为每个站点计算十进制ID
for i := range stations {
if len(stations[i].StationID) > 6 {
hexID := stations[i].StationID[len(stations[i].StationID)-6:]
if decimalID, err := strconv.ParseInt(hexID, 16, 64); err == nil {
stations[i].DecimalID = strconv.FormatInt(decimalID, 10)
}
}
}
c.JSON(http.StatusOK, stations)
}
// getDataHandler 处理获取历史数据API请求
func getDataHandler(c *gin.Context) {
// 获取查询参数
decimalID := c.Query("decimal_id")
startTime := c.Query("start_time")
endTime := c.Query("end_time")
interval := c.Query("interval")
// 将十进制ID转换为十六进制补足6位
decimalNum, err := strconv.ParseInt(decimalID, 10, 64)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "无效的站点编号"})
return
}
hexID := fmt.Sprintf("%06X", decimalNum)
stationID := fmt.Sprintf("RS485-%s", hexID)
// 解析时间
start, err := time.Parse("2006-01-02 15:04:05", startTime)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "无效的开始时间"})
return
}
end, err := time.Parse("2006-01-02 15:04:05", endTime)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "无效的结束时间"})
return
}
// 获取数据
points, err := database.GetWeatherData(database.GetDB(), stationID, start, end, interval)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "查询数据失败"})
return
}
c.JSON(http.StatusOK, points)
}

View File

@ -1,4 +1,4 @@
package main package server
import ( import (
"bufio" "bufio"
@ -13,19 +13,21 @@ import (
"sync" "sync"
"time" "time"
"unicode/utf8" "unicode/utf8"
"weatherstation/internal/config"
"weatherstation/config"
"weatherstation/model" "weatherstation/model"
) )
// UTF8Writer 包装一个io.Writer确保写入的数据是有效的UTF-8
type UTF8Writer struct { type UTF8Writer struct {
w io.Writer w io.Writer
} }
// NewUTF8Writer 创建一个新的UTF8Writer
func NewUTF8Writer(w io.Writer) *UTF8Writer { func NewUTF8Writer(w io.Writer) *UTF8Writer {
return &UTF8Writer{w: w} return &UTF8Writer{w: w}
} }
// Write 实现io.Writer接口
func (w *UTF8Writer) Write(p []byte) (n int, err error) { func (w *UTF8Writer) Write(p []byte) (n int, err error) {
if utf8.Valid(p) { if utf8.Valid(p) {
return w.w.Write(p) return w.w.Write(p)
@ -41,11 +43,13 @@ var (
currentLogDay int currentLogDay int
) )
// getLogFileName 获取当前日期的日志文件名
func getLogFileName() string { func getLogFileName() string {
currentTime := time.Now() currentTime := time.Now()
return filepath.Join("log", fmt.Sprintf("%s.log", currentTime.Format("2006-01-02"))) return filepath.Join("log", fmt.Sprintf("%s.log", currentTime.Format("2006-01-02")))
} }
// openLogFile 打开日志文件
func openLogFile() (*os.File, error) { func openLogFile() (*os.File, error) {
logDir := "log" logDir := "log"
if _, err := os.Stat(logDir); os.IsNotExist(err) { if _, err := os.Stat(logDir); os.IsNotExist(err) {
@ -56,7 +60,8 @@ func openLogFile() (*os.File, error) {
return os.OpenFile(logFileName, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) return os.OpenFile(logFileName, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
} }
func setupLogger() { // SetupLogger 设置日志系统
func SetupLogger() {
var err error var err error
logFile, err = openLogFile() logFile, err = openLogFile()
if err != nil { if err != nil {
@ -99,21 +104,19 @@ func setupLogger() {
log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile) log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile)
} }
func startUDP() { // StartUDPServer 启动UDP服务器
func StartUDPServer() error {
cfg := config.GetConfig() cfg := config.GetConfig()
err := model.InitDB()
if err != nil {
log.Fatalf("初始化数据库失败: %v", err)
}
defer model.CloseDB()
addr := fmt.Sprintf(":%d", cfg.Server.UDPPort) addr := fmt.Sprintf(":%d", cfg.Server.UDPPort)
conn, err := net.ListenPacket("udp", addr) conn, err := net.ListenPacket("udp", addr)
if err != nil { if err != nil {
log.Fatalf("无法监听UDP端口 %d: %v", cfg.Server.UDPPort, err) return fmt.Errorf("无法监听UDP端口 %d: %v", cfg.Server.UDPPort, err)
} }
defer conn.Close() defer conn.Close()
log.Printf("UDP服务器已启动监听端口 %d...", cfg.Server.UDPPort) log.Printf("UDP服务器已启动监听端口 %d...", cfg.Server.UDPPort)
buffer := make([]byte, 2048) buffer := make([]byte, 2048)
for { for {
n, addr, err := conn.ReadFrom(buffer) n, addr, err := conn.ReadFrom(buffer)
if err != nil { if err != nil {
@ -129,88 +132,98 @@ func startUDP() {
log.Printf("ASCII码:\n%s", asciiDump) log.Printf("ASCII码:\n%s", asciiDump)
if len(rawData) == 25 && rawData[0] == 0x24 { if len(rawData) == 25 && rawData[0] == 0x24 {
log.Println("485 型气象站数据") handleRS485Data(rawData, addr, hexDump)
// 生成源码字符串(用于日志记录)
sourceHex := strings.ReplaceAll(strings.TrimSpace(hexDump), "\n", " ")
log.Printf("源码: %s", sourceHex)
// 解析RS485数据
protocol := model.NewProtocol(rawData)
rs485Protocol := model.NewRS485Protocol(rawData)
// 获取设备ID
idParts, err := protocol.GetCompleteID()
if err != nil {
log.Printf("获取设备ID失败: %v", err)
continue
}
// 解析RS485数据
rs485Data, err := rs485Protocol.ParseRS485Data()
if err != nil {
log.Printf("解析RS485数据失败: %v", err)
continue
}
// 添加设备ID和时间戳
rs485Data.DeviceID = idParts.Complete.Hex
rs485Data.ReceivedAt = time.Now()
rs485Data.RawDataHex = sourceHex
// 打印解析结果到日志
log.Println("=== RS485 ===")
log.Printf("设备ID: RS485-%s", rs485Data.DeviceID)
log.Printf("温度: %.2f°C", rs485Data.Temperature)
log.Printf("湿度: %.1f%%", rs485Data.Humidity)
log.Printf("风速: %.5f m/s", rs485Data.WindSpeed)
log.Printf("风向: %.1f°", rs485Data.WindDirection)
log.Printf("降雨量: %.3f mm", rs485Data.Rainfall)
log.Printf("光照: %.1f lux", rs485Data.Light)
log.Printf("紫外线: %.1f", rs485Data.UV)
log.Printf("气压: %.2f hPa", rs485Data.Pressure)
log.Printf("接收时间: %s", rs485Data.ReceivedAt.Format("2006-01-02 15:04:05"))
// 注册设备
stationID := fmt.Sprintf("RS485-%s", rs485Data.DeviceID)
model.RegisterDevice(stationID, addr)
log.Printf("设备 %s 已注册IP: %s", stationID, addr.String())
// 保存到数据库
err = model.SaveWeatherData(rs485Data, string(rawData))
if err != nil {
log.Printf("保存数据到数据库失败: %v", err)
} else {
log.Printf("数据已成功保存到数据库")
}
} else { } else {
// 尝试解析WIFI数据 handleWiFiData(rawData, addr)
data, deviceType, err := model.ParseData(rawData) }
if err != nil { }
log.Printf("解析数据失败: %v", err) }
continue
}
log.Println("成功解析气象站数据:") // handleRS485Data 处理RS485设备数据
log.Printf("设备类型: %s", getDeviceTypeString(deviceType)) func handleRS485Data(rawData []byte, addr net.Addr, hexDump string) {
log.Println(data) log.Println("485 型气象站数据")
if deviceType == model.DeviceTypeWIFI { // 生成源码字符串(用于日志记录)
if wifiData, ok := data.(*model.WeatherData); ok { sourceHex := strings.ReplaceAll(strings.TrimSpace(hexDump), "\n", " ")
stationID := wifiData.StationID log.Printf("源码: %s", sourceHex)
if stationID != "" {
model.RegisterDevice(stationID, addr) // 解析RS485数据
log.Printf("设备 %s 已注册IP: %s", stationID, addr.String()) protocol := model.NewProtocol(rawData)
} else { rs485Protocol := model.NewRS485Protocol(rawData)
log.Printf("警告: 收到的数据没有站点ID")
} // 获取设备ID
} idParts, err := protocol.GetCompleteID()
if err != nil {
log.Printf("获取设备ID失败: %v", err)
return
}
// 解析RS485数据
rs485Data, err := rs485Protocol.ParseRS485Data()
if err != nil {
log.Printf("解析RS485数据失败: %v", err)
return
}
// 添加设备ID和时间戳
rs485Data.DeviceID = idParts.Complete.Hex
rs485Data.ReceivedAt = time.Now()
rs485Data.RawDataHex = sourceHex
// 打印解析结果到日志
log.Println("=== RS485 ===")
log.Printf("设备ID: RS485-%s", rs485Data.DeviceID)
log.Printf("温度: %.2f°C", rs485Data.Temperature)
log.Printf("湿度: %.1f%%", rs485Data.Humidity)
log.Printf("风速: %.5f m/s", rs485Data.WindSpeed)
log.Printf("风向: %.1f°", rs485Data.WindDirection)
log.Printf("降雨量: %.3f mm", rs485Data.Rainfall)
log.Printf("光照: %.1f lux", rs485Data.Light)
log.Printf("紫外线: %.1f", rs485Data.UV)
log.Printf("气压: %.2f hPa", rs485Data.Pressure)
log.Printf("接收时间: %s", rs485Data.ReceivedAt.Format("2006-01-02 15:04:05"))
// 注册设备
stationID := fmt.Sprintf("RS485-%s", rs485Data.DeviceID)
model.RegisterDevice(stationID, addr)
log.Printf("设备 %s 已注册IP: %s", stationID, addr.String())
// 保存到数据库
err = model.SaveWeatherData(rs485Data, string(rawData))
if err != nil {
log.Printf("保存数据到数据库失败: %v", err)
} else {
log.Printf("数据已成功保存到数据库")
}
}
// handleWiFiData 处理WiFi设备数据
func handleWiFiData(rawData []byte, addr net.Addr) {
// 尝试解析WIFI数据
data, deviceType, err := model.ParseData(rawData)
if err != nil {
log.Printf("解析数据失败: %v", err)
return
}
log.Println("成功解析气象站数据:")
log.Printf("设备类型: %s", getDeviceTypeString(deviceType))
log.Println(data)
if deviceType == model.DeviceTypeWIFI {
if wifiData, ok := data.(*model.WeatherData); ok {
stationID := wifiData.StationID
if stationID != "" {
model.RegisterDevice(stationID, addr)
log.Printf("设备 %s 已注册IP: %s", stationID, addr.String())
} else {
log.Printf("警告: 收到的数据没有站点ID")
} }
} }
} }
} }
// getDeviceTypeString 获取设备类型字符串
func getDeviceTypeString(deviceType model.DeviceType) string { func getDeviceTypeString(deviceType model.DeviceType) string {
switch deviceType { switch deviceType {
case model.DeviceTypeWIFI: case model.DeviceTypeWIFI:
@ -222,6 +235,7 @@ func getDeviceTypeString(deviceType model.DeviceType) string {
} }
} }
// hexDump 生成十六进制转储
func hexDump(data []byte) string { func hexDump(data []byte) string {
var result strings.Builder var result strings.Builder
for i := 0; i < len(data); i += 16 { for i := 0; i < len(data); i += 16 {
@ -242,6 +256,7 @@ func hexDump(data []byte) string {
return result.String() return result.String()
} }
// asciiDump 生成ASCII转储
func asciiDump(data []byte) string { func asciiDump(data []byte) string {
var result strings.Builder var result strings.Builder
for i := 0; i < len(data); i += 64 { for i := 0; i < len(data); i += 64 {

View File

@ -1,54 +0,0 @@
package main
import (
"flag"
"log"
"sync"
)
func main() {
var webOnly = flag.Bool("web", false, "只启动Web服务器原生http")
var ginOnly = flag.Bool("gin", false, "只启动Gin Web服务器")
var udpOnly = flag.Bool("udp", false, "只启动UDP服务器")
flag.Parse()
// 设置日志
setupLogger()
var wg sync.WaitGroup
if *webOnly {
// 只启动原生Web服务器
log.Println("启动原生Web服务器模式...")
StartWebServer()
} else if *ginOnly {
// 只启动Gin Web服务器
log.Println("启动Gin Web服务器模式...")
StartGinServer()
} else if *udpOnly {
// 只启动UDP服务器
log.Println("启动UDP服务器模式...")
startUDP()
} else {
// 同时启动UDP和Gin Web服务器
log.Println("启动完整模式UDP + Gin Web服务器...")
wg.Add(2)
// 启动UDP服务器
go func() {
defer wg.Done()
log.Println("正在启动UDP服务器...")
startUDP()
}()
// 启动Gin Web服务器
go func() {
defer wg.Done()
log.Println("正在启动Gin Web服务器...")
StartGinServer()
}()
wg.Wait()
}
}

View File

@ -1,4 +1,4 @@
package main package types
// Station 站点信息 // Station 站点信息
type Station struct { type Station struct {

View File

@ -1,251 +0,0 @@
package main
import (
"database/sql"
"encoding/json"
"fmt"
"log"
"net/http"
"path/filepath"
"strings"
"time"
"weatherstation/config"
_ "github.com/lib/pq"
)
var db *sql.DB
func initWebDB() error {
cfg := config.GetConfig()
connStr := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=%s",
cfg.Database.Host, cfg.Database.Port, cfg.Database.User,
cfg.Database.Password, cfg.Database.DBName, cfg.Database.SSLMode)
var err error
db, err = sql.Open("postgres", connStr)
if err != nil {
return fmt.Errorf("无法连接到数据库: %v", err)
}
err = db.Ping()
if err != nil {
return fmt.Errorf("数据库连接测试失败: %v", err)
}
return nil
}
// 获取WH65LP站点列表
func getWH65LPStations(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.Header().Set("Access-Control-Allow-Origin", "*")
query := `
SELECT DISTINCT s.station_id,
COALESCE(s.password, '') as station_name,
'WH65LP' as device_type,
COALESCE(MAX(r.timestamp), '1970-01-01'::timestamp) as last_update
FROM stations s
LEFT JOIN rs485_weather_data r ON s.station_id = r.station_id
WHERE s.station_id LIKE 'RS485-%'
GROUP BY s.station_id, s.password
ORDER BY s.station_id`
rows, err := db.Query(query)
if err != nil {
http.Error(w, "查询站点失败", http.StatusInternalServerError)
log.Printf("查询站点失败: %v", err)
return
}
defer rows.Close()
var stations []Station
for rows.Next() {
var station Station
var lastUpdate time.Time
err := rows.Scan(&station.StationID, &station.StationName, &station.DeviceType, &lastUpdate)
if err != nil {
log.Printf("扫描站点数据失败: %v", err)
continue
}
station.LastUpdate = lastUpdate.Format("2006-01-02 15:04:05")
stations = append(stations, station)
}
json.NewEncoder(w).Encode(stations)
}
// 获取站点历史数据
func getStationData(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.Header().Set("Access-Control-Allow-Origin", "*")
stationID := r.URL.Query().Get("station_id")
startTime := r.URL.Query().Get("start_time")
endTime := r.URL.Query().Get("end_time")
interval := r.URL.Query().Get("interval")
if stationID == "" || startTime == "" || endTime == "" {
http.Error(w, "缺少必要参数", http.StatusBadRequest)
return
}
// 默认间隔为1小时
if interval == "" {
interval = "1hour"
}
var query string
var intervalSQL string
switch interval {
case "10min":
intervalSQL = "10 minutes"
case "30min":
intervalSQL = "30 minutes"
case "1hour":
intervalSQL = "1 hour"
default:
intervalSQL = "1 hour"
}
// 构建查询SQL - 使用时间窗口聚合(风向矢量平均,雨量按累计量正增量求和)
query = fmt.Sprintf(`
WITH time_series AS (
SELECT
date_trunc('hour', timestamp) +
INTERVAL '%s' * FLOOR(EXTRACT(EPOCH FROM timestamp - date_trunc('hour', timestamp)) / EXTRACT(EPOCH FROM INTERVAL '%s')) as time_bucket,
timestamp as ts,
temperature,
humidity,
pressure,
wind_speed,
wind_direction,
rainfall,
light,
uv
FROM rs485_weather_data
WHERE station_id = $1
AND timestamp >= $2::timestamp
AND timestamp <= $3::timestamp
),
rain_increments AS (
SELECT
time_bucket,
GREATEST(rainfall - LAG(rainfall) OVER (PARTITION BY time_bucket ORDER BY ts), 0) AS inc
FROM time_series
),
rain_sums AS (
SELECT time_bucket, SUM(inc) AS rainfall
FROM rain_increments
GROUP BY time_bucket
),
aggregated_data AS (
SELECT
time_bucket,
ROUND(AVG(temperature)::numeric, 2) as temperature,
ROUND(AVG(humidity)::numeric, 2) as humidity,
ROUND(AVG(pressure)::numeric, 2) as pressure,
ROUND(AVG(wind_speed)::numeric, 2) as wind_speed,
-- 风向使用矢量平均
ROUND(DEGREES(ATAN2(
AVG(SIN(RADIANS(wind_direction))),
AVG(COS(RADIANS(wind_direction)))
))::numeric + CASE
WHEN DEGREES(ATAN2(
AVG(SIN(RADIANS(wind_direction))),
AVG(COS(RADIANS(wind_direction)))
)) < 0 THEN 360
ELSE 0
END, 2) as wind_direction,
ROUND(AVG(light)::numeric, 2) as light,
ROUND(AVG(uv)::numeric, 2) as uv
FROM time_series
GROUP BY time_bucket
)
SELECT
ad.time_bucket as time_bucket,
COALESCE(ad.temperature, 0) as temperature,
COALESCE(ad.humidity, 0) as humidity,
COALESCE(ad.pressure, 0) as pressure,
COALESCE(ad.wind_speed, 0) as wind_speed,
COALESCE(ad.wind_direction, 0) as wind_direction,
COALESCE(rs.rainfall, 0) as rainfall,
COALESCE(ad.light, 0) as light,
COALESCE(ad.uv, 0) as uv
FROM aggregated_data ad
LEFT JOIN rain_sums rs ON rs.time_bucket = ad.time_bucket
ORDER BY ad.time_bucket`, intervalSQL, intervalSQL)
rows, err := db.Query(query, stationID, startTime, endTime)
if err != nil {
http.Error(w, "查询数据失败", http.StatusInternalServerError)
log.Printf("查询数据失败: %v", err)
return
}
defer rows.Close()
var data []WeatherPoint
for rows.Next() {
var point WeatherPoint
var timestamp time.Time
err := rows.Scan(&timestamp, &point.Temperature, &point.Humidity,
&point.Pressure, &point.WindSpeed, &point.WindDir,
&point.Rainfall, &point.Light, &point.UV)
if err != nil {
log.Printf("扫描数据失败: %v", err)
continue
}
point.DateTime = timestamp.Format("2006-01-02 15:04:05")
data = append(data, point)
}
json.NewEncoder(w).Encode(data)
}
// 提供静态文件服务
func serveStaticFiles() {
// 获取当前工作目录
workDir := "/home/yarnom/Archive/code/WeatherStation"
webDir := filepath.Join(workDir, "web")
// 创建文件服务器
fs := http.FileServer(http.Dir(webDir))
// 处理根路径请求
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/" {
http.ServeFile(w, r, filepath.Join(webDir, "index.html"))
return
}
// 检查文件是否存在
if _, err := http.Dir(webDir).Open(strings.TrimPrefix(r.URL.Path, "/")); err != nil {
http.NotFound(w, r)
return
}
// 提供静态文件
fs.ServeHTTP(w, r)
})
}
func StartWebServer() {
err := initWebDB()
if err != nil {
log.Fatalf("初始化Web数据库连接失败: %v", err)
}
// API路由
http.HandleFunc("/api/stations", getWH65LPStations)
http.HandleFunc("/api/data", getStationData)
// 静态文件服务
serveStaticFiles()
log.Println("Web服务器启动监听端口 10003...")
log.Fatal(http.ListenAndServe(":10003", nil))
}