diff --git a/data_process.go b/data_process.go deleted file mode 100644 index 8587976..0000000 --- a/data_process.go +++ /dev/null @@ -1,80 +0,0 @@ -package main - -import ( - "time" -) - -// 传感器数据结构 -type SensorData struct { - WindSpeed int - WindForce int - WindDirection8 int - WindDirection360 int - Humidity int - Temperature int - Noise int - PM25 int - PM10 int - AtmPressure int - Lux20WH int - Lux20WL int - Light20W int - OpticalRain int - CompassAngle int - SolarRadiation int - Timestamp time.Time -} - -// 解析传感器数据 -func parseData(data []byte) *SensorData { - // 检查响应格式是否正确 - if len(data) < 37 || data[0] != 0x01 || data[1] != 0x03 || data[2] != 0x20 { - logger.Println("响应格式无效") - return nil - } - - // 提取数据部分 - dataBytes := data[3:35] // 跳过地址码、功能码、长度,不包括CRC - - sensorData := SensorData{ - WindSpeed: bytesToInt(dataBytes[0:2]), - WindForce: bytesToInt(dataBytes[2:4]), - WindDirection8: bytesToInt(dataBytes[4:6]), - WindDirection360: bytesToInt(dataBytes[6:8]), - Humidity: bytesToInt(dataBytes[8:10]), - Temperature: bytesToInt(dataBytes[10:12]), - Noise: bytesToInt(dataBytes[12:14]), - PM25: bytesToInt(dataBytes[14:16]), - PM10: bytesToInt(dataBytes[16:18]), - AtmPressure: bytesToInt(dataBytes[18:20]), - Lux20WH: bytesToInt(dataBytes[20:22]), - Lux20WL: bytesToInt(dataBytes[22:24]), - Light20W: bytesToInt(dataBytes[24:26]), - OpticalRain: bytesToInt(dataBytes[26:28]), - CompassAngle: bytesToInt(dataBytes[28:30]), - SolarRadiation: bytesToInt(dataBytes[30:32]), - Timestamp: time.Now(), - } - - return &sensorData -} - -// 记录传感器数据到日志 -func logSensorData(data SensorData) { - logger.Printf("[传感器] 时间: %s, 温度: %.1f°C, 湿度: %.1f%%, 风速: %.2fm/s (%d°), 雨量: %.1fmm, PM2.5: %dμg/m³", - data.Timestamp.Format("2006-01-02 15:04:05"), - float64(data.Temperature)/10.0, - float64(data.Humidity)/10.0, - float64(data.WindSpeed)/100.0, - data.WindDirection360, - float64(data.OpticalRain)/10.0, - data.PM25) -} - -// 将两个字节转换为整数 -func bytesToInt(bytes []byte) int { - if len(bytes) != 2 { - return 0 - } - return int(bytes[0])<<8 | int(bytes[1]) -} diff --git a/database.go b/database.go deleted file mode 100644 index 9633fca..0000000 --- a/database.go +++ /dev/null @@ -1,74 +0,0 @@ -package main - -import ( - "database/sql" - "os" -) - -var db *sql.DB - -// 初始化数据库 -func initDB() { - var err error - if err := os.MkdirAll("data", 0755); err != nil { - logger.Fatalf("创建数据目录失败: %v", err) - } - - db, err = sql.Open("sqlite", "data/sensor.db") - if err != nil { - logger.Fatalf("打开数据库错误: %v", err) - } - - // 创建数据表 - createTableSQL := `CREATE TABLE IF NOT EXISTS sensor_data ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - timestamp DATETIME NOT NULL, - wind_speed INTEGER, - wind_force INTEGER, - wind_direction8 INTEGER, - wind_direction360 INTEGER, - humidity INTEGER, - temperature INTEGER, - noise INTEGER, - pm25 INTEGER, - pm10 INTEGER, - atm_pressure INTEGER, - lux20wh INTEGER, - lux20wl INTEGER, - light20w INTEGER, - optical_rain INTEGER, - compass_angle INTEGER, - solar_radiation INTEGER - );` - - _, err = db.Exec(createTableSQL) - if err != nil { - logger.Fatalf("创建表错误: %v", err) - } -} - -// 保存传感器数据到数据库 -func saveSensorData(data SensorData) { - stmt, err := db.Prepare(`INSERT INTO sensor_data ( - timestamp, wind_speed, wind_force, wind_direction8, wind_direction360, - humidity, temperature, noise, pm25, pm10, atm_pressure, - lux20wh, lux20wl, light20w, optical_rain, compass_angle, solar_radiation - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`) - - if err != nil { - logger.Printf("准备SQL语句错误: %v", err) - return - } - defer stmt.Close() - - _, err = stmt.Exec( - data.Timestamp, data.WindSpeed, data.WindForce, data.WindDirection8, data.WindDirection360, - data.Humidity, data.Temperature, data.Noise, data.PM25, data.PM10, data.AtmPressure, - data.Lux20WH, data.Lux20WL, data.Light20W, data.OpticalRain, data.CompassAngle, data.SolarRadiation, - ) - - if err != nil { - logger.Printf("保存数据错误: %v", err) - return - } -} diff --git a/go.mod b/go.mod index b3cf273..381d3e3 100644 --- a/go.mod +++ b/go.mod @@ -1,18 +1,5 @@ -module tcp_server +module go_rain_dtu -go 1.24.2 +go 1.21 -require modernc.org/sqlite v1.37.0 - -require ( - github.com/dustin/go-humanize v1.0.1 // indirect - github.com/google/uuid v1.6.0 // indirect - github.com/mattn/go-isatty v0.0.20 // indirect - github.com/ncruces/go-strftime v0.1.9 // indirect - github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect - golang.org/x/exp v0.0.0-20250305212735-054e65f0b394 // indirect - golang.org/x/sys v0.31.0 // indirect - modernc.org/libc v1.62.1 // indirect - modernc.org/mathutil v1.7.1 // indirect - modernc.org/memory v1.9.1 // indirect -) +require github.com/go-sql-driver/mysql v1.7.1 diff --git a/go.sum b/go.sum index e944b01..fd7ae07 100644 --- a/go.sum +++ b/go.sum @@ -1,47 +1,2 @@ -github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= -github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= -github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17kjQEVQ1XRhq2/JR1M3sGqeJoxs= -github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA= -github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= -github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= -github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= -github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4= -github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= -github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= -github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= -golang.org/x/exp v0.0.0-20250305212735-054e65f0b394 h1:nDVHiLt8aIbd/VzvPWN6kSOPE7+F/fNFDSXLVYkE/Iw= -golang.org/x/exp v0.0.0-20250305212735-054e65f0b394/go.mod h1:sIifuuw/Yco/y6yb6+bDNfyeQ/MdPUy/hKEMYQV17cM= -golang.org/x/mod v0.24.0 h1:ZfthKaKaT4NrhGVZHO1/WDTwGES4De8KtWO0SIbNJMU= -golang.org/x/mod v0.24.0/go.mod h1:IXM97Txy2VM4PJ3gI61r1YEk/gAj6zAHN3AdZt6S9Ww= -golang.org/x/sync v0.12.0 h1:MHc5BpPuC30uJk597Ri8TV3CNZcTLu6B6z4lJy+g6Jw= -golang.org/x/sync v0.12.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= -golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik= -golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= -golang.org/x/tools v0.31.0 h1:0EedkvKDbh+qistFTd0Bcwe/YLh4vHwWEkiI0toFIBU= -golang.org/x/tools v0.31.0/go.mod h1:naFTU+Cev749tSJRXJlna0T3WxKvb1kWEx15xA4SdmQ= -modernc.org/cc/v4 v4.25.2 h1:T2oH7sZdGvTaie0BRNFbIYsabzCxUQg8nLqCdQ2i0ic= -modernc.org/cc/v4 v4.25.2/go.mod h1:uVtb5OGqUKpoLWhqwNQo/8LwvoiEBLvZXIQ/SmO6mL0= -modernc.org/ccgo/v4 v4.25.1 h1:TFSzPrAGmDsdnhT9X2UrcPMI3N/mJ9/X9ykKXwLhDsU= -modernc.org/ccgo/v4 v4.25.1/go.mod h1:njjuAYiPflywOOrm3B7kCB444ONP5pAVr8PIEoE0uDw= -modernc.org/fileutil v1.3.0 h1:gQ5SIzK3H9kdfai/5x41oQiKValumqNTDXMvKo62HvE= -modernc.org/fileutil v1.3.0/go.mod h1:XatxS8fZi3pS8/hKG2GH/ArUogfxjpEKs3Ku3aK4JyQ= -modernc.org/gc/v2 v2.6.5 h1:nyqdV8q46KvTpZlsw66kWqwXRHdjIlJOhG6kxiV/9xI= -modernc.org/gc/v2 v2.6.5/go.mod h1:YgIahr1ypgfe7chRuJi2gD7DBQiKSLMPgBQe9oIiito= -modernc.org/libc v1.62.1 h1:s0+fv5E3FymN8eJVmnk0llBe6rOxCu/DEU+XygRbS8s= -modernc.org/libc v1.62.1/go.mod h1:iXhATfJQLjG3NWy56a6WVU73lWOcdYVxsvwCgoPljuo= -modernc.org/mathutil v1.7.1 h1:GCZVGXdaN8gTqB1Mf/usp1Y/hSqgI2vAGGP4jZMCxOU= -modernc.org/mathutil v1.7.1/go.mod h1:4p5IwJITfppl0G4sUEDtCr4DthTaT47/N3aT6MhfgJg= -modernc.org/memory v1.9.1 h1:V/Z1solwAVmMW1yttq3nDdZPJqV1rM05Ccq6KMSZ34g= -modernc.org/memory v1.9.1/go.mod h1:/JP4VbVC+K5sU2wZi9bHoq2MAkCnrt2r98UGeSK7Mjw= -modernc.org/opt v0.1.4 h1:2kNGMRiUjrp4LcaPuLY2PzUfqM/w9N23quVwhKt5Qm8= -modernc.org/opt v0.1.4/go.mod h1:03fq9lsNfvkYSfxrfUhZCWPk1lm4cq4N+Bh//bEtgns= -modernc.org/sortutil v1.2.1 h1:+xyoGf15mM3NMlPDnFqrteY07klSFxLElE2PVuWIJ7w= -modernc.org/sortutil v1.2.1/go.mod h1:7ZI3a3REbai7gzCLcotuw9AC4VZVpYMjDzETGsSMqJE= -modernc.org/sqlite v1.37.0 h1:s1TMe7T3Q3ovQiK2Ouz4Jwh7dw4ZDqbebSDTlSJdfjI= -modernc.org/sqlite v1.37.0/go.mod h1:5YiWv+YviqGMuGw4V+PNplcyaJ5v+vQd7TQOgkACoJM= -modernc.org/strutil v1.2.1 h1:UneZBkQA+DX2Rp35KcM69cSsNES9ly8mQWD71HKlOA0= -modernc.org/strutil v1.2.1/go.mod h1:EHkiggD70koQxjVdSBM3JKM7k6L0FbGE5eymy9i3B9A= -modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y= -modernc.org/token v1.1.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM= +github.com/go-sql-driver/mysql v1.7.1 h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrtU8EI= +github.com/go-sql-driver/mysql v1.7.1/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI= diff --git a/hourly_rainfall.go b/hourly_rainfall.go deleted file mode 100644 index b292b64..0000000 --- a/hourly_rainfall.go +++ /dev/null @@ -1,224 +0,0 @@ -package main - -import ( - "database/sql" - "fmt" - "time" -) - -// 定期更新小时雨量统计 -func startHourlyRainfallUpdater() { - // 确保小时雨量表存在 - ensureHourlyRainfallTable() - - // 更新历史数据 - updateHistoricalHourlyRainfall() - - // 启动定时更新 - go periodicHourlyRainfallUpdate() -} - -// 确保小时雨量表存在 -func ensureHourlyRainfallTable() { - _, err := db.Exec(` - CREATE TABLE IF NOT EXISTS hourly_rainfall ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - hour_start DATETIME NOT NULL, - hour_end DATETIME NOT NULL, - rainfall INTEGER NOT NULL, - min_value INTEGER, - max_value INTEGER, - samples INTEGER, - created_at DATETIME NOT NULL, - UNIQUE(hour_start) - ) - `) - - if err != nil { - logger.Printf("创建小时雨量表错误: %v", err) - } -} - -// 更新历史小时雨量数据(从2025-04-21 18:00:00开始) -func updateHistoricalHourlyRainfall() { - logger.Printf("开始更新历史小时雨量数据...") - - // 查询现有的小时数据 - rows, err := db.Query(` - SELECT - datetime(strftime('%Y-%m-%d %H:00:00', timestamp)) as hour_start, - datetime(strftime('%Y-%m-%d %H:59:59', timestamp)) as hour_end, - MIN(optical_rain) as min_value, - MAX(optical_rain) as max_value, - COUNT(*) as samples, - MAX(optical_rain) - MIN(optical_rain) as rainfall - FROM sensor_data - WHERE timestamp >= '2025-04-21 18:00:00' - GROUP BY strftime('%Y-%m-%d %H', timestamp) - ORDER BY hour_start - `) - - if err != nil { - logger.Printf("查询历史数据错误: %v", err) - return - } - defer rows.Close() - - // 准备插入语句 - stmt, err := db.Prepare(` - INSERT OR IGNORE INTO hourly_rainfall - (hour_start, hour_end, rainfall, min_value, max_value, samples, created_at) - VALUES (?, ?, ?, ?, ?, ?, datetime('now')) - `) - - if err != nil { - logger.Printf("准备插入语句错误: %v", err) - return - } - defer stmt.Close() - - // 插入数据 - count := 0 - for rows.Next() { - var hourStart, hourEnd string - var minValue, maxValue, samples, rainfall int - - err := rows.Scan(&hourStart, &hourEnd, &minValue, &maxValue, &samples, &rainfall) - if err != nil { - logger.Printf("读取行数据错误: %v", err) - continue - } - - _, err = stmt.Exec(hourStart, hourEnd, rainfall, minValue, maxValue, samples) - if err != nil { - logger.Printf("插入数据错误: %v", err) - continue - } - - count++ - } - - logger.Printf("历史小时雨量数据更新完成,共插入 %d 条记录", count) -} - -// 定期更新小时雨量 -func periodicHourlyRainfallUpdate() { - // 计算下一个整点后5分钟的时间(给足够时间收集整点数据) - now := time.Now() - nextHour := time.Date(now.Year(), now.Month(), now.Day(), now.Hour()+1, 5, 0, 0, now.Location()) - delay := nextHour.Sub(now) - - // 首次等待到下一个整点后5分钟 - time.Sleep(delay) - - // 然后每小时更新一次 - ticker := time.NewTicker(time.Hour) - for range ticker.C { - updateLastHourRainfall() - } -} - -// 更新上一个小时的雨量数据 -func updateLastHourRainfall() { - // 计算上一个小时的时间范围 - now := time.Now() - lastHourEnd := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, now.Location()) - lastHourStart := lastHourEnd.Add(-time.Hour) - - // 查询这个小时的数据 - var minValue, maxValue, samples, rainfall sql.NullInt64 - - err := db.QueryRow(` - SELECT - MIN(optical_rain), - MAX(optical_rain), - COUNT(*), - MAX(optical_rain) - MIN(optical_rain) - FROM sensor_data - WHERE timestamp >= ? AND timestamp < ? - `, lastHourStart.Format("2006-01-02 15:04:05"), lastHourEnd.Format("2006-01-02 15:04:05")).Scan( - &minValue, &maxValue, &samples, &rainfall) - - if err != nil { - logger.Printf("查询上一小时数据错误: %v", err) - return - } - - // 检查是否有足够的数据 - if !samples.Valid || samples.Int64 < 2 { - logger.Printf("上一小时 (%s) 数据样本不足,跳过更新", lastHourStart.Format("2006-01-02 15:04")) - return - } - - // 插入或更新小时雨量记录 - _, err = db.Exec(` - INSERT OR REPLACE INTO hourly_rainfall - (hour_start, hour_end, rainfall, min_value, max_value, samples, created_at) - VALUES (?, ?, ?, ?, ?, ?, datetime('now')) - `, - lastHourStart.Format("2006-01-02 15:04:05"), - lastHourEnd.Format("2006-01-02 15:04:05"), - rainfall.Int64, - minValue.Int64, - maxValue.Int64, - samples.Int64) - - if err != nil { - logger.Printf("更新小时雨量记录错误: %v", err) - return - } - - logger.Printf("已更新 %s 小时的雨量数据: %.1fmm (%d 个样本)", - lastHourStart.Format("2006-01-02 15:04"), - float64(rainfall.Int64)/10.0, - samples.Int64) -} - -// 获取最近24小时的降雨数据 -func getRecentHourlyRainfall(hours int) ([]HourlyRainfall, error) { - if hours <= 0 { - hours = 24 - } - - rows, err := db.Query(` - SELECT hour_start, rainfall, samples - FROM hourly_rainfall - ORDER BY hour_start DESC - LIMIT ? - `, hours) - - if err != nil { - return nil, fmt.Errorf("查询小时雨量错误: %v", err) - } - defer rows.Close() - - var result []HourlyRainfall - for rows.Next() { - var hr HourlyRainfall - var hourStart string - - err := rows.Scan(&hourStart, &hr.Rainfall, &hr.Samples) - if err != nil { - return nil, fmt.Errorf("读取小时雨量错误: %v", err) - } - - // 解析时间 - hr.HourStart, _ = time.Parse("2006-01-02 15:04:05", hourStart) - - result = append(result, hr) - } - - // 反转数组,使其按时间顺序排列 - for i, j := 0, len(result)-1; i < j; i, j = i+1, j-1 { - result[i], result[j] = result[j], result[i] - } - - return result, nil -} - -// 小时雨量数据结构 -type HourlyRainfall struct { - HourStart time.Time - Rainfall int64 - Samples int64 -} diff --git a/imports.go b/imports.go deleted file mode 100644 index bdedb62..0000000 --- a/imports.go +++ /dev/null @@ -1,5 +0,0 @@ -package main - -import ( - _ "modernc.org/sqlite" -) diff --git a/internal/dao/sensor.go b/internal/dao/sensor.go new file mode 100644 index 0000000..82fbe7d --- /dev/null +++ b/internal/dao/sensor.go @@ -0,0 +1,110 @@ +package dao + +import ( + "database/sql" + "time" + + "go_rain_dtu/internal/model" + "go_rain_dtu/pkg/logger" +) + +type SensorDAO struct { + db *sql.DB +} + +func NewSensorDAO(db *sql.DB) *SensorDAO { + return &SensorDAO{db: db} +} + +// 插入传感器数据 +func (dao *SensorDAO) Insert(data *model.SensorData) error { + query := ` + INSERT INTO sensor_data ( + timestamp, wind_speed, wind_force, wind_direction_8, + wind_direction_360, humidity, temperature, atm_pressure, + solar_radiation, rainfall + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ` + + _, err := dao.db.Exec(query, + data.Timestamp, data.WindSpeed, data.WindForce, data.WindDirection8, + data.WindDirection360, data.Humidity, data.Temperature, data.AtmPressure, + data.SolarRadiation, data.Rainfall, + ) + + if err != nil { + logger.Logger.Printf("插入传感器数据失败: %v", err) + return err + } + + return nil +} + +// 获取聚合数据 +func (dao *SensorDAO) GetAggregatedData(start, end time.Time, interval string) ([]model.AggregatedData, error) { + var query string + switch interval { + case "5min": + query = ` + SELECT + DATE_FORMAT(timestamp, '%Y-%m-%d %H:%i:00') as ts, + ROUND(AVG(temperature)/10, 1) as avg_temp, + MAX(rainfall) - MIN(rainfall) as rainfall, + ROUND(AVG(humidity)/10, 1) as avg_humidity, + ROUND(AVG(wind_speed)/10, 1) as avg_wind_speed + FROM sensor_data + WHERE timestamp BETWEEN ? AND ? + GROUP BY FLOOR(UNIX_TIMESTAMP(timestamp)/(5*60)) + ORDER BY ts DESC + ` + case "30min": + query = ` + SELECT + DATE_FORMAT(timestamp, '%Y-%m-%d %H:%i:00') as ts, + ROUND(AVG(temperature)/10, 1) as avg_temp, + MAX(rainfall) - MIN(rainfall) as rainfall, + ROUND(AVG(humidity)/10, 1) as avg_humidity, + ROUND(AVG(wind_speed)/10, 1) as avg_wind_speed + FROM sensor_data + WHERE timestamp BETWEEN ? AND ? + GROUP BY FLOOR(UNIX_TIMESTAMP(timestamp)/(30*60)) + ORDER BY ts DESC + ` + default: // 1hour + query = ` + SELECT + DATE_FORMAT(timestamp, '%Y-%m-%d %H:00:00') as ts, + ROUND(AVG(temperature)/10, 1) as avg_temp, + MAX(rainfall) - MIN(rainfall) as rainfall, + ROUND(AVG(humidity)/10, 1) as avg_humidity, + ROUND(AVG(wind_speed)/10, 1) as avg_wind_speed + FROM sensor_data + WHERE timestamp BETWEEN ? AND ? + GROUP BY DATE_FORMAT(timestamp, '%Y-%m-%d %H') + ORDER BY ts DESC + ` + } + + rows, err := dao.db.Query(query, start, end) + if err != nil { + logger.Logger.Printf("查询聚合数据失败: %v", err) + return nil, err + } + defer rows.Close() + + var result []model.AggregatedData + for rows.Next() { + var data model.AggregatedData + var tsStr string + err := rows.Scan(&tsStr, &data.AvgTemperature, &data.Rainfall, + &data.AvgHumidity, &data.AvgWindSpeed) + if err != nil { + logger.Logger.Printf("扫描数据行失败: %v", err) + continue + } + data.Timestamp, _ = time.Parse("2006-01-02 15:04:05", tsStr) + result = append(result, data) + } + + return result, nil +} diff --git a/internal/handler/sensor.go b/internal/handler/sensor.go new file mode 100644 index 0000000..5df1edb --- /dev/null +++ b/internal/handler/sensor.go @@ -0,0 +1,58 @@ +package handler + +import ( + "encoding/json" + "net/http" + "time" + + "go_rain_dtu/internal/dao" + "go_rain_dtu/pkg/logger" +) + +type SensorHandler struct { + dao *dao.SensorDAO +} + +func NewSensorHandler(dao *dao.SensorDAO) *SensorHandler { + return &SensorHandler{dao: dao} +} + +func (h *SensorHandler) GetAggregatedData(w http.ResponseWriter, r *http.Request) { + interval := r.URL.Query().Get("interval") + if interval == "" { + interval = "1hour" + } + + endTime := time.Now() + startTime := endTime.Add(-24 * time.Hour) // 默认显示24小时数据 + + if startStr := r.URL.Query().Get("start"); startStr != "" { + if t, err := time.Parse("2006-01-02T15:04", startStr); err == nil { + startTime = t + } + } + if endStr := r.URL.Query().Get("end"); endStr != "" { + if t, err := time.Parse("2006-01-02T15:04", endStr); err == nil { + endTime = t + } + } + + data, err := h.dao.GetAggregatedData(startTime, endTime, interval) + if err != nil { + logger.Logger.Printf("获取聚合数据失败: %v", err) + http.Error(w, "服务器内部错误", http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(data) +} + +// 处理静态文件 +func (h *SensorHandler) ServeStatic(w http.ResponseWriter, r *http.Request) { + if r.URL.Path == "/" { + http.ServeFile(w, r, "static/index.html") + return + } + http.FileServer(http.Dir("static")).ServeHTTP(w, r) +} diff --git a/internal/model/sensor.go b/internal/model/sensor.go new file mode 100644 index 0000000..de3b1fd --- /dev/null +++ b/internal/model/sensor.go @@ -0,0 +1,27 @@ +package model + +import "time" + +// SensorData 传感器数据结构 +type SensorData struct { + ID int64 `json:"id"` + Timestamp time.Time `json:"timestamp"` + WindSpeed int `json:"wind_speed"` // 风速 + WindForce int `json:"wind_force"` // 风力 + WindDirection8 int `json:"wind_direction_8"` // 8方位风向 + WindDirection360 int `json:"wind_direction_360"` // 360度风向 + Humidity int `json:"humidity"` // 湿度 + Temperature int `json:"temperature"` // 温度 + AtmPressure int `json:"atm_pressure"` // 大气压 + SolarRadiation int `json:"solar_radiation"` // 太阳辐射 + Rainfall int `json:"rainfall"` // 累计雨量 +} + +// AggregatedData 聚合数据结构 +type AggregatedData struct { + Timestamp time.Time `json:"timestamp"` + AvgTemperature float64 `json:"avg_temperature"` + Rainfall float64 `json:"rainfall"` + AvgHumidity float64 `json:"avg_humidity"` + AvgWindSpeed float64 `json:"avg_wind_speed"` +} diff --git a/internal/tcp/sensor.go b/internal/tcp/sensor.go new file mode 100644 index 0000000..98a49ae --- /dev/null +++ b/internal/tcp/sensor.go @@ -0,0 +1,210 @@ +package tcp + +import ( + "encoding/binary" + "fmt" + "net" + "sync" + "time" + + "go_rain_dtu/internal/dao" + "go_rain_dtu/internal/model" + "go_rain_dtu/pkg/logger" +) + +const ( + queryCmd = "01 03 01 F4 00 10 04 08" // 读取寄存器命令 + resetCmd = "01 06 60 02 00 5A B6 31" // 清除雨量统计命令 + tcpPort = ":10004" // TCP服务器端口 +) + +type SensorComm struct { + conn net.Conn + dao *dao.SensorDAO + address string + mu sync.Mutex +} + +// 创建新的传感器通信实例 +func NewSensorComm(conn net.Conn, dao *dao.SensorDAO) *SensorComm { + return &SensorComm{ + conn: conn, + dao: dao, + address: conn.RemoteAddr().String(), + } +} + +// 发送查询命令 +func (s *SensorComm) sendQuery() error { + s.mu.Lock() + defer s.mu.Unlock() + + cmd := hexStringToBytes(queryCmd) + _, err := s.conn.Write(cmd) + if err != nil { + logger.Logger.Printf("发送查询命令失败: %v", err) + return err + } + + logger.Logger.Printf("发送查询命令: %X", cmd) + return nil +} + +// 处理接收到的数据 +func (s *SensorComm) handleData(data []byte) *model.SensorData { + if len(data) < 37 { + logger.Logger.Printf("数据长度不足: %d", len(data)) + return nil + } + + // 解析数据,从第4个字节开始是数据部分 + sensorData := &model.SensorData{ + Timestamp: time.Now(), + WindSpeed: int(binary.BigEndian.Uint16(data[3:5])), + WindForce: int(binary.BigEndian.Uint16(data[5:7])), + WindDirection8: int(binary.BigEndian.Uint16(data[7:9])), + WindDirection360: int(binary.BigEndian.Uint16(data[9:11])), + Humidity: int(binary.BigEndian.Uint16(data[11:13])), + Temperature: int(binary.BigEndian.Uint16(data[13:15])), + AtmPressure: int(binary.BigEndian.Uint16(data[21:23])), + SolarRadiation: int(binary.BigEndian.Uint16(data[33:35])), + } + + // 保存数据到数据库 + if err := s.dao.Insert(sensorData); err != nil { + logger.Logger.Printf("保存数据失败: %v", err) + return nil + } + + return sensorData +} + +// 关闭连接 +func (s *SensorComm) Close() { + s.mu.Lock() + defer s.mu.Unlock() + + if s.conn != nil { + s.conn.Close() + s.conn = nil + } +} + +// 启动TCP服务器 +func StartTCPServer(dao *dao.SensorDAO) error { + listener, err := net.Listen("tcp", tcpPort) + if err != nil { + return fmt.Errorf("启动TCP服务器失败: %v", err) + } + defer listener.Close() + + logger.Logger.Printf("TCP服务器启动在端口%s", tcpPort) + + var currentConn *SensorComm + var mu sync.Mutex + + for { + conn, err := listener.Accept() + if err != nil { + logger.Logger.Printf("接受连接失败: %v", err) + continue + } + + mu.Lock() + // 关闭旧连接 + if currentConn != nil { + currentConn.Close() + } + + // 创建新连接 + sensor := NewSensorComm(conn, dao) + currentConn = sensor + mu.Unlock() + + logger.Logger.Printf("新连接建立: %s", conn.RemoteAddr()) + + // 处理连接 + go handleConnection(sensor) + } +} + +// 处理连接 +func handleConnection(sensor *SensorComm) { + defer sensor.Close() + + // 发送首次查询 + if err := sensor.sendQuery(); err != nil { + return + } + + // 设置定时器,每分钟查询一次 + ticker := time.NewTicker(time.Minute) + defer ticker.Stop() + + // 读取数据的缓冲区 + buffer := make([]byte, 1024) + + for { + // 设置读取超时 + sensor.conn.SetReadDeadline(time.Now().Add(time.Second * 30)) + + n, err := sensor.conn.Read(buffer) + if err != nil { + logger.Logger.Printf("读取数据失败: %v", err) + return + } + + if n > 0 { + logger.Logger.Printf("接收数据: %X", buffer[:n]) + if sensorData := sensor.handleData(buffer[:n]); sensorData != nil { + logger.Logger.Printf("处理数据成功: %+v", sensorData) + } + } + + select { + case <-ticker.C: + if err := sensor.sendQuery(); err != nil { + return + } + default: + } + } +} + +// 辅助函数:将十六进制字符串转换为字节数组 +func hexStringToBytes(s string) []byte { + var bytes []byte + var b byte + var ok bool + + for i := 0; i < len(s); i++ { + if s[i] == ' ' { + continue + } + + if b, ok = hexCharToByte(s[i]); !ok { + continue + } + + if i%2 == 0 { + bytes = append(bytes, b<<4) + } else { + bytes[len(bytes)-1] |= b + } + } + + return bytes +} + +// 辅助函数:将十六进制字符转换为字节 +func hexCharToByte(c byte) (byte, bool) { + switch { + case '0' <= c && c <= '9': + return c - '0', true + case 'a' <= c && c <= 'f': + return c - 'a' + 10, true + case 'A' <= c && c <= 'F': + return c - 'A' + 10, true + } + return 0, false +} diff --git a/main.go b/main.go index d4cb0f0..0fbe42b 100644 --- a/main.go +++ b/main.go @@ -1,175 +1,93 @@ package main import ( - "log" - "net" - "os" - "sync" - "time" + "database/sql" + "fmt" + "net/http" + + _ "github.com/go-sql-driver/mysql" + "go_rain_dtu/internal/dao" + "go_rain_dtu/internal/handler" + "go_rain_dtu/internal/tcp" + "go_rain_dtu/pkg/logger" ) -var ( - logger *log.Logger - logFile *os.File - activeConn net.Conn - activeSensor *SensorComm - activeConnMutex sync.Mutex - clientAddress string +const ( + username = "root" + password = "root" + host = "localhost" + port = "3306" + dbName = "rain_db" ) func main() { - setupLogging() - defer logFile.Close() + // 初始化日志 + if err := logger.InitLogger(); err != nil { + fmt.Printf("初始化日志失败: %v\n", err) + return + } + defer logger.CloseLogger() + logger.StartLogRotation() - initDB() + // 连接数据库 + dsn := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?parseTime=true", + username, password, host, port, dbName) + + db, err := sql.Open("mysql", dsn) + if err != nil { + logger.Logger.Printf("连接数据库失败: %v", err) + return + } defer db.Close() - go startWebServer() - - addr := "0.0.0.0:10004" - listener, err := net.Listen("tcp", addr) - if err != nil { - logger.Fatalf("监听端口错误: %v", err) + // 初始化数据库表 + if err := initDB(db); err != nil { + logger.Logger.Printf("初始化数据库表失败: %v", err) + return } - defer listener.Close() - logger.Printf("服务器已启动: %s", addr) + // 初始化各层 + sensorDAO := dao.NewSensorDAO(db) + sensorHandler := handler.NewSensorHandler(sensorDAO) - for { - conn, err := listener.Accept() - if err != nil { - logger.Printf("接受连接错误: %v", err) - continue - } + // 设置路由 + http.HandleFunc("/api/data", sensorHandler.GetAggregatedData) + http.HandleFunc("/", sensorHandler.ServeStatic) - activeConnMutex.Lock() - // 关闭旧连接 - if activeConn != nil { - oldConn := activeConn - oldSensor := activeSensor - activeConn = nil - activeSensor = nil - activeConnMutex.Unlock() - - if oldSensor != nil { - oldSensor.close() - } else if oldConn != nil { - oldConn.Close() - } - - logger.Printf("关闭旧连接,接入新连接: %s", conn.RemoteAddr().String()) - } else { - activeConnMutex.Unlock() - logger.Printf("新连接: %s", conn.RemoteAddr().String()) - } - - sensorComm := newSensorComm(conn) - - activeConnMutex.Lock() - activeConn = conn - activeSensor = sensorComm - clientAddress = conn.RemoteAddr().String() - activeConnMutex.Unlock() + // 启动TCP服务器 + go startTCPServer(sensorDAO) - go handleConnection(sensorComm) + // 启动HTTP服务器 + logger.Logger.Println("HTTP服务器启动在 :8080") + if err := http.ListenAndServe(":8080", nil); err != nil { + logger.Logger.Printf("HTTP服务器启动失败: %v", err) } } -func handleConnection(sensor *SensorComm) { - defer func() { - activeConnMutex.Lock() - if activeConn == sensor.conn { - activeConn = nil - activeSensor = nil - clientAddress = "" - } - activeConnMutex.Unlock() - - sensor.close() - }() +func initDB(db *sql.DB) error { + query := ` + CREATE TABLE IF NOT EXISTS sensor_data ( + id BIGINT PRIMARY KEY AUTO_INCREMENT, + timestamp DATETIME NOT NULL, + wind_speed INT, + wind_force INT, + wind_direction_8 INT, + wind_direction_360 INT, + humidity INT, + temperature INT, + atm_pressure INT, + solar_radiation INT, + rainfall INT, + INDEX idx_timestamp (timestamp) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; + ` - logger.Printf("处理连接: %s", sensor.address) - - nextQuery := getNextQueryTime() - nextReset := getNextHourTime() - - var nextEvent time.Time - isQueryEvent := true - - if nextQuery.Before(nextReset) { - nextEvent = nextQuery - isQueryEvent = true - } else { - nextEvent = nextReset - isQueryEvent = false - } - - timer := time.NewTimer(time.Until(nextEvent)) - sensor.sendQuery() - - buffer := make([]byte, 1024) - done := make(chan bool) - - go func() { - for { - n, err := sensor.conn.Read(buffer) - if err != nil { - logger.Printf("客户端断开: %v", err) - done <- true - return - } - - if n >= 37 { - sensorData := sensor.handleData(buffer[:n]) - if sensorData != nil { - saveSensorData(*sensorData) - logSensorData(*sensorData) - } - } - - logger.Printf("接收数据长度: %d", n) - logger.Printf("Raw Data: %x\n",buffer[:n]) - } - }() - - for { - select { - case <-timer.C: - if isQueryEvent { - sensor.sendQuery() - nextQuery = getNextQueryTime() - } else { - sensor.resetHourly() - nextReset = getNextHourTime() - } - - if nextQuery.Before(nextReset) { - nextEvent = nextQuery - isQueryEvent = true - } else { - nextEvent = nextReset - isQueryEvent = false - } - - timer.Reset(time.Until(nextEvent)) - - case <-done: - return - } - } + _, err := db.Exec(query) + return err } -func setupLogging() { - if err := os.MkdirAll("logs", 0755); err != nil { - log.Fatalf("创建日志目录失败: %v", err) +func startTCPServer(dao *dao.SensorDAO) { + if err := tcp.StartTCPServer(dao); err != nil { + logger.Logger.Printf("TCP服务器启动失败: %v", err) } - - logFileName := "logs/sensor_" + time.Now().Format("2006-01-02") + ".log" - var err error - logFile, err = os.OpenFile(logFileName, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) - if err != nil { - log.Fatalf("打开日志文件失败: %v", err) - } - - logger = log.New(logFile, "", log.LstdFlags) } diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go new file mode 100644 index 0000000..3a37ff1 --- /dev/null +++ b/pkg/logger/logger.go @@ -0,0 +1,68 @@ +package logger + +import ( + "fmt" + "io" + "log" + "os" + "path/filepath" + "time" +) + +var ( + logFile *os.File + Logger *log.Logger +) + +// 初始化日志系统 +func InitLogger() error { + logsDir := "logs" + if err := os.MkdirAll(logsDir, 0755); err != nil { + return fmt.Errorf("创建日志目录失败: %v", err) + } + + today := time.Now().Format("2006-01-02") + logFilePath := filepath.Join(logsDir, fmt.Sprintf("server_%s.log", today)) + + file, err := os.OpenFile(logFilePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return fmt.Errorf("打开日志文件失败: %v", err) + } + + logFile = file + multiWriter := io.MultiWriter(os.Stdout, file) + Logger = log.New(multiWriter, "", log.Ldate|log.Ltime|log.Lshortfile) + + log.SetOutput(multiWriter) + log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile) + + Logger.Println("日志系统初始化完成") + return nil +} + +// 关闭日志文件 +func CloseLogger() { + if logFile != nil { + logFile.Close() + } +} + +// 日志轮转,每天创建新的日志文件 +func StartLogRotation() { + go func() { + for { + now := time.Now() + next := now.Add(24 * time.Hour) + next = time.Date(next.Year(), next.Month(), next.Day(), 0, 0, 0, 0, next.Location()) + duration := next.Sub(now) + + time.Sleep(duration) + + Logger.Println("开始日志轮转...") + CloseLogger() + if err := InitLogger(); err != nil { + log.Printf("日志轮转失败: %v", err) + } + } + }() +} diff --git a/sensor.db b/sensor.db deleted file mode 100644 index 5b44e59..0000000 Binary files a/sensor.db and /dev/null differ diff --git a/sensor_comm.go b/sensor_comm.go deleted file mode 100644 index 8e1a622..0000000 --- a/sensor_comm.go +++ /dev/null @@ -1,140 +0,0 @@ -package main - -import ( - "encoding/hex" - "net" - "time" -) - -// 传感器通信相关结构和函数 -type SensorComm struct { - conn net.Conn - address string - lastQueryTime time.Time - lastResetTime time.Time - querySuccess bool - responseRecv chan bool - queryCmd []byte - resetCmd []byte -} - -// 创建新的传感器通信实例 -func newSensorComm(conn net.Conn) *SensorComm { - sc := &SensorComm{ - conn: conn, - address: conn.RemoteAddr().String(), - lastQueryTime: time.Time{}, - lastResetTime: time.Time{}, - querySuccess: false, - responseRecv: make(chan bool, 1), // 带缓冲的通道,避免阻塞 - queryCmd: prepareQueryCommand(), - resetCmd: prepareResetCommand(), - } - return sc -} - -// 准备查询命令 -func prepareQueryCommand() []byte { - queryHexData := "01 03 01 F4 00 10 04 08" - queryHexData = removeSpaces(queryHexData) - queryData, err := hex.DecodeString(queryHexData) - if err != nil { - logger.Printf("解析查询命令错误: %v", err) - return nil - } - return queryData -} - -// 准备重置命令 -func prepareResetCommand() []byte { - resetHexData := "01 06 60 02 00 5A B6 31" - resetHexData = removeSpaces(resetHexData) - resetData, err := hex.DecodeString(resetHexData) - if err != nil { - logger.Printf("解析重置命令错误: %v", err) - return nil - } - return resetData -} - -// 发送查询命令 -func (sc *SensorComm) sendQuery() bool { - sc.querySuccess = false - _, err := sc.conn.Write(sc.queryCmd) - if err != nil { - logger.Printf("发送查询命令错误: %v", err) - return false - } - - sc.lastQueryTime = time.Now() - logger.Printf("发送查询命令: %s", time.Now().Format("15:04:05")) - return true -} - -// 发送整点重置命令,在重置前先查询保存数据 -func (sc *SensorComm) resetHourly() bool { - if time.Since(sc.lastResetTime) < 5*time.Minute { - logger.Printf("最近5分钟内已重置过雨量,跳过本次重置") - return false - } - - sc.querySuccess = false - _, err := sc.conn.Write(sc.queryCmd) - if err != nil { - logger.Printf("重置前查询命令错误: %v", err) - // 继续执行重置,不因查询失败而中断重置 - } else { - logger.Printf("重置前发送查询命令: %s", time.Now().Format("15:04:05")) - - // 等待一小段时间确保查询完成并且数据被处理 - time.Sleep(2000 * time.Millisecond) - } - - // 发送重置命令 - _, err = sc.conn.Write(sc.resetCmd) - if err != nil { - logger.Printf("发送重置命令错误: %v", err) - return false - } - - logger.Printf("发送雨量重置命令: %s", time.Now().Format("15:04:05")) - sc.lastResetTime = time.Now() - return true -} - -// 处理接收到的数据 -func (sc *SensorComm) handleData(data []byte) *SensorData { - sensorData := parseData(data) - if sensorData != nil { - sc.querySuccess = true - - // 通知已收到响应 - select { - case sc.responseRecv <- true: - default: - // 通道已满或无接收者,这里不阻塞 - } - - return sensorData - } - return nil -} - -// 关闭连接 -func (sc *SensorComm) close() { - if sc.conn != nil { - sc.conn.Close() - } - close(sc.responseRecv) -} - -// 移除字符串中的空格 -func removeSpaces(s string) string { - var result []rune - for _, r := range s { - if r != ' ' { - result = append(result, r) - } - } - return string(result) -} diff --git a/static/index.html b/static/index.html new file mode 100644 index 0000000..6d0f734 --- /dev/null +++ b/static/index.html @@ -0,0 +1,322 @@ + + +
+ + +| 时间 | +降雨量(mm) | +平均温度(℃) | +平均湿度(%) | +平均风速(m/s) | +
|---|
- 总降雨量: {{printf "%.1f" $sum}} mm
- 最大小时降雨量: {{printf "%.1f" $max}} mm ({{$maxTime}})
- 统计时段: {{len .HourlyData}} 小时
-
暂无降雨数据
- {{end}} -| 时间 | -降雨量(mm) | -数据采样数 | -雨量图示 | -
|---|---|---|---|
| {{formatTime .HourStart}} | -{{printf "%.1f" (divInt64 .Rainfall 10)}} | -{{.Samples}} | -- - | -
所选时间段内暂无降雨记录。
- {{end}} -- 连接状态: {{.ConnStatus}} | - 客户端地址: {{.ClientAddr}} -
-- 温度: {{printf "%.1f" (div .Latest.Temperature 10)}}°C | - 湿度: {{printf "%.1f" (div .Latest.Humidity 10)}}% | - 风速: {{printf "%.2f" (div .Latest.WindSpeed 100)}}m/s | - 风向: {{.Latest.WindDirection360}}° | - 风力: {{.Latest.WindForce}}级 -
-- 雨量: {{printf "%.1f" (div .Latest.OpticalRain 10)}}mm | - 大气压: {{printf "%.1f" (div .Latest.AtmPressure 10)}}kPa | - 太阳辐射: {{.Latest.SolarRadiation}}W/m² -
-| 时间 | -温度(°C) | -湿度(%) | -风速(m/s) | -风向(°) | -风力(级) | -雨量(mm) | -大气压(kPa) | -太阳辐射(W/m²) | -
|---|---|---|---|---|---|---|---|---|
| {{.Timestamp.Format "2006-01-02 15:04:05"}} | -{{printf "%.1f" (div .Temperature 10)}} | -{{printf "%.1f" (div .Humidity 10)}} | -{{printf "%.2f" (div .WindSpeed 100)}} | -{{.WindDirection360}} | -{{.WindForce}} | -{{printf "%.1f" (div .OpticalRain 10)}} | -{{printf "%.1f" (div .AtmPressure 10)}} | -{{.SolarRadiation}} | -